Python
Kirill Petrov, 2020-11-17 11:02:29

Writing to one variable from two threads: how do I properly process a queue in Python?

For example, here is a simplified setup that starts two threads which modify the same variable at the same time. As a result, the last lines of the output are effectively random.

import threading as th

lenAllData = 1000000
queueData = []

def threadAddQueue():
    global queueData
    # Simulate filling the queue
    for i in range(lenAllData):
        queueData.append(i)

def threadProcQueue():
    global queueData
    queueProceed = []
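    # Simulate processing the queue
    while True:
        if len(queueData) > 0:
            # Race: anything the producer appends to the old list between
            # the next two lines is lost when queueData is rebound to []
            queueProceed = queueProceed + queueData
            queueData = []
            print(f'Length {len(queueProceed)}, last value {queueProceed[-1]}')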

# Run it and get random results in the output
if __name__ == '__main__':
    th1 = th.Thread(target=threadAddQueue)
    th1.start()

    th2 = th.Thread(target=threadProcQueue)
    th2.start()

Output example

Length 33021, last value 33020
Length 52147, last value 52147
Length 71771, last value 71772
Length 106174, last value 106175
Length 143665, last value 143667
Length 184920, last value 184922
Length 223584, last value 223587
Length 242001, last value 242004
Length 277876, last value 277879
Length 313757, last value 313761
Length 331471, last value 351468
Length 374527, last value 394524
Length 393301, last value 413299
Length 408631, last value 428629
Length 425010, last value 464092
Length 441459, last value 502028
Length 479035, last value 557692
Length 498091, last value 597529
Length 536072, last value 652986
Length 572724, last value 709981
Length 609640, last value 764660
Length 646480, last value 824379
Length 682897, last value 881066
Length 724371, last value 943361
Length 761276, last value 997796


At the same time, I can't pause the first thread for long, because another part of the program would hang.
How do I properly process the queue without suspending the first thread and without losing data? I need a pattern based on global variables and threads (not asyncio).

3 answers
Kirill Petrov, 2020-11-17
@Recosh

In the end, I solved it like this:

import threading as th
from collections import deque

lenAllData = 1000000

class ClassForQueue:
    queueData = deque()

def threadAddQueue():
    # Simulate filling the queue
    print('Start added to Queue')
    for i in range(lenAllData):
        ClassForQueue.queueData.append(i)
    print('All added to Queue')


def threadProcQueue():
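    # Simulate processing the queue
    result = deque()
    while True:
        if len(ClassForQueue.queueData):
            result.append(ClassForQueue.queueData.popleft())
            # Check only after a successful pop: result[-1] on an empty deque
            # raises IndexError if the consumer runs before anything was added
            if result[-1] == lenAllData - 1:
                print(f'final! Length {len(result)}, last value {result[-1]}')
                break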

# Run it and get consistent data in the output
if __name__ == '__main__':
    th1 = th.Thread(target=threadAddQueue)
    th1.start()

    th2 = th.Thread(target=threadProcQueue)
    th2.start()

As it turned out, Queue slowed the threads down a lot. Since I only need to share the data with one other thread, I decided to try deque, and it ran even faster than the original example.
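A minimal sketch of the same deque idea, assuming the consumer is told that the stream has ended by a sentinel object rather than by checking for the value lenAllData - 1. The `_DONE` marker and `run_deque_pipeline` are hypothetical names, not part of the answer above. It relies on `deque.append` and `deque.popleft` being thread-safe, as the `collections` documentation states, and it still polls like the original, yielding briefly when the deque is empty.

```python
import threading
import time
from collections import deque

_DONE = object()  # hypothetical end-of-stream marker; never equal to real data


def run_deque_pipeline(n_items: int) -> list:
    shared: deque = deque()  # single-producer / single-consumer buffer
    result: list = []

    def producer() -> None:
        for i in range(n_items):
            shared.append(i)  # deque.append is documented as thread-safe
        shared.append(_DONE)  # tell the consumer that nothing more is coming

    def consumer() -> None:
        while True:
            try:
                item = shared.popleft()  # thread-safe; raises IndexError when empty
            except IndexError:
                time.sleep(0)  # nothing queued yet: yield briefly and poll again
                continue
            if item is _DONE:
                break
            result.append(item)

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return result


if __name__ == "__main__":
    data = run_deque_pipeline(1_000_000)
    print(f"final! Length {len(data)}, last value {data[-1]}")
```

Checking the value of the last item, as the answer does, only works when the final value is known in advance; a sentinel works for any data.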

bbkmzzzz, 2020-11-17
@bbkmzzzz

Queue will come to the rescue
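A minimal sketch with the standard-library `queue.Queue`, assuming `None` never occurs as real data so it can serve as an end-of-stream sentinel (the function names are illustrative). With the default `maxsize=0` the queue is unbounded, so `put()` never waits for free space and the producer is not paused, while `get()` waits on a condition variable until an item arrives instead of spinning. `queue.Queue` wraps a deque with a mutex and condition variables, and that per-item locking is a plausible reason the first answer found it slower than a bare deque.

```python
import queue
import threading

_DONE = None  # assumption: None never appears as real data, so it marks end-of-stream


def producer(q: queue.Queue, n_items: int) -> None:
    for i in range(n_items):
        q.put(i)  # unbounded queue: put() never waits for free space
    q.put(_DONE)


def consumer(q: queue.Queue, out: list) -> None:
    while True:
        item = q.get()  # blocks without busy-waiting until an item is available
        if item is _DONE:
            break
        out.append(item)


def run_queue_pipeline(n_items: int) -> list:
    q: queue.Queue = queue.Queue()  # maxsize=0 (the default) means unbounded
    out: list = []
    t1 = threading.Thread(target=producer, args=(q, n_items))
    t2 = threading.Thread(target=consumer, args=(q, out))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return out


if __name__ == "__main__":
    data = run_queue_pipeline(1_000_000)
    print(f"Length {len(data)}, last value {data[-1]}")
```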

Vladimir Korotenko, 2020-11-17
@firedragon

The standard solution in any language is either locking around access to the data or lock-free primitives. In my last project, I used a single dedicated thread to fetch the data and pollers that simply return the current value. This raised throughput from a miserable 5 to 400 RPS.
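The locking approach can be applied directly to the question's original swap pattern. This is a minimal sketch, assuming a single producer and a consumer that polls every 10 ms (an arbitrary interval). The producer holds the lock only for one `append`, and the consumer swaps the whole list out under the lock, which takes constant time, then processes the batch outside it, so no appended item can fall between the read and the reset. The names mirror the question but are illustrative.

```python
import threading
import time

lock = threading.Lock()
queue_data: list = []  # shared buffer; only touch it while holding `lock`


def thread_add_queue(n_items: int) -> None:
    for i in range(n_items):
        with lock:  # held for a single append, so the producer is never stalled for long
            queue_data.append(i)


def take_batch() -> list:
    """Take everything collected so far and leave an empty buffer behind."""
    global queue_data
    with lock:
        batch, queue_data = queue_data, []  # constant-time swap while holding the lock
    return batch


def run_lock_pipeline(n_items: int, poll_interval: float = 0.01) -> list:
    processed: list = []
    producer = threading.Thread(target=thread_add_queue, args=(n_items,))
    producer.start()
    while producer.is_alive():
        time.sleep(poll_interval)  # arbitrary polling period
        processed.extend(take_batch())  # the batch is processed outside the lock
    producer.join()
    processed.extend(take_batch())  # collect whatever arrived after the last poll
    return processed


if __name__ == "__main__":
    result = run_lock_pipeline(1_000_000)
    print(f"Length {len(result)}, last value {result[-1]}")
```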
