Answer the question
In order to leave comments, you need to log in
PHP and AMQP how?
It is necessary to organize a constant reception of messages from the RabbitMQ queue in php.
If you use the php extension (from pear) www.php.net/manual/en/book.amqp.php instead of the client on php github.com/videlalvaro/php-amqplib , then the speed of receiving messages from the queue is at least 2 times faster.
In the examples on off. The RabbitMQ website in Python is called, behind this method the data reception cycle is hidden, which calls a callback at the right time: channel.start_consuming()
For some reason, this is not provided for in the php extension. There is AMQPQueue::consume() - returns the specified number of messages and AMQPQueue::get() - returns 1 message and the number in the queue.
But it is not clear how best to organize the reception cycle in this case.
The first thing that comes to mind is to do something like this:
while(true) {
$message = $q->get(0); // Запрашиваем очередное сообщенеи из очереди
if($message['count'] === -1) {
// Если значение count равно -1, ждем дальше (сообщений в очереди еще нет)
usleep(50000);
} else {
// Вызываем функцию обработчик сообщения
if(rcv_msg($message)) {
// Сообщаем, что сообщение принято, если обработчик вернул TRUE
$q->ack($message['delivery_tag']);
} else {
break;
}
}
}
If you reduce the delay (usleep (...)) or completely eliminate it, then the load on the sys cpu (php activity and the rabbitmq service) becomes, of course, dumb. // Loop as long as the channel has callbacks registered
while(count($ch->callbacks)) {
$ch->wait(); // Ожидание сообщения происходит в недрах этого метода, цикл не выполняется пока их нет.
}
This code does not strain the cpu at all. Answer the question
In order to leave comments, you need to log in
You misunderstood the documentation a bit.
AMQPQueue::consume() is a blocking function, i.e. it will keep the thread running until the message returns.
AMQPQueue::get() is non-blocking, i.e. if there is no task, it will immediately return FALSE.
Those. all you need to do is something like this:
while(true) {
$q->consume("rcv_msg");
}
There was a similar task, it was solved through redis and its blpop operation + timeouts for connections
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question