Search K
Appearance
Appearance
前不久用SpringBoot
和EventSource
实现了一个流式应用的Demo
,那时候使用的是SseEmitter
,发送出去的数据是满足SSE
标准格式的数据,故而前端可以直接使用EventSource
做数据处理。但是当后端发送过来的流式数据非标准协议格式,那么就无法使用EventSource
了,下面介绍如何使用fetch
和ReadableStream
对接后端自定义数据的流式接口。
在实现SSE
(Server-Sent Events
,服务器推送事件)流式接口时,EventSource
和 Fetch API
是两种常用方案,它们在设计目标、使用场景和功能特性上有显著差异。以下从 优缺点 和 核心区别 两方面详细对比:
维度 | EventSource | Fetch API(配合 ReadableStream) |
---|---|---|
设计目标 | 专为 SSE 设计的原生 API,简化流式通信 | 通用 HTTP 请求 API,通过流处理实现 SSE |
API 复杂度 | 极简,基于事件监听(onmessage/onopen) | 较底层,需手动处理流读取、解析、重连等 |
自动重连 | 内置自动重连机制(断连后自动重试) | 需手动实现重连逻辑(如监听流关闭事件) |
SSE 格式解析 | 自动解析 SSE 协议格式(data/event/id) | 需手动解析流数据(处理换行、字段拆分等) |
跨域支持 | 有限制(默认不发送凭证,需显式配置) | 完全支持自定义跨域参数(credentials 等) |
请求控制 | 仅支持 GET 方法,无法自定义复杂头信息 | 支持任意 HTTP 方法、自定义头、AbortController 取消请求 |
错误信息粒度 | 错误事件信息有限(难以区分具体错误类型) | 可通过 response.status/statusText 获取详细错误 |
浏览器兼容性 | 现代浏览器支持(IE 完全不支持) | 支持更广泛(现代浏览器均支持,包括部分旧版本) |
优点:
开箱即用的 SSE 适配:是 HTML5 标准中为 SSE 量身设计的 API,无需手动处理流的底层细节(如连接建立、数据分片读取)。
自动重连机制:当连接意外断开(如网络波动),会自动触发重连,且支持服务器通过 retry
字段指定重连间隔(毫秒),极大简化稳定性保障。
自动解析 SSE 格式:服务器发送的 SSE 数据(如 data: xxx\n\n
、event: type\n
、id: 123\n
)会被自动解析,开发者直接通过事件的 data
、type
、lastEventId
属性获取信息,无需处理字符串拆分或格式校验。
轻量简洁:代码量极少,只需监听 onmessage
事件即可处理数据,学习成本低。
// EventSource 示例
const source = new EventSource('/sse-stream');
source.onmessage = (e) => console.log('接收数据:', e.data);
source.onerror = (e) => console.error('错误:', e);
缺点:
withCredentials: true
,但部分浏览器兼容性不佳)。onerror
事件无法区分具体错误类型(如 404 与 500 错误、网络中断与服务器主动关闭),调试困难。优点:
极高的灵活性:支持自定义请求方法(虽 SSE 通常用 GET,但可扩展)、请求头(如 Authorization: Bearer Token
)、跨域凭证(credentials: 'include'
)等,适配复杂认证或跨域场景。
精细的错误控制:可通过 response.status
判断 HTTP 状态码(如 200 正常、401 未授权),结合 response.ok
快速识别错误,便于针对性处理。
支持主动取消:通过 AbortController
可主动终止请求,适合用户手动关闭流的场景(如页面跳转时停止接收)。
通用流处理:不仅限于 SSE,可处理任意类型的流式响应(如二进制流),扩展性强。
// Fetch 实现 SSE 示例
const controller = new AbortController();
fetch('/sse-stream', {
signal: controller.signal,
headers: { 'Authorization': 'Bearer token' }
}).then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
// 循环读取流数据
const read = () => reader.read().then(({ done, value }) => {
if (done) return;
const data = decoder.decode(value);
console.log('接收数据:', data); // 需手动解析 SSE 格式
read();
});
read();
});
// 主动取消
// controller.abort();
缺点:
data
/event
/id
字段、处理多行数据拼接)、断连重连逻辑(如监听 done
状态后重试),代码复杂度高。ReadableStream
、TextDecoder
等 API,以及 SSE 协议细节(如换行符规则、字段格式),对新手不友好。简言之,EventSource
是“傻瓜式”的 SSE
解决方案,适合大多数简单场景;Fetch
是“手动挡”方案,适合需要深度定制的复杂场景,但需承担更高的开发成本。
这样实现的一个好处就是可以在parseSSEBuffer
函数中针对非标准的数据进行处理
fetch-sse.js
/**
* SSE POST 请求工具函数
* 基于 Fetch API 实现,支持流式接收数据并按 SSE 规范解析
* @param {Object} options - 请求配置
* @param {string} options.url - SSE 接口地址
* @param {Object} [options.data] - POST 请求体数据(会自动转为 JSON)
* @param {Object} [options.headers] - 自定义请求头
* @param {number} [options.timeout=30000] - 超时时间(毫秒)
* @param {Function} options.onMessage - 收到消息的回调 (data: 解析后的数据, event: 完整事件对象)
* @param {Function} [options.onOpen] - 连接成功回调
* @param {Function} [options.onError] - 错误回调 (error: 错误信息)
* @param {Function} [options.onClose] - 连接关闭回调
* @returns {Function} 取消请求的函数
*/
export function ssePost({
url,
data,
headers = {},
timeout = 30000,
onMessage,
onOpen,
onError,
onClose
}) {
let isAborted = false;
const controller = new AbortController();
const signal = controller.signal;
// 超时处理
const timeoutId = setTimeout(() => {
abort(new Error(`请求超时(${timeout}ms)`));
}, timeout);
// 取消请求的函数
function abort(error) {
if (isAborted) return;
isAborted = true;
clearTimeout(timeoutId);
controller.abort();
if (error && onError) {
onError(error);
}
}
// 开始请求
(async () => {
try {
// 发送 POST 请求
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
...headers
},
body: data ? JSON.stringify(data) : undefined,
signal,
cache: 'no-store' // 禁用缓存
});
// 检查连接状态
if (!response.ok) {
throw new Error(`HTTP 错误: ${response.status} ${response.statusText}`);
}
// 验证响应类型
const contentType = response.headers.get('Content-Type');
if (!contentType || !contentType.includes('text/event-stream')) {
throw new Error('响应不是 SSE 格式(text/event-stream)');
}
// 触发连接成功回调
if (onOpen) {
onOpen();
}
// 获取流读取器和解码器
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = ''; // 缓存未完成的消息片段
// 循环读取流数据
while (!isAborted) {
const {done, value} = await reader.read();
// 流结束
if (done) {
// 处理剩余缓冲数据
if (buffer.trim()) {
parseSSEBuffer(buffer, onMessage);
}
if (onClose) onClose();
return;
}
// 解码并追加到缓冲区
buffer += decoder.decode(value, {stream: true});
// 解析完整消息并更新缓冲区
buffer = parseSSEBuffer(buffer, onMessage);
}
} catch (error) {
if (!isAborted && onError) {
onError(error);
}
}
})();
return abort; // 返回取消函数
}
/**
* 解析 SSE 格式的缓冲区数据
* @param {string} buffer - 待解析的字符串缓冲区
* @param {Function} onMessage - 消息回调函数
* @returns {string} 未解析的剩余缓冲区
*/
function parseSSEBuffer(buffer, onMessage) {
// 按 SSE 消息结束符分割(\n\n)
const messageParts = buffer.split('\n');
// console.log("messageParts", messageParts)
// 最后一部分可能是不完整的消息,需要保留
const remaining = messageParts.pop() || '';
// 解析每个完整的消息
messageParts.forEach(rawMessage => {
if (!rawMessage.trim()) return;
const data = JSON.parse(rawMessage)
if (data.message.content) {
// console.log("message", data.message.content)
onMessage(data.message.content);
}
});
return remaining;
}
使用示例
if (typeof window !== 'undefined') {
// 发起 SSE POST 请求
const abort = ssePost({
url: '/api/sse-stream',
data: {
conversationId: '123',
query: '请流式返回数据'
},
headers: {
'Authorization': 'Bearer your-token'
},
onOpen: () => {
console.log('SSE 连接已建立');
},
onMessage: (data, event) => {
console.log(`收到 [${event.event}] 事件:`, data);
// 解析 JSON 数据(如果服务端返回 JSON 字符串)
try {
const jsonData = JSON.parse(data);
console.log('解析后的 JSON 数据:', jsonData);
} catch (e) {
// 非 JSON 格式直接使用
}
},
onError: (error) => {
console.error('SSE 错误:', error.message);
},
onClose: () => {
console.log('SSE 连接已关闭');
}
});
// 如需手动取消请求(如组件卸载时)
// setTimeout(() => {
// abort();
// }, 5000);
}
可以封装一个公共函数,模拟键盘打字将流式接口接收到的数据逐字渲染到网页中
// 新增状态标记
let isAborted = false;
let currentTimer = null;
// 2. 队列式typeWriter(使用外部滚动函数)
let typeQueue = [];
let isProcessing = false;
/**
* 自动滚动到容器底部
* @param {string|HTMLElement} container - 容器选择器或DOM元素
* @param {boolean} [smooth=false] - 是否平滑滚动
*/
export function scrollToBottom(container, smooth = false) {
const targetContainer = typeof container === 'string'
? document.querySelector(container)
: container;
if (!targetContainer) {
console.warn('滚动容器不存在');
return;
}
requestAnimationFrame(() => {
if (smooth) {
targetContainer.scrollTo({ top: targetContainer.scrollHeight, behavior: 'smooth' });
} else {
targetContainer.scrollTop = targetContainer.scrollHeight;
}
});
}
/**
* 终止所有打字任务
* @param {boolean} [clearContent=false] - 是否清空已输出内容
* @param {HTMLElement} [element] - 要清空内容的元素(clearContent为true时必填)
*/
export function abortTypeWriter(clearContent = false, element) {
isAborted = true;
typeQueue = [];
if (currentTimer) {
clearTimeout(currentTimer);
currentTimer = null;
}
if (clearContent && element && element instanceof HTMLElement) {
element.innerHTML = '';
}
isProcessing = false;
}
/**
* 重置打字机状态(用于终止后重新开始)
*/
export function resetTypeWriter() {
typeQueue = [];
isProcessing = false;
isAborted = false;
if (currentTimer) {
clearTimeout(currentTimer);
currentTimer = null;
}
}
export function typeWriter(text, element, speed = 50, scrollContainer = '#report-container') {
return new Promise(resolve => {
typeQueue.push({ text, element, speed, scrollContainer, resolve });
if (!isProcessing) {
processQueue();
}
});
}
async function processQueue() {
if (typeQueue.length === 0) {
isProcessing = false;
return;
}
isProcessing = true;
const { text, element, speed, scrollContainer, resolve } = typeQueue.shift();
try {
await writeText(text, element, speed, scrollContainer);
resolve();
} finally {
processQueue();
}
}
function writeText(text, element, speed, scrollContainer) {
return new Promise(resolve => {
let index = 0;
const originalHTML = element.innerHTML;
const writeChar = () => {
if (!element.isConnected) {
resolve();
return;
}
if (index < text.length) {
const char = text[index] === '\n' ? '<br>' :
text[index] === ' ' ? ' ' : text[index];
element.innerHTML = originalHTML + text.substring(0, index) + char;
index++;
// 调用外部滚动函数(逐字输出用即时滚动)
scrollToBottom(scrollContainer, false);
requestAnimationFrame(() => {
setTimeout(writeChar, speed);
});
} else {
element.innerHTML = originalHTML + text.replace(/\n/g, '<br>');
// 最终滚动(可选使用平滑滚动)
scrollToBottom(scrollContainer, false);
resolve();
}
};
writeChar();
});
}
如何使用
// reportDom 是要渲染的dom节点
onMessage: (message) => {
if (reportDom) {
typeWriter(message, reportDom)
} else {
console.log("dom不存在")
}
},