Using RocketMQ in Golang

This article demonstrates how to work with RocketMQ from Golang.

Downloading and installing the client package

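The go get command uses a version control tool to fetch or update a package and its dependencies from a remote repository. In older, GOPATH-based Go versions it also compiled and installed the package, so internally it worked in two steps: first download the source, then run go install. Since Go 1.18, go get in module mode only downloads the code and records the dependency in go.mod; it no longer builds or installs anything. Either way, the process is as simple as installing an app. Run the following command to download the client: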

go get github.com/apache/rocketmq-client-go/v2 

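The command runs as shown in the figure:
[Figure: output of the go get command]
The RocketMQ client is now installed, so we can write code against the API that the rocketmq package provides. In a real project, the dependency is usually declared by adding the following to the go.mod file: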

require (
    github.com/apache/rocketmq-client-go/v2 v2.1.0
)

Writing the code

package main

import (
   "context"
   "fmt"
   "os"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/admin"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)

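func main() {
   // 1. Create the topic. This step is optional: if the topic does not exist when a message is sent, it is created then.
   CreateTopic("testTopic01")
   // 2. The producer sends a message to the topic
   SendSyncMessage("hello world2022send test ,rocketmq go client!  too,是的")
   // 3. The consumer subscribes to the topic and consumes the message
   SubcribeMessage()
}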

func CreateTopic(topicName string) {
   // Name server address
   endPoint := []string{"192.168.120.78:9876"}
   // Create an admin client that locates the brokers through the name server
   testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(endPoint)))
   if err != nil {
      fmt.Printf("connection error: %s\n", err.Error())
      // Without an admin client there is nothing to call CreateTopic on
      return
   }
   // Create the topic
   err = testAdmin.CreateTopic(context.Background(), admin.WithTopicCreate(topicName))
   if err != nil {
      fmt.Printf("createTopic error: %s\n", err.Error())
   }
}

func SendSyncMessage(message string) {
   // Name server address
   endPoint := []string{"192.168.120.78:9876"}
   // Create a producer instance
   p, err := rocketmq.NewProducer(
      producer.WithNameServer(endPoint),
      producer.WithRetry(2),
      producer.WithGroupName("ProducerGroupName"),
   )
   if err != nil {
      fmt.Printf("create producer error: %s\n", err.Error())
      os.Exit(1)
   }
   // Start the producer
   err = p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s\n", err.Error())
      os.Exit(1)
   }

   // Send the message synchronously and wait for the broker's reply
   result, err := p.SendSync(context.Background(), &primitive.Message{
      Topic: "testTopic01",
      Body:  []byte(message),
   })

   if err != nil {
      fmt.Printf("send message error: %s\n", err.Error())
   } else {
      fmt.Printf("send message success: result=%s\n", result.String())
   }
}

func SubcribeMessage() {
   // Name server address
   endPoint := []string{"192.168.120.78:9876"}
   // Create a push consumer instance
   c, err := rocketmq.NewPushConsumer(consumer.WithNameServer(endPoint),
      consumer.WithConsumerModel(consumer.Clustering),
      consumer.WithGroupName("ConsumerGroupName"),
   )
   if err != nil {
      fmt.Printf("create consumer error: %s\n", err.Error())
      os.Exit(1)
   }

   // Subscribe to the topic; the callback runs for each batch of delivered messages
   err = c.Subscribe("testTopic01", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      for i := range msgs {
         fmt.Printf("subscribe callback : %v \n", msgs[i])
      }
      return consumer.ConsumeSuccess, nil
   })

   if err != nil {
      fmt.Printf("subscribe message error: %s\n", err.Error())
   }

   // Start the consumer
   err = c.Start()

   if err != nil {
      fmt.Printf("consumer start error: %s\n", err.Error())
      os.Exit(1)
   }

   // Shut down right away. This demo leaves only a brief window for messages to
   // arrive; a long-running service would block here before shutting down.
   err = c.Shutdown()
   if err != nil {
      fmt.Printf("shutdown Consumer error: %s\n", err.Error())
   }
}

Run results

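We can run the code with go run followed by the file name, run the main function directly from the IDE, or set up a run configuration that points at the entry file, much like configuring a startup class. The run log is as follows: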

GOROOT=/usr/local/go #gosetup
GOPATH=/Users/dxm/go #gosetup
/usr/local/go/bin/go build -o /private/var/folders/bw/xvcy7d7j7nscgrtsbk1lmc500000gn/T/GoLand/___go_build_awesomeProject awesomeProject #gosetup
/private/var/folders/bw/xvcy7d7j7nscgrtsbk1lmc500000gn/T/GoLand/___go_build_awesomeProject
ERRO[0005] create topic error                            broker="192.168.120.78:10911" topic=testTopic01 underlayError="request timeout"
createTopic error: request timeout
WARN[0005] query topic route from server error           underlayError="topic not exist"
WARN[0005] queryTopicRouteInfoFromServer return nil      topic=testTopic01
INFO[0005] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic=testTopic01
send message seccess: result=SendResult [sendStatus=0, msgIds=C0A8784EB39C0000000074f884e00001, offsetMsgId=C0A8784E00002A9D000000000000021E, queueOffset=0, messageQueue=MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]]
INFO[0005] the consumer start beginning                  consumerGroup=ConsumerGroupName messageModel=Clustering unitMode=false
WARN[0005] query topic route from server error           underlayError="topic not exist"
WARN[0005] queryTopicRouteInfoFromServer return nil      topic=testTopic01
INFO[0005] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":1,\"writeQueueNums\":1,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic="%RETRY%ConsumerGroupName"
INFO[0005] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" topic=testTopic01
INFO[0005] receive broker's notification to consumer group  consumerGroup=ConsumerGroupName
INFO[0005] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":1,\"writeQueueNums\":1,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic="%RETRY%ConsumerGroupName"
WARN[0005] delete mq from offset table                   MessageQueue="MessageQueue [topic=%RETRY%ConsumerGroupName, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=%RETRY%ConsumerGroupName, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] the MessageQueue changed, version also updated  changeTo=1655975244827267000 changedFrom=0
INFO[0005] The PullThresholdForTopic is changed          changeTo=102400 changedFrom=102400
INFO[0005] The PullThresholdSizeForTopic is changed      changeTo=51200 changedFrom=51200
INFO[0005] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"192.168.120.78:10909\"}}]}" changedFrom="<nil>" topic=testTopic01
WARN[0005] delete mq from offset table                   MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]" consumerGroup=ConsumerGroupName offset=0
WARN[0005] delete mq from offset table                   MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=2]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=2]" consumerGroup=ConsumerGroupName offset=0
WARN[0005] delete mq from offset table                   MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=3]" consumerGroup=ConsumerGroupName
subscribe callback : [Message=[topic=testTopic01, body=hello world2022send test ,rocketmq go client!  too, Flag=0, properties=map[CONSUME_START_TIME:1655975244853 MAX_FFSET:1 MIN_OFFSET:0 UNIQ_KEY:C0A8784EB39C0000000074f884e00001], TransactionId=], MsgId=C0A8784EB39C0000000074f884e00001, OffsetMsgId=C0A8784E00002A9D000000000000021E,QueueId=1, StoreSize=196, QueueOffset=0, SysFlag=0, BornTimestamp=1655975244765, BornHost=172.23.0.1:60480, StoreTimestamp=1655975244804, StoreHost=192.168.120.78:10909, CommitLogOffset=542, BodyCRC=1591959001, ReconsumeTimes=0, PreparedTransactionOffset=0] 
WARN[0005] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=3]" consumerGroup=ConsumerGroupName offset=0
WARN[0005] delete mq from offset table                   MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName
WARN[0005] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] the MessageQueue changed, version also updated  changeTo=1655975244861143000 changedFrom=0
INFO[0005] The PullThresholdForTopic is changed          changeTo=20480 changedFrom=102400
INFO[0005] The PullThresholdSizeForTopic is changed      changeTo=10240 changedFrom=51200
INFO[0005] push consumer close pullConsumer listener.    consumerGroup=ConsumerGroupName
INFO[0005] update offset to broker success               MessageQueue="MessageQueue [topic=%RETRY%ConsumerGroupName, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] update offset to broker success               MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=1]" consumerGroup=ConsumerGroupName offset=1
INFO[0005] update offset to broker success               MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=2]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] update offset to broker success               MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=3]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] update offset to broker success               MessageQueue="MessageQueue [topic=testTopic01, brokerName=broker-a, queueId=0]" consumerGroup=ConsumerGroupName offset=0
INFO[0005] will remove client from clientMap             clientID=192.168.120.78@45980

Process finished with the exit code 0

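The log contains one ERRO line and many WARN lines, yet the program actually ran successfully. The ERRO line comes from the explicit CreateTopic call, whose request to the broker timed out. That call is not needed in practice: when a message is sent to a topic that does not exist, the topic is created automatically, provided automatic topic creation (autoCreateTopicEnable) is turned on in the broker configuration. (Note: some cloud providers' MQ services require topics to be created manually; the CreateTopic function is not needed there either, so it appears here purely as a demo.)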
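
The "topic route info changed" lines in the log carry the route as JSON, including a perm field that is 7 in one entry and 6 in others. As a minimal sketch, the program below parses one of those payloads and spells out the perm bits, assuming the bit values of RocketMQ's permission constants (read = 4, write = 2, inherit = 1). The type and function names (queueData, topicRoute, parseRoute, describePerm) are invented for this example and are not part of the client library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Permission bits, assumed to match RocketMQ's permission constants.
const (
	permInherit = 1 << 0
	permWrite   = 1 << 1
	permRead    = 1 << 2
)

// queueData mirrors the fields of one queueDatas entry in the logged JSON.
type queueData struct {
	BrokerName     string `json:"brokerName"`
	ReadQueueNums  int    `json:"readQueueNums"`
	WriteQueueNums int    `json:"writeQueueNums"`
	Perm           int    `json:"perm"`
}

// topicRoute keeps only the part of the route this sketch looks at.
type topicRoute struct {
	QueueDatas []queueData `json:"queueDatas"`
}

// parseRoute decodes a route payload as it appears in the changeTo field.
func parseRoute(raw string) (topicRoute, error) {
	var r topicRoute
	err := json.Unmarshal([]byte(raw), &r)
	return r, err
}

// describePerm turns the perm bit mask into a readable string.
func describePerm(perm int) string {
	var parts []string
	if perm&permRead != 0 {
		parts = append(parts, "read")
	}
	if perm&permWrite != 0 {
		parts = append(parts, "write")
	}
	if perm&permInherit != 0 {
		parts = append(parts, "inherit")
	}
	if len(parts) == 0 {
		return "none"
	}
	return strings.Join(parts, "+")
}

func main() {
	// Payload copied from the first "topic route info changed" log line.
	raw := `{"OrderTopicConf":"","queueDatas":[{"brokerName":"broker-a","readQueueNums":4,"writeQueueNums":4,"perm":7,"topicSynFlag":0}],"brokerDatas":[{"cluster":"DefaultCluster","brokerName":"broker-a","brokerAddrs":{"0":"192.168.120.78:10909"}}]}`

	route, err := parseRoute(raw)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, q := range route.QueueDatas {
		fmt.Printf("%s: perm=%d (%s)\n", q.BrokerName, q.Perm, describePerm(q.Perm))
	}
	// prints "broker-a: perm=7 (read+write+inherit)"
}
```

Under the same assumption, the perm value 6 seen in the later log lines reads as read+write.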

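Let's look at the topic in the console:
[Figure: the topic shown in the console]
Next, let's look at the message:
[Figure: the message shown in the console]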
That completes the example of using RocketMQ from Golang.


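Copyright notice: This is an original article by the blogger, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Article link: https://blog.csdn.net/weixin_44798288/article/details/125522934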