PHP
acerrusm, 2018-03-13 07:00:41

How to make Swoole socket server work in conjunction with RabbitMQ?

Hello. Having played enough with RabbitMQ and Swoole separately, I decided to combine them. The task, in short:
1) A PHP application sends a message to the RabbitMQ worker.
2) The RabbitMQ worker receives the message and broadcasts it to all open socket connections.
To begin with, I created a Swoole socket server that listens for connections (swoole_server.php). I run it in one terminal window:

$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
    echo "connection close: {$fd}\n";
});

$server->start();
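
The server above only replies to the client that sent a frame, while the task calls for a broadcast to every open connection. A minimal sketch of such a broadcast follows. It assumes the `connections` iterator and the `isEstablished()` and `push()` methods of `Swoole\WebSocket\Server`; the `broadcast()` helper name is hypothetical and not part of Swoole.

```php
<?php
// Hypothetical helper (not part of the original post or of Swoole):
// push one payload to every client that has completed the WebSocket handshake.
// $server is expected to be a \Swoole\WebSocket\Server, but any object that
// exposes `connections`, isEstablished() and push() will do.
function broadcast($server, string $payload): int
{
    $delivered = 0;
    foreach ($server->connections as $fd) {
        // Skip sockets that are connected but have not finished the handshake.
        if ($server->isEstablished($fd) && $server->push($fd, $payload)) {
            $delivered++;
        }
    }
    return $delivered; // number of clients the payload was pushed to
}

// Possible use inside the 'message' handler of the server:
// $server->on('message', function ($server, $frame) {
//     broadcast($server, $frame->data);
// });
```

With this in place, whatever the worker sends to the server as a single frame would be relayed to all connected clients, including the worker's own connection if it is still open.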

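Next, in another terminal window, I start the RabbitMQ worker, which waits for messages (worker.php):
// Assumes php-amqplib was installed with Composer; adjust the autoloader path to your setup.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');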
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


    // Here I try to connect to the socket server and send the message
    $cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

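Then I create a task script that sends a message to the RabbitMQ worker (task.php):
// Assumes php-amqplib was installed with Composer; adjust the autoloader path to your setup.
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');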
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

And finally I start the task:
php task.php
As a result, the task successfully reaches the worker, which successfully runs the callback registered here:
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

But nothing reaches the Swoole socket server. I don't even see the "connection open" message in its terminal. However, if I then stop the worker in its terminal, the following handler fires in the window where the socket server is running:
$server->on('close', function($server, $fd)
How is this even possible? I've been trying to figure it out for 2 days now.


1 answer(s)
acerrusm, 2018-03-19
@acerrusm

There is news about Swoole + RabbitMQ. I got at least some explanation from the developer HERE. But the example code he posted doesn't work for me. I tried it first on a virtual machine, then on a plain Ubuntu server, but in vain. The picture is more or less clear now, though.
I wrote to the developer about the problem, but he has not replied yet. So I want to ask anyone who is interested to try running Swoole together with RabbitMQ. Maybe I am simply doing something wrong and the example code actually works. If you run into the same problem, be sure to write about it on GitHub.
Step 1:
Install Swoole
If you are going to install Swoole, install it using the command:
I do not recommend installing by compiling from source, or you will get tired of cleaning it up later: no uninstaller is included, and the "make uninstall" command will not work.
You need version 2.1.1, because earlier versions do not have coroutines.
Step 2:
Install the IDE helper to make it easier to work with the methods and classes.
Step 3:
Install phpAMQP. This is the repository that Swoole forked and extended with Swoole support. That is, the command
will install only phpAMQP WITHOUT Swoole support, and you will need to add 2 files manually:
1) php-amqplib/PhpAmqpLib/Connection/AMQPSwooleConnection.php
2) php-amqplib/PhpAmqpLib/Wire/IO/SwooleIO.php
Step 4:
Install RabbitMQ
Good Luck!
UPDATE: StackOverflow helped with the solution: https://stackoverflow.com/questions/49226659/swool...
The problem was that in worker.php I used an instance of the swoole_http_client class, which, as it turned out, works asynchronously.
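
Why this matters can be pictured with a toy model. It is not Swoole's actual event loop, and the `ToyLoop` class is invented purely for illustration: an asynchronous call only registers work, and that work runs once an event loop gets control. The sketch assumes that the blocking `while` loop around `$channel->wait()` in worker.php never hands control to such a loop, so the client's connect and push stay pending.

```php
<?php
// Toy model only: this is NOT Swoole's reactor, just an illustration.
final class ToyLoop
{
    /** @var callable[] */
    private $pending = [];

    // An "asynchronous" call only registers work; nothing runs yet.
    public function defer(callable $task): void
    {
        $this->pending[] = $task;
    }

    // Deferred work runs only when the loop itself gets control.
    public function run(): void
    {
        while ($task = array_shift($this->pending)) {
            $task();
        }
    }
}

$loop = new ToyLoop();
$log  = [];

// Stand-in for the blocking consumer loop in worker.php: each "message"
// schedules an asynchronous send, but the loop is never given control here.
foreach (['first', 'second'] as $body) {
    $log[] = "received {$body}";
    $loop->defer(function () use (&$log, $body) {
        $log[] = "sent {$body}";
    });
}

// Only after the consumer loop has ended do the deferred sends happen.
$loop->run();

echo implode("\n", $log), "\n";
```

In this model both sends happen only after the consumer loop has finished, which would be consistent with nothing reaching the socket server while the worker is still running.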
For synchronous operation, you will need a WebSocketClient class, which can be found here
Next, replace

$cli = new \swoole_http_client('0.0.0.0', 2345);

$cli->on('message', function ($_cli, $frame) {
    var_dump($frame);
});

$cli->upgrade('/', function($cli)
{
    $cli->push('This is the message to send to Swoole server');
    $cli->close();
});

with
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();
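
For context, here is a rough sketch of how a synchronous send could sit inside the worker callback. `makeWorkerCallback()` is a hypothetical name, not part of php-amqplib or Swoole, and the send step is passed in as a plain callable so that the sketch does not depend on any particular client class. Acknowledging only after the send has finished is a choice made in this sketch; the original worker.php acknowledges first.

```php
<?php
// Sketch: build the consumer callback around any synchronous "send" callable.
// makeWorkerCallback() is a hypothetical helper introduced for illustration.
function makeWorkerCallback(callable $sendToSocketServer): callable
{
    return function ($msg) use ($sendToSocketServer) {
        echo " [x] Received ", $msg->body, "\n";

        // Forward the message body first; this call is expected to block
        // until the socket server has received it.
        $sendToSocketServer($msg->body);

        // Acknowledge only after the forward has completed.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        echo " [x] Done", "\n";
    };
}

// Possible wiring in worker.php, assuming the WebSocketClient class mentioned above:
// $callback = makeWorkerCallback(function (string $body) {
//     $client = new WebSocketClient('0.0.0.0', 2345);
//     $client->connect();
//     $client->send($body);
//     $client->close();
// });
// $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
```

If the send throws, the message is never acknowledged in this sketch, so RabbitMQ can redeliver it once the channel closes.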

That's all.
