10 Kafka开荒

一. Kafka开荒

1.1 什么是消息队列

  1. 点对点模式
    在这里插入图片描述
    一对一,消费者主动拉取数据,消息收到后消息清除
通常是一个 基于拉取或者查询的消息传送模型,从队列中请求信息,而不是将消息推送到客户端。
只能有一个接收者 , 消息收到后Kafka清除数据
  1. 发布订阅模式
    在这里插入图片描述
    一对多,数据产生后,推送给所有订阅者
基于 推送的笑死传送模型,发布订阅模型可有多种不同的订阅者
订阅者又分为:
	临时订阅者 , 主动监听才能收到消息
	持久订阅者 , 监听主题的所有消息,即使订阅者离线

1.2 消息队列优点

  1. 解耦
允许独立扩展 修改两边的处理过程,只要确保他们遵循同样的接口约束
不影响两方Server的运行
  1. 冗余
把数据持久化直到他们完全被处理, 规避了数据丢失的风险
很多消息队列采用 “插入-获取-删除”范式,
在把一个消息从队列中删除之前,需要确定
  1. 扩展性
消息队列解耦了处理过程
so 增大消息入队 & 处理的频率是很容易的,只要另外增加处理即可
  1. 灵活性 & 峰值处理能力
访问量剧增,应用仍然需要继续发挥作用
消息队列可以让关键组件顶住突发的访问压力
  1. 可恢复性
降低了进程间的耦合度,
所以即使一个处理消息的进程挂掉,加入队列的消息仍然可以在系统恢复后被处理
  1. 顺序保证
大部分消息队列是排序的
Kafka保证一个Partition内的消息的有序性
  1. 缓冲
解决高峰速度,生产和消费速度不一致的情况。
类似于三峡 削峰
  1. 异步通信
允许生产者 先生产 先放到队列中 , 消费者可以不立即处理 , 需要处理的时候再去处理
当然能存多少,受Kafka集群规模影响,虽然也有把Kafka当作数仓的案例

1.3 Kafka简介

kafka.apache.org

在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。

  1. 开源消息队列,Scala编写,Apache托管
  2. Kafka最初由LinkedIn开发,在2011年开源,2012年毕业
  3. Kafka为处理实时数据提供一个统一,高质量,低等待的平台
  4. 分布式消息队列。Kafka对消息保存时对Topic分类,Producer Consumer
  5. Kafka集群有多个kafka实例,每个实例(server)称为broker
  6. 无论是Kafka集群,还是consumer都依赖于ZK集群保存一些meta信息,保证系统可用性

1.4 Kafka架构设计

Kafka整体架构图

在这里插入图片描述
Kafka详细架构图

在这里插入图片描述

  1. Producer : 向Kafka broker发送消息 的客户端
  2. Consumer : 从Kafka broker 获取消息的客户端
  3. topic : 可以理解成为一个队列
  4. Consumer Group(CG) : Kafka用来实现一个topic消息的广播(发给所有的Consumer) & 单播的手段。
一个Topic可以有多个CG,Topic的消息会复制(概念上的复制)到所有CG
但是每个partition只会把消息发给CG中的一个Consumer。
	如果需要广播,只要每个consumer都有一个独立的CG即可
	如果需要单播,所有consumer在同一个CG

用CG还可以将consumer进行自由的分组
  1. Broker : 一台Kafka服务器就是一个broker,一个broker 可以容纳多个topic
  2. Partition : 一个有序的队列
为了实现扩展性,一个非常大的topic可以分布到多个broker上,  1 topic : n partition
每个partition是一个有序队列。
Partition中每条消息都会被分配一个有序ID (offset)
Kafka只保证按一个partition中的ID排序发给consumer ,不保证一个topic整体
  1. Offset
用偏移量来命名,方便查找
例如想找2049的位置,只要找到2048.kafka的文件即可
当然the first offset就是00000000000.kafka。

二. 环境部署

2.1 解压安装包

  1. 解压
[ifeng@ifeng software]$ tar -zxvf kafka_2.12-2.2.1.tgz -C ~/app
  1. 创建软连接
[ifeng@ifeng app]$ ln -s kafka_2.12-2.2.1/ kafka
  1. 创建logs
mkdir kafka/logs
  1. 修改配置文件
cd config
[ifeng@ifeng config]$ vi server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径	
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

  1. 配置环境变量
vi etc/profile
export KAFKA_HOME=/home/ifeng/app/kafka


export PATH=${KAFKA_HOME}/bin:${MAVEN_HOME}/bin:/home/ifeng/app/zookeeper/bin:
  1. 分发安装包
 xsync kafka/
注意:分发之后记得配置其他机器的环境变量
  1. 修改broker.id
分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
	注:broker.id不得重复
  1. 启动集群
    依次在hadoop102、hadoop103、hadoop104节点上启动kafka
bin/kafka-server-start.sh config/server.properties &
  1. 关闭集群
bin/kafka-server-stop.sh stop

2.2 Kafka 命令行操作

  1. 查看当前服务器中所有的topic
bin/kafka-topics.sh --zookeeper ifeng:2181 --list
  1. 创建topic
bin/kafka-topics.sh --zookeeper ifeng:2181 \
--create --replication-factor 1 \
--partitions 1 \
--topic first
  • topic 定义topic的名称
  • replication-factor 定义副本数、
  • partition 定义分区数

在这里插入图片描述

一个broker只能有一个分区,但是可以在一台机器上启动多个broker

# 两个broker 的 broker.id 要不同
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
  1. 发送消息
bin/kafka-console-producer.sh \
--broker-list ifeng:9092 --topic first
>hello world

  1. 消费消息
bin/kafka-console-consumer.sh \
--bootstrap-server ifeng:2181 \
--from-beginning --topic first
  1. 删除topic
bin/kafka-topics.sh --zookeeper ifeng:2181 \
--delete --topic first

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

  1. 查看某个Topic的详情
bin/kafka-topics.sh --zookeeper ifeng:2181 \
--describe --topic first

三. Kafka工作流程分析

在这里插入图片描述

3.1.1 写入方式

producer采用推模式 将消息发布到broker , 每条消息都别追加(append)到分区(partition)中,属于顺序写磁盘,顺序写入磁盘比随机写入内存 效率要高,保证kafka高吞吐

3.1.2 分区(partition)

消息都被发送到一个topic , 本质就是一个目录
topic就是由一些Partition Log(分区日志)组成
在这里插入图片描述
在这里插入图片描述
每个Partition中的消息都是有序的,生产的消息被不断追加到 Partition Log上,其中每一个消息都被赋予一个唯一的offset

1 分区的原因

  • 方便在集群中扩展,每个Partition可以通过调整来适应它所在的机器,一个topic 又可以由多个partition组成,任意大小的数据都可以
  • 提高并发,以partition为单位进行读写

2 分区的原则

  • 指定了partition,直接使用
  • 未指定partition 但指定了key,通过对key的value进行一个hash出一个partition
  • partition和key都未指定,轮询选出一个patition
DefaultPartitioner类
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

3.1.3 副本Replication

同一个partition可能会有多个replication (server.properties配置中的default.replication.factor=N)
没有replication的请跨下,一旦一个broker宕机,其上所有的partition的数据都消失,producer也不能再将数据存到其他patition。
多个replication需要选举出一个leader
producer只能于这个leader进行交互,其他replication只能作为follower从leader中copy

3.1.4 写入数据

在这里插入图片描述

  1. Producer 从ZK的 /brokers/…/state 节点找到该partition的leader
  2. Producer 将消息发送给leader
  3. leader 将 消息写入到log
  4. follower 从leader pull 消息,写入本地log后向leader发送ACK
  5. leader 收到所有ISR中的ACK后,增加HW(high watermark , 最后commit的offset)并向producer发送ACK

3.2 Broker 保存消息

3.2.1 存储方式

物理上把topic 分成一个 or 多个 partition (对应server.properties中的num.partitions=3配置)
每个patition物理上对应一个文件夹(该文件夹存储该partition的所有消息&索引文件)

[ifeng@hadoop102 logs]$ ll
drwxrwxr-x. 2 ifeng ifeng  4096 86 14:37 first-0
drwxrwxr-x. 2 ifeng ifeng  4096 86 14:35 first-1
drwxrwxr-x. 2 ifeng ifeng  4096 86 14:37 first-2
[ifeng@hadoop102 logs]$ cd first-0
[ifeng@hadoop102 first-0]$ ll
-rw-rw-r--. 1 ifeng ifeng 10485760 86 14:33 00000000000000000000.index
-rw-rw-r--. 1 ifeng ifeng      219 86 15:07 00000000000000000000.log
-rw-rw-r--. 1 ifeng ifeng 10485756 86 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 ifeng ifeng        8 86 14:37 leader-epoch-checkpoint

3.2.2 存储策略

无论消息是否被消费,kafka都会保留所有消息,有两种策列可以删除旧数据:

  • 基于时间:log.retention.hours = 168
  • 基于大小:log.retention.bytes = 10737418224

Action:因为Kafka读取特定消息的时间复杂度为O(1) , 即于文件大小无关,所以删除过期文件于提高kafka性能无关

3.3.3 Zookeeper存储结构

在这里插入图片描述

Producer不在ZK 中注册 , 消费者在ZK中注册

3.3 Kafka消费过程分析

kafka 提供了两套consumer API:高级Consumer API & 低级 API

3.3.1 高级API

1 高级API的优点

  • 写法简单
  • 不需要自行管理offset,通过ZK自动管理
  • 不需要进行分区管理,副本不需要管理
  • Consumer断线会自动根据上一次记录在ZK中的offset去接着获取数据(默认一分钟更新一次ZK中的offset)
  • 可以使用group来区分对同一个topic的不同程序访问分离开来(不同group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)

2 高级API的缺点

  • 不能自行空值offset(对于某些特殊需求)
  • 不能细化空值如分区、副本、ZK

3.3.2 低级API

1 低级API的优点

  • 自行空值offset
  • 自行控制链接分区,对分区自定义进行负载均衡
  • 对ZK的依赖程度降低(offset不一定要靠zk存储,自行存储也可以 比如在文件或内存中)

2 低级API的缺点

太过复杂,需要自定控制offset , 链接哪个分区 , 找到分区leader等

3.3.3 消费者组

在这里插入图片描述
N个consumer组成 consumer group

同一时间只能由group中的一个consumer读取, 但是可以多个group同时消费这个partition

图中,1 Consumer : 2 Partition 1 Consumer : 1 Partition 。某个消费者读取对应的某个分区,也可以叫做分区拥有者

这种情况下,水平扩展可以同时读取到大量的消息。若 一个consumer读取失败了,其他消费者负载均衡读取之前失败的消费者分区

3.3.4 消费方式

consumer采用 pull 模式从broker中获取数据

push模式很难适应消费速率不同的consumer , 因为 消息发送速度由broker决定
它的目标为尽快传递速度,但是这样很容易造成consumer来不及处理,可能造成网路拥塞

pull模式可与根据consumer的消费能力以适当的速度消费消息,简化了broker的设计,
consumer可以自主控制消费消息的速率,
consumer可以控制消费的方式----批量消费 逐条消费 ,
consumer还可以选择不同的提交方式从而实现不同的传输语义

pull缺点:若kafka没有消息,可能陷入循环中,一直在等待数据到达
为了避免陷入这种循环,pull 请求中设定参数 ,允许consumer在等待数据等待的 长轮询 中进行阻塞(并且可选地等待到给定地字节数,确保传输大小)

3.3.5 CG案例

需求 : 同一个消费组中的消费者,同时是能有一个消费者消费

  1. 在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。
vi consumer.properties
group.id=ifeng

  1. 在hadoop102、hadoop103上分别启动消费者
[ifeng@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 \
--topic first \
--consumer.config config/consumer.properties
[ifeng@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 \
--topic first \
 --consumer.config config/consumer.properties

  1. 在hadoop104上启动生产者
bin/kafka-console-producer.sh \
--broker-list hadoop:102:9092 --topic first

同一时间只有一个消费者接收到消息

四. Kafka API实战

4.1 环境准备

  1. 启动kafka 打开一个消费者
bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 \
--topic first
  1. 导入pom
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>

4.2 Kafka生产者JavaAPI

4.2.1 创建生产者

过时的API

package KafkaAPI;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;




public class OldProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list","ifeng:9092");
        properties.put("requset.required.acks","1");
        properties.put("serializer.class","kafka.serializer.StringEncoder");

        Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));

        KeyedMessage<Integer, String> message = new KeyedMessage<>("first", "hello world");
        producer.send(message);


    }
}

新API

package KafkaAPI;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class NewProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 通过Properties设置主机号和端口号
        properties.put("bootstrap.servers","ifeng:9092");
        // 等待所有节点的应答
        properties.put("acks","all");
        // 消息发送最大尝试次数
        properties.put("retries",0);
        // 一批消息处理大小
        properties.put("batch.size",16384);
        // 请求延迟
        properties.put("linger.ms",1);
        // 发送缓冲区内存大小
        properties.put("buffer.memory",33554432);
        // key序列化
        properties.put("key.serializer","org.apache.kafka.common.serialiaztion.StringSerializer");
        // value序列化
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for(int i = 0 ; i < 50 ; i ++){
            producer.send(new ProducerRecord<String,String>("first",Integer.toString(i),"hello world-" + i));
        }
        
        producer.close();


    }
}

创建生产者带回调函数(新API)

package KafkaAPI;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class CallBackProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 通过Properties设置主机号和端口号
        properties.put("bootstrap.servers", "ifeng:9092");
        // 等待所有节点的应答
        properties.put("acks", "all");
        // 消息发送最大尝试次数
        properties.put("retries", 0);
        // 一批消息处理大小
        properties.put("batch.size", 16384);
        // 请求延迟
        properties.put("linger.ms", 1);
        // 发送缓冲区内存大小
        properties.put("buffer.memory", 33554432);
        // key序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialiaztion.StringSerializer");
        // value序列化
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i),new Callback(){

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (metadata != null) {

                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });

        }

        producer.close();
    }
}

4.2.4 自定义分区生产者

需求 : 将所有的数据存储到topic的0号分区上

定义实现类Partitioner接口

// 过时API
package KafkaAPI;

import java.util.Map;
import kafka.producer.Partitioner;


public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // 控制分区
        return 0;
    }
}

自定义分区

package KafkaAPI;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;



public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 控制分区
        return 0;
    }

    @Override
    public void close() {

    }
}

调用此生产者

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

	public static void main(String[] args) {
		
		Properties props = new Properties();
		// Kafka服务端的主机名和端口号
		props.put("bootstrap.servers", "ifeng:9092");
		// 等待所有副本节点的应答
		props.put("acks", "all");
		// 消息发送最大尝试次数
		props.put("retries", 0);
		// 一批消息处理大小
		props.put("batch.size", 16384);
		// 增加服务端请求延时
		props.put("linger.ms", 1);
		// 发送缓存区内存大小
		props.put("buffer.memory", 33554432);
		// key序列化
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// value序列化
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 自定义分区
		props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

		Producer<String, String> producer = new KafkaProducer<>(props);
		producer.send(new ProducerRecord<String, String>("first", "1", "ifeng"));

		producer.close();
	}
}

在ifeng上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况

数据都存储到指定的分区了。

4.3 Kafka 消费者API

高级API

  1. 在控制台创建发送者
bin/kafka-console-producer.sh \
--broker-list ifeng:9092 --topic first
>hello world
  1. 创建消费者(过时API)
package KafkaAPI;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put("zookeeper.connect", "ifeng:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // 创建消费者连接器
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

官方提供的消费者(新API)

package KafkaAPI;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", "ifeng:9092");
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自动确认offset
        props.put("enable.auto.commit", "true");
        // 自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 定义consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消费者订阅的topic, 可同时订阅多个
        consumer.subscribe(Arrays.asList("first", "second","third"));

        while (true) {
            // 读取数据,读取超时时间为100ms
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

五. Kafka拦截器

5.1 拦截器原理

Producer 拦截器(interceptor)是在Kafka0.10 版本引入的,实现clients端的定制化控制

Interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息
同时,producer允许用户指定多个interceptor按照顺序作用于同一条消息
形成一条拦截链(interceptor chain)

Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括

(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

六. Kafka Stream

Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。

为什么还需要Kafka Stream

第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。

在这里插入图片描述
第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。
第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低。
第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源。
第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。
第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度。


版权声明:本文为weixin_39381833原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。