ZetCode

Python os.mkfifo 函数

上次修改时间:2025 年 4 月 11 日

这份全面的指南探讨了 Python 的 os.mkfifo 函数,该函数创建用于进程间通信的命名管道 (FIFO)。我们将介绍 FIFO 的创建、权限和实际的 IPC 示例。

基本定义

os.mkfifo 函数在文件系统中创建一个命名管道 (FIFO)。 FIFO 允许不相关的进程相互通信。

关键参数:path(FIFO 位置),mode(权限位,默认为 0o666)。 在类 Unix 系统上,FIFO 显示为特殊文件,但不存储数据。

创建基本 FIFO

os.mkfifo 最简单的用法是创建一个具有默认权限的命名管道。 此示例演示基本的 FIFO 创建和清理。

basic_fifo.py
import os
import sys

fifo_path = "/tmp/my_fifo"

try:
    # Create FIFO with default permissions (0o666)
    os.mkfifo(fifo_path)
    print(f"Created FIFO at {fifo_path}")
    
    # FIFO exists now - could open and use it here
    
except FileExistsError:
    print(f"FIFO {fifo_path} already exists", file=sys.stderr)
except OSError as e:
    print(f"Error creating FIFO: {e}", file=sys.stderr)
finally:
    # Clean up
    if os.path.exists(fifo_path):
        os.unlink(fifo_path)
        print(f"Removed {fifo_path}")

此示例创建一个 FIFO,处理潜在的错误,并确保清理。 FIFO 在创建后会被删除,以演示正确的资源管理。

请注意,即使创建进程退出,FIFO 也会持续存在,直到显式删除。 不再需要 FIFO 时,请务必进行清理。

设置自定义权限

mode 参数控制 FIFO 权限。 此示例演示如何设置特定权限并使用 os.stat 验证它们。

fifo_permissions.py
import os
import stat

fifo_path = "/tmp/secure_fifo"
permissions = 0o600  # Read/write for owner only

try:
    # Create FIFO with restricted permissions
    os.mkfifo(fifo_path, mode=permissions)
    print(f"Created secure FIFO at {fifo_path}")
    
    # Verify permissions
    st = os.stat(fifo_path)
    actual_mode = stat.S_IMODE(st.st_mode)
    print(f"Requested permissions: {oct(permissions)}")
    print(f"Actual permissions: {oct(actual_mode)}")
    
    if actual_mode == permissions:
        print("Permissions match")
    else:
        print("Warning: Permissions don't match request")
        
finally:
    if os.path.exists(fifo_path):
        os.unlink(fifo_path)

这将创建一个具有 600 权限(仅所有者读/写)的 FIFO。 使用 os.stat 验证实际权限,以确保它们匹配。

请注意,umask 会影响最终权限。 实际模式将为 mode & ~umask。 如果需要,可以使用 os.umask() 来控制此设置。

使用 FIFO 的简单 IPC

此示例演示使用 FIFO 的基本进程间通信。 一个进程写入 FIFO,而另一个进程从 FIFO 读取。

simple_ipc.py
import os
import sys
from multiprocessing import Process

fifo_path = "/tmp/ipc_fifo"

def writer():
    try:
        os.mkfifo(fifo_path)
        print("Writer: FIFO created")
        
        with open(fifo_path, "w") as fifo:
            print("Writer: Sending message...")
            fifo.write("Hello from writer!")
            fifo.flush()
            
    finally:
        if os.path.exists(fifo_path):
            os.unlink(fifo_path)

def reader():
    try:
        print("Reader: Waiting for message...")
        with open(fifo_path, "r") as fifo:
            message = fifo.read()
            print(f"Reader received: {message}")
            
    except FileNotFoundError:
        print("Reader: FIFO not found yet")

if __name__ == "__main__":
    # Start reader first (it will block waiting for writer)
    reader_proc = Process(target=reader)
    reader_proc.start()
    
    # Start writer after a short delay
    import time
    time.sleep(1)
    writer_proc = Process(target=writer)
    writer_proc.start()
    
    # Wait for processes to complete
    writer_proc.join()
    reader_proc.join()

写入器创建 FIFO,发送消息,然后退出。 读取器会阻塞,直到 FIFO 中有可用数据。 多进程模拟单独的进程。

请注意,打开 FIFO 以进行读取会阻塞,直到另一个进程打开它以进行写入,反之亦然。 这是正常的 FIFO 行为。

非阻塞 FIFO 操作

此示例演示如何使用 os.O_NONBLOCK 标志在 FIFO 上执行非阻塞操作。 当您不想等待另一端时很有用。

nonblocking_fifo.py
import os
import errno

fifo_path = "/tmp/nonblock_fifo"

try:
    os.mkfifo(fifo_path)
    
    # Open FIFO in non-blocking mode
    fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        # Try to read - will raise ENXIO if no writers
        try:
            data = os.read(fd, 1024)
            print(f"Read: {data.decode()}")
        except OSError as e:
            if e.errno == errno.ENXIO:
                print("No writers connected to FIFO")
            else:
                raise
                
    finally:
        os.close(fd)
        
finally:
    if os.path.exists(fifo_path):
        os.unlink(fifo_path)

这以非阻塞模式打开 FIFO 进行读取。 如果没有写入器连接,则读取操作将失败并显示 ENXIO 而不是阻塞。

非阻塞模式对于检查 FIFO 状态而不等待很有用,但需要仔细的错误处理才能正确操作。

双向通信

此示例演示使用两个 FIFO 的双向通信 - 每个方向一个。 请求-响应协议的常见模式。

bidirectional_fifo.py
import os
import sys
from multiprocessing import Process

request_fifo = "/tmp/request_fifo"
response_fifo = "/tmp/response_fifo"

def server():
    try:
        os.mkfifo(request_fifo)
        os.mkfifo(response_fifo)
        print("Server: FIFOs created")
        
        with open(request_fifo, "r") as req, open(response_fifo, "w") as resp:
            print("Server: Waiting for request...")
            message = req.read()
            print(f"Server received: {message}")
            
            resp.write(f"Response to: {message}")
            resp.flush()
            
    finally:
        for fifo in [request_fifo, response_fifo]:
            if os.path.exists(fifo):
                os.unlink(fifo)

def client():
    try:
        with open(request_fifo, "w") as req, open(response_fifo, "r") as resp:
            print("Client: Sending request...")
            req.write("Client request")
            req.flush()
            
            response = resp.read()
            print(f"Client received: {response}")
            
    except FileNotFoundError:
        print("Client: FIFOs not ready yet")

if __name__ == "__main__":
    server_proc = Process(target=server)
    server_proc.start()
    
    import time
    time.sleep(1)  # Give server time to create FIFOs
    
    client_proc = Process(target=client)
    client_proc.start()
    
    server_proc.join()
    client_proc.join()

服务器创建两个 FIFO - 一个用于请求,一个用于响应。 客户端发送请求并通过单独的管道等待响应。

这种模式在客户端-服务器架构中很常见,在客户端-服务器架构中,双方都需要独立发送和接收数据。

具有多个写入器的 FIFO

此示例演示了多个进程如何写入同一个 FIFO,读取器按顺序接收所有消息。

multi_writer_fifo.py
import os
import sys
from multiprocessing import Process
import time

fifo_path = "/tmp/multi_writer_fifo"

def writer(id):
    try:
        with open(fifo_path, "w") as fifo:
            for i in range(3):
                msg = f"Writer {id} message {i}"
                fifo.write(msg + "\n")
                fifo.flush()
                time.sleep(0.5)
    except FileNotFoundError:
        print(f"Writer {id}: FIFO not found")

def reader():
    try:
        print("Reader: Waiting for messages...")
        with open(fifo_path, "r") as fifo:
            while True:
                line = fifo.readline()
                if not line:
                    break
                print(f"Reader got: {line.strip()}")
    except FileNotFoundError:
        print("Reader: FIFO not found")

if __name__ == "__main__":
    try:
        os.mkfifo(fifo_path)
        
        reader_proc = Process(target=reader)
        reader_proc.start()
        
        writers = []
        for i in range(3):
            writer_proc = Process(target=writer, args=(i,))
            writers.append(writer_proc)
            writer_proc.start()
            time.sleep(0.2)
            
        for w in writers:
            w.join()
            
        # Send EOF to reader
        with open(fifo_path, "w") as fifo:
            pass
            
        reader_proc.join()
        
    finally:
        if os.path.exists(fifo_path):
            os.unlink(fifo_path)

三个写入器进程将消息发送到单个 FIFO。 读取器按写入顺序接收所有消息。 每个写入器发送三个消息。

请注意,当写入小于 PIPE_BUF(在 Linux 上通常为 4096 字节)时,FIFO 会保留消息边界。 较大的写入可能会交错。

带有 Select 的 FIFO

这个高级示例演示了如何使用 select.select() 监视多个 FIFO 的可读性。 对于复用 I/O 操作很有用。

select_fifo.py
import os
import select
import sys
from multiprocessing import Process

fifo1 = "/tmp/select_fifo1"
fifo2 = "/tmp/select_fifo2"

def writer(fifo_path, messages):
    try:
        os.mkfifo(fifo_path)
        with open(fifo_path, "w") as f:
            for msg in messages:
                f.write(msg + "\n")
                f.flush()
                import time
                time.sleep(1)
    finally:
        if os.path.exists(fifo_path):
            os.unlink(fifo_path)

def reader():
    try:
        os.mkfifo(fifo1)
        os.mkfifo(fifo2)
        
        # Open FIFOs in non-blocking mode
        fd1 = os.open(fifo1, os.O_RDONLY | os.O_NONBLOCK)
        fd2 = os.open(fifo2, os.O_RDONLY | os.O_NONBLOCK)
        
        try:
            while True:
                # Wait for data on either FIFO
                rlist, _, _ = select.select([fd1, fd2], [], [], 5.0)
                
                if not rlist:
                    print("Timeout waiting for data")
                    break
                    
                for fd in rlist:
                    if fd == fd1:
                        data = os.read(fd1, 1024)
                        print(f"FIFO1: {data.decode().strip()}")
                    elif fd == fd2:
                        data = os.read(fd2, 1024)
                        print(f"FIFO2: {data.decode().strip()}")
                        
        finally:
            os.close(fd1)
            os.close(fd2)
            
    finally:
        for fifo in [fifo1, fifo2]:
            if os.path.exists(fifo):
                os.unlink(fifo)

if __name__ == "__main__":
    reader_proc = Process(target=reader)
    reader_proc.start()
    
    import time
    time.sleep(1)  # Let reader create FIFOs
    
    writer1 = Process(target=writer, args=(fifo1, ["A1", "A2", "A3"]))
    writer2 = Process(target=writer, args=(fifo2, ["B1", "B2", "B3"]))
    
    writer1.start()
    writer2.start()
    
    writer1.join()
    writer2.join()
    reader_proc.join()

读取器使用 select() 同时监视两个 FIFO。 每个写入器将其消息发送到各自的 FIFO。 读取器在收到消息时处理消息。

Select 可以有效地监视多个文件描述符。 如果没有数据到达,timeout 参数可防止无限期阻塞。

安全注意事项

最佳实践

资料来源

作者

我的名字是 Jan Bodnar,我是一位充满激情的程序员,拥有丰富的编程经验。 我自 2007 年以来一直在撰写编程文章。 迄今为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程