此博客仅针对flume当中conf配置文件
首先我们来了解一下flume
flume可以说是大数据当中常见的一个数据采集工具,在本博客当中,你将可以学到flume当中conf文件的配置,非常简单也非常好用。
启动flume的命令:
在flume当中bin目录下输入./flume-ng agent --conf conf --conf-file /conf文件的路径 --name conf文件内agent的名称
因为我们使用的flume当中有用到kafka,kafka主要有zookeeper作为基础,所以配置组件绝对不可出错
source的配置
对于source的配置有很多种,具体参考flume官方,在这里我们将说明几种类型
exec类型的source.
agent_name.sources=source_name
agent_name.sources.source_name.type=exec
agent_name.sources.source_name.shell=/bin/bash -c ##这个命令是写接下来命令执行的环境变量
agent_name.sources.source_name.command=tail -n +0 -F /路径 ##这则命令当中tail是linux下命令,主要作用是监听文件,-n是从文件第几行开始,+0是正着数下标0,-F是实时更新,路径就是你所要监听的文件是什么
agent_name.sources.source_name.restart=true
agent_name.sources.source_name.batchSize=1000 ##这个大小就不用多解释什么了吧23333
agent_name.sources.source_name.channels=channels_name ##把source绑定channel通道当中
spooldir类型的source
agent_name.sources=sources_name
agent_name.sources.sources_name.type=spooldir
agent_name.sources.sources_name.spoolDir=/路径 ##此项未深究,目录最多监听5或6个文件,默认采集过的文件会打上后缀
agent_name.sources.sources_name.channels=channels_name ##把source绑定channel通道当中
kafka类型的source
agent_name.sources=sources_name
agent_name.sources.sources_name.type=org.apache.flume.source.kafka.KafkaSource
agent_name.sources.sources_name.topic=test ##发布kafka消息的topic名
agent_name.sources.sources_name.zookeeperConnect=local:2181,slave:2181,slave2:2181 ##zookeeper集群的地址端口
agent_name.sources.sources_name.bootstrap.server=local:9092,slave:9092,slave2:9092 ##kafka集群的地址与端口
agent_name.sources.sources_name.kafka.consumer.timeout.ms=10 ##接收消息
agent_name.sources.sources_name.group.id=flume
agent_name.sources.sources_name.batchSize=1000
agent_name.sources.sources_name.channels=channels_name ##把source绑定channel通道当中
source的拦截器使用
这个比较特殊,它是用于数据初步过滤的,需要了解一定的正则表达式
agent_name.sources.sources_name.interceptors=i1
agent_name.sources.sources_name.interpcetors.i1.type=regex_filter
agent_name.sources.sources_name.interceptors.i1.regex=(填写你需要的数据,正则表达式进行筛选)
agent_name.sources.sourecs_name.interpceotrs.i1.excludeEvents=false ##这里的false代表着收集,true代表着删除
sink的配置
sink到kafka配置项
agent_name.sinks=sink_name
agent_name.sinks.sink_name.type=org.apache.flume.sink.kafka.KafkaSink
agent_name.sinks.sink_name.topic=test ##你启动kafka后创建的topic名称
agent_name.sinks.sink_name.brokerList=local:2181,slave1:2181,slave2:2181 ##kafka集群的地址
agent_name.sinks.sink_name.metadata.brokerList=local:2181,slave1:2181,slave2:2181 ##kafka集群地址+端口(来源为基础zookeeper)
agent_name.sinks.sink_name.flumeBatchSize=1000 ##最大值数量控制
agent_name.sinks.sink_name.serializer.class=kafka.serializer.StringEncoder ##关于转码为字符类型
agent_name.sinks.sink_name.producer.type=sync ##剩下的应该是生产者相关的...producer嘛
agent_name.sinks.sink_name.producer.acks=1
agent_name.sinks.sink_name.producer.linger.ms=1
agent_name.sinks.sink_name.producer.compression.type=snappy
agent_name.sinks.sink_name.channel=channel_name ##将sink绑定到channel通道
sink到hdfs的配置项
agent_name.sinks=sink_name
agent_name.sinks.sink_name.hdfs.type=hdfs ##hdfs类型
agent_name.sinks.sink_name.hdfs.path=hdfs://local:8020/usr ##上传hdfs后的路径
agent_name.sinks.sink_name.hdfs.filePrefix=%Y-%m-%d ##文件名前缀
agent_name.sinks.sink_name.hdfs.round=true ##根据时间来滚动文件夹,true代表是
agent_name.sinks.sink_name.hdfs.roundValue=10 ##数值
agent_name.sinks.sink_name.hdfs.roundUnit=minute ##单位,hour是小时,minute是分钟
agent_name.sinks.sink_name.hdfs.rollSize=5160000 ##根据大小滚动文件,此值大约是5MB
agent_name.sinks.sink_name.hdfs.rollCount=0 ##根据收到的消息条数滚动文件,默认10条,设置为0不按照条数滚动
agent_name.sinks.sink_name.hdfs.rollInterval=600 ##根据时间来滚动文件,单位“s(秒)”
agent_name.sinks.sink_name.hdfs.idleTimeout=600 ##多长时间文件未进行写入便滚动成一个文件,单位"s(秒)"
agent_name.sinks.sink_name.hdfs.useLocalTimeStamp=true ##是否启用本地时间戳
agent_name.sinks.sink_name.hdfs.minBlockReplicas=1 ##这一项和下面一项设置后可以准确按照自己设置来进行文件滚动
agent_name.sinks.sink_name.hdfs.callTimeout=40000 ##
agent_name.sinks.sink_name.hdfs.writeFormat=Text ##默认文件类型格式为文本
agent_name.sinks.sink_name.hdfs.fileType=DataStream ##文件类型为文本,此项不设置会出现乱码
agent_name.sinks.sink_name.channel=channel_name ##将sink绑定到channel通道
channel的配置
channel的配置项
agent_name.channels=channel_name
agent_name.channels.channel_name.type=memory ##通道类型为内存通道
agent_name.channels.channel_name.keep-alive=30
agent_name.channels.channel_name.capacity=100000 ##通道可容纳多少条event
agent_name.channels.channel_name.transanctionCapacity=1000 ##通道一次可以发送多少条event
在使用拦截器时如果conf内写汉字乱码可以更改环境变量
vim /etc/locale.conf
将LANG的值改为zh_CN.utf-8 即可
改完以后重新连接就好了
以上就是本博客当中的全部内容.具体实验环境请根据实际进行调配
憨憨博主,在线求关注(收藏)
好文章记得转发给朋友哦~
版权声明:本文为weixin_43937130原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。