Answer the question
In order to leave comments, you need to log in
Flask with an application factory: how do I start a Celery worker?
I can't figure out how to start the Celery worker.
The application is started via the CLI.
autoapp.py:
# -*- coding: utf-8 -*-
"""Entry-point module: builds the application instance from the factory."""
from flask.helpers import get_debug_flag

from myapp.app import create_app
from myapp.settings import DevConfig, ProdConfig

# Use DevConfig when Flask's debug flag is set, ProdConfig otherwise.
if get_debug_flag():
    CONFIG = DevConfig
else:
    CONFIG = ProdConfig

app = create_app(CONFIG)
# -*- coding: utf-8 -*-
"""Extensions module. Each extension is initialized in the app factory located in app.py."""
from flask_bcrypt import Bcrypt
from flask_caching import Cache
from flask_debugtoolbar import DebugToolbarExtension
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
# ``CsrfProtect`` is the deprecated spelling; Flask-WTF renamed the class to
# ``CSRFProtect`` and newer releases no longer ship the old alias.
from flask_wtf.csrf import CSRFProtect
from flask_celery import Celery

# Every extension is instantiated without an application here; the app
# factory binds each one later by calling its ``init_app(app)``.
bcrypt = Bcrypt()
csrf_protect = CSRFProtect()
login_manager = LoginManager()
db = SQLAlchemy()
migrate = Migrate()
cache = Cache()
debug_toolbar = DebugToolbarExtension()
celery = Celery()
def create_app(config_object=ProdConfig):
    """Build a configured Flask application (application-factory pattern).

    Background: http://flask.pocoo.org/docs/patterns/appfactories/.

    :param config_object: The configuration object whose settings are loaded
        into ``app.config``.
    :return: The Flask application after every registration step has run.
    """
    # The top-level package name is used as the Flask import name.
    root_package = __name__.split('.')[0]
    app = Flask(root_package)
    app.config.from_object(config_object)

    # Each step receives the app; the sequence below is the original order.
    registration_steps = (
        register_extensions,
        register_blueprints,
        register_errorhandlers,
        register_shellcontext,
        register_commands,
    )
    for register in registration_steps:
        register(app)

    return app
def register_extensions(app):
    """Bind every module-level Flask extension to ``app``.

    :param app: The Flask application passed to each extension's ``init_app``.
    :return: ``None``.
    """
    # These extensions all take just the app; the order is kept as written.
    app_only_extensions = (
        assets,
        bcrypt,
        cache,
        db,
        csrf_protect,
        login_manager,
        debug_toolbar,
    )
    for extension in app_only_extensions:
        extension.init_app(app)

    # Migrate additionally receives the SQLAlchemy handle.
    migrate.init_app(app, db)
    celery.init_app(app)
    return None
celery -A myapp.extensions.celery worker
[2017-02-15 16:34:05,823: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 2.00 seconds...
Answer the question
In order to leave comments, you need to log in
Read slides.skien.cc/flask-hacks-and-best-practices/#17
I moved the Celery initialization into
autoapp.py:
# -*- coding: utf-8 -*-
"""Entry-point module: builds the application instance and wires up Celery."""
from flask.helpers import get_debug_flag

from myapp.app import create_app
from myapp.extensions import celery
from myapp.settings import DevConfig, ProdConfig
from myapp.utils import init_celery

# Use DevConfig when Flask's debug flag is set, ProdConfig otherwise.
if get_debug_flag():
    CONFIG = DevConfig
else:
    CONFIG = ProdConfig

app = create_app(CONFIG)

# Hand the freshly built app and the shared Celery instance to init_celery
# (imported from myapp.utils).
init_celery(app, celery)
def init_celery(app, celery):
    """Configure ``celery`` from ``app`` and make its tasks run in an app context.

    :param app: Object exposing ``config`` (a mapping) and ``app_context()``.
    :param celery: Object exposing ``conf`` (updatable mapping) and a ``Task``
        class attribute, which is replaced by a context-aware subclass.
    :return: ``None``; ``celery`` is modified in place.
    """
    # Copy the application's settings into the Celery configuration.
    celery.conf.update(app.config)

    # Keep a handle on the original task class before it is replaced below.
    original_task = celery.Task

    class ContextTask(original_task):
        """Task base whose invocation is wrapped in ``app.app_context()``."""

        abstract = True

        def __call__(self, *args, **kwargs):
            # Enter the application context, then defer to the original task.
            with app.app_context():
                return original_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
celery worker --app=autoapp.celery --loglevel=debug
Didn't find what you were looking for?
Ask a question
731 491 924 answers to any question