Change Tracking

Track mutations for incremental sync.

The ChangeTrackingService records every mutation as a change entry with a monotonic serverSeq. These entries power the incremental sync protocol, allowing clients to pull only the changes they have not yet seen.

Architecture

Mutation -> ChangeTrackingService -> __datafn_changes table
                                 -> onChange callback (WebSocket notification)

Each change entry contains:

interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}
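
As a rough illustration of how a client might consume these entries, the sketch below applies a batch to an in-memory map and advances a cursor. The applyChanges helper and the LocalStore type are hypothetical and not part of DataFn. The semantics assumed for each op (merge shallow-merges fields, every other non-delete op overwrites the record) are assumptions made for the example.

```typescript
interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}

// Hypothetical client-side state: records keyed by "<resource>:<id>" plus a cursor.
interface LocalStore {
  cursor: number;
  records: Map<string, Record<string, unknown>>;
}

// Apply entries in serverSeq order, skipping any the client has already seen.
function applyChanges(store: LocalStore, entries: ChangeEntry[]): void {
  const ordered = [...entries].sort((a, b) => a.serverSeq - b.serverSeq);
  for (const entry of ordered) {
    if (entry.serverSeq <= store.cursor) continue; // already applied
    const key = `${entry.resource}:${entry.id}`;
    if (entry.op === "delete") {
      store.records.delete(key);
    } else if (entry.op === "merge") {
      // Assumed semantics: shallow-merge the partial record into the existing one.
      const existing = store.records.get(key) ?? {};
      store.records.set(key, { ...existing, ...(entry.record ?? {}) });
    } else {
      // Assumed semantics: insert, replace, and upsert overwrite the stored record.
      store.records.set(key, { ...(entry.record ?? {}) });
    }
    store.cursor = entry.serverSeq;
  }
}
```

Because entries at or below the cursor are skipped, applying the same batch twice leaves the store unchanged.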

Monotonic Server Sequence

The serverSeq is a strictly increasing integer per namespace. It serves as the sync cursor. The service allocates sequences atomically using a pluggable SequenceStore.
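
To make the allocation contract concrete, here is a minimal in-memory sketch. The SequenceAllocator interface and its allocate method are hypothetical names chosen for illustration; the actual SequenceStore interface in @datafn/server may differ. Starting each namespace at 1 is also an assumption. The sketch shows how a single increment by count can hand out a contiguous block of sequence numbers per namespace.

```typescript
// Hypothetical interface for illustration; not the actual DataFn SequenceStore contract.
interface SequenceAllocator {
  allocate(namespace: string, count: number): number[];
}

class InMemorySequenceAllocator implements SequenceAllocator {
  // Last allocated sequence per namespace; a missing entry means nothing allocated yet.
  private last = new Map<string, number>();

  allocate(namespace: string, count: number): number[] {
    if (!Number.isInteger(count) || count < 1) {
      throw new RangeError("count must be a positive integer");
    }
    const previous = this.last.get(namespace) ?? 0;
    const newLast = previous + count; // the single "increment" step
    this.last.set(namespace, newLast);
    // The caller now owns the contiguous range previous + 1 .. newLast.
    return Array.from({ length: count }, (_, i) => previous + 1 + i);
  }
}
```

Within one JavaScript process the method body runs without interruption, which is what stands in for atomicity here; a shared backend needs a genuinely atomic operation instead.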

Sequence Store Implementations

| Store | Backend | Mechanism |
| --- | --- | --- |
| RedisSequenceStore | Redis / Valkey | Atomic INCR command |
| KVSequenceStore | KV Store | Atomic incr() operation |
| DatabaseSequenceStore | Primary database | CAS retry loop on __datafn_meta |
| Combined sequence chain | Primary + database chain | Uses the configured primary store and database sequence store together |

Redis Sequence Store

RedisSequenceStore provides the highest throughput of the available stores by using the Redis atomic INCR command:

import { RedisSequenceStore } from "@datafn/server";

const seqStore = new RedisSequenceStore(redisAdapter);
// Key: "serverSeq:<namespace>"

Database Sequence Store (CAS)

When no external store is configured, sequences are allocated from the __datafn_meta table using compare-and-swap:

// Read current value
const meta = await db.findOne("__datafn_meta", { namespace });
const currentSeq = meta.next_server_seq;

// Attempt atomic update (CAS)
const affected = await db.update("__datafn_meta",
  { namespace, next_server_seq: currentSeq },     // WHERE condition
  { next_server_seq: currentSeq + count },         // SET
);

// If affected === 0, another process won -- retry with backoff

The CAS loop retries up to 10 times with exponential backoff (10ms * 2^attempt).
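
The retry behavior described above can be sketched as a self-contained loop. This is an illustration under stated assumptions, not DataFn's implementation: MetaTable is a hypothetical in-memory stand-in for __datafn_meta, compareAndSwap mimics the conditional update, and allocateWithCas and its sleep parameter are invented names. The sketch reads "up to 10 times" as 10 attempts in total and assumes a fresh namespace starts at 1.

```typescript
// Hypothetical in-memory stand-in for the __datafn_meta table.
class MetaTable {
  private rows = new Map<string, number>(); // namespace -> next_server_seq

  read(namespace: string): number {
    return this.rows.get(namespace) ?? 1; // assumed starting value
  }

  // Mimics "UPDATE ... WHERE next_server_seq = expected"; returns the affected row count.
  compareAndSwap(namespace: string, expected: number, next: number): number {
    if (this.read(namespace) !== expected) return 0;
    this.rows.set(namespace, next);
    return 1;
  }
}

const MAX_ATTEMPTS = 10;
const BASE_DELAY_MS = 10;

const defaultSleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Reserve `count` sequence numbers; resolves to the first number of the reserved block.
async function allocateWithCas(
  table: MetaTable,
  namespace: string,
  count: number,
  sleep: (ms: number) => Promise<void> = defaultSleep,
): Promise<number> {
  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
    const current = table.read(namespace);
    const affected = table.compareAndSwap(namespace, current, current + count);
    if (affected === 1) return current;
    // Another writer won the race: wait 10ms * 2^attempt, then retry.
    await sleep(BASE_DELAY_MS * Math.pow(2, attempt));
  }
  throw new Error(`Failed to allocate sequence after ${MAX_ATTEMPTS} attempts`);
}
```

With these constants the waits after successive failed attempts are 10 ms, 20 ms, 40 ms, and so on, doubling each time. Passing sleep as a parameter keeps the loop testable without real delays.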

Combined Store

The combined store coordinates sequence allocation between the primary store and the database store, and tracks the last-known primary sequence for each namespace:

import { createSequenceStore } from "@datafn/server";

const seqStore = createSequenceStore({
  db: adapter,
  redis: redisAdapter,          // Optional
  dbMapping: { serverseq: "redis" },
  logger,
});

Internal Tables

__datafn_meta

Stores the next_server_seq counter per namespace:

| Column | Type | Description |
| --- | --- | --- |
| id | TEXT | Primary key (meta:<namespace>) |
| namespace | TEXT | Namespace identifier |
| next_server_seq | INTEGER | Next available sequence number |

__datafn_changes

Stores individual change entries:

| Column | Type | Description |
| --- | --- | --- |
| id | TEXT | Primary key (change:<namespace>:<seq>:<resource>:<id>) |
| namespace | TEXT | Namespace identifier |
| server_seq | INTEGER | Monotonic sequence number |
| resource | TEXT | Resource name |
| record_id | TEXT | Affected record ID |
| op | TEXT | Operation type |
| record | TEXT | JSON-serialized record (null for deletes) |
| created_at | TEXT | ISO 8601 timestamp |
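
For illustration, the mapping from a change entry to a row with these columns might look like the following. toChangeRow is a hypothetical helper, not part of the DataFn API; it only mirrors the column layout and ID format listed above. Storing a database NULL (rather than the JSON text "null") for deletes is an assumption.

```typescript
interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}

// Row shape matching the __datafn_changes columns.
interface ChangeRow {
  id: string;
  namespace: string;
  server_seq: number;
  resource: string;
  record_id: string;
  op: string;
  record: string | null;
  created_at: string;
}

// Hypothetical helper: build the row that would be written for one change entry.
function toChangeRow(entry: ChangeEntry, now: Date = new Date()): ChangeRow {
  return {
    id: `change:${entry.namespace}:${entry.serverSeq}:${entry.resource}:${entry.id}`,
    namespace: entry.namespace,
    server_seq: entry.serverSeq,
    resource: entry.resource,
    record_id: entry.id,
    op: entry.op,
    // Assumption: deletes store NULL rather than the JSON text "null".
    record: entry.record === null ? null : JSON.stringify(entry.record),
    created_at: now.toISOString(), // ISO 8601, UTC
  };
}
```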

Recording Changes

const cts = new ChangeTrackingService(adapter, "my-namespace", seqStore, onChange);

// Allocate sequence and record a single change
const seq = await cts.getNextServerSeq();
await cts.recordChange({
  serverSeq: seq,
  resource: "todos",
  id: "t1",
  op: "insert",
  record: { id: "t1", title: "Buy milk" },
});

// Record multiple changes in batch
const seqs = await cts.getNextServerSeqBatch(3);
await cts.recordChanges([
  { serverSeq: seqs[0], resource: "todos", id: "t1", op: "merge", record: { status: "done" } },
  { serverSeq: seqs[1], resource: "todos", id: "t2", op: "delete", record: null },
  { serverSeq: seqs[2], resource: "todos", id: "t3", op: "insert", record: { id: "t3", title: "New" } },
]);

Querying Changes

// Get all changes for a resource since a given cursor
const changes = await cts.getChangesSince({
  resource: "todos",
  sinceSeq: 42,
  limit: 100,
});

// Get all changes across all resources (global pull)
const globalChanges = await cts.getGlobalChanges({
  sinceSeq: 42,
  limit: 200,
});

// Get the current global serverSeq
const currentSeq = await cts.getCurrentServerSeq();
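
Since each call is bounded by limit, a caller that wants everything after a cursor has to page. The sketch below shows one way to do that. It is written against a hypothetical ChangeSource interface that mirrors the getChangesSince call above; the return type, and the assumption that sinceSeq is exclusive, are not confirmed by this page. pullAll is an invented helper name.

```typescript
interface PulledChange {
  serverSeq: number;
  resource: string;
  id: string;
}

// Minimal shape of the call used above; the return type is an assumption.
interface ChangeSource {
  getChangesSince(query: {
    resource: string;
    sinceSeq: number;
    limit: number;
  }): Promise<PulledChange[]>;
}

// Page through changes until a short or empty page signals the end.
async function pullAll(
  source: ChangeSource,
  resource: string,
  sinceSeq: number,
  pageSize = 100,
): Promise<{ changes: PulledChange[]; cursor: number }> {
  const changes: PulledChange[] = [];
  let cursor = sinceSeq;
  for (;;) {
    const page = await source.getChangesSince({ resource, sinceSeq: cursor, limit: pageSize });
    for (const change of page) {
      changes.push(change);
      // Advance the cursor to the highest sequence seen so far.
      cursor = Math.max(cursor, change.serverSeq);
    }
    if (page.length < pageSize) break; // last page reached
  }
  return { changes, cursor };
}
```

The returned cursor can be persisted and passed as sinceSeq on the next pull.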

Retention and Pruning

Change entries accumulate over time. Use pruneChanges to delete entries older than a given number of days:

// Delete changes older than 30 days
const deletedCount = await cts.pruneChanges(30);
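
One plausible way a retention period in days translates into a cutoff on the created_at column is sketched below. pruneCutoff and isPrunable are hypothetical helpers, not DataFn functions, and the sketch assumes created_at values are UTC strings in the fixed-width format produced by Date.prototype.toISOString.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Compute the ISO 8601 cutoff: entries created before this instant are eligible for pruning.
function pruneCutoff(retentionDays: number, now: Date = new Date()): string {
  return new Date(now.getTime() - retentionDays * MS_PER_DAY).toISOString();
}

// toISOString() output is fixed-width UTC, so string order matches chronological order.
function isPrunable(createdAt: string, cutoffIso: string): boolean {
  return createdAt < cutoffIso;
}
```

Clients whose cursor points at a pruned range can no longer catch up incrementally, so the retention window should comfortably exceed the longest expected offline period.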

onChange Callback

The optional onChange callback is invoked after changes are recorded. It is typically used to notify WebSocket connections:

const cts = new ChangeTrackingService(
  adapter,
  "my-namespace",
  seqStore,
  (seq, namespace) => {
    // Notify all connected WebSocket clients
    wsServer.broadcast({ type: "cursor", cursor: String(seq) });
  },
);
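
On the receiving side, a client could use the broadcast cursor to decide whether a pull is needed. The sketch below is a hypothetical client-side helper, not part of DataFn. It assumes the message arrives as JSON text with the { type: "cursor", cursor } shape shown above.

```typescript
interface CursorMessage {
  type: "cursor";
  cursor: string;
}

// Type guard for the notification shape broadcast by the onChange callback above.
function isCursorMessage(value: unknown): value is CursorMessage {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return v["type"] === "cursor" && typeof v["cursor"] === "string";
}

// Returns true when the server reports a cursor ahead of the client's local cursor.
function needsPull(localCursor: number, rawMessage: string): boolean {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawMessage);
  } catch {
    return false; // not JSON: ignore the message
  }
  if (!isCursorMessage(parsed)) return false;
  const serverCursor = Number(parsed.cursor);
  return Number.isSafeInteger(serverCursor) && serverCursor > localCursor;
}
```

Treating the notification as a hint and fetching the actual changes through the pull path keeps the WebSocket payload small and makes missed notifications harmless.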

Transaction Support

The withDb method creates a new ChangeTrackingService that uses a transaction adapter, ensuring change records are written within the same transaction as the mutations they track:

await adapter.transaction(async (trx) => {
  const txCts = cts.withDb(trx);
  const seq = await txCts.getNextServerSeq();
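  // The change record and the mutation commit or roll back together.
  // `data` and `namespace` are assumed to be defined in the enclosing scope.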
  await txCts.recordChange({ serverSeq: seq, resource: "todos", id: "t1", op: "insert", record: data });
  await trx.create({ model: "todos", data, namespace });
});