Integrating Spark and Hudi with Apache Zeppelin
1 Environment
1.1 Component versions
| Component | Version |
|---|---|
| Spark | 3.2.3 |
| Hudi | 0.14.0 |
| Zeppelin | 0.11.0-SNAPSHOT |
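1.2 Prerequisites
- Integrating Zeppelin with Spark: see the article "Apache Zeppelin 一文打尽" (Apache Zeppelin: the complete guide)
- Building Hudi 0.14.0: see the article "Hudi0.14.0 最新编译" (Building the latest Hudi 0.14.0)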
2 Integrating Spark and Hudi
2.1 Configuration
%spark.conf
SPARK_HOME /usr/lib/spark

# set execution mode
spark.master yarn
spark.submit.deployMode client

# equivalent of spark-submit --jars: the Hudi Spark 3.2 bundle
spark.jars /root/app/jars/hudi-spark3.2-bundle_2.12-0.14.0.jar

# equivalent of spark-submit --conf: settings required by Hudi
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.kryo.registrator org.apache.spark.HoodieSparkKryoRegistrar
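In a %spark.conf paragraph, each non-comment line gives a property name followed by whitespace and its value. Lines starting with # are comments. The sketch below makes that format concrete by reading such a paragraph into a Map. It is a minimal illustration under that assumption, not Zeppelin's actual parser. SparkConfParagraph and parse are hypothetical names.

```scala
// Hypothetical helper, NOT Zeppelin's implementation: reads the body of a
// %spark.conf paragraph into a Map, assuming one "key value" pair per line.
object SparkConfParagraph {
  def parse(text: String): Map[String, String] =
    text.linesIterator
      .map(_.trim)
      // skip blank lines, "#" comments and the "%spark.conf" directive line
      .filterNot(line => line.isEmpty || line.startsWith("#") || line.startsWith("%"))
      .map { line =>
        // split on the first run of whitespace: property name, then its value
        val parts = line.split("\\s+", 2)
        parts(0) -> (if (parts.length > 1) parts(1).trim else "")
      }
      .toMap
}
```

For the paragraph above, parse(...)("spark.master") would return "yarn", and the comment lines produce no entries.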
Spark 3.x no longer accepts yarn-client or yarn-cluster as the value of spark.master. Instead, set spark.master and spark.submit.deployMode together:
| Mode | spark.master | spark.submit.deployMode |
|---|---|---|
| Yarn Client | yarn | client |
| Yarn Cluster | yarn | cluster |
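The table amounts to a fixed translation from the legacy combined master strings to the two Spark 3.x properties. The sketch below spells out that mapping. YarnMode.toSpark3 is a hypothetical helper, and it deliberately rejects anything other than the two legacy YARN values.

```scala
// Hypothetical helper: translates a Spark 2.x-style YARN master string into the
// (spark.master, spark.submit.deployMode) pair expected by Spark 3.x.
object YarnMode {
  def toSpark3(legacyMaster: String): (String, String) = legacyMaster match {
    case "yarn-client"  => ("yarn", "client")
    case "yarn-cluster" => ("yarn", "cluster")
    case other =>
      // only the two legacy YARN forms are translated
      throw new IllegalArgumentException(s"not a legacy YARN master: $other")
  }
}
```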
2.2 Import dependencies
%spark
// Hudi option constants (PARTITIONPATH_FIELD_NAME, TABLE_NAME, ...) and SaveMode values used below
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._
2.3 Insert data
%spark
// Table name and storage location on HDFS
val tableName = "trips_table"
val basePath = "hdfs:///tmp/trips_table"

// Sample trip records: (ts, uuid, rider, driver, fare, city)
val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
val data = Seq(
  (1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
  (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
  (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
  (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
  (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))

val inserts = spark.createDataFrame(data).toDF(columns: _*)

// Write a Hudi table partitioned by city; Overwrite mode replaces any existing data at basePath
inserts.write.format("hudi").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
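PARTITIONPATH_FIELD_NAME is set to city, so Hudi groups the five records by their city value. The plain-Scala sketch below imitates that grouping to show how many records land in each partition. It involves no Spark or Hudi. It assumes Hudi's default non-Hive-style partition paths, in which each partition directory under basePath is named after the raw value (for example san_francisco). PartitionPreview is a hypothetical name.

```scala
// Illustration only: plain Scala collections, no Spark or Hudi.
// Imitates how the sample rows are grouped by the "city" partition field on write.
object PartitionPreview {
  // (rider, city) pairs taken from the sample data in the insert example
  val rows: Seq[(String, String)] = Seq(
    ("rider-A", "san_francisco"),
    ("rider-C", "san_francisco"),
    ("rider-D", "san_francisco"),
    ("rider-F", "sao_paulo"),
    ("rider-J", "chennai"))

  /** Partition value (assumed directory name under basePath) -> record count. */
  def recordsPerPartition(rs: Seq[(String, String)]): Map[String, Int] =
    rs.groupBy(_._2).map { case (city, group) => city -> group.size }
}
```

With the sample data, san_francisco receives three records, and sao_paulo and chennai receive one each.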
2.4 Query data
%spark
// Load the Hudi table as a DataFrame and register it as a temporary view for SQL
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
Result:
+--------------------+-----+-------------+-------+--------+-------------+
|                uuid| fare|           ts|  rider|  driver|         city|
+--------------------+-----+-------------+-------+--------+-------------+
|e96c4396-3fad-413...| 27.7|1695091554788|rider-C|driver-M|san_francisco|
|9909a8b1-2d15-4d3...| 33.9|1695046462179|rider-D|driver-L|san_francisco|
|e3cf430c-889d-401...|34.15|1695516137016|rider-F|driver-P|    sao_paulo|
+--------------------+-----+-------------+-------+--------+-------------+
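As a sanity check on this output, the query's fare > 20.0 predicate can be applied to the sample rows with ordinary Scala collections. This only illustrates the query's logic; it is not how Spark executes it. FareFilterPreview and Trip are hypothetical names. Spark does not guarantee row order for this query, so compare the rows themselves rather than their positions.

```scala
// Illustration only: the WHERE fare > 20.0 filter from the Spark SQL query,
// applied to the sample rows with plain Scala collections.
object FareFilterPreview {
  final case class Trip(ts: Long, uuid: String, rider: String, driver: String, fare: Double, city: String)

  // Same five records as the insert example
  val trips: Seq[Trip] = Seq(
    Trip(1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
    Trip(1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
    Trip(1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
    Trip(1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
    Trip(1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))

  // Equivalent of WHERE fare > minFare; keeps the input order
  def fareAbove(rows: Seq[Trip], minFare: Double): Seq[Trip] = rows.filter(_.fare > minFare)
}
```

With minFare = 20.0 this keeps rider-C, rider-D and rider-F, the same three trips shown in the output above.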

This article was collected from the web and does not represent the views of 协通编程. If you repost it, please credit the source: https://www.net2asp.com/89d9e635ea.html
