AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 188 lines 6.0 kB view raw
1use scc::HashMap; 2use std::future::Future; 3use std::sync::Arc; 4use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; 5use std::time::Duration; 6use tokio::sync::{Notify, Semaphore, SemaphorePermit}; 7use url::Url; 8 9/// max concurrent in-flight requests per PDS before we start queuing 10/// ref pds allows 10 requests per second... so 10 should be fine 11const PER_PDS_CONCURRENCY: usize = 10; 12 13#[derive(Clone)] 14pub struct Throttler { 15 states: Arc<HashMap<Url, Arc<State>>>, 16} 17 18impl Throttler { 19 pub fn new() -> Self { 20 Self { 21 states: Arc::new(HashMap::new()), 22 } 23 } 24 25 pub async fn get_handle(&self, url: &Url) -> ThrottleHandle { 26 let state = self 27 .states 28 .entry_async(url.clone()) 29 .await 30 .or_insert_with(|| Arc::new(State::new())) 31 .get() 32 .clone(); 33 34 ThrottleHandle { state } 35 } 36 37 /// drop entries with no active throttle and no consecutive failures. 38 pub async fn evict_clean(&self) { 39 self.states 40 .retain_async(|_, v| { 41 v.throttled_until.load(Ordering::Acquire) != 0 42 || v.consecutive_failures.load(Ordering::Acquire) != 0 43 }) 44 .await; 45 } 46} 47 48struct State { 49 throttled_until: AtomicI64, 50 consecutive_failures: AtomicUsize, 51 consecutive_timeouts: AtomicUsize, 52 /// only fires on hard failures (timeout, TLS, bad gateway, etc). 53 /// ratelimits do NOT fire this — they just store `throttled_until` and 54 /// let tasks exit naturally, deferring to the background retry loop. 
55 failure_notify: Notify, 56 semaphore: Semaphore, 57} 58 59impl State { 60 fn new() -> Self { 61 Self { 62 throttled_until: AtomicI64::new(0), 63 consecutive_failures: AtomicUsize::new(0), 64 consecutive_timeouts: AtomicUsize::new(0), 65 failure_notify: Notify::new(), 66 semaphore: Semaphore::new(PER_PDS_CONCURRENCY), 67 } 68 } 69} 70 71pub struct ThrottleHandle { 72 state: Arc<State>, 73} 74 75impl ThrottleHandle { 76 pub fn is_throttled(&self) -> bool { 77 let until = self.state.throttled_until.load(Ordering::Acquire); 78 until != 0 && chrono::Utc::now().timestamp() < until 79 } 80 81 /// the unix timestamp at which this throttle expires (0 if not throttled). 82 pub fn throttled_until(&self) -> i64 { 83 self.state.throttled_until.load(Ordering::Acquire) 84 } 85 86 pub fn record_success(&self) { 87 self.state.consecutive_failures.store(0, Ordering::Release); 88 self.state.consecutive_timeouts.store(0, Ordering::Release); 89 self.state.throttled_until.store(0, Ordering::Release); 90 } 91 92 /// called on a 429 response. `retry_after_secs` comes from the `Retry-After` 93 /// header if present; falls back to 60s. uses `fetch_max` so concurrent callers 94 /// don't race each other back to a shorter window. 95 /// 96 /// deliberately does NOT notify waiters — 429s are soft and tasks should exit 97 /// naturally via the `Retry` result rather than being cancelled. 98 pub fn record_ratelimit(&self, retry_after_secs: Option<u64>) { 99 let secs = retry_after_secs.unwrap_or(60) as i64; 100 let until = chrono::Utc::now().timestamp() + secs; 101 self.state 102 .throttled_until 103 .fetch_max(until, Ordering::AcqRel); 104 } 105 106 /// called on hard failures (timeout, TLS error, bad gateway, etc). 107 /// returns throttle duration in minutes if this is a *new* throttle, 108 /// and notifies all in-flight tasks to cancel immediately. 
109 pub fn record_failure(&self) -> Option<i64> { 110 if self.is_throttled() { 111 return None; 112 } 113 114 let failures = self 115 .state 116 .consecutive_failures 117 .fetch_add(1, Ordering::AcqRel) 118 + 1; 119 120 // 30 min, 60 min, 120 min, ... capped at ~512 hours 121 let base_minutes = 30i64; 122 let exponent = (failures as u32).saturating_sub(1); 123 let minutes = base_minutes * 2i64.pow(exponent.min(10)); 124 let until = chrono::Utc::now().timestamp() + minutes * 60; 125 126 self.state.throttled_until.store(until, Ordering::Release); 127 self.state.failure_notify.notify_waiters(); 128 129 Some(minutes) 130 } 131 132 /// returns current timeout duration — 3s, 6s, or 12s depending on prior timeouts. 133 pub fn timeout(&self) -> Duration { 134 let n = self.state.consecutive_timeouts.load(Ordering::Acquire); 135 Duration::from_secs(3 * 2u64.pow(n.min(2) as u32)) 136 } 137 138 pub fn record_timeout(&self) -> bool { 139 let timeouts = self 140 .state 141 .consecutive_timeouts 142 .fetch_add(1, Ordering::AcqRel) 143 + 1; 144 timeouts > 2 145 } 146 147 /// acquire a concurrency slot for this PDS. hold the returned permit 148 /// for the duration of the request. 149 pub async fn acquire(&self) -> SemaphorePermit<'_> { 150 self.state 151 .semaphore 152 .acquire() 153 .await 154 .expect("throttle semaphore unexpectedly closed") 155 } 156 157 /// resolves when this PDS gets a hard failure notification. 158 /// used by `or_throttle` and the semaphore acquire select to cancel in-flight work. 159 pub async fn wait_for_failure(&self) { 160 loop { 161 let notified = self.state.failure_notify.notified(); 162 if self.is_throttled() { 163 return; 164 } 165 notified.await; 166 } 167 } 168} 169 170/// adds a method for racing the future against a hard-failure notification. 
171#[allow(async_fn_in_trait)] 172pub trait OrFailure<T, E>: Future<Output = Result<T, E>> { 173 async fn or_failure( 174 self, 175 handle: &ThrottleHandle, 176 on_throttle: impl FnOnce() -> E, 177 ) -> Result<T, E> 178 where 179 Self: Sized, 180 { 181 tokio::select! { 182 res = self => res, 183 _ = handle.wait_for_failure() => Err(on_throttle()), 184 } 185 } 186} 187 188impl<T, E, F: Future<Output = Result<T, E>>> OrFailure<T, E> for F {}