aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/client/sync/pull.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/sync/pull.ts')
-rw-r--r--src/client/sync/pull.ts158
1 files changed, 158 insertions, 0 deletions
diff --git a/src/client/sync/pull.ts b/src/client/sync/pull.ts
index 8b55a9b..4771757 100644
--- a/src/client/sync/pull.ts
+++ b/src/client/sync/pull.ts
@@ -10,6 +10,16 @@ import type {
LocalReviewLog,
RatingType,
} from "../db/index";
+import {
+ base64ToBinary,
+ CrdtEntityType,
+ type CrdtEntityTypeValue,
+ type CrdtSyncPayload,
+ crdtSyncStateManager,
+ getCrdtRepository,
+ mergeAndConvert,
+ parseDocumentId,
+} from "./crdt";
import type { SyncQueue } from "./queue";
/**
@@ -136,6 +146,8 @@ export interface SyncPullResult {
noteFieldTypes: ServerNoteFieldType[];
notes: ServerNote[];
noteFieldValues: ServerNoteFieldValue[];
+ /** CRDT document changes for conflict-free sync */
+ crdtChanges?: CrdtSyncPayload[];
currentSyncVersion: number;
}
@@ -309,6 +321,147 @@ export function pullResultToLocalData(result: SyncPullResult): {
}
/**
+ * Result of applying CRDT changes
+ */
+export interface ApplyCrdtChangesResult {
+ /** Number of documents that were merged */
+ merged: number;
+ /** Number of documents that were new (no local version) */
+ created: number;
+ /** Merged entities grouped by type */
+ entities: {
+ decks: Omit<LocalDeck, "_synced">[];
+ noteTypes: Omit<LocalNoteType, "_synced">[];
+ noteFieldTypes: Omit<LocalNoteFieldType, "_synced">[];
+ notes: Omit<LocalNote, "_synced">[];
+ noteFieldValues: Omit<LocalNoteFieldValue, "_synced">[];
+ cards: Omit<LocalCard, "_synced">[];
+ reviewLogs: Omit<LocalReviewLog, "_synced">[];
+ };
+}
+
+/**
+ * Apply CRDT changes from pull response
+ *
+ * This function:
+ * 1. Parses each CRDT payload from the server
+ * 2. Loads existing local CRDT document if present
+ * 3. Merges remote and local documents using Automerge
+ * 4. Stores the merged binary in sync state
+ * 5. Converts merged document to local entity format
+ *
+ * @param crdtChanges Array of CRDT payloads from server
+ * @param syncVersion The sync version to associate with merged documents
+ * @returns Result containing merged entities ready for database storage
+ */
+export async function applyCrdtChanges(
+ crdtChanges: CrdtSyncPayload[],
+ syncVersion: number,
+): Promise<ApplyCrdtChangesResult> {
+ const result: ApplyCrdtChangesResult = {
+ merged: 0,
+ created: 0,
+ entities: {
+ decks: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ cards: [],
+ reviewLogs: [],
+ },
+ };
+
+ // Process each CRDT payload
+ for (const payload of crdtChanges) {
+ const parsed = parseDocumentId(payload.documentId);
+ if (!parsed) {
+ console.warn(`Invalid document ID: ${payload.documentId}`);
+ continue;
+ }
+
+ const { entityType, entityId } = parsed;
+
+ // Load existing local CRDT binary if present
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ entityType,
+ entityId,
+ );
+
+ // Convert remote payload binary from base64
+ const remoteBinary = base64ToBinary(payload.binary);
+
+ // Get the repository and merge based on entity type
+ // We use type assertions here because getCrdtRepository returns a union type
+ // but we know the specific type based on entityType
+ const repository = getCrdtRepository(entityType);
+ const mergeResult = mergeAndConvert(
+ localBinary,
+ remoteBinary,
+ // biome-ignore lint/suspicious/noExplicitAny: Repository type is determined by entityType at runtime
+ repository as any,
+ );
+
+ // Store the merged binary in sync state
+ await crdtSyncStateManager.setDocumentBinary(
+ entityType,
+ entityId,
+ mergeResult.binary,
+ syncVersion,
+ );
+
+ // Track statistics
+ if (localBinary === null) {
+ result.created++;
+ } else {
+ result.merged++;
+ }
+
+ // Add entity to the appropriate array based on type
+ addEntityToResult(result.entities, entityType, mergeResult.entity);
+ }
+
+ return result;
+}
+
+/**
+ * Helper to add an entity to the result based on its type
+ */
+function addEntityToResult(
+ entities: ApplyCrdtChangesResult["entities"],
+ entityType: CrdtEntityTypeValue,
+ entity: unknown,
+): void {
+ switch (entityType) {
+ case CrdtEntityType.Deck:
+ entities.decks.push(entity as Omit<LocalDeck, "_synced">);
+ break;
+ case CrdtEntityType.NoteType:
+ entities.noteTypes.push(entity as Omit<LocalNoteType, "_synced">);
+ break;
+ case CrdtEntityType.NoteFieldType:
+ entities.noteFieldTypes.push(
+ entity as Omit<LocalNoteFieldType, "_synced">,
+ );
+ break;
+ case CrdtEntityType.Note:
+ entities.notes.push(entity as Omit<LocalNote, "_synced">);
+ break;
+ case CrdtEntityType.NoteFieldValue:
+ entities.noteFieldValues.push(
+ entity as Omit<LocalNoteFieldValue, "_synced">,
+ );
+ break;
+ case CrdtEntityType.Card:
+ entities.cards.push(entity as Omit<LocalCard, "_synced">);
+ break;
+ case CrdtEntityType.ReviewLog:
+ entities.reviewLogs.push(entity as Omit<LocalReviewLog, "_synced">);
+ break;
+ }
+}
+
+/**
* Pull sync service
*
* Handles pulling changes from the server:
@@ -339,6 +492,11 @@ export class PullService {
// Pull changes from server
const result = await this.pullFromServer(lastSyncVersion);
+ // Process CRDT changes if present
+ if (result.crdtChanges && result.crdtChanges.length > 0) {
+ await applyCrdtChanges(result.crdtChanges, result.currentSyncVersion);
+ }
+
// If there are changes, apply them to local database
if (
result.decks.length > 0 ||