this repo has no description
at main 4.0 kB view raw
//// Payment processor actor.
////
//// The actor reacts to two messages: `ServerTick`, which pulls one payload
//// from the Redis queue and forwards it to the currently selected provider,
//// and `HealthCheck`, which re-evaluates which provider should be selected.

import gleam/bool
import gleam/erlang/process
import gleam/io
import gleam/option
import gleam/otp/actor
import gleam/otp/supervision
import integrations/provider
import models/payment_request
import redis
import valkyrie

/// Delay, in milliseconds, between two `HealthCheck` messages sent by
/// `loop_healthcheck`.
const healthcheck_interval_ms = 5000

/// Actor state and start-up configuration.
pub type Processor {
  Processor(
    /// Connection used to read from and write to Redis.
    redis_conn: valkyrie.Connection,
    /// Optional registered name for the actor process.
    name: option.Option(process.Name(Message)),
    /// Providers that are probed on every `HealthCheck`.
    providers: List(provider.ProviderConfig),
    /// Provider that payments are currently sent to. An empty `url` means
    /// that no provider has been selected yet (see `new`).
    selected_provider: provider.ProviderConfig,
  )
}

/// Messages understood by the processor actor.
pub type Message {
  /// Process one queued payment, if there is one.
  ServerTick
  /// Probe the configured providers and update the selected one.
  HealthCheck
}

/// Why `integrate_data` could not produce data to save.
type IntegrationError {
  /// The queued payload could not be decoded into a payment request.
  InvalidPayload
  /// The payload was decoded but the provider request did not succeed.
  ProviderFailed
}

/// Build a processor with no name, no providers and a placeholder selected
/// provider (empty `url` and `name`, `min_response_time` of -1).
pub fn new(conn: valkyrie.Connection) -> Processor {
  let no_provider =
    provider.ProviderConfig(url: "", min_response_time: -1, name: "")

  Processor(
    redis_conn: conn,
    name: option.None,
    providers: [],
    selected_provider: no_provider,
  )
}

/// Return a copy of `processor` that will be registered under `name` when
/// started.
pub fn named(processor: Processor, name: process.Name(Message)) -> Processor {
  Processor(..processor, name: option.Some(name))
}

/// Return a copy of `processor` with its provider list replaced.
pub fn providers(
  processor: Processor,
  providers: List(provider.ProviderConfig),
) -> Processor {
  Processor(..processor, providers: providers)
}

/// Return a copy of `processor` with `provider` as the selected provider.
fn selected_provider(
  processor: Processor,
  provider: provider.ProviderConfig,
) -> Processor {
  Processor(..processor, selected_provider: provider)
}

/// Start the actor with `processor` as its initial state, registering it
/// under `processor.name` when one was set with `named`.
pub fn start(processor: Processor) {
  let builder =
    processor
    |> actor.new
    |> actor.on_message(handle_message)

  let builder = case processor.name {
    option.Some(name) -> actor.named(builder, name)
    option.None -> builder
  }

  actor.start(builder)
}

/// Child specification that starts this actor under a supervisor.
///
/// The actor is a plain worker, so it is declared with `supervision.worker`;
/// `supervision.supervisor` is reserved for children that are themselves
/// supervisors.
pub fn supervised(processor: Processor) {
  supervision.worker(fn() { start(processor) })
}

/// Actor message handler.
///
/// `ServerTick` hands the queue work to a separate process so the actor stays
/// responsive, and leaves the state untouched. `HealthCheck` probes the
/// providers inside the actor and stores the chosen one in the state.
fn handle_message(state: Processor, message: Message) {
  case message {
    ServerTick -> {
      process.spawn(fn() { process_next_payment(state) })
      actor.continue(state)
    }

    HealthCheck -> {
      let fastest =
        get_faster_healthcheck(state.providers, state.selected_provider)

      state
      |> selected_provider(fastest)
      |> actor.continue
    }
  }
}

/// Read one payload from the queue and try to integrate it.
///
/// An empty string from `redis.read_queue_payments` is treated as "nothing to
/// do". On success the resulting data is saved; when the provider request
/// fails the raw payload is put back on the queue. A payload that cannot be
/// decoded is logged and dropped: retrying it could never succeed.
fn process_next_payment(state: Processor) -> Nil {
  case redis.read_queue_payments(state.redis_conn) {
    "" -> Nil
    data ->
      case integrate_data(state, data) {
        Ok(saved) -> {
          let _ = redis.save_data(state.redis_conn, saved)
          Nil
        }
        Error(ProviderFailed) -> {
          let _ = redis.enqueue_payments(state.redis_conn, [data])
          Nil
        }
        Error(InvalidPayload) ->
          io.println_error(
            "Dropping payment payload that could not be decoded: " <> data,
          )
      }
  }
}

/// Send `ServerTick` to `subject` forever, sleeping `processor_time`
/// milliseconds between sends. Never returns.
pub fn loop_worker(subject: process.Subject(Message), processor_time: Int) {
  process.send(subject, ServerTick)
  process.sleep(processor_time)
  loop_worker(subject, processor_time)
}

/// Send `HealthCheck` to `subject` forever, sleeping
/// `healthcheck_interval_ms` between sends. Never returns.
pub fn loop_healthcheck(subject: process.Subject(Message)) {
  process.send(subject, HealthCheck)
  process.sleep(healthcheck_interval_ms)
  loop_healthcheck(subject)
}

/// Decode `data`, send it to the selected provider and, on success, return
/// the payment (tagged with the provider's name) converted with
/// `payment_request.to_dict`.
///
/// Decoding failures are returned as `InvalidPayload` instead of being
/// asserted on, so a malformed queue entry cannot crash the calling process.
/// NOTE(review): if `process.spawn` links the worker to the actor, such a
/// crash would also propagate to the actor — confirm against the
/// gleam_erlang version in use.
fn integrate_data(processor: Processor, data: String) {
  case payment_request.from_json_string(data) {
    Error(_) -> Error(InvalidPayload)
    Ok(message) -> {
      let body_to_send = provider.create_body(message)

      case provider.send_request(processor.selected_provider, body_to_send) {
        Ok(_) ->
          message
          |> payment_request.set_provider(processor.selected_provider.name)
          |> payment_request.to_dict
          |> Ok
        Error(_) -> Error(ProviderFailed)
      }
    }
  }
}

/// Walk `providers`, health-checking each one, and return the preferred
/// provider, starting from `acc`.
///
/// A provider whose health check fails is skipped and the scan continues with
/// the remaining providers. A healthy provider replaces `acc` when `acc` is
/// the "nothing selected" placeholder (empty `url`) or when the provider
/// reports a strictly lower `min_response_time` than `acc`; the returned
/// config carries the freshly reported `min_response_time`.
///
/// NOTE(review): `acc` keeps the `min_response_time` recorded in an earlier
/// round, so a selected provider that has since become slower still compares
/// with its old value — confirm whether that is intended.
fn get_faster_healthcheck(
  providers: List(provider.ProviderConfig),
  acc: provider.ProviderConfig,
) -> provider.ProviderConfig {
  case providers {
    [] -> acc
    [candidate, ..rest] ->
      case provider.health_check(candidate) {
        // An unhealthy provider must not end the scan: later providers may
        // still be healthy and faster.
        Error(_) -> get_faster_healthcheck(rest, acc)
        Ok(data) -> {
          let has_selection = acc.url != ""

          use <- bool.lazy_guard(
            when: has_selection
              && acc.min_response_time <= data.min_response_time,
            return: fn() { get_faster_healthcheck(rest, acc) },
          )

          get_faster_healthcheck(
            rest,
            provider.ProviderConfig(
              ..candidate,
              min_response_time: data.min_response_time,
            ),
          )
        }
      }
  }
}