Python

로깅 요리책

이 페이지에는 과거에 유용했던 로깅 관련 여러 레시피가 포함되어 있습니다. 튜토리얼 및 참조 정보 링크는 :ref:`cookbook-ref-links`를 참조하십시오.

여러 모듈에서 로깅 사용하기

logging.getLogger('someLogger') 를 여러 번 호출하면 같은 로거 객체에 대한 참조가 반환됩니다. 같은 모듈 내에서뿐만 아니라, 같은 파이썬 인터프리터 프로세스에 있는 한, 여러 모듈에서도 마찬가지입니다. 참조가 같은 객체를 가리킨다는 것에 더해, 응용 프로그램 코드는 하나의 모듈에서 부모 로거를 정의 및 구성하고 별도의 모듈에서 자식 로거를 생성 (구성하지 않음) 할 수 있으며, 자식에 대한 모든 로거 호출은 부모로 전달됩니다. 다음은 메인 모듈입니다:

import logging
import auxiliary_module

# 로거를 생성합니다.
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# 디버그 메시지를 로그하는 파일 핸들러를 생성합니다.
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# 더 높은 로그 레벨의 콘솔 핸들러를 생성합니다.
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# 포매터를 생성하고 핸들러에 추가합니다.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# 로거에 핸들러를 추가합니다.
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('auxiliary_module.Auxiliary 인스턴스를 생성합니다.')
a = auxiliary_module.Auxiliary()
logger.info('auxiliary_module.Auxiliary 인스턴스가 생성되었습니다.')
logger.info('auxiliary_module.Auxiliary.do_something을 호출합니다.')
a.do_something()
logger.info('auxiliary_module.Auxiliary.do_something 완료되었습니다.')
logger.info('auxiliary_module.some_function()을 호출합니다.')
auxiliary_module.some_function()
logger.info('auxiliary_module.some_function() 완료되었습니다.')

다음은 보조 모듈입니다:

import logging

# 로거를 생성합니다.
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('Auxiliary 인스턴스를 생성합니다.')

    def do_something(self):
        self.logger.info('작업 수행 중입니다.')
        a = 1 + 1
        self.logger.info('작업 수행 완료되었습니다.')

def some_function():
    module_logger.info('"some_function" 호출을 받았습니다.')

출력은 이렇게 됩니다:

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

여러 스레드에서 로깅 하기

여러 스레드에서 로깅 하는데 특별한 노력이 필요하지는 않습니다. 다음 예제에서는 메인 (최초) 스레드와 다른 스레드에서의 로깅을 보여줍니다:

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

실행하면 스크립트는 다음과 같이 인쇄합니다:

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

예상대로 로그 출력이 산재해 있음을 볼 수 있습니다. 물론, 이 방법은 여기에 표시된 것보다 많은 스레드에서도 작동합니다.

다중 처리기 및 포매터

로거는 일반 파이썬 객체입니다. addHandler() 메서드에는 추가할 수 있는 처리기의 수에 대한 최소 또는 최대 할당량이 없습니다. 때로는 응용 프로그램이 모든 심각도의 모든 메시지를 텍스트 파일에 기록하는 동시에, 에러 또는 그 이상을 콘솔에 기록하는 것이 유용 할 수 있습니다. 이렇게 설정하려면, 적절한 처리기를 구성하기만 하면 됩니다. 응용 프로그램 코드의 로깅 호출은 변경되지 않습니다. 다음은 앞의 간단한 모듈 기반 구성 예제를 약간 수정 한 것입니다:

import logging

# 디렉터리 핸들러를 만들고 심지어 디버그 메시지도 기록합니다.
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
// 콘솔 핸들러는  높은 로그 수준을 갖도록 만듭니다.
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)

# 포매터를 만들고 핸들러에 추가합니다.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)

# 핸들러를 로거에 추가합니다.
logger.addHandler(ch)
logger.addHandler(fh)

# '응용 프로그램' 코드
logger.debug('디버그 메시지')
logger.info('정보 메시지')
logger.warning('경고 메시지')
logger.error('오류 메시지')
logger.critical('심각한 메시지')

‘응용 프로그램’ 코드는 여러 처리기에 신경 쓰지 않습니다. 변경된 것은 fh 라는 새로운 처리기의 추가 및 구성뿐입니다.

중요도가 높거나 낮은 필터를 사용하여 새 처리기를 만드는 기능은 응용 프로그램을 작성하고 테스트할 때 매우 유용합니다. 디버깅을 위해 많은 print 문을 사용하는 대신에 logger.debug 를 사용하십시오: 나중에 삭제하거나 주석 처리해야 할 print 문과 달리, logger.debug 문은 소스 코드에서 그대로 유지될 수 있고, 그들을 다시 필요로 할 때까지 휴면 상태로 남아 있습니다. 그때, 필요한 유일한 변경은 로거 또는 처리기의 심각도 수준을 DEBUG로 수정하는 것입니다.

여러 대상으로 로깅 하기

다른 메시지 포맷으로 다른 상황에서 콘솔과 파일에 기록하려고 한다고 가정 해 봅시다. DEBUG 이상 수준의 메시지를 파일에 기록하고, 수준 INFO 이상인 메시지를 콘솔에 기록하려고 한다고 가정 해보십시오. 또한, 타임스탬프가 파일에는 포함되어야 하지만, 콘솔 메시지에는 없어야 한다고 가정합시다. 이렇게 하면 됩니다:

import logging

# 파일에 로깅을 설정합니다 - 자세한 내용은 이전 섹션을 참조하세요.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/tmp/myapp.log',
                    filemode='w')
# sys.stderr로 INFO 메시지 이상의 수준을 기록하는 핸들러를 정의합니다.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# 콘솔 사용에 더 간단한 형식을 설정합니다.
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# 핸들러에게 이 형식을 사용하도록 지시합니다.
console.setFormatter(formatter)
# 루트 로거에 핸들러를 추가합니다.
logging.getLogger().addHandler(console)

# 이제, 루트 로거 또는 기타 어떤 로거로도 로그를 기록할 수 있습니다. 먼저 루트...
logging.info('잭더즈는 나의 큰 수정의 속삭임을 사랑한다.')

# 다음으로, 애플리케이션의 영역을 나타낼 수 있는 몇 가지 다른 로거를 정의합니다:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('빠른 제피로스가 불어와 어릿광대 짐을 괴롭힌다.')
logger1.info('어떻게 빠르게 어릿광대 장작에 걸린 얼룩말이 괴롭게 할까.')
logger2.warning('감옥의 활기찬 여우가 quack에게서 돈을 빼앗았다.')
logger2.error('다섯 마리의 권투사 마법사가 재빨리 점프한다.')

실행하면 콘솔에는 다음과 같이 출력됩니다.

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

파일에는 이렇게 기록됩니다.

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

보시다시피 DEBUG 메시지는 파일에만 표시됩니다. 다른 메시지는 두 목적지로 전송됩니다.

이 예제는 콘솔과 파일 처리기를 사용하지만, 여러분이 선택하는 처리기의 수나 조합에 제약이 없습니다.

위의 로그 파일 이름 /tmp/myapp.log 선택은 POSIX 시스템에서 임시 파일을 위한 표준 위치를 사용함을 의미합니다. Windows에서는 로그를 위해 다른 디렉터리 이름을 선택해야 할 수도 있습니다. 단지 해당 디렉터리가 존재하며 파일 생성 및 업데이트 권한이 있는지 확인하시면 됩니다.

수준의 사용자 정의 처리

때로는 핸들러에서 수준에 대한 표준 처리와 약간 다른 작업을 수행하고 싶을 수 있습니다. 이 경우, 필터를 사용해야 합니다. 다음과 같은 시나리오를 살펴보겠습니다:

  • 심각도 INFOWARNING 수준의 메시지를 sys.stdout 으로 전송합니다.

  • 심각도가 ERROR 이상의 메시지를 sys.stderr 로 전송합니다.

  • 심각도 DEBUG 이상의 메시지를 파일 app.log 으로 전송합니다.

다음 JSON과 같이 로깅을 구성한다고 가정해 봅시다:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout"
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "simple",
            "filename": "app.log",
            "mode": "w"
        }
    },
    "root": {
        "level": "DEBUG",
        "handlers": [
            "stderr",
            "stdout",
            "file"
        ]
    }
}

이 설정은 거의 우리가 원하는 바를 수행하지만, sys.stdout``에서 심각도 ``ERROR 메시지가 표시되고, 이 심각도와 높은 이벤트만 추적되며, 또한 INFOWARNING 메시지도 표시됩니다. 이를 방지하기 위해, 해당 메시지를 제외하는 필터를 설정하고 관련 핸들러에 추가할 수 있습니다. 이는 formattershandlers``와 병렬로 ``filters 섹션을 추가하여 구성할 수 있습니다:

{
    "filters": {
        "warnings_and_below": {
            "()" : "__main__.filter_maker",
            "level": "WARNING"
        }
    }
}

그리고 stdout 핸들러의 섹션을 수정하여 추가합니다:

{
    "stdout": {
        "class": "logging.StreamHandler",
        "level": "INFO",
        "formatter": "simple",
        "stream": "ext://sys.stdout",
        "filters": ["warnings_and_below"]
    }
}

필터는 단순히 함수이므로, 다음과 같이 filter_maker (팩토리 함수)를 정의할 수 있습니다:

def filter_maker(level):
    level = getattr(logging, level)

    def filter(record):
        return record.levelno <= level

    return filter

이것은 전달된 문자열 인수를 숫자 수준으로 변환하고, 주어진 레벨보다 같거나 낮은 레벨의 기록일 경우에만 True 를 반환하는 함수를 반환합니다. 이 예시에서 저는 테스트 스크립트인 main.py 에서 명령줄로 실행하여 filter_maker 를 정의했으므로 해당 모듈은 __main__ 이 됩니다. 따라서 필터 구성에서는 __main__.filter_maker 입니다. 다른 모듈에 정의하면 변경해야 합니다.

필터를 추가한 후, 전체 내용이 다음과 같은 main.py 를 실행할 수 있습니다:

import json
import logging
import logging.config

CONFIG = '''
{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(levelname)-8s - %(message)s"
        }
    },
    "filters": {
        "warnings_and_below": {
            "()" : "__main__.filter_maker",
            "level": "WARNING"
        }
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
            "filters": ["warnings_and_below"]
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "simple",
            "filename": "app.log",
            "mode": "w"
        }
    },
    "root": {
        "level": "DEBUG",
        "handlers": [
            "stderr",
            "stdout",
            "file"
        ]
    }
}
'''

def filter_maker(level):
    level = getattr(logging, level)

    def filter(record):
        return record.levelno <= level

    return filter

logging.config.dictConfig(json.loads(CONFIG))
logging.debug('A DEBUG message')
logging.info('An INFO message')
logging.warning('A WARNING message')
logging.error('An ERROR message')
logging.critical('A CRITICAL message')

그리고 다음과 같이 실행한 후:

`python main.py 2>stderr.log >stdout.log` 실행하여 결과를 확인해 보세요.

결과가 예상한 대로 나온 것을 확인할 수 있습니다:

$ more *.log
::::::::::::::
app.log
::::::::::::::
DEBUG    - A DEBUG message
INFO     - An INFO message
WARNING  - A WARNING message
ERROR    - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stderr.log
::::::::::::::
ERROR    - An ERROR message
CRITICAL - A CRITICAL message
::::::::::::::
stdout.log
::::::::::::::
INFO     - An INFO message
WARNING  - A WARNING message

구성 서버 예제

다음은 로깅 구성 서버를 사용하는 모듈의 예입니다:

import logging
import logging.config
import time
import os

# 초기 설정 파일을 읽습니다.
logging.config.fileConfig('logging.conf')

# 포트 9999에서 리스너를 생성하고 시작합니다.
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # 차이점을 확인하기 위해 로깅 호출을 반복합니다.
    # Ctrl+C 를 누를 때까지 새로운 구성이 유지됩니다.
    while True:
        logger.debug('디버그 메시지')
        logger.info('정보 메시지')
        logger.warning('경고 메시지')
        logger.error('오류 메시지')
        logger.critical('심각한 메시지')
        time.sleep(5)
except KeyboardInterrupt:
    # 정리합니다.
    logging.config.stopListening()
    t.join()

다음은 파일 이름을 받아서, 그 파일을 새 로깅 구성으로 (이진 인코딩된 길이를 적절하게 앞에 붙여서) 서버로 보내는 스크립트입니다:

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

블록 하는 처리기 다루기

때로는 로깅하는 스레드를 차단하지 않고 로깅 처리기(handler)가 작업을 수행하도록 해야 할 때가 있습니다. 이는 웹 애플리케이션에서 흔하지만, 물론 다른 시나리오에서도 발생합니다.

흔히 느린 행동을 보이는 범인은 SMTPHandler 입니다: 개발자의 통제 밖에 있는 여러 가지 이유로, 전자 우편을 보내는 데 오랜 시간이 걸릴 수 있습니다 (예를 들어, 잘 동작하지 않는 메일 또는 네트워크 인프라). 그러나 거의 모든 네트워크 기반 처리기는 블록 할 수 있습니다. SocketHandler 작업도 너무 느린 DNS 질의를 이면에서 수행 할 수 있습니다 (그리고 이 질의는 여러분의 통제 밖에 있는, 파이썬 계층 아래의 소켓 라이브러리 코드 깊숙이 있을 수 있습니다).

한 가지 해결책은 두 부분으로 된 접근법을 사용하는 것입니다. 첫 번째 부분에서는, 성능이 중요한 스레드에서 액세스하는 로거에 QueueHandler 만 붙입니다. 그들은 단순히 큐에 씁니다. 충분한 용량으로 큐의 크기를 조정하거나, 크기의 상한이 없도록 초기화 할 수 있습니다. 큐에 대한 쓰기는 일반적으로 신속하게 받아들여지지만, 코드에서 예방책으로 queue.Full 예외를 잡아야 할 것입니다. 코드에 성능이 중요한 스레드가 있는 라이브러리 개발자인 경우, 코드를 사용할 다른 개발자의 이익을 위해 이것을 (여러분의 로거에 QueueHandlers 만 붙이라는 제안과 함께) 문서로 만들어야 합니다.

해결책의 두 번째 부분은 QueueListener며, 이는 QueueHandler 에 상응하여 설계되었습니다. QueueListener 는 매우 간단합니다: 큐와 처리기를 넘겨주면 QueueHandlers (또는 LogRecords 의 다른 소스)에서 보낸 LogRecord를 큐에서 수신하는 내부 스레드를 시작합니다. LogRecords 는 큐에서 제거되고 처리를 위해 처리기로 전달됩니다.

별도의 QueueListener 클래스를 사용하면 같은 인스턴스를 사용하여 여러 개의 QueueHandlers 를 처리할 수 있다는 장점이 있습니다. 이것은 특별한 이점 없이 처리기당 하나의 스레드를 먹게 되는 기존의 처리기 클래스의 스레드 버전을 만드는 것보다 자원 친화적입니다.

이 두 클래스를 사용하는 예제는 다음과 같습니다 (임포트 생략):

que = queue.Queue(-1)  # 크기 제한 없음
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# 로그 출력은 내부 큐를 모니터링하는 내부 스레드 대신
# 이벤트를 생성한 스레드(메인 스레드)를 표시합니다. 이것이 바로 원하는 동작입니다.
root.warning('주의하세요!')
listener.stop()

실행하면, 다음과 같은 결과를 만듭니다:

MainThread: Look out!

참고

이전 논의는 특정적으로 비동기 코드에 관한 것이 아니었고, 느린 로깅 처리기에 대한 것이었지만, 비동기 코드에서 로깅할 때는 네트워크나 심지어 파일 핸들러도 문제(이벤트 루프 차단)를 일으킬 수 있다는 점을 유념해야 합니다. 그 이유는 일부 로깅 작업이 asyncio 내부에서 수행되기 때문입니다. 따라서 애플리케이션에 비동기 코드를 사용한다면, 모든 차단 코드가 QueueListener 스레드에서만 실행되도록 위의 접근 방식을 사용하여 로깅하는 것이 가장 좋습니다.

버전 3.5에서 변경: 파이썬 3.5 이전 버전에서는, QueueListener 는 항상 큐에서 받은 모든 메시지를 초기화될 때 제공된 모든 처리기로 전달했습니다. (이것은 큐가 채워질 때 수준 필터링이 모두 반대편에서 행해졌다고 가정했기 때문입니다.) 3.5 이후부터, 이 동작은 키워드 인자 respect_handler_level=True 를 리스너의 생성자에 전달함으로써 변경될 수 있습니다. 이렇게 할 때, 리스너는 각 메시지의 수준을 처리기의 수준과 비교하여, 적절한 메시지만 처리기에 전달되도록 합니다.

버전 3.14에서 변경: QueueListenerwith 문을 통해 시작(및 중지)할 수 있습니다. 예를 들면 다음과 같습니다:

with QueueListener(que, handler) as listener:
    # 'with' 블록에 진입하면 큐 리스너가 자동으로 시작됩니다.
    pass
# 'with' 블록을 벗어나면 큐 리스너가 자동으로 중지됩니다.

네트워크에서 로깅 이벤트 보내고 받기

네트워크를 통해 로깅 이벤트를 보내고, 받는 쪽에서 처리하려고 한다고 합시다. 이렇게 하는 간단한 방법은 SocketHandler 인스턴스를 보내는 쪽의 루트 로거에 연결하는 것입니다:

import logging, logging.handlers

rootLogger = logging.getLogger()
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# 포매터는 신경 쓰지 않아도 됩니다. 소켓 핸들러는 이벤트를
# 서식이 적용되지 않은 피클로 전송하기 때문입니다.
rootLogger.addHandler(socketHandler)

# 이제 루트 로거 또는 다른 모든 로거에 로그를 작성할 수 있습니다. 먼저 루트...
logging.info('잭도우가 수정 같은 큰 스핑크스를 사랑합니다.')

# 다음으로, 애플리케이션의 특정 영역을 나타낼 수 있는 몇 가지 다른 로거를 정의합니다:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('민들레가 재빨리 불어와 어수룩한 짐에게 문제를 일으킨다.')
logger1.info('재빠르게 날뛰는 얼룩말이 어수룩함을 문제 삼는다.')
logger2.warning('지옥의 자신감 넘치는 여우가 장난감을 빼앗아갔다.')
logger2.error('다섯 마리의 복싱 마법사가 빨리 점프한다.')

수신 측에서는 socketserver 모듈을 사용하여 수신기를 구성할 수 있습니다. 기본적인 작업 예제는 다음과 같습니다:

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """스트리밍 로깅 요청을 위한 핸들러입니다.

    이것은 기본적으로 현재 구성된 어떤 로깅 정책을 사용해서 레코드를 기록합니다.
    """

    def handle(self):
        """
        여러 요청을 처리합니다. 각 요청은 4바이트 길이로 시작하며,
        그 뒤에 pickle 형식의 LogRecord가 붙습니다. 현재 구성된
        어떤 정책에 따라 레코드를 기록합니다.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # 이름이 지정되면, 레코드가 암시하는 것이 아니라 이름을 가진 로거를 사용합니다.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # 참고: 모든 레코드가 기록됩니다. 이는 Logger.handle이 일반적으로
        # 로거 수준 필터링 후에 호출되기 때문입니다. 만약 필터링을 하려면,
        # 주기 낭비와 네트워크 대역폭의 손실을 막기 위해 클라이언트 측에서 수행하십시오!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    테스트에 적합한 간단한 TCP 소켓 기반 로깅 수신자입니다.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('TCP 서버 시작 예정...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

먼저 서버를 실행한 다음 클라이언트를 실행합니다. 클라이언트 쪽에서는 콘솔에 아무것도 인쇄되지 않습니다. 서버 측에서 다음과 같은 내용이 보여야 합니다.:

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

참고: pickle은 일부 시나리오에서 보안 문제가 있을 수 있습니다. 이런 영향이 있다면, makePickle() 메서드를 오버라이드하고 대체 방식을 구현하여, 위의 스크립트를 수정하는 방법으로 대체 직렬화 방식을 사용할 수 있습니다.

프로덕션 환경에서 로깅 소켓 리스너 실행하기

로깅 리스너를 프로덕션에서 실행하려면 `Supervisor <http://supervisord.org/>`와 같은 프로세스 관리 도구를 사용해야 할 수 있습니다. 여기에는 Supervisor를 사용하여 위의 기능을 실행하는 데 필요한 기본 파일들이 포함된 Gist<socket-listener-gist>가 있습니다. 이 구성에는 다음 파일들로 구성되어 있습니다:

파일

용도

prepare.sh

테스트 환경을 준비하는 Bash 스크립트입니다.

supervisor.conf

리스너와 다중 프로세스 웹 애플리케이션에 대한 항목이 포함된 Supervisor 설정 파일입니다.

ensure_app.sh

Supervisor가 위의 구성으로 실행되도록 보장하는 Bash 스크립트입니다.

log_listener.py

로그 이벤트를 수신하여 파일에 기록하는 소켓 리스너 프로그램입니다.

main.py

리스너에 연결된 소켓을 통해 로깅을 수행하는 간단한 웹 애플리케이션입니다.

webapp.json

웹 애플리케이션용 JSON 설정 파일입니다.

client.py

웹 애플리케이션을 테스트하는 Python 스크립트입니다.

웹 애플리케이션은 `Gunicorn <https://gunicorn.org/>`을 사용하며, 이는 요청 처리를 위해 여러 작업자 프로세스를 시작하는 인기 있는 웹 애플리케이션 서버입니다. 이 예제 설정은 작업자들이 서로 충돌 없이 동일한 로그 파일에 기록할 수 있는 방법을 보여주며, 모두 소켓 리스너를 거칩니다.

이 파일들을 테스트하려면 POSIX 환경에서 다음을 수행하십시오:

  1. Download ZIP 버튼을 사용하여 `the Gist <socket-listener-gist>`를 ZIP 아카이브로 다운로드하십시오.

  2. 위 파일을 아카이브에서 작업 디렉터리로 압축 해제하십시오.

  3. 작업 디렉터리에서 bash prepare.sh 을 실행하여 환경 설정을 완료합니다. 이 명령어는 Supervisor 관련 파일과 로그 파일을 담는 run 하위 디렉터리를, 그리고 bottle, gunicornsupervisor 가 설치되는 가상 환경을 담는 venv 하위 디렉터리를 생성합니다.

  4. bash ensure_app.sh 를 실행하여 Supervisor가 위의 구성으로 실행되고 있는지 확인하십시오.

  5. 웹 애플리케이션을 실행하려면 venv/bin/python client.py 를 실행합니다. 이 과정에서 로그에 기록이 발생하게 됩니다.

  6. run 하위 디렉터리의 로그 파일들을 검사하십시오. 패턴 :file:`app.log*`과 일치하는 파일들에서 가장 최근의 로그 라인을 볼 수 있습니다. 이 로그들은 서로 다른 작업자 프로세스들이 비결정적으로 처리했기 때문에 특정 순서를 따르지 않을 것입니다.

  7. venv/bin/supervisorctl -c supervisor.conf shutdown 을 실행하여 리스너와 웹 애플리케이션을 중지할 수 있습니다.

테스트 환경 내의 다른 것과 충돌하는 경우 드물게 구성 파일들을 조정해야 할 수도 있습니다.

기본 구성은 9020 포트의 TCP 소켓을 사용합니다. 대신 Unix 도메인 소켓을 사용하는 방법은 다음과 같습니다:

  1. listener.json`에 사용할 도메인 소켓 경로를 가진 ``socket` 키를 추가하십시오. 이 키가 존재하면, 리스너는 해당 도메인 소켓에서 리스닝을 수행하며 TCP 소켓(port 키는 무시됨)에서는 리스닝하지 않습니다.

  2. webapp.json 에서 소켓 핸들러 구성 딕셔너리를 수정하여 host 값을 도메인 소켓 경로로 설정하고, port 값은 null 로 지정하십시오.

로그 출력에 문맥 정보 추가

로깅 호출에 전달된 매개 변수 외에도 로깅 출력에 문맥 정보가 포함되기 원하는 경우가 있습니다. 예를 들어, 네트워크 응용 프로그램에서, (원격 클라이언트의 사용자 이름 또는 IP 주소와 같은) 클라이언트별 정보를 로그에 기록하는 것이 바람직 할 수 있습니다. 이를 달성하기 위해 extra 매개 변수를 사용할 수는 있지만, 이러한 방식으로 정보를 전달하는 것이 항상 편리하지는 않습니다. 연결마다 Logger 인스턴스를 만들고 싶을지 모르지만, 이러한 인스턴스는 가비지 수집되지 않기 때문에 좋지 않습니다. Logger 인스턴스의 수가 응용 프로그램 로깅에 사용하고자 하는 세분성 수준에 의존적일 때 이것이 실제로 문제가 되지는 않지만, Logger 인스턴스의 수가 실질적으로 무제한이 되면 관리하기 어려울 수 있습니다.

문맥 정보 전달에 LoggerAdapters 사용하기

로깅 이벤트 정보와 함께 출력되는 문맥 정보를 전달하는 쉬운 방법은 LoggerAdapter 클래스를 사용하는 것입니다. 이 클래스는 Logger처럼 보이도록 설계되어 있어서, debug(), info(), warning(), error(), exception(), critical()log()를 호출할 수 있습니다. 이 메서드들은 Logger 에 있는 것과 똑같은 서명을 가지므로, 두 형의 인스턴스를 같은 의미로 사용할 수 있습니다.

LoggerAdapter 의 인스턴스를 생성할 때, Logger 인스턴스와 문맥 정보가 포함된 딕셔너리류 객체를 전달합니다. LoggerAdapter 의 인스턴스에서 로깅 메서드 중 하나를 호출하면, 생성자에 전달된 하위 Logger 인스턴스에 호출을 위임하고, 이 호출에 문맥 정보를 전달하도록 배치합니다. 다음은 LoggerAdapter 코드에서 발췌한 내용입니다:

def debug(self, msg, /, *args, **kwargs):
    """
    이 어댑터 인스턴스에서 컨텍스트 정보를 추가한 후,
    기반 로거에 디버그 호출을 위임합니다.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

LoggerAdapterprocess() 메서드는 문맥 정보가 로그 출력에 추가되는 곳입니다. 로깅 호출의 메시지 및 키워드 인자를 받아서, 하부 로거에 대한 호출에서 사용될 (대체로) 수정된 버전을 돌려줍니다. 이 메서드의 기본 구현은 메시지는 그대로 두고, 키워드 인자에 생성자로 전달된 딕셔너리류 객체를 값으로 갖는 ‘extra’ 키를 삽입합니다. 물론, 어댑터에 대한 호출에서 ‘extra’ 키워드 인자를 전달한 경우 자동으로 덮어씁니다.

‘extra’를 사용하는 장점은, 딕셔너리류 객체에 들어있는 값이 LogRecord 인스턴스의 __dict__에 병합되어, 키에 대해 알고 있는 Formatter 인스턴스로 사용자 정의된 문자열을 사용할 수 있게 된다는 것입니다. 다른 방법이 필요한 경우, 가령 메시지 문자열의 앞이나 뒤에 문맥 정보를 덧붙이려는 경우, LoggerAdapter 의 서브 클래스를 만들고, 필요한 작업을 수행하기 위해 process() 를 재정의해야 합니다. 다음은 간단한 예제입니다:

class CustomAdapter(logging.LoggerAdapter):
    """
    이 예제 어댑터는 전달된 딕셔너리 형태의 객체가 'connid' 키를 가지고,
    그 값이 대괄호 안에 로그 메시지 앞에 붙도록 예상합니다.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

이런 식으로 사용할 수 있습니다:

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

그러면 어댑터에 로그 하는 모든 이벤트는 로그 메시지 앞에 some_conn_id 값이 붙습니다.

딕셔너리 이외의 객체를 사용하여 문맥 정보 전달하기

실제 딕셔너리를 LoggerAdapter 에 전달할 필요는 없습니다 - 로깅에 딕셔너리처럼 보일 수 있도록, __getitem____iter__ 를 구현하는 클래스의 인스턴스를 전달할 수 있습니다. 값을 동적으로 생성하려는 경우 (반면에 딕셔너리에 들어있는 값은 바뀌지 않습니다) 유용합니다.

문맥 정보 전달에 필터 사용하기

사용자 정의 Filter를 사용하여 로그 출력에 문맥 정보를 추가할 수도 있습니다. Filter 인스턴스는 전달된 LogRecords 를 수정할 수 있는데, 어트리뷰트를 추가해서 적절한 포맷 문자열이나 필요하다면 사용자 정의 Formatter를 사용해서 출력되도록 할 수 있습니다.

예를 들어 웹 응용 프로그램에서, 처리 중인 요청(또는 적어도 그것의 흥미로운 부분)을 스레드 로컬 (threading.local) 변수에 저장한 다음, Filter 에서 액세스해서, 요청에서 온 정보를 - 원격 IP 주소와 원격 사용자의 사용자 이름이라고 합시다 - 위의 LoggerAdapter 예제에서와같이 어트리뷰트 이름 ‘ip’와 ‘user’를 사용하여 LogRecord 에 추가할 수 있습니다. 이 경우 같은 포맷 문자열을 사용하여 위에 표시된 것과 비슷한 출력을 얻을 수 있습니다. 다음은 스크립트 예입니다:

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    이 필터는 로그에 컨텍스트 정보를 주입합니다.

    실제 컨텍스트 정보 대신, 이 데모에서는 무작위 데이터를 사용합니다.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

실행하면 다음과 같은 결과가 나옵니다:

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

contextvars 사용법

Python 3.7부터 contextvars 모듈은 컨텍스트-지역 스토리지를 제공하여 threadingasyncio 처리가 모두 가능합니다. 따라서 이러한 유형의 스토리지는 스레드 로컬보다 일반적으로 선호될 수 있습니다. 다음 예제는 멀티스레드 환경에서 웹 애플리케이션이 처리한 요청 속성과 같이 컨텍스트 정보가 로그에 채워지는 방법을 보여줍니다.

예시로, 서로 독립적이지만 동일한 Python 프로세스 내에서 실행되고 공통 라이브러리를 사용하는 여러 웹 애플리케이션이 있다고 가정해 봅시다. 각 애플리케이션이 자체 로그를 가지면서도 모든 로깅 메시지(및 다른 요청 처리 코드)가 적절한 애플리케이션의 로그 파일로 전달되게 하고, 동시에 클라이언트 IP, HTTP 요청 메서드 및 클라이언트 사용자 이름과 같은 추가 컨텍스트 정보를 포함하려면 어떻게 해야 할까요?

라이브러리가 다음 코드로 시뮬레이션될 수 있다고 가정해 보겠습니다.

# webapplib.py
import logging
import time

logger = logging.getLogger(__name__)

def useful():
    # 라이브러리에서 기록하는 대표적인 이벤트입니다.
    logger.debug('Hello from webapplib!')
    # 다른 스레드가 실행될 수 있도록 잠시 대기합니다.
    time.sleep(0.01)

우리는 두 개의 간단한 클래스인 `Request`와 `WebApp`을 통해 여러 웹 애플리케이션을 시뮬레이션할 수 있습니다. 이는 실제로 스레드 기반의 웹 애플리케이션이 작동하는 방식을 시뮬레이션합니다. 각 요청은 스레드에 의해 처리됩니다:

# main.py
import argparse
from contextvars import ContextVar
import logging
import os
from random import choice
import threading
import webapplib

logger = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)

class Request:
    """
    더미 HTTP 요청 메서드, 클라이언트 IP 주소 및 클라이언트 사용자 이름을 보유하는 단순한 더미 요청 클래스
    """
    def __init__(self, method, ip, user):
        self.method = method
        self.ip = ip
        self.user = user

# 시뮬레이션에서 사용될 더미 요청 집합 - 무작위로
# 이 리스트에서 선택합니다. 모든 GET 요청은 192.168.2.XXX 주소에서
# 오는 반면, POST 요청은 192.16.3.XXX 주소에서 온다는 점에 유의하십시오.
# 샘플 요청에는 세 명의 사용자가 나타납니다.

REQUESTS = [
    Request('GET', '192.168.2.20', 'jim'),
    Request('POST', '192.168.3.20', 'fred'),
    Request('GET', '192.168.2.21', 'sheila'),
    Request('POST', '192.168.3.21', 'jim'),
    Request('GET', '192.168.2.22', 'fred'),
    Request('POST', '192.168.3.22', 'sheila'),
]

# 포맷 문자열에 HTTP 메서드, 클라이언트 IP 및 사용자 이름과 같은
# 요청 컨텍스트 정보에 대한 참조가 포함되어 있음에 유의하십시오.

formatter = logging.Formatter('%(threadName)-11s %(appName)s %(name)-9s %(user)-6s %(ip)s %(method)-4s %(message)s')

# 컨텍스트 변수들을 생성합니다. 이 변수들은 요청 처리 시작 시 채워지며,
# 해당 처리 중에 발생하는 로깅에 사용됩니다.

ctx_request = ContextVar('request')
ctx_appname = ContextVar('appname')

class InjectingFilter(logging.Filter):
    """
    로그에 컨텍스트별 정보를 주입하고 특정 웹 앱에 대한 정보만 해당 로그에 포함되도록 보장하는 필터
    """
    def __init__(self, app):
        self.app = app

    def filter(self, record):
        request = ctx_request.get()
        record.method = request.method
        record.ip = request.ip
        record.user = request.user
        record.appName = appName = ctx_appname.get()
        return appName == self.app.name

class WebApp:
    """
    웹 앱 전용 로그를 위한 자체 핸들러와 필터를 가진 더미 웹 애플리케이션 클래스.
    """
    def __init__(self, name):
        self.name = name
        handler = logging.FileHandler(name + '.log', 'w')
        f = InjectingFilter(self)
        handler.setFormatter(formatter)
        handler.addFilter(f)
        root.addHandler(handler)
        self.num_requests = 0

    def process_request(self, request):
        """
        요청을 처리하기 위한 더미 메서드입니다. 모든 요청에 대해 서로 다른 스레드에서 호출됩니다.
        다른 작업을 수행하기 전에 컨텍스트 정보를 컨텍스트 변수에 저장합니다.
        """
        ctx_request.set(request)
        ctx_appname.set(self.name)
        self.num_requests += 1
        logger.debug('Request processing started')
        webapplib.useful()
        logger.debug('Request processing finished')

def main():
    fn = os.path.splitext(os.path.basename(__file__))[0]
    adhf = argparse.ArgumentDefaultsHelpFormatter
    ap = argparse.ArgumentParser(formatter_class=adhf, prog=fn,
                                 description='Simulate a couple of web '
                                             'applications handling some '
                                             'requests, showing how request '
                                             'context can be used to '
                                             'populate logs')
    aa = ap.add_argument
    aa('--count', '-c', type=int, default=100, help='How many requests to simulate')
    options = ap.parse_args()

    # 더미 웹 앱들을 생성하고 무작위로 선택할 수 있도록 리스트에 넣습니다
    app1 = WebApp('app1')
    app2 = WebApp('app2')
    apps = [app1, app2]
    threads = []
    # 모든 이벤트를 캡처할 공통 핸들러를 추가합니다
    handler = logging.FileHandler('app.log', 'w')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    # 요청 처리를 위한 호출을 생성합니다
    for i in range(options.count):
        try:
            # 무작위로 앱을 선택하고 처리할 요청을 선택합니다
            app = choice(apps)
            request = choice(REQUESTS)
            # 자체 스레드에서 요청을 처리합니다
            t = threading.Thread(target=app.process_request, args=(request,))
            threads.append(t)
            t.start()
        except KeyboardInterrupt:
            break

    # 스레드들이 종료될 때까지 기다립니다
    for t in threads:
        t.join()

    for app in apps:
        print('%s processed %s requests' % (app.name, app.num_requests))

if __name__ == '__main__':
    main()

위 코드를 실행하면, 요청의 대략 절반이 :file:`app1.log`로, 나머지는 :file:`app2.log`로 들어가고, 모든 요청은 :file:`app.log`에 기록되는 것을 확인할 수 있습니다. 각 웹앱별 로그에는 해당 웹앱의 로그 엔트리만 포함되며, 요청 정보는 로그에서 일관되게 표시됩니다 (즉, 각 더미 요청의 정보가 항상 한 줄의 로그에 함께 나타납니다). 이는 다음 셸 출력을 통해 설명됩니다:

~/logging-contextual-webapp$ python main.py
app1 processed 51 requests
app2 processed 49 requests
~/logging-contextual-webapp$ wc -l *.log
  153 app1.log
  147 app2.log
  300 app.log
  600 total
~/logging-contextual-webapp$ head -3 app1.log
Thread-3 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-3 (process_request) app1 webapplib jim    192.168.3.21 POST Hello from webapplib!
Thread-5 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ head -3 app2.log
Thread-1 (process_request) app2 __main__  sheila 192.168.2.21 GET  Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET  Hello from webapplib!
Thread-2 (process_request) app2 __main__  jim    192.168.2.20 GET  Request processing started
~/logging-contextual-webapp$ head app.log
Thread-1 (process_request) app2 __main__  sheila 192.168.2.21 GET  Request processing started
Thread-1 (process_request) app2 webapplib sheila 192.168.2.21 GET  Hello from webapplib!
Thread-2 (process_request) app2 __main__  jim    192.168.2.20 GET  Request processing started
Thread-3 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-2 (process_request) app2 webapplib jim    192.168.2.20 GET  Hello from webapplib!
Thread-3 (process_request) app1 webapplib jim    192.168.3.21 POST Hello from webapplib!
Thread-4 (process_request) app2 __main__  fred   192.168.2.22 GET  Request processing started
Thread-5 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
Thread-4 (process_request) app2 webapplib fred   192.168.2.22 GET  Hello from webapplib!
Thread-6 (process_request) app1 __main__  jim    192.168.3.21 POST Request processing started
~/logging-contextual-webapp$ grep app1 app1.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app2.log | wc -l
147
~/logging-contextual-webapp$ grep app1 app.log | wc -l
153
~/logging-contextual-webapp$ grep app2 app.log | wc -l
147

핸들러에 컨텍스트 정보 부여하기

각 :class:`~Handler`는 고유한 필터 체인을 가지고 있습니다. 다른 핸들러로 누출되지 않게 :class:`LogRecord`에 컨텍스트 정보를 추가하려면, 다음 스크립트에 표시된 것처럼 인플레이스(in-place) 수정 대신 새로운 :class:`~LogRecord`를 반환하는 필터를 사용할 수 있습니다:

import copy
import logging

def filter(record: logging.LogRecord):
    record = copy.copy(record)
    record.user = 'jim'
    return record

if __name__ == '__main__':
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s from %(user)-8s')
    handler.setFormatter(formatter)
    handler.addFilter(filter)
    logger.addHandler(handler)

    logger.info('A log message')

여러 프로세스에서 단일 파일에 로깅 하기

logging 이 스레드-안전하고, 단일 프로세스의 여러 스레드에서 단일 파일로 로깅 하는 것이 지원되지만, 파이썬에서 여러 프로세스가 단일 파일에 액세스하는 것을 직렬화하는 표준적인 방법이 없으므로, 여러 프로세스에서 단일 파일로 로깅 하는 것은 지원되지 않습니다. 여러 프로세스에서 하나의 파일에 로그 해야 하는 경우, 이 작업을 수행하는 한 가지 방법은 모든 프로세스가 로그를 SocketHandler 에 기록하고, 소켓에서 읽어서 파일로 로그 하는 소켓 서버를 구현하는 별도의 프로세스를 사용하는 것입니다. (원한다면, 기존 프로세스 중 하나에서 한 스레드가 이 기능을 전담하도록 할 수 있습니다.) 이 섹션에서 이 접근법을 더 자세하게 설명하고, 여러분의 응용 프로그램에 적용하기 위한 출발점으로 사용할 수 있는 작동하는 소켓 수신기를 제공합니다.

또한 multiprocessing 모듈의 Lock 클래스를 사용하여 파일에 대한 프로세스 간 접근을 직렬화하는 자체 핸들러를 작성할 수도 있습니다. 표준 라이브러리의 :class:`FileHandler`와 하위 클래스는 :mod:`multiprocessing`을 사용하지 않습니다.

또는, QueueQueueHandler 를 사용하여, 모든 로깅 이벤트를 다중 프로세스 응용 프로그램의 프로세스 중 하나에 보낼 수 있습니다. 다음 예제 스크립트는 이렇게 하는 방법을 보여줍니다; 예제에서 별도의 리스너 프로세스가 다른 프로세스가 보낸 이벤트를 수신하고 자체 로깅 구성에 따라 이벤트를 기록합니다. 이 예제가 한 가지 방법만을 보여 주지만 (예를 들어, 별도의 리스너 프로세스 대신 리스너 스레드를 사용할 수도 있습니다 – 구현은 비슷할 것입니다), 리스너와 응용 프로그램의 다른 프로세스가 완전히 다른 로깅 구성을 사용하도록 허용하고, 여러분 자신의 특별한 요구 사항을 충족하는 코드의 기초로 사용할 수 있습니다:

# 본인의 코드에는 이 임포트들이 필요할 것입니다
import logging
import logging.handlers
import multiprocessing

# 다음 두 임포트 라인은 이 데모에서만 사용됩니다
from random import choice, random
import time

#
# 리스너와 워커를 위한 로깅 설정을 정의하고 싶을 것이므로,
# 리스너와 워커 프로세스 함수는 해당 프로세스의 로깅을 설정하기 위한
# 콜러블인 configurer 파라미터를 받습니다. 이 함수들에는 통신에 사용할
# 큐도 전달됩니다.
#
# 실제로 리스너는 원하는 대로 설정할 수 있지만, 이 간단한 예제에서는
# 리스너가 수신된 레코드에 레벨이나 필터 로직을 적용하지 않음에 유의하십시오.
# 실제로는 프로세스 간에 필터링될 이벤트를 보내지 않도록 워커 프로세스에서
# 이 로직을 수행하고 싶을 것입니다.
#
# 로테이트된 파일의 크기는 결과를 쉽게 볼 수 있도록 작게 설정되었습니다.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# 이것은 리스너 프로세스의 최상위 루프입니다: 큐에서 로깅 이벤트(LogRecords)를
# 기다리고 처리하며, LogRecord로 None을 받으면
# 종료합니다.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # 리스너에게 종료를 알리기 위해 이것을 센티널로 보냅니다.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # 레벨이나 필터 로직이 적용되지 않았습니다 - 그냥 실행합니다!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# 이 데모에서 무작위 선택을 위해 사용되는 배열들

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# 워커 설정은 워커 프로세스 실행 시작 시 수행됩니다.
# 윈도우에서는 fork 시맨틱에 의존할 수 없으므로, 각 프로세스는
# 시작할 때 로깅 설정 코드를 실행하게 됨을 유의하십시오.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # 핸들러 하나만 있으면 됩니다
    root = logging.getLogger()
    root.addHandler(h)
    # 데모를 위해 모든 메시지를 보냅니다. 다른 레벨이나 필터 로직은 적용되지 않았습니다.
    root.setLevel(logging.DEBUG)

# 이것은 워커 프로세스의 최상위 루프이며, 무작위 지연 시간을 두고
# 10개의 이벤트를 로깅한 후 종료합니다.
# print 메시지는 단지 무언가 진행 중임을 알리기 위한 것입니다!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# 여기서 데모가 조율됩니다. 큐를 생성하고, 리스너를 생성 및 시작하며,
# 10개의 워커를 생성하고 시작한 후, 워커들이 끝나기를 기다립니다.
# 그런 다음 리스너에게 종료를 알리기 위해 큐에 None을 보냅니다.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

위의 스크립트 변형은 로깅을 메인 프로세스의 별도의 스레드에서 유지합니다:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # 이 시점에서 메인 프로세스는 자체적으로 유용한 작업을 수행할 수 있습니다.
    # 완료되면 작업자가 종료될 때까지 기다릴 수 있습니다...
    for wp in workers:
        wp.join()
    # 이제 로깅 스레드에게도 종료하도록 알려줍니다.
    q.put(None)
    lp.join()

이 변형은 특정 로거에 대한 구성을 적용하는 방법을 보여줍니다 - 예를 들어, foo 로거는 foo 서브 시스템의 모든 이벤트를 mplog-foo.log 파일에 저장하는 특별한 처리기를 갖고 있습니다. 이것은 메인 프로세스의 로깅 시스템이 ( 로깅 이벤트가 작업자 프로세스에서 만들어졌다 하더라도) 메시지를 적절한 대상으로 전달하는 데 사용됩니다.

concurrent.futures.ProcessPoolExecutor 사용하기

concurrent.futures.ProcessPoolExecutor를 사용하여 작업자 프로세스를 시작하려면, 약간 다른 방법으로 큐를 만들어야 합니다.

queue = multiprocessing.Queue(-1)

대신에, 다음을 사용해야 합니다

queue = multiprocessing.Manager().Queue(-1)  # 위의 예제에서도 작동합니다.

그런 다음 작업자 생성을:

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

에서 다음과 같이 대체할 수 있습니다 (먼저 concurrent.futures를 임포트 하는 것을 기억하십시오):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

Gunicorn 및 uWSGI를 사용한 웹 애플리케이션 배포

웹 애플리케이션을 Gunicorn 또는 uWSGI (또는 유사한)를 사용하여 배포할 때, 클라이언트 요청을 처리하기 위해 여러 워커 프로세스가 생성됩니다. 이러한 환경에서는 웹 애플리케이션 내에서 파일 기반 핸들러를 직접 생성하는 것을 피하십시오. 대신, SocketHandler 를 사용하여 웹 애플리케이션이 별도의 프로세스에 있는 리스너로 로깅하도록 하십시오. 이 설정은 Supervisor와 같은 프로세스 관리 도구를 사용하여 수행할 수 있으며, 자세한 내용은 Running a logging socket listener in production 를 참조하십시오.

파일 회전 사용하기

때로는 로그 파일이 특정 크기까지 커지도록 하고, 그 후 새 파일을 열어 기록해야 할 때가 있습니다. 이러한 파일 중 특정 개수를 유지하고, 해당 개수의 파일이 생성되었을 때 파일들을 회전시켜 파일의 수와 파일의 크기가 모두 제한되도록 하고 싶을 수 있습니다. 이 사용 패턴을 위해 로깅 패키지는 RotatingFileHandler::를 제공합니다.

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out' # 로깅 메시지 파일명을 정의합니다.

# 원하는 출력 수준을 가진 특정 로거를 설정합니다.
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# 핸들러에 로그 메시지를 추가합니다.
handler = logging.handlers.RotatingFileHandler( # 파일을 순환하며 처리하는 핸들러입니다.
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# 일부 메시지를 로깅합니다.
for i in range(20):
    my_logger.debug('i = %d' % i) # 디버그 레벨로 현재 인덱스를 출력합니다.

# 어떤 파일들이 생성되었는지 확인합니다.
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename) # 생성된 모든 로그 파일을 출력합니다.

결과는 6개의 파일이어야 하고, 각기 응용 프로그램에 대한 로그 기록의 일부입니다:

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

가장 최근의 파일은 항상 logging_rotatingfile_example.out 이며, 크기 제한에 도달할 때마다 접미사 .1 이 붙은 이름으로 변경됩니다. 기존 백업 파일 각각의 이름이 변경되어 접미사가 증가하고 ( .1.2 가 되는 등) .6 파일이 지워집니다.

분명, 이 예제는 로그 길이를 극단적으로 작게 설정합니다. maxBytes 를 적절한 값으로 설정하고 싶을 겁니다.

대체 포매팅 스타일 사용하기

logging이 파이썬 표준 라이브러리에 추가되었을 때, 가변 내용으로 메시지를 포맷하는 유일한 방법은 %-포매팅 방법을 사용하는 것이었습니다. 그 이후로, 파이썬은 두 개의 새로운 포매팅 접근법을 얻었습니다: string.Template(파이썬 2.4에 추가됨)과 str.format()(파이썬 2.6에 추가됨).

로깅은 (3.2부터) 이 두 가지 추가 포매팅 스타일에 대해 개선된 지원을 제공합니다. Formatter 클래스는 style 이라는 추가적인 키워드 매개 변수를 취하도록 개선되었습니다. 기본값은 '%' 이지만, 다른 두 가지 포매팅 스타일에 해당하는 '{''$' 를 사용할 수 있습니다. (여러분이 기대하듯이) 이전 버전과의 호환성은 기본적으로 유지되지만, style 매개변수를 명시적으로 지정하면 str.format() 또는 string.Template 과 함께 작동하는 포맷 문자열을 지정할 수 있습니다. 다음은 가능성을 보여주기 위한 예제 콘솔 세션입니다:

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

로그로 최종 출력하기 위해 로깅 메시지를 포매팅하는 것은 개별 로깅 메시지가 구성되는 방식과 완전히 별개입니다. 개별 메시지에는 다음과 같이 %-포매팅을 사용할 수 있습니다:

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

로깅 호출(logger.debug(), logger.info() 등)은 실제 로깅 메시지 자체를 위해서는 위치 매개 변수만을 취하고, 키워드 매개 변수는 실제 로깅 호출을 어떻게 다뤄야 하는지를 지정하는 옵션을 결정하는 용도로만 사용됩니다 (예를 들어, 트레이스백 정보를 로그 해야 할지를 가리키는 exc_info 키워드 매개 변수나 로그에 추가되는 문맥 정보를 나타내는 extra 키워드 매개 변수). 그래서 여러분은 str.format() 또는 string.Template 문법을 사용하여 직접 로깅 호출을 할 수 없습니다, 내부적으로 logging 패키지가 %-포매팅을 사용하여 포맷 문자열과 변수 인자를 병합하기 때문입니다. 이전 버전과의 호환성을 유지하는 동안은 이 상황이 바뀌지 않을 것입니다. 기존 코드에 있는 모든 로깅 호출이 %-포맷 문자열을 사용하기 때문입니다.

그러나 {}- 및 $- 포매팅을 사용하여 개별 로그 메시지를 구성하는 방법이 있습니다. 메시지의 경우, 메시지 포맷 문자열로 임의의 객체를 사용할 수 있으며, logging 패키지는 실제 형식 문자열을 얻기 위해 그 객체에 대해 str() 을 호출한다는 것을 상기하십시오. 다음 두 가지 클래스를 고려하십시오:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

이 중 하나를 포맷 문자열 대신 사용하면, {}- 또는 $-포매팅을 사용하여 포맷된 로그 출력의 “%(message)s”, “{message}” 또는 “$message” 자리에 나타나는 실제 “message” 부분을 만들 수 있습니다. 어떤 것을 로그 하고 싶을 때마다 클래스 이름을 사용하는 것은 다소 꼴사납지만, __(두 개의 밑줄 —- gettext.gettext() 나 그 형제들의 동의어/별칭으로 사용되는 _ 과 혼동하지 마세요)와 같은 별칭을 사용하면 꽤 쓸만합니다.

위의 클래스가 파이썬에 포함되어 있지는 않지만, 아주 쉽게 여러분의 코드에 복사하여 붙여넣을 수 있습니다. 다음과 같이 사용될 수 있습니다 (wherever 라는 모듈에서 선언되었다고 가정합니다):

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

위의 예제는 print() 를 사용하여 포매팅이 어떻게 작동하는지 보여주고 있지만, 물론 이 접근법으로 실제 로깅 할 때는 logger.debug() 나 그와 유사한 것을 사용해야 합니다.

이 접근 방식으로는 성능 저하가 없다는 점에 주목할 필요가 있습니다. 실제 형식 지정은 로깅을 호출할 때가 아니라, 로그 메시지가 핸들러를 통해 실제로 출력되기 직전에 (그리고 경우에 따라) 발생합니다. 따라서 다소 특이한 점은 괄호가 포맷 문자열과 인수를 둘러싸고 있다는 것이지, 오직 포맷 문자열만 둘러싸고 있지는 않다는 것입니다. 이것은 __ 표기법이 :samp:`{XXX}Message` 클래스 중 하나에 대한 생성자 호출을 위한 구문적 설탕이기 때문입니다.

원한다면, LoggerAdapter 를 사용하여 다음 예제와 같이 위와 유사한 효과를 얻을 수 있습니다:

import logging

class Message:
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def log(self, level, msg, /, *args, stacklevel=1, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger.log(level, Message(msg, args), **kwargs,
                            stacklevel=stacklevel+1)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

위에 제시된 스크립트가 Python 3.8 이상에서 실행될 경우 Hello, world! 메시지를 로깅해야 합니다.

사용자 정의 LogRecord

모든 로깅 이벤트는 LogRecord 인스턴스로 표현됩니다. 이벤트가 로그 되고 로거 수준에 의해 필터링 되지 않으면, LogRecord 가 생성되고 이벤트에 대한 정보로 채워진 다음 해당 로거(와 그 조상들, 계층 상위로의 전파가 비활성화된 지점의 로거까지)의 처리기로 전달됩니다. 파이썬 3.2 이전에는, 이 생성이 일어나는 곳이 두 곳밖에 없었습니다:

  • Logger.makeRecord(), 이벤트 로깅의 일반적인 프로세스에서 호출됩니다. 인스턴스를 생성하기 위해 LogRecord 를 직접 호출합니다.

  • makeLogRecord(), LogRecord에 추가될 어트리뷰트를 포함하는 딕셔너리와 함께 호출됩니다. 보통 적절한 딕셔너리가 네트워크를 통해 (예를 들어, SocketHandler 를 통해 피클 형태로, 또는 HTTPHandler 를 통해 JSON 형식으로) 수신될 때 호출됩니다.

이것은 보통 LogRecord 로 특별한 것을 할 필요가 있다면, 다음 중 하나를 해야 한다는 것을 의미합니다.

  • Logger.makeRecord() 를 재정의하는 자신만의 Logger 서브 클래스를 만들고, 관심 있는 로거의 인스턴스가 만들어지기 전에 setLoggerClass() 를 사용하여 설정하십시오.

  • 로거나 처리기에 Filter를 추가해서 filter() 메서드가 호출될 때 필요한 특별한 조작을 하십시오.

첫 번째 접근법은 여러 라이브러리가 서로 다른 일을 하고 싶어 하는 시나리오에서는 다루기 힘들 것입니다. 각자 자신의 Logger 서브 클래스를 설정하려고 시도할 것이고, 마지막 것이 이기게 될 것입니다.

두 번째 접근법은 많은 경우에 합리적으로 잘 작동하지만, LogRecord 의 특별한 서브 클래스를 사용할 수는 없습니다. 라이브러리 개발자는 로거에 적절한 필터를 설정할 수 있지만, 새로운 로거를 도입할 때마다 이를 수행해야 한다는 것을 기억해야 합니다. 이를 고려하지 않는다면 새 패키지나 모듈을 추가하고 모듈 수준에서 단순히 다음과 같이 합니다:

logger = logging.getLogger(__name__)

이것은 아마도 고려해야 할 많은 것 중 하나일 뿐입니다. 개발자는 자신의 최상위 로거에 첨부된 NullHandler 에도 필터를 추가 할 수 있지만, 응용 프로그램 개발자가 하위 수준 라이브러리 로거에 처리기를 연결하면 호출되지 않습니다 — 그래서 그 처리기로부터의 출력은 라이브러리 개발자의 의도를 반영하지 못합니다.

파이썬 3.2 이상에서는, LogRecord 생성이 사용자가 지정할 수 있는 팩토리를 통해 수행됩니다. 팩토리는 setLogRecordFactory() 로 설정할 수 있고, getLogRecordFactory() 로 조회할 수 있는 콜러블입니다. 팩토리는 LogRecord 생성자와 같은 서명으로 호출되고, 기본 설정은 LogRecord 입니다.

이 방법을 사용하면 사용자 정의 팩토리가 LogRecord 생성의 모든 측면을 제어 할 수 있습니다. 예를 들어, 서브 클래스를 반환하거나, 다음과 같은 방법으로 생성된 레코드에 어트리뷰트를 추가 할 수 있습니다:

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

이 패턴은 서로 다른 라이브러리가 팩토리를 체인으로 연결할 수 있도록 하며, 서로의 어트리뷰트를 덮어쓰거나 의도하지 않게 표준으로 제공된 어트리뷰트를 덮어쓰지 않는 한, 놀랄 일은 없어야 합니다. 그러나 체인의 각 고리는 모든 로깅 작업에 실행시간 오버헤드를 추가하므로, Filter를 사용해서 원하는 결과를 얻을 수 없을 때만 이 기법을 사용해야 합니다.

QueueHandler 및 QueueListener 상속: ZeroMQ 예제

QueueHandler 클래스 상속

QueueHandler 서브 클래스를 사용하여 다른 유형의 큐에 메시지를 보낼 수 있습니다, 예를 들어 ZeroMQ ‘publish’ 소켓. 아래 예제에서, 소켓은 별도로 생성되어 처리기로 (‘queue’로) 전달됩니다:

import zmq   # pyzmq 사용. ZeroMQ의 Python 바인딩입니다.
import json  # 레코드를 포터블하게 직렬화하기 위함입니다.

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # 또는 zmq.PUSH와 같은 적절한 값을 사용합니다.
sock.bind('tcp://*:5556')        # 또는 원하는 주소를 지정합니다.

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__) # 레코드를 큐에 전송합니다.

물론 구성하는 다른 방법이 있습니다. 예를 들어 처리기가 소켓을 만드는데 필요한 데이터를 전달하는 것입니다:

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

QueueListener 클래스 상속

다른 유형의 큐에서 메시지를 받기 위해 QueueListener 의 서브 클래스를 만들 수도 있습니다, 예를 들어 ZeroMQ ‘subscribe’ 소켓. 다음은 그 예입니다:

class ZeroMQSocketListener(QueueListener): # ZeroMQ 소켓 리스너를 구현합니다.
    def __init__(self, uri, /, *handlers, **kwargs): # URI와 핸들러/추가 전용 키워드 인수를 받습니다.
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB) # 구독(Subscribe) 소켓을 생성합니다.
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # 모든 메시지를 구독하도록 설정합니다.
        socket.connect(uri) # 지정된 URI에 연결합니다.
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json() # 큐에서 JSON 형식의 메시지를 수신합니다.
        return logging.makeLogRecord(msg) # 로깅 기록 객체로 만듭니다.

QueueHandler 및 QueueListener 상속: pynng 예제입니다.

위 섹션과 유사하게 pynng 를 사용하여 리스너와 핸들러를 구현할 수 있습니다. 이는 NNG <https://nng.nanomsg.org/> 의 Python 바인딩이며, ZeroMQ의 영적 후계자로 알려져 있습니다. 다음 스니펫들은 pynng 가 설치된 환경에서 테스트할 수 있습니다. 다양성을 위해 리스너를 먼저 제시합니다.

QueueListener 클래스 상속

# listener.py
import json
import logging
import logging.handlers

import pynng

DEFAULT_ADDR = "tcp://localhost:13232"

interrupted = False

class NNGSocketListener(logging.handlers.QueueListener):

    def __init__(self, uri, /, *handlers, **kwargs):
        # 인터럽터블하도록 타임아웃을 설정하고, 구독자 소켓을 엽니다.
        socket = pynng.Sub0(listen=uri, recv_timeout=500)
        # b'' 구독은 모든 토픽과 일치합니다.
        topics = kwargs.pop('topics', None) or b''
        socket.subscribe(topics)
        # 이 소켓을 큐로 취급합니다.
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self, block):
        data = None
        # 인터럽트되지 않은 상태이고 소켓에서 데이터가 수신될 때까지 루프를 계속합니다.
        while not interrupted:
            try:
                data = self.queue.recv(block=block)
                break
            except pynng.Timeout:
                pass
            except pynng.Closed:  # Ctrl-C를 누를 때 가끔 발생합니다.
                break
        if data is None:
            return None
        # 발행자로부터 전송된 로깅 이벤트를 가져옵니다.
        event = json.loads(data.decode('utf-8'))
        return logging.makeLogRecord(event)

    def enqueue_sentinel(self):
        # 이 구현에서는 사용하지 않습니다. 소켓이 실제로 큐가 아니기 때문입니다.
        pass

logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('종료하려면 Ctrl-C를 누르십시오.')
try:
    while True:
        pass
except KeyboardInterrupt:
    interrupted = True
finally:
    listener.stop()

QueueHandler 클래스 상속

# sender.py
import json
import logging
import logging.handlers
import time
import random

import pynng

DEFAULT_ADDR = "tcp://localhost:13232"

class NNGSocketHandler(logging.handlers.QueueHandler):

    def __init__(self, uri):
        socket = pynng.Pub0(dial=uri, send_timeout=500)
        super().__init__(socket)

    def enqueue(self, record):
        # 레코드를 UTF-8로 인코딩된 JSON으로 전송합니다.
        d = dict(record.__dict__)
        data = json.dumps(d)
        self.queue.send(data.encode('utf-8'))

    def close(self):
        self.queue.close()

logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
# 프로세스 ID를 출력에서 확인하도록 설정합니다.
logging.basicConfig(level=logging.DEBUG,
                    handlers=[logging.StreamHandler(), handler],
                    format='%(levelname)-8s %(name)10s %(process)6s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
    # 임의로 일부 로거와 레벨을 선택하여 로그를 생성합니다.
    level = random.choice(levels)
    logger = logging.getLogger(random.choice(logger_names))
    logger.log(level, 'Message no. %5d' % msgno)
    msgno += 1
    delay = random.random() * 2 + 0.5
    time.sleep(delay)

위의 두 스니펫을 별도의 명령어 셸에서 실행할 수 있습니다. 리스너를 한 셸에서, 전송기를 두 개의 별도 셸에서 실행하면 다음과 같은 결과를 확인할 수 있습니다. 첫 번째 발신기 셸에서는:

$ python sender.py
DEBUG         myapp    613 Message no.     1
WARNING  myapp.lib2    613 Message no.     2
CRITICAL myapp.lib2    613 Message no.     3
WARNING  myapp.lib2    613 Message no.     4
CRITICAL myapp.lib1    613 Message no.     5
DEBUG         myapp    613 Message no.     6
CRITICAL myapp.lib1    613 Message no.     7
INFO     myapp.lib1    613 Message no.     8
(and so on)

두 번째 발신기 셸에서는:

$ python sender.py
INFO     myapp.lib2    657 Message no.     1
CRITICAL myapp.lib2    657 Message no.     2
CRITICAL      myapp    657 Message no.     3
CRITICAL myapp.lib1    657 Message no.     4
INFO     myapp.lib1    657 Message no.     5
WARNING  myapp.lib2    657 Message no.     6
CRITICAL      myapp    657 Message no.     7
DEBUG    myapp.lib1    657 Message no.     8
(and so on)

리스너 셸에서는:

$ python listener.py
Press Ctrl-C to stop.
DEBUG         myapp    613 Message no.     1
WARNING  myapp.lib2    613 Message no.     2
INFO     myapp.lib2    657 Message no.     1
CRITICAL myapp.lib2    613 Message no.     3
CRITICAL myapp.lib2    657 Message no.     2
CRITICAL      myapp    657 Message no.     3
WARNING  myapp.lib2    613 Message no.     4
CRITICAL myapp.lib1    613 Message no.     5
CRITICAL myapp.lib1    657 Message no.     4
INFO     myapp.lib1    657 Message no.     5
DEBUG         myapp    613 Message no.     6
WARNING  myapp.lib2    657 Message no.     6
CRITICAL      myapp    657 Message no.     7
CRITICAL myapp.lib1    613 Message no.     7
INFO     myapp.lib1    613 Message no.     8
DEBUG    myapp.lib1    657 Message no.     8
(and so on)

보시다시피, 두 개의 발신 프로세스로부터의 로깅이 리스너의 출력에서 교차되어 나타납니다.

딕셔너리 기반 구성의 예

다음은 로깅 구성 딕셔너리의 예입니다 - 장고 프로젝트 설명서 에서 가져왔습니다. 이 딕셔너리를 dictConfig() 로 전달하여 구성을 적용합니다:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console'],
            'propagate': True,
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

이 구성에 대한 더 자세한 정보는 장고 설명서의 관련 섹션 을 참조하세요.

rotator와 namer를 사용해서 로그 회전 처리하기

namer와 rotator를 정의하는 방법을 보여주는 실행 가능한 스크립트가 다음 예시입니다. 이 스크립트는 로그 파일의 gzip 압축을 보여줍니다:

import gzip
import logging
import logging.handlers
import os
import shutil

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, 'rb') as f_in:
        with gzip.open(dest, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    os.remove(source)


rh = logging.handlers.RotatingFileHandler('rotated.log', maxBytes=128, backupCount=5)
rh.rotator = rotator
rh.namer = namer

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(rh)
f = logging.Formatter('%(asctime)s %(message)s')
rh.setFormatter(f)
for i in range(1000):
    root.info(f'Message no. {i + 1}')

이 코드를 실행한 후에는 여섯 개의 새로운 파일을 볼 수 있으며, 그 중 다섯 개는 압축되어 있습니다:

$ ls rotated.log*
rotated.log       rotated.log.2.gz  rotated.log.4.gz
rotated.log.1.gz  rotated.log.3.gz  rotated.log.5.gz
$ zcat rotated.log.1.gz
2023-01-20 02:28:17,767 Message no. 996
2023-01-20 02:28:17,767 Message no. 997
2023-01-20 02:28:17,767 Message no. 998

좀 더 정교한 multiprocessing 예제

다음 동작하는 예제에서는 구성 파일을 사용하여 로깅을 multiprocessing과 함께 사용하는 방법을 보여줍니다. 구성은 매우 간단하지만, 실제 multiprocessing 시나리오에서 더 복잡한 구성을 구현할 수 있음을 예시합니다.

이 예에서, 주 프로세스는 리스너 프로세스와 몇 개의 작업자 프로세스를 생성합니다. 주 프로세스, 리스너 및 작업자를 위한 세 가지 구성이 있습니다 (모든 작업자는 같은 구성을 공유합니다). 주 프로세스에서의 로깅, 작업자가 QueueHandler에 로그 하는 방법, 그리고 리스너가 QueueListener 및 더욱 복잡한 로깅 구성을 구현하고 큐를 통해 수신한 이벤트를 구성에서 지정된 처리기로 전달하도록 배치하는 는 방법을 볼 수 있습니다. 이러한 구성은 설명을 위한 것이지만, 이 예제를 여러분 자신의 시나리오에 적용할 수 있어야 합니다.

스크립트는 다음과 같습니다 - 독스트링과 주석이 어떻게 작동하는지 잘 설명하기를 바랍니다:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    로깅 이벤트를 위한 간단한 핸들러입니다. 리스너 프로세스에서 실행되며
    수신된 레코드의 이름에 따라 로거에 이벤트를 전달합니다.
    그 후 로깅 시스템에 의해 해당 로거에 구성된 핸들러로 전달됩니다.
    """

    def handle(self, record):
        if record.name == "root":
            logger = logging.getLogger()
        else:
            logger = logging.getLogger(record.name)

        if logger.isEnabledFor(record.levelno):
            # 프로세스 이름은 파일 및 콘솔에 로깅을 수행하는 것이
            # 리스너임을 보여주기 위해 변환됩니다.
            record.processName = '%s (for %s)' % (current_process().name, record.processName)
            logger.handle(record)

def listener_process(q, stop_event, config):
    """
    메인 프로세스에서 수행할 수도 있지만, 예시를 위해 별도의 프로세스에서
    수행됩니다.

    지정된 구성에 따라 로깅을 초기화하고, 리스너를 시작한 다음
    메인 프로세스가 이벤트를 통해 완료 신호를 보낼 때까지 기다립니다.
    그 후 리스너가 중지되고 프로세스가 종료됩니다.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # POSIX에서 setup 로거는 부모 프로세스에서 구성되었겠지만,
        # dictConfig 호출 이후에는 비활성화되어야 합니다.
        # Windows에서는 fork가 사용되지 않으므로 자식 프로세스에 setup
        # 로거가 존재하지 않으며, 새로 생성되어 메시지가 나타날 것입니다.
        # 따라서 "if posix" 절을 사용합니다.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    예시를 위해 여러 개가 생성됩니다. 실제로는 서로 동일한 프로세스보다는
    서로 다른 프로세스 집합일 수 있습니다.

    지정된 구성에 따라 로깅을 초기화하고, 임의로 선택된 로거에 임의의
    레벨로 100개의 메시지를 로깅합니다.

    다른 프로세스가 실행될 기회를 주기 위해 짧은 대기(sleep)가 추가되었습니다.
    엄밀히 말하면 필요하지는 않지만, 이를 생략했을 때보다 서로 다른 프로세스의
    출력이 좀 더 섞이게 됩니다.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # POSIX에서 setup 로거는 부모 프로세스에서 구성되었겠지만,
        # dictConfig 호출 이후에는 비활성화되어야 합니다.
        # Windows에서는 fork가 사용되지 않으므로 자식 프로세스에 setup
        # 로거가 존재하지 않으며, 새로 생성되어 메시지가 나타날 것입니다.
        # 따라서 "if posix" 절을 사용합니다.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # 메인 프로세스는 콘솔에 출력하는 간단한 구성을 가집니다.
    config_initial = {
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO'
            }
        },
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG'
        }
    }
    # 워커 프로세스 구성은 루트 로거에 연결된 QueueHandler일 뿐이며,
    # 이를 통해 모든 메시지를 큐로 보낼 수 있습니다.
    # 부모 프로세스에서 사용된 "setup" 로거를 비활성화하기 위해 기존 로거를
    # 비활성화합니다. fork() 이후 자식 프로세스에 로거가 존재하게 되는
    # POSIX에서 이것이 필요합니다.
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q
            }
        },
        'root': {
            'handlers': ['queue'],
            'level': 'DEBUG'
        }
    }
    # 리스너 프로세스 구성은 이벤트를 원하는 핸들러로 전달하기 위해
    # 로깅 구성의 모든 유연성을 사용할 수 있음을 보여줍니다.
    # 부모 프로세스에서 사용된 "setup" 로거를 비활성화하기 위해 기존 로거를
    # 비활성화합니다. fork() 이후 자식 프로세스에 로거가 존재하게 되는
    # POSIX에서 이것이 필요합니다.
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
                'level': 'INFO'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'ERROR'
            }
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'handlers': ['console', 'file', 'errors'],
            'level': 'DEBUG'
        }
    }
    # 부모 프로세스에서의 로깅이 정상적으로 작동함을 보여주기 위해
    # 몇 가지 초기 이벤트를 로깅합니다.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # 이제 워커가 작업을 마칠 때까지 대기합니다.
    for wp in workers:
        wp.join()
    # 워커가 모두 완료되었으므로 이제 리스너를 중지할 수 있습니다.
    # 부모 프로세스에서의 로깅은 여전히 정상적으로 작동합니다.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

SysLogHandler로 전송된 메시지에 BOM 삽입하기

RFC 5424 는 유니코드 메시지가 다음 구조를 갖는 바이트들로 syslog 데몬에 전송되어야 함을 요구합니다: 선택적인 순수 ASCII 구성 요소, 그 뒤를 이어 UTF-8 바이트 순서 표식 (BOM), 그 뒤를 이어 UTF-8으로 인코딩된 유니코드. (이 규격의 관련 절 을 참조하십시오.)

파이썬 3.1에서, BOM을 메시지에 삽입하는 코드가 SysLogHandler 에 추가되었지만, 유감스럽게도, BOM이 메시지의 시작 부분에 나타나서 그 앞에 순수 ASCII 구성 요소를 허락하지 않도록 잘못 구현되었습니다.

이 동작이 잘못됨에 따라, 잘못된 BOM 삽입 코드가 파이썬 3.2.4 이상에서 제거되었습니다. 그러나, 올바른 코드로 대체되지는 않았고, BOM을 포함하고, 그 앞에 순수 ASCII 시퀀스, 그 뒤에 UTF-8으로 인코딩된 임의의 유니코드로 구성된 RFC 5424-호환 메시지를 생성하려는 경우 다음과 같이 해야 합니다:

  1. Formatter 인스턴스를 SysLogHandler 인스턴스에 다음과 같은 포맷 문자열과 함께 첨부하십시오:

    'ASCII section\ufeffUnicode section'
    

    유니코드 코드 포인트 U+FEFF는, UTF-8을 사용하여 인코딩될 때, UTF-8 BOM으로 인코딩됩니다 – 바이트열 b'\xef\xbb\xbf'.

  2. ASCII section을 원하는 자리 표시기로 바꾸십시오. 그러나 치환 후 나타나는 데이터가 항상 ASCII임미 보장되어야 합니다 (그렇게 되면, UTF-8 인코딩 이후에는 변경되지 않은 채로 유지됩니다).

  3. Unidcode section을 원하는 자리 표시기로 바꾸십시오; 치환 후 나타나는 데이터에 ASCII 범위를 벗어나는 문자가 포함되어 있어도 괜찮습니다 – UTF-8을 사용하여 인코딩됩니다.

포맷된 된 메시지는 SysLogHandler 에 의해 UTF-8 인코딩을 사용하여 인코딩*됩니다*. 위의 규칙을 따르는 경우, RFC 5424-호환 메시지를 생성할 수 있어야 합니다. 그렇지 않으면, logging이 불평하지 않을 수도 있지만, 메시지가 RFC 5424와 호환되지 않고 syslog 데몬이 불평 할 수 있습니다.

구조적 로깅 구현

대부분의 로깅 메시지는 사람이 읽을 수 있도록 만들어졌기 때문에 쉽게 기계에서 파싱 할 수 없지만, 프로그램에서 (복잡한 정규식을 사용하지 않고도) 구문 분석할 수 있는 구조화된 포맷으로 메시지를 출력하려는 상황이 있을 수 있습니다. 이것은 logging 패키지를 사용하여 쉽게 달성 할 수 있습니다. 이것이 달성될 수 있는 여러 가지 방법이 있지만, 다음은 JSON을 사용하여 기계가 파싱할 수 있는 방식으로 이벤트를 직렬화하는 간단한 접근법입니다:

import json
import logging

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # 가독성 향상을 위해 선택 사항

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('메시지 1', foo='bar', bar='baz', num=123, fnum=123.456))

위의 스크립트가 실행되면 다음과 같이 인쇄됩니다:

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

항목의 순서는 사용된 파이썬 버전에 따라 다를 수 있습니다.

좀 더 특별한 처리가 필요한 경우, 다음 예제와 같이 사용자 정의 JSON 인코더를 사용할 수 있습니다:

import json
import logging


class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, str):
            return o.encode('unicode_escape').decode('ascii')
        return super().default(o)

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # 가독성을 높이기 위한 선택적 사용 함수

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('메시지 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

위의 스크립트를 실행하면 다음과 같이 인쇄합니다:

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

항목의 순서는 사용된 파이썬 버전에 따라 다를 수 있습니다.

dictConfig()로 처리기를 사용자 정의하기

특정 상황에서 로깅 처리기를 사용자 정의하고 싶을 때가 있고, dictConfig()를 사용하고 있다면 서브 클래스를 만들지 않고도 이 작업을 수행 할 수 있습니다. 예를 들어, 로그 파일의 소유권을 설정하고 싶다고 합시다. POSIX에서, shutil.chown() 을 사용하면 쉽게 할 수 있지만, 표준 라이브러리의 파일 처리기는 내장된 지원을 제공하지 않습니다. 다음과 같은 일반 함수를 사용하여 처리기 생성을 사용자 정의 할 수 있습니다:

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

그런 다음, dictConfig() 에 전달되는 로깅 구성에서, 이 함수를 호출하여 로깅 처리기를 생성하도록 지정할 수 있습니다:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # 아래 값들은 이 딕셔너리에서 추출되어 핸들러를 생성하고,
            # 핸들러의 레벨 및 포매터를 설정하는 데 사용됩니다.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # 아래 값들은 핸들러 생성 호출 가능자에
            # 키워드 인자 형태로 전달됩니다.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

이 예제에서는 단지 예를 들기 위해 pulse 라는 사용자와 그룹을 사용하여 소유권을 설정합니다. 작동하는 스크립트 chowntest.py 로 정리하면:

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # 아래 값들은 이 딕셔너리에서 추출되어 핸들러를 생성하고,
            # 핸들러의 레벨 및 포매터를 설정하는 데 사용됩니다.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # 아래 값들은 핸들러 생성 호출 가능자에
            # 키워드 인자 형태로 전달됩니다.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('디버그 메시지 추가됨')

이것을 실행하기 위해서는, 아마도 root 로 실행해야 할 것입니다:

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

이 예제는 shutil.chown() 이 등장한 파이썬 3.3을 사용합니다. 이 접근법은 dictConfig()를 지원하는 모든 파이썬 버전에서 작동합니다 - 파이썬 2.7, 3.2 이상. 3.3 이전 버전의 경우, (예를 들어) os.chown() 을 사용하여 실제 소유권 변경을 구현해야 합니다.

실제로는, 처리기 생성 함수가 프로젝트 어딘가에 있는 유틸리티 모듈에 있을 수 있습니다. 구성에 있는 다음과 같은 줄 대신:

'()': owned_file_handler,

예를 들면 이렇게 쓸 수 있습니다:

'()': 'ext://project.util.owned_file_handler',

여기서 project.util 은 함수가 있는 패키지의 실제 이름으로 바꿀 수 있습니다. 위의 작업 스크립트에서 'ext://__main__.owned_file_handler' 를 사용해도 됩니다. 여기서, 실제 콜러블은 ext:// 스펙으로부터 dictConfig() 에 의해 결정됩니다.

이 예제는 희망하건대 다른 형태의 파일 변경을 - 예를 들어 특정 POSIX 권한 비트 설정 - 같은 방법으로 (os.chmod() 를 사용해서) 구현하는 방법도 알려줍니다.

물론 이 접근법은 FileHandler 이외의 처리기 유형으로도 확장될 수 있습니다 - 예를 들어, 회전 파일 처리기 중 하나 또는 다른 유형의 처리기 모두.

응용 프로그램 전체에서 특정 포맷 스타일 사용하기

파이썬 3.2에서, Formatterstyle 키워드 매개변수를 얻었는데, 이전 버전과의 호환성을 위해 % 를 기본값으로 사용하면서 { 또는 $ 를 지정하면 str.format()string.Template 에 의해 지원되는 포매팅 접근법을 사용할 수 있도록 합니다. 이것은 로그 되는 최종 출력으로 로깅 메시지를 포매팅하는 것과 관계된 것이고, 개별 로깅 메시지가 만들어지는 방법과는 무관함에 주의하십시오.

로깅 호출(debug(), info() 등)은 실제 로깅 메시지 자체를 위해서는 위치 매개 변수만을 취하고, 키워드 매개 변수는 실제 로깅 호출을 어떻게 처리해야 하는지를 지정하는 옵션을 결정하는 용도로만 사용됩니다 (예: 트레이스백 정보를 로그 해야 할지를 가리키는 exc_info 키워드 매개 변수나 로그에 추가되는 문맥 정보를 나타내는 extra 키워드 매개 변수). 따라서 여러분은 str.format() 또는 string.Template 구문을 사용하여 직접 로깅 호출을 할 수 없습니다. 이는 내부적으로 logging 패키지가 %-포매팅을 사용하여 포맷 문자열과 변수 인자를 병합하기 때문입니다. 이전 버전과의 하위 호환성을 유지하는 한 이 상황은 변경되지 않을 것입니다. 기존 코드에 존재하는 모든 로깅 호출이 %-포맷 문자열을 사용하고 있기 때문입니다.

포맷 스타일을 특정 로거와 연관시키는 제안이 있었지만, 이전 버전과의 호환성 문제가 있는데, 기존 코드가 그 로거 이름으로 %-포매팅을 사용할 수 있기 때문입니다.

제삼자 라이브러리와 여러분의 코드 간에 상호 운용이 가능하도록 로깅 하려면, 개별 로깅 호출 수준에서 포매팅을 결정해야 합니다. 이렇게 할 때 대체 포매팅 스타일을 수용 할 수 있는 몇 가지 길이 열립니다.

LogRecord 팩토리 사용

파이썬 3.2에서, 위에서 언급 한 Formatter 변경 사항과 함께, logging 패키지는 setLogRecordFactory() 함수를 사용하여 사용자가 자신의 LogRecord 서브 클래스를 설정할 수 있는 기능을 얻었습니다. 이것을 사용하면, 원하는 일을 하도록 getMessage() 메서드를 재정의하는 여러분 자신의 LogRecord 서브 클래스를 설정할 수 있습니다. 이 메서드의 베이스 클래스 구현이 msg % args 포매팅이 일어나는 곳이며, 여러분이 대체 포매팅으로 치환할 수 있는 곳입니다; 그러나, 모든 포매팅 스타일을 지원하면서 다른 코드와의 상호 운용성을 보장하기 위해 %-포매팅을 기본값으로 사용하도록 주의해야 합니다. 또한, 베이스 구현과 마찬가지로 str(self.msg) 를 호출하도록 주의해야 합니다.

자세한 정보는 setLogRecordFactory()LogRecord 에 대한 레퍼런스 설명서를 참조하십시오.

사용자 정의 메시지 객체 사용

{}- 및 $-포매팅을 사용하여 개별 로그 메시지를 작성할 수 있는 또 다른, 아마도 더 간단한 방법이 있습니다. (임의의 객체를 메시지로 사용하기에서) 로깅 할 때 임의의 객체를 메시지 포맷 문자열로 사용할 수 있고, logging 패키지는 그 객체에 대해 str() 을 호출하여 실제 형식 문자열을 얻는다고 했던 것을 기억하실 수 있을 겁니다. 다음 두 클래스를 생각해봅시다:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

이 중 하나를 포맷 문자열 대신 사용하면, {}- 또는 $-포매팅을 사용하여 포맷된 로그 출력의 “%(message)s”, “{message}” 또는 “$message” 자리에 나타나는 실제 “message” 부분을 만들 수 있습니다. 어떤 것을 로그 하고 싶을 때마다 클래스 이름을 사용하는 것이 다소 꼴사납다면, 메시지에 M 이나 _ 과 같은 별칭을 사용해서 더 쓸만하게 만들 수 있습니다 (또는 지역화에 _ 를 사용하고 있다면, 아마도 __).

이 접근법의 예가 아래에 나와 있습니다. 먼저, str.format() 를 사용하는 포매팅입니다:

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

두 번째로, string.Template 를 사용하는 포매팅입니다:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

참고 사항으로 말씀드리자면, 이 접근 방식은 상당한 성능 저하를 일으키지 않습니다. 실제 포맷팅이 발생하는 시점은 로깅 호출을 할 때가 아니라, 핸들러에 의해 로그 메시지가 실제로 출력되려 할 때 (그리고 그 경우에만) 발생하기 때문입니다. 따라서 다소 특이하게 느껴질 수 있는 부분은 괄호가 포맷 문자열과 인자 주변에 위치하는 것이지, 포맷 문자열에만 적용되는 것은 아니라는 점입니다. 이는 __ 표기가 위의 XXXMessage 클래스 중 하나를 호출하는 생성자용 문법 설탕(syntax sugar)이기 때문입니다.

dictConfig()로 필터 구성하기

dictConfig() 를 사용하여 필터를 구성할 수 있습니다. 하지만 처음에는 어떻게 해야 할지 명확하지 않을 수 있습니다 (그래서 이 조리법을 제공합니다). Filter 가 표준 라이브러리에 포함된 유일한 필터 클래스이고, 많은 요구 사항을 충족시키지는 않을 것이기 때문에 (오직 베이스 클래스로 제공됩니다), 일반적으로 filter() 메서드를 재정의하는 여러분 자신의 Filter 서브 클래스를 정의할 필요가 있습니다. 이렇게 하려면, 필터를 생성하는 데 사용될 콜러블을 필터의 구성 딕셔너리에 () 키로 지정하십시오 (클래스가 가장 분명하지만 Filter 인스턴스를 반환하는 콜러블은 모두 가능합니다). 다음은 완전한 예입니다:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

이 예제는 인스턴스를 만드는 콜러블로 키워드 매개 변수 형식으로 구성 데이터를 전달하는 방법을 보여줍니다. 실행하면, 위의 스크립트는 다음을 인쇄합니다:

changed: hello

필터가 구성된 대로 작동하고 있음을 보여줍니다.

주목해야 할 몇 가지 추가 사항:

  • 구성에서 직접 참조할 수 없는 경우 (예를 들어, 다른 모듈에 있고 구성 딕셔너리가 있는 곳에서 직접 임포트 할 수 없는 경우), 외부 객체에 대한 액세스 에 설명된 대로 ext://... 형식을 사용할 수 있습니다. 예를 들어, 위의 예에서 MyFilter 대신 'ext://__main__.MyFilter' 를 사용할 수 있습니다.

  • 필터뿐만 아니라, 이 기술을 사용자 정의 처리기 및 포매터를 구성하는데 사용할 수도 있습니다. logging이 구성에서 사용자 정의 객체를 어떻게 지원하는지에 대한 더 많은 정보는 사용자 정의 객체 를 보시고, 위의 다른 요리책 조리법 dictConfig()로 처리기를 사용자 정의하기 도 보십시오.

사용자 정의된 예외 포매팅

예외 포매팅을 사용자 정의하고 싶을 때가 있습니다 - 논쟁의 여지는 있지만, 예외 정보가 포함된 경우에도 이벤트 당 정확히 한 줄이 기록되기 원한다고 합시다. 다음 예제처럼, 사용자 정의 포매터 클래스를 사용할 수 있습니다:

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        예외를 하나의 줄에 출력되도록 포맷합니다.
        """
        result = super().formatException(exc_info)
        return repr(result)  # 또는 원하는 방식으로 한 줄로 만듭니다.

    def format(self, record):
        s = super().format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

실행하면, 정확하게 두 줄의 파일이 생성됩니다:

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: division by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: division by zero'|

위의 처리는 단순하지만, 예외 정보를 원하는 대로 포맷하는 방법을 알려줍니다. traceback 모듈은 더욱 전문화된 요구에 도움이 될 수 있습니다.

로깅 메시지 말하기

로깅 메시지를 보여주는 대신 들려주는 것이 바람직한 상황이 있을 수 있습니다. 여러분의 시스템에 텍스트-음성 변환 (TTS) 기능이 있다면 쉽습니다, 파이썬 바인딩이 없어도 됩니다. 대부분의 TTS 시스템에는 실행할 수 있는 명령행 프로그램이 있으며, 이것을 subprocess 를 사용하여 처리기에서 호출 할 수 있습니다. 여기서 TTS 명령행 프로그램이 사용자와 상호 작용하거나, 완료하는 데 오랜 시간이 걸릴 것으로 기대되지 않으며, 로그 되는 메시지의 빈도가 메시지로 사용자를 압도할 정도로 높지 않으며, 메시지는 동시에 처리되지 않고 한 번에 하나씩 읽어도 된다고 가정합니다. 아래의 예제 구현은 다음 메시지가 처리되기 전에 하나의 메시지를 다 읽을 때까지 대기하고, 이 때문에 다른 처리기가 대기 상태로 유지될 수 있습니다. 다음은 espeak TTS 패키지가 사용 가능하다고 가정하는 접근법을 보여주는 간단한 예입니다:

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # 여성 영어 목소리로 천천히 말합니다.
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # 프로그램이 끝날 때까지 기다립니다.
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # 기본 포매터는 메시지만 반환합니다.
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

실행하면, 이 스크립트는 여성 음성으로 “Hello”와 “Goodbye”를 차례대로 말합니다.

물론 위의 접근법은 다른 TTS 시스템과 명령행에서 실행되는 외부 프로그램을 통해 메시지를 처리 할 수 있는 전혀 다른 시스템에도 적용될 수 있습니다.

로깅 메시지를 버퍼링하고 조건부 출력하기

임시 영역에 메시지를 기록하고 특정 조건이 발생할 때만 메시지를 출력하려는 상황이 있을 수 있습니다. 예를 들어, 함수에서 디버그 이벤트를 로깅 하기를 원할 수 있습니다. 함수가 에러 없이 완료되면 수집된 디버그 정보로 로그를 어지럽히고 싶지 않지만, 에러가 있으면 에러뿐만 아니라 모든 디버그 정보를 출력하고 싶습니다.

다음은 로깅이 이러한 방식으로 작동하기 원하는 함수에 데코레이터를 사용하여 이를 수행할 방법을 보여주는 예제입니다. logging.handlers.MemoryHandler 를 사용하는데, 어떤 상황이 발생할 때까지 로그 된 이벤트를 버퍼링할 수 있도록 하고, 때가 되면 버퍼링 된 이벤트들이 flush 됩니다 - 처리를 위해 다른 처리기(target 처리기)로 전달됩니다. 기본적으로, MemoryHandler 는 버퍼가 다 차거나 수준이 지정된 임계값보다 크거나 같은 이벤트가 발생하면 플러시 됩니다. 사용자 정의 플러시 동작을 원할 경우, 이 조리법을 MemoryHandler 의 더 특수한 서브 클래스와 함께 사용할 수 있습니다.

예제 스크립트에는 간단한 함수 foo 가 있는데, 모든 로그 수준을 순회하면서, 어떤 수준으로 로그 할지를 sys.stderr 에 쓴 다음, 그 수준으로 실제 메시지를 로깅 합니다. 매개 변수를 foo 에 전달할 수 있는데, 참이면 ERROR 및 CRITICAL 수준으로 로그 합니다 - 그렇지 않으면 DEBUG, INFO 및 WARNING 수준에서만 로그 합니다.

이 스크립트는 필요한 조건부 로깅을 수행할 데코레이터로 foo 를 데코레이트 하기만 합니다. 데코레이터는 로거를 매개 변수로 받고 데코레이트 된 함수가 호출되는 동안 메모리 처리기를 연결합니다. 데코레이터는 target 처리기, 플러싱이 발생해야 하는 수준 및 버퍼 용량(버퍼 된 레코드의 수)을 추가로 매개 변수로 받을 수 있습니다. 이것들은 각각 sys.stderr 로 쓰는 StreamHandler, logging.ERROR, 100 을 기본값으로 합니다.

스크립트는 다음과 같습니다:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

이 스크립트를 실행하면 다음과 같은 출력이 나타납니다.:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

보시다시피, 실제 로깅 출력은 심각도가 ERROR 이상인 이벤트가 기록될 때만 발생하지만, 이 경우 심각도가 낮은 이전 이벤트도 기록됩니다.

물론 전통적인 데코레이션 수단을 쓸 수 있습니다.:

@log_if_errors(logger)
def foo(fail=False):
    ...

로깅 메시지를 이메일로 보내기, 버퍼링 포함

로그 메시지를 이메일을 통해 전송하는 방법을 설명하기 위해, 여러 개의 메시지가 하나의 이메일로 전송되게 하려면 (필요하고 선택적인 인수를 보려면 `-h`` 인수로 다운로드한 스크립트를 실행하십시오.)

import logging
import logging.handlers
import smtplib

class BufferingSMTPHandler(logging.handlers.BufferingHandler):
    def __init__(self, mailhost, port, username, password, fromaddr, toaddrs,
                 subject, capacity):
        logging.handlers.BufferingHandler.__init__(self, capacity)
        self.mailhost = mailhost
        self.mailport = port
        self.username = username
        self.password = password
        self.fromaddr = fromaddr
        if isinstance(toaddrs, str):
            toaddrs = [toaddrs]
        self.toaddrs = toaddrs
        self.subject = subject
        self.setFormatter(logging.Formatter("%(asctime)s %(levelname)-5s %(message)s"))

    def flush(self):
        if len(self.buffer) > 0:
            try:
                smtp = smtplib.SMTP(self.mailhost, self.mailport)
                smtp.starttls()
                smtp.login(self.username, self.password)
                msg = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n" % (self.fromaddr, ','.join(self.toaddrs), self.subject)
                for record in self.buffer:
                    s = self.format(record)
                    msg = msg + s + "\r\n"
                smtp.sendmail(self.fromaddr, self.toaddrs, msg)
                smtp.quit()
            except Exception:
                if logging.raiseExceptions:
                    raise
            self.buffer = []

if __name__ == '__main__':
    import argparse

    ap = argparse.ArgumentParser()
    aa = ap.add_argument
    aa('host', metavar='HOST', help='SMTP server')
    aa('--port', '-p', type=int, default=587, help='SMTP port')
    aa('user', metavar='USER', help='SMTP username')
    aa('password', metavar='PASSWORD', help='SMTP password')
    aa('to', metavar='TO', help='Addressee for emails')
    aa('sender', metavar='SENDER', help='Sender email address')
    aa('--subject', '-s',
       default='Test Logging email from Python logging module (buffering)',
       help='Subject of email')
    options = ap.parse_args()
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    h = BufferingSMTPHandler(options.host, options.port, options.user,
                             options.password, options.sender,
                             options.to, options.subject, 10)
    logger.addHandler(h)
    for i in range(102):
        logger.info("Info index = %d", i)
    h.flush()
    h.close()

이 스크립트를 실행하고 SMTP 서버가 올바르게 설정되어 있다면, 지정한 수신자에게 총 11개의 이메일이 전송되는 것을 확인할 수 있습니다. 처음 열 개의 이메일 각각에는 10개의 로그 메시지가 포함되며, 열한 번째 이메일에는 두 개의 메시지가 포함됩니다. 이는 스크립트에 명시된 대로 총 102개의 메시지를 구성합니다.

구성을 통해 UTC(GMT)로 시간을 포맷하기

때로는 UTC를 사용하여 시간을 포맷하고 싶을 수 있는데, 아래에 표시된 UTCFormatter 와 같은 클래스를 사용하면 이를 수행할 수 있습니다:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

이제 Formatter 대신 코드에서 UTCFormatter 를 사용할 수 있습니다. 구성을 통해 이를 수행하려면, 다음에 나오는 완전한 예제에 의해 설명된 접근법으로 dictConfig() API를 사용할 수 있습니다:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

이 스크립트를 실행하면, 다음과 같은 내용을 인쇄합니다:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

시간이 한 처리기에서는 UTC로, 다른 처리기에서는 지역 시간으로 포맷되는 것을 보여줍니다.

선택적 로깅을 위해 컨텍스트 관리자 사용하기

로깅 구성을 일시적으로 변경하고 무언가를 한 후에 되돌리는 것이 유용할 때가 있습니다. 이를 위해, 컨텍스트 관리자는 로깅 컨텍스트를 저장하고 복원하는 가장 분명한 방법입니다. 다음은 그러한 컨텍스트 관리자의 간단한 예입니다. 컨텍스트 관리자의 범위 안에서 선택적으로 로깅 수준을 변경하고 로깅 처리기를 추가 할 수 있습니다:

import logging
import sys

class LoggingContext:
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # None을 암묵적으로 반환 => 예외를 가로채지 않음

수준 값을 지정하면, 로거의 수준은 컨텍스트 관리자가 적용되는 with 블록의 범위 안에서 해당 값으로 설정됩니다. 처리기를 지정하면, 블록 진입 시 로거에 추가되고 블록에서 빠져나갈 때 제거됩니다. 블록을 빠져나갈 때 처리기를 닫도록 관리자에게 요청할 수도 있습니다 - 더는 처리기가 필요하지 않으면 이렇게 할 수 있습니다.

작동 원리를 보여주기 위해, 다음 코드 블록을 위에 추가 할 수 있습니다:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

우리는 초기에 로거 수준을 INFO 로 설정합니다. 그래서 메시지 #1은 나타나고 메시지 #2는 나타나지 않습니다. 그다음에 with 블록에서 수준을 DEBUG 로 임시 변경하면, 메시지 #3이 나타납니다. 블록이 종료되면 로거 수준이 INFO 로 복원되므로, 메시지 #4가 표시되지 않습니다. 그다음 with 블록에서 수준을 다시 DEBUG 로 다시 설정하지만, sys.stdout 으로 쓰는 처리기도 추가합니다. 따라서 메시지 #5는 콘솔에 두 번 표시됩니다 (stderr 를 통해 한 번, stdout 을 통해 한 번). with 문장이 완료된 후에 상태는 이전과 같으므로, (메시지 #1처럼) 메시지 #6이 나타나고, (메시지 #2처럼) 메시지 #7은 보이지 않습니다.

이렇게 만든 스크립트를 실행하면, 결과는 다음과 같습니다:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

다시 실행하면서 stderr/dev/null 로 리디렉트하면, 다음과 같이 stdout 으로 출력된 메시지만 나타납니다:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

다시 한번, 하지만 stdout/dev/null 로 리디렉트하면, 이렇게 됩니다:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

이 경우, stdout 에 인쇄된 메시지 #5는 예상대로 나타나지 않습니다.

물론 여기서 설명한 방법을 일반화 할 수 있습니다. 예를 들어 로깅 필터를 임시로 첨부 할 수 있습니다. 위의 코드는 파이썬 2와 파이썬 3에서 모두 작동합니다.

CLI 응용 프로그램 시작 템플릿

다음과 같은 것들을 하는 방법을 보여주는 예입니다:

  • 명령 줄 인자에 기반한 로깅 수준 사용하기

  • 별도의 파일에 있는 여러 부속 명령으로 분기하고, 모두 일관된 방식으로 같은 수준에서 로깅 하기

  • 간단하고 최소의 구성을 사용하기

어떤 서비스를 중지, 시작 또는 다시 시작하는 작업을 위한 명령 줄 응용 프로그램이 있다고 가정합니다. 이것은 예시의 목적을 위해 start.py, stop.pyrestart.py에 개별 명령이 구현되고, 응용 프로그램의 메인 스크립트는 app.py 파일이 되도록 구성할 수 있습니다. 명령 줄 인자를 통해 응용 프로그램의 상세도를 제어하려고 하고, logging.INFO를 기본값으로 한다고 더 가정해 봅시다. app.py를 작성할 수 있는 한 가지 방법은 다음과 같습니다:

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels, help='로깅 레벨 지정') # Added descriptive help text for translation example
    subparsers = parser.add_subparsers(dest='command',
                                       help='사용 가능한 명령:')
    start_cmd = subparsers.add_parser('start', help='서비스 시작')
    start_cmd.add_argument('name', metavar='이름',
                           help='시작할 서비스의 이름')
    stop_cmd = subparsers.add_parser('stop',
                                     help='하나 이상의 서비스를 중지합니다.')
    stop_cmd.add_argument('names', metavar='이름', nargs='+',
                          help='중지할 서비스의 이름')
    restart_cmd = subparsers.add_parser('restart',
                                        help='하나 이상의 서비스를 재시작합니다.')
    restart_cmd.add_argument('names', metavar='이름', nargs='+',
                             help='재시작할 서비스의 이름')
    options = parser.parse_args()
    # 명령을 디스패치하는 코드는 모두 이 파일에 있을 수 있습니다. 설명 목적으로만, 각 명령은 별도의 모듈에 구현했습니다.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('명령어 \'%s\' 에 대한 코드를 찾을 수 없습니다.' % options.command) # Translation of error message
        return 1
    # 파일이나 딕셔너리에서 설정을 로드할 수 있습니다.
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

그리고 start, stoprestart 명령은 별도의 모듈로 구현할 수 있습니다, 가령 시작하려면:

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('시작할 서비스: %s', options.name) # Translating the message string placeholder
    # 실제 명령 처리 로직을 여기에 구현합니다 ...
    logger.info('서비스 \'%s\'를 시작했습니다.', options.name)

그리고 멈추려면:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '서비스 \'%s\'' % options.names[0] # Translating variable name for clarity (though not necessary, maintaining context is safer)
    else:
        plural = '개' # Changed plural marker to a more general Korean one suitable for mixed services/items
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services_list = services[:i] + ' 및 ' + services[i + 2:] # Changed connection word
    logger.debug('중지할 서비스: %s', services_list)
    # 실제 명령 처리 로직을 여기에 구현합니다 ...
    logger.info('서비스 %s를 중지했습니다.%s.', services_list, plural)

비슷하게, 다시 시작하려면:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '서비스 \'%s\'' % options.names[0]
    else:
        plural = '개' # Changed plural marker to a more general Korean one suitable for mixed services/items
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services_list = services[:i] + ' 및 ' + services[i + 2:] # Changed connection word
    logger.debug('재시작할 서비스: %s', services_list)
    # 실제 명령 처리 로직을 여기에 구현합니다 ...
    logger.info('서비스 %s를 재시작했습니다.%s.', services_list, plural)

이 응용 프로그램을 기본 로그 수준으로 실행하면, 이런 출력을 얻습니다:

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

첫 번째 단어는 로깅 수준이고, 두 번째 단어는 이벤트가 로그 된 장소의 모듈이나 패키지 이름입니다.

로깅 수준을 변경하면, 로그로 전송되는 정보를 변경할 수 있습니다. 예를 들어, 우리가 더 많은 정보를 원한다면:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

그리고 덜 원한다면:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

이 경우, WARNING 수준 이상으로 아무것도 로그 하지 않았으므로, 명령은 콘솔에 아무것도 인쇄하지 않습니다.

로깅을 위한 Qt GUI

가끔 발생하는 질문 중 하나는 GUI 응용 프로그램에 로깅하는 방법입니다. Qt 프레임워크는 PySide2 또는 PyQt5 라이브러리를 사용하는 인기 있는 교차 플랫폼 UI 프레임워크입니다.

다음 예는 Qt GUI에 로그 하는 방법을 보여줍니다. 이것은 콜러블을 취하는 간단한 QtHandler 클래스를 소개합니다. 이 클래스는 GUI 업데이트를 하는 메인 스레드의 슬롯이어야 합니다. 또한 UI 자체 (수동 로깅을 위한 버튼을 통해) 뿐만 아니라 백그라운드에서 작업하는 작업자 스레드에서 GUI에 로그 하는 방법을 보여주기 위해 작업자 스레드도 만듭니다 (여기서는, 단지 임의의 짧은 지연을 주고 임의의 수준으로 메시지를 로깅 합니다).

작업자 스레드는 threading 모듈 대신 Qt의 QThread 클래스를 사용하여 구현되는데, 다른 Qt 구성 요소와 더 잘 통합되는 QThread를 사용해야 하는 상황이 있기 때문입니다.

이 코드는 PySide6, PyQt6, PySide2 또는 PyQt5 의 최신 릴리스와 함께 작동해야 합니다. Qt의 이전 버전에도 접근 방식을 조정할 수 있어야 합니다. 더 자세한 정보는 코드 스니펫의 주석을 참조하십시오.

import logging
import random
import sys
import time

# 서로 다른 Qt 패키지 간의 사소한 차이 처리
try:
    from PySide6 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    try:
        from PyQt6 import QtCore, QtGui, QtWidgets
        Signal = QtCore.pyqtSignal
        Slot = QtCore.pyqtSlot
    except ImportError:
        try:
            from PySide2 import QtCore, QtGui, QtWidgets
            Signal = QtCore.Signal
            Slot = QtCore.Slot
        except ImportError:
            from PyQt5 import QtCore, QtGui, QtWidgets
            Signal = QtCore.pyqtSignal
            Slot = QtCore.pyqtSlot

logger = logging.getLogger(__name__)


#
# 신호(Signal)가 올바르게 초기화되려면 QObject 또는 그 하위 클래스에
# 포함되어야 합니다.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Qt GUI로의 출력은 메인 스레드에서만 발생해야 합니다. 따라서 이 핸들러는
# 메인 스레드에서 실행되도록 설정된 슬롯 함수를 받도록 설계되었습니다.
# 이 예제에서 함수는 포맷된 로그 메시지인 문자열 인자와 이를 생성한
# 로그 레코드를 받습니다. 포맷된 문자열은 단지 편의를 위한 것이며,
# 슬롯 함수 자체에서 원하는 방식으로 출력용 문자열을 포맷할 수 있습니다.
#
# 원하는 GUI 업데이트를 수행하도록 슬롯 함수를 지정합니다. 핸들러는
# 특정 UI 요소에 대해 알거나 신경 쓰지 않습니다.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# 이 예제는 QThread를 사용하며, 이는 파이썬 수준의 스레드 이름이
# "Dummy-1"과 같이 지정됨을 의미합니다. 아래 함수는 현재 스레드의
# Qt 이름을 가져옵니다.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()


#
# 로깅을 위한 랜덤 레벨을 생성하는 데 사용됩니다.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# 이 워커(worker) 클래스는 메인 스레드와 별개의 스레드에서 수행되는 작업을 나타냅니다.
# 스레드가 작업을 시작하도록 하는 방법은 워커의 슬롯에 연결된
# 버튼 클릭을 통하는 것입니다.
#
# LogRecord의 기본 threadName 값은 큰 도움이 되지 않으므로, 위에서 계산된
# QThread 이름이 포함된 qThreadName을 추가하고, 해당 값을 LogRecord를
# QThread 이름으로 업데이트하는 데 사용되는 "extra" 딕셔너리에 전달합니다.
#
# 이 예제 워커는 메시지를 순차적으로 출력하며, 그 사이에 몇 초 정도의
# 랜덤한 지연 시간을 둡니다.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # 중단 요청이 있을 때까지 스레드를 실행합니다. 이를 통해
        # 합리적으로 깔끔하게 스레드를 종료할 수 있습니다.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            try:
                if random.random() < 0.1:
                    raise ValueError('Exception raised: %d' % i)
                else:
                    level = random.choice(LEVELS)
                    logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            except ValueError as e:
                logger.exception('Failed: %s', e, extra=extra)
            i += 1

#
# 이 요리법(cookbook) 예제를 위한 간단한 UI를 구현합니다. 다음을 포함합니다:
#
# * 포맷된 로그 메시지를 보관하는 읽기 전용 텍스트 편집 창
# * 별도의 스레드에서 작업을 시작하고 로깅을 수행하는 버튼
# * 메인 스레드에서 무언가를 로깅하는 버튼
# * 로그 창을 비우는 버튼
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super().__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # 플랫폼의 기본 고정 폭(monospace) 글꼴을 설정합니다.
        f = QtGui.QFont('nosuchfont')
        if hasattr(f, 'Monospace'):
            f.setStyleHint(f.Monospace)
        else:
            f.setStyleHint(f.StyleHint.Monospace)  # Qt6용
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # 포맷 문자열에서 threadName 대신 qThreadName을 사용해야 함을 기억하세요.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # 종료 시 QThread를 종료하도록 설정합니다.
        app.aboutToQuit.connect(self.force_quit)

        # 모든 위젯을 배치합니다.
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # 워커가 아닌 슬롯과 신호를 연결합니다.
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # 새 워커 스레드를 시작하고 워커용 슬롯을 연결합니다.
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # 시작되면 버튼이 비활성화되어야 합니다.
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # qThreadName용
        self.worker.moveToThread(self.worker_thread)
        # 워커 스레드에서 이벤트 루프를 시작합니다.
        self.worker_thread.start()

    def kill_thread(self):
        # 워커에게 중지를 요청한 다음, 종료를 요청하고 완료될 때까지 기다립니다.
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # 창이 닫힐 때 사용됩니다.
        if self.worker_thread.isRunning():
            self.kill_thread()

    # 아래 함수들은 슬롯이 설정된 메인 스레드에서 실행되어 UI를 업데이트합니다.

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # 이 함수는 전달된 포맷된 메시지를 사용하지만, 레코드의 정보를 사용하여
        # 심각도(레벨)에 따라 적절한 색상으로 메시지를 포맷합니다.
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()


def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    if hasattr(app, 'exec'):
        rc = app.exec()
    else:
        rc = app.exec_()
    sys.exit(rc)

if __name__=='__main__':
    main()

RFC5424 지원을 통한 syslog 로깅

비록 RFC 5424 날짜는 2009년에 사용되지만, 대부분의 syslog 서버는 기본적으로 2001년에서 유래한 이전 버전인 RFC 3164 를 사용하도록 설정되어 있습니다. logging 이 Python에 추가된 것은 2003년이었으며, 당시 이 프로토콜은 더 초기(그리고 유일하게 존재하는) 프로토콜을 지원했습니다. RFC 5424가 발표되었지만, syslog 서버에 광범위하게 배포되지 않았기 때문에 SysLogHandler 기능은 업데이트되지 않았습니다.

[msgid] RFC 5424 contains some useful features such as support for structured data, and if you need to be able to log to a syslog server with support for it, you can do so with a subclassed handler which looks something like this:

import datetime as dt
import logging.handlers
import re
import socket
import time

class SysLogHandler5424(logging.handlers.SysLogHandler):

    tz_offset = re.compile(r'([+-]\d{2})(\d{2})$')
    escaped = re.compile(r'([\]"\\])')

    def __init__(self, *args, **kwargs):
        self.msgid = kwargs.pop('msgid', None)
        self.appname = kwargs.pop('appname', None)
        super().__init__(*args, **kwargs)

    def format(self, record):
        version = 1 # 버전은 항상 1입니다.
        asctime = dt.datetime.fromtimestamp(record.created).isoformat()
        m = self.tz_offset.prefixmatch(time.strftime('%z'))
        has_offset = False
        if m and time.timezone:
            hrs, mins = m.groups()
            if int(hrs) or int(mins):
                has_offset = True # 시간대 오프셋이 있음
        if not has_offset:
            asctime += 'Z' # 오프셋이 없으면 Zulu(UTC)로 간주합니다.
        else:
            asctime += f'{hrs}:{mins}'
        try:
            hostname = socket.gethostname()
        except Exception:
            hostname = '-' # 호스트 이름을 얻는 데 실패하면 '-'를 사용합니다.
        appname = self.appname or '-'
        procid = record.process
        msgid = '-'
        msg = super().format(record)
        sdata = '-'
        if hasattr(record, 'structured_data'):
            sd = record.structured_data
            # 여기는 SD-ID가 키이고 PARAM-NAME을 PARAM-VALUE에 매핑하는 딕셔너리인 경우입니다. (자세한 내용은 RFC 참조)
            # 구조적 데이터 핸들링 예시: 실제 프로덕션 환경에서는 적절한 오류 확인이 필요합니다.
            parts = []

            def replacer(m):
                g = m.groups()
                return '\\' + g[0] # 백슬래시 이스케이프 처리

            for sdid, dv in sd.items():
                part = f'[{sdid}'
                for k, v in dv.items():
                    s = str(v)
                    s = self.escaped.sub(replacer, s) # 특수 문자를 이스케이프합니다.
                    part += f' {k}="{s}"'
                part += ']'
                parts.append(part)
            sdata = ''.join(parts)
        return f'{version} {asctime} {hostname} {appname} {procid} {msgid} {sdata} {msg}'

[msgid] You’ll need to be familiar with RFC 5424 to fully understand the above code, and it may be that you have slightly different needs (e.g. for how you pass structural data to the log). Nevertheless, the above should be adaptable to your speciric needs. With the above handler, you’d pass structured data using something like this:

sd = {
    'foo@12345': {'bar': 'baz', 'baz': 'bozz', 'fizz': r'buzz'},
    'foo@54321': {'rab': 'baz', 'zab': 'bozz', 'zzif': r'buzz'}
}
extra = {'structured_data': sd}
i = 1
logger.debug('Message %d', i, extra=extra)

How to treat a logger like an output stream

[msgid] Sometimes, you need to interface to a third-party API which expects a file-like object to write to, but you want to direct the API’s output to a logger. You can do this using a class which wraps a logger with a file-like API. Here’s a short script illustrating such a class:

import logging

LoggerWriter 클래스는 파일처럼 동작하는 객체에서 흔히 사용되는 로깅 래퍼입니다.  코드는 출력 스트림을 가로채서 로그 메시지가 개행 문자를 포함하여 여러 줄일 ,  줄이 별도의 로그 레벨과 함께 기록되도록 처리합니다.

class LoggerWriter:
    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        if message != '\n':  # 새 줄만 출력하는 것을 방지하려면 메시지에 추가 내용을 넣으세요.
            self.logger.log(self.level, message)

    def flush(self):
        # 실제로 아무것도 하지 않지만, 파일 같은 객체라면 예상될 수 있는 메서드이므로 옵션입니다.
        pass

    def close(self):
        # 실제로 아무것도 하지 않지만, 파일 같은 객체라면 기대할 수 있는 동작입니다. 따라서 다음과 같이 플래그를 설정하여 나중에 write 호출 시 예외가 발생하도록 할 수도 있습니다.
        pass

def main():
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('demo')
    info_fp = LoggerWriter(logger, logging.INFO)
    debug_fp = LoggerWriter(logger, logging.DEBUG)
    print('INFO 메시지', file=info_fp)
    print('DEBUG 메시지', file=debug_fp)

if __name__ == "__main__":
    main()

이 스크립트가 실행되면 다음과 같이 인쇄됩니다:

INFO:demo:An INFO message
DEBUG:demo:A DEBUG message

다음과 같이 LoggerWriter 를 사용하여 sys.stdoutsys.stderr 을 리다이렉션할 수도 있습니다:

import sys

sys.stdout = LoggerWriter(logger, logging.INFO)
sys.stderr = LoggerWriter(logger, logging.WARNING)

이는 원하는 로깅 설정을 완료한 후에 해야 합니다. 위의 예제에서 basicConfig() 호출이 이 작업을 수행합니다 (LoggerWriter 인스턴스에 의해 덮어씌워지기 sys.stderr 값을 사용). 그런 다음 다음과 같은 결과를 얻게 됩니다:

>>> print('Foo')
INFO:demo:Foo
>>> print('Bar', file=sys.stderr)
WARNING:demo:Bar
>>>

물론, 위의 예제들은 `:func:`~logging.basicConfig`가 사용하는 형식에 따른 출력을 보여주지만, 로깅을 구성할 때 다른 포매터를 사용할 수 있습니다.

위 방식의 경우, 버퍼링과 우리가 가로채는 write 호출 순서에 다소 의존하게 된다는 점에 유의해야 합니다. 예를 들어 위에서 정의한 LoggerWriter 를 사용한다면 다음 코드를 가진 스니펫이 있다고 가정해 보겠습니다:

sys.stderr = LoggerWriter(logger, logging.WARNING)
1 / 0

그러면 이 스크립트를 실행하면 다음과 같은 결과가 발생합니다:

WARNING:demo:Traceback (most recent call last):

WARNING:demo:  File "/home/runner/cookbook-loggerwriter/test.py", line 53, in <module>

WARNING:demo:
WARNING:demo:main()
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/test.py", line 49, in main

WARNING:demo:
WARNING:demo:1 / 0
WARNING:demo:ZeroDivisionError
WARNING:demo::
WARNING:demo:division by zero

보시다시피, 이 출력은 이상적이지 않습니다. 그 이유는 sys.stderr``에 쓰는 밑단 코드가 여러 쓰기를 수행하며, 각각의 쓰기 작업이 별도의 로그 라인(예: 위의 마지막 줄)을 생성하기 때문입니다. 문제를 해결하려면 무언가를 버퍼링하고 개행 문자가 보일 때만 로그 라인을 출력해야 합니다. 조금 나은 ``LoggerWriter 구현을 사용해 보겠습니다:

class BufferingLoggerWriter(LoggerWriter):
    def __init__(self, logger, level):
        super().__init__(logger, level)
        self.buffer = ''

    def write(self, message):
        if '\n' not in message:
            self.buffer += message
        else:
            parts = message.split('\n')
            if self.buffer:
                s = self.buffer + parts.pop(0)
                self.logger.log(self.level, s)
            self.buffer = parts.pop()
            for part in parts:
                self.logger.log(self.level, part)

이것은 단순히 새 줄이 나타날 때까지 정보를 버퍼링하고, 그 다음 완성된 줄을 기록합니다. 이 방법을 사용하면 더 좋은 출력을 얻을 수 있습니다:

WARNING:demo:Traceback (most recent call last):
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/main.py", line 55, in <module>
WARNING:demo:    main()
WARNING:demo:  File "/home/runner/cookbook-loggerwriter/main.py", line 52, in main
WARNING:demo:    1/0
WARNING:demo:ZeroDivisionError: division by zero

로깅 출력에서 개행 문자를 일관되게 처리하는 방법

일반적으로 기록되는 메시지(콘솔이나 파일에)는 단일 텍스트 줄로 구성됩니다. 하지만 때로는 로깅 형식 문자열이 개행 문자를 포함하거나, 로깅된 데이터가 개행 문자를 포함하는 등 여러 줄의 메시지를 처리해야 할 필요가 있습니다. 이러한 메시들을 각 라인이 별도로 로그되었던 것처럼 일관되게 포매팅되도록 균일하게 처리하려면 다음과 같이 핸들러 믹신을 사용하면 됩니다:

# 이 코드는 mymixins.py 모듈에 있다고 가정합니다
import copy

class MultilineMixin:
    def emit(self, record):
        s = record.getMessage()
        if '\n' not in s:
            super().emit(record)
        else:
            lines = s.splitlines()
            rec = copy.copy(record)
            rec.args = None
            for line in lines:
                rec.msg = line
                super().emit(rec)

이 믹신을 다음 스크립트처럼 사용할 수 있습니다:

import logging

from mymixins import MultilineMixin

logger = logging.getLogger(__name__)

class StreamHandler(MultilineMixin, logging.StreamHandler):
    pass

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-9s %(message)s',
                        handlers = [StreamHandler()])
    logger.debug('Single line')
    logger.debug('Multiple lines:\nfool me once ...')
    logger.debug('Another single line')
    logger.debug('Multiple lines:\n%s', 'fool me ...\ncan\'t get fooled again')

스크립트를 실행하면 다음과 비슷한 출력이 인쇄됩니다:

2025-07-02 13:54:47,234 DEBUG     Single line
2025-07-02 13:54:47,234 DEBUG     Multiple lines:
2025-07-02 13:54:47,234 DEBUG     fool me once ...
2025-07-02 13:54:47,234 DEBUG     Another single line
2025-07-02 13:54:47,234 DEBUG     Multiple lines:
2025-07-02 13:54:47,234 DEBUG     fool me ...
2025-07-02 13:54:47,234 DEBUG     can't get fooled again

반면에, 만약 `log injection <https://owasp.org/www-community/attacks/Log_Injection>`_에 대한 우려가 있다면, 다음 예제와 같이 개행 문자를 이스케이프하는 포매터를 사용할 수 있습니다:

import logging

logger = logging.getLogger(__name__)

class EscapingFormatter(logging.Formatter):
    def format(self, record):
        s = super().format(record)
        return s.replace('\n', r'\n')

if __name__ == '__main__':
    h = logging.StreamHandler()
    h.setFormatter(EscapingFormatter('%(asctime)s %(levelname)-9s %(message)s'))
    logging.basicConfig(level=logging.DEBUG, handlers = [h])
    logger.debug('Single line')
    logger.debug('Multiple lines:\nfool me once ...')
    logger.debug('Another single line')
    logger.debug('Multiple lines:\n%s', 'fool me ...\ncan\'t get fooled again')

물론, 가장 적합하다고 생각되는 어떤 이스케이핑 방식이든 사용하실 수 있습니다. 스크립트를 실행하면 다음과 같은 출력을 생성해야 합니다:

2025-07-09 06:47:33,783 DEBUG     Single line
2025-07-09 06:47:33,783 DEBUG     Multiple lines:\nfool me once ...
2025-07-09 06:47:33,783 DEBUG     Another single line
2025-07-09 06:47:33,783 DEBUG     Multiple lines:\nfool me ...\ncan't get fooled again

이스케이프 동작은 하위 라이브러리 기본값이 될 수 없습니다, 왜냐하면 이는 이전 호환성을 깨뜨리기 때문입니다.

피해야 할 패턴

이전 섹션에서는 여러분이 수행하거나 처리해야 할 다양한 방법들을 설명했지만, 유용하지 않은, 따라서 대부분의 경우 피해야 하는 사용 패턴들도 언급할 가치가 있습니다. 아래 섹션들은 특별한 순서가 없습니다.

여러 로그 파일 열기

Windows에서는 일반적으로 같은 파일을 여러 번 열 수 없으며, 이 경우 “파일이 다른 프로세스에서 사용 중입니다” 오류가 발생합니다. 하지만 POSIX 플랫폼에서는 같은 파일을 여러 번 열어도 오류를 받지 않습니다. 이는 예를 들어 다음의 방식으로 실수로 수행될 수 있습니다:

  • 동일한 파일에 대한 핸들러를 두 번 추가하는 경우 (예: 복사/붙여넣기/변경을 잊은 오류).

  • 이름이 달라 보이지만 실제로는 하나가 다른 하나의 심볼릭 링크인 두 파일을 여는 경우.

  • 프로세스를 포크하고, 그 후 부모와 자식 모두 동일한 파일을 참조하게 되는 경우. 예를 들어 multiprocessing 모듈을 사용할 때 발생할 수 있습니다.

파일을 여러 번 여는 작업은 대부분의 경우 작동하는 것처럼 보일 수 있지만, 실제로 여러 문제를 초래할 수 있습니다:

  • 여러 스레드나 프로세스가 같은 파일에 쓰려고 시도하기 때문에 로깅 출력이 엉망이 될 수 있습니다. 로깅은 여러 스레드가 동일한 핸들러 인스턴스를 동시에 사용하는 것을 방지하지만, 두 개의 다른 핸들러 인스턴스를 사용하여 동시에 쓰기가 시도되는 경우에는 그러한 보호 장치가 없습니다.

  • 파일을 삭제하려는 시도가 (예: 파일 로테이션 중에) 아무런 경고 없이 실패하는 경우가 있습니다. 이는 다른 참조가 해당 파일을 가리키고 있기 때문입니다. 이로 인해 혼란과 시간 낭비되는 디버깅 시간이 발생할 수 있으며, 로그 항목이 예상치 못한 장소에 있거나 완전히 손실될 수 있습니다. 또는 이동해야 할 파일이 제자리에 남아 크기가 예상치 못하게 커질 수도 있습니다.

그러한 문제를 우회하려면 :ref:`multiple-processes`에서 설명하는 기술들을 사용하십시오.

클래스 내의 속성으로 로거를 사용하거나 파라미터로 전달하기

이것을 해야 하는 예외적인 경우가 있을 수 있지만, 일반적으로는 무의미합니다. 왜냐하면 로거는 싱글턴이기 때문입니다. 코드에서 logging.getLogger(name) 을 사용하여 이름으로 주어진 로거 인스턴스에 항상 접근할 수 있으므로, 인스턴스를 전달하고 속성으로 유지하는 것은 무의미합니다. Java 및 C#와 같은 다른 언어에서는 로거가 종종 정적 클래스 속성이라는 점에 유의하십시오. 그러나 이 패턴은 모듈(클래스가 아닌)이 소프트웨어 분해 단위이기 때문에 Python에서는 말이 되지 않습니다.

라이브러리의 로거에 :class:`~logging.NullHandler`가 아닌 다른 핸들러를 추가하기

로깅을 구성하는 것은 핸들러와 포맷터, 필터를 추가하는 것이지 라이브러리 개발자의 책임이 아닙니다. 만약 라이브러리를 유지 관리하고 있다면, NullHandler 인스턴스를 제외한 어떤 로거에도 핸들러를 추가하지 않도록 하십시오.

많은 수의 로거 생성하기

로거는 스크립트 실행 중에 절대 해제되지 않는 싱글턴이므로, 많은 로거를 생성하면 해제될 수 없는 메모리를 소모하게 됩니다. 파일 처리 또는 네트워크 연결마다 로거를 만들기보다는, :ref:`existing mechanisms <context-info>`를 사용하여 로그에 컨텍스트 정보를 전달하고, 생성되는 로거는 여러분의 애플리케이션 영역(일반적으로 모듈이지만 때로는 그보다 약간 더 세분화된 것을 의미합니다)을 기술하는 것으로 제한하십시오.

기타 자원

더 보기

모듈 logging

logging 모듈에 대한 API 레퍼런스

모듈 logging.config

logging 모듈용 구성 API.

모듈 logging.handlers

logging 모듈에 포함된 유용한 처리기.

기초 자습서

고급 자습서

분실물 보관소