Springboot集成WebSocket实现消息推送 记录

一、什么是WebSocket?

WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。–来源自https://juejin.cn/post/6844904004762222606?utm_source=gold_browser_extension

img

(图源自网络)

二、与springboot集成

引入spring-boot-starter-websocket依赖

1
2
3
4
5
 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>${springboot.version}</version>
</dependency>

新建WebSocketHandler

MercuryWebsocketHandler.class 对连接建立处理,消息处理,错误处理,以及连接关闭处理等等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
public class MercuryWebSocketHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
String wsId = WsSessionManager.getWsId(webSocketSession.getUri().getPath());
if (wsId != null) {
// 用户连接成功,放入在线用户缓存
WsSessionManager.add(wsId, webSocketSession);
} else {
throw new RuntimeException("用户登录已经失效!");
}
}

@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
String payload = webSocketMessage.getPayload().toString();
String wsId = WsSessionManager.getWsId(webSocketSession.getUri().getPath());
System.out.println("server 接收到 " + wsId + " 发送的 " + payload);

}

@Override
public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
log.error("发生错误", throwable);
}

@Override
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
String wsId = WsSessionManager.getWsId(webSocketSession.getUri().getPath());
if (wsId != null) {
WsSessionManager.removeAndClose(wsId);
}
}

@Override
public boolean supportsPartialMessages() {
return false;
}
}

实现WebSocketConfigurer 来对握手前后进行逻辑处理

因为我需要对websocket 握手前进行校验,所以创建WebSocketHandshakeInterceptor.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@Component
public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor {

@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> attributes)
throws Exception {
/**省略校验逻辑 校验不通过 return false**/

return super.beforeHandshake(request, response, webSocketHandler, attributes);
}

@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception e) {
super.afterHandshake(request, response, wsHandler, e);
}
}

实现WebSocketConfigurer

新建websocket配置类,添加拦截地址以及相应的websocket消息处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
@EnableWebSocket
public class WebSocketServerConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 添加拦截地址以及相应的websocket消息处理器
WebSocketHandlerRegistration registration = registry
.addHandler(new MercuryWebSocketHandler(), "/api/websocket/**")
.setAllowedOrigins("*");
// 添加拦截器
registration.addInterceptors(new WebSocketHandshakeInterceptor());
}

}

创建WsSessionManager存储websocket session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Slf4j
public class WsSessionManager {

private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
private static AtomicInteger onlineCount = new AtomicInteger(0);
/**
* 添加 session
*
* @param wsId
*/
public static void add(String wsId, WebSocketSession session) {
addOnlineCount();
SESSION_POOL.put(wsId, session);
log.info("有新窗口开始监听:" + wsId + ",当前在线人数为:" + getOnlineCount());
}

/**
* 删除 session,会返回删除的 session
*
* @param wsId
* @return
*/
public static WebSocketSession remove(String wsId) {
WebSocketSession session = SESSION_POOL.remove(wsId);
subOnlineCount();
log.info("释放的wsId为:" + wsId);
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
return session;
}

/**
* 删除并同步关闭连接
*
* @param wsId
*/
public static void removeAndClose(String wsId) {
WebSocketSession session = remove(wsId);
if (session != null) {
try {
// 关闭连接
session.close();
} catch (IOException e) {
log.error(e.getMessage(),e);
}
}
}

/**
* 获得 session
*
* @param key
* @return
*/
public static WebSocketSession get(String wsId) {
// 获得 session
return SESSION_POOL.get(wsId);
}

/**
* 实现服务器主动推送
*/
public static void sendMessage(String wsId,String message) throws Exception {
get(wsId).sendMessage(new TextMessage(message));
}

/**
* 非多机部署请注释此方法
* 实现服务器主动推送 用于解决分布式websocket session不共享的问题
*/
public static void sendMessageBroadcasting(String wsId,String message) throws Exception {
RocketMQProducer producer = SpringContextUtils.getBean(RocketMQProducer.class);
WebSocketMessageSendDto webSocketMessageSendDto = new WebSocketMessageSendDto();
webSocketMessageSendDto.setWsId(wsId);
webSocketMessageSendDto.setMessage(message);
producer.sendObject(RocketMqConstant.Topic.MERCURY_WEB_SOCKET_MESSAGE_SEND,webSocketMessageSendDto);
}

public static synchronized int getOnlineCount() {
return WsSessionManager.onlineCount.get();
}

public static synchronized void addOnlineCount() {
WsSessionManager.onlineCount.getAndIncrement();
}

public static synchronized void subOnlineCount() {
WsSessionManager.onlineCount.getAndDecrement();
}

/**
* url template:http://localhost/api/websocket/{ws_id}
* 需要在url提取ws_id
*/
public static String getWsId(String requestUrl) throws Exception {
String[] split = requestUrl.split("/api/websocket/");
if (split.length < 1) {
throw new Exception("invalid ws request");
}
return split[split.length - 1];
}

}

发送消息给前端

1
WsSessionManager.sendMessage(ws_id,message)

三、前端Vue与websocket集成

新建websocket.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// import { showInfoMsg, showErrorMsg } from '@/utils/popInfo'
import ElementUI from 'element-ui';

const WS_API = `ws://localhost:8080`;
function initWebSocket(e) {
console.log(e)
if(!('WebSocket'in window)) {
ElementUI.Notification({
title: '',
message: '当前浏览器 Not support websocket',
type: 'error',
duration: 0,
});
}
const wsUri = WS_API + '/api/websocket/' + e;
this.socket = new WebSocket(wsUri); //这里面的this都指向vue
this.socket.onerror = webSocketOnError;
this.socket.onmessage = webSocketOnMessage;
this.socket.onclose = closeWebsocket;
}

function webSocketOnError(e) {
// console.log(e)
console.log(e)
ElementUI.Notification({
title: '',
message: 'WebSocket连接发生错误',
type: 'error',
duration: 3000,
});
}

function webSocketOnMessage(e) {
const data = JSON.parse(e.data);
ElementUI.Notification({
title: '',
message: data.msg,
type: 'success',
duration: 5000,
});
}

// 关闭websiocket
function closeWebsocket() {
console.log('连接已关闭...');
}

function close() {
this.socket.close(); // 关闭 websocket
this.socket.onclose = function (e) {
console.log(e); //监听关闭事件
console.log('关闭');
};
}

function webSocketSend(agentData) {
this.socket.send(agentData);
}

export default {
initWebSocket,
close,
};

在main.js引入

1
2
3
import websocket from "具体地址/websocket";

Vue.prototype.$websocket = websocket;

然后在需要初始化websocket的地方,我是在App.vue全局mounted以下代码

1
this.$websocket.initWebSocket({ws_id})//请根据具体逻辑替换ws_id 我这里填入的是jwt token

效果图

image-20220606183934456

进入页面就会连接websocket

image-20220606183828420

后端通过调用WsSessionManager.sendMessage(ws_id,message)发送消息给具体用户

三、解决websocket session在分布式系统不共享问题

在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信。但是 ws的session无法序列化到redis, 因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。

本文简单解决方案思路:利用rocketmq广播的特性使得每一台机子都能执行发送消息的动作,这样的操作可能会导致没有对应webSocketSession的机器执行一直没必要的操作,但是因为根据键值ws_id在map中定位session,可以忽略不计。

简单定义一个数据传输对象

1
2
3
4
5
@Data
public class WebSocketMessageSendDto {
private String wsId;
private String message;
}

创建消费者

重点:messageModel = MessageModel.BROADCASTING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqConstant.Topic.MERCURY_WEB_SOCKET_MESSAGE_SEND
, consumerGroup = RocketMqConstant.ComsumerGroup.MERCURY_WEB_SOCKET_MESSAGE_SEND,
messageModel = MessageModel.BROADCASTING)
public class WebSocketMessageSendConsumer implements RocketMQListener<WebSocketMessageSendDto>, RocketMQPushConsumerLifecycleListener {

@Override
public void onMessage(WebSocketMessageSendDto dto) {
log.info("WebSocketMessageSendConsumer 接收到信息:{}",JSON.toJSONString(dto));
try {
WsSessionManager.sendMessage(dto.getWsId(),dto.getMessage());
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}

@Override public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setMaxReconsumeTimes(2);
}
}

消息生产者

1
2
3
4
5
6
7
8
9
10
11
/**
* 实现服务器主动推送
*/
public static void sendMessageBroadcasting(String wsId,String message) throws Exception {
RocketMQTemplate rocketMQTemplate = SpringContextUtils.getBean(RocketMQTemplate.class);
WebSocketMessageSendDto webSocketMessageSendDto = new WebSocketMessageSendDto();
webSocketMessageSendDto.setWsId(wsId);
webSocketMessageSendDto.setMessage(message);
rocketMQTemplate.convertAndSend(RocketMqConstant.Topic.MERCURY_WEB_SOCKET_MESSAGE_SEND, JSON.toJSONString(webSocketMessageSendDto))
}


Springboot集成WebSocket实现消息推送 记录
https://cason.work/2022/06/07/Springboot集成WebSocket实现消息推送-记录/
作者
Cason Mo
发布于
2022年6月7日
许可协议