Data Lake Hudi Basics: Introduction, Compilation, and Deployment
These notes cover a Hudi overview plus how to compile and package it, for future reference.
Table of contents
- Introduction
- Official website
- History
- Hudi features
- Use cases
- Installation and deployment
- Preparing the build environment
- Compiling Hudi
- 1. Upload the source package to the server
- 2. Modify the pom file
- 3. Modify the source code for Hadoop 3 compatibility
- 4. Manually install the Kafka dependencies (optional)
- 5. Resolve dependency conflicts in the Spark modules
- 6. Run the build
- 7. Test hudi-cli
- Quick test of the compiled Spark bundle
Introduction
Apache Hudi (Hadoop Upserts Deletes and Incrementals) is a next-generation streaming data lake platform. It brings core warehouse and database functionality directly to the data lake: tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, clustering/compaction optimizations, and concurrency control, all while keeping data in open source file formats.
Hudi is not only a great fit for streaming workloads; it also lets you build efficient incremental batch pipelines.
Hudi runs on any cloud storage platform, and its performance optimizations speed up analytical workloads on all popular query engines, including Apache Spark, Flink, Presto, Trino, and Hive.
Official website
https://hudi.apache.org/
History
2015: the core ideas/principles of incremental processing were published (O'Reilly article).
2016: created at Uber, powering all database and business-critical feeds.
2017: open-sourced by Uber, supporting a 100 PB data lake.
2018: broad adoption, helped along by the spread of cloud computing.
2019: entered the ASF incubator; more platform components were added.
2020: graduated to an Apache top-level project; community, downloads, and adoption grew more than 10x.
2021: powering Uber's 500 PB data lake; SQL DML, Flink integration, indexing, metaserver, and caching.
Hudi features
- Pluggable index mechanisms for fast upserts/deletes.
- Incremental pulling of table changes for downstream processing.
- Transactional commits and rollbacks with concurrency control.
- SQL read/write from Spark, Presto, Trino, Hive, Flink, and other engines.
- Automatic management of small files, plus clustering, compaction, and cleaning.
- Streaming ingestion with built-in CDC sources and tools.
- Built-in metadata tracking for scalable storage access.
- Backward-compatible schema evolution.
Use cases
Near-real-time ingestion
- Fewer fragmented, single-purpose tools.
- CDC-based incremental import of RDBMS data.
- Controls the size and number of small files.
Near-real-time analytics
- Saves resources compared with second-level stores (Druid, OpenTSDB).
- Minute-level freshness while supporting more efficient queries.
- Hudi ships as a library, so it is very lightweight.
Incremental pipelines
- Distinguishes arrival time from event time to handle late data.
- Shorter scheduling intervals cut end-to-end latency (hours -> minutes), i.e. incremental processing.
Incremental export
- Replaces some Kafka use cases by exporting data to online serving stores, e.g. Elasticsearch.
Installation and deployment
Hudi is delivered as a set of library bundles, and different Hudi releases depend on different Spark/Flink versions, so check the version support matrix on the official site for the release you use.
The environment tested in this article is as follows (Linux CentOS 7):

| Component | Version |
|---|---|
| Hudi | 0.12.1 |
| Hadoop | 3.2.4 |
| Hive | 3.1.3 |
| Flink | 1.14, Scala 2.12 |
| Spark | 3.2.2, Scala 2.12 |

The Hudi site only provides source releases, so you have to compile it yourself.
Preparing the build environment
Build on Linux.
Install Maven and configure the environment variables (straightforward, so not covered here).
Use Maven 3.6 or later; avoid older versions.
Point Maven at the Aliyun mirror by editing settings.xml:
vim $MAVEN_HOME/conf/settings.xml
<!-- Add the Aliyun mirror -->
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
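As a quick sanity check before building (not part of the original steps, just a suggestion), confirm the Maven and JDK versions that will be used:
# Print the Maven version and the JDK it runs on; Maven 3.6+ and JDK 8 are expected here
mvn -v
# Print the Java version on the PATH
java -version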
Compiling Hudi
1. Upload the source package to the server
Source download: https://dlcdn.apache.org/hudi/0.12.1/hudi-0.12.1.src.tgz
Upload hudi-0.12.1.src.tgz to /opt/software and extract it:
tar -zxvf /opt/software/hudi-0.12.1.src.tgz -C /opt/software
2. Modify the pom file
vim /opt/software/hudi-0.12.1/pom.xml
- Add a repository to speed up dependency downloads:
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
- Change the Hive/Hadoop component versions (a quick verification command follows below):
<hadoop.version>3.2.4</hadoop.version>
<hive.version>3.1.3</hive.version>
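To confirm the version properties were changed as intended, a quick grep works (just a convenience check, not part of the original procedure):
# Show the hadoop.version and hive.version properties actually set in the root pom
grep -n -E '<hadoop.version>|<hive.version>' /opt/software/hudi-0.12.1/pom.xml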
3. Modify the source code for Hadoop 3 compatibility
Besides bumping the version, Hadoop 3 compatibility also requires the following code change:
vim /opt/software/hudi-0.12.1/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
Edit line 110 and add null as an extra constructor argument.
(In vim, type :set nu in command mode and press Enter to show line numbers.)
Original line 110: try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
After the change: try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
110 try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
111 try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
112 for (IndexedRecord record : records) {
113 String recordKey = getRecordKey(record).orElse(null);
114 parquetWriter.writeAvro(recordKey, record);
115 }
116 outputStream.flush();
117 }
118 }
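If you prefer not to edit the file by hand, the same one-line change can be applied with sed (an optional shortcut, not part of the original steps; double-check the result with grep afterwards):
# Replace the single-argument FSDataOutputStream constructor call with the two-argument form
sed -i 's/new FSDataOutputStream(baos)/new FSDataOutputStream(baos, null)/' \
  /opt/software/hudi-0.12.1/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
# Verify the change landed on line 110
grep -n 'new FSDataOutputStream' \
  /opt/software/hudi-0.12.1/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java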
4. Manually install the Kafka dependencies (optional)
This seems to affect 0.12.0; my 0.12.1 build did not hit it.
If the build fails because common-utils-5.3.4.jar, common-config-5.3.4.jar, kafka-avro-serializer-5.3.4.jar, or kafka-schema-registry-client-5.3.4.jar cannot be found, download them separately and install them into your local Maven repository.
Download: http://packages.confluent.io/archive/5.3/confluent-5.3.4-2.12.zip
Extract the archive, find the missing jars from the error message, upload them to the server, and install them:
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-config-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-utils-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-avro-serializer-5.3.4.jar
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-schema-registry-client-5.3.4.jar
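Since the four install commands only differ in the artifact name, a small loop does the same job (equivalent to the commands above; assumes the jars sit in the current directory):
# Install the four Confluent 5.3.4 jars into the local Maven repository
for artifact in common-config common-utils kafka-avro-serializer kafka-schema-registry-client; do
  mvn install:install-file -DgroupId=io.confluent -DartifactId=$artifact \
    -Dversion=5.3.4 -Dpackaging=jar -Dfile=./$artifact-5.3.4.jar
done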
5. Resolve dependency conflicts in the Spark modules
After switching to Hive 3.1.3, Hive pulls in Jetty 9.3 while Hudi itself uses Jetty 9.4, so the two conflict.
Without this fix the build still succeeds, but writing data into Hudi from Spark fails at runtime.
In my test the compile was fine, but inserting data threw:
java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
- Edit the pom of hudi-spark-bundle: exclude the old Jetty and add the Jetty version Hudi expects.
vim /opt/software/hudi-0.12.1/packaging/hudi-spark-bundle/pom.xml
Starting at roughly line 369, add the <exclusions>...</exclusions> blocks shown below to the hive-service, hive-jdbc, hive-metastore, and hive-common dependencies:
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
Then add the following dependencies to the same pom file (a quick check follows after the snippet):
<!-- Add the Jetty version Hudi is configured to use -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
</dependency>
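The ${jetty.version} property should already be defined in the Hudi root pom (9.4.x for Hudi 0.12.1); as a convenience check you can confirm which Jetty version will actually be bundled:
# Show the jetty.version property declared in the root pom
grep -n '<jetty.version>' /opt/software/hudi-0.12.1/pom.xml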
- Edit the pom of hudi-utilities-bundle the same way: exclude the old Jetty and add the Jetty version Hudi expects. This fixes the same Jetty error that otherwise appears when inserting data into a Hudi table with the DeltaStreamer tool.
vim /opt/software/hudi-0.12.1/packaging/hudi-utilities-bundle/pom.xml
Hudi dependencies: search for the hudi-common entry.
In hudi-0.12.1 this bundle pulls in the Hudi modules through the maven-shade-plugin's include list, so we exclude Jetty through the same mechanism.
Directly under <artifactSet> (as a sibling of <includes>), add:
<excludes>
<exclude>org.eclipse.jetty:*</exclude>
</excludes>
Note: in hudi-0.12.0 the Hudi modules may be pulled in as regular dependencies rather than through the maven-shade-plugin include list; in that case add the following exclusions to the hudi-common and hudi-client-common dependencies instead (not needed for hudi-0.12.1):
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
Hive dependencies: search for the hive-service dependency and adjust the following entries.
Add exclusions to hive-service:
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
Add exclusions to hive-jdbc:
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
Add exclusions to hive-metastore:
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
Add exclusions to hive-common:
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
Add the standalone Jetty dependencies:
<!-- Add the Jetty version Hudi is configured to use -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
</dependency>
6. Run the build
mvn clean package -DskipTests -Dspark3.2 -Dflink1.14 -Dscala-2.12 -Dhadoop.version=3.2.4 -Pflink-bundle-shade-hive3
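If you only need one bundle (for example the Spark bundle), Maven's -pl/-am options can shorten the build; treat this as an optional shortcut I have not verified for every module combination:
# Build only the Spark bundle plus the modules it depends on
mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12 \
  -Dhadoop.version=3.2.4 -pl packaging/hudi-spark-bundle -am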
After the build completes, the bundles are in the target directory of each module under packaging:
[root@m1 packaging]# pwd
/opt/software/hudi-0.12.1/packaging
[root@m1 packaging]# ls
hudi-aws-bundle hudi-hadoop-mr-bundle hudi-presto-bundle hudi-utilities-bundle
hudi-datahub-sync-bundle hudi-hive-sync-bundle hudi-spark-bundle hudi-utilities-slim-bundle
hudi-flink-bundle hudi-integ-test-bundle hudi-timeline-server-bundle README.md
hudi-gcp-bundle hudi-kafka-connect-bundle hudi-trino-bundle
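To list the actual bundle jars that were produced (just a convenience command):
# List the built bundle jars under the packaging modules
find /opt/software/hudi-0.12.1/packaging -name "hudi-*bundle*.jar" -path "*/target/*"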
7. Test hudi-cli
/opt/software/hudi-0.12.1/hudi-cli/hudi-cli.sh
If you see output like the following, the CLI works:
Main called
===================================================================
* ___ ___ *
* /\__\ ___ /\ \ ___ *
* / / / /\__\ / \ \ /\ \ *
* / /__/ / / / / /\ \ \ \ \ \ *
* / \ \ ___ / / / / / \ \__\ / \__\ *
* / /\ \ /\__\ / /__/ ___ / /__/ \ |__| / /\/__/ *
* \/ \ \/ / / \ \ \ /\__\ \ \ \ / / / /\/ / / *
* \ / / \ \ / / / \ \ / / / \ /__/ *
* / / / \ \/ / / \ \/ / / \ \__\ *
* / / / \ / / \ / / \/__/ *
* \/__/ \/__/ \/__/ Apache Hudi CLI *
* *
===================================================================
10137 [main] INFO org.apache.hudi.cli.Main [] - Starting Main v0.12.1 using Java 1.8.0_181 on m1 with PID 6681 (/opt/software/hudi-0.12.1/hudi-cli/target/hudi-cli-0.12.1.jar started by root in /opt/software/hudi-0.12.1/packaging)
10145 [main] INFO org.apache.hudi.cli.Main [] - No active profile set, falling back to 1 default profile: "default"
Table command getting loaded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/software/hudi-0.12.1/hudi-cli/target/lib/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/software/hudi-0.12.1/hudi-cli/target/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
11689 [main] WARN org.jline [] - The Parser of class org.springframework.shell.jline.ExtendedDefaultParser does not support the CompletingParsedLine interface. Completion with escaped or quoted words won't work correctly.
11768 [main] INFO org.apache.hudi.cli.Main [] - Started Main in 2.031 seconds (JVM running for 11.806)
hudi->
Quick test of the compiled Spark bundle
This requires a Hadoop environment and Spark.
1. Set up a Hadoop cluster and Spark
Installing these two is not covered in detail here; Spark only needs to be extracted (see the extraction sketch below).
Spark download: https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
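A minimal extraction step, assuming the tarball was downloaded to /opt/software and /opt/module is the install directory (this matches the jars path used in the next step):
# Extract the Spark 3.2.2 binary distribution
tar -zxvf /opt/software/spark-3.2.2-bin-hadoop3.2.tgz -C /opt/module
# Optional: rename to the shorter directory name used later in this article
mv /opt/module/spark-3.2.2-bin-hadoop3.2 /opt/module/spark-3.2.2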
2. Copy the compiled bundle into Spark's jars directory
cp /opt/software/hudi-0.12.1/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.1.jar /opt/module/spark-3.2.2/jars
3. Start Hadoop
4. Test with spark-shell
- Start spark-shell:
spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
- Run the following Scala code:
// Set the table name, base path, and data generator
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// Insert data
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// Query data
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
You can also check the /tmp/hudi_trips_cow/ directory to see whether data files were written (example below).
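For example, a quick look for the Parquet files Hudi wrote (the exact partition paths and file names will differ per run):
# List the data files written under the Hudi table base path
find /tmp/hudi_trips_cow -name "*.parquet"
# The table's metadata lives under the .hoodie directory
ls /tmp/hudi_trips_cow/.hoodie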
- Example run:
[root@m3 spark3]# bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-01-16 01:40:40,221 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://m3:4040
Spark context available as 'sc' (master = local[*], app id = local-1673851241535).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.2
      /_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow

scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow

scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@2b2bcb4a

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1673839191417, "uuid": "0b652f6a-1349-444e-8442-976fc149b589", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1673565741975, "uuid": "20e0932b-5baa-4c74-a423-8e72a3c1dcef", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1673405567377, "uuid": "2417e9e6-c5a7-4399-b7a1-4b6e2fb90372", "rider": "rider-213", "driver"...

scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: one deprecation (since 2.12.0)
warning: one deprecation (since 2.2.0)
warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("hudi").
     | options(getQuickstartWriteConfigs).
     | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     | option(TABLE_NAME, tableName).
     | mode(Overwrite).
     | save(basePath)
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
2023-01-16 01:41:57,422 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
2023-01-16 01:41:57,443 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
2023-01-16 01:41:57,471 WARN hudi.HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.
2023-01-16 01:41:58,404 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_cow/.hoodie/metadata

scala> val tripsSnapshotDF = spark.
     | read.
     | format("hudi").
     | load(basePath)
tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+-------------+
|              fare|          begin_lon|          begin_lat|           ts|
+------------------+-------------------+-------------------+-------------+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1673405567377|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1673491795155|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1673772916404|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1673347004963|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1673839191417|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1673613988097|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1673565741975|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1673298206656|
+------------------+-------------------+-------------------+-------------+
If the insert fails with java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V,
go back to the section above on resolving the Spark module dependency conflicts.