You need to enable JavaScript to run this app.
导航

高阶使用

最近更新时间2024.04.10 14:57:49

首次发布时间2022.04.28 13:01:14

火山引擎 E-MapReduce(EMR)支持通过 Spark、Flink 、 Hive 、Presto和Trino 等引擎对 Hudi 表进行读写操作。创建EMR集群,并安装Hudi服务后,EMR已经默认将Hudi相关依赖集成到Flink、Spark、Hive、Trino、Presto开源组件中,无需额外配置。
Hudi 整体支持情况如下:

查询操作写入操作
COWMORCOWMOR
Spark支持支持支持支持
Flink支持支持支持支持
Presto支持支持不支持不支持
Trino支持支持不支持不支持
Hive支持支持不支持不支持

1 Hudi-Spark集成

可以通过Spark 3.x和Spark-2.4.3+对 Hudi 进行读写操作。我们更推荐使用 SparkSQL 的 SQL 语法来操作 Hudi,可以极大的简化 Hudi 的使用成本。使用的基本方法详见 Hudi 使用说明-基础使用

1.1 集成配置

本段主要介绍如何使用 Spark ThriftServer 配置连接 Hudi。

说明

目前只有EMR 2.x版本才支持Spark ThriftServer。

  1. 登录 EMR 控制台

  2. 在左侧导航栏中,进入集群详情 > 服务列表 > Spark > 服务参数界面。

  3. 安装完 Hudi 后,可以到 sparkthriftserver 配置页面,找到 spark-defaults 中的 spark.sql.extensions 加上 org.apache.spark.sql.hudi.HoodieSparkSessionExtension (如果已有存在的值,用逗号隔开)
    alt

  4. 选择自定义配置,在 spark-thrift-sparkconf 添加新的选项 spark.serializer,写入值 org.apache.spark.serializer.KyroSerializer

  1. 对于 EMR 1.3 版本,需要额外增加一个配置,EMR 1.2 版本不需要该步骤

    1. 选项的key为:spark.sql.catalog.spark_catalog

    2. 选项的value为:org.apache.spark.sql.hudi.catalog.HoodieCatalog

  2. 单击确定按钮,完成参数配置。

  3. 单击右上角服务操作 > 重启按钮,重启 Spark 全部组件。

1.2 使用方式

使用 beeline 连接 sparkthriftserver 用于测试, 参考 LDAP 文档Spark最佳实践 ,来配置用户名密码进行 sparkthriftserver 连接。

beeline -u jdbc:hive2://emr-30f8q2lxxxxxxxxxx-master-1:10000/default -n xxxx -p xxxxx

接着您即可使用标准的 SparkSQL 操作 Hudi 表。

1.3 导入外表到 hudi 中

对于已有的外表,我们也可通过 SparkSQL 将外表数据导入到 hudi 表中,下方是一个很小的 lineitem 表,将其保存为文本文件,上传。

lineitem_small.tbl
11.82KB

文本样例内容如下表所示

1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|
1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold |
1|63700|3701|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep|
1|2132|4633|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de|
1|24027|1534|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re|
1|15635|638|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex|
2|106170|1191|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a|
3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco|
3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve|
3|128449|3474|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |

1.3.1 上传数据集

通过 scp/或者其它方式上传数据集到集群

scp lineitem_small.tbl root@master_ip

登陆集群进行上传

ssh master_ip

 export HADOOP_USER_NAME=hive
 hadoop fs -mkdir /user/hive/lineitem
 hadoop fs -put lineitem_small.tbl /user/hive/lineitem/

1.3.2 建立外表

使用 Beeline 等方式连接 SparkThriftServer,参考 1.2 使用方式
使用 DDL 建表用于加载源文件

create external table lineitem (
  l_orderkey int, 
  l_partkey int,
  l_suppkey int,
  l_linenumber int,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string) 
row format delimited 
fields terminated by '|' 
stored as textfile 
location '/user/hive/lineitem';

1.3.3 导入 Hudi 表

创建 Hudi 表并使用 SparkSQL 导入外部数据到 hudi_lineitem 表中

create table hudi_lineitem using hudi tblproperties (type = 'cow', primaryKey = 'l_orderkey') select * from lineitem;

查询 Hudi 表,确认数据导入Hudi成功。

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
FROM
    hudi_lineitem
WHERE
    l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;

接着您就可以使用 Hudi 的特性对其中的数据进行增删改查操作。您可以参考 Hudi 使用说明中的语法,详见使用说明

2 Hudi-Presto 集成

2.1 集成配置

EMR Presto 已与 Hudi 深度集成,所以您无需进行额外的配置,即可查询 Hudi 表数据。目前 Presto 支持 Hudi 表查询,不支持 Hudi 表写入。因此 Presto 需要在 Spark 端提前导入数据用于查询测试。

2.2 连接方式

可以使用的presto cli连接测试,或者参考 presto 基础使用文档。

presto --user <username> --password --catalog hive

2.3 使用 PrestoSQL 查询 Hudi

可以直接使用上文 SparkSQL 导入的样例表进行查询。

use default;
show tables;
SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
FROM
    hudi_lineitem
WHERE
    l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;

3 Hudi-Trino 集成

3.1 集成配置

Trino 和 Hudi 同为可选组件,需要在集群组件可选列表里面安装。 安装完 Hudi 后,只需要重启 Trino 全部组件,即可使 Trino 能够查询 Hudi 表。

3.2 使用方法

连接 Trino 请参考 Trino 使用文档,配置 Trino 的 cli 连接字符串:

trino --user <username> --password --catalog hive

集成后即可安装标准的 Trino SQL 语法完整查询 COW 表。对于 MOR 表,Trino 支持有限,因此不推荐在 Trino 中使用 MOR 表。

use default;
show tables;
select * from hudi_cow_nonpcf_tbl;

4 Hudi-Hive 集成

如果是创建EMR集群后,才安装Hudi组件,则需要在成功安装Hudi后,重启Hive相关服务,否则不需要做额外配置。

4.1 使用方法

使用 beeline 连接 hiveserver2 进行测试,在 -n -p 参数后指定对应的用户名密码,以及选择正确的hiveserver2地址。

beeline -u jdbc:hive2://master-1-1:10000/default -n xxx -p xxxx

查询 MOR 表时,需要先执行以下命令,来设置对应的参数,COW 表无需额外配置:

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

在 HiveServer2 中正常查询 Hudi 表即可。

select * from hudi_cow_nonpcf_tbl;
select count(1) from hudi_cow_nonpcf_tbl;