spring websocket + redismq
引言
之前写webflux通过rabbitmq推送消息,这次写websocket基于redis推送消息,区别在于websocket我们可以主动推消息,也就是说我们只需要一个监听就足够了,webflux中是每个用户对应一个监听者,具体原因可以看这->Spring Reactive Web Webflux 整合 RabbitMQ
单机情况下,websocket进行广播是不需要用到MQ的,但在分布式环境下如果不用MQ是没办法广播的,必须通过一个中间件进行存储然后转发。
同样,不写分析,不看源码,不浪费大家时间,毕竟都开源的想看源码不是都能看,在这复制粘贴是浪费大家时间。
栗子
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.10.3</version>
</dependency>
RedisConfiguration
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.SimpMessagingTemplate;
@Configuration
public class RedisConfig {
//websocket 的 autoconfig自动配置好的
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Bean
RedisMessageListenerContainer socketMessageListenerContainer(RedisConnectionFactory connectionFactory){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
container.addMessageListener((message, pattern) -> {
//反序列化,如果给redis发送消息时不是字符串,需要自己写反序列化,因此这里我故意多此一举使用stringRedisSerializer
String msg = stringRedisSerializer.deserialize(message.getBody());
//通过messagingTemplate,对websocket所有用户进行广播消息
//SocketMsg就是个普通的实体类,也可以直接传字符串。
messagingTemplate.convertAndSend( "/queue/redis_broadcast", new SocketMsg(msg));
}, new PatternTopic("chat.*"));
//new PatternTopic("chat.*") 我们接收所有来自chat.*的消息
//具体看Controller中会有这个Pattern
//并通过messagingTemplate发送给websocket中订阅/queue/redis_broadcast的用户
return container;
}
}
HandShakeInterceptor
为了省事我直接使用JWT了,实际应用中使用OAuth2的TokenService
import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.exceptions.JWTVerificationException;
import com.auth0.jwt.interfaces.DecodedJWT;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Date;
import java.util.Map;
@Component
public class HandShakeInterceptor implements HandshakeInterceptor {
/*
* 在WebSocket连接建立之前的操作,以鉴权为例
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//获得参数Authorization,实际上是个JWT令牌,如果验证不出错,设置到attributes中,后面会用到
String authorization = ((ServletServerHttpRequest) request).getServletRequest().getParameter("Authorization");
try{
DecodedJWT verify= JWT.require(Algorithm.HMAC256("123456")).build().verify(authorization);
if (verify!=null && verify.getExpiresAt().after(new Date())){
attributes.put("username",verify.getClaim("username").asString());
}
return true;
}catch (JWTVerificationException ignored){}
return false;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception ex) {
}
}
WebSocket Configuration
为了减少代码我没使用ChannelInterceptor,而是直接使用DefaultHandshakeHandler
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
//自己实现,主要为了在连接上的时候将用户信息注册进accessor
@Autowired
private HandShakeInterceptor handShakeInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/myWebSocket").setAllowedOrigins("*")
.addInterceptors(handShakeInterceptor)
.setHandshakeHandler(new DefaultHandshakeHandler(){
//现在就用到了handShakeInterceptor中的attributes,这里实际上就是获取到用户名后,给accessor加个principal
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
return () -> (String) attributes.get("username");
}
}).withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue","/topic");
registry.setApplicationDestinationPrefixes("/app");
registry.setUserDestinationPrefix("/user/");
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(2 * 1024);
registration.setSendBufferSizeLimit(2 * 1024);
registration.setSendTimeLimit(10000);
}
}
Controller
@RestController
public class HomeController {
//可以使用userRegistry获取注册到accessor中用户的信息了
@Autowired
private SimpUserRegistry userRegistry;
//用来生成一个假的JWT令牌,做测试用,注意我这里打印了用户数量,复制时别复制错了
//还要说明一点,这个用户在注册进accessor时,是不可重复的
//你重复注册用户名不影响功能,但userRegistry.getUserCount()返回的数量只有1个。
@RequestMapping("/")
public String index(@RequestParam String name){
return userRegistry.getUserCount()+" " + JWT.create().withExpiresAt(new Date(System.currentTimeMillis() + 60 * 15 * 1000))
.withClaim("username", name)
.sign(Algorithm.HMAC256("123456"));
}
@Autowired
private StringRedisTemplate redisTemplate;
//接收到客户端的消息后,将消息转发到redis中
@MessageMapping("/sendRedis")
public void sendRedis(Principal principal,SocketMsg msg){
redisTemplate.convertAndSend("chat." + principal.getName(), msg.getMsg());
}
}
html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script>
//这里使用之前生成的JWT
var socket = new SockJS("http://localhost:8080/myWebSocket?Authorization=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1ODk0MjQyMTIsInVzZXJuYW1lIjoiMTIzMTIzIn0.w9LSRQ9XMRFkQgMElGarGD0pDoWqI_jtEgWSZCHINlw");
var stompClient = Stomp.over(socket);
window.onload = function () {
connect();
}
window.onbeforeunload = function(){
stompClient.disconnect();
}
/**
* 发送用户信息
* */
function chat() {
var value = document.getElementById("info").value;
//对应@MessageMapping("/sendRedis")
stompClient.send("/app/sendRedis", {}, JSON.stringify( {'msg': value} ));
}
/**
* 连接成功后会通过回调订阅
* */
function connect() {
stompClient.connect({
//连接时也可以带header信息,比如:
//注意这里的connect并不是beforeHandshake,而是已经连接上了之后发送的StompCommand.CONNECT
/*
token:"asdasdasdasd",
user:"asdasdasd"
*/
//DefaultHandshakeHandler去掉
//在WebSocketConfig重写configureClientInboundChannel方法 注册一个ChannelInterceptor就可以使用StompHeaderAccessor来获取header
//可以使用StompHeaderAccessor.setUser重新设置UserName等等。
},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED)的回调方法
alert("success");
stompClient.subscribe('/queue/redis_broadcast', function (response) {
console.log("/queue/redis_broadcast 广播消息:\n"+JSON.parse(response.body).msg);
});
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
alert("error");
});
}
</script>
</head>
<body>
<input type="text" id="info"/><button onclick="chat();">发送</button>
</body>
</html>
注意: 如果使用rabbitmq 那么队列名必须不同,具体原因另外一篇文章有。
版权声明:本文为slslslyxz原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。