Bienvenue sur IndexError.

Ici vous pouvez poser des questions sur Python et le Framework Django.

Consultez la FAQ pour améliorer vos chances d'avoir des réponses à vos questions.

nameko et decorators multiples ou dynamiques

0 votes

avec nameko, on peut utiliser du pub/sub avec un décorateur.

pour un service donné je vais avoir plusieurs "publishers" possibles alors je me demandais quelle methode utilisée:

soit un decorateur par service

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class ServiceA:
    """ Event dispatching service. """
    name = "service_a"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
    self.dispatch("event_type", payload)

class ServiceC:
    """ Event dispatching service. """
    name = "service_c"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
    self.dispatch("event_type", payload)

    @event_handler("service_b", "event_type")
    @event_handler("service_c", "event_type")
    @event_handler("service_d", "event_type")
    def handle_event(self, payload):
    print("service a received:", payload)

class ServiceD:
    """ Event dispatching service. """
    name = "service_d"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
    self.dispatch("event_type", payload)

    @event_handler("service_a", "event_type")
    @event_handler("service_b", "event_type")
    @event_handler("service_c", "event_type")
    def handle_event(self, payload):
    print("service d received:", payload)


class ServiceB:
    """ Event listening service. """
    name = "service_b"

    @event_handler("service_a", "event_type")
    @event_handler("service_c", "event_type")
    @event_handler("service_d", "event_type")
    def handle_event(self, payload):
    print("service b received:", payload)

soit "un moyen" pour mettre le nom du service (mais je sèche)

    @event_handler(service_name, "event_type")
    def handle_event(self, payload):
        print("service xxx received:", payload)
demandé 20-Dec-2017 par foxmask (2,892 points)

Dans un autre style, j'ai tenté une factory pattern aussi

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc


class ServiceFactory:

    def factory(service_name):
    if service_name == "service_a":
        return ServiceA()
    if service_name == "service_b":
        return ServiceB()
    if service_name == "service_c":
        return ServiceC()
    assert 0, "Bad service_name : " + service_name

    factory = staticmethod(factory)


class Service(ServiceFactory):

    name = "service"

    dispatch = EventDispatcher()

    @rpc
    def for_who(self, service_name, payload):
        t = ServiceFactory.factory(service_name)
        t.read_data(payload)


class ServiceA:
    """ Event dispatching service. """
    name = "service_a"

    dispatch = EventDispatcher()

    @rpc
    def read_data(self, payload):
    print("'i'm in read_data of {} with payload {}".format(self.__class__, payload))
    self.dispatch("save_data", payload)

    @event_handler("service", "save_data")
    def handle_event(self, payload):
    print("service A received:", payload)


class ServiceB:
    """ Event listening service. """
    name = "service_b"

    @rpc
    def read_data(self, payload):
    self.dispatch("save_data", payload)

    @event_handler("service", "save_data")
    def handle_event(self, payload):
    print("service b received:", payload)

class ServiceC:
    """ Event dispatching service. """
    name = "service_c"

    dispatch = EventDispatcher()

    @rpc
    def read_data(self, payload):
    self.dispatch("save_data", payload)

    @event_handler("service", "save_data")
    def handle_event(self, payload):
    print("service C received:", payload)

Mais là j'ai droit à un pb

>>> n.rpc.service.for_who('service_a', 'foobar')
'i'm in read_data of <class 'test.ServiceA'> with payload foobar
error handling worker <WorkerContext [service.for_who] at 0x7f714960c5c0>: 'EventDispatcher' object is not callable
Traceback (most recent call last):
  File "/home/foxmask/DjangoVirtualEnv/namekotst/lib/python3.6/site-packages/nameko/containers.py", line 388, in _run_worker
    result = method(*worker_ctx.args, **worker_ctx.kwargs)
  File "./test.py", line 28, in for_who
    t.read_data(payload)
  File "./test.py", line 40, in read_data
    self.dispatch("save_data", payload)
TypeError: 'EventDispatcher' object is not callable
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/foxmask/DjangoVirtualEnv/namekotst/lib/python3.6/site-packages/nameko/rpc.py", line 374, in __call__
    return reply.result()
  File "/home/foxmask/DjangoVirtualEnv/namekotst/lib/python3.6/site-packages/nameko/rpc.py", line 332, in result
    raise deserialize(error)
nameko.exceptions.RemoteError: TypeError 'EventDispatcher' object is not callable

Je sèche à faire manger mon appel RPC à nameko :/

Se connecter ou S'inscrire pour répondre à cette question.

...