AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use scc::HashMap;
2use std::future::Future;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
5use std::time::Duration;
6use tokio::sync::{Notify, Semaphore, SemaphorePermit};
7use url::Url;
8
/// max concurrent in-flight requests per PDS before we start queuing.
/// the reference PDS allows 10 requests per second, so 10 should be fine.
const PER_PDS_CONCURRENCY: usize = 10;
12
/// shared, clonable registry of per-PDS throttle state, keyed by the PDS
/// base URL. cloning is cheap — it only clones the outer `Arc`, so all
/// clones observe the same per-PDS state.
#[derive(Clone)]
pub struct Throttler {
    states: Arc<HashMap<Url, Arc<State>>>,
}
17
18impl Throttler {
19 pub fn new() -> Self {
20 Self {
21 states: Arc::new(HashMap::new()),
22 }
23 }
24
25 pub async fn get_handle(&self, url: &Url) -> ThrottleHandle {
26 let state = self
27 .states
28 .entry_async(url.clone())
29 .await
30 .or_insert_with(|| Arc::new(State::new()))
31 .get()
32 .clone();
33
34 ThrottleHandle { state }
35 }
36
37 /// drop entries with no active throttle and no consecutive failures.
38 pub async fn evict_clean(&self) {
39 self.states
40 .retain_async(|_, v| {
41 v.throttled_until.load(Ordering::Acquire) != 0
42 || v.consecutive_failures.load(Ordering::Acquire) != 0
43 })
44 .await;
45 }
46}
47
/// per-PDS throttle bookkeeping. counters and the deadline are lock-free
/// atomics; cross-task signalling goes through tokio's `Notify`/`Semaphore`.
struct State {
    /// unix timestamp (seconds) until which this PDS is throttled; 0 = not throttled.
    throttled_until: AtomicI64,
    /// hard failures since the last success; drives the exponential backoff.
    consecutive_failures: AtomicUsize,
    /// timeouts since the last success; drives the adaptive request timeout.
    consecutive_timeouts: AtomicUsize,
    /// only fires on hard failures (timeout, TLS, bad gateway, etc).
    /// ratelimits do NOT fire this — they just store `throttled_until` and
    /// let tasks exit naturally, deferring to the background retry loop.
    failure_notify: Notify,
    /// caps concurrent in-flight requests at `PER_PDS_CONCURRENCY`.
    semaphore: Semaphore,
}
58
59impl State {
60 fn new() -> Self {
61 Self {
62 throttled_until: AtomicI64::new(0),
63 consecutive_failures: AtomicUsize::new(0),
64 consecutive_timeouts: AtomicUsize::new(0),
65 failure_notify: Notify::new(),
66 semaphore: Semaphore::new(PER_PDS_CONCURRENCY),
67 }
68 }
69}
70
/// per-PDS handle handed to request tasks; a cheap `Arc` wrapper around
/// the shared throttle `State` for one PDS.
pub struct ThrottleHandle {
    state: Arc<State>,
}
74
75impl ThrottleHandle {
76 pub fn is_throttled(&self) -> bool {
77 let until = self.state.throttled_until.load(Ordering::Acquire);
78 until != 0 && chrono::Utc::now().timestamp() < until
79 }
80
81 /// the unix timestamp at which this throttle expires (0 if not throttled).
82 pub fn throttled_until(&self) -> i64 {
83 self.state.throttled_until.load(Ordering::Acquire)
84 }
85
86 pub fn record_success(&self) {
87 self.state.consecutive_failures.store(0, Ordering::Release);
88 self.state.consecutive_timeouts.store(0, Ordering::Release);
89 self.state.throttled_until.store(0, Ordering::Release);
90 }
91
92 /// called on a 429 response. `retry_after_secs` comes from the `Retry-After`
93 /// header if present; falls back to 60s. uses `fetch_max` so concurrent callers
94 /// don't race each other back to a shorter window.
95 ///
96 /// deliberately does NOT notify waiters — 429s are soft and tasks should exit
97 /// naturally via the `Retry` result rather than being cancelled.
98 pub fn record_ratelimit(&self, retry_after_secs: Option<u64>) {
99 let secs = retry_after_secs.unwrap_or(60) as i64;
100 let until = chrono::Utc::now().timestamp() + secs;
101 self.state
102 .throttled_until
103 .fetch_max(until, Ordering::AcqRel);
104 }
105
106 /// called on hard failures (timeout, TLS error, bad gateway, etc).
107 /// returns throttle duration in minutes if this is a *new* throttle,
108 /// and notifies all in-flight tasks to cancel immediately.
109 pub fn record_failure(&self) -> Option<i64> {
110 if self.is_throttled() {
111 return None;
112 }
113
114 let failures = self
115 .state
116 .consecutive_failures
117 .fetch_add(1, Ordering::AcqRel)
118 + 1;
119
120 // 30 min, 60 min, 120 min, ... capped at ~512 hours
121 let base_minutes = 30i64;
122 let exponent = (failures as u32).saturating_sub(1);
123 let minutes = base_minutes * 2i64.pow(exponent.min(10));
124 let until = chrono::Utc::now().timestamp() + minutes * 60;
125
126 self.state.throttled_until.store(until, Ordering::Release);
127 self.state.failure_notify.notify_waiters();
128
129 Some(minutes)
130 }
131
132 /// returns current timeout duration — 3s, 6s, or 12s depending on prior timeouts.
133 pub fn timeout(&self) -> Duration {
134 let n = self.state.consecutive_timeouts.load(Ordering::Acquire);
135 Duration::from_secs(3 * 2u64.pow(n.min(2) as u32))
136 }
137
138 pub fn record_timeout(&self) -> bool {
139 let timeouts = self
140 .state
141 .consecutive_timeouts
142 .fetch_add(1, Ordering::AcqRel)
143 + 1;
144 timeouts > 2
145 }
146
147 /// acquire a concurrency slot for this PDS. hold the returned permit
148 /// for the duration of the request.
149 pub async fn acquire(&self) -> SemaphorePermit<'_> {
150 self.state
151 .semaphore
152 .acquire()
153 .await
154 .expect("throttle semaphore unexpectedly closed")
155 }
156
157 /// resolves when this PDS gets a hard failure notification.
158 /// used by `or_throttle` and the semaphore acquire select to cancel in-flight work.
159 pub async fn wait_for_failure(&self) {
160 loop {
161 let notified = self.state.failure_notify.notified();
162 if self.is_throttled() {
163 return;
164 }
165 notified.await;
166 }
167 }
168}
169
/// adds a method for racing the future against a hard-failure notification.
#[allow(async_fn_in_trait)]
pub trait OrFailure<T, E>: Future<Output = Result<T, E>> {
    /// run `self` to completion unless `handle`'s PDS records a hard failure
    /// first. on failure the in-flight future is dropped (i.e. cancelled by
    /// the `select!`) and `on_throttle()` supplies the error to return.
    async fn or_failure(
        self,
        handle: &ThrottleHandle,
        on_throttle: impl FnOnce() -> E,
    ) -> Result<T, E>
    where
        Self: Sized,
    {
        tokio::select! {
            res = self => res,
            _ = handle.wait_for_failure() => Err(on_throttle()),
        }
    }
}

// blanket impl: every `Future<Output = Result<T, E>>` gets `.or_failure(..)`.
impl<T, E, F: Future<Output = Result<T, E>>> OrFailure<T, E> for F {}