Skip to main content
Bun.serve() 支持服务端WebSockets,具有即时压缩、TLS支持和Bun原生的发布-订阅API。
⚡️ 7倍更高的吞吐量Bun的WebSockets很快。对于Linux x64上的简单聊天室,Bun每秒可以处理比Node.js + "ws"多7倍的请求。
每秒发送的消息数运行时客户端
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16
Bun的WebSocket内部实现基于uWebSockets

启动WebSocket服务器

下面是一个使用Bun.serve构建的简单WebSocket服务器,其中所有传入的请求都在fetch处理器中被升级为WebSocket连接。套接字处理器在websocket参数中声明。
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
Bun.serve({
  fetch(req, server) {
    // 将请求升级为WebSocket
    if (server.upgrade(req)) {
      return; // 不返回Response
    }
    return new Response("升级失败", { status: 500 });
  },
  websocket: {}, // 处理器
});
支持以下WebSocket事件处理器:
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    message(ws, message) {}, // 收到消息
    open(ws) {}, // 套接字打开
    close(ws, code, message) {}, // 套接字关闭
    drain(ws) {}, // 套接字准备好接收更多数据
  },
});
在Bun中,处理器在每个服务器上只声明一次,而不是每个套接字。ServerWebSocket期望您将一个WebSocketHandler对象传递给Bun.serve()方法,该对象包含openmessageclosedrainerror的方法。这与客户端的WebSocket类不同,该类继承自EventTarget(onmessage、onopen、onclose)。客户端往往不会有太多的套接字连接,所以基于事件的API是有意义的。但是服务器往往有许多套接字连接打开,这意味着:
  • 每个连接添加/移除事件监听器所花费的时间会累积起来
  • 每个连接存储回调函数引用所消耗的额外内存
  • 通常,人们为每个连接创建新函数,这也意味着更多的内存
因此,不使用基于事件的API,ServerWebSocket希望您在Bun.serve()中传递一个包含每个事件方法的单一对象,并且它会被每个连接重用。这导致更少的内存使用和更少的时间花在添加/移除事件监听器上。
每个处理器的第一个参数是处理事件的ServerWebSocket实例。ServerWebSocket类是WebSocket的快速、Bun原生实现,带有一些附加功能。
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    message(ws, message) {
      ws.send(message); // 回显消息
    },
  },
});

发送消息

每个ServerWebSocket实例都有一个.send()方法用于向客户端发送消息。它支持多种输入类型。
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    message(ws, message) {
      ws.send("Hello world"); // 字符串
      ws.send(response.arrayBuffer()); // ArrayBuffer
      ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
    },
  },
});

Headers

一旦升级成功,Bun将按照规范发送一个101 Switching Protocols响应。可以在调用server.upgrade()时将附加的headers附加到此Response
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: { 
        "Set-Cookie": `SessionId=${sessionId}`, 
      }, 
    });
  },
  websocket: {}, // 处理器
});

上下文数据

上下文data可以附加到新的WebSocket中的.upgrade()调用。这些数据在WebSocket处理器内的ws.data属性上可用。 为了强类型化ws.data,请在websocket处理器对象上添加一个data属性。这会在所有生命周期钩子中键入ws.data
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

Bun.serve({
  fetch(req, server) {
    const cookies = new Bun.CookieMap(req.headers.get("cookie")!);

    server.upgrade(req, {
      // 此对象必须符合WebSocketData
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies.get("X-Token"),
      },
    });

    return undefined;
  },
  websocket: {
    // TypeScript: 像这样指定ws.data的类型
    data: {} as WebSocketData,
    // 收到消息时调用的处理器
    async message(ws, message) {
      // ws.data现在被正确键入为WebSocketData
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});
注意: 以前,您可以使用Bun.serve上的类型参数来指定ws.data的类型,例如Bun.serve<MyData>({...})。由于TypeScript的一个限制,这种模式已被移除,转而使用上面所示的data属性。
要从浏览器连接到此服务器,请创建一个新的WebSocket
browser.js
const socket = new WebSocket("ws://localhost:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
});
识别用户当前页面上设置的cookie将随WebSocket升级请求一起发送,并可在fetch处理器中的req.headers中获得。解析这些cookie以确定连接用户的标识,并相应地设置data的值。

发布/订阅

Bun的ServerWebSocket实现为基于主题的广播实现了原生的发布-订阅API。各个套接字可以.subscribe()到一个主题(用字符串标识指定)并.publish()消息给该主题的所有其他订阅者(排除自身)。这种基于主题的广播API类似于MQTTRedis Pub/Sub
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
const server = Bun.serve({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`升级!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success ? undefined : new Response("WebSocket升级错误", { status: 400 });
    }

    return new Response("Hello world");
  },
  websocket: {
    // TypeScript: 像这样指定ws.data的类型
    data: {} as { username: string },
    open(ws) {
      const msg = `${ws.data.username} 进入了聊天室`;
      ws.subscribe("群聊");
      server.publish("群聊", msg);
    },
    message(ws, message) {
      // 这是一个群聊
      // 所以服务器将收到的消息重新广播给每个人
      server.publish("群聊", `${ws.data.username}: ${message}`);

      // 检查当前订阅
      console.log(ws.subscriptions); // ["群聊"]
    },
    close(ws) {
      const msg = `${ws.data.username} 离开了聊天室`;
      ws.unsubscribe("群聊");
      server.publish("群聊", msg);
    },
  },
});

console.log(`监听在 ${server.hostname}:${server.port}`);
调用.publish(data)将向主题的所有订阅者发送消息,_除了_调用.publish()的套接字。要向主题的所有订阅者发送消息,请使用Server实例上的.publish()方法。
const server = Bun.serve({
  websocket: {
    // ...
  },
});

// 监听某些外部事件
server.publish("群聊", "Hello world");

压缩

可以通过perMessageDeflate参数启用逐消息压缩
https://mintcdn.com/teemo/2s-4Z6VdGqiCeBNX/icons/typescript.svg?fit=max&auto=format&n=2s-4Z6VdGqiCeBNX&q=85&s=087b260066909db1cd3e9c7292bc34b2server.ts
Bun.serve({
  websocket: {
    perMessageDeflate: true, 
  },
});
可以通过将布尔值作为第二个参数传递给.send()来为单个消息启用压缩。
ws.send("Hello world", true);
有关压缩特性的细粒度控制,请参阅参考

反压

ServerWebSocket.send(message)方法返回一个表示操作结果的number
  • -1 — 消息已排队但存在反压
  • 0 — 消息因连接问题而被丢弃
  • 1+ — 发送的字节数
这使您能够更好地控制服务器中的反压。

超时和限制

默认情况下,如果WebSocket连接空闲120秒,Bun将关闭它。这可以通过idleTimeout参数配置。
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    idleTimeout: 60, // 60秒
  },
});
如果Bun收到大于16MB的消息,它也会关闭WebSocket连接。这可以通过maxPayloadLength参数配置。
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 MB
  },
});

连接到 Websocket 服务器

Bun实现了WebSocket类。要创建连接到ws://wss://服务器的WebSocket客户端,就像在浏览器中一样创建一个WebSocket实例。
const socket = new WebSocket("ws://localhost:3000");

// 带子协议协商
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);
在浏览器中,当前页面上设置的cookie将随WebSocket升级请求一起发送。这是WebSocket API的标准功能。 为方便起见,Bun允许您直接在构造函数中设置自定义header。这是Bun对WebSocket标准的特定扩展。这在浏览器中不起作用。
const socket = new WebSocket("ws://localhost:3000", {
  headers: {
    /* 自定义header */
  }, 
});
要向套接字添加事件监听器:
// 收到消息
socket.addEventListener("message", event => {});

// 套接字打开
socket.addEventListener("open", event => {});

// 套接字关闭
socket.addEventListener("close", event => {});

// 错误处理器
socket.addEventListener("error", event => {});

参考

查看Typescript定义 展开
namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // 默认值: 16 * 1024 * 1024 = 16 MB
      idleTimeout?: number; // 默认值: 120 (秒)
      backpressureLimit?: number; // 默认值: 1024 * 1024 = 1 MB
      closeOnBackpressureLimit?: boolean; // 默认值: false
      sendPings?: boolean; // 默认值: true
      publishToSelf?: boolean; // 默认值: false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  readonly subscriptions: string[];
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}