A tool for parsing traffic on the Jetstream and applying a moderation workstream driven by regexp-based rules
1import { createClient } from "redis";
2import { REDIS_URL } from "./config.js";
3import { logger } from "./logger.js";
4import type { WindowUnit } from "./types.js";
5
/**
 * Module-level Redis client, configured with the URL from `REDIS_URL`.
 * The listeners registered below only log lifecycle events.
 */
export const redisClient = createClient({ url: REDIS_URL });

// Client-level errors are logged at error level.
redisClient.on("error", (clientError: Error) => {
  logger.error({ err: clientError }, "Redis client error");
});

// "connect" and "ready" are logged at info level.
redisClient.on("connect", () => {
  logger.info("Redis client connected");
});
redisClient.on("ready", () => {
  logger.info("Redis client ready");
});

// Reconnection attempts are logged at warn level.
redisClient.on("reconnecting", () => {
  logger.warn("Redis client reconnecting");
});
25
/**
 * Connects the shared Redis client.
 *
 * @throws Rethrows whatever `redisClient.connect()` rejects with, after
 * logging it at error level.
 */
export async function connectRedis(): Promise<void> {
  try {
    await redisClient.connect();
  } catch (connectError) {
    logger.error({ err: connectError }, "Failed to connect to Redis");
    throw connectError;
  }
}
34
/**
 * Closes the shared Redis client with `quit()`.
 *
 * A failure while quitting is logged at error level and is not rethrown.
 */
export async function disconnectRedis(): Promise<void> {
  try {
    await redisClient.quit();
    logger.info("Redis client disconnected");
  } catch (quitError) {
    logger.error({ err: quitError }, "Error disconnecting Redis");
  }
}
43
/**
 * Builds the Redis key that marks a label as claimed for a post.
 *
 * @param atURI URI of the post.
 * @param label Label value.
 * @returns `post-label:<atURI>:<label>`.
 */
function getPostLabelCacheKey(atURI: string, label: string): string {
  const prefix = "post-label";
  return `${prefix}:${atURI}:${label}`;
}
47
/**
 * Builds the Redis key that marks a label as claimed for an account.
 *
 * @param did Account DID.
 * @param label Label value.
 * @returns `account-label:<did>:<label>`.
 */
function getAccountLabelCacheKey(did: string, label: string): string {
  const prefix = "account-label";
  return `${prefix}:${did}:${label}`;
}
51
/**
 * Tries to claim a (post, label) pair by writing its marker key to Redis.
 *
 * The key is written with `NX` and a 7-day expiry, so the write is only
 * applied when the key does not exist yet.
 *
 * @param atURI URI of the post.
 * @param label Label value.
 * @returns `true` when Redis replied `"OK"` to the SET, `false` otherwise.
 * If the Redis call throws, the error is logged at warn level and `true`
 * is returned so the caller is allowed through.
 */
export async function tryClaimPostLabel(
  atURI: string,
  label: string,
): Promise<boolean> {
  try {
    const sevenDaysInSeconds = 60 * 60 * 24 * 7;
    const reply = await redisClient.set(
      getPostLabelCacheKey(atURI, label),
      "1",
      { NX: true, EX: sevenDaysInSeconds },
    );
    return reply === "OK";
  } catch (err) {
    logger.warn(
      { err, atURI, label },
      "Error claiming post label in Redis, allowing through",
    );
    return true;
  }
}
71
/**
 * Tries to claim an (account, label) pair by writing its marker key to Redis.
 *
 * The key is written with `NX` and a 7-day expiry, so the write is only
 * applied when the key does not exist yet.
 *
 * @param did Account DID.
 * @param label Label value.
 * @returns `true` when Redis replied `"OK"` to the SET, `false` otherwise.
 * If the Redis call throws, the error is logged at warn level and `true`
 * is returned so the caller is allowed through.
 */
export async function tryClaimAccountLabel(
  did: string,
  label: string,
): Promise<boolean> {
  try {
    const sevenDaysInSeconds = 60 * 60 * 24 * 7;
    const reply = await redisClient.set(
      getAccountLabelCacheKey(did, label),
      "1",
      { NX: true, EX: sevenDaysInSeconds },
    );
    return reply === "OK";
  } catch (err) {
    logger.warn(
      { err, did, label },
      "Error claiming account label in Redis, allowing through",
    );
    return true;
  }
}
91
/**
 * Deletes the claim marker key for an (account, label) pair.
 *
 * A Redis failure is logged at warn level and is not rethrown.
 *
 * @param did Account DID.
 * @param label Label value.
 */
export async function deleteAccountLabelClaim(
  did: string,
  label: string,
): Promise<void> {
  try {
    await redisClient.del(getAccountLabelCacheKey(did, label));
    logger.debug(
      { did, label },
      "Deleted account label claim from Redis cache",
    );
  } catch (err) {
    logger.warn(
      { err, did, label },
      "Error deleting account label claim from Redis",
    );
  }
}
110
/**
 * Tries to claim an (account, URI) comment pair by writing its marker key
 * (`account-comment:<did>:<atURI>`) to Redis.
 *
 * The key is written with `NX` and a 7-day expiry, so the write is only
 * applied when the key does not exist yet.
 *
 * @param did Account DID.
 * @param atURI URI the comment relates to.
 * @returns `true` when Redis replied `"OK"` to the SET, `false` otherwise.
 * If the Redis call throws, the error is logged at warn level and `true`
 * is returned so the caller is allowed through.
 */
export async function tryClaimAccountComment(
  did: string,
  atURI: string,
): Promise<boolean> {
  try {
    const sevenDaysInSeconds = 60 * 60 * 24 * 7;
    const commentKey = `account-comment:${did}:${atURI}`;
    const reply = await redisClient.set(commentKey, "1", {
      NX: true,
      EX: sevenDaysInSeconds,
    });
    return reply === "OK";
  } catch (err) {
    logger.warn(
      { err, did, atURI },
      "Error claiming account comment in Redis, allowing through",
    );
    return true;
  }
}
130
/**
 * Converts a window length expressed in `unit` into microseconds.
 *
 * @param window Number of units.
 * @param unit Unit of `window` (`minutes`, `hours` or `days`).
 * @returns `window` multiplied by the number of microseconds in one unit.
 */
function windowToMicroseconds(window: number, unit: WindowUnit): number {
  const microsPerSecond = 1000000;
  const microsPerUnit: Record<WindowUnit, number> = {
    minutes: 60 * microsPerSecond,
    hours: 60 * 60 * microsPerSecond,
    days: 24 * 60 * 60 * microsPerSecond,
  };
  const factor = microsPerUnit[unit];
  return window * factor;
}
139
/**
 * Converts a window length expressed in `unit` into seconds.
 *
 * @param window Number of units.
 * @param unit Unit of `window` (`minutes`, `hours` or `days`).
 * @returns `window` multiplied by the number of seconds in one unit.
 */
function windowToSeconds(window: number, unit: WindowUnit): number {
  const secondsPerMinute = 60;
  const secondsPerHour = 60 * secondsPerMinute;
  const secondsPerDay = 24 * secondsPerHour;
  const secondsPerUnit: Record<WindowUnit, number> = {
    minutes: secondsPerMinute,
    hours: secondsPerHour,
    days: secondsPerDay,
  };
  const factor = secondsPerUnit[unit];
  return window * factor;
}
148
/**
 * Builds the Redis key of the sorted set that tracks post labels for an
 * account within a given window.
 *
 * @param did Account DID.
 * @param label Label value.
 * @param window Window length.
 * @param unit Unit of `window`.
 * @returns `account-post-labels:<did>:<label>:<window><unit>`.
 */
function getPostLabelTrackingKey(
  did: string,
  label: string,
  window: number,
  unit: WindowUnit,
): string {
  const windowTag = `${window.toString()}${unit}`;
  return `account-post-labels:${did}:${label}:${windowTag}`;
}
157
/**
 * Builds the Redis key of the sorted set that tracks starter packs for an
 * account within a given window.
 *
 * @param did Account DID.
 * @param window Window length.
 * @param unit Unit of `window`.
 * @returns `starterpack:threshold:<did>:<window><unit>`.
 */
function getStarterPackTrackingKey(
  did: string,
  window: number,
  unit: WindowUnit,
): string {
  const windowTag = `${window.toString()}${unit}`;
  return `starterpack:threshold:${did}:${windowTag}`;
}
165
/**
 * Records a starter pack in the account's sliding-window sorted set.
 *
 * The three writes are sent as one MULTI/EXEC transaction so they are
 * applied together in a single round trip:
 *  1. remove members whose score is at or below the window start,
 *  2. add `starterPackUri` with `timestamp` as its score (an existing member
 *     with the same URI is re-scored, not duplicated),
 *  3. set the key's expiry to the window length plus one hour.
 *
 * Running them as separate awaited commands could leave a freshly created
 * key without a TTL if the connection failed between the add and the expire.
 *
 * @param did Account DID.
 * @param starterPackUri URI of the starter pack; used as the set member.
 * @param timestamp Event time used as the score. It is compared against a
 * window computed in microseconds, so it is expected to be in microseconds.
 * @param window Window length.
 * @param windowUnit Unit of `window`.
 * @throws Rethrows any error from Redis after logging it at error level.
 */
export async function trackStarterPackForAccount(
  did: string,
  starterPackUri: string,
  timestamp: number,
  window: number,
  windowUnit: WindowUnit,
): Promise<void> {
  try {
    const key = getStarterPackTrackingKey(did, window, windowUnit);
    const windowStartTime = timestamp - windowToMicroseconds(window, windowUnit);
    // Keep the key one hour longer than the window itself.
    const ttlSeconds = windowToSeconds(window, windowUnit) + 60 * 60;

    await redisClient
      .multi()
      .zRemRangeByScore(key, "-inf", windowStartTime)
      .zAdd(key, { score: timestamp, value: starterPackUri })
      .expire(key, ttlSeconds)
      .exec();

    logger.debug(
      { did, starterPackUri, timestamp, window, windowUnit },
      "Tracked starter pack for account",
    );
  } catch (err) {
    logger.error(
      { err, did, starterPackUri, timestamp, window, windowUnit },
      "Error tracking starter pack in Redis",
    );
    throw err;
  }
}
199
/**
 * Counts the starter packs tracked for an account whose score falls inside
 * the window ending at `currentTime`.
 *
 * @param did Account DID.
 * @param window Window length.
 * @param windowUnit Unit of `window`.
 * @param currentTime End of the window. The window start is derived by
 * subtracting a microsecond duration, so it is expected to be in microseconds.
 * @returns Number of members with a score at or above the window start.
 * @throws Rethrows any error from Redis after logging it at error level.
 */
export async function getStarterPackCountInWindow(
  did: string,
  window: number,
  windowUnit: WindowUnit,
  currentTime: number,
): Promise<number> {
  try {
    const trackingKey = getStarterPackTrackingKey(did, window, windowUnit);
    const earliestScore = currentTime - windowToMicroseconds(window, windowUnit);
    const inWindow = await redisClient.zCount(trackingKey, earliestScore, "+inf");

    logger.debug(
      { did, window, windowUnit, count: inWindow },
      "Retrieved starter pack count in window",
    );

    return inWindow;
  } catch (err) {
    logger.error(
      { err, did, window, windowUnit },
      "Error getting starter pack count from Redis",
    );
    throw err;
  }
}
225
/**
 * Records one labelled post in the account's sliding-window sorted set for
 * the given label.
 *
 * The three writes are sent as one MULTI/EXEC transaction so they are
 * applied together in a single round trip:
 *  1. remove members whose score is at or below the window start,
 *  2. add a member scored with `timestamp`,
 *  3. set the key's expiry to the window length plus one hour.
 *
 * Running them as separate awaited commands could leave a freshly created
 * key without a TTL if the connection failed between the add and the expire.
 *
 * @param did Account DID.
 * @param label Label value.
 * @param timestamp Event time used as the score. It is compared against a
 * window computed in microseconds, so it is expected to be in microseconds.
 * @param window Window length.
 * @param windowUnit Unit of `window`.
 * @throws Rethrows any error from Redis after logging it at error level.
 */
export async function trackPostLabelForAccount(
  did: string,
  label: string,
  timestamp: number,
  window: number,
  windowUnit: WindowUnit,
): Promise<void> {
  try {
    const key = getPostLabelTrackingKey(did, label, window, windowUnit);
    const windowStartTime = timestamp - windowToMicroseconds(window, windowUnit);
    // Keep the key one hour longer than the window itself.
    const ttlSeconds = windowToSeconds(window, windowUnit) + 60 * 60;

    // NOTE(review): the member is the timestamp string, so two calls with the
    // same timestamp for the same key are stored as a single member (sorted-set
    // members are unique) — confirm this is the intended de-duplication.
    await redisClient
      .multi()
      .zRemRangeByScore(key, "-inf", windowStartTime)
      .zAdd(key, { score: timestamp, value: timestamp.toString() })
      .expire(key, ttlSeconds)
      .exec();

    logger.debug(
      { did, label, timestamp, window, windowUnit },
      "Tracked post label for account",
    );
  } catch (err) {
    logger.error(
      { err, did, label, timestamp, window, windowUnit },
      "Error tracking post label in Redis",
    );
    throw err;
  }
}
259
/**
 * Sums, across several labels, the number of tracked posts for an account
 * whose score falls inside the window ending at `currentTime`.
 *
 * Each label has its own sorted set. The per-label counts are independent,
 * so they are requested together instead of one awaited round trip per label.
 *
 * @param did Account DID.
 * @param labels Labels whose counts are added together. An empty list
 * yields 0. A label listed twice is counted twice.
 * @param window Window length.
 * @param windowUnit Unit of `window`.
 * @param currentTime End of the window. The window start is derived by
 * subtracting a microsecond duration, so it is expected to be in microseconds.
 * @returns Total number of members, over all label keys, with a score at or
 * above the window start.
 * @throws Rethrows any error from Redis after logging it at error level.
 */
export async function getPostLabelCountInWindow(
  did: string,
  labels: string[],
  window: number,
  windowUnit: WindowUnit,
  currentTime: number,
): Promise<number> {
  try {
    const windowStartTime = currentTime - windowToMicroseconds(window, windowUnit);

    // Issue every ZCOUNT up front, then wait for all of them.
    const counts = await Promise.all(
      labels.map((label) =>
        redisClient.zCount(
          getPostLabelTrackingKey(did, label, window, windowUnit),
          windowStartTime,
          "+inf",
        ),
      ),
    );
    const totalCount = counts.reduce((sum, count) => sum + count, 0);

    logger.debug(
      { did, labels, window, windowUnit, totalCount },
      "Retrieved post label count in window",
    );

    return totalCount;
  } catch (err) {
    logger.error(
      { err, did, labels, window, windowUnit },
      "Error getting post label count from Redis",
    );
    throw err;
  }
}