WebSockets
Real-time updates via WebSocket connections.
Overview
DataFn uses WebSocket connections to notify clients of new data in real time. When a mutation is pushed to the server, all connected clients in the same namespace receive a cursor update, prompting them to pull the latest changes.
Configuration
Configure WebSocket behavior through the ws option in the server config:
const server = await createDatafnServer({
schema,
db,
ws: {
maxConnections: 10_000,
maxConnectionsPerNamespace: 100,
heartbeatIntervalMs: 30_000,
heartbeatTimeoutMs: 10_000,
},
});| Option | Default | Description |
|---|---|---|
maxConnections | 10,000 | Maximum total simultaneous WebSocket connections. |
maxConnectionsPerNamespace | 100 | Maximum connections per namespace. |
heartbeatIntervalMs | 30,000 | Interval between ping frames (ms). |
heartbeatTimeoutMs | 10,000 | Maximum time to wait for pong before closing the connection (ms). |
Connection Setup
WebSocket connections require authentication at connection time. The server provides a websocketHandler on the server instance:
const server = await createDatafnServer({ schema, db });
// In your WebSocket upgrade handler:
wss.on("connection", (ws, req) => {
// Authenticate the connection (your logic)
const authContext = authenticate(req);
if (!authContext) {
ws.close(4401, "Unauthorized");
return;
}
// Derive namespace from auth context
const namespace = authContext.tenantId
? `tenant:${authContext.tenantId}:user:${authContext.userId}`
: `user:${authContext.userId}`;
// Register the client
const accepted = server.websocketHandler.addClient(ws, { namespace });
if (!accepted) {
// Connection limit exceeded -- client was closed with code 4503
return;
}
ws.on("message", (data) => {
server.websocketHandler.handleMessage(ws, data.toString());
});
ws.on("pong", () => {
server.websocketHandler.handlePong(ws);
});
ws.on("close", () => {
server.websocketHandler.removeClient(ws);
});
});Key Design Decisions
- Namespace is server-derived. The namespace is determined from the
WsAuthContextpassed toaddClient, never from client messages. Client-supplied namespace fields inhellomessages are ignored. - Authentication before registration. Unauthenticated connections must be rejected with close code 4401 before calling
addClient.
Connection Limits
When connection limits are exceeded, addClient returns false and closes the WebSocket with code 4503:
- Global limit: If total connections reach
maxConnections, the new connection is rejected. - Per-namespace limit: If connections in the client's namespace reach
maxConnectionsPerNamespace, the new connection is rejected.
Heartbeat
The WebSocket manager uses native ping/pong frames to detect dead connections:
- Every
heartbeatIntervalMs, the server sends a ping frame to each connected client. - If a client does not respond with a pong within
heartbeatTimeoutMs, the connection is closed with code 1001. - When your transport layer receives a pong frame, call
server.websocketHandler.handlePong(client)to clear the pending timeout.
The heartbeat timer starts lazily when the first client connects, and does not keep the Node.js process alive (the timer is unrefed).
Message Types
hello (client to server)
Sent by the client after connection to register itself:
{ "type": "hello", "clientId": "client_abc", "cursor": "42" }The server acknowledges the registration internally. Any namespace field in the message is ignored -- the namespace is locked at connection time.
cursor (server to client)
Broadcast to all clients in a namespace after a successful push:
{ "type": "cursor", "cursor": "43" }Clients should pull changes when they receive a cursor update that is ahead of their local cursor.
Namespace-Scoped Broadcast
Cursor notifications are scoped to the client's namespace. The server maintains a Map<namespace, Set<client>> structure for efficient broadcasting. Only clients in the same namespace receive the notification, with O(clients-in-namespace) performance.
Graceful Shutdown
When server.close() is called:
- The heartbeat timer is stopped.
- All connected clients receive close frame 1001 (Going Away).
- All internal client tracking structures are cleared.