ZetCode

Python os.forkpty 函数

上次修改时间:2025 年 4 月 11 日

这份全面的指南探讨了 Python 的 os.forkpty 函数,它使用伪终端创建一个子进程。我们将介绍终端模拟、进程通信和实际示例。

基本定义

os.forkpty 函数结合了 fork() 和 pty 创建。它创建一个连接到新的伪终端 (pty) 的子进程。

返回一个元组 (pid, fd),其中 pid 在子进程中为 0,在父进程中为子进程的 PID。 fd 是 pty 主端的的文件描述符。子进程将从端作为 stdin/stdout/stderr。

基本 Forkpty 示例

这个简单的例子演示了 os.forkpty 的基本用法。父进程通过伪终端与子进程通信。

basic_forkpty.py
import os
import sys

pid, fd = os.forkpty()

if pid == 0:  # Child process
    print("Child process running in pty")
    sys.stdout.flush()
    data = sys.stdin.readline()
    print(f"Child received: {data.strip()}")
    sys.exit(0)
else:  # Parent process
    print(f"Parent process with child PID: {pid}")
    os.write(fd, b"Hello from parent\n")
    output = os.read(fd, 1024)
    print(f"Parent received: {output.decode().strip()}")
    os.waitpid(pid, 0)

子进程在伪终端中运行,可以由父进程控制。父进程通过 pty 向子进程写入数据并从中读取数据。

这演示了使用伪终端的父进程和子进程之间的基本通信模式。

在 Pty 中运行 Shell

这个例子展示了如何在伪终端中运行 shell。父进程可以发送命令并读取输出,就像与真实终端交互一样。

shell_in_pty.py
import os
import select
import sys

pid, fd = os.forkpty()

if pid == 0:  # Child process
    os.execvp("bash", ["bash"])
else:  # Parent process
    while True:
        r, w, e = select.select([fd, sys.stdin], [], [])
        
        if fd in r:
            output = os.read(fd, 1024)
            if not output:
                break
            sys.stdout.write(output.decode())
            sys.stdout.flush()
            
        if sys.stdin in r:
            cmd = sys.stdin.readline()
            os.write(fd, cmd.encode())

子进程在伪终端中运行 bash。父进程使用 select 以非阻塞方式处理用户输入和 pty 输出。

这创建了一个交互式 shell 会话,可以通过 pty 发送命令并接收输出。

终端大小控制

这个例子演示了如何控制 pty 的终端大小。父进程可以修改窗口大小,从而影响子进程中的程序。

terminal_size.py
import os
import fcntl
import termios
import struct

def set_winsize(fd, rows, cols):
    winsize = struct.pack("HHHH", rows, cols, 0, 0)
    fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)

pid, fd = os.forkpty()

if pid == 0:  # Child process
    os.execvp("bash", ["bash"])
else:  # Parent process
    # Set initial terminal size
    set_winsize(fd, 40, 120)
    
    # After some time, resize the terminal
    import time
    time.sleep(2)
    set_winsize(fd, 20, 60)
    
    os.waitpid(pid, 0)

set_winsize 函数使用 ioctl 和 TIOCSWINSZ 来更改终端尺寸。 这会影响文本编辑器等程序显示内容的方式。

该示例显示了 40x120 的初始大小,然后在 2 秒后更改为 20x60。 子进程中的程序将适应新尺寸。

原始模式终端

这个例子将 pty 置于原始模式,禁用行缓冲和特殊字符处理。 这对于交互式应用程序很有用。

raw_mode.py
import os
import tty
import sys
import select

pid, fd = os.forkpty()

if pid == 0:  # Child process
    os.execvp("bash", ["bash"])
else:  # Parent process
    # Put terminal in raw mode
    old_settings = tty.tcgetattr(fd)
    tty.setraw(fd)
    
    try:
        while True:
            r, w, e = select.select([fd, sys.stdin], [], [])
            
            if fd in r:
                output = os.read(fd, 1024)
                if not output:
                    break
                sys.stdout.write(output.decode())
                sys.stdout.flush()
                
            if sys.stdin in r:
                cmd = sys.stdin.read(1)
                os.write(fd, cmd.encode())
    finally:
        # Restore original terminal settings
        tty.tcsetattr(fd, tty.TCSADRAIN, old_settings)

父进程使用 tty.setraw() 将 pty 置于原始模式。 这将禁用回显、行缓冲和特殊字符处理(例如 Ctrl+C)。

该示例确保在完成后恢复终端设置。 这对于避免将终端置于无法使用的状态非常重要。

带有环境控制的 Pty

这个例子展示了如何控制在 pty 中运行的子进程的环境。 我们在执行 exec 之前设置自定义环境变量。

pty_environment.py
import os
import sys

pid, fd = os.forkpty()

if pid == 0:  # Child process
    # Modify environment
    os.environ["CUSTOM_VAR"] = "special_value"
    os.environ["TERM"] = "xterm-256color"
    
    # Run command that shows environment
    os.execvp("bash", ["bash", "-c", "echo $CUSTOM_VAR; echo $TERM; sleep 2"])
else:  # Parent process
    output = os.read(fd, 1024)
    print("Child output:")
    print(output.decode())
    os.waitpid(pid, 0)

子进程在执行 bash 之前设置自定义环境变量。 这些变量仅对子进程及其后代可见。

该示例演示了如何设置自定义变量和标准终端相关变量(例如 TERM)。

Pty 中的信号处理

这个例子演示了通过 pty 通信的父进程和子进程之间的信号处理。 父进程可以向子进程发送信号。

pty_signals.py
import os
import signal
import time
import sys

pid, fd = os.forkpty()

if pid == 0:  # Child process
    # Set up signal handler
    def handler(signum, frame):
        print(f"\nChild received signal {signum}")
        sys.exit(0)
    
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
    
    print("Child waiting for signals...")
    while True:
        time.sleep(1)
else:  # Parent process
    time.sleep(1)  # Give child time to start
    print(f"Parent sending SIGINT to child {pid}")
    os.kill(pid, signal.SIGINT)
    
    status = os.waitpid(pid, 0)
    print(f"Child exit status: {status[1]}")

子进程为 SIGINT 和 SIGTERM 设置信号处理程序。 父进程发送 SIGINT 以演示通过 pty 传递信号。

这展示了如何使用信号来控制在伪终端中运行的子进程。

带有超时的 Pty

这个例子实现了从 pty 读取时的超时,演示了如何处理子进程可能挂起或响应时间过长的情况。

pty_timeout.py
import os
import select
import sys
import time

pid, fd = os.forkpty()

if pid == 0:  # Child process
    print("Child will respond after 3 seconds")
    time.sleep(3)
    print("Child done")
    sys.exit(0)
else:  # Parent process
    timeout = 2  # seconds
    start = time.time()
    
    while True:
        remaining = timeout - (time.time() - start)
        if remaining <= 0:
            print("\nTimeout reached, killing child")
            os.kill(pid, signal.SIGTERM)
            break
            
        r, w, e = select.select([fd], [], [], remaining)
        
        if fd in r:
            output = os.read(fd, 1024)
            if not output:
                break
            print(output.decode(), end="")
        else:
            print("\nNo output within timeout period")
            os.kill(pid, signal.SIGTERM)
            break
    
    os.waitpid(pid, 0)

父进程等待子进程的输出,超时时间为 2 秒。 子进程故意休眠 3 秒以触发超时。

这演示了如何在通过 pty 与进程交互时实现超时,这对于健壮的应用程序非常重要。

安全注意事项

最佳实践

资料来源

作者

我的名字是 Jan Bodnar,我是一位充满激情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直在撰写编程文章。 迄今为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程