How to run an infinite loop in a django application?
There is a Django application. The application generates tasks through celery beat.
A task is nothing more than a series of requests to the APIs of third-party services:
- Requests go through a proxy
- Requests can take quite a long time
- The size of a series depends on the task type and can reach several dozen requests
Initially everything ran on celery, but the impracticality of that approach became quite obvious: workers slept 90% of their working time (connecting to the proxy and waiting for responses).
Naturally, as the number of tasks grew, I would have to create an unreasonable number of workers that would simply consume resources.
The choice became obvious: rewrite everything with asyncio.
But, as far as I know, celery and asyncio don't get along out of the box, so celery is out.
(Yes, I've heard about celery-pool-asyncio, but in my opinion it is too much hassle for such a simple task.)
So I wrote an asynchronous handler that looks like this (the details aren't all that important, but in case someone is interested):
import asyncio
import json
from threading import Thread
from typing import Dict

from asgiref.sync import sync_to_async

# SetTaskStatus, ApiClient, default and Accounts are project-level imports


async def process_response_queue(queue: asyncio.Queue):
    """
    Receives task results and rewrites the status in redis
    """
    while True:
        struct, result = await queue.get()
        SetTaskStatus(struct, result)  # note: a blocking call inside the event loop


class TaskManager:
    """
    Task management
    """

    def __init__(self):
        self.queue = asyncio.Queue()
        self.tasks: Dict[str, asyncio.Task] = {}

    async def Handler(self, **kwargs):
        """
        Hands a task off for execution; removes it on error; puts the result into queue
        """
        struct = kwargs.get('struct')
        try:
            # ApiClient simply sends the requests
            await self.queue.put((struct, await ApiClient(struct)))
        finally:
            # runs on success as well as on error
            del self.tasks[struct.get('key')]
            print(f"Task for {struct.get('key')} removed")

    def start_processor(self):
        self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))

    def start_new_task(self, **kwargs):
        """
        Creates a new task
        """
        task_id = kwargs.get('struct').get('key')  # was `task_ig`
        self.tasks[task_id] = asyncio.create_task(self.Handler(**kwargs))


async def mainloop():
    default.REDIS_CLIENT.flushdb()
    task_manager = TaskManager()
    task_manager.start_processor()
    while True:
        # note: these redis calls are synchronous and block the event loop
        for key in default.REDIS_CLIENT.scan_iter("task:*"):
            struct = json.loads(default.REDIS_CLIENT.get(key).decode())
            struct['key'] = key.decode()  # was `struct.get('key') = ...`, a SyntaxError
            if struct.get("status") == "created":
                account = await sync_to_async(Accounts.objects.get)(id=struct["account_id"])
                project = await sync_to_async(lambda x: x.cluster.project)(account)
                task_manager.start_new_task(account=account, project=project, struct=struct)
                struct["status"] = "pending"
                default.REDIS_CLIENT.set(key, json.dumps(struct))
        # yield control so the Handler and processor tasks actually get to run
        await asyncio.sleep(1)


def LoopStart():
    asyncio.run(mainloop())


Thread(target=LoopStart, daemon=True).start()
Answer:
I would stop using Django models, since they are not intended for use in a concurrent environment, and instead write a separate asynchronous microservice connected to the Django project through a couple of queues.
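A minimal sketch of that split, assuming two Redis lists serve as the queues; the key names, the worker count, and call_third_party_api are illustrative assumptions, not the author's code. Django only LPUSHes JSON jobs onto "tasks:in" and reads results from "tasks:out"; this standalone process never imports Django or its models:

import asyncio
import json

import redis.asyncio as redis  # redis-py >= 4.2


async def call_third_party_api(job: dict) -> dict:
    # placeholder for the real series of proxied API requests
    await asyncio.sleep(0.1)
    return {"status": "done"}


async def worker(r: redis.Redis):
    while True:
        # BLPOP blocks until a job arrives, so the loop never busy-waits
        _, raw = await r.blpop("tasks:in")
        job = json.loads(raw)
        result = await call_third_party_api(job)
        await r.rpush("tasks:out", json.dumps({"key": job["key"], "result": result}))


async def main():
    r = redis.Redis()
    # several workers share one connection pool and one event loop
    await asyncio.gather(*(worker(r) for _ in range(20)))


if __name__ == "__main__":
    asyncio.run(main())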
"Initially everything ran on celery, but the impracticality of that approach became quite obvious: workers slept 90% of their working time (connecting to the proxy and waiting for responses). Naturally, as the number of tasks grew, I would have to create an unreasonable number of workers that would simply consume resources."
The conclusion is wrong; this is a perfectly normal solution, unless the waiting for responses was itself a problem.
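For reference, a minimal sketch of that "normal solution" tuned for I/O-bound work: Celery's eventlet/gevent pools run many green-thread workers inside one process, so sleeping on responses costs almost nothing. The app name and broker URL are assumptions:

from celery import Celery

app = Celery("proj", broker="redis://localhost:6379/0")

# One worker process can then hold hundreds of concurrent green threads
# instead of hundreds of prefork processes:
app.conf.worker_pool = "gevent"    # or "eventlet"; requires `pip install gevent`
app.conf.worker_concurrency = 200  # concurrent tasks per worker process

# started as usual:
#   celery -A proj worker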
"I would not want to take everything outside the application, as it is important to keep the ability to work directly with the models."
An option: run the event loop in a custom django-admin command, and run the command itself through systemd, as sketched below.
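A minimal sketch of such a command, assuming it lives at yourapp/management/commands/runtasks.py and that mainloop is importable from a yourapp.tasks module (both paths are assumptions):

import asyncio

from django.core.management.base import BaseCommand

from yourapp.tasks import mainloop  # hypothetical module holding the mainloop above


class Command(BaseCommand):
    help = "Run the asyncio task loop in the foreground"

    def handle(self, *args, **options):
        # By the time handle() runs, Django is fully configured, so the ORM
        # is usable (through sync_to_async) inside the loop.
        asyncio.run(mainloop())

The systemd unit then only needs something like ExecStart=/path/to/venv/bin/python manage.py runtasks plus Restart=always, so the loop runs outside the web workers while staying inside the Django project.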