at development 411 lines 15 kB view raw
1//! Lumina > Server > Database 2//! 3//! Database management and connection pooling module. 4 5/* 6 * Lumina/Peonies 7 * Copyright (C) 2018-2026 MLC 'Strawmelonjuice' Bloeiman and contributors. 8 * 9 * This program is free software: you can redistribute it and/or modify 10 * it under the terms of the GNU Affero General Public License as published 11 * by the Free Software Foundation, either version 3 of the License, or 12 * (at your option) any later version. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 * GNU Affero General Public License for more details. 18 * 19 * You should have received a copy of the GNU Affero General Public License 20 * along with this program. If not, see <https://www.gnu.org/licenses/>. 21 */ 22use crate::EnvVar::*; 23use crate::errors::LuminaError::{self}; 24use crate::helpers::events::EventLogger; 25use crate::postgres; 26use crate::timeline; 27use crate::{info_elog, success_elog, warn_elog}; 28use bb8::Pool; 29use bb8_postgres::PostgresConnectionManager; 30use bb8_redis::RedisConnectionManager; 31use cynthia_con::{CynthiaColors, CynthiaStyles}; 32use std::time::Duration; 33use tokio_postgres::NoTls; 34 35pub(crate) async fn setup() -> Result<PgConn, LuminaError> { 36 let ev_log = EventLogger::new(&None); 37 let redis_url = 38 std::env::var("LUMINA_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".into()); 39 let redis_pool = { 40 info_elog!(ev_log, "Setting up Redis connection to {}...", redis_url); 41 let manager = RedisConnectionManager::new(redis_url.clone())?; 42 // Configure pool sizes 43 let redis_pool = Pool::builder() 44 .max_size(50) 45 .connection_timeout(Duration::from_secs(5)) 46 .idle_timeout(Some(Duration::from_secs(300))) 47 .build(manager) 48 .await?; 49 success_elog!( 50 ev_log, 51 "Redis connection to {} created successfully.", 52 redis_url 53 ); 54 55 redis_pool 56 }; 57 58 { 59 let pg_config: tokio_postgres::Config = { 60 let mut uuu = ( 61 "unspecified database".to_string(), 62 "unspecified host".to_string(), 63 "unknown port".to_string(), 64 ); 65 let mut pg_config = postgres::Config::new(); 66 pg_config.user(&{ 67 std::env::var("LUMINA_POSTGRES_USERNAME").unwrap_or("lumina".to_string()) 68 }); 69 let dbname = 70 std::env::var("LUMINA_POSTGRES_DATABASE").unwrap_or("lumina_config".to_string()); 71 uuu.0 = dbname.clone(); 72 pg_config.dbname(&dbname); 73 let port = match std::env::var("LUMINA_POSTGRES_PORT") { 74 Err(..) => { 75 warn_elog!( 76 ev_log, 77 "No Postgres database port provided under environment variable 'LUMINA_POSTGRES_PORT'. Using default value '5432'." 78 ); 79 "5432".to_string() 80 } 81 Ok(c) => c, 82 }; 83 uuu.2 = port.clone(); 84 // Parse the port as u16, if it fails, return an error 85 pg_config.port( 86 port.parse::<u16>() 87 .map_err(|_| LuminaError::ConfInvalid(LUMINA_POSTGRES_PORT))?, 88 ); 89 match std::env::var("LUMINA_POSTGRES_HOST") { 90 Ok(val) => { 91 uuu.1 = val.clone(); 92 pg_config.host(&val); 93 } 94 Err(_) => { 95 warn_elog!( 96 ev_log, 97 "No Postgres database host provided under environment variable 'LUMINA_POSTGRES_HOST'. Using default value 'localhost'." 98 ); 99 // Default to localhost if not set 100 uuu.1 = "localhost".to_string(); 101 pg_config.host("localhost"); 102 } 103 }; 104 match std::env::var("LUMINA_POSTGRES_PASSWORD") { 105 Ok(val) => { 106 pg_config.password(&val); 107 } 108 Err(_) => { 109 warn_elog!( 110 ev_log, 111 "No Postgres database password provided under environment variable 'LUMINA_POSTGRES_PASSWORD'. Trying passwordless authentication." 112 ); 113 } 114 }; 115 info_elog!( 116 ev_log, 117 "Using Postgres database at: {} on host: {} at port: {}", 118 uuu.0.color_bright_cyan().style_bold(), 119 uuu.1.color_bright_cyan().style_bold(), 120 uuu.2.color_bright_cyan().style_bold(), 121 ); 122 pg_config 123 }; 124 125 // Create Postgres connection pool 126 let pg_manager = PostgresConnectionManager::new(pg_config.clone(), NoTls); 127 let pg_pool = Pool::builder().build(pg_manager).await?; 128 { 129 let pg_conn = pg_pool.get().await?; 130 pg_conn 131 .batch_execute(include_str!("../../SQL/create_pg.sql")) 132 .await?; 133 // Populate bloom filters 134 let mut redis_conn = redis_pool.get().await?; 135 let email_key = "bloom:email"; 136 let username_key = "bloom:username"; 137 138 let rows = pg_conn 139 .query("SELECT email, username FROM users", &[]) 140 .await?; 141 for row in rows { 142 let email: String = row.get(0); 143 let username: String = row.get(1); 144 let _: () = redis::cmd("BF.ADD") 145 .arg(email_key) 146 .arg(email) 147 .query_async(&mut *redis_conn) 148 .await?; 149 let _: () = redis::cmd("BF.ADD") 150 .arg(username_key) 151 .arg(username) 152 .query_async(&mut *redis_conn) 153 .await?; 154 } 155 info_elog!(ev_log, "Bloom filters populated from PostgreSQL.",); 156 }; 157 let pg_pool_clone = pg_pool.clone(); 158 let redis_pool_clone = redis_pool.clone(); 159 tokio::spawn(async move { 160 maintain(PgConn { 161 postgres_pool: pg_pool_clone, 162 redis_pool: redis_pool_clone, 163 }) 164 .await 165 }); 166 Ok(PgConn { 167 postgres_pool: pg_pool, 168 redis_pool, 169 }) 170 } 171} 172 173/// This enum contains the postgres and redis connection and pool respectively. It used to have more variants before, and maybe it will once again. 174#[derive()] 175pub enum DbConn { 176 /// The main database is a Postgres database in this variant. 177 PgsqlConnection( 178 Pool<PostgresConnectionManager<NoTls>>, 179 Pool<RedisConnectionManager>, 180 ), 181} 182 183pub(crate) trait DatabaseConnections { 184 /// Get a reference to the redis pool 185 /// This is useful for functions that need to access redis but not the main database 186 /// such as timeline cache management 187 /// This returns a clone of the pool without recreating it entirely, so it is cheap to call 188 fn get_redis_pool(&self) -> Pool<RedisConnectionManager>; 189 190 /// Get a reference to the Postgres pool 191 /// This returns a clone of the pool without recreating it entirely, so it is cheap to call 192 fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>>; 193 194 /// Recreate the database connection. 195 async fn recreate(&self) -> PgConn 196 where 197 Self: Sized; 198} 199 200impl DatabaseConnections for DbConn { 201 /// Recreate the database connection. 202 /// This clones the pools - bb8 pools are cheap to clone as they share the underlying connections. 203 // This function converts a generic DbConn to the more concrete PgConn type. 204 async fn recreate(&self) -> PgConn { 205 PgConn { 206 postgres_pool: self.get_postgres_pool(), 207 redis_pool: self.get_redis_pool(), 208 } 209 } 210 211 fn get_redis_pool(&self) -> Pool<RedisConnectionManager> { 212 match self { 213 DbConn::PgsqlConnection(_, redis_pool) => redis_pool.clone(), 214 } 215 } 216 fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>> { 217 match self { 218 DbConn::PgsqlConnection(pg_pool, _) => pg_pool.clone(), 219 } 220 } 221} 222 223impl DatabaseConnections for PgConn { 224 fn get_redis_pool(&self) -> Pool<RedisConnectionManager> { 225 self.redis_pool.clone() 226 } 227 228 fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>> { 229 self.postgres_pool.clone() 230 } 231 232 async fn recreate(&self) -> PgConn 233 where 234 Self: Sized, 235 { 236 self.clone() 237 } 238} 239/// Simplified type only accounting for the Postgres struct, since the enum adds some future flexibility, but also a lot of overhead. 240/// If all goes well, this PgConn type will have replaced DbConn entirely after a few iterations of improvement over the years. 241pub struct PgConn { 242 pub(crate) postgres_pool: Pool<PostgresConnectionManager<NoTls>>, 243 pub(crate) redis_pool: Pool<RedisConnectionManager>, 244} 245 246impl From<PgConn> for DbConn { 247 /// Converts/unwraps the more concrete PgConn type to the generic DbConn counterpart. 248 fn from(db: PgConn) -> Self { 249 Self::PgsqlConnection(db.postgres_pool, db.redis_pool) 250 } 251} 252 253impl Clone for PgConn { 254 fn clone(&self) -> Self { 255 PgConn { 256 postgres_pool: self.postgres_pool.clone(), 257 redis_pool: self.redis_pool.clone(), 258 } 259 } 260} 261 262// This function will be used to maintain the database, such as deleting old sessions 263// and managing timeline caches 264pub async fn maintain(db: PgConn) { 265 let db = DbConn::from(db); 266 match db { 267 DbConn::PgsqlConnection(pg_pool, redis_pool) => { 268 let mut session_interval = tokio::time::interval(std::time::Duration::from_secs(60)); 269 let mut cache_interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes 270 271 loop { 272 tokio::select! { 273 _ = session_interval.tick() => { 274 // Delete any sessions older than 20 days 275 if let Ok(client) = pg_pool.get().await { 276 let _ = client 277 .execute( 278 "DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'", 279 &[], 280 ) 281 .await; 282 } 283 } 284 _ = cache_interval.tick() => { 285 // Clean up expired timeline caches and manage cache invalidation 286 if let Ok(mut redis_conn) = redis_pool.get().await { 287 let _ = cleanup_timeline_caches(&mut redis_conn).await; 288 if let Ok(pg_conn) = pg_pool.get().await { 289 let _ = check_timeline_invalidations(&mut redis_conn, &pg_conn).await; 290 } 291 } 292 } 293 } 294 } 295 } 296 } 297} 298 299// Clean up expired timeline cache entries 300async fn cleanup_timeline_caches( 301 redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>, 302) -> Result<(), LuminaError> { 303 let pattern = "timeline_cache:*"; 304 let mut cursor = 0; 305 306 loop { 307 let result: (u64, Vec<String>) = redis::cmd("SCAN") 308 .cursor_arg(cursor) 309 .arg("MATCH") 310 .arg(pattern) 311 .query_async(&mut **redis_conn) 312 .await?; 313 314 cursor = result.0; 315 let keys = result.1; 316 317 let mut expired_keys = Vec::new(); 318 319 for key in keys { 320 // Check TTL, if -1 or 0, it should be cleaned up 321 let ttl: i64 = redis::cmd("TTL") 322 .arg(&key) 323 .query_async(&mut **redis_conn) 324 .await?; 325 if ttl == -1 || ttl == 0 { 326 expired_keys.push(key); 327 } 328 } 329 330 if !expired_keys.is_empty() { 331 let _: () = redis::cmd("DEL") 332 .arg(&expired_keys) 333 .query_async(&mut **redis_conn) 334 .await?; 335 } 336 337 if cursor == 0 { 338 break; 339 } 340 } 341 342 Ok(()) 343} 344 345// Check for timeline changes and invalidate caches accordingly (PostgreSQL) 346async fn check_timeline_invalidations( 347 redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>, 348 client: &bb8::PooledConnection<'_, PostgresConnectionManager<NoTls>>, 349) -> Result<(), LuminaError> { 350 // Get the last check timestamp 351 let last_check: Option<String> = redis::cmd("GET") 352 .arg("timeline_cache_last_check") 353 .query_async(&mut **redis_conn) 354 .await 355 .unwrap_or(None); 356 357 let query = if let Some(timestamp) = last_check { 358 client 359 .query( 360 "SELECT DISTINCT tlid FROM timelines WHERE timestamp > $1", 361 &[&timestamp], 362 ) 363 .await 364 } else { 365 // First run, don't invalidate anything 366 let _: () = redis::cmd("SET") 367 .arg("timeline_cache_last_check") 368 .arg( 369 time::OffsetDateTime::now_utc() 370 .format(&time::format_description::well_known::Rfc3339) 371 .unwrap(), 372 ) 373 .query_async(&mut **redis_conn) 374 .await?; 375 return Ok(()); 376 }; 377 378 match query { 379 Ok(rows) => { 380 for row in rows { 381 let timeline_id: String = row.get(0); 382 let _ = timeline::invalidate_timeline_cache(redis_conn, &timeline_id).await; 383 } 384 385 // Update last check timestamp 386 let _: () = redis::cmd("SET") 387 .arg("timeline_cache_last_check") 388 .arg( 389 time::OffsetDateTime::now_utc() 390 .format(&time::format_description::well_known::Rfc3339) 391 .unwrap(), 392 ) 393 .query_async(&mut **redis_conn) 394 .await?; 395 } 396 Err(_) => { 397 // If query fails, just update timestamp to avoid repeated failures 398 let _: () = redis::cmd("SET") 399 .arg("timeline_cache_last_check") 400 .arg( 401 time::OffsetDateTime::now_utc() 402 .format(&time::format_description::well_known::Rfc3339) 403 .unwrap(), 404 ) 405 .query_async(&mut **redis_conn) 406 .await?; 407 } 408 } 409 410 Ok(()) 411}