Skip to main content

Python concurrent.futures


ProcessPoolExecutor

import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")

if index % 3 == 0:
raise RuntimeError(f"task {index} error")

time.sleep(3 - index % 3)

return index


with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures = [executor.submit(task, *args) for args in [(i,) for i in range(10)]]
print("submit done")

# 완료된 작업을 가져옴
for future in as_completed(futures):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
result = future.result()
print(f"task {result} done")
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")
import os
import time
from concurrent.futures import Future, ProcessPoolExecutor, wait


def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")

if index % 3 == 0:
raise RuntimeError(f"task {index} error")

time.sleep(3 - index % 3)

return index


def callback(fut: Future[int]):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
result = fut.result()
print(f"task {result} done")
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")


with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures: list[Future[int]] = []
for args in [(i,) for i in range(10)]:
future = executor.submit(task, *args)
future.add_done_callback(callback)
futures.append(future)
print("submit done")

# 모든 작업이 완료될 때까지 대기
wait(futures)
warning

ProcessPoolExecutor를 사용할 때, timeout 기능을 제공하는 Future의 메서드나, 이를 인자로 받는 함수를 사용하면 프로그램 상에서는 timeout된 것으로 보이지만 실제로는 작업이 계속 진행되는 문제가 있습니다.

timeout이 필요한 경우, ProcessPoolExecutor._processes를 직접 사용하여 Process를 종료시키거나 multiprocessing 모듈로 프로그램을 작성해야합니다.

ProcessPoolExecutor with asyncio

asyncio로 프로그래밍을 하다보면 CPU-bound 작업에 의해 이벤트 루프가 블로킹되어 다른 작업이 실행되지 않는 경우가 있습니다. 이를 해결하기위해 ProcessPoolExecutor만 사용하는 경우에도 이벤트 루프가 블로킹되기 때문에, loop.run_in_executor를 사용하여 함수 실행을 awaitable로 변경하는 작업을 해줘야합니다.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def task(index: int) -> int:
print(f"task {index} start in {os.getpid()}")

if index % 3 == 0:
raise RuntimeError(f"task {index} error")

time.sleep(10 - index % 3)

return index

async def cpu_bound_task() -> list[int]:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=3) as executor:
# Non-blocking
# pool 크기에 상관 없이 작업 자체는 바로 제출 됨
futures = [
loop.run_in_executor(executor, task, *args) for args in [(i,) for i in range(10)]
]
print("submit done")

results = []
for future in asyncio.as_completed(futures):
try:
# 에러 없이 task가 완료되었을 때 결과를 가져옴
# 기다리는 동안 다른 async 작업이 실행될 수 있음
result = await future
print(f"task {result} done")

results.append(result)
except BaseException as e:
# task를 실행하는 도중 발생한 에러 처리
print(f"exception: {e}")

return results

자식 프로세스에서 signal 핸들링

프로세스가 생성될 때,

  • fork의 경우 자식 프로세스는 부모의 모든 자원을 상속받습니다. 따라서 생성되는 시점의 부모 프로세스의 signal handler를 상속받습니다.
  • spawn의 경우 자식 프로세스는 필요한 자원만 상속받습니다.

자식 프로세스가 시그널에 의해 종료되는 것을 막아야하는 경우가 있는데, 이를 위해 Process 시작 시 signal handler를 무시하도록 설정해야합니다.

import signal
from concurrent.futures import ProcessPoolExecutor


def ignore_signal() -> None:
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)


with ProcessPoolExecutor(5, initializer=ignore_signal) as executor:
...