Python os.pipe 函数
上次修改时间:2025 年 4 月 11 日
本综合指南探讨了 Python 的 os.pipe 函数,该函数用于创建进程间通信的管道。我们将介绍管道创建、数据流、父子进程以及实际的 IPC 示例。
基本定义
os.pipe 函数创建一个管道 - 用于进程间通信的单向数据通道。它返回一对文件描述符(读,写)。
管道提供了一种进程通过写入和读取管道来进行通信的方式。写入到写入端的数据可以从读取端读取。
创建一个简单的管道
这个基本示例演示了如何创建一个管道并在单个进程中使用它。虽然不实用,但它展示了基本的管道操作。
import os
# Create a pipe
read_fd, write_fd = os.pipe()
# Write data to the pipe
os.write(write_fd, b"Hello from pipe!")
# Read data from the pipe
data = os.read(read_fd, 100)
print(f"Received: {data.decode()}")
# Close file descriptors
os.close(read_fd)
os.close(write_fd)
这会创建一个管道,将数据写入其中,然后再读回来。请注意,完成后必须关闭这两个文件描述符,以释放系统资源。
管道是单向的 - 数据从 write_fd 流向 read_fd。尝试从 write_fd 读取或写入 read_fd 将会失败。
父子进程通信
管道的一个常见用例是父进程和子进程之间的通信。此示例显示如何派生一个进程并交换数据。
import os
# Create pipe before forking
read_fd, write_fd = os.pipe()
pid = os.fork()
if pid > 0: # Parent process
os.close(read_fd) # Close unused read end
# Send message to child
message = "Hello child process!"
os.write(write_fd, message.encode())
os.close(write_fd)
print("Parent sent message to child")
else: # Child process
os.close(write_fd) # Close unused write end
# Receive message from parent
data = os.read(read_fd, 1024)
print(f"Child received: {data.decode()}")
os.close(read_fd)
父进程关闭其读取端并写入管道。子进程关闭其写入端并从管道读取。这演示了单向通信。
请记住关闭两个进程中未使用的端,以防止资源泄漏和潜在的死锁。
双向通信
对于进程之间的双向通信,我们需要两个管道。此示例显示如何设置父进程和子进程之间的双向通信。
import os
# Create two pipes
parent_read, child_write = os.pipe()
child_read, parent_write = os.pipe()
pid = os.fork()
if pid > 0: # Parent process
os.close(child_read)
os.close(child_write)
# Send to child
os.write(parent_write, b"Parent says hello")
# Receive from child
data = os.read(parent_read, 1024)
print(f"Parent received: {data.decode()}")
os.close(parent_read)
os.close(parent_write)
else: # Child process
os.close(parent_read)
os.close(parent_write)
# Receive from parent
data = os.read(child_read, 1024)
print(f"Child received: {data.decode()}")
# Send to parent
os.write(child_write, b"Child says hi back")
os.close(child_read)
os.close(child_write)
这会创建两个管道 - 一个用于父进程到子进程,另一个用于子进程到父进程的通信。每个进程都会关闭它不使用的端。
双向通信需要仔细管理文件描述符,以避免死锁并确保正确的清理。
带子进程的管道
管道可以将 Python 进程与外部程序连接起来。此示例显示如何将管道与 subprocess.Popen 一起使用以进行进程间通信。
import os
import subprocess
# Create pipe
read_fd, write_fd = os.pipe()
# Start subprocess with pipe
proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=write_fd,
close_fds=True)
# Write to subprocess stdin
proc.stdin.write(b"Data for subprocess\n")
proc.stdin.close()
# Read from pipe
data = os.read(read_fd, 1024)
print(f"Received from subprocess: {data.decode()}")
# Clean up
os.close(read_fd)
os.close(write_fd)
proc.wait()
这会创建一个管道,并将其连接到子进程的标准输出。父进程写入子进程的标准输入,并从连接到其标准输出的管道读取。
close_fds=True 确保文件描述符不会泄漏到子进程。此技术适用于任何命令行程序。
非阻塞管道读取
默认情况下,管道读取会阻塞,直到有数据可用。此示例显示如何使用 fcntl 进行非阻塞读取并处理部分读取。
import os
import fcntl
# Create pipe
read_fd, write_fd = os.pipe()
# Set read end to non-blocking
flags = fcntl.fcntl(read_fd, fcntl.F_GETFL)
fcntl.fcntl(read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Try reading (will fail with EAGAIN if no data)
try:
data = os.read(read_fd, 1024)
print(f"Got data: {data.decode()}")
except BlockingIOError:
print("No data available yet")
# Write some data
os.write(write_fd, b"Now there's data")
# Read again
try:
data = os.read(read_fd, 1024)
print(f"Got data: {data.decode()}")
except BlockingIOError:
print("No data available")
# Clean up
os.close(read_fd)
os.close(write_fd)
这会在管道的读取端设置 O_NONBLOCK 标志。现在,当没有数据可用时,读取将引发 BlockingIOError (EAGAIN) 而不是阻塞。
非阻塞 I/O 适用于事件循环或轮询多个管道时,但需要仔细的错误处理,以处理部分读取和写入。
带有 select 的管道
select 模块可以监视多个管道的可读性。此示例显示如何将 select 与管道一起使用,以实现高效的 I/O 多路复用。
import os
import select
# Create two pipes
pipe1_r, pipe1_w = os.pipe()
pipe2_r, pipe2_w = os.pipe()
# Write to second pipe
os.write(pipe2_w, b"Data in pipe 2")
# Use select to monitor pipes
readable, _, _ = select.select([pipe1_r, pipe2_r], [], [], 1.0)
for fd in readable:
if fd == pipe1_r:
print("Pipe 1 has data")
data = os.read(pipe1_r, 1024)
elif fd == pipe2_r:
print("Pipe 2 has data")
data = os.read(pipe2_r, 1024)
print(f"Received: {data.decode()}")
# Clean up
os.close(pipe1_r)
os.close(pipe1_w)
os.close(pipe2_r)
os.close(pipe2_w)
Select 可以有效地监视多个文件描述符的可读性。它仅返回那些具有可读取数据的描述符。
此模式适用于需要在没有繁忙等待的情况下同时处理多个通信通道的服务器或程序。
大数据传输
管道可以处理大数据传输,但需要适当的缓冲。此示例演示了大型数据的分块读取和写入。
import os
read_fd, write_fd = os.pipe()
# Writer process
pid = os.fork()
if pid == 0: # Child (writer)
os.close(read_fd)
large_data = b"X" * 1000000 # 1MB of data
chunk_size = 4096
for i in range(0, len(large_data), chunk_size):
chunk = large_data[i:i+chunk_size]
os.write(write_fd, chunk)
os.close(write_fd)
os._exit(0)
# Reader process
os.close(write_fd)
received = bytearray()
while True:
chunk = os.read(read_fd, 4096)
if not chunk:
break
received.extend(chunk)
os.close(read_fd)
print(f"Received {len(received)} bytes")
此示例以块的形式传输 1MB 的数据。写入器以 4KB 的块发送数据,读取器累积这些块,直到管道关闭。
对于大型传输,分块可以防止缓冲区溢出并允许进度跟踪。管道自动处理进程之间的流量控制。
安全注意事项
- 数据完整性: 管道提供可靠的按顺序交付
- 访问控制: 管道文件描述符继承到子进程
- 缓冲区限制: 管道具有系统定义的容量限制
- 死锁: 未正确关闭的管道可能导致挂起
- 平台差异: 行为可能因操作系统而异
最佳实践
- 关闭未使用的端: 始终关闭未使用的管道端
- 处理错误: 检查 EPIPE 和其他管道错误
- 使用分块: 对于大数据,以合理的块读取/写入
- 考虑替代方案: 对于复杂的 IPC,请考虑多进程处理
- 清理: 确保所有文件描述符都已正确关闭
资料来源
作者
列出所有 Python 教程。