J'ai un problème bizarre avec Celery et Redis sous GNU/Linux que je n'ai pas sous Windows.
J'ai un canvas de tâches pour des imports de données qui fonctionne très bien et nous avons identifié qu'il était possible de faire certaines tâches simultanément, alors naïf comme nous sommes nous avons créé des groupes que nous avons chainés. Ca donne un truc comme ça :
group_1 = group(
importer_organismes.s(),
importer_entreprises.s(),
importer_caisses_assurance.s(),
)
group_2 = group(
importer_populations.s(),
)
group_3 = group(
importer_contrats_collectifs.s(),
importer_contrats_individuels.s(),
)
group_4 = group(
importer_personnes.s(),
importer_produits.s(),
)
group_5 = group(
importer_adresses.s(),
importer_domiciliations_bancaires.s(),
importer_numeros_insee.s(),
importer_parametres_personnes.s(),
importer_liens_caisse_personne.s(),
verifier_personnes.s(),
)
workflow = chain(
group_1,
merge.s(),
group_2,
merge.s(),
group_3,
merge.s(),
group_4,
merge.s(),
group_5,
merge.s(),
)
context = Context()
return workflow.apply_async(args=(context, ))
Entre chaque groupe, nous avons mis une sous-tâche permettant de récolter toutes les données de sortie et d'en faire un unique conteneur.
Quand nous exécutons le canvas sous GNU/Linux, le premier groupe s'exécute mais seulement importer_entreprises
ou importer_organismes
et importer_caisses_assurance
ensemble, c'est assez aléatoire, mais jamais les 3 en même temps. Pire encore, une fois ces tâches terminées (avec succès), elles ne passent pas le relais aux suivantes !
Bref, on ne trouve pas de solution après avoir épluché la documentation et le web.
Configuration Celery
# Celery configuration
BROKER_URL = 'redis://localhost:6379/0'
BROKER_TRANSPORT_OPTIONS = {
'visibility_timeout': 43200,
'fanout_prefix': True,
'fanout_patterns': True,
}
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_DISABLE_RATE_LIMITS = True
CELERY_ALWAYS_EAGER = False
Celery runner (Django)
# coding: utf-8
import os
from celery import Celery
# Logging
import logging
log = logging.getLogger(__name__)
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nouvelleoffre.settings')
# Load environment configuration
from configurations import importer
importer.install()
app = Celery('nouvelleoffre')
# Django setup (only for Django 1.7)
# from django imports setup
# setup()
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# Auto-discover celery tasks inside Django applications
from django.conf import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Lancement du worker
NAME="demo"
DJANGODIR=/home/demo/demo
NUM_WORKERS=1
DJANGO_SETTINGS_MODULE=nouvelleoffre.settings
DJANGO_CONFIGURATION=Staging
DJANGO_WSGI_MODULE=nouvelleoffre.wsgi
# Activate the virtual environment
cd $DJANGODIR
# source /home/demo/.pyenv/versions/nouvelleoffre/bin/activate
export DJANGO_SETTINGS_MODULE=$DJANGO_SETTINGS_MODULE
export DJANGO_CONFIGURATION=$DJANGO_CONFIGURATION
export PYTHONPATH=$DJANGODIR:$PYTHONPATH
# Create the run directory if it doesn't exist
RUNDIR=$(dirname $SOCKFILE)
test -d $RUNDIR || mkdir -p $RUNDIR
PYENV=/home/demo/.pyenv/versions/demo/bin
# Start your Django Celery workers
# Programs meant to be run under supervisor should not daemonize themselves (do not use --daemon)
exec $PYENV/celery worker -A nouvelleoffre --loglevel=INFO --logfile=logs/celery.log --without-mingle --without-gossip