go使用rocketmq

下载

go get -u -v github.com/apache/rocketmq-client-go/v2

初始化

type MqConf struct {
	NameServers []string `mapstructure:"nameServers"`
}

var (
	MqProducer            rocketmq.Producer
	MqPushConsumerSuccess rocketmq.PushConsumer
	MqPushConsumerFail    rocketmq.PushConsumer
	MqPushConsumerDelay   rocketmq.PushConsumer
)

const (
	MqRetryTimes = 3
)

func InitMq(mqConf *MqConf) {
	if mqConf == nil {
		panic("mq config is nil")
		return
	}
	var err error
	MqProducer, err = rocketmq.NewProducer(
		producer.WithGroupName("notify_producer"),
		producer.WithNameServer(mqConf.NameServers),
		producer.WithRetry(MqRetryTimes),
	)
	if err != nil {
		panic(fmt.Sprintf("init rocket mq producer err:%v", err))
		return
	}

	err = MqProducer.Start()
	if err != nil {
		panic(fmt.Sprintf("producer mq start err:%v", err))
		return
	}

	MqPushConsumerSuccess, err = rocketmq.NewPushConsumer(
		consumer.WithGroupName("notify_consumer_success"),
		consumer.WithNameServer(mqConf.NameServers),
	)
	if err != nil {
		panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
		return
	}

	MqPushConsumerFail, err = rocketmq.NewPushConsumer(
		consumer.WithGroupName("notify_consumer_fail"),
		consumer.WithNameServer(mqConf.NameServers),
	)
	if err != nil {
		panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
		return
	}

	MqPushConsumerDelay, err = rocketmq.NewPushConsumer(
		consumer.WithGroupName("notify_consumer_delay"),
		consumer.WithNameServer(mqConf.NameServers),
	)
	if err != nil {
		panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
		return
	}


}

func ShutDownMq() {
	_ = MqProducer.Shutdown()
	_ = MqPushConsumerSuccess.Shutdown()
	_ = MqPushConsumerFail.Shutdown()
	_ = MqPushConsumerDelay.Shutdown()
}

这里生产者只实例化了一个,然后可以启动了。

消费者如果是多个topic或者多个tag需要实例化多个

即:同一group下的所有消费者应该订阅的topic和tag都是相同的

同一group下的topic相同,但tag订阅不同也是不行的

否则会导致rebalance导致consumer只消费一部分topic

消费者可以订阅多个topic或者tag,但是同一组内的订阅的关系必须一致

只实例化一次即可,定义全局变量或者单例都可以

订阅代码

func SubScribe() {
	var err error
	err = conf.MqPushConsumerSuccess.Subscribe(include.TopicNotifySucess, consumer.MessageSelector{}, callBackSucc)
	if err != nil {
		panic(fmt.Sprintf("consumer subscribe TopicNotifySucess err:%v", err))
		return
	}

	err = conf.MqPushConsumerFail.Subscribe(include.TopicNotifyFail, consumer.MessageSelector{}, callBackFail)
	if err != nil {
		panic(fmt.Sprintf("consumer subscribe TopicNotifyFail err:%v", err))
		return
	}

	err = conf.MqPushConsumerDelay.Subscribe(include.TopicNotifyDelay, consumer.MessageSelector{}, callBackTimeOut)
	if err != nil {
		panic(fmt.Sprintf("consumer subscribe TopicNotifyDelay err:%v", err))
		return
	}

	err = conf.MqPushConsumerSuccess.Start()
	if err != nil {
		panic(fmt.Sprintf("MqPushConsumerSuccess start err:%v", err))
		return
	}

	err = conf.MqPushConsumerFail.Start()
	if err != nil {
		panic(fmt.Sprintf("MqPushConsumerFail start err:%v", err))
		return
	}

	err = conf.MqPushConsumerDelay.Start()
	if err != nil {
		panic(fmt.Sprintf("MqPushConsumerDelay start err:%v", err))
		return
	}

}
func callBackSucc(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

	for i := range msgs {
        fmt.Println(string(msgs[i].Body))
	}
	return consumer.ConsumeSuccess, nil
}

func callBackFail(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

	for i := range msgs {
		fmt.Println(string(msgs[i].Body))
	}
	return consumer.ConsumeSuccess, nil
}

func callBackTimeOut(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

	for i := range msgs {
		fmt.Println(string(msgs[i].Body))
	}
	return consumer.ConsumeSuccess, nil
}

1.必须要先订阅再start, 如果启动报错,topic 未被发现,可以尝试手动创建该topic

2.rocketmq可能会存在重复的消息,所以需要消费的回调函数里,保持幂等

发送消息

const (
	RetryTime = 3
	SleepTime = time.Millisecond * 10
)

func SendSuccNotifyByMq(taskID string) (err error) {
	msg := primitive.NewMessage(include.TopicNotifySucess, []byte(taskID))
	//msg.WithTag(include.TagSuccNotify)
	for i := 0; i < RetryTime; i++ {
		res, err := conf.MqProducer.SendSync(context.Background(), msg)
		if err != nil {
			time.Sleep(SleepTime)
			continue
		}
		break
	}
	return

}

func SendFailNotifyByMq(taskID string) (err error) {
	msg := primitive.NewMessage(include.TopicNotifyFail, []byte(taskID))
	//msg.WithTag(include.TagFailNotify) 如果要发送带tag的加上这个
	for i := 0; i < RetryTime; i++ {
        res, err := conf.MqProducer.SendSync(context.Background(), msg)
	    if err != nil {
			time.Sleep(SleepTime)
			continue
		}
		break
	}
	return
}

//发送延时消息
func SendDelayNotifyByMq(taskID string) (err error) {
	msg := primitive.NewMessage(include.TopicNotifyDelay, []byte(taskID))
	msg.WithDelayTimeLevel(conf.C.DelayTime)
	for i := 0; i < RetryTime; i++ {
		res, err := conf.MqProducer.SendSync(context.Background(), msg)
		if err != nil {
			time.Sleep(SleepTime)
			continue
		}
		break
	}
	return
}

1.加上尝试,会有发送失败的情况

2.延时消息分等级#1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h  

1就是1s 2就是5s ,依次类推,不能指定自己的时间

测试例子

//func TestMq(t *testing.T) {
//
//	var err error
//	MqProducer, err := rocketmq.NewProducer(
//		producer.WithGroupName("notify_test"),
//		producer.WithNameServer([]string{"ip:port"}),
//		producer.WithRetry(3),
//	)
//	if err != nil {
//		panic(fmt.Sprintf("init rocket mq producer err:%v", err))
//		return
//	}
//
//	err = MqProducer.Start()
//	if err != nil {
//		panic(fmt.Sprintf("producer mq start err:%v", err))
//		return
//	}
//	ch := make(chan struct{}, 3)
//	defer MqProducer.Shutdown()
    //发送带tag的消息
//	msg := primitive.NewMessage("test_notify_topic", []byte("111111111111"))
//	//msg.WithTag(include.TagSuccNotify)
//  同步发送
//	res, err := MqProducer.SendSync(context.Background(), msg)
//
//	fmt.Println(res, err)
//
//	msg = primitive.NewMessage("test_notify_topic_1", []byte("22222222222222"))
//	//msg.WithTag(include.TagFailNotify)
//
//	res, err = MqProducer.SendSync(context.Background(), msg)
//
//	fmt.Println(res, err)
//  发送延时消息
//	msg = primitive.NewMessage("test_notify_topic_delay", []byte("delay msg"))
//	//msg.WithTag(include.TagFailNotify)
//	msg.WithDelayTimeLevel(2)
//	res, err = MqProducer.SendSync(context.Background(), msg)
//
//	fmt.Println(res, err)
//  如果是同机器 同一组下的多个消费者,需要不同的实例化名字,不同机器的同一组不需要,默认会以机器
ip@port有关
//	MqPushConsumer1, err := rocketmq.NewPushConsumer(
//		//consumer.WithInstance("mq1"),
//		consumer.WithGroupName("notify_test1"),
//		consumer.WithNameServer([]string{"192.168.11.8:9876"}),
//	)
//
//	MqPushConsumer2, err := rocketmq.NewPushConsumer(
//		//consumer.WithInstance("mq2"),
//		consumer.WithGroupName("notify_test2"),
//		consumer.WithNameServer([]string{"192.168.11.8:9876"}),
//	)
//
//	MqPushConsumer3, err := rocketmq.NewPushConsumer(
//		//consumer.WithInstance("mq2"),
//		consumer.WithGroupName("notify_test3"),
//		consumer.WithNameServer([]string{"192.168.11.8:9876"}),
//	)
//	if err != nil {
//		panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
//		return
//	}
//  取带该选择器的tag的消息,会过滤掉其他的
//	selector := consumer.MessageSelector{
//		Type:       consumer.TAG,
//		//Expression: include.TagSuccNotify,
//	}
//	err = MqPushConsumer1.Subscribe("test_notify_topic", selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
//		for i := range msgs {
//			fmt.Println("11111111111111111", msgs[i])

//		}
//		ch <- struct{}{}
//		return consumer.ConsumeSuccess, nil
//	})
//	if err != nil {
//		panic(fmt.Sprintf("consumer subscribe succ notify tag err:%v", err))
//		return
//	}
//
//	selector = consumer.MessageSelector{
//		Type:       consumer.TAG,
//		//Expression: include.TagFailNotify,
//	}
//	err = MqPushConsumer2.Subscribe("test_notify_topic_1", selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
//		for i := range msgs {
//			//conf.Logger.Info("monitor notify produce suc msg, job id:%v, task id:%v, redo times:%v, msg:%v", taskInfo.JobID.Hex(), taskID.Hex(), taskInfo.Redo, msgs[i])
//			fmt.Println("22222222222222", msgs[i])
//			//conf.Logger.Debug("call back succ recv message", "msg", msgs[i])
//		}
//		ch <- struct{}{}
//		return consumer.ConsumeSuccess, nil
//	})
//	if err != nil {
//		panic(fmt.Sprintf("consumer subscribe fail notify tag err:%v", err))
//		return
//	}
//
//	selector = consumer.MessageSelector{
//		//Type:       consumer.TAG,
//		//Expression: include.TagFailNotify,
//	}
//	err = MqPushConsumer3.Subscribe("test_notify_topic_delay", selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
//		for i := range msgs {
//			fmt.Println("333333333333333", msgs[i])
//		}
//		ch <- struct{}{}
//		return consumer.ConsumeSuccess, nil
//	})
//	if err != nil {
//		panic(fmt.Sprintf("consumer subscribe fail notify tag err:%v", err))
//		return
//	}
//
//	err = MqPushConsumer1.Start()
//	err = MqPushConsumer2.Start()
//	err = MqPushConsumer3.Start()
//
//	if err != nil {
//		panic(fmt.Sprintf("mq push consumer err:%v", err))
//		return
//	}
//
//	defer MqPushConsumer1.Shutdown()
//	defer MqPushConsumer2.Shutdown()
//	defer MqPushConsumer3.Shutdown()
//
//	<-ch
//	<-ch
//	<-ch
//}
//
func callBackSucc(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

	for i := range msgs {
		fmt.Println(msgs[i])
	}
	return consumer.ConsumeSuccess, nil
}

func callBackFail(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {


	for i := range msgs {
		fmt.Println(msgs[i])
	}
	return consumer.ConsumeSuccess, nil
}

其他

更多的例子参考

https://github.com/apache/rocketmq-client-go/tree/native


版权声明:本文为qq_28119741原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。