Python sqlite3.Connection.create_aggregate 方法
上次修改时间:2025 年 4 月 15 日
本综合指南探讨了 Python 的 create_aggregate 方法,该方法允许在 SQLite 中创建自定义聚合函数。我们将涵盖该方法的参数、实现模式和实际示例。
基本定义
create_aggregate 方法将 Python 可调用对象注册为 SQL 聚合函数。聚合函数对多行进行操作并返回单个结果,例如 SUM 或 AVG。
主要特性:它需要一个具有 step 和 finalize 方法的类,在连接的生命周期内持续存在,并且可以处理多个参数。该方法扩展了 SQLite 的内置聚合功能。
基本聚合函数
此示例展示了如何创建一个简单的聚合函数,该函数计算一组数字的几何平均值。
import sqlite3
import math
class GeometricMean:
def __init__(self):
self.product = 1
self.count = 0
def step(self, value):
if value is not None:
self.product *= value
self.count += 1
def finalize(self):
if self.count == 0:
return None
return math.pow(self.product, 1/self.count)
with sqlite3.connect(':memory:') as conn:
conn.create_aggregate("geomean", 1, GeometricMean)
cursor = conn.cursor()
cursor.execute("CREATE TABLE data (value REAL)")
cursor.executemany("INSERT INTO data VALUES (?)", [(2,), (8,), (32,)])
cursor.execute("SELECT geomean(value) FROM data")
result = cursor.fetchone()[0]
print(f"Geometric mean: {result:.2f}") # Output: 8.00
GeometricMean 类在聚合期间维护状态。step 方法处理每一行,而 finalize 计算最终结果。该函数已注册到 create_aggregate。
为了简单起见,该示例使用内存数据库。聚合函数的工作方式与查询中的内置 SQL 函数完全相同。
字符串聚合
此示例演示了字符串聚合,将多个字符串与自定义分隔符组合在一起。
import sqlite3
class StringAgg:
def __init__(self):
self.strings = []
self.separator = ', '
def step(self, value, separator=None):
if separator is not None:
self.separator = separator
if value is not None:
self.strings.append(str(value))
def finalize(self):
return self.separator.join(self.strings)
with sqlite3.connect(':memory:') as conn:
conn.create_aggregate("str_agg", -1, StringAgg) # -1 for variable args
cursor = conn.cursor()
cursor.execute("CREATE TABLE phrases (text TEXT)")
data = [('Hello',), ('world',), ('from',), ('SQLite',)]
cursor.executemany("INSERT INTO phrases VALUES (?)", data)
cursor.execute("SELECT str_agg(text, ' | ') FROM phrases")
result = cursor.fetchone()[0]
print(result) # Output: Hello | world | from | SQLite
StringAgg 类通过检查可选的分隔符参数来处理可变参数。create_aggregate 中的 -1 允许可变数量的参数。
当 SQLite 的内置 group_concat 不足以满足需求时,此模式对于创建自定义字符串聚合函数非常有用。
统计众数计算
此示例实现了一个统计众数聚合函数,用于查找数据集中最频繁的值。
import sqlite3
from collections import defaultdict
class Mode:
def __init__(self):
self.frequencies = defaultdict(int)
self.max_count = 0
self.mode_value = None
def step(self, value):
if value is not None:
self.frequencies[value] += 1
if self.frequencies[value] > self.max_count:
self.max_count = self.frequencies[value]
self.mode_value = value
def finalize(self):
return self.mode_value
with sqlite3.connect(':memory:') as conn:
conn.create_aggregate("mode", 1, Mode)
cursor = conn.cursor()
cursor.execute("CREATE TABLE numbers (num INTEGER)")
data = [(7,), (3,), (7,), (1,), (3,), (7,), (5,)]
cursor.executemany("INSERT INTO numbers VALUES (?)", data)
cursor.execute("SELECT mode(num) FROM numbers")
result = cursor.fetchone()[0]
print(f"Mode: {result}") # Output: 7
Mode 类使用字典来跟踪值频率。step 方法更新计数,而 finalize 返回最频繁的值。
这演示了如何在聚合期间维护复杂状态并实现 SQLite 中本地没有的统计函数。
范围计算
此示例创建了一个聚合函数,用于计算一组数字的范围(最大值 - 最小值)。
import sqlite3
class Range:
def __init__(self):
self.min_val = None
self.max_val = None
def step(self, value):
if value is not None:
if self.min_val is None or value < self.min_val:
self.min_val = value
if self.max_val is None or value > self.max_val:
self.max_val = value
def finalize(self):
if self.min_val is None or self.max_val is None:
return None
return self.max_val - self.min_val
with sqlite3.connect('example.db') as conn:
conn.create_aggregate("calc_range", 1, Range)
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS measurements (value REAL)")
cursor.executemany("INSERT INTO measurements VALUES (?)",
[(12.5,), (18.3,), (15.1,), (22.7,), (14.9,)])
cursor.execute("SELECT calc_range(value) FROM measurements")
result = cursor.fetchone()[0]
print(f"Range: {result:.1f}") # Output: 10.2
Range 类在处理过程中跟踪最小值和最大值。finalize 方法计算它们之间的差。
此示例展示了如何创建特定领域的聚合函数,这些函数将复杂计算封装在可重用的 SQL 函数中。
加权平均值
此示例实现了一个加权平均值聚合函数,该函数将值和权重作为参数。
import sqlite3
class WeightedAvg:
def __init__(self):
self.total = 0.0
self.weights_sum = 0.0
def step(self, value, weight):
if value is not None and weight is not None:
self.total += value * weight
self.weights_sum += weight
def finalize(self):
if self.weights_sum == 0:
return None
return self.total / self.weights_sum
with sqlite3.connect(':memory:') as conn:
conn.create_aggregate("weighted_avg", 2, WeightedAvg)
cursor = conn.cursor()
cursor.execute("CREATE TABLE scores (score REAL, weight REAL)")
data = [(85, 0.2), (90, 0.3), (78, 0.5)]
cursor.executemany("INSERT INTO scores VALUES (?, ?)", data)
cursor.execute("SELECT weighted_avg(score, weight) FROM scores")
result = cursor.fetchone()[0]
print(f"Weighted average: {result:.2f}") # Output: 83.60
WeightedAvg 类处理每行两个参数。它维护用于计算的运行总计,展示了多参数聚合。
这种模式对于财务计算、评分系统或任何需要加权测量的场景都很有用。
JSON 数组聚合
此示例创建了一个聚合函数,用于从列值构建 JSON 数组。
import sqlite3
import json
class JSONArray:
def __init__(self):
self.items = []
def step(self, value):
if value is not None:
self.items.append(value)
def finalize(self):
return json.dumps(self.items)
with sqlite3.connect(':memory:') as conn:
conn.create_aggregate("json_array", 1, JSONArray)
cursor = conn.cursor()
cursor.execute("CREATE TABLE colors (name TEXT)")
cursor.executemany("INSERT INTO colors VALUES (?)",
[('red',), ('green',), ('blue',)])
cursor.execute("SELECT json_array(name) FROM colors")
result = cursor.fetchone()[0]
print(result) # Output: ["red", "green", "blue"]
JSONArray 类将值收集到一个列表中,并将它们转换为 JSON 字符串。这演示了如何创建自定义序列化格式。
当使用使用 JSON 数据的应用程序时,此技术特别有用,它提供了直接的数据库到 JSON 的转换。
First/Last 聚合函数
此示例实现了 FIRST 和 LAST 聚合函数,这些函数返回组中的第一个和最后一个值。
import sqlite3
class First:
def __init__(self):
self.first_value = None
self.first_set = False
def step(self, value):
if not self.first_set and value is not None:
self.first_value = value
self.first_set = True
def finalize(self):
return self.first_value
class Last:
def __init__(self):
self.last_value = None
def step(self, value):
if value is not None:
self.last_value = value
def finalize(self):
return self.last_value
with sqlite3.connect(':memory:') as conn:
conn.create_aggregate("first", 1, First)
conn.create_aggregate("last", 1, Last)
cursor = conn.cursor()
cursor.execute("CREATE TABLE events (ts TEXT, event TEXT)")
data = [
('2023-01-01 08:00', 'start'),
('2023-01-01 09:30', 'progress'),
('2023-01-01 12:00', 'complete')
]
cursor.executemany("INSERT INTO events VALUES (?, ?)", data)
cursor.execute("SELECT first(event), last(event) FROM events")
first, last = cursor.fetchone()
print(f"First: {first}, Last: {last}") # Output: First: start, Last: complete
First 类捕获它遇到的第一个非 NULL 值,而 Last 会不断更新到最新的值。两者都注册为单独的聚合函数。
这些函数对于时间序列数据分析非常有用,可以轻松访问边界值,而无需复杂的窗口函数。
最佳实践
- 正确初始化状态: 清理初始化可防止调用之间出现问题
- 处理 NULL 值: 检查 step 方法中是否有 None
- 使用适当的参数计数: 指定正确的 num_params 或使用 -1
- 保持状态最小: 仅存储最终计算所需的内容
- 记录行为: 清楚地记录您的聚合函数
资料来源
作者
列出所有 Python 教程。