ZetCode

Python anext 函数

上次修改时间:2025 年 4 月 11 日

本综合指南探讨 Python 的 anext 函数,该函数用于从异步迭代器中检索下一个项目。我们将涵盖异步迭代器、错误处理和异步迭代的实践示例。

基本定义

anext 函数是 next 的异步等效项。 它从异步迭代器中检索下一个项目,如果提供默认值并且迭代器已耗尽,则返回默认值。

主要特征:与异步迭代器一起使用,必须等待,当耗尽时引发 StopAsyncIteration(除非提供了默认值),并在 Python 3.10 中引入。

基本异步迭代器用法

以下是一个使用异步迭代器的简单示例,展示了 anext 如何从异步源检索值。

basic_anext.py
import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current

async def main():
    counter = AsyncCounter(3)
    print(await anext(counter))  # 1
    print(await anext(counter))  # 2
    print(await anext(counter))  # 3

asyncio.run(main())

此示例显示了带有自定义异步迭代器的 anext。 每次调用 anext 都必须等待,并返回迭代器中的下一个值。

AsyncCounter 类使用 __aiter____anext__ 方法实现异步迭代器协议。 请注意用于发出完成信号的 StopAsyncIteration 异常。

使用默认值

anext 可以接受一个默认值,用于在迭代器耗尽时返回,而不是引发 StopAsyncIteration

default_value.py
import asyncio

async def async_gen():
    yield "first"
    yield "second"

async def main():
    ag = async_gen()
    print(await anext(ag))            # 'first'
    print(await anext(ag))            # 'second'
    print(await anext(ag, "default")) # 'default'

asyncio.run(main())

此示例演示了默认值的行为。 在产生两个值后,异步生成器将被耗尽。 第三个 anext 调用返回默认值,而不是引发异常。

当您想要在没有 try/except 块的情况下处理迭代结束时,此模式非常有用。 默认值可以是任何值,包括 None

错误处理

此示例显示了在使用带有异步迭代器的 anext 时如何正确处理错误。

error_handling.py
import asyncio

async def failing_async_gen():
    yield 1
    raise ValueError("Something went wrong")

async def main():
    ag = failing_async_gen()
    try:
        print(await anext(ag))  # 1
        print(await anext(ag))  # Raises ValueError
    except ValueError as e:
        print(f"Caught error: {e}")

    # Exhausted iterator case
    ag = failing_async_gen()
    try:
        await anext(ag)
        await anext(ag)
        await anext(ag)  # Would raise StopAsyncIteration
    except StopAsyncIteration:
        print("Iterator exhausted")

asyncio.run(main())

这些示例演示了使用 anext 进行错误处理。 第一个例子展示了如何处理来自迭代器的自定义异常。 第二个例子展示了如何捕获 StopAsyncIteration

使用异步迭代器时,正确的错误处理至关重要,因为它们可能会引发与迭代相关的异常和特定于域的异常。

使用异步生成器

anext 与异步生成器无缝协作,异步生成器是创建异步迭代器的最常见方式。

async_generators.py
import asyncio

async def async_data_fetcher():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"data-{i}"

async def main():
    fetcher = async_data_fetcher()
    while True:
        try:
            data = await anext(fetcher)
            print(f"Received: {data}")
        except StopAsyncIteration:
            print("No more data")
            break

asyncio.run(main())

此示例显示了带有异步生成器函数的 anext。 生成器异步生成值,anext 逐个检索它们。

当您不知道异步迭代器中项目的确切数量时,while-try-except 模式在消费异步迭代器时很常见。

真实世界的 API 分页

此示例演示了一个实际用例:使用 anext 遍历异步 API。

api_pagination.py
import asyncio
from typing import AsyncIterator, Dict, Any

class AsyncAPIClient:
    def __init__(self):
        self.page = 0
    
    async def fetch_page(self) -> Dict[str, Any]:
        self.page += 1
        if self.page > 3:
            return {"items": [], "has_more": False}
        
        await asyncio.sleep(0.2)
        return {
            "items": [f"item-{self.page}-{i}" for i in range(2)],
            "has_more": self.page < 3
        }

    def __aiter__(self) -> AsyncIterator[str]:
        return self
    
    async def __anext__(self) -> str:
        if not hasattr(self, "_current_page"):
            self._current_page = await self.fetch_page()
            self._items_iter = iter(self._current_page["items"])
        
        try:
            return next(self._items_iter)
        except StopIteration:
            if not self._current_page["has_more"]:
                raise StopAsyncIteration
            
            self._current_page = await self.fetch_page()
            self._items_iter = iter(self._current_page["items"])
            return await self.__anext__()

async def main():
    client = AsyncAPIClient()
    try:
        while True:
            item = await anext(client)
            print(f"Processing: {item}")
    except StopAsyncIteration:
        print("All items processed")

asyncio.run(main())

此示例实现了通过异步 API 进行分页。 AsyncAPIClient 获取数据页面并生成单个项目。 anext 处理页面项目和页面转换的迭代。

该实现展示了如何在提供简单接口供使用者通过 anext 使用的同时,管理复杂的异步迭代状态。

最佳实践

资料来源

作者

我的名字是 Jan Bodnar,我是一位充满激情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直撰写编程文章。 迄今为止,我已经撰写了超过 1,400 篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程