You need to enable JavaScript to run this app.
导航

StarRocks Flink Connector

最近更新时间2024.04.30 14:09:59

首次发布时间2024.04.30 14:09:59

StarRocks 支持通过 Flink 读取或写入数据。您可以使用 Flink Connector 连接 Flink 与 StarRocks 实现数据导入,其原理是在内存中对数据进行攒批,按批次使用 Stream Load 将数据导入 StarRocks。Flink Connector 支持 DataStream API、Table API & SQL,以及 Python API,并且相对于 Flink 官方提供的 JDBC Connector 具备更好的性能和稳定性。

您可以从 Maven 中央仓库 中下载与您 Flink 版本匹配的最新的 flink-connector-starrocks.jar 文件,也可以使用由 EMR 团队提供的 Flink Connector 版本。

EMR 团队提供的 Flink Connector Jar 文件随 Flink 安装包一同附送,您可以在支持部署 Flink 组件的 EMR 集群 /usr/lib/emr/current/flink/connectors 路径下找到对应的 jar 文件。

相对于开源版本的 Flink Connector,我们更加推荐您使用 EMR 团队提供的 Flink Connector 版本,相对而言优势如下:

  1. 能够与 EMR 集群,及其周边生态更好的集成。

  2. 增加一些 EMR 团队定制开发的竞争力特性。

  3. 更加及时修复一些已知的 bug。

不过兼容开源是 EMR 作为开源大数据平台的基本原则,您仍然可以坚持使用开源 Flink Connector 版本。

2 使用方式

本小节以导入数据到 StarRocks 明细表 examples.tb_duplicate_key 为例,该表的建表语句如下:

CREATE TABLE IF NOT EXISTS tb_duplicate_key
(
    event_time BIGINT       NOT NULL COMMENT 'timestamp of event',
    event_type INT          NOT NULL COMMENT 'type of event',
    user_id    INT          NOT NULL COMMENT 'id of user',
    device     VARCHAR(128) NULL     COMMENT 'device'
) ENGINE = OLAP DUPLICATE KEY(event_time, event_type)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
    'replication_num' = '3'
);

您可以直接通过 Flink SQL 形式将数据写入 StarRocks 对应数据表中,步骤如下:

  1. 进入 Flink SQL 交互终端,参考 Flink SQL Client 使用方式 进入 Flink SQL 交互终端,这里以 YARN Session 模式为例。

  2. 通过 CREATE TABLE 创建一张 StarRocks tb_duplicate_key 表的映射表,不要求同名:

CREATE TABLE IF NOT EXISTS tb_duplicate_key
(
    event_time BIGINT       NOT NULL COMMENT 'timestamp of event',
    event_type INT          NOT NULL COMMENT 'type of event',
    user_id    INT          NOT NULL COMMENT 'id of user',
    device     VARCHAR(128) NULL COMMENT 'device'
)
WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://{fe_ip}:9030',
    'load-url' = '{fe_ip}:8030',
    'database-name' = 'examples',
    'table-name' = 'tb_duplicate_key',
    'username' = 'system_query_user',
    'password' = '***'
);
  1. 通过 INSERT INTO 操作将数据插入映射表:
INSERT INTO tb_duplicate_key
VALUES (1703128450, 1, 1001, 'PHONE'),
       (1703128451, 0, 1002, 'PAD'),
       (1703128452, 1, 1003, 'TV');

正常情况下,您可以在 StarRocks 中查询到刚刚由 Flink 侧写入的数据。

您也可以按照输入的数据类型编写对应的 Flink DataStream 作业,将数据 Sink 到指定的 StarRocks 表,支持的数据格式包括 CSV、JSON,以及自定义 Java 对象。

2.2.1 CSV 格式数据示例

本小节演示将内存中构造的 CSV 数据通过 Flink DataStream 方式导入 StarRocks 的 tb_duplicate_key 表,示例代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 模拟 CSV 格式数据
String[] records = new String[]{
        "1703128450, 1, 1001, PHONE",
        "1703128451, 0, 1002, PAD",
        "1703128452, 1, 1003, TV"
};

DataStream<String> source = env.fromElements(records);

// 配置 Sink 选项
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://{fe_ip}:9030")
        .withProperty("load-url", "{fe_ip}:8030")
        .withProperty("database-name", "examples")
        .withProperty("table-name", "tb_duplicate_key")
        .withProperty("username", "system_query_user")
        .withProperty("password", "******")
        .withProperty("sink.properties.format", "csv")
        .withProperty("sink.properties.column_separator", ",")
        .build();
// 创建并注册 Sink
source.addSink(StarRocksSink.sink(options));

env.execute("load_data_example");

关于如何提交 Flink 任务可以参考 Flink 使用文档

2.2.2 JSON 格式数据示例

本小节演示将内存中构造的 JSON 数据通过 Flink DataStream 方式导入 StarRocks 的 tb_duplicate_key 表,示例代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 模拟 JSON 格式数据
String[] records = new String[]{
        "{\"event_time\":1703128450,\"event_type\":1,\"user_id\":1001,\"device\":\"PHONE\"}",
        "{\"event_time\":1703128451,\"event_type\":0,\"user_id\":1002,\"device\":\"PAD\"}",
        "{\"event_time\":1703128452,\"event_type\":1,\"user_id\":1003,\"device\":\"TV\"}"
};

DataStream<String> source = env.fromElements(records);

// 配置 Sink 选项
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://{fe_ip}:9030")
        .withProperty("load-url", "{fe_ip}:8030")
        .withProperty("database-name", "examples")
        .withProperty("table-name", "tb_duplicate_key")
        .withProperty("username", "system_query_user")
        .withProperty("password", "******")
        .withProperty("sink.properties.format", "json")
        .build();
// 创建并注册 Sink
source.addSink(StarRocksSink.sink(options));

env.execute("load_data_example");

关于如何提交 Flink 任务可以参考 Flink 使用文档

2.2.3 自定义 Java 对象格式数据示例

本小节演示将内存中构造的 Java 数据对象通过 Flink DataStream 方式导入 StarRocks 的 tb_duplicate_key 表。假设该表单行数据的数据结构 Record 定义如下:

public static final class Record {

        private final Long eventTime;
        private final Integer eventType;
        private final Integer userId;
        private final String device;

        public Record(Long eventTime, Integer eventType, Integer userId, String device) {
            this.eventTime = eventTime;
            this.eventType = eventType;
            this.userId = userId;
            this.device = device;
        }

        // ... getter
    }

我们需要实现 StarRocksSinkRowBuilder 接口定义 Record 对象到行数组的转换方式:

private static class RecordTransformer implements StarRocksSinkRowBuilder<Record> {

        @Override
        public void accept(Object[] row, Record record) {
            row[0] = record.getEventTime();
            row[1] = record.getEventType();
            row[2] = record.getUserId();
            row[3] = record.getDevice();

            // 对于主键表而言,还需要进一步指定最后一个元素,标识当前操作是 UPSERT 或 DELETE
            // row[row.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
        }

    }

最后模拟在内存中构造 Java 数据对象并实现导入,示例如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 模拟 JSON 格式数据
Record[] records = new Record[]{
        new Record(1703128450L, 1, 1001, "PHONE"),
        new Record(1703128451L, 0, 1002, "PAD"),
        new Record(1703128452L, 1, 1003, "TV")
};

DataStream<Record> source = env.fromElements(records);

// 定义表 Schema
TableSchema schema = TableSchema.builder()
        .field("event_time", DataTypes.BIGINT().notNull())
        .field("event_type", DataTypes.INT().notNull())
        .field("user_id", DataTypes.INT().notNull())
        .field("device", DataTypes.VARCHAR(128))
        .build();

// 配置 Sink 选项
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://{fe_ip}:9030")
        .withProperty("load-url", "{fe_ip}:8030")
        .withProperty("database-name", "examples")
        .withProperty("table-name", "tb_duplicate_key")
        .withProperty("username", "system_query_user")
        .withProperty("password", "******")
        .build();
// 创建并注册 Sink
source.addSink(StarRocksSink.sink(schema, options, new RecordTransformer()));

env.execute("load_data_example");

关于如何提交 Flink 任务可以参考 Flink 使用文档

3 配置参数

参数必须参数值说明
connector固定为 starrocks。
jdbc-url配置 FE 节点 MySQL 服务器地址,格式为 jdbc:mysql://<fe_host>:<fe_query_port>,多个以英文逗号 , 分隔。
load-url配置 FE 节点 HTTP 服务器,格式为 <fe_host>:<fe_http_port>,多个以英文逗号 , 分隔。
database-name目标导入的 StarRocks 数据库名。
table-name目标导入的 StarRocks 数据表名。
usernameStarRocks 访问用户名称。
passwordStarRocks 访问用户密码。

sink.semantic

  • 默认值:at-least-once

Sink 一致性语义,支持 at-least-once 和 exactly-once。

sink.version

  • 默认值:AUTO

Flink Connector 底层基于 Stream Load 实现,该参数用户配置使用的 Stream Load 接口版本,支持:

  • V1:使用 Stream Load 普通接口。

  • V2:使用 Stream Load 事务接口,相对于 V1 而言在内存占用和 exactly-once 语义层面均有更好的表现。

  • AUTO:依据 StarRocks 版本自动选择,推荐配置为 AUTO。

sink.label-prefix用于配置 Stream Load 导入任务 label 的前缀,推荐为作业依据具体的业务场景配置 label 前缀。

sink.buffer-flush.max-bytes

  • 默认值:90M

  • 单位:字节

  • 取值范围:[64MB, 10GB]

用于配置缓存在内存中的数据量,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。

该参数仅在 sink.semantic=at-least-once 时生效。

sink.buffer-flush.max-rows

  • 默认值:500000

  • 取值范围:[64000, 5000000]

用于配置缓存在内存中的数据条数,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。

该参数仅在 sink.version=V1 && sink.semantic=at-least-once 时生效

sink.buffer-flush.interval-ms

  • 默认值:300000

  • 单位:毫秒

  • 取值范围:[1000, 3600000]

用于配置数据发送的时间间隔。

该参数仅在 sink.semantic=at-least-once 时生效。

sink.max-retries

  • 默认值:3

  • 取值范围:[0, 10]

用于配置最大失败重试次数。

该参数仅在 sink.version=V1 时生效。

sink.connect.timeout-ms

  • 默认值:30000

  • 单位:毫秒

  • 取值范围:[100, 60000]

用于配置建立连接的超时时间。

该参数默认值自 v1.2.9 版本开始调整为 30000 毫秒,此前为 1000 毫秒。

sink.socket.timeout-ms

  • 默认值:-1

  • 单位:毫秒

  • 取值范围:

用于配置 Socket 连接超时时间,建议参数值要大于 Stream Load timeout 配置值。

sink.wait-for-continue.timeout-ms

  • 默认值:10000

  • 单位:毫秒

  • 取值范围:[3000, 60000]

用于配置等待 FE HTTP 100-continue 应答的超时时间。

sink.ignore.update-before

  • 默认值:true

用于配置将数据导入到主键表时是否忽略来自 Flink 的 UPDATE_BEFORE 记录,如果设置为 false,则在主键表中视为 DELETE 操作。

sink.parallelism

用于配置写入并行度,默认由 Flink Planner 决定。

  • 该参数仅适用于Flink SQL。

  • 在多并行度的场景中,用户需要确保数据按正确顺序写入。

4 更多信息

您可以访问 StarRocks 官方文档 了解关于使用 Flink Connector 向 StarRocks 导入数据的更多介绍,以及如何使用 Flink Connector 读取 StarRocks 中的数据。