Spring Boot整合WebSocket实现客户端与服务器之间的双向通信

在之前浏览器还不支持WebSocket的时候,Web开发者大多使用轮询接口的方式来实现近实时的数据更新。这种单方向通信的方式,由于服务器是被动接受查询,只能实现近实时的消息更新,且轮询的频率很难准确确定,如果频率高势必会增加服务器的负担;如果频率低,服务器端的消息可能很有很长的延迟才能达到客户端。
如今,web3.0时代的到来,几乎所有的浏览器和Web服务器均支持了WebSocket。WebSocket的产生正式为了解决客户端与Web服务器之间单向通信的问题。WebSocket实现了浏览器(客户端)与Web服务器的双向通信,建立连接之后任何一方都可以主动发送消息到对方。

Spring Boot提供了WebSocket的自动化配置,按照下面的步骤即可快速的将WebSocket整合到项目中。
本文使用的环境如下:

  • Spring Boot:2.6.6
  • JDK:11.0.2

1.引入依赖Starter

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

2. 编写WebSocket消息处理器

首先介绍一下WebSocketHandler接口,此接口用于处理连接的生命周期事件和消息处理:

package org.springframework.web.socket;
/**
 * A handler for WebSocket messages and lifecycle events.
 */
public interface WebSocketHandler {
	/**
	 * WebSocket连接建立成功被调用,可在此处保存会话信息。
	 */
	void afterConnectionEstablished(WebSocketSession session) throws Exception;
	/**
	 * 当消息到达时调用,可在此处处理消息。
	 */
	void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
	/**
	 * 当底层的消息传输发生错误时被调用。
	 */
	void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
	/**
	 * 当连接被任何一方关闭时,或者发生传输错误后被调用。
	 */
	void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
	/**
	 * 是否支持部分(不完整)消息。
	 */
	boolean supportsPartialMessages();
}

WebSocketHandler实现类及其继承关系
实现自己的Handler:

import com.github.cloudgyb.websocket.config.WebSocketEndpoint;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * 简单的文本消息处理器
 *
 * @author cloudgyb
 * @since 2022/4/4 19:01
 */
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
    /**
     * 保存连接的会话
     */
    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("连接已建立,会话ID:" + session.getId() + ",客户端地址:" + session.getRemoteAddress());
        sessions.put(session.getId(),session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        System.out.println("接受到消息:" + message.getPayload());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        System.out.println("消息传输出错!");
        exception.printStackTrace();
    }

    /**
     * 连接关闭移除会话
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        System.out.println("连接被关闭,会话ID:" + session.getId() + ",客户端地址:" + session.getRemoteAddress());
        sessions.remove(session.getId());
    }

    /**
     * 向客户端推送消息
     *
     * @param msg 文本消息
     */
    public void pushMsg(String msg) {
        final Collection<WebSocketSession> webSocketSessions = sessions.values();
        final TextMessage textMessage = new TextMessage(msg);
        webSocketSessions.forEach(s -> {
            try {
                s.sendMessage(textMessage);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

上边实现了主要的4个方法,分别用于处理连接建立、文本消息处理、处理传输错误和处理连接关闭。另外使用了ConcurrentHashMap实现了会话的管理,将建立连接时的会话保存了起来,用于我们集体推送消息到客户端。

3. WebSocket配置类

上边的Handler还不能处理WebSocket消息,因为还没启用WebSocket,下面的类用于启用WebSocket和注册端点处理器。

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * 启用WebSocket和注册Handler
 *
 * @author cloudgyb
 * @since 2022/4/4 19:00
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    private final MyWebSocketHandler myWebSocketHandler;
    public WebSocketConfig(MyWebSocketHandler myWebSocketHandler){
    	this.myWebSocketHandler = myWebSocketHandler;
    }
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler, "/ws");
    }
}

上边注册了我们自定义的Handler,这样客户端就可以使用ws://localhost:8080/ws进行连接了。

4.测试消息发送

启动项目,我们使用Postman来进行测试:
测试消息发送服务端输出:
在这里插入图片描述

5. 服务端消息推送

接下来我来实现并演示一下,服务器消息推送到客户端:

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 消息产生源,使用一个线程来模拟消息的产生,5秒一个消息。
 * 我实现了ApplicationRunner接口,这样项目一旦启动就有消息产生了
 *
 * @author cloudgyb
 * @since 2022/4/4 21:27
 */
@Component
public class MsgSource implements ApplicationRunner {
    private final MyWebSocketHandler myWebSocketHandler;

    public MsgSource(MyWebSocketHandler myWebSocketHandler) {
        this.myWebSocketHandler = myWebSocketHandler;
    }

    @Override
    public void run(ApplicationArguments args) {
        new Thread(() -> {
            while (true) {
                myWebSocketHandler.pushMsg("这是一个消息" + new Date());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

使用Postman测试,打开多个标签建立多个连接,同时都能收到服务端推送的消息:
在这里插入图片描述
到此为止,Spring Boot整合WebSocket实现客户端与服务器之间的双向通信已经完成了。

下面是一些代码设计上的优化,如果你只是了解如何整合,你可以忽略。

6. 代码设计优化

6.1 不合理分析

上面的WebSocketConfig类的registerWebSocketHandlers方法用于注册Handler,我直接将MyWebSocketHandler注入到了该类中。
仔细想想这样设计不合理:

  • 一个配置类依赖了某个功能的具体实现?
  • 当我们有多个Handler时,需要修改该配置类增加Handler的实现依赖,以注册Handler。不符合开闭原则!
  • Handler的实现类与WebSocketConfig严重耦合。

6.2 优化

我们可以自定义一个注解WebSocketEndpoint来标识一个类是一个Handler。

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 标注到Handler上,以便自动注册
 *
 * @author cloudgyb
 * @since 2022/4/4 19:11
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface WebSocketEndpoint {
    /**
     * WebSocket 端点路径
     */
    String value();
}

修改MyWebSocketHandler实现:

@Component
@WebSocketEndpoint("/ws")
public class MyWebSocketHandler extends TextWebSocketHandler {
   //方法实现同上,省略。。。
}

我们标注了@WebSocketEndpoint注解,指定了端点路径。

修改WebSocketConfig 配置类:

/**
 * 启用WebSocket和注册Handler
 *
 * @author cloudgyb
 * @since 2022/4/4 19:00
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    private final ObjectProvider<TextWebSocketHandler> webSocketHandlers;

    public WebSocketConfig(ObjectProvider<TextWebSocketHandler> webSocketHandlers) {
        this.webSocketHandlers = webSocketHandlers;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        webSocketHandlers.forEach(textWebSocketHandler -> {
            final WebSocketEndpoint annotation = textWebSocketHandler.getClass()
                    .getAnnotation(WebSocketEndpoint.class);
            if (annotation != null) {
                final String endpoint = annotation.value();
                registry.addHandler(textWebSocketHandler, endpoint);
            }
        });
    }
}

这里我们借助了Spring提供的ObjectProvider方便的拿到所有的实现了TextWebSocketHandler的Handler Bean,这样我们就可以不用关心到底有多少个Handler,达到了一次性注册的目的。并且增加一个handler该配置了也无需修改了!
我们遵守了面向对象设计里面的开闭原则。

仔细看看,其实MyWebSocketHandler 类的设计也有不可理的地方,它本质上是一个消息处理器,但是我们的设计让它同时维护了session,这点是不是不符合单一职责原则呢?

7. 源码地址

为了方便大家参考,我将该文对应的源码放大了Github上:https://github.com/cloudgyb/websocket-spring-boot.


版权声明:本文为gybshen原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。