스트림

스트림은 네트워크 연결로 작업하기 위해, async/await에서 사용할 수 있는 고수준 프리미티브입니다. 스트림은 콜백이나 저수준 프로토콜과 트랜스포트를 사용하지 않고 데이터를 송수신할 수 있게 합니다.

다음은 asyncio 스트림을 사용하여 작성된 TCP 메아리 클라이언트의 예입니다:

import asyncio

async def tcp_echo_client(message):
    async with asyncio.connect('127.0.0.1', 8888) as stream:
        print(f'Send: {message!r}')
        await stream.write(message.encode())

        data = await stream.read(100)
        print(f'Received: {data.decode()!r}')

asyncio.run(tcp_echo_client('Hello World!'))

아래의 예제 절도 참조하십시오.

스트림 함수

다음 최상위 asyncio 함수를 사용하여 스트림을 만들고 작업할 수 있습니다.:

coroutine asyncio.connect(host=None, port=None, *, limit=2**16, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

host : port 주소로 TCP 소켓을 연결하고 StreamMode.READWRITE 모드의 Stream 객체를 반환합니다.

limit는 반환된 Stream 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_connection()로 직접 전달됩니다.

이 함수는 await와 함께 사용하여 연결된 스트림을 얻을 수 있습니다:

stream = await asyncio.connect('127.0.0.1', 8888)

이 함수는 비동기 컨텍스트 관리자로도 사용할 수 있습니다:

async with asyncio.connect('127.0.0.1', 8888) as stream:
    ...

버전 3.8에 추가.

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

네트워크 연결을 만들고 (reader, writer) 객체 쌍을 반환합니다.

반환된 readerwriter 객체는 StreamReaderStreamWriter 클래스의 인스턴스입니다.

loop 인자는 선택적이며 이 함수를 코루틴에서 기다릴 때 언제나 자동으로 결정될 수 있습니다.

limit는 반환된 StreamReader 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_connection()로 직접 전달됩니다.

버전 3.7에 추가: ssl_handshake_timeout 매개 변수.

Deprecated since version 3.8, will be removed in version 3.10: open_connection()은 폐지되었고 connect()로 대신합니다.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=2**16, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

소켓 서버를 시작합니다.

새 클라이언트 연결이 만들어질 때마다 client_connected_cb 콜백이 호출됩니다. 이 콜백은 두 개의 인자로 (reader, writer) 쌍을 받는데, StreamReaderStreamWriter 클래스의 인스턴스입니다.

client_connected_cb는 일반 콜러블이나 코루틴 함수 일 수 있습니다; 코루틴 함수면, 자동으로 Task로 예약됩니다.

loop 인자는 선택적이며, 이 메서드를 코루틴이 기다릴 때 항상 자동으로 결정될 수 있습니다.

limit는 반환된 StreamReader 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_server()로 직접 전달됩니다.

버전 3.7에 추가: ssl_handshake_timeoutstart_serving 매개 변수.

Deprecated since version 3.8, will be removed in version 3.10: start_server()는 폐지되었고 StreamServer로 대신합니다.

coroutine asyncio.connect_read_pipe(pipe, *, limit=2**16)

파일류 객체 pipe를 취해서, StreamReader와 비슷한 API를 가진 StreamMode.READ 모드의 Stream 객체를 반환합니다. 비동기 컨텍스트 관리자로 사용할 수도 있습니다.

limit는 반환된 Stream 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

버전 3.8에 추가.

coroutine asyncio.connect_write_pipe(pipe, *, limit=2**16)

파일류 객체 pipe를 취해서, StreamWriter와 비슷한 API를 가진 StreamMode.WRITE 모드의 Stream 객체를 반환합니다. 비동기 컨텍스트 관리자로 사용할 수도 있습니다.

limit는 반환된 Stream 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

버전 3.8에 추가.

유닉스 소켓

asyncio.connect_unix(path=None, *, limit=2**16, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

path 주소를 갖는 소켓으로 유닉스 소켓 연결을 만들고, 판독기(reader)와 기록기(writer)로 사용할 수 있는 StreamMode.READWRITE 모드의 어웨이터블 Stream 객체를 반환합니다.

limit는 반환된 Stream 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_unix_connection()으로 직접 전달됩니다.

이 함수는 await와 함께 사용하여 연결된 스트림을 얻을 수 있습니다:

stream = await asyncio.connect_unix('/tmp/example.sock')

이 함수는 비동기 컨텍스트 관리자로도 사용할 수 있습니다:

async with asyncio.connect_unix('/tmp/example.sock') as stream:
    ...

가용성: 유닉스.

버전 3.8에 추가.

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

유닉스 소켓 연결을 만들고 (reader, writer) 쌍을 반환합니다.

open_connection()과 비슷하지만, 유닉스 소켓에서 작동합니다.

loop.create_unix_connection()의 설명서도 참조하십시오.

가용성: 유닉스.

버전 3.7에 추가: ssl_handshake_timeout 매개 변수.

버전 3.7에서 변경: path 매개 변수는 이제 경로류 객체가 될 수 있습니다.

Deprecated since version 3.8, will be removed in version 3.10: open_unix_connection()은 폐지되었고, connect_unix()로 대신합니다.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

유닉스 소켓 서버를 시작합니다.

start_server()와 비슷하지만, 유닉스 소켓에서 작동합니다.

loop.create_unix_server()의 설명서도 참조하십시오.

가용성: 유닉스.

버전 3.7에 추가: ssl_handshake_timeoutstart_serving 매개 변수.

버전 3.7에서 변경: path 매개 변수는 이제 경로류 객체가 될 수 있습니다.

Deprecated since version 3.8, will be removed in version 3.10: start_unix_server()는 폐지되었고, UnixStreamServer로 대신합니다.


StreamServer

class asyncio.StreamServer(client_connected_cb, /, host=None, port=None, *, limit=2**16, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, shutdown_timeout=60)

새 클라이언트 연결이 만들어질 때마다 client_connected_cb 콜백이 호출됩니다. 이 콜백은 StreamMode.READWRITE 모드의 Stream 객체를 받습니다.

client_connected_cb는 일반 콜러블이나 코루틴 함수 일 수 있습니다; 코루틴 함수면, 자동으로 Task로 예약됩니다.

limit는 반환된 Stream 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_server()로 직접 전달됩니다.

coroutine start_serving()

지정된 host와 port에 바인드하고 서버를 시작합니다.

coroutine serve_forever()

코루틴이 취소될 때까지 연결을 받아들이기 시작합니다. serve_forever 태스크를 취소하면 서버가 닫힙니다.

이 메서드는 서버가 이미 연결을 받아들이고 있을 때 호출 할 수 있습니다. 하나의 Server 객체 당 하나의 serve_forever 태스크 만 존재할 수 있습니다.

is_serving()

서버가 바인드되어 현재 서빙 중이면 True를 반환합니다.

bind()

지정된 hostport에 서버를 바인드합니다. 이 메서드는 StreamServer가 비동기 컨텍스트 관리자로 사용될 때 __aenter__ 중에 자동으로 호출됩니다.

is_bound()

서버가 바인드 되었으면 True를 반환합니다.

coroutine abort()

연결을 닫고 계류 중인 모든 태스크를 취소합니다.

coroutine close()

연결을 닫습니다. 이 메서드는 StreamServer가 비동기 컨텍스트 관리자로 사용될 때 __aexit__ 중에 자동으로 호출됩니다.

sockets

서버가 바인드된 소켓 객체의 튜플을 반환합니다.

버전 3.8에 추가.

UnixStreamServer

class asyncio.UnixStreamServer(client_connected_cb, /, path=None, *, limit=2**16, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)

새 클라이언트 연결이 만들어질 때마다 client_connected_cb 콜백이 호출됩니다. 이 콜백은 StreamMode.READWRITE 모드의 Stream 객체를 받습니다.

client_connected_cb는 일반 콜러블이나 코루틴 함수 일 수 있습니다; 코루틴 함수면, 자동으로 Task로 예약됩니다.

limit는 반환된 Stream 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_unix_server()로 직접 전달됩니다.

coroutine start_serving()

지정된 host와 port에 바인드하고 서버를 시작합니다.

is_serving()

서버가 바인드되어 현재 서빙 중이면 True를 반환합니다.

bind()

지정된 hostport에 서버를 바인드 합니다. 이 메서드는 UnixStreamServer가 비동기 컨텍스트 관리자로 사용될 때 __aenter__ 중에 자동으로 호출됩니다.

is_bound()

서버가 바인드 되었으면 True를 반환합니다.

coroutine abort()

연결을 닫고 계류 중인 모든 태스크를 취소합니다.

coroutine close()

연결을 닫습니다. 이 메서드는 UnixStreamServer가 비동기 컨텍스트 관리자로 사용될 때 __aexit__ 중에 자동으로 호출됩니다.

sockets

서버가 바인드된 소켓 객체의 튜플을 반환합니다.

가용성: 유닉스.

버전 3.8에 추가.

Stream

class asyncio.Stream

IO 스트림에 데이터를 읽고 쓰는 API를 제공하는 Stream 객체를 나타냅니다. StreamReaderStreamWriter에서 제공하는 API가 포함됩니다.

Stream 객체를 직접 인스턴스로 만들지 마십시오; 대신 connect()StreamServer 같은 API를 사용하십시오.

버전 3.8에 추가.

StreamMode

class asyncio.StreamMode

Stream 객체의 mode를 판단하는 데 사용할 수 있는 값 집합을 정의하는 enum.Flag의 서브 클래스.

READ

스트림 객체는 읽을 수 있으며 StreamReader의 API를 제공합니다.

WRITE

스트림 객체는 쓸 수 있으며 StreamWriter의 API를 제공합니다.

READWRITE

스트림 객체는 읽고 쓸 수 있으며 StreamReaderStreamWriter의 API를 모두 제공합니다.

버전 3.8에 추가.

StreamReader

class asyncio.StreamReader

IO 스트림에서 데이터를 읽는 API를 제공하는 판독기(reader) 객체를 나타냅니다.

StreamReader 객체를 직접 인스턴스로 만드는 것은 권장되지 않습니다. 대신 open_connection()start_server()를 사용하십시오.

coroutine read(n=-1)

최대 n 바이트를 읽습니다. n이 제공되지 않거나 -1로 설정되면, EOF까지 읽은 후 모든 읽은 바이트를 반환합니다.

EOF를 수신했고 내부 버퍼가 비어 있으면, 빈 bytes 객체를 반환합니다.

coroutine readline()

한 줄을 읽습니다. 여기서 "줄"은 \n로 끝나는 바이트의 시퀀스입니다.

EOF를 수신했고, \n를 찾을 수 없으면, 이 메서드는 부분적으로 읽은 데이터를 반환합니다.

EOF를 수신했고, 내부 버퍼가 비어 있으면 빈 bytes 객체를 반환합니다.

coroutine readexactly(n)

정확히 n 바이트를 읽습니다.

n 바이트를 읽기 전에 EOF에 도달하면, IncompleteReadError를 일으킵니다. 부분적으로 읽은 데이터를 가져오려면 IncompleteReadError.partial 어트리뷰트를 사용하십시오.

coroutine readuntil(separator=b'\n')

separator가 발견될 때까지 스트림에서 데이터를 읽습니다.

성공하면, 데이터와 separator가 내부 버퍼에서 제거됩니다 (소비됩니다). 반환된 데이터에는 끝에 separator가 포함됩니다.

읽은 데이터의 양이 구성된 스트림 제한을 초과하면 LimitOverrunError 예외가 발생하고, 데이터는 내부 버퍼에 그대로 남아 있으며 다시 읽을 수 있습니다.

완전한 separator가 발견되기 전에 EOF에 도달하면 IncompleteReadError 예외가 발생하고, 내부 버퍼가 재설정됩니다. IncompleteReadError.partial 어트리뷰트에는 separator 일부가 포함될 수 있습니다.

버전 3.5.2에 추가.

at_eof()

버퍼가 비어 있고 feed_eof()가 호출되었으면 True를 반환합니다.

flowdas

feed_eof() 는 EOF를 수신했을 때 호출되는 내부 메서드입니다.

StreamWriter

class asyncio.StreamWriter

IO 스트림에 데이터를 쓰는 API를 제공하는 기록기(writer) 객체를 나타냅니다.

StreamWriter 객체를 직접 인스턴스로 만드는 것은 권장되지 않습니다. 대신 open_connection()start_server()를 사용하십시오.

write(data)

이 메서드는 하부 소켓에 data를 즉시 기록하려고 시도합니다. 실패하면, data는 보낼 수 있을 때까지 내부 쓰기 버퍼에 계류됩니다.

파이썬 3.8부터는, write() 메서드를 직접 await 할 수 있습니다:

await stream.write(data)

await는 데이터가 소켓에 기록될 때까지 현재 코루틴을 일시 중지합니다.

다음은 파이썬 <= 3.7 에서 작동하는 동등한 코드입니다:

stream.write(data)
await stream.drain()

flowdas

파이썬 3.7까지, 이 메서드는 흐름 제어의 대상이 아니었습니다. 호출 뒤에는 drain()\이 와야 합니다.

버전 3.8에서 변경: await stream.write(...) 문법을 지원합니다.

writelines(data)

이 메서드는 하부 소켓에 바이트열의 리스트(또는 임의의 이터러블)를 즉시 기록합니다. 실패하면, data는 보낼 수 있을 때까지 내부 쓰기 버퍼에 계류됩니다.

파이썬 3.8부터는, write() 메서드를 직접 await 할 수 있습니다:

await stream.writelines(lines)

await는 데이터가 소켓에 기록될 때까지 현재 코루틴을 일시 중지합니다.

다음은 파이썬 <= 3.7 에서 작동하는 동등한 코드입니다:

stream.writelines(lines)
await stream.drain()

flowdas

파이썬 3.7까지, 이 메서드는 흐름 제어의 대상이 아니었습니다. 호출 뒤에는 drain()\이 와야 합니다.

버전 3.8에서 변경: await stream.writelines() 문법을 지원합니다.

close()

이 메서드는 스트림과 하부 소켓을 닫습니다.

파이썬 3.8부터, close() 메서드를 직접 await 할 수 있습니다:

await stream.close()

await는 스트림과 하부 소켓이 닫힐 때까지(그리고 보안 연결에서 SSL 종료가 수행될 때까지) 현재의 코루틴을 일시 중지합니다.

다음은 파이썬 <= 3.7 에서 작동하는 동등한 코드입니다:

stream.close()
await stream.wait_closed()

버전 3.8에서 변경: await stream.close() 문법을 지원합니다.

can_write_eof()

하부 트랜스포트가 write_eof() 메서드를 지원하면 True를 반환하고, 그렇지 않으면 False를 반환합니다.

write_eof()

버퍼링 된 쓰기 데이터가 플러시 된 후에 스트림의 쓰기 끝을 닫습니다.

transport

하부 asyncio 트랜스포트를 돌려줍니다.

get_extra_info(name, default=None)

선택적 트랜스포트 정보에 액세스합니다; 자세한 내용은 BaseTransport.get_extra_info()를 참조하십시오.

coroutine drain()

스트림에 기록을 다시 시작하는 것이 적절할 때까지 기다립니다. 예:

writer.write(data)
await writer.drain()

이것은 하부 IO 쓰기 버퍼와 상호 작용하는 흐름 제어 메서드입니다. 버퍼의 크기가 높은 수위에 도달하면, 버퍼 크기가 낮은 수위까지 내려가서 쓰기가 다시 시작될 수 있을 때까지 drain()은 블록합니다. 기다릴 것이 없으면, drain()은 즉시 반환합니다.

is_closing()

스트림이 닫혔거나 닫히고 있으면 True를 반환합니다.

버전 3.7에 추가.

coroutine wait_closed()

스트림이 닫힐 때까지 기다립니다.

하부 연결이 닫힐 때까지 기다리려면 close() 뒤에 호출해야 합니다.

버전 3.7에 추가.

예제

스트림을 사용하는 TCP 메아리 클라이언트

asyncio.connect() 함수를 사용하는 TCP 메아리 클라이언트:

import asyncio

async def tcp_echo_client(message):
    async with asyncio.connect('127.0.0.1', 8888) as stream:
        print(f'Send: {message!r}')
        await stream.write(message.encode())

        data = await stream.read(100)
        print(f'Received: {data.decode()!r}')

asyncio.run(tcp_echo_client('Hello World!'))

더 보기

TCP 메아리 클라이언트 프로토콜 예제는 저수준 loop.create_connection() 메서드를 사용합니다.

스트림을 사용하는 TCP 메아리 서버

asyncio.StreamServer 클래스를 사용하는 TCP 메아리 서버:

import asyncio

async def handle_echo(stream):
    data = await stream.read(100)
    message = data.decode()
    addr = stream.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    await stream.write(data)

    print("Close the connection")
    await stream.close()

async def main():
    async with asyncio.StreamServer(
            handle_echo, '127.0.0.1', 8888) as server:
        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')
        await server.serve_forever()

asyncio.run(main())

더 보기

TCP 메아리 서버 프로토콜 예제는 loop.create_server() 메서드를 사용합니다.

HTTP 헤더 가져오기

명령 줄로 전달된 URL의 HTTP 헤더를 조회하는 간단한 예제:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        stream = await asyncio.connect(url.hostname, 443, ssl=True)
    else:
        stream = await asyncio.connect(url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    stream.write(query.encode('latin-1'))
    while (line := await stream.readline()):
        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # 바디를 무시하고, 소켓을 닫습니다
    await stream.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

사용법:

python example.py http://example.com/path/page.html

또는 HTTPS를 사용하면:

python example.py https://example.com/path/page.html

스트림을 사용하여 데이터를 기다리는 열린 소켓 등록

asyncio.connect() 함수를 사용하여 소켓이 데이터를 수신할 때까지 기다리는 코루틴:

import asyncio
import socket

async def wait_for_data():
    # 저수준 API에 액세스하기 위해 현재 이벤트 루프에 대한 참조를 가져옵니다.
    loop = asyncio.get_running_loop()

    # 연결된 소켓 쌍을 만듭니다.
    rsock, wsock = socket.socketpair()

    # 데이터를 기다리는 열린 소켓을 등록합니다.
    async with asyncio.connect(sock=rsock) as stream:
        # 네트워크로부터의 데이터 수신을 시뮬레이션합니다
        loop.call_soon(wsock.send, 'abc'.encode())

        # 데이터를 기다립니다
        data = await stream.read(100)

        # 데이터를 받았습니다, 할 일을 마쳤습니다: 소켓을 닫습니다.
        print("Received:", data.decode())

    # 두 번째 소켓을 닫습니다
    wsock.close()

asyncio.run(wait_for_data())

더 보기

프로토콜을 사용하여 데이터를 기다리는 열린 소켓 등록 예제는 저수준 프로토콜과 loop.create_connection() 메서드를 사용합니다.

파일 기술자에서 읽기 이벤트를 관찰하기 예제는 저수준 loop.add_reader() 메서드를 사용하여 파일 기술자를 관찰합니다.