JS 中的流式处理技术 Iterator、Generator、for await...of
小麦2025年09月03日1976 字
讲讲 JS 中进行流式处理的几个关键技术

欢迎来到《通俗易懂 AI 应用开发》,我是小麦。

在上期视频中,我们实现了这样一个命令行聊天机器人。
你会发现每次提问后都要等待一段时间,体验不是很好。

这次我们不借助任何三方库,把它改造成流式输出,

然后深入讲解流式处理技术中的几个核心工具:TextDecoder 文本编码器、Iterator 迭代器、for await…of 语法和 Generator 生成器。
它们也是 JS 语言较为进阶的内容,相信看完会有不少收获。

先前我们在 invoke 函数中通过 fetch 调用模型服务,等待响应体完全到达后才返回,这是用户要等很久的核心原因。

现在我们新传入一个参数 stream: true,告诉接口以特定的格式响应,

接着直接将响应体 body 返回。

在调用侧,我们引入了一个叫做 TextDecoder 的解码器,在 for 循环中对响应体(response)进行解码。
让我们细致地看看这个循环做了什么。

在 for await…of 循环中,每次调用 TextDecoder 的 decode 方法对 value 进行解码。
这里的 value 是来自于响应体(response)的数据片段,它是个 Uint8Array,存储字符串的 ASCII 编码。

比如开始的 100, 97, 116, 97 这四个数字对应的字符分别是 'd', 'a', ’t', 'a',

连起来就是 "data" 这个字符串。

因此这里的 decode 是在把无符号整数数组,映射为字符串。
你会注意到 stream: true 这个参数,它用来处理多字节字符的残留问题,

比如中文字符“你”,在 utf-8 编码规则下,需要用三个数字表示:228, 189 和 160。
如果不完整,那么 TextDecoder 会解出来乱码。

而加上 stream: true 参数后,TextDecoder 会在积累到完整数据后再解码,从而避免乱码问题。

回到循环代码,decode 后紧接着是字符串操作 split、map 和 filter,map 和 filter 很好理解,split 为啥要按换行符分割呢?
还记得我们之前在 fetch 方法那边添加的 stream: true 参数吗,和接口约定好的输出格式具体是这样的。

它包含多行带有 "data" 前缀的 JSON 字符串,并且用空行分割,最后一行固定是 "data: [DONE]"。
需要注意的是,每次循环拿到的 value 都一定包含一个或多个完整的 JSON 串。
现在你应该理解 split、map、filter 三连的作用了。

将 lines 分离出来后,遍历,判断是否结束,去掉 data 再解析为 json 对象,取出 json 对象中的 content 字段,然后打印。
这里打印用的是 process.stdout.write 而不是 console.log,可以想想为什么。
for await 循环结束后我们拿到了本轮对话的完整回复,之后的流程就和先前一样了。

这段代码其实大部分都在做数据转换和格式处理的事情,而 for await…of 语法才是流式处理的核心。
接下来我们学习一下它背后的原理。

首先我们要了解什么是迭代器(Iterator)。

我们已经非常熟悉 for…of 语法,它是 ES6 引入的同步迭代语法。
而 for await…of 是 ES9 引入的异步迭代语法。
ES6 标准规定:实现了 Symbol.iterator 方法的对象都可以用 for…of 来迭代。
ES9 标准规定:实现了 Symbol.asyncIterator 方法的对象都可以用 for await…of 来迭代。
先看同步迭代器,

原生数组对象默认已经实现了一个同步迭代器,把它拿出来看看。


你会发现这个 iterator 对象上有三个方法,next、return 和 throw。

重点关注 next,每次调用它都会返回下一个值(value)以及是否结束标记(done)。

现在用一个循环驱动迭代器迭代。嗯…看上去有点繁琐是吧,

于是我们就有了 for...of 语法,它的作用其实就是简化左边代码的写法。
搞明白同步迭代后,异步迭代就简单很多了。

依然以 fetch 为例,响应体 body 是个 ReadableStream,

既然它可以遍历,那么类比同步迭代,它上面是不是应该有个 Symbol.asyncIterator 方法呢?

唉,还真有。它的 next 方法会返回一个 Promise,里面包裹的东西和同步迭代一样。
同样,写个循环驱动迭代器迭代。

和同步迭代不一样的是,这里需要 await 一下。

如果换成 for await 语法就变成了右边这样。
怎么样,很简单吧。

顺带提一嘴,ReadableStream 在 Node.js 中是这样实现的。
这里的 next、return 和 SymbolAsyncIterator 是不是似曾相识呢?

回到项目代码,从设计角度考虑,这段代码有什么问题?
这段代码的数据处理逻辑和业务逻辑耦合在一起,假设项目里还有很多地方都要进行这样的异步迭代,那么如何避免重复代码呢?

比如能否简化成这样:只保留核心业务代码,而把相对通用的逻辑封装到 stream 方法的内部。
这就要求 stream 方法返回一个实现了异步迭代器的对象,并且这个对象每次 next 返回的 value 都是字符串。

来看一下具体如何实现,需要注意到 async function 后面多了一个星号,这表示它是一个异步生成器函数(AsyncGenerator)。
同时最里面的循环使用 yield 关键字返回迭代值,其余代码和之前一样。

唉,这就要引入生成器(Generator)的概念了。

ES6 标准规定:可以用 function* 定义一个生成器函数。

在函数内部使用 yield 关键字产出一个值并暂停函数执行。

调用生成器函数会返回一个 Generator 对象,

它会自动实现迭代器 Iterator。
唉,迭代器又出现了。

这样就可以直接用 for of 语法来遍历生成器函数的返回值。
和先前类似,异步生成器只需要添加 async 关键字,这里不过多赘述了。
说了这么多,生成器平时几乎用不到,有什么实际作用呢?有的,兄弟,有的。

其中一个重要作用就是用于流的转换。

在我们的实战项目中,就利用了异步生成器函数,将 fetch 返回的 ReadableStream 转换成为自定义的一个 Stream,它屏蔽了内部数据处理的细节。

在 AI 应用开发中,流的转换非常有用。比如在流式动态渲染 UI 界面时,前端不会直接处理底层数据结构,而是处理业务自定义的数据结构。



这就要用到流的转换,而 JS 的 for await 语法会让这一过程变得容易。

最后,你可以通过 Github 访问这期视频的源代码。
感谢你看到这里,如果觉得有用记得三连支持和关注。

我是小麦,一位热爱技术和分享的软件工程师,我们下期再见。
