spring websocket + redismq

引言

之前写webflux通过rabbitmq推送消息,这次写websocket基于redis推送消息,区别在于websocket我们可以主动推消息,也就是说我们只需要一个监听就足够了,webflux中是每个用户对应一个监听者,具体原因可以看这->Spring Reactive Web Webflux 整合 RabbitMQ

单机情况下,websocket进行广播是不需要用到MQ的,但在分布式环境下如果不用MQ是没办法广播的,必须通过一个中间件进行存储然后转发。

同样,不写分析,不看源码,不浪费大家时间,毕竟都开源的想看源码不是都能看,在这复制粘贴是浪费大家时间。

栗子

pom.xml

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.auth0</groupId>
            <artifactId>java-jwt</artifactId>
            <version>3.10.3</version>
        </dependency>

RedisConfiguration

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.messaging.simp.SimpMessagingTemplate;

@Configuration
public class RedisConfig {
	//websocket 的 autoconfig自动配置好的
    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Bean
    RedisMessageListenerContainer socketMessageListenerContainer(RedisConnectionFactory connectionFactory){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        container.addMessageListener((message, pattern) -> {
        	//反序列化,如果给redis发送消息时不是字符串,需要自己写反序列化,因此这里我故意多此一举使用stringRedisSerializer
            String msg = stringRedisSerializer.deserialize(message.getBody());
            //通过messagingTemplate,对websocket所有用户进行广播消息
            //SocketMsg就是个普通的实体类,也可以直接传字符串。
            messagingTemplate.convertAndSend( "/queue/redis_broadcast", new SocketMsg(msg));
        }, new PatternTopic("chat.*"));
        //new PatternTopic("chat.*") 我们接收所有来自chat.*的消息
        //具体看Controller中会有这个Pattern
        //并通过messagingTemplate发送给websocket中订阅/queue/redis_broadcast的用户
        return container;
    }
}

HandShakeInterceptor

为了省事我直接使用JWT了,实际应用中使用OAuth2的TokenService

import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.exceptions.JWTVerificationException;
import com.auth0.jwt.interfaces.DecodedJWT;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Date;
import java.util.Map;

@Component
public class HandShakeInterceptor  implements HandshakeInterceptor {

    /*
     * 在WebSocket连接建立之前的操作,以鉴权为例
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        //获得参数Authorization,实际上是个JWT令牌,如果验证不出错,设置到attributes中,后面会用到
        String authorization = ((ServletServerHttpRequest) request).getServletRequest().getParameter("Authorization");
        try{
            DecodedJWT verify= JWT.require(Algorithm.HMAC256("123456")).build().verify(authorization);
            if (verify!=null && verify.getExpiresAt().after(new Date())){
                attributes.put("username",verify.getClaim("username").asString());
            }
            return true;
        }catch (JWTVerificationException ignored){}

        return false;
    }


    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception ex) {

    }
}

WebSocket Configuration

为了减少代码我没使用ChannelInterceptor,而是直接使用DefaultHandshakeHandler

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
	
    //自己实现,主要为了在连接上的时候将用户信息注册进accessor
    @Autowired
    private HandShakeInterceptor handShakeInterceptor;
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/myWebSocket").setAllowedOrigins("*")
                .addInterceptors(handShakeInterceptor)
                .setHandshakeHandler(new DefaultHandshakeHandler(){
                	//现在就用到了handShakeInterceptor中的attributes,这里实际上就是获取到用户名后,给accessor加个principal
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        return () -> (String) attributes.get("username");
                    }
                }).withSockJS();
    }
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue","/topic");
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user/");
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(2 * 1024);
        registration.setSendBufferSizeLimit(2 * 1024);
        registration.setSendTimeLimit(10000);
    }
}

Controller

@RestController
public class HomeController {
	
	//可以使用userRegistry获取注册到accessor中用户的信息了
	@Autowired
    private SimpUserRegistry userRegistry;

	//用来生成一个假的JWT令牌,做测试用,注意我这里打印了用户数量,复制时别复制错了
	//还要说明一点,这个用户在注册进accessor时,是不可重复的
	//你重复注册用户名不影响功能,但userRegistry.getUserCount()返回的数量只有1个。
    @RequestMapping("/")
    public String index(@RequestParam String name){
        return userRegistry.getUserCount()+"  " + JWT.create().withExpiresAt(new Date(System.currentTimeMillis() + 60 * 15 * 1000))
                .withClaim("username", name)
                .sign(Algorithm.HMAC256("123456"));
    }

    @Autowired
    private StringRedisTemplate redisTemplate;

	//接收到客户端的消息后,将消息转发到redis中
    @MessageMapping("/sendRedis")
    public void sendRedis(Principal principal,SocketMsg msg){
        redisTemplate.convertAndSend("chat." + principal.getName(), msg.getMsg());
    }
}

html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script>
    	//这里使用之前生成的JWT
        var socket = new SockJS("http://localhost:8080/myWebSocket?Authorization=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE1ODk0MjQyMTIsInVzZXJuYW1lIjoiMTIzMTIzIn0.w9LSRQ9XMRFkQgMElGarGD0pDoWqI_jtEgWSZCHINlw");
        var stompClient = Stomp.over(socket);
        window.onload = function () {
            connect();
        }
        window.onbeforeunload = function(){
            stompClient.disconnect();
        }
       
        /**
         * 发送用户信息
         * */

        function chat() {
            var value = document.getElementById("info").value;
            //对应@MessageMapping("/sendRedis")
            stompClient.send("/app/sendRedis", {}, JSON.stringify( {'msg': value} ));
        }

        /**
         * 连接成功后会通过回调订阅
         * */
        function connect() {

            stompClient.connect({
                    //连接时也可以带header信息,比如:
                    //注意这里的connect并不是beforeHandshake,而是已经连接上了之后发送的StompCommand.CONNECT
                    /*
                    token:"asdasdasdasd",
                    user:"asdasdasd"
					*/
					//DefaultHandshakeHandler去掉
					//在WebSocketConfig重写configureClientInboundChannel方法 注册一个ChannelInterceptor就可以使用StompHeaderAccessor来获取header
					//可以使用StompHeaderAccessor.setUser重新设置UserName等等。
                },
                function connectCallback(frame) {
                    // 连接成功时(服务器响应 CONNECTED)的回调方法
                    alert("success");
                    stompClient.subscribe('/queue/redis_broadcast', function (response) {
                        console.log("/queue/redis_broadcast 广播消息:\n"+JSON.parse(response.body).msg);
                    });
                },
                function errorCallBack(error) {
                    // 连接失败时(服务器响应 ERROR 帧)的回调方法
                    alert("error");
                });
        }

    </script>
</head>
<body>
<input type="text" id="info"/><button onclick="chat();">发送</button>
</body>
</html>

注意: 如果使用rabbitmq 那么队列名必须不同,具体原因另外一篇文章有。


版权声明:本文为slslslyxz原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。