+3
-2
gleam.toml
+3
-2
gleam.toml
···
1
1
name = "gmysql"
2
-
version = "1.2.0"
2
+
version = "1.3.0"
3
3
4
4
# Fill out these fields if you intend to generate HTML documentation or publish
5
5
# your project to the Hex package manager.
6
6
#
7
-
description = "Gleam mysql client."
7
+
description = "Gleam mysql client and connection pooler."
8
8
licences = ["Apache-2.0"]
9
9
repository = { type = "github", user = "VioletBuse", repo = "gmysql" }
10
10
# links = [{ title = "Website", href = "https://gleam.run" }]
···
16
16
gleam_stdlib = ">= 0.34.0 and < 2.0.0"
17
17
mysql = ">= 1.8.0 and < 2.0.0"
18
18
gleam_erlang = ">= 0.25.0 and < 1.0.0"
19
+
gleam_otp = ">= 0.10.0 and < 1.0.0"
19
20
20
21
[dev-dependencies]
21
22
gleeunit = ">= 1.0.0 and < 2.0.0"
+3
-1
manifest.toml
+3
-1
manifest.toml
···
3
3
4
4
packages = [
5
5
{ name = "gleam_erlang", version = "0.25.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "054D571A7092D2A9727B3E5D183B7507DAB0DA41556EC9133606F09C15497373" },
6
+
{ name = "gleam_otp", version = "0.10.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "0B04FE915ACECE539B317F9652CAADBBC0F000184D586AAAF2D94C100945D72B" },
6
7
{ name = "gleam_stdlib", version = "0.38.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "663CF11861179AF415A625307447775C09404E752FF99A24E2057C835319F1BE" },
7
8
{ name = "gleeunit", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "72CDC3D3F719478F26C4E2C5FED3E657AC81EC14A47D2D2DEBB8693CA3220C3B" },
8
9
{ name = "mysql", version = "1.8.0", build_tools = ["make", "rebar3", "mix"], requirements = [], otp_app = "mysql", source = "hex", outer_checksum = "D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3" },
9
10
]
10
11
11
12
[requirements]
12
-
gleam_erlang = { version = ">= 0.25.0 and < 1.0.0"}
13
+
gleam_erlang = { version = ">= 0.25.0 and < 1.0.0" }
14
+
gleam_otp = { version = ">= 0.10.0 and < 1.0.0"}
13
15
gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" }
14
16
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
15
17
mysql = { version = ">= 1.8.0 and < 2.0.0" }
+12
-3
src/gmysql.gleam
+12
-3
src/gmysql.gleam
···
1
1
import gleam/dynamic.{type Dynamic}
2
2
import gleam/erlang/charlist.{type Charlist}
3
+
import gleam/erlang/process.{type Pid}
3
4
import gleam/option.{type Option, None, Some}
4
5
5
6
pub type Connection
···
121
122
@external(erlang, "gmysql_ffi", "to_param")
122
123
pub fn to_param(param: a) -> Param
123
124
125
+
@external(erlang, "gmysql_ffi", "to_pid")
126
+
pub fn to_pid(connection: Connection) -> Pid
127
+
128
+
/// Danger, this is primarily for internal use, do not pass in pids that you did not
129
+
/// get from the `to_pid/1` function.
130
+
@external(erlang, "gmysql_ffi", "from_pid")
131
+
pub fn from_pid(connection: Pid) -> Connection
132
+
124
133
@external(erlang, "gmysql_ffi", "query")
125
134
fn query_internal(
126
135
connection: Connection,
···
134
143
on connection: Connection,
135
144
with arguments: List(Param),
136
145
expecting decoder: fn(Dynamic) -> Result(a, List(dynamic.DecodeError)),
137
-
) -> Result(a, Error) {
146
+
) -> Result(List(a), Error) {
138
147
query_with_timeout(sql, connection, arguments, decoder, Infinity)
139
148
}
140
149
···
144
153
with arguments: List(Param),
145
154
expecting decoder: fn(Dynamic) -> Result(a, List(dynamic.DecodeError)),
146
155
until timeout: Timeout,
147
-
) -> Result(a, Error) {
156
+
) -> Result(List(a), Error) {
148
157
case query_internal(connection, sql, arguments, timeout) {
149
158
Error(int) -> Error(int)
150
159
Ok(dyn) ->
151
-
case decoder(dyn) {
160
+
case dynamic.list(decoder)(dyn) {
152
161
Ok(decoded) -> Ok(decoded)
153
162
Error(decode_errors) -> Error(DecodeError(decode_errors))
154
163
}
+290
src/gmysql/pool.gleam
+290
src/gmysql/pool.gleam
···
1
+
import gleam/bool
2
+
import gleam/erlang
3
+
import gleam/erlang/process.{type Subject}
4
+
import gleam/function
5
+
import gleam/iterator
6
+
import gleam/list
7
+
import gleam/option.{type Option, None, Some}
8
+
import gleam/otp/actor.{type Next, Continue, Stop}
9
+
import gleam/otp/intensity_tracker.{type IntensityTracker}
10
+
import gmysql.{type Config, type Connection}
11
+
12
+
pub opaque type Pool {
13
+
Pool(actor: Subject(Message))
14
+
}
15
+
16
+
pub opaque type Message {
17
+
UpdateState(fn(State) -> State)
18
+
Initialize(subject: Subject(Message))
19
+
Shutdown
20
+
Restart(Connection)
21
+
RestartAll
22
+
StartNew
23
+
Checkout(client: Subject(Result(Connection, Nil)))
24
+
Checkin(connection: Connection)
25
+
Tick(Subject(Message))
26
+
}
27
+
28
+
type State {
29
+
State(slots: List(Slot), config: Config, rate_limit: IntensityTracker)
30
+
}
31
+
32
+
type Slot {
33
+
Slot(connection: Option(Connection), checked_out: Bool)
34
+
}
35
+
36
+
pub fn connect(
37
+
config: Config,
38
+
count: Int,
39
+
limit max_connections_per_second: Int,
40
+
) {
41
+
let slots =
42
+
iterator.repeat(Slot(connection: None, checked_out: False))
43
+
|> iterator.take(count)
44
+
|> iterator.to_list
45
+
46
+
let assert Ok(actor) =
47
+
actor.start(
48
+
State(
49
+
slots: slots,
50
+
config: config,
51
+
rate_limit: intensity_tracker.new(
52
+
limit: max_connections_per_second,
53
+
period: 1000,
54
+
),
55
+
),
56
+
handle_message,
57
+
)
58
+
59
+
process.send(actor, Initialize(actor))
60
+
61
+
Pool(actor)
62
+
}
63
+
64
+
pub fn disconnect(pool: Pool) {
65
+
actor.send(pool.actor, Shutdown)
66
+
}
67
+
68
+
pub fn restart_connection(pool: Pool, connection: Connection) {
69
+
actor.send(pool.actor, Restart(connection))
70
+
}
71
+
72
+
pub fn restart_all_connections(pool: Pool) {
73
+
actor.send(pool.actor, RestartAll)
74
+
}
75
+
76
+
pub fn new_connection(pool: Pool) {
77
+
actor.send(pool.actor, StartNew)
78
+
}
79
+
80
+
pub fn checkout_connection(pool: Pool, timeout: Int) {
81
+
actor.call(pool.actor, Checkout, timeout)
82
+
}
83
+
84
+
pub fn checkin_connection(pool: Pool, connection: Connection) {
85
+
actor.send(pool.actor, Checkin(connection))
86
+
}
87
+
88
+
pub type WithConnectionError {
89
+
CouldNotCheckout
90
+
}
91
+
92
+
pub fn with_connection(
93
+
pool: Pool,
94
+
wait checkout_timeout: Int,
95
+
with function: fn(Connection) -> a,
96
+
) -> Result(a, WithConnectionError) {
97
+
let until = erlang.system_time(erlang.Millisecond) + checkout_timeout
98
+
with_connection_loop(pool, until, checkout_timeout, function)
99
+
}
100
+
101
+
fn with_connection_loop(
102
+
pool: Pool,
103
+
until: Int,
104
+
timeout: Int,
105
+
function: fn(Connection) -> a,
106
+
) -> Result(a, WithConnectionError) {
107
+
use <- bool.guard(
108
+
when: erlang.system_time(erlang.Millisecond) > until,
109
+
return: Error(CouldNotCheckout),
110
+
)
111
+
112
+
case checkout_connection(pool, timeout / 3) {
113
+
Ok(connection) -> {
114
+
let result = function(connection)
115
+
checkin_connection(pool, connection)
116
+
117
+
Ok(result)
118
+
}
119
+
Error(_) -> {
120
+
process.sleep(timeout / 6)
121
+
with_connection_loop(pool, until, timeout, function)
122
+
}
123
+
}
124
+
}
125
+
126
+
fn handle_message(message: Message, state: State) -> Next(Message, State) {
127
+
case message {
128
+
UpdateState(..) -> handle_update_state(message, state)
129
+
Initialize(..) -> handle_init(message, state)
130
+
Shutdown -> handle_shutdown(message, state)
131
+
Restart(..) -> handle_restart(message, state)
132
+
RestartAll -> handle_restart_all(message, state)
133
+
StartNew -> handle_start_new(message, state)
134
+
Checkout(..) -> handle_checkout(message, state)
135
+
Checkin(..) -> handle_checkin(message, state)
136
+
Tick(..) -> handle_tick(message, state)
137
+
}
138
+
}
139
+
140
+
fn handle_update_state(message: Message, state: State) -> Next(Message, State) {
141
+
let assert UpdateState(transform) = message
142
+
let new_state = transform(state)
143
+
144
+
Continue(new_state, None)
145
+
}
146
+
147
+
fn handle_crashed_connection(exit_message: process.ExitMessage) -> Message {
148
+
let process.ExitMessage(pid, reason) = exit_message
149
+
150
+
let connection = gmysql.from_pid(pid)
151
+
case reason {
152
+
process.Normal ->
153
+
UpdateState(fn(state) {
154
+
State(
155
+
..state,
156
+
slots: list.filter(state.slots, fn(slot) {
157
+
slot.connection == Some(connection)
158
+
}),
159
+
)
160
+
})
161
+
_ ->
162
+
UpdateState(fn(state) {
163
+
State(
164
+
..state,
165
+
slots: list.map(state.slots, fn(slot) {
166
+
case slot.connection == Some(connection) {
167
+
True -> Slot(..slot, connection: None)
168
+
False -> slot
169
+
}
170
+
}),
171
+
)
172
+
})
173
+
}
174
+
}
175
+
176
+
fn handle_init(message: Message, state: State) -> Next(Message, State) {
177
+
let assert Initialize(self) = message
178
+
process.trap_exits(True)
179
+
180
+
let selector =
181
+
process.new_selector()
182
+
|> process.selecting(self, function.identity)
183
+
|> process.selecting_trapped_exits(handle_crashed_connection)
184
+
185
+
process.send(self, Tick(self))
186
+
187
+
Continue(state, Some(selector))
188
+
}
189
+
190
+
fn handle_shutdown(_message: Message, state: State) -> Next(Message, State) {
191
+
list.each(state.slots, fn(slot) {
192
+
case slot.connection {
193
+
None -> Nil
194
+
Some(conn) -> gmysql.disconnect(conn)
195
+
}
196
+
})
197
+
198
+
Stop(process.Normal)
199
+
}
200
+
201
+
fn handle_restart(message: Message, state: State) -> Next(Message, State) {
202
+
let assert Restart(conn) = message
203
+
let pid = gmysql.to_pid(conn)
204
+
process.kill(pid)
205
+
206
+
actor.continue(state)
207
+
}
208
+
209
+
fn handle_restart_all(_message: Message, state: State) -> Next(Message, State) {
210
+
list.each(state.slots, fn(slot) {
211
+
case slot.connection {
212
+
None -> Nil
213
+
Some(conn) -> gmysql.to_pid(conn) |> process.kill
214
+
}
215
+
})
216
+
217
+
actor.continue(state)
218
+
}
219
+
220
+
fn handle_start_new(_message: Message, state: State) -> Next(Message, State) {
221
+
actor.continue(
222
+
State(
223
+
..state,
224
+
slots: [Slot(connection: None, checked_out: False), ..state.slots],
225
+
),
226
+
)
227
+
}
228
+
229
+
fn handle_checkout(message: Message, state: State) -> Next(Message, State) {
230
+
let assert Checkout(client) = message
231
+
232
+
let #(connection, slots) =
233
+
list.map_fold(state.slots, None, fn(connection, slot) {
234
+
case connection, slot {
235
+
Some(..), _ -> #(connection, slot)
236
+
None, Slot(connection: Some(conn), checked_out: False) -> #(
237
+
Some(conn),
238
+
Slot(..slot, checked_out: True),
239
+
)
240
+
_, _ -> #(connection, slot)
241
+
}
242
+
})
243
+
244
+
connection |> option.to_result(Nil) |> process.send(client, _)
245
+
246
+
actor.continue(State(..state, slots: slots))
247
+
}
248
+
249
+
fn handle_checkin(message: Message, state: State) -> Next(Message, State) {
250
+
let assert Checkin(connection) = message
251
+
252
+
actor.continue(
253
+
State(
254
+
..state,
255
+
slots: list.map(state.slots, fn(slot) {
256
+
case slot.connection == Some(connection) {
257
+
True -> Slot(..slot, checked_out: False)
258
+
False -> slot
259
+
}
260
+
}),
261
+
),
262
+
)
263
+
}
264
+
265
+
fn handle_tick(message: Message, state: State) -> Next(Message, State) {
266
+
let assert Tick(self) = message
267
+
268
+
let #(rate_limiter, slots) =
269
+
list.map_fold(state.slots, state.rate_limit, fn(rate_limit, slot) {
270
+
case intensity_tracker.add_event(rate_limit), slot {
271
+
Ok(limiter), Slot(connection: None, ..) ->
272
+
case gmysql.connect(state.config) {
273
+
Ok(connection) -> #(
274
+
limiter,
275
+
Slot(connection: Some(connection), checked_out: False),
276
+
)
277
+
Error(_) -> #(limiter, slot)
278
+
}
279
+
_, Slot(connection: None, ..) -> #(
280
+
rate_limit,
281
+
Slot(..slot, checked_out: False),
282
+
)
283
+
_, slot -> #(rate_limit, slot)
284
+
}
285
+
})
286
+
287
+
process.send_after(self, 250, Tick(self))
288
+
289
+
actor.continue(State(..state, slots: slots, rate_limit: rate_limiter))
290
+
}
+8
-2
src/gmysql_ffi.erl
+8
-2
src/gmysql_ffi.erl
···
2
2
3
3
-export([connect/1, exec/3, to_param/1, query/4,
4
4
with_connection/2, with_transaction/3, close/1,
5
-
from_timeout/1]).
5
+
from_timeout/1, to_pid/1, from_pid/1]).
6
6
7
7
connect(ConnectOpts) ->
8
8
try
···
24
24
to_param(Param) ->
25
25
Param.
26
26
27
+
to_pid(Connection) ->
28
+
Connection.
29
+
30
+
from_pid(Pid) ->
31
+
Pid.
32
+
27
33
from_timeout(Timeout) ->
28
34
case Timeout of
29
35
infinity -> infinity;
···
34
40
case mysql:query(Connection, Query, Params, from_timeout(Timeout)) of
35
41
ok -> {ok, []};
36
42
{ok, ok} -> {ok, []};
37
-
{ok, ColNameList, Rows} -> {ok, Rows};
43
+
{ok, _, Rows} -> {ok, Rows};
38
44
{ok, ResultsList} -> {ok, ResultsList};
39
45
{error, {Code, _, Message}} -> {error, {server_error, Code, Message}};
40
46
{error, Any} -> {error, {unknown_error, Any}}