Python sqlite3.Connection.blobopen 方法
上次修改时间:2025 年 4 月 15 日
本综合指南探讨了 Python 的 sqlite3.Connection.blobopen 方法,用于在 SQLite 数据库中高效处理 BLOB。我们将介绍基本用法、参数和实际示例。
基本定义
blobopen 方法提供对 SQLite 中 BLOB 数据的直接访问,而无需将整个内容加载到内存中。它返回一个 Blob 对象,该对象支持类似文件的操作。
主要特点:它支持大型 BLOB 的流式传输,支持随机访问,并在事务中工作。该方法需要表名、列名、rowid 和可选的只读标志。
基本 BLOB 访问
此示例演示了如何从数据库表中打开和读取 BLOB。
import sqlite3
with sqlite3.connect('images.db') as conn:
# Create table and insert sample BLOB
conn.execute('''CREATE TABLE IF NOT EXISTS images
(id INTEGER PRIMARY KEY, name TEXT, data BLOB)''')
# Insert sample image (in real code, use actual binary data)
conn.execute("INSERT INTO images (name, data) VALUES (?, ?)",
('sample.png', b'PNG\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'))
# Get the last inserted rowid
rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
# Open the BLOB
blob = conn.blobopen('images', 'data', rowid)
print(blob.read()) # Read entire BLOB
blob.close()
此示例创建一个带有 BLOB 列的表,插入示例数据,并使用 blobopen 将其读回。BLOB 像文件对象一样被访问。
请注意,我们必须知道包含我们要访问的 BLOB 的记录的 rowid。BLOB 在完成后应关闭。
分块读取 BLOB
对于大型 BLOB,分块读取比一次性加载所有内容更节省内存。
import sqlite3
with sqlite3.connect('large_data.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS documents
(id INTEGER PRIMARY KEY, name TEXT, content BLOB)''')
# Insert large document (simulated)
large_data = b'X' * (1024 * 1024) # 1MB of data
conn.execute("INSERT INTO documents (name, content) VALUES (?, ?)",
('bigfile.bin', large_data))
rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
# Read in 64KB chunks
with conn.blobopen('documents', 'content', rowid) as blob:
while chunk := blob.read(65536): # 64KB chunks
print(f"Read {len(chunk)} bytes")
此示例演示了以可管理的分块读取大型 BLOB。with 语句确保在读取后正确关闭 BLOB。
块大小(此处为 65536 字节)可以根据内存约束和性能要求进行调整。
写入 BLOB
blobopen 方法也可用于高效地写入 BLOB 数据。
import sqlite3
with sqlite3.connect('files.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS files
(id INTEGER PRIMARY KEY, name TEXT, data BLOB)''')
# Create empty record to get rowid
conn.execute("INSERT INTO files (name) VALUES ('output.bin')")
rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
# Write data to BLOB
with conn.blobopen('files', 'data', rowid, readonly=False) as blob:
blob.write(b'HEADER')
blob.write(b'DATA' * 1000)
blob.write(b'FOOTER')
# Verify
data = conn.execute("SELECT data FROM files WHERE id = ?", (rowid,)).fetchone()[0]
print(f"Wrote {len(data)} bytes")
此示例演示如何将数据写入 BLOB 列。我们首先创建一个空记录以获取 rowid,然后在写入模式下打开 BLOB。
写入需要 readonly=False 参数。更改会在事务提交时提交。
BLOB 中的随机访问
BLOB 对象支持使用 seek 和 tell 方法进行随机访问,类似于文件对象。
import sqlite3
with sqlite3.connect('random.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS binary_data
(id INTEGER PRIMARY KEY, tag TEXT, payload BLOB)''')
# Insert sample data
data = bytes(range(256)) # 0-255 bytes
conn.execute("INSERT INTO binary_data (tag, payload) VALUES (?, ?)",
('sample', data))
rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
# Access random positions
with conn.blobopen('binary_data', 'payload', rowid) as blob:
blob.seek(128) # Go to middle
print(f"Byte at 128: {blob.read(1)}")
print(f"Current position: {blob.tell()}")
blob.seek(-10, 2) # 10 bytes from end
print(f"Last 10 bytes: {blob.read()}")
此示例演示了 BLOB 中的随机访问。seek 方法移动当前位置,tell 报告当前位置。
seek 的第二个参数指定参考点(0=开始,1=当前,2=结束),类似于文件操作。
只读 BLOB 访问
BLOB 可以以只读模式打开,以便在仅需要读取时确保安全。
import sqlite3
with sqlite3.connect('products.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS products
(id INTEGER PRIMARY KEY, name TEXT, image BLOB)''')
# Insert sample product
conn.execute("INSERT INTO products (name, image) VALUES (?, ?)",
('Widget', b'IMAGE_DATA'))
rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
# Open read-only
with conn.blobopen('products', 'image', rowid, readonly=True) as blob:
print(f"BLOB size: {blob.length()} bytes")
try:
blob.write(b'NEW') # Will raise error
except sqlite3.OperationalError as e:
print(f"Expected error: {e}")
此示例显示只读 BLOB 访问。尝试写入会引发 OperationalError。length 方法返回 BLOB 大小。
当您只需要读取数据并希望防止意外修改时,只读模式更安全。
带有显式事务的 BLOB
BLOB 操作参与事务,并且更改仅在提交后才永久生效。
import sqlite3
with sqlite3.connect('transactions.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS versions
(id INTEGER PRIMARY KEY, version INTEGER, data BLOB)''')
# Start transaction
conn.execute("BEGIN")
try:
# Insert and get rowid
conn.execute("INSERT INTO versions (version) VALUES (1)")
rowid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
# Write to BLOB
with conn.blobopen('versions', 'data', rowid, readonly=False) as blob:
blob.write(b'VERSION_1_DATA')
# Verify before commit
data = conn.execute("SELECT data FROM versions WHERE id = ?", (rowid,)).fetchone()
print(f"Before commit: {data[0] is None}") # True - not committed yet
conn.commit() # Make changes permanent
# Verify after commit
data = conn.execute("SELECT data FROM versions WHERE id = ?", (rowid,)).fetchone()
print(f"After commit: {len(data[0])}") # Length of written data
except:
conn.rollback()
raise
此示例演示了 BLOB 操作的事务性质。更改仅在提交后对其他连接可见。
显式事务确保原子性 - 要么所有更改都成功,要么都不成功。BLOB 写入是此事务的一部分。
带有行工厂的 BLOB
将 BLOB 访问与行工厂结合使用可以方便地处理数据。
import sqlite3
def dict_factory(cursor, row):
return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
with sqlite3.connect('media.db') as conn:
conn.row_factory = dict_factory
conn.execute('''CREATE TABLE IF NOT EXISTS media
(id INTEGER PRIMARY KEY, title TEXT, content BLOB)''')
# Insert sample media
conn.execute("INSERT INTO media (title, content) VALUES (?, ?)",
('Song', b'AUDIO_DATA'))
# Query with row factory
row = conn.execute("SELECT id, title FROM media").fetchone()
print(f"Media title: {row['title']}")
# Access BLOB using rowid from the same row
with conn.blobopen('media', 'content', row['id']) as blob:
print(f"First 10 bytes: {blob.read(10)}")
此示例将 BLOB 访问与返回字典的行工厂结合使用。我们首先查询元数据,然后使用 rowid 访问 BLOB 内容。
当您需要在决定是否加载可能很大的 BLOB 内容之前检查元数据时,此模式非常有用。
最佳实践
- 始终关闭 BLOB: 使用上下文管理器或显式关闭
- 使用分块读取: 对于大型 BLOB 以节省内存
- 考虑只读: 仅需要读取时
- 处理事务: 记住 BLOB 写入需要提交
- 检查行是否存在: 在 blobopen 之前验证 rowid
资料来源
作者
列出所有 Python 教程。