Learn how to use Rust to build ATProto powered applications
1use actix_web::web::Data;
2use async_sqlite::{
3 Pool, rusqlite,
4 rusqlite::{Error, Row},
5};
6use atrium_api::types::string::Did;
7use chrono::{DateTime, Datelike, Utc};
8use rusqlite::types::Type;
9use serde::{Deserialize, Serialize};
10use std::{fmt::Debug, sync::Arc};
11
12/// Creates the tables in the db.
13pub async fn create_tables_in_database(pool: &Pool) -> Result<(), async_sqlite::Error> {
14 pool.conn(move |conn| {
15 conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
16
17 // status
18 conn.execute(
19 "CREATE TABLE IF NOT EXISTS status (
20 uri TEXT PRIMARY KEY,
21 authorDid TEXT NOT NULL,
22 status TEXT NOT NULL,
23 createdAt INTEGER NOT NULL,
24 indexedAt INTEGER NOT NULL
25 )",
26 [],
27 )
28 .unwrap();
29
30 // auth_session
31 conn.execute(
32 "CREATE TABLE IF NOT EXISTS auth_session (
33 key TEXT PRIMARY KEY,
34 session TEXT NOT NULL
35 )",
36 [],
37 )
38 .unwrap();
39
40 // auth_state
41 conn.execute(
42 "CREATE TABLE IF NOT EXISTS auth_state (
43 key TEXT PRIMARY KEY,
44 state TEXT NOT NULL
45 )",
46 [],
47 )
48 .unwrap();
49 Ok(())
50 })
51 .await?;
52 Ok(())
53}
54
55///Status table datatype
56#[derive(Debug, Clone, Deserialize, Serialize)]
57pub struct StatusFromDb {
58 pub uri: String,
59 pub author_did: String,
60 pub status: String,
61 pub created_at: DateTime<Utc>,
62 pub indexed_at: DateTime<Utc>,
63 pub handle: Option<String>,
64}
65
66//Status methods
67impl StatusFromDb {
68 /// Creates a new [StatusFromDb]
69 pub fn new(uri: String, author_did: String, status: String) -> Self {
70 let now = chrono::Utc::now();
71 Self {
72 uri,
73 author_did,
74 status,
75 created_at: now,
76 indexed_at: now,
77 handle: None,
78 }
79 }
80
81 /// Helper to map from [Row] to [StatusDb]
82 fn map_from_row(row: &Row) -> Result<Self, rusqlite::Error> {
83 Ok(Self {
84 uri: row.get(0)?,
85 author_did: row.get(1)?,
86 status: row.get(2)?,
87 //DateTimes are stored as INTEGERS then parsed into a DateTime<UTC>
88 created_at: {
89 let timestamp: i64 = row.get(3)?;
90 DateTime::from_timestamp(timestamp, 0).ok_or_else(|| {
91 Error::InvalidColumnType(3, "Invalid timestamp".parse().unwrap(), Type::Text)
92 })?
93 },
94 //DateTimes are stored as INTEGERS then parsed into a DateTime<UTC>
95 indexed_at: {
96 let timestamp: i64 = row.get(4)?;
97 DateTime::from_timestamp(timestamp, 0).ok_or_else(|| {
98 Error::InvalidColumnType(4, "Invalid timestamp".parse().unwrap(), Type::Text)
99 })?
100 },
101 handle: None,
102 })
103 }
104
105 /// Helper for the UI to see if indexed_at date is today or not
106 pub fn is_today(&self) -> bool {
107 let now = Utc::now();
108
109 self.indexed_at.day() == now.day()
110 && self.indexed_at.month() == now.month()
111 && self.indexed_at.year() == now.year()
112 }
113
114 /// Saves the [StatusDb]
115 pub async fn save(&self, pool: Data<Arc<Pool>>) -> Result<(), async_sqlite::Error> {
116 let cloned_self = self.clone();
117 pool.conn(move |conn| {
118 Ok(conn.execute(
119 "INSERT INTO status (uri, authorDid, status, createdAt, indexedAt) VALUES (?1, ?2, ?3, ?4, ?5)",
120 [
121 &cloned_self.uri,
122 &cloned_self.author_did,
123 &cloned_self.status,
124 &cloned_self.created_at.timestamp().to_string(),
125 &cloned_self.indexed_at.timestamp().to_string(),
126 ],
127 )?)
128 })
129 .await?;
130 Ok(())
131 }
132
133 /// Saves or updates a status by its did(uri)
134 pub async fn save_or_update(&self, pool: &Pool) -> Result<(), async_sqlite::Error> {
135 let cloned_self = self.clone();
136 pool.conn(move |conn| {
137 //We check to see if the session already exists, if so we need to update not insert
138 let mut stmt = conn.prepare("SELECT COUNT(*) FROM status WHERE uri = ?1")?;
139 let count: i64 = stmt.query_row([&cloned_self.uri], |row| row.get(0))?;
140 match count > 0 {
141 true => {
142 let mut update_stmt =
143 conn.prepare("UPDATE status SET status = ?2, indexedAt = ? WHERE uri = ?1")?;
144 update_stmt.execute([&cloned_self.uri, &cloned_self.status, &cloned_self.indexed_at.timestamp().to_string()])?;
145 Ok(())
146 }
147 false => {
148 conn.execute(
149 "INSERT INTO status (uri, authorDid, status, createdAt, indexedAt) VALUES (?1, ?2, ?3, ?4, ?5)",
150 [
151 &cloned_self.uri,
152 &cloned_self.author_did,
153 &cloned_self.status,
154 &cloned_self.created_at.timestamp().to_string(),
155 &cloned_self.indexed_at.timestamp().to_string(),
156 ],
157 )?;
158 Ok(())
159 }
160 }
161 })
162 .await?;
163 Ok(())
164 }
165 pub async fn delete_by_uri(pool: &Pool, uri: String) -> Result<(), async_sqlite::Error> {
166 pool.conn(move |conn| {
167 let mut stmt = conn.prepare("DELETE FROM status WHERE uri = ?1")?;
168 stmt.execute([&uri])
169 })
170 .await?;
171 Ok(())
172 }
173
174 /// Loads the last 10 statuses we have saved
175 pub async fn load_latest_statuses(
176 pool: &Data<Arc<Pool>>,
177 ) -> Result<Vec<Self>, async_sqlite::Error> {
178 Ok(pool
179 .conn(move |conn| {
180 let mut stmt =
181 conn.prepare("SELECT * FROM status ORDER BY indexedAt DESC LIMIT 10")?;
182 let status_iter = stmt
183 .query_map([], |row| Ok(Self::map_from_row(row).unwrap()))
184 .unwrap();
185
186 let mut statuses = Vec::new();
187 for status in status_iter {
188 statuses.push(status?);
189 }
190 Ok(statuses)
191 })
192 .await?)
193 }
194
195 /// Loads the logged-in users current status
196 pub async fn my_status(
197 pool: &Data<Arc<Pool>>,
198 did: &str,
199 ) -> Result<Option<Self>, async_sqlite::Error> {
200 let did = did.to_string();
201 pool.conn(move |conn| {
202 let mut stmt = conn.prepare(
203 "SELECT * FROM status WHERE authorDid = ?1 ORDER BY createdAt DESC LIMIT 1",
204 )?;
205 stmt.query_row([did.as_str()], |row| Self::map_from_row(row))
206 .map(Some)
207 .or_else(|err| {
208 if err == rusqlite::Error::QueryReturnedNoRows {
209 Ok(None)
210 } else {
211 Err(err)
212 }
213 })
214 })
215 .await
216 }
217
218 /// ui helper to show a handle or did if the handle cannot be found
219 pub fn author_display_name(&self) -> String {
220 match self.handle.as_ref() {
221 Some(handle) => handle.to_string(),
222 None => self.author_did.to_string(),
223 }
224 }
225}
226
227/// AuthSession table data type
228#[derive(Debug, Clone, Deserialize, Serialize)]
229pub struct AuthSession {
230 pub key: String,
231 pub session: String,
232}
233
234impl AuthSession {
235 /// Creates a new [AuthSession]
236 pub fn new<V>(key: String, session: V) -> Self
237 where
238 V: Serialize,
239 {
240 let session = serde_json::to_string(&session).unwrap();
241 Self {
242 key: key.to_string(),
243 session,
244 }
245 }
246
247 /// Helper to map from [Row] to [AuthSession]
248 fn map_from_row(row: &Row) -> Result<Self, Error> {
249 let key: String = row.get(0)?;
250 let session: String = row.get(1)?;
251 Ok(Self { key, session })
252 }
253
254 /// Gets a session by the users did(key)
255 pub async fn get_by_did(pool: &Pool, did: String) -> Result<Option<Self>, async_sqlite::Error> {
256 let did = Did::new(did).unwrap();
257 pool.conn(move |conn| {
258 let mut stmt = conn.prepare("SELECT * FROM auth_session WHERE key = ?1")?;
259 stmt.query_row([did.as_str()], |row| Self::map_from_row(row))
260 .map(Some)
261 .or_else(|err| {
262 if err == Error::QueryReturnedNoRows {
263 Ok(None)
264 } else {
265 Err(err)
266 }
267 })
268 })
269 .await
270 }
271
272 /// Saves or updates the session by its did(key)
273 pub async fn save_or_update(&self, pool: &Pool) -> Result<(), async_sqlite::Error> {
274 let cloned_self = self.clone();
275 pool.conn(move |conn| {
276 //We check to see if the session already exists, if so we need to update not insert
277 let mut stmt = conn.prepare("SELECT COUNT(*) FROM auth_session WHERE key = ?1")?;
278 let count: i64 = stmt.query_row([&cloned_self.key], |row| row.get(0))?;
279 match count > 0 {
280 true => {
281 let mut update_stmt =
282 conn.prepare("UPDATE auth_session SET session = ?2 WHERE key = ?1")?;
283 update_stmt.execute([&cloned_self.key, &cloned_self.session])?;
284 Ok(())
285 }
286 false => {
287 conn.execute(
288 "INSERT INTO auth_session (key, session) VALUES (?1, ?2)",
289 [&cloned_self.key, &cloned_self.session],
290 )?;
291 Ok(())
292 }
293 }
294 })
295 .await?;
296 Ok(())
297 }
298
299 /// Deletes the session by did
300 pub async fn delete_by_did(pool: &Pool, did: String) -> Result<(), async_sqlite::Error> {
301 pool.conn(move |conn| {
302 let mut stmt = conn.prepare("DELETE FROM auth_session WHERE key = ?1")?;
303 stmt.execute([&did])
304 })
305 .await?;
306 Ok(())
307 }
308
309 /// Deletes all the sessions
310 pub async fn delete_all(pool: &Pool) -> Result<(), async_sqlite::Error> {
311 pool.conn(move |conn| {
312 let mut stmt = conn.prepare("DELETE FROM auth_session")?;
313 stmt.execute([])
314 })
315 .await?;
316 Ok(())
317 }
318}
319
320/// AuthState table datatype
321#[derive(Debug, Clone, Deserialize, Serialize)]
322pub struct AuthState {
323 pub key: String,
324 pub state: String,
325}
326
327impl AuthState {
328 /// Creates a new [AuthState]
329 pub fn new<V>(key: String, state: V) -> Self
330 where
331 V: Serialize,
332 {
333 let state = serde_json::to_string(&state).unwrap();
334 Self {
335 key: key.to_string(),
336 state,
337 }
338 }
339
340 /// Helper to map from [Row] to [AuthState]
341 fn map_from_row(row: &Row) -> Result<Self, Error> {
342 let key: String = row.get(0)?;
343 let state: String = row.get(1)?;
344 Ok(Self { key, state })
345 }
346
347 /// Gets a state by the users key
348 pub async fn get_by_key(pool: &Pool, key: String) -> Result<Option<Self>, async_sqlite::Error> {
349 pool.conn(move |conn| {
350 let mut stmt = conn.prepare("SELECT * FROM auth_state WHERE key = ?1")?;
351 stmt.query_row([key.as_str()], |row| Self::map_from_row(row))
352 .map(Some)
353 .or_else(|err| {
354 if err == Error::QueryReturnedNoRows {
355 Ok(None)
356 } else {
357 Err(err)
358 }
359 })
360 })
361 .await
362 }
363
364 /// Saves or updates the state by its key
365 pub async fn save_or_update(&self, pool: &Pool) -> Result<(), async_sqlite::Error> {
366 let cloned_self = self.clone();
367 pool.conn(move |conn| {
368 //We check to see if the state already exists, if so we need to update
369 let mut stmt = conn.prepare("SELECT COUNT(*) FROM auth_state WHERE key = ?1")?;
370 let count: i64 = stmt.query_row([&cloned_self.key], |row| row.get(0))?;
371 match count > 0 {
372 true => {
373 let mut update_stmt =
374 conn.prepare("UPDATE auth_state SET state = ?2 WHERE key = ?1")?;
375 update_stmt.execute([&cloned_self.key, &cloned_self.state])?;
376 Ok(())
377 }
378 false => {
379 conn.execute(
380 "INSERT INTO auth_state (key, state) VALUES (?1, ?2)",
381 [&cloned_self.key, &cloned_self.state],
382 )?;
383 Ok(())
384 }
385 }
386 })
387 .await?;
388 Ok(())
389 }
390
391 pub async fn delete_by_key(pool: &Pool, key: String) -> Result<(), async_sqlite::Error> {
392 pool.conn(move |conn| {
393 let mut stmt = conn.prepare("DELETE FROM auth_state WHERE key = ?1")?;
394 stmt.execute([&key])
395 })
396 .await?;
397 Ok(())
398 }
399
400 pub async fn delete_all(pool: &Pool) -> Result<(), async_sqlite::Error> {
401 pool.conn(move |conn| {
402 let mut stmt = conn.prepare("DELETE FROM auth_state")?;
403 stmt.execute([])
404 })
405 .await?;
406 Ok(())
407 }
408}