use crate::core::{ActorBackend, StorageBackend, StorageError}; use crate::records::{Follow, Like, Post, Profile, Repost}; use async_trait::async_trait; use deadpool_diesel::postgres::Pool; use parakeet_db::{types, utils::tid}; use std::sync::Arc; /// PostgreSQL storage backend implementation leveraging parakeet-db pub struct PostgresBackend { pool: Arc, } impl PostgresBackend { /// Create a new PostgreSQL backend with a connection pool pub fn new(database_url: &str) -> Result { let manager = deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1); let pool = Pool::builder(manager) .build() .map_err(|e| StorageError::Connection(e.to_string()))?; Ok(Self { pool: Arc::new(pool), }) } /// Convert CID string to bytes (simplified) fn cid_to_bytes(&self, cid: &str) -> Vec { // TODO: Use parakeet_db::utils::cid functions let mut bytes = cid.bytes().take(32).collect::>(); bytes.resize(32, 0); bytes } } #[async_trait] impl StorageBackend for PostgresBackend { async fn upsert_post(&self, post: &Post<'static>) -> Result<(), StorageError> { // Parse the AT URI to extract rkey let parts: Vec<&str> = post.uri.split('/').collect(); if parts.len() < 5 { return Err(StorageError::Parse(format!("Invalid AT URI: {}", post.uri))); } let rkey_str = parts[4]; // Convert TID to i64 using parakeet-db's utility let rkey = tid::decode_tid(rkey_str) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Parse CID let cid_bytes = self.cid_to_bytes(&post.cid); // Serialize the post content let content_json = serde_json::to_value(&post.post).map_err(|e| StorageError::Parse(e.to_string()))?; let content_bytes = serde_json::to_vec(&content_json).map_err(|e| StorageError::Parse(e.to_string()))?; // Log the operation (TODO: execute the actual query) tracing::info!( "Would upsert post: actor_id={}, rkey={}, cid_len={}, content_len={}, status={:?}", post.actor_id, rkey, cid_bytes.len(), content_bytes.len(), types::PostStatus::Complete ); // TODO: uncomment this to actually write to the database: /* let mut conn = self.pool.get().await .map_err(|e| StorageError::Connection(e.to_string()))?; use diesel::prelude::*; use diesel_async::RunQueryDsl; use parakeet_db::schema; diesel::insert_into(schema::posts::table) .values(( schema::posts::actor_id.eq(post.actor_id as i32), schema::posts::rkey.eq(rkey), schema::posts::cid.eq(cid_bytes), schema::posts::content.eq(Some(content_bytes)), schema::posts::status.eq(types::PostStatus::Complete), schema::posts::langs.eq(Vec::>::new()), schema::posts::tags.eq(Vec::>::new()), schema::posts::violates_threadgate.eq(false), )) .on_conflict((schema::posts::actor_id, schema::posts::rkey)) .do_update() .set(( schema::posts::cid.eq(cid_bytes), schema::posts::content.eq(Some(content_bytes)), schema::posts::status.eq(types::PostStatus::Complete), )) .execute(&mut conn) .await .map_err(|e| StorageError::Query(e.to_string()))?; */ Ok(()) } async fn upsert_profile(&self, profile: &Profile<'static>) -> Result<(), StorageError> { // Parse CID let profile_cid = self.cid_to_bytes(&profile.cid); // Log the operation tracing::info!( "Would upsert profile: actor_id={}, cid_len={}, display_name={:?}, sync_state={:?}", profile.actor_id, profile_cid.len(), profile.profile.display_name, types::ActorSyncState::Synced ); // TODO: uncomment to update the actors table /* let mut conn = self.pool.get().await .map_err(|e| StorageError::Connection(e.to_string()))?; use diesel::prelude::*; use diesel_async::RunQueryDsl; use parakeet_db::schema; diesel::update(schema::actors::table) .filter(schema::actors::id.eq(profile.actor_id as i32)) .set(( schema::actors::profile_cid.eq(Some(profile_cid)), schema::actors::profile_display_name.eq(profile.profile.display_name.as_deref()), schema::actors::profile_description.eq(profile.profile.description.as_deref()), schema::actors::sync_state.eq(types::ActorSyncState::Synced), )) .execute(&mut conn) .await .map_err(|e| StorageError::Query(e.to_string()))?; */ Ok(()) } async fn create_follow(&self, follow: &Follow<'static>) -> Result<(), StorageError> { // Parse the subject DID to get the target actor let target_did = &follow.follow.subject; // Parse rkey (TID) let parts: Vec<&str> = follow.uri.split('/').collect(); if parts.len() < 5 { return Err(StorageError::Parse(format!( "Invalid AT URI: {}", follow.uri ))); } let rkey = tid::decode_tid(parts[4]) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Log the operation tracing::info!( "Would create follow: actor {} follows {} (rkey: {})", follow.actor_id, target_did, rkey ); // TODO: look up the target actor and update graph relationships Ok(()) } async fn create_like(&self, like: &Like<'static>) -> Result<(), StorageError> { // Parse the subject URI to extract post reference let subject_uri = &like.like.subject.uri; let parts: Vec<&str> = subject_uri.split('/').collect(); if parts.len() < 5 { return Err(StorageError::Parse(format!( "Invalid subject URI: {}", subject_uri ))); } // Extract target post actor DID and rkey let target_did = parts[2]; let target_rkey_str = parts[4]; let target_rkey = tid::decode_tid(target_rkey_str) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Parse like rkey let like_parts: Vec<&str> = like.uri.split('/').collect(); if like_parts.len() < 5 { return Err(StorageError::Parse(format!("Invalid AT URI: {}", like.uri))); } let like_rkey = tid::decode_tid(like_parts[4]) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Log the operation tracing::info!( "Would create like: actor {} likes post did={}/rkey={} (like rkey: {})", like.actor_id, target_did, target_rkey, like_rkey ); // TODO: update the post's like arrays Ok(()) } async fn create_repost(&self, repost: &Repost<'static>) -> Result<(), StorageError> { // Parse the subject URI to extract post reference let subject_uri = &repost.repost.subject.uri; let parts: Vec<&str> = subject_uri.split('/').collect(); if parts.len() < 5 { return Err(StorageError::Parse(format!( "Invalid subject URI: {}", subject_uri ))); } // Extract target post actor DID and rkey let target_did = parts[2]; let target_rkey_str = parts[4]; let target_rkey = tid::decode_tid(target_rkey_str) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Parse repost rkey let repost_parts: Vec<&str> = repost.uri.split('/').collect(); if repost_parts.len() < 5 { return Err(StorageError::Parse(format!( "Invalid AT URI: {}", repost.uri ))); } let repost_rkey = tid::decode_tid(repost_parts[4]) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Log the operation tracing::info!( "Would create repost: actor {} reposts post did={}/rkey={} (repost rkey: {})", repost.actor_id, target_did, target_rkey, repost_rkey ); // TODO: update the post's repost arrays Ok(()) } async fn delete_record(&self, uri: &str) -> Result<(), StorageError> { // Parse the AT URI let parts: Vec<&str> = uri.split('/').collect(); if parts.len() < 5 { return Err(StorageError::Parse(format!("Invalid AT URI: {}", uri))); } let did = parts[2]; let collection = parts[3]; let rkey_str = parts[4]; // Handle different collection types match collection { "app.bsky.feed.post" => { let rkey = tid::decode_tid(rkey_str) .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; // Log the operation tracing::info!( "Would delete post: did={}, rkey={}, status={:?}", did, rkey, types::PostStatus::Deleted ); // TODO: mark the post as deleted in the database } _ => { tracing::warn!("Unhandled collection type for deletion: {}", collection); } } Ok(()) } } #[async_trait] impl ActorBackend for PostgresBackend { async fn get_actor_id(&self, did: &str) -> Result { // TODO: look up or create the actor in the database let actor_id = did .bytes() .fold(1i64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as i64)) .abs(); tracing::info!( "Would get/create actor: did={}, id={}, status={:?}, sync_state={:?}", did, actor_id, types::ActorStatus::Active, types::ActorSyncState::Partial ); // TODO: uncomment to actually query/insert: /* let mut conn = self.pool.get().await .map_err(|e| StorageError::Connection(e.to_string()))?; use diesel::prelude::*; use diesel_async::RunQueryDsl; use parakeet_db::schema; let result = schema::actors::table .select(schema::actors::id) .filter(schema::actors::did.eq(did)) .first::(&mut conn) .await; match result { Ok(id) => Ok(id as i64), Err(diesel::result::Error::NotFound) => { let new_actor_id = diesel::insert_into(schema::actors::table) .values(( schema::actors::did.eq(did), schema::actors::status.eq(types::ActorStatus::Active), schema::actors::sync_state.eq(types::ActorSyncState::Partial), )) .returning(schema::actors::id) .get_result::(&mut conn) .await .map_err(|e| StorageError::Query(e.to_string()))?; Ok(new_actor_id as i64) } Err(e) => Err(StorageError::Query(e.to_string())), } */ Ok(actor_id) } }