直播弹幕系统(七)- 利用动态创建队列完成直播间独立聊天

直播弹幕系统(七)- 利用动态创建队列完成直播间独立聊天

前言

上一篇SpringBoot + STOMP + RabbitMQ(使用MQ替代Spring代理) 中主要讲解了如何整合STOMP以及RabbitMQ代替Spring代理。

其中代码的设计有一点还并不完善:

  • 所有的直播间共用同一个队列。就会造成直播间聊天内容窜了。

因为设计起来,希望是每个直播间有一个独立的队列和交换机。这样就能做到不同直播间的人聊天内容不会乱窜。但是我们又不可能提前去为每个直播间去创建队列和交换机。(排除创建直播间时的操作),我们这里就通过动态创建和监听的方式来完成这个功能。

一. 动态创建队列

我是这样设想的:

  1. 在打开任何一个直播间的时候,Java后端这里我们是能够感应到WebSocket的创建的。我们主要在这里进行队列和交换机的动态创建过程。
  2. 每个直播间的消息都往统一的交换机发送。和上篇文章保持一致:stomp-exchange交换机。
  3. 根据我们的RabbitMQ配置,对这个交换机对应的队列stomp-queue进行监听。再由业务代码来决定,消息该往哪个直播间的交换机发送。(发送的消息体中包含了直播间号)

我们按照这个思路顺序来编写代码。

稍微复习一下,在上篇文章中我们写了个WebSocketEventListener监听类,下面有这么一个函数:

@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {}

当建立WebSocket链接的时候,这个函数就会走进来。那么我们在原本代码基础上,增加动态创建队列的逻辑即可。

@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {
	// 老代码...
	// 如果没有队列就创建一个队列
    createQueueAndExchangeIfNeed(roomId);
}

public void createQueueAndExchangeIfNeed(String roomId) {
    String exchangeName = "Live_" + roomId + "-Exchange";
    String queueName = "Live_" + roomId + "-Queue";

    // 判断是否有队列创建过了
    QueueInformation queueInfo = RabbitMQUtil.getQueueInfo(queueName);
    // 如果创建过队列了,就直接返回,不要重复创建
    if (queueInfo != null) {
        return;
    }
    // 创建新队列
    RabbitMQUtil.createAndBindQueue(queueName);
    // 创建新交换机
    RabbitMQUtil.createAndBindExchange(exchangeName, ExchangeTypeEnum.TOPIC);
    // 绑定队列和交换机
    RabbitMQUtil.binding(queueName, exchangeName);
}

RabbitMQUtil工具类代码,核心:通过AmqpAdmin去创建队列、交换机以及绑定动作。

import kz.constants.ExchangeTypeEnum;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;

/**
 * @Date 2023/1/5 15:32
 * @Created by jj.lin
 */
public class RabbitMQUtil {
    private static AmqpAdmin getAmqpAdmin() {
        return SpringBeanUtil.getBean("amqpAdmin", AmqpAdmin.class);
    }

    public static void createAndBindQueue(String queueName) {
        AmqpAdmin amqpAdmin = getAmqpAdmin();
        Queue queue = new Queue(queueName, true);
        if (StringUtils.isBlank(queueName)) {
            return;
        }
        amqpAdmin.declareQueue(queue);
    }

    public static QueueInformation getQueueInfo(String queueName) {
        if (StringUtils.isBlank(queueName)) {
            return null;
        }
        return getAmqpAdmin().getQueueInfo(queueName);
    }

    public static void createAndBindExchange(String exchangeName, ExchangeTypeEnum typeEnum) {
        AbstractExchange exchange = null;
        switch (typeEnum) {
            case DIRECT:// 直连交换机
                exchange = new DirectExchange(exchangeName, true, false);
                break;
            case TOPIC: // 主题交换机
                exchange = new TopicExchange(exchangeName, true, false);
                break;
            case FANOUT: //扇形交换机
                exchange = new FanoutExchange(exchangeName, true, false);
                break;
            case HEADERS: // 头交换机
                exchange = new HeadersExchange(exchangeName, true, false);
                break;
        }
        getAmqpAdmin().declareExchange(exchange);
    }

    public static void binding(String queueName, String exchangeName) {
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "", null);
        // 绑定队列和交换机
        getAmqpAdmin().declareBinding(binding);
    }
}

这里我们还创建了一个枚举类ExchangeTypeEnum

public enum ExchangeTypeEnum {
    /**
     * 直连交换机
     */
    DIRECT,

    /**
     * 主题交换机
     */
    TOPIC,
    /**
     * 扇形交换机
     */
    FANOUT,
    /**
     * 头交换机
     */
    HEADERS;
}

接下来第二点,我们无需改动,我们需要关注的是业务代码,如何通过业务代码去控制将消息发送到对应直播间的对应交换机上,我们看ChatService.sendMsg()这个函数:

public boolean sendMsg(String message) {
    if (StringUtils.isBlank(message)) {
        return false;
    }
    ChatMessage chatMessage = JsonUtil.parseJsonToObj(message, ChatMessage.class);
    if (chatMessage == null) {
        return false;
    }
    LiveMessage liveMessage = new LiveMessage();
    liveMessage.setType(MessageType.CHAT.toString());
    liveMessage.setContent("用户 [" + chatMessage.getSender() + "] 说 (来自MQ):" + chatMessage.getContent());
    String roomId = chatMessage.getRoomId();
    String exchangeName = "Live_" + roomId + "-Exchange";
	// 主要的修改部分就是这里,根据roomId去拼接对应的交换机名称
    rabbitTemplate.convertAndSend(exchangeName, "", JsonUtil.toJSON(liveMessage));
    return true;
}

后端到这里就改好了(后面会根据流程跑一遍),前端部分很简单,我们只需要更换一下订阅的队列名称即可:

const onMQConnected = () => {
  console.log('RabbitMQ初始化成功');
  // 订阅交换机
  const exchangeName = `/exchange/Live_${roomId}-Exchange`;
  stompMQClient.subscribe(exchangeName, function(data:any) {
    const res = data.body;
    const entity = JSON.parse(res);
    const arr :any = [ entity.content ];
    setBulletList((pre: any[]) => [].concat(...pre, ...arr));
    data.ack();
  }, { ack: 'client' });
};

1.1 测试 - 动态创建队列

首先来看下RabbitMQ的控制台:一共有4个队列。
在这里插入图片描述
接下来我打开URLhttp://localhost:4396/zong/?userId=Zong4&roomId=6。那么对应的就应该自动创建一个名为 Live_6-Queue的队列。我们跟着代码来跑一遍。打开URL,代码进入到此:
在这里插入图片描述
紧接着,分别创建了以下对象:

  • 队列:Live_6-Queue
  • 交换机:Live_6-Exchange

在这里插入图片描述
控制台验证:绑定关系也有了。
在这里插入图片描述

1.2 测试 - 聊天室独立

我们在直播间号为6的地方聊天(点击右侧按钮),这里发送的是HTTP请求。
在这里插入图片描述
Controller层接收:
在这里插入图片描述
此时将这条信息(包含了直播间号等数据)发送给了交换机stomp-exchange。根据项目启动时RabbitMQ的相关配置,对stomp-queue这个队列进行了监听:
在这里插入图片描述
监听到后,将消息委派给ChatService.sendMsg()这个函数来处理:
在这里插入图片描述
这样前端监听的时候,就可以直接拿到自己直播间的消息啦:
在这里插入图片描述
结果如下:
在这里插入图片描述
当然,每个直播间的聊天内容也是独立的哦:
在这里插入图片描述

本篇文章到这里就结束啦,最后我也想说一下:

  1. 其实这一系列的文章都是自己思考后得出的一些设计思路。问题肯定是存在的,当然,现在也是在不断地学习和摸索,看是否有更好的实现方案。但是如果说这种思路或者编码方式对你们有一点帮助,那么这些都是值得的。
  2. 后面会研究前端方面,如何实现弹幕的滚动效果。毕竟我这里是一个简单的聊天室功能。后端方面也会继续更新。

版权声明:本文为Zong_0915原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。