Consuming a Pulsar topic through the Kafka wrapper package only supports topics whose subscriptionType is "Failover".

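Problem:

When a Pulsar topic is consumed through the Kafka wrapper package (pulsar-client-kafka), the subscription is always created with subscriptionType "Failover"; no other subscription type can be used.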

Package used:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka</artifactId>
    <version>2.5.0</version>
</dependency>

Root cause analysis:

1. Code in the spring-kafka package

KafkaMessageListenerContainer

package org.springframework.kafka.listener;

/**
 * Single-threaded Message listener container using the Java {@link Consumer} supporting
 * auto-partition assignment or user-configured assignment.
 * <p>
 * With the latter, initial partition offsets can be provided.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @author Murali Reddy
 * @author Marius Bogoevici
 * @author Martin Dam
 * @author Artem Bilan
 * @author Loic Talhouarne
 * @author Vladimir Tsanev
 * @author Chen Binbin
 * @author Yang Qiju
 * @author Tom van den Berge
 */
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
...


	@SuppressWarnings("unchecked")
	ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
        ...
        
			if (KafkaMessageListenerContainer.this.topicPartitions == null) {
				if (this.containerProperties.getTopicPattern() != null) {
					consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
				}
				else {
					// Calls into the pulsar-client-kafka package, which creates the consumer
					consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
				}
			}
        ...
    }
}

2. Code in the pulsar-client-kafka package

KafkaConsumer
package org.apache.kafka.clients.consumer;
public class KafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
    ...
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        ...
        try {
            Iterator var5 = topics.iterator();

            while(true) {
                while(var5.hasNext()) {
                    String topic = (String)var5.next();
                    int numberOfPartitions = (Integer)((PulsarClientImpl)this.client).getNumberOfPartitions(topic).get();
                    // Load the user-supplied configuration
                    ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(this.client, this.properties);
                    // subscriptionType is hard-coded to Failover here
                    consumerBuilder.subscriptionType(SubscriptionType.Failover);
                    consumerBuilder.messageListener(this);
                    consumerBuilder.subscriptionName(this.groupId);
                    ...
                }
                ...

                return;
            }
        } catch (Exception var14) {
            futures.forEach((f) -> {
                try {
                    ((org.apache.pulsar.client.api.Consumer)f.get()).close();
                } catch (Exception var2) {
                }

            });
            throw new RuntimeException(var14);
        }
        ...
    }
    ...
}

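Root cause: when the consumer is created, subscriptionType is hard-coded to Failover. The call to consumerBuilder.subscriptionType(SubscriptionType.Failover) runs after the builder has been populated from the user-supplied properties, so it is the value that takes effect.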
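
To make the effect of that call order concrete, here is a minimal, self-contained sketch. It does not use the real Pulsar classes: FakeConsumerBuilder, builderFromProperties, and the "subscriptionType" property key are hypothetical stand-ins invented for illustration, and the Exclusive starting value is only a placeholder. The sketch models one thing only: a value set on the builder before the hard-coded call is replaced by Failover.

```java
import java.util.Properties;

public class SubscriptionTypeOverrideDemo {

    // Simplified copy of the subscription type names; not the real Pulsar enum.
    public enum SubscriptionType { Exclusive, Shared, Failover, Key_Shared }

    // Hypothetical stand-in for a consumer builder; it only tracks the subscription type.
    public static final class FakeConsumerBuilder {
        private SubscriptionType type = SubscriptionType.Exclusive; // placeholder default

        public FakeConsumerBuilder subscriptionType(SubscriptionType t) {
            this.type = t;
            return this;
        }

        public SubscriptionType type() {
            return type;
        }
    }

    // Hypothetical stand-in for the "load the user-supplied configuration" step.
    // The "subscriptionType" key is an assumption made for this sketch.
    public static FakeConsumerBuilder builderFromProperties(Properties props) {
        FakeConsumerBuilder builder = new FakeConsumerBuilder();
        String configured = props.getProperty("subscriptionType");
        if (configured != null) {
            builder.subscriptionType(SubscriptionType.valueOf(configured));
        }
        return builder;
    }

    // Mirrors the order of calls in the subscribe() excerpt:
    // first build from properties, then force Failover.
    public static SubscriptionType effectiveType(Properties props) {
        FakeConsumerBuilder builder = builderFromProperties(props);
        builder.subscriptionType(SubscriptionType.Failover); // the hard-coded call wins
        return builder.type();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("subscriptionType", "Shared");
        System.out.println(effectiveType(props)); // prints "Failover"
    }
}
```

Under these assumptions, whatever the properties request, the last setter call decides the result, which matches the behavior observed with pulsar-client-kafka 2.5.0.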


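Copyright notice: this is an original article by bandancer, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.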