asyncio와 스레드 안전한 Python¶
asyncio는 이벤트 루프를 스케줄러로 사용하여 작업 간 전환을 통해 높은 효율성의 동시성을 활성화하며, 이를 통해 논블로킹 I/O 작업을 수행할 수 있게 합니다. 이는 I/O 바운드 사용 사례의 성능을 향상시킵니다. 또한 CPU 바운드 작업을 스레드 또는 프로세스 풀로 오프로드할 수 있지만, 이는 여전히 CPython의 :term:`글로벌 인터프리터 잠금`에 의해 제한됩니다.
하지만 :ref:`무료 스레드 Python <freethreading-python-howto>`에서는 GIL이 비활성화되어 Python이 진정한 다중 스레드 코드를 실행할 수 있습니다. 이는 asyncio가 GIL이 부과하는 제한 없이 여러 CPU 코어의 이점을 취할 수 있음을 의미합니다.
Python 3.14부터 asyncio는 무료 스레드 Python에 대한 클래스 구현 지원이 기본 기능이 되었으며, asyncio 구현은 멀티 스레드 환경에서도 안전하게 사용할 수 있습니다.
단일 이벤트 루프는 한 코어에서 많은 연결을 동시에 처리할 수 있지만, 각 연결을 처리하기 위해 실행되는 Python 코드는 여전히 순차적으로 실행됩니다. 요청에 사소하지 않은 양의 요청당 계산이 포함되면, 그 처리가 병목 현상이 되고 단일 코어로는 더 이상 따라가지 못하게 됩니다. asyncio를 스레드와 결합하는 것이 여기서 가장 유용합니다: 스레드당 이벤트 루프를 실행함으로써, 다양한 요청 처리가 여러 CPU 코어에서 병렬로 실행될 수 있습니다. 또한 asyncio 애플리케이션에서 블로킹하거나 CPU 바운드 코드를 실행해야 할 때도 유용합니다.
더 보기
`Scaling asyncio on Free-Threaded Python <https://labs.quansight.org/blog/scaling-asyncio-on-free-threaded-python>`은 Kumar Aditya가 작성한 블로그 게시물로, 무료 스레드 Python 환경에서 asyncio를 안전하고 효율적으로 만드는 내부 변경 사항과 그 결과로 개선된 벤치마크를 설명합니다.
스레드 안전성 고려 사항¶
asyncio는 무료 스레드 Python 환경에서 스레드 안전하도록 설계되었지만, asyncio를 스레드와 함께 사용할 때 여전히 염두에 두어야 할 몇 가지 고려 사항이 있습니다:
이벤트 루프: 각 스레드는 공유해서는 안 되는 자신만의 이벤트 루프를 가져야 합니다. 이렇게 해야 이벤트 루프가 다른 스레드의 간섭 없이 자체 작업을 관리하고 콜백을 처리할 수 있습니다.
작업 관리: 한 스레드에서 생성된 작업(Task)과 Future는 다른 스레드에서 await 하거나 조작해서는 안 됩니다.
스레드 안전 API: 여러 스레드에서 asyncio와 상호 작용할 때는, 다른 스레드에서 이벤트 루프로 코루틴을 제출하기 위해 :func:`asyncio.run_coroutine_threadsafe`와 같은 asyncio가 제공하는 스레드 안전 API를 사용하는 것이 중요합니다. 다른 스레드에서 콜백을 호출해야 하는 경우, :meth:`loop.call_soon_threadsafe`를 사용하여 안전하게 예약할 수 있습니다.
동기화: asyncio가 제공하는 동기화 프리미티브(예:
asyncio.Lock및asyncio.Event)는 여러 스레드에 걸쳐 사용하도록 설계되지 않았습니다. 스레드 간 동기화가 필요한 경우 대신threading모듈의 동기화 프리미티브를 사용해야 합니다.
asyncio와 스레드 사용하기¶
asyncio는 각 스레드당 하나의 이벤트 루프를 실행하는 것을 지원하여, 무료 스레드 Python 환경에서 여러 CPU 코어의 이점을 활용할 수 있게 합니다. 각 스레드가 자체 이벤트 루프를 실행할 수 있으며, 작업을 독립적으로 예약할 수 있습니다.
asyncio와 스레드를 사용하는 방법의 예시입니다:
import asyncio
import threading
async def worker(name: str) -> None:
print(f"Worker {name} 시작")
await asyncio.sleep(1)
print(f"Worker {name} 완료")
def run_loop(name: str) -> None:
asyncio.run(worker(name))
threads = [
threading.Thread(target=run_loop, args=(f"T{i}",))
for i in range(4)
]
for t in threads:
t.start()
for t in threads:
t.join()
이 예시에서는 각 스레드가 :func:`asyncio.run`을 사용하여 자체 이벤트 루프를 생성하고 그 위에서 코루틴을 실행합니다. 스레드들은 동시에 실행되며, 무료 스레드 빌드에서는 별도의 CPU 코어에서 병렬로 실행될 수 있습니다.
스레드 간 Producer/consumer¶
일반(비-asyncio) 스레드가 다른 스레드에서 실행되는 asyncio 이벤트 루프에 작업을 전달해야 하는 경우, 단일 이벤트 루프 내에서만 안전한 asyncio.Queue 대신 :class:`queue.Queue`와 같은 스레드 안전 프리미티브를 사용해야 합니다.:
import asyncio
import queue
import threading
def producer(q: queue.Queue[int]) -> None:
for i in range(5):
print(f"생산 중 {i}")
q.put(i)
q.shutdown()
async def consumer(q: queue.Queue[int]) -> None:
while True:
try:
item = q.get_nowait()
except queue.Empty:
await asyncio.sleep(0.1)
continue
except queue.ShutDown:
break
print(f"소비: {item}")
await asyncio.sleep(item)
q: queue.Queue[int] = queue.Queue()
consumer_thread = threading.Thread(
target=lambda: asyncio.run(consumer(q))
)
consumer_thread.start()
producer(q)
consumer_thread.join()
프로듀서는 메인 스레드에서 실행되고 소비자는 자체 스레드의 이벤트 루프 내에서 실행되지만, queue.Queue 를 통해 안전하게 통신합니다. 큐가 비어 있으면 소비자는 잠시 대기한 후 다시 시도합니다. 프로듀서가 완료되면 shutdown() 을 호출하여, 후속 get_nowait() 호출이 queue.ShutDown 을 발생시켜 소비자가 깔끔하게 종료할 수 있습니다.