B
B
Boris_B2014-08-08 15:37:03
Python
Boris_B, 2014-08-08 15:37:03

How to properly use pyzmq ioloop with multithreading?

First, I'll explain what I want to achieve with the code I've written.
Application logic must be decoupled from the messaging system.
Above the message passing system, there should be an abstract wrapper that allows you to get rid of duplicate code.
Two different information transfer systems, such as rabbitmq and zeromq, should coexist in one process.
And here's what happened so far

class BaseZmqNode():
    __metaclass__ = ABCMeta

    def __init__(self, host, port,  bind, hwm):
        self.node = self.create_node()
        self.node.host = host
        self.port = port
        self.context = zmq.Context().instance()
        self.socket = self.create_socket()
        if bind:
            self.socket.bind(self.build_address(host, port))
        else:
            self.socket.connect(self.build_address(host, port))
        self.set_hwm(hwm)

    def set_hwm(self, hwm):
        self.socket.set_hwm(hwm)

    def send_multipart(self, message):
        self.socket.send_multipart(message)

    def send_json(self, json):
        self.socket.send_json(json)
    @abstractmethod
    def create_node(self):
        return BaseMessagingNode()

    def close(self):
        self.socket.close()

    @staticmethod
    def build_address(host, port):
        strings = [host, ':', str(port)]
        return ''.join(strings)

    @abstractmethod
    def create_socket(self):
        pass

This is the base class for all nodes like pub, sub etc.
class BaseZmqReceiver(BaseZmqNode):

    __metaclass__ = ABCMeta

    def __init__(self, host, port, hwm, bind, on_receive_callback):
        super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
        self.node.on_message_callback = on_receive_callback
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.on_message_received)
        self.runner = ZmqLoopRunner(self.on_close)
        self.runner.start()

    def on_message_received(self, message):
        return self.node.on_message_callback(message)

    def create_node(self):
        return ReceivingNode(None, None)

    def on_close(self):

        self.stream.close()
        self.socket.close()

    def close(self):
        # super(BaseZmqReceiver, self).close()

        self.runner.stop()

This is the base class for all nodes that use zmq ioloop.
class ZmqLoopRunner(Thread):

    def __init__(self, callback):
        super(ZmqLoopRunner, self).__init__()
        self.loop = IOLoop.current()
        self.callback = callback

    def run(self):
        self.loop.start()
        self.callback()

    def stop(self):
        self.loop.stop()

Loop is placed on a separate thread so that it doesn't block the application.
class ZmqTest(AbstractMessagingTest):

    def setUp(self):
        super(ZmqTest, self).setUp()
        self.multipart_messages = self.create_multipart_messages(10)

    def tearDown(self):
        super(ZmqTest, self).tearDown()

    def test_request_reply(self):
        try:
            requester = ZmqReq(host='tcp://localhost', port=6000)
            self.request = 'Hello'
            self.reply = 'World!'
            replier = ZmqRep(host='tcp://*', port=6000, request_processor=self.on_request_received)
            self.assertEqual(self.reply, requester.execute(request=self.request))
        except Exception as ex:
            print(ex)
        finally:
            replier.close()
            requester.close()

    def test_address_creation(self):
        full_address = "tcp://localhost:5559"
        self.assertEqual(full_address, ZmqSubscriber.build_address("tcp://localhost", 5559))
        self.assertEqual('tcp://*:6000', ZmqPublisher.build_address("tcp://*", 6000))

    def test_publisher_subscriber(self):

        print('Testing pub sub')
        publisher = ZmqPublisher('tcp://*', 6000)
        subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_message)
        self.send_messages(publisher, wait=False)
        sleep(0.5)
        self.assertSequenceEqual(self.test_messages, self.received_messages)
        publisher.close()
        subscriber.close()

    def handle_message(self, message):
        self.base_handle_message(message[0])

    def test_send_json(self):

        print('Testing send json')
        publisher = ZmqPublisher('tcp://*', 6000)
        subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_json_message)
        md = {'1' : 1}
        publisher.send_json(md)
        publisher.close()
        subscriber.close()

    def create_multipart_messages(self, size):
        messages = []
        for i in range(size):
            messages.append(['Multipart test message', str(i)])
        return messages

    def send_multipart_messages(self, sender):
        for message in self.multipart_messages:
            sender.send_multipart(message)


    def test_multipart_messages(self):

        print('Testing multipart')
        publisher = ZmqPublisher('tcp://*', 6000)
        subscriber = ZmqSubscriber('tcp://localhost', 6000, self.base_handle_message)
        self.send_multipart_messages(publisher)
        sleep(0.5)
        self.assertSequenceEqual(self.multipart_messages, self.received_messages)
        publisher.close()
        subscriber.close()

    def test_push_pull_multipart(self):

        print('Testing pushpull multipart')
        ventilator = ZmqPush('tcp://*', 6000)
        worker = ZmqPull('tcp://localhost', 6000, self.base_handle_message)
        self.send_multipart_messages(ventilator)
        sleep(0.5)
        self.assertSequenceEqual(self.multipart_messages, self.received_messages)
        ventilator.close()
        worker.close()


    def handle_json_message(self, json):
        print(str(json))

    def test_push_pull(self):

        print('Testing push pull')
        ventilator = ZmqPush('tcp://*', 6000)
        worker = ZmqPull('tcp://localhost', 6000, self.handle_message)
        self.send_messages(ventilator, wait=False)
        sleep(0.5)
        self.assertSequenceEqual(self.test_messages, self.received_messages)
        ventilator.close()
        worker.close()

    def on_request_received(self, message):
        if message[0] == self.request:
            return self.reply
        else:
            return 'ERROR'

And here are the tests. Depending on where I put the stream and socket closing code, they can pass, but sometimes throw an exception. For example this.
File "/opt/leos/code/messaging_system/zeromq/ZmqLoopRunner.py", line 12, in run
    self.loop.start()
  File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 160, in start
    super(ZMQIOLoop, self).start()
  File "/Library/Python/2.7/site-packages/tornado/ioloop.py", line 646, in start

    event_pairs = self._impl.poll(poll_timeout)
  File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 132, in poll
    z_events = self._poller.poll(1000*timeout)

  File "/Library/Python/2.7/site-packages/zmq/sugar/poll.py", line 110, in poll
    return zmq_poll(self.sockets, timeout=timeout)
  File "_poll.pyx", line 125, in zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:1705)
IndexError: list index out of range

Or some test might fail. I'm confused because I don't understand what I'm doing wrong.

Answer the question

In order to leave comments, you need to log in

2 answer(s)
D
Disassociative, 2014-08-08
@Disassociative

I looked diagonally, maybe an empty list falls into your on_request_received , which ZmqRep tries to process for the last. Look at this and check for an empty list if it is.
If this is not the case, you should publish the code with the problem somewhere on github, for example. So that people do not suffer with copy-paste.

S
snowpiercer, 2014-08-09
@snowpiercer

I think we need to better understand the ZMQStream documentation.
I just see in the code that you expect a stream to send you some messages, or it may turn out that it sends you a piece of data as soon as it appears in the socket, i.e. literally sends you a stream of bytes, from which you yourself must isolate meaningful structures - messages.

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question