multiprocessing — 프로세스 기반 병렬 처리¶
소스 코드: Lib/multiprocessing/
소개¶
multiprocessing`는 :mod:!threading` 모듈과 유사한 API를 사용하여 프로세스 스폰을 지원하는 패키지입니다. multiprocessing 패키지는 로컬 및 원격 동시성을 모두 제공하여, 쓰레드 대신 서브프로세스를 사용함으로써 Global Interpreter Lock <global interpreter lock>`을 효과적으로 우회합니다. 따라서 :mod:!multiprocessing` 모듈은 프로그래머가 특정 기계의 여러 프로세서를 완전히 활용할 수 있도록 합니다. POSIX와 Windows 모두에서 실행됩니다.
multiprocessing 모듈은 Pool 객체를 도입하며, 이 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬화하고 입력 데이터를 프로세스 간에 분산시키는 편리한 방법을 제공합니다(데이터 병렬 처리). 다음 예제는 자식 프로세스가 해당 모듈을 성공적으로 임포트 할 수 있도록, 모듈에서 이러한 함수를 정의하는 일반적인 방식을 보여줍니다. 이 기본 데이터 병렬 처리 예제는 Pool 를 사용합니다:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
표준 출력으로 다음과 같은 것을 인쇄합니다
[1, 4, 9]
multiprocessing 모듈은 또한 threading 모듈에는 유사한 항목이 없는 API들을 도입하는데, 실행 중인 프로세스를 terminate 또는 interrupt 하는 기능 등이 있습니다.
더 보기
concurrent.futures.ProcessPoolExecutor 는 호출 프로세스의 실행을 차단하지 않고 백그라운드 프로세스로 작업을 전송하는 더 높은 수준의 인터페이스를 제공합니다. Pool 인터페이스를 직접 사용하는 것과 비교하여, concurrent.futures API는 기본 프로세스 풀에 작업을 전송하는 것이 결과 대기를 하는 것과 분리될 수 있도록 보다 쉽게 허용합니다.
Process 클래스¶
multiprocessing`에서, 프로세스는 :class:`Process 객체를 생성하고 그 후 start() 메서드를 호출하여 스폰됩니다. :class:`Process`는 :class:`threading.Thread`의 API를 따릅니다. 다중 프로세스 프로그램의 간단한 예는 다음과 같습니다:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
이 과정에 참여하는 개별 프로세스의 ID를 보기 위해, 이렇게 예제를 확장합니다:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
if __name__ == '__main__' 부분이 필요한 이유에 대한 설명은 프로그래밍 지침을 보십시오.
Process`에 대한 인수는 보통 자식 프로세스로 전달될 수 있도록 피클 가능해야 합니다. 위의 예제를 REPL에 직접 입력하려고 하면, 자식 프로세스가 ``__main__` 모듈에서 f 함수를 찾으려고 시도하며 :exc:`AttributeError`가 발생할 수 있습니다.
컨텍스트 및 시작 방법¶
운영체제 플랫폼에 따라, :mod:`!multiprocessing`은 프로세스를 시작하는 세 가지 방법을 지원합니다. 이러한 시작 방법 은
- spawn
부모 프로세스는 새로운 Python 인터프리터 프로세스를 시작합니다. 자식 프로세스는 오직 해당
run()메서드를 실행하는 데 필요한 리소스만을 상속받게 됩니다. 특히, 부모 프로세스의 불필요한 파일 기술자와 핸들은 상속되지 않습니다. 이 방법을 사용하여 프로세스를 시작하는 것은 fork 나 forkserver 를 사용하는 것에 비해 상당히 느립니다.POSIX 및 Windows 플랫폼에서 사용 가능합니다. 윈도우와 macOS의 기본값입니다.
- fork
부모 프로세스는
os.fork()를 사용하여 파이썬 인터프리터를 포크 합니다. 자식 프로세스는, 시작될 때, 부모 프로세스와 실질적으로 같습니다. 부모의 모든 자원이 자식 프로세스에 의해 상속됩니다. 다중 스레드 프로세스를 안전하게 포크 하기 어렵다는 점에 주의하십시오.POSIX 시스템에서 사용 가능합니다.
버전 3.14에서 변경: 이는 어떤 플랫폼에서도 더 이상 기본 시작 방법이 아닙니다. fork 가 필요한 코드는 반드시
get_context()또는set_start_method()를 통해 명시적으로 지정해야 합니다.버전 3.12에서 변경: 만약 Python이 프로세스가 여러 스레드를 가지고 있음을 감지할 수 있다면, 이 시작 방법이 내부적으로 호출하는
os.fork()함수는DeprecationWarning`을 발생시킵니다. 다른 시작 방법을 사용하십시오. 더 자세한 설명은 :func:`os.fork문서를 참조하십시오.
- forkserver
프로그램이 시작되고 forkserver 시작 방법이 선택되면, 서버 프로세스가 스폰됩니다. 그 이후부터는 새로운 프로세스가 필요할 때마다 부모 프로세스는 서버에 연결하여 새 프로세스를 포크해 달라고 요청합니다. 포크 서버 프로세스는 시스템 라이브러리나 사전 로드된 임포트가 사이드 효과로 스레드를 스폰하는 경우를 제외하고는 단일 스레드로 작동하므로, 일반적으로 :func:`os.fork`를 사용해도 안전합니다. 불필요한 리소스는 상속되지 않습니다.
리눅스와 같이 Unix 파이프를 통해 파일 기술자를 전달하는 것을 지원하는 POSIX 플랫폼에서 사용할 수 있습니다. 해당 플랫폼의 기본값입니다.
버전 3.14에서 변경: 이는 POSIX 플랫폼의 기본 시작 방법이 되었습니다.
버전 3.4에서 변경: spawn 은 모든 POSIX 플랫폼에 추가되었고, forkserver 는 일부 POSIX 플랫폼용으로 추가되었습니다. 윈도우에서는 자식 프로세스가 부모의 모든 상속 가능한 핸들을 더 이상 상속받지 않습니다.
버전 3.8에서 변경: macOS에서는, spawn 시작 방법이 이제 기본값입니다. fork 시작 방법은 macOS 시스템 라이브러리가 스레드를 시작할 수 있어 서브 프로세스의 충돌로 이어질 수 있으므로 안전하지 않은 것으로 간주해야 합니다. :issue:`33725`를 참조하십시오.
버전 3.14에서 변경: POSIX 플랫폼에서 기본 시작 방법은 일반적인 멀티스레드 프로세스 비호환성을 피하면서 성능을 유지하기 위해 fork 에서 forkserver 로 변경되었습니다. gh-84559 를 참조하십시오.
POSIX에서 spawn 또는 forkserver 시작 방법을 사용하면 프로그램의 프로세스에 의해 생성된 연결 해제된 이름 지정 시스템 리소스(SharedMemory 객체와 같은)를 추적하는 자원 추적기 (resource tracker) 프로세스가 또한 시작됩니다. 모든 프로세스가 종료되면 자원 추적기가 남아있는 추적된 객체를 연결 해제합니다. 보통은 남아 있는 것이 없어야 하지만, 만약 프로세스가 시그널에 의해 종료되었다면 “유출”된 자원이 있을 수 있습니다. (유출된 세마포어든 공유 메모리 세그먼트든 다음 재부팅까지 자동으로 연결 해제되지 않습니다. 이는 두 객체 모두에게 문제가 될 수 있는데, 시스템이 허용하는 이름 지정 세마포어의 수가 제한적이고, 공유 메모리 세그먼트는 주 메모리의 공간을 차지하기 때문입니다.)
시작 방법을 선택하려면 메인 모듈의 if __name__ == '__main__' 절에서 set_start_method()를 사용하십시오. 예를 들면:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method() 는 프로그램에서 한 번만 사용되어야 합니다.
또는, get_context()를 사용하여 컨텍스트 객체를 얻을 수 있습니다. 컨텍스트 객체는 multiprocessing 모듈과 같은 API를 제공하므로 한 프로그램에서 여러 시작 방법을 사용할 수 있습니다.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
한 컨텍스트와 관련된 객체는 다른 컨텍스트의 프로세스와 호환되지 않을 수 있음에 주의하십시오. 특히 fork 컨텍스트를 사용하여 생성된 록은 spawn 또는 forkserver 시작 방법을 사용하여 시작된 프로세스로 전달될 수 없습니다.
multiprocessing 또는 :class:`~concurrent.futures.ProcessPoolExecutor`를 사용하는 라이브러리는 사용자가 자신만의 프로세싱 컨텍스트를 제공하도록 설계되어야 합니다. 라이브러리 내에서 특정 컨텍스트를 사용하면 라이브러리 사용자 애플리케이션의 나머지 부분과 비호환성이 발생할 수 있습니다. 항상 라이브러리가 특정 시작 방법을 요구하는지 문서화하십시오.
경고
일반적으로 'spawn' 및 'forkserver' 시작 방법은 POSIX 시스템에서 “프리즈된” 실행 파일(즉, PyInstaller 및 cx_Freeze 와 같은 패키지가 생성하는 바이너리)과 함께 사용할 수 없습니다. 코드가 스레드를 사용하지 않는 경우에만 'fork' 시작 방법이 작동할 수 있습니다.
프로세스 간 객체 교환¶
:mod:`!multiprocessing`은 프로세스 간 두 가지 유형의 통신 채널을 지원합니다:
큐
Queue클래스는queue.Queue의 클론에 가깝습니다. 예를 들면:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # "[42, None, 'hello']"를 출력합니다 p.join()큐는 스레드 및 프로세스 안전성을 보장합니다.
multiprocessing큐에 넣은 모든 객체는 직렬화됩니다.
파이프
Pipe()함수는 파이프로 연결된 한 쌍의 연결 객체를 돌려주는데 기본적으로 양방향(duplex)입니다. 예를 들면:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # "[42, None, 'hello']"를 출력합니다 p.join()
Pipe()가 반환하는 두 개의 연결 객체는 파이프의 두 끝을 나타냅니다. 각 연결 객체에는 (다른 것도 있지만)send()및recv()메서드가 있습니다. 두 프로세스 (또는 스레드)가 파이프의 같은 끝에서 동시에 읽거나 쓰려고 하면 파이프의 데이터가 손상될 수 있습니다. 물론 파이프의 다른 끝을 동시에 사용하는 프로세스로 인해 손상될 위험은 없습니다.
send()메서드는 객체를 직렬화하고 :meth:`~Connection.recv`는 객체를 다시 생성합니다.
프로세스 간 동기화¶
:mod:`!multiprocessing`은 :mod:`threading`의 모든 동기화 프리미티브에 대한 등가물을 포함하고 있습니다. 예를 들어, 한 번에 하나의 프로세스만 표준 출력으로 인쇄하도록 록을 사용할 수 있습니다:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
록을 사용하지 않으면 다른 프로세스의 출력들이 모두 섞일 수 있습니다.
작업자 풀 사용¶
Pool 클래스는 작업자 프로세스 풀을 나타냅니다. 여기에는 몇 가지 다른 방법으로 작업을 작업자 프로세스로 넘길 수 있는 메서드가 있습니다.
예를 들면:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# 4개의 워커 프로세스 시작
with Pool(processes=4) as pool:
# "[0, 1, 4,..., 81]" 출력
print(pool.map(f, range(10)))
# 임의 순서로 동일한 숫자들을 출력
for i in pool.imap_unordered(f, range(10)):
print(i)
# "f(20)" 비동기 평가
res = pool.apply_async(f, (20,)) # *오직* 하나의 프로세스에서 실행됨
print(res.get(timeout=1)) # "400" 출력
# "os.getpid()" 비동기로 평가
res = pool.apply_async(os.getpid, ()) # *오직* 하나의 프로세스에서 실행됨
print(res.get(timeout=1)) # 해당 프로세스의 PID를 출력
# 여러 평가를 비동기적으로 시작하면 *더 많은* 프로세스를 사용할 수 있음
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# 하나의 워커에게 10초 동안 대기 요청
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("인내심이 부족하여 multiprocessing.TimeoutError가 발생했습니다")
print("현재로서는 풀을 사용하여 더 많은 작업을 할 수 있습니다")
# 'with'-블록을 종료하면 풀이 중단됩니다
print("이제 풀은 닫혔으며 더 이상 사용할 수 없습니다")
풀의 메서드는 풀을 만든 프로세스에서만 사용되어야 함에 유의하세요.
참고
이 패키지 내의 기능을 사용하려면 __main__ 모듈을 자식이 임포트 할 수 있어야 합니다. 이것은 프로그래밍 지침에서 다루지만, 여기에서 지적할 가치가 있습니다. 이것은 몇몇 예제, 가령 multiprocessing.pool.Pool 예제가 대화형 인터프리터에서 동작하지 않음을 의미합니다. 예를 들면:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(이것을 시도해 보면 실제로 세 개의 전체 트레이스백이 어느 정도 임의로 번갈아 출력됩니다. 그런 다음 부모 프로세스를 중지시켜야 할 수도 있습니다.)
레퍼런스¶
multiprocessing 패키지는 대부분 threading 모듈의 API를 복제합니다.
전역 시작 방법¶
Python은 프로세스를 생성하고 초기화하는 여러 방법을 지원합니다. 전역 시작 방법은 프로세스 생성에 대한 기본 메커니즘을 설정합니다.
여러 multiprocessing 함수 및 메서드는 특정 객체를 인스턴스화할 수 있으므로, 아직 전역 시작 방법이 설정되지 않았다면 암묵적으로 시스템의 기본값으로 설정하게 됩니다. 전역 시작 방법은 한 번만 설정할 수 있습니다. 시스템 기본값에서 시작 방법을 변경해야 하는 경우, 함수나 메서드를 호출하거나 이 객체를 생성하기 전에 사전에 전역 시작 방법을 활성적으로 설정해야 합니다.
Process와 예외¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
프로세스 객체는 별도의 프로세스에서 실행되는 작업을 나타냅니다.
Process클래스는threading.Thread의 모든 메서드와 같은 메서드를 갖습니다.The constructor should always be called with keyword arguments. group should always be
None; it exists solely for compatibility withthreading.Thread. target is the callable object to be invoked by therun()method. It defaults toNone, meaning nothing is called. name is the process name (seenamefor more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the processdaemonflag toTrueorFalse. IfNone(the default), this flag will be inherited from the creating process.기본적으로, target 에는 아무 인자도 전달되지 않습니다. 기본값이
()인 args 인자를 사용하여 target 에 전달할 인자 목록 또는 튜플을 지정할 수 있습니다.서브 클래스가 생성자를 재정의하면, 프로세스에 다른 작업을 수행하기 전에 베이스 클래스 생성자(
super().__init__())를 호출했는지 확인해야 합니다.참고
일반적으로 :class:`Process`의 모든 인자는 피클 가능해야 합니다. 이는 로컬로 정의된 target 함수를 사용하여 REPL에서 :class:`Process`를 생성하거나 :class:`concurrent.futures.ProcessPoolExecutor`를 사용하는 경우 자주 관찰됩니다.
현재 REPL 세션에 정의된 콜러블 객체를 전달하면, 자식 프로세스가 target 으로 시작할 때 포착되지 않은
AttributeError예외로 인해 종료됩니다. 이는 unpickling 과정에서 로드되기 위해 함수가 임포트 가능한 모듈 내에 정의되어 있어야 하기 때문입니다.자식으로부터 발생할 수 있는 이 포착 불가능한 오류의 예:
>>> import multiprocessing as mp >>> def knigit(): ... print("Ni!") ... >>> process = mp.Process(target=knigit) >>> process.start() >>> Traceback (most recent call last): File ".../multiprocessing/spawn.py", line ..., in spawn_main File ".../multiprocessing/spawn.py", line ..., in _main AttributeError: module '__main__' has no attribute 'knigit' >>> process <SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>
spawn 과 forkserver 시작 방법 을 참조하십시오. 이 제한 사항은
"fork"시작 방법을 사용하는 경우에는 사실이 아니지만, Python3.14부터는 어떤 플랫폼에서도 기본값이 아닙니다. 컨텍스트 및 시작 방법 를 참조하십시오. 또한 gh-132898 도 참고하십시오.버전 3.3에서 변경: daemon 매개 변수를 추가했습니다.
- run()¶
프로세스의 활동을 나타내는 메서드.
서브 클래스에서 이 메서드를 재정의할 수 있습니다. 표준
run()메서드는 객체의 생성자에 target 인자로 전달된 콜러블 객체를 호출하는데 (있다면) args 와 kwargs 인자를 각각 위치 인자와 키워드 인자로 사용합니다.:class:`Process`에 전달되는 args 인자로 목록이나 튜플을 사용하면 동일한 효과를 얻습니다.
예제:
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- join([timeout])¶
선택적 인자 timeout 이
None(기본값) 인 경우, 메서드는join()메서드가 호출된 프로세스가 종료될 때까지 블록 됩니다. timeout 이 양수면 최대 timeout 초 동안 블록 됩니다. 이 메서드는 프로세스가 종료되거나 메서드가 시간 초과 되면None을 돌려줌에 주의해야 합니다. 프로세스의exitcode를 검사하여 종료되었는지 확인하십시오.프로세스는 여러 번 조인할 수 있습니다.
교착 상태를 유발할 수 있으므로 프로세스는 자신을 조인할 수 없습니다. 프로세스가 시작되기 전에 프로세스에 조인하려고 하면 에러가 발생합니다.
- name¶
프로세스의 이름. 이름은 식별 목적으로만 사용되는 문자열입니다. 다른 의미는 없습니다. 여러 프로세스에 같은 이름이 주어질 수 있습니다.
초기 이름은 생성자에 의해 설정됩니다. 명시적 이름이 생성자에 제공되지 않으면, ‘Process-N1:N2:…:Nk’ 형식의 이름이 만들어지는데, 각각의 Nk 는 부모의 N 번째 자식입니다.
- daemon¶
프로세스의 데몬 플래그, 논리값.
start()가 호출되기 전에 설정되어야 합니다.초깃값은 생성 프로세스에서 상속됩니다.
프로세스가 종료할 때, 모든 데몬 자식 프로세스를 강제 종료시키려고(terminate) 시도합니다.
데몬 프로세스는 하위 프로세스를 만들 수 없음에 유의하십시오. 그렇지 않으면 부모 프로세스가 종료될 때 데몬 프로세스가 강제 종료되어, 데몬 프로세스가 자식 프로세스를 고아로 남기게 됩니다. 또한, 이들은 유닉스 데몬이나 서비스가 아닙니다, 데몬이 아닌 프로세스들이 종료되면 강제 종료되는 (그리고 조인되지 않는) 일반 프로세스입니다.
threading.ThreadAPI 외에도Process객체는 다음 어트리뷰트와 메서드도 지원합니다 :- pid¶
프로세스 ID를 돌려줍니다. 프로세스가 스폰 되기 전에는
None입니다.
- exitcode¶
자식 프로세스의 종료 코드입니다. 과정이 아직 종료되지 않았다면
None입니다.만약 자식의
run()메서드가 정상적으로 반환되었다면, 종료 코드는 0입니다. 만약 정수 인자 N 와 함께sys.exit()를 통해 종료되었다면, 종료 코드는 N 이 됩니다.만약 자식이
run()내부에서 처리되지 않은 예외로 인해 종료되었다면, 종료 코드는 1입니다. 만약 시그널 N 에 의해 종료되었다면, 종료 코드는 음수 값 -N 이 됩니다.
- authkey¶
프로세스의 인증 키 (바이트열) 입니다.
multiprocessing`을 초기화하면 메인 프로세스에는 :func:`os.urandom을 사용하여 임의의 문자열이 할당됩니다.Process객체가 생성될 때, 부모 프로세스의 인증 키를 상속받습니다.authkey를 다른 바이트열로 설정하여 변경할 수 있습니다.인증 키를 참조하세요.
- sentinel¶
프로세스가 끝나면 “준비(ready)” 될 시스템 객체의 숫자 핸들.
콜론(~)연속된 여러 이벤트를 한 번에 기다리려면 이 값을 사용할 수 있습니다(
multiprocessing.connection.wait()). 그렇지 않다면 :meth:`join`을 호출하는 것이 더 간단합니다.Windows에서는
WaitForSingleObject및WaitForMultipleObjectsAPI 호출 계열에서 사용할 수 있는 OS 핸들입니다. POSIX에서는select모듈의 원시 메서드와 사용할 수 있는 파일 기술자입니다.Added in version 3.3.
- interrupt()¶
프로세스를 종료합니다. POSIX에서는
SIGINT시그널을 사용하여 작동합니다. Windows에서의 동작은 정의되지 않았습니다.기본적으로, 이 메서드는 자식 프로세스를
KeyboardInterrupt`를 발생시켜 종료합니다. 이러한 동작은 자식 프로세스에서 해당 시그널 핸들러를 설정하여 변경할 수 있습니다(:func:`signal.signal+SIGINT).참고: 자식 프로세스가 :exc:`KeyboardInterrupt`를 포착하고 무시하면, 프로세스는 종료되지 않습니다.
참고: 기본 동작은 예외가 처리되지 않은 경우처럼
exitcode를1로 설정할 것입니다. 다르게exitcode를 갖도록 하려면, 단순히KeyboardInterrupt를 포착하고exit(your_code)를 호출할 수 있습니다.Added in version 3.14.
- terminate()¶
프로세스를 종료합니다. POSIX에서는 이 작업을
SIGTERM시그널을 사용하여 수행하며, Windows에서는TerminateProcess`가 사용됩니다. 종료 핸들러나 `finally()절 등은 실행되지 않는에 유의하십시오.프로세스의 자손 프로세스들은 강제 종료되지 않을 것입니다 – 단순히 고아가 될 것입니다.
경고
연결된 프로세스가 파이프 또는 큐를 사용할 때 이 메서드를 사용하면, 파이프 또는 큐가 손상되어 다른 프로세스에서 사용할 수 없게 될 수 있습니다. 마찬가지로, 프로세스가 록이나 세마포어 등을 획득한 경우 강제 종료하면 다른 프로세스가 교착 상태가 될 수 있습니다.
- kill()¶
terminate`와 동일하지만 POSIX에서는 ``SIGKILL`()시그널을 사용합니다.Added in version 3.7.
- close()¶
Process객체를 닫아, 그것과 관련된 모든 자원을 해제합니다. 하부 프로세스가 여전히 실행 중이면ValueError가 발생합니다. 일단close()가 성공적으로 반환되면,Process객체의 다른 대부분의 메서드와 어트리뷰트는ValueError를 발생시킵니다.Added in version 3.7.
start(),join(),is_alive(),terminate()및exitcode메서드는 프로세스 객체를 생성한 프로세스에 의해서만 호출되어야 합니다.Process의 몇몇 메서드를 사용하는 예제:>>> import multiprocessing, time, signal >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <...Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
모든
multiprocessing예외의 베이스 클래스입니다.
- exception multiprocessing.BufferTooShort¶
:meth:`Connection.recv_bytes_into`가 공급된 버퍼 객체가 읽은 메시지에 너무 작을 때 발생하는 예외입니다.
e가BufferTooShort의 인스턴스라면,e.args[0]는 메시지를 바이트열로 줍니다.
- exception multiprocessing.AuthenticationError¶
인증 에러가 일어날 때 발생합니다.
- exception multiprocessing.TimeoutError¶
시간제한이 초과하였을 때 시간제한을 건 메서드에 의해 발생합니다.
파이프와 큐¶
여러 프로세스를 사용할 때, 일반적으로 프로세스 간 통신을 위해 메시지 전달을 사용하고 록과 같은 동기화 프리미티브 사용을 피합니다.
메시지를 전달하기 위해 Pipe() (두 프로세스 간의 연결) 또는 큐(여러 생산자와 소비자를 허용합니다)를 사용할 수 있습니다.
Queue, SimpleQueue 그리고 JoinableQueue 형은, 표준 라이브러리의 queue.Queue 클래스에 따라 모델링 된, 다중 생산자, 다중 소비자 FIFO 큐입니다. 이것들은 파이썬 2.5의 queue.Queue 클래스에서 도입된 task_done()과 join() 메서드가 Queue 에 없다는 점에서 다릅니다.
JoinableQueue를 사용하면, 큐에서 제거된 작업마다 JoinableQueue.task_done()을 호출해야 합니다. 그렇지 않으면 완료되지 않은 작업의 수를 세는 데 사용되는 세마포어가 결국 오버플로 되어 예외를 일으킵니다.
다른 Python 큐 구현체와 다른 점은, multiprocessing 큐는 put 된 모든 객체를 pickle 을 사용하여 직렬화한다는 것입니다. get 메서드가 반환하는 객체는 원본 객체와 메모리를 공유하지 않는 재구성된 객체입니다.
관리자 객체를 사용하여 공유 큐를 생성할 수도 있습니다 – 관리자를 보세요.
참고
multiprocessing 은 제한 시간 초과를 알리기 위해 일반적인 queue.Empty 와 queue.Full 예외를 사용합니다. 이들은 multiprocessing 네임스페이스에서 사용할 수 없으므로 :mod:`queue`에서 임포트해야 합니다.
참고
객체를 큐에 넣으면, 객체는 피클 되고 배경 스레드가 나중에 피클 된 데이터를 하부 파이프로 플러시 합니다. 이것은 다소 의외의 결과로 이어지지만, 실제적인 어려움을 일으키지는 않아야 합니다 – 이것이 여러분을 정말로 신경 쓰이게 한다면, 대신 관리자 로 만든 큐를 사용할 수 있습니다.
빈 큐에 객체를 넣은 후에,
empty()메서드가False를 반환하고get_nowait()가queue.Empty를 일으키지 않고 반환할 수 있기 전까지 극히 작은 지연이 있을 수 있습니다.여러 프로세스가 객체를 큐에 넣는 경우, 반대편에서 객체가 다른 순서로 수신될 수 있습니다. 그러나, 같은 프로세스에 의해 큐에 들어간 객체들은 항상 상대적인 순서가 유지됩니다.
경고
Queue를 사용하려고 하는 동안 Process.terminate() 또는 os.kill() 을 사용하여 프로세스를 죽이면, 큐의 데이터가 손상될 수 있습니다. 이로 인해 나중에 다른 프로세스가 큐를 사용하려고 할 때 예외가 발생할 수 있습니다.
경고
위에서 언급했듯이, 자식 프로세스가 항목을 큐에 넣었을 때 (그리고 JoinableQueue.cancel_join_thread 를 사용하지 않았다면), 버퍼링 된 모든 항목이 파이프로 플러시 될 때까지 해당 프로세스가 종료되지 않습니다.
이것은, 여러분이 그 자식 프로세스를 조인하려고 하면, 큐에 넣은 모든 항목을 소진하지 않는 한 교착 상태가 발생할 수 있다는 뜻입니다. 마찬가지로, 그 자식 프로세스가 데몬이 아니면 부모 프로세스가 종료 시점에 데몬이 아닌 모든 자식을 조인하려고 할 때 정지될 수 있습니다.
관리자를 사용하여 생성된 큐에는 이 문제가 없습니다. 프로그래밍 지침을 참조하세요.
프로세스 간 통신을 위해 큐를 사용하는 예는 예제을 참조하십시오.
- multiprocessing.Pipe(duplex=True)¶
파이프의 끝을 나타내는
Connection객체 쌍(conn1, conn2)를 반환합니다.duplex 가
True(기본값) 면 파이프는 양방향입니다. duplex 가False인 경우 파이프는 단방향입니다:conn1은 메시지를 받는 데에만 사용할 수 있고,conn2는 메시지를 보낼 때만 사용할 수 있습니다.send()메서드는 객체를pickle을 사용하여 직렬화하고, :meth:`~multiprocessing.Connection.recv`는 객체를 재구성합니다.
- class multiprocessing.Queue([maxsize])¶
파이프와 몇 개의 록/세마포어를 사용하여 구현된 프로세스 공유 큐를 반환합니다. 프로세스가 처음으로 항목을 큐에 넣으면 버퍼에서 파이프로 객체를 전송하는 피더 스레드가 시작됩니다.
이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
제한 시간 초과를 알리기 위해 표준 라이브러리의
queue모듈에서 정의되는queue.Empty와queue.Full예외를 일으킵니다.Queue`는 :meth:`~queue.Queue.task_done,join(), 그리고 :meth:`~queue.Queue.shutdown`을 제외한 :class:`queue.Queue`의 모든 메서드를 구현합니다.- qsize()¶
큐의 대략의 크기를 돌려줍니다. 다중 스레딩/다중 프로세싱 특성을 타기 때문에 이 숫자는 신뢰할 수 없습니다.
이것은 `sem_getvalue()`가 구현되지 않은 macOS와 같은 플랫폼에서 :exc:`NotImplementedError`를 발생시킬 수 있음에 유의하십시오.
- empty()¶
큐가 비어 있다면
True를, 그렇지 않으면False를 반환합니다. 다중 스레딩/다중 프로세싱 특성을 타기 때문에 신뢰할 수 없습니다.닫힌 큐에서는 :exc:`OSError`를 발생시킬 수 있습니다 (보장되지 않음).
- full()¶
큐가 가득 차면
True를, 그렇지 않으면False를 반환합니다. 다중 스레딩/다중 프로세싱 특성을 타기 때문에 신뢰할 수 없습니다.
- put(obj[, block[, timeout]])¶
obj를 큐에 넣습니다. 선택적 인자 block 이
True(기본값)이고 timeout 이None(기본값) 이면, 빈 슬롯이 생길 때까지 필요한 경우 블록합니다. timeout 이 양수인 경우, 최대 timeout 초만큼 블록하고 그 시간 내에 사용 가능 슬롯이 생기지 않으면queue.Full예외를 발생시킵니다. 그렇지 않으면 (block 이False) 빈 슬롯을 즉시 사용할 수 있으면 큐에 항목을 넣고, 그렇지 않으면queue.Full예외를 발생시킵니다 (이 경우 timeout 은 무시됩니다).버전 3.8에서 변경: 큐가 닫혔으면,
AssertionError대신ValueError가 발생합니다.
- put_nowait(obj)¶
put(obj, False)와 같습니다.
- get([block[, timeout]])¶
큐에서 항목을 제거하고 반환합니다. 선택적 인자 block 이
True(기본값)이고 timeout 이None(기본값) 이면, 항목이 들어올 때까지 필요한 경우 블록합니다. timeout 이 양수인 경우, 최대 timeout 초만큼 블록하고 그 시간 내에 항목이 들어오지 않으면queue.Empty예외를 발생시킵니다. 그렇지 않으면 (block이False) 즉시 사용할 수 있는 항목이 있으면 반환하고, 그렇지 않으면queue.Empty예외를 발생시킵니다 (이 경우 timeout 은 무시됩니다).버전 3.8에서 변경: 큐가 닫혔으면,
OSError대신ValueError가 발생합니다.
- get_nowait()¶
get(False)와 같습니다.
multiprocessing.Queue에는queue.Queue에서 찾을 수 없는 몇 가지 추가 메서드가 있습니다. 일반적으로 이러한 메서드는 대부분 코드에서 필요하지 않습니다:- close()¶
큐를 닫습니다: 내부 자원을 해제합니다.
큐는 더 이상 사용되어서는 안 됩니다. 예를 들어,
get(),put()및empty()메서드는 호출되지 않아야 합니다.백그라운드 스레드는 버퍼링된 모든 데이터를 파이프에 플러시하면 종료됩니다. 이는 큐가 가비지 컬렉션될 때 자동으로 호출됩니다.
- join_thread()¶
배경 스레드에 조인합니다.
close()가 호출된 후에만 사용할 수 있습니다. 배경 스레드가 종료될 때까지 블록해서 버퍼의 모든 데이터가 파이프로 플러시 되었음을 보증합니다.기본적으로 프로세스가 큐를 만든 주체가 아니면 종료할 때 큐의 배경 스레드를 조인하려고 합니다. 프로세스는
cancel_join_thread()를 호출하여join_thread()가 아무것도 하지 않게 할 수 있습니다.
- cancel_join_thread()¶
join_thread()의 블록을 방지합니다. 특히, 프로세스가 종료할 때 배경 스레드를 자동으로 조인하는 것을 막습니다 –join_thread()를 보십시오.이 메서드의 더 나은 이름은
allow_exit_without_flush()일 수 있습니다. 이 함수는 큐에 넣은 데이터가 손실될 가능성이 높으므로, 거의 사용할 필요가 없을 것입니다. 오직 현재 프로세스가 아래쪽 파이프에 데이터를 플러시하기를 기다리지 않고 즉시 종료해야 하며, 데이터 손실을 신경 쓰지 않을 때만 존재합니다.
참고
이 클래스의 기능은 호스트 운영 체제의 작동하는 공유 세마포어 구현을 요구합니다. 그런 것이 없으면, 클래스의 기능이 비활성화되고,
Queue의 인스턴스를 만들려고 하면ImportError를 일으킵니다. 자세한 내용은 bpo-3770을 참조하십시오. 아래에 나열된 특수 큐 형들도 마찬가지입니다.
- class multiprocessing.SimpleQueue¶
이것은 단순화된
Queue형으로, 록이 걸린Pipe에 매우 가깝습니다.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
- close()¶
큐를 닫습니다: 내부 자원을 해제합니다.
큐를 닫은 후에는 더는 사용해서는 안 됩니다. 예를 들어,
get(),put()및empty()메서드가 더는 호출되지 않아야 합니다.Added in version 3.9.
- get()¶
큐에서 항목을 제거하고 반환합니다.
- put(item)¶
item 을 큐에 넣습니다.
- class multiprocessing.JoinableQueue([maxsize])¶
Queue서브 클래스JoinableQueue는 추가로task_done()과join()메서드를 가진 큐입니다.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
- task_done()¶
앞서 큐에 넣은 작업이 완료되었음을 나타냅니다. 큐 소비자가 사용합니다. 작업을 가져오는데 사용된 각
get()마다, 뒤따르는task_done()호출은 작업에 대한 처리가 완료되었음을 큐에 알립니다.만약
join()이 현재 블록하고 있다면, 모든 항목이 처리될 때 재개될 것입니다 (put()으로 큐에 넣은 모든 항목에 대해task_done()호출을 수신했다는 뜻입니다).큐에 있는 항목보다 많이 호출되면
ValueError를 발생시킵니다.
- join()¶
큐의 모든 항목을 가져가서 처리할 때까지 블록합니다.
항목이 큐에 추가될 때마다 완료되지 않은 작업의 수는 올라갑니다. 소비자가 그 항목을 꺼냈고 그에 대한 모든 작업을 완료했음을 알리기 위해
task_done()을 호출할 때마다 숫자는 줄어듭니다. 완료되지 않은 작업의 수가 0으로 떨어지면join()이 블록으로부터 풀려납니다.
잡동사니¶
- multiprocessing.active_children()¶
현재 프로세스의 모든 살아있는 자식 리스트를 반환합니다.
이것을 호출하면 이미 완료된 프로세스에 “조인” 하는 부작용이 있습니다.
- multiprocessing.cpu_count()¶
시스템의 CPU 수를 반환합니다.
이 숫자는 현재 프로세스에서 사용할 수 있는 CPU 수와 같지 않습니다. 사용 가능한 CPU 수는
os.process_cpu_count()를 사용하거나len(os.sched_getaffinity(0))로 얻을 수 있습니다.CPU 개수를 결정할 수 없는 경우 :exc:`NotImplementedError`가 발생합니다.
버전 3.13에서 변경: 반환 값은 단순히
osCPU 카운트 API를 래핑하는 것이기 때문에,-X cpu_count플래그나 :envvar:`PYTHON_CPU_COUNT`를 사용하여 재정의할 수도 있습니다.
- multiprocessing.current_process()¶
현재 프로세스에 해당하는
Process객체를 반환합니다.threading.current_thread()와 유사한 기능을 제공합니다.
- multiprocessing.parent_process()¶
current_process()의 부모 프로세스에 해당하는Process객체를 반환합니다. 메인 프로세스에서,parent_process는None입니다.Added in version 3.8.
- multiprocessing.freeze_support()¶
multiprocessing`을 사용한 프로그램이 실행 파일로 프리징된 경우에 대한 지원을 추가합니다. (``**py2exe**`,**PyInstaller**, 및**cx_Freeze**등에서 테스트되었습니다.)메인 모듈의
if __name__ == '__main__'줄 바로 뒤에서 이 함수를 호출해야 합니다. 예를 들면:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
freeze_support()줄이 생략된 경우 고정된 실행 파일을 실행하려고 하면RuntimeError가 발생합니다.시작 방법이 spawn 인 경우
freeze_support()를 호출해도 아무 효과가 없습니다. 또한, 모듈이 파이썬 인터프리터에 의해 정상적으로 실행되는 경우 (프로그램이 프리징되지 않은 경우)에도freeze_support()는 효과가 없습니다.
- multiprocessing.get_all_start_methods()¶
지원되는 시작 방법 목록을 반환하며, 첫 번째 항목은 기본값입니다. 가능한 시작 방법은
'fork','spawn'및'forkserver'입니다. 모든 플랫폼이 모든 방법을 지원하는 것은 아닙니다. :ref:`multiprocessing-start-methods`를 참조하십시오.Added in version 3.4.
- multiprocessing.get_context(method=None)¶
multiprocessing모듈과 같은 속성을 가진 컨텍스트 객체를 반환합니다.method 가
None인 경우 기본 컨텍스트가 반환됩니다. 전역 시작 방법이 설정되지 않은 경우, 시스템 기본값으로 설정된다는 점에 유의하십시오. 자세한 내용은 전역 시작 방법 를 참조하십시오. 그렇지 않으면 method 는'fork','spawn'또는'forkserver'여야 합니다. 지정된 시작 방법이 사용 가능한지 여부에 따라ValueError가 발생합니다. 컨텍스트 및 시작 방법 를 참조하십시오.Added in version 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
프로세스를 기동하기 위해서 사용되는 시작 방법의 이름을 돌려줍니다.
전역 시작 방법이 설정되지 않았고 allow_none 이
False인 경우, 전역 시작 방법은 기본값으로 설정되고 그 이름이 반환됩니다. 자세한 내용은 전역 시작 방법 를 참조하십시오.반환 값은
'fork','spawn','forkserver'또는None일 수 있습니다. 자세한 내용은 컨텍스트 및 시작 방법 를 참조하십시오.Added in version 3.4.
버전 3.8에서 변경: macOS에서, spawn 시작 방법이 이제 기본값입니다. fork 시작 방법은 서브 프로세스의 충돌로 이어질 수 있기 때문에, 안전하지 않은 것으로 간주해야 합니다. bpo-33725를 참조하십시오.
- multiprocessing.set_executable(executable)¶
자식 프로세스를 시작하는 데 사용할 파이썬 인터프리터의 경로를 설정합니다. (기본값은 :data:`sys.executable`가 사용됩니다). 임베더는 다음과 같은 작업을 수행해야 할 수도 있습니다:
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
자식 프로세스를 만들기 전에 해야 합니다.
버전 3.4에서 변경:
'spawn'시작 방법을 사용할 때 POSIX에서 이제 지원됩니다.버전 3.11에서 변경: :term:`path-like object`를 받습니다.
- multiprocessing.set_forkserver_preload(module_names, *, on_error='ignore')¶
이 설정은 forked 프로세스에 이미 임포트된 상태가 상속되도록 forkserver 메인 프로세스가 임포트하려고 시도하는 모듈 이름 목록을 지정합니다. 이 기능은 각 프로세스에서 반복 작업을 피하여 성능 향상에 사용될 수 있습니다.
이것이 작동하려면 forkserver 프로세스가 시작되기 전 ( :class:`Pool`을 생성하거나 :class:`Process`를 시작하기 전에) 호출되어야 합니다.
on_error 매개 변수는 모듈 프리로딩 중의
ImportError예외 처리 방식을 제어합니다:"ignore"(기본값)는 실패를 조용히 무시하고,"warn"은 forkserver 서브프로세스가 stderr에ImportWarning을 내보내게 하며,"fail"은 예외 트레이스백과 함께 stderr에서 forkserver 서브프로세스를 종료시켜 이후 프로세스 생성이EOFError또는ConnectionError로 실패하게 합니다.'forkserver'시작 방법을 사용할 때만 의미가 있습니다. :ref:`multiprocessing-start-methods`를 참조하십시오.Added in version 3.4.
버전 3.15에서 변경: on_error 매개 변수를 추가했습니다.
- multiprocessing.set_start_method(method, force=False)¶
자식 프로세스를 시작하는 데 사용해야 하는 방법을 설정합니다. method 인수는
'fork','spawn'또는'forkserver'가 될 수 있습니다. 시작 방법이 이미 설정되었고 force 가True가 아니면RuntimeError가 발생합니다. method 가None이고 force 가True이면 시작 방법은None으로 설정됩니다. method 가None이고 force 가False이면 컨텍스트는 기본 컨텍스트로 설정됩니다.이것은 한 번만 호출해야 하며, 메인 모듈의
if __name__ == '__main__'절 내에서 보호되어야 합니다.:ref:`multiprocessing-start-methods`를 참조하십시오.
Added in version 3.4.
참고
multiprocessing`에는 :func:`threading.active_count, threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer 또는 :class:`threading.local`의 대응 물이 없습니다.
Connection 객체¶
연결 객체를 사용하면 피클 가능한 객체나 문자열을 보내고 받을 수 있습니다. 메시지 지향 연결된 소켓으로 생각할 수 있습니다.
연결 객체는 보통 Pipe 를 사용해서 만들어집니다 – 리스너와 클라이언트 도 참고하세요.
- class multiprocessing.connection.Connection¶
- send(obj)¶
연결의 반대편 끝에서
recv()를 사용하여 읽을 객체를 보냅니다.객체는 피클 가능해야 합니다. 매우 큰 피클(약 32 MiB+, OS에 따라 다릅니다)은
ValueError예외를 발생시킬 수 있습니다.
- recv()¶
연결의 반대편 끝에서
send()로 보낸 객체를 반환합니다. 뭔가 수신할 때까지 블록합니다. 수신할 내용이 없고 반대편 끝이 닫혔으면EOFError를 발생시킵니다.
- fileno()¶
연결이 사용하는 파일 기술자나 핸들을 돌려줍니다.
- close()¶
연결을 닫습니다.
연결이 가비지 수집될 때 자동으로 호출됩니다.
- poll([timeout])¶
읽어 들일 데이터가 있는지를 돌려줍니다.
timeout 을 지정하지 않으면 즉시 반환됩니다. timeout 이 숫자면 블록할 최대 시간(초)을 지정합니다. timeout 이
None이면 시간제한이 없습니다.여러 개의 연결 객체를
multiprocessing.connection.wait()을 사용하여 한 번에 폴링 할 수 있습니다.
- send_bytes(buf[, offset[, size]])¶
바이트열류 객체 의 바이트 데이터를 하나의 완전한 메시지로 보냅니다.
offset 이 주어진 경우, buffer 에서 해당 위치부터 데이터를 읽습니다. size 가 주어진 경우 그만큼의 바이트를 버퍼에서 읽게 됩니다. 매우 큰 버퍼(약 32 MiB 이상, OS마다 다름)는
ValueError예외를 발생시킬 수 있습니다.
- recv_bytes([maxlength])¶
접속의 반대편 끝에서 송신된 바이트 데이터의 완전한 메시지를 문자열로 돌려줍니다. 뭔가 수신할 때까지 블록합니다. 수신할 내용이 없고 반대편 끝이 닫혔으면
EOFError를 발생시킵니다.maxlength 가 지정되고 메시지가 maxlength 보다 길면
OSError가 발생하고 연결은 더는 읽을 수 없게 됩니다.
- recv_bytes_into(buf[, offset])¶
연결의 반대편 끝에서 전송된 바이트 데이터로 구성된 전체 메시지를 buf 에 읽어들이고 메시지의 바이트 수를 반환합니다. 무언가를 받을 때까지 대기(블록)합니다. 수신할 내용이 없고 상대방 끝이 닫혔으면
EOFError를 발생시킵니다.buf*는 쓰기가 가능한 :term:`바이트열류 객체 <bytes-like object>`여야 합니다. *offset*이 주어진 경우 메시지는 버퍼의 해당 위치부터 기록됩니다. offset은 *buf 길이보다 작은 음수가 아닌 정수여야 합니다 (바이트 단위).
버퍼가 너무 작으면
BufferTooShort예외가 발생하고, 완전한 메시지는e.args[0]으로 제공되는데, 여기서e는 예외 인스턴스입니다.
버전 3.3에서 변경: 이제 연결 객체 자체를
Connection.send()와Connection.recv()를 사용하여 프로세스 간에 전송할 수 있습니다.Connection 객체 역시 이제 컨텍스트 관리 프로토콜을 지원합니다 – :ref:`typecontextmanager`를 참조하세요. :meth:`~contextmanager.__enter__`는 Connection 객체를 반환하고, :meth:`~contextmanager.__exit__`는 :meth:`close`를 호출합니다.
예를 들어:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
경고
Connection.recv() 메서드는 수신한 데이터를 자동으로 언 피클 합니다. 메시지를 보낸 프로세스를 신뢰할 수 없다면 보안상 위험 할 수 있습니다.
따라서, 연결 객체가 Pipe()를 사용하여 생성되지 않았다면, 일종의 인증을 수행한 후에만 recv() 및 send() 메서드를 사용해야 합니다. 인증 키를 참조하세요.
경고
프로세스가 파이프에 읽거나 쓰려고 할 때 죽으면, 파이프의 데이터가 손상될 가능성이 있습니다. 메시지 경계가 어디에 있는지 확신할 수 없는 상태가 될 가능성이 있기 때문입니다.
동기화 프리미티브¶
일반적으로 다중 프로세스 프로그램에서는 동기화 프리미티브가 다중 스레드 프로그램에서만큼 필요하지는 않습니다. threading 모듈에 대한 설명서를 참조하십시오.
관리자 객체를 사용하여 동기화 프리미티브를 생성할 수도 있습니다 – 관리자를 참조하세요.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
배리어(barrier) 객체:
threading.Barrier의 복제본.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
Added in version 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
제한된 세마포어 객체:
threading.BoundedSemaphore과 유사한 대응 물.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
대응 물과 한 가지 차이가 있습니다:
acquire메서드의 첫 번째 인자에 block 이라는 이름을 사용해서Lock.acquire()와의 일관성을 유지합니다.- locked()¶
이 객체가 현재 잠겨 있는지 여부를 나타내는 부울(boolean) 값을 반환합니다.
Added in version 3.14.
참고
macOS에서는 이 기능을
Semaphore와 구분할 수 없습니다. 왜냐하면 해당 플랫폼에서sem_getvalue()가 구현되어 있지 않기 때문입니다.
- class multiprocessing.Condition([lock])¶
조건 변수:
threading.Condition의 별칭.lock 을 지정하는 경우,
multiprocessing의Lock객체 또는RLock객체여야 합니다.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
버전 3.3에서 변경:
wait_for()메서드가 추가되었습니다.
- class multiprocessing.Event¶
threading.Event의 복제본.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
- class multiprocessing.Lock¶
비 재귀적 록 객체:
threading.Lock과 유사한 대응 물. 일단 프로세스 또는 스레드가 록을 획득하면, 프로세스 또는 스레드에서 록을 획득하려는 후속 시도는 록이 해제될 때까지 블록 됩니다; 모든 프로세스 또는 스레드가 이를 해제할 수 있습니다. 스레드에 적용되는threading.Lock의 개념과 동작은, 명시된 경우를 제외하고,multiprocessing.Lock를 통해 프로세스나 스레드에 그대로 적용됩니다.Lock은 실제로 기본 컨텍스트로 초기화된multiprocessing.synchronize.Lock의 인스턴스를 반환하는 팩토리 함수입니다.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
Lock은 컨텍스트 관리자 프로토콜을 지원하므로with문에서 사용될 수 있습니다.- acquire(block=True, timeout=None)¶
블록하거나 블록하지 않는 방식으로 록을 획득합니다.
block 인자가
True(기본값) 로 설정되면, 메서드 호출은 록이 해제 상태가 될 때까지 블록 한 다음, 잠금 상태로 만들고True를 반환합니다. 이 첫 번째 인자의 이름은threading.Lock.acquire()와 다르다는 것에 유의하세요.block 인자가
False로 설정되면, 메서드 호출은 블록 되지 않습니다. 록이 현재 잠금 상태면False를 반환합니다. 그렇지 않으면 록을 잠금 상태로 설정하고True를 반환합니다.timeout 에 대해 양의 부동 소수점 값을 사용하여 호출하는 경우, 록을 얻을 수 없는 한 최대 timeout 으로 지정된 시간(초) 동안 블록합니다. timeout 을 음수 값으로 호출하는 것은 timeout 에 0을 주는 것과 같습니다. timeout 값이
None(기본값) 인 호출은 제한 시간을 무한대로 설정합니다. timeout 에 대한 음수와None값의 처리는threading.Lock.acquire()에서 구현된 동작과 다르다는 것에 주의하십시오. timeout 인자는 block 인자가False로 설정되면 실제적인 의미는 없고 무시됩니다. 록이 획득되면True를 돌려주고, 제한 시간 초과가 발생하면False를 돌려줍니다.
- release()¶
록을 해제합니다. 이것은 원래 록을 획득한 프로세스나 스레드뿐만 아니라 모든 프로세스나 스레드에서 호출 할 수 있습니다.
동작은
threading.Lock.release()와 같지만, 해제된 록에서 호출될 때ValueError가 발생한다는 점만 다릅니다.
- locked()¶
이 객체가 현재 잠겨 있는지 여부를 나타내는 부울(boolean) 값을 반환합니다.
Added in version 3.14.
- class multiprocessing.RLock¶
재귀적 록 객체:
threading.RLock과 유사한 대응 물. 재귀적 록은 획득한 프로세스 또는 스레드에 의해 해제되어야 합니다. 일단 프로세스나 스레드가 재귀적 록을 획득하면, 같은 프로세스나 스레드가 블록 없이 다시 획득할 수 있습니다; 해당 프로세스나 스레드는 획득할 때마다 한 번 해제해야 합니다.RLock은 실제로 기본 컨텍스트로 초기화된multiprocessing.synchronize.RLock의 인스턴스를 반환하는 팩토리 함수입니다.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
RLock은 컨텍스트 관리자 프로토콜을 지원하므로with문에서 사용될 수 있습니다.- acquire(block=True, timeout=None)¶
블록하거나 블록하지 않는 방식으로 록을 획득합니다.
block 인자를
True로 설정해서 호출하면, 록이 현재 프로세스나 스레드가 이미 획득한 상태가 아니면 록이 (어떤 프로세스나 스레드도 획득하지 않은) 록 해제 상태가 될 때까지 블록합니다. 이후에 현재 프로세스나 스레드가 (소유권이 아직 없는 경우) 록 소유권을 얻게 되며 록 내 재귀 수준이 1 증가하고True를 반환합니다. 이 첫 번째 인자의 동작에는, 인자의 이름부터 시작해서threading.RLock.acquire()구현과 비교되는 몇 가지 차이점이 있습니다.block 인자를
False로 설정해서 호출하면 블록하지 않습니다. 록이 이미 다른 프로세스나 스레드에 의해 획득되었으면 (그래서 소유하고 있으면), 현재 프로세스나 스레드는 소유권을 갖지 않으며 록 내 재귀 수준은 변경되지 않고False를 반환합니다. 록이 해제 상태에 있으면, 현재 프로세스 또는 스레드가 소유권을 가져오며 재귀 수준이 증가하고True를 반환합니다.timeout 인자의 사용법과 동작은
Lock.acquire()와 같습니다. timeout 의 이러한 동작 중 일부는threading.RLock.acquire()에서 구현된 동작과 다르다는 것에 주의하십시오.
- release()¶
재귀 수준을 감소시키면서 록을 해제합니다. 감소 후에 재귀 수준이 0이면, 록을 해제 상태(어떤 프로세스나 스레드에도 소유되지 않음)로 재설정하고, 다른 프로세스나 스레드가 록이 해제될 때까지 기다리며 블록하고 있는 경우 해당 프로세스나 스레드 중 정확히 하나가 계속 진행하도록 허용합니다. 감소 후에 재귀 수준이 여전히 0이 아닌 경우, 록은 획득된 상태로 남고 호출한 프로세스나 스레드에 의해 소유됩니다.
호출한 프로세스나 스레드가 록을 소유하고 있을 때만 이 메서드를 호출하십시오. 이 메서드가 소유자가 아닌 프로세스나 스레드에 의해 호출되거나, 록이 해제 (소유되지 않은) 상태면
AssertionError가 발생합니다. 이 상황에서 발생하는 예외 형은threading.RLock.release()에서 구현된 동작과 다릅니다.
- locked()¶
이 객체가 현재 잠겨 있는지 여부를 나타내는 부울(boolean) 값을 반환합니다.
Added in version 3.14.
- class multiprocessing.Semaphore([value])¶
세마포어 객체:
threading.Semaphore와 유사한 대응 물.이 클래스를 인스턴스화하는 것이 전역 시작 메서드를 설정할 수 있습니다. 자세한 내용은 :ref:`global-start-method`를 참조하십시오.
대응 물과 한 가지 차이가 있습니다:
acquire메서드의 첫 번째 인자에 block 이라는 이름을 사용해서Lock.acquire()와의 일관성을 유지합니다.- get_value()¶
세마포어의 현재 값을 반환합니다.
이것은 `sem_getvalue()`가 구현되지 않은 macOS와 같은 플랫폼에서 :exc:`NotImplementedError`를 발생시킬 수 있음에 유의하십시오.
- locked()¶
이 객체가 현재 잠겨 있는지 여부를 나타내는 부울(boolean) 값을 반환합니다.
Added in version 3.14.
참고
macOS에서는 sem_timedwait 가 지원되지 않아, 타임아웃을 지정하여 acquire() 를 호출하면 대기 루프를 사용해 해당 함수의 동작을 에뮬레이션합니다.
참고
이 패키지의 기능 중 일부는 호스트 운영 체제의 작동하는 공유 세마포어 구현을 요구합니다. 그런 것이 없으면, multiprocessing.synchronize 모듈이 비활성화되고, 임포트하려고 하면 ImportError 를 일으킵니다. 자세한 내용은 bpo-3770을 참조하십시오.
관리자¶
관리자는 서로 다른 컴퓨터에서 실행되는 프로세스 간에 네트워크를 통해 공유하는 것을 포함하여 서로 다른 프로세스 간에 공유할 수 있는 데이터를 만드는 방법을 제공합니다. 관리자 객체는 공유 객체 를 관리하는 서버 프로세스를 제어합니다. 다른 프로세스는 프락시를 사용하여 공유 객체에 액세스 할 수 있습니다.
- multiprocessing.Manager()¶
프로세스 간에 객체를 공유하는 데 사용할 수 있는 시작된
SyncManager객체를 반환합니다. 반환된 관리자 객체는 생성된 자식 프로세스에 해당하며 공유 객체를 만들고 해당 프락시를 반환하는 메서드가 있습니다.
관리자 프로세스는 가비지 수집되거나 상위 프로세스가 종료되자마자 종료됩니다. 관리자 클래스는 multiprocessing.managers 모듈에 정의되어 있습니다 :
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
BaseManager 객체를 만듭니다.
일단 생성되면 관리자 객체가 시작된 관리자 프로세스를 참조하게 하려고
start()또는get_server().serve_forever()를 호출해야 합니다.address 는 관리자 프로세스가 새 연결을 리슨하는 주소입니다. address 가
None이면 임의의 것이 선택됩니다.authkey 는 서버 프로세스로 들어오는 연결의 유효성을 검사하는 데 사용되는 인증 키입니다. authkey 가
None이면current_process().authkey가 사용됩니다. 그렇지 않으면 authkey 가 사용되며 바이트열이어야 합니다.serializer 는
'pickle'(pickle직렬화 사용) 또는'xmlrpclib'(xmlrpc.client직렬화 사용)이어야 합니다.ctx 는 컨텍스트 객체이거나
None입니다 (현재 Context 사용).None인 경우, 이것을 호출하면 전역 시작 메서드(global start method)가 설정될 수 있습니다. 자세한 내용은 전역 시작 방법 를 참조하십시오.shutdown_timeout is a timeout in seconds used to wait until the process used by the manager completes in the
shutdown()method. If the shutdown times out, the process is terminated. If terminating the process also times out, the process is killed.버전 3.11에서 변경: shutdown_timeout 매개변수가 추가되었습니다.
- start([initializer[, initargs]])¶
관리자를 시작시키기 위해 서브 프로세스를 시작합니다. initializer 가
None이 아닌 경우, 서브 프로세스는 시작할 때initializer(*initargs)를 호출합니다.
- get_server()¶
Manager의 제어를 받는 실제 서버를 나타내는
Server객체를 반환합니다.Server객체는serve_forever()메서드를 지원합니다:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server는 추가로address어트리뷰트를 가지고 있습니다.
- connect()¶
지역 관리자 객체를 원격 관리자 프로세스에 연결합니다:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
관리자 클래스에 형이나 콜러블을 등록하는데 사용할 수 있는 클래스 메서드.
typeid 는 특정 형의 공유 객체를 식별하는 데 사용되는 “형 식별자” 입니다. 문자열이어야 합니다.
callable 은 이 형 식별자에 대한 객체를 만드는 데 사용되는 콜러블 객체입니다. 관리자 인스턴스가
connect()메서드를 사용하여 서버에 연결되거나, create_method 인자가False면None으로 남겨 둘 수 있습니다.proxytype 은, 이 typeid 의 공유 객체의 프락시를 만드는 데 사용되는
BaseProxy의 서브 클래스입니다.None이면 프락시 클래스가 자동으로 생성됩니다.exposed 는 이 typeid에 대한 프락시가
BaseProxy._callmethod()를 사용하여 액세스 할 수 있도록 허용해야 하는 메서드 이름의 시퀀스를 지정하는 데 사용됩니다. (만약 exposed 가None이면, 존재하는 경우,proxytype._exposed_가 대신 사용됩니다.) exposed 리스트가 지정되지 않은 경우, 공유 객체의 모든 “공용 메서드” 에 액세스 할 수 있습니다. (여기서 “공용 메서드” 는__call__()메서드가 있고 그 이름이'_'로 시작하지 않는 어트리뷰트를 의미합니다.)method_to_typeid 는 프락시를 반환해야 하는 노출된 메서드의 반환형을 지정하는 데 사용되는 매핑입니다. 메서드 이름을 typeid 문자열로 매핑합니다. (만일 method_to_typeid 가
None이면, 존재한다면,proxytype._method_to_typeid_가 대신 사용됩니다.) 메서드의 이름이 이 매핑의 키가 아니거나 매핑이None이면, 메서드에 의해 반환된 객체는 값으로 복사됩니다.create_method 는 이름이 typeid 인 메서드를 만들어야 하는지를 결정합니다. 이 메서드는 서버 프로세스에 새 공유 객체를 만들고 프락시를 반환하도록 지시하는 데 사용될 수 있습니다. 기본적으로
True입니다.
BaseManager인스턴스는 읽기 전용 프로퍼티를 하나 가지고 있습니다:- address¶
관리자가 사용하는 주소.
버전 3.3에서 변경: 관리자 객체는 컨텍스트 관리 프로토콜을 지원합니다 – 컨텍스트 관리자 형를 보세요.
__enter__()는 서버 프로세스를 시작하고 (아직 시작하지 않았다면), 관리자 객체를 반환합니다.__exit__()는shutdown()을 호출합니다.이전 버전에서
__enter__()는 관리자의 서버 프로세스가 아직 시작되지 않았을 때 시작시키지 않았습니다.
- class multiprocessing.managers.SyncManager¶
프로세스의 동기화에 사용할 수 있는
BaseManager의 서브 클래스입니다. 이 형의 객체는multiprocessing.Manager()에 의해 반환됩니다.이 클래스의 메서드는 여러 프로세스에서 동기화 할 수 있도록 일반적으로 사용되는 많은 데이터형을 생성하고 프락시 객체를 반환합니다. 특히 공유 리스트와 딕셔너리가 포함됩니다.
- Barrier(parties[, action[, timeout]])¶
공유
threading.Barrier객체를 생성하고 프락시를 반환합니다.Added in version 3.3.
- BoundedSemaphore([value])¶
공유
threading.BoundedSemaphore객체를 생성하고 프락시를 반환합니다.
- Condition([lock])¶
공유
threading.Condition객체를 생성하고 프락시를 반환합니다.lock 이 제공되면
threading.Lock또는threading.RLock객체에 대한 프락시여야 합니다.버전 3.3에서 변경:
wait_for()메서드가 추가되었습니다.
- Event()¶
공유
threading.Event객체를 생성하고 프락시를 반환합니다.
- Lock()¶
공유
threading.Lock객체를 생성하고 프락시를 반환합니다.
- Queue([maxsize])¶
공유
queue.Queue객체를 생성하고 프락시를 반환합니다.
- RLock()¶
공유
threading.RLock객체를 생성하고 프락시를 반환합니다.
- Semaphore([value])¶
공유
threading.Semaphore객체를 생성하고 프락시를 반환합니다.
- Array(typecode, sequence)¶
배열을 만들고 프락시를 반환합니다.
- Value(typecode, value)¶
쓰기 가능한
value어트리뷰트를 가진 객체를 생성하고 프락시를 반환합니다.
- set()¶
- set(sequence)
- set(mapping)
공유
set객체를 생성하고 프록시를 반환합니다.Added in version 3.14:
set지원이 추가되었습니다.
버전 3.6에서 변경: 공유 객체는 중첩될 수 있습니다. 예를 들어, 공유 리스트와 같은 공유 컨테이너 객체는,
SyncManager에 의해 모두 관리되고 동기화되는 다른 공유 객체를 포함 할 수 있습니다.
- class multiprocessing.managers.Namespace¶
SyncManager로 등록 할 수 있는 형입니다.이름 공간 객체에는 공용 메서드가 없지만, 쓰기 가능한 어트리뷰트가 있습니다. repr 은 그것의 어트리뷰트 값을 보여줍니다.
그러나, 이름 공간 객체의 프락시를 사용할 때,
'_'로 시작하는 어트리뷰트는 프락시의 어트리뷰트가 되며 참조 대상의 어트리뷰트가 아닙니다:>>> mp_context = multiprocessing.get_context('spawn') >>> manager = mp_context.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # 이는 프록시의 속성입니다. >>> print(Global) Namespace(x=10, y='hello')
사용자 정의 관리자¶
자신만의 관리자를 만들려면, BaseManager 의 서브 클래스를 만들고 register() 클래스 메서드를 사용하여 새로운 형이나 콜러블을 관리자 클래스에 등록합니다. 예를 들면:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # 결과를 출력합니다(7)
print(maths.mul(7, 8)) # 결과를 출력합니다(56)
원격 관리자 사용하기¶
한 기계에서 관리자 서버를 실행하고 다른 기계의 클라이언트가 관리자 서버를 사용하도록 할 수 있습니다 (관련된 방화벽이 허용한다고 가정합니다).
다음 명령을 실행하면 원격 클라이언트가 액세스 할 수 있는 단일 공유 큐를 위한 서버가 만들어집니다:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
한 클라이언트는 다음과 같이 서버에 액세스 할 수 있습니다:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
또 다른 클라이언트도 사용할 수 있습니다:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
지역 프로세스 역시, 위의 클라이언트가 원격으로 액세스하는 코드를 사용하여 같은 큐에 액세스 할 수 있습니다:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
프락시 객체¶
프락시는 (아마도) 다른 프로세스에 있는 공유 객체를 가리키는 객체입니다. 공유 객체는 프락시의 지시 대상 이라고 합니다. 여러 프락시 객체는 같은 지시 대상을 가질 수 있습니다.
프락시 객체에는 지시 대상의 해당 메서드를 호출하는 메서드가 있습니다 (그러나 지시 대상의 모든 메서드가 반드시 프락시를 통해 사용할 수 있는 것은 아닙니다). 이런 식으로, 프락시는 지시 대상처럼 사용될 수 있습니다:
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
프락시에 str() 을 적용하면 지시 대상의 표현이 반환되는 반면, repr() 을 적용하면 프락시의 표현이 반환됩니다.
프락시 객체의 중요한 특징은, 피클 가능해서 프로세스 간에 전달될 수 있다는 것입니다. 지시 대상은 프락시 객체를 포함 할 수 있습니다. 이것은 관리된 리스트, 딕셔너리 및 다른 프락시 객체 의 중첩을 허용합니다:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
비슷하게, 딕셔너리와 리스트 프락시는 서로 중첩될 수 있습니다:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
(프락시가 아닌) 표준 list 또는 dict 객체가 지시 대상에 포함되어있는 경우, 이 가변 값들에 대한 수정은 관리자를 통해 전파되지 않습니다. 포함된 값이 언제 수정되는지 프락시가 알 방법이 없기 때문입니다. 그러나 컨테이너 프락시에 값을 저장하는 것(프락시 객체의 __setitem__ 을 호출합니다)은 관리자를 통해 전파되므로, 그 항목을 효과적으로 수정하기 위해, 수정된 값을 컨테이너 프락시에 다시 대입할 수 있습니다:
# 리스트 프록시를 생성하고 가변 객체 (딕셔너리)를 추가합니다.
lproxy = manager.list()
lproxy.append({})
# 이제 딕셔너리를 수정합니다.
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# 이 시점에서 d에 대한 변경 사항은 아직 동기화되지 않았지만,
# 딕셔너리를 업데이트함으로써 프록시가 변경 사항을 감지하게 됩니다.
lproxy[0] = d
이 접근법은 아마도 대부분의 사용 사례에서 중첩된 프락시 객체를 사용하는 것보다 불편하지만, 동기화에 대한 제어 수준을 보여줍니다.
참고
:mod:`!multiprocessing`의 프록시 타입은 값으로 비교하는 것을 지원하도록 아무런 기능도 제공하지 않습니다. 따라서, 예를 들어 다음과 같은 결과가 나옵니다:
>>> manager.list([1,2,3]) == [1,2,3]
False
비교할 때는 지시 대상의 사본을 대신 사용해야 합니다.
- class multiprocessing.managers.BaseProxy¶
프락시 객체는
BaseProxy의 서브 클래스의 인스턴스입니다.- _callmethod(methodname[, args[, kwds]])¶
프락시의 지시 대상 메서드를 호출하고 결과를 반환합니다.
proxy가 프락시이고, 그 지시 대상이obj면, 표현식proxy._callmethod(methodname, args, kwds)
은 표현식
getattr(obj, methodname)(*args, **kwds)
을 관리자 프로세스에서 평가합니다.
반환된 값은 호출 결과의 복사본이거나 새 공유 객체에 대한 프락시입니다 –
BaseManager.register()의 method_to_typeid 인자에 대한 설명서를 보십시오.호출 때문에 예외가 발생하면,
_callmethod()가 다시 일으킵니다. 관리자 프로세스에서 다른 예외가 발생하면RemoteError예외로 변환되어_callmethod()가 일으킵니다.특히, methodname 이 노출되지 않았으면 예외가 발생합니다.
_callmethod()사용법의 예:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # l[2:7]과 동일 [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # l[20]과 동일 Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
지시 대상의 복사본을 반환합니다.
지시 대상이 피클 가능하지 않으면 예외가 발생합니다.
- __repr__()¶
프락시 객체의 표현을 반환합니다.
- __str__()¶
지시 대상의 표현을 반환합니다.
정리¶
프락시 객체는 weakref 콜백을 사용해서 가비지 수집 시 자신의 지시 대상을 소유한 관리자에서 자신을 등록 취소합니다.
더는 참조하는 프락시가 없는 경우 공유 객체는 관리자 프로세스에서 삭제됩니다.
프로세스 풀¶
Pool 클래스를 사용하여, 제출된 작업을 수행할 프로세스 풀을 만들 수 있습니다.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
작업을 제출할 수 있는 작업자 프로세스 풀을 제어하는 프로세스 풀 객체. 제한 시간과 콜백을 사용하는 비동기 결과를 지원하고 병렬 map 구현을 제공합니다.
processes 는 사용할 작업자 프로세스 개수입니다. processes 가
None이면os.process_cpu_count()에서 반환하는 숫자가 사용됩니다.initializer 가
None이 아니면, 각 작업자 프로세스는 시작할 때initializer(*initargs)를 호출합니다.maxtasksperchild 는, 사용되지 않는 자원을 해제할 수 있도록, 작업 프로세스가 종료되고 새 작업 프로세스로 교체되기 전에 완료할 수 있는 작업 수입니다. 기본 maxtasksperchild 는
None입니다. 이는 작업자 프로세스가 풀만큼 오래감을 의미합니다.context 매개 변수는 작업자 프로세스를 시작하는 데 사용되는 컨텍스트를 지정할 수 있습니다. 보통 풀은
multiprocessing.Pool()함수나 컨텍스트 객체의Pool()메서드를 사용하여 생성합니다. 두 경우 모두 context 가 적절하게 설정됩니다.None이면, 이 함수를 호출하면 현재 전역 시작 방법이 아직 설정되지 않은 경우 해당 값을 설정하는 부수 효과가 있습니다.get_context()함수를 참조하세요.풀 객체의 메서드는 풀을 생성한 프로세스에 의해서만 호출되어야 합니다.
경고
multiprocessing.pool객체에는 풀을 컨텍스트 관리자로 사용하거나close()와terminate()를 수동으로 호출하여 (다른 자원과 마찬가지로) 올바르게 관리해야 하는 내부 자원이 있습니다. 이를 수행하지 않으면 파이널리제이션 때 프로세스가 멈출 수 있습니다.풀을 파괴하기 위해 가비지 컬렉터에 의존해서는 안 됩니다. CPython은 풀의 최종화기가 호출됨을 보장하지 않기 때문입니다 (더 자세한 내용은 :meth:`object.__del__`를 참조하세요).
버전 3.2에서 변경: maxtasksperchild 매개 변수가 추가되었습니다.
버전 3.4에서 변경: context 매개 변수가 추가되었습니다.
버전 3.13에서 변경: processes 는 기본적으로
os.process_cpu_count()를 사용하며,os.cpu_count()를 사용하지 않습니다.참고
Pool내의 작업자 프로세스는 일반적으로 Pool의 전체 작업 큐 지속 기간 동안 존재합니다. 다른 시스템(예: Apache, mod_wsgi 등)에서 작업자가 잡고 있는 리소스를 해제하기 위해 흔히 사용되는 패턴은, 풀 내의 작업자가 종료되고 새로운 프로세스로 대체되기 전에 일정량의 작업을 완료하도록 허용하는 것입니다.Pool의 maxtasksperchild 인자는 이 기능을 최종 사용자에게 노출합니다.- apply(func[, args[, kwds]])¶
인자 args 및 키워드 인자 kwds 를 사용하여 func 를 호출합니다. 결과가 준비될 때까지 블록 됩니다. 이 블록 때문에,
apply_async()가 병렬로 작업을 수행하는 데 더 적합합니다. 또한 func 는 풀의 작업자 중 하나에서만 실행됩니다.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
AsyncResult객체를 반환하는apply()메서드의 변형입니다.callback 이 지정되면 단일 인자를 받아들이는 콜러블이어야 합니다. 결과가 준비되면 callback 을 이 결과를 인자로 호출합니다. 실패한 결과면 error_callback 이 대신 적용됩니다.
error_callback 이 지정되면 단일 인자를 허용하는 콜러블이어야 합니다. 대상 함수가 실패하면, error_callback 이 예외 인스턴스를 인자로 호출됩니다.
콜백은 즉시 완료되어야 합니다. 그렇지 않으면 결과를 처리하는 스레드가 블록 됩니다.
- map(func, iterable[, chunksize])¶
map()내장 함수의 병렬 버전입니다 (하지만 하나의 iterable 인자만 지원합니다, 여러 이터러블에 대해서는starmap()을 참조하십시오). 결과가 준비될 때까지 블록 됩니다.이 메서드는 iterable을 여러 묶음으로 잘라서 별도의 작업으로 프로세스 풀에 제출합니다. 이러한 묶음의 (대략적인) 크기는 chunksize 를 양의 정수로 설정하여 지정할 수 있습니다.
매우 긴 이터러블은 높은 메모리 사용을 유발할 수 있습니다. 더 나은 효율성을 위해, 명시적인 chunksize 옵션으로
imap()이나imap_unordered()를 사용하는 것을 고려하십시오.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
AsyncResult객체를 반환하는map()메서드의 변형입니다.callback 이 지정되면 단일 인자를 받아들이는 콜러블이어야 합니다. 결과가 준비되면 callback 을 이 결과를 인자로 호출합니다. 실패한 결과면 error_callback 이 대신 적용됩니다.
error_callback 이 지정되면 단일 인자를 허용하는 콜러블이어야 합니다. 대상 함수가 실패하면, error_callback 이 예외 인스턴스를 인자로 호출됩니다.
콜백은 즉시 완료되어야 합니다. 그렇지 않으면 결과를 처리하는 스레드가 블록 됩니다.
- imap(func, iterable[, chunksize])¶
map()의 느긋한 버전.chunksize 인자는
map()메서드에서 사용된 인자와 같습니다. 매우 긴 iterable의 경우 chunksize 에 큰 값을 사용하면 기본값1을 사용하는 것보다 작업을 많이 빠르게 완료 할 수 있습니다.또한 chunksize 가
1이면imap()메서드에 의해 반환된 이터레이터의next()메서드는 선택적 timeout 매개 변수를 가집니다:next(timeout)은 결과가 timeout 초 내에 반환될 수 없는 경우multiprocessing.TimeoutError를 발생시킵니다.
- imap_unordered(func, iterable[, chunksize])¶
imap()과 같지만, 반환된 이터레이터가 제공하는 결과의 순서가 임의적인 것으로 간주하여야 합니다. (단 하나의 작업자 프로세스가 있는 경우에만 순서가 “올바름” 이 보장됩니다.
- starmap(func, iterable[, chunksize])¶
map()과 비슷하지만, iterable 의 요소는 인수로 언팩되지 않는 가능한 반복(iterables)이 되어야 합니다.따라서 iterable 이
[(1,2), (3, 4)]미면 결과는[func(1,2), func(3,4)]가 됩니다.Added in version 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
starmap()과map_async()의 조합으로 이터러블의 iterable 을 이터레이트하고 이터러블을 언팩해서 func 를 호출합니다. 결과 객체를 반환합니다.Added in version 3.3.
- close()¶
더는 작업이 풀에 제출되지 않도록 합니다. 모든 작업이 완료되면 작업자 프로세스가 종료됩니다.
- terminate()¶
계류 중인 작업을 완료하지 않고 즉시 작업자 프로세스를 중지합니다. 풀 객체가 가비지 수집될 때
terminate()가 즉시 호출됩니다.
- join()¶
작업자 프로세스가 종료될 때까지 기다립니다.
join()호출 전에 반드시close()나terminate()를 호출해야합니다 .
버전 3.3에서 변경: 풀 객체는 이제 컨텍스트 관리 프로토콜을 지원합니다 – 컨텍스트 관리자 형를 보십시오.
__enter__()는 풀 객체를 반환하고,__exit__()는terminate()를 호출합니다.
- class multiprocessing.pool.AsyncResult¶
Pool.apply_async()와Pool.map_async()에 의해 반환되는 결과의 클래스.- get([timeout])¶
결과가 도착할 때 반환합니다. timeout 이
None이 아니고 결과가 timeout 초 내에 도착하지 않으면multiprocessing.TimeoutError가 발생합니다. 원격 호출이 예외를 발생시키는 경우 해당 예외는get()에 의해 다시 발생합니다.
- wait([timeout])¶
결과가 사용 가능할 때까지 또는 timeout 초가 지날 때까지 기다립니다.
- ready()¶
호출이 완료했는지를 돌려줍니다.
- successful()¶
예외를 발생시키지 않고 호출이 완료되었는지를 돌려줍니다. 결과가 준비되지 않았으면
ValueError를 발생시킵니다.버전 3.7에서 변경: 결과가 준비되지 않았으면,
AssertionError대신ValueError가 발생합니다.
다음 예제는 풀 사용 방법을 보여줍니다.:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # 4개의 작업자 프로세스를 시작합니다.
result = pool.apply_async(f, (10,)) # 단일 프로세스에서 "f(10)"을 비동기적으로 평가합니다.
print(result.get(timeout=1)) # 컴퓨터가 *매우* 느리지 않다면 "100"이 출력됩니다.
print(pool.map(f, range(10))) # "[0, 1, 4,..., 81]"이 출력됩니다.
it = pool.imap(f, range(10))
print(next(it)) # "0"이 출력됩니다.
print(next(it)) # "1"이 출력됩니다.
print(it.next(timeout=1)) # 컴퓨터가 *매우* 느리지 않다면 "4"가 출력됩니다.
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # multiprocessing.TimeoutError를 발생시킵니다.
리스너와 클라이언트¶
보통 프로세스 간 메시지 전달은 큐를 사용하거나 Pipe() 가 반환하는 Connection 객체를 사용하여 수행됩니다.
그러나 multiprocessing.connection 모듈은 약간의 추가적인 유연성을 허용합니다. 기본적으로 소켓이나 윈도우 이름있는 파이프를 다루기 위한 높은 수준의 메시지 지향 API를 제공합니다. 또한 hmac 모듈을 사용한 다이제스트 인증 지원과 여러 연결을 동시에 폴링하는 기능도 가지고 있습니다.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
무작위로 생성된 메시지를 연결의 다른 쪽 끝으로 보내고 응답을 기다립니다.
응답이 authkey 를 키로 사용하는 메시지의 다이제스트와 일치하면 환영 메시지가 연결의 다른 끝으로 전송됩니다. 그렇지 않으면
AuthenticationError가 발생합니다.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
메시지를 수신하고, authkey 를 키로 사용하여 메시지의 다이제스트를 계산한 다음, 다이제스트를 다시 보냅니다.
환영 메시지가 수신되지 않으면,
AuthenticationError가 발생합니다.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
주소 address 를 사용하는 리스너에 대한 연결을 설정하려고 시도하고,
Connection을 반환합니다.연결 유형은 family 인자에 의해 결정되지만, 일반적으로 address 형식에서 유추 할 수 있으므로 일반적으로 생략 할 수 있습니다. (주소 형식를 참조하세요)
만약 authkey 가 주어지고
None이 아니라면, 바이트 문자열이어야 하며 HMAC 기반 인증 체인지를 위한 비밀 키로 사용됩니다. authkey 가None이면 인증은 수행되지 않습니다. 인증에 실패하면AuthenticationError가 발생합니다. 인증 키 를 참조하세요.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
연결을 ‘리스닝’ 하는 바인드된 소켓이나 윈도우의 이름있는 파이프에 대한 래퍼입니다.
address 는 리스너 객체의 바인드된 소켓이나 이름있는 파이프가 사용할 주소입니다.
참고
주소가 ‘0.0.0.0’ 인 경우, 주소는 윈도우에서 연결 가능한 끝점이 아닙니다. 연결할 수 있는 끝점이 필요한 경우, ‘127.0.0.1’을 사용해야 합니다.
family 는 사용할 소켓(또는 이름있는 파이프)의 유형입니다. 문자열
'AF_INET'(TCP 소켓),'AF_UNIX'(유닉스 도메인 소켓),'AF_PIPE'(윈도우 이름있는 파이프) 중 하나일 수 있습니다. 이 중 오직 첫 번째 것만 항상 사용할 수 있음이 보장됩니다. family 가None이면, address 의 형식으로부터 유추됩니다. address 역시None이면, 기본값이 선택됩니다. 이 기본값은 사용 가능한 것 중 가장 빠른 것으로 기대되는 것입니다. 주소 형식를 참조하세요. family 가'AF_UNIX'이고 주소가None이면, 소켓은tempfile.mkstemp()를 사용하여 만들어진 비공개 임시 디렉터리에 생성됩니다.리스너 객체가 소켓을 사용하면, backlog (기본적으로 1) 는 소켓이 바인드되면 소켓의
listen()메서드에 전달됩니다.만약 authkey 가 주어지고
None이 아니라면, 바이트 문자열이어야 하며 HMAC 기반 인증 체인지를 위한 비밀 키로 사용됩니다. authkey 가None이면 인증은 수행되지 않습니다. 인증에 실패하면AuthenticationError가 발생합니다. 인증 키 를 참조하세요.- accept()¶
리스너 객체의 바인드된 소켓 또는 이름있는 파이프에 대한 연결을 수락하고
Connection객체를 반환합니다. 인증이 시도되고 실패하면AuthenticationError가 발생합니다.
- close()¶
리스너 객체의 바운드된 소켓 또는 이름있는 파이프를 닫습니다. 리스너가 가비지 수집될 때 자동으로 호출됩니다. 그러나 명시적으로 호출하는 것이 좋습니다.
리스너 객체는 다음과 같은 읽기 전용 프로퍼티를 가집니다:
- address¶
리스너 객체에서 사용 중인 주소.
- last_accepted¶
마지막으로 수락한 연결이 온 주소. 없으면
None입니다.
버전 3.3에서 변경: 리스너 객체는 컨텍스트 관리 프로토콜을 지원합니다 – 컨텍스트 관리자 형를 보세요.
__enter__()는 리스너 객체를 반환하고,__exit__()는close()를 호출합니다.
- multiprocessing.connection.wait(object_list, timeout=None)¶
object_list 에 있는 객체가 준비될 때까지 기다립니다. object_list 에 있는 객체 중 준비된 것들의 리스트를 반환합니다. timeout 이 float면, 호출이 최대 지정된 초만큼 블록 됩니다. timeout 이
None이면, 시간제한 없이 블록 됩니다. 음수 timeout은 0과 같습니다.POSIX 및 Windows 모두에서, 객체는 object__list 에 포함될 수 있습니다. 이 경우 객체는
읽기 가능한
Connection객체;연결되고 읽기 가능한
socket.socket객체; 또는
연결이나 소켓 객체는 읽을 수 있는 데이터가 있거나 반대편 끝이 닫히면 준비가 됩니다.
POSIX:
wait(object_list, timeout)almost equivalentselect.select(object_list, [], [], timeout). The difference is that, ifselect.select()is interrupted by a signal, it can raiseOSErrorwith an error number ofEINTR, whereaswait()will not.Windows: object_list*의 항목은 대기 가능한 정수 핸들(Win32 함수 ``WaitForMultipleObjects()``의 문서에서 사용되는 정의에 따름)이거나, 소켓 핸들이나 파이프 핸들을 반환하는 :meth:`~io.IOBase.fileno` 메서드를 가진 객체여야 합니다. (파이프 핸들 및 소켓 핸들은 **대기 가능한(waitable)* 핸들이 아님을 유의하세요.)
Added in version 3.3.
예제
다음 서버 코드는 인증 키로 'secret password' 를 사용하는 리스너를 만듭니다. 그런 다음 연결을 기다리고 어떤 데이터를 클라이언트로 보냅니다.:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family는 'AF\_INET'으로 추론됩니다.
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float]) # 값을 전송합니다. (예: 배열 데이터)
conn.send_bytes(b'hello') # 바이트 데이터를 전송합니다.
arr = array('i', [42, 1729])
conn.send_bytes(array('i', [42, 1729])) # 배열 바이트 데이터를 전송합니다.
다음 코드는 서버에 연결하고 서버로부터 어떤 데이터를 받습니다:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
다음 코드는 wait() 을 사용하여 여러 프로세스로부터 오는 메시지를 한 번에 기다립니다:
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name)) # 현재 프로세스 이름과 함께 메시지를 전송합니다.
w.close() # 쓰기 파이프를 닫습니다.
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# p가 쓰기 파이프에 대한 유일한 핸들 소유자가 되도록 현재 닫아줍니다. 이렇게 하면 다음에서 w를 닫을 때, wait()이 읽기 끝이 준비되었음을 즉시 보고하도록 보장합니다.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv() # 메시지를 수신합니다.
except EOFError:
readers.remove(r) # 연결이 종료되면 목록에서 제거합니다.
else:
print(msg)
주소 형식¶
'AF_INET'주소는(hostname, port)형식의 튜플입니다. hostname 은 문자열이고, port 는 정수입니다.'AF_UNIX'주소는 파일 시스템의 파일 이름을 나타내는 문자열입니다.An
'AF_PIPE'address is a string of the formr'\\.\pipe\PipeName'. To useClient()to connect to a named pipe on a remote computer called ServerName one should use an address of the formr'\\ServerName\pipe\PipeName'instead.
두 개의 역 슬래시로 시작하는 문자열은 기본적으로 'AF_UNIX' 주소가 아니라 'AF_PIPE' 주소로 간주합니다.
인증 키¶
Connection.recv 를 사용할 때, 수신된 데이터는 자동으로 언 피클 됩니다. 안타깝게도, 신뢰할 수 없는 출처의 데이터를 언 피클 하는 것은 보안상의 위험입니다. 때문에 Listener와 Client() 는 hmac 모듈을 사용하여 다이제스트 인증을 제공합니다.
인증 키는 암호로 여겨질 수 있는 바이트열입니다: 일단 연결이 이루어지면 양 끝은 다른 쪽이 인증 키를 알고 있음을 증명하도록 요구합니다. (양쪽 끝이 같은 키를 사용하고 있음을 증명하는 데는 연결을 통해 키를 보내는 것을 수반하지 않습니다.)
인증이 요청되었지만 인증 키가 지정되지 않으면, current_process().authkey 의 반환 값이 사용됩니다 (Process 를 보세요). 이 값은 현재 프로세스가 생성하는 Process 객체에 의해 자동으로 상속됩니다. 이것은 다중 프로세스 프로그램의 모든 프로세스는 (기본적으로) 자신들 간의 연결을 설정할 때 사용할 수 있는 하나의 인증 키를 공유한다는 것을 뜻합니다.
적절한 인증 키는 os.urandom() 을 사용하여 생성할 수도 있습니다.
이 인증은 주소로 접근 가능한 Listener 및 Client() 연결에 적용됩니다. 이는 :func:`~multiprocessing.Pipe`에 의해 생성되는 익명 파이프나 :class:`~multiprocessing.Queue`에 내부적으로 사용되지는 않습니다. :mod:`multiprocessing`은 동일한 사용자로 실행되는 모든 로컬 프로세스를 신뢰하는 것으로 간주합니다; 대부분의 운영 체제에서 이러한 프로세스는 관계없이 서로의 파이프 파일 기술자(file descriptor)에 접근할 수 있습니다. 동일한 사용자의 프로세스 사이에 격리가 필요한 애플리케이션은 운영체제 수준에서 이를 구성해야 합니다. 예를 들어, 워커를 다른 사용자 계정으로 실행하거나 샌드박스 내에서 실행하는 것이 가능합니다.
로깅¶
로깅에 대한 일부 지원이 제공됩니다. 그러나, logging 패키지는 프로세스 공유 록을 사용하지 않으므로 (처리기형에 따라) 다른 프로세스의 메시지가 뒤섞일 가능성이 있습니다.
- multiprocessing.get_logger()¶
:mod:`!multiprocessing`이 사용하는 로거를 반환합니다. 필요하다면, 새로운 로거가 생성됩니다.
로거는 처음 생성될 때 수준이 :const:`logging.NOTSET`이며 기본 핸들러가 없습니다. 이 로거로 전송된 메시지는 기본적으로 루트 로거로 전파되지 않습니다.
윈도우에서 자식 프로세스는 부모 프로세스의 로거의 수준만 상속받습니다 – 그 밖의 다른 로거 사용자 지정은 상속되지 않습니다.
- multiprocessing.log_to_stderr(level=None)¶
This function performs a call to
get_logger()but in addition to returning the logger created by get_logger, it adds a handler which sends output tosys.stderrusing format'[%(levelname)s/%(processName)s] %(message)s'. You can modifylevelnameof the logger by passing alevelargument.
다음은 로깅이 켜져 있는 예제 세션입니다:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
로깅 수준의 전체 표는 logging 모듈을 참조하십시오.
multiprocessing.dummy 모듈입니다.¶
multiprocessing.dummy`는 :mod:!multiprocessing`의 API를 복제하지만, threading 모듈을 감싸는 것 이상은 아닙니다.
특히, multiprocessing.dummy`에서 제공하는 ``Pool` 함수는 모든 동일한 메서드 호출을 지원하지만 작업자 프로세스 대신 작업자 스레드 풀을 사용하는 :class:`Pool`의 서브 클래스인 :class:`ThreadPool`의 인스턴스를 반환합니다.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
작업을 제출할 수 있는 작업자 스레드 풀을 제어하는 스레드 풀 객체.
ThreadPool인스턴스는Pool인스턴스와 완전히 호환되며, 해당 리소스는 컨텍스트 관리자로 풀을 사용하거나close()와terminate()를 수동으로 호출하여 적절하게 관리해야 합니다.processes 는 사용할 워커 스레드 수를 나타냅니다. processes 가
None인 경우,os.process_cpu_count()에서 반환된 숫자가 사용됩니다.initializer 가
None이 아니면, 각 작업자 프로세스는 시작할 때initializer(*initargs)를 호출합니다.Pool과 달리, maxtasksperchild와 context는 제공할 수 없습니다.참고
ThreadPool은 프로세스 풀을 중심으로 설계되고concurrent.futures모듈 도입 이전에 설계된Pool과 같은 인터페이스를 공유합니다. 따라서, 스레드가 지원하는 풀에 적합하지 않은 일부 연산을 상속하고, 비동기 작업의 상태를 나타내는 자체 형AsyncResult를 가지고 있는데 다른 라이브러리에서는 이해하지 못합니다.사용자는 일반적으로 처음부터 스레드를 중심으로 설계되고
asyncio를 포함한 다른 많은 라이브러리와 호환되는concurrent.futures.Future인스턴스를 반환하는 더 간단한 인터페이스를 가진concurrent.futures.ThreadPoolExecutor를 사용하는 것을 선호해야 합니다.
프로그래밍 지침¶
:mod:`!multiprocessing`을 사용할 때 준수해야 할 지침과 관용구가 있습니다.
모든 시작 방법¶
다음은 모든 시작 방법에 적용됩니다.
공유 상태를 피하세요
가능한 한 프로세스 간에 많은 양의 데이터가 이동하지 않도록 해야 합니다.
저수준 동기화 프리미티브를 사용하기보다, 프로세스 간 통신을 위해 큐나 파이프를 사용하는 것이 아마도 최선입니다.
피클 가능성
프락시 메서드에 대한 인자가 피클 가능한지 확인하십시오.
프락시의 스레드 안전성
록으로 보호하지 않는 한 둘 이상의 스레드에서 프락시 객체를 사용하지 마십시오.
(여러 프로세스가 같은 프락시를 사용하는 문제는 존재하지 않습니다.)
좀비 프로세스 조인하기
POSIX에서 프로세스가 종료되었지만 join되지 않은 프로세스는 좀비 됩니다. 이런 적이 있어서는 안 되며, 새 프로세스가 시작될 때마다 (또는 :func:`~multiprocessing.active_children`가 호출될 때마다) 아직 join되지 않은 모든 완료된 프로세스가 join됩니다. 따라서, 종료된 프로세스의 :meth:`Process.is_alive <multiprocessing.Process.is_alive>`를 호출하는 것 또한 프로세스를 join하게 됩니다. 그럼에도 불구하고 시작한 모든 프로세스는 명시적으로 join 하는 것이 좋은 관행입니다.
피클/언 피클보다 상속하는 것이 더 좋습니다.
spawn 또는 forkserver 시작 방법을 사용할 때, :mod:`!multiprocessing`의 많은 유형은 자식 프로세스가 사용할 수 있도록 선택 가능해야 합니다. 하지만 일반적으로 파이프나 큐를 사용하여 공유 객체를 다른 프로세스로 보내는 것은 피해야 합니다. 대신, 다른 곳에 생성된 공유 자원에 접근해야 하는 프로세스가 조상 프로세스로부터 상속받을 수 있도록 프로그램을 구성해야 합니다.
프로세스 강제 종료를 피하세요
Process.terminate메서드를 사용해서 프로세스를 정지시키는 것은, 그 프로세스가 현재 사용하고 있는 공유 자원(가령 록, 세마포어, 파이프, 큐)을 손상하거나 다른 프로세스에서 사용할 수 없게 만들 수 있습니다.따라서, 아마도 어떤 공유 자원도 사용하지 않는 프로세스에만
Process.terminate사용을 고려하는 것이 최선일 겁니다.
큐를 사용하는 프로세스 조인하기
큐에 항목을 넣은 프로세스는 종료되기 전에 버퍼링 된 모든 항목이 “피더” 스레드에 의해 하부 파이프로 공급될 때까지 대기합니다. (자식 프로세스는
Queue.cancel_join_thread메서드를 호출해서 이 동작을 회피할 수 있습니다.)이것은, 큐를 사용할 때마다 큐에 넣은 모든 항목이 결국 프로세스가 조인되기 전에 제거되도록 해야 함을 의미합니다. 그렇지 않으면 큐에 항목을 넣은 프로세스가 종료되리라고 보장할 수 없습니다. 데몬이 아닌 프로세스가 자동으로 조인된다는 것도 기억하세요.
교착 상태에 빠지는 예는 다음과 같습니다:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # 이 코드는 데드락 상태에 빠집니다. obj = queue.get()이 문제를 고치는 방법은 마지막 두 줄의 순서를 바꾸는 것입니다 (또는 간단히
p.join()줄을 지우는 것입니다).
자식 프로세스에 자원을 명시적으로 전달하세요.
POSIX에서 fork 시작 방법을 사용할 경우, 자식 프로세스는 전역 자원을 사용하여 부모 프로세스에서 생성된 공유 자원을 이용할 수 있습니다. 하지만 객체를 자식 프로세스의 생성자 인자로 전달하는 것이 더 좋습니다.
윈도우 및 다른 시작 방법과 (잠재적으로) 호환될 수 있는 코드를 만드는 것 외에도, 이것은 자식 프로세스가 아직 살아있는 동안 객체가 부모 프로세스에서 가비지 수집되지 않음을 보장합니다. 부모 프로세스에서 그 객체가 가비지 수집될 때 일부 자원이 해제되면 이것이 중요 할 수 있습니다.
그래서 예를 들면
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()는 다음과 같이 다시 써야 합니다
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
sys.stdin 을 “파일류 객체”로 교체할 때 조심하세요
:mod:`!multiprocessing`은 원래 무조건 다음과 같이 호출했습니다:
os.close(sys.stdin.fileno())
multiprocessing.Process._bootstrap()메서드에서 하는 작업입니다 — 이것은 손자 프로세스와 관련된 문제로 이어졌습니다. 이것은 다음과 같이 변경되었습니다:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)이는 프로세스들이 충돌하여 나쁜 파일 디스크립터 오류가 발생하는 근본적인 문제를 해결하지만, :func:`sys.stdin`을 출력 버퍼링이 있는 “파일 같은 객체”로 대체하는 애플리케이션에는 잠재적인 위험을 초래합니다. 이 위험은 여러 프로세스가 이 파일 같은 객체에 대해 :meth:`~io.IOBase.close`를 호출할 경우, 동일한 데이터가 해당 객체에 여러 번 플러시될 수 있으며, 그 결과 손상이 발생할 수 있다는 것입니다.
파일류 객체를 작성하고 여러분 자신의 캐싱을 구현하면, 캐시에 추가할 때마다 pid를 저장하고, pid가 변경되면 캐시를 버려서 포크에 안전하게 만들 수 있습니다. 예를 들면:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
spawn 과 forkserver 시작 방법¶
fork 시작 방법에는 적용되지 않는 몇 가지 추가 제약 조건이 있습니다.
더 높은 피클 가능성
~multiprocessing.Process`의 모든 인자가 선택 가능하도록 보장하십시오. 또한, ``Process.__init__``을 서브클래싱하는 경우, :meth:`Process.start메서드가 호출될 때 인스턴스가 선택 가능함을 확실히 해야 합니다.
전역 변수
자식 프로세스에서 실행되는 코드가 전역 변수에 접근하려고 시도하면, 그 값은 (있는 경우)
Process.start가 호출되는 시점의 부모 프로세스의 값과 같지 않을 수 있습니다.하지만, 모듈 수준의 상수인 전역 변수는 문제가 되지 않습니다.
메인 모듈의 안전한 임포트
주 모듈이 의도하지 않은 부작용(새 프로세스를 시작하는 것과 같은) 없이 새로운 파이썬 인터프리터에 의해 안전하게 임포트될 수 있도록 하십시오.
예를 들어, spawn 또는 forkserver 시작 방법을 사용해서 다음 모듈을 실행하면
RuntimeError로 실패합니다:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()대신 다음과 같이
if __name__ == '__main__':을 사용하여 프로그램의 “진입 지점”을 보호해야 합니다:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(
freeze_support()줄은 프로그램이 프로즌 되지 않고 정상적으로 실행될 경우 생략될 수 있습니다.)이것은 새로 스폰 된 파이썬 인터프리터가 모듈을 안전하게 임포트 한 다음 모듈의
foo()함수를 실행할 수 있게 해줍니다.메인 모듈에서 풀이나 관리자를 만들면 비슷한 제한이 적용됩니다.
예제¶
사용자 정의된 관리자와 프락시를 만들고 사용하는 방법에 대한 시연:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('Foo.f()가 호출되었습니다.')
def g(self):
print('Foo.g()가 호출되었습니다.')
def _h(self):
print('Foo._h()가 호출되었습니다.')
# 간단한 제너레이터 함수입니다.
def baz():
for i in range(10):
yield i*i
# 제너레이터 객체를 위한 프록시 타입입니다.
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# operator 모듈을 반환하는 함수입니다.
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# Foo 클래스를 등록하고, `f()`와 `g()`를 프록시를 통해 사용 가능하게 만듭니다.
MyManager.register('Foo1', Foo)
# Foo 클래스를 등록하고, `g()`와 `_h()`를 프록시를 통해 사용 가능하게 만듭니다.
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# 제너레이터 함수 baz를 등록하고, 프록시로 사용할 수 있도록 `GeneratorProxy`를 이용합니다.
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# get_operator_module()을 등록하고, 공용 함수들을 프록시를 통해 사용 가능하게 만듭니다.
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Pool 사용하기:
import multiprocessing
import time
import random
import sys
#
# 테스트 코드에서 사용하는 함수들
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# 테스트 코드
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# 테스트들
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# 에러 핸들링 테스트
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# 타임아웃 테스트
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
큐를 사용하여 작업을 작업자 프로세스 집단에 제공하고 결과를 수집하는 방법을 보여주는 예:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# 워커 프로세스에서 실행되는 함수입니다.
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# 결과를 계산하는 데 사용되는 함수입니다.
#
def calculate(func, args):
result = func(*args)
return '%s는 %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# 작업에서 참조하는 함수들입니다.
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4 # 프로세스 개수
TASKS1 = [(mul, (i, 7)) for i in range(20)] # 첫 번째 작업 리스트
TASKS2 = [(plus, (i, 8)) for i in range(10)] # 두 번째 작업 리스트
# 큐를 생성합니다.
task_queue = Queue()
done_queue = Queue()
# 작업을 제출합니다.
for task in TASKS1:
task_queue.put(task)
# 워커 프로세스를 시작합니다.
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# 결과를 가져와 출력합니다.
print('순서가 지정되지 않은 결과:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# `put()`을 사용하여 더 많은 작업을 추가합니다.
for task in TASKS2:
task_queue.put(task)
# 더 많은 결과를 가져와 출력합니다.
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# 자식 프로세스들에게 중지하라고 알립니다.
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()