diff options
Diffstat (limited to 'src/client/sync/manager.ts')
| -rw-r--r-- | src/client/sync/manager.ts | 243 |
1 files changed, 238 insertions, 5 deletions
diff --git a/src/client/sync/manager.ts b/src/client/sync/manager.ts index d935a3b..b5da89a 100644 --- a/src/client/sync/manager.ts +++ b/src/client/sync/manager.ts @@ -1,7 +1,12 @@ import type { ConflictResolver } from "./conflict"; +import { + CrdtEntityType, + type CrdtSyncStateManager, + crdtSyncStateManager as defaultCrdtSyncStateManager, +} from "./crdt"; import type { PullService, SyncPullResult } from "./pull"; import type { PushService, SyncPushResult } from "./push"; -import type { SyncQueue, SyncQueueState } from "./queue"; +import type { PendingChanges, SyncQueue, SyncQueueState } from "./queue"; /** * Sync result from a full sync operation @@ -11,6 +16,8 @@ export interface SyncResult { pushResult: SyncPushResult | null; pullResult: SyncPullResult | null; conflictsResolved: number; + /** Number of CRDT documents stored during sync */ + crdtDocumentsStored: number; error?: string; } @@ -23,6 +30,11 @@ export interface SyncManagerOptions { pullService: PullService; conflictResolver: ConflictResolver; /** + * CRDT sync state manager for storing CRDT document binaries + * Default: singleton crdtSyncStateManager + */ + crdtSyncStateManager?: CrdtSyncStateManager; + /** * Debounce time in ms before syncing after coming online * Default: 1000ms */ @@ -44,7 +56,8 @@ export type SyncManagerEvent = | { type: "offline" } | { type: "sync_start" } | { type: "sync_complete"; result: SyncResult } - | { type: "sync_error"; error: string }; + | { type: "sync_error"; error: string } + | { type: "crdt_documents_stored"; count: number }; /** * Sync Manager @@ -54,12 +67,14 @@ export type SyncManagerEvent = * 2. Triggers sync when coming back online * 3. Coordinates push, pull, and conflict resolution * 4. Manages sync state and notifies listeners + * 5. Stores CRDT document binaries for conflict-free sync */ export class SyncManager { private syncQueue: SyncQueue; private pushService: PushService; private pullService: PullService; private conflictResolver: ConflictResolver; + private crdtSyncStateManager: CrdtSyncStateManager; private debounceMs: number; private autoSync: boolean; private listeners: Set<SyncManagerListener> = new Set(); @@ -75,6 +90,8 @@ export class SyncManager { this.pushService = options.pushService; this.pullService = options.pullService; this.conflictResolver = options.conflictResolver; + this.crdtSyncStateManager = + options.crdtSyncStateManager ?? defaultCrdtSyncStateManager; this.debounceMs = options.debounceMs ?? 1000; this.autoSync = options.autoSync ?? true; this.isOnline = typeof navigator !== "undefined" ? navigator.onLine : true; @@ -206,6 +223,7 @@ export class SyncManager { pushResult: null, pullResult: null, conflictsResolved: 0, + crdtDocumentsStored: 0, error: "Offline", }; } @@ -216,6 +234,7 @@ export class SyncManager { pushResult: null, pullResult: null, conflictsResolved: 0, + crdtDocumentsStored: 0, error: "Sync already in progress", }; } @@ -226,27 +245,56 @@ export class SyncManager { try { await this.syncQueue.startSync(); + // Get pending changes before push to store CRDT documents + const pendingChanges = await this.syncQueue.getPendingChanges(); + // Step 1: Push local changes const pushResult = await this.pushService.push(); - // Step 2: Pull server changes + // Step 2: Store CRDT documents for successfully pushed entities + const crdtDocumentsStored = await this.storeCrdtDocumentsAfterPush( + pendingChanges, + pushResult, + ); + + if (crdtDocumentsStored > 0) { + this.notifyListeners({ + type: "crdt_documents_stored", + count: crdtDocumentsStored, + }); + } + + // Step 3: Pull server changes const pullResult = await this.pullService.pull(); - // Step 3: Resolve any conflicts + // Step 4: Resolve any conflicts using CRDT merge let conflictsResolved = 0; if (this.conflictResolver.hasConflicts(pushResult)) { const resolution = await this.conflictResolver.resolveConflicts( pushResult, pullResult, ); - conflictsResolved = resolution.decks.length + resolution.cards.length; + conflictsResolved = + resolution.decks.length + + resolution.cards.length + + resolution.noteTypes.length + + resolution.noteFieldTypes.length + + resolution.notes.length + + resolution.noteFieldValues.length; } + // Step 5: Update CRDT sync metadata + await this.crdtSyncStateManager.setMetadata({ + lastSyncAt: Date.now(), + syncVersionWatermark: pullResult.currentSyncVersion, + }); + const result: SyncResult = { success: true, pushResult, pullResult, conflictsResolved, + crdtDocumentsStored, }; this.notifyListeners({ type: "sync_complete", result }); @@ -261,6 +309,7 @@ export class SyncManager { pushResult: null, pullResult: null, conflictsResolved: 0, + crdtDocumentsStored: 0, error: errorMessage, }; @@ -272,6 +321,152 @@ export class SyncManager { } /** + * Store CRDT document binaries after successful push + * This ensures we have local CRDT state for future conflict resolution + */ + private async storeCrdtDocumentsAfterPush( + pendingChanges: PendingChanges, + pushResult: SyncPushResult, + ): Promise<number> { + const entriesToStore: Array<{ + entityType: (typeof CrdtEntityType)[keyof typeof CrdtEntityType]; + entityId: string; + binary: Uint8Array; + syncVersion: number; + }> = []; + + // Helper to find sync version from push result + const findSyncVersion = ( + results: { id: string; syncVersion: number }[], + id: string, + ): number | undefined => { + return results.find((r) => r.id === id)?.syncVersion; + }; + + // Import CRDT repositories dynamically to avoid circular dependencies + const { + crdtDeckRepository, + crdtNoteTypeRepository, + crdtNoteFieldTypeRepository, + crdtNoteRepository, + crdtNoteFieldValueRepository, + crdtCardRepository, + crdtReviewLogRepository, + } = await import("./crdt"); + + // Process pushed decks + for (const deck of pendingChanges.decks) { + const syncVersion = findSyncVersion(pushResult.decks, deck.id); + if (syncVersion !== undefined) { + const result = crdtDeckRepository.toCrdtDocument(deck); + entriesToStore.push({ + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed note types + for (const noteType of pendingChanges.noteTypes) { + const syncVersion = findSyncVersion(pushResult.noteTypes, noteType.id); + if (syncVersion !== undefined) { + const result = crdtNoteTypeRepository.toCrdtDocument(noteType); + entriesToStore.push({ + entityType: CrdtEntityType.NoteType, + entityId: noteType.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed note field types + for (const fieldType of pendingChanges.noteFieldTypes) { + const syncVersion = findSyncVersion( + pushResult.noteFieldTypes, + fieldType.id, + ); + if (syncVersion !== undefined) { + const result = crdtNoteFieldTypeRepository.toCrdtDocument(fieldType); + entriesToStore.push({ + entityType: CrdtEntityType.NoteFieldType, + entityId: fieldType.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed notes + for (const note of pendingChanges.notes) { + const syncVersion = findSyncVersion(pushResult.notes, note.id); + if (syncVersion !== undefined) { + const result = crdtNoteRepository.toCrdtDocument(note); + entriesToStore.push({ + entityType: CrdtEntityType.Note, + entityId: note.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed note field values + for (const fieldValue of pendingChanges.noteFieldValues) { + const syncVersion = findSyncVersion( + pushResult.noteFieldValues, + fieldValue.id, + ); + if (syncVersion !== undefined) { + const result = crdtNoteFieldValueRepository.toCrdtDocument(fieldValue); + entriesToStore.push({ + entityType: CrdtEntityType.NoteFieldValue, + entityId: fieldValue.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed cards + for (const card of pendingChanges.cards) { + const syncVersion = findSyncVersion(pushResult.cards, card.id); + if (syncVersion !== undefined) { + const result = crdtCardRepository.toCrdtDocument(card); + entriesToStore.push({ + entityType: CrdtEntityType.Card, + entityId: card.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed review logs + for (const reviewLog of pendingChanges.reviewLogs) { + const syncVersion = findSyncVersion(pushResult.reviewLogs, reviewLog.id); + if (syncVersion !== undefined) { + const result = crdtReviewLogRepository.toCrdtDocument(reviewLog); + entriesToStore.push({ + entityType: CrdtEntityType.ReviewLog, + entityId: reviewLog.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Batch store all entries + if (entriesToStore.length > 0) { + await this.crdtSyncStateManager.batchSetDocuments(entriesToStore); + } + + return entriesToStore.length; + } + + /** * Force sync even if auto-sync is disabled */ async forceSync(): Promise<SyncResult> { @@ -291,6 +486,44 @@ export class SyncManager { isAutoSyncEnabled(): boolean { return this.autoSync; } + + /** + * Get CRDT sync statistics + */ + async getCrdtSyncStats(): Promise<{ + totalDocuments: number; + lastSyncAt: number; + syncVersionWatermark: number; + }> { + const [totalDocuments, metadata] = await Promise.all([ + this.crdtSyncStateManager.getTotalDocumentCount(), + this.crdtSyncStateManager.getMetadata(), + ]); + + return { + totalDocuments, + lastSyncAt: metadata?.lastSyncAt ?? 0, + syncVersionWatermark: metadata?.syncVersionWatermark ?? 0, + }; + } + + /** + * Clear all CRDT sync state + * Use this when resetting sync or logging out + */ + async clearCrdtState(): Promise<void> { + await this.crdtSyncStateManager.clearAll(); + } + + /** + * Check if a document has CRDT state stored + */ + async hasCrdtDocument( + entityType: (typeof CrdtEntityType)[keyof typeof CrdtEntityType], + entityId: string, + ): Promise<boolean> { + return this.crdtSyncStateManager.hasDocument(entityType, entityId); + } } /** |
