JAVA中webSockt一对一聊天

JAVA中实现websockt一对一聊天

WebSocketConfig 配置

/**
 * 开启WebSocket支持
 * @author
 */
@Configuration
public class WebSocketConfig {

    @Bean
    @Conditional(WarAndJarWebscoketAutoWired.class)//自动根据情况装配bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

WarAndJarWebscoketAutoWired

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

/**
 * 判断是否使用webscoket容器,如果使用的是本地开发环境就装配bean(非开发环境都是外部tomcat,不需要装配这个bean),使用方式@Conditional(WarAndJarWebscoketAutoWired.class)
 */
public class WarAndJarWebscoketAutoWired implements Condition {
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        Environment env = context.getEnvironment();
        String active = env.getProperty("spring.profiles.active");
        //               return !packageStyle.equals("war");
        return (active.equals("dev"));
    }
}

WebSocketServer 部分代码:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.CrossOrigin;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;


/**
 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 * @author fengtao.xue
 */
@Component
@CrossOrigin
@ServerEndpoint("/crm/websocket/{userId}")
public class WebSocketServer {
    static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static final AtomicInteger OnlineCount = new AtomicInteger(0);
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    private static ConcurrentHashMap<String, WebSocketServer> webSocketSet = new ConcurrentHashMap<String,WebSocketServer>();
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session WebSocketsession;
    //当前发消息的人员userId
    private String userId = "";

    public Session getWebSocketsession() {
        return WebSocketsession;
    }

    public void setWebSocketsession(Session webSocketsession) {
        WebSocketsession = webSocketsession;
    }


    /**
     * 从set中删除
     * 删除 WebSocket 用户
     * @param userId
     */
    public void remove(String userId){
        webSocketSet.remove(userId);
    }


    public static Collection<WebSocketServer> getValues(){
        return webSocketSet.values();
    }

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(@PathParam(value = "userId") String param, Session webSocketsession, EndpointConfig config) {
        userId = param;
        this.WebSocketsession = webSocketsession;
        this.WebSocketsession = WebSocketsession;
        webSocketSet.put(param, this);//加入map中
        int cnt = OnlineCount.incrementAndGet(); // 在线数加1
        logger.info("有连接加入{},当前连接数为:{}",param, cnt);
        logger.info("有连接加入,当前连接数为:{}", cnt);
        //sendMessage(this.WebSocketsession, "连接成功");
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (!userId.equals("")){
            webSocketSet.remove(userId);//从set中删除
            int cnt = OnlineCount.decrementAndGet();
            logger.info("有连接关闭{},当前连接数为:{}",userId, cnt);
            logger.info("有连接关闭,当前连接数为:{}", cnt);
        }
    }

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("来自客户端的消息:{}",message);
        sendMessage(session, "收到消息,消息内容:"+message);
    }

    /**
     * 发生错误后调用的方法
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId());
        error.printStackTrace();
    }

    /**
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
     * @param message
     */
    public void sendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(message);
            //session.getBasicRemote().sendText(String.format("%s",message));
        } catch (IOException e) {
            logger.error("发送消息出错:{}", e.getMessage());
            e.printStackTrace();
        }
    }



    /**
     * 群发消息
     * @param message
     * @throws IOException
     */
    public void broadCastInfo(String message) {
        for (String key : webSocketSet.keySet()) {
            Session session = webSocketSet.get(key).WebSocketsession;
            if(session != null && session.isOpen() && !userId.equals(key)){
                sendMessage(session, message);
            }
        }
    }

    /**
     * 指定Session发送消息
     * @param message
     * @throws IOException
     */
    public void sendToUser(String userId, String message) {
        getSocketUser();
        logger.info("发送消息:userId"+userId);
        WebSocketServer webSocketServer = webSocketSet.get(userId);
        if ( webSocketServer != null && webSocketServer.WebSocketsession.isOpen()){
            sendMessage(webSocketServer.WebSocketsession, message);
        }
        else{
            logger.warn("当前用户不在线:{}",userId);
        }
    }

    /**
     * 获取 Socket 的连接用户
     * @return
     */
    public List<String> getSocketUser() {
        List<String> userIds= webSocketSet.keySet().stream().collect(Collectors.toList());
        logger.warn("当前Socket的连接用户:{}",userIds);
        return userIds;
    }
}

相关文章

vue中websocket心跳机制


版权声明:本文为GoodburghCottage原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。