A
A
Alexander Lebedev2017-12-23 15:05:32
Python
Alexander Lebedev, 2017-12-23 15:05:32

How to get values ​​from multiple concurrently running processes in Python?

Good day.
There is a code that simultaneously launches several asynchronous processes according to the list. It is necessary to get the result from each process, and update the status in the database. Code example:

from time import sleep
from multiprocessing import Process, Value
import subprocess

def worker_email(keyword, func_result):
    subprocess.Popen(["python", "mongoworker.py", str(keyword)])
    func_result.value = 1
    return True

keywords_list = ['apple', 'banana', 'orange', 'strawberry']

if __name__ == '__main__':
    for keyword in keywords_list:
        # Выполняю задачу
        func_result = Value('i', 0)
        p = Process(target=worker_email, args=(keyword,func_result))
        p.start()
        # Обновляю статус задачи
        if func_result.value == 1:
            stream.update_one({'_id': doc['_id']}, {"$set": {"status": True}}, upsert=False)

What is the problem: if you use p.join(), then everything works, but the processes are executed in turn. If not used, then the processes are not closed and the status is not updated. A working option is to use p.join(), but execute not the function code, but subprocess.Popen, but this looks somehow obscene.
In fact, I would appreciate any advice :)

Answer the question

In order to leave comments, you need to log in

1 answer(s)
A
Alexander Lebedev, 2017-12-23
@sortarage

I decided by transferring the connection to the database and checking the result in the worker function itself. More or less like this:

def worker_email(keyword, task_id):

    # Соединяюсь с базой
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    stream = db.stream

    sleep(10)
    print('Yo:' + keyword)

    # Обновляю статус задачи на "Выполнено" (если все ок) или не меняю статус и отправляю на повторое выполнение (если не ок)
    if True:
        stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False)

    # Отключаюсь от базы
    client.close()
    return True

UPD: A more detailed version:
def update_status(task_id, func_result):
    # Соединяюсь с базой
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    stream = db.stream

    # Обновляю статус задачи на "Выполнено" (если все ок) или не меняю статус и отправляю на повторое выполнение (если не ок)
    if func_result:
        stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False)

    # Отключаюсь от базы
    client.close()

def yo_func(keyword):
    sleep(10)
    print('Yo:' + keyword)
    return True

def worker_email(keyword, task_id):
    update_status(task_id, yo_func(keyword))

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question