flume纯配置文件初步入门

首先我们来了解一下flume

flume可以说是大数据当中常见的一个数据采集工具,在本博客当中,你将可以学到flume当中conf文件的配置,非常简单也非常好用。
启动flume的命令:
在flume当中bin目录下输入./flume-ng agent --conf conf --conf-file /conf文件的路径 --name conf文件内agent的名称
因为我们使用的flume当中有用到kafka,kafka主要有zookeeper作为基础,所以配置组件绝对不可出错

source的配置

对于source的配置有很多种,具体参考flume官方,在这里我们将说明几种类型

  1. exec类型的source.
agent_name.sources=source_name
agent_name.sources.source_name.type=exec
agent_name.sources.source_name.shell=/bin/bash -c ##这个命令是写接下来命令执行的环境变量
agent_name.sources.source_name.command=tail -n +0 -F /路径  ##这则命令当中tail是linux下命令,主要作用是监听文件,-n是从文件第几行开始,+0是正着数下标0,-F是实时更新,路径就是你所要监听的文件是什么
agent_name.sources.source_name.restart=true
agent_name.sources.source_name.batchSize=1000  ##这个大小就不用多解释什么了吧23333
agent_name.sources.source_name.channels=channels_name  ##把source绑定channel通道当中
  1. spooldir类型的source
agent_name.sources=sources_name
agent_name.sources.sources_name.type=spooldir
agent_name.sources.sources_name.spoolDir=/路径  ##此项未深究,目录最多监听5或6个文件,默认采集过的文件会打上后缀
agent_name.sources.sources_name.channels=channels_name  ##把source绑定channel通道当中
  1. kafka类型的source
agent_name.sources=sources_name
agent_name.sources.sources_name.type=org.apache.flume.source.kafka.KafkaSource
agent_name.sources.sources_name.topic=test  ##发布kafka消息的topic名
agent_name.sources.sources_name.zookeeperConnect=local:2181,slave:2181,slave2:2181  ##zookeeper集群的地址端口
agent_name.sources.sources_name.bootstrap.server=local:9092,slave:9092,slave2:9092  ##kafka集群的地址与端口
agent_name.sources.sources_name.kafka.consumer.timeout.ms=10  ##接收消息
agent_name.sources.sources_name.group.id=flume
agent_name.sources.sources_name.batchSize=1000
agent_name.sources.sources_name.channels=channels_name  ##把source绑定channel通道当中

source的拦截器使用

这个比较特殊,它是用于数据初步过滤的,需要了解一定的正则表达式

agent_name.sources.sources_name.interceptors=i1
agent_name.sources.sources_name.interpcetors.i1.type=regex_filter
agent_name.sources.sources_name.interceptors.i1.regex=(填写你需要的数据,正则表达式进行筛选)
agent_name.sources.sourecs_name.interpceotrs.i1.excludeEvents=false  ##这里的false代表着收集,true代表着删除

sink的配置

  1. sink到kafka配置项
 agent_name.sinks=sink_name
 agent_name.sinks.sink_name.type=org.apache.flume.sink.kafka.KafkaSink
 agent_name.sinks.sink_name.topic=test  ##你启动kafka后创建的topic名称
 agent_name.sinks.sink_name.brokerList=local:2181,slave1:2181,slave2:2181 ##kafka集群的地址
 agent_name.sinks.sink_name.metadata.brokerList=local:2181,slave1:2181,slave2:2181 ##kafka集群地址+端口(来源为基础zookeeper)
 agent_name.sinks.sink_name.flumeBatchSize=1000 ##最大值数量控制
 agent_name.sinks.sink_name.serializer.class=kafka.serializer.StringEncoder ##关于转码为字符类型
 agent_name.sinks.sink_name.producer.type=sync ##剩下的应该是生产者相关的...producer嘛
 agent_name.sinks.sink_name.producer.acks=1
 agent_name.sinks.sink_name.producer.linger.ms=1
 agent_name.sinks.sink_name.producer.compression.type=snappy
 agent_name.sinks.sink_name.channel=channel_name  ##将sink绑定到channel通道
  1. sink到hdfs的配置项
agent_name.sinks=sink_name
agent_name.sinks.sink_name.hdfs.type=hdfs  ##hdfs类型
agent_name.sinks.sink_name.hdfs.path=hdfs://local:8020/usr  ##上传hdfs后的路径
agent_name.sinks.sink_name.hdfs.filePrefix=%Y-%m-%d  ##文件名前缀
agent_name.sinks.sink_name.hdfs.round=true  ##根据时间来滚动文件夹,true代表是
agent_name.sinks.sink_name.hdfs.roundValue=10  ##数值
agent_name.sinks.sink_name.hdfs.roundUnit=minute  ##单位,hour是小时,minute是分钟
agent_name.sinks.sink_name.hdfs.rollSize=5160000  ##根据大小滚动文件,此值大约是5MB
agent_name.sinks.sink_name.hdfs.rollCount=0  ##根据收到的消息条数滚动文件,默认10条,设置为0不按照条数滚动
agent_name.sinks.sink_name.hdfs.rollInterval=600  ##根据时间来滚动文件,单位“s(秒)”
agent_name.sinks.sink_name.hdfs.idleTimeout=600  ##多长时间文件未进行写入便滚动成一个文件,单位"s(秒)"
agent_name.sinks.sink_name.hdfs.useLocalTimeStamp=true  ##是否启用本地时间戳
agent_name.sinks.sink_name.hdfs.minBlockReplicas=1  ##这一项和下面一项设置后可以准确按照自己设置来进行文件滚动
agent_name.sinks.sink_name.hdfs.callTimeout=40000 ##
agent_name.sinks.sink_name.hdfs.writeFormat=Text  ##默认文件类型格式为文本
agent_name.sinks.sink_name.hdfs.fileType=DataStream  ##文件类型为文本,此项不设置会出现乱码
agent_name.sinks.sink_name.channel=channel_name  ##将sink绑定到channel通道

channel的配置

  1. channel的配置项
agent_name.channels=channel_name
agent_name.channels.channel_name.type=memory  ##通道类型为内存通道
agent_name.channels.channel_name.keep-alive=30
agent_name.channels.channel_name.capacity=100000 ##通道可容纳多少条event
agent_name.channels.channel_name.transanctionCapacity=1000 ##通道一次可以发送多少条event

在使用拦截器时如果conf内写汉字乱码可以更改环境变量

vim /etc/locale.conf
将LANG的值改为zh_CN.utf-8 即可
改完以后重新连接就好了

以上就是本博客当中的全部内容.具体实验环境请根据实际进行调配

憨憨博主,在线求关注(收藏)
好文章记得转发给朋友哦~


版权声明:本文为weixin_43937130原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。