网站首页 > 技术文章 正文
大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发,您的支持是我不断创作的动力。
1. 为什么需要 Stream 流
Streams API 允许开发者以编程方式访问通过 网络接收或本地创建 的数据流,并使用 JavaScript 进行处理。流式处理是指将要接收、发送或转换的资源分解成小块 (chunk),然后逐位处理这些小块。
虽然浏览器在接收要显示在网页上的 HTML 或视频等资源时都会进行流式处理,但在 2015 年引入流式 fetch 之前,JavaScript 从未提供过此功能。虽然 XMLHttpRequest 技术上可行,但效果不明显。
以前,如果开发者想处理某种资源,例如:视频、文本文件等,必须下载整个文件,等待其反序列化为合适的格式再处理。但随着浏览器流式技术的发展,现在只要原始数据在客户端可用,就可以使用 JavaScript 逐步处理, 而无需生成缓冲区、字符串或 Blob。
流式技术为以下场景带来了翻天覆地的变化:
- 视频特效处理:将可读视频流传输到可实时应用效果的转换流
- 数据(解)压缩:将文件流传输到可选择性解 / 压缩的转换流
- 图像解码:将 HTTP 响应流传输到 将字节解码为位图数据的转换流,然后再传输到将位图转换为 PNG 的另一个转换流。如果将其用于在 ServiceWorker 的 fetch 中,则可以透明地 polyfill 新的图像格式,例如: AVIF
2. 什么是可读流
可读流是 JavaScript 中由 ReadableStream 对象表示的数据源,该对象从底层数据源流出,ReadableStream() 构造函数会根据给定的处理程序创建并返回一个可读流对象。
ReadableStream 的底层数据源有两种类型:
- 推送源会在访问后持续推送数据,可以自行决定开始、暂停或取消对流的访问,包括:实时视频流、SSE 或 WebSocket 等
- 拉取源要求在连接后显式向其请求数据,包括:通过 fetch() 或 XMLHttpRequest 调用执行的 HTTP 操作
流数据以小块 (chunk) 方式按顺序读取,放入流中的块被称为 “入队” 数据,在队列中等待读取,而内部队列 (Internal Queue) 会实时跟踪尚未读取的块。
排队策略 (Queuing Strategy) 是一个对象,其根据流内部队列的状态确定流应如何发出背压 (BackPressure) 信号。排队策略会为每个 chunk 分配一个大小,并将队列中所有块的总大小与一个指定的数字(称为高水位线,即 HighWaterMark)进行比较。
流中的块由读取器 (Reader) 读取,每个块单独操作。读取器及其附带的其他处理代码称为消费者,同时每个可读流都有一个关联的控制器 (Controller) 用于控制流。
- 一次只能有一个读取器读取一个流
- 当一个读取器被创建并开始读取一个流(成为 活动读取器)时,其会被锁定在该流上。如果希望另一个读取器接管并读取流,需要在执行之前释放第一个读取器(尽管可启动流)
3. 如何构造一个可读流
可以用下面方法快速创建可读流:
const readableStream = new ReadableStream({
start(controller) {
// controller 是 ReadableStreamDefaultController
// controller 的 desiredSize 返回填充流内部队列所需的大小
controller.enqueue('The first chunk!');
// 将数据块加入关联的流
},
pull(controller) {},
cancel(reason) {},
});
下面是针对 start、pull、cancel 的说明:
- start(controller):对象构造完成后立即调用,可以访问流源并执行 设置流功能 所需的任何操作。如果此过程异步,则可以返回 Promise 来指示成功或失败。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController
- pull(controller):可用于在获取更多数据块时控制流,只要流的内部数据块队列未满,就会重复调用该方法直到队列达到其高水位线。如果返回 Promise,则在其 resolve 之前不会再次调用 pull(),而如果 reject 则流出错
- cancel(reason):当流消费者取消流时调用
ReadableStreamDefaultController 支持以下方法:
- close():关闭关联的流
- enqueue() :将指定的数据块加入关联的流
- error() :会导致与关联流的任何后续交互都发生错误
ReadableStream 还支持第二个参数,例如:
const readableStream = new ReadableStream(
{},
{
highWaterMark: 10,
// 当缓冲区中的数据量达到或超过 `highWaterMark` 时,流会暂停从底层源读取,防止缓冲区溢出
// 当数据被消费,缓冲区中的数据量低于 `highWaterMark` 时,流会恢复读取数据
size(chunk) {
return chunk.length;
},
}
);
第二个参数表示 queuingStrategy,用于定义流的排队策略,包含两个参数:
- highWaterMark:非负数,表示使用此排队策略的流的高水位线,相当于一个阈值,用于限制流内部缓冲区的数据量
- size(chunk):用于计算并返回给定 chunk 的有限非负大小,结果用于确定背压,并通过相应的 ReadableStreamDefaultController.desiredSize 属性体现,其还可控制何时调用底层源的 pull() 方法。
4. 如何使用 getReader() 和 read() 消费可读流
要从可读流中读取数据需要指定 Reader,由 getReader() 方法创建并将流锁定到该 Reader。在流锁定期间且 Reader 释放之前,无法获取其他 Reader。
ReadableStreamDefaultReader 的 read() 方法返回一个 Promise,用于访问流内部队列中的下一个数据块,其会根据流的状态 resolve 或 reject 并返回结果。
- 如果数据块可用则 resolve,并返回 {value: chunk, done: false}
- 如果流已关闭则 reject,并返回 {value: undefined, done: true}
- 如果流发生错误,则 reject 并返回相应的错误
下面代码示例通过调用 getReader() 方法并读取流内容:
const reader = readableStream.getReader();
while (true) {
const {done, value} = await reader.read();
// 返回一个 Promise
if (done) {
console.log("The stream is done.");
break;
}
console.log("Just read a chunk:", value);
}
但是,每次 read() 循环迭代时检查流的 done 属性并不方便,可以借助于异步迭代。
for await (const chunk of stream) {
console.log(chunk);
}
对于不支持 for..await...of 的浏览器,可以使用异步迭代生成器的方法来 polyfill:
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.releaseLock();
}
};
}
值得一提的是,开发者还可以通过访问 ReadableStream.locked 属性来检查可读流是否被锁定。
const locked = readableStream.locked;
console.log(`The stream is ${locked ? "indeed" : "not"} locked.`);
5. 拷贝 (teeing) 可读流
ReadableStream.tee() 方法启动当前可读流,返回一个包含两个结果分支的双元素数组,作为新的 ReadableStream 实例,其允许两个读取器同时读取一个流。
例如,如果想在 Service Worker 中从服务器获取响应并将其流式传输到浏览器,同时又传输到 Service Worker 。由于响应体不能被重复消费,因此需要两个副本来执行此操作。但是要取消流时,开发者需要取消两个结果分支。复制流通常会在执行期间锁定,以防止其他读取器锁定。
const readableStream = new ReadableStream({
start(controller) {
// 构造时候调用
console.log("[start]");
controller.enqueue("a");
controller.enqueue("b");
controller.enqueue("c");
},
pull(controller) {
// 当 controller 的队列为空时调用 read() 方法
console.log("[pull]");
controller.enqueue("d");
controller.close();
},
cancel(reason) {
// 流取消调用
console.log("[cancel]", reason);
},
});
// 创建两个 `ReadableStream`
const [streamA, streamB] = readableStream.tee();
// 逐个迭代读取流 A
const readerA = streamA.getReader();
console.log("[A]", await readerA.read());
//=> {value: "a", done: false}
console.log("[A]", await readerA.read());
//=> {value: "b", done: false}
// 因为 controller.desiredSize 大小为 1,因此上面代码执行完成后
// pull 方法就会被调用,否则填不满
console.log("[A]", await readerA.read());
//=> {value: "c", done: false}
console.log("[A]", await readerA.read());
//=> {value: "d", done: false}
console.log("[A]", await readerA.read());
//=> {value: undefined, done: true}
// 在循环中读取 streamB
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log("[B]", result);
}
6. 可读字节流
6.1 如何使用可读字节流
对于字节流,提供了可读流的扩展版本,以便高效处理字节,特别是通过最小化副本 (Minimizing Copies),字节流允许获取自带缓冲区 (bring-your-own-buffer,BYOB) 读取器。
默认实现可以提供一系列不同的输出,例如:在 WebSocket 下提供字符串或数组缓冲区,而字节流则保证字节输出。此外,BYOB 读取器还具有稳定性优势,这是因为如果缓冲区分离,其可以保证不会重复写入同一个缓冲区,从而避免竞争条件。BYOB 读取器可以减少浏览器运行垃圾回收的次数,因为可以 重用缓冲区。
字节流的创建也非常简单:
new ReadableStream({type: "bytes"})
可读字节流的底层源被赋予一个
ReadableByteStreamController,支持以下方法和属性:
- enqueue(): 接受一个 chunk 参数,其值为 ArrayBufferView
- byobRequest :返回当前的 BYOB 拉取请求,如果没有,则返回 null
- p :返回填充受控流内部队列所需的大小,是一个动态变化的数值
构造函数的第二个参数是 queuingStrategy,用于定义流的排队策略,接受一个参数:
- highWaterMark:表示使用此排队策略的流的高水位,用于确定背压,并通过相应的 desiredSize 属性体现。同时,其还控制何时调用底层源的 pull() 方法。
desiredSize 的计算公式为:highWaterMark - 内部队列中的 chunk 值。
6.2 可读字节流的 getReader() 和 read() 方法
开发者可以通过设置 mode 参数来访问 ReadableStreamBYOBReader:ReadableStream.getReader({mode: "byob"}),最终用于更精确地控制缓冲区分配,从而避免复制。
要从字节流读取数据,开发者需要调用
ReadableStreamBYOBReader.read(view),其中 view 是一个 ArrayBufferView。
const reader = readableStream.getReader({mode: "byob"});
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
// 读取字节流内容
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const {value: view, done} = await reader.read(
new Uint8Array(buffer, offset, buffer.byteLength - offset)
);
// read 参数是 ArrayBufferView
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
以下函数返回可读字节流,允许对随机生成的数组进行高效的零拷贝读取。其没有使用预设的 1,024 个块大小,而是尝试填充开发人员提供的缓冲区,从而实现完全控制。
const DEFAULT_CHUNK_SIZE = 1_024;
// 缓冲区大小
function makeReadableByteStream() {
return new ReadableStream({
type: "bytes",
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
7. 用好可读流的几个示例
7.1 流式读取 fetch 响应内容
以下示例创建了一个人工响应 (Response),用于将从其他资源获取的 HTML 片段流式传输到浏览器,其演示了 ReadableStream 与 Uint8Array 的结合使用。
fetch("https://www.example.org")
.then((response) => response.body)
.then((rb) => {
const reader = rb.getReader();
return new ReadableStream({
start(controller) {
function push() {
// done 表示是否结束,value 是 Uint8Array 类型
reader.read().then(({ done, value}) => {
// 读取结束,关闭关联的流
if (done) {
console.log("done", done);
controller.close();
return;
}
// 获取数据并通过 controller 传递给浏览器
controller.enqueue(value);
push();
});
}
push();
},
});
})
.then((stream) =>
// 通过 stream 手动构造 Response
new Response(stream, { headers: { "Content-Type": "text/html"} }).text(),
)
.then((result) => {
// 这里变成了字符串
console.log(result);
});
7.2 具有底层推送源和背压支持的可读流
以下函数返回包装了背压 (Backpressure) 套接字的可读流。“背压套接字” 是假设的对象,具有与 Web 套接字相同的 API,但也提供了使用 readStop 和 readStart 方法暂停和恢复数据流的功能。
此示例以此方式展示了如何将背压应用于支持背压的底层数据源:
function makeReadableBackpressureSocketStream(host, port) {
const socket = createBackpressureSocket(host, port);
return new ReadableStream({
start(controller) {
socket.ondata = event => {
controller.enqueue(event.data);
if (controller.desiredSize <= 0) {
// 因为内部队列已满,需要将被压信号通知给数据源
socket.readStop();
}
};
socket.onend = () => controller.close();
socket.onerror = () => controller.error(new Error("The socket errored!"));
},
pull() {
// 如果内部队列已空,但是消费者依然需要数据,此时需要重启
socket.readStart();
},
cancel() {
socket.close();
}
});
}
上面的例子中,通过管道传输到目标位置的数据接收速度赶不上套接字生成数据的速度时,或者如果在一段时间内没有读取流,就会向套接字发送背压信号。
参考资料
https://web.dev/articles/streams
https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
猜你喜欢
- 2025-09-11 Vue 自定义指令_vue 自定义指令实现v-bind
- 2025-09-11 2025前端面试题-React基础篇_前端react必读书籍推荐
- 2025-09-11 肉鸭产业养殖端是否需要升级?我们该怎么做?
- 2025-09-11 为什么你的前端首屏慢?从 TTFB 到 CLS 的系统性优化清单
- 2024-12-08 函数节流的6种应用场景,与防抖函数有什么区别?
- 2024-12-08 前端百题斩—通俗易懂的防抖与节流
- 2024-12-08 前端面试:什么是节流,在什么场景中使用
- 2024-12-08 前端都应该要掌握的防抖和节流
- 2024-12-08 秒懂前端防抖和节流(无需代码)
你 发表评论:
欢迎- 最近发表
-
- 用AI做微信小程序的完整步骤_如何用ai制作微信表情包
- 自习室预约的微信小程序设计与实现-计算机毕业设计源码+LW文档
- 微信小程序开发入门指南_微信小程序开发入门教程
- 写字机器人好用吗? 组装就花了5个小时 还要学习软件、录入字体
- 白描网页版 - 高效准确且免费的OCR文字识别工具
- 字体图形面板与图标字体使用_字体图标的优势和劣势
- 作为前端工程师必须懂得的33个CSS核心概念
- Flutter程序员开发炫酷的登录页面 字体库运用 路由学习 源码分享
- 2025Q3开源字体盘点:让你的代码和文档'颜值'飙升!
- Agent杂谈:Agent的能力上下限及「Agent构建」核心技术栈调研分享~
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- oracle面试 (55)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)