ZetCode

Python ThreadPoolExecutor

Last modified on April 19, 2025

This tutorial explores concurrent programming in Python using ThreadPoolExecutor, a powerful tool for managing threads efficiently.

Concurrent programming aims to improve code efficiency by executing tasks simultaneously. This can be achieved through multithreading, parallelism, or asynchronous operations. Here, we focus on multithreading with ThreadPoolExecutor.

A thread is an independent flow of execution within a program. Threads excel at I/O-bound tasks, such as file downloads or database queries. Python's Global Interpreter Lock (GIL) limits true parallelism, making threads better suited to I/O-bound tasks than to CPU-bound tasks, which are better handled with multiprocessing.

The Global Interpreter Lock (GIL) is a mechanism in Python that ensures only one thread executes Python bytecode at a time, even on multi-core systems. This prevents concurrency issues but limits parallel execution.

The threading module provides the basic interface for creating and managing threads in Python, enabling concurrent task execution.
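As a minimal sketch of the threading module itself, independent of ThreadPoolExecutor, the following starts two threads directly and waits for both to finish:

```python
import threading

results = []

def greet(name):
    # Each call runs in its own thread; list.append is thread-safe in CPython
    results.append(f"hello, {name}")

threads = [threading.Thread(target=greet, args=(n,)) for n in ("anna", "ben")]

for t in threads:
    t.start()

for t in threads:
    t.join()  # block until the thread has finished

print(sorted(results))  # ['hello, anna', 'hello, ben']
```

ThreadPoolExecutor builds on this primitive, taking over the start/join bookkeeping shown here.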

ThreadPoolExecutor

The concurrent.futures module provides a high-level interface for executing tasks concurrently. ThreadPoolExecutor, part of this module, simplifies thread management by maintaining a pool of reusable worker threads, streamlining thread creation, execution, and cleanup.

Creating a new thread incurs significant overhead. ThreadPoolExecutor's worker threads are reused after a task completes, improving performance and efficiency.
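This reuse can be observed by submitting more tasks than there are workers; the pool size of 2 below is an arbitrary choice for illustration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def worker_id(_):
    # Report which OS thread ran this task
    return threading.get_ident()

with ThreadPoolExecutor(max_workers=2) as executor:
    # Six tasks share at most two worker threads, so identifiers repeat
    idents = set(executor.map(worker_id, range(6)))

print(len(idents) <= 2)  # True
```

Because the pool caps out at two workers, at most two distinct thread identifiers ever appear, no matter how many tasks are submitted.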

Future

A Future object represents the eventual result of an asynchronous operation, which may be a value or an exception. Its result method retrieves the outcome of the operation once it completes.
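A short sketch of both outcomes: result returns the value of a successful task, while an exception raised inside a task is stored on the Future, re-raised by result, and inspectable with the exception method:

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor() as executor:
    ok = executor.submit(divide, 10, 2)
    bad = executor.submit(divide, 10, 0)  # raises ZeroDivisionError in the worker

print(ok.result())                     # 5.0
print(type(bad.exception()).__name__)  # ZeroDivisionError
```

Calling bad.result() instead would re-raise the ZeroDivisionError in the calling thread.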

Python ThreadPoolExecutor submit

The submit method schedules a callable for execution and returns a Future object that tracks the task's progress and result.

submitfun.py
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")


with ThreadPoolExecutor() as executor:

    executor.submit(task, 1, 4)
    executor.submit(task, 2, 3)
    executor.submit(task, 3, 2)

This example demonstrates submitting three tasks for concurrent execution with ThreadPoolExecutor's submit method.

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

The task function prints the thread's ID, its unique identifier, and status messages. It uses sleep to simulate a time-consuming operation, such as I/O-bound work.

with ThreadPoolExecutor() as executor:

We create the ThreadPoolExecutor instance as a context manager, ensuring it shuts down automatically once the tasks complete.
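The context manager form is equivalent to calling shutdown explicitly; a sketch of the manual version:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
future = executor.submit(pow, 2, 10)

# Equivalent to leaving the with-block: wait for pending tasks,
# then free the worker threads
executor.shutdown(wait=True)

print(future.result())  # 1024
```

The with-block is preferred because shutdown is then called even if an exception occurs while submitting tasks.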

executor.submit(task, 1, 4)
executor.submit(task, 2, 3)
executor.submit(task, 3, 2)

Three tasks are submitted with submit, each with a unique ID and sleep duration, enabling concurrent execution.

$ ./submitfun.py
thread 1 started
thread 1 : 140563097032256
thread 2 started
thread 2 : 140563088639552
thread 3 started
thread 3 : 140563005306432
thread 3 completed
thread 2 completed
thread 1 completed

Python ThreadPoolExecutor map

The map method applies a function to each item of one or more iterables, executing tasks concurrently and returning results in order.

mapfun.py
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:

    executor.map(task, [1, 2, 3], [4, 3, 2])

This example reimplements the previous program using map, passing lists of thread IDs and sleep durations for concurrent execution.
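The task above returns nothing, so map's ordered results go unused. A variant whose function returns a value makes the ordering guarantee visible; the doubling function is just an illustration:

```python
from concurrent.futures import ThreadPoolExecutor

def double(n):
    return n * 2

with ThreadPoolExecutor() as executor:
    # map yields results in input order, regardless of completion order
    results = list(executor.map(double, [1, 2, 3, 4]))

print(results)  # [2, 4, 6, 8]
```

Even if a later item finished first, map would still yield its result in input position.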

Python ThreadPoolExecutor Future.result

A Future object encapsulates the result of a concurrent task. The result method retrieves the task's return value, blocking until the task completes.

resultfun.py
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor

def task(tid):

    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)

    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:

    t1 = executor.submit(task, 1)
    t2 = executor.submit(task, 2)
    t3 = executor.submit(task, 3)

    print(t1.result())
    print(t2.result())
    print(t3.result())

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

This example runs tasks with random sleep durations, retrieves their results, and measures the total execution time with perf_counter.

return f'finished task {tid}, slept {r}'

The task returns a string describing its completion, accessible through the Future object's result method.

start = perf_counter()

The perf_counter function provides high-precision timing to measure the total duration of the concurrent tasks.

t1 = executor.submit(task, 1)
t2 = executor.submit(task, 2)
t3 = executor.submit(task, 3)

print(t1.result())
print(t2.result())
print(t3.result())

Three tasks are submitted, and their results are retrieved with result, which blocks until each task completes, preserving submission order.

$ ./resultfun.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 1 secs
finished task 1, slept 3
finished task 2, slept 4
finished task 3, slept 1
It took 4.005295900977217 second(s) to finish.

Thanks to concurrent execution, the program's runtime equals that of the longest task. The blocking nature of the result method ensures the results appear in submission order. The next example addresses this limitation.

Python ThreadPoolExecutor as_completed

The as_completed function provides an iterator over Future objects that yields results as tasks finish, regardless of submission order.

Note that map cannot be used with as_completed, because map returns results in the order of its iterables.

as_completed.py
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(tid):

    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)

    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:

    tids = [1, 2, 3]
    futures = []

    for tid in tids:
        futures.append(executor.submit(task, tid))

    for res in as_completed(futures):
        print(res.result())

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

This example retrieves task results in the order the tasks finish, processing Futures dynamically with as_completed.

$ ./as_completed.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 2 secs
finished task 3, slept 2
finished task 1, slept 3
finished task 2, slept 4
It took 4.00534593896009 second(s) to finish.

Multiple concurrent HTTP requests

This example performs multiple HTTP requests concurrently with ThreadPoolExecutor, using the requests library to fetch web page status codes.

web_requests.py
#!/usr/bin/python

import requests
import concurrent.futures
import time

def get_status(url):

    resp = requests.get(url=url)
    return resp.status_code

urls = ['http://webcode.me', 'https://httpbin.org/get',
    'https://google.com', 'https://stackoverflow.com',
    'https://github.com', 'https://clojure.org',
    'https://fsharp.org']

tm1 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:

    futures = []

    for url in urls:
        futures.append(executor.submit(get_status, url=url))

    for future in concurrent.futures.as_completed(futures):
        print(future.result())

tm2 = time.perf_counter()
print(f'elapsed {tm2-tm1:0.2f} seconds')

The program concurrently checks the HTTP status codes of several websites, demonstrating ThreadPoolExecutor's efficiency for I/O-bound tasks.

$ ./web_requests.py 
200
200
200
200
200
200
200
elapsed 0.81 seconds

Concurrent pings

This example pings multiple websites concurrently with ThreadPoolExecutor, running the external ping command via the subprocess module.

pinging.py
#!/usr/bin/python

from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess


def task(url):

    ok, _ = subprocess.getstatusoutput(
        f'ping -c 3 -w 10 {url}')
    
    return ok == 0, url


urls = ['webcode.me', 'clojure.org', 'fsharp.org', 
    'www.perl.org', 'python.org', 'go.dev', 'raku.org']

start = perf_counter()

with ThreadPoolExecutor() as executor:

    futures = []

    for url in urls:
        futures.append(executor.submit(task, url))

    for future in as_completed(futures):

        r, u = future.result()
        
        if r:
            print(f'OK -> {u}')
        else:
            print(f'failed -> {u}')

finish = perf_counter()

print(f"elapsed {finish-start} second(s)")

The program pings multiple websites concurrently, reporting their availability based on the ping command's exit status.

ok, _ = subprocess.getstatusoutput(
    f'ping -c 3 -w 10 {url}')

The getstatusoutput function captures the exit code and output of the ping command, which sends three ICMP packets to the given URL with a 10-second deadline.
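getstatusoutput runs its command string through the shell and returns an (exitcode, output) tuple; a quick illustration with a portable command:

```python
import subprocess

# Runs the command in a shell; trailing whitespace is stripped from output
status, output = subprocess.getstatusoutput('echo hello')

print(status, output)  # 0 hello
```

A nonzero status indicates the command failed, which is exactly what the ping example checks with ok == 0.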

$ ./pinging.py 
OK -> go.dev
OK -> fsharp.org
OK -> www.perl.org
OK -> python.org
OK -> raku.org
OK -> clojure.org
OK -> webcode.me
elapsed 2.384801392967347 second(s)

Concurrent file reading

This example reads multiple text files concurrently with ThreadPoolExecutor, demonstrating its use for I/O-bound file operations.

file_reading.py
#!/usr/bin/python

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def read_file(filename):
    try:
        with open(filename, 'r') as f:
            content = f.read()
        return f"Read {filename}: {len(content)} chars"
    except Exception as e:
        return f"Error reading {filename}: {e}"

files = ['file1.txt', 'file2.txt', 'file3.txt']

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(read_file, f) for f in files]
    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")

The program reads three text files concurrently, reporting the number of characters read from each file or any error encountered.

Concurrent database queries

This example demonstrates executing multiple SQLite database queries concurrently with ThreadPoolExecutor, well suited to I/O-bound database tasks.

db_queries.py
#!/usr/bin/python

import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def query_db(query):
    with sqlite3.connect('example.db') as conn:
        cursor = conn.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
    return f"Query '{query}' returned {len(result)} rows"

queries = [
    "SELECT * FROM users WHERE age > 20",
    "SELECT * FROM orders WHERE total > 100",
    "SELECT * FROM products"
]

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(query_db, q) for q in queries]
    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")

The program executes three SQLite queries concurrently, reporting the number of rows each returns and demonstrating efficient database access.
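The example assumes an example.db file with users, orders, and products tables already exists; a minimal setup sketch under that assumed schema (the column names are hypothetical, chosen only to satisfy the queries above):

```python
import sqlite3

# Hypothetical schema matching the example queries
with sqlite3.connect('example.db') as conn:
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users(name TEXT, age INTEGER);
        CREATE TABLE IF NOT EXISTS orders(id INTEGER, total REAL);
        CREATE TABLE IF NOT EXISTS products(name TEXT);
    """)
    conn.execute("INSERT INTO users VALUES ('Anna', 34)")

# Verify the seeded data is visible to the example's first query
rows = sqlite3.connect('example.db').execute(
    "SELECT * FROM users WHERE age > 20").fetchall()
print(len(rows) >= 1)  # True
```

Each worker thread in the example opens its own connection, which is required because sqlite3 connections are not shared across threads by default.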

Concurrent image downloads

This example downloads multiple images concurrently from URLs with ThreadPoolExecutor, using the requests library for the HTTP requests.

image_downloads.py
#!/usr/bin/python

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def download_image(url):
    try:
        resp = requests.get(url)
        filename = url.split('/')[-1]
        with open(filename, 'wb') as f:
            f.write(resp.content)
        return f"Downloaded {filename}"
    except Exception as e:
        return f"Error downloading {url}: {e}"

urls = [
    'https://example.com/image1.jpg',
    'https://example.com/image2.png',
    'https://example.com/image3.jpg'
]

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(download_image, u) for u in urls]
    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")

The program downloads three images concurrently, saving them locally and reporting success or errors, well suited to network-bound tasks.

Concurrent text processing

This example processes multiple text strings concurrently with ThreadPoolExecutor, counting the words in each string, suitable for lightweight text tasks.

text_processing.py
#!/usr/bin/python

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def count_words(text):
    words = text.split()
    return f"Text '{text[:20]}...' has {len(words)} words"

texts = [
    "Python is a versatile programming language",
    "Concurrency improves performance in I/O tasks",
    "ThreadPoolExecutor simplifies thread management"
]

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(count_words, t) for t in texts]
    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")

The program counts the words in three text strings concurrently, reporting each string's word count and demonstrating simple concurrent text processing.

Source

Python ThreadPoolExecutor - language reference

This tutorial covered concurrent programming with ThreadPoolExecutor, demonstrating its effectiveness for I/O-bound tasks in Python.

Author

My name is Jan Bodnar, and I am an experienced programmer. Since 2007 I have written over 1400 programming articles and 8 e-books, and I have more than a decade of experience teaching programming.

List all Python tutorials.