SpringBoot实现SSE消息推送
参考文章:Springboot整合SSE
什么是SSE
参考连接:Web 实时消息推送详解
服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端(浏览器)的单向消息推送。SSE 基于 HTTP 协议的,我们知道一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的,但 SSE 是个例外,它变换了一种思路。
SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream
类型的数据流信息,在有数据变更时从服务器流式传输到客户端。
整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载。
SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:
- SSE 是基于 HTTP 协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket 需单独服务器来处理协议。
- SSE 单向通信,只能由服务端向客户端单向通信;WebSocket 全双工通信,即通信的双方可以同时发送和接受信息。
- SSE 实现简单开发成本低,无需引入其他组件;WebSocket 传输数据需做二次解析,开发门槛高一些。
- SSE 默认支持断线重连;WebSocket 则需要自己实现。
- SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket 默认支持传送二进制数据。
代码实现
后端创建SSE服务
package com.amitu.sse.controller;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* sse server demo
*
* @author junpeng.li
* @date 2024-07-30 10:54
*/
@RestController
@RequestMapping("/sse")
public class SseController {
@PostMapping(value = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter testSse() {
// 设置超时时间,0表示不超时,默认值是30秒
// SseEmitter emitter = new SseEmitter(60 * 60 *1000);
SseEmitter emitter = new SseEmitter(60 * 60 *1000);
new Thread(() -> {
try {
// 模拟1秒发送一次消息
for (int i = 0; i < 10; i++) {
emitter.send("第" + i + "次发送消息");
Thread.sleep(1000);
}
// 关闭
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}
后端请求SSE接口
现在大部分第三方的AI大模型服务接口,都支持SSE方式调用。以下示例使用hutool工具包的HttpUtil
工具类实现。
package com.amitu.sse.demo;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* sse api call demo
*
* @author junpeng.li
* @date 2024-07-30 10:54
*/
public class SseApiDemo {
public static void main(String[] args) {
HttpRequest request = HttpUtil.createPost("http://localhost:8080/sse/test");
request.header("Authorization", "Bearer 1234567890");
try (HttpResponse response = request.execute(true);
InputStream inputStream = response.bodyStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))
) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
前端请求SSE接口
示例中使用了axios
第三方库。
// 创建axios实例
const request = axios.create({
baseURL: 'http://localhost:8080,
headers: {
'Content-Type': 'application/json;charset=utf-8'
}
})
request.post('/sse/test', null, {
responseType: 'stream',
onDownloadProgress: ({ event }) => {
const text = event.target.responseText as string
console.log(text)
}
})
nginx 配置
如果SSE服务使用nginx进行部署或者转发,需要修改一下nginx的配置项,否则接口没有响应。以下配置项建议配置到具体的接口上,不要把SSE配置到整个项目接口中。
配置项如下:
关闭缓存:为了确保SSE正常工作,需要禁用Nginx的缓存功能。通过
proxy_buffering off;
和proxy_cache off;
设置来关闭缓存。设置连接升级:SSE需要使用持续连接,因此需要设置Upgrade和Connection请求头。确保配置中包含
Upgrade $http_upgrade;
和Connection 'upgrade';
。调整超时设置:确保设置适当的超时时长,以避免SSE连接时间过短导致异常。您可以根据需要调整
proxy_connect_timeout
和proxy_read_timeout
等设置。
nginx.conf
示例代码:
location /sse/test {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Connection '';
# 重要:确保使用HTTP/1.1协议
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
# 添加以下配置以处理SSE
proxy_buffering off;
proxy_cache off;
}