Python
zetsuro, 2020-02-03 10:08:03

How to work with psycopg2 and multiprocessing?

Application structure:

main.py

import multiprocessing
from postgresql import POSTGRESQL  # the singleton database wrapper described below


class Core:

    def __init__(self):
        # One POSTGRESQL instance, stored on the Core object.
        self.postgres = POSTGRESQL()

    def complete_level_3(self, sub_task_id):
        # Third level: one insert, then one update based on its result.
        some_information = self.postgres.some_insert_func(sub_task_id)
        some_information_two = self.postgres.some_update_func(some_information)

    def run_level_two(self, task_id):
        # Second level: start one process per subtask.
        sub_task_id_list = self.postgres.get_subtasks_id(task_id)

        for sub_task_id in sub_task_id_list:
            process = multiprocessing.Process(target=self.complete_level_3, args=(sub_task_id,))
            process.start()

    def run_level_one(self, task_id_list):
        # First level: start one process per task.
        for task_id in task_id_list:
            process = multiprocessing.Process(target=self.run_level_two, args=(task_id,))
            process.start()


The code is heavily trimmed, but I think the gist is clear.

The main process starts and spawns n subprocesses, each of which in turn spawns another level of subprocesses.

All of these processes talk to the PostgreSQL database through the psycopg2 driver.

Database connections come from a connection pool, with autocommit=True.
Each query lives in its own function. Each function gets its own connection and cursor object; when the function finishes, the cursor is closed and the connection is returned to the pool.
The class that talks to the database is implemented as a Singleton.
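
For reference, the psycopg2 documentation says that connections should not be used by forked processes and should be created after the fork. A minimal sketch of a pool holder that follows that advice is below. The names PerProcessPool and make_psycopg2_pool, the pool sizes, and the DSN are all hypothetical and are not taken from the application above.

```python
import os
from contextlib import contextmanager


class PerProcessPool:
    """Hypothetical helper: builds a separate pool in each OS process."""

    def __init__(self, pool_factory):
        self._pool_factory = pool_factory  # callable that returns a new pool
        self._pool = None
        self._owner_pid = None

    def _get_pool(self):
        pid = os.getpid()
        if self._pool is None or self._owner_pid != pid:
            # First use in this process, or we are a forked child that
            # inherited the parent's pool: build a fresh pool here.
            self._pool = self._pool_factory()
            self._owner_pid = pid
        return self._pool

    @contextmanager
    def cursor(self):
        pool = self._get_pool()
        conn = pool.getconn()
        try:
            conn.autocommit = True
            with conn.cursor() as cur:
                yield cur
        finally:
            # Always hand the connection back, even if the query failed.
            pool.putconn(conn)


def make_psycopg2_pool():
    """Assumed factory; the DSN is a placeholder, not from the original post."""
    from psycopg2.pool import ThreadedConnectionPool
    return ThreadedConnectionPool(1, 5, dsn="dbname=example")
```

With this shape, a query function would do something like `with holder.cursor() as cur: cur.execute(...)`, where `holder = PerProcessPool(make_psycopg2_pool)`. The PID check means a child process never reuses sockets opened by its parent.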

The problem:
Under light load, this whole scheme works correctly and without errors.
But the more parallel processes I create, the more errors occur in the frequently called functions.
For example: "session creation", "history update", etc.

The errors are not the usual kind. Judging by the logs, queries that are executed at the same time seem to get mixed up in flight, and their arguments get swapped. That is complete nonsense and, by my logic, should be impossible. But the facts and the logs say otherwise. In the log file I see an INSERT receiving an argument that could not possibly have been there: the function starts with a lot of checks, and that argument belongs to a different part of the code altogether (one that runs well before or after this function).
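
One way to check whether two processes really share a database session is to log the client process ID next to the PostgreSQL backend ID for every query. The sketch below assumes a psycopg2 connection object, which provides `get_backend_pid()`; the helper names and the logger name are hypothetical.

```python
import logging
import os

log = logging.getLogger("db-trace")


def describe_connection(conn):
    """Return (client OS process id, PostgreSQL backend process id)."""
    return os.getpid(), conn.get_backend_pid()


def log_query(conn, sql, params):
    """Log which process is sending which query over which server session."""
    client_pid, backend_pid = describe_connection(conn)
    log.info(
        "client_pid=%s backend_pid=%s sql=%r params=%r",
        client_pid, backend_pid, sql, params,
    )
    return client_pid, backend_pid
```

If two different client PIDs report the same backend PID at around the same time, the same connection is being used from more than one process.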

Adding locks and autocommit=True improved the situation somewhat, but did not fix it completely.

For example, two queries are in flight at the same time:
sql1 = "SELECT text, score FROM text_table WHERE id = %s" % (row_id,)
sql2 = "SELECT id FROM language WHERE type = %s" % (type_str,)


Then, say, type_str can arrive where row_id was expected, even though the two have different types.
Errors of this kind pop up from time to time.
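
As a side note on the two queries above: the `%` operator is ordinary Python string formatting, so the value is pasted into the SQL text without quoting. psycopg2 also accepts the values as a second argument to `execute()`, in which case the driver adapts and quotes them. A small sketch follows; the function names are hypothetical and `cur` is assumed to be a psycopg2 cursor. This is unlikely to explain the swapped arguments by itself, but it keeps the logged SQL and its parameters clearly separated.

```python
def interpolated(type_str):
    # Plain Python formatting: the value lands in the SQL text unquoted.
    return "SELECT id FROM language WHERE type = %s" % (type_str,)


def fetch_text_and_score(cur, row_id):
    # SQL text and values are passed separately to execute();
    # the driver adapts and quotes the values.
    cur.execute("SELECT text, score FROM text_table WHERE id = %s", (row_id,))
    return cur.fetchone()


def fetch_language_id(cur, type_str):
    cur.execute("SELECT id FROM language WHERE type = %s", (type_str,))
    return cur.fetchone()
```

For a string value such as "en", `interpolated` produces `WHERE type = en`, which PostgreSQL would read as a column name rather than a string literal.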

I'm at a loss. Help me please.
