ZetCode

Python sqlite3.Connection.create_aggregate 方法

上次修改时间:2025 年 4 月 15 日

本综合指南探讨了 Python 的 create_aggregate 方法,该方法允许在 SQLite 中创建自定义聚合函数。我们将涵盖该方法的参数、实现模式和实际示例。

基本定义

create_aggregate 方法将 Python 可调用对象注册为 SQL 聚合函数。聚合函数对多行进行操作并返回单个结果,例如 SUM 或 AVG。

主要特性:它需要一个具有 stepfinalize 方法的类,在连接的生命周期内持续存在,并且可以处理多个参数。该方法扩展了 SQLite 的内置聚合功能。

基本聚合函数

此示例展示了如何创建一个简单的聚合函数,该函数计算一组数字的几何平均值。

basic_aggregate.py
import sqlite3
import math

class GeometricMean:
    def __init__(self):
        self.product = 1
        self.count = 0

    def step(self, value):
        if value is not None:
            self.product *= value
            self.count += 1

    def finalize(self):
        if self.count == 0:
            return None
        return math.pow(self.product, 1/self.count)

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("geomean", 1, GeometricMean)
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE data (value REAL)")
    cursor.executemany("INSERT INTO data VALUES (?)", [(2,), (8,), (32,)])
    
    cursor.execute("SELECT geomean(value) FROM data")
    result = cursor.fetchone()[0]
    print(f"Geometric mean: {result:.2f}")  # Output: 8.00

GeometricMean 类在聚合期间维护状态。step 方法处理每一行,而 finalize 计算最终结果。该函数已注册到 create_aggregate

为了简单起见,该示例使用内存数据库。聚合函数的工作方式与查询中的内置 SQL 函数完全相同。

字符串聚合

此示例演示了字符串聚合,将多个字符串与自定义分隔符组合在一起。

string_aggregate.py
import sqlite3

class StringAgg:
    def __init__(self):
        self.strings = []
        self.separator = ', '

    def step(self, value, separator=None):
        if separator is not None:
            self.separator = separator
        if value is not None:
            self.strings.append(str(value))

    def finalize(self):
        return self.separator.join(self.strings)

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("str_agg", -1, StringAgg)  # -1 for variable args
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE phrases (text TEXT)")
    data = [('Hello',), ('world',), ('from',), ('SQLite',)]
    cursor.executemany("INSERT INTO phrases VALUES (?)", data)
    
    cursor.execute("SELECT str_agg(text, ' | ') FROM phrases")
    result = cursor.fetchone()[0]
    print(result)  # Output: Hello | world | from | SQLite

StringAgg 类通过检查可选的分隔符参数来处理可变参数。create_aggregate 中的 -1 允许可变数量的参数。

当 SQLite 的内置 group_concat 不足以满足需求时,此模式对于创建自定义字符串聚合函数非常有用。

统计众数计算

此示例实现了一个统计众数聚合函数,用于查找数据集中最频繁的值。

mode_aggregate.py
import sqlite3
from collections import defaultdict

class Mode:
    def __init__(self):
        self.frequencies = defaultdict(int)
        self.max_count = 0
        self.mode_value = None

    def step(self, value):
        if value is not None:
            self.frequencies[value] += 1
            if self.frequencies[value] > self.max_count:
                self.max_count = self.frequencies[value]
                self.mode_value = value

    def finalize(self):
        return self.mode_value

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("mode", 1, Mode)
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE numbers (num INTEGER)")
    data = [(7,), (3,), (7,), (1,), (3,), (7,), (5,)]
    cursor.executemany("INSERT INTO numbers VALUES (?)", data)
    
    cursor.execute("SELECT mode(num) FROM numbers")
    result = cursor.fetchone()[0]
    print(f"Mode: {result}")  # Output: 7

Mode 类使用字典来跟踪值频率。step 方法更新计数,而 finalize 返回最频繁的值。

这演示了如何在聚合期间维护复杂状态并实现 SQLite 中本地没有的统计函数。

范围计算

此示例创建了一个聚合函数,用于计算一组数字的范围(最大值 - 最小值)。

range_aggregate.py
import sqlite3

class Range:
    def __init__(self):
        self.min_val = None
        self.max_val = None

    def step(self, value):
        if value is not None:
            if self.min_val is None or value < self.min_val:
                self.min_val = value
            if self.max_val is None or value > self.max_val:
                self.max_val = value

    def finalize(self):
        if self.min_val is None or self.max_val is None:
            return None
        return self.max_val - self.min_val

with sqlite3.connect('example.db') as conn:
    conn.create_aggregate("calc_range", 1, Range)
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE IF NOT EXISTS measurements (value REAL)")
    cursor.executemany("INSERT INTO measurements VALUES (?)", 
                      [(12.5,), (18.3,), (15.1,), (22.7,), (14.9,)])
    
    cursor.execute("SELECT calc_range(value) FROM measurements")
    result = cursor.fetchone()[0]
    print(f"Range: {result:.1f}")  # Output: 10.2

Range 类在处理过程中跟踪最小值和最大值。finalize 方法计算它们之间的差。

此示例展示了如何创建特定领域的聚合函数,这些函数将复杂计算封装在可重用的 SQL 函数中。

加权平均值

此示例实现了一个加权平均值聚合函数,该函数将值和权重作为参数。

weighted_avg.py
import sqlite3

class WeightedAvg:
    def __init__(self):
        self.total = 0.0
        self.weights_sum = 0.0

    def step(self, value, weight):
        if value is not None and weight is not None:
            self.total += value * weight
            self.weights_sum += weight

    def finalize(self):
        if self.weights_sum == 0:
            return None
        return self.total / self.weights_sum

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("weighted_avg", 2, WeightedAvg)
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE scores (score REAL, weight REAL)")
    data = [(85, 0.2), (90, 0.3), (78, 0.5)]
    cursor.executemany("INSERT INTO scores VALUES (?, ?)", data)
    
    cursor.execute("SELECT weighted_avg(score, weight) FROM scores")
    result = cursor.fetchone()[0]
    print(f"Weighted average: {result:.2f}")  # Output: 83.60

WeightedAvg 类处理每行两个参数。它维护用于计算的运行总计,展示了多参数聚合。

这种模式对于财务计算、评分系统或任何需要加权测量的场景都很有用。

JSON 数组聚合

此示例创建了一个聚合函数,用于从列值构建 JSON 数组。

json_aggregate.py
import sqlite3
import json

class JSONArray:
    def __init__(self):
        self.items = []

    def step(self, value):
        if value is not None:
            self.items.append(value)

    def finalize(self):
        return json.dumps(self.items)

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("json_array", 1, JSONArray)
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE colors (name TEXT)")
    cursor.executemany("INSERT INTO colors VALUES (?)", 
                      [('red',), ('green',), ('blue',)])
    
    cursor.execute("SELECT json_array(name) FROM colors")
    result = cursor.fetchone()[0]
    print(result)  # Output: ["red", "green", "blue"]

JSONArray 类将值收集到一个列表中,并将它们转换为 JSON 字符串。这演示了如何创建自定义序列化格式。

当使用使用 JSON 数据的应用程序时,此技术特别有用,它提供了直接的数据库到 JSON 的转换。

First/Last 聚合函数

此示例实现了 FIRST 和 LAST 聚合函数,这些函数返回组中的第一个和最后一个值。

first_last.py
import sqlite3

class First:
    def __init__(self):
        self.first_value = None
        self.first_set = False

    def step(self, value):
        if not self.first_set and value is not None:
            self.first_value = value
            self.first_set = True

    def finalize(self):
        return self.first_value

class Last:
    def __init__(self):
        self.last_value = None

    def step(self, value):
        if value is not None:
            self.last_value = value

    def finalize(self):
        return self.last_value

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("first", 1, First)
    conn.create_aggregate("last", 1, Last)
    cursor = conn.cursor()
    
    cursor.execute("CREATE TABLE events (ts TEXT, event TEXT)")
    data = [
        ('2023-01-01 08:00', 'start'),
        ('2023-01-01 09:30', 'progress'),
        ('2023-01-01 12:00', 'complete')
    ]
    cursor.executemany("INSERT INTO events VALUES (?, ?)", data)
    
    cursor.execute("SELECT first(event), last(event) FROM events")
    first, last = cursor.fetchone()
    print(f"First: {first}, Last: {last}")  # Output: First: start, Last: complete

First 类捕获它遇到的第一个非 NULL 值,而 Last 会不断更新到最新的值。两者都注册为单独的聚合函数。

这些函数对于时间序列数据分析非常有用,可以轻松访问边界值,而无需复杂的窗口函数。

最佳实践

资料来源

作者

我叫 Jan Bodnar,是一位充满激情的程序员,拥有丰富的编程经验。 我从 2007 年开始撰写编程文章。到目前为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程