rocketmq
安装
推荐一下这个大佬的安装方法
https://www.cnblogs.com/chenyuanbo/p/15485504.html
实战
接收
Consumer
package com.changan;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @program: t139springcloudalibaba
* @description:
* @author: Mr.shen
* @create: 2022-06-25 10:55
**/
public class Consumer {
public static void main(String[] args) throws Exception {
// TODO 1 创建消费者,指定所属的消费者组名
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("jack-consumer-group");
// TODO 2 指定NameServer的地址
defaultMQPushConsumer.setNamesrvAddr("119.23.176.114:9876");
// TODO 3 指定消费者订阅的主题和标签
defaultMQPushConsumer.subscribe("jack-topic", "*");
// TODO 4 进行订阅:注册回调函数,编写处理消息的逻辑
defaultMQPushConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
// 1 try catch(throwable)确保不会因为业务逻辑的异常,导致消息出现重复消费的现象
// 2 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest.run()中会对Throwable进行捕获,
// 并且返回ConsumeConcurrentlyStatus.RECONSUME_LATER
try {
System.out.println("收到消息--》" + list);
// 模拟业务异常
int i = 1 / 0;
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// TODO 5 启动消费者
defaultMQPushConsumer.start();
System.out.println("消费者启动成功。。。");
}
}
发送消息
SyncProducer
package com.changan;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @program: t139springcloudalibaba
* @description:
* @author: Mr.shen
* @create: 2022-06-25 10:53
**/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// TODO 1 创建消息生产者,指定生成组名
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("jack-producer-group");
// TODO 2 指定NameServer的地址
defaultMQProducer.setNamesrvAddr("119.23.176.114:9876");
// TODO 3 启动生产者
defaultMQProducer.start();
// TODO 4 构建消息对象,主要是设置消息的主题、标签、内容
Message message = new Message("jack-topic", "jack-tag", ("测试消息发送 --Jack").getBytes());
// TODO 5 发送消息
SendResult result = defaultMQProducer.send(message);
System.out.println("SendResult-->" + result);
// TODO 6 关闭生产者
defaultMQProducer.shutdown();
}
}
版权声明:本文为weixin_50098749原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。