怎么重设消费者组位移

重设策略

一、 位移维度

  1. Earliest: 把位移调整到当前最早位移处,不一定是0,因为很久以前的消息可能被自动删除了
  2. Latest:  把位移调整到当前最新位移处
  3. Current: 把位移调整到当前最新提交位移处
  4. Specified-Offset: 把位移调整到指定移处
  5. Shift-By-N: 把位移调整到指定移处+N处,N允许负数

二、时间维度

  1. DateTime: 把位移调整到大于给定时间的最小位移处
  2. Duration: 把位移调整到距离当前时间指定间隔位移处, 具体格式类似于java8的 java.time.Duration --- P*DT*H*M*S

 

重设消费者组位移方式

  • 通过消费者API :KafkaConsumer - seek()、seekToBeginning、seekToEnd 方法
  • 通过命令行: 
  1. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute 
  2. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
  3. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
  4. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
  5. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
  6. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-08-23T12:00:00.000 --execute
  7. bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration P1DT10H30M0S --execute
     

 


版权声明:本文为yujianping_123原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。