Python
ARTISshoque, 2019-02-15 17:30:52

How to use nested lists with multiprocessing in Python 3?

For example, I have a dictionary:

D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}

and I want to process it in several processes. To keep the example simple, let's just multiply every list element by 10 (the real function is more expensive, which is why multiprocessing makes sense). Here is an outline of the code:
import multiprocessing

PROCESSES = 4

class Worker(multiprocessing.Process):

    def __init__(self, work_queue):
        super().__init__()
        self.work_queue = work_queue

    def run(self):
        while True:
            try:
                key, index = self.work_queue.get()
                self.process(key, index)
            finally:
                self.work_queue.task_done()

    def process(self, key, index):
        D[key][index] = D[key][index] * 10
        # This is complete nonsense, but I just want to show the result I need


def main():
    work_queue = multiprocessing.JoinableQueue()
    for i in range(PROCESSES):
        worker = Worker(work_queue)
        worker.daemon = True
        worker.start()
    for key, value in D.items():
        for i in range(len(value)):
            work_queue.put((key, i))
    work_queue.join()

main()

The problem is that I don't see how to pass a dictionary with nested lists to the worker processes so that each of them can store its results back in that dictionary. If I use Manager().dict(), simple values (strings, numbers) are preserved, but lists are not.
Please help me sort this out.

1 answer(s)
Roman Kitaev, 2019-02-15
@ARTISshoque

You usually don't need to write such low-level code.

from concurrent.futures import ProcessPoolExecutor


D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}


def process(arg):
    key, values = arg
    return key, [v * 10 for v in values]


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        result = executor.map(process, D.items())

    print(dict(result))
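
If the work has to be split per element, as with the (key, index) pairs in the question, the same approach should still apply. Below is a minimal sketch under that assumption; the names process_element and run are made up for illustration. Each task carries its own value and the parent process rebuilds the dictionary, so no shared state is needed.

```python
from concurrent.futures import ProcessPoolExecutor

D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}


def process_element(task):
    # task is (key, index, value); return the position together with the new value
    key, index, value = task
    return key, index, value * 10


def run(data, max_workers=None):
    # One task per list element, like the (key, index) pairs in the question
    tasks = [(key, i, v) for key, values in data.items() for i, v in enumerate(values)]
    # Start from a copy so the input dictionary is left untouched
    result = {key: list(values) for key, values in data.items()}
    with ProcessPoolExecutor(max_workers) as executor:
        # Results come back in the parent process, which writes them in place
        for key, index, value in executor.map(process_element, tasks):
            result[key][index] = value
    return result


if __name__ == "__main__":
    print(run(D))
```

With many tiny tasks, the cost of sending each one to a worker can outweigh the gain; passing chunksize to executor.map is one way to reduce that overhead.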

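To check that the tasks really run in parallel and don't block each other, add a print and a sleep to process and limit the pool to two workers. The three one-second tasks should then finish in about two seconds rather than three: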
from concurrent.futures import ProcessPoolExecutor
from time import sleep


D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}


def process(arg):
    key, values = arg
    print("executing", key, values)
    sleep(1)
    return key, [v * 10 for v in values]


if __name__ == "__main__":
    with ProcessPoolExecutor(2) as executor:
        result = executor.map(process, D.items())

    print(dict(result))
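
As for Manager().dict(): the lists are most likely lost because reading shared[key] hands the worker a copy of the list, and changing that copy in place is never reported back to the manager. Only an assignment to shared[key] is. A minimal sketch, assuming one worker per key (so that no two processes write the same key); scale_key is a made-up name:

```python
from multiprocessing import Manager, Process


def scale_key(shared, key):
    # shared[key][i] *= 10 would only modify a local copy of the list.
    # Build the new list locally and assign it back in one step instead;
    # the assignment is what the manager actually sees.
    shared[key] = [v * 10 for v in shared[key]]


if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict({'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]})
        workers = [Process(target=scale_key, args=(shared, key)) for key in shared.keys()]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        # Copy into a plain dict before the manager shuts down
        print(dict(shared))
```

If several processes must update the same list, this read-modify-write needs a lock. Since Python 3.6 it should also be possible to store manager.list() objects inside the manager.dict(), in which case in-place element updates go through the nested proxy.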
