Django
i3a4em, 2022-03-28 05:17:09

How to run an infinite loop in a Django application?

There is a Django application. The application generates tasks through celery beat.
A task is nothing more than a series of requests to third-party service APIs.
- Requests go through a proxy
- Requests can be quite long
- The size of the series is determined by the task type and can reach several dozen requests.

Initially everything ran on Celery, but the impracticality of that approach became fairly obvious: workers slept 90% of their working time (connecting to a proxy and waiting for responses).
Naturally, as the number of tasks grew, I would have had to create an unreasonable number of workers that would simply consume resources.
The choice became obvious: rewrite everything on asyncio.
But as far as I know, Celery and asyncio are not very friendly out of the box, so Celery is out.
(Yes, I have heard of celery-pool-asyncio, but in my opinion it is too much hassle for such a simple task.)

So, I wrote an asynchronous handler that looks like this (the details are not that important, but in case someone is interested):

async def process_response_queue(queue: asyncio.Queue):
    """
    Receives task results and writes the status back to Redis
    """
    while True:
        struct, result = await queue.get()
        SetTaskStatus(struct, result)

class TaskManager:
    """
    Task management
    """
    def __init__(self):
        self.queue = asyncio.Queue()
        self.tasks: Dict[str, asyncio.Task] = {}

    async def Handler(self, **kwargs):
        """
        Dispatches the task for execution. Removes it on completion or error.
        Puts the result into queue
        """
        struct = kwargs.get('struct')

        try:
            # ApiClient just sends the requests
            await self.queue.put((struct, await ApiClient(struct)))

        finally:
            # runs on success and on error alike, so "finished" is more
            # accurate than "canceled" here
            del self.tasks[struct.get('key')]
            print(f"Task for {struct.get('key')} finished")
        
    def start_processor(self):

        self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))

    def start_new_task(self, **kwargs):
        """
        Creates a new task
        """
        task_id = kwargs.get('struct').get('key')
        self.tasks[task_id] = asyncio.create_task(self.Handler(**kwargs))
        
async def mainloop():

    default.REDIS_CLIENT.flushdb()
    task_manager = TaskManager()
    task_manager.start_processor()


    while True:
        for key in default.REDIS_CLIENT.scan_iter("task:*"):
            struct = json.loads(default.REDIS_CLIENT.get(key).decode())
            struct['key'] = key.decode()  # .get() is a call, it cannot be assigned to

            if struct.get("status") == "created":
                account = await sync_to_async(Accounts.objects.get)(id=struct["account_id"])
                project = await sync_to_async((lambda x: x.cluster.project))(account)
                task_manager.start_new_task(account=account, project=project, struct=struct)
                struct["status"] = "pending"
                default.REDIS_CLIENT.set(key, json.dumps(struct))

        # yield to the event loop, otherwise the created tasks never run
        await asyncio.sleep(1)


mainloop and the ApiClient function interact with the Django ORM and auxiliary modules. I would not want to move everything outside the application, because it is important to keep the ability to work with models directly. (By the way, maybe there is a way?)

It starts like this:
def LoopStart():
    asyncio.run(mainloop())

Thread(target=LoopStart, daemon=True).start()
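One concrete problem with this startup is that a daemon thread is killed abruptly when the interpreter exits, so in-flight tasks never get a chance to finish. A minimal sketch of a stoppable variant (the names stop_event, stoppable_loop and worker are illustrative, not from the code above):

```python
import asyncio
import threading

stop_event = threading.Event()

async def stoppable_loop():
    # Poll for work until the main thread asks us to stop.
    while not stop_event.is_set():
        # ... scan Redis and dispatch tasks here ...
        await asyncio.sleep(0.1)  # yield control instead of busy-looping

def loop_start():
    asyncio.run(stoppable_loop())

worker = threading.Thread(target=loop_start)
worker.start()
# ... on shutdown, signal the loop and wait for it to exit:
stop_event.set()
worker.join()
```

The daemon=True flag then becomes unnecessary, since the thread exits on its own once the event is set.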


There is an opinion that asyncio + threading is a bad combination.

What problems might arise with such an implementation?
What should I do if this is really a bad decision?



2 answers
Sergey Gornostaev, 2022-03-28
@i3a4em

I would stop using Django models, since they are not designed for use in a concurrent environment, and instead write a separate asynchronous microservice connected to the Django project through a couple of queues.
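That decoupling can be as simple as two Redis lists: the Django side pushes job descriptions onto one, the async service consumes them and pushes results onto the other. A rough sketch, assuming the redis-py client and illustrative key and function names:

```python
import json

try:
    import redis  # redis-py client; assumed installed (pip install redis)
except ImportError:
    redis = None  # the (de)serialization helpers below still work without it

JOBS_KEY = "jobs:pending"

def encode_job(job: dict) -> bytes:
    # JSON keeps the queue payloads language-agnostic and easy to inspect.
    return json.dumps(job).encode()

def decode_job(raw: bytes) -> dict:
    return json.loads(raw.decode())

def enqueue_job(r, job: dict) -> None:
    # Django side: no asyncio involved, just a push onto a list.
    r.rpush(JOBS_KEY, encode_job(job))

def pop_job(r, timeout: int = 5):
    # Microservice side: blocking pop; returns None on timeout.
    item = r.blpop(JOBS_KEY, timeout=timeout)
    return decode_job(item[1]) if item else None
```

The microservice then owns all the asyncio machinery, and the Django project never touches an event loop.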

Dr. Bacon, 2022-03-28
@bacon

Initially, everything worked on celery, but the impracticality of this approach is quite obvious. Workers slept 90% of the working time (connecting to a proxy and waiting for responses).
Naturally, with an increase in the number of tasks, I would have to create an unreasonable number of workers that would simply consume resources.
The conclusion is wrong; this is a normal solution, unless waiting for responses was itself the problem.
I would not want to take everything outside the application, as it is important to leave the ability to work directly with models
One option: run the event loop in a custom django-admin command, and run the command itself via something like systemd.
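A management command keeps full ORM access because it runs inside the Django project. A minimal sketch, assuming the file lives at yourapp/management/commands/runloop.py and mainloop is the coroutine from the question (the fallback stub and placeholder only keep the sketch runnable outside a Django project):

```python
import asyncio

try:
    from django.core.management.base import BaseCommand
except ImportError:
    class BaseCommand:  # stub so the sketch runs outside a Django project
        help = ""

async def mainloop():
    """Placeholder; use the real mainloop from the question."""

class Command(BaseCommand):
    help = "Run the asyncio task loop inside the Django process"

    def handle(self, *args, **options):
        # asyncio.run() blocks until the loop finishes, which is exactly
        # what a long-running systemd service wants.
        asyncio.run(mainloop())
```

It is then started with python manage.py runloop, and a systemd unit with Restart=on-failure keeps it alive.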
