로깅 요리책¶
- 저자
Vinay Sajip <vinay_sajip at red-dove dot com>
이 페이지는 과거에 유용했던 로깅 관련 조리법을 많이 포함하고 있습니다.
여러 모듈에서 로깅 사용하기¶
logging.getLogger('someLogger')
를 여러 번 호출하면 같은 로거 객체에 대한 참조가 반환됩니다. 같은 모듈 내에서뿐만 아니라, 같은 파이썬 인터프리터 프로세스에 있는 한, 여러 모듈에서도 마찬가지입니다. 참조가 같은 객체를 가리킨다는 것에 더해, 응용 프로그램 코드는 하나의 모듈에서 부모 로거를 정의 및 구성하고 별도의 모듈에서 자식 로거를 생성 (구성하지 않음) 할 수 있으며, 자식에 대한 모든 로거 호출은 부모로 전달됩니다. 다음은 메인 모듈입니다:
import logging
import auxiliary_module
# 'spam_application' 으로 로거 생성
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# 디버그 메시지도 기록하는 파일 처리기 생성
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# 더 높은 수준의 콘솔 처리기 생성
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# 포매터를 만들고 처리기에 추가
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# 처리기를 로거에 추가
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
다음은 보조 모듈입니다:
import logging
# 로거 생성
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
출력은 이렇게 됩니다:
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
여러 스레드에서 로깅 하기¶
여러 스레드에서 로깅 하는데 특별한 노력이 필요하지는 않습니다. 다음 예제에서는 메인 (최초) 스레드와 다른 스레드에서의 로깅을 보여줍니다:
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
실행하면 스크립트는 다음과 같이 인쇄합니다:
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
예상대로 로그 출력이 산재해 있음을 볼 수 있습니다. 물론, 이 방법은 여기에 표시된 것보다 많은 스레드에서도 작동합니다.
다중 처리기 및 포매터¶
로거는 일반 파이썬 객체입니다. addHandler()
메서드에는 추가할 수 있는 처리기의 수에 대한 최소 또는 최대 할당량이 없습니다. 때로는 응용 프로그램이 모든 심각도의 모든 메시지를 텍스트 파일에 기록하는 동시에, 에러 또는 그 이상을 콘솔에 기록하는 것이 유용 할 수 있습니다. 이렇게 설정하려면, 적절한 처리기를 구성하기만 하면 됩니다. 응용 프로그램 코드의 로깅 호출은 변경되지 않습니다. 다음은 앞의 간단한 모듈 기반 구성 예제를 약간 수정 한 것입니다:
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# 디버그 메시지도 기록하는 파일 처리기 생성
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# 더 높은 수준의 콘솔 처리기 생성
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# 포매터를 만들고 처리기에 추가
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# 처리기를 로거에 추가
logger.addHandler(ch)
logger.addHandler(fh)
# '응용 프로그램' 코드
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
'응용 프로그램' 코드는 여러 처리기에 신경 쓰지 않습니다. 변경된 것은 fh 라는 새로운 처리기의 추가 및 구성뿐입니다.
flowdas
본문의 설명과는 달리 앞의 예제에도 이미 fh 처리기가 들어있습니다. 저자의 의도는, 앞의 예제에 콘솔 처리기 (ch) 만 들어있는 것입니다.
중요도가 높거나 낮은 필터를 사용하여 새 처리기를 만드는 기능은 응용 프로그램을 작성하고 테스트할 때 매우 유용합니다. 디버깅을 위해 많은 print
문을 사용하는 대신에 logger.debug
를 사용하십시오: 나중에 삭제하거나 주석 처리해야 할 print 문과 달리, logger.debug 문은 소스 코드에서 그대로 유지될 수 있고, 그들을 다시 필요로 할 때까지 휴면 상태로 남아 있습니다. 그때, 필요한 유일한 변경은 로거 또는 처리기의 심각도 수준을 DEBUG로 수정하는 것입니다.
여러 대상으로 로깅 하기¶
다른 메시지 포맷으로 다른 상황에서 콘솔과 파일에 기록하려고 한다고 가정 해 봅시다. DEBUG 이상 수준의 메시지를 파일에 기록하고, 수준 INFO 이상인 메시지를 콘솔에 기록하려고 한다고 가정 해보십시오. 또한, 타임스탬프가 파일에는 포함되어야 하지만, 콘솔 메시지에는 없어야 한다고 가정합시다. 이렇게 하면 됩니다:
import logging
# 파일 로깅 설정 - 자세한 내용은 이전 섹션을 참조하세요
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/temp/myapp.log',
filemode='w')
# sys.stderr에 INFO 메시지 이상을 기록하는 처리기를 정의합니다
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# 콘솔용으로 더 간단한 포맷을 설정합니다
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# 처리기가 이 포맷을 사용하도록 알립니다
console.setFormatter(formatter)
# 루트 로거에 처리기를 추가합니다
logging.getLogger('').addHandler(console)
# 이제 루트 로거나 다른 로거에 로그할 수 있습니다. 먼저 루트 ...
logging.info('Jackdaws love my big sphinx of quartz.')
# 이제 응용 프로그램의 영역을 나타내는 몇 가지 다른 로거를 정의합니다:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
실행하면 콘솔에는 다음과 같이 출력됩니다.
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
파일에는 이렇게 기록됩니다.
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
보시다시피 DEBUG 메시지는 파일에만 표시됩니다. 다른 메시지는 두 목적지로 전송됩니다.
이 예제는 콘솔과 파일 처리기를 사용하지만, 여러분이 선택하는 처리기의 수나 조합에 제약이 없습니다.
구성 서버 예제¶
다음은 로깅 구성 서버를 사용하는 모듈의 예입니다:
import logging
import logging.config
import time
import os
# 초기 구성 파일을 읽습니다
logging.config.fileConfig('logging.conf')
# 포트 9999에 리스너를 만들고 시작시킵니다
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# Ctrl+C 를 누를 때까지, 새로운 구성의 차이점을 확인하기위해
# 로깅 호출을 반복합니다
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# 청소합니다
logging.config.stopListening()
t.join()
다음은 파일 이름을 받아서, 그 파일을 새 로깅 구성으로 (이진 인코딩된 길이를 적절하게 앞에 붙여서) 서버로 보내는 스크립트입니다:
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
블록 하는 처리기 다루기¶
때로는 로깅 처리기가 로깅 중인 스레드를 블록 하지 않고 작업을 수행해야 하는 경우가 있습니다. 이것은 웹 응용 프로그램에서 흔히 나타나는 요구사항이지만, 물론 다른 시나리오에서도 발생합니다.
흔히 느린 행동을 보이는 범인은 SMTPHandler
입니다: 개발자의 통제 밖에 있는 여러 가지 이유로, 전자 우편을 보내는 데 오랜 시간이 걸릴 수 있습니다 (예를 들어, 잘 동작하지 않는 메일 또는 네트워크 인프라). 그러나 거의 모든 네트워크 기반 처리기는 블록 할 수 있습니다. SocketHandler
작업도 너무 느린 DNS 질의를 이면에서 수행 할 수 있습니다 (그리고 이 질의는 여러분의 통제 밖에 있는, 파이썬 계층 아래의 소켓 라이브러리 코드 깊숙이 있을 수 있습니다).
한 가지 해결책은 두 부분으로 된 접근법을 사용하는 것입니다. 첫 번째 부분에서는, 성능이 중요한 스레드에서 액세스하는 로거에 QueueHandler
만 붙입니다. 그들은 단순히 큐에 씁니다. 충분한 용량으로 큐의 크기를 조정하거나, 크기의 상한이 없도록 초기화 할 수 있습니다. 큐에 대한 쓰기는 일반적으로 신속하게 받아들여지지만, 코드에서 예방책으로 queue.Full
예외를 잡아야 할 것입니다. 코드에 성능이 중요한 스레드가 있는 라이브러리 개발자인 경우, 코드를 사용할 다른 개발자의 이익을 위해 이것을 (여러분의 로거에 QueueHandlers
만 붙이라는 제안과 함께) 문서로 만들어야 합니다.
해결책의 두 번째 부분은 QueueListener
며, 이는 QueueHandler
에 상응하여 설계되었습니다. QueueListener
는 매우 간단합니다: 큐와 처리기를 넘겨주면 QueueHandlers
(또는 LogRecords
의 다른 소스)에서 보낸 LogRecord를 큐에서 수신하는 내부 스레드를 시작합니다. LogRecords
는 큐에서 제거되고 처리를 위해 처리기로 전달됩니다.
별도의 QueueListener
클래스를 사용하면 같은 인스턴스를 사용하여 여러 개의 QueueHandlers
를 처리할 수 있다는 장점이 있습니다. 이것은 특별한 이점 없이 처리기당 하나의 스레드를 먹게 되는 기존의 처리기 클래스의 스레드 버전을 만드는 것보다 자원 친화적입니다.
이 두 클래스를 사용하는 예제는 다음과 같습니다 (임포트 생략):
que = queue.Queue(-1) # 크기 제한이 없습니다
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# 로그 출력은 내부 큐를 살피는 내부 스레드가 아닌 이벤트를 만든 스레드(메인 스레드)를
# 표시합니다. 이것이 여러분이 원하는 것입니다.
root.warning('Look out!')
listener.stop()
실행하면, 다음과 같은 결과를 만듭니다:
MainThread: Look out!
버전 3.5에서 변경: 파이썬 3.5 이전 버전에서는, QueueListener
는 항상 큐에서 받은 모든 메시지를 초기화될 때 제공된 모든 처리기로 전달했습니다. (이것은 큐가 채워질 때 수준 필터링이 모두 반대편에서 행해졌다고 가정했기 때문입니다.) 3.5 이후부터, 이 동작은 키워드 인자 respect_handler_level=True
를 리스너의 생성자에 전달함으로써 변경될 수 있습니다. 이렇게 할 때, 리스너는 각 메시지의 수준을 처리기의 수준과 비교하여, 적절한 메시지만 처리기에 전달되도록 합니다.
네트워크에서 로깅 이벤트 보내고 받기¶
네트워크를 통해 로깅 이벤트를 보내고, 받는 쪽에서 처리하려고 한다고 합시다. 이렇게 하는 간단한 방법은 SocketHandler
인스턴스를 보내는 쪽의 루트 로거에 연결하는 것입니다:
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# 포매터는 신경 쓰지 마세요. 소켓 처리기는 이벤트를 포맷되지
# 않은 피클로 전송합니다
rootLogger.addHandler(socketHandler)
# 이제 루트 로거나 다른 로거에 로그할 수 있습니다. 먼저 루트 ...
logging.info('Jackdaws love my big sphinx of quartz.')
# 이제 응용 프로그램의 영역을 나타내는 몇 가지 다른 로거를 정의합니다:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
수신 측에서는 socketserver
모듈을 사용하여 수신기를 구성할 수 있습니다. 기본적인 작업 예제는 다음과 같습니다:
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""스트리밍 로깅 요청을 위한 처리기.
기본적으로 로컬에서 구성된 로깅 정책을 사용하여 레코드를 기록합니다.
"""
def handle(self):
"""
여러 요청을 처리합니다 - 각 요청은 4 바이트 길이 다음에 피클 형식의
LogRecord로 구성됩니다. 로컬에서 구성된 정책에 따라 레코드를 로깅합니다.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# 이름이 지정되면, 레코드에서 암시된 이름 대신 지정한 이름의
# 로거를 사용합니다.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# 주의: 모든 레코드가 기록됩니다. Logger.handle은 일반적으로
# 로거 수준 필터링 후에 호출되기 때문입니다. 필터링을 원한다면, 사이클
# 낭비와 네트워크 대역폭을 절약하기 위해 클라이언트 측에서 처리하십시오!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
테스트에 적합한 간단한 TCP 소켓 기반 로그 수신기.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
먼저 서버를 실행한 다음 클라이언트를 실행합니다. 클라이언트 쪽에서는 콘솔에 아무것도 인쇄되지 않습니다. 서버 측에서 다음과 같은 내용이 보여야 합니다.:
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
일부 시나리오에서는 피클이 몇 가지 보안 문제를 수반함에 유의하십시오. 이 문제가 중요하면, makePickle()
메서드를 재정의하고 거기서 여러분의 대안을 구현해서 다른 직렬화를 사용하는 한편, 위의 스크립트가 그 직렬화를 사용하도록 수정하십시오.
로그 출력에 문맥 정보 추가¶
로깅 호출에 전달된 매개 변수 외에도 로깅 출력에 문맥 정보가 포함되기 원하는 경우가 있습니다. 예를 들어, 네트워크 응용 프로그램에서, (원격 클라이언트의 사용자 이름 또는 IP 주소와 같은) 클라이언트별 정보를 로그에 기록하는 것이 바람직 할 수 있습니다. 이를 달성하기 위해 extra 매개 변수를 사용할 수는 있지만, 이러한 방식으로 정보를 전달하는 것이 항상 편리하지는 않습니다. 연결마다 Logger
인스턴스를 만들고 싶을지 모르지만, 이러한 인스턴스는 가비지 수집되지 않기 때문에 좋지 않습니다. Logger
인스턴스의 수가 응용 프로그램 로깅에 사용하고자 하는 세분성 수준에 의존적일 때 이것이 실제로 문제가 되지는 않지만, Logger
인스턴스의 수가 실질적으로 무제한이 되면 관리하기 어려울 수 있습니다.
문맥 정보 전달에 LoggerAdapters 사용하기¶
로깅 이벤트 정보와 함께 출력되는 문맥 정보를 전달하는 쉬운 방법은 LoggerAdapter
클래스를 사용하는 것입니다. 이 클래스는 Logger
처럼 보이도록 설계되어 있어서, debug()
, info()
, warning()
, error()
, exception()
, critical()
및 log()
를 호출할 수 있습니다. 이 메서드들은 Logger
에 있는 것과 똑같은 서명을 가지므로, 두 형의 인스턴스를 같은 의미로 사용할 수 있습니다.
LoggerAdapter
의 인스턴스를 생성할 때, Logger
인스턴스와 문맥 정보가 포함된 딕셔너리류 객체를 전달합니다. LoggerAdapter
의 인스턴스에서 로깅 메서드 중 하나를 호출하면, 생성자에 전달된 하위 Logger
인스턴스에 호출을 위임하고, 이 호출에 문맥 정보를 전달하도록 배치합니다. 다음은 LoggerAdapter
코드에서 발췌한 내용입니다:
def debug(self, msg, /, *args, **kwargs):
"""
이 어댑터 인스턴스의 문맥 정보를 추가 한 후,
하부 로거에 debug 호출을 위임합니다.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
LoggerAdapter
의 process()
메서드는 문맥 정보가 로그 출력에 추가되는 곳입니다. 로깅 호출의 메시지 및 키워드 인자를 받아서, 하부 로거에 대한 호출에서 사용될 (대체로) 수정된 버전을 돌려줍니다. 이 메서드의 기본 구현은 메시지는 그대로 두고, 키워드 인자에 생성자로 전달된 딕셔너리류 객체를 값으로 갖는 'extra' 키를 삽입합니다. 물론, 어댑터에 대한 호출에서 'extra' 키워드 인자를 전달한 경우 자동으로 덮어씁니다.
'extra'를 사용하는 장점은, 딕셔너리류 객체에 들어있는 값이 LogRecord
인스턴스의 __dict__에 병합되어, 키에 대해 알고 있는 Formatter
인스턴스로 사용자 정의된 문자열을 사용할 수 있게 된다는 것입니다. 다른 방법이 필요한 경우, 가령 메시지 문자열의 앞이나 뒤에 문맥 정보를 덧붙이려는 경우, LoggerAdapter
의 서브 클래스를 만들고, 필요한 작업을 수행하기 위해 process()
를 재정의해야 합니다. 다음은 간단한 예제입니다:
class CustomAdapter(logging.LoggerAdapter):
"""
이 예제 어댑터는 전달된 딕셔너리류 객체에 'connid'키가 있을 것으로 기대하며,
꺽쇠괄호로 둘러싼 값을 로그 메시지의 앞에 붙입니다.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
이런 식으로 사용할 수 있습니다:
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
그러면 어댑터에 로그 하는 모든 이벤트는 로그 메시지 앞에 some_conn_id
값이 붙습니다.
딕셔너리 이외의 객체를 사용하여 문맥 정보 전달하기¶
실제 딕셔너리를 LoggerAdapter
에 전달할 필요는 없습니다 - 로깅에 딕셔너리처럼 보일 수 있도록, __getitem__
과 __iter__
를 구현하는 클래스의 인스턴스를 전달할 수 있습니다. 값을 동적으로 생성하려는 경우 (반면에 딕셔너리에 들어있는 값은 바뀌지 않습니다) 유용합니다.
문맥 정보 전달에 필터 사용하기¶
사용자 정의 Filter
를 사용하여 로그 출력에 문맥 정보를 추가할 수도 있습니다. Filter
인스턴스는 전달된 LogRecords
를 수정할 수 있는데, 어트리뷰트를 추가해서 적절한 포맷 문자열이나 필요하다면 사용자 정의 Formatter
를 사용해서 출력되도록 할 수 있습니다.
예를 들어 웹 응용 프로그램에서, 처리 중인 요청(또는 적어도 그것의 흥미로운 부분)을 스레드 로컬 (threading.local
) 변수에 저장한 다음, Filter
에서 액세스해서, 요청에서 온 정보를 - 원격 IP 주소와 원격 사용자의 사용자 이름이라고 합시다 - 위의 LoggerAdapter
예제에서와같이 어트리뷰트 이름 'ip'와 'user'를 사용하여 LogRecord
에 추가할 수 있습니다. 이 경우 같은 포맷 문자열을 사용하여 위에 표시된 것과 비슷한 출력을 얻을 수 있습니다. 다음은 스크립트 예입니다:
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
문맥 정보를 로그에 주입하는 필터입니다.
실제 문맥 정보를 사용하는 대신, 이 시연에서는 무작위
데이터만 사용합니다.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
flowdas
본문의 설명과는 달리 LoggerAdapter
의 예제는 IP 주소나 사용자 이름을 사용하고 있지 않습니다. 아마 어느 시점에선가
상호 참조를 손상 시킨 변경이 있었던 것으로 추정됩니다. LoggerAdapter
를 언급하는 부분은 무시해주세요.
실행하면 다음과 같은 결과가 나옵니다:
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
여러 프로세스에서 단일 파일에 로깅 하기¶
logging 이 스레드-안전하고, 단일 프로세스의 여러 스레드에서 단일 파일로 로깅 하는 것이 지원되지만, 파이썬에서 여러 프로세스가 단일 파일에 액세스하는 것을 직렬화하는 표준적인 방법이 없으므로, 여러 프로세스에서 단일 파일로 로깅 하는 것은 지원되지 않습니다. 여러 프로세스에서 하나의 파일에 로그 해야 하는 경우, 이 작업을 수행하는 한 가지 방법은 모든 프로세스가 로그를 SocketHandler
에 기록하고, 소켓에서 읽어서 파일로 로그 하는 소켓 서버를 구현하는 별도의 프로세스를 사용하는 것입니다. (원한다면, 기존 프로세스 중 하나에서 한 스레드가 이 기능을 전담하도록 할 수 있습니다.) 이 섹션에서 이 접근법을 더 자세하게 설명하고, 여러분의 응용 프로그램에 적용하기 위한 출발점으로 사용할 수 있는 작동하는 소켓 수신기를 제공합니다.
multiprocessing
모듈의 Lock
클래스를 사용하는 독자적인 처리기를 작성하여 여러 프로세스에서 파일에 액세스하는 것을 직렬화 할 수 있습니다. 기존 FileHandler
와 서브 클래스들은, 앞으로는 가능할 수 있지만, 현재 multiprocessing
을 사용하지 않습니다. 현재 multiprocessing
모듈이 모든 플랫폼에서 작동하는 록 기능을 제공하지는 않는다는 것에 유의하십시오 (https://bugs.python.org/issue3770 를 참조하세요).
또는, Queue
와 QueueHandler
를 사용하여, 모든 로깅 이벤트를 다중 프로세스 응용 프로그램의 프로세스 중 하나에 보낼 수 있습니다. 다음 예제 스크립트는 이렇게 하는 방법을 보여줍니다; 예제에서 별도의 리스너 프로세스가 다른 프로세스가 보낸 이벤트를 수신하고 자체 로깅 구성에 따라 이벤트를 기록합니다. 이 예제가 한 가지 방법만을 보여 주지만 (예를 들어, 별도의 리스너 프로세스 대신 리스너 스레드를 사용할 수도 있습니다 -- 구현은 비슷할 것입니다), 리스너와 응용 프로그램의 다른 프로세스가 완전히 다른 로깅 구성을 사용하도록 허용하고, 여러분 자신의 특별한 요구 사항을 충족하는 코드의 기초로 사용할 수 있습니다:
# 여러분의 코드에서도 이 임포트가 필요할겁니다
import logging
import logging.handlers
import multiprocessing
# 다음 두 줄의 임포트는 이 시연만을 위한 것입니다
from random import choice, random
import time
#
# 리스너 및 작업자에 대한 로깅 구성을 정의하고 싶기 때문에, 리스너 및 작업자 프로세스 함수는 해당 프로세스에 대한
# 로깅을 구성하기위한 콜러블 configurer 매개 변수를 받아들입니다.
# 이 함수들은 통신을 위해 사용하는 queue 도 받습니다.
#
# 실제로는 리스너를 원하는대로 구성 할 수 있지만, 이 간단한 예제에서는 리스너가 수신 된 레코드에 수준 또는
# 필터 논리를 적용하지 않습니다. 실제로 필터링 될 이벤트를 프로세스간에 보내지 않기 위해, 이런 논리는 작업자
# 프로세스에 적용하고 싶을겁니다.
#
# 결과를 쉽게 볼 수 있도록 회전 된 파일의 크기를 작게 만듭니다.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# 리스너 프로세스 최상위 루프입니다: 큐에서 로그 이벤트(LogRecords)를 기다리고 처리합니다.
# LogRecord로 None을 얻으면 종료합니다.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # 리스너에서 종료 신호로 이 것을 보냅니다.
break
logger = logging.getLogger(record.name)
logger.handle(record) # 수준 또는 필터 논리가 적용되지 않습니다 - 그대로 보냅니다!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# 이 시연에서 무작위로 선택하는 데 사용되는 배열
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# 작업자 구성은 작업자 프로세스가 실행의 처음에 수행됩니다.
# 윈도우에서는 포크 개념에 의존 할 수 없기 때문에,
각 프로세스는 시작할 때 로깅 구성 코드를 실행합니다.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # 하나의 처리기만 필요합니다
root = logging.getLogger()
root.addHandler(h)
# 시연을 위해, 모든 메시지를 보냅니다; 다른 수준이나 필터 논리는 적용되지 않습니다.
root.setLevel(logging.DEBUG)
# 작업자 프로세스 최상위 루프입니다. 종료하기 전에 임의의 지연이 삽입되는 10개의 이벤트만 로그합니다.
# 인쇄 메시지는 여러분에게 무엇인가하고 있다는 것을 알리기 위함입니다!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# 여기가 시연이 조정되는 곳입니다. 큐를 만들고, 리스너를 만들고 시작시키고, 10개의 작업자를 만들고
# 시작시킨 후에 종료할 때까지 기다리고, 리스너가 종료하도록 큐에 None 을 보냅니다.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
위의 스크립트 변형은 로깅을 메인 프로세스의 별도의 스레드에서 유지합니다:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# 이 시점에서, 메인 프로세스는 자체적으로 유용한 작업을 수행 할 수 있습니다.
# 그 일이 끝나면, 작업자들이 종료할 때까지 기다릴 수 있습니다...
for wp in workers:
wp.join()
# 그리고 이제 로깅 스레드도 종료하도록 알립니다
q.put(None)
lp.join()
이 변형은 특정 로거에 대한 구성을 적용하는 방법을 보여줍니다 - 예를 들어, foo
로거는 foo
서브 시스템의 모든 이벤트를 mplog-foo.log
파일에 저장하는 특별한 처리기를 갖고 있습니다. 이것은 메인 프로세스의 로깅 시스템이 ( 로깅 이벤트가 작업자 프로세스에서 만들어졌다 하더라도) 메시지를 적절한 대상으로 전달하는 데 사용됩니다.
concurrent.futures.ProcessPoolExecutor 사용하기¶
concurrent.futures.ProcessPoolExecutor
를 사용하여 작업자 프로세스를 시작하려면, 약간 다른 방법으로 큐를 만들어야 합니다.
queue = multiprocessing.Queue(-1)
대신에, 다음을 사용해야 합니다
queue = multiprocessing.Manager().Queue(-1) # 위의 예제에서도 작동합니다
그런 다음 작업자 생성을:
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
에서 다음과 같이 대체할 수 있습니다 (먼저 concurrent.futures
를 임포트 하는 것을 기억하십시오):
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
파일 회전 사용하기¶
때로는 로그 파일이 특정 크기까지 커지도록 한 다음, 새 파일을 열고 그곳에 로그에 기록하려고 할 수 있습니다. 이 파일들을 특정 수만 유지하고, 그 수 만큼의 파일이 만들어지면, 파일을 회전시켜 파일의 개수와 크기 모두 제한되도록 하고 싶을 수 있습니다. 이 사용 패턴을 위해, logging 패키지는 RotatingFileHandler
를 제공합니다:
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# 원하는 출력 수준으로 로거를 설정합니다
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# 로그 메시지 처리기를 로거에 추가합니다
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# 메시지를 로그합니다
for i in range(20):
my_logger.debug('i = %d' % i)
# 만들어진 파일을 봅니다
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
결과는 6개의 파일이어야 하고, 각기 응용 프로그램에 대한 로그 기록의 일부입니다:
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
가장 최근의 파일은 항상 logging_rotatingfile_example.out
이며, 크기 제한에 도달할 때마다 접미사 .1
이 붙은 이름으로 변경됩니다. 기존 백업 파일 각각의 이름이 변경되어 접미사가 증가하고 ( .1
이 .2
가 되는 등) .6
파일이 지워집니다.
분명, 이 예제는 로그 길이를 극단적으로 작게 설정합니다. maxBytes 를 적절한 값으로 설정하고 싶을 겁니다.
대체 포매팅 스타일 사용하기¶
logging이 파이썬 표준 라이브러리에 추가되었을 때, 가변 내용으로 메시지를 포맷하는 유일한 방법은 %-포매팅 방법을 사용하는 것이었습니다. 그 이후로, 파이썬은 두 개의 새로운 포매팅 접근법을 얻었습니다: string.Template
(파이썬 2.4에 추가됨)과 str.format()
(파이썬 2.6에 추가됨).
로깅은 (3.2부터) 이 두 가지 추가 포매팅 스타일에 대해 개선된 지원을 제공합니다. Formatter
클래스는 style
이라는 추가적인 키워드 매개 변수를 취하도록 개선되었습니다. 기본값은 '%'
이지만, 다른 두 가지 포매팅 스타일에 해당하는 '{'
및 '$'
를 사용할 수 있습니다. (여러분이 기대하듯이) 이전 버전과의 호환성은 기본적으로 유지되지만, style 매개변수를 명시적으로 지정하면 str.format()
또는 string.Template
과 함께 작동하는 포맷 문자열을 지정할 수 있습니다. 다음은 가능성을 보여주기 위한 예제 콘솔 세션입니다:
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
로그로 최종 출력하기 위해 로깅 메시지를 포매팅하는 것은 개별 로깅 메시지가 구성되는 방식과 완전히 별개입니다. 개별 메시지에는 다음과 같이 %-포매팅을 사용할 수 있습니다:
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
로깅 호출(logger.debug()
, logger.info()
등)은 실제 로깅 메시지 자체를 위해서는 위치 매개 변수만을 취하고, 키워드 매개 변수는 실제 로깅 호출을 어떻게 다뤄야 하는지를 지정하는 옵션을 결정하는 용도로만 사용됩니다 (예를 들어, 트레이스백 정보를 로그 해야 할지를 가리키는 exc_info
키워드 매개 변수나 로그에 추가되는 문맥 정보를 나타내는 extra
키워드 매개 변수). 그래서 여러분은 str.format()
또는 string.Template
문법을 사용하여 직접 로깅 호출을 할 수 없습니다, 내부적으로 logging 패키지가 %-포매팅을 사용하여 포맷 문자열과 변수 인자를 병합하기 때문입니다. 이전 버전과의 호환성을 유지하는 동안은 이 상황이 바뀌지 않을 것입니다. 기존 코드에 있는 모든 로깅 호출이 %-포맷 문자열을 사용하기 때문입니다.
그러나 {}- 및 $- 포매팅을 사용하여 개별 로그 메시지를 구성하는 방법이 있습니다. 메시지의 경우, 메시지 포맷 문자열로 임의의 객체를 사용할 수 있으며, logging 패키지는 실제 형식 문자열을 얻기 위해 그 객체에 대해 str()
을 호출한다는 것을 상기하십시오. 다음 두 가지 클래스를 고려하십시오:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
이 중 하나를 포맷 문자열 대신 사용하면, {}- 또는 $-포매팅을 사용하여 포맷된 로그 출력의 "%(message)s", "{message}" 또는 "$message" 자리에 나타나는 실제 "message" 부분을 만들 수 있습니다. 어떤 것을 로그 하고 싶을 때마다 클래스 이름을 사용하는 것은 다소 꼴사납지만, __(두 개의 밑줄 ---- gettext.gettext()
나 그 형제들의 동의어/별칭으로 사용되는 _ 과 혼동하지 마세요)와 같은 별칭을 사용하면 꽤 쓸만합니다.
위의 클래스가 파이썬에 포함되어 있지는 않지만, 아주 쉽게 여러분의 코드에 복사하여 붙여넣을 수 있습니다. 다음과 같이 사용될 수 있습니다 (wherever
라는 모듈에서 선언되었다고 가정합니다):
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
위의 예제는 print()
를 사용하여 포매팅이 어떻게 작동하는지 보여주고 있지만, 물론 이 접근법으로 실제 로깅 할 때는 logger.debug()
나 그와 유사한 것을 사용해야 합니다.
한 가지 지적할 점은, 이 접근법이 성능상으로 큰 문제가 없다는 것입니다: 실제 포매팅은 로깅 호출을 할 때가 아니라 로깅 된 메시지를 실제로 처리기가 로그로 출력할 때 (그리고 실제로 그렇게 될 때만) 발생합니다. 그래서 여러분이 실수할 수도 있을 특이함은 괄호가 포맷 문자열과 인자들을 모두 감싼다는 것뿐입니다. __ 표기법이 단지 XXXMessage 클래스 중 하나에 대한 생성자 호출의 편의 문법이기 때문입니다.
원한다면, LoggerAdapter
를 사용하여 다음 예제와 같이 위와 유사한 효과를 얻을 수 있습니다:
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None):
super(StyleAdapter, self).__init__(logger, extra or {})
def log(self, level, msg, /, *args, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger._log(level, Message(msg, args), (), **kwargs)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
이 스크립트는 파이썬 3.2 이상에서 실행될 때 Hello, world!
라는 메시지를 기록해야 합니다.
사용자 정의 LogRecord
¶
모든 로깅 이벤트는 LogRecord
인스턴스로 표현됩니다. 이벤트가 로그 되고 로거 수준에 의해 필터링 되지 않으면, LogRecord
가 생성되고 이벤트에 대한 정보로 채워진 다음 해당 로거(와 그 조상들, 계층 상위로의 전파가 비활성화된 지점의 로거까지)의 처리기로 전달됩니다. 파이썬 3.2 이전에는, 이 생성이 일어나는 곳이 두 곳밖에 없었습니다:
Logger.makeRecord()
, 이벤트 로깅의 일반적인 프로세스에서 호출됩니다. 인스턴스를 생성하기 위해LogRecord
를 직접 호출합니다.makeLogRecord()
, LogRecord에 추가될 어트리뷰트를 포함하는 딕셔너리와 함께 호출됩니다. 보통 적절한 딕셔너리가 네트워크를 통해 (예를 들어,SocketHandler
를 통해 피클 형태로, 또는HTTPHandler
를 통해 JSON 형식으로) 수신될 때 호출됩니다.
이것은 보통 LogRecord
로 특별한 것을 할 필요가 있다면, 다음 중 하나를 해야 한다는 것을 의미합니다.
Logger.makeRecord()
를 재정의하는 자신만의Logger
서브 클래스를 만들고, 관심 있는 로거의 인스턴스가 만들어지기 전에setLoggerClass()
를 사용하여 설정하십시오.
첫 번째 접근법은 여러 라이브러리가 서로 다른 일을 하고 싶어 하는 시나리오에서는 다루기 힘들 것입니다. 각자 자신의 Logger
서브 클래스를 설정하려고 시도할 것이고, 마지막 것이 이기게 될 것입니다.
두 번째 접근법은 많은 경우에 합리적으로 잘 작동하지만, LogRecord
의 특별한 서브 클래스를 사용할 수는 없습니다. 라이브러리 개발자는 로거에 적절한 필터를 설정할 수 있지만, 새로운 로거를 도입할 때마다 이를 수행해야 한다는 것을 기억해야 합니다. 이를 고려하지 않는다면 새 패키지나 모듈을 추가하고 모듈 수준에서 단순히 다음과 같이 합니다:
logger = logging.getLogger(__name__)
이것은 아마도 고려해야 할 많은 것 중 하나일 뿐입니다. 개발자는 자신의 최상위 로거에 첨부된 NullHandler
에도 필터를 추가 할 수 있지만, 응용 프로그램 개발자가 하위 수준 라이브러리 로거에 처리기를 연결하면 호출되지 않습니다 --- 그래서 그 처리기로부터의 출력은 라이브러리 개발자의 의도를 반영하지 못합니다.
파이썬 3.2 이상에서는, LogRecord
생성이 사용자가 지정할 수 있는 팩토리를 통해 수행됩니다. 팩토리는 setLogRecordFactory()
로 설정할 수 있고, getLogRecordFactory()
로 조회할 수 있는 콜러블입니다. 팩토리는 LogRecord
생성자와 같은 서명으로 호출되고, 기본 설정은 LogRecord
입니다.
이 방법을 사용하면 사용자 정의 팩토리가 LogRecord 생성의 모든 측면을 제어 할 수 있습니다. 예를 들어, 서브 클래스를 반환하거나, 다음과 같은 방법으로 생성된 레코드에 어트리뷰트를 추가 할 수 있습니다:
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
이 패턴은 서로 다른 라이브러리가 팩토리를 체인으로 연결할 수 있도록 하며, 서로의 어트리뷰트를 덮어쓰거나 의도하지 않게 표준으로 제공된 어트리뷰트를 덮어쓰지 않는 한, 놀랄 일은 없어야 합니다. 그러나 체인의 각 고리는 모든 로깅 작업에 실행시간 오버헤드를 추가하므로, Filter
를 사용해서 원하는 결과를 얻을 수 없을 때만 이 기법을 사용해야 합니다.
QueueHandler 서브 클래스 만들기 - ZeroMQ 예제¶
QueueHandler
서브 클래스를 사용하여 다른 유형의 큐에 메시지를 보낼 수 있습니다, 예를 들어 ZeroMQ 'publish' 소켓. 아래 예제에서, 소켓은 별도로 생성되어 처리기로 ('queue'로) 전달됩니다:
import zmq # ZeroMQ의 파이썬 바인딩인 pyzmq 를 사용합니다
import json # 호환성있게 레코드를 직렬화하는데 사용
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # 또는 zmq.PUSH 나 다른 적절한 값
sock.bind('tcp://*:5556') # 또는 기타 다른 위치
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
물론 구성하는 다른 방법이 있습니다. 예를 들어 처리기가 소켓을 만드는데 필요한 데이터를 전달하는 것입니다:
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
QueueListener 서브 클래스 만들기 - ZeroMQ 예제¶
다른 유형의 큐에서 메시지를 받기 위해 QueueListener
의 서브 클래스를 만들 수도 있습니다, 예를 들어 ZeroMQ 'subscribe' 소켓. 다음은 그 예입니다:
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # 모든 것을 구독합니다
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
더 보기
- 모듈
logging
logging 모듈에 대한 API 레퍼런스
- 모듈
logging.config
logging 모듈용 구성 API.
- 모듈
logging.handlers
logging 모듈에 포함된 유용한 처리기.
딕셔너리 기반 구성의 예¶
다음은 로깅 구성 딕셔너리의 예입니다 - 장고 프로젝트 설명서 에서 가져왔습니다. 이 딕셔너리를 dictConfig()
로 전달하여 구성을 적용합니다:
LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
},
'simple': {
'format': '%(levelname)s %(message)s'
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
}
},
'handlers': {
'null': {
'level':'DEBUG',
'class':'django.utils.log.NullHandler',
},
'console':{
'level':'DEBUG',
'class':'logging.StreamHandler',
'formatter': 'simple'
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers':['null'],
'propagate': True,
'level':'INFO',
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
이 구성에 대한 더 자세한 정보는 장고 설명서의 관련 섹션 을 참조하세요.
rotator와 namer를 사용해서 로그 회전 처리하기¶
다음 코드 조각에 namer 와 rotator를 정의하는 예가 있는데, 로그 파일을 zlib 기반으로 압축합니다:
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, "rb") as sf:
data = sf.read()
compressed = zlib.compress(data, 9)
with open(dest, "wb") as df:
df.write(compressed)
os.remove(source)
rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer
이것은 "진짜" .gz 파일이 아닙니다. 단순히 압축된 데이터일 뿐이고, 실제 gzip 파일에서 찾을 수 있는 "컨테이너" 가 없습니다. 이 코드 조각은 단지 설명을 위한 것일 뿐입니다.
좀 더 정교한 multiprocessing 예제¶
다음 동작하는 예제에서는 구성 파일을 사용하여 로깅을 multiprocessing과 함께 사용하는 방법을 보여줍니다. 구성은 매우 간단하지만, 실제 multiprocessing 시나리오에서 더 복잡한 구성을 구현할 수 있음을 예시합니다.
이 예에서, 주 프로세스는 리스너 프로세스와 몇 개의 작업자 프로세스를 생성합니다. 주 프로세스, 리스너 및 작업자를 위한 세 가지 구성이 있습니다 (모든 작업자는 같은 구성을 공유합니다). 주 프로세스에서의 로깅, 작업자가 QueueHandler에 로그 하는 방법, 그리고 리스너가 QueueListener 및 더욱 복잡한 로깅 구성을 구현하고 큐를 통해 수신한 이벤트를 구성에서 지정된 처리기로 전달하도록 배치하는 는 방법을 볼 수 있습니다. 이러한 구성은 설명을 위한 것이지만, 이 예제를 여러분 자신의 시나리오에 적용할 수 있어야 합니다.
스크립트는 다음과 같습니다 - 독스트링과 주석이 어떻게 작동하는지 잘 설명하기를 바랍니다:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
이벤트 로깅을위한 간단한 처리기. 리스너 프로세스에서 실행되고, 수신된 레코드의 이름을 기반으로
로거에 이벤트를 전달합니다. 그러면 레코드는 로깅 시스템에 의해 해당 로거에 대해 구성된 처리기로
전달됩니다.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# 파일 및 콘솔에 로깅을 수행하는 것이 리스너임을 보여주기 위해 프로세스 이름을 변환합니다.
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
이것은 주 프로세스에서 수행 될 수 있지만, 설명을 위해 별도의 프로세스에서 수행합니다.
지정된 구성에 따라 로깅을 초기화하고, 리스너를 시작하고 주 프로세스가 이벤트를 통해 완료를
알릴때까지 기다립니다. 그러고나면 리스너가 중지되고 프로세스가 종료됩니다.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# POSIX에서, setup 로거는 부모 프로세스에서 구성되었지만, dictConfig
# 호출 후 사용 불가능하게 설정되어 있어야합니다.
# 윈도우에서는 fork가 사용되지 않기때문에, setup 로거가 자식 프로세스에는 없습니다.
# 그래서 새로 만들어질 것이고 메시지가 나타나게 됩니다 - 그래서 "if posix" 절이 필요합니다.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
설명을 위해 여러개를 만듭니다. 실제로는, 똑 같은 프로세스 대신에 다른 종류의
프로세스일 수 있습니다.
지정된 구성에 따라 로깅을 초기화하고 무작위로 선택된 로거에 임의의 수준으로 100 개의
메시지를 로그합니다.
다른 프로세스가 실행될 수 있도록 작은 sleep이 추가됩니다. 이것이 꼭 필요한 것은
아니지만, 생략했을 때보다 여러 프로세스들의 출력을 조금 더 섞습니다.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# POSIX에서, setup 로거는 부모 프로세스에서 구성되었지만, dictConfig
# 호출 후 사용 불가능하게 설정되어 있어야합니다.
# 윈도우에서는 fork가 사용되지 않기때문에, setup 로거가 자식 프로세스에는 없습니다.
# 그래서 새로 만들어질 것이고 메시지가 나타나게 됩니다 - 그래서 "if posix" 절이 필요합니다.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# 주 프로세스는 콘솔에 인쇄하는 간단한 구성을 얻습니다.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# 작업자 프로세스 구성은 루트 로거에 QueueHandler 를 첨부하는것 뿐인데, 모든 메시지를 큐로 보냅니다.
# 부모 프로세스의 "setup" 로거를 사용하지 않도록 기존 로거를 비활성화합니다. fork() 된 자식
# 프로세스에 그 로거가 존재할 것이기 때문에 POSIX에서 필요합니다.
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# 리스너 프로세스 구성은 이벤트를 원하는 어떤 처리기로도 보낼 수 있는 로깅 구성의 유연성을 보여줍니다.
# 부모 프로세스의 "setup" 로거를 사용하지 않도록 기존 로거를 비활성화합니다. fork() 된 자식
# 프로세스에 그 로거가 존재할 것이기 때문에 POSIX에서 필요합니다.
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# 부모에서의 로깅이 정상적으로 작동하는 것을 보여주기 위해 몇 가지 초기 이벤트를 로그합니다.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# 우리는 이제 작업자들이 일을 끝낼때까지 기다립니다.
for wp in workers:
wp.join()
# 작업자들이 모두 종료했습니다. 리스너도 이제 멈출 수 있습니다.
# 부모에서의 로깅은 정상적으로 작동합니다.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
SysLogHandler로 전송된 메시지에 BOM 삽입하기¶
RFC 5424 는 유니코드 메시지가 다음 구조를 갖는 바이트들로 syslog 데몬에 전송되어야 함을 요구합니다: 선택적인 순수 ASCII 구성 요소, 그 뒤를 이어 UTF-8 바이트 순서 표식 (BOM), 그 뒤를 이어 UTF-8으로 인코딩된 유니코드. (이 규격의 관련 절 을 참조하십시오.)
파이썬 3.1에서, BOM을 메시지에 삽입하는 코드가 SysLogHandler
에 추가되었지만, 유감스럽게도, BOM이 메시지의 시작 부분에 나타나서 그 앞에 순수 ASCII 구성 요소를 허락하지 않도록 잘못 구현되었습니다.
이 동작이 잘못됨에 따라, 잘못된 BOM 삽입 코드가 파이썬 3.2.4 이상에서 제거되었습니다. 그러나, 올바른 코드로 대체되지는 않았고, BOM을 포함하고, 그 앞에 순수 ASCII 시퀀스, 그 뒤에 UTF-8으로 인코딩된 임의의 유니코드로 구성된 RFC 5424-호환 메시지를 생성하려는 경우 다음과 같이 해야 합니다:
Formatter
인스턴스를SysLogHandler
인스턴스에 다음과 같은 포맷 문자열과 함께 첨부하십시오:'ASCII section\ufeffUnicode section'
유니코드 코드 포인트 U+FEFF는, UTF-8을 사용하여 인코딩될 때, UTF-8 BOM으로 인코딩됩니다 -- 바이트열
b'\xef\xbb\xbf'
.ASCII section을 원하는 자리 표시기로 바꾸십시오. 그러나 치환 후 나타나는 데이터가 항상 ASCII임미 보장되어야 합니다 (그렇게 되면, UTF-8 인코딩 이후에는 변경되지 않은 채로 유지됩니다).
Unidcode section을 원하는 자리 표시기로 바꾸십시오; 치환 후 나타나는 데이터에 ASCII 범위를 벗어나는 문자가 포함되어 있어도 괜찮습니다 -- UTF-8을 사용하여 인코딩됩니다.
포맷된 된 메시지는 SysLogHandler
에 의해 UTF-8 인코딩을 사용하여 인코딩*됩니다*. 위의 규칙을 따르는 경우, RFC 5424-호환 메시지를 생성할 수 있어야 합니다. 그렇지 않으면, logging이 불평하지 않을 수도 있지만, 메시지가 RFC 5424와 호환되지 않고 syslog 데몬이 불평 할 수 있습니다.
구조적 로깅 구현¶
대부분의 로깅 메시지는 사람이 읽을 수 있도록 만들어졌기 때문에 쉽게 기계에서 파싱 할 수 없지만, 프로그램에서 (복잡한 정규식을 사용하지 않고도) 구문 분석할 수 있는 구조화된 포맷으로 메시지를 출력하려는 상황이 있을 수 있습니다. 이것은 logging 패키지를 사용하여 쉽게 달성 할 수 있습니다. 이것이 달성될 수 있는 여러 가지 방법이 있지만, 다음은 JSON을 사용하여 기계가 파싱할 수 있는 방식으로 이벤트를 직렬화하는 간단한 접근법입니다:
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # 꼭 필요하지는 않지만, 가독성을 높입니다
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
위의 스크립트가 실행되면 다음과 같이 인쇄됩니다:
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
항목의 순서는 사용된 파이썬 버전에 따라 다를 수 있습니다.
flowdas
딕셔너리의 순서가 보존되는 파이썬 3.7 (실질적으로는 3.6) 부터는 다음과 같이 키워드 인자 순서를 따릅니다.
message 1 >>> {"foo": "bar", "bar": "baz", "num": 123, "fnum": 123.456}
좀 더 특별한 처리가 필요한 경우, 다음 예제와 같이 사용자 정의 JSON 인코더를 사용할 수 있습니다:
from __future__ import unicode_literals
import json
import logging
# 다음은 스크립트가 2.x와 3.x에서 변경없이 실행되도록 합니다
try:
unicode
except NameError:
unicode = str
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, unicode):
return o.encode('unicode_escape').decode('ascii')
return super(Encoder, self).default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # 꼭 필요하지는 않지만, 가독성을 높입니다
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
flowdas
unicode_escape
는 코덱입니다 (자세한 내용은 codecs
를 보세요).
이 코덱을 사용한 변환이 꼭 필요하지는 않습니다.
하지만 set
은 기본 json.JSONEncoder
가 처리하지 못하기 때문에 필요합니다.
위의 스크립트를 실행하면 다음과 같이 인쇄합니다:
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
항목의 순서는 사용된 파이썬 버전에 따라 다를 수 있습니다.
dictConfig()
로 처리기를 사용자 정의하기¶
특정 상황에서 로깅 처리기를 사용자 정의하고 싶을 때가 있고, dictConfig()
를 사용하고 있다면 서브 클래스를 만들지 않고도 이 작업을 수행 할 수 있습니다. 예를 들어, 로그 파일의 소유권을 설정하고 싶다고 합시다. POSIX에서, shutil.chown()
을 사용하면 쉽게 할 수 있지만, 표준 라이브러리의 파일 처리기는 내장된 지원을 제공하지 않습니다. 다음과 같은 일반 함수를 사용하여 처리기 생성을 사용자 정의 할 수 있습니다:
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
그런 다음, dictConfig()
에 전달되는 로깅 구성에서, 이 함수를 호출하여 로깅 처리기를 생성하도록 지정할 수 있습니다:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# 아래의 값들은 이 딕셔너리에서 pop 되고 처리기를 만들고 처리기의 수준과
# 포매터를 설정하는데 사용됩니다.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# 아래의 값들은 처리기 생성자 콜러블에 키워드 인자로 전달됩니다.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
이 예제에서는 단지 예를 들기 위해 pulse
라는 사용자와 그룹을 사용하여 소유권을 설정합니다. 작동하는 스크립트 chowntest.py
로 정리하면:
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# 아래의 값들은 이 딕셔너리에서 pop 되고 처리기를 만들고 처리기의 수준과
# 포매터를 설정하는데 사용됩니다.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# 아래의 값들은 처리기 생성자 콜러블에 키워드 인자로 전달됩니다.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
이것을 실행하기 위해서는, 아마도 root
로 실행해야 할 것입니다:
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
이 예제는 shutil.chown()
이 등장한 파이썬 3.3을 사용합니다. 이 접근법은 dictConfig()
를 지원하는 모든 파이썬 버전에서 작동합니다 - 파이썬 2.7, 3.2 이상. 3.3 이전 버전의 경우, (예를 들어) os.chown()
을 사용하여 실제 소유권 변경을 구현해야 합니다.
실제로는, 처리기 생성 함수가 프로젝트 어딘가에 있는 유틸리티 모듈에 있을 수 있습니다. 구성에 있는 다음과 같은 줄 대신:
'()': owned_file_handler,
예를 들면 이렇게 쓸 수 있습니다:
'()': 'ext://project.util.owned_file_handler',
여기서 project.util
은 함수가 있는 패키지의 실제 이름으로 바꿀 수 있습니다. 위의 작업 스크립트에서 'ext://__main__.owned_file_handler'
를 사용해도 됩니다. 여기서, 실제 콜러블은 ext://
스펙으로부터 dictConfig()
에 의해 결정됩니다.
이 예제는 희망하건대 다른 형태의 파일 변경을 - 예를 들어 특정 POSIX 권한 비트 설정 - 같은 방법으로 (os.chmod()
를 사용해서) 구현하는 방법도 알려줍니다.
물론 이 접근법은 FileHandler
이외의 처리기 유형으로도 확장될 수 있습니다 - 예를 들어, 회전 파일 처리기 중 하나 또는 다른 유형의 처리기 모두.
응용 프로그램 전체에서 특정 포맷 스타일 사용하기¶
flowdas
이 절은 앞선 대체 포매팅 스타일 사용하기 절과 내용이 꽤 겹칩니다.
파이썬 3.2에서, Formatter
는 style
키워드 매개변수를 얻었는데, 이전 버전과의 호환성을 위해 %
를 기본값으로 사용하면서 {
또는 $
를 지정하면 str.format()
과 string.Template
에 의해 지원되는 포매팅 접근법을 사용할 수 있도록 합니다. 이것은 로그 되는 최종 출력으로 로깅 메시지를 포매팅하는 것과 관계된 것이고, 개별 로깅 메시지가 만들어지는 방법과는 무관함에 주의하십시오.
로깅 호출(debug()
, info()
등)은 실제 로깅 메시지 자체를 위해서는 위치 매개 변수만을 취하고, 키워드 매개 변수는 실제 로깅 호출을 어떻게 다뤄야 하는지를 지정하는 옵션을 결정하는 용도로만 사용됩니다 (예를 들어, 트레이스백 정보를 로그 해야 할지를 가리키는 exc_info
키워드 매개 변수나 로그에 추가되는 문맥 정보를 나타내는 extra
키워드 매개 변수). 그래서 여러분은 str.format()
또는 string.Template
문법을 사용하여 직접 로깅 호출을 할 수 없습니다, 내부적으로 logging 패키지가 %-포매팅을 사용하여 포맷 문자열과 변수 인자를 병합하기 때문입니다. 이전 버전과의 호환성을 유지하는 동안은 이 상황이 바뀌지 않을 것입니다. 기존 코드에 있는 모든 로깅 호출이 %-포맷 문자열을 사용하기 때문입니다.
포맷 스타일을 특정 로거와 연관시키는 제안이 있었지만, 이전 버전과의 호환성 문제가 있는데, 기존 코드가 그 로거 이름으로 %-포매팅을 사용할 수 있기 때문입니다.
제삼자 라이브러리와 여러분의 코드 간에 상호 운용이 가능하도록 로깅 하려면, 개별 로깅 호출 수준에서 포매팅을 결정해야 합니다. 이렇게 할 때 대체 포매팅 스타일을 수용 할 수 있는 몇 가지 길이 열립니다.
LogRecord 팩토리 사용¶
파이썬 3.2에서, 위에서 언급 한 Formatter
변경 사항과 함께, logging 패키지는 setLogRecordFactory()
함수를 사용하여 사용자가 자신의 LogRecord
서브 클래스를 설정할 수 있는 기능을 얻었습니다. 이것을 사용하면, 원하는 일을 하도록 getMessage()
메서드를 재정의하는 여러분 자신의 LogRecord
서브 클래스를 설정할 수 있습니다. 이 메서드의 베이스 클래스 구현이 msg % args
포매팅이 일어나는 곳이며, 여러분이 대체 포매팅으로 치환할 수 있는 곳입니다; 그러나, 모든 포매팅 스타일을 지원하면서 다른 코드와의 상호 운용성을 보장하기 위해 %-포매팅을 기본값으로 사용하도록 주의해야 합니다. 또한, 베이스 구현과 마찬가지로 str(self.msg)
를 호출하도록 주의해야 합니다.
flowdas
str(self.msg)
호출 조건은 임의의 객체를 메시지로 사용하기 절에서 설명하는
내용과 관련있습니다. getMessage()
의 베이스 구현에서는 이런식으로
처리되고 있습니다:
def getMessage(self):
msg = str(self.msg)
if self.args:
msg = msg % self.args
return msg
자세한 정보는 setLogRecordFactory()
와 LogRecord
에 대한 레퍼런스 설명서를 참조하십시오.
사용자 정의 메시지 객체 사용¶
{}- 및 $-포매팅을 사용하여 개별 로그 메시지를 작성할 수 있는 또 다른, 아마도 더 간단한 방법이 있습니다. (임의의 객체를 메시지로 사용하기에서) 로깅 할 때 임의의 객체를 메시지 포맷 문자열로 사용할 수 있고, logging 패키지는 그 객체에 대해 str()
을 호출하여 실제 형식 문자열을 얻는다고 했던 것을 기억하실 수 있을 겁니다. 다음 두 클래스를 생각해봅시다:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
이 중 하나를 포맷 문자열 대신 사용하면, {}- 또는 $-포매팅을 사용하여 포맷된 로그 출력의 "%(message)s", "{message}" 또는 "$message" 자리에 나타나는 실제 "message" 부분을 만들 수 있습니다. 어떤 것을 로그 하고 싶을 때마다 클래스 이름을 사용하는 것이 다소 꼴사납다면, 메시지에 M
이나 _
과 같은 별칭을 사용해서 더 쓸만하게 만들 수 있습니다 (또는 지역화에 _
를 사용하고 있다면, 아마도 __
).
flowdas
_
는 흔히 gettext.gettext()
의 별칭으로 만들어져, 여러 언어로 번역될 텍스트를 감싸는
용도로 사용됩니다.
이 접근법의 예가 아래에 나와 있습니다. 먼저, str.format()
를 사용하는 포매팅입니다:
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
두 번째로, string.Template
를 사용하는 포매팅입니다:
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
한 가지 지적할 점은, 이 접근법이 성능상으로 큰 문제가 없다는 것입니다: 실제 포매팅은 로깅 호출을 할 때가 아니라 로깅 된 메시지를 실제로 처리기가 로그로 출력할 때 (그리고 실제로 그렇게 될 때만) 발생합니다. 그래서 여러분이 실수할 수도 있을 특이함은 괄호가 포맷 문자열과 인자들을 모두 감싼다는 것뿐입니다. __ 표기법이 단지 XXXMessage
클래스 중 하나에 대한 생성자 호출의 편의 문법이기 때문입니다.
dictConfig()
로 필터 구성하기¶
dictConfig()
를 사용하여 필터를 구성할 수 있습니다. 하지만 처음에는 어떻게 해야 할지 명확하지 않을 수 있습니다 (그래서 이 조리법을 제공합니다). Filter
가 표준 라이브러리에 포함된 유일한 필터 클래스이고, 많은 요구 사항을 충족시키지는 않을 것이기 때문에 (오직 베이스 클래스로 제공됩니다), 일반적으로 filter()
메서드를 재정의하는 여러분 자신의 Filter
서브 클래스를 정의할 필요가 있습니다. 이렇게 하려면, 필터를 생성하는 데 사용될 콜러블을 필터의 구성 딕셔너리에 ()
키로 지정하십시오 (클래스가 가장 분명하지만 Filter
인스턴스를 반환하는 콜러블은 모두 가능합니다). 다음은 완전한 예입니다:
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
이 예제는 인스턴스를 만드는 콜러블로 키워드 매개 변수 형식으로 구성 데이터를 전달하는 방법을 보여줍니다. 실행하면, 위의 스크립트는 다음을 인쇄합니다:
changed: hello
필터가 구성된 대로 작동하고 있음을 보여줍니다.
주목해야 할 몇 가지 추가 사항:
구성에서 직접 참조할 수 없는 경우 (예를 들어, 다른 모듈에 있고 구성 딕셔너리가 있는 곳에서 직접 임포트 할 수 없는 경우), 외부 객체에 대한 액세스 에 설명된 대로
ext://...
형식을 사용할 수 있습니다. 예를 들어, 위의 예에서MyFilter
대신'ext://__main__.MyFilter'
를 사용할 수 있습니다.필터뿐만 아니라, 이 기술을 사용자 정의 처리기 및 포매터를 구성하는데 사용할 수도 있습니다. logging이 구성에서 사용자 정의 객체를 어떻게 지원하는지에 대한 더 많은 정보는 사용자 정의 객체 를 보시고, 위의 다른 요리책 조리법 dictConfig()로 처리기를 사용자 정의하기 도 보십시오.
사용자 정의된 예외 포매팅¶
예외 포매팅을 사용자 정의하고 싶을 때가 있습니다 - 논쟁의 여지는 있지만, 예외 정보가 포함된 경우에도 이벤트 당 정확히 한 줄이 기록되기 원한다고 합시다. 다음 예제처럼, 사용자 정의 포매터 클래스를 사용할 수 있습니다:
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
한 줄에 인쇄되도록 예외를 포맷합니다.
"""
result = super(OneLineExceptionFormatter, self).formatException(exc_info)
return repr(result) # 또는 여러분이 원하는 어떤 방법으로든 한 줄로 포맷하세요
def format(self, record):
s = super(OneLineExceptionFormatter, self).format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
실행하면, 정확하게 두 줄의 파일이 생성됩니다:
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
위의 처리는 단순하지만, 예외 정보를 원하는 대로 포맷하는 방법을 알려줍니다. traceback
모듈은 더욱 전문화된 요구에 도움이 될 수 있습니다.
로깅 메시지 말하기¶
로깅 메시지를 보여주는 대신 들려주는 것이 바람직한 상황이 있을 수 있습니다. 여러분의 시스템에 텍스트-음성 변환 (TTS) 기능이 있다면 쉽습니다, 파이썬 바인딩이 없어도 됩니다. 대부분의 TTS 시스템에는 실행할 수 있는 명령행 프로그램이 있으며, 이것을 subprocess
를 사용하여 처리기에서 호출 할 수 있습니다. 여기서 TTS 명령행 프로그램이 사용자와 상호 작용하거나, 완료하는 데 오랜 시간이 걸릴 것으로 기대되지 않으며, 로그 되는 메시지의 빈도가 메시지로 사용자를 압도할 정도로 높지 않으며, 메시지는 동시에 처리되지 않고 한 번에 하나씩 읽어도 된다고 가정합니다. 아래의 예제 구현은 다음 메시지가 처리되기 전에 하나의 메시지를 다 읽을 때까지 대기하고, 이 때문에 다른 처리기가 대기 상태로 유지될 수 있습니다. 다음은 espeak
TTS 패키지가 사용 가능하다고 가정하는 접근법을 보여주는 간단한 예입니다:
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# 영국 여성 음성으로 천천히 말합니다
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# 프로그램이 종료할 때까지 기다립니다
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# 기본 포매터는 메시지를 그대로 반환합니다
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
실행하면, 이 스크립트는 여성 음성으로 "Hello"와 "Goodbye"를 차례대로 말합니다.
물론 위의 접근법은 다른 TTS 시스템과 명령행에서 실행되는 외부 프로그램을 통해 메시지를 처리 할 수 있는 전혀 다른 시스템에도 적용될 수 있습니다.
로깅 메시지를 버퍼링하고 조건부 출력하기¶
임시 영역에 메시지를 기록하고 특정 조건이 발생할 때만 메시지를 출력하려는 상황이 있을 수 있습니다. 예를 들어, 함수에서 디버그 이벤트를 로깅 하기를 원할 수 있습니다. 함수가 에러 없이 완료되면 수집된 디버그 정보로 로그를 어지럽히고 싶지 않지만, 에러가 있으면 에러뿐만 아니라 모든 디버그 정보를 출력하고 싶습니다.
다음은 로깅이 이러한 방식으로 작동하기 원하는 함수에 데코레이터를 사용하여 이를 수행할 방법을 보여주는 예제입니다. logging.handlers.MemoryHandler
를 사용하는데, 어떤 상황이 발생할 때까지 로그 된 이벤트를 버퍼링할 수 있도록 하고, 때가 되면 버퍼링 된 이벤트들이 flush
됩니다 - 처리를 위해 다른 처리기(target
처리기)로 전달됩니다. 기본적으로, MemoryHandler
는 버퍼가 다 차거나 수준이 지정된 임계값보다 크거나 같은 이벤트가 발생하면 플러시 됩니다. 사용자 정의 플러시 동작을 원할 경우, 이 조리법을 MemoryHandler
의 더 특수한 서브 클래스와 함께 사용할 수 있습니다.
예제 스크립트에는 간단한 함수 foo
가 있는데, 모든 로그 수준을 순회하면서, 어떤 수준으로 로그 할지를 sys.stderr
에 쓴 다음, 그 수준으로 실제 메시지를 로깅 합니다. 매개 변수를 foo
에 전달할 수 있는데, 참이면 ERROR 및 CRITICAL 수준으로 로그 합니다 - 그렇지 않으면 DEBUG, INFO 및 WARNING 수준에서만 로그 합니다.
이 스크립트는 필요한 조건부 로깅을 수행할 데코레이터로 foo
를 데코레이트 하기만 합니다. 데코레이터는 로거를 매개 변수로 받고 데코레이트 된 함수가 호출되는 동안 메모리 처리기를 연결합니다. 데코레이터는 target 처리기, 플러싱이 발생해야 하는 수준 및 버퍼 용량(버퍼 된 레코드의 수)을 추가로 매개 변수로 받을 수 있습니다. 이것들은 각각 sys.stderr
로 쓰는 StreamHandler
, logging.ERROR
, 100
을 기본값으로 합니다.
스크립트는 다음과 같습니다:
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
이 스크립트를 실행하면 다음과 같은 출력이 나타납니다.:
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
보시다시피, 실제 로깅 출력은 심각도가 ERROR 이상인 이벤트가 기록될 때만 발생하지만, 이 경우 심각도가 낮은 이전 이벤트도 기록됩니다.
물론 전통적인 데코레이션 수단을 쓸 수 있습니다.:
@log_if_errors(logger)
def foo(fail=False):
...
구성을 통해 UTC(GMT)로 시간을 포맷하기¶
때로는 UTC를 사용하여 시간을 포맷하고 싶습니다. 아래에 표시된 UTCFormatter 와 같은 클래스를 사용할 수 있습니다:
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
이제 Formatter
대신 코드에서 UTCFormatter
를 사용할 수 있습니다. 구성을 통해 이를 수행하려면, 다음에 나오는 완전한 예제에 의해 설명된 접근법으로 dictConfig()
API를 사용할 수 있습니다:
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
이 스크립트를 실행하면, 다음과 같은 내용을 인쇄합니다:
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
시간이 한 처리기에서는 UTC로, 다른 처리기에서는 지역 시간으로 포맷되는 것을 보여줍니다.
선택적 로깅을 위해 컨텍스트 관리자 사용하기¶
로깅 구성을 일시적으로 변경하고 무언가를 한 후에 되돌리는 것이 유용할 때가 있습니다. 이를 위해, 컨텍스트 관리자는 로깅 컨텍스트를 저장하고 복원하는 가장 분명한 방법입니다. 다음은 그러한 컨텍스트 관리자의 간단한 예입니다. 컨텍스트 관리자의 범위 안에서 선택적으로 로깅 수준을 변경하고 로깅 처리기를 추가 할 수 있습니다:
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# 묵시적으로 None 을 돌려줍니다 => 예외를 삼키지 않습니다
수준 값을 지정하면, 로거의 수준은 컨텍스트 관리자가 적용되는 with 블록의 범위 안에서 해당 값으로 설정됩니다. 처리기를 지정하면, 블록 진입 시 로거에 추가되고 블록에서 빠져나갈 때 제거됩니다. 블록을 빠져나갈 때 처리기를 닫도록 관리자에게 요청할 수도 있습니다 - 더는 처리기가 필요하지 않으면 이렇게 할 수 있습니다.
작동 원리를 보여주기 위해, 다음 코드 블록을 위에 추가 할 수 있습니다:
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
우리는 초기에 로거 수준을 INFO
로 설정합니다. 그래서 메시지 #1은 나타나고 메시지 #2는 나타나지 않습니다. 그다음에 with
블록에서 수준을 DEBUG
로 임시 변경하면, 메시지 #3이 나타납니다. 블록이 종료되면 로거 수준이 INFO
로 복원되므로, 메시지 #4가 표시되지 않습니다. 그다음 with
블록에서 수준을 다시 DEBUG
로 다시 설정하지만, sys.stdout
으로 쓰는 처리기도 추가합니다. 따라서 메시지 #5는 콘솔에 두 번 표시됩니다 (stderr
를 통해 한 번, stdout
을 통해 한 번). with
문장이 완료된 후에 상태는 이전과 같으므로, (메시지 #1처럼) 메시지 #6이 나타나고, (메시지 #2처럼) 메시지 #7은 보이지 않습니다.
이렇게 만든 스크립트를 실행하면, 결과는 다음과 같습니다:
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
다시 실행하면서 stderr
를 /dev/null
로 리디렉트하면, 다음과 같이 stdout
으로 출력된 메시지만 나타납니다:
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
다시 한번, 하지만 stdout
을 /dev/null
로 리디렉트하면, 이렇게 됩니다:
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
이 경우, stdout
에 인쇄된 메시지 #5는 예상대로 나타나지 않습니다.
물론 여기서 설명한 방법을 일반화 할 수 있습니다. 예를 들어 로깅 필터를 임시로 첨부 할 수 있습니다. 위의 코드는 파이썬 2와 파이썬 3에서 모두 작동합니다.
CLI 응용 프로그램 시작 템플릿¶
다음과 같은 것들을 하는 방법을 보여주는 예입니다:
명령 줄 인자에 기반한 로깅 수준 사용하기
별도의 파일에 있는 여러 부속 명령으로 분기하고, 모두 일관된 방식으로 같은 수준에서 로깅 하기
간단하고 최소의 구성을 사용하기
어떤 서비스를 중지, 시작 또는 다시 시작하는 작업을 위한 명령 줄 응용 프로그램이 있다고 가정합니다. 이것은 예시의 목적을 위해 start.py
, stop.py
및 restart.py
에 개별 명령이 구현되고, 응용 프로그램의 메인 스크립트는 app.py
파일이 되도록 구성할 수 있습니다. 명령 줄 인자를 통해 응용 프로그램의 상세도를 제어하려고 하고, logging.INFO
를 기본값으로 한다고 더 가정해 봅시다. app.py
를 작성할 수 있는 한 가지 방법은 다음과 같습니다:
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# 명령을 분기하는 코드는 모두 이 파일에 있을 수 있습니다. 단지 예시의 목적으로, 각 명령을 별도의
# 모듈로 구현합니다.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# 여기를 좀 다듬고 구성을 파일이나 딕셔너리에서 로드 할 수 있습니다
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
그리고 start
, stop
및 restart
명령은 별도의 모듈로 구현할 수 있습니다, 가령 시작하려면:
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# 실제로는 여기에서 명령을 처리합니다 ...
logger.info('Started the \'%s\' service.', options.name)
그리고 멈추려면:
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# 실제로는 여기에서 명령을 처리합니다 ...
logger.info('Stopped the %s service%s.', services, plural)
비슷하게, 다시 시작하려면:
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# 실제로는 여기에서 명령을 처리합니다 ...
logger.info('Restarted the %s service%s.', services, plural)
이 응용 프로그램을 기본 로그 수준으로 실행하면, 이런 출력을 얻습니다:
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
첫 번째 단어는 로깅 수준이고, 두 번째 단어는 이벤트가 로그 된 장소의 모듈이나 패키지 이름입니다.
로깅 수준을 변경하면, 로그로 전송되는 정보를 변경할 수 있습니다. 예를 들어, 우리가 더 많은 정보를 원한다면:
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
그리고 덜 원한다면:
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
이 경우, WARNING
수준 이상으로 아무것도 로그 하지 않았으므로, 명령은 콘솔에 아무것도 인쇄하지 않습니다.
로깅을 위한 Qt GUI¶
때때로 나오는 질문은 GUI 응용 프로그램에 로그 하는 방법입니다. Qt 프레임워크는 PySide2나 PyQt5 라이브러리를 사용하는 파이썬 바인딩이 있는 인기 있는 교차 플랫폼 UI 프레임워크입니다.
다음 예는 Qt GUI에 로그 하는 방법을 보여줍니다. 이것은 콜러블을 취하는 간단한 QtHandler
클래스를 소개합니다. 이 클래스는 GUI 업데이트를 하는 메인 스레드의 슬롯이어야 합니다. 또한 UI 자체 (수동 로깅을 위한 버튼을 통해) 뿐만 아니라 백그라운드에서 작업하는 작업자 스레드에서 GUI에 로그 하는 방법을 보여주기 위해 작업자 스레드도 만듭니다 (여기서는, 단지 임의의 짧은 지연을 주고 임의의 수준으로 메시지를 로깅 합니다).
작업자 스레드는 threading
모듈 대신 Qt의 QThread
클래스를 사용하여 구현되는데, 다른 Qt
구성 요소와 더 잘 통합되는 QThread
를 사용해야 하는 상황이 있기 때문입니다.
이 코드는 PySide2
나 PyQt5
최신 배포에서 작동해야 합니다. 여러분은 이 접근법을 이전 버전의 Qt에 적용할 수 있을 겁니다. 자세한 내용은 코드 조각의 주석을 참조하십시오.
import datetime
import logging
import random
import sys
import time
# PySide2와 PyQt5의 사소한 차이점을 다룹니다
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# 시그널은 올바르게 초기화되기 위해서 QObject나 서브 클래스에 포함될 필요가 있습니다.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Qt GUI로의 출력은 메인 스레드에서만 발생해야 합니다. 따라서, 이 처리기는 메인 스레드에서 실행되도록
# 설정된 슬롯 함수를 받아들이도록 설계되었습니다. 이 예제에서, 함수는 포맷된 로그 메시지인 문자열 인자와
# 이를 생성한 로그 레코드를 받아들입니다. 포맷된 문자열은 단지 편의를 위한 것입니다 - 슬롯 함수 자체에서
# 원하는 방식으로 문자열을 포맷하여 출력할 수 있습니다.
#
# 슬롯 함수가 원하는 GUI 업데이트를 수행하도록 지정합니다. 처리기는 구체적인 UI 요소를 모르거나 신경 쓰지
# 않습니다.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super(QtHandler, self).__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# 이 예제는 QThread를 사용하는데, 파이썬 수준의 스레드는 "Dummy-1" 과 같은 이름이 붙는다는 뜻입니다.
# 아래 함수는 현재 스레드의 Qt 이름을 가져옵니다.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# 이 작업자 클래스는 메인 스레드와 분리된 스레드에서 수행되는 작업을 나타냅니다. 작업을 수행하기 위해
# 스레드를 시작하는 방법은 작업자의 슬롯에 연결되는 버튼 누르기를 통하는 것입니다.
#
# LogRecord의 기본 threadName 값은 별로 사용되지 않기 때문에, 위에서 계산된 QThread 이름을
# 포함하는 qThreadName을 추가하고, 그 값을 "extra" 딕셔너리에 전달하여 QThread 이름으로
# LogRecord를 갱신하도록 합니다.
#
# 이 예제 작업자는 단지 몇 초 정도의 임의 지연으로 산재시킨 메시지를 순차적으로 출력합니다.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# 인터럽트 될 때까지 스레드를 실행합니다. 이것은 상당히 깨끗한 스레드 종료를 허락합니다.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
i += 1
#
# 이 요리책 예제를 위한 간단한 UI를 구현합니다. 다음과 같은 것들이 포함됩니다:
#
# * 포맷된 로그 메시지를 담는 읽기 전용 텍스트 편집 창
# * 별도의 스레드에서 작업과 로그를 시작하는 버튼
# * 메인 스레드에서 무엇인가를 로그 하는 버튼
# * 로그 창을 지우는 버튼
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super(Window, self).__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# 플랫폼의 기본 고정 폭 글꼴을 설정합니다
f = QtGui.QFont('nosuchfont')
f.setStyleHint(f.Monospace)
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# 포맷 문자열에서 threadName 대신 qThreadName을 사용하는 것을 기억하세요.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# 종료할 때 QThread를 종료하도록 설정합니다
app.aboutToQuit.connect(self.force_quit)
# 모든 위젯을 배치합니다
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# 비 작업자 슬롯과 시그널을 연결합니다
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# 새 작업자 스레드를 시작하고 작업자를 위한 슬롯을 연결합니다
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# 일단 시작되면, 버튼은 비활성화되어야 합니다
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# 이것은 작업자 스레드에서 이벤트 루프를 시작합니다
self.worker_thread.start()
def kill_thread(self):
# 작업자에게 중지하라고 알립니다, 그런 다음 종료하라고 알리고 종료할 때까지 기다립니다
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# 창을 닫을 때 사용
if self.worker_thread.isRunning():
self.kill_thread()
# 아래 함수는 UI를 업데이트하고 슬롯이 설정된 메인 스레드에서 실행됩니다
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
sys.exit(app.exec_())
if __name__=='__main__':
main()