Rust AppView - highly experimental!
at experiments 341 lines 12 kB view raw
1use crate::core::{ActorBackend, StorageBackend, StorageError}; 2use crate::records::{Follow, Like, Post, Profile, Repost}; 3use async_trait::async_trait; 4use deadpool_diesel::postgres::Pool; 5use parakeet_db::{types, utils::tid}; 6use std::sync::Arc; 7 8/// PostgreSQL storage backend implementation leveraging parakeet-db 9pub struct PostgresBackend { 10 pool: Arc<Pool>, 11} 12 13impl PostgresBackend { 14 /// Create a new PostgreSQL backend with a connection pool 15 pub fn new(database_url: &str) -> Result<Self, StorageError> { 16 let manager = 17 deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1); 18 let pool = Pool::builder(manager) 19 .build() 20 .map_err(|e| StorageError::Connection(e.to_string()))?; 21 22 Ok(Self { 23 pool: Arc::new(pool), 24 }) 25 } 26 27 /// Convert CID string to bytes (simplified) 28 fn cid_to_bytes(&self, cid: &str) -> Vec<u8> { 29 // TODO: Use parakeet_db::utils::cid functions 30 let mut bytes = cid.bytes().take(32).collect::<Vec<_>>(); 31 bytes.resize(32, 0); 32 bytes 33 } 34} 35 36#[async_trait] 37impl StorageBackend for PostgresBackend { 38 async fn upsert_post(&self, post: &Post<'static>) -> Result<(), StorageError> { 39 // Parse the AT URI to extract rkey 40 let parts: Vec<&str> = post.uri.split('/').collect(); 41 if parts.len() < 5 { 42 return Err(StorageError::Parse(format!("Invalid AT URI: {}", post.uri))); 43 } 44 let rkey_str = parts[4]; 45 46 // Convert TID to i64 using parakeet-db's utility 47 let rkey = tid::decode_tid(rkey_str) 48 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 49 50 // Parse CID 51 let cid_bytes = self.cid_to_bytes(&post.cid); 52 53 // Serialize the post content 54 let content_json = 55 serde_json::to_value(&post.post).map_err(|e| StorageError::Parse(e.to_string()))?; 56 let content_bytes = 57 serde_json::to_vec(&content_json).map_err(|e| StorageError::Parse(e.to_string()))?; 58 59 // Log the operation (TODO: execute the actual query) 60 tracing::info!( 61 "Would upsert post: actor_id={}, rkey={}, cid_len={}, content_len={}, status={:?}", 62 post.actor_id, 63 rkey, 64 cid_bytes.len(), 65 content_bytes.len(), 66 types::PostStatus::Complete 67 ); 68 69 // TODO: uncomment this to actually write to the database: 70 /* 71 let mut conn = self.pool.get().await 72 .map_err(|e| StorageError::Connection(e.to_string()))?; 73 74 use diesel::prelude::*; 75 use diesel_async::RunQueryDsl; 76 use parakeet_db::schema; 77 78 diesel::insert_into(schema::posts::table) 79 .values(( 80 schema::posts::actor_id.eq(post.actor_id as i32), 81 schema::posts::rkey.eq(rkey), 82 schema::posts::cid.eq(cid_bytes), 83 schema::posts::content.eq(Some(content_bytes)), 84 schema::posts::status.eq(types::PostStatus::Complete), 85 schema::posts::langs.eq(Vec::<Option<types::LanguageCode>>::new()), 86 schema::posts::tags.eq(Vec::<Option<String>>::new()), 87 schema::posts::violates_threadgate.eq(false), 88 )) 89 .on_conflict((schema::posts::actor_id, schema::posts::rkey)) 90 .do_update() 91 .set(( 92 schema::posts::cid.eq(cid_bytes), 93 schema::posts::content.eq(Some(content_bytes)), 94 schema::posts::status.eq(types::PostStatus::Complete), 95 )) 96 .execute(&mut conn) 97 .await 98 .map_err(|e| StorageError::Query(e.to_string()))?; 99 */ 100 101 Ok(()) 102 } 103 104 async fn upsert_profile(&self, profile: &Profile<'static>) -> Result<(), StorageError> { 105 // Parse CID 106 let profile_cid = self.cid_to_bytes(&profile.cid); 107 108 // Log the operation 109 tracing::info!( 110 "Would upsert profile: actor_id={}, cid_len={}, display_name={:?}, sync_state={:?}", 111 profile.actor_id, 112 profile_cid.len(), 113 profile.profile.display_name, 114 types::ActorSyncState::Synced 115 ); 116 117 // TODO: uncomment to update the actors table 118 /* 119 let mut conn = self.pool.get().await 120 .map_err(|e| StorageError::Connection(e.to_string()))?; 121 122 use diesel::prelude::*; 123 use diesel_async::RunQueryDsl; 124 use parakeet_db::schema; 125 126 diesel::update(schema::actors::table) 127 .filter(schema::actors::id.eq(profile.actor_id as i32)) 128 .set(( 129 schema::actors::profile_cid.eq(Some(profile_cid)), 130 schema::actors::profile_display_name.eq(profile.profile.display_name.as_deref()), 131 schema::actors::profile_description.eq(profile.profile.description.as_deref()), 132 schema::actors::sync_state.eq(types::ActorSyncState::Synced), 133 )) 134 .execute(&mut conn) 135 .await 136 .map_err(|e| StorageError::Query(e.to_string()))?; 137 */ 138 139 Ok(()) 140 } 141 142 async fn create_follow(&self, follow: &Follow<'static>) -> Result<(), StorageError> { 143 // Parse the subject DID to get the target actor 144 let target_did = &follow.follow.subject; 145 146 // Parse rkey (TID) 147 let parts: Vec<&str> = follow.uri.split('/').collect(); 148 if parts.len() < 5 { 149 return Err(StorageError::Parse(format!( 150 "Invalid AT URI: {}", 151 follow.uri 152 ))); 153 } 154 let rkey = tid::decode_tid(parts[4]) 155 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 156 157 // Log the operation 158 tracing::info!( 159 "Would create follow: actor {} follows {} (rkey: {})", 160 follow.actor_id, 161 target_did, 162 rkey 163 ); 164 165 // TODO: look up the target actor and update graph relationships 166 167 Ok(()) 168 } 169 170 async fn create_like(&self, like: &Like<'static>) -> Result<(), StorageError> { 171 // Parse the subject URI to extract post reference 172 let subject_uri = &like.like.subject.uri; 173 let parts: Vec<&str> = subject_uri.split('/').collect(); 174 if parts.len() < 5 { 175 return Err(StorageError::Parse(format!( 176 "Invalid subject URI: {}", 177 subject_uri 178 ))); 179 } 180 181 // Extract target post actor DID and rkey 182 let target_did = parts[2]; 183 let target_rkey_str = parts[4]; 184 let target_rkey = tid::decode_tid(target_rkey_str) 185 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 186 187 // Parse like rkey 188 let like_parts: Vec<&str> = like.uri.split('/').collect(); 189 if like_parts.len() < 5 { 190 return Err(StorageError::Parse(format!("Invalid AT URI: {}", like.uri))); 191 } 192 let like_rkey = tid::decode_tid(like_parts[4]) 193 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 194 195 // Log the operation 196 tracing::info!( 197 "Would create like: actor {} likes post did={}/rkey={} (like rkey: {})", 198 like.actor_id, 199 target_did, 200 target_rkey, 201 like_rkey 202 ); 203 204 // TODO: update the post's like arrays 205 206 Ok(()) 207 } 208 209 async fn create_repost(&self, repost: &Repost<'static>) -> Result<(), StorageError> { 210 // Parse the subject URI to extract post reference 211 let subject_uri = &repost.repost.subject.uri; 212 let parts: Vec<&str> = subject_uri.split('/').collect(); 213 if parts.len() < 5 { 214 return Err(StorageError::Parse(format!( 215 "Invalid subject URI: {}", 216 subject_uri 217 ))); 218 } 219 220 // Extract target post actor DID and rkey 221 let target_did = parts[2]; 222 let target_rkey_str = parts[4]; 223 let target_rkey = tid::decode_tid(target_rkey_str) 224 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 225 226 // Parse repost rkey 227 let repost_parts: Vec<&str> = repost.uri.split('/').collect(); 228 if repost_parts.len() < 5 { 229 return Err(StorageError::Parse(format!( 230 "Invalid AT URI: {}", 231 repost.uri 232 ))); 233 } 234 let repost_rkey = tid::decode_tid(repost_parts[4]) 235 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 236 237 // Log the operation 238 tracing::info!( 239 "Would create repost: actor {} reposts post did={}/rkey={} (repost rkey: {})", 240 repost.actor_id, 241 target_did, 242 target_rkey, 243 repost_rkey 244 ); 245 246 // TODO: update the post's repost arrays 247 248 Ok(()) 249 } 250 251 async fn delete_record(&self, uri: &str) -> Result<(), StorageError> { 252 // Parse the AT URI 253 let parts: Vec<&str> = uri.split('/').collect(); 254 if parts.len() < 5 { 255 return Err(StorageError::Parse(format!("Invalid AT URI: {}", uri))); 256 } 257 258 let did = parts[2]; 259 let collection = parts[3]; 260 let rkey_str = parts[4]; 261 262 // Handle different collection types 263 match collection { 264 "app.bsky.feed.post" => { 265 let rkey = tid::decode_tid(rkey_str) 266 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?; 267 268 // Log the operation 269 tracing::info!( 270 "Would delete post: did={}, rkey={}, status={:?}", 271 did, 272 rkey, 273 types::PostStatus::Deleted 274 ); 275 276 // TODO: mark the post as deleted in the database 277 } 278 _ => { 279 tracing::warn!("Unhandled collection type for deletion: {}", collection); 280 } 281 } 282 283 Ok(()) 284 } 285} 286 287#[async_trait] 288impl ActorBackend for PostgresBackend { 289 async fn get_actor_id(&self, did: &str) -> Result<i64, StorageError> { 290 // TODO: look up or create the actor in the database 291 let actor_id = did 292 .bytes() 293 .fold(1i64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as i64)) 294 .abs(); 295 296 tracing::info!( 297 "Would get/create actor: did={}, id={}, status={:?}, sync_state={:?}", 298 did, 299 actor_id, 300 types::ActorStatus::Active, 301 types::ActorSyncState::Partial 302 ); 303 304 // TODO: uncomment to actually query/insert: 305 /* 306 let mut conn = self.pool.get().await 307 .map_err(|e| StorageError::Connection(e.to_string()))?; 308 309 use diesel::prelude::*; 310 use diesel_async::RunQueryDsl; 311 use parakeet_db::schema; 312 313 let result = schema::actors::table 314 .select(schema::actors::id) 315 .filter(schema::actors::did.eq(did)) 316 .first::<i32>(&mut conn) 317 .await; 318 319 match result { 320 Ok(id) => Ok(id as i64), 321 Err(diesel::result::Error::NotFound) => { 322 let new_actor_id = diesel::insert_into(schema::actors::table) 323 .values(( 324 schema::actors::did.eq(did), 325 schema::actors::status.eq(types::ActorStatus::Active), 326 schema::actors::sync_state.eq(types::ActorSyncState::Partial), 327 )) 328 .returning(schema::actors::id) 329 .get_result::<i32>(&mut conn) 330 .await 331 .map_err(|e| StorageError::Query(e.to_string()))?; 332 333 Ok(new_actor_id as i64) 334 } 335 Err(e) => Err(StorageError::Query(e.to_string())), 336 } 337 */ 338 339 Ok(actor_id) 340 } 341}