Source code for my personal quote bot project.

Compare changes

Choose any two refs to compare.

Changed files
+1006 -331
.tangled
workflows
src
+1 -1
.tangled/workflows/build.yml
··· 1 1 when: 2 2 - event: ["push", "manual"] 3 - branch: ["main", "ci-golf"] 3 + branch: ["main"] 4 4 5 5 engine: "nixery" 6 6
+265 -17
Cargo.lock
··· 33 33 checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" 34 34 35 35 [[package]] 36 - name = "android-tzdata" 37 - version = "0.1.1" 38 - source = "registry+https://github.com/rust-lang/crates.io-index" 39 - checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" 40 - 41 - [[package]] 42 36 name = "android_system_properties" 43 37 version = "0.1.5" 44 38 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 148 142 version = "0.1.0" 149 143 dependencies = [ 150 144 "bsky-sdk", 145 + "chrono", 146 + "cron-lite", 147 + "futures", 151 148 "glob", 152 149 "grep", 150 + "kameo", 153 151 "rand", 154 152 "redis", 155 153 "tokio", ··· 269 267 270 268 [[package]] 271 269 name = "chrono" 272 - version = "0.4.40" 270 + version = "0.4.42" 273 271 source = "registry+https://github.com/rust-lang/crates.io-index" 274 - checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" 272 + checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" 275 273 dependencies = [ 276 - "android-tzdata", 277 274 "iana-time-zone", 278 275 "js-sys", 279 276 "num-traits", ··· 283 280 ] 284 281 285 282 [[package]] 283 + name = "chrono-tz" 284 + version = "0.10.4" 285 + source = "registry+https://github.com/rust-lang/crates.io-index" 286 + checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" 287 + dependencies = [ 288 + "chrono", 289 + "phf", 290 + ] 291 + 292 + [[package]] 286 293 name = "cid" 287 294 version = "0.11.1" 288 295 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 351 358 checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" 352 359 dependencies = [ 353 360 "cfg-if", 361 + ] 362 + 363 + [[package]] 364 + name = "cron-lite" 365 + version = "0.3.0" 366 + source = "registry+https://github.com/rust-lang/crates.io-index" 367 + checksum = "7b1c9e28df18340148b754969b7b66ed3c7f1242d10f4a4840391624333b589c" 368 + dependencies = [ 369 + "chrono", 370 + "futures", 371 + "pin-project", 354 372 ] 355 373 356 374 [[package]] 357 375 name = "croner" 358 - version = "2.1.0" 376 + version = "3.0.1" 359 377 source = "registry+https://github.com/rust-lang/crates.io-index" 360 - checksum = "38fd53511eaf0b00a185613875fee58b208dfce016577d0ad4bb548e1c4fb3ee" 378 + checksum = "4aa42bcd3d846ebf66e15bd528d1087f75d1c6c1c66ebff626178a106353c576" 361 379 dependencies = [ 362 380 "chrono", 381 + "derive_builder", 382 + "strum", 363 383 ] 364 384 365 385 [[package]] ··· 387 407 checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" 388 408 389 409 [[package]] 410 + name = "darling" 411 + version = "0.20.11" 412 + source = "registry+https://github.com/rust-lang/crates.io-index" 413 + checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" 414 + dependencies = [ 415 + "darling_core", 416 + "darling_macro", 417 + ] 418 + 419 + [[package]] 420 + name = "darling_core" 421 + version = "0.20.11" 422 + source = "registry+https://github.com/rust-lang/crates.io-index" 423 + checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" 424 + dependencies = [ 425 + "fnv", 426 + "ident_case", 427 + "proc-macro2", 428 + "quote", 429 + "strsim", 430 + "syn", 431 + ] 432 + 433 + [[package]] 434 + name = "darling_macro" 435 + version = "0.20.11" 436 + source = "registry+https://github.com/rust-lang/crates.io-index" 437 + checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" 438 + dependencies = [ 439 + "darling_core", 440 + "quote", 441 + "syn", 442 + ] 443 + 444 + [[package]] 390 445 name = "dashmap" 391 446 version = "6.1.0" 392 447 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 427 482 ] 428 483 429 484 [[package]] 485 + name = "derive_builder" 486 + version = "0.20.2" 487 + source = "registry+https://github.com/rust-lang/crates.io-index" 488 + checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" 489 + dependencies = [ 490 + "derive_builder_macro", 491 + ] 492 + 493 + [[package]] 494 + name = "derive_builder_core" 495 + version = "0.20.2" 496 + source = "registry+https://github.com/rust-lang/crates.io-index" 497 + checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" 498 + dependencies = [ 499 + "darling", 500 + "proc-macro2", 501 + "quote", 502 + "syn", 503 + ] 504 + 505 + [[package]] 506 + name = "derive_builder_macro" 507 + version = "0.20.2" 508 + source = "registry+https://github.com/rust-lang/crates.io-index" 509 + checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" 510 + dependencies = [ 511 + "derive_builder_core", 512 + "syn", 513 + ] 514 + 515 + [[package]] 430 516 name = "displaydoc" 431 517 version = "0.2.5" 432 518 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 436 522 "quote", 437 523 "syn", 438 524 ] 525 + 526 + [[package]] 527 + name = "downcast-rs" 528 + version = "2.0.2" 529 + source = "registry+https://github.com/rust-lang/crates.io-index" 530 + checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc" 531 + 532 + [[package]] 533 + name = "dyn-clone" 534 + version = "1.0.20" 535 + source = "registry+https://github.com/rust-lang/crates.io-index" 536 + checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" 439 537 440 538 [[package]] 441 539 name = "encoding_rs" ··· 545 643 ] 546 644 547 645 [[package]] 646 + name = "futures" 647 + version = "0.3.31" 648 + source = "registry+https://github.com/rust-lang/crates.io-index" 649 + checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" 650 + dependencies = [ 651 + "futures-channel", 652 + "futures-core", 653 + "futures-executor", 654 + "futures-io", 655 + "futures-sink", 656 + "futures-task", 657 + "futures-util", 658 + ] 659 + 660 + [[package]] 548 661 name = "futures-channel" 549 662 version = "0.3.31" 550 663 source = "registry+https://github.com/rust-lang/crates.io-index" 551 664 checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" 552 665 dependencies = [ 553 666 "futures-core", 667 + "futures-sink", 554 668 ] 555 669 556 670 [[package]] ··· 560 674 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" 561 675 562 676 [[package]] 677 + name = "futures-executor" 678 + version = "0.3.31" 679 + source = "registry+https://github.com/rust-lang/crates.io-index" 680 + checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" 681 + dependencies = [ 682 + "futures-core", 683 + "futures-task", 684 + "futures-util", 685 + ] 686 + 687 + [[package]] 688 + name = "futures-io" 689 + version = "0.3.31" 690 + source = "registry+https://github.com/rust-lang/crates.io-index" 691 + checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" 692 + 693 + [[package]] 563 694 name = "futures-macro" 564 695 version = "0.3.31" 565 696 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 588 719 source = "registry+https://github.com/rust-lang/crates.io-index" 589 720 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" 590 721 dependencies = [ 722 + "futures-channel", 591 723 "futures-core", 724 + "futures-io", 592 725 "futures-macro", 593 726 "futures-sink", 594 727 "futures-task", 728 + "memchr", 595 729 "pin-project-lite", 596 730 "pin-utils", 597 731 "slab", ··· 744 878 ] 745 879 746 880 [[package]] 881 + name = "heck" 882 + version = "0.5.0" 883 + source = "registry+https://github.com/rust-lang/crates.io-index" 884 + checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 885 + 886 + [[package]] 747 887 name = "http" 748 888 version = "1.2.0" 749 889 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 979 1119 ] 980 1120 981 1121 [[package]] 1122 + name = "ident_case" 1123 + version = "1.0.1" 1124 + source = "registry+https://github.com/rust-lang/crates.io-index" 1125 + checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" 1126 + 1127 + [[package]] 982 1128 name = "idna" 983 1129 version = "1.0.3" 984 1130 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1043 1189 ] 1044 1190 1045 1191 [[package]] 1192 + name = "kameo" 1193 + version = "0.17.2" 1194 + source = "registry+https://github.com/rust-lang/crates.io-index" 1195 + checksum = "41a73be96f616ca2784f597b5b6635582f5a7b3ba73b1dbe7afa5d9667955d39" 1196 + dependencies = [ 1197 + "downcast-rs", 1198 + "dyn-clone", 1199 + "futures", 1200 + "kameo_macros", 1201 + "once_cell", 1202 + "serde", 1203 + "tokio", 1204 + "tracing", 1205 + ] 1206 + 1207 + [[package]] 1208 + name = "kameo_macros" 1209 + version = "0.17.0" 1210 + source = "registry+https://github.com/rust-lang/crates.io-index" 1211 + checksum = "b3f384b32bf6426ae93a8b37da62c85073b676a31a82a86d608ad86453878de0" 1212 + dependencies = [ 1213 + "heck", 1214 + "proc-macro2", 1215 + "quote", 1216 + "syn", 1217 + "uuid", 1218 + ] 1219 + 1220 + [[package]] 1046 1221 name = "langtag" 1047 1222 version = "0.3.4" 1048 1223 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1374 1549 checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" 1375 1550 1376 1551 [[package]] 1552 + name = "phf" 1553 + version = "0.12.1" 1554 + source = "registry+https://github.com/rust-lang/crates.io-index" 1555 + checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" 1556 + dependencies = [ 1557 + "phf_shared", 1558 + ] 1559 + 1560 + [[package]] 1561 + name = "phf_shared" 1562 + version = "0.12.1" 1563 + source = "registry+https://github.com/rust-lang/crates.io-index" 1564 + checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981" 1565 + dependencies = [ 1566 + "siphasher", 1567 + ] 1568 + 1569 + [[package]] 1570 + name = "pin-project" 1571 + version = "1.1.10" 1572 + source = "registry+https://github.com/rust-lang/crates.io-index" 1573 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 1574 + dependencies = [ 1575 + "pin-project-internal", 1576 + ] 1577 + 1578 + [[package]] 1579 + name = "pin-project-internal" 1580 + version = "1.1.10" 1581 + source = "registry+https://github.com/rust-lang/crates.io-index" 1582 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 1583 + dependencies = [ 1584 + "proc-macro2", 1585 + "quote", 1586 + "syn", 1587 + ] 1588 + 1589 + [[package]] 1377 1590 name = "pin-project-lite" 1378 1591 version = "0.2.16" 1379 1592 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1790 2003 ] 1791 2004 1792 2005 [[package]] 2006 + name = "siphasher" 2007 + version = "1.0.1" 2008 + source = "registry+https://github.com/rust-lang/crates.io-index" 2009 + checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" 2010 + 2011 + [[package]] 1793 2012 name = "slab" 1794 2013 version = "0.4.9" 1795 2014 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1821 2040 checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" 1822 2041 1823 2042 [[package]] 2043 + name = "strsim" 2044 + version = "0.11.1" 2045 + source = "registry+https://github.com/rust-lang/crates.io-index" 2046 + checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" 2047 + 2048 + [[package]] 2049 + name = "strum" 2050 + version = "0.27.2" 2051 + source = "registry+https://github.com/rust-lang/crates.io-index" 2052 + checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" 2053 + dependencies = [ 2054 + "strum_macros", 2055 + ] 2056 + 2057 + [[package]] 2058 + name = "strum_macros" 2059 + version = "0.27.2" 2060 + source = "registry+https://github.com/rust-lang/crates.io-index" 2061 + checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" 2062 + dependencies = [ 2063 + "heck", 2064 + "proc-macro2", 2065 + "quote", 2066 + "syn", 2067 + ] 2068 + 2069 + [[package]] 1824 2070 name = "syn" 1825 2071 version = "2.0.99" 1826 2072 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1935 2181 "signal-hook-registry", 1936 2182 "socket2", 1937 2183 "tokio-macros", 2184 + "tracing", 1938 2185 "windows-sys 0.52.0", 1939 2186 ] 1940 2187 1941 2188 [[package]] 1942 2189 name = "tokio-cron-scheduler" 1943 - version = "0.13.0" 2190 + version = "0.15.1" 1944 2191 source = "registry+https://github.com/rust-lang/crates.io-index" 1945 - checksum = "6a5597b569b4712cf78aa0c9ae29742461b7bda1e49c2a5fdad1d79bf022f8f0" 2192 + checksum = "1f50e41f200fd8ed426489bd356910ede4f053e30cebfbd59ef0f856f0d7432a" 1946 2193 dependencies = [ 1947 2194 "chrono", 2195 + "chrono-tz", 1948 2196 "croner", 1949 2197 "num-derive", 1950 2198 "num-traits", ··· 2027 2275 2028 2276 [[package]] 2029 2277 name = "tracing-attributes" 2030 - version = "0.1.28" 2278 + version = "0.1.31" 2031 2279 source = "registry+https://github.com/rust-lang/crates.io-index" 2032 - checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" 2280 + checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" 2033 2281 dependencies = [ 2034 2282 "proc-macro2", 2035 2283 "quote", ··· 2356 2604 2357 2605 [[package]] 2358 2606 name = "windows-link" 2359 - version = "0.1.0" 2607 + version = "0.2.1" 2360 2608 source = "registry+https://github.com/rust-lang/crates.io-index" 2361 - checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3" 2609 + checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" 2362 2610 2363 2611 [[package]] 2364 2612 name = "windows-registry"
+5 -3
Cargo.toml
··· 1 - cargo-features = ["edition2024"] # For rust-analyzer to work 2 - 3 1 [package] 4 2 name = "audquotes" 5 3 version = "0.1.0" ··· 8 6 9 7 [dependencies] 10 8 bsky-sdk = "0.1.16" 9 + chrono = "0.4.42" 10 + cron-lite = { version = "0.3.0", features = ["async"] } 11 + futures = "0.3.31" 11 12 glob = "0.3.2" 12 13 grep = "0.3.2" 14 + kameo = "0.17.2" 13 15 rand = "0.9.0" 14 16 redis = { version = "0.29.1", features = ["aio", "connection-manager", "tokio-comp"] } 15 17 tokio = { version = "1.44.0", features = ["full"] } 16 - tokio-cron-scheduler = "0.13.0" 18 + tokio-cron-scheduler = "0.15.1"
+24
src/data.rs
··· 1 + /// A newtype over [String] to represent a single quote. 2 + /// [Quote] does not implement [PartialEq] or [Eq] as, on principle, two 3 + /// quotes may be comprised of the same contents whilst still 4 + /// representing distinct quotes in practice (e.g. two different lines of dialogue which happen to be the same). 5 + #[derive(Debug, Clone)] 6 + pub struct Quote(String); 7 + 8 + impl<S: AsRef<str>> From<S> for Quote { 9 + fn from(value: S) -> Self { 10 + Self(value.as_ref().to_owned()) 11 + } 12 + } 13 + 14 + impl From<Quote> for String { 15 + fn from(value: Quote) -> Self { 16 + value.0 17 + } 18 + } 19 + 20 + impl Quote { 21 + pub fn get(&self) -> &str { 22 + &self.0 23 + } 24 + }
+94
src/lib.rs
··· 1 + pub mod data; 2 + pub mod sink; 3 + pub mod storage; 4 + 5 + pub mod run { 6 + use std::time::Duration; 7 + 8 + use crate::sink::{BskySink, PostQuote, SinkManager, StdoutSink}; 9 + use crate::storage::{ 10 + FetchQuote, QuoteCycle, queue::MemoryQueueStorage, source::FsFilterSourceManager, 11 + }; 12 + use cron_lite::CronEvent; 13 + use futures::StreamExt; 14 + use kameo::prelude::*; 15 + use tokio::time::timeout; 16 + 17 + pub async fn entrypoint() -> Result<(), Box<dyn std::error::Error>> { 18 + // TODO: Clean up this function's internals. 19 + // The current structure is alright, but it was stitched together 20 + // quickly just to confirm that everything is functioning as it should. 21 + let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1"; 22 + let bsky = if use_bsky { 23 + Some(BskySink::spawn( 24 + BskySink::new_session( 25 + std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"), 26 + std::env::var("BLUESKY_PASSWORD") 27 + .expect("Bluesky application password not supplied"), 28 + ) 29 + .await 30 + .expect("Could not connect to Bluesky with supplied credentials"), 31 + )) 32 + } else { 33 + None 34 + }; 35 + 36 + let sink = { 37 + let stdout = StdoutSink::spawn(StdoutSink); 38 + SinkManager::spawn(SinkManager::new(Some(stdout), bsky)) 39 + }; 40 + 41 + let cycle = { 42 + let source = FsFilterSourceManager::spawn(FsFilterSourceManager::default()); 43 + let queue = MemoryQueueStorage::spawn(MemoryQueueStorage::new()); 44 + 45 + QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue)) 46 + }; 47 + 48 + use cron_lite::Schedule; 49 + const POSTING_TIMEOUT: Duration = Duration::from_secs(60); 50 + const POSTING_INTERVAL: &str = "*/10 * * * * * *"; 51 + let schedule = 52 + Schedule::new(POSTING_INTERVAL).expect("Schedule should be a valid cron expression"); 53 + let now = chrono::Utc::now(); 54 + 55 + let mut tick_stream = schedule.stream(&now); 56 + 57 + while let Some(tick) = tick_stream.next().await { 58 + if let CronEvent::Missed(missed_at) = tick { 59 + eprintln!( 60 + "Missed event tick at {}. Current time: {}. Skipping post.", 61 + missed_at, 62 + chrono::Utc::now() 63 + ); 64 + continue; 65 + } 66 + 67 + // We store the code to perform the next posting iteration as one atomic future which we wrap with a timeout. 68 + // This means that, if we miss a posting window due to the timeout, we will not get multiple consecutive or late posts. 69 + let next_post_iteration = async || -> Result<(), Box<dyn std::error::Error>> { 70 + let next_quote = cycle 71 + .ask(FetchQuote) 72 + .await 73 + .map_err(|_| "fetch quote should always succeed")?; 74 + 75 + // Note: By using `tell`, we don't know when each sink's code will have completed. 76 + // If any sink uses, say, a file or stdout, that resource may well be contested between 77 + // consecutive iterations of this loop. 78 + sink.tell(PostQuote(next_quote)).await?; 79 + println!(); 80 + 81 + Ok(()) 82 + }; 83 + 84 + if let Err(e) = timeout(POSTING_TIMEOUT, next_post_iteration()).await { 85 + eprintln!( 86 + "Could not submit post in time to all sinks. Timeout error: {}", 87 + e 88 + ); 89 + } 90 + } 91 + 92 + Ok(()) 93 + } 94 + }
+1 -310
src/main.rs
··· 1 - use bsky_sdk::api::app::bsky::feed::post; 2 - use bsky_sdk::api::types::string::Datetime; 3 - use bsky_sdk::{BskyAgent, api::types::Object}; 4 - 5 - use glob::glob; 6 - use grep::{regex, searcher::sinks}; 7 - use rand::seq::SliceRandom; 8 - use redis::aio::ConnectionManagerConfig; 9 - 10 - use std::{sync::Arc, time::Duration}; 11 - use tokio_cron_scheduler::{Job, JobScheduler}; 12 - 13 - use redis::AsyncCommands; 14 - 15 - const DEFAULT_QUEUE: &str = "queue:default"; 16 - const EVENT_QUEUE: &str = "queue:event"; 17 - 18 - // See https://cron.help for what these strings mean 19 - const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *"; 20 - const POSTING_INTERVAL_DEBUG: &str = "1/10 * * * * *"; 21 - const EVENT_UPDATE_INTERVAL: &str = "55 23 * * *"; 22 - 23 - const POSTING_RETRIES: i32 = 5; 24 - 25 - fn prepare_post<I: Into<String>>(text: I) -> post::RecordData { 26 - post::RecordData { 27 - text: text.into(), 28 - created_at: Datetime::now(), 29 - embed: None, 30 - entities: None, 31 - facets: None, 32 - labels: None, 33 - langs: None, 34 - reply: None, 35 - tags: None, 36 - } 37 - } 38 - 39 - #[derive(Clone, Debug)] 40 - struct QuoteFilter { 41 - path: String, 42 - content: String, 43 - dates: Vec<String>, 44 - } 45 - 46 - impl QuoteFilter { 47 - pub async fn get_quote( 48 - &self, 49 - mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone, 50 - ) -> Result<String, ()> { 51 - // 1: Attempt to read from the event (priority) queue 52 - let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok(); 53 - if let Some(quote) = event_quote { 54 - return Ok(quote); 55 - } 56 - 57 - // 2: Otherwise, we read from the regular queue, repopulating it if it's empty 58 - self.reshuffle_quotes(con.clone(), DEFAULT_QUEUE).await?; 59 - con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ()) 60 - } 61 - 62 - async fn reshuffle_quotes( 63 - &self, 64 - mut con: impl redis::aio::ConnectionLike + AsyncCommands, 65 - output_queue: &str, 66 - ) -> Result<(), ()> { 67 - let len: u64 = con.llen(output_queue).await.map_err(|_| ())?; 68 - // NOTE: The following assumes the queue hasn't been repopulated by any other client 69 - // in-between the call to llen and the execution of the pipeline. 70 - // Hopefully won't be a problem :) 71 - if len == 0 { 72 - let mut file_contents = self.read_files(); 73 - 74 - { 75 - let mut rand = rand::rng(); 76 - file_contents.shuffle(&mut rand); 77 - } 78 - 79 - let mut pipeline = redis::pipe(); 80 - for file_contents in file_contents.into_iter() { 81 - pipeline.lpush(output_queue, file_contents.as_str()); 82 - } 83 - let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?; 84 - } 85 - 86 - Ok(()) 87 - } 88 - 89 - fn read_files(&self) -> Vec<String> { 90 - let matcher = regex::RegexMatcher::new(&self.content).unwrap(); 91 - let mut searcher = grep::searcher::Searcher::new(); 92 - let mut results = Vec::new(); 93 - 94 - for file in glob(&self.path).unwrap() { 95 - let file = match file { 96 - Ok(file) => file, 97 - Err(_) => continue, 98 - }; 99 - 100 - let mut matched = false; 101 - let sink = sinks::Lossy(|_lnum, _line| { 102 - matched = true; 103 - Ok(false) 104 - }); 105 - 106 - let search_result = searcher.search_path(&matcher, &file, sink); 107 - if !matched || search_result.is_err() { 108 - continue; 109 - } 110 - 111 - let contents = std::fs::read_to_string(file).unwrap(); 112 - results.push(contents.trim().to_string()); 113 - } 114 - 115 - results 116 - } 117 - } 118 - 119 - #[derive(Clone)] 120 - struct RedisState { 121 - con_manager: redis::aio::ConnectionManager, 122 - } 123 - 124 - impl RedisState { 125 - pub async fn new(url: String) -> Result<Self, ()> { 126 - let redis = redis::Client::open(url).map_err(|_| ())?; 127 - let config = ConnectionManagerConfig::new() 128 - .set_response_timeout(std::time::Duration::from_secs(10)) 129 - .set_number_of_retries(3); 130 - let con_manager = redis::aio::ConnectionManager::new_with_config(redis, config) 131 - .await 132 - .map_err(|_| ())?; 133 - 134 - Ok(RedisState { con_manager }) 135 - } 136 - 137 - pub async fn fetch_quote(&self, filter: &QuoteFilter) -> Result<String, ()> { 138 - loop { 139 - match filter.get_quote(self.con_manager.clone()).await { 140 - Ok(text) => return Ok(text), 141 - Err(_) => eprintln!("Error fetching quote from redis storage. Retrying..."), 142 - }; 143 - } 144 - } 145 - } 146 - 147 - #[derive(Clone)] 148 - struct BlueskyState { 149 - bsky_agent: BskyAgent, 150 - bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>, 151 - } 152 - 153 - impl BlueskyState { 154 - pub async fn new_session(username: String, password: String) -> Result<Self, ()> { 155 - let agent = BskyAgent::builder().build().await.map_err(|_| ())?; 156 - let session = agent.login(username, password).await.map_err(|_| ())?; 157 - 158 - Ok(Self { 159 - bsky_agent: agent, 160 - bsky_session: session, 161 - }) 162 - } 163 - 164 - pub async fn submit_post(self, post: String) -> Result<(), ()> { 165 - let post = prepare_post(post.as_str()); 166 - 167 - for current_try in 0..POSTING_RETRIES { 168 - if let Err(e) = self.bsky_agent.create_record(post.clone()).await { 169 - eprintln!("Could not post quote: `{e}`"); 170 - eprintln!("Attempting to refresh login..."); 171 - 172 - if let Err(e) = self 173 - .bsky_agent 174 - .resume_session(self.bsky_session.clone()) 175 - .await 176 - { 177 - eprintln!("Failed to resume sessions due to following error: {e}") 178 - } 179 - } else { 180 - if current_try > 0 { 181 - eprintln!("Successfully posted quote on retry #{current_try}"); 182 - } 183 - return Ok(()); 184 - } 185 - } 186 - 187 - Err(()) 188 - } 189 - } 190 - 191 - #[derive(Clone)] 192 - struct State { 193 - redis: RedisState, 194 - bsky_session: Option<BlueskyState>, 195 - } 196 - 197 - impl State { 198 - pub fn redis(&self) -> &RedisState { 199 - &self.redis 200 - } 201 - 202 - pub fn bsky(&self) -> Option<&BlueskyState> { 203 - self.bsky_session.as_ref() 204 - } 205 - } 206 - 207 1 #[tokio::main] 208 2 async fn main() -> Result<(), Box<dyn std::error::Error>> { 209 - let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1"; 210 - let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1"; 211 - 212 - let redis_state = 213 - RedisState::new(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string())) 214 - .await 215 - .expect("Initial redis connection failure"); 216 - let bsky_state = if use_bsky { 217 - Some( 218 - BlueskyState::new_session( 219 - std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"), 220 - std::env::var("BLUESKY_PASSWORD") 221 - .expect("Bluesky application password not supplied"), 222 - ) 223 - .await 224 - .expect("Could not connect to Bluesky with supplied credentials"), 225 - ) 226 - } else { 227 - None 228 - }; 229 - 230 - let app_state = Arc::new(State { 231 - redis: redis_state, 232 - bsky_session: bsky_state, 233 - }); 234 - 235 - let sched = JobScheduler::new().await?; 236 - 237 - /* 238 - let event_filter = Arc::new(QuoteFilter { 239 - content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(), 240 - path: "test/**/ 241 - *.txt".to_string(), 242 - dates: vec![], 243 - }); 244 - */ 245 - 246 - let regular_filter = Arc::new(QuoteFilter { 247 - content: r".*".to_string(), 248 - path: if !debug_mode { 249 - "quotes/**/*.txt".to_string() 250 - } else { 251 - "test/**/*.txt".to_string() 252 - }, 253 - dates: vec![], 254 - }); 255 - 256 - let posting_interval = if !debug_mode { 257 - POSTING_INTERVAL_CRON 258 - } else { 259 - POSTING_INTERVAL_DEBUG 260 - }; 261 - 262 - let post_job = Job::new_async(posting_interval, move |_uuid, _| { 263 - let filter = regular_filter.clone(); 264 - let app_state = app_state.clone(); 265 - 266 - Box::pin(async move { 267 - // We try fetching a new quote from our redis storage until we succeed 268 - let text = match app_state.redis().fetch_quote(&filter).await { 269 - Ok(text) => text, 270 - Err(_) => { 271 - eprintln!("Error fetching quote from redis storage."); 272 - return; 273 - } 274 - }; 275 - 276 - if let Some(bsky) = app_state.bsky() { 277 - if let Err(_) = bsky.clone().submit_post(text).await { 278 - eprintln!("Error posting to bluesky."); 279 - return; 280 - } 281 - } else { 282 - // Let's just print the quote! 283 - println!("{}\n", text); 284 - } 285 - }) 286 - })?; 287 - 288 - // Add async job 289 - sched.add(post_job).await?; 290 - 291 - // sched 292 - // .add(Job::new_async(EVENT_UPDATE_INTERVAL, move |_uuid, _| { 293 - // let filter = event_filter.clone(); 294 - // let con = con_event_monitor.clone(); 295 - // let _agent = agent_event_monitor.clone(); // Can be used later to e.g. update profile 296 - 297 - // Box::pin(async move { 298 - // // For testing purposes, let's always upload events 299 - // reshuffle_quotes(&filter, con.clone(), EVENT_QUEUE) 300 - // .await 301 - // .unwrap(); 302 - // }) 303 - // })?) 304 - // .await?; 305 - 306 - sched 307 - .start() 308 - .await 309 - .expect("Error starting tokio scheduler. Shutting down..."); 310 - loop { 311 - tokio::time::sleep(Duration::from_secs(10)).await; 312 - } 3 + audquotes::run::entrypoint().await 313 4 }
+192
src/sink.rs
··· 1 + use crate::data::Quote; 2 + use bsky_sdk::{BskyAgent, api::types::Object}; 3 + use kameo::prelude::*; 4 + 5 + /// A newtype over [Quote] used to prompt the [SinkManager] to 6 + /// submit a new quote to all its configured sinks. 7 + #[derive(Debug, Clone)] 8 + pub struct PostQuote(pub Quote); 9 + 10 + /// Error type for internal communication between 11 + /// the [SinkManager] and its sinks. 12 + /// The error reporting performed internally does not necessarily match the 13 + /// behavior which other modules will observe. 14 + #[derive(Debug, Clone)] 15 + pub enum PostFailure { 16 + /// Indicates that a given quote could not be posted to a sink, 17 + /// but that it *may* be retried. The `reinitialize` boolean signals 18 + /// whether the sink should be reinitialized before further attempts. 19 + Retry { reinitialize: bool }, 20 + 21 + /// Indicates that a given quote could not be posted to a sink, 22 + /// as it is unsupported by it in some way (e.g. quote exceeds the sink's length limit). 23 + Unsupported, 24 + 25 + /// Indicates that a given quote could not be posted to a sink 26 + /// due to the occurrence of some unrecoverable error. 27 + /// It is thus unlikely that the sink will work in the future, even if 28 + /// reinitialized. 29 + Unrecoverable, 30 + } 31 + 32 + pub type PostResult = Result<(), PostFailure>; 33 + 34 + /// Represents internal implementation details of the interactions between 35 + /// the [SinkManager] and its sinks. 36 + pub trait QuoteSink: Actor + Message<PostQuote, Reply = PostResult> {} 37 + 38 + /// A [QuoteSink] which will output the contents of each quote 39 + /// over Stdout. Is primarily meant for testing and observing sink behavior. 40 + #[derive(Actor)] 41 + pub struct StdoutSink; 42 + 43 + impl Message<PostQuote> for StdoutSink { 44 + type Reply = PostResult; 45 + 46 + async fn handle( 47 + &mut self, 48 + PostQuote(quote): PostQuote, 49 + _ctx: &mut Context<Self, Self::Reply>, 50 + ) -> Self::Reply { 51 + println!("{}", quote.get()); 52 + Ok(()) 53 + } 54 + } 55 + 56 + /// A [QuoteSink] which will post the contents of each quote to Bluesky. 57 + #[derive(Actor)] 58 + pub struct BskySink { 59 + bsky_agent: BskyAgent, 60 + bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>, 61 + } 62 + 63 + impl BskySink { 64 + pub async fn new_session(username: String, password: String) -> Result<Self, ()> { 65 + let agent = BskyAgent::builder().build().await.map_err(|_| ())?; 66 + let session = agent.login(username, password).await.map_err(|_| ())?; 67 + 68 + Ok(Self { 69 + bsky_agent: agent, 70 + bsky_session: session, 71 + }) 72 + } 73 + 74 + async fn submit_post(&mut self, quote: Quote) -> Result<(), ()> { 75 + let post = bsky_sdk::api::app::bsky::feed::post::RecordData { 76 + text: quote.into(), 77 + created_at: bsky_sdk::api::types::string::Datetime::now(), 78 + embed: None, 79 + entities: None, 80 + facets: None, 81 + labels: None, 82 + langs: None, 83 + reply: None, 84 + tags: None, 85 + }; 86 + 87 + if let Err(e) = self 88 + .bsky_agent 89 + .resume_session(self.bsky_session.clone()) 90 + .await 91 + { 92 + eprintln!("Failed to resume sessions due to following error: {e}"); 93 + return Err(()); 94 + } 95 + 96 + match self.bsky_agent.create_record(post.clone()).await { 97 + Ok(_) => Ok(()), 98 + Err(_) => Err(()), 99 + } 100 + } 101 + } 102 + 103 + impl Message<PostQuote> for BskySink { 104 + type Reply = PostResult; 105 + 106 + async fn handle( 107 + &mut self, 108 + PostQuote(quote): PostQuote, 109 + _ctx: &mut Context<Self, Self::Reply>, 110 + ) -> Self::Reply { 111 + match self.submit_post(quote).await { 112 + Ok(_) => Ok(()), 113 + Err(_) => Err(PostFailure::Unrecoverable), 114 + } 115 + } 116 + } 117 + 118 + /// Supervises all [QuoteSink] actors within the program, forwarding 119 + /// [PostQuote] messages to them as they are received. 120 + /// The SinkManager will attempt to reinitialize failed sinks upon 121 + /// encountering recoverable errors. 122 + #[derive(Actor)] 123 + pub struct SinkManager { 124 + // Uh oh. As the [Actor] trait is *not* dyn-compatible, 125 + // and I do not own its definition, I'm fairly certain that I cannot 126 + // do asynchronous dynamic dispatch for it here. 127 + // I've decided I'll limit this to one sink per implementation right now. 128 + stdout_sink: Option<ActorRef<StdoutSink>>, 129 + bsky_sink: Option<ActorRef<BskySink>>, 130 + // ... 131 + } 132 + 133 + impl SinkManager { 134 + pub fn new( 135 + stdout_sink: Option<ActorRef<StdoutSink>>, 136 + bsky_sink: Option<ActorRef<BskySink>>, 137 + ) -> Self { 138 + Self { 139 + stdout_sink, 140 + bsky_sink, 141 + } 142 + } 143 + } 144 + 145 + pub type SinkReplies = Vec<Result<(), ()>>; 146 + 147 + impl Message<PostQuote> for SinkManager { 148 + type Reply = SinkReplies; 149 + 150 + async fn handle( 151 + &mut self, 152 + msg: PostQuote, 153 + _ctx: &mut Context<Self, Self::Reply>, 154 + ) -> Self::Reply { 155 + use futures::future::join_all; 156 + 157 + let stdout_result = self 158 + .stdout_sink 159 + .as_ref() 160 + .map(|s| s.ask(msg.clone()).into_future()); 161 + 162 + let bsky_result = self 163 + .bsky_sink 164 + .as_ref() 165 + .map(|s| s.ask(msg.clone()).into_future()); 166 + 167 + let futures = [stdout_result, bsky_result].into_iter().flatten(); 168 + let results = join_all(futures).await; 169 + 170 + results.iter().map(|r| r.clone().or(Err(()))).collect() 171 + } 172 + } 173 + 174 + mod test { 175 + #[tokio::test] 176 + async fn stdout_sink() { 177 + use super::*; 178 + 179 + let stdout = StdoutSink::spawn(StdoutSink); 180 + let manager = SinkManager::spawn(SinkManager::new(Some(stdout), None)); 181 + 182 + let messages = ["First test!", "Second test.", "Third..."]; 183 + for msg in messages { 184 + manager.tell(PostQuote(msg.into())).await.unwrap(); 185 + tokio::time::sleep(std::time::Duration::from_secs(5)).await; 186 + } 187 + 188 + // Hopefully we don't crash...! 189 + // TODO: Sink that actually stores every quote it "posts"? 190 + // Could help in verifying everything was sent correctly. 191 + } 192 + }
+424
src/storage.rs
··· 1 + use kameo::prelude::*; 2 + 3 + use crate::storage::{ 4 + queue::{DequeueQuote, EnqueueQuotes}, 5 + source::SourceQuotes, 6 + }; 7 + 8 + use crate::data::Quote; 9 + 10 + mod rng { 11 + use rand::SeedableRng; 12 + 13 + pub struct PrngState { 14 + rng: rand::rngs::SmallRng, 15 + } 16 + 17 + impl PrngState { 18 + pub fn from_thread_rng() -> Self { 19 + Self { 20 + rng: rand::rngs::SmallRng::from_rng(&mut rand::rng()), 21 + } 22 + } 23 + 24 + pub fn from_seed(seed: u64) -> Self { 25 + Self { 26 + rng: rand::rngs::SmallRng::seed_from_u64(seed), 27 + } 28 + } 29 + 30 + pub fn shuffle_slice<T>(&mut self, slice: &mut [T]) { 31 + use rand::seq::SliceRandom; 32 + slice.shuffle(&mut self.rng); 33 + } 34 + } 35 + 36 + mod test { 37 + #[tokio::test] 38 + async fn shuffle_slice() { 39 + use super::*; 40 + 41 + let mut data = vec![1, 2, 3, 4]; 42 + let mut rng = PrngState::from_thread_rng(); 43 + 44 + rng.shuffle_slice(&mut data); 45 + println!("{:?}", data); 46 + } 47 + } 48 + } 49 + 50 + pub mod source { 51 + use super::*; 52 + 53 + // TODO: Should the quote source filters be 54 + // generic over the exact manager implementation being used? 55 + /// Message to request that a SourceManager source its quotes once again. 56 + pub struct SourceQuotes; 57 + pub type SourceReply = Result<Vec<Quote>, ()>; 58 + 59 + /// Subtrait of Actor which specifically 60 + /// denotes actors that can handle all relevant source messages. 61 + pub trait SourceManager: Actor + Message<SourceQuotes, Reply = SourceReply> {} 62 + 63 + impl<T> SourceManager for T where T: Message<SourceQuotes, Reply = SourceReply> {} 64 + 65 + /// Implementation of [`SourceManager`] which sources quotes from a Vec 66 + /// that it holds in memory, without accessing external services. 67 + /// Its main purpose is to be used for testing. 68 + #[derive(Actor)] 69 + pub struct MemorySourceManager { 70 + quotes: Vec<Quote>, 71 + } 72 + 73 + impl MemorySourceManager { 74 + pub fn new(quotes: impl IntoIterator<Item = impl Into<Quote>>) -> Self { 75 + Self { 76 + quotes: quotes.into_iter().map(Into::into).collect(), 77 + } 78 + } 79 + } 80 + 81 + impl Message<SourceQuotes> for MemorySourceManager { 82 + type Reply = SourceReply; 83 + 84 + async fn handle( 85 + &mut self, 86 + _msg: SourceQuotes, 87 + _ctx: &mut Context<Self, Self::Reply>, 88 + ) -> Self::Reply { 89 + // We just clone the quotes we've been holding onto since startup 90 + Ok(self.quotes.clone()) 91 + } 92 + } 93 + 94 + /// Uses a [QuoteFilter] to source quotes from the local filesystem 95 + /// at the beginning of each cycle. 96 + #[derive(Actor)] 97 + pub struct FsFilterSourceManager { 98 + filter: QuoteFilter, 99 + } 100 + 101 + impl FsFilterSourceManager { 102 + pub fn new(filter: QuoteFilter) -> Self { 103 + Self { filter } 104 + } 105 + } 106 + 107 + impl Default for FsFilterSourceManager { 108 + fn default() -> Self { 109 + Self { 110 + filter: QuoteFilter { 111 + content: r".*".to_string(), 112 + // TODO: Maybe make this a compile-time constant for debugging 113 + path: "test/**/*.txt".to_string(), 114 + _dates: vec![], 115 + }, 116 + } 117 + } 118 + } 119 + 120 + impl Message<SourceQuotes> for FsFilterSourceManager { 121 + type Reply = SourceReply; 122 + 123 + async fn handle( 124 + &mut self, 125 + _msg: SourceQuotes, 126 + _ctx: &mut Context<Self, Self::Reply>, 127 + ) -> Self::Reply { 128 + self.filter.read_files() 129 + } 130 + } 131 + 132 + #[derive(Clone, Debug)] 133 + pub struct QuoteFilter { 134 + path: String, 135 + content: String, 136 + _dates: Vec<String>, 137 + } 138 + 139 + impl QuoteFilter { 140 + // TODO: actually leverage async I/O 141 + fn read_files(&self) -> Result<Vec<Quote>, ()> { 142 + use glob::glob; 143 + use grep::{regex, searcher::sinks}; 144 + 145 + let matcher = regex::RegexMatcher::new(&self.content).map_err(|_| ())?; 146 + let mut searcher = grep::searcher::Searcher::new(); 147 + let mut results = Vec::new(); 148 + 149 + for file in glob(&self.path).map_err(|_| ())? { 150 + let file = match file { 151 + Ok(file) => file, 152 + Err(_) => continue, 153 + }; 154 + 155 + let mut matched = false; 156 + let sink = sinks::Lossy(|_lnum, _line| { 157 + matched = true; 158 + Ok(false) 159 + }); 160 + 161 + let search_result = searcher.search_path(&matcher, &file, sink); 162 + if !matched || search_result.is_err() { 163 + continue; 164 + } 165 + 166 + let contents = std::fs::read_to_string(file).map_err(|_| ())?; 167 + results.push(contents.trim().into()); 168 + } 169 + 170 + Ok(results) 171 + } 172 + } 173 + } 174 + 175 + pub mod queue { 176 + use std::collections::VecDeque; 177 + 178 + use super::*; 179 + 180 + // Messages to interact with the quote queue 181 + pub struct DequeueQuote; 182 + pub type DequeueReply = Result<Option<Quote>, ()>; 183 + 184 + pub struct EnqueueQuotes(pub Vec<Quote>); 185 + pub type EnqueueReply = Result<(), ()>; 186 + 187 + /// Subtrait of Actor which specifically 188 + /// denotes actors that can handle all relevant queue messages. 189 + pub trait QueueManager: 190 + Actor 191 + + Message<EnqueueQuotes, Reply = EnqueueReply> 192 + + Message<DequeueQuote, Reply = DequeueReply> 193 + { 194 + } 195 + 196 + impl<T> QueueManager for T where 197 + T: Message<EnqueueQuotes, Reply = EnqueueReply> 198 + + Message<DequeueQuote, Reply = DequeueReply> 199 + { 200 + } 201 + 202 + /// A basic implementation of an in-memory queue of quotes. 203 + /// Its contents are *not* persisted across application restarts, so it 204 + /// is only suited for testing purposes. 205 + #[derive(Actor, Default)] 206 + pub struct MemoryQueueStorage { 207 + quotes: VecDeque<Quote>, 208 + } 209 + 210 + impl MemoryQueueStorage { 211 + pub fn new() -> Self { 212 + Self { 213 + quotes: VecDeque::new(), 214 + } 215 + } 216 + } 217 + 218 + impl Message<EnqueueQuotes> for MemoryQueueStorage { 219 + /// We only need to signal success or failure in this instance, 220 + /// with no added metadata in either case. 221 + type Reply = EnqueueReply; 222 + 223 + async fn handle( 224 + &mut self, 225 + msg: EnqueueQuotes, 226 + _ctx: &mut Context<Self, Self::Reply>, 227 + ) -> Self::Reply { 228 + for q in msg.0 { 229 + self.quotes.push_back(q); 230 + } 231 + 232 + Ok(()) 233 + } 234 + } 235 + 236 + impl Message<DequeueQuote> for MemoryQueueStorage { 237 + type Reply = DequeueReply; 238 + 239 + async fn handle( 240 + &mut self, 241 + _msg: DequeueQuote, 242 + _ctx: &mut Context<Self, Self::Reply>, 243 + ) -> Self::Reply { 244 + // Note: this can never fail, since the quotes are stored in memory 245 + Ok(self.quotes.pop_front()) 246 + } 247 + } 248 + } 249 + 250 + #[derive(Actor)] 251 + pub struct QuoteCycle<S: source::SourceManager, Q: queue::QueueManager> { 252 + rng: rng::PrngState, 253 + source_manager: ActorRef<S>, 254 + queue_manager: ActorRef<Q>, 255 + } 256 + 257 + impl<S: source::SourceManager, Q: queue::QueueManager> QuoteCycle<S, Q> { 258 + pub fn new( 259 + rng: rng::PrngState, 260 + source_manager: ActorRef<S>, 261 + queue_manager: ActorRef<Q>, 262 + ) -> Self { 263 + Self { 264 + rng, 265 + source_manager, 266 + queue_manager, 267 + } 268 + } 269 + 270 + pub fn with_thread_rng(source_manager: ActorRef<S>, queue_manager: ActorRef<Q>) -> Self { 271 + Self { 272 + rng: rng::PrngState::from_thread_rng(), 273 + source_manager, 274 + queue_manager, 275 + } 276 + } 277 + } 278 + 279 + /// A message to [QuoteCycle] to fetch one more quote from its storage. 280 + pub struct FetchQuote; 281 + 282 + impl<S, Q> Message<FetchQuote> for QuoteCycle<S, Q> 283 + where 284 + S: source::SourceManager, 285 + Q: queue::QueueManager, 286 + { 287 + type Reply = Result<Quote, ()>; 288 + 289 + async fn handle( 290 + &mut self, 291 + _msg: FetchQuote, 292 + _ctx: &mut Context<Self, Self::Reply>, 293 + ) -> Self::Reply { 294 + // 1. We query our queue storage for the next quote 295 + if let Some(next_quote) = self.queue_manager.ask(DequeueQuote).await.map_err(|_| ())? { 296 + // if there is a quote, we simply return it and move on 297 + return Ok(next_quote); 298 + } 299 + 300 + // 2. Otherwise, we must repopulate the queue through our source 301 + let mut refreshed_quotes = self 302 + .source_manager 303 + .ask(SourceQuotes) 304 + .await 305 + .map_err(|_| ())?; 306 + 307 + // 3. We shuffle the newly-sourced quotes 308 + self.rng.shuffle_slice(&mut refreshed_quotes); 309 + let refreshed_quotes = refreshed_quotes; // No longer mutable 310 + 311 + // TODO: Perhaps we should assert that the new quotes are non-empty? 312 + // 4. We enqueue the newly-sourced quotes... 313 + self.queue_manager 314 + .ask(EnqueueQuotes(refreshed_quotes)) 315 + .await 316 + .map_err(|_| ())?; 317 + 318 + // 5. and, finally, we return the first among them. 319 + match self.queue_manager.ask(DequeueQuote).await { 320 + Ok(Some(q)) => Ok(q), 321 + Ok(None) => panic!("Newly-enqueued quotes should never be empty"), 322 + Err(_) => Err(()), 323 + } 324 + } 325 + } 326 + 327 + mod test { 328 + #[tokio::test] 329 + async fn memory_queue() { 330 + use super::Quote; 331 + use super::queue::*; 332 + use kameo::prelude::*; 333 + 334 + let queue_manager = MemoryQueueStorage::spawn(MemoryQueueStorage::new()); 335 + 336 + let sample_quotes = ["Test no.1", "Test no.2", "Test no.3"]; 337 + 338 + queue_manager 339 + .ask(EnqueueQuotes( 340 + sample_quotes.iter().cloned().map(Quote::from).collect(), 341 + )) 342 + .await 343 + .expect("In-memory quote queue storage should be valid for insertion"); 344 + 345 + for text in sample_quotes.iter() { 346 + assert_eq!( 347 + *text, 348 + queue_manager 349 + .ask(DequeueQuote) 350 + .await 351 + .expect("In-memory queue storage should never panic on dequeue") 352 + .expect("In-memory queue storage should never be initialized as empty") 353 + .get() 354 + ); 355 + } 356 + } 357 + 358 + #[tokio::test] 359 + async fn memory_source() { 360 + use super::source::*; 361 + use kameo::prelude::*; 362 + 363 + let sample_quotes = ["Minie", "Miney", "Moe", "and", "some", "more"]; 364 + 365 + let source_manager = MemorySourceManager::spawn(MemorySourceManager::new(sample_quotes)); 366 + 367 + let quotes = source_manager 368 + .ask(SourceQuotes) 369 + .await 370 + .expect("In-memory quote queue storage should be valid for insertion"); 371 + 372 + assert_eq!( 373 + sample_quotes.as_slice(), 374 + quotes 375 + .into_iter() 376 + // Since [Quote] doesn't implement any Equality trait, 377 + // we map the strings into quotes instead 378 + .map(String::from) 379 + .collect::<Vec<_>>() 380 + .as_slice(), 381 + ); 382 + } 383 + 384 + #[tokio::test] 385 + async fn memory_cycle() { 386 + use std::{collections::HashMap, ops::AddAssign}; 387 + 388 + use super::FetchQuote; 389 + use super::QuoteCycle; 390 + use super::queue::*; 391 + use super::source::*; 392 + use kameo::prelude::*; 393 + 394 + let sample_quotes = ["Minie", "Miney", "Moe"]; 395 + let cycle = { 396 + let source = MemorySourceManager::spawn(MemorySourceManager::new(sample_quotes)); 397 + let queue = MemoryQueueStorage::spawn(MemoryQueueStorage::new()); 398 + 399 + QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue)) 400 + }; 401 + 402 + // We loop over `sample_quotes` twice to simulate the queue being exhausted fully, then re-sourced 403 + // Since the `cycle` manager will shuffle the quote sequence, we will verify that each 404 + // quote appears *exactly* `LOOPS` times throughout these iterations. 405 + const LOOPS: usize = 3; 406 + let mut quote_counts = HashMap::new(); 407 + for _ in 0..(sample_quotes.len() * LOOPS) { 408 + let next_quote = cycle.ask(FetchQuote).await.unwrap(); 409 + quote_counts 410 + .entry(next_quote.get().to_owned()) 411 + .or_insert(0) 412 + .add_assign(1); 413 + } 414 + 415 + let quote_counts = quote_counts; // no longer mut 416 + assert!( 417 + // Note: technically speaking, different quotes could contain equivalent strings, 418 + // which would make this test fail; a "more proper" invariant check would ensure 419 + // verify that all counts are a multiple of the amount of times `sample_quotes` was chained, 420 + // and that the sum of all counts equals the total number of times a `FetchQuote` message was sent. 421 + quote_counts.into_values().into_iter().all(|c| c == LOOPS) 422 + ); 423 + } 424 + }