diff options
| author | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:18:39 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2025-12-31 15:18:39 +0900 |
| commit | db60c5cc3e6dd2e51fce7dd946e477b3e125ba69 (patch) | |
| tree | 5ce9d4e6ca1fe7735508fce208693bbe4d2c1dc0 | |
| parent | 2e21859626e69d992d4dff21338487d372097cb0 (diff) | |
| download | kioku-db60c5cc3e6dd2e51fce7dd946e477b3e125ba69.tar.gz kioku-db60c5cc3e6dd2e51fce7dd946e477b3e125ba69.tar.zst kioku-db60c5cc3e6dd2e51fce7dd946e477b3e125ba69.zip | |
feat(crdt): add CRDT repository layer and sync state management
Add Phase 2 of the CRDT implementation:
- CRDT-aware repository wrappers for all entity types (Deck, Card, Note, etc.)
- Sync state management with IndexedDB storage for CRDT document binaries
- Base64 serialization utilities for network transport
- Comprehensive test coverage (53 new tests)
| -rw-r--r-- | docs/dev/roadmap.md | 4 | ||||
| -rw-r--r-- | src/client/sync/crdt/index.ts | 34 | ||||
| -rw-r--r-- | src/client/sync/crdt/repositories.test.ts | 531 | ||||
| -rw-r--r-- | src/client/sync/crdt/repositories.ts | 485 | ||||
| -rw-r--r-- | src/client/sync/crdt/sync-state.test.ts | 449 | ||||
| -rw-r--r-- | src/client/sync/crdt/sync-state.ts | 370 |
6 files changed, 1871 insertions, 2 deletions
diff --git a/docs/dev/roadmap.md b/docs/dev/roadmap.md index 41697a9..656b10d 100644 --- a/docs/dev/roadmap.md +++ b/docs/dev/roadmap.md @@ -18,8 +18,8 @@ Replace the current Last-Write-Wins (LWW) conflict resolution with Automerge CRD ### Phase 2: Create CRDT Repository Layer -- [ ] Create `src/client/sync/crdt/repositories.ts` - CRDT-aware repository wrappers -- [ ] Create `src/client/sync/crdt/sync-state.ts` - Sync state serialization +- [x] Create `src/client/sync/crdt/repositories.ts` - CRDT-aware repository wrappers +- [x] Create `src/client/sync/crdt/sync-state.ts` - Sync state serialization ### Phase 3: Modify Sync Protocol diff --git a/src/client/sync/crdt/index.ts b/src/client/sync/crdt/index.ts index fa296bd..4a3d600 100644 --- a/src/client/sync/crdt/index.ts +++ b/src/client/sync/crdt/index.ts @@ -47,6 +47,40 @@ export { saveIncremental, updateDocument, } from "./document-manager"; + +// CRDT-aware repository wrappers +export { + type CrdtDocumentResult, + type CrdtMergeResult, + type CrdtRepository, + crdtCardRepository, + crdtDeckRepository, + crdtNoteFieldTypeRepository, + crdtNoteFieldValueRepository, + crdtNoteRepository, + crdtNoteTypeRepository, + crdtRepositories, + crdtReviewLogRepository, + entitiesToCrdtDocuments, + getCrdtRepository, + getRepositoryForDocumentId, + mergeAndConvert, +} from "./repositories"; + +// Sync state management +export { + base64ToBinary, + binaryToBase64, + type CrdtSyncMetadata, + type CrdtSyncPayload, + type CrdtSyncStateEntry, + CrdtSyncStateManager, + crdtSyncDb, + crdtSyncStateManager, + entriesToSyncPayload, + syncPayloadToEntries, +} from "./sync-state"; + // Type definitions export { type CrdtCardDocument, diff --git a/src/client/sync/crdt/repositories.test.ts b/src/client/sync/crdt/repositories.test.ts new file mode 100644 index 0000000..f237536 --- /dev/null +++ b/src/client/sync/crdt/repositories.test.ts @@ -0,0 +1,531 @@ +import * as Automerge from "@automerge/automerge"; +import { describe, expect, it } from "vitest"; +import type { + LocalCard, + LocalDeck, + LocalNote, + LocalNoteFieldType, + LocalNoteFieldValue, + LocalNoteType, + LocalReviewLog, +} from "../../db/index"; +import { CardState, FieldType, Rating } from "../../db/index"; +import { saveDocument } from "./document-manager"; +import { + crdtCardRepository, + crdtDeckRepository, + crdtNoteFieldTypeRepository, + crdtNoteFieldValueRepository, + crdtNoteRepository, + crdtNoteTypeRepository, + crdtRepositories, + crdtReviewLogRepository, + entitiesToCrdtDocuments, + getCrdtRepository, + getRepositoryForDocumentId, + mergeAndConvert, +} from "./repositories"; +import { CrdtEntityType } from "./types"; + +describe("crdtDeckRepository", () => { + const createTestDeck = (): LocalDeck => { + const now = new Date(); + return { + id: "deck-1", + userId: "user-1", + name: "Test Deck", + description: "A test deck", + newCardsPerDay: 20, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtDeckRepository.entityType).toBe(CrdtEntityType.Deck); + }); + + it("should convert deck to CRDT document", () => { + const deck = createTestDeck(); + const result = crdtDeckRepository.toCrdtDocument(deck); + + expect(result.documentId).toBe("deck:deck-1"); + expect(result.binary).toBeInstanceOf(Uint8Array); + expect(result.doc.meta.entityId).toBe("deck-1"); + expect(result.doc.data.name).toBe("Test Deck"); + }); + + it("should load document from binary", () => { + const deck = createTestDeck(); + const { binary } = crdtDeckRepository.toCrdtDocument(deck); + const loaded = crdtDeckRepository.fromBinary(binary); + + expect(loaded.meta.entityId).toBe("deck-1"); + expect(loaded.data.name).toBe("Test Deck"); + }); + + it("should merge documents correctly", () => { + const deck = createTestDeck(); + const { doc: doc1 } = crdtDeckRepository.toCrdtDocument(deck); + const doc2 = Automerge.clone(doc1); + + // Make concurrent changes + const updated1 = Automerge.change(doc1, (d) => { + d.data.name = "Updated Name"; + }); + const updated2 = Automerge.change(doc2, (d) => { + d.data.newCardsPerDay = 30; + }); + + const result = crdtDeckRepository.merge(updated1, updated2); + + expect(result.hasChanges).toBe(true); + expect(result.merged.data.name).toBe("Updated Name"); + expect(result.merged.data.newCardsPerDay).toBe(30); + }); + + it("should convert CRDT document to local entity", () => { + const deck = createTestDeck(); + const { doc } = crdtDeckRepository.toCrdtDocument(deck); + const localEntity = crdtDeckRepository.toLocalEntity(doc); + + expect(localEntity.id).toBe("deck-1"); + expect(localEntity.name).toBe("Test Deck"); + expect(localEntity.userId).toBe("user-1"); + expect(localEntity.syncVersion).toBe(0); // Reset by conversion + }); + + it("should create document ID correctly", () => { + expect(crdtDeckRepository.createDocumentId("deck-123")).toBe( + "deck:deck-123", + ); + }); +}); + +describe("crdtNoteTypeRepository", () => { + const createTestNoteType = (): LocalNoteType => { + const now = new Date(); + return { + id: "notetype-1", + userId: "user-1", + name: "Basic", + frontTemplate: "{{Front}}", + backTemplate: "{{Back}}", + isReversible: true, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtNoteTypeRepository.entityType).toBe(CrdtEntityType.NoteType); + }); + + it("should convert note type to CRDT document", () => { + const noteType = createTestNoteType(); + const result = crdtNoteTypeRepository.toCrdtDocument(noteType); + + expect(result.documentId).toBe("noteType:notetype-1"); + expect(result.doc.data.name).toBe("Basic"); + expect(result.doc.data.isReversible).toBe(true); + }); + + it("should roundtrip correctly", () => { + const noteType = createTestNoteType(); + const { binary } = crdtNoteTypeRepository.toCrdtDocument(noteType); + const loaded = crdtNoteTypeRepository.fromBinary(binary); + const entity = crdtNoteTypeRepository.toLocalEntity(loaded); + + expect(entity.id).toBe("notetype-1"); + expect(entity.frontTemplate).toBe("{{Front}}"); + expect(entity.backTemplate).toBe("{{Back}}"); + }); +}); + +describe("crdtNoteFieldTypeRepository", () => { + const createTestNoteFieldType = (): LocalNoteFieldType => { + const now = new Date(); + return { + id: "fieldtype-1", + noteTypeId: "notetype-1", + name: "Front", + order: 0, + fieldType: FieldType.Text, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtNoteFieldTypeRepository.entityType).toBe( + CrdtEntityType.NoteFieldType, + ); + }); + + it("should convert note field type to CRDT document", () => { + const fieldType = createTestNoteFieldType(); + const result = crdtNoteFieldTypeRepository.toCrdtDocument(fieldType); + + expect(result.documentId).toBe("noteFieldType:fieldtype-1"); + expect(result.doc.data.name).toBe("Front"); + expect(result.doc.data.order).toBe(0); + }); +}); + +describe("crdtNoteRepository", () => { + const createTestNote = (): LocalNote => { + const now = new Date(); + return { + id: "note-1", + deckId: "deck-1", + noteTypeId: "notetype-1", + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtNoteRepository.entityType).toBe(CrdtEntityType.Note); + }); + + it("should convert note to CRDT document", () => { + const note = createTestNote(); + const result = crdtNoteRepository.toCrdtDocument(note); + + expect(result.documentId).toBe("note:note-1"); + expect(result.doc.data.deckId).toBe("deck-1"); + expect(result.doc.data.noteTypeId).toBe("notetype-1"); + }); +}); + +describe("crdtNoteFieldValueRepository", () => { + const createTestNoteFieldValue = (): LocalNoteFieldValue => { + const now = new Date(); + return { + id: "fieldvalue-1", + noteId: "note-1", + noteFieldTypeId: "fieldtype-1", + value: "Tokyo", + createdAt: now, + updatedAt: now, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtNoteFieldValueRepository.entityType).toBe( + CrdtEntityType.NoteFieldValue, + ); + }); + + it("should convert note field value to CRDT document", () => { + const fieldValue = createTestNoteFieldValue(); + const result = crdtNoteFieldValueRepository.toCrdtDocument(fieldValue); + + expect(result.documentId).toBe("noteFieldValue:fieldvalue-1"); + expect(result.doc.data.value).toBe("Tokyo"); + }); +}); + +describe("crdtCardRepository", () => { + const createTestCard = (): LocalCard => { + const now = new Date(); + return { + id: "card-1", + deckId: "deck-1", + noteId: "note-1", + isReversed: false, + front: "What is the capital of Japan?", + back: "Tokyo", + state: CardState.New, + due: now, + stability: 0, + difficulty: 0, + elapsedDays: 0, + scheduledDays: 0, + reps: 0, + lapses: 0, + lastReview: null, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtCardRepository.entityType).toBe(CrdtEntityType.Card); + }); + + it("should convert card to CRDT document", () => { + const card = createTestCard(); + const result = crdtCardRepository.toCrdtDocument(card); + + expect(result.documentId).toBe("card:card-1"); + expect(result.doc.data.front).toBe("What is the capital of Japan?"); + expect(result.doc.data.back).toBe("Tokyo"); + expect(result.doc.data.state).toBe(CardState.New); + }); + + it("should preserve FSRS fields in roundtrip", () => { + const now = new Date(); + const card: LocalCard = { + ...createTestCard(), + state: CardState.Review, + stability: 5.5, + difficulty: 0.3, + reps: 5, + lapses: 1, + lastReview: now, + }; + + const { binary } = crdtCardRepository.toCrdtDocument(card); + const loaded = crdtCardRepository.fromBinary(binary); + const entity = crdtCardRepository.toLocalEntity(loaded); + + expect(entity.state).toBe(CardState.Review); + expect(entity.stability).toBe(5.5); + expect(entity.difficulty).toBe(0.3); + expect(entity.reps).toBe(5); + expect(entity.lapses).toBe(1); + }); +}); + +describe("crdtReviewLogRepository", () => { + const createTestReviewLog = (): LocalReviewLog => { + const now = new Date(); + return { + id: "review-1", + cardId: "card-1", + userId: "user-1", + rating: Rating.Good, + state: CardState.Review, + scheduledDays: 4, + elapsedDays: 1, + reviewedAt: now, + durationMs: 5000, + syncVersion: 1, + _synced: true, + }; + }; + + it("should have correct entity type", () => { + expect(crdtReviewLogRepository.entityType).toBe(CrdtEntityType.ReviewLog); + }); + + it("should convert review log to CRDT document", () => { + const reviewLog = createTestReviewLog(); + const result = crdtReviewLogRepository.toCrdtDocument(reviewLog); + + expect(result.documentId).toBe("reviewLog:review-1"); + expect(result.doc.data.rating).toBe(Rating.Good); + expect(result.doc.data.durationMs).toBe(5000); + }); +}); + +describe("getCrdtRepository", () => { + it("should return correct repository for each entity type", () => { + expect(getCrdtRepository(CrdtEntityType.Deck)).toBe(crdtDeckRepository); + expect(getCrdtRepository(CrdtEntityType.NoteType)).toBe( + crdtNoteTypeRepository, + ); + expect(getCrdtRepository(CrdtEntityType.NoteFieldType)).toBe( + crdtNoteFieldTypeRepository, + ); + expect(getCrdtRepository(CrdtEntityType.Note)).toBe(crdtNoteRepository); + expect(getCrdtRepository(CrdtEntityType.NoteFieldValue)).toBe( + crdtNoteFieldValueRepository, + ); + expect(getCrdtRepository(CrdtEntityType.Card)).toBe(crdtCardRepository); + expect(getCrdtRepository(CrdtEntityType.ReviewLog)).toBe( + crdtReviewLogRepository, + ); + }); +}); + +describe("crdtRepositories", () => { + it("should contain all repositories", () => { + expect(Object.keys(crdtRepositories)).toHaveLength(7); + expect(crdtRepositories.deck).toBe(crdtDeckRepository); + expect(crdtRepositories.noteType).toBe(crdtNoteTypeRepository); + expect(crdtRepositories.noteFieldType).toBe(crdtNoteFieldTypeRepository); + expect(crdtRepositories.note).toBe(crdtNoteRepository); + expect(crdtRepositories.noteFieldValue).toBe(crdtNoteFieldValueRepository); + expect(crdtRepositories.card).toBe(crdtCardRepository); + expect(crdtRepositories.reviewLog).toBe(crdtReviewLogRepository); + }); +}); + +describe("entitiesToCrdtDocuments", () => { + it("should convert multiple entities to CRDT documents", () => { + const now = new Date(); + const decks: LocalDeck[] = [ + { + id: "deck-1", + userId: "user-1", + name: "Deck 1", + description: null, + newCardsPerDay: 20, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }, + { + id: "deck-2", + userId: "user-1", + name: "Deck 2", + description: "Second deck", + newCardsPerDay: 15, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }, + ]; + + const results = entitiesToCrdtDocuments(decks, crdtDeckRepository); + + expect(results).toHaveLength(2); + expect(results[0]?.documentId).toBe("deck:deck-1"); + expect(results[1]?.documentId).toBe("deck:deck-2"); + expect(results[0]?.doc.data.name).toBe("Deck 1"); + expect(results[1]?.doc.data.name).toBe("Deck 2"); + }); +}); + +describe("mergeAndConvert", () => { + it("should use remote document when local is null", () => { + const now = new Date(); + const deck: LocalDeck = { + id: "deck-1", + userId: "user-1", + name: "Remote Deck", + description: null, + newCardsPerDay: 20, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + + const { binary } = crdtDeckRepository.toCrdtDocument(deck); + const result = mergeAndConvert(null, binary, crdtDeckRepository); + + expect(result.hasChanges).toBe(true); + expect(result.entity.name).toBe("Remote Deck"); + }); + + it("should merge local and remote documents", () => { + const now = new Date(); + const deck: LocalDeck = { + id: "deck-1", + userId: "user-1", + name: "Original", + description: null, + newCardsPerDay: 20, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + + const { doc: localDoc } = crdtDeckRepository.toCrdtDocument(deck); + + // Create remote with different changes + const remoteDoc = Automerge.change(Automerge.clone(localDoc), (d) => { + d.data.newCardsPerDay = 30; + }); + const remoteBinary = saveDocument(remoteDoc); + + // Modify local + const updatedLocalDoc = Automerge.change(localDoc, (d) => { + d.data.name = "Updated Local"; + }); + const updatedLocalBinary = saveDocument(updatedLocalDoc); + + const result = mergeAndConvert( + updatedLocalBinary, + remoteBinary, + crdtDeckRepository, + ); + + expect(result.hasChanges).toBe(true); + // Both changes should be merged + expect(result.entity.name).toBe("Updated Local"); + expect(result.entity.newCardsPerDay).toBe(30); + }); + + it("should detect no changes when documents are identical", () => { + const now = new Date(); + const deck: LocalDeck = { + id: "deck-1", + userId: "user-1", + name: "Same", + description: null, + newCardsPerDay: 20, + createdAt: now, + updatedAt: now, + deletedAt: null, + syncVersion: 1, + _synced: true, + }; + + const { binary } = crdtDeckRepository.toCrdtDocument(deck); + const result = mergeAndConvert(binary, binary, crdtDeckRepository); + + expect(result.hasChanges).toBe(false); + expect(result.entity.name).toBe("Same"); + }); +}); + +describe("getRepositoryForDocumentId", () => { + it("should return repository and entity ID for valid document ID", () => { + const result = getRepositoryForDocumentId("deck:deck-123"); + + expect(result).not.toBeNull(); + expect(result?.entityId).toBe("deck-123"); + }); + + it("should return null for invalid document ID", () => { + expect(getRepositoryForDocumentId("invalid")).toBeNull(); + expect(getRepositoryForDocumentId("unknown:id")).toBeNull(); + expect(getRepositoryForDocumentId("")).toBeNull(); + }); + + it("should work for all entity types", () => { + const testCases = [ + { documentId: "deck:id1", entityId: "id1" }, + { documentId: "noteType:id2", entityId: "id2" }, + { documentId: "noteFieldType:id3", entityId: "id3" }, + { documentId: "note:id4", entityId: "id4" }, + { documentId: "noteFieldValue:id5", entityId: "id5" }, + { documentId: "card:id6", entityId: "id6" }, + { documentId: "reviewLog:id7", entityId: "id7" }, + ]; + + for (const { documentId, entityId } of testCases) { + const result = getRepositoryForDocumentId(documentId); + expect(result).not.toBeNull(); + expect(result?.entityId).toBe(entityId); + } + }); +}); diff --git a/src/client/sync/crdt/repositories.ts b/src/client/sync/crdt/repositories.ts new file mode 100644 index 0000000..32d516f --- /dev/null +++ b/src/client/sync/crdt/repositories.ts @@ -0,0 +1,485 @@ +/** + * CRDT-Aware Repository Wrappers + * + * This module provides CRDT-aware repository wrappers that handle the conversion + * between local entities and Automerge CRDT documents. These repositories are used + * during sync operations to create, update, and merge CRDT documents. + * + * Design: + * - Each entity type has a corresponding CRDT repository + * - Repositories handle conversion to/from CRDT format + * - Binary serialization is handled for sync payload + * - Merge operations use Automerge's conflict-free merge + */ + +import type * as Automerge from "@automerge/automerge"; +import type { + LocalCard, + LocalDeck, + LocalNote, + LocalNoteFieldType, + LocalNoteFieldValue, + LocalNoteType, + LocalReviewLog, +} from "../../db/index"; +import { + cardToCrdtDocument, + crdtDocumentToCard, + crdtDocumentToDeck, + crdtDocumentToNote, + crdtDocumentToNoteFieldType, + crdtDocumentToNoteFieldValue, + crdtDocumentToNoteType, + crdtDocumentToReviewLog, + createDocument, + deckToCrdtDocument, + loadDocument, + type MergeResult, + mergeDocuments, + noteFieldTypeToCrdtDocument, + noteFieldValueToCrdtDocument, + noteToCrdtDocument, + noteTypeToCrdtDocument, + reviewLogToCrdtDocument, + saveDocument, +} from "./document-manager"; +import type { + CrdtCardDocument, + CrdtDeckDocument, + CrdtEntityTypeValue, + CrdtNoteDocument, + CrdtNoteFieldTypeDocument, + CrdtNoteFieldValueDocument, + CrdtNoteTypeDocument, + CrdtReviewLogDocument, +} from "./types"; +import { CrdtEntityType, createDocumentId, parseDocumentId } from "./types"; + +/** + * Result of creating or updating a CRDT document + */ +export interface CrdtDocumentResult<T> { + /** The Automerge document */ + doc: Automerge.Doc<T>; + /** Binary representation for sync */ + binary: Uint8Array; + /** Document ID (entityType:entityId format) */ + documentId: string; +} + +/** + * Result of merging CRDT documents + */ +export interface CrdtMergeResult<T> { + /** The merged document */ + doc: Automerge.Doc<T>; + /** Binary representation of merged document */ + binary: Uint8Array; + /** Whether the merge resulted in any changes */ + hasChanges: boolean; + /** Converted local entity from merged document */ + entity: T extends CrdtDeckDocument + ? Omit<LocalDeck, "_synced"> + : T extends CrdtNoteTypeDocument + ? Omit<LocalNoteType, "_synced"> + : T extends CrdtNoteFieldTypeDocument + ? Omit<LocalNoteFieldType, "_synced"> + : T extends CrdtNoteDocument + ? Omit<LocalNote, "_synced"> + : T extends CrdtNoteFieldValueDocument + ? Omit<LocalNoteFieldValue, "_synced"> + : T extends CrdtCardDocument + ? Omit<LocalCard, "_synced"> + : T extends CrdtReviewLogDocument + ? Omit<LocalReviewLog, "_synced"> + : never; +} + +/** + * Base interface for CRDT repositories + */ +export interface CrdtRepository<TLocal, TCrdt> { + /** Entity type identifier */ + readonly entityType: CrdtEntityTypeValue; + + /** Convert local entity to CRDT document and serialize */ + toCrdtDocument(entity: TLocal): CrdtDocumentResult<TCrdt>; + + /** Load CRDT document from binary data */ + fromBinary(binary: Uint8Array): Automerge.Doc<TCrdt>; + + /** Merge local and remote documents */ + merge( + local: Automerge.Doc<TCrdt>, + remote: Automerge.Doc<TCrdt>, + ): MergeResult<TCrdt>; + + /** Convert CRDT document to local entity format */ + toLocalEntity(doc: Automerge.Doc<TCrdt>): Omit<TLocal, "_synced">; + + /** Create document ID for an entity */ + createDocumentId(entityId: string): string; +} + +/** + * CRDT Repository for Deck entities + */ +export const crdtDeckRepository: CrdtRepository<LocalDeck, CrdtDeckDocument> = { + entityType: CrdtEntityType.Deck, + + toCrdtDocument(deck: LocalDeck): CrdtDocumentResult<CrdtDeckDocument> { + const crdtData = deckToCrdtDocument(deck); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, deck.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtDeckDocument> { + return loadDocument<CrdtDeckDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtDeckDocument>, + remote: Automerge.Doc<CrdtDeckDocument>, + ): MergeResult<CrdtDeckDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtDeckDocument>, + ): Omit<LocalDeck, "_synced"> { + return crdtDocumentToDeck(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * CRDT Repository for NoteType entities + */ +export const crdtNoteTypeRepository: CrdtRepository< + LocalNoteType, + CrdtNoteTypeDocument +> = { + entityType: CrdtEntityType.NoteType, + + toCrdtDocument( + noteType: LocalNoteType, + ): CrdtDocumentResult<CrdtNoteTypeDocument> { + const crdtData = noteTypeToCrdtDocument(noteType); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, noteType.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtNoteTypeDocument> { + return loadDocument<CrdtNoteTypeDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtNoteTypeDocument>, + remote: Automerge.Doc<CrdtNoteTypeDocument>, + ): MergeResult<CrdtNoteTypeDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtNoteTypeDocument>, + ): Omit<LocalNoteType, "_synced"> { + return crdtDocumentToNoteType(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * CRDT Repository for NoteFieldType entities + */ +export const crdtNoteFieldTypeRepository: CrdtRepository< + LocalNoteFieldType, + CrdtNoteFieldTypeDocument +> = { + entityType: CrdtEntityType.NoteFieldType, + + toCrdtDocument( + fieldType: LocalNoteFieldType, + ): CrdtDocumentResult<CrdtNoteFieldTypeDocument> { + const crdtData = noteFieldTypeToCrdtDocument(fieldType); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, fieldType.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtNoteFieldTypeDocument> { + return loadDocument<CrdtNoteFieldTypeDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtNoteFieldTypeDocument>, + remote: Automerge.Doc<CrdtNoteFieldTypeDocument>, + ): MergeResult<CrdtNoteFieldTypeDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtNoteFieldTypeDocument>, + ): Omit<LocalNoteFieldType, "_synced"> { + return crdtDocumentToNoteFieldType(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * CRDT Repository for Note entities + */ +export const crdtNoteRepository: CrdtRepository<LocalNote, CrdtNoteDocument> = { + entityType: CrdtEntityType.Note, + + toCrdtDocument(note: LocalNote): CrdtDocumentResult<CrdtNoteDocument> { + const crdtData = noteToCrdtDocument(note); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, note.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtNoteDocument> { + return loadDocument<CrdtNoteDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtNoteDocument>, + remote: Automerge.Doc<CrdtNoteDocument>, + ): MergeResult<CrdtNoteDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtNoteDocument>, + ): Omit<LocalNote, "_synced"> { + return crdtDocumentToNote(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * CRDT Repository for NoteFieldValue entities + */ +export const crdtNoteFieldValueRepository: CrdtRepository< + LocalNoteFieldValue, + CrdtNoteFieldValueDocument +> = { + entityType: CrdtEntityType.NoteFieldValue, + + toCrdtDocument( + fieldValue: LocalNoteFieldValue, + ): CrdtDocumentResult<CrdtNoteFieldValueDocument> { + const crdtData = noteFieldValueToCrdtDocument(fieldValue); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, fieldValue.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtNoteFieldValueDocument> { + return loadDocument<CrdtNoteFieldValueDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtNoteFieldValueDocument>, + remote: Automerge.Doc<CrdtNoteFieldValueDocument>, + ): MergeResult<CrdtNoteFieldValueDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtNoteFieldValueDocument>, + ): Omit<LocalNoteFieldValue, "_synced"> { + return crdtDocumentToNoteFieldValue(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * CRDT Repository for Card entities + */ +export const crdtCardRepository: CrdtRepository<LocalCard, CrdtCardDocument> = { + entityType: CrdtEntityType.Card, + + toCrdtDocument(card: LocalCard): CrdtDocumentResult<CrdtCardDocument> { + const crdtData = cardToCrdtDocument(card); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, card.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtCardDocument> { + return loadDocument<CrdtCardDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtCardDocument>, + remote: Automerge.Doc<CrdtCardDocument>, + ): MergeResult<CrdtCardDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtCardDocument>, + ): Omit<LocalCard, "_synced"> { + return crdtDocumentToCard(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * CRDT Repository for ReviewLog entities + */ +export const crdtReviewLogRepository: CrdtRepository< + LocalReviewLog, + CrdtReviewLogDocument +> = { + entityType: CrdtEntityType.ReviewLog, + + toCrdtDocument( + reviewLog: LocalReviewLog, + ): CrdtDocumentResult<CrdtReviewLogDocument> { + const crdtData = reviewLogToCrdtDocument(reviewLog); + const doc = createDocument(crdtData); + const binary = saveDocument(doc); + const documentId = createDocumentId(this.entityType, reviewLog.id); + + return { doc, binary, documentId }; + }, + + fromBinary(binary: Uint8Array): Automerge.Doc<CrdtReviewLogDocument> { + return loadDocument<CrdtReviewLogDocument>(binary); + }, + + merge( + local: Automerge.Doc<CrdtReviewLogDocument>, + remote: Automerge.Doc<CrdtReviewLogDocument>, + ): MergeResult<CrdtReviewLogDocument> { + return mergeDocuments(local, remote); + }, + + toLocalEntity( + doc: Automerge.Doc<CrdtReviewLogDocument>, + ): Omit<LocalReviewLog, "_synced"> { + return crdtDocumentToReviewLog(doc); + }, + + createDocumentId(entityId: string): string { + return createDocumentId(this.entityType, entityId); + }, +}; + +/** + * Map of entity types to their CRDT repositories + */ +export const crdtRepositories = { + [CrdtEntityType.Deck]: crdtDeckRepository, + [CrdtEntityType.NoteType]: crdtNoteTypeRepository, + [CrdtEntityType.NoteFieldType]: crdtNoteFieldTypeRepository, + [CrdtEntityType.Note]: crdtNoteRepository, + [CrdtEntityType.NoteFieldValue]: crdtNoteFieldValueRepository, + [CrdtEntityType.Card]: crdtCardRepository, + [CrdtEntityType.ReviewLog]: crdtReviewLogRepository, +} as const; + +/** + * Get the CRDT repository for an entity type + */ +export function getCrdtRepository<T extends CrdtEntityTypeValue>( + entityType: T, +): (typeof crdtRepositories)[T] { + return crdtRepositories[entityType]; +} + +/** + * Helper to convert multiple entities to CRDT documents + */ +export function entitiesToCrdtDocuments<TLocal, TCrdt>( + entities: TLocal[], + repository: CrdtRepository<TLocal, TCrdt>, +): CrdtDocumentResult<TCrdt>[] { + return entities.map((entity) => repository.toCrdtDocument(entity)); +} + +/** + * Helper to merge and convert a remote document with a local document + */ +export function mergeAndConvert<TLocal, TCrdt>( + localBinary: Uint8Array | null, + remoteBinary: Uint8Array, + repository: CrdtRepository<TLocal, TCrdt>, +): { + entity: Omit<TLocal, "_synced">; + binary: Uint8Array; + hasChanges: boolean; +} { + const remoteDoc = repository.fromBinary(remoteBinary); + + if (localBinary === null) { + // No local document, use remote as-is + return { + entity: repository.toLocalEntity(remoteDoc), + binary: remoteBinary, + hasChanges: true, + }; + } + + const localDoc = repository.fromBinary(localBinary); + const mergeResult = repository.merge(localDoc, remoteDoc); + + return { + entity: repository.toLocalEntity(mergeResult.merged), + binary: mergeResult.binary, + hasChanges: mergeResult.hasChanges, + }; +} + +/** + * Parse a document ID and get the corresponding repository + */ +export function getRepositoryForDocumentId(documentId: string): { + repository: CrdtRepository<unknown, unknown>; + entityId: string; +} | null { + const parsed = parseDocumentId(documentId); + if (!parsed) { + return null; + } + + const repository = getCrdtRepository(parsed.entityType); + return { + repository: repository as CrdtRepository<unknown, unknown>, + entityId: parsed.entityId, + }; +} diff --git a/src/client/sync/crdt/sync-state.test.ts b/src/client/sync/crdt/sync-state.test.ts new file mode 100644 index 0000000..4fb9066 --- /dev/null +++ b/src/client/sync/crdt/sync-state.test.ts @@ -0,0 +1,449 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import "fake-indexeddb/auto"; +import { + base64ToBinary, + binaryToBase64, + type CrdtSyncPayload, + type CrdtSyncStateEntry, + CrdtSyncStateManager, + crdtSyncDb, + entriesToSyncPayload, + syncPayloadToEntries, +} from "./sync-state"; +import { CrdtEntityType } from "./types"; + +describe("binaryToBase64 and base64ToBinary", () => { + it("should roundtrip binary data correctly", () => { + const original = new Uint8Array([0, 1, 2, 255, 128, 64]); + const base64 = binaryToBase64(original); + const restored = base64ToBinary(base64); + + expect(restored).toEqual(original); + }); + + it("should handle empty array", () => { + const original = new Uint8Array([]); + const base64 = binaryToBase64(original); + const restored = base64ToBinary(base64); + + expect(restored).toEqual(original); + }); + + it("should handle large data", () => { + const original = new Uint8Array(1000); + for (let i = 0; i < 1000; i++) { + original[i] = i % 256; + } + + const base64 = binaryToBase64(original); + const restored = base64ToBinary(base64); + + expect(restored).toEqual(original); + }); + + it("should produce valid base64 string", () => { + const data = new Uint8Array([72, 101, 108, 108, 111]); // "Hello" + const base64 = binaryToBase64(data); + + expect(base64).toBe("SGVsbG8="); + }); +}); + +describe("entriesToSyncPayload", () => { + it("should convert entries to sync payload format", () => { + const entries: CrdtSyncStateEntry[] = [ + { + documentId: "deck:deck-1", + entityType: CrdtEntityType.Deck, + entityId: "deck-1", + binary: new Uint8Array([1, 2, 3]), + lastSyncedAt: Date.now(), + syncVersion: 1, + }, + { + documentId: "card:card-1", + entityType: CrdtEntityType.Card, + entityId: "card-1", + binary: new Uint8Array([4, 5, 6]), + lastSyncedAt: Date.now(), + syncVersion: 2, + }, + ]; + + const payloads = entriesToSyncPayload(entries); + + expect(payloads).toHaveLength(2); + expect(payloads[0]?.documentId).toBe("deck:deck-1"); + expect(payloads[0]?.entityType).toBe(CrdtEntityType.Deck); + expect(payloads[0]?.entityId).toBe("deck-1"); + expect(typeof payloads[0]?.binary).toBe("string"); // Base64 encoded + }); +}); + +describe("syncPayloadToEntries", () => { + it("should convert sync payload to entries format", () => { + const payloads: CrdtSyncPayload[] = [ + { + documentId: "deck:deck-1", + entityType: CrdtEntityType.Deck, + entityId: "deck-1", + binary: binaryToBase64(new Uint8Array([1, 2, 3])), + }, + ]; + + const entries = syncPayloadToEntries(payloads, 5); + + expect(entries).toHaveLength(1); + expect(entries[0]?.entityType).toBe(CrdtEntityType.Deck); + expect(entries[0]?.entityId).toBe("deck-1"); + expect(entries[0]?.binary).toEqual(new Uint8Array([1, 2, 3])); + expect(entries[0]?.syncVersion).toBe(5); + }); +}); + +describe("CrdtSyncStateManager", () => { + let manager: CrdtSyncStateManager; + + beforeEach(async () => { + manager = new CrdtSyncStateManager(); + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + }); + + afterEach(async () => { + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + }); + + describe("document operations", () => { + it("should store and retrieve document binary", async () => { + const binary = new Uint8Array([1, 2, 3, 4, 5]); + await manager.setDocumentBinary(CrdtEntityType.Deck, "deck-1", binary, 1); + + const retrieved = await manager.getDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + ); + expect(retrieved).toEqual(binary); + }); + + it("should return null for non-existent document", async () => { + const result = await manager.getDocumentBinary( + CrdtEntityType.Deck, + "non-existent", + ); + expect(result).toBeNull(); + }); + + it("should update existing document", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([2]), + 2, + ); + + const retrieved = await manager.getDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + ); + expect(retrieved).toEqual(new Uint8Array([2])); + }); + + it("should delete document", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.deleteDocument(CrdtEntityType.Deck, "deck-1"); + + const result = await manager.getDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + ); + expect(result).toBeNull(); + }); + + it("should check if document exists", async () => { + expect(await manager.hasDocument(CrdtEntityType.Deck, "deck-1")).toBe( + false, + ); + + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + + expect(await manager.hasDocument(CrdtEntityType.Deck, "deck-1")).toBe( + true, + ); + }); + }); + + describe("document by type operations", () => { + it("should get all documents by type", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-2", + new Uint8Array([2]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Card, + "card-1", + new Uint8Array([3]), + 1, + ); + + const decks = await manager.getDocumentsByType(CrdtEntityType.Deck); + expect(decks).toHaveLength(2); + + const cards = await manager.getDocumentsByType(CrdtEntityType.Card); + expect(cards).toHaveLength(1); + }); + + it("should delete documents by type", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-2", + new Uint8Array([2]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Card, + "card-1", + new Uint8Array([3]), + 1, + ); + + await manager.deleteDocumentsByType(CrdtEntityType.Deck); + + const decks = await manager.getDocumentsByType(CrdtEntityType.Deck); + expect(decks).toHaveLength(0); + + const cards = await manager.getDocumentsByType(CrdtEntityType.Card); + expect(cards).toHaveLength(1); + }); + + it("should count documents by type", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-2", + new Uint8Array([2]), + 1, + ); + + const count = await manager.getDocumentCountByType(CrdtEntityType.Deck); + expect(count).toBe(2); + }); + }); + + describe("metadata operations", () => { + it("should get and set metadata", async () => { + await manager.setMetadata({ + lastSyncAt: 1234567890, + syncVersionWatermark: 10, + actorId: "actor-123", + }); + + const metadata = await manager.getMetadata(); + expect(metadata?.lastSyncAt).toBe(1234567890); + expect(metadata?.syncVersionWatermark).toBe(10); + expect(metadata?.actorId).toBe("actor-123"); + }); + + it("should return null for non-existent metadata", async () => { + const metadata = await manager.getMetadata(); + expect(metadata).toBeNull(); + }); + + it("should update partial metadata", async () => { + await manager.setMetadata({ + lastSyncAt: 1000, + syncVersionWatermark: 1, + actorId: "actor-1", + }); + + await manager.setMetadata({ lastSyncAt: 2000 }); + + const metadata = await manager.getMetadata(); + expect(metadata?.lastSyncAt).toBe(2000); + expect(metadata?.syncVersionWatermark).toBe(1); // Preserved + expect(metadata?.actorId).toBe("actor-1"); // Preserved + }); + + it("should get and set last sync timestamp", async () => { + expect(await manager.getLastSyncAt()).toBe(0); + + await manager.setLastSyncAt(1234567890); + expect(await manager.getLastSyncAt()).toBe(1234567890); + }); + + it("should get and set sync version watermark", async () => { + expect(await manager.getSyncVersionWatermark()).toBe(0); + + await manager.setSyncVersionWatermark(42); + expect(await manager.getSyncVersionWatermark()).toBe(42); + }); + }); + + describe("batch operations", () => { + it("should batch set multiple documents", async () => { + const deckType = CrdtEntityType.Deck; + const cardType = CrdtEntityType.Card; + const entries = [ + { + entityType: deckType, + entityId: "deck-1", + binary: new Uint8Array([1]), + syncVersion: 1, + }, + { + entityType: cardType, + entityId: "card-1", + binary: new Uint8Array([2]), + syncVersion: 1, + }, + { + entityType: cardType, + entityId: "card-2", + binary: new Uint8Array([3]), + syncVersion: 1, + }, + ]; + + await manager.batchSetDocuments(entries); + + expect(await manager.getTotalDocumentCount()).toBe(3); + expect( + await manager.getDocumentBinary(CrdtEntityType.Deck, "deck-1"), + ).toEqual(new Uint8Array([1])); + expect( + await manager.getDocumentBinary(CrdtEntityType.Card, "card-1"), + ).toEqual(new Uint8Array([2])); + }); + + it("should batch delete multiple documents", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Card, + "card-1", + new Uint8Array([2]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Card, + "card-2", + new Uint8Array([3]), + 1, + ); + + await manager.batchDeleteDocuments([ + { entityType: CrdtEntityType.Deck, entityId: "deck-1" }, + { entityType: CrdtEntityType.Card, entityId: "card-1" }, + ]); + + expect(await manager.getTotalDocumentCount()).toBe(1); + expect( + await manager.getDocumentBinary(CrdtEntityType.Card, "card-2"), + ).toEqual(new Uint8Array([3])); + }); + }); + + describe("sync time queries", () => { + it("should get documents synced since timestamp", async () => { + // Set documents with different sync times + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + + // Wait a bit to ensure different timestamps + await new Promise((resolve) => setTimeout(resolve, 10)); + const afterFirst = Date.now(); + + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-2", + new Uint8Array([2]), + 1, + ); + + const recentDocs = await manager.getDocumentsSyncedSince(afterFirst - 5); + expect(recentDocs.length).toBeGreaterThanOrEqual(1); + }); + }); + + describe("clear operations", () => { + it("should clear all data", async () => { + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setMetadata({ lastSyncAt: 1000 }); + + await manager.clearAll(); + + expect(await manager.getTotalDocumentCount()).toBe(0); + expect(await manager.getMetadata()).toBeNull(); + }); + }); + + describe("total document count", () => { + it("should return total count of all documents", async () => { + expect(await manager.getTotalDocumentCount()).toBe(0); + + await manager.setDocumentBinary( + CrdtEntityType.Deck, + "deck-1", + new Uint8Array([1]), + 1, + ); + await manager.setDocumentBinary( + CrdtEntityType.Card, + "card-1", + new Uint8Array([2]), + 1, + ); + + expect(await manager.getTotalDocumentCount()).toBe(2); + }); + }); +}); diff --git a/src/client/sync/crdt/sync-state.ts b/src/client/sync/crdt/sync-state.ts new file mode 100644 index 0000000..391ccf0 --- /dev/null +++ b/src/client/sync/crdt/sync-state.ts @@ -0,0 +1,370 @@ +/** + * CRDT Sync State Management + * + * This module handles the serialization and persistence of CRDT sync state. + * It tracks which documents have been synced and stores the binary state + * for incremental sync operations. + * + * Design: + * - Sync state is stored in IndexedDB alongside entity data + * - Each entity has a corresponding CRDT binary stored in sync state + * - The state tracks sync vectors for efficient incremental updates + */ + +import Dexie from "dexie"; +import { type CrdtEntityTypeValue, createDocumentId } from "./types"; + +/** + * Sync state entry for a single CRDT document + */ +export interface CrdtSyncStateEntry { + /** Document ID in format entityType:entityId */ + documentId: string; + /** Entity type */ + entityType: CrdtEntityTypeValue; + /** Entity ID */ + entityId: string; + /** Binary representation of the CRDT document */ + binary: Uint8Array; + /** Last sync timestamp */ + lastSyncedAt: number; + /** Sync version from server */ + syncVersion: number; +} + +/** + * Sync metadata for tracking overall sync state + */ +export interface CrdtSyncMetadata { + /** Unique key for metadata storage */ + key: string; + /** Last successful sync timestamp */ + lastSyncAt: number; + /** Server sync version watermark */ + syncVersionWatermark: number; + /** Actor ID for this client */ + actorId: string; +} + +/** + * Database for storing CRDT sync state + * Separate from main app database to avoid migration conflicts + */ +class CrdtSyncDatabase extends Dexie { + syncState!: Dexie.Table<CrdtSyncStateEntry, string>; + metadata!: Dexie.Table<CrdtSyncMetadata, string>; + + constructor() { + super("kioku-crdt-sync"); + + this.version(1).stores({ + // Primary key is documentId, indexed by entityType and entityId + syncState: "documentId, entityType, entityId, lastSyncedAt", + // Simple key-value store for metadata + metadata: "key", + }); + } +} + +/** + * Singleton instance of the CRDT sync database + */ +export const crdtSyncDb = new CrdtSyncDatabase(); + +/** + * CRDT Sync State Manager + * + * Provides operations for managing CRDT sync state: + * - Store/retrieve CRDT document binaries + * - Track sync progress + * - Manage sync metadata + */ +export class CrdtSyncStateManager { + private readonly metadataKey = "sync-metadata"; + + /** + * Get the CRDT binary for an entity + */ + async getDocumentBinary( + entityType: CrdtEntityTypeValue, + entityId: string, + ): Promise<Uint8Array | null> { + const documentId = createDocumentId(entityType, entityId); + const entry = await crdtSyncDb.syncState.get(documentId); + return entry?.binary ?? null; + } + + /** + * Store the CRDT binary for an entity + */ + async setDocumentBinary( + entityType: CrdtEntityTypeValue, + entityId: string, + binary: Uint8Array, + syncVersion: number, + ): Promise<void> { + const documentId = createDocumentId(entityType, entityId); + const entry: CrdtSyncStateEntry = { + documentId, + entityType, + entityId, + binary, + lastSyncedAt: Date.now(), + syncVersion, + }; + await crdtSyncDb.syncState.put(entry); + } + + /** + * Get all CRDT binaries for an entity type + */ + async getDocumentsByType( + entityType: CrdtEntityTypeValue, + ): Promise<CrdtSyncStateEntry[]> { + return crdtSyncDb.syncState + .where("entityType") + .equals(entityType) + .toArray(); + } + + /** + * Delete the CRDT binary for an entity + */ + async deleteDocument( + entityType: CrdtEntityTypeValue, + entityId: string, + ): Promise<void> { + const documentId = createDocumentId(entityType, entityId); + await crdtSyncDb.syncState.delete(documentId); + } + + /** + * Delete all CRDT binaries for an entity type + */ + async deleteDocumentsByType(entityType: CrdtEntityTypeValue): Promise<void> { + await crdtSyncDb.syncState.where("entityType").equals(entityType).delete(); + } + + /** + * Get sync metadata + */ + async getMetadata(): Promise<CrdtSyncMetadata | null> { + const metadata = await crdtSyncDb.metadata.get(this.metadataKey); + return metadata ?? null; + } + + /** + * Update sync metadata + */ + async setMetadata( + updates: Partial<Omit<CrdtSyncMetadata, "key">>, + ): Promise<void> { + const existing = await this.getMetadata(); + const metadata: CrdtSyncMetadata = { + key: this.metadataKey, + lastSyncAt: updates.lastSyncAt ?? existing?.lastSyncAt ?? 0, + syncVersionWatermark: + updates.syncVersionWatermark ?? existing?.syncVersionWatermark ?? 0, + actorId: updates.actorId ?? existing?.actorId ?? "", + }; + await crdtSyncDb.metadata.put(metadata); + } + + /** + * Get the last sync timestamp + */ + async getLastSyncAt(): Promise<number> { + const metadata = await this.getMetadata(); + return metadata?.lastSyncAt ?? 0; + } + + /** + * Update the last sync timestamp + */ + async setLastSyncAt(timestamp: number): Promise<void> { + await this.setMetadata({ lastSyncAt: timestamp }); + } + + /** + * Get the sync version watermark + */ + async getSyncVersionWatermark(): Promise<number> { + const metadata = await this.getMetadata(); + return metadata?.syncVersionWatermark ?? 0; + } + + /** + * Update the sync version watermark + */ + async setSyncVersionWatermark(version: number): Promise<void> { + await this.setMetadata({ syncVersionWatermark: version }); + } + + /** + * Clear all sync state (for full resync) + */ + async clearAll(): Promise<void> { + await crdtSyncDb.syncState.clear(); + await crdtSyncDb.metadata.clear(); + } + + /** + * Get count of stored documents by type + */ + async getDocumentCountByType( + entityType: CrdtEntityTypeValue, + ): Promise<number> { + return crdtSyncDb.syncState.where("entityType").equals(entityType).count(); + } + + /** + * Get total count of stored documents + */ + async getTotalDocumentCount(): Promise<number> { + return crdtSyncDb.syncState.count(); + } + + /** + * Check if a document exists in sync state + */ + async hasDocument( + entityType: CrdtEntityTypeValue, + entityId: string, + ): Promise<boolean> { + const documentId = createDocumentId(entityType, entityId); + const count = await crdtSyncDb.syncState + .where("documentId") + .equals(documentId) + .count(); + return count > 0; + } + + /** + * Get documents that have been synced since a given timestamp + */ + async getDocumentsSyncedSince( + timestamp: number, + ): Promise<CrdtSyncStateEntry[]> { + return crdtSyncDb.syncState + .where("lastSyncedAt") + .above(timestamp) + .toArray(); + } + + /** + * Batch update multiple documents + */ + async batchSetDocuments( + entries: Array<{ + entityType: CrdtEntityTypeValue; + entityId: string; + binary: Uint8Array; + syncVersion: number; + }>, + ): Promise<void> { + const now = Date.now(); + const syncEntries: CrdtSyncStateEntry[] = entries.map((entry) => ({ + documentId: createDocumentId(entry.entityType, entry.entityId), + entityType: entry.entityType, + entityId: entry.entityId, + binary: entry.binary, + lastSyncedAt: now, + syncVersion: entry.syncVersion, + })); + + await crdtSyncDb.syncState.bulkPut(syncEntries); + } + + /** + * Batch delete multiple documents + */ + async batchDeleteDocuments( + entries: Array<{ + entityType: CrdtEntityTypeValue; + entityId: string; + }>, + ): Promise<void> { + const documentIds = entries.map((entry) => + createDocumentId(entry.entityType, entry.entityId), + ); + await crdtSyncDb.syncState.bulkDelete(documentIds); + } +} + +/** + * Singleton instance of the sync state manager + */ +export const crdtSyncStateManager = new CrdtSyncStateManager(); + +/** + * Serialize CRDT binary to base64 for network transport + */ +export function binaryToBase64(binary: Uint8Array): string { + // Use standard base64 encoding + const bytes = new Uint8Array(binary); + let binaryStr = ""; + for (const byte of bytes) { + binaryStr += String.fromCharCode(byte); + } + return btoa(binaryStr); +} + +/** + * Deserialize base64 string to CRDT binary + */ +export function base64ToBinary(base64: string): Uint8Array { + const binaryStr = atob(base64); + const bytes = new Uint8Array(binaryStr.length); + for (let i = 0; i < binaryStr.length; i++) { + bytes[i] = binaryStr.charCodeAt(i); + } + return bytes; +} + +/** + * CRDT changes payload for sync API + */ +export interface CrdtSyncPayload { + /** Document ID */ + documentId: string; + /** Entity type */ + entityType: CrdtEntityTypeValue; + /** Entity ID */ + entityId: string; + /** Base64-encoded CRDT binary */ + binary: string; +} + +/** + * Convert sync state entries to sync payload format + */ +export function entriesToSyncPayload( + entries: CrdtSyncStateEntry[], +): CrdtSyncPayload[] { + return entries.map((entry) => ({ + documentId: entry.documentId, + entityType: entry.entityType, + entityId: entry.entityId, + binary: binaryToBase64(entry.binary), + })); +} + +/** + * Convert sync payload to sync state entries + */ +export function syncPayloadToEntries( + payloads: CrdtSyncPayload[], + syncVersion: number, +): Array<{ + entityType: CrdtEntityTypeValue; + entityId: string; + binary: Uint8Array; + syncVersion: number; +}> { + return payloads.map((payload) => ({ + entityType: payload.entityType, + entityId: payload.entityId, + binary: base64ToBinary(payload.binary), + syncVersion, + })); +} |
