17.2. multiprocessing — Process-based parallelism
Source code: Lib/multiprocessing/
17.2.1. Introduction
multiprocessing is a package that supports spawning processes using an
API similar to the threading module. The multiprocessing package
offers both local and remote concurrency, effectively side-stepping the
Global Interpreter Lock by using subprocesses instead of threads. Due
to this, the multiprocessing module allows the programmer to fully
leverage multiple processors on a given machine. It runs on both Unix and
Windows.
The multiprocessing module also introduces APIs which do not have
analogs in the threading module. A prime example of this is the
Pool object which offers a convenient means of
parallelizing the execution of a function across multiple input values,
distributing the input data across processes (data parallelism). The following
example demonstrates the common practice of defining such functions in a module
so that child processes can successfully import that module. This basic example
of data parallelism using Pool,
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
will print to standard output
[1, 4, 9]
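Pool.map() passes exactly one argument to each call. When the worker function takes several arguments, Pool.starmap() unpacks each tuple of arguments into a call. A minimal sketch, where power() and parallel_powers() are hypothetical helpers used only for illustration:

```python
from multiprocessing import Pool

def power(base, exponent):
    # Hypothetical worker; defined at module level so child processes can import it.
    return base ** exponent

def parallel_powers(pairs, processes=2):
    """Hypothetical helper: evaluate power(*pair) for each pair in a worker pool."""
    with Pool(processes) as pool:
        # starmap turns each (base, exponent) tuple into power(base, exponent)
        return pool.starmap(power, pairs)

if __name__ == '__main__':
    print(parallel_powers([(2, 3), (3, 2), (10, 0)]))  # [8, 9, 1]
```

As with map(), the results come back in the same order as the inputs.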
17.2.1.1. The Process class
In multiprocessing, processes are spawned by creating a Process
object and then calling its start() method. Process
follows the API of threading.Thread. A trivial example of a
multiprocess program is
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
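After join() returns, the parent can inspect the child's exitcode attribute to see how it finished: 0 when run() returned normally, 1 when the target raised an uncaught exception (whose traceback the child prints to its stderr). The sketch below also forwards keyword arguments through kwargs; greet(), fail() and run_and_report() are hypothetical names:

```python
from multiprocessing import Process

def greet(name, punctuation='!'):
    # Hypothetical target taking a positional and a keyword argument.
    print('hello', name + punctuation)

def fail():
    # Hypothetical target that raises; the child prints the traceback to stderr.
    raise ValueError('boom')

def run_and_report(target, args=(), kwargs=None):
    """Hypothetical helper: run target in a child process and return its exit code."""
    p = Process(target=target, args=args, kwargs=kwargs or {})
    p.start()
    p.join()            # block until the child terminates
    return p.exitcode   # 0 on normal return, 1 after an uncaught exception

if __name__ == '__main__':
    print(run_and_report(greet, ('bob',), {'punctuation': '?'}))
    print(run_and_report(fail))
```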
To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
For an explanation of why the if __name__ == '__main__' part is
necessary, see Programming guidelines.
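Instead of printing from the child, the child's identity can also be sent back to the parent. This sketch passes a name to the Process constructor and uses current_process() inside the child to read it, together with the child's PID; report() and child_identity() are hypothetical helpers, and the Queue used here is described under Exchanging objects between processes:

```python
import os
from multiprocessing import Process, Queue, current_process

def report(q):
    # Runs in the child: send back this process's name and PID.
    q.put((current_process().name, os.getpid()))

def child_identity():
    """Hypothetical helper: start a named child and return (name, pid) it reports."""
    q = Queue()
    p = Process(target=report, args=(q,), name='worker-1')
    p.start()
    name, pid = q.get()   # consume the queued item before joining the child
    p.join()
    return name, pid

if __name__ == '__main__':
    name, pid = child_identity()
    print(name, pid, 'parent:', os.getpid())
```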
17.2.1.2. Contexts and start methods
Depending on the platform, multiprocessing supports three ways
to start a process. These start methods are
- spawn
The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object's run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.
Available on Unix and Windows. The default on Windows.
- fork
The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.
Available on Unix only. The default on Unix.
- forkserver
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.
Available on Unix platforms which support passing file descriptors over Unix pipes.
Changed in version 3.4: spawn added on all Unix platforms, and forkserver added for some Unix platforms. Child processes no longer inherit all of the parent's inheritable handles on Windows.
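Which of these start methods a given platform offers can be checked at run time with the module-level functions get_all_start_methods() and get_start_method(), which are not otherwise covered in this excerpt. A small sketch; describe_start_methods() is a hypothetical helper:

```python
import multiprocessing as mp

def describe_start_methods():
    """Hypothetical helper: return (available start methods, currently selected one)."""
    # Some subset of 'fork', 'spawn' and 'forkserver', depending on the platform.
    available = mp.get_all_start_methods()
    # allow_none=True returns None if no start method has been fixed yet,
    # instead of fixing the platform default as a side effect.
    current = mp.get_start_method(allow_none=True)
    return available, current

if __name__ == '__main__':
    available, current = describe_start_methods()
    print('available:', available)
    print('currently selected:', current)
```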
On Unix, using the spawn or forkserver start methods will also start a semaphore tracker process which tracks the unlinked named semaphores created by processes of the program. When all processes have exited, the semaphore tracker unlinks any remaining semaphores. Usually there should be none, but if a process was killed by a signal there may be some "leaked" semaphores. (Unlinking the named semaphores is a serious matter since the system allows only a limited number, and they will not be automatically unlinked until the next reboot.)
To select a start method, call set_start_method() inside
the if __name__ == '__main__' clause of the main module. For
example:
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
set_start_method() should not be used more than once in the
program.
Alternatively, you can use get_context() to obtain a context
object. Context objects have the same API as the multiprocessing
module, and allow one to use multiple start methods in the same
program.
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods.
A library which wants to use a particular start method should probably
use get_context() to avoid interfering with the choice of the
library user.
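One way a library might follow that advice is to accept an optional context object and fall back to its own get_context() call, never touching set_start_method(). A sketch under that assumption; parallel_squares() and square() are hypothetical library functions, and 'spawn' is just an example choice:

```python
import multiprocessing as mp

def square(x):
    # Hypothetical worker function; must be importable by spawned children.
    return x * x

def parallel_squares(values, ctx=None):
    """Hypothetical library helper: square values in worker processes.

    Uses a private 'spawn' context unless the caller supplies one, so the
    global start method chosen by the library user is left untouched.
    """
    if ctx is None:
        ctx = mp.get_context('spawn')
    with ctx.Pool(2) as pool:
        return pool.map(square, values)

if __name__ == '__main__':
    print(parallel_squares([1, 2, 3]))
```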
17.2.1.3. Exchanging objects between processes
multiprocessing supports two types of communication channel between
processes:
Queues
The Queue class is a near clone of queue.Queue. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
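Because the default pipe is duplex, a single pair of connections can carry a request/response conversation, with each process sticking to its own end. A sketch, assuming a hypothetical doubler() child that replies to each number until it receives None (an arbitrary stop marker chosen for this example):

```python
from multiprocessing import Process, Pipe

def doubler(conn):
    # Child side: only ever uses its own end of the pipe.
    while True:
        msg = conn.recv()
        if msg is None:          # hypothetical stop marker
            break
        conn.send(msg * 2)
    conn.close()

def ask_child(numbers):
    """Hypothetical helper: send each number to the child and collect the replies."""
    parent_conn, child_conn = Pipe()          # duplex by default
    p = Process(target=doubler, args=(child_conn,))
    p.start()
    answers = []
    for n in numbers:
        parent_conn.send(n)                   # request goes out on the parent's end
        answers.append(parent_conn.recv())    # and the reply arrives on that same end
    parent_conn.send(None)                    # ask the child to stop
    p.join()
    parent_conn.close()
    return answers

if __name__ == '__main__':
    print(ask_child([1, 2, 3]))
```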
17.2.1.4. Synchronization between processes
multiprocessing contains equivalents of all the synchronization
primitives from threading. For instance one can use a lock to ensure
that only one process prints to standard output at a time:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
Without the lock, output from the different processes is liable to get all mixed up.
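Like threading.Lock, a multiprocessing Lock can be used in a with statement, which acquires it on entry and releases it on exit, equivalent to the try/finally form above. This sketch also keeps the Process objects so the parent can join them; run_all() is a hypothetical helper:

```python
from multiprocessing import Process, Lock

def f(lock, i):
    # 'with lock' releases the lock even if print() raises.
    with lock:
        print('hello world', i)

def run_all(n=4):
    """Hypothetical helper: start n printers sharing one lock, wait, return exit codes."""
    lock = Lock()
    procs = [Process(target=f, args=(lock, i)) for i in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()          # wait for every child before returning
    return [p.exitcode for p in procs]

if __name__ == '__main__':
    print(run_all())
```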
17.2.1.5. Sharing state between processes
As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.
However, if you really do need to use some shared data then
multiprocessing provides a couple of ways of doing so.
Shared memory
Data can be stored in a shared memory map using Value or Array. For example, the following code

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

will print

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

The 'd' and 'i' arguments used when creating num and arr are typecodes of the kind used by the array module: 'd' indicates a double precision float and 'i' indicates a signed integer. These shared objects will be process and thread-safe.
For more flexibility in using shared memory one can use the multiprocessing.sharedctypes module which supports the creation of arbitrary ctypes objects allocated from shared memory.
Server process
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

will print

{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
17.2.1.6. Using a pool of workers
The Pool class represents a pool of worker
processes. It has methods which allow tasks to be offloaded to the worker
processes in a few different ways.
For example:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")
Note that the methods of a pool should only ever be used by the process which created it.
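Instead of blocking on get(), apply_async() can also take callback and error_callback arguments, which the pool calls in the parent process with each result or raised exception. A sketch, where checked_sqrt() and run_with_callbacks() are hypothetical helpers:

```python
from multiprocessing import Pool

def checked_sqrt(x):
    # Hypothetical worker that rejects negative input.
    if x < 0:
        raise ValueError(f'negative input: {x}')
    return x ** 0.5

def run_with_callbacks(values):
    """Hypothetical helper: collect successes and failures via callbacks."""
    results, errors = [], []
    with Pool(2) as pool:
        for v in values:
            pool.apply_async(checked_sqrt, (v,),
                             callback=results.append,                      # called with the return value
                             error_callback=lambda e: errors.append(str(e)))  # called with the exception
        pool.close()   # no more tasks will be submitted
        pool.join()    # wait for all tasks and their callbacks to finish
    # Completion order is not guaranteed, so sort before returning.
    return sorted(results), sorted(errors)

if __name__ == '__main__':
    print(run_with_callbacks([4, -1, 9]))
```

The callbacks run in a result-handling thread of the parent rather than in a worker, so they should return quickly; a slow callback holds up delivery of the remaining results.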
Note
Functionality within this package requires that the __main__ module be
importable by the children. This is covered in Programming guidelines,
but it is worth pointing out here. It means that some examples, such
as the multiprocessing.pool.Pool examples, will not work in the
interactive interpreter. For example:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(If you try this it will actually output three full tracebacks interleaved in a semi-random fashion, and then you may have to stop the master process somehow.)
17.2.2. Reference
The multiprocessing package mostly replicates the API of the
threading module.
17.2.2.1. Process and exceptions
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.
The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name (see name for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the process daemon flag to True or False. If None (the default), this flag will be inherited from the creating process.
By default, no arguments are passed to target.
If a subclass overrides the constructor, it must make sure it invokes the base class constructor (Process.__init__()) before doing anything else to the process.
Changed in version 3.3: Added the daemon argument.
run()
Method representing the process’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
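Overriding run() in a subclass, while calling the base constructor first as required above, might look like the following sketch. SquareProcess and square_in_child() are hypothetical names; the result travels back through a Queue because attributes set in the child are not visible to the parent:

```python
from multiprocessing import Process, Queue

class SquareProcess(Process):
    """Hypothetical subclass: squares a number in the child and reports it on a queue."""

    def __init__(self, value, queue):
        # Invoke the base class constructor before doing anything else.
        super().__init__()
        self.value = value
        self.queue = queue

    def run(self):
        # Executed in the child process once start() is called.
        self.queue.put(self.value * self.value)

def square_in_child(value):
    """Hypothetical helper: run SquareProcess and return the child's result."""
    q = Queue()
    p = SquareProcess(value, q)
    p.start()
    result = q.get()   # read the result before joining
    p.join()
    return result

if __name__ == '__main__':
    print(square_in_child(7))
```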
start()
Start the process’s activity.
This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.
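The effect of start() and join() on a process can be observed with is_alive() and the exitcode attribute. In this sketch, nap() and lifecycle() are hypothetical; the True in the middle assumes the child is still sleeping when is_alive() is checked, which the half-second sleep makes very likely but does not strictly guarantee:

```python
import time
from multiprocessing import Process

def nap(seconds):
    # Hypothetical target that just keeps the child busy for a while.
    time.sleep(seconds)

def lifecycle():
    """Hypothetical helper: sample is_alive() before start, while running, after join."""
    p = Process(target=nap, args=(0.5,))
    before = p.is_alive()   # False: start() has not been called yet
    p.start()               # run() now executes in a separate process
    during = p.is_alive()   # expected True while the child is sleeping
    p.join()
    after = p.is_alive()    # False: the child has exited
    return before, during, after, p.exitcode

if __name__ == '__main__':
    print(lifecycle())
```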
join(