Source code for my personal quote bot project.
1use bsky_sdk::api::app::bsky::feed::post;
2use bsky_sdk::api::types::string::Datetime;
3use bsky_sdk::{BskyAgent, api::types::Object};
4
5use glob::glob;
6use grep::{regex, searcher::sinks};
7use rand::seq::SliceRandom;
8use redis::aio::ConnectionManagerConfig;
9
10use std::{sync::Arc, time::Duration};
11use tokio_cron_scheduler::{Job, JobScheduler};
12
13use redis::AsyncCommands;
14
/// Redis list key for the regular quote rotation.
const DEFAULT_QUEUE: &str = "queue:default";
/// Redis list key for the priority queue, which is tried before `DEFAULT_QUEUE`.
const EVENT_QUEUE: &str = "queue:event";

// Schedules are written with six fields: a leading seconds column followed by
// the usual minute / hour / day-of-month / month / day-of-week columns.
// https://cron.help explains the five trailing fields.

/// Production posting schedule: second 0 of minutes 0 and 30, every hour.
const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *";
/// Debug posting schedule: every 10 seconds, starting at second 1.
const POSTING_INTERVAL_DEBUG: &str = "1/10 * * * * *";
/// Daily event refresh at 23:55:00. Written with an explicit seconds column so
/// it uses the same six-field format as the posting schedules above.
const EVENT_UPDATE_INTERVAL: &str = "0 55 23 * * *";

/// Total number of attempts made to publish a single post (first try included).
const POSTING_RETRIES: i32 = 5;
24
25fn prepare_post<I: Into<String>>(text: I) -> post::RecordData {
26 post::RecordData {
27 text: text.into(),
28 created_at: Datetime::now(),
29 embed: None,
30 entities: None,
31 facets: None,
32 labels: None,
33 langs: None,
34 reply: None,
35 tags: None,
36 }
37}
38
39#[derive(Clone, Debug)]
40struct QuoteFilter {
41 path: String,
42 content: String,
43 dates: Vec<String>,
44}
45
46impl QuoteFilter {
47 pub async fn get_quote(
48 &self,
49 mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone,
50 ) -> Result<String, ()> {
51 // 1: Attempt to read from the event (priority) queue
52 let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok();
53 if let Some(quote) = event_quote {
54 return Ok(quote);
55 }
56
57 // 2: Otherwise, we read from the regular queue, repopulating it if it's empty
58 self.reshuffle_quotes(con.clone(), DEFAULT_QUEUE).await?;
59 con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ())
60 }
61
62 async fn reshuffle_quotes(
63 &self,
64 mut con: impl redis::aio::ConnectionLike + AsyncCommands,
65 output_queue: &str,
66 ) -> Result<(), ()> {
67 let len: u64 = con.llen(output_queue).await.map_err(|_| ())?;
68 // NOTE: The following assumes the queue hasn't been repopulated by any other client
69 // in-between the call to llen and the execution of the pipeline.
70 // Hopefully won't be a problem :)
71 if len == 0 {
72 let mut file_contents = self.read_files();
73
74 {
75 let mut rand = rand::rng();
76 file_contents.shuffle(&mut rand);
77 }
78
79 let mut pipeline = redis::pipe();
80 for file_contents in file_contents.into_iter() {
81 pipeline.lpush(output_queue, file_contents.as_str());
82 }
83 let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?;
84 }
85
86 Ok(())
87 }
88
89 fn read_files(&self) -> Vec<String> {
90 let matcher = regex::RegexMatcher::new(&self.content).unwrap();
91 let mut searcher = grep::searcher::Searcher::new();
92 let mut results = Vec::new();
93
94 for file in glob(&self.path).unwrap() {
95 let file = match file {
96 Ok(file) => file,
97 Err(_) => continue,
98 };
99
100 let mut matched = false;
101 let sink = sinks::Lossy(|_lnum, _line| {
102 matched = true;
103 Ok(false)
104 });
105
106 let search_result = searcher.search_path(&matcher, &file, sink);
107 if !matched || search_result.is_err() {
108 continue;
109 }
110
111 let contents = std::fs::read_to_string(file).unwrap();
112 results.push(contents.trim().to_string());
113 }
114
115 results
116 }
117}
118
119#[derive(Clone)]
120struct RedisState {
121 con_manager: redis::aio::ConnectionManager,
122}
123
124impl RedisState {
125 pub async fn new(url: String) -> Result<Self, ()> {
126 let redis = redis::Client::open(url).map_err(|_| ())?;
127 let config = ConnectionManagerConfig::new()
128 .set_response_timeout(std::time::Duration::from_secs(10))
129 .set_number_of_retries(3);
130 let con_manager = redis::aio::ConnectionManager::new_with_config(redis, config)
131 .await
132 .map_err(|_| ())?;
133
134 Ok(RedisState { con_manager })
135 }
136
137 pub async fn fetch_quote(&self, filter: &QuoteFilter) -> Result<String, ()> {
138 loop {
139 match filter.get_quote(self.con_manager.clone()).await {
140 Ok(text) => return Ok(text),
141 Err(_) => eprintln!("Error fetching quote from redis storage. Retrying..."),
142 };
143 }
144 }
145}
146
147#[derive(Clone)]
148struct BlueskyState {
149 bsky_agent: BskyAgent,
150 bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>,
151}
152
153impl BlueskyState {
154 pub async fn new_session(username: String, password: String) -> Result<Self, ()> {
155 let agent = BskyAgent::builder().build().await.map_err(|_| ())?;
156 let session = agent.login(username, password).await.map_err(|_| ())?;
157
158 Ok(Self {
159 bsky_agent: agent,
160 bsky_session: session,
161 })
162 }
163
164 pub async fn submit_post(self, post: String) -> Result<(), ()> {
165 let post = prepare_post(post.as_str());
166
167 for current_try in 0..POSTING_RETRIES {
168 if let Err(e) = self.bsky_agent.create_record(post.clone()).await {
169 eprintln!("Could not post quote: `{e}`");
170 eprintln!("Attempting to refresh login...");
171
172 if let Err(e) = self
173 .bsky_agent
174 .resume_session(self.bsky_session.clone())
175 .await
176 {
177 eprintln!("Failed to resume sessions due to following error: {e}")
178 }
179 } else {
180 if current_try > 0 {
181 eprintln!("Successfully posted quote on retry #{current_try}");
182 }
183 return Ok(());
184 }
185 }
186
187 Err(())
188 }
189}
190
191#[derive(Clone)]
192struct State {
193 redis: RedisState,
194 bsky_session: Option<BlueskyState>,
195}
196
197impl State {
198 pub fn redis(&self) -> &RedisState {
199 &self.redis
200 }
201
202 pub fn bsky(&self) -> Option<&BlueskyState> {
203 self.bsky_session.as_ref()
204 }
205}
206
207#[tokio::main]
208async fn main() -> Result<(), Box<dyn std::error::Error>> {
209 let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1";
210 let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1";
211
212 let redis_state =
213 RedisState::new(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string()))
214 .await
215 .expect("Initial redis connection failure");
216 let bsky_state = if use_bsky {
217 Some(
218 BlueskyState::new_session(
219 std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"),
220 std::env::var("BLUESKY_PASSWORD")
221 .expect("Bluesky application password not supplied"),
222 )
223 .await
224 .expect("Could not connect to Bluesky with supplied credentials"),
225 )
226 } else {
227 None
228 };
229
230 let app_state = Arc::new(State {
231 redis: redis_state,
232 bsky_session: bsky_state,
233 });
234
235 let sched = JobScheduler::new().await?;
236
237 /*
238 let event_filter = Arc::new(QuoteFilter {
239 content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(),
240 path: "test/**/
241*.txt".to_string(),
242 dates: vec![],
243 });
244 */
245
246 let regular_filter = Arc::new(QuoteFilter {
247 content: r".*".to_string(),
248 path: if !debug_mode {
249 "quotes/**/*.txt".to_string()
250 } else {
251 "test/**/*.txt".to_string()
252 },
253 dates: vec![],
254 });
255
256 let posting_interval = if !debug_mode {
257 POSTING_INTERVAL_CRON
258 } else {
259 POSTING_INTERVAL_DEBUG
260 };
261
262 let post_job = Job::new_async(posting_interval, move |_uuid, _| {
263 let filter = regular_filter.clone();
264 let app_state = app_state.clone();
265
266 Box::pin(async move {
267 // We try fetching a new quote from our redis storage until we succeed
268 let text = match app_state.redis().fetch_quote(&filter).await {
269 Ok(text) => text,
270 Err(_) => {
271 eprintln!("Error fetching quote from redis storage.");
272 return;
273 }
274 };
275
276 if let Some(bsky) = app_state.bsky() {
277 if let Err(_) = bsky.clone().submit_post(text).await {
278 eprintln!("Error posting to bluesky.");
279 return;
280 }
281 } else {
282 // Let's just print the quote!
283 println!("{}\n", text);
284 }
285 })
286 })?;
287
288 // Add async job
289 sched.add(post_job).await?;
290
291 // sched
292 // .add(Job::new_async(EVENT_UPDATE_INTERVAL, move |_uuid, _| {
293 // let filter = event_filter.clone();
294 // let con = con_event_monitor.clone();
295 // let _agent = agent_event_monitor.clone(); // Can be used later to e.g. update profile
296
297 // Box::pin(async move {
298 // // For testing purposes, let's always upload events
299 // reshuffle_quotes(&filter, con.clone(), EVENT_QUEUE)
300 // .await
301 // .unwrap();
302 // })
303 // })?)
304 // .await?;
305
306 sched
307 .start()
308 .await
309 .expect("Error starting tokio scheduler. Shutting down...");
310 loop {
311 tokio::time::sleep(Duration::from_secs(10)).await;
312 }
313}