Python 多进程
最后修改于 2024 年 1 月 29 日
Python 多进程教程是 Python 中基于进程的并行性的入门教程。
Python 多进程
multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。所使用的 API 与经典的 threading 模块相似。它同时提供本地和远程并发。
multiprocesing 模块通过使用子进程而不是线程来避免全局解释器锁 (GIL) 的限制。多进程代码的执行顺序与串行代码不同。不能保证第一个创建的进程将是第一个完成的。
Python GIL
全局解释器锁 (GIL) 是 Python 解释器中用于同步线程执行的一种机制,即使在多核处理器上运行时,一次也只有一个本机线程能够执行。
C 扩展,如 numpy,可以手动释放 GIL 以加快计算速度。此外,GIL 在潜在的阻塞 I/O 操作之前被释放。
请注意,Jython 和 IronPython 都没有 GIL。
并发与并行
并发是指两个或多个计算在相同的时间范围内发生。并行是指两个或多个计算在同一时刻发生。因此,并行是并发的一个特例。它需要多个 CPU 单元或核心。
Python 中的真正并行是通过创建多个进程来实现的,每个进程都有一个带有自己独立 GIL 的 Python 解释器。
Python 提供了三个用于并发的模块:multiprocessing、threading 和 asyncio。当任务是 CPU 密集型时,我们应该考虑 multiprocessing 模块。当任务是 I/O 密集型且需要大量连接时,建议使用 asyncio 模块。对于其他类型的任务以及库无法与 asyncio 协作时,可以考虑 threading 模块。
易于并行
易于并行这个术语用于描述一个可以轻松并行运行的问题或工作负载。重要的是要认识到,并非所有工作负载都可以分解为子任务并并行运行。例如,那些需要在子任务之间进行大量通信的任务。
完美并行计算的例子包括
- 蒙特卡洛分析
- 数值积分
- 计算机图形渲染
- 密码学中的暴力搜索
- 遗传算法
可以应用并行计算的另一种情况是,当我们运行多个不同的计算时,也就是说,我们不将一个问题分解为子任务。例如,我们可以并行计算 π 的不同算法。
进程与线程
进程和线程都是独立的执行序列。下表总结了进程和线程之间的区别
| 进程 | 线程 |
|---|---|
| 进程在独立的内存中运行(进程隔离) | 线程共享内存 |
| 占用更多内存 | 占用更少内存 |
| 子进程可能变成僵尸进程 | 不可能出现僵尸进程 |
| 开销更大 | 开销更小 |
| 创建和销毁更慢 | 创建和销毁更快 |
| 更容易编写和调试 | 可能更难编写和调试 |
进程
Process 对象表示在一个单独的进程中运行的活动。multiprocessing.Process 类具有 threading.Thread 所有方法的等效方法。Process 构造函数应始终使用关键字参数调用。
构造函数的 target 参数是要由 run 方法调用的可调用对象。name 是进程的名称。start 方法启动进程的活动。join 方法会阻塞,直到调用 join 方法的进程终止。如果提供了 timeout 选项,它最多会阻塞 timeout 秒。is_alive 方法返回一个布尔值,指示进程是否仍在运行。terminate 方法终止进程。
__main__ 守卫
Python 多进程样式指南建议将多进程代码放在 __name__ == '__main__' 惯用法中。这是由于 Windows 创建进程的方式。此守卫是为了防止进程生成无限循环。
简单进程示例
下面是一个使用 multiprocessing 的简单程序。
#!/usr/bin/python
from multiprocessing import Process
def fun(name):
print(f'hello {name}')
def main():
p = Process(target=fun, args=('Peter',))
p.start()
if __name__ == '__main__':
main()
我们创建一个新进程并向其传递一个值。
def fun(name):
print(f'hello {name}')
该函数打印传递的参数。
def main():
p = Process(target=fun, args=('Peter',))
p.start()
创建了一个新进程。target 选项提供了在新进程中运行的可调用对象。args 提供了要传递的数据。多进程代码放在主守卫中。进程使用 start 方法启动。
if __name__ == '__main__':
main()
代码放在 __name__ == '__main__' 惯用法中。
Python 多进程 join
join 方法会阻塞主进程的执行,直到调用 join 方法的进程终止。如果没有 join 方法,主进程不会等待子进程终止。
#!/usr/bin/python
from multiprocessing import Process
import time
def fun():
print('starting fun')
time.sleep(2)
print('finishing fun')
def main():
p = Process(target=fun)
p.start()
p.join()
if __name__ == '__main__':
print('starting main')
main()
print('finishing main')
该示例在新创建的进程上调用 join。
$ ./joining.py starting main starting fun finishing fun finishing main
在子进程完成后,会打印“finishing main”消息。
$ ./joining.py starting main finishing main starting fun finishing fun
当我们注释掉 join 方法时,主进程在子进程完成之前结束。
重要的是要在 start 方法之后调用 join 方法。
#!/usr/bin/python
from multiprocessing import Process
import time
def fun(val):
print(f'starting fun with {val} s')
time.sleep(val)
print(f'finishing fun with {val} s')
def main():
p1 = Process(target=fun, args=(3, ))
p1.start()
# p1.join()
p2 = Process(target=fun, args=(2, ))
p2.start()
# p2.join()
p3 = Process(target=fun, args=(1, ))
p3.start()
# p3.join()
p1.join()
p2.join()
p3.join()
print('finished main')
if __name__ == '__main__':
main()
如果我们错误地调用 join 方法,那么我们实际上是顺序运行进程。(错误的方式被注释掉了。)
Python 多进程 is_alive
is_alive 方法确定进程是否正在运行。
#!/usr/bin/python
from multiprocessing import Process
import time
def fun():
print('calling fun')
time.sleep(2)
def main():
print('main fun')
p = Process(target=fun)
p.start()
p.join()
print(f'Process p is alive: {p.is_alive()}')
if __name__ == '__main__':
main()
当我们使用 join 方法等待子进程完成时,当我们检查它时,进程已经死亡。如果注释掉 join,则进程仍然存活。
Python 多进程进程 ID
os.getpid 返回当前进程 ID,而 os.getppid 返回父进程 ID。
#!/usr/bin/python
from multiprocessing import Process
import os
def fun():
print('--------------------------')
print('calling fun')
print('parent process id:', os.getppid())
print('process id:', os.getpid())
def main():
print('main fun')
print('process id:', os.getpid())
p1 = Process(target=fun)
p1.start()
p1.join()
p2 = Process(target=fun)
p2.start()
p2.join()
if __name__ == '__main__':
main()
该示例运行两个子进程。它打印它们的 ID 和它们的父 ID。
$ ./parent_id.py main fun process id: 7605 -------------------------- calling fun parent process id: 7605 process id: 7606 -------------------------- calling fun parent process id: 7605 process id: 7607
父 ID 相同,每个子进程的进程 ID 不同。
命名进程
使用 Process 的 name 属性,我们可以为工作进程指定一个特定的名称。否则,模块会创建自己的名称。
#!/usr/bin/python
from multiprocessing import Process, current_process
import time
def worker():
name = current_process().name
print(name, 'Starting')
time.sleep(2)
print(name, 'Exiting')
def service():
name = current_process().name
print(name, 'Starting')
time.sleep(3)
print(name, 'Exiting')
if __name__ == '__main__':
service = Process(name='Service 1', target=service)
worker1 = Process(name='Worker 1', target=worker)
worker2 = Process(target=worker) # use default name
worker1.start()
worker2.start()
service.start()
在示例中,我们创建了三个进程;其中两个被赋予了自定义名称。
$ ./naming_workers.py Worker 1 Starting Process-3 Starting Service 1 Starting Worker 1 Exiting Process-3 Exiting Service 1 Exiting
子类化 Process
当我们子类化 Process 时,我们会覆盖 run 方法。
#!/usr/bin/python
import time
from multiprocessing import Process
class Worker(Process):
def run(self):
print(f'In {self.name}')
time.sleep(2)
def main():
worker = Worker()
worker.start()
worker2 = Worker()
worker2.start()
worker.join()
worker2.join()
if __name__ == '__main__':
main()
我们创建一个继承自 Process 的 Worker 类。在 run 方法中,我们编写工作进程的代码。
Python 多进程 Pool
工作进程的管理可以通过 Pool 对象来简化。它控制一个工作进程池,可以向其中提交作业。池的 map 方法将给定的可迭代对象分成若干块,然后将它们作为单独的任务提交给进程池。池的 map 是内置 map 方法的并行等效项。map 会阻塞主执行,直到所有计算完成。
Pool 可以将进程数作为参数。这是一个我们可以尝试的值。如果我们不提供任何值,则使用 os.cpu_count 返回的数字。
#!/usr/bin/python
import time
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count
def square(n):
time.sleep(2)
return n * n
def main():
start = timer()
print(f'starting computations on {cpu_count()} cores')
values = (2, 4, 6, 8)
with Pool() as pool:
res = pool.map(square, values)
print(res)
end = timer()
print(f'elapsed time: {end - start}')
if __name__ == '__main__':
main()
在示例中,我们创建了一个进程池,并将值应用于 square 函数。核心数量是通过 cpu_unit 函数确定的。
$ ./worker_pool.py starting computations on 4 cores [4, 16, 36, 64] elapsed time: 2.0256662130013865
在具有四个核心的计算机上,完成四个持续两秒的计算花费了略多于 2 秒。
$ ./worker_pool.py starting computations on 4 cores [4, 16, 36, 64, 100] elapsed time: 4.029600699999719
当我们添加要计算的附加值时,时间增加到超过四秒。
多个参数
要将多个参数传递给工作函数,我们可以使用 starmap 方法。可迭代对象的元素被期望是解包为参数的可迭代对象。
#!/usr/bin/python
import time
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count
def power(x, n):
time.sleep(1)
return x ** n
def main():
start = timer()
print(f'starting computations on {cpu_count()} cores')
values = ((2, 2), (4, 3), (5, 5))
with Pool() as pool:
res = pool.starmap(power, values)
print(res)
end = timer()
print(f'elapsed time: {end - start}')
if __name__ == '__main__':
main()
在此示例中,我们将两个值传递给 power 函数:值和指数。
$ ./multi_args.py starting computations on 4 cores [4, 64, 3125] elapsed time: 1.0230950259974634
多个函数
以下示例显示了如何在池中运行多个函数。
#!/usr/bin/python
from multiprocessing import Pool
import functools
def inc(x):
return x + 1
def dec(x):
return x - 1
def add(x, y):
return x + y
def smap(f):
return f()
def main():
f_inc = functools.partial(inc, 4)
f_dec = functools.partial(dec, 2)
f_add = functools.partial(add, 3, 4)
with Pool() as pool:
res = pool.map(smap, [f_inc, f_dec, f_add])
print(res)
if __name__ == '__main__':
main()
我们有三个函数,它们在池中独立运行。我们使用 functools.partial 在函数执行之前准备它们及其参数。
$ ./multiple_functions.py [5, 1, 7]
Python 多进程 π 计算
π 是任何圆的周长与其直径之比。π 是一个无理数,其小数形式既不结束也不重复。它约等于 3.14159。有几种计算 π 的公式。
计算 π 的近似值可能需要很长时间,因此我们可以利用并行计算。我们使用 Bailey–Borwein–Plouffe 公式来计算 π。
#!/usr/bin/python
from decimal import Decimal, getcontext
from timeit import default_timer as timer
def pi(precision):
getcontext().prec = precision
return sum(1/Decimal(16)**k *
(Decimal(4)/(8*k+1) -
Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) -
Decimal(1)/(8*k+6)) for k in range (precision))
start = timer()
values = (1000, 1500, 2000)
data = list(map(pi, values))
print(data)
end = timer()
print(f'sequentially: {end - start}')
首先,我们按顺序计算三个近似值。精度是计算出的 π 的位数。
$ ./calc_pi.py ... sequentially: 0.5738053179993585
在我们的机器上,计算三个近似值花费了 0.57381 秒。
在下面的示例中,我们使用进程池来计算这三个近似值。
#!/usr/bin/python
from decimal import Decimal, getcontext
from timeit import default_timer as timer
from multiprocessing import Pool, current_process
import time
def pi(precision):
getcontext().prec=precision
return sum(1/Decimal(16)**k *
(Decimal(4)/(8*k+1) -
Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) -
Decimal(1)/(8*k+6)) for k in range (precision))
def main():
start = timer()
with Pool(3) as pool:
values = (1000, 1500, 2000)
data = pool.map(pi, values)
print(data)
end = timer()
print(f'paralelly: {end - start}')
if __name__ == '__main__':
main()
我们在一个由三个进程组成的池中运行计算,并获得了一些小效率提升。
./calc_pi2.py ... paralelly: 0.38216479000038817
当我们在并行中运行计算时,花费了 0.38216479 秒。
进程中的独立内存
在多进程中,每个工作进程都有自己的内存。内存不像在线程中那样共享。
#!/usr/bin/python
from multiprocessing import Process, current_process
data = [1, 2]
def fun():
global data
data.extend((3, 4, 5))
print(f'Result in {current_process().name}: {data}')
def main():
worker = Process(target=fun)
worker.start()
worker.join()
print(f'Result in main: {data}')
if __name__ == '__main__':
main()
我们创建一个工作进程,并向其传递全局 data 列表。我们在工作进程中向列表中添加额外的值,但主进程中的原始列表未被修改。
$ ./own_memory_space.py Result in Process-1: [1, 2, 3, 4, 5] Result in main: [1, 2]
从输出中可以看出,这两个列表是独立的。
在进程之间共享状态
可以使用 Value 或 Array 将数据存储在共享内存中。
#!/usr/bin/python
from multiprocessing import Process, Value
from time import sleep
def f(counter):
sleep(1)
with counter.get_lock():
counter.value += 1
print(f'Counter: {counter.value}')
def main():
counter = Value('i', 0)
processes = [Process(target=f, args=(counter, )) for _ in range(30)]
for p in processes:
p.start()
for p in processes:
p.join()
if __name__ == '__main__':
main()
该示例创建了一个在进程之间共享的计数器对象。每个进程都会增加计数器。
with counter.get_lock():
counter.value += 1
每个进程必须为自己获取一个锁。
使用队列进行消息传递
消息传递是进程间通信的首选方式。消息传递避免了使用同步原语(如锁)的需求,而锁在使用和纠错方面都非常困难,尤其是在复杂情况下。
要传递消息,我们可以利用 pipe 来连接两个进程。队列允许多个生产者和消费者。
#!/usr/bin/python
from multiprocessing import Process, Queue
import random
def rand_val(queue):
num = random.random()
queue.put(num)
def main():
queue = Queue()
processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()
results = [queue.get() for _ in processes]
print(results)
if __name__ == "__main__":
main()
在示例中,我们创建了四个进程。每个进程生成一个随机值并将其放入队列。在所有进程完成之后,我们从队列中获取所有值。
processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)]
队列作为参数传递给进程。
results = [queue.get() for _ in processes]
get 方法会从队列中移除并返回项。
$ ./simple_queue.py [0.7829025790441544, 0.46465345633928223, 0.4804438310782676, 0.7146952404346074]
该示例生成了一个包含四个随机值列表。
在下面的示例中,我们将单词放入队列。创建的进程从队列中读取单词。
#!/usr/bin/python
from multiprocessing import Queue, Process, current_process
def worker(queue):
name = current_process().name
print(f'{name} data received: {queue.get()}')
def main():
queue = Queue()
queue.put("wood")
queue.put("sky")
queue.put("cloud")
queue.put("ocean")
processes = [Process(target=worker, args=(queue,)) for _ in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()
if __name__ == "__main__":
main()
创建了四个进程;每个进程从队列中读取一个单词并打印它。
$ ./simple_queue2.py Process-1 data received: wood Process-2 data received: sky Process-3 data received: cloud Process-4 data received: ocean
队列顺序
在多进程中,不保证进程以特定的顺序完成。
#!/usr/bin/python
from multiprocessing import Process, Queue
import time
import random
def square(idx, x, queue):
time.sleep(random.randint(1, 3))
queue.put((idx, x * x))
def main():
data = [2, 4, 6, 3, 5, 8, 9, 7]
queue = Queue()
processes = [Process(target=square, args=(idx, val, queue))
for idx, val in enumerate(data)]
for p in processes:
p.start()
for p in processes:
p.join()
unsorted_result = [queue.get() for _ in processes]
result = [val[1] for val in sorted(unsorted_result)]
print(result)
if __name__ == '__main__':
main()
我们有计算值平方的进程。输入数据是有序的,我们需要维护这个顺序。为了解决这个问题,我们为每个输入值保留一个额外的索引。
def square(idx, x, queue):
time.sleep(random.randint(1, 3))
queue.put((idx, x * x))
为了说明变化,我们使用 sleep 方法随机减慢计算速度。我们将索引连同计算出的平方一起放入队列。
unsorted_result = [queue.get() for _ in processes]
我们获取结果。此时,元组的顺序是随机的。
result = [val[1] for val in sorted(unsorted_result)]
我们按索引值对结果数据进行排序。
$ ./queue_order.py [4, 16, 36, 9, 25, 64, 81, 49]
我们得到与初始数据相对应的平方值。
使用蒙特卡洛方法计算 π
蒙特卡洛方法是一大类计算算法,它依赖于重复的随机抽样来获得数值结果。基本概念是利用随机性来解决原则上可以确定性解决的问题。
以下公式用于计算 π 的近似值
M 是在正方形中生成的点数,N 是总点数。
虽然这种 π 计算方法很有趣,也非常适合学校示例,但它并不十分准确。有更好的算法可以获得 π。
#!/usr/bin/python
from random import random
from math import sqrt
from timeit import default_timer as timer
def pi(n):
count = 0
for i in range(n):
x, y = random(), random()
r = sqrt(pow(x, 2) + pow(y, 2))
if r < 1:
count += 1
return 4 * count / n
start = timer()
pi_est = pi(100_000_000)
end = timer()
print(f'elapsed time: {end - start}')
print(f'π estimate: {pi_est}')
在示例中,我们使用一百万个生成的随机点来计算 π 值的近似值。
$ ./monte_carlo_pi.py elapsed time: 44.7768127549989 π estimate: 3.14136588
计算 π 的近似值花费了 44.78 秒。
现在我们将整个 π 计算任务分解为子任务。
#!/usr/bin/python
import random
from multiprocessing import Pool, cpu_count
from math import sqrt
from timeit import default_timer as timer
def pi_part(n):
print(n)
count = 0
for i in range(int(n)):
x, y = random.random(), random.random()
r = sqrt(pow(x, 2) + pow(y, 2))
if r < 1:
count += 1
return count
def main():
start = timer()
np = cpu_count()
print(f'You have {np} cores')
n = 100_000_000
part_count = [n/np for i in range(np)]
with Pool(processes=np) as pool:
count = pool.map(pi_part, part_count)
pi_est = sum(count) / (n * 1.0) * 4
end = timer()
print(f'elapsed time: {end - start}')
print(f'π estimate: {pi_est}')
if __name__=='__main__':
main()
在示例中,我们找出核心数量并将随机抽样分解为子任务。每个任务将独立计算随机值。
n = 100_000_000 part_count = [n/np for i in range(np)]
每个子任务将计算一部分,而不是一次性计算 100,000,000。
count = pool.map(pi_part, part_count) pi_est = sum(count) / (n * 1.0) * 4
部分计算被传递给 count 变量,然后将总和用于最终公式。
$ ./monte_carlo_pi_mul.py You have 4 cores 25000000.0 25000000.0 25000000.0 25000000.0 elapsed time: 29.45832426099878 π estimate: 3.1414868
当使用四核并行运行示例时,计算花费了 29.46 秒。
来源
在本文中,我们研究了 multiprocessing 模块。
作者
列出所有 Python 教程。