Our Personal Data Server from scratch! tranquil.farm
atproto pds rust postgresql fun oauth

refactor(comms): extract HTTP retry combinator and integrate signal sender #91

merged opened by oyster.cafe targeting main from feat/signal-client-in-house
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mhlxir6esy22
+90 -175
Diff #0
+3
crates/tranquil-comms/Cargo.toml
··· 6 6 7 7 [dependencies] 8 8 tranquil-config = { workspace = true } 9 + tranquil-signal = { workspace = true } 9 10 10 11 async-trait = { workspace = true } 11 12 base64 = { workspace = true } 12 13 reqwest = { workspace = true } 13 14 serde_json = { workspace = true } 15 + sqlx = { workspace = true } 14 16 thiserror = { workspace = true } 15 17 tokio = { workspace = true } 18 + tracing = { workspace = true } 16 19 tranquil-db-traits = { workspace = true } 17 20 uuid = { workspace = true }
+87 -175
crates/tranquil-comms/src/sender.rs
··· 6 6 use std::time::Duration; 7 7 use tokio::io::AsyncWriteExt; 8 8 use tokio::process::Command; 9 - use tokio::time::timeout; 10 9 11 10 use super::types::{CommsChannel, QueuedComms}; 12 11 ··· 58 57 tokio::time::sleep(Duration::from_millis(delay_ms)).await; 59 58 } 60 59 60 + async fn send_http_with_retry<F, Fut>(service_name: &str, send_request: F) -> Result<(), SendError> 61 + where 62 + F: Fn() -> Fut, 63 + Fut: std::future::Future<Output = Result<reqwest::Response, reqwest::Error>>, 64 + { 65 + let mut last_error = None; 66 + for attempt in 0..MAX_RETRIES { 67 + match send_request().await { 68 + Ok(response) => { 69 + if response.status().is_success() { 70 + return Ok(()); 71 + } 72 + let status = response.status(); 73 + if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 74 + last_error = Some(format!("{service_name} API returned {status}")); 75 + retry_delay(attempt).await; 76 + continue; 77 + } 78 + let body = response.text().await.unwrap_or_default(); 79 + return Err(SendError::ExternalService(format!( 80 + "{service_name} API returned {status}: {body}", 81 + ))); 82 + } 83 + Err(e) => { 84 + if e.is_timeout() { 85 + if attempt < MAX_RETRIES - 1 { 86 + last_error = Some(format!("{service_name} request timed out")); 87 + retry_delay(attempt).await; 88 + continue; 89 + } 90 + return Err(SendError::Timeout); 91 + } 92 + return Err(SendError::ExternalService(format!( 93 + "{service_name} request failed: {e}", 94 + ))); 95 + } 96 + } 97 + } 98 + Err(SendError::MaxRetriesExceeded( 99 + last_error.unwrap_or_else(|| "unknown error".to_string()), 100 + )) 101 + } 102 + 61 103 pub fn sanitize_header_value(value: &str) -> String { 62 104 value.replace(['\r', '\n'], " ").trim().to_string() 63 105 } ··· 90 132 } 91 133 92 134 pub fn is_valid_signal_username(username: &str) -> bool { 93 - if username.len() < 6 || username.len() > 35 { 94 - return false; 95 - } 96 - let Some((base, discriminator)) = username.rsplit_once('.') else { 97 - return false; 98 - }; 99 - if base.len() < 3 || base.len() > 32 { 100 - return false; 101 - } 102 - if !base.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') { 103 - return false; 104 - } 105 - if !base.chars().next().is_some_and(|c| c.is_ascii_alphabetic()) { 106 - return false; 107 - } 108 - discriminator.len() == 2 && discriminator.chars().all(|c| c.is_ascii_digit()) 135 + tranquil_signal::SignalUsername::parse(username).is_ok() 109 136 } 110 137 111 138 pub struct EmailSender { ··· 414 441 let payload = json!({ "content": content }); 415 442 let url = format!("{}/channels/{}/messages", DISCORD_API_BASE, channel_id); 416 443 417 - let mut last_error = None; 418 - for attempt in 0..MAX_RETRIES { 419 - let result = self 420 - .http_client 444 + send_http_with_retry("Discord", || { 445 + self.http_client 421 446 .post(&url) 422 447 .header("Authorization", self.auth_header()) 423 448 .json(&payload) 424 449 .send() 425 - .await; 426 - match result { 427 - Ok(response) => { 428 - if response.status().is_success() { 429 - return Ok(()); 430 - } 431 - let status = response.status(); 432 - if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 433 - last_error = Some(format!("Discord API returned {}", status)); 434 - retry_delay(attempt).await; 435 - continue; 436 - } 437 - let body = response.text().await.unwrap_or_default(); 438 - return Err(SendError::ExternalService(format!( 439 - "Discord API returned {}: {}", 440 - status, body 441 - ))); 442 - } 443 - Err(e) => { 444 - if e.is_timeout() { 445 - if attempt < MAX_RETRIES - 1 { 446 - last_error = Some("Discord request timed out".to_string()); 447 - retry_delay(attempt).await; 448 - continue; 449 - } 450 - return Err(SendError::Timeout); 451 - } 452 - return Err(SendError::ExternalService(format!( 453 - "Discord request failed: {}", 454 - e 455 - ))); 456 - } 457 - } 458 - } 459 - Err(SendError::MaxRetriesExceeded( 460 - last_error.unwrap_or_else(|| "Unknown error".to_string()), 461 - )) 450 + }) 451 + .await 462 452 } 463 453 } 464 454 ··· 552 542 "text": text, 553 543 "parse_mode": "HTML" 554 544 }); 555 - let mut last_error = None; 556 - for attempt in 0..MAX_RETRIES { 557 - let result = self.http_client.post(&url).json(&payload).send().await; 558 - match result { 559 - Ok(response) => { 560 - if response.status().is_success() { 561 - return Ok(()); 562 - } 563 - let status = response.status(); 564 - if is_retryable_status(status) && attempt < MAX_RETRIES - 1 { 565 - last_error = Some(format!("Telegram API returned {}", status)); 566 - retry_delay(attempt).await; 567 - continue; 568 - } 569 - let body = response.text().await.unwrap_or_default(); 570 - return Err(SendError::ExternalService(format!( 571 - "Telegram API returned {}: {}", 572 - status, body 573 - ))); 574 - } 575 - Err(e) => { 576 - if e.is_timeout() { 577 - if attempt < MAX_RETRIES - 1 { 578 - last_error = Some("Telegram request timed out".to_string()); 579 - retry_delay(attempt).await; 580 - continue; 581 - } 582 - return Err(SendError::Timeout); 583 - } 584 - return Err(SendError::ExternalService(format!( 585 - "Telegram request failed: {}", 586 - e 587 - ))); 588 - } 589 - } 590 - } 591 - Err(SendError::MaxRetriesExceeded( 592 - last_error.unwrap_or_else(|| "Unknown error".to_string()), 593 - )) 545 + 546 + send_http_with_retry("Telegram", || { 547 + self.http_client.post(&url).json(&payload).send() 548 + }) 549 + .await 594 550 } 595 551 } 596 552 597 553 pub struct SignalSender { 598 - signal_cli_path: String, 599 - sender_number: String, 554 + slot: std::sync::Arc<tranquil_signal::SignalSlot>, 600 555 } 601 556 602 557 impl SignalSender { 603 - pub fn new(signal_cli_path: String, sender_number: String) -> Self { 604 - Self { 605 - signal_cli_path, 606 - sender_number, 607 - } 608 - } 609 - 610 - pub fn from_config(cfg: &tranquil_config::TranquilConfig) -> Option<Self> { 611 - let signal_cli_path = cfg.signal.cli_path.clone(); 612 - let sender_number = cfg.signal.sender_number.clone()?; 613 - Some(Self::new(signal_cli_path, sender_number)) 558 + pub fn new(slot: std::sync::Arc<tranquil_signal::SignalSlot>) -> Self { 559 + Self { slot } 614 560 } 615 561 } 616 562 617 - const SIGNAL_TIMEOUT_SECS: u64 = 30; 618 - 619 - fn is_retryable_signal_error(stderr: &str) -> bool { 620 - let lower = stderr.to_lowercase(); 621 - lower.contains("timeout") 622 - || lower.contains("timed out") 623 - || lower.contains("connection refused") 624 - || lower.contains("network") 625 - || lower.contains("temporarily") 626 - || lower.contains("try again") 627 - || lower.contains("rate limit") 628 - } 629 - 630 563 #[async_trait] 631 564 impl CommsSender for SignalSender { 632 565 fn channel(&self) -> CommsChannel { ··· 634 567 } 635 568 636 569 async fn send(&self, notification: &QueuedComms) -> Result<(), SendError> { 637 - let recipient = &notification.recipient; 638 - if !is_valid_signal_username(recipient) { 639 - return Err(SendError::InvalidRecipient(format!( 640 - "Invalid Signal username format: {}", 641 - recipient 642 - ))); 643 - } 570 + let username = tranquil_signal::SignalUsername::parse(&notification.recipient) 571 + .map_err(|e| SendError::InvalidRecipient(e.to_string()))?; 572 + 573 + let client = self 574 + .slot 575 + .client() 576 + .await 577 + .ok_or(SendError::NotConfigured(CommsChannel::Signal))?; 578 + 644 579 let subject = notification.subject.as_deref().unwrap_or("Notification"); 645 - let message = format!("{}\n\n{}", subject, notification.body); 580 + let raw_message = format!("{}\n\n{}", subject, notification.body); 581 + let message = tranquil_signal::MessageBody::new(raw_message) 582 + .map_err(|e| SendError::InvalidRecipient(e.to_string()))?; 646 583 647 584 let mut last_error = None; 648 585 for attempt in 0..MAX_RETRIES { 649 - let cmd_future = Command::new(&self.signal_cli_path) 650 - .arg("-u") 651 - .arg(&self.sender_number) 652 - .arg("send") 653 - .arg("--username") 654 - .arg(recipient) 655 - .arg("-m") 656 - .arg(&message) 657 - .output(); 658 - 659 - let result = timeout(Duration::from_secs(SIGNAL_TIMEOUT_SECS), cmd_future).await; 660 - 661 - match result { 662 - Ok(Ok(output)) if output.status.success() => return Ok(()), 663 - Ok(Ok(output)) => { 664 - let stderr = String::from_utf8_lossy(&output.stderr); 665 - if is_retryable_signal_error(&stderr) && attempt < MAX_RETRIES - 1 { 666 - last_error = Some(format!("signal-cli failed: {}", stderr)); 667 - retry_delay(attempt).await; 668 - continue; 669 - } 670 - return Err(SendError::ExternalService(format!( 671 - "signal-cli failed: {}", 672 - stderr 673 - ))); 674 - } 675 - Ok(Err(e)) => { 676 - if attempt < MAX_RETRIES - 1 { 677 - last_error = Some(format!("signal-cli spawn failed: {}", e)); 678 - retry_delay(attempt).await; 679 - continue; 680 - } 681 - return Err(SendError::ProcessSpawn { 682 - command: self.signal_cli_path.clone(), 683 - source: e, 684 - }); 685 - } 686 - Err(_) => { 687 - if attempt < MAX_RETRIES - 1 { 688 - last_error = Some("signal-cli timed out".to_string()); 689 - retry_delay(attempt).await; 690 - continue; 586 + match client.send(&username, message.clone()).await { 587 + Ok(()) => return Ok(()), 588 + Err(e) => { 589 + let err_str = e.to_string(); 590 + match &e { 591 + tranquil_signal::SignalError::UsernameNotFound(_) 592 + | tranquil_signal::SignalError::UsernameLookup(_) 593 + | tranquil_signal::SignalError::NotLinked => { 594 + return Err(SendError::ExternalService(format!( 595 + "signal send failed: {err_str}" 596 + ))); 597 + } 598 + _ => { 599 + last_error = Some(err_str); 600 + if attempt < MAX_RETRIES - 1 { 601 + retry_delay(attempt).await; 602 + } 603 + } 691 604 } 692 - return Err(SendError::Timeout); 693 605 } 694 606 } 695 607 } 696 608 Err(SendError::MaxRetriesExceeded( 697 - last_error.unwrap_or_else(|| "Unknown error".to_string()), 609 + last_error.unwrap_or_else(|| "unknown error".to_string()), 698 610 )) 699 611 } 700 612 }

History

1 round 0 comments
sign up or login to add to the discussion
oyster.cafe submitted #0
1 commit
expand
refactor(comms): extract HTTP retry combinator and integrate signal sender
expand 0 comments
pull request successfully merged