Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 3.3 kB view raw
// server/src/database/postgres/executor.gleam

import database/executor.{
  type DbError, type Executor, type Value, Blob, Bool, ConnectionError,
  ConstraintError, DecodeError, Float, Int, Null, PostgreSQL, QueryError, Text,
  Timestamptz,
}
import gleam/dynamic/decode
import gleam/int
import gleam/list
import gleam/result
import gleam/string
import gleam/time/timestamp
import pog

/// Create an Executor for PostgreSQL from a connection pool.
///
/// The callbacks handed to `executor.new` are, in order:
/// 1. run a query and return its rows as dynamic values,
/// 2. run a statement and discard its result,
/// 3. render the positional placeholder for a parameter index (`$1`, `$2`, ...),
/// 4. render a JSON text extraction of a single field (`column->>'field'`),
/// 5. render a JSON text extraction along a nested path,
/// 6. render the SQL expression for the current time (`NOW()`).
///
/// Every pog failure is translated to a `DbError` via `pog_error_to_db_error`.
///
/// NOTE(review): `field` and path segments are spliced into the SQL text
/// without escaping single quotes. Presumably callers only pass trusted or
/// validated names; confirm before exposing these to user-supplied input.
pub fn new(pool: pog.Connection) -> Executor {
  executor.new(
    PostgreSQL,
    fn(sql, params) {
      pog.query(sql)
      |> set_pog_params(params)
      |> pog.returning(decode.dynamic)
      |> pog.execute(pool)
      |> result.map(fn(returned) { returned.rows })
      |> result.map_error(pog_error_to_db_error)
    },
    fn(sql, params) {
      pog.query(sql)
      |> set_pog_params(params)
      |> pog.execute(pool)
      |> result.map(fn(_) { Nil })
      |> result.map_error(pog_error_to_db_error)
    },
    fn(index) { "$" <> int.to_string(index) },
    fn(column, field) { column <> "->>'" <> field <> "'" },
    json_path_expr,
    fn() { "NOW()" },
  )
}

/// Build a JSON path extraction expression on `column`.
///
/// Every segment except the last is applied with `->`, and the last segment
/// with `->>`. An empty path yields the bare column.
///
/// Walks the path once, instead of recomputing the path length for every
/// segment.
fn json_path_expr(column: String, path: List(String)) -> String {
  case path {
    [] -> column
    [last] -> column <> "->>'" <> last <> "'"
    [segment, ..rest] -> json_path_expr(column <> "->'" <> segment <> "'", rest)
  }
}

/// Set parameters on a pog query, in the order they appear in `params`.
fn set_pog_params(query: pog.Query(a), params: List(Value)) -> pog.Query(a) {
  list.fold(params, query, fn(q, param) {
    pog.parameter(q, to_pog_value(param))
  })
}

/// Convert one executor `Value` into the matching pog parameter value.
fn to_pog_value(param: Value) -> pog.Value {
  case param {
    Text(s) -> pog.text(s)
    Int(i) -> pog.int(i)
    Float(f) -> pog.float(f)
    Bool(b) -> pog.bool(b)
    Null -> pog.null()
    Blob(b) -> pog.bytea(b)
    Timestamptz(s) ->
      // Parse ISO 8601/RFC 3339 string to timestamp
      case timestamp.parse_rfc3339(s) {
        Ok(ts) -> pog.timestamp(ts)
        // Fall back to text if parsing fails (shouldn't happen with valid input)
        Error(_) -> pog.text(s)
      }
  }
}

/// Convert pog.QueryError to our DbError type.
///
/// Constraint violations become `ConstraintError`, result decoding failures
/// become `DecodeError`, an unavailable connection becomes `ConnectionError`,
/// and everything else (server errors, argument mismatches, timeouts) becomes
/// `QueryError`.
fn pog_error_to_db_error(err: pog.QueryError) -> DbError {
  case err {
    pog.ConstraintViolated(message, constraint, _detail) ->
      ConstraintError(message <> ": " <> constraint)
    pog.PostgresqlError(code, name, message) ->
      QueryError("[" <> code <> " " <> name <> "] " <> message)
    pog.UnexpectedArgumentCount(expected, got) ->
      QueryError(
        "Expected "
        <> int.to_string(expected)
        <> " arguments, got "
        <> int.to_string(got),
      )
    pog.UnexpectedArgumentType(expected, got) ->
      QueryError("Expected argument type " <> expected <> ", got " <> got)
    pog.UnexpectedResultType(errors) ->
      DecodeError("Failed to decode result: " <> string.inspect(errors))
    pog.ConnectionUnavailable ->
      ConnectionError("Database connection unavailable")
    pog.QueryTimeout -> QueryError("Query timeout")
  }
}