JAVA中实现websockt一对一聊天
WebSocketConfig 配置
/**
* 开启WebSocket支持
* @author
*/
@Configuration
public class WebSocketConfig {
@Bean
@Conditional(WarAndJarWebscoketAutoWired.class)//自动根据情况装配bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WarAndJarWebscoketAutoWired
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
* 判断是否使用webscoket容器,如果使用的是本地开发环境就装配bean(非开发环境都是外部tomcat,不需要装配这个bean),使用方式@Conditional(WarAndJarWebscoketAutoWired.class)
*/
public class WarAndJarWebscoketAutoWired implements Condition {
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Environment env = context.getEnvironment();
String active = env.getProperty("spring.profiles.active");
// return !packageStyle.equals("war");
return (active.equals("dev"));
}
}
WebSocketServer 部分代码:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.CrossOrigin;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
* 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
* @author fengtao.xue
*/
@Component
@CrossOrigin
@ServerEndpoint("/crm/websocket/{userId}")
public class WebSocketServer {
static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
private static ConcurrentHashMap<String, WebSocketServer> webSocketSet = new ConcurrentHashMap<String,WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session WebSocketsession;
//当前发消息的人员userId
private String userId = "";
public Session getWebSocketsession() {
return WebSocketsession;
}
public void setWebSocketsession(Session webSocketsession) {
WebSocketsession = webSocketsession;
}
/**
* 从set中删除
* 删除 WebSocket 用户
* @param userId
*/
public void remove(String userId){
webSocketSet.remove(userId);
}
public static Collection<WebSocketServer> getValues(){
return webSocketSet.values();
}
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(@PathParam(value = "userId") String param, Session webSocketsession, EndpointConfig config) {
userId = param;
this.WebSocketsession = webSocketsession;
this.WebSocketsession = WebSocketsession;
webSocketSet.put(param, this);//加入map中
int cnt = OnlineCount.incrementAndGet(); // 在线数加1
logger.info("有连接加入{},当前连接数为:{}",param, cnt);
logger.info("有连接加入,当前连接数为:{}", cnt);
//sendMessage(this.WebSocketsession, "连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (!userId.equals("")){
webSocketSet.remove(userId);//从set中删除
int cnt = OnlineCount.decrementAndGet();
logger.info("有连接关闭{},当前连接数为:{}",userId, cnt);
logger.info("有连接关闭,当前连接数为:{}", cnt);
}
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("来自客户端的消息:{}",message);
sendMessage(session, "收到消息,消息内容:"+message);
}
/**
* 发生错误后调用的方法
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
logger.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId());
error.printStackTrace();
}
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
* @param message
*/
public void sendMessage(Session session, String message) {
try {
session.getBasicRemote().sendText(message);
//session.getBasicRemote().sendText(String.format("%s",message));
} catch (IOException e) {
logger.error("发送消息出错:{}", e.getMessage());
e.printStackTrace();
}
}
/**
* 群发消息
* @param message
* @throws IOException
*/
public void broadCastInfo(String message) {
for (String key : webSocketSet.keySet()) {
Session session = webSocketSet.get(key).WebSocketsession;
if(session != null && session.isOpen() && !userId.equals(key)){
sendMessage(session, message);
}
}
}
/**
* 指定Session发送消息
* @param message
* @throws IOException
*/
public void sendToUser(String userId, String message) {
getSocketUser();
logger.info("发送消息:userId"+userId);
WebSocketServer webSocketServer = webSocketSet.get(userId);
if ( webSocketServer != null && webSocketServer.WebSocketsession.isOpen()){
sendMessage(webSocketServer.WebSocketsession, message);
}
else{
logger.warn("当前用户不在线:{}",userId);
}
}
/**
* 获取 Socket 的连接用户
* @return
*/
public List<String> getSocketUser() {
List<String> userIds= webSocketSet.keySet().stream().collect(Collectors.toList());
logger.warn("当前Socket的连接用户:{}",userIds);
return userIds;
}
}
相关文章
版权声明:本文为GoodburghCottage原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。