Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1mod handlers;
2
3use crate::handlers::well_known::{
4 ServiceDID, ServiceDocument, ServiceKey, build_service_document, handle_wellknown_did_web,
5};
6use crate::handlers::xrpc::com_atproto_sync::atproto_routes;
7use crate::handlers::xrpc::com_pdsmoover_admin_handlers::admin_routes;
8use crate::handlers::xrpc::com_pdsmoover_backup_handlers::backup_routes;
9use atproto_identity::key::to_public;
10use axum::http::header;
11use axum::{Router, routing::get};
12use chrono::{DateTime, Utc};
13use dotenvy::dotenv;
14use jacquard_identity::PublicResolver;
15use s3::creds::Credentials;
16use s3::{Bucket, Region};
17use shared::db::Db;
18use std::env;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::TcpListener;
23use tokio::sync::RwLock;
24use tower_governor::GovernorLayer;
25use tower_governor::governor::GovernorConfigBuilder;
26use tower_governor::key_extractor::SmartIpKeyExtractor;
27use tower_http::cors::{Any, CorsLayer};
28use tracing::info;
29use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
30
31#[derive(Clone)]
32struct AppState {
33 db: Db,
34 public_resolver: Arc<PublicResolver>,
35 service_document: ServiceDocument,
36 did_web: ServiceDID,
37 admin_token: Option<String>,
38 s3_bucket: Arc<Box<s3::Bucket>>,
39 describe_server_cache: Arc<RwLock<Option<(shared::db::DescribeServerRow, DateTime<Utc>)>>>,
40}
41
42#[tokio::main]
43async fn main() -> anyhow::Result<()> {
44 // Load environment variables from .env if present
45 let _ = dotenv();
46
47 let _prod = std::env::var("PROD")
48 .unwrap_or_else(|_| "false".to_string())
49 .parse::<bool>()
50 .unwrap_or(false);
51
52 // Initialize tracing subscriber with env filter (RUST_LOG) and pretty formatter
53 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
54 tracing_subscriber::registry()
55 .with(env_filter)
56 .with(fmt::layer().compact())
57 .init();
58
59 // Initialize DB
60 let database_url = std::env::var("DATABASE_URL")
61 .expect("DATABASE_URL must be set, e.g. postgres://user:pass@localhost:5432/dbname");
62 let db = Db::connect(&database_url).await?;
63 db.apply_migrations().await?;
64
65 //did:web/atproto service setup
66 let external_domain = env::var("EXTERNAL_DOMAIN")?;
67 let service_key_string = env::var("SERVICE_KEY")?;
68 let service_did = ServiceDID(format!("did:web:{}", external_domain));
69 let service_key: ServiceKey = service_key_string.try_into()?;
70
71 let public_service_key = to_public(&service_key.0)
72 .map(|public_key_data| public_key_data.to_string())
73 .expect("public service key");
74 let service_document = build_service_document(&*external_domain, &public_service_key);
75
76 let resolver = PublicResolver::default();
77
78 // Admin token used for temporary admin Basic auth
79 // If not set, admin endpoints will not be added to the router
80 let admin_password = env::var("ADMIN_PASSWORD").ok();
81
82 // S3 setup for serving backups
83 let region_name = env::var("S3_REGION")?;
84 let endpoint = env::var("S3_ENDPOINT")?;
85 let region = Region::Custom {
86 region: region_name,
87 endpoint,
88 };
89 let bucket = Bucket::new(
90 env::var("S3_BUCKET_NAME")?.as_str(),
91 region,
92 Credentials::new(
93 Some(env::var("S3_ACCESS_KEY")?.as_str()),
94 Some(env::var("S3_SECRET_KEY")?.as_str()),
95 None,
96 None,
97 None,
98 )?,
99 )?;
100
101 let state = AppState {
102 db,
103 service_document,
104 public_resolver: Arc::new(resolver),
105 did_web: service_did,
106 admin_token: admin_password.clone(),
107 s3_bucket: Arc::new(bucket),
108 describe_server_cache: Arc::new(RwLock::new(None)),
109 };
110
111 // Build Axum router
112 let mut app = Router::new()
113 .route("/", get(root_handler))
114 //XRPC Routes
115 .merge(backup_routes(state.clone()))
116 .merge(atproto_routes(state.clone()))
117 //Other routes
118 .route("/.well-known/did.json", get(handle_wellknown_did_web));
119
120 // Conditionally add admin endpoints only if ADMIN_PASSWORD is set
121 if state.admin_token.is_some() {
122 // Basic rate limiting for admin endpoints.
123 // Adjust per_second/burst_size as needed.
124 let governor_conf = GovernorConfigBuilder::default()
125 .per_second(60)
126 .burst_size(5)
127 .key_extractor(SmartIpKeyExtractor)
128 .finish()
129 .expect("valid governor config");
130 let limiter = governor_conf.limiter().clone();
131
132 let interval = Duration::from_secs(60);
133 // a separate background task to clean up
134 std::thread::spawn(move || {
135 loop {
136 limiter.retain_recent();
137 std::thread::sleep(interval);
138 }
139 });
140 let governor_layer = GovernorLayer::new(Arc::new(governor_conf));
141
142 app = app.merge(admin_routes(state.clone()).layer(governor_layer));
143 }
144
145 //CORS
146 let cors = CorsLayer::new()
147 .allow_methods(Any)
148 .allow_origin(Any)
149 .allow_headers(Any);
150
151 // Finalize with state
152 let app = app.layer(cors).with_state(state);
153
154 // Read PORT from env or default to 3000
155 let port: u16 = std::env::var("PORT")
156 .ok()
157 .and_then(|s| s.parse().ok())
158 .unwrap_or(3000);
159 let addr: SocketAddr = ([0, 0, 0, 0], port).into();
160
161 info!(%addr, "starting web server");
162
163 let listener = TcpListener::bind(addr).await?;
164 axum::serve(listener, app).await?;
165
166 Ok(())
167}
168
169async fn root_handler() -> impl axum::response::IntoResponse {
170 let body = r"
171.---. .---. .--.
172: .; :: . :: .--'
173: _.': :: :`. `.
174: : : :; : _`, :
175:_; :___.'`.__.'
176
177
178.-..-. .--. .--.
179: `' :: ,. :: ,. :
180: .. :: :: :: :: :.-..-. .--. .--.
181: :; :: :; :: :; :: `; :' '_.': ..'
182:_;:_;`.__.'`.__.'`.__.'`.__.':_;
183
184 ";
185
186 let intro = "\n\nThis is a PDS MOOver xrpc service\n\nCode: https://tangled.sh/@baileytownsend.dev/pds-moover\n";
187
188 let banner = format!(" {body}\n{intro}");
189
190 (
191 [(header::CONTENT_TYPE, "text/plain; charset=utf-8")],
192 banner,
193 )
194}