Threading & Queue

Posté le jeu. 10 novembre 2016 dans python

Préambule : décorateur pour tracer les exécutions

On commence par créer un décorateur qui nous permet de stocker dans la liste executions le set (thread_name, t0, t1), à savoir le nom du thread réalisant l'exécution, ainsi que les dates de début et de fin d'exécution de la fonction.

In [1]:
import time
import threading

def timer(executions):
    def decorated(func):
        def wrapper(*args, **kwargs):
            t0 = time.time()
            output = func(*args, **kwargs)
            t1 = time.time()
            time.sleep(0.01)
            executions.append((threading.current_thread().name, t0, t1))
            return output
        return wrapper
    return decorated    

Fonction de test : travail matriciel

On crée également la fonction sur laquelle on va chercher à travailler. Ici on crée deux matrices (listes de listes) aléatoires de taille n, on calcule leur produit, et on écrit le résultat dans un fichier temporaire.

Cela permet d'avoir une fonction qui va d'une part réaliser des actions en mémoire (les calculs matriciels) et sur le disque (écriture fichier).

Et on décore la fonction avec notre décorateur pour permettre de tracer des appels.

In [2]:
#import numpy as np
import tempfile
from random import random

EXECUTIONS = []

@timer(EXECUTIONS)
def matrix_function(n):
    # Création de deux "matrices" aléatoires
    a = [[random() for _ in range(n)] for _ in range(n)]
    b = [[random() for _ in range(n)] for _ in range(n)]

    # Calcul de leur produit
    c = [[0. for _ in range(n)] for _ in range(n)]
    for i in range(n):
        for j in range(n):
            c[i][j] = sum([a[i][k]*b[k][j] for k in range(n)])

    # Export vers un fichier
    with tempfile.TemporaryFile(mode='w') as fp:
        for i in range(n):
            for j in range(n):
                fp.write('{} '.format(c[i][j]))
            fp.write('\n')

A noter que rien dans cette fonction n'est optimisé ; le but est justement d'avoir des traitements coûteux pour mieux appréhender les temps de calcul.

Utilisation séquentielles de la fonction

Appels de la fonction

On va tester N exécutions de notre fonction, pour des matrices de taille aléatoire comprise entre nmin et nmax. On crée donc la liste des tailles, que l'on utilisera partout par la suite, et on appelle la fonction.

In [3]:
# Nombre d'appels à la fonction
N = 100

# Tailles des matrices à créer
nmin, nmax = 40, 50
sizes = [int(nmin + (nmax-nmin)*random()) for _ in range(N)]

del EXECUTIONS[:]
for value in sizes:
    matrix_function(value)

Représentation des durées d'exécution

Pour représenter les exécutions de la fonction, on crée une fonction qui travaille sur la liste EXECUTIONS.

In [4]:
from matplotlib import pyplot as plt

def plot_executions(executions):
    # Noms des threads, utilisés pour graduer l'axe "y"
    names = sorted(list(set([exe[0] for exe in executions])))
    # Ordonnées du graphe : indice du thread dans la liste des noms
    y = [names.index(val[0])+1 for val in executions]
    # Abscisses : temps de début et de fin
    xstart = [val[1] for val in executions]
    xmin = min(xstart)
    xstart = [val-xmin for val in xstart]
    xstop = [val[2]-xmin for val in executions]
    # Couleurs : 
    colors = ['bc'[i%2] for i in range(len(executions))]

    # Tracé
    maxy = max(y)
    maxxstop = max(xstop)
    plt.figure(figsize=(10, (4*maxy+84)/46))
    plt.hlines(y, xstart, xstop, colors, lw=4)
    plt.vlines([maxxstop], [0.], [maxy+1.], 'red', lw=0.5)
    plt.text(maxxstop, 0., 'Fin : {:.3}s'.format(maxxstop),
             horizontalalignment='center',
             verticalalignment='bottom',
             color='red')
    plt.xlim(0., int(maxxstop)+1)
    plt.ylim(0., maxy+1.)
    plt.xlabel('Temps d\'exécution (s)')
    plt.ylabel('Thread')
    plt.yticks([val for val in range(1, maxy+1, 2)])
    plt.show()

Et on l'appelle...

In [5]:
plot_executions(EXECUTIONS)

Threading

Pour utiliser le multi-threading, il suffit d'instancier des threading.Thread auxquels on passe comme arguments le nom de la fonction à exécuter (ici matrix_function), et ses arguments (la taille de la matrice).

On crée donc ici un thread par appel à la fonction. Dans notre cas, on crée 100 threads, chacun travaillant sur un appel à la fonction.

Une fois le thread créé, on le démarre (via start), et on le garde bien au chaud dans la liste threads. Pour attendre la fin de l'exécution des threads, on appelle enfin la méthode join sur les threads lancés.

In [6]:
import threading

del EXECUTIONS[:]

threads = []
for value in sizes:
    th = threading.Thread(target=matrix_function,
                          args=(value,))
    th.start()
    threads.append(th)

for th in threads:
    th.join()

De la même manière que pour l'exécution séquentielle, on représente les durées d'exécution par thread.

In [7]:
plot_executions(EXECUTIONS)

Queue

Ici la manière d'utiliser les threads est différente. On crée une classe dérivée de threading.Thread, que l'on instancie un nombre limité de fois en lui passant une instance de queue.Queue. Cette queue sera remplie par les exécutions de matrix_function à réaliser. A l'exécution de la méthode run, ces threads vont aller piocher dans la queue les tâches à réaliser jusqu'à épuisement.

In [8]:
import queue

class MyThread(threading.Thread):
    """Classe de définition de mes threads.
    """
    def __init__(self, q):
        """Constructeur.
        :param q: instance de queue.Queue
        """
        threading.Thread.__init__(self)
        self.q = q

    def run(self):
        """Lance le thread.
        Tant que l'ordre n'est pas donné de tuer ce thread, alors
        on réalise des exécutions.
        """
        while True:
            # Obtient un élément de la queue
            item = self.q.get()

            # Cet élément a été entré en tant que (fonction, *arguments)
            function, args = item[0], item[1:]

            # Appel à la fonction
            function(*args)

            # Finalisation de la tâche
            self.q.task_done()

Pour utiliser cette classe, on réalise les traitements suivants :

  1. création d'une queue que l'on remplira plus loin avec les exécutions à réaliser ;
  2. création d'un nombre figé de threads, et démarrage de ces threads ;
  3. remplissage de la queue par des tuples (fonction, arguments)
  4. demande au thread principal d'attendre la fin d'exécution des autres threads.

A noter qu'à l'étape 3, dès le premier q.put un thread se met en route pour exécuter sa tâche.

In [9]:
# Création de la queue contenant les configurations à générer
q = queue.Queue()

# Nombre de threads à générer
nb_threads = 4

# Création des threads pour la génération des configurations
for i in range(nb_threads):
    thread = MyThread(q)
    thread.setDaemon(True)
    thread.start()

# Remplissage de la queue
del EXECUTIONS[:]
for value in sizes:
    q.put((matrix_function, value))

# Attend que tous les calculs de la queue soient finis
q.join()

Comme les fois précédentes, on représente les temps d'exécution des différentes fonctions.

In [10]:
plot_executions(EXECUTIONS)

Performance en fonction du nombre de threads

Sur les tests précédents, on peut se demander l'intérêt d'utiliser des queue.Queue : le temps de calcul est assez proche si on les utilise ou pas.

Premier argument : le nombre de threads lançable en même temps n'est pas illimité, et on risque de tout casser si on dépasse la limite.

Second argument : il se trouve que notre fonction n'apprécie pas d'être lancée de trop nombreuses fois en même temps. Pour avoir fait les tests, il semblerait que ce soit la partie écritures disque qui soit limitante, probablement à cause d'accès concurrents au disque. Aussi à mesure que le nombre de threads augmente, le temps de calcul va tout d'abord décroître pour augmenter ensuite.

Le code suivant permet d'illustrer mon propos. On lance le code pour différents nombres de threads, et on stocke le couple (nombre de threads, temps de calcul).

In [11]:
import sys
results = []
nb_threads_list = list(range(1, 10))+list(range(10, 30, 5))+list(range(30, 51, 10))
for _ in range(3):
    for nb_threads in nb_threads_list:
        sys.stdout.write("\rExecution avec {} threads...".format(nb_threads))
        sys.stdout.flush()
        # Création de la queue contenant les configurations à générer
        q = queue.Queue()

        # Création des threads pour la génération des configurations
        for i in range(nb_threads):
            thread = MyThread(q)
            thread.setDaemon(True)
            thread.start()

        # Remplissage de la queue
        del EXECUTIONS[:]
        T0 = time.time()
        for value in sizes:
            q.put((matrix_function, value))

        # Attend que tous les calculs de la queue soient finis
        q.join()

        T1 = time.time()
        results.append((nb_threads, T1-T0))
Execution avec 50 threads...

On représente alors le temps de calcul en fonction du nombre de threads...

In [12]:
exe_time_list = []

for nb_threads in nb_threads_list:
    min_val = min([result[1] for result in results if result[0]==nb_threads])
    exe_time_list.append(min_val)

#nb_threads = 
#exe_time = [result[1] for result in results]
print()
plt.plot([result[0] for result in results], [result[1] for result in results], 'b+')
plt.plot(nb_threads_list, exe_time_list, 'ro-')

plt.show()

Et on peut admirer que notre temps de calcul commence par décroître, atteint un minimum pour 4 threads, et repart ensuite à la hausse.