Python os.setpgrp 函数
上次修改时间:2025 年 4 月 11 日
本综合指南探讨 Python 的 os.setpgrp 函数,该函数用于创建或加入进程组。我们将介绍 Unix 进程管理、组创建以及进程控制的实际示例。
基本定义
os.setpgrp 函数创建一个新的进程组或加入一个已存在的进程组。 这是一个 Unix 专属的进程管理系统调用。
进程组有助于一起管理相关进程,特别是对于信号处理和终端控制。 该函数不带参数,并返回新的进程组 ID。
创建新的进程组
这个基本示例演示了如何从 Python 脚本创建一个新的进程组。 子进程成为新进程组的组长。
import os
import time
print(f"Parent PID: {os.getpid()}, PGID: {os.getpgid(0)}")
pid = os.fork()
if pid == 0: # Child process
print(f"Child PID before setpgrp: {os.getpid()}, PGID: {os.getpgid(0)}")
os.setpgrp()
print(f"Child PID after setpgrp: {os.getpid()}, PGID: {os.getpgid(0)}")
time.sleep(10)
else: # Parent process
print(f"Parent waiting for child {pid}")
os.waitpid(pid, 0)
子进程调用 os.setpgrp() 来创建一个新的进程组。 父进程等待子进程完成后再退出。
请注意,在调用 setpgrp 后,子进程的进程组 ID 会发生变化,成为其自身组的组长。
守护进程的创建
这个示例展示了创建守护进程时如何使用 os.setpgrp 。守护进程通常创建新的进程组以变得独立。
import os
import sys
import time
def daemonize():
# Fork and exit parent
pid = os.fork()
if pid > 0:
sys.exit(0)
# Create new session and process group
os.setsid()
os.setpgrp()
# Fork again to prevent acquiring controlling terminal
pid = os.fork()
if pid > 0:
sys.exit(0)
# Change working directory
os.chdir('/')
# Redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'a+')
se = open(os.devnull, 'a+')
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
if __name__ == "__main__":
daemonize()
print("Daemon running (this won't be seen)")
while True:
with open("/tmp/daemon.log", "a") as f:
f.write("Daemon working...\n")
time.sleep(5)
这通过两次 fork、创建一个新的会话以及设置一个新的进程组来创建一个合适的守护进程。 守护进程将其输出重定向到 /dev/null。
os.setpgrp 调用确保守护进程在其自己的进程组中运行,独立于启动它的终端。
进程组隔离
这个示例演示了如何隔离进程组以防止信号影响子进程。 父进程在新的进程组中创建一个子进程。
import os
import signal
import time
def child_process():
print(f"Child PID: {os.getpid()}, PGID: {os.getpgid(0)}")
time.sleep(30)
print("Child exiting normally")
pid = os.fork()
if pid == 0:
os.setpgrp() # Create new process group
child_process()
else:
print(f"Parent PID: {os.getpid()}, Child PID: {pid}")
print("Sending SIGTERM to entire process group in 5 seconds...")
time.sleep(5)
os.killpg(os.getpgid(pid), signal.SIGTERM)
print("Signal sent")
os.waitpid(pid, 0)
子进程使用 os.setpgrp() 创建自己的进程组。 5 秒后,父进程将 SIGTERM 发送到子进程的进程组。
这演示了进程组如何允许一次向多个相关进程发送信号,同时将它们与其他组隔离。
作业控制模拟
这个示例通过创建多个进程组并分别管理它们来模拟简单的作业控制。 它演示了前台和后台作业。
import os
import time
def worker(name):
print(f"{name} started (PID: {os.getpid()}, PGID: {os.getpgid(0)})")
for i in range(5):
print(f"{name} working...")
time.sleep(1)
print(f"{name} finished")
# Create background job
bg_pid = os.fork()
if bg_pid == 0:
os.setpgrp()
worker("Background job")
os._exit(0)
# Create foreground job
fg_pid = os.fork()
if fg_pid == 0:
worker("Foreground job")
os._exit(0)
print(f"Background job PID: {bg_pid}, PGID: {os.getpgid(bg_pid)}")
print(f"Foreground job PID: {fg_pid}, PGID: {os.getpgid(fg_pid)}")
# Wait for foreground job
os.waitpid(fg_pid, 0)
print("Foreground job completed")
# Check background job status
try:
os.waitpid(bg_pid, os.WNOHANG)
print("Background job still running")
except ChildProcessError:
print("Background job already finished")
后台作业使用 os.setpgrp() 创建自己的进程组,而前台作业则保留在父进程的组中。 父进程等待前台作业完成。
这演示了 shell 如何使用进程组管理前台和后台作业。
进程组继承
此示例显示默认情况下如何继承进程组,以及 os.setpgrp 如何更改此行为。 它 fork 多个进程。
import os
def print_ids(label):
print(f"{label} - PID: {os.getpid()}, PPID: {os.getppid()}, PGID: {os.getpgid(0)}")
print_ids("Original process")
pid1 = os.fork()
if pid1 == 0:
print_ids("Child 1 (inherited group)")
pid2 = os.fork()
if pid2 == 0:
print_ids("Child 2 (before setpgrp)")
os.setpgrp()
print_ids("Child 2 (after setpgrp)")
os._exit(0)
os.waitpid(pid2, 0)
os._exit(0)
os.waitpid(pid1, 0)
pid3 = os.fork()
if pid3 == 0:
os.setpgrp()
print_ids("Child 3 (new group)")
os._exit(0)
os.waitpid(pid3, 0)
该示例显示了三个级别的 fork。 子进程 1 继承父进程的组,子进程 2 创建一个新组,子进程 3 立即创建自己的组。
这演示了默认情况下如何继承进程组,但可以通过 os.setpgrp() 更改。
使用进程组进行信号处理
此示例演示了进程组如何影响信号传递。 它创建具有不同组配置的多个进程。
import os
import signal
import time
def handler(signum, frame):
print(f"Process {os.getpid()} received signal {signum}")
# Set up signal handler
signal.signal(signal.SIGUSR1, handler)
# Process in original group
pid1 = os.fork()
if pid1 == 0:
print(f"Process A (PID: {os.getpid()}, PGID: {os.getpgid(0)})")
time.sleep(10)
os._exit(0)
# Process in new group
pid2 = os.fork()
if pid2 == 0:
os.setpgrp()
print(f"Process B (PID: {os.getpid()}, PGID: {os.getpgid(0)})")
time.sleep(10)
os._exit(0)
print("Sending signal to original process group in 3 seconds...")
time.sleep(3)
os.killpg(os.getpgid(0), signal.SIGUSR1)
print("Sending signal to all processes in 3 seconds...")
time.sleep(3)
os.kill(-1, signal.SIGUSR1)
os.waitpid(pid1, 0)
os.waitpid(pid2, 0)
进程 A 保留在原始进程组中,而进程 B 创建一个新组。 该示例将信号发送到不同的组以演示效果。
第一个信号仅到达进程 A,而第二个信号到达两个进程,显示了进程组如何控制信号传递。
终端控制示例
这个高级示例展示了进程组如何与终端控制相关。 它演示了创建一个可以控制终端的进程组。
import os
import pty
import tty
import signal
def child_process():
os.setpgrp()
print(f"Child PID: {os.getpid()}, PGID: {os.getpgid(0)}")
# Set up terminal
tty.setraw(0)
print("Child has terminal control. Press Ctrl+C to exit")
try:
while True:
data = os.read(0, 1024)
if not data:
break
os.write(1, data.upper())
except KeyboardInterrupt:
print("\nChild exiting")
pid, fd = pty.fork()
if pid == 0:
child_process()
os._exit(0)
else:
print(f"Parent PID: {os.getpid()}, Child PID: {pid}")
print("Waiting for child to finish...")
os.waitpid(pid, 0)
print("Parent exiting")
子进程使用 os.setpgrp() 创建一个新的进程组并控制伪终端。 它以大写形式回显输入,直到按下 Ctrl+C。
这演示了进程组对于终端控制至关重要,以及它们如何与 Ctrl+C 等终端信号交互。
安全注意事项
- 权限要求:可能需要适当的权限
- 信号隔离:影响信号的传递方式
- 终端控制:影响哪个进程组控制终端
- 平台限制:Unix 专属功能
- 孤立进程:新组可能需要特殊的清理
最佳实践
- 用于守护进程:对于正确的守护进程操作至关重要
- 与 setsid 结合使用:通常一起使用以实现完全隔离
- 考虑替代方案:
os.setsid可能是更可取的 - 记录意图:清楚地说明更改进程组的原因
- 处理清理:确保为新组进行正确的信号处理
资料来源
作者
列出所有 Python 教程。