Below is a small end-to-end example of using the Flink Table API to batch-write an Iceberg table, read it back in both batch and streaming mode, and connect it to Hive. With the ACID guarantees Iceberg provides, we can indirectly perform fine-grained data changes on what is exposed as a Hive table (along with many other benefits).
For this use case I find the Table API more convenient than the DataStream API for reading and writing Iceberg. (Flink can also read a Kafka topic in real time and continuously write it into the Iceberg table, so the whole pipeline can be unified across streaming and batch; a sketch of that path follows.)
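For reference, a minimal sketch of that Kafka-to-Iceberg streaming path. The topic name, broker address, consumer group, and JSON format are assumptions made purely for illustration; the catalog and the userAddr table are the ones created in the write example further down.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableAPIKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabenv = StreamTableEnvironment.create(env);
        // the Iceberg sink commits on checkpoint, so checkpointing is required
        env.enableCheckpointing(1000);
        // hypothetical Kafka source table with the same layout as userAddr
        tabenv.executeSql("" +
                "create table kafka_user_addr(userId int, city string, dt string) with (" +
                "'connector'='kafka'," +
                "'topic'='user_addr'," +
                "'properties.bootstrap.servers'='chdp01:9092'," +
                "'properties.group.id'='iceberg_demo'," +
                "'scan.startup.mode'='latest-offset'," +
                "'format'='json')");
        // same Hadoop catalog as in the write example below
        tabenv.executeSql("" +
                "create catalog hadoop_iceberg_dev with " +
                "('type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://chdp01:9000/iceberg/flink'" +
                ")");
        // continuously write Kafka records into the Iceberg table;
        // await() keeps the client alive when running the unbounded job locally
        tabenv.executeSql("insert into hadoop_iceberg_dev.iceberg.userAddr select * from kafka_user_addr").await();
    }
}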
Environment
Flink 1.13.6, Iceberg 0.13.1, Hadoop 3.2.2, Hive 3.1.2
Writing data
Notes:
- Checkpointing must be enabled (the Iceberg sink only commits data when a checkpoint completes).
- A Hadoop catalog is used here to make the later Hive integration easier.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableAPIWriteIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabenv = StreamTableEnvironment.create(env);
        // Iceberg commits data when a checkpoint completes, so checkpointing must be enabled
        env.enableCheckpointing(1000);
        // 1. create the Hadoop catalog
        tabenv.executeSql("" +
                "create catalog hadoop_iceberg_dev with " +
                "('type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://chdp01:9000/iceberg/flink'" +
                ")");
        // 2. use the catalog
        tabenv.useCatalog("hadoop_iceberg_dev");
        // 3. create a database under the Hadoop catalog above
        tabenv.executeSql("create database if not exists iceberg");
        // 4. use the iceberg database
        tabenv.useDatabase("iceberg");
        // 5. create the Iceberg table
        //tabenv.executeSql("drop table if exists hadoop_iceberg_dev.iceberg.userAddr");
        tabenv.executeSql("" +
                "create table if not exists hadoop_iceberg_dev.iceberg.userAddr(userId int, city string, dt string) partitioned by (dt)");
        // 6. insert data
        tabenv.executeSql("insert into hadoop_iceberg_dev.iceberg.userAddr values(1,'SH','2022-12-12'),(2,'SH','2022-12-12'),(3,'BJ','2022-12-13')");
    }
}
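The fine-grained, ACID-backed changes mentioned in the introduction can be exercised from the Table API as well. Below is a minimal sketch that atomically rewrites a single partition with INSERT OVERWRITE; the Iceberg Flink connector only supports INSERT OVERWRITE in batch execution mode, and the replacement rows here are made up for illustration.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class TableAPIOverwriteIceberg {
    public static void main(String[] args) {
        // INSERT OVERWRITE requires batch execution mode with the Iceberg Flink connector
        TableEnvironment tabenv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        tabenv.executeSql("" +
                "create catalog hadoop_iceberg_dev with " +
                "('type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://chdp01:9000/iceberg/flink'" +
                ")");
        // only the dt='2022-12-12' partition receives rows, so only that partition is replaced (atomically)
        tabenv.executeSql("" +
                "insert overwrite hadoop_iceberg_dev.iceberg.userAddr " +
                "select userId, cast(city as string), cast(dt as string) " +
                "from (values (1,'SZ','2022-12-12'),(2,'SZ','2022-12-12')) as t(userId, city, dt)");
    }
}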
Test
You can create a location_based_table in Hive and then inspect, directly through the Hive table, the data written into the Iceberg table above.
Notes:
- Add iceberg-hive-runtime-0.12.1.jar (pick the jar matching your Iceberg version) to Hive's lib directory. Some Hive distributions also lack libfb303-0.9.3.jar, which needs to be downloaded and added as well.
create table iceberg_user_addr(
userId int,
city string,
dt string
)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location 'hdfs://chdp01:9000/iceberg/flink/iceberg/userAddr'
tblproperties ('iceberg.catalog'='location_based_table');
select * from iceberg_user_addr;
Result screenshot: (the extra row with userId 4 was inserted later through the Hive table and can be ignored)
Reading data
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableAPIReadIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabenv = StreamTableEnvironment.create(env);
        Configuration conf = tabenv.getConfig().getConfiguration();
        // enable dynamic table options so the /*+ OPTIONS(...) */ hint used for the streaming read takes effect
        conf.setBoolean("table.dynamic-table-options.enabled", true);
        env.enableCheckpointing(1000);
        // 1. create the Hadoop catalog
        tabenv.executeSql("" +
                "create catalog hadoop_iceberg_dev with " +
                "('type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://chdp01:9000/iceberg/flink'" +
                ")");
        // 2. batch read
        // tabenv.executeSql("select * from hadoop_iceberg_dev.iceberg.userAddr").print();
        // 3. streaming read
        tabenv.executeSql("select * from hadoop_iceberg_dev.iceberg.userAddr /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */").print();
    }
}
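Besides monitoring from the current snapshot, the streaming read options also accept a start-snapshot-id, so the incremental read begins from a specific snapshot. A minimal sketch follows; the snapshot id is a placeholder that you would replace with a real id from the table's metadata.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableAPIReadIcebergFromSnapshot {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabenv = StreamTableEnvironment.create(env);
        Configuration conf = tabenv.getConfig().getConfiguration();
        conf.setBoolean("table.dynamic-table-options.enabled", true);
        env.enableCheckpointing(1000);
        tabenv.executeSql("" +
                "create catalog hadoop_iceberg_dev with " +
                "('type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://chdp01:9000/iceberg/flink'" +
                ")");
        // 'start-snapshot-id' below is a placeholder; look up real snapshot ids in the table's metadata
        tabenv.executeSql("select * from hadoop_iceberg_dev.iceberg.userAddr " +
                "/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987') */").print();
    }
}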
Test
Batch reads behave just like a regular query, so only the streaming read test is recorded here; it is essentially a Flink SQL continuous query over the table. With the job running, insert a row through the Hive table and the new record is printed to the console shortly afterwards; the latency correlates with the checkpoint interval configured in the job.
insert into table iceberg_user_addr values(4,'SH','2022-12-12');
Dependencies
<properties>
<flink.version>1.13.6</flink.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.22</slf4j.version>
<hive.version>3.1.2</hive.version>
<!--<iceberg.version>0.11.1</iceberg.version>-->
<iceberg.version>0.13.1</iceberg.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>${iceberg.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.13</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- added because of a ClassNotFound issue seen with the 0.12 Iceberg runtime -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>1.11.6</version>
</dependency>
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
<!--CDC-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
<scope>compile</scope>
</dependency>
</dependencies>