refactor: migrate database pooling from r2d2 to async bb8 (bb8-postgres/bb8-redis)

Replace the synchronous r2d2 pool and per-call Postgres reconnects with
shared bb8 connection pools for both Postgres and Redis. Switch redis to
the tokio-comp async API (query_async), make EventLogger::clone and
EventLogger::new synchronous, and carry the rocket::Error payload inside
LuminaError::RocketFaillure instead of a side tuple.

+94 -38
Cargo.lock
··· 27 27 ] 28 28 29 29 [[package]] 30 + name = "arc-swap" 31 + version = "1.7.1" 32 + source = "registry+https://github.com/rust-lang/crates.io-index" 33 + checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" 34 + 35 + [[package]] 30 36 name = "async-stream" 31 37 version = "0.3.6" 32 38 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 81 87 checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" 82 88 83 89 [[package]] 90 + name = "backon" 91 + version = "1.6.0" 92 + source = "registry+https://github.com/rust-lang/crates.io-index" 93 + checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" 94 + dependencies = [ 95 + "fastrand", 96 + ] 97 + 98 + [[package]] 84 99 name = "backtrace" 85 100 version = "0.3.74" 86 101 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 102 117 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" 103 118 104 119 [[package]] 120 + name = "bb8" 121 + version = "0.8.6" 122 + source = "registry+https://github.com/rust-lang/crates.io-index" 123 + checksum = "d89aabfae550a5c44b43ab941844ffcd2e993cb6900b342debf59e9ea74acdb8" 124 + dependencies = [ 125 + "async-trait", 126 + "futures-util", 127 + "parking_lot", 128 + "tokio", 129 + ] 130 + 131 + [[package]] 132 + name = "bb8-postgres" 133 + version = "0.8.1" 134 + source = "registry+https://github.com/rust-lang/crates.io-index" 135 + checksum = "56ac82c42eb30889b5c4ee4763a24b8c566518171ebea648cd7e3bc532c60680" 136 + dependencies = [ 137 + "async-trait", 138 + "bb8", 139 + "tokio", 140 + "tokio-postgres", 141 + ] 142 + 143 + [[package]] 144 + name = "bb8-redis" 145 + version = "0.17.0" 146 + source = "registry+https://github.com/rust-lang/crates.io-index" 147 + checksum = "1781f22daa0ae97d934fdf04a5c66646f154a164c4bdc157ec8d3c11166c05cc" 148 + dependencies = [ 149 + "async-trait", 150 + "bb8", 151 + "redis", 152 + ] 153 + 154 + [[package]] 105 155 name = "bcrypt" 
106 156 version = "0.17.0" 107 157 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 207 257 checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" 208 258 dependencies = [ 209 259 "bytes", 260 + "futures-core", 210 261 "memchr", 262 + "pin-project-lite", 263 + "tokio", 264 + "tokio-util", 211 265 ] 212 266 213 267 [[package]] ··· 405 459 dependencies = [ 406 460 "futures-channel", 407 461 "futures-core", 462 + "futures-executor", 408 463 "futures-io", 409 464 "futures-sink", 410 465 "futures-task", ··· 426 481 version = "0.3.31" 427 482 source = "registry+https://github.com/rust-lang/crates.io-index" 428 483 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" 484 + 485 + [[package]] 486 + name = "futures-executor" 487 + version = "0.3.31" 488 + source = "registry+https://github.com/rust-lang/crates.io-index" 489 + checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" 490 + dependencies = [ 491 + "futures-core", 492 + "futures-task", 493 + "futures-util", 494 + ] 429 495 430 496 [[package]] 431 497 name = "futures-io" ··· 646 712 "httpdate", 647 713 "itoa", 648 714 "pin-project-lite", 649 - "socket2 0.5.9", 715 + "socket2", 650 716 "tokio", 651 717 "tower-service", 652 718 "tracing", ··· 830 896 ] 831 897 832 898 [[package]] 899 + name = "itertools" 900 + version = "0.13.0" 901 + source = "registry+https://github.com/rust-lang/crates.io-index" 902 + checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" 903 + dependencies = [ 904 + "either", 905 + ] 906 + 907 + [[package]] 833 908 name = "itoa" 834 909 version = "1.0.15" 835 910 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 905 980 version = "0.1.0" 906 981 dependencies = [ 907 982 "base64", 983 + "bb8", 984 + "bb8-postgres", 985 + "bb8-redis", 908 986 "bcrypt", 909 987 "cynthia_con", 910 988 "dotenv", 911 - "r2d2", 912 989 "redis", 913 990 "regex", 914 991 "rocket", ··· 1265 
1342 checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" 1266 1343 1267 1344 [[package]] 1268 - name = "r2d2" 1269 - version = "0.8.10" 1270 - source = "registry+https://github.com/rust-lang/crates.io-index" 1271 - checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" 1272 - dependencies = [ 1273 - "log", 1274 - "parking_lot", 1275 - "scheduled-thread-pool", 1276 - ] 1277 - 1278 - [[package]] 1279 1345 name = "rand" 1280 1346 version = "0.8.5" 1281 1347 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1336 1402 1337 1403 [[package]] 1338 1404 name = "redis" 1339 - version = "0.32.4" 1405 + version = "0.27.6" 1340 1406 source = "registry+https://github.com/rust-lang/crates.io-index" 1341 - checksum = "e1f66bf4cac9733a23bcdf1e0e01effbaaad208567beba68be8f67e5f4af3ee1" 1407 + checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" 1342 1408 dependencies = [ 1409 + "arc-swap", 1410 + "async-trait", 1411 + "backon", 1412 + "bytes", 1343 1413 "combine", 1414 + "futures", 1415 + "futures-util", 1416 + "itertools", 1344 1417 "itoa", 1345 1418 "num-bigint", 1346 1419 "percent-encoding", 1347 - "r2d2", 1420 + "pin-project-lite", 1348 1421 "ryu", 1349 1422 "sha1_smol", 1350 - "socket2 0.6.0", 1423 + "socket2", 1424 + "tokio", 1425 + "tokio-util", 1351 1426 "url", 1352 1427 ] 1353 1428 ··· 1547 1622 checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" 1548 1623 1549 1624 [[package]] 1550 - name = "scheduled-thread-pool" 1551 - version = "0.2.7" 1552 - source = "registry+https://github.com/rust-lang/crates.io-index" 1553 - checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" 1554 - dependencies = [ 1555 - "parking_lot", 1556 - ] 1557 - 1558 - [[package]] 1559 1625 name = "scoped-tls" 1560 1626 version = "1.0.1" 1561 1627 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1689 1755 dependencies = [ 1690 1756 "libc", 1691 
1757 "windows-sys 0.52.0", 1692 - ] 1693 - 1694 - [[package]] 1695 - name = "socket2" 1696 - version = "0.6.0" 1697 - source = "registry+https://github.com/rust-lang/crates.io-index" 1698 - checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" 1699 - dependencies = [ 1700 - "libc", 1701 - "windows-sys 0.59.0", 1702 1758 ] 1703 1759 1704 1760 [[package]] ··· 1925 1981 "parking_lot", 1926 1982 "pin-project-lite", 1927 1983 "signal-hook-registry", 1928 - "socket2 0.5.9", 1984 + "socket2", 1929 1985 "tokio-macros", 1930 1986 "windows-sys 0.52.0", 1931 1987 ] ··· 1961 2017 "postgres-protocol", 1962 2018 "postgres-types", 1963 2019 "rand 0.9.1", 1964 - "socket2 0.5.9", 2020 + "socket2", 1965 2021 "tokio", 1966 2022 "tokio-util", 1967 2023 "whoami",
+4 -2
server/Cargo.toml
··· 20 20 dotenv = "0.15.0" 21 21 tokio-postgres = { version = "0.7.13", features = ["with-uuid-1"] } 22 22 bcrypt = "0.17.0" 23 - r2d2 = "0.8.10" 23 + bb8 = "0.8" 24 + bb8-postgres = "0.8" 25 + bb8-redis = "0.17" 24 26 tabled = "0.20.0" 25 27 regex = "1.10.5" 26 - redis = { version = "0.32.4", features = ["r2d2"] } 28 + redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] } 27 29 time = "0.3.20" 28 30 base64 = "0.22"
+6 -6
server/src/client_communication.rs
··· 53 53 ) -> ws::Channel<'k> { 54 54 let ev_log = { 55 55 let appstate = state.0.clone(); 56 - appstate.event_logger.clone().await 56 + appstate.event_logger.clone() 57 57 }; 58 58 http_code_elog!(ev_log, 200, "/connection"); 59 59 use rocket::futures::{SinkExt, StreamExt}; ··· 153 153 "User created: {}", 154 154 user.clone().username.color_bright_cyan() 155 155 ); 156 - match User::create_session(user, db, ev_log.clone().await).await { 156 + match User::create_session(user, db, ev_log.clone()).await { 157 157 Ok((session_reference, user)) => { 158 158 client_session_data.user = 159 159 Some(user.clone()); ··· 173 173 match e { 174 174 LuminaError::Postgres(e) => 175 175 error_elog!(ev_log,"While creating session token: {:?}", e), 176 - LuminaError::R2D2Pool(e) => 177 - warn_elog!(ev_log,"There was an error creating session token: {:?}", e), 176 + LuminaError::Bb8Pool(e) => 177 + warn_elog!(ev_log,"There was an error creating session token: {}", e), 178 178 _ => {} 179 179 } 180 180 // I would return a more specific error message ··· 286 286 } else { 287 287 let appstate = state.0.clone(); 288 288 let db = &appstate.db.lock().await; 289 - let msgback = match User::authenticate(email_username.clone(), password, db, ev_log.clone().await).await { 289 + let msgback = match User::authenticate(email_username.clone(), password, db, ev_log.clone()).await { 290 290 Ok((session_reference, user)) => { 291 291 incoming_elog!(ev_log,"User {} authenticated to session with id {}.\n{}", user.username.clone().color_bright_cyan(), session_reference.session_id.to_string().color_pink(), format!("(User id: {})", user.id).style_dim()); 292 292 client_session_data.user = Some(user.clone()); ··· 342 342 let db = &appstate.db.lock().await; 343 343 // Fetch post IDs for the requested timeline 344 344 match fetch_timeline_post_ids_by_timeline_name( 345 - ev_log.clone().await, 345 + ev_log.clone(), 346 346 db, 347 347 &name, 348 348 client_session_data.user.clone().unwrap(),
+147 -123
server/src/database.rs
··· 25 25 use crate::timeline; 26 26 use crate::{info_elog, success_elog, warn_elog}; 27 27 use cynthia_con::{CynthiaColors, CynthiaStyles}; 28 - use r2d2::Pool; 29 - use redis::Commands; 28 + use bb8::Pool; 29 + use bb8_postgres::PostgresConnectionManager; 30 + use bb8_redis::RedisConnectionManager; 30 31 use tokio_postgres as postgres; 31 - use tokio_postgres::tls::NoTlsStream; 32 - use tokio_postgres::{Client, Connection, Socket}; 32 + use tokio_postgres::NoTls; 33 + use std::time::Duration; 33 34 34 - pub(crate) async fn setup() -> Result<DbConn, LuminaError> { 35 - let ev_log = EventLogger::new(&None).await; 35 + pub(crate) async fn setup() -> Result<PgConn, LuminaError> { 36 + let ev_log = EventLogger::new(&None); 36 37 let redis_url = 37 38 std::env::var("LUMINA_REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".into()); 38 - let redis_pool: Pool<redis::Client> = { 39 + let redis_pool = { 39 40 info_elog!(ev_log, "Setting up Redis connection to {}...", redis_url); 40 - let client = redis::Client::open(redis_url.clone()).map_err(LuminaError::Redis)?; 41 - Pool::builder().build(client).map_err(LuminaError::R2D2Pool) 42 - }?; 43 - success_elog!( 44 - ev_log, 45 - "Redis connection to {} created successfully.", 46 - redis_url 47 - ); 41 + let manager = RedisConnectionManager::new(redis_url.clone()).map_err(LuminaError::Redis)?; 42 + // Configure pool sizes 43 + let redis_pool = Pool::builder() 44 + .max_size(50) 45 + .connection_timeout(Duration::from_secs(5)) 46 + .idle_timeout(Some(Duration::from_secs(300))) 47 + .build(manager).await?; 48 + success_elog!( 49 + ev_log, 50 + "Redis connection to {} created successfully.", 51 + redis_url 52 + ); 53 + 54 + redis_pool 55 + }; 48 56 49 57 { 50 58 let pg_config: tokio_postgres::Config = { ··· 114 122 pg_config 115 123 }; 116 124 117 - // Connect to the database 118 - let conn: (Client, Connection<Socket, NoTlsStream>) = pg_config 119 - .connect(postgres::tls::NoTls) 125 + // Create Postgres connection pool 126 + 
let pg_manager = PostgresConnectionManager::new(pg_config.clone(), NoTls); 127 + let pg_pool = Pool::builder() 128 + .build(pg_manager) 120 129 .await 121 - .map_err(LuminaError::Postgres)?; 122 - tokio::spawn(conn.1); 123 - // Create a second connection to the database for spawning the maintain function 124 - let conn_two: (Client, Connection<Socket, NoTlsStream>) = pg_config 125 - .connect(postgres::tls::NoTls) 126 - .await 127 - .map_err(LuminaError::Postgres)?; 128 - tokio::spawn(conn_two.1); 130 + .map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 129 131 { 130 - conn.0 132 + let pg_conn = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 133 + pg_conn 131 134 .batch_execute(include_str!("../../SQL/create_pg.sql")) 132 135 .await 133 136 .map_err(LuminaError::Postgres)?; 134 137 135 138 // Populate bloom filters 136 - let mut redis_conn = redis_pool.get().map_err(LuminaError::R2D2Pool)?; 139 + let mut redis_conn = redis_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 137 140 let email_key = "bloom:email"; 138 141 let username_key = "bloom:username"; 139 142 140 - let rows = conn 141 - .0 143 + let rows = pg_conn 142 144 .query("SELECT email, username FROM users", &[]) 143 145 .await 144 146 .map_err(LuminaError::Postgres)?; ··· 148 150 let _: () = redis::cmd("BF.ADD") 149 151 .arg(email_key) 150 152 .arg(email) 151 - .query(&mut *redis_conn) 153 + .query_async(&mut *redis_conn) 154 + .await 152 155 .map_err(LuminaError::Redis)?; 153 156 let _: () = redis::cmd("BF.ADD") 154 157 .arg(username_key) 155 158 .arg(username) 156 - .query(&mut *redis_conn) 159 + .query_async(&mut *redis_conn) 160 + .await 157 161 .map_err(LuminaError::Redis)?; 158 162 } 159 163 info_elog!(ev_log, "Bloom filters populated from PostgreSQL.",); 160 164 }; 161 - let conn_clone = conn_two.0; 162 - let pg_config_clone = pg_config.clone(); 165 + let pg_pool_clone = pg_pool.clone(); 163 166 let redis_pool_clone = redis_pool.clone(); 164 167 
tokio::spawn(async move { 165 - maintain(DbConn::PgsqlConnection( 166 - (conn_clone, pg_config_clone), 167 - redis_pool_clone, 168 - )) 168 + maintain(PgConn { 169 + postgres_pool: pg_pool_clone, 170 + redis_pool: redis_pool_clone, 171 + }) 169 172 .await 170 173 }); 171 - Ok(DbConn::PgsqlConnection((conn.0, pg_config), redis_pool)) 174 + Ok(PgConn { 175 + postgres_pool: pg_pool, 176 + redis_pool, 177 + }) 172 178 } 173 179 } 174 180 175 181 /// This enum contains the postgres and redis connection and pool respectively. It used to have more variants before, and maybe it will once again. 176 182 #[derive()] 177 183 pub enum DbConn { 178 - // The config is also shared, so that for example the logger can set up its own connection, use this sparingly. 179 184 /// The main database is a Postgres database in this variant. 180 - PgsqlConnection((Client, postgres::Config), Pool<redis::Client>), 185 + PgsqlConnection(Pool<PostgresConnectionManager<NoTls>>, Pool<RedisConnectionManager>), 181 186 } 182 187 183 188 pub(crate) trait DatabaseConnections { ··· 185 190 /// This is useful for functions that need to access redis but not the main database 186 191 /// such as timeline cache management 187 192 /// This returns a clone of the pool without recreating it entirely, so it is cheap to call 188 - fn get_redis_pool(&self) -> Pool<redis::Client>; 193 + fn get_redis_pool(&self) -> Pool<RedisConnectionManager>; 194 + 195 + /// Get a reference to the Postgres pool 196 + /// This returns a clone of the pool without recreating it entirely, so it is cheap to call 197 + fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>>; 189 198 190 199 /// Recreate the database connection. 191 - async fn recreate(&self) -> Result<Self, LuminaError> 200 + async fn recreate(&self) -> PgConn 192 201 where 193 202 Self: Sized; 194 203 } 195 204 196 205 impl DatabaseConnections for DbConn { 197 206 /// Recreate the database connection. 
198 - /// This clones the pool on sqlite and for redis, and creates a new connection on postgres. 199 - async fn recreate(&self) -> Result<Self, LuminaError> { 207 + /// This clones the pools - bb8 pools are cheap to clone as they share the underlying connections. 208 + // This function converts a generic DbConn to the more concrete PgConn type. 209 + async fn recreate(&self) -> PgConn { 210 + PgConn { 211 + postgres_pool: self.get_postgres_pool(), 212 + redis_pool: self.get_redis_pool(), 213 + } 214 + } 215 + 216 + fn get_redis_pool(&self) -> Pool<RedisConnectionManager> { 200 217 match self { 201 - DbConn::PgsqlConnection((_, config), redis_pool) => { 202 - let c = config 203 - .connect(tokio_postgres::tls::NoTls) 204 - .await 205 - .map_err(LuminaError::Postgres)? 206 - .0; 207 - let r = redis_pool.clone(); 208 - 209 - Ok(DbConn::PgsqlConnection((c, config.to_owned()), r)) 210 - } 218 + DbConn::PgsqlConnection(_, redis_pool) => redis_pool.clone(), 211 219 } 212 220 } 213 - 214 - fn get_redis_pool(&self) -> Pool<redis::Client> { 221 + fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>> { 215 222 match self { 216 - DbConn::PgsqlConnection((_, _), redis_pool) => redis_pool.clone(), 223 + DbConn::PgsqlConnection(pg_pool, _) => pg_pool.clone(), 217 224 } 218 225 } 219 226 } 220 227 228 + 221 229 impl DatabaseConnections for PgConn { 222 - fn get_redis_pool(&self) -> Pool<redis::Client> { 230 + fn get_redis_pool(&self) -> Pool<RedisConnectionManager> { 223 231 self.redis_pool.clone() 224 232 } 225 233 226 - async fn recreate(&self) -> Result<Self, LuminaError> { 227 - let postgres = self 228 - .postgres_config 229 - .connect(tokio_postgres::tls::NoTls) 230 - .await 231 - .map_err(LuminaError::Postgres)? 
232 - .0; 233 - let postgres_config = self.postgres_config.to_owned(); 234 - let redis_pool = self.redis_pool.clone(); 235 - Ok(PgConn { 236 - postgres, 237 - postgres_config, 238 - redis_pool, 239 - }) 234 + 235 + fn get_postgres_pool(&self) -> Pool<PostgresConnectionManager<NoTls>> { 236 + self.postgres_pool.clone() 237 + } 238 + 239 + async fn recreate(&self) -> PgConn 240 + where 241 + Self: Sized { 242 + self.clone() 240 243 } 241 244 } 242 245 /// Simplified type only accounting for the Postgres struct, since the enum adds some future flexibility, but also a lot of overhead. 243 246 /// If all goes well, this PgConn type will have replaced DbConn entirely after a few iterations of improvement over the years. 244 247 pub struct PgConn { 245 - pub(crate) postgres: Client, 246 - postgres_config: postgres::Config, 247 - pub(crate) redis_pool: Pool<redis::Client>, 248 + pub(crate) postgres_pool: Pool<PostgresConnectionManager<NoTls>>, 249 + pub(crate) redis_pool: Pool<RedisConnectionManager>, 248 250 } 249 251 250 - impl DbConn { 251 - /// Converts/unwraps the generic DbConn type to it's more concrete PgConn counterpart. 252 - pub(crate) fn to_pgconn(db: Self) -> PgConn { 253 - match db { 254 - Self::PgsqlConnection((a, b), c) => PgConn { 255 - postgres: a, 256 - postgres_config: b, 257 - redis_pool: c, 258 - }, 252 + impl From<PgConn> for DbConn { 253 + /// Converts/unwraps the more concrete PgConn type to the generic DbConn counterpart. 
254 + fn from(db: PgConn) -> Self { 255 + Self::PgsqlConnection(db.postgres_pool, db.redis_pool) 256 + } 257 + } 258 + 259 + impl Clone for PgConn { 260 + fn clone(&self) -> Self { 261 + PgConn { 262 + postgres_pool: self.postgres_pool.clone(), 263 + redis_pool: self.redis_pool.clone(), 259 264 } 260 265 } 261 266 } 267 + 262 268 263 269 // This function will be used to maintain the database, such as deleting old sessions 264 270 // and managing timeline caches 265 - pub async fn maintain(db: DbConn) { 271 + pub async fn maintain(db: PgConn) { 272 + let db = DbConn::from(db); 266 273 match db { 267 - DbConn::PgsqlConnection((client, _), redis_pool) => { 274 + DbConn::PgsqlConnection(pg_pool, redis_pool) => { 268 275 let mut session_interval = tokio::time::interval(std::time::Duration::from_secs(60)); 269 276 let mut cache_interval = tokio::time::interval(std::time::Duration::from_secs(300)); // 5 minutes 270 277 ··· 272 279 tokio::select! { 273 280 _ = session_interval.tick() => { 274 281 // Delete any sessions older than 20 days 275 - let _ = client 276 - .execute( 277 - "DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'", 278 - &[], 279 - ) 280 - .await; 282 + if let Ok(client) = pg_pool.get().await { 283 + let _ = client 284 + .execute( 285 + "DELETE FROM sessions WHERE created_at < NOW() - INTERVAL '20 days'", 286 + &[], 287 + ) 288 + .await; 289 + } 281 290 } 282 291 _ = cache_interval.tick() => { 283 292 // Clean up expired timeline caches and manage cache invalidation 284 - if let Ok(mut redis_conn) = redis_pool.get() { 293 + if let Ok(mut redis_conn) = redis_pool.get().await { 285 294 let _ = cleanup_timeline_caches(&mut redis_conn).await; 286 - let _ = check_timeline_invalidations(&mut redis_conn, &client).await; 295 + if let Ok(pg_conn) = pg_pool.get().await { 296 + let _ = check_timeline_invalidations(&mut redis_conn, &pg_conn).await; 297 + } 287 298 } 288 299 } 289 300 } ··· 293 304 } 294 305 295 306 // Clean up expired timeline cache 
entries 296 - async fn cleanup_timeline_caches(redis_conn: &mut redis::Connection) -> Result<(), LuminaError> { 307 + async fn cleanup_timeline_caches(redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>) -> Result<(), LuminaError> { 297 308 let pattern = "timeline_cache:*"; 298 309 let mut cursor = 0; 299 310 ··· 302 313 .cursor_arg(cursor) 303 314 .arg("MATCH") 304 315 .arg(pattern) 305 - .query(redis_conn) 316 + .query_async(&mut **redis_conn) 317 + .await 306 318 .map_err(LuminaError::Redis)?; 307 319 308 320 cursor = result.0; ··· 312 324 313 325 for key in keys { 314 326 // Check TTL, if -1 or 0, it should be cleaned up 315 - let ttl: i64 = redis_conn.ttl(&key).map_err(LuminaError::Redis)?; 327 + let ttl: i64 = redis::cmd("TTL") 328 + .arg(&key) 329 + .query_async(&mut **redis_conn) 330 + .await 331 + .map_err(LuminaError::Redis)?; 316 332 if ttl == -1 || ttl == 0 { 317 333 expired_keys.push(key); 318 334 } 319 335 } 320 336 321 337 if !expired_keys.is_empty() { 322 - let _: () = redis_conn.del(&expired_keys).map_err(LuminaError::Redis)?; 338 + let _: () = redis::cmd("DEL") 339 + .arg(&expired_keys) 340 + .query_async(&mut **redis_conn) 341 + .await 342 + .map_err(LuminaError::Redis)?; 323 343 } 324 344 325 345 if cursor == 0 { ··· 332 352 333 353 // Check for timeline changes and invalidate caches accordingly (PostgreSQL) 334 354 async fn check_timeline_invalidations( 335 - redis_conn: &mut redis::Connection, 336 - client: &Client, 355 + redis_conn: &mut bb8::PooledConnection<'_, RedisConnectionManager>, 356 + client: &bb8::PooledConnection<'_, PostgresConnectionManager<NoTls>>, 337 357 ) -> Result<(), LuminaError> { 338 358 // Get the last check timestamp 339 - let last_check: Option<String> = redis_conn.get("timeline_cache_last_check").unwrap_or(None); 359 + let last_check: Option<String> = redis::cmd("GET") 360 + .arg("timeline_cache_last_check") 361 + .query_async(&mut **redis_conn) 362 + .await 363 + .unwrap_or(None); 340 364 341 365 let 
query = if let Some(timestamp) = last_check { 342 366 client ··· 347 371 .await 348 372 } else { 349 373 // First run, don't invalidate anything 350 - let _: () = redis_conn 351 - .set( 352 - "timeline_cache_last_check", 353 - time::OffsetDateTime::now_utc() 354 - .format(&time::format_description::well_known::Rfc3339) 355 - .unwrap(), 356 - ) 374 + let _: () = redis::cmd("SET") 375 + .arg("timeline_cache_last_check") 376 + .arg(time::OffsetDateTime::now_utc() 377 + .format(&time::format_description::well_known::Rfc3339) 378 + .unwrap()) 379 + .query_async(&mut **redis_conn) 380 + .await 357 381 .map_err(LuminaError::Redis)?; 358 382 return Ok(()); 359 383 }; ··· 366 390 } 367 391 368 392 // Update last check timestamp 369 - let _: () = redis_conn 370 - .set( 371 - "timeline_cache_last_check", 372 - time::OffsetDateTime::now_utc() 373 - .format(&time::format_description::well_known::Rfc3339) 374 - .unwrap(), 375 - ) 393 + let _: () = redis::cmd("SET") 394 + .arg("timeline_cache_last_check") 395 + .arg(time::OffsetDateTime::now_utc() 396 + .format(&time::format_description::well_known::Rfc3339) 397 + .unwrap()) 398 + .query_async(&mut **redis_conn) 399 + .await 376 400 .map_err(LuminaError::Redis)?; 377 401 } 378 402 Err(_) => { 379 403 // If query fails, just update timestamp to avoid repeated failures 380 - let _: () = redis_conn 381 - .set( 382 - "timeline_cache_last_check", 383 - time::OffsetDateTime::now_utc() 384 - .format(&time::format_description::well_known::Rfc3339) 385 - .unwrap(), 386 - ) 404 + let _: () = redis::cmd("SET") 405 + .arg("timeline_cache_last_check") 406 + .arg(time::OffsetDateTime::now_utc() 407 + .format(&time::format_description::well_known::Rfc3339) 408 + .unwrap()) 409 + .query_async(&mut **redis_conn) 410 + .await 387 411 .map_err(LuminaError::Redis)?; 388 412 } 389 413 }
+19 -6
server/src/errors.rs
··· 23 23 #[derive(Debug)] 24 24 pub(crate) enum LuminaError { 25 25 ConfInvalid(String), 26 - R2D2Pool(r2d2::Error), 26 + Bb8Pool(String), 27 27 Postgres(crate::postgres::Error), 28 28 Unknown, 29 - /// Rocket failure wrapper, due to size, we only store the error source here. Construct with: 30 - /// ```rust 31 - /// (LuminaError::RocketFaillure, Some<rocket::Error>) 32 - /// ``` 33 - RocketFaillure, 29 + RocketFaillure(Box<rocket::Error>), 34 30 BcryptError, 35 31 RegisterEmailInUse, 36 32 RegisterUsernameInUse, ··· 44 40 SerializationError(String), 45 41 JoinFaillure, 46 42 } 43 + impl From<rocket::Error> for LuminaError { 44 + fn from(err: rocket::Error) -> Self { 45 + LuminaError::RocketFaillure(Box::new(err)) 46 + } 47 + } 48 + 49 + impl From<crate::postgres::Error> for LuminaError { 50 + fn from(err: crate::postgres::Error) -> Self { 51 + LuminaError::Postgres(err) 52 + } 53 + } 54 + 55 + impl From<redis::RedisError> for LuminaError { 56 + fn from(err: redis::RedisError) -> Self { 57 + LuminaError::Redis(err) 58 + } 59 + }
+26 -31
server/src/helpers/events.rs
··· 24 24 */ 25 25 26 26 use crate::LuminaError; 27 - use crate::database::{DatabaseConnections, PgConn}; 27 + use crate::database::{PgConn}; 28 28 use cynthia_con::{CynthiaColors, CynthiaStyles}; 29 29 use time::OffsetDateTime; 30 30 ··· 55 55 OnlyStdout, 56 56 } 57 57 58 + impl From<&PgConn> for EventLogger { 59 + fn from(db: &PgConn) -> Self { 60 + EventLogger::WithDatabase(Box::new(db.clone())) 61 + } 62 + } 63 + 64 + impl Clone for EventLogger { 65 + fn clone(&self) -> Self { 66 + match self { 67 + EventLogger::WithDatabase(db) => EventLogger::WithDatabase(Box::new((**db).clone())), 68 + EventLogger::OnlyStdout => EventLogger::OnlyStdout, 69 + } 70 + } 71 + } 72 + 58 73 impl EventLogger { 59 74 /// Creates a new logger instance. 60 75 /// The `db` parameter can be `None` if the database isn't connected. 61 - pub async fn new(db: &Option<PgConn>) -> Self { 76 + pub fn new(db: &Option<PgConn>) -> Self { 62 77 // For quick implementation we'll just check if not none and that's all. 63 78 match db { 64 - Some(d) => Self::from_db(d).await, 79 + Some(d) => Self::from(d), 65 80 None => Self::OnlyStdout, 66 - } 67 - } 68 - 69 - pub async fn from_db(db_: &PgConn) -> Self { 70 - match db_.recreate().await { 71 - Ok(new_db) => Self::WithDatabase(Box::new(new_db)), 72 - Err(error) => { 73 - let n = Self::OnlyStdout; 74 - n.error( 75 - format!("Could not connect the logger to the database! 
{:?}", error).as_str(), 76 - ) 77 - .await; 78 - n 79 - } 80 - } 81 - } 82 - 83 - pub async fn clone(&self) -> Self { 84 - match self { 85 - EventLogger::WithDatabase(db) => Self::from_db(db).await, 86 - EventLogger::OnlyStdout => Self::OnlyStdout, 87 81 } 88 82 } 89 83 ··· 176 170 .format(&time::format_description::well_known::Rfc3339) 177 171 .unwrap(); 178 172 179 - let _ = db_conn 180 - .postgres 181 - .execute( 182 - "INSERT INTO logs (type, message, timestamp) VALUES ($1, $2, $3)", 183 - &[&level_str, &message_db, &ts], 184 - ) 185 - .await; 173 + if let Ok(pg_conn) = db_conn.postgres_pool.get().await { 174 + let _ = pg_conn 175 + .execute( 176 + "INSERT INTO logs (type, message, timestamp) VALUES ($1, $2, $3)", 177 + &[&level_str, &message_db, &ts], 178 + ) 179 + .await; 180 + } 186 181 } 187 182 EventLogger::OnlyStdout => { 188 183 // Log to stdout with the prefix.
+20 -17
server/src/main.rs
··· 30 30 pub mod errors; 31 31 pub mod helpers; 32 32 mod staticroutes; 33 + #[cfg(test)] 33 34 mod tests; 34 35 mod timeline; 35 36 use helpers::events::EventLogger; ··· 55 56 port: u16, 56 57 host: IpAddr, 57 58 } 58 - use crate::database::DatabaseConnections; 59 + use crate::database::{DatabaseConnections, PgConn}; 59 60 use crate::errors::LuminaError; 60 61 use cynthia_con::{CynthiaColors, CynthiaStyles}; 61 62 use dotenv::dotenv; ··· 78 79 #[rocket::main] 79 80 async fn main() { 80 81 let me = format!("Lumina Server, version {}", env!("CARGO_PKG_VERSION")); 81 - let ev_log: EventLogger = EventLogger::new(&None).await; 82 + let ev_log: EventLogger = EventLogger::new(&None); 82 83 let args: Vec<String> = std::env::args().skip(1).collect(); 83 84 match ( 84 85 args.is_empty(), ··· 102 103 Ok(config) => { 103 104 let mut interval = 104 105 tokio::time::interval(std::time::Duration::from_millis(3000)); 105 - let mut db_mut: Option<DbConn> = None; 106 - let ev_log: EventLogger = EventLogger::new(&None).await; 106 + let mut db_mut: Option<PgConn> = None; 107 + let ev_log: EventLogger = EventLogger::new(&None); 107 108 108 109 let mut db_tries: usize = 0; 109 110 while db_mut.is_none() { ··· 131 132 error_elog!(ev_log, "While connecting to postgres database: {}", a); 132 133 None 133 134 } 134 - Err(LuminaError::R2D2Pool(a)) => { 135 + Err(LuminaError::Bb8Pool(a)) => { 135 136 error_elog!(ev_log, "While setting up database pool: {}", a); 136 137 None 137 138 } ··· 166 167 } 167 168 // If we got here, we have a database connection. 
168 169 169 - let db = db_mut.unwrap(); 170 - let pg = DbConn::to_pgconn(db.recreate().await.unwrap()); 171 - let ev_log = EventLogger::from_db(&pg).await; 170 + let pg = db_mut.unwrap(); 171 + let db: DbConn = pg.clone().into(); 172 + let ev_log: EventLogger = EventLogger::new(&Some(pg)); 172 173 success_elog!(ev_log, "Database connected."); 173 174 174 175 if cfg!(debug_assertions) { 175 - let mut redis_conn = db.get_redis_pool().get().unwrap(); 176 + let redis_pool = db.get_redis_pool(); 177 + let mut redis_conn = redis_pool.get().await.unwrap(); 176 178 timeline::invalidate_timeline_cache( 177 179 &mut redis_conn, 178 180 "00000000-0000-0000-0000-000000000000", ··· 180 182 .await 181 183 .unwrap(); 182 184 let global = timeline::fetch_timeline_post_ids( 183 - ev_log.clone().await, 185 + ev_log.clone(), 184 186 &db, 185 187 "00000000-0000-0000-0000-000000000000", 186 188 None, ··· 195 197 let generated_uuid = Uuid::new_v4(); 196 198 let hello_content = "Hello world"; 197 199 198 - match db.recreate().await.unwrap() { 199 - DbConn::PgsqlConnection((client, _), _) => { 200 + match db.recreate().await.into() { 201 + DbConn::PgsqlConnection(pg_pool, _) => { 202 + let client = pg_pool.get().await.unwrap(); 200 203 // Insert Hello World post and timeline entry if not exists 201 204 let user_1_: Result<user::User, LuminaError> = 202 205 match user::User::create_user( ··· 252 255 &[&generated_uuid, &user_1.id, &hello_content], 253 256 ) 254 257 .await; 255 - let add_clone = ev_log.clone().await; 258 + let add_clone = ev_log.clone(); 256 259 timeline::add_to_timeline( 257 260 add_clone, 258 261 &db, ··· 277 280 let appstate = AppState(Arc::from(InnerAppState { 278 281 config: config.clone(), 279 282 db: Mutex::from(db), 280 - event_logger: ev_log.clone().await, 283 + event_logger: ev_log.clone(), 281 284 })); 282 285 283 286 // Create a simple in-memory IP-based rate limiter. 
··· 370 373 let result = { 371 374 let g = s.await; 372 375 match g { 373 - Ok(x) => x.map_err(|e| (LuminaError::RocketFaillure, Some(e))), 374 - Err(..) => Err((LuminaError::JoinFaillure, None)), 376 + Ok(x) => x.map_err(|e| LuminaError::RocketFaillure(Box::new(e))), 377 + Err(..) => Err(LuminaError::JoinFaillure), 375 378 } 376 379 }; 377 380 match result { 378 381 Ok(_) => {} 379 - Err((LuminaError::RocketFaillure, Some(e))) => { 382 + Err(LuminaError::RocketFaillure(e)) => { 380 383 // This handling should slowly expand as I run into newer ones, the 'defh' (default handling) is good enough, but for the most-bumped into errors, I'd like to give more human responses. 381 384 let defh = 382 385 async || error_elog!(ev_log, "Error starting server: {:?}", e);
+8 -8
server/src/staticroutes.rs
··· 36 36 pub(crate) async fn index(state: &State<AppState>) -> RawHtml<String> { 37 37 let ev_log = { 38 38 let appstate = state.0.clone(); 39 - appstate.event_logger.clone().await 39 + appstate.event_logger.clone() 40 40 }; 41 41 http_code_elog!(ev_log, 200, "/"); 42 42 let js = if cfg!(debug_assertions) { ··· 71 71 pub(crate) async fn lumina_js(state: &State<AppState>) -> RawJavaScript<String> { 72 72 let ev_log = { 73 73 let appstate = state.0.clone(); 74 - appstate.event_logger.clone().await 74 + appstate.event_logger.clone() 75 75 }; 76 76 http_code_elog!(ev_log, 200, "/static/lumina.min.mjs"); 77 77 ··· 82 82 pub(crate) async fn lumina_d_js(state: &State<AppState>) -> RawJavaScript<String> { 83 83 let ev_log = { 84 84 let appstate = state.0.clone(); 85 - appstate.event_logger.clone().await 85 + appstate.event_logger.clone() 86 86 }; 87 87 http_code_elog!(ev_log, 200, "/static/lumina.mjs"); 88 88 ··· 93 93 pub(crate) async fn lumina_css(state: &State<AppState>) -> RawCss<String> { 94 94 let ev_log = { 95 95 let appstate = state.0.clone(); 96 - appstate.event_logger.clone().await 96 + appstate.event_logger.clone() 97 97 }; 98 98 http_code_elog!(ev_log, 200, "/static/lumina.css"); 99 99 ··· 104 104 pub(crate) async fn licence(state: &State<AppState>) -> RawText<String> { 105 105 let ev_log = { 106 106 let appstate = state.0.clone(); 107 - appstate.event_logger.clone().await 107 + appstate.event_logger.clone() 108 108 }; 109 109 http_code_elog!(ev_log, 200, "/licence"); 110 110 ··· 119 119 pub(crate) async fn logo_svg(state: &State<AppState>) -> (ContentType, &'static str) { 120 120 let ev_log = { 121 121 let appstate = state.0.clone(); 122 - appstate.event_logger.clone().await 122 + appstate.event_logger.clone() 123 123 }; 124 124 http_code_elog!(ev_log, 200, "/static/logo.svg"); 125 125 ··· 133 133 pub(crate) async fn favicon(state: &State<AppState>) -> (ContentType, &'static [u8]) { 134 134 let ev_log = { 135 135 let appstate = state.0.clone(); 136 - 
appstate.event_logger.clone().await 136 + appstate.event_logger.clone() 137 137 }; 138 138 http_code_elog!(ev_log, 200, "/favicon.ico"); 139 139 produce_logo_png() ··· 143 143 pub(crate) async fn logo_png(state: &State<AppState>) -> (ContentType, &'static [u8]) { 144 144 let ev_log = { 145 145 let appstate = state.0.clone(); 146 - appstate.event_logger.clone().await 146 + appstate.event_logger.clone() 147 147 }; 148 148 http_code_elog!(ev_log, 200, "/static/logo.png"); 149 149 produce_logo_png()
+68 -72
server/src/tests.rs
··· 16 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 17 */ 18 18 19 - #[cfg(test)] 20 - mod tests { 21 - use crate::database::{self, DatabaseConnections}; 22 - use crate::timeline; 19 + use crate::database::{self, DatabaseConnections}; 20 + use crate::timeline; 23 21 24 - #[tokio::test] 25 - async fn test_database_setup() { 26 - let result = database::setup().await; 27 - assert!(result.is_ok(), "Database setup should succeed"); 28 - } 22 + #[tokio::test] 23 + async fn test_database_setup() { 24 + let result = database::setup().await; 25 + assert!(result.is_ok(), "Database setup should succeed"); 26 + } 29 27 30 - #[tokio::test] 31 - async fn test_redis_bloom_filter() { 32 - let db = database::setup().await.expect("DB setup"); 33 - let redis_pool = db.get_redis_pool(); 34 - let mut conn = redis_pool.get().await.expect("Redis conn"); 35 - let email_key = "test_bloom:email"; 36 - let test_email = "testuser@example.com"; 37 - 38 - // Add to bloom filter 39 - let _: () = redis::cmd("BF.ADD") 40 - .arg(email_key) 41 - .arg(test_email) 42 - .query_async(&mut *conn) 43 - .await 44 - .expect("BF.ADD"); 45 - 46 - // Check if exists 47 - let exists: bool = redis::cmd("BF.EXISTS") 48 - .arg(email_key) 49 - .arg(test_email) 50 - .query_async(&mut *conn) 51 - .await 52 - .expect("BF.EXISTS"); 53 - 54 - assert!(exists, "Bloom filter should contain the test email"); 55 - 56 - // Clean up 57 - let _: () = redis::cmd("DEL") 58 - .arg(email_key) 59 - .query_async(&mut *conn) 60 - .await 61 - .unwrap_or(()); 62 - } 28 + #[tokio::test] 29 + async fn test_redis_bloom_filter() { 30 + let db = database::setup().await.expect("DB setup"); 31 + let redis_pool = db.get_redis_pool(); 32 + let mut conn = redis_pool.get().await.expect("Redis conn"); 33 + let email_key = "test_bloom:email"; 34 + let test_email = "testuser@example.com"; 63 35 64 - #[tokio::test] 65 - async fn test_timeline_invalidation() { 66 - let db = database::setup().await.expect("DB setup"); 67 
- let redis_pool = db.get_redis_pool(); 68 - let mut conn = redis_pool.get().await.expect("Redis conn"); 69 - let timeline_id = "test-timeline-invalidation"; 70 - 71 - // Set a test cache key 72 - let cache_key = format!("timeline_cache:{}:page:0", timeline_id); 73 - let _: () = redis::cmd("SET") 74 - .arg(&cache_key) 75 - .arg("test_data") 76 - .query_async(&mut *conn) 77 - .await 78 - .expect("SET"); 79 - 80 - // Invalidate the timeline 81 - timeline::invalidate_timeline_cache(&mut conn, timeline_id) 82 - .await 83 - .expect("Invalidate cache"); 84 - 85 - // Verify cache was cleared 86 - let result: Option<String> = redis::cmd("GET") 87 - .arg(&cache_key) 88 - .query_async(&mut *conn) 89 - .await 90 - .unwrap_or(None); 91 - 92 - assert!(result.is_none(), "Cache should be invalidated"); 93 - } 36 + // Add to bloom filter 37 + let _: () = redis::cmd("BF.ADD") 38 + .arg(email_key) 39 + .arg(test_email) 40 + .query_async(&mut *conn) 41 + .await 42 + .expect("BF.ADD"); 43 + 44 + // Check if exists 45 + let exists: bool = redis::cmd("BF.EXISTS") 46 + .arg(email_key) 47 + .arg(test_email) 48 + .query_async(&mut *conn) 49 + .await 50 + .expect("BF.EXISTS"); 51 + 52 + assert!(exists, "Bloom filter should contain the test email"); 53 + 54 + // Clean up 55 + let _: () = redis::cmd("DEL") 56 + .arg(email_key) 57 + .query_async(&mut *conn) 58 + .await 59 + .unwrap_or(()); 94 60 } 95 61 62 + #[tokio::test] 63 + async fn test_timeline_invalidation() { 64 + let db = database::setup().await.expect("DB setup"); 65 + let redis_pool = db.get_redis_pool(); 66 + let mut conn = redis_pool.get().await.expect("Redis conn"); 67 + let timeline_id = "test-timeline-invalidation"; 68 + 69 + // Set a test cache key 70 + let cache_key = format!("timeline_cache:{}:page:0", timeline_id); 71 + let _: () = redis::cmd("SET") 72 + .arg(&cache_key) 73 + .arg("test_data") 74 + .query_async(&mut *conn) 75 + .await 76 + .expect("SET"); 77 + 78 + // Invalidate the timeline 79 + 
timeline::invalidate_timeline_cache(&mut conn, timeline_id) 80 + .await 81 + .expect("Invalidate cache"); 82 + 83 + // Verify cache was cleared 84 + let result: Option<String> = redis::cmd("GET") 85 + .arg(&cache_key) 86 + .query_async(&mut *conn) 87 + .await 88 + .unwrap_or(None); 89 + 90 + assert!(result.is_none(), "Cache should be invalidated"); 91 + }
+73
server/src/tests/tests.rs
··· 1 + use crate::database::{self, DatabaseConnections}; 2 + use crate::timeline; 3 + 4 + #[tokio::test] 5 + async fn test_database_setup() { 6 + let result = database::setup().await; 7 + assert!(result.is_ok(), "Database setup should succeed"); 8 + } 9 + 10 + #[tokio::test] 11 + async fn test_redis_bloom_filter() { 12 + let db = database::setup().await.expect("DB setup"); 13 + let redis_pool = db.get_redis_pool(); 14 + let mut conn = redis_pool.get().await.expect("Redis conn"); 15 + let email_key = "test_bloom:email"; 16 + let test_email = "testuser@example.com"; 17 + 18 + // Add to bloom filter 19 + let _: () = redis::cmd("BF.ADD") 20 + .arg(email_key) 21 + .arg(test_email) 22 + .query_async(&mut *conn) 23 + .await 24 + .expect("BF.ADD"); 25 + 26 + // Check if exists 27 + let exists: bool = redis::cmd("BF.EXISTS") 28 + .arg(email_key) 29 + .arg(test_email) 30 + .query_async(&mut *conn) 31 + .await 32 + .expect("BF.EXISTS"); 33 + 34 + assert!(exists, "Bloom filter should contain the test email"); 35 + 36 + // Clean up 37 + let _: () = redis::cmd("DEL") 38 + .arg(email_key) 39 + .query_async(&mut *conn) 40 + .await 41 + .unwrap_or(()); 42 + } 43 + 44 + #[tokio::test] 45 + async fn test_timeline_invalidation() { 46 + let db = database::setup().await.expect("DB setup"); 47 + let redis_pool = db.get_redis_pool(); 48 + let mut conn = redis_pool.get().await.expect("Redis conn"); 49 + let timeline_id = "test-timeline-invalidation"; 50 + 51 + // Set a test cache key 52 + let cache_key = format!("timeline_cache:{}:page:0", timeline_id); 53 + let _: () = redis::cmd("SET") 54 + .arg(&cache_key) 55 + .arg("test_data") 56 + .query_async(&mut *conn) 57 + .await 58 + .expect("SET"); 59 + 60 + // Invalidate the timeline 61 + timeline::invalidate_timeline_cache(&mut conn, timeline_id) 62 + .await 63 + .expect("Invalidate cache"); 64 + 65 + // Verify cache was cleared 66 + let result: Option<String> = redis::cmd("GET") 67 + .arg(&cache_key) 68 + .query_async(&mut *conn) 69 + 
.await 70 + .unwrap_or(None); 71 + 72 + assert!(result.is_none(), "Cache should be invalidated"); 73 + }
+39 -21
server/src/timeline.rs
··· 23 23 use crate::errors::LuminaError; 24 24 use crate::helpers::events::EventLogger; 25 25 use crate::{DbConn, error_elog, info_elog, user}; 26 - use redis::Commands; 27 26 use serde::{Deserialize, Serialize}; 28 27 use uuid::Uuid; 29 28 ··· 49 48 50 49 /// Check if a timeline should be cached based on traffic 51 50 async fn is_high_traffic_timeline( 52 - redis_conn: &mut redis::Connection, 51 + redis_conn: &mut bb8::PooledConnection<'_, bb8_redis::RedisConnectionManager>, 53 52 timeline_id: &str, 54 53 ) -> Result<bool, LuminaError> { 55 54 // Global timeline is always high traffic ··· 58 57 } 59 58 60 59 // Check lookup count for other timelines 61 - let lookup_count: i64 = redis_conn 62 - .get(format!("timeline_lookup:{}", timeline_id)) 60 + let lookup_count: i64 = redis::cmd("GET") 61 + .arg(format!("timeline_lookup:{}", timeline_id)) 62 + .query_async(&mut **redis_conn) 63 + .await 63 64 .unwrap_or(0); 64 65 65 66 Ok(lookup_count >= HIGH_TRAFFIC_THRESHOLD) ··· 77 78 78 79 /// Store timeline page in Redis cache 79 80 async fn cache_timeline_page( 80 - redis_conn: &mut redis::Connection, 81 + redis_conn: &mut bb8::PooledConnection<'_, bb8_redis::RedisConnectionManager>, 81 82 timeline_id: &str, 82 83 page: usize, 83 84 post_ids: &[String], ··· 98 99 .arg(cache_key) 99 100 .arg(CACHE_TTL) 100 101 .arg(serialized) 101 - .query(redis_conn) 102 + .query_async(&mut **redis_conn) 103 + .await 102 104 .map_err(LuminaError::Redis)?; 103 105 104 106 // Also cache metadata ··· 107 109 .arg(meta_key) 108 110 .arg(CACHE_TTL) 109 111 .arg(total_count) 110 - .query(redis_conn) 112 + .query_async(&mut **redis_conn) 113 + .await 111 114 .map_err(LuminaError::Redis)?; 112 115 113 116 Ok(()) ··· 115 118 116 119 /// Retrieve timeline page from Redis cache 117 120 async fn get_cached_timeline_page( 118 - redis_conn: &mut redis::Connection, 121 + redis_conn: &mut bb8::PooledConnection<'_, bb8_redis::RedisConnectionManager>, 119 122 timeline_id: &str, 120 123 page: usize, 121 124 
) -> Result<Option<CachedTimelinePage>, LuminaError> { 122 125 let cache_key = get_cache_key(timeline_id, page); 123 126 124 - let cached_data: Option<String> = redis_conn.get(cache_key).map_err(LuminaError::Redis)?; 127 + let cached_data: Option<String> = redis::cmd("GET") 128 + .arg(cache_key) 129 + .query_async(&mut **redis_conn) 130 + .await 131 + .map_err(LuminaError::Redis)?; 125 132 126 133 match cached_data { 127 134 Some(data) => { ··· 135 142 136 143 /// Invalidate all cache entries for a timeline 137 144 pub async fn invalidate_timeline_cache( 138 - redis_conn: &mut redis::Connection, 145 + redis_conn: &mut bb8::PooledConnection<'_, bb8_redis::RedisConnectionManager>, 139 146 timeline_id: &str, 140 147 ) -> Result<(), LuminaError> { 141 148 // Use SCAN to find all cache keys for this timeline ··· 147 154 .cursor_arg(cursor) 148 155 .arg("MATCH") 149 156 .arg(&pattern) 150 - .query(redis_conn) 157 + .query_async(&mut **redis_conn) 158 + .await 151 159 .map_err(LuminaError::Redis)?; 152 160 153 161 cursor = result.0; 154 162 let keys = result.1; 155 163 156 164 if !keys.is_empty() { 157 - let _: () = redis_conn.del(&keys).map_err(LuminaError::Redis)?; 165 + let _: () = redis::cmd("DEL") 166 + .arg(&keys) 167 + .query_async(&mut **redis_conn) 168 + .await 169 + .map_err(LuminaError::Redis)?; 158 170 } 159 171 160 172 if cursor == 0 { ··· 168 180 /// Fetch total count for a timeline from database 169 181 async fn fetch_timeline_total_count(db: &DbConn, timeline_id: &str) -> Result<usize, LuminaError> { 170 182 match db { 171 - DbConn::PgsqlConnection((client, _pg_config), _redis_pool) => { 183 + DbConn::PgsqlConnection(pg_pool, _redis_pool) => { 184 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 172 185 let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?; 173 186 let row = client 174 187 .query_one( ··· 192 205 limit: usize, 193 206 ) -> Result<Vec<String>, LuminaError> { 194 207 match 
db { 195 - DbConn::PgsqlConnection((client, _pg_config), _redis_pool) => { 208 + DbConn::PgsqlConnection(pg_pool, _redis_pool) => { 209 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 196 210 let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?; 197 211 let rows = client 198 212 .query( ··· 225 239 // Get Redis connection 226 240 let mut redis_conn = match db { 227 241 DbConn::PgsqlConnection(_, redis_pool) => { 228 - redis_pool.get().map_err(LuminaError::R2D2Pool)? 242 + redis_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))? 229 243 } 230 244 }; 231 245 232 246 // Log the requested timeline id for tracking 233 - let _: () = redis_conn 234 - .incr(format!("timeline_lookup:{}", timeline_id), 1) 247 + let _: () = redis::cmd("INCR") 248 + .arg(format!("timeline_lookup:{}", timeline_id)) 249 + .query_async(&mut *redis_conn) 250 + .await 235 251 .map_err(LuminaError::Redis)?; 236 252 237 253 // Check if this timeline should be cached ··· 343 359 ) -> Result<(), LuminaError> { 344 360 // Add to database 345 361 match db { 346 - DbConn::PgsqlConnection((client, _pg_config), redis_pool) => { 362 + DbConn::PgsqlConnection(pg_pool, redis_pool) => { 363 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 347 364 let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?; 348 365 let item_uuid = Uuid::parse_str(item_id).map_err(|_| LuminaError::UUidError)?; 349 366 client ··· 355 372 .map_err(LuminaError::Postgres)?; 356 373 357 374 // Invalidate cache 358 - let mut redis_conn = redis_pool.get().map_err(LuminaError::R2D2Pool)?; 375 + let mut redis_conn = redis_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 359 376 if let Err(e) = invalidate_timeline_cache(&mut redis_conn, timeline_id).await { 360 377 error_elog!( 361 378 event_logger, ··· 380 397 ) -> Result<(), LuminaError> { 381 398 // Remove from database 
382 399 match db { 383 - DbConn::PgsqlConnection((client, _pg_config), redis_pool) => { 400 + DbConn::PgsqlConnection(pg_pool, redis_pool) => { 401 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 384 402 let timeline_uuid = Uuid::parse_str(timeline_id).map_err(|_| LuminaError::UUidError)?; 385 403 let item_uuid = Uuid::parse_str(item_id).map_err(|_| LuminaError::UUidError)?; 386 404 client ··· 392 410 .map_err(LuminaError::Postgres)?; 393 411 394 412 // Invalidate cache 395 - let mut redis_conn = redis_pool.get().map_err(LuminaError::R2D2Pool)?; 413 + let mut redis_conn = redis_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 396 414 if let Err(e) = invalidate_timeline_cache(&mut redis_conn, timeline_id).await { 397 415 error_elog!( 398 416 event_logger,
+21 -11
server/src/user.rs
··· 61 61 } 62 62 async fn get_hashed_password(self, database: &DbConn) -> Result<String, LuminaError> { 63 63 match database { 64 - DbConn::PgsqlConnection((client, _), _) => { 64 + DbConn::PgsqlConnection(pg_pool, _) => { 65 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 65 66 let row = client 66 67 .query_one("SELECT password FROM users WHERE id = $1", &[&self.id]) 67 68 .await ··· 82 83 let password = 83 84 bcrypt::hash(password, bcrypt::DEFAULT_COST).map_err(|_| LuminaError::BcryptError)?; 84 85 match db { 85 - DbConn::PgsqlConnection((client, _), _) => { 86 + DbConn::PgsqlConnection(pg_pool, _) => { 87 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 86 88 // Some username and email validation should be done here 87 89 // Check if the email is already in use 88 90 let email_exists = client ··· 124 126 "username" 125 127 }; 126 128 match db { 127 - DbConn::PgsqlConnection((client, _), _) => { 129 + DbConn::PgsqlConnection(pg_pool, _) => { 130 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 128 131 let user = client 129 132 .query_one( 130 133 &format!("SELECT id, email, username, COALESCE(foreign_instance_id, '') FROM users WHERE {} = $1", identifyer_type), ··· 150 153 let user = self; 151 154 let user_id = user.id; 152 155 match db { 153 - DbConn::PgsqlConnection((client, _), _) => { 156 + DbConn::PgsqlConnection(pg_pool, _) => { 157 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 154 158 let session_key = Uuid::new_v4().to_string(); 155 159 let id = client 156 160 .query_one( ··· 180 184 db: &DbConn, 181 185 ) -> Result<User, LuminaError> { 182 186 match db { 183 - DbConn::PgsqlConnection((client, _), _) => { 187 + DbConn::PgsqlConnection(pg_pool, _) => { 188 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 184 189 let user = client 185 190 .query_one("SELECT users.id, 
users.email, users.username FROM users JOIN sessions ON users.id = sessions.user_id WHERE sessions.session_key = $1", &[&token]) 186 191 .await ··· 205 210 { 206 211 // Check if the email or username is already in use using fastbloom algorithm with Redis, and fallback to DB check if not found. If not in either, we can go on. 207 212 match db { 208 - DbConn::PgsqlConnection((client, _), redis_pool) => { 209 - let mut redis_conn = redis_pool.get().map_err(LuminaError::R2D2Pool)?; 213 + DbConn::PgsqlConnection(pg_pool, redis_pool) => { 214 + let client = pg_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 215 + let mut redis_conn = redis_pool.get().await.map_err(|e| LuminaError::Bb8Pool(e.to_string()))?; 210 216 // fastbloom_rs expects bytes, so we use the string as bytes 211 217 let email_key = String::from("bloom:email"); 212 218 let username_key = String::from("bloom:username"); 213 219 let email_exists: bool = redis::cmd("BF.EXISTS") 214 220 .arg(&email_key) 215 221 .arg(&email) 216 - .query(&mut *redis_conn) 222 + .query_async(&mut *redis_conn) 223 + .await 217 224 .unwrap_or(false); 218 225 if email_exists { 219 226 // Fallback to DB check if in bloom filter ··· 228 235 let username_exists: bool = redis::cmd("BF.EXISTS") 229 236 .arg(&username_key) 230 237 .arg(&username) 231 - .query(&mut *redis_conn) 238 + .query_async(&mut *redis_conn) 239 + .await 232 240 .unwrap_or(false); 233 241 if username_exists { 234 242 // Fallback to DB check if in bloom filter ··· 250 258 let _: () = redis::cmd("BF.ADD") 251 259 .arg(&email_key) 252 260 .arg(&email) 253 - .query(&mut *redis_conn) 261 + .query_async(&mut *redis_conn) 262 + .await 254 263 .unwrap_or(()); 255 264 return Err(LuminaError::RegisterEmailInUse); 256 265 } ··· 262 271 let _: () = redis::cmd("BF.ADD") 263 272 .arg(&username_key) 264 273 .arg(&username) 265 - .query(&mut *redis_conn) 274 + .query_async(&mut *redis_conn) 275 + .await 266 276 .unwrap_or(()); 267 277 return 
Err(LuminaError::RegisterUsernameInUse); 268 278 }