// Barazo AppView backend
// barazo.forum
1import { eq, and, inArray } from 'drizzle-orm'
2import type { Database } from '../db/index.js'
3import type { Cache } from '../cache/index.js'
4import type { Logger } from '../lib/logger.js'
5import { ozoneLabels } from '../db/schema/ozone-labels.js'
6import { decodeEventStreamFrame } from '../lib/cbor-frames.js'
7
/** Lifetime of a cached label list, in seconds (1 hour); passed to the cache with `EX`. */
const CACHE_TTL = 3600
/** Prefix of every cache key; the subject URI or DID is appended to it. */
const CACHE_PREFIX = 'ozone:labels:'
/** Delay in milliseconds before the first reconnect attempt. */
const INITIAL_RECONNECT_MS = 1000
/** Ceiling in milliseconds for the doubling reconnect delay. */
const MAX_RECONNECT_MS = 60000
/** Label values that mark a subject as spam. */
const SPAM_LABELS: ReadonlySet<string> = new Set(['spam', '!hide'])
13
/**
 * Body of a `#labels` frame received from the labeler stream.
 */
interface LabelEvent {
  /** Sequence number carried by the frame; not read by this file. */
  seq: number
  /** Labels delivered by the frame, processed in array order. */
  labels: Label[]
}
18
/**
 * A single label as delivered inside a `#labels` frame.
 *
 * The triple (`src`, `uri`, `val`) identifies a stored row: it is the
 * upsert conflict target and the key used when a negation deletes a label.
 */
interface Label {
  /** Source of the label (presumably the labeler's DID — confirm against the lexicon). */
  src: string
  /** Subject the label applies to; also the key used for cache invalidation. */
  uri: string
  /** Label value, e.g. `spam` or `!hide`. */
  val: string
  /** When truthy, the label negates (deletes) the stored label with the same key. */
  neg?: boolean
  /** Creation timestamp as a string; converted with `new Date(...)` before storage. */
  cts: string
  /** Optional expiry timestamp as a string; converted with `new Date(...)` when present. */
  exp?: string
}
27
/**
 * Reduced label shape that is serialized to JSON in the cache and returned
 * from label lookups.
 */
interface CachedLabel {
  /** Label value, e.g. `spam`. */
  val: string
  /** Source that issued the label. */
  src: string
  /** Negation flag as stored; lookups only select rows where it is `false`. */
  neg: boolean
}
33
/**
 * Subscribes to an Ozone labeler's `com.atproto.label.subscribeLabels`
 * stream, persists the received labels in the `ozoneLabels` table and
 * answers label lookups through a cache-backed read path.
 *
 * Lifecycle: `start()` opens the WebSocket and keeps it alive with an
 * exponential backoff (doubling from `INITIAL_RECONNECT_MS` up to
 * `MAX_RECONNECT_MS`); `stop()` closes it and cancels any pending reconnect.
 */
export class OzoneService {
  /** The socket currently owned by this service, or `null` when disconnected. */
  private ws: WebSocket | null = null
  /** Delay used for the next reconnect attempt. */
  private reconnectMs = INITIAL_RECONNECT_MS
  /** Handle of the pending reconnect timer, so `stop()` can cancel it. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null
  private stopping = false
  /** Tail of the frame-processing chain; keeps frames in arrival order. */
  private pending: Promise<void> = Promise.resolve()

  constructor(
    private db: Database,
    private cache: Cache,
    private logger: Logger,
    private labelerUrl: string
  ) {}

  /**
   * Start (or resume) the subscription. Calling it while a socket is open
   * or a reconnect is already scheduled is a no-op, so repeated calls never
   * create parallel connections.
   */
  start(): void {
    this.stopping = false
    if (this.ws !== null || this.reconnectTimer !== null) return
    this.connect()
  }

  /**
   * Stop the subscription: cancel a scheduled reconnect and close the
   * current socket. Safe to call when already stopped.
   */
  stop(): void {
    this.stopping = true

    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }

    if (this.ws) {
      // Detach before closing so the socket's late `close` event is
      // recognised as stale and cannot trigger a reconnect.
      const socket = this.ws
      this.ws = null
      socket.close()
    }
  }

  /**
   * Open a WebSocket to the labeler and wire up its event handlers.
   * Does nothing once `stop()` has been called.
   */
  private connect(): void {
    if (this.stopping) return

    // Both http: and https: base URLs are mapped to wss:, and a single
    // trailing slash is dropped before the XRPC path is appended.
    const wsUrl = this.labelerUrl.replace(/^https?:/, 'wss:').replace(/\/$/, '')
    const url = `${wsUrl}/xrpc/com.atproto.label.subscribeLabels`

    this.logger.info({ url }, 'Connecting to Ozone labeler')

    let socket: WebSocket
    try {
      socket = new WebSocket(url)
    } catch (err) {
      this.logger.warn({ err }, 'Failed to create Ozone WebSocket')
      this.scheduleReconnect()
      return
    }
    this.ws = socket

    socket.addEventListener('open', () => {
      this.logger.info('Connected to Ozone labeler')
      // Only a successful open of the current socket resets the backoff.
      if (this.ws === socket) {
        this.reconnectMs = INITIAL_RECONNECT_MS
      }
    })

    socket.addEventListener('message', (event) => {
      this.enqueueMessage(event.data)
    })

    socket.addEventListener('close', () => {
      this.logger.info('Ozone labeler connection closed')
      // A socket that was replaced or closed by stop() must not schedule a
      // reconnect; otherwise a quick stop()/start() yields two connections.
      if (this.ws !== socket) return
      this.ws = null
      this.scheduleReconnect()
    })

    socket.addEventListener('error', (event) => {
      this.logger.warn({ event }, 'Ozone labeler WebSocket error')
    })
  }

  /**
   * Schedule a reconnect after the current backoff delay and double the
   * delay for the next attempt (capped at `MAX_RECONNECT_MS`). At most one
   * reconnect is pending at any time.
   */
  private scheduleReconnect(): void {
    if (this.stopping || this.reconnectTimer !== null) return

    const delay = this.reconnectMs
    this.logger.info({ reconnectMs: delay }, 'Scheduling Ozone labeler reconnect')

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.connect()
    }, delay)

    this.reconnectMs = Math.min(delay * 2, MAX_RECONNECT_MS)
  }

  /**
   * Append a raw frame to the processing chain so frames are applied to the
   * database strictly in arrival order (a negation must never overtake the
   * label it negates).
   */
  private enqueueMessage(data: unknown): void {
    this.pending = this.pending
      .then(() => this.handleMessage(data))
      // handleMessage logs its own failures; this guard only keeps the
      // chain usable should anything unexpected escape it.
      .catch(() => undefined)
  }

  /**
   * Decode one event-stream frame and apply every label it carries.
   * Error frames are logged, non-`#labels` frames are ignored, and a
   * malformed or failing label is skipped without dropping the rest.
   */
  private async handleMessage(data: unknown): Promise<void> {
    try {
      const bytes = await this.toBinaryData(data)
      const { header, body } = decodeEventStreamFrame(bytes)

      // Error frame (op: -1) -- log and skip
      if (header.op === -1) {
        const error = typeof body.error === 'string' ? body.error : 'unknown'
        const message = typeof body.message === 'string' ? body.message : undefined
        this.logger.warn({ error, message }, 'Ozone labeler error frame received')
        return
      }

      // Only process #labels messages; skip other types silently
      if (header.t !== '#labels') return

      const event = body as unknown as LabelEvent
      if (!Array.isArray(event.labels)) return

      const entries: unknown[] = event.labels
      for (const entry of entries) {
        if (!this.isWellFormedLabel(entry)) {
          this.logger.warn({ label: entry }, 'Skipping malformed Ozone label')
          continue
        }
        try {
          await this.processLabel(entry)
        } catch (err) {
          this.logger.warn({ err }, 'Failed to process Ozone label event')
        }
      }
    } catch (err) {
      this.logger.warn({ err }, 'Failed to process Ozone label event')
    }
  }

  /**
   * Runtime check that a decoded value has the fields `processLabel`
   * relies on: string `src`, `uri`, `val`, and a `cts` string that parses
   * to a valid date.
   */
  private isWellFormedLabel(value: unknown): value is Label {
    if (typeof value !== 'object' || value === null) return false
    const { src, uri, val, cts } = value as Record<string, unknown>
    return (
      typeof src === 'string' &&
      typeof uri === 'string' &&
      typeof val === 'string' &&
      typeof cts === 'string' &&
      !Number.isNaN(Date.parse(cts))
    )
  }

  /**
   * Convert incoming WebSocket data to a Uint8Array for CBOR decoding.
   *
   * Accepts Uint8Array (which includes Node.js Buffer), ArrayBuffer and
   * Blob payloads.
   *
   * @throws Error when the payload is of any other type (e.g. a text frame).
   */
  private async toBinaryData(data: unknown): Promise<Uint8Array> {
    if (data instanceof Uint8Array) return data
    if (data instanceof ArrayBuffer) return new Uint8Array(data)
    if (data instanceof Blob) return new Uint8Array(await data.arrayBuffer())
    throw new Error(`Unexpected WebSocket data type: ${typeof data}`)
  }

  /**
   * Apply one label to the database and drop the cached entry of its
   * subject. A negation deletes the stored (src, uri, val) row; any other
   * label is upserted on that same key.
   */
  private async processLabel(label: Label): Promise<void> {
    if (label.neg) {
      // Negation: remove the prior label
      await this.db
        .delete(ozoneLabels)
        .where(
          and(
            eq(ozoneLabels.src, label.src),
            eq(ozoneLabels.uri, label.uri),
            eq(ozoneLabels.val, label.val)
          )
        )
    } else {
      const cts = new Date(label.cts)
      // NOTE(review): `undefined` in the `set` clause presumably leaves a
      // previously stored `exp` untouched when a label is re-issued without
      // one -- confirm whether clearing it (null) is intended.
      const exp = label.exp ? new Date(label.exp) : undefined

      // Upsert the label
      await this.db
        .insert(ozoneLabels)
        .values({
          src: label.src,
          uri: label.uri,
          val: label.val,
          neg: false,
          cts,
          exp,
        })
        .onConflictDoUpdate({
          target: [ozoneLabels.src, ozoneLabels.uri, ozoneLabels.val],
          set: {
            neg: false,
            cts,
            exp,
            indexedAt: new Date(),
          },
        })
    }

    // Invalidate cache for this URI. Failure is tolerated, but logged:
    // readers may keep seeing the old labels until the entry expires.
    try {
      await this.cache.del(`${CACHE_PREFIX}${label.uri}`)
    } catch (err) {
      this.logger.warn({ err, uri: label.uri }, 'Failed to invalidate Ozone label cache')
    }
  }

  /**
   * Parse a cached JSON payload.
   *
   * @returns the label list, or `null` when the payload is not an array.
   * @throws SyntaxError when the payload is not valid JSON.
   */
  private parseCachedLabels(raw: string): CachedLabel[] | null {
    const parsed: unknown = JSON.parse(raw)
    return Array.isArray(parsed) ? (parsed as CachedLabel[]) : null
  }

  /** Whether any label in the list is one of the spam label values. */
  private containsSpamLabel(labels: readonly CachedLabel[]): boolean {
    return labels.some((l) => SPAM_LABELS.has(l.val))
  }

  /**
   * Get all non-negated labels stored for a URI (DID or content URI).
   * Results are cached for `CACHE_TTL` seconds; an unreadable or corrupt
   * cache entry falls back to the database.
   */
  async getLabels(uri: string): Promise<CachedLabel[]> {
    const cacheKey = `${CACHE_PREFIX}${uri}`

    // Try cache first
    try {
      const cached = await this.cache.get(cacheKey)
      if (cached) {
        const fromCache = this.parseCachedLabels(cached)
        if (fromCache) return fromCache
      }
    } catch {
      // Fall through to DB
    }

    const rows = await this.db
      .select({
        val: ozoneLabels.val,
        src: ozoneLabels.src,
        neg: ozoneLabels.neg,
      })
      .from(ozoneLabels)
      .where(and(eq(ozoneLabels.uri, uri), eq(ozoneLabels.neg, false)))

    const labels: CachedLabel[] = rows.map((r) => ({
      val: r.val,
      src: r.src,
      neg: r.neg,
    }))

    // Cache result
    try {
      await this.cache.set(cacheKey, JSON.stringify(labels), 'EX', CACHE_TTL)
    } catch {
      // Non-critical
    }

    return labels
  }

  /**
   * Check if a URI has a specific label value.
   */
  async hasLabel(uri: string, val: string): Promise<boolean> {
    const labels = await this.getLabels(uri)
    return labels.some((l) => l.val === val)
  }

  /**
   * Check if a DID or URI has any spam-related labels (spam, !hide).
   */
  async isSpamLabeled(didOrUri: string): Promise<boolean> {
    const labels = await this.getLabels(didOrUri)
    return this.containsSpamLabel(labels)
  }

  /**
   * Batch check which DIDs have spam-related labels.
   * Uses a single DB query for all cache misses instead of N+1 individual
   * queries. Duplicate DIDs in the input are looked up once.
   *
   * @returns a map with one entry per distinct input DID.
   */
  async batchIsSpamLabeled(dids: string[]): Promise<Map<string, boolean>> {
    const result = new Map<string, boolean>()
    if (dids.length === 0) return result

    const uniqueDids = [...new Set(dids)]
    const uncached: string[] = []

    // Check cache first
    for (const did of uniqueDids) {
      try {
        const cached = await this.cache.get(`${CACHE_PREFIX}${did}`)
        if (cached) {
          const fromCache = this.parseCachedLabels(cached)
          if (fromCache) {
            result.set(did, this.containsSpamLabel(fromCache))
            continue
          }
        }
      } catch {
        // Fall through to DB
      }
      uncached.push(did)
    }

    if (uncached.length === 0) return result

    // Batch DB query for all uncached DIDs
    const rows = await this.db
      .select({
        uri: ozoneLabels.uri,
        val: ozoneLabels.val,
        src: ozoneLabels.src,
        neg: ozoneLabels.neg,
      })
      .from(ozoneLabels)
      .where(and(inArray(ozoneLabels.uri, uncached), eq(ozoneLabels.neg, false)))

    // Group by URI
    const labelsByUri = new Map<string, CachedLabel[]>()
    for (const row of rows) {
      const labels = labelsByUri.get(row.uri) ?? []
      labels.push({ val: row.val, src: row.src, neg: row.neg })
      labelsByUri.set(row.uri, labels)
    }

    // Cache and build results (DIDs without rows are cached as an empty list)
    for (const did of uncached) {
      const labels = labelsByUri.get(did) ?? []
      result.set(did, this.containsSpamLabel(labels))
      try {
        await this.cache.set(`${CACHE_PREFIX}${did}`, JSON.stringify(labels), 'EX', CACHE_TTL)
      } catch {
        // Non-critical
      }
    }

    return result
  }
}