Python
LakeForest, 2021-09-08 15:35:42

I'm running a Tornado websocket server and a RabbitMQ queue listener (pika) in the same container. Why does the listener stop receiving messages after one or two requests?

import os
from models.ws_handler import WsHandler
from consumer import Consumer
import tornado.ioloop
import tornado.web

if __name__ == '__main__':
    consumer = Consumer()
    consumer.start_start_consuming_thread()

    application = tornado.web.Application(handlers=[
        (r'/ws', WsHandler)
    ], default_host="0.0.0.0")

    port = int(os.environ["PORT"])
    application.listen(port=port)
    tornado.ioloop.IOLoop.instance().start()


consumer.py
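import asyncio
import json
import logging
import threading

# Connector and CLIENTS are defined elsewhere in the project (not shown).


class Consumer: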

    def __init__(self):
        self.connector = Connector()

    def start_start_consuming_thread(self):
        threading.Thread(target=self.start_consuming).start()

    def start_consuming(self):
        queue = 'qwerty'
        self.connector.connect()
        self.connector.setup_queue(queue)
        self.connector.channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=self.callback)
        self.connector.channel.start_consuming()

    def stop(self):
        self.connector.disconnect()

    def callback(self, ch, method, properties, body):
        body_arr = json.loads(body)
        client_id = body_arr.get("client_id")
        logging.info(f" [x] Received {str(client_id)}")
        for client in CLIENTS:
            if client.client_id == client_id:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                future = asyncio.ensure_future(client.write_message(body))
                loop.run_until_complete(future)
                break


There is nothing unusual in the logs. The listener simply stops consuming, with no error message.

INFO:root:User connects.
ws_server      | INFO:pika.adapters.utils.connection_workflow:Pika version 1.2.0 connecting to ('172.30.0.2', 5672)
ws_server      | INFO:pika.adapters.utils.io_services_utils:Socket connected: <socket.socket fd=12, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.30.0.6', 36408), raddr=('172.30.0.2', 5672)>
ws_server      | INFO:pika.adapters.utils.connection_workflow:Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>).
ws_server      | INFO:pika.adapters.utils.connection_workflow:AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
ws_server      | INFO:pika.adapters.utils.connection_workflow:AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
ws_server      | INFO:pika.adapters.blocking_connection:Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
ws_server      | INFO:pika.adapters.blocking_connection:Created channel=1
ws_server      | INFO:tornado.access:101 GET /ws (172.30.0.7) 0.81ms
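
A likely cause, though the logs above cannot confirm it: the callback runs on the consumer thread, creates a new asyncio event loop for every message, and calls client.write_message from that thread. Tornado handlers are not thread-safe, so the write should be handed to the thread that runs the IOLoop. In Tornado the documented thread-safe entry point is IOLoop.add_callback; the sketch below shows the same hand-off pattern with the standard-library equivalent, loop.call_soon_threadsafe, so it runs without Tornado or RabbitMQ. FakeClient, make_callback, main and run_demo are hypothetical names used only for illustration.

```python
import asyncio
import json
import threading


class FakeClient:
    """Hypothetical stand-in for WsHandler: records writes instead of sending."""

    def __init__(self, client_id):
        self.client_id = client_id
        self.sent = []
        self.thread_ids = []

    def write_message(self, body):
        # Record which thread performed the write.
        self.thread_ids.append(threading.get_ident())
        self.sent.append(body)


def make_callback(loop, clients):
    """Build a pika-style callback that schedules the write on `loop`'s thread."""

    def callback(ch, method, properties, body):
        client_id = json.loads(body).get("client_id")
        for client in clients:
            if client.client_id == client_id:
                # Only schedule here; the write itself runs on the loop thread.
                loop.call_soon_threadsafe(client.write_message, body)
                break

    return callback


async def main(clients):
    loop = asyncio.get_running_loop()
    callback = make_callback(loop, clients)
    bodies = [json.dumps({"client_id": "a", "n": i}) for i in range(3)]

    # Simulate the consumer thread delivering three messages.
    worker = threading.Thread(
        target=lambda: [callback(None, None, None, b) for b in bodies]
    )
    worker.start()
    await loop.run_in_executor(None, worker.join)
    await asyncio.sleep(0)  # give any remaining scheduled writes a turn
    return threading.get_ident()


def run_demo():
    clients = [FakeClient("a"), FakeClient("b")]
    loop_thread_id = asyncio.run(main(clients))
    return clients, loop_thread_id
```

Applied to the code in the question, this would mean capturing tornado.ioloop.IOLoop.current() on the main thread before the consumer thread starts, passing it to Consumer, and replacing the new_event_loop / run_until_complete lines with io_loop.add_callback(client.write_message, body). Wrapping the callback body in try/except with logging.exception may also help, since an exception raised inside the callback can end the consumer thread without anything appearing in the INFO-level logs.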
