Answer the question
In order to leave comments, you need to log in
How to implement RabbitMQ (Pika) multi-threaded message processing?
There is a task: to process N messages in parallel by one consumer. Running N processes is not an option (the database crashes due to a large number of connections). In this case, each process after completing the processing of the next message must send basic_ack to the broker. How to implement it?
Database: MongoDB (using MongoEngine)
I establish a connection to the queue like this:
def create_connect_rabbitmq():
return pika.BlockingConnection(pika.ConnectionParameters(
host=app_config['RABBITMQ_HOST'],
port=app_config['RABBITMQ_PORT'],
virtual_host=app_config['RABBITMQ_VIRTUAL_HOST'],
credentials=pika.PlainCredentials(
username=app_config['RABBITMQ_USERNAME'],
password=app_config['RABBITMQ_PASSWORD']
)
))
def create_queues(channel):
channel.queue_declare(
queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
durable=True
)
connection = create_connect_rabbitmq()
channel = connection.channel()
channel.basic_qos(prefetch_count=app_config['RABBITMQ_PREFETCH_COUNT'])
database = create_connect_database()
create_queues(channel)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.basic_consume(
consumer_callback=profile_handler,
queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
no_ack=False
)
channel.start_consuming()
Answer the question
In order to leave comments, you need to log in
As I understand it, connections to mongo end. I suspect that you need to use the connection pool. To do this, you need to set reasonable limits on the number of connections, as described here - api.mongodb.com/python/current/faq.html#how-does-c...
But I could be wrong.
And yes, we are not launching a fork, but a thread.
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question