G
G
Guerro692020-07-08 23:18:04
Python
Guerro69, 2020-07-08 23:18:04

How to constantly get events from LongPoll and VK?

The bottom line is, I am writing a code in python`e in order to listen to LongPoll VK and receive events from there, I wrote a code that allows you to pull one event from LongPoll`a for 25 seconds, but it’s not very convenient to work with this when I I took up the task of making sure that LongPoll was listening all the time, nothing good came of it. Here is my code:

import json
import requests

class LongPoll:
  def __init__(self, session, wait=25):
    self.token = session[0]
    self.grp_id = session[1]
    self.v = session[-1]
    self.wait = wait
    self.url = 'https://api.vk.com/method/'
    self.http = requests.Session()
    self.ts = None
    self.server = None
    self.key = None
    self.__update_data()

  def __update_data(self, updts=True):
    api_data = {
      'access_token': self.token,
      'v': self.v,
      'group_id': self.grp_id
      }
    r = self.http.post(f"{self.url}groups.getLongPollServer?", params=api_data).json()
    self.server = r['response']['server']
    self.key = r['response']['key']
    if updts:
      self.ts = r['response']['ts']

  def _lp_check(self):
    lp_data = {
      'act': 'a_check',
      'key': self.key,
      'wait': self.wait,
      'mode': 2,
      'ts': self.ts
    }
    r = self.http.post(self.server, params=lp_data, timeout=self.wait + 10).json()
    if "failed" in r:
      if r['failed'] == 1:
        self.__update_data()
      elif r['failed'] == 2:
        self.__update_data(updts=False)
      elif r['failed'] == 3:
        self.__update_data()
    else:
      return r['updates']

  def listening(self):
    print('LongPoll listening...\n')
    while True:
      yield [
        i
        for i in self._lp_check()
      ]

lp = LongPoll(['Здесь мой токен', 190582509, 5.95])

for event in lp.listening():
  print(event)


As a result, I do not get every event, but just a cycle in which one event spins forever.

Answer the question

In order to leave comments, you need to log in

1 answer(s)
G
Guerro69, 2020-07-08
@Guerro69

The answer is solved, it was just necessary to update the LongPoll server data after receiving the event:

def listening(self):
    print('LongPoll listening...\n')
    while True:
      for i in self._lp_check():
        yield i
        self.__update_data()

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question