串流¶
串流是支援 async/await (async/await-ready) 的高階原語 (high-level primitive),用於處理網路連線。串流不需要使用回呼 (callback) 或低階協定和傳輸 (transport) 就能夠傳送和接收資料。
這是一個使用 asyncio 串流編寫的 TCP echo 用戶端範例:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
另請參閱下方 Examples 段落。
串流函式
下面的高階 asyncio 函式可以用來建立和處理串流:
- async asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
建立網路連線並回傳一對
(reader, writer)物件。回傳的 reader 和 writer 物件是
StreamReader和StreamWriter類別的實例。limit 指定了回傳的
StreamReader實例所使用的緩衝區 (buffer) 大小限制。limit 預設為 64 KiB。其餘的引數會直接傳遞到
loop.create_connection()。備註
The sock argument transfers ownership of the socket to the
StreamWritercreated. To close the socket, call itsclose()method.在 3.7 版的變更: 新增 ssl_handshake_timeout 參數。
在 3.8 版的變更: 新增 happy_eyeballs_delay 和 interleave 參數。
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
- async asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
啟動 socket 伺服器。
當一個新的用戶端連線被建立時,回呼函式 client_connected_cb 就會被呼叫。該函式會接收到一對引數
(reader, writer),分別為StreamReader和StreamWriter的實例。client_connected_cb 既可以是普通的可呼叫物件 (callable),也可以是一個協程函式;如果它是一個協程函式,它將自動作為
Task來被排程。limit 指定了回傳的
StreamReader實例所使用的緩衝區 (buffer) 大小限制。limit 預設為 64 KiB。剩下的引數將會直接傳遞給
loop.create_server()。備註
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's
close()method.在 3.7 版的變更: 新增 ssl_handshake_timeout 與 start_serving 參數。
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
在 3.13 版的變更: 新增 keep_alive 參數。
Unix Sockets
- async asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
建立一個 Unix socket 連線並回傳一對
(reader, writer)。與
open_connection()相似,但是是操作 Unix sockets。另請參閱
loop.create_unix_connection()文件。備註
The sock argument transfers ownership of the socket to the
StreamWritercreated. To close the socket, call itsclose()method.可用性: Unix.
在 3.7 版的變更: 新增 ssl_handshake_timeout 參數。path 參數現在可以是個 path-like object
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
- async asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)¶
啟動一個 Unix socket 伺服器。
與
start_server()相似,但會是操作 Unix sockets。If cleanup_socket is true then the Unix socket will automatically be removed from the filesystem when the server is closed, unless the socket has been replaced after the server has been created.
另請參閱
loop.create_unix_server()文件。備註
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's
close()method.可用性: Unix.
在 3.7 版的變更: 新增 ssl_handshake_timeout 與 start_serving 參數。path 參數現在可以是個 path-like object。
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
在 3.13 版的變更: 新增 cleanup_socket 參數。
StreamReader¶
- class asyncio.StreamReader¶
表示一個有提供 API 來從 IO 串流中讀取資料的 reader 物件。作為一個 asynchronous iterable,此物件支援
async for陳述式。不建議直接實例化 StreamReader 物件;使用
open_connection()和start_server()會是較好的做法。- feed_eof()¶
Acknowledge the EOF.
- async read(n=-1)¶
從串流中讀取至多 n 個位元組的資料。
如果沒有設定 n 或是被設為
-1,則會持續讀取直到 EOF,然後回傳所有讀取到的bytes。讀取到 EOF 且內部緩衝區是空的,則回傳一個空的bytes物件。如果 n 為
0,則立即回傳一個空的bytes物件。If n is positive, return at most n available
bytesas soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an emptybytesobject.
- async readline()¶
讀取一行,其中"行"指的是以
\n結尾的位元組序列。如果讀取到 EOF 而沒有找到
\n,該方法會回傳部分的已讀取資料。如果讀取到 EOF 且內部緩衝區是空的,則回傳一個空的
bytes物件。
- async readexactly(n)¶
讀取剛好 n 個位元組。
如果在讀取完 n 個位元組之前讀取到 EOF,則會引發
IncompleteReadError。使用IncompleteReadError.partial屬性來取得串流結束前已讀取的部分資料。
- async readuntil(separator=b'\n')¶
從串流中持續讀取資料直到出現 separator。
成功後,資料和 separator(分隔符號)會從內部緩衝區中刪除(或者說是被消費掉 (consumed))。回傳的資料在末尾會有一個 separator。
如果讀取的資料量超過了設定的串流限制,將會引發
LimitOverrunError例外,資料將被留在內部緩衝區中,並可以再次被讀取。如果在完整的 separator 被找到之前就讀取到 EOF,則會引發
IncompleteReadError例外,且內部緩衝區會被重置。