Hudi实战

目录

Hudi 实战

概念 

表类型

查询类型

实战例子

先试试默认的Copy On Write表

再试试hudi和hive的集成表现

再试试默认的Merge On Read表

总结


Hudi 实战

官网 :https://hudi.apache.org/docs/spark_quick-start-guide.html#setup

概念 

Apache Hudi (pronounced “hoodie”) 在hadoop兼容的存储中提供流的功能

  • 更新/删除记录 (如何在表中更新或者删除数据?)
  • 变更流 (如何得到变化的数据?)

表类型

Hudi 支持2中表类型

  • Copy On Write : 用专用的列存储文件格式比如parquet来存储,更新操作实际上是简单的利用同步合并重新写一份全量文件。
  • Merge On Read : 用列存储(如parquet)和行存储(如 avro)两种文件格式的组合来存储,更新操作会把日志先记录在增量(delta)文件中,然后再同步或者异步地通过合并产生新的列存储全量文件。

如下是Copy On Write表的读写入示意图

如下是Merge On Read表的读写入示意图

查询类型

Hudi 支持3中查询类型

  • Snapshot Queries : 这种快照查询查询的是某张表在最近一次提交或者压缩后的快照,如果表是merge on read表,这种查询会产生分钟级别的延迟,因为它要实时合并全量文件和增量文件;如果表是 copy on write 表,这种查询会提供无需等待的即席查询,直接查询全量文件,并且提供更新删除操作等写相关的特性。
  • Incremental Queries : 这种查询只查询某表在最近一次提交或者压缩后的新数据,这强力地为增量管道场景提供了变更流特性。
  • Read Optimized Queries : 这种查询也是查询某张表在最近一次提交或者压缩后的快照,只展现全量文件内容以保证查询效率。

如下是三种查询在Merge On Read表的读示意图

实战例子

在大数据平台 华为 FusionInght HD 6.5.1中使用 hudi 0.7.0 版本 

先试试默认的Copy On Write表

1 初始化用户和环境

[omm@cnsz92vl09632 bin]$ source /opt/hadoopclient/bigdata_env
[omm@cnsz92vl09632 bin]$ kinit hdadmin
Password for hdadmin@HADOOP.COM:
[omm@cnsz92vl09632 bin]$ cd /opt/hadoopclient/Spark2x/spark/bin

2 由于环境离线在只能使用离线的hudi和avro包进行试验 分别下载3个jar包

3 由于我spark版本是2.3.2 有jar包冲突 先把spark libs里面的avro 包移走

4 使用spark-shell 启动hudi

spark-shell \
 --jars '/home/omm/hudi-spark-bundle_2.11-0.7.0.jar,/home/omm/spark-avro_2.11-2.4.0.jar,/home/omm/avro-1.8.2.jar'\
 --conf spark.driver.extraClassPath=/home/omm/avro-1.8.2.jar \
 --conf spark.executor.extraClassPath=/home/omm/avro-1.8.2.jar \
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

 

5 按照官网例子 先引入5个包

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

6 指定表名 指定文件目录 创建随机生成器

val tableName = "hudi_trips_cow"
val basePath = "hdfs:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

 

7 随机生成10条 insert 数据形成inserts 结果集

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

8 将inserts结果集 读到 dataframe df 并写入hudi表 

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(TABLE_TYPE_OPT_KEY,MOR_TABLE_TYPE_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

9 我们看看文件下面是什么情况

10 查询hudi表

11 看看表结构

12 我们试试upsert

13 看看文件

发现每个分区都多了1个parquet文件

14 reload 数据集

再用sql 查询

15 我们试试 Incremental query

整个表里面有2个commit_time

查看 commttime 在 20210325145140 之后的数据查到7条有update的数据

16 试试 Point in time query

查看commttime 在 000到20210325145140 之间的数据 有10条

17 试试删除数据

选出2条准备删除

执行删除

再查一下数据

只有15条了

看一下文件发现其中一个目录多了一个parquet文件

再试试hudi和hive的集成表现

官方的jar包不支持华为HD hive 我改了源码做了集成。重新打了包hudi-spark-bundle_2.11-0.7.0.jar

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.hudi.config.HoodieWriteConfig._


val tableName = "hudi_trips_cow"
val basePath = "hdfs:///tenant/CMHK_TENANT/ads_cmhk_demo.db/hudi_trips_cow"
val dataGen = new DataGenerator

选择了一个新的hdfs路径

这里发现hive的class路径里面没有hudi格式类还要把上面的jar包放到hive 的lib 下面 重启hive

  df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(TABLE_NAME, tableName).
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(BLOOM_INDEX_UPDATE_PARTITION_PATH, "true").
  option(INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()).
  option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName).
  option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName).
  option(HIVE_SYNC_ENABLED_OPT_KEY, "true").
  option(HIVE_PARTITION_FIELDS_OPT_KEY, "d,r,t").
  option(HIVE_DATABASE_OPT_KEY, "ads_cmhk_demo").
  option(HIVE_TABLE_OPT_KEY, "myhuditable").
  option(HIVE_URL_OPT_KEY, "jdbc:hive2://100.75.186.178:24002,100.75.186.177:24002,100.75.186.176:24002/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.hadoop.com@HADOOP.COM").
  mode(Overwrite).
  save(basePath)

选择了一些hive的参数进行设置,包括kerberos的hive url,已经库名 表名 分区名

运行成功

换到hive里面看看情况

查一下这张表有10条数据 这是一张hive的外部表

再试试默认的Merge On Read表

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_mor"
val basePath = "hdfs:///tmp/hudi_trips_mor"
val dataGen = new DataGenerator

 

另外建一张在mor表

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(TABLE_TYPE_OPT_KEY,MOR_TABLE_TYPE_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

 

加入第3行的option 变做MOR表

看看文件

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select * from  hudi_trips_snapshot").show()

 

查询结果和COW表一样的

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(TABLE_TYPE_OPT_KEY,MOR_TABLE_TYPE_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

试下update,同样mor模式

看看文件,发现出现log文件,成功!

查看数据,已经update成功

总结

感觉hudi和kudu是做同件事情,在保证了性能的情况下解决了大数据中更新和删除的问题。不同点是hudi的存储可以基于hdfs,而kudu只能本地文件系统。


版权声明:本文为cy19871228cy原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。