Auto-indexing service and GraphQL API for AT Protocol Records
at validation 188 lines 5.2 kB view raw
// server/src/database/postgres/connection.gleam

import database/executor.{type DbError, type Executor, ConnectionError}
import database/postgres/executor as postgres_executor
import gleam/erlang/process
import gleam/int
import gleam/list
import gleam/option.{None, Some}
import gleam/otp/actor
import gleam/result
import gleam/string
import gleam/uri
import logging
import pog

/// Default connection pool size
const default_pool_size = 10

/// Default idle interval in milliseconds (how often to ping idle connections)
/// Set to 30 seconds to reduce noise from connection health checks
const default_idle_interval = 30_000

/// Connect to a PostgreSQL database and return an Executor.
///
/// `url` is parsed by `parse_url`; any parse failure is returned unchanged.
/// On success a pog pool is started under a freshly generated process name
/// and wrapped in a postgres executor. Every pool start failure is mapped to
/// a `ConnectionError` with a fixed message.
pub fn connect(url: String) -> Result(Executor, DbError) {
  use config <- result.try(parse_url(url))

  // Generate a unique pool name
  let pool_name = process.new_name("pog_pool")

  let pog_config =
    pog.default_config(pool_name: pool_name)
    |> pog.host(config.host)
    |> pog.port(config.port)
    |> pog.database(config.database)
    |> pog.user(config.user)
    |> apply_password(config.password)
    |> pog.pool_size(config.pool_size)
    |> pog.idle_interval(config.idle_interval)
    |> pog.ssl(case config.ssl {
      True -> pog.SslUnverified
      False -> pog.SslDisabled
    })

  case pog.start(pog_config) {
    Ok(started) -> {
      // Only host and database are logged; credentials never reach the log.
      logging.log(
        logging.Info,
        "Connected to PostgreSQL database: "
          <> config.host
          <> "/"
          <> config.database,
      )
      Ok(postgres_executor.new(started.data))
    }
    Error(actor.InitTimeout) ->
      Error(ConnectionError("PostgreSQL connection timeout"))
    Error(actor.InitFailed(_reason)) ->
      Error(ConnectionError("Failed to connect to PostgreSQL"))
    Error(actor.InitExited(_reason)) ->
      Error(ConnectionError("PostgreSQL connection process exited"))
  }
}

/// Apply password to config if present; the config is returned untouched
/// when the URL carried no password.
fn apply_password(
  config: pog.Config,
  password: option.Option(String),
) -> pog.Config {
  case password {
    Some(p) -> pog.password(config, Some(p))
    None -> config
  }
}

/// Parsed PostgreSQL connection config
type PgConfig {
  PgConfig(
    /// Server host name (defaults to "localhost")
    host: String,
    /// Server port (defaults to 5432)
    port: Int,
    /// Database name, percent-decoded
    database: String,
    /// User name, percent-decoded (defaults to "postgres")
    user: String,
    /// Password, percent-decoded; None when the URL has none
    password: option.Option(String),
    /// Number of connections in the pool
    pool_size: Int,
    /// Idle ping interval in milliseconds
    idle_interval: Int,
    /// False only when sslmode=disable was given
    ssl: Bool,
  )
}

/// Percent-decode a URL component, returning the input unchanged when it is
/// not valid percent-encoding.
fn decode_component(raw: String) -> String {
  raw
  |> uri.percent_decode
  |> result.unwrap(raw)
}

/// Parse a PostgreSQL URL into connection config
/// Format: postgres://user:password@host:port/database?pool_size=N&idle_interval=N&sslmode=disable
///
/// User, password and database are percent-decoded, so reserved characters
/// written as `%XX` in the URL (for example `%40` for `@` in a password)
/// reach the driver in their literal form.
///
/// Returns a `ConnectionError` when the URL cannot be parsed or names no
/// database.
fn parse_url(url: String) -> Result(PgConfig, DbError) {
  case uri.parse(url) {
    Ok(parsed) -> {
      let host = option.unwrap(parsed.host, "localhost")
      let port = option.unwrap(parsed.port, 5432)

      // Drop the leading slash of the path; what remains is the database.
      let database =
        decode_component(case parsed.path {
          "/" <> db -> db
          db -> db
        })

      // Parse userinfo (user:password). The split happens before decoding so
      // an encoded colon (%3A) inside either part is not taken as separator.
      let #(user, password) = case parsed.userinfo {
        Some(userinfo) ->
          case string.split_once(userinfo, ":") {
            Ok(#(u, p)) -> #(decode_component(u), Some(decode_component(p)))
            Error(_) -> #(decode_component(userinfo), None)
          }
        None -> #("postgres", None)
      }

      // Parse query params for pool_size, idle_interval, and sslmode.
      // A missing query is treated as an empty one: each parser then finds
      // no matching key and yields its default.
      let query = option.unwrap(parsed.query, "")
      let pool_size = parse_pool_size(query)
      let idle_interval = parse_idle_interval(query)
      let ssl = parse_ssl_mode(query)

      case database {
        "" -> Error(ConnectionError("No database specified in PostgreSQL URL"))
        _ ->
          Ok(PgConfig(
            host: host,
            port: port,
            database: database,
            user: user,
            password: password,
            pool_size: pool_size,
            idle_interval: idle_interval,
            ssl: ssl,
          ))
      }
    }
    // The URL is deliberately left out of the message: it may contain the
    // password, and this error is likely to end up in logs.
    Error(_) -> Error(ConnectionError("Invalid PostgreSQL URL"))
  }
}

/// Find the first `key=value` pair in `query` whose value is a strictly
/// positive integer and return it; otherwise return `default`.
///
/// Pairs with the right key but an unparsable, zero or negative value are
/// skipped, so a later valid occurrence of the same key can still win.
fn parse_positive_int_param(query: String, key: String, default: Int) -> Int {
  query
  |> string.split("&")
  |> list.find_map(fn(param) {
    case string.split_once(param, "=") {
      Ok(#(name, value)) if name == key ->
        case int.parse(value) {
          Ok(n) if n > 0 -> Ok(n)
          _ -> Error(Nil)
        }
      _ -> Error(Nil)
    }
  })
  |> result.unwrap(default)
}

/// Parse pool_size from query string
/// Falls back to `default_pool_size` when absent, non-numeric, or not
/// strictly positive.
fn parse_pool_size(query: String) -> Int {
  parse_positive_int_param(query, "pool_size", default_pool_size)
}

/// Parse idle_interval from query string (in milliseconds)
/// How often the pool pings idle connections to check if they're still alive
/// Falls back to `default_idle_interval` when absent, non-numeric, or not
/// strictly positive.
fn parse_idle_interval(query: String) -> Int {
  parse_positive_int_param(query, "idle_interval", default_idle_interval)
}

/// Parse sslmode from query string
/// Returns True (SSL enabled) unless sslmode=disable is specified
///
/// NOTE(review): every mode other than `disable` (including `verify-full`)
/// yields True, which `connect` maps to `pog.SslUnverified`. Confirm whether
/// certificate verification should be honoured for the verify-* modes.
fn parse_ssl_mode(query: String) -> Bool {
  query
  |> string.split("&")
  |> list.find_map(fn(param) {
    case string.split_once(param, "=") {
      Ok(#("sslmode", "disable")) -> Ok(False)
      Ok(#("sslmode", _)) -> Ok(True)
      _ -> Error(Nil)
    }
  })
  |> result.unwrap(True)
}