WebSockets

Real-time updates via WebSocket connections.

Overview

DataFn uses WebSocket connections to notify clients of new data in real time. When a mutation is pushed to the server, all connected clients in the same namespace receive a cursor update, prompting them to pull the latest changes.

Configuration

Configure WebSocket behavior through the ws option in the server config:

const server = await createDatafnServer({
  schema,
  db,
  ws: {
    maxConnections: 10_000,
    maxConnectionsPerNamespace: 100,
    heartbeatIntervalMs: 30_000,
    heartbeatTimeoutMs: 10_000,
  },
});

Option                      Default  Description
maxConnections              10,000   Maximum total simultaneous WebSocket connections.
maxConnectionsPerNamespace  100      Maximum connections per namespace.
heartbeatIntervalMs         30,000   Interval between ping frames (ms).
heartbeatTimeoutMs          10,000   Maximum time to wait for a pong before closing the connection (ms).

Connection Setup

WebSocket connections must be authenticated at connection time. The server instance exposes a websocketHandler for registering and managing clients:

const server = await createDatafnServer({ schema, db });

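// In your WebSocket upgrade handler.
// Assumed here: `wss` is a WebSocketServer from the `ws` package and
// `authenticate` is your own function returning an auth context or null.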
wss.on("connection", (ws, req) => {
  // Authenticate the connection (your logic)
  const authContext = authenticate(req);
  if (!authContext) {
    ws.close(4401, "Unauthorized");
    return;
  }

  // Derive namespace from auth context
  const namespace = authContext.tenantId
    ? `tenant:${authContext.tenantId}:user:${authContext.userId}`
    : `user:${authContext.userId}`;

  // Register the client
  const accepted = server.websocketHandler.addClient(ws, { namespace });
  if (!accepted) {
    // Connection limit exceeded -- client was closed with code 4503
    return;
  }

  ws.on("message", (data) => {
    server.websocketHandler.handleMessage(ws, data.toString());
  });

  ws.on("pong", () => {
    server.websocketHandler.handlePong(ws);
  });

  ws.on("close", () => {
    server.websocketHandler.removeClient(ws);
  });
});

Key Design Decisions

  • Namespace is server-derived. The namespace is taken from the WsAuthContext (the { namespace } object in the example above) passed to addClient, never from client messages. Client-supplied namespace fields in hello messages are ignored.
  • Authentication before registration. Unauthenticated connections must be rejected with close code 4401 before calling addClient.

Connection Limits

When connection limits are exceeded, addClient returns false and closes the WebSocket with code 4503:

  • Global limit: If total connections reach maxConnections, the new connection is rejected.
  • Per-namespace limit: If connections in the client's namespace reach maxConnectionsPerNamespace, the new connection is rejected.
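
The two limits can be pictured as a single admission check. The sketch below is illustrative only and is not DataFn's internal code: admit, Limits, and the per-namespace counts map are hypothetical names, and checking the global limit before the per-namespace limit is an assumption.

```typescript
// Illustrative sketch only; these names are hypothetical, not DataFn's API.
interface Limits {
  maxConnections: number;
  maxConnectionsPerNamespace: number;
}

type Rejection = "global-limit" | "namespace-limit";

// Returns null when the connection may be accepted, otherwise the reason it
// would be rejected (the server would then close the socket with code 4503).
function admit(
  counts: Map<string, number>, // current connection count per namespace
  namespace: string,
  limits: Limits
): Rejection | null {
  // Global limit: sum the connections across all namespaces.
  let total = 0;
  counts.forEach((n) => {
    total += n;
  });
  if (total >= limits.maxConnections) return "global-limit";

  // Per-namespace limit: look only at the target namespace.
  const inNamespace = counts.get(namespace) ?? 0;
  if (inNamespace >= limits.maxConnectionsPerNamespace) return "namespace-limit";

  return null;
}
```

A limit counts as exceeded as soon as the current count has reached it, which matches the "reach" wording in the bullets above.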

Heartbeat

The WebSocket manager uses native ping/pong frames to detect dead connections:

  1. Every heartbeatIntervalMs, the server sends a ping frame to each connected client.
  2. If a client does not respond with a pong within heartbeatTimeoutMs, the connection is closed with code 1001.
  3. When your transport layer receives a pong frame, call server.websocketHandler.handlePong(client) to clear the pending timeout.

The heartbeat timer starts lazily when the first client connects and does not keep the Node.js process alive (the timer is unref'd).
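
The three steps above can be sketched as a small tracker. This is an illustration under stated assumptions, not DataFn's implementation: HeartbeatSketch and PingableClient are hypothetical names, and the sketch takes the current time as an argument instead of using real timers so that its behavior is easy to follow.

```typescript
// Illustrative sketch only; not DataFn's internal implementation.
interface PingableClient {
  ping(): void;
  close(code: number, reason: string): void;
}

class HeartbeatSketch {
  private timeoutMs: number;
  // Time at which each client's unanswered ping was sent.
  private pendingSince = new Map<PingableClient, number>();

  constructor(timeoutMs: number) {
    this.timeoutMs = timeoutMs;
  }

  // Step 1: run every heartbeatIntervalMs to ping each connected client.
  sendPings(clients: PingableClient[], now: number): void {
    clients.forEach((client) => {
      if (!this.pendingSince.has(client)) this.pendingSince.set(client, now);
      client.ping();
    });
  }

  // Step 3: a pong clears the pending timeout for that client.
  handlePong(client: PingableClient): void {
    this.pendingSince.delete(client);
  }

  // Step 2: close clients whose ping has gone unanswered for timeoutMs.
  closeExpired(now: number): PingableClient[] {
    const closed: PingableClient[] = [];
    this.pendingSince.forEach((sentAt, client) => {
      if (now - sentAt >= this.timeoutMs) {
        client.close(1001, "Heartbeat timeout");
        closed.push(client);
      }
    });
    closed.forEach((client) => this.pendingSince.delete(client));
    return closed;
  }
}
```

Under this model, and with the defaults above, a client that stops responding just after answering a ping would be closed roughly heartbeatIntervalMs + heartbeatTimeoutMs (about 40 seconds) later.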

Message Types

hello (client to server)

Sent by the client after connection to register itself:

{ "type": "hello", "clientId": "client_abc", "cursor": "42" }

The server acknowledges the registration internally. Any namespace field in the message is ignored; the namespace is locked at connection time.

cursor (server to client)

Broadcast to all clients in a namespace after a successful push:

{ "type": "cursor", "cursor": "43" }

Clients should pull changes when they receive a cursor update that is ahead of their local cursor.
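
On the client side, the two message shapes above could be handled along these lines. This is a minimal sketch, not the DataFn client: buildHello, parseCursorMessage, isAhead, and shouldPull are hypothetical helpers, and the comparison assumes cursors are non-negative decimal integer strings without leading zeros (as in the examples "42" and "43"), which the source does not state.

```typescript
// Message shapes taken from the JSON examples above.
interface HelloMessage {
  type: "hello";
  clientId: string;
  cursor: string;
}

interface CursorMessage {
  type: "cursor";
  cursor: string;
}

// Serialize the hello message a client sends after connecting.
function buildHello(clientId: string, cursor: string): string {
  const msg: HelloMessage = { type: "hello", clientId, cursor };
  return JSON.stringify(msg);
}

// Parse a raw frame; returns null for anything that is not a cursor message.
function parseCursorMessage(raw: string): CursorMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== "object" || value === null) return null;
  const msg = value as { type?: unknown; cursor?: unknown };
  if (msg.type !== "cursor" || typeof msg.cursor !== "string") return null;
  return { type: "cursor", cursor: msg.cursor };
}

// ASSUMPTION: cursors are non-negative decimal strings without leading
// zeros, so a longer string is larger and equal lengths compare lexically.
function isAhead(remote: string, local: string): boolean {
  if (remote.length !== local.length) return remote.length > local.length;
  return remote > local;
}

// True when the frame is a cursor update ahead of the local cursor.
function shouldPull(raw: string, localCursor: string): boolean {
  const msg = parseCursorMessage(raw);
  return msg !== null && isAhead(msg.cursor, localCursor);
}
```

A client would call shouldPull from its message handler and start a pull when it returns true. Comparing the cursors as strings avoids precision loss if they ever exceed the safe integer range.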

Namespace-Scoped Broadcast

Cursor notifications are scoped to the client's namespace. The server maintains a Map<namespace, Set<client>> structure, so a broadcast looks up one namespace and sends only to the clients in it. The cost is O(clients-in-namespace) rather than O(all connected clients).
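
The structure described above might look like the following. This is a sketch for illustration, not the server's actual code: NamespaceRegistrySketch and SendableClient are hypothetical names, and removing empty namespace entries is an assumption.

```typescript
// Illustrative sketch only; not DataFn's internal implementation.
interface SendableClient {
  send(data: string): void;
}

class NamespaceRegistrySketch {
  private byNamespace = new Map<string, Set<SendableClient>>();

  add(namespace: string, client: SendableClient): void {
    let set = this.byNamespace.get(namespace);
    if (!set) {
      set = new Set<SendableClient>();
      this.byNamespace.set(namespace, set);
    }
    set.add(client);
  }

  remove(namespace: string, client: SendableClient): void {
    const set = this.byNamespace.get(namespace);
    if (!set) return;
    set.delete(client);
    // Drop empty sets so the map does not grow without bound (assumption).
    if (set.size === 0) this.byNamespace.delete(namespace);
  }

  // Sends a cursor message to every client in one namespace and returns
  // how many clients were notified. Other namespaces are never visited.
  broadcastCursor(namespace: string, cursor: string): number {
    const set = this.byNamespace.get(namespace);
    if (!set) return 0;
    const payload = JSON.stringify({ type: "cursor", cursor });
    set.forEach((client) => client.send(payload));
    return set.size;
  }
}
```

The payload is serialized once per broadcast and reused for every recipient.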

Graceful Shutdown

When server.close() is called:

  1. The heartbeat timer is stopped.
  2. All connected clients receive a close frame with code 1001 (Going Away).
  3. All internal client tracking structures are cleared.
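
The shutdown sequence above can be sketched as follows. This mirrors the three documented steps only; ManagerState, ClosableClient, and shutdownSketch are hypothetical names, not part of DataFn.

```typescript
// Illustrative sketch only; not DataFn's internal implementation.
interface ClosableClient {
  close(code: number, reason: string): void;
}

interface ManagerState {
  heartbeatTimer: ReturnType<typeof setInterval> | null;
  clients: Set<ClosableClient>;
}

function shutdownSketch(state: ManagerState): void {
  // 1. Stop the heartbeat timer.
  if (state.heartbeatTimer !== null) {
    clearInterval(state.heartbeatTimer);
    state.heartbeatTimer = null;
  }
  // 2. Tell every connected client that the server is going away.
  state.clients.forEach((client) => client.close(1001, "Going Away"));
  // 3. Clear the tracking structures.
  state.clients.clear();
}
```

In an application, server.close() would typically be called from a process signal handler such as SIGTERM so that clients see a clean 1001 close and can reconnect elsewhere.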