Dec 12, 2024
Backpressure in WebSockets using Web Streams
WebSockets allow bidirectional communication between two processes over a TCP connection. Since its standardisation in 2011 (RFC 6455), it has become a well-established protocol.
The web platform offers the WebSocket API to interact with WebSockets through a simple interface: just attach an `onmessage` handler, and you're good to go!
```typescript
const ws = new WebSocket("wss://...");
ws.onmessage = (messageEvent) => {
  const data = JSON.parse(messageEvent.data);
  // ...
};
```
Unfortunately, this API does not support backpressure. A chatty WebSocket connection can overwhelm your application with messages if your handler is slow. This is a well-known problem, and there are at least two proposals to fix it:
- WebSocketStream: a Web Streams-based API for WebSockets that handles backpressure automatically. Unfortunately, it is non-standard and currently only available in Chrome.
- WebTransport: a modern replacement for the WebSocket API. It is standardised, but unfortunately not yet available in Safari.
At the time of writing, neither of these solutions is available widely enough to use in production. Fortunately, we can build our own version of the first proposal and use the Web Streams API to handle backpressure!
Web Streams
The Streams API is all about processing small chunks of data in a pipeline.
- A `ReadableStream` reads data from an input source in small chunks.
- A `TransformStream` can optionally transform each chunk from one format to another.
- Finally, a `WritableStream` writes each chunk to a destination sink.
There can be any number of `TransformStream`s in the middle of a pipeline, but only one readable stream at the start and only one writable stream at the end.
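A minimal sketch of such a pipeline, assuming a runtime with the global Web Streams classes (modern browsers, or Node 18+): an in-memory source of numbers, one `TransformStream` that doubles each chunk, and a sink that collects the results. The function name `runPipeline` is only for illustration.

```typescript
// Source -> transform -> sink, wired together with pipeThrough/pipeTo.
async function runPipeline(input: number[]): Promise<number[]> {
  const output: number[] = [];

  // Source: emits each input number as a separate chunk, then closes.
  const source = new ReadableStream<number>({
    start(controller) {
      for (const n of input) controller.enqueue(n);
      controller.close();
    },
  });

  // Transform: doubles every chunk that passes through.
  const double = new TransformStream<number, number>({
    transform(chunk, controller) {
      controller.enqueue(chunk * 2);
    },
  });

  // Sink: collects each chunk into an array.
  const sink = new WritableStream<number>({
    write(chunk) {
      output.push(chunk);
    },
  });

  // pipeThrough returns the transform's readable side; pipeTo resolves
  // once the source has closed and every chunk has been written.
  await source.pipeThrough(double).pipeTo(sink);
  return output;
}
```

`runPipeline([1, 2, 3])` resolves to `[2, 4, 6]`. Further transforms can be chained with additional `pipeThrough` calls before the final `pipeTo`.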
Backpressure
If a source produces data faster than the destination can receive it, it causes backpressure in the pipeline. This pressure must be handled correctly, or the application's memory usage can grow uncontrollably, eventually causing it to lock up or crash.
To deal with backpressure, you can follow one of three strategies:
- Signal the source to slow down. This works well when you control how fast data is read from the source, as with filesystems or HTTP responses.
- Queue unprocessed messages for the stream to consume later. The queue must be bounded, though, so only a limited number of messages can be kept waiting.
- Drop messages. This may not work for every application, but it is a perfectly valid strategy.
Web streams support all three strategies and use an internal queue to buffer unprocessed data. The size of this queue is configurable through the `highWaterMark` property of the stream's queuing strategy.
Thankfully, web streams have built-in support for handling backpressure, and there is an excellent MDN guide on how to do so.
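To see `highWaterMark` in action, here is a small sketch (the helper name `pullsBeforeAndAfterRead` is hypothetical) that counts how often a stream calls `pull()` while nobody reads from it, and again after a single read. `CountQueuingStrategy` measures the queue in chunks.

```typescript
// Counts pull() calls before any read, and after reading one chunk.
async function pullsBeforeAndAfterRead(
  highWaterMark: number
): Promise<[number, number]> {
  let pulls = 0;
  const stream = new ReadableStream<number>(
    {
      // With no pending reads, the stream calls pull() only while its queue
      // holds fewer than highWaterMark chunks (controller.desiredSize > 0).
      pull(controller) {
        controller.enqueue(pulls++);
      },
    },
    new CountQueuingStrategy({ highWaterMark })
  );
  const tick = () =>
    new Promise<void>((resolve) => setTimeout(() => resolve(), 10));

  await tick(); // nobody is reading: the queue fills up to highWaterMark
  const before = pulls;

  const reader = stream.getReader();
  await reader.read(); // consuming one chunk frees one slot in the queue...
  await tick(); // ...so the stream pulls exactly one more chunk
  const after = pulls;

  reader.releaseLock();
  return [before, after];
}
```

`pullsBeforeAndAfterRead(3)` resolves to `[3, 4]`: the stream fills its queue up to the high-water mark on its own, then pulls one more chunk to replace the one that was read.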
Web Sockets with Streams
In a web socket connection, you cannot control the speed at which a server sends messages. Therefore, you have no choice but to drop them if the stream is backed up. When the stream is free to process data again, you can resume receiving messages.
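The dropping strategy can be sketched in isolation with a push-based source that checks `controller.desiredSize` (the high-water mark minus the size of the queue) before enqueuing. This is a hypothetical alternative for illustration, not the pull-based approach this post builds. `dropWhenFull` and its `subscribe` callback are made-up names standing in for something like a socket's message handler.

```typescript
// Hypothetical helper: wraps a push-based callback source in a ReadableStream
// that drops items whenever the stream's internal queue is full.
function dropWhenFull<T>(
  subscribe: (push: (item: T) => void) => void,
  highWaterMark: number
): { stream: ReadableStream<T>; dropped: () => number } {
  let dropped = 0;
  const stream = new ReadableStream<T>(
    {
      // start() runs synchronously inside the constructor.
      start(controller) {
        subscribe((item) => {
          // desiredSize <= 0 means the queue is at or above highWaterMark
          // (null would mean the stream errored; treat that as full too).
          if ((controller.desiredSize ?? 0) > 0) {
            controller.enqueue(item);
          } else {
            dropped++;
          }
        });
      },
    },
    new CountQueuingStrategy({ highWaterMark })
  );
  return { stream, dropped: () => dropped };
}
```

Emitting five items into such a stream with a `highWaterMark` of 2 and no reader queues the first two items and drops the other three.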
You can implement this behaviour by creating a pull-based `ReadableStream` from a web socket.
```typescript
function streamWs(ws: WebSocket): ReadableStream<MessageEvent> {
  return new ReadableStream<MessageEvent>({
    // optional, and we don't need to do anything here
    start: () => {},
    // Called whenever the stream's internal queue has room for more data
    pull: async (controller) => {
      const message = await nextMsg(ws);
      controller.enqueue(message);
    },
  });
}

// Resolves with the next message from the socket. Messages that arrive while
// no pull is pending hit an already-settled promise and are dropped.
function nextMsg(ws: WebSocket): Promise<MessageEvent> {
  return new Promise<MessageEvent>((resolve, reject) => {
    ws.onmessage = resolve;
    ws.onerror = reject;
  });
}
```
Whenever the stream can accept more messages, it calls `pull` to fetch data from the source. This implementation awaits the next message from the socket and enqueues it into the stream.
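The stream will continue pulling data until its internal queue is full, which happens when:
- the consumer reads messages more slowly than they arrive, or
- the consumer stops reading altogether (for example, because a downstream `WritableStream` is applying backpressure).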
The size of the internal queue can be configured by supplying a queuing strategy:
```typescript
function streamWs(ws: WebSocket): ReadableStream<MessageEvent> {
  return new ReadableStream<MessageEvent>(
    {
      start: () => {},
      pull: (controller) => { ... },
    },
    // Queue up to 100 messages; pull() is not called while the queue is full
    new CountQueuingStrategy({ highWaterMark: 100 })
  );
}
```
A stream should also propagate cancellation signals to its source, so you can close the web socket when the stream is cancelled.
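```typescript
function streamWs(ws: WebSocket): ReadableStream<MessageEvent> {
  return new ReadableStream<MessageEvent>(
    {
      start: () => {},
      pull: (controller) => { ... },
      // Called when the consumer cancels the stream (e.g. reader.cancel() or
      // an aborted pipeTo); close the socket so the server stops sending.
      cancel: () => {
        ws.close();
      },
    },
    new CountQueuingStrategy({ highWaterMark: 100 })
  );
}
```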
And voilà! We now have a `ReadableStream` backed by a web socket that supports backpressure.
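The drop-on-backpressure behaviour can be checked without a server. The sketch below mirrors `nextMsg` and `streamWs` (including the `cancel` hook) but runs against a hypothetical `FakeSocket` class standing in for `WebSocket`: it exposes only the handlers the stream uses, plus an `emit()` method to simulate incoming messages. The queue is set to 2 messages so the effect is easy to see.

```typescript
// Hypothetical stand-in for a WebSocket: just the handlers used by the stream,
// plus emit() to simulate a server message and a flag recording close().
class FakeSocket {
  onmessage: ((ev: { data: string }) => void) | null = null;
  onerror: ((ev: unknown) => void) | null = null;
  closed = false;
  emit(data: string): void {
    this.onmessage?.({ data });
  }
  close(): void {
    this.closed = true;
  }
}

// Same shape as nextMsg/streamWs, but typed against FakeSocket.
function nextFakeMsg(ws: FakeSocket): Promise<{ data: string }> {
  return new Promise((resolve, reject) => {
    ws.onmessage = resolve;
    ws.onerror = reject;
  });
}

function streamFake(ws: FakeSocket, highWaterMark: number): ReadableStream<string> {
  return new ReadableStream<string>(
    {
      pull: async (controller) => {
        const message = await nextFakeMsg(ws);
        controller.enqueue(message.data);
      },
      cancel: () => ws.close(),
    },
    new CountQueuingStrategy({ highWaterMark })
  );
}

const tick = () => new Promise<void>((resolve) => setTimeout(() => resolve(), 0));

async function demo(): Promise<{ received: string[]; closed: boolean }> {
  const ws = new FakeSocket();
  const stream = streamFake(ws, 2);
  await tick(); // let the first pull() install its onmessage handler

  // Nobody is reading yet: "a" and "b" fill the queue, "c" and "d" are dropped.
  for (const m of ["a", "b", "c", "d"]) {
    ws.emit(m);
    await tick();
  }

  const reader = stream.getReader();
  const received: string[] = [];
  received.push((await reader.read()).value!); // "a"
  received.push((await reader.read()).value!); // "b"
  ws.emit("e"); // reading freed up space, so a pull() is waiting again
  received.push((await reader.read()).value!); // "e"

  await reader.cancel(); // propagates to cancel(), which closes the socket
  return { received, closed: ws.closed };
}
```

`demo()` resolves to `{ received: ["a", "b", "e"], closed: true }`: "c" and "d" arrive while the queue is full and no `pull()` is pending, so they are lost, and cancelling the reader closes the socket.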
Usage
For my open source project ATProto Browser, I recently built a live feed of Bluesky posts on the home page based on the Bluesky Jetstream.
Jetstream gives you the ability to listen to Bluesky’s firehose for realtime updates on the network over a web socket connection.
I used the implementation described above to subscribe to Jetstream updates in a React component. Here is a simplified implementation.
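```typescript
import { useEffect, useState, useCallback } from "react";
import type { AppBskyFeedPost } from "@atproto/api";

// BUFFER_SIZE, streamWs and collect are defined elsewhere in the project.
function useJetstream() {
  // Buffered posts in memory
  const [posts, setPosts] = useState<AppBskyFeedPost.Record[]>([]);

  const onNewPost = useCallback((post: AppBskyFeedPost.Record) => {
    // Add the new post to the front of the buffer and remove the oldest post
    // from the back. The functional update avoids a stale `posts` closure.
    setPosts((prev) => [post, ...prev.slice(0, BUFFER_SIZE - 1)]);
  }, []);

  useEffect(() => {
    const controller = new AbortController();
    const ws = new WebSocket("wss://jetstream1.us-east.bsky.network");
    streamWs(ws)
      .pipeTo(collect(onNewPost), { signal: controller.signal })
      // pipeTo rejects when the signal aborts (or the pipe errors);
      // this simplified version ignores both.
      .catch(() => {});
    // Cancel the stream when the component unmounts
    return () => controller.abort();
  }, [onNewPost]);

  return { posts };
}
```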
You can find the actual implementation here.
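The hook pipes into a `collect(onNewPost)` sink that is not shown above. A hypothetical minimal version wraps a callback in a `WritableStream`. If the callback returns a promise, `write()` waits for it, so a slow consumer applies backpressure all the way back to the web socket stream. (In the simplified hook, the conversion from `MessageEvent` to post is elided.)

```typescript
// Hypothetical sink: hands each chunk to a callback, one at a time.
function collect<T>(onChunk: (chunk: T) => void | Promise<void>): WritableStream<T> {
  return new WritableStream<T>(
    {
      // write() is not called again until the previous call settles, so
      // awaiting the callback keeps a slow consumer from being flooded.
      async write(chunk) {
        await onChunk(chunk);
      },
    },
    // With a high-water mark of 1, a pipe stops reading from its source
    // while a write is still in progress.
    new CountQueuingStrategy({ highWaterMark: 1 })
  );
}
```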
Notes
- As mentioned earlier, this implementation drops messages emitted from the web socket whenever the stream is backed up. This is a simple solution, but it may not work for all applications.
- In the future, you will probably want to use the WebTransport API instead of WebSockets, since it supposedly has better backpressure support.
- The implementation above is a simplified example. The actual implementation in ATProto Browser uses a `TransformStream` to sample incoming updates, and a `TransformStream` to convert WebSocket messages to Bluesky posts. It also pauses the stream when the user is not looking at the feed, and resumes it when the feed is in focus.
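The post does not say how the sampling stream works. As a hypothetical sketch, assuming "sampling" means forwarding at most one update per time interval and dropping the rest, such a `TransformStream` could look like this (`sampleEvery` and its injectable clock are illustrative names):

```typescript
// Hypothetical sampler: forwards at most one chunk per `intervalMs`
// and silently drops the chunks in between.
function sampleEvery<T>(
  intervalMs: number,
  now: () => number = Date.now
): TransformStream<T, T> {
  let last = -Infinity;
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      const t = now();
      if (t - last >= intervalMs) {
        last = t;
        controller.enqueue(chunk);
      }
      // Otherwise the chunk is dropped.
    },
  });
}
```

It would sit between the web socket stream and the sink, e.g. `streamWs(ws).pipeThrough(sampleEvery(1000)).pipeTo(...)`.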