Django
Maxim, 2017-12-26 18:53:54

How to collect the results of all tasks launched from one function in order to respond to the user?

Greetings,
the task is to wait for all the tasks to finish and then respond to the client. Here is a code example for a more concrete picture:

@task(base=NotifierTask)
def first(*args):
    # runs for some time
    return json_resp

@task(base=NotifierTask)
def second(*args):
    # runs for some time
    return json_resp

@task(base=NotifierTask)
def third(*args):
    # runs for some time
    return json_resp


# The request comes to this URL
# the number of functions executed may vary depending on the request,
# anywhere from 1 to 40 functions may run
def main(request):
  # there we get some data
  data = request.POST.get('data')

  for some_func in data:
    if some_func == 'first':
      first.delay(args)
    elif some_func == 'second':
      second.delay(args)
    elif some_func == 'third':
      third.delay(args)

# All of this lands on a Celery worker

# Notifier Task
class NotifierTask(Task):
    """
    Tasks that sends notification on completion.
    """
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Take the execution result and push it through a socket to the client side
        pass

The goal is to obtain the results of all the functions so that a response for the user can be formed later.
The purpose of the main function is only to accept a request from the user, quickly distribute tasks among workers and respond without waiting for their completion.
What options can you recommend?
Using a Celery group
When testing, task execution behaved unpredictably:
@task(base=NotifierTask)
def celery_test(id, position):
    time.sleep(id)
    return "Time delay: %s, Position: %s" % (str(id), str(position))


def some_request(request):
    from django.http import JsonResponse
    if request.method == 'GET':

        # tasks_list = []
        # count = 0
        # while count < 10:
        #     count += 1
        #     tasks_list.append(celery_test.s(count))
        #
        # job = group(tasks_list)

        job = group([
            celery_test.s(10, 1),
            celery_test.s(2, 2),
            celery_test.s(2, 3),
            celery_test.s(2, 4),
            celery_test.s(5, 5),
        ])

        job.apply_async()


        return JsonResponse(dict(success='Done'))

Execution result
[screenshot of the execution result]
It looks as if the task with id=10 executed instantly, without any delay, and its value was appended to the others in the results.
What explains this behavior, or am I doing something wrong?
It turns out that the tasks keep the order in which they were specified in the group; this can be seen in Flower and in the worker log.


1 answer(s)
Vladimir, 2017-12-28
@maximkv25

Collect the tasks into a list and feed it all into the chord function:

task_ = []
task_.append(clear_old_project.s(api_key=api_key))
for group_ in list_all_groups:
    task_.append(frequency_start.s(api_key=api_key,
                                   job_request_id=job_request_id,
                                   app_secret=app_secret,
                                   project_id=proj_id,
                                   group_id=group_.id,
                                   providers=providers,
                                   region_key=region_key,
                                   phrase_forms=phrase_forms))

chord(task_)(generate_reports_frequency.s(api_key, proj_id, job_request_id, app_secret, providers, region_key, o_currency))

When all the tasks complete, their results are passed to generate_reports_frequency as a list of the return values of all the functions, collected into one variable (its first parameter).
