Danil Kislov, 2021-05-16 07:58:28
Python

How to fix hangups in Postgres database queries?

I use a Python script to process messages received over MQTT from a Mosquitto broker. The script works correctly for a while, but then hangs without raising any errors: queries start to take longer to run than the interval at which new messages arrive. The code:

import paho.mqtt.client as mqtt
import json
from contextlib import closing
import psycopg2
from datetime import datetime
import pytz
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import requests


base_url = 'someurl.com'


def on_connect(client, userdata, flags, rc):
    # paho-mqtt 1.x calls this with (client, userdata, flags, rc); rc is the result code.
    print("Connected with result code " + str(rc))


# The callback for when a PUBLISH message is received from the server.

with closing(psycopg2.connect(dbname='db1', user='admin', password='pas', host="localhost", options='-c statement_timeout=0')) as conn:
    with conn.cursor() as cursor:

        def on_message(client, userdata, msg):
            modes = [0, 10, 15, 25]
            postgre_bool = ['false', 'true']
            data = json.loads(msg.payload.decode())

            if 'id' in data:
                        tr_id = data['id']
                        status = "OK"
                        cursor.execute("INSERT INTO api_telelog (imei, log) VALUES (%s, %s)", (tr_id, json.dumps(data)))
                        conn.commit()
                        cursor.execute("SELECT owner_id, latitude, longitude, id FROM api_scooter WHERE tracker_id = %s;", (tr_id, ))
                        is_in_base = False
                        for _ in cursor:
                            is_in_base = True
                        cursor.execute("SELECT owner_id, latitude, longitude, id, status FROM api_scooter WHERE tracker_id = %s;",
                                       (tr_id,))
                        scooter_data = cursor.fetchone()
                        finally_command = "UPDATE api_scooter SET "
                        vars = []
                        if is_in_base:
                            cursor.execute(
                                "SELECT email FROM api_employee WHERE id = %s;",
                                (scooter_data[0],))
                            email = cursor.fetchone()[0]
                            if 'lat' in data:
                                lat = data['lat']
                                lon = data['lon']
                                vars += [lat, lon]
                                finally_command += "latitude = %s, longitude = %s, "
                            else:
                                status = "NG"
                            if 'lamp' in data:
                                lamp = postgre_bool[int(data['lamp'])]
                                vars.append(lamp)
                                finally_command += "lamp = %s, "
                            if 'slock' in data:
                                lock = postgre_bool[int(data['slock'])]
                                vars.append(lock)
                                finally_command += "lock = %s, "
                            if 'enb' in data:
                                engine = postgre_bool[int(data['enb'])]
                                vars.append(engine)
                                finally_command += "engine = %s, "
                            if 'gear' in data:
                                mode = modes[int(data['gear'])]
                                vars.append(mode)
                                finally_command += "speed_limit = %s, "
                            if 'ubat' in data:
                                battery = data['ubat']
                                vars.append(battery)
                                finally_command += "battery = %s, "
                                command = """SELECT min_voltage, max_voltage FROM api_scooter WHERE tracker_id = %s;"""
                                cursor.execute(command, (tr_id, ))
                                voltages = cursor.fetchone()
                                percent = ((battery/1000) - voltages[0])/(voltages[1] - voltages[0])
                                if ((percent*100) < 20) and scooter_data[4] != "LB":
                                    # Alert text: "Scooter id <id> charge is below 20%"
                                    send_alert_to_mail("Заряд самоката id " + str(scooter_data[3]) + " ниже 20% ", email)
                                    status = "LB"
                            if 'sat' in data:
                                gps = data['sat']
                                vars.append(gps)
                                finally_command += "gps = %s, "
                                if int(data['sat']) == 0:
                                    status = 'NG'
                                    # Alert text: "GPS connection lost with scooter id <id>"
                                    send_alert_to_mail("Потеряно GPS соединение с самокатом id " + str(scooter_data[3]), email)
                            if 'csq' in data:
                                gsm = data['csq']
                                vars.append(gsm)
                                finally_command += "gsm = %s, "
                            if 'alarm' in data:
                                if data['alarm'] != 0:
                                    # Alert text: "Scooter theft attempt <id> <latitude>, <longitude>"
                                    send_alert_to_mail("Поптыка угона самоката " + str(scooter_data[3]) + " " +
                                                       str(scooter_data[1]) + ", " + str(scooter_data[2]), email)
                                    status = "HJ"
                            if 'ver' in data:
                                vars.append(data['ver'])
                                finally_command += "firmware_version = %s, "
                            finally_command += "last_ping = %s, alert_status = %s WHERE tracker_id = %s;"
                            vars += [pytz.utc.localize(datetime.utcnow()), status, tr_id]
                            cursor.execute(finally_command, vars)
                            conn.commit()


        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.tls_set()
        client.tls_insecure_set(True)

        client.connect("www.someurl.ru", 8883, 60)

        client.subscribe("scooter")

        client.loop_forever()

Please explain the reason for this behavior and how to resolve the error. Thanks in advance!


3 answer(s)
Timtaran, 2021-05-16
@HooinKema

The most likely cause is network latency to the database (if it is remote). The solution is to run your own PostgreSQL server locally.

Romses Panagiotis, 2021-05-16
@romesses

Remarks on the code:
Everything is lumped together in one function (refactoring is required).
Risk of SQL injection. Use a SQL query builder or an ORM.
The results of the DBMS calls are never checked.
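
As an illustration of the last remark, here is a minimal sketch of checking a query result before using it. In the DB-API, fetchone() returns None when the query produced no row, so indexing it directly (as in cursor.fetchone()[0]) fails when the row is missing. The helper name fetch_owner_email is hypothetical, and an in-memory SQLite database stands in for PostgreSQL only so that the sketch runs on its own; with psycopg2 the placeholder would be %s instead of ?.

```python
import sqlite3


def fetch_owner_email(cursor, owner_id):
    """Return the owner's email, or None if no such employee exists."""
    # Hypothetical helper; SQLite uses "?" placeholders, psycopg2 uses "%s".
    cursor.execute("SELECT email FROM api_employee WHERE id = ?", (owner_id,))
    row = cursor.fetchone()  # None when the query returned no row
    if row is None:
        return None
    return row[0]


# Demo data: in-memory SQLite stands in for the real PostgreSQL database.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE api_employee (id INTEGER PRIMARY KEY, email TEXT)")
cur.execute("INSERT INTO api_employee (id, email) VALUES (1, 'owner@example.com')")
conn.commit()

found = fetch_owner_email(cur, 1)    # existing employee
missing = fetch_owner_email(cur, 2)  # no such employee, so None
```

The caller can then skip the alert or log a warning when the result is None instead of failing in the middle of message handling.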
How to solve:
send_alert_to_mail is synchronous and blocks the handler. Make it an asynchronous call, or better, push the emails to a queue dedicated to sending them.
Wrap all database calls in a decorator that measures execution time. Usually the problem is not in the DBMS itself but in the interaction with it and in all sorts of blocking calls, as with send_alert_to_mail.
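
A timing decorator of the kind suggested above could be sketched roughly as follows. The names timed, threshold_s, log and slow_step are made up for the illustration, and slow_step only sleeps to simulate a slow database call.

```python
import functools
import time


def timed(threshold_s=0.0, log=print):
    """Report calls that take at least threshold_s seconds (hypothetical helper)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Runs whether the call returned or raised.
                elapsed = time.perf_counter() - start
                if elapsed >= threshold_s:
                    log(f"{func.__name__} took {elapsed:.3f} s")
        return wrapper
    return decorator


messages = []  # collected here for the demo; a real script might use logging


@timed(threshold_s=0.0, log=messages.append)
def slow_step():
    time.sleep(0.05)  # stands in for a slow cursor.execute(...) call
    return "done"


result = slow_step()
```

Decorating small functions that each wrap one query, and one that wraps send_alert_to_mail, would show which step actually accounts for the delay.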

P.S. "Поптыка" in the theft alert text is a typo for "Попытка" ("attempt") :-)
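
The first suggestion in this answer, moving email sending out of the message handler, might look roughly like the sketch below. It assumes the sending function takes (text, recipient), as the calls to send_alert_to_mail in the question do; start_mail_worker and fake_send are hypothetical names, and fake_send replaces the real SMTP code so the sketch runs on its own.

```python
import queue
import threading


def start_mail_worker(send_func):
    """Start a daemon thread that sends queued alerts; return the queue."""
    mail_queue = queue.Queue()

    def worker():
        while True:
            item = mail_queue.get()
            try:
                if item is None:  # sentinel: stop the worker
                    return
                text, recipient = item
                try:
                    send_func(text, recipient)
                except Exception as exc:
                    # A failed email must not kill the worker thread.
                    print(f"failed to send alert to {recipient}: {exc}")
            finally:
                mail_queue.task_done()

    threading.Thread(target=worker, daemon=True).start()
    return mail_queue


sent = []


def fake_send(text, recipient):
    # Stand-in for the real send_alert_to_mail(text, email).
    sent.append((text, recipient))


alerts = start_mail_worker(fake_send)
alerts.put(("Battery below 20%", "owner@example.com"))  # returns immediately
alerts.join()  # demo only: wait until the worker has processed the queue
```

Inside on_message, each send_alert_to_mail(text, email) call would then become alerts.put((text, email)), so a slow SMTP server no longer delays the database updates.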

Dimonchik, 2021-05-16
@dimonchik2013

Not enough code is shown, but in any case see:
https://habr.com/ru/post/486710/
