Answer the question
In order to leave comments, you need to log in
How to properly use pyzmq ioloop with multithreading?
First, I'll explain what I want to achieve with the code I've written.
Application logic must be decoupled from the messaging system.
On top of the message passing system, there should be an abstract wrapper that eliminates duplicate code.
Two different messaging systems, such as RabbitMQ and ZeroMQ, should be able to coexist in one process.
Here is what I have so far:
class BaseZmqNode(object):
    """Base class for ZeroMQ endpoints that own a single socket.

    Subclasses provide the concrete socket via ``create_socket`` and the
    messaging-node object via ``create_node``; this class wires the socket
    to ``host``/``port`` and exposes thin send/close helpers.
    """
    __metaclass__ = ABCMeta

    def __init__(self, host, port, bind, hwm):
        """Create the node and socket, apply the HWM, then bind or connect.

        :param host: transport + host part of the address, e.g. ``tcp://*``
        :param port: port number appended to ``host``
        :param bind: truthy to ``bind`` the socket, falsy to ``connect`` it
        :param hwm: high-water mark passed to ``set_hwm``
        """
        self.node = self.create_node()
        self.node.host = host
        # NOTE(review): host is stored on the node but port on self --
        # confirm whether ``self.node.port`` was intended.
        self.port = port
        # ``instance`` is a classmethod returning the shared context; calling
        # it on ``zmq.Context()`` would first build a throwaway context.
        self.context = zmq.Context.instance()
        self.socket = self.create_socket()
        # High-water marks only apply to binds/connects made afterwards, so
        # they have to be configured before the socket is attached.
        self.set_hwm(hwm)
        address = self.build_address(host, port)
        if bind:
            self.socket.bind(address)
        else:
            self.socket.connect(address)

    def set_hwm(self, hwm):
        """Forward the high-water mark to the underlying socket."""
        self.socket.set_hwm(hwm)

    def send_multipart(self, message):
        """Send ``message`` through the socket's ``send_multipart``."""
        self.socket.send_multipart(message)

    def send_json(self, json):
        """Send ``json`` through the socket's ``send_json``."""
        self.socket.send_json(json)

    @abstractmethod
    def create_node(self):
        """Return the messaging-node object stored in ``self.node``."""
        return BaseMessagingNode()

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    @staticmethod
    def build_address(host, port):
        """Return ``host`` and ``port`` joined as ``'<host>:<port>'``."""
        return '{0}:{1}'.format(host, port)

    @abstractmethod
    def create_socket(self):
        """Return the socket this node will bind or connect."""
        pass
class BaseZmqReceiver(BaseZmqNode):
    """ZeroMQ node that receives messages on a background event-loop thread.

    Incoming messages are delivered through a ``ZMQStream`` to the callback
    stored on ``self.node.on_message_callback``.
    """
    __metaclass__ = ABCMeta

    def __init__(self, host, port, hwm, bind, on_receive_callback):
        """Set up the socket, attach a stream to it and start the loop thread.

        :param host: transport + host part of the address
        :param port: port number
        :param hwm: high-water mark for the socket
        :param bind: truthy to bind, falsy to connect
        :param on_receive_callback: callable invoked with each received message
        """
        super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
        self.node.on_message_callback = on_receive_callback
        self.runner = ZmqLoopRunner(self.on_close)
        # Register the stream on exactly the loop the runner thread starts,
        # instead of relying on whatever loop the stream picks by default;
        # otherwise the two can silently diverge and no message is delivered.
        self.stream = ZMQStream(self.socket, io_loop=self.runner.loop)
        self.stream.on_recv(self.on_message_received)
        self.runner.start()

    def on_message_received(self, message):
        """Pass ``message`` to the node's callback and return its result."""
        return self.node.on_message_callback(message)

    def create_node(self):
        """Return the receiving node; its callback is filled in by ``__init__``."""
        return ReceivingNode(None, None)

    def on_close(self):
        """Release the stream and the socket (invoked by the runner)."""
        self.stream.close()
        self.socket.close()

    def close(self):
        """Ask the runner to stop its loop.

        The socket is deliberately not closed here: the runner calls
        ``on_close`` after its loop has returned, so the stream and socket
        are released by the thread that was using them.
        """
        self.runner.stop()
class ZmqLoopRunner(Thread):
    """Thread that runs an IOLoop and invokes a callback once it returns."""

    def __init__(self, callback, loop=None):
        """Remember the loop to drive and the callback to run afterwards.

        :param callback: zero-argument callable invoked after the loop stops
        :param loop: loop to run; defaults to ``IOLoop.current()`` so existing
            callers behave as before, while new callers can supply a
            dedicated loop per runner
        """
        super(ZmqLoopRunner, self).__init__()
        self.loop = IOLoop.current() if loop is None else loop
        self.callback = callback

    def run(self):
        """Run the loop until it is stopped, then invoke the callback."""
        self.loop.start()
        self.callback()

    def stop(self):
        """Request the loop to stop; safe to call from any thread.

        ``add_callback`` is the only IOLoop method documented as thread-safe,
        so the actual ``stop`` is scheduled to execute on the loop itself
        rather than being called directly from the caller's thread.
        """
        self.loop.add_callback(self.loop.stop)
class ZmqTest(AbstractMessagingTest):
    """Round-trip tests for the ZeroMQ wrappers (req/rep, pub/sub, push/pull).

    Every test uses port 6000, so each one releases its sockets in a
    ``finally`` block; otherwise a failed assertion would leave the port
    occupied for the tests that follow.
    """

    def setUp(self):
        super(ZmqTest, self).setUp()
        self.multipart_messages = self.create_multipart_messages(10)

    def tearDown(self):
        super(ZmqTest, self).tearDown()

    @staticmethod
    def _close_all(*nodes):
        """Close, in the given order, every node that was actually created."""
        for node in nodes:
            if node is not None:
                node.close()

    def test_request_reply(self):
        # Exceptions are intentionally not caught here: swallowing them would
        # also swallow AssertionError and the test could never fail.
        requester = replier = None
        try:
            requester = ZmqReq(host='tcp://localhost', port=6000)
            self.request = 'Hello'
            self.reply = 'World!'
            replier = ZmqRep(host='tcp://*', port=6000, request_processor=self.on_request_received)
            self.assertEqual(self.reply, requester.execute(request=self.request))
        finally:
            self._close_all(replier, requester)

    def test_address_creation(self):
        full_address = "tcp://localhost:5559"
        self.assertEqual(full_address, ZmqSubscriber.build_address("tcp://localhost", 5559))
        self.assertEqual('tcp://*:6000', ZmqPublisher.build_address("tcp://*", 6000))

    def test_publisher_subscriber(self):
        print('Testing pub sub')
        publisher = subscriber = None
        try:
            publisher = ZmqPublisher('tcp://*', 6000)
            subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_message)
            self.send_messages(publisher, wait=False)
            # Give the background loop time to deliver the messages.
            sleep(0.5)
            self.assertSequenceEqual(self.test_messages, self.received_messages)
        finally:
            self._close_all(publisher, subscriber)

    def handle_message(self, message):
        """Forward the first frame of ``message`` to the base handler."""
        self.base_handle_message(message[0])

    def test_send_json(self):
        print('Testing send json')
        publisher = subscriber = None
        try:
            publisher = ZmqPublisher('tcp://*', 6000)
            subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_json_message)
            md = {'1' : 1}
            publisher.send_json(md)
        finally:
            self._close_all(publisher, subscriber)

    def create_multipart_messages(self, size):
        """Return ``size`` two-frame messages numbered from 0."""
        return [['Multipart test message', str(i)] for i in range(size)]

    def send_multipart_messages(self, sender):
        """Send every prepared multipart message through ``sender``."""
        for message in self.multipart_messages:
            sender.send_multipart(message)

    def test_multipart_messages(self):
        print('Testing multipart')
        publisher = subscriber = None
        try:
            publisher = ZmqPublisher('tcp://*', 6000)
            subscriber = ZmqSubscriber('tcp://localhost', 6000, self.base_handle_message)
            self.send_multipart_messages(publisher)
            sleep(0.5)
            self.assertSequenceEqual(self.multipart_messages, self.received_messages)
        finally:
            self._close_all(publisher, subscriber)

    def test_push_pull_multipart(self):
        print('Testing pushpull multipart')
        ventilator = worker = None
        try:
            ventilator = ZmqPush('tcp://*', 6000)
            worker = ZmqPull('tcp://localhost', 6000, self.base_handle_message)
            self.send_multipart_messages(ventilator)
            sleep(0.5)
            self.assertSequenceEqual(self.multipart_messages, self.received_messages)
        finally:
            self._close_all(ventilator, worker)

    def handle_json_message(self, json):
        """Print the received JSON payload."""
        print(str(json))

    def test_push_pull(self):
        print('Testing push pull')
        ventilator = worker = None
        try:
            ventilator = ZmqPush('tcp://*', 6000)
            worker = ZmqPull('tcp://localhost', 6000, self.handle_message)
            self.send_messages(ventilator, wait=False)
            sleep(0.5)
            self.assertSequenceEqual(self.test_messages, self.received_messages)
        finally:
            self._close_all(ventilator, worker)

    def on_request_received(self, message):
        """Return the canned reply for the expected request, else ``'ERROR'``."""
        if message[0] == self.request:
            return self.reply
        return 'ERROR'
File "/opt/leos/code/messaging_system/zeromq/ZmqLoopRunner.py", line 12, in run
self.loop.start()
File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 160, in start
super(ZMQIOLoop, self).start()
File "/Library/Python/2.7/site-packages/tornado/ioloop.py", line 646, in start
event_pairs = self._impl.poll(poll_timeout)
File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 132, in poll
z_events = self._poller.poll(1000*timeout)
File "/Library/Python/2.7/site-packages/zmq/sugar/poll.py", line 110, in poll
return zmq_poll(self.sockets, timeout=timeout)
File "_poll.pyx", line 125, in zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:1705)
IndexError: list index out of range
Answer the question
In order to leave comments, you need to log in
I only skimmed the code, but maybe an empty list ends up in your on_request_received, which ZmqRep then tries to process at the end. Check whether that happens and, if so, guard against an empty list.
If this is not the case, you should publish the code with the problem somewhere on github, for example. So that people do not suffer with copy-paste.
I think we need to better understand the ZMQStream documentation.
From the code I can see that you expect the stream to hand you complete messages, but it may turn out that it hands you a piece of data as soon as it appears on the socket, i.e. it literally gives you a stream of bytes from which you must extract the meaningful structures (messages) yourself.
Didn't find what you were looking for?
Ask your question · Ask a Question
731 491 924 answers to any question