https://github.com/logstash/logstash-logback-encoder/tree/logstash-logback-encoder-4.7#async
TCP Appenders
To output JSON for LoggingEvents over TCP, use a LogstashTcpSocketAppender with a LogstashEncoder or LoggingEventCompositeJsonEncoder.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:4560</destination>
<!-- encoder is required -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<root level="DEBUG">
<appender-ref ref="stash" />
</root>
</configuration>
Unlike the UDP appender, an encoder must be configured for the TCP appenders. You can use a Logstash*Encoder, *EventCompositeJsonEncoder, or any other logback encoder. All of the output formatting options are configured at the encoder level.
Internally, the TCP appenders are asynchronous (using the LMAX Disruptor RingBuffer). All the encoding and TCP communication is delegated to a single writer thread. There is no need to wrap the TCP appenders with another asynchronous appender (such as AsyncAppender or LoggingEventAsyncDisruptorAppender).
All the configuration parameters (except for sub-appender) of the async appenders are valid for TCP appenders. For example, waitStrategyType and ringBufferSize.
The TCP appenders will never block the logging thread. If the RingBuffer is full (e.g. due to slow network, etc), then events will be dropped.
并且配置ringBufferSize 的值必须是2的正n幂
Must be a positive power of 2.
LogstashTcpSocketAppender 与 AsyncDisruptorAppender 的继承关系如下
LogstashTcpSocketAppender -> AbstractLogstashTcpSocketAppender -> AsyncDisruptorAppender
AsyncDisruptorAppender 代码如下
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends UnsynchronizedAppenderBase<Event> {
public static final int DEFAULT_RING_BUFFER_SIZE = 8192;
public static final WaitStrategy DEFAULT_WAIT_STRATEGY = new BlockingWaitStrategy();
/**
* The size of the {@link RingBuffer}.
* Defaults to {@value #DEFAULT_RING_BUFFER_SIZE}.
* If the handler thread is not as fast as the producing threads,
* then the {@link RingBuffer} will eventually fill up,
* at which point events will be dropped.
* <p>
* Must be a positive power of 2.
*/
private int ringBufferSize = DEFAULT_RING_BUFFER_SIZE;
/**
* The {@link ProducerType} to use to configure the disruptor.
* By default this is {@link ProducerType#MULTI}.
* Only set to {@link ProducerType#SINGLE} if only one thread
* will ever be appending to this appender.
*/
private ProducerType producerType = DEFAULT_PRODUCER_TYPE;
/**
* The {@link WaitStrategy} to used by the RingBuffer
* when pulling events to be processed by {@link #eventHandler}.
* <p>
* By default, a {@link BlockingWaitStrategy} is used, which is the most
* CPU conservative, but results in a higher latency.
* If you need lower latency (at the cost of higher CPU usage),
* consider using a {@link SleepingWaitStrategy} or a {@link PhasedBackoffWaitStrategy}.
*/
private WaitStrategy waitStrategy = DEFAULT_WAIT_STRATEGY;
logback.xml 配置如下
<springProfile name="local">
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:51401</destination>
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
<ringBufferSize>65536</ringBufferSize>
</appender>
<root level="DEBUG">
<appender-ref ref="ERROR"/>
<appender-ref ref="ROLLING_FILE"/>
<appender-ref ref="LOGSTASH"/>
</root>
</springProfile>
最终ringBufferSize会被设置到一个int数组的size
public final class MultiProducerSequencer extends AbstractSequencer {
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE;
private static final long SCALE;
private final Sequence gatingSequenceCache = new Sequence(-1L);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;
public MultiProducerSequencer(int bufferSize, WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
this.availableBuffer = new int[bufferSize];
this.indexMask = bufferSize - 1;
this.indexShift = Util.log2(bufferSize);
this.initialiseAvailableBuffer();
}
...
}