KafkaAppender 使用的问题

 

KafkaAppender 核心发送信息代码

    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
        final Layout<? extends Serializable> layout = getLayout();
        byte[] data;
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data);
    }

 Kafka在发送的时候会判断异步还是同步,异步的时候

public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;

            if(key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }

            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, new Callback() {
                    @Override
                    public void onCompletion(final RecordMetadata metadata, final Exception e) {            
                        if (e != null) {
                            //这里只是打印了下,外部没认为异常,怎么才能故障转移
                            LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                        }
                    }
                });
            }
        }
    }

 


版权声明:本文为caigaoqing11原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。