Browse Source

websocket demo

cr7118@sina.cn 3 years ago
parent
commit
f8af64bb24

+ 60 - 0
application-facade/src/main/java/com/factory/websocket/WebSocketConfig.java

@@ -0,0 +1,60 @@
+package com.factory.websocket;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+import java.io.IOException;
+
+@Configuration
+@EnableWebSocket
+@RestController
+@RequestMapping("/websocket")
+public class WebSocketConfig implements WebSocketConfigurer {
+
+    @GetMapping("/pushone")
+    public void pushone() throws IOException {
+        WebSocketSession session=WsSessionManager.get("123617832");
+        if(session!=null){
+            session.sendMessage(new TextMessage("test111111111111111111111111111啥地方啥地方回家"));
+        }
+
+    }
+
+    @Autowired
+    private WebSocketHandler webSocketHandler;
+    @Autowired
+    private WebSocketInterceptor webSocketInterceptor;
+
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        registry
+                .addHandler(webSocketHandler, "myWS/test")
+                .addInterceptors(webSocketInterceptor)
+                .setAllowedOrigins("*");
+    }
+
+    @Bean
+    public TaskScheduler taskScheduler(){
+        ThreadPoolTaskScheduler taskScheduler=new ThreadPoolTaskScheduler();
+        taskScheduler.setPoolSize(40);
+        taskScheduler.initialize();
+        return taskScheduler;
+    }
+
+//
+//    @Bean
+//    public ServerEndpointExporter serverEndpointExporter(){
+//        return new ServerEndpointExporter();
+//    }
+}

+ 66 - 0
application-facade/src/main/java/com/factory/websocket/WebSocketHandler.java

@@ -0,0 +1,66 @@
+package com.factory.websocket;
+
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+@Component
+public class WebSocketHandler extends TextWebSocketHandler {
+
+    /**
+     * 建立成功事件
+     * @param session
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        Object token = session.getAttributes().get("token");
+        if (token != null) {
+            WsSessionManager.add(token.toString(), session);   // 用户连接成功,放入在线用户缓存  存储信息可以使用redis代替
+        } else {
+            throw new RuntimeException("用户登录已经失效!");
+        }
+    }
+
+
+    /**
+     * 接收消息事件
+     *
+     * @param session
+     * @param message
+     * @throws Exception
+     */
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        String payload = message.getPayload();
+        Object token = session.getAttributes().get("token");
+        session.sendMessage(new TextMessage("test"));
+    }
+
+    /**
+     * 关闭
+     * @param session
+     * @param status
+     * @throws Exception
+     */
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        Object token = session.getAttributes().get("token");
+        if (token != null) {
+            WsSessionManager.remove(token.toString());
+        }
+    }
+
+    /**
+     * 异常处理
+     * @param session
+     * @param exception
+     * @throws Exception
+     */
+    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+        WsSessionManager.removeAndClose(String.valueOf(session.getAttributes().get("token")));
+    }
+
+}

+ 43 - 0
application-facade/src/main/java/com/factory/websocket/WebSocketInterceptor.java

@@ -0,0 +1,43 @@
+package com.factory.websocket;
+
+import cn.hutool.http.HttpUtil;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 过来过滤安全确定 验证用户身份
+ */
+@Component
+public class WebSocketInterceptor implements HandshakeInterceptor {
+
+    /**
+     *
+     * @param serverHttpRequest
+     * @param serverHttpResponse
+     * @param webSocketHandler
+     * @param map
+     * @return
+     * @throws Exception
+     */
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
+        HashMap<String, String> paramMap = HttpUtil.decodeParamMap(serverHttpRequest.getURI().getQuery(), "utf-8");
+        if (paramMap.get("token") != null) {
+            map.put("token", paramMap.get("token"));
+            map.put("userid", "11111");
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
+
+    }
+}

+ 55 - 0
application-facade/src/main/java/com/factory/websocket/WsSessionManager.java

@@ -0,0 +1,55 @@
+package com.factory.websocket;
+
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class WsSessionManager {
+
+    private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
+    /**
+     * 添加 session
+     *
+     * @param key
+     */
+    public static void add(String key, WebSocketSession session) {
+        SESSION_POOL.put(key, session);
+    }
+
+    /**
+     * 删除 session,会返回删除的 session
+     *
+     * @param key
+     * @return
+     */
+    public static WebSocketSession remove(String key) {
+        return SESSION_POOL.remove(key);
+    }
+
+    /**
+     * 删除并同步关闭连接
+     *
+     * @param key
+     */
+    public static void removeAndClose(String key) {
+        WebSocketSession session = remove(key);
+        if (session != null) {
+            try {
+                session.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * 获得 session
+     *
+     * @param key
+     * @return
+     */
+    public static WebSocketSession get(String key) {
+        return SESSION_POOL.get(key);
+    }
+}