Change Tracking
Track mutations for incremental sync.
The ChangeTrackingService records every mutation as a change entry with a monotonic serverSeq. These entries power the incremental sync protocol, allowing clients to pull only the changes they have not yet seen.
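To make the cursor semantics concrete, here is a minimal sketch of a client-side pull loop. The names `PulledChange`, `FetchChanges`, and `pullAll` are hypothetical and exist only for illustration; they are not part of the documented API. The only assumption taken from the text is that every change carries a monotonic serverSeq and that the client asks for entries after the last value it has seen.

```typescript
// Hypothetical shapes for illustration; not the actual client API.
interface PulledChange {
  serverSeq: number;
  resource: string;
  id: string;
}

// Assumed transport: returns up to `limit` changes with serverSeq > sinceSeq,
// in ascending serverSeq order.
type FetchChanges = (sinceSeq: number, limit: number) => Promise<PulledChange[]>;

// Pull pages until the server returns a short page, advancing the cursor
// to the highest serverSeq seen so far.
async function pullAll(
  fetchChanges: FetchChanges,
  cursor: number,
  limit: number,
): Promise<{ changes: PulledChange[]; cursor: number }> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }
  const changes: PulledChange[] = [];
  for (;;) {
    const page = await fetchChanges(cursor, limit);
    for (const change of page) {
      changes.push(change);
      cursor = Math.max(cursor, change.serverSeq);
    }
    // A short page means the client has caught up.
    if (page.length < limit) break;
  }
  return { changes, cursor };
}
```

The returned cursor is what a client would persist and send on its next pull, so that entries it has already seen are not transferred again.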
Architecture
Mutation -> ChangeTrackingService -> __datafn_changes table
                                  -> onChange callback (WebSocket notification)
Each change entry contains:
interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: "insert" | "merge" | "replace" | "upsert" | "delete";
  record: Record<string, unknown> | null;
}
Monotonic Server Sequence
The serverSeq is a strictly increasing integer per namespace. It serves as the sync cursor. The service allocates sequences atomically using a pluggable SequenceStore.
Sequence Store Implementations
| Store | Backend | Mechanism |
|---|---|---|
| RedisSequenceStore | Redis / Valkey | Atomic INCR command |
| KVSequenceStore | KV Store | Atomic incr() operation |
| DatabaseSequenceStore | Primary database | CAS retry loop on __datafn_meta |
| FallbackSequenceStore | Primary + fallback | Tries primary, falls back to database |
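The stores above differ in backend but share one job: handing out sequence numbers that never repeat within a namespace. As a rough sketch of that contract, the in-memory store below could serve in unit tests. The interface name `SequenceAllocator`, the `allocate` method, and the class `InMemorySequenceStore` are assumptions for illustration; the real SequenceStore interface in @datafn/server may look different, and starting the sequence at 1 is also an assumption.

```typescript
// Hypothetical contract: reserve `count` consecutive sequence numbers for a
// namespace and return them in ascending order.
interface SequenceAllocator {
  allocate(namespace: string, count: number): Promise<number[]>;
}

class InMemorySequenceStore implements SequenceAllocator {
  // Last sequence number handed out per namespace (absent means none yet).
  private last = new Map<string, number>();

  async allocate(namespace: string, count: number): Promise<number[]> {
    if (!Number.isInteger(count) || count < 1) {
      throw new RangeError("count must be a positive integer");
    }
    const start = (this.last.get(namespace) ?? 0) + 1;
    // The read and the write happen in one synchronous step, so callers
    // interleaved on a single event loop cannot receive the same range.
    this.last.set(namespace, start + count - 1);
    return Array.from({ length: count }, (_, i) => start + i);
  }
}
```

A store like this is only safe within a single process; the Redis, KV, and database variants exist precisely because multiple server processes need the same guarantee.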
Redis Sequence Store
RedisSequenceStore provides the highest throughput by allocating sequences with the atomic Redis INCR command:
import { RedisSequenceStore } from "@datafn/server";
const seqStore = new RedisSequenceStore(redisAdapter);
// Key: "serverSeq:<namespace>"
Database Sequence Store (CAS)
When no external store is configured, sequences are allocated from the __datafn_meta table using compare-and-swap:
// Read current value
const meta = await db.findOne("__datafn_meta", { namespace });
const currentSeq = meta.next_server_seq;
// Attempt atomic update (CAS)
const affected = await db.update("__datafn_meta",
  { namespace, next_server_seq: currentSeq }, // WHERE condition
  { next_server_seq: currentSeq + count },    // SET
);
// If affected === 0, another process won -- retry with backoff
The CAS loop retries up to 10 times with exponential backoff (10ms * 2^attempt).
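Putting the read, the conditional update, and the retry policy together, a self-contained sketch of the loop might look like the following. `MetaStore`, `compareAndSet`, and `allocateWithCas` are hypothetical stand-ins for the database adapter calls and are not the library's implementation; only the retry count (10) and the delay formula (10 ms * 2^attempt) come from the text.

```typescript
// Hypothetical minimal view of access to the __datafn_meta counter.
interface MetaStore {
  // Returns the current next_server_seq for the namespace.
  read(namespace: string): Promise<number>;
  // Sets the counter to `next` only if it still equals `expected`.
  compareAndSet(namespace: string, expected: number, next: number): Promise<boolean>;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function allocateWithCas(
  store: MetaStore,
  namespace: string,
  count: number,
  maxAttempts = 10,
  baseDelayMs = 10,
): Promise<number[]> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await store.read(namespace);
    if (await store.compareAndSet(namespace, current, current + count)) {
      // The swap succeeded, so this caller owns current .. current + count - 1.
      return Array.from({ length: count }, (_, i) => current + i);
    }
    // Another process won the race; wait 10 ms, 20 ms, 40 ms, ... and retry.
    if (attempt < maxAttempts - 1) {
      await sleep(baseDelayMs * 2 ** attempt);
    }
  }
  throw new Error(`could not allocate serverSeq after ${maxAttempts} attempts`);
}
```

Because the counter only moves forward when the conditional update matches the value that was just read, two processes can never be granted overlapping ranges; the loser of a race simply re-reads and tries again.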
Fallback Store
The FallbackSequenceStore tries the primary store first and falls back to the database on failure. It tracks the last known primary sequence per namespace to prevent duplicate sequences after failover:
import { createSequenceStore } from "@datafn/server";
const seqStore = createSequenceStore({
  db: adapter,
  redis: redisAdapter, // Optional
  dbMapping: { serverseq: "redis" },
  logger,
});
Internal Tables
__datafn_meta
Stores the next_server_seq counter per namespace:
| Column | Type | Description |
|---|---|---|
| id | TEXT | Primary key (meta:<namespace>) |
| namespace | TEXT | Namespace identifier |
| next_server_seq | INTEGER | Next available sequence number |
__datafn_changes
Stores individual change entries:
| Column | Type | Description |
|---|---|---|
| id | TEXT | Primary key (change:<namespace>:<seq>:<resource>:<id>) |
| namespace | TEXT | Namespace identifier |
| server_seq | INTEGER | Monotonic sequence number |
| resource | TEXT | Resource name |
| record_id | TEXT | Affected record ID |
| op | TEXT | Operation type |
| record | TEXT | JSON-serialized record (null for deletes) |
| created_at | TEXT | ISO 8601 timestamp |
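As an illustration of how a change entry might map onto this table, the sketch below builds a row, including the composite primary key. `toChangeRow` and the `ChangeRow` type are hypothetical helpers, not functions exported by the library; the column names and key format are taken from the table, and storing a delete's record as a null column value (rather than the string "null") is an assumption.

```typescript
type ChangeOp = "insert" | "merge" | "replace" | "upsert" | "delete";

interface ChangeEntry {
  namespace: string;
  serverSeq: number;
  resource: string;
  id: string;
  op: ChangeOp;
  record: Record<string, unknown> | null;
}

// Hypothetical row shape mirroring the __datafn_changes columns.
interface ChangeRow {
  id: string;
  namespace: string;
  server_seq: number;
  resource: string;
  record_id: string;
  op: ChangeOp;
  record: string | null;
  created_at: string;
}

function toChangeRow(entry: ChangeEntry, now: Date = new Date()): ChangeRow {
  return {
    // Key format: change:<namespace>:<seq>:<resource>:<id>
    id: `change:${entry.namespace}:${entry.serverSeq}:${entry.resource}:${entry.id}`,
    namespace: entry.namespace,
    server_seq: entry.serverSeq,
    resource: entry.resource,
    record_id: entry.id,
    op: entry.op,
    // Assumption: deletes store a null column, everything else JSON text.
    record: entry.record === null ? null : JSON.stringify(entry.record),
    created_at: now.toISOString(),
  };
}
```

Embedding the sequence number in the primary key means that writing the same change twice would collide on the key instead of producing a duplicate entry.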
Recording Changes
const cts = new ChangeTrackingService(adapter, "my-namespace", seqStore, onChange);
// Allocate sequence and record a single change
const seq = await cts.getNextServerSeq();
await cts.recordChange({
  serverSeq: seq,
  resource: "todos",
  id: "t1",
  op: "insert",
  record: { id: "t1", title: "Buy milk" },
});
// Record multiple changes in batch
const seqs = await cts.getNextServerSeqBatch(3);
await cts.recordChanges([
  { serverSeq: seqs[0], resource: "todos", id: "t1", op: "merge", record: { status: "done" } },
  { serverSeq: seqs[1], resource: "todos", id: "t2", op: "delete", record: null },
  { serverSeq: seqs[2], resource: "todos", id: "t3", op: "insert", record: { id: "t3", title: "New" } },
]);
Querying Changes
// Get all changes for a resource since a given cursor
const changes = await cts.getChangesSince({
  resource: "todos",
  sinceSeq: 42,
  limit: 100,
});
// Get all changes across all resources (global pull)
const globalChanges = await cts.getGlobalChanges({
  sinceSeq: 42,
  limit: 200,
});
// Get the current global serverSeq
const currentSeq = await cts.getCurrentServerSeq();
Retention and Pruning
Change entries accumulate over time. Use pruneChanges to delete old entries:
// Delete changes older than 30 days
const deletedCount = await cts.pruneChanges(30);
onChange Callback
The optional onChange callback is invoked after changes are recorded. It is typically used to notify WebSocket connections:
const cts = new ChangeTrackingService(
  adapter,
  "my-namespace",
  seqStore,
  (seq, namespace) => {
    // Notify all connected WebSocket clients
    wsServer.broadcast({ type: "cursor", cursor: String(seq) });
  },
);
Transaction Support
The withDb method creates a new ChangeTrackingService that uses a transaction adapter, ensuring change records are written within the same transaction as the mutations they track:
await adapter.transaction(async (trx) => {
  const txCts = cts.withDb(trx);
  const seq = await txCts.getNextServerSeq();
  // Record and mutation happen atomically
  await txCts.recordChange({ serverSeq: seq, resource: "todos", id: "t1", op: "insert", record: data });
  await trx.create({ model: "todos", data, namespace });
});
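If this pattern is repeated for many resources, it can be wrapped in a small helper so that sequence allocation, the change record, and the mutation always share one transaction. The sketch below is one way to do that, not something the library provides: `insertTracked` is a hypothetical name, and the `Trx`, `Adapter`, and `Tracker` interfaces are minimal structural assumptions that list only the members used in the example above.

```typescript
// Minimal structural views of the objects used above (assumptions).
interface Trx {
  create(args: {
    model: string;
    data: Record<string, unknown>;
    namespace: string;
  }): Promise<unknown>;
}

interface Adapter {
  transaction<T>(fn: (trx: Trx) => Promise<T>): Promise<T>;
}

interface Tracker {
  withDb(trx: Trx): Tracker;
  getNextServerSeq(): Promise<number>;
  recordChange(change: {
    serverSeq: number;
    resource: string;
    id: string;
    op: "insert";
    record: Record<string, unknown>;
  }): Promise<void>;
}

// Insert a record and its change entry in one transaction.
// Resolves with the serverSeq assigned to the change.
async function insertTracked(
  adapter: Adapter,
  cts: Tracker,
  namespace: string,
  resource: string,
  data: Record<string, unknown> & { id: string },
): Promise<number> {
  return adapter.transaction(async (trx) => {
    const txCts = cts.withDb(trx);
    const seq = await txCts.getNextServerSeq();
    await txCts.recordChange({
      serverSeq: seq,
      resource,
      id: data.id,
      op: "insert",
      record: data,
    });
    await trx.create({ model: resource, data, namespace });
    return seq;
  });
}
```

If either write throws, the error propagates out of the transaction callback, and an adapter that rolls back on rejection would discard both the change entry and the record.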