Learn how to use Rust to build ATProto powered applications
at main 14 kB view raw
1use actix_web::web::Data; 2use async_sqlite::{ 3 Pool, rusqlite, 4 rusqlite::{Error, Row}, 5}; 6use atrium_api::types::string::Did; 7use chrono::{DateTime, Datelike, Utc}; 8use rusqlite::types::Type; 9use serde::{Deserialize, Serialize}; 10use std::{fmt::Debug, sync::Arc}; 11 12/// Creates the tables in the db. 13pub async fn create_tables_in_database(pool: &Pool) -> Result<(), async_sqlite::Error> { 14 pool.conn(move |conn| { 15 conn.execute("PRAGMA foreign_keys = ON", []).unwrap(); 16 17 // status 18 conn.execute( 19 "CREATE TABLE IF NOT EXISTS status ( 20 uri TEXT PRIMARY KEY, 21 authorDid TEXT NOT NULL, 22 status TEXT NOT NULL, 23 createdAt INTEGER NOT NULL, 24 indexedAt INTEGER NOT NULL 25 )", 26 [], 27 ) 28 .unwrap(); 29 30 // auth_session 31 conn.execute( 32 "CREATE TABLE IF NOT EXISTS auth_session ( 33 key TEXT PRIMARY KEY, 34 session TEXT NOT NULL 35 )", 36 [], 37 ) 38 .unwrap(); 39 40 // auth_state 41 conn.execute( 42 "CREATE TABLE IF NOT EXISTS auth_state ( 43 key TEXT PRIMARY KEY, 44 state TEXT NOT NULL 45 )", 46 [], 47 ) 48 .unwrap(); 49 Ok(()) 50 }) 51 .await?; 52 Ok(()) 53} 54 55///Status table datatype 56#[derive(Debug, Clone, Deserialize, Serialize)] 57pub struct StatusFromDb { 58 pub uri: String, 59 pub author_did: String, 60 pub status: String, 61 pub created_at: DateTime<Utc>, 62 pub indexed_at: DateTime<Utc>, 63 pub handle: Option<String>, 64} 65 66//Status methods 67impl StatusFromDb { 68 /// Creates a new [StatusFromDb] 69 pub fn new(uri: String, author_did: String, status: String) -> Self { 70 let now = chrono::Utc::now(); 71 Self { 72 uri, 73 author_did, 74 status, 75 created_at: now, 76 indexed_at: now, 77 handle: None, 78 } 79 } 80 81 /// Helper to map from [Row] to [StatusDb] 82 fn map_from_row(row: &Row) -> Result<Self, rusqlite::Error> { 83 Ok(Self { 84 uri: row.get(0)?, 85 author_did: row.get(1)?, 86 status: row.get(2)?, 87 //DateTimes are stored as INTEGERS then parsed into a DateTime<UTC> 88 created_at: { 89 let timestamp: i64 = row.get(3)?; 90 DateTime::from_timestamp(timestamp, 0).ok_or_else(|| { 91 Error::InvalidColumnType(3, "Invalid timestamp".parse().unwrap(), Type::Text) 92 })? 93 }, 94 //DateTimes are stored as INTEGERS then parsed into a DateTime<UTC> 95 indexed_at: { 96 let timestamp: i64 = row.get(4)?; 97 DateTime::from_timestamp(timestamp, 0).ok_or_else(|| { 98 Error::InvalidColumnType(4, "Invalid timestamp".parse().unwrap(), Type::Text) 99 })? 100 }, 101 handle: None, 102 }) 103 } 104 105 /// Helper for the UI to see if indexed_at date is today or not 106 pub fn is_today(&self) -> bool { 107 let now = Utc::now(); 108 109 self.indexed_at.day() == now.day() 110 && self.indexed_at.month() == now.month() 111 && self.indexed_at.year() == now.year() 112 } 113 114 /// Saves the [StatusDb] 115 pub async fn save(&self, pool: Data<Arc<Pool>>) -> Result<(), async_sqlite::Error> { 116 let cloned_self = self.clone(); 117 pool.conn(move |conn| { 118 Ok(conn.execute( 119 "INSERT INTO status (uri, authorDid, status, createdAt, indexedAt) VALUES (?1, ?2, ?3, ?4, ?5)", 120 [ 121 &cloned_self.uri, 122 &cloned_self.author_did, 123 &cloned_self.status, 124 &cloned_self.created_at.timestamp().to_string(), 125 &cloned_self.indexed_at.timestamp().to_string(), 126 ], 127 )?) 128 }) 129 .await?; 130 Ok(()) 131 } 132 133 /// Saves or updates a status by its did(uri) 134 pub async fn save_or_update(&self, pool: &Pool) -> Result<(), async_sqlite::Error> { 135 let cloned_self = self.clone(); 136 pool.conn(move |conn| { 137 //We check to see if the session already exists, if so we need to update not insert 138 let mut stmt = conn.prepare("SELECT COUNT(*) FROM status WHERE uri = ?1")?; 139 let count: i64 = stmt.query_row([&cloned_self.uri], |row| row.get(0))?; 140 match count > 0 { 141 true => { 142 let mut update_stmt = 143 conn.prepare("UPDATE status SET status = ?2, indexedAt = ? WHERE uri = ?1")?; 144 update_stmt.execute([&cloned_self.uri, &cloned_self.status, &cloned_self.indexed_at.timestamp().to_string()])?; 145 Ok(()) 146 } 147 false => { 148 conn.execute( 149 "INSERT INTO status (uri, authorDid, status, createdAt, indexedAt) VALUES (?1, ?2, ?3, ?4, ?5)", 150 [ 151 &cloned_self.uri, 152 &cloned_self.author_did, 153 &cloned_self.status, 154 &cloned_self.created_at.timestamp().to_string(), 155 &cloned_self.indexed_at.timestamp().to_string(), 156 ], 157 )?; 158 Ok(()) 159 } 160 } 161 }) 162 .await?; 163 Ok(()) 164 } 165 pub async fn delete_by_uri(pool: &Pool, uri: String) -> Result<(), async_sqlite::Error> { 166 pool.conn(move |conn| { 167 let mut stmt = conn.prepare("DELETE FROM status WHERE uri = ?1")?; 168 stmt.execute([&uri]) 169 }) 170 .await?; 171 Ok(()) 172 } 173 174 /// Loads the last 10 statuses we have saved 175 pub async fn load_latest_statuses( 176 pool: &Data<Arc<Pool>>, 177 ) -> Result<Vec<Self>, async_sqlite::Error> { 178 Ok(pool 179 .conn(move |conn| { 180 let mut stmt = 181 conn.prepare("SELECT * FROM status ORDER BY indexedAt DESC LIMIT 10")?; 182 let status_iter = stmt 183 .query_map([], |row| Ok(Self::map_from_row(row).unwrap())) 184 .unwrap(); 185 186 let mut statuses = Vec::new(); 187 for status in status_iter { 188 statuses.push(status?); 189 } 190 Ok(statuses) 191 }) 192 .await?) 193 } 194 195 /// Loads the logged-in users current status 196 pub async fn my_status( 197 pool: &Data<Arc<Pool>>, 198 did: &str, 199 ) -> Result<Option<Self>, async_sqlite::Error> { 200 let did = did.to_string(); 201 pool.conn(move |conn| { 202 let mut stmt = conn.prepare( 203 "SELECT * FROM status WHERE authorDid = ?1 ORDER BY createdAt DESC LIMIT 1", 204 )?; 205 stmt.query_row([did.as_str()], |row| Self::map_from_row(row)) 206 .map(Some) 207 .or_else(|err| { 208 if err == rusqlite::Error::QueryReturnedNoRows { 209 Ok(None) 210 } else { 211 Err(err) 212 } 213 }) 214 }) 215 .await 216 } 217 218 /// ui helper to show a handle or did if the handle cannot be found 219 pub fn author_display_name(&self) -> String { 220 match self.handle.as_ref() { 221 Some(handle) => handle.to_string(), 222 None => self.author_did.to_string(), 223 } 224 } 225} 226 227/// AuthSession table data type 228#[derive(Debug, Clone, Deserialize, Serialize)] 229pub struct AuthSession { 230 pub key: String, 231 pub session: String, 232} 233 234impl AuthSession { 235 /// Creates a new [AuthSession] 236 pub fn new<V>(key: String, session: V) -> Self 237 where 238 V: Serialize, 239 { 240 let session = serde_json::to_string(&session).unwrap(); 241 Self { 242 key: key.to_string(), 243 session, 244 } 245 } 246 247 /// Helper to map from [Row] to [AuthSession] 248 fn map_from_row(row: &Row) -> Result<Self, Error> { 249 let key: String = row.get(0)?; 250 let session: String = row.get(1)?; 251 Ok(Self { key, session }) 252 } 253 254 /// Gets a session by the users did(key) 255 pub async fn get_by_did(pool: &Pool, did: String) -> Result<Option<Self>, async_sqlite::Error> { 256 let did = Did::new(did).unwrap(); 257 pool.conn(move |conn| { 258 let mut stmt = conn.prepare("SELECT * FROM auth_session WHERE key = ?1")?; 259 stmt.query_row([did.as_str()], |row| Self::map_from_row(row)) 260 .map(Some) 261 .or_else(|err| { 262 if err == Error::QueryReturnedNoRows { 263 Ok(None) 264 } else { 265 Err(err) 266 } 267 }) 268 }) 269 .await 270 } 271 272 /// Saves or updates the session by its did(key) 273 pub async fn save_or_update(&self, pool: &Pool) -> Result<(), async_sqlite::Error> { 274 let cloned_self = self.clone(); 275 pool.conn(move |conn| { 276 //We check to see if the session already exists, if so we need to update not insert 277 let mut stmt = conn.prepare("SELECT COUNT(*) FROM auth_session WHERE key = ?1")?; 278 let count: i64 = stmt.query_row([&cloned_self.key], |row| row.get(0))?; 279 match count > 0 { 280 true => { 281 let mut update_stmt = 282 conn.prepare("UPDATE auth_session SET session = ?2 WHERE key = ?1")?; 283 update_stmt.execute([&cloned_self.key, &cloned_self.session])?; 284 Ok(()) 285 } 286 false => { 287 conn.execute( 288 "INSERT INTO auth_session (key, session) VALUES (?1, ?2)", 289 [&cloned_self.key, &cloned_self.session], 290 )?; 291 Ok(()) 292 } 293 } 294 }) 295 .await?; 296 Ok(()) 297 } 298 299 /// Deletes the session by did 300 pub async fn delete_by_did(pool: &Pool, did: String) -> Result<(), async_sqlite::Error> { 301 pool.conn(move |conn| { 302 let mut stmt = conn.prepare("DELETE FROM auth_session WHERE key = ?1")?; 303 stmt.execute([&did]) 304 }) 305 .await?; 306 Ok(()) 307 } 308 309 /// Deletes all the sessions 310 pub async fn delete_all(pool: &Pool) -> Result<(), async_sqlite::Error> { 311 pool.conn(move |conn| { 312 let mut stmt = conn.prepare("DELETE FROM auth_session")?; 313 stmt.execute([]) 314 }) 315 .await?; 316 Ok(()) 317 } 318} 319 320/// AuthState table datatype 321#[derive(Debug, Clone, Deserialize, Serialize)] 322pub struct AuthState { 323 pub key: String, 324 pub state: String, 325} 326 327impl AuthState { 328 /// Creates a new [AuthState] 329 pub fn new<V>(key: String, state: V) -> Self 330 where 331 V: Serialize, 332 { 333 let state = serde_json::to_string(&state).unwrap(); 334 Self { 335 key: key.to_string(), 336 state, 337 } 338 } 339 340 /// Helper to map from [Row] to [AuthState] 341 fn map_from_row(row: &Row) -> Result<Self, Error> { 342 let key: String = row.get(0)?; 343 let state: String = row.get(1)?; 344 Ok(Self { key, state }) 345 } 346 347 /// Gets a state by the users key 348 pub async fn get_by_key(pool: &Pool, key: String) -> Result<Option<Self>, async_sqlite::Error> { 349 pool.conn(move |conn| { 350 let mut stmt = conn.prepare("SELECT * FROM auth_state WHERE key = ?1")?; 351 stmt.query_row([key.as_str()], |row| Self::map_from_row(row)) 352 .map(Some) 353 .or_else(|err| { 354 if err == Error::QueryReturnedNoRows { 355 Ok(None) 356 } else { 357 Err(err) 358 } 359 }) 360 }) 361 .await 362 } 363 364 /// Saves or updates the state by its key 365 pub async fn save_or_update(&self, pool: &Pool) -> Result<(), async_sqlite::Error> { 366 let cloned_self = self.clone(); 367 pool.conn(move |conn| { 368 //We check to see if the state already exists, if so we need to update 369 let mut stmt = conn.prepare("SELECT COUNT(*) FROM auth_state WHERE key = ?1")?; 370 let count: i64 = stmt.query_row([&cloned_self.key], |row| row.get(0))?; 371 match count > 0 { 372 true => { 373 let mut update_stmt = 374 conn.prepare("UPDATE auth_state SET state = ?2 WHERE key = ?1")?; 375 update_stmt.execute([&cloned_self.key, &cloned_self.state])?; 376 Ok(()) 377 } 378 false => { 379 conn.execute( 380 "INSERT INTO auth_state (key, state) VALUES (?1, ?2)", 381 [&cloned_self.key, &cloned_self.state], 382 )?; 383 Ok(()) 384 } 385 } 386 }) 387 .await?; 388 Ok(()) 389 } 390 391 pub async fn delete_by_key(pool: &Pool, key: String) -> Result<(), async_sqlite::Error> { 392 pool.conn(move |conn| { 393 let mut stmt = conn.prepare("DELETE FROM auth_state WHERE key = ?1")?; 394 stmt.execute([&key]) 395 }) 396 .await?; 397 Ok(()) 398 } 399 400 pub async fn delete_all(pool: &Pool) -> Result<(), async_sqlite::Error> { 401 pool.conn(move |conn| { 402 let mut stmt = conn.prepare("DELETE FROM auth_state")?; 403 stmt.execute([]) 404 }) 405 .await?; 406 Ok(()) 407 } 408}