赵志浩
Published on 2025-08-18 / 16 Visits
3
0

WebSocket & SSE 场景选型

WebSocket 和 SSE,SSE 本身就是 HTTP 的另外使用形态,所以在网络建立上和 HTTP 是完全一致的。

针对 WebSocket 和 HTTP 的网络连接建立,都是 NIO 的,针对网络事件本身的监听是异步的。

网络通道建立后,收到具体的网络事件时,此时HTTP 场景会通知具体的 GetMapping 进行任务的执行,此时是使用的 Tomcat http的工作线程池中的线程。此线程池数量可配置。

在该方法的完全执行期间,是纯阻塞的,该线程会持续被使用的状态。直到方法体完全执行完成,此时工作线程会被回收。

WebSocket 在网络连接建立时也是 NIO 异步建立连接,此时连接建立后,WebSocket 的@OnMessage 方法在没有任何事件产生时,此时是不会有工作线程介入的。也就是空闲时间不会占用工作线程。

当连接建立后,客户端发送具体的指令上来时,此时网络监听检测到对应事件变更后,会通知 WebSocket 的@OnMessage 方法进行执行。此时分配的是具体的工作线程进行执行。

当服务端要发送具体消息给到客户端时,也是具体的工作线程执行,然后通知具体的 Session 将消息下方客户端。

而 SSE 实际是 HTTP 具体工作线程执行期间,推送具体的消息流给到具体的客户端即可,比如此处代码示例:/Users/zhihaozhao/IdeaProjects/ingeek-robot/ingeek-robot-manage/ingeek-robot-manage-srv/src/main/java/com/ingeek/robot/controller/test/TestController.java

此处代码 SSE 的推送是纯阻塞的,也就是一直消耗 HTTP 的工作线程,直到数据流完全推送完成。

当然此处代码也可以改造为具体的线程池直接推送数据,而不是一直占用 HTTP 工作线程,这个改造是完全没有任何问题的。

@GetMapping("/test")
    public void test(HttpServletResponse response) {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        try (final PrintWriter writer = response.getWriter()) {
            // 要推送的内容
            final String content = "测试一下 SSE 的推送效果,打字机方式输出文本结果。";
            int len = content.length();
            int endIndex = 0;
            // 每隔2个字符推送一次,模拟打字机效果
            while (endIndex < len) {
                endIndex = Math.min(endIndex + 2, len);
                final String subContent = content.substring(0, endIndex);
                // 将要推送的内容封装成JSON格式,模拟实际开发中的数据格式,非必须
                final JSONObject json = new JSONObject();
                json.put("data", subContent);
                json.put("code", HttpStatus.OK.value());
                // 最后一次推送时,type为finish,表示推送结束,其它情况为add
                final String type = endIndex == len
                        ? "finish"
                        : "add";
                json.put("type", type);
                // 组装成SSE格式的数据,发送给前端,这个格式(data: content\n\n)是固定的,content是自定义的推送内容
                writer.write("data: " + json.toJSONString() + "\n\n");
                writer.flush();
                // 稍微给点停顿,防止数据发送太快,浏览器接收不过来
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (Exception e) {
            Thread.currentThread().interrupt();
            log.error("流式推送数据异常", e);
        }
        //采用异步 SSE,线程池单独控制
//        CompletableFuture.runAsync(() -> {
        //
        //        });
    }

WebSocket 代码示例可以参考:/Users/zhihaozhao/IdeaProjects/ingeek-robot/ingeek-robot-manage/ingeek-robot-manage-srv/src/main/java/com/ingeek/robot/controller/test/WsController.java

@ServerEndpoint("/websocket/test")
public class WsController {

    // 用于存储所有连接的WebSocket会话
    private static final Map<String, Session> sessions = new ConcurrentHashMap<>();

    // 用于执行异步任务的线程池
    private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    /**
     * 连接建立成功时调用
     *
     * @param session WebSocket会话
     */
    @OnOpen
    public void onOpen(Session session) {
        // 将会话添加到会话集合中
        sessions.put(session.getId(), session);
        log.info("新的WebSocket连接已建立,sessionId: {}", session.getId());

        // 发送连接成功的消息
        JSONObject message = new JSONObject();
        message.put("type", "connection");
        message.put("status", "success");
        message.put("message", "连接已建立");
        message.put("sessionId", session.getId());
        sendMessage(session, message.toJSONString());
    }

    /**
     * 收到客户端消息时调用
     *
     * @param message 客户端发送的消息
     * @param session WebSocket会话
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到客户端消息: {},sessionId: {}", message, session.getId());

        try {
            // 解析客户端发送的JSON消息
            JSONObject request = JSONObject.parseObject(message);
            String action = request.getString("action");

            switch (action) {
                case "startOperation":
                    // 启动异步操作
                    startAsyncOperation(session, request);
                    break;
                case "ping":
                    // 心跳检测
                    JSONObject pong = new JSONObject();
                    pong.put("type", "pong");
                    sendMessage(session, pong.toJSONString());
                    break;
                default:
                    // 未知操作
                    JSONObject unknown = new JSONObject();
                    unknown.put("type", "error");
                    unknown.put("message", "未知的操作: " + action);
                    sendMessage(session, unknown.toJSONString());
            }
        } catch (Exception e) {
            log.error("处理客户端消息时发生异常", e);
            JSONObject error = new JSONObject();
            error.put("type", "error");
            error.put("message", "处理消息时发生异常: " + e.getMessage());
            sendMessage(session, error.toJSONString());
        }
    }

    /**
     * 连接关闭时调用
     *
     * @param session WebSocket会话
     * @param closeCode 关闭码
     * @param reason 关闭原因
     * @param remote 是否由远程关闭
     */
    @OnClose
    public void onClose(Session session, CloseReason closeCode, String reason, boolean remote) {
        // 从会话集合中移除
        sessions.remove(session.getId());
        log.info("WebSocket连接已关闭,sessionId: {},关闭码: {},原因: {},远程关闭: {}",
                session.getId(), closeCode.getCloseCode(), reason, remote);
    }

    /**
     * 发生错误时调用
     *
     * @param session WebSocket会话
     * @param error 错误信息
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket发生错误,sessionId: {}", session.getId(), error);

        // 尝试通知客户端发生了错误
        try {
            JSONObject errorMessage = new JSONObject();
            errorMessage.put("type", "error");
            errorMessage.put("message", "服务器发生错误: " + error.getMessage());
            sendMessage(session, errorMessage.toJSONString());
        } catch (Exception e) {
            log.error("发送错误消息时发生异常", e);
        }

        // 关闭连接
        try {
            session.close();
        } catch (IOException e) {
            log.error("关闭WebSocket会话时发生异常", e);
        }

        // 从会话集合中移除
        sessions.remove(session.getId());
    }

    /**
     * 模拟执行实际操作(比如调用车端接口)
     * 在实际应用中,这里会替换为真实的业务逻辑
     *
     * @return 操作是否成功
     */
    private boolean performActualOperation() {
        // 这里实现实际的业务逻辑,比如调用车端接口
        // 模拟:90%概率成功,10%概率失败
        return Math.random() > 0.1;
    }

    /**
     * 向指定会话发送消息
     *
     * @param sessionId 会话ID
     * @param message 消息内容
     */
    private void sendMessageToSession(String sessionId, String message) {
        Session session = sessions.get(sessionId);
        if (session != null && session.isOpen()) {
            sendMessage(session, message);
        }
    }

    /**
     * 向WebSocket会话发送消息
     *
     * @param session WebSocket会话
     * @param message 消息内容
     */
    private void sendMessage(Session session, String message) {
        try {
            if (session.isOpen()) {
                session.getBasicRemote().sendText(message);
            }
        } catch (Exception e) {
            log.error("向WebSocket会话发送消息失败,sessionId: {}", session.getId(), e);
        }
    }

    /**
     * 获取当前连接数
     *
     * @return 当前连接数
     */
    public static int getCurrentConnectionCount() {
        return sessions.size();
    }

    /**
     * 向所有连接发送广播消息
     *
     * @param message 消息内容
     */
    public static void broadcastMessage(String message) {
        for (Session session : sessions.values()) {
            try {
                if (session.isOpen()) {
                    session.getBasicRemote().sendText(message);
                }
            } catch (Exception e) {
                log.error("广播消息时发生异常,sessionId: {}", session.getId(), e);
            }
        }
    }
}

WebSocket 和 SSE 的区别是什么:

  1. SSE 是单向的 HTTP 流,更适合客户端一次连接,服务端直接持续有数据流推送的场景。比如和大模型对话的场景就是非常典型的场景。

  2. SSE 因为是单向的 HTTP 流,所以容易收到网关等代理的影响导致断开。且由于SSE 单向 HTTP 流的机制,导致 SSE 断开后,服务端实际无法感知到连接断开,所以服务端还会持续推送数据下去,但实际客户端已经无法接受到消息了。

  3. SSE 不适合等待时间较久的的场景,比如服务端有尝试车端是否 TSP 推送成功的检测和重试,此时仍然保持这个连接是不合适的。

  4. WebSocket 连接建立后更加稳定,可以长时间保持,且无具体网络事件时,并不占用具体工作线程,所以可以稳定建立多个网络通道的长链接。

  5. WebSocket 长链接建立后,客户端断开后,服务端可以实时感知客户端的在线情况。客户端断开后可以及时发现,导致资源不会持续的被浪费。

  6. 针对有服务端重试和网络长链接建立的场景,应该优先使用 WebSocket.

但是 WebSocket 的问题则是,一个 Tomcat 服务器只能维护一组长链接会话,针对我们几十万设备客户端的场景,必须构建对应的 WebSocket 集群才行。

此时构建 WebSocket 集群后,不同的 Tomcat 容器维护不同的客户端Session 连接。

当TSP 下车端完成后,此时服务端要通知具体的客户端事件成功或失败的通知时,需要先从 Redis 中获取客户端设备和 Session 所在 Tomcat 容器的具体映射关系。

当明确该设备的 Session 会话是维护在 Tomcat1 容器时,此时发送具体的消息通知给到 Tomcat1 容器,1 容器收到具体的指令后,将对应的指令结果推送给对应的客户端设备。

所以构建长链接的 WebSocket 集群是效果最佳的方式,需要引入 Redis + MQ 才行。所以设备和 Session 的会话映射关系全部维护到 Redis,通知具体设备端时,先从 Redis 获取设备端 Session 所在的服务器地址。然后发送 MQ 消息给到具体的服务器,服务器接受后推送消息给设备端。

SSE 流程:

@RestController
public class SseController {
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    // 客户端订阅
    @GetMapping("/sse")
    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(30_000L); // 超时30秒
        emitters.add(emitter);
        
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));
        
        return emitter;
    }

    // 服务端推送消息
    public void broadcast(String data) {
        emitters.forEach(emitter -> {
            try {
                emitter.send(SseEmitter.event()
                    .data(data)
                    .id(UUID.randomUUID().toString())
                    .name("message"));
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
    }
}


Comment