Skip to content

SpringBoot实现SSE消息推送


参考文章:Springboot整合SSE

什么是SSE

参考连接:Web 实时消息推送详解

服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端(浏览器)的单向消息推送。SSE 基于 HTTP 协议的,我们知道一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的,但 SSE 是个例外,它变换了一种思路。

SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载。

SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:

  • SSE 是基于 HTTP 协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket 需单独服务器来处理协议。
  • SSE 单向通信,只能由服务端向客户端单向通信;WebSocket 全双工通信,即通信的双方可以同时发送和接受信息。
  • SSE 实现简单开发成本低,无需引入其他组件;WebSocket 传输数据需做二次解析,开发门槛高一些。
  • SSE 默认支持断线重连;WebSocket 则需要自己实现。
  • SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket 默认支持传送二进制数据。

代码实现

后端创建SSE服务

java
package com.amitu.sse.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * sse server demo
 *
 * @author junpeng.li
 * @date 2024-07-30 10:54
 */
@RestController
@RequestMapping("/sse")
public class SseController {

    @PostMapping(value = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter testSse() {
        // 设置超时时间,0表示不超时,默认值是30秒
        // SseEmitter emitter = new SseEmitter(60 * 60 *1000);
        SseEmitter emitter = new SseEmitter(60 * 60 *1000);

        new Thread(() -> {
            try {
                // 模拟1秒发送一次消息
                for (int i = 0; i < 10; i++) {
                    emitter.send("第" + i + "次发送消息");
                    Thread.sleep(1000);
                }

                // 关闭
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        }).start();

        return emitter;
    } 
}

后端请求SSE接口

现在大部分第三方的AI大模型服务接口,都支持SSE方式调用。以下示例使用hutool工具包的HttpUtil工具类实现。

java
package com.amitu.sse.demo;

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * sse api call demo
 *
 * @author junpeng.li
 * @date 2024-07-30 10:54
 */
public class SseApiDemo {

    public static void main(String[] args) {
        HttpRequest request = HttpUtil.createPost("http://localhost:8080/sse/test");
        request.header("Authorization", "Bearer 1234567890");

        try (HttpResponse response = request.execute(true);
             InputStream inputStream = response.bodyStream();
             BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))
        ) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

前端请求SSE接口

示例中使用了axios第三方库。

ts
// 创建axios实例
const request = axios.create({
  baseURL: 'http://localhost:8080,
  headers: { 
    'Content-Type': 'application/json;charset=utf-8' 
  }
})

request.post('/sse/test', null, {
  responseType: 'stream',
  onDownloadProgress: ({ event }) => {
    const text = event.target.responseText as string
    console.log(text)
  }
})

nginx 配置

如果SSE服务使用nginx进行部署或者转发,需要修改一下nginx的配置项,否则接口没有响应。以下配置项建议配置到具体的接口上,不要把SSE配置到整个项目接口中。

配置项如下:

  • 关闭缓存:为了确保SSE正常工作,需要禁用Nginx的缓存功能。通过 proxy_buffering off;proxy_cache off; 设置来关闭缓存。

  • 设置连接升级:SSE需要使用持续连接,因此需要设置Upgrade和Connection请求头。确保配置中包含 Upgrade $http_upgrade;Connection 'upgrade';

  • 调整超时设置:确保设置适当的超时时长,以避免SSE连接时间过短导致异常。您可以根据需要调整 proxy_connect_timeoutproxy_read_timeout 等设置。

nginx.conf示例代码:

bash
location /sse/test {
    proxy_set_header Host $http_host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header Connection '';
    # 重要:确保使用HTTP/1.1协议
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection 'upgrade';
    # 添加以下配置以处理SSE
    proxy_buffering off;
    proxy_cache off;
}