Change Tracking

Track mutations for incremental sync.

The ChangeTrackingService records every mutation as a change entry with a monotonic serverSeq. These entries power the incremental sync protocol, allowing clients to pull only the changes they have not yet seen.

Architecture

Mutation -> ChangeTrackingService -> __datafn_changes table
                                 -> onChange callback (WebSocket notification)

Each change entry contains:

interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}
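
As a rough illustration of how a consumer might use these fields, the sketch below applies entries to an in-memory cache in serverSeq order. The op semantics are assumptions rather than documented behavior: "merge" is treated as a shallow merge, "insert", "replace" and "upsert" as a full write, and "delete" as a removal. LocalStore, applyChange, and applyChanges are hypothetical names.

```typescript
interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}

// Hypothetical local cache keyed by "<resource>:<id>".
type LocalStore = Map<string, Record<string, unknown>>;

// Assumed semantics: "merge" shallow-merges fields, "delete" removes the
// record, and every other op writes the full record.
function applyChange(store: LocalStore, entry: ChangeEntry): void {
  const key = `${entry.resource}:${entry.id}`;
  if (entry.op === "delete") {
    store.delete(key);
    return;
  }
  const incoming = entry.record ?? {};
  if (entry.op === "merge") {
    store.set(key, { ...(store.get(key) ?? {}), ...incoming });
  } else {
    store.set(key, { ...incoming });
  }
}

// Applies entries in ascending serverSeq order, skips anything at or below
// the current cursor, and returns the new cursor.
function applyChanges(store: LocalStore, entries: ChangeEntry[], cursor: number): number {
  const ordered = [...entries].sort((a, b) => a.serverSeq - b.serverSeq);
  let next = cursor;
  for (const entry of ordered) {
    if (entry.serverSeq <= next) continue; // already applied
    applyChange(store, entry);
    next = entry.serverSeq;
  }
  return next;
}
```

Skipping entries at or below the cursor makes replaying an overlapping page harmless, which is one reason a strictly increasing sequence works well as a sync cursor.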

Monotonic Server Sequence

The serverSeq is a strictly increasing integer per namespace. It serves as the sync cursor. The service allocates sequences atomically using a pluggable SequenceStore.
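
To make the allocation contract concrete, here is a minimal in-memory sketch. The SequenceAllocator interface and its allocate method are hypothetical and do not reproduce the SequenceStore contract in @datafn/server; a real store would be asynchronous and shared across processes. Starting each namespace at 1 is also an assumption.

```typescript
// Hypothetical interface for illustration only.
interface SequenceAllocator {
  // Reserves `count` consecutive sequence numbers and returns them in ascending order.
  allocate(namespace: string, count: number): number[];
}

class InMemorySequenceAllocator implements SequenceAllocator {
  // Last sequence handed out, tracked separately for each namespace.
  private last = new Map<string, number>();

  allocate(namespace: string, count: number): number[] {
    if (!Number.isInteger(count) || count < 1) {
      throw new RangeError("count must be a positive integer");
    }
    const start = (this.last.get(namespace) ?? 0) + 1;
    this.last.set(namespace, start + count - 1);
    return Array.from({ length: count }, (_, i) => start + i);
  }
}
```

Because each namespace has its own counter, two namespaces can hand out the same number without conflict; only the pair of namespace and serverSeq identifies a change.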

Sequence Store Implementations

| Store | Backend | Mechanism |
| --- | --- | --- |
| RedisSequenceStore | Redis / Valkey | Atomic INCR command |
| KVSequenceStore | KV Store | Atomic incr() operation |
| DatabaseSequenceStore | Primary database | CAS retry loop on __datafn_meta |
| FallbackSequenceStore | Primary + fallback | Tries primary, falls back to database |

Redis Sequence Store

Provides the highest throughput using Redis atomic INCR:

import { RedisSequenceStore } from "@datafn/server";

const seqStore = new RedisSequenceStore(redisAdapter);
// Key: "serverSeq:<namespace>"

Database Sequence Store (CAS)

When no external store is configured, sequences are allocated from the __datafn_meta table using compare-and-swap:

// Read current value
const meta = await db.findOne("__datafn_meta", { namespace });
const currentSeq = meta.next_server_seq;

// Attempt atomic update (CAS)
const affected = await db.update("__datafn_meta",
  { namespace, next_server_seq: currentSeq },     // WHERE condition
  { next_server_seq: currentSeq + count },         // SET
);

// If affected === 0, another process won -- retry with backoff

The CAS loop retries up to 10 times with exponential backoff (10ms * 2^attempt).
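
The retry policy described above can be sketched as a self-contained loop. The MetaTable interface, allocateWithCas, and the sleep helper are hypothetical stand-ins for the real database adapter; only the retry count and the backoff formula come from the description above.

```typescript
// Hypothetical minimal abstraction over the __datafn_meta row for one namespace.
interface MetaTable {
  read(namespace: string): Promise<number>;
  // Sets next_server_seq to `next` only if it still equals `expected`.
  // Resolves to the number of rows affected (0 or 1).
  compareAndSet(namespace: string, expected: number, next: number): Promise<number>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Reserves `count` sequences and returns the first one in the reserved range.
async function allocateWithCas(
  table: MetaTable,
  namespace: string,
  count: number,
  maxAttempts = 10,
  baseDelayMs = 10,
): Promise<number> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await table.read(namespace);
    const affected = await table.compareAndSet(namespace, current, current + count);
    if (affected === 1) return current; // we own [current, current + count)
    // Another writer won the race: back off for baseDelayMs * 2^attempt, then retry.
    if (attempt < maxAttempts - 1) await sleep(baseDelayMs * 2 ** attempt);
  }
  throw new Error(`sequence allocation failed after ${maxAttempts} attempts`);
}
```

With the defaults, the waits between attempts grow from 10 ms to 2560 ms, so a caller that loses every race gives up after roughly five seconds of backoff.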

Fallback Store

The FallbackSequenceStore tries the primary store first and falls back to the database on failure. It tracks the last known primary sequence per namespace to prevent duplicate sequences after failover:

import { createSequenceStore } from "@datafn/server";

const seqStore = createSequenceStore({
  db: adapter,
  redis: redisAdapter,          // Optional
  dbMapping: { serverseq: "redis" },
  logger,
});
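
One way the duplicate-prevention idea could work is sketched below. This is not the FallbackSequenceStore implementation: PrimaryAllocator, DatabaseAllocator, nextAbove, and FallbackAllocatorSketch are all hypothetical, and the sketch is synchronous for brevity. It assumes the database allocator can be asked for a value strictly above a given floor.

```typescript
// Hypothetical interfaces for illustration; not the @datafn/server API.
interface PrimaryAllocator {
  next(namespace: string): number; // throws when the primary is unavailable
}
interface DatabaseAllocator {
  // Returns a sequence strictly greater than both its own counter and `floor`.
  nextAbove(namespace: string, floor: number): number;
}

class FallbackAllocatorSketch {
  private readonly primary: PrimaryAllocator;
  private readonly database: DatabaseAllocator;
  // Highest sequence this instance has handed out, per namespace.
  private readonly lastKnown = new Map<string, number>();

  constructor(primary: PrimaryAllocator, database: DatabaseAllocator) {
    this.primary = primary;
    this.database = database;
  }

  next(namespace: string): number {
    let seq: number;
    try {
      seq = this.primary.next(namespace);
    } catch {
      // Primary failed: ask the database for a value above the last one we saw,
      // so a stale database counter cannot reissue an old sequence.
      const floor = this.lastKnown.get(namespace) ?? 0;
      seq = this.database.nextAbove(namespace, floor);
    }
    this.lastKnown.set(namespace, seq);
    return seq;
  }
}
```

The floor only reflects sequences seen by this process, so a multi-process deployment would need additional coordination that this sketch does not attempt.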

Internal Tables

__datafn_meta

Stores the next_server_seq counter per namespace:

| Column | Type | Description |
| --- | --- | --- |
| id | TEXT | Primary key (meta:<namespace>) |
| namespace | TEXT | Namespace identifier |
| next_server_seq | INTEGER | Next available sequence number |

__datafn_changes

Stores individual change entries:

| Column | Type | Description |
| --- | --- | --- |
| id | TEXT | Primary key (change:<namespace>:<seq>:<resource>:<id>) |
| namespace | TEXT | Namespace identifier |
| server_seq | INTEGER | Monotonic sequence number |
| resource | TEXT | Resource name |
| record_id | TEXT | Affected record ID |
| op | TEXT | Operation type |
| record | TEXT | JSON-serialized record (null for deletes) |
| created_at | TEXT | ISO 8601 timestamp |
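
The mapping from a change entry to a row in this table can be sketched as follows. ChangeRow and toChangeRow are hypothetical names; the key format and column meanings are taken from the table above, while the exact serialization the service performs is an assumption.

```typescript
interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}

// Hypothetical row shape mirroring the __datafn_changes columns.
interface ChangeRow {
  id: string;
  namespace: string;
  server_seq: number;
  resource: string;
  record_id: string;
  op: string;
  record: string | null;
  created_at: string;
}

function toChangeRow(entry: ChangeEntry, now: Date = new Date()): ChangeRow {
  return {
    // Primary key: change:<namespace>:<seq>:<resource>:<id>
    id: `change:${entry.namespace}:${entry.serverSeq}:${entry.resource}:${entry.id}`,
    namespace: entry.namespace,
    server_seq: entry.serverSeq,
    resource: entry.resource,
    record_id: entry.id,
    op: entry.op,
    // Records are stored as JSON text; deletes carry no record.
    record: entry.record === null ? null : JSON.stringify(entry.record),
    created_at: now.toISOString(), // ISO 8601 in UTC
  };
}
```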

Recording Changes

const cts = new ChangeTrackingService(adapter, "my-namespace", seqStore, onChange);

// Allocate sequence and record a single change
const seq = await cts.getNextServerSeq();
await cts.recordChange({
  serverSeq: seq,
  resource: "todos",
  id: "t1",
  op: "insert",
  record: { id: "t1", title: "Buy milk" },
});

// Record multiple changes in batch
const seqs = await cts.getNextServerSeqBatch(3);
await cts.recordChanges([
  { serverSeq: seqs[0], resource: "todos", id: "t1", op: "merge", record: { status: "done" } },
  { serverSeq: seqs[1], resource: "todos", id: "t2", op: "delete", record: null },
  { serverSeq: seqs[2], resource: "todos", id: "t3", op: "insert", record: { id: "t3", title: "New" } },
]);

Querying Changes

// Get up to `limit` changes for one resource since a given cursor
const changes = await cts.getChangesSince({
  resource: "todos",
  sinceSeq: 42,
  limit: 100,
});

// Get up to `limit` changes across all resources (global pull)
const globalChanges = await cts.getGlobalChanges({
  sinceSeq: 42,
  limit: 200,
});

// Get the current global serverSeq
const currentSeq = await cts.getCurrentServerSeq();
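
Because each query is capped by limit, a caller that wants to catch up fully has to page. The sketch below shows one way to do that; pullAll and the FetchPage type are hypothetical, and it assumes the query returns only entries with a serverSeq greater than sinceSeq.

```typescript
interface SeqEntry {
  serverSeq: number;
}

// Hypothetical signature matching the shape of the query calls above.
type FetchPage<T extends SeqEntry> = (args: { sinceSeq: number; limit: number }) => Promise<T[]>;

// Pulls pages until a short (or empty) page signals that the caller has caught up.
async function pullAll<T extends SeqEntry>(
  fetchPage: FetchPage<T>,
  sinceSeq: number,
  limit: number,
): Promise<{ entries: T[]; cursor: number }> {
  const entries: T[] = [];
  let cursor = sinceSeq;
  for (;;) {
    const page = await fetchPage({ sinceSeq: cursor, limit });
    if (page.length === 0) break;
    entries.push(...page);
    // Advance the cursor to the highest sequence seen so far.
    for (const entry of page) cursor = Math.max(cursor, entry.serverSeq);
    if (page.length < limit) break;
  }
  return { entries, cursor };
}
```

Under these assumptions it could be wired to the service as `pullAll((args) => cts.getGlobalChanges(args), 42, 200)`, with the returned cursor persisted for the next pull.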

Retention and Pruning

Change entries accumulate over time. Call pruneChanges with a retention period in days to delete older entries; it returns the number of entries deleted:

// Delete changes older than 30 days
const deletedCount = await cts.pruneChanges(30);
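
As a sketch of what an age-based cutoff might look like, the helpers below compute the threshold timestamp and select rows older than it. pruneCutoff and selectPrunable are hypothetical, and comparing against created_at is an assumption about how pruning decides what to delete. The string comparison relies on created_at values being UTC timestamps in the fixed-width format produced by toISOString, which sort lexicographically in time order.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Returns the ISO 8601 timestamp `days` days before `now`.
function pruneCutoff(days: number, now: Date = new Date()): string {
  return new Date(now.getTime() - days * MS_PER_DAY).toISOString();
}

// Selects rows whose created_at is strictly older than the cutoff.
function selectPrunable<T extends { created_at: string }>(
  rows: T[],
  days: number,
  now: Date = new Date(),
): T[] {
  const cutoff = pruneCutoff(days, now);
  return rows.filter((row) => row.created_at < cutoff);
}
```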

onChange Callback

The optional onChange callback is invoked after changes are recorded. It is typically used to notify WebSocket connections:

const cts = new ChangeTrackingService(
  adapter,
  "my-namespace",
  seqStore,
  (seq, namespace) => {
    // Notify all connected WebSocket clients
    wsServer.broadcast({ type: "cursor", cursor: String(seq) });
  },
);

Transaction Support

The withDb method creates a new ChangeTrackingService that uses a transaction adapter, ensuring change records are written within the same transaction as the mutations they track:

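// Example values; `cts` is the service created for this namespace
const namespace = "my-namespace";
const data = { id: "t1", title: "Buy milk" };

await adapter.transaction(async (trx) => {
  const txCts = cts.withDb(trx);
  const seq = await txCts.getNextServerSeq();
  // The change record and the mutation commit or roll back together
  await txCts.recordChange({ serverSeq: seq, resource: "todos", id: "t1", op: "insert", record: data });
  await trx.create({ model: "todos", data, namespace });
});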