//! Lumina > Server > Database
//!
//! Database management and connection pooling module.
/*
* Lumina/Peonies
* Copyright (C) 2018-2026 MLC 'Strawmelonjuice' Bloeiman and contributors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
use crate::EnvVar::*;
use crate::errors::LuminaError::{self};
use crate::helpers::events::EventLogger;
use crate::postgres;
use crate::timeline;
use crate::{info_elog, success_elog, warn_elog};
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use bb8_redis::RedisConnectionManager;
use cynthia_con::{CynthiaColors, CynthiaStyles};
use std::time::Duration;
use tokio_postgres::NoTls;
pub(crate) async fn setup() -> Result {
let ev_log = EventLogger::new(&None);
let redis_url =
std::env::var("LUMINA_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".into());
let redis_pool = {
info_elog!(ev_log, "Setting up Redis connection to {}...", redis_url);
let manager = RedisConnectionManager::new(redis_url.clone())?;
// Configure pool sizes
let redis_pool = Pool::builder()
.max_size(50)
.connection_timeout(Duration::from_secs(5))
.idle_timeout(Some(Duration::from_secs(300)))
.build(manager)
.await?;
success_elog!(
ev_log,
"Redis connection to {} created successfully.",
redis_url
);
redis_pool
};
{
let pg_config: tokio_postgres::Config = {
let mut uuu = (
"unspecified database".to_string(),
"unspecified host".to_string(),
"unknown port".to_string(),
);
let mut pg_config = postgres::Config::new();
pg_config.user(&{
std::env::var("LUMINA_POSTGRES_USERNAME").unwrap_or("lumina".to_string())
});
let dbname =
std::env::var("LUMINA_POSTGRES_DATABASE").unwrap_or("lumina_config".to_string());
uuu.0 = dbname.clone();
pg_config.dbname(&dbname);
let port = match std::env::var("LUMINA_POSTGRES_PORT") {
Err(..) => {
warn_elog!(
ev_log,
"No Postgres database port provided under environment variable 'LUMINA_POSTGRES_PORT'. Using default value '5432'."
);
"5432".to_string()
}
Ok(c) => c,
};
uuu.2 = port.clone();
// Parse the port as u16, if it fails, return an error
pg_config.port(
port.parse::()
.map_err(|_| LuminaError::ConfInvalid(LUMINA_POSTGRES_PORT))?,
);
match std::env::var("LUMINA_POSTGRES_HOST") {
Ok(val) => {
uuu.1 = val.clone();
pg_config.host(&val);
}
Err(_) => {
warn_elog!(
ev_log,
"No Postgres database host provided under environment variable 'LUMINA_POSTGRES_HOST'. Using default value 'localhost'."
);
// Default to localhost if not set
uuu.1 = "localhost".to_string();
pg_config.host("localhost");
}
};
match std::env::var("LUMINA_POSTGRES_PASSWORD") {
Ok(val) => {
pg_config.password(&val);
}
Err(_) => {
warn_elog!(
ev_log,
"No Postgres database password provided under environment variable 'LUMINA_POSTGRES_PASSWORD'. Trying passwordless authentication."
);
}
};
info_elog!(
ev_log,
"Using Postgres database at: {} on host: {} at port: {}",
uuu.0.color_bright_cyan().style_bold(),
uuu.1.color_bright_cyan().style_bold(),
uuu.2.color_bright_cyan().style_bold(),
);
pg_config
};
// Create Postgres connection pool
let pg_manager = PostgresConnectionManager::new(pg_config.clone(), NoTls);
let pg_pool = Pool::builder().build(pg_manager).await?;
{
let pg_conn = pg_pool.get().await?;
pg_conn
.batch_execute(include_str!("../../SQL/create_pg.sql"))
.await?;
// Populate bloom filters
let mut redis_conn = redis_pool.get().await?;
let email_key = "bloom:email";
let username_key = "bloom:username";
let rows = pg_conn
.query("SELECT email, username FROM users", &[])
.await?;
for row in rows {
let email: String = row.get(0);
let username: String = row.get(1);
let _: () = redis::cmd("BF.ADD")
.arg(email_key)
.arg(email)
.query_async(&mut *redis_conn)
.await?;
let _: () = redis::cmd("BF.ADD")
.arg(username_key)
.arg(username)
.query_async(&mut *redis_conn)
.await?;
}
info_elog!(ev_log, "Bloom filters populated from PostgreSQL.",);
};
let pg_pool_clone = pg_pool.clone();
let redis_pool_clone = redis_pool.clone();
tokio::spawn(async move {
maintain(PgConn {
postgres_pool: pg_pool_clone,
redis_pool: redis_pool_clone,
})
.await
});
Ok(PgConn {
postgres_pool: pg_pool,
redis_pool,
})
}
}
/// This enum contains the postgres and redis connection and pool respectively. It used to have more variants before, and maybe it will once again.
#[derive()]
pub enum DbConn {
/// The main database is a Postgres database in this variant.
PgsqlConnection(
Pool>,
Pool,
),
}
pub(crate) trait DatabaseConnections {
/// Get a reference to the redis pool
/// This is useful for functions that need to access redis but not the main database
/// such as timeline cache management
/// This returns a clone of the pool without recreating it entirely, so it is cheap to call
fn get_redis_pool(&self) -> Pool;
/// Get a reference to the Postgres pool
/// This returns a clone of the pool without recreating it entirely, so it is cheap to call
fn get_postgres_pool(&self) -> Pool>;
/// Recreate the database connection.
async fn recreate(&self) -> PgConn
where
Self: Sized;
}
impl DatabaseConnections for DbConn {
/// Recreate the database connection.
/// This clones the pools - bb8 pools are cheap to clone as they share the underlying connections.
// This function converts a generic DbConn to the more concrete PgConn type.
async fn recreate(&self) -> PgConn {
PgConn {
postgres_pool: self.get_postgres_pool(),
redis_pool: self.get_redis_pool(),
}
}
fn get_redis_pool(&self) -> Pool {
match self {
DbConn::PgsqlConnection(_, redis_pool) => redis_pool.clone(),
}
}
fn get_postgres_pool(&self) -> Pool> {
match self {
DbConn::PgsqlConnection(pg_pool, _) => pg_pool.clone(),
}
}
}
impl DatabaseConnections for PgConn {
fn get_redis_pool(&self) -> Pool {
self.redis_pool.clone()
}
fn get_postgres_pool(&self) -> Pool> {
self.postgres_pool.clone()
}
async fn recreate(&self) -> PgConn
where
Self: Sized,
{
self.clone()
}
}
/// Simplified type only accounting for the Postgres struct, since the enum adds some future flexibility, but also a lot of overhead.
/// If all goes well, this PgConn type will have replaced DbConn entirely after a few iterations of improvement over the years.
pub struct PgConn {
pub(crate) postgres_pool: Pool>,
pub(crate) redis_pool: Pool,
}
impl From for DbConn {
/// Converts/unwraps the more concrete PgConn type to the generic DbConn counterpart.
fn from(db: PgConn) -> Self {
Self::PgsqlConnection(db.postgres_pool, db.redis_pool)
}
}
impl Clone for PgConn {
fn clone(&self) -> Self {
PgConn {
postgres_pool: self.postgres_pool.clone(),
redis_pool: self.redis_pool.clone(),
}
}
}
// This function will be used to maintain the database, such as deleting old sessions
// and managing timeline caches
pub async fn maintain(db: PgConn) {
let db = DbConn::from(db);
match db {
DbConn::PgsqlConnection(pg_pool, redis_pool) => {
let mut session_interval = tokio::time::interval(std::time::Duration::from_secs(60));
let mut cache_interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes
loop {
tokio::select! {
_ = session_interval.tick() => {
// Delete any sessions older than 20 days
if let Ok(client) = pg_pool.get().await {
let _ = client
.execute(
"DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'",
&[],
)
.await;
}
}
_ = cache_interval.tick() => {
// Clean up expired timeline caches and manage cache invalidation
if let Ok(mut redis_conn) = redis_pool.get().await {
let _ = cleanup_timeline_caches(&mut redis_conn).await;
if let Ok(pg_conn) = pg_pool.get().await {
let _ = check_timeline_invalidations(&mut redis_conn, &pg_conn).await;
}
}
}
}
}
}
}
}
// Clean up expired timeline cache entries
async fn cleanup_timeline_caches(
redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>,
) -> Result<(), LuminaError> {
let pattern = "timeline_cache:*";
let mut cursor = 0;
loop {
let result: (u64, Vec) = redis::cmd("SCAN")
.cursor_arg(cursor)
.arg("MATCH")
.arg(pattern)
.query_async(&mut **redis_conn)
.await?;
cursor = result.0;
let keys = result.1;
let mut expired_keys = Vec::new();
for key in keys {
// Check TTL, if -1 or 0, it should be cleaned up
let ttl: i64 = redis::cmd("TTL")
.arg(&key)
.query_async(&mut **redis_conn)
.await?;
if ttl == -1 || ttl == 0 {
expired_keys.push(key);
}
}
if !expired_keys.is_empty() {
let _: () = redis::cmd("DEL")
.arg(&expired_keys)
.query_async(&mut **redis_conn)
.await?;
}
if cursor == 0 {
break;
}
}
Ok(())
}
// Check for timeline changes and invalidate caches accordingly (PostgreSQL)
async fn check_timeline_invalidations(
redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>,
client: &bb8::PooledConnection<'_, PostgresConnectionManager>,
) -> Result<(), LuminaError> {
// Get the last check timestamp
let last_check: Option = redis::cmd("GET")
.arg("timeline_cache_last_check")
.query_async(&mut **redis_conn)
.await
.unwrap_or(None);
let query = if let Some(timestamp) = last_check {
client
.query(
"SELECT DISTINCT tlid FROM timelines WHERE timestamp > $1",
&[×tamp],
)
.await
} else {
// First run, don't invalidate anything
let _: () = redis::cmd("SET")
.arg("timeline_cache_last_check")
.arg(
time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap(),
)
.query_async(&mut **redis_conn)
.await?;
return Ok(());
};
match query {
Ok(rows) => {
for row in rows {
let timeline_id: String = row.get(0);
let _ = timeline::invalidate_timeline_cache(redis_conn, &timeline_id).await;
}
// Update last check timestamp
let _: () = redis::cmd("SET")
.arg("timeline_cache_last_check")
.arg(
time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap(),
)
.query_async(&mut **redis_conn)
.await?;
}
Err(_) => {
// If query fails, just update timestamp to avoid repeated failures
let _: () = redis::cmd("SET")
.arg("timeline_cache_last_check")
.arg(
time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap(),
)
.query_async(&mut **redis_conn)
.await?;
}
}
Ok(())
}