目录
#--------4 flune的拓扑结构 (avro源和avro槽是关键)
#--------1 使用思路
0 还是得查官方网站最有用!! https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
1 想好用什么源source
2 使用什么通道channel
3 采集的目的地sink
4 编写好conf文件放在指定目录下
5 使用命令进行实现,命令格式:
/root/flume/bin/flume-ng agent -n agent的名字 -c /root/flume/conf/ -f job/对应conf文件名
#--------2 组件的选型
#============source
《注意:由于后续版本的更新推出的Taildir source功能过于强大所以基本不会使用Exec和Spooldir 》
适用面 source 优点/缺点
>监听端口发送的信息 netcat 监听端口数据
例如
a2.sources.r1.type = netcat
a2.sources.r1.bind = node01
a2.sources.r1.port = 44444
>监控本地单个文件上传(以命令的形式) Exec 缺点:不能断电续传
例如
a1.sources.r1.type = exec
#linux命令tail -F监听一个文件(不能监听一个文件夹)
a1.sources.r1.command = tail -F /root/hive-3.1.2/logs/root/hive.log
a1.sources.r1.shell = /bin/bash -c
>监控单目录中新文件的上传 Spooldir 缺点:不能动态追加
例如
a3.sources.r3.type = spooldir
#监听的文件夹
a3.sources.r3.spoolDir = /root/flume/upload
#给上传之后的文件加上的后缀
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
>监控多目录多文件(可以追加) Taildir 优点:既能断电续传也能动态追加
缺点:文件改名之后会重复上传,例如hive.log第二天
会更名为hive.log.2022-01-09这样第二天更名之后的文
件又会被上传一次(可以通过修改源码的方式改进)
例如
a3.sources.r3.type = TAILDIR
#存放标记文件的位置
a3.sources.r3.positionFile = /root/flume/tail_dir.json
#监听的文件组
a3.sources.r3.filegroups = f1 f2
#只监听file1文件夹下的包含file的文件
a3.sources.r3.filegroups.f1 = /root/flume/files/.*file.*
#只监听file2文件夹下的包含log的文件
a3.sources.r3.filegroups.f2 = /root/flume/files2/.*log.*
>复杂拓扑连接 avro 用于复杂拓扑的跨机器跨端口连接源
例如
a3.sources.r1.type = avro
#连接的avro sink的ip地址
a3.sources.r1.bind = node04
#连接的avro sink的端口号
a3.sources.r1.port = 4141
>fulme-kafka-flume标准日志采集 kafka 作为消费者接收kafka中topic的数据
例如
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#设置topic通道中一次传输的个数为5000个不满足5000个,
a1.sources.r1.batchSize = 5000
#但是时间到达2000ms时也进行传输
a1.sources.r1.batchDurationMillis = 2000
#绑定kafka集群的地址可以绑定多的防止所连接的那一台kafka所在机器宕机无法访问
a1.sources.r1.kafka.bootstrap.servers = warehouse:9092
#连接kafka集群中的哪几个topic,可以使用kafka.topics.regex,使用正则表达式连接多个topic
a1.sources.r1.kafka.topics=topic_log
#定义拦截器(定义名字,可以定义多个,拦截器需要自定义(自定义拦截器这一块我还不是很熟练后续要多练习))
a1.sources.r1.interceptors = i1
#时间拦截器用于解决 零点漂移问题 这个问题大概就是日志生产时间明明是8.9日 23:59但是由于
#在flume传输的过程中消耗了一部分时间所以到达kafka中的时间已经是8.10日之后,那么这个日志
#将会后移至8.10日在进行处理,通过在event中的头文件中用拦截器加上日志生产时间的方式可以解决
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
#=============channel
memory 内存传输 可靠性低 传输效率高
例如
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
file 磁盘传输 可靠性高 传输效率低
例如
## channel1
a1.channels.c1.type = file
#备份内存中索引数据的磁盘路径
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
#存放channel中数据的磁盘路径
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
kafka channel 数据存储在kafka
存储在磁盘中可靠性高
传输效率高
例如
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#=============sink
avro 拓扑连接
例如
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node01
a1.sinks.k1.port = 4141
hdfs 采集到hdfs
例如
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://node01:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k1.hdfs.rollCount = 0
logger 采集为日志,可以直接打印到控制台
例如
a3.sinks.k1.type = logger
file_roll 采集到本地
例如
a2.sinks.k1.type = file_roll
a2.sinks.k1.sink.directory = /root/tmp/flume/hive_logs_local
kafka 采集到kafka
#--------3 .conf文件的编写:
(多查询官方文档https://flume.apache.org/releases/content/1.9.0/FlumeUtaserGuide.html)
1 先定义
定义agent(其实在flume中一个agent相当于一个JVM进程)的名字
定义sources的名字
定义channels的名字
定义sinks的名字
例如
a3.sources = r3
a3.sinks = k3
a3.channels = c3
2 编写各个组件的属性
参考上面的组件选型
编写source的属性
编写channel的属性
编写sink的属性
3 将各个组件进行连接
将source与channels进行连接(一个源可以连多个通道)
a1.sources.r1.channels = c1 c2
将sinks与channel进行连接(一个通道可以连接多个槽)
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
#--------4 flune的拓扑结构 (avro源和avro槽是关键)
(avro源是服务端,应该先启动这个agent)
1 简单串联 avro源(服务器server)和avro槽(客户端client)
2 复制 需要选择器Replicating ChannelSelector
3 多路复用 需要拦截器 自定义 Interceptor 不同的信息发往不同的sink
4 负载均衡 需要sink group 策略 load_balance(配置rand随机或轮询的工作策略)
5 故障转移 需要sink group 策略 failover(配置组内的sink优先级)
6 聚合 两个avro槽同时指向一个avro源
7 f-k-f模式 指的是使用flume作为kafka集群的生产者接收日志服务器中的数据然后通过kafka缓存之后再由一个fluem作为消费者消费kafka中topic的数据,然后经由这个fluem传输到对应的sink中
#--------5 监控程序ganglia
(这里并没有列出怎么安装和使用如果有兴趣后续可能会更新)
启动/关闭在node01上/root/shell/ganglia.sh bash ganglia.sh
web端页面,在电脑登录web页面: 网站 http://node01/ganglia/
监控的是flume的各项指标如下
EventPutAttemptCount source 尝试写入 channel 的事件总数量
EventPutSuccessCount 成功写入 channel 且提交的事件总数量
EventTakeAttemptCount sink 尝试从 channel 拉取事件的总数量
EventTakeSuccessCount sink 成功读取的事件的总数量
ChannelSize 目前 channel 中事件的总数量
ChannelCapacity channel 的容量
某一时刻 EventPutSuccessCount= EventTakeSuccessCount + ChannelSize
这些指标中能看出集群的flume是否丢失数据是否正常运行
#--------6 结语
说实话是第一次写博客,写的哪里不好或者有错误欢迎指出。