Python
Pavel, 2017-09-07 15:54:28

Flask + Celery: how do I make a task a singleton?

I want to make the task a singleton.
utils/celery.py

def init_celery(app, celery):
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

tasks.py
class CelerySingleton(celery.Task, Singleton):
    pass

@celery.task(base=CelerySingleton)
def calc():
    # accessing SQLAlchemy here raises:
    # application not registered on db instance and no application bound to current context
    ...

I understand that CelerySingleton in tasks.py inherits from celery.Task, which is not ContextTask, so calc() runs without the application context. What is the right way to do this?
