Python ThreadPoolExecutor
Last modified April 19, 2025
This tutorial explores concurrent programming in Python with ThreadPoolExecutor, a powerful tool for managing threads efficiently.
Concurrent programming aims to make code more efficient by executing tasks simultaneously. This can be achieved through multithreading, parallelism, or asynchronous operations. Here, we focus on multithreading with ThreadPoolExecutor.
A thread is an independent flow of execution within a program. Threads excel at I/O-bound tasks, such as file downloads or database queries. Python's Global Interpreter Lock (GIL) limits true parallelism, making threads better suited to I/O-bound tasks than to CPU-bound tasks, which are better served by multiprocessing.
The Global Interpreter Lock (GIL) is a mechanism in Python that ensures only one thread executes Python bytecode at a time, even on multi-core systems. This prevents concurrency problems but limits parallel execution.
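As a small illustration (a sketch, not a benchmark), the following splits a pure-Python counting loop across two threads. Both threads produce correct results, but because of the GIL they take turns executing bytecode rather than running in parallel:

```python
import threading

def count_up(n, results, index):
    total = 0
    for _ in range(n):
        total += 1          # pure Python bytecode, serialized by the GIL
    results[index] = total

results = [0, 0]
threads = [
    threading.Thread(target=count_up, args=(1_000_000, results, i))
    for i in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # [1000000, 1000000] -- correct, but not computed in parallel
```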
The threading module provides the foundational interface for creating and managing threads in Python, enabling concurrent task execution.
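A minimal sketch of the low-level pattern that ThreadPoolExecutor automates: creating a thread explicitly, starting it, and waiting for it to finish.

```python
import threading

results = []

def greet(name):
    # Record which thread actually ran the task
    results.append((name, threading.current_thread().name))

# Create, start, and join a thread by hand
t = threading.Thread(target=greet, args=("worker",), name="worker-1")
t.start()
t.join()

print(results)
```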
ThreadPoolExecutor
The concurrent.futures module provides a high-level interface for executing tasks concurrently. ThreadPoolExecutor, part of this module, simplifies thread creation, execution, and cleanup by maintaining a pool of reusable worker threads.
Creating a new thread incurs significant overhead. ThreadPoolExecutor's worker threads are reused after tasks complete, improving performance and efficiency.
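The reuse is easy to observe: with max_workers=2, ten tasks are served by at most two distinct worker threads rather than ten freshly created ones. A minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def which_thread(_):
    # Return the identifier of the worker that ran this task
    return threading.get_ident()

with ThreadPoolExecutor(max_workers=2) as executor:
    idents = list(executor.map(which_thread, range(10)))

# Ten tasks, but no more than two distinct worker identifiers
print(len(idents), len(set(idents)))
```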
Future
A Future object represents the eventual result of an asynchronous operation, which may be a value or an exception. Its result method retrieves the operation's outcome once it completes.
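Both outcomes can be seen in a small sketch: one task returns a value, the other raises, and the Future exposes each through result and exception:

```python
from concurrent.futures import ThreadPoolExecutor

def ok():
    return 42

def boom():
    raise ValueError("something went wrong")

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(ok)
    f2 = executor.submit(boom)

    print(f1.result())     # 42 -- blocks until the value is ready
    print(f2.exception())  # the ValueError, without re-raising it
    # calling f2.result() here would re-raise the ValueError
```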
Python ThreadPoolExecutor submit
The submit method schedules a callable for execution and returns a Future object that tracks the task's progress and result.
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):
    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:
    executor.submit(task, 1, 4)
    executor.submit(task, 2, 3)
    executor.submit(task, 3, 2)
This example demonstrates submitting three tasks for concurrent execution with ThreadPoolExecutor's submit method.
def task(id, n):
    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")
The task function prints the thread's ID, its unique identifier, and status messages. It uses sleep to simulate a time-consuming operation, such as I/O-bound work.
with ThreadPoolExecutor() as executor:
We create the ThreadPoolExecutor instance as a context manager, ensuring it shuts down automatically after the tasks complete.
executor.submit(task, 1, 4)
executor.submit(task, 2, 3)
executor.submit(task, 3, 2)
Three tasks are submitted with submit, each with a unique ID and sleep duration, enabling concurrent execution.
$ ./submitfun.py
thread 1 started
thread 1 : 140563097032256
thread 2 started
thread 2 : 140563088639552
thread 3 started
thread 3 : 140563005306432
thread 3 completed
thread 2 completed
thread 1 completed
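Using the executor as a context manager is roughly equivalent to calling shutdown by hand; a sketch of the unmanaged form:

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

executor = ThreadPoolExecutor()
try:
    future = executor.submit(square, 6)
    print(future.result())  # 36
finally:
    # What the with-block does on exit: wait for pending
    # tasks, then release the worker threads
    executor.shutdown(wait=True)
```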
Python ThreadPoolExecutor map
The map method applies a function to each item of one or more iterables, executing the tasks concurrently and returning the results in order.
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):
    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:
    executor.map(task, [1, 2, 3], [4, 3, 2])
This example reimplements the previous program with map, passing lists of thread IDs and sleep durations for concurrent execution.
Python ThreadPoolExecutor Future.result
A Future object encapsulates the result of a concurrent task. The result method retrieves the task's return value, blocking until the task completes.
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor

def task(tid):
    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)
    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    t1 = executor.submit(task, 1)
    t2 = executor.submit(task, 2)
    t3 = executor.submit(task, 3)

    print(t1.result())
    print(t2.result())
    print(t3.result())

finish = perf_counter()
print(f"It took {finish-start} second(s) to finish.")
This example runs tasks with random sleep durations, retrieves their results, and measures the total execution time with perf_counter.
return f'finished task {tid}, slept {r}'
The task returns a string describing its completion, accessible through the Future object's result method.
start = perf_counter()
The perf_counter function provides high-precision timing for measuring the total duration of the concurrent tasks.
t1 = executor.submit(task, 1)
t2 = executor.submit(task, 2)
t3 = executor.submit(task, 3)

print(t1.result())
print(t2.result())
print(t3.result())
Three tasks are submitted and their results retrieved with result, which blocks until each task completes, preserving submission order.
$ ./resultfun.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 1 secs
finished task 1, slept 3
finished task 2, slept 4
finished task 3, slept 1
It took 4.005295900977217 second(s) to finish.
Thanks to concurrent execution, the program's runtime equals that of the longest task. The blocking nature of the result method ensures that results appear in submission order. The next example addresses this limitation.
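As an aside, result also accepts a timeout argument, raising TimeoutError if the task has not finished within the given time; a minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep

def slow():
    sleep(2)
    return 'done'

timed_out = False

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow)
    try:
        # Give up waiting after 0.5 seconds instead of blocking for 2
        future.result(timeout=0.5)
    except TimeoutError:
        timed_out = True
        print('still running, not waiting any longer')

    print(future.result())  # blocks until the task really finishes
```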
Python ThreadPoolExecutor as_completed
The as_completed function provides an iterator over Future objects that yields them as the tasks finish, regardless of submission order.
Note that map cannot be combined with as_completed, because map returns results in the order of its iterables.
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(tid):
    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)
    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    tids = [1, 2, 3]
    futures = []

    for tid in tids:
        futures.append(executor.submit(task, tid))

    for res in as_completed(futures):
        print(res.result())

finish = perf_counter()
print(f"It took {finish-start} second(s) to finish.")
This example retrieves task results in the order they complete, using as_completed to process the Futures dynamically.
$ ./as_completed.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 2 secs
finished task 3, slept 2
finished task 1, slept 3
finished task 2, slept 4
It took 4.00534593896009 second(s) to finish.
Multiple concurrent HTTP requests
This example performs multiple HTTP requests concurrently with ThreadPoolExecutor, using the requests library to fetch web page status codes.
#!/usr/bin/python

import requests
import concurrent.futures
import time

def get_status(url):
    resp = requests.get(url=url)
    return resp.status_code

urls = ['http://webcode.me', 'https://httpbin.org/get',
        'https://google.com', 'https://stackoverflow.com',
        'https://github.com', 'https://clojure.net.cn',
        'https://fsharp.org']

tm1 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []

    for url in urls:
        futures.append(executor.submit(get_status, url=url))

    for future in concurrent.futures.as_completed(futures):
        print(future.result())

tm2 = time.perf_counter()
print(f'elapsed {tm2-tm1:0.2f} seconds')
The program checks the HTTP status codes of several websites concurrently, demonstrating ThreadPoolExecutor's efficiency for I/O-bound tasks.
$ ./web_requests.py
200
200
200
200
200
200
200
elapsed 0.81 seconds
Concurrent pings
This example pings multiple websites concurrently with ThreadPoolExecutor, running the external ping command through the subprocess module.
#!/usr/bin/python

from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess

def task(url):
    # getstatusoutput expects a single command string
    ok, _ = subprocess.getstatusoutput(f'ping -c 3 -w 10 {url}')
    return ok == 0, url

urls = ['webcode.me', 'clojure.org', 'fsharp.org', 'www.perl.org',
        'python.org', 'go.dev', 'raku.org']

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = []

    for url in urls:
        futures.append(executor.submit(task, url))

    for future in as_completed(futures):
        r, u = future.result()
        if r:
            print(f'OK -> {u}')
        else:
            print(f'failed -> {u}')

finish = perf_counter()
print(f"elapsed {finish-start} second(s)")
The program pings several websites concurrently, reporting their availability based on the exit status of the ping command.
ok, _ = subprocess.getstatusoutput(f'ping -c 3 -w 10 {url}')
The getstatusoutput function takes a single command string and captures the exit code and output of the ping command, which sends three ICMP packets to the given host with a 10-second deadline.
$ ./pinging.py
OK -> go.dev
OK -> fsharp.org
OK -> www.perl.org
OK -> python.org
OK -> raku.org
OK -> clojure.org
OK -> webcode.me
elapsed 2.384801392967347 second(s)
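A quick sanity check of getstatusoutput's return shape, a (status, output) tuple, assuming a POSIX shell where echo is available:

```python
import subprocess

# getstatusoutput runs one shell command string and returns
# the exit status together with the captured output
status, output = subprocess.getstatusoutput('echo hello')
print(status, output)  # 0 hello
```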
Concurrent file reading
This example reads multiple text files concurrently with ThreadPoolExecutor, demonstrating its use for I/O-bound file operations.
#!/usr/bin/python

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def read_file(filename):
    try:
        with open(filename, 'r') as f:
            content = f.read()
        return f"Read {filename}: {len(content)} chars"
    except Exception as e:
        return f"Error reading {filename}: {e}"

files = ['file1.txt', 'file2.txt', 'file3.txt']

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(read_file, f) for f in files]

    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")
The program reads three text files concurrently, reporting the number of characters read from each file or any error encountered.
Concurrent database queries
This example demonstrates running multiple SQLite database queries concurrently with ThreadPoolExecutor, well suited to I/O-bound database tasks.
#!/usr/bin/python

import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def query_db(query):
    with sqlite3.connect('example.db') as conn:
        cursor = conn.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
        return f"Query '{query}' returned {len(result)} rows"

queries = [
    "SELECT * FROM users WHERE age > 20",
    "SELECT * FROM orders WHERE total > 100",
    "SELECT * FROM products"
]

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(query_db, q) for q in queries]

    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")
The program executes three SQLite queries concurrently, reporting the number of rows each returns and demonstrating efficient database access.
Concurrent image downloads
This example downloads multiple images from URLs concurrently with ThreadPoolExecutor, using the requests library for the HTTP requests.
#!/usr/bin/python

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def download_image(url):
    try:
        resp = requests.get(url)
        filename = url.split('/')[-1]
        with open(filename, 'wb') as f:
            f.write(resp.content)
        return f"Downloaded {filename}"
    except Exception as e:
        return f"Error downloading {url}: {e}"

urls = [
    'https://example.com/image1.jpg',
    'https://example.com/image2.png',
    'https://example.com/image3.jpg'
]

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(download_image, u) for u in urls]

    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")
The program downloads three images concurrently, saving them locally and reporting success or errors, well suited to network-bound tasks.
Concurrent text processing
This example processes multiple text strings concurrently with ThreadPoolExecutor, counting the words in each string, suitable for lightweight text tasks.
#!/usr/bin/python

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

def count_words(text):
    words = text.split()
    return f"Text '{text[:20]}...' has {len(words)} words"

texts = [
    "Python is a versatile programming language",
    "Concurrency improves performance in I/O tasks",
    "ThreadPoolExecutor simplifies thread management"
]

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(count_words, t) for t in texts]

    for future in as_completed(futures):
        print(future.result())

finish = perf_counter()
print(f"Elapsed {finish-start:.2f} seconds")
The program counts the words in three text strings concurrently, reporting each string's word count and demonstrating simple concurrent text processing.
Sources
Python ThreadPoolExecutor - language reference
This tutorial covered concurrent programming with ThreadPoolExecutor, demonstrating its effectiveness for I/O-bound tasks in Python.