aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--docs/dev/roadmap.md2
-rw-r--r--src/client/sync/conflict.test.ts269
-rw-r--r--src/client/sync/conflict.ts437
-rw-r--r--src/client/sync/push.test.ts5
4 files changed, 699 insertions, 14 deletions
diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md
index 9251e15..d2a4d3b 100644
--- a/docs/dev/roadmap.md
+++ b/docs/dev/roadmap.md
@@ -25,7 +25,7 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD
- [x] Modify `src/client/sync/push.ts` - Add crdtChanges to push payload
- [x] Modify `src/client/sync/pull.ts` - Handle crdtChanges in pull response
-- [ ] Modify `src/client/sync/conflict.ts` - Replace LWW with Automerge merge
+- [x] Modify `src/client/sync/conflict.ts` - Replace LWW with Automerge merge
- [ ] Modify `src/client/sync/manager.ts` - Integrate CRDT sync flow
### Phase 4: Server-Side CRDT Support
diff --git a/src/client/sync/conflict.test.ts b/src/client/sync/conflict.test.ts
index 52362ff..e648373 100644
--- a/src/client/sync/conflict.test.ts
+++ b/src/client/sync/conflict.test.ts
@@ -6,6 +6,12 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { CardState, db } from "../db/index";
import { localCardRepository, localDeckRepository } from "../db/repositories";
import { ConflictResolver } from "./conflict";
+import {
+ binaryToBase64,
+ crdtDeckRepository,
+ crdtSyncDb,
+ crdtSyncStateManager,
+} from "./crdt";
import type { SyncPullResult } from "./pull";
import type { SyncPushResult } from "./push";
@@ -37,6 +43,8 @@ describe("ConflictResolver", () => {
await db.decks.clear();
await db.cards.clear();
await db.reviewLogs.clear();
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
localStorage.clear();
});
@@ -44,6 +52,8 @@ describe("ConflictResolver", () => {
await db.decks.clear();
await db.cards.clear();
await db.reviewLogs.clear();
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
localStorage.clear();
});
@@ -581,7 +591,7 @@ describe("ConflictResolver", () => {
expect(localDeck?.name).toBe("Local Only Deck");
});
- it("should default to server_wins strategy", async () => {
+ it("should default to crdt strategy", async () => {
const deck = await localDeckRepository.create({
userId: "user-1",
name: "Local Name",
@@ -619,7 +629,8 @@ describe("ConflictResolver", () => {
...createEmptyPullResult(5),
};
- // Create resolver without explicit strategy
+ // Create resolver without explicit strategy - defaults to CRDT
+ // Without CRDT data, falls back to server_wins behavior
const resolver = new ConflictResolver();
const result = await resolver.resolveConflicts(pushResult, pullResult);
@@ -629,4 +640,258 @@ describe("ConflictResolver", () => {
expect(updatedDeck?.name).toBe("Server Name");
});
});
+
+ describe("CRDT conflict resolution", () => {
+ it("should merge deck using CRDT when crdtChanges are provided", async () => {
+ // Create a local deck
+ const localDeck = await localDeckRepository.create({
+ userId: "user-1",
+ name: "Local Deck Name",
+ description: "Local description",
+ newCardsPerDay: 10,
+ });
+
+ // Store local CRDT document
+ const localCrdtResult = crdtDeckRepository.toCrdtDocument(localDeck);
+ await crdtSyncStateManager.setDocumentBinary(
+ "deck",
+ localDeck.id,
+ localCrdtResult.binary,
+ 1,
+ );
+
+ // Create a "server" version with different data
+ const serverDeckData = {
+ id: localDeck.id,
+ userId: "user-1",
+ name: "Server Deck Name",
+ description: "Server description",
+ newCardsPerDay: 20,
+ createdAt: localDeck.createdAt,
+ updatedAt: new Date(Date.now() + 1000),
+ deletedAt: null,
+ syncVersion: 5,
+ };
+
+ // Create server CRDT document
+ const serverCrdtResult = crdtDeckRepository.toCrdtDocument({
+ ...serverDeckData,
+ _synced: true,
+ });
+
+ const pushResult: SyncPushResult = {
+ decks: [{ id: localDeck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] },
+ };
+
+ const pullResult: SyncPullResult = {
+ decks: [serverDeckData],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(5),
+ crdtChanges: [
+ {
+ documentId: `deck:${localDeck.id}`,
+ entityType: "deck",
+ entityId: localDeck.id,
+ binary: binaryToBase64(serverCrdtResult.binary),
+ },
+ ],
+ };
+
+ const resolver = new ConflictResolver({ strategy: "crdt" });
+ const result = await resolver.resolveConflicts(pushResult, pullResult);
+
+ expect(result.decks).toHaveLength(1);
+ expect(result.decks[0]?.resolution).toBe("server_wins");
+
+ // Verify the CRDT sync state was updated
+ const storedBinary = await crdtSyncStateManager.getDocumentBinary(
+ "deck",
+ localDeck.id,
+ );
+ expect(storedBinary).toBeDefined();
+ });
+
+ it("should fall back to server_wins when CRDT merge fails", async () => {
+ const localDeck = await localDeckRepository.create({
+ userId: "user-1",
+ name: "Local Name",
+ description: null,
+ newCardsPerDay: 10,
+ });
+
+ const serverDeck = {
+ id: localDeck.id,
+ userId: "user-1",
+ name: "Server Name",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ deletedAt: null,
+ syncVersion: 5,
+ };
+
+ const pushResult: SyncPushResult = {
+ decks: [{ id: localDeck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] },
+ };
+
+ const pullResult: SyncPullResult = {
+ decks: [serverDeck],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(5),
+ crdtChanges: [
+ {
+ documentId: `deck:${localDeck.id}`,
+ entityType: "deck",
+ entityId: localDeck.id,
+ // Invalid base64 - should trigger fallback
+ binary: "invalid-base64-data!!!",
+ },
+ ],
+ };
+
+ const resolver = new ConflictResolver({ strategy: "crdt" });
+ const result = await resolver.resolveConflicts(pushResult, pullResult);
+
+ // Should still resolve using fallback
+ expect(result.decks).toHaveLength(1);
+ expect(result.decks[0]?.resolution).toBe("server_wins");
+
+ // Server data should be applied
+ const updatedDeck = await localDeckRepository.findById(localDeck.id);
+ expect(updatedDeck?.name).toBe("Server Name");
+ });
+
+ it("should fall back to server_wins when no CRDT data is available", async () => {
+ const localDeck = await localDeckRepository.create({
+ userId: "user-1",
+ name: "Local Name",
+ description: null,
+ newCardsPerDay: 10,
+ });
+
+ const serverDeck = {
+ id: localDeck.id,
+ userId: "user-1",
+ name: "Server Name",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ deletedAt: null,
+ syncVersion: 5,
+ };
+
+ const pushResult: SyncPushResult = {
+ decks: [{ id: localDeck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] },
+ };
+
+ // No crdtChanges in pull result
+ const pullResult: SyncPullResult = {
+ decks: [serverDeck],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(5),
+ };
+
+ const resolver = new ConflictResolver({ strategy: "crdt" });
+ const result = await resolver.resolveConflicts(pushResult, pullResult);
+
+ expect(result.decks).toHaveLength(1);
+ expect(result.decks[0]?.resolution).toBe("server_wins");
+
+ const updatedDeck = await localDeckRepository.findById(localDeck.id);
+ expect(updatedDeck?.name).toBe("Server Name");
+ });
+
+ it("should use CRDT to merge when local has no existing CRDT document", async () => {
+ // Create a local deck without a CRDT document
+ const localDeck = await localDeckRepository.create({
+ userId: "user-1",
+ name: "Local Name",
+ description: null,
+ newCardsPerDay: 10,
+ });
+
+ const serverDeck = {
+ id: localDeck.id,
+ userId: "user-1",
+ name: "Server Name",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ deletedAt: null,
+ syncVersion: 5,
+ };
+
+ // Create server CRDT document
+ const serverCrdtResult = crdtDeckRepository.toCrdtDocument({
+ ...serverDeck,
+ _synced: true,
+ });
+
+ const pushResult: SyncPushResult = {
+ decks: [{ id: localDeck.id, syncVersion: 1 }],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ conflicts: { ...createEmptyConflicts(), decks: [localDeck.id] },
+ };
+
+ const pullResult: SyncPullResult = {
+ decks: [serverDeck],
+ cards: [],
+ reviewLogs: [],
+ ...createEmptyPullResult(5),
+ crdtChanges: [
+ {
+ documentId: `deck:${localDeck.id}`,
+ entityType: "deck",
+ entityId: localDeck.id,
+ binary: binaryToBase64(serverCrdtResult.binary),
+ },
+ ],
+ };
+
+ const resolver = new ConflictResolver({ strategy: "crdt" });
+ const result = await resolver.resolveConflicts(pushResult, pullResult);
+
+ expect(result.decks).toHaveLength(1);
+ expect(result.decks[0]?.resolution).toBe("server_wins");
+
+ // Verify CRDT document was stored
+ const storedBinary = await crdtSyncStateManager.getDocumentBinary(
+ "deck",
+ localDeck.id,
+ );
+ expect(storedBinary).toBeDefined();
+ });
+ });
});
diff --git a/src/client/sync/conflict.ts b/src/client/sync/conflict.ts
index 2451920..a49e9b2 100644
--- a/src/client/sync/conflict.ts
+++ b/src/client/sync/conflict.ts
@@ -14,6 +14,17 @@ import {
localNoteRepository,
localNoteTypeRepository,
} from "../db/repositories";
+import {
+ type CrdtSyncPayload,
+ crdtCardRepository,
+ crdtDeckRepository,
+ crdtNoteFieldTypeRepository,
+ crdtNoteFieldValueRepository,
+ crdtNoteRepository,
+ crdtNoteTypeRepository,
+ crdtSyncStateManager,
+} from "./crdt";
+import { base64ToBinary } from "./crdt/sync-state";
import type {
ServerCard,
ServerDeck,
@@ -51,15 +62,16 @@ export interface ConflictResolutionResult {
export interface ConflictResolverOptions {
/**
* Strategy for resolving conflicts
- * - "server_wins": Always use server data (default for LWW)
+ * - "crdt": Use Automerge CRDT merge for conflict-free resolution (default)
+ * - "server_wins": Always use server data (fallback when no CRDT data)
* - "local_wins": Always use local data
- * - "newer_wins": Compare timestamps and use newer data
+ * - "newer_wins": Compare timestamps and use newer data (legacy LWW)
*/
- strategy?: "server_wins" | "local_wins" | "newer_wins";
+ strategy?: "crdt" | "server_wins" | "local_wins" | "newer_wins";
}
/**
- * Compare timestamps for LWW resolution
+ * Compare timestamps for LWW resolution (legacy fallback)
* Returns true if server data is newer or equal
*/
function isServerNewer(serverUpdatedAt: Date, localUpdatedAt: Date): boolean {
@@ -67,6 +79,15 @@ function isServerNewer(serverUpdatedAt: Date, localUpdatedAt: Date): boolean {
}
/**
+ * CRDT merge result with entity data
+ */
+interface CrdtMergeConflictResult<T> {
+ entity: Omit<T, "_synced">;
+ binary: Uint8Array;
+ hadLocalDocument: boolean;
+}
+
+/**
* Convert server deck to local format for storage
*/
function serverDeckToLocal(deck: ServerDeck): LocalDeck {
@@ -191,15 +212,15 @@ function serverNoteFieldValueToLocal(
* Handles conflicts reported by the server during push operations.
* When a conflict occurs (server has newer data), this resolver:
* 1. Identifies conflicting items from push result
- * 2. Pulls latest server data for those items
- * 3. Applies conflict resolution strategy (default: server wins / LWW)
+ * 2. Uses Automerge CRDT merge for conflict-free resolution (default)
+ * 3. Falls back to LWW strategies when CRDT data is unavailable
* 4. Updates local database accordingly
*/
export class ConflictResolver {
- private strategy: "server_wins" | "local_wins" | "newer_wins";
+ private strategy: "crdt" | "server_wins" | "local_wins" | "newer_wins";
constructor(options: ConflictResolverOptions = {}) {
- this.strategy = options.strategy ?? "server_wins";
+ this.strategy = options.strategy ?? "crdt";
}
/**
@@ -236,10 +257,36 @@ export class ConflictResolver {
async resolveDeckConflict(
localDeck: LocalDeck,
serverDeck: ServerDeck,
+ serverCrdtBinary?: Uint8Array,
): Promise<ConflictResolutionItem> {
+ // Try CRDT merge first if strategy is "crdt" and we have CRDT data
+ if (this.strategy === "crdt" && serverCrdtBinary) {
+ const mergeResult = await this.mergeDeckWithCrdt(
+ localDeck,
+ serverCrdtBinary,
+ );
+ if (mergeResult) {
+ const localData: LocalDeck = {
+ ...mergeResult.entity,
+ _synced: true,
+ };
+ await localDeckRepository.upsertFromServer(localData);
+ // Store the merged CRDT binary
+ await crdtSyncStateManager.setDocumentBinary(
+ "deck",
+ localDeck.id,
+ mergeResult.binary,
+ serverDeck.syncVersion,
+ );
+ return { id: localDeck.id, resolution: "server_wins" };
+ }
+ }
+
+ // Fallback to LWW strategies
let resolution: "server_wins" | "local_wins";
switch (this.strategy) {
+ case "crdt":
case "server_wins":
resolution = "server_wins";
break;
@@ -267,15 +314,76 @@ export class ConflictResolver {
}
/**
+ * Merge deck using CRDT
+ */
+ private async mergeDeckWithCrdt(
+ localDeck: LocalDeck,
+ serverBinary: Uint8Array,
+ ): Promise<CrdtMergeConflictResult<LocalDeck> | null> {
+ try {
+ // Get local CRDT binary if it exists
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ "deck",
+ localDeck.id,
+ );
+
+ // If no local CRDT binary, create one from local entity
+ const localDoc = localBinary
+ ? crdtDeckRepository.fromBinary(localBinary)
+ : crdtDeckRepository.toCrdtDocument(localDeck).doc;
+
+ // Load server document
+ const serverDoc = crdtDeckRepository.fromBinary(serverBinary);
+
+ // Merge documents
+ const mergeResult = crdtDeckRepository.merge(localDoc, serverDoc);
+
+ return {
+ entity: crdtDeckRepository.toLocalEntity(mergeResult.merged),
+ binary: mergeResult.binary,
+ hadLocalDocument: localBinary !== null,
+ };
+ } catch (error) {
+ console.warn("CRDT merge failed for deck, falling back to LWW:", error);
+ return null;
+ }
+ }
+
+ /**
* Resolve card conflict using configured strategy
*/
async resolveCardConflict(
localCard: LocalCard,
serverCard: ServerCard,
+ serverCrdtBinary?: Uint8Array,
): Promise<ConflictResolutionItem> {
+ // Try CRDT merge first if strategy is "crdt" and we have CRDT data
+ if (this.strategy === "crdt" && serverCrdtBinary) {
+ const mergeResult = await this.mergeCardWithCrdt(
+ localCard,
+ serverCrdtBinary,
+ );
+ if (mergeResult) {
+ const localData: LocalCard = {
+ ...mergeResult.entity,
+ _synced: true,
+ };
+ await localCardRepository.upsertFromServer(localData);
+ await crdtSyncStateManager.setDocumentBinary(
+ "card",
+ localCard.id,
+ mergeResult.binary,
+ serverCard.syncVersion,
+ );
+ return { id: localCard.id, resolution: "server_wins" };
+ }
+ }
+
+ // Fallback to LWW strategies
let resolution: "server_wins" | "local_wins";
switch (this.strategy) {
+ case "crdt":
case "server_wins":
resolution = "server_wins";
break;
@@ -303,15 +411,71 @@ export class ConflictResolver {
}
/**
+ * Merge card using CRDT
+ */
+ private async mergeCardWithCrdt(
+ localCard: LocalCard,
+ serverBinary: Uint8Array,
+ ): Promise<CrdtMergeConflictResult<LocalCard> | null> {
+ try {
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ "card",
+ localCard.id,
+ );
+
+ const localDoc = localBinary
+ ? crdtCardRepository.fromBinary(localBinary)
+ : crdtCardRepository.toCrdtDocument(localCard).doc;
+
+ const serverDoc = crdtCardRepository.fromBinary(serverBinary);
+ const mergeResult = crdtCardRepository.merge(localDoc, serverDoc);
+
+ return {
+ entity: crdtCardRepository.toLocalEntity(mergeResult.merged),
+ binary: mergeResult.binary,
+ hadLocalDocument: localBinary !== null,
+ };
+ } catch (error) {
+ console.warn("CRDT merge failed for card, falling back to LWW:", error);
+ return null;
+ }
+ }
+
+ /**
* Resolve note type conflict using configured strategy
*/
async resolveNoteTypeConflict(
localNoteType: LocalNoteType,
serverNoteType: ServerNoteType,
+ serverCrdtBinary?: Uint8Array,
): Promise<ConflictResolutionItem> {
+ // Try CRDT merge first if strategy is "crdt" and we have CRDT data
+ if (this.strategy === "crdt" && serverCrdtBinary) {
+ const mergeResult = await this.mergeNoteTypeWithCrdt(
+ localNoteType,
+ serverCrdtBinary,
+ );
+ if (mergeResult) {
+ const localData: LocalNoteType = {
+ ...mergeResult.entity,
+ _synced: true,
+ };
+ await localNoteTypeRepository.upsertFromServer(localData);
+ await crdtSyncStateManager.setDocumentBinary(
+ "noteType",
+ localNoteType.id,
+ mergeResult.binary,
+ serverNoteType.syncVersion,
+ );
+ return { id: localNoteType.id, resolution: "server_wins" };
+ }
+ }
+
+ // Fallback to LWW strategies
let resolution: "server_wins" | "local_wins";
switch (this.strategy) {
+ case "crdt":
case "server_wins":
resolution = "server_wins";
break;
@@ -337,15 +501,74 @@ export class ConflictResolver {
}
/**
+ * Merge note type using CRDT
+ */
+ private async mergeNoteTypeWithCrdt(
+ localNoteType: LocalNoteType,
+ serverBinary: Uint8Array,
+ ): Promise<CrdtMergeConflictResult<LocalNoteType> | null> {
+ try {
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ "noteType",
+ localNoteType.id,
+ );
+
+ const localDoc = localBinary
+ ? crdtNoteTypeRepository.fromBinary(localBinary)
+ : crdtNoteTypeRepository.toCrdtDocument(localNoteType).doc;
+
+ const serverDoc = crdtNoteTypeRepository.fromBinary(serverBinary);
+ const mergeResult = crdtNoteTypeRepository.merge(localDoc, serverDoc);
+
+ return {
+ entity: crdtNoteTypeRepository.toLocalEntity(mergeResult.merged),
+ binary: mergeResult.binary,
+ hadLocalDocument: localBinary !== null,
+ };
+ } catch (error) {
+ console.warn(
+ "CRDT merge failed for note type, falling back to LWW:",
+ error,
+ );
+ return null;
+ }
+ }
+
+ /**
* Resolve note field type conflict using configured strategy
*/
async resolveNoteFieldTypeConflict(
localFieldType: LocalNoteFieldType,
serverFieldType: ServerNoteFieldType,
+ serverCrdtBinary?: Uint8Array,
): Promise<ConflictResolutionItem> {
+ // Try CRDT merge first if strategy is "crdt" and we have CRDT data
+ if (this.strategy === "crdt" && serverCrdtBinary) {
+ const mergeResult = await this.mergeNoteFieldTypeWithCrdt(
+ localFieldType,
+ serverCrdtBinary,
+ );
+ if (mergeResult) {
+ const localData: LocalNoteFieldType = {
+ ...mergeResult.entity,
+ _synced: true,
+ };
+ await localNoteFieldTypeRepository.upsertFromServer(localData);
+ await crdtSyncStateManager.setDocumentBinary(
+ "noteFieldType",
+ localFieldType.id,
+ mergeResult.binary,
+ serverFieldType.syncVersion,
+ );
+ return { id: localFieldType.id, resolution: "server_wins" };
+ }
+ }
+
+ // Fallback to LWW strategies
let resolution: "server_wins" | "local_wins";
switch (this.strategy) {
+ case "crdt":
case "server_wins":
resolution = "server_wins";
break;
@@ -371,15 +594,77 @@ export class ConflictResolver {
}
/**
+ * Merge note field type using CRDT
+ */
+ private async mergeNoteFieldTypeWithCrdt(
+ localFieldType: LocalNoteFieldType,
+ serverBinary: Uint8Array,
+ ): Promise<CrdtMergeConflictResult<LocalNoteFieldType> | null> {
+ try {
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ "noteFieldType",
+ localFieldType.id,
+ );
+
+ const localDoc = localBinary
+ ? crdtNoteFieldTypeRepository.fromBinary(localBinary)
+ : crdtNoteFieldTypeRepository.toCrdtDocument(localFieldType).doc;
+
+ const serverDoc = crdtNoteFieldTypeRepository.fromBinary(serverBinary);
+ const mergeResult = crdtNoteFieldTypeRepository.merge(
+ localDoc,
+ serverDoc,
+ );
+
+ return {
+ entity: crdtNoteFieldTypeRepository.toLocalEntity(mergeResult.merged),
+ binary: mergeResult.binary,
+ hadLocalDocument: localBinary !== null,
+ };
+ } catch (error) {
+ console.warn(
+ "CRDT merge failed for note field type, falling back to LWW:",
+ error,
+ );
+ return null;
+ }
+ }
+
+ /**
* Resolve note conflict using configured strategy
*/
async resolveNoteConflict(
localNote: LocalNote,
serverNote: ServerNote,
+ serverCrdtBinary?: Uint8Array,
): Promise<ConflictResolutionItem> {
+ // Try CRDT merge first if strategy is "crdt" and we have CRDT data
+ if (this.strategy === "crdt" && serverCrdtBinary) {
+ const mergeResult = await this.mergeNoteWithCrdt(
+ localNote,
+ serverCrdtBinary,
+ );
+ if (mergeResult) {
+ const localData: LocalNote = {
+ ...mergeResult.entity,
+ _synced: true,
+ };
+ await localNoteRepository.upsertFromServer(localData);
+ await crdtSyncStateManager.setDocumentBinary(
+ "note",
+ localNote.id,
+ mergeResult.binary,
+ serverNote.syncVersion,
+ );
+ return { id: localNote.id, resolution: "server_wins" };
+ }
+ }
+
+ // Fallback to LWW strategies
let resolution: "server_wins" | "local_wins";
switch (this.strategy) {
+ case "crdt":
case "server_wins":
resolution = "server_wins";
break;
@@ -405,15 +690,71 @@ export class ConflictResolver {
}
/**
+ * Merge note using CRDT
+ */
+ private async mergeNoteWithCrdt(
+ localNote: LocalNote,
+ serverBinary: Uint8Array,
+ ): Promise<CrdtMergeConflictResult<LocalNote> | null> {
+ try {
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ "note",
+ localNote.id,
+ );
+
+ const localDoc = localBinary
+ ? crdtNoteRepository.fromBinary(localBinary)
+ : crdtNoteRepository.toCrdtDocument(localNote).doc;
+
+ const serverDoc = crdtNoteRepository.fromBinary(serverBinary);
+ const mergeResult = crdtNoteRepository.merge(localDoc, serverDoc);
+
+ return {
+ entity: crdtNoteRepository.toLocalEntity(mergeResult.merged),
+ binary: mergeResult.binary,
+ hadLocalDocument: localBinary !== null,
+ };
+ } catch (error) {
+ console.warn("CRDT merge failed for note, falling back to LWW:", error);
+ return null;
+ }
+ }
+
+ /**
* Resolve note field value conflict using configured strategy
*/
async resolveNoteFieldValueConflict(
localFieldValue: LocalNoteFieldValue,
serverFieldValue: ServerNoteFieldValue,
+ serverCrdtBinary?: Uint8Array,
): Promise<ConflictResolutionItem> {
+ // Try CRDT merge first if strategy is "crdt" and we have CRDT data
+ if (this.strategy === "crdt" && serverCrdtBinary) {
+ const mergeResult = await this.mergeNoteFieldValueWithCrdt(
+ localFieldValue,
+ serverCrdtBinary,
+ );
+ if (mergeResult) {
+ const localData: LocalNoteFieldValue = {
+ ...mergeResult.entity,
+ _synced: true,
+ };
+ await localNoteFieldValueRepository.upsertFromServer(localData);
+ await crdtSyncStateManager.setDocumentBinary(
+ "noteFieldValue",
+ localFieldValue.id,
+ mergeResult.binary,
+ serverFieldValue.syncVersion,
+ );
+ return { id: localFieldValue.id, resolution: "server_wins" };
+ }
+ }
+
+ // Fallback to LWW strategies
let resolution: "server_wins" | "local_wins";
switch (this.strategy) {
+ case "crdt":
case "server_wins":
resolution = "server_wins";
break;
@@ -439,8 +780,46 @@ export class ConflictResolver {
}
/**
+ * Merge note field value using CRDT
+ */
+ private async mergeNoteFieldValueWithCrdt(
+ localFieldValue: LocalNoteFieldValue,
+ serverBinary: Uint8Array,
+ ): Promise<CrdtMergeConflictResult<LocalNoteFieldValue> | null> {
+ try {
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ "noteFieldValue",
+ localFieldValue.id,
+ );
+
+ const localDoc = localBinary
+ ? crdtNoteFieldValueRepository.fromBinary(localBinary)
+ : crdtNoteFieldValueRepository.toCrdtDocument(localFieldValue).doc;
+
+ const serverDoc = crdtNoteFieldValueRepository.fromBinary(serverBinary);
+ const mergeResult = crdtNoteFieldValueRepository.merge(
+ localDoc,
+ serverDoc,
+ );
+
+ return {
+ entity: crdtNoteFieldValueRepository.toLocalEntity(mergeResult.merged),
+ binary: mergeResult.binary,
+ hadLocalDocument: localBinary !== null,
+ };
+ } catch (error) {
+ console.warn(
+ "CRDT merge failed for note field value, falling back to LWW:",
+ error,
+ );
+ return null;
+ }
+ }
+
+ /**
* Resolve all conflicts from a push result
* Uses pull result to get server data for conflicting items
+ * When CRDT changes are available, uses Automerge merge for resolution
*/
async resolveConflicts(
pushResult: SyncPushResult,
@@ -455,15 +834,42 @@ export class ConflictResolver {
noteFieldValues: [],
};
+ // Build a map of CRDT payloads by document ID for quick lookup
+ const crdtPayloadMap = new Map<string, CrdtSyncPayload>();
+ if (pullResult.crdtChanges) {
+ for (const payload of pullResult.crdtChanges) {
+ crdtPayloadMap.set(payload.documentId, payload);
+ }
+ }
+
+ // Helper to get CRDT binary for an entity
+ const getCrdtBinary = (
+ entityType: string,
+ entityId: string,
+ ): Uint8Array | undefined => {
+ const payload = crdtPayloadMap.get(`${entityType}:${entityId}`);
+ if (!payload) return undefined;
+ try {
+ return base64ToBinary(payload.binary);
+ } catch {
+ console.warn(
+ `Failed to decode base64 for ${entityType}:${entityId}, skipping CRDT merge`,
+ );
+ return undefined;
+ }
+ };
+
// Resolve deck conflicts
for (const deckId of pushResult.conflicts.decks) {
const localDeck = await localDeckRepository.findById(deckId);
const serverDeck = pullResult.decks.find((d) => d.id === deckId);
+ const crdtBinary = getCrdtBinary("deck", deckId);
if (localDeck && serverDeck) {
const resolution = await this.resolveDeckConflict(
localDeck,
serverDeck,
+ crdtBinary,
);
result.decks.push(resolution);
} else if (serverDeck) {
@@ -479,11 +885,13 @@ export class ConflictResolver {
for (const cardId of pushResult.conflicts.cards) {
const localCard = await localCardRepository.findById(cardId);
const serverCard = pullResult.cards.find((c) => c.id === cardId);
+ const crdtBinary = getCrdtBinary("card", cardId);
if (localCard && serverCard) {
const resolution = await this.resolveCardConflict(
localCard,
serverCard,
+ crdtBinary,
);
result.cards.push(resolution);
} else if (serverCard) {
@@ -501,11 +909,13 @@ export class ConflictResolver {
const serverNoteType = pullResult.noteTypes.find(
(nt) => nt.id === noteTypeId,
);
+ const crdtBinary = getCrdtBinary("noteType", noteTypeId);
if (localNoteType && serverNoteType) {
const resolution = await this.resolveNoteTypeConflict(
localNoteType,
serverNoteType,
+ crdtBinary,
);
result.noteTypes.push(resolution);
} else if (serverNoteType) {
@@ -522,11 +932,13 @@ export class ConflictResolver {
const serverFieldType = pullResult.noteFieldTypes.find(
(ft) => ft.id === fieldTypeId,
);
+ const crdtBinary = getCrdtBinary("noteFieldType", fieldTypeId);
if (localFieldType && serverFieldType) {
const resolution = await this.resolveNoteFieldTypeConflict(
localFieldType,
serverFieldType,
+ crdtBinary,
);
result.noteFieldTypes.push(resolution);
} else if (serverFieldType) {
@@ -543,11 +955,13 @@ export class ConflictResolver {
for (const noteId of pushResult.conflicts.notes) {
const localNote = await localNoteRepository.findById(noteId);
const serverNote = pullResult.notes.find((n) => n.id === noteId);
+ const crdtBinary = getCrdtBinary("note", noteId);
if (localNote && serverNote) {
const resolution = await this.resolveNoteConflict(
localNote,
serverNote,
+ crdtBinary,
);
result.notes.push(resolution);
} else if (serverNote) {
@@ -564,11 +978,13 @@ export class ConflictResolver {
const serverFieldValue = pullResult.noteFieldValues.find(
(fv) => fv.id === fieldValueId,
);
+ const crdtBinary = getCrdtBinary("noteFieldValue", fieldValueId);
if (localFieldValue && serverFieldValue) {
const resolution = await this.resolveNoteFieldValueConflict(
localFieldValue,
serverFieldValue,
+ crdtBinary,
);
result.noteFieldValues.push(resolution);
} else if (serverFieldValue) {
@@ -595,8 +1011,9 @@ export function createConflictResolver(
}
/**
- * Default conflict resolver using LWW (server wins) strategy
+ * Default conflict resolver using CRDT (Automerge) strategy
+ * Falls back to server_wins when CRDT data is unavailable
*/
export const conflictResolver = new ConflictResolver({
- strategy: "server_wins",
+ strategy: "crdt",
});
diff --git a/src/client/sync/push.test.ts b/src/client/sync/push.test.ts
index bce4652..19b39da 100644
--- a/src/client/sync/push.test.ts
+++ b/src/client/sync/push.test.ts
@@ -1140,7 +1140,10 @@ describe("generateCrdtChanges", () => {
expect(crdtChanges[0]?.documentId).toBe("deck:deck-1");
expect(crdtChanges[0]?.binary).toBeDefined();
// Verify it's valid base64
- expect(() => base64ToBinary(crdtChanges[0]!.binary)).not.toThrow();
+ const binary = crdtChanges[0]?.binary;
+ if (binary) {
+ expect(() => base64ToBinary(binary)).not.toThrow();
+ }
});
it("should generate CRDT changes for cards", () => {