Python __anext__ 方法
最后修改于 2025 年 4 月 8 日
本综合指南探讨了 Python 的 __anext__
方法,该特殊方法启用了异步迭代。我们将涵盖基本用法、自定义异步迭代器、错误处理和实际示例。
基本定义
__anext__
方法是一个协程,它从异步迭代器返回下一个项目。它是 __next__
的异步等价物。
关键特性:它必须定义为异步函数,返回一个可等待对象,并且在没有更多项目可用时引发 StopAsyncIteration
。它与 async for
循环一起使用。
基本异步迭代器实现
这是一个简单的异步迭代器,演示了 __anext__
的用法。它展示了异步迭代所需的最小实现。
class AsyncCounter: def __init__(self, stop): self.current = 0 self.stop = stop def __aiter__(self): return self async def __anext__(self): if self.current >= self.stop: raise StopAsyncIteration self.current += 1 return self.current - 1 async def main(): async for num in AsyncCounter(3): print(num) import asyncio asyncio.run(main())
此示例创建一个异步计数器,该计数器生成从 0 到 2 的数字。__anext__
方法递增计数器并返回值。
当计数器达到停止值时,它会引发 StopAsyncIteration
。这会将迭代结束的信号发送到 async for
循环。
带延迟的异步迭代器
此示例展示了 __anext__
如何包含可等待的操作,例如睡眠,使其可用于 I/O 绑定任务。
class DelayedIterator: def __init__(self, items, delay): self.items = iter(items) self.delay = delay def __aiter__(self): return self async def __anext__(self): try: item = next(self.items) await asyncio.sleep(self.delay) return item except StopIteration: raise StopAsyncIteration async def main(): async for char in DelayedIterator("ABC", 0.5): print(char) asyncio.run(main())
此迭代器在项目之间引入延迟。它包装了一个常规迭代器并添加了异步行为。await asyncio.sleep()
演示了异步操作。
从 StopIteration
到 StopAsyncIteration
的转换很重要。它在异步上下文中正确地指示了迭代的结束。
异步数据获取器
这个实际示例展示了 __anext__
异步地从多个 URL 获取数据,演示了实际用法。
import aiohttp class AsyncDataFetcher: def __init__(self, urls): self.urls = iter(urls) self.session = None async def __aenter__(self): self.session = aiohttp.ClientSession() return self async def __aexit__(self, *args): await self.session.close() def __aiter__(self): return self async def __anext__(self): try: url = next(self.urls) except StopIteration: raise StopAsyncIteration async with self.session.get(url) as response: return await response.text() async def main(): urls = [ 'https://example.com', 'https://pythonlang.cn', 'https://pypi.ac.cn' ] async with AsyncDataFetcher(urls) as fetcher: async for content in fetcher: print(f"Fetched {len(content)} bytes") asyncio.run(main())
这个异步迭代器逐个获取网页。它使用 aiohttp
进行异步 HTTP 请求。__anext__
方法处理每个获取。
上下文管理器方法(__aenter__
、__aexit__
)管理 HTTP 会话生命周期。这确保了正确的资源清理。
__anext__ 中的错误处理
此示例演示了 __anext__
中的正确错误处理,展示了如何在异步迭代期间管理异常。
class SafeAsyncIterator: def __init__(self, data): self.data = iter(data) self.retries = 3 def __aiter__(self): return self async def __anext__(self): for attempt in range(self.retries): try: return next(self.data) except StopIteration: raise StopAsyncIteration except Exception as e: if attempt == self.retries - 1: raise print(f"Error, retrying... ({e})") await asyncio.sleep(1) async def main(): data = [1, 2, "error", 3, 4] try: async for item in SafeAsyncIterator(data): print(f"Got: {item}") except Exception as e: print(f"Failed after retries: {e}") asyncio.run(main())
此迭代器实现了异常的重试逻辑。在传播错误之前,它会多次尝试每个项目。重试之间的延迟是异步的。
该示例展示了如何处理正常的迭代完成(StopAsyncIteration
)和错误情况。这使得迭代器更加健壮。
异步生成器替代方案
此示例将 __anext__
实现与异步生成器进行比较,展示了一种更简洁的替代语法。
async def async_counter(stop): for i in range(stop): await asyncio.sleep(0.1) yield i async def main(): # Using async generator async for num in async_counter(3): print(f"Generator: {num}") # Equivalent __anext__ implementation class Counter: def __init__(self, stop): self.current = 0 self.stop = stop def __aiter__(self): return self async def __anext__(self): if self.current >= self.stop: raise StopAsyncIteration await asyncio.sleep(0.1) self.current += 1 return self.current - 1 async for num in Counter(3): print(f"Class: {num}") asyncio.run(main())
异步生成器为简单情况提供更清晰的语法。但是,带有 __anext__
的类为复杂场景提供了更多控制。
两种实现的功能相同。选择取决于对状态管理、可重用性和复杂性的需求。
最佳实践
- 始终引发 StopAsyncIteration: 正确指示迭代结束
- 优雅地处理错误: 如果迭代失败,请清理资源
- 记录迭代行为: 记录迭代器是有限的/无限的
- 考虑异步生成器: 对于基本情况通常更简单
- 管理资源: 使用异步上下文管理器进行清理
资料来源
作者
列出所有 Python 教程。