SpringBoot搭建WebSocket推送服务

先说下简介,这是菜鸟教程上的说明:
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。 在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。 现在,很多网站为了实现推送技术,所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP请求,然后由服务器返回最新的数据给客户端的浏览器。这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求,然而HTTP请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多的带宽等资源。 HTML5 定义的 WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
在这里插入图片描述
在我们企业项目开发过程中,可能会遇到如下业务场景:推送、扫码、聊天、远程投放等。这些业务场景都有一个共同点那就是需要服务器主动发消息至客户端,如果服务器不主动发送消息,那么就需要客户端时时刻刻轮询地发HTTP请求来确认是否有自己的消息,这样的处理方式虽然可以达到解决问题的目的,但是并不优雅,而且频繁的请求也占用了过多的无效资源。目前大部分主流浏览器已经支持webSocket通讯方式,从而实现客户端和服务器实现长连接,服务器主动发消息至客户端,我们利用这一技术可以高效实现实际业务。我就以一个简单的Demo演示SpringBoot实现WebSocket服务。
SpringBoot对websocket的starter

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

另外是一些工具jar

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>

1.开启SpringBoot对webSocket的支持

/**
* @ClassName: WebSocketConfig
* @Description: socket配置,打开webSocket的支持
* @author wangzhi
* @date 2019年11月25日
 */
@Configuration
public class WebSocketConfig {
	
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    } 

}

2.添加webSocket服务暴露类

/**
 * 
* @ClassName: WebSocketServer
* @Description: WebSocket服务
* @author 王智
* @date 2019年11月25日
*
 */
@Slf4j
@ServerEndpoint("/websocket/{uid}")
@Component
public class WebSocketServer {

	/**
	 * 实时连接数量
	 */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

	/**
	 * 会话容器
	 */
    public static Map<Long,WebSocketServer> webSocketMap = new ConcurrentHashMap<>(128);

	/**
	 * webSocket会话对象
	 */
    private Session session;

	/**
	 * 业务变量(用户id)
	 */
    private Long uid;

	/**
	 * 业务服务
	 */
	private static CourseManager	courseManager;

	@Autowired
	public void setCourseManager(CourseManager courseManager){
		this.courseManager = courseManager;
	}

	/**
    * @Description 监听连接
    * @param session
    * @param uid 用户id
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("uid") Long uid) {
    	//获取该用户信息,做业务逻辑判断
		//User user = this.redisService.get("user:login:"+uid)
		//绑定session对象
		this.session = session;
		this.uid = uid;
		//收到连接请求,服务器应答
		try{
			this.sendMessage("hello connection is success");
			//将session会话对象存入map容器中,方便其它地方获取此session发送信息
			webSocketMap.put(uid,this);
		}catch (Exception e){
			log.error("连接失败");
			e.printStackTrace();
		}

    }

    /**
    * @Description 接收客户端消息
    * @param message 客户端传来的的消息
    * @param session 会话对象
     */
    @OnMessage
    public void onMessage(String message, Session session) {
    	if (StringUtils.isNotBlank(message)){
    		try {
				log.info("收到来自:{} 的消息:{}", this.uid, message);
				JSONObject jsonMessage = JSON.parseObject(message);
				String messageType = jsonMessage.getString("type");
				switch (messageType) {
					case "course":
						BigDecimal price = courseManager.findPriceByName(jsonMessage.getString("value"));
						this.sendMessage("产品一共:" + price + "元");
						System.out.println();
						break;
					default:
						log.info("消息类型不存在");
						break;
				}
			}catch (Exception e){
    			e.printStackTrace();
			}
		}else{
    		log.info("消息不能为空");
		}
	}

	/**
	 * @Description 关闭连接
	 */
	@OnClose
	public void onClose() {
		if(webSocketMap.containsValue(this.uid)){
			//删除当前session
			webSocketMap.remove(this.uid);
			//在线数减1
			subOnlineCount();
			log.info("有一连接关闭!当前在线用户数为" + getOnlineCount());
		}else{
			log.info("不存在该用户:{}",this.uid);
		}
	}

	/**
	 * 
	 * @param session
	 * @param error
	 */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("socket发生错误");
        error.printStackTrace();
    }
    
	/**
	 * 服务器推送消息至客户端
	 */
    public void sendMessage(String message) throws Exception {
        this.session.getBasicRemote().sendText(message);
    }

	/**
	 * 获取当前在线用户数
	 */
	public static int getOnlineCount() {
        return onlineCount.get();
    }

	/**
	 * 在线用户数加1
	 */
	public static void addOnlineCount() {
        onlineCount.incrementAndGet();
    }

	/**
	 * 在线用户数减1
	 */
	public static void subOnlineCount() {
        onlineCount.decrementAndGet();
    }

}

WebSocketService就是提供webSocket暴露服务的地方,其中有些注意点:

  • @ServerEndpoint("/websocket/{uid}") 此注解标识webSocket服务及其URI
  • webSocketMap 此Map存放每一个socket对象,便于其它业务调用推送消息
  • Session 每一个socket连接都会创建一个session会话标识对象
  • 结合实际业务需要依赖注入一些service或者mapper,不能用普通的@Autowired,因为此webSocket服务类不同于普通SpringBean,spring容器默认管理单例对象,而此webSocket服务类是每一个连接都会创建一次,所以依赖注入的属性必须加以static修饰,以setter方式注入。否则除了第一个初始化连接以外,其余用户创建连接以后调用服务属性会有空指针异常
  • @OnOpen 此注解标识webSocket建立连接的接口
  • @OnMessage 此注解标识webSocket监听客户端消息的接口
  • @OnClose 此注解标识webSocket客户端关闭的调用接口
  • @OnError webSocket发生内部异常的回调接口
  • 分布式环境下建议消息处理采用异步方式进行,以提高整个系统的可用性和性能合理分配,因为默认的Tomcat并发和连接以及线程也是socket服务性能的瓶颈。
  • webSocket协议不同于HTTP,所以访问URL要以ws开头,例如:ws://localhost:8081/test/websocket/9527
  • Nginx反向代理webSocket服务不同于HTTP,但也相差无几,需要加上两个请求头告诉Nginx:
 location / {
            proxy_pass http://socket.kevin.com/;
            proxy_set_header Host $host:$server_port;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }

最后启动SpringBoot,看看一个基本的WebSocket服务:

server:
  port: 8081
  servlet:
    context-path: /test
  tomcat:
    uri-encoding: UTF-8
    max-threads: 300
    max-connections: 10000
    max-http-header-size: 8192
    min-spare-threads: 50
    connection-timeout: 200000
    websocket:
      executorMaxSize: 300

随便找一个测试地址:http://www.jsons.cn/websocket/

在这里插入图片描述
我们看到,hello connection is success是服务器返回给客户端的消息,说明第一步连接OK!

在这里插入图片描述
在这里插入图片描述

客户端给服务器发送了{"type":"course","value":"Chinese"} 的消息,服务器成功解析,结合业务逻辑返回了结果给客户端。
当然,Map中存储着在线的session对象,我们其它的service服务可以直接调用这个静态的Map取得其中的session然后主动推送消息给客户端,因为每一个session对应一个客户端,这样就实现了服务器主动发消息给客户端的业务需求。


版权声明:本文为qq_42290561原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。