Change Tracking
Track mutations for incremental sync.
The ChangeTrackingService records every mutation as a change entry with a monotonic serverSeq. These entries power the incremental sync protocol, allowing clients to pull only the changes they have not yet seen.
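The pull model described above can be sketched with an in-memory log. This is a minimal illustration, not the library's implementation: `changesSince` and `pullAll` are hypothetical helpers, and the sketch assumes the cursor is exclusive (a client holding cursor N receives entries with serverSeq greater than N).

```typescript
// Hypothetical shapes for illustration; they mirror the documented change entry fields.
type Op = "insert" | "merge" | "replace" | "upsert" | "delete";

interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: Op;
  record: Record<string, unknown> | null;
}

// Server side (simulated): entries newer than the cursor, oldest first, capped at `limit`.
function changesSince(log: ChangeEntry[], sinceSeq: number, limit: number): ChangeEntry[] {
  return log
    .filter((c) => c.serverSeq > sinceSeq) // assumption: the cursor is exclusive
    .sort((a, b) => a.serverSeq - b.serverSeq)
    .slice(0, limit);
}

// Client side: pull page by page until no unseen entries remain.
function pullAll(
  log: ChangeEntry[],
  cursor: number,
  pageSize: number,
): { cursor: number; seen: ChangeEntry[] } {
  const seen: ChangeEntry[] = [];
  for (;;) {
    const page = changesSince(log, cursor, pageSize);
    if (page.length === 0) break;
    seen.push(...page);
    cursor = page[page.length - 1].serverSeq; // advance to the last entry applied
  }
  return { cursor, seen };
}

const demoLog: ChangeEntry[] = [
  { namespace: "ns", serverSeq: 1, resource: "todos", id: "t1", op: "insert", record: { id: "t1" } },
  { namespace: "ns", serverSeq: 2, resource: "todos", id: "t1", op: "merge", record: { status: "done" } },
  { namespace: "ns", serverSeq: 3, resource: "todos", id: "t1", op: "delete", record: null },
];

// A client that has already seen serverSeq 1 receives only entries 2 and 3.
const pulled = pullAll(demoLog, 1, 1);
```

Because the cursor only moves forward, a client that stores it after each page can resume from where it stopped without receiving the same entries twice.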
Architecture
Mutation -> ChangeTrackingService -> __datafn_changes table
                                  -> onChange callback (WebSocket notification)
Each change entry contains:
interface ChangeEntry {
namespace: string;
serverSeq: number;
resource: string;
id: string;
op: "insert" | "merge" | "replace" | "upsert" | "delete";
record: Record<string, unknown> | null;
}
Monotonic Server Sequence
The serverSeq is a strictly increasing integer per namespace. It serves as the sync cursor. The service allocates sequences atomically using a pluggable SequenceStore.
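To make the idea of a pluggable store concrete, here is a rough in-memory sketch. The `SketchSequenceStore` interface and its `allocate` method are assumptions made for illustration; the actual SequenceStore contract in @datafn/server may look different and is almost certainly asynchronous. The sketch also assumes sequences start at 1.

```typescript
// Hypothetical interface: the real SequenceStore contract may differ.
interface SketchSequenceStore {
  // Reserve `count` consecutive sequence numbers for one namespace.
  allocate(namespace: string, count: number): number[];
}

class InMemorySequenceStore implements SketchSequenceStore {
  // Last sequence number handed out, tracked separately per namespace.
  private last = new Map<string, number>();

  allocate(namespace: string, count: number): number[] {
    if (!Number.isInteger(count) || count < 1) {
      throw new RangeError("count must be a positive integer");
    }
    const start = this.last.get(namespace) ?? 0; // assumption: first sequence is 1
    this.last.set(namespace, start + count);
    return Array.from({ length: count }, (_, i) => start + i + 1);
  }
}

const sketchStore = new InMemorySequenceStore();
const single = sketchStore.allocate("tenant-a", 1); // [1]
const batch = sketchStore.allocate("tenant-a", 3); // [2, 3, 4]
const otherNamespace = sketchStore.allocate("tenant-b", 2); // [1, 2]
```

Each namespace has its own counter, so a value is only meaningful as a cursor within the namespace that issued it.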
Sequence Store Implementations
| Store | Backend | Mechanism |
|---|---|---|
| RedisSequenceStore | Redis / Valkey | Atomic INCR command |
| KVSequenceStore | KV Store | Atomic incr() operation |
| DatabaseSequenceStore | Primary database | CAS retry loop on __datafn_meta |
| Combined sequence chain | Primary + database chain | Uses the configured primary store and database sequence store together |
Redis Sequence Store
Provides the highest throughput using Redis atomic INCR:
import { RedisSequenceStore } from "@datafn/server";
const seqStore = new RedisSequenceStore(redisAdapter);
// Key: "serverSeq:<namespace>"
Database Sequence Store (CAS)
When no external store is configured, sequences are allocated from the __datafn_meta table using compare-and-swap:
// Read current value
const meta = await db.findOne("__datafn_meta", { namespace });
const currentSeq = meta.next_server_seq;
// Attempt atomic update (CAS)
const affected = await db.update("__datafn_meta",
{ namespace, next_server_seq: currentSeq }, // WHERE condition
{ next_server_seq: currentSeq + count }, // SET
);
// If affected === 0, another process won -- retry with backoff
The CAS loop retries up to 10 times with exponential backoff (10ms * 2^attempt).
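The retry policy can be sketched end to end against an in-memory stand-in for the meta row. `MetaRow`, `backoffDelayMs`, and `allocateRange` are hypothetical names, not part of the library. The sketch also assumes that the first number of a reserved range is the value read from next_server_seq, which the source does not state explicitly.

```typescript
// In-memory stand-in for one __datafn_meta row; NOT the real database adapter.
class MetaRow {
  constructor(public nextServerSeq: number) {}

  // Mimics "UPDATE ... SET next = newValue WHERE next = expected".
  // Returns the number of affected rows (0 or 1).
  compareAndSwap(expected: number, newValue: number): number {
    if (this.nextServerSeq !== expected) return 0;
    this.nextServerSeq = newValue;
    return 1;
  }
}

const MAX_ATTEMPTS = 10;
const BASE_DELAY_MS = 10;

// Delay after a failed attempt: 10ms * 2^attempt (10, 20, 40, ...).
function backoffDelayMs(attempt: number): number {
  return BASE_DELAY_MS * 2 ** attempt;
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Reserve `count` numbers and return the first one of the reserved range.
async function allocateRange(
  row: MetaRow,
  count: number,
  wait: (ms: number) => Promise<void> = sleep,
): Promise<number> {
  for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
    // In a real system the read and the update are separate round trips,
    // so another writer can change the row between them.
    const current = row.nextServerSeq;
    if (row.compareAndSwap(current, current + count) === 1) {
      return current;
    }
    await wait(backoffDelayMs(attempt));
  }
  throw new Error(`sequence allocation failed after ${MAX_ATTEMPTS} attempts`);
}

// Example: reserve three numbers from a row whose next_server_seq is 1.
const demoRow = new MetaRow(1);
allocateRange(demoRow, 3).then((first) => console.log(first, demoRow.nextServerSeq));
```

With these constants the waits grow from 10 ms after the first failed attempt to 5120 ms after the tenth, so sustained contention on one namespace gets expensive quickly.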
Combined Store
The combined chain coordinates sequence allocation between the primary store and the database store, and tracks the last-known primary sequence for each namespace:
import { createSequenceStore } from "@datafn/server";
const seqStore = createSequenceStore({
db: adapter,
redis: redisAdapter, // Optional
dbMapping: { serverseq: "redis" },
logger,
});
Internal Tables
__datafn_meta
Stores the next_server_seq counter per namespace:
| Column | Type | Description |
|---|---|---|
| id | TEXT | Primary key (meta:<namespace>) |
| namespace | TEXT | Namespace identifier |
| next_server_seq | INTEGER | Next available sequence number |
__datafn_changes
Stores individual change entries:
| Column | Type | Description |
|---|---|---|
| id | TEXT | Primary key (change:<namespace>:<seq>:<resource>:<id>) |
| namespace | TEXT | Namespace identifier |
| server_seq | INTEGER | Monotonic sequence number |
| resource | TEXT | Resource name |
| record_id | TEXT | Affected record ID |
| op | TEXT | Operation type |
| record | TEXT | JSON-serialized record (null for deletes) |
| created_at | TEXT | ISO 8601 timestamp |
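As an illustration of the column layout above, the following sketch maps a change entry to a row. `ChangeRow` and `toChangeRow` are hypothetical names, and storing a delete as a null column value (rather than the JSON string "null") is an assumption.

```typescript
// Row shape derived from the __datafn_changes column table; names are illustrative.
interface ChangeRow {
  id: string;
  namespace: string;
  server_seq: number;
  resource: string;
  record_id: string;
  op: string;
  record: string | null;
  created_at: string;
}

interface EntryInput {
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}

function toChangeRow(namespace: string, entry: EntryInput, now: Date = new Date()): ChangeRow {
  return {
    // Primary key format from the table: change:<namespace>:<seq>:<resource>:<id>
    id: `change:${namespace}:${entry.serverSeq}:${entry.resource}:${entry.id}`,
    namespace,
    server_seq: entry.serverSeq,
    resource: entry.resource,
    record_id: entry.id,
    op: entry.op,
    // Assumption: deletes store a null column value, not the string "null".
    record: entry.record === null ? null : JSON.stringify(entry.record),
    created_at: now.toISOString(), // ISO 8601 in UTC
  };
}

const insertRow = toChangeRow(
  "my-namespace",
  { serverSeq: 7, resource: "todos", id: "t1", op: "insert", record: { id: "t1", title: "Buy milk" } },
  new Date("2024-01-01T00:00:00Z"),
);
const deleteRow = toChangeRow(
  "my-namespace",
  { serverSeq: 8, resource: "todos", id: "t1", op: "delete", record: null },
);
```

Because the key embeds the namespace and sequence number, two entries for the same record never collide as long as each one receives its own serverSeq.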
Recording Changes
const cts = new ChangeTrackingService(adapter, "my-namespace", seqStore, onChange);
// Allocate sequence and record a single change
const seq = await cts.getNextServerSeq();
await cts.recordChange({
serverSeq: seq,
resource: "todos",
id: "t1",
op: "insert",
record: { id: "t1", title: "Buy milk" },
});
// Record multiple changes in batch
const seqs = await cts.getNextServerSeqBatch(3);
await cts.recordChanges([
{ serverSeq: seqs[0], resource: "todos", id: "t1", op: "merge", record: { status: "done" } },
{ serverSeq: seqs[1], resource: "todos", id: "t2", op: "delete", record: null },
{ serverSeq: seqs[2], resource: "todos", id: "t3", op: "insert", record: { id: "t3", title: "New" } },
]);
Querying Changes
// Get all changes for a resource since a given cursor
const changes = await cts.getChangesSince({
resource: "todos",
sinceSeq: 42,
limit: 100,
});
// Get all changes across all resources (global pull)
const globalChanges = await cts.getGlobalChanges({
sinceSeq: 42,
limit: 200,
});
// Get the current global serverSeq
const currentSeq = await cts.getCurrentServerSeq();
Retention and Pruning
Change entries accumulate over time. Use pruneChanges to delete old entries:
// Delete changes older than 30 days
const deletedCount = await cts.pruneChanges(30);
onChange Callback
The optional onChange callback is invoked after changes are recorded. It is typically used to notify WebSocket connections:
const cts = new ChangeTrackingService(
adapter,
"my-namespace",
seqStore,
(seq, namespace) => {
// Notify all connected WebSocket clients
wsServer.broadcast({ type: "cursor", cursor: String(seq) });
},
);
Transaction Support
The withDb method creates a new ChangeTrackingService that uses a transaction adapter, ensuring change records are written within the same transaction as the mutations they track:
await adapter.transaction(async (trx) => {
const txCts = cts.withDb(trx);
const seq = await txCts.getNextServerSeq();
// Record and mutation happen atomically
await txCts.recordChange({ serverSeq: seq, resource: "todos", id: "t1", op: "insert", record: data });
await trx.create({ model: "todos", data, namespace });
});
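The reason for writing both inside one transaction can be shown with a toy store. `MemoryDb` below is a hypothetical in-memory class with snapshot-and-restore transactions, not the DataFn adapter; it only illustrates that a failed mutation leaves no orphaned change entry behind.

```typescript
// Hypothetical in-memory store; transactions restore a snapshot on failure.
interface LoggedChange {
  serverSeq: number;
  resource: string;
  id: string;
  op: string;
}

class MemoryDb {
  records = new Map<string, Record<string, unknown>>();
  changes: LoggedChange[] = [];

  transaction<T>(fn: (trx: MemoryDb) => T): T {
    // Shallow snapshots are enough here because the sketch only adds entries.
    const recordsBefore = new Map(this.records);
    const changesBefore = [...this.changes];
    try {
      return fn(this);
    } catch (err) {
      // Roll back the change log and the data together.
      this.records = recordsBefore;
      this.changes = changesBefore;
      throw err;
    }
  }
}

const memDb = new MemoryDb();

// Success: the change entry and the record are both written.
memDb.transaction((trx) => {
  trx.changes.push({ serverSeq: 1, resource: "todos", id: "t1", op: "insert" });
  trx.records.set("t1", { id: "t1", title: "Buy milk" });
});

// Failure: the change entry is recorded first, then the mutation throws.
try {
  memDb.transaction((trx) => {
    trx.changes.push({ serverSeq: 2, resource: "todos", id: "t2", op: "insert" });
    throw new Error("mutation failed");
  });
} catch {
  // Both writes from the failed transaction have been discarded.
}
```

Without the shared transaction, the second change entry would remain in the log and clients would pull a change for a record that was never written.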