Spring接入-阿里云RocketMQ

最近要接入 RocketMQ,公司是在阿里云上购买的 RocketMQ 服务,发现阿里云 RocketMQ 和通常的 Apache RocketMQ 不太一样。看着阿里云的文档花了好长时间才接入成功。为了方便后续其他伙伴接入,就简单封装了一下,后续伙伴只需配置一些必填的配置项就可以完成接入。在此简单记录一下。

maven:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>

client包源码: 

1、启动注解:

/**
 * Enables the Aliyun RocketMQ integration.
 *
 * <p>Placing this annotation on a type imports {@link AliyunRocketMqAutoConfiguration}
 * and {@link RocketMqProperties} into the Spring application context.
 */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({AliyunRocketMqAutoConfiguration.class, RocketMqProperties.class})
public @interface EnableAliyunRocketMq {
}

2、自动化配置Bean类 :

/**
 * ALiYun-rocketMQ 接入配置
 *
 * @author zhangjiahui
 * @date 2020/12/15 3:48 下午
 */
@Configuration
public class AliyunRocketMqAutoConfiguration {

    @Autowired
    private RocketMqProperties rocketMqProperties;

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean({Producer.class})
    public Producer producer() {
        Producer producer = ONSFactory.createProducer(rocketMqProperties.getProperties());
        producer.start();
        return producer;
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean({OrderProducer.class})
    public OrderProducer orderProducer() {
        OrderProducer orderProducer = ONSFactory.createOrderProducer(rocketMqProperties.getProperties());
        orderProducer.start();
        return orderProducer;
    }


    @Bean
    @ConditionalOnBean({Producer.class, OrderProducer.class})
    public AliyunRocketMqProducerHolder rocketMqProducerHolder(@Autowired Producer producer,
                                                               @Autowired OrderProducer orderProducer) {
        AliyunRocketMqProducerHolder holder = AliyunRocketMqProducerHolder.getInstance();
        holder.init(producer, orderProducer);
        return holder;
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnBean(MessageOrderConsumerListener.class)
    public OrderConsumer orderConsumer(@Autowired(required = false) List<MessageOrderConsumerListener> messageOrderConsumerListeners) {
        Properties properties = rocketMqProperties.getProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, rocketMqProperties.getGroupId());
        OrderConsumer orderConsumer = ONSFactory.createOrderedConsumer(properties);
        if (!CollectionUtils.isEmpty(messageOrderConsumerListeners)) {
            messageOrderConsumerListeners.forEach(l -> orderConsumer.subscribe(l.topic(), l.subExpression(), l));
            orderConsumer.start();
        }
        return orderConsumer;
    }
}

3、配置项:  

@Configuration
@ConditionalOnMissingBean({RocketMqProperties.class})
public class RocketMqProperties {

    /**
     * 阿里云身份验证AccessKey ID,在阿里云用户信息管理控制台获取。
     */
    @Value("${aliyun.oss.accessKey}")
    private String accessKey;

    /**
     * 阿里云身份验证AccessKey Secret,在阿里云用户信息管理控制台获取。
     */
    @Value("${aliyun.oss.secretKey}")
    private String secretKey;

    /**
     * 实例地址
     */
    @Value("${aliyun.rocketmq.nameSrvAddr}")
    private String nameSrvAddr;

    /**
     * consumer groupId
     */
    @Value("${aliyun.rocketmq.groupId:default}")
    private String groupId;

    /**
     * 设置Consumer实例的消费模式,取值说明如下:
     * CLUSTERING(默认值):表示集群消费。
     * BROADCASTING:表示广播消费。
     */
    @Value("${aliyun.rocketmq.messageModel:CLUSTERING}")
    private String messageModel;

    /**
     * 设置消息发送的超时时间,默认值:3000;单位:毫秒。
     */
    @Value("${aliyun.rocketmq.sendMsgTimeoutMillis:3000}")
    private String sendMsgTimeoutMillis;

    /**
     * 通用配置
     *
     * @return
     */
    public Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        properties.setProperty(PropertyKeyConst.GROUP_ID, this.groupId);
        properties.setProperty(PropertyKeyConst.MessageModel, this.messageModel);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
        return properties;
    }

    public String getAccessKey() {
        return accessKey;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public String getNameSrvAddr() {
        return nameSrvAddr;
    }

    public void setNameSrvAddr(String nameSrvAddr) {
        this.nameSrvAddr = nameSrvAddr;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getMessageModel() {
        return messageModel;
    }

    public void setMessageModel(String messageModel) {
        this.messageModel = messageModel;
    }

    public String getSendMsgTimeoutMillis() {
        return sendMsgTimeoutMillis;
    }

    public void setSendMsgTimeoutMillis(String sendMsgTimeoutMillis) {
        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
    }

4、消费回调方法:

/**
 * 消费回调监听器
 *
 * @author zhangjiahui
 * @date 2020/12/15 5:17 下午
 */
public abstract class MessageOrderConsumerListener implements MessageOrderListener {

    /**
     * 订阅的topic
     *
     * @return
     */
    public abstract String topic();

    /**
     * 订阅过滤表达式字符串,broker依据此表达式进行过滤。目前只支持或运算<br> eg: "tag1 || tag2 || tag3"<br>
     *
     * 如果subExpression等于null或者*,则表示全部订阅
     *
     * @return
     */
    public String subExpression() {
        return null;
    }

 5、Producer生产者生产消息工具类: 

/**
 * Baijiahulian.com Inc. Copyright (c) 2014-2020 All Rights Reserved.
 */
package com.baijia.yunying.rocketmq.util;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Static facade for sending messages through the Aliyun RocketMQ producers.
 *
 * <p>The singleton must be given its producers through
 * {@link #init(Producer, OrderProducer)} before any send method is used;
 * until then every send call logs an error and returns {@code false}.
 *
 * @author zhangjiahui
 */
public class AliyunRocketMqProducerHolder {

    private static final Logger LOG = LoggerFactory.getLogger(AliyunRocketMqProducerHolder.class);

    private static final AliyunRocketMqProducerHolder INSTANCE = new AliyunRocketMqProducerHolder();

    private Producer producer;

    private OrderProducer orderProducer;

    /**
     * @return the shared holder instance
     */
    public static AliyunRocketMqProducerHolder getInstance() {
        return INSTANCE;
    }

    /**
     * Stores the producers used by the static send methods.
     *
     * @param producer      producer for normal messages
     * @param orderProducer producer for ordered messages
     */
    public void init(Producer producer, OrderProducer orderProducer) {
        INSTANCE.producer = producer;
        INSTANCE.orderProducer = orderProducer;
    }

    /**
     * Sends a normal message to RocketMQ. A random UUID is used as the message key.
     *
     * @param topic target topic
     * @param tag   message tag
     * @param body  message payload, encoded as UTF-8
     * @return {@code true} if the send call completed, {@code false} if the holder
     *         is not initialized or the send call threw an exception
     * @throws NullPointerException if {@code body} is {@code null}
     */
    public static Boolean sendMessage(String topic, String tag, String body) {
        LOG.info("send rocket message start.topic:{} tag:{} body:{}", topic, tag, body);
        Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
        message.setKey(UUID.randomUUID().toString());
        Producer target = INSTANCE.producer;
        if (target == null) {
            LOG.error("send rocket message error: producer is not initialized");
            return false;
        }
        try {
            target.send(message);
        } catch (Exception e) {
            LOG.error("send rocket message error", e);
            return false;
        }
        LOG.info("send rocket message end");
        return true;
    }

    /**
     * Sends an ordered message to RocketMQ using a random UUID as both the
     * message key and the sharding key.
     *
     * <p>Because the sharding key differs for every call, messages sent through
     * this overload never share a sharding key. Use
     * {@link #sendOrderMessage(String, String, String, String)} when related
     * messages must share one.
     *
     * @param topic target topic
     * @param tag   message tag
     * @param body  message payload, encoded as UTF-8
     * @return {@code true} if the send call completed, {@code false} if the holder
     *         is not initialized or the send call threw an exception
     * @throws NullPointerException if {@code body} is {@code null}
     */
    public static Boolean sendOrderMessage(String topic, String tag, String body) {
        String uuid = UUID.randomUUID().toString();
        return doSendOrderMessage(topic, tag, body, uuid, uuid);
    }

    /**
     * Sends an ordered message to RocketMQ with a caller-supplied sharding key.
     * A random UUID is used as the message key.
     *
     * @param topic       target topic
     * @param tag         message tag
     * @param body        message payload, encoded as UTF-8
     * @param shardingKey sharding key passed to the ordered producer
     * @return {@code true} if the send call completed, {@code false} if the holder
     *         is not initialized or the send call threw an exception
     * @throws NullPointerException if {@code body} is {@code null}
     */
    public static Boolean sendOrderMessage(String topic, String tag, String body, String shardingKey) {
        return doSendOrderMessage(topic, tag, body, UUID.randomUUID().toString(), shardingKey);
    }

    /** Builds the message and sends it through the ordered producer. */
    private static Boolean doSendOrderMessage(String topic, String tag, String body,
                                              String messageKey, String shardingKey) {
        LOG.info("send rocket order message start.topic:{} tag:{} body:{}", topic, tag, body);
        Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
        message.setKey(messageKey);
        OrderProducer target = INSTANCE.orderProducer;
        if (target == null) {
            LOG.error("send rocket order message error: order producer is not initialized");
            return false;
        }
        try {
            target.send(message, shardingKey);
        } catch (Exception e) {
            LOG.error("send rocket message error", e);
            return false;
        }
        LOG.info("send rocket order message end");
        return true;
    }
}

6、可选择:支持SpringBoot自动配置:META-INF下增加spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.baijia.yunying.rocketmq.config.AliyunRocketMqAutoConfiguration

 

使用方式:

1、开启:可以把注解加到启动入口类上或者@Configuration类上

/**
 * Example application entry point showing where {@code @EnableAliyunRocketMq}
 * is placed to switch on the RocketMQ integration.
 */
@SpringBootApplication
@EnableAliyunRocketMq
public class WebApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }

}

1、消费MQ

/**
 * Example consumer: a listener registered as a Spring component.
 *
 * @author zhangjiahui
 * @date 2020/12/16 2:46 PM
 */
@Component
public class AliyunRocketMqListener extends MessageOrderConsumerListener {

    /**
     * @return the topic to subscribe to (placeholder value in this example)
     */
    @Override
    public String topic() {
        return "xxx_xxx_xxx";
    }

    /**
     * Consumption callback. This example does no processing and always
     * reports success.
     *
     * @param message the received message
     * @param context the ordered-consumption context
     * @return {@link OrderAction#Success}
     */
    @Override
    public OrderAction consume(Message message, ConsumeOrderContext context) {
        return OrderAction.Success;
    }
}

2、使用AliyunRocketMqProducerHolder工具类即可:

AliyunRocketMqProducerHolder.sendMessage(String topic, String tag, String body);
AliyunRocketMqProducerHolder.sendOrderMessage(String topic, String tag, String body);

 心得:技术沉淀,封装公共脚手架,这套代码就是封装到我们的 framework-client 里的,后续再用直接通过 Maven 引用即可。由于时间问题,代码里也还有一些可以升级的地方。


版权声明:本文为qq360694660原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。