Server Setup

Create and configure a DataFn server.

DataFn supports both TypeScript (Node.js) and Python servers.

Installation

TypeScript (Node.js):

npm install @datafn/server @datafn/core

Python:

pip install datafn

Or in pyproject.toml:

[project]
dependencies = [
    "datafn",
]

Creating a Server

In TypeScript, use createDatafnServer() to initialize a DataFn server instance. The function validates your schema at startup, initializes the database adapter, and returns a server object whose router exposes all DataFn endpoints.

import { createDatafnServer } from "@datafn/server";
import { drizzleAdapter } from "@superfunctions/db/adapters";
import { ns } from "@datafn/core";
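import { schema } from "./schema";

// `drizzle` (a Drizzle database instance) and `customLogger` are placeholders
// assumed to be defined elsewhere in your application.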

const server = await createDatafnServer({
  schema,
  db: drizzleAdapter({ db: drizzle, dialect: "postgres" }),
  plugins: [],
  debug: process.env.NODE_ENV !== "production",
  rest: true,
  limits: {
    maxLimit: 100,
    maxPayloadBytes: 5_242_880,
    maxTransactSteps: 100,
    maxPullLimit: 1000,
    maxSelectTokens: 50,
    maxFilterKeysPerLevel: 20,
    maxSortFields: 10,
    maxAggregations: 20,
    maxIdLength: 255,
    maxBatchSize: 500,
    maxBatchQueryConcurrency: 20,
  },
  authorize: async (ctx, action, payload) => {
    return ctx.session?.authenticated === true;
  },
  namespaceProvider: {
    getNamespace: (ctx) => ns(ctx.session.orgId, ctx.session.userId),
    getActorId: (ctx) => ctx.session.userId,
  },
  retention: {
    changeLogDays: 30,
    idempotencyDays: 7,
    pruneOnStartup: true,
    pruneIntervalMs: 3_600_000,
  },
  rateLimit: {
    enabled: true,
    maxRequests: 100,
    windowSeconds: 60,
    endpoints: {
      mutation: { maxRequests: 50, windowSeconds: 60 },
    },
  },
  observability: {
    timing: true,
    onTiming: (event) => console.log(event),
  },
  ws: {
    maxConnections: 10_000,
    maxConnectionsPerNamespace: 100,
    heartbeatIntervalMs: 30_000,
    heartbeatTimeoutMs: 10_000,
  },
  shutdownTimeoutMs: 10_000,
  logger: customLogger,
});

In Python, use create_datafn_server() to initialize a DataFn server. It validates your schema and returns a dictionary of route handlers.

from datafn import create_datafn_server
from datafn.server import DatafnServerConfig

schema = {
    "resources": [
        {
            "name": "todos",
            "version": 1,
            "idPrefix": "td",
            "fields": [
                {"name": "title", "type": "string", "required": True},
                {"name": "completed", "type": "boolean", "required": True},
            ],
        }
    ]
}

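# my_db_adapter and my_authorize_fn are placeholders for your own database
# adapter and authorization callback.
config = DatafnServerConfig(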
    schema=schema,
    db=my_db_adapter,
    authorize=my_authorize_fn,
    limits={"maxLimit": 100},
)

server = create_datafn_server(config)

You can also pass configuration as a plain dictionary:

server = create_datafn_server({
    "schema": schema,
    "db": my_db_adapter,
    "authorize": my_authorize_fn,
    "limits": {"maxLimit": 50},
})

Python Configuration Options

| Parameter | Type | Description |
| --- | --- | --- |
| schema | dict | Required. DataFn schema definition with resources and optional relations. |
| db | Adapter | Database adapter implementing the adapter protocol. |
| authorize | callable | Async authorization callback (ctx, action, payload) -> bool. |
| limits | dict | Optional limits: maxLimit, maxTransactSteps, maxPayloadBytes. |

Configuration Reference

Required

| Option | Type | Description |
| --- | --- | --- |
| schema | DatafnSchema | The DataFn schema definition. Validated at startup; throws if invalid. |

Database and Storage

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| db | Adapter | undefined | Database adapter for persistence. |
| redis | RedisAdapter | undefined | Redis adapter for atomic operations (serverSeq, rate limiting). |
| kvStore | KVStoreAdapter | undefined | KV store adapter. Must support incr() for serverSeq. |
| dbMapping | DbMapping | undefined | Maps features to specific databases (e.g., { serverseq: "redis" }). |
| rowLevelNamespace | false \| RowLevelNamespaceConfig | auto | Row-level namespace isolation. Auto-enabled when namespaceProvider is present. Set to false to disable. |

Authorization and Security

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| authorize | (ctx, action, payload) => boolean | undefined | Per-request authorization callback. Actions: "status", "query", "mutation", "transact", "seed", "clone", "pull", "push", "reconcile". |
| namespaceProvider | { getNamespace: (ctx) => string; getActorId?: (ctx) => string } | undefined | Maps requests to a namespace string for data isolation. Optional getActorId for audit attribution. |
| allowUnknownResources | boolean | false | Allow access to resources without a permissions policy. Use only in development. |
| debug | boolean | NODE_ENV !== 'production' | When false, error messages are generic (no field names or schema details). |
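
As an illustration of the authorize callback, here is a minimal sketch of an action-based policy. The SessionContext shape, the role field, and the choice of which actions are admin-only are assumptions made for this example; only the action names come from the table above.

```typescript
// Hypothetical context shape; the real context depends on your auth layer.
interface SessionContext {
  session?: { authenticated: boolean; role?: "admin" | "member" };
}

// Action names as listed in the authorize option description.
type DatafnAction =
  | "status" | "query" | "mutation" | "transact" | "seed"
  | "clone" | "pull" | "push" | "reconcile";

// Assumption for this sketch: seeding and cloning are restricted to admins.
const ADMIN_ONLY_ACTIONS: ReadonlySet<DatafnAction> =
  new Set<DatafnAction>(["seed", "clone"]);

// Pure decision function, kept synchronous so it is easy to unit test.
function isAllowed(ctx: SessionContext, action: DatafnAction): boolean {
  const session = ctx.session;
  if (!session || session.authenticated !== true) return false;
  if (ADMIN_ONLY_ACTIONS.has(action)) return session.role === "admin";
  return true;
}

// Async wrapper with the (ctx, action, payload) parameter order used by authorize.
const authorize = async (
  ctx: SessionContext,
  action: DatafnAction,
  _payload: unknown,
): Promise<boolean> => isAllowed(ctx, action);
```

Keeping the decision in a pure function separates the policy from the callback wiring, so the policy can be tested without starting a server.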

Limits

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| limits.maxLimit | number | 100 | Maximum query result limit. |
| limits.maxTransactSteps | number | 100 | Maximum steps in a transaction. |
| limits.maxPayloadBytes | number | 5,242,880 | Maximum request body size (5 MB). |
| limits.maxPullLimit | number | 1,000 | Maximum changes returned per pull. |
| limits.maxSelectTokens | number | 50 | Maximum select tokens per query. |
| limits.maxFilterKeysPerLevel | number | 20 | Maximum filter keys per nesting level. |
| limits.maxSortFields | number | 10 | Maximum sort fields per query. |
| limits.maxAggregations | number | 20 | Maximum aggregation definitions per query. |
| limits.maxIdLength | number | 255 | Maximum character length for record IDs. |
| limits.maxBatchSize | number | 500 | Maximum batch size for /datafn/push and batch /datafn/mutation. |
| limits.maxBatchQueryConcurrency | number | 20 | Maximum in-flight query concurrency for batch /datafn/query. |
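
To make the defaults concrete, the sketch below shows one way partial overrides could be combined with the documented default values. The DatafnLimits interface and the resolveLimits helper are hypothetical names written for this example, not part of the DataFn API, and the server's own merging logic may differ.

```typescript
// Hypothetical type mirroring the documented limit options.
interface DatafnLimits {
  maxLimit: number;
  maxTransactSteps: number;
  maxPayloadBytes: number;
  maxPullLimit: number;
  maxSelectTokens: number;
  maxFilterKeysPerLevel: number;
  maxSortFields: number;
  maxAggregations: number;
  maxIdLength: number;
  maxBatchSize: number;
  maxBatchQueryConcurrency: number;
}

// Default values copied from the Limits table.
const DEFAULT_LIMITS: DatafnLimits = {
  maxLimit: 100,
  maxTransactSteps: 100,
  maxPayloadBytes: 5 * 1024 * 1024, // 5,242,880 bytes
  maxPullLimit: 1000,
  maxSelectTokens: 50,
  maxFilterKeysPerLevel: 20,
  maxSortFields: 10,
  maxAggregations: 20,
  maxIdLength: 255,
  maxBatchSize: 500,
  maxBatchQueryConcurrency: 20,
};

// Start from the defaults and apply only the overrides that are actually set.
function resolveLimits(overrides: Partial<DatafnLimits> = {}): DatafnLimits {
  const resolved: DatafnLimits = { ...DEFAULT_LIMITS };
  for (const key of Object.keys(DEFAULT_LIMITS) as (keyof DatafnLimits)[]) {
    const value = overrides[key];
    if (value !== undefined) resolved[key] = value;
  }
  return resolved;
}
```

With this helper, resolveLimits({ maxLimit: 50 }) lowers only the query limit and leaves every other value at its documented default.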

Features

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| plugins | DatafnPlugin[] | [] | Server-side plugins (for example custom query/mutation hooks). |
| rest | boolean | false | Enable REST endpoint wrappers. |
| getServerTime | () => number | Date.now | Server time provider. Useful for testing. |
| retention | RetentionConfig | undefined | Data retention for change log and idempotency tables. |
| rateLimit | RateLimitConfig | undefined | Rate limiting configuration. |
| observability | ObservabilityConfig | undefined | Timing and observability hooks. |
| ws | WsManagerConfig | undefined | WebSocket connection limits and heartbeat settings. |
| shutdownTimeoutMs | number | 10,000 | Graceful shutdown drain timeout in milliseconds. |
| logger | DatafnLogger | console-based | Pluggable logger. JSON output in production, pretty-print in dev. |
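
Because getServerTime is a plain () => number function with Date.now as its default, a test can substitute a controllable clock. The following is a minimal sketch; createTestClock is a hypothetical helper written for this example, not a DataFn export.

```typescript
// Same shape as the documented option: returns milliseconds, like Date.now.
type GetServerTime = () => number;

interface TestClock {
  getServerTime: GetServerTime;
  advance(ms: number): void;
}

// A manually advanced clock: time only moves when the test says so.
function createTestClock(startMs: number): TestClock {
  let now = startMs;
  return {
    getServerTime: () => now,
    advance: (ms: number) => {
      now += ms;
    },
  };
}

// Usage: pass clock.getServerTime as the getServerTime option in a test setup.
const clock = createTestClock(1_700_000_000_000);
clock.advance(60_000); // simulate one minute passing
```

A deterministic clock of this kind makes time-dependent behavior, such as retention windows, reproducible in tests.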

Server Instance

createDatafnServer() returns a DatafnServer object:

interface DatafnServer<TContext = any> {
  router: Router<TContext>;
  websocketHandler: {
    addClient(client: WebSocketClient, authContext: WsAuthContext): boolean;
    removeClient(client: WebSocketClient): void;
    handleMessage(client: WebSocketClient, data: string): void;
    handlePong(client: WebSocketClient): void;
  };
  close(): Promise<void>;
}

The router handles all HTTP routing. Integrate it with your HTTP server framework (Bun, Node, etc.).
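
The websocketHandler methods can be wired to a socket's events roughly as follows. This is a sketch under stated assumptions: SocketLike is a hypothetical stand-in for your WebSocket library's connection object, the handler interface is re-declared locally from the DatafnServer definition above, and treating a false return from addClient as a rejected connection is an assumption rather than documented behavior.

```typescript
// Hypothetical minimal socket; real libraries expose richer event APIs.
interface SocketLike {
  on(event: "message" | "pong" | "close", listener: (data: string) => void): void;
  close(code: number, reason: string): void;
}

// Local mirror of the websocketHandler shape shown in DatafnServer.
interface WebsocketHandlerLike<TClient, TAuth> {
  addClient(client: TClient, authContext: TAuth): boolean;
  removeClient(client: TClient): void;
  handleMessage(client: TClient, data: string): void;
  handlePong(client: TClient): void;
}

// Registers the socket with the handler and forwards its events.
// Returns false when the handler declined the client.
function attachSocket<TAuth>(
  handler: WebsocketHandlerLike<SocketLike, TAuth>,
  socket: SocketLike,
  auth: TAuth,
): boolean {
  // Assumption: false means the connection was not accepted (e.g. a limit was hit).
  if (!handler.addClient(socket, auth)) {
    socket.close(1013, "Try again later"); // 1013 is the standard "Try Again Later" code
    return false;
  }
  socket.on("message", (data) => handler.handleMessage(socket, data));
  socket.on("pong", () => handler.handlePong(socket));
  socket.on("close", () => handler.removeClient(socket));
  return true;
}
```

In a real integration you would call attachSocket(server.websocketHandler, socket, authContext) from your WebSocket library's connection callback, after authenticating the upgrade request.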

Schema Validation

The schema is validated at startup using validateSchema(). If validation fails, createDatafnServer() throws synchronously with a descriptive error:

Schema validation failed: <details>

Built-in KV resources are automatically added to the schema after validation.

Graceful Shutdown

Call server.close() to initiate graceful shutdown. The sequence is:

  1. New requests are immediately rejected with HTTP 503 (SERVICE_UNAVAILABLE).
  2. All connected WebSocket clients receive close frame 1001 (Going Away).
  3. The server waits up to shutdownTimeoutMs for in-flight requests to complete.
  4. Periodic timers (pruning, rate limit cleanup) are cleared.
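
Steps 1 and 3 of this sequence can be sketched as a small request gate that rejects new work once shutdown begins and waits, up to a timeout, for in-flight requests to finish. ShutdownGate is a hypothetical class written for illustration and is not DataFn's actual implementation; WebSocket closing and timer cleanup (steps 2 and 4) are omitted.

```typescript
class ShutdownGate {
  private closing = false;
  private inFlight = 0;
  private waiters: Array<() => void> = [];

  // Call at the start of each request.
  // Returns 503 if the request must be rejected, or null if it may proceed.
  tryEnter(): 503 | null {
    if (this.closing) return 503;
    this.inFlight += 1;
    return null;
  }

  // Call when a request that entered successfully has finished.
  leave(): void {
    this.inFlight -= 1;
    if (this.inFlight === 0) {
      for (const wake of this.waiters.splice(0)) wake();
    }
  }

  // Begins shutdown. Resolves true if all in-flight requests drained,
  // or false if timeoutMs elapsed first.
  close(timeoutMs: number): Promise<boolean> {
    this.closing = true;
    if (this.inFlight === 0) return Promise.resolve(true);
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => resolve(false), timeoutMs);
      this.waiters.push(() => {
        clearTimeout(timer);
        resolve(true);
      });
    });
  }
}
```

The timeout bounds how long shutdown can block on a stuck request, which is the role shutdownTimeoutMs plays in the server configuration.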

The server also registers SIGTERM and SIGINT handlers that trigger shutdown automatically.

// Manual shutdown
await server.close();

// Automatic via process signals
// SIGTERM and SIGINT are handled automatically