Python __anext__ 方法
最后修改于 2025 年 4 月 8 日
本综合指南探讨了 Python 的 __anext__ 方法,该特殊方法启用了异步迭代。我们将涵盖基本用法、自定义异步迭代器、错误处理和实际示例。
基本定义
__anext__ 方法是一个协程,它从异步迭代器返回下一个项目。它是 __next__ 的异步等价物。
关键特性:它必须定义为异步函数,返回一个可等待对象,并且在没有更多项目可用时引发 StopAsyncIteration。它与 async for 循环一起使用。
基本异步迭代器实现
这是一个简单的异步迭代器,演示了 __anext__ 的用法。它展示了异步迭代所需的最小实现。
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
self.current += 1
return self.current - 1
async def main():
async for num in AsyncCounter(3):
print(num)
import asyncio
asyncio.run(main())
此示例创建一个异步计数器,该计数器生成从 0 到 2 的数字。__anext__ 方法递增计数器并返回值。
当计数器达到停止值时,它会引发 StopAsyncIteration。这会将迭代结束的信号发送到 async for 循环。
带延迟的异步迭代器
此示例展示了 __anext__ 如何包含可等待的操作,例如睡眠,使其可用于 I/O 绑定任务。
class DelayedIterator:
def __init__(self, items, delay):
self.items = iter(items)
self.delay = delay
def __aiter__(self):
return self
async def __anext__(self):
try:
item = next(self.items)
await asyncio.sleep(self.delay)
return item
except StopIteration:
raise StopAsyncIteration
async def main():
async for char in DelayedIterator("ABC", 0.5):
print(char)
asyncio.run(main())
此迭代器在项目之间引入延迟。它包装了一个常规迭代器并添加了异步行为。await asyncio.sleep() 演示了异步操作。
从 StopIteration 到 StopAsyncIteration 的转换很重要。它在异步上下文中正确地指示了迭代的结束。
异步数据获取器
这个实际示例展示了 __anext__ 异步地从多个 URL 获取数据,演示了实际用法。
import aiohttp
class AsyncDataFetcher:
def __init__(self, urls):
self.urls = iter(urls)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
def __aiter__(self):
return self
async def __anext__(self):
try:
url = next(self.urls)
except StopIteration:
raise StopAsyncIteration
async with self.session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://example.com',
'https://pythonlang.cn',
'https://pypi.ac.cn'
]
async with AsyncDataFetcher(urls) as fetcher:
async for content in fetcher:
print(f"Fetched {len(content)} bytes")
asyncio.run(main())
这个异步迭代器逐个获取网页。它使用 aiohttp 进行异步 HTTP 请求。__anext__ 方法处理每个获取。
上下文管理器方法(__aenter__、__aexit__)管理 HTTP 会话生命周期。这确保了正确的资源清理。
__anext__ 中的错误处理
此示例演示了 __anext__ 中的正确错误处理,展示了如何在异步迭代期间管理异常。
class SafeAsyncIterator:
def __init__(self, data):
self.data = iter(data)
self.retries = 3
def __aiter__(self):
return self
async def __anext__(self):
for attempt in range(self.retries):
try:
return next(self.data)
except StopIteration:
raise StopAsyncIteration
except Exception as e:
if attempt == self.retries - 1:
raise
print(f"Error, retrying... ({e})")
await asyncio.sleep(1)
async def main():
data = [1, 2, "error", 3, 4]
try:
async for item in SafeAsyncIterator(data):
print(f"Got: {item}")
except Exception as e:
print(f"Failed after retries: {e}")
asyncio.run(main())
此迭代器实现了异常的重试逻辑。在传播错误之前,它会多次尝试每个项目。重试之间的延迟是异步的。
该示例展示了如何处理正常的迭代完成(StopAsyncIteration)和错误情况。这使得迭代器更加健壮。
异步生成器替代方案
此示例将 __anext__ 实现与异步生成器进行比较,展示了一种更简洁的替代语法。
async def async_counter(stop):
for i in range(stop):
await asyncio.sleep(0.1)
yield i
async def main():
# Using async generator
async for num in async_counter(3):
print(f"Generator: {num}")
# Equivalent __anext__ implementation
class Counter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.current - 1
async for num in Counter(3):
print(f"Class: {num}")
asyncio.run(main())
异步生成器为简单情况提供更清晰的语法。但是,带有 __anext__ 的类为复杂场景提供了更多控制。
两种实现的功能相同。选择取决于对状态管理、可重用性和复杂性的需求。
最佳实践
- 始终引发 StopAsyncIteration: 正确指示迭代结束
- 优雅地处理错误: 如果迭代失败,请清理资源
- 记录迭代行为: 记录迭代器是有限的/无限的
- 考虑异步生成器: 对于基本情况通常更简单
- 管理资源: 使用异步上下文管理器进行清理
资料来源
作者
列出所有 Python 教程。