+1
-1
.tangled/workflows/build.yml
+1
-1
.tangled/workflows/build.yml
+265
-17
Cargo.lock
+265
-17
Cargo.lock
···
33
33
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
34
34
35
35
[[package]]
36
-
name = "android-tzdata"
37
-
version = "0.1.1"
38
-
source = "registry+https://github.com/rust-lang/crates.io-index"
39
-
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
40
-
41
-
[[package]]
42
36
name = "android_system_properties"
43
37
version = "0.1.5"
44
38
source = "registry+https://github.com/rust-lang/crates.io-index"
···
148
142
version = "0.1.0"
149
143
dependencies = [
150
144
"bsky-sdk",
145
+
"chrono",
146
+
"cron-lite",
147
+
"futures",
151
148
"glob",
152
149
"grep",
150
+
"kameo",
153
151
"rand",
154
152
"redis",
155
153
"tokio",
···
269
267
270
268
[[package]]
271
269
name = "chrono"
272
-
version = "0.4.40"
270
+
version = "0.4.42"
273
271
source = "registry+https://github.com/rust-lang/crates.io-index"
274
-
checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c"
272
+
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
275
273
dependencies = [
276
-
"android-tzdata",
277
274
"iana-time-zone",
278
275
"js-sys",
279
276
"num-traits",
···
283
280
]
284
281
285
282
[[package]]
283
+
name = "chrono-tz"
284
+
version = "0.10.4"
285
+
source = "registry+https://github.com/rust-lang/crates.io-index"
286
+
checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
287
+
dependencies = [
288
+
"chrono",
289
+
"phf",
290
+
]
291
+
292
+
[[package]]
286
293
name = "cid"
287
294
version = "0.11.1"
288
295
source = "registry+https://github.com/rust-lang/crates.io-index"
···
351
358
checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
352
359
dependencies = [
353
360
"cfg-if",
361
+
]
362
+
363
+
[[package]]
364
+
name = "cron-lite"
365
+
version = "0.3.0"
366
+
source = "registry+https://github.com/rust-lang/crates.io-index"
367
+
checksum = "7b1c9e28df18340148b754969b7b66ed3c7f1242d10f4a4840391624333b589c"
368
+
dependencies = [
369
+
"chrono",
370
+
"futures",
371
+
"pin-project",
354
372
]
355
373
356
374
[[package]]
357
375
name = "croner"
358
-
version = "2.1.0"
376
+
version = "3.0.1"
359
377
source = "registry+https://github.com/rust-lang/crates.io-index"
360
-
checksum = "38fd53511eaf0b00a185613875fee58b208dfce016577d0ad4bb548e1c4fb3ee"
378
+
checksum = "4aa42bcd3d846ebf66e15bd528d1087f75d1c6c1c66ebff626178a106353c576"
361
379
dependencies = [
362
380
"chrono",
381
+
"derive_builder",
382
+
"strum",
363
383
]
364
384
365
385
[[package]]
···
387
407
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
388
408
389
409
[[package]]
410
+
name = "darling"
411
+
version = "0.20.11"
412
+
source = "registry+https://github.com/rust-lang/crates.io-index"
413
+
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
414
+
dependencies = [
415
+
"darling_core",
416
+
"darling_macro",
417
+
]
418
+
419
+
[[package]]
420
+
name = "darling_core"
421
+
version = "0.20.11"
422
+
source = "registry+https://github.com/rust-lang/crates.io-index"
423
+
checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e"
424
+
dependencies = [
425
+
"fnv",
426
+
"ident_case",
427
+
"proc-macro2",
428
+
"quote",
429
+
"strsim",
430
+
"syn",
431
+
]
432
+
433
+
[[package]]
434
+
name = "darling_macro"
435
+
version = "0.20.11"
436
+
source = "registry+https://github.com/rust-lang/crates.io-index"
437
+
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
438
+
dependencies = [
439
+
"darling_core",
440
+
"quote",
441
+
"syn",
442
+
]
443
+
444
+
[[package]]
390
445
name = "dashmap"
391
446
version = "6.1.0"
392
447
source = "registry+https://github.com/rust-lang/crates.io-index"
···
427
482
]
428
483
429
484
[[package]]
485
+
name = "derive_builder"
486
+
version = "0.20.2"
487
+
source = "registry+https://github.com/rust-lang/crates.io-index"
488
+
checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947"
489
+
dependencies = [
490
+
"derive_builder_macro",
491
+
]
492
+
493
+
[[package]]
494
+
name = "derive_builder_core"
495
+
version = "0.20.2"
496
+
source = "registry+https://github.com/rust-lang/crates.io-index"
497
+
checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8"
498
+
dependencies = [
499
+
"darling",
500
+
"proc-macro2",
501
+
"quote",
502
+
"syn",
503
+
]
504
+
505
+
[[package]]
506
+
name = "derive_builder_macro"
507
+
version = "0.20.2"
508
+
source = "registry+https://github.com/rust-lang/crates.io-index"
509
+
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
510
+
dependencies = [
511
+
"derive_builder_core",
512
+
"syn",
513
+
]
514
+
515
+
[[package]]
430
516
name = "displaydoc"
431
517
version = "0.2.5"
432
518
source = "registry+https://github.com/rust-lang/crates.io-index"
···
436
522
"quote",
437
523
"syn",
438
524
]
525
+
526
+
[[package]]
527
+
name = "downcast-rs"
528
+
version = "2.0.2"
529
+
source = "registry+https://github.com/rust-lang/crates.io-index"
530
+
checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc"
531
+
532
+
[[package]]
533
+
name = "dyn-clone"
534
+
version = "1.0.20"
535
+
source = "registry+https://github.com/rust-lang/crates.io-index"
536
+
checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
439
537
440
538
[[package]]
441
539
name = "encoding_rs"
···
545
643
]
546
644
547
645
[[package]]
646
+
name = "futures"
647
+
version = "0.3.31"
648
+
source = "registry+https://github.com/rust-lang/crates.io-index"
649
+
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
650
+
dependencies = [
651
+
"futures-channel",
652
+
"futures-core",
653
+
"futures-executor",
654
+
"futures-io",
655
+
"futures-sink",
656
+
"futures-task",
657
+
"futures-util",
658
+
]
659
+
660
+
[[package]]
548
661
name = "futures-channel"
549
662
version = "0.3.31"
550
663
source = "registry+https://github.com/rust-lang/crates.io-index"
551
664
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
552
665
dependencies = [
553
666
"futures-core",
667
+
"futures-sink",
554
668
]
555
669
556
670
[[package]]
···
560
674
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
561
675
562
676
[[package]]
677
+
name = "futures-executor"
678
+
version = "0.3.31"
679
+
source = "registry+https://github.com/rust-lang/crates.io-index"
680
+
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
681
+
dependencies = [
682
+
"futures-core",
683
+
"futures-task",
684
+
"futures-util",
685
+
]
686
+
687
+
[[package]]
688
+
name = "futures-io"
689
+
version = "0.3.31"
690
+
source = "registry+https://github.com/rust-lang/crates.io-index"
691
+
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
692
+
693
+
[[package]]
563
694
name = "futures-macro"
564
695
version = "0.3.31"
565
696
source = "registry+https://github.com/rust-lang/crates.io-index"
···
588
719
source = "registry+https://github.com/rust-lang/crates.io-index"
589
720
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
590
721
dependencies = [
722
+
"futures-channel",
591
723
"futures-core",
724
+
"futures-io",
592
725
"futures-macro",
593
726
"futures-sink",
594
727
"futures-task",
728
+
"memchr",
595
729
"pin-project-lite",
596
730
"pin-utils",
597
731
"slab",
···
744
878
]
745
879
746
880
[[package]]
881
+
name = "heck"
882
+
version = "0.5.0"
883
+
source = "registry+https://github.com/rust-lang/crates.io-index"
884
+
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
885
+
886
+
[[package]]
747
887
name = "http"
748
888
version = "1.2.0"
749
889
source = "registry+https://github.com/rust-lang/crates.io-index"
···
979
1119
]
980
1120
981
1121
[[package]]
1122
+
name = "ident_case"
1123
+
version = "1.0.1"
1124
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1125
+
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
1126
+
1127
+
[[package]]
982
1128
name = "idna"
983
1129
version = "1.0.3"
984
1130
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1043
1189
]
1044
1190
1045
1191
[[package]]
1192
+
name = "kameo"
1193
+
version = "0.17.2"
1194
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1195
+
checksum = "41a73be96f616ca2784f597b5b6635582f5a7b3ba73b1dbe7afa5d9667955d39"
1196
+
dependencies = [
1197
+
"downcast-rs",
1198
+
"dyn-clone",
1199
+
"futures",
1200
+
"kameo_macros",
1201
+
"once_cell",
1202
+
"serde",
1203
+
"tokio",
1204
+
"tracing",
1205
+
]
1206
+
1207
+
[[package]]
1208
+
name = "kameo_macros"
1209
+
version = "0.17.0"
1210
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1211
+
checksum = "b3f384b32bf6426ae93a8b37da62c85073b676a31a82a86d608ad86453878de0"
1212
+
dependencies = [
1213
+
"heck",
1214
+
"proc-macro2",
1215
+
"quote",
1216
+
"syn",
1217
+
"uuid",
1218
+
]
1219
+
1220
+
[[package]]
1046
1221
name = "langtag"
1047
1222
version = "0.3.4"
1048
1223
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1374
1549
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
1375
1550
1376
1551
[[package]]
1552
+
name = "phf"
1553
+
version = "0.12.1"
1554
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1555
+
checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
1556
+
dependencies = [
1557
+
"phf_shared",
1558
+
]
1559
+
1560
+
[[package]]
1561
+
name = "phf_shared"
1562
+
version = "0.12.1"
1563
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1564
+
checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981"
1565
+
dependencies = [
1566
+
"siphasher",
1567
+
]
1568
+
1569
+
[[package]]
1570
+
name = "pin-project"
1571
+
version = "1.1.10"
1572
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1573
+
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
1574
+
dependencies = [
1575
+
"pin-project-internal",
1576
+
]
1577
+
1578
+
[[package]]
1579
+
name = "pin-project-internal"
1580
+
version = "1.1.10"
1581
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1582
+
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
1583
+
dependencies = [
1584
+
"proc-macro2",
1585
+
"quote",
1586
+
"syn",
1587
+
]
1588
+
1589
+
[[package]]
1377
1590
name = "pin-project-lite"
1378
1591
version = "0.2.16"
1379
1592
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1790
2003
]
1791
2004
1792
2005
[[package]]
2006
+
name = "siphasher"
2007
+
version = "1.0.1"
2008
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2009
+
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
2010
+
2011
+
[[package]]
1793
2012
name = "slab"
1794
2013
version = "0.4.9"
1795
2014
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1821
2040
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
1822
2041
1823
2042
[[package]]
2043
+
name = "strsim"
2044
+
version = "0.11.1"
2045
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2046
+
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
2047
+
2048
+
[[package]]
2049
+
name = "strum"
2050
+
version = "0.27.2"
2051
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2052
+
checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf"
2053
+
dependencies = [
2054
+
"strum_macros",
2055
+
]
2056
+
2057
+
[[package]]
2058
+
name = "strum_macros"
2059
+
version = "0.27.2"
2060
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2061
+
checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7"
2062
+
dependencies = [
2063
+
"heck",
2064
+
"proc-macro2",
2065
+
"quote",
2066
+
"syn",
2067
+
]
2068
+
2069
+
[[package]]
1824
2070
name = "syn"
1825
2071
version = "2.0.99"
1826
2072
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1935
2181
"signal-hook-registry",
1936
2182
"socket2",
1937
2183
"tokio-macros",
2184
+
"tracing",
1938
2185
"windows-sys 0.52.0",
1939
2186
]
1940
2187
1941
2188
[[package]]
1942
2189
name = "tokio-cron-scheduler"
1943
-
version = "0.13.0"
2190
+
version = "0.15.1"
1944
2191
source = "registry+https://github.com/rust-lang/crates.io-index"
1945
-
checksum = "6a5597b569b4712cf78aa0c9ae29742461b7bda1e49c2a5fdad1d79bf022f8f0"
2192
+
checksum = "1f50e41f200fd8ed426489bd356910ede4f053e30cebfbd59ef0f856f0d7432a"
1946
2193
dependencies = [
1947
2194
"chrono",
2195
+
"chrono-tz",
1948
2196
"croner",
1949
2197
"num-derive",
1950
2198
"num-traits",
···
2027
2275
2028
2276
[[package]]
2029
2277
name = "tracing-attributes"
2030
-
version = "0.1.28"
2278
+
version = "0.1.31"
2031
2279
source = "registry+https://github.com/rust-lang/crates.io-index"
2032
-
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
2280
+
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
2033
2281
dependencies = [
2034
2282
"proc-macro2",
2035
2283
"quote",
···
2356
2604
2357
2605
[[package]]
2358
2606
name = "windows-link"
2359
-
version = "0.1.0"
2607
+
version = "0.2.1"
2360
2608
source = "registry+https://github.com/rust-lang/crates.io-index"
2361
-
checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3"
2609
+
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
2362
2610
2363
2611
[[package]]
2364
2612
name = "windows-registry"
+5
-3
Cargo.toml
+5
-3
Cargo.toml
···
1
-
cargo-features = ["edition2024"] # For rust-analyzer to work
2
-
3
1
[package]
4
2
name = "audquotes"
5
3
version = "0.1.0"
···
8
6
9
7
[dependencies]
10
8
bsky-sdk = "0.1.16"
9
+
chrono = "0.4.42"
10
+
cron-lite = { version = "0.3.0", features = ["async"] }
11
+
futures = "0.3.31"
11
12
glob = "0.3.2"
12
13
grep = "0.3.2"
14
+
kameo = "0.17.2"
13
15
rand = "0.9.0"
14
16
redis = { version = "0.29.1", features = ["aio", "connection-manager", "tokio-comp"] }
15
17
tokio = { version = "1.44.0", features = ["full"] }
16
-
tokio-cron-scheduler = "0.13.0"
18
+
tokio-cron-scheduler = "0.15.1"
+24
src/data.rs
+24
src/data.rs
···
1
+
/// A newtype over [String] to represent a single quote.
2
+
/// [Quote] does not implement [PartialEq] or [Eq] as, on principle, two
3
+
/// quotes may be comprised of the same contents whilst still
4
+
/// representing distinct quotes in practice (e.g. two different lines of dialogue which happen to be the same).
5
+
#[derive(Debug, Clone)]
6
+
pub struct Quote(String);
7
+
8
+
impl<S: AsRef<str>> From<S> for Quote {
9
+
fn from(value: S) -> Self {
10
+
Self(value.as_ref().to_owned())
11
+
}
12
+
}
13
+
14
+
impl From<Quote> for String {
15
+
fn from(value: Quote) -> Self {
16
+
value.0
17
+
}
18
+
}
19
+
20
+
impl Quote {
21
+
pub fn get(&self) -> &str {
22
+
&self.0
23
+
}
24
+
}
+94
src/lib.rs
+94
src/lib.rs
···
1
+
pub mod data;
2
+
pub mod sink;
3
+
pub mod storage;
4
+
5
+
pub mod run {
6
+
use std::time::Duration;
7
+
8
+
use crate::sink::{BskySink, PostQuote, SinkManager, StdoutSink};
9
+
use crate::storage::{
10
+
FetchQuote, QuoteCycle, queue::MemoryQueueStorage, source::FsFilterSourceManager,
11
+
};
12
+
use cron_lite::CronEvent;
13
+
use futures::StreamExt;
14
+
use kameo::prelude::*;
15
+
use tokio::time::timeout;
16
+
17
+
pub async fn entrypoint() -> Result<(), Box<dyn std::error::Error>> {
18
+
// TODO: Clean up this function's internals.
19
+
// The current structure is alright, but it was stitched together
20
+
// quickly just to confirm that everything is functioning as it should.
21
+
let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1";
22
+
let bsky = if use_bsky {
23
+
Some(BskySink::spawn(
24
+
BskySink::new_session(
25
+
std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"),
26
+
std::env::var("BLUESKY_PASSWORD")
27
+
.expect("Bluesky application password not supplied"),
28
+
)
29
+
.await
30
+
.expect("Could not connect to Bluesky with supplied credentials"),
31
+
))
32
+
} else {
33
+
None
34
+
};
35
+
36
+
let sink = {
37
+
let stdout = StdoutSink::spawn(StdoutSink);
38
+
SinkManager::spawn(SinkManager::new(Some(stdout), bsky))
39
+
};
40
+
41
+
let cycle = {
42
+
let source = FsFilterSourceManager::spawn(FsFilterSourceManager::default());
43
+
let queue = MemoryQueueStorage::spawn(MemoryQueueStorage::new());
44
+
45
+
QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue))
46
+
};
47
+
48
+
use cron_lite::Schedule;
49
+
const POSTING_TIMEOUT: Duration = Duration::from_secs(60);
50
+
const POSTING_INTERVAL: &str = "*/10 * * * * * *";
51
+
let schedule =
52
+
Schedule::new(POSTING_INTERVAL).expect("Schedule should be a valid cron expression");
53
+
let now = chrono::Utc::now();
54
+
55
+
let mut tick_stream = schedule.stream(&now);
56
+
57
+
while let Some(tick) = tick_stream.next().await {
58
+
if let CronEvent::Missed(missed_at) = tick {
59
+
eprintln!(
60
+
"Missed event tick at {}. Current time: {}. Skipping post.",
61
+
missed_at,
62
+
chrono::Utc::now()
63
+
);
64
+
continue;
65
+
}
66
+
67
+
// We store the code to perform the next posting iteration as one atomic future which we wrap with a timeout.
68
+
// This means that, if we miss a posting window due to the timeout, we will not get multiple consecutive or late posts.
69
+
let next_post_iteration = async || -> Result<(), Box<dyn std::error::Error>> {
70
+
let next_quote = cycle
71
+
.ask(FetchQuote)
72
+
.await
73
+
.map_err(|_| "fetch quote should always succeed")?;
74
+
75
+
// Note: By using `tell`, we don't know when each sink's code will have completed.
76
+
// If any sink uses, say, a file or stdout, that resource may well be contested between
77
+
// consecutive iterations of this loop.
78
+
sink.tell(PostQuote(next_quote)).await?;
79
+
println!();
80
+
81
+
Ok(())
82
+
};
83
+
84
+
if let Err(e) = timeout(POSTING_TIMEOUT, next_post_iteration()).await {
85
+
eprintln!(
86
+
"Could not submit post in time to all sinks. Timeout error: {}",
87
+
e
88
+
);
89
+
}
90
+
}
91
+
92
+
Ok(())
93
+
}
94
+
}
+1
-310
src/main.rs
+1
-310
src/main.rs
···
1
-
use bsky_sdk::api::app::bsky::feed::post;
2
-
use bsky_sdk::api::types::string::Datetime;
3
-
use bsky_sdk::{BskyAgent, api::types::Object};
4
-
5
-
use glob::glob;
6
-
use grep::{regex, searcher::sinks};
7
-
use rand::seq::SliceRandom;
8
-
use redis::aio::ConnectionManagerConfig;
9
-
10
-
use std::{sync::Arc, time::Duration};
11
-
use tokio_cron_scheduler::{Job, JobScheduler};
12
-
13
-
use redis::AsyncCommands;
14
-
15
-
const DEFAULT_QUEUE: &str = "queue:default";
16
-
const EVENT_QUEUE: &str = "queue:event";
17
-
18
-
// See https://cron.help for what these strings mean
19
-
const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *";
20
-
const POSTING_INTERVAL_DEBUG: &str = "1/10 * * * * *";
21
-
const EVENT_UPDATE_INTERVAL: &str = "55 23 * * *";
22
-
23
-
const POSTING_RETRIES: i32 = 5;
24
-
25
-
fn prepare_post<I: Into<String>>(text: I) -> post::RecordData {
26
-
post::RecordData {
27
-
text: text.into(),
28
-
created_at: Datetime::now(),
29
-
embed: None,
30
-
entities: None,
31
-
facets: None,
32
-
labels: None,
33
-
langs: None,
34
-
reply: None,
35
-
tags: None,
36
-
}
37
-
}
38
-
39
-
#[derive(Clone, Debug)]
40
-
struct QuoteFilter {
41
-
path: String,
42
-
content: String,
43
-
dates: Vec<String>,
44
-
}
45
-
46
-
impl QuoteFilter {
47
-
pub async fn get_quote(
48
-
&self,
49
-
mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone,
50
-
) -> Result<String, ()> {
51
-
// 1: Attempt to read from the event (priority) queue
52
-
let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok();
53
-
if let Some(quote) = event_quote {
54
-
return Ok(quote);
55
-
}
56
-
57
-
// 2: Otherwise, we read from the regular queue, repopulating it if it's empty
58
-
self.reshuffle_quotes(con.clone(), DEFAULT_QUEUE).await?;
59
-
con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ())
60
-
}
61
-
62
-
async fn reshuffle_quotes(
63
-
&self,
64
-
mut con: impl redis::aio::ConnectionLike + AsyncCommands,
65
-
output_queue: &str,
66
-
) -> Result<(), ()> {
67
-
let len: u64 = con.llen(output_queue).await.map_err(|_| ())?;
68
-
// NOTE: The following assumes the queue hasn't been repopulated by any other client
69
-
// in-between the call to llen and the execution of the pipeline.
70
-
// Hopefully won't be a problem :)
71
-
if len == 0 {
72
-
let mut file_contents = self.read_files();
73
-
74
-
{
75
-
let mut rand = rand::rng();
76
-
file_contents.shuffle(&mut rand);
77
-
}
78
-
79
-
let mut pipeline = redis::pipe();
80
-
for file_contents in file_contents.into_iter() {
81
-
pipeline.lpush(output_queue, file_contents.as_str());
82
-
}
83
-
let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?;
84
-
}
85
-
86
-
Ok(())
87
-
}
88
-
89
-
fn read_files(&self) -> Vec<String> {
90
-
let matcher = regex::RegexMatcher::new(&self.content).unwrap();
91
-
let mut searcher = grep::searcher::Searcher::new();
92
-
let mut results = Vec::new();
93
-
94
-
for file in glob(&self.path).unwrap() {
95
-
let file = match file {
96
-
Ok(file) => file,
97
-
Err(_) => continue,
98
-
};
99
-
100
-
let mut matched = false;
101
-
let sink = sinks::Lossy(|_lnum, _line| {
102
-
matched = true;
103
-
Ok(false)
104
-
});
105
-
106
-
let search_result = searcher.search_path(&matcher, &file, sink);
107
-
if !matched || search_result.is_err() {
108
-
continue;
109
-
}
110
-
111
-
let contents = std::fs::read_to_string(file).unwrap();
112
-
results.push(contents.trim().to_string());
113
-
}
114
-
115
-
results
116
-
}
117
-
}
118
-
119
-
#[derive(Clone)]
120
-
struct RedisState {
121
-
con_manager: redis::aio::ConnectionManager,
122
-
}
123
-
124
-
impl RedisState {
125
-
pub async fn new(url: String) -> Result<Self, ()> {
126
-
let redis = redis::Client::open(url).map_err(|_| ())?;
127
-
let config = ConnectionManagerConfig::new()
128
-
.set_response_timeout(std::time::Duration::from_secs(10))
129
-
.set_number_of_retries(3);
130
-
let con_manager = redis::aio::ConnectionManager::new_with_config(redis, config)
131
-
.await
132
-
.map_err(|_| ())?;
133
-
134
-
Ok(RedisState { con_manager })
135
-
}
136
-
137
-
pub async fn fetch_quote(&self, filter: &QuoteFilter) -> Result<String, ()> {
138
-
loop {
139
-
match filter.get_quote(self.con_manager.clone()).await {
140
-
Ok(text) => return Ok(text),
141
-
Err(_) => eprintln!("Error fetching quote from redis storage. Retrying..."),
142
-
};
143
-
}
144
-
}
145
-
}
146
-
147
-
#[derive(Clone)]
148
-
struct BlueskyState {
149
-
bsky_agent: BskyAgent,
150
-
bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>,
151
-
}
152
-
153
-
impl BlueskyState {
154
-
pub async fn new_session(username: String, password: String) -> Result<Self, ()> {
155
-
let agent = BskyAgent::builder().build().await.map_err(|_| ())?;
156
-
let session = agent.login(username, password).await.map_err(|_| ())?;
157
-
158
-
Ok(Self {
159
-
bsky_agent: agent,
160
-
bsky_session: session,
161
-
})
162
-
}
163
-
164
-
pub async fn submit_post(self, post: String) -> Result<(), ()> {
165
-
let post = prepare_post(post.as_str());
166
-
167
-
for current_try in 0..POSTING_RETRIES {
168
-
if let Err(e) = self.bsky_agent.create_record(post.clone()).await {
169
-
eprintln!("Could not post quote: `{e}`");
170
-
eprintln!("Attempting to refresh login...");
171
-
172
-
if let Err(e) = self
173
-
.bsky_agent
174
-
.resume_session(self.bsky_session.clone())
175
-
.await
176
-
{
177
-
eprintln!("Failed to resume sessions due to following error: {e}")
178
-
}
179
-
} else {
180
-
if current_try > 0 {
181
-
eprintln!("Successfully posted quote on retry #{current_try}");
182
-
}
183
-
return Ok(());
184
-
}
185
-
}
186
-
187
-
Err(())
188
-
}
189
-
}
190
-
191
-
#[derive(Clone)]
192
-
struct State {
193
-
redis: RedisState,
194
-
bsky_session: Option<BlueskyState>,
195
-
}
196
-
197
-
impl State {
198
-
pub fn redis(&self) -> &RedisState {
199
-
&self.redis
200
-
}
201
-
202
-
pub fn bsky(&self) -> Option<&BlueskyState> {
203
-
self.bsky_session.as_ref()
204
-
}
205
-
}
206
-
207
1
#[tokio::main]
208
2
async fn main() -> Result<(), Box<dyn std::error::Error>> {
209
-
let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1";
210
-
let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1";
211
-
212
-
let redis_state =
213
-
RedisState::new(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string()))
214
-
.await
215
-
.expect("Initial redis connection failure");
216
-
let bsky_state = if use_bsky {
217
-
Some(
218
-
BlueskyState::new_session(
219
-
std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"),
220
-
std::env::var("BLUESKY_PASSWORD")
221
-
.expect("Bluesky application password not supplied"),
222
-
)
223
-
.await
224
-
.expect("Could not connect to Bluesky with supplied credentials"),
225
-
)
226
-
} else {
227
-
None
228
-
};
229
-
230
-
let app_state = Arc::new(State {
231
-
redis: redis_state,
232
-
bsky_session: bsky_state,
233
-
});
234
-
235
-
let sched = JobScheduler::new().await?;
236
-
237
-
/*
238
-
let event_filter = Arc::new(QuoteFilter {
239
-
content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(),
240
-
path: "test/**/
241
-
*.txt".to_string(),
242
-
dates: vec![],
243
-
});
244
-
*/
245
-
246
-
let regular_filter = Arc::new(QuoteFilter {
247
-
content: r".*".to_string(),
248
-
path: if !debug_mode {
249
-
"quotes/**/*.txt".to_string()
250
-
} else {
251
-
"test/**/*.txt".to_string()
252
-
},
253
-
dates: vec![],
254
-
});
255
-
256
-
let posting_interval = if !debug_mode {
257
-
POSTING_INTERVAL_CRON
258
-
} else {
259
-
POSTING_INTERVAL_DEBUG
260
-
};
261
-
262
-
let post_job = Job::new_async(posting_interval, move |_uuid, _| {
263
-
let filter = regular_filter.clone();
264
-
let app_state = app_state.clone();
265
-
266
-
Box::pin(async move {
267
-
// We try fetching a new quote from our redis storage until we succeed
268
-
let text = match app_state.redis().fetch_quote(&filter).await {
269
-
Ok(text) => text,
270
-
Err(_) => {
271
-
eprintln!("Error fetching quote from redis storage.");
272
-
return;
273
-
}
274
-
};
275
-
276
-
if let Some(bsky) = app_state.bsky() {
277
-
if let Err(_) = bsky.clone().submit_post(text).await {
278
-
eprintln!("Error posting to bluesky.");
279
-
return;
280
-
}
281
-
} else {
282
-
// Let's just print the quote!
283
-
println!("{}\n", text);
284
-
}
285
-
})
286
-
})?;
287
-
288
-
// Add async job
289
-
sched.add(post_job).await?;
290
-
291
-
// sched
292
-
// .add(Job::new_async(EVENT_UPDATE_INTERVAL, move |_uuid, _| {
293
-
// let filter = event_filter.clone();
294
-
// let con = con_event_monitor.clone();
295
-
// let _agent = agent_event_monitor.clone(); // Can be used later to e.g. update profile
296
-
297
-
// Box::pin(async move {
298
-
// // For testing purposes, let's always upload events
299
-
// reshuffle_quotes(&filter, con.clone(), EVENT_QUEUE)
300
-
// .await
301
-
// .unwrap();
302
-
// })
303
-
// })?)
304
-
// .await?;
305
-
306
-
sched
307
-
.start()
308
-
.await
309
-
.expect("Error starting tokio scheduler. Shutting down...");
310
-
loop {
311
-
tokio::time::sleep(Duration::from_secs(10)).await;
312
-
}
3
+
audquotes::run::entrypoint().await
313
4
}
+192
src/sink.rs
+192
src/sink.rs
···
1
+
use crate::data::Quote;
2
+
use bsky_sdk::{BskyAgent, api::types::Object};
3
+
use kameo::prelude::*;
4
+
5
+
/// A newtype over [Quote] used to prompt the [SinkManager] to
6
+
/// submit a new quote to all its configured sinks.
7
+
#[derive(Debug, Clone)]
8
+
pub struct PostQuote(pub Quote);
9
+
10
+
/// Error type for internal communication between
11
+
/// the [SinkManager] and its sinks.
12
+
/// The error reporting performed internally does not necessarily match the
13
+
/// behavior which other modules will observe.
14
+
#[derive(Debug, Clone)]
15
+
pub enum PostFailure {
16
+
/// Indicates that a given quote could not be posted to a sink,
17
+
/// but that it *may* be retried. The `reinitialize` boolean signals
18
+
/// whether the sink should be reinitialized before further attempts.
19
+
Retry { reinitialize: bool },
20
+
21
+
/// Indicates that a given quote could not be posted to a sink,
22
+
/// as it is unsupported by it in some way (e.g. quote exceeds the sink's length limit).
23
+
Unsupported,
24
+
25
+
/// Indicates that a given quote could not be posted to a sink
26
+
/// due to the occurrence of some unrecoverable error.
27
+
/// It is thus unlikely that the sink will work in the future, even if
28
+
/// reinitialized.
29
+
Unrecoverable,
30
+
}
31
+
32
+
pub type PostResult = Result<(), PostFailure>;
33
+
34
+
/// Represents internal implementation details of the interactions between
35
+
/// the [SinkManager] and its sinks.
36
+
pub trait QuoteSink: Actor + Message<PostQuote, Reply = PostResult> {}
37
+
38
+
/// A [QuoteSink] which will output the contents of each quote
39
+
/// over Stdout. Is primarily meant for testing and observing sink behavior.
40
+
#[derive(Actor)]
41
+
pub struct StdoutSink;
42
+
43
+
impl Message<PostQuote> for StdoutSink {
44
+
type Reply = PostResult;
45
+
46
+
async fn handle(
47
+
&mut self,
48
+
PostQuote(quote): PostQuote,
49
+
_ctx: &mut Context<Self, Self::Reply>,
50
+
) -> Self::Reply {
51
+
println!("{}", quote.get());
52
+
Ok(())
53
+
}
54
+
}
55
+
56
+
/// A [QuoteSink] which will post the contents of each quote to Bluesky.
57
+
#[derive(Actor)]
58
+
pub struct BskySink {
59
+
bsky_agent: BskyAgent,
60
+
bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>,
61
+
}
62
+
63
+
impl BskySink {
64
+
pub async fn new_session(username: String, password: String) -> Result<Self, ()> {
65
+
let agent = BskyAgent::builder().build().await.map_err(|_| ())?;
66
+
let session = agent.login(username, password).await.map_err(|_| ())?;
67
+
68
+
Ok(Self {
69
+
bsky_agent: agent,
70
+
bsky_session: session,
71
+
})
72
+
}
73
+
74
+
async fn submit_post(&mut self, quote: Quote) -> Result<(), ()> {
75
+
let post = bsky_sdk::api::app::bsky::feed::post::RecordData {
76
+
text: quote.into(),
77
+
created_at: bsky_sdk::api::types::string::Datetime::now(),
78
+
embed: None,
79
+
entities: None,
80
+
facets: None,
81
+
labels: None,
82
+
langs: None,
83
+
reply: None,
84
+
tags: None,
85
+
};
86
+
87
+
if let Err(e) = self
88
+
.bsky_agent
89
+
.resume_session(self.bsky_session.clone())
90
+
.await
91
+
{
92
+
eprintln!("Failed to resume sessions due to following error: {e}");
93
+
return Err(());
94
+
}
95
+
96
+
match self.bsky_agent.create_record(post.clone()).await {
97
+
Ok(_) => Ok(()),
98
+
Err(_) => Err(()),
99
+
}
100
+
}
101
+
}
102
+
103
+
impl Message<PostQuote> for BskySink {
104
+
type Reply = PostResult;
105
+
106
+
async fn handle(
107
+
&mut self,
108
+
PostQuote(quote): PostQuote,
109
+
_ctx: &mut Context<Self, Self::Reply>,
110
+
) -> Self::Reply {
111
+
match self.submit_post(quote).await {
112
+
Ok(_) => Ok(()),
113
+
Err(_) => Err(PostFailure::Unrecoverable),
114
+
}
115
+
}
116
+
}
117
+
118
+
/// Supervises all [QuoteSink] actors within the program, forwarding
119
+
/// [PostQuote] messages to them as they are received.
120
+
/// The SinkManager will attempt to reinitialize failed sinks upon
121
+
/// encountering recoverable errors.
122
+
#[derive(Actor)]
123
+
pub struct SinkManager {
124
+
// Uh oh. As the [Actor] trait is *not* dyn-compatible,
125
+
// and I do not own its definition, I'm fairly certain that I cannot
126
+
// do asynchronous dynamic dispatch for it here.
127
+
// I've decided I'll limit this to one sink per implementation right now.
128
+
stdout_sink: Option<ActorRef<StdoutSink>>,
129
+
bsky_sink: Option<ActorRef<BskySink>>,
130
+
// ...
131
+
}
132
+
133
+
impl SinkManager {
134
+
pub fn new(
135
+
stdout_sink: Option<ActorRef<StdoutSink>>,
136
+
bsky_sink: Option<ActorRef<BskySink>>,
137
+
) -> Self {
138
+
Self {
139
+
stdout_sink,
140
+
bsky_sink,
141
+
}
142
+
}
143
+
}
144
+
145
+
pub type SinkReplies = Vec<Result<(), ()>>;
146
+
147
+
impl Message<PostQuote> for SinkManager {
148
+
type Reply = SinkReplies;
149
+
150
+
async fn handle(
151
+
&mut self,
152
+
msg: PostQuote,
153
+
_ctx: &mut Context<Self, Self::Reply>,
154
+
) -> Self::Reply {
155
+
use futures::future::join_all;
156
+
157
+
let stdout_result = self
158
+
.stdout_sink
159
+
.as_ref()
160
+
.map(|s| s.ask(msg.clone()).into_future());
161
+
162
+
let bsky_result = self
163
+
.bsky_sink
164
+
.as_ref()
165
+
.map(|s| s.ask(msg.clone()).into_future());
166
+
167
+
let futures = [stdout_result, bsky_result].into_iter().flatten();
168
+
let results = join_all(futures).await;
169
+
170
+
results.iter().map(|r| r.clone().or(Err(()))).collect()
171
+
}
172
+
}
173
+
174
+
mod test {
175
+
#[tokio::test]
176
+
async fn stdout_sink() {
177
+
use super::*;
178
+
179
+
let stdout = StdoutSink::spawn(StdoutSink);
180
+
let manager = SinkManager::spawn(SinkManager::new(Some(stdout), None));
181
+
182
+
let messages = ["First test!", "Second test.", "Third..."];
183
+
for msg in messages {
184
+
manager.tell(PostQuote(msg.into())).await.unwrap();
185
+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
186
+
}
187
+
188
+
// Hopefully we don't crash...!
189
+
// TODO: Sink that actually stores every quote it "posts"?
190
+
// Could help in verifying everything was sent correctly.
191
+
}
192
+
}
+424
src/storage.rs
+424
src/storage.rs
···
1
+
use kameo::prelude::*;
2
+
3
+
use crate::storage::{
4
+
queue::{DequeueQuote, EnqueueQuotes},
5
+
source::SourceQuotes,
6
+
};
7
+
8
+
use crate::data::Quote;
9
+
10
+
mod rng {
11
+
use rand::SeedableRng;
12
+
13
+
pub struct PrngState {
14
+
rng: rand::rngs::SmallRng,
15
+
}
16
+
17
+
impl PrngState {
18
+
pub fn from_thread_rng() -> Self {
19
+
Self {
20
+
rng: rand::rngs::SmallRng::from_rng(&mut rand::rng()),
21
+
}
22
+
}
23
+
24
+
pub fn from_seed(seed: u64) -> Self {
25
+
Self {
26
+
rng: rand::rngs::SmallRng::seed_from_u64(seed),
27
+
}
28
+
}
29
+
30
+
pub fn shuffle_slice<T>(&mut self, slice: &mut [T]) {
31
+
use rand::seq::SliceRandom;
32
+
slice.shuffle(&mut self.rng);
33
+
}
34
+
}
35
+
36
+
mod test {
37
+
#[tokio::test]
38
+
async fn shuffle_slice() {
39
+
use super::*;
40
+
41
+
let mut data = vec![1, 2, 3, 4];
42
+
let mut rng = PrngState::from_thread_rng();
43
+
44
+
rng.shuffle_slice(&mut data);
45
+
println!("{:?}", data);
46
+
}
47
+
}
48
+
}
49
+
50
+
pub mod source {
51
+
use super::*;
52
+
53
+
// TODO: Should the quote source filters be
54
+
// generic over the exact manager implementation being used?
55
+
/// Message to request that a SourceManager source its quotes once again.
56
+
pub struct SourceQuotes;
57
+
pub type SourceReply = Result<Vec<Quote>, ()>;
58
+
59
+
/// Subtrait of Actor which specifically
60
+
/// denotes actors that can handle all relevant source messages.
61
+
pub trait SourceManager: Actor + Message<SourceQuotes, Reply = SourceReply> {}
62
+
63
+
impl<T> SourceManager for T where T: Message<SourceQuotes, Reply = SourceReply> {}
64
+
65
+
/// Implementation of [`SourceManager`] which sources quotes from a Vec
66
+
/// that it holds in memory, without accessing external services.
67
+
/// Its main purpose is to be used for testing.
68
+
#[derive(Actor)]
69
+
pub struct MemorySourceManager {
70
+
quotes: Vec<Quote>,
71
+
}
72
+
73
+
impl MemorySourceManager {
74
+
pub fn new(quotes: impl IntoIterator<Item = impl Into<Quote>>) -> Self {
75
+
Self {
76
+
quotes: quotes.into_iter().map(Into::into).collect(),
77
+
}
78
+
}
79
+
}
80
+
81
+
impl Message<SourceQuotes> for MemorySourceManager {
82
+
type Reply = SourceReply;
83
+
84
+
async fn handle(
85
+
&mut self,
86
+
_msg: SourceQuotes,
87
+
_ctx: &mut Context<Self, Self::Reply>,
88
+
) -> Self::Reply {
89
+
// We just clone the quotes we've been holding onto since startup
90
+
Ok(self.quotes.clone())
91
+
}
92
+
}
93
+
94
+
/// Uses a [QuoteFilter] to source quotes from the local filesystem
95
+
/// at the beginning of each cycle.
96
+
#[derive(Actor)]
97
+
pub struct FsFilterSourceManager {
98
+
filter: QuoteFilter,
99
+
}
100
+
101
+
impl FsFilterSourceManager {
102
+
pub fn new(filter: QuoteFilter) -> Self {
103
+
Self { filter }
104
+
}
105
+
}
106
+
107
+
impl Default for FsFilterSourceManager {
108
+
fn default() -> Self {
109
+
Self {
110
+
filter: QuoteFilter {
111
+
content: r".*".to_string(),
112
+
// TODO: Maybe make this a compile-time constant for debugging
113
+
path: "test/**/*.txt".to_string(),
114
+
_dates: vec![],
115
+
},
116
+
}
117
+
}
118
+
}
119
+
120
+
impl Message<SourceQuotes> for FsFilterSourceManager {
121
+
type Reply = SourceReply;
122
+
123
+
async fn handle(
124
+
&mut self,
125
+
_msg: SourceQuotes,
126
+
_ctx: &mut Context<Self, Self::Reply>,
127
+
) -> Self::Reply {
128
+
self.filter.read_files()
129
+
}
130
+
}
131
+
132
+
#[derive(Clone, Debug)]
133
+
pub struct QuoteFilter {
134
+
path: String,
135
+
content: String,
136
+
_dates: Vec<String>,
137
+
}
138
+
139
+
impl QuoteFilter {
140
+
// TODO: actually leverage async I/O
141
+
fn read_files(&self) -> Result<Vec<Quote>, ()> {
142
+
use glob::glob;
143
+
use grep::{regex, searcher::sinks};
144
+
145
+
let matcher = regex::RegexMatcher::new(&self.content).map_err(|_| ())?;
146
+
let mut searcher = grep::searcher::Searcher::new();
147
+
let mut results = Vec::new();
148
+
149
+
for file in glob(&self.path).map_err(|_| ())? {
150
+
let file = match file {
151
+
Ok(file) => file,
152
+
Err(_) => continue,
153
+
};
154
+
155
+
let mut matched = false;
156
+
let sink = sinks::Lossy(|_lnum, _line| {
157
+
matched = true;
158
+
Ok(false)
159
+
});
160
+
161
+
let search_result = searcher.search_path(&matcher, &file, sink);
162
+
if !matched || search_result.is_err() {
163
+
continue;
164
+
}
165
+
166
+
let contents = std::fs::read_to_string(file).map_err(|_| ())?;
167
+
results.push(contents.trim().into());
168
+
}
169
+
170
+
Ok(results)
171
+
}
172
+
}
173
+
}
174
+
175
+
pub mod queue {
176
+
use std::collections::VecDeque;
177
+
178
+
use super::*;
179
+
180
+
// Messages to interact with the quote queue
181
+
pub struct DequeueQuote;
182
+
pub type DequeueReply = Result<Option<Quote>, ()>;
183
+
184
+
pub struct EnqueueQuotes(pub Vec<Quote>);
185
+
pub type EnqueueReply = Result<(), ()>;
186
+
187
+
/// Subtrait of Actor which specifically
188
+
/// denotes actors that can handle all relevant queue messages.
189
+
pub trait QueueManager:
190
+
Actor
191
+
+ Message<EnqueueQuotes, Reply = EnqueueReply>
192
+
+ Message<DequeueQuote, Reply = DequeueReply>
193
+
{
194
+
}
195
+
196
+
impl<T> QueueManager for T where
197
+
T: Message<EnqueueQuotes, Reply = EnqueueReply>
198
+
+ Message<DequeueQuote, Reply = DequeueReply>
199
+
{
200
+
}
201
+
202
+
/// A basic implementation of an in-memory queue of quotes.
203
+
/// Its contents are *not* persisted across application restarts, so it
204
+
/// is only suited for testing purposes.
205
+
#[derive(Actor, Default)]
206
+
pub struct MemoryQueueStorage {
207
+
quotes: VecDeque<Quote>,
208
+
}
209
+
210
+
impl MemoryQueueStorage {
211
+
pub fn new() -> Self {
212
+
Self {
213
+
quotes: VecDeque::new(),
214
+
}
215
+
}
216
+
}
217
+
218
+
impl Message<EnqueueQuotes> for MemoryQueueStorage {
219
+
/// We only need to signal success or failure in this instance,
220
+
/// with no added metadata in either case.
221
+
type Reply = EnqueueReply;
222
+
223
+
async fn handle(
224
+
&mut self,
225
+
msg: EnqueueQuotes,
226
+
_ctx: &mut Context<Self, Self::Reply>,
227
+
) -> Self::Reply {
228
+
for q in msg.0 {
229
+
self.quotes.push_back(q);
230
+
}
231
+
232
+
Ok(())
233
+
}
234
+
}
235
+
236
+
impl Message<DequeueQuote> for MemoryQueueStorage {
237
+
type Reply = DequeueReply;
238
+
239
+
async fn handle(
240
+
&mut self,
241
+
_msg: DequeueQuote,
242
+
_ctx: &mut Context<Self, Self::Reply>,
243
+
) -> Self::Reply {
244
+
// Note: this can never fail, since the quotes are stored in memory
245
+
Ok(self.quotes.pop_front())
246
+
}
247
+
}
248
+
}
249
+
250
+
#[derive(Actor)]
251
+
pub struct QuoteCycle<S: source::SourceManager, Q: queue::QueueManager> {
252
+
rng: rng::PrngState,
253
+
source_manager: ActorRef<S>,
254
+
queue_manager: ActorRef<Q>,
255
+
}
256
+
257
+
impl<S: source::SourceManager, Q: queue::QueueManager> QuoteCycle<S, Q> {
258
+
pub fn new(
259
+
rng: rng::PrngState,
260
+
source_manager: ActorRef<S>,
261
+
queue_manager: ActorRef<Q>,
262
+
) -> Self {
263
+
Self {
264
+
rng,
265
+
source_manager,
266
+
queue_manager,
267
+
}
268
+
}
269
+
270
+
pub fn with_thread_rng(source_manager: ActorRef<S>, queue_manager: ActorRef<Q>) -> Self {
271
+
Self {
272
+
rng: rng::PrngState::from_thread_rng(),
273
+
source_manager,
274
+
queue_manager,
275
+
}
276
+
}
277
+
}
278
+
279
+
/// A message to [QuoteCycle] to fetch one more quote from its storage.
280
+
pub struct FetchQuote;
281
+
282
+
impl<S, Q> Message<FetchQuote> for QuoteCycle<S, Q>
283
+
where
284
+
S: source::SourceManager,
285
+
Q: queue::QueueManager,
286
+
{
287
+
type Reply = Result<Quote, ()>;
288
+
289
+
async fn handle(
290
+
&mut self,
291
+
_msg: FetchQuote,
292
+
_ctx: &mut Context<Self, Self::Reply>,
293
+
) -> Self::Reply {
294
+
// 1. We query our queue storage for the next quote
295
+
if let Some(next_quote) = self.queue_manager.ask(DequeueQuote).await.map_err(|_| ())? {
296
+
// if there is a quote, we simply return it and move on
297
+
return Ok(next_quote);
298
+
}
299
+
300
+
// 2. Otherwise, we must repopulate the queue through our source
301
+
let mut refreshed_quotes = self
302
+
.source_manager
303
+
.ask(SourceQuotes)
304
+
.await
305
+
.map_err(|_| ())?;
306
+
307
+
// 3. We shuffle the newly-sourced quotes
308
+
self.rng.shuffle_slice(&mut refreshed_quotes);
309
+
let refreshed_quotes = refreshed_quotes; // No longer mutable
310
+
311
+
// TODO: Perhaps we should assert that the new quotes are non-empty?
312
+
// 4. We enqueue the newly-sourced quotes...
313
+
self.queue_manager
314
+
.ask(EnqueueQuotes(refreshed_quotes))
315
+
.await
316
+
.map_err(|_| ())?;
317
+
318
+
// 5. and, finally, we return the first among them.
319
+
match self.queue_manager.ask(DequeueQuote).await {
320
+
Ok(Some(q)) => Ok(q),
321
+
Ok(None) => panic!("Newly-enqueued quotes should never be empty"),
322
+
Err(_) => Err(()),
323
+
}
324
+
}
325
+
}
326
+
327
+
mod test {
328
+
#[tokio::test]
329
+
async fn memory_queue() {
330
+
use super::Quote;
331
+
use super::queue::*;
332
+
use kameo::prelude::*;
333
+
334
+
let queue_manager = MemoryQueueStorage::spawn(MemoryQueueStorage::new());
335
+
336
+
let sample_quotes = ["Test no.1", "Test no.2", "Test no.3"];
337
+
338
+
queue_manager
339
+
.ask(EnqueueQuotes(
340
+
sample_quotes.iter().cloned().map(Quote::from).collect(),
341
+
))
342
+
.await
343
+
.expect("In-memory quote queue storage should be valid for insertion");
344
+
345
+
for text in sample_quotes.iter() {
346
+
assert_eq!(
347
+
*text,
348
+
queue_manager
349
+
.ask(DequeueQuote)
350
+
.await
351
+
.expect("In-memory queue storage should never panic on dequeue")
352
+
.expect("In-memory queue storage should never be initialized as empty")
353
+
.get()
354
+
);
355
+
}
356
+
}
357
+
358
+
#[tokio::test]
359
+
async fn memory_source() {
360
+
use super::source::*;
361
+
use kameo::prelude::*;
362
+
363
+
let sample_quotes = ["Minie", "Miney", "Moe", "and", "some", "more"];
364
+
365
+
let source_manager = MemorySourceManager::spawn(MemorySourceManager::new(sample_quotes));
366
+
367
+
let quotes = source_manager
368
+
.ask(SourceQuotes)
369
+
.await
370
+
.expect("In-memory quote queue storage should be valid for insertion");
371
+
372
+
assert_eq!(
373
+
sample_quotes.as_slice(),
374
+
quotes
375
+
.into_iter()
376
+
// Since [Quote] doesn't implement any Equality trait,
377
+
// we map the strings into quotes instead
378
+
.map(String::from)
379
+
.collect::<Vec<_>>()
380
+
.as_slice(),
381
+
);
382
+
}
383
+
384
+
#[tokio::test]
385
+
async fn memory_cycle() {
386
+
use std::{collections::HashMap, ops::AddAssign};
387
+
388
+
use super::FetchQuote;
389
+
use super::QuoteCycle;
390
+
use super::queue::*;
391
+
use super::source::*;
392
+
use kameo::prelude::*;
393
+
394
+
let sample_quotes = ["Minie", "Miney", "Moe"];
395
+
let cycle = {
396
+
let source = MemorySourceManager::spawn(MemorySourceManager::new(sample_quotes));
397
+
let queue = MemoryQueueStorage::spawn(MemoryQueueStorage::new());
398
+
399
+
QuoteCycle::spawn(QuoteCycle::with_thread_rng(source, queue))
400
+
};
401
+
402
+
// We loop over `sample_quotes` twice to simulate the queue being exhausted fully, then re-sourced
403
+
// Since the `cycle` manager will shuffle the quote sequence, we will verify that each
404
+
// quote appears *exactly* `LOOPS` times throughout these iterations.
405
+
const LOOPS: usize = 3;
406
+
let mut quote_counts = HashMap::new();
407
+
for _ in 0..(sample_quotes.len() * LOOPS) {
408
+
let next_quote = cycle.ask(FetchQuote).await.unwrap();
409
+
quote_counts
410
+
.entry(next_quote.get().to_owned())
411
+
.or_insert(0)
412
+
.add_assign(1);
413
+
}
414
+
415
+
let quote_counts = quote_counts; // no longer mut
416
+
assert!(
417
+
// Note: technically speaking, different quotes could contain equivalent strings,
418
+
// which would make this test fail; a "more proper" invariant check would ensure
419
+
// verify that all counts are a multiple of the amount of times `sample_quotes` was chained,
420
+
// and that the sum of all counts equals the total number of times a `FetchQuote` message was sent.
421
+
quote_counts.into_values().into_iter().all(|c| c == LOOPS)
422
+
);
423
+
}
424
+
}