A personal website powered by Astro and ATProto

simplified jetstream

+46
package-lock.json
··· 9 9 "version": "0.0.1", 10 10 "dependencies": { 11 11 "@astrojs/check": "^0.9.4", 12 + "@atcute/jetstream": "^1.0.2", 12 13 "@atproto/api": "^0.16.2", 13 14 "@atproto/xrpc": "^0.7.1", 14 15 "@nulfrost/leaflet-loader-astro": "^1.1.0", ··· 196 197 "dependencies": { 197 198 "@atcute/lexicons": "^1.0.4", 198 199 "@badrap/valita": "^0.4.5" 200 + } 201 + }, 202 + "node_modules/@atcute/jetstream": { 203 + "version": "1.0.2", 204 + "resolved": "https://registry.npmjs.org/@atcute/jetstream/-/jetstream-1.0.2.tgz", 205 + "integrity": "sha512-ZtdNNxl4zq9cgUpXSL9F+AsXUZt0Zuyj0V7974D7LxdMxfTItPnMZ9dRG8GoFkkGz3+pszdsG888Ix8C0F2+mA==", 206 + "license": "MIT", 207 + "dependencies": { 208 + "@atcute/lexicons": "^1.0.2", 209 + "@badrap/valita": "^0.4.2", 210 + "@mary-ext/event-iterator": "^1.0.0", 211 + "@mary-ext/simple-event-emitter": "^1.0.0", 212 + "partysocket": "^1.1.4", 213 + "type-fest": "^4.41.0", 214 + "yocto-queue": "^1.2.1" 199 215 } 200 216 }, 201 217 "node_modules/@atcute/lexicons": { ··· 1275 1291 "@jridgewell/sourcemap-codec": "^1.4.14" 1276 1292 } 1277 1293 }, 1294 + "node_modules/@mary-ext/event-iterator": { 1295 + "version": "1.0.0", 1296 + "resolved": "https://registry.npmjs.org/@mary-ext/event-iterator/-/event-iterator-1.0.0.tgz", 1297 + "integrity": "sha512-l6gCPsWJ8aRCe/s7/oCmero70kDHgIK5m4uJvYgwEYTqVxoBOIXbKr5tnkLqUHEg6mNduB4IWvms3h70Hp9ADQ==", 1298 + "license": "BSD-3-Clause", 1299 + "dependencies": { 1300 + "yocto-queue": "^1.2.1" 1301 + } 1302 + }, 1303 + "node_modules/@mary-ext/simple-event-emitter": { 1304 + "version": "1.0.0", 1305 + "resolved": "https://registry.npmjs.org/@mary-ext/simple-event-emitter/-/simple-event-emitter-1.0.0.tgz", 1306 + "integrity": "sha512-meA/zJZKIN1RVBNEYIbjufkUrW7/tRjHH60FjolpG1ixJKo76TB208qefQLNdOVDA7uIG0CGEDuhmMirtHKLAg==", 1307 + "license": "BSD-3-Clause" 1308 + }, 1278 1309 "node_modules/@nodelib/fs.scandir": { 1279 1310 "version": "2.1.5", 1280 1311 "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", ··· 3210 3241 "dependencies": { 3211 3242 "@types/estree": "^1.0.0" 3212 3243 } 3244 + }, 3245 + "node_modules/event-target-polyfill": { 3246 + "version": "0.0.4", 3247 + "resolved": "https://registry.npmjs.org/event-target-polyfill/-/event-target-polyfill-0.0.4.tgz", 3248 + "integrity": "sha512-Gs6RLjzlLRdT8X9ZipJdIZI/Y6/HhRLyq9RdDlCsnpxr/+Nn6bU2EFGuC94GjxqhM+Nmij2Vcq98yoHrU8uNFQ==", 3249 + "license": "MIT" 3213 3250 }, 3214 3251 "node_modules/eventemitter3": { 3215 3252 "version": "5.0.1", ··· 5304 5341 }, 5305 5342 "funding": { 5306 5343 "url": "https://github.com/inikulin/parse5?sponsor=1" 5344 + } 5345 + }, 5346 + "node_modules/partysocket": { 5347 + "version": "1.1.5", 5348 + "resolved": "https://registry.npmjs.org/partysocket/-/partysocket-1.1.5.tgz", 5349 + "integrity": "sha512-8uw9foq9bij4sKLCtTSHvyqMrMTQ5FJjrHc7BjoM2s95Vu7xYCN63ABpI7OZHC7ZMP5xaom/A+SsoFPXmTV6ZQ==", 5350 + "license": "MIT", 5351 + "dependencies": { 5352 + "event-target-polyfill": "^0.0.4" 5307 5353 } 5308 5354 }, 5309 5355 "node_modules/path-browserify": {
+1
package.json
··· 12 12 }, 13 13 "dependencies": { 14 14 "@astrojs/check": "^0.9.4", 15 + "@atcute/jetstream": "^1.0.2", 15 16 "@atproto/api": "^0.16.2", 16 17 "@atproto/xrpc": "^0.7.1", 17 18 "@nulfrost/leaflet-loader-astro": "^1.1.0",
+4 -4
src/components/content/ContentFeed.astro
··· 212 212 213 213 try { 214 214 // Use shared jetstream instead of creating a new connection 215 - const { startSharedStream, subscribeToPosts } = await import('../../lib/atproto/shared-jetstream'); 215 + const { startSharedStream, subscribeToPosts } = await import('../../lib/atproto/jetstream-client'); 216 216 217 217 // Start the shared stream 218 218 await startSharedStream(); 219 219 220 220 // Subscribe to new posts 221 - const unsubscribe = subscribeToPosts((record) => { 222 - if (record.operation === 'create') { 223 - const el = buildPostEl(record.value, record.did); 221 + const unsubscribe = subscribeToPosts((event) => { 222 + if (event.commit.operation === 'create') { 223 + const el = buildPostEl(event.commit.record, event.did); 224 224 // @ts-ignore 225 225 container.insertBefore(el, container.firstChild); 226 226 const posts = container.children;
+4 -4
src/components/content/StatusUpdate.astro
··· 43 43 </div> 44 44 45 45 <script> 46 - import { startSharedStream, subscribeToStatusUpdates } from '../../lib/atproto/shared-jetstream'; 46 + import { startSharedStream, subscribeToStatusUpdates } from '../../lib/atproto/jetstream-client'; 47 47 48 48 // Start the shared stream 49 49 startSharedStream(); 50 50 51 51 // Subscribe to status updates 52 - const unsubscribe = subscribeToStatusUpdates((record) => { 53 - if (record.operation === 'create') { 54 - updateStatusDisplay(record.value); 52 + const unsubscribe = subscribeToStatusUpdates((event) => { 53 + if (event.commit.operation === 'create') { 54 + updateStatusDisplay(event.commit.record); 55 55 } 56 56 }); 57 57
+187 -168
src/lib/atproto/jetstream-client.ts
··· 1 - // Jetstream-based repository streaming with DID filtering (based on atptools) 1 + // Complete Jetstream implementation using documented @atcute/jetstream approach 2 + import { JetstreamSubscription, type CommitEvent } from '@atcute/jetstream'; 2 3 import { loadConfig } from '../config/site'; 3 4 4 - export interface JetstreamRecord { 5 - uri: string; 6 - cid: string; 7 - value: any; 8 - indexedAt: string; 9 - collection: string; 10 - $type: string; 11 - service: string; 12 - did: string; 13 - time_us: number; 14 - operation: 'create' | 'update' | 'delete'; 15 - } 16 - 17 5 export interface JetstreamConfig { 18 6 handle: string; 19 7 did?: string; ··· 24 12 } 25 13 26 14 export class JetstreamClient { 27 - private ws: WebSocket | null = null; 15 + private subscription: JetstreamSubscription | null = null; 28 16 private config: JetstreamConfig; 29 - private targetDid: string | null = null; 30 17 private isStreaming = false; 31 18 private listeners: { 32 - onRecord?: (record: JetstreamRecord) => void; 19 + onRecord?: (event: CommitEvent) => void; 33 20 onError?: (error: Error) => void; 34 21 onConnect?: () => void; 35 22 onDisconnect?: () => void; ··· 40 27 this.config = { 41 28 handle: config?.handle || siteConfig.atproto.handle, 42 29 did: config?.did || siteConfig.atproto.did, 43 - endpoint: config?.endpoint || 'wss://jetstream1.us-east.bsky.network/subscribe', 30 + endpoint: config?.endpoint || 'wss://jetstream2.us-east.bsky.network', 44 31 wantedCollections: config?.wantedCollections || [], 45 32 wantedDids: config?.wantedDids || [], 46 33 cursor: config?.cursor, 47 34 }; 48 - this.targetDid = this.config.did || null; 49 35 50 - console.log('🔧 JetstreamClient initialized with handle:', this.config.handle); 51 - console.log('🎯 Target DID for filtering:', this.targetDid); 52 - console.log('🌐 Endpoint:', this.config.endpoint); 36 + console.log('🔧 JetstreamClient initialized'); 53 37 } 54 38 55 - // Start streaming all repository activity 56 39 async startStreaming(): Promise<void> { 57 40 if (this.isStreaming) { 58 - console.log('⚠️ Already streaming repository'); 41 + console.log('⚠️ Already streaming'); 59 42 return; 60 43 } 61 44 62 - console.log('🚀 Starting jetstream repository streaming...'); 45 + console.log('🚀 Starting jetstream streaming...'); 63 46 this.isStreaming = true; 64 47 65 48 try { 66 - // Resolve handle to DID if needed 67 - if (!this.targetDid) { 68 - this.targetDid = await this.resolveHandle(this.config.handle); 69 - if (!this.targetDid) { 70 - throw new Error(`Could not resolve handle: ${this.config.handle}`); 71 - } 72 - console.log('✅ Resolved DID:', this.targetDid); 49 + // Add our DID to wanted DIDs if specified 50 + const wantedDids = [...(this.config.wantedDids || [])]; 51 + if (this.config.did && !wantedDids.includes(this.config.did)) { 52 + wantedDids.push(this.config.did); 73 53 } 74 54 75 - // Add target DID to wanted DIDs 76 - if (this.targetDid && !this.config.wantedDids!.includes(this.targetDid)) { 77 - this.config.wantedDids!.push(this.targetDid); 78 - } 55 + this.subscription = new JetstreamSubscription({ 56 + url: this.config.endpoint!, 57 + wantedCollections: this.config.wantedCollections, 58 + wantedDids: wantedDids as any, 59 + cursor: this.config.cursor, 60 + onConnectionOpen: () => { 61 + console.log('✅ Connected to jetstream'); 62 + this.listeners.onConnect?.(); 63 + }, 64 + onConnectionClose: () => { 65 + console.log('🔌 Disconnected from jetstream'); 66 + this.isStreaming = false; 67 + this.listeners.onDisconnect?.(); 68 + }, 69 + onConnectionError: (error) => { 70 + console.error('❌ Jetstream connection error:', error); 71 + this.listeners.onError?.(new Error('Connection error')); 72 + }, 73 + }); 79 74 80 - // Start WebSocket connection 81 - this.connect(); 75 + // Process events using async iteration as documented 76 + this.processEvents(); 82 77 83 78 } catch (error) { 84 79 this.isStreaming = false; ··· 86 81 } 87 82 } 88 83 89 - // Stop streaming 90 - stopStreaming(): void { 91 - if (this.ws) { 92 - this.ws.close(); 93 - this.ws = null; 94 - } 95 - this.isStreaming = false; 96 - console.log('🛑 Stopped jetstream streaming'); 97 - this.listeners.onDisconnect?.(); 98 - } 84 + private async processEvents(): Promise<void> { 85 + if (!this.subscription) return; 99 86 100 - // Connect to jetstream WebSocket 101 - private connect(): void { 102 87 try { 103 - const url = new URL(this.config.endpoint!); 104 - 105 - // Add query parameters for filtering (using atptools' parameter names) 106 - this.config.wantedCollections!.forEach((collection) => { 107 - url.searchParams.append('wantedCollections', collection); 108 - }); 109 - this.config.wantedDids!.forEach((did) => { 110 - url.searchParams.append('wantedDids', did); 111 - }); 112 - if (this.config.cursor) { 113 - url.searchParams.set('cursor', this.config.cursor.toString()); 88 + // Use the documented async iteration approach 89 + for await (const event of this.subscription) { 90 + if (event.kind === 'commit') { 91 + console.log('📝 New commit:', { 92 + collection: event.commit.collection, 93 + operation: event.commit.operation, 94 + did: event.did, 95 + }); 96 + 97 + this.listeners.onRecord?.(event); 98 + } 114 99 } 115 - 116 - console.log('🔌 Connecting to jetstream:', url.toString()); 117 - 118 - this.ws = new WebSocket(url.toString()); 119 - 120 - this.ws.onopen = () => { 121 - console.log('✅ Connected to jetstream'); 122 - this.listeners.onConnect?.(); 123 - }; 124 - 125 - this.ws.onmessage = (event) => { 126 - try { 127 - const data = JSON.parse(event.data); 128 - this.handleMessage(data); 129 - } catch (error) { 130 - console.error('Error parsing jetstream message:', error); 131 - } 132 - }; 133 - 134 - this.ws.onerror = (error) => { 135 - console.error('❌ Jetstream WebSocket error:', error); 136 - this.listeners.onError?.(new Error('WebSocket error')); 137 - }; 138 - 139 - this.ws.onclose = () => { 140 - console.log('🔌 Disconnected from jetstream'); 141 - this.isStreaming = false; 142 - this.listeners.onDisconnect?.(); 143 - }; 144 - 145 100 } catch (error) { 146 - console.error('Error connecting to jetstream:', error); 101 + console.error('Error processing jetstream events:', error); 147 102 this.listeners.onError?.(error as Error); 103 + } finally { 104 + this.isStreaming = false; 105 + this.listeners.onDisconnect?.(); 148 106 } 149 107 } 150 108 151 - // Handle incoming jetstream messages 152 - private handleMessage(data: any): void { 153 - try { 154 - // Handle different message types based on atptools' format 155 - if (data.kind === 'commit') { 156 - this.handleCommit(data); 157 - } else if (data.kind === 'account') { 158 - console.log('Account event:', data); 159 - } else if (data.kind === 'identity') { 160 - console.log('Identity event:', data); 161 - } else { 162 - console.log('Unknown message type:', data); 163 - } 164 - } catch (error) { 165 - console.error('Error handling jetstream message:', error); 166 - } 167 - } 168 - 169 - // Handle commit events (record changes) 170 - private handleCommit(data: any): void { 171 - try { 172 - const commit = data.commit; 173 - const event = data; 174 - 175 - // Filter by DID if specified 176 - if (this.targetDid && event.did !== this.targetDid) { 177 - return; 178 - } 179 - 180 - const jetstreamRecord: JetstreamRecord = { 181 - uri: `at://${event.did}/${commit.collection}/${commit.rkey}`, 182 - cid: commit.cid || '', 183 - value: commit.record || {}, 184 - indexedAt: new Date(event.time_us / 1000).toISOString(), 185 - collection: commit.collection, 186 - $type: (commit.record?.$type as string) || 'unknown', 187 - service: this.inferService((commit.record?.$type as string) || '', commit.collection), 188 - did: event.did, 189 - time_us: event.time_us, 190 - operation: commit.operation, 191 - }; 192 - 193 - console.log('📝 New record from jetstream:', { 194 - collection: jetstreamRecord.collection, 195 - $type: jetstreamRecord.$type, 196 - operation: jetstreamRecord.operation, 197 - uri: jetstreamRecord.uri, 198 - service: jetstreamRecord.service 199 - }); 200 - 201 - this.listeners.onRecord?.(jetstreamRecord); 202 - } catch (error) { 203 - console.error('Error handling commit:', error); 204 - } 205 - } 206 - 207 - // Infer service from record type and collection 208 - private inferService($type: string, collection: string): string { 209 - if (collection.startsWith('grain.social')) return 'grain.social'; 210 - if (collection.startsWith('app.bsky')) return 'bsky.app'; 211 - if ($type.includes('grain')) return 'grain.social'; 212 - return 'unknown'; 213 - } 214 - 215 - // Resolve handle to DID 216 - private async resolveHandle(handle: string): Promise<string | null> { 217 - try { 218 - // For now, use the configured DID 219 - // In a real implementation, you'd call the ATProto API 220 - return this.config.did || null; 221 - } catch (error) { 222 - console.error('Error resolving handle:', error); 223 - return null; 224 - } 109 + stopStreaming(): void { 110 + this.subscription = null; 111 + this.isStreaming = false; 112 + console.log('🛑 Stopped jetstream streaming'); 113 + this.listeners.onDisconnect?.(); 225 114 } 226 115 227 116 // Event listeners 228 - onRecord(callback: (record: JetstreamRecord) => void): void { 117 + onRecord(callback: (event: CommitEvent) => void): void { 229 118 this.listeners.onRecord = callback; 230 119 } 231 120 ··· 241 130 this.listeners.onDisconnect = callback; 242 131 } 243 132 244 - // Get streaming status 245 133 getStatus(): 'streaming' | 'stopped' { 246 134 return this.isStreaming ? 'streaming' : 'stopped'; 247 135 } 136 + } 137 + 138 + // Shared Jetstream functionality 139 + let sharedJetstream: JetstreamClient | null = null; 140 + let connectionCount = 0; 141 + const listeners: Map<string, Set<(event: CommitEvent) => void>> = new Map(); 142 + 143 + export function getSharedJetstream(): JetstreamClient { 144 + if (!sharedJetstream) { 145 + // Create a shared client with common collections 146 + sharedJetstream = new JetstreamClient({ 147 + wantedCollections: [ 148 + 'app.bsky.feed.post', 149 + 'a.status.update', 150 + 'social.grain.gallery', 151 + 'social.grain.gallery.item', 152 + 'social.grain.photo', 153 + 'com.whtwnd.blog.entry' 154 + ] 155 + }); 156 + 157 + // Set up the main record handler that distributes to filtered listeners 158 + sharedJetstream.onRecord((event) => { 159 + // Distribute to all listeners that match the filter 160 + listeners.forEach((listenerSet, filterKey) => { 161 + if (matchesFilter(event, filterKey)) { 162 + listenerSet.forEach(callback => callback(event)); 163 + } 164 + }); 165 + }); 166 + } 167 + return sharedJetstream; 168 + } 169 + 170 + // Start the shared stream (call once when first component needs it) 171 + export async function startSharedStream(): Promise<void> { 172 + const jetstream = getSharedJetstream(); 173 + if (connectionCount === 0) { 174 + await jetstream.startStreaming(); 175 + } 176 + connectionCount++; 177 + } 178 + 179 + // Stop the shared stream (call when last component is done) 180 + export function stopSharedStream(): void { 181 + connectionCount--; 182 + if (connectionCount <= 0 && sharedJetstream) { 183 + sharedJetstream.stopStreaming(); 184 + connectionCount = 0; 185 + } 186 + } 187 + 188 + // Subscribe to filtered records 189 + export function subscribeToRecords( 190 + filter: string | ((event: CommitEvent) => boolean), 191 + callback: (event: CommitEvent) => void 192 + ): () => void { 193 + const filterKey = typeof filter === 'string' ? filter : filter.toString(); 194 + 195 + if (!listeners.has(filterKey)) { 196 + listeners.set(filterKey, new Set()); 197 + } 198 + 199 + const listenerSet = listeners.get(filterKey)!; 200 + listenerSet.add(callback); 201 + 202 + // Return unsubscribe function 203 + return () => { 204 + const set = listeners.get(filterKey); 205 + if (set) { 206 + set.delete(callback); 207 + if (set.size === 0) { 208 + listeners.delete(filterKey); 209 + } 210 + } 211 + }; 212 + } 213 + 214 + // Helper to check if a record matches a filter 215 + function matchesFilter(event: CommitEvent, filterKey: string): boolean { 216 + // Handle delete operations (no record property) 217 + if (event.commit.operation === 'delete') { 218 + // For delete operations, only support collection and operation matching 219 + if (filterKey.startsWith('collection:')) { 220 + const expectedCollection = filterKey.substring(11); 221 + return event.commit.collection === expectedCollection; 222 + } 223 + if (filterKey.startsWith('operation:')) { 224 + const expectedOperation = filterKey.substring(10); 225 + return event.commit.operation === expectedOperation; 226 + } 227 + return false; 228 + } 229 + 230 + // For create/update operations, we have record data 231 + const record = event.commit.record; 232 + const $type = record?.$type as string; 233 + 234 + // Support simple $type matching 235 + if (filterKey.startsWith('$type:')) { 236 + const expectedType = filterKey.substring(6); 237 + return $type === expectedType; 238 + } 239 + 240 + // Support collection matching 241 + if (filterKey.startsWith('collection:')) { 242 + const expectedCollection = filterKey.substring(11); 243 + return event.commit.collection === expectedCollection; 244 + } 245 + 246 + // Support operation matching 247 + if (filterKey.startsWith('operation:')) { 248 + const expectedOperation = filterKey.substring(10); 249 + return event.commit.operation === expectedOperation; 250 + } 251 + 252 + // Default to exact match 253 + return $type === filterKey; 254 + } 255 + 256 + // Convenience functions for common filters 257 + export function subscribeToStatusUpdates(callback: (event: CommitEvent) => void): () => void { 258 + return subscribeToRecords('$type:a.status.update', callback); 259 + } 260 + 261 + export function subscribeToPosts(callback: (event: CommitEvent) => void): () => void { 262 + return subscribeToRecords('$type:app.bsky.feed.post', callback); 263 + } 264 + 265 + export function subscribeToGalleryUpdates(callback: (event: CommitEvent) => void): () => void { 266 + return subscribeToRecords('collection:social.grain.gallery', callback); 248 267 }
-105
src/lib/atproto/shared-jetstream.ts
··· 1 - // Shared Jetstream instance for components to access and filter 2 - import { JetstreamClient, type JetstreamRecord } from './jetstream-client'; 3 - 4 - let sharedJetstream: JetstreamClient | null = null; 5 - let connectionCount = 0; 6 - const listeners: Map<string, Set<(record: JetstreamRecord) => void>> = new Map(); 7 - 8 - export function getSharedJetstream(): JetstreamClient { 9 - if (!sharedJetstream) { 10 - sharedJetstream = new JetstreamClient(); 11 - 12 - // Set up the main record handler that distributes to filtered listeners 13 - sharedJetstream.onRecord((record) => { 14 - // Distribute to all listeners that match the filter 15 - listeners.forEach((listenerSet, filterKey) => { 16 - if (matchesFilter(record, filterKey)) { 17 - listenerSet.forEach(callback => callback(record)); 18 - } 19 - }); 20 - }); 21 - } 22 - return sharedJetstream; 23 - } 24 - 25 - // Start the shared stream (call once when first component needs it) 26 - export async function startSharedStream(): Promise<void> { 27 - const jetstream = getSharedJetstream(); 28 - if (connectionCount === 0) { 29 - await jetstream.startStreaming(); 30 - } 31 - connectionCount++; 32 - } 33 - 34 - // Stop the shared stream (call when last component is done) 35 - export function stopSharedStream(): void { 36 - connectionCount--; 37 - if (connectionCount <= 0 && sharedJetstream) { 38 - sharedJetstream.stopStreaming(); 39 - connectionCount = 0; 40 - } 41 - } 42 - 43 - // Subscribe to filtered records 44 - export function subscribeToRecords( 45 - filter: string | ((record: JetstreamRecord) => boolean), 46 - callback: (record: JetstreamRecord) => void 47 - ): () => void { 48 - const filterKey = typeof filter === 'string' ? filter : filter.toString(); 49 - 50 - if (!listeners.has(filterKey)) { 51 - listeners.set(filterKey, new Set()); 52 - } 53 - 54 - const listenerSet = listeners.get(filterKey)!; 55 - listenerSet.add(callback); 56 - 57 - // Return unsubscribe function 58 - return () => { 59 - const set = listeners.get(filterKey); 60 - if (set) { 61 - set.delete(callback); 62 - if (set.size === 0) { 63 - listeners.delete(filterKey); 64 - } 65 - } 66 - }; 67 - } 68 - 69 - // Helper to check if a record matches a filter 70 - function matchesFilter(record: JetstreamRecord, filterKey: string): boolean { 71 - // If filter is a function string, we can't easily evaluate it 72 - // For now, support simple $type matching 73 - if (filterKey.startsWith('$type:')) { 74 - const expectedType = filterKey.substring(6); 75 - return record.$type === expectedType; 76 - } 77 - 78 - // Support collection matching 79 - if (filterKey.startsWith('collection:')) { 80 - const expectedCollection = filterKey.substring(11); 81 - return record.collection === expectedCollection; 82 - } 83 - 84 - // Support operation matching 85 - if (filterKey.startsWith('operation:')) { 86 - const expectedOperation = filterKey.substring(10); 87 - return record.operation === expectedOperation; 88 - } 89 - 90 - // Default to exact match 91 - return filterKey === record.$type; 92 - } 93 - 94 - // Convenience functions for common filters 95 - export function subscribeToStatusUpdates(callback: (record: JetstreamRecord) => void): () => void { 96 - return subscribeToRecords('$type:a.status.update', callback); 97 - } 98 - 99 - export function subscribeToPosts(callback: (record: JetstreamRecord) => void): () => void { 100 - return subscribeToRecords('$type:app.bsky.feed.post', callback); 101 - } 102 - 103 - export function subscribeToGalleryUpdates(callback: (record: JetstreamRecord) => void): () => void { 104 - return subscribeToRecords('collection:social.grain.gallery', callback); 105 - }
-271
src/lib/services/content-system.ts
··· 1 - import { AtprotoBrowser } from '../atproto/atproto-browser'; 2 - import { JetstreamClient } from '../atproto/jetstream-client'; 3 - import { GrainGalleryService } from './grain-gallery-service'; 4 - import { loadConfig } from '../config/site'; 5 - import type { AtprotoRecord } from '../atproto/atproto-browser'; 6 - 7 - export interface ContentItem { 8 - uri: string; 9 - cid: string; 10 - $type: string; 11 - collection: string; 12 - createdAt: string; 13 - indexedAt: string; 14 - value: any; 15 - service: string; 16 - operation?: 'create' | 'update' | 'delete'; 17 - } 18 - 19 - export interface ContentFeed { 20 - items: ContentItem[]; 21 - lastUpdated: string; 22 - totalItems: number; 23 - collections: string[]; 24 - } 25 - 26 - export interface ContentSystemConfig { 27 - enableStreaming?: boolean; 28 - buildTimeOnly?: boolean; 29 - collections?: string[]; 30 - maxItems?: number; 31 - } 32 - 33 - export class ContentSystem { 34 - private browser: AtprotoBrowser; 35 - private jetstream: JetstreamClient; 36 - private grainGalleryService: GrainGalleryService; 37 - private config: any; 38 - private contentFeed: ContentFeed; 39 - private isStreaming = false; 40 - 41 - constructor() { 42 - this.config = loadConfig(); 43 - this.browser = new AtprotoBrowser(); 44 - this.jetstream = new JetstreamClient(); 45 - this.grainGalleryService = new GrainGalleryService(); 46 - 47 - this.contentFeed = { 48 - items: [], 49 - lastUpdated: new Date().toISOString(), 50 - totalItems: 0, 51 - collections: [] 52 - }; 53 - } 54 - 55 - // Initialize content system (build-time) 56 - async initialize(identifier: string, options: ContentSystemConfig = {}): Promise<ContentFeed> { 57 - console.log('🚀 Initializing content system for:', identifier); 58 - 59 - try { 60 - // Get repository info 61 - const repoInfo = await this.browser.getRepoInfo(identifier); 62 - if (!repoInfo) { 63 - throw new Error(`Could not get repository info for: ${identifier}`); 64 - } 65 - 66 - console.log('📊 Repository info:', { 67 - handle: repoInfo.handle, 68 - did: repoInfo.did, 69 - collections: repoInfo.collections.length, 70 - recordCount: repoInfo.recordCount 71 - }); 72 - 73 - // Gather all content from collections 74 - const allItems: ContentItem[] = []; 75 - const collections = options.collections || repoInfo.collections; 76 - 77 - for (const collection of collections) { 78 - console.log(`📦 Fetching from collection: ${collection}`); 79 - const records = await this.browser.getCollectionRecords(identifier, collection, options.maxItems || 100); 80 - 81 - if (records && records.records) { 82 - for (const record of records.records) { 83 - const contentItem: ContentItem = { 84 - uri: record.uri, 85 - cid: record.cid, 86 - $type: record.$type, 87 - collection: record.collection, 88 - createdAt: record.value?.createdAt || record.indexedAt, 89 - indexedAt: record.indexedAt, 90 - value: record.value, 91 - service: this.inferService(record.$type, record.collection), 92 - operation: 'create' // Build-time items are existing 93 - }; 94 - 95 - allItems.push(contentItem); 96 - } 97 - } 98 - } 99 - 100 - // Sort by creation date (newest first) 101 - allItems.sort((a, b) => { 102 - const dateA = new Date(a.createdAt); 103 - const dateB = new Date(b.createdAt); 104 - return dateB.getTime() - dateA.getTime(); 105 - }); 106 - 107 - this.contentFeed = { 108 - items: allItems, 109 - lastUpdated: new Date().toISOString(), 110 - totalItems: allItems.length, 111 - collections: collections 112 - }; 113 - 114 - console.log(`✅ Content system initialized with ${allItems.length} items`); 115 - 116 - // Start streaming if enabled 117 - if (!options.buildTimeOnly && options.enableStreaming !== false) { 118 - await this.startStreaming(identifier); 119 - } 120 - 121 - return this.contentFeed; 122 - } catch (error) { 123 - console.error('Error initializing content system:', error); 124 - throw error; 125 - } 126 - } 127 - 128 - // Start real-time streaming 129 - async startStreaming(identifier: string): Promise<void> { 130 - if (this.isStreaming) { 131 - console.log('⚠️ Already streaming'); 132 - return; 133 - } 134 - 135 - console.log('🌊 Starting real-time content streaming...'); 136 - this.isStreaming = true; 137 - 138 - // Set up jetstream event handlers 139 - this.jetstream.onRecord((record) => { 140 - this.handleNewContent(record); 141 - }); 142 - 143 - this.jetstream.onError((error) => { 144 - console.error('❌ Jetstream error:', error); 145 - }); 146 - 147 - this.jetstream.onConnect(() => { 148 - console.log('✅ Connected to real-time stream'); 149 - }); 150 - 151 - this.jetstream.onDisconnect(() => { 152 - console.log('🔌 Disconnected from real-time stream'); 153 - this.isStreaming = false; 154 - }); 155 - 156 - // Start streaming 157 - await this.jetstream.startStreaming(); 158 - } 159 - 160 - // Handle new content from streaming 161 - private handleNewContent(jetstreamRecord: any): void { 162 - const contentItem: ContentItem = { 163 - uri: jetstreamRecord.uri, 164 - cid: jetstreamRecord.cid, 165 - $type: jetstreamRecord.$type, 166 - collection: jetstreamRecord.collection, 167 - createdAt: jetstreamRecord.value?.createdAt || jetstreamRecord.indexedAt, 168 - indexedAt: jetstreamRecord.indexedAt, 169 - value: jetstreamRecord.value, 170 - service: jetstreamRecord.service, 171 - operation: jetstreamRecord.operation 172 - }; 173 - 174 - // Add to beginning of feed (newest first) 175 - this.contentFeed.items.unshift(contentItem); 176 - this.contentFeed.totalItems++; 177 - this.contentFeed.lastUpdated = new Date().toISOString(); 178 - 179 - console.log('📝 New content added:', { 180 - $type: contentItem.$type, 181 - collection: contentItem.collection, 182 - operation: contentItem.operation 183 - }); 184 - 185 - // Emit event for UI updates 186 - this.emitContentUpdate(contentItem); 187 - } 188 - 189 - // Get current content feed 190 - getContentFeed(): ContentFeed { 191 - return this.contentFeed; 192 - } 193 - 194 - // Get content by type 195 - getContentByType($type: string): ContentItem[] { 196 - return this.contentFeed.items.filter(item => item.$type === $type); 197 - } 198 - 199 - // Get content by collection 200 - getContentByCollection(collection: string): ContentItem[] { 201 - return this.contentFeed.items.filter(item => item.collection === collection); 202 - } 203 - 204 - // Get galleries (using specialized service) 205 - async getGalleries(identifier: string): Promise<any[]> { 206 - return await this.grainGalleryService.getGalleries(identifier); 207 - } 208 - 209 - // Filter content by function 210 - filterContent(filterFn: (item: ContentItem) => boolean): ContentItem[] { 211 - return this.contentFeed.items.filter(filterFn); 212 - } 213 - 214 - // Search content 215 - searchContent(query: string): ContentItem[] { 216 - const lowerQuery = query.toLowerCase(); 217 - return this.contentFeed.items.filter(item => { 218 - const text = JSON.stringify(item.value).toLowerCase(); 219 - return text.includes(lowerQuery); 220 - }); 221 - } 222 - 223 - // Stop streaming 224 - stopStreaming(): void { 225 - if (this.isStreaming) { 226 - this.jetstream.stopStreaming(); 227 - this.isStreaming = false; 228 - } 229 - } 230 - 231 - // Infer service from record type and collection 232 - private inferService($type: string, collection: string): string { 233 - if (collection.startsWith('grain.social') || $type.includes('grain')) return 'grain.social'; 234 - if (collection.startsWith('app.bsky')) return 'bsky.app'; 235 - if (collection.startsWith('sh.tangled')) return 'sh.tangled'; 236 - return 'unknown'; 237 - } 238 - 239 - // Event system for UI updates 240 - private listeners: { 241 - onContentUpdate?: (item: ContentItem) => void; 242 - onContentAdd?: (item: ContentItem) => void; 243 - onContentRemove?: (item: ContentItem) => void; 244 - } = {}; 245 - 246 - onContentUpdate(callback: (item: ContentItem) => void): void { 247 - this.listeners.onContentUpdate = callback; 248 - } 249 - 250 - onContentAdd(callback: (item: ContentItem) => void): void { 251 - this.listeners.onContentAdd = callback; 252 - } 253 - 254 - onContentRemove(callback: (item: ContentItem) => void): void { 255 - this.listeners.onContentRemove = callback; 256 - } 257 - 258 - private emitContentUpdate(item: ContentItem): void { 259 - this.listeners.onContentUpdate?.(item); 260 - if (item.operation === 'create') { 261 - this.listeners.onContentAdd?.(item); 262 - } else if (item.operation === 'delete') { 263 - this.listeners.onContentRemove?.(item); 264 - } 265 - } 266 - 267 - // Get streaming status 268 - getStreamingStatus(): 'streaming' | 'stopped' { 269 - return this.isStreaming ? 'streaming' : 'stopped'; 270 - } 271 - }
+1 -1
src/pages/leaflets/[leaflet].astro
··· 41 41 <div class="text-sm text-gray-500 dark:text-gray-400 mb-6"> 42 42 {document.data.publishedAt && ( 43 43 <span> 44 - Published: {new Date(document.data.publishedAt).toLocaleDateString('en-US', { 44 + {new Date(document.data.publishedAt).toLocaleDateString('en-US', { 45 45 year: 'numeric', 46 46 month: 'long', 47 47 day: 'numeric',