1.下载
下载地址:Apache flume 1.9.0
2.部署
- 创建部署目录,并上传安装包
- mkdir sys && rz -be
- 解压安装包
- tar -zxvf apache-flume-1.9.0-bin.tar.gz
- 设置环境变量
- vi ~/.bash_profile
- source ~/.bash_profile
- 修改配置文件
- cd $FLUME_HOME/conf
- mv flume-env.sh.template flume-env.sh
- vi flume-env.sh,修改JAVA_HOME的值,需要安装java8以上版本;如有需要也可适当调整JAVA_OPTS参数
3.配置
1.实时监控单个日志文件变化,并写入hdfs
- mv flume-conf.properties.template flume-conf.properties
- 修改flume-conf,配置source
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 ## configure the source # exec 指的是命令 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/bigdata/logs/test.log a1.sources.r1.channels = c1- 修改flume-conf,配置sink
# Config the sink #下沉目标 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 #指定目录, flume帮做目的替换 a1.sinks.k1.hdfs.path = hdfs://bi-name1/data/logs/trace_logs/111/%y-%m-%d/ #文件的命名, 前缀 a1.sinks.k1.hdfs.filePrefix = warn #10 分钟就改目录,生成新的目录 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute #时间:每3s滚动生成一个新的文件 0表示不使用时间来滚动 a1.sinks.k1.hdfs.rollInterval = 0 #空间: 文件滚动的大小限制(bytes) 当达到1kb时滚动生成新的文件 a1.sinks.k1.hdfs.rollSize = 1024 #事件:写入多少个event数据后滚动文件(事件个数),滚动生成新的文件 a1.sinks.k1.hdfs.rollCount = 20 #5个事件就开始往里面写入 a1.sinks.k1.hdfs.batchSize = 5 #用本地时间格式化目录 a1.sinks.k1.hdfs.useLocalTimeStamp = true #下沉后, 生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream- 修改flume-conf,配置channel
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c12.实时监控目录日志文件变化,并写入hdfs
- 配置source
# source类型 a1.sources.s1.type = TAILDIR # 元数据位置 a1.sources.s1.positionFile = /home/bigdata/file/flume/taildir_position.json # 监控的目录 a1.sources.s1.filegroups = f1 a1.sources.s1.filegroups.f1=/home/bigdata/logs/.*log a1.sources.s1.fileHeader = true- sink和channel配置同上
3.实时监控目录日志文件变化,并写入kafka,配置如下
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 ## configure the source # exec 指的是命令 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/bigdata/logs/test.log a1.sources.r1.channels = c1 ## configure the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.channel=c1 a1.sinks.k1.kafka.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092 a1.sinks.k1.kafka.topic = test1 #指定必须有多少个分区副本接收到了消息,生产者才认为消息发送成功, ##0:Never wait,1:wait for leader only,-1:wait for all replicas;default:1 a1.sinks.k1.kafka.producer.acks=0 ## configure the channel a1.channels.c1.type = file a1.channels.c1.checkpointDir = /data/flume/checkpoint a1.channels.c1.dataDirs = /data/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4.运行并测试
启动Flume:
$ bin/flume-ng agent -n a1 -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console使用python2.7追加写入日志文件
#-*- coding: utf-8 -*- import time import sys import io reload(sys) sys.setdefaultencoding('utf-8') filePath="/home/bigdata/logs/test.log" def writeLog(): ff = io.open(filePath,"ab+") while True: time.sleep(1) tm = str(time.time()) json='{"name":"zhangsan","age":20}' ff.write(str(json+"\n")) if __name__=="__main__": print(str(time.time())) writeLog()查看kafka是否收到消息
$ kafka-console-consumer --bootstrap-server slave199:9092 --topic test1
5.bug修复
6.补充
- python2.7 io.open()的mode参数说明
关于open()的mode参数:
'r':读 ; 'w':写 ; 'a':追加
'r+' == r+w(可读可写,文件若不存在就报错(IOError))
'w+' == w+r(可读可写,文件若不存在就创建)
'a+' ==a+r(可追加可写,文件若不存在就创建)
如果是二进制文件,就都加一个b就好啦:
'rb' 'wb' 'ab' 'rb+' 'wb+' 'ab+'
- Flume启动命令说明
flume-ng agent --conf conf --conf-file conf/file.log --name agent1 -Dflume.root.logger=DEBUG, console
-c (--conf) : flume的conf文件路径
-f (--conf-file) : 自定义的flume配置文件
-n (--name): 自定义的flume配置文件中agent的name
- flume sink hdfs属性说明:
type HDFS
hdfs.path 必填,HDFS 目录路径 (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在目录下创建文件的名称前缀
hdfs.fileSuffix – 追加到文件的名称后缀 (eg .avro - 注: 日期时间不会自动添加)
hdfs.inUsePrefix – Flume正在处理的文件所加的前缀
hdfs.inUseSuffix .tmp Flume正在处理的文件所加的后缀
版权声明:本文为drinkatmoon原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。