diff options
Diffstat (limited to 'src/server/repositories/sync.ts')
| -rw-r--r-- | src/server/repositories/sync.ts | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/src/server/repositories/sync.ts b/src/server/repositories/sync.ts index 59a195a..188bd1b 100644 --- a/src/server/repositories/sync.ts +++ b/src/server/repositories/sync.ts @@ -9,6 +9,7 @@ import { noteTypes, reviewLogs, } from "../db/schema.js"; +import { type CrdtEntityTypeValue, crdtDocuments } from "../db/schema-crdt.js"; import type { Card, Deck, @@ -20,6 +21,20 @@ import type { } from "./types.js"; /** + * CRDT sync payload for conflict-free synchronization + */ +export interface CrdtSyncPayload { + /** Document ID in format entityType:entityId */ + documentId: string; + /** Entity type */ + entityType: CrdtEntityTypeValue; + /** Entity ID */ + entityId: string; + /** Base64-encoded CRDT binary */ + binary: string; +} + +/** * Sync data types for push/pull operations */ export interface SyncPushData { @@ -30,6 +45,8 @@ export interface SyncPushData { noteFieldTypes: SyncNoteFieldTypeData[]; notes: SyncNoteData[]; noteFieldValues: SyncNoteFieldValueData[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges?: CrdtSyncPayload[]; } export interface SyncDeckData { @@ -122,6 +139,12 @@ export interface SyncPushResult { noteFieldTypes: { id: string; syncVersion: number }[]; notes: { id: string; syncVersion: number }[]; noteFieldValues: { id: string; syncVersion: number }[]; + /** CRDT documents that were stored/merged */ + crdtChanges: { + entityType: CrdtEntityTypeValue; + entityId: string; + syncVersion: number; + }[]; conflicts: { decks: string[]; cards: string[]; @@ -144,6 +167,8 @@ export interface SyncPullResult { noteFieldTypes: NoteFieldType[]; notes: Note[]; noteFieldValues: NoteFieldValue[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges: CrdtSyncPayload[]; currentSyncVersion: number; } @@ -165,6 +190,7 @@ export const syncRepository: SyncRepository = { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -768,6 +794,78 @@ export const syncRepository: SyncRepository = { } } + // Process CRDT changes + if (data.crdtChanges && data.crdtChanges.length > 0) { + for (const crdtChange of data.crdtChanges) { + // Check if document exists + const existing = await db + .select({ + id: crdtDocuments.id, + syncVersion: crdtDocuments.syncVersion, + }) + .from(crdtDocuments) + .where( + and( + eq(crdtDocuments.userId, userId), + eq(crdtDocuments.entityType, crdtChange.entityType), + eq(crdtDocuments.entityId, crdtChange.entityId), + ), + ); + + if (existing.length === 0) { + // New document - insert + const [inserted] = await db + .insert(crdtDocuments) + .values({ + userId, + entityType: crdtChange.entityType, + entityId: crdtChange.entityId, + binary: crdtChange.binary, + syncVersion: 1, + }) + .returning({ + syncVersion: crdtDocuments.syncVersion, + }); + + if (inserted) { + result.crdtChanges.push({ + entityType: crdtChange.entityType as CrdtEntityTypeValue, + entityId: crdtChange.entityId, + syncVersion: inserted.syncVersion, + }); + } + } else { + // Existing document - update (store new binary, merge happens on client) + // For now, we just store the latest binary. Future: server-side Automerge merge + const [updated] = await db + .update(crdtDocuments) + .set({ + binary: crdtChange.binary, + updatedAt: new Date(), + syncVersion: sql`${crdtDocuments.syncVersion} + 1`, + }) + .where( + and( + eq(crdtDocuments.userId, userId), + eq(crdtDocuments.entityType, crdtChange.entityType), + eq(crdtDocuments.entityId, crdtChange.entityId), + ), + ) + .returning({ + syncVersion: crdtDocuments.syncVersion, + }); + + if (updated) { + result.crdtChanges.push({ + entityType: crdtChange.entityType as CrdtEntityTypeValue, + entityId: crdtChange.entityId, + syncVersion: updated.syncVersion, + }); + } + } + } + } + return result; }, @@ -882,6 +980,25 @@ export const syncRepository: SyncRepository = { ); } + // Get all CRDT documents for user with syncVersion > lastSyncVersion + const pulledCrdtDocs = await db + .select() + .from(crdtDocuments) + .where( + and( + eq(crdtDocuments.userId, userId), + gt(crdtDocuments.syncVersion, lastSyncVersion), + ), + ); + + // Convert CRDT documents to sync payload format + const pulledCrdtChanges: CrdtSyncPayload[] = pulledCrdtDocs.map((doc) => ({ + documentId: `${doc.entityType}:${doc.entityId}`, + entityType: doc.entityType as CrdtEntityTypeValue, + entityId: doc.entityId, + binary: doc.binary, + })); + // Calculate current max sync version across all entities let currentSyncVersion = lastSyncVersion; @@ -920,6 +1037,11 @@ export const syncRepository: SyncRepository = { currentSyncVersion = fieldValue.syncVersion; } } + for (const crdtDoc of pulledCrdtDocs) { + if (crdtDoc.syncVersion > currentSyncVersion) { + currentSyncVersion = crdtDoc.syncVersion; + } + } return { decks: pulledDecks, @@ -929,6 +1051,7 @@ export const syncRepository: SyncRepository = { noteFieldTypes: pulledNoteFieldTypes, notes: pulledNotes, noteFieldValues: pulledNoteFieldValues, + crdtChanges: pulledCrdtChanges, currentSyncVersion, }; }, |
