From 2ded1df457fd769323d48af08b9dd68da4aeb820 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 31 Dec 2025 15:52:48 +0900 Subject: feat(crdt): integrate CRDT sync flow into sync manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Store CRDT document binaries after successful push operations - Update CRDT sync metadata (lastSyncAt, syncVersionWatermark) after sync - Add getCrdtSyncStats(), clearCrdtState(), hasCrdtDocument() methods - Add crdt_documents_stored event and crdtDocumentsStored to SyncResult - Include all entity types in conflict resolution count 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- docs/dev/roadmap.md | 2 +- src/client/sync/manager.test.ts | 293 ++++++++++++++++++++++++++++++++++++++++ src/client/sync/manager.ts | 243 ++++++++++++++++++++++++++++++++- 3 files changed, 532 insertions(+), 6 deletions(-) diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md index d2a4d3b..51d65c8 100644 --- a/docs/dev/roadmap.md +++ b/docs/dev/roadmap.md @@ -26,7 +26,7 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD - [x] Modify `src/client/sync/push.ts` - Add crdtChanges to push payload - [x] Modify `src/client/sync/pull.ts` - Handle crdtChanges in pull response - [x] Modify `src/client/sync/conflict.ts` - Replace LWW with Automerge merge -- [ ] Modify `src/client/sync/manager.ts` - Integrate CRDT sync flow +- [x] Modify `src/client/sync/manager.ts` - Integrate CRDT sync flow ### Phase 4: Server-Side CRDT Support diff --git a/src/client/sync/manager.test.ts b/src/client/sync/manager.test.ts index a3799c0..0758261 100644 --- a/src/client/sync/manager.test.ts +++ b/src/client/sync/manager.test.ts @@ -14,6 +14,7 @@ import { import { db } from "../db/index"; import { localDeckRepository } from "../db/repositories"; import { ConflictResolver } from "./conflict"; +import { CrdtEntityType, CrdtSyncStateManager, crdtSyncDb } from "./crdt"; import { SyncManager, type SyncManagerEvent } from "./manager"; import { PullService, type SyncPullResult } from "./pull"; import { PushService, type SyncPushResult } from "./push"; @@ -90,6 +91,8 @@ describe("SyncManager", () => { await db.decks.clear(); await db.cards.clear(); await db.reviewLogs.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); localStorage.clear(); syncQueue = new SyncQueue(); @@ -116,6 +119,8 @@ describe("SyncManager", () => { await db.decks.clear(); await db.cards.clear(); await db.reviewLogs.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); localStorage.clear(); }); @@ -637,4 +642,292 @@ describe("SyncManager", () => { expect(manager.isSyncing()).toBe(false); }); }); + + describe("CRDT integration", () => { + it("should store CRDT documents after successful push", async () => { + // Create pending data + const deck = await createPendingDeck(); + + // Mock push to return success with sync version + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + const result = await manager.sync(); + + expect(result.success).toBe(true); + expect(result.crdtDocumentsStored).toBe(1); + + // Verify CRDT document was stored + const hasDocument = await crdtSyncStateManager.hasDocument( + CrdtEntityType.Deck, + deck.id, + ); + expect(hasDocument).toBe(true); + }); + + it("should emit crdt_documents_stored event when documents are stored", async () => { + const deck = await createPendingDeck(); + + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + }); + + const events: SyncManagerEvent[] = []; + manager.subscribe((event) => events.push(event)); + + await manager.sync(); + + const crdtEvent = events.find( + (e) => e.type === "crdt_documents_stored", + ) as { type: "crdt_documents_stored"; count: number } | undefined; + expect(crdtEvent).toBeDefined(); + expect(crdtEvent?.count).toBe(1); + }); + + it("should update CRDT sync metadata after successful sync", async () => { + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(10), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + await manager.sync(); + + const stats = await manager.getCrdtSyncStats(); + expect(stats.syncVersionWatermark).toBe(10); + expect(stats.lastSyncAt).toBeGreaterThan(0); + }); + + it("should return CRDT sync stats", async () => { + const deck = await createPendingDeck(); + + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + await manager.sync(); + + const stats = await manager.getCrdtSyncStats(); + expect(stats.totalDocuments).toBe(1); + }); + + it("should clear CRDT state when clearCrdtState is called", async () => { + const deck = await createPendingDeck(); + + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + await manager.sync(); + + // Verify document exists + let hasDocument = await manager.hasCrdtDocument( + CrdtEntityType.Deck, + deck.id, + ); + expect(hasDocument).toBe(true); + + // Clear CRDT state + await manager.clearCrdtState(); + + // Verify document is gone + hasDocument = await manager.hasCrdtDocument(CrdtEntityType.Deck, deck.id); + expect(hasDocument).toBe(false); + + const stats = await manager.getCrdtSyncStats(); + expect(stats.totalDocuments).toBe(0); + }); + + it("should check if CRDT document exists", async () => { + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + // No documents initially + const hasDocument = await manager.hasCrdtDocument( + CrdtEntityType.Deck, + "non-existent-id", + ); + expect(hasDocument).toBe(false); + }); + + it("should not store CRDT documents for failed push items", async () => { + const deck = await createPendingDeck(); + + // Push succeeds but deck is in conflicts (not in success list) + pushToServer.mockResolvedValue({ + decks: [], // Deck not in success list + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: { ...createEmptyConflicts(), decks: [deck.id] }, + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + const result = await manager.sync(); + + // No CRDT documents should be stored for conflicted items + expect(result.crdtDocumentsStored).toBe(0); + + const hasDocument = await crdtSyncStateManager.hasDocument( + CrdtEntityType.Deck, + deck.id, + ); + expect(hasDocument).toBe(false); + }); + + it("should include crdtDocumentsStored in sync result when offline", async () => { + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + }); + + manager.start(); + window.dispatchEvent(new Event("offline")); + + const result = await manager.sync(); + + expect(result.success).toBe(false); + expect(result.crdtDocumentsStored).toBe(0); + + manager.stop(); + }); + + it("should include crdtDocumentsStored in sync result on error", async () => { + await createPendingDeck(); + pushToServer.mockRejectedValue(new Error("Network error")); + + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + }); + + const result = await manager.sync(); + + expect(result.success).toBe(false); + expect(result.crdtDocumentsStored).toBe(0); + }); + }); }); diff --git a/src/client/sync/manager.ts b/src/client/sync/manager.ts index d935a3b..b5da89a 100644 --- a/src/client/sync/manager.ts +++ b/src/client/sync/manager.ts @@ -1,7 +1,12 @@ import type { ConflictResolver } from "./conflict"; +import { + CrdtEntityType, + type CrdtSyncStateManager, + crdtSyncStateManager as defaultCrdtSyncStateManager, +} from "./crdt"; import type { PullService, SyncPullResult } from "./pull"; import type { PushService, SyncPushResult } from "./push"; -import type { SyncQueue, SyncQueueState } from "./queue"; +import type { PendingChanges, SyncQueue, SyncQueueState } from "./queue"; /** * Sync result from a full sync operation @@ -11,6 +16,8 @@ export interface SyncResult { pushResult: SyncPushResult | null; pullResult: SyncPullResult | null; conflictsResolved: number; + /** Number of CRDT documents stored during sync */ + crdtDocumentsStored: number; error?: string; } @@ -22,6 +29,11 @@ export interface SyncManagerOptions { pushService: PushService; pullService: PullService; conflictResolver: ConflictResolver; + /** + * CRDT sync state manager for storing CRDT document binaries + * Default: singleton crdtSyncStateManager + */ + crdtSyncStateManager?: CrdtSyncStateManager; /** * Debounce time in ms before syncing after coming online * Default: 1000ms @@ -44,7 +56,8 @@ export type SyncManagerEvent = | { type: "offline" } | { type: "sync_start" } | { type: "sync_complete"; result: SyncResult } - | { type: "sync_error"; error: string }; + | { type: "sync_error"; error: string } + | { type: "crdt_documents_stored"; count: number }; /** * Sync Manager @@ -54,12 +67,14 @@ export type SyncManagerEvent = * 2. Triggers sync when coming back online * 3. Coordinates push, pull, and conflict resolution * 4. Manages sync state and notifies listeners + * 5. Stores CRDT document binaries for conflict-free sync */ export class SyncManager { private syncQueue: SyncQueue; private pushService: PushService; private pullService: PullService; private conflictResolver: ConflictResolver; + private crdtSyncStateManager: CrdtSyncStateManager; private debounceMs: number; private autoSync: boolean; private listeners: Set = new Set(); @@ -75,6 +90,8 @@ export class SyncManager { this.pushService = options.pushService; this.pullService = options.pullService; this.conflictResolver = options.conflictResolver; + this.crdtSyncStateManager = + options.crdtSyncStateManager ?? defaultCrdtSyncStateManager; this.debounceMs = options.debounceMs ?? 1000; this.autoSync = options.autoSync ?? true; this.isOnline = typeof navigator !== "undefined" ? navigator.onLine : true; @@ -206,6 +223,7 @@ export class SyncManager { pushResult: null, pullResult: null, conflictsResolved: 0, + crdtDocumentsStored: 0, error: "Offline", }; } @@ -216,6 +234,7 @@ export class SyncManager { pushResult: null, pullResult: null, conflictsResolved: 0, + crdtDocumentsStored: 0, error: "Sync already in progress", }; } @@ -226,27 +245,56 @@ export class SyncManager { try { await this.syncQueue.startSync(); + // Get pending changes before push to store CRDT documents + const pendingChanges = await this.syncQueue.getPendingChanges(); + // Step 1: Push local changes const pushResult = await this.pushService.push(); - // Step 2: Pull server changes + // Step 2: Store CRDT documents for successfully pushed entities + const crdtDocumentsStored = await this.storeCrdtDocumentsAfterPush( + pendingChanges, + pushResult, + ); + + if (crdtDocumentsStored > 0) { + this.notifyListeners({ + type: "crdt_documents_stored", + count: crdtDocumentsStored, + }); + } + + // Step 3: Pull server changes const pullResult = await this.pullService.pull(); - // Step 3: Resolve any conflicts + // Step 4: Resolve any conflicts using CRDT merge let conflictsResolved = 0; if (this.conflictResolver.hasConflicts(pushResult)) { const resolution = await this.conflictResolver.resolveConflicts( pushResult, pullResult, ); - conflictsResolved = resolution.decks.length + resolution.cards.length; + conflictsResolved = + resolution.decks.length + + resolution.cards.length + + resolution.noteTypes.length + + resolution.noteFieldTypes.length + + resolution.notes.length + + resolution.noteFieldValues.length; } + // Step 5: Update CRDT sync metadata + await this.crdtSyncStateManager.setMetadata({ + lastSyncAt: Date.now(), + syncVersionWatermark: pullResult.currentSyncVersion, + }); + const result: SyncResult = { success: true, pushResult, pullResult, conflictsResolved, + crdtDocumentsStored, }; this.notifyListeners({ type: "sync_complete", result }); @@ -261,6 +309,7 @@ export class SyncManager { pushResult: null, pullResult: null, conflictsResolved: 0, + crdtDocumentsStored: 0, error: errorMessage, }; @@ -271,6 +320,152 @@ export class SyncManager { } } + /** + * Store CRDT document binaries after successful push + * This ensures we have local CRDT state for future conflict resolution + */ + private async storeCrdtDocumentsAfterPush( + pendingChanges: PendingChanges, + pushResult: SyncPushResult, + ): Promise { + const entriesToStore: Array<{ + entityType: (typeof CrdtEntityType)[keyof typeof CrdtEntityType]; + entityId: string; + binary: Uint8Array; + syncVersion: number; + }> = []; + + // Helper to find sync version from push result + const findSyncVersion = ( + results: { id: string; syncVersion: number }[], + id: string, + ): number | undefined => { + return results.find((r) => r.id === id)?.syncVersion; + }; + + // Import CRDT repositories dynamically to avoid circular dependencies + const { + crdtDeckRepository, + crdtNoteTypeRepository, + crdtNoteFieldTypeRepository, + crdtNoteRepository, + crdtNoteFieldValueRepository, + crdtCardRepository, + crdtReviewLogRepository, + } = await import("./crdt"); + + // Process pushed decks + for (const deck of pendingChanges.decks) { + const syncVersion = findSyncVersion(pushResult.decks, deck.id); + if (syncVersion !== undefined) { + const result = crdtDeckRepository.toCrdtDocument(deck); + entriesToStore.push({ + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed note types + for (const noteType of pendingChanges.noteTypes) { + const syncVersion = findSyncVersion(pushResult.noteTypes, noteType.id); + if (syncVersion !== undefined) { + const result = crdtNoteTypeRepository.toCrdtDocument(noteType); + entriesToStore.push({ + entityType: CrdtEntityType.NoteType, + entityId: noteType.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed note field types + for (const fieldType of pendingChanges.noteFieldTypes) { + const syncVersion = findSyncVersion( + pushResult.noteFieldTypes, + fieldType.id, + ); + if (syncVersion !== undefined) { + const result = crdtNoteFieldTypeRepository.toCrdtDocument(fieldType); + entriesToStore.push({ + entityType: CrdtEntityType.NoteFieldType, + entityId: fieldType.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed notes + for (const note of pendingChanges.notes) { + const syncVersion = findSyncVersion(pushResult.notes, note.id); + if (syncVersion !== undefined) { + const result = crdtNoteRepository.toCrdtDocument(note); + entriesToStore.push({ + entityType: CrdtEntityType.Note, + entityId: note.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed note field values + for (const fieldValue of pendingChanges.noteFieldValues) { + const syncVersion = findSyncVersion( + pushResult.noteFieldValues, + fieldValue.id, + ); + if (syncVersion !== undefined) { + const result = crdtNoteFieldValueRepository.toCrdtDocument(fieldValue); + entriesToStore.push({ + entityType: CrdtEntityType.NoteFieldValue, + entityId: fieldValue.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed cards + for (const card of pendingChanges.cards) { + const syncVersion = findSyncVersion(pushResult.cards, card.id); + if (syncVersion !== undefined) { + const result = crdtCardRepository.toCrdtDocument(card); + entriesToStore.push({ + entityType: CrdtEntityType.Card, + entityId: card.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Process pushed review logs + for (const reviewLog of pendingChanges.reviewLogs) { + const syncVersion = findSyncVersion(pushResult.reviewLogs, reviewLog.id); + if (syncVersion !== undefined) { + const result = crdtReviewLogRepository.toCrdtDocument(reviewLog); + entriesToStore.push({ + entityType: CrdtEntityType.ReviewLog, + entityId: reviewLog.id, + binary: result.binary, + syncVersion, + }); + } + } + + // Batch store all entries + if (entriesToStore.length > 0) { + await this.crdtSyncStateManager.batchSetDocuments(entriesToStore); + } + + return entriesToStore.length; + } + /** * Force sync even if auto-sync is disabled */ @@ -291,6 +486,44 @@ export class SyncManager { isAutoSyncEnabled(): boolean { return this.autoSync; } + + /** + * Get CRDT sync statistics + */ + async getCrdtSyncStats(): Promise<{ + totalDocuments: number; + lastSyncAt: number; + syncVersionWatermark: number; + }> { + const [totalDocuments, metadata] = await Promise.all([ + this.crdtSyncStateManager.getTotalDocumentCount(), + this.crdtSyncStateManager.getMetadata(), + ]); + + return { + totalDocuments, + lastSyncAt: metadata?.lastSyncAt ?? 0, + syncVersionWatermark: metadata?.syncVersionWatermark ?? 0, + }; + } + + /** + * Clear all CRDT sync state + * Use this when resetting sync or logging out + */ + async clearCrdtState(): Promise { + await this.crdtSyncStateManager.clearAll(); + } + + /** + * Check if a document has CRDT state stored + */ + async hasCrdtDocument( + entityType: (typeof CrdtEntityType)[keyof typeof CrdtEntityType], + entityId: string, + ): Promise { + return this.crdtSyncStateManager.hasDocument(entityType, entityId); + } } /** -- cgit v1.2.3-70-g09d2