Bienvenue sur IndexError.

Ici vous pouvez poser des questions sur Python et le Framework Django.

Mais aussi sur les technos front comme React, Angular, Typescript et Javascript en général.

Consultez la FAQ pour améliorer vos chances d'avoir des réponses à vos questions.

Mauvais usage de multiprocessing via une methode de classe

+2 votes

Le code suivant, qui marche, résume mon projet jusqu'ici:

On a deux classes, une classe P qui a un attribut, et une classe pleinDeP qui a entre autre un attribut qui est une liste de P.

J'effectue souvent une opération sur les P de la liste, je me moque dans quel ordre elle est faite et dans quel ordre on a les résultats, donc je pense que c'est plutôt bien parallelisable.

J'ai réussi à bidouiller un truc qui marche avec mulitprocessing, malheureusement ça tourne en environ 20 fois plus de temps avec 4 coeurs que sur un seul. C'est lié je pense aux 3 redéfinitions de fonction:

J'ai du créer une fonction modP modifiée (modPReturn) qui renvoie un objet de type P. Peut être que j'ai mal fait.
Pire encore, apply_async ne voulait pas prendre P.modPReturn comme fonction, j'ai du recréer une 2e redéfinition (redef) qui puisse prendre mes arguments.
Par ailleurs, les résultats sont stockés dans une liste dont la taille n'est pas fixée au début de l'exécution (alors qu'on la connait) -> J'imagine qu'on perd des années là dessus. Mais c'est tout ce que j'ai trouvé pour gérer les résultats "à la volée" renvoyée par les processes. (En fait pas tout à fait, j'ai essayée les Queues mais à déplier c'était encore plus lent).

Au final ma question serait "est il possible d'appliquer une méthode de classe sur un objet d'une autre classe de façon parallèle avec multiprocessing?"

# -*- coding: utf-8 -*-

### ici les imports:
from __future__ import print_function
import multiprocessing as mp
import time
import random

### ici les classes:
class P():
    def __init__(self,x=.5): 
        self.x=x+random.random()

    def modP(self,a,b):
        self.x = a*self.x+b

    def modPReturn(self,a,b):
        nptemp=P(self.x)
        nptemp.x = a*nptemp.x+b
        return nptemp

class pleinDeP():
    def __init__(self,N=20): 
        self.bloc=[P() for _ in range(N)]

    def muteV(self,a,b):
         for p in self.bloc:
            p.modP(a,b)       


def redef(x,a,b):
    return P.modPReturn(x,a,b)

def log_result(result):
        result_list.append(result)

### ici le coeur:
if __name__ == '__main__':
    ma=1
    mb=10

    mpdp1=pleinDeP(2500)
    debser=time.time()
    mpdp1.muteV(ma,mb)
    finser=time.time()
    print("temps pas parallelle: " + str(finser-debser))
    print(mpdp1.bloc[0].x) ## si ça tourne autour de 10 c'est que ça a marché

    result_list = []
    mpdp2=pleinDeP(2500)
    debpar=time.time()
    pool= mp.Pool(processes=4)
    for x in mpdp2.bloc:
        pool.apply_async(redef,args = (x,ma,mb),callback = log_result)
    pool.close()
    pool.join()

    mpdp2.bloc=result_list
    finpar=time.time()    
    print("temps parallelle: " + str(finpar-debpar))
    print(mpdp2.bloc[0].x) ## si ça tourne autour de 10 c'est que ça a marché
demandé 23-Mar-2015 par Beoti1 (156 points)
edité 23-Mar-2015 par Lhassa

Pour la lisibilité j'aurai évité le mix camelCase des methodes de la classe en fooBar() avec la derniere fonction en foo_bar()
Du coup à moins que cette façon de coder ne soit imposée/inspirée d'une lib/framework (genre WAMP/Crossbar la joue ainsi), ca colle pas à la pep8 ;)
Ca fait pas avancer le schmilblick mais c'est pour que vous ayez des reponses plus constructives par la suite ;)

d'autre part, pour les test de performances, fait attention à un truc : sur un petit jeu de données ou pour des tâches simples, en général, les résultats peuvent être faussés (plus de temps de transfert de données que de temps de calculs par exemple...)

de plus, un bon moyen de prévoir du code parallélisable (ou multi process), c'est d'utiliser le map d'une fonction sur une liste... derrière, c'est assez facile de le lancer de manière asynchrone, ou de le gérer avec des Queue...

@foxmask : bien reçu, je vais faire plus attention là dessus

@Lhassa : Bien vu, je pense que mon exemple est très (trop) simple. Je ne suis même pas sur que sur mon vrai code la fonction soit beaucoup plus compliquée, c'est juste que je dois la répéter un million de fois. Bref, je vais regarder du côté des maps.

1 Réponse

+1 vote
 
Meilleure réponse

je te conseil de lire ça et éventuellement aussi ma question et réponse ici avec entre autre une solution pour utiliser les Queue. Je pense qu'une Queue est la bonne solution pour toi, mais j'ai pas trop regarder...

pour ce qui est d'utiliser une méthode de Classe directement, je pense que ce n'est pas possible simplement, à cause de la manière dont la lib gère la chose.

sinon, cette solution semble être bien plus rapide que la tienne pour la partie multi-process (mais moins rapide que la solution native)

### ici le coeur:
if __name__ == '__main__':
    ma=1
    mb=10
    from functools import partial
    mpdp4=pleinDeP(25000)
    debpar=time.time()
    pool= mp.Pool(processes=4)
    Lhassa_redef = partial(redef, a=ma, b=mb)
    print(mpdp4.bloc[0].x) ## si ça tourne autour de 10 c'est que ça a marché
    log_result4_tmp = pool.map_async(Lhassa_redef, mpdp4.bloc)
    mpdp4.bloc = log_result4_tmp.get()
    finpar=time.time()
    print("temps parallelle4: " + str(finpar-debpar))
    print(mpdp4.bloc[0].x) ## si ça tourne autour de 10 c'est que ça a marché
répondu 23-Mar-2015 par Lhassa (794 points)
sélectionné 24-Mar-2015 par Beoti1

Bien vu, c'est une bonne base pour moi merci. Je vais me retaper la doc des Queues, ça semble logique. En tout cas le map a l'air plus prometteur que le apply asynchrone.

en fait, je pense que c'est assez proche dans ton cas, peut être un tout petit peu de réorganisation de l'algo...
d'une manière générale, c'est assez bon d'utiliser les map, pour de l'optimisation future (sauf erreur de ma part, ça permet d'aller vers de la prog vectorielle, très bien gérée par les GPU)

...