Python os.forkpty 函数
上次修改时间:2025 年 4 月 11 日
这份全面的指南探讨了 Python 的 os.forkpty
函数,它使用伪终端创建一个子进程。我们将介绍终端模拟、进程通信和实际示例。
基本定义
os.forkpty
函数结合了 fork() 和 pty 创建。它创建一个连接到新的伪终端 (pty) 的子进程。
返回一个元组 (pid, fd),其中 pid 在子进程中为 0,在父进程中为子进程的 PID。 fd 是 pty 主端的的文件描述符。子进程将从端作为 stdin/stdout/stderr。
基本 Forkpty 示例
这个简单的例子演示了 os.forkpty 的基本用法。父进程通过伪终端与子进程通信。
import os import sys pid, fd = os.forkpty() if pid == 0: # Child process print("Child process running in pty") sys.stdout.flush() data = sys.stdin.readline() print(f"Child received: {data.strip()}") sys.exit(0) else: # Parent process print(f"Parent process with child PID: {pid}") os.write(fd, b"Hello from parent\n") output = os.read(fd, 1024) print(f"Parent received: {output.decode().strip()}") os.waitpid(pid, 0)
子进程在伪终端中运行,可以由父进程控制。父进程通过 pty 向子进程写入数据并从中读取数据。
这演示了使用伪终端的父进程和子进程之间的基本通信模式。
在 Pty 中运行 Shell
这个例子展示了如何在伪终端中运行 shell。父进程可以发送命令并读取输出,就像与真实终端交互一样。
import os import select import sys pid, fd = os.forkpty() if pid == 0: # Child process os.execvp("bash", ["bash"]) else: # Parent process while True: r, w, e = select.select([fd, sys.stdin], [], []) if fd in r: output = os.read(fd, 1024) if not output: break sys.stdout.write(output.decode()) sys.stdout.flush() if sys.stdin in r: cmd = sys.stdin.readline() os.write(fd, cmd.encode())
子进程在伪终端中运行 bash。父进程使用 select 以非阻塞方式处理用户输入和 pty 输出。
这创建了一个交互式 shell 会话,可以通过 pty 发送命令并接收输出。
终端大小控制
这个例子演示了如何控制 pty 的终端大小。父进程可以修改窗口大小,从而影响子进程中的程序。
import os import fcntl import termios import struct def set_winsize(fd, rows, cols): winsize = struct.pack("HHHH", rows, cols, 0, 0) fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize) pid, fd = os.forkpty() if pid == 0: # Child process os.execvp("bash", ["bash"]) else: # Parent process # Set initial terminal size set_winsize(fd, 40, 120) # After some time, resize the terminal import time time.sleep(2) set_winsize(fd, 20, 60) os.waitpid(pid, 0)
set_winsize 函数使用 ioctl 和 TIOCSWINSZ 来更改终端尺寸。 这会影响文本编辑器等程序显示内容的方式。
该示例显示了 40x120 的初始大小,然后在 2 秒后更改为 20x60。 子进程中的程序将适应新尺寸。
原始模式终端
这个例子将 pty 置于原始模式,禁用行缓冲和特殊字符处理。 这对于交互式应用程序很有用。
import os import tty import sys import select pid, fd = os.forkpty() if pid == 0: # Child process os.execvp("bash", ["bash"]) else: # Parent process # Put terminal in raw mode old_settings = tty.tcgetattr(fd) tty.setraw(fd) try: while True: r, w, e = select.select([fd, sys.stdin], [], []) if fd in r: output = os.read(fd, 1024) if not output: break sys.stdout.write(output.decode()) sys.stdout.flush() if sys.stdin in r: cmd = sys.stdin.read(1) os.write(fd, cmd.encode()) finally: # Restore original terminal settings tty.tcsetattr(fd, tty.TCSADRAIN, old_settings)
父进程使用 tty.setraw() 将 pty 置于原始模式。 这将禁用回显、行缓冲和特殊字符处理(例如 Ctrl+C)。
该示例确保在完成后恢复终端设置。 这对于避免将终端置于无法使用的状态非常重要。
带有环境控制的 Pty
这个例子展示了如何控制在 pty 中运行的子进程的环境。 我们在执行 exec 之前设置自定义环境变量。
import os import sys pid, fd = os.forkpty() if pid == 0: # Child process # Modify environment os.environ["CUSTOM_VAR"] = "special_value" os.environ["TERM"] = "xterm-256color" # Run command that shows environment os.execvp("bash", ["bash", "-c", "echo $CUSTOM_VAR; echo $TERM; sleep 2"]) else: # Parent process output = os.read(fd, 1024) print("Child output:") print(output.decode()) os.waitpid(pid, 0)
子进程在执行 bash 之前设置自定义环境变量。 这些变量仅对子进程及其后代可见。
该示例演示了如何设置自定义变量和标准终端相关变量(例如 TERM)。
Pty 中的信号处理
这个例子演示了通过 pty 通信的父进程和子进程之间的信号处理。 父进程可以向子进程发送信号。
import os import signal import time import sys pid, fd = os.forkpty() if pid == 0: # Child process # Set up signal handler def handler(signum, frame): print(f"\nChild received signal {signum}") sys.exit(0) signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGTERM, handler) print("Child waiting for signals...") while True: time.sleep(1) else: # Parent process time.sleep(1) # Give child time to start print(f"Parent sending SIGINT to child {pid}") os.kill(pid, signal.SIGINT) status = os.waitpid(pid, 0) print(f"Child exit status: {status[1]}")
子进程为 SIGINT 和 SIGTERM 设置信号处理程序。 父进程发送 SIGINT 以演示通过 pty 传递信号。
这展示了如何使用信号来控制在伪终端中运行的子进程。
带有超时的 Pty
这个例子实现了从 pty 读取时的超时,演示了如何处理子进程可能挂起或响应时间过长的情况。
import os import select import sys import time pid, fd = os.forkpty() if pid == 0: # Child process print("Child will respond after 3 seconds") time.sleep(3) print("Child done") sys.exit(0) else: # Parent process timeout = 2 # seconds start = time.time() while True: remaining = timeout - (time.time() - start) if remaining <= 0: print("\nTimeout reached, killing child") os.kill(pid, signal.SIGTERM) break r, w, e = select.select([fd], [], [], remaining) if fd in r: output = os.read(fd, 1024) if not output: break print(output.decode(), end="") else: print("\nNo output within timeout period") os.kill(pid, signal.SIGTERM) break os.waitpid(pid, 0)
父进程等待子进程的输出,超时时间为 2 秒。 子进程故意休眠 3 秒以触发超时。
这演示了如何在通过 pty 与进程交互时实现超时,这对于健壮的应用程序非常重要。
安全注意事项
- 权限分离: 子进程继承父进程权限
- 终端注入: 谨慎处理 pty 的不可信输入
- 信号处理: 正确处理两个进程中的信号
- 资源清理: 确保 pty 被正确关闭
- 平台差异: 在 Unix 系统之间,行为可能会有所不同
最佳实践
- 错误处理: 检查返回值并处理错误
- 资源管理: 使用上下文管理器管理文件描述符
- 信号安全: 注意两个进程中的信号处理
- 终端设置: 完成后恢复原始设置
- 测试: 使用不同的终端类型和大小进行测试
资料来源
作者
列出所有 Python 教程。