aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2025-12-31 15:32:58 +0900
committernsfisis <nsfisis@gmail.com>2025-12-31 15:32:58 +0900
commitf3952a509b2d98a25cbb80c9ad091b3b471be52e (patch)
tree23fdc7a024ab028f5c7d5fca2d7ea8e66073d3df
parenta1383a9304ff457d6671e12ded4265b135256004 (diff)
downloadkioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.tar.gz
kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.tar.zst
kioku-f3952a509b2d98a25cbb80c9ad091b3b471be52e.zip
feat(crdt): handle crdtChanges in sync pull response
Add applyCrdtChanges function to process CRDT payloads received from the server during pull operations. The function merges remote documents with local ones using Automerge and stores the result in sync state. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
-rw-r--r--docs/dev/roadmap.md2
-rw-r--r--src/client/sync/pull.test.ts491
-rw-r--r--src/client/sync/pull.ts158
3 files changed, 650 insertions, 1 deletions
diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md
index d17ba41..9251e15 100644
--- a/docs/dev/roadmap.md
+++ b/docs/dev/roadmap.md
@@ -24,7 +24,7 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD
### Phase 3: Modify Sync Protocol
- [x] Modify `src/client/sync/push.ts` - Add crdtChanges to push payload
-- [ ] Modify `src/client/sync/pull.ts` - Handle crdtChanges in pull response
+- [x] Modify `src/client/sync/pull.ts` - Handle crdtChanges in pull response
- [ ] Modify `src/client/sync/conflict.ts` - Replace LWW with Automerge merge
- [ ] Modify `src/client/sync/manager.ts` - Integrate CRDT sync flow
diff --git a/src/client/sync/pull.test.ts b/src/client/sync/pull.test.ts
index dd562a0..8bbf7cf 100644
--- a/src/client/sync/pull.test.ts
+++ b/src/client/sync/pull.test.ts
@@ -13,6 +13,15 @@ import {
localNoteTypeRepository,
} from "../db/repositories";
import {
+ binaryToBase64,
+ CrdtEntityType,
+ crdtDeckRepository,
+ crdtNoteTypeRepository,
+ crdtSyncDb,
+ crdtSyncStateManager,
+} from "./crdt";
+import {
+ applyCrdtChanges,
PullService,
pullResultToLocalData,
type SyncPullResult,
@@ -1151,3 +1160,485 @@ describe("PullService", () => {
});
});
});
+
+describe("applyCrdtChanges", () => {
+ beforeEach(async () => {
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
+ });
+
+ afterEach(async () => {
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
+ });
+
+ it("should process CRDT payload for a new deck", async () => {
+ // Create a CRDT document from a deck
+ const deck = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Test Deck",
+ description: "A test description",
+ newCardsPerDay: 20,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+ const crdtResult = crdtDeckRepository.toCrdtDocument(deck);
+
+ const payload = {
+ documentId: crdtResult.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: deck.id,
+ binary: binaryToBase64(crdtResult.binary),
+ };
+
+ const result = await applyCrdtChanges([payload], 5);
+
+ expect(result.created).toBe(1);
+ expect(result.merged).toBe(0);
+ expect(result.entities.decks).toHaveLength(1);
+ expect(result.entities.decks[0]?.id).toBe("deck-1");
+ expect(result.entities.decks[0]?.name).toBe("Test Deck");
+ expect(result.entities.decks[0]?.description).toBe("A test description");
+ });
+
+ it("should merge CRDT payload with existing local document", async () => {
+ // Create an initial local CRDT document
+ const localDeck = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Local Deck",
+ description: "Local description",
+ newCardsPerDay: 10,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-01T12:00:00Z"),
+ deletedAt: null,
+ syncVersion: 1,
+ _synced: true as const,
+ };
+ const localCrdtResult = crdtDeckRepository.toCrdtDocument(localDeck);
+
+ // Store the local CRDT binary
+ await crdtSyncStateManager.setDocumentBinary(
+ CrdtEntityType.Deck,
+ localDeck.id,
+ localCrdtResult.binary,
+ 1,
+ );
+
+ // Create a remote CRDT document with updated data
+ const remoteDeck = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Remote Deck",
+ description: "Remote description",
+ newCardsPerDay: 25,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"), // Later timestamp
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+ const remoteCrdtResult = crdtDeckRepository.toCrdtDocument(remoteDeck);
+
+ const payload = {
+ documentId: remoteCrdtResult.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: remoteDeck.id,
+ binary: binaryToBase64(remoteCrdtResult.binary),
+ };
+
+ const result = await applyCrdtChanges([payload], 5);
+
+ expect(result.created).toBe(0);
+ expect(result.merged).toBe(1);
+ expect(result.entities.decks).toHaveLength(1);
+ // The merged result should reflect the remote changes
+ expect(result.entities.decks[0]?.id).toBe("deck-1");
+ });
+
+ it("should process multiple CRDT payloads", async () => {
+ const deck1 = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Deck 1",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+ const deck2 = {
+ id: "deck-2",
+ userId: "user-1",
+ name: "Deck 2",
+ description: "Second deck",
+ newCardsPerDay: 15,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+
+ const crdtResult1 = crdtDeckRepository.toCrdtDocument(deck1);
+ const crdtResult2 = crdtDeckRepository.toCrdtDocument(deck2);
+
+ const payloads = [
+ {
+ documentId: crdtResult1.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: deck1.id,
+ binary: binaryToBase64(crdtResult1.binary),
+ },
+ {
+ documentId: crdtResult2.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: deck2.id,
+ binary: binaryToBase64(crdtResult2.binary),
+ },
+ ];
+
+ const result = await applyCrdtChanges(payloads, 5);
+
+ expect(result.created).toBe(2);
+ expect(result.merged).toBe(0);
+ expect(result.entities.decks).toHaveLength(2);
+ });
+
+ it("should process CRDT payloads for different entity types", async () => {
+ const deck = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Test Deck",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+
+ const noteType = {
+ id: "note-type-1",
+ userId: "user-1",
+ name: "Basic",
+ frontTemplate: "{{Front}}",
+ backTemplate: "{{Back}}",
+ isReversible: false,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+
+ const deckCrdt = crdtDeckRepository.toCrdtDocument(deck);
+ const noteTypeCrdt = crdtNoteTypeRepository.toCrdtDocument(noteType);
+
+ const payloads = [
+ {
+ documentId: deckCrdt.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: deck.id,
+ binary: binaryToBase64(deckCrdt.binary),
+ },
+ {
+ documentId: noteTypeCrdt.documentId,
+ entityType: CrdtEntityType.NoteType,
+ entityId: noteType.id,
+ binary: binaryToBase64(noteTypeCrdt.binary),
+ },
+ ];
+
+ const result = await applyCrdtChanges(payloads, 5);
+
+ expect(result.created).toBe(2);
+ expect(result.entities.decks).toHaveLength(1);
+ expect(result.entities.noteTypes).toHaveLength(1);
+ expect(result.entities.decks[0]?.name).toBe("Test Deck");
+ expect(result.entities.noteTypes[0]?.name).toBe("Basic");
+ });
+
+ it("should store merged binary in sync state", async () => {
+ const deck = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Test Deck",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+ const crdtResult = crdtDeckRepository.toCrdtDocument(deck);
+
+ const payload = {
+ documentId: crdtResult.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: deck.id,
+ binary: binaryToBase64(crdtResult.binary),
+ };
+
+ await applyCrdtChanges([payload], 5);
+
+ // Verify the binary was stored in sync state
+ const storedBinary = await crdtSyncStateManager.getDocumentBinary(
+ CrdtEntityType.Deck,
+ deck.id,
+ );
+
+ expect(storedBinary).toBeDefined();
+ // Check that it's a typed array with length > 0
+ expect(storedBinary?.length).toBeGreaterThan(0);
+ });
+
+ it("should skip invalid document IDs", async () => {
+ const consoleWarn = vi.spyOn(console, "warn").mockImplementation(() => {});
+
+ const payload = {
+ documentId: "invalid-format",
+ entityType: CrdtEntityType.Deck,
+ entityId: "deck-1",
+ binary: "SGVsbG8=", // "Hello" in base64
+ };
+
+ const result = await applyCrdtChanges([payload], 5);
+
+ expect(result.created).toBe(0);
+ expect(result.merged).toBe(0);
+ expect(result.entities.decks).toHaveLength(0);
+ expect(consoleWarn).toHaveBeenCalledWith(
+ "Invalid document ID: invalid-format",
+ );
+
+ consoleWarn.mockRestore();
+ });
+
+ it("should return empty result for empty payloads", async () => {
+ const result = await applyCrdtChanges([], 5);
+
+ expect(result.created).toBe(0);
+ expect(result.merged).toBe(0);
+ expect(result.entities.decks).toHaveLength(0);
+ expect(result.entities.noteTypes).toHaveLength(0);
+ expect(result.entities.cards).toHaveLength(0);
+ });
+});
+
+describe("PullService with CRDT changes", () => {
+ let syncQueue: SyncQueue;
+
+ beforeEach(async () => {
+ await db.decks.clear();
+ await db.cards.clear();
+ await db.reviewLogs.clear();
+ await db.noteTypes.clear();
+ await db.noteFieldTypes.clear();
+ await db.notes.clear();
+ await db.noteFieldValues.clear();
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
+ localStorage.clear();
+ syncQueue = new SyncQueue();
+ });
+
+ afterEach(async () => {
+ await db.decks.clear();
+ await db.cards.clear();
+ await db.reviewLogs.clear();
+ await db.noteTypes.clear();
+ await db.noteFieldTypes.clear();
+ await db.notes.clear();
+ await db.noteFieldValues.clear();
+ await crdtSyncDb.syncState.clear();
+ await crdtSyncDb.metadata.clear();
+ localStorage.clear();
+ });
+
+ it("should process CRDT changes when present in pull response", async () => {
+ const deck = {
+ id: "deck-1",
+ userId: "user-1",
+ name: "CRDT Deck",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+ const crdtResult = crdtDeckRepository.toCrdtDocument(deck);
+
+ const pullFromServer = vi.fn().mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ crdtChanges: [
+ {
+ documentId: crdtResult.documentId,
+ entityType: CrdtEntityType.Deck,
+ entityId: deck.id,
+ binary: binaryToBase64(crdtResult.binary),
+ },
+ ],
+ currentSyncVersion: 5,
+ });
+
+ const pullService = new PullService({
+ syncQueue,
+ pullFromServer,
+ });
+
+ await pullService.pull();
+
+ // Verify CRDT binary was stored
+ const storedBinary = await crdtSyncStateManager.getDocumentBinary(
+ CrdtEntityType.Deck,
+ deck.id,
+ );
+ expect(storedBinary).toBeDefined();
+ });
+
+ it("should handle pull response without CRDT changes", async () => {
+ const pullFromServer = vi.fn().mockResolvedValue({
+ decks: [
+ {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Test Deck",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ deletedAt: null,
+ syncVersion: 1,
+ },
+ ],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ // No crdtChanges field
+ currentSyncVersion: 1,
+ });
+
+ const pullService = new PullService({
+ syncQueue,
+ pullFromServer,
+ });
+
+ // Should not throw even without crdtChanges
+ const result = await pullService.pull();
+
+ expect(result.decks).toHaveLength(1);
+ expect(syncQueue.getLastSyncVersion()).toBe(1);
+ });
+
+ it("should handle empty CRDT changes array", async () => {
+ const pullFromServer = vi.fn().mockResolvedValue({
+ decks: [],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ crdtChanges: [],
+ currentSyncVersion: 1,
+ });
+
+ const pullService = new PullService({
+ syncQueue,
+ pullFromServer,
+ });
+
+ const result = await pullService.pull();
+
+ expect(result.crdtChanges).toHaveLength(0);
+ expect(syncQueue.getLastSyncVersion()).toBe(1);
+ });
+
+ it("should process both regular data and CRDT changes", async () => {
+ // Create CRDT payload for a note type
+ const noteType = {
+ id: "note-type-1",
+ userId: "user-1",
+ name: "CRDT NoteType",
+ frontTemplate: "{{Front}}",
+ backTemplate: "{{Back}}",
+ isReversible: true,
+ createdAt: new Date("2024-01-01T10:00:00Z"),
+ updatedAt: new Date("2024-01-02T15:30:00Z"),
+ deletedAt: null,
+ syncVersion: 5,
+ _synced: false as const,
+ };
+ const crdtResult = crdtNoteTypeRepository.toCrdtDocument(noteType);
+
+ const pullFromServer = vi.fn().mockResolvedValue({
+ decks: [
+ {
+ id: "deck-1",
+ userId: "user-1",
+ name: "Regular Deck",
+ description: null,
+ newCardsPerDay: 20,
+ createdAt: new Date(),
+ updatedAt: new Date(),
+ deletedAt: null,
+ syncVersion: 5,
+ },
+ ],
+ cards: [],
+ reviewLogs: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ crdtChanges: [
+ {
+ documentId: crdtResult.documentId,
+ entityType: CrdtEntityType.NoteType,
+ entityId: noteType.id,
+ binary: binaryToBase64(crdtResult.binary),
+ },
+ ],
+ currentSyncVersion: 5,
+ });
+
+ const pullService = new PullService({
+ syncQueue,
+ pullFromServer,
+ });
+
+ await pullService.pull();
+
+ // Verify regular deck was applied
+ const storedDeck = await localDeckRepository.findById("deck-1");
+ expect(storedDeck).toBeDefined();
+ expect(storedDeck?.name).toBe("Regular Deck");
+
+ // Verify CRDT binary was stored for note type
+ const storedBinary = await crdtSyncStateManager.getDocumentBinary(
+ CrdtEntityType.NoteType,
+ noteType.id,
+ );
+ expect(storedBinary).toBeDefined();
+ });
+});
diff --git a/src/client/sync/pull.ts b/src/client/sync/pull.ts
index 8b55a9b..4771757 100644
--- a/src/client/sync/pull.ts
+++ b/src/client/sync/pull.ts
@@ -10,6 +10,16 @@ import type {
LocalReviewLog,
RatingType,
} from "../db/index";
+import {
+ base64ToBinary,
+ CrdtEntityType,
+ type CrdtEntityTypeValue,
+ type CrdtSyncPayload,
+ crdtSyncStateManager,
+ getCrdtRepository,
+ mergeAndConvert,
+ parseDocumentId,
+} from "./crdt";
import type { SyncQueue } from "./queue";
/**
@@ -136,6 +146,8 @@ export interface SyncPullResult {
noteFieldTypes: ServerNoteFieldType[];
notes: ServerNote[];
noteFieldValues: ServerNoteFieldValue[];
+ /** CRDT document changes for conflict-free sync */
+ crdtChanges?: CrdtSyncPayload[];
currentSyncVersion: number;
}
@@ -309,6 +321,147 @@ export function pullResultToLocalData(result: SyncPullResult): {
}
/**
+ * Result of applying CRDT changes
+ */
+export interface ApplyCrdtChangesResult {
+ /** Number of documents that were merged */
+ merged: number;
+ /** Number of documents that were new (no local version) */
+ created: number;
+ /** Merged entities grouped by type */
+ entities: {
+ decks: Omit<LocalDeck, "_synced">[];
+ noteTypes: Omit<LocalNoteType, "_synced">[];
+ noteFieldTypes: Omit<LocalNoteFieldType, "_synced">[];
+ notes: Omit<LocalNote, "_synced">[];
+ noteFieldValues: Omit<LocalNoteFieldValue, "_synced">[];
+ cards: Omit<LocalCard, "_synced">[];
+ reviewLogs: Omit<LocalReviewLog, "_synced">[];
+ };
+}
+
+/**
+ * Apply CRDT changes from pull response
+ *
+ * This function:
+ * 1. Parses each CRDT payload from the server
+ * 2. Loads existing local CRDT document if present
+ * 3. Merges remote and local documents using Automerge
+ * 4. Stores the merged binary in sync state
+ * 5. Converts merged document to local entity format
+ *
+ * @param crdtChanges Array of CRDT payloads from server
+ * @param syncVersion The sync version to associate with merged documents
+ * @returns Result containing merged entities ready for database storage
+ */
+export async function applyCrdtChanges(
+ crdtChanges: CrdtSyncPayload[],
+ syncVersion: number,
+): Promise<ApplyCrdtChangesResult> {
+ const result: ApplyCrdtChangesResult = {
+ merged: 0,
+ created: 0,
+ entities: {
+ decks: [],
+ noteTypes: [],
+ noteFieldTypes: [],
+ notes: [],
+ noteFieldValues: [],
+ cards: [],
+ reviewLogs: [],
+ },
+ };
+
+ // Process each CRDT payload
+ for (const payload of crdtChanges) {
+ const parsed = parseDocumentId(payload.documentId);
+ if (!parsed) {
+ console.warn(`Invalid document ID: ${payload.documentId}`);
+ continue;
+ }
+
+ const { entityType, entityId } = parsed;
+
+ // Load existing local CRDT binary if present
+ const localBinary = await crdtSyncStateManager.getDocumentBinary(
+ entityType,
+ entityId,
+ );
+
+ // Convert remote payload binary from base64
+ const remoteBinary = base64ToBinary(payload.binary);
+
+ // Get the repository and merge based on entity type
+ // We use type assertions here because getCrdtRepository returns a union type
+ // but we know the specific type based on entityType
+ const repository = getCrdtRepository(entityType);
+ const mergeResult = mergeAndConvert(
+ localBinary,
+ remoteBinary,
+ // biome-ignore lint/suspicious/noExplicitAny: Repository type is determined by entityType at runtime
+ repository as any,
+ );
+
+ // Store the merged binary in sync state
+ await crdtSyncStateManager.setDocumentBinary(
+ entityType,
+ entityId,
+ mergeResult.binary,
+ syncVersion,
+ );
+
+ // Track statistics
+ if (localBinary === null) {
+ result.created++;
+ } else {
+ result.merged++;
+ }
+
+ // Add entity to the appropriate array based on type
+ addEntityToResult(result.entities, entityType, mergeResult.entity);
+ }
+
+ return result;
+}
+
+/**
+ * Helper to add an entity to the result based on its type
+ */
+function addEntityToResult(
+ entities: ApplyCrdtChangesResult["entities"],
+ entityType: CrdtEntityTypeValue,
+ entity: unknown,
+): void {
+ switch (entityType) {
+ case CrdtEntityType.Deck:
+ entities.decks.push(entity as Omit<LocalDeck, "_synced">);
+ break;
+ case CrdtEntityType.NoteType:
+ entities.noteTypes.push(entity as Omit<LocalNoteType, "_synced">);
+ break;
+ case CrdtEntityType.NoteFieldType:
+ entities.noteFieldTypes.push(
+ entity as Omit<LocalNoteFieldType, "_synced">,
+ );
+ break;
+ case CrdtEntityType.Note:
+ entities.notes.push(entity as Omit<LocalNote, "_synced">);
+ break;
+ case CrdtEntityType.NoteFieldValue:
+ entities.noteFieldValues.push(
+ entity as Omit<LocalNoteFieldValue, "_synced">,
+ );
+ break;
+ case CrdtEntityType.Card:
+ entities.cards.push(entity as Omit<LocalCard, "_synced">);
+ break;
+ case CrdtEntityType.ReviewLog:
+ entities.reviewLogs.push(entity as Omit<LocalReviewLog, "_synced">);
+ break;
+ }
+}
+
+/**
* Pull sync service
*
* Handles pulling changes from the server:
@@ -339,6 +492,11 @@ export class PullService {
// Pull changes from server
const result = await this.pullFromServer(lastSyncVersion);
+ // Process CRDT changes if present
+ if (result.crdtChanges && result.crdtChanges.length > 0) {
+ await applyCrdtChanges(result.crdtChanges, result.currentSyncVersion);
+ }
+
// If there are changes, apply them to local database
if (
result.decks.length > 0 ||