Bienvenue sur IndexError.

Ici vous pouvez poser des questions sur Python et le Framework Django.

Mais aussi sur les technos front comme React, Angular, Typescript et Javascript en général.

Consultez la FAQ pour améliorer vos chances d'avoir des réponses à vos questions.

un décorateur générique

+5 votes

Actuellement avec une application django, j'ai des management command qui font un import selon la présence d'une variable dans le settings.py comme suit :

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.core.management.base import BaseCommand
from django.conf import settings

if settings.RQ_QUEUES:
    from django_th.tasks_rq import read_data
else:
    from django_th.tasks import read_data

class Command(BaseCommand):

    help = 'Trigger all the services'

    def handle(self, *args, **options):
    read_data.delay()

Le code de tasks_rq.py et tasks.py sont rigoureusement identiques... modulo le décorateur

Dans le premier le décorateur est @job et utilisé pour RQ & Django-RQ, le second est @sharer_tasks et utilisé par Celery.

En soit c'est pratique parce que il n'y a pas d'effet de bord, mais ça duplique quand même tout le code du module tasks(_rq).py pour 3 lignes de décoration.

Comment faire pour arriver à produire un décorateur qui wrappe ces 2 là et qui ne pète pas lors de son appel parce que soit RQ est absent soit Celery est absent ?

J'ai cherché de l'héritage de décorateur mais j'ai l'impression que je me fourvoie

demandé 12-Fev-2016 par foxmask (2,880 points)

c'est marrant plus je creuse python plus je croise des patterns que j'avais encore jamais utilisé

@Sam ce qui me rend perplexe c'est le "fait un module task générique qui regroupe le code commun." tu parles du code commun des décorateurs de RQ et Celery ou tu parles du code commun de mes modules tasks(_rq).py ?

Le code commun a tous tes modules tasks*. Si tu as beaucoup de code spécifique à RQ ou Celery, il peut avoir son propre module, il faut juste qu'il se base sur le module task commun.

Le pattern que tu vois là est un design pattern très académique appelé le pattern strategy. Il est très utilisé en POO, mais là on a la version fonctionnelle, puisqu'on utilise déjà la version fonctionnelle du pattern décorateur.

1 Réponse

+4 votes
 
Meilleure réponse

Il te faut utiliser des imports conditionnels et un peu d'injection de dépendance.

D'abord, fait un module task generique qui regroupe le code commun.

Ce module task generique doit avoir une factory pour son decorator task:

# hop, on injecte la stragie selon le module (celery, rq) disponible
def task_decorator_factory(stragegy): 

    def tasks(func):
        # faire d'autres trucs
        func = stragegy(func)
        # faire d'autres truc
        return func

    return tasks


# on essaye d'importer les dépendances, et créer un decorator générique,
# avec la première bonne stratégie dispo
try:
    from celery import sharer_tasks

    tasks = task_decorator_factory(sharer_tasks)
except ImportError:

    try:
        from dango_rq import job

        tasks = task_decorator_factory(job)

    except ImportError:
        raise ImportError('Celery or Django-rq must be installed before you can use this')
répondu 17-Fev-2016 par Sam (4,984 points)
sélectionné 24-Fev-2016 par foxmask

je me penche sur ca et je vous dirai

voilà où j'en suis arrivé, dans mon tasks.py j'ai :

from django_th.tasks_generic import tasks

@tasks('default')
def read_data():
    """
    The purpose of this tasks is to put data in cache
    because of the read_data function of each
    service
    """
    trigger = TriggerService.objects.filter(status=True).select_related(
    'consumer__name', 'provider__name')

    for service in trigger:
        reading.delay(service)

dans tasks_generic.py j'ai mis

def task_decorator_factory(strategy):

    def tasks(function):
        print("strat {}".format(strategy))
        # drop the existing parameter in the @tasks() decorator 
        # when it's for Celery because it does not need them
        if strategy.__name__ == 'shared_task':
            function = strategy()
            print(function)
        else:
            function = strategy(function)
            print(function)
        return function
    return tasks


try:
    from django_rq import job

    tasks = task_decorator_factory(job)

except ImportError:
    try:
    from celery import shared_task

    tasks = task_decorator_factory(shared_task)

    except ImportError:
    raise ImportError('Celery or Django-rq must be installed before you can use this')

et à l'execution de ma tache read_data j'ai une exception liée à RQ :

strat <function job at 0x7fa62a3e5158>
<rq.decorators.job object at 0x7fa628a67e80>
23:04:41 TypeError: read_data() takes 1 positional argument but 4 were given
Traceback (most recent call last):
  File "/home/foxmask/DjangoVirtualEnv/django-trigger-happy/lib/python3.4/site-packages/rq/worker.py", line 568, in perform_job
    rv = job.perform()
  File "/home/foxmask/DjangoVirtualEnv/django-trigger-happy/lib/python3.4/site-packages/rq/job.py", line 495, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/home/foxmask/DjangoVirtualEnv/django-trigger-happy/django-th/django_th/tasks.py", line 156, in reading
    service.date_triggered)
TypeError: read_data() takes 1 positional argument but 4 were given
Traceback (most recent call last):
  File "/home/foxmask/DjangoVirtualEnv/django-trigger-happy/lib/python3.4/site-packages/rq/worker.py", line 568, in perform_job
    rv = job.perform()
  File "/home/foxmask/DjangoVirtualEnv/django-trigger-happy/lib/python3.4/site-packages/rq/job.py", line 495, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/home/foxmask/DjangoVirtualEnv/django-trigger-happy/django-th/django_th/tasks.py", line 156, in reading
    service.date_triggered)
TypeError: read_data() takes 1 positional argument but 4 were given
23:04:41 Invoking exception handler <bound method Worker.move_to_failed_queue of <rq.worker.Worker object at 0x7fa629bf0cc0>>
23:04:41 Moving job to 'failed' queue
23:04:41 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
23:04:41 
23:04:41 *** Listening on default...
23:04:41 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.

Pour autant si dans tasks_generic.py j'inverse les 2 imports pour mettre celery en premier, quand je l'execute ; je n'ai aucune erreur, parce que je shoot les paramètres passés au décorator :

strat <function shared_task at 0x7f1e1233cea0>
<function shared_task.<locals>.create_shared_task.<locals>.__inner at 0x7f1e109961e0>

un idée ?

Ouvre une autre question pour ça.

...