diff options
Diffstat (limited to 'src/client/sync/conflict.ts')
| -rw-r--r-- | src/client/sync/conflict.ts | 437 |
1 files changed, 427 insertions, 10 deletions
diff --git a/src/client/sync/conflict.ts b/src/client/sync/conflict.ts index 2451920..a49e9b2 100644 --- a/src/client/sync/conflict.ts +++ b/src/client/sync/conflict.ts @@ -14,6 +14,17 @@ import { localNoteRepository, localNoteTypeRepository, } from "../db/repositories"; +import { + type CrdtSyncPayload, + crdtCardRepository, + crdtDeckRepository, + crdtNoteFieldTypeRepository, + crdtNoteFieldValueRepository, + crdtNoteRepository, + crdtNoteTypeRepository, + crdtSyncStateManager, +} from "./crdt"; +import { base64ToBinary } from "./crdt/sync-state"; import type { ServerCard, ServerDeck, @@ -51,15 +62,16 @@ export interface ConflictResolutionResult { export interface ConflictResolverOptions { /** * Strategy for resolving conflicts - * - "server_wins": Always use server data (default for LWW) + * - "crdt": Use Automerge CRDT merge for conflict-free resolution (default) + * - "server_wins": Always use server data (fallback when no CRDT data) * - "local_wins": Always use local data - * - "newer_wins": Compare timestamps and use newer data + * - "newer_wins": Compare timestamps and use newer data (legacy LWW) */ - strategy?: "server_wins" | "local_wins" | "newer_wins"; + strategy?: "crdt" | "server_wins" | "local_wins" | "newer_wins"; } /** - * Compare timestamps for LWW resolution + * Compare timestamps for LWW resolution (legacy fallback) * Returns true if server data is newer or equal */ function isServerNewer(serverUpdatedAt: Date, localUpdatedAt: Date): boolean { @@ -67,6 +79,15 @@ function isServerNewer(serverUpdatedAt: Date, localUpdatedAt: Date): boolean { } /** + * CRDT merge result with entity data + */ +interface CrdtMergeConflictResult<T> { + entity: Omit<T, "_synced">; + binary: Uint8Array; + hadLocalDocument: boolean; +} + +/** * Convert server deck to local format for storage */ function serverDeckToLocal(deck: ServerDeck): LocalDeck { @@ -191,15 +212,15 @@ function serverNoteFieldValueToLocal( * Handles conflicts reported by the server during push operations. * When a conflict occurs (server has newer data), this resolver: * 1. Identifies conflicting items from push result - * 2. Pulls latest server data for those items - * 3. Applies conflict resolution strategy (default: server wins / LWW) + * 2. Uses Automerge CRDT merge for conflict-free resolution (default) + * 3. Falls back to LWW strategies when CRDT data is unavailable * 4. Updates local database accordingly */ export class ConflictResolver { - private strategy: "server_wins" | "local_wins" | "newer_wins"; + private strategy: "crdt" | "server_wins" | "local_wins" | "newer_wins"; constructor(options: ConflictResolverOptions = {}) { - this.strategy = options.strategy ?? "server_wins"; + this.strategy = options.strategy ?? "crdt"; } /** @@ -236,10 +257,36 @@ export class ConflictResolver { async resolveDeckConflict( localDeck: LocalDeck, serverDeck: ServerDeck, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeDeckWithCrdt( + localDeck, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalDeck = { + ...mergeResult.entity, + _synced: true, + }; + await localDeckRepository.upsertFromServer(localData); + // Store the merged CRDT binary + await crdtSyncStateManager.setDocumentBinary( + "deck", + localDeck.id, + mergeResult.binary, + serverDeck.syncVersion, + ); + return { id: localDeck.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -267,15 +314,76 @@ export class ConflictResolver { } /** + * Merge deck using CRDT + */ + private async mergeDeckWithCrdt( + localDeck: LocalDeck, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalDeck> | null> { + try { + // Get local CRDT binary if it exists + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "deck", + localDeck.id, + ); + + // If no local CRDT binary, create one from local entity + const localDoc = localBinary + ? crdtDeckRepository.fromBinary(localBinary) + : crdtDeckRepository.toCrdtDocument(localDeck).doc; + + // Load server document + const serverDoc = crdtDeckRepository.fromBinary(serverBinary); + + // Merge documents + const mergeResult = crdtDeckRepository.merge(localDoc, serverDoc); + + return { + entity: crdtDeckRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn("CRDT merge failed for deck, falling back to LWW:", error); + return null; + } + } + + /** * Resolve card conflict using configured strategy */ async resolveCardConflict( localCard: LocalCard, serverCard: ServerCard, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeCardWithCrdt( + localCard, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalCard = { + ...mergeResult.entity, + _synced: true, + }; + await localCardRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "card", + localCard.id, + mergeResult.binary, + serverCard.syncVersion, + ); + return { id: localCard.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -303,15 +411,71 @@ export class ConflictResolver { } /** + * Merge card using CRDT + */ + private async mergeCardWithCrdt( + localCard: LocalCard, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalCard> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "card", + localCard.id, + ); + + const localDoc = localBinary + ? crdtCardRepository.fromBinary(localBinary) + : crdtCardRepository.toCrdtDocument(localCard).doc; + + const serverDoc = crdtCardRepository.fromBinary(serverBinary); + const mergeResult = crdtCardRepository.merge(localDoc, serverDoc); + + return { + entity: crdtCardRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn("CRDT merge failed for card, falling back to LWW:", error); + return null; + } + } + + /** * Resolve note type conflict using configured strategy */ async resolveNoteTypeConflict( localNoteType: LocalNoteType, serverNoteType: ServerNoteType, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteTypeWithCrdt( + localNoteType, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNoteType = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteTypeRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "noteType", + localNoteType.id, + mergeResult.binary, + serverNoteType.syncVersion, + ); + return { id: localNoteType.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -337,15 +501,74 @@ export class ConflictResolver { } /** + * Merge note type using CRDT + */ + private async mergeNoteTypeWithCrdt( + localNoteType: LocalNoteType, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNoteType> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "noteType", + localNoteType.id, + ); + + const localDoc = localBinary + ? crdtNoteTypeRepository.fromBinary(localBinary) + : crdtNoteTypeRepository.toCrdtDocument(localNoteType).doc; + + const serverDoc = crdtNoteTypeRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteTypeRepository.merge(localDoc, serverDoc); + + return { + entity: crdtNoteTypeRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn( + "CRDT merge failed for note type, falling back to LWW:", + error, + ); + return null; + } + } + + /** * Resolve note field type conflict using configured strategy */ async resolveNoteFieldTypeConflict( localFieldType: LocalNoteFieldType, serverFieldType: ServerNoteFieldType, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteFieldTypeWithCrdt( + localFieldType, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNoteFieldType = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteFieldTypeRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "noteFieldType", + localFieldType.id, + mergeResult.binary, + serverFieldType.syncVersion, + ); + return { id: localFieldType.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -371,15 +594,77 @@ export class ConflictResolver { } /** + * Merge note field type using CRDT + */ + private async mergeNoteFieldTypeWithCrdt( + localFieldType: LocalNoteFieldType, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNoteFieldType> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "noteFieldType", + localFieldType.id, + ); + + const localDoc = localBinary + ? crdtNoteFieldTypeRepository.fromBinary(localBinary) + : crdtNoteFieldTypeRepository.toCrdtDocument(localFieldType).doc; + + const serverDoc = crdtNoteFieldTypeRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteFieldTypeRepository.merge( + localDoc, + serverDoc, + ); + + return { + entity: crdtNoteFieldTypeRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn( + "CRDT merge failed for note field type, falling back to LWW:", + error, + ); + return null; + } + } + + /** * Resolve note conflict using configured strategy */ async resolveNoteConflict( localNote: LocalNote, serverNote: ServerNote, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteWithCrdt( + localNote, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNote = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "note", + localNote.id, + mergeResult.binary, + serverNote.syncVersion, + ); + return { id: localNote.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -405,15 +690,71 @@ export class ConflictResolver { } /** + * Merge note using CRDT + */ + private async mergeNoteWithCrdt( + localNote: LocalNote, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNote> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "note", + localNote.id, + ); + + const localDoc = localBinary + ? crdtNoteRepository.fromBinary(localBinary) + : crdtNoteRepository.toCrdtDocument(localNote).doc; + + const serverDoc = crdtNoteRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteRepository.merge(localDoc, serverDoc); + + return { + entity: crdtNoteRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn("CRDT merge failed for note, falling back to LWW:", error); + return null; + } + } + + /** * Resolve note field value conflict using configured strategy */ async resolveNoteFieldValueConflict( localFieldValue: LocalNoteFieldValue, serverFieldValue: ServerNoteFieldValue, + serverCrdtBinary?: Uint8Array, ): Promise<ConflictResolutionItem> { + // Try CRDT merge first if strategy is "crdt" and we have CRDT data + if (this.strategy === "crdt" && serverCrdtBinary) { + const mergeResult = await this.mergeNoteFieldValueWithCrdt( + localFieldValue, + serverCrdtBinary, + ); + if (mergeResult) { + const localData: LocalNoteFieldValue = { + ...mergeResult.entity, + _synced: true, + }; + await localNoteFieldValueRepository.upsertFromServer(localData); + await crdtSyncStateManager.setDocumentBinary( + "noteFieldValue", + localFieldValue.id, + mergeResult.binary, + serverFieldValue.syncVersion, + ); + return { id: localFieldValue.id, resolution: "server_wins" }; + } + } + + // Fallback to LWW strategies let resolution: "server_wins" | "local_wins"; switch (this.strategy) { + case "crdt": case "server_wins": resolution = "server_wins"; break; @@ -439,8 +780,46 @@ export class ConflictResolver { } /** + * Merge note field value using CRDT + */ + private async mergeNoteFieldValueWithCrdt( + localFieldValue: LocalNoteFieldValue, + serverBinary: Uint8Array, + ): Promise<CrdtMergeConflictResult<LocalNoteFieldValue> | null> { + try { + const localBinary = await crdtSyncStateManager.getDocumentBinary( + "noteFieldValue", + localFieldValue.id, + ); + + const localDoc = localBinary + ? crdtNoteFieldValueRepository.fromBinary(localBinary) + : crdtNoteFieldValueRepository.toCrdtDocument(localFieldValue).doc; + + const serverDoc = crdtNoteFieldValueRepository.fromBinary(serverBinary); + const mergeResult = crdtNoteFieldValueRepository.merge( + localDoc, + serverDoc, + ); + + return { + entity: crdtNoteFieldValueRepository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hadLocalDocument: localBinary !== null, + }; + } catch (error) { + console.warn( + "CRDT merge failed for note field value, falling back to LWW:", + error, + ); + return null; + } + } + + /** * Resolve all conflicts from a push result * Uses pull result to get server data for conflicting items + * When CRDT changes are available, uses Automerge merge for resolution */ async resolveConflicts( pushResult: SyncPushResult, @@ -455,15 +834,42 @@ export class ConflictResolver { noteFieldValues: [], }; + // Build a map of CRDT payloads by document ID for quick lookup + const crdtPayloadMap = new Map<string, CrdtSyncPayload>(); + if (pullResult.crdtChanges) { + for (const payload of pullResult.crdtChanges) { + crdtPayloadMap.set(payload.documentId, payload); + } + } + + // Helper to get CRDT binary for an entity + const getCrdtBinary = ( + entityType: string, + entityId: string, + ): Uint8Array | undefined => { + const payload = crdtPayloadMap.get(`${entityType}:${entityId}`); + if (!payload) return undefined; + try { + return base64ToBinary(payload.binary); + } catch { + console.warn( + `Failed to decode base64 for ${entityType}:${entityId}, skipping CRDT merge`, + ); + return undefined; + } + }; + // Resolve deck conflicts for (const deckId of pushResult.conflicts.decks) { const localDeck = await localDeckRepository.findById(deckId); const serverDeck = pullResult.decks.find((d) => d.id === deckId); + const crdtBinary = getCrdtBinary("deck", deckId); if (localDeck && serverDeck) { const resolution = await this.resolveDeckConflict( localDeck, serverDeck, + crdtBinary, ); result.decks.push(resolution); } else if (serverDeck) { @@ -479,11 +885,13 @@ export class ConflictResolver { for (const cardId of pushResult.conflicts.cards) { const localCard = await localCardRepository.findById(cardId); const serverCard = pullResult.cards.find((c) => c.id === cardId); + const crdtBinary = getCrdtBinary("card", cardId); if (localCard && serverCard) { const resolution = await this.resolveCardConflict( localCard, serverCard, + crdtBinary, ); result.cards.push(resolution); } else if (serverCard) { @@ -501,11 +909,13 @@ export class ConflictResolver { const serverNoteType = pullResult.noteTypes.find( (nt) => nt.id === noteTypeId, ); + const crdtBinary = getCrdtBinary("noteType", noteTypeId); if (localNoteType && serverNoteType) { const resolution = await this.resolveNoteTypeConflict( localNoteType, serverNoteType, + crdtBinary, ); result.noteTypes.push(resolution); } else if (serverNoteType) { @@ -522,11 +932,13 @@ export class ConflictResolver { const serverFieldType = pullResult.noteFieldTypes.find( (ft) => ft.id === fieldTypeId, ); + const crdtBinary = getCrdtBinary("noteFieldType", fieldTypeId); if (localFieldType && serverFieldType) { const resolution = await this.resolveNoteFieldTypeConflict( localFieldType, serverFieldType, + crdtBinary, ); result.noteFieldTypes.push(resolution); } else if (serverFieldType) { @@ -543,11 +955,13 @@ export class ConflictResolver { for (const noteId of pushResult.conflicts.notes) { const localNote = await localNoteRepository.findById(noteId); const serverNote = pullResult.notes.find((n) => n.id === noteId); + const crdtBinary = getCrdtBinary("note", noteId); if (localNote && serverNote) { const resolution = await this.resolveNoteConflict( localNote, serverNote, + crdtBinary, ); result.notes.push(resolution); } else if (serverNote) { @@ -564,11 +978,13 @@ export class ConflictResolver { const serverFieldValue = pullResult.noteFieldValues.find( (fv) => fv.id === fieldValueId, ); + const crdtBinary = getCrdtBinary("noteFieldValue", fieldValueId); if (localFieldValue && serverFieldValue) { const resolution = await this.resolveNoteFieldValueConflict( localFieldValue, serverFieldValue, + crdtBinary, ); result.noteFieldValues.push(resolution); } else if (serverFieldValue) { @@ -595,8 +1011,9 @@ export function createConflictResolver( } /** - * Default conflict resolver using LWW (server wins) strategy + * Default conflict resolver using CRDT (Automerge) strategy + * Falls back to server_wins when CRDT data is unavailable */ export const conflictResolver = new ConflictResolver({ - strategy: "server_wins", + strategy: "crdt", }); |
