this repo has no description

feature: feed caching and restoration

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+41 -1
+1
Cargo.toml
··· 43 reqwest = { version = "0.12.9", features = ["json", "zstd", "rustls-tls"] } 44 rhai = { version = "1.20.0", features = ["serde", "std", "sync"]} 45 duration-str = "0.11.2"
··· 43 reqwest = { version = "0.12.9", features = ["json", "zstd", "rustls-tls"] } 44 rhai = { version = "1.20.0", features = ["serde", "std", "sync"]} 45 duration-str = "0.11.2" 46 + fnv_rs = "0.4.3"
+36 -1
src/cache.rs
··· 1 use anyhow::Result; 2 use chrono::Utc; 3 - use std::{collections::HashMap, sync::Arc}; 4 use tokio::sync::RwLock; 5 use tokio_util::sync::CancellationToken; 6 ··· 103 let sleeper = tokio::time::sleep(interval); 104 tokio::pin!(sleeper); 105 106 loop { 107 tokio::select! { 108 () = self.cancellation_token.cancelled() => { ··· 122 Ok(()) 123 } 124 125 pub async fn main(&self) -> Result<()> { 126 for feed in &self.config.feeds.feeds { 127 let query = feed.query.clone(); ··· 150 let posts = feed_content_cached(&self.pool, feed_uri, limit).await?; 151 let posts = posts.iter().map(|post| post.uri.clone()).collect(); 152 self.cache.update_feed(feed_uri, &posts).await; 153 Ok(()) 154 } 155 ··· 173 let sorted_posts = scored_posts.iter().map(|post| post.1.clone()).collect(); 174 175 self.cache.update_feed(feed_uri, &sorted_posts).await; 176 Ok(()) 177 } 178 }
··· 1 use anyhow::Result; 2 use chrono::Utc; 3 + use fnv_rs::{Fnv64, FnvHasher}; 4 + use std::{collections::HashMap, path::PathBuf, sync::Arc}; 5 use tokio::sync::RwLock; 6 use tokio_util::sync::CancellationToken; 7 ··· 104 let sleeper = tokio::time::sleep(interval); 105 tokio::pin!(sleeper); 106 107 + self.load_cache().await?; 108 + 109 loop { 110 tokio::select! { 111 () = self.cancellation_token.cancelled() => { ··· 125 Ok(()) 126 } 127 128 + async fn load_cache(&self) -> Result<()> { 129 + if self.config.feed_cache_dir.is_empty() { 130 + return Ok(()); 131 + } 132 + 133 + for feed in &self.config.feeds.feeds { 134 + let hash = Fnv64::hash(feed.uri.as_bytes()); 135 + let cache_file = 136 + PathBuf::from(&self.config.feed_cache_dir).join(format!("{}.json", hash)); 137 + 138 + if let Ok(posts) = std::fs::read_to_string(&cache_file) { 139 + let posts: Vec<String> = serde_json::from_str(&posts)?; 140 + self.cache.update_feed(&feed.uri, &posts).await; 141 + } 142 + } 143 + Ok(()) 144 + } 145 + 146 + async fn write_cache(&self, feed_id: &str, posts: &Vec<String>) -> Result<()> { 147 + if self.config.feed_cache_dir.is_empty() { 148 + return Ok(()); 149 + } 150 + let hash = Fnv64::hash(feed_id.as_bytes()); 151 + let cache_file = PathBuf::from(&self.config.feed_cache_dir).join(format!("{}.json", hash)); 152 + 153 + let posts = serde_json::to_string(&posts)?; 154 + std::fs::write(&cache_file, posts)?; 155 + Ok(()) 156 + } 157 + 158 pub async fn main(&self) -> Result<()> { 159 for feed in &self.config.feeds.feeds { 160 let query = feed.query.clone(); ··· 183 let posts = feed_content_cached(&self.pool, feed_uri, limit).await?; 184 let posts = posts.iter().map(|post| post.uri.clone()).collect(); 185 self.cache.update_feed(feed_uri, &posts).await; 186 + self.write_cache(feed_uri, &posts).await?; 187 Ok(()) 188 } 189 ··· 207 let sorted_posts = scored_posts.iter().map(|post| post.1.clone()).collect(); 208 209 self.cache.update_feed(feed_uri, &sorted_posts).await; 210 + self.write_cache(feed_uri, &sorted_posts).await?; 211 Ok(()) 212 } 213 }
+4
src/config.rs
··· 129 pub feeds: Feeds, 130 pub compression: Compression, 131 pub collections: Collections, 132 } 133 134 impl Config { ··· 184 let collections: Collections = 185 default_env("COLLECTIONS", "app.bsky.feed.post").try_into()?; 186 187 Ok(Self { 188 version: version()?, 189 http_port, ··· 204 feeds, 205 compression, 206 collections, 207 }) 208 } 209 }
··· 129 pub feeds: Feeds, 130 pub compression: Compression, 131 pub collections: Collections, 132 + pub feed_cache_dir: String, 133 } 134 135 impl Config { ··· 185 let collections: Collections = 186 default_env("COLLECTIONS", "app.bsky.feed.post").try_into()?; 187 188 + let feed_cache_dir = optional_env("FEED_CACHE_DIR"); 189 + 190 Ok(Self { 191 version: version()?, 192 http_port, ··· 207 feeds, 208 compression, 209 collections, 210 + feed_cache_dir, 211 }) 212 } 213 }