aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2025-12-31 15:52:48 +0900
committernsfisis <nsfisis@gmail.com>2025-12-31 15:52:48 +0900
commit2ded1df457fd769323d48af08b9dd68da4aeb820 (patch)
treee9f1809f9aedc79074fa0f3785150f0cd0cdfba0
parent3810450c20326998aef17c0acfcd5893e7b3ca20 (diff)
downloadkioku-2ded1df457fd769323d48af08b9dd68da4aeb820.tar.gz
kioku-2ded1df457fd769323d48af08b9dd68da4aeb820.tar.zst
kioku-2ded1df457fd769323d48af08b9dd68da4aeb820.zip
feat(crdt): integrate CRDT sync flow into sync manager
- Store CRDT document binaries after successful push operations - Update CRDT sync metadata (lastSyncAt, syncVersionWatermark) after sync - Add getCrdtSyncStats(), clearCrdtState(), hasCrdtDocument() methods - Add crdt_documents_stored event and crdtDocumentsStored to SyncResult - Include all entity types in conflict resolution count 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
-rw-r--r--docs/dev/roadmap.md2
-rw-r--r--src/client/sync/manager.test.ts293
-rw-r--r--src/client/sync/manager.ts243
3 files changed, 532 insertions, 6 deletions
diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md
index d2a4d3b..51d65c8 100644
--- a/docs/dev/roadmap.md
+++ b/docs/dev/roadmap.md
@@ -26,7 +26,7 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD
- [x] Modify `src/client/sync/push.ts` - Add crdtChanges to push payload
- [x] Modify `src/client/sync/pull.ts` - Handle crdtChanges in pull response
- [x] Modify `src/client/sync/conflict.ts` - Replace LWW with Automerge merge
-- [ ] Modify `src/client/sync/manager.ts` - Integrate CRDT sync flow
+- [x] Modify `src/client/sync/manager.ts` - Integrate CRDT sync flow
### Phase 4: Server-Side CRDT Support
diff --git a/src/client/sync/manager.test.ts b/src/client/sync/manager.test.ts
index a3799c0..0758261 100644
--- a/src/client/sync/manager.test.ts
+++ b/src/client/sync/manager.test.ts
@@ -14,6 +14,7 @@ import {
import { db } from "../db/index";
import { localDeckRepository } from "../db/repositories";
import { ConflictResolver } from "./conflict";
+import { CrdtEntityType, CrdtSyncStateManager, crdtSyncDb } from "./crdt";
import { SyncManager, type SyncManagerEvent } from "./manager";
import { PullService, type SyncPullResult } from "./pull";
import { PushService, type SyncPushResult } from "./push";
@@ -90,6 +91,8 @@ describe("SyncManager", () => {
await db.decks.clear();
await db.cards.clear();
await db.reviewLogs.clear();
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
localStorage.clear();
syncQueue = new SyncQueue();
@@ -116,6 +119,8 @@ describe("SyncManager", () => {
await db.decks.clear();
await db.cards.clear();
await db.reviewLogs.clear();
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
localStorage.clear();
});
@@ -637,4 +642,292 @@ describe("SyncManager", () => {
expect(manager.isSyncing()).toBe(false);
});
});
+
+ describe("CRDT integration", () => {
+ it("should store CRDT documents after successful push", async () => {
+ // Create pending data
+ const deck = await createPendingDeck();
+
+ // Mock push to return success with sync version
+ pushToServer.mockResolvedValue({
+ decks: [{ id: deck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPushResult(),
+ conflicts: createEmptyConflicts(),
+ } satisfies SyncPushResult);
+
+ pullFromServer.mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(1),
+ } satisfies SyncPullResult);
+
+ const crdtSyncStateManager = new CrdtSyncStateManager();
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ crdtSyncStateManager,
+ });
+
+ const result = await manager.sync();
+
+ expect(result.success).toBe(true);
+ expect(result.crdtDocumentsStored).toBe(1);
+
+ // Verify CRDT document was stored
+ const hasDocument = await crdtSyncStateManager.hasDocument(
+ CrdtEntityType.Deck,
+ deck.id,
+ );
+ expect(hasDocument).toBe(true);
+ });
+
+ it("should emit crdt_documents_stored event when documents are stored", async () => {
+ const deck = await createPendingDeck();
+
+ pushToServer.mockResolvedValue({
+ decks: [{ id: deck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPushResult(),
+ conflicts: createEmptyConflicts(),
+ } satisfies SyncPushResult);
+
+ pullFromServer.mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(1),
+ } satisfies SyncPullResult);
+
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ });
+
+ const events: SyncManagerEvent[] = [];
+ manager.subscribe((event) => events.push(event));
+
+ await manager.sync();
+
+ const crdtEvent = events.find(
+ (e) => e.type === "crdt_documents_stored",
+ ) as { type: "crdt_documents_stored"; count: number } | undefined;
+ expect(crdtEvent).toBeDefined();
+ expect(crdtEvent?.count).toBe(1);
+ });
+
+ it("should update CRDT sync metadata after successful sync", async () => {
+ pullFromServer.mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(10),
+ } satisfies SyncPullResult);
+
+ const crdtSyncStateManager = new CrdtSyncStateManager();
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ crdtSyncStateManager,
+ });
+
+ await manager.sync();
+
+ const stats = await manager.getCrdtSyncStats();
+ expect(stats.syncVersionWatermark).toBe(10);
+ expect(stats.lastSyncAt).toBeGreaterThan(0);
+ });
+
+ it("should return CRDT sync stats", async () => {
+ const deck = await createPendingDeck();
+
+ pushToServer.mockResolvedValue({
+ decks: [{ id: deck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPushResult(),
+ conflicts: createEmptyConflicts(),
+ } satisfies SyncPushResult);
+
+ pullFromServer.mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(1),
+ } satisfies SyncPullResult);
+
+ const crdtSyncStateManager = new CrdtSyncStateManager();
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ crdtSyncStateManager,
+ });
+
+ await manager.sync();
+
+ const stats = await manager.getCrdtSyncStats();
+ expect(stats.totalDocuments).toBe(1);
+ });
+
+ it("should clear CRDT state when clearCrdtState is called", async () => {
+ const deck = await createPendingDeck();
+
+ pushToServer.mockResolvedValue({
+ decks: [{ id: deck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPushResult(),
+ conflicts: createEmptyConflicts(),
+ } satisfies SyncPushResult);
+
+ pullFromServer.mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(1),
+ } satisfies SyncPullResult);
+
+ const crdtSyncStateManager = new CrdtSyncStateManager();
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ crdtSyncStateManager,
+ });
+
+ await manager.sync();
+
+ // Verify document exists
+ let hasDocument = await manager.hasCrdtDocument(
+ CrdtEntityType.Deck,
+ deck.id,
+ );
+ expect(hasDocument).toBe(true);
+
+ // Clear CRDT state
+ await manager.clearCrdtState();
+
+ // Verify document is gone
+ hasDocument = await manager.hasCrdtDocument(CrdtEntityType.Deck, deck.id);
+ expect(hasDocument).toBe(false);
+
+ const stats = await manager.getCrdtSyncStats();
+ expect(stats.totalDocuments).toBe(0);
+ });
+
+ it("should check if CRDT document exists", async () => {
+ const crdtSyncStateManager = new CrdtSyncStateManager();
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ crdtSyncStateManager,
+ });
+
+ // No documents initially
+ const hasDocument = await manager.hasCrdtDocument(
+ CrdtEntityType.Deck,
+ "non-existent-id",
+ );
+ expect(hasDocument).toBe(false);
+ });
+
+ it("should not store CRDT documents for failed push items", async () => {
+ const deck = await createPendingDeck();
+
+ // Push succeeds but deck is in conflicts (not in success list)
+ pushToServer.mockResolvedValue({
+ decks: [], // Deck not in success list
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPushResult(),
+ conflicts: { ...createEmptyConflicts(), decks: [deck.id] },
+ } satisfies SyncPushResult);
+
+ pullFromServer.mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(1),
+ } satisfies SyncPullResult);
+
+ const crdtSyncStateManager = new CrdtSyncStateManager();
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ crdtSyncStateManager,
+ });
+
+ const result = await manager.sync();
+
+ // No CRDT documents should be stored for conflicted items
+ expect(result.crdtDocumentsStored).toBe(0);
+
+ const hasDocument = await crdtSyncStateManager.hasDocument(
+ CrdtEntityType.Deck,
+ deck.id,
+ );
+ expect(hasDocument).toBe(false);
+ });
+
+ it("should include crdtDocumentsStored in sync result when offline", async () => {
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ });
+
+ manager.start();
+ window.dispatchEvent(new Event("offline"));
+
+ const result = await manager.sync();
+
+ expect(result.success).toBe(false);
+ expect(result.crdtDocumentsStored).toBe(0);
+
+ manager.stop();
+ });
+
+ it("should include crdtDocumentsStored in sync result on error", async () => {
+ await createPendingDeck();
+ pushToServer.mockRejectedValue(new Error("Network error"));
+
+ const { pushService, pullService } = createServices();
+ const manager = new SyncManager({
+ syncQueue,
+ pushService,
+ pullService,
+ conflictResolver,
+ });
+
+ const result = await manager.sync();
+
+ expect(result.success).toBe(false);
+ expect(result.crdtDocumentsStored).toBe(0);
+ });
+ });
});
diff --git a/src/client/sync/manager.ts b/src/client/sync/manager.ts
index d935a3b..b5da89a 100644
--- a/src/client/sync/manager.ts
+++ b/src/client/sync/manager.ts
@@ -1,7 +1,12 @@
import type { ConflictResolver } from "./conflict";
+import {
+ CrdtEntityType,
+ type CrdtSyncStateManager,
+ crdtSyncStateManager as defaultCrdtSyncStateManager,
+} from "./crdt";
import type { PullService, SyncPullResult } from "./pull";
import type { PushService, SyncPushResult } from "./push";
-import type { SyncQueue, SyncQueueState } from "./queue";
+import type { PendingChanges, SyncQueue, SyncQueueState } from "./queue";
/**
* Sync result from a full sync operation
@@ -11,6 +16,8 @@ export interface SyncResult {
pushResult: SyncPushResult | null;
pullResult: SyncPullResult | null;
conflictsResolved: number;
+ /** Number of CRDT documents stored during sync */
+ crdtDocumentsStored: number;
error?: string;
}
@@ -23,6 +30,11 @@ export interface SyncManagerOptions {
pullService: PullService;
conflictResolver: ConflictResolver;
/**
+ * CRDT sync state manager for storing CRDT document binaries
+ * Default: singleton crdtSyncStateManager
+ */
+ crdtSyncStateManager?: CrdtSyncStateManager;
+ /**
* Debounce time in ms before syncing after coming online
* Default: 1000ms
*/
@@ -44,7 +56,8 @@ export type SyncManagerEvent =
| { type: "offline" }
| { type: "sync_start" }
| { type: "sync_complete"; result: SyncResult }
- | { type: "sync_error"; error: string };
+ | { type: "sync_error"; error: string }
+ | { type: "crdt_documents_stored"; count: number };
/**
* Sync Manager
@@ -54,12 +67,14 @@ export type SyncManagerEvent =
* 2. Triggers sync when coming back online
* 3. Coordinates push, pull, and conflict resolution
* 4. Manages sync state and notifies listeners
+ * 5. Stores CRDT document binaries for conflict-free sync
*/
export class SyncManager {
private syncQueue: SyncQueue;
private pushService: PushService;
private pullService: PullService;
private conflictResolver: ConflictResolver;
+ private crdtSyncStateManager: CrdtSyncStateManager;
private debounceMs: number;
private autoSync: boolean;
private listeners: Set<SyncManagerListener> = new Set();
@@ -75,6 +90,8 @@ export class SyncManager {
this.pushService = options.pushService;
this.pullService = options.pullService;
this.conflictResolver = options.conflictResolver;
+ this.crdtSyncStateManager =
+ options.crdtSyncStateManager ?? defaultCrdtSyncStateManager;
this.debounceMs = options.debounceMs ?? 1000;
this.autoSync = options.autoSync ?? true;
this.isOnline = typeof navigator !== "undefined" ? navigator.onLine : true;
@@ -206,6 +223,7 @@ export class SyncManager {
pushResult: null,
pullResult: null,
conflictsResolved: 0,
+ crdtDocumentsStored: 0,
error: "Offline",
};
}
@@ -216,6 +234,7 @@ export class SyncManager {
pushResult: null,
pullResult: null,
conflictsResolved: 0,
+ crdtDocumentsStored: 0,
error: "Sync already in progress",
};
}
@@ -226,27 +245,56 @@ export class SyncManager {
try {
await this.syncQueue.startSync();
+ // Get pending changes before push to store CRDT documents
+ const pendingChanges = await this.syncQueue.getPendingChanges();
+
// Step 1: Push local changes
const pushResult = await this.pushService.push();
- // Step 2: Pull server changes
+ // Step 2: Store CRDT documents for successfully pushed entities
+ const crdtDocumentsStored = await this.storeCrdtDocumentsAfterPush(
+ pendingChanges,
+ pushResult,
+ );
+
+ if (crdtDocumentsStored > 0) {
+ this.notifyListeners({
+ type: "crdt_documents_stored",
+ count: crdtDocumentsStored,
+ });
+ }
+
+ // Step 3: Pull server changes
const pullResult = await this.pullService.pull();
- // Step 3: Resolve any conflicts
+ // Step 4: Resolve any conflicts using CRDT merge
let conflictsResolved = 0;
if (this.conflictResolver.hasConflicts(pushResult)) {
const resolution = await this.conflictResolver.resolveConflicts(
pushResult,
pullResult,
);
- conflictsResolved = resolution.decks.length + resolution.cards.length;
+ conflictsResolved =
+ resolution.decks.length +
+ resolution.cards.length +
+ resolution.noteTypes.length +
+ resolution.noteFieldTypes.length +
+ resolution.notes.length +
+ resolution.noteFieldValues.length;
}
+ // Step 5: Update CRDT sync metadata
+ await this.crdtSyncStateManager.setMetadata({
+ lastSyncAt: Date.now(),
+ syncVersionWatermark: pullResult.currentSyncVersion,
+ });
+
const result: SyncResult = {
success: true,
pushResult,
pullResult,
conflictsResolved,
+ crdtDocumentsStored,
};
this.notifyListeners({ type: "sync_complete", result });
@@ -261,6 +309,7 @@ export class SyncManager {
pushResult: null,
pullResult: null,
conflictsResolved: 0,
+ crdtDocumentsStored: 0,
error: errorMessage,
};
@@ -272,6 +321,152 @@ export class SyncManager {
}
/**
+ * Store CRDT document binaries after successful push
+ * This ensures we have local CRDT state for future conflict resolution
+ */
+ private async storeCrdtDocumentsAfterPush(
+ pendingChanges: PendingChanges,
+ pushResult: SyncPushResult,
+ ): Promise<number> {
+ const entriesToStore: Array<{
+ entityType: (typeof CrdtEntityType)[keyof typeof CrdtEntityType];
+ entityId: string;
+ binary: Uint8Array;
+ syncVersion: number;
+ }> = [];
+
+ // Helper to find sync version from push result
+ const findSyncVersion = (
+ results: { id: string; syncVersion: number }[],
+ id: string,
+ ): number | undefined => {
+ return results.find((r) => r.id === id)?.syncVersion;
+ };
+
+ // Import CRDT repositories dynamically to avoid circular dependencies
+ const {
+ crdtDeckRepository,
+ crdtNoteTypeRepository,
+ crdtNoteFieldTypeRepository,
+ crdtNoteRepository,
+ crdtNoteFieldValueRepository,
+ crdtCardRepository,
+ crdtReviewLogRepository,
+ } = await import("./crdt");
+
+ // Process pushed decks
+ for (const deck of pendingChanges.decks) {
+ const syncVersion = findSyncVersion(pushResult.decks, deck.id);
+ if (syncVersion !== undefined) {
+ const result = crdtDeckRepository.toCrdtDocument(deck);
+ entriesToStore.push({
+ entityType: CrdtEntityType.Deck,
+ entityId: deck.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Process pushed note types
+ for (const noteType of pendingChanges.noteTypes) {
+ const syncVersion = findSyncVersion(pushResult.noteTypes, noteType.id);
+ if (syncVersion !== undefined) {
+ const result = crdtNoteTypeRepository.toCrdtDocument(noteType);
+ entriesToStore.push({
+ entityType: CrdtEntityType.NoteType,
+ entityId: noteType.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Process pushed note field types
+ for (const fieldType of pendingChanges.noteFieldTypes) {
+ const syncVersion = findSyncVersion(
+ pushResult.noteFieldTypes,
+ fieldType.id,
+ );
+ if (syncVersion !== undefined) {
+ const result = crdtNoteFieldTypeRepository.toCrdtDocument(fieldType);
+ entriesToStore.push({
+ entityType: CrdtEntityType.NoteFieldType,
+ entityId: fieldType.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Process pushed notes
+ for (const note of pendingChanges.notes) {
+ const syncVersion = findSyncVersion(pushResult.notes, note.id);
+ if (syncVersion !== undefined) {
+ const result = crdtNoteRepository.toCrdtDocument(note);
+ entriesToStore.push({
+ entityType: CrdtEntityType.Note,
+ entityId: note.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Process pushed note field values
+ for (const fieldValue of pendingChanges.noteFieldValues) {
+ const syncVersion = findSyncVersion(
+ pushResult.noteFieldValues,
+ fieldValue.id,
+ );
+ if (syncVersion !== undefined) {
+ const result = crdtNoteFieldValueRepository.toCrdtDocument(fieldValue);
+ entriesToStore.push({
+ entityType: CrdtEntityType.NoteFieldValue,
+ entityId: fieldValue.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Process pushed cards
+ for (const card of pendingChanges.cards) {
+ const syncVersion = findSyncVersion(pushResult.cards, card.id);
+ if (syncVersion !== undefined) {
+ const result = crdtCardRepository.toCrdtDocument(card);
+ entriesToStore.push({
+ entityType: CrdtEntityType.Card,
+ entityId: card.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Process pushed review logs
+ for (const reviewLog of pendingChanges.reviewLogs) {
+ const syncVersion = findSyncVersion(pushResult.reviewLogs, reviewLog.id);
+ if (syncVersion !== undefined) {
+ const result = crdtReviewLogRepository.toCrdtDocument(reviewLog);
+ entriesToStore.push({
+ entityType: CrdtEntityType.ReviewLog,
+ entityId: reviewLog.id,
+ binary: result.binary,
+ syncVersion,
+ });
+ }
+ }
+
+ // Batch store all entries
+ if (entriesToStore.length > 0) {
+ await this.crdtSyncStateManager.batchSetDocuments(entriesToStore);
+ }
+
+ return entriesToStore.length;
+ }
+
+ /**
* Force sync even if auto-sync is disabled
*/
async forceSync(): Promise<SyncResult> {
@@ -291,6 +486,44 @@ export class SyncManager {
isAutoSyncEnabled(): boolean {
return this.autoSync;
}
+
+ /**
+ * Get CRDT sync statistics
+ */
+ async getCrdtSyncStats(): Promise<{
+ totalDocuments: number;
+ lastSyncAt: number;
+ syncVersionWatermark: number;
+ }> {
+ const [totalDocuments, metadata] = await Promise.all([
+ this.crdtSyncStateManager.getTotalDocumentCount(),
+ this.crdtSyncStateManager.getMetadata(),
+ ]);
+
+ return {
+ totalDocuments,
+ lastSyncAt: metadata?.lastSyncAt ?? 0,
+ syncVersionWatermark: metadata?.syncVersionWatermark ?? 0,
+ };
+ }
+
+ /**
+ * Clear all CRDT sync state
+ * Use this when resetting sync or logging out
+ */
+ async clearCrdtState(): Promise<void> {
+ await this.crdtSyncStateManager.clearAll();
+ }
+
+ /**
+ * Check if a document has CRDT state stored
+ */
+ async hasCrdtDocument(
+ entityType: (typeof CrdtEntityType)[keyof typeof CrdtEntityType],
+ entityId: string,
+ ): Promise<boolean> {
+ return this.crdtSyncStateManager.hasDocument(entityType, entityId);
+ }
}
/**