ZetCode

Python sqlite3.Connection.rollback 方法

上次修改时间:2025 年 4 月 15 日

这份全面的指南探讨了 Python 的 sqlite3.Connection.rollback 方法,该方法对于 SQLite 数据库中的事务管理至关重要。

基本定义

rollback 方法会还原当前事务中所做的所有更改。 它将数据库恢复到事务开始之前的状态。

主要特性:它仅在非自动提交模式下有效,影响自上次提交以来的所有更改,并且通常在错误处理中使用以保持数据库一致性。

基本回滚示例

此示例演示了事务中回滚的基本用法。

basic_rollback.py
import sqlite3

with sqlite3.connect('transactions.db') as conn:
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO accounts (name, balance) VALUES (?, ?)", 
                      ('Bob', 1000))
        # Simulate an error
        raise ValueError("Something went wrong")
        conn.commit()
    except Exception as e:
        print(f"Error occurred: {e}")
        conn.rollback()
        print("Transaction rolled back")

在此示例中,我们通过执行插入操作来启动事务。 发生错误时,我们会捕获它并调用 rollback 以撤消插入。

with 语句确保正确关闭连接,而 try-except 块处理错误时的事务回滚。

多操作事务中的回滚

此示例显示回滚如何影响事务中的多个操作。

multi_op_rollback.py
import sqlite3

with sqlite3.connect('bank.db') as conn:
    conn.execute("PRAGMA foreign_keys = ON")  # Enable foreign key constraints
    try:
        cursor = conn.cursor()
        # First operation
        cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        # Second operation
        cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
        # Verify balances
        cursor.execute("SELECT balance FROM accounts WHERE id = 1")
        balance = cursor.fetchone()[0]
        if balance < 0:
            raise ValueError("Insufficient funds")
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"Transaction failed: {e}. All changes reverted.")

在这里,我们执行两个更新操作作为资金转移的一部分。 如果任何检查失败或发生错误,则两个更新将一起回滚。

这演示了原子事务行为 - 要么所有操作都成功,要么都不成功。 外键约束确保引用完整性。

使用上下文管理器的回滚

此示例显示了上下文管理器如何简化回滚处理。

context_rollback.py
import sqlite3
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    try:
        yield
        conn.commit()
    except:
        conn.rollback()
        raise

with sqlite3.connect('inventory.db') as conn:
    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute("UPDATE products SET stock = stock - 5 WHERE id = 101")
        cursor.execute("UPDATE inventory SET last_updated = CURRENT_TIMESTAMP")
        # If either update fails, both will be rolled back

我们创建一个自定义事务上下文管理器,该管理器根据是否发生异常自动处理提交/回滚。

这种模式减少了样板代码,并确保了整个应用程序中一致的事务处理。

使用保存点的回滚

此示例演示了如何将回滚与保存点一起使用以进行部分回滚。

savepoint_rollback.py
import sqlite3

with sqlite3.connect('orders.db') as conn:
    try:
        cursor = conn.cursor()
        # Main transaction
        cursor.execute("INSERT INTO orders (customer_id) VALUES (42)")
        order_id = cursor.lastrowid
        
        # Create savepoint
        cursor.execute("SAVEPOINT item_add")
        try:
            cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (?, ?)",
                         (order_id, 101))
            cursor.execute("INSERT INTO order_items (order_id, product_id) VALUES (?, ?)",
                         (order_id, 205))
        except sqlite3.Error as e:
            print("Item addition failed, rolling back items")
            cursor.execute("ROLLBACK TO SAVEPOINT item_add")
        
        conn.commit()
    except Exception as e:
        conn.rollback()
        print("Order creation failed:", e)

在这里,我们使用一个保存点来标记事务中的一个点。 如果添加项目失败,我们将仅回滚到保存点,从而保留订单创建。

保存点允许在单个事务中进行嵌套事务类行为。

自动提交模式下的回滚

此示例显示了回滚在自动提交模式下的不同行为。

autocommit_rollback.py
import sqlite3

# Connect with isolation_level=None for autocommit mode
with sqlite3.connect('autocommit.db', isolation_level=None) as conn:
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS logs (message TEXT)")
    
    try:
        cursor.execute("INSERT INTO logs VALUES ('First message')")
        # In autocommit mode, each statement is automatically committed
        cursor.execute("INSERT INTO logs VALUES (NULL)")  # Will fail
    except sqlite3.IntegrityError:
        print("Rollback in autocommit mode only affects current statement")
        conn.rollback()  # Has no effect on already committed statements
        
    cursor.execute("SELECT COUNT(*) FROM logs")
    print(f"Total logs: {cursor.fetchone()[0]}")  # First insert remains

在自动提交模式 (isolation_level=None) 下,每个 SQL 语句都会自动提交。 回滚仅影响尚未完成的当前语句。

这证明了为什么对于多语句操作,显式事务管理通常是更可取的。

与连接池的回滚

此示例显示了 Web 应用程序中与连接池的回滚用法。

pooling_rollback.py
import sqlite3
from queue import Queue

class ConnectionPool:
    def __init__(self, db_path, pool_size=5):
        self._pool = Queue(pool_size)
        for _ in range(pool_size):
            conn = sqlite3.connect(db_path)
            self._pool.put(conn)
    
    def get_conn(self):
        return self._pool.get()
    
    def return_conn(self, conn):
        # Ensure connection is in a good state before returning
        try:
            conn.rollback()  # Rollback any pending transaction
        except sqlite3.Error:
            conn.close()  # If rollback fails, discard the connection
            conn = sqlite3.connect('app.db')  # Create new connection
        self._pool.put(conn)

# Usage example
pool = ConnectionPool('app.db')
conn = pool.get_conn()
try:
    cursor = conn.cursor()
    cursor.execute("UPDATE settings SET value = 'new' WHERE key = 'theme'")
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    pool.return_conn(conn)

此连接池实现会在将连接返回到池时自动回滚任何挂起的事务。

这可以防止使用池连接的应用程序的不同部分之间的事务状态泄漏。

并发访问场景中的回滚

此示例演示了在多个连接竞争时如何处理回滚。

concurrent_rollback.py
import sqlite3
import threading
import time

def transfer_funds(from_acc, to_acc, amount):
    with sqlite3.connect('bank.db', timeout=10.0) as conn:
        conn.execute("PRAGMA busy_timeout = 5000")  # 5 second timeout
        try:
            cursor = conn.cursor()
            cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
                         (amount, from_acc))
            cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
                         (amount, to_acc))
            conn.commit()
        except sqlite3.OperationalError as e:
            if "database is locked" in str(e):
                print("Database locked, retrying...")
                time.sleep(0.1)
                transfer_funds(from_acc, to_acc, amount)  # Retry
            else:
                conn.rollback()
                raise

# Simulate concurrent transfers
threads = [
    threading.Thread(target=transfer_funds, args=(1, 2, 100)),
    threading.Thread(target=transfer_funds, args=(2, 1, 50))
]
for t in threads:
    t.start()
for t in threads:
    t.join()

此示例处理可能因并发访问而发生的数据库锁定问题。 如果数据库被锁定,我们会在延迟后重试该事务。

回滚可确保失败的事务在重试时不会使数据库处于不一致状态。

最佳实践

资料来源

作者

我叫 Jan Bodnar,是一位充满激情的程序员,拥有丰富的编程经验。 我从 2007 年开始撰写编程文章。 迄今为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程