//! This module is responsible for handling the database operations. use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool}, Executor}; use std::str::FromStr; use crate::{types, webrequest::Agent}; /// The main function for the database module. #[tracing::instrument] pub async fn main_database() -> Result<(), Box> { const STATE: u8 = 0; let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.") .journal_mode(SqliteJournalMode::Wal); let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed."); match STATE { 0 => { if initialize_database(&pool).await.is_err() { tracing::debug!("Database already initialized"); } }, 1 => { validate_labels(&mut Agent::default(), &pool).await? }, _ => (), } Ok(()) } async fn initialize_database(pool: &SqlitePool) -> Result<(), sqlx::Error> { tracing::debug!("Initializing database"); let mut connection = pool.acquire().await?; _ = connection.execute("PRAGMA foreign_keys=on").await?; _ = connection .execute("CREATE TABLE profile (did STRING PRIMARY KEY)") .await?; _ = connection .execute( "CREATE TABLE profile_stats ( did STRING PRIMARY KEY, created_at DATETIME NOT NULL, follower_count INTEGER NOT NULL, post_count INTEGER NOT NULL, checked_at DATETIME NOT NULL, FOREIGN KEY(did) REFERENCES profile (did) )", ) .await?; _ = connection .execute( "CREATE TABLE profile_labels ( seq INTEGER PRIMARY KEY AUTOINCREMENT, uri STRING NOT NULL, cid STRING, val STRING NOT NULL, neg BOOLEAN, cts DATETIME NOT NULL, exp DATETIME, sig BLOB NOT NULL, FOREIGN KEY(uri) REFERENCES profile (did) )", ) .await?; tracing::info!("Database initialized"); Ok(()) } /// Negate a label by its DID. pub async fn negation(negation_id: &str, agent: &mut Agent, pool: &SqlitePool) -> Result<(), Box> { let existing_label = sqlx::query!( r#"SELECT seq FROM profile_labels WHERE uri = ? AND neg is not true "#, negation_id, ).fetch_optional(pool) .await.expect("Expected to be able to fetch the label, but failed."); tracing::info!("Existing label: {:?}", existing_label); if let Some(label) = existing_label { tracing::info!("Removing label for {}", negation_id); drop(types::Profile::remove_label(pool, label.seq).await); } tracing::info!("Inserting negate for {}", negation_id); drop(types::Profile::new(negation_id).determine_stats(agent, pool).await.negate_label(pool).await); Ok(()) } /// Iterate over all rows of profile_labels and validate the labels. async fn validate_labels(agent: &mut Agent, pool: &SqlitePool) -> Result<(), Box> { // // cleanup duplicates // let uris_with_duplicates = sqlx::query!( // r#"SELECT uri FROM profile_labels // GROUP BY uri // HAVING COUNT(*) > 1 // "#, // ).fetch_all(pool) // .await.expect("Expected to be able to fetch all labels, but failed."); // for uri in uris_with_duplicates { // let seqs = sqlx::query!( // r#"SELECT seq FROM profile_labels // WHERE uri = ? // ORDER BY seq ASC // "#, // uri.uri, // ).fetch_all(pool) // .await.expect("Expected to be able to fetch all labels, but failed."); // let mut first = true; // for seq in seqs { // if first { // first = false; // continue; // } // let seq = seq.seq; // let _ = sqlx::query!( // r#"DELETE FROM profile_labels // WHERE seq = ? // "#, // seq, // ).execute(pool).await.expect("Expected to be able to delete the label, but failed."); // } // } let mut valid = 0; let mut invalid = 0; let mut invalid_list = Vec::new(); let mut unreachable = 0; let start_from_seq = 222_081; // TODO: argumentize this let go_to_seq = 222_144; // TODO: argumentize this let labels = sqlx::query!( r#"SELECT seq, uri "uri: String", neg FROM profile_labels WHERE seq > ? AND seq < ? "#, start_from_seq, go_to_seq, ) .fetch_all(pool) .await.expect("Expected to be able to fetch all labels, but failed."); let num_of_labels = labels.len(); tracing::info!("Validating {} labels", num_of_labels); let mut count: u32 = 0; // We'll be collecting 25 labels at a time, and then checking them with get_profiles, to avoid rate limiting. let mut label_list = Vec::new(); const FETCH_AMOUNT: usize = 25; for label in labels { if label.neg == Some(true) { tracing::info!("Skipping negative label seq {} for https://bsky.app/profile/{}", label.seq, label.uri); continue; } label_list.push((label.uri, label.seq)); if label_list.len() == FETCH_AMOUNT { let fetched_labels = agent.check_profiles(label_list.as_slice()).await.expect("Expected to be able to check the profiles, but failed."); if fetched_labels.len() != FETCH_AMOUNT { tracing::warn!("Expected to get {} labels, but got {}", FETCH_AMOUNT, fetched_labels.len()); unreachable += FETCH_AMOUNT - fetched_labels.len(); // TODO: Figure out which ones are missing } for fetched_label in fetched_labels.iter() { let uri = fetched_label.1.0.as_str(); let seq = fetched_label.1.1; count += 1; if count % 100 == 0 { tracing::info!("Validating label count {}, seq {}", count, seq); } if fetched_label.0 { tracing::debug!("Valid label seq {} for https://bsky.app/profile/{}", seq, uri); valid += 1; } else { tracing::warn!("Invalid label seq {} for https://bsky.app/profile/{}", seq, uri); invalid += 1; invalid_list.push(format!("https://bsky.app/profile/{}", uri)); // let mut profile = types::Profile::new(uri); // let spawned_pool = pool.clone(); // // drop(tokio::spawn(async move { // if types::Profile::remove_label(&spawned_pool, seq).await.is_ok() // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect( // "Expected to be able to determine if stats exist, but failed.").is_some() { // _ = profile.determine_label(&spawned_pool).await; // tracing::debug!("Label removed and profile revalidated for https://bsky.app/profile/{}", uri); // } else { // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", seq, uri); // } // })); } // { // tracing::warn!("Failed to get profile https://bsky.app/profile/{}", uri); // unreachable += 1; // let spawned_pool = pool.clone(); // // drop(tokio::spawn(async move { // if types::Profile::remove_label(&spawned_pool, seq).await.is_err() { // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", seq, uri); // } // // })); // } } label_list.clear(); // tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; } } // for label in labels { // tokio::time::sleep(tokio::time::Duration::from_millis(25)).await; // Sleep for 25ms to throttle rate limiting // count += 1; // if count % 100 == 0 { // tracing::info!("Validating label count {}, seq {}", count, label.seq); // } // tracing::debug!("Validating label seq {} for {}", label.seq, label.uri); // let valid_label = agent.check_profile(&label.uri).await; // if valid_label.is_ok() { // if valid_label.expect("Expected to be able to check the profile, but failed.") { // valid += 1; // } else { // tracing::warn!("Invalid label seq {} for https://bsky.app/profile/{}", label.seq, label.uri); // invalid += 1; // invalid_list.push(format!("https://bsky.app/profile/{}", label.uri)); // let mut profile = crate::types::Profile::new(&label.uri); // let spawned_pool = pool.clone(); // drop(tokio::spawn(async move { // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_ok() // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect( // "Expected to be able to determine if stats exist, but failed.").is_some() { // _ = profile.determine_label(&spawned_pool).await; // tracing::debug!("Label removed and profile revalidated for https://bsky.app/profile/{}", label.uri); // } else { // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri); // } // })); // } // } else { // let valid_label2 = agent.check_profile(&label.uri).await; // if valid_label2.is_ok() { // if valid_label2.expect("Expected to be able to check the profile, but failed.") { // tracing::warn!("Had to retry for profile https://bsky.app/profile/{}", label.uri); // valid += 1; // } else { // tracing::warn!("Invalid label {} for https://bsky.app/profile/{}", label.seq, label.uri); // invalid += 1; // invalid_list.push(format!("https://bsky.app/profile/{}", label.uri)); // let mut profile = crate::types::Profile::new(&label.uri); // let spawned_pool = pool.clone(); // drop(tokio::spawn(async move { // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_ok() // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect( // "Expected to be able to determine if stats exist, but failed.").is_none() { // _ = profile.determine_label(&spawned_pool).await; // tracing::info!("Label removed and profile revalidated for https://bsky.app/profile/{}", label.uri); // } else { // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri); // } // })); // } // } else { // tracing::warn!("Failed to get profile https://bsky.app/profile/{}", label.uri); // unreachable += 1; // let spawned_pool = pool.clone(); // drop(tokio::spawn(async move { // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_err() { // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri); // } // })); // } // } // } tracing::info!("Valid labels: {}", valid); tracing::info!("Invalid labels: {}", invalid); tracing::info!("List of invalid labels: {:?}", invalid_list); tracing::info!("Unreachable labels: {}", unreachable); Ok(()) }