this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

:sparkles: queue processor using actor

+66 -36
+54 -32
src/processor.gleam
··· 1 1 import birl 2 - import gleam/dynamic 3 2 import gleam/dynamic/decode 4 3 import gleam/erlang/process 5 4 import gleam/json 5 + import gleam/option 6 6 import gleam/otp/actor 7 + import gleam/otp/supervision 7 8 import gleam/result 8 9 import integrations/default 9 10 import integrations/fallback 10 - import model.{type PaymentRequest, PaymentRequest} 11 + import model.{PaymentRequest} 11 12 import redis 12 13 import valkyrie 13 14 15 + pub type Processor { 16 + Processor( 17 + redis_conn: valkyrie.Connection, 18 + name: option.Option(process.Name(Message)), 19 + ) 20 + } 21 + 14 22 pub type Message { 15 - Process(element: PaymentRequest) 23 + ServerTick 24 + } 25 + 26 + pub fn new(conn: valkyrie.Connection) { 27 + Processor(redis_conn: conn, name: option.None) 28 + } 29 + 30 + pub fn named(processor: Processor, name: process.Name(Message)) { 31 + Processor(..processor, name: option.Some(name)) 16 32 } 17 33 18 - pub fn create_worker_to_read_messages() { 19 - let name = process.new_name("worker_process") 34 + pub fn start(processor: Processor) { 35 + let ac = 36 + processor 37 + |> actor.new 38 + |> actor.on_message(handle_message) 20 39 21 - actor.new(PaymentRequest( 22 - amount: 0.0, 23 - correlation_id: "", 24 - requested_at: birl.now(), 25 - )) 26 - |> actor.named(name) 27 - |> actor.on_message(handle_message) 40 + case processor.name { 41 + option.None -> ac 42 + option.Some(name) -> ac |> actor.named(name) 43 + } 28 44 |> actor.start 29 45 } 30 46 31 - fn handle_message(state: PaymentRequest, message: Message) { 47 + pub fn supervised(processor: Processor) { 48 + supervision.supervisor(fn() { start(processor) }) 49 + } 50 + 51 + fn handle_message(state: Processor, message: Message) { 32 52 echo "inside on message" 33 53 case message { 34 - Process(data) -> { 35 - let body_to_send = data |> default.create_body 54 + ServerTick -> { 55 + case redis.read_queue_payments(state.redis_conn) { 56 + "" -> actor.continue(state) 57 + data -> { 58 + let assert Ok(message) = parse_data_redis(data) 36 59 37 - case default.default_provider_send_request(body_to_send) { 38 - Ok(_) -> actor.continue(data) 39 - Error(_) -> { 40 - echo "failed to make request" 41 - case fallback.fallback_provider_send_request(body_to_send) { 42 - Ok(_) -> actor.continue(data) 60 + let body_to_send = message |> default.create_body 61 + 62 + case default.default_provider_send_request(body_to_send) { 63 + Ok(_) -> actor.continue(state) 43 64 Error(_) -> { 44 65 echo "failed to make request" 45 - actor.continue(data) 66 + case fallback.fallback_provider_send_request(body_to_send) { 67 + Ok(_) -> actor.continue(state) 68 + Error(_) -> { 69 + echo "failed to make request" 70 + actor.continue(state) 71 + } 72 + } 46 73 } 47 74 } 75 + 76 + actor.continue(state) 48 77 } 49 78 } 50 79 } 51 80 } 52 81 } 53 82 54 - pub fn loop_worker(subject: process.Subject(Message), conn: valkyrie.Connection) { 83 + pub fn loop_worker(subject: process.Subject(Message)) { 55 84 echo "inside pool" 56 - case redis.read_queue_payments(conn) { 57 - "" -> Nil 58 - data -> { 59 - let assert Ok(message) = parse_data_redis(data) 60 - process.send(subject, Process(message)) 61 - } 62 - } 63 - 85 + process.send(subject, ServerTick) 64 86 process.sleep(2000) 65 - loop_worker(subject, conn) 87 + loop_worker(subject) 66 88 } 67 89 68 90 fn parse_data_redis(message: String) {
+12 -4
src/rinha_2025.gleam
··· 10 10 pub fn main() -> Nil { 11 11 let #(valkey_pool_name, valkey_pool) = redis.create_supervised_pool() 12 12 let valky = valkyrie.named_connection(valkey_pool_name) 13 - let worker_pool = processor.create_worker_to_read_messages() 13 + 14 + let worker_name = process.new_name("worker_pool") 15 + let worker_pool_supervised = 16 + processor.new(valky) 17 + |> processor.named(worker_name) 18 + |> processor.supervised 14 19 15 20 let ctx = server.Context(valkye_conn: valky) 16 21 ··· 18 23 supervisor.new(supervisor.OneForOne) 19 24 |> supervisor.add(valkey_pool) 20 25 |> supervisor.add(web.create_server_supervised(ctx)) 21 - |> supervisor.add(supervision.worker(fn() { worker_pool })) 26 + |> supervisor.add(worker_pool_supervised) 22 27 |> supervisor.start 23 28 24 - let assert Ok(pool) = worker_pool 25 - processor.loop_worker(pool.data, valky) 29 + process.spawn(fn() { 30 + worker_name 31 + |> process.named_subject 32 + |> processor.loop_worker 33 + }) 26 34 27 35 process.sleep_forever() 28 36 }