From 2ded1df457fd769323d48af08b9dd68da4aeb820 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 31 Dec 2025 15:52:48 +0900 Subject: feat(crdt): integrate CRDT sync flow into sync manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Store CRDT document binaries after successful push operations - Update CRDT sync metadata (lastSyncAt, syncVersionWatermark) after sync - Add getCrdtSyncStats(), clearCrdtState(), hasCrdtDocument() methods - Add crdt_documents_stored event and crdtDocumentsStored to SyncResult - Include all entity types in conflict resolution count 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/client/sync/manager.test.ts | 293 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) (limited to 'src/client/sync/manager.test.ts') diff --git a/src/client/sync/manager.test.ts b/src/client/sync/manager.test.ts index a3799c0..0758261 100644 --- a/src/client/sync/manager.test.ts +++ b/src/client/sync/manager.test.ts @@ -14,6 +14,7 @@ import { import { db } from "../db/index"; import { localDeckRepository } from "../db/repositories"; import { ConflictResolver } from "./conflict"; +import { CrdtEntityType, CrdtSyncStateManager, crdtSyncDb } from "./crdt"; import { SyncManager, type SyncManagerEvent } from "./manager"; import { PullService, type SyncPullResult } from "./pull"; import { PushService, type SyncPushResult } from "./push"; @@ -90,6 +91,8 @@ describe("SyncManager", () => { await db.decks.clear(); await db.cards.clear(); await db.reviewLogs.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); localStorage.clear(); syncQueue = new SyncQueue(); @@ -116,6 +119,8 @@ describe("SyncManager", () => { await db.decks.clear(); await db.cards.clear(); await db.reviewLogs.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); localStorage.clear(); }); @@ -637,4 +642,292 @@ describe("SyncManager", () => { expect(manager.isSyncing()).toBe(false); }); }); + + describe("CRDT integration", () => { + it("should store CRDT documents after successful push", async () => { + // Create pending data + const deck = await createPendingDeck(); + + // Mock push to return success with sync version + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + const result = await manager.sync(); + + expect(result.success).toBe(true); + expect(result.crdtDocumentsStored).toBe(1); + + // Verify CRDT document was stored + const hasDocument = await crdtSyncStateManager.hasDocument( + CrdtEntityType.Deck, + deck.id, + ); + expect(hasDocument).toBe(true); + }); + + it("should emit crdt_documents_stored event when documents are stored", async () => { + const deck = await createPendingDeck(); + + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + }); + + const events: SyncManagerEvent[] = []; + manager.subscribe((event) => events.push(event)); + + await manager.sync(); + + const crdtEvent = events.find( + (e) => e.type === "crdt_documents_stored", + ) as { type: "crdt_documents_stored"; count: number } | undefined; + expect(crdtEvent).toBeDefined(); + expect(crdtEvent?.count).toBe(1); + }); + + it("should update CRDT sync metadata after successful sync", async () => { + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(10), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + await manager.sync(); + + const stats = await manager.getCrdtSyncStats(); + expect(stats.syncVersionWatermark).toBe(10); + expect(stats.lastSyncAt).toBeGreaterThan(0); + }); + + it("should return CRDT sync stats", async () => { + const deck = await createPendingDeck(); + + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + await manager.sync(); + + const stats = await manager.getCrdtSyncStats(); + expect(stats.totalDocuments).toBe(1); + }); + + it("should clear CRDT state when clearCrdtState is called", async () => { + const deck = await createPendingDeck(); + + pushToServer.mockResolvedValue({ + decks: [{ id: deck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: createEmptyConflicts(), + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + await manager.sync(); + + // Verify document exists + let hasDocument = await manager.hasCrdtDocument( + CrdtEntityType.Deck, + deck.id, + ); + expect(hasDocument).toBe(true); + + // Clear CRDT state + await manager.clearCrdtState(); + + // Verify document is gone + hasDocument = await manager.hasCrdtDocument(CrdtEntityType.Deck, deck.id); + expect(hasDocument).toBe(false); + + const stats = await manager.getCrdtSyncStats(); + expect(stats.totalDocuments).toBe(0); + }); + + it("should check if CRDT document exists", async () => { + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + // No documents initially + const hasDocument = await manager.hasCrdtDocument( + CrdtEntityType.Deck, + "non-existent-id", + ); + expect(hasDocument).toBe(false); + }); + + it("should not store CRDT documents for failed push items", async () => { + const deck = await createPendingDeck(); + + // Push succeeds but deck is in conflicts (not in success list) + pushToServer.mockResolvedValue({ + decks: [], // Deck not in success list + cards: [], + reviewLogs: [], + ...createEmptyPushResult(), + conflicts: { ...createEmptyConflicts(), decks: [deck.id] }, + } satisfies SyncPushResult); + + pullFromServer.mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(1), + } satisfies SyncPullResult); + + const crdtSyncStateManager = new CrdtSyncStateManager(); + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + crdtSyncStateManager, + }); + + const result = await manager.sync(); + + // No CRDT documents should be stored for conflicted items + expect(result.crdtDocumentsStored).toBe(0); + + const hasDocument = await crdtSyncStateManager.hasDocument( + CrdtEntityType.Deck, + deck.id, + ); + expect(hasDocument).toBe(false); + }); + + it("should include crdtDocumentsStored in sync result when offline", async () => { + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + }); + + manager.start(); + window.dispatchEvent(new Event("offline")); + + const result = await manager.sync(); + + expect(result.success).toBe(false); + expect(result.crdtDocumentsStored).toBe(0); + + manager.stop(); + }); + + it("should include crdtDocumentsStored in sync result on error", async () => { + await createPendingDeck(); + pushToServer.mockRejectedValue(new Error("Network error")); + + const { pushService, pullService } = createServices(); + const manager = new SyncManager({ + syncQueue, + pushService, + pullService, + conflictResolver, + }); + + const result = await manager.sync(); + + expect(result.success).toBe(false); + expect(result.crdtDocumentsStored).toBe(0); + }); + }); }); -- cgit v1.2.3-70-g09d2