ZetCode

Python os.lockf 函数

上次修改时间:2025 年 4 月 11 日

本综合指南探讨了 Python 的 os.lockf 函数,该函数应用、测试或删除开放文件上的 POSIX 建议性锁。我们将涵盖锁类型、阻塞行为和实际的同步示例。

基本定义

os.lockf 函数操作打开的文件描述符上的文件锁。 它提供建议性锁定 - 进程必须通过检查锁来协作。 锁在文件关闭或进程退出时释放。

关键参数:fd(文件描述符)、cmd(锁操作:F_LOCK、F_TLOCK、F_ULOCK、F_TEST)、len(要锁定的字节数,0 表示 EOF)。 仅适用于类 Unix 系统。

基本文件锁定

此示例演示了获取和释放文件上的独占锁。 如果另一个进程持有锁,F_LOCK 命令会阻塞直到可以获取锁。

basic_lock.py
import os

file_path = "data.txt"

# Open file and acquire lock
with open(file_path, "w") as f:
    fd = f.fileno()
    os.lockf(fd, os.F_LOCK, 0)  # Lock entire file
    
    # Critical section
    f.write("Important data\n")
    
    # Lock automatically released when file closes
    print("Lock released automatically")

print("File closed and lock released")

该锁针对整个文件(len=0)获取,并在写入操作期间保持。 关闭文件时,锁会自动释放。

这确保一次只有一个进程可以写入文件,从而防止并发写入导致的数据损坏。

非阻塞锁定尝试

F_TLOCK 命令尝试获取锁而不阻塞。 如果锁不可用,它会引发 OSError 而不是等待。

nonblocking_lock.py
import os
import time

file_path = "shared.txt"

try:
    with open(file_path, "w") as f:
        fd = f.fileno()
        try:
            os.lockf(fd, os.F_TLOCK, 0)
            print("Lock acquired successfully")
            f.write("Exclusive data\n")
            time.sleep(2)  # Simulate work
        except OSError:
            print("Could not acquire lock - file is locked by another process")
except IOError as e:
    print(f"File error: {e}")

这尝试获取立即锁。 如果另一个进程持有锁,它会捕获 OSError 并继续,而不会阻塞程序。

当您想尝试一个操作,但不想无限期地等待资源时,非阻塞锁非常有用。

测试现有锁

F_TEST 命令检查是否可以获取锁,而无需实际获取它。 这对于在尝试操作之前测试锁状态很有用。

test_lock.py
import os

file_path = "status.log"

with open(file_path, "r+") as f:
    fd = f.fileno()
    
    try:
        os.lockf(fd, os.F_TEST, 0)
        print("File is not locked - safe to proceed")
        
        # Now acquire the lock for real
        os.lockf(fd, os.F_LOCK, 0)
        print("Lock acquired")
        
        # Read and update file
        data = f.read()
        f.seek(0)
        f.write(f"Updated: {data}")
        
    except OSError:
        print("File is currently locked by another process")

首先,我们测试文件是否被锁定,然后,如果可用,则继续进行实际锁定。 这种两步方法对于用户反馈很有用。

请注意,在测试和实际锁定之间仍然存在竞争条件 - 该文件可能在这些操作之间被另一个进程锁定。

部分文件锁定

我们可以锁定特定的字节范围,而不是锁定整个文件。 这允许不同的进程处理同一文件的不同部分。

partial_lock.py
import os

file_path = "database.bin"
record_size = 100

def update_record(record_num, data):
    with open(file_path, "r+b") as f:
        fd = f.fileno()
        offset = record_num * record_size
        
        # Lock only this record
        os.lockf(fd, os.F_LOCK, record_size)
        f.seek(offset)
        f.write(data)
        print(f"Updated record {record_num}")
        
        # Lock released when file closes

# Simulate updating different records
update_record(0, b"X" * 100)  # Lock first 100 bytes
update_record(1, b"Y" * 100)  # Lock next 100 bytes

每次调用仅锁定文件的特定 100 字节段,从而允许并发更新到不同的记录。 锁从当前文件位置开始。

这种技术对于数据库类应用程序非常有用,在这些应用程序中,不同的进程需要更新共享文件的不同部分。

进程同步

此示例演示了使用文件锁在多个进程之间进行协调,从而确保一次只有一个进程执行关键代码。

process_sync.py
import os
import time
from multiprocessing import Process

lock_file = "process.lock"

def worker(pid):
    print(f"Process {pid} starting")
    
    with open(lock_file, "w") as f:
        try:
            os.lockf(f.fileno(), os.F_LOCK, 0)
            print(f"Process {pid} acquired lock")
            time.sleep(2)  # Simulate work
            print(f"Process {pid} releasing lock")
        except OSError as e:
            print(f"Process {pid} lock failed: {e}")

# Create multiple processes
processes = [Process(target=worker, args=(i,)) for i in range(3)]
for p in processes:
    p.start()
for p in processes:
    p.join()

os.unlink(lock_file)

三个进程竞争锁。 第一个获取锁的进程执行其关键部分,而其他进程等待。 每个进程依次获得独占访问权限。

这种模式对于确保一次只有一个分布式作业实例跨共享文件系统的多台机器运行非常有用。

锁定超时模式

虽然 os.lockf 不直接支持超时,但我们可以通过将 F_TLOCK 与重试和延迟相结合来实现超时。

lock_timeout.py
import os
import time

file_path = "timed_lock.txt"
timeout = 5  # seconds
retry_interval = 0.1

def acquire_with_timeout():
    start_time = time.time()
    with open(file_path, "w") as f:
        fd = f.fileno()
        while True:
            try:
                os.lockf(fd, os.F_TLOCK, 0)
                print("Lock acquired")
                return True
            except OSError:
                if time.time() - start_time > timeout:
                    print("Timeout waiting for lock")
                    return False
                time.sleep(retry_interval)

if acquire_with_timeout():
    try:
        # Critical section
        print("Doing work...")
        time.sleep(1)
    finally:
        # Lock released when file closes
        pass
else:
    print("Failed to acquire lock within timeout")

这尝试以非阻塞方式获取锁,重试直到成功或超时到期。 重试间隔控制检查的频率。

当您需要限制等待资源的时间,然后再放弃或尝试其他方法时,此模式很有用。

解锁特定部分

F_ULOCK 命令释放先前获取的锁。 这比等待文件关闭可以进行更精细的控制,但必须与原始锁定范围匹配。

explicit_unlock.py
import os

file_path = "explicit.txt"

with open(file_path, "w+") as f:
    fd = f.fileno()
    
    # Lock first 50 bytes
    os.lockf(fd, os.F_LOCK, 50)
    f.write("Locked data")
    print("First section locked and written")
    
    # Explicitly unlock first section
    os.lockf(fd, os.F_ULOCK, 50)
    print("First section unlocked")
    
    # Now lock remaining file
    f.seek(50)
    os.lockf(fd, os.F_LOCK, 0)
    f.write("Second locked section")
    print("Remaining file locked and written")
    
    # Final unlock happens automatically

我们首先锁定并写入文件开头,然后在锁定其余部分之前显式释放该锁。 这显示了对锁定区域的细粒度控制。

当代码的不同部分需要独立于文件句柄生命周期来管理锁时,显式解锁非常有用。

安全注意事项

最佳实践

资料来源

作者

我叫 Jan Bodnar,是一位热情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直在撰写编程文章。 到目前为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程