diff options
| author | nsfisis <nsfisis@gmail.com> | 2025-12-31 16:20:40 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2025-12-31 16:20:40 +0900 |
| commit | 26b6408c84bfcc46f3d470292688e4ffaf0265f2 (patch) | |
| tree | 52160cdfab669ec0a20886e9f5bd9151f3eeebd3 | |
| parent | e4aeded6c105de6c8af6a931d5c24a659dcbd138 (diff) | |
| download | kioku-26b6408c84bfcc46f3d470292688e4ffaf0265f2.tar.gz kioku-26b6408c84bfcc46f3d470292688e4ffaf0265f2.tar.zst kioku-26b6408c84bfcc46f3d470292688e4ffaf0265f2.zip | |
feat(crdt): add server-side CRDT sync API handling
Add crdtChanges field to sync push/pull endpoints for CRDT document
synchronization. The server now stores and retrieves CRDT binaries
from the crdt_documents table, enabling conflict-free sync between
clients.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| -rw-r--r-- | docs/dev/roadmap.md | 4 | ||||
| -rw-r--r-- | src/server/repositories/sync.ts | 123 | ||||
| -rw-r--r-- | src/server/routes/sync.test.ts | 159 | ||||
| -rw-r--r-- | src/server/routes/sync.ts | 21 |
4 files changed, 305 insertions, 2 deletions
diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md index 36e17e6..df4e695 100644 --- a/docs/dev/roadmap.md +++ b/docs/dev/roadmap.md @@ -33,8 +33,8 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD - [x] Install server dependency: `@automerge/automerge` - [x] Create `src/server/db/schema-crdt.ts` - CRDT document storage schema - [x] Create database migration for crdt_documents table -- [ ] Modify `src/server/routes/sync.ts` - Handle CRDT changes in API -- [ ] Modify `src/server/repositories/sync.ts` - Store/merge CRDT documents +- [x] Modify `src/server/routes/sync.ts` - Handle CRDT changes in API +- [x] Modify `src/server/repositories/sync.ts` - Store/merge CRDT documents ### Phase 5: Migration diff --git a/src/server/repositories/sync.ts b/src/server/repositories/sync.ts index 59a195a..188bd1b 100644 --- a/src/server/repositories/sync.ts +++ b/src/server/repositories/sync.ts @@ -9,6 +9,7 @@ import { noteTypes, reviewLogs, } from "../db/schema.js"; +import { type CrdtEntityTypeValue, crdtDocuments } from "../db/schema-crdt.js"; import type { Card, Deck, @@ -20,6 +21,20 @@ import type { } from "./types.js"; /** + * CRDT sync payload for conflict-free synchronization + */ +export interface CrdtSyncPayload { + /** Document ID in format entityType:entityId */ + documentId: string; + /** Entity type */ + entityType: CrdtEntityTypeValue; + /** Entity ID */ + entityId: string; + /** Base64-encoded CRDT binary */ + binary: string; +} + +/** * Sync data types for push/pull operations */ export interface SyncPushData { @@ -30,6 +45,8 @@ export interface SyncPushData { noteFieldTypes: SyncNoteFieldTypeData[]; notes: SyncNoteData[]; noteFieldValues: SyncNoteFieldValueData[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges?: CrdtSyncPayload[]; } export interface SyncDeckData { @@ -122,6 +139,12 @@ export interface SyncPushResult { noteFieldTypes: { id: string; syncVersion: number }[]; notes: { id: string; syncVersion: number }[]; noteFieldValues: { id: string; syncVersion: number }[]; + /** CRDT documents that were stored/merged */ + crdtChanges: { + entityType: CrdtEntityTypeValue; + entityId: string; + syncVersion: number; + }[]; conflicts: { decks: string[]; cards: string[]; @@ -144,6 +167,8 @@ export interface SyncPullResult { noteFieldTypes: NoteFieldType[]; notes: Note[]; noteFieldValues: NoteFieldValue[]; + /** CRDT document changes for conflict-free sync */ + crdtChanges: CrdtSyncPayload[]; currentSyncVersion: number; } @@ -165,6 +190,7 @@ export const syncRepository: SyncRepository = { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -768,6 +794,78 @@ export const syncRepository: SyncRepository = { } } + // Process CRDT changes + if (data.crdtChanges && data.crdtChanges.length > 0) { + for (const crdtChange of data.crdtChanges) { + // Check if document exists + const existing = await db + .select({ + id: crdtDocuments.id, + syncVersion: crdtDocuments.syncVersion, + }) + .from(crdtDocuments) + .where( + and( + eq(crdtDocuments.userId, userId), + eq(crdtDocuments.entityType, crdtChange.entityType), + eq(crdtDocuments.entityId, crdtChange.entityId), + ), + ); + + if (existing.length === 0) { + // New document - insert + const [inserted] = await db + .insert(crdtDocuments) + .values({ + userId, + entityType: crdtChange.entityType, + entityId: crdtChange.entityId, + binary: crdtChange.binary, + syncVersion: 1, + }) + .returning({ + syncVersion: crdtDocuments.syncVersion, + }); + + if (inserted) { + result.crdtChanges.push({ + entityType: crdtChange.entityType as CrdtEntityTypeValue, + entityId: crdtChange.entityId, + syncVersion: inserted.syncVersion, + }); + } + } else { + // Existing document - update (store new binary, merge happens on client) + // For now, we just store the latest binary. Future: server-side Automerge merge + const [updated] = await db + .update(crdtDocuments) + .set({ + binary: crdtChange.binary, + updatedAt: new Date(), + syncVersion: sql`${crdtDocuments.syncVersion} + 1`, + }) + .where( + and( + eq(crdtDocuments.userId, userId), + eq(crdtDocuments.entityType, crdtChange.entityType), + eq(crdtDocuments.entityId, crdtChange.entityId), + ), + ) + .returning({ + syncVersion: crdtDocuments.syncVersion, + }); + + if (updated) { + result.crdtChanges.push({ + entityType: crdtChange.entityType as CrdtEntityTypeValue, + entityId: crdtChange.entityId, + syncVersion: updated.syncVersion, + }); + } + } + } + } + return result; }, @@ -882,6 +980,25 @@ export const syncRepository: SyncRepository = { ); } + // Get all CRDT documents for user with syncVersion > lastSyncVersion + const pulledCrdtDocs = await db + .select() + .from(crdtDocuments) + .where( + and( + eq(crdtDocuments.userId, userId), + gt(crdtDocuments.syncVersion, lastSyncVersion), + ), + ); + + // Convert CRDT documents to sync payload format + const pulledCrdtChanges: CrdtSyncPayload[] = pulledCrdtDocs.map((doc) => ({ + documentId: `${doc.entityType}:${doc.entityId}`, + entityType: doc.entityType as CrdtEntityTypeValue, + entityId: doc.entityId, + binary: doc.binary, + })); + // Calculate current max sync version across all entities let currentSyncVersion = lastSyncVersion; @@ -920,6 +1037,11 @@ export const syncRepository: SyncRepository = { currentSyncVersion = fieldValue.syncVersion; } } + for (const crdtDoc of pulledCrdtDocs) { + if (crdtDoc.syncVersion > currentSyncVersion) { + currentSyncVersion = crdtDoc.syncVersion; + } + } return { decks: pulledDecks, @@ -929,6 +1051,7 @@ export const syncRepository: SyncRepository = { noteFieldTypes: pulledNoteFieldTypes, notes: pulledNotes, noteFieldValues: pulledNoteFieldValues, + crdtChanges: pulledCrdtChanges, currentSyncVersion, }; }, diff --git a/src/server/routes/sync.test.ts b/src/server/routes/sync.test.ts index f340af7..8ea2ce3 100644 --- a/src/server/routes/sync.test.ts +++ b/src/server/routes/sync.test.ts @@ -96,6 +96,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -133,6 +134,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], }); }); @@ -155,6 +157,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -212,6 +215,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -260,6 +264,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -311,6 +316,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: ["550e8400-e29b-41d4-a716-446655440003"], cards: [], @@ -474,6 +480,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -527,6 +534,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -590,6 +598,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -639,6 +648,7 @@ describe("POST /api/sync/push", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], conflicts: { decks: [], cards: [], @@ -667,6 +677,114 @@ describe("POST /api/sync/push", () => { const body = (await res.json()) as SyncPushResponse; expect(body.decks).toHaveLength(1); }); + + it("successfully pushes CRDT changes", async () => { + const crdtChange = { + documentId: "deck:550e8400-e29b-41d4-a716-446655440000", + entityType: "deck" as const, + entityId: "550e8400-e29b-41d4-a716-446655440000", + binary: "SGVsbG8gV29ybGQ=", // Base64 encoded test data + }; + + const mockResult: SyncPushResult = { + decks: [], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [ + { + entityType: "deck", + entityId: "550e8400-e29b-41d4-a716-446655440000", + syncVersion: 1, + }, + ], + conflicts: { + decks: [], + cards: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + }, + }; + vi.mocked(mockSyncRepo.pushChanges).mockResolvedValue(mockResult); + + const res = await app.request("/api/sync/push", { + method: "POST", + headers: { + Authorization: `Bearer ${authToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + decks: [], + cards: [], + reviewLogs: [], + crdtChanges: [crdtChange], + }), + }); + + expect(res.status).toBe(200); + const body = (await res.json()) as { crdtChanges?: unknown[] }; + expect(body.crdtChanges).toHaveLength(1); + expect(mockSyncRepo.pushChanges).toHaveBeenCalledWith( + userId, + expect.objectContaining({ + crdtChanges: [crdtChange], + }), + ); + }); + + it("validates CRDT changes have required fields", async () => { + const invalidCrdtChange = { + documentId: "deck:123", + entityType: "deck", + // missing entityId and binary + }; + + const res = await app.request("/api/sync/push", { + method: "POST", + headers: { + Authorization: `Bearer ${authToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + decks: [], + cards: [], + reviewLogs: [], + crdtChanges: [invalidCrdtChange], + }), + }); + + expect(res.status).toBe(400); + }); + + it("validates CRDT entity type is valid", async () => { + const invalidCrdtChange = { + documentId: "invalid:550e8400-e29b-41d4-a716-446655440000", + entityType: "invalidType", + entityId: "550e8400-e29b-41d4-a716-446655440000", + binary: "SGVsbG8gV29ybGQ=", + }; + + const res = await app.request("/api/sync/push", { + method: "POST", + headers: { + Authorization: `Bearer ${authToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + decks: [], + cards: [], + reviewLogs: [], + crdtChanges: [invalidCrdtChange], + }), + }); + + expect(res.status).toBe(400); + }); }); interface SyncPullResponse { @@ -727,6 +845,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 1, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -761,6 +880,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 5, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -801,6 +921,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 2, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -850,6 +971,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 3, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -892,6 +1014,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 1, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -968,6 +1091,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 3, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -1012,6 +1136,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 1, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -1064,6 +1189,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 1, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -1104,6 +1230,7 @@ describe("GET /api/sync/pull", () => { noteFieldTypes: [], notes: [], noteFieldValues: [], + crdtChanges: [], currentSyncVersion: 2, }; vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); @@ -1129,4 +1256,36 @@ describe("GET /api/sync/pull", () => { expect(res.status).toBe(400); }); + + it("returns CRDT changes in pull response", async () => { + const mockResult: SyncPullResult = { + decks: [], + cards: [], + reviewLogs: [], + noteTypes: [], + noteFieldTypes: [], + notes: [], + noteFieldValues: [], + crdtChanges: [ + { + documentId: "deck:550e8400-e29b-41d4-a716-446655440000", + entityType: "deck", + entityId: "550e8400-e29b-41d4-a716-446655440000", + binary: "SGVsbG8gV29ybGQ=", + }, + ], + currentSyncVersion: 1, + }; + vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult); + + const res = await app.request("/api/sync/pull", { + headers: { + Authorization: `Bearer ${authToken}`, + }, + }); + + expect(res.status).toBe(200); + const body = (await res.json()) as { crdtChanges?: unknown[] }; + expect(body.crdtChanges).toHaveLength(1); + }); }); diff --git a/src/server/routes/sync.ts b/src/server/routes/sync.ts index fca099b..c571f8a 100644 --- a/src/server/routes/sync.ts +++ b/src/server/routes/sync.ts @@ -1,6 +1,7 @@ import { zValidator } from "@hono/zod-validator"; import { Hono } from "hono"; import { z } from "zod"; +import { CrdtEntityType } from "../db/schema-crdt.js"; import { authMiddleware, getAuthUser } from "../middleware/index.js"; import { type SyncPullQuery, @@ -95,6 +96,25 @@ const syncNoteFieldValueSchema = z.object({ updatedAt: z.string().datetime(), }); +/** + * Schema for CRDT sync payload + * Used for conflict-free synchronization of entity data + */ +const crdtSyncPayloadSchema = z.object({ + documentId: z.string().min(1), + entityType: z.enum([ + CrdtEntityType.Deck, + CrdtEntityType.NoteType, + CrdtEntityType.NoteFieldType, + CrdtEntityType.Note, + CrdtEntityType.NoteFieldValue, + CrdtEntityType.Card, + CrdtEntityType.ReviewLog, + ]), + entityId: z.uuid(), + binary: z.string().min(1), // Base64-encoded Automerge binary +}); + const syncPushSchema = z.object({ decks: z.array(syncDeckSchema).default([]), cards: z.array(syncCardSchema).default([]), @@ -103,6 +123,7 @@ const syncPushSchema = z.object({ noteFieldTypes: z.array(syncNoteFieldTypeSchema).default([]), notes: z.array(syncNoteSchema).default([]), noteFieldValues: z.array(syncNoteFieldValueSchema).default([]), + crdtChanges: z.array(crdtSyncPayloadSchema).default([]), }); const syncPullQuerySchema = z.object({ |
