Flask
Vladimir Moroz, 2020-01-10 12:17:43

How do you set up 2 independent queues for processing tasks while running the service on a single port?

How do you separate the queue and worker settings so that short tasks do not get stuck behind long ones?
General description
I have a RESTful backend written in Flask with 3 endpoints. They are served asynchronously on a single port.
Two of them finish quickly; one runs for a long time. I need to separate them so that a worker is always free to pick up a short task as soon as it arrives.
I could run on two ports, but the external server sends all requests to one port.
Right now there are two independent queues, but the tasks in them are executed in one common order, i.e. short tasks wait their turn behind long ones; as I understand it, the queues share the same workers.
Thank you in advance.
A reverse proxy is configured through Nginx.
Server and message queue settings:

from flask import Flask
from flask_restful import Api
from flask_sqlalchemy import SQLAlchemy
from flask_rest.celery_pack import make_celery
from flask_migrate import Migrate
from kombu import Queue, Exchange

app = Flask(__name__)

app.config.from_object('config')
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///mapping_status.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379',
    # two independent queues: one for the long-running task, one for the short ones
    CELERY_QUEUES=(
        Queue('long', Exchange('long'), routing_key='long'),
        Queue('short', Exchange('short'), routing_key='short'),
    ),
    # route each task to its queue by fully qualified task name
    CELERY_ROUTES={
        'flask_rest.flask_rest_release1.map_this': {'queue': 'long'},
        'flask_rest.flask_rest_release1.map_annul_dd': {'queue': 'short'},
        'flask_rest.flask_rest_release1.map_fns_dd': {'queue': 'short'},
    },
)

api = Api(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
celery = make_celery(app)

from flask_rest.flask_rest_release1 import long_task, short_task_1, short_task_2

api.add_resource(long_task, '/long_task')
api.add_resource(short_task_1, '/short_task_1')
api.add_resource(short_task_2, '/short_task_2')
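For context, the make_celery imported above is not shown in the question; here is a minimal sketch of what such a factory typically looks like, following the common Flask/Celery integration pattern (the actual flask_rest.celery_pack may differ):

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND'],
    )
    # pick up CELERY_QUEUES, CELERY_ROUTES, etc. from the Flask config
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # run every task inside the Flask application context
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery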

uWSGI settings:
[uwsgi]
master = true
http-socket = 0.0.0.0:8000
chdir = /root/to/chdir
; -Q takes a comma-separated list without spaces: "long,short", not "long, short"
smart-attach-daemon = celery_worker/all_queue.pid celery -A flask_rest.celery worker -Q long,short -E -n program_with_tasks --pidfile celery_worker/all_queue.pid
touch-reload = uwsgi_dev_1.ini
; note: 'workers' and 'processes' are aliases in uWSGI, so these two lines conflict
workers = 1
processes = 2
chmod-socket = 777
wsgi-file = run.py
callable = app
; the options below are Celery settings, not uWSGI options, and are ignored here;
; they belong in the Celery configuration (see the sketch after this file)
task_acks_late = True
task_track_started = True
; daemonize = uwsgi_dev_1.log
CELERYD_MAX_TASKS_PER_CHILD = 1
CELERY_CREATE_MISSING_QUEUES = True
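Aside: the four Celery options at the end of the ini above have no effect in a uWSGI file. A minimal sketch of where they would go instead, assuming the celery instance created by make_celery(app) above (new-style setting names shown):

# 'celery' is the instance created by make_celery(app) in the Flask setup above
celery.conf.update(
    task_acks_late=True,              # acknowledge a task only after it finishes
    task_track_started=True,          # report a STARTED state while a task runs
    worker_max_tasks_per_child=1,     # new-style name for CELERYD_MAX_TASKS_PER_CHILD
    task_create_missing_queues=True,  # new-style name for CELERY_CREATE_MISSING_QUEUES
)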


1 answer
Vladimir Moroz, 2020-01-10
@Vladimyr1991

I read a bit of the documentation, experimented by trial and error, and solved the problem. It turns out that within one .ini settings file you can launch celery twice, one smart-attach-daemon line per worker.
Per-worker settings (queue, concurrency, name) can be given right on each celery launch line.

[uwsgi]
master = true
http-socket = 0.0.0.0:8000
chdir = /app/mapping
; one dedicated worker per queue: worker_long serves only 'long', worker_short only 'short'
smart-attach-daemon = celery_worker/long.pid celery -A flask_rest.celery worker -Q long -E -n worker_long --concurrency=3 --pidfile celery_worker/long.pid
smart-attach-daemon = celery_worker/short.pid celery -A flask_rest.celery worker -Q short -E -n worker_short --concurrency=1 --pidfile celery_worker/short.pid
touch-reload = uwsgi_prod.ini
workers = 4
processes = 2
chmod-socket = 777
wsgi-file = run.py
callable = app
; as in the question, the Celery settings below are ignored by uWSGI
task_acks_late = True
task_track_started = True
daemonize = uwsgi_prod.log

CELERYD_MAX_TASKS_PER_CHILD = 1
CELERY_CREATE_MISSING_QUEUES = True
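For completeness, a hedged sketch of how a resource would hand work to these workers; map_this is the task routed to 'long' in CELERY_ROUTES above, and the payload is purely illustrative:

from flask_rest.flask_rest_release1 import map_this

payload = {"document_id": 42}  # illustrative payload, not from the question

# CELERY_ROUTES already sends map_this to the 'long' queue (served by worker_long):
result = map_this.delay(payload)

# the queue can also be forced explicitly per call:
result = map_this.apply_async(args=[payload], queue='long')
print(result.id)  # task id, e.g. to poll for status later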

something like this...
