You need to enable JavaScript to run this app.
导航

StarRocks Spark Connector

最近更新时间2024.04.30 14:09:53

首次发布时间2024.04.30 14:09:53

StarRocks 支持通过 Spark 读取或写入数据。您可以使用 Spark Connector 连接 Spark 与 StarRocks 实现数据导入,其原理是在内存中对数据进行攒批,按批次使用 Stream Load 将数据导入 StarRocks。Spark Connector 支持 DataFrame 和 SQL 接入形式,并支持 Batch 和 Structured Streaming 作业类型。

1 获取 Spark Connector

您可以从 Maven 中央仓库 中下载与您 Spark 版本匹配的最新的 spark-connector-starrocks.jar 文件,也可以使用由 EMR 团队提供的 Spark Connector 版本。

说明

  • EMR 团队提供的 Spark Connector Jar 文件随 Spark 安装包一同附送,您可以在支持部署 Spark 组件的 EMR 集群 /usr/lib/emr/current/spark/jars 路径下找到对应的 jar 文件。

  • Spark Connector 默认不包含 JDBC 驱动,您需要确保在 classpath 路径下包含 mysql-connector-java.jar 文件。

相对于开源版本的 Spark Connector,我们更加推荐您使用 EMR 团队提供的 Spark Connector 版本,相对而言优势如下:

  1. 能够与 EMR 集群,及其周边生态更好的集成。

  2. 增加一些 EMR 团队定制开发的竞争力特性。

  3. 更加及时修复一些已知的 bug。

不过兼容开源是 EMR 作为开源大数据平台的基本原则,您仍然可以坚持使用开源 Spark Connector 版本。

2 使用方式

本小节以导入数据到 StarRocks 明细表 examples.tb_duplicate_key 为例,该表的建表语句如下:

CREATE TABLE IF NOT EXISTS tb_duplicate_key
(
    event_time BIGINT       NOT NULL COMMENT 'timestamp of event',
    event_type INT          NOT NULL COMMENT 'type of event',
    user_id    INT          NOT NULL COMMENT 'id of user',
    device     VARCHAR(128) NULL     COMMENT 'device'
) ENGINE = OLAP DUPLICATE KEY(event_time, event_type)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
    'replication_num' = '3'
);

2.1 Spark SQL 方式

您可以直接通过 Spark SQL 形式将数据写入 StarRocks 对应数据表中,步骤如下:

  1. 进入 Spark SQL 交互终端,参考 Spark SQL Client 使用方式 进入 Spark SQL 交互终端。

  2. 通过 CREATE TABLE 创建一张 StarRocks tb_duplicate_key 表的映射表,不要求同名:

CREATE TABLE IF NOT EXISTS tb_duplicate_key 
USING starrocks 
OPTIONS
(
    "starrocks.fe.http.url"="{fe_ip}:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://{fe_ip}:9030",
    "starrocks.table.identifier"="examples.tb_duplicate_key",
    "starrocks.user"="system_query_user",
    "starrocks.password"="******"
);
  1. 通过 INSERT INTO 操作将数据插入映射表:
INSERT INTO tb_duplicate_key
VALUES (1703128450, 1, 1001, 'PHONE'),
       (1703128451, 0, 1002, 'PAD'),
       (1703128452, 1, 1003, 'TV');

正常情况下,您可以在 StarRocks 中查询到刚刚由 Spark 侧写入的数据。

2.2 Spark DataFrame 方式

本小节以 Batch 任务为例,演示将内存中构造的数据通过 Spark DataFrame 方式导入 StarRocks 的 tb_duplicate_key 表。Scala 示例代码如下:

val spark = SparkSession
  .builder()
  .appName("load_data_example")
  .getOrCreate()

import spark.implicits._

// 模拟数据
val data = Seq(
  (1703128450, 1, 1001L, "PHONE"),
  (1703128451, 0, 1002L, "PAD"),
  (1703128452, 1, 1003L, "TV"),
)

// 将数据写入 StarRocks
val df = data.toDF("event_time", "event_type", "user_id", "device")
df.write
  .format("starrocks")
  .option("starrocks.fe.http.url", "{fe_ip}:8030")
  .option("starrocks.fe.jdbc.url", "jdbc:mysql://{fe_ip}:9030")
  .option("starrocks.table.identifier", "examples.tb_duplicate_key")
  .option("starrocks.user", "system_query_user")
  .option("starrocks.password", "******")
  .mode("append")
  .save()

关于如何提交 Spark 任务可以参考 Spark 使用文档

3 配置参数

参数必须参数值说明
starrocks.fe.jdbc.url配置 FE 节点 MySQL 服务器地址,格式为 jdbc:mysql://<fe_host>:<fe_query_port>
starrocks.fe.http.url配置 FE 节点 HTTP 服务器,格式为 <fe_host>:<fe_http_port>,多个以英文逗号 , 分隔。

starrocks.table.identifier

目标导入的 StarRocks 数据表,格式为:<database_name>.<table_name>

starrocks.userStarRocks 访问用户名称。
starrocks.passwordStarRocks 访问用户密码。

starrocks.write.label.prefix

  • 默认值:spark-

用于配置 Stream Load 导入任务 label 的前缀,推荐为作业依据具体的业务场景配置 label 前缀。

starrocks.write.enable.transaction-stream-load

  • 默认值:true

用于配置导入操作是否使用 Stream Load 事务接口,相对于普通接口在内存占用和性能层面均有更好的表现。

如果配置了 starrocks.write.max.retries > 0,则该参数不生效。

starrocks.write.buffer.size

  • 默认值:104857600

  • 单位:默认为字节,支持 kbmbgb

用于配置缓存在内存中的数据量,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。

starrocks.write.buffer.rowsInteger.MAX_VALUE用于配置缓存在内存中的数据条数,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。

starrocks.write.flush.interval.ms

  • 默认值:300000

  • 单位:毫秒

用于配置数据发送的时间间隔。

starrocks.write.max.retries

  • 默认值:3

用于配置最大失败重试次数。

如果该参数配置值大于 0,则忽略 starrocks.write.enable.transaction-stream-load 配置

starrocks.write.retry.interval.ms

  • 默认值:10000

  • 单位:毫秒

失败重试时间间隔。

starrocks.columns用于配置写入部分列,多个列名以英文逗号 , 分隔。
starrocks.write.num.partitions用于配置 Spark 并行写入的分区数,默认由 Spark 决定。

starrocks.write.partition.columns

用于配置 Spark 分区的列,如果不指定则使用所有写入的列进行分区。

该参数仅在配置starrocks.write.num.partitions 后生效。

starrocks.timezone用于配置时区。

4 更多信息

您可以访问 StarRocks 官方文档 了解关于使用 Spark Connector 向 StarRocks 导入数据的更多介绍,以及如何使用 Spark Connector 读取 StarRocks 中的数据。