this repo has no description
at main 7.3 kB view raw
1use anyhow::Result; 2use chrono::Utc; 3use fnv_rs::{Fnv64, FnvHasher}; 4use std::{collections::HashMap, path::PathBuf, sync::Arc}; 5use tokio::sync::RwLock; 6use tokio_util::sync::CancellationToken; 7 8use crate::storage::{feed_content_cached, StoragePool}; 9 10pub struct InnerCache { 11 page_size: u8, 12 cached_feeds: HashMap<String, Vec<Vec<String>>>, 13} 14 15#[derive(Clone)] 16pub struct Cache { 17 pub inner_cache: Arc<RwLock<InnerCache>>, 18} 19 20impl Default for InnerCache { 21 fn default() -> Self { 22 Self { 23 page_size: 20, 24 cached_feeds: HashMap::new(), 25 } 26 } 27} 28 29impl Default for Cache { 30 fn default() -> Self { 31 Self { 32 inner_cache: Arc::new(RwLock::new(InnerCache::default())), 33 } 34 } 35} 36 37impl InnerCache { 38 pub(crate) fn new(page_size: u8) -> Self { 39 Self { 40 page_size, 41 cached_feeds: HashMap::new(), 42 } 43 } 44} 45 46impl Cache { 47 pub fn new(page_size: u8) -> Self { 48 Self { 49 inner_cache: Arc::new(RwLock::new(InnerCache::new(page_size))), 50 } 51 } 52 53 pub(crate) async fn get_posts(&self, feed_id: &str, page: usize) -> Option<Vec<String>> { 54 let inner = self.inner_cache.read().await; 55 56 let feed_chunks = inner.cached_feeds.get(feed_id)?; 57 58 if page > feed_chunks.len() { 59 return Some(vec![]); 60 } 61 62 feed_chunks.get(page).cloned() 63 } 64 65 #[allow(clippy::ptr_arg)] 66 pub(crate) async fn update_feed(&self, feed_id: &str, posts: &Vec<String>) { 67 let mut inner = self.inner_cache.write().await; 68 69 let chunks = posts 70 .chunks(inner.page_size.into()) 71 .map(|chunk| chunk.to_vec()) 72 .collect(); 73 74 inner.cached_feeds.insert(feed_id.to_string(), chunks); 75 } 76} 77 78pub struct CacheTask { 79 pub pool: StoragePool, 80 pub cache: Cache, 81 pub config: crate::config::Config, 82 83 pub cancellation_token: CancellationToken, 84} 85 86impl CacheTask { 87 pub fn new( 88 pool: StoragePool, 89 cache: Cache, 90 config: crate::config::Config, 91 cancellation_token: CancellationToken, 92 ) -> Self { 93 Self { 94 pool, 95 cache, 96 config, 97 cancellation_token, 98 } 99 } 100 101 pub async fn run_background(&self, interval: chrono::Duration) -> Result<()> { 102 let interval = interval.to_std()?; 103 104 let sleeper = tokio::time::sleep(interval); 105 tokio::pin!(sleeper); 106 107 self.load_cache().await?; 108 109 loop { 110 tokio::select! { 111 () = self.cancellation_token.cancelled() => { 112 break; 113 }, 114 () = &mut sleeper => { 115 116 if let Err(err) = self.main().await { 117 tracing::error!("CacheTask task failed: {}", err); 118 } 119 120 121 sleeper.as_mut().reset(tokio::time::Instant::now() + interval); 122 } 123 } 124 } 125 Ok(()) 126 } 127 128 async fn load_cache(&self) -> Result<()> { 129 if self.config.feed_cache_dir.is_empty() { 130 return Ok(()); 131 } 132 133 for feed in &self.config.feeds.feeds { 134 let hash = Fnv64::hash(feed.uri.as_bytes()); 135 let cache_file = 136 PathBuf::from(&self.config.feed_cache_dir).join(format!("{}.json", hash)); 137 138 if let Ok(posts) = std::fs::read_to_string(&cache_file) { 139 let posts: Vec<String> = serde_json::from_str(&posts)?; 140 self.cache.update_feed(&feed.uri, &posts).await; 141 } 142 } 143 Ok(()) 144 } 145 146 async fn write_cache(&self, feed_id: &str, posts: &Vec<String>) -> Result<()> { 147 if self.config.feed_cache_dir.is_empty() { 148 return Ok(()); 149 } 150 let hash = Fnv64::hash(feed_id.as_bytes()); 151 let cache_file = PathBuf::from(&self.config.feed_cache_dir).join(format!("{}.json", hash)); 152 153 let posts = serde_json::to_string(&posts)?; 154 std::fs::write(&cache_file, posts)?; 155 Ok(()) 156 } 157 158 pub async fn main(&self) -> Result<()> { 159 for feed in &self.config.feeds.feeds { 160 let query = feed.query.clone(); 161 162 match query { 163 crate::config::FeedQuery::Simple { limit } => { 164 if let Err(err) = self.generate_simple(&feed.uri, *limit.as_ref()).await { 165 tracing::error!(error = ?err, feed_uri = ?feed.uri, "failed to generate simple feed"); 166 } 167 } 168 crate::config::FeedQuery::Popular { gravity, limit } => { 169 if let Err(err) = self 170 .generate_popular(&feed.uri, gravity, *limit.as_ref()) 171 .await 172 { 173 tracing::error!(error = ?err, feed_uri = ?feed.uri, "failed to generate simple feed"); 174 } 175 } 176 } 177 } 178 179 Ok(()) 180 } 181 182 async fn generate_simple(&self, feed_uri: &str, limit: u32) -> Result<()> { 183 let posts = feed_content_cached(&self.pool, feed_uri, limit).await?; 184 let posts = posts.iter().map(|post| post.uri.clone()).collect(); 185 self.cache.update_feed(feed_uri, &posts).await; 186 self.write_cache(feed_uri, &posts).await?; 187 Ok(()) 188 } 189 190 async fn generate_popular(&self, feed_uri: &str, gravity: f64, limit: u32) -> Result<()> { 191 let posts = feed_content_cached(&self.pool, feed_uri, limit).await?; 192 193 let now = Utc::now().timestamp(); 194 let mut scored_posts = posts 195 .iter() 196 .map(|post| { 197 let age = post.age_in_hours(now); 198 199 let score = ((post.score - 1).max(0) as f64) / ((2 + age) as f64).powf(gravity); 200 201 (score, post.uri.clone(), age) 202 }) 203 .collect::<Vec<(f64, String, i64)>>(); 204 205 scored_posts.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap()); 206 207 let sorted_posts = scored_posts.iter().map(|post| post.1.clone()).collect(); 208 209 self.cache.update_feed(feed_uri, &sorted_posts).await; 210 self.write_cache(feed_uri, &sorted_posts).await?; 211 Ok(()) 212 } 213} 214 215#[cfg(test)] 216mod tests { 217 218 use super::*; 219 use anyhow::Result; 220 221 #[tokio::test] 222 async fn record_feed_content() -> Result<()> { 223 let sorted_posts = (0..12) 224 .map(|value| format!("at://did:not:real/post/{}", value)) 225 .collect(); 226 227 let cache = Cache::new(5); 228 cache.update_feed("feed", &sorted_posts).await; 229 230 assert_eq!( 231 cache.get_posts("feed", 0).await, 232 Some( 233 (0..5) 234 .map(|value| format!("at://did:not:real/post/{}", value)) 235 .collect() 236 ) 237 ); 238 assert_eq!( 239 cache.get_posts("feed", 1).await, 240 Some( 241 (5..10) 242 .map(|value| format!("at://did:not:real/post/{}", value)) 243 .collect() 244 ) 245 ); 246 assert_eq!( 247 cache.get_posts("feed", 2).await, 248 Some( 249 (10..12) 250 .map(|value| format!("at://did:not:real/post/{}", value)) 251 .collect() 252 ) 253 ); 254 assert_eq!(cache.get_posts("feed", 3).await, None); 255 256 Ok(()) 257 } 258}