+90
-175
Diff
round #0
+3
crates/tranquil-comms/Cargo.toml
+3
crates/tranquil-comms/Cargo.toml
···
6
6
7
7
[dependencies]
8
8
tranquil-config = { workspace = true }
9
+
tranquil-signal = { workspace = true }
9
10
10
11
async-trait = { workspace = true }
11
12
base64 = { workspace = true }
12
13
reqwest = { workspace = true }
13
14
serde_json = { workspace = true }
15
+
sqlx = { workspace = true }
14
16
thiserror = { workspace = true }
15
17
tokio = { workspace = true }
18
+
tracing = { workspace = true }
16
19
tranquil-db-traits = { workspace = true }
17
20
uuid = { workspace = true }
+87
-175
crates/tranquil-comms/src/sender.rs
+87
-175
crates/tranquil-comms/src/sender.rs
···
6
6
use std::time::Duration;
7
7
use tokio::io::AsyncWriteExt;
8
8
use tokio::process::Command;
9
-
use tokio::time::timeout;
10
9
11
10
use super::types::{CommsChannel, QueuedComms};
12
11
···
58
57
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
59
58
}
60
59
60
+
async fn send_http_with_retry<F, Fut>(service_name: &str, send_request: F) -> Result<(), SendError>
61
+
where
62
+
F: Fn() -> Fut,
63
+
Fut: std::future::Future<Output = Result<reqwest::Response, reqwest::Error>>,
64
+
{
65
+
let mut last_error = None;
66
+
for attempt in 0..MAX_RETRIES {
67
+
match send_request().await {
68
+
Ok(response) => {
69
+
if response.status().is_success() {
70
+
return Ok(());
71
+
}
72
+
let status = response.status();
73
+
if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
74
+
last_error = Some(format!("{service_name} API returned {status}"));
75
+
retry_delay(attempt).await;
76
+
continue;
77
+
}
78
+
let body = response.text().await.unwrap_or_default();
79
+
return Err(SendError::ExternalService(format!(
80
+
"{service_name} API returned {status}: {body}",
81
+
)));
82
+
}
83
+
Err(e) => {
84
+
if e.is_timeout() {
85
+
if attempt < MAX_RETRIES - 1 {
86
+
last_error = Some(format!("{service_name} request timed out"));
87
+
retry_delay(attempt).await;
88
+
continue;
89
+
}
90
+
return Err(SendError::Timeout);
91
+
}
92
+
return Err(SendError::ExternalService(format!(
93
+
"{service_name} request failed: {e}",
94
+
)));
95
+
}
96
+
}
97
+
}
98
+
Err(SendError::MaxRetriesExceeded(
99
+
last_error.unwrap_or_else(|| "unknown error".to_string()),
100
+
))
101
+
}
102
+
61
103
pub fn sanitize_header_value(value: &str) -> String {
62
104
value.replace(['\r', '\n'], " ").trim().to_string()
63
105
}
···
90
132
}
91
133
92
134
pub fn is_valid_signal_username(username: &str) -> bool {
93
-
if username.len() < 6 || username.len() > 35 {
94
-
return false;
95
-
}
96
-
let Some((base, discriminator)) = username.rsplit_once('.') else {
97
-
return false;
98
-
};
99
-
if base.len() < 3 || base.len() > 32 {
100
-
return false;
101
-
}
102
-
if !base.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
103
-
return false;
104
-
}
105
-
if !base.chars().next().is_some_and(|c| c.is_ascii_alphabetic()) {
106
-
return false;
107
-
}
108
-
discriminator.len() == 2 && discriminator.chars().all(|c| c.is_ascii_digit())
135
+
tranquil_signal::SignalUsername::parse(username).is_ok()
109
136
}
110
137
111
138
pub struct EmailSender {
···
414
441
let payload = json!({ "content": content });
415
442
let url = format!("{}/channels/{}/messages", DISCORD_API_BASE, channel_id);
416
443
417
-
let mut last_error = None;
418
-
for attempt in 0..MAX_RETRIES {
419
-
let result = self
420
-
.http_client
444
+
send_http_with_retry("Discord", || {
445
+
self.http_client
421
446
.post(&url)
422
447
.header("Authorization", self.auth_header())
423
448
.json(&payload)
424
449
.send()
425
-
.await;
426
-
match result {
427
-
Ok(response) => {
428
-
if response.status().is_success() {
429
-
return Ok(());
430
-
}
431
-
let status = response.status();
432
-
if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
433
-
last_error = Some(format!("Discord API returned {}", status));
434
-
retry_delay(attempt).await;
435
-
continue;
436
-
}
437
-
let body = response.text().await.unwrap_or_default();
438
-
return Err(SendError::ExternalService(format!(
439
-
"Discord API returned {}: {}",
440
-
status, body
441
-
)));
442
-
}
443
-
Err(e) => {
444
-
if e.is_timeout() {
445
-
if attempt < MAX_RETRIES - 1 {
446
-
last_error = Some("Discord request timed out".to_string());
447
-
retry_delay(attempt).await;
448
-
continue;
449
-
}
450
-
return Err(SendError::Timeout);
451
-
}
452
-
return Err(SendError::ExternalService(format!(
453
-
"Discord request failed: {}",
454
-
e
455
-
)));
456
-
}
457
-
}
458
-
}
459
-
Err(SendError::MaxRetriesExceeded(
460
-
last_error.unwrap_or_else(|| "Unknown error".to_string()),
461
-
))
450
+
})
451
+
.await
462
452
}
463
453
}
464
454
···
552
542
"text": text,
553
543
"parse_mode": "HTML"
554
544
});
555
-
let mut last_error = None;
556
-
for attempt in 0..MAX_RETRIES {
557
-
let result = self.http_client.post(&url).json(&payload).send().await;
558
-
match result {
559
-
Ok(response) => {
560
-
if response.status().is_success() {
561
-
return Ok(());
562
-
}
563
-
let status = response.status();
564
-
if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
565
-
last_error = Some(format!("Telegram API returned {}", status));
566
-
retry_delay(attempt).await;
567
-
continue;
568
-
}
569
-
let body = response.text().await.unwrap_or_default();
570
-
return Err(SendError::ExternalService(format!(
571
-
"Telegram API returned {}: {}",
572
-
status, body
573
-
)));
574
-
}
575
-
Err(e) => {
576
-
if e.is_timeout() {
577
-
if attempt < MAX_RETRIES - 1 {
578
-
last_error = Some("Telegram request timed out".to_string());
579
-
retry_delay(attempt).await;
580
-
continue;
581
-
}
582
-
return Err(SendError::Timeout);
583
-
}
584
-
return Err(SendError::ExternalService(format!(
585
-
"Telegram request failed: {}",
586
-
e
587
-
)));
588
-
}
589
-
}
590
-
}
591
-
Err(SendError::MaxRetriesExceeded(
592
-
last_error.unwrap_or_else(|| "Unknown error".to_string()),
593
-
))
545
+
546
+
send_http_with_retry("Telegram", || {
547
+
self.http_client.post(&url).json(&payload).send()
548
+
})
549
+
.await
594
550
}
595
551
}
596
552
597
553
pub struct SignalSender {
598
-
signal_cli_path: String,
599
-
sender_number: String,
554
+
slot: std::sync::Arc<tranquil_signal::SignalSlot>,
600
555
}
601
556
602
557
impl SignalSender {
603
-
pub fn new(signal_cli_path: String, sender_number: String) -> Self {
604
-
Self {
605
-
signal_cli_path,
606
-
sender_number,
607
-
}
608
-
}
609
-
610
-
pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Option<Self> {
611
-
let signal_cli_path = cfg.signal.cli_path.clone();
612
-
let sender_number = cfg.signal.sender_number.clone()?;
613
-
Some(Self::new(signal_cli_path, sender_number))
558
+
pub fn new(slot: std::sync::Arc<tranquil_signal::SignalSlot>) -> Self {
559
+
Self { slot }
614
560
}
615
561
}
616
562
617
-
const SIGNAL_TIMEOUT_SECS: u64 = 30;
618
-
619
-
fn is_retryable_signal_error(stderr: &str) -> bool {
620
-
let lower = stderr.to_lowercase();
621
-
lower.contains("timeout")
622
-
|| lower.contains("timed out")
623
-
|| lower.contains("connection refused")
624
-
|| lower.contains("network")
625
-
|| lower.contains("temporarily")
626
-
|| lower.contains("try again")
627
-
|| lower.contains("rate limit")
628
-
}
629
-
630
563
#[async_trait]
631
564
impl CommsSender for SignalSender {
632
565
fn channel(&self) -> CommsChannel {
···
634
567
}
635
568
636
569
async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> {
637
-
let recipient = ¬ification.recipient;
638
-
if !is_valid_signal_username(recipient) {
639
-
return Err(SendError::InvalidRecipient(format!(
640
-
"Invalid Signal username format: {}",
641
-
recipient
642
-
)));
643
-
}
570
+
let username = tranquil_signal::SignalUsername::parse(¬ification.recipient)
571
+
.map_err(|e| SendError::InvalidRecipient(e.to_string()))?;
572
+
573
+
let client = self
574
+
.slot
575
+
.client()
576
+
.await
577
+
.ok_or(SendError::NotConfigured(CommsChannel::Signal))?;
578
+
644
579
let subject = notification.subject.as_deref().unwrap_or("Notification");
645
-
let message = format!("{}\n\n{}", subject, notification.body);
580
+
let raw_message = format!("{}\n\n{}", subject, notification.body);
581
+
let message = tranquil_signal::MessageBody::new(raw_message)
582
+
.map_err(|e| SendError::InvalidRecipient(e.to_string()))?;
646
583
647
584
let mut last_error = None;
648
585
for attempt in 0..MAX_RETRIES {
649
-
let cmd_future = Command::new(&self.signal_cli_path)
650
-
.arg("-u")
651
-
.arg(&self.sender_number)
652
-
.arg("send")
653
-
.arg("--username")
654
-
.arg(recipient)
655
-
.arg("-m")
656
-
.arg(&message)
657
-
.output();
658
-
659
-
let result = timeout(Duration::from_secs(SIGNAL_TIMEOUT_SECS), cmd_future).await;
660
-
661
-
match result {
662
-
Ok(Ok(output)) if output.status.success() => return Ok(()),
663
-
Ok(Ok(output)) => {
664
-
let stderr = String::from_utf8_lossy(&output.stderr);
665
-
if is_retryable_signal_error(&stderr) && attempt < MAX_RETRIES - 1 {
666
-
last_error = Some(format!("signal-cli failed: {}", stderr));
667
-
retry_delay(attempt).await;
668
-
continue;
669
-
}
670
-
return Err(SendError::ExternalService(format!(
671
-
"signal-cli failed: {}",
672
-
stderr
673
-
)));
674
-
}
675
-
Ok(Err(e)) => {
676
-
if attempt < MAX_RETRIES - 1 {
677
-
last_error = Some(format!("signal-cli spawn failed: {}", e));
678
-
retry_delay(attempt).await;
679
-
continue;
680
-
}
681
-
return Err(SendError::ProcessSpawn {
682
-
command: self.signal_cli_path.clone(),
683
-
source: e,
684
-
});
685
-
}
686
-
Err(_) => {
687
-
if attempt < MAX_RETRIES - 1 {
688
-
last_error = Some("signal-cli timed out".to_string());
689
-
retry_delay(attempt).await;
690
-
continue;
586
+
match client.send(&username, message.clone()).await {
587
+
Ok(()) => return Ok(()),
588
+
Err(e) => {
589
+
let err_str = e.to_string();
590
+
match &e {
591
+
tranquil_signal::SignalError::UsernameNotFound(_)
592
+
| tranquil_signal::SignalError::UsernameLookup(_)
593
+
| tranquil_signal::SignalError::NotLinked => {
594
+
return Err(SendError::ExternalService(format!(
595
+
"signal send failed: {err_str}"
596
+
)));
597
+
}
598
+
_ => {
599
+
last_error = Some(err_str);
600
+
if attempt < MAX_RETRIES - 1 {
601
+
retry_delay(attempt).await;
602
+
}
603
+
}
691
604
}
692
-
return Err(SendError::Timeout);
693
605
}
694
606
}
695
607
}
696
608
Err(SendError::MaxRetriesExceeded(
697
-
last_error.unwrap_or_else(|| "Unknown error".to_string()),
609
+
last_error.unwrap_or_else(|| "unknown error".to_string()),
698
610
))
699
611
}
700
612
}
History
1 round
0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
refactor(comms): extract HTTP retry combinator and integrate signal sender
expand 0 comments
pull request successfully merged