Custom Storage

Implement your own storage adapter.

You can implement a custom storage adapter for specialized use cases such as SQLite in React Native, encrypted storage, or cloud-backed persistence.

Interface

Implement the DatafnStorageAdapter interface:

import type { DatafnStorageAdapter, DatafnHydrationState, DatafnChangelogEntry } from "@datafn/client";

class CustomStorageAdapter implements DatafnStorageAdapter {
  // Records
  async getRecord(resource: string, id: string): Promise<Record<string, unknown> | null> { /* ... */ }
  async listRecords(resource: string): Promise<Record<string, unknown>[]> { /* ... */ }
  async upsertRecord(resource: string, record: Record<string, unknown>): Promise<void> { /* ... */ }
  async deleteRecord(resource: string, id: string): Promise<void> { /* ... */ }
  async mergeRecord(resource: string, id: string, partial: Record<string, unknown>): Promise<Record<string, unknown>> { /* ... */ }

  // Join rows
  async listJoinRows(relationKey: string): Promise<Array<Record<string, unknown>>> { /* ... */ }
  async getJoinRows(relationKey: string, fromId: string): Promise<Array<Record<string, unknown>>> { /* ... */ }
  async getJoinRowsInverse(relationKey: string, toId: string): Promise<Array<Record<string, unknown>>> { /* ... */ }
  async upsertJoinRow(relationKey: string, row: Record<string, unknown>): Promise<void> { /* ... */ }
  async setJoinRows(relationKey: string, rows: Array<Record<string, unknown>>): Promise<void> { /* ... */ }
  async deleteJoinRow(relationKey: string, from: string, to: string): Promise<void> { /* ... */ }

  // Convenience query
  async findRecords(resource: string, field: string, value: unknown): Promise<Record<string, unknown>[]> { /* ... */ }

  // Sync state
  async getCursor(resource: string): Promise<string | null> { /* ... */ }
  async setCursor(resource: string, cursor: string | null): Promise<void> { /* ... */ }
  async getHydrationState(resource: string): Promise<DatafnHydrationState> { /* ... */ }
  async setHydrationState(resource: string, state: DatafnHydrationState): Promise<void> { /* ... */ }

  // Changelog
  async changelogAppend(entry: Omit<DatafnChangelogEntry, "seq">): Promise<DatafnChangelogEntry> { /* ... */ }
  async changelogList(options?: { limit?: number }): Promise<DatafnChangelogEntry[]> { /* ... */ }
  async changelogAck(options: { throughSeq: number }): Promise<void> { /* ... */ }

  // Counts
  async countRecords(resource: string): Promise<number> { /* ... */ }
  async countJoinRows(relationKey: string): Promise<number> { /* ... */ }

  // Lifecycle
  async close(): Promise<void> { /* ... */ }
  async clearAll(): Promise<void> { /* ... */ }
  async healthCheck(): Promise<{ ok: boolean; issues: string[] }> { /* ... */ }
}

Key Implementation Requirements

Atomic mergeRecord

The mergeRecord method must be atomic: read the existing record, merge fields, and write back in a single transaction. If your storage backend supports transactions, use them. If not, use a mutex or optimistic concurrency control.

The merge behavior is one-level deep:

  • Scalar fields: overwrite.
  • Object fields: shallow-merge the nested object.
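  • Arrays: overwrite (no element-level merging).

async mergeRecord(
  resource: string,
  id: string,
  partial: Record<string, unknown>,
): Promise<Record<string, unknown>> {
  // Run everything from the read to the write inside one transaction (or under a lock).
  const existing = await this.getRecord(resource, id);
  // Start from a copy of the stored record, or from a new record if none exists.
  const merged: Record<string, unknown> = existing ? { ...existing } : { id };

  for (const [key, value] of Object.entries(partial)) {
    if (
      value !== null && typeof value === "object" && !Array.isArray(value) &&
      merged[key] !== null && typeof merged[key] === "object" && !Array.isArray(merged[key])
    ) {
      // Both sides are plain objects: shallow-merge one level deep.
      merged[key] = { ...(merged[key] as object), ...(value as object) };
    } else {
      // Scalars, arrays, and null overwrite the stored value.
      merged[key] = value;
    }
  }

  // The id argument always wins over any id carried in the partial.
  merged.id = id;
  await this.upsertRecord(resource, merged);
  // Commit the transaction (or release the lock) here.
  return merged;
}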
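
For backends without transactions, the mutex option mentioned above could look roughly like the sketch below. KeyedMutex, the Map-based store, and the simulated asynchronous read are hypothetical names introduced for illustration and are not part of @datafn/client. The field merge is reduced to a top-level spread to keep the focus on the locking.

```typescript
// Hypothetical helper (not part of @datafn/client): runs async tasks that
// share a key one at a time, in arrival order.
class KeyedMutex {
  private tails = new Map<string, Promise<void>>();

  async runExclusive<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release: () => void = () => {};
    const gate = new Promise<void>((resolve) => {
      release = () => resolve();
    });
    // The new tail settles only after every earlier task and this one finish.
    const tail = previous.then(() => gate);
    this.tails.set(key, tail);

    await previous; // wait for every earlier task on this key
    try {
      return await task();
    } finally {
      release(); // let the next queued task start
      if (this.tails.get(key) === tail) this.tails.delete(key);
    }
  }
}

// Assumed in-memory backing store, standing in for a real storage backend.
const store = new Map<string, Record<string, unknown>>();
const mutex = new KeyedMutex();

async function mergeRecord(
  resource: string,
  id: string,
  partial: Record<string, unknown>,
): Promise<Record<string, unknown>> {
  const key = JSON.stringify([resource, id]);
  return mutex.runExclusive(key, async () => {
    const existing = store.get(key);
    await Promise.resolve(); // stands in for an asynchronous storage read
    // Top-level spread only; the one-level merge rules are omitted for brevity.
    const merged: Record<string, unknown> = { ...(existing ?? {}), ...partial, id };
    store.set(key, merged);
    return merged;
  });
}
```

Without the lock, two concurrent calls for the same record could both read the old value and the later write would drop the other call's fields. Locking per (resource, id) keeps merges on unrelated records from waiting on each other.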

Monotonic Changelog Sequence

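The seq field in changelog entries must be strictly increasing: each call to changelogAppend that creates an entry must return a seq greater than that of every previous entry. Use the backend's auto-increment, or an in-memory counter if the changelog itself lives only in memory. A counter in front of persistent storage must be seeded from the highest stored seq on startup, otherwise it restarts from its initial value and reuses sequence numbers.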

Changelog Deduplication

changelogAppend must deduplicate by (clientId, mutationId). If an entry with the same pair already exists, return the existing entry without creating a duplicate.
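
The sequence and deduplication rules can be sketched together with an in-memory changelog. The ChangelogEntry shape below is an assumption: only seq, clientId, and mutationId are named in this page, and payload is a placeholder for whatever else the real DatafnChangelogEntry carries. InMemoryChangelog and its constructor argument are likewise hypothetical.

```typescript
// Assumed entry shape; the real DatafnChangelogEntry may have more fields.
interface ChangelogEntry {
  seq: number;
  clientId: string;
  mutationId: string;
  payload?: unknown; // placeholder, not a documented field
}

class InMemoryChangelog {
  private entries: ChangelogEntry[] = [];
  private byMutation = new Map<string, ChangelogEntry>();
  private nextSeq: number;

  // Seed from the highest seq already stored so numbers are never reused.
  constructor(lastPersistedSeq = 0) {
    this.nextSeq = lastPersistedSeq + 1;
  }

  get size(): number {
    return this.entries.length;
  }

  async changelogAppend(entry: Omit<ChangelogEntry, "seq">): Promise<ChangelogEntry> {
    // JSON-encode the pair so ("a", "b:c") and ("a:b", "c") cannot collide.
    const key = JSON.stringify([entry.clientId, entry.mutationId]);
    const existing = this.byMutation.get(key);
    if (existing) return existing; // duplicate: return the stored entry unchanged

    const stored: ChangelogEntry = { ...entry, seq: this.nextSeq++ };
    this.entries.push(stored);
    this.byMutation.set(key, stored);
    return stored;
  }
}
```

In a SQL backend the same two rules would typically map to an auto-increment primary key for seq and a unique index on (clientId, mutationId), with the lookup and insert performed in one transaction.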

Cursor Persistence

Cursors are string values (typically stringified integers) stored per resource. The special key "__global_cursor__" is used for the global sync cursor and should bypass resource name validation.
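
A minimal sketch of cursor storage with the global-key bypass follows. The validation pattern in RESOURCE_NAME is a made-up rule for illustration (this page does not define what a valid resource name is), and InMemoryCursorStore is a hypothetical class.

```typescript
const GLOBAL_CURSOR_KEY = "__global_cursor__";

// Hypothetical validation rule; substitute whatever your adapter enforces.
const RESOURCE_NAME = /^[A-Za-z][A-Za-z0-9_]*$/;

function assertCursorKey(resource: string): void {
  // The global cursor key is not a resource name, so it skips validation.
  if (resource === GLOBAL_CURSOR_KEY) return;
  if (!RESOURCE_NAME.test(resource)) {
    throw new Error(`Invalid resource name: ${resource}`);
  }
}

class InMemoryCursorStore {
  private cursors = new Map<string, string>();

  async getCursor(resource: string): Promise<string | null> {
    assertCursorKey(resource);
    return this.cursors.get(resource) ?? null;
  }

  async setCursor(resource: string, cursor: string | null): Promise<void> {
    assertCursorKey(resource);
    if (cursor === null) {
      this.cursors.delete(resource); // null clears the stored cursor
    } else {
      this.cursors.set(resource, cursor);
    }
  }
}
```

Note that "__global_cursor__" would fail the sample pattern because it starts with an underscore, which is exactly the case the explicit bypass handles.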

Deterministic Ordering

listRecords should return records sorted by id ascending. listJoinRows should sort by from then to. This ensures consistent behavior across storage backends.
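
For backends that cannot sort in the query itself, the ordering can be applied in code. The sketch below assumes id, from, and to are strings and that plain code-unit comparison is acceptable; the page does not say which collation the built-in adapters use, so check that before relying on a specific order for mixed-case or non-ASCII ids. The helper names are hypothetical.

```typescript
// Code-unit comparison: deterministic and independent of the runtime locale,
// unlike String.prototype.localeCompare.
function compareStrings(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0;
}

// Returns a new array sorted by id ascending; the input is left untouched.
function sortRecords<T extends { id: string }>(records: T[]): T[] {
  return [...records].sort((a, b) => compareStrings(a.id, b.id));
}

// Returns a new array sorted by from, with ties broken by to.
function sortJoinRows<T extends { from: string; to: string }>(rows: T[]): T[] {
  return [...rows].sort(
    (a, b) => compareStrings(a.from, b.from) || compareStrings(a.to, b.to),
  );
}
```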

Example: SQLite Adapter Skeleton

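// SQLiteDatabase stands in for your SQLite driver's connection type.
class SQLiteStorageAdapter implements DatafnStorageAdapter {
  constructor(private db: SQLiteDatabase) {}

  async getRecord(resource: string, id: string) {
    // Await before applying ??, so a missing row (not the pending promise) becomes null.
    return (await this.db.get(`SELECT * FROM ${resource} WHERE id = ?`, [id])) ?? null;
  }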

  async listRecords(resource: string) {
    return this.db.all(`SELECT * FROM ${resource} ORDER BY id ASC`);
  }

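  async upsertRecord(resource: string, record: Record<string, unknown>) {
    // Table and column names are interpolated into the SQL text, so validate
    // them against your schema first; only the values are bound as parameters.
    const keys = Object.keys(record);
    const values = Object.values(record);
    const placeholders = keys.map(() => "?").join(", ");
    const updates = keys.map((k) => `${k} = excluded.${k}`).join(", ");

    // Await the write so a failure rejects the returned promise.
    await this.db.run(
      `INSERT INTO ${resource} (${keys.join(", ")}) VALUES (${placeholders})
       ON CONFLICT(id) DO UPDATE SET ${updates}`,
      values,
    );
  }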

  // ... implement remaining methods
}
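
Because the skeleton interpolates resource and column names into SQL text, a real adapter would want to validate those identifiers before building a statement. The sketch below shows one way to do that. quoteIdentifier, buildUpsertSql, and the SAFE_IDENTIFIER pattern are hypothetical names and rules, not part of @datafn/client or any SQLite driver.

```typescript
// Hypothetical allow-list: letters, digits, and underscores only.
const SAFE_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*$/;

// Rejects anything outside the allow-list, then wraps the name in double
// quotes, which is how SQLite quotes identifiers.
function quoteIdentifier(name: string): string {
  if (!SAFE_IDENTIFIER.test(name)) {
    throw new Error(`Unsafe SQL identifier: ${name}`);
  }
  return `"${name}"`;
}

// Builds the same upsert as the skeleton, with validated identifiers and
// values kept as bound parameters.
function buildUpsertSql(
  resource: string,
  record: Record<string, unknown>,
): { sql: string; values: unknown[] } {
  const keys = Object.keys(record);
  if (keys.length === 0) throw new Error("Cannot upsert an empty record");

  const columns = keys.map(quoteIdentifier);
  const placeholders = keys.map(() => "?").join(", ");
  const updates = columns.map((c) => `${c} = excluded.${c}`).join(", ");
  const table = quoteIdentifier(resource);

  return {
    sql: `INSERT INTO ${table} (${columns.join(", ")}) VALUES (${placeholders}) ON CONFLICT(id) DO UPDATE SET ${updates}`,
    values: Object.values(record),
  };
}
```

The adapter's upsertRecord could then pass the returned sql and values to its driver's run call. Checking names against the schema's known resources and fields would be stricter still than a character allow-list.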

Registration

Pass your custom adapter to the client:

const client = createDatafnClient({
  schema,
  storage: new CustomStorageAdapter(),
});

Or pass a factory so that each user gets an isolated adapter instance (this assumes your adapter's constructor accepts a user ID):

const client = createDatafnClient({
  schema,
  storage: (context) => new CustomStorageAdapter(context.userId),
});