Python os.mkfifo 函数
上次修改时间:2025 年 4 月 11 日
这份全面的指南探讨了 Python 的 os.mkfifo 函数,该函数创建用于进程间通信的命名管道 (FIFO)。我们将介绍 FIFO 的创建、权限和实际的 IPC 示例。
基本定义
os.mkfifo 函数在文件系统中创建一个命名管道 (FIFO)。 FIFO 允许不相关的进程相互通信。
关键参数:path(FIFO 位置),mode(权限位,默认为 0o666)。 在类 Unix 系统上,FIFO 显示为特殊文件,但不存储数据。
创建基本 FIFO
os.mkfifo 最简单的用法是创建一个具有默认权限的命名管道。 此示例演示基本的 FIFO 创建和清理。
import os
import sys
fifo_path = "/tmp/my_fifo"
try:
# Create FIFO with default permissions (0o666)
os.mkfifo(fifo_path)
print(f"Created FIFO at {fifo_path}")
# FIFO exists now - could open and use it here
except FileExistsError:
print(f"FIFO {fifo_path} already exists", file=sys.stderr)
except OSError as e:
print(f"Error creating FIFO: {e}", file=sys.stderr)
finally:
# Clean up
if os.path.exists(fifo_path):
os.unlink(fifo_path)
print(f"Removed {fifo_path}")
此示例创建一个 FIFO,处理潜在的错误,并确保清理。 FIFO 在创建后会被删除,以演示正确的资源管理。
请注意,即使创建进程退出,FIFO 也会持续存在,直到显式删除。 不再需要 FIFO 时,请务必进行清理。
设置自定义权限
mode 参数控制 FIFO 权限。 此示例演示如何设置特定权限并使用 os.stat 验证它们。
import os
import stat
fifo_path = "/tmp/secure_fifo"
permissions = 0o600 # Read/write for owner only
try:
# Create FIFO with restricted permissions
os.mkfifo(fifo_path, mode=permissions)
print(f"Created secure FIFO at {fifo_path}")
# Verify permissions
st = os.stat(fifo_path)
actual_mode = stat.S_IMODE(st.st_mode)
print(f"Requested permissions: {oct(permissions)}")
print(f"Actual permissions: {oct(actual_mode)}")
if actual_mode == permissions:
print("Permissions match")
else:
print("Warning: Permissions don't match request")
finally:
if os.path.exists(fifo_path):
os.unlink(fifo_path)
这将创建一个具有 600 权限(仅所有者读/写)的 FIFO。 使用 os.stat 验证实际权限,以确保它们匹配。
请注意,umask 会影响最终权限。 实际模式将为 mode & ~umask。 如果需要,可以使用 os.umask() 来控制此设置。
使用 FIFO 的简单 IPC
此示例演示使用 FIFO 的基本进程间通信。 一个进程写入 FIFO,而另一个进程从 FIFO 读取。
import os
import sys
from multiprocessing import Process
fifo_path = "/tmp/ipc_fifo"
def writer():
try:
os.mkfifo(fifo_path)
print("Writer: FIFO created")
with open(fifo_path, "w") as fifo:
print("Writer: Sending message...")
fifo.write("Hello from writer!")
fifo.flush()
finally:
if os.path.exists(fifo_path):
os.unlink(fifo_path)
def reader():
try:
print("Reader: Waiting for message...")
with open(fifo_path, "r") as fifo:
message = fifo.read()
print(f"Reader received: {message}")
except FileNotFoundError:
print("Reader: FIFO not found yet")
if __name__ == "__main__":
# Start reader first (it will block waiting for writer)
reader_proc = Process(target=reader)
reader_proc.start()
# Start writer after a short delay
import time
time.sleep(1)
writer_proc = Process(target=writer)
writer_proc.start()
# Wait for processes to complete
writer_proc.join()
reader_proc.join()
写入器创建 FIFO,发送消息,然后退出。 读取器会阻塞,直到 FIFO 中有可用数据。 多进程模拟单独的进程。
请注意,打开 FIFO 以进行读取会阻塞,直到另一个进程打开它以进行写入,反之亦然。 这是正常的 FIFO 行为。
非阻塞 FIFO 操作
此示例演示如何使用 os.O_NONBLOCK 标志在 FIFO 上执行非阻塞操作。 当您不想等待另一端时很有用。
import os
import errno
fifo_path = "/tmp/nonblock_fifo"
try:
os.mkfifo(fifo_path)
# Open FIFO in non-blocking mode
fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
try:
# Try to read - will raise ENXIO if no writers
try:
data = os.read(fd, 1024)
print(f"Read: {data.decode()}")
except OSError as e:
if e.errno == errno.ENXIO:
print("No writers connected to FIFO")
else:
raise
finally:
os.close(fd)
finally:
if os.path.exists(fifo_path):
os.unlink(fifo_path)
这以非阻塞模式打开 FIFO 进行读取。 如果没有写入器连接,则读取操作将失败并显示 ENXIO 而不是阻塞。
非阻塞模式对于检查 FIFO 状态而不等待很有用,但需要仔细的错误处理才能正确操作。
双向通信
此示例演示使用两个 FIFO 的双向通信 - 每个方向一个。 请求-响应协议的常见模式。
import os
import sys
from multiprocessing import Process
request_fifo = "/tmp/request_fifo"
response_fifo = "/tmp/response_fifo"
def server():
try:
os.mkfifo(request_fifo)
os.mkfifo(response_fifo)
print("Server: FIFOs created")
with open(request_fifo, "r") as req, open(response_fifo, "w") as resp:
print("Server: Waiting for request...")
message = req.read()
print(f"Server received: {message}")
resp.write(f"Response to: {message}")
resp.flush()
finally:
for fifo in [request_fifo, response_fifo]:
if os.path.exists(fifo):
os.unlink(fifo)
def client():
try:
with open(request_fifo, "w") as req, open(response_fifo, "r") as resp:
print("Client: Sending request...")
req.write("Client request")
req.flush()
response = resp.read()
print(f"Client received: {response}")
except FileNotFoundError:
print("Client: FIFOs not ready yet")
if __name__ == "__main__":
server_proc = Process(target=server)
server_proc.start()
import time
time.sleep(1) # Give server time to create FIFOs
client_proc = Process(target=client)
client_proc.start()
server_proc.join()
client_proc.join()
服务器创建两个 FIFO - 一个用于请求,一个用于响应。 客户端发送请求并通过单独的管道等待响应。
这种模式在客户端-服务器架构中很常见,在客户端-服务器架构中,双方都需要独立发送和接收数据。
具有多个写入器的 FIFO
此示例演示了多个进程如何写入同一个 FIFO,读取器按顺序接收所有消息。
import os
import sys
from multiprocessing import Process
import time
fifo_path = "/tmp/multi_writer_fifo"
def writer(id):
try:
with open(fifo_path, "w") as fifo:
for i in range(3):
msg = f"Writer {id} message {i}"
fifo.write(msg + "\n")
fifo.flush()
time.sleep(0.5)
except FileNotFoundError:
print(f"Writer {id}: FIFO not found")
def reader():
try:
print("Reader: Waiting for messages...")
with open(fifo_path, "r") as fifo:
while True:
line = fifo.readline()
if not line:
break
print(f"Reader got: {line.strip()}")
except FileNotFoundError:
print("Reader: FIFO not found")
if __name__ == "__main__":
try:
os.mkfifo(fifo_path)
reader_proc = Process(target=reader)
reader_proc.start()
writers = []
for i in range(3):
writer_proc = Process(target=writer, args=(i,))
writers.append(writer_proc)
writer_proc.start()
time.sleep(0.2)
for w in writers:
w.join()
# Send EOF to reader
with open(fifo_path, "w") as fifo:
pass
reader_proc.join()
finally:
if os.path.exists(fifo_path):
os.unlink(fifo_path)
三个写入器进程将消息发送到单个 FIFO。 读取器按写入顺序接收所有消息。 每个写入器发送三个消息。
请注意,当写入小于 PIPE_BUF(在 Linux 上通常为 4096 字节)时,FIFO 会保留消息边界。 较大的写入可能会交错。
带有 Select 的 FIFO
这个高级示例演示了如何使用 select.select() 监视多个 FIFO 的可读性。 对于复用 I/O 操作很有用。
import os
import select
import sys
from multiprocessing import Process
fifo1 = "/tmp/select_fifo1"
fifo2 = "/tmp/select_fifo2"
def writer(fifo_path, messages):
try:
os.mkfifo(fifo_path)
with open(fifo_path, "w") as f:
for msg in messages:
f.write(msg + "\n")
f.flush()
import time
time.sleep(1)
finally:
if os.path.exists(fifo_path):
os.unlink(fifo_path)
def reader():
try:
os.mkfifo(fifo1)
os.mkfifo(fifo2)
# Open FIFOs in non-blocking mode
fd1 = os.open(fifo1, os.O_RDONLY | os.O_NONBLOCK)
fd2 = os.open(fifo2, os.O_RDONLY | os.O_NONBLOCK)
try:
while True:
# Wait for data on either FIFO
rlist, _, _ = select.select([fd1, fd2], [], [], 5.0)
if not rlist:
print("Timeout waiting for data")
break
for fd in rlist:
if fd == fd1:
data = os.read(fd1, 1024)
print(f"FIFO1: {data.decode().strip()}")
elif fd == fd2:
data = os.read(fd2, 1024)
print(f"FIFO2: {data.decode().strip()}")
finally:
os.close(fd1)
os.close(fd2)
finally:
for fifo in [fifo1, fifo2]:
if os.path.exists(fifo):
os.unlink(fifo)
if __name__ == "__main__":
reader_proc = Process(target=reader)
reader_proc.start()
import time
time.sleep(1) # Let reader create FIFOs
writer1 = Process(target=writer, args=(fifo1, ["A1", "A2", "A3"]))
writer2 = Process(target=writer, args=(fifo2, ["B1", "B2", "B3"]))
writer1.start()
writer2.start()
writer1.join()
writer2.join()
reader_proc.join()
读取器使用 select() 同时监视两个 FIFO。 每个写入器将其消息发送到各自的 FIFO。 读取器在收到消息时处理消息。
Select 可以有效地监视多个文件描述符。 如果没有数据到达,timeout 参数可防止无限期阻塞。
安全注意事项
- 权限控制: 对敏感 FIFO 设置限制性权限
- 清理: 不再需要 FIFO 时,务必将其删除
- 位置: 在 /tmp 等安全目录中创建 FIFO
- 竞争条件: 注意 FIFO 创建的 TOCTOU 问题
- 平台限制: FIFO 在某些系统上可能具有大小限制
最佳实践
- 错误处理: 始终处理 FileExistsError 和其他异常
- 资源管理: 对 FIFO 文件对象使用上下文管理器
- 清理: 在 finally 块中实现正确的清理
- 权限: 使用 mode 参数设置适当的权限
- 非阻塞操作: 考虑对响应式应用程序使用非阻塞模式
- 消息边界: 保持写入小于 PIPE_BUF 以避免交错
- 监视: 使用 select() 或类似方法有效地处理多个 FIFO
资料来源
作者
列出所有 Python 教程。