concurrent.futures --- 啟動平行任務

在 3.2 版被加入.

原始碼:Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.pyLib/concurrent/futures/interpreter.py


concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。

非同步執行可以透過 ThreadPoolExecutorInterpreterPoolExecutor 來使用執行緒 (thread) 執行,或透過 ProcessPoolExecutor 來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的 Executor 類別定義。

concurrent.futures.Future must not be confused with asyncio.Future, which is designed for use with asyncio tasks and coroutines. See the asyncio's Future documentation for a detailed comparison of the two.

可用性: not WASI.

此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱 WebAssembly 平台

Executor 物件

class concurrent.futures.Executor

提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。

submit(fn, /, *args, **kwargs)

為可呼叫物件 fn 排程來以 fn(*args, **kwargs) 的形式執行並回傳一個表示可呼叫的執行的 Future 物件。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

類似於 map(fn, *iterables),除了:

  • The iterables are collected immediately rather than lazily, unless a buffersize is specified to limit the number of submitted tasks whose results have not yet been yielded. If the buffer is full, iteration over the iterables pauses until a result is yielded from the buffer.

  • fn 是非同步執行的,並且對 fn 的多次呼叫可以並行處理。

如果 __next__() 被呼叫,且在原先呼叫 Executor.map()timeout 秒後結果仍不可用,回傳的疊代器就會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。

使用 ProcessPoolExecutor 時,此方法將 iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將 chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於 ThreadPoolExecutorInterpreterPoolExecutorchunksize 無效。

在 3.5 版的變更: 新增 chunksize 參數。

在 3.14 版的變更: 新增 buffersize 參數。

shutdown(wait=True, *, cancel_futures=False)

向 executor 發出訊號 (signal),表明它應該在目前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫 Executor.submit()Executor.map() 將引發 RuntimeError

如果 waitTrue 則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 waitFalse 則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。

如果 cancel_futuresTrue,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。

如果 cancel_futureswait 都為 True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。

如果使用 with 陳述式來將 executor 用作 context manager,那你就可以不用明確地呼叫此方法,這將會自己關閉 Executor(如同呼叫 Executor.shutdown()wait 被設定為 True 般等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版的變更: 新增 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一個 Executor 子類別,它使用執行緒池來非同步地執行呼叫。

當與 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖 (deadlock)。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b 永遠不會完成,因為它正在等待 a。
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a 永遠不會完成,因為它正在等待 b。
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # 這將永遠不會完成,因為只有一個工作執行緒且
    # 它正在執行這個函式。
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(wait_on_future)
# 注意:呼叫 future.result() 也會造成死鎖 (deadlock),因為
# 唯一的工作執行緒已經在等待 wait_on_future()。
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一個 Executor 子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。

所有排隊到 ThreadPoolExecutor 的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用 atexit 新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將 ThreadPoolExecutor 用於長時間運行的任務。

initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有目前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發 BrokenThreadPool

在 3.5 版的變更: 如果 max_workersNone 或未給定,它將預設為機器上的處理器數量乘以 5,這假定了 ThreadPoolExecutor 通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於 ProcessPoolExecutor 的 worker 數量。

在 3.6 版的變更: 新增 thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的 threading.Thread 名稱,以便於除錯。

在 3.7 版的變更: 新增 initializerinitargs 引數。

在 3.8 版的變更: max_workers 的預設值改為 min(32, os.cpu_count() + 4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。

ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。

在 3.13 版的變更: max_workers 的預設值被改為 min(32, (os.process_cpu_count() or 1) + 4)

ThreadPoolExecutor 範例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return