Our Personal Data Server from scratch! tranquil.farm
atproto pds rust postgresql fun oauth

feat(signal): add admin endpoints, config, and server wiring #92

merged opened by oyster.cafe targeting main from feat/signal-client-in-house
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mhlxir6ev722
+208 -21
Diff #0
+3
crates/tranquil-api/Cargo.toml
··· 12 12 tranquil-db-traits = { workspace = true } 13 13 tranquil-lexicon = { workspace = true, features = ["resolve"] } 14 14 tranquil-scopes = { workspace = true } 15 + tranquil-signal = { workspace = true } 15 16 16 17 anyhow = { workspace = true } 17 18 axum = { workspace = true } 18 19 backon = { workspace = true } 19 20 base32 = { workspace = true } 20 21 base64 = { workspace = true } 22 + image = { workspace = true } 23 + qrcodegen = { workspace = true } 21 24 bcrypt = { workspace = true } 22 25 bs58 = { workspace = true } 23 26 bytes = { workspace = true }
+2
crates/tranquil-api/src/admin/mod.rs
··· 2 2 pub mod config; 3 3 pub mod invite; 4 4 pub mod server_stats; 5 + pub mod signal; 5 6 pub mod status; 6 7 7 8 pub use account::{ ··· 13 14 disable_account_invites, disable_invite_codes, enable_account_invites, get_invite_codes, 14 15 }; 15 16 pub use server_stats::get_server_stats; 17 + pub use signal::{get_signal_status, link_signal_device, unlink_signal_device}; 16 18 pub use status::{get_subject_status, update_subject_status};
+163
crates/tranquil-api/src/admin/signal.rs
··· 1 + use axum::{Json, extract::State}; 2 + use base64::{Engine, engine::general_purpose::STANDARD}; 3 + use image::{ImageBuffer, Luma}; 4 + use serde::Serialize; 5 + use tranquil_pds::api::error::ApiError; 6 + use tranquil_pds::auth::{Admin, Auth}; 7 + use tranquil_pds::state::AppState; 8 + use tranquil_signal::PgSignalStore; 9 + 10 + #[derive(Serialize)] 11 + #[serde(rename_all = "camelCase")] 12 + pub struct SignalStatusOutput { 13 + pub enabled: bool, 14 + pub linked: bool, 15 + } 16 + 17 + #[derive(Serialize)] 18 + #[serde(rename_all = "camelCase")] 19 + pub struct SignalLinkOutput { 20 + pub qr_base64: String, 21 + } 22 + 23 + pub async fn get_signal_status( 24 + State(state): State<AppState>, 25 + _auth: Auth<Admin>, 26 + ) -> Result<Json<SignalStatusOutput>, ApiError> { 27 + let enabled = tranquil_config::get().signal.enabled; 28 + let linked = match &state.signal_sender { 29 + Some(slot) => slot.is_linked().await, 30 + None => false, 31 + }; 32 + 33 + Ok(Json(SignalStatusOutput { enabled, linked })) 34 + } 35 + 36 + pub async fn link_signal_device( 37 + State(state): State<AppState>, 38 + _auth: Auth<Admin>, 39 + ) -> Result<Json<SignalLinkOutput>, ApiError> { 40 + let slot = state 41 + .signal_sender 42 + .as_ref() 43 + .ok_or_else(|| ApiError::InvalidRequest("Signal is not enabled".into()))?; 44 + 45 + if slot.is_linked().await { 46 + return Err(ApiError::InvalidRequest( 47 + "Signal device already linked".into(), 48 + )); 49 + } 50 + 51 + let (generation, link_cancel) = slot.begin_link().await; 52 + 53 + let device_name = tranquil_signal::DeviceName::new("tranquil-pds".to_string()) 54 + .map_err(|e| ApiError::InternalError(Some(format!("invalid device name: {e}"))))?; 55 + 56 + let link_result = tranquil_signal::SignalClient::link_device( 57 + &state.repos.pool, 58 + device_name, 59 + state.shutdown.clone(), 60 + link_cancel, 61 + slot.linking_flag(), 62 + ) 63 + .await 64 + .map_err(|e| ApiError::InternalError(Some(format!("Signal linking failed: {e}"))))?; 65 + 66 + let qr_base64 = url_to_qr_png_base64(link_result.url.as_str()) 67 + .map_err(|e| ApiError::InternalError(Some(format!("QR generation failed: {e}"))))?; 68 + 69 + let slot_for_task = slot.clone(); 70 + let shutdown = state.shutdown.clone(); 71 + tokio::spawn(async move { 72 + let result = tokio::select! { 73 + biased; 74 + _ = shutdown.cancelled() => { 75 + tracing::info!("Signal linking aborted due to server shutdown"); 76 + return; 77 + } 78 + r = link_result.completion => r, 79 + }; 80 + match result { 81 + Ok(Ok(client)) => { 82 + if slot_for_task.complete_link(generation, client).await { 83 + tracing::info!("Signal device linked successfully"); 84 + } else { 85 + tracing::warn!( 86 + "Signal link completed but generation mismatch or already linked; discarding" 87 + ); 88 + } 89 + } 90 + Ok(Err(e)) => { 91 + tracing::error!(error = %e, "Signal device linking failed"); 92 + } 93 + Err(_) => { 94 + tracing::error!("Signal linking task dropped without completing"); 95 + } 96 + } 97 + }); 98 + 99 + Ok(Json(SignalLinkOutput { qr_base64 })) 100 + } 101 + 102 + pub async fn unlink_signal_device( 103 + State(state): State<AppState>, 104 + _auth: Auth<Admin>, 105 + ) -> Result<Json<serde_json::Value>, ApiError> { 106 + let slot = state 107 + .signal_sender 108 + .as_ref() 109 + .ok_or_else(|| ApiError::InvalidRequest("Signal is not enabled".into()))?; 110 + 111 + let store = PgSignalStore::new(state.repos.pool.clone()); 112 + store 113 + .clear_all() 114 + .await 115 + .map_err(|e| ApiError::InternalError(Some(format!("Failed to clear signal data: {e}"))))?; 116 + 117 + slot.unlink().await; 118 + 119 + Ok(Json(serde_json::json!({}))) 120 + } 121 + 122 + const QR_MODULE_SCALE: u32 = 8; 123 + const QR_QUIET_ZONE_MODULES: u32 = 4; 124 + 125 + fn url_to_qr_png_base64(url: &str) -> Result<String, String> { 126 + let qr = qrcodegen::QrCode::encode_text(url, qrcodegen::QrCodeEcc::Medium) 127 + .map_err(|e| format!("QR encode failed: {e:?}"))?; 128 + let size = u32::try_from(qr.size()).map_err(|_| "QR size is negative".to_string())?; 129 + let img_size = size 130 + .checked_add( 131 + QR_QUIET_ZONE_MODULES 132 + .checked_mul(2) 133 + .ok_or("border overflow")?, 134 + ) 135 + .ok_or("image size overflow")? 136 + .checked_mul(QR_MODULE_SCALE) 137 + .ok_or("scaled size overflow")?; 138 + 139 + let img: ImageBuffer<Luma<u8>, Vec<u8>> = ImageBuffer::from_fn(img_size, img_size, |x, y| { 140 + let module_x = x / QR_MODULE_SCALE; 141 + let module_y = y / QR_MODULE_SCALE; 142 + match ( 143 + module_x.checked_sub(QR_QUIET_ZONE_MODULES), 144 + module_y.checked_sub(QR_QUIET_ZONE_MODULES), 145 + ) { 146 + (Some(mx), Some(my)) if mx < size && my < size => { 147 + if qr.get_module(mx as i32, my as i32) { 148 + Luma([0u8]) 149 + } else { 150 + Luma([255u8]) 151 + } 152 + } 153 + _ => Luma([255u8]), 154 + } 155 + }); 156 + 157 + let mut png_bytes = Vec::new(); 158 + let mut cursor = std::io::Cursor::new(&mut png_bytes); 159 + img.write_to(&mut cursor, image::ImageFormat::Png) 160 + .map_err(|e| format!("PNG encode failed: {e}"))?; 161 + 162 + Ok(STANDARD.encode(&png_bytes)) 163 + }
+6
crates/tranquil-api/src/lib.rs
··· 330 330 get(admin::get_invite_codes), 331 331 ) 332 332 .route("/_admin.getServerStats", get(admin::get_server_stats)) 333 + .route("/_admin.getSignalStatus", get(admin::get_signal_status)) 334 + .route("/_admin.linkSignalDevice", post(admin::link_signal_device)) 335 + .route( 336 + "/_admin.unlinkSignalDevice", 337 + post(admin::unlink_signal_device), 338 + ) 333 339 .route("/_server.getConfig", get(admin::get_server_config)) 334 340 .route( 335 341 "/_admin.updateServerConfig",
+1 -1
crates/tranquil-api/src/server/meta.rs
··· 14 14 if cfg.telegram.bot_token.is_some() { 15 15 channels.push(CommsChannel::Telegram); 16 16 } 17 - if cfg.signal.sender_number.is_some() { 17 + if cfg.signal.enabled { 18 18 channels.push(CommsChannel::Signal); 19 19 } 20 20 channels
+2 -7
crates/tranquil-config/src/lib.rs
··· 678 678 679 679 #[derive(Debug, Config)] 680 680 pub struct SignalConfig { 681 - /// Path to the `signal-cli` binary. 682 - #[config(env = "SIGNAL_CLI_PATH", default = "/usr/local/bin/signal-cli")] 683 - pub cli_path: String, 684 - 685 - /// Sender phone number. When unset, Signal integration is disabled. 686 - #[config(env = "SIGNAL_SENDER_NUMBER")] 687 - pub sender_number: Option<String>, 681 + #[config(env = "SIGNAL_ENABLED", default = false)] 682 + pub enabled: bool, 688 683 } 689 684 690 685 #[derive(Debug, Config)]
+1
crates/tranquil-pds/Cargo.toml
··· 15 15 tranquil-auth = { workspace = true } 16 16 tranquil-oauth = { workspace = true } 17 17 tranquil-comms = { workspace = true } 18 + tranquil-signal = { workspace = true } 18 19 tranquil-db = { workspace = true } 19 20 tranquil-db-traits = { workspace = true } 20 21 tranquil-lexicon = { workspace = true, features = ["resolve"] }
+7
crates/tranquil-pds/src/state.rs
··· 60 60 pub cross_pds_oauth: Arc<CrossPdsOAuthClient>, 61 61 pub shutdown: CancellationToken, 62 62 pub bootstrap_invite_code: Option<String>, 63 + pub signal_sender: Option<Arc<tranquil_signal::SignalSlot>>, 63 64 } 64 65 65 66 #[derive(Debug, Clone, Copy)] ··· 310 311 webauthn_config, 311 312 shutdown, 312 313 bootstrap_invite_code: None, 314 + signal_sender: None, 313 315 } 314 316 } 315 317 ··· 328 330 self 329 331 } 330 332 333 + pub fn with_signal_sender(mut self, slot: Arc<tranquil_signal::SignalSlot>) -> Self { 334 + self.signal_sender = Some(slot); 335 + self 336 + } 337 + 331 338 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self { 332 339 self.circuit_breakers = Arc::new(circuit_breakers); 333 340 self
+1
crates/tranquil-server/Cargo.toml
··· 10 10 tranquil-api = { workspace = true } 11 11 tranquil-oauth-server = { workspace = true } 12 12 tranquil-config = { workspace = true } 13 + tranquil-signal = { workspace = true } 13 14 14 15 axum = { workspace = true } 15 16 clap = { workspace = true }
+18 -3
crates/tranquil-server/src/main.rs
··· 109 109 110 110 spawn_signal_handler(shutdown.clone()); 111 111 112 - let state = AppState::new(shutdown.clone()).await?; 112 + let mut state = AppState::new(shutdown.clone()).await?; 113 + 114 + let signal_sender = if tranquil_config::get().signal.enabled { 115 + let slot = Arc::new(tranquil_signal::SignalSlot::default()); 116 + state = state.with_signal_sender(slot.clone()); 117 + if let Some(client) = 118 + tranquil_signal::SignalClient::from_pool(&state.repos.pool, shutdown.clone()).await 119 + { 120 + slot.set_client(client).await; 121 + info!("Signal device already linked"); 122 + } 123 + Some(SignalSender::new(slot)) 124 + } else { 125 + None 126 + }; 127 + 113 128 tranquil_sync::listener::start_sequencer_listener(state.clone()).await; 114 129 115 130 let backfill_repo_repo = state.repo_repo.clone(); ··· 210 225 comms_service = comms_service.register_sender(telegram_sender); 211 226 } 212 227 213 - if let Some(signal_sender) = SignalSender::from_config(cfg) { 228 + if let Some(sender) = signal_sender { 214 229 info!("Signal comms enabled"); 215 - comms_service = comms_service.register_sender(signal_sender); 230 + comms_service = comms_service.register_sender(sender); 216 231 } 217 232 218 233 let comms_handle = tokio::spawn(comms_service.run(shutdown.clone()));
+4 -10
example.toml
··· 320 320 #webhook_secret = 321 321 322 322 [signal] 323 - # Path to the `signal-cli` binary. 323 + # Protocol state is stored in postgres' signal_* tables. 324 + # Link a device via the admin API before enabling. 324 325 # 325 - # Can also be specified via environment variable `SIGNAL_CLI_PATH`. 326 - # 327 - # Default value: "/usr/local/bin/signal-cli" 328 - #cli_path = "/usr/local/bin/signal-cli" 329 - 330 - # Sender phone number. When unset, Signal integration is disabled. 331 - # 332 - # Can also be specified via environment variable `SIGNAL_SENDER_NUMBER`. 333 - #sender_number = 326 + # Can also be specified via environment variable `SIGNAL_ENABLED`. 327 + #enabled = false 334 328 335 329 [notifications] 336 330 # Polling interval in milliseconds for the comms queue.

History

1 round 0 comments
sign up or login to add to the discussion
oyster.cafe submitted #0
1 commit
expand
feat(signal): add admin endpoints, config, and server wiring
expand 0 comments
pull request successfully merged