streamset hive到mysql_如何使用StreamSets实时采集Kafka数据并写入Hive表

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:

https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的

a5418acd959a9299d9b094f7aca6794a.png

内容概述

1.测试环境准备

2.配置StreamSets

3.创建Pipline及测试

4.总结

测试环境

1.RedHat7.3

2.CM和CDH版本为cdh5.13.3

3.Kafka2.2.0(0.10.2)

4.StreamSets3.3.0

前置条件

1.集群已启用Kerberos

2.集群已安装Kafka并启用Kerberos

2.测试环境准备

1.准备一个访问Kerberos环境的Kafka的fayson.keytab文件

[root@cdh01 ~]# kadmin.local

kadmin.local:  listprincs fayson*

fayson@FAYSON.COM

kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM

(可左右滑动)

c534c463b7708784f83009c1c1f14ff6.png

fayson.keytab主要在向Kafka生产消息和StreamSets消费Kafka数据时使用。

2.准备向Kerberos环境的Kafka集群生产数据脚本

426b85386970c7ad956d926e3936b406.png

该脚本用于向Kafka发送JSON数据,脚本说明:

run.sh:向Kafka指定topic生产数据的脚本

ods_user_600.txt:发送到Kafka的测试数据,共600条测试数据,数据的id是唯一的。

0b441698ba5725ee01d39d1c4b7a6a22.png

注意:发送数据的示例代码是将ods_user_600.txt的每条数据转换为json格式了,示例数据如下:

{

"occupation": "生产工作、运输工作和部分体力劳动者",

"address": "台东东二路16号-8-8",

"city": "长治",

"marriage": "1",

"sex": "1",

"name": "仲淑兰",

"mobile_phone_num": "13607268580",

"bank_name": "广州银行31",

"id": "510105197906185179",

"child_num": "1",

"fix_phone_num": "15004170180"

}

(可左右滑动)

lib:Fayson打包的示例jar包及Kafka的依赖包

e07bba54e80acc66006f7a20530cebef.png

conf:示例代码运行的配置文件

2bba7b7a474afaf4973ed7e0ec2a0ae8.png

该脚本运行主要依赖0286.properties、jaas.conf、krb5.conf和fayson.keytab文件。

0286.properties配置文件,将如下配置修改为自己集群的环境:

[root@cdh04 conf]# vim 0286.properties

bootstrap.servers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092

topic.name=kafka_hive_topic

krb5.debug=false

group.id=testgroup

(可左右滑动)

99224d03be54d4173ce2f30334bbc7fb.png

jaas.conf文件内容如下:

[root@cdh04 conf]# vim jaas.conf

KafkaClient {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"

principal="fayson@FAYSON.COM";

};

Client {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"

principal="fayson@FAYSON.COM";

};

(可左右滑动)

f9c2459735cfa4417d3eaaa4389525e3.png

krb5.conf:拷贝集群的krb5.conf文件到conf目录下即可。

8ceeb63c51103238e23ada1b649dabe0.png

3.通过如下命令创建测试topic

[root@cdh04 conf]# kafka-topics --create --zookeeper cdh01.fayson.com:2181 --replication-factor 3 --partitions 1 --topic kafka_hive_topic

(可左右滑动)

98296ec31add7905d6b27dd43be4a750.png

4.通过Hue为sdc用户授权

ff417228e4ee9560f98cf0fbfdacb4a5.png

授予default库的所有权限以及/user/hive/warehouse目录的URI权限,否则sdc用户无法创建表。

3.StreamSets配置

由于Kafka集群启用了Kerberos,所以这里在使用StreamSets消费Kafka数据之前,需要配置StreamSets访问Kafka的Kerberos用户信息,具体配置如下:

1.登录Cloudera Manager并进入StreamSets服务

5305ff745e91eac9a8bc5a339f486255.png

2.点击“配置”,搜索“sdc_java_opts”,在该配置项中增加如下内容

-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf

(可左右滑动)

a7d12ac4edfda3289a854f70eb07ac6a.png

注意:jaas.conf文件需要存在于StreamSets的Data Collector服务所在节点的指定目录下。

4.创建StreamSets的Pipline

1.登录StreamSets,创建一个kafka2kudu的Pipline

09790ce5dfcabd07a7383c26622606e9.png

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息

09db37bdb2032fa1c1f105213352b8ca.png

注意:Kafka的版本选择,这里Fayson选择的Apache的Kafka,在Kerberos环境下选择CDH对应的Kafka版本会报“Couldnot get partition count for topic 'kafka_hive_topic'”

配置Kafka相关信息,如Broker、ZK、Group、Topic及Kerberos信息

c845db7439973a06338ad1feb698cfb9.png

注意:访问Kerberos环境的Kafka需要配置security.protocol=SASL_PLAINTEXT和sasl.kerberos.service.name=kafka两个参数。

配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON

017d176c38909c4ea7371b3e6396c54b.png

3.添加Hive Metadata中间处理模块,选择对应的CDH版本

f2fe93f1c3ee5c0e0792642994662e40.png

配置Hive的JDBC信息

04ab9bd0e8f9c0ba79759fb7a4dfed6c.png

注意:这里访问Hive的JDBC连接,需要添加Kerberos信息,由于无法通过StreamSets界面指定我们自定义用户访问Hive,所以这里默认使用的是StreamSets的sdc用户,如果集群启用了Sentry则需要注意为sdc用户授权,否则无法创建hive表和写数据。

配置Hive的表信息,指定表名和库名

b8f6c17892a2bde61f3b4d7d7225250d.png

指定数据格式,指定为Avro,选项中有parquet格式,但在后续处理中并不支持parquet格式

af6581369d8d9b61626faec4da816ff3.png

4.添加Hadoop FS处理模块,主要用于将HiveMetadata的数据写入HDFS

42f2b87ce5109f32ff002e9ef7c95883.png

配置Hadoop FS,配置HDFS URL和是否启用Kerberos认证

e3b4396f5d82b4cae53386cadfeed822.png

配置Hadoop FS的Out Files

82157dbc29682f0749b069f8f8cc2ca4.png

注意:勾选“Directory in Header”使HDFS写入数据时使用上一步中Hive Metadata模块传递的目录,“IdleTimeout”主要是用于指定Hadoop FS模块空闲多久则将数据刷到HDFS数据目录。

配置Late Records参数,使用默认参数即可

ce20d97027944e822beb736dd8db058a.png

指定写入到HDFS的数据格式

290dc980b26c1d5d04810a38ce1a91cd.png

5.添加Hive Metastore模块,该模块主要用于向Hive库中创建表

9d76fa8ae62321ef73c309bdaef75022.png

配置Hive信息,JDBC访问URL

ab39b9e3cfe52484982f4815f829f026.png

Hive Metastore的高级配置

ace13502c626a1b2bf8e9c63e636f332.png

6.点击校验流程,如下图所示则说明流程正常

1475dd9262390d8970576809faa8b0d9.png

到此为止完成了Kafka数据到Hive的流程配置。

5.流程测试验证

1.启动kafka2hive的Pipline,启动成功如下图显示

68391d1d4b6193ba21952d156f790807.png

2.在命令行运行run.sh脚本向Kafka发送消息

[root@cdh04 ~]# cd /data/disk1/0286-kafka-shell/

[root@cdh04 0286-kafka-shell]# sh run.sh ods_user_600.txt

(可左右滑动)

0a71a2c3060f5e55f97425c74cad0d7f.png

3.在StreamSets中查看kafka2hive的pipline运行情况

695ebe743c1585d50ba052c8786bf811.png

4.使用sdc用户登录Hue查看ods_user表数据

ac213648443b178aefc5846843b6030e.png

入库的数据总条数

90d3f6159f4375c5cc040b792851ff10.png

可以看到ods_user表的总条数与准备的测试数据量一致。

6.总结

1.Kafka集群启用了Kerberos后,StreamSets的Kafka模块在消费数据时需要在sdc_java_opt中加载jaas.conf,指定消费Kafka数据的Kerberos账号。

2.Hive Metadata模块主要是用于将Kafka的JSON数据进行封装分流处理,data数据交给HDFS模块,MetaData数据交个HiveMetastore模块,HDFS模块主要用于写数据到hive表的数据目录,HiveMetastore主要用于判断表是否存在是否需要创建表。

3.由于HiveMetastore模块无法指定自定义的Kerberos账号,默认使用sdc用户访问Hive,在启用Sentry的集群则需要为sdc用户授权,否则无权限创建表。

4.HDFS模块在接收到HiveMetadata模块的数据后生成的为临时文件,不是立即将数据写入到HDFS,可以通过“Idle Timeout”参数来控制刷新数据到HDFS的频率。

GitHub地址:

生产消息脚本地址:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0286-kafka-shell

源码地址:

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/kerberos/ReadUserInfoFileToKafka_0286.java

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

4fac3e72088ba7fa31aace91a692d4ad.gif

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操


版权声明:本文为weixin_35927281原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。