forked from
lewis.moe/bspds-sandbox
PDS software with bells & whistles you didn’t even know you needed. Will move this to its own account when ready.
1use crate::appview::DidResolver;
2use crate::cache::{Cache, DistributedRateLimiter, create_cache};
3use crate::circuit_breaker::CircuitBreakers;
4use crate::config::AuthConfig;
5use crate::rate_limit::RateLimiters;
6use crate::repo::PostgresBlockStore;
7use crate::storage::{BackupStorage, BlobStorage, S3BlobStorage};
8use crate::sync::firehose::SequencedEvent;
9use sqlx::PgPool;
10use std::error::Error;
11use std::sync::Arc;
12use tokio::sync::broadcast;
13
14#[derive(Clone)]
15pub struct AppState {
16 pub db: PgPool,
17 pub block_store: PostgresBlockStore,
18 pub blob_store: Arc<dyn BlobStorage>,
19 pub backup_storage: Option<Arc<BackupStorage>>,
20 pub firehose_tx: broadcast::Sender<SequencedEvent>,
21 pub rate_limiters: Arc<RateLimiters>,
22 pub circuit_breakers: Arc<CircuitBreakers>,
23 pub cache: Arc<dyn Cache>,
24 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>,
25 pub did_resolver: Arc<DidResolver>,
26}
27
/// Selects which rate-limit bucket an operation is counted against.
///
/// Every variant maps to a fixed key prefix (see `key_prefix`) and a fixed
/// request budget (see `limit_and_window_ms`).
///
/// NOTE(review): `PasswordReset` and `ResetPassword` are distinct buckets with
/// different budgets — presumably the "request" and "confirm" steps; confirm
/// against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateLimitKind {
    Login,
    AccountCreation,
    PasswordReset,
    ResetPassword,
    RefreshSession,
    OAuthToken,
    OAuthAuthorize,
    OAuthPar,
    OAuthIntrospect,
    AppPassword,
    EmailUpdate,
    TotpVerify,
    HandleUpdate,
    HandleUpdateDaily,
    VerificationCheck,
}

impl RateLimitKind {
    /// Returns the snake_case identifier of this kind.
    ///
    /// The value is used both as the prefix of rate-limit keys and as the
    /// limiter name reported to metrics, so it must stay stable.
    fn key_prefix(&self) -> &'static str {
        match self {
            Self::Login => "login",
            Self::AccountCreation => "account_creation",
            Self::PasswordReset => "password_reset",
            Self::ResetPassword => "reset_password",
            Self::RefreshSession => "refresh_session",
            Self::OAuthToken => "oauth_token",
            Self::OAuthAuthorize => "oauth_authorize",
            Self::OAuthPar => "oauth_par",
            Self::OAuthIntrospect => "oauth_introspect",
            Self::AppPassword => "app_password",
            Self::EmailUpdate => "email_update",
            Self::TotpVerify => "totp_verify",
            Self::HandleUpdate => "handle_update",
            Self::HandleUpdateDaily => "handle_update_daily",
            Self::VerificationCheck => "verification_check",
        }
    }

    /// Returns `(limit, window_ms)`: the number of requests allowed within a
    /// window of `window_ms` milliseconds.
    fn limit_and_window_ms(&self) -> (u32, u64) {
        // Named windows so the budgets read as "N per minute/hour/day".
        const MINUTE_MS: u64 = 60_000;
        const FIVE_MINUTES_MS: u64 = 5 * MINUTE_MS;
        const HOUR_MS: u64 = 60 * MINUTE_MS;
        const DAY_MS: u64 = 24 * HOUR_MS;

        match self {
            Self::Login => (10, MINUTE_MS),
            Self::AccountCreation => (10, HOUR_MS),
            Self::PasswordReset => (5, HOUR_MS),
            Self::ResetPassword => (10, MINUTE_MS),
            Self::RefreshSession => (60, MINUTE_MS),
            Self::OAuthToken => (30, MINUTE_MS),
            Self::OAuthAuthorize => (10, MINUTE_MS),
            Self::OAuthPar => (30, MINUTE_MS),
            Self::OAuthIntrospect => (30, MINUTE_MS),
            Self::AppPassword => (10, MINUTE_MS),
            Self::EmailUpdate => (5, HOUR_MS),
            Self::TotpVerify => (5, FIVE_MINUTES_MS),
            Self::HandleUpdate => (10, FIVE_MINUTES_MS),
            Self::HandleUpdateDaily => (50, DAY_MS),
            Self::VerificationCheck => (60, MINUTE_MS),
        }
    }
}
87
88impl AppState {
89 pub async fn new() -> Result<Self, Box<dyn Error>> {
90 let database_url = std::env::var("DATABASE_URL")
91 .map_err(|_| "DATABASE_URL environment variable must be set")?;
92
93 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
94 .ok()
95 .and_then(|v| v.parse().ok())
96 .unwrap_or(100);
97
98 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
99 .ok()
100 .and_then(|v| v.parse().ok())
101 .unwrap_or(10);
102
103 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
104 .ok()
105 .and_then(|v| v.parse().ok())
106 .unwrap_or(10);
107
108 tracing::info!(
109 "Configuring database pool: max={}, min={}, acquire_timeout={}s",
110 max_connections,
111 min_connections,
112 acquire_timeout_secs
113 );
114
115 let db = sqlx::postgres::PgPoolOptions::new()
116 .max_connections(max_connections)
117 .min_connections(min_connections)
118 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
119 .idle_timeout(std::time::Duration::from_secs(300))
120 .max_lifetime(std::time::Duration::from_secs(1800))
121 .connect(&database_url)
122 .await
123 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
124
125 sqlx::migrate!("./migrations")
126 .run(&db)
127 .await
128 .map_err(|e| format!("Failed to run migrations: {}", e))?;
129
130 Ok(Self::from_db(db).await)
131 }
132
133 pub async fn from_db(db: PgPool) -> Self {
134 AuthConfig::init();
135
136 let block_store = PostgresBlockStore::new(db.clone());
137 let blob_store = S3BlobStorage::new().await;
138 let backup_storage = BackupStorage::new().await.map(Arc::new);
139
140 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE")
141 .ok()
142 .and_then(|v| v.parse().ok())
143 .unwrap_or(10000);
144
145 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size);
146 let rate_limiters = Arc::new(RateLimiters::new());
147 let circuit_breakers = Arc::new(CircuitBreakers::new());
148 let (cache, distributed_rate_limiter) = create_cache().await;
149 let did_resolver = Arc::new(DidResolver::new());
150
151 Self {
152 db,
153 block_store,
154 blob_store: Arc::new(blob_store),
155 backup_storage,
156 firehose_tx,
157 rate_limiters,
158 circuit_breakers,
159 cache,
160 distributed_rate_limiter,
161 did_resolver,
162 }
163 }
164
165 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self {
166 self.rate_limiters = Arc::new(rate_limiters);
167 self
168 }
169
170 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self {
171 self.circuit_breakers = Arc::new(circuit_breakers);
172 self
173 }
174
175 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool {
176 if std::env::var("DISABLE_RATE_LIMITING").is_ok() {
177 return true;
178 }
179
180 let key = format!("{}:{}", kind.key_prefix(), client_ip);
181 let limiter_name = kind.key_prefix();
182 let (limit, window_ms) = kind.limit_and_window_ms();
183
184 if !self
185 .distributed_rate_limiter
186 .check_rate_limit(&key, limit, window_ms)
187 .await
188 {
189 crate::metrics::record_rate_limit_rejection(limiter_name);
190 return false;
191 }
192
193 let limiter = match kind {
194 RateLimitKind::Login => &self.rate_limiters.login,
195 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation,
196 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset,
197 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password,
198 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session,
199 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token,
200 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize,
201 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par,
202 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect,
203 RateLimitKind::AppPassword => &self.rate_limiters.app_password,
204 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update,
205 RateLimitKind::TotpVerify => &self.rate_limiters.totp_verify,
206 RateLimitKind::HandleUpdate => &self.rate_limiters.handle_update,
207 RateLimitKind::HandleUpdateDaily => &self.rate_limiters.handle_update_daily,
208 RateLimitKind::VerificationCheck => &self.rate_limiters.verification_check,
209 };
210
211 let ok = limiter.check_key(&client_ip.to_string()).is_ok();
212 if !ok {
213 crate::metrics::record_rate_limit_rejection(limiter_name);
214 }
215 ok
216 }
217}