Skip to content

Springboot实现对话流式输出

25年最热门的词就是AI,网络上也诞生了很多AI相关的应用,而Sse(服务器发送事件)广泛用于AI问答式应用中,所这篇文章就来实现一个简单的Sse示例

Sse(Server-Sent Events)

SSE(Server-Sent Events,服务器发送事件)是一种基于 HTTP 的服务器向客户端单向推送实时数据的技术,允许服务器主动向客户端发送信息,而无需客户端频繁发起请求。它在实时通讯、数据更新通知等场景中被广泛应用(例如股票行情实时刷新、新闻推送等)。

核心特点

  • 单向通信 数据只能从服务器主动推送到客户端,客户端无法通过 SSE 通道向服务器发送数据(若需双向通信,需结合其他技术如 AJAXWebSocket)。

  • 基于 HTTP 长连接 建立在标准 HTTP/HTTPS 协议之上,无需额外协议(如 WebSocket 需要单独握手),兼容性更好,且可利用 HTTP 现有的缓存、代理、认证机制。

  • 文本数据格式 传输的数据以 UTF-8 编码的文本为主,格式有特定规范(见下文 “数据格式”),不支持二进制数据(如需二进制,可考虑 WebSocket)。

  • 自动重连机制 当连接断开时,客户端会自动尝试重连(默认时间间隔为 3 秒,可通过服务器发送的 retry 字段自定义)。

工作流程

1、客户端发起连接

客户端通过 JavaScript 创建 EventSource 对象,指定服务器端的 SSE 接口 URL(需同源,跨域需服务器配置 CORS):

js
const eventSource = new EventSource('/sse-endpoint');

2、服务器响应协议头

服务器接收到请求后,需返回特定的响应头,表明这是一个 SSE 连接:

Content-Type: text/event-stream  // 必须,指定为 SSE 类型
Cache-Control: no-cache          // 禁止缓存,确保数据实时性
Connection: keep-alive           // 保持长连接
Access-Control-Allow-Origin: *   // 跨域时需配置

3、服务器推送数据 服务器按照 SSE 规范的格式向客户端发送数据,客户端通过 EventSource 的事件监听处理数据。

4、连接维护与关闭

  • 服务器可主动关闭连接(通过结束响应);

  • 客户端可调用 eventSource.close() 关闭连接;

  • 连接断开时,客户端自动重连(除非主动关闭)。

数据格式

字段名说明
data核心数据字段,可多行(多行时每行前都需加 data:),最终拼接为完整数据。
event自定义事件名,客户端可通过对应事件名监听(默认事件名为 message)。
id数据的唯一标识,客户端会记录最后接收的 id,重连时通过 Last-Event-ID 请求头告知服务器,便于服务器恢复数据。
retry指定客户端重连的时间间隔(毫秒),默认 3000 毫秒。

SSE与WebSocket的对比

特性SSEWebSocket
通信方向单向(服务器 → 客户端)双向
协议基础HTTP 长连接独立的 WebSocket 协议
数据格式仅文本(UTF-8)支持文本和二进制
自动重连内置支持需手动实现
兼容性几乎所有现代浏览器(IE 不支持)现代浏览器支持(IE 不支持)
适用场景实时通知、数据推送实时聊天、游戏等双向交互

代码实现

后端代码

com/zhaochao/ssedemo/model/ChatMessage.java

java
package com.zhaochao.ssedemo.model;

public class ChatMessage {
    private String sender;
    private String content;
    private String type; // USER, AI

    public ChatMessage(String sender, String content, String type) {
        this.sender = sender;
        this.content = content;
        this.type = type;
    }

    public String getSender() {
        return sender;
    }

    public String getContent() {
        return content;
    }

    public String getType() {
        return type;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public void setType(String type) {
        this.type = type;
    }
}

com/zhaochao/ssedemo/service/ChatService.java

java
package com.zhaochao.ssedemo.service;

import com.zhaochao.ssedemo.model.ChatMessage;
import com.zhaochao.ssedemo.utils.LogUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class ChatService {
    private final Map<String, SseEmitter> clients = new ConcurrentHashMap<>();

    // 注册客户端
    public SseEmitter registerClient(String userId){
        if(clients.containsKey(userId)){
            clients.get(userId).complete();
        }

        SseEmitter emitter = new SseEmitter(300_000L);
        clients.put(userId, emitter);

        // 设置回调
        emitter.onCompletion(() -> clients.remove(userId));
        emitter.onTimeout(() -> clients.remove(userId));
        emitter.onError(e -> clients.remove(userId));

        return emitter;
    }

    // 向特定用户发送消息
    public void sendMessageToUser(String userId, ChatMessage message){
        SseEmitter emitter = clients.get(userId);
        if(emitter != null){
            try {
                emitter.send(SseEmitter.event()
                        .id(String.valueOf(System.currentTimeMillis()))
                        .name("message")
                        .data(message));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    // 处理用户消息并生成回复
    public void processUserMessage(String userId, ChatMessage userMessage){
        // 1. 保存用户消息到数据库(实际应用中)

        // 2. 调用LLM API获取回复(异步处理)
        CompletableFuture.runAsync(() -> {
            try {
                System.out.println("content");
                System.out.println(userMessage.getContent());
                // 模拟调用LLM API
                String[] responseChunks = generateResponseChunks(userMessage.getContent());

                // 3. 流式发送回复
                for (String chunk : responseChunks) {
                    ChatMessage partialResponse = new ChatMessage(
                            "ai",
                            chunk,
                            "AI"
                    );
                    sendMessageToUser(userId, partialResponse);

                    Thread.sleep(200); // 控制流式速度
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    // 模拟LLM API响应(实际应用中替换为真实调用)
    private String[] generateResponseChunks(String prompt) {
        LogUtils.info("prompt: {}", prompt);
        String fullResponse = "感谢您的问题:" + prompt + "\n" +
                "这是一个流式响应示例,实际应用中可以调用真实的LLM API。\n" +
                "回复会分多次发送,模拟打字效果。";
        return fullResponse.split("\n");
    }
}

com/zhaochao/ssedemo/controller/ChatController.java

java
package com.zhaochao.ssedemo.controller;

import com.zhaochao.ssedemo.model.ChatMessage;
import com.zhaochao.ssedemo.service.ChatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Slf4j
@RestController
@RequestMapping("/api/chat")
public class ChatController {

    @Autowired
    private ChatService chatService;

    // 建立SSE连接(从请求中获取用户ID)
    @GetMapping(path = "/stream",produces = "text/event-stream")
    public SseEmitter streamSseMvc(@RequestParam("userId") String userId){
        return chatService.registerClient(userId);
    }

    // 接收用户消息,从请求中获取用户id
    @PostMapping("/send")
    public void handleUserMessage(@RequestHeader("X-User-Id") String userId, @RequestBody ChatMessage message){
        // 处理用户消息
        chatService.processUserMessage(userId,message);
    }

}

前端代码

html
<!DOCTYPE html>
<html lang="zh-CN">

<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>一对一流式对话助手</title>
  <!-- 引入Tailwind CSS用于样式开发 -->
  <script src="https://cdn.tailwindcss.com"></script>
  <!-- 已注释的Font Awesome图标库(按需启用) -->
  <!-- <link href="https://cdn.tailwindcss.com"></script> -->
  
  <!-- 配置Tailwind自定义主题 -->
  <script>
    tailwind.config = {
      theme: {
        extend: {
          // 自定义颜色变量
          colors: {
            primary: '#165DFF',     // 主色调(蓝色,用于用户相关元素)
            secondary: '#36D399',   // 辅助色(绿色,用于AI相关元素)
            neutral: '#F3F4F6',     // 中性色
            dark: '#1F2937'         // 深色
          },
          // 自定义字体
          fontFamily: {
            inter: ['Inter', 'system-ui', 'sans-serif'],
          },
        }
      }
    }
  </script>
  
  <!-- 自定义Tailwind工具类 -->
  <style type="text/tailwindcss">
    @layer utilities {
      .content-auto {
        content-visibility: auto;  /* 优化内容可见性 */
      }

      .scrollbar-hide {
        /* 隐藏滚动条(跨浏览器兼容) */
        -ms-overflow-style: none;
        scrollbar-width: none;
      }

      .scrollbar-hide::-webkit-scrollbar {
        display: none;  /* 隐藏WebKit浏览器滚动条 */
      }

      .message-appear {
        animation: fadeIn 0.3s ease-in-out;  /* 消息出现动画 */
      }

      @keyframes fadeIn {
        from {
          opacity: 0;
          transform: translateY(10px);  /* 从下方淡入 */
        }

        to {
          opacity: 1;
          transform: translateY(0);
        }
      }

      .typing-cursor {
        animation: blink 1s infinite;  /* 光标闪烁动画 */
      }

      @keyframes blink {
        0%, 100% { opacity: 1; }
        50% { opacity: 0; }
      }
    }
  </style>
</head>

<body class="bg-gray-50 font-inter min-h-screen">
  <!-- 主容器:居中布局,响应式设计 -->
  <div class="max-w-4xl mx-auto p-4 md:p-6 flex flex-col min-h-screen">
    <!-- 聊天窗口标题栏 -->
    <header class="bg-white rounded-t-lg shadow-sm p-4 mb-4 flex items-center justify-between">
      <div class="flex items-center">
        <!-- 图标容器 -->
        <div class="w-10 h-10 rounded-full bg-primary flex items-center justify-center text-white">
          <i class="fa fa-comments"></i>  <!-- 对话图标(依赖Font Awesome) -->
        </div>
        <h1 class="ml-3 text-xl font-semibold text-gray-800">一对一流式对话助手</h1>
      </div>
      <!-- 状态信息 -->
      <div class="text-sm text-gray-500">
        用户ID: <span id="user-id" class="font-mono"></span>  <!-- 显示用户唯一标识 -->
        | 连接状态: <span id="connection-status" class="text-green-500">已连接</span>  <!-- 显示SSE连接状态 -->
      </div>
    </header>

    <!-- 聊天消息显示区域 -->
    <main class="flex-grow bg-white rounded-md shadow-sm overflow-hidden mb-4">
      <div id="chat-messages" class="h-[500px] overflow-y-auto p-4 scrollbar-hide space-y-4">
        <!-- 系统默认欢迎消息 -->
        <div class="message-appear">
          <div class="flex items-start">
            <!-- 系统图标 -->
            <div class="w-8 h-8 rounded-full bg-gray-200 flex items-center justify-center text-gray-600 mr-2 flex-shrink-0">
              <i class="fa fa-info-circle"></i>  <!-- 信息图标 -->
            </div>
            <!-- 消息内容 -->
            <div class="bg-gray-100 rounded-lg p-3 max-w-[90%]">
              <div class="text-xs text-gray-500 mb-1">系统消息</div>
              <p class="text-gray-800">欢迎使用一对一流式对话助手!您可以开始提问,系统将实时回复。</p>
            </div>
          </div>
        </div>
      </div>
    </main>

    <!-- 输入区域 -->
    <footer class="bg-white rounded-b-lg shadow-sm p-4">
      <div class="flex">
        <!-- 消息输入框 -->
        <input type="text" id="message-input"
               class="flex-grow px-4 py-2 border border-gray-300 rounded-l-lg focus:outline-none focus:ring-2 focus:ring-primary/50 focus:border-primary transition-all"
               placeholder="输入消息..." autocomplete="off">
        
        <!-- 发送按钮 -->
        <button id="send-button"
                class="bg-primary hover:bg-primary/90 text-white px-4 py-2 rounded-r-lg transition-all flex items-center justify-center">
          <i class="fa fa-paper-plane mr-2"></i> 发送  <!-- 纸飞机图标+文字 -->
        </button>
      </div>
      <!-- 操作提示 -->
      <div class="text-xs text-gray-500 mt-2">按 Enter 发送消息,Shift+Enter 换行</div>
    </footer>
  </div>

  <script>
    // DOM加载完成后执行初始化
    document.addEventListener('DOMContentLoaded', () => {
      // 获取DOM元素
      const messageInput = document.getElementById('message-input');    // 消息输入框
      const sendButton = document.getElementById('send-button');        // 发送按钮
      const chatMessages = document.getElementById('chat-messages');    // 消息显示容器
      const connectionStatus = document.getElementById('connection-status');  // 连接状态
      const userIdElement = document.getElementById('user-id');          // 用户ID显示

      // 后端服务地址(需替换为实际后端接口地址)
      const BACKEND_URL = 'http://xx.xx.xx.xx:3000';

      // 变量初始化
      let eventSource = null;                // SSE连接实例
      let isStreaming = false;               // 是否正在流式输出
      let currentStreamMessageId = '';       // 当前流式消息的DOM ID
      let currentStreamContent = '';         // 当前流式消息内容
      let currentCharIndex = 0;              // 打字动画的字符索引
      let typingTimer = null;                // 打字动画定时器
      let currentCursorElement = null;       // 当前闪烁的光标元素

      // 生成或获取用户ID(通过localStorage持久化,确保会话一致性)
      const userId = localStorage.getItem('userId') || generateUserId();
      localStorage.setItem('userId', userId);
      userIdElement.textContent = userId;

      // 初始化SSE连接
      connectEventSource();

      /**
       * 发送消息到后端
       */
      function sendMessage() {
        const content = messageInput.value.trim();
        if (!content) return;  // 空内容不发送

        // 在界面添加用户消息
        appendMessage('user', content);
        messageInput.value = '';  // 清空输入框

        // 调用后端API发送消息
        fetch(`${BACKEND_URL}/api/chat/send`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'X-User-Id': userId  // 在请求头携带用户ID
          },
          body: JSON.stringify({
            sender: 'user',
            content: content,
            type: 'USER'
          })
        });

        // 添加"正在思考"提示
        appendMessage('ai', '<div class="animate-pulse">正在思考...</div>');
      }

      /**
       * 建立SSE(Server-Sent Events)连接
       * 用于接收后端实时推送的消息
       */
      function connectEventSource() {
        // 关闭已有连接,避免重复连接
        if (eventSource) {
          eventSource.close();
        }

        // 建立新的SSE连接(URL中携带用户ID)
        eventSource = new EventSource(`${BACKEND_URL}/api/chat/stream?userId=${userId}`);

        /**
         * 处理接收到的SSE消息
         */
        eventSource.onmessage = function (event) {
          try {
            const message = JSON.parse(event.data);  // 解析JSON消息

            if (message.type === 'USER') return;  // 过滤用户自己发送的消息

            // 处理AI消息
            if (message.type === 'AI') {
              // 移除"正在思考"提示
              const thinkingElement = chatMessages.lastElementChild;
              if (thinkingElement && thinkingElement.textContent.includes('正在思考')) {
                chatMessages.removeChild(thinkingElement);
              }
              // 启动打字机效果显示AI回复
              startTypingEffect(message.content); 
            }
          } catch (error) {
            console.error('解析消息失败:', error);
            appendMessage('system', `接收消息失败: ${error.message}`);
          }
        };

        /**
         * SSE连接成功回调
         */
        eventSource.onopen = function () {
          connectionStatus.textContent = '已连接';
          connectionStatus.className = 'text-green-500';
          console.log('SSE 连接已建立');
        };

        /**
         * SSE连接错误回调(自动重试)
         */
        eventSource.onerror = function () {
          connectionStatus.textContent = '连接断开,正在重试...';
          connectionStatus.className = 'text-red-500';
          console.error('SSE 连接错误');
          // 5秒后重试连接
          setTimeout(connectEventSource, 5000);
        };
      }

      /**
       * 打字机效果实现
       * @param {string} content - 要显示的完整内容
       */
      function startTypingEffect(content) {
        console.log("接收的内容:", content)
        
        // 停止当前正在进行的打字动画
        if (typingTimer) {
          clearInterval(typingTimer);
          if (currentCursorElement) {
            currentCursorElement.remove();  // 移除旧光标
            currentCursorElement = null;
          }
        }

        // 初始化状态
        isStreaming = true;
        currentStreamMessageId = `stream-${Date.now()}`;  // 生成唯一ID
        currentStreamContent = '';
        currentCharIndex = 0;

        // 创建空的消息容器
        appendMessage('ai', '', currentStreamMessageId);
        const streamElement = document.getElementById(currentStreamMessageId);

        // 将内容拆分为字符数组(支持中文等多字节字符)
        const characters = Array.from(content);

        // 启动打字定时器(每20ms输出一个字符)
        typingTimer = setInterval(() => {
          if (currentCharIndex < characters.length) {
            // 拼接当前字符
            currentStreamContent += characters[currentCharIndex++];
            
            if (streamElement) {
              // 移除旧光标
              if (currentCursorElement) {
                currentCursorElement.remove();
              }
              
              // 更新消息内容(处理换行和空格)
              streamElement.innerHTML = currentStreamContent
               .replace(/\n/g, '<br>')  // 替换换行符为HTML换行
               .replace(/\s/g, '&nbsp;');  // 替换空格为非换行空格
              
              // 创建并添加新光标
              const cursor = document.createElement('span');
              cursor.className = 'typing-cursor';
              cursor.textContent = '|';
              streamElement.appendChild(cursor);
              currentCursorElement = cursor;
            }
          } else {
            // 打字完成,清理定时器和光标
            clearInterval(typingTimer);
            if (streamElement && currentCursorElement) {
              currentCursorElement.remove();
              currentCursorElement = null;
            }
            isStreaming = false;
          }
        }, 20);  // 打字速度(毫秒/字符)
      }

      /**
       * 向聊天窗口添加消息
       * @param {string} sender - 发送者('user'或'ai'或'system')
       * @param {string} content - 消息内容
       * @param {string} messageId - 消息DOM元素ID(可选)
       */
      function appendMessage(sender, content, messageId = null) {
        const isUser = sender === 'user';  // 判断是否为用户消息
        const messageDiv = document.createElement('div');
        messageDiv.className = 'message-appear';  // 添加出现动画
        if (messageId) {
          messageDiv.id = messageId;  // 设置ID(用于流式更新)
        }

        // 根据发送者设置不同样式
        const avatarClass = isUser 
         ? 'bg-primary text-white' 
          : 'bg-secondary text-white';
        const avatarIcon = isUser? 'fa-user' : 'fa-robot';
        const messageBubbleClass = isUser 
         ? 'bg-primary/10' 
          : 'bg-secondary/10';
        // 用户消息居右,AI消息居左
        const alignClass = isUser? 'justify-end' : 'justify-start'; 

        // 构建消息HTML结构
        messageDiv.innerHTML = `
          <div class="flex items-start ${alignClass}">
            <!-- 头像 -->
            <div class="w-8 h-8 rounded-full ${avatarClass} flex items-center justify-center text-white mr-2 flex-shrink-0">
              <i class="fa ${avatarIcon}"></i>
            </div>
            <!-- 消息气泡 -->
            <div class="${messageBubbleClass} rounded-lg p-3 max-w-[90%]">
              <div class="text-xs text-gray-500 mb-1">${isUser? '你' : '助手'}</div>
              <p class="text-gray-800">${content}</p>
            </div>
          </div>
        `;

        // 添加到聊天窗口并滚动到底部
        chatMessages.appendChild(messageDiv);
        chatMessages.scrollTop = chatMessages.scrollHeight;
      }

      /**
       * 生成随机用户ID
       * @returns {string} 随机用户ID
       */
      function generateUserId() {
        return 'user-' + Date.now().toString(36) + Math.random().toString(36).substring(2, 8);
      }

      // 绑定事件监听
      sendButton.addEventListener('click', sendMessage);  // 点击发送按钮
      
      // 键盘事件(按Enter发送,Shift+Enter换行)
      messageInput.addEventListener('keydown', (e) => {
        if (e.key === 'Enter' &&!e.shiftKey) {
          e.preventDefault();  // 阻止默认换行
          sendMessage();
        }
      });

      // 页面关闭时清理资源
      window.addEventListener('beforeunload', () => {
        if (eventSource) {
          eventSource.close();  // 关闭SSE连接
        }
        if (typingTimer) {
          clearInterval(typingTimer);  // 清除定时器
          if (currentCursorElement) {
            currentCursorElement.remove();
          }
        }
      });
    });
  </script>
</body>

</html>

上次更新于: