Python sqlite3.Connection.blobopen 方法
上次修改时间:2025 年 4 月 15 日
本综合指南探讨了 Python 的 sqlite3.Connection.blobopen
方法,用于在 SQLite 数据库中高效处理 BLOB。我们将介绍基本用法、参数和实际示例。
基本定义
blobopen
方法提供对 SQLite 中 BLOB 数据的直接访问,而无需将整个内容加载到内存中。它返回一个 Blob
对象,该对象支持类似文件的操作。
主要特点:它支持大型 BLOB 的流式传输,支持随机访问,并在事务中工作。该方法需要表名、列名、rowid 和可选的只读标志。
基本 BLOB 访问
此示例演示了如何从数据库表中打开和读取 BLOB。
import sqlite3 with sqlite3.connect('images.db') as conn: # Create table and insert sample BLOB conn.execute('''CREATE TABLE IF NOT EXISTS images (id INTEGER PRIMARY KEY, name TEXT, data BLOB)''') # Insert sample image (in real code, use actual binary data) conn.execute("INSERT INTO images (name, data) VALUES (?, ?)", ('sample.png', b'PNG\x89\x50\x4E\x47\x0D\x0A\x1A\x0A')) # Get the last inserted rowid rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] # Open the BLOB blob = conn.blobopen('images', 'data', rowid) print(blob.read()) # Read entire BLOB blob.close()
此示例创建一个带有 BLOB 列的表,插入示例数据,并使用 blobopen
将其读回。BLOB 像文件对象一样被访问。
请注意,我们必须知道包含我们要访问的 BLOB 的记录的 rowid。BLOB 在完成后应关闭。
分块读取 BLOB
对于大型 BLOB,分块读取比一次性加载所有内容更节省内存。
import sqlite3 with sqlite3.connect('large_data.db') as conn: conn.execute('''CREATE TABLE IF NOT EXISTS documents (id INTEGER PRIMARY KEY, name TEXT, content BLOB)''') # Insert large document (simulated) large_data = b'X' * (1024 * 1024) # 1MB of data conn.execute("INSERT INTO documents (name, content) VALUES (?, ?)", ('bigfile.bin', large_data)) rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] # Read in 64KB chunks with conn.blobopen('documents', 'content', rowid) as blob: while chunk := blob.read(65536): # 64KB chunks print(f"Read {len(chunk)} bytes")
此示例演示了以可管理的分块读取大型 BLOB。with
语句确保在读取后正确关闭 BLOB。
块大小(此处为 65536 字节)可以根据内存约束和性能要求进行调整。
写入 BLOB
blobopen
方法也可用于高效地写入 BLOB 数据。
import sqlite3 with sqlite3.connect('files.db') as conn: conn.execute('''CREATE TABLE IF NOT EXISTS files (id INTEGER PRIMARY KEY, name TEXT, data BLOB)''') # Create empty record to get rowid conn.execute("INSERT INTO files (name) VALUES ('output.bin')") rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] # Write data to BLOB with conn.blobopen('files', 'data', rowid, readonly=False) as blob: blob.write(b'HEADER') blob.write(b'DATA' * 1000) blob.write(b'FOOTER') # Verify data = conn.execute("SELECT data FROM files WHERE id = ?", (rowid,)).fetchone()[0] print(f"Wrote {len(data)} bytes")
此示例演示如何将数据写入 BLOB 列。我们首先创建一个空记录以获取 rowid,然后在写入模式下打开 BLOB。
写入需要 readonly=False
参数。更改会在事务提交时提交。
BLOB 中的随机访问
BLOB 对象支持使用 seek
和 tell
方法进行随机访问,类似于文件对象。
import sqlite3 with sqlite3.connect('random.db') as conn: conn.execute('''CREATE TABLE IF NOT EXISTS binary_data (id INTEGER PRIMARY KEY, tag TEXT, payload BLOB)''') # Insert sample data data = bytes(range(256)) # 0-255 bytes conn.execute("INSERT INTO binary_data (tag, payload) VALUES (?, ?)", ('sample', data)) rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] # Access random positions with conn.blobopen('binary_data', 'payload', rowid) as blob: blob.seek(128) # Go to middle print(f"Byte at 128: {blob.read(1)}") print(f"Current position: {blob.tell()}") blob.seek(-10, 2) # 10 bytes from end print(f"Last 10 bytes: {blob.read()}")
此示例演示了 BLOB 中的随机访问。seek
方法移动当前位置,tell
报告当前位置。
seek
的第二个参数指定参考点(0=开始,1=当前,2=结束),类似于文件操作。
只读 BLOB 访问
BLOB 可以以只读模式打开,以便在仅需要读取时确保安全。
import sqlite3 with sqlite3.connect('products.db') as conn: conn.execute('''CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY, name TEXT, image BLOB)''') # Insert sample product conn.execute("INSERT INTO products (name, image) VALUES (?, ?)", ('Widget', b'IMAGE_DATA')) rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] # Open read-only with conn.blobopen('products', 'image', rowid, readonly=True) as blob: print(f"BLOB size: {blob.length()} bytes") try: blob.write(b'NEW') # Will raise error except sqlite3.OperationalError as e: print(f"Expected error: {e}")
此示例显示只读 BLOB 访问。尝试写入会引发 OperationalError
。length
方法返回 BLOB 大小。
当您只需要读取数据并希望防止意外修改时,只读模式更安全。
带有显式事务的 BLOB
BLOB 操作参与事务,并且更改仅在提交后才永久生效。
import sqlite3 with sqlite3.connect('transactions.db') as conn: conn.execute('''CREATE TABLE IF NOT EXISTS versions (id INTEGER PRIMARY KEY, version INTEGER, data BLOB)''') # Start transaction conn.execute("BEGIN") try: # Insert and get rowid conn.execute("INSERT INTO versions (version) VALUES (1)") rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] # Write to BLOB with conn.blobopen('versions', 'data', rowid, readonly=False) as blob: blob.write(b'VERSION_1_DATA') # Verify before commit data = conn.execute("SELECT data FROM versions WHERE id = ?", (rowid,)).fetchone() print(f"Before commit: {data[0] is None}") # True - not committed yet conn.commit() # Make changes permanent # Verify after commit data = conn.execute("SELECT data FROM versions WHERE id = ?", (rowid,)).fetchone() print(f"After commit: {len(data[0])}") # Length of written data except: conn.rollback() raise
此示例演示了 BLOB 操作的事务性质。更改仅在提交后对其他连接可见。
显式事务确保原子性 - 要么所有更改都成功,要么都不成功。BLOB 写入是此事务的一部分。
带有行工厂的 BLOB
将 BLOB 访问与行工厂结合使用可以方便地处理数据。
import sqlite3 def dict_factory(cursor, row): return {col[0]: row[idx] for idx, col in enumerate(cursor.description)} with sqlite3.connect('media.db') as conn: conn.row_factory = dict_factory conn.execute('''CREATE TABLE IF NOT EXISTS media (id INTEGER PRIMARY KEY, title TEXT, content BLOB)''') # Insert sample media conn.execute("INSERT INTO media (title, content) VALUES (?, ?)", ('Song', b'AUDIO_DATA')) # Query with row factory row = conn.execute("SELECT id, title FROM media").fetchone() print(f"Media title: {row['title']}") # Access BLOB using rowid from the same row with conn.blobopen('media', 'content', row['id']) as blob: print(f"First 10 bytes: {blob.read(10)}")
此示例将 BLOB 访问与返回字典的行工厂结合使用。我们首先查询元数据,然后使用 rowid 访问 BLOB 内容。
当您需要在决定是否加载可能很大的 BLOB 内容之前检查元数据时,此模式非常有用。
最佳实践
- 始终关闭 BLOB: 使用上下文管理器或显式关闭
- 使用分块读取: 对于大型 BLOB 以节省内存
- 考虑只读: 仅需要读取时
- 处理事务: 记住 BLOB 写入需要提交
- 检查行是否存在: 在 blobopen 之前验证 rowid
资料来源
作者
列出所有 Python 教程。