A tool for tailing a labeler's firehose, rehydrating records, and storing them for future analysis of moderation decisions.
at main 2.5 kB view raw
import { Database } from "duckdb";
import { logger } from "../logger/index.js";

/**
 * A post record as read from / written to the `posts` table.
 *
 * `uri` is the conflict target of the upsert performed by
 * {@link PostsRepository.insert}, so it acts as the row's unique key.
 */
export interface Post {
  uri: string;
  did: string;
  text?: string;
  facets?: any;
  embeds?: any;
  langs?: string[];
  tags?: string[];
  created_at: string;
  is_reply?: boolean;
}

/**
 * Upsert statement used by {@link PostsRepository.insert}.
 *
 * On a `uri` conflict only the content columns (text, facets, embeds, langs,
 * tags) are overwritten; `did`, `created_at` and `is_reply` keep the values
 * from the first insert.
 */
const UPSERT_POST_SQL = `
  INSERT INTO posts (uri, did, text, facets, embeds, langs, tags, created_at, is_reply)
  VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
  ON CONFLICT (uri) DO UPDATE SET
    text = EXCLUDED.text,
    facets = EXCLUDED.facets,
    embeds = EXCLUDED.embeds,
    langs = EXCLUDED.langs,
    tags = EXCLUDED.tags
`;

/** Serializes a truthy value to JSON; any falsy value is bound as SQL NULL. */
function toJsonOrNull(value: unknown): string | null {
  return value ? JSON.stringify(value) : null;
}

/** Callback-to-Promise wrapper around the `posts` table of a DuckDB database. */
export class PostsRepository {
  constructor(private readonly db: Database) {}

  /**
   * Inserts a post, or updates its content columns if a row with the same
   * `uri` already exists.
   *
   * `facets`, `embeds`, `langs` and `tags` are stored as JSON strings. An
   * empty or missing `text` is stored as NULL, and a missing `is_reply` is
   * stored as `false`.
   *
   * @param post - The post to persist.
   * @returns A promise that resolves once the statement has run.
   * @throws Rejects with the DuckDB error if preparing or running the
   *   statement fails (the error is logged first).
   */
  insert(post: Post): Promise<void> {
    return new Promise((resolve, reject) => {
      this.db.prepare(UPSERT_POST_SQL, (err, stmt) => {
        if (err) {
          logger.error({ err }, "Failed to prepare post insert statement");
          reject(err);
          return;
        }

        stmt.run(
          post.uri,
          post.did,
          post.text || null,
          toJsonOrNull(post.facets),
          toJsonOrNull(post.embeds),
          toJsonOrNull(post.langs),
          toJsonOrNull(post.tags),
          post.created_at,
          post.is_reply ?? false,
          (err) => {
            // Release the prepared statement as soon as the run has completed,
            // on success and on failure alike, instead of leaving one
            // un-finalized statement per insert for the garbage collector.
            stmt.finalize();

            if (err) {
              logger.error({ err, post }, "Failed to insert post");
              reject(err);
              return;
            }
            resolve();
          }
        );
      });
    });
  }

  /**
   * Looks up a single post by its `uri`.
   *
   * NOTE(review): rows are returned exactly as DuckDB yields them; since
   * `insert` writes facets/embeds/langs/tags as JSON strings, those fields
   * may come back as strings rather than parsed values — confirm against the
   * table schema.
   *
   * @param uri - The `uri` to match.
   * @returns The first matching row, or `null` when there is none.
   * @throws Rejects with the DuckDB error if the query fails (logged first).
   */
  findByUri(uri: string): Promise<Post | null> {
    return new Promise((resolve, reject) => {
      this.db.all(`SELECT * FROM posts WHERE uri = $1`, uri, (err, rows: Post[]) => {
        if (err) {
          logger.error({ err, uri }, "Failed to find post by URI");
          reject(err);
          return;
        }
        resolve(rows?.[0] ?? null);
      });
    });
  }

  /**
   * Lists posts for a `did`, ordered by `created_at` descending.
   *
   * NOTE(review): as with `findByUri`, JSON-encoded columns are returned
   * unparsed — confirm against the table schema.
   *
   * @param did - The `did` to match.
   * @param limit - Maximum number of rows to return (default 100).
   * @returns The matching rows, or an empty array when there are none.
   * @throws Rejects with the DuckDB error if the query fails (logged first).
   */
  findByDid(did: string, limit = 100): Promise<Post[]> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM posts WHERE did = $1 ORDER BY created_at DESC LIMIT $2`,
        did,
        limit,
        (err, rows: Post[]) => {
          if (err) {
            logger.error({ err, did }, "Failed to find posts by DID");
            reject(err);
            return;
          }
          resolve(rows ?? []);
        }
      );
    });
  }
}