aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/server/repositories/sync.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/repositories/sync.ts')
-rw-r--r--src/server/repositories/sync.ts123
1 files changed, 123 insertions, 0 deletions
diff --git a/src/server/repositories/sync.ts b/src/server/repositories/sync.ts
index 59a195a..188bd1b 100644
--- a/src/server/repositories/sync.ts
+++ b/src/server/repositories/sync.ts
@@ -9,6 +9,7 @@ import {
noteTypes,
reviewLogs,
} from "../db/schema.js";
+import { type CrdtEntityTypeValue, crdtDocuments } from "../db/schema-crdt.js";
import type {
Card,
Deck,
@@ -20,6 +21,20 @@ import type {
} from "./types.js";
/**
+ * CRDT sync payload for conflict-free synchronization
+ */
+export interface CrdtSyncPayload {
+ /** Document ID in format entityType:entityId */
+ documentId: string;
+ /** Entity type */
+ entityType: CrdtEntityTypeValue;
+ /** Entity ID */
+ entityId: string;
+ /** Base64-encoded CRDT binary */
+ binary: string;
+}
+
+/**
* Sync data types for push/pull operations
*/
export interface SyncPushData {
@@ -30,6 +45,8 @@ export interface SyncPushData {
noteFieldTypes: SyncNoteFieldTypeData[];
notes: SyncNoteData[];
noteFieldValues: SyncNoteFieldValueData[];
+ /** CRDT document changes for conflict-free sync */
+ crdtChanges?: CrdtSyncPayload[];
}
export interface SyncDeckData {
@@ -122,6 +139,12 @@ export interface SyncPushResult {
noteFieldTypes: { id: string; syncVersion: number }[];
notes: { id: string; syncVersion: number }[];
noteFieldValues: { id: string; syncVersion: number }[];
+ /** CRDT documents that were stored/merged */
+ crdtChanges: {
+ entityType: CrdtEntityTypeValue;
+ entityId: string;
+ syncVersion: number;
+ }[];
conflicts: {
decks: string[];
cards: string[];
@@ -144,6 +167,8 @@ export interface SyncPullResult {
noteFieldTypes: NoteFieldType[];
notes: Note[];
noteFieldValues: NoteFieldValue[];
+ /** CRDT document changes for conflict-free sync */
+ crdtChanges: CrdtSyncPayload[];
currentSyncVersion: number;
}
@@ -165,6 +190,7 @@ export const syncRepository: SyncRepository = {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -768,6 +794,78 @@ export const syncRepository: SyncRepository = {
}
}
+ // Process CRDT changes
+ if (data.crdtChanges && data.crdtChanges.length > 0) {
+ for (const crdtChange of data.crdtChanges) {
+ // Check if document exists
+ const existing = await db
+ .select({
+ id: crdtDocuments.id,
+ syncVersion: crdtDocuments.syncVersion,
+ })
+ .from(crdtDocuments)
+ .where(
+ and(
+ eq(crdtDocuments.userId, userId),
+ eq(crdtDocuments.entityType, crdtChange.entityType),
+ eq(crdtDocuments.entityId, crdtChange.entityId),
+ ),
+ );
+
+ if (existing.length === 0) {
+ // New document - insert
+ const [inserted] = await db
+ .insert(crdtDocuments)
+ .values({
+ userId,
+ entityType: crdtChange.entityType,
+ entityId: crdtChange.entityId,
+ binary: crdtChange.binary,
+ syncVersion: 1,
+ })
+ .returning({
+ syncVersion: crdtDocuments.syncVersion,
+ });
+
+ if (inserted) {
+ result.crdtChanges.push({
+ entityType: crdtChange.entityType as CrdtEntityTypeValue,
+ entityId: crdtChange.entityId,
+ syncVersion: inserted.syncVersion,
+ });
+ }
+ } else {
+ // Existing document - update (store new binary, merge happens on client)
+ // For now, we just store the latest binary. Future: server-side Automerge merge
+ const [updated] = await db
+ .update(crdtDocuments)
+ .set({
+ binary: crdtChange.binary,
+ updatedAt: new Date(),
+ syncVersion: sql`${crdtDocuments.syncVersion} + 1`,
+ })
+ .where(
+ and(
+ eq(crdtDocuments.userId, userId),
+ eq(crdtDocuments.entityType, crdtChange.entityType),
+ eq(crdtDocuments.entityId, crdtChange.entityId),
+ ),
+ )
+ .returning({
+ syncVersion: crdtDocuments.syncVersion,
+ });
+
+ if (updated) {
+ result.crdtChanges.push({
+ entityType: crdtChange.entityType as CrdtEntityTypeValue,
+ entityId: crdtChange.entityId,
+ syncVersion: updated.syncVersion,
+ });
+ }
+ }
+ }
+ }
+
return result;
},
@@ -882,6 +980,25 @@ export const syncRepository: SyncRepository = {
);
}
+ // Get all CRDT documents for user with syncVersion > lastSyncVersion
+ const pulledCrdtDocs = await db
+ .select()
+ .from(crdtDocuments)
+ .where(
+ and(
+ eq(crdtDocuments.userId, userId),
+ gt(crdtDocuments.syncVersion, lastSyncVersion),
+ ),
+ );
+
+ // Convert CRDT documents to sync payload format
+ const pulledCrdtChanges: CrdtSyncPayload[] = pulledCrdtDocs.map((doc) => ({
+ documentId: `${doc.entityType}:${doc.entityId}`,
+ entityType: doc.entityType as CrdtEntityTypeValue,
+ entityId: doc.entityId,
+ binary: doc.binary,
+ }));
+
// Calculate current max sync version across all entities
let currentSyncVersion = lastSyncVersion;
@@ -920,6 +1037,11 @@ export const syncRepository: SyncRepository = {
currentSyncVersion = fieldValue.syncVersion;
}
}
+ for (const crdtDoc of pulledCrdtDocs) {
+ if (crdtDoc.syncVersion > currentSyncVersion) {
+ currentSyncVersion = crdtDoc.syncVersion;
+ }
+ }
return {
decks: pulledDecks,
@@ -929,6 +1051,7 @@ export const syncRepository: SyncRepository = {
noteFieldTypes: pulledNoteFieldTypes,
notes: pulledNotes,
noteFieldValues: pulledNoteFieldValues,
+ crdtChanges: pulledCrdtChanges,
currentSyncVersion,
};
},