itertools --- 建立產生高效率迴圈之疊代器的函式


這個模組實作了許多 疊代器 (iterator) 構建塊 (building block),其靈感來自 APL、Haskell 和 SML 的結構。每個構建塊都以適合 Python 的形式來重新設計。

這個模組標準化了快速且高效率利用記憶體的核心工具集,這些工具本身或組合使用都很有用。它們共同構成了一個「疊代器代數 (iterator algebra)」,使得在純 Python 中簡潔且高效地建構專用工具成為可能。

例如,SML 提供了一個造表工具:tabulate(f),它產生一個序列 f(0), f(1), ...。在 Python 中,可以透過結合 map()count() 組成 map(f, count()) 以達到同樣的效果。

一般疊代器:

疊代器

引數

結果

範例

accumulate()

p [,func]

p0, p0+p1, p0+p1+p2, ...

accumulate([1,2,3,4,5]) 1 3 6 10 15

batched()

p, n

(p0, p1, ..., p_n-1), ...

batched('ABCDEFG', n=3) ABC DEF G

chain()

p, q, ...

p0, p1, ... plast, q0, q1, ...

chain('ABC', 'DEF') A B C D E F

chain.from_iterable()

可疊代物件

p0, p1, ... plast, q0, q1, ...

chain.from_iterable(['ABC', 'DEF']) A B C D E F

compress()

data, selectors

(d[0] if s[0]), (d[1] if s[1]), ...

compress('ABCDEF', [1,0,1,0,1,1]) A C E F

count()

[start[, step]]

start, start+step, start+2*step, ...

count(10) 10 11 12 13 14 ...

cycle()

p

p0, p1, ... plast, p0, p1, ...

cycle('ABCD') A B C D A B C D ...

dropwhile()

predicate, seq

seq[n], seq[n+1],當 predicate 失敗時開始

dropwhile(lambda x: x<5, [1,4,6,3,8]) 6 3 8

filterfalse()

predicate, seq

當 predicate(elem) 失敗時 seq 的元素

filterfalse(lambda x: x<5, [1,4,6,3,8]) 6 8

groupby()

iterable[, key]

根據 key(v) 的值分組的子疊代器

groupby(['A','B','DEF'], len) (1, A B) (3, DEF)

islice()

seq, [start,] stop [, step]

seq[start:stop:step] 的元素

islice('ABCDEFG', 2, None) C D E F G

pairwise()

可疊代物件

(p[0], p[1]), (p[1], p[2])

pairwise('ABCDEFG') AB BC CD DE EF FG

repeat()

elem [,n]

elem, elem, elem,... 重複無限次或 n 次

repeat(10, 3) 10 10 10

starmap()

func, seq

func(*seq[0]), func(*seq[1]), ...

starmap(pow, [(2,5), (3,2), (10,3)]) 32 9 1000

takewhile()

predicate, seq

seq[0], seq[1],直到 predicate 失敗

takewhile(lambda x: x<5, [1,4,6,3,8]) 1 4

tee()

it, n

it1, it2, ... itn,將一個疊代器分成 n 個

tee('ABC', 2) A B C, A B C

zip_longest()

p, q, ...

(p[0], q[0]), (p[1], q[1]), ...

zip_longest('ABCD', 'xy', fillvalue='-') Ax By C- D-

組合疊代器:

疊代器

引數

結果

product()

p, q, ... [repeat=1]

笛卡爾乘積 (cartesian product),相當於巢狀的 for 迴圈

permutations()

p[, r]

長度為 r 的元組,所有可能的定序,無重複元素

combinations()

p, r

長度為 r 的元組,按照排序過後的定序,無重複元素

combinations_with_replacement()

p, r

長度為 r 的元組,按照排序過後的定序,有重複元素

範例

結果

product('ABCD', repeat=2)

AA AB AC AD BA BB BC BD CA CB CC CD DA DB DC DD

permutations('ABCD', 2)

AB AC AD BA BC BD CA CB CD DA DB DC

combinations('ABCD', 2)

AB AC AD BC BD CD

combinations_with_replacement('ABCD', 2)

AA AB AC AD BB BC BD CC CD DD

Itertool 函式

以下的函式都會建構並回傳疊代器。一些函式提供無限長度的串流 (stream),因此應僅由截斷串流的函式或迴圈來存取它們。

itertools.accumulate(iterable[, function, *, initial=None])

建立一個回傳累積和的疊代器,或其他二進位函式的累積結果。

function 預設為加法。function 應接受兩個引數,即累積總和和來自 iterable 的值。

如果提供了 initial 值,則累積將從該值開始,並且輸出的元素數將比輸入的可疊代物件多一個。

大致等價於:

def accumulate(iterable, function=operator.add, *, initial=None):
    'Return running totals'
    # accumulate([1,2,3,4,5]) → 1 3 6 10 15
    # accumulate([1,2,3,4,5], initial=100) → 100 101 103 106 110 115
    # accumulate([1,2,3,4,5], operator.mul) → 1 2 6 24 120

    iterator = iter(iterable)
    total = initial
    if initial is None:
        try:
            total = next(iterator)
        except StopIteration:
            return

    yield total
    for element in iterator:
        total = function(total, element)
        yield total

function 引數可以被設定為 min() 以得到連續的最小值,設定為 max() 以得到連續的最大值,或者設定為 operator.mul() 以得到連續的乘積。也可以透過累積利息和付款來建立攤銷表 (Amortization tables)

>>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8]
>>> list(accumulate(data, max))              # 運行最大值
[3, 4, 6, 6, 6, 9, 9, 9, 9, 9]
>>> list(accumulate(data, operator.mul))     # 運行乘積
[3, 12, 72, 144, 144, 1296, 0, 0, 0, 0]

# 攤銷一筆 1000 的 5% 貸款,分 10 年、每年支付 90
>>> update = lambda balance, payment: round(balance * 1.05) - payment
>>> list(accumulate(repeat(90, 10), update, initial=1_000))
[1000, 960, 918, 874, 828, 779, 728, 674, 618, 559, 497]

可參見 functools.reduce(),其是個類似的函式,但僅回傳最終的累積值。

在 3.2 版被加入.

在 3.3 版的變更: 新增可選的 function 參數。

在 3.8 版的變更: 新增可選的 initial 參數。

itertools.batched(iterable, n, *, strict=False)

將來自 iterable 的資料分批為長度為 n 的元組。最後一個批次可能比 n 短。

如果 strict 為真,則當最後一個批次比 n 短時,會引發 ValueError

對輸入的可疊代物件進行迴圈,並將資料累積到大小為 n 的元組中。輸入是惰性地被消耗 (consumed lazily) 的,會剛好足夠填充一批的資料。一旦批次填滿或輸入的可疊代物件耗盡,就會 yield 出結果:

>>> flattened_data = ['roses', 'red', 'violets', 'blue', 'sugar', 'sweet']
>>> unflattened = list(batched(flattened_data, 2))
>>> unflattened
[('roses', 'red'), ('violets', 'blue'), ('sugar', 'sweet')]

大致等價於:

def batched(iterable, n, *, strict=False):
    # batched('ABCDEFG', 3) → ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    iterator = iter(iterable)
    while batch := tuple(islice(iterator, n)):
        if strict and len(batch) != n:
            raise ValueError('batched(): incomplete batch')
        yield batch

在 3.12 版被加入.

在 3.13 版的變更: 新增 strict 選項。

itertools.chain(*iterables)

建立一個疊代器,從第一個可疊代物件回傳元素直到其耗盡,然後繼續處理下一個可疊代物件,直到所有可疊代物件都耗盡。這將多個資料來源結合為單一個疊代器。大致等價於:

def chain(*iterables):
    # chain('ABC', 'DEF') → A B C D E F
    for iterable in iterables:
        yield from iterable
classmethod chain.from_iterable(iterable)

chain() 的另一個建構函式。從單個可疊代的引數中得到鏈接的輸入,該引數是惰性計算的。大致等價於:

def from_iterable(iterables):
    # chain.from_iterable(['ABC', 'DEF']) → A B C D E F
    for iterable in iterables:
        yield from iterable
itertools.combinations(iterable, r)

從輸入 iterable 中回傳長度為 r 的元素的子序列。

輸出是 product() 的子序列,僅保留作為 iterable 子序列的條目。輸出的長度由 math.comb() 給定,當 0 r n 時,長度為 n! / r! / (n - r)!,當 r > n 時為零。

根據輸入值 iterable 的順序,組合的元組會按照字典順序輸出。如果輸入的 iterable 已經排序,則輸出的元組也將按排序的順序產生。

元素是根據它們的位置(而非值)來決定其唯一性。如果輸入的元素都是獨特的,則每個組合內將不會有重複的值。

大致等價於:

def combinations(iterable, r):
    # combinations('ABCD', 2) → AB AC AD BC BD CD
    # combinations(range(4), 3) → 012 013 023 123

    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    indices = list(range(r))

    yield tuple(pool[i] for i in indices)
    while True:
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            return
        indices[i] += 1
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool