Skip to content

内置 Connectors

本文档介绍 SQLRec 内置的数据连接器及其使用方法。

概述

SQLRec 提供了多种内置连接器,用于连接不同的数据存储系统。连接器基于 Calcite 表抽象实现,支持 SQL 查询和数据写入操作。

表类型层次结构

SQLRec 的连接器基于以下表类型层次结构:

SqlRecTable (抽象基类)

    ├── SqlRecKvTable (键值表,支持主键查询和缓存)
    │       │
    │       └── SqlRecVectorTable (向量表,支持向量检索)

    └── 其他表类型...

表类型说明

表类型说明特性
SqlRecTable抽象基类,继承自 Calcite 的 AbstractTable提供基础表功能
SqlRecKvTable键值表,支持主键查询支持主键索引、缓存机制、过滤查询、数据修改
SqlRecVectorTable向量表,支持向量检索继承 SqlRecKvTable,支持向量相似度搜索

内置连接器

1. Redis Connector

Redis 连接器用于连接 Redis 数据库,支持键值存储和查询。

连接器标识符redis

继承类型SqlRecKvTable

特性

  • 支持单机模式和集群模式
  • 支持 String 和 List 数据结构
  • 支持本地缓存加速查询
  • 支持主键过滤查询
  • 支持数据写入和删除

配置参数

参数类型默认值说明
urlString-Redis 连接 URL,格式:redis://password@host:port/db
redis-modeStringsingleRedis 模式,可选值:single(单机)、cluster(集群)
data-structureStringstring数据结构,可选值:stringlist
max-list-sizeInteger0List 最大长度,0 表示无限制
ttlInteger2592000Key 过期时间(秒),默认 30 天
cache-ttlInteger30本地缓存过期时间(秒),0 表示不缓存
max-cache-sizeInteger100000本地缓存最大条目数

使用示例

sql
CREATE TABLE user_table (
  id BIGINT,
  name STRING,
  country STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'url' = 'redis://localhost:6379/0'
);

CREATE TABLE user_interest_category1 (
  user_id BIGINT,
  category1 STRING,
  score FLOAT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'data-structure' = 'list',
  'url' = 'redis://localhost:6379/0'
);

注意事项

  • Redis 连接器只支持主键相等过滤(WHERE key = value
  • 使用本地缓存可以显著提升查询性能
  • List 数据结构适合存储多值场景

2. Milvus Connector

Milvus 连接器用于连接 Milvus 向量数据库,支持向量相似度检索。

连接器标识符milvus

继承类型SqlRecVectorTable

特性

  • 支持向量相似度搜索(ANN)
  • 支持主键查询
  • 支持过滤条件
  • 支持数据插入和删除
  • 支持投影列优化

配置参数

参数类型默认值说明
urlString-Milvus 服务器地址
tokenString-Milvus 认证令牌
databaseStringdefault数据库名称
collectionString-集合名称

使用示例

sql
CREATE TABLE item_embedding (
  id BIGINT,
  embedding ARRAY<FLOAT>,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'milvus',
  'url' = 'http://localhost:19530',
  'token' = 'root:Milvus',
  'database' = 'default',
  'collection' = 'item_embedding'
);

注意事项

  • Milvus 连接器支持复杂的过滤条件
  • 向量搜索需要指定向量字段和查询向量
  • 支持投影列优化,只返回需要的列

3. Kafka Connector

Kafka 连接器用于连接 Apache Kafka 消息队列,支持消息写入。

连接器标识符kafka

继承类型SqlRecTable

特性

  • 支持消息写入到 Kafka Topic
  • 支持 JSON 格式消息
  • 支持批量发送优化
  • 支持自定义序列化器

配置参数

参数类型默认值说明
properties.bootstrap.serversString-Kafka Broker 地址
topicString-Kafka Topic 名称
formatStringjson消息格式
properties.producer.key.serializerStringorg.apache.kafka.common.serialization.StringSerializerKey 序列化器
properties.producer.value.serializerStringorg.apache.kafka.common.serialization.StringSerializerValue 序列化器
properties.producer.linger.msInteger5000批量发送等待时间(毫秒)

使用示例

sql
CREATE TABLE rec_log_kafka (
  user_id BIGINT,
  item_id BIGINT,
  item_name STRING,
  rec_reason STRING,
  req_time BIGINT,
  req_id STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'rec_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

注意事项

  • Kafka 连接器主要用于消息写入,不支持查询操作
  • linger.ms 参数控制批量发送,较大的值可以提高吞吐量但增加延迟
  • 消息以 JSON 格式发送到 Kafka