flume初级使用指南

目录

#--------1 使用思路

#--------2 组件的选型

1 source        

2 channel      

3 sink

 #--------3 .conf文件的编写:

1 先定义 

2 编写各个组件的属性

3 将各个组件进行连接

#--------4 flune的拓扑结构  (avro源和avro槽是关键)

#--------5 监控程序ganglia

#--------6 结语

#--------1 使用思路


0 还是得查官方网站最有用!! https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
1 想好用什么源source
2 使用什么通道channel
3 采集的目的地sink
4 编写好conf文件放在指定目录下
5 使用命令进行实现,命令格式:
/root/flume/bin/flume-ng agent -n agent的名字 -c /root/flume/conf/ -f job/对应conf文件名

                             
#--------2 组件的选型


#============source        
《注意:由于后续版本的更新推出的Taildir source功能过于强大所以基本不会使用Exec和Spooldir 》

    适用面                                source                    优点/缺点 
>监听端口发送的信息                     netcat                监听端口数据
    例如 
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = node01
    a2.sources.r1.port = 44444
    
>监控本地单个文件上传(以命令的形式)     Exec                 缺点:不能断电续传
    例如
    a1.sources.r1.type = exec
    #linux命令tail -F监听一个文件(不能监听一个文件夹)
    a1.sources.r1.command = tail -F /root/hive-3.1.2/logs/root/hive.log
    a1.sources.r1.shell = /bin/bash -c
    
>监控单目录中新文件的上传                 Spooldir             缺点:不能动态追加
    例如 
    a3.sources.r3.type = spooldir
    #监听的文件夹
    a3.sources.r3.spoolDir = /root/flume/upload
    #给上传之后的文件加上的后缀
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    #忽略所有以.tmp 结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
>监控多目录多文件(可以追加)             Taildir                优点:既能断电续传也能动态追加  
                                                    缺点:文件改名之后会重复上传,例如hive.log第二天
                                                    会更名为hive.log.2022-01-09这样第二天更名之后的文
                                                    件又会被上传一次(可以通过修改源码的方式改进)
    例如 
    a3.sources.r3.type = TAILDIR
    #存放标记文件的位置
    a3.sources.r3.positionFile = /root/flume/tail_dir.json
    #监听的文件组
    a3.sources.r3.filegroups = f1 f2
    #只监听file1文件夹下的包含file的文件
    a3.sources.r3.filegroups.f1 = /root/flume/files/.*file.*
    #只监听file2文件夹下的包含log的文件
    a3.sources.r3.filegroups.f2 = /root/flume/files2/.*log.*
    
>复杂拓扑连接                            avro                用于复杂拓扑的跨机器跨端口连接源
    例如 
    a3.sources.r1.type = avro
    #连接的avro sink的ip地址
    a3.sources.r1.bind = node04
    #连接的avro sink的端口号
    a3.sources.r1.port = 4141
    
>fulme-kafka-flume标准日志采集            kafka                作为消费者接收kafka中topic的数据
    例如
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    #设置topic通道中一次传输的个数为5000个不满足5000个,
    a1.sources.r1.batchSize = 5000
    #但是时间到达2000ms时也进行传输
    a1.sources.r1.batchDurationMillis = 2000
    #绑定kafka集群的地址可以绑定多的防止所连接的那一台kafka所在机器宕机无法访问
    a1.sources.r1.kafka.bootstrap.servers = warehouse:9092
    #连接kafka集群中的哪几个topic,可以使用kafka.topics.regex,使用正则表达式连接多个topic
    a1.sources.r1.kafka.topics=topic_log
    #定义拦截器(定义名字,可以定义多个,拦截器需要自定义(自定义拦截器这一块我还不是很熟练后续要多练习))
    a1.sources.r1.interceptors = i1
    #时间拦截器用于解决 零点漂移问题 这个问题大概就是日志生产时间明明是8.9日 23:59但是由于
    #在flume传输的过程中消耗了一部分时间所以到达kafka中的时间已经是8.10日之后,那么这个日志
    #将会后移至8.10日在进行处理,通过在event中的头文件中用拦截器加上日志生产时间的方式可以解决
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder


#=============channel


                                        memory                内存传输 可靠性低 传输效率高
    例如
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
                                        file                磁盘传输 可靠性高 传输效率低
    例如
    ## channel1
    a1.channels.c1.type = file
    #备份内存中索引数据的磁盘路径
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    #存放channel中数据的磁盘路径
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/

                                        kafka channel       数据存储在kafka
                                                            存储在磁盘中可靠性高
                                                            传输效率高
    例如
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
    a1.channels.c1.kafka.topic = topic_log
    a1.channels.c1.parseAsFlumeEvent = false

                                    
    
#=============sink


                                        avro                拓扑连接
    例如
    a1.sinks.k1.type = avro 
    a1.sinks.k1.hostname = node01
    a1.sinks.k1.port = 4141
                                        hdfs                采集到hdfs
    例如
    a3.sinks.k1.type = hdfs
    a3.sinks.k1.hdfs.path = hdfs://node01:8020/flume2/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k1.hdfs.filePrefix = flume2-
    #是否按照时间滚动文件夹
    a3.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k1.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k1.hdfs.rollCount = 0

                                        logger                采集为日志,可以直接打印到控制台
    例如 
    a3.sinks.k1.type = logger
    
                                        file_roll            采集到本地
    例如
    a2.sinks.k1.type = file_roll
    a2.sinks.k1.sink.directory = /root/tmp/flume/hive_logs_local
    
                                        kafka                采集到kafka
                                        

#--------3 .conf文件的编写:


(多查询官方文档https://flume.apache.org/releases/content/1.9.0/FlumeUtaserGuide.html)

1 先定义 


         定义agent(其实在flume中一个agent相当于一个JVM进程)的名字
         定义sources的名字
         定义channels的名字
         定义sinks的名字
    例如
         a3.sources = r3
         a3.sinks = k3
         a3.channels = c3

2 编写各个组件的属性


         参考上面的组件选型
         编写source的属性
         编写channel的属性
         编写sink的属性
         
3 将各个组件进行连接


         将source与channels进行连接(一个源可以连多个通道)
            a1.sources.r1.channels = c1 c2
         将sinks与channel进行连接(一个通道可以连接多个槽)
            a1.sinks.k1.channel = c1
            a1.sinks.k2.channel = c2
         
    
#--------4 flune的拓扑结构  (avro源和avro槽是关键)


(avro源是服务端,应该先启动这个agent)
1 简单串联        avro源(服务器server)和avro槽(客户端client)
2 复制            需要选择器Replicating ChannelSelector
3 多路复用        需要拦截器 自定义 Interceptor 不同的信息发往不同的sink
4 负载均衡        需要sink group 策略 load_balance(配置rand随机或轮询的工作策略)
5 故障转移        需要sink group 策略 failover(配置组内的sink优先级)
6 聚合            两个avro槽同时指向一个avro源
7 f-k-f模式        指的是使用flume作为kafka集群的生产者接收日志服务器中的数据然后通过kafka缓存之后再由一个fluem作为消费者消费kafka中topic的数据,然后经由这个fluem传输到对应的sink中


#--------5 监控程序ganglia

(这里并没有列出怎么安装和使用如果有兴趣后续可能会更新)
启动/关闭在node01上/root/shell/ganglia.sh    bash ganglia.sh
web端页面,在电脑登录web页面: 网站 http://node01/ganglia/
监控的是flume的各项指标如下

EventPutAttemptCount  source        尝试写入 channel 的事件总数量
EventPutSuccessCount          成功写入 channel 且提交的事件总数量
EventTakeAttemptCount sink        尝试从 channel 拉取事件的总数量
EventTakeSuccessCount sink         成功读取的事件的总数量
ChannelSize           目前 channel 中事件的总数量
ChannelCapacity       channel 的容量
某一时刻 EventPutSuccessCount= EventTakeSuccessCount + ChannelSize
这些指标中能看出集群的flume是否丢失数据是否正常运行

#--------6 结语

说实话是第一次写博客,写的哪里不好或者有错误欢迎指出。



版权声明:本文为qq_47981823原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。