Source code for my personal quote bot project.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

actor: sink manager implementation.

+57 -3
+1
Cargo.lock
··· 143 143 dependencies = [ 144 144 "bsky-sdk", 145 145 "cron-lite", 146 + "futures", 146 147 "glob", 147 148 "grep", 148 149 "kameo",
+1
Cargo.toml
··· 9 9 [dependencies] 10 10 bsky-sdk = "0.1.16" 11 11 cron-lite = { version = "0.3.0", features = ["async"] } 12 + futures = "0.3.31" 12 13 glob = "0.3.2" 13 14 grep = "0.3.2" 14 15 kameo = "0.17.2"
+55 -3
src/sink.rs
··· 3 3 4 4 /// A newtype over [Quote] used to prompt the [SinkManager] to 5 5 /// submit a new quote to all its configured sinks. 6 + #[derive(Debug, Clone)] 6 7 pub struct PostQuote(pub Quote); 7 8 8 9 /// Error type for internal communication between ··· 56 57 /// The SinkManager will attempt to reinitialize failed sinks upon 57 58 /// encountering recoverable errors. 58 59 #[derive(Actor)] 59 - pub struct SinkManager<S: QuoteSink> { 60 + pub struct SinkManager { 60 61 // Uh oh. As the [Actor] trait is *not* dyn-compatible, 61 62 // and I do not own its definition, I'm fairly certain that I cannot 62 63 // do asynchronous dynamic dispatch for it here. 63 - // We will have to use multiple vectors with static dispatch, instead... 64 - sinks: Vec<ActorRef<S>>, 64 + // I've decided I'll limit this to one sink per implementation right now. 65 + stdout_sink: Option<ActorRef<StdoutSink>>, 66 + // ... 67 + } 68 + 69 + impl SinkManager { 70 + pub fn new(stdout_sink: Option<ActorRef<StdoutSink>>) -> Self { 71 + Self { stdout_sink } 72 + } 73 + } 74 + 75 + pub type SinkReplies = Vec<Result<(), ()>>; 76 + 77 + impl Message<PostQuote> for SinkManager { 78 + type Reply = SinkReplies; 79 + 80 + async fn handle( 81 + &mut self, 82 + msg: PostQuote, 83 + _ctx: &mut Context<Self, Self::Reply>, 84 + ) -> Self::Reply { 85 + use futures::future::join_all; 86 + 87 + // We'll see if this monstrosity actually works 88 + let sinks = [self.stdout_sink.clone()]; 89 + let futures = sinks 90 + .iter() 91 + .flatten() 92 + .map(|s| s.ask(msg.clone()).into_future()); 93 + let results = join_all(futures).await; 94 + 95 + results.iter().map(|r| r.clone().or(Err(()))).collect() 96 + } 97 + } 98 + 99 + mod test { 100 + #[tokio::test] 101 + async fn stdout_sink() { 102 + use super::*; 103 + 104 + let stdout = StdoutSink::spawn(StdoutSink); 105 + let manager = SinkManager::spawn(SinkManager::new(Some(stdout))); 106 + 107 + let messages = ["First test!", "Second test.", "Third..."]; 108 + for msg in messages { 109 + manager.tell(PostQuote(msg.into())).await.unwrap(); 110 + tokio::time::sleep(std::time::Duration::from_secs(5)).await; 111 + } 112 + 113 + // Hopefully we don't crash...! 114 + // TODO: Sink that actually stores every quote it "posts"? 115 + // Could help in verifying everything was sent correctly. 116 + } 65 117 }