Python sqlite3.Connection.rollback 方法
上次修改时间:2025 年 4 月 15 日
这份全面的指南探讨了 Python 的 sqlite3.Connection.rollback 方法,该方法对于 SQLite 数据库中的事务管理至关重要。
基本定义
rollback 方法会还原当前事务中所做的所有更改。 它将数据库恢复到事务开始之前的状态。
主要特性:它仅在非自动提交模式下有效,影响自上次提交以来的所有更改,并且通常在错误处理中使用以保持数据库一致性。
基本回滚示例
此示例演示了事务中回滚的基本用法。
import sqlite3
with sqlite3.connect('transactions.db') as conn:
try:
cursor = conn.cursor()
cursor.execute("INSERT INTO accounts (name, balance) VALUES (?, ?)",
('Bob', 1000))
# Simulate an error
raise ValueError("Something went wrong")
conn.commit()
except Exception as e:
print(f"Error occurred: {e}")
conn.rollback()
print("Transaction rolled back")
在此示例中,我们通过执行插入操作来启动事务。 发生错误时,我们会捕获它并调用 rollback 以撤消插入。
with 语句确保正确关闭连接,而 try-except 块处理错误时的事务回滚。
多操作事务中的回滚
此示例显示回滚如何影响事务中的多个操作。
import sqlite3
with sqlite3.connect('bank.db') as conn:
conn.execute("PRAGMA foreign_keys = ON") # Enable foreign key constraints
try:
cursor = conn.cursor()
# First operation
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
# Second operation
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Verify balances
cursor.execute("SELECT balance FROM accounts WHERE id = 1")
balance = cursor.fetchone()[0]
if balance < 0:
raise ValueError("Insufficient funds")
conn.commit()
except Exception as e:
conn.rollback()
print(f"Transaction failed: {e}. All changes reverted.")
在这里,我们执行两个更新操作作为资金转移的一部分。 如果任何检查失败或发生错误,则两个更新将一起回滚。
这演示了原子事务行为 - 要么所有操作都成功,要么都不成功。 外键约束确保引用完整性。
使用上下文管理器的回滚
此示例显示了上下文管理器如何简化回滚处理。
import sqlite3
from contextlib import contextmanager
@contextmanager
def transaction(conn):
try:
yield
conn.commit()
except:
conn.rollback()
raise
with sqlite3.connect('inventory.db') as conn:
with transaction(conn):
cursor = conn.cursor()
cursor.execute("UPDATE products SET stock = stock - 5 WHERE id = 101")
cursor.execute("UPDATE inventory SET last_updated = CURRENT_TIMESTAMP")
# If either update fails, both will be rolled back
我们创建一个自定义事务上下文管理器,该管理器根据是否发生异常自动处理提交/回滚。
这种模式减少了样板代码,并确保了整个应用程序中一致的事务处理。
使用保存点的回滚
此示例演示了如何将回滚与保存点一起使用以进行部分回滚。
import sqlite3
with sqlite3.connect('orders.db') as conn:
try:
cursor = conn.cursor()
# Main transaction
cursor.execute("INSERT INTO orders (customer_id) VALUES (42)")
order_id = cursor.lastrowid
# Create savepoint
cursor.execute("SAVEPOINT item_add")
try:
cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (?, ?)",
(order_id, 101))
cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (?, ?)",
(order_id, 205))
except sqlite3.Error as e:
print("Item addition failed, rolling back items")
cursor.execute("ROLLBACK TO SAVEPOINT item_add")
conn.commit()
except Exception as e:
conn.rollback()
print("Order creation failed:", e)
在这里,我们使用一个保存点来标记事务中的一个点。 如果添加项目失败,我们将仅回滚到保存点,从而保留订单创建。
保存点允许在单个事务中进行嵌套事务类行为。
自动提交模式下的回滚
此示例显示了回滚在自动提交模式下的不同行为。
import sqlite3
# Connect with isolation_level=None for autocommit mode
with sqlite3.connect('autocommit.db', isolation_level=None) as conn:
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS logs (message TEXT)")
try:
cursor.execute("INSERT INTO logs VALUES ('First message')")
# In autocommit mode, each statement is automatically committed
cursor.execute("INSERT INTO logs VALUES (NULL)") # Will fail
except sqlite3.IntegrityError:
print("Rollback in autocommit mode only affects current statement")
conn.rollback() # Has no effect on already committed statements
cursor.execute("SELECT COUNT(*) FROM logs")
print(f"Total logs: {cursor.fetchone()[0]}") # First insert remains
在自动提交模式 (isolation_level=None) 下,每个 SQL 语句都会自动提交。 回滚仅影响尚未完成的当前语句。
这证明了为什么对于多语句操作,显式事务管理通常是更可取的。
与连接池的回滚
此示例显示了 Web 应用程序中与连接池的回滚用法。
import sqlite3
from queue import Queue
class ConnectionPool:
def __init__(self, db_path, pool_size=5):
self._pool = Queue(pool_size)
for _ in range(pool_size):
conn = sqlite3.connect(db_path)
self._pool.put(conn)
def get_conn(self):
return self._pool.get()
def return_conn(self, conn):
# Ensure connection is in a good state before returning
try:
conn.rollback() # Rollback any pending transaction
except sqlite3.Error:
conn.close() # If rollback fails, discard the connection
conn = sqlite3.connect('app.db') # Create new connection
self._pool.put(conn)
# Usage example
pool = ConnectionPool('app.db')
conn = pool.get_conn()
try:
cursor = conn.cursor()
cursor.execute("UPDATE settings SET value = 'new' WHERE key = 'theme'")
conn.commit()
except Exception:
conn.rollback()
raise
finally:
pool.return_conn(conn)
此连接池实现会在将连接返回到池时自动回滚任何挂起的事务。
这可以防止使用池连接的应用程序的不同部分之间的事务状态泄漏。
并发访问场景中的回滚
此示例演示了在多个连接竞争时如何处理回滚。
import sqlite3
import threading
import time
def transfer_funds(from_acc, to_acc, amount):
with sqlite3.connect('bank.db', timeout=10.0) as conn:
conn.execute("PRAGMA busy_timeout = 5000") # 5 second timeout
try:
cursor = conn.cursor()
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_acc))
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_acc))
conn.commit()
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
print("Database locked, retrying...")
time.sleep(0.1)
transfer_funds(from_acc, to_acc, amount) # Retry
else:
conn.rollback()
raise
# Simulate concurrent transfers
threads = [
threading.Thread(target=transfer_funds, args=(1, 2, 100)),
threading.Thread(target=transfer_funds, args=(2, 1, 50))
]
for t in threads:
t.start()
for t in threads:
t.join()
此示例处理可能因并发访问而发生的数据库锁定问题。 如果数据库被锁定,我们会在延迟后重试该事务。
回滚可确保失败的事务在重试时不会使数据库处于不一致状态。
最佳实践
- 始终在错误处理中使用回滚: 确保正确还原失败的事务
- 与上下文管理器结合使用: 用于自动资源清理
- 考虑保存点: 对于需要部分回滚的复杂事务
- 处理数据库锁: 为并发访问实现重试逻辑
- 测试回滚场景: 验证您的错误处理是否按预期工作
资料来源
作者
列出所有 Python 教程。