# Custom Storage

Implement your own storage adapter.
You can implement a custom storage adapter for specialized use cases such as SQLite in React Native, encrypted storage, or cloud-backed persistence.
## Interface

Implement the `DatafnStorageAdapter` interface:

```ts
import type {
  DatafnStorageAdapter,
  DatafnHydrationState,
  DatafnChangelogEntry,
} from "@datafn/client";

class CustomStorageAdapter implements DatafnStorageAdapter {
  // Records
  async getRecord(resource: string, id: string): Promise<Record<string, unknown> | null> { /* ... */ }
  async listRecords(resource: string): Promise<Record<string, unknown>[]> { /* ... */ }
  async upsertRecord(resource: string, record: Record<string, unknown>): Promise<void> { /* ... */ }
  async deleteRecord(resource: string, id: string): Promise<void> { /* ... */ }
  async mergeRecord(resource: string, id: string, partial: Record<string, unknown>): Promise<Record<string, unknown>> { /* ... */ }

  // Join rows
  async listJoinRows(relationKey: string): Promise<Array<Record<string, unknown>>> { /* ... */ }
  async getJoinRows(relationKey: string, fromId: string): Promise<Array<Record<string, unknown>>> { /* ... */ }
  async getJoinRowsInverse(relationKey: string, toId: string): Promise<Array<Record<string, unknown>>> { /* ... */ }
  async upsertJoinRow(relationKey: string, row: Record<string, unknown>): Promise<void> { /* ... */ }
  async setJoinRows(relationKey: string, rows: Array<Record<string, unknown>>): Promise<void> { /* ... */ }
  async deleteJoinRow(relationKey: string, from: string, to: string): Promise<void> { /* ... */ }

  // Convenience query
  async findRecords(resource: string, field: string, value: unknown): Promise<Record<string, unknown>[]> { /* ... */ }

  // Sync state
  async getCursor(resource: string): Promise<string | null> { /* ... */ }
  async setCursor(resource: string, cursor: string | null): Promise<void> { /* ... */ }
  async getHydrationState(resource: string): Promise<DatafnHydrationState> { /* ... */ }
  async setHydrationState(resource: string, state: DatafnHydrationState): Promise<void> { /* ... */ }

  // Changelog
  async changelogAppend(entry: Omit<DatafnChangelogEntry, "seq">): Promise<DatafnChangelogEntry> { /* ... */ }
  async changelogList(options?: { limit?: number }): Promise<DatafnChangelogEntry[]> { /* ... */ }
  async changelogAck(options: { throughSeq: number }): Promise<void> { /* ... */ }

  // Counts
  async countRecords(resource: string): Promise<number> { /* ... */ }
  async countJoinRows(relationKey: string): Promise<number> { /* ... */ }

  // Lifecycle
  async close(): Promise<void> { /* ... */ }
  async clearAll(): Promise<void> { /* ... */ }
  async healthCheck(): Promise<{ ok: boolean; issues: string[] }> { /* ... */ }
}
```

## Key Implementation Requirements
### Atomic `mergeRecord`

The `mergeRecord` method must be atomic: read the existing record, merge fields, and write back as a single operation. If your storage backend supports transactions, use them. If not, use a mutex or optimistic concurrency control.
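For backends without transactions, one option is a per-record promise chain acting as a keyed mutex. This is a sketch under assumed names (`KeyedMutex`, the `resource:id` key scheme); it is not part of the `@datafn/client` API:

```ts
// A minimal keyed mutex: operations with the same key run one at a
// time, in submission order; different keys run concurrently.
class KeyedMutex {
  private tails = new Map<string, Promise<unknown>>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // Chain fn after the previous operation on this key, swallowing
    // the predecessor's error so the queue keeps moving.
    const next = prev.catch(() => undefined).then(fn);
    this.tails.set(key, next.catch(() => undefined));
    return next;
  }
}
```

Inside `mergeRecord`, the read-merge-write sequence would then be wrapped as `mutex.run(`${resource}:${id}`, () => { /* read, merge, write */ })` so that concurrent merges to the same record serialize.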
The merge behavior is one-level deep:
- Scalar fields: overwrite.
- Object fields: shallow-merge the nested object.
- Arrays: overwrite (no element-level merging).
```ts
async mergeRecord(
  resource: string,
  id: string,
  partial: Record<string, unknown>,
): Promise<Record<string, unknown>> {
  // Begin transaction
  const existing = await this.getRecord(resource, id);
  const merged: Record<string, unknown> = existing ? { ...existing } : { id };
  for (const [key, value] of Object.entries(partial)) {
    if (
      value !== null && typeof value === "object" && !Array.isArray(value) &&
      merged[key] !== null && typeof merged[key] === "object" && !Array.isArray(merged[key])
    ) {
      // Both sides are plain objects: shallow-merge one level deep.
      merged[key] = { ...(merged[key] as object), ...(value as object) };
    } else {
      // Scalars and arrays overwrite.
      merged[key] = value;
    }
  }
  merged.id = id;
  await this.upsertRecord(resource, merged);
  // Commit transaction
  return merged;
}
```

### Monotonic Changelog Sequence
The `seq` field in changelog entries must be monotonically increasing: each call to `changelogAppend` must return a `seq` greater than that of every previous entry. Use an auto-increment column or an in-memory counter.
### Changelog Deduplication

`changelogAppend` must deduplicate by `(clientId, mutationId)`. If an entry with the same pair already exists, return the existing entry without creating a duplicate.
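The two changelog requirements above can be combined in a small in-memory sketch. The `ChangelogEntry` shape here is an assumption standing in for `DatafnChangelogEntry`; only `seq`, `clientId`, and `mutationId` come from this document:

```ts
interface ChangelogEntry {
  seq: number;
  clientId: string;
  mutationId: string;
  payload: unknown; // assumed field, stands in for the real entry body
}

class InMemoryChangelog {
  private entries: ChangelogEntry[] = [];
  private nextSeq = 1; // monotonic counter, never reused
  private byKey = new Map<string, ChangelogEntry>();

  append(entry: Omit<ChangelogEntry, "seq">): ChangelogEntry {
    const key = `${entry.clientId}\u0000${entry.mutationId}`;
    // Dedupe: return the existing entry instead of appending a duplicate.
    const existing = this.byKey.get(key);
    if (existing) return existing;
    const full: ChangelogEntry = { ...entry, seq: this.nextSeq++ };
    this.entries.push(full);
    this.byKey.set(key, full);
    return full;
  }

  list(options?: { limit?: number }): ChangelogEntry[] {
    return this.entries.slice(0, options?.limit ?? this.entries.length);
  }

  ack(options: { throughSeq: number }): void {
    // Drop acknowledged entries; the dedupe index is kept so replays of
    // already-acked mutations still return a stable entry.
    this.entries = this.entries.filter((e) => e.seq > options.throughSeq);
  }
}
```

A SQL-backed adapter would get the same guarantees from an `AUTOINCREMENT` primary key plus a unique index on `(clientId, mutationId)`.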
### Cursor Persistence

Cursors are string values (typically stringified integers) stored per resource. The special key `"__global_cursor__"` holds the global sync cursor and should bypass resource-name validation.
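A minimal sketch of the cursor methods over a `Map`, with the global-key bypass. The validation regex is an assumption; a real adapter would check `resource` against the schema's resource names instead:

```ts
const GLOBAL_CURSOR_KEY = "__global_cursor__";
const VALID_RESOURCE = /^[a-zA-Z_][a-zA-Z0-9_]*$/; // assumed validation rule

class CursorStore {
  private cursors = new Map<string, string>();

  async getCursor(resource: string): Promise<string | null> {
    this.assertValid(resource);
    return this.cursors.get(resource) ?? null;
  }

  async setCursor(resource: string, cursor: string | null): Promise<void> {
    this.assertValid(resource);
    // A null cursor clears the stored value for that resource.
    if (cursor === null) this.cursors.delete(resource);
    else this.cursors.set(resource, cursor);
  }

  private assertValid(resource: string): void {
    // The global sync cursor bypasses resource-name validation.
    if (resource === GLOBAL_CURSOR_KEY) return;
    if (!VALID_RESOURCE.test(resource)) {
      throw new Error(`invalid resource name: ${resource}`);
    }
  }
}
```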
### Deterministic Ordering

`listRecords` should return records sorted by `id` ascending, and `listJoinRows` should sort by `from`, then `to`. This ensures consistent iteration order across storage backends.
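For an in-memory or document-store adapter, the ordering contract can be met with comparators like the following sketch. A plain code-unit comparison is used rather than `localeCompare`, so the order does not vary with the runtime's locale:

```ts
// Locale-independent string comparison (compares by code unit).
function compareStrings(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0;
}

// listRecords order: by id ascending.
function compareById(a: Record<string, unknown>, b: Record<string, unknown>): number {
  return compareStrings(String(a.id), String(b.id));
}

// listJoinRows order: by from, then to.
function compareJoinRows(a: Record<string, unknown>, b: Record<string, unknown>): number {
  return (
    compareStrings(String(a.from), String(b.from)) ||
    compareStrings(String(a.to), String(b.to))
  );
}
```

SQL-backed adapters get the same ordering from `ORDER BY id ASC` and `ORDER BY "from" ASC, "to" ASC`.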
## Example: SQLite Adapter Skeleton

```ts
class SQLiteStorageAdapter implements DatafnStorageAdapter {
  constructor(private db: SQLiteDatabase) {}

  async getRecord(resource: string, id: string) {
    // Table names cannot be bound as parameters, so `resource` must be
    // validated against the schema before being interpolated.
    const row = await this.db.get(`SELECT * FROM ${resource} WHERE id = ?`, [id]);
    return row ?? null;
  }

  async listRecords(resource: string) {
    return this.db.all(`SELECT * FROM ${resource} ORDER BY id ASC`);
  }

  async upsertRecord(resource: string, record: Record<string, unknown>) {
    const keys = Object.keys(record);
    const values = Object.values(record);
    const placeholders = keys.map(() => "?").join(", ");
    const updates = keys.map((k) => `${k} = excluded.${k}`).join(", ");
    await this.db.run(
      `INSERT INTO ${resource} (${keys.join(", ")}) VALUES (${placeholders})
       ON CONFLICT(id) DO UPDATE SET ${updates}`,
      values,
    );
  }

  // ... implement remaining methods
}
```

## Registration
Pass your custom adapter to the client:

```ts
const client = createDatafnClient({
  schema,
  storage: new CustomStorageAdapter(),
});
```

Or use a factory for multi-user isolation:

```ts
const client = createDatafnClient({
  schema,
  storage: (context) => new CustomStorageAdapter(context.userId),
});
```