可以先参考这个对 SSE 有基本概念:
https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
https://juejin.cn/post/7331726568363311115
实现一个 HTTP 工作线程同步阻塞方式的 SSE 很简单,代码直接这样写就行:
@GetMapping("/test")
public void test(HttpServletResponse response) {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
try (final PrintWriter writer = response.getWriter()) {
// 要推送的内容
final String content = "测试一下 SSE 的推送效果,打字机方式输出文本结果。";
int len = content.length();
int endIndex = 0;
// 每隔2个字符推送一次,模拟打字机效果
while (endIndex < len) {
endIndex = Math.min(endIndex + 2, len);
final String subContent = content.substring(0, endIndex);
// 将要推送的内容封装成JSON格式,模拟实际开发中的数据格式,非必须
final JSONObject json = new JSONObject();
json.put("data", subContent);
json.put("code", HttpStatus.OK.value());
// 最后一次推送时,type为finish,表示推送结束,其它情况为add
final String type = endIndex == len
? "finish"
: "add";
json.put("type", type);
// 组装成SSE格式的数据,发送给前端,这个格式(data: content\n\n)是固定的,content是自定义的推送内容
writer.write("data: " + json.toJSONString() + "\n\n");
writer.flush();
// 稍微给点停顿,防止数据发送太快,浏览器接收不过来
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error("流式推送数据异常", e);
}
}
浏览器直接请求该 HTTP 地址,就能看到 SSE 流式输出的效果。
但是这种方式就会有一个问题,Tomcat 调度过来的 HTTP线程就会一直被同步执行,直到当前代码块完全执行完,该 HTTP 线程才会被回收到线程池中。
有没有一种方式实现 WebSocket 的效果呢,WebSocket 效果简单来说就是,连接建立以后该连接会持续被保持打开状态。当有新的消息发送上来后,检测到网络端口有新的消息后,此时 Tomcat 将该消息丢给对应的工作线程池,工作线程池中调度空闲线程,去执行该@Message 方法标记的方法。
简单来说就是 NIO,所有的网络通道保持都注册到对应的轮训器select 上,监听各个通道的变化,只要有消息产生,此时再去调度具体的工作线程去处理。此处 SSE 想要实现这个效果也很简单,只要使用Spring 提供的异步处理机制,它能正确处理异步响应并保持连接开放直到异步任务完成。
代码如下:
private static final Long AUTO_SSE_TIMEOUT = 60 * 60 * 1000L;
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
private static final long HEARTBEAT_INTERVAL = 10_000L; // 10秒心跳间隔
/**
* 开启SSE心跳
* @param emitter
* @param requestId
* @return
*/
private ScheduledFuture<?> startHeartbeat(SseEmitter emitter, String requestId) {
return executor.scheduleAtFixedRate(() -> {
try {
// 发送心跳消息
log.info("{} send heartbeat", requestId);
emitter.send("heartbeat");
}
catch (Exception e) {
// 发送心跳失败,关闭连接
log.error("{} heartbeat failed, closing connection", requestId, e);
emitter.completeWithError(e);
}
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
/**
* 注册SSE事件
* @param emitter
* @param requestId
* @param heartbeatFuture
*/
private void registerSSEMonitor(SseEmitter emitter, String requestId, ScheduledFuture<?> heartbeatFuture) {
// 监听SSE异常事件
emitter.onCompletion(() -> {
log.info("{} SSE connection completed normally", requestId);
heartbeatFuture.cancel(true);
});
// 监听连接超时事件
emitter.onTimeout(() -> {
log.info("{} SSE connection timed out", requestId);
heartbeatFuture.cancel(true);
emitter.complete();
});
// 监听连接错误事件
emitter.onError((ex) -> {
log.info("{} SSE connection error: ", requestId, ex);
heartbeatFuture.cancel(true);
emitter.completeWithError(ex);
});
}
@GetMapping("/asyncTest")
public SseEmitter asyncTest() {
log.info("asyncTest thread = >> " + Thread.currentThread());
String requestId = UUID.randomUUID().toString();
SseEmitter emitter = new SseEmitter(AUTO_SSE_TIMEOUT);
// SSE心跳
ScheduledFuture<?> heartbeatFuture = startHeartbeat(emitter, requestId);
// 监听SSE事件
registerSSEMonitor(emitter, requestId, heartbeatFuture);
CompletableFuture.runAsync(() -> {
try {
// 要推送的内容: (SSE 效果推送)
final String content = "Test the push effect of SSE and output text results in typewriter mode.";
int len = content.length();
int endIndex = 0;
// 每隔2个字符推送一次,模拟打字机效果
while (endIndex < len) {
endIndex = Math.min(endIndex + 2, len);
final String subContent = content.substring(0, endIndex);
// 将要推送的内容封装成JSON格式,模拟实际开发中的数据格式,非必须
final JSONObject json = new JSONObject();
json.put("data", subContent);
json.put("code", HttpStatus.OK.value());
// 最后一次推送时,type为finish,表示推送结束,其它情况为add
final String type = endIndex == len
? "finish"
: "add";
json.put("type", type);
// 组装成SSE格式的数据,发送给前端,这个格式(data: content\n\n)是固定的,content是自定义的推送内容
emitter.send("data: " + json.toJSONString() + "\n\n");
// 稍微给点停顿,防止数据发送太快,浏览器接收不过来
TimeUnit.MILLISECONDS.sleep(100);
}
emitter.complete();
}
catch (Exception e) {
emitter.completeWithError(e);
Thread.currentThread().interrupt();
log.error("流式推送数据异常", e);
}
});
log.info("asyncTest end thread = >> " + Thread.currentThread());
//controller 方法立即执行返回,此时该HTTP 工作线程会直接被释放。
return emitter;
}
其实代码本身很简单,只要看asyncTest即可;主要是返回一个SseEmitter,然后使用CompletableFuture.runAsync(),交给另外的线程来执行emitter.send()来推送数据。至于上面的 startHeartbeat 和 registerSSEMonitor方法,则是为了保持 SSE 的完整。
startHeartbeat是将该 SSE 注册到一个定时线程池中,保持 10 秒和客户端有一次心跳交互。registerSSEMonitor方法则是监听Emitter 的各种状态(SSE 异常、超时、连接错误)等状态。
通过上面代码的方式就可以实现一个完整的基于 Spring mvc 的异步 SSE,将具体使用 SSE send 推送数据的线程自行代码中控制。而不阻塞 Tomcat 容器的 HTTP 工作线程,达到支持更多并发的效果。