ZetCode

Python os.killpg 函数

上次修改时间:2025 年 4 月 11 日

本综合指南探讨了 Python 的 os.killpg 函数,该函数向进程组发送信号。我们将介绍信号类型、进程组以及进程管理的实际示例。

基本定义

os.killpg 函数向进程组发送信号。 它需要进程组 ID (pgid) 和信号编号作为参数。

关键参数:pgid(进程组 ID),sig(信号编号)。 成功时,返回 None。 失败时引发 OSError(无效的 pgid 或权限)。

向进程组发送 SIGTERM

这个基本示例演示了发送 SIGTERM 以终止组中所有进程。 与 SIGKILL 不同,SIGTERM 允许优雅关闭。

basic_killpg.py
import os
import signal
import time
from multiprocessing import Process

def worker():
    print(f"Worker PID: {os.getpid()} running")
    time.sleep(60)

if __name__ == "__main__":
    # Create a new process group
    os.setpgrp()
    
    # Start child processes
    children = [Process(target=worker) for _ in range(3)]
    for p in children:
        p.start()
    
    print(f"Process group ID: {os.getpgid(0)}")
    input("Press Enter to terminate process group...")
    
    # Send SIGTERM to entire process group
    os.killpg(os.getpgid(0), signal.SIGTERM)
    print("Sent SIGTERM to process group")

这将创建一个具有工作进程的进程组。 触发后,它会将 SIGTERM 发送到组中的所有进程。 每个进程都可以处理该信号。

请注意,我们使用 os.getpgid(0) 来获取我们自己的进程组 ID(0 表示当前进程组)。

使用 SIGTERM 优雅关闭

此示例显示了用于优雅关闭的正确信号处理。 进程可以在收到 SIGTERM 时清除资源后再退出。

graceful_shutdown.py
import os
import signal
import sys
import time
from multiprocessing import Process

def cleanup():
    print(f"{os.getpid()}: Performing cleanup...")
    time.sleep(1)  # Simulate cleanup
    print(f"{os.getpid()}: Cleanup complete")

def signal_handler(signum, frame):
    print(f"{os.getpid()}: Received signal {signum}")
    cleanup()
    sys.exit(0)

def worker():
    signal.signal(signal.SIGTERM, signal_handler)
    print(f"Worker {os.getpid()} running")
    while True:
        time.sleep(1)

if __name__ == "__main__":
    os.setpgrp()
    children = [Process(target=worker) for _ in range(3)]
    for p in children:
        p.start()
    
    print(f"Main PID: {os.getpid()}, PGID: {os.getpgid(0)}")
    input("Press Enter to terminate...")
    
    os.killpg(os.getpgid(0), signal.SIGTERM)
    for p in children:
        p.join()
    print("All processes terminated gracefully")

每个工作进程都注册一个 SIGTERM 处理程序以进行清理。 当 killpg 发送 SIGTERM 时,所有进程都会在退出前执行清理。 主进程等待它们。

这种模式在需要清理的服务器和长时间运行的进程中很常见。

使用 SIGKILL 强制终止

SIGKILL 无法捕获或忽略。 此示例显示了如何使用它来强制终止组中无响应的进程。

force_kill.py
import os
import signal
import time
from multiprocessing import Process

def unresponsive_worker():
    print(f"Unresponsive worker {os.getpid()} running")
    while True:
        time.sleep(1)
        # Simulate ignoring SIGTERM
        pass

if __name__ == "__main__":
    os.setpgrp()
    children = [Process(target=unresponsive_worker) for _ in range(3)]
    for p in children:
        p.start()
    
    print(f"PGID: {os.getpgid(0)}")
    input("First try SIGTERM (press Enter)...")
    
    # Try graceful termination first
    os.killpg(os.getpgid(0), signal.SIGTERM)
    time.sleep(2)  # Give processes time to exit
    
    # Check if any children still alive
    alive = any(p.is_alive() for p in children)
    if alive:
        print("Processes not responding to SIGTERM")
        input("Press Enter to force kill with SIGKILL...")
        os.killpg(os.getpgid(0), signal.SIGKILL)
    
    for p in children:
        p.join(timeout=0.1)
    print("All processes terminated")

首先尝试使用 SIGTERM 优雅关闭。 如果进程没有退出,则会使用 SIGKILL。 SIGKILL 会立即终止进程。

谨慎使用 SIGKILL,因为它不允许清理并且可能会使资源处于锁定状态。

不同的进程组

这演示了杀死特定的进程组,而不是当前的进程组。 我们创建两个独立的组并有选择地终止一个。

specific_group.py
import os
import signal
import time
from multiprocessing import Process

def group_member(name):
    print(f"{name} PID: {os.getpid()}, PGID: {os.getpgid(0)}")
    while True:
        time.sleep(1)

if __name__ == "__main__":
    # Create first process group
    p1 = Process(target=group_member, args=("Group1",))
    p1.start()
    time.sleep(0.1)  # Ensure process starts
    
    # Create second process group
    p2 = Process(target=group_member, args=("Group2",))
    p2.start()
    time.sleep(0.1)
    
    # Get their PGIDs
    pgid1 = os.getpgid(p1.pid)
    pgid2 = os.getpgid(p2.pid)
    
    print(f"Group1 PGID: {pgid1}, Group2 PGID: {pgid2}")
    input("Press Enter to kill Group1...")
    
    os.killpg(pgid1, signal.SIGTERM)
    p1.join()
    print("Group1 terminated, Group2 still running")
    input("Press Enter to exit...")
    os.killpg(pgid2, signal.SIGTERM)
    p2.join()

我们创建两个独立的进程组并终止其中一个,同时保持另一个运行。 默认情况下,每个进程在 Unix 上创建自己的组。

这显示了如何定位特定组,而不仅仅是当前组。

错误处理

本示例演示了使用 os.killpg 时的正确错误处理,包括权限检查和无效进程组情况。

error_handling.py
import os
import signal
import errno

def safe_killpg(pgid, sig):
    try:
        os.killpg(pgid, sig)
        print(f"Successfully sent signal {sig} to group {pgid}")
    except ProcessLookupError:
        print(f"Process group {pgid} does not exist")
    except PermissionError:
        print(f"Permission denied to signal group {pgid}")
    except OSError as e:
        if e.errno == errno.ESRCH:
            print(f"No such process group {pgid}")
        else:
            print(f"Error signaling group {pgid}: {e}")

if __name__ == "__main__":
    # Test with various scenarios
    safe_killpg(0, signal.SIGTERM)  # Current process group
    safe_killpg(999999, signal.SIGTERM)  # Non-existent group
    safe_killpg(1, signal.SIGTERM)  # Init process (usually permission denied)

safe_killpg 函数可以优雅地处理常见的错误情况。 它检查不存在的组、权限问题和其他潜在错误。

在管理进程时,正确的错误处理至关重要,因为在检查和发出信号之间条件可能会发生变化。

信号传播

本示例显示了信号如何在组中传播到子进程,并演示了不同的信号行为。

signal_propagation.py
import os
import signal
import time
from multiprocessing import Process

def child(signal_name):
    print(f"Child {os.getpid()} waiting for {signal_name}")
    while True:
        time.sleep(1)

if __name__ == "__main__":
    os.setpgrp()
    signals = [
        (signal.SIGTERM, "SIGTERM"),
        (signal.SIGINT, "SIGINT"),
        (signal.SIGHUP, "SIGHUP")
    ]
    
    # Create a child for each signal type
    children = [Process(target=child, args=(name,)) for _, name in signals]
    for p in children:
        p.start()
    
    print(f"Main PID: {os.getpid()}, PGID: {os.getpgid(0)}")
    for sig, name in signals:
        input(f"Press Enter to send {name}...")
        os.killpg(os.getpgid(0), sig)
        time.sleep(0.5)  # Allow time for signal handling
    
    for p in children:
        p.join(timeout=1)
    print("Done")

这将创建多个子进程并将不同的信号发送到组。 在实际情况下,每个子进程通常会以不同的方式处理这些信号。

像 SIGTERM 这样的信号默认终止,而像 SIGHUP 这样的其他信号可能具有不同的默认操作或被处理程序捕获。

进程组创建

这个高级示例演示了如何创建和管理自定义进程组,然后向特定组发出信号。

custom_groups.py
import os
import signal
import time
from multiprocessing import Process

def group_worker(name, pgid):
    # Create new session and process group
    os.setsid()
    print(f"{name} PID: {os.getpid()}, PGID: {os.getpgid(0)}")
    while True:
        time.sleep(1)

if __name__ == "__main__":
    # Create two separate process groups
    group1 = Process(target=group_worker, args=("Group1",))
    group1.start()
    time.sleep(0.1)
    
    group2 = Process(target=group_worker, args=("Group2",))
    group2.start()
    time.sleep(0.1)
    
    # Get their PGIDs
    pgid1 = os.getpgid(group1.pid)
    pgid2 = os.getpgid(group2.pid)
    
    print(f"Main PGID: {os.getpgid(0)}")
    print(f"Group1 PGID: {pgid1}, Group2 PGID: {pgid2}")
    
    input("Press Enter to terminate Group1...")
    os.killpg(pgid1, signal.SIGTERM)
    group1.join()
    
    input("Press Enter to terminate Group2...")
    os.killpg(pgid2, signal.SIGTERM)
    group2.join()
    
    print("All groups terminated")

每个 worker 使用 os.setsid() 创建自己的会话和进程组。 这允许完全独立的进程组可以单独发出信号。

这种模式对于独立于系统中其他组管理相关进程集非常有用。

安全注意事项

最佳实践

资料来源

作者

我的名字是 Jan Bodnar,我是一位热情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直在撰写编程文章。 迄今为止,我撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程