ZetCode

Python sqlite3.Connection.set_progress_handler 方法

上次修改时间:2025 年 4 月 15 日

本综合指南探讨了 Python 的 sqlite3.Connection.set_progress_handler 方法,该方法允许监视长时间运行的 SQLite 操作。我们将介绍基本用法、实际示例和常见模式。

基本定义

set_progress_handler 方法注册一个回调函数,SQLite 将在长时间运行的操作期间定期调用该函数。 这使得能够进行进度监视和潜在的取消。

主要特征:回调每 N 个 SQLite 虚拟机操作调用一次,不接收任何参数,并且可以返回非零值以中止当前操作。 它对于长时间的查询或更新非常有用。

基本进度处理程序

此示例显示了 set_progress_handler 的最简单用法,用于监视数据库操作。

basic_handler.py
import sqlite3

def progress_callback():
    print("SQLite operation in progress...")
    return 0  # Return 0 to continue, non-zero to abort

with sqlite3.connect(':memory:') as conn:
    conn.set_progress_handler(progress_callback, 1000)
    cursor = conn.cursor()
    
    # Create a large table to trigger progress callbacks
    cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
    cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                      [(i, 'x'*100) for i in range(10000)])

每 1000 个 SQLite 虚拟机操作调用一次进度回调。 回调打印一条消息,但不中止操作(返回 0)。

此模式对于在交互式应用程序中的长时间运行的数据库操作期间提供反馈非常有用。

中止长时间的操作

此示例演示了如何使用进度处理程序中止长时间运行的操作。

abort_operation.py
import sqlite3
import time

class OperationAborted(Exception):
    pass

def progress_callback():
    print("Operation taking too long, aborting...")
    return 1  # Non-zero return aborts the operation

with sqlite3.connect(':memory:') as conn:
    conn.set_progress_handler(progress_callback, 1000)
    cursor = conn.cursor()
    
    try:
        cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
        # This insert will be aborted by the progress handler
        cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                          [(i, 'x'*100) for i in range(100000)])
    except sqlite3.OperationalError as e:
        print(f"Operation aborted: {e}")

进度回调在第一次调用后返回 1,导致 SQLite 中止当前操作。 这会引发 OperationalError

此技术对于在应用程序中实现查询超时或用户可取消的操作很有价值。

进度条实现

以下是如何使用进度处理程序实现简单的进度条。

progress_bar.py
import sqlite3
import sys

class ProgressTracker:
    def __init__(self, total_ops):
        self.count = 0
        self.total_ops = total_ops
    
    def __call__(self):
        self.count += 1
        progress = min(100, int((self.count * 100) / self.total_ops))
        sys.stdout.write(f"\rProgress: {progress}%")
        sys.stdout.flush()
        return 0

with sqlite3.connect(':memory:') as conn:
    tracker = ProgressTracker(500)
    conn.set_progress_handler(tracker, 100)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
    cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                      [(i, 'x'*100) for i in range(5000)])
    print("\nOperation completed")

ProgressTracker 类维护回调调用之间的状态并更新进度百分比。 每 100 个操作调用一次回调。

这为用户在长时间的数据库操作期间提供视觉反馈,从而改善用户体验。

基于时间的进度监控

此示例显示了如何使用进度处理程序实现基于时间的的操作监控。

time_based.py
import sqlite3
import time

class TimeoutMonitor:
    def __init__(self, timeout_seconds):
        self.start_time = time.time()
        self.timeout = timeout_seconds
    
    def __call__(self):
        elapsed = time.time() - self.start_time
        if elapsed > self.timeout:
            print(f"\nTimeout after {elapsed:.1f} seconds")
            return 1  # Abort operation
        print(".", end="", flush=True)
        return 0

with sqlite3.connect(':memory:') as conn:
    monitor = TimeoutMonitor(2)  # 2 second timeout
    conn.set_progress_handler(monitor, 1000)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
    try:
        cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                          [(i, 'x'*100) for i in range(100000)])
    except sqlite3.OperationalError:
        print("\nOperation timed out")

如果 TimeoutMonitor 类跟踪经过的时间并超过指定的超时时间,则中止该操作。 点打印为进度指示器。

此模式对于在生产系统中强制执行数据库操作的最长执行时间非常有用。

多操作进度跟踪

此示例演示了跨多个数据库操作跟踪进度。

multi_operation.py
import sqlite3

class MultiOpTracker:
    def __init__(self):
        self.operation_count = 0
        self.total_steps = 0
    
    def __call__(self):
        self.total_steps += 1
        if self.total_steps % 1000 == 0:
            print(f"Step {self.total_steps} of operation {self.operation_count}")
        return 0
    
    def begin_operation(self, name):
        self.operation_count += 1
        print(f"Starting operation {self.operation_count}: {name}")

with sqlite3.connect(':memory:') as conn:
    tracker = MultiOpTracker()
    conn.set_progress_handler(tracker, 100)
    
    cursor = conn.cursor()
    
    tracker.begin_operation("Create table")
    cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
    
    tracker.begin_operation("Insert data")
    cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                      [(i, 'x'*100) for i in range(5000)])
    
    tracker.begin_operation("Create index")
    cursor.execute("CREATE INDEX idx_test ON test(id)")

MultiOpTracker 维护跨多个操作的计数,从而提供详细的进度信息。 每个操作都单独跟踪。

这种方法对于由需要进度跟踪的多个步骤组成的复杂数据库操作非常有价值。

线程安全进度报告

此示例显示了如何使用进度处理程序实现线程安全进度报告。

thread_safe.py
import sqlite3
import threading

class ThreadSafeProgress:
    def __init__(self):
        self.lock = threading.Lock()
        self.counter = 0
    
    def __call__(self):
        with self.lock:
            self.counter += 1
            if self.counter % 1000 == 0:
                print(f"Operations completed: {self.counter}")
        return 0

def worker(db_path, progress):
    with sqlite3.connect(db_path) as conn:
        conn.set_progress_handler(progress, 100)
        cursor = conn.cursor()
        cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                          [(i, 'x'*100) for i in range(5000)])

with sqlite3.connect(':memory:') as main_conn:
    cursor = main_conn.cursor()
    cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
    
    progress = ThreadSafeProgress()
    threads = []
    
    for i in range(3):
        t = threading.Thread(target=worker, args=(':memory:', progress))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()

当从多个线程访问时,ThreadSafeProgress 类使用锁来保护共享状态。 每个线程通过同一处理程序实例报告进度。

在多线程应用程序中使用进度处理程序以防止进度报告中的竞争条件时,此模式至关重要。

具有状态的自定义进度处理程序

此高级示例演示了具有复杂状态管理和外部控制的进度处理程序。

stateful_handler.py
import sqlite3
import time

class StatefulProgress:
    def __init__(self, interval=1.0):
        self.last_report = 0
        self.interval = interval
        self.ops_count = 0
        self.should_abort = False
    
    def __call__(self):
        self.ops_count += 1
        now = time.time()
        
        if now - self.last_report >= self.interval:
            print(f"Operations: {self.ops_count}, Abort: {self.should_abort}")
            self.last_report = now
        
        return 1 if self.should_abort else 0

with sqlite3.connect(':memory:') as conn:
    progress = StatefulProgress(0.5)  # Report every 0.5 seconds
    conn.set_progress_handler(progress, 1000)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE test (id INTEGER, data TEXT)")
    
    try:
        # Start long-running operation
        cursor.executemany("INSERT INTO test VALUES (?, ?)", 
                          [(i, 'x'*100) for i in range(100000)])
    except sqlite3.OperationalError:
        print("Operation aborted by user")
    
    # Simulate user abort after 3 seconds
    time.sleep(3)
    progress.should_abort = True

StatefulProgress 类维护详细的操作状态,并允许外部控制中止。 它以规则的时间间隔而不是操作计数来报告进度。

这种高级模式支持需要外部控制或复杂状态管理的复杂进度监视方案。

最佳实践

资料来源

作者

我的名字是 Jan Bodnar,我是一位充满热情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直在撰写编程文章。 迄今为止,我撰写了 1,400 多篇文章和 8 本电子书。 我在编程教学方面拥有超过十年的经验。

列出所有 Python 教程