diff options
Diffstat (limited to 'src/client')
| -rw-r--r-- | src/client/sync/pull.test.ts | 491 | ||||
| -rw-r--r-- | src/client/sync/pull.ts | 158 |
2 files changed, 649 insertions, 0 deletions
diff --git a/src/client/sync/pull.test.ts b/src/client/sync/pull.test.ts index dd562a0..8bbf7cf 100644 --- a/src/client/sync/pull.test.ts +++ b/src/client/sync/pull.test.ts @@ -13,6 +13,15 @@ import { localNoteTypeRepository, } from "../db/repositories"; import { + binaryToBase64, + CrdtEntityType, + crdtDeckRepository, + crdtNoteTypeRepository, + crdtSyncDb, + crdtSyncStateManager, +} from "./crdt"; +import { + applyCrdtChanges, PullService, pullResultToLocalData, type SyncPullResult, @@ -1151,3 +1160,485 @@ describe("PullService", () => { }); }); }); + +describe("applyCrdtChanges", () => { + beforeEach(async () => { + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + }); + + afterEach(async () => { + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + }); + + it("should process CRDT payload for a new deck", async () => { + // Create a CRDT document from a deck + const deck = { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: "A test description", + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtDeckRepository.toCrdtDocument(deck); + + const payload = { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(crdtResult.binary), + }; + + const result = await applyCrdtChanges([payload], 5); + + expect(result.created).toBe(1); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(1); + expect(result.entities.decks[0]?.id).toBe("deck-1"); + expect(result.entities.decks[0]?.name).toBe("Test Deck"); + expect(result.entities.decks[0]?.description).toBe("A test description"); + }); + + it("should merge CRDT payload with existing local document", async () => { + // Create an initial local CRDT document + const localDeck = { + id: "deck-1", + userId: "user-1", + name: "Local Deck", + description: "Local description", + newCardsPerDay: 10, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-01T12:00:00Z"), + deletedAt: null, + syncVersion: 1, + _synced: true as const, + }; + const localCrdtResult = crdtDeckRepository.toCrdtDocument(localDeck); + + // Store the local CRDT binary + await crdtSyncStateManager.setDocumentBinary( + CrdtEntityType.Deck, + localDeck.id, + localCrdtResult.binary, + 1, + ); + + // Create a remote CRDT document with updated data + const remoteDeck = { + id: "deck-1", + userId: "user-1", + name: "Remote Deck", + description: "Remote description", + newCardsPerDay: 25, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), // Later timestamp + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const remoteCrdtResult = crdtDeckRepository.toCrdtDocument(remoteDeck); + + const payload = { + documentId: remoteCrdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: remoteDeck.id, + binary: binaryToBase64(remoteCrdtResult.binary), + }; + + const result = await applyCrdtChanges([payload], 5); + + expect(result.created).toBe(0); + expect(result.merged).toBe(1); + expect(result.entities.decks).toHaveLength(1); + // The merged result should reflect the remote changes + expect(result.entities.decks[0]?.id).toBe("deck-1"); + }); + + it("should process multiple CRDT payloads", async () => { + const deck1 = { + id: "deck-1", + userId: "user-1", + name: "Deck 1", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const deck2 = { + id: "deck-2", + userId: "user-1", + name: "Deck 2", + description: "Second deck", + newCardsPerDay: 15, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + + const crdtResult1 = crdtDeckRepository.toCrdtDocument(deck1); + const crdtResult2 = crdtDeckRepository.toCrdtDocument(deck2); + + const payloads = [ + { + documentId: crdtResult1.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck1.id, + binary: binaryToBase64(crdtResult1.binary), + }, + { + documentId: crdtResult2.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck2.id, + binary: binaryToBase64(crdtResult2.binary), + }, + ]; + + const result = await applyCrdtChanges(payloads, 5); + + expect(result.created).toBe(2); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(2); + }); + + it("should process CRDT payloads for different entity types", async () => { + const deck = { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + + const noteType = { + id: "note-type-1", + userId: "user-1", + name: "Basic", + frontTemplate: "{{Front}}", + backTemplate: "{{Back}}", + isReversible: false, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + + const deckCrdt = crdtDeckRepository.toCrdtDocument(deck); + const noteTypeCrdt = crdtNoteTypeRepository.toCrdtDocument(noteType); + + const payloads = [ + { + documentId: deckCrdt.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(deckCrdt.binary), + }, + { + documentId: noteTypeCrdt.documentId, + entityType: CrdtEntityType.NoteType, + entityId: noteType.id, + binary: binaryToBase64(noteTypeCrdt.binary), + }, + ]; + + const result = await applyCrdtChanges(payloads, 5); + + expect(result.created).toBe(2); + expect(result.entities.decks).toHaveLength(1); + expect(result.entities.noteTypes).toHaveLength(1); + expect(result.entities.decks[0]?.name).toBe("Test Deck"); + expect(result.entities.noteTypes[0]?.name).toBe("Basic"); + }); + + it("should store merged binary in sync state", async () => { + const deck = { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtDeckRepository.toCrdtDocument(deck); + + const payload = { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(crdtResult.binary), + }; + + await applyCrdtChanges([payload], 5); + + // Verify the binary was stored in sync state + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + CrdtEntityType.Deck, + deck.id, + ); + + expect(storedBinary).toBeDefined(); + // Check that it's a typed array with length > 0 + expect(storedBinary?.length).toBeGreaterThan(0); + }); + + it("should skip invalid document IDs", async () => { + const consoleWarn = vi.spyOn(console, "warn").mockImplementation(() => {}); + + const payload = { + documentId: "invalid-format", + entityType: CrdtEntityType.Deck, + entityId: "deck-1", + binary: "SGVsbG8=", // "Hello" in base64 + }; + + const result = await applyCrdtChanges([payload], 5); + + expect(result.created).toBe(0); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(0); + expect(consoleWarn).toHaveBeenCalledWith( + "Invalid document ID: invalid-format", + ); + + consoleWarn.mockRestore(); + }); + + it("should return empty result for empty payloads", async () => { + const result = await applyCrdtChanges([], 5); + + expect(result.created).toBe(0); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(0); + expect(result.entities.noteTypes).toHaveLength(0); + expect(result.entities.cards).toHaveLength(0); + }); +}); + +describe("PullService with CRDT changes", () => { + let syncQueue: SyncQueue; + + beforeEach(async () => { + await db.decks.clear(); + await db.cards.clear(); + await db.reviewLogs.clear(); + await db.noteTypes.clear(); + await db.noteFieldTypes.clear(); + await db.notes.clear(); + await db.noteFieldValues.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + localStorage.clear(); + syncQueue = new SyncQueue(); + }); + + afterEach(async () => { + await db.decks.clear(); + await db.cards.clear(); + await db.reviewLogs.clear(); + await db.noteTypes.clear(); + await db.noteFieldTypes.clear(); + await db.notes.clear(); + await db.noteFieldValues.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + localStorage.clear(); + }); + + it("should process CRDT changes when present in pull response", async () => { + const deck = { + id: "deck-1", + userId: "user-1", + name: "CRDT Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtDeckRepository.toCrdtDocument(deck); + + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [ + { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(crdtResult.binary), + }, + ], + currentSyncVersion: 5, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + await pullService.pull(); + + // Verify CRDT binary was stored + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + CrdtEntityType.Deck, + deck.id, + ); + expect(storedBinary).toBeDefined(); + }); + + it("should handle pull response without CRDT changes", async () => { + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [ + { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 1, + }, + ], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + // No crdtChanges field + currentSyncVersion: 1, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + // Should not throw even without crdtChanges + const result = await pullService.pull(); + + expect(result.decks).toHaveLength(1); + expect(syncQueue.getLastSyncVersion()).toBe(1); + }); + + it("should handle empty CRDT changes array", async () => { + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [], + currentSyncVersion: 1, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + const result = await pullService.pull(); + + expect(result.crdtChanges).toHaveLength(0); + expect(syncQueue.getLastSyncVersion()).toBe(1); + }); + + it("should process both regular data and CRDT changes", async () => { + // Create CRDT payload for a note type + const noteType = { + id: "note-type-1", + userId: "user-1", + name: "CRDT NoteType", + frontTemplate: "{{Front}}", + backTemplate: "{{Back}}", + isReversible: true, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtNoteTypeRepository.toCrdtDocument(noteType); + + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [ + { + id: "deck-1", + userId: "user-1", + name: "Regular Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 5, + }, + ], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [ + { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.NoteType, + entityId: noteType.id, + binary: binaryToBase64(crdtResult.binary), + }, + ], + currentSyncVersion: 5, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + await pullService.pull(); + + // Verify regular deck was applied + const storedDeck = await localDeckRepository.findById("deck-1"); + expect(storedDeck).toBeDefined(); + expect(storedDeck?.name).toBe("Regular Deck"); + + // Verify CRDT binary was stored for note type + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + CrdtEntityType.NoteType, + noteType.id, + ); + expect(storedBinary).toBeDefined(); + }); +}); diff --git a/src/client/sync/pull.ts b/src/client/sync/pull.ts index 8b55a9b..4771757 100644 --- a/src/client/sync/pull.ts +++ b/src/client/sync/pull.ts @@ -10,6 +10,16 @@ import type { LocalReviewLog, RatingType, } from "../db/index"; +import { + base64ToBinary, + CrdtEntityType, + type CrdtEntityTypeValue, + type CrdtSyncPayload, + crdtSyncStateManager, + getCrdtRepository, + mergeAndConvert, + parseDocumentId, +} from "./crdt"; import type { SyncQueue } from "./queue"; /** @@ -136,6 +146,8 @@ export interface SyncPullResult { noteFieldTypes: ServerNoteFieldType[]; notes: ServerNote[]; noteFieldValues: ServerNoteFieldValue[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges?: CrdtSyncPayload[]; currentSyncVersion: number; } @@ -309,6 +321,147 @@ export function pullResultToLocalData(result: SyncPullResult): { } /** + * Result of applying CRDT changes + */ +export interface ApplyCrdtChangesResult { + /** Number of documents that were merged */ + merged: number; + /** Number of documents that were new (no local version) */ + created: number; + /** Merged entities grouped by type */ + entities: { + decks: Omit<LocalDeck, "_synced">[]; + noteTypes: Omit<LocalNoteType, "_synced">[]; + noteFieldTypes: Omit<LocalNoteFieldType, "_synced">[]; + notes: Omit<LocalNote, "_synced">[]; + noteFieldValues: Omit<LocalNoteFieldValue, "_synced">[]; + cards: Omit<LocalCard, "_synced">[]; + reviewLogs: Omit<LocalReviewLog, "_synced">[]; + }; +} + +/** + * Apply CRDT changes from pull response + * + * This function: + * 1. Parses each CRDT payload from the server + * 2. Loads existing local CRDT document if present + * 3. Merges remote and local documents using Automerge + * 4. Stores the merged binary in sync state + * 5. Converts merged document to local entity format + * + * @param crdtChanges Array of CRDT payloads from server + * @param syncVersion The sync version to associate with merged documents + * @returns Result containing merged entities ready for database storage + */ +export async function applyCrdtChanges( + crdtChanges: CrdtSyncPayload[], + syncVersion: number, +): Promise<ApplyCrdtChangesResult> { + const result: ApplyCrdtChangesResult = { + merged: 0, + created: 0, + entities: { + decks: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + cards: [], + reviewLogs: [], + }, + }; + + // Process each CRDT payload + for (const payload of crdtChanges) { + const parsed = parseDocumentId(payload.documentId); + if (!parsed) { + console.warn(`Invalid document ID: ${payload.documentId}`); + continue; + } + + const { entityType, entityId } = parsed; + + // Load existing local CRDT binary if present + const localBinary = await crdtSyncStateManager.getDocumentBinary( + entityType, + entityId, + ); + + // Convert remote payload binary from base64 + const remoteBinary = base64ToBinary(payload.binary); + + // Get the repository and merge based on entity type + // We use type assertions here because getCrdtRepository returns a union type + // but we know the specific type based on entityType + const repository = getCrdtRepository(entityType); + const mergeResult = mergeAndConvert( + localBinary, + remoteBinary, + // biome-ignore lint/suspicious/noExplicitAny: Repository type is determined by entityType at runtime + repository as any, + ); + + // Store the merged binary in sync state + await crdtSyncStateManager.setDocumentBinary( + entityType, + entityId, + mergeResult.binary, + syncVersion, + ); + + // Track statistics + if (localBinary === null) { + result.created++; + } else { + result.merged++; + } + + // Add entity to the appropriate array based on type + addEntityToResult(result.entities, entityType, mergeResult.entity); + } + + return result; +} + +/** + * Helper to add an entity to the result based on its type + */ +function addEntityToResult( + entities: ApplyCrdtChangesResult["entities"], + entityType: CrdtEntityTypeValue, + entity: unknown, +): void { + switch (entityType) { + case CrdtEntityType.Deck: + entities.decks.push(entity as Omit<LocalDeck, "_synced">); + break; + case CrdtEntityType.NoteType: + entities.noteTypes.push(entity as Omit<LocalNoteType, "_synced">); + break; + case CrdtEntityType.NoteFieldType: + entities.noteFieldTypes.push( + entity as Omit<LocalNoteFieldType, "_synced">, + ); + break; + case CrdtEntityType.Note: + entities.notes.push(entity as Omit<LocalNote, "_synced">); + break; + case CrdtEntityType.NoteFieldValue: + entities.noteFieldValues.push( + entity as Omit<LocalNoteFieldValue, "_synced">, + ); + break; + case CrdtEntityType.Card: + entities.cards.push(entity as Omit<LocalCard, "_synced">); + break; + case CrdtEntityType.ReviewLog: + entities.reviewLogs.push(entity as Omit<LocalReviewLog, "_synced">); + break; + } +} + +/** * Pull sync service * * Handles pulling changes from the server: @@ -339,6 +492,11 @@ export class PullService { // Pull changes from server const result = await this.pullFromServer(lastSyncVersion); + // Process CRDT changes if present + if (result.crdtChanges && result.crdtChanges.length > 0) { + await applyCrdtChanges(result.crdtChanges, result.currentSyncVersion); + } + // If there are changes, apply them to local database if ( result.decks.length > 0 || |
