diff options
| author | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:46:41 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:46:41 +0900 |
| commit | 3810450c20326998aef17c0acfcd5893e7b3ca20 (patch) | |
| tree | adf5434c2afe3b24ddada7066c697070be0d5950 /src | |
| parent | f3952a509b2d98a25cbb80c9ad091b3b471be52e (diff) | |
| download | kioku-3810450c20326998aef17c0acfcd5893e7b3ca20.tar.gz kioku-3810450c20326998aef17c0acfcd5893e7b3ca20.tar.zst kioku-3810450c20326998aef17c0acfcd5893e7b3ca20.zip | |
feat(crdt): replace LWW conflict resolution with Automerge merge
The ConflictResolver now defaults to CRDT strategy which uses Automerge
to merge local and server documents during sync conflicts. This provides
conflict-free resolution that preserves changes from both sides.
Key changes:
- Add CRDT merge methods for all entity types (deck, card, note, etc.)
- Update resolve methods to accept optional CRDT binary data
- Fall back to server_wins when CRDT data is unavailable or invalid
- Add comprehensive tests for CRDT conflict resolution scenarios
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/sync/conflict.test.ts | 269 | ||||
| -rw-r--r-- | src/client/sync/conflict.ts | 437 | ||||
| -rw-r--r-- | src/client/sync/push.test.ts | 5 |
3 files changed, 698 insertions, 13 deletions
diff --git a/src/client/sync/conflict.test.ts b/src/client/sync/conflict.test.ts index 52362ff..e648373 100644 --- a/src/client/sync/conflict.test.ts +++ b/src/client/sync/conflict.test.ts @@ -6,6 +6,12 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { CardState, db } from "../db/index"; import { localCardRepository, localDeckRepository } from "../db/repositories"; import { ConflictResolver } from "./conflict"; +import { + binaryToBase64, + crdtDeckRepository, + crdtSyncDb, + crdtSyncStateManager, +} from "./crdt"; import type { SyncPullResult } from "./pull"; import type { SyncPushResult } from "./push"; @@ -37,6 +43,8 @@ describe("ConflictResolver", () => { await db.decks.clear(); await db.cards.clear(); await db.reviewLogs.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); localStorage.clear(); }); @@ -44,6 +52,8 @@ describe("ConflictResolver", () => { await db.decks.clear(); await db.cards.clear(); await db.reviewLogs.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); localStorage.clear(); }); @@ -581,7 +591,7 @@ describe("ConflictResolver", () => { expect(localDeck?.name).toBe("Local Only Deck"); }); - it("should default to server_wins strategy", async () => { + it("should default to crdt strategy", async () => { const deck = await localDeckRepository.create({ userId: "user-1", name: "Local Name", @@ -619,7 +629,8 @@ describe("ConflictResolver", () => { ...createEmptyPullResult(5), }; - // Create resolver without explicit strategy + // Create resolver without explicit strategy - defaults to CRDT + // Without CRDT data, falls back to server_wins behavior const resolver = new ConflictResolver(); const result = await resolver.resolveConflicts(pushResult, pullResult); @@ -629,4 +640,258 @@ describe("ConflictResolver", () => { expect(updatedDeck?.name).toBe("Server Name"); }); }); + + describe("CRDT conflict resolution", () => { + it("should merge deck using CRDT when crdtChanges are provided", async () => { + // Create a local deck + const localDeck = await localDeckRepository.create({ + userId: "user-1", + name: "Local Deck Name", + description: "Local description", + newCardsPerDay: 10, + }); + + // Store local CRDT document + const localCrdtResult = crdtDeckRepository.toCrdtDocument(localDeck); + await crdtSyncStateManager.setDocumentBinary( + "deck", + localDeck.id, + localCrdtResult.binary, + 1, + ); + + // Create a "server" version with different data + const serverDeckData = { + id: localDeck.id, + userId: "user-1", + name: "Server Deck Name", + description: "Server description", + newCardsPerDay: 20, + createdAt: localDeck.createdAt, + updatedAt: new Date(Date.now() + 1000), + deletedAt: null, + syncVersion: 5, + }; + + // Create server CRDT document + const serverCrdtResult = crdtDeckRepository.toCrdtDocument({ + ...serverDeckData, + _synced: true, + }); + + const pushResult: SyncPushResult = { + decks: [{ id: localDeck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] }, + }; + + const pullResult: SyncPullResult = { + decks: [serverDeckData], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(5), + crdtChanges: [ + { + documentId: `deck:${localDeck.id}`, + entityType: "deck", + entityId: localDeck.id, + binary: binaryToBase64(serverCrdtResult.binary), + }, + ], + }; + + const resolver = new ConflictResolver({ strategy: "crdt" }); + const result = await resolver.resolveConflicts(pushResult, pullResult); + + expect(result.decks).toHaveLength(1); + expect(result.decks[0]?.resolution).toBe("server_wins"); + + // Verify the CRDT sync state was updated + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + "deck", + localDeck.id, + ); + expect(storedBinary).toBeDefined(); + }); + + it("should fall back to server_wins when CRDT merge fails", async () => { + const localDeck = await localDeckRepository.create({ + userId: "user-1", + name: "Local Name", + description: null, + newCardsPerDay: 10, + }); + + const serverDeck = { + id: localDeck.id, + userId: "user-1", + name: "Server Name", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 5, + }; + + const pushResult: SyncPushResult = { + decks: [{ id: localDeck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] }, + }; + + const pullResult: SyncPullResult = { + decks: [serverDeck], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(5), + crdtChanges: [ + { + documentId: `deck:${localDeck.id}`, + entityType: "deck", + entityId: localDeck.id, + // Invalid base64 - should trigger fallback + binary: "invalid-base64-data!!!", + }, + ], + }; + + const resolver = new ConflictResolver({ strategy: "crdt" }); + const result = await resolver.resolveConflicts(pushResult, pullResult); + + // Should still resolve using fallback + expect(result.decks).toHaveLength(1); + expect(result.decks[0]?.resolution).toBe("server_wins"); + + // Server data should be applied + const updatedDeck = await localDeckRepository.findById(localDeck.id); + expect(updatedDeck?.name).toBe("Server Name"); + }); + + it("should fall back to server_wins when no CRDT data is available", async () => { + const localDeck = await localDeckRepository.create({ + userId: "user-1", + name: "Local Name", + description: null, + newCardsPerDay: 10, + }); + + const serverDeck = { + id: localDeck.id, + userId: "user-1", + name: "Server Name", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 5, + }; + + const pushResult: SyncPushResult = { + decks: [{ id: localDeck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] }, + }; + + // No crdtChanges in pull result + const pullResult: SyncPullResult = { + decks: [serverDeck], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(5), + }; + + const resolver = new ConflictResolver({ strategy: "crdt" }); + const result = await resolver.resolveConflicts(pushResult, pullResult); + + expect(result.decks).toHaveLength(1); + expect(result.decks[0]?.resolution).toBe("server_wins"); + + const updatedDeck = await localDeckRepository.findById(localDeck.id); + expect(updatedDeck?.name).toBe("Server Name"); + }); + + it("should use CRDT to merge when local has no existing CRDT document", async () => { + // Create a local deck without a CRDT document + const localDeck = await localDeckRepository.create({ + userId: "user-1", + name: "Local Name", + description: null, + newCardsPerDay: 10, + }); + + const serverDeck = { + id: localDeck.id, + userId: "user-1", + name: "Server Name", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 5, + }; + + // Create server CRDT document + const serverCrdtResult = crdtDeckRepository.toCrdtDocument({ + ...serverDeck, + _synced: true, + }); + + const pushResult: SyncPushResult = { + decks: [{ id: localDeck.id, syncVersion: 1 }], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] }, + }; + + const pullResult: SyncPullResult = { + decks: [serverDeck], + cards: [], + reviewLogs: [], + ...createEmptyPullResult(5), + crdtChanges: [ + { + documentId: `deck:${localDeck.id}`, + entityType: "deck", + entityId: localDeck.id, + binary: binaryToBase64(serverCrdtResult.binary), + }, + ], + }; + + const resolver = new ConflictResolver({ strategy: "crdt" }); + const result = await resolver.resolveConflicts(pushResult, pullResult); + + expect(result.decks).toHaveLength(1); + expect(result.decks[0]?.resolution).toBe("server_wins"); + + // Verify CRDT document was stored + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + "deck", + localDeck.id, + ); + expect(storedBinary).toBeDefined(); + }); + }); }); diff --git a/src/client/sync/conflict.ts b/src/client/sync/conflict.ts index 2451920..a49e9b2 100644 --- a/src/client/sync/conflict.ts +++ b/src/client/sync/conflict.ts @@ -14,6 +14,17 @@ import { localNoteRepository, localNoteTypeRepository, } from "../db/repositories"; +import { + type CrdtSyncPayload, + crdtCardRepository, + crdtDeckRepository, + crdtNoteFieldTypeRepository, + crdtNoteFieldValueRepository, + crdtNoteRepository, + crdtNoteTypeRepository, + crdtSyncStateManager, +} from "./crdt"; +import { base64ToBinary } from "./crdt/sync-state"; import type { ServerCard, ServerDeck, @@ -51,15 +62,16 @@ export interface ConflictResolutionResult { export interface ConflictResolverOptions { /** * Strategy for resolving conflicts - * - "server_wins": Always use server data (default for LWW) + * - "crdt": Use Automerge CRDT merge for conflict-free resolution (default) + * - "server_wins": Always use server data (fallback when no CRDT data) * - "local_wins": Always use local data - * - "newer_wins": Compare timestamps and use newer data + * - "newer_wins": Compare timestamps and use newer data (legacy LWW) */ - strategy?: "server_wins" | "local_wins" | "newer_wins"; + strategy?: "crdt" | "server_wins" | "local_wins" | "newer_wins"; } /** - * Compare timestamps for LWW resolution + * Compare timestamps for LWW resolution (legacy fallback) * Returns true if server data is newer or equal */ function isServerNewer(serverUpdatedAt: Date, localUpdatedAt: Date): boolean { @@ -67,6 +79,15 @@ function isServerNewer(serverUpdatedAt: Date, localUpdatedAt: Date): boolean { } /** + * CRDT merge result with entity data + */ +interface CrdtMergeConflictResult<T> { + entity: Omit<T, "_synced">; + binary: Uint8Array; + hadLocalDocument: boolean; +} + +/** * Convert server deck to local format for storage */ function serverDeckToLocal(deck: ServerDeck): LocalDeck { @@ -191,15 +212,15 @@ function serverNoteFieldValueToLocal( * Handles conflicts reported by the server during push operations. * When a conflict occurs (server has newer data), this resolver: * 1. Identifies conflicting items from push result - * 2. Pulls latest server data for those items - * 3. Applies conflict resolution strategy (default: server wins / LWW) + * 2. Uses Automerge CRDT merge for conflict-free resolution (default) + * 3. Falls back to LWW strategies when CRDT data is unavailable * 4. Updates local database accordingly */ export class ConflictResolver { - private strategy: "server_wins" | "local_wins" | "newer_wins"; + private strategy: "crdt" | "server_wins" | "local_wins" | "newer_wins"; constructor(options: ConflictResolverOptions = {}) { - this.strategy = options.strategy ?? "server_wins"; + this.strategy = options.strategy ?? "crdt"; } /** @@ -236,10 +257,36 @@ export class ConflictResolver { async resolveDeckConflict( localDeck: LocalDeck, serverDeck: ServerDeck, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeDeckWithCrdt( + localDeck, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalDeck = { + ...mergeResult.entity, + _synced: true, + }; + await localDeckRepository.upsertFromServer(localData); + // Store the merged CRDT binary + await crdtSyncStateManager.setDocumentBinary( + "deck", + localDeck.id, + mergeResult.binary, + serverDeck.syncVersion, + ); + return { id: localDeck.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -267,15 +314,76 @@ export class ConflictResolver { } /** + * Merge deck using CRDT + */ + private async mergeDeckWithCrdt( + localDeck: LocalDeck, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalDeck> | null> { + try { + // Get local CRDT binary if it exists + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "deck", + localDeck.id, + ); + + // If no local CRDT binary, create one from local entity + const localDoc = localBinary + ? crdtDeckRepository.fromBinary(localBinary) + : crdtDeckRepository.toCrdtDocument(localDeck).doc; + + // Load server document + const serverDoc = crdtDeckRepository.fromBinary(serverBinary); + + // Merge documents + const mergeResult = crdtDeckRepository.merge(localDoc, serverDoc); + + return { + entity: crdtDeckRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn("CRDT merge failed for deck, falling back to LWW:", error); + return null; + } + } + + /** * Resolve card conflict using configured strategy */ async resolveCardConflict( localCard: LocalCard, serverCard: ServerCard, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeCardWithCrdt( + localCard, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalCard = { + ...mergeResult.entity, + _synced: true, + }; + await localCardRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "card", + localCard.id, + mergeResult.binary, + serverCard.syncVersion, + ); + return { id: localCard.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -303,15 +411,71 @@ export class ConflictResolver { } /** + * Merge card using CRDT + */ + private async mergeCardWithCrdt( + localCard: LocalCard, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalCard> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "card", + localCard.id, + ); + + const localDoc = localBinary + ? crdtCardRepository.fromBinary(localBinary) + : crdtCardRepository.toCrdtDocument(localCard).doc; + + const serverDoc = crdtCardRepository.fromBinary(serverBinary); + const mergeResult = crdtCardRepository.merge(localDoc, serverDoc); + + return { + entity: crdtCardRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn("CRDT merge failed for card, falling back to LWW:", error); + return null; + } + } + + /** * Resolve note type conflict using configured strategy */ async resolveNoteTypeConflict( localNoteType: LocalNoteType, serverNoteType: ServerNoteType, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteTypeWithCrdt( + localNoteType, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNoteType = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteTypeRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "noteType", + localNoteType.id, + mergeResult.binary, + serverNoteType.syncVersion, + ); + return { id: localNoteType.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -337,15 +501,74 @@ export class ConflictResolver { } /** + * Merge note type using CRDT + */ + private async mergeNoteTypeWithCrdt( + localNoteType: LocalNoteType, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNoteType> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "noteType", + localNoteType.id, + ); + + const localDoc = localBinary + ? crdtNoteTypeRepository.fromBinary(localBinary) + : crdtNoteTypeRepository.toCrdtDocument(localNoteType).doc; + + const serverDoc = crdtNoteTypeRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteTypeRepository.merge(localDoc, serverDoc); + + return { + entity: crdtNoteTypeRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn( + "CRDT merge failed for note type, falling back to LWW:", + error, + ); + return null; + } + } + + /** * Resolve note field type conflict using configured strategy */ async resolveNoteFieldTypeConflict( localFieldType: LocalNoteFieldType, serverFieldType: ServerNoteFieldType, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteFieldTypeWithCrdt( + localFieldType, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNoteFieldType = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteFieldTypeRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "noteFieldType", + localFieldType.id, + mergeResult.binary, + serverFieldType.syncVersion, + ); + return { id: localFieldType.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -371,15 +594,77 @@ export class ConflictResolver { } /** + * Merge note field type using CRDT + */ + private async mergeNoteFieldTypeWithCrdt( + localFieldType: LocalNoteFieldType, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNoteFieldType> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "noteFieldType", + localFieldType.id, + ); + + const localDoc = localBinary + ? crdtNoteFieldTypeRepository.fromBinary(localBinary) + : crdtNoteFieldTypeRepository.toCrdtDocument(localFieldType).doc; + + const serverDoc = crdtNoteFieldTypeRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteFieldTypeRepository.merge( + localDoc, + serverDoc, + ); + + return { + entity: crdtNoteFieldTypeRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn( + "CRDT merge failed for note field type, falling back to LWW:", + error, + ); + return null; + } + } + + /** * Resolve note conflict using configured strategy */ async resolveNoteConflict( localNote: LocalNote, serverNote: ServerNote, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteWithCrdt( + localNote, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNote = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "note", + localNote.id, + mergeResult.binary, + serverNote.syncVersion, + ); + return { id: localNote.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -405,15 +690,71 @@ export class ConflictResolver { } /** + * Merge note using CRDT + */ + private async mergeNoteWithCrdt( + localNote: LocalNote, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNote> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "note", + localNote.id, + ); + + const localDoc = localBinary + ? crdtNoteRepository.fromBinary(localBinary) + : crdtNoteRepository.toCrdtDocument(localNote).doc; + + const serverDoc = crdtNoteRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteRepository.merge(localDoc, serverDoc); + + return { + entity: crdtNoteRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn("CRDT merge failed for note, falling back to LWW:", error); + return null; + } + } + + /** * Resolve note field value conflict using configured strategy */ async resolveNoteFieldValueConflict( localFieldValue: LocalNoteFieldValue, serverFieldValue: ServerNoteFieldValue, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteFieldValueWithCrdt( + localFieldValue, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNoteFieldValue = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteFieldValueRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "noteFieldValue", + localFieldValue.id, + mergeResult.binary, + serverFieldValue.syncVersion, + ); + return { id: localFieldValue.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -439,8 +780,46 @@ export class ConflictResolver { } /** + * Merge note field value using CRDT + */ + private async mergeNoteFieldValueWithCrdt( + localFieldValue: LocalNoteFieldValue, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNoteFieldValue> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "noteFieldValue", + localFieldValue.id, + ); + + const localDoc = localBinary + ? crdtNoteFieldValueRepository.fromBinary(localBinary) + : crdtNoteFieldValueRepository.toCrdtDocument(localFieldValue).doc; + + const serverDoc = crdtNoteFieldValueRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteFieldValueRepository.merge( + localDoc, + serverDoc, + ); + + return { + entity: crdtNoteFieldValueRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn( + "CRDT merge failed for note field value, falling back to LWW:", + error, + ); + return null; + } + } + + /** * Resolve all conflicts from a push result * Uses pull result to get server data for conflicting items + * When CRDT changes are available, uses Automerge merge for resolution */ async resolveConflicts( pushResult: SyncPushResult, @@ -455,15 +834,42 @@ export class ConflictResolver { noteFieldValues: [], }; + // Build a map of CRDT payloads by document ID for quick lookup + const crdtPayloadMap = new Map<string, CrdtSyncPayload>(); + if (pullResult.crdtChanges) { + for (const payload of pullResult.crdtChanges) { + crdtPayloadMap.set(payload.documentId, payload); + } + } + + // Helper to get CRDT binary for an entity + const getCrdtBinary = ( + entityType: string, + entityId: string, + ): Uint8Array | undefined => { + const payload = crdtPayloadMap.get(`${entityType}:${entityId}`); + if (!payload) return undefined; + try { + return base64ToBinary(payload.binary); + } catch { + console.warn( + `Failed to decode base64 for ${entityType}:${entityId}, skipping CRDT merge`, + ); + return undefined; + } + }; + // Resolve deck conflicts for (const deckId of pushResult.conflicts.decks) { const localDeck = await localDeckRepository.findById(deckId); const serverDeck = pullResult.decks.find((d) => d.id === deckId); + const crdtBinary = getCrdtBinary("deck", deckId); if (localDeck && serverDeck) { const resolution = await this.resolveDeckConflict( localDeck, serverDeck, + crdtBinary, ); result.decks.push(resolution); } else if (serverDeck) { @@ -479,11 +885,13 @@ export class ConflictResolver { for (const cardId of pushResult.conflicts.cards) { const localCard = await localCardRepository.findById(cardId); const serverCard = pullResult.cards.find((c) => c.id === cardId); + const crdtBinary = getCrdtBinary("card", cardId); if (localCard && serverCard) { const resolution = await this.resolveCardConflict( localCard, serverCard, + crdtBinary, ); result.cards.push(resolution); } else if (serverCard) { @@ -501,11 +909,13 @@ export class ConflictResolver { const serverNoteType = pullResult.noteTypes.find( (nt) => nt.id === noteTypeId, ); + const crdtBinary = getCrdtBinary("noteType", noteTypeId); if (localNoteType && serverNoteType) { const resolution = await this.resolveNoteTypeConflict( localNoteType, serverNoteType, + crdtBinary, ); result.noteTypes.push(resolution); } else if (serverNoteType) { @@ -522,11 +932,13 @@ export class ConflictResolver { const serverFieldType = pullResult.noteFieldTypes.find( (ft) => ft.id === fieldTypeId, ); + const crdtBinary = getCrdtBinary("noteFieldType", fieldTypeId); if (localFieldType && serverFieldType) { const resolution = await this.resolveNoteFieldTypeConflict( localFieldType, serverFieldType, + crdtBinary, ); result.noteFieldTypes.push(resolution); } else if (serverFieldType) { @@ -543,11 +955,13 @@ export class ConflictResolver { for (const noteId of pushResult.conflicts.notes) { const localNote = await localNoteRepository.findById(noteId); const serverNote = pullResult.notes.find((n) => n.id === noteId); + const crdtBinary = getCrdtBinary("note", noteId); if (localNote && serverNote) { const resolution = await this.resolveNoteConflict( localNote, serverNote, + crdtBinary, ); result.notes.push(resolution); } else if (serverNote) { @@ -564,11 +978,13 @@ export class ConflictResolver { const serverFieldValue = pullResult.noteFieldValues.find( (fv) => fv.id === fieldValueId, ); + const crdtBinary = getCrdtBinary("noteFieldValue", fieldValueId); if (localFieldValue && serverFieldValue) { const resolution = await this.resolveNoteFieldValueConflict( localFieldValue, serverFieldValue, + crdtBinary, ); result.noteFieldValues.push(resolution); } else if (serverFieldValue) { @@ -595,8 +1011,9 @@ export function createConflictResolver( } /** - * Default conflict resolver using LWW (server wins) strategy + * Default conflict resolver using CRDT (Automerge) strategy + * Falls back to server_wins when CRDT data is unavailable */ export const conflictResolver = new ConflictResolver({ - strategy: "server_wins", + strategy: "crdt", }); diff --git a/src/client/sync/push.test.ts b/src/client/sync/push.test.ts index bce4652..19b39da 100644 --- a/src/client/sync/push.test.ts +++ b/src/client/sync/push.test.ts @@ -1140,7 +1140,10 @@ describe("generateCrdtChanges", () => { expect(crdtChanges[0]?.documentId).toBe("deck:deck-1"); expect(crdtChanges[0]?.binary).toBeDefined(); // Verify it's valid base64 - expect(() => base64ToBinary(crdtChanges[0]!.binary)).not.toThrow(); + const binary = crdtChanges[0]?.binary; + if (binary) { + expect(() => base64ToBinary(binary)).not.toThrow(); + } }); it("should generate CRDT changes for cards", () => { |
