this repo has no description
at main 11 kB view raw
1use anyhow::{Context, Result}; 2use chrono::{prelude::*, Duration}; 3use sqlx::{Execute, Pool, QueryBuilder, Sqlite}; 4 5use model::FeedContent; 6 7pub type StoragePool = Pool<Sqlite>; 8 9pub mod model { 10 use chrono::{DateTime, SubsecRound, Utc}; 11 use sqlx::prelude::*; 12 13 #[derive(Clone, FromRow)] 14 pub struct FeedContent { 15 pub feed_id: String, 16 pub uri: String, 17 pub indexed_at: i64, 18 pub score: i32, 19 } 20 21 impl FeedContent { 22 pub(crate) fn age_in_hours(&self, now: i64) -> i64 { 23 let target = DateTime::from_timestamp_micros(self.indexed_at) 24 .map(|value| value.trunc_subsecs(0).timestamp()); 25 if target.is_none() { 26 return 1; 27 } 28 let target = target.unwrap(); 29 let diff_seconds = now - target; 30 std::cmp::max((diff_seconds / (60 * 60)) + 1, 1) 31 } 32 } 33 34 #[derive(Clone, FromRow)] 35 pub struct Denylist { 36 pub subject: String, 37 pub reason: String, 38 pub created_at: DateTime<Utc>, 39 } 40} 41 42pub async fn feed_content_upsert(pool: &StoragePool, feed_content: &FeedContent) -> Result<()> { 43 let mut tx = pool.begin().await.context("failed to begin transaction")?; 44 45 let now = Utc::now(); 46 let res = sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, updated_at, score) VALUES (?, ?, ?, ?, ?)") 47 .bind(&feed_content.feed_id) 48 .bind(&feed_content.uri) 49 .bind(feed_content.indexed_at) 50 .bind(now) 51 .bind(feed_content.score) 52 .execute(tx.as_mut()) 53 .await.context("failed to insert feed content record")?; 54 55 if res.rows_affected() == 0 { 56 sqlx::query("UPDATE feed_content SET score = score + ?, updated_at = ? WHERE feed_id = ? AND uri = ?") 57 .bind(feed_content.score) 58 .bind(now) 59 .bind(&feed_content.feed_id) 60 .bind(&feed_content.uri) 61 .execute(tx.as_mut()) 62 .await 63 .context("failed to update feed content record")?; 64 } 65 66 tx.commit().await.context("failed to commit transaction") 67} 68 69pub async fn feed_content_update(pool: &StoragePool, feed_content: &FeedContent) -> Result<()> { 70 let mut tx = pool.begin().await.context("failed to begin transaction")?; 71 72 let now = Utc::now(); 73 sqlx::query( 74 "UPDATE feed_content SET score = score + ?, updated_at = ? WHERE feed_id = ? AND uri = ?", 75 ) 76 .bind(feed_content.score) 77 .bind(now) 78 .bind(&feed_content.feed_id) 79 .bind(&feed_content.uri) 80 .execute(tx.as_mut()) 81 .await 82 .context("failed to update feed content record")?; 83 84 tx.commit().await.context("failed to commit transaction") 85} 86 87pub async fn feed_content_cached( 88 pool: &StoragePool, 89 feed_uri: &str, 90 limit: u32, 91) -> Result<Vec<FeedContent>> { 92 let mut tx = pool.begin().await.context("failed to begin transaction")?; 93 94 let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT ?"; 95 96 let results = sqlx::query_as::<_, FeedContent>(query) 97 .bind(feed_uri) 98 .bind(limit) 99 .fetch_all(tx.as_mut()) 100 .await?; 101 102 tx.commit().await.context("failed to commit transaction")?; 103 104 Ok(results) 105} 106 107pub async fn consumer_control_insert(pool: &StoragePool, source: &str, time_us: i64) -> Result<()> { 108 let mut tx = pool.begin().await.context("failed to begin transaction")?; 109 110 let now = Utc::now(); 111 sqlx::query( 112 "INSERT OR REPLACE INTO consumer_control (source, time_us, updated_at) VALUES (?, ?, ?)", 113 ) 114 .bind(source) 115 .bind(time_us) 116 .bind(now) 117 .execute(tx.as_mut()) 118 .await?; 119 120 tx.commit().await.context("failed to commit transaction") 121} 122 123pub async fn consumer_control_get(pool: &StoragePool, source: &str) -> Result<Option<i64>> { 124 let mut tx = pool.begin().await.context("failed to begin transaction")?; 125 126 let result = 127 sqlx::query_scalar::<_, i64>("SELECT time_us FROM consumer_control WHERE source = ?") 128 .bind(source) 129 .fetch_optional(tx.as_mut()) 130 .await 131 .context("failed to select consumer control record")?; 132 133 tx.commit().await.context("failed to commit transaction")?; 134 135 Ok(result) 136} 137 138pub async fn verifcation_method_insert( 139 pool: &StoragePool, 140 did: &str, 141 multikey: &str, 142) -> Result<()> { 143 let mut tx = pool.begin().await.context("failed to begin transaction")?; 144 145 let now = Utc::now(); 146 sqlx::query( 147 "INSERT OR REPLACE INTO verification_method_cache (did, multikey, updated_at) VALUES (?, ?, ?)", 148 ) 149 .bind(did) 150 .bind(multikey) 151 .bind(now) 152 .execute(tx.as_mut()) 153 .await.context("failed to update verification method cache")?; 154 155 tx.commit().await.context("failed to commit transaction") 156} 157 158pub async fn verification_method_cleanup(pool: &StoragePool) -> Result<()> { 159 let mut tx = pool.begin().await.context("failed to begin transaction")?; 160 161 let now = Utc::now(); 162 let seven_days_ago = now - Duration::days(7); 163 sqlx::query("DELETE FROM verification_method_cache WHERE updated_at < ?") 164 .bind(seven_days_ago) 165 .execute(tx.as_mut()) 166 .await 167 .context("failed to delete old verification method cache records")?; 168 169 tx.commit().await.context("failed to commit transaction") 170} 171 172pub async fn verification_method_get(pool: &StoragePool, did: &str) -> Result<Option<String>> { 173 let mut tx = pool.begin().await.context("failed to begin transaction")?; 174 175 let result = sqlx::query_scalar::<_, String>( 176 "SELECT multikey FROM verification_method_cache WHERE did = ?", 177 ) 178 .bind(did) 179 .fetch_optional(tx.as_mut()) 180 .await 181 .context("failed to select verification method cache record")?; 182 tx.commit().await.context("failed to commit transaction")?; 183 Ok(result) 184} 185 186pub async fn feed_content_truncate_oldest(pool: &StoragePool, age: DateTime<Utc>) -> Result<()> { 187 let mut tx = pool.begin().await.context("failed to begin transaction")?; 188 189 // TODO: This might need an index. 190 sqlx::query("DELETE FROM feed_content WHERE updated_at < ?") 191 .bind(age) 192 .execute(tx.as_mut()) 193 .await 194 .context("failed to delete feed content beyond mark")?; 195 196 tx.commit().await.context("failed to commit transaction") 197} 198 199pub async fn denylist_upsert(pool: &StoragePool, subject: &str, reason: &str) -> Result<()> { 200 let mut tx = pool.begin().await.context("failed to begin transaction")?; 201 202 let now = Utc::now(); 203 sqlx::query("INSERT OR REPLACE INTO denylist (subject, reason, updated_at) VALUES (?, ?, ?)") 204 .bind(subject) 205 .bind(reason) 206 .bind(now) 207 .execute(tx.as_mut()) 208 .await 209 .context("failed to upsert denylist record")?; 210 211 tx.commit().await.context("failed to commit transaction") 212} 213 214pub async fn denylist_remove(pool: &StoragePool, subject: &str) -> Result<()> { 215 let mut tx = pool.begin().await.context("failed to begin transaction")?; 216 217 sqlx::query("DELETE FROM denylist WHERE subject = ?") 218 .bind(subject) 219 .execute(tx.as_mut()) 220 .await 221 .context("failed to delete denylist record")?; 222 223 tx.commit().await.context("failed to commit transaction") 224} 225 226pub async fn feed_content_purge_aturi( 227 pool: &StoragePool, 228 aturi: &str, 229 feed: &Option<String>, 230) -> Result<()> { 231 let mut tx = pool.begin().await.context("failed to begin transaction")?; 232 233 if let Some(feed) = feed { 234 sqlx::query("DELETE FROM feed_content WHERE feed_id = ? AND uri = ?") 235 .bind(feed) 236 .bind(aturi) 237 .execute(tx.as_mut()) 238 .await 239 .context("failed to delete denylist record")?; 240 } else { 241 sqlx::query("DELETE FROM feed_content WHERE uri = ?") 242 .bind(aturi) 243 .execute(tx.as_mut()) 244 .await 245 .context("failed to delete denylist record")?; 246 } 247 248 tx.commit().await.context("failed to commit transaction") 249} 250 251pub async fn denylist_exists(pool: &StoragePool, subjects: &[&str]) -> Result<bool> { 252 let mut tx = pool.begin().await.context("failed to begin transaction")?; 253 254 let mut query_builder: QueryBuilder<Sqlite> = 255 QueryBuilder::new("SELECT COUNT(*) FROM denylist WHERE subject IN ("); 256 let mut separated = query_builder.separated(", "); 257 for subject in subjects { 258 separated.push_bind(subject); 259 } 260 separated.push_unseparated(") "); 261 262 let mut query = sqlx::query_scalar::<_, i64>(query_builder.build().sql()); 263 for subject in subjects { 264 query = query.bind(subject); 265 } 266 let count = query 267 .fetch_one(tx.as_mut()) 268 .await 269 .context("failed to delete denylist record")?; 270 271 tx.commit().await.context("failed to commit transaction")?; 272 273 Ok(count > 0) 274} 275 276#[cfg(test)] 277mod tests { 278 use sqlx::SqlitePool; 279 280 #[sqlx::test] 281 async fn record_feed_content(pool: SqlitePool) -> sqlx::Result<()> { 282 let record = super::model::FeedContent { 283 feed_id: "feed".to_string(), 284 uri: "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 285 .to_string(), 286 indexed_at: 1730673934229172_i64, 287 score: 1, 288 }; 289 super::feed_content_upsert(&pool, &record) 290 .await 291 .expect("failed to insert record"); 292 293 let records = super::feed_content_cached(&pool, "feed", 5) 294 .await 295 .expect("failed to paginate records"); 296 297 assert_eq!(records.len(), 1); 298 assert_eq!(records[0].feed_id, "feed"); 299 assert_eq!( 300 records[0].uri, 301 "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 302 ); 303 assert_eq!(records[0].indexed_at, 1730673934229172_i64); 304 305 Ok(()) 306 } 307 308 #[sqlx::test] 309 async fn consumer_control(pool: SqlitePool) -> sqlx::Result<()> { 310 super::consumer_control_insert(&pool, "foo", 1730673934229172_i64) 311 .await 312 .expect("failed to insert record"); 313 314 assert_eq!( 315 super::consumer_control_get(&pool, "foo") 316 .await 317 .expect("failed to get record"), 318 Some(1730673934229172_i64) 319 ); 320 321 super::consumer_control_insert(&pool, "foo", 1730673934229173_i64) 322 .await 323 .expect("failed to insert record"); 324 325 assert_eq!( 326 super::consumer_control_get(&pool, "foo") 327 .await 328 .expect("failed to get record"), 329 Some(1730673934229173_i64) 330 ); 331 332 Ok(()) 333 } 334}