Python sqlite3.Cursor.arraysize 属性
上次修改时间:2025 年 4 月 15 日
这份综合指南探讨了 Python 的 sqlite3.Cursor.arraysize 属性。我们将介绍它的用途、优化优势以及在 SQLite 数据库中的实际使用示例。
基本定义
SQLite 游标的 arraysize 属性决定了调用 fetchmany 时一次获取多少行。它通过减少往返次数来优化数据库操作。
默认值为 1,这意味着每次 fetchmany 调用检索一行。增加此值可以提高处理大型结果集时的性能。
默认 arraysize 行为
此示例演示了未显式设置时 arraysize 的默认行为。游标一次获取一行。
import sqlite3
with sqlite3.connect(':memory:') as conn:
conn.execute('CREATE TABLE test (id INTEGER, data TEXT)')
conn.executemany('INSERT INTO test VALUES (?, ?)', [(i, f'Data {i}') for i in range(1, 6)])
with conn.cursor() as cursor:
cursor.execute('SELECT * FROM test')
print(f"Default arraysize: {cursor.arraysize}") # Output: 1
# fetchmany() returns 1 row by default
print(cursor.fetchmany()) # [(1, 'Data 1')]
print(cursor.fetchmany(2)) # [(2, 'Data 2'), (3, 'Data 3')]
该示例表明,在不设置 arraysize 的情况下,它默认为 1。fetchmany 方法仍然可以通过指定参数来获取更多行。
这种默认行为是安全的,但对于通常一起处理多个行的大型数据集而言,可能不是最佳的。
设置 arraysize 进行批量获取
在这里,我们设置 arraysize 以一次获取多个行,从而提高大型结果集的性能。
import sqlite3
with sqlite3.connect('products.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS products
(id INTEGER PRIMARY KEY, name TEXT, price REAL)''')
conn.executemany('INSERT INTO products (name, price) VALUES (?, ?)',
[('Laptop', 999.99), ('Phone', 699.99), ('Tablet', 399.99),
('Monitor', 249.99), ('Keyboard', 49.99)])
with conn.cursor() as cursor:
cursor.arraysize = 3 # Set batch size to 3 rows
cursor.execute('SELECT * FROM products')
while True:
batch = cursor.fetchmany()
if not batch:
break
print(f"Fetched batch of {len(batch)} items:")
for product in batch:
print(f" {product[1]}: ${product[2]:.2f}")
此示例将 arraysize 设置为 3,导致 fetchmany 每次调用返回 3 行。该循环批量处理产品,而不是单独处理。
批量获取减少了数据库往返次数,这对于联网数据库或大型结果集尤其有利。
arraysize 与大型数据集
对于非常大的数据集,适当的 arraysize 可以显着减少内存使用并提高性能。
import sqlite3
import time
def process_large_data(arraysize):
with sqlite3.connect('large_data.db') as conn:
conn.execute('CREATE TABLE IF NOT EXISTS measurements (id INTEGER, value REAL)')
# Insert sample data (100,000 rows)
if conn.execute('SELECT COUNT(*) FROM measurements').fetchone()[0] == 0:
conn.executemany('INSERT INTO measurements VALUES (?, ?)',
[(i, i * 0.1) for i in range(1, 100001)])
with conn.cursor() as cursor:
cursor.arraysize = arraysize
start_time = time.time()
cursor.execute('SELECT * FROM measurements')
total_rows = 0
while True:
batch = cursor.fetchmany()
if not batch:
break
total_rows += len(batch)
# Process batch here
elapsed = time.time() - start_time
print(f"arraysize {arraysize}: Processed {total_rows} rows in {elapsed:.3f} seconds")
# Compare different array sizes
process_large_data(1) # Default
process_large_data(100) # Moderate
process_large_data(1000) # Large
此基准测试比较了处理 100,000 行时不同的 arraysize 值。通常,较大的尺寸性能更好,但需要更多的内存。
最佳值取决于您的特定用例、可用内存和行大小。建议测试不同的值。
arraysize 与 fetchall()
虽然 arraysize 主要影响 fetchmany,但它可以在某些实现中影响 fetchall 的内存使用。
import sqlite3
import sys
with sqlite3.connect(':memory:') as conn:
conn.execute('CREATE TABLE data (id INTEGER, content TEXT)')
conn.executemany('INSERT INTO data VALUES (?, ?)',
[(i, 'X' * 1000) for i in range(1, 1001)]) # 1MB of data
with conn.cursor() as cursor:
# Small arraysize
cursor.arraysize = 1
print("Memory with arraysize=1:", end=' ')
data = cursor.execute('SELECT * FROM data').fetchall()
print(f"{sys.getsizeof(data) / 1024:.1f} KB")
# Larger arraysize
cursor.arraysize = 100
print("Memory with arraysize=100:", end=' ')
data = cursor.execute('SELECT * FROM data').fetchall()
print(f"{sys.getsizeof(data) / 1024:.1f} KB")
此示例显示了即使使用 fetchall,arraysize 也可能如何影响内存使用。差异因 Python 实现而异。
为了保持一致的行为,当内存效率至关重要时,请使用 fetchmany 显式管理批处理大小。
动态 arraysize 调整
可以根据查询结果或系统条件动态调整 arraysize。
import sqlite3
import psutil
def get_memory_usage():
return psutil.virtual_memory().percent
with sqlite3.connect('adaptive.db') as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS sensor_data
(timestamp TEXT, sensor_id INTEGER, value REAL)''')
# Insert sample sensor data
if conn.execute('SELECT COUNT(*) FROM sensor_data').fetchone()[0] == 0:
import datetime
now = datetime.datetime.now()
data = [(str(now - datetime.timedelta(seconds=i)), i % 10, i * 0.1)
for i in range(10000)]
conn.executemany('INSERT INTO sensor_data VALUES (?, ?, ?)', data)
with conn.cursor() as cursor:
cursor.execute('SELECT * FROM sensor_data ORDER BY timestamp DESC')
# Start with moderate arraysize
cursor.arraysize = 100
while True:
# Adjust arraysize based on memory pressure
if get_memory_usage() > 70:
cursor.arraysize = max(10, cursor.arraysize // 2)
elif get_memory_usage() < 50 and cursor.arraysize < 1000:
cursor.arraysize = min(1000, cursor.arraysize * 2)
batch = cursor.fetchmany()
if not batch:
break
print(f"Processing {len(batch)} rows (arraysize: {cursor.arraysize})")
# Process batch here
这个高级示例根据系统内存使用情况动态调整 arraysize。当内存受限时,它会减小批处理大小,并在可用时增加它。
这种自适应方法在资源受限的环境中或处理不可预测的数据大小时非常有用。
arraysize 与不同的行大小
最佳 arraysize 可能会因结果集中每行的大小而异。
import sqlite3
import time
def test_row_size(row_size, arraysize):
with sqlite3.connect(':memory:') as conn:
# Create table with specified row size
conn.execute(f'CREATE TABLE data (id INTEGER, content TEXT)')
data = [(i, 'X' * row_size) for i in range(1000)]
conn.executemany('INSERT INTO data VALUES (?, ?)', data)
with conn.cursor() as cursor:
cursor.arraysize = arraysize
start = time.time()
cursor.execute('SELECT * FROM data')
total = 0
while True:
batch = cursor.fetchmany()
if not batch:
break
total += len(batch)
elapsed = time.time() - start
print(f"Row size {row_size}B, arraysize {arraysize}: {elapsed:.4f}s")
# Test different combinations
test_row_size(10, 100) # Small rows, large batch
test_row_size(10, 10) # Small rows, small batch
test_row_size(1000, 100) # Large rows, large batch
test_row_size(1000, 10) # Large rows, small batch
此示例演示了行大小如何影响不同 arraysize 值的性能。较大的行可能需要较小的批处理大小,以避免过度使用内存。
在处理大型 BLOB 或文本字段时,请考虑较小的 arraysize 值,以平衡性能和内存消耗。
最佳实践
- 分析性能: 针对您的特定用例测试不同的 arraysize 值
- 考虑内存: 较大的 arraysize 每次获取使用更多的内存
- 网络注意事项: 较大的批处理减少了远程数据库的往返次数
- 默认设置是安全的: arraysize=1 适用于所有情况,但可能较慢
- 记录您的选择: 注释您选择特定 arraysize 值的理由
资料来源
作者
列出所有 Python 教程。