Python
Ranc58, 2020-04-20 23:25:07

What is the correct way to use ProcessPoolExecutor with aiohttp?

There is an aiohttp server that runs a CPU-bound task in a separate process. A simplified example:

import asyncio
import time
from concurrent.futures.process import ProcessPoolExecutor


from aiohttp import web


async def task_for_executor(app):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(
        app.process_pool,
        time.sleep, 2
    )
    print("ok")


async def input_queue_listener(app):
    while True:
        await app.input_queue.get()
        app.input_queue.task_done()
        loop = asyncio.get_event_loop()
        loop.create_task(task_for_executor(app))


async def index(request):
    await request.app.input_queue.put(True)
    return web.json_response(data={"status": "ok"}, status=201)


async def startup(app):
    input_queue = asyncio.Queue()
    app.process_pool = ProcessPoolExecutor(3)
    app.input_queue = input_queue
    app.listen_task = asyncio.create_task(
        input_queue_listener(app)
    )


async def shutdown(app):
    app.listen_task.cancel()
    app.process_pool.shutdown()


if __name__ == '__main__':
    app = web.Application()
    app.on_startup.append(startup)
    app.on_shutdown.append(shutdown)
    app.add_routes([web.get('/', index),])
    web.run_app(app)
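For reference, the `run_in_executor` pattern used above can be exercised outside the server; a minimal sketch (the `cpu_task` helper is hypothetical, a stand-in for the real CPU-bound job):

```python
import asyncio
import time
from concurrent.futures.process import ProcessPoolExecutor


def cpu_task(n):
    # Stand-in for a CPU-bound job; sleeps briefly to simulate work.
    time.sleep(0.1)
    return n * 2


async def main():
    # run_in_executor hands cpu_task to a worker process and yields
    # an awaitable, so the event loop stays responsive meanwhile.
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(2) as pool:
        return await loop.run_in_executor(pool, cpu_task, 21)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 42
```

Note that the function submitted to the pool must be picklable (defined at module level), which is why `time.sleep` works directly in the server example above.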


The problem:
When we hit the endpoint, a task is launched in the executor. So far so good. We can hit the endpoint a few more times and launch more tasks. If we stop the server with Ctrl+C while tasks are still being processed (e.g. after sending about 10 requests), everything shuts down cleanly. But when we stop the server AFTER all tasks have completed (i.e. at a moment when nothing is running), these errors appear:
======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
ok
^CProcess SpawnProcess-1:
Process SpawnProcess-3:
Process SpawnProcess-2:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 97, in get
    res = self._recv_bytes()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt


1 answer
Ranc58, 2020-04-21
@Ranc58

The question was resolved by advice from another resource. I'll duplicate it here:


The problem is not that SIGINT goes unhandled, but that it is raised in every worker process, where everything blows up; that's why the trace shows a pile of KeyboardInterrupts instead of none.
If SIGINT is ignored in the workers, everything seems to work. The same approach should work in the aiohttp case.

You need to pass an initializer when creating the pool:
app.process_pool = ProcessPoolExecutor(3, initializer=register_signal_handler)

And the handler function itself (note it requires import signal):
import signal

def register_signal_handler():
    signal.signal(signal.SIGINT, lambda _, __: None)
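The initializer runs once in each worker process before it accepts work, so the workers stop reacting to Ctrl+C and only the parent handles shutdown. A standalone sketch demonstrating this (the `sigint_is_ignored` helper is hypothetical, added only to verify the effect; `signal.SIG_IGN` is equivalent to the no-op lambda above):

```python
import signal
from concurrent.futures.process import ProcessPoolExecutor


def register_signal_handler():
    # Runs once in every worker process at startup:
    # ignore SIGINT so Ctrl+C only interrupts the parent.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def sigint_is_ignored():
    # Executed inside a worker: report whether SIGINT is ignored there.
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


if __name__ == "__main__":
    with ProcessPoolExecutor(2, initializer=register_signal_handler) as pool:
        print(pool.submit(sigint_is_ignored).result())  # prints True
```

In the server from the question, only the pool construction in startup() changes; the rest of the code stays as-is.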
