RocketMQ二 补充:springboot整合rocketmq

springboot整合rocketmq

建议先阅读RocketMQ二:java整合rocketmq开发模型

环境依赖

 <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
    </dependencies>

在这里插入图片描述
要选择和服务端的rocketmq相同的版本

代码案例

基础案例

生产者
在需要发送的地方引入RocketMQTemplate

@Resource
private RocketMQTemplate rocketMQTemplate;
	/**
	 * 普通消息发送
	 */
	public boolean sendMessage(String topic, String jsonString, Map<String, String> map) {
		//同步发送消息
//		SendResult sendResult = rocketMQTemplate.syncSend(topic, jsonString);

		//单向发送
//        rocketMQTemplate.sendOneWay(topic, msg);

//        //异步发送消息
//		rocketMQTemplate.asyncSend(topic, jsonString, new SendCallback() {
//			@Override
//			public void onSuccess(SendResult sendResult) {
//			    System.out.println(sendResult);
//			}
//
//			@Override
//			public void onException(Throwable throwable) {
//				System.out.printf("%-10d OK %s %n", "发送失败", throwable);
//			}
//		});
		//自定义属性发送
		MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
		map.forEach((index, value) -> {
			builder.setHeader(index, value);
		});
		Message<Object> message = builder.build();
		SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
		String status = sendResult.getSendStatus().name();
		if ("send_ok".equalsIgnoreCase(status)) return true;
		return false;
	}

消费者

@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2",consumeMode= ConsumeMode.CONCURRENTLY)
public class SimpleConsumer implements RocketMQListener<String> {
	@Override
	public void onMessage(String msg) {
		System.out.println("收到消息:"+msg);
	}
}

简单的消费场景建议用上面的消费者,如果是一些复杂的消费场景,需要用到其他的属性,比如头信息,队列信息等这样就不好使了,建议使用下面这种方式

@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2",consumeMode= ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<MessageExt> {
	@Override
	public void onMessage(MessageExt message) {
		String msgId = message.getMsgId();
		String tags = message.getTags();
		Map<String, String> properties = message.getProperties();
		User user = JSONObject.parseObject(message.getBody(), User.class);
		System.out.println("msgId:"+msgId);
		System.out.println("tags:"+tags);
		System.out.println("body:"+user);
		properties.forEach((i,v)->{
			System.out.println(i+":"+v);
		});
	}
}

RocketMQMessageListener 注解解析

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
	//nameserver 服务地址  ${rocketmq.name-server:}默认从配置文件中获取
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    //权限校验
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    //自定义的消息轨迹主题
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
	//消费者组
    String consumerGroup();
	//订阅的主题
    String topic();
	//用于消息过滤 通过selectorType 和 selectorExpression定制自己的过滤策略
    SelectorType selectorType() default SelectorType.TAG;
	//用于消息过滤
    String selectorExpression() default "*";
	//消费模式  顺序消费还是并行消费
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
	//消息模式 广播还是集群
    MessageModel messageModel() default MessageModel.CLUSTERING;
	//消费者最大线程数
    int consumeThreadMax() default 64;
	//重试次数 并发模式下-1表示16次  顺序模式下-1表示Integer的最大值
    int maxReconsumeTimes() default -1;
	//消费超时
    long consumeTimeout() default 15L;
	//重试超时
    int replyTimeout() default 3000;
	//权限校验
    String accessKey() default "${rocketmq.consumer.access-key:}";
	//权限校验
    String secretKey() default "${rocketmq.consumer.secret-key:}";
	//是否开启 消息轨迹
    boolean enableMsgTrace() default false;
	//自定义的消息轨迹主题
    String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";
	//nameserver服务地址
    String nameServer() default "${rocketmq.name-server:}";
	
    String accessChannel() default "${rocketmq.access-channel:}";
}

顺序发送

/**
	 * 顺序发送
	 */
	public boolean sendOrderly(String topic, String jsonString, Map<String, String> map) {
		for (int i = 0; i < 100; i++) {
			//表示同一orderId 表示是同一订单的消息
			int orderId = i;
			//统一订单消息设计到处理步骤
			for (int j = 0; j < 4; j++) {
				String step = "";
				switch (j) {
					case 0:
						step = "支付";
						break;
					case 1:
						step = "扣减库存";
						break;
					case 2:
						step = "发送物流";
						break;
					case 3:
						step = "用户签收";
						break;
				}
				SendResult sendResult = rocketMQTemplate.syncSendOrderly(
						topic, jsonString + "OrderId: "+ orderId+ " step: " + step, orderId + "");

			}
		}
		return true;
	}
//注意:这里使用consumeMode= ConsumeMode.ORDERLY 相当于 上一篇的 MessageListenerOrderly
@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2",consumeMode= ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {
	@Override
	public void onMessage(String message) {
		//业务处理
		System.out.println(message);
	}
}

广播消息

@RocketMQMessageListener 属性配置 messageModel = MessageModel.BROADCASTING

延迟消息

//延迟发送
	public boolean sendDelay(String topic, String jsonString,Long timeOut){
		Message<String> build = MessageBuilder.withPayload(jsonString).build();
		//开源版本不支持 自定义时间 timeOut 没什么作用   但是可以通过设置延迟级别 这个级别和原生api 是一样的
		SendResult sendResult = rocketMQTemplate.syncSend(topic, build, timeOut,4);
		System.out.println(sendResult);
		return true;
	}

消息过滤

消息过滤通过RocketMQMessageListener 的selectorType和selectorExpression属性搭配使用
消息发送

//消息过滤
	public boolean sendFilter(String topic, String jsonString,String tags,Map<String, String> map){
		//自定义属性发送
		MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
		String tag = "";
		for (int i = 0; i < 10; i++) {
			if (i%2 == 0){
				tag = "mytag";
			}else {
				if (map != null){
					map.forEach((index, value) -> {
						builder.setHeader(index, value);
					});
				}
				tag = "othertag";
			}
			Message<Object> message = builder.build();
			//springboot 中 把tags整合到 topic + ":mytag"  冒号后边就是标签
			SendResult sendResult = rocketMQTemplate.syncSend(topic + ":" + tag, message);
//			System.out.println(sendResult);
		}
		return true;
	}

简单的过滤

@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2",
		consumeMode= ConsumeMode.CONCURRENTLY,selectorType = SelectorType.TAG,
		selectorExpression = "mytag")
public class FilterConsumer implements RocketMQListener<MessageExt> {
	@Override
	public void onMessage(MessageExt message) {
		String tags = message.getTags();
		String body = new String(message.getBody());
		System.out.println("consumer1---tags:"+tags+" body:"+body);
	}
}

通过sql过滤

@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer2", topic = "test2",
		consumeMode= ConsumeMode.CONCURRENTLY,selectorType = SelectorType.SQL92,
		selectorExpression = "TAGS is not null and TAGS = 'othertag' and aa is not null and aa = 'dd' ")
public class FilterConsumer2 implements RocketMQListener<MessageExt> {
	@Override
	public void onMessage(MessageExt message) {
		String tags = message.getTags();
		String body = new String(message.getBody());
		System.out.println("consumer2---tags:"+tags+" body:"+body);
	}
}

事务消息

发送消息

//事务消息
	public boolean sendMessageInTransaction(String topic, String jsonString) throws InterruptedException {
		String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
		for (int i = 0; i < 10; i++) {
			Message<String> message = MessageBuilder.withPayload(jsonString)
					//建议不要使用下面这行代码的事务Id 作为后续业务处理的唯一性
//					.setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i)
					//建议自定义ID  表示该事务的id 作为唯一性
					.setHeader("orderId","TransID_"+i)
					//建议不要使用这种方式 配置tags  后续处理会丢失
//					.setHeader(RocketMQHeaders.TAGS,tags[i % tags.length])
					//建议使用自定义的属性 来配置tags 后续如果需要根据tags过滤 可以使用sql方式过滤
					.setHeader("tags",tags[i % tags.length])
					.setHeader("MyProp","MyProp_"+i)
					.build();
			SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message,"");
			Thread.sleep(10);
		}
		return true;
	}

注册事务处理机制
该机制用于 提交 或者回滚 相当于 原生api的producer.setTransactionListener(new TransactionListener() {}

package com.rocketmq.demo.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

/**
 * ·
 * 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
 * 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
 * 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
 **/
//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
    //在提交完事务消息后执行
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //这个事务id 最好自己定义 可以根据订单Id 或者某些唯一的主键
        Object transId = msg.getHeaders().get("orderId");
        String tags = msg.getHeaders().get("tags", String.class);
        if(StringUtils.contains(tags,"TagA")){
            //模拟业务成功
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            //模拟业务失败回滚
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            //模拟业务超时 网络异常
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    //在对UNKNOWN状态的消息进行状态回查时执行
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //这个事务id 最好自己定义 可以根据订单Id 或者某些唯一的主键
        //真正的业务处理过程 需要根据这个id去查对应的状态 作为模拟测试  这里用不上
        String transId = msg.getHeaders().get("orderId").toString();
        String tags = msg.getHeaders().get("tags").toString();
        if(StringUtils.contains(tags,"TagC")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagD")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

消息接收
和之前没什么不同

@Component
@RocketMQMessageListener(consumerGroup = "springbootConsumer", topic = "test2",consumeMode= ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<MessageExt> {
	@Override
	public void onMessage(MessageExt message) {
		String msg = new String(message.getBody());
		String orderId = message.getUserProperty("orderId");
		String tags = message.getUserProperty("tags");
		System.out.println("orderId:"+orderId);
		System.out.println("tags:"+tags);
		System.out.println("body:"+msg);
	}
}

注意事项:
之前注册事务处理机制的时候没有指定组,而是指定了rocketMQTemplate
在这里插入图片描述
因此当你有很多个消费者组的时候,都是共用这个事务处理逻辑,那么我们怎么解决这个问题呢?
由于之前指定的是rocketMQTemplate 这个beanName,那么根据这个,我们可以定义另一个rocketMQTemplateOther,然后@RocketMQTransactionListener指定这个rocketMQTemplateOther,通过
rocketMQTemplateOther发送的事务消息就会经过其他事务处理机制。

案例:
使用原来的rocketMQTemplate
在这里插入图片描述
就会进入到
在这里插入图片描述
定义另一个RocketMQTemplate

/**
 * 定义另一个 RocketMQTemplate
 */
@ExtRocketMQTemplateConfiguration
public class RocketMQTemplateOther extends RocketMQTemplate {
}

定义另一个事务处理机制
在这里插入图片描述
在发送消息的地方引入

@Resource
private RocketMQTemplate rocketMQTemplateOther;

通过rocketMQTemplateOther来发送事务
在这里插入图片描述
在这里插入图片描述
这样就实现了 每个消费者组 之间事务机制的隔离

上诉例子的全部发送代码

package com.rocketmq.demo.unti;

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;

@Component
public class RocketUtil {

	@Resource
	private RocketMQTemplate rocketMQTemplate;
	@Resource
	private RocketMQTemplate rocketMQTemplateOther;

	/**
	 * 普通消息发送
	 */
	public boolean sendMessage(String topic, String jsonString, Map<String, String> map) {
		//同步发送消息
//		SendResult sendResult = rocketMQTemplate.syncSend(topic, jsonString);

		//单向发送
//        rocketMQTemplate.sendOneWay(topic, msg);

//        //异步发送消息
//		rocketMQTemplate.asyncSend(topic, jsonString, new SendCallback() {
//			@Override
//			public void onSuccess(SendResult sendResult) {
//			    System.out.println(sendResult);
//			}
//
//			@Override
//			public void onException(Throwable throwable) {
//				System.out.printf("%-10d OK %s %n", "发送失败", throwable);
//			}
//		});
		//自定义属性发送
		MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
		map.forEach((index, value) -> {
			builder.setHeader(index, value);
		});
		Message<Object> message = builder.build();
		SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
		String status = sendResult.getSendStatus().name();
		if ("send_ok".equalsIgnoreCase(status)) return true;
		return false;
	}

	/**
	 * 顺序发送
	 */
	public boolean sendOrderly(String topic, String jsonString, Map<String, String> map) {
		for (int i = 0; i < 100; i++) {
			//表示同一orderId 表示是同一订单的消息
			int orderId = i;
			//统一订单消息设计到处理步骤
			for (int j = 0; j < 4; j++) {
				String step = "";
				switch (j) {
					case 0:
						step = "支付";
						break;
					case 1:
						step = "扣减库存";
						break;
					case 2:
						step = "发送物流";
						break;
					case 3:
						step = "用户签收";
						break;
				}
				SendResult sendResult = rocketMQTemplate.syncSendOrderly(
						topic, jsonString + "OrderId: "+ orderId+ " step: " + step, orderId + "");

			}
		}
		return true;
	}
	//延迟发送
	public boolean sendDelay(String topic, String jsonString,Long timeOut){
		Message<String> build = MessageBuilder.withPayload(jsonString).build();
		//开源版本不支持 自定义时间 timeOut 没什么作用   但是可以通过设置延迟级别 这个级别和原生api 是一样的
		SendResult sendResult = rocketMQTemplate.syncSend(topic, build, timeOut,4);
		System.out.println(sendResult);
		return true;
	}

	//消息过滤
	public boolean sendFilter(String topic, String jsonString,String tags,Map<String, String> map){
		//自定义属性发送
		MessageBuilder<Object> builder = MessageBuilder.withPayload(jsonString);
		String tag = "";
		for (int i = 0; i < 10; i++) {
			if (i%2 == 0){
				tag = "mytag";
			}else {
				if (map != null){
					map.forEach((index, value) -> {
						builder.setHeader(index, value);
					});
				}
				tag = "othertag";
			}
			Message<Object> message = builder.build();
			//springboot 中 把tags整合到 topic + ":mytag"  冒号后边就是标签
			SendResult sendResult = rocketMQTemplate.syncSend(topic + ":" + tag, message);
//			System.out.println(sendResult);
		}
		return true;
	}
	//事务消息
	public boolean sendMessageInTransaction(String topic, String jsonString) throws InterruptedException {
		String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
		for (int i = 0; i < 10; i++) {
			Message<String> message = MessageBuilder.withPayload(jsonString)
					//建议不要使用下面这行代码的事务Id 作为后续业务处理的唯一性
//					.setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i)
					//建议自定义ID  表示该事务的id 作为唯一性
					.setHeader("orderId","TransID_"+i)
					//建议不要使用这种方式 配置tags  后续处理会丢失
//					.setHeader(RocketMQHeaders.TAGS,tags[i % tags.length])
					//建议使用自定义的属性 来配置tags 后续如果需要根据tags过滤 可以使用sql方式过滤
					.setHeader("tags",tags[i % tags.length])
					.setHeader("MyProp","MyProp_"+i)
					.build();
//			SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message,"");
			SendResult sendResult = rocketMQTemplateOther.sendMessageInTransaction(topic, message,"");
			Thread.sleep(10);
		}
		return true;
	}
}

版权声明:本文为admin522043032原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。