this repo has no description
1import gleam/bool
2import gleam/erlang/process
3import gleam/option
4import gleam/otp/actor
5import gleam/otp/supervision
6import integrations/provider
7import models/payment_request
8import redis
9import valkyrie
10
/// State carried by the payment processor actor.
pub type Processor {
  Processor(
    // Connection handed to the `redis` helpers (queue reads, re-enqueue, save).
    redis_conn: valkyrie.Connection,
    // When `Some`, the actor is registered under this name in `start`.
    name: option.Option(process.Name(Message)),
    // Providers probed each time a `HealthCheck` message is handled.
    providers: List(provider.ProviderConfig),
    // Provider that `integrate_data` sends payment requests to.
    selected_provider: provider.ProviderConfig,
  )
}
19
/// Messages understood by the processor actor.
pub type Message {
  // Handle one entry from the payment queue.
  ServerTick
  // Re-evaluate which provider should be the selected one.
  HealthCheck
}
24
/// Builds a processor around `conn` with no name, no providers and a
/// placeholder selected provider (empty `url` and `name`,
/// `min_response_time` of -1).
///
/// The empty `url` is the marker `get_faster_healthcheck` tests to detect
/// that no provider has been chosen yet.
pub fn new(conn: valkyrie.Connection) -> Processor {
  let placeholder =
    provider.ProviderConfig(name: "", url: "", min_response_time: -1)

  Processor(
    redis_conn: conn,
    name: option.None,
    providers: [],
    selected_provider: placeholder,
  )
}
37
/// Returns a copy of `processor` whose actor will be registered under `name`
/// when started.
pub fn named(processor: Processor, name: process.Name(Message)) -> Processor {
  Processor(
    redis_conn: processor.redis_conn,
    name: option.Some(name),
    providers: processor.providers,
    selected_provider: processor.selected_provider,
  )
}
41
/// Returns a copy of `processor` with its provider list replaced by
/// `providers`.
pub fn providers(
  processor: Processor,
  providers: List(provider.ProviderConfig),
) -> Processor {
  Processor(
    redis_conn: processor.redis_conn,
    name: processor.name,
    providers: providers,
    selected_provider: processor.selected_provider,
  )
}
48
/// Returns a copy of `processor` with `provider` stored as the selected
/// provider.
fn selected_provider(
  processor: Processor,
  provider: provider.ProviderConfig,
) -> Processor {
  Processor(
    redis_conn: processor.redis_conn,
    name: processor.name,
    providers: processor.providers,
    selected_provider: provider,
  )
}
55
/// Starts the processor actor with `processor` as its initial state and
/// `handle_message` as its message handler.
///
/// When `processor.name` is `Some`, the actor is registered under that name
/// before starting. Returns whatever `actor.start` returns.
pub fn start(processor: Processor) {
  let builder = actor.on_message(actor.new(processor), handle_message)

  let builder = case processor.name {
    option.Some(name) -> actor.named(builder, name)
    option.None -> builder
  }

  actor.start(builder)
}
68
/// Child specification for running the processor under a supervisor.
///
/// The processor is an actor, not a supervisor, so it is declared with
/// `supervision.worker`; `supervision.supervisor` is meant for children
/// that are supervisors themselves.
pub fn supervised(processor: Processor) {
  supervision.worker(fn() { start(processor) })
}
72
/// Message handler for the processor actor.
///
/// * `ServerTick` - starts a separate process that handles one queued
///   payment, then continues straight away with the state unchanged.
/// * `HealthCheck` - probes the configured providers inside the actor and
///   continues with the winner stored as `selected_provider`.
fn handle_message(state: Processor, message: Message) {
  case message {
    ServerTick -> {
      // Unlinked on purpose: `integrate_data` uses `let assert` on the
      // decoded payload, and a crash in a linked worker would propagate to
      // this actor and discard its selected provider.
      process.spawn_unlinked(fn() { process_next_payment(state) })

      actor.continue(state)
    }
    HealthCheck -> {
      let fastest =
        get_faster_healthcheck(state.providers, state.selected_provider)

      actor.continue(selected_provider(state, fastest))
    }
  }
}

/// Reads one payload from the payment queue and sends it to the selected
/// provider: on success the result is saved, on failure the payload is put
/// back on the queue. An empty string from the queue means nothing to do.
fn process_next_payment(state: Processor) -> Nil {
  case redis.read_queue_payments(state.redis_conn) {
    "" -> Nil
    data -> {
      // Best effort: the outcome of the save / re-enqueue is not inspected.
      let _ = case integrate_data(state, data) {
        Ok(message) -> redis.save_data(state.redis_conn, message)
        Error(_) -> redis.enqueue_payments(state.redis_conn, [data])
      }
      Nil
    }
  }
}
105
/// Endless ticker: sends `ServerTick` to `subject`, sleeps for
/// `processor_time` milliseconds, then repeats. Never returns.
pub fn loop_worker(subject: process.Subject(Message), processor_time: Int) {
  subject |> process.send(ServerTick)
  processor_time |> process.sleep
  loop_worker(subject, processor_time)
}
111
/// Endless ticker: sends `HealthCheck` to `subject` every 5000 ms.
/// Never returns.
pub fn loop_healthcheck(subject: process.Subject(Message)) {
  let interval_ms = 5000

  subject |> process.send(HealthCheck)
  process.sleep(interval_ms)
  loop_healthcheck(subject)
}
117
/// Decodes the queued JSON payload `data`, sends it to the processor's
/// selected provider and, on success, returns the payment (tagged with that
/// provider's name) converted with `payment_request.to_dict`.
///
/// Returns `Error(Nil)` when the provider request fails. Crashes the calling
/// process when `data` cannot be decoded.
fn integrate_data(processor: Processor, data: String) {
  let assert Ok(request) = payment_request.from_json_string(data)
  let target = processor.selected_provider

  case provider.send_request(target, provider.create_body(request)) {
    Error(_) -> Error(Nil)
    Ok(_) ->
      Ok(
        payment_request.to_dict(payment_request.set_provider(
          request,
          target.name,
        )),
      )
  }
}
133
/// Walks `providers`, health-checking each one, and returns the provider to
/// use from now on.
///
/// `acc` is the best choice so far. A healthy provider replaces it when `acc`
/// is still the placeholder (empty `url`) or when the provider reports a
/// strictly lower `min_response_time`. A provider whose health check fails is
/// skipped, and the remaining providers are still probed.
///
/// NOTE(review): `acc` keeps the `min_response_time` it was stored with; it
/// is only replaced, never refreshed, so a previously selected provider is
/// compared using that older figure. Confirm this is intended.
fn get_faster_healthcheck(
  providers: List(provider.ProviderConfig),
  acc: provider.ProviderConfig,
) -> provider.ProviderConfig {
  case providers {
    [] -> acc
    [candidate, ..rest] ->
      case provider.health_check(candidate) {
        // Previously this returned `acc` immediately, so one failing provider
        // stopped every provider after it from being checked.
        Error(_) -> get_faster_healthcheck(rest, acc)
        Ok(data) -> {
          let keep_current =
            acc.url != "" && acc.min_response_time <= data.min_response_time

          use <- bool.lazy_guard(when: keep_current, return: fn() {
            get_faster_healthcheck(rest, acc)
          })

          get_faster_healthcheck(
            rest,
            provider.ProviderConfig(
              ..candidate,
              min_response_time: data.min_response_time,
            ),
          )
        }
      }
  }
}
169}