Bienvenue sur IndexError.

Ici vous pouvez poser des questions sur Python et le Framework Django.

Mais aussi sur les technos front comme React, Angular, Typescript et Javascript en général.

Consultez la FAQ pour améliorer vos chances d'avoir des réponses à vos questions.

Multiprocessing entre differentes machines

+7 votes

J'ai un code qui me permet de lancer différents processus et de partager l'information entre eux. On va dire que ma connaissance du sujet se limite à ce qui est abordé dans le tuto sur sametmax mais reste suffisant pour mes besoins.

J'aimerais pouvoir faire pareil mais entre deux (en fait trois) ordinateurs sur un même réseau local.
Je ne demande pas vraiment de réponse clé en main mais plus une direction où chercher parce que là je commence à être perdu entre tous ces machins sockets et je perds plus mon temps à me demander quoi lire.

Donc l'idée c'est d'utiliser les Queue() entre les process pour les faire communiquer entre eux (ils font leur taf dans leur coin et enregistre le tout dans une BDD) mais au travers du réseau plutôt qu'en local.

demandé 26-Fev-2015 par Feadurn (138 points)
edité 26-Fev-2015 par max

Il faudrait préciser le titre de la question par rapport à son contenu.

Ce n'est pas clair, parle-t-on :
- de distribution,
- de parallélisation,
- de concurrence,
- de communication inter-processus ?

5 Réponses

+10 votes
 
Meilleure réponse

J'ai déjà utilisé: execnet et c'est super. Je peux que te conseiller d'y jeter un oeil.

Tu écris ton code comme d'habitude, tu ouvres une "gateway", et execnet s'occupe de sérialiser la fonction et de la faire exécuter à distance.
C'est aussi un bon moyen de faire communiquer du python2 et du python3 ensemble!

Un exemple de la doc pour attirer les curieux:

import execnet

def multiplier(channel, facteur):
"""channel: c'est l'objet qui te permet de communiquer entre local et remote
facteur: un paramètre normal de la fonction pour montrer que le code qui s'exécute en remote
est défini en local et envoyé lors de la création de la gateway"""
while not channel.isclosed():  # tant que le channel n'est pas fermé
    param = channel.receive()  # lit une valeur envoyé par le côté "local"
    channel.send(param * facteur)  # calcul la valeur et renvoie le résultat

# création de la gateway
gw = execnet.makegateway("ssh=ton_ordi_remote")
# Tu envoies la fonction "*multiplier*" et la valeur du paramètre "*facteur*" à l'hote distant
# et ça l'exécute automatiquement
channel = gw.remote_exec(multiplier, factor=10)

# Ici on va envoyer des données au code qui tourne en remote à travers le "*channel*"
for i in range(5):
    channel.send(i)  # calcule i*10 ordinateur distant!
    result = channel.receive()  # donne moi le résultat
    assert result == i * 10  # que je le vérifie

gw.exit()  # on arrête tout, il est tard
répondu 26-Fev-2015 par bulange (618 points)
sélectionné 26-Fev-2015 par Feadurn

Je regarde a ca et je tente de comprendre. Merci pour la direction suggeree et le code en exemple

+3 votes

Mon niveau de compréhension est le meme que le tiens mais Je me demande si autobahn et le protocol wamp ne seraient pas la solution aussi.

On code un composant qui extrait les urls comme dans le tuto de sametmax et le publish à un autre composant qui subscribe sur le même sujet. Et là communication se fait via crossbar qui fait le "hub" entre les composant à travers le reseau.

exemple simple :

le publisher :

class RssComponent(ApplicationSession):

"""
    An application component that publishes an event
"""

@inlineCallbacks
def onJoin(self, details):        
    while True:
        # recuperation des flux rss dans une procedure
        feeds = yield self.call('eu.trigger-happy.rss.feeds_url')
        for data in feeds:
            for item in data['data']:
                print('publishing {}'.format(item))
                # publication aux subscribers
                self.publish(u'eu.trigger-happy.rss',
                             {'trigger_id': data['trigger_id'], 'user_id': data['user_id'],'item': item})
        yield sleep(120)


@wamp.register(u'eu.trigger-happy.rss.feeds_url')
@inlineCallbacks
def get_feeds_url(self):
    """
        get the URL stored in the database
    """
    query = "SELECT ... FROM django_th_rss "
    rows = yield self.db.runQuery(query)
    feeds = []
    print('get the feeds url...')
    for feed in rows:
        print('get feeds from {0} => {1}'.format(feed[1], feed[2]))
        if feed[0] <= self.right_now():
            feeds.append({'trigger_id': feed[3], "user_id": feed[4],
                      'data': Feeds(**{'url_to_parse': feed[2]}).datas()})
    returnValue(feeds)

le subscriber :

class EvernoteComponent(ApplicationSession):

    """
    An application component that subscribes and receives events
    """

    @inlineCallbacks
    def onJoin(self, details):

        @inlineCallbacks
        def on_event(data):
            print (json.dumps(data, indent=4))
            yield self.call('eu.trigger-happy.evernote.save', data)

    try:
        yield self.subscribe(on_event, u'eu.trigger-happy.rss')
        print("subscribe topic")
    except Exception as e:
        print("could not subscribe to topic: {0}".format(e))


    @wamp.register(u'eu.trigger-happy.evernote.save')
    @inlineCallbacks
    def save_data(self, stuff):
    """
        ici traitement de stuff et enregistrement dans evernote
    """
            pass

J'utilise tout ce truc depuis samedi seulement ;) mais ca marche d'enfer ;)

répondu 26-Fev-2015 par foxmask (2,830 points)
edité 26-Fev-2015 par foxmask

J'avais pense a autobahn, mais ce n'est pas sortir l'artillerie lourde pour juste communiquer un simple queue()?

c'est fort probable
tout dépend du future de l'application je pense
si c'est un truc qui peut vite prendre de l'ampleur et que la scabilité est un enjeu alors non c'est pas lourd ; sinon ya des projets beaucoup moins conséquents c'est sûr.

J'utiliserais MQTT avec Python, dans tous les cas, il te faut un mécanisme de communication entre les processus.

+1 vote

J'ai déja utilisé MPI en C++ et il y a un biding python. C'est un framework/protocole de communication (au dessus des sockets) qui permet de transmettre des messages de façon haut niveau (messages blocants/non blocants, thread safe, proche en proche ou broadcast, permet la création de groupe de communication, de plein de topologies de réseau, ...). De ce que je sais, Open MPI est très utilisé dans les clusters de calcul scientifiques.

Avantages :
* une fois installé sur toutes les machines et noté leur ip dans le fichier de config, tu lances un code, il se propage et communique sans emmerde
* hautement configurable (peut-être un inconvéniant dans ton cas)

Inconvéniants :
* la syntaxe fait un peu peur au début
* il n'utilises que des sockets (comme toutes les autres solutions proposées) ce qui fait un overhead par rapport à une utilisation des IPC si jamais tu l'utilises pour faire du multiprocessing sur une seule machine.

Exemple pris ici :

    #!/usr/bin/env python
    """Demonstrate the task-pull paradigm for high-throughput computing
    using mpi4py. Task pull is an efficient way to perform a large number of
    independent tasks when there are more tasks than processors, especially
    when the run times vary for each task. 

    This code is over-commented for instructional purposes.

    This example was contributed by Craig Finch (cfinch@ieee.org).
    Inspired by http://math.acadiau.ca/ACMMaC/Rmpi/index.html
    """

    from mpi4py import MPI

    def enum(*sequential, **named):
        """Handy way to fake an enumerated type in Python
        http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
        """
        enums = dict(zip(sequential, range(len(sequential))), **named)
        return type('Enum', (), enums)

    # Define MPI message tags
    tags = enum('READY', 'DONE', 'EXIT', 'START')

    # Initializations and preliminaries
    comm = MPI.COMM_WORLD   # get MPI communicator object
    size = comm.size        # total number of processes
    rank = comm.rank        # rank of this process
    status = MPI.Status()   # get MPI status object

    if rank == 0:
        # Master process executes code below
        tasks = range(2*size)
        task_index = 0
        num_workers = size - 1
        closed_workers = 0
        print("Master starting with %d workers" % num_workers)
        while closed_workers < num_workers:
            data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
            source = status.Get_source()
            tag = status.Get_tag()
            if tag == tags.READY:
                # Worker is ready, so send it a task
                if task_index < len(tasks):
                    comm.send(tasks[task_index], dest=source, tag=tags.START)
                    print("Sending task %d to worker %d" % (task_index, source))
                    task_index += 1
                else:
                    comm.send(None, dest=source, tag=tags.EXIT)
            elif tag == tags.DONE:
                results = data
                print("Got data from worker %d" % source)
            elif tag == tags.EXIT:
                print("Worker %d exited." % source)
                closed_workers += 1

        print("Master finishing")
    else:
        # Worker processes execute code below
        name = MPI.Get_processor_name()
        print("I am a worker with rank %d on %s." % (rank, name))
        while True:
            comm.send(None, dest=0, tag=tags.READY)
            task = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
            tag = status.Get_tag()

            if tag == tags.START:
                # Do the work here
                result = task**2
                comm.send(result, dest=0, tag=tags.DONE)
            elif tag == tags.EXIT:
                break

        comm.send(None, dest=0, tag=tags.EXIT)
répondu 3-Mar-2015 par showok (212 points)

C'est vrai que ça fait un peu peur ;)

+2 votes

Franchement je partirai sur Mosquito (MQTT).

La doc est disponible pour le module Python à cette adresse : https://pypi.python.org/pypi/paho-mqtt

C'est simple, propre, efficace et sécurisé (TLS).

Un petit exemple pratique : http://oliversmith.io/technology/2010/08/15/first-steps-using-python-and-mqtt/

répondu 3-Mar-2015 par DoubleNain (1,662 points)
edité 3-Mar-2015 par DoubleNain
+1 vote

perso je ne me prends pas la tete, j'empile les job dans un redis et chacun vient chercher son boulot.

répondu 4-Mar-2015 par juke (318 points)
...