Skip to content

fetch实现sse流式

前不久用SpringBootEventSource实现了一个流式应用的Demo,那时候使用的是SseEmitter,发送出去的数据是满足SSE标准格式的数据,故而前端可以直接使用EventSource做数据处理。但是当后端发送过来的流式数据非标准协议格式,那么就无法使用EventSource了,下面介绍如何使用fetchReadableStream对接后端自定义数据的流式接口。

在实现SSEServer-Sent Events,服务器推送事件)流式接口时,EventSourceFetch API 是两种常用方案,它们在设计目标、使用场景和功能特性上有显著差异。以下从 优缺点核心区别 两方面详细对比:

一、核心区别与特性对比

维度EventSourceFetch API(配合 ReadableStream)
设计目标专为 SSE 设计的原生 API,简化流式通信通用 HTTP 请求 API,通过流处理实现 SSE
API 复杂度极简,基于事件监听(onmessage/onopen)较底层,需手动处理流读取、解析、重连等
自动重连内置自动重连机制(断连后自动重试)需手动实现重连逻辑(如监听流关闭事件)
SSE 格式解析自动解析 SSE 协议格式(data/event/id)需手动解析流数据(处理换行、字段拆分等)
跨域支持有限制(默认不发送凭证,需显式配置)完全支持自定义跨域参数(credentials 等)
请求控制仅支持 GET 方法,无法自定义复杂头信息支持任意 HTTP 方法、自定义头、AbortController 取消请求
错误信息粒度错误事件信息有限(难以区分具体错误类型)可通过 response.status/statusText 获取详细错误
浏览器兼容性现代浏览器支持(IE 完全不支持)支持更广泛(现代浏览器均支持,包括部分旧版本)

二、优缺点分析

1. EventSource 的优缺点

优点:

  • 开箱即用的 SSE 适配:是 HTML5 标准中为 SSE 量身设计的 API,无需手动处理流的底层细节(如连接建立、数据分片读取)。

  • 自动重连机制:当连接意外断开(如网络波动),会自动触发重连,且支持服务器通过 retry 字段指定重连间隔(毫秒),极大简化稳定性保障。

  • 自动解析 SSE 格式:服务器发送的 SSE 数据(如 data: xxx\n\nevent: type\nid: 123\n)会被自动解析,开发者直接通过事件的 datatypelastEventId 属性获取信息,无需处理字符串拆分或格式校验。

  • 轻量简洁:代码量极少,只需监听 onmessage 事件即可处理数据,学习成本低。

    javascript
    // EventSource 示例
    const source = new EventSource('/sse-stream');
    source.onmessage = (e) => console.log('接收数据:', e.data);
    source.onerror = (e) => console.error('错误:', e);

缺点:

  • 灵活性差:仅支持 GET 方法,无法自定义复杂请求头(如 Token 认证需通过 URL 传递,不够安全),且跨域时默认不发送 Cookie 等凭证(需设置 withCredentials: true,但部分浏览器兼容性不佳)。
  • 错误处理弱onerror 事件无法区分具体错误类型(如 404 与 500 错误、网络中断与服务器主动关闭),调试困难。
  • 浏览器兼容性局限:完全不支持 IE,部分旧版浏览器(如 Safari 10 以下)存在功能缺陷。

2. Fetch API 实现 SSE 的优缺点

优点:

  • 极高的灵活性:支持自定义请求方法(虽 SSE 通常用 GET,但可扩展)、请求头(如 Authorization: Bearer Token)、跨域凭证(credentials: 'include')等,适配复杂认证或跨域场景。

  • 精细的错误控制:可通过 response.status 判断 HTTP 状态码(如 200 正常、401 未授权),结合 response.ok 快速识别错误,便于针对性处理。

  • 支持主动取消:通过 AbortController 可主动终止请求,适合用户手动关闭流的场景(如页面跳转时停止接收)。

  • 通用流处理:不仅限于 SSE,可处理任意类型的流式响应(如二进制流),扩展性强。

    javascript
    // Fetch 实现 SSE 示例
    const controller = new AbortController();
    fetch('/sse-stream', { 
      signal: controller.signal,
      headers: { 'Authorization': 'Bearer token' }
    }).then(response => {
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      // 循环读取流数据
      const read = () => reader.read().then(({ done, value }) => {
        if (done) return;
        const data = decoder.decode(value);
        console.log('接收数据:', data); // 需手动解析 SSE 格式
        read();
      });
      read();
    });
    // 主动取消
    // controller.abort();

缺点:

  • 需手动处理所有细节:需手动实现流的循环读取、SSE 格式解析(如拆分 data/event/id 字段、处理多行数据拼接)、断连重连逻辑(如监听 done 状态后重试),代码复杂度高。
  • 无内置重连:连接断开后需自行判断原因(如网络错误 vs 服务器正常关闭),并通过定时器实现重连,容易出现重复连接或重连风暴。
  • 学习成本高:需理解 ReadableStreamTextDecoder 等 API,以及 SSE 协议细节(如换行符规则、字段格式),对新手不友好。

三、适用场景总结

  • 优先选 EventSource:场景简单(如无复杂认证、跨域)、需快速实现、重视稳定性(依赖自动重连),例如股票实时行情、新闻推送等。
  • 优先选 Fetch:需要自定义请求(如 Token 认证、复杂头信息)、跨域场景复杂、需精细错误处理或主动取消,例如需要权限校验的实时日志系统、用户可手动关闭的通知流。

简言之,EventSource 是“傻瓜式”的 SSE 解决方案,适合大多数简单场景;Fetch 是“手动挡”方案,适合需要深度定制的复杂场景,但需承担更高的开发成本。

fetch实现sse

这样实现的一个好处就是可以在parseSSEBuffer函数中针对非标准的数据进行处理

fetch-sse.js

js
/**
 * SSE POST 请求工具函数
 * 基于 Fetch API 实现,支持流式接收数据并按 SSE 规范解析
 * @param {Object} options - 请求配置
 * @param {string} options.url - SSE 接口地址
 * @param {Object} [options.data] - POST 请求体数据(会自动转为 JSON)
 * @param {Object} [options.headers] - 自定义请求头
 * @param {number} [options.timeout=30000] - 超时时间(毫秒)
 * @param {Function} options.onMessage - 收到消息的回调 (data: 解析后的数据, event: 完整事件对象)
 * @param {Function} [options.onOpen] - 连接成功回调
 * @param {Function} [options.onError] - 错误回调 (error: 错误信息)
 * @param {Function} [options.onClose] - 连接关闭回调
 * @returns {Function} 取消请求的函数
 */
export function ssePost({
                            url,
                            data,
                            headers = {},
                            timeout = 30000,
                            onMessage,
                            onOpen,
                            onError,
                            onClose
                        }) {
    let isAborted = false;
    const controller = new AbortController();
    const signal = controller.signal;

    // 超时处理
    const timeoutId = setTimeout(() => {
        abort(new Error(`请求超时(${timeout}ms)`));
    }, timeout);

    // 取消请求的函数
    function abort(error) {
        if (isAborted) return;
        isAborted = true;
        clearTimeout(timeoutId);
        controller.abort();
        if (error && onError) {
            onError(error);
        }
    }

    // 开始请求
    (async () => {
        try {
            // 发送 POST 请求
            const response = await fetch(url, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Accept': 'text/event-stream',
                    ...headers
                },
                body: data ? JSON.stringify(data) : undefined,
                signal,
                cache: 'no-store' // 禁用缓存
            });

            // 检查连接状态
            if (!response.ok) {
                throw new Error(`HTTP 错误: ${response.status} ${response.statusText}`);
            }

            // 验证响应类型
            const contentType = response.headers.get('Content-Type');
            if (!contentType || !contentType.includes('text/event-stream')) {
                throw new Error('响应不是 SSE 格式(text/event-stream)');
            }

            // 触发连接成功回调
            if (onOpen) {
                onOpen();
            }

            // 获取流读取器和解码器
            const reader = response.body.getReader();
            const decoder = new TextDecoder('utf-8');
            let buffer = ''; // 缓存未完成的消息片段

            // 循环读取流数据
            while (!isAborted) {
                const {done, value} = await reader.read();

                // 流结束
                if (done) {
                    // 处理剩余缓冲数据
                    if (buffer.trim()) {
                        parseSSEBuffer(buffer, onMessage);
                    }
                    if (onClose) onClose();
                    return;
                }

                // 解码并追加到缓冲区
                buffer += decoder.decode(value, {stream: true});
                // 解析完整消息并更新缓冲区
                buffer = parseSSEBuffer(buffer, onMessage);
            }
        } catch (error) {
            if (!isAborted && onError) {
                onError(error);
            }
        }
    })();

    return abort; // 返回取消函数
}

/**
 * 解析 SSE 格式的缓冲区数据
 * @param {string} buffer - 待解析的字符串缓冲区
 * @param {Function} onMessage - 消息回调函数
 * @returns {string} 未解析的剩余缓冲区
 */
function parseSSEBuffer(buffer, onMessage) {
    // 按 SSE 消息结束符分割(\n\n)
    const messageParts = buffer.split('\n');

    // console.log("messageParts", messageParts)
    // 最后一部分可能是不完整的消息,需要保留
    const remaining = messageParts.pop() || '';

    // 解析每个完整的消息
    messageParts.forEach(rawMessage => {
        if (!rawMessage.trim()) return;

        const data = JSON.parse(rawMessage)
        if (data.message.content) {
            // console.log("message", data.message.content)
            onMessage(data.message.content);
        }
    });

    return remaining;
}

使用示例

js
if (typeof window !== 'undefined') {
  // 发起 SSE POST 请求
  const abort = ssePost({
    url: '/api/sse-stream',
    data: {
      conversationId: '123',
      query: '请流式返回数据'
    },
    headers: {
      'Authorization': 'Bearer your-token'
    },
    onOpen: () => {
      console.log('SSE 连接已建立');
    },
    onMessage: (data, event) => {
      console.log(`收到 [${event.event}] 事件:`, data);
      // 解析 JSON 数据(如果服务端返回 JSON 字符串)
      try {
        const jsonData = JSON.parse(data);
        console.log('解析后的 JSON 数据:', jsonData);
      } catch (e) {
        // 非 JSON 格式直接使用
      }
    },
    onError: (error) => {
      console.error('SSE 错误:', error.message);
    },
    onClose: () => {
      console.log('SSE 连接已关闭');
    }
  });

  // 如需手动取消请求(如组件卸载时)
  // setTimeout(() => {
  //   abort();
  // }, 5000);
}

模拟键盘打字效果

可以封装一个公共函数,模拟键盘打字将流式接口接收到的数据逐字渲染到网页中

js
// 新增状态标记
let isAborted = false;
let currentTimer = null;
// 2. 队列式typeWriter(使用外部滚动函数)
let typeQueue = [];
let isProcessing = false;

/**
 * 自动滚动到容器底部
 * @param {string|HTMLElement} container - 容器选择器或DOM元素
 * @param {boolean} [smooth=false] - 是否平滑滚动
 */
export function scrollToBottom(container, smooth = false) {
    const targetContainer = typeof container === 'string'
        ? document.querySelector(container)
        : container;

    if (!targetContainer) {
        console.warn('滚动容器不存在');
        return;
    }

    requestAnimationFrame(() => {
        if (smooth) {
            targetContainer.scrollTo({ top: targetContainer.scrollHeight, behavior: 'smooth' });
        } else {
            targetContainer.scrollTop = targetContainer.scrollHeight;
        }
    });
}

/**
 * 终止所有打字任务
 * @param {boolean} [clearContent=false] - 是否清空已输出内容
 * @param {HTMLElement} [element] - 要清空内容的元素(clearContent为true时必填)
 */
export function abortTypeWriter(clearContent = false, element) {
    isAborted = true;
    typeQueue = [];

    if (currentTimer) {
        clearTimeout(currentTimer);
        currentTimer = null;
    }

    if (clearContent && element && element instanceof HTMLElement) {
        element.innerHTML = '';
    }

    isProcessing = false;
}

/**
 * 重置打字机状态(用于终止后重新开始)
 */
export function resetTypeWriter() {
    typeQueue = [];
    isProcessing = false;
    isAborted = false;
    if (currentTimer) {
        clearTimeout(currentTimer);
        currentTimer = null;
    }
}

export function typeWriter(text, element, speed = 50, scrollContainer = '#report-container') {
    return new Promise(resolve => {
        typeQueue.push({ text, element, speed, scrollContainer, resolve });
        if (!isProcessing) {
            processQueue();
        }
    });
}

async function processQueue() {
    if (typeQueue.length === 0) {
        isProcessing = false;
        return;
    }

    isProcessing = true;
    const { text, element, speed, scrollContainer, resolve } = typeQueue.shift();

    try {
        await writeText(text, element, speed, scrollContainer);
        resolve();
    } finally {
        processQueue();
    }
}

function writeText(text, element, speed, scrollContainer) {
    return new Promise(resolve => {
        let index = 0;
        const originalHTML = element.innerHTML;

        const writeChar = () => {
            if (!element.isConnected) {
                resolve();
                return;
            }

            if (index < text.length) {
                const char = text[index] === '\n' ? '<br>' :
                    text[index] === ' ' ? '&nbsp;' : text[index];

                element.innerHTML = originalHTML + text.substring(0, index) + char;
                index++;

                // 调用外部滚动函数(逐字输出用即时滚动)
                scrollToBottom(scrollContainer, false);

                requestAnimationFrame(() => {
                    setTimeout(writeChar, speed);
                });
            } else {
                element.innerHTML = originalHTML + text.replace(/\n/g, '<br>');
                // 最终滚动(可选使用平滑滚动)
                scrollToBottom(scrollContainer, false);
                resolve();
            }
        };

        writeChar();
    });
}

如何使用

js
 // reportDom 是要渲染的dom节点
 onMessage: (message) => {
                    if (reportDom) {
                        typeWriter(message, reportDom)
                    } else {
                        console.log("dom不存在")
                    }
                },

上次更新于: