Python
Warington, 2021-06-07 11:51:29

"pika.exceptions.StreamLostError: Transport indicated EOF" with RabbitMQ, Python 3 and PostgreSQL: where to dig?

In principle the code works (it was written as an experiment) and does what it needs to do. At seemingly random moments, however, this exception is raised and the program stops. Can I change something in the program or in the RabbitMQ configuration to prevent it?
Am I right that the callback takes too long to process a message and RabbitMQ drops the connection, or is the cause something else?

[[email protected] opt]# python3 receive_rabbit.py
Traceback (most recent call last):
  File "receive_rabbit.py", line 62, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 824, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF

RabbitMQ log

=ERROR REPORT==== 6-Jun-2021::23:56:03 ===
closing AMQP connection <0.588.0> (172.17.0.1:38714 -> 172.17.0.2:5672):
{writer,send_failed,{error,timeout}}
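One possible reading of this log entry (a guess, not a confirmed diagnosis): the broker could not write to the client socket in time, which would fit a consumer that is busy inside the callback while auto_ack=True lets RabbitMQ keep pushing messages without any limit. If that is the cause, acknowledging each message only after it has been processed, together with a prefetch limit, might help. A minimal sketch of such a wrapper follows; make_acking_callback and handle_body are hypothetical names, and rejecting failed messages without requeue is an assumption.

```python
def make_acking_callback(handle_body):
    """Wrap handle_body so a delivery is acknowledged only after it succeeds.

    handle_body is a hypothetical function that takes the raw message body.
    """
    def on_message(ch, method, properties, body):
        try:
            handle_body(body)
        except Exception:
            # Assumption: a message that cannot be processed is rejected
            # without requeueing, so it is not redelivered in a loop.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return
        # Acknowledge only after the work is done.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return on_message
```

With pika this would presumably be wired up by calling channel.basic_qos(prefetch_count=1) and then channel.basic_consume("telegraf", make_acking_callback(my_handler), auto_ack=False), where my_handler stands for the existing processing code.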

The code itself

import pika
import psycopg2

# heartbeat=0 disables AMQP heartbeats on this connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672,
    credentials=pika.PlainCredentials('user', 'passwd'), heartbeat=0))
channel = connection.channel()

channel.queue_declare(queue='telegraf')


def callback(ch, method, properties, body):
    # str() of a bytes object gives "b'...'" with newlines as a literal backslash-n
    body = str(body).split('\\n')
    body[0] = body[0].replace("b'", '')
    for i in range(len(body)):
        # a new PostgreSQL connection is opened for every line of the message
        conn = psycopg2.connect(dbname='dbname', user='user', password='password', host='host')
        cursor = conn.cursor()
        cursor.execute("SET application_name = 'TransferFromRabbitToPG'")
        # skip fragments that still contain the quote left over from str(bytes)
        if "'" not in body[i]:
            # each line looks like: host.type.column-parts value timestamp
            str_body = body[i].split(' ')
            tel_value = str_body[1]
            tel_timestamp = str_body[2]
            str_type_body = str_body[0].split('.')
            tel_host = str_type_body[0]
            tel_type = str_type_body[1]
            # everything after host and type becomes the column name
            tel_column = ''
            for counter in range(len(str_type_body)):
                if counter > 1:
                    tel_column = tel_column + str_type_body[counter] + '_'
            tel_column = tel_column.replace('-', '_')
            #print('host: '+tel_host)
            #print('type: '+tel_type)
            #print('column: '+tel_column)
            #print('value: '+tel_value)
            #print('time: '+tel_timestamp)
            #print(len(str_type_body))
            cursor.execute('BEGIN')
            cursor.execute('CREATE SCHEMA IF NOT EXISTS '+tel_host)
            cursor.execute('CREATE TABLE IF NOT EXISTS '+tel_host+'."'+tel_type+'" (time bigserial PRIMARY KEY)')
            try:
                cursor.execute('CREATE INDEX IF NOT EXISTS index_'+tel_host+'_'+tel_type.replace('-','_')+' ON '+tel_host+'."'+tel_type+'" (time DESC)')
                cursor.execute('ALTER TABLE '+tel_host+'."'+tel_type+'" ADD COLUMN IF NOT EXISTS '+tel_column+' real')
            except psycopg2.errors.DeadlockDetected:
                pass
            cursor.execute('COMMIT')
            try:
                cursor.execute('INSERT INTO '+tel_host+'."'+tel_type+'" (time) VALUES (\''+tel_timestamp+'\')')
            except psycopg2.errors.UniqueViolation:
                pass
            cursor.execute('BEGIN')
            cursor.execute('UPDATE '+tel_host+'."'+tel_type+'" SET '+tel_column+' = \''+tel_value+'\' WHERE time = \''+tel_timestamp+'\'')
            cursor.execute('COMMIT')
            # the cursor and connection are closed only inside this branch
            cursor.close()
            conn.close()


# auto_ack=True: messages count as delivered as soon as the broker sends them
channel.basic_consume(
    "telegraf",
    callback,
    auto_ack=True,
    exclusive=False,
    consumer_tag=None,
    arguments=None)
channel.start_consuming()
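
The parsing step in the callback could also be pulled out into a small pure function, which makes it easier to test separately from RabbitMQ and PostgreSQL. The sketch below assumes each line has the form "host.type.column-parts value timestamp", as the callback above implies; parse_metric_line and parse_body are hypothetical names. It decodes the bytes instead of calling str() on them, so unlike the original it does not drop the last line of a message.

```python
def parse_metric_line(line):
    """Split one 'host.type.col-a.col-b value timestamp' line into its parts."""
    fields = line.split(' ')
    value, timestamp = fields[1], fields[2]
    parts = fields[0].split('.')
    host, metric_type = parts[0], parts[1]
    # Same column naming as the callback above: every remaining part is
    # followed by '_' and dashes are replaced, so a trailing '_' remains.
    column = ''.join(part + '_' for part in parts[2:]).replace('-', '_')
    return host, metric_type, column, value, timestamp


def parse_body(body):
    """Decode a raw message body and parse every non-empty line."""
    lines = body.decode('utf-8').splitlines()
    return [parse_metric_line(line) for line in lines if line.strip()]
```

For example, parse_body(b"srv1.cpu.usage-idle 97.5 1622999763\n") yields one tuple with host "srv1", type "cpu" and column "usage_idle_".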
