Python asyncio
最后修改于 2024 年 1 月 29 日
在本文中,我们展示了如何在Python中使用asyncio模块进行异步编程。
通过异步编程,我们可以实现任务与主程序执行的并发执行。
asyncio模块
asyncio是一个用于在Python中编写异步程序的库。
该模块提供了高级API来
- 并发运行Python协程
- 执行网络I/O和IPC
- 控制子进程
- 通过队列分发任务
- 同步并发代码
并发编程用于两种类型的任务:I/O密集型任务和CPU密集型任务。从网络请求数据、访问数据库或读写文件都是I/O密集型任务。CPU密集型任务是计算量大的任务,例如数学计算或图形处理。
要在Python中进行异步编程,我们使用事件循环、协程和Future。事件循环是负责管理异步任务并分发它们执行的主要任务。协程是调度事件执行的函数。Future是协程执行的结果。结果可能以异常结束。
协程是Python中用于协作式多任务处理的函数,它们可以被暂停和恢复。async关键字用于创建Python协程。await关键字暂停协程的执行,直到它完成并返回结果数据。
Python asyncio简单示例
接下来是一个使用asyncio的简单示例。
#!/usr/bin/python
import asyncio
async def mul(x, y):
return x * y
loop = asyncio.get_event_loop()
try:
res2 = loop.run_until_complete(mul(5, 5))
print(res2)
finally:
loop.close()
该程序创建并运行一个异步函数,该函数将两个数字相乘。
async def mul(x, y):
return x * y
协程是使用async修饰符声明的函数。
loop = asyncio.get_event_loop()
get_event_loop返回一个asyncio事件循环。执行异步代码需要事件循环。
res = loop.run_until_complete(mul(5, 5))
run_until_complete函数运行事件循环直到Future完成。它返回Future的结果,或引发其异常。Future表示异步操作的最终结果。
Python asyncio create_task
create_task函数将给定的协程包装成一个任务并安排其执行。它返回任务对象。任务在get_running_loop返回的循环中执行。
协程被包装成任务以获得额外的功能,例如任务取消或检查就绪状态。
#!/usr/bin/python
import asyncio
async def mul(x, y):
return x * y
loop = asyncio.get_event_loop()
try:
task = loop.create_task(mul(10, 10))
res = loop.run_until_complete(task)
print(res)
finally:
loop.close()
该示例创建一个任务并安排其执行。它打印最终结果。
Python asyncio.sleep
asyncio.sleep函数将当前协程暂停指定的秒数。该函数常用于模拟长时间运行的任务。
#!/usr/bin/python
import asyncio
import time
async def task(tid, n):
await asyncio.sleep(n)
print(f'task {tid} finished')
loop = asyncio.get_event_loop()
tm1 = time.perf_counter()
try:
t1 = loop.create_task(task(1, 3))
loop.run_until_complete(t1)
t2 = loop.create_task(task(2, 2))
loop.run_until_complete(t2)
t3 = loop.create_task(task(3, 1))
loop.run_until_complete(t3)
finally:
loop.close()
tm2 = time.perf_counter()
print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
在此示例中,我们创建并调度了三个任务。这些任务使用asyncio.sleep模拟了一些工作。每个任务都与主程序异步运行,但任务本身在主程序中顺序运行。我们使用time.perf_counter测量经过的时间。
$ ./sleep.py task 1 finished task 2 finished task 3 finished Total time elapsed: 6.01 seconds
Python asyncio gather
asyncio.gather函数用于并发调度多个协程。
#!/usr/bin/python
import asyncio
import time
async def task(tid, n):
await asyncio.sleep(n)
print(f'task {tid} finished')
loop = asyncio.get_event_loop()
tm1 = time.perf_counter()
try:
tasks = [
loop.create_task(task(1, 3)),
loop.create_task(task(2, 2)),
loop.create_task(task(3, 1))
]
loop.run_until_complete(asyncio.gather(*tasks))
finally:
loop.close()
tm2 = time.perf_counter()
print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
在此示例中,我们使用asyncio.gather并发运行三个任务。
$ ./asyncio_gather.py task 3 finished task 2 finished task 1 finished Total time elapsed: 3.00 seconds
Python asyncio.run
asyncio.run是一个方便的函数,它简化了我们的代码。该函数创建一个事件循环,调度协程,最后关闭循环。
#!/usr/bin/python
import asyncio
import time
async def task(tid, n):
await asyncio.sleep(n)
print(f'task {tid} finished')
async def main():
t1 = asyncio.create_task(task(1, 3))
t2 = asyncio.create_task(task(2, 2))
t3 = asyncio.create_task(task(3, 1))
await asyncio.gather(t1, t2, t3)
tm1 = time.perf_counter()
asyncio.run(main())
tm2 = time.perf_counter()
print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
使用asyncio.run,代码更紧凑。
Python asyncio 回显服务器
在下面的示例中,我们创建一个回显服务器。回显服务器将客户端的消息发送回。
#!/usr/bin/python
import asyncio
class EchoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print(f'received: {data}')
self.transport.write(data)
async def main(host, port):
print(f'starting server on port {port}')
loop = asyncio.get_running_loop()
server = await loop.create_server(EchoProtocol, host, port)
await server.serve_forever()
try:
asyncio.run(main('127.0.0.1', 8001))
except KeyboardInterrupt:
print('terminated')
create_server函数返回一个协程,该协程创建一个绑定到主机和端口的TCP服务器。serve_forever函数开始接受连接,直到协程被取消。服务器在协程被取消时关闭。
$ nc 127.0.0.1 8001 Hello! Hello! Hi! Hi!
我们使用nc命令测试服务器。
多个HTTP请求
要进行多个异步HTTP请求,我们使用httpx模块。该模块提供了一个包含同步和异步API的HTTP客户端,并同时支持HTTP/1.1和HTTP/2。
#!/usr/bin/python
import httpx
import asyncio
async def get_async(url):
async with httpx.AsyncClient() as client:
return await client.get(url)
urls = ['http://webcode.me', 'https://httpbin.org/get',
'https://google.com', 'https://stackoverflow.com',
'https://github.com']
async def launch():
resps = await asyncio.gather(*map(get_async, urls))
data = [resp.status_code for resp in resps]
for status_code in data:
print(status_code)
asyncio.run(launch())
该示例对五个网站进行异步HTTP请求。它打印所有提供URL的状态码。
$ ./async_req.py 200 200 200 200 200
Python Playwright 异步示例
许多库都支持异步编程。MS Playwright库允许以同步和异步模式自动化浏览器。
#!/usr/bin/python
import asyncio
from playwright.async_api import async_playwright
async def main():
async with async_playwright() as playwright:
webkit = playwright.webkit
browser = await webkit.launch()
page = await browser.new_page()
url = 'http://webcode.me'
await page.goto(url)
title = await page.title()
print(title)
await browser.close()
asyncio.run(main())
在此示例中,我们使用Playwright获取网页标题。我们使用了async API。
$ ./async_title.py My html page
来源
在本文中,我们使用了Python的asyncio模块。
作者
列出所有 Python 教程。