Golang 操作Kafka

1. 生产者生产消息(同步发送)

// main sends one "hello kafka" message to the "ffcs" topic with a synchronous
// producer and prints the partition and offset returned by SendMessage.
func main() {
	config := sarama.NewConfig()
	// Wait until the leader and all in-sync replicas have acknowledged the write.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Delivered messages are reported on the success channel; a SyncProducer
	// needs this enabled.
	config.Producer.Return.Successes = true
	// SASL authentication.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "admin"
	config.Net.SASL.Password = "admin"

	// Connect to Kafka.
	brokers := []string{"192.168.67.128:9092", "192.168.67.129:9092", "192.168.67.130:9092"}
	client, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		// This is a creation failure, not a close failure, so say so.
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		// Close may report a failure to shut down cleanly; surface it
		// instead of silently dropping it.
		if err := client.Close(); err != nil {
			fmt.Println("producer close, err:", err)
		}
	}()

	// Build the message.
	msg := &sarama.ProducerMessage{
		Topic: "ffcs",
		Value: sarama.StringEncoder("hello kafka"),
	}

	// Send it and report where it landed.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message failed,", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

2. 异步发送

// main publishes a random "async: N" message to the "ffcs" topic once per
// second through an asynchronous producer. Delivery results are logged from a
// background goroutine.
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Report delivered messages on the Successes channel; the goroutine below
	// drains it.
	config.Producer.Return.Successes = true

	brokers := []string{"192.168.67.128:9092", "192.168.67.129:9092", "192.168.67.130:9092"}
	p, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		// This is a creation failure, not a close failure, so say so.
		fmt.Println("create producer failed, err:", err)
		return
	}
	// NOTE: the send loop below never exits, so this only runs if the loop is
	// later given a way to stop.
	defer p.Close()

	// Drain both result channels so the producer never blocks on them.
	go func(p sarama.AsyncProducer) {
		errors := p.Errors()
		successes := p.Successes()
		for {
			select {
			case err := <-errors:
				if err != nil {
					glog.Errorln(err)
				}
			case <-successes:
				logs.Info("消息发送producer成功")
			}
		}
	}(p)

	// Create the generator once instead of allocating and seeding a new one
	// for every message.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for {
		v := "async: " + strconv.Itoa(rng.Intn(10000))
		fmt.Fprintln(os.Stdout, v)
		p.Input() <- &sarama.ProducerMessage{
			Topic: "ffcs",
			Value: sarama.ByteEncoder(v),
		}
		time.Sleep(time.Second)
	}
}

3. 消费者消费消息

// main consumes every partition of the "ffcs" topic, starting from the oldest
// available offset, and prints each message and consumer error it receives.
// It keeps running until the process is stopped.
func main() {
	// The same topic must be used for listing partitions and for consuming.
	const topic = "ffcs"

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2
	// SASL authentication.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "admin"
	config.Net.SASL.Password = "admin"

	brokers := []string{"192.168.67.128:9092", "192.168.67.129:9092", "192.168.67.130:9092"}
	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %s\n", err.Error())
		return
	}
	// Registered right after creation so the consumer is also closed when a
	// later step fails.
	defer consumer.Close()

	// Fetch the partition IDs of the topic.
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}

	// Each partition gets its own goroutine; a blocking loop here would
	// starve every partition after the first.
	done := make(chan struct{})
	// Range over the values: they are the partition IDs. The slice index is not.
	for _, partition := range partitionList {
		partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("try create partition_consumer error %s\n", err.Error())
			return
		}
		defer partitionConsumer.Close()

		go func(pc sarama.PartitionConsumer) {
			defer func() { done <- struct{}{} }()
			for {
				select {
				case msg, ok := <-pc.Messages():
					if !ok {
						// Channel closed: the partition consumer was shut down.
						return
					}
					fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
						msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
				case err, ok := <-pc.Errors():
					if !ok {
						return
					}
					fmt.Printf("err :%s\n", err.Error())
				}
			}
		}(partitionConsumer)
	}

	// Block until every partition goroutine has finished.
	for range partitionList {
		<-done
	}
}

4. 消费者组

// Consumer is the handler passed to the consumer group's Consume call; its
// Setup, Cleanup and ConsumeClaim methods are defined in this file.
type Consumer struct {
	// ready is closed by Setup to signal that the consumer is ready.
	ready        chan bool
	// consumerName is included in the log line ConsumeClaim writes per message.
	consumerName string
}

// main starts two consumers on the "ffcs" topic and waits for both
// startConsumer calls to return.
func main() {
	consumer := &Consumer{
		ready:        make(chan bool),
		consumerName: "consumer1",
	}
	consumer2 := &Consumer{
		ready:        make(chan bool),
		consumerName: "consumer2",
	}

	ctx, consumerCancel := context.WithCancel(context.Background())
	// Release the context's resources on every exit path.
	defer consumerCancel()

	wg := &sync.WaitGroup{}
	for _, c := range []*Consumer{consumer, consumer2} {
		wg.Add(1)
		go func(c *Consumer) {
			// Pair every Add with a Done; without it Wait can never return,
			// even when a consumer fails to start.
			defer wg.Done()
			startConsumer(ctx, []string{"ffcs"}, c)
		}(c)
	}
	wg.Wait()
}


// Setup closes the ready channel to mark the consumer as ready and returns nil.
// The session argument is not used.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}
// Cleanup does no work and always returns nil. The session argument is not used.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim logs every message delivered on the claim and marks it as
// consumed. It returns nil when the claim's message channel is closed or when
// the session context is done.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// The claim was released; nothing more will arrive.
				return nil
			}
			logs.Info("ConsumerName:%s Message claimed: value = %s, topic = %s,partition=%d", consumer.consumerName, string(message.Value), message.Topic, message.Partition)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Leave promptly when the session ends (e.g. on a rebalance)
			// rather than waiting for the message channel to close.
			return nil
		}
	}
}

// startConsumer joins the "test-group" consumer group and consumes the given
// topics with consumer as the handler until ctx is cancelled. It returns early
// if the consumer group cannot be created.
func startConsumer(ctx context.Context, topics []string, consumer *Consumer) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2
	// SASL authentication.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "admin"
	config.Net.SASL.Password = "admin"

	consumerGroup, err := sarama.NewConsumerGroup([]string{"192.168.67.142:9092"}, "test-group", config)
	if err != nil {
		fmt.Printf("NewConsumerGroup create consumer error %s\n", err.Error())
		return
	}
	// Leave the group and release its connections on the way out.
	defer func() {
		if err := consumerGroup.Close(); err != nil {
			fmt.Printf("Error closing consumer group: %v\n", err)
		}
	}()

	// Capture the channel before the goroutine starts replacing it, so the
	// wait below does not race with the reassignment in the loop.
	ready := consumer.ready

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume returns when a session ends, so it is called in a loop.
			// Passing ctx (not context.Background()) lets cancellation stop
			// the session.
			if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
				fmt.Printf("Error from consumer: %v", err)
			}
			// Stop looping once the caller has cancelled; otherwise the
			// wg.Wait below would block forever.
			if ctx.Err() != nil {
				return
			}
			// Setup closes ready each session, so a fresh channel is needed
			// before the next Consume call.
			consumer.ready = make(chan bool)
		}
	}()

	// Wait for the first session to be set up, but do not hang if the context
	// is cancelled before that ever happens.
	select {
	case <-ready:
	case <-ctx.Done():
	}

	<-ctx.Done()
	logs.Info("kafka terminating: context cancelled")

	wg.Wait()
}

5. 自定义分区器

 // myPartitioner is a custom partitioner; its Partition and
 // RequiresConsistency methods are defined in this file.
 type myPartitioner struct {
      // partition is the partition number Partition starts from when choosing
      // where a message goes.
      partition int32
  }

  // Partition chooses the partition for message. A message whose encoded value
  // contains "chan" goes to partition 1; every other message goes to
  // p.partition. A choice outside [0, numPartitions) falls back to partition 0.
  // An error from encoding the value is returned with partition 0.
  func (p *myPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
      target := p.partition
      // A message may carry no value; there is nothing to inspect then.
      if message.Value != nil {
          encoded, err := message.Value.Encode()
          if err != nil {
              return 0, err
          }
          // Decide for this message only. Mutating p.partition here would send
          // the matching message to the old partition and all later ones to 1.
          if strings.Contains(string(encoded), "chan") {
              target = 1
          }
      }
      // Keep the result inside the topic's partition range, e.g. when the
      // topic has a single partition.
      if target < 0 || target >= numPartitions {
          target = 0
      }
      return target, nil
  }

  // RequiresConsistency always reports false.
  func (*myPartitioner) RequiresConsistency() bool {
      const consistent = false
      return consistent
  }

  // NewMyPartitioner returns a new zero-valued myPartitioner as a
  // sarama.Partitioner. The topic argument is not used.
  func NewMyPartitioner(topic string) sarama.Partitioner {
      var partitioner sarama.Partitioner = new(myPartitioner)
      return partitioner
  }

// main builds a producer configuration that waits for all acknowledgements and
// installs NewMyPartitioner as the partitioner constructor.
func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	// Use the custom partitioner.
	cfg.Producer.Partitioner = NewMyPartitioner
}


版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/binter12138/article/details/124508049