Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place
96
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 37e32fdd98000a2ed7935966510a4d94aca0b940 623 lines 24 kB view raw
1use crate::pull::pull_site; 2use crate::redirects::{load_redirect_rules, match_redirect_rule, RedirectRule}; 3use crate::place_wisp::settings::Settings; 4use axum::{ 5 Router, 6 extract::Request, 7 response::{Response, IntoResponse, Redirect}, 8 http::{StatusCode, Uri, header}, 9 body::Body, 10}; 11use jacquard::CowStr; 12use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 13use jacquard::api::com_atproto::repo::get_record::GetRecord; 14use jacquard_common::types::string::Did; 15use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient, XrpcExt}; 16use jacquard_common::IntoStatic; 17use jacquard_common::types::value::from_data; 18use miette::IntoDiagnostic; 19use n0_future::StreamExt; 20use std::collections::HashMap; 21use std::path::{PathBuf, Path}; 22use std::sync::Arc; 23use tokio::sync::RwLock; 24use tower::Service; 25use tower_http::compression::CompressionLayer; 26use tower_http::services::ServeDir; 27 28/// Shared state for the server 29#[derive(Clone)] 30struct ServerState { 31 did: CowStr<'static>, 32 rkey: CowStr<'static>, 33 output_dir: PathBuf, 34 last_cid: Arc<RwLock<Option<String>>>, 35 redirect_rules: Arc<RwLock<Vec<RedirectRule>>>, 36 settings: Arc<RwLock<Option<Settings<'static>>>>, 37} 38 39/// Fetch settings for a site from the PDS 40async fn fetch_settings( 41 pds_url: &url::Url, 42 did: &Did<'_>, 43 rkey: &str, 44) -> miette::Result<Option<Settings<'static>>> { 45 use jacquard_common::types::ident::AtIdentifier; 46 use jacquard_common::types::string::{Rkey as RkeyType, RecordKey}; 47 48 let client = reqwest::Client::new(); 49 let rkey_parsed = RkeyType::new(rkey).into_diagnostic()?; 50 51 let request = GetRecord::new() 52 .repo(AtIdentifier::Did(did.clone())) 53 .collection(CowStr::from("place.wisp.settings")) 54 .rkey(RecordKey::from(rkey_parsed)) 55 .build(); 56 57 match client.xrpc(pds_url.clone()).send(&request).await { 58 Ok(response) => { 59 let output = response.into_output().into_diagnostic()?; 60 61 // Parse the record value as Settings 62 match from_data::<Settings>(&output.value) { 63 Ok(settings) => { 64 Ok(Some(settings.into_static())) 65 } 66 Err(_) => { 67 // Settings record exists but couldn't parse - use defaults 68 Ok(None) 69 } 70 } 71 } 72 Err(_) => { 73 // Settings record doesn't exist 74 Ok(None) 75 } 76 } 77} 78 79/// Serve a site locally with real-time firehose updates 80pub async fn serve_site( 81 input: CowStr<'static>, 82 rkey: CowStr<'static>, 83 output_dir: PathBuf, 84 port: u16, 85) -> miette::Result<()> { 86 println!("Serving site {} from {} on port {}...", rkey, input, port); 87 88 // Resolve handle to DID if needed 89 use jacquard_identity::PublicResolver; 90 use jacquard::prelude::IdentityResolver; 91 92 let resolver = PublicResolver::default(); 93 let did = if input.starts_with("did:") { 94 Did::new(&input).into_diagnostic()? 95 } else { 96 // It's a handle, resolve it 97 let handle = jacquard_common::types::string::Handle::new(&input).into_diagnostic()?; 98 resolver.resolve_handle(&handle).await.into_diagnostic()? 99 }; 100 101 println!("Resolved to DID: {}", did.as_str()); 102 103 // Resolve PDS URL (needed for settings fetch) 104 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 105 106 // Create output directory if it doesn't exist 107 std::fs::create_dir_all(&output_dir).into_diagnostic()?; 108 109 // Initial pull of the site 110 println!("Performing initial pull..."); 111 let did_str = CowStr::from(did.as_str().to_string()); 112 pull_site(did_str.clone(), rkey.clone(), output_dir.clone()).await?; 113 114 // Fetch settings 115 let settings = fetch_settings(&pds_url, &did, rkey.as_ref()).await?; 116 if let Some(ref s) = settings { 117 println!("\nSettings loaded:"); 118 if let Some(true) = s.directory_listing { 119 println!(" • Directory listing: enabled"); 120 } 121 if let Some(ref spa_file) = s.spa_mode { 122 println!(" • SPA mode: enabled ({})", spa_file); 123 } 124 if let Some(ref custom404) = s.custom404 { 125 println!(" • Custom 404: {}", custom404); 126 } 127 } else { 128 println!("No settings configured (using defaults)"); 129 } 130 131 // Load redirect rules 132 let redirect_rules = load_redirect_rules(&output_dir); 133 if !redirect_rules.is_empty() { 134 println!("Loaded {} redirect rules from _redirects", redirect_rules.len()); 135 } 136 137 // Create shared state 138 let state = ServerState { 139 did: did_str.clone(), 140 rkey: rkey.clone(), 141 output_dir: output_dir.clone(), 142 last_cid: Arc::new(RwLock::new(None)), 143 redirect_rules: Arc::new(RwLock::new(redirect_rules)), 144 settings: Arc::new(RwLock::new(settings)), 145 }; 146 147 // Start firehose listener in background 148 let firehose_state = state.clone(); 149 tokio::spawn(async move { 150 if let Err(e) = watch_firehose(firehose_state).await { 151 eprintln!("Firehose error: {}", e); 152 } 153 }); 154 155 // Create HTTP server with gzip compression and redirect handling 156 let serve_dir = ServeDir::new(&output_dir).precompressed_gzip(); 157 158 let app = Router::new() 159 .fallback(move |req: Request| { 160 let state = state.clone(); 161 let mut serve_dir = serve_dir.clone(); 162 async move { 163 handle_request_with_redirects(req, state, &mut serve_dir).await 164 } 165 }) 166 .layer(CompressionLayer::new()); 167 168 let addr = format!("0.0.0.0:{}", port); 169 let listener = tokio::net::TcpListener::bind(&addr) 170 .await 171 .into_diagnostic()?; 172 173 println!("\n✓ Server running at http://localhost:{}", port); 174 println!(" Watching for updates on the firehose...\n"); 175 176 axum::serve(listener, app).await.into_diagnostic()?; 177 178 Ok(()) 179} 180 181/// Serve a file for SPA mode 182async fn serve_file_for_spa(output_dir: &Path, spa_file: &str) -> Response { 183 let file_path = output_dir.join(spa_file.trim_start_matches('/')); 184 185 match tokio::fs::read(&file_path).await { 186 Ok(contents) => { 187 Response::builder() 188 .status(StatusCode::OK) 189 .header(header::CONTENT_TYPE, "text/html; charset=utf-8") 190 .body(Body::from(contents)) 191 .unwrap() 192 } 193 Err(_) => { 194 StatusCode::NOT_FOUND.into_response() 195 } 196 } 197} 198 199/// Serve custom 404 page 200async fn serve_custom_404(output_dir: &Path, custom404_file: &str) -> Response { 201 let file_path = output_dir.join(custom404_file.trim_start_matches('/')); 202 203 match tokio::fs::read(&file_path).await { 204 Ok(contents) => { 205 Response::builder() 206 .status(StatusCode::NOT_FOUND) 207 .header(header::CONTENT_TYPE, "text/html; charset=utf-8") 208 .body(Body::from(contents)) 209 .unwrap() 210 } 211 Err(_) => { 212 StatusCode::NOT_FOUND.into_response() 213 } 214 } 215} 216 217/// Serve directory listing 218async fn serve_directory_listing(dir_path: &Path, url_path: &str) -> Response { 219 match tokio::fs::read_dir(dir_path).await { 220 Ok(mut entries) => { 221 let mut html = String::from("<!DOCTYPE html><html><head><meta charset='utf-8'><title>Directory listing</title>"); 222 html.push_str("<style>body{font-family:sans-serif;margin:2em}a{display:block;padding:0.5em;text-decoration:none;color:#0066cc}a:hover{background:#f0f0f0}</style>"); 223 html.push_str("</head><body>"); 224 html.push_str(&format!("<h1>Index of {}</h1>", url_path)); 225 html.push_str("<hr>"); 226 227 // Add parent directory link if not at root 228 if url_path != "/" { 229 let parent = if url_path.ends_with('/') { 230 format!("{}../", url_path) 231 } else { 232 format!("{}/", url_path.rsplitn(2, '/').nth(1).unwrap_or("/")) 233 }; 234 html.push_str(&format!("<a href='{}'>../</a>", parent)); 235 } 236 237 let mut items = Vec::new(); 238 while let Ok(Some(entry)) = entries.next_entry().await { 239 if let Ok(name) = entry.file_name().into_string() { 240 let is_dir = entry.path().is_dir(); 241 let display_name = if is_dir { 242 format!("{}/", name) 243 } else { 244 name.clone() 245 }; 246 247 let link_path = if url_path.ends_with('/') { 248 format!("{}{}", url_path, name) 249 } else { 250 format!("{}/{}", url_path, name) 251 }; 252 253 items.push((display_name, link_path, is_dir)); 254 } 255 } 256 257 // Sort: directories first, then alphabetically 258 items.sort_by(|a, b| { 259 match (a.2, b.2) { 260 (true, false) => std::cmp::Ordering::Less, 261 (false, true) => std::cmp::Ordering::Greater, 262 _ => a.0.cmp(&b.0), 263 } 264 }); 265 266 for (display_name, link_path, _) in items { 267 html.push_str(&format!("<a href='{}'>{}</a>", link_path, display_name)); 268 } 269 270 html.push_str("</body></html>"); 271 272 Response::builder() 273 .status(StatusCode::OK) 274 .header(header::CONTENT_TYPE, "text/html; charset=utf-8") 275 .body(Body::from(html)) 276 .unwrap() 277 } 278 Err(_) => { 279 StatusCode::NOT_FOUND.into_response() 280 } 281 } 282} 283 284/// Handle a request with redirect and settings support 285async fn handle_request_with_redirects( 286 req: Request, 287 state: ServerState, 288 serve_dir: &mut ServeDir, 289) -> Response { 290 let uri = req.uri().clone(); 291 let path = uri.path(); 292 let method = req.method().clone(); 293 294 // Parse query parameters 295 let query_params = uri.query().map(|q| { 296 let mut params = HashMap::new(); 297 for pair in q.split('&') { 298 if let Some((key, value)) = pair.split_once('=') { 299 params.insert(key.to_string(), value.to_string()); 300 } 301 } 302 params 303 }); 304 305 // Get settings 306 let settings = state.settings.read().await.clone(); 307 308 // Check for redirect rules first 309 let redirect_rules = state.redirect_rules.read().await; 310 if let Some(redirect_match) = match_redirect_rule(path, &redirect_rules, query_params.as_ref()) { 311 let is_force = redirect_match.force; 312 drop(redirect_rules); // Release the lock 313 314 // If not forced, check if the file exists first 315 if !is_force { 316 // Try to serve the file normally first 317 let test_req = Request::builder() 318 .uri(uri.clone()) 319 .method(&method) 320 .body(axum::body::Body::empty()) 321 .unwrap(); 322 323 match serve_dir.call(test_req).await { 324 Ok(response) if response.status().is_success() => { 325 // File exists and was served successfully, return it 326 return response.into_response(); 327 } 328 _ => { 329 // File doesn't exist or error, apply redirect 330 } 331 } 332 } 333 334 // Handle different status codes 335 match redirect_match.status { 336 200 => { 337 // Rewrite: serve the target file but keep the URL the same 338 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() { 339 let new_req = Request::builder() 340 .uri(target_uri) 341 .method(&method) 342 .body(axum::body::Body::empty()) 343 .unwrap(); 344 345 match serve_dir.call(new_req).await { 346 Ok(response) => response.into_response(), 347 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 348 } 349 } else { 350 StatusCode::INTERNAL_SERVER_ERROR.into_response() 351 } 352 } 353 301 => { 354 // Permanent redirect 355 Redirect::permanent(&redirect_match.target_path).into_response() 356 } 357 302 => { 358 // Temporary redirect 359 Redirect::temporary(&redirect_match.target_path).into_response() 360 } 361 404 => { 362 // Custom 404 page 363 if let Ok(target_uri) = redirect_match.target_path.parse::<Uri>() { 364 let new_req = Request::builder() 365 .uri(target_uri) 366 .method(&method) 367 .body(axum::body::Body::empty()) 368 .unwrap(); 369 370 match serve_dir.call(new_req).await { 371 Ok(mut response) => { 372 *response.status_mut() = StatusCode::NOT_FOUND; 373 response.into_response() 374 } 375 Err(_) => StatusCode::NOT_FOUND.into_response(), 376 } 377 } else { 378 StatusCode::NOT_FOUND.into_response() 379 } 380 } 381 _ => { 382 // Unsupported status code, fall through to normal serving 383 match serve_dir.call(req).await { 384 Ok(response) => response.into_response(), 385 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 386 } 387 } 388 } 389 } else { 390 drop(redirect_rules); 391 392 // No redirect match, try to serve the file 393 let response_result = serve_dir.call(req).await; 394 395 match response_result { 396 Ok(response) if response.status().is_success() => { 397 // File served successfully 398 response.into_response() 399 } 400 Ok(response) if response.status() == StatusCode::NOT_FOUND => { 401 // File not found, check settings for fallback behavior 402 if let Some(ref settings) = settings { 403 // SPA mode takes precedence 404 if let Some(ref spa_file) = settings.spa_mode { 405 // Serve the SPA file for all non-file routes 406 return serve_file_for_spa(&state.output_dir, spa_file.as_ref()).await; 407 } 408 409 // Check if path is a directory and directory listing is enabled 410 if let Some(true) = settings.directory_listing { 411 let file_path = state.output_dir.join(path.trim_start_matches('/')); 412 if file_path.is_dir() { 413 return serve_directory_listing(&file_path, path).await; 414 } 415 } 416 417 // Check for custom 404 418 if let Some(ref custom404) = settings.custom404 { 419 return serve_custom_404(&state.output_dir, custom404.as_ref()).await; 420 } 421 } 422 423 // No special handling, return 404 424 StatusCode::NOT_FOUND.into_response() 425 } 426 Ok(response) => response.into_response(), 427 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), 428 } 429 } 430} 431 432/// Watch the firehose for updates to the specific site 433fn watch_firehose(state: ServerState) -> std::pin::Pin<Box<dyn std::future::Future<Output = miette::Result<()>> + Send>> { 434 Box::pin(async move { 435 use jacquard_identity::PublicResolver; 436 use jacquard::prelude::IdentityResolver; 437 438 // Resolve DID to PDS URL 439 let resolver = PublicResolver::default(); 440 let did = Did::new(&state.did).into_diagnostic()?; 441 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 442 443 println!("[PDS] Resolved DID to PDS: {}", pds_url); 444 445 // Convert HTTP(S) URL to WebSocket URL 446 let mut ws_url = pds_url.clone(); 447 let scheme = if pds_url.scheme() == "https" { "wss" } else { "ws" }; 448 ws_url.set_scheme(scheme) 449 .map_err(|_| miette::miette!("Failed to set WebSocket scheme"))?; 450 451 println!("[PDS] Connecting to {}...", ws_url); 452 453 // Create subscription client 454 let client = TungsteniteSubscriptionClient::from_base_uri(ws_url); 455 456 // Subscribe to the PDS firehose 457 let params = SubscribeRepos::new().build(); 458 459 let stream = client.subscribe(&params).await.into_diagnostic()?; 460 println!("[PDS] Connected! Watching for updates..."); 461 462 // Convert to typed message stream 463 let (_sink, mut messages) = stream.into_stream(); 464 465 loop { 466 match messages.next().await { 467 Some(Ok(msg)) => { 468 if let Err(e) = handle_firehose_message(&state, msg).await { 469 eprintln!("[PDS] Error handling message: {}", e); 470 } 471 } 472 Some(Err(e)) => { 473 eprintln!("[PDS] Stream error: {}", e); 474 // Try to reconnect after a delay 475 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 476 return Box::pin(watch_firehose(state)).await; 477 } 478 None => { 479 println!("[PDS] Stream ended, reconnecting..."); 480 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 481 return Box::pin(watch_firehose(state)).await; 482 } 483 } 484 } 485 }) 486} 487 488/// Handle a firehose message 489async fn handle_firehose_message<'a>( 490 state: &ServerState, 491 msg: SubscribeReposMessage<'a>, 492) -> miette::Result<()> { 493 match msg { 494 SubscribeReposMessage::Commit(commit_msg) => { 495 // Check if this commit is from our DID 496 if commit_msg.repo.as_str() != state.did.as_str() { 497 return Ok(()); 498 } 499 500 // Check if any operation affects our site or settings 501 let site_path = format!("place.wisp.fs/{}", state.rkey); 502 let settings_path = format!("place.wisp.settings/{}", state.rkey); 503 let has_site_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == site_path); 504 let has_settings_update = commit_msg.ops.iter().any(|op| op.path.as_ref() == settings_path); 505 506 if has_site_update { 507 // Debug: log all operations for this commit 508 println!("[Debug] Commit has {} ops for {}", commit_msg.ops.len(), state.rkey); 509 for op in &commit_msg.ops { 510 if op.path.as_ref() == site_path { 511 println!("[Debug] - {} {}", op.action.as_ref(), op.path.as_ref()); 512 } 513 } 514 } 515 516 if has_site_update { 517 // Use the commit CID as the version tracker 518 let commit_cid = commit_msg.commit.to_string(); 519 520 // Check if this is a new commit 521 let should_update = { 522 let last_cid = state.last_cid.read().await; 523 Some(commit_cid.clone()) != *last_cid 524 }; 525 526 if should_update { 527 // Check operation types 528 let has_create_or_update = commit_msg.ops.iter().any(|op| { 529 op.path.as_ref() == site_path && 530 (op.action.as_ref() == "create" || op.action.as_ref() == "update") 531 }); 532 let has_delete = commit_msg.ops.iter().any(|op| { 533 op.path.as_ref() == site_path && op.action.as_ref() == "delete" 534 }); 535 536 // If there's a create/update, pull the site (even if there's also a delete in the same commit) 537 if has_create_or_update { 538 println!("\n[Update] Detected change to site {} (commit: {})", state.rkey, commit_cid); 539 println!("[Update] Pulling latest version..."); 540 541 // Pull the updated site 542 match pull_site( 543 state.did.clone(), 544 state.rkey.clone(), 545 state.output_dir.clone(), 546 ) 547 .await 548 { 549 Ok(_) => { 550 // Update last CID 551 let mut last_cid = state.last_cid.write().await; 552 *last_cid = Some(commit_cid); 553 554 // Reload redirect rules 555 let new_redirect_rules = load_redirect_rules(&state.output_dir); 556 let mut redirect_rules = state.redirect_rules.write().await; 557 *redirect_rules = new_redirect_rules; 558 559 println!("[Update] ✓ Site updated successfully!\n"); 560 } 561 Err(e) => { 562 eprintln!("[Update] Failed to pull site: {}", e); 563 } 564 } 565 } else if has_delete { 566 // Only a delete, no create/update 567 println!("\n[Update] Site {} was deleted", state.rkey); 568 569 // Update last CID so we don't process this commit again 570 let mut last_cid = state.last_cid.write().await; 571 *last_cid = Some(commit_cid); 572 } 573 } 574 } 575 576 // Handle settings updates 577 if has_settings_update { 578 println!("\n[Settings] Detected change to settings"); 579 580 // Resolve PDS URL 581 use jacquard_identity::PublicResolver; 582 use jacquard::prelude::IdentityResolver; 583 584 let resolver = PublicResolver::default(); 585 let did = Did::new(&state.did).into_diagnostic()?; 586 let pds_url = resolver.pds_for_did(&did).await.into_diagnostic()?; 587 588 // Fetch updated settings 589 match fetch_settings(&pds_url, &did, state.rkey.as_ref()).await { 590 Ok(new_settings) => { 591 let mut settings = state.settings.write().await; 592 *settings = new_settings.clone(); 593 drop(settings); 594 595 if let Some(ref s) = new_settings { 596 println!("[Settings] Updated:"); 597 if let Some(true) = s.directory_listing { 598 println!(" • Directory listing: enabled"); 599 } 600 if let Some(ref spa_file) = s.spa_mode { 601 println!(" • SPA mode: enabled ({})", spa_file); 602 } 603 if let Some(ref custom404) = s.custom404 { 604 println!(" • Custom 404: {}", custom404); 605 } 606 } else { 607 println!("[Settings] Cleared (using defaults)"); 608 } 609 } 610 Err(e) => { 611 eprintln!("[Settings] Failed to fetch updated settings: {}", e); 612 } 613 } 614 } 615 } 616 _ => { 617 // Ignore identity and account messages 618 } 619 } 620 621 Ok(()) 622} 623