Bienvenue sur IndexError.

Ici vous pouvez poser des questions sur Python et le Framework Django.

Mais aussi sur les technos front comme React, Angular, Typescript et Javascript en général.

Consultez la FAQ pour améliorer vos chances d'avoir des réponses à vos questions.

Celery tests unitaire (py.test)

0 votes

J'utilise celery pour faire des traitements répetitives.

class Callback(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        ....
        super(Callback, self).on_failure(exc, task_id, args, kwargs, einfo)

@celery.task(bind=True, default_retry_delay=3, max_retries=1, base=Callback)
def faire_qlq_chose(self, v):
  try:
     ....
  except MyExcept:
    self.retry()

je veux faire des tests unitaires sur "faireqlqchose" sauf que je ne passe pas par la fonction "on_failure" si j'ai 'MyExcept'.
la config de celery pour les tests unitiares, dans conftest.py:

app.config['CELERY_ALWAYS_EAGER'] = True
app.config['CELERY_ALWAYS_EAGER'] = True

quelqu'un à une idée pour que mon programme passe dans la fonction "on_failure" si "MyExcept" .

demandé 28-Jun par my
edité 28-Jun par max

1 Réponse

0 votes

Je pense que ton problème vient du fait que par défaut les exceptions meurent dans le conteneur de tâche Celery, il faut que dans ta configuration tu définisses explicitement que tu veux propager l'exception pour qu'elle soit récupérée en dehors de l'exécution de la tâche.

En Celery<4 c'était le point de configuration CELERY_TASK_EAGER_PROPAGATES mais dans la dernière version il faut utiliser une autre typologie de configuration :
http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_eager_propagates

répondu 29-Jun par debnet (962 points)

Bonjour,
Merci pour la réponse.
Ça fonctionne très bien.
Merci encore

Accepte la réponse pour que les autres utilisateurs puissent déterminer que c'est résolu. ;)

...