ZetCode

Python os.pipe 函数

上次修改时间:2025 年 4 月 11 日

本综合指南探讨了 Python 的 os.pipe 函数,该函数用于创建进程间通信的管道。我们将介绍管道创建、数据流、父子进程以及实际的 IPC 示例。

基本定义

os.pipe 函数创建一个管道 - 用于进程间通信的单向数据通道。它返回一对文件描述符(读,写)。

管道提供了一种进程通过写入和读取管道来进行通信的方式。写入到写入端的数据可以从读取端读取。

创建一个简单的管道

这个基本示例演示了如何创建一个管道并在单个进程中使用它。虽然不实用,但它展示了基本的管道操作。

simple_pipe.py
import os

# Create a pipe
read_fd, write_fd = os.pipe()

# Write data to the pipe
os.write(write_fd, b"Hello from pipe!")

# Read data from the pipe
data = os.read(read_fd, 100)
print(f"Received: {data.decode()}")

# Close file descriptors
os.close(read_fd)
os.close(write_fd)

这会创建一个管道,将数据写入其中,然后再读回来。请注意,完成后必须关闭这两个文件描述符,以释放系统资源。

管道是单向的 - 数据从 write_fd 流向 read_fd。尝试从 write_fd 读取或写入 read_fd 将会失败。

父子进程通信

管道的一个常见用例是父进程和子进程之间的通信。此示例显示如何派生一个进程并交换数据。

parent_child_pipe.py
import os

# Create pipe before forking
read_fd, write_fd = os.pipe()

pid = os.fork()

if pid > 0:  # Parent process
    os.close(read_fd)  # Close unused read end
    
    # Send message to child
    message = "Hello child process!"
    os.write(write_fd, message.encode())
    os.close(write_fd)
    
    print("Parent sent message to child")
else:  # Child process
    os.close(write_fd)  # Close unused write end
    
    # Receive message from parent
    data = os.read(read_fd, 1024)
    print(f"Child received: {data.decode()}")
    os.close(read_fd)

父进程关闭其读取端并写入管道。子进程关闭其写入端并从管道读取。这演示了单向通信。

请记住关闭两个进程中未使用的端,以防止资源泄漏和潜在的死锁。

双向通信

对于进程之间的双向通信,我们需要两个管道。此示例显示如何设置父进程和子进程之间的双向通信。

bidirectional_pipe.py
import os

# Create two pipes
parent_read, child_write = os.pipe()
child_read, parent_write = os.pipe()

pid = os.fork()

if pid > 0:  # Parent process
    os.close(child_read)
    os.close(child_write)
    
    # Send to child
    os.write(parent_write, b"Parent says hello")
    
    # Receive from child
    data = os.read(parent_read, 1024)
    print(f"Parent received: {data.decode()}")
    
    os.close(parent_read)
    os.close(parent_write)
else:  # Child process
    os.close(parent_read)
    os.close(parent_write)
    
    # Receive from parent
    data = os.read(child_read, 1024)
    print(f"Child received: {data.decode()}")
    
    # Send to parent
    os.write(child_write, b"Child says hi back")
    
    os.close(child_read)
    os.close(child_write)

这会创建两个管道 - 一个用于父进程到子进程,另一个用于子进程到父进程的通信。每个进程都会关闭它不使用的端。

双向通信需要仔细管理文件描述符,以避免死锁并确保正确的清理。

带子进程的管道

管道可以将 Python 进程与外部程序连接起来。此示例显示如何将管道与 subprocess.Popen 一起使用以进行进程间通信。

subprocess_pipe.py
import os
import subprocess

# Create pipe
read_fd, write_fd = os.pipe()

# Start subprocess with pipe
proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=write_fd,
                       close_fds=True)

# Write to subprocess stdin
proc.stdin.write(b"Data for subprocess\n")
proc.stdin.close()

# Read from pipe
data = os.read(read_fd, 1024)
print(f"Received from subprocess: {data.decode()}")

# Clean up
os.close(read_fd)
os.close(write_fd)
proc.wait()

这会创建一个管道,并将其连接到子进程的标准输出。父进程写入子进程的标准输入,并从连接到其标准输出的管道读取。

close_fds=True 确保文件描述符不会泄漏到子进程。此技术适用于任何命令行程序。

非阻塞管道读取

默认情况下,管道读取会阻塞,直到有数据可用。此示例显示如何使用 fcntl 进行非阻塞读取并处理部分读取。

nonblocking_pipe.py
import os
import fcntl

# Create pipe
read_fd, write_fd = os.pipe()

# Set read end to non-blocking
flags = fcntl.fcntl(read_fd, fcntl.F_GETFL)
fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

# Try reading (will fail with EAGAIN if no data)
try:
    data = os.read(read_fd, 1024)
    print(f"Got data: {data.decode()}")
except BlockingIOError:
    print("No data available yet")

# Write some data
os.write(write_fd, b"Now there's data")

# Read again
try:
    data = os.read(read_fd, 1024)
    print(f"Got data: {data.decode()}")
except BlockingIOError:
    print("No data available")

# Clean up
os.close(read_fd)
os.close(write_fd)

这会在管道的读取端设置 O_NONBLOCK 标志。现在,当没有数据可用时,读取将引发 BlockingIOError (EAGAIN) 而不是阻塞。

非阻塞 I/O 适用于事件循环或轮询多个管道时,但需要仔细的错误处理,以处理部分读取和写入。

带有 select 的管道

select 模块可以监视多个管道的可读性。此示例显示如何将 select 与管道一起使用,以实现高效的 I/O 多路复用。

select_pipe.py
import os
import select

# Create two pipes
pipe1_r, pipe1_w = os.pipe()
pipe2_r, pipe2_w = os.pipe()

# Write to second pipe
os.write(pipe2_w, b"Data in pipe 2")

# Use select to monitor pipes
readable, _, _ = select.select([pipe1_r, pipe2_r], [], [], 1.0)

for fd in readable:
    if fd == pipe1_r:
        print("Pipe 1 has data")
        data = os.read(pipe1_r, 1024)
    elif fd == pipe2_r:
        print("Pipe 2 has data")
        data = os.read(pipe2_r, 1024)
        print(f"Received: {data.decode()}")

# Clean up
os.close(pipe1_r)
os.close(pipe1_w)
os.close(pipe2_r)
os.close(pipe2_w)

Select 可以有效地监视多个文件描述符的可读性。它仅返回那些具有可读取数据的描述符。

此模式适用于需要在没有繁忙等待的情况下同时处理多个通信通道的服务器或程序。

大数据传输

管道可以处理大数据传输,但需要适当的缓冲。此示例演示了大型数据的分块读取和写入。

large_data_pipe.py
import os

read_fd, write_fd = os.pipe()

# Writer process
pid = os.fork()
if pid == 0:  # Child (writer)
    os.close(read_fd)
    
    large_data = b"X" * 1000000  # 1MB of data
    chunk_size = 4096
    
    for i in range(0, len(large_data), chunk_size):
        chunk = large_data[i:i+chunk_size]
        os.write(write_fd, chunk)
    
    os.close(write_fd)
    os._exit(0)

# Reader process
os.close(write_fd)
received = bytearray()

while True:
    chunk = os.read(read_fd, 4096)
    if not chunk:
        break
    received.extend(chunk)

os.close(read_fd)
print(f"Received {len(received)} bytes")

此示例以块的形式传输 1MB 的数据。写入器以 4KB 的块发送数据,读取器累积这些块,直到管道关闭。

对于大型传输,分块可以防止缓冲区溢出并允许进度跟踪。管道自动处理进程之间的流量控制。

安全注意事项

最佳实践

资料来源

作者

我叫 Jan Bodnar,是一位充满热情的程序员,拥有丰富的编程经验。自 2007 年以来,我一直撰写编程文章。迄今为止,我已经撰写了 1,400 多篇文章和 8 本电子书。我拥有超过十年的编程教学经验。

列出所有 Python 教程