A tool for tailing a labeler's firehose, then rehydrating and storing records for future analysis of moderation decisions.
1import { Database } from "duckdb";
2import { logger } from "../logger/index.js";
3
/**
 * A single post as written to, and read back from, the `posts` table.
 *
 * Only `uri`, `did` and `created_at` are required; every other field may be
 * omitted by the producer.
 */
export interface Post {
  /** Unique key of the post: the upsert conflict target and the lookup key for single-post reads. */
  uri: string;
  /** Identifier used to list posts by account (presumably the author's DID — confirm with the producer). */
  did: string;
  /** Post body; stored as SQL NULL when absent. */
  text?: string;
  /** Arbitrary JSON-serializable value; persisted via `JSON.stringify`. Shape is not constrained here. */
  facets?: any;
  /** Arbitrary JSON-serializable value; persisted via `JSON.stringify`. Shape is not constrained here. */
  embeds?: any;
  /** Language tags; persisted as a JSON-encoded array. */
  langs?: string[];
  /** Free-form tags; persisted as a JSON-encoded array. */
  tags?: string[];
  /** Creation time as a string; used as the sort key when listing by `did`. Format is not validated here. */
  created_at: string;
  /** Whether the post is a reply; written as `false` when omitted. */
  is_reply?: boolean;
}
15
/**
 * Promise-based data access for the `posts` table of a DuckDB database.
 *
 * Each method wraps DuckDB's callback-style API in a Promise. On failure the
 * error is logged through `logger` together with the lookup key (or the full
 * post) and the Promise rejects with the original error.
 *
 * NOTE(review): `insert` writes `facets`, `embeds`, `langs` and `tags` as JSON
 * strings, while the finders return `SELECT *` rows untouched. Depending on
 * the column types those fields may come back as strings rather than the
 * shapes declared on `Post` — confirm against the table schema.
 */
export class PostsRepository {
  constructor(private db: Database) {}

  /**
   * Encodes an optional value for a JSON-bearing column.
   *
   * @returns `null` for `null`/`undefined`, otherwise the JSON text of `value`.
   */
  private static toJson(value: unknown): string | null {
    return value == null ? null : JSON.stringify(value);
  }

  /**
   * Inserts a post, or updates the existing row with the same `uri`.
   *
   * On conflict only `text`, `facets`, `embeds`, `langs` and `tags` are
   * overwritten; `did`, `created_at` and `is_reply` keep their stored values.
   *
   * @param post - The post to persist. An absent `text` is stored as NULL
   *   (an empty string is stored as-is); an absent `is_reply` is stored as
   *   `false`.
   * @returns A Promise that resolves once the statement has run.
   * @throws Rejects with the DuckDB error if preparing or running the
   *   statement fails.
   */
  async insert(post: Post): Promise<void> {
    return new Promise((resolve, reject) => {
      this.db.prepare(
        `
        INSERT INTO posts (uri, did, text, facets, embeds, langs, tags, created_at, is_reply)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        ON CONFLICT (uri) DO UPDATE SET
          text = EXCLUDED.text,
          facets = EXCLUDED.facets,
          embeds = EXCLUDED.embeds,
          langs = EXCLUDED.langs,
          tags = EXCLUDED.tags
        `,
        (err, stmt) => {
          if (err) {
            logger.error({ err }, "Failed to prepare post insert statement");
            reject(err);
            return;
          }

          stmt.run(
            post.uri,
            post.did,
            // `??` rather than `||`: an empty string is a real value, not "missing".
            post.text ?? null,
            PostsRepository.toJson(post.facets),
            PostsRepository.toJson(post.embeds),
            PostsRepository.toJson(post.langs),
            PostsRepository.toJson(post.tags),
            post.created_at,
            post.is_reply ?? false,
            (runErr) => {
              // A statement is prepared per call, so release it as soon as the
              // run has completed, whether it succeeded or failed.
              stmt.finalize();

              if (runErr) {
                logger.error({ err: runErr, post }, "Failed to insert post");
                reject(runErr);
                return;
              }
              resolve();
            }
          );
        }
      );
    });
  }

  /**
   * Looks up a single post by its `uri`.
   *
   * @param uri - The `uri` value to match exactly.
   * @returns The first matching row, or `null` when no row matches.
   * @throws Rejects with the DuckDB error if the query fails.
   */
  async findByUri(uri: string): Promise<Post | null> {
    return new Promise((resolve, reject) => {
      this.db.all(`SELECT * FROM posts WHERE uri = $1`, uri, (err, rows: Post[]) => {
        if (err) {
          logger.error({ err, uri }, "Failed to find post by URI");
          reject(err);
          return;
        }
        resolve(rows?.[0] ?? null);
      });
    });
  }

  /**
   * Lists posts for one `did`, newest first by `created_at`.
   *
   * @param did - The `did` value to match exactly.
   * @param limit - Maximum number of rows to return (default 100). Passed to
   *   the query as-is; it is not validated here.
   * @returns The matching rows, or an empty array when there are none.
   * @throws Rejects with the DuckDB error if the query fails.
   */
  async findByDid(did: string, limit = 100): Promise<Post[]> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM posts WHERE did = $1 ORDER BY created_at DESC LIMIT $2`,
        did,
        limit,
        (err, rows: Post[]) => {
          if (err) {
            logger.error({ err, did }, "Failed to find posts by DID");
            reject(err);
            return;
          }
          resolve(rows ?? []);
        }
      );
    });
  }
}