diff options
Diffstat (limited to 'src/client/sync/pull.ts')
| -rw-r--r-- | src/client/sync/pull.ts | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/src/client/sync/pull.ts b/src/client/sync/pull.ts index 8b55a9b..4771757 100644 --- a/src/client/sync/pull.ts +++ b/src/client/sync/pull.ts @@ -10,6 +10,16 @@ import type { LocalReviewLog, RatingType, } from "../db/index"; +import { + base64ToBinary, + CrdtEntityType, + type CrdtEntityTypeValue, + type CrdtSyncPayload, + crdtSyncStateManager, + getCrdtRepository, + mergeAndConvert, + parseDocumentId, +} from "./crdt"; import type { SyncQueue } from "./queue"; /** @@ -136,6 +146,8 @@ export interface SyncPullResult { noteFieldTypes: ServerNoteFieldType[]; notes: ServerNote[]; noteFieldValues: ServerNoteFieldValue[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges?: CrdtSyncPayload[]; currentSyncVersion: number; } @@ -309,6 +321,147 @@ export function pullResultToLocalData(result: SyncPullResult): { } /** + * Result of applying CRDT changes + */ +export interface ApplyCrdtChangesResult { + /** Number of documents that were merged */ + merged: number; + /** Number of documents that were new (no local version) */ + created: number; + /** Merged entities grouped by type */ + entities: { + decks: Omit<LocalDeck, "_synced">[]; + noteTypes: Omit<LocalNoteType, "_synced">[]; + noteFieldTypes: Omit<LocalNoteFieldType, "_synced">[]; + notes: Omit<LocalNote, "_synced">[]; + noteFieldValues: Omit<LocalNoteFieldValue, "_synced">[]; + cards: Omit<LocalCard, "_synced">[]; + reviewLogs: Omit<LocalReviewLog, "_synced">[]; + }; +} + +/** + * Apply CRDT changes from pull response + * + * This function: + * 1. Parses each CRDT payload from the server + * 2. Loads existing local CRDT document if present + * 3. Merges remote and local documents using Automerge + * 4. Stores the merged binary in sync state + * 5. Converts merged document to local entity format + * + * @param crdtChanges Array of CRDT payloads from server + * @param syncVersion The sync version to associate with merged documents + * @returns Result containing merged entities ready for database storage + */ +export async function applyCrdtChanges( + crdtChanges: CrdtSyncPayload[], + syncVersion: number, +): Promise<ApplyCrdtChangesResult> { + const result: ApplyCrdtChangesResult = { + merged: 0, + created: 0, + entities: { + decks: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + cards: [], + reviewLogs: [], + }, + }; + + // Process each CRDT payload + for (const payload of crdtChanges) { + const parsed = parseDocumentId(payload.documentId); + if (!parsed) { + console.warn(`Invalid document ID: ${payload.documentId}`); + continue; + } + + const { entityType, entityId } = parsed; + + // Load existing local CRDT binary if present + const localBinary = await crdtSyncStateManager.getDocumentBinary( + entityType, + entityId, + ); + + // Convert remote payload binary from base64 + const remoteBinary = base64ToBinary(payload.binary); + + // Get the repository and merge based on entity type + // We use type assertions here because getCrdtRepository returns a union type + // but we know the specific type based on entityType + const repository = getCrdtRepository(entityType); + const mergeResult = mergeAndConvert( + localBinary, + remoteBinary, + // biome-ignore lint/suspicious/noExplicitAny: Repository type is determined by entityType at runtime + repository as any, + ); + + // Store the merged binary in sync state + await crdtSyncStateManager.setDocumentBinary( + entityType, + entityId, + mergeResult.binary, + syncVersion, + ); + + // Track statistics + if (localBinary === null) { + result.created++; + } else { + result.merged++; + } + + // Add entity to the appropriate array based on type + addEntityToResult(result.entities, entityType, mergeResult.entity); + } + + return result; +} + +/** + * Helper to add an entity to the result based on its type + */ +function addEntityToResult( + entities: ApplyCrdtChangesResult["entities"], + entityType: CrdtEntityTypeValue, + entity: unknown, +): void { + switch (entityType) { + case CrdtEntityType.Deck: + entities.decks.push(entity as Omit<LocalDeck, "_synced">); + break; + case CrdtEntityType.NoteType: + entities.noteTypes.push(entity as Omit<LocalNoteType, "_synced">); + break; + case CrdtEntityType.NoteFieldType: + entities.noteFieldTypes.push( + entity as Omit<LocalNoteFieldType, "_synced">, + ); + break; + case CrdtEntityType.Note: + entities.notes.push(entity as Omit<LocalNote, "_synced">); + break; + case CrdtEntityType.NoteFieldValue: + entities.noteFieldValues.push( + entity as Omit<LocalNoteFieldValue, "_synced">, + ); + break; + case CrdtEntityType.Card: + entities.cards.push(entity as Omit<LocalCard, "_synced">); + break; + case CrdtEntityType.ReviewLog: + entities.reviewLogs.push(entity as Omit<LocalReviewLog, "_synced">); + break; + } +} + +/** * Pull sync service * * Handles pulling changes from the server: @@ -339,6 +492,11 @@ export class PullService { // Pull changes from server const result = await this.pullFromServer(lastSyncVersion); + // Process CRDT changes if present + if (result.crdtChanges && result.crdtChanges.length > 0) { + await applyCrdtChanges(result.crdtChanges, result.currentSyncVersion); + } + // If there are changes, apply them to local database if ( result.decks.length > 0 || |
