Simple chat with Tornado and WebSocket?
It's me again, with the same topic.
I already asked this question and got an answer (thank you): https://toster.ru/q/540718
But that solution is not suitable for a chat, because it creates a new Redis connection for each class instance, and the result is an avalanche of messages that grows exponentially. I think the problem can be solved with a single Redis handler that runs separately and iterates over the list of connections. The problem is that the GIL won't let me access that list from both places at the same time. How can I synchronize the threads so that the Redis handler pauses while a websocket is changing the list? That approach has its own drawback, though: I could lose messages while the Redis handler is suspended. Please help.
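import json
import redis
import tornado.autoreload
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import httpclient
# The list of connections; each connection is itself a
# list of the thread's channel name (a string) and the handler instance
connections = []
# Initialize Redis only once
r = redis.StrictRedis()
p = r.pubsub()
# and subscribe to the message pattern
p.psubscribe("thread:*")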
class EchoWebSocket(tornado.websocket.WebSocketHandler):
    async def open(self):
        print("WebSocket opened")

    def check_origin(self, origin):
        # Accept connections from any origin
        return True

    async def on_message(self, data):
        json_data = json.loads(data)
        if json_data['type'] == "SEND_MESSAGE":
            token = json_data['token']
            message = json_data['message']
            thread = json_data['thread']
            await self.post(message, thread, token)
        elif json_data['type'] == "SUBSCRIBE_THREAD":
            thread = "thread:" + str(json_data['id'])
            connections.append([thread, self])

    async def post(self, message, thread, token):
        # Forward the chat message to the backend API
        http_client = httpclient.AsyncHTTPClient()
        url = "http://127.0.0.1:8000/api/v0/thread/add/"
        headers = {'Authorization': 'JWT ' + token, "Content-Type": "application/json"}
        context = {'thread': thread, 'message': message}
        body = json.dumps(context)
        response = await http_client.fetch(request=url, method="POST", headers=headers, body=body)
        http_client.close()

    def on_close(self):
        # Unsubscribing from individual threads is not worked out yet;
        # for now just drop every entry that belongs to this socket
        connections[:] = [c for c in connections if c[1] is not self]
        print("WebSocket closed")
async def reader():
    # Forward every Redis event to the sockets subscribed to its channel
    while True:
        for evt in p.listen():
            for channel, connection in connections:
                if channel == evt['channel'].decode("UTF-8"):
                    await connection.write_message(evt["data"])
application = tornado.web.Application([
    (r"/", EchoWebSocket),
])
application.autoreload = True
application.listen(8888, '127.0.0.1')
tornado.autoreload.start()
try:
    loop = tornado.ioloop.IOLoop.current()
    loop.start()
except KeyboardInterrupt:
    tornado.ioloop.IOLoop.current().stop()
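
For illustration only, here is a minimal sketch of the "one separate handler that iterates over the list" idea from the question. It assumes everything runs as coroutines on a single event loop thread, as Tornado handlers normally do, in which case the shared registry is only touched between awaits and no thread lock is involved. The names `Hub`, `FakeConnection`, and `demo` are hypothetical, and an `asyncio.Queue` stands in for the Redis listener so that the sketch runs without Redis or Tornado installed. It is not a drop-in replacement for the code above.

```python
import asyncio
from collections import defaultdict


class Hub:
    """Hypothetical registry: channel name -> set of subscribed connections."""

    def __init__(self):
        self.subscribers = defaultdict(set)

    def subscribe(self, channel, connection):
        self.subscribers[channel].add(connection)

    def unsubscribe_all(self, connection):
        # Intended to be called from on_close so closed sockets stop receiving.
        for members in self.subscribers.values():
            members.discard(connection)

    async def dispatch(self, queue):
        # Single consumer: each event is written once to each subscriber.
        while True:
            event = await queue.get()
            if event is None:  # sentinel used only to end this sketch
                break
            channel, data = event
            # Iterate over a copy so a subscribe/unsubscribe that happens
            # during an await cannot change the set mid-iteration.
            for connection in list(self.subscribers.get(channel, ())):
                await connection.write_message(data)


class FakeConnection:
    """Stand-in for a WebSocketHandler; it records what it is sent."""

    def __init__(self):
        self.received = []

    async def write_message(self, data):
        self.received.append(data)


async def demo():
    hub, queue = Hub(), asyncio.Queue()
    alice, bob = FakeConnection(), FakeConnection()
    hub.subscribe("thread:1", alice)
    hub.subscribe("thread:1", bob)
    hub.subscribe("thread:2", bob)
    # The queue plays the role of the Redis pub/sub stream (an assumption).
    for event in [("thread:1", "hi"), ("thread:2", "yo"), None]:
        queue.put_nowait(event)
    await hub.dispatch(queue)
    return alice.received, bob.received


alice_log, bob_log = asyncio.run(demo())
```

In a real server the queue would have to be fed from Redis without blocking the event loop; how to do that depends on the Redis client in use and is left out of this sketch.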