aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2025-12-31 16:20:40 +0900
committernsfisis <nsfisis@gmail.com>2025-12-31 16:20:40 +0900
commit26b6408c84bfcc46f3d470292688e4ffaf0265f2 (patch)
tree52160cdfab669ec0a20886e9f5bd9151f3eeebd3
parente4aeded6c105de6c8af6a931d5c24a659dcbd138 (diff)
downloadkioku-26b6408c84bfcc46f3d470292688e4ffaf0265f2.tar.gz
kioku-26b6408c84bfcc46f3d470292688e4ffaf0265f2.tar.zst
kioku-26b6408c84bfcc46f3d470292688e4ffaf0265f2.zip
feat(crdt): add server-side CRDT sync API handling
Add crdtChanges field to sync push/pull endpoints for CRDT document synchronization. The server now stores and retrieves CRDT binaries from the crdt_documents table, enabling conflict-free sync between clients. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
-rw-r--r--docs/dev/roadmap.md4
-rw-r--r--src/server/repositories/sync.ts123
-rw-r--r--src/server/routes/sync.test.ts159
-rw-r--r--src/server/routes/sync.ts21
4 files changed, 305 insertions, 2 deletions
diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md
index 36e17e6..df4e695 100644
--- a/docs/dev/roadmap.md
+++ b/docs/dev/roadmap.md
@@ -33,8 +33,8 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD
- [x] Install server dependency: `@automerge/automerge`
- [x] Create `src/server/db/schema-crdt.ts` - CRDT document storage schema
- [x] Create database migration for crdt_documents table
-- [ ] Modify `src/server/routes/sync.ts` - Handle CRDT changes in API
-- [ ] Modify `src/server/repositories/sync.ts` - Store/merge CRDT documents
+- [x] Modify `src/server/routes/sync.ts` - Handle CRDT changes in API
+- [x] Modify `src/server/repositories/sync.ts` - Store/merge CRDT documents
### Phase 5: Migration
diff --git a/src/server/repositories/sync.ts b/src/server/repositories/sync.ts
index 59a195a..188bd1b 100644
--- a/src/server/repositories/sync.ts
+++ b/src/server/repositories/sync.ts
@@ -9,6 +9,7 @@ import {
noteTypes,
reviewLogs,
} from "../db/schema.js";
+import { type CrdtEntityTypeValue, crdtDocuments } from "../db/schema-crdt.js";
import type {
Card,
Deck,
@@ -20,6 +21,20 @@ import type {
} from "./types.js";
/**
+ * CRDT sync payload for conflict-free synchronization
+ */
+export interface CrdtSyncPayload {
+ /** Document ID in format entityType:entityId */
+ documentId: string;
+ /** Entity type */
+ entityType: CrdtEntityTypeValue;
+ /** Entity ID */
+ entityId: string;
+ /** Base64-encoded CRDT binary */
+ binary: string;
+}
+
+/**
* Sync data types for push/pull operations
*/
export interface SyncPushData {
@@ -30,6 +45,8 @@ export interface SyncPushData {
noteFieldTypes: SyncNoteFieldTypeData[];
notes: SyncNoteData[];
noteFieldValues: SyncNoteFieldValueData[];
+ /** CRDT document changes for conflict-free sync */
+ crdtChanges?: CrdtSyncPayload[];
}
export interface SyncDeckData {
@@ -122,6 +139,12 @@ export interface SyncPushResult {
noteFieldTypes: { id: string; syncVersion: number }[];
notes: { id: string; syncVersion: number }[];
noteFieldValues: { id: string; syncVersion: number }[];
+ /** CRDT documents that were stored/merged */
+ crdtChanges: {
+ entityType: CrdtEntityTypeValue;
+ entityId: string;
+ syncVersion: number;
+ }[];
conflicts: {
decks: string[];
cards: string[];
@@ -144,6 +167,8 @@ export interface SyncPullResult {
noteFieldTypes: NoteFieldType[];
notes: Note[];
noteFieldValues: NoteFieldValue[];
+ /** CRDT document changes for conflict-free sync */
+ crdtChanges: CrdtSyncPayload[];
currentSyncVersion: number;
}
@@ -165,6 +190,7 @@ export const syncRepository: SyncRepository = {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -768,6 +794,78 @@ export const syncRepository: SyncRepository = {
}
}
+ // Process CRDT changes
+ if (data.crdtChanges && data.crdtChanges.length > 0) {
+ for (const crdtChange of data.crdtChanges) {
+ // Check if document exists
+ const existing = await db
+ .select({
+ id: crdtDocuments.id,
+ syncVersion: crdtDocuments.syncVersion,
+ })
+ .from(crdtDocuments)
+ .where(
+ and(
+ eq(crdtDocuments.userId, userId),
+ eq(crdtDocuments.entityType, crdtChange.entityType),
+ eq(crdtDocuments.entityId, crdtChange.entityId),
+ ),
+ );
+
+ if (existing.length === 0) {
+ // New document - insert
+ const [inserted] = await db
+ .insert(crdtDocuments)
+ .values({
+ userId,
+ entityType: crdtChange.entityType,
+ entityId: crdtChange.entityId,
+ binary: crdtChange.binary,
+ syncVersion: 1,
+ })
+ .returning({
+ syncVersion: crdtDocuments.syncVersion,
+ });
+
+ if (inserted) {
+ result.crdtChanges.push({
+ entityType: crdtChange.entityType as CrdtEntityTypeValue,
+ entityId: crdtChange.entityId,
+ syncVersion: inserted.syncVersion,
+ });
+ }
+ } else {
+ // Existing document - update (store new binary, merge happens on client)
+ // For now, we just store the latest binary. Future: server-side Automerge merge
+ const [updated] = await db
+ .update(crdtDocuments)
+ .set({
+ binary: crdtChange.binary,
+ updatedAt: new Date(),
+ syncVersion: sql`${crdtDocuments.syncVersion} + 1`,
+ })
+ .where(
+ and(
+ eq(crdtDocuments.userId, userId),
+ eq(crdtDocuments.entityType, crdtChange.entityType),
+ eq(crdtDocuments.entityId, crdtChange.entityId),
+ ),
+ )
+ .returning({
+ syncVersion: crdtDocuments.syncVersion,
+ });
+
+ if (updated) {
+ result.crdtChanges.push({
+ entityType: crdtChange.entityType as CrdtEntityTypeValue,
+ entityId: crdtChange.entityId,
+ syncVersion: updated.syncVersion,
+ });
+ }
+ }
+ }
+ }
+
return result;
},
@@ -882,6 +980,25 @@ export const syncRepository: SyncRepository = {
);
}
+ // Get all CRDT documents for user with syncVersion > lastSyncVersion
+ const pulledCrdtDocs = await db
+ .select()
+ .from(crdtDocuments)
+ .where(
+ and(
+ eq(crdtDocuments.userId, userId),
+ gt(crdtDocuments.syncVersion, lastSyncVersion),
+ ),
+ );
+
+ // Convert CRDT documents to sync payload format
+ const pulledCrdtChanges: CrdtSyncPayload[] = pulledCrdtDocs.map((doc) => ({
+ documentId: `${doc.entityType}:${doc.entityId}`,
+ entityType: doc.entityType as CrdtEntityTypeValue,
+ entityId: doc.entityId,
+ binary: doc.binary,
+ }));
+
// Calculate current max sync version across all entities
let currentSyncVersion = lastSyncVersion;
@@ -920,6 +1037,11 @@ export const syncRepository: SyncRepository = {
currentSyncVersion = fieldValue.syncVersion;
}
}
+ for (const crdtDoc of pulledCrdtDocs) {
+ if (crdtDoc.syncVersion > currentSyncVersion) {
+ currentSyncVersion = crdtDoc.syncVersion;
+ }
+ }
return {
decks: pulledDecks,
@@ -929,6 +1051,7 @@ export const syncRepository: SyncRepository = {
noteFieldTypes: pulledNoteFieldTypes,
notes: pulledNotes,
noteFieldValues: pulledNoteFieldValues,
+ crdtChanges: pulledCrdtChanges,
currentSyncVersion,
};
},
diff --git a/src/server/routes/sync.test.ts b/src/server/routes/sync.test.ts
index f340af7..8ea2ce3 100644
--- a/src/server/routes/sync.test.ts
+++ b/src/server/routes/sync.test.ts
@@ -96,6 +96,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -133,6 +134,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
});
});
@@ -155,6 +157,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -212,6 +215,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -260,6 +264,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -311,6 +316,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: ["550e8400-e29b-41d4-a716-446655440003"],
cards: [],
@@ -474,6 +480,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -527,6 +534,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -590,6 +598,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -639,6 +648,7 @@ describe("POST /api/sync/push", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
conflicts: {
decks: [],
cards: [],
@@ -667,6 +677,114 @@ describe("POST /api/sync/push", () => {
const body = (await res.json()) as SyncPushResponse;
expect(body.decks).toHaveLength(1);
});
+
+ it("successfully pushes CRDT changes", async () => {
+ const crdtChange = {
+ documentId: "deck:550e8400-e29b-41d4-a716-446655440000",
+ entityType: "deck" as const,
+ entityId: "550e8400-e29b-41d4-a716-446655440000",
+ binary: "SGVsbG8gV29ybGQ=", // Base64 encoded test data
+ };
+
+ const mockResult: SyncPushResult = {
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ crdtChanges: [
+ {
+ entityType: "deck",
+ entityId: "550e8400-e29b-41d4-a716-446655440000",
+ syncVersion: 1,
+ },
+ ],
+ conflicts: {
+ decks: [],
+ cards: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ },
+ };
+ vi.mocked(mockSyncRepo.pushChanges).mockResolvedValue(mockResult);
+
+ const res = await app.request("/api/sync/push", {
+ method: "POST",
+ headers: {
+ Authorization: `Bearer ${authToken}`,
+ "Content-Type": "application/json",
+ },
+ body: JSON.stringify({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ crdtChanges: [crdtChange],
+ }),
+ });
+
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { crdtChanges?: unknown[] };
+ expect(body.crdtChanges).toHaveLength(1);
+ expect(mockSyncRepo.pushChanges).toHaveBeenCalledWith(
+ userId,
+ expect.objectContaining({
+ crdtChanges: [crdtChange],
+ }),
+ );
+ });
+
+ it("validates CRDT changes have required fields", async () => {
+ const invalidCrdtChange = {
+ documentId: "deck:123",
+ entityType: "deck",
+ // missing entityId and binary
+ };
+
+ const res = await app.request("/api/sync/push", {
+ method: "POST",
+ headers: {
+ Authorization: `Bearer ${authToken}`,
+ "Content-Type": "application/json",
+ },
+ body: JSON.stringify({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ crdtChanges: [invalidCrdtChange],
+ }),
+ });
+
+ expect(res.status).toBe(400);
+ });
+
+ it("validates CRDT entity type is valid", async () => {
+ const invalidCrdtChange = {
+ documentId: "invalid:550e8400-e29b-41d4-a716-446655440000",
+ entityType: "invalidType",
+ entityId: "550e8400-e29b-41d4-a716-446655440000",
+ binary: "SGVsbG8gV29ybGQ=",
+ };
+
+ const res = await app.request("/api/sync/push", {
+ method: "POST",
+ headers: {
+ Authorization: `Bearer ${authToken}`,
+ "Content-Type": "application/json",
+ },
+ body: JSON.stringify({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ crdtChanges: [invalidCrdtChange],
+ }),
+ });
+
+ expect(res.status).toBe(400);
+ });
});
interface SyncPullResponse {
@@ -727,6 +845,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 1,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -761,6 +880,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 5,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -801,6 +921,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 2,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -850,6 +971,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 3,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -892,6 +1014,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 1,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -968,6 +1091,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 3,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -1012,6 +1136,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 1,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -1064,6 +1189,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 1,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -1104,6 +1230,7 @@ describe("GET /api/sync/pull", () => {
noteFieldTypes: [],
notes: [],
noteFieldValues: [],
+ crdtChanges: [],
currentSyncVersion: 2,
};
vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
@@ -1129,4 +1256,36 @@ describe("GET /api/sync/pull", () => {
expect(res.status).toBe(400);
});
+
+ it("returns CRDT changes in pull response", async () => {
+ const mockResult: SyncPullResult = {
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ crdtChanges: [
+ {
+ documentId: "deck:550e8400-e29b-41d4-a716-446655440000",
+ entityType: "deck",
+ entityId: "550e8400-e29b-41d4-a716-446655440000",
+ binary: "SGVsbG8gV29ybGQ=",
+ },
+ ],
+ currentSyncVersion: 1,
+ };
+ vi.mocked(mockSyncRepo.pullChanges).mockResolvedValue(mockResult);
+
+ const res = await app.request("/api/sync/pull", {
+ headers: {
+ Authorization: `Bearer ${authToken}`,
+ },
+ });
+
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { crdtChanges?: unknown[] };
+ expect(body.crdtChanges).toHaveLength(1);
+ });
});
diff --git a/src/server/routes/sync.ts b/src/server/routes/sync.ts
index fca099b..c571f8a 100644
--- a/src/server/routes/sync.ts
+++ b/src/server/routes/sync.ts
@@ -1,6 +1,7 @@
import { zValidator } from "@hono/zod-validator";
import { Hono } from "hono";
import { z } from "zod";
+import { CrdtEntityType } from "../db/schema-crdt.js";
import { authMiddleware, getAuthUser } from "../middleware/index.js";
import {
type SyncPullQuery,
@@ -95,6 +96,25 @@ const syncNoteFieldValueSchema = z.object({
updatedAt: z.string().datetime(),
});
+/**
+ * Schema for CRDT sync payload
+ * Used for conflict-free synchronization of entity data
+ */
+const crdtSyncPayloadSchema = z.object({
+ documentId: z.string().min(1),
+ entityType: z.enum([
+ CrdtEntityType.Deck,
+ CrdtEntityType.NoteType,
+ CrdtEntityType.NoteFieldType,
+ CrdtEntityType.Note,
+ CrdtEntityType.NoteFieldValue,
+ CrdtEntityType.Card,
+ CrdtEntityType.ReviewLog,
+ ]),
+ entityId: z.uuid(),
+ binary: z.string().min(1), // Base64-encoded Automerge binary
+});
+
const syncPushSchema = z.object({
decks: z.array(syncDeckSchema).default([]),
cards: z.array(syncCardSchema).default([]),
@@ -103,6 +123,7 @@ const syncPushSchema = z.object({
noteFieldTypes: z.array(syncNoteFieldTypeSchema).default([]),
notes: z.array(syncNoteSchema).default([]),
noteFieldValues: z.array(syncNoteFieldValueSchema).default([]),
+ crdtChanges: z.array(crdtSyncPayloadSchema).default([]),
});
const syncPullQuerySchema = z.object({