···11-cargo-features = ["edition2024"] # For rust-analyzer to work
22-31[package]
42name = "audquotes"
53version = "0.1.0"
···8697[dependencies]
108bsky-sdk = "0.1.16"
99+chrono = "0.4.42"
1010+cron-lite = { version = "0.3.0", features = ["async"] }
1111+futures = "0.3.31"
1112glob = "0.3.2"
1213grep = "0.3.2"
1414+kameo = "0.17.2"
1315rand = "0.9.0"
1416redis = { version = "0.29.1", features = ["aio", "connection-manager", "tokio-comp"] }
1517tokio = { version = "1.44.0", features = ["full"] }
1616-tokio-cron-scheduler = "0.13.0"
1818+tokio-cron-scheduler = "0.15.1"
+24
src/data.rs
···11+/// A newtype over [String] to represent a single quote.
22+/// [Quote] does not implement [PartialEq] or [Eq] as, on principle, two
33+/// quotes may be comprised of the same contents whilst still
44+/// representing distinct quotes in practice (e.g. two different lines of dialogue which happen to be the same).
55+#[derive(Debug, Clone)]
66+pub struct Quote(String);
77+88+impl<S: AsRef<str>> From<S> for Quote {
99+ fn from(value: S) -> Self {
1010+ Self(value.as_ref().to_owned())
1111+ }
1212+}
1313+1414+impl From<Quote> for String {
1515+ fn from(value: Quote) -> Self {
1616+ value.0
1717+ }
1818+}
1919+2020+impl Quote {
2121+ pub fn get(&self) -> &str {
2222+ &self.0
2323+ }
2424+}
+94
src/lib.rs
···11+pub mod data;
22+pub mod sink;
33+pub mod storage;
44+55+pub mod run {
66+ use std::time::Duration;
77+88+ use crate::sink::{BskySink, PostQuote, SinkManager, StdoutSink};
99+ use crate::storage::{
1010+ FetchQuote, QuoteCycle, queue::MemoryQueueStorage, source::FsFilterSourceManager,
1111+ };
1212+ use cron_lite::CronEvent;
1313+ use futures::StreamExt;
1414+ use kameo::prelude::*;
1515+ use tokio::time::timeout;
1616+1717+ pub async fn entrypoint() -> Result<(), Box<dyn std::error::Error>> {
1818+ // TODO: Clean up this function's internals.
1919+ // The current structure is alright, but it was stitched together
2020+ // quickly just to confirm that everything is functioning as it should.
2121+ let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1";
2222+ let bsky = if use_bsky {
2323+ Some(BskySink::spawn(
2424+ BskySink::new_session(
2525+ std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"),
2626+ std::env::var("BLUESKY_PASSWORD")
2727+ .expect("Bluesky application password not supplied"),
2828+ )
2929+ .await
3030+ .expect("Could not connect to Bluesky with supplied credentials"),
3131+ ))
3232+ } else {
3333+ None
3434+ };
3535+3636+ let sink = {
3737+ let stdout = StdoutSink::spawn(StdoutSink);
3838+ SinkManager::spawn(SinkManager::new(Some(stdout), bsky))
3939+ };
4040+4141+ let cycle = {
4242+ let source = FsFilterSourceManager::spawn(FsFilterSourceManager::default());
4343+ let queue = MemoryQueueStorage::spawn(MemoryQueueStorage::new());
4444+4545+ QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue))
4646+ };
4747+4848+ use cron_lite::Schedule;
4949+ const POSTING_TIMEOUT: Duration = Duration::from_secs(60);
5050+ const POSTING_INTERVAL: &str = "*/10 * * * * * *";
5151+ let schedule =
5252+ Schedule::new(POSTING_INTERVAL).expect("Schedule should be a valid cron expression");
5353+ let now = chrono::Utc::now();
5454+5555+ let mut tick_stream = schedule.stream(&now);
5656+5757+ while let Some(tick) = tick_stream.next().await {
5858+ if let CronEvent::Missed(missed_at) = tick {
5959+ eprintln!(
6060+ "Missed event tick at {}. Current time: {}. Skipping post.",
6161+ missed_at,
6262+ chrono::Utc::now()
6363+ );
6464+ continue;
6565+ }
6666+6767+ // We store the code to perform the next posting iteration as one atomic future which we wrap with a timeout.
6868+ // This means that, if we miss a posting window due to the timeout, we will not get multiple consecutive or late posts.
6969+ let next_post_iteration = async || -> Result<(), Box<dyn std::error::Error>> {
7070+ let next_quote = cycle
7171+ .ask(FetchQuote)
7272+ .await
7373+ .map_err(|_| "fetch quote should always succeed")?;
7474+7575+ // Note: By using `tell`, we don't know when each sink's code will have completed.
7676+ // If any sink uses, say, a file or stdout, that resource may well be contested between
7777+ // consecutive iterations of this loop.
7878+ sink.tell(PostQuote(next_quote)).await?;
7979+ println!();
8080+8181+ Ok(())
8282+ };
8383+8484+ if let Err(e) = timeout(POSTING_TIMEOUT, next_post_iteration()).await {
8585+ eprintln!(
8686+ "Could not submit post in time to all sinks. Timeout error: {}",
8787+ e
8888+ );
8989+ }
9090+ }
9191+9292+ Ok(())
9393+ }
9494+}
+1-310
src/main.rs
···11-use bsky_sdk::api::app::bsky::feed::post;
22-use bsky_sdk::api::types::string::Datetime;
33-use bsky_sdk::{BskyAgent, api::types::Object};
44-55-use glob::glob;
66-use grep::{regex, searcher::sinks};
77-use rand::seq::SliceRandom;
88-use redis::aio::ConnectionManagerConfig;
99-1010-use std::{sync::Arc, time::Duration};
1111-use tokio_cron_scheduler::{Job, JobScheduler};
1212-1313-use redis::AsyncCommands;
1414-1515-const DEFAULT_QUEUE: &str = "queue:default";
1616-const EVENT_QUEUE: &str = "queue:event";
1717-1818-// See https://cron.help for what these strings mean
1919-const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *";
2020-const POSTING_INTERVAL_DEBUG: &str = "1/10 * * * * *";
2121-const EVENT_UPDATE_INTERVAL: &str = "55 23 * * *";
2222-2323-const POSTING_RETRIES: i32 = 5;
2424-2525-fn prepare_post<I: Into<String>>(text: I) -> post::RecordData {
2626- post::RecordData {
2727- text: text.into(),
2828- created_at: Datetime::now(),
2929- embed: None,
3030- entities: None,
3131- facets: None,
3232- labels: None,
3333- langs: None,
3434- reply: None,
3535- tags: None,
3636- }
3737-}
3838-3939-#[derive(Clone, Debug)]
4040-struct QuoteFilter {
4141- path: String,
4242- content: String,
4343- dates: Vec<String>,
4444-}
4545-4646-impl QuoteFilter {
4747- pub async fn get_quote(
4848- &self,
4949- mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone,
5050- ) -> Result<String, ()> {
5151- // 1: Attempt to read from the event (priority) queue
5252- let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok();
5353- if let Some(quote) = event_quote {
5454- return Ok(quote);
5555- }
5656-5757- // 2: Otherwise, we read from the regular queue, repopulating it if it's empty
5858- self.reshuffle_quotes(con.clone(), DEFAULT_QUEUE).await?;
5959- con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ())
6060- }
6161-6262- async fn reshuffle_quotes(
6363- &self,
6464- mut con: impl redis::aio::ConnectionLike + AsyncCommands,
6565- output_queue: &str,
6666- ) -> Result<(), ()> {
6767- let len: u64 = con.llen(output_queue).await.map_err(|_| ())?;
6868- // NOTE: The following assumes the queue hasn't been repopulated by any other client
6969- // in-between the call to llen and the execution of the pipeline.
7070- // Hopefully won't be a problem :)
7171- if len == 0 {
7272- let mut file_contents = self.read_files();
7373-7474- {
7575- let mut rand = rand::rng();
7676- file_contents.shuffle(&mut rand);
7777- }
7878-7979- let mut pipeline = redis::pipe();
8080- for file_contents in file_contents.into_iter() {
8181- pipeline.lpush(output_queue, file_contents.as_str());
8282- }
8383- let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?;
8484- }
8585-8686- Ok(())
8787- }
8888-8989- fn read_files(&self) -> Vec<String> {
9090- let matcher = regex::RegexMatcher::new(&self.content).unwrap();
9191- let mut searcher = grep::searcher::Searcher::new();
9292- let mut results = Vec::new();
9393-9494- for file in glob(&self.path).unwrap() {
9595- let file = match file {
9696- Ok(file) => file,
9797- Err(_) => continue,
9898- };
9999-100100- let mut matched = false;
101101- let sink = sinks::Lossy(|_lnum, _line| {
102102- matched = true;
103103- Ok(false)
104104- });
105105-106106- let search_result = searcher.search_path(&matcher, &file, sink);
107107- if !matched || search_result.is_err() {
108108- continue;
109109- }
110110-111111- let contents = std::fs::read_to_string(file).unwrap();
112112- results.push(contents.trim().to_string());
113113- }
114114-115115- results
116116- }
117117-}
118118-119119-#[derive(Clone)]
120120-struct RedisState {
121121- con_manager: redis::aio::ConnectionManager,
122122-}
123123-124124-impl RedisState {
125125- pub async fn new(url: String) -> Result<Self, ()> {
126126- let redis = redis::Client::open(url).map_err(|_| ())?;
127127- let config = ConnectionManagerConfig::new()
128128- .set_response_timeout(std::time::Duration::from_secs(10))
129129- .set_number_of_retries(3);
130130- let con_manager = redis::aio::ConnectionManager::new_with_config(redis, config)
131131- .await
132132- .map_err(|_| ())?;
133133-134134- Ok(RedisState { con_manager })
135135- }
136136-137137- pub async fn fetch_quote(&self, filter: &QuoteFilter) -> Result<String, ()> {
138138- loop {
139139- match filter.get_quote(self.con_manager.clone()).await {
140140- Ok(text) => return Ok(text),
141141- Err(_) => eprintln!("Error fetching quote from redis storage. Retrying..."),
142142- };
143143- }
144144- }
145145-}
146146-147147-#[derive(Clone)]
148148-struct BlueskyState {
149149- bsky_agent: BskyAgent,
150150- bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>,
151151-}
152152-153153-impl BlueskyState {
154154- pub async fn new_session(username: String, password: String) -> Result<Self, ()> {
155155- let agent = BskyAgent::builder().build().await.map_err(|_| ())?;
156156- let session = agent.login(username, password).await.map_err(|_| ())?;
157157-158158- Ok(Self {
159159- bsky_agent: agent,
160160- bsky_session: session,
161161- })
162162- }
163163-164164- pub async fn submit_post(self, post: String) -> Result<(), ()> {
165165- let post = prepare_post(post.as_str());
166166-167167- for current_try in 0..POSTING_RETRIES {
168168- if let Err(e) = self.bsky_agent.create_record(post.clone()).await {
169169- eprintln!("Could not post quote: `{e}`");
170170- eprintln!("Attempting to refresh login...");
171171-172172- if let Err(e) = self
173173- .bsky_agent
174174- .resume_session(self.bsky_session.clone())
175175- .await
176176- {
177177- eprintln!("Failed to resume sessions due to following error: {e}")
178178- }
179179- } else {
180180- if current_try > 0 {
181181- eprintln!("Successfully posted quote on retry #{current_try}");
182182- }
183183- return Ok(());
184184- }
185185- }
186186-187187- Err(())
188188- }
189189-}
190190-191191-#[derive(Clone)]
192192-struct State {
193193- redis: RedisState,
194194- bsky_session: Option<BlueskyState>,
195195-}
196196-197197-impl State {
198198- pub fn redis(&self) -> &RedisState {
199199- &self.redis
200200- }
201201-202202- pub fn bsky(&self) -> Option<&BlueskyState> {
203203- self.bsky_session.as_ref()
204204- }
205205-}
206206-2071#[tokio::main]
2082async fn main() -> Result<(), Box<dyn std::error::Error>> {
209209- let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1";
210210- let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1";
211211-212212- let redis_state =
213213- RedisState::new(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string()))
214214- .await
215215- .expect("Initial redis connection failure");
216216- let bsky_state = if use_bsky {
217217- Some(
218218- BlueskyState::new_session(
219219- std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"),
220220- std::env::var("BLUESKY_PASSWORD")
221221- .expect("Bluesky application password not supplied"),
222222- )
223223- .await
224224- .expect("Could not connect to Bluesky with supplied credentials"),
225225- )
226226- } else {
227227- None
228228- };
229229-230230- let app_state = Arc::new(State {
231231- redis: redis_state,
232232- bsky_session: bsky_state,
233233- });
234234-235235- let sched = JobScheduler::new().await?;
236236-237237- /*
238238- let event_filter = Arc::new(QuoteFilter {
239239- content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(),
240240- path: "test/**/
241241-*.txt".to_string(),
242242- dates: vec![],
243243- });
244244- */
245245-246246- let regular_filter = Arc::new(QuoteFilter {
247247- content: r".*".to_string(),
248248- path: if !debug_mode {
249249- "quotes/**/*.txt".to_string()
250250- } else {
251251- "test/**/*.txt".to_string()
252252- },
253253- dates: vec![],
254254- });
255255-256256- let posting_interval = if !debug_mode {
257257- POSTING_INTERVAL_CRON
258258- } else {
259259- POSTING_INTERVAL_DEBUG
260260- };
261261-262262- let post_job = Job::new_async(posting_interval, move |_uuid, _| {
263263- let filter = regular_filter.clone();
264264- let app_state = app_state.clone();
265265-266266- Box::pin(async move {
267267- // We try fetching a new quote from our redis storage until we succeed
268268- let text = match app_state.redis().fetch_quote(&filter).await {
269269- Ok(text) => text,
270270- Err(_) => {
271271- eprintln!("Error fetching quote from redis storage.");
272272- return;
273273- }
274274- };
275275-276276- if let Some(bsky) = app_state.bsky() {
277277- if let Err(_) = bsky.clone().submit_post(text).await {
278278- eprintln!("Error posting to bluesky.");
279279- return;
280280- }
281281- } else {
282282- // Let's just print the quote!
283283- println!("{}\n", text);
284284- }
285285- })
286286- })?;
287287-288288- // Add async job
289289- sched.add(post_job).await?;
290290-291291- // sched
292292- // .add(Job::new_async(EVENT_UPDATE_INTERVAL, move |_uuid, _| {
293293- // let filter = event_filter.clone();
294294- // let con = con_event_monitor.clone();
295295- // let _agent = agent_event_monitor.clone(); // Can be used later to e.g. update profile
296296-297297- // Box::pin(async move {
298298- // // For testing purposes, let's always upload events
299299- // reshuffle_quotes(&filter, con.clone(), EVENT_QUEUE)
300300- // .await
301301- // .unwrap();
302302- // })
303303- // })?)
304304- // .await?;
305305-306306- sched
307307- .start()
308308- .await
309309- .expect("Error starting tokio scheduler. Shutting down...");
310310- loop {
311311- tokio::time::sleep(Duration::from_secs(10)).await;
312312- }
33+ audquotes::run::entrypoint().await
3134}
+192
src/sink.rs
···11+use crate::data::Quote;
22+use bsky_sdk::{BskyAgent, api::types::Object};
33+use kameo::prelude::*;
44+55+/// A newtype over [Quote] used to prompt the [SinkManager] to
66+/// submit a new quote to all its configured sinks.
77+#[derive(Debug, Clone)]
88+pub struct PostQuote(pub Quote);
99+1010+/// Error type for internal communication between
1111+/// the [SinkManager] and its sinks.
1212+/// The error reporting performed internally does not necessarily match the
1313+/// behavior which other modules will observe.
1414+#[derive(Debug, Clone)]
1515+pub enum PostFailure {
1616+ /// Indicates that a given quote could not be posted to a sink,
1717+ /// but that it *may* be retried. The `reinitialize` boolean signals
1818+ /// whether the sink should be reinitialized before further attempts.
1919+ Retry { reinitialize: bool },
2020+2121+ /// Indicates that a given quote could not be posted to a sink,
2222+ /// as it is unsupported by it in some way (e.g. quote exceeds the sink's length limit).
2323+ Unsupported,
2424+2525+ /// Indicates that a given quote could not be posted to a sink
2626+ /// due to the occurrence of some unrecoverable error.
2727+ /// It is thus unlikely that the sink will work in the future, even if
2828+ /// reinitialized.
2929+ Unrecoverable,
3030+}
3131+3232+pub type PostResult = Result<(), PostFailure>;
3333+3434+/// Represents internal implementation details of the interactions between
3535+/// the [SinkManager] and its sinks.
3636+pub trait QuoteSink: Actor + Message<PostQuote, Reply = PostResult> {}
3737+3838+/// A [QuoteSink] which will output the contents of each quote
3939+/// over Stdout. Is primarily meant for testing and observing sink behavior.
4040+#[derive(Actor)]
4141+pub struct StdoutSink;
4242+4343+impl Message<PostQuote> for StdoutSink {
4444+ type Reply = PostResult;
4545+4646+ async fn handle(
4747+ &mut self,
4848+ PostQuote(quote): PostQuote,
4949+ _ctx: &mut Context<Self, Self::Reply>,
5050+ ) -> Self::Reply {
5151+ println!("{}", quote.get());
5252+ Ok(())
5353+ }
5454+}
5555+5656+/// A [QuoteSink] which will post the contents of each quote to Bluesky.
5757+#[derive(Actor)]
5858+pub struct BskySink {
5959+ bsky_agent: BskyAgent,
6060+ bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>,
6161+}
6262+6363+impl BskySink {
6464+ pub async fn new_session(username: String, password: String) -> Result<Self, ()> {
6565+ let agent = BskyAgent::builder().build().await.map_err(|_| ())?;
6666+ let session = agent.login(username, password).await.map_err(|_| ())?;
6767+6868+ Ok(Self {
6969+ bsky_agent: agent,
7070+ bsky_session: session,
7171+ })
7272+ }
7373+7474+ async fn submit_post(&mut self, quote: Quote) -> Result<(), ()> {
7575+ let post = bsky_sdk::api::app::bsky::feed::post::RecordData {
7676+ text: quote.into(),
7777+ created_at: bsky_sdk::api::types::string::Datetime::now(),
7878+ embed: None,
7979+ entities: None,
8080+ facets: None,
8181+ labels: None,
8282+ langs: None,
8383+ reply: None,
8484+ tags: None,
8585+ };
8686+8787+ if let Err(e) = self
8888+ .bsky_agent
8989+ .resume_session(self.bsky_session.clone())
9090+ .await
9191+ {
9292+ eprintln!("Failed to resume sessions due to following error: {e}");
9393+ return Err(());
9494+ }
9595+9696+ match self.bsky_agent.create_record(post.clone()).await {
9797+ Ok(_) => Ok(()),
9898+ Err(_) => Err(()),
9999+ }
100100+ }
101101+}
102102+103103+impl Message<PostQuote> for BskySink {
104104+ type Reply = PostResult;
105105+106106+ async fn handle(
107107+ &mut self,
108108+ PostQuote(quote): PostQuote,
109109+ _ctx: &mut Context<Self, Self::Reply>,
110110+ ) -> Self::Reply {
111111+ match self.submit_post(quote).await {
112112+ Ok(_) => Ok(()),
113113+ Err(_) => Err(PostFailure::Unrecoverable),
114114+ }
115115+ }
116116+}
117117+118118+/// Supervises all [QuoteSink] actors within the program, forwarding
119119+/// [PostQuote] messages to them as they are received.
120120+/// The SinkManager will attempt to reinitialize failed sinks upon
121121+/// encountering recoverable errors.
122122+#[derive(Actor)]
123123+pub struct SinkManager {
124124+ // Uh oh. As the [Actor] trait is *not* dyn-compatible,
125125+ // and I do not own its definition, I'm fairly certain that I cannot
126126+ // do asynchronous dynamic dispatch for it here.
127127+ // I've decided I'll limit this to one sink per implementation right now.
128128+ stdout_sink: Option<ActorRef<StdoutSink>>,
129129+ bsky_sink: Option<ActorRef<BskySink>>,
130130+ // ...
131131+}
132132+133133+impl SinkManager {
134134+ pub fn new(
135135+ stdout_sink: Option<ActorRef<StdoutSink>>,
136136+ bsky_sink: Option<ActorRef<BskySink>>,
137137+ ) -> Self {
138138+ Self {
139139+ stdout_sink,
140140+ bsky_sink,
141141+ }
142142+ }
143143+}
144144+145145+pub type SinkReplies = Vec<Result<(), ()>>;
146146+147147+impl Message<PostQuote> for SinkManager {
148148+ type Reply = SinkReplies;
149149+150150+ async fn handle(
151151+ &mut self,
152152+ msg: PostQuote,
153153+ _ctx: &mut Context<Self, Self::Reply>,
154154+ ) -> Self::Reply {
155155+ use futures::future::join_all;
156156+157157+ let stdout_result = self
158158+ .stdout_sink
159159+ .as_ref()
160160+ .map(|s| s.ask(msg.clone()).into_future());
161161+162162+ let bsky_result = self
163163+ .bsky_sink
164164+ .as_ref()
165165+ .map(|s| s.ask(msg.clone()).into_future());
166166+167167+ let futures = [stdout_result, bsky_result].into_iter().flatten();
168168+ let results = join_all(futures).await;
169169+170170+ results.iter().map(|r| r.clone().or(Err(()))).collect()
171171+ }
172172+}
173173+174174+mod test {
175175+ #[tokio::test]
176176+ async fn stdout_sink() {
177177+ use super::*;
178178+179179+ let stdout = StdoutSink::spawn(StdoutSink);
180180+ let manager = SinkManager::spawn(SinkManager::new(Some(stdout), None));
181181+182182+ let messages = ["First test!", "Second test.", "Third..."];
183183+ for msg in messages {
184184+ manager.tell(PostQuote(msg.into())).await.unwrap();
185185+ tokio::time::sleep(std::time::Duration::from_secs(5)).await;
186186+ }
187187+188188+ // Hopefully we don't crash...!
189189+ // TODO: Sink that actually stores every quote it "posts"?
190190+ // Could help in verifying everything was sent correctly.
191191+ }
192192+}
+424
src/storage.rs
···11+use kameo::prelude::*;
22+33+use crate::storage::{
44+ queue::{DequeueQuote, EnqueueQuotes},
55+ source::SourceQuotes,
66+};
77+88+use crate::data::Quote;
99+1010+mod rng {
1111+ use rand::SeedableRng;
1212+1313+ pub struct PrngState {
1414+ rng: rand::rngs::SmallRng,
1515+ }
1616+1717+ impl PrngState {
1818+ pub fn from_thread_rng() -> Self {
1919+ Self {
2020+ rng: rand::rngs::SmallRng::from_rng(&mut rand::rng()),
2121+ }
2222+ }
2323+2424+ pub fn from_seed(seed: u64) -> Self {
2525+ Self {
2626+ rng: rand::rngs::SmallRng::seed_from_u64(seed),
2727+ }
2828+ }
2929+3030+ pub fn shuffle_slice<T>(&mut self, slice: &mut [T]) {
3131+ use rand::seq::SliceRandom;
3232+ slice.shuffle(&mut self.rng);
3333+ }
3434+ }
3535+3636+ mod test {
3737+ #[tokio::test]
3838+ async fn shuffle_slice() {
3939+ use super::*;
4040+4141+ let mut data = vec![1, 2, 3, 4];
4242+ let mut rng = PrngState::from_thread_rng();
4343+4444+ rng.shuffle_slice(&mut data);
4545+ println!("{:?}", data);
4646+ }
4747+ }
4848+}
4949+5050+pub mod source {
5151+ use super::*;
5252+5353+ // TODO: Should the quote source filters be
5454+ // generic over the exact manager implementation being used?
5555+ /// Message to request that a SourceManager source its quotes once again.
5656+ pub struct SourceQuotes;
5757+ pub type SourceReply = Result<Vec<Quote>, ()>;
5858+5959+ /// Subtrait of Actor which specifically
6060+ /// denotes actors that can handle all relevant source messages.
6161+ pub trait SourceManager: Actor + Message<SourceQuotes, Reply = SourceReply> {}
6262+6363+ impl<T> SourceManager for T where T: Message<SourceQuotes, Reply = SourceReply> {}
6464+6565+ /// Implementation of [`SourceManager`] which sources quotes from a Vec
6666+ /// that it holds in memory, without accessing external services.
6767+ /// Its main purpose is to be used for testing.
6868+ #[derive(Actor)]
6969+ pub struct MemorySourceManager {
7070+ quotes: Vec<Quote>,
7171+ }
7272+7373+ impl MemorySourceManager {
7474+ pub fn new(quotes: impl IntoIterator<Item = impl Into<Quote>>) -> Self {
7575+ Self {
7676+ quotes: quotes.into_iter().map(Into::into).collect(),
7777+ }
7878+ }
7979+ }
8080+8181+ impl Message<SourceQuotes> for MemorySourceManager {
8282+ type Reply = SourceReply;
8383+8484+ async fn handle(
8585+ &mut self,
8686+ _msg: SourceQuotes,
8787+ _ctx: &mut Context<Self, Self::Reply>,
8888+ ) -> Self::Reply {
8989+ // We just clone the quotes we've been holding onto since startup
9090+ Ok(self.quotes.clone())
9191+ }
9292+ }
9393+9494+ /// Uses a [QuoteFilter] to source quotes from the local filesystem
9595+ /// at the beginning of each cycle.
9696+ #[derive(Actor)]
9797+ pub struct FsFilterSourceManager {
9898+ filter: QuoteFilter,
9999+ }
100100+101101+ impl FsFilterSourceManager {
102102+ pub fn new(filter: QuoteFilter) -> Self {
103103+ Self { filter }
104104+ }
105105+ }
106106+107107+ impl Default for FsFilterSourceManager {
108108+ fn default() -> Self {
109109+ Self {
110110+ filter: QuoteFilter {
111111+ content: r".*".to_string(),
112112+ // TODO: Maybe make this a compile-time constant for debugging
113113+ path: "test/**/*.txt".to_string(),
114114+ _dates: vec![],
115115+ },
116116+ }
117117+ }
118118+ }
119119+120120+ impl Message<SourceQuotes> for FsFilterSourceManager {
121121+ type Reply = SourceReply;
122122+123123+ async fn handle(
124124+ &mut self,
125125+ _msg: SourceQuotes,
126126+ _ctx: &mut Context<Self, Self::Reply>,
127127+ ) -> Self::Reply {
128128+ self.filter.read_files()
129129+ }
130130+ }
131131+132132+ #[derive(Clone, Debug)]
133133+ pub struct QuoteFilter {
134134+ path: String,
135135+ content: String,
136136+ _dates: Vec<String>,
137137+ }
138138+139139+ impl QuoteFilter {
140140+ // TODO: actually leverage async I/O
141141+ fn read_files(&self) -> Result<Vec<Quote>, ()> {
142142+ use glob::glob;
143143+ use grep::{regex, searcher::sinks};
144144+145145+ let matcher = regex::RegexMatcher::new(&self.content).map_err(|_| ())?;
146146+ let mut searcher = grep::searcher::Searcher::new();
147147+ let mut results = Vec::new();
148148+149149+ for file in glob(&self.path).map_err(|_| ())? {
150150+ let file = match file {
151151+ Ok(file) => file,
152152+ Err(_) => continue,
153153+ };
154154+155155+ let mut matched = false;
156156+ let sink = sinks::Lossy(|_lnum, _line| {
157157+ matched = true;
158158+ Ok(false)
159159+ });
160160+161161+ let search_result = searcher.search_path(&matcher, &file, sink);
162162+ if !matched || search_result.is_err() {
163163+ continue;
164164+ }
165165+166166+ let contents = std::fs::read_to_string(file).map_err(|_| ())?;
167167+ results.push(contents.trim().into());
168168+ }
169169+170170+ Ok(results)
171171+ }
172172+ }
173173+}
174174+175175+pub mod queue {
176176+ use std::collections::VecDeque;
177177+178178+ use super::*;
179179+180180+ // Messages to interact with the quote queue
181181+ pub struct DequeueQuote;
182182+ pub type DequeueReply = Result<Option<Quote>, ()>;
183183+184184+ pub struct EnqueueQuotes(pub Vec<Quote>);
185185+ pub type EnqueueReply = Result<(), ()>;
186186+187187+ /// Subtrait of Actor which specifically
188188+ /// denotes actors that can handle all relevant queue messages.
189189+ pub trait QueueManager:
190190+ Actor
191191+ + Message<EnqueueQuotes, Reply = EnqueueReply>
192192+ + Message<DequeueQuote, Reply = DequeueReply>
193193+ {
194194+ }
195195+196196+ impl<T> QueueManager for T where
197197+ T: Message<EnqueueQuotes, Reply = EnqueueReply>
198198+ + Message<DequeueQuote, Reply = DequeueReply>
199199+ {
200200+ }
201201+202202+ /// A basic implementation of an in-memory queue of quotes.
203203+ /// Its contents are *not* persisted across application restarts, so it
204204+ /// is only suited for testing purposes.
205205+ #[derive(Actor, Default)]
206206+ pub struct MemoryQueueStorage {
207207+ quotes: VecDeque<Quote>,
208208+ }
209209+210210+ impl MemoryQueueStorage {
211211+ pub fn new() -> Self {
212212+ Self {
213213+ quotes: VecDeque::new(),
214214+ }
215215+ }
216216+ }
217217+218218+ impl Message<EnqueueQuotes> for MemoryQueueStorage {
219219+ /// We only need to signal success or failure in this instance,
220220+ /// with no added metadata in either case.
221221+ type Reply = EnqueueReply;
222222+223223+ async fn handle(
224224+ &mut self,
225225+ msg: EnqueueQuotes,
226226+ _ctx: &mut Context<Self, Self::Reply>,
227227+ ) -> Self::Reply {
228228+ for q in msg.0 {
229229+ self.quotes.push_back(q);
230230+ }
231231+232232+ Ok(())
233233+ }
234234+ }
235235+236236+ impl Message<DequeueQuote> for MemoryQueueStorage {
237237+ type Reply = DequeueReply;
238238+239239+ async fn handle(
240240+ &mut self,
241241+ _msg: DequeueQuote,
242242+ _ctx: &mut Context<Self, Self::Reply>,
243243+ ) -> Self::Reply {
244244+ // Note: this can never fail, since the quotes are stored in memory
245245+ Ok(self.quotes.pop_front())
246246+ }
247247+ }
248248+}
249249+250250+#[derive(Actor)]
251251+pub struct QuoteCycle<S: source::SourceManager, Q: queue::QueueManager> {
252252+ rng: rng::PrngState,
253253+ source_manager: ActorRef<S>,
254254+ queue_manager: ActorRef<Q>,
255255+}
256256+257257+impl<S: source::SourceManager, Q: queue::QueueManager> QuoteCycle<S, Q> {
258258+ pub fn new(
259259+ rng: rng::PrngState,
260260+ source_manager: ActorRef<S>,
261261+ queue_manager: ActorRef<Q>,
262262+ ) -> Self {
263263+ Self {
264264+ rng,
265265+ source_manager,
266266+ queue_manager,
267267+ }
268268+ }
269269+270270+ pub fn with_thread_rng(source_manager: ActorRef<S>, queue_manager: ActorRef<Q>) -> Self {
271271+ Self {
272272+ rng: rng::PrngState::from_thread_rng(),
273273+ source_manager,
274274+ queue_manager,
275275+ }
276276+ }
277277+}
278278+279279+/// A message to [QuoteCycle] to fetch one more quote from its storage.
280280+pub struct FetchQuote;
281281+282282+impl<S, Q> Message<FetchQuote> for QuoteCycle<S, Q>
283283+where
284284+ S: source::SourceManager,
285285+ Q: queue::QueueManager,
286286+{
287287+ type Reply = Result<Quote, ()>;
288288+289289+ async fn handle(
290290+ &mut self,
291291+ _msg: FetchQuote,
292292+ _ctx: &mut Context<Self, Self::Reply>,
293293+ ) -> Self::Reply {
294294+ // 1. We query our queue storage for the next quote
295295+ if let Some(next_quote) = self.queue_manager.ask(DequeueQuote).await.map_err(|_| ())? {
296296+ // if there is a quote, we simply return it and move on
297297+ return Ok(next_quote);
298298+ }
299299+300300+ // 2. Otherwise, we must repopulate the queue through our source
301301+ let mut refreshed_quotes = self
302302+ .source_manager
303303+ .ask(SourceQuotes)
304304+ .await
305305+ .map_err(|_| ())?;
306306+307307+ // 3. We shuffle the newly-sourced quotes
308308+ self.rng.shuffle_slice(&mut refreshed_quotes);
309309+ let refreshed_quotes = refreshed_quotes; // No longer mutable
310310+311311+ // TODO: Perhaps we should assert that the new quotes are non-empty?
312312+ // 4. We enqueue the newly-sourced quotes...
313313+ self.queue_manager
314314+ .ask(EnqueueQuotes(refreshed_quotes))
315315+ .await
316316+ .map_err(|_| ())?;
317317+318318+ // 5. and, finally, we return the first among them.
319319+ match self.queue_manager.ask(DequeueQuote).await {
320320+ Ok(Some(q)) => Ok(q),
321321+ Ok(None) => panic!("Newly-enqueued quotes should never be empty"),
322322+ Err(_) => Err(()),
323323+ }
324324+ }
325325+}
326326+327327+mod test {
328328+ #[tokio::test]
329329+ async fn memory_queue() {
330330+ use super::Quote;
331331+ use super::queue::*;
332332+ use kameo::prelude::*;
333333+334334+ let queue_manager = MemoryQueueStorage::spawn(MemoryQueueStorage::new());
335335+336336+ let sample_quotes = ["Test no.1", "Test no.2", "Test no.3"];
337337+338338+ queue_manager
339339+ .ask(EnqueueQuotes(
340340+ sample_quotes.iter().cloned().map(Quote::from).collect(),
341341+ ))
342342+ .await
343343+ .expect("In-memory quote queue storage should be valid for insertion");
344344+345345+ for text in sample_quotes.iter() {
346346+ assert_eq!(
347347+ *text,
348348+ queue_manager
349349+ .ask(DequeueQuote)
350350+ .await
351351+ .expect("In-memory queue storage should never panic on dequeue")
352352+ .expect("In-memory queue storage should never be initialized as empty")
353353+ .get()
354354+ );
355355+ }
356356+ }
357357+358358+ #[tokio::test]
359359+ async fn memory_source() {
360360+ use super::source::*;
361361+ use kameo::prelude::*;
362362+363363+ let sample_quotes = ["Minie", "Miney", "Moe", "and", "some", "more"];
364364+365365+ let source_manager = MemorySourceManager::spawn(MemorySourceManager::new(sample_quotes));
366366+367367+ let quotes = source_manager
368368+ .ask(SourceQuotes)
369369+ .await
370370+ .expect("In-memory quote queue storage should be valid for insertion");
371371+372372+ assert_eq!(
373373+ sample_quotes.as_slice(),
374374+ quotes
375375+ .into_iter()
376376+ // Since [Quote] doesn't implement any Equality trait,
377377+ // we map the strings into quotes instead
378378+ .map(String::from)
379379+ .collect::<Vec<_>>()
380380+ .as_slice(),
381381+ );
382382+ }
383383+384384+ #[tokio::test]
385385+ async fn memory_cycle() {
386386+ use std::{collections::HashMap, ops::AddAssign};
387387+388388+ use super::FetchQuote;
389389+ use super::QuoteCycle;
390390+ use super::queue::*;
391391+ use super::source::*;
392392+ use kameo::prelude::*;
393393+394394+ let sample_quotes = ["Minie", "Miney", "Moe"];
395395+ let cycle = {
396396+ let source = MemorySourceManager::spawn(MemorySourceManager::new(sample_quotes));
397397+ let queue = MemoryQueueStorage::spawn(MemoryQueueStorage::new());
398398+399399+ QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue))
400400+ };
401401+402402+ // We loop over `sample_quotes` twice to simulate the queue being exhausted fully, then re-sourced
403403+ // Since the `cycle` manager will shuffle the quote sequence, we will verify that each
404404+ // quote appears *exactly* `LOOPS` times throughout these iterations.
405405+ const LOOPS: usize = 3;
406406+ let mut quote_counts = HashMap::new();
407407+ for _ in 0..(sample_quotes.len() * LOOPS) {
408408+ let next_quote = cycle.ask(FetchQuote).await.unwrap();
409409+ quote_counts
410410+ .entry(next_quote.get().to_owned())
411411+ .or_insert(0)
412412+ .add_assign(1);
413413+ }
414414+415415+ let quote_counts = quote_counts; // no longer mut
416416+ assert!(
417417+ // Note: technically speaking, different quotes could contain equivalent strings,
418418+ // which would make this test fail; a "more proper" invariant check would ensure
419419+ // verify that all counts are a multiple of the amount of times `sample_quotes` was chained,
420420+ // and that the sum of all counts equals the total number of times a `FetchQuote` message was sent.
421421+ quote_counts.into_values().into_iter().all(|c| c == LOOPS)
422422+ );
423423+ }
424424+}