Python __aiter__ 方法
最后修改于 2025 年 4 月 8 日
这份全面的指南探讨了 Python 的 __aiter__
方法,这个特殊方法使异步迭代成为可能。我们将涵盖基本用法、自定义异步迭代器、实际示例和常见模式。
基本定义
__aiter__
方法定义了对象在 async for
循环中使用时的行为。它返回一个异步迭代器对象,该对象必须实现 __anext__
。
主要特征:它必须是一个异步函数(用 async def
定义),返回一个异步迭代器,并且与 __anext__
协同工作以支持异步迭代。它是 __iter__
的异步等价物。
基本异步迭代器
这是一个简单的实现,展示了 __aiter__
如何与 __anext__
协同工作来创建一个异步迭代器。
class AsyncCounter: def __init__(self, stop): self.stop = stop self.current = 0 async def __aiter__(self): return self async def __anext__(self): if self.current >= self.stop: raise StopAsyncIteration self.current += 1 return self.current - 1 async def main(): async for i in AsyncCounter(3): print(i) import asyncio asyncio.run(main())
这个示例创建了一个异步计数器,它产生从 0 到 2 的数字。__aiter__
方法返回 self,而 __anext__
产生值,直到达到停止值。
请注意使用 StopAsyncIteration
代替 StopIteration
,以及需要异步上下文的 async for
循环。
异步数据获取器
这个例子演示了一个实际的用例——异步地从多个 URL 获取数据,同时支持异步迭代。
import aiohttp class AsyncDataFetcher: def __init__(self, urls): self.urls = urls async def __aiter__(self): self.session = aiohttp.ClientSession() self.index = 0 return self async def __anext__(self): if self.index >= len(self.urls): await self.session.close() raise StopAsyncIteration url = self.urls[self.index] self.index += 1 async with self.session.get(url) as response: data = await response.text() return (url, len(data)) async def main(): urls = [ 'https://pythonlang.cn', 'https://docs.pythonlang.cn', 'https://pypi.ac.cn' ] async for url, length in AsyncDataFetcher(urls): print(f"{url}: {length} bytes") asyncio.run(main())
这个异步获取器并发地下载多个网页。 __aiter__
设置 HTTP 会话,而 __anext__
依次获取每个 URL。
迭代器通过在迭代完成时关闭会话来正确管理资源。每次迭代返回 URL 和内容长度的元组。
异步生成器作为迭代器
Python 3.6+ 允许使用异步生成器,它们自动实现 __aiter__
和 __anext__
。
async def async_counter(stop): for i in range(stop): await asyncio.sleep(0.1) # Simulate async work yield i async def main(): async for num in async_counter(5): print(num) asyncio.run(main())
这个异步生成器简化了异步迭代器的创建。 在底层,它实现了与具有 __aiter__
和 __anext__
的类相同的协议。
生成器在每个 yield
和 await
处暂停,使其非常适合异步产生值。不需要显式的 StopAsyncIteration
。
缓冲异步读取器
这个示例展示了一个异步迭代器,它以块的形式读取文件,这对于处理大型文件而无需将所有内容加载到内存中非常有用。
class AsyncFileReader: def __init__(self, filename, chunk_size=1024): self.filename = filename self.chunk_size = chunk_size async def __aiter__(self): self.file = open(self.filename, 'rb') return self async def __anext__(self): data = await asyncio.to_thread( self.file.read, self.chunk_size ) if not data: self.file.close() raise StopAsyncIteration return data async def main(): async for chunk in AsyncFileReader('large_file.dat'): print(f"Read {len(chunk)} bytes") asyncio.run(main())
这个读取器使用线程池处理文件块以进行阻塞 I/O 操作。 __aiter__
打开文件,而 __anext__
读取每个块。
asyncio.to_thread
在单独的线程中运行阻塞文件操作,使异步循环保持响应。 迭代器在完成后正确关闭文件。
速率限制的异步生产者
此示例演示了一个以速率限制产生项目的异步迭代器,这对于具有请求限制的 API 非常有用。
class RateLimitedProducer: def __init__(self, items, requests_per_second): self.items = items self.delay = 1.0 / requests_per_second async def __aiter__(self): self.index = 0 return self async def __anext__(self): if self.index >= len(self.items): raise StopAsyncIteration item = self.items[self.index] self.index += 1 await asyncio.sleep(self.delay) return item async def main(): items = [f"item_{i}" for i in range(10)] async for item in RateLimitedProducer(items, 2): # 2 items/sec print(f"Processed {item} at {time.time()}") asyncio.run(main())
这个生产者以受控的速率产生项目。 __anext__
方法在项目之间休眠以维持所需的速率。
睡眠持续时间根据所需的速率(项目/秒)计算。 此模式在与速率限制的 API 或服务交互时非常有用。
最佳实践
- 资源管理: 完成后在
__anext__
中清理资源 - 错误处理: 处理并正确指示迭代期间的错误
- 使用异步生成器: 对于更简单的情况,首选它们
- 文档行为: 清楚地记录迭代模式
- 考虑取消: 正确处理 asyncio 取消
资料来源
作者
列出所有 Python 教程。