diff options
| author | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:32:58 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:32:58 +0900 |
| commit | f3952a509b2d98a25cbb80c9ad091b3b471be52e (patch) | |
| tree | 23fdc7a024ab028f5c7d5fca2d7ea8e66073d3df /src | |
| parent | a1383a9304ff457d6671e12ded4265b135256004 (diff) | |
| download | kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.tar.gz kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.tar.zst kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.zip | |
feat(crdt): handle crdtChanges in sync pull response
Add applyCrdtChanges function to process CRDT payloads received from
the server during pull operations. The function merges remote documents
with local ones using Automerge and stores the result in sync state.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'src')
| -rw-r--r-- | src/client/sync/pull.test.ts | 491 | ||||
| -rw-r--r-- | src/client/sync/pull.ts | 158 |
2 files changed, 649 insertions, 0 deletions
diff --git a/src/client/sync/pull.test.ts b/src/client/sync/pull.test.ts index dd562a0..8bbf7cf 100644 --- a/src/client/sync/pull.test.ts +++ b/src/client/sync/pull.test.ts @@ -13,6 +13,15 @@ import { localNoteTypeRepository, } from "../db/repositories"; import { + binaryToBase64, + CrdtEntityType, + crdtDeckRepository, + crdtNoteTypeRepository, + crdtSyncDb, + crdtSyncStateManager, +} from "./crdt"; +import { + applyCrdtChanges, PullService, pullResultToLocalData, type SyncPullResult, @@ -1151,3 +1160,485 @@ describe("PullService", () => { }); }); }); + +describe("applyCrdtChanges", () => { + beforeEach(async () => { + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + }); + + afterEach(async () => { + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + }); + + it("should process CRDT payload for a new deck", async () => { + // Create a CRDT document from a deck + const deck = { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: "A test description", + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtDeckRepository.toCrdtDocument(deck); + + const payload = { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(crdtResult.binary), + }; + + const result = await applyCrdtChanges([payload], 5); + + expect(result.created).toBe(1); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(1); + expect(result.entities.decks[0]?.id).toBe("deck-1"); + expect(result.entities.decks[0]?.name).toBe("Test Deck"); + expect(result.entities.decks[0]?.description).toBe("A test description"); + }); + + it("should merge CRDT payload with existing local document", async () => { + // Create an initial local CRDT document + const localDeck = { + id: "deck-1", + userId: "user-1", + name: "Local Deck", + description: "Local description", + newCardsPerDay: 10, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-01T12:00:00Z"), + deletedAt: null, + syncVersion: 1, + _synced: true as const, + }; + const localCrdtResult = crdtDeckRepository.toCrdtDocument(localDeck); + + // Store the local CRDT binary + await crdtSyncStateManager.setDocumentBinary( + CrdtEntityType.Deck, + localDeck.id, + localCrdtResult.binary, + 1, + ); + + // Create a remote CRDT document with updated data + const remoteDeck = { + id: "deck-1", + userId: "user-1", + name: "Remote Deck", + description: "Remote description", + newCardsPerDay: 25, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), // Later timestamp + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const remoteCrdtResult = crdtDeckRepository.toCrdtDocument(remoteDeck); + + const payload = { + documentId: remoteCrdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: remoteDeck.id, + binary: binaryToBase64(remoteCrdtResult.binary), + }; + + const result = await applyCrdtChanges([payload], 5); + + expect(result.created).toBe(0); + expect(result.merged).toBe(1); + expect(result.entities.decks).toHaveLength(1); + // The merged result should reflect the remote changes + expect(result.entities.decks[0]?.id).toBe("deck-1"); + }); + + it("should process multiple CRDT payloads", async () => { + const deck1 = { + id: "deck-1", + userId: "user-1", + name: "Deck 1", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const deck2 = { + id: "deck-2", + userId: "user-1", + name: "Deck 2", + description: "Second deck", + newCardsPerDay: 15, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + + const crdtResult1 = crdtDeckRepository.toCrdtDocument(deck1); + const crdtResult2 = crdtDeckRepository.toCrdtDocument(deck2); + + const payloads = [ + { + documentId: crdtResult1.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck1.id, + binary: binaryToBase64(crdtResult1.binary), + }, + { + documentId: crdtResult2.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck2.id, + binary: binaryToBase64(crdtResult2.binary), + }, + ]; + + const result = await applyCrdtChanges(payloads, 5); + + expect(result.created).toBe(2); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(2); + }); + + it("should process CRDT payloads for different entity types", async () => { + const deck = { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + + const noteType = { + id: "note-type-1", + userId: "user-1", + name: "Basic", + frontTemplate: "{{Front}}", + backTemplate: "{{Back}}", + isReversible: false, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + + const deckCrdt = crdtDeckRepository.toCrdtDocument(deck); + const noteTypeCrdt = crdtNoteTypeRepository.toCrdtDocument(noteType); + + const payloads = [ + { + documentId: deckCrdt.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(deckCrdt.binary), + }, + { + documentId: noteTypeCrdt.documentId, + entityType: CrdtEntityType.NoteType, + entityId: noteType.id, + binary: binaryToBase64(noteTypeCrdt.binary), + }, + ]; + + const result = await applyCrdtChanges(payloads, 5); + + expect(result.created).toBe(2); + expect(result.entities.decks).toHaveLength(1); + expect(result.entities.noteTypes).toHaveLength(1); + expect(result.entities.decks[0]?.name).toBe("Test Deck"); + expect(result.entities.noteTypes[0]?.name).toBe("Basic"); + }); + + it("should store merged binary in sync state", async () => { + const deck = { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtDeckRepository.toCrdtDocument(deck); + + const payload = { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(crdtResult.binary), + }; + + await applyCrdtChanges([payload], 5); + + // Verify the binary was stored in sync state + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + CrdtEntityType.Deck, + deck.id, + ); + + expect(storedBinary).toBeDefined(); + // Check that it's a typed array with length > 0 + expect(storedBinary?.length).toBeGreaterThan(0); + }); + + it("should skip invalid document IDs", async () => { + const consoleWarn = vi.spyOn(console, "warn").mockImplementation(() => {}); + + const payload = { + documentId: "invalid-format", + entityType: CrdtEntityType.Deck, + entityId: "deck-1", + binary: "SGVsbG8=", // "Hello" in base64 + }; + + const result = await applyCrdtChanges([payload], 5); + + expect(result.created).toBe(0); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(0); + expect(consoleWarn).toHaveBeenCalledWith( + "Invalid document ID: invalid-format", + ); + + consoleWarn.mockRestore(); + }); + + it("should return empty result for empty payloads", async () => { + const result = await applyCrdtChanges([], 5); + + expect(result.created).toBe(0); + expect(result.merged).toBe(0); + expect(result.entities.decks).toHaveLength(0); + expect(result.entities.noteTypes).toHaveLength(0); + expect(result.entities.cards).toHaveLength(0); + }); +}); + +describe("PullService with CRDT changes", () => { + let syncQueue: SyncQueue; + + beforeEach(async () => { + await db.decks.clear(); + await db.cards.clear(); + await db.reviewLogs.clear(); + await db.noteTypes.clear(); + await db.noteFieldTypes.clear(); + await db.notes.clear(); + await db.noteFieldValues.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + localStorage.clear(); + syncQueue = new SyncQueue(); + }); + + afterEach(async () => { + await db.decks.clear(); + await db.cards.clear(); + await db.reviewLogs.clear(); + await db.noteTypes.clear(); + await db.noteFieldTypes.clear(); + await db.notes.clear(); + await db.noteFieldValues.clear(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + localStorage.clear(); + }); + + it("should process CRDT changes when present in pull response", async () => { + const deck = { + id: "deck-1", + userId: "user-1", + name: "CRDT Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtDeckRepository.toCrdtDocument(deck); + + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [ + { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.Deck, + entityId: deck.id, + binary: binaryToBase64(crdtResult.binary), + }, + ], + currentSyncVersion: 5, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + await pullService.pull(); + + // Verify CRDT binary was stored + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + CrdtEntityType.Deck, + deck.id, + ); + expect(storedBinary).toBeDefined(); + }); + + it("should handle pull response without CRDT changes", async () => { + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [ + { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 1, + }, + ], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + // No crdtChanges field + currentSyncVersion: 1, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + // Should not throw even without crdtChanges + const result = await pullService.pull(); + + expect(result.decks).toHaveLength(1); + expect(syncQueue.getLastSyncVersion()).toBe(1); + }); + + it("should handle empty CRDT changes array", async () => { + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [], + currentSyncVersion: 1, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + const result = await pullService.pull(); + + expect(result.crdtChanges).toHaveLength(0); + expect(syncQueue.getLastSyncVersion()).toBe(1); + }); + + it("should process both regular data and CRDT changes", async () => { + // Create CRDT payload for a note type + const noteType = { + id: "note-type-1", + userId: "user-1", + name: "CRDT NoteType", + frontTemplate: "{{Front}}", + backTemplate: "{{Back}}", + isReversible: true, + createdAt: new Date("2024-01-01T10:00:00Z"), + updatedAt: new Date("2024-01-02T15:30:00Z"), + deletedAt: null, + syncVersion: 5, + _synced: false as const, + }; + const crdtResult = crdtNoteTypeRepository.toCrdtDocument(noteType); + + const pullFromServer = vi.fn().mockResolvedValue({ + decks: [ + { + id: "deck-1", + userId: "user-1", + name: "Regular Deck", + description: null, + newCardsPerDay: 20, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + syncVersion: 5, + }, + ], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [ + { + documentId: crdtResult.documentId, + entityType: CrdtEntityType.NoteType, + entityId: noteType.id, + binary: binaryToBase64(crdtResult.binary), + }, + ], + currentSyncVersion: 5, + }); + + const pullService = new PullService({ + syncQueue, + pullFromServer, + }); + + await pullService.pull(); + + // Verify regular deck was applied + const storedDeck = await localDeckRepository.findById("deck-1"); + expect(storedDeck).toBeDefined(); + expect(storedDeck?.name).toBe("Regular Deck"); + + // Verify CRDT binary was stored for note type + const storedBinary = await crdtSyncStateManager.getDocumentBinary( + CrdtEntityType.NoteType, + noteType.id, + ); + expect(storedBinary).toBeDefined(); + }); +}); diff --git a/src/client/sync/pull.ts b/src/client/sync/pull.ts index 8b55a9b..4771757 100644 --- a/src/client/sync/pull.ts +++ b/src/client/sync/pull.ts @@ -10,6 +10,16 @@ import type { LocalReviewLog, RatingType, } from "../db/index"; +import { + base64ToBinary, + CrdtEntityType, + type CrdtEntityTypeValue, + type CrdtSyncPayload, + crdtSyncStateManager, + getCrdtRepository, + mergeAndConvert, + parseDocumentId, +} from "./crdt"; import type { SyncQueue } from "./queue"; /** @@ -136,6 +146,8 @@ export interface SyncPullResult { noteFieldTypes: ServerNoteFieldType[]; notes: ServerNote[]; noteFieldValues: ServerNoteFieldValue[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges?: CrdtSyncPayload[]; currentSyncVersion: number; } @@ -309,6 +321,147 @@ export function pullResultToLocalData(result: SyncPullResult): { } /** + * Result of applying CRDT changes + */ +export interface ApplyCrdtChangesResult { + /** Number of documents that were merged */ + merged: number; + /** Number of documents that were new (no local version) */ + created: number; + /** Merged entities grouped by type */ + entities: { + decks: Omit<LocalDeck, "_synced">[]; + noteTypes: Omit<LocalNoteType, "_synced">[]; + noteFieldTypes: Omit<LocalNoteFieldType, "_synced">[]; + notes: Omit<LocalNote, "_synced">[]; + noteFieldValues: Omit<LocalNoteFieldValue, "_synced">[]; + cards: Omit<LocalCard, "_synced">[]; + reviewLogs: Omit<LocalReviewLog, "_synced">[]; + }; +} + +/** + * Apply CRDT changes from pull response + * + * This function: + * 1. Parses each CRDT payload from the server + * 2. Loads existing local CRDT document if present + * 3. Merges remote and local documents using Automerge + * 4. Stores the merged binary in sync state + * 5. Converts merged document to local entity format + * + * @param crdtChanges Array of CRDT payloads from server + * @param syncVersion The sync version to associate with merged documents + * @returns Result containing merged entities ready for database storage + */ +export async function applyCrdtChanges( + crdtChanges: CrdtSyncPayload[], + syncVersion: number, +): Promise<ApplyCrdtChangesResult> { + const result: ApplyCrdtChangesResult = { + merged: 0, + created: 0, + entities: { + decks: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + cards: [], + reviewLogs: [], + }, + }; + + // Process each CRDT payload + for (const payload of crdtChanges) { + const parsed = parseDocumentId(payload.documentId); + if (!parsed) { + console.warn(`Invalid document ID: ${payload.documentId}`); + continue; + } + + const { entityType, entityId } = parsed; + + // Load existing local CRDT binary if present + const localBinary = await crdtSyncStateManager.getDocumentBinary( + entityType, + entityId, + ); + + // Convert remote payload binary from base64 + const remoteBinary = base64ToBinary(payload.binary); + + // Get the repository and merge based on entity type + // We use type assertions here because getCrdtRepository returns a union type + // but we know the specific type based on entityType + const repository = getCrdtRepository(entityType); + const mergeResult = mergeAndConvert( + localBinary, + remoteBinary, + // biome-ignore lint/suspicious/noExplicitAny: Repository type is determined by entityType at runtime + repository as any, + ); + + // Store the merged binary in sync state + await crdtSyncStateManager.setDocumentBinary( + entityType, + entityId, + mergeResult.binary, + syncVersion, + ); + + // Track statistics + if (localBinary === null) { + result.created++; + } else { + result.merged++; + } + + // Add entity to the appropriate array based on type + addEntityToResult(result.entities, entityType, mergeResult.entity); + } + + return result; +} + +/** + * Helper to add an entity to the result based on its type + */ +function addEntityToResult( + entities: ApplyCrdtChangesResult["entities"], + entityType: CrdtEntityTypeValue, + entity: unknown, +): void { + switch (entityType) { + case CrdtEntityType.Deck: + entities.decks.push(entity as Omit<LocalDeck, "_synced">); + break; + case CrdtEntityType.NoteType: + entities.noteTypes.push(entity as Omit<LocalNoteType, "_synced">); + break; + case CrdtEntityType.NoteFieldType: + entities.noteFieldTypes.push( + entity as Omit<LocalNoteFieldType, "_synced">, + ); + break; + case CrdtEntityType.Note: + entities.notes.push(entity as Omit<LocalNote, "_synced">); + break; + case CrdtEntityType.NoteFieldValue: + entities.noteFieldValues.push( + entity as Omit<LocalNoteFieldValue, "_synced">, + ); + break; + case CrdtEntityType.Card: + entities.cards.push(entity as Omit<LocalCard, "_synced">); + break; + case CrdtEntityType.ReviewLog: + entities.reviewLogs.push(entity as Omit<LocalReviewLog, "_synced">); + break; + } +} + +/** * Pull sync service * * Handles pulling changes from the server: @@ -339,6 +492,11 @@ export class PullService { // Pull changes from server const result = await this.pullFromServer(lastSyncVersion); + // Process CRDT changes if present + if (result.crdtChanges && result.crdtChanges.length > 0) { + await applyCrdtChanges(result.crdtChanges, result.currentSyncVersion); + } + // If there are changes, apply them to local database if ( result.decks.length > 0 || |
