最近要接入rocketMQ,公司是在阿里云上买的rocketMQ服务,发现ALiYun rocketMQ 和通常的Apache rocketMQ 不太一样。看着阿里云的文档花了好长时间接入成功了。为了放便后续其他伙伴接入,就简单封装了一下。后续伙伴接入只需配一些必填的配置项就可以完成接入了。简单记录一下;
maven:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
client包源码:
1、启动注解:
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({AliyunRocketMqAutoConfiguration.class, RocketMqProperties.class})
public @interface EnableAliyunRocketMq {
}
2、自动化配置Bean类 :
/**
* ALiYun-rocketMQ 接入配置
*
* @author zhangjiahui
* @date 2020/12/15 3:48 下午
*/
@Configuration
public class AliyunRocketMqAutoConfiguration {
@Autowired
private RocketMqProperties rocketMqProperties;
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean({Producer.class})
public Producer producer() {
Producer producer = ONSFactory.createProducer(rocketMqProperties.getProperties());
producer.start();
return producer;
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean({OrderProducer.class})
public OrderProducer orderProducer() {
OrderProducer orderProducer = ONSFactory.createOrderProducer(rocketMqProperties.getProperties());
orderProducer.start();
return orderProducer;
}
@Bean
@ConditionalOnBean({Producer.class, OrderProducer.class})
public AliyunRocketMqProducerHolder rocketMqProducerHolder(@Autowired Producer producer,
@Autowired OrderProducer orderProducer) {
AliyunRocketMqProducerHolder holder = AliyunRocketMqProducerHolder.getInstance();
holder.init(producer, orderProducer);
return holder;
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnBean(MessageOrderConsumerListener.class)
public OrderConsumer orderConsumer(@Autowired(required = false) List<MessageOrderConsumerListener> messageOrderConsumerListeners) {
Properties properties = rocketMqProperties.getProperties();
properties.setProperty(PropertyKeyConst.GROUP_ID, rocketMqProperties.getGroupId());
OrderConsumer orderConsumer = ONSFactory.createOrderedConsumer(properties);
if (!CollectionUtils.isEmpty(messageOrderConsumerListeners)) {
messageOrderConsumerListeners.forEach(l -> orderConsumer.subscribe(l.topic(), l.subExpression(), l));
orderConsumer.start();
}
return orderConsumer;
}
}
3、配置项:
@Configuration
@ConditionalOnMissingBean({RocketMqProperties.class})
public class RocketMqProperties {
/**
* 阿里云身份验证AccessKey ID,在阿里云用户信息管理控制台获取。
*/
@Value("${aliyun.oss.accessKey}")
private String accessKey;
/**
* 阿里云身份验证AccessKey Secret,在阿里云用户信息管理控制台获取。
*/
@Value("${aliyun.oss.secretKey}")
private String secretKey;
/**
* 实例地址
*/
@Value("${aliyun.rocketmq.nameSrvAddr}")
private String nameSrvAddr;
/**
* consumer groupId
*/
@Value("${aliyun.rocketmq.groupId:default}")
private String groupId;
/**
* 设置Consumer实例的消费模式,取值说明如下:
* CLUSTERING(默认值):表示集群消费。
* BROADCASTING:表示广播消费。
*/
@Value("${aliyun.rocketmq.messageModel:CLUSTERING}")
private String messageModel;
/**
* 设置消息发送的超时时间,默认值:3000;单位:毫秒。
*/
@Value("${aliyun.rocketmq.sendMsgTimeoutMillis:3000}")
private String sendMsgTimeoutMillis;
/**
* 通用配置
*
* @return
*/
public Properties getProperties() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
properties.setProperty(PropertyKeyConst.GROUP_ID, this.groupId);
properties.setProperty(PropertyKeyConst.MessageModel, this.messageModel);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
return properties;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getMessageModel() {
return messageModel;
}
public void setMessageModel(String messageModel) {
this.messageModel = messageModel;
}
public String getSendMsgTimeoutMillis() {
return sendMsgTimeoutMillis;
}
public void setSendMsgTimeoutMillis(String sendMsgTimeoutMillis) {
this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
}
4、消费回调方法:
/**
* 消费回调监听器
*
* @author zhangjiahui
* @date 2020/12/15 5:17 下午
*/
public abstract class MessageOrderConsumerListener implements MessageOrderListener {
/**
* 订阅的topic
*
* @return
*/
public abstract String topic();
/**
* 订阅过滤表达式字符串,broker依据此表达式进行过滤。目前只支持或运算<br> eg: "tag1 || tag2 || tag3"<br>
*
* 如果subExpression等于null或者*,则表示全部订阅
*
* @return
*/
public String subExpression() {
return null;
}
5、Producer生产者生产消息工具类:
/**
* Baijiahulian.com Inc. Copyright (c) 2014-2020 All Rights Reserved.
*/
package com.baijia.yunying.rocketmq.util;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* RocketMQ Producer 工具类
* @author zhangjiahui
*/
public class AliyunRocketMqProducerHolder {
private static final Logger LOG = LoggerFactory.getLogger(AliyunRocketMqProducerHolder.class);
private static final AliyunRocketMqProducerHolder INSTANCE = new AliyunRocketMqProducerHolder();
private Producer producer;
private OrderProducer orderProducer;
public static AliyunRocketMqProducerHolder getInstance() {
return INSTANCE;
}
public void init(Producer producer, OrderProducer orderProducer) {
INSTANCE.producer = producer;
INSTANCE.orderProducer = orderProducer;
}
/**
* send msg to kafka
*
* @param topic
*/
public static Boolean sendMessage(String topic, String tag, String body) {
LOG.info("send rocket message start.topic:{} tag:{} body:{}", topic, tag, body);
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
message.setKey(UUID.randomUUID().toString());
try {
INSTANCE.producer.send(message);
} catch (Exception e) {
LOG.error("send rocket message error", e);
return false;
}
LOG.info("send rocket message end");
return true;
}
/**
* send msg to kafka
*
* @param topic
*/
public static Boolean sendOrderMessage(String topic, String tag, String body) {
LOG.info("send rocket order message start.topic:{} tag:{} body:{}", topic, tag, body);
Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
String uuid = UUID.randomUUID().toString();
message.setKey(uuid);
try {
INSTANCE.orderProducer.send(message, uuid);
} catch (Exception e) {
LOG.error("send rocket message error", e);
return false;
}
LOG.info("send rocket order message end");
return true;
}
}
6、可选择:支持SpringBoot自动配置:META-INF下增加spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.baijia.yunying.rocketmq.config.AliyunRocketMqAutoConfiguration
使用方式:
1、开启:可以把注解加到启动入口方法上或者@Configuration类上
@SpringBootApplication
@EnableAliyunRocketMq
public class WebApplication {
public static void main(String[] args) {
SpringApplication.run(WebApplication.class, args);
}
}
1、消费MQ
/** 消费者
* @author zhangjiahui
* @date 2020/12/16 2:46 下午
*/
@Component
public class AliyunRocketMqListener extends MessageOrderConsumerListener {
@Override
public String topic() {
return "xxx_xxx_xxx";
}
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
return OrderAction.Success;
}
}
2、使用AliyunRocketMqProducerHolder工具类即可:
AliyunRocketMqProducerHolder.sendMessage(String topic, String tag, String body);
AliyunRocketMqProducerHolder.sendOrderMessage(String topic, String tag, String body);
心的:技术沉淀,封装公共脚手架,这套就是封装到我们freamwork-client里。后续再用直接通过Maven引用即可。时间问题代码里也有一些可以升级的地方。
版权声明:本文为qq360694660原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。