Barazo AppView backend barazo.forum
at main 314 lines 8.4 kB view raw
import { eq, and, inArray } from 'drizzle-orm'
import type { Database } from '../db/index.js'
import type { Cache } from '../cache/index.js'
import type { Logger } from '../lib/logger.js'
import { ozoneLabels } from '../db/schema/ozone-labels.js'
import { decodeEventStreamFrame } from '../lib/cbor-frames.js'

const CACHE_TTL = 3600 // 1 hour
const CACHE_PREFIX = 'ozone:labels:'
const INITIAL_RECONNECT_MS = 1000
const MAX_RECONNECT_MS = 60000
const SPAM_LABELS = new Set(['spam', '!hide'])

/** Body of a `#labels` frame as consumed by this service. */
interface LabelEvent {
  // NOTE(review): `seq` is never read and no cursor is sent when connecting,
  // so labels emitted while disconnected are presumably not replayed -- confirm.
  seq: number
  labels: Label[]
}

/** A single label as received from the labeler stream. */
interface Label {
  src: string
  uri: string
  val: string
  neg?: boolean
  cts: string
  exp?: string
}

/** Label shape stored in the cache and returned to callers. */
interface CachedLabel {
  val: string
  src: string
  neg: boolean
}

/**
 * Runtime check that a decoded value has the string fields `processLabel`
 * reads. Frames come from the network, so the static `Label` type alone
 * cannot be trusted.
 */
function isLabel(value: unknown): value is Label {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return (
    typeof candidate.src === 'string' &&
    typeof candidate.uri === 'string' &&
    typeof candidate.val === 'string' &&
    typeof candidate.cts === 'string' &&
    (candidate.exp == null || typeof candidate.exp === 'string')
  )
}

/** True when any label value is in SPAM_LABELS. */
function containsSpamLabel(labels: CachedLabel[]): boolean {
  return labels.some((l) => SPAM_LABELS.has(l.val))
}

/**
 * Subscribes to a labeler's `com.atproto.label.subscribeLabels` stream,
 * persists received labels to the `ozoneLabels` table, and answers label
 * lookups through a cache in front of that table.
 */
export class OzoneService {
  private ws: WebSocket | null = null
  private reconnectMs = INITIAL_RECONNECT_MS
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null
  private stopping = false
  /** Tail of the frame-processing chain; keeps frames strictly in arrival order. */
  private messageQueue: Promise<void> = Promise.resolve()

  constructor(
    private db: Database,
    private cache: Cache,
    private logger: Logger,
    private labelerUrl: string
  ) {}

  /**
   * Begin (or resume) the subscription. Calling it while a socket is open or
   * a reconnect is already pending is a no-op, so repeated calls cannot
   * create duplicate connections.
   */
  start(): void {
    this.stopping = false
    if (this.ws !== null || this.reconnectTimer !== null) return
    this.connect()
  }

  /** Stop the subscription: cancel any pending reconnect and close the socket. */
  stop(): void {
    this.stopping = true
    if (this.reconnectTimer !== null) {
      // Without this the timer would keep the event loop alive until it fires.
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.ws) {
      this.ws.close()
      this.ws = null
    }
  }

  /** Open the WebSocket and wire up its event handlers. */
  private connect(): void {
    if (this.stopping) return

    // Both http: and https: base URLs are rewritten to wss:.
    const wsUrl = this.labelerUrl.replace(/^https?:/, 'wss:').replace(/\/$/, '')
    const url = `${wsUrl}/xrpc/com.atproto.label.subscribeLabels`

    this.logger.info({ url }, 'Connecting to Ozone labeler')

    let socket: WebSocket
    try {
      socket = new WebSocket(url)
    } catch (err) {
      this.logger.warn({ err }, 'Failed to create Ozone WebSocket')
      this.scheduleReconnect()
      return
    }
    this.ws = socket

    socket.addEventListener('open', () => {
      this.logger.info('Connected to Ozone labeler')
      this.reconnectMs = INITIAL_RECONNECT_MS
    })

    socket.addEventListener('message', (event) => {
      // Ignore frames from a socket that stop() or a newer connect() replaced.
      if (this.ws !== socket) return
      this.enqueueMessage(event.data)
    })

    socket.addEventListener('close', () => {
      this.logger.info('Ozone labeler connection closed')
      // A late close from a superseded socket must not trigger another reconnect.
      if (this.ws !== socket) return
      this.ws = null
      this.scheduleReconnect()
    })

    socket.addEventListener('error', (event) => {
      this.logger.warn({ event }, 'Ozone labeler WebSocket error')
    })
  }

  /**
   * Schedule a reconnect after the current backoff delay, then double the
   * delay (capped at MAX_RECONNECT_MS). At most one reconnect is pending.
   */
  private scheduleReconnect(): void {
    if (this.stopping || this.reconnectTimer !== null) return

    this.logger.info({ reconnectMs: this.reconnectMs }, 'Scheduling Ozone labeler reconnect')

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.connect()
    }, this.reconnectMs)

    this.reconnectMs = Math.min(this.reconnectMs * 2, MAX_RECONNECT_MS)
  }

  /**
   * Append a frame to the processing chain. Decoding and DB writes are
   * asynchronous, so handling frames concurrently could apply a label and
   * its later negation out of order; chaining keeps arrival order.
   */
  private enqueueMessage(data: unknown): void {
    this.messageQueue = this.messageQueue
      .then(() => this.handleMessage(data))
      .catch(() => {
        // handleMessage logs its own failures; a rejection here must never
        // poison the chain and stall every later frame.
      })
  }

  /**
   * Decode one event-stream frame and apply the labels it carries.
   * Failures are logged and never thrown.
   */
  private async handleMessage(data: unknown): Promise<void> {
    try {
      const bytes = await this.toBinaryData(data)
      const { header, body } = decodeEventStreamFrame(bytes)

      // Error frame (op: -1) -- log and skip
      if (header.op === -1) {
        const error = typeof body.error === 'string' ? body.error : 'unknown'
        const message = typeof body.message === 'string' ? body.message : undefined
        this.logger.warn({ error, message }, 'Ozone labeler error frame received')
        return
      }

      // Only process #labels messages; skip other types silently
      if (header.t !== '#labels') return

      const event = body as unknown as LabelEvent
      if (!Array.isArray(event.labels)) return

      const candidates: unknown[] = event.labels
      for (const candidate of candidates) {
        if (!isLabel(candidate)) {
          this.logger.warn({ label: candidate }, 'Skipping malformed Ozone label')
          continue
        }
        // Isolate failures so one bad label does not drop the rest of the frame.
        try {
          await this.processLabel(candidate)
        } catch (err) {
          this.logger.warn(
            { err, uri: candidate.uri, val: candidate.val },
            'Failed to process Ozone label'
          )
        }
      }
    } catch (err) {
      this.logger.warn({ err }, 'Failed to process Ozone label event')
    }
  }

  /**
   * Convert incoming WebSocket data to a Uint8Array for CBOR decoding.
   *
   * The AT Protocol event stream sends binary CBOR-encoded frames.
   * Node.js native WebSocket may deliver data as Blob, ArrayBuffer,
   * Buffer, or Uint8Array depending on the binaryType setting.
   *
   * @throws Error when the data is none of the supported binary types.
   */
  private async toBinaryData(data: unknown): Promise<Uint8Array> {
    if (data instanceof Uint8Array) return data
    if (data instanceof ArrayBuffer) return new Uint8Array(data)
    if (data instanceof Blob) return new Uint8Array(await data.arrayBuffer())
    throw new Error(`Unexpected WebSocket data type: ${typeof data}`)
  }

  /**
   * Apply one label: a negation deletes the matching (src, uri, val) row,
   * anything else is upserted. The cache entry for the URI is dropped
   * afterwards on a best-effort basis.
   */
  private async processLabel(label: Label): Promise<void> {
    if (label.neg) {
      // Negation: remove the prior label
      await this.db
        .delete(ozoneLabels)
        .where(
          and(
            eq(ozoneLabels.src, label.src),
            eq(ozoneLabels.uri, label.uri),
            eq(ozoneLabels.val, label.val)
          )
        )
    } else {
      const cts = new Date(label.cts)
      // Must be null, not undefined: drizzle skips undefined keys in `set`,
      // which would leave a stale expiry on a label re-issued without one.
      const exp = label.exp ? new Date(label.exp) : null

      // Upsert the label
      await this.db
        .insert(ozoneLabels)
        .values({
          src: label.src,
          uri: label.uri,
          val: label.val,
          neg: false,
          cts,
          exp,
        })
        .onConflictDoUpdate({
          target: [ozoneLabels.src, ozoneLabels.uri, ozoneLabels.val],
          set: {
            neg: false,
            cts,
            exp,
            indexedAt: new Date(),
          },
        })
    }

    // Invalidate cache for this URI
    try {
      await this.cache.del(`${CACHE_PREFIX}${label.uri}`)
    } catch {
      // Non-critical
    }
  }

  /**
   * Read the cached label list for a URI. Returns null on a miss, a cache
   * error, or an entry that does not parse to an array.
   */
  private async readCachedLabels(uri: string): Promise<CachedLabel[] | null> {
    try {
      const cached = await this.cache.get(`${CACHE_PREFIX}${uri}`)
      if (cached) {
        const parsed: unknown = JSON.parse(cached)
        if (Array.isArray(parsed)) return parsed as CachedLabel[]
      }
    } catch {
      // Fall through to DB
    }
    return null
  }

  /** Store the label list for a URI with the standard TTL; failures are ignored. */
  private async writeCachedLabels(uri: string, labels: CachedLabel[]): Promise<void> {
    try {
      await this.cache.set(`${CACHE_PREFIX}${uri}`, JSON.stringify(labels), 'EX', CACHE_TTL)
    } catch {
      // Non-critical
    }
  }

  /**
   * Get all active labels for a URI (DID or content URI).
   * Results are cached in Valkey for 1 hour.
   *
   * NOTE(review): rows are not filtered by `exp`, so expired labels are
   * presumably pruned elsewhere -- confirm.
   */
  async getLabels(uri: string): Promise<CachedLabel[]> {
    // Try cache first
    const cached = await this.readCachedLabels(uri)
    if (cached) return cached

    const rows = await this.db
      .select({
        val: ozoneLabels.val,
        src: ozoneLabels.src,
        neg: ozoneLabels.neg,
      })
      .from(ozoneLabels)
      .where(and(eq(ozoneLabels.uri, uri), eq(ozoneLabels.neg, false)))

    const labels: CachedLabel[] = rows.map((r) => ({
      val: r.val,
      src: r.src,
      neg: r.neg,
    }))

    await this.writeCachedLabels(uri, labels)

    return labels
  }

  /**
   * Check if a URI has a specific label value.
   */
  async hasLabel(uri: string, val: string): Promise<boolean> {
    const labels = await this.getLabels(uri)
    return labels.some((l) => l.val === val)
  }

  /**
   * Check if a DID or URI has any spam-related labels (spam, !hide).
   */
  async isSpamLabeled(didOrUri: string): Promise<boolean> {
    const labels = await this.getLabels(didOrUri)
    return containsSpamLabel(labels)
  }

  /**
   * Batch check which DIDs have spam-related labels.
   * Uses a single DB query for all cache misses instead of N+1 individual queries.
   * Cache reads and writes are issued concurrently.
   *
   * @returns Map with one entry per distinct input DID.
   */
  async batchIsSpamLabeled(dids: string[]): Promise<Map<string, boolean>> {
    const result = new Map<string, boolean>()
    if (dids.length === 0) return result

    // Duplicates collapse to one map entry anyway; look each DID up once.
    const uniqueDids = [...new Set(dids)]

    // Check cache first
    const lookups = await Promise.all(
      uniqueDids.map(async (did) => ({ did, labels: await this.readCachedLabels(did) }))
    )

    const uncached: string[] = []
    for (const { did, labels } of lookups) {
      if (labels) {
        result.set(did, containsSpamLabel(labels))
      } else {
        uncached.push(did)
      }
    }

    if (uncached.length === 0) return result

    // Batch DB query for all uncached DIDs
    const rows = await this.db
      .select({
        uri: ozoneLabels.uri,
        val: ozoneLabels.val,
        src: ozoneLabels.src,
        neg: ozoneLabels.neg,
      })
      .from(ozoneLabels)
      .where(and(inArray(ozoneLabels.uri, uncached), eq(ozoneLabels.neg, false)))

    // Group by URI
    const labelsByUri = new Map<string, CachedLabel[]>()
    for (const row of rows) {
      const labels = labelsByUri.get(row.uri) ?? []
      labels.push({ val: row.val, src: row.src, neg: row.neg })
      labelsByUri.set(row.uri, labels)
    }

    // Cache and build results
    const cacheWrites: Promise<void>[] = []
    for (const did of uncached) {
      const labels = labelsByUri.get(did) ?? []
      result.set(did, containsSpamLabel(labels))
      cacheWrites.push(this.writeCachedLabels(did, labels))
    }
    // writeCachedLabels never rejects, so this cannot fail the lookup.
    await Promise.all(cacheWrites)

    return result
  }
}