Auto-indexing service and GraphQL API for AT Protocol Records
quickslice.slices.network/
atproto
gleam
graphql
1// server/src/database/postgres/executor.gleam
2
3import database/executor.{
4 type DbError, type Executor, type Value, Blob, Bool, ConnectionError,
5 ConstraintError, DecodeError, Float, Int, Null, PostgreSQL, QueryError, Text,
6 Timestamptz,
7}
8import gleam/dynamic/decode
9import gleam/int
10import gleam/list
11import gleam/result
12import gleam/string
13import gleam/time/timestamp
14import pog
15
/// Create an Executor for PostgreSQL from a connection pool.
///
/// Every query issued through the returned executor runs against `pool`.
/// The callbacks handed to `executor.new` are, in order:
///
/// 1. run a statement and return the rows decoded as raw dynamic values
/// 2. run a statement and discard whatever it returns
/// 3. render a positional placeholder: `1` becomes `$1`
/// 4. render a single-key JSON text lookup: `col->>'field'`
/// 5. render a nested JSON text lookup: `col->'a'->'b'->>'c'`
/// 6. render the current-time SQL expression: `NOW()`
///
/// Failures reported by pog are translated with `pog_error_to_db_error`.
pub fn new(pool: pog.Connection) -> Executor {
  executor.new(
    PostgreSQL,
    // Run a statement and hand back its rows without decoding them further.
    fn(sql, params) {
      pog.query(sql)
      |> set_pog_params(params)
      |> pog.returning(decode.dynamic)
      |> pog.execute(pool)
      |> result.map(fn(returned) { returned.rows })
      |> result.map_error(pog_error_to_db_error)
    },
    // Run a statement purely for its effect; the result is replaced by Nil.
    fn(sql, params) {
      pog.query(sql)
      |> set_pog_params(params)
      |> pog.execute(pool)
      |> result.map(fn(_) { Nil })
      |> result.map_error(pog_error_to_db_error)
    },
    fn(index) { "$" <> int.to_string(index) },
    // NOTE(review): `field` and the path segments below are spliced into the
    // SQL text unescaped - confirm callers only pass trusted identifiers.
    fn(column, field) { column <> "->>'" <> field <> "'" },
    fn(column, path) {
      // Measure the path once instead of once per segment.
      let last_index = list.length(path) - 1
      // Every segment but the last uses `->`; the last uses `->>` so the
      // value comes back as text. An empty path yields no segments, so the
      // bare column is returned, and a one-element path is just `->>`.
      let path_parts =
        list.index_map(path, fn(part, i) {
          case i == last_index {
            True -> "->>'" <> part <> "'"
            False -> "->'" <> part <> "'"
          }
        })
      column <> string.join(path_parts, "")
    },
    fn() { "NOW()" },
  )
}
57
/// Attach each value in `params` to `query`, in list order.
///
/// Each `Value` is first converted to the matching pog value and then
/// appended to the query with `pog.parameter`.
fn set_pog_params(query: pog.Query(a), params: List(Value)) -> pog.Query(a) {
  use acc, param <- list.fold(params, query)
  let converted = case param {
    Text(text) -> pog.text(text)
    Int(number) -> pog.int(number)
    Float(number) -> pog.float(number)
    Bool(flag) -> pog.bool(flag)
    Null -> pog.null()
    Blob(bytes) -> pog.bytea(bytes)
    Timestamptz(raw) ->
      // The value arrives as an RFC 3339 string; send a real timestamp when
      // it parses, otherwise pass the original string through as text.
      case timestamp.parse_rfc3339(raw) {
        Ok(parsed) -> pog.timestamp(parsed)
        Error(_) -> pog.text(raw)
      }
  }
  pog.parameter(acc, converted)
}
78
/// Translate a `pog.QueryError` into the executor's `DbError`.
///
/// Constraint violations become `ConstraintError`, undecodable results become
/// `DecodeError`, an unavailable connection becomes `ConnectionError`, and
/// every other failure is reported as a `QueryError` with a readable message.
fn pog_error_to_db_error(err: pog.QueryError) -> DbError {
  case err {
    pog.ConnectionUnavailable ->
      ConnectionError("Database connection unavailable")
    pog.UnexpectedResultType(decode_errors) ->
      DecodeError("Failed to decode result: " <> string.inspect(decode_errors))
    pog.ConstraintViolated(detail_message, constraint_name, _) ->
      ConstraintError(string.concat([detail_message, ": ", constraint_name]))
    pog.QueryTimeout -> QueryError("Query timeout")
    pog.PostgresqlError(sql_code, sql_name, sql_message) ->
      QueryError(string.concat(["[", sql_code, " ", sql_name, "] ", sql_message]))
    pog.UnexpectedArgumentCount(wanted, received) ->
      QueryError(
        string.concat([
          "Expected ",
          int.to_string(wanted),
          " arguments, got ",
          int.to_string(received),
        ]),
      )
    pog.UnexpectedArgumentType(wanted, received) ->
      QueryError(
        string.concat(["Expected argument type ", wanted, ", got ", received]),
      )
  }
}
101}