Threading & Queue
Posté le jeu. 10 novembre 2016 dans python
Préambule : décorateur pour tracer les exécutions¶
On commence par créer un décorateur qui nous permet de stocker dans la liste executions
le set (thread_name, t0, t1)
, à savoir le nom du thread réalisant l'exécution, ainsi que les dates de début et de fin d'exécution de la fonction.
import time
import threading
def timer(executions):
def decorated(func):
def wrapper(*args, **kwargs):
t0 = time.time()
output = func(*args, **kwargs)
t1 = time.time()
time.sleep(0.01)
executions.append((threading.current_thread().name, t0, t1))
return output
return wrapper
return decorated
Fonction de test : travail matriciel¶
On crée également la fonction sur laquelle on va chercher à travailler. Ici on crée deux matrices (listes de listes) aléatoires de taille n, on calcule leur produit, et on écrit le résultat dans un fichier temporaire.
Cela permet d'avoir une fonction qui va d'une part réaliser des actions en mémoire (les calculs matriciels) et sur le disque (écriture fichier).
Et on décore la fonction avec notre décorateur pour permettre de tracer des appels.
#import numpy as np
import tempfile
from random import random
EXECUTIONS = []
@timer(EXECUTIONS)
def matrix_function(n):
# Création de deux "matrices" aléatoires
a = [[random() for _ in range(n)] for _ in range(n)]
b = [[random() for _ in range(n)] for _ in range(n)]
# Calcul de leur produit
c = [[0. for _ in range(n)] for _ in range(n)]
for i in range(n):
for j in range(n):
c[i][j] = sum([a[i][k]*b[k][j] for k in range(n)])
# Export vers un fichier
with tempfile.TemporaryFile(mode='w') as fp:
for i in range(n):
for j in range(n):
fp.write('{} '.format(c[i][j]))
fp.write('\n')
A noter que rien dans cette fonction n'est optimisé ; le but est justement d'avoir des traitements coûteux pour mieux appréhender les temps de calcul.
# Nombre d'appels à la fonction
N = 100
# Tailles des matrices à créer
nmin, nmax = 40, 50
sizes = [int(nmin + (nmax-nmin)*random()) for _ in range(N)]
del EXECUTIONS[:]
for value in sizes:
matrix_function(value)
Représentation des durées d'exécution¶
Pour représenter les exécutions de la fonction, on crée une fonction qui travaille sur la liste EXECUTIONS
.
from matplotlib import pyplot as plt
def plot_executions(executions):
# Noms des threads, utilisés pour graduer l'axe "y"
names = sorted(list(set([exe[0] for exe in executions])))
# Ordonnées du graphe : indice du thread dans la liste des noms
y = [names.index(val[0])+1 for val in executions]
# Abscisses : temps de début et de fin
xstart = [val[1] for val in executions]
xmin = min(xstart)
xstart = [val-xmin for val in xstart]
xstop = [val[2]-xmin for val in executions]
# Couleurs :
colors = ['bc'[i%2] for i in range(len(executions))]
# Tracé
maxy = max(y)
maxxstop = max(xstop)
plt.figure(figsize=(10, (4*maxy+84)/46))
plt.hlines(y, xstart, xstop, colors, lw=4)
plt.vlines([maxxstop], [0.], [maxy+1.], 'red', lw=0.5)
plt.text(maxxstop, 0., 'Fin : {:.3}s'.format(maxxstop),
horizontalalignment='center',
verticalalignment='bottom',
color='red')
plt.xlim(0., int(maxxstop)+1)
plt.ylim(0., maxy+1.)
plt.xlabel('Temps d\'exécution (s)')
plt.ylabel('Thread')
plt.yticks([val for val in range(1, maxy+1, 2)])
plt.show()
Et on l'appelle...
plot_executions(EXECUTIONS)
Threading¶
Pour utiliser le multi-threading, il suffit d'instancier des threading.Thread
auxquels on passe comme arguments le nom de la fonction à exécuter (ici matrix_function
), et ses arguments (la taille de la matrice).
On crée donc ici un thread par appel à la fonction. Dans notre cas, on crée 100 threads, chacun travaillant sur un appel à la fonction.
Une fois le thread créé, on le démarre (via start
), et on le garde bien au chaud dans la liste threads
.
Pour attendre la fin de l'exécution des threads, on appelle enfin la méthode join
sur les threads lancés.
import threading
del EXECUTIONS[:]
threads = []
for value in sizes:
th = threading.Thread(target=matrix_function,
args=(value,))
th.start()
threads.append(th)
for th in threads:
th.join()
De la même manière que pour l'exécution séquentielle, on représente les durées d'exécution par thread.
plot_executions(EXECUTIONS)
Queue¶
Ici la manière d'utiliser les threads est différente. On crée une classe dérivée de threading.Thread
, que l'on instancie un nombre limité de fois en lui passant une instance de queue.Queue
. Cette queue sera remplie par les exécutions de matrix_function
à réaliser. A l'exécution de la méthode run
, ces threads vont aller piocher dans la queue les tâches à réaliser jusqu'à épuisement.
import queue
class MyThread(threading.Thread):
"""Classe de définition de mes threads.
"""
def __init__(self, q):
"""Constructeur.
:param q: instance de queue.Queue
"""
threading.Thread.__init__(self)
self.q = q
def run(self):
"""Lance le thread.
Tant que l'ordre n'est pas donné de tuer ce thread, alors
on réalise des exécutions.
"""
while True:
# Obtient un élément de la queue
item = self.q.get()
# Cet élément a été entré en tant que (fonction, *arguments)
function, args = item[0], item[1:]
# Appel à la fonction
function(*args)
# Finalisation de la tâche
self.q.task_done()
Pour utiliser cette classe, on réalise les traitements suivants :
- création d'une queue que l'on remplira plus loin avec les exécutions à réaliser ;
- création d'un nombre figé de threads, et démarrage de ces threads ;
- remplissage de la queue par des tuples
(fonction, arguments)
- demande au thread principal d'attendre la fin d'exécution des autres threads.
A noter qu'à l'étape 3, dès le premier q.put
un thread se met en route pour exécuter sa tâche.
# Création de la queue contenant les configurations à générer
q = queue.Queue()
# Nombre de threads à générer
nb_threads = 4
# Création des threads pour la génération des configurations
for i in range(nb_threads):
thread = MyThread(q)
thread.setDaemon(True)
thread.start()
# Remplissage de la queue
del EXECUTIONS[:]
for value in sizes:
q.put((matrix_function, value))
# Attend que tous les calculs de la queue soient finis
q.join()
Comme les fois précédentes, on représente les temps d'exécution des différentes fonctions.
plot_executions(EXECUTIONS)
Performance en fonction du nombre de threads¶
Sur les tests précédents, on peut se demander l'intérêt d'utiliser des queue.Queue
: le temps de calcul est assez proche si on les utilise ou pas.
Premier argument : le nombre de threads lançable en même temps n'est pas illimité, et on risque de tout casser si on dépasse la limite.
Second argument : il se trouve que notre fonction n'apprécie pas d'être lancée de trop nombreuses fois en même temps. Pour avoir fait les tests, il semblerait que ce soit la partie écritures disque qui soit limitante, probablement à cause d'accès concurrents au disque. Aussi à mesure que le nombre de threads augmente, le temps de calcul va tout d'abord décroître pour augmenter ensuite.
Le code suivant permet d'illustrer mon propos. On lance le code pour différents nombres de threads, et on stocke le couple (nombre de threads, temps de calcul).
import sys
results = []
nb_threads_list = list(range(1, 10))+list(range(10, 30, 5))+list(range(30, 51, 10))
for _ in range(3):
for nb_threads in nb_threads_list:
sys.stdout.write("\rExecution avec {} threads...".format(nb_threads))
sys.stdout.flush()
# Création de la queue contenant les configurations à générer
q = queue.Queue()
# Création des threads pour la génération des configurations
for i in range(nb_threads):
thread = MyThread(q)
thread.setDaemon(True)
thread.start()
# Remplissage de la queue
del EXECUTIONS[:]
T0 = time.time()
for value in sizes:
q.put((matrix_function, value))
# Attend que tous les calculs de la queue soient finis
q.join()
T1 = time.time()
results.append((nb_threads, T1-T0))
On représente alors le temps de calcul en fonction du nombre de threads...
exe_time_list = []
for nb_threads in nb_threads_list:
min_val = min([result[1] for result in results if result[0]==nb_threads])
exe_time_list.append(min_val)
#nb_threads =
#exe_time = [result[1] for result in results]
print()
plt.plot([result[0] for result in results], [result[1] for result in results], 'b+')
plt.plot(nb_threads_list, exe_time_list, 'ro-')
plt.show()
Et on peut admirer que notre temps de calcul commence par décroître, atteint un minimum pour 4 threads, et repart ensuite à la hausse.