//! Feed cache: in-memory paginated feed content plus a background task that
//! periodically regenerates feeds and persists them to disk.
1use anyhow::Result;
2use chrono::Utc;
3use fnv_rs::{Fnv64, FnvHasher};
4use std::{collections::HashMap, path::PathBuf, sync::Arc};
5use tokio::sync::RwLock;
6use tokio_util::sync::CancellationToken;
7
8use crate::storage::{feed_content_cached, StoragePool};
9
/// Mutable cache state: the pagination size and the paginated posts per feed.
pub struct InnerCache {
    // Number of post URIs per page; feeds are chunked into pages of this size.
    page_size: u8,
    // Feed id -> pages, where each page is a `Vec` of post URIs. All pages
    // hold `page_size` entries except possibly the last (shorter) one.
    cached_feeds: HashMap<String, Vec<Vec<String>>>,
}
14
/// Cheaply cloneable handle to the shared [`InnerCache`].
///
/// Cloning bumps the `Arc`; all clones observe the same cache state, guarded
/// by an async `RwLock`.
#[derive(Clone)]
pub struct Cache {
    pub inner_cache: Arc<RwLock<InnerCache>>,
}
19
20impl Default for InnerCache {
21 fn default() -> Self {
22 Self {
23 page_size: 20,
24 cached_feeds: HashMap::new(),
25 }
26 }
27}
28
29impl Default for Cache {
30 fn default() -> Self {
31 Self {
32 inner_cache: Arc::new(RwLock::new(InnerCache::default())),
33 }
34 }
35}
36
37impl InnerCache {
38 pub(crate) fn new(page_size: u8) -> Self {
39 Self {
40 page_size,
41 cached_feeds: HashMap::new(),
42 }
43 }
44}
45
46impl Cache {
47 pub fn new(page_size: u8) -> Self {
48 Self {
49 inner_cache: Arc::new(RwLock::new(InnerCache::new(page_size))),
50 }
51 }
52
53 pub(crate) async fn get_posts(&self, feed_id: &str, page: usize) -> Option<Vec<String>> {
54 let inner = self.inner_cache.read().await;
55
56 let feed_chunks = inner.cached_feeds.get(feed_id)?;
57
58 if page > feed_chunks.len() {
59 return Some(vec![]);
60 }
61
62 feed_chunks.get(page).cloned()
63 }
64
65 #[allow(clippy::ptr_arg)]
66 pub(crate) async fn update_feed(&self, feed_id: &str, posts: &Vec<String>) {
67 let mut inner = self.inner_cache.write().await;
68
69 let chunks = posts
70 .chunks(inner.page_size.into())
71 .map(|chunk| chunk.to_vec())
72 .collect();
73
74 inner.cached_feeds.insert(feed_id.to_string(), chunks);
75 }
76}
77
/// Background task that periodically regenerates feed contents into the
/// in-memory cache and persists them to the on-disk feed cache directory.
pub struct CacheTask {
    // Storage handle used to fetch feed content.
    pub pool: StoragePool,
    // Shared in-memory cache that serves the paginated feed pages.
    pub cache: Cache,
    // Application config: feed definitions and `feed_cache_dir`.
    pub config: crate::config::Config,

    // Cooperative shutdown signal observed by `run_background`.
    pub cancellation_token: CancellationToken,
}
85
86impl CacheTask {
87 pub fn new(
88 pool: StoragePool,
89 cache: Cache,
90 config: crate::config::Config,
91 cancellation_token: CancellationToken,
92 ) -> Self {
93 Self {
94 pool,
95 cache,
96 config,
97 cancellation_token,
98 }
99 }
100
101 pub async fn run_background(&self, interval: chrono::Duration) -> Result<()> {
102 let interval = interval.to_std()?;
103
104 let sleeper = tokio::time::sleep(interval);
105 tokio::pin!(sleeper);
106
107 self.load_cache().await?;
108
109 loop {
110 tokio::select! {
111 () = self.cancellation_token.cancelled() => {
112 break;
113 },
114 () = &mut sleeper => {
115
116 if let Err(err) = self.main().await {
117 tracing::error!("CacheTask task failed: {}", err);
118 }
119
120
121 sleeper.as_mut().reset(tokio::time::Instant::now() + interval);
122 }
123 }
124 }
125 Ok(())
126 }
127
128 async fn load_cache(&self) -> Result<()> {
129 if self.config.feed_cache_dir.is_empty() {
130 return Ok(());
131 }
132
133 for feed in &self.config.feeds.feeds {
134 let hash = Fnv64::hash(feed.uri.as_bytes());
135 let cache_file =
136 PathBuf::from(&self.config.feed_cache_dir).join(format!("{}.json", hash));
137
138 if let Ok(posts) = std::fs::read_to_string(&cache_file) {
139 let posts: Vec<String> = serde_json::from_str(&posts)?;
140 self.cache.update_feed(&feed.uri, &posts).await;
141 }
142 }
143 Ok(())
144 }
145
146 async fn write_cache(&self, feed_id: &str, posts: &Vec<String>) -> Result<()> {
147 if self.config.feed_cache_dir.is_empty() {
148 return Ok(());
149 }
150 let hash = Fnv64::hash(feed_id.as_bytes());
151 let cache_file = PathBuf::from(&self.config.feed_cache_dir).join(format!("{}.json", hash));
152
153 let posts = serde_json::to_string(&posts)?;
154 std::fs::write(&cache_file, posts)?;
155 Ok(())
156 }
157
158 pub async fn main(&self) -> Result<()> {
159 for feed in &self.config.feeds.feeds {
160 let query = feed.query.clone();
161
162 match query {
163 crate::config::FeedQuery::Simple { limit } => {
164 if let Err(err) = self.generate_simple(&feed.uri, *limit.as_ref()).await {
165 tracing::error!(error = ?err, feed_uri = ?feed.uri, "failed to generate simple feed");
166 }
167 }
168 crate::config::FeedQuery::Popular { gravity, limit } => {
169 if let Err(err) = self
170 .generate_popular(&feed.uri, gravity, *limit.as_ref())
171 .await
172 {
173 tracing::error!(error = ?err, feed_uri = ?feed.uri, "failed to generate simple feed");
174 }
175 }
176 }
177 }
178
179 Ok(())
180 }
181
182 async fn generate_simple(&self, feed_uri: &str, limit: u32) -> Result<()> {
183 let posts = feed_content_cached(&self.pool, feed_uri, limit).await?;
184 let posts = posts.iter().map(|post| post.uri.clone()).collect();
185 self.cache.update_feed(feed_uri, &posts).await;
186 self.write_cache(feed_uri, &posts).await?;
187 Ok(())
188 }
189
190 async fn generate_popular(&self, feed_uri: &str, gravity: f64, limit: u32) -> Result<()> {
191 let posts = feed_content_cached(&self.pool, feed_uri, limit).await?;
192
193 let now = Utc::now().timestamp();
194 let mut scored_posts = posts
195 .iter()
196 .map(|post| {
197 let age = post.age_in_hours(now);
198
199 let score = ((post.score - 1).max(0) as f64) / ((2 + age) as f64).powf(gravity);
200
201 (score, post.uri.clone(), age)
202 })
203 .collect::<Vec<(f64, String, i64)>>();
204
205 scored_posts.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap());
206
207 let sorted_posts = scored_posts.iter().map(|post| post.1.clone()).collect();
208
209 self.cache.update_feed(feed_uri, &sorted_posts).await;
210 self.write_cache(feed_uri, &sorted_posts).await?;
211 Ok(())
212 }
213}
214
#[cfg(test)]
mod tests {

    use super::*;
    use anyhow::Result;

    /// Expected post URIs for the given index range.
    fn post_uris(range: std::ops::Range<usize>) -> Vec<String> {
        range
            .map(|value| format!("at://did:not:real/post/{}", value))
            .collect()
    }

    #[tokio::test]
    async fn record_feed_content() -> Result<()> {
        // 12 posts with page size 5 -> pages of 5, 5 and 2 posts.
        let cache = Cache::new(5);
        cache.update_feed("feed", &post_uris(0..12)).await;

        assert_eq!(cache.get_posts("feed", 0).await, Some(post_uris(0..5)));
        assert_eq!(cache.get_posts("feed", 1).await, Some(post_uris(5..10)));
        assert_eq!(cache.get_posts("feed", 2).await, Some(post_uris(10..12)));
        // Requesting the page just past the end yields no content.
        assert_eq!(cache.get_posts("feed", 3).await, None);

        Ok(())
    }
}