引言
由于新需求,需要实现多语言下实现websocket消息通讯推送,本来打算引入高级消息队列协议的,但体量没那么大,所以选型时选择了通过redis的队列来实现。
实现步骤
pom引入
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Following dependency is required for Full Featured STOMP Broker Relay -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
Redis 监听实现
首先需要实现redis订阅监听
application.yml
spring:
redis:
open: true # 是否开启redis缓存 true开启 false关闭
database: 0
host: 172.1.1.2
port: 6379
password: password # 密码(默认为空)
timeout: 6000ms # 连接超时时长(毫秒)
jedis:
pool:
max-active: 1000 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 # 连接池中的最大空闲连接
min-idle: 5 # 连接池中的最小空闲连接
Redis消息监听者容器
package com.example.demo.config;
import com.example.demo.listener.Topic2Listener;
import com.example.demo.listener.TopicListener;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
/**
* @功能描述 Redis消息监听者容器
* @作者 左岸天涯
* @创建时间 2020/10/21 14:52
**/
@SuppressWarnings({"all"})
@Configuration
@EnableCaching
public class RedisConfig {
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
//相当于xml中的bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter TopicAdapter, MessageListenerAdapter Topic2Adapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫chat 的通道
container.addMessageListener(TopicAdapter, new PatternTopic("Mytopic"));
//container.addMessageListener(Topic2Adapter, new PatternTopic("Mytopic2"));
//这个container 可以添加多个 messageListener
return container;
}
/**
* 消息监听器适配器,绑定消息处理器
*
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter TopicAdapter() {
return new MessageListenerAdapter(new TopicListener());
}
/**
* 消息监听器适配器,绑定消息处理器
*
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter Topic2Adapter() {
return new MessageListenerAdapter(new Topic2Listener());
}
/**
* redis 读取内容的template
*/
@Bean
StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
监听内容
package com.example.demo.listener;
import com.example.demo.utils.InjectServiceUtil;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
/**
* @功能描述 监听消息
* @创建时间 2020/10/21 15:07
**/
public class TopicListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("我是Topic监听" + message.toString());
InjectServiceUtil.getInstance().sendMessage().send(message.toString());
}
}
监听器需要单独实现注入
package com.example.demo.utils;
import com.example.demo.service.SendMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @功能描述 监听器中注入
* @创建时间 2020/10/21 15:48
**/
@Component
public class InjectServiceUtil {
@Autowired
private SendMessage sendMessage;
@PostConstruct
public void init(){
InjectServiceUtil.getInstance().sendMessage = this.sendMessage;
}
/**
* 实现单例 start
*/
private static class SingletonHolder {
private static final InjectServiceUtil INSTANCE = new InjectServiceUtil();
}
private InjectServiceUtil (){}
public static final InjectServiceUtil getInstance() {
return SingletonHolder.INSTANCE;
}
/**
* 实现单例 end
*/
public SendMessage sendMessage(){
return InjectServiceUtil.getInstance().sendMessage;
}
}
消息推送实现
package com.example.demo.service;
import com.example.demo.entity.ChatMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
/**
* @功能描述 消息发送
* @创建时间 2020/10/21 15:23
**/
@Service
public class SendMessage {
@Autowired
private SimpMessagingTemplate template;
public void send(String message){
ChatMessage chatMessage=new ChatMessage();
chatMessage.setContent(message);
chatMessage.setType(ChatMessage.MessageType.CHAT);
chatMessage.setSender("监听者广播消息");
System.out.println(chatMessage);
template.convertAndSend("/topic/public",chatMessage);
}
}
STOMP 服务实现
消息体
package com.example.demo.entity;
import lombok.Data;
/**
* @功能描述 TODO
* @创建时间 2020/10/21 14:08
**/
@Data
public class ChatMessage {
private MessageType type;
private String content;
private String sender;
public enum MessageType {
CHAT,
JOIN,
LEAVE
}
}
服务配置
package com.example.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
/**
* @功能描述 STOMP服务配置实现
* @创建时间 2020/10/21 14:05
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private static long HEART_BEAT = 10000;
// @Bean
// public ServerEndpointExporter serverEndpointExporter(){
// return new ServerEndpointExporter();
// }
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
// .addInterceptors(new SessionAuthHandshakeInterceptor())
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//设置简单的消息代理器,它使用Memory(内存)作为消息代理器,
//其中/user和/topic都是我们发送到前台的数据前缀。前端必须订阅以/user开始的消息(.subscribe()进行监听)。
//setHeartbeatValue设置后台向前台发送的心跳,
//注意:setHeartbeatValue这个不能单独设置,不然不起作用,要配合后面setTaskScheduler才可以生效。
ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread-");
te.initialize();
registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT, HEART_BEAT}).setTaskScheduler(te);
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(500 * 1024 * 1024);
registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
registration.setSendTimeLimit(200000);
}
}
服务实现
package com.example.demo.controller;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.example.demo.entity.ChatMessage;
import com.example.demo.service.SendMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import java.util.HashMap;
import java.util.Map;
/**
* @功能描述 TODO
* @创建时间 2020/10/21 14:10
**/
@Controller
public class ChatController {
@Autowired
private SimpMessagingTemplate template;
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
return chatMessage;
}
@MessageMapping("/chat.addUser")
@SendTo("/topic/public")
public ChatMessage addUser(@Payload ChatMessage chatMessage,
SimpMessageHeaderAccessor headerAccessor) {
// Add username in web socket session
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
return chatMessage;
}
}
模拟消息发布
package com.example.demo.service;
import cn.hutool.core.date.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @功能描述 TODO
* @创建时间 2020/10/21 15:09
**/
@EnableScheduling //开启定时器功能
@Component
public class MessageSender {
@Autowired
@Resource(name = "stringRedisTemplate")
private StringRedisTemplate stringRedisTemplate;
@Scheduled(fixedRate = 5000) //间隔 通过StringRedisTemplate对象向redis消息队列频道发布消息
public void sendTopicMessage(){
stringRedisTemplate.convertAndSend("Mytopic","Mytopic:时间"+ DateUtil.now());
}
}
websocket客户端实现
index.html
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1.0, minimum-scale=1.0">
<title>Spring Boot WebSocket Chat Application</title>
<link rel="stylesheet" href="/css/main.css" />
</head>
<body>
<noscript>
<h2>Sorry! Your browser doesn't support Javascript</h2>
</noscript>
<div id="username-page">
<div class="username-page-container">
<h1 class="title">Type your username</h1>
<form id="usernameForm" name="usernameForm">
<div class="form-group">
<input type="text" id="name" placeholder="Username" autocomplete="off" class="form-control" />
</div>
<div class="form-group">
<button type="submit" class="accent username-submit">Start Chatting</button>
</div>
</form>
</div>
</div>
<div id="chat-page" class="hidden">
<div class="chat-container">
<div class="chat-header">
<h2>Spring WebSocket Chat Demo</h2>
</div>
<div class="connecting">
Connecting...
</div>
<ul id="messageArea">
</ul>
<form id="messageForm" name="messageForm" nameForm="messageForm">
<div class="form-group">
<div class="input-group clearfix">
<input type="text" id="message" placeholder="Type a message..." autocomplete="off" class="form-control"/>
<button type="submit" class="primary">Send</button>
</div>
</div>
</form>
</div>
</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script src="/js/main.js"></script>
</body>
</html>
main.js
'use strict';
var usernamePage = document.querySelector('#username-page');
var chatPage = document.querySelector('#chat-page');
var usernameForm = document.querySelector('#usernameForm');
var messageForm = document.querySelector('#messageForm');
var messageInput = document.querySelector('#message');
var messageArea = document.querySelector('#messageArea');
var connectingElement = document.querySelector('.connecting');
var stompClient = null;
var username = null;
var colors = [
'#2196F3', '#32c787', '#00BCD4', '#ff5652',
'#ffc107', '#ff85af', '#FF9800', '#39bbb0'
];
function connect(event) {
username = document.querySelector('#name').value.trim();
if(username) {
usernamePage.classList.add('hidden');
chatPage.classList.remove('hidden');
var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect({}, onConnected, onError);
}
event.preventDefault();
}
function onConnected() {
// Subscribe to the Public Topic
stompClient.subscribe('/topic/public', onMessageReceived);
// stompClient.subscribe('/topic/public2', onMessageReceived);
// Tell your username to the server
stompClient.send("/app/chat.addUser",
{},
JSON.stringify({sender: username, type: 'JOIN'})
)
connectingElement.classList.add('hidden');
}
function onError(error) {
connectingElement.textContent = 'Could not connect to WebSocket server. Please refresh this page to try again!';
connectingElement.style.color = 'red';
}
function sendMessage(event) {
var messageContent = messageInput.value.trim();
if(messageContent && stompClient) {
var chatMessage = {
sender: username,
content: messageInput.value,
type: 'CHAT'
};
stompClient.send("/app/chat.sendMessage", {}, JSON.stringify(chatMessage));
messageInput.value = '';
}
event.preventDefault();
}
function onMessageReceived(payload) {
var message = JSON.parse(payload.body);
var messageElement = document.createElement('li');
if(message.type === 'JOIN') {
messageElement.classList.add('event-message');
message.content = message.sender + ' joined!';
} else if (message.type === 'LEAVE') {
messageElement.classList.add('event-message');
message.content = message.sender + ' left!';
} else {
messageElement.classList.add('chat-message');
var avatarElement = document.createElement('i');
var avatarText = document.createTextNode(message.sender[0]);
avatarElement.appendChild(avatarText);
avatarElement.style['background-color'] = getAvatarColor(message.sender);
messageElement.appendChild(avatarElement);
var usernameElement = document.createElement('span');
var usernameText = document.createTextNode(message.sender);
usernameElement.appendChild(usernameText);
messageElement.appendChild(usernameElement);
}
var textElement = document.createElement('p');
var messageText = document.createTextNode(message.content);
textElement.appendChild(messageText);
messageElement.appendChild(textElement);
messageArea.appendChild(messageElement);
messageArea.scrollTop = messageArea.scrollHeight;
}
function getAvatarColor(messageSender) {
var hash = 0;
for (var i = 0; i < messageSender.length; i++) {
hash = 31 * hash + messageSender.charCodeAt(i);
}
var index = Math.abs(hash % colors.length);
return colors[index];
}
usernameForm.addEventListener('submit', connect, true)
messageForm.addEventListener('submit', sendMessage, true)
main.css
* {
-webkit-box-sizing: border-box;
-moz-box-sizing: border-box;
box-sizing: border-box;
}
html,body {
height: 100%;
overflow: hidden;
}
body {
margin: 0;
padding: 0;
font-weight: 400;
font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
font-size: 1rem;
line-height: 1.58;
color: #333;
background-color: #f4f4f4;
height: 100%;
}
body:before {
height: 50%;
width: 100%;
position: absolute;
top: 0;
left: 0;
background: #128ff2;
content: "";
z-index: 0;
}
.clearfix:after {
display: block;
content: "";
clear: both;
}
.hidden {
display: none;
}
.form-control {
width: 100%;
min-height: 38px;
font-size: 15px;
border: 1px solid #c8c8c8;
}
.form-group {
margin-bottom: 15px;
}
input {
padding-left: 10px;
outline: none;
}
h1, h2, h3, h4, h5, h6 {
margin-top: 20px;
margin-bottom: 20px;
}
h1 {
font-size: 1.7em;
}
a {
color: #128ff2;
}
button {
box-shadow: none;
border: 1px solid transparent;
font-size: 14px;
outline: none;
line-height: 100%;
white-space: nowrap;
vertical-align: middle;
padding: 0.6rem 1rem;
border-radius: 2px;
transition: all 0.2s ease-in-out;
cursor: pointer;
min-height: 38px;
}
button.default {
background-color: #e8e8e8;
color: #333;
box-shadow: 0 2px 2px 0 rgba(0, 0, 0, 0.12);
}
button.primary {
background-color: #128ff2;
box-shadow: 0 2px 2px 0 rgba(0, 0, 0, 0.12);
color: #fff;
}
button.accent {
background-color: #ff4743;
box-shadow: 0 2px 2px 0 rgba(0, 0, 0, 0.12);
color: #fff;
}
#username-page {
text-align: center;
}
.username-page-container {
background: #fff;
box-shadow: 0 1px 11px rgba(0, 0, 0, 0.27);
border-radius: 2px;
width: 100%;
max-width: 500px;
display: inline-block;
margin-top: 42px;
vertical-align: middle;
position: relative;
padding: 35px 55px 35px;
min-height: 250px;
position: absolute;
top: 50%;
left: 0;
right: 0;
margin: 0 auto;
margin-top: -160px;
}
.username-page-container .username-submit {
margin-top: 10px;
}
#chat-page {
position: relative;
height: 100%;
}
.chat-container {
max-width: 700px;
margin-left: auto;
margin-right: auto;
background-color: #fff;
box-shadow: 0 1px 11px rgba(0, 0, 0, 0.27);
margin-top: 30px;
height: calc(100% - 60px);
max-height: 600px;
position: relative;
}
#chat-page ul {
list-style-type: none;
background-color: #FFF;
margin: 0;
overflow: auto;
overflow-y: scroll;
padding: 0 20px 0px 20px;
height: calc(100% - 150px);
}
#chat-page #messageForm {
padding: 20px;
}
#chat-page ul li {
line-height: 1.5rem;
padding: 10px 20px;
margin: 0;
border-bottom: 1px solid #f4f4f4;
}
#chat-page ul li p {
margin: 0;
}
#chat-page .event-message {
width: 100%;
text-align: center;
clear: both;
}
#chat-page .event-message p {
color: #777;
font-size: 14px;
word-wrap: break-word;
}
#chat-page .chat-message {
padding-left: 68px;
position: relative;
}
#chat-page .chat-message i {
position: absolute;
width: 42px;
height: 42px;
overflow: hidden;
left: 10px;
display: inline-block;
vertical-align: middle;
font-size: 18px;
line-height: 42px;
color: #fff;
text-align: center;
border-radius: 50%;
font-style: normal;
text-transform: uppercase;
}
#chat-page .chat-message span {
color: #333;
font-weight: 600;
}
#chat-page .chat-message p {
color: #43464b;
}
#messageForm .input-group input {
float: left;
width: calc(100% - 85px);
}
#messageForm .input-group button {
float: left;
width: 80px;
height: 38px;
margin-left: 5px;
}
.chat-header {
text-align: center;
padding: 15px;
border-bottom: 1px solid #ececec;
}
.chat-header h2 {
margin: 0;
font-weight: 500;
}
.connecting {
padding-top: 5px;
text-align: center;
color: #777;
position: absolute;
top: 65px;
width: 100%;
}
@media screen and (max-width: 730px) {
.chat-container {
margin-left: 10px;
margin-right: 10px;
margin-top: 10px;
}
}
@media screen and (max-width: 480px) {
.chat-container {
height: calc(100% - 30px);
}
.username-page-container {
width: auto;
margin-left: 15px;
margin-right: 15px;
padding: 25px;
}
#chat-page ul {
height: calc(100% - 120px);
}
#messageForm .input-group button {
width: 65px;
}
#messageForm .input-group input {
width: calc(100% - 70px);
}
.chat-header {
padding: 10px;
}
.connecting {
top: 60px;
}
.chat-header h2 {
font-size: 1.1em;
}
}
结语
本文流程思路如下:
生成消息发送–>Redis发布–>Redis订阅–>转换消息推送到websocket–>前端展示信息
redis的消息队列可更换成其它消息队列如rabbitMq、zeroMq等。
使用消息队列就可以实现分布式内容推送,其它开发语言可以通过消息队列推送内容,这里不再深入。
本文参考了其它文章的内容,这里不再一一列举,如有不足之处请谅解。
版权声明:本文为weixin_44575542原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。