Python os.mkfifo 函数
上次修改时间:2025 年 4 月 11 日
这份全面的指南探讨了 Python 的 os.mkfifo
函数,该函数创建用于进程间通信的命名管道 (FIFO)。我们将介绍 FIFO 的创建、权限和实际的 IPC 示例。
基本定义
os.mkfifo
函数在文件系统中创建一个命名管道 (FIFO)。 FIFO 允许不相关的进程相互通信。
关键参数:path(FIFO 位置),mode(权限位,默认为 0o666)。 在类 Unix 系统上,FIFO 显示为特殊文件,但不存储数据。
创建基本 FIFO
os.mkfifo
最简单的用法是创建一个具有默认权限的命名管道。 此示例演示基本的 FIFO 创建和清理。
import os import sys fifo_path = "/tmp/my_fifo" try: # Create FIFO with default permissions (0o666) os.mkfifo(fifo_path) print(f"Created FIFO at {fifo_path}") # FIFO exists now - could open and use it here except FileExistsError: print(f"FIFO {fifo_path} already exists", file=sys.stderr) except OSError as e: print(f"Error creating FIFO: {e}", file=sys.stderr) finally: # Clean up if os.path.exists(fifo_path): os.unlink(fifo_path) print(f"Removed {fifo_path}")
此示例创建一个 FIFO,处理潜在的错误,并确保清理。 FIFO 在创建后会被删除,以演示正确的资源管理。
请注意,即使创建进程退出,FIFO 也会持续存在,直到显式删除。 不再需要 FIFO 时,请务必进行清理。
设置自定义权限
mode 参数控制 FIFO 权限。 此示例演示如何设置特定权限并使用 os.stat 验证它们。
import os import stat fifo_path = "/tmp/secure_fifo" permissions = 0o600 # Read/write for owner only try: # Create FIFO with restricted permissions os.mkfifo(fifo_path, mode=permissions) print(f"Created secure FIFO at {fifo_path}") # Verify permissions st = os.stat(fifo_path) actual_mode = stat.S_IMODE(st.st_mode) print(f"Requested permissions: {oct(permissions)}") print(f"Actual permissions: {oct(actual_mode)}") if actual_mode == permissions: print("Permissions match") else: print("Warning: Permissions don't match request") finally: if os.path.exists(fifo_path): os.unlink(fifo_path)
这将创建一个具有 600 权限(仅所有者读/写)的 FIFO。 使用 os.stat 验证实际权限,以确保它们匹配。
请注意,umask 会影响最终权限。 实际模式将为 mode & ~umask。 如果需要,可以使用 os.umask() 来控制此设置。
使用 FIFO 的简单 IPC
此示例演示使用 FIFO 的基本进程间通信。 一个进程写入 FIFO,而另一个进程从 FIFO 读取。
import os import sys from multiprocessing import Process fifo_path = "/tmp/ipc_fifo" def writer(): try: os.mkfifo(fifo_path) print("Writer: FIFO created") with open(fifo_path, "w") as fifo: print("Writer: Sending message...") fifo.write("Hello from writer!") fifo.flush() finally: if os.path.exists(fifo_path): os.unlink(fifo_path) def reader(): try: print("Reader: Waiting for message...") with open(fifo_path, "r") as fifo: message = fifo.read() print(f"Reader received: {message}") except FileNotFoundError: print("Reader: FIFO not found yet") if __name__ == "__main__": # Start reader first (it will block waiting for writer) reader_proc = Process(target=reader) reader_proc.start() # Start writer after a short delay import time time.sleep(1) writer_proc = Process(target=writer) writer_proc.start() # Wait for processes to complete writer_proc.join() reader_proc.join()
写入器创建 FIFO,发送消息,然后退出。 读取器会阻塞,直到 FIFO 中有可用数据。 多进程模拟单独的进程。
请注意,打开 FIFO 以进行读取会阻塞,直到另一个进程打开它以进行写入,反之亦然。 这是正常的 FIFO 行为。
非阻塞 FIFO 操作
此示例演示如何使用 os.O_NONBLOCK 标志在 FIFO 上执行非阻塞操作。 当您不想等待另一端时很有用。
import os import errno fifo_path = "/tmp/nonblock_fifo" try: os.mkfifo(fifo_path) # Open FIFO in non-blocking mode fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK) try: # Try to read - will raise ENXIO if no writers try: data = os.read(fd, 1024) print(f"Read: {data.decode()}") except OSError as e: if e.errno == errno.ENXIO: print("No writers connected to FIFO") else: raise finally: os.close(fd) finally: if os.path.exists(fifo_path): os.unlink(fifo_path)
这以非阻塞模式打开 FIFO 进行读取。 如果没有写入器连接,则读取操作将失败并显示 ENXIO 而不是阻塞。
非阻塞模式对于检查 FIFO 状态而不等待很有用,但需要仔细的错误处理才能正确操作。
双向通信
此示例演示使用两个 FIFO 的双向通信 - 每个方向一个。 请求-响应协议的常见模式。
import os import sys from multiprocessing import Process request_fifo = "/tmp/request_fifo" response_fifo = "/tmp/response_fifo" def server(): try: os.mkfifo(request_fifo) os.mkfifo(response_fifo) print("Server: FIFOs created") with open(request_fifo, "r") as req, open(response_fifo, "w") as resp: print("Server: Waiting for request...") message = req.read() print(f"Server received: {message}") resp.write(f"Response to: {message}") resp.flush() finally: for fifo in [request_fifo, response_fifo]: if os.path.exists(fifo): os.unlink(fifo) def client(): try: with open(request_fifo, "w") as req, open(response_fifo, "r") as resp: print("Client: Sending request...") req.write("Client request") req.flush() response = resp.read() print(f"Client received: {response}") except FileNotFoundError: print("Client: FIFOs not ready yet") if __name__ == "__main__": server_proc = Process(target=server) server_proc.start() import time time.sleep(1) # Give server time to create FIFOs client_proc = Process(target=client) client_proc.start() server_proc.join() client_proc.join()
服务器创建两个 FIFO - 一个用于请求,一个用于响应。 客户端发送请求并通过单独的管道等待响应。
这种模式在客户端-服务器架构中很常见,在客户端-服务器架构中,双方都需要独立发送和接收数据。
具有多个写入器的 FIFO
此示例演示了多个进程如何写入同一个 FIFO,读取器按顺序接收所有消息。
import os import sys from multiprocessing import Process import time fifo_path = "/tmp/multi_writer_fifo" def writer(id): try: with open(fifo_path, "w") as fifo: for i in range(3): msg = f"Writer {id} message {i}" fifo.write(msg + "\n") fifo.flush() time.sleep(0.5) except FileNotFoundError: print(f"Writer {id}: FIFO not found") def reader(): try: print("Reader: Waiting for messages...") with open(fifo_path, "r") as fifo: while True: line = fifo.readline() if not line: break print(f"Reader got: {line.strip()}") except FileNotFoundError: print("Reader: FIFO not found") if __name__ == "__main__": try: os.mkfifo(fifo_path) reader_proc = Process(target=reader) reader_proc.start() writers = [] for i in range(3): writer_proc = Process(target=writer, args=(i,)) writers.append(writer_proc) writer_proc.start() time.sleep(0.2) for w in writers: w.join() # Send EOF to reader with open(fifo_path, "w") as fifo: pass reader_proc.join() finally: if os.path.exists(fifo_path): os.unlink(fifo_path)
三个写入器进程将消息发送到单个 FIFO。 读取器按写入顺序接收所有消息。 每个写入器发送三个消息。
请注意,当写入小于 PIPE_BUF(在 Linux 上通常为 4096 字节)时,FIFO 会保留消息边界。 较大的写入可能会交错。
带有 Select 的 FIFO
这个高级示例演示了如何使用 select.select() 监视多个 FIFO 的可读性。 对于复用 I/O 操作很有用。
import os import select import sys from multiprocessing import Process fifo1 = "/tmp/select_fifo1" fifo2 = "/tmp/select_fifo2" def writer(fifo_path, messages): try: os.mkfifo(fifo_path) with open(fifo_path, "w") as f: for msg in messages: f.write(msg + "\n") f.flush() import time time.sleep(1) finally: if os.path.exists(fifo_path): os.unlink(fifo_path) def reader(): try: os.mkfifo(fifo1) os.mkfifo(fifo2) # Open FIFOs in non-blocking mode fd1 = os.open(fifo1, os.O_RDONLY | os.O_NONBLOCK) fd2 = os.open(fifo2, os.O_RDONLY | os.O_NONBLOCK) try: while True: # Wait for data on either FIFO rlist, _, _ = select.select([fd1, fd2], [], [], 5.0) if not rlist: print("Timeout waiting for data") break for fd in rlist: if fd == fd1: data = os.read(fd1, 1024) print(f"FIFO1: {data.decode().strip()}") elif fd == fd2: data = os.read(fd2, 1024) print(f"FIFO2: {data.decode().strip()}") finally: os.close(fd1) os.close(fd2) finally: for fifo in [fifo1, fifo2]: if os.path.exists(fifo): os.unlink(fifo) if __name__ == "__main__": reader_proc = Process(target=reader) reader_proc.start() import time time.sleep(1) # Let reader create FIFOs writer1 = Process(target=writer, args=(fifo1, ["A1", "A2", "A3"])) writer2 = Process(target=writer, args=(fifo2, ["B1", "B2", "B3"])) writer1.start() writer2.start() writer1.join() writer2.join() reader_proc.join()
读取器使用 select() 同时监视两个 FIFO。 每个写入器将其消息发送到各自的 FIFO。 读取器在收到消息时处理消息。
Select 可以有效地监视多个文件描述符。 如果没有数据到达,timeout 参数可防止无限期阻塞。
安全注意事项
- 权限控制: 对敏感 FIFO 设置限制性权限
- 清理: 不再需要 FIFO 时,务必将其删除
- 位置: 在 /tmp 等安全目录中创建 FIFO
- 竞争条件: 注意 FIFO 创建的 TOCTOU 问题
- 平台限制: FIFO 在某些系统上可能具有大小限制
最佳实践
- 错误处理: 始终处理 FileExistsError 和其他异常
- 资源管理: 对 FIFO 文件对象使用上下文管理器
- 清理: 在 finally 块中实现正确的清理
- 权限: 使用 mode 参数设置适当的权限
- 非阻塞操作: 考虑对响应式应用程序使用非阻塞模式
- 消息边界: 保持写入小于 PIPE_BUF 以避免交错
- 监视: 使用 select() 或类似方法有效地处理多个 FIFO
资料来源
作者
列出所有 Python 教程。