1.client的作用:
针对代码执行的过程出现错误日志,打印的logger.error()里的日志进行统一拦截,按照一定的规则封装成数据发送到服务端;
2.原理介绍
利用log4j2提供的扩张功能,主要是扩展log4j2的相关扩展点
@Plugin(name = "LogTracker", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
public class LogTrackerAppender extends AbstractAppender {
public static final int CCT_HOURS = 8;
public final String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
protected final Boolean enabled;
protected final String project;
protected final String senderClient;
protected final String sendUrl;
protected final String hostIp;
protected final Integer retries;
protected Integer totalSizeInBytes;
protected Integer maxBlockMs;
protected Integer ioThreadCount;
protected Integer batchSizeThresholdInBytes;
protected Integer batchCountThreshold;
protected Integer lingerMs;
protected Integer baseRetryBackoffMs;
protected Integer maxRetryBackoffMs;
protected LogSender sender;
private final ProducerConfig producerConfig = new ProducerConfig();
protected LogTrackerAppender(String name, Filter filter, Layout<? extends Serializable> layout,
String project,
String hostIp,
Integer retries,
String sendUrl,
String senderClient,
Integer totalSizeInBytes,
Integer maxBlockMs,
Integer ioThreadCount,
Integer batchSizeThresholdInBytes,
Integer batchCountThreshold,
Integer lingerMs,
Integer baseRetryBackoffMs,
Integer maxRetryBackoffMs,
Boolean enabled) {
super(name, filter, layout);
this.project = project;
this.hostIp = hostIp;
this.retries = retries;
this.sendUrl = sendUrl;
this.senderClient = senderClient;
this.totalSizeInBytes = totalSizeInBytes;
this.maxBlockMs = maxBlockMs;
this.ioThreadCount = ioThreadCount;
this.batchSizeThresholdInBytes = batchSizeThresholdInBytes;
this.batchCountThreshold = batchCountThreshold;
this.lingerMs = lingerMs;
this.baseRetryBackoffMs = baseRetryBackoffMs;
this.maxRetryBackoffMs = maxRetryBackoffMs;
this.enabled = enabled;
}
@Override
public void append(LogEvent event) {
if (enabled) {
// 获取错误信息
String throwableStr = event.getThrown() == null ? event.getMessage().getFormat() : getThrowableStr(event.getThrown());
// 获取traceId
String traceId = TraceContext.traceId();
long timeMillis = event.getTimeMillis();
LocalDateTime localDateTime = Instant.ofEpochMilli(timeMillis).atZone(ZoneOffset.ofHours(CCT_HOURS)).toLocalDateTime();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN);
String time = localDateTime.format(formatter);
LogTrackerRecord trackerRecord = new LogTrackerRecord();
trackerRecord.setLogLevel(event.getLevel().toString());
trackerRecord.setProject(project);
trackerRecord.setHostIp(hostIp);
trackerRecord.setTraceId(traceId);
trackerRecord.setClassName(event.getLoggerName());
if (event.getSource() != null) {
trackerRecord.setResourceName(event.getSource().toString());
}
trackerRecord.setMessage(throwableStr);
trackerRecord.setThreadName(event.getThreadName());
trackerRecord.setTime(time);
String s = JSONObject.toJSONString(trackerRecord, SerializerFeature.PrettyFormat);
LOGGER.error("succ trackerRecord:{}", s);
try {
sender.send(project, Collections.singletonList(trackerRecord));
} catch (Exception e) {
this.error(
"Failed to send log, project=" + project
+ ", logRecordList=" + Collections.singletonList(trackerRecord), e);
}
}
}
@Override
public void start() {
super.start();
if (batchCountThreshold != null && batchCountThreshold > 0 && batchCountThreshold <= ProducerConfig.MAX_BATCH_COUNT) {
producerConfig.setBatchCountThreshold(batchCountThreshold);
}
if (batchSizeThresholdInBytes != null && batchSizeThresholdInBytes > 0 && batchSizeThresholdInBytes <= ProducerConfig.MAX_BATCH_SIZE_IN_BYTES) {
producerConfig.setBatchSizeThresholdInBytes(batchSizeThresholdInBytes);
}
if (ioThreadCount != null && ioThreadCount > 0) {
producerConfig.setIoThreadCount(ioThreadCount);
}
if (baseRetryBackoffMs != null && baseRetryBackoffMs > 0) {
producerConfig.setBaseRetryBackoffMs(baseRetryBackoffMs);
}
if (retries != null) {
producerConfig.setRetries(retries);
}
if (lingerMs != null && lingerMs >= ProducerConfig.LINGER_MS_LOWER_LIMIT) {
producerConfig.setLingerMs(lingerMs);
}
if (maxBlockMs != null) {
producerConfig.setMaxBlockMs(maxBlockMs);
}
if (maxRetryBackoffMs != null && maxRetryBackoffMs > 0) {
producerConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
}
if (totalSizeInBytes != null && totalSizeInBytes > 0) {
producerConfig.setTotalSizeInBytes(totalSizeInBytes);
}
sender = new LogSender(producerConfig);
sender.putProjectConfig(project, senderClient, sendUrl);
}
@Override
public void stop() {
super.stop();
if (sender != null) {
try {
sender.close();
} catch (Exception e) {
this.error("Failed to close LoghubAppender.", e);
}
}
}
/**
* 获取异常信息
*
* @param throwable Throwable对象
* @return 异常信息
*/
private String getThrowableStr(Throwable throwable) {
if (throwable == null) {
return null;
}
StringBuilder sb = new StringBuilder();
boolean isFirst = true;
for (String s : Throwables.toStringList(throwable)) {
if (isFirst) {
isFirst = false;
} else {
sb.append(System.getProperty("line.separator"));
}
sb.append(s);
}
return sb.toString();
}
@PluginBuilderFactory
public static <B extends LogTrackerAppenderBuilder<B>> B newBuilder() {
return new LogTrackerAppenderBuilder<B>().asBuilder();
}
}
- 当收集到error log会触发 LogTrackerAppender
- 通过 LogSender 发送数据,数据会被加入到 LogAccumulator 中的某个 ProducerBatch 里
在调用 send 方法过程中,如果发现目标 ProducerBatch 包含的日志条数到达了 maxBatchCount 或该 ProducerBatch 剩余的空间无法容纳当前数据,则会首先将该 ProducerBatch 投递到 IOThreadPool 里,然后再新建一个 ProducerBatch 存放当前数据
Mover 会遍历 LogAccumulator 中的每个 ProducerBatch,把超过了缓存时间的 batch 加入 expiredBatches 里。同时会记录未过期 batch 的最近超时时间,记为 t
将从 LogAccumulator 中获取的 expiredBatches 投递到 IOThreadPool 里
获取 RetryQueue 中所有满足发送条件的 ProducerBatch,如果当前没有 batch 满足发送条件则最多等待时间 t
将从 RetryQueue 中获取的 expiredBatches 投递到 IOThreadPool 里。(Mover 完成步骤 7 后会再次进入步骤 4)
IOThreadPool 中的工作线程从阻塞队列里获取 ProducerBatch,然后发送给目标 LogTracker Server
如果数据发送失败,且满足下列条件,会计算当前 ProducerBatch 的下次计划发送时间,然后将其放入将该 ProducerBatch 写入重试队列。
该错误可以重试。
RetryQueue 没有关闭。
LogAccumulator
为了提高吞吐量,一个常见的做法是将若干个小包合并成大包批量发送, LogAccumulator 的主要作用便是合并待发送的数据。LogAccumulator 会根据project 属性将其缓存到内部 map 的不同位置。这个 map 的 key 为上述project属性(后序可能会扩展),value 为 ProducerBatch。为了保证线程安全同时支持高并发,这里选用 ConcurrentMap 作为 map 的实现。
- LogAccumulator 的另一个作用是控制缓存数据的总大小,这里选用 Semaphore 实现控制逻辑。Semaphore 是基于 AQS 实现的高性能同步工具,它会首先尝试通过自旋的方式获取共享资源,减少线程上下文切换的开销。
RetryQueue
RetryQueue 用于存放发送失败待重试的 ProducerBatch,每个 batch 有一个字段用于标识下次计划发送时间。为了高效地获取超时 batch,内部选用 DelayQueue 存放这些 batch。DelayQueue 是一种按时间排序的优先队列,最先超时的 batch 会被优先取出,同时它也是线程安全的。
Mover
Mover 是一个独立的线程,它会循环地将 LogAccumulator 和 RetryQueue 中的超时 batch 投递到 IOThreadPool 里。为了避免空转占用宝贵的 CPU 资源,当 Mover 发现 LogAccumulator 和 RetryQueue 里没有满足发送条件的 batch 时,会在 RetryQueue 的 expiredBatches 方法上等待用户配置的数据最长缓存时间 lingerMs。
IOThreadPool
IOThreadPool 中的工作线程用于真正执行数据发送任务,该线程池的大小可通过参数 ioThreadCount 指定,默认为可用处理器个数乘以 2。
SendProducerBatchTask
SendProducerBatchTask 封装了 batch 发送逻辑。为了避免阻塞 IO 线程,不论当前 batch 发送成功与否都会将其投递到队列中交由独立线程去执行回调。另外,如果某个发送失败的 batch 满足重试条件,不会在当前 IO 线程中立即重试(立即重试通常也会失败),而是根据指数退避策略将其投递到 RetryQueue 中。
BatchHandler
Producer 会启动一个 SuccessBatchHandler 和一个 FailureBatchHandler 分别用来处理发送成功或失败的 batch。Handler 在执行完 batch 的 callback、设置好 batch 的 future 后便会“释放”该 batch 占用的内存空间,供新的数据使用。分开处理的原因是为了隔离发送成功和发送失败的 batch,保持 producer 整体的流动性。