Personal ATProto tools.
at main 262 lines 12 kB view raw
1//! This module is responsible for handling the database operations. 2 3use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool}, Executor}; 4use std::str::FromStr; 5 6use crate::{types, webrequest::Agent}; 7 8/// The main function for the database module. 9#[tracing::instrument] 10pub async fn main_database() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 11 const STATE: u8 = 0; 12 let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.") 13 .journal_mode(SqliteJournalMode::Wal); 14 let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed."); 15 match STATE { 16 0 => { 17 if initialize_database(&pool).await.is_err() { 18 tracing::debug!("Database already initialized"); 19 } 20 }, 21 1 => { 22 validate_labels(&mut Agent::default(), &pool).await? 23 }, 24 _ => (), 25 } 26 Ok(()) 27} 28async fn initialize_database(pool: &SqlitePool) -> Result<(), sqlx::Error> { 29 tracing::debug!("Initializing database"); 30 let mut connection = pool.acquire().await?; 31 _ = connection.execute("PRAGMA foreign_keys=on").await?; 32 _ = connection 33 .execute("CREATE TABLE profile (did STRING PRIMARY KEY)") 34 .await?; 35 _ = connection 36 .execute( 37 "CREATE TABLE profile_stats ( 38 did STRING PRIMARY KEY, 39 created_at DATETIME NOT NULL, 40 follower_count INTEGER NOT NULL, 41 post_count INTEGER NOT NULL, 42 checked_at DATETIME NOT NULL, 43 FOREIGN KEY(did) REFERENCES profile (did) 44 )", 45 ) 46 .await?; 47 _ = connection 48 .execute( 49 "CREATE TABLE profile_labels ( 50 seq INTEGER PRIMARY KEY AUTOINCREMENT, 51 uri STRING NOT NULL, 52 cid STRING, 53 val STRING NOT NULL, 54 neg BOOLEAN, 55 cts DATETIME NOT NULL, 56 exp DATETIME, 57 sig BLOB NOT NULL, 58 FOREIGN KEY(uri) REFERENCES profile (did) 59 )", 60 ) 61 .await?; 62 tracing::info!("Database initialized"); 63 Ok(()) 64} 65/// Negate a label by its DID. 66pub async fn negation(negation_id: &str, agent: &mut Agent, pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 67 let existing_label = sqlx::query!( 68 r#"SELECT seq FROM profile_labels 69 WHERE uri = ? AND neg is not true 70 "#, 71 negation_id, 72 ).fetch_optional(pool) 73 .await.expect("Expected to be able to fetch the label, but failed."); 74 tracing::info!("Existing label: {:?}", existing_label); 75 if let Some(label) = existing_label { 76 tracing::info!("Removing label for {}", negation_id); 77 drop(types::Profile::remove_label(pool, label.seq).await); 78 } 79 tracing::info!("Inserting negate for {}", negation_id); 80 drop(types::Profile::new(negation_id).determine_stats(agent, pool).await.negate_label(pool).await); 81 Ok(()) 82} 83/// Iterate over all rows of profile_labels and validate the labels. 84async fn validate_labels(agent: &mut Agent, pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 85 // // cleanup duplicates 86 // let uris_with_duplicates = sqlx::query!( 87 // r#"SELECT uri FROM profile_labels 88 // GROUP BY uri 89 // HAVING COUNT(*) > 1 90 // "#, 91 // ).fetch_all(pool) 92 // .await.expect("Expected to be able to fetch all labels, but failed."); 93 // for uri in uris_with_duplicates { 94 // let seqs = sqlx::query!( 95 // r#"SELECT seq FROM profile_labels 96 // WHERE uri = ? 97 // ORDER BY seq ASC 98 // "#, 99 // uri.uri, 100 // ).fetch_all(pool) 101 // .await.expect("Expected to be able to fetch all labels, but failed."); 102 // let mut first = true; 103 // for seq in seqs { 104 // if first { 105 // first = false; 106 // continue; 107 // } 108 // let seq = seq.seq; 109 // let _ = sqlx::query!( 110 // r#"DELETE FROM profile_labels 111 // WHERE seq = ? 112 // "#, 113 // seq, 114 // ).execute(pool).await.expect("Expected to be able to delete the label, but failed."); 115 // } 116 // } 117 let mut valid = 0; 118 let mut invalid = 0; 119 let mut invalid_list = Vec::new(); 120 let mut unreachable = 0; 121 let start_from_seq = 222_081; // TODO: argumentize this 122 let go_to_seq = 222_144; // TODO: argumentize this 123 let labels = sqlx::query!( 124 r#"SELECT seq, uri "uri: String", neg FROM profile_labels 125 WHERE seq > ? AND seq < ? 126 "#, 127 start_from_seq, 128 go_to_seq, 129 ) 130 .fetch_all(pool) 131 .await.expect("Expected to be able to fetch all labels, but failed."); 132 let num_of_labels = labels.len(); 133 tracing::info!("Validating {} labels", num_of_labels); 134 let mut count: u32 = 0; 135 // We'll be collecting 25 labels at a time, and then checking them with get_profiles, to avoid rate limiting. 136 let mut label_list = Vec::new(); 137 const FETCH_AMOUNT: usize = 25; 138 for label in labels { 139 if label.neg == Some(true) { 140 tracing::info!("Skipping negative label seq {} for https://bsky.app/profile/{}", label.seq, label.uri); 141 continue; 142 } 143 label_list.push((label.uri, label.seq)); 144 if label_list.len() == FETCH_AMOUNT { 145 let fetched_labels = agent.check_profiles(label_list.as_slice()).await.expect("Expected to be able to check the profiles, but failed."); 146 if fetched_labels.len() != FETCH_AMOUNT { 147 tracing::warn!("Expected to get {} labels, but got {}", FETCH_AMOUNT, fetched_labels.len()); 148 unreachable += FETCH_AMOUNT - fetched_labels.len(); 149 // TODO: Figure out which ones are missing 150 } 151 for fetched_label in fetched_labels.iter() { 152 let uri = fetched_label.1.0.as_str(); 153 let seq = fetched_label.1.1; 154 count += 1; 155 if count % 100 == 0 { 156 tracing::info!("Validating label count {}, seq {}", count, seq); 157 } 158 if fetched_label.0 { 159 tracing::debug!("Valid label seq {} for https://bsky.app/profile/{}", seq, uri); 160 valid += 1; 161 } else { 162 tracing::warn!("Invalid label seq {} for https://bsky.app/profile/{}", seq, uri); 163 invalid += 1; 164 invalid_list.push(format!("https://bsky.app/profile/{}", uri)); 165 // let mut profile = types::Profile::new(uri); 166 // let spawned_pool = pool.clone(); 167 // // drop(tokio::spawn(async move { 168 // if types::Profile::remove_label(&spawned_pool, seq).await.is_ok() 169 // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect( 170 // "Expected to be able to determine if stats exist, but failed.").is_some() { 171 // _ = profile.determine_label(&spawned_pool).await; 172 // tracing::debug!("Label removed and profile revalidated for https://bsky.app/profile/{}", uri); 173 // } else { 174 // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", seq, uri); 175 // } 176 // })); 177 } 178 // { 179 // tracing::warn!("Failed to get profile https://bsky.app/profile/{}", uri); 180 // unreachable += 1; 181 // let spawned_pool = pool.clone(); 182 // // drop(tokio::spawn(async move { 183 // if types::Profile::remove_label(&spawned_pool, seq).await.is_err() { 184 // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", seq, uri); 185 // } 186 // // })); 187 // } 188 } 189 label_list.clear(); 190 // tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; 191 } 192 } 193 194 // for label in labels { 195 // tokio::time::sleep(tokio::time::Duration::from_millis(25)).await; // Sleep for 25ms to throttle rate limiting 196 // count += 1; 197 // if count % 100 == 0 { 198 // tracing::info!("Validating label count {}, seq {}", count, label.seq); 199 // } 200 // tracing::debug!("Validating label seq {} for {}", label.seq, label.uri); 201 // let valid_label = agent.check_profile(&label.uri).await; 202 // if valid_label.is_ok() { 203 // if valid_label.expect("Expected to be able to check the profile, but failed.") { 204 // valid += 1; 205 // } else { 206 // tracing::warn!("Invalid label seq {} for https://bsky.app/profile/{}", label.seq, label.uri); 207 // invalid += 1; 208 // invalid_list.push(format!("https://bsky.app/profile/{}", label.uri)); 209 // let mut profile = crate::types::Profile::new(&label.uri); 210 // let spawned_pool = pool.clone(); 211 // drop(tokio::spawn(async move { 212 // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_ok() 213 // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect( 214 // "Expected to be able to determine if stats exist, but failed.").is_some() { 215 // _ = profile.determine_label(&spawned_pool).await; 216 // tracing::debug!("Label removed and profile revalidated for https://bsky.app/profile/{}", label.uri); 217 // } else { 218 // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri); 219 // } 220 // })); 221 // } 222 // } else { 223 // let valid_label2 = agent.check_profile(&label.uri).await; 224 // if valid_label2.is_ok() { 225 // if valid_label2.expect("Expected to be able to check the profile, but failed.") { 226 // tracing::warn!("Had to retry for profile https://bsky.app/profile/{}", label.uri); 227 // valid += 1; 228 // } else { 229 // tracing::warn!("Invalid label {} for https://bsky.app/profile/{}", label.seq, label.uri); 230 // invalid += 1; 231 // invalid_list.push(format!("https://bsky.app/profile/{}", label.uri)); 232 // let mut profile = crate::types::Profile::new(&label.uri); 233 // let spawned_pool = pool.clone(); 234 // drop(tokio::spawn(async move { 235 // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_ok() 236 // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect( 237 // "Expected to be able to determine if stats exist, but failed.").is_none() { 238 // _ = profile.determine_label(&spawned_pool).await; 239 // tracing::info!("Label removed and profile revalidated for https://bsky.app/profile/{}", label.uri); 240 // } else { 241 // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri); 242 // } 243 // })); 244 // } 245 // } else { 246 // tracing::warn!("Failed to get profile https://bsky.app/profile/{}", label.uri); 247 // unreachable += 1; 248 // let spawned_pool = pool.clone(); 249 // drop(tokio::spawn(async move { 250 // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_err() { 251 // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri); 252 // } 253 // })); 254 // } 255 // } 256 // } 257 tracing::info!("Valid labels: {}", valid); 258 tracing::info!("Invalid labels: {}", invalid); 259 tracing::info!("List of invalid labels: {:?}", invalid_list); 260 tracing::info!("Unreachable labels: {}", unreachable); 261 Ok(()) 262}