Bienvenue sur IndexError.

Ici vous pouvez poser des questions sur Python et le Framework Django.

Mais aussi sur les technos front comme React, Angular, Typescript et Javascript en général.

Consultez la FAQ pour améliorer vos chances d'avoir des réponses à vos questions.

Partager un accès à un fichier en lecture/ecriture en multiprocessing

+3 votes

j'ai une tache simple à exécuter plein de fois, donc je cherche à "paralléliser" mon code. J'utilise multiprocessing, avec un Pool de workers. Cela marche globalement comme le built-in map.

Comme je ne peux passer qu'un seul argument avec map_async qui est l’itérable parallélisé, j'utilise partial de functools qui me retourne une fonction à 1 paramètre (l'itérable) en fixant tout les autres (cf. Introduction au currying).

L'exemple suivant marche très bien, sauf si l'un de mes paramètre fixé est un pointeur vers un fichier (j'essaye ici d'écrire dans un fichier plutôt qu'à l'écran).

from functools import partial
from multiprocessing import Pool

def ecrire_une_ligne_ecran(i, commentaire):
    print("{} - {}".format(i, commentaire))

def ecrire_une_ligne_fichier(i, fichier_out):
    fichier_out.write("{}\n".format(i))

def mon_pgm(chemin_vers_fichier_out,
            nb_lignes = 5,
            nb_jobs=1):
    fichier_out = open(chemin_vers_fichier_out, 'w')

    fct_parallele_ecran = partial(ecrire_une_ligne_ecran, commentaire="ca marche",)
    fct_parallele_fichier = partial(ecrire_une_ligne_fichier, fichier_out=fichier_out,)

    if nb_jobs>1:
        p = Pool(nb_jobs)
        resultat = p.map_async(func=fct_parallele_ecran, iterable=range(nb_lignes))
        resultat.get()    # marche très bien
        resultat = p.map_async(func=fct_parallele_fichier, iterable=range(nb_lignes))
        resultat.get()    # ne marche pas avec un pointeur de fichier
    else:    # marche très bien
        print("map simple")
        resultat = map(fct_parallele_ecran, range(nb_lignes))
        resultat = map(fct_parallele_fichier, range(nb_lignes))

    fichier_out.close()

mon_pgm("toto.txt", nb_lignes=3, nb_jobs=1)    # -> marche très bien
mon_pgm("toto.txt", nb_lignes=3, nb_jobs=2)    # -> erreur I/O...

l'exemple marche très bien pour nb_jobs=1 (le fichier est créé et remplie), mais j'ai l'erreur

ValueError: I/O operation on closed file

dés que je passe par le Pool de workers.

ps : j'analyse coupe par coupe des volumes de 32Go, d'où la nécessité d'optimiser en temps et en espace disque : je ne peux pas ouvrir et charger toute l'image en mémoire...

demandé 4-Mar-2015 par Lhassa (794 points)

1 Réponse

+5 votes
 
Meilleure réponse

Tu ne peux pas partager des fichiers avec multiprocessing.
En python3.4 tu as une erreur un peu plus explicite:

TypeError: cannot serialize '_io.TextIOWrapper' object

Les objets "file" que tu récupères en python2.7 sont dans un mode bizarre:

<uninitialized file>

qui explique ton erreur:

ValueError: I/O operation on closed file

Si tu y tiens vraiment, il existe une librairie qui permet de partager des descripteurs de fichiers d'un processus à l'autre: libancillary

Mais le plus simple serait:

  • en lecture, de passer le nom du fichier (et un offset) dans lequel ton worker travaille (il l'ouvrirait).
  • en écriture, de passer un nom de fichier unique pour chaque worker, ou une Queue dans laquelle tes workers put des messages, un autre process se chargerait de "dépiler" la queue pour sérialiser les accès au fichier.
répondu 4-Mar-2015 par bulange (618 points)
sélectionné 6-Mar-2015 par Lhassa

j'avais pas vu ça comme ça... thks
j'ai pas envie d'avoir des dépendances, par contre les Queues ça ressemble plus à ce qu'il me faut...
pour la lecture du fichier, effectivement, c'est probablement facile.
pour l'écriture, une Queue est probablement la solution, dés que j'arrive à en faire un exemple qui marche, je le post.

En terme de Queues python ça pourrait t'intéresser: http://python-rq.org/docs/

@max : rq semble très intéressant, surtout associé à redis, mais je ne suis pas sur de bien maîtriser cela encore. A explorer!

@bulange : queue est effectivement une bonne solution. Du coup voilà comment j'ai construit le code (il n'y a plus de fichier, mais les fichiers ne sont plus un problème avec cette solution...)

from __future__ import print_function

from multiprocessing import Queue, Process, current_process
import time, random

def ecrire_une_ligne_fichier(task, gachette, results):
    while not task.empty(): # tant qu'il y a du boulot...
        t = task.get()      # récupère une tâche à faire
        gachette.put(t)     # bloquant si l'empreinte memoire est déjà saturée
        time.sleep(random.random()/5.)     # l'action à faire...
        results.put([t, current_process().name]) # alimente la queue de résultats


def mon_pgm(chemin_vers_fichier_out, nb_actions, nb_workers, empreinte_memoire):
    mesTaches = Queue()                     # queue des tâche à faire
    mesResultats = Queue()                  # queue des résultats à traiter
    maGachette = Queue(empreinte_memoire)   # queue permettant de gérer l'empreinte mémoire

    for nl in range(nb_actions): mesTaches.put(nl)  # remplissage de la queue des tâches à faire

    # création des workers, à paramétrer en fonction des capacités de la machine hôte
    Workers = [Process(target=ecrire_une_ligne_fichier, args=(mesTaches, maGachette, mesResultats)) for i in range(nb_workers)]
    # on démare les workers, ils commencent à travailler
    for each in Workers: each.start()

    print(" res : résultats en attente de traitement")
    print(" mem : empreinte mémoire")
    print("  id - res/mem")
    # tant que les 3 listes ne sont pas vide, il y a de boulot en cour
    while not (mesTaches.empty() and maGachette.empty() and mesResultats.empty()):
        result = mesResultats.get() # on récupère un résultat pour le traiter
        # sortie classique, ou impression fichier...
        print(" {: 3d} - {: 3d}/{: 3d}".format(result[0], mesResultats.qsize(), maGachette.qsize()))
        time.sleep(1)      # le traitement des résultats peut prendre du temps (écriture fichier...)
        t=maGachette.get() # on a fini avec ces données, on peut libérer le calcul suivant

mon_pgm("toto.txt", nb_actions=10, nb_workers=2, empreinte_memoire=4) 

avec comme résultats :

>>>>python thread_file4.py 
res : résultats en attente de traitement
mem : empreinte mémoire
 id - res/mem
  1 -   0/  3
  2 -   2/  4
  0 -   2/  4
  3 -   2/  4
  4 -   2/  4
  5 -   2/  4
  6 -   2/  4
  7 -   2/  3
  8 -   1/  2
  9 -   0/  1

on a bien mem-res <= nb_workers
ici, le traitement des résultats est plus long que leur génération, on vérifie bien que l'empreinte mémoire est respectée...

...