【知识梳理】《Kafka权威指南》知识梳理

Kafka快的原因:

  • 零拷贝:数据拷贝不仅过用户态,直接从内核态到socket缓冲区
  • 顺序读写
  • 分批发送
  • 消息压缩

第1章  初识Kafka

消息:Kafka数据单元

批次:一组消息

主题:Kafka通过主题进行分类,由于一个主题包含几个分区,因此无法保证整个主题范围内消息的顺序,但可以保证消息在单个分区内的顺序

生产者、消费者:消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka上,如果消费者关闭或重启,读取状态不会丢失

消费者群组:会有一个或多个消费者共同读取一个主题,群组保证每个分区只能被一个消费者使用

broker:一个独立的Kafka服务器,单个broker可以轻松处理数千个分区以及每秒百万级的消息量

partition:物理概念,每个topic包含一个或多个partition。每个partition为一个目录,partition命名规则为topic名称+有序序号

分区的复制:

Kafka broker默认的消息保留策略:

(1)保留一段时间(2)保留到消息达到一定大小字节数

多集群的使用范围:

  • 数据类型分离
  • 安全需求隔离
  • 多数据中心

多集群数据复制:MirrorMaker

Kafka使用场景:

  • 活动跟踪
  • 传递消息
  • 度量指标和日志记录
  • 提交日志

第2章  安装Kafka

安装Zookeeper:Zookeeper保存broker、主题、分区的元数据;Kafka最好使用自己的Zookeeper群组

安装Kafka Broker

常规配置:

  • broker.id:每个broker的唯一标示符
  • port:单个事件处理监听9092端口
  • zookeeper.connect: 保存broker元数据的Zookeeper地址
  • log.dirs:日志片段,会在拥有最少分区的路径新增分区
  • num.recovery.threads.per.data.dir
  • auto.create.topics.enable:自动创建主题

主题配置:

  • num.partitions:新创建的主题包含多少个分区(默认为1,可增加分区个数,但不能减少分区个数,分区的大小限制在25G以内)
  • log.retention.ms:根据时间来决定数据可以被保留多久(默认1周)
  • log.retention.bytes:通过保留的消息字节数来判断消息是否过期
  • log.segments.bytes:当日志片段大小达到log.segment.bytes指定上限时,日志片段会被关闭,一个新的日志片段会打开
  • log.segments.ms:多长时间日志片段会被关闭
  • message.max.bytes:限制单个消息的大小

跨集群负载均衡:

如果要把一个broker加入到集群中,只需要修改两个配置参数

(1)所有的broker都必须配置相同的zookeeper.connect

(2)每个broker的broker.id唯一

操作系统调优:

虚拟内存:

  • vm.swappiness:避免内存交换,该值尽量小一点(建议为1)
  • vm.dirty.backgroundratio
  • vm.dirty_ratio:刷新到磁盘之前的脏页数量(大于20)

磁盘

网络:

  • socket缓冲区:net.core.wmem_default、net.core.rmem_default
  • TCP_Socket读写缓冲区:net.ipv4.tcp_wmem、net.ipv4.tcp_rmem
  • 其他:net.ipv4.tcp_window_scaling(时间窗扩展,提升客户端传输数据的效率)、net.ipv4.tcp_max_syn_backlog(并发连接)、net.core.netdev_max_backlog(网络流量爆发,允许更多数据包排队等待内核处理)

第3章  Kafka生产者  ---  向Kafka写入数据

Kafka发送消息步骤:

ProducerRecord包含目标主题和要发送的内容,还可以指定键或分区。在发送ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组,才能在网络传播

记录会被添加到批次中,这个批次会发送到相同的主题和分区上,独立线程负责将记录批次发送到broker上

如果消息成功写入Kafka,就返回一个RecordMetaData对象,包含了主题和分区信息

如果写入失败,则返回一个错误

生产者设置的属性:

  • bootstrap.servers:broker地址清单
  • key.serializer:键对象序列化成字节数组(ByteArraySerializer、StringSerializer、IntegerSerializer)
  • value.serializer:值对象序列化成字节数组

发送消息到Kafka

发送并忘记

ProducerRecord<String,String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France");
try{
    producer.send(record);//消息先放入到缓冲区,单独线程发送到服务器端
}catch (Exception e){
    e.printStackTrace();
}

同步发送消息

producer.send(record).get();

异步发送消息

producer.send(record,new DemoProducerCallback());
private class DemoProducerCallback implements Callback{
    @Override
   public void onCompletion(RecordMetadata recordMetadata,Exception e){
      if(e!=null) e.printStackTrace();
   }
}

生产者配置:

  • acks:

    acks=0:不等待任何服务器的响应,会丢失信息

    acks=1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应(如果一个没有收到消息的节点成为新首领,消息还会丢失)

    acks=2:只有当所有参与复制的节点全部收到消息时,才会成功

  • buffer.memory:生产者内存缓冲区的大小
  • compression.type(snappy、gzip、lz4)
  • retries:重发消息的次数,如果达到次数,则生产者会放弃重试并返回错误
  • batch.size:当有多个消息需要发送到同一个分区时,生产者会把它们放在同一个批次中
  • linger.ms:生产者发送批次前等待更多消息加入的时间
  • client.id:识别消息的来源
  • max.in.flight.requests.per.connection:生产者在收到服务器响应前可以发送多少个消息
  • timeout.ms、request.timeout.ms、metadata.fetch.timeout.ms
  • max.block.ms:最大阻塞时间
  • max.request.size:生产者发送请求的大小
  • receive.buffer.bytes、send.buffer.bytes:TCP socket接收和发送数据包的缓冲区大小

自定义序列化器

分区:

键的用途:(1)可以作为消息的附加信息 (2)决定消息被写入到主题的哪个分区

  • 如果键值为null,且使用了默认的分区器,分区器会轮询将消息分布到各个分区上
  • 如果键值不为空,且使用了默认的分区器,键值通过散列映射

一旦主题增加了新的分区,新来的记录的映射可能会发生改变

自定义分区:

public class DemoPartitioner implements Partitioner{
   public void configure(Map<String,?> configs){}
     public int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
         
          if((keyBytes==null)||(!(key instanceof String)))  throw new InvalidRecordException("...");
           if(((String)key).equals("Banana"))  return numPartitions;
             
            return (Math.abs(Utils.murmur2(keyBytes))%(numPartitions - 1));
    } 
}

第4章  Kafka消费者  ---  从Kafka读取数据

消费者群组:一个群组里的消费者订阅同一个主题,每个消费者接收主题一部分分区的信息(可伸缩读取能力和处理能力)

Tips:

  1. 不要让消费者的数量超过主题分区的数量,多余的消费者会被闲置
  2. 当一个消费者被关闭或发生崩溃时,就会离开群组,原本它读取的分区将由群组里的其他消费者负责
  3. 在主题发生变化时,会发生分区重分配

再均衡:分区所有权从一个消费者转移到另一个消费者

再均衡期间,消费者无法读取消息,造成整个群组一小段时间不可用

当分区被重新分配给另一个消费者,消费者当前的读取状态会丢失。它有可能还需要去刷新缓存

群组协调器:消费者通过向群组协调器的broker发送心跳,来维持它们和群组的从属关系以及分区所属权关系

           如果消费者停止发送心跳时间足够长,会话会过期。群组协调器会认为其死亡,发生再均衡(如果消费者发生崩溃,协调器会等待几秒钟)

分配分区的过程:

    当消费者要加入群组时,会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为“群主”。群主从协调器获取群组成员列表,并负责给每一个消费者分配分区

创建消费者:

Properties props = new Properties();
props.put("bootstrap.servers","broker1:9092,broker2:9092");
props.put("group.id","CountryCounter");//group.id 指定了消费者属于哪个消费者群主
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);

订阅主题:

consumer.subscribe(Collections.singletonList("customerCountries"));
consumer.subscribe("test.*");//正则表达式
轮询:
try{
    while(true){
      ConsumerRecords<String,String> records = consumer.poll(100);//轮询
      for(ConsumerRecords<String,String> record : records){
         //record.topic(),record.partition(),record.offset(),record.key(),record.value()
          
         int updatedCount = 1;
           if(custCountryMap.countainsValue(record.value()))  updatedCount = custCountryMap.get(record.value()) + 1;
           custCountryMap.put(record.value(),updatedCount);
          
           JSONObject json = new JSONObject(custCountryMap);
       }
    }
}finally{
    consumer.close();
}

消费者的配置:

  • fetch.min.bytes: 消费者从服务器获取记录的最小字节数
  • fetch.max.wait.ms: 指定broker的等待时间
  • max.partition.fetch.bytes: 指定了每个分区里返回给消费者最大的字节数
  • session.timout.mx: 指定了消费者可以多久不发送心跳(heartbeat.interval.ms 指定了poll方法向协调器发送心跳的频率,heartbeat.interval.ms必须比session.timeout.ms小)
  • auto.offset.reset: (latest / earliest)没有偏移量或偏移量无效的情况下,对记录读取的处理
  • enable.auto.commit: 是否自动提交偏移量
  • partition.assignment.strategy:分区的分配策略

Range:把主题的连续分区分配给消费者,会导致不均衡  

RoundRobin:把主题逐个分配给消费者,会很均衡。所有消费者分配相同数量的分区

  • client.id:标识客户端
  • max.poll.records:控制单子调用call返回的记录数量
  • receive.buffer.bytes 和 send.buffer.bytes :TCP缓冲区大小(-1表示使用系统默认值)

偏移量:针对发生崩溃,或再均衡之后处理数据使用

         消费者会向_consumer_offset的特殊主题发送包含偏移量的消息

再均衡可能会出现的问题:

  • 重复处理:

  • 缺失:

  • 阻塞提交偏移量:consumer.commitSync();   //阻塞提交偏移量
  • 异步提交偏移量:consumer.commitAsync();

回调经常被用于记录提交错误或生成度量指标

consumer.commitAsync(new OffsetCommitCallback(){
   public void onComplete(Map<TopicParition,OffsetAndMetadata> offsets,Exception e){
      ......
   }
});

同步/异步组合提交:普通提交可以使用异步,但是消费者关闭前一定要用同步保证成功

try{
 consumer.commitAsync();
}finally{ 
 consumer.commitSync();
 consumer.close();
}

提交特定偏移量:

Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap();
currentOffsets.put(new TopicPartition(record.topic(),record.partition(),new OffsetAndMetadata(record.offset()+1,"no metadata")));
consumer.commitAsync(currentOffets,null);

ConsumerRebalanceListener:再均衡监听器

  • onPartitionsRevoked
  • onPartitionsAssigned

从特定偏移量开始处理记录:

  • seekToBegining
  • seek
  • seekToEnd

消费者退出:

consumer.wakeup();

会抛出WakeupException异常,不需要处理WakeupException,但退出之后要彻底关闭消费者

反序列化器:

p68

不建议使用自定义序列化器和自定义反序列化器,它们把生产者和消费者紧紧地耦合在一起,并且很脆弱,容易出错


Zookeeper信息维护:

        kafka使用zookeeper来维护群成员的信息,在broker启动的时候,通过创建临时节点把自己ID注册到zk上,kafka组件订阅zookeeper的/brokers/ids路径,当有broker加入集群或退出集群时,这些组件就会获得通知

        broker断开会从zk节点上移除,监听broker列表的kafka组件会被告知

        在完全关闭一个broker之后,使用相同的ID启动一个全新的broker,会立即加入集群,并拥有与旧的broker相同的分区和主题

控制器:

    第一个启动的broker在zk上创建/controller让自己成为控制器,其他broker在控制器上创建zookeeper watch对象,接收节点变更通知(zk负责选举控制器)

    除了一般broker的功能外,还负责分区首领的选举(控制器负责选择分区首领)

   

   zookeeper epoch:每个新选出的控制器通过Zookeeper的条件递增操作获得一个全新的、数值更大的 zk epoch,epoch可以避免“脑裂”(两个节点同时认为自己是控制器)

复制:核心

    kafka使用主题来组织数据

    每个主题被分为若干个分区(分区是基本存储单元)

   每个分区有多个副本。这些副本保存在broker上,每个broker可以保存成百上千个属于不同主题和分区的副本

首领副本:每个分区都有一个首领副本

跟随者副本:从首领副本处复制消息,如果一个副本无法与首领保持一致,在首领发生失效时,它不可能成为新首领

处理请求:

    broker大部分工作是处理:客户端、分区副本、控制器发送给分区首领的请求

   

请求消息标准消息头:

  • Request type
  • Request version
  • Correlation ID
  • Client ID

broker会在它所监听的每一个端口运行一个Acceptor线程,该线程会创建一个连接,并交给Processor线程处理。

请求类型:1、生产请求(写入) 2、获取请求(读取) 3、元数据请求

生产请求和获取请求都必须发送给分区的首领副本上,元数据请求可以是任意一个broker

一般情况下,客户端会把这些信息缓存起来,并直接往目标broker发送生产请求和获取请求。但要不时发送元数据请求刷新信息

kafka使用零复制技术向客户端发送消息(不需要经过任何中间缓冲区)

等所有同步副本复制了消息之后,才允许对消费者可见

    

文件管理:

将分区划分成片,方便在大文件中查找或删除消息

主题设置了保留期限,规定数据被删除之前保留多长时间

文件格式:

Kafka的消息和偏移量保存在文件中,保存在磁盘上的数据格式与从生产者发送过来或发送给消费者的消息格式一致。

kafka使用零复制技术给消费者发送消息,同时避免对消费者已经压缩的消息解压或再压缩。

分区的分配:

首先通过轮询的方式,给每个分区确定首领分区

为分区选好broker之后,要在对应的broker下分配目录。新的分区总是添加到数量最小的那个目录中。

DumLogSegment 校验压缩内容

bin/kafka-run-class.sh  kafka.tools.DumpLogSegments

索引:

    Kafka为每个分区维护一个索引,若索引损坏会重新生成。


第6章  可靠的数据传递

Kafka可靠性保证机制:

  • kafka可以保证分区消息的顺序
  • 只有当消息被写入分区的所有同步副本时,才被认为是“已提交”的
  • 只要有一个副本活跃,已提交的消息就不会丢失
  • 消费者只能读到已经提交的消息

kafka的复制机制   和   分区的多副本架构是kafka可靠保证的核心

分区同步副本时,跟随者副本同步需要满足以下条件:

  • 与zk之间有活跃会话
  • 规定时间内从首领处获取过消息

    不满足上述条件的副本会被认为是不同步的。不同步副本可以恢复成同步副本。如果一个副本或多个副本在同步和非同步状态之间快速切换,说明集群内部有问题。通常是Java不恰当垃圾回收配置导致。

broker可靠性关键配置:

  • replication.factor: 主题的副本系数
  • unclean.leader.election: 不完全首领选举   true:允许不同步的副本成为首领,存在丢失数据风险  false:等待原先的首领重新上线,降低可用性
  • min.insync.replicas:最少有多少同步副本,才能写入数据

消费者可靠性配置:

  • group.id
  • auto.offset.reset:规定没有偏移量时做什么,怎么做(earlist、latest)
  • enable.auto.commit:让消费者基于任务调度自动提交偏移量
  • auto.commit.interval.ms:自动提交的频率

kafka现在不支持“Exactly-Once”的语义,最常用的方法是写入到支持唯一键的系统中(保持幂等性)


第7章  构建数据管道

构建数据通道的场景:

kafka作为两个通道的端点(s3->kafka、kafka->mongoDB)

kafka作为两个端点的中间媒介(Flume->kafka->Spark Streaming)

使用kafka价值:可以作为数据管道各个数据段之间的大型缓冲区,有效地解耦数据管道数据的生产者和消费者

需要考虑的问题:(1)及时性(2)可靠性(3)高吞吐量(4)数据格式 (5)转换

转换:

  • ETL: 提取--转换--加载
  • ELT:提取--加载--转换(只做少量转换)(高保真数据管道 / 数据湖架构)

Connect API和客户端API:

如果要将Kafka连接到数据存储系统,可以使用Connect

如果要连接的数据存储系统没有相应的连接器,可以考虑客户端API或Connect API开发应用程序

Connet:Connect是Kafka的一部分,为在Kafka和外部数据存储系统之间移动数据提供了一种可靠可伸缩的方式。

          Connect以worker进程集群的方式运行,我们基于worker进程安装连接器插件,然后使用REST API来管理和配置connector。worker进程是长时间持续运行的作业。

          Connect把数据提供给worker,数据池的连接器负责从worker进程获取数据,并写入目标系统

运行Connect:

  • 分布模式:bin/connect-distributed.sh config/connect-distributed.properties
  • 单机模式:bin/connect-standalone.sh  config/connect-distributed.properties

  Connect重要参数:

bootstrap.servers

group.id

检查REST API是否正常: curl  http://localhost:8083/

检查已经安装好的连接器插件: curl  http://localhost:8083/connector-plugins

连接器负责的内容:

  • 决定需要运行多少个任务
  • 按照任务来拆分数据复制
  • 从worker进程获取任务配置并将其传递下去

第8章  跨集群数据镜像

集群间的复制叫做镜像

Hub 和 Spoke架构:

    在数据访问方面所有限制,因为区域中心之间的数据完全独立

双活架构:

    它可以为就近的用户提供服务,具有性能上的优势,而且不会因为数据的可用性问题,在功能上作出牺牲。因为每个数据中心具备完整的功能,一旦一个数据中心发生失效,就可以把用户重定向到另一个数据中心。在这种架构中,在kafka头部信息可以包含源数据中心的信息,避免信息循环镜像。

主备架构:

MirrorMaker为每一个消费者分配一个线程,消费者从原集群的主题和分区上读取数据,然后通过公共生产者将数据发送到目标集群上。消费者每60s通知生产者发送所有数据到Kafka,并等待Kakfa的确认。然后消费者再通知原集群提交这些事件对应的偏移量

bin/kafka-mirror-maker  --consumer.config  xxx  --producer.config xxx --new.consumer --num.streams=2(流的数量) --whitelist ".*"(镜像的主题)

MirrorMaker的基本命令行参数:

  • consumer.config:指定消费者的配置文件(auto.commit.enable一般为false、auto.offset.reset一般为latest)
  • producer.config
  • new.consumer
  • num.streams

whitelist:镜像的主题名字(正则表达式)


第9章  管理Kafka

主题操作:

  • 创建、修改、删除和查看集群的主题:kafka-topics.sh
  • 创建主题:kafka-topics.sh  --zookeeper  xxx  --create  --topic(主题名称)  --replication-factor(复制系数)  --partitions(分区数量)  --disable-rack-aware (不需要机架分配策略)  --if-not-exeists (忽略重复创建主题错误)
  • 删除主题:

kafka-topics.sh  --zookeeper  xxx  --delete  --topic  xxxx (delete.topic.enable 必须设置为true)

  • 列出主题:kafka-topics.sh  --zookeeper  xxx  --list
  • 列出主题详细信息:

kakfa-topics.sh --zookeeper  xxx --describe  --under-replicated-partitions(列出所有包含不同步副本的分区)  --unavailable-partitions(列出所有没有首领的分区) --topics-with-overrides(包含覆盖配置的主题)

分区操作:

增加分区:kafka-topics.sh  --zookeeper  xxx  --alter  --topic xxx  --partitions nums

无法删除分区。建议一开始就设置好分区,如果删除分区,只能删除主题后重建

消费者群组:

    对于旧版本,信息保存在zookeeper上;对于新版本,信息保存在broker上

列出旧版本消费者群组:kafka-consumer-groups.sh --zookeeper  xxx  --list

列出新版本消费者群组:kafka-consumer-groups.sh --new-consumer --bootstrap-server xxxx --list

指定群组:kafka-consumer-groups.sh --zookeeper  xxx --describe --group xxx

删除群组:

    只有旧版本消费者客户端才支持删除群组的操作,在执行操作之前,必须关闭所有的消费者。

删除群组:kafka-consumer-groups.sh  --zookeeper  xxx  --delete  --group  xxx

删除群组某个topic:kafka-consumer-groups.sh  --zookeeper xxx delete --group xxx --topic xxx

偏移量管理:

导出偏移量:kafka-run-class.sh  kafka.tools.ExportZKOffsets --zkconnect  xxxx  --group xxxx --output-file  outputfile

导入偏移量:kafka-run-class.sh  kafka.tools.ImportZKOffets  --zkconnect  xxxx  --input-file  inputfile

动态配置管理:

kafka-configs.sh  p143

列出被覆盖的配置:kafka-configs.sh --zookeeper  xxxx --describe --entity-type topics --entity-name xxx

移除被覆盖的配置:kafka-configs.sh --zookeeper  xxxx --alter  --entity-type  topics  --entity-name xxxx --delete-config xxx

手动触发分区选举:kafka-preferred-replica-election.sh

修改分区副本:kafka-reassign-partitions.sh

副本验证:kafka-replica-verification.sh(集群副本的一致性):kafka-replica-verification.sh  --broker-list xxx,yyy,zzz  --topic-whrite-list 'ttt'

控制台消费者:kafka-console-consumer.sh

--formatter  CLASSNAME:指定消息格式化器的类名,用于解码消息

--from-begining

--max-message NUM:在推出前最多读取NUM个消息

--partition NUM: 指定只读ID为NUM的分区

kafka-console-consumer.sh  --zookeeper  xxx --topic __consumer_offsets  --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessages' --max-message 1

控制台生产者:kafka-console-producer.sh

--broker-list:指定了一个或多个broker

--topic:指定生成消息的目标日志,在生成完消息之后,需要发送一个EOF字符来关闭客户端

kafka-console-producer.sh --broker-list xxx,yyy,zzz  --topic my-topic

移除集群控制器:手动删除ZooKeeper中的/controller

取消分区重分配:手动删除/admin/reassign_partitions节点,重新进行控制器选举

移除待删除的主题:手动删除/admin/delete_topic

手动删除主题:

  1. 先关闭所有broker
  2. 删除ZooKeeper下的/brokers/topics/TOPICNAME
  3. 删除每个broker分区目录
  4. 重启所有broker

第10章  监控Kafka


第11章  流式处理 

kafka客户端类库中提供了强大的流式处理类库

数据流:无边界数据集的抽象表示

数据流模型的属性:(1)有序(2)不可变的数据记录(3)事件流可重播

流式处理中的“时间”:

  1. 事件时间:所追踪事件发生时间和记录的创建时间
  2. 日志追加时间:指事件保存到broker的时间
  3. 处理时间:应用程序在收到事件之后要对其进行处理的时间

流式一系列事件,每个时间就是一个变更

为了将流转化成表,需要“应用”流里面所包含的所有变更,也就是流的“物化”

流式处理的设计模式:

  • 单个事件处理

  • 使用本地状态

  • 多阶段处理和重分区

  • 使用外部查找

  • 流和流的对接

uploading.gif

Kafka有两个基于流的API,一个是底层的Processor API,一个是高级的Streams DSL

Streams DSL:在使用DSL API时,一般会先用StreamBuilder创建一个拓扑,拓扑是一个有向图(DAG),包含了各个转换过程,将会被应用到流的事件上。在创建好拓扑后,使用拓扑创建一个KafkaStreams执行对象。多个线程会随着KafkaStreams对象启动,将拓扑应用到流的事件上。

//简单聚合Demo:生产者和消费者内嵌到Kafka Streams引擎里

public class WordCountExample{
    public static void main(String[] args) throws Exception{
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serde.String().getClass().getName());
    }
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String,String> source = builder.stream("wordcount-input");
        final Pattern pattern = Pattern.complie("\\W+");
        KStream counts = source.flatMapValues(value->Arrays.asList(pattern.split(value.toLowerCase()))).map(key,value) -> new KeyValue<Object,Object>(value,value)).filter((key,value)->(!value.equals("the")))).groupByKey().count("CountStore").mapValues(value->Long.toString(value)).toStream();
        counts.to("wordcount-output");
}

//基于时间窗口聚合Demo

KStream<TickerWindow,TradeStats> stats = source.groupByKey().aggregate(TradeStates::new,
(k,v,tradestats -> tradestats.add(v),TimeWindows.of(5000).advanceBy(1000), new TradeStatsSerde(),"trade-stats-store").toStream((key,value) -> new TickerWindow(key.key(),key.window().start()).mapValues((trade) -> trade.computeAvgPrice());

stats.to(new TickerWindowSerde(),new TradeStats(),"stockstatoutput");

//点击流Demo

KStream<Integer,PageView> views = builder.stream(Serdes.Integer(),new PageViewSerde(),Constants,PAGE_VIEW_TOPIC);
KStream<Integer,Search> searches = builder.stream(Serdes.Integer(),new SearchSerde(),Constants.SEARCH_TOPIC);
KStream<Integer,UserProfile> profiles = builder.stream(Serdes.Integer90,new Pro.fileSerde(),Constants.USER_PROFILE_TOPIC,"profile-store");
KStream<Integer,UserActivity> viewsWithProfile = views.leftJoin(profiles,(page,profile) -> new UserActivity(profile.getUserID(),profile.getUserName(),profile.getZipcode(),profile.getInterests(),"",page.getPage());
KStream<Integer,UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches,(userActivity,search)->userActivity.updateSearch(search.getSearchTerms()),JoinWindows.of(1000),Serdes,Integer(),new UserActivitySerde(),new SearchSerde())

补充:

Kafka等幂性保证:https://www.cnblogs.com/smartloli/p/11922639.html

Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。


版权声明:本文为victorzzzz原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。