ZetCode

Python sqlite3.Connection.interrupt 方法

上次修改时间:2025 年 4 月 15 日

本综合指南探讨了 Python 的 sqlite3.Connection.interrupt 方法,该方法允许中断长时间运行的 SQLite 查询。我们将介绍基本用法、实际示例和常见模式。

基本定义

interrupt 方法在连接对象上调用,以中止任何挂起的数据库操作。它会导致任何当前正在执行的 SQL 语句引发 OperationalError 异常。

主要特点:可以从任何线程调用,影响连接上的所有游标,并且是线程安全的。该方法对于实现查询超时或取消非常有用。

基本中断用法

此示例显示了 interrupt 取消长时间运行的查询的最简单用法。

basic_interrupt.py
import sqlite3
import threading
import time

def long_running_query(conn):
    with conn:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM large_table")
            print("Query completed successfully")
        except sqlite3.OperationalError as e:
            print("Query was interrupted:", e)

# Create database and connection
with sqlite3.connect(':memory:') as conn:
    conn.execute("CREATE TABLE large_table AS SELECT 1 a FROM generate_series(1, 1000000)")
    
    # Start query in separate thread
    thread = threading.Thread(target=long_running_query, args=(conn,))
    thread.start()
    
    # Wait briefly then interrupt
    time.sleep(0.1)
    conn.interrupt()
    thread.join()

此示例创建一个大表并在单独的线程中启动一个查询。主线程在短暂延迟后中断查询。中断时,查询引发 OperationalError 异常。

从任何线程调用 interrupt 方法都是安全的,即使另一个线程正在执行查询。

超时实现

此示例演示了如何使用带有计时器线程的 interrupt 实现查询超时。

query_timeout.py
import sqlite3
import threading

def set_timeout(conn, seconds):
    def interrupt():
        conn.interrupt()
    timer = threading.Timer(seconds, interrupt)
    timer.start()
    return timer

with sqlite3.connect(':memory:') as conn:
    # Create test data
    conn.execute("CREATE TABLE test AS SELECT 1 a FROM generate_series(1, 100000)")
    
    try:
        # Set 1 second timeout
        timer = set_timeout(conn, 1.0)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM test JOIN test t2")
        print("Query completed before timeout")
    except sqlite3.OperationalError:
        print("Query timed out")
    finally:
        timer.cancel()

此示例创建一个计时器线程,该线程在指定的超时后调用 interrupt。如果查询花费的时间过长,它将被中断。

计时器在 finally 块中取消,以确保在查询在超时之前完成时进行清理。

中断多个游标

此示例显示了 interrupt 如何影响连接上的所有游标。

multiple_cursors.py
import sqlite3
import threading
import time

def run_query(conn, query):
    with conn:
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            print(f"Query '{query}' completed")
        except sqlite3.OperationalError:
            print(f"Query '{query}' interrupted")

with sqlite3.connect(':memory:') as conn:
    # Create test data
    conn.execute("CREATE TABLE data AS SELECT 1 a FROM generate_series(1, 1000000)")
    
    # Start two queries
    t1 = threading.Thread(target=run_query, args=(conn, "SELECT * FROM data"))
    t2 = threading.Thread(target=run_query, args=(conn, "SELECT COUNT(*) FROM data"))
    t1.start()
    t2.start()
    
    # Interrupt both queries
    time.sleep(0.1)
    conn.interrupt()
    t1.join()
    t2.join()

此示例在同一连接上启动两个查询。调用 interrupt 会同时影响两个查询,表明它在连接级别运行。

调用此方法时,连接上的所有活动游标都将收到中断。

优雅的查询取消

此示例显示了如何使用 interrupt 实现优雅的取消模式。

graceful_cancel.py
import sqlite3
import threading
import time

class QueryExecutor:
    def __init__(self, conn):
        self.conn = conn
        self.cancel_flag = False
        
    def execute(self, query):
        with self.conn:
            cursor = self.conn.cursor()
            try:
                cursor.execute(query)
                while not self.cancel_flag and cursor.fetchone():
                    pass
                if self.cancel_flag:
                    print("Query canceled gracefully")
                else:
                    print("Query completed")
            except sqlite3.OperationalError:
                print("Query interrupted")

with sqlite3.connect(':memory:') as conn:
    conn.execute("CREATE TABLE data AS SELECT 1 a FROM generate_series(1, 1000000)")
    executor = QueryExecutor(conn)
    
    # Start query
    thread = threading.Thread(target=executor.execute, 
                            args=("SELECT * FROM data",))
    thread.start()
    
    # Cancel after delay
    time.sleep(0.1)
    executor.cancel_flag = True
    conn.interrupt()
    thread.join()

此示例将 interrupt 与取消标志结合使用,以实现更优雅的处理。查询在处理期间检查该标志。

该模式允许在完全中止查询之前进行清理操作,从而提供比立即中断更好的控制。

事务期间中断

此示例演示了事务期间 interrupt 的行为。

transaction_interrupt.py
import sqlite3
import threading
import time

def run_transaction(conn):
    with conn:
        cursor = conn.cursor()
        try:
            cursor.execute("BEGIN")
            cursor.execute("INSERT INTO test VALUES (1)")
            time.sleep(1)  # Simulate long operation
            cursor.execute("INSERT INTO test VALUES (2)")
            conn.commit()
            print("Transaction committed")
        except sqlite3.OperationalError:
            print("Transaction interrupted")
            conn.rollback()

with sqlite3.connect(':memory:') as conn:
    conn.execute("CREATE TABLE test (id INTEGER)")
    
    # Start transaction
    thread = threading.Thread(target=run_transaction, args=(conn,))
    thread.start()
    
    # Interrupt during transaction
    time.sleep(0.5)
    conn.interrupt()
    thread.join()
    
    # Verify state
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM test")
    print("Rows in table:", cursor.fetchone()[0])

此示例表明中断事务会导致它回滚。 interrupt 方法通过中止整个事务来保持数据库一致性。

最终计数显示未插入任何行,确认发生了回滚。

中断与连接池

此示例演示了如何在多线程应用程序中使用带有连接池的 interrupt

connection_pool.py
import sqlite3
import threading
from queue import Queue
import time

class ConnectionPool:
    def __init__(self, db_path, size=5):
        self.pool = Queue(size)
        for _ in range(size):
            conn = sqlite3.connect(db_path)
            self.pool.put(conn)
            
    def get_connection(self):
        return self.pool.get()
    
    def return_connection(self, conn):
        self.pool.put(conn)

def worker(pool, query):
    conn = pool.get_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            print("Query completed")
        except sqlite3.OperationalError:
            print("Query interrupted")
    finally:
        pool.return_connection(conn)

# Setup
with sqlite3.connect('pool.db') as conn:
    conn.execute("CREATE TABLE IF NOT EXISTS data (id INTEGER)")
    conn.execute("INSERT INTO data VALUES (1)")

pool = ConnectionPool('pool.db')

# Start workers
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(pool, "SELECT * FROM data, data d2"))
    t.start()
    threads.append(t)

# Interrupt all connections
time.sleep(0.1)
for _ in range(3):
    conn = pool.get_connection()
    conn.interrupt()
    pool.return_connection(conn)

for t in threads:
    t.join()

此示例显示了一个连接池,其中每个连接都可以独立中断。该池有效地管理有限的数据库连接。

每个工作线程都从池中获取一个连接,我们中断所有连接以演示该模式。

中断与进度回调

此示例将 interrupt 与进度处理程序结合使用,以实现更复杂的控制。

progress_callback.py
import sqlite3
import threading
import time

class QueryMonitor:
    def __init__(self, conn):
        self.conn = conn
        self.should_interrupt = False
        conn.set_progress_handler(self.progress_handler, 1000)
        
    def progress_handler(self):
        if self.should_interrupt:
            return 1  # Non-zero return interrupts operation
        return 0
    
    def execute_query(self, query):
        cursor = self.conn.cursor()
        try:
            cursor.execute(query)
            print("Query completed")
        except sqlite3.OperationalError:
            print("Query interrupted")

with sqlite3.connect(':memory:') as conn:
    conn.execute("CREATE TABLE data AS SELECT 1 a FROM generate_series(1, 1000000)")
    monitor = QueryMonitor(conn)
    
    # Start query
    thread = threading.Thread(target=monitor.execute_query,
                            args=("SELECT * FROM data",))
    thread.start()
    
    # Set interrupt flag after delay
    time.sleep(0.1)
    monitor.should_interrupt = True
    thread.join()

此示例将 SQLite 的进度处理程序与 interrupt 一起使用。处理程序定期检查一个标志,并可以触发中断。

与单独使用 interrupt 相比,进度处理程序方法在查询执行期间提供更频繁的中断点。

最佳实践

资料来源

作者

我叫 Jan Bodnar,我是一位充满热情的程序员,拥有丰富的编程经验。 我从 2007 年开始撰写编程文章。到目前为止,我已经撰写了超过 1,400 篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程