Skip to main content
流是一种重要的抽象概念,用于处理二进制数据,而无需一次性将所有数据加载到内存中。它们通常用于读取和写入文件、发送和接收网络请求以及处理大量数据。 Bun 实现了 Web API ReadableStreamWritableStream
Bun 还实现了 node:stream 模块,包括 ReadableWritableDuplex。有关完整文档,请参阅 Node.js 文档
要创建一个简单的 ReadableStream
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});
可以使用 for await 语法逐块读取 ReadableStream 的内容。
for await (const chunk of stream) {
  console.log(chunk);
}

// hello
// world

直接 ReadableStream

Bun 实现了一个优化版本的 ReadableStream,避免了不必要的数据复制和队列管理逻辑。 对于传统的 ReadableStream,数据块会被_排队_。每个块都被复制到队列中,直到流准备好发送更多数据。
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});
对于直接 ReadableStream,数据块直接写入流。不会发生排队,也不需要将块数据克隆到内存中。controller API 已更新以反映这一点;不是使用 .enqueue() 而是调用 .write
const stream = new ReadableStream({
  type: "direct", 
  pull(controller) {
    controller.write("hello");
    controller.write("world");
  },
});
使用直接 ReadableStream 时,所有块队列都由目标处理。流的使用者接收传递给 controller.write() 的确切内容,没有任何编码或修改。

异步生成器流

Bun 还支持异步生成器函数作为 ResponseRequest 的源。这是一种创建从异步源获取数据的 ReadableStream 的简单方法。
const response = new Response(
  (async function* () {
    yield "hello";
    yield "world";
  })(),
);

await response.text(); // "helloworld"
您也可以直接使用 [Symbol.asyncIterator]
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    yield "hello";
    yield "world";
  },
});

await response.text(); // "helloworld"
如果需要对流进行更精细的控制,yield 将返回直接 ReadableStream 控制器。
const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    const controller = yield "hello";
    await controller.end();
  },
});

await response.text(); // "hello"

Bun.ArrayBufferSink

Bun.ArrayBufferSink 类是一个用于构建未知大小的 ArrayBuffer 的快速增量写入器。
const sink = new Bun.ArrayBufferSink();

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]
要以 Uint8Array 形式检索数据,请将 asUint8Array 选项传递给 start 方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
  asUint8Array: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]
.write() 方法支持字符串、类型化数组、ArrayBufferSharedArrayBuffer
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);

sink.end();
一旦调用了 .end(),就不能再向 ArrayBufferSink 写入数据。然而,在缓冲流的上下文中,连续写入数据并定期 .flush() 内容(比如,写入 WriteableStream)是有用的。为了支持这一点,请在构造函数中传递 stream: true
const sink = new Bun.ArrayBufferSink();
sink.start({
  stream: true, 
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]

sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]
.flush() 方法将缓冲的数据作为 ArrayBuffer(或如果设置了 asUint8Array: true 则为 Uint8Array)返回并清除内部缓冲区。 要手动设置内部缓冲区的大小(以字节为单位),请为 highWaterMark 传递一个值:
const sink = new Bun.ArrayBufferSink();
sink.start({
  highWaterMark: 1024 * 1024, // 1 MB
});

参考

See Typescript Definitions
/**
 * 快速增量写入器,在 end() 时变成 `ArrayBuffer`。
 */
export class ArrayBufferSink {
  constructor();

  start(options?: {
    asUint8Array?: boolean;
    /**
     * 预分配此大小的内部缓冲区
     * 当块大小很小时,这可以显著提高性能
     */
    highWaterMark?: number;
    /**
     * 在 {@link ArrayBufferSink.flush} 时,将写入的数据作为 `Uint8Array` 返回。
     * 写入将从缓冲区的开头重新开始。
     */
    stream?: boolean;
  }): void;

  write(chunk: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer): number;
  /**
   * 刷新内部缓冲区
   *
   * 如果 {@link ArrayBufferSink.start} 传递了 `stream` 选项,这将返回 `ArrayBuffer`
   * 如果 {@link ArrayBufferSink.start} 传递了 `stream` 选项和 `asUint8Array`,这将返回 `Uint8Array`
   * 否则,这将返回自上次刷新以来写入的字节数
   *
   * 此 API 可能在稍后更改,以分离 Uint8ArraySink 和 ArrayBufferSink
   */
  flush(): number | Uint8Array<ArrayBuffer> | ArrayBuffer;
  end(): ArrayBuffer | Uint8Array<ArrayBuffer>;
}