flume 自定义Interceptor 按照日志类型的不同,将日志分类打印

1 实验环境

三台Hadoop集群(102 103 104) 版本:3.1.3
flume版本:1.9.0

2 需求
Hadoop102:通过netcat 模拟日志数据,根据自定义的拦截器进行分类,分别打印在Hadoop103 和 Hadoop104 的控制台

3 自定义拦截器编码

3.1 创建maven工程添加依赖

<dependency>
 <groupId>org.apache.flume</groupId>
 <artifactId>flume-ng-core</artifactId>
 <version>1.9.0</version>
</dependency>

3.2 代码实现

package com.nuo.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author nuonuo
 * @create 2022-05-25 11:00
 */
public class CustomInterceptor implements Interceptor {

    //声明一个集合,用于存放拦截器处理后的事件
    private List<Event> addHeaderEvents;

    //初始化
    @Override
    public void initialize() {
        //初始化一个集合,用于存放拦截器处理后的事件
        addHeaderEvents = new ArrayList<>();

    }

    //单个事件处理方法
    @Override
    public Event intercept(Event event) {

        //1 获取header&body
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());

        //2 根据是否包含字符串“nuo” 添加不同的头信息
        if (body.contains("nuo")){
            headers.put("type","nuo");
        }
        else {
            headers.put("type","other");
        }

        //3 返回数据
        return event;
    }

    //批量
    @Override
    public List<Event> intercept(List<Event> events) {

        //1 清空集合
        addHeaderEvents.clear();

        //2 遍历events
        for (Event event : events) {
            addHeaderEvents.add(intercept(event));
        }

        //3 返回数据
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

3.3 打包并将其放在Hadoop102 flume/lib 路径下
当启动Hadoop102的flume进程的时候 系统会自动根据配置文件中的全类名类lib目录下寻找对应的jar包
(无需分发到其他两台节点,因为其他两台节点只是用来打印日志)

4 编辑配置文件

在Hadoop102 flume/job/group4下创建flume1 flume2 flume3

Hadoop102对应flume1

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.nuo.flume.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.nuo = c1
a1.sources.r1.selector.mapping.other = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

注意:配置文件中的拦截器的类型需要根据自己写的拦截器的全类名进行修改替换

Hadoop103对应flume2

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

Hadoop103对应flume3

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

flume2和flume3 的port和bind根据flume1中的定义进行修改即可

把group4分发给其他两个节点

4 分别启动flume2 flume 3 flume1 进程(先群起Hadoop集群)
Hadoop103 和 Hadoop104 需要打印在控制台

[nuo@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume2.conf -Dflume.root.logger=INFO,console

[nuo@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume3.conf -Dflume.root.logger=INFO,console

[nuo@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume1.conf

5 在Hadoop102 启动nc localhost 44444 输入相关 内容进行测试

[nuo@hadoop102 flume]$ nc localhost 44444
hello
OK
nuonuo is hadsome
OK
nu shafd
OK
sdsd
OK
sd
OK
sd
OK
nuo
OK

Hadoop104 控制台

2022-05-25 11:32:17,656 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=other} body: 68 65 6C 6C 6F                                  hello }



2022-05-25 11:48:03,169 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=other} body: 6E 75 20 73 68 61 66 64                         nu shafd }
2022-05-25 11:48:03,169 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=other} body: 73 64 73 64                                     sdsd }
2022-05-25 11:48:03,170 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=other} body: 73 64                                           sd }
2022-05-25 11:48:03,170 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=other} body: 73 64                                           sd }

Hadoop103控制台

2022-05-25 11:32:43,122 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=nuo} body: 6E 75 6F 6E 75 6F 20 69 73 20 68 61 64 73 6F 6D nuonuo is hadsom }
2022-05-25 11:48:09,605 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{type=nuo} body: 6E 75 6F                                        nuo }

学的时候踩的坑:代码里的头信息type必须和配置文件里的对应 否则无法打印相关信息


版权声明:本文为xiaonuoyo原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。