concurrent.futures --- 啟動平行任務¶
在 3.2 版被加入.
原始碼:Lib/concurrent/futures/thread.py、Lib/concurrent/futures/process.py 與 Lib/concurrent/futures/interpreter.py
concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。
非同步執行可以透過 ThreadPoolExecutor 或 InterpreterPoolExecutor 來使用執行緒 (thread) 執行,或透過 ProcessPoolExecutor 來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的 Executor 類別定義。
concurrent.futures.Future must not be confused with
asyncio.Future, which is designed for use with asyncio
tasks and coroutines. See the asyncio's Future
documentation for a detailed comparison of the two.
可用性: not WASI.
此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱 WebAssembly 平台。
Executor 物件¶
- class concurrent.futures.Executor¶
提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。
- submit(fn, /, *args, **kwargs)¶
為可呼叫物件 fn 排程來以
fn(*args, **kwargs)的形式執行並回傳一個表示可呼叫的執行的Future物件。with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)¶
類似於
map(fn, *iterables),除了:The iterables are collected immediately rather than lazily, unless a buffersize is specified to limit the number of submitted tasks whose results have not yet been yielded. If the buffer is full, iteration over the iterables pauses until a result is yielded from the buffer.
fn 是非同步執行的,並且對 fn 的多次呼叫可以並行處理。
如果
__next__()被呼叫,且在原先呼叫Executor.map()的 timeout 秒後結果仍不可用,回傳的疊代器就會引發TimeoutError。timeout 可以是整數或浮點數。如果未指定 timeout 或為None,則等待時間就不會有限制。如果 fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。
使用
ProcessPoolExecutor時,此方法將 iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將 chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於ThreadPoolExecutor和InterpreterPoolExecutor,chunksize 無效。在 3.5 版的變更: 新增 chunksize 參數。
在 3.14 版的變更: 新增 buffersize 參數。
- shutdown(wait=True, *, cancel_futures=False)¶
向 executor 發出訊號 (signal),表明它應該在目前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫
Executor.submit()和Executor.map()將引發RuntimeError。如果 wait 為
True則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 wait 為False則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。如果 cancel_futures 為
True,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。如果 cancel_futures 和 wait 都為
True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。如果使用
with陳述式來將 executor 用作 context manager,那你就可以不用明確地呼叫此方法,這將會自己關閉Executor(如同呼叫Executor.shutdown()時 wait 被設定為True般等待):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版的變更: 新增 cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor 是一個 Executor 子類別,它使用執行緒池來非同步地執行呼叫。
當與 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖 (deadlock)。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b 永遠不會完成,因為它正在等待 a。
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a 永遠不會完成,因為它正在等待 b。
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# 這將永遠不會完成,因為只有一個工作執行緒且
# 它正在執行這個函式。
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(wait_on_future)
# 注意:呼叫 future.result() 也會造成死鎖 (deadlock),因為
# 唯一的工作執行緒已經在等待 wait_on_future()。
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
一個
Executor子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。所有排隊到
ThreadPoolExecutor的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用atexit新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將ThreadPoolExecutor用於長時間運行的任務。initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有目前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發
BrokenThreadPool。在 3.5 版的變更: 如果 max_workers 為
None或未給定,它將預設為機器上的處理器數量乘以5,這假定了ThreadPoolExecutor通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於ProcessPoolExecutor的 worker 數量。在 3.6 版的變更: 新增 thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的
threading.Thread名稱,以便於除錯。在 3.7 版的變更: 新增 initializer 與 initargs 引數。
在 3.8 版的變更: max_workers 的預設值改為
min(32, os.cpu_count() + 4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。
在 3.13 版的變更: max_workers 的預設值被改為
min(32, (os.process_cpu_count() or 1) + 4)。
ThreadPoolExecutor 範例¶
import concurrent.futures