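1. What Is WebSocket?
WebSocket is a protocol, introduced alongside HTML5, that provides full-duplex communication over a single TCP connection. Because one connection stays open instead of being re-established for every exchange, it saves server resources and bandwidth and allows closer to real-time communication.
WebSocket simplifies data exchange between client and server and lets the server push data to the client on its own initiative. With the WebSocket API, the browser and the server complete a single handshake, after which they hold a persistent connection and can send data in both directions. (Source: https://juejin.cn/post/6844904004762222606?utm_source=gold_browser_extension)
(Image sourced from the web.)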
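To make the "single handshake" concrete: during the opening handshake the client sends a Sec-WebSocket-Key header and the server answers with a Sec-WebSocket-Accept value derived from it, as specified in RFC 6455. The sketch below only illustrates that derivation; Spring performs it for you, and the class and method names here are illustrative, not part of any library.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class HandshakeAccept {
    // Fixed GUID that RFC 6455 appends to the client key.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Returns Base64(SHA-1(key + GUID)), the value of Sec-WebSocket-Accept.
    static String accept(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(accept("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

Once the server returns this value with a 101 Switching Protocols response, the same TCP connection carries WebSocket frames in both directions.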
2. Integrating with Spring Boot
Add the spring-boot-starter-websocket dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>${springboot.version}</version>
</dependency>
Create the WebSocketHandler
MercuryWebSocketHandler handles connection establishment, incoming messages, transport errors, and connection close.
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

@Slf4j
public class MercuryWebSocketHandler implements WebSocketHandler {

    // Register the session under the ws id taken from the request path.
    @Override
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        String wsId = WsSessionManager.getWsId(webSocketSession.getUri().getPath());
        if (wsId != null) {
            WsSessionManager.add(wsId, webSocketSession);
        } else {
            throw new RuntimeException("User login has expired!");
        }
    }

    // Log every message received from the client.
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
        String payload = webSocketMessage.getPayload().toString();
        String wsId = WsSessionManager.getWsId(webSocketSession.getUri().getPath());
        log.info("server received from {}: {}", wsId, payload);
    }

    @Override
    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
        log.error("An error occurred", throwable);
    }

    // Drop the session from the pool once the connection is closed.
    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        String wsId = WsSessionManager.getWsId(webSocketSession.getUri().getPath());
        if (wsId != null) {
            WsSessionManager.removeAndClose(wsId);
        }
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}
Because I need to validate the request before the WebSocket handshake, I also create WebSocketHandshakeInterceptor.
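import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

@Slf4j
@Component
public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    // Put the validation here; returning false rejects the handshake.
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
        return super.beforeHandshake(request, response, webSocketHandler, attributes);
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception e) {
        super.afterHandshake(request, response, wsHandler, e);
    }
}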
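The interceptor as written only delegates to its parent class, so the actual check is left open. As one possible shape for it, the sketch below pulls a token out of the request URI and compares it with a set of known tokens. The `token` query parameter, the `HandshakeTokenCheck` class, and the in-memory token set are all assumptions made for illustration; the original post does not say how the validation is done.

```java
import java.net.URI;
import java.util.Optional;
import java.util.Set;

class HandshakeTokenCheck {

    // Hypothetical helper: returns the value of the "token" query parameter, if present.
    static Optional<String> extractToken(URI uri) {
        String query = uri.getQuery();
        if (query == null) {
            return Optional.empty();
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals("token")) {
                String value = pair.substring(eq + 1);
                return value.isEmpty() ? Optional.empty() : Optional.of(value);
            }
        }
        return Optional.empty();
    }

    // Hypothetical rule: the handshake is allowed only when the token is known.
    // A real application would look the token up in its own login/session store.
    static boolean isHandshakeAllowed(URI uri, Set<String> validTokens) {
        return extractToken(uri).map(validTokens::contains).orElse(false);
    }
}
```

Inside beforeHandshake, a check like this could be fed with request.getURI(), returning false when it fails so that the handshake is refused before any WebSocketSession is created.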
Create the WebSocket configuration class, registering the path to intercept together with the matching WebSocket message handler.
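import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistration;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketServerConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // "*" accepts connections from any origin.
        WebSocketHandlerRegistration registration = registry
                .addHandler(new MercuryWebSocketHandler(), "/api/websocket/**")
                .setAllowedOrigins("*");
        registration.addInterceptors(new WebSocketHandshakeInterceptor());
    }
}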
Create WsSessionManager to store the WebSocket sessions
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

// RocketMQProducer, SpringContextUtils, RocketMqConstant and WebSocketMessageSendDto
// are project classes; import them from your own packages.
@Slf4j
public class WsSessionManager {

    private static final ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    public static void add(String wsId, WebSocketSession session) {
        // Count the connection only when the id was not registered before.
        if (SESSION_POOL.put(wsId, session) == null) {
            addOnlineCount();
        }
        log.info("New window started listening: {}, current online count: {}", wsId, getOnlineCount());
    }

    public static WebSocketSession remove(String wsId) {
        WebSocketSession session = SESSION_POOL.remove(wsId);
        // Only decrement when a session was actually registered under this id.
        if (session != null) {
            subOnlineCount();
        }
        log.info("Released wsId: {}", wsId);
        log.info("A connection was closed! Current online count: {}", getOnlineCount());
        return session;
    }

    public static void removeAndClose(String wsId) {
        WebSocketSession session = remove(wsId);
        if (session != null) {
            try {
                session.close();
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    public static WebSocketSession get(String wsId) {
        return SESSION_POOL.get(wsId);
    }

    // Sends to the session held by this instance; does nothing if it is not here.
    public static void sendMessage(String wsId, String message) throws Exception {
        WebSocketSession session = get(wsId);
        if (session == null || !session.isOpen()) {
            return;
        }
        session.sendMessage(new TextMessage(message));
    }

    // Publishes the message so that every instance tries to deliver it.
    public static void sendMessageBroadcasting(String wsId, String message) throws Exception {
        RocketMQProducer producer = SpringContextUtils.getBean(RocketMQProducer.class);
        WebSocketMessageSendDto webSocketMessageSendDto = new WebSocketMessageSendDto();
        webSocketMessageSendDto.setWsId(wsId);
        webSocketMessageSendDto.setMessage(message);
        producer.sendObject(RocketMqConstant.Topic.MERCURY_WEB_SOCKET_MESSAGE_SEND, webSocketMessageSendDto);
    }

    // AtomicInteger is already thread-safe, so no synchronized is needed.
    public static int getOnlineCount() {
        return onlineCount.get();
    }

    public static void addOnlineCount() {
        onlineCount.getAndIncrement();
    }

    public static void subOnlineCount() {
        onlineCount.getAndDecrement();
    }

    // Extracts the ws id that follows "/api/websocket/" in the request path.
    public static String getWsId(String requestUrl) throws Exception {
        String[] split = requestUrl.split("/api/websocket/");
        if (split.length < 2) {
            throw new Exception("invalid ws request");
        }
        return split[split.length - 1];
    }
}
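The session pool is keyed by whatever follows /api/websocket/ in the request path, so it helps to be strict about that parsing. As a minimal sketch of a stricter alternative to the split-based getWsId, the helper below rejects paths that lack the prefix, have an empty id, or carry extra path segments. `WsIdParser` is a hypothetical name, and the "no extra segments" rule is an assumption about what a valid id looks like.

```java
class WsIdParser {
    private static final String PREFIX = "/api/websocket/";

    // Returns the id after the prefix, or throws if the path is not a valid ws path.
    static String parse(String path) {
        if (path == null) {
            throw new IllegalArgumentException("invalid ws request: null path");
        }
        int idx = path.indexOf(PREFIX);
        if (idx < 0) {
            throw new IllegalArgumentException("invalid ws request: " + path);
        }
        String id = path.substring(idx + PREFIX.length());
        // Assumption: an id is a single, non-empty path segment.
        if (id.isEmpty() || id.contains("/")) {
            throw new IllegalArgumentException("invalid ws request: " + path);
        }
        return id;
    }

    public static void main(String[] args) {
        System.out.println(parse("/api/websocket/abc123")); // prints abc123
    }
}
```

Throwing an unchecked exception keeps the call sites in the handler free of extra `throws` clauses; whether that fits depends on how the rest of the project reports bad requests.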
Send a message to the frontend
WsSessionManager.sendMessage(ws_id, message);
3. Integrating WebSocket into the Vue Frontend
Create websocket.js
import ElementUI from 'element-ui';

const WS_API = 'ws://localhost:8080';

// These functions use `this.socket`, so call them as methods of the exported
// object (for example this.$websocket.initWebSocket(id)).
function initWebSocket(e) {
  console.log(e);
  if (!('WebSocket' in window)) {
    ElementUI.Notification({
      title: '',
      message: 'This browser does not support WebSocket',
      type: 'error',
      duration: 0,
    });
    return; // nothing to connect with
  }
  const wsUri = WS_API + '/api/websocket/' + e;
  this.socket = new WebSocket(wsUri);
  this.socket.onerror = webSocketOnError;
  this.socket.onmessage = webSocketOnMessage;
  this.socket.onclose = closeWebsocket;
}

function webSocketOnError(e) {
  console.log(e);
  ElementUI.Notification({
    title: '',
    message: 'A WebSocket connection error occurred',
    type: 'error',
    duration: 3000,
  });
}

// The server is expected to send JSON with a `msg` field.
function webSocketOnMessage(e) {
  const data = JSON.parse(e.data);
  ElementUI.Notification({
    title: '',
    message: data.msg,
    type: 'success',
    duration: 5000,
  });
}

function closeWebsocket() {
  console.log('Connection closed...');
}

function close() {
  // Install the handler before closing so the close event is not missed.
  this.socket.onclose = function (e) {
    console.log(e);
    console.log('Closed');
  };
  this.socket.close();
}

function webSocketSend(agentData) {
  this.socket.send(agentData);
}

export default {
  initWebSocket,
  webSocketSend,
  close,
};
Import it in main.js
import websocket from 'path/to/websocket'; // replace with the actual path

Vue.prototype.$websocket = websocket;
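Then initialize the WebSocket wherever you need it. I do it in the global mounted hook of App.vue with the following code:
this.$websocket.initWebSocket(ws_id); // ws_id: the id string for this client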
Result
The WebSocket connects as soon as the page loads.
The backend sends a message to a specific user by calling WsSessionManager.sendMessage(ws_id,message).
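One detail worth spelling out: the frontend handler runs JSON.parse on the incoming data and displays its msg field, so the string passed to sendMessage needs to be a JSON object containing msg. The sketch below builds such a payload with only the standard library; `NotificationPayload` is a hypothetical helper, and in a real project a JSON library would normally do this job.

```java
class NotificationPayload {

    // Builds {"msg":"..."} with the characters that JSON requires to be escaped.
    static String toJson(String msg) {
        StringBuilder sb = new StringBuilder("{\"msg\":\"");
        for (char c : msg.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append("\"}").toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson("hello")); // prints {"msg":"hello"}
    }
}
```

With a helper like this, a call would look like WsSessionManager.sendMessage(ws_id, NotificationPayload.toJson("hello")).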
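4. Solving the Problem of WebSocket Sessions Not Being Shared in a Distributed System
In Spring's WebSocket integration, every ws connection has a corresponding session, a WebSocketSession. Once the connection is established, we communicate with the client through that session. However, a WebSocketSession cannot be serialized to Redis, so in a cluster we cannot cache every WebSocketSession in Redis to share sessions across nodes.
The simple approach in this article: use RocketMQ's broadcast mode so that every machine performs the send. Machines that do not hold the matching WebSocketSession end up doing some unnecessary work, but because the session is located by looking up the key ws_id in a map, that cost is negligible.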
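The idea can be shown without any messaging middleware. In the sketch below each `Node` stands in for one application instance with its own local session pool, and `broadcast` plays the role of the broadcast topic by handing the message to every node. All names are illustrative, and a list of delivered messages replaces the real WebSocketSession.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BroadcastSketch {

    // Stand-in for one application instance and its local session pool.
    static class Node {
        final String name;
        // wsId -> messages delivered to that client (replaces WebSocketSession here).
        final Map<String, List<String>> localSessions = new ConcurrentHashMap<>();

        Node(String name) {
            this.name = name;
        }

        void connect(String wsId) {
            localSessions.put(wsId, new ArrayList<>());
        }

        // Runs on every node for every broadcast message.
        boolean onBroadcast(String wsId, String message) {
            List<String> outbox = localSessions.get(wsId);
            if (outbox == null) {
                return false; // client is not connected here: one map lookup, then done
            }
            outbox.add(message);
            return true;
        }
    }

    // Stand-in for the broadcast topic: every node receives the message.
    static int broadcast(List<Node> cluster, String wsId, String message) {
        int delivered = 0;
        for (Node node : cluster) {
            if (node.onBroadcast(wsId, message)) {
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        b.connect("u1"); // the client is connected to node b only
        List<Node> cluster = new ArrayList<>();
        cluster.add(a);
        cluster.add(b);
        System.out.println(broadcast(cluster, "u1", "hello")); // prints 1
    }
}
```

Only the node that owns the connection delivers the message; the others pay for a single map lookup, which is why the extra work is small.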
Define a simple data transfer object
import lombok.Data;

@Data
public class WebSocketMessageSendDto {
    private String wsId;
    private String message;
}
Create the consumer
Key point: messageModel = MessageModel.BROADCASTING
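import com.alibaba.fastjson.JSON; // assuming fastjson provides JSON.toJSONString here
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqConstant.Topic.MERCURY_WEB_SOCKET_MESSAGE_SEND,
        consumerGroup = RocketMqConstant.ComsumerGroup.MERCURY_WEB_SOCKET_MESSAGE_SEND,
        messageModel = MessageModel.BROADCASTING)
public class WebSocketMessageSendConsumer
        implements RocketMQListener<WebSocketMessageSendDto>, RocketMQPushConsumerLifecycleListener {

    // Every instance receives the message and tries to deliver it locally.
    @Override
    public void onMessage(WebSocketMessageSendDto dto) {
        log.info("WebSocketMessageSendConsumer received: {}", JSON.toJSONString(dto));
        try {
            WsSessionManager.sendMessage(dto.getWsId(), dto.getMessage());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setMaxReconsumeTimes(2);
    }
}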
Message producer
public static void sendMessageBroadcasting(String wsId, String message) throws Exception {
    RocketMQTemplate rocketMQTemplate = SpringContextUtils.getBean(RocketMQTemplate.class);
    WebSocketMessageSendDto webSocketMessageSendDto = new WebSocketMessageSendDto();
    webSocketMessageSendDto.setWsId(wsId);
    webSocketMessageSendDto.setMessage(message);
    // Publish the DTO as JSON to the broadcast topic.
    rocketMQTemplate.convertAndSend(RocketMqConstant.Topic.MERCURY_WEB_SOCKET_MESSAGE_SEND,
            JSON.toJSONString(webSocketMessageSendDto));
}