M
M
Mr_Sinister2016-10-11 22:33:47
Python
Mr_Sinister, 2016-10-11 22:33:47

How to implement RabbitMQ (Pika) multi-threaded message processing?

There is a task: to process N messages in parallel by one consumer. Running N processes is not an option (the database crashes due to a large number of connections). In this case, each process after completing the processing of the next message must send basic_ack to the broker. How to implement it?
Database: MongoDB (using MongoEngine)
I establish a connection to the queue like this:

def create_connect_rabbitmq():
    return pika.BlockingConnection(pika.ConnectionParameters(
        host=app_config['RABBITMQ_HOST'],
        port=app_config['RABBITMQ_PORT'],
        virtual_host=app_config['RABBITMQ_VIRTUAL_HOST'],
        credentials=pika.PlainCredentials(
            username=app_config['RABBITMQ_USERNAME'],
            password=app_config['RABBITMQ_PASSWORD']
        )
    ))

def create_queues(channel):
    channel.queue_declare(
        queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
        durable=True
    )

connection = create_connect_rabbitmq()
channel = connection.channel()
channel.basic_qos(prefetch_count=app_config['RABBITMQ_PREFETCH_COUNT'])

database = create_connect_database()

create_queues(channel)
print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_consume(
    consumer_callback=profile_handler,
    queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
    no_ack=False
)
channel.start_consuming()

Answer the question

In order to leave comments, you need to log in

1 answer(s)
A
Alexey Cheremisin, 2016-10-12
@leahch

As I understand it, connections to mongo end. I suspect that you need to use the connection pool. To do this, you need to set reasonable limits on the number of connections, as described here - api.mongodb.com/python/current/faq.html#how-does-c...
But I could be wrong.
And yes, we are not launching a fork, but a thread.

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question