Answer the question
In order to leave comments, you need to log in
Celery: nested chord and events
Colleagues, there was an incomprehensible situation with nested chord.
There are tasks:
@app.task(base=CheckerTask, name='task_a')
def task_a():
pass
@app.task(base=CheckerTask, name='task_b')
def task_b():
pass
@app.task
def complete():
pass
chord_a = chord([task_a.s(...).set(queue='st'), task_b.s(...).set(queue='st')], complete.s(...).set(queue='publish'))
chord_b = chord([task_a.s(...).set(queue='st'), task_b.s(...).set(queue='st')], complete.s(...).set(queue='publish'))
chord([chord_a.s(), chord_b.s()], main_complete.s().set(queue='publish')).apply_async()
Answer the question
In order to leave comments, you need to log in
Just did a test:
import celery
from celery.canvas import chord
@celery.task
def task_a(*args, **kwargs):
logger.info('task_a')
return 1
@celery.task
def task_b(*args, **kwargs):
logger.info('task_b')
return 1
@celery.task
def task_c(*args, **kwargs):
logger.info('task_c')
return 1
@celery.task
def task_d(*args, **kwargs):
logger.info('task_d')
return 1
@celery.task
def notify_a(*args, **kwargs):
logger.info('notify_a')
return 1
@celery.task
def notify_b(*args, **kwargs):
logger.info('notify_b')
return 1
@celery.task
def finish(*args, **kwargs):
logger.info('finish')
return 1
def test_chord_chord():
ch1 = chord([task_a.s(), task_b.s()], notify_a.s())
ch2 = chord([task_c.s(), task_d.s()], notify_b.s())
main_task = chord([ch1, ch2], finish.s())
main_task.apply_async()
[2014-06-04 16:30:29,811: INFO/MainProcess] Got task from broker: celery.chord[36d088c0-648b-482e-9075-59e6aa1d519a]
[2014-06-04 16:30:29,883: INFO/MainProcess] Task celery.chord[36d088c0-648b-482e-9075-59e6aa1d519a] succeeded in 0.0439801216125s: <GroupResult:...
[2014-06-04 16:30:30,821: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:30.845580+04:00]
[2014-06-04 16:30:30,825: INFO/MainProcess] Got task from broker: celery.chord[41452081-7a54-4c8f-8328-a5861b9ceefc]
[2014-06-04 16:30:30,828: INFO/MainProcess] Got task from broker: celery.chord[2a448074-02a1-4bb2-bf2a-9942b6157baa]
[2014-06-04 16:30:30,875: INFO/MainProcess] Task celery.chord[2a448074-02a1-4bb2-bf2a-9942b6157baa] succeeded in 0.0350089073181s: <GroupResult:...
[2014-06-04 16:30:30,876: INFO/MainProcess] Task celery.chord[41452081-7a54-4c8f-8328-a5861b9ceefc] succeeded in 0.0363390445709s: <GroupResult:...
[2014-06-04 16:30:30,876: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] retry: Retry in 1s
[2014-06-04 16:30:31,834: INFO/MainProcess] Got task from broker: celery.chord_unlock[6ffd6da8-e2b0-405d-912f-55e2ce52b10b] eta:[2014-06-04 16:30:31.846696+04:00]
[2014-06-04 16:30:31,837: INFO/MainProcess] Got task from broker: aggregator.tasks.task_c[1bc49e88-9348-441d-a5f2-776e35f3d178]
[2014-06-04 16:30:31,840: INFO/MainProcess] Got task from broker: aggregator.tasks.task_d[7ce8e784-2eee-45bf-81c4-ecebe8343c76]
[2014-06-04 16:30:31,843: INFO/MainProcess] Got task from broker: celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] eta:[2014-06-04 16:30:31.845846+04:00]
[2014-06-04 16:30:31,846: INFO/MainProcess] Got task from broker: aggregator.tasks.task_a[f624d9df-a32e-40a7-beff-cbf49f414a2f]
[2014-06-04 16:30:31,850: INFO/MainProcess] Got task from broker: aggregator.tasks.task_b[1979141c-2a9e-4dbb-a56f-848711c58afa]
[2014-06-04 16:30:31,856: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:31.857219+04:00]
[2014-06-04 16:30:31,860: INFO/MainProcess] task_c
[2014-06-04 16:30:31,860: INFO/MainProcess] task_a
[2014-06-04 16:30:31,861: INFO/MainProcess] task_d
[2014-06-04 16:30:31,863: INFO/MainProcess] Task aggregator.tasks.task_c[1bc49e88-9348-441d-a5f2-776e35f3d178] succeeded in 0.00339508056641s: 1
[2014-06-04 16:30:31,865: INFO/MainProcess] task_b
[2014-06-04 16:30:31,866: INFO/MainProcess] Task aggregator.tasks.task_a[f624d9df-a32e-40a7-beff-cbf49f414a2f] succeeded in 0.00567007064819s: 1
[2014-06-04 16:30:31,867: INFO/MainProcess] Task aggregator.tasks.task_d[7ce8e784-2eee-45bf-81c4-ecebe8343c76] succeeded in 0.00664710998535s: 1
[2014-06-04 16:30:31,868: INFO/MainProcess] Task aggregator.tasks.task_b[1979141c-2a9e-4dbb-a56f-848711c58afa] succeeded in 0.00276899337769s: 1
[2014-06-04 16:30:31,870: INFO/MainProcess] Task celery.chord_unlock[6ffd6da8-e2b0-405d-912f-55e2ce52b10b] succeeded in 0.00752401351929s: None
[2014-06-04 16:30:31,885: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] retry: Retry in 1s
[2014-06-04 16:30:31,890: INFO/MainProcess] Task celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] retry: Retry in 1s
[2014-06-04 16:30:32,865: INFO/MainProcess] Got task from broker: aggregator.tasks.notify_b[c7ee5cde-934f-4838-b7ec-4b965deab3f8]
[2014-06-04 16:30:32,869: INFO/MainProcess] Got task from broker: celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] eta:[2014-06-04 16:30:32.870193+04:00]
[2014-06-04 16:30:32,871: INFO/MainProcess] notify_b
[2014-06-04 16:30:32,873: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:32.883596+04:00]
[2014-06-04 16:30:32,874: INFO/MainProcess] Task aggregator.tasks.notify_b[c7ee5cde-934f-4838-b7ec-4b965deab3f8] succeeded in 0.00337815284729s: 1
[2014-06-04 16:30:32,882: INFO/MainProcess] Task celery.chord_unlock[b6520638-e68e-4600-a306-9445ba54d7bd] succeeded in 0.00427103042603s: None
[2014-06-04 16:30:32,893: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] retry: Retry in 1s
[2014-06-04 16:30:33,881: INFO/MainProcess] Got task from broker: celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] eta:[2014-06-04 16:30:33.888341+04:00]
[2014-06-04 16:30:33,884: INFO/MainProcess] Got task from broker: aggregator.tasks.notify_a[5fb97e6c-5dc3-42a9-b52f-f6aaf228bfbc]
[2014-06-04 16:30:33,888: INFO/MainProcess] notify_a
[2014-06-04 16:30:33,890: INFO/MainProcess] Task aggregator.tasks.notify_a[5fb97e6c-5dc3-42a9-b52f-f6aaf228bfbc] succeeded in 0.00259184837341s: 1
[2014-06-04 16:30:33,898: INFO/MainProcess] Task celery.chord_unlock[7deb4165-c2fb-4771-8c56-edf30912b4c3] succeeded in 0.00864100456238s: None
[2014-06-04 16:30:34,890: INFO/MainProcess] Got task from broker: aggregator.tasks.finish[1042518e-bea4-49fb-a11c-37b5ffa29c1a]
[2014-06-04 16:30:34,893: INFO/MainProcess] finish
[2014-06-04 16:30:34,895: INFO/MainProcess] Task aggregator.tasks.finish[1042518e-bea4-49fb-a11c-37b5ffa29c1a] succeeded in 0.00239610671997s: 1
Django==1.5.5
celery==3.0.19
celery-with-mongodb==3.0
django-celery==3.0.17
kombu==2.5.10
INSTALLED_APPS = (
...
'djcelery',
'celery',
...
)
BROKER_URL = MONGO_CONN_STR
CELERY_RESULT_BACKEND = "mongodb"
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question