消息队列
在秒杀场景中,用户为了能抢到最终的商品,完成“秒杀”,一般会在秒杀开始前不断地刷新浏览器,防止自己错过秒杀活动的开始,这个过程的请求如果直接到达数据库,很可能会造成服务器的崩溃。
如何应对客户端对数据库频繁的读请求呢?一种办法是使用缓存,将用户比较关心的库存、销量和价格等热点数据存入缓存中,将请求挡在数据库上层的缓存中。另外一种办法是将可以静态化的数据,例如商品图片、商品详情等数据尽量静态化,并放入CDN节点中,这样也可以减轻服务器的压力。
但是,当秒杀活动开始时,对于100个商品的请求次数可能在同一时间达到几千甚至上万次,而且这时一定会去数据库中修改库存以及创建订单,属于高并发的写请求,如何应对如此量级的高并发呢,最常见的办法是使用消息队列。
消息队列的作用
生活中其实处处可见消息队列,从广义上来讲,所有的负责客户端和服务端通讯的第三方(中间件)都叫消息队列,例如菜鸟驿站之于顾客和商家,邮局之于收发信件的双方等等。在分布式业务场景中,消息队列最主要的作用在于:削峰限流、应用解耦以及异步处理等。
削峰限流
消息队列的一个作用是拦截浏览器到达的请求,举例来说,如果有10000个请求同时到达,而服务器的处理能力是每秒10个,那么所有的请求都会被暂存在消息队列中,并在后台启用10个线程同步处理这些请求,这样大约需要100秒可以处理完所有的请求,并返回给浏览器处理结果。这样的响应时间看似很长,但对于秒杀系统这样只在那一瞬间流量特别大的情况是可以接受的(何况我只是举例来说啦)。
当然,在真正的业务场景中,仅仅是靠消息队列是完全不够的,试想一下,如果所有请求全部存入队列,那这队列的容量得多大?在我们的例子中,至少得保证能存放10000个请求吧,然而实际上能买到商品的用户只有100个,这样其实很多请求都是无需处理的,也就是说,是资源冗余的。
所以,消息队列常常还需要配合其他组件来使用,以达到削峰限流的目的,其管理流程如下图所示
![![[Drawing 2022-10-30 15.40.57.excalidraw]]](https://img-blog.csdnimg.cn/45e85a65d49d48a1ba801dff33e6cb5b.png)
对于秒杀系统中限流工作流程的理解
- 用户点击秒杀,实际上给后台发送了一个获取验证码的请求。
- 后台产生验证码(图片,数字,拖动等),然后将其返回给前台。
- 用户填写验证码的结果,将其一并发送给后台,请求自己的令牌。
- 令牌的总数是一定的,假设验证码填写正确,并且用户也成功请求到了自己的令牌,那么接下来就是将其加入队列等待执行了。对于未能请求到令牌的用户,将直接返回秒杀已结束的提示信息。
- 限流器其实与秒杀大闸的作用是类似的,只不过秒杀大闸用于分布式场景,限制的是所有服务的总流量,而这里的限流器只限制单机流量罢了。
- 当请求通过层层“选拔”后,才会最终到达执行队列,等待处理。
应用解耦
消息队列的第二个作用是应用解耦,顾名思义,就是降低各个应用程序之间的耦合度。举例说明,USB口有不同的型号,如果电脑想要和不同的手机之间传递数据,那市面上有多少种USB口,电脑就要实现多少种,这显然是不可能的,所以才会有扩展坞,这时候扩展坞就可以充当“消息队列”,电脑和手机之间通过扩展坞进行通信。将来即便是USB口再有拓展,那也只需要去扩展坞这个中间件,而我们的电脑(应用程序)确是健壮的。
在有消息队列的情况下,服务器和多个客户端之间的通信是这样的。
![![[Drawing 2022-10-30 17.50.14.excalidraw]]](https://img-blog.csdnimg.cn/cd976959255a4e559408aed6a819788c.png)
异步处理业务
消息队列的另外一个作用是异步处理业务,比如在秒杀流程中,一般会经历用户下单、用户付款以及订单生成等过程,这里面只有用户下单这一个节点是高并发的,其它流程同一时间一般不会有过多的请求。那么其实可以将用户下单以外的请求放在另外的队列处理机中去进行处理,从而达到异步处理业务的效果。
为啥选择RocketMQ
常见的消息队列有ActiveMQ、RabbitMQ、Kafka和RocketMQ,那么我们为什么要选择RocketMQ呢?当然是因为我们只会RocketMQ啦(不是)。
但是可不能跟面试官这么说,所以我在广泛查阅了资料(一篇)后,对这四种我所了解的消息队列做出了如下比较。
| 特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|---|
| 客户端SDK | Java,.NET,C++ | 语言无关 | Java,Scala | Java,C++,Go |
| 消息推拉模式 | Push | Push | Pull | Pull |
| 顺序消息 | 加锁的方式实现有序 | 分区有序 | 严格有序,并可优雅扩展 | |
| 定时消息 | 支持 | 不支持 | 支持 | |
| 集群消息 | 不支持 | 支持(异步生产者) | 支持(同步模式) | |
| 广播消息 | 支持 | 不支持 | 支持 | |
| 单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
| 消息时延 | ms级 | us级 | ms级以内 | ms级 |
| 高可用 | 主从架构 | 主从架构 | 分布式架构 | 分布式架构 |
| 消息回溯 | 不支持 | 不支持 | 支持 |
首先,我希望我使用的是一个拉式的消息模式,并且希望它支持的消息类型比较全面,在秒杀场景下,它的吞吐量我也希望能尽量高,综合考虑下来,只有RocketMQ最合适了。
RocketMQ在秒杀系统中的应用
异步处理业务
SpringBoot集成RocketMQ
安装RocketMQ
- 去官网下载
rocketmq-all-4.9.1-bin-release.zip文件,解压后进入主文件夹。 - 启动NameServer
nohup sh bin/mqnamesrv &
# 验证是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
- 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 验证是否启动成功
tail -f ~/logs/rocketmqlogs/broker.log
- 测试消息收发
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 关闭Broker和NameServer
在SpringBoot中使用RocketMQ
- 引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
- 在
application.yml中配置RocketMQ
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: seckill
- 创建生产者
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 消息发送方法
public void sendMessage(String topic, String message) {
for (int i = 0; i < 10; i++) {
String destination = topic + ":tag" + (i % 3 + 1);
Message build = MessageBuilder.withPayload(message + i).build();
rocketMQTemplate.asyncSend(destination, build, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("SUCCESS: " + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("FAILURE: " + throwable.getMessage());
}
}, 3000);
}
}
}
- 创建消费者组(共计三个)
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup1", topic = "TestTopic", selectorExpression = "tag1")
public class SpringConsumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Consumer1 receive message: " + message);
}
}
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup2", topic = "TestTopic", selectorExpression = "tag2")
public class SpringConsumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Consumer2 receive message: " + message);
}
}
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup3", topic = "TestTopic", selectorExpression = "tag3")
public class SpringConsumer3 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Consumer3 receive message: " + message);
}
}
- 创建控制器方法
@Controller
@RequestMapping("/mqtest")
public class SpringController {
@Resource
private SpringProducer springProducer;
@GetMapping("/sendmessage")
public String sendMessage(String message) {
springProducer.sendMessage("TestTopic", message);
return "消息发送成功";
}
}
- 创建主应用,并启动SpringBoot应用
@SpringBootApplication(scanBasePackages = "com.musoulee.myseckill.rocketmq")
public class RocketMQTestWithSpring {
public static void main(String[] args) {
SpringApplication.run(RocketMQTestWithSpring.class);
}
}
- 浏览器发送请求
http:127.0.0.1:8080/mqtest/message?message=hello world,控制台打印结果如下,可以看到,各消费者组都成功消费了自己所订阅的消息。

异步扣减库存流程
前面有提到,可以将一个复杂的业务拆分成为多个流程,并将它们放在多个队列处理机中异步处理,使用RocketMQ中的事务性消息可以很轻松地做到这件事。
总体来讲,事务性消息采用两阶段提交的方式来保证本地事务的一致性,以秒杀系统下单并最终扣减库存为例,整个流程如下图所示
![[Drawing 2022-10-31 00.13.52.excalidraw|800]]
下面我来逐步解释一下上面这个图
- 第一步,生产者向MQ服务器发送一个消息,说自己已经准备好要创建订单了,此时是半成品消息,叫做
half message。 - 第二步,服务器返回给生产者“就绪”的信息,意思是你干你的吧。
- 第三步,生产者开始执行本地事务,这个过程通过
RocketMQLocalTransactionListener#executeLocalTransaction()方法来完成。 - 第四步,生产者向MQ服务器提交事务的执行结果,要么
commit,要么rollback。 - 第五步,如果因为断网等原因导致第四步中服务器接收不到生产者提交的事务结果,那么会启动一个回查机制。
- 第六步,生产者去检查本地事务的执行状态,这个过程通过
RocketMQLocalTransactionListener#checkLocalTransaction()方法来完成。 - 第七步,生产者再次向MQ服务器提交本地事务的执行结果,如果仍未被服务器接收到,那么会再次启动回查机制,默认回查个15次就完事儿。
- 第八步,只有当MQ服务器接收到事务执行成功的信息后,才会将这个消息投递给消费者,在这里体现为库存在MySQL中的最终扣减。
如何快速回查事务是否执行成功?
这主要是通过一张称为库存流水的表来实现的,在准备阶段,生产者向MQ服务器发送半成品消息时就往库存流水表内添加了一条记录,它包括商品id、下单数量以及流水状态三个字段,其中最为关键的就是流水状态这个字段,它有三个取值
- 默认是0,代表流水刚刚创建
- 如果事务成功提交,那么就将其状态修改为1(代表COMMIT)
- 如果创建订单失败,那么就将其状态修改为2(代表ROLLBACK)
为什么要使用库存流水来进行回查呢?
这是由于库存流水表的粒度更小,多个事务并发执行时加锁的开销更小。
代码实现
OrderServiceImpl#createAsync()
public void createAsync(String userId, String itemId, String promotionId, int amount) {
// 查看是否售罄
String key = "item:stock:over:" + itemId;
if (redisTemplate.hasKey(key)) {
throw new BusinessException(CommonErrorEnum.STOCK_NOT_ENOUGH);
}
// 生成库存流水
ItemStockLog itemStockLog = itemService.createItemStockLog(itemId, amount);
logger.debug("创建库存流水成功:" + itemStockLog);
// 生成消息体
JSONObject body = new JSONObject();
body.put("itemId", itemId);
body.put("amount", amount);
body.put("itemStockLogId", itemStockLog.getId());
// 本地事务参数,本地事务需要执行的是生成订单的方法
JSONObject args = new JSONObject();
args.put("userId", userId);
args.put("itemId", itemId);
args.put("promotionId", promotionId);
args.put("itemStockLogId", itemStockLog.getId());
// 发送消息
String destination = "seckill:decrease_stock";
Message message = MessageBuilder.withPayload(body.toString()).build();
logger.debug("尝试投递扣减库存消息【" + body.toString() + "】");
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, args);
if (transactionSendResult.getLocalTransactionState() == LocalTransactionState.UNKNOW) {
throw new BusinessException(CommonErrorEnum.CREATE_ORDER_FAIL);
} else if (transactionSendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
throw new BusinessException(CommonErrorEnum.CREATE_ORDER_FAIL);
}
}
生产者
@Component
@RocketMQTransactionListener
public class LocalTransactionListener implements RocketMQLocalTransactionListener {
private Logger logger = LoggerFactory.getLogger(LocalTransactionListener.class);
@Autowired
private OrderService orderService;
@Autowired
private ItemService itemService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object args) {
String tag = msg.getHeaders().get("rocketmq_TAGS").toString();
try {
if ("decrease_stock".equals(tag)) {
return this.createOrder(args);
} else return RocketMQLocalTransactionState.UNKNOWN;
} catch (Exception e) {
logger.info("执行本地事务时发生错误!");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 创建订单
*
* @param args
* @return
*/
private RocketMQLocalTransactionState createOrder(Object args) {
// 从参数中获取变量
JSONObject param = JSONObject.parse((String) args);
String userId = (String) param.get("userId");
String itemId = (String) param.get("itemId");
int amount = (int) param.get("amount");
String promotionId = (String) param.get("promotionId");
int itemStockLogId = (int) param.get("itemStockLogId");
try {
Order order = orderService.create(userId, itemId, promotionId, itemStockLogId, amount);
logger.debug("本地事务提交完成 [" + order.getId() + "]");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
logger.error("创建订单失败", e);
itemService.updateItemStockLogStatusById(itemStockLogId, 2);
logger.debug("更新流水完成 [" + itemStockLogId + "]");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 回查事务执行状态
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
JSONObject body = JSONObject.parse(new String((byte[]) msg.getPayload()));
Integer itemStockLogId = Integer.parseInt((String) body.get("itemStockLogId"));
ItemStockLog itemStockLogById = itemService.findItemStockLogById(itemStockLogId);
if (itemStockLogById == null) {
// 流水没有成功创建,所以rollback了
return RocketMQLocalTransactionState.ROLLBACK;
} else if (itemStockLogById.getStatus() == 0) {
// 流水创建成功,本地事务执行失败,应该是断网了
return RocketMQLocalTransactionState.UNKNOWN;
} else if (itemStockLogById.getStatus() == 1) {
// 本地事务执行成功了,因为流水的状态改变了
return RocketMQLocalTransactionState.COMMIT;
} else {
// 在创建订单时出错了,此时将流水状态修改为2,应当是回滚了
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
消费者
@Component
@RocketMQMessageListener(topic = "seckill", consumerGroup = "seckill_stock", selectorExpression = "decrease_stock")
public class DecreaseStockConsumer implements RocketMQListener<String> {
private Logger logger = LoggerFactory.getLogger(DecreaseStockConsumer.class);
@Autowired
private ItemService itemService;
@Override
public void onMessage(String message) {
JSONObject param = JSONObject.parse(message);
String itemId = (String) param.get("itemId");
int amount = (int) param.get("amount");
try {
itemService.decreaseStock(itemId, amount);
logger.info("扣减库存成功");
} catch (Exception e) {
logger.info("在DB中扣减库存失败");
}
}
}
削峰限流
验证码
使用的是easycaptcha这一开源框架,使用起来相当简单,只需要在Maven工程中引入以下依赖
<dependency>
<groupId>com.pig4cloud.plugin</groupId>
<artifactId>captcha-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
然后在控制器中编写如下方法即可
@ResponseBody
@RequestMapping(path = "/captcha", method = RequestMethod.GET)
public ResponseModel getCaptcha(String token) {
// 新生成验证码
SpecCaptcha specCaptcha = new SpecCaptcha(130, 50);
// 将验证码和用户绑定,并存入Redis中,大小写不敏感
String key = "captcha:" + token;
redisTemplate.opsForValue().set(key, specCaptcha.text().toLowerCase(), 1, TimeUnit.MINUTES);
// 将验证码写给前端
return ResponseModel.createSuccess(specCaptcha.toBase64());
}
秒杀大闸
在上一步获得验证码后,用户就需要带着验证码来申请令牌,如果验证码正确,就调用promotionServiceImpl#generateToken()方法来发放令牌,这里我们可以使用Redis来做秒杀大闸,限制令牌的总数量。
所以第一步,我们需要在SpringBoot启动时初始化一些数据,这包括商品详情的缓存以及秒杀大闸的相关设置。
@Component
public class CacheInit {
private Logger logger = LoggerFactory.getLogger(CacheInit.class);
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ItemService itemService;
@PostConstruct
public void init() {
List<Item> items = itemService.findItemsOnPromotion();
// 缓存商品详情
items.stream().map(
item -> {
item = itemService.getDetailInCache(item.getId());
return item;
}
).forEach(item -> logger.info("加载" + item.getId() + "完毕"));
// 秒杀大闸, 1000个令牌
redisTemplate.opsForValue().set("promotion:gate", 1000);
logger.info("秒杀大闸设置完毕");
}
}
启动SpringBoot应用后,控制台打印如下信息。

下面是生成令牌的代码。
@Service
public class PromotionServiceImpl implements PromotionService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ItemMapper itemMapper;
/**
* 根据传入的id生成令牌
*
* @param userId
* @param itemId
* @return 如果申请失败,就返回null,否则返回令牌
*/
@Override
public String generateToken(String userId, String itemId) {
// 判断是否售罄
if (redisTemplate.hasKey("item:stock:over:" + itemId)) {
return null;
}
// 校验商品是否正在促销
Item item = itemMapper.selectByPrimaryKey(itemId);
if (item == null || item.getPromotion() == null || item.getItemStock().getStock() == 0) {
return null;
}
// 校验用户是否已经申请过令牌
String key = "promotion:token:" + userId + ":" + itemId;
if(redisTemplate.hasKey(key)){
// 重复申请不可取
return null;
}
// 秒杀大闸
ValueOperations vos = redisTemplate.opsForValue();
// 如果令牌数量不够,那就不发了
if (vos.decrement("promotion:gate", 1) < 0) {
return null;
}
// 绑定当前用户,当前商品,给他们一个令牌
String token = ToolBox.getRandomUUID();
// 放到Redis里
redisTemplate.opsForValue().set(key, token, 10, TimeUnit.MINUTES);
return token;
}
}
限流器
限流器用于单机限流,使用的是guava提供的RateLimiter类,用于限制某一方法在一段时间内的平均请求次数。
- 首先引入guava依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
- 改造
OrderController.java
@Controller
@RequestMapping("/order")
@CrossOrigin(origins = "${myseckill.web.path}", allowedHeaders = "*", allowCredentials = "true")
public class OrderController {
// 限制每秒最多处理10个请求
private RateLimiter rateLimiter = RateLimiter.create(10);
@ResponseBody
@RequestMapping(path = "/token", method = RequestMethod.POST)
public ResponseModel generateToken(String token,
String itemId,
@NotEmpty(message = "验证码不能为空") String captcha) {
if(!rateLimiter.tryAcquire()){
throw new BusinessException(CommonErrorEnum.SERVER_BUSY);
}
/*more code*/
}
}
队列处理机
队列处理机是通过Spring封装好的线程池ThreadPoolTaskExecuter类来实现的,当获取令牌成功后,会通过创建一个线程池来创建订单,最终完成交易,代码如下所示。
@ResponseBody
@RequestMapping(path = "/create", method = RequestMethod.POST)
public ResponseModel create(String token, @NotEmpty String itemId, String promotionToken, int amount) {
if (!rateLimiter.tryAcquire()) {
throw new BusinessException(CommonErrorEnum.SERVER_BUSY);
}
User user = (User) redisTemplate.opsForValue().get(token);
// 判断当前用户有无令牌
String key = "promotion:token:" + user.getId() + ":" + itemId;
String realPromotionToken = (String) redisTemplate.opsForValue().get(key);
// 如果没有令牌或者令牌不一致,那么就下单失败
if (realPromotionToken == null || !realPromotionToken.equals(promotionToken)) {
throw new BusinessException(CommonErrorEnum.CREATE_ORDER_FAIL);
}
// 线程池中取出一个线程异步执行下订单的方法
Future future = taskExecutor.submit(() -> {
orderService.createAsync(user.getId(), itemId, amount);
return null;
});
try {
future.get();
} catch (Exception e) {
throw new BusinessException(CommonErrorEnum.CREATE_ORDER_FAIL);
}
return ResponseModel.create();
}
总结
无论是秒杀大闸,还是单机限流,其原理都是一样的,都是使用令牌的机制,常见的有漏桶算法和令牌桶算法,这个我们留待以后再来讨论。