//! Source code for my personal quote bot project.
1use bsky_sdk::api::app::bsky::feed::post; 2use bsky_sdk::api::types::string::Datetime; 3use bsky_sdk::{BskyAgent, api::types::Object}; 4 5use glob::glob; 6use grep::{regex, searcher::sinks}; 7use rand::seq::SliceRandom; 8use redis::aio::ConnectionManagerConfig; 9 10use std::{sync::Arc, time::Duration}; 11use tokio_cron_scheduler::{Job, JobScheduler}; 12 13use redis::AsyncCommands; 14 15const DEFAULT_QUEUE: &str = "queue:default"; 16const EVENT_QUEUE: &str = "queue:event"; 17 18// See https://cron.help for what these strings mean 19const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *"; 20const POSTING_INTERVAL_DEBUG: &str = "1/10 * * * * *"; 21const EVENT_UPDATE_INTERVAL: &str = "55 23 * * *"; 22 23const POSTING_RETRIES: i32 = 5; 24 25fn prepare_post<I: Into<String>>(text: I) -> post::RecordData { 26 post::RecordData { 27 text: text.into(), 28 created_at: Datetime::now(), 29 embed: None, 30 entities: None, 31 facets: None, 32 labels: None, 33 langs: None, 34 reply: None, 35 tags: None, 36 } 37} 38 39#[derive(Clone, Debug)] 40struct QuoteFilter { 41 path: String, 42 content: String, 43 dates: Vec<String>, 44} 45 46impl QuoteFilter { 47 pub async fn get_quote( 48 &self, 49 mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone, 50 ) -> Result<String, ()> { 51 // 1: Attempt to read from the event (priority) queue 52 let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok(); 53 if let Some(quote) = event_quote { 54 return Ok(quote); 55 } 56 57 // 2: Otherwise, we read from the regular queue, repopulating it if it's empty 58 self.reshuffle_quotes(con.clone(), DEFAULT_QUEUE).await?; 59 con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ()) 60 } 61 62 async fn reshuffle_quotes( 63 &self, 64 mut con: impl redis::aio::ConnectionLike + AsyncCommands, 65 output_queue: &str, 66 ) -> Result<(), ()> { 67 let len: u64 = con.llen(output_queue).await.map_err(|_| ())?; 68 // NOTE: The following assumes the queue hasn't been repopulated by any other client 69 // 
in-between the call to llen and the execution of the pipeline. 70 // Hopefully won't be a problem :) 71 if len == 0 { 72 let mut file_contents = self.read_files(); 73 74 { 75 let mut rand = rand::rng(); 76 file_contents.shuffle(&mut rand); 77 } 78 79 let mut pipeline = redis::pipe(); 80 for file_contents in file_contents.into_iter() { 81 pipeline.lpush(output_queue, file_contents.as_str()); 82 } 83 let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?; 84 } 85 86 Ok(()) 87 } 88 89 fn read_files(&self) -> Vec<String> { 90 let matcher = regex::RegexMatcher::new(&self.content).unwrap(); 91 let mut searcher = grep::searcher::Searcher::new(); 92 let mut results = Vec::new(); 93 94 for file in glob(&self.path).unwrap() { 95 let file = match file { 96 Ok(file) => file, 97 Err(_) => continue, 98 }; 99 100 let mut matched = false; 101 let sink = sinks::Lossy(|_lnum, _line| { 102 matched = true; 103 Ok(false) 104 }); 105 106 let search_result = searcher.search_path(&matcher, &file, sink); 107 if !matched || search_result.is_err() { 108 continue; 109 } 110 111 let contents = std::fs::read_to_string(file).unwrap(); 112 results.push(contents.trim().to_string()); 113 } 114 115 results 116 } 117} 118 119#[derive(Clone)] 120struct RedisState { 121 con_manager: redis::aio::ConnectionManager, 122} 123 124impl RedisState { 125 pub async fn new(url: String) -> Result<Self, ()> { 126 let redis = redis::Client::open(url).map_err(|_| ())?; 127 let config = ConnectionManagerConfig::new() 128 .set_response_timeout(std::time::Duration::from_secs(10)) 129 .set_number_of_retries(3); 130 let con_manager = redis::aio::ConnectionManager::new_with_config(redis, config) 131 .await 132 .map_err(|_| ())?; 133 134 Ok(RedisState { con_manager }) 135 } 136 137 pub async fn fetch_quote(&self, filter: &QuoteFilter) -> Result<String, ()> { 138 loop { 139 match filter.get_quote(self.con_manager.clone()).await { 140 Ok(text) => return Ok(text), 141 Err(_) => eprintln!("Error fetching quote from 
redis storage. Retrying..."), 142 }; 143 } 144 } 145} 146 147#[derive(Clone)] 148struct BlueskyState { 149 bsky_agent: BskyAgent, 150 bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>, 151} 152 153impl BlueskyState { 154 pub async fn new_session(username: String, password: String) -> Result<Self, ()> { 155 let agent = BskyAgent::builder().build().await.map_err(|_| ())?; 156 let session = agent.login(username, password).await.map_err(|_| ())?; 157 158 Ok(Self { 159 bsky_agent: agent, 160 bsky_session: session, 161 }) 162 } 163 164 pub async fn submit_post(self, post: String) -> Result<(), ()> { 165 let post = prepare_post(post.as_str()); 166 167 for current_try in 0..POSTING_RETRIES { 168 if let Err(e) = self.bsky_agent.create_record(post.clone()).await { 169 eprintln!("Could not post quote: `{e}`"); 170 eprintln!("Attempting to refresh login..."); 171 172 if let Err(e) = self 173 .bsky_agent 174 .resume_session(self.bsky_session.clone()) 175 .await 176 { 177 eprintln!("Failed to resume sessions due to following error: {e}") 178 } 179 } else { 180 if current_try > 0 { 181 eprintln!("Successfully posted quote on retry #{current_try}"); 182 } 183 return Ok(()); 184 } 185 } 186 187 Err(()) 188 } 189} 190 191#[derive(Clone)] 192struct State { 193 redis: RedisState, 194 bsky_session: Option<BlueskyState>, 195} 196 197impl State { 198 pub fn redis(&self) -> &RedisState { 199 &self.redis 200 } 201 202 pub fn bsky(&self) -> Option<&BlueskyState> { 203 self.bsky_session.as_ref() 204 } 205} 206 207#[tokio::main] 208async fn main() -> Result<(), Box<dyn std::error::Error>> { 209 let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1"; 210 let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1"; 211 212 let redis_state = 213 RedisState::new(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string())) 214 .await 215 .expect("Initial redis connection failure"); 216 let bsky_state = if use_bsky { 
217 Some( 218 BlueskyState::new_session( 219 std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"), 220 std::env::var("BLUESKY_PASSWORD") 221 .expect("Bluesky application password not supplied"), 222 ) 223 .await 224 .expect("Could not connect to Bluesky with supplied credentials"), 225 ) 226 } else { 227 None 228 }; 229 230 let app_state = Arc::new(State { 231 redis: redis_state, 232 bsky_session: bsky_state, 233 }); 234 235 let sched = JobScheduler::new().await?; 236 237 /* 238 let event_filter = Arc::new(QuoteFilter { 239 content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(), 240 path: "test/**/ 241*.txt".to_string(), 242 dates: vec![], 243 }); 244 */ 245 246 let regular_filter = Arc::new(QuoteFilter { 247 content: r".*".to_string(), 248 path: if !debug_mode { 249 "quotes/**/*.txt".to_string() 250 } else { 251 "test/**/*.txt".to_string() 252 }, 253 dates: vec![], 254 }); 255 256 let posting_interval = if !debug_mode { 257 POSTING_INTERVAL_CRON 258 } else { 259 POSTING_INTERVAL_DEBUG 260 }; 261 262 let post_job = Job::new_async(posting_interval, move |_uuid, _| { 263 let filter = regular_filter.clone(); 264 let app_state = app_state.clone(); 265 266 Box::pin(async move { 267 // We try fetching a new quote from our redis storage until we succeed 268 let text = match app_state.redis().fetch_quote(&filter).await { 269 Ok(text) => text, 270 Err(_) => { 271 eprintln!("Error fetching quote from redis storage."); 272 return; 273 } 274 }; 275 276 if let Some(bsky) = app_state.bsky() { 277 if let Err(_) = bsky.clone().submit_post(text).await { 278 eprintln!("Error posting to bluesky."); 279 return; 280 } 281 } else { 282 // Let's just print the quote! 
283 println!("{}\n", text); 284 } 285 }) 286 })?; 287 288 // Add async job 289 sched.add(post_job).await?; 290 291 // sched 292 // .add(Job::new_async(EVENT_UPDATE_INTERVAL, move |_uuid, _| { 293 // let filter = event_filter.clone(); 294 // let con = con_event_monitor.clone(); 295 // let _agent = agent_event_monitor.clone(); // Can be used later to e.g. update profile 296 297 // Box::pin(async move { 298 // // For testing purposes, let's always upload events 299 // reshuffle_quotes(&filter, con.clone(), EVENT_QUEUE) 300 // .await 301 // .unwrap(); 302 // }) 303 // })?) 304 // .await?; 305 306 sched 307 .start() 308 .await 309 .expect("Error starting tokio scheduler. Shutting down..."); 310 loop { 311 tokio::time::sleep(Duration::from_secs(10)).await; 312 } 313}