better indexing on the server

Changed files
+228 -25
src
+81
src/client/components/feed-import-podcasts.tsx
import {useSignal} from '@preact/signals'
import {useCallback} from 'preact/hooks'

import {useSkypod} from '#client/skypod/context'
import {useRealmIdentity} from '#realm/client/context-identity'

// Popular podcasts
const PODCAST_FEEDS = [
  {url: 'https://feeds.simplecast.com/54nAGcIl', title: 'The Daily (NYT)'},
  {url: 'https://feeds.megaphone.fm/WWO3519750118', title: 'Stuff You Should Know'},
  {
    url: 'https://www.omnycontent.com/d/playlist/e73c998e-6e60-432f-8610-ae210140c5b1/A91018A4-EA4F-4130-BF55-AE270180FF7C/44710E1C-10BB-48D0-9524-AE270180FF93/podcast.rss',
    title: 'This American Life',
  },
  {url: 'https://feeds.npr.org/510318/podcast.xml', title: 'Up First (NPR)'},
  {url: 'https://feeds.megaphone.fm/darknetdiaries', title: 'Darknet Diaries'},
  {url: 'https://changelog.com/podcast/feed', title: 'The Changelog'},
  {url: 'https://feeds.simplecast.com/wgl4xEgL', title: 'Reply All'},
  {url: 'https://feeds.megaphone.fm/ADL9840290619', title: 'The Ezra Klein Show'},
]

/**
 * Button that dispatches one `feed:add` action per entry in `PODCAST_FEEDS`.
 *
 * Feeds are dispatched one at a time (each dispatch is awaited before the
 * next). A failing feed is recorded in the error list and does not stop the
 * remaining imports. While a run is in flight the button is disabled and its
 * label shows how many feeds have been imported successfully so far.
 */
export const FeedImportPodcasts: preact.FunctionComponent = () => {
  const {identity} = useRealmIdentity()
  const store = useSkypod()

  const importing$ = useSignal(false)
  const imported$ = useSignal(0) // count of successful dispatches in the current run
  const errors$ = useSignal<string[]>([]) // one "<title>: <message>" entry per failed feed

  const importAll = useCallback(() => {
    // don't rely solely on the rendered `disabled` attribute to prevent overlapping runs
    if (importing$.value) return

    importing$.value = true
    imported$.value = 0
    errors$.value = []

    const go = async () => {
      try {
        for (const feed of PODCAST_FEEDS) {
          try {
            const action = store.action('feed:add', {
              url: feed.url,
              lock: {by: identity.identid},
              private: false,
            })

            await store.dispatch(action)
            imported$.value++
          } catch (ex: unknown) {
            const msg = ex instanceof Error ? ex.message : String(ex)
            errors$.value = [...errors$.value, `${feed.title}: ${msg}`]
            console.error(`Failed to import ${feed.title}:`, ex)
          }
        }
      } finally {
        // always re-enable the button, even if something unexpected escapes the loop
        importing$.value = false
      }
    }

    go().catch((exc: unknown) => {
      console.error('error loading podcast feeds', exc)
    })
  }, [store, identity.identid, importing$, imported$, errors$])

  return (
    <div className="feed-import-podcasts">
      <button type="button" onClick={importAll} disabled={importing$.value}>
        {importing$.value
          ? `Importing... (${imported$.value}/${PODCAST_FEEDS.length})`
          : 'Import Podcast Feeds'}
      </button>

      {errors$.value.length > 0 && (
        <details>
          <summary>Errors ({errors$.value.length})</summary>
          <ul>
            {errors$.value.map((err, i) => (
              <li key={i}>{err}</li>
            ))}
          </ul>
        </details>
      )}
    </div>
  )
}
+80
src/client/components/feed-import-tech.tsx
import {useSignal} from '@preact/signals'
import {useCallback} from 'preact/hooks'

import {useSkypod} from '#client/skypod/context'
import {useRealmIdentity} from '#realm/client/context-identity'

// Popular tech/programming feeds
const TECH_FEEDS = [
  {url: 'https://hnrss.org/frontpage', title: 'Hacker News - Front Page'},
  {url: 'https://hnrss.org/newest', title: 'Hacker News - Newest'},
  {url: 'https://lobste.rs/rss', title: 'Lobsters'},
  {url: 'https://www.theverge.com/rss/index.xml', title: 'The Verge'},
  {url: 'https://techcrunch.com/feed/', title: 'TechCrunch'},
  {url: 'https://arstechnica.com/feed/', title: 'Ars Technica'},
  {url: 'https://www.wired.com/feed/rss', title: 'Wired'},
  {url: 'https://stackoverflow.blog/feed/', title: 'Stack Overflow Blog'},
  {url: 'https://github.blog/feed/', title: 'GitHub Blog'},
  {url: 'https://blog.cloudflare.com/rss/', title: 'Cloudflare Blog'},
]

/**
 * Button that dispatches one `feed:add` action per entry in `TECH_FEEDS`.
 *
 * Feeds are dispatched one at a time (each dispatch is awaited before the
 * next). A failing feed is recorded in the error list and does not stop the
 * remaining imports. While a run is in flight the button is disabled and its
 * label shows how many feeds have been imported successfully so far.
 */
export const FeedImportTech: preact.FunctionComponent = () => {
  const {identity} = useRealmIdentity()
  const store = useSkypod()

  const importing$ = useSignal(false)
  const imported$ = useSignal(0) // count of successful dispatches in the current run
  const errors$ = useSignal<string[]>([]) // one "<title>: <message>" entry per failed feed

  const importAll = useCallback(() => {
    // don't rely solely on the rendered `disabled` attribute to prevent overlapping runs
    if (importing$.value) return

    importing$.value = true
    imported$.value = 0
    errors$.value = []

    const go = async () => {
      try {
        for (const feed of TECH_FEEDS) {
          try {
            const action = store.action('feed:add', {
              url: feed.url,
              lock: {by: identity.identid},
              private: false,
            })

            await store.dispatch(action)
            imported$.value++
          } catch (ex: unknown) {
            const msg = ex instanceof Error ? ex.message : String(ex)
            errors$.value = [...errors$.value, `${feed.title}: ${msg}`]
            console.error(`Failed to import ${feed.title}:`, ex)
          }
        }
      } finally {
        // always re-enable the button, even if something unexpected escapes the loop
        importing$.value = false
      }
    }

    go().catch((exc: unknown) => {
      console.error('error loading tech feeds', exc)
    })
  }, [store, identity.identid, importing$, imported$, errors$])

  return (
    <div className="feed-import-tech">
      <button type="button" onClick={importAll} disabled={importing$.value}>
        {importing$.value
          ? `Importing... (${imported$.value}/${TECH_FEEDS.length})`
          : 'Import Tech Feeds'}
      </button>

      {errors$.value.length > 0 && (
        <details>
          <summary>Errors ({errors$.value.length})</summary>
          <ul>
            {errors$.value.map((err, i) => (
              <li key={i}>{err}</li>
            ))}
          </ul>
        </details>
      )}
    </div>
  )
}
+4
src/client/page-app.tsx
··· 9 9 10 10 import {DebugNuke} from './components/debug-nuke' 11 11 import {FeedImportNYTimes} from './components/feed-import-nytimes' 12 + import {FeedImportPodcasts} from './components/feed-import-podcasts' 13 + import {FeedImportTech} from './components/feed-import-tech' 12 14 import {Messenger} from './components/messenger' 13 15 import {PeerList} from './components/peer-list' 14 16 ··· 31 33 <RealmConnectionManager /> 32 34 <PeerList /> 33 35 <FeedImportNYTimes /> 36 + <FeedImportTech /> 37 + <FeedImportPodcasts /> 34 38 <Messenger /> 35 39 <DebugNuke /> 36 40 </SkypodProvider>
+12 -9
src/realm/client/service-connection-sync.ts
··· 59 59 60 60 async buildSyncDelta(clocks: PeerClocks): Promise<StoredAction[]> { 61 61 const states = await this.buildSyncState() 62 - const initial = this.#db.actions.where('[actor+clock]').equals([Dexie.minKey, Dexie.minKey]) 63 - // this is the root for the reduction; dexie doesn't have an "additive identity" for `or` - table.toCollection is (all) not (none) 62 + const [first, ...known] = Object.keys(states) as IdentID[] 63 + if (!first) return [] 64 + 65 + const initial = this.#db.actions.where('[actor+clock]').between( 66 + [first, clocks[first] ?? Dexie.minKey], 67 + [first, states[first]], 68 + clocks[first] == null, // include lower only if they don't have it 69 + true, // always include the most recent 70 + ) 64 71 65 - const known = Object.keys(states) as IdentID[] 66 72 const actions = known.reduce((memo, actor) => { 67 73 const clock = clocks[actor] ?? null 68 - return memo.or('[actor+clock]').between( 69 - [actor, clock ?? Dexie.minKey], 70 - [actor, states[actor]], 71 - clock == null, // include lower only if they don't have it 72 - true, // always include the most recent 73 - ) 74 + return memo 75 + .or('[actor+clock]') 76 + .between([actor, clock ?? Dexie.minKey], [actor, states[actor]], clock == null, true) 74 77 }, initial) 75 78 76 79 // already in clock order
+3 -2
src/realm/protocol/logical-clock.ts
··· 14 14 // hlc format: 'lc.seconds.counter.identid' 15 15 16 16 static #hlc = Symbol('hlc') 17 - static readonly pattern = `lc:\\d+:\\d+:${IdentBrand.pattern}` 17 + static readonly pattern = `lc:\\d+:\\d{6}:${IdentBrand.pattern}` 18 18 static readonly regexp = new RegExp(`^${this.pattern}`) 19 19 static readonly schema = z.string().regex(this.regexp).brand(this.#hlc) 20 20 ··· 108 108 this.#seconds = seconds 109 109 } 110 110 111 - const hlcstr = `lc:${this.#seconds.toFixed(0)}:${this.#counter}:${this.#identid}` 111 + const counter = String(this.#counter).padStart(6, '0') 112 + const hlcstr = `lc:${this.#seconds.toFixed(0)}:${counter}:${this.#identid}` 112 113 return this.tick(LogicalClock.parse(hlcstr)) 113 114 } 114 115
+48 -14
src/realm/server/state-storage.ts
··· 26 26 27 27 type RealmMetadata = z.infer<typeof realmMetadataSchema> 28 28 29 + const CLOCK_KEY_PREFIX = 'clock:' 30 + const ACTION_KEY_PREFIX = 'action:' 31 + 29 32 export class RealmStorage { 30 33 private static async storagedir() { 31 34 const dir = process.env.REALM_STORAGE_DIR || './data/realms' ··· 152 155 try { 153 156 const {identid: actor} = LogicalClock.extract(action.clk) 154 157 const storedAction: StoredAction = {actor, clock: action.clk, action} 155 - await this.#db.put(action.clk, JSON.stringify(storedAction)) 158 + 159 + // store action with compound key [actor:clock] 160 + const actionKey = `${ACTION_KEY_PREFIX}${actor}:${action.clk}` 161 + await this.#db.put(actionKey, JSON.stringify(storedAction)) 162 + 163 + // update latest clock for this actor 164 + const clockKey = `${CLOCK_KEY_PREFIX}${actor}` 165 + const existingClock = await this.#getClock(actor) 166 + if (!existingClock || LogicalClock.compare(action.clk, existingClock) > 0) { 167 + await this.#db.put(clockKey, action.clk) 168 + } 169 + 156 170 console.debug(`Stored action: ${action.clk} from ${actor}`) 157 171 } catch (err) { 158 172 console.error('Failed to store action:', action, err) ··· 164 178 const states: Record<IdentID, LCTimestamp | null> = {} 165 179 166 180 try { 167 - // Iterate through all actions (leveldb stores by clock which is sortable) 168 - for await (const value of this.#db.values({reverse: true})) { 169 - const stored = storedActionSchema.parse(JSON.parse(value)) 170 - if (!states[stored.actor]) { 171 - states[stored.actor] = stored.clock 172 - } 181 + // iterate _just_ over the "latest clock" keys 182 + for await (const [key, value] of this.#db.iterator({ 183 + gte: CLOCK_KEY_PREFIX, 184 + lte: `${CLOCK_KEY_PREFIX}\xFF`, 185 + })) { 186 + const actor = IdentBrand.parse(key.slice(CLOCK_KEY_PREFIX.length)) 187 + states[actor] = LogicalClock.parse(value) 173 188 } 174 189 } catch (err) { 175 190 console.error('Error building sync state:', err) ··· 180 195 181 196 
async buildSyncDelta(clocks: PeerClocks): Promise<StoredAction[]> { 182 197 const results: StoredAction[] = [] 198 + 183 199 try { 184 - for await (const value of this.#db.values()) { 185 - const stored = storedActionSchema.parse(JSON.parse(value)) 186 - const knownClock = clocks[stored.actor] 200 + const state = await this.buildSyncState() 201 + for (const [key, ourClock] of Object.entries(state)) { 202 + if (!ourClock) continue // we don't have anything for this clock? 203 + 204 + const actor = key as IdentID 205 + const theirClock = clocks[actor] ?? null 206 + 207 + const gte = theirClock 208 + ? `${ACTION_KEY_PREFIX}${actor}:${theirClock}\x01` // exclude theirClock (they have it) 209 + : `${ACTION_KEY_PREFIX}${actor}:` // include all if they have nothing 210 + 211 + const lte = `${ACTION_KEY_PREFIX}${actor}:${ourClock}` 187 212 188 - // include if we don't know about this actor or our clock is newer 189 - if (!knownClock || LogicalClock.compare(stored.clock, knownClock) > 0) { 213 + for await (const value of this.#db.values({gte, lte})) { 214 + const stored = storedActionSchema.parse(JSON.parse(value)) 190 215 results.push(stored) 191 216 } 192 217 } ··· 194 219 console.error('Error building sync delta:', err) 195 220 } 196 221 197 - // results are already sorted by clock (leveldb key order) 198 - return results 222 + return results.sort((a, b) => LogicalClock.compare(a.clock, b.clock)) 223 + } 224 + 225 + async #getClock(actor: IdentID): Promise<LCTimestamp | null> { 226 + try { 227 + const clockKey = `${CLOCK_KEY_PREFIX}${actor}` 228 + return (await this.#db.get(clockKey)) as LCTimestamp 229 + } catch (err) { 230 + // Key doesn't exist 231 + return null 232 + } 199 233 } 200 234 201 235 async close(): Promise<void> {