Python
Maxim Vasiliev, 2016-02-18 05:04:29

How to organize iterative-recursive parallel processing?

Given:

  • a huge amount of data, which is read iteratively
  • each item should be processed in a separate subprocess
  • processing one item can recursively generate new tasks (not many of them), and those should be processed in parallel too
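For reference, the required behaviour can be written as a single-process worklist loop. This is a minimal sketch, not the author's code: `process_all` and the example consumer `split` are hypothetical names, and the consumer is assumed to return an iterable of new tasks or `None`, the same contract the answer below relies on.

```python
from collections import deque

_DONE = object()  # sentinel: distinguishes "input exhausted" from any real item


def process_all(producer, consumer):
    """Run consumer on every input item and every task it spawns; return the task count."""
    pending = deque()  # recursively generated tasks not yet processed
    inputs = iter(producer)
    count = 0
    while True:
        # Drain generated subtasks first, then pull the next input item lazily.
        item = pending.popleft() if pending else next(inputs, _DONE)
        if item is _DONE:
            return count
        count += 1
        children = consumer(item)
        if children is not None:
            pending.extend(children)


def split(n):
    # Hypothetical consumer: any n > 1 spawns two smaller tasks.
    return [n // 2, n - n // 2] if n > 1 else None


print(process_all(range(1, 4), split))  # 9
```

Any parallel version should produce the same set of processed tasks as this loop, which makes it a convenient baseline for testing.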

Obviously, some combination of multiprocessing.Pool and multiprocessing.Queue is needed.
With Pool, you can use the map family of methods with chunksize set to the number of subprocesses; then the iterator should be asked for roughly as many tasks as there are free processors:
pool = Pool(num_workers)
for r in pool.map(data_handler, data_input_iter, num_workers): pass

But it is not clear how to feed recursively generated tasks back into the pool here.
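One caveat about the snippet above: in CPython, `Pool.map` converts an iterable that has no `__len__` into a list before splitting it into chunks, so `chunksize` does not make it read the input on demand. A common way to get the throttling described above is to submit items one at a time with `apply_async` and cap the number of pending tasks with a semaphore. This is a sketch of that swapped-in technique; `throttled_map` and `square` are hypothetical names, and the pool is passed in as a parameter so the same function also works with `multiprocessing.pool.ThreadPool` for quick testing.

```python
import threading
from multiprocessing import Pool


def throttled_map(pool, func, items, max_in_flight):
    """Apply func to every item, keeping at most max_in_flight tasks pending.

    The input iterator is advanced only about as fast as tasks complete, so a
    huge input is never materialized. Results come back in completion order.
    """
    slots = threading.BoundedSemaphore(max_in_flight)
    results, errors = [], []

    def on_done(result):       # runs in the pool's result-handler thread
        results.append(result)
        slots.release()

    def on_error(exc):         # also runs in the result-handler thread
        errors.append(exc)
        slots.release()

    for item in items:
        slots.acquire()        # blocks while max_in_flight tasks are pending
        pool.apply_async(func, (item,), callback=on_done, error_callback=on_error)

    for _ in range(max_in_flight):
        slots.acquire()        # returns once every submitted task has finished
    if errors:
        raise errors[0]
    return results


def square(x):
    return x * x


if __name__ == "__main__":
    with Pool(4) as pool:
        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        print(sorted(throttled_map(pool, square, range(10), max_in_flight=4)))
```

Waiting by re-acquiring every slot avoids keeping a growing list of `AsyncResult` objects for a long input.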
With Queue, you can limit its size to the number of subprocesses; then filling it blocks roughly until the next handler becomes free:
queue = Queue(num_workers)
for datum in data_input_iter:
  queue.put(datum, block=True)

But then a handler subprocess that tries to put new tasks into the same bounded queue will block as well.
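The blocking described above is easy to reproduce in a single process. In this sketch (`try_put` is a hypothetical helper), a `multiprocessing.Queue` bounded to `num_workers` items refuses a third `put`: with a timeout it raises `queue.Full`, and with the default `block=True` and no timeout, a handler trying to enqueue a subtask would wait until something calls `get()`.

```python
import multiprocessing as mp
import queue


def try_put(q, item, timeout=0.1):
    """Return True if item was queued, False if q stayed full for `timeout` seconds."""
    try:
        q.put(item, block=True, timeout=timeout)
        return True
    except queue.Full:  # multiprocessing.Queue reuses the queue module's Full
        return False


num_workers = 2
q = mp.Queue(num_workers)  # bounded, as in the snippet above
print(try_put(q, "input 1"), try_put(q, "input 2"))  # True True
# A handler trying to enqueue a subtask now finds the queue full.
print(try_put(q, "subtask"))  # False
```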

1 answer(s)
Maxim Vasiliev, 2016-02-18
@qmax

In the end, I came up with this contraption.
How viable it is remains to be seen.

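import multiprocessing as mp


class Worker():
    """Picklable callable that runs inside the pool processes."""

    def __init__(self, consumer, queue):
        self.consumer = consumer  # must be a top-level (picklable) function
        self.queue = queue        # Manager queue proxy shared with the parent

    def __call__(self, inp):
        if inp is None:
            return
        res = self.consumer(inp)
        # The consumer may return new tasks; hand them back to the parent.
        if res is not None:
            for r in res:
                self.queue.put(r)


def multiprocess(producer, consumer, num_workers):
    pool = mp.Pool(num_workers)
    manager = mp.Manager()  # keep a reference so it can be shut down explicitly
    queue = manager.Queue()
    worker = Worker(consumer, queue)

    # After each input item completes, run the subtasks it queued (and theirs).
    for _ in pool.imap_unordered(worker, producer, num_workers):
        while not queue.empty():
            # apply() blocks until this subtask has finished
            pool.apply(worker, (queue.get(),))

    pool.close()
    pool.join()
    manager.shutdown()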
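In the function above, `pool.apply` is synchronous: the parent waits for each queued subtask to finish before taking the next one, so recursively generated tasks run one at a time rather than in parallel. Below is a sketch of an alternative (a different technique, not the author's code): the consumer returns its subtasks as its result, every task is submitted with `apply_async`, and callbacks report completions to the parent through a `queue.Queue`. `run_recursive`, `split` and `max_in_flight` are hypothetical names; `max_in_flight` also limits how far ahead of the workers the input iterator is read.

```python
import queue
from multiprocessing import Pool


def run_recursive(pool, consumer, producer, max_in_flight):
    """Run consumer on every producer item and every subtask it returns, in parallel.

    consumer(item) must return an iterable of new items or None.
    Returns the number of tasks executed.
    """
    done = queue.Queue()   # (ok, value) pairs posted by the pool's callbacks
    pending = []           # subtasks returned by finished tasks, not yet submitted
    inputs = iter(producer)
    exhausted = False
    in_flight = executed = 0

    while True:
        # Fill free slots: queued subtasks first, then fresh input items.
        while in_flight < max_in_flight:
            if pending:
                item = pending.pop()
            elif not exhausted:
                try:
                    item = next(inputs)
                except StopIteration:
                    exhausted = True
                    continue
            else:
                break
            pool.apply_async(
                consumer, (item,),
                callback=lambda res: done.put((True, res)),
                error_callback=lambda exc: done.put((False, exc)),
            )
            in_flight += 1
        if in_flight == 0:  # nothing running and nothing left to submit
            return executed
        ok, value = done.get()  # block until any task finishes
        in_flight -= 1
        executed += 1
        if not ok:
            raise value
        if value is not None:
            pending.extend(value)


def split(n):
    # Hypothetical consumer: any n > 1 spawns two smaller subtasks.
    return [n // 2, n - n // 2] if n > 1 else None


if __name__ == "__main__":
    with Pool(4) as pool:
        print(run_recursive(pool, split, range(1, 4), max_in_flight=8))  # 9
```

Because subtasks travel back as ordinary return values, no Manager queue is needed. The consumer must still be a top-level function so that `Pool` can pickle it.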
