Python

asyncio로 개발하기

비동기 프로그래밍은 고전적인 “순차적” 프로그래밍과 다릅니다.

이 페이지는 흔한 실수와 함정을 나열하고, 이를 피하는 방법을 설명합니다.

디버그 모드

기본적으로 asyncio는 프로덕션 모드로 실행됩니다. 개발을 쉽게 하려고 asyncio에는 디버그 모드를 제공합니다.

여러 가지 방법으로 asyncio 디버그 모드를 활성화할 수 있습니다:

디버그 모드를 활성화하는 것 외에도, 다음을 고려하십시오:

  • asyncio 로거의 로그 수준을 logging.DEBUG로 설정, 예를 들어 응용 프로그램 시작 시 다음 코드 조각을 실행할 수 있습니다:

    logging.basicConfig(level=logging.DEBUG)
    
  • ResourceWarning 경고를 표시하도록 warnings 모듈을 구성. 이렇게 하는 한 가지 방법은 -W default 명령 줄 옵션을 사용하는 것입니다.

디버그 모드가 활성화되면:

  • 많은 스레드 안전하지 않은 asyncio API(loop.call_soon()loop.call_at() 메서드와 같은)가 잘못된 스레드에서 호출될 때 예외를 발생시킵니다.

  • I/O 선택기의 실행 시간은 I/O 연산 수행에 너무 오래 걸리면 로그 됩니다.

  • 100 밀리 초 보다 오래 걸리는 콜백이 로그 됩니다. loop.slow_callback_duration 어트리뷰트는 “느린” 것으로 간주할 최소 실행 시간(초)을 설정하는 데 사용될 수 있습니다.

동시성과 다중 스레드

이벤트 루프는 스레드(일반적으로 주 스레드)에서 실행되며 그 스레드에서 모든 콜백과 태스크를 실행합니다. 태스크가 이벤트 루프에서 실행되는 동안, 다른 태스크는 같은 스레드에서 실행될 수 없습니다. 태스크가 await 표현식을 실행하면, 실행 중인 태스크가 일시 중지되고 이벤트 루프는 다음 태스크를 실행합니다.

다른 OS 스레드에서 콜백을 예약하려면, loop.call_soon_threadsafe() 메서드를 사용해야 합니다. 예:

loop.call_soon_threadsafe(callback, *args)

거의 모든 asyncio 객체는 스레드 안전하지 않습니다. 태스크나 콜백 외부에서 작동하는 코드가 없으면 일반적으로 문제가 되지 않습니다. 그러한 코드가 저수준 asyncio API를 호출해야 하면, loop.call_soon_threadsafe() 메서드를 사용해야 합니다, 예를 들어:

loop.call_soon_threadsafe(fut.cancel)

다른 OS 스레드에서 코루틴 객체를 예약하려면, run_coroutine_threadsafe() 함수를 사용해야 합니다. 결과에 액세스할 수 있도록 concurrent.futures.Future를 반환합니다:

async def coro_func():
     return await asyncio.sleep(1, 42)

# 나중에 다른 OS 스레드에서:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# 결과를 기다립니다:
result = future.result()

시그널을 처리하려면, 이벤트 루프를 메인 스레드에서 실행해야 합니다.

loop.run_in_executor() 메서드는 concurrent.futures.ThreadPoolExecutor 또는 :class:`~concurrent.futures.InterpreterPoolExecutor`와 함께 사용되어 이벤트 루프가 실행되는 OS 스레드를 차단하지 않고 다른 OS 스레드에서 블로킹 코드를 실행할 수 있습니다.

현재 다른 프로세스(가령 multiprocessing으로 시작된 프로세스)에서 직접 코루틴이나 콜백을 예약할 방법은 없습니다. Event loop methods 섹션은 이벤트 루프를 블록하지 않고 파이프를 읽고 파일 기술자를 감시할 수 있는 API를 나열합니다. 또한 asyncio의 서브 프로세스 API는 프로세스를 시작하고 이벤트 루프에서 프로세스와 통신하는 방법을 제공합니다. 마지막으로, 앞서 언급한 loop.run_in_executor() 메서드를 concurrent.futures.ProcessPoolExecutor와 함께 사용하여 다른 프로세스에서 코드를 실행할 수도 있습니다.

블로킹 코드 실행하기

블로킹 (CPU 병목) 코드는 직접 호출하면 안 됩니다. 예를 들어, 함수가 CPU 집약적인 계산을 1초 동안 수행하면, 모든 동시 asyncio 태스크와 IO 연산이 1초 지연됩니다.

실행기는 이벤트 루프의 OS 스레드를 차단하는 것을 방지하기 위해 다른 인터프리터 또는 프로세스에서 작업을 실행하는 데 사용될 수 있습니다. 자세한 내용은 loop.run_in_executor() 메서드를 참조하십시오.

로깅

asyncio는 logging 모듈을 사용하고, 모든 로깅은 "asyncio" 로거를 통해 수행됩니다.

기본 로그 수준은 logging.INFO며, 쉽게 조정할 수 있습니다:

logging.getLogger("asyncio").setLevel(logging.WARNING)

네트워크 로깅은 이벤트 루프를 차단할 수 있습니다. 로그 처리를 위해 별도의 스레드를 사용하거나 비차단 I/O를 사용하는 것이 좋습니다. 예를 들어, :ref:`blocking-handlers`를 참조하십시오.

await 하지 않은 코루틴 감지

코루틴 함수가 호출되었지만 기다리지 않을 때(예를 들어, await coro() 대신 coro())나 코루틴이 asyncio.create_task()로 예약되지 않으면 asyncio가 RuntimeWarning을 방출합니다:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

출력:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

디버그 모드에서의 출력:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

일반적인 수정법은 코루틴을 await 하거나 asyncio.create_task() 함수를 호출하는 것입니다:

async def main():
    await test()

전달되지 않은 예외 감지

Future.set_exception()가 호출되었지만, Future 객체가 await 되지 않으면, 예외는 절대로 사용자 코드로 전파되지 않습니다. 이럴 때, Future 객체가 가비지 수집될 때 asyncio가 로그 메시지를 출력합니다.

처리되지 않은 예외의 예:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

출력:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

태스크가 만들어진 곳의 트레이스백을 얻으려면 디버그 모드를 활성화하세요:

asyncio.run(main(), debug=True)

디버그 모드에서의 출력:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

비동기 제너레이터 모범 사례

정확하고 효율적인 asyncio 코드를 작성하려면 특정 함정들을 인식해야 합니다. 이 섹션은 몇 시간을 들여 디버깅하는 시간을 절약해 줄 수 있는 필수적인 모범 사례들을 요약합니다.

비동기 제너레이터 명시적으로 닫기

비동기 제너레이터 <비동기 제너레이터 이터레이터>`을 수동으로 닫는 것이 권장됩니다. 만약 제너레이터가 조기에 종료되면—예를 들어 ``async for` 루프의 본문에서 예외가 발생한 경우—비동기 정리 코드가 예기치 않은 컨텍스트에서 실행될 수 있습니다. 이는 관련 작업들이 완료된 후 또는 비동기 제너레이터의 가비지 컬렉션 훅이 호출되는 이벤트 루프 종료 중에 발생할 수 있습니다.

이를 피하려면, 해당 제너레이터의 aclose() 메서드를 호출하여 명시적으로 닫거나, contextlib.aclosing() 컨텍스트 관리자를 사용하십시오:

import asyncio
import contextlib

async def gen():
  yield 1
  yield 2

async def func():
  async with contextlib.aclosing(gen()) as g:
    async for x in g:
      break  # 끝까지 이터레이트하지 않음

asyncio.run(func())

앞서 언급했듯이, 이러한 비동기 제너레이터의 정리 작업은 지연됩니다. 다음 예제는 비동기 제너레이터의 최종화가 예상치 못한 순서로 발생할 수 있음을 보여줍니다:

import asyncio
work_done = False

async def cursor():
    try:
        yield 1
    finally:
        assert work_done

async def rows():
    global work_done
    try:
        yield 2
    finally:
        await asyncio.sleep(0.1) # 비동기 작업을 모방
        work_done = True


async def main():
    async for c in cursor():
        async for r in rows():
            break
        break

asyncio.run(main())

이 예제는 다음과 같은 출력을 만듭니다:

unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
  File "example.py", line 6, in cursor
    yield 1
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "example.py", line 8, in cursor
    assert work_done
           ^^^^^^^^^
AssertionError

cursor() 비동기 제너레이터가 rows 제너레이터보다 먼저 최종화되었습니다. 이는 예상치 못한 동작입니다.

이 예제는 cursor``와 ``rows 비동기 제너레이터를 명시적으로 닫아서 고칠 수 있습니다:

async def main():
    async with contextlib.aclosing(cursor()) as cursor_gen:
        async for c in cursor_gen:
            async with contextlib.aclosing(rows()) as rows_gen:
                async for r in rows_gen:
                    break
            break

이벤트 루프가 실행 중일 때만 비동기 제너레이터 생성

:term:`비동기 제너레이터 <비동기 제너레이터 이터레이터>`는 이벤트 루프가 생성된 후에만 생성하는 것이 권장됩니다.

비동기 제너레이터가 안정적으로 닫히도록 보장하기 위해, 이벤트 루프는 콜백 함수를 등록하는 sys.set_asyncgen_hooks() 함수를 사용합니다. 이 콜백 함수들은 실행 중인 비동기 제너레이터 목록을 업데이트하여 일관된 상태를 유지합니다.

loop.shutdown_asyncgens() 함수가 호출되면, 실행 중인 제너레이터들은 우아하게 중지되고 목록은 정리됩니다.

비동기 제너레이터는 첫 번째 반복 중에 해당 시스템 훅을 호출합니다. 동시에, 제너레이터는 해당 훅이 호출되었음을 기록하며 다시 호출하지 않습니다.

따라서, 이벤트 루프가 생성되기 전에 반복이 시작되면, 훅들이 제너레이터가 호출하려고 시도한 후에 설정되기 때문에 이벤트 루프는 제너레이터를 활성 제너레이터 목록에 추가할 수 없습니다. 결과적으로, 필요할 경우 이벤트 루프가 제너레이터를 종료할 수 없습니다.

다음 예를 고려하십시오:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)


with asyncio.Runner() as runner:
    agen = agenfn()
    print(runner.run(anext(agen)))
    del agen

출력:

10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
  File "example.py", line 13, in <module>
    del agen
        ^^^^
RuntimeError: async generator ignored GeneratorExit

이 예제는 다음과 같이 고칠 수 있습니다:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)

async def main():
    agen = agenfn()
    print(await anext(agen))
    del agen

asyncio.run(main())

같은 제너레이터의 동시 반복 및 닫기 피하기

비동기 제너레이터는 다른 __anext__() / athrow() / aclose() 호출이 진행되는 동안 재진입될 수 있습니다. 이는 비동기 제너레이터의 일관되지 않은 상태를 초래하여 오류가 발생할 수 있습니다.

다음 예제를 고려해 봅시다:

import asyncio

async def consumer():
    for idx in range(100):
        await asyncio.sleep(0)
        message = yield idx
        print('received', message)

async def amain():
    agenerator = consumer()
    await agenerator.asend(None)

    fa = asyncio.create_task(agenerator.asend('A'))
    fb = asyncio.create_task(agenerator.asend('B'))
    await fa
    await fb

asyncio.run(amain())

출력:

received A
Traceback (most recent call last):
  File "test.py", line 38, in <module>
    asyncio.run(amain())
    ~~~~~~~~~~~^^^^^^^^^
  File "Lib/asyncio/runners.py", line 204, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "Lib/asyncio/runners.py", line 127, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "Lib/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "test.py", line 36, in amain
    await fb
RuntimeError: anext(): asynchronous generator is already running

따라서, 비동기 제너레이터를 병렬 작업이나 여러 이벤트 루프에 걸쳐 사용하는 것을 피하는 것이 좋습니다.