【kafka系列教程06】kafka接口API

Apache Kafka引入一个新的java客户端(在org.apache.kafka.clients 包中),替代老的Scala客户端,但是为了兼容,将会共存一段时间。为了减少依赖,这些客户端都有一个独立的jar,而旧的Scala客户端继续与服务端保留在同个包下。

Kafka有4个核心API:

  • Producer API允许应用程序发送数据流到kafka集群中的topic。
  • Consumer API允许应用程序从kafka集群的topic中读取数据流。
  • Streams API允许从输入topic转换数据流到输出topic。
  • Connect API通过实现连接器(connector),不断地从一些源系统或应用程序中拉取数据到kafka,或从kafka提交数据到宿系统(sink system)或应用程序。

kafka公开了其所有的功能协议,与语言无关。只有java客户端作为kafka项目的一部分进行维护,其他的作为开源的项目提供,这里提供了非java客户端的列表。
https://cwiki.apache.org/confluence/display/KAFKA/Clients

 

kafka生产者API(Producer API):

如何使用生产者:

我们鼓励所有新开发的程序使用新的Java生产者,新的java生产者客户端比以前的Scala的客户端更快、功能更全面。通过下面的例子,引入Maven(可以更改新的版本号)。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>

kafka客户端发布record(消息)到kafka集群。

新的生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。

一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行,

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。

send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。

retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。

producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。

buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。

key.serializervalue.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerializaerStringSerializer处理简单的string或byte类型。

send()

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到topic,并调用callback(当发送已确认)。

send是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。

发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。

由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。

如果要模拟一个简单的阻塞调用,你可以调用get()方法。

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();

完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知。

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1保证执行 callback2之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。

pecified by:

send in interface Producer<K,V>

Parameters:

record - 发送的记录(消息)
callback - 用户提供的callback,服务器来调用这个callback来应答结果(null表示没有callback)。

Throws:

InterruptException - 如果线程在阻塞中断。
SerializationException - 如果key或value不是给定有效配置的serializers。
TimeoutException - 如果获取元数据或消息分配内存话费的时间超过max.block.ms。
KafkaException - Kafka有关的错误(不属于公共API的异常)。


kafka新消费者API(Consumer API):

这个新的消费API,清除了0.8版本的高版本和低版本消费者之间的区别,你可以通过下面的maven,引入依赖到你的客户端。

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.0</version>
</dependency>

如何使用新的消费者:

kafka客户端从kafka集群消费消息(记录)。它会透明地处理kafka集群中服务器的故障。它获取集群内数据的分区,也和服务器进行交互,允许消费者组进行负载平衡消费。(见下文)。

 

消费者维持TCP连接到必要的broker来获取消息。故障导致消费者关闭使用,会泄露这些连接,消费者不是线程安全的,可以查看更多关于Multi-threaded(多线程)处理的细节。

 

偏移量和消费者的位置

kafka为每个分区的每条消息保持偏移量的值,这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。也就是说,一个位置是5的消费者,说明已经消费了0到4的消息(记录)并下一个接收消息的偏移量设置为5。关于的消费者,实际上“位置”有2个概念。

 

消费者将给出下一个消息的偏移量的位置,这个是消费者在分区中能看到的最后的偏移量,消费者收到的数据称为poll(long)[长轮询],每次接收消息,偏移量会自动增长,

 

“已提交”的位置是已安全保存的最后偏移量,如果处理失败,这个偏移量会恢复并重新开始。消费者可以自动定期提交偏移量,也可以选择通过调用commitSync来控制,这是阻塞的,直到偏移量提交成功或在提交过程中发生致命的错误,commitAsync是非阻塞式的,当成功或失败时,会引发OffsetCommitCallback。

 

这个区别是当一条消息已认为已被消费,控制权在消费者,下面我们进一步更详细地讨论。

 

消费者组和主题订阅

Kafka使用消费者组概念,允许进程池瓜分消费和处理消息的工作。这些进程可以在同一台机器运行,或更可能的是,可以部分到多台机器上,以提供额外的可扩展性和容错性处理。


每个kafka消费者都能配置一个属于它自己的消费者组。并可以动态的配置它需要订阅的主题列表,通过`subscribe(List, ConsumerRebalanceListener)`,或订阅匹配特定模式的主题,通过`subscribe(Pattern, ConsumerRebalanceListener)`,kafka将已订阅主题的每个消费者组中的每条消息发送给一个进程。这是通过在每个组的消费者进程平衡主题的分区来实现的。 所有,如果一个主题有4个分区,并且一个消费者组有2个进程,每个进程将从2个分区来进行消费,这个是动态维护的:如果一个进程故障,分区将重新分派到同组的其他的进程。如果有新的进程加入该组,分区将现有消费者移动到新的进程。


所以,如果2个进程订阅了一个主题,指定不同的组,他们将获取这个主题所有的消息,如果他们指定相同的组,那么它们将每个获取大约一半的消息。

从概念上讲,你可以把消费者组看作一个单一的逻辑用户(订阅者),碰巧组成了多个进程。作为一个多用户系统。kafka也支持任意数量的消费者组提供一个指定的主题不重复的数据,

这是关于常用消息系统功能的简单的概述,类似于传统消息系统中的队列,所有进程将是一个单独的消费组的一部分,因此,消息的交付由该组进行平衡,类似于队列。与传统的消息传递系统不同的是,你可以有多个这样的组。在传统的消息传递系统中,每个进程都有它自己的消费组,所以每个进程都会订阅发布到主题的所有消息。


此外,当组重新分配自动发生,消费者可以通过调用`ConsumerRebalanceListener`通知,这使得他们能够完成必要的应用程序级的逻辑,如状态清除,手动偏移提交(注意,指定的消费者组的偏移量总是已经提交的)


它也有可能为消费者手动指定分配给它通过`assign(List)`,这将禁用动态分区分配。

 

示例

这个消费者API提供了灵活性,以涵盖各种消费场景,下面是一些例子来演示如何使用它们。

自动提交偏移量

这是个【自动提交偏移量】的简单的kafka消费者API。

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

设置`enable.auto.commit`,偏移量由`auto.commit.interval.ms`控制自动提交的频率。

集群是通过配置bootstrap.servers指定一个或多个broker。不需要制定全部的broker,会自动发现在集群中的其余的borker(最好指定多个,万一有服务器故障)。

在这个例子中,客户端订阅了主题`foo`和`bar`。消费者组叫`test`。

broker通过心跳机器自动检测test组中失败的进程,消费者会自动`ping`集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过`session.timeout.ms`,那么就会认为是故障的,它的分区将被分配到别的进程。
 

这个`deserializer`设置是指定如何去把byte转为为object类型,例子中,通过指定string的 deserializers,我们告诉我们获取到的消息的key和value只是简单的string类型。

 

手动控制偏移量

不依赖于定期提交偏移量,你可以自己控制偏移量,当消息认为已消费过了,这个时候再去提交它们的偏移量。这个是很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。在这里例子中,当我们有足够的消息进行批处理时我们将它们插入到数据。它们接收了消费者的消息之后,消息将被认为已经消费了,这个时候我们的过程失败了,我们读取我们的内存缓存区的消息,有可能他们已被插入到数据库中了。为了避免这一点,我们将手动提交的偏移量,当相应的消息已被插入到数据库中。我们准确控制一条消息才被认为是消费的。提出了一个相反的可能性:在插入到数据库中,但在提交之前,这个过程可能会失败(尽管这可能只有几毫秒,但它是一种可能性)。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。用这种方式,被称为`“至少一次”`担保,因为每个消息可能会提供一次,但在故障情况下,可以重复。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

上面的例子使用commitSync表示所有收到的消息为”已提交",在某些情况下,你可以希望更精细的控制,通过指定一个明确消息的偏移量为“已提交”。在下面,我们的例子中,我们处理完每个分区中的消息后,提交偏移量。

try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注:已提交的偏移量应该一直是你的应用程序将读取的下一条消息来的偏移量。因此,调用commitSync(offsets)时,你应该添加最后一条消息的偏移量。

 

订阅指定的分区

在前面的例子中,我们订阅我们感兴趣的主题,让kafka提供给我们平分后的主题的分区,它提供了一个简单的负载平衡机制,所以我们的程序通过多个实例来瓜分处理这些消息。

在这种模式下,消费者将只会获取它订阅的分区,如果消费者实例故障,不会尝试重新平衡分区到其他的实例。

  • 第一种情况是,如果这个进程与该分区保持某种本地状态(如 本地磁盘上的key-value存储)因此它应该只能获取这个分区的消息,它是保持在磁盘上。
  • 另一种情况是,如果这个过程本身是高度可用的,将重新启动,如果失败(可能使用集群管理框架如YARN,Mesos,或者AWS设施,或作为一个流处理框架的一部分)。在这种情况下不需要kafka来发现故障和重新分配,而消费进程将在另一台机器重新启动。


这种模式很容易指定,不是订阅主题,只需要消费者订阅特定的分区即可:

String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

消费者指定的组仍然用于提交偏移量。但现在分区设置只会改变消费者的分区。如果消费者指定新的分区,并不会尝试故障检查。
 

它不能即订阅了指定的分区(无负载平衡),又使用相同的消费者实例的主题(负载平衡)。

 

存储kafka之外的偏移量

消费者应用程序不必使用kafka内置的偏移量仓库,它可以自己选择存储偏移量的仓库。主要的一点是允许应用程序存储偏移量和偏移量的结果存储在同一个系统中,原子的消费情况,这并不一定,但将使消费完全原子,并给出“恰好一次”的语义比默认“至少一次”语义更强壮的,你要用kafka的偏移提交功能。

 

这有结合的例子。

  • 如果消费的结果被存储在关系数据库中,存储在数据库中的偏移,也允许提交偏移量和结果,并在单个事务中。因此,事物将成功和偏移量将被更新的基础上。如果被消耗或结果将不会被存储并且偏移量也不会被更新。
  • 如果结果存储在一个本地仓库,它也可能存储偏移量。例如,一个搜索索引可以通过订阅一个特定的分区和存储两个偏移和索引数据一起建立。如果这是在一种原子的方式进行的,它通常是可能的,即使发生事故导致unsync数据丢失,剩下是也相应的偏移量存储。这意味着,在这种情况下,当恢复后,失去最近更新索引进程从刚刚地方恢复索引,它确保没有更新丢失。

每个消息都有自己的偏移,所以要管理自己的偏移,你只需要做到以下几点:

  • 配置 enable.auto.commit=false
  • 使用提供的每个 ConsumerRecord 来保存你的位置。.
  • 在重启时使用恢复消费者的位置用 seek(TopicPartition, long).

当分区分配也手动完成,这种类型的使用是最小的。(像上文搜索索引的情况).如果分区分配是自动完成的,需要特别小心处理分区分配变更的情况。可以通过提供的 `ConsumerRebalanceListener`调用`subscribe(Collection, ConsumerRebalanceListener) `和`subscribe(Pattern, ConsumerRebalanceListener)`。例如,当分区从消费者拿一条消息,消息费想要提交这些分区的偏移量,通过执行`ConsumerRebalanceListener.onPartitionsRevoked(Collection)`,当分区分配给消费者,消费者通过`ConsumerRebalanceListener.onPartitionsAssigned(Collection)`,为新的分区和正确初始化位置的消费者找到偏移。

kafka允许指定位置,使用`seek(TopicPartition, long)`来指定新的位置,一些特别的方法寻找最早和最晚的偏移,服务器维护也可用(`seekToBeginning(Collection)` 和 `seekToEnd(Collection))`。

消费流控制

如果多个分区分配一个消费者获取的数据,它将试图同时消费所有的,有效地给这些分区为消费相同的优先级。然而在某些情况下,消费者可能首先想全速专注于获取的一些子集分配分区,当这些分区很少或已经没有消费数据了在去抓取其他分区。

还有这样一种情况,流处理,其中处理器由两个topic获取和执行这两个流的连接。当的topic之一是早已落后于其他落后,处理器想暂停为了得到滞后流赶上从前面的topic取。另一个例子是在消费者的Bootstrap启动,其中有很多历史数据的追赶中,应用程序通常想要得到的一些话题考虑获取其他topic之前的最新数据。

kafka支持动态控制消费流量,分别在future的`poll(long)`调用中执行中使用 `pause(Collection)` 和 `resume(Collection)` 来暂停消费指定分配的分区,重新开始消费指定暂停的分区。

 

Kafka Streams API:

在0.10.0增加了一个新的客户端库,Kafka Stream,Kafka Stream具有Alpha的优点,你可以使用maven引入到你的项目:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.0</version>
    </dependency>

如何使用:

(注意,@InterfaceStability.Unstable注解的类,是公共API,在未来可能改变,不保证向后兼容)

@InterfaceStability.Unstable
public class KafkaStreams
extends Object

Kafka允许从1个或多个topic进行连续的执行计算,并输出到0或多个topic。可以通过使用TopologyBuilder类来定义指定计算逻辑的DAG拓扑的处理器,或使用KStreamBuilder类,该类提供了高级别的kstream DSL,来定义转换。KafkaStreams类管理kafka Stream实例的生命周期。一个Stream实例可以包含一个或多个Thread(在配置中指定)。

一个KafkaStreams实例可以与任何具有相同应用ID(无论是同一进程,这台机器上的其他进程,或远程的机器)的其它的实例作为一个单一(也可能是分布式的)的Stream处理客户端。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以平衡处理负载。

在内部,KafkaStream实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读和写。

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

 

Kafka Connect API:

Connect API实现一个连接器(connector),不断地从一些数据源系统拉取数据到kafka,或从kafka推送到宿系统(sink system)。

大多数Connect使用者不需要直接操作这个API,可以使用之前构建的连接器,不需要编写任何代码。有关Connect的其他信息。

想实现自定义连接器的,可以看javadochttps://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/connect)。

Kafka Connect 是一个可扩展、可靠的在Kafka和其他系统之间流传输的数据工具。它可以通过connectors(连接器)简单、快速的将大集合数据导入和导出kafka。Kafka Connect可以接收整个数据库或收集来自所有的应用程序的消息到Kafka Topic。使这些数据可用于低延迟流处理。导出可以把topic的数据发送到secondary storage(辅助存储也叫二级存储)也可以发送到查询系统或批处理系统做离线分析。Kafka Connect功能包括:

  • Kafka连接器通用框架:Kafka Connect 规范了kafka与其他数据系统集成,简化了connector的开发、部署和管理。

  • 分布式和单机模式- 扩展到大型支持整个organization的集中管理服务,也可缩小到开发,测试和小规模生产部署。 

  • REST 接口- 使用REST API来提交并管理Kafka Connect集群。

  • 自动的offset管理- 从connector获取少量的信息,Kafka Connect来管理offset的提交,所以connector的开发者不需要担心这个容易出错的部分。

  • 分布式和默认扩展- Kafka Connect建立在现有的组管理协议上。更多的工作可以添加扩展到Kafka Connect集群。

  • 流/批量集成- 利用kafka现有的能力,Kafka Connect是一个桥接流和批量数据系统的理想解决方案。


版权声明:本文为dcm19920115原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。