···11-cargo-features = ["edition2024"] # For rust-analyzer to work
22-31[package]
42name = "audquotes"
53version = "0.1.0"
···8697[dependencies]
108bsky-sdk = "0.1.16"
99+chrono = "0.4.42"
1110cron-lite = { version = "0.3.0", features = ["async"] }
1211futures = "0.3.31"
1312glob = "0.3.2"
+65-10
src/lib.rs
···33pub mod storage;
4455pub mod run {
66- use crate::sink::{PostQuote, SinkManager, StdoutSink};
66+ use std::time::Duration;
77+88+ use crate::sink::{BskySink, PostQuote, SinkManager, StdoutSink};
79 use crate::storage::{
810 FetchQuote, QuoteCycle, queue::MemoryQueueStorage, source::FsFilterSourceManager,
911 };
1212+ use cron_lite::CronEvent;
1313+ use futures::StreamExt;
1014 use kameo::prelude::*;
1515+ use tokio::time::timeout;
11161217 pub async fn entrypoint() -> Result<(), Box<dyn std::error::Error>> {
1318 // TODO: Clean up this function's internals.
1419 // The current structure is alright, but it was stitched together
1520 // quickly just to confirm that everything is functioning as it should.
2121+ let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1";
2222+ let bsky = if use_bsky {
2323+ Some(BskySink::spawn(
2424+ BskySink::new_session(
2525+ std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"),
2626+ std::env::var("BLUESKY_PASSWORD")
2727+ .expect("Bluesky application password not supplied"),
2828+ )
2929+ .await
3030+ .expect("Could not connect to Bluesky with supplied credentials"),
3131+ ))
3232+ } else {
3333+ None
3434+ };
16351736 let sink = {
1837 let stdout = StdoutSink::spawn(StdoutSink);
1919- SinkManager::spawn(SinkManager::new(Some(stdout)))
3838+ SinkManager::spawn(SinkManager::new(Some(stdout), bsky))
2039 };
21402241 let cycle = {
···2645 QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue))
2746 };
28472929- loop {
3030- let next_quote = cycle
3131- .ask(FetchQuote)
3232- .await
3333- .map_err(|_| "fetch quote should always succeed")?;
3434- sink.tell(PostQuote(next_quote)).await?;
3535- tokio::time::sleep(std::time::Duration::from_secs(3)).await;
3636- println!()
4848+ use cron_lite::Schedule;
4949+ const POSTING_TIMEOUT: Duration = Duration::from_secs(60);
5050+ const POSTING_INTERVAL: &str = "*/10 * * * * * *";
5151+ let schedule =
5252+ Schedule::new(POSTING_INTERVAL).expect("Schedule should be a valid cron expression");
5353+ let now = chrono::Utc::now();
5454+5555+ let mut tick_stream = schedule.stream(&now);
5656+5757+ while let Some(tick) = tick_stream.next().await {
5858+ if let CronEvent::Missed(missed_at) = tick {
5959+ eprintln!(
6060+ "Missed event tick at {}. Current time: {}. Skipping post.",
6161+ missed_at,
6262+ chrono::Utc::now()
6363+ );
6464+ continue;
6565+ }
6666+6767+ // We store the code to perform the next posting iteration as one atomic future which we wrap with a timeout.
6868+ // This means that, if we miss a posting window due to the timeout, we will not get multiple consecutive or late posts.
6969+ let next_post_iteration = async || -> Result<(), Box<dyn std::error::Error>> {
7070+ let next_quote = cycle
7171+ .ask(FetchQuote)
7272+ .await
7373+ .map_err(|_| "fetch quote should always succeed")?;
7474+7575+ // Note: By using `tell`, we don't know when each sink's code will have completed.
7676+ // If any sink uses, say, a file or stdout, that resource may well be contested between
7777+ // consecutive iterations of this loop.
7878+ sink.tell(PostQuote(next_quote)).await?;
7979+ println!();
8080+8181+ Ok(())
8282+ };
8383+8484+ if let Err(e) = timeout(POSTING_TIMEOUT, next_post_iteration()).await {
8585+ eprintln!(
8686+ "Could not submit post in time to all sinks. Timeout error: {}",
8787+ e
8888+ );
8989+ }
3790 }
9191+9292+ Ok(())
3893 }
3994}
+83-8
src/sink.rs
···11use crate::data::Quote;
22+use bsky_sdk::{BskyAgent, api::types::Object};
23use kameo::prelude::*;
3445/// A newtype over [Quote] used to prompt the [SinkManager] to
···5253 }
5354}
54555656+/// A [QuoteSink] which will post the contents of each quote to Bluesky.
5757+#[derive(Actor)]
5858+pub struct BskySink {
5959+ bsky_agent: BskyAgent,
6060+ bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>,
6161+}
6262+6363+impl BskySink {
6464+ pub async fn new_session(username: String, password: String) -> Result<Self, ()> {
6565+ let agent = BskyAgent::builder().build().await.map_err(|_| ())?;
6666+ let session = agent.login(username, password).await.map_err(|_| ())?;
6767+6868+ Ok(Self {
6969+ bsky_agent: agent,
7070+ bsky_session: session,
7171+ })
7272+ }
7373+7474+ async fn submit_post(&mut self, quote: Quote) -> Result<(), ()> {
7575+ let post = bsky_sdk::api::app::bsky::feed::post::RecordData {
7676+ text: quote.into(),
7777+ created_at: bsky_sdk::api::types::string::Datetime::now(),
7878+ embed: None,
7979+ entities: None,
8080+ facets: None,
8181+ labels: None,
8282+ langs: None,
8383+ reply: None,
8484+ tags: None,
8585+ };
8686+8787+ if let Err(e) = self
8888+ .bsky_agent
8989+ .resume_session(self.bsky_session.clone())
9090+ .await
9191+ {
9292+ eprintln!("Failed to resume sessions due to following error: {e}");
9393+ return Err(());
9494+ }
9595+9696+ match self.bsky_agent.create_record(post.clone()).await {
9797+ Ok(_) => Ok(()),
9898+ Err(_) => Err(()),
9999+ }
100100+ }
101101+}
102102+103103+impl Message<PostQuote> for BskySink {
104104+ type Reply = PostResult;
105105+106106+ async fn handle(
107107+ &mut self,
108108+ PostQuote(quote): PostQuote,
109109+ _ctx: &mut Context<Self, Self::Reply>,
110110+ ) -> Self::Reply {
111111+ match self.submit_post(quote).await {
112112+ Ok(_) => Ok(()),
113113+ Err(_) => Err(PostFailure::Unrecoverable),
114114+ }
115115+ }
116116+}
117117+55118/// Supervises all [QuoteSink] actors within the program, forwarding
56119/// [PostQuote] messages to them as they are received.
57120/// The SinkManager will attempt to reinitialize failed sinks upon
···63126 // do asynchronous dynamic dispatch for it here.
64127 // I've decided I'll limit this to one sink per implementation right now.
65128 stdout_sink: Option<ActorRef<StdoutSink>>,
129129+ bsky_sink: Option<ActorRef<BskySink>>,
66130 // ...
67131}
6813269133impl SinkManager {
7070- pub fn new(stdout_sink: Option<ActorRef<StdoutSink>>) -> Self {
7171- Self { stdout_sink }
134134+ pub fn new(
135135+ stdout_sink: Option<ActorRef<StdoutSink>>,
136136+ bsky_sink: Option<ActorRef<BskySink>>,
137137+ ) -> Self {
138138+ Self {
139139+ stdout_sink,
140140+ bsky_sink,
141141+ }
72142 }
73143}
74144···84154 ) -> Self::Reply {
85155 use futures::future::join_all;
861568787- // We'll see if this monstrosity actually works
8888- let sinks = [self.stdout_sink.clone()];
8989- let futures = sinks
9090- .iter()
9191- .flatten()
157157+ let stdout_result = self
158158+ .stdout_sink
159159+ .as_ref()
92160 .map(|s| s.ask(msg.clone()).into_future());
161161+162162+ let bsky_result = self
163163+ .bsky_sink
164164+ .as_ref()
165165+ .map(|s| s.ask(msg.clone()).into_future());
166166+167167+ let futures = [stdout_result, bsky_result].into_iter().flatten();
93168 let results = join_all(futures).await;
9416995170 results.iter().map(|r| r.clone().or(Err(()))).collect()
···102177 use super::*;
103178104179 let stdout = StdoutSink::spawn(StdoutSink);
105105- let manager = SinkManager::spawn(SinkManager::new(Some(stdout)));
180180+ let manager = SinkManager::spawn(SinkManager::new(Some(stdout), None));
106181107182 let messages = ["First test!", "Second test.", "Third..."];
108183 for msg in messages {