flume自定义拦截器,可限制采集速度

public class SpeedInterceptor implements Interceptor{

private static final Logger logger = LoggerFactory.getLogger(SpeedInterceptor.class);
private static long KB = 1024L;
private long lastEventSentTick = System.nanoTime();
private long pastSentLength = 0L;
private long max;
//1秒钟=1000000000L纳秒
private long timeCostPerCheck = 1000000000L;
private boolean flag = true;
private int num = 0;

public SpeedInterceptor(long limitRate){
this.max = (limitRate * KB);
}

@Override
public void initialize() {
// no-operation
}

@Override
public Event intercept(Event event) {
this.num +=1 ;
if(this.pastSentLength > this.max) {
//获取纳秒
long nowTick = System.nanoTime();
//倍数
long multiple = this.pastSentLength / this.max;
//缺失时间=所需时间-实际时间
long missedTime = multiple * this.timeCostPerCheck - (nowTick - this.lastEventSentTick);
if(missedTime > 0L) {
try {
/*                    System.out.printf("Limit source send rate, headerLength:%d,pastSentLength:%d,lastEventSentTick:%d,sleepTime:%d, num:%d\n",
headerSize, pastSentLength, lastEventSentTick, missedTime / 1000000, num);*/
//                    logger.info("missedTime:="+missedTime);
Thread.sleep(missedTime / 1000000L,(int)(missedTime % 1000000L));
} catch(InterruptedException e){

//发生异常时抛出,flume会自行处理这个可以异常
throw new RuntimeException(e);
}
}
this.num = 0;
this.pastSentLength = 0L;
this.lastEventSentTick = (nowTick + (missedTime > 0L ? missedTime : 0L));
}
this.pastSentLength += event.getBody().length;
return event;
}

@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;

}

@Override
public void close() {
// no-operation
}


public static class Builder implements Interceptor.Builder {

private long limitRate;
@Override
public Interceptor build() {
return new SpeedInterceptor(this.limitRate);
}
@Override
public void configure(Context context) {
this.limitRate = context.getLong(Constants.LIMIT_RATE, Long.valueOf(Constants.DEFAULT_RATE));
//this.headerSize = context.getLong(Constants.HEADER_SIZE, Long.valueOf(Constants.DEFAULT_SIZE));
}

public static class Constants {
public static long DEFAULT_RATE = 500L;
//public static long DEFAULT_SIZE = 16L;
public static String LIMIT_RATE = "limitRate";
}
}
}


版权声明:本文为qq_40026968原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。