this repo has no description

🪣Implement Token Bucket algorithm

Closes #1

+6 -2
CHANGELOG.md
··· 7 7 8 8 ## [Unreleased] 9 9 10 - ### Changed 10 + ### Breaking changes 11 11 12 - - Deprecated `glimit.handler` and renamed it to the more descriptive `glimit.on_limit_exceeded`. 12 + - Refactored the code to use a Token Bucket algorithm instead of a Sliding Window algorithm. This has removed some of the library features/API, such as `glimit.applyX` to apply a rate limiter on a function with multiple arguments. 13 + 14 + ### Added 15 + 16 + - Added a `burst_limit` setting to the limiter configuration. This setting allows the user to set the maximum number of tokens that the bucket can hold. 13 17 14 18 15 19 ## v0.1.3 - 2024-09-04
+13 -7
README.md
··· 4 4 [![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/glimit/) 5 5 [![test](https://github.com/nootr/glimit/actions/workflows/test.yml/badge.svg)](https://github.com/nootr/glimit/actions/workflows/test.yml) 6 6 7 - A framework-agnostic rate limiter for Gleam. 💫 7 + A simple, framework-agnostic, in-memory rate limiter for Gleam. 💫 8 8 9 9 > ⚠️ This library is still in development, use at your own risk. 10 + 11 + 12 + ## Features 13 + 14 + * ✨ Simple and easy to use. 15 + * 📏 Rate limits based on any key (e.g. IP address, or user ID). 16 + * 🪣 Uses a distributed Token Bucket algorithm to rate limit requests. 17 + * 🗄️ No back-end service needed; stores rate limit stats in-memory. 10 18 11 19 12 20 ## Usage ··· 73 81 let limiter = 74 82 glimit.new() 75 83 |> glimit.per_second(10) 76 - |> glimit.per_minute(100) 77 - |> glimit.per_hour(1000) 78 84 |> glimit.identifier(get_identifier) 79 85 |> glimit.on_limit_exceeded(rate_limit_reached) 80 86 |> glimit.build ··· 91 97 ``` 92 98 93 99 94 - ## How it works 100 + ## Constraints 95 101 96 - Once v1.0 is reached, `glimit` will use a distributed Token Bucket algorithm to rate limit requests. It will support multiple backend storage systems, such as Redis and in-memory storage. 102 + While the in-memory rate limiter is simple and easy to use, it does have an important constraint: it is scoped to the BEAM VM cluster it runs in. This means that if your application is running across multiple BEAM VM clusters, the rate limiter will not be shared between them. 97 103 98 - However, at the moment, `glimit` uses a simple Sliding Window algorithm with in-memory storage. This means that the rate limiter is not memory efficient and is not ready for production use. 104 + There are plans to add support for a centralized data store using Redis in the future. 99 105 100 106 101 107 ## Documentation 102 108 103 - Further documentation can be found at <https://hexdocs.pm/glimit>. 109 + Further documentation can be found at <https://hexdocs.pm/glimit/glimit.html>. 104 110 105 111 106 112 ## Contributing
+4 -1
gleam.toml
··· 1 1 name = "glimit" 2 2 version = "0.1.3" 3 3 4 - description = "A framework-agnostic rate limiter for Gleam." 4 + description = "A simple, framework-agnostic, in-memory rate limiter for Gleam." 5 5 licences = ["MIT"] 6 6 repository = { type = "github", user = "nootr", repo = "glimit" } 7 7 target = "erlang" 8 + internal_modules = [ 9 + "glimit/*", 10 + ] 8 11 9 12 [dependencies] 10 13 gleam_stdlib = ">= 0.34.0 and < 2.0.0"
+52 -126
src/glimit.gleam
··· 1 - //// A framework-agnostic rate limiter for Gleam. 💫 1 + //// This module provides a distributed rate limiter that can be used to limit the 2 + //// number of requests or function calls per second for a given identifier. 2 3 //// 3 - //// This module provides a rate limiter that can be used to limit the number of 4 - //// requests that can be made to a given function or handler within a given 5 - //// time frame. 4 + //// A single actor is used to assign one rate limiter actor per identifier. The 5 + //// rate limiter actor then uses a Token Bucket algorithm to determine if a 6 + //// request or function call should be allowed to proceed. A separate process is 7 + //// polling the rate limiters to remove full buckets to reduce unnecessary memory 8 + //// usage. 9 + //// 10 + //// The rate limits are configured using the following two options: 6 11 //// 7 - //// The rate limiter is implemented as an actor that keeps track of the number 8 - //// of hits for a given identifier within the last second, minute, and hour. 9 - //// When a hit is received, the actor checks the rate limits and either allows 10 - //// the hit to pass or rejects it. 12 + //// - `per_second`: The rate of new available tokens per second. Think of this 13 + //// as the steady state rate limit. 14 + //// - `burst_limit`: The maximum number of available tokens. Think of this as 15 + //// the burst rate limit. The default value is the `per_second` rate limit. 11 16 //// 12 - //// The rate limiter can be configured with rate limits per second, minute, and 13 - //// hour, and a handler function that is called when the rate limit is reached. 14 17 //// The rate limiter can be applied to a function or handler using the `apply` 15 18 //// function, which returns a new function that checks the rate limit before 16 19 //// calling the original function. ··· 23 26 //// let limiter = 24 27 //// glimit.new() 25 28 //// |> glimit.per_second(10) 26 - //// |> glimit.per_minute(100) 27 - //// |> glimit.per_hour(1000) 28 29 //// |> glimit.identifier(fn(request) { request.ip }) 29 30 //// |> glimit.on_limit_exceeded(fn(_request) { "Rate limit reached" }) 30 31 //// |> glimit.build() ··· 35 36 //// ``` 36 37 //// 37 38 38 - import gleam/erlang/process.{type Subject} 39 39 import gleam/option.{type Option, None, Some} 40 40 import gleam/result 41 - import glimit/actor 41 + import glimit/rate_limiter 42 + import glimit/registry.{type RateLimiterRegistryActor} 42 43 43 44 /// A rate limiter. 44 45 /// 45 46 pub type RateLimiter(a, b, id) { 46 47 RateLimiter( 47 - subject: Subject(actor.Message(id)), 48 + rate_limiter_registry: RateLimiterRegistryActor(id), 48 49 on_limit_exceeded: fn(a) -> b, 49 50 identifier: fn(a) -> id, 50 51 ) ··· 55 56 pub type RateLimiterBuilder(a, b, id) { 56 57 RateLimiterBuilder( 57 58 per_second: Option(Int), 58 - per_minute: Option(Int), 59 - per_hour: Option(Int), 59 + burst_limit: Option(Int), 60 60 identifier: Option(fn(a) -> id), 61 61 on_limit_exceeded: Option(fn(a) -> b), 62 62 ) ··· 67 67 pub fn new() -> RateLimiterBuilder(a, b, id) { 68 68 RateLimiterBuilder( 69 69 per_second: None, 70 - per_minute: None, 71 - per_hour: None, 70 + burst_limit: None, 72 71 identifier: None, 73 72 on_limit_exceeded: None, 74 73 ) ··· 76 75 77 76 /// Set the rate limit per second. 78 77 /// 78 + /// The value is not only used for the rate at which tokens are added to the bucket, but 79 + /// also for the maximum number of available tokens. To set a different value fo the 80 + /// maximum number of available tokens, use the `burst_limit` function. 81 + /// 79 82 pub fn per_second( 80 83 limiter: RateLimiterBuilder(a, b, id), 81 84 limit: Int, ··· 83 86 RateLimiterBuilder(..limiter, per_second: Some(limit)) 84 87 } 85 88 86 - /// Set the rate limit per minute. 89 + /// Set the maximum number of available tokens. 87 90 /// 88 - pub fn per_minute( 89 - limiter: RateLimiterBuilder(a, b, id), 90 - limit: Int, 91 - ) -> RateLimiterBuilder(a, b, id) { 92 - RateLimiterBuilder(..limiter, per_minute: Some(limit)) 93 - } 94 - 95 - /// Set the rate limit per hour. 91 + /// The maximum number of available tokens is the maximum number of requests that can be 92 + /// made in a single second. The default value is the same as the rate limit per second. 96 93 /// 97 - pub fn per_hour( 94 + pub fn burst_limit( 98 95 limiter: RateLimiterBuilder(a, b, id), 99 - limit: Int, 96 + burst_limit: Int, 100 97 ) -> RateLimiterBuilder(a, b, id) { 101 - RateLimiterBuilder(..limiter, per_hour: Some(limit)) 98 + RateLimiterBuilder(..limiter, burst_limit: Some(burst_limit)) 102 99 } 103 100 104 101 /// Set the handler to be called when the rate limit is reached. 105 102 /// 106 103 pub fn on_limit_exceeded( 107 - limiter: RateLimiterBuilder(a, b, id), 108 - on_limit_exceeded: fn(a) -> b, 109 - ) -> RateLimiterBuilder(a, b, id) { 110 - RateLimiterBuilder(..limiter, on_limit_exceeded: Some(on_limit_exceeded)) 111 - } 112 - 113 - @deprecated("Use `on_limit_exceeded` instead") 114 - pub fn handler( 115 104 limiter: RateLimiterBuilder(a, b, id), 116 105 on_limit_exceeded: fn(a) -> b, 117 106 ) -> RateLimiterBuilder(a, b, id) { ··· 129 118 130 119 /// Build the rate limiter. 131 120 /// 132 - /// Panics if the rate limiter actor cannot be started or if the identifier 133 - /// function or on_limit_exceeded function is missing. 121 + /// Panics if the rate limiter registry cannot be started or if the `identifier` 122 + /// function or `on_limit_exceeded` function is missing. 123 + /// 124 + /// To handle errors instead of panicking, use `try_build`. 134 125 /// 135 126 pub fn build(config: RateLimiterBuilder(a, b, id)) -> RateLimiter(a, b, id) { 136 127 case try_build(config) { ··· 144 135 pub fn try_build( 145 136 config: RateLimiterBuilder(a, b, id), 146 137 ) -> Result(RateLimiter(a, b, id), String) { 147 - use subject <- result.try( 148 - actor.new(config.per_second, config.per_minute, config.per_hour) 149 - |> result.map_error(fn(_) { "Failed to start rate limiter actor" }), 138 + use per_second <- result.try(case config.per_second { 139 + Some(per_second) -> Ok(per_second) 140 + None -> Error("`per_second` rate limit is required") 141 + }) 142 + let burst_limit = case config.burst_limit { 143 + Some(burst_limit) -> burst_limit 144 + None -> per_second 145 + } 146 + use rate_limiter_registry <- result.try( 147 + registry.new(per_second, burst_limit) 148 + |> result.map_error(fn(_) { "Failed to start rate limiter registry" }), 150 149 ) 151 150 use identifier <- result.try(case config.identifier { 152 151 Some(identifier) -> Ok(identifier) ··· 158 157 }) 159 158 160 159 Ok(RateLimiter( 161 - subject: subject, 160 + rate_limiter_registry: rate_limiter_registry, 162 161 on_limit_exceeded: on_limit_exceeded, 163 162 identifier: identifier, 164 163 )) ··· 169 168 pub fn apply(func: fn(a) -> b, limiter: RateLimiter(a, b, id)) -> fn(a) -> b { 170 169 fn(input: a) -> b { 171 170 let identifier = limiter.identifier(input) 172 - case actor.hit(limiter.subject, identifier) { 173 - Ok(Nil) -> func(input) 174 - Error(Nil) -> limiter.on_limit_exceeded(input) 175 - } 176 - } 177 - } 178 - 179 - /// Apply the rate limiter to a request handler or function with two arguments. 180 - /// 181 - /// Note: this function folds the two arguments into a tuple before passing them to the 182 - /// identifier or on_limit_exceeded functions. 183 - /// 184 - /// # Example 185 - /// 186 - /// ```gleam 187 - /// import glimit 188 - /// 189 - /// let limiter = 190 - /// glimit.new() 191 - /// |> glimit.per_hour(1000) 192 - /// |> glimit.identifier(fn(i: #(String, String)) { 193 - /// let #(a, _) = i 194 - /// a 195 - /// }) 196 - /// |> glimit.on_limit_exceeded(fn(_) { "Rate limit reached" }) 197 - /// |> glimit.build() 198 - /// 199 - /// let handler = 200 - /// fn(a, b) { a <> b } 201 - /// |> glimit.apply2(limiter) 202 - /// ``` 203 - pub fn apply2( 204 - func: fn(a, b) -> c, 205 - limiter: RateLimiter(#(a, b), c, id), 206 - ) -> fn(a, b) -> c { 207 - fn(a: a, b: b) -> c { 208 - let identifier = limiter.identifier(#(a, b)) 209 - case actor.hit(limiter.subject, identifier) { 210 - Ok(Nil) -> func(a, b) 211 - Error(Nil) -> limiter.on_limit_exceeded(#(a, b)) 212 - } 213 - } 214 - } 215 - 216 - /// Apply the rate limiter to a request handler or function with three arguments. 217 - /// 218 - /// Note: this function folds the three arguments into a tuple before passing them to the 219 - /// identifier or on_limit_exceeded functions. 220 - /// 221 - pub fn apply3( 222 - func: fn(a, b, c) -> d, 223 - limiter: RateLimiter(#(a, b, c), d, id), 224 - ) -> fn(a, b, c) -> d { 225 - fn(a: a, b: b, c: c) -> d { 226 - let identifier = limiter.identifier(#(a, b, c)) 227 - case actor.hit(limiter.subject, identifier) { 228 - Ok(Nil) -> func(a, b, c) 229 - Error(Nil) -> limiter.on_limit_exceeded(#(a, b, c)) 230 - } 231 - } 232 - } 233 - 234 - /// Apply the rate limiter to a request handler or function with four arguments. 235 - /// 236 - /// Note: this function folds the four arguments into a tuple before passing them to the 237 - /// identifier or on_limit_exceeded functions. 238 - /// 239 - /// > ⚠️ For functions with more than four arguments, you'll need to write a custom 240 - /// > wrapper function that folds the arguments into a tuple before passing them to the 241 - /// > rate limiter. This is because Gleam does not support variadic functions, because 242 - /// > the BEAM VM identifies functions by their arity. 243 - /// 244 - pub fn apply4( 245 - func: fn(a, b, c, d) -> e, 246 - limiter: RateLimiter(#(a, b, c, d), e, id), 247 - ) -> fn(a, b, c, d) -> e { 248 - fn(a: a, b: b, c: c, d: d) -> e { 249 - let identifier = limiter.identifier(#(a, b, c, d)) 250 - case actor.hit(limiter.subject, identifier) { 251 - Ok(Nil) -> func(a, b, c, d) 252 - Error(Nil) -> limiter.on_limit_exceeded(#(a, b, c, d)) 171 + case limiter.rate_limiter_registry |> registry.get_or_create(identifier) { 172 + Ok(rate_limiter) -> { 173 + case rate_limiter |> rate_limiter.hit { 174 + Ok(Nil) -> func(input) 175 + Error(Nil) -> limiter.on_limit_exceeded(input) 176 + } 177 + } 178 + Error(_) -> panic as "Failed to get rate limiter" 253 179 } 254 180 } 255 181 }
-120
src/glimit/actor.gleam
··· 1 - //// The rate limiter actor. 2 - //// 3 - 4 - import gleam/dict 5 - import gleam/erlang/process.{type Subject} 6 - import gleam/list 7 - import gleam/option.{type Option, None, Some} 8 - import gleam/otp/actor 9 - import gleam/result 10 - import glimit/utils 11 - 12 - /// The messages that the actor can receive. 13 - /// 14 - pub type Message(id) { 15 - /// Stop the actor. 16 - Shutdown 17 - 18 - /// Mark a hit for a given identifier. 19 - Hit(identifier: id, reply_with: Subject(Result(Nil, Nil))) 20 - } 21 - 22 - /// The actor state. 23 - /// 24 - type State(a, b, id) { 25 - RateLimiterState( 26 - hit_log: dict.Dict(id, List(Int)), 27 - per_second: Option(Int), 28 - per_minute: Option(Int), 29 - per_hour: Option(Int), 30 - ) 31 - } 32 - 33 - fn handle_message( 34 - message: Message(id), 35 - state: State(a, b, id), 36 - ) -> actor.Next(Message(id), State(a, b, id)) { 37 - case message { 38 - Shutdown -> actor.Stop(process.Normal) 39 - Hit(identifier, client) -> { 40 - // Update hit log 41 - let timestamp = utils.now() 42 - let hits = 43 - state.hit_log 44 - |> dict.get(identifier) 45 - |> result.unwrap([]) 46 - |> list.filter(fn(hit) { hit >= timestamp - 60 * 60 }) 47 - |> list.append([timestamp]) 48 - let hit_log = 49 - state.hit_log 50 - |> dict.insert(identifier, hits) 51 - let state = RateLimiterState(..state, hit_log: hit_log) 52 - 53 - // Check rate limits 54 - // TODO: optimize into a single loop 55 - let hits_last_hour = hits |> list.length() 56 - 57 - let hits_last_minute = 58 - hits 59 - |> list.filter(fn(hit) { hit >= timestamp - 60 }) 60 - |> list.length() 61 - 62 - let hits_last_second = 63 - hits 64 - |> list.filter(fn(hit) { hit >= timestamp - 1 }) 65 - |> list.length() 66 - 67 - let limit_reached = { 68 - case state.per_hour { 69 - Some(limit) -> hits_last_hour > limit 70 - None -> False 71 - } 72 - || case state.per_minute { 73 - Some(limit) -> hits_last_minute > limit 74 - None -> False 75 - } 76 - || case state.per_second { 77 - Some(limit) -> hits_last_second > limit 78 - None -> False 79 - } 80 - } 81 - 82 - case limit_reached { 83 - True -> process.send(client, Error(Nil)) 84 - False -> process.send(client, Ok(Nil)) 85 - } 86 - 87 - actor.continue(state) 88 - } 89 - } 90 - } 91 - 92 - /// Create a new rate limiter actor. 93 - /// 94 - pub fn new( 95 - per_second: Option(Int), 96 - per_minute: Option(Int), 97 - per_hour: Option(Int), 98 - ) -> Result(Subject(Message(id)), Nil) { 99 - let state = 100 - RateLimiterState( 101 - hit_log: dict.new(), 102 - per_second: per_second, 103 - per_minute: per_minute, 104 - per_hour: per_hour, 105 - ) 106 - actor.start(state, handle_message) 107 - |> result.nil_error 108 - } 109 - 110 - /// Log a hit for a given identifier. 111 - /// 112 - pub fn hit(subject: Subject(Message(id)), identifier: id) -> Result(Nil, Nil) { 113 - actor.call(subject, Hit(identifier, _), 10) 114 - } 115 - 116 - /// Stop the actor. 117 - /// 118 - pub fn stop(subject: Subject(Message(id))) { 119 - actor.send(subject, Shutdown) 120 - }
+120
src/glimit/rate_limiter.gleam
··· 1 + //// This module contains the implementation of a single rate limiter actor. 2 + //// 3 + 4 + import gleam/erlang/process.{type Subject} 5 + import gleam/int 6 + import gleam/option.{type Option, None, Some} 7 + import gleam/otp/actor 8 + import gleam/result 9 + import glimit/utils 10 + 11 + type State { 12 + State( 13 + /// The maximum number of tokens. 14 + /// 15 + max_token_count: Int, 16 + /// The rate of token generation per second. 17 + /// 18 + token_rate: Int, 19 + /// The number of tokens available. 20 + /// 21 + token_count: Int, 22 + /// Epoch timestamp of the last time the rate limiter was updated. 23 + /// 24 + last_update: Option(Int), 25 + ) 26 + } 27 + 28 + /// Updates the state to reflect the passage of time. 29 + /// 30 + fn refill_bucket(state: State) -> State { 31 + let now = utils.now() 32 + let time_diff = case state.last_update { 33 + None -> 0 34 + Some(last_update) -> now - last_update 35 + } 36 + let token_count = 37 + state.token_count + state.token_rate * time_diff 38 + |> int.min(state.max_token_count) 39 + |> int.max(0) 40 + 41 + State(..state, token_count: token_count, last_update: Some(now)) 42 + } 43 + 44 + /// Updates the state to remove a token. 45 + /// 46 + fn remove_token(state: State) -> State { 47 + State(..state, token_count: state.token_count - 1) 48 + } 49 + 50 + /// The message type for the rate limiter actor. 51 + /// 52 + pub type Message { 53 + /// Stop the actor. 54 + /// 55 + Shutdown 56 + 57 + /// Mark a hit. 58 + /// 59 + /// The actor will reply with the result of the hit. 60 + /// 61 + Hit(reply_with: Subject(Result(Nil, Nil))) 62 + 63 + /// Returns True if the token bucket is full. 64 + /// 65 + HasFullBucket(reply_with: Subject(Bool)) 66 + } 67 + 68 + fn handle_message(message: Message, state: State) -> actor.Next(Message, State) { 69 + case message { 70 + Shutdown -> actor.Stop(process.Normal) 71 + 72 + Hit(client) -> { 73 + let state = refill_bucket(state) 74 + let #(result, state) = case state.token_count { 75 + 0 -> #(Error(Nil), state) 76 + _ -> #(Ok(Nil), remove_token(state)) 77 + } 78 + 79 + actor.send(client, result) 80 + actor.continue(state) 81 + } 82 + 83 + HasFullBucket(client) -> { 84 + let state = refill_bucket(state) 85 + let result = state.token_count == state.max_token_count 86 + 87 + actor.send(client, result) 88 + actor.continue(state) 89 + } 90 + } 91 + } 92 + 93 + /// Create a new rate limiter actor. 94 + /// 95 + pub fn new( 96 + max_token_count: Int, 97 + token_rate: Int, 98 + ) -> Result(Subject(Message), Nil) { 99 + let state = 100 + State( 101 + max_token_count: max_token_count, 102 + token_rate: token_rate, 103 + token_count: max_token_count, 104 + last_update: None, 105 + ) 106 + actor.start(state, handle_message) 107 + |> result.nil_error 108 + } 109 + 110 + /// Mark a hit on the rate limiter actor. 111 + /// 112 + pub fn hit(rate_limiter: Subject(Message)) -> Result(Nil, Nil) { 113 + actor.call(rate_limiter, Hit, 10) 114 + } 115 + 116 + /// Returns True if the token bucket is full. 117 + /// 118 + pub fn has_full_bucket(rate_limiter: Subject(Message)) -> Bool { 119 + actor.call(rate_limiter, HasFullBucket, 10) 120 + }
+143
src/glimit/registry.gleam
··· 1 + //// This module contains a registry which maps hit identifiers to rate limiter actors. 2 + //// 3 + 4 + import gleam/dict.{type Dict} 5 + import gleam/erlang/process.{type Subject} 6 + import gleam/list 7 + import gleam/otp/actor 8 + import gleam/otp/task 9 + import gleam/result 10 + import glimit/rate_limiter 11 + 12 + pub type RateLimiterRegistryActor(id) = 13 + Subject(Message(id)) 14 + 15 + /// The rate limiter registry state. 16 + type State(id) { 17 + State( 18 + /// The maximum number of tokens. 19 + /// 20 + max_token_count: Int, 21 + /// The rate of token generation per second. 22 + /// 23 + token_rate: Int, 24 + /// The registry of rate limiters. 25 + /// 26 + registry: Dict(id, Subject(rate_limiter.Message)), 27 + ) 28 + } 29 + 30 + pub type Message(id) { 31 + /// Get the rate limiter for the given id or create a new one if missing. 32 + /// 33 + GetOrCreate( 34 + identifier: id, 35 + reply_with: Subject(Result(Subject(rate_limiter.Message), Nil)), 36 + ) 37 + Sweep 38 + } 39 + 40 + fn handle_get_or_create( 41 + identifier, 42 + state: State(id), 43 + ) -> Result(Subject(rate_limiter.Message), Nil) { 44 + case state.registry |> dict.get(identifier) { 45 + Ok(rate_limiter) -> { 46 + Ok(rate_limiter) 47 + } 48 + Error(_) -> { 49 + use rate_limiter <- result.try(rate_limiter.new( 50 + state.max_token_count, 51 + state.token_rate, 52 + )) 53 + Ok(rate_limiter) 54 + } 55 + } 56 + } 57 + 58 + /// Shutdown and remove all rate limiters that are not alive. 59 + /// 60 + fn handle_message( 61 + message: Message(id), 62 + state: State(id), 63 + ) -> actor.Next(Message(id), State(id)) { 64 + case message { 65 + GetOrCreate(identifier, client) -> { 66 + case handle_get_or_create(identifier, state) { 67 + Ok(rate_limiter) -> { 68 + let registry = state.registry |> dict.insert(identifier, rate_limiter) 69 + let state = State(..state, registry: registry) 70 + actor.send(client, Ok(rate_limiter)) 71 + actor.continue(state) 72 + } 73 + Error(_) -> { 74 + actor.send(client, Error(Nil)) 75 + actor.continue(state) 76 + } 77 + } 78 + } 79 + Sweep -> { 80 + let full_buckets = 81 + state.registry 82 + |> dict.to_list 83 + |> list.filter(fn(pair) { 84 + let #(_, rate_limiter) = pair 85 + rate_limiter |> rate_limiter.has_full_bucket 86 + }) 87 + |> list.map(fn(pair) { 88 + let #(identifier, rate_limiter) = pair 89 + actor.send(rate_limiter, rate_limiter.Shutdown) 90 + identifier 91 + }) 92 + 93 + let registry = state.registry |> dict.drop(full_buckets) 94 + 95 + let state = State(..state, registry: registry) 96 + 97 + actor.continue(state) 98 + } 99 + } 100 + } 101 + 102 + /// Create a new rate limiter registry. 103 + /// 104 + pub fn new( 105 + per_second: Int, 106 + burst_limit: Int, 107 + ) -> Result(RateLimiterRegistryActor(id), Nil) { 108 + let state = 109 + State( 110 + max_token_count: burst_limit, 111 + token_rate: per_second, 112 + registry: dict.new(), 113 + ) 114 + use registry <- result.try( 115 + actor.start(state, handle_message) 116 + |> result.nil_error, 117 + ) 118 + 119 + task.async(fn() { sweep_loop(registry) }) 120 + 121 + Ok(registry) 122 + } 123 + 124 + /// Get the rate limiter for the given id or create a new one if missing. 125 + /// 126 + pub fn get_or_create( 127 + registry: RateLimiterRegistryActor(id), 128 + identifier: id, 129 + ) -> Result(Subject(rate_limiter.Message), Nil) { 130 + actor.call(registry, GetOrCreate(identifier, _), 10) 131 + } 132 + 133 + fn sweep_loop(registry: RateLimiterRegistryActor(id)) { 134 + process.sleep(10_000) 135 + sweep(registry) 136 + sweep_loop(registry) 137 + } 138 + 139 + /// Sweep the registry and remove all rate limiters that have a full bucket. 140 + /// 141 + pub fn sweep(registry: RateLimiterRegistryActor(id)) { 142 + actor.send(registry, Sweep) 143 + }
+23
test/glimit_rate_limiter_test.gleam
··· 1 + import gleeunit/should 2 + import glimit/rate_limiter 3 + 4 + // TODO: find a way to mock time so we can test the refilling of the rate limiter. 5 + 6 + pub fn rate_limiter_test() { 7 + let limiter = case rate_limiter.new(2, 2) { 8 + Ok(limiter) -> limiter 9 + Error(_) -> panic as "Should be able to create rate limiter" 10 + } 11 + 12 + limiter 13 + |> rate_limiter.hit 14 + |> should.be_ok 15 + 16 + limiter 17 + |> rate_limiter.hit 18 + |> should.be_ok 19 + 20 + limiter 21 + |> rate_limiter.hit 22 + |> should.be_error 23 + }
+68
test/glimit_registry_test.gleam
··· 1 + import gleeunit/should 2 + import glimit/rate_limiter 3 + import glimit/registry 4 + 5 + pub fn same_id_same_actor_test() { 6 + let registry = case registry.new(2, 2) { 7 + Ok(registry) -> registry 8 + Error(_) -> { 9 + panic as "Should be able to create a new registry" 10 + } 11 + } 12 + 13 + let assert Ok(rate_limiter) = registry |> registry.get_or_create("🚀") 14 + let assert Ok(same_rate_limiter) = registry |> registry.get_or_create("🚀") 15 + 16 + rate_limiter 17 + |> should.equal(same_rate_limiter) 18 + } 19 + 20 + pub fn other_id_other_actor_test() { 21 + let registry = case registry.new(2, 2) { 22 + Ok(registry) -> registry 23 + Error(_) -> { 24 + panic as "Should be able to create a new registry" 25 + } 26 + } 27 + 28 + let assert Ok(rate_limiter) = registry |> registry.get_or_create("🚀") 29 + let assert Ok(same_rate_limiter) = registry |> registry.get_or_create("💫") 30 + 31 + rate_limiter 32 + |> should.not_equal(same_rate_limiter) 33 + } 34 + 35 + pub fn sweep_full_bucket_test() { 36 + let registry = case registry.new(2, 2) { 37 + Ok(registry) -> registry 38 + Error(_) -> { 39 + panic as "Should be able to create a new registry" 40 + } 41 + } 42 + 43 + let assert Ok(rate_limiter) = registry |> registry.get_or_create("🚀") 44 + registry |> registry.sweep 45 + let assert Ok(new_rate_limiter) = registry |> registry.get_or_create("🚀") 46 + 47 + rate_limiter 48 + |> should.not_equal(new_rate_limiter) 49 + } 50 + 51 + pub fn sweep_not_full_bucket_test() { 52 + let registry = case registry.new(2, 2) { 53 + Ok(registry) -> registry 54 + Error(_) -> { 55 + panic as "Should be able to create a new registry" 56 + } 57 + } 58 + 59 + let assert Ok(rate_limiter) = registry |> registry.get_or_create("🚀") 60 + 61 + let _ = rate_limiter |> rate_limiter.hit 62 + registry |> registry.sweep 63 + 64 + let assert Ok(new_rate_limiter) = registry |> registry.get_or_create("🚀") 65 + 66 + rate_limiter 67 + |> should.equal(new_rate_limiter) 68 + }
+16 -104
test/glimit_test.gleam
··· 1 + import gleam/erlang/process 1 2 import gleeunit 2 3 import gleeunit/should 3 4 import glimit ··· 24 25 func(Nil) |> should.equal("Stop!") 25 26 } 26 27 27 - pub fn single_argument_function_per_minute_test() { 28 - let limiter = 29 - glimit.new() 30 - |> glimit.per_minute(2) 31 - |> glimit.identifier(fn(_) { "id" }) 32 - |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) 33 - |> glimit.build 34 - 35 - let func = 36 - fn(_) { "OK" } 37 - |> glimit.apply(limiter) 38 - 39 - func(Nil) |> should.equal("OK") 40 - func(Nil) |> should.equal("OK") 41 - func(Nil) |> should.equal("Stop!") 42 - func(Nil) |> should.equal("Stop!") 43 - } 44 - 45 - pub fn single_argument_function_per_hour_test() { 46 - let limiter = 47 - glimit.new() 48 - |> glimit.per_hour(2) 49 - |> glimit.identifier(fn(_) { "id" }) 50 - |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) 51 - |> glimit.build 52 - 53 - let func = 54 - fn(_) { "OK" } 55 - |> glimit.apply(limiter) 56 - 57 - func(Nil) |> should.equal("OK") 58 - func(Nil) |> should.equal("OK") 59 - func(Nil) |> should.equal("Stop!") 60 - func(Nil) |> should.equal("Stop!") 61 - } 62 - 63 28 pub fn single_argument_function_different_ids_test() { 64 29 let limiter = 65 30 glimit.new() ··· 80 45 func("🚀") |> should.equal("Stop!") 81 46 } 82 47 83 - pub fn two_arguments_function_test() { 84 - let limiter = 85 - glimit.new() 86 - |> glimit.per_second(2) 87 - |> glimit.identifier(fn(_) { "id" }) 88 - |> glimit.identifier(fn(i: #(String, String)) { 89 - let #(a, _) = i 90 - a 91 - }) 92 - |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) 93 - |> glimit.build 94 - 95 - let func = 96 - fn(x, y) { x <> y } 97 - |> glimit.apply2(limiter) 98 - 99 - func("O", "K") |> should.equal("OK") 100 - func(":", ")") |> should.equal(":)") 101 - func("O", "K") |> should.equal("OK") 102 - func("O", "K") |> should.equal("Stop!") 103 - } 104 - 105 - pub fn three_arguments_function_test() { 106 - let limiter = 107 - glimit.new() 108 - |> glimit.per_second(2) 109 - |> glimit.identifier(fn(_) { "id" }) 110 - |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) 111 - |> glimit.build 112 - 113 - let func = 114 - fn(x, y, z) { x <> y <> z } 115 - |> glimit.apply3(limiter) 116 - 117 - func("O", "K", "!") |> should.equal("OK!") 118 - func("O", "K", "!") |> should.equal("OK!") 119 - func("O", "K", "!") |> should.equal("Stop!") 120 - } 121 - 122 - pub fn four_arguments_function_test() { 48 + pub fn burst_limit_test() { 123 49 let limiter = 124 50 glimit.new() 125 51 |> glimit.per_second(2) 52 + |> glimit.burst_limit(3) 126 53 |> glimit.identifier(fn(_) { "id" }) 127 54 |> glimit.on_limit_exceeded(fn(_) { "Stop!" }) 128 55 |> glimit.build 129 56 130 57 let func = 131 - fn(x, y, z, p) { x <> y <> z <> p } 132 - |> glimit.apply4(limiter) 58 + fn(_) { "OK" } 59 + |> glimit.apply(limiter) 133 60 134 - func("O", "K", "?", "!") |> should.equal("OK?!") 135 - func("O", "K", "?", "!") |> should.equal("OK?!") 136 - func("O", "K", "?", "!") |> should.equal("Stop!") 137 - } 138 - 139 - pub fn try_build_ok_test() { 140 - glimit.new() 141 - |> glimit.per_second(2) 142 - |> glimit.identifier(fn(x) { x }) 143 - |> glimit.on_limit_exceeded(fn(x) { x }) 144 - |> glimit.try_build 145 - |> should.be_ok() 146 - } 61 + func(Nil) |> should.equal("OK") 62 + func(Nil) |> should.equal("OK") 63 + func(Nil) |> should.equal("OK") 64 + func(Nil) |> should.equal("Stop!") 65 + func(Nil) |> should.equal("Stop!") 147 66 148 - pub fn try_build_identifier_missing_test() { 149 - glimit.new() 150 - |> glimit.per_second(2) 151 - |> glimit.on_limit_exceeded(fn(x) { x }) 152 - |> glimit.try_build 153 - |> should.equal(Error("`identifier` function is required")) 154 - } 67 + // TODO: mock time to avoid sleeping 😴 68 + process.sleep(1000) 155 69 156 - pub fn try_build_on_limit_exceeded_missing_test() { 157 - glimit.new() 158 - |> glimit.per_second(2) 159 - |> glimit.identifier(fn(x) { x }) 160 - |> glimit.try_build 161 - |> should.equal(Error("`on_limit_exceeded` function is required")) 70 + func(Nil) |> should.equal("OK") 71 + func(Nil) |> should.equal("OK") 72 + func(Nil) |> should.equal("Stop!") 73 + func(Nil) |> should.equal("Stop!") 162 74 }