aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/client/sync/crdt/sync-state.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/sync/crdt/sync-state.ts')
-rw-r--r--src/client/sync/crdt/sync-state.ts370
1 files changed, 370 insertions, 0 deletions
diff --git a/src/client/sync/crdt/sync-state.ts b/src/client/sync/crdt/sync-state.ts
new file mode 100644
index 0000000..391ccf0
--- /dev/null
+++ b/src/client/sync/crdt/sync-state.ts
@@ -0,0 +1,370 @@
+/**
+ * CRDT Sync State Management
+ *
+ * This module handles the serialization and persistence of CRDT sync state.
+ * It tracks which documents have been synced and stores the binary state
+ * for incremental sync operations.
+ *
+ * Design:
+ * - Sync state is stored in IndexedDB alongside entity data
+ * - Each entity has a corresponding CRDT binary stored in sync state
+ * - The state tracks sync vectors for efficient incremental updates
+ */
+
+import Dexie from "dexie";
+import { type CrdtEntityTypeValue, createDocumentId } from "./types";
+
+/**
+ * Sync state entry for a single CRDT document
+ */
+export interface CrdtSyncStateEntry {
+ /** Document ID in format entityType:entityId */
+ documentId: string;
+ /** Entity type */
+ entityType: CrdtEntityTypeValue;
+ /** Entity ID */
+ entityId: string;
+ /** Binary representation of the CRDT document */
+ binary: Uint8Array;
+ /** Last sync timestamp */
+ lastSyncedAt: number;
+ /** Sync version from server */
+ syncVersion: number;
+}
+
+/**
+ * Sync metadata for tracking overall sync state
+ */
+export interface CrdtSyncMetadata {
+ /** Unique key for metadata storage */
+ key: string;
+ /** Last successful sync timestamp */
+ lastSyncAt: number;
+ /** Server sync version watermark */
+ syncVersionWatermark: number;
+ /** Actor ID for this client */
+ actorId: string;
+}
+
+/**
+ * Database for storing CRDT sync state
+ * Separate from main app database to avoid migration conflicts
+ */
+class CrdtSyncDatabase extends Dexie {
+ syncState!: Dexie.Table<CrdtSyncStateEntry, string>;
+ metadata!: Dexie.Table<CrdtSyncMetadata, string>;
+
+ constructor() {
+ super("kioku-crdt-sync");
+
+ this.version(1).stores({
+ // Primary key is documentId, indexed by entityType and entityId
+ syncState: "documentId, entityType, entityId, lastSyncedAt",
+ // Simple key-value store for metadata
+ metadata: "key",
+ });
+ }
+}
+
+/**
+ * Singleton instance of the CRDT sync database
+ */
+export const crdtSyncDb = new CrdtSyncDatabase();
+
+/**
+ * CRDT Sync State Manager
+ *
+ * Provides operations for managing CRDT sync state:
+ * - Store/retrieve CRDT document binaries
+ * - Track sync progress
+ * - Manage sync metadata
+ */
+export class CrdtSyncStateManager {
+ private readonly metadataKey = "sync-metadata";
+
+ /**
+ * Get the CRDT binary for an entity
+ */
+ async getDocumentBinary(
+ entityType: CrdtEntityTypeValue,
+ entityId: string,
+ ): Promise<Uint8Array | null> {
+ const documentId = createDocumentId(entityType, entityId);
+ const entry = await crdtSyncDb.syncState.get(documentId);
+ return entry?.binary ?? null;
+ }
+
+ /**
+ * Store the CRDT binary for an entity
+ */
+ async setDocumentBinary(
+ entityType: CrdtEntityTypeValue,
+ entityId: string,
+ binary: Uint8Array,
+ syncVersion: number,
+ ): Promise<void> {
+ const documentId = createDocumentId(entityType, entityId);
+ const entry: CrdtSyncStateEntry = {
+ documentId,
+ entityType,
+ entityId,
+ binary,
+ lastSyncedAt: Date.now(),
+ syncVersion,
+ };
+ await crdtSyncDb.syncState.put(entry);
+ }
+
+ /**
+ * Get all CRDT binaries for an entity type
+ */
+ async getDocumentsByType(
+ entityType: CrdtEntityTypeValue,
+ ): Promise<CrdtSyncStateEntry[]> {
+ return crdtSyncDb.syncState
+ .where("entityType")
+ .equals(entityType)
+ .toArray();
+ }
+
+ /**
+ * Delete the CRDT binary for an entity
+ */
+ async deleteDocument(
+ entityType: CrdtEntityTypeValue,
+ entityId: string,
+ ): Promise<void> {
+ const documentId = createDocumentId(entityType, entityId);
+ await crdtSyncDb.syncState.delete(documentId);
+ }
+
+ /**
+ * Delete all CRDT binaries for an entity type
+ */
+ async deleteDocumentsByType(entityType: CrdtEntityTypeValue): Promise<void> {
+ await crdtSyncDb.syncState.where("entityType").equals(entityType).delete();
+ }
+
+ /**
+ * Get sync metadata
+ */
+ async getMetadata(): Promise<CrdtSyncMetadata | null> {
+ const metadata = await crdtSyncDb.metadata.get(this.metadataKey);
+ return metadata ?? null;
+ }
+
+ /**
+ * Update sync metadata
+ */
+ async setMetadata(
+ updates: Partial<Omit<CrdtSyncMetadata, "key">>,
+ ): Promise<void> {
+ const existing = await this.getMetadata();
+ const metadata: CrdtSyncMetadata = {
+ key: this.metadataKey,
+ lastSyncAt: updates.lastSyncAt ?? existing?.lastSyncAt ?? 0,
+ syncVersionWatermark:
+ updates.syncVersionWatermark ?? existing?.syncVersionWatermark ?? 0,
+ actorId: updates.actorId ?? existing?.actorId ?? "",
+ };
+ await crdtSyncDb.metadata.put(metadata);
+ }
+
+ /**
+ * Get the last sync timestamp
+ */
+ async getLastSyncAt(): Promise<number> {
+ const metadata = await this.getMetadata();
+ return metadata?.lastSyncAt ?? 0;
+ }
+
+ /**
+ * Update the last sync timestamp
+ */
+ async setLastSyncAt(timestamp: number): Promise<void> {
+ await this.setMetadata({ lastSyncAt: timestamp });
+ }
+
+ /**
+ * Get the sync version watermark
+ */
+ async getSyncVersionWatermark(): Promise<number> {
+ const metadata = await this.getMetadata();
+ return metadata?.syncVersionWatermark ?? 0;
+ }
+
+ /**
+ * Update the sync version watermark
+ */
+ async setSyncVersionWatermark(version: number): Promise<void> {
+ await this.setMetadata({ syncVersionWatermark: version });
+ }
+
+ /**
+ * Clear all sync state (for full resync)
+ */
+ async clearAll(): Promise<void> {
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
+ }
+
+ /**
+ * Get count of stored documents by type
+ */
+ async getDocumentCountByType(
+ entityType: CrdtEntityTypeValue,
+ ): Promise<number> {
+ return crdtSyncDb.syncState.where("entityType").equals(entityType).count();
+ }
+
+ /**
+ * Get total count of stored documents
+ */
+ async getTotalDocumentCount(): Promise<number> {
+ return crdtSyncDb.syncState.count();
+ }
+
+ /**
+ * Check if a document exists in sync state
+ */
+ async hasDocument(
+ entityType: CrdtEntityTypeValue,
+ entityId: string,
+ ): Promise<boolean> {
+ const documentId = createDocumentId(entityType, entityId);
+ const count = await crdtSyncDb.syncState
+ .where("documentId")
+ .equals(documentId)
+ .count();
+ return count > 0;
+ }
+
+ /**
+ * Get documents that have been synced since a given timestamp
+ */
+ async getDocumentsSyncedSince(
+ timestamp: number,
+ ): Promise<CrdtSyncStateEntry[]> {
+ return crdtSyncDb.syncState
+ .where("lastSyncedAt")
+ .above(timestamp)
+ .toArray();
+ }
+
+ /**
+ * Batch update multiple documents
+ */
+ async batchSetDocuments(
+ entries: Array<{
+ entityType: CrdtEntityTypeValue;
+ entityId: string;
+ binary: Uint8Array;
+ syncVersion: number;
+ }>,
+ ): Promise<void> {
+ const now = Date.now();
+ const syncEntries: CrdtSyncStateEntry[] = entries.map((entry) => ({
+ documentId: createDocumentId(entry.entityType, entry.entityId),
+ entityType: entry.entityType,
+ entityId: entry.entityId,
+ binary: entry.binary,
+ lastSyncedAt: now,
+ syncVersion: entry.syncVersion,
+ }));
+
+ await crdtSyncDb.syncState.bulkPut(syncEntries);
+ }
+
+ /**
+ * Batch delete multiple documents
+ */
+ async batchDeleteDocuments(
+ entries: Array<{
+ entityType: CrdtEntityTypeValue;
+ entityId: string;
+ }>,
+ ): Promise<void> {
+ const documentIds = entries.map((entry) =>
+ createDocumentId(entry.entityType, entry.entityId),
+ );
+ await crdtSyncDb.syncState.bulkDelete(documentIds);
+ }
+}
+
+/**
+ * Singleton instance of the sync state manager
+ */
+export const crdtSyncStateManager = new CrdtSyncStateManager();
+
+/**
+ * Serialize CRDT binary to base64 for network transport
+ */
+export function binaryToBase64(binary: Uint8Array): string {
+ // Use standard base64 encoding
+ const bytes = new Uint8Array(binary);
+ let binaryStr = "";
+ for (const byte of bytes) {
+ binaryStr += String.fromCharCode(byte);
+ }
+ return btoa(binaryStr);
+}
+
+/**
+ * Deserialize base64 string to CRDT binary
+ */
+export function base64ToBinary(base64: string): Uint8Array {
+ const binaryStr = atob(base64);
+ const bytes = new Uint8Array(binaryStr.length);
+ for (let i = 0; i < binaryStr.length; i++) {
+ bytes[i] = binaryStr.charCodeAt(i);
+ }
+ return bytes;
+}
+
+/**
+ * CRDT changes payload for sync API
+ */
+export interface CrdtSyncPayload {
+ /** Document ID */
+ documentId: string;
+ /** Entity type */
+ entityType: CrdtEntityTypeValue;
+ /** Entity ID */
+ entityId: string;
+ /** Base64-encoded CRDT binary */
+ binary: string;
+}
+
+/**
+ * Convert sync state entries to sync payload format
+ */
+export function entriesToSyncPayload(
+ entries: CrdtSyncStateEntry[],
+): CrdtSyncPayload[] {
+ return entries.map((entry) => ({
+ documentId: entry.documentId,
+ entityType: entry.entityType,
+ entityId: entry.entityId,
+ binary: binaryToBase64(entry.binary),
+ }));
+}
+
+/**
+ * Convert sync payload to sync state entries
+ */
+export function syncPayloadToEntries(
+ payloads: CrdtSyncPayload[],
+ syncVersion: number,
+): Array<{
+ entityType: CrdtEntityTypeValue;
+ entityId: string;
+ binary: Uint8Array;
+ syncVersion: number;
+}> {
+ return payloads.map((payload) => ({
+ entityType: payload.entityType,
+ entityId: payload.entityId,
+ binary: base64ToBinary(payload.binary),
+ syncVersion,
+ }));
+}