flume拦截mysql_Flume 读取日志文件数据存入到 Mysql

需求是用 flume 监控日志文件,然后将文件的内容存放在 MySQL 数据库中。

本文结构(注意:环境都是在 CentOS 7(192.168.5.105)下)

flume1.7.0 的安装和配置

MySQL 表设计

MysqlSink 插件的编写

连接服务器远程调试

打包更新到服务器

flume1.7.0 的安装和配置

官网下载 flume1.7.0,我这里都放到自己的共享盘里面,方便自己也方面别人:

上传到 CentOS 7 指定目录:

解压文件:

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /home/common/

cd ../common/

mv apache-flume-1.7.0-bin/ flume-1.7.0

cd flume-1.7.0/conf

cp flume-conf.properties.template flume-conf.properties

vi flume-conf.properties

配置如下:

#配置 Agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#配置 Source

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/data/XXX/logback.log

a1.sources.r1.deserializer.outputCharset = UTF-8

#配置 Sink

a1.sinks.k1.type = XXX.XXX.logs.sink.XXXXX

a1.sinks.k1.hostname=192.168.5.111

a1.sinks.k1.port=3306

a1.sinks.k1.databaseName=database

a1.sinks.k1.tableName=tables

a1.sinks.k1.user=root

a1.sinks.k1.password=password

a1.sinks.k1.channel = c1

#配置 Channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

注意:记得配置 flume 运行 Java 的环境

cp flume-env.sh.template flume-env.sh

vi flume-env.sh 修改JAVA_HOME路径(这里写你自己的Java 环境)

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64

Java 方面的编写和 MySQL 表的创建

注意下这个 jar 包放置的点,我是用下面的方式:

官方建议在 flume 的 plugins.d (plugins.d 目录需要自己创建)目录下创建 一个自己定义的目录,在自定义的目录下新建 lib 和 libext 文件夹,lib 文件夹为放自定义组件的 jar 包,libext 文件夹下放 自定义组件的依赖包。

​ flume-1.7.0/plugins.d/

flume-1.7.0/plugins.d/project/

flume-1.7.0/plugins.d/project/lib/XXXXX.jar

flume-1.7.0/plugins.d/project/libext/mysql-connector-java-6.0.5.jar

坑点 1:WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+

​DriverManager.getConnection("jdbc:mysql://localhost:3306/logdb","root","123456");

​改成

​DriverManager.getConnection("jdbc:mysql://localhost:3306/logdb?useSSL=false","root","123456");

最后,启动 flume

./flume-ng agent -c /home/common/flume-1.7.0/conf -f /home/common/flume-1.7.0/conf/flume-conf.properties -n agnet1 -Dflume.root.logger=INFO,console

使用过程中大问题总汇:

flume 的 memeryChannel 中 transactionCapacity 和 sink 的 batchsize

运行时候报下面错误:

16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.

org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count

at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)

at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)

at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)

at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)

at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

at java.lang.Thread.run(Thread.java:745)

参考的解决方案:

原因是:

flume 的实时日志收集,用 flume 默认的配置后,发现不是完全实时的,于是看了一下,原来是 memeryChannel 的 transactionCapacity 在作怪,因为他默认是 100,也就是说收集端的 sink 会在收集到了 100 条以后再去提交事务(即发送到下一个目的地)。

flume 收集 Java exception 错误日志的问题

flume 在收集到java throw Exception 异常日志信息的时候既然不是整条ERROR 异常错误,如下:

它是将上面的异常每条都传输到后台处理,像这种应该合并成一个Event里面

参考的解决方案:

我采用的是自定义 Source 的方式,可以将错误的信息合并后传输:

其核心思想是:

如果当前有两行记录,如果两行记录都是“正常”的日志信息,比如:

2016-07-08 09:33:53-[Analysis-WC] INFO [main] FormulaContextUtil.init(68) | into FormulaContextUtil.init method begin ....

2016-07-08 09:33:53-[Analysis-WC] INFO [main] FormulaContextUtil.init(133) | FormulaContextUtil.init method end ....

则单独将每一行当做一个 Event,传递;

当连续的两条记录诸如这样的形式:

2016-07-28 15:49:05-[Analysis-WC] ERROR [http-8080-2] ControllerProxy.afterMethod(43) | java.lang.NullPointerException

java.lang.NullPointerException

at com.ap.alt.system.web.LoginMgrController.getSumMXYJTC(LoginMgrController.java:304)

则将下一行日志合并到上一个 Event 中。


版权声明:本文为weixin_39682897原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。