Answer the question
In order to leave comments, you need to log in
What is the correct way to use ProcessPoolExecutor with aiohttp?
There is an aiohttp server that runs some cpu bound task in a separate process. Simplified example below:
import asyncio
import time
from concurrent.futures.process import ProcessPoolExecutor
from aiohttp import web
async def task_for_executor(app):
loop = asyncio.get_event_loop()
await loop.run_in_executor(
app.process_pool,
time.sleep, 2
)
print("ok")
async def input_queue_listener(app):
while True:
await app.input_queue.get()
app.input_queue.task_done()
loop = asyncio.get_event_loop()
loop.create_task(task_for_executor(app))
async def index(request):
await request.app.input_queue.put(True)
return web.json_response(data={"status": "ok"}, status=201)
async def startup(app):
input_queue = asyncio.Queue()
app.process_pool = ProcessPoolExecutor(3)
app.input_queue = input_queue
app.listen_task = asyncio.create_task(
input_queue_listener(app)
)
async def shutdown(app):
app.listen_task.cancel()
app.process_pool.shutdown()
if __name__ == '__main__':
app = web.Application()
app.on_startup.append(startup)
app.on_shutdown.append(shutdown)
app.add_routes([web.get('/', index),])
web.run_app(app)
Ctrl+C
во время обработки задач (например послать штук 10 запросов) - все заверишься нормально. Но когда мы завершаем работу сервера ПОСЛЕ выполнения всех задач (т.е. в момент когда ничего не выполняется) - то вылетают ошибки:======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
ok
^CProcess SpawnProcess-1:
Process SpawnProcess-3:
Process SpawnProcess-2:
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
Answer the question
In order to leave comments, you need to log in
Вопрос решился советом с другого ресурса. Продублирую сюда:
Проблема не в том, что sigint не обрабатывается, а в том, что он поднимается в каждый форк и там все взрывается, поэтому в трейсе нарисована куча KeyboardInterrupt вместо нуля.
А если sigint в форках игнорировать, то похоже что все ништяк. По идее в кейсе с aiohttp то же самое должно прокатить.
initializer
при запуске пула:app.process_pool = ProcessPoolExecutor(3, initializer=register_signal_handler)
def register_signal_handler():
signal.signal(signal.SIGINT, lambda _, __: None)
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question