SparkSQL操作hudi

SparkSQL操作hudi

1、登录

# Spark 3.1: start the spark-sql shell with the Hudi 0.10.1 bundle built for
# Spark 3.1.2 / Scala 2.12 and the matching spark-avro artifact pulled in via
# --packages. The two --conf flags select the Kryo serializer and register
# Hudi's SQL session extension.
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark 3.0: same launch command, but with the Hudi 0.10.1 bundle and
# spark-avro artifact built for Spark 3.0.3 / Scala 2.12. The two --conf flags
# select the Kryo serializer and register Hudi's SQL session extension.
spark-sql --packages org.apache.hudi:hudi-spark3.0.3-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.0.3 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

2、创建普通表

-- Non-partitioned table created without any tblproperties, so no
-- preCombineField is declared. Per the Hudi Spark SQL guide the table type
-- then defaults to copy-on-write and the record key defaults to `uuid`.
create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;
-- Non-partitioned merge-on-read table. Unlike hudi_cow_nonpcf_tbl, this one
-- DOES declare a preCombineField (`ts`) and an explicit record key (`id`).
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

3、创建分区表

-- Copy-on-write table keyed on `id` with `ts` as the preCombineField,
-- partitioned by the two string columns dt and hh, and stored at an explicit
-- location instead of the default warehouse path.
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

4、从现有表创建表

-- Register a table over a non-partitioned Hudi dataset at this path.
-- No column list or tblproperties are given; per the Hudi guide the schema
-- and table configuration are read from the existing dataset.
create table hudi_existing_tbl0
using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
-- Register a table over a partitioned Hudi dataset at this path.
-- Only the partition columns (dt, hh) are spelled out; no column list or
-- tblproperties are given.
create table hudi_existing_tbl1
using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

5、用查询结果创建新表(CTAS)

-- CTAS: create a non-partitioned table from a query result. Only the record
-- key is declared (`id`); no preCombineField is set.
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select
  1 as id,
  'a1' as name,
  10 as price;
-- CTAS: create a copy-on-write table partitioned by dt from a query result,
-- keyed on `id` with `ts` as the preCombineField.
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select
  1 as id,
  'a1' as name,
  10 as price,
  1000 as ts,
  '2021-12-01' as dt;
-- Create a plain parquet table over existing files; it is the source for the
-- hudi_ctas_cow_pt_tbl2 CTAS example.
create table parquet_mngd
using parquet
location 'file:///tmp/parquet_dataset/*.parquet';
-- CTAS that loads every row of the parquet table parquet_mngd into a new
-- copy-on-write Hudi table partitioned by datestr. The source must therefore
-- expose the id, ts and datestr columns referenced below.
create table hudi_ctas_cow_pt_tbl2
using hudi
location 'file:/tmp/hudi/hudi_tbl/'
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (datestr)
as
select * from parquet_mngd;

6、插入数据

-- Insert one row into the non-partitioned table. Values are matched to the
-- table columns by position; the aliases only label them for the reader.
insert into hudi_cow_nonpcf_tbl
select
  1 as uuid,
  'a1' as name,
  20 as price;
-- Dynamic-partition insert: dt and hh are listed in the partition clause
-- without values, so each row's partition comes from the trailing dt/hh
-- columns of the select.
insert into hudi_cow_pt_tbl partition (dt, hh)
select
  1 as id,
  'a1' as name,
  1000 as ts,
  '2021-12-09' as dt,
  '10' as hh;
-- Static-partition insert: the target partition is fixed in the partition
-- clause, so the select supplies only the non-partition columns.
insert into hudi_cow_pt_tbl partition (dt = '2021-12-09', hh = '11')
select
  2 as id,
  'a2' as name,
  1000 as ts;
-- Session settings that change how later INSERT INTO statements write data:
-- turn on Hudi's bulk-insert path and use the non-strict insert mode.
-- NOTE(review): the Hudi docs pair these two settings; confirm the exact mode
-- semantics against the Hudi version in use.
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

7、查询数据

-- Query example with a simple filter.
-- NOTE(review): hudi_trips_snapshot is not created anywhere in this guide;
-- it must already exist (e.g. as a table or temp view) before running this.
select
  fare,
  begin_lon,
  begin_lat,
  ts
from hudi_trips_snapshot
where fare > 20.0;

8、修改数据

#修改数据的语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
-- Example: double the price of the row with id = 1 and set its ts
-- (the table's preCombineField) to 1111 in the same statement.
update hudi_mor_tbl
set
  price = price * 2,
  ts = 1111
where id = 1;

9、合并数据

#按条件合并数据
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]
/**
<merge_condition> = an equality boolean condition
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  */

10、删除数据

#语法结构
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
-- Example: delete the row whose record key (uuid) is 1.
delete from hudi_cow_nonpcf_tbl
where uuid = 1;

11、覆盖写入

-- Overwrite the non-partitioned table with a single row. Values are matched
-- to (id, name, price, ts) by position; the aliases only label them.
insert overwrite table hudi_mor_tbl
select
  99 as id,
  'a99' as name,
  20.0 as price,
  900 as ts;
-- Overwrite with dynamic partitioning: no partition clause is given, so the
-- target partition comes from the trailing dt/hh values of the select.
insert overwrite table hudi_cow_pt_tbl
select
  10 as id,
  'a10' as name,
  1100 as ts,
  '2021-12-09' as dt,
  '10' as hh;
-- Overwrite a single, explicitly named partition; the select supplies only
-- the non-partition columns.
insert overwrite table hudi_cow_pt_tbl partition (dt = '2021-12-09', hh = '12')
select
  13 as id,
  'a13' as name,
  1100 as ts;

12、修改数据表

#修改表名
ALTER TABLE oldTableName RENAME TO newTableName
-- Example: rename hudi_cow_nonpcf_tbl to hudi_cow_nonpcf_tbl2.
alter table hudi_cow_nonpcf_tbl rename to hudi_cow_nonpcf_tbl2;
#增加列
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
-- Example: add a string column named remark to the renamed table.
alter table hudi_cow_nonpcf_tbl2 add columns (remark string);
#修改列类型
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
-- Example: widen the uuid column from int to bigint, keeping its name.
alter table hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
#修改表属性
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
-- Example: set the table property hoodie.keep.max.commits to '10'.
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

13、hudi分区命令

#显示分区
SHOW PARTITIONS tableIdentifier
-- Example: list the partitions of the partitioned table.
show partitions hudi_cow_pt_tbl;
#删除分区
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )
-- Example: drop the single partition identified by both partition columns.
alter table hudi_cow_pt_tbl drop partition (dt = '2021-12-09', hh = '10');

版权声明:本文为m0_43405302原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。