this repo has no description
1use anyhow::{Context, Result};
2use chrono::{prelude::*, Duration};
3use sqlx::{Execute, Pool, QueryBuilder, Sqlite};
4
5use model::FeedContent;
6
7pub type StoragePool = Pool<Sqlite>;
8
9pub mod model {
10 use chrono::{DateTime, SubsecRound, Utc};
11 use sqlx::prelude::*;
12
13 #[derive(Clone, FromRow)]
14 pub struct FeedContent {
15 pub feed_id: String,
16 pub uri: String,
17 pub indexed_at: i64,
18 pub score: i32,
19 }
20
21 impl FeedContent {
22 pub(crate) fn age_in_hours(&self, now: i64) -> i64 {
23 let target = DateTime::from_timestamp_micros(self.indexed_at)
24 .map(|value| value.trunc_subsecs(0).timestamp());
25 if target.is_none() {
26 return 1;
27 }
28 let target = target.unwrap();
29 let diff_seconds = now - target;
30 std::cmp::max((diff_seconds / (60 * 60)) + 1, 1)
31 }
32 }
33
34 #[derive(Clone, FromRow)]
35 pub struct Denylist {
36 pub subject: String,
37 pub reason: String,
38 pub created_at: DateTime<Utc>,
39 }
40}
41
42pub async fn feed_content_upsert(pool: &StoragePool, feed_content: &FeedContent) -> Result<()> {
43 let mut tx = pool.begin().await.context("failed to begin transaction")?;
44
45 let now = Utc::now();
46 let res = sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, updated_at, score) VALUES (?, ?, ?, ?, ?)")
47 .bind(&feed_content.feed_id)
48 .bind(&feed_content.uri)
49 .bind(feed_content.indexed_at)
50 .bind(now)
51 .bind(feed_content.score)
52 .execute(tx.as_mut())
53 .await.context("failed to insert feed content record")?;
54
55 if res.rows_affected() == 0 {
56 sqlx::query("UPDATE feed_content SET score = score + ?, updated_at = ? WHERE feed_id = ? AND uri = ?")
57 .bind(feed_content.score)
58 .bind(now)
59 .bind(&feed_content.feed_id)
60 .bind(&feed_content.uri)
61 .execute(tx.as_mut())
62 .await
63 .context("failed to update feed content record")?;
64 }
65
66 tx.commit().await.context("failed to commit transaction")
67}
68
69pub async fn feed_content_update(pool: &StoragePool, feed_content: &FeedContent) -> Result<()> {
70 let mut tx = pool.begin().await.context("failed to begin transaction")?;
71
72 let now = Utc::now();
73 sqlx::query(
74 "UPDATE feed_content SET score = score + ?, updated_at = ? WHERE feed_id = ? AND uri = ?",
75 )
76 .bind(feed_content.score)
77 .bind(now)
78 .bind(&feed_content.feed_id)
79 .bind(&feed_content.uri)
80 .execute(tx.as_mut())
81 .await
82 .context("failed to update feed content record")?;
83
84 tx.commit().await.context("failed to commit transaction")
85}
86
87pub async fn feed_content_cached(
88 pool: &StoragePool,
89 feed_uri: &str,
90 limit: u32,
91) -> Result<Vec<FeedContent>> {
92 let mut tx = pool.begin().await.context("failed to begin transaction")?;
93
94 let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT ?";
95
96 let results = sqlx::query_as::<_, FeedContent>(query)
97 .bind(feed_uri)
98 .bind(limit)
99 .fetch_all(tx.as_mut())
100 .await?;
101
102 tx.commit().await.context("failed to commit transaction")?;
103
104 Ok(results)
105}
106
107pub async fn consumer_control_insert(pool: &StoragePool, source: &str, time_us: i64) -> Result<()> {
108 let mut tx = pool.begin().await.context("failed to begin transaction")?;
109
110 let now = Utc::now();
111 sqlx::query(
112 "INSERT OR REPLACE INTO consumer_control (source, time_us, updated_at) VALUES (?, ?, ?)",
113 )
114 .bind(source)
115 .bind(time_us)
116 .bind(now)
117 .execute(tx.as_mut())
118 .await?;
119
120 tx.commit().await.context("failed to commit transaction")
121}
122
123pub async fn consumer_control_get(pool: &StoragePool, source: &str) -> Result<Option<i64>> {
124 let mut tx = pool.begin().await.context("failed to begin transaction")?;
125
126 let result =
127 sqlx::query_scalar::<_, i64>("SELECT time_us FROM consumer_control WHERE source = ?")
128 .bind(source)
129 .fetch_optional(tx.as_mut())
130 .await
131 .context("failed to select consumer control record")?;
132
133 tx.commit().await.context("failed to commit transaction")?;
134
135 Ok(result)
136}
137
138pub async fn verifcation_method_insert(
139 pool: &StoragePool,
140 did: &str,
141 multikey: &str,
142) -> Result<()> {
143 let mut tx = pool.begin().await.context("failed to begin transaction")?;
144
145 let now = Utc::now();
146 sqlx::query(
147 "INSERT OR REPLACE INTO verification_method_cache (did, multikey, updated_at) VALUES (?, ?, ?)",
148 )
149 .bind(did)
150 .bind(multikey)
151 .bind(now)
152 .execute(tx.as_mut())
153 .await.context("failed to update verification method cache")?;
154
155 tx.commit().await.context("failed to commit transaction")
156}
157
158pub async fn verification_method_cleanup(pool: &StoragePool) -> Result<()> {
159 let mut tx = pool.begin().await.context("failed to begin transaction")?;
160
161 let now = Utc::now();
162 let seven_days_ago = now - Duration::days(7);
163 sqlx::query("DELETE FROM verification_method_cache WHERE updated_at < ?")
164 .bind(seven_days_ago)
165 .execute(tx.as_mut())
166 .await
167 .context("failed to delete old verification method cache records")?;
168
169 tx.commit().await.context("failed to commit transaction")
170}
171
172pub async fn verification_method_get(pool: &StoragePool, did: &str) -> Result<Option<String>> {
173 let mut tx = pool.begin().await.context("failed to begin transaction")?;
174
175 let result = sqlx::query_scalar::<_, String>(
176 "SELECT multikey FROM verification_method_cache WHERE did = ?",
177 )
178 .bind(did)
179 .fetch_optional(tx.as_mut())
180 .await
181 .context("failed to select verification method cache record")?;
182 tx.commit().await.context("failed to commit transaction")?;
183 Ok(result)
184}
185
186pub async fn feed_content_truncate_oldest(pool: &StoragePool, age: DateTime<Utc>) -> Result<()> {
187 let mut tx = pool.begin().await.context("failed to begin transaction")?;
188
189 // TODO: This might need an index.
190 sqlx::query("DELETE FROM feed_content WHERE updated_at < ?")
191 .bind(age)
192 .execute(tx.as_mut())
193 .await
194 .context("failed to delete feed content beyond mark")?;
195
196 tx.commit().await.context("failed to commit transaction")
197}
198
199pub async fn denylist_upsert(pool: &StoragePool, subject: &str, reason: &str) -> Result<()> {
200 let mut tx = pool.begin().await.context("failed to begin transaction")?;
201
202 let now = Utc::now();
203 sqlx::query("INSERT OR REPLACE INTO denylist (subject, reason, updated_at) VALUES (?, ?, ?)")
204 .bind(subject)
205 .bind(reason)
206 .bind(now)
207 .execute(tx.as_mut())
208 .await
209 .context("failed to upsert denylist record")?;
210
211 tx.commit().await.context("failed to commit transaction")
212}
213
214pub async fn denylist_remove(pool: &StoragePool, subject: &str) -> Result<()> {
215 let mut tx = pool.begin().await.context("failed to begin transaction")?;
216
217 sqlx::query("DELETE FROM denylist WHERE subject = ?")
218 .bind(subject)
219 .execute(tx.as_mut())
220 .await
221 .context("failed to delete denylist record")?;
222
223 tx.commit().await.context("failed to commit transaction")
224}
225
226pub async fn feed_content_purge_aturi(
227 pool: &StoragePool,
228 aturi: &str,
229 feed: &Option<String>,
230) -> Result<()> {
231 let mut tx = pool.begin().await.context("failed to begin transaction")?;
232
233 if let Some(feed) = feed {
234 sqlx::query("DELETE FROM feed_content WHERE feed_id = ? AND uri = ?")
235 .bind(feed)
236 .bind(aturi)
237 .execute(tx.as_mut())
238 .await
239 .context("failed to delete denylist record")?;
240 } else {
241 sqlx::query("DELETE FROM feed_content WHERE uri = ?")
242 .bind(aturi)
243 .execute(tx.as_mut())
244 .await
245 .context("failed to delete denylist record")?;
246 }
247
248 tx.commit().await.context("failed to commit transaction")
249}
250
251pub async fn denylist_exists(pool: &StoragePool, subjects: &[&str]) -> Result<bool> {
252 let mut tx = pool.begin().await.context("failed to begin transaction")?;
253
254 let mut query_builder: QueryBuilder<Sqlite> =
255 QueryBuilder::new("SELECT COUNT(*) FROM denylist WHERE subject IN (");
256 let mut separated = query_builder.separated(", ");
257 for subject in subjects {
258 separated.push_bind(subject);
259 }
260 separated.push_unseparated(") ");
261
262 let mut query = sqlx::query_scalar::<_, i64>(query_builder.build().sql());
263 for subject in subjects {
264 query = query.bind(subject);
265 }
266 let count = query
267 .fetch_one(tx.as_mut())
268 .await
269 .context("failed to delete denylist record")?;
270
271 tx.commit().await.context("failed to commit transaction")?;
272
273 Ok(count > 0)
274}
275
#[cfg(test)]
mod tests {
    use sqlx::SqlitePool;

    use super::model::FeedContent;

    /// URI of the sample post stored by `record_feed_content`.
    const SAMPLE_URI: &str =
        "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n";
    /// Sample timestamp in microseconds, shared by both tests.
    const SAMPLE_TIME_US: i64 = 1730673934229172;

    /// A record written with `feed_content_upsert` is returned unchanged by
    /// `feed_content_cached`.
    #[sqlx::test]
    async fn record_feed_content(pool: SqlitePool) -> sqlx::Result<()> {
        let inserted = FeedContent {
            feed_id: String::from("feed"),
            uri: String::from(SAMPLE_URI),
            indexed_at: SAMPLE_TIME_US,
            score: 1,
        };
        super::feed_content_upsert(&pool, &inserted)
            .await
            .expect("failed to insert record");

        let cached = super::feed_content_cached(&pool, "feed", 5)
            .await
            .expect("failed to paginate records");

        assert_eq!(cached.len(), 1);
        let first = &cached[0];
        assert_eq!(first.feed_id, "feed");
        assert_eq!(first.uri, SAMPLE_URI);
        assert_eq!(first.indexed_at, SAMPLE_TIME_US);

        Ok(())
    }

    /// Each `consumer_control_insert` for the same source overwrites the value
    /// that `consumer_control_get` reads back.
    #[sqlx::test]
    async fn consumer_control(pool: SqlitePool) -> sqlx::Result<()> {
        for time_us in [SAMPLE_TIME_US, SAMPLE_TIME_US + 1] {
            super::consumer_control_insert(&pool, "foo", time_us)
                .await
                .expect("failed to insert record");

            let stored = super::consumer_control_get(&pool, "foo")
                .await
                .expect("failed to get record");
            assert_eq!(stored, Some(time_us));
        }

        Ok(())
    }
}