experiments in a post-browser web
at main 484 lines 15 kB view raw
1/** 2 * Sync Engine 3 * 4 * Bidirectional sync protocol: pull/push/merge with last-write-wins conflict resolution. 5 * Runtime-agnostic — uses fetch() (available in Node 18+, all browsers, Tauri WebView). 6 * 7 * Config is provided via callbacks so each runtime can store it however it wants 8 * (chrome.storage.local, SQLite extension_settings, env vars, etc.). 9 */ 10 11import { DATASTORE_VERSION, PROTOCOL_VERSION } from './version.js'; 12 13/** 14 * @typedef {Object} SyncConfig 15 * @property {string} serverUrl 16 * @property {string} apiKey 17 * @property {string} [serverProfileId] 18 * @property {number} lastSyncTime - Unix ms 19 */ 20 21export class SyncEngine { 22 /** 23 * @param {import('./data.js').DataEngine} dataEngine 24 * @param {Object} options 25 * @param {() => Promise<SyncConfig>|SyncConfig} options.getConfig 26 * @param {(updates: Partial<SyncConfig>) => Promise<void>|void} options.setConfig 27 * @param {typeof globalThis.fetch} [options.fetch] - Custom fetch for testing 28 */ 29 constructor(dataEngine, { getConfig, setConfig, fetch: customFetch }) { 30 this.data = dataEngine; 31 this.getConfig = getConfig; 32 this.setConfig = setConfig; 33 this._fetch = customFetch || globalThis.fetch; 34 } 35 36 // ==================== Pull (Server → Client) ==================== 37 38 /** 39 * Pull items from server and merge into local storage. 40 * @param {Object} [options] 41 * @param {number} [options.since] - Override lastSyncTime 42 * @returns {Promise<{pulled: number, conflicts: number}>} 43 */ 44 async pullFromServer(options = {}) { 45 const config = await this.getConfig(); 46 if (!config.serverUrl || !config.apiKey) { 47 return { pulled: 0, conflicts: 0 }; 48 } 49 50 const since = options.since ?? config.lastSyncTime; 51 52 let path = '/items'; 53 if (since && since > 0) { 54 path = `/items/since/${toISOString(since)}`; 55 } 56 const separator = path.includes('?') ? '&' : '?'; 57 if (config.serverProfileId) { 58 path += `${separator}profile=${encodeURIComponent(config.serverProfileId)}`; 59 } 60 // Always include deleted items so tombstones propagate during sync 61 path += `${path.includes('?') ? '&' : '?'}includeDeleted=true`; 62 63 const response = await this._serverFetch( 64 config.serverUrl, 65 config.apiKey, 66 path 67 ); 68 const serverItems = response.items || []; 69 70 let pulled = 0; 71 let conflicts = 0; 72 73 for (const serverItem of serverItems) { 74 const result = await this._mergeServerItem(serverItem); 75 if (result === 'pulled') pulled++; 76 if (result === 'conflict') conflicts++; 77 } 78 79 return { pulled, conflicts }; 80 } 81 82 // ==================== Push (Client → Server) ==================== 83 84 /** 85 * Push unsynced local items to server. 86 * @returns {Promise<{pushed: number, failed: number}>} 87 */ 88 async pushToServer() { 89 const config = await this.getConfig(); 90 if (!config.serverUrl || !config.apiKey) { 91 return { pushed: 0, failed: 0 }; 92 } 93 94 const allItems = await this.data.queryItems({ includeDeleted: false }); 95 const lastSyncTime = config.lastSyncTime || 0; 96 97 let itemsToPush; 98 if (lastSyncTime > 0) { 99 // Incremental: never synced OR locally modified after their last sync 100 itemsToPush = allItems.filter( 101 i => 102 i.syncSource === '' || 103 (i.syncedAt > 0 && i.updatedAt > i.syncedAt) 104 ); 105 } else { 106 // Full: all items that haven't been synced 107 itemsToPush = allItems.filter(i => i.syncSource === ''); 108 } 109 110 // Also include deleted items that need tombstone push 111 const allWithDeleted = await this.data.queryItems({ includeDeleted: true }); 112 const deletedToPush = allWithDeleted.filter( 113 i => 114 i.deletedAt > 0 && 115 i.syncId && 116 i.syncId !== '' && 117 i.syncedAt > 0 && 118 i.updatedAt > i.syncedAt 119 ); 120 itemsToPush = itemsToPush.concat(deletedToPush); 121 122 let pushed = 0; 123 let failed = 0; 124 125 for (const item of itemsToPush) { 126 try { 127 const tags = await this.data.getItemTags(item.id); 128 const tagNames = tags.map(t => t.name); 129 130 let metadata = null; 131 if (item.metadata && item.metadata !== '{}') { 132 try { 133 metadata = JSON.parse(item.metadata); 134 } catch { 135 // Invalid JSON, skip metadata 136 } 137 } 138 139 const body = { 140 type: item.type, 141 content: item.content, 142 tags: tagNames, 143 syncId: item.syncId || item.id, 144 }; 145 if (metadata) body.metadata = metadata; 146 if (item.deletedAt > 0) { 147 body.deletedAt = item.deletedAt; 148 } 149 150 let pushPath = '/items'; 151 if (config.serverProfileId) { 152 pushPath += `?profile=${encodeURIComponent(config.serverProfileId)}`; 153 } 154 155 const response = await this._serverFetch( 156 config.serverUrl, 157 config.apiKey, 158 pushPath, 159 { method: 'POST', body } 160 ); 161 162 // Update local item with sync info 163 await this.data.adapter.updateItem(item.id, { 164 syncId: response.id, 165 syncSource: 'server', 166 syncedAt: Date.now(), 167 }); 168 169 pushed++; 170 } catch (error) { 171 console.error(`[sync] Push failed for item ${item.id}:`, error.message); 172 failed++; 173 } 174 } 175 176 return { pushed, failed }; 177 } 178 179 // ==================== Full Sync ==================== 180 181 /** 182 * Full bidirectional sync: pull, then push. 183 * @returns {Promise<{pulled: number, pushed: number, conflicts: number, lastSyncTime: number}>} 184 */ 185 async syncAll() { 186 const config = await this.getConfig(); 187 if (!config.serverUrl) { 188 return { pulled: 0, pushed: 0, conflicts: 0, lastSyncTime: 0 }; 189 } 190 191 await this.resetSyncStateIfServerChanged(config.serverUrl); 192 const startTime = Date.now(); 193 194 const pullResult = await this.pullFromServer(); 195 await this.saveSyncServerConfig(config.serverUrl); 196 const pushResult = await this.pushToServer(); 197 await this.setConfig({ lastSyncTime: startTime }); 198 199 return { 200 pulled: pullResult.pulled, 201 pushed: pushResult.pushed, 202 conflicts: pullResult.conflicts, 203 lastSyncTime: startTime, 204 }; 205 } 206 207 // ==================== Status ==================== 208 209 /** 210 * Get current sync status. 211 */ 212 async getSyncStatus() { 213 const config = await this.getConfig(); 214 const allItems = await this.data.queryItems({ includeDeleted: false }); 215 const pendingCount = allItems.filter( 216 i => 217 i.syncSource === '' || 218 (i.syncedAt > 0 && i.updatedAt > i.syncedAt) 219 ).length; 220 221 // Count deleted tombstones that need pushing 222 const allWithDeleted = await this.data.queryItems({ includeDeleted: true }); 223 const pendingTombstones = allWithDeleted.filter( 224 i => 225 i.deletedAt > 0 && 226 i.syncId && 227 i.syncId !== '' && 228 i.syncedAt > 0 && 229 i.updatedAt > i.syncedAt 230 ).length; 231 232 return { 233 configured: !!(config.serverUrl && config.apiKey), 234 lastSyncTime: config.lastSyncTime || 0, 235 pendingCount: pendingCount + pendingTombstones, 236 }; 237 } 238 239 // ==================== Server-Change Detection ==================== 240 241 /** 242 * Detect if the sync server has changed and reset per-item sync markers. 243 * @param {string} serverUrl 244 * @returns {Promise<boolean>} true if state was reset 245 */ 246 async resetSyncStateIfServerChanged(serverUrl) { 247 const config = await this.getConfig(); 248 const currentProfileId = config.serverProfileId || ''; 249 250 let storedUrl = ''; 251 let storedProfileId = ''; 252 try { 253 const val = await this.data.getSetting('sync_lastSyncServerUrl'); 254 if (val) storedUrl = JSON.parse(val); 255 } catch { 256 /* first sync */ 257 } 258 try { 259 const val = await this.data.getSetting('sync_lastSyncProfileId'); 260 if (val) storedProfileId = JSON.parse(val); 261 } catch { 262 /* first sync */ 263 } 264 265 // First sync — no stored config means we haven't tracked the server yet. 266 // Don't reset items that may have been pulled in a prior pull-only sync. 267 if (!storedUrl && !storedProfileId) { 268 return false; 269 } 270 271 const urlChanged = storedUrl && storedUrl !== serverUrl; 272 const profileChanged = 273 storedProfileId && storedProfileId !== currentProfileId; 274 275 if (urlChanged || profileChanged) { 276 const allItems = await this.data.queryItems({ includeDeleted: false }); 277 for (const item of allItems) { 278 await this.data.adapter.updateItem(item.id, { 279 syncSource: '', 280 syncedAt: 0, 281 syncId: '', 282 }); 283 } 284 await this.setConfig({ lastSyncTime: 0 }); 285 return true; 286 } 287 288 return false; 289 } 290 291 /** 292 * Save current server config for change detection on next sync. 293 * @param {string} serverUrl 294 */ 295 async saveSyncServerConfig(serverUrl) { 296 const config = await this.getConfig(); 297 const currentProfileId = config.serverProfileId || ''; 298 await this.data.setSetting( 299 'sync_lastSyncServerUrl', 300 JSON.stringify(serverUrl) 301 ); 302 await this.data.setSetting( 303 'sync_lastSyncProfileId', 304 JSON.stringify(currentProfileId) 305 ); 306 } 307 308 // ==================== Internal Helpers ==================== 309 310 /** 311 * Authenticated fetch to sync server with version header checks. 312 */ 313 async _serverFetch(serverUrl, apiKey, path, options = {}) { 314 const url = `${serverUrl.replace(/\/$/, '')}${path}`; 315 const response = await this._fetch(url, { 316 method: options.method || 'GET', 317 headers: { 318 Authorization: `Bearer ${apiKey}`, 319 'Content-Type': 'application/json', 320 'X-Peek-Datastore-Version': String(DATASTORE_VERSION), 321 'X-Peek-Protocol-Version': String(PROTOCOL_VERSION), 322 'X-Peek-Client': 'sync-engine', 323 }, 324 body: options.body ? JSON.stringify(options.body) : undefined, 325 }); 326 327 if (!response.ok) { 328 const error = await response.text(); 329 throw new Error(`Server error ${response.status}: ${error}`); 330 } 331 332 // Check server version headers 333 const serverDS = response.headers.get('X-Peek-Datastore-Version'); 334 const serverProto = response.headers.get('X-Peek-Protocol-Version'); 335 336 if (serverDS) { 337 const serverDSNum = parseInt(serverDS, 10); 338 if (serverDSNum !== DATASTORE_VERSION) { 339 throw new Error( 340 `Datastore version mismatch: server=${serverDSNum}, client=${DATASTORE_VERSION}. Please update.` 341 ); 342 } 343 } 344 345 if (serverProto) { 346 const serverProtoNum = parseInt(serverProto, 10); 347 if (serverProtoNum !== PROTOCOL_VERSION) { 348 throw new Error( 349 `Protocol version mismatch: server=${serverProtoNum}, client=${PROTOCOL_VERSION}. Please update.` 350 ); 351 } 352 } 353 354 return response.json(); 355 } 356 357 /** 358 * Merge a single server item into local storage (last-write-wins). 359 * @returns {Promise<'pulled'|'conflict'|'skipped'>} 360 */ 361 async _mergeServerItem(serverItem) { 362 const serverUpdatedAt = serverItem.updatedAt; 363 const serverDeletedAt = serverItem.deletedAt || 0; 364 365 // Find local item by syncId 366 const localItem = await this.data.adapter.findItemBySyncId(serverItem.id); 367 368 // Handle server-side tombstones (deletedAt > 0) 369 if (serverDeletedAt > 0) { 370 if (!localItem) { 371 // No local item — nothing to delete, skip 372 return 'skipped'; 373 } 374 if (localItem.deletedAt > 0) { 375 // Already deleted locally — just update syncedAt 376 await this.data.adapter.updateItem(localItem.id, { 377 syncedAt: Date.now(), 378 }); 379 return 'pulled'; 380 } 381 // Local item exists and is active — soft-delete it 382 await this.data.adapter.updateItem(localItem.id, { 383 deletedAt: serverDeletedAt, 384 updatedAt: serverDeletedAt, 385 syncedAt: Date.now(), 386 }); 387 return 'pulled'; 388 } 389 390 // Server item is active (deletedAt is 0 or absent) 391 if (!localItem) { 392 // New item from server — insert 393 const { id: localId } = await this.data.addItem(serverItem.type, { 394 content: serverItem.content || null, 395 metadata: serverItem.metadata 396 ? JSON.stringify(serverItem.metadata) 397 : null, 398 syncId: serverItem.id, 399 syncSource: 'server', 400 }); 401 402 // Overwrite timestamps to match server 403 await this.data.adapter.updateItem(localId, { 404 createdAt: serverItem.createdAt, 405 updatedAt: serverUpdatedAt, 406 syncedAt: Date.now(), 407 }); 408 409 // Sync tags 410 await this._syncTagsToItem(localId, serverItem.tags || []); 411 return 'pulled'; 412 } 413 414 // Local item exists and is deleted, but server item is active 415 if (localItem.deletedAt > 0) { 416 if (serverUpdatedAt > localItem.updatedAt) { 417 // Server is newer — undelete locally 418 await this.data.adapter.updateItem(localItem.id, { 419 content: serverItem.content || null, 420 metadata: serverItem.metadata 421 ? JSON.stringify(serverItem.metadata) 422 : null, 423 deletedAt: 0, 424 updatedAt: serverUpdatedAt, 425 syncedAt: Date.now(), 426 }); 427 await this._syncTagsToItem(localItem.id, serverItem.tags || []); 428 return 'pulled'; 429 } 430 // Local is newer — conflict (will push tombstone next sync) 431 return 'conflict'; 432 } 433 434 // Both active — check timestamps 435 if (serverUpdatedAt > localItem.updatedAt) { 436 // Server is newer — update local 437 await this.data.updateItem(localItem.id, { 438 content: serverItem.content || null, 439 metadata: serverItem.metadata 440 ? JSON.stringify(serverItem.metadata) 441 : null, 442 }); 443 await this.data.adapter.updateItem(localItem.id, { 444 updatedAt: serverUpdatedAt, 445 syncedAt: Date.now(), 446 }); 447 448 await this._syncTagsToItem(localItem.id, serverItem.tags || []); 449 return 'pulled'; 450 } 451 452 if (localItem.updatedAt > serverUpdatedAt) { 453 // Local is newer — conflict (local wins) 454 return 'conflict'; 455 } 456 457 // Same timestamp — skip 458 return 'skipped'; 459 } 460 461 /** 462 * Replace item tags with server-provided tag names. 463 */ 464 async _syncTagsToItem(itemId, tagNames) { 465 await this.data.adapter.clearItemTags(itemId); 466 for (const tagName of tagNames) { 467 const { tag } = await this.data.getOrCreateTag(tagName); 468 await this.data.adapter.tagItem(itemId, tag.id); 469 } 470 } 471} 472 473// ==================== Timestamp Conversion ==================== 474 475/** Convert Unix ms to ISO string (for server API). */ 476function toISOString(unixMs) { 477 return new Date(unixMs).toISOString(); 478} 479 480/** Convert ISO string or integer timestamp to Unix ms (from server API). */ 481function fromISOString(value) { 482 if (typeof value === 'number') return value; 483 return new Date(value).getTime(); 484}