内置 UDF
本文档介绍 SQLRec 内置的用户定义函数(UDF),包括表函数(Table Function)和标量函数(Scalar Function)。
概述
SQLRec 提供了丰富的内置 UDF,用于推荐系统开发中的常见操作,如去重、打散、向量计算等。
UDF 分类:
| 类型 | 说明 | 返回值 |
|---|---|---|
| Table Function | 表函数,接收表作为输入,返回表 | CacheTable |
| Scalar Function | 标量函数,接收标量值,返回标量值 | 单个值 |
表函数(Table Function)
dedup
去重函数,根据指定列从输入表中排除已存在于去重表中的记录。
函数签名:
public CacheTable evaluate(CacheTable input, CacheTable dedupTable, String col1, String col2)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
input | CacheTable | 输入表 |
dedupTable | CacheTable | 去重表,包含需要排除的值 |
col1 | String | 输入表中用于去重的列名 |
col2 | String | 去重表中用于匹配的列名 |
返回值:返回去重后的 CacheTable,结构与输入表相同。
使用示例:
-- 获取用户已曝光的物品
CACHE TABLE exposured_item AS
SELECT item_id
FROM user_info JOIN user_exposure_item ON user_id = user_info.id;
-- 从召回结果中排除已曝光物品
CACHE TABLE dedup_recall AS
CALL dedup(recall_item, exposured_item, 'item_id', 'item_id');工作原理:
- 从
dedupTable的col2列收集所有值 - 遍历
input表,排除col1列值存在于去重集合中的记录 - 返回去重后的结果表
shuffle
随机打乱函数,将输入表中的记录随机排序。
函数签名:
public CacheTable evaluate(CacheTable input)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
input | CacheTable | 输入表 |
返回值:返回随机排序后的 CacheTable,结构和数据与输入表相同。
使用示例:
-- 随机打乱推荐结果
CACHE TABLE shuffled_result AS
CALL shuffle(recall_item);
-- 取打乱后的前 N 个
CACHE TABLE random_top_n AS
SELECT * FROM shuffled_result LIMIT 10;window_diversify
窗口打散函数,确保相邻的记录不会过于集中在某个类目,实现推荐结果的多样性。
函数签名:
public CacheTable evaluate(
CacheTable input,
String categoryColumnName,
String windowSize,
String maxCategoryNumInWindow,
String maxReturnRecord
)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
input | CacheTable | 输入表 |
categoryColumnName | String | 类目列名,用于打散的依据 |
windowSize | String | 滑动窗口大小 |
maxCategoryNumInWindow | String | 窗口内每个类目最多出现的次数 |
maxReturnRecord | String | 最大返回记录数 |
返回值:返回打散后的 CacheTable,结构与输入表相同。
使用示例:
-- 类目打散:窗口大小为 3,每个类目在窗口内最多出现 1 次,返回 10 条
CACHE TABLE diversify_result AS
CALL window_diversify(rec_item, 'category1', '3', '1', '10');工作原理:
- 维护一个滑动窗口,统计窗口内各类目的出现次数
- 遍历输入记录,优先选择窗口内未超限的类目
- 当窗口滑动时,移除最旧记录的类目计数
- 确保推荐结果的多样性,避免同类目物品连续出现
add_col
添加列函数,为输入表添加一个新列,所有行的该列值相同。
函数签名:
public CacheTable evaluate(CacheTable input, String colName, String value)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
input | CacheTable | 输入表 |
colName | String | 新列名 |
value | String | 新列的值(所有行相同) |
返回值:返回添加新列后的 CacheTable。
使用示例:
-- 添加一个来源标识列
CACHE TABLE result_with_source AS
CALL add_col(recall_item, 'source', 'daily_rec');
-- 添加时间戳列
CACHE TABLE result_with_time AS
CALL add_col(recall_item, 'rec_time', '2024-01-01');注意事项:
- 新列名不能与已有列名重复
- 新列类型为
VARCHAR
call_service
模型服务调用函数,用于调用已部署的模型服务进行推理。详见 模型文档。
batch_call_service
批量模型服务调用函数,用于在 Flink SQL 中批量调用已部署的模型服务进行推理。该函数将多行数据批量发送到远程服务,并将返回结果与原始数据合并输出。
注意
此函数只能在 Flink SQL 中使用,不支持 SQLRec 的 CACHE TABLE 语法。
函数签名:
@FunctionHint(output = @DataTypeHint("ROW<" +
"long_map MAP<STRING, BIGINT>, " +
"double_map MAP<STRING, DOUBLE>, " +
"string_map MAP<STRING, STRING>, " +
"long_array_map MAP<STRING, ARRAY<BIGINT>>, " +
"double_array_map MAP<STRING, ARRAY<DOUBLE>>, " +
"string_array_map MAP<STRING, ARRAY<STRING>>" +
">"))
public void eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... args)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
serviceUrl | String | 模型服务的 URL 地址 |
batchSize | Integer | 批量大小,每次请求发送的行数 |
fieldName-value pairs | Object... | 字段名-值对,用于指定要发送到服务的字段,必须成对出现 |
返回值:返回一个 ROW 类型,包含以下字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
long_map | MAP<STRING, BIGINT> | 长整型字段的 Map |
double_map | MAP<STRING, DOUBLE> | 双精度浮点型字段的 Map |
string_map | MAP<STRING, STRING> | 字符串型字段的 Map |
long_array_map | MAP<STRING, ARRAY<BIGINT>> | 长整型数组字段的 Map |
double_array_map | MAP<STRING, ARRAY<DOUBLE>> | 双精度浮点型数组字段的 Map |
string_array_map | MAP<STRING, ARRAY<STRING>> | 字符串型数组字段的 Map |
使用示例:
-- 创建临时函数
CREATE TEMPORARY FUNCTION batch_call_service AS 'com.sqlrec.udf.udtf.BatchCallServiceUDTF';
-- 调用模型服务生成物品向量
INSERT INTO item_embedding
SELECT
r.long_map['movie_id'] AS id,
r.string_map['title'] AS title,
r.string_array_map['genres'] AS genres,
r.double_array_map['item_tower_emb'] AS embedding
FROM ml_movies, LATERAL TABLE(batch_call_service(
'http://test-recall-service-item.sqlrec.svc.cluster.local:80/predict',
128,
'movie_id', movie_id,
'title', title,
'genres', genres
)) AS r
WHERE dt = '2024-01-01';工作原理:
- 函数接收多行数据,将字段名-值对缓存到缓冲区
- 当缓冲区大小达到
batchSize时,将数据批量发送到模型服务 - 模型服务接收 JSON 数组格式的请求,返回包含预测结果的 JSON 对象
- 函数将预测结果与原始数据合并,按类型分类存储到不同的 Map 中
- 每行数据输出一个 ROW,可通过 Map 访问原始字段和预测结果
请求格式:
发送到模型服务的 JSON 格式为对象数组:
[
{"movie_id": 1, "title": "Toy Story", "genres": ["Animation", "Comedy"]},
{"movie_id": 2, "title": "Jumanji", "genres": ["Adventure", "Children"]}
]响应格式:
模型服务应返回一个 JSON 对象,其中每个字段的值是一个数组,数组长度与请求数据行数相同:
{
"item_tower_emb": [[0.1, 0.2, ...], [0.3, 0.4, ...]],
"score": [0.95, 0.87]
}注意事项:
- 此函数只能在 Flink SQL 中使用,需要使用
LATERAL TABLE语法 batchSize建议根据模型服务的性能和网络延迟进行调整,通常设置为 64-256- 模型服务需要支持 POST 请求,接收 JSON 数组并返回 JSON 对象
- 返回结果中的数组字段会自动按行索引与输入数据对应
- 在
close()方法中会处理缓冲区中剩余的数据
call_service_with_qv
带 Query-Value 模式的模型服务调用函数。详见 模型文档。
truncate_table
表截取函数,从输入表中截取指定范围的行记录。
函数签名:
public CacheTable evaluate(CacheTable input, String start, String end)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
input | CacheTable | 输入表 |
start | String | 起始行索引(从 0 开始,包含) |
end | String | 结束行索引(不包含) |
返回值:返回截取后的 CacheTable,结构与输入表相同。
使用示例:
-- 获取第 10 到 20 条记录
CACHE TABLE partial_result AS
CALL truncate_table(recall_item, '10', '20');
-- 获取前 100 条记录
CACHE TABLE top_100 AS
CALL truncate_table(recall_item, '0', '100');注意事项:
start和end必须为有效的整数字符串start和end必须为非负数start必须小于或等于end- 截取范围为左闭右开区间
[start, end)
get_variables
获取变量函数,从执行上下文中获取所有变量,返回一个包含变量键值对的表。
函数签名:
public CacheTable evaluate(ExecuteContext context)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
context | ExecuteContext | 执行上下文 |
返回值:返回一个 2 列的 CacheTable,列名为 key 和 value,类型均为 VARCHAR。
使用示例:
-- 设置一些变量
SET 'user_id' = '12345';
SET 'limit' = '100';
-- 获取所有变量
CACHE TABLE all_vars AS
CALL get_variables();
-- 查看变量
SELECT * FROM all_vars;工作原理:
- 从执行上下文中获取所有变量
- 将每个变量的键值对转换为一行记录
- 返回包含所有变量的表
set_variables
设置变量函数,从表中读取键值对并设置到执行上下文中。
函数签名:
public CacheTable evaluate(ExecuteContext context, CacheTable input)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
context | ExecuteContext | 执行上下文 |
input | CacheTable | 输入表,必须恰好有 2 列,且均为字符串类型 |
返回值:返回输入表本身。
使用示例:
-- 创建变量表
CACHE TABLE var_table AS
SELECT 'user_id' AS key, '12345' AS value
UNION ALL
SELECT 'limit', '100';
-- 设置变量
CALL set_variables(var_table);
-- 使用设置的变量
SELECT `get`('user_id') AS user_id;注意事项:
- 输入表必须恰好有 2 列
- 两列都必须是字符串类型(VARCHAR 或 CHAR)
- 第一列为变量名,第二列为变量值
- 如果变量值为 NULL,则会删除该变量
feature_coverage_metrics
特征覆盖率打点函数,计算表中各字段的特征覆盖率并上报指标。
函数签名:
public Void evaluate(ExecuteContext context, String metricsName, CacheTable... tables)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
context | ExecuteContext | 执行上下文 |
metricsName | String | 指标名称 |
tables | CacheTable... | 一个或多个输入表 |
返回值:无返回值。
使用示例:
-- 计算并上报特征覆盖率
CALL feature_coverage_metrics('feature.coverage', user_features, item_features);
-- 仅计算单个表的覆盖率
CALL feature_coverage_metrics('user.feature.coverage', user_info);工作原理:
- 遍历每个表的每个字段
- 统计每个字段的非空值数量(
null、空Collection、空Map视为缺失) - 计算覆盖率 = 非空值数量 / 总行数
- 使用 summary 类型上报指标,tags 包含
table(表名)和field(字段名)
注意事项:
- 如果表为空,则跳过该表
- 指标名称不能为空
标量函数(Scalar Function)
array_contains
数组包含函数,检查数组是否包含指定元素。
函数签名:
public static Boolean evaluate(List<?> list, Object element)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
list | List<?> | 输入数组 |
element | Object | 要检查的元素 |
返回值:如果数组包含该元素返回 true,否则返回 false;如果任一参数为 null 则返回 null。
使用示例:
-- 检查用户标签是否包含 'vip'
SELECT
user_id,
array_contains(tags, 'vip') AS is_vip
FROM user_info;
-- 筛选包含特定标签的用户
SELECT *
FROM user_info
WHERE array_contains(tags, 'active') = true;array_contains_all
数组全包含函数,检查数组是否包含所有指定元素。
函数签名:
public static Boolean evaluate(List<?> list, List<?> elements)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
list | List<?> | 输入数组 |
elements | List<?> | 要检查的元素列表 |
返回值:如果数组包含所有指定元素返回 true,否则返回 false;如果任一参数为 null 则返回 null。
使用示例:
-- 检查用户是否同时拥有多个标签
SELECT
user_id,
array_contains_all(tags, ARRAY['vip', 'active']) AS is_vip_active
FROM user_info;
-- 筛选同时满足多个条件的用户
SELECT *
FROM user_info
WHERE array_contains_all(tags, ARRAY['premium', 'verified']) = true;array_contains_any
数组任一包含函数,检查数组是否包含指定元素中的任意一个。
函数签名:
public static Boolean evaluate(List<?> list, List<?> elements)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
list | List<?> | 输入数组 |
elements | List<?> | 要检查的元素列表 |
返回值:如果数组包含任一指定元素返回 true,否则返回 false;如果任一参数为 null 则返回 null。
使用示例:
-- 检查用户是否拥有任意一个 VIP 等级
SELECT
user_id,
array_contains_any(levels, ARRAY['gold', 'platinum', 'diamond']) AS is_high_level
FROM user_info;
-- 筛选拥有任意指定标签的用户
SELECT *
FROM user_info
WHERE array_contains_any(tags, ARRAY['new_user', 'trial']) = true;random_vec
随机向量生成函数,生成指定维度的归一化随机向量。
函数签名:
public List<Double> evaluate(String dimensionStr)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
dimensionStr | String | 向量维度,必须是正整数字符串 |
返回值:返回归一化的随机向量(List<Double>),向量的 L2 范数为 1。
使用示例:
-- 生成 64 维随机向量
SELECT
user_id,
random_vec('64') AS random_embedding
FROM user_info;
-- 为冷启动用户生成随机向量
CACHE TABLE cold_start_users AS
SELECT
user_id,
random_vec('128') AS user_embedding
FROM new_users;工作原理:
- 解析维度参数为整数
- 生成指定维度的随机向量
- 对向量进行 L2 归一化,使范数为 1
注意事项:
- 维度必须是正整数
- 生成的向量已归一化,可直接用于相似度计算
uuid
UUID 生成函数,生成一个随机的 UUID 字符串。
函数签名:
public String evaluate()返回值:返回一个随机 UUID 字符串,格式如 ee073e63-b74a-4c7e-8fea-60459729099c。
使用示例:
-- 生成请求 ID
CACHE TABLE request_meta AS
SELECT
user_id,
CAST(CURRENT_TIMESTAMP AS BIGINT) AS req_time,
uuid() AS req_id
FROM user_info;l2_norm
L2 归一化函数,对向量进行 L2 归一化处理。
函数签名:
public Object evaluate(Object vector)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
vector | Object | 输入向量,必须是数字列表 |
返回值:返回归一化后的向量(List<Double>),使得向量的 L2 范数为 1。
使用示例:
-- 对用户向量进行归一化
CACHE TABLE normalized_user AS
SELECT
user_id,
l2_norm(user_embedding) AS normalized_embedding
FROM user_features;工作原理:
- 计算向量的 L2 范数:
norm = sqrt(sum(x_i^2)) - 对每个元素除以范数:
x_i' = x_i / norm - 归一化后的向量常用于余弦相似度计算
ip
内积(Inner Product)计算函数,计算两个向量的内积(点积)。
函数签名:
public Object evaluate(Object emb1, Object emb2)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
emb1 | Object | 第一个向量,必须是数字列表 |
emb2 | Object | 第二个向量,必须是数字列表 |
返回值:返回两个向量的内积(Float)。
使用示例:
-- 计算用户向量和物品向量的内积
SELECT
user_id,
item_id,
ip(user_embedding, item_embedding) AS similarity
FROM user_item_pairs;
-- 向量召回:按内积排序
CACHE TABLE vector_recall AS
SELECT item_embedding.id AS item_id
FROM user_embedding JOIN item_embedding ON 1=1
ORDER BY ip(user_embedding.embedding, item_embedding.embedding) DESC
LIMIT 300;工作原理:
- 内积计算:
ip = sum(emb1[i] * emb2[i]) - 如果向量已归一化,内积等于余弦相似度
- 常用于向量检索和相似度计算
get
变量获取函数,从执行上下文中获取变量的值。常用于在SQL中引用通过 SET 语句设置的变量。
函数签名:
public static String evaluate(DataContext context, String key)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
key | String | 变量名 |
返回值:返回变量的值(String),如果变量不存在则返回 NULL。
注意
由于 get 是 SQL 关键字,使用时需要用反引号包裹函数名,写作 `get`。
使用示例:
-- 设置变量
SET 'user_id' = '12345';
-- 获取变量值
SELECT `get`('user_id') AS user_id;
-- 在表达式中使用
SELECT `get`('user_id') || '_suffix' AS user_id_with_suffix;
-- 类型转换
SELECT CAST(`get`('limit_count') AS INT) AS limit_count;
-- 从表中获取变量名并使用
CACHE TABLE var_names AS SELECT 'user_id' AS var_name;
SELECT `get`(var_name) AS var_value FROM var_names;工作原理:
- 函数接收一个变量名作为参数
- 从执行上下文(
ExecuteContext)中查找对应的变量值 - 返回变量值,如果变量不存在则返回
NULL
典型应用场景:
- 参数化SQL查询
- 动态配置传递
- 跨语句共享变量
get_or_default
变量获取函数(带默认值),从执行上下文中获取变量的值,如果变量不存在则返回指定的默认值。
函数签名:
public static String evaluate(DataContext context, String key, String defaultValue)参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
key | String | 变量名 |
defaultValue | String | 默认值,当变量不存在时返回 |
返回值:返回变量的值(String),如果变量不存在则返回 defaultValue。
使用示例:
-- 设置变量
SET 'func_name' = 'add_col';
-- 获取变量值,如果不存在则使用默认值
SELECT `get_or_default`('user_id', 'default_user') AS user_id;
-- 动态调用函数:变量存在时使用变量值
CALL `get_or_default`('func_name', 'shuffle')(my_table);
-- 动态调用函数:变量不存在时使用默认值
CALL `get_or_default`('unknown_func', 'shuffle')(my_table);工作原理:
- 函数接收变量名和默认值两个参数
- 从执行上下文(
ExecuteContext)中查找对应的变量值 - 如果变量存在,返回变量值;如果变量不存在,返回默认值
典型应用场景:
- 动态函数调用,提供兜底函数
- 配置项获取,提供默认配置
- 参数化 SQL,提供默认参数
自定义 UDF
可以参考 编程模型 文档了解如何开发自定义 UDF。