下载
go get -u -v github.com/apache/rocketmq-client-go/v2
初始化
type MqConf struct {
NameServers []string `mapstructure:"nameServers"`
}
var (
MqProducer rocketmq.Producer
MqPushConsumerSuccess rocketmq.PushConsumer
MqPushConsumerFail rocketmq.PushConsumer
MqPushConsumerDelay rocketmq.PushConsumer
)
const (
MqRetryTimes = 3
)
func InitMq(mqConf *MqConf) {
if mqConf == nil {
panic("mq config is nil")
return
}
var err error
MqProducer, err = rocketmq.NewProducer(
producer.WithGroupName("notify_producer"),
producer.WithNameServer(mqConf.NameServers),
producer.WithRetry(MqRetryTimes),
)
if err != nil {
panic(fmt.Sprintf("init rocket mq producer err:%v", err))
return
}
err = MqProducer.Start()
if err != nil {
panic(fmt.Sprintf("producer mq start err:%v", err))
return
}
MqPushConsumerSuccess, err = rocketmq.NewPushConsumer(
consumer.WithGroupName("notify_consumer_success"),
consumer.WithNameServer(mqConf.NameServers),
)
if err != nil {
panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
return
}
MqPushConsumerFail, err = rocketmq.NewPushConsumer(
consumer.WithGroupName("notify_consumer_fail"),
consumer.WithNameServer(mqConf.NameServers),
)
if err != nil {
panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
return
}
MqPushConsumerDelay, err = rocketmq.NewPushConsumer(
consumer.WithGroupName("notify_consumer_delay"),
consumer.WithNameServer(mqConf.NameServers),
)
if err != nil {
panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
return
}
}
func ShutDownMq() {
_ = MqProducer.Shutdown()
_ = MqPushConsumerSuccess.Shutdown()
_ = MqPushConsumerFail.Shutdown()
_ = MqPushConsumerDelay.Shutdown()
}
这里生产者只实例化了一个,然后可以启动了。
消费者如果是多个topic或者多个tag需要实例化多个
即:同一group下的所有消费者应该订阅的topic和tag都是相同的
同一group下的topic相同,但tag订阅不同也是不行的
否则会导致rebalance导致consumer只消费一部分topic
消费者可以订阅多个topic或者tag,但是同一组内的订阅的关系必须一致
只实例化一次即可,定义全局变量或者单例都可以
订阅代码
func SubScribe() {
var err error
err = conf.MqPushConsumerSuccess.Subscribe(include.TopicNotifySucess, consumer.MessageSelector{}, callBackSucc)
if err != nil {
panic(fmt.Sprintf("consumer subscribe TopicNotifySucess err:%v", err))
return
}
err = conf.MqPushConsumerFail.Subscribe(include.TopicNotifyFail, consumer.MessageSelector{}, callBackFail)
if err != nil {
panic(fmt.Sprintf("consumer subscribe TopicNotifyFail err:%v", err))
return
}
err = conf.MqPushConsumerDelay.Subscribe(include.TopicNotifyDelay, consumer.MessageSelector{}, callBackTimeOut)
if err != nil {
panic(fmt.Sprintf("consumer subscribe TopicNotifyDelay err:%v", err))
return
}
err = conf.MqPushConsumerSuccess.Start()
if err != nil {
panic(fmt.Sprintf("MqPushConsumerSuccess start err:%v", err))
return
}
err = conf.MqPushConsumerFail.Start()
if err != nil {
panic(fmt.Sprintf("MqPushConsumerFail start err:%v", err))
return
}
err = conf.MqPushConsumerDelay.Start()
if err != nil {
panic(fmt.Sprintf("MqPushConsumerDelay start err:%v", err))
return
}
}
func callBackSucc(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Println(string(msgs[i].Body))
}
return consumer.ConsumeSuccess, nil
}
func callBackFail(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Println(string(msgs[i].Body))
}
return consumer.ConsumeSuccess, nil
}
func callBackTimeOut(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Println(string(msgs[i].Body))
}
return consumer.ConsumeSuccess, nil
}
1.必须要先订阅再start, 如果启动报错,topic 未被发现,可以尝试手动创建该topic
2.rocketmq可能会存在重复的消息,所以需要消费的回调函数里,保持幂等
发送消息
const (
RetryTime = 3
SleepTime = time.Millisecond * 10
)
func SendSuccNotifyByMq(taskID string) (err error) {
msg := primitive.NewMessage(include.TopicNotifySucess, []byte(taskID))
//msg.WithTag(include.TagSuccNotify)
for i := 0; i < RetryTime; i++ {
res, err := conf.MqProducer.SendSync(context.Background(), msg)
if err != nil {
time.Sleep(SleepTime)
continue
}
break
}
return
}
func SendFailNotifyByMq(taskID string) (err error) {
msg := primitive.NewMessage(include.TopicNotifyFail, []byte(taskID))
//msg.WithTag(include.TagFailNotify) 如果要发送带tag的加上这个
for i := 0; i < RetryTime; i++ {
res, err := conf.MqProducer.SendSync(context.Background(), msg)
if err != nil {
time.Sleep(SleepTime)
continue
}
break
}
return
}
//发送延时消息
func SendDelayNotifyByMq(taskID string) (err error) {
msg := primitive.NewMessage(include.TopicNotifyDelay, []byte(taskID))
msg.WithDelayTimeLevel(conf.C.DelayTime)
for i := 0; i < RetryTime; i++ {
res, err := conf.MqProducer.SendSync(context.Background(), msg)
if err != nil {
time.Sleep(SleepTime)
continue
}
break
}
return
}
1.加上尝试,会有发送失败的情况
2.延时消息分等级#1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1就是1s 2就是5s ,依次类推,不能指定自己的时间
测试例子
//func TestMq(t *testing.T) {
//
// var err error
// MqProducer, err := rocketmq.NewProducer(
// producer.WithGroupName("notify_test"),
// producer.WithNameServer([]string{"ip:port"}),
// producer.WithRetry(3),
// )
// if err != nil {
// panic(fmt.Sprintf("init rocket mq producer err:%v", err))
// return
// }
//
// err = MqProducer.Start()
// if err != nil {
// panic(fmt.Sprintf("producer mq start err:%v", err))
// return
// }
// ch := make(chan struct{}, 3)
// defer MqProducer.Shutdown()
//发送带tag的消息
// msg := primitive.NewMessage("test_notify_topic", []byte("111111111111"))
// //msg.WithTag(include.TagSuccNotify)
// 同步发送
// res, err := MqProducer.SendSync(context.Background(), msg)
//
// fmt.Println(res, err)
//
// msg = primitive.NewMessage("test_notify_topic_1", []byte("22222222222222"))
// //msg.WithTag(include.TagFailNotify)
//
// res, err = MqProducer.SendSync(context.Background(), msg)
//
// fmt.Println(res, err)
// 发送延时消息
// msg = primitive.NewMessage("test_notify_topic_delay", []byte("delay msg"))
// //msg.WithTag(include.TagFailNotify)
// msg.WithDelayTimeLevel(2)
// res, err = MqProducer.SendSync(context.Background(), msg)
//
// fmt.Println(res, err)
// 如果是同机器 同一组下的多个消费者,需要不同的实例化名字,不同机器的同一组不需要,默认会以机器
ip@port有关
// MqPushConsumer1, err := rocketmq.NewPushConsumer(
// //consumer.WithInstance("mq1"),
// consumer.WithGroupName("notify_test1"),
// consumer.WithNameServer([]string{"192.168.11.8:9876"}),
// )
//
// MqPushConsumer2, err := rocketmq.NewPushConsumer(
// //consumer.WithInstance("mq2"),
// consumer.WithGroupName("notify_test2"),
// consumer.WithNameServer([]string{"192.168.11.8:9876"}),
// )
//
// MqPushConsumer3, err := rocketmq.NewPushConsumer(
// //consumer.WithInstance("mq2"),
// consumer.WithGroupName("notify_test3"),
// consumer.WithNameServer([]string{"192.168.11.8:9876"}),
// )
// if err != nil {
// panic(fmt.Sprintf("init rocket mq push consumer err:%v", err))
// return
// }
// 取带该选择器的tag的消息,会过滤掉其他的
// selector := consumer.MessageSelector{
// Type: consumer.TAG,
// //Expression: include.TagSuccNotify,
// }
// err = MqPushConsumer1.Subscribe("test_notify_topic", selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
// for i := range msgs {
// fmt.Println("11111111111111111", msgs[i])
// }
// ch <- struct{}{}
// return consumer.ConsumeSuccess, nil
// })
// if err != nil {
// panic(fmt.Sprintf("consumer subscribe succ notify tag err:%v", err))
// return
// }
//
// selector = consumer.MessageSelector{
// Type: consumer.TAG,
// //Expression: include.TagFailNotify,
// }
// err = MqPushConsumer2.Subscribe("test_notify_topic_1", selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
// for i := range msgs {
// //conf.Logger.Info("monitor notify produce suc msg, job id:%v, task id:%v, redo times:%v, msg:%v", taskInfo.JobID.Hex(), taskID.Hex(), taskInfo.Redo, msgs[i])
// fmt.Println("22222222222222", msgs[i])
// //conf.Logger.Debug("call back succ recv message", "msg", msgs[i])
// }
// ch <- struct{}{}
// return consumer.ConsumeSuccess, nil
// })
// if err != nil {
// panic(fmt.Sprintf("consumer subscribe fail notify tag err:%v", err))
// return
// }
//
// selector = consumer.MessageSelector{
// //Type: consumer.TAG,
// //Expression: include.TagFailNotify,
// }
// err = MqPushConsumer3.Subscribe("test_notify_topic_delay", selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
// for i := range msgs {
// fmt.Println("333333333333333", msgs[i])
// }
// ch <- struct{}{}
// return consumer.ConsumeSuccess, nil
// })
// if err != nil {
// panic(fmt.Sprintf("consumer subscribe fail notify tag err:%v", err))
// return
// }
//
// err = MqPushConsumer1.Start()
// err = MqPushConsumer2.Start()
// err = MqPushConsumer3.Start()
//
// if err != nil {
// panic(fmt.Sprintf("mq push consumer err:%v", err))
// return
// }
//
// defer MqPushConsumer1.Shutdown()
// defer MqPushConsumer2.Shutdown()
// defer MqPushConsumer3.Shutdown()
//
// <-ch
// <-ch
// <-ch
//}
//
func callBackSucc(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Println(msgs[i])
}
return consumer.ConsumeSuccess, nil
}
func callBackFail(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Println(msgs[i])
}
return consumer.ConsumeSuccess, nil
}
其他
更多的例子参考
版权声明:本文为qq_28119741原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。