Server Setup

Create and configure a DataFn server.

Creating a Server

Use createDatafnServer() to initialize a DataFn server instance. The function validates your schema at startup, initializes the database adapter, and returns a server object whose router exposes all DataFn endpoints.

import { createDatafnServer } from "@datafn/server";
import { createDrizzleAdapter } from "@superfunctions/db/drizzle";
import { schema } from "./schema";

const server = await createDatafnServer({
  schema,
  db: createDrizzleAdapter(drizzle), // drizzle: your existing Drizzle ORM database instance
  plugins: [],
  debug: process.env.NODE_ENV !== "production",
  rest: true,
  limits: {
    maxLimit: 100,
    maxPayloadBytes: 5_242_880,
    maxTransactSteps: 100,
    maxPullLimit: 1000,
    maxSelectTokens: 50,
    maxFilterKeysPerLevel: 20,
    maxSortFields: 10,
    maxAggregations: 20,
    maxIdLength: 255,
  },
  authorize: async (ctx, action, payload) => {
    return ctx.session?.authenticated === true;
  },
  authContextProvider: {
    getContext: (ctx) => ({
      userId: ctx.session.userId,
      tenantId: ctx.session.tenantId,
    }),
  },
  retention: {
    changeLogDays: 30,
    idempotencyDays: 7,
    pruneOnStartup: true,
    pruneIntervalMs: 3_600_000,
  },
  rateLimit: {
    enabled: true,
    maxRequests: 100,
    windowSeconds: 60,
    endpoints: {
      mutation: { maxRequests: 50, windowSeconds: 60 },
    },
  },
  observability: {
    timing: true,
    onTiming: (event) => console.log(event),
  },
  ws: {
    maxConnections: 10_000,
    maxConnectionsPerNamespace: 100,
    heartbeatIntervalMs: 30_000,
    heartbeatTimeoutMs: 10_000,
  },
  shutdownTimeoutMs: 10_000,
  logger: customLogger, // customLogger: your own DatafnLogger implementation
});

Configuration Reference

Required

| Option | Type | Description |
| --- | --- | --- |
| schema | DatafnSchema | The DataFn schema definition. Validated at startup; throws if invalid. |

Database and Storage

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| db | Adapter | undefined | Database adapter for persistence. |
| redis | RedisAdapter | undefined | Redis adapter for atomic operations (serverSeq, rate limiting). |
| kvStore | KVStoreAdapter | undefined | KV store adapter. Must support incr() for serverSeq. |
| dbMapping | DbMapping | undefined | Maps features to specific databases (e.g., { serverseq: "redis" }). |
| rowLevelNamespace | false \| RowLevelNamespaceConfig | auto | Row-level namespace isolation. Auto-enabled when authContextProvider is present. Set to false to disable. |

Authorization and Security

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| authorize | (ctx, action, payload) => boolean | undefined | Per-request authorization callback. Actions: "status", "query", "mutation", "transact", "seed", "clone", "pull", "push", "reconcile". |
| authContextProvider | { getContext: (ctx) => AuthContext } | undefined | Extracts user/tenant context for namespace isolation. |
| allowUnknownResources | boolean | false | Allow access to resources without a permissions policy. Use only in development. |
| debug | boolean | NODE_ENV !== 'production' | When false, error messages are generic (no field names or schema details). |
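
As an illustration of the authorize callback, the sketch below lets any authenticated session perform read-style actions and restricts everything else to an admin role. Only the callback signature and the action names come from the table above; the Session shape, the role field, and the choice of which actions count as reads are assumptions made for this example.

```typescript
// Action names as listed in the reference table above.
type Action =
  | "status" | "query" | "mutation" | "transact" | "seed"
  | "clone" | "pull" | "push" | "reconcile";

// Hypothetical session shape; DataFn does not prescribe one.
interface Session {
  authenticated: boolean;
  role?: "admin" | "member";
}

interface RequestContext {
  session?: Session;
}

// Assumption for this example: these actions only read data.
const READ_ACTIONS: ReadonlySet<Action> = new Set<Action>(["status", "query", "pull"]);

function authorize(ctx: RequestContext, action: Action, _payload: unknown): boolean {
  const session = ctx.session;
  // Reject anything without an authenticated session.
  if (!session || session.authenticated !== true) return false;
  // Reads are open to every authenticated user.
  if (READ_ACTIONS.has(action)) return true;
  // Every other action requires the (hypothetical) admin role.
  return session.role === "admin";
}
```

A function like this would be passed as the authorize option. Returning false for a missing session keeps the default closed.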

Limits

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| limits.maxLimit | number | 100 | Maximum query result limit. |
| limits.maxTransactSteps | number | 100 | Maximum steps in a transaction. |
| limits.maxPayloadBytes | number | 5,242,880 | Maximum request body size (5 MB). |
| limits.maxPullLimit | number | 1,000 | Maximum changes returned per pull. |
| limits.maxSelectTokens | number | 50 | Maximum select tokens per query. |
| limits.maxFilterKeysPerLevel | number | 20 | Maximum filter keys per nesting level. |
| limits.maxSortFields | number | 10 | Maximum sort fields per query. |
| limits.maxAggregations | number | 20 | Maximum aggregation definitions per query. |
| limits.maxIdLength | number | 255 | Maximum character length for record IDs. |

Features

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| plugins | DatafnPlugin[] | [] | Server-side plugins (e.g., searchfn, softDelete). |
| rest | boolean | false | Enable REST endpoint wrappers. |
| getServerTime | () => number | Date.now | Server time provider. Useful for testing. |
| retention | RetentionConfig | undefined | Data retention for change log and idempotency tables. |
| rateLimit | RateLimitConfig | undefined | Rate limiting configuration. |
| observability | ObservabilityConfig | undefined | Timing and observability hooks. |
| ws | WsManagerConfig | undefined | WebSocket connection limits and heartbeat settings. |
| shutdownTimeoutMs | number | 10,000 | Graceful shutdown drain timeout in milliseconds. |
| logger | DatafnLogger | console-based | Pluggable logger. JSON output in production, pretty-print in dev. |
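
Because getServerTime is a plain function returning a number, a test can pass a controllable clock in its place. The following is a minimal sketch: createTestClock and its advance method are hypothetical helpers, not part of DataFn, and the millisecond unit is assumed from the Date.now default.

```typescript
// Hypothetical test helper: a clock that only moves when told to.
interface TestClock {
  now: () => number;             // pass this as getServerTime
  advance: (ms: number) => void; // move the clock forward
}

function createTestClock(startMs: number): TestClock {
  let current = startMs;
  return {
    now: () => current,
    advance: (ms) => {
      if (ms < 0) throw new RangeError("clock cannot move backwards");
      current += ms;
    },
  };
}

const clock = createTestClock(1_700_000_000_000);
// In a test: createDatafnServer({ schema, getServerTime: clock.now, ... })
clock.advance(60_000); // simulate one minute passing
```

A deterministic clock of this kind makes time-dependent behavior, such as retention windows, reproducible in tests without waiting in real time.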

Server Instance

createDatafnServer() returns a DatafnServer object:

interface DatafnServer<TContext = any> {
  router: Router<TContext>;
  websocketHandler: {
    addClient(client: WebSocketClient, authContext: WsAuthContext): boolean;
    removeClient(client: WebSocketClient): void;
    handleMessage(client: WebSocketClient, data: string): void;
    handlePong(client: WebSocketClient): void;
  };
  close(): Promise<void>;
}

The router handles all HTTP routing. Integrate it with your HTTP server framework (Bun, Node, etc.).
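
The websocketHandler methods map naturally onto the lifecycle events of a WebSocket connection. The sketch below shows one possible wiring. The WebSocketClient and WsAuthContext shapes, the SocketCallbacks interface, and the attachClient helper are all assumptions made for illustration, as is the reading that addClient returns false when a connection is refused.

```typescript
// Stand-ins for DataFn's WebSocketClient and WsAuthContext (shapes assumed).
interface WebSocketClient {
  send(data: string): void;
  close(code?: number): void;
}
interface WsAuthContext {
  userId: string;
}

// The websocketHandler members shown in the DatafnServer interface above.
interface WebSocketHandler {
  addClient(client: WebSocketClient, authContext: WsAuthContext): boolean;
  removeClient(client: WebSocketClient): void;
  handleMessage(client: WebSocketClient, data: string): void;
  handlePong(client: WebSocketClient): void;
}

// Hypothetical hooks that your WebSocket library would invoke per connection.
interface SocketCallbacks {
  onMessage(data: string): void;
  onPong(): void;
  onClose(): void;
}

function attachClient(
  handler: WebSocketHandler,
  client: WebSocketClient,
  auth: WsAuthContext,
): SocketCallbacks | null {
  // Assumption: false means the connection was refused (e.g. a limit was hit).
  if (!handler.addClient(client, auth)) {
    client.close(1013); // 1013 = Try Again Later
    return null;
  }
  return {
    onMessage: (data) => handler.handleMessage(client, data),
    onPong: () => handler.handlePong(client),
    onClose: () => handler.removeClient(client),
  };
}
```

With a real server, handler would be server.websocketHandler, and the returned callbacks would be registered on whatever message, pong, and close events your WebSocket library exposes.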

Schema Validation

The schema is validated at startup using validateSchema(). If validation fails, createDatafnServer() throws synchronously with a descriptive error:

Schema validation failed: <details>

Built-in KV resources are automatically added to the schema after validation.
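
One way to surface a validation failure at startup is a try/catch around server creation. The sketch below uses a stand-in factory so that it runs on its own. startServer and isSchemaValidationError are hypothetical helpers, and matching on the message prefix is an assumption based on the error text shown above.

```typescript
// Assumption: validation failures carry the message prefix shown above.
const SCHEMA_ERROR_PREFIX = "Schema validation failed:";

function isSchemaValidationError(err: unknown): err is Error {
  return err instanceof Error && err.message.startsWith(SCHEMA_ERROR_PREFIX);
}

// `create` stands in for a call such as () => createDatafnServer({ schema, ... }).
async function startServer<T>(create: () => T | Promise<T>): Promise<T> {
  try {
    return await create();
  } catch (err) {
    if (isSchemaValidationError(err)) {
      // Fail fast: a bad schema is a deployment error, not a runtime condition.
      console.error(`Refusing to start. ${err.message}`);
    }
    // Rethrow so the process exits or the caller can decide what to do.
    throw err;
  }
}
```

Because create() is called inside the try block and its result is awaited, the catch clause runs whether the factory throws synchronously or returns a rejected promise.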

Graceful Shutdown

Call server.close() to initiate graceful shutdown. The sequence is:

  1. New requests are immediately rejected with HTTP 503 (SERVICE_UNAVAILABLE).
  2. All connected WebSocket clients receive close frame 1001 (Going Away).
  3. The server waits up to shutdownTimeoutMs for in-flight requests to complete.
  4. Periodic timers (pruning, rate limit cleanup) are cleared.

The server also registers SIGTERM and SIGINT handlers that trigger shutdown automatically.

// Manual shutdown
await server.close();

// Automatic via process signals
// SIGTERM and SIGINT are handled automatically
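
To make the drain behavior concrete, the following sketch models steps 1 and 3 of the sequence above: a gate that refuses new work once shutdown starts and waits for in-flight work up to a timeout. It illustrates the general pattern only and is not DataFn's implementation; DrainGate and its methods are hypothetical names.

```typescript
class DrainGate {
  private inFlight = 0;
  private closing = false;
  private onIdle: (() => void) | null = null;

  // Returns false once shutdown has begun; the caller would answer with 503.
  tryEnter(): boolean {
    if (this.closing) return false;
    this.inFlight += 1;
    return true;
  }

  // Call when a request that entered the gate has finished.
  leave(): void {
    this.inFlight -= 1;
    if (this.closing && this.inFlight === 0 && this.onIdle) this.onIdle();
  }

  // Stops admitting work. Resolves true if drained, false if the timeout hit.
  close(timeoutMs: number): Promise<boolean> {
    this.closing = true;
    if (this.inFlight === 0) return Promise.resolve(true);
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => resolve(false), timeoutMs);
      this.onIdle = () => {
        clearTimeout(timer);
        resolve(true);
      };
    });
  }
}
```

In this model, a shutdownTimeoutMs of 10_000 corresponds to calling gate.close(10_000): requests already past tryEnter() get up to ten seconds to call leave() before shutdown proceeds regardless.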