ZetCode

Python __anext__ 方法

最后修改于 2025 年 4 月 8 日

本综合指南探讨了 Python 的 __anext__ 方法,该特殊方法启用了异步迭代。我们将涵盖基本用法、自定义异步迭代器、错误处理和实际示例。

基本定义

__anext__ 方法是一个协程,它从异步迭代器返回下一个项目。它是 __next__ 的异步等价物。

关键特性:它必须定义为异步函数,返回一个可等待对象,并且在没有更多项目可用时引发 StopAsyncIteration。它与 async for 循环一起使用。

基本异步迭代器实现

这是一个简单的异步迭代器,演示了 __anext__ 的用法。它展示了异步迭代所需的最小实现。

basic_anext.py
class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        self.current += 1
        return self.current - 1

async def main():
    async for num in AsyncCounter(3):
        print(num)

import asyncio
asyncio.run(main())

此示例创建一个异步计数器,该计数器生成从 0 到 2 的数字。__anext__ 方法递增计数器并返回值。

当计数器达到停止值时,它会引发 StopAsyncIteration。这会将迭代结束的信号发送到 async for 循环。

带延迟的异步迭代器

此示例展示了 __anext__ 如何包含可等待的操作,例如睡眠,使其可用于 I/O 绑定任务。

delayed_anext.py
class DelayedIterator:
    def __init__(self, items, delay):
        self.items = iter(items)
        self.delay = delay

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = next(self.items)
            await asyncio.sleep(self.delay)
            return item
        except StopIteration:
            raise StopAsyncIteration

async def main():
    async for char in DelayedIterator("ABC", 0.5):
        print(char)

asyncio.run(main())

此迭代器在项目之间引入延迟。它包装了一个常规迭代器并添加了异步行为。await asyncio.sleep() 演示了异步操作。

StopIterationStopAsyncIteration 的转换很重要。它在异步上下文中正确地指示了迭代的结束。

异步数据获取器

这个实际示例展示了 __anext__ 异步地从多个 URL 获取数据,演示了实际用法。

async_fetcher.py
import aiohttp

class AsyncDataFetcher:
    def __init__(self, urls):
        self.urls = iter(urls)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            url = next(self.urls)
        except StopIteration:
            raise StopAsyncIteration
        
        async with self.session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        'https://example.com',
        'https://pythonlang.cn',
        'https://pypi.ac.cn'
    ]
    
    async with AsyncDataFetcher(urls) as fetcher:
        async for content in fetcher:
            print(f"Fetched {len(content)} bytes")

asyncio.run(main())

这个异步迭代器逐个获取网页。它使用 aiohttp 进行异步 HTTP 请求。__anext__ 方法处理每个获取。

上下文管理器方法(__aenter____aexit__)管理 HTTP 会话生命周期。这确保了正确的资源清理。

__anext__ 中的错误处理

此示例演示了 __anext__ 中的正确错误处理,展示了如何在异步迭代期间管理异常。

error_handling.py
class SafeAsyncIterator:
    def __init__(self, data):
        self.data = iter(data)
        self.retries = 3

    def __aiter__(self):
        return self

    async def __anext__(self):
        for attempt in range(self.retries):
            try:
                return next(self.data)
            except StopIteration:
                raise StopAsyncIteration
            except Exception as e:
                if attempt == self.retries - 1:
                    raise
                print(f"Error, retrying... ({e})")
                await asyncio.sleep(1)

async def main():
    data = [1, 2, "error", 3, 4]
    
    try:
        async for item in SafeAsyncIterator(data):
            print(f"Got: {item}")
    except Exception as e:
        print(f"Failed after retries: {e}")

asyncio.run(main())

此迭代器实现了异常的重试逻辑。在传播错误之前,它会多次尝试每个项目。重试之间的延迟是异步的。

该示例展示了如何处理正常的迭代完成(StopAsyncIteration)和错误情况。这使得迭代器更加健壮。

异步生成器替代方案

此示例将 __anext__ 实现与异步生成器进行比较,展示了一种更简洁的替代语法。

async_gen.py
async def async_counter(stop):
    for i in range(stop):
        await asyncio.sleep(0.1)
        yield i

async def main():
    # Using async generator
    async for num in async_counter(3):
        print(f"Generator: {num}")
    
    # Equivalent __anext__ implementation
    class Counter:
        def __init__(self, stop):
            self.current = 0
            self.stop = stop
        
        def __aiter__(self):
            return self
        
        async def __anext__(self):
            if self.current >= self.stop:
                raise StopAsyncIteration
            await asyncio.sleep(0.1)
            self.current += 1
            return self.current - 1
    
    async for num in Counter(3):
        print(f"Class: {num}")

asyncio.run(main())

异步生成器为简单情况提供更清晰的语法。但是,带有 __anext__ 的类为复杂场景提供了更多控制。

两种实现的功能相同。选择取决于对状态管理、可重用性和复杂性的需求。

最佳实践

资料来源

作者

我叫 Jan Bodnar,是一位充满热情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直撰写编程文章。 迄今为止,我撰写了超过 1,400 篇文章和 8 本电子书。 我在教授编程方面拥有超过十年的经验。

列出所有 Python 教程