j'ai une tache simple à exécuter plein de fois, donc je cherche à "paralléliser" mon code. J'utilise multiprocessing, avec un Pool
de workers. Cela marche globalement comme le built-in map.
Comme je ne peux passer qu'un seul argument avec map_async qui est l’itérable parallélisé, j'utilise partial
de functools
qui me retourne une fonction à 1 paramètre (l'itérable) en fixant tout les autres (cf. Introduction au currying).
L'exemple suivant marche très bien, sauf si l'un de mes paramètre fixé est un pointeur vers un fichier (j'essaye ici d'écrire dans un fichier plutôt qu'à l'écran).
from functools import partial
from multiprocessing import Pool
def ecrire_une_ligne_ecran(i, commentaire):
print("{} - {}".format(i, commentaire))
def ecrire_une_ligne_fichier(i, fichier_out):
fichier_out.write("{}\n".format(i))
def mon_pgm(chemin_vers_fichier_out,
nb_lignes = 5,
nb_jobs=1):
fichier_out = open(chemin_vers_fichier_out, 'w')
fct_parallele_ecran = partial(ecrire_une_ligne_ecran, commentaire="ca marche",)
fct_parallele_fichier = partial(ecrire_une_ligne_fichier, fichier_out=fichier_out,)
if nb_jobs>1:
p = Pool(nb_jobs)
resultat = p.map_async(func=fct_parallele_ecran, iterable=range(nb_lignes))
resultat.get() # marche très bien
resultat = p.map_async(func=fct_parallele_fichier, iterable=range(nb_lignes))
resultat.get() # ne marche pas avec un pointeur de fichier
else: # marche très bien
print("map simple")
resultat = map(fct_parallele_ecran, range(nb_lignes))
resultat = map(fct_parallele_fichier, range(nb_lignes))
fichier_out.close()
mon_pgm("toto.txt", nb_lignes=3, nb_jobs=1) # -> marche très bien
mon_pgm("toto.txt", nb_lignes=3, nb_jobs=2) # -> erreur I/O...
l'exemple marche très bien pour nb_jobs=1 (le fichier est créé et remplie), mais j'ai l'erreur
ValueError: I/O operation on closed file
dés que je passe par le Pool
de workers.
ps : j'analyse coupe par coupe des volumes de 32Go, d'où la nécessité d'optimiser en temps et en espace disque : je ne peux pas ouvrir et charger toute l'image en mémoire...