ZetCode

Python sqlite3.Connection.blobopen 方法

上次修改时间:2025 年 4 月 15 日

本综合指南探讨了 Python 的 sqlite3.Connection.blobopen 方法,用于在 SQLite 数据库中高效处理 BLOB。我们将介绍基本用法、参数和实际示例。

基本定义

blobopen 方法提供对 SQLite 中 BLOB 数据的直接访问,而无需将整个内容加载到内存中。它返回一个 Blob 对象,该对象支持类似文件的操作。

主要特点:它支持大型 BLOB 的流式传输,支持随机访问,并在事务中工作。该方法需要表名、列名、rowid 和可选的只读标志。

基本 BLOB 访问

此示例演示了如何从数据库表中打开和读取 BLOB。

basic_blob.py
import sqlite3

with sqlite3.connect('images.db') as conn:
    # Create table and insert sample BLOB
    conn.execute('''CREATE TABLE IF NOT EXISTS images
                    (id INTEGER PRIMARY KEY, name TEXT, data BLOB)''')
    
    # Insert sample image (in real code, use actual binary data)
    conn.execute("INSERT INTO images (name, data) VALUES (?, ?)",
                 ('sample.png', b'PNG\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'))
    
    # Get the last inserted rowid
    rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
    
    # Open the BLOB
    blob = conn.blobopen('images', 'data', rowid)
    print(blob.read())  # Read entire BLOB
    blob.close()

此示例创建一个带有 BLOB 列的表,插入示例数据,并使用 blobopen 将其读回。BLOB 像文件对象一样被访问。

请注意,我们必须知道包含我们要访问的 BLOB 的记录的 rowid。BLOB 在完成后应关闭。

分块读取 BLOB

对于大型 BLOB,分块读取比一次性加载所有内容更节省内存。

chunked_read.py
import sqlite3

with sqlite3.connect('large_data.db') as conn:
    conn.execute('''CREATE TABLE IF NOT EXISTS documents
                    (id INTEGER PRIMARY KEY, name TEXT, content BLOB)''')
    
    # Insert large document (simulated)
    large_data = b'X' * (1024 * 1024)  # 1MB of data
    conn.execute("INSERT INTO documents (name, content) VALUES (?, ?)",
                 ('bigfile.bin', large_data))
    rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
    
    # Read in 64KB chunks
    with conn.blobopen('documents', 'content', rowid) as blob:
        while chunk := blob.read(65536):  # 64KB chunks
            print(f"Read {len(chunk)} bytes")

此示例演示了以可管理的分块读取大型 BLOB。with 语句确保在读取后正确关闭 BLOB。

块大小(此处为 65536 字节)可以根据内存约束和性能要求进行调整。

写入 BLOB

blobopen 方法也可用于高效地写入 BLOB 数据。

blob_write.py
import sqlite3

with sqlite3.connect('files.db') as conn:
    conn.execute('''CREATE TABLE IF NOT EXISTS files
                    (id INTEGER PRIMARY KEY, name TEXT, data BLOB)''')
    
    # Create empty record to get rowid
    conn.execute("INSERT INTO files (name) VALUES ('output.bin')")
    rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
    
    # Write data to BLOB
    with conn.blobopen('files', 'data', rowid, readonly=False) as blob:
        blob.write(b'HEADER')
        blob.write(b'DATA' * 1000)
        blob.write(b'FOOTER')
    
    # Verify
    data = conn.execute("SELECT data FROM files WHERE id = ?", (rowid,)).fetchone()[0]
    print(f"Wrote {len(data)} bytes")

此示例演示如何将数据写入 BLOB 列。我们首先创建一个空记录以获取 rowid,然后在写入模式下打开 BLOB。

写入需要 readonly=False 参数。更改会在事务提交时提交。

BLOB 中的随机访问

BLOB 对象支持使用 seektell 方法进行随机访问,类似于文件对象。

random_access.py
import sqlite3

with sqlite3.connect('random.db') as conn:
    conn.execute('''CREATE TABLE IF NOT EXISTS binary_data
                    (id INTEGER PRIMARY KEY, tag TEXT, payload BLOB)''')
    
    # Insert sample data
    data = bytes(range(256))  # 0-255 bytes
    conn.execute("INSERT INTO binary_data (tag, payload) VALUES (?, ?)",
                 ('sample', data))
    rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
    
    # Access random positions
    with conn.blobopen('binary_data', 'payload', rowid) as blob:
        blob.seek(128)  # Go to middle
        print(f"Byte at 128: {blob.read(1)}")
        print(f"Current position: {blob.tell()}")
        blob.seek(-10, 2)  # 10 bytes from end
        print(f"Last 10 bytes: {blob.read()}")

此示例演示了 BLOB 中的随机访问。seek 方法移动当前位置,tell 报告当前位置。

seek 的第二个参数指定参考点(0=开始,1=当前,2=结束),类似于文件操作。

只读 BLOB 访问

BLOB 可以以只读模式打开,以便在仅需要读取时确保安全。

readonly_blob.py
import sqlite3

with sqlite3.connect('products.db') as conn:
    conn.execute('''CREATE TABLE IF NOT EXISTS products
                    (id INTEGER PRIMARY KEY, name TEXT, image BLOB)''')
    
    # Insert sample product
    conn.execute("INSERT INTO products (name, image) VALUES (?, ?)",
                 ('Widget', b'IMAGE_DATA'))
    rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
    
    # Open read-only
    with conn.blobopen('products', 'image', rowid, readonly=True) as blob:
        print(f"BLOB size: {blob.length()} bytes")
        try:
            blob.write(b'NEW')  # Will raise error
        except sqlite3.OperationalError as e:
            print(f"Expected error: {e}")

此示例显示只读 BLOB 访问。尝试写入会引发 OperationalErrorlength 方法返回 BLOB 大小。

当您只需要读取数据并希望防止意外修改时,只读模式更安全。

带有显式事务的 BLOB

BLOB 操作参与事务,并且更改仅在提交后才永久生效。

blob_transaction.py
import sqlite3

with sqlite3.connect('transactions.db') as conn:
    conn.execute('''CREATE TABLE IF NOT EXISTS versions
                    (id INTEGER PRIMARY KEY, version INTEGER, data BLOB)''')
    
    # Start transaction
    conn.execute("BEGIN")
    try:
        # Insert and get rowid
        conn.execute("INSERT INTO versions (version) VALUES (1)")
        rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
        
        # Write to BLOB
        with conn.blobopen('versions', 'data', rowid, readonly=False) as blob:
            blob.write(b'VERSION_1_DATA')
        
        # Verify before commit
        data = conn.execute("SELECT data FROM versions WHERE id = ?", (rowid,)).fetchone()
        print(f"Before commit: {data[0] is None}")  # True - not committed yet
        
        conn.commit()  # Make changes permanent
        
        # Verify after commit
        data = conn.execute("SELECT data FROM versions WHERE id = ?", (rowid,)).fetchone()
        print(f"After commit: {len(data[0])}")  # Length of written data
    except:
        conn.rollback()
        raise

此示例演示了 BLOB 操作的事务性质。更改仅在提交后对其他连接可见。

显式事务确保原子性 - 要么所有更改都成功,要么都不成功。BLOB 写入是此事务的一部分。

带有行工厂的 BLOB

将 BLOB 访问与行工厂结合使用可以方便地处理数据。

blob_row_factory.py
import sqlite3

def dict_factory(cursor, row):
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}

with sqlite3.connect('media.db') as conn:
    conn.row_factory = dict_factory
    conn.execute('''CREATE TABLE IF NOT EXISTS media
                    (id INTEGER PRIMARY KEY, title TEXT, content BLOB)''')
    
    # Insert sample media
    conn.execute("INSERT INTO media (title, content) VALUES (?, ?)",
                 ('Song', b'AUDIO_DATA'))
    
    # Query with row factory
    row = conn.execute("SELECT id, title FROM media").fetchone()
    print(f"Media title: {row['title']}")
    
    # Access BLOB using rowid from the same row
    with conn.blobopen('media', 'content', row['id']) as blob:
        print(f"First 10 bytes: {blob.read(10)}")

此示例将 BLOB 访问与返回字典的行工厂结合使用。我们首先查询元数据,然后使用 rowid 访问 BLOB 内容。

当您需要在决定是否加载可能很大的 BLOB 内容之前检查元数据时,此模式非常有用。

最佳实践

资料来源

作者

我叫 Jan Bodnar,是一位充满激情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直在撰写编程文章。 迄今为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程