Python sqlite3.Connection.interrupt 方法
上次修改时间:2025 年 4 月 15 日
本综合指南探讨了 Python 的 sqlite3.Connection.interrupt 方法,该方法允许中断长时间运行的 SQLite 查询。我们将介绍基本用法、实际示例和常见模式。
基本定义
interrupt 方法在连接对象上调用,以中止任何挂起的数据库操作。它会导致任何当前正在执行的 SQL 语句引发 OperationalError 异常。
主要特点:可以从任何线程调用,影响连接上的所有游标,并且是线程安全的。该方法对于实现查询超时或取消非常有用。
基本中断用法
此示例显示了 interrupt 取消长时间运行的查询的最简单用法。
import sqlite3
import threading
import time
def long_running_query(conn):
with conn:
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM large_table")
print("Query completed successfully")
except sqlite3.OperationalError as e:
print("Query was interrupted:", e)
# Create database and connection
with sqlite3.connect(':memory:') as conn:
conn.execute("CREATE TABLE large_table AS SELECT 1 a FROM generate_series(1, 1000000)")
# Start query in separate thread
thread = threading.Thread(target=long_running_query, args=(conn,))
thread.start()
# Wait briefly then interrupt
time.sleep(0.1)
conn.interrupt()
thread.join()
此示例创建一个大表并在单独的线程中启动一个查询。主线程在短暂延迟后中断查询。中断时,查询引发 OperationalError 异常。
从任何线程调用 interrupt 方法都是安全的,即使另一个线程正在执行查询。
超时实现
此示例演示了如何使用带有计时器线程的 interrupt 实现查询超时。
import sqlite3
import threading
def set_timeout(conn, seconds):
def interrupt():
conn.interrupt()
timer = threading.Timer(seconds, interrupt)
timer.start()
return timer
with sqlite3.connect(':memory:') as conn:
# Create test data
conn.execute("CREATE TABLE test AS SELECT 1 a FROM generate_series(1, 100000)")
try:
# Set 1 second timeout
timer = set_timeout(conn, 1.0)
cursor = conn.cursor()
cursor.execute("SELECT * FROM test JOIN test t2")
print("Query completed before timeout")
except sqlite3.OperationalError:
print("Query timed out")
finally:
timer.cancel()
此示例创建一个计时器线程,该线程在指定的超时后调用 interrupt。如果查询花费的时间过长,它将被中断。
计时器在 finally 块中取消,以确保在查询在超时之前完成时进行清理。
中断多个游标
此示例显示了 interrupt 如何影响连接上的所有游标。
import sqlite3
import threading
import time
def run_query(conn, query):
with conn:
cursor = conn.cursor()
try:
cursor.execute(query)
print(f"Query '{query}' completed")
except sqlite3.OperationalError:
print(f"Query '{query}' interrupted")
with sqlite3.connect(':memory:') as conn:
# Create test data
conn.execute("CREATE TABLE data AS SELECT 1 a FROM generate_series(1, 1000000)")
# Start two queries
t1 = threading.Thread(target=run_query, args=(conn, "SELECT * FROM data"))
t2 = threading.Thread(target=run_query, args=(conn, "SELECT COUNT(*) FROM data"))
t1.start()
t2.start()
# Interrupt both queries
time.sleep(0.1)
conn.interrupt()
t1.join()
t2.join()
此示例在同一连接上启动两个查询。调用 interrupt 会同时影响两个查询,表明它在连接级别运行。
调用此方法时,连接上的所有活动游标都将收到中断。
优雅的查询取消
此示例显示了如何使用 interrupt 实现优雅的取消模式。
import sqlite3
import threading
import time
class QueryExecutor:
def __init__(self, conn):
self.conn = conn
self.cancel_flag = False
def execute(self, query):
with self.conn:
cursor = self.conn.cursor()
try:
cursor.execute(query)
while not self.cancel_flag and cursor.fetchone():
pass
if self.cancel_flag:
print("Query canceled gracefully")
else:
print("Query completed")
except sqlite3.OperationalError:
print("Query interrupted")
with sqlite3.connect(':memory:') as conn:
conn.execute("CREATE TABLE data AS SELECT 1 a FROM generate_series(1, 1000000)")
executor = QueryExecutor(conn)
# Start query
thread = threading.Thread(target=executor.execute,
args=("SELECT * FROM data",))
thread.start()
# Cancel after delay
time.sleep(0.1)
executor.cancel_flag = True
conn.interrupt()
thread.join()
此示例将 interrupt 与取消标志结合使用,以实现更优雅的处理。查询在处理期间检查该标志。
该模式允许在完全中止查询之前进行清理操作,从而提供比立即中断更好的控制。
事务期间中断
此示例演示了事务期间 interrupt 的行为。
import sqlite3
import threading
import time
def run_transaction(conn):
with conn:
cursor = conn.cursor()
try:
cursor.execute("BEGIN")
cursor.execute("INSERT INTO test VALUES (1)")
time.sleep(1) # Simulate long operation
cursor.execute("INSERT INTO test VALUES (2)")
conn.commit()
print("Transaction committed")
except sqlite3.OperationalError:
print("Transaction interrupted")
conn.rollback()
with sqlite3.connect(':memory:') as conn:
conn.execute("CREATE TABLE test (id INTEGER)")
# Start transaction
thread = threading.Thread(target=run_transaction, args=(conn,))
thread.start()
# Interrupt during transaction
time.sleep(0.5)
conn.interrupt()
thread.join()
# Verify state
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM test")
print("Rows in table:", cursor.fetchone()[0])
此示例表明中断事务会导致它回滚。 interrupt 方法通过中止整个事务来保持数据库一致性。
最终计数显示未插入任何行,确认发生了回滚。
中断与连接池
此示例演示了如何在多线程应用程序中使用带有连接池的 interrupt。
import sqlite3
import threading
from queue import Queue
import time
class ConnectionPool:
def __init__(self, db_path, size=5):
self.pool = Queue(size)
for _ in range(size):
conn = sqlite3.connect(db_path)
self.pool.put(conn)
def get_connection(self):
return self.pool.get()
def return_connection(self, conn):
self.pool.put(conn)
def worker(pool, query):
conn = pool.get_connection()
try:
cursor = conn.cursor()
try:
cursor.execute(query)
print("Query completed")
except sqlite3.OperationalError:
print("Query interrupted")
finally:
pool.return_connection(conn)
# Setup
with sqlite3.connect('pool.db') as conn:
conn.execute("CREATE TABLE IF NOT EXISTS data (id INTEGER)")
conn.execute("INSERT INTO data VALUES (1)")
pool = ConnectionPool('pool.db')
# Start workers
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(pool, "SELECT * FROM data, data d2"))
t.start()
threads.append(t)
# Interrupt all connections
time.sleep(0.1)
for _ in range(3):
conn = pool.get_connection()
conn.interrupt()
pool.return_connection(conn)
for t in threads:
t.join()
此示例显示了一个连接池,其中每个连接都可以独立中断。该池有效地管理有限的数据库连接。
每个工作线程都从池中获取一个连接,我们中断所有连接以演示该模式。
中断与进度回调
此示例将 interrupt 与进度处理程序结合使用,以实现更复杂的控制。
import sqlite3
import threading
import time
class QueryMonitor:
def __init__(self, conn):
self.conn = conn
self.should_interrupt = False
conn.set_progress_handler(self.progress_handler, 1000)
def progress_handler(self):
if self.should_interrupt:
return 1 # Non-zero return interrupts operation
return 0
def execute_query(self, query):
cursor = self.conn.cursor()
try:
cursor.execute(query)
print("Query completed")
except sqlite3.OperationalError:
print("Query interrupted")
with sqlite3.connect(':memory:') as conn:
conn.execute("CREATE TABLE data AS SELECT 1 a FROM generate_series(1, 1000000)")
monitor = QueryMonitor(conn)
# Start query
thread = threading.Thread(target=monitor.execute_query,
args=("SELECT * FROM data",))
thread.start()
# Set interrupt flag after delay
time.sleep(0.1)
monitor.should_interrupt = True
thread.join()
此示例将 SQLite 的进度处理程序与 interrupt 一起使用。处理程序定期检查一个标志,并可以触发中断。
与单独使用 interrupt 相比,进度处理程序方法在查询执行期间提供更频繁的中断点。
最佳实践
- 谨慎使用:中断应该是特殊情况
- 清理资源:确保中断后正确清理
- 处理错误:始终捕获来自中断的 OperationalError 异常
- 考虑替代方案:超时可能比中断更好
- 记录用法:在你的 API 中明确中断行为
资料来源
作者
列出所有 Python 教程。