Mirror of Lumina
//! Lumina > Server > Database
//!
//! Database management and connection pooling module.
4
/*
 *     Lumina/Peonies
 *     Copyright (C) 2018-2026 MLC 'Strawmelonjuice' Bloeiman and contributors.
 *
 *     This program is free software: you can redistribute it and/or modify
 *     it under the terms of the GNU Affero General Public License as published
 *     by the Free Software Foundation, either version 3 of the License, or
 *     (at your option) any later version.
 *
 *     This program is distributed in the hope that it will be useful,
 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *     GNU Affero General Public License for more details.
 *
 *     You should have received a copy of the GNU Affero General Public License
 *     along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
22use crate::EnvVar::*;
23use crate::errors::LuminaError::{self};
24use crate::helpers::events::EventLogger;
25use crate::postgres;
26use crate::timeline;
27use crate::{info_elog, success_elog, warn_elog};
28use bb8::Pool;
29use bb8_postgres::PostgresConnectionManager;
30use bb8_redis::RedisConnectionManager;
31use cynthia_con::{CynthiaColors, CynthiaStyles};
32use std::time::Duration;
33use tokio_postgres::NoTls;
34
35pub(crate) async fn setup() -> Result<PgConn, LuminaError> {
36 let ev_log = EventLogger::new(&None);
37 let redis_url =
38 std::env::var("LUMINA_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".into());
39 let redis_pool = {
40 info_elog!(ev_log, "Setting up Redis connection to {}...", redis_url);
41 let manager = RedisConnectionManager::new(redis_url.clone())?;
42 // Configure pool sizes
43 let redis_pool = Pool::builder()
44 .max_size(50)
45 .connection_timeout(Duration::from_secs(5))
46 .idle_timeout(Some(Duration::from_secs(300)))
47 .build(manager)
48 .await?;
49 success_elog!(
50 ev_log,
51 "Redis connection to {} created successfully.",
52 redis_url
53 );
54
55 redis_pool
56 };
57
58 {
59 let pg_config: tokio_postgres::Config = {
60 let mut uuu = (
61 "unspecified database".to_string(),
62 "unspecified host".to_string(),
63 "unknown port".to_string(),
64 );
65 let mut pg_config = postgres::Config::new();
66 pg_config.user(&{
67 std::env::var("LUMINA_POSTGRES_USERNAME").unwrap_or("lumina".to_string())
68 });
69 let dbname =
70 std::env::var("LUMINA_POSTGRES_DATABASE").unwrap_or("lumina_config".to_string());
71 uuu.0 = dbname.clone();
72 pg_config.dbname(&dbname);
73 let port = match std::env::var("LUMINA_POSTGRES_PORT") {
74 Err(..) => {
75 warn_elog!(
76 ev_log,
77 "No Postgres database port provided under environment variable 'LUMINA_POSTGRES_PORT'. Using default value '5432'."
78 );
79 "5432".to_string()
80 }
81 Ok(c) => c,
82 };
83 uuu.2 = port.clone();
84 // Parse the port as u16, if it fails, return an error
85 pg_config.port(
86 port.parse::<u16>()
87 .map_err(|_| LuminaError::ConfInvalid(LUMINA_POSTGRES_PORT))?,
88 );
89 match std::env::var("LUMINA_POSTGRES_HOST") {
90 Ok(val) => {
91 uuu.1 = val.clone();
92 pg_config.host(&val);
93 }
94 Err(_) => {
95 warn_elog!(
96 ev_log,
97 "No Postgres database host provided under environment variable 'LUMINA_POSTGRES_HOST'. Using default value 'localhost'."
98 );
99 // Default to localhost if not set
100 uuu.1 = "localhost".to_string();
101 pg_config.host("localhost");
102 }
103 };
104 match std::env::var("LUMINA_POSTGRES_PASSWORD") {
105 Ok(val) => {
106 pg_config.password(&val);
107 }
108 Err(_) => {
109 warn_elog!(
110 ev_log,
111 "No Postgres database password provided under environment variable 'LUMINA_POSTGRES_PASSWORD'. Trying passwordless authentication."
112 );
113 }
114 };
115 info_elog!(
116 ev_log,
117 "Using Postgres database at: {} on host: {} at port: {}",
118 uuu.0.color_bright_cyan().style_bold(),
119 uuu.1.color_bright_cyan().style_bold(),
120 uuu.2.color_bright_cyan().style_bold(),
121 );
122 pg_config
123 };
124
125 // Create Postgres connection pool
126 let pg_manager = PostgresConnectionManager::new(pg_config.clone(), NoTls);
127 let pg_pool = Pool::builder().build(pg_manager).await?;
128 {
129 let pg_conn = pg_pool.get().await?;
130 pg_conn
131 .batch_execute(include_str!("../../SQL/create_pg.sql"))
132 .await?;
133 // Populate bloom filters
134 let mut redis_conn = redis_pool.get().await?;
135 let email_key = "bloom:email";
136 let username_key = "bloom:username";
137
138 let rows = pg_conn
139 .query("SELECT email, username FROM users", &[])
140 .await?;
141 for row in rows {
142 let email: String = row.get(0);
143 let username: String = row.get(1);
144 let _: () = redis::cmd("BF.ADD")
145 .arg(email_key)
146 .arg(email)
147 .query_async(&mut *redis_conn)
148 .await?;
149 let _: () = redis::cmd("BF.ADD")
150 .arg(username_key)
151 .arg(username)
152 .query_async(&mut *redis_conn)
153 .await?;
154 }
155 info_elog!(ev_log, "Bloom filters populated from PostgreSQL.",);
156 };
157 let pg_pool_clone = pg_pool.clone();
158 let redis_pool_clone = redis_pool.clone();
159 tokio::spawn(async move {
160 maintain(PgConn {
161 postgres_pool: pg_pool_clone,
162 redis_pool: redis_pool_clone,
163 })
164 .await
165 });
166 Ok(PgConn {
167 postgres_pool: pg_pool,
168 redis_pool,
169 })
170 }
171}
172
173/// This enum contains the postgres and redis connection and pool respectively. It used to have more variants before, and maybe it will once again.
174#[derive()]
175pub enum DbConn {
176 /// The main database is a Postgres database in this variant.
177 PgsqlConnection(
178 Pool<PostgresConnectionManager<NoTls>>,
179 Pool<RedisConnectionManager>,
180 ),
181}
182
183pub(crate) trait DatabaseConnections {
184 /// Get a reference to the redis pool
185 /// This is useful for functions that need to access redis but not the main database
186 /// such as timeline cache management
187 /// This returns a clone of the pool without recreating it entirely, so it is cheap to call
188 fn get_redis_pool(&self) -> Pool<RedisConnectionManager>;
189
190 /// Get a reference to the Postgres pool
191 /// This returns a clone of the pool without recreating it entirely, so it is cheap to call
192 fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>>;
193
194 /// Recreate the database connection.
195 async fn recreate(&self) -> PgConn
196 where
197 Self: Sized;
198}
199
200impl DatabaseConnections for DbConn {
201 /// Recreate the database connection.
202 /// This clones the pools - bb8 pools are cheap to clone as they share the underlying connections.
203 // This function converts a generic DbConn to the more concrete PgConn type.
204 async fn recreate(&self) -> PgConn {
205 PgConn {
206 postgres_pool: self.get_postgres_pool(),
207 redis_pool: self.get_redis_pool(),
208 }
209 }
210
211 fn get_redis_pool(&self) -> Pool<RedisConnectionManager> {
212 match self {
213 DbConn::PgsqlConnection(_, redis_pool) => redis_pool.clone(),
214 }
215 }
216 fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>> {
217 match self {
218 DbConn::PgsqlConnection(pg_pool, _) => pg_pool.clone(),
219 }
220 }
221}
222
223impl DatabaseConnections for PgConn {
224 fn get_redis_pool(&self) -> Pool<RedisConnectionManager> {
225 self.redis_pool.clone()
226 }
227
228 fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>> {
229 self.postgres_pool.clone()
230 }
231
232 async fn recreate(&self) -> PgConn
233 where
234 Self: Sized,
235 {
236 self.clone()
237 }
238}
239/// Simplified type only accounting for the Postgres struct, since the enum adds some future flexibility, but also a lot of overhead.
240/// If all goes well, this PgConn type will have replaced DbConn entirely after a few iterations of improvement over the years.
241pub struct PgConn {
242 pub(crate) postgres_pool: Pool<PostgresConnectionManager<NoTls>>,
243 pub(crate) redis_pool: Pool<RedisConnectionManager>,
244}
245
246impl From<PgConn> for DbConn {
247 /// Converts/unwraps the more concrete PgConn type to the generic DbConn counterpart.
248 fn from(db: PgConn) -> Self {
249 Self::PgsqlConnection(db.postgres_pool, db.redis_pool)
250 }
251}
252
253impl Clone for PgConn {
254 fn clone(&self) -> Self {
255 PgConn {
256 postgres_pool: self.postgres_pool.clone(),
257 redis_pool: self.redis_pool.clone(),
258 }
259 }
260}
261
262// This function will be used to maintain the database, such as deleting old sessions
263// and managing timeline caches
264pub async fn maintain(db: PgConn) {
265 let db = DbConn::from(db);
266 match db {
267 DbConn::PgsqlConnection(pg_pool, redis_pool) => {
268 let mut session_interval = tokio::time::interval(std::time::Duration::from_secs(60));
269 let mut cache_interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes
270
271 loop {
272 tokio::select! {
273 _ = session_interval.tick() => {
274 // Delete any sessions older than 20 days
275 if let Ok(client) = pg_pool.get().await {
276 let _ = client
277 .execute(
278 "DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'",
279 &[],
280 )
281 .await;
282 }
283 }
284 _ = cache_interval.tick() => {
285 // Clean up expired timeline caches and manage cache invalidation
286 if let Ok(mut redis_conn) = redis_pool.get().await {
287 let _ = cleanup_timeline_caches(&mut redis_conn).await;
288 if let Ok(pg_conn) = pg_pool.get().await {
289 let _ = check_timeline_invalidations(&mut redis_conn, &pg_conn).await;
290 }
291 }
292 }
293 }
294 }
295 }
296 }
297}
298
299// Clean up expired timeline cache entries
300async fn cleanup_timeline_caches(
301 redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>,
302) -> Result<(), LuminaError> {
303 let pattern = "timeline_cache:*";
304 let mut cursor = 0;
305
306 loop {
307 let result: (u64, Vec<String>) = redis::cmd("SCAN")
308 .cursor_arg(cursor)
309 .arg("MATCH")
310 .arg(pattern)
311 .query_async(&mut **redis_conn)
312 .await?;
313
314 cursor = result.0;
315 let keys = result.1;
316
317 let mut expired_keys = Vec::new();
318
319 for key in keys {
320 // Check TTL, if -1 or 0, it should be cleaned up
321 let ttl: i64 = redis::cmd("TTL")
322 .arg(&key)
323 .query_async(&mut **redis_conn)
324 .await?;
325 if ttl == -1 || ttl == 0 {
326 expired_keys.push(key);
327 }
328 }
329
330 if !expired_keys.is_empty() {
331 let _: () = redis::cmd("DEL")
332 .arg(&expired_keys)
333 .query_async(&mut **redis_conn)
334 .await?;
335 }
336
337 if cursor == 0 {
338 break;
339 }
340 }
341
342 Ok(())
343}
344
345// Check for timeline changes and invalidate caches accordingly (PostgreSQL)
346async fn check_timeline_invalidations(
347 redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>,
348 client: &bb8::PooledConnection<'_, PostgresConnectionManager<NoTls>>,
349) -> Result<(), LuminaError> {
350 // Get the last check timestamp
351 let last_check: Option<String> = redis::cmd("GET")
352 .arg("timeline_cache_last_check")
353 .query_async(&mut **redis_conn)
354 .await
355 .unwrap_or(None);
356
357 let query = if let Some(timestamp) = last_check {
358 client
359 .query(
360 "SELECT DISTINCT tlid FROM timelines WHERE timestamp > $1",
361 &[×tamp],
362 )
363 .await
364 } else {
365 // First run, don't invalidate anything
366 let _: () = redis::cmd("SET")
367 .arg("timeline_cache_last_check")
368 .arg(
369 time::OffsetDateTime::now_utc()
370 .format(&time::format_description::well_known::Rfc3339)
371 .unwrap(),
372 )
373 .query_async(&mut **redis_conn)
374 .await?;
375 return Ok(());
376 };
377
378 match query {
379 Ok(rows) => {
380 for row in rows {
381 let timeline_id: String = row.get(0);
382 let _ = timeline::invalidate_timeline_cache(redis_conn, &timeline_id).await;
383 }
384
385 // Update last check timestamp
386 let _: () = redis::cmd("SET")
387 .arg("timeline_cache_last_check")
388 .arg(
389 time::OffsetDateTime::now_utc()
390 .format(&time::format_description::well_known::Rfc3339)
391 .unwrap(),
392 )
393 .query_async(&mut **redis_conn)
394 .await?;
395 }
396 Err(_) => {
397 // If query fails, just update timestamp to avoid repeated failures
398 let _: () = redis::cmd("SET")
399 .arg("timeline_cache_last_check")
400 .arg(
401 time::OffsetDateTime::now_utc()
402 .format(&time::format_description::well_known::Rfc3339)
403 .unwrap(),
404 )
405 .query_async(&mut **redis_conn)
406 .await?;
407 }
408 }
409
410 Ok(())
411}