diff options
| author | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:32:58 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:32:58 +0900 |
| commit | f3952a509b2d98a25cbb80c9ad091b3b471be52e (patch) | |
| tree | 23fdc7a024ab028f5c7d5fca2d7ea8e66073d3df /src/client/sync/pull.ts | |
| parent | a1383a9304ff457d6671e12ded4265b135256004 (diff) | |
| download | kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.tar.gz kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.tar.zst kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.zip | |
feat(crdt): handle crdtChanges in sync pull response
Add applyCrdtChanges function to process CRDT payloads received from
the server during pull operations. The function merges remote documents
with local ones using Automerge and stores the result in sync state.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'src/client/sync/pull.ts')
| -rw-r--r-- | src/client/sync/pull.ts | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/src/client/sync/pull.ts b/src/client/sync/pull.ts index 8b55a9b..4771757 100644 --- a/src/client/sync/pull.ts +++ b/src/client/sync/pull.ts @@ -10,6 +10,16 @@ import type { LocalReviewLog, RatingType, } from "../db/index"; +import { + base64ToBinary, + CrdtEntityType, + type CrdtEntityTypeValue, + type CrdtSyncPayload, + crdtSyncStateManager, + getCrdtRepository, + mergeAndConvert, + parseDocumentId, +} from "./crdt"; import type { SyncQueue } from "./queue"; /** @@ -136,6 +146,8 @@ export interface SyncPullResult { noteFieldTypes: ServerNoteFieldType[]; notes: ServerNote[]; noteFieldValues: ServerNoteFieldValue[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges?: CrdtSyncPayload[]; currentSyncVersion: number; } @@ -309,6 +321,147 @@ export function pullResultToLocalData(result: SyncPullResult): { } /** + * Result of applying CRDT changes + */ +export interface ApplyCrdtChangesResult { + /** Number of documents that were merged */ + merged: number; + /** Number of documents that were new (no local version) */ + created: number; + /** Merged entities grouped by type */ + entities: { + decks: Omit<LocalDeck, "_synced">[]; + noteTypes: Omit<LocalNoteType, "_synced">[]; + noteFieldTypes: Omit<LocalNoteFieldType, "_synced">[]; + notes: Omit<LocalNote, "_synced">[]; + noteFieldValues: Omit<LocalNoteFieldValue, "_synced">[]; + cards: Omit<LocalCard, "_synced">[]; + reviewLogs: Omit<LocalReviewLog, "_synced">[]; + }; +} + +/** + * Apply CRDT changes from pull response + * + * This function: + * 1. Parses each CRDT payload from the server + * 2. Loads existing local CRDT document if present + * 3. Merges remote and local documents using Automerge + * 4. Stores the merged binary in sync state + * 5. Converts merged document to local entity format + * + * @param crdtChanges Array of CRDT payloads from server + * @param syncVersion The sync version to associate with merged documents + * @returns Result containing merged entities ready for database storage + */ +export async function applyCrdtChanges( + crdtChanges: CrdtSyncPayload[], + syncVersion: number, +): Promise<ApplyCrdtChangesResult> { + const result: ApplyCrdtChangesResult = { + merged: 0, + created: 0, + entities: { + decks: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + cards: [], + reviewLogs: [], + }, + }; + + // Process each CRDT payload + for (const payload of crdtChanges) { + const parsed = parseDocumentId(payload.documentId); + if (!parsed) { + console.warn(`Invalid document ID: ${payload.documentId}`); + continue; + } + + const { entityType, entityId } = parsed; + + // Load existing local CRDT binary if present + const localBinary = await crdtSyncStateManager.getDocumentBinary( + entityType, + entityId, + ); + + // Convert remote payload binary from base64 + const remoteBinary = base64ToBinary(payload.binary); + + // Get the repository and merge based on entity type + // We use type assertions here because getCrdtRepository returns a union type + // but we know the specific type based on entityType + const repository = getCrdtRepository(entityType); + const mergeResult = mergeAndConvert( + localBinary, + remoteBinary, + // biome-ignore lint/suspicious/noExplicitAny: Repository type is determined by entityType at runtime + repository as any, + ); + + // Store the merged binary in sync state + await crdtSyncStateManager.setDocumentBinary( + entityType, + entityId, + mergeResult.binary, + syncVersion, + ); + + // Track statistics + if (localBinary === null) { + result.created++; + } else { + result.merged++; + } + + // Add entity to the appropriate array based on type + addEntityToResult(result.entities, entityType, mergeResult.entity); + } + + return result; +} + +/** + * Helper to add an entity to the result based on its type + */ +function addEntityToResult( + entities: ApplyCrdtChangesResult["entities"], + entityType: CrdtEntityTypeValue, + entity: unknown, +): void { + switch (entityType) { + case CrdtEntityType.Deck: + entities.decks.push(entity as Omit<LocalDeck, "_synced">); + break; + case CrdtEntityType.NoteType: + entities.noteTypes.push(entity as Omit<LocalNoteType, "_synced">); + break; + case CrdtEntityType.NoteFieldType: + entities.noteFieldTypes.push( + entity as Omit<LocalNoteFieldType, "_synced">, + ); + break; + case CrdtEntityType.Note: + entities.notes.push(entity as Omit<LocalNote, "_synced">); + break; + case CrdtEntityType.NoteFieldValue: + entities.noteFieldValues.push( + entity as Omit<LocalNoteFieldValue, "_synced">, + ); + break; + case CrdtEntityType.Card: + entities.cards.push(entity as Omit<LocalCard, "_synced">); + break; + case CrdtEntityType.ReviewLog: + entities.reviewLogs.push(entity as Omit<LocalReviewLog, "_synced">); + break; + } +} + +/** * Pull sync service * * Handles pulling changes from the server: @@ -339,6 +492,11 @@ export class PullService { // Pull changes from server const result = await this.pullFromServer(lastSyncVersion); + // Process CRDT changes if present + if (result.crdtChanges && result.crdtChanges.length > 0) { + await applyCrdtChanges(result.crdtChanges, result.currentSyncVersion); + } + // If there are changes, apply them to local database if ( result.decks.length > 0 || |
