···2929}
30303131pub mod storage {
3232- // pub mod postgres;
3232+ pub mod postgres;
33333434- // pub use postgres::PostgresBackend;
3434+ pub use postgres::PostgresBackend;
3535}
36363737pub use core::actor_store::ActorIdStore;
+341
parakeet-consumer/src/storage/postgres.rs
···11+use crate::core::{ActorBackend, StorageBackend, StorageError};
22+use crate::records::{Follow, Like, Post, Profile, Repost};
33+use async_trait::async_trait;
44+use deadpool_diesel::postgres::Pool;
55+use parakeet_db::{utils::tid, types};
66+use std::sync::Arc;
77+88+/// PostgreSQL storage backend implementation leveraging parakeet-db
99+pub struct PostgresBackend {
1010+ pool: Arc<Pool>,
1111+}
1212+1313+impl PostgresBackend {
1414+ /// Create a new PostgreSQL backend with a connection pool
1515+ pub fn new(database_url: &str) -> Result<Self, StorageError> {
1616+ let manager =
1717+ deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
1818+ let pool = Pool::builder(manager)
1919+ .build()
2020+ .map_err(|e| StorageError::Connection(e.to_string()))?;
2121+2222+ Ok(Self {
2323+ pool: Arc::new(pool),
2424+ })
2525+ }
2626+2727+ /// Convert CID string to bytes (simplified)
2828+ fn cid_to_bytes(&self, cid: &str) -> Vec<u8> {
2929+ // TODO: Use parakeet_db::utils::cid functions
3030+ let mut bytes = cid.bytes().take(32).collect::<Vec<_>>();
3131+ bytes.resize(32, 0);
3232+ bytes
3333+ }
3434+}
3535+3636+#[async_trait]
3737+impl StorageBackend for PostgresBackend {
3838+ async fn upsert_post(&self, post: &Post<'static>) -> Result<(), StorageError> {
3939+ // Parse the AT URI to extract rkey
4040+ let parts: Vec<&str> = post.uri.split('/').collect();
4141+ if parts.len() < 5 {
4242+ return Err(StorageError::Parse(format!("Invalid AT URI: {}", post.uri)));
4343+ }
4444+ let rkey_str = parts[4];
4545+4646+ // Convert TID to i64 using parakeet-db's utility
4747+ let rkey = tid::decode_tid(rkey_str)
4848+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
4949+5050+ // Parse CID
5151+ let cid_bytes = self.cid_to_bytes(&post.cid);
5252+5353+ // Serialize the post content
5454+ let content_json =
5555+ serde_json::to_value(&post.post).map_err(|e| StorageError::Parse(e.to_string()))?;
5656+ let content_bytes =
5757+ serde_json::to_vec(&content_json).map_err(|e| StorageError::Parse(e.to_string()))?;
5858+5959+ // Log the operation (TODO: execute the actual query)
6060+ tracing::info!(
6161+ "Would upsert post: actor_id={}, rkey={}, cid_len={}, content_len={}, status={:?}",
6262+ post.actor_id,
6363+ rkey,
6464+ cid_bytes.len(),
6565+ content_bytes.len(),
6666+ types::PostStatus::Complete
6767+ );
6868+6969+ // TODO: uncomment this to actually write to the database:
7070+ /*
7171+ let mut conn = self.pool.get().await
7272+ .map_err(|e| StorageError::Connection(e.to_string()))?;
7373+7474+ use diesel::prelude::*;
7575+ use diesel_async::RunQueryDsl;
7676+ use parakeet_db::schema;
7777+7878+ diesel::insert_into(schema::posts::table)
7979+ .values((
8080+ schema::posts::actor_id.eq(post.actor_id as i32),
8181+ schema::posts::rkey.eq(rkey),
8282+ schema::posts::cid.eq(cid_bytes),
8383+ schema::posts::content.eq(Some(content_bytes)),
8484+ schema::posts::status.eq(types::PostStatus::Complete),
8585+ schema::posts::langs.eq(Vec::<Option<types::LanguageCode>>::new()),
8686+ schema::posts::tags.eq(Vec::<Option<String>>::new()),
8787+ schema::posts::violates_threadgate.eq(false),
8888+ ))
8989+ .on_conflict((schema::posts::actor_id, schema::posts::rkey))
9090+ .do_update()
9191+ .set((
9292+ schema::posts::cid.eq(cid_bytes),
9393+ schema::posts::content.eq(Some(content_bytes)),
9494+ schema::posts::status.eq(types::PostStatus::Complete),
9595+ ))
9696+ .execute(&mut conn)
9797+ .await
9898+ .map_err(|e| StorageError::Query(e.to_string()))?;
9999+ */
100100+101101+ Ok(())
102102+ }
103103+104104+ async fn upsert_profile(&self, profile: &Profile<'static>) -> Result<(), StorageError> {
105105+ // Parse CID
106106+ let profile_cid = self.cid_to_bytes(&profile.cid);
107107+108108+ // Log the operation
109109+ tracing::info!(
110110+ "Would upsert profile: actor_id={}, cid_len={}, display_name={:?}, sync_state={:?}",
111111+ profile.actor_id,
112112+ profile_cid.len(),
113113+ profile.profile.display_name,
114114+ types::ActorSyncState::Synced
115115+ );
116116+117117+ // TODO: uncomment to update the actors table
118118+ /*
119119+ let mut conn = self.pool.get().await
120120+ .map_err(|e| StorageError::Connection(e.to_string()))?;
121121+122122+ use diesel::prelude::*;
123123+ use diesel_async::RunQueryDsl;
124124+ use parakeet_db::schema;
125125+126126+ diesel::update(schema::actors::table)
127127+ .filter(schema::actors::id.eq(profile.actor_id as i32))
128128+ .set((
129129+ schema::actors::profile_cid.eq(Some(profile_cid)),
130130+ schema::actors::profile_display_name.eq(profile.profile.display_name.as_deref()),
131131+ schema::actors::profile_description.eq(profile.profile.description.as_deref()),
132132+ schema::actors::sync_state.eq(types::ActorSyncState::Synced),
133133+ ))
134134+ .execute(&mut conn)
135135+ .await
136136+ .map_err(|e| StorageError::Query(e.to_string()))?;
137137+ */
138138+139139+ Ok(())
140140+ }
141141+142142+ async fn create_follow(&self, follow: &Follow<'static>) -> Result<(), StorageError> {
143143+ // Parse the subject DID to get the target actor
144144+ let target_did = &follow.follow.subject;
145145+146146+ // Parse rkey (TID)
147147+ let parts: Vec<&str> = follow.uri.split('/').collect();
148148+ if parts.len() < 5 {
149149+ return Err(StorageError::Parse(format!(
150150+ "Invalid AT URI: {}",
151151+ follow.uri
152152+ )));
153153+ }
154154+ let rkey = tid::decode_tid(parts[4])
155155+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
156156+157157+ // Log the operation
158158+ tracing::info!(
159159+ "Would create follow: actor {} follows {} (rkey: {})",
160160+ follow.actor_id,
161161+ target_did,
162162+ rkey
163163+ );
164164+165165+ // TODO: look up the target actor and update graph relationships
166166+167167+ Ok(())
168168+ }
169169+170170+ async fn create_like(&self, like: &Like<'static>) -> Result<(), StorageError> {
171171+ // Parse the subject URI to extract post reference
172172+ let subject_uri = &like.like.subject.uri;
173173+ let parts: Vec<&str> = subject_uri.split('/').collect();
174174+ if parts.len() < 5 {
175175+ return Err(StorageError::Parse(format!(
176176+ "Invalid subject URI: {}",
177177+ subject_uri
178178+ )));
179179+ }
180180+181181+ // Extract target post actor DID and rkey
182182+ let target_did = parts[2];
183183+ let target_rkey_str = parts[4];
184184+ let target_rkey = tid::decode_tid(target_rkey_str)
185185+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
186186+187187+ // Parse like rkey
188188+ let like_parts: Vec<&str> = like.uri.split('/').collect();
189189+ if like_parts.len() < 5 {
190190+ return Err(StorageError::Parse(format!("Invalid AT URI: {}", like.uri)));
191191+ }
192192+ let like_rkey = tid::decode_tid(like_parts[4])
193193+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
194194+195195+ // Log the operation
196196+ tracing::info!(
197197+ "Would create like: actor {} likes post did={}/rkey={} (like rkey: {})",
198198+ like.actor_id,
199199+ target_did,
200200+ target_rkey,
201201+ like_rkey
202202+ );
203203+204204+ // TODO: update the post's like arrays
205205+206206+ Ok(())
207207+ }
208208+209209+ async fn create_repost(&self, repost: &Repost<'static>) -> Result<(), StorageError> {
210210+ // Parse the subject URI to extract post reference
211211+ let subject_uri = &repost.repost.subject.uri;
212212+ let parts: Vec<&str> = subject_uri.split('/').collect();
213213+ if parts.len() < 5 {
214214+ return Err(StorageError::Parse(format!(
215215+ "Invalid subject URI: {}",
216216+ subject_uri
217217+ )));
218218+ }
219219+220220+ // Extract target post actor DID and rkey
221221+ let target_did = parts[2];
222222+ let target_rkey_str = parts[4];
223223+ let target_rkey = tid::decode_tid(target_rkey_str)
224224+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
225225+226226+ // Parse repost rkey
227227+ let repost_parts: Vec<&str> = repost.uri.split('/').collect();
228228+ if repost_parts.len() < 5 {
229229+ return Err(StorageError::Parse(format!(
230230+ "Invalid AT URI: {}",
231231+ repost.uri
232232+ )));
233233+ }
234234+ let repost_rkey = tid::decode_tid(repost_parts[4])
235235+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
236236+237237+ // Log the operation
238238+ tracing::info!(
239239+ "Would create repost: actor {} reposts post did={}/rkey={} (repost rkey: {})",
240240+ repost.actor_id,
241241+ target_did,
242242+ target_rkey,
243243+ repost_rkey
244244+ );
245245+246246+ // TODO: update the post's repost arrays
247247+248248+ Ok(())
249249+ }
250250+251251+ async fn delete_record(&self, uri: &str) -> Result<(), StorageError> {
252252+ // Parse the AT URI
253253+ let parts: Vec<&str> = uri.split('/').collect();
254254+ if parts.len() < 5 {
255255+ return Err(StorageError::Parse(format!("Invalid AT URI: {}", uri)));
256256+ }
257257+258258+ let did = parts[2];
259259+ let collection = parts[3];
260260+ let rkey_str = parts[4];
261261+262262+ // Handle different collection types
263263+ match collection {
264264+ "app.bsky.feed.post" => {
265265+ let rkey = tid::decode_tid(rkey_str)
266266+ .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
267267+268268+ // Log the operation
269269+ tracing::info!(
270270+ "Would delete post: did={}, rkey={}, status={:?}",
271271+ did,
272272+ rkey,
273273+ types::PostStatus::Deleted
274274+ );
275275+276276+ // TODO: mark the post as deleted in the database
277277+ }
278278+ _ => {
279279+ tracing::warn!("Unhandled collection type for deletion: {}", collection);
280280+ }
281281+ }
282282+283283+ Ok(())
284284+ }
285285+}
286286+287287+#[async_trait]
288288+impl ActorBackend for PostgresBackend {
289289+ async fn get_actor_id(&self, did: &str) -> Result<i64, StorageError> {
290290+ // TODO: look up or create the actor in the database
291291+ let actor_id = did
292292+ .bytes()
293293+ .fold(1i64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as i64))
294294+ .abs();
295295+296296+ tracing::info!(
297297+ "Would get/create actor: did={}, id={}, status={:?}, sync_state={:?}",
298298+ did,
299299+ actor_id,
300300+ types::ActorStatus::Active,
301301+ types::ActorSyncState::Partial
302302+ );
303303+304304+ // TODO: uncomment to actually query/insert:
305305+ /*
306306+ let mut conn = self.pool.get().await
307307+ .map_err(|e| StorageError::Connection(e.to_string()))?;
308308+309309+ use diesel::prelude::*;
310310+ use diesel_async::RunQueryDsl;
311311+ use parakeet_db::schema;
312312+313313+ let result = schema::actors::table
314314+ .select(schema::actors::id)
315315+ .filter(schema::actors::did.eq(did))
316316+ .first::<i32>(&mut conn)
317317+ .await;
318318+319319+ match result {
320320+ Ok(id) => Ok(id as i64),
321321+ Err(diesel::result::Error::NotFound) => {
322322+ let new_actor_id = diesel::insert_into(schema::actors::table)
323323+ .values((
324324+ schema::actors::did.eq(did),
325325+ schema::actors::status.eq(types::ActorStatus::Active),
326326+ schema::actors::sync_state.eq(types::ActorSyncState::Partial),
327327+ ))
328328+ .returning(schema::actors::id)
329329+ .get_result::<i32>(&mut conn)
330330+ .await
331331+ .map_err(|e| StorageError::Query(e.to_string()))?;
332332+333333+ Ok(new_actor_id as i64)
334334+ }
335335+ Err(e) => Err(StorageError::Query(e.to_string())),
336336+ }
337337+ */
338338+339339+ Ok(actor_id)
340340+ }
341341+}