Python sqlite3.Connection.rollback 方法
上次修改时间:2025 年 4 月 15 日
这份全面的指南探讨了 Python 的 sqlite3.Connection.rollback
方法,该方法对于 SQLite 数据库中的事务管理至关重要。
基本定义
rollback
方法会还原当前事务中所做的所有更改。 它将数据库恢复到事务开始之前的状态。
主要特性:它仅在非自动提交模式下有效,影响自上次提交以来的所有更改,并且通常在错误处理中使用以保持数据库一致性。
基本回滚示例
此示例演示了事务中回滚的基本用法。
import sqlite3 with sqlite3.connect('transactions.db') as conn: try: cursor = conn.cursor() cursor.execute("INSERT INTO accounts (name, balance) VALUES (?, ?)", ('Bob', 1000)) # Simulate an error raise ValueError("Something went wrong") conn.commit() except Exception as e: print(f"Error occurred: {e}") conn.rollback() print("Transaction rolled back")
在此示例中,我们通过执行插入操作来启动事务。 发生错误时,我们会捕获它并调用 rollback 以撤消插入。
with 语句确保正确关闭连接,而 try-except 块处理错误时的事务回滚。
多操作事务中的回滚
此示例显示回滚如何影响事务中的多个操作。
import sqlite3 with sqlite3.connect('bank.db') as conn: conn.execute("PRAGMA foreign_keys = ON") # Enable foreign key constraints try: cursor = conn.cursor() # First operation cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") # Second operation cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") # Verify balances cursor.execute("SELECT balance FROM accounts WHERE id = 1") balance = cursor.fetchone()[0] if balance < 0: raise ValueError("Insufficient funds") conn.commit() except Exception as e: conn.rollback() print(f"Transaction failed: {e}. All changes reverted.")
在这里,我们执行两个更新操作作为资金转移的一部分。 如果任何检查失败或发生错误,则两个更新将一起回滚。
这演示了原子事务行为 - 要么所有操作都成功,要么都不成功。 外键约束确保引用完整性。
使用上下文管理器的回滚
此示例显示了上下文管理器如何简化回滚处理。
import sqlite3 from contextlib import contextmanager @contextmanager def transaction(conn): try: yield conn.commit() except: conn.rollback() raise with sqlite3.connect('inventory.db') as conn: with transaction(conn): cursor = conn.cursor() cursor.execute("UPDATE products SET stock = stock - 5 WHERE id = 101") cursor.execute("UPDATE inventory SET last_updated = CURRENT_TIMESTAMP") # If either update fails, both will be rolled back
我们创建一个自定义事务上下文管理器,该管理器根据是否发生异常自动处理提交/回滚。
这种模式减少了样板代码,并确保了整个应用程序中一致的事务处理。
使用保存点的回滚
此示例演示了如何将回滚与保存点一起使用以进行部分回滚。
import sqlite3 with sqlite3.connect('orders.db') as conn: try: cursor = conn.cursor() # Main transaction cursor.execute("INSERT INTO orders (customer_id) VALUES (42)") order_id = cursor.lastrowid # Create savepoint cursor.execute("SAVEPOINT item_add") try: cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (?, ?)", (order_id, 101)) cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (?, ?)", (order_id, 205)) except sqlite3.Error as e: print("Item addition failed, rolling back items") cursor.execute("ROLLBACK TO SAVEPOINT item_add") conn.commit() except Exception as e: conn.rollback() print("Order creation failed:", e)
在这里,我们使用一个保存点来标记事务中的一个点。 如果添加项目失败,我们将仅回滚到保存点,从而保留订单创建。
保存点允许在单个事务中进行嵌套事务类行为。
自动提交模式下的回滚
此示例显示了回滚在自动提交模式下的不同行为。
import sqlite3 # Connect with isolation_level=None for autocommit mode with sqlite3.connect('autocommit.db', isolation_level=None) as conn: cursor = conn.cursor() cursor.execute("CREATE TABLE IF NOT EXISTS logs (message TEXT)") try: cursor.execute("INSERT INTO logs VALUES ('First message')") # In autocommit mode, each statement is automatically committed cursor.execute("INSERT INTO logs VALUES (NULL)") # Will fail except sqlite3.IntegrityError: print("Rollback in autocommit mode only affects current statement") conn.rollback() # Has no effect on already committed statements cursor.execute("SELECT COUNT(*) FROM logs") print(f"Total logs: {cursor.fetchone()[0]}") # First insert remains
在自动提交模式 (isolation_level=None) 下,每个 SQL 语句都会自动提交。 回滚仅影响尚未完成的当前语句。
这证明了为什么对于多语句操作,显式事务管理通常是更可取的。
与连接池的回滚
此示例显示了 Web 应用程序中与连接池的回滚用法。
import sqlite3 from queue import Queue class ConnectionPool: def __init__(self, db_path, pool_size=5): self._pool = Queue(pool_size) for _ in range(pool_size): conn = sqlite3.connect(db_path) self._pool.put(conn) def get_conn(self): return self._pool.get() def return_conn(self, conn): # Ensure connection is in a good state before returning try: conn.rollback() # Rollback any pending transaction except sqlite3.Error: conn.close() # If rollback fails, discard the connection conn = sqlite3.connect('app.db') # Create new connection self._pool.put(conn) # Usage example pool = ConnectionPool('app.db') conn = pool.get_conn() try: cursor = conn.cursor() cursor.execute("UPDATE settings SET value = 'new' WHERE key = 'theme'") conn.commit() except Exception: conn.rollback() raise finally: pool.return_conn(conn)
此连接池实现会在将连接返回到池时自动回滚任何挂起的事务。
这可以防止使用池连接的应用程序的不同部分之间的事务状态泄漏。
并发访问场景中的回滚
此示例演示了在多个连接竞争时如何处理回滚。
import sqlite3 import threading import time def transfer_funds(from_acc, to_acc, amount): with sqlite3.connect('bank.db', timeout=10.0) as conn: conn.execute("PRAGMA busy_timeout = 5000") # 5 second timeout try: cursor = conn.cursor() cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_acc)) cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_acc)) conn.commit() except sqlite3.OperationalError as e: if "database is locked" in str(e): print("Database locked, retrying...") time.sleep(0.1) transfer_funds(from_acc, to_acc, amount) # Retry else: conn.rollback() raise # Simulate concurrent transfers threads = [ threading.Thread(target=transfer_funds, args=(1, 2, 100)), threading.Thread(target=transfer_funds, args=(2, 1, 50)) ] for t in threads: t.start() for t in threads: t.join()
此示例处理可能因并发访问而发生的数据库锁定问题。 如果数据库被锁定,我们会在延迟后重试该事务。
回滚可确保失败的事务在重试时不会使数据库处于不一致状态。
最佳实践
- 始终在错误处理中使用回滚: 确保正确还原失败的事务
- 与上下文管理器结合使用: 用于自动资源清理
- 考虑保存点: 对于需要部分回滚的复杂事务
- 处理数据库锁: 为并发访问实现重试逻辑
- 测试回滚场景: 验证您的错误处理是否按预期工作
资料来源
作者
列出所有 Python 教程。