专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

为什么说 ReadableStream 是现代 Web 架构的关键拼图?

ins518 2025-09-11 20:47:57 技术文章 1 ℃ 0 评论

家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发,您的支持是我不断创作的动力。

1. 为什么需要 Stream 流

Streams API 允许开发者以编程方式访问通过 网络接收或本地创建 的数据流,并使用 JavaScript 进行处理。流式处理是指将要接收、发送或转换的资源分解成小块 (chunk),然后逐位处理这些小块。

虽然浏览器在接收要显示在网页上的 HTML 或视频等资源时都会进行流式处理,但在 2015 年引入流式 fetch 之前,JavaScript 从未提供过此功能。虽然 XMLHttpRequest 技术上可行,但效果不明显。

以前,如果开发者想处理某种资源,例如:视频、文本文件等,必须下载整个文件,等待其反序列化为合适的格式再处理。但随着浏览器流式技术的发展,现在只要原始数据在客户端可用,就可以使用 JavaScript 逐步处理, 而无需生成缓冲区、字符串或 Blob

流式技术为以下场景带来了翻天覆地的变化:

  • 视频特效处理:将可读视频流传输到可实时应用效果的转换流
  • 数据(解)压缩:将文件流传输到可选择性解 / 压缩的转换流
  • 图像解码:将 HTTP 响应流传输到 将字节解码为位图数据的转换流,然后再传输到将位图转换为 PNG 的另一个转换流。如果将其用于在 ServiceWorker 的 fetch 中,则可以透明地 polyfill 新的图像格式,例如: AVIF

2. 什么是可读流

可读流是 JavaScript 中由 ReadableStream 对象表示的数据源,该对象从底层数据源流出,ReadableStream() 构造函数会根据给定的处理程序创建并返回一个可读流对象。

ReadableStream 的底层数据源有两种类型:

  • 推送源会在访问后持续推送数据,可以自行决定开始、暂停或取消对流的访问,包括:实时视频流、SSE 或 WebSocket 等
  • 拉取源要求在连接后显式向其请求数据,包括:通过 fetch() 或 XMLHttpRequest 调用执行的 HTTP 操作

流数据以小块 (chunk) 方式按顺序读取,放入流中的块被称为 “入队” 数据,在队列中等待读取,而内部队列 (Internal Queue) 会实时跟踪尚未读取的块。

排队策略 (Queuing Strategy) 是一个对象,其根据流内部队列的状态确定流应如何发出背压 (BackPressure) 信号。排队策略会为每个 chunk 分配一个大小,并将队列中所有块的总大小与一个指定的数字(称为高水位线,即 HighWaterMark)进行比较。

流中的块由读取器 (Reader) 读取,每个块单独操作。读取器及其附带的其他处理代码称为消费者,同时每个可读流都有一个关联的控制器 (Controller) 用于控制流。

  • 一次只能有一个读取器读取一个流
  • 当一个读取器被创建并开始读取一个流(成为 活动读取器)时,其会被锁定在该流上。如果希望另一个读取器接管并读取流,需要在执行之前释放第一个读取器(尽管可启动流)

3. 如何构造一个可读流

可以用下面方法快速创建可读流:

const readableStream = new ReadableStream({
  start(controller) {
    // controller 是 ReadableStreamDefaultController
    // controller 的 desiredSize 返回填充流内部队列所需的大小
    controller.enqueue('The first chunk!');
  // 将数据块加入关联的流
  },
  pull(controller) {},
  cancel(reason) {},
});

下面是针对 start、pull、cancel 的说明:

  • start(controller):对象构造完成后立即调用,可以访问流源并执行 设置流功能 所需的任何操作。如果此过程异步,则可以返回 Promise 来指示成功或失败。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController
  • pull(controller):可用于在获取更多数据块时控制流,只要流的内部数据块队列未满,就会重复调用该方法直到队列达到其高水位线。如果返回 Promise,则在其 resolve 之前不会再次调用 pull(),而如果 reject 则流出错
  • cancel(reason):当流消费者取消流时调用


ReadableStreamDefaultController 支持以下方法:

  • close():关闭关联的流
  • enqueue() :将指定的数据块加入关联的流
  • error() :会导致与关联流的任何后续交互都发生错误

ReadableStream 还支持第二个参数,例如:

const readableStream = new ReadableStream(
  {},
  {
    highWaterMark: 10,
    // 当缓冲区中的数据量达到或超过 `highWaterMark` 时,流会暂停从底层源读取,防止缓冲区溢出
   // 当数据被消费,缓冲区中的数据量低于 `highWaterMark` 时,流会恢复读取数据
    size(chunk) {
      return chunk.length;
    },
  }
);

第二个参数表示 queuingStrategy,用于定义流的排队策略,包含两个参数:

  • highWaterMark:非负数,表示使用此排队策略的流的高水位线,相当于一个阈值,用于限制流内部缓冲区的数据量
  • size(chunk):用于计算并返回给定 chunk 的有限非负大小,结果用于确定背压,并通过相应的 ReadableStreamDefaultController.desiredSize 属性体现,其还可控制何时调用底层源的 pull() 方法。

4. 如何使用 getReader() 和 read() 消费可读流

要从可读流中读取数据需要指定 Reader,由 getReader() 方法创建并将流锁定到该 Reader。在流锁定期间且 Reader 释放之前,无法获取其他 Reader。


ReadableStreamDefaultReader 的 read() 方法返回一个 Promise,用于访问流内部队列中的下一个数据块,其会根据流的状态 resolve 或 reject 并返回结果。

  • 如果数据块可用则 resolve,并返回 {value: chunk, done: false}
  • 如果流已关闭则 reject,并返回 {value: undefined, done: true}
  • 如果流发生错误,则 reject 并返回相应的错误

下面代码示例通过调用 getReader() 方法并读取流内容:

const reader = readableStream.getReader();
while (true) {
  const {done, value} = await reader.read();
  // 返回一个 Promise
  if (done) {
    console.log("The stream is done.");
    break;
  }
  console.log("Just read a chunk:", value);
}

但是,每次 read() 循环迭代时检查流的 done 属性并不方便,可以借助于异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

对于不支持 for..await...of 的浏览器,可以使用异步迭代生成器的方法来 polyfill:

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
        }
        yield value;
      }
    } finally {
      reader.releaseLock();
    }
  };
}

值得一提的是,开发者还可以通过访问 ReadableStream.locked 属性来检查可读流是否被锁定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? "indeed" : "not"} locked.`);

5. 拷贝 (teeing) 可读流

ReadableStream.tee() 方法启动当前可读流,返回一个包含两个结果分支的双元素数组,作为新的 ReadableStream 实例,其允许两个读取器同时读取一个流。

例如,如果想在 Service Worker 中从服务器获取响应并将其流式传输到浏览器,同时又传输到 Service Worker 。由于响应体不能被重复消费,因此需要两个副本来执行此操作。但是要取消流时,开发者需要取消两个结果分支。复制流通常会在执行期间锁定,以防止其他读取器锁定。

const readableStream = new ReadableStream({
  start(controller) {
    // 构造时候调用
    console.log("[start]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  pull(controller) {
    // 当 controller 的队列为空时调用 read() 方法
    console.log("[pull]");
    controller.enqueue("d");
    controller.close();
  },
  cancel(reason) {
    // 流取消调用
    console.log("[cancel]", reason);
  },
});
// 创建两个 `ReadableStream`
const [streamA, streamB] = readableStream.tee();
// 逐个迭代读取流 A
const readerA = streamA.getReader();
console.log("[A]", await readerA.read());
//=> {value: "a", done: false}
console.log("[A]", await readerA.read());
//=> {value: "b", done: false}
// 因为 controller.desiredSize 大小为 1,因此上面代码执行完成后
// pull 方法就会被调用,否则填不满
console.log("[A]", await readerA.read());
//=> {value: "c", done: false}
console.log("[A]", await readerA.read());
//=> {value: "d", done: false}
console.log("[A]", await readerA.read());
//=> {value: undefined, done: true}
// 在循环中读取 streamB
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log("[B]", result);
}

6. 可读字节流

6.1 如何使用可读字节流

对于字节流,提供了可读流的扩展版本,以便高效处理字节,特别是通过最小化副本 (Minimizing Copies),字节流允许获取自带缓冲区 (bring-your-own-buffer,BYOB) 读取器。

默认实现可以提供一系列不同的输出,例如:在 WebSocket 下提供字符串或数组缓冲区,而字节流则保证字节输出。此外,BYOB 读取器还具有稳定性优势,这是因为如果缓冲区分离,其可以保证不会重复写入同一个缓冲区,从而避免竞争条件。BYOB 读取器可以减少浏览器运行垃圾回收的次数,因为可以 重用缓冲区

字节流的创建也非常简单:

new ReadableStream({type: "bytes"})

可读字节流的底层源被赋予一个
ReadableByteStreamController,支持以下方法和属性:

  • enqueue(): 接受一个 chunk 参数,其值为 ArrayBufferView
  • byobRequest :返回当前的 BYOB 拉取请求,如果没有,则返回 null
  • p :返回填充受控流内部队列所需的大小,是一个动态变化的数值

构造函数的第二个参数是 queuingStrategy,用于定义流的排队策略,接受一个参数:

  • highWaterMark:表示使用此排队策略的流的高水位,用于确定背压,并通过相应的 desiredSize 属性体现。同时,其还控制何时调用底层源的 pull() 方法。

desiredSize 的计算公式为:highWaterMark - 内部队列中的 chunk 值。

6.2 可读字节流的 getReader() 和 read() 方法

开发者可以通过设置 mode 参数来访问 ReadableStreamBYOBReader:ReadableStream.getReader({mode: "byob"}),最终用于更精确地控制缓冲区分配,从而避免复制。

要从字节流读取数据,开发者需要调用
ReadableStreamBYOBReader.read(view),其中 view 是一个 ArrayBufferView。

const reader = readableStream.getReader({mode: "byob"});

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
// 读取字节流内容
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const {value: view, done} = await reader.read(
      new Uint8Array(buffer, offset, buffer.byteLength - offset)
    );
    // read 参数是 ArrayBufferView
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }
  return buffer;
}

以下函数返回可读字节流,允许对随机生成的数组进行高效的零拷贝读取。其没有使用预设的 1,024 个块大小,而是尝试填充开发人员提供的缓冲区,从而实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;
// 缓冲区大小
function makeReadableByteStream() {
  return new ReadableStream({
    type: "bytes",
    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },
    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

7. 用好可读流的几个示例

7.1 流式读取 fetch 响应内容

以下示例创建了一个人工响应 (Response),用于将从其他资源获取的 HTML 片段流式传输到浏览器,其演示了 ReadableStream 与 Uint8Array 的结合使用。

fetch("https://www.example.org")
  .then((response) => response.body)
  .then((rb) => {
    const reader = rb.getReader();
    return new ReadableStream({
      start(controller) {
        function push() {
          // done 表示是否结束,value 是 Uint8Array 类型
          reader.read().then(({ done, value}) => {
            // 读取结束,关闭关联的流
            if (done) {
              console.log("done", done);
              controller.close();
              return;
            }
            // 获取数据并通过 controller 传递给浏览器
            controller.enqueue(value);
            push();
          });
        }
        push();
      },
    });
  })
  .then((stream) =>
    // 通过 stream 手动构造 Response
    new Response(stream, { headers: { "Content-Type": "text/html"} }).text(),
  )
  .then((result) => {
    // 这里变成了字符串
    console.log(result);
  });

7.2 具有底层推送源和背压支持的可读流

以下函数返回包装了背压 (Backpressure) 套接字的可读流。“背压套接字” 是假设的对象,具有与 Web 套接字相同的 API,但也提供了使用 readStop 和 readStart 方法暂停和恢复数据流的功能。

此示例以此方式展示了如何将背压应用于支持背压的底层数据源:

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);
  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);
        if (controller.desiredSize <= 0) {
          // 因为内部队列已满,需要将被压信号通知给数据源
          socket.readStop();
        }
      };
      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },
    pull() {
      // 如果内部队列已空,但是消费者依然需要数据,此时需要重启
      socket.readStart();
    },
    cancel() {
      socket.close();
    }
  });
}

上面的例子中,通过管道传输到目标位置的数据接收速度赶不上套接字生成数据的速度时,或者如果在一段时间内没有读取流,就会向套接字发送背压信号。

参考资料

https://web.dev/articles/streams

https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表