Flume 监控文件并将读取的数据存入 MySQL(自定义 MysqlSink)

本文需求是用flume 监控文件,然后将文件的内容存放在mysql数据库中。

本文结构

1.mysql 表设计

2. MysqlSink编写

3.conf 配置

4. 打包测试

1.mysql 表设计

首先说明本文 event 的格式:event 内容形如 "exec tail$i,yang",逗号前的 "exec tail$i" 为数据 content,逗号后的 "yang" 为 createBy,中间用 "," 隔开。

然后创建数据库和表,如图所示(数据库名字为 sinktest,表名为 mysqltest;表中包含 id、content、create_by 三个字段,其中 id 是自增的)。

0818b9ca8b590ca3270a3433284dd417.png

2. MysqlSink编写

2.1 pom.xml

首先新建maven 项目,pom.xml 文件如下:

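<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.us</groupId>
    <artifactId>flumeDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <version.flume>1.7.0</version.flume>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${version.flume}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>${version.flume}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.5</version>
        </dependency>
    </dependencies>

</project>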

2.2 Java Bean

对应我们的事件和数据库表,建立 Java Bean 对象 Info.java

package com.us.flume;

/**
 * Simple mutable data holder for one parsed event.
 *
 * <p>It carries two text values: {@code content} and {@code createBy}. Both start out as
 * {@code null} until a setter is called.
 *
 * <p>Created by yangyibo on 17/1/5.
 */
public class Info {

    /** Data part of the event. */
    private String content;

    /** Creator part of the event. */
    private String createBy;

    /**
     * @return the stored content, or {@code null} if none has been set
     */
    public String getContent() {
        return this.content;
    }

    /**
     * @return the stored creator, or {@code null} if none has been set
     */
    public String getCreateBy() {
        return this.createBy;
    }

    /**
     * @param content the content value to store (may be {@code null})
     */
    public void setContent(String content) {
        this.content = content;
    }

    /**
     * @param createBy the creator value to store (may be {@code null})
     */
    public void setCreateBy(String createBy) {
        this.createBy = createBy;
    }
}

2.3 自定义 Sink 编写

MysqlSink.java

package com.us.flume;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
 * Custom Flume sink that inserts events into a MySQL table.
 *
 * <p>Each event body is decoded as UTF-8 text and split at the first comma: the text before
 * the comma is written to the {@code content} column and the text after it to the
 * {@code create_by} column. A body without a comma is written entirely to {@code content}
 * and {@code null} is passed for {@code create_by}.
 *
 * <p>Configuration keys: {@code hostname}, {@code port}, {@code databaseName},
 * {@code tableName}, {@code user}, {@code password} (all required) and {@code batchSize}
 * (optional, default 100, must be positive).
 *
 * <p>Created by yangyibo on 17/1/5.
 */
public class MysqlSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);

    /** Driver class loaded explicitly before connecting. */
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";

    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

    /**
     * Reads the sink settings from the agent configuration.
     *
     * @param context the sink's configuration context
     * @throws NullPointerException if a required key is missing
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    @Override
    public void configure(Context context) {
        hostname = requiredString(context, "hostname");
        port = requiredString(context, "port");
        databaseName = requiredString(context, "databaseName");
        tableName = requiredString(context, "tableName");
        user = requiredString(context, "user");
        password = requiredString(context, "password");
        batchSize = context.getInteger("batchSize", 100);
        // checkArgument, not checkNotNull: a boxed boolean is never null, so the
        // previous check could not reject zero or negative batch sizes.
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    /** Returns the value configured for {@code key}, failing if it is absent. */
    private static String requiredString(Context context, String key) {
        return Preconditions.checkNotNull(context.getString(key), key + " must be set!!");
    }

    /**
     * Opens the JDBC connection, disables auto-commit and prepares the insert statement.
     *
     * @throws IllegalStateException if the connection or statement cannot be created
     */
    @Override
    public void start() {
        try {
            Class.forName(JDBC_DRIVER);
        } catch (ClassNotFoundException e) {
            // Not fatal by itself: DriverManager may still find a registered driver.
            LOG.warn("JDBC driver class " + JDBC_DRIVER + " could not be loaded", e);
        }

        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
        try {
            conn = DriverManager.getConnection(url, user, password);
            // Commits are issued explicitly once per batch in process().
            conn.setAutoCommit(false);
            // tableName comes from the agent configuration; identifiers cannot be bound
            // as '?' parameters, so it is concatenated here.
            preparedStatement = conn.prepareStatement(
                    "insert into " + tableName + " (content,create_by) values (?,?)");
        } catch (SQLException e) {
            closeResources();
            // Fail this sink with an exception instead of System.exit(1), which would
            // terminate the whole agent process.
            throw new IllegalStateException("Unable to open MySQL connection to " + url, e);
        }
        // Mark the sink as started only after the database resources are ready.
        super.start();
    }

    /** Stops the sink and releases the statement and connection. */
    @Override
    public void stop() {
        super.stop();
        closeResources();
    }

    /** Closes the prepared statement and the connection, logging (not throwing) failures. */
    private void closeResources() {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close prepared statement", e);
            }
            preparedStatement = null;
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close MySQL connection", e);
            }
            conn = null;
        }
    }

    /**
     * Takes up to {@code batchSize} events from the channel and inserts them as one JDBC batch.
     *
     * @return {@code BACKOFF} if the channel ran empty before the batch was full,
     *         otherwise {@code READY}
     * @throws EventDeliveryException if taking, inserting or committing fails; both the
     *         database transaction and the channel transaction are rolled back first
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<Info> infos = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty: flush what we have and ask the runner to back off.
                    result = Status.BACKOFF;
                    break;
                }
                infos.add(parse(event));
            }
            if (!infos.isEmpty()) {
                writeBatch(infos);
            }
            // Commit the channel only after the database commit succeeded.
            transaction.commit();
        } catch (Exception e) {
            rollbackDatabase();
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been "
                        + "successful.", e2);
            }
            LOG.error("Failed to commit transaction. "
                    + "Transaction rolled back.", e);
            throw new EventDeliveryException(
                    "Failed to write events to MySQL table " + tableName, e);
        } finally {
            transaction.close();
        }
        return result;
    }

    /** Converts an event body of the form {@code content,createBy} into an {@link Info}. */
    private static Info parse(Event event) {
        // Decode with an explicit charset so the result does not depend on the platform default.
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Info info = new Info();
        int comma = body.indexOf(',');
        if (comma >= 0) {
            info.setContent(body.substring(0, comma));
            // +1 skips the comma itself.
            info.setCreateBy(body.substring(comma + 1));
        } else {
            info.setContent(body);
        }
        return info;
    }

    /** Inserts all rows as a single JDBC batch and commits the database transaction. */
    private void writeBatch(List<Info> infos) throws SQLException {
        preparedStatement.clearBatch();
        for (Info info : infos) {
            preparedStatement.setString(1, info.getContent());
            preparedStatement.setString(2, info.getCreateBy());
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        conn.commit();
    }

    /** Discards any uncommitted inserts so they cannot leak into a later commit. */
    private void rollbackDatabase() {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (SQLException e) {
            LOG.error("Failed to roll back MySQL transaction", e);
        }
    }
}

3.conf 配置

在flume的conf 文件夹下新建配置文件 mysqlSink.conf 内容如下:

注意:localhost 为mysql 数据库所在的服务器IP;

/opt/apps/logs/tail.log 为我要监控的文件;

com.us.flume.MysqlSink 是本文第2步中自定义 sink(MysqlSink)的全限定类名。

# Name the source, sink and channel of agent1
agent1.sources = source1
agent1.sinks = mysqlSink
agent1.channels = channel1

# Describe/configure source1: follow the monitored log file
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /opt/apps/logs/tail.log
agent1.sources.source1.channels = channel1

# Describe mysqlSink: the custom sink class and its MySQL connection settings
agent1.sinks.mysqlSink.type = com.us.flume.MysqlSink
agent1.sinks.mysqlSink.hostname = localhost
agent1.sinks.mysqlSink.port = 3306
agent1.sinks.mysqlSink.databaseName = sinktest
agent1.sinks.mysqlSink.tableName = mysqltest
agent1.sinks.mysqlSink.user = root
agent1.sinks.mysqlSink.password = xxxxxx
# Events taken per sink transaction (the sink's default is 100);
# must not exceed the channel's transactionCapacity below
agent1.sinks.mysqlSink.batchSize = 100
agent1.sinks.mysqlSink.channel = channel1

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
# Key was misspelled "transactionCapactiy", so the setting was silently ignored
agent1.channels.channel1.transactionCapacity = 100

4. 打包测试

4.1 打包

将步骤2 的项目打成 jar 包,放到 flume 的 lib 文件夹下,并将 mysql-connector-java 的 jar 包也放入 flume 的 lib 文件夹下。

注意:(使用 IDEA 打包,在 Create JAR from Modules 时,Main Class 空着不用填写,因为我们没有可运行的主类)

4.2 启动测试

在 flume 的 bin 目录下执行启动命令

./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/mysqlSink.conf -n agent1 -Dflume.root.logger=INFO,console

打开一个新终端,向我们监控的文件发送我们定义的事件。

for i in {1..10};do echo "exec tail$i , yang" >> /opt/apps/logs/tail.log;done;

此时打开数据库,可以看到我们的事件已经插入到数据库了。

(由于测试多次,所以id 不是从零开始了。)

0818b9ca8b590ca3270a3433284dd417.png


版权声明:本文为weixin_29499357原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。