Python __aiter__ 方法
最后修改于 2025 年 4 月 8 日
这份全面的指南探讨了 Python 的 __aiter__ 方法,这个特殊方法使异步迭代成为可能。我们将涵盖基本用法、自定义异步迭代器、实际示例和常见模式。
基本定义
__aiter__ 方法定义了对象在 async for 循环中使用时的行为。它返回一个异步迭代器对象,该对象必须实现 __anext__。
主要特征:它必须是一个异步函数(用 async def 定义),返回一个异步迭代器,并且与 __anext__ 协同工作以支持异步迭代。它是 __iter__ 的异步等价物。
基本异步迭代器
这是一个简单的实现,展示了 __aiter__ 如何与 __anext__ 协同工作来创建一个异步迭代器。
class AsyncCounter:
def __init__(self, stop):
self.stop = stop
self.current = 0
async def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
self.current += 1
return self.current - 1
async def main():
async for i in AsyncCounter(3):
print(i)
import asyncio
asyncio.run(main())
这个示例创建了一个异步计数器,它产生从 0 到 2 的数字。__aiter__ 方法返回 self,而 __anext__ 产生值,直到达到停止值。
请注意使用 StopAsyncIteration 代替 StopIteration,以及需要异步上下文的 async for 循环。
异步数据获取器
这个例子演示了一个实际的用例——异步地从多个 URL 获取数据,同时支持异步迭代。
import aiohttp
class AsyncDataFetcher:
def __init__(self, urls):
self.urls = urls
async def __aiter__(self):
self.session = aiohttp.ClientSession()
self.index = 0
return self
async def __anext__(self):
if self.index >= len(self.urls):
await self.session.close()
raise StopAsyncIteration
url = self.urls[self.index]
self.index += 1
async with self.session.get(url) as response:
data = await response.text()
return (url, len(data))
async def main():
urls = [
'https://pythonlang.cn',
'https://docs.pythonlang.cn',
'https://pypi.ac.cn'
]
async for url, length in AsyncDataFetcher(urls):
print(f"{url}: {length} bytes")
asyncio.run(main())
这个异步获取器并发地下载多个网页。 __aiter__ 设置 HTTP 会话,而 __anext__ 依次获取每个 URL。
迭代器通过在迭代完成时关闭会话来正确管理资源。每次迭代返回 URL 和内容长度的元组。
异步生成器作为迭代器
Python 3.6+ 允许使用异步生成器,它们自动实现 __aiter__ 和 __anext__。
async def async_counter(stop):
for i in range(stop):
await asyncio.sleep(0.1) # Simulate async work
yield i
async def main():
async for num in async_counter(5):
print(num)
asyncio.run(main())
这个异步生成器简化了异步迭代器的创建。 在底层,它实现了与具有 __aiter__ 和 __anext__ 的类相同的协议。
生成器在每个 yield 和 await 处暂停,使其非常适合异步产生值。不需要显式的 StopAsyncIteration。
缓冲异步读取器
这个示例展示了一个异步迭代器,它以块的形式读取文件,这对于处理大型文件而无需将所有内容加载到内存中非常有用。
class AsyncFileReader:
def __init__(self, filename, chunk_size=1024):
self.filename = filename
self.chunk_size = chunk_size
async def __aiter__(self):
self.file = open(self.filename, 'rb')
return self
async def __anext__(self):
data = await asyncio.to_thread(
self.file.read, self.chunk_size
)
if not data:
self.file.close()
raise StopAsyncIteration
return data
async def main():
async for chunk in AsyncFileReader('large_file.dat'):
print(f"Read {len(chunk)} bytes")
asyncio.run(main())
这个读取器使用线程池处理文件块以进行阻塞 I/O 操作。 __aiter__ 打开文件,而 __anext__ 读取每个块。
asyncio.to_thread 在单独的线程中运行阻塞文件操作,使异步循环保持响应。 迭代器在完成后正确关闭文件。
速率限制的异步生产者
此示例演示了一个以速率限制产生项目的异步迭代器,这对于具有请求限制的 API 非常有用。
class RateLimitedProducer:
def __init__(self, items, requests_per_second):
self.items = items
self.delay = 1.0 / requests_per_second
async def __aiter__(self):
self.index = 0
return self
async def __anext__(self):
if self.index >= len(self.items):
raise StopAsyncIteration
item = self.items[self.index]
self.index += 1
await asyncio.sleep(self.delay)
return item
async def main():
items = [f"item_{i}" for i in range(10)]
async for item in RateLimitedProducer(items, 2): # 2 items/sec
print(f"Processed {item} at {time.time()}")
asyncio.run(main())
这个生产者以受控的速率产生项目。 __anext__ 方法在项目之间休眠以维持所需的速率。
睡眠持续时间根据所需的速率(项目/秒)计算。 此模式在与速率限制的 API 或服务交互时非常有用。
最佳实践
- 资源管理: 完成后在
__anext__中清理资源 - 错误处理: 处理并正确指示迭代期间的错误
- 使用异步生成器: 对于更简单的情况,首选它们
- 文档行为: 清楚地记录迭代模式
- 考虑取消: 正确处理 asyncio 取消
资料来源
作者
列出所有 Python 教程。