Alexey Obukhov, 2020-01-30 17:25:32
Python

How to optimize a long-polling implementation that delivers messages to clients from Redis pub/sub in asynchronous Python?

There are a couple of thousand devices that receive notifications about events and commands from the monitoring server.
The devices run plain C++ with websocketpp, and they all share one big problem: a very unstable GPRS connection. When the server needs to send a message, we get a connection error and have to wait until the client finally wakes up and reconnects.

It was decided to switch the connection to long polling. Since the main server runs on Django, I would like to stay within Python.

I'm looking at aiohttp + aioredis. Right now each device has its own channel in Redis (but this could be changed to a single channel for all of them).
After digging through the documentation of the asynchronous libraries, I ended up with the monstrosity below. I strongly suspect that it will not work properly.

  1. A separate Redis connection for every client is already bad design.
  2. When the connection timeout expires, the client simply drops the connection itself, instead of the server responding with HTTP 204.
  3. When the client drops the connection, the Redis connection stays subscribed to the channel. While the client is reconnecting, a message may arrive in the channel that the client will never receive after it reconnects. (Currently, with WebSocket, when a client connects the server first checks a large database for unsent messages and only then subscribes to Redis pub/sub.)
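
Point 1 could in principle be avoided by keeping a single Redis subscription per server process and fanning messages out in memory. The sketch below shows only that fan-out step in plain asyncio. `Hub`, `register`, `unregister` and `dispatch` are hypothetical names, not part of aiohttp or aioredis, and the Redis reader task that would call `dispatch` is left out.

```python
import asyncio
from collections import defaultdict


class Hub:
    """In-process fan-out: one shared subscriber feeds many pending polls.

    Hypothetical helper for illustration; not an aiohttp or aioredis API.
    """

    def __init__(self):
        # device name -> set of queues, one per pending long-poll request
        self._waiters = defaultdict(set)

    def register(self, name):
        """Create a queue for one pending request of the given device."""
        queue = asyncio.Queue()
        self._waiters[name].add(queue)
        return queue

    def unregister(self, name, queue):
        """Forget the queue once the request has been answered or dropped."""
        self._waiters[name].discard(queue)
        if not self._waiters[name]:
            del self._waiters[name]

    def dispatch(self, name, message):
        """Hand a message to every pending request of the device.

        Meant to be called by the single Redis reader task.
        Returns how many pending requests received the message.
        """
        queues = self._waiters.get(name, ())
        for queue in queues:
            queue.put_nowait(message)
        return len(queues)


async def demo():
    hub = Hub()
    queue = hub.register('dev1')
    delivered = hub.dispatch('dev1', 'reboot')   # as if it came from Redis
    missed = hub.dispatch('dev2', 'ignored')     # nobody is polling for dev2
    message = await queue.get()
    hub.unregister('dev1', queue)
    return delivered, missed, message
```

A message dispatched while no request is pending is simply dropped here, which is the same gap described in point 3, so this approach only makes sense together with some kind of backlog.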


How should this kind of server behavior be implemented correctly in asynchronous Python?

import asyncio
import aioredis
from aiohttp import web

redis_url = 'redis://127.0.0.1:6379'


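async def handle_poll(request):
    name = request.match_info.get('name', 'unknown')
    # A separate Redis connection is opened for every request (point 1 above).
    sub = await aioredis.create_redis(redis_url, encoding='utf-8')
    res = await sub.subscribe('channel:' + name)
    ch1 = res[0]
    msg = None

    # wait_message() is called without any timeout: it waits until a message
    # arrives or the channel is closed, so the 204 branch below is not reached
    # when the client's own timeout expires (point 2 above).
    if await ch1.wait_message():
        msg = await ch1.get()
    else:
        await sub.unsubscribe('channel:' + name)
        return web.Response(status=204)

    await sub.unsubscribe('channel:' + name)
    return web.Response(text=msg.decode('utf-8'))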


def main():
    app = web.Application()
    app.router.add_get('/poll/{name}', handle_poll)
    web.run_app(app)


if __name__ == '__main__':
    try:
        main()
    except ValueError as e:
        print(e)


Obviously the example is purely illustrative: it lacks authorization, logging and much more ;)
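
The handler above waits on `wait_message()` with no time limit, so the 204 response is never sent on a timeout. One possible way to bound the wait is `asyncio.wait_for`. In the sketch below a plain `asyncio.Queue` stands in for the Redis channel, and `poll_once` and the timeout values are made up for illustration; only the status codes mirror the handler above.

```python
import asyncio


async def poll_once(queue, timeout):
    """Wait up to `timeout` seconds for one message.

    Returns (200, message) when something arrived in time, or (204, None)
    when the wait timed out, so the server answers before the client gives up.
    """
    try:
        message = await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return 204, None
    return 200, message


async def demo():
    queue = asyncio.Queue()                        # stand-in for the channel
    empty = await poll_once(queue, timeout=0.05)   # nothing published yet
    queue.put_nowait('reboot')
    ready = await poll_once(queue, timeout=0.05)   # message already waiting
    return empty, ready
```

For this to help, the server-side timeout has to be shorter than the client's own timeout; otherwise the client still drops the connection first.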


1 answer(s)
Alexey Obukhov, 2020-01-31
@Obukhoff

Well, since nobody seems to be answering, I'll answer it myself.
Here is a good example - https://gist.github.com/rcarmo/3f0772f2cbe0612b699...
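
Independently of the linked gist, one common approach to the lost-message window from point 3 of the question is to give every message a sequence number and let the client send the last number it has seen. This is a minimal in-memory sketch under that assumption: `Backlog`, `publish` and `since` are hypothetical names, and a bounded deque stands in for the large database mentioned in the question.

```python
from collections import defaultdict, deque


class Backlog:
    """Keeps the most recent messages per device, tagged with sequence numbers.

    Hypothetical helper for illustration only.
    """

    def __init__(self, maxlen=100):
        self._maxlen = maxlen
        # device name -> bounded deque of (seq, text) pairs
        self._messages = defaultdict(lambda: deque(maxlen=self._maxlen))
        # device name -> next sequence number to hand out (starts at 1)
        self._next_seq = defaultdict(lambda: 1)

    def publish(self, name, text):
        """Store a message for the device and return its sequence number."""
        seq = self._next_seq[name]
        self._next_seq[name] = seq + 1
        self._messages[name].append((seq, text))
        return seq

    def since(self, name, last_seen):
        """Return every stored (seq, text) pair newer than `last_seen`."""
        return [(seq, text)
                for seq, text in self._messages.get(name, ())
                if seq > last_seen]


backlog = Backlog()
backlog.publish('dev1', 'reboot')
backlog.publish('dev1', 'set-interval 60')
# The device saw message 1, lost the connection, then polls with last_seen=1.
pending = backlog.since('dev1', last_seen=1)
```

A poll handler built on this would answer from `since(...)` first and only wait on the live subscription when that list is empty.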
