本文目的是给出一种ClickHouse中如何在Kafka表引擎出现异常或者需要再次消费之前已消费过的数据的场景
kafka下载地址
wget https://mirror.bit.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
脚本在kafka的bin路径下
建表语句
这里以CSV格式为例,其余格式可以参考另一篇:【ClickHouse系列】Kafka引擎表消费CSV/JSON/AVRO类型数据
CREATE TABLE test (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY String,
C_NATION String,
C_REGION String,
C_PHONE String,
C_MKTSEGMENT String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka-0:9092,kafka-1:9092,kafka-2:9092',
kafka_topic_list = 'test',
kafka_group_name = 'ck',
kafka_format = 'CSV',
kafka_num_consumers = 3;
查看partition的offset
./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --group ck --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ck test 2 5 16 11 - - -
ck test 1 5 16 11 - - -
ck test 0 9 15 6 - - -
Detach Kafka引擎表
detach table test;
重置partition的offset
同时重置partition1和partition2的offset
./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --group ck --topic test:1,2 --reset-offsets --to-offset 5 --execute GROUP TOPIC PARTITION NEW-OFFSET ck test 1 5 ck test 2 5同时重置所有的partition的offset
./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --group ck --topic test --reset-offsets --to-offset 5 --execute GROUP TOPIC PARTITION NEW-OFFSET ck test 0 5 ck test 1 5 ck test 2 5重置指定partition的offset
./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --group ck --topic test:1 --reset-offsets --to-offset 6 --execute GROUP TOPIC PARTITION NEW-OFFSET ck test 1 6
Attach Kafka引擎表
attach table test;
在查询Kafka引擎表就可以从指定位点进行消费。
版权声明:本文为weixin_39992480原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。