ZetCode

Python sqlite3.Connection.create_function 方法

上次修改时间:2025 年 4 月 15 日

本综合指南探讨了 Python 的 sqlite3.Connection.create_function 方法,该方法允许在 SQLite 数据库中将 Python 函数注册为 SQL 函数。

基本定义

create_function 方法将 Python 函数注册为 SQL 函数。 然后可以从 SQL 语句中调用此函数。 它是 SQLite 连接对象的一部分。

主要特点:它将 Python 函数绑定到 SQL 名称,支持可变参数计数,并保持 Python 和 SQLite 之间的类型一致性。

基本函数注册

此示例演示如何将简单的 Python 函数注册为 SQL 函数。

basic_function.py
import sqlite3

def square(x):
    return x * x

with sqlite3.connect(':memory:') as conn:
    conn.create_function('square', 1, square)
    
    cursor = conn.cursor()
    cursor.execute("SELECT square(5)")
    result = cursor.fetchone()
    print(result[0])  # Output: 25

create_function 接受三个参数:SQL 函数名称、参数数量和 Python 函数。 在这里,我们将 square 注册为 SQL 函数。

然后可以在 SQL 查询中调用已注册的函数,就像内置 SQL 函数一样。 连接会自动管理函数的生命周期。

字符串操作函数

此示例演示如何注册字符串操作函数。

string_function.py
import sqlite3

def reverse_string(s):
    return s[::-1]

with sqlite3.connect(':memory:') as conn:
    conn.create_function('reverse', 1, reverse_string)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE words (word TEXT)")
    cursor.execute("INSERT INTO words VALUES ('hello'), ('world')")
    
    cursor.execute("SELECT word, reverse(word) FROM words")
    for word, reversed_word in cursor:
        print(f"{word} -> {reversed_word}")

reverse_string 函数接受一个字符串并返回其反转形式。 我们将其注册为 SQL 名称 reverse 并在 SELECT 查询中使用它。

这显示了如何将 Python 的字符串操作功能暴露给 SQL,从而扩展 SQLite 超出其内置函数的功能。

具有多个参数的函数

此示例显示了一个接受多个参数的函数。

multi_param_function.py
import sqlite3

def power(base, exponent):
    return base ** exponent

with sqlite3.connect(':memory:') as conn:
    conn.create_function('power', 2, power)
    
    cursor = conn.cursor()
    cursor.execute("SELECT power(2, 3), power(5, 2)")
    results = cursor.fetchone()
    print(f"2^3 = {results[0]}, 5^2 = {results[1]}")

power 函数接受两个参数并返回第一个参数的第二个参数的幂。 我们用 2 个参数注册它。

在 SQL 中调用该函数时,我们传递两个参数。 该函数的工作方式类似于具有多个参数的内置 SQL 函数。

可变数量的参数

SQLite 支持具有可变数量参数的函数。 以下是如何实现一个。

varargs_function.py
import sqlite3

def concatenate(*args):
    return ''.join(str(arg) for arg in args)

with sqlite3.connect(':memory:') as conn:
    conn.create_function('concat', -1, concatenate)
    
    cursor = conn.cursor()
    cursor.execute("SELECT concat('Hello', ' ', 'World', '!')")
    print(cursor.fetchone()[0])  # Output: Hello World!
    
    cursor.execute("SELECT concat(1, 2, 3)")
    print(cursor.fetchone()[0])  # Output: 123

使用 -1 作为参数计数表示可变数量的参数。 Python 函数将它们作为元组接收。

此示例将所有参数连接成一个字符串,无论提供了多少个参数。 该函数可以处理字符串和数字。

具有数据库上下文的函数

函数可以访问数据库上下文以进行更高级的操作。

context_function.py
import sqlite3

def count_rows(table_name):
    cursor = conn.cursor()
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    return cursor.fetchone()[0]

with sqlite3.connect(':memory:') as conn:
    conn.create_function('rowcount', 1, count_rows)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE users (id INTEGER, name TEXT)")
    cursor.execute("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
    
    cursor.execute("SELECT rowcount('users')")
    print(f"Row count: {cursor.fetchone()[0]}")  # Output: Row count: 2

此示例显示了一个查询数据库的函数。 请注意,我们在函数内部使用了外部范围的连接对象。

使用此类函数时要小心,以避免 SQL 注入(如此处所示的直接表名插值)。 始终验证输入。

聚合函数

SQLite 支持自定义聚合函数。 这是一个简单的例子。

aggregate_function.py
import sqlite3

class GeometricMean:
    def __init__(self):
        self.product = 1.0
        self.count = 0
    
    def step(self, value):
        self.product *= value
        self.count += 1
    
    def finalize(self):
        return self.product ** (1.0 / self.count) if self.count else 0.0

with sqlite3.connect(':memory:') as conn:
    conn.create_aggregate("geom_mean", 1, GeometricMean)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE numbers (value REAL)")
    cursor.execute("INSERT INTO numbers VALUES (2), (8), (32)")
    
    cursor.execute("SELECT geom_mean(value) FROM numbers")
    result = cursor.fetchone()[0]
    print(f"Geometric mean: {result:.2f}")  # Output: Geometric mean: 8.00

聚合函数需要一个带有 stepfinalize 方法的类。 step 处理每一行,finalize 返回结果。

此示例计算一组数字的几何平均值,演示如何在 SQLite 中实现自定义聚合。

窗口函数

SQLite 还支持窗口函数。 这是一个简单的移动平均示例。

window_function.py
import sqlite3

class MovingAverage:
    def __init__(self):
        self.window = []
    
    def step(self, value, size):
        self.window.append(value)
        if len(self.window) > size:
            self.window.pop(0)
        return sum(self.window) / len(self.window) if self.window else 0.0
    
    def finalize(self):
        return None  # Not used for window functions

with sqlite3.connect(':memory:') as conn:
    conn.create_window_function("moving_avg", 2, MovingAverage)
    
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE stock_prices (day INTEGER, price REAL)")
    cursor.executemany("INSERT INTO stock_prices VALUES (?, ?)",
                      [(1, 10), (2, 12), (3, 15), (4, 14), (5, 16)])
    
    cursor.execute("""
        SELECT day, price, 
               moving_avg(price, 3) OVER (ORDER BY day) AS avg_price
        FROM stock_prices
    """)
    
    for row in cursor:
        print(f"Day {row[0]}: Price={row[1]}, Avg={row[2]:.2f}")

窗口函数类似于聚合,但对行的滑动窗口进行操作。 它们使用 create_window_function 注册。

此示例计算股票价格的 3 天移动平均线,展示如何在 SQLite 中实现自定义窗口函数。

最佳实践

资料来源

作者

我的名字是 Jan Bodnar,我是一位充满激情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直在撰写编程文章。 迄今为止,我已经撰写了超过 1,400 篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程