1use crate::core::{ActorBackend, StorageBackend, StorageError};
2use crate::records::{Follow, Like, Post, Profile, Repost};
3use async_trait::async_trait;
4use deadpool_diesel::postgres::Pool;
5use parakeet_db::{types, utils::tid};
6use std::sync::Arc;
7
8/// PostgreSQL storage backend implementation leveraging parakeet-db
9pub struct PostgresBackend {
10 pool: Arc<Pool>,
11}
12
13impl PostgresBackend {
14 /// Create a new PostgreSQL backend with a connection pool
15 pub fn new(database_url: &str) -> Result<Self, StorageError> {
16 let manager =
17 deadpool_diesel::postgres::Manager::new(database_url, deadpool_diesel::Runtime::Tokio1);
18 let pool = Pool::builder(manager)
19 .build()
20 .map_err(|e| StorageError::Connection(e.to_string()))?;
21
22 Ok(Self {
23 pool: Arc::new(pool),
24 })
25 }
26
27 /// Convert CID string to bytes (simplified)
28 fn cid_to_bytes(&self, cid: &str) -> Vec<u8> {
29 // TODO: Use parakeet_db::utils::cid functions
30 let mut bytes = cid.bytes().take(32).collect::<Vec<_>>();
31 bytes.resize(32, 0);
32 bytes
33 }
34}
35
36#[async_trait]
37impl StorageBackend for PostgresBackend {
38 async fn upsert_post(&self, post: &Post<'static>) -> Result<(), StorageError> {
39 // Parse the AT URI to extract rkey
40 let parts: Vec<&str> = post.uri.split('/').collect();
41 if parts.len() < 5 {
42 return Err(StorageError::Parse(format!("Invalid AT URI: {}", post.uri)));
43 }
44 let rkey_str = parts[4];
45
46 // Convert TID to i64 using parakeet-db's utility
47 let rkey = tid::decode_tid(rkey_str)
48 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
49
50 // Parse CID
51 let cid_bytes = self.cid_to_bytes(&post.cid);
52
53 // Serialize the post content
54 let content_json =
55 serde_json::to_value(&post.post).map_err(|e| StorageError::Parse(e.to_string()))?;
56 let content_bytes =
57 serde_json::to_vec(&content_json).map_err(|e| StorageError::Parse(e.to_string()))?;
58
59 // Log the operation (TODO: execute the actual query)
60 tracing::info!(
61 "Would upsert post: actor_id={}, rkey={}, cid_len={}, content_len={}, status={:?}",
62 post.actor_id,
63 rkey,
64 cid_bytes.len(),
65 content_bytes.len(),
66 types::PostStatus::Complete
67 );
68
69 // TODO: uncomment this to actually write to the database:
70 /*
71 let mut conn = self.pool.get().await
72 .map_err(|e| StorageError::Connection(e.to_string()))?;
73
74 use diesel::prelude::*;
75 use diesel_async::RunQueryDsl;
76 use parakeet_db::schema;
77
78 diesel::insert_into(schema::posts::table)
79 .values((
80 schema::posts::actor_id.eq(post.actor_id as i32),
81 schema::posts::rkey.eq(rkey),
82 schema::posts::cid.eq(cid_bytes),
83 schema::posts::content.eq(Some(content_bytes)),
84 schema::posts::status.eq(types::PostStatus::Complete),
85 schema::posts::langs.eq(Vec::<Option<types::LanguageCode>>::new()),
86 schema::posts::tags.eq(Vec::<Option<String>>::new()),
87 schema::posts::violates_threadgate.eq(false),
88 ))
89 .on_conflict((schema::posts::actor_id, schema::posts::rkey))
90 .do_update()
91 .set((
92 schema::posts::cid.eq(cid_bytes),
93 schema::posts::content.eq(Some(content_bytes)),
94 schema::posts::status.eq(types::PostStatus::Complete),
95 ))
96 .execute(&mut conn)
97 .await
98 .map_err(|e| StorageError::Query(e.to_string()))?;
99 */
100
101 Ok(())
102 }
103
104 async fn upsert_profile(&self, profile: &Profile<'static>) -> Result<(), StorageError> {
105 // Parse CID
106 let profile_cid = self.cid_to_bytes(&profile.cid);
107
108 // Log the operation
109 tracing::info!(
110 "Would upsert profile: actor_id={}, cid_len={}, display_name={:?}, sync_state={:?}",
111 profile.actor_id,
112 profile_cid.len(),
113 profile.profile.display_name,
114 types::ActorSyncState::Synced
115 );
116
117 // TODO: uncomment to update the actors table
118 /*
119 let mut conn = self.pool.get().await
120 .map_err(|e| StorageError::Connection(e.to_string()))?;
121
122 use diesel::prelude::*;
123 use diesel_async::RunQueryDsl;
124 use parakeet_db::schema;
125
126 diesel::update(schema::actors::table)
127 .filter(schema::actors::id.eq(profile.actor_id as i32))
128 .set((
129 schema::actors::profile_cid.eq(Some(profile_cid)),
130 schema::actors::profile_display_name.eq(profile.profile.display_name.as_deref()),
131 schema::actors::profile_description.eq(profile.profile.description.as_deref()),
132 schema::actors::sync_state.eq(types::ActorSyncState::Synced),
133 ))
134 .execute(&mut conn)
135 .await
136 .map_err(|e| StorageError::Query(e.to_string()))?;
137 */
138
139 Ok(())
140 }
141
142 async fn create_follow(&self, follow: &Follow<'static>) -> Result<(), StorageError> {
143 // Parse the subject DID to get the target actor
144 let target_did = &follow.follow.subject;
145
146 // Parse rkey (TID)
147 let parts: Vec<&str> = follow.uri.split('/').collect();
148 if parts.len() < 5 {
149 return Err(StorageError::Parse(format!(
150 "Invalid AT URI: {}",
151 follow.uri
152 )));
153 }
154 let rkey = tid::decode_tid(parts[4])
155 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
156
157 // Log the operation
158 tracing::info!(
159 "Would create follow: actor {} follows {} (rkey: {})",
160 follow.actor_id,
161 target_did,
162 rkey
163 );
164
165 // TODO: look up the target actor and update graph relationships
166
167 Ok(())
168 }
169
170 async fn create_like(&self, like: &Like<'static>) -> Result<(), StorageError> {
171 // Parse the subject URI to extract post reference
172 let subject_uri = &like.like.subject.uri;
173 let parts: Vec<&str> = subject_uri.split('/').collect();
174 if parts.len() < 5 {
175 return Err(StorageError::Parse(format!(
176 "Invalid subject URI: {}",
177 subject_uri
178 )));
179 }
180
181 // Extract target post actor DID and rkey
182 let target_did = parts[2];
183 let target_rkey_str = parts[4];
184 let target_rkey = tid::decode_tid(target_rkey_str)
185 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
186
187 // Parse like rkey
188 let like_parts: Vec<&str> = like.uri.split('/').collect();
189 if like_parts.len() < 5 {
190 return Err(StorageError::Parse(format!("Invalid AT URI: {}", like.uri)));
191 }
192 let like_rkey = tid::decode_tid(like_parts[4])
193 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
194
195 // Log the operation
196 tracing::info!(
197 "Would create like: actor {} likes post did={}/rkey={} (like rkey: {})",
198 like.actor_id,
199 target_did,
200 target_rkey,
201 like_rkey
202 );
203
204 // TODO: update the post's like arrays
205
206 Ok(())
207 }
208
209 async fn create_repost(&self, repost: &Repost<'static>) -> Result<(), StorageError> {
210 // Parse the subject URI to extract post reference
211 let subject_uri = &repost.repost.subject.uri;
212 let parts: Vec<&str> = subject_uri.split('/').collect();
213 if parts.len() < 5 {
214 return Err(StorageError::Parse(format!(
215 "Invalid subject URI: {}",
216 subject_uri
217 )));
218 }
219
220 // Extract target post actor DID and rkey
221 let target_did = parts[2];
222 let target_rkey_str = parts[4];
223 let target_rkey = tid::decode_tid(target_rkey_str)
224 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
225
226 // Parse repost rkey
227 let repost_parts: Vec<&str> = repost.uri.split('/').collect();
228 if repost_parts.len() < 5 {
229 return Err(StorageError::Parse(format!(
230 "Invalid AT URI: {}",
231 repost.uri
232 )));
233 }
234 let repost_rkey = tid::decode_tid(repost_parts[4])
235 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
236
237 // Log the operation
238 tracing::info!(
239 "Would create repost: actor {} reposts post did={}/rkey={} (repost rkey: {})",
240 repost.actor_id,
241 target_did,
242 target_rkey,
243 repost_rkey
244 );
245
246 // TODO: update the post's repost arrays
247
248 Ok(())
249 }
250
251 async fn delete_record(&self, uri: &str) -> Result<(), StorageError> {
252 // Parse the AT URI
253 let parts: Vec<&str> = uri.split('/').collect();
254 if parts.len() < 5 {
255 return Err(StorageError::Parse(format!("Invalid AT URI: {}", uri)));
256 }
257
258 let did = parts[2];
259 let collection = parts[3];
260 let rkey_str = parts[4];
261
262 // Handle different collection types
263 match collection {
264 "app.bsky.feed.post" => {
265 let rkey = tid::decode_tid(rkey_str)
266 .map_err(|e| StorageError::Parse(format!("Invalid TID: {:?}", e)))?;
267
268 // Log the operation
269 tracing::info!(
270 "Would delete post: did={}, rkey={}, status={:?}",
271 did,
272 rkey,
273 types::PostStatus::Deleted
274 );
275
276 // TODO: mark the post as deleted in the database
277 }
278 _ => {
279 tracing::warn!("Unhandled collection type for deletion: {}", collection);
280 }
281 }
282
283 Ok(())
284 }
285}
286
287#[async_trait]
288impl ActorBackend for PostgresBackend {
289 async fn get_actor_id(&self, did: &str) -> Result<i64, StorageError> {
290 // TODO: look up or create the actor in the database
291 let actor_id = did
292 .bytes()
293 .fold(1i64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as i64))
294 .abs();
295
296 tracing::info!(
297 "Would get/create actor: did={}, id={}, status={:?}, sync_state={:?}",
298 did,
299 actor_id,
300 types::ActorStatus::Active,
301 types::ActorSyncState::Partial
302 );
303
304 // TODO: uncomment to actually query/insert:
305 /*
306 let mut conn = self.pool.get().await
307 .map_err(|e| StorageError::Connection(e.to_string()))?;
308
309 use diesel::prelude::*;
310 use diesel_async::RunQueryDsl;
311 use parakeet_db::schema;
312
313 let result = schema::actors::table
314 .select(schema::actors::id)
315 .filter(schema::actors::did.eq(did))
316 .first::<i32>(&mut conn)
317 .await;
318
319 match result {
320 Ok(id) => Ok(id as i64),
321 Err(diesel::result::Error::NotFound) => {
322 let new_actor_id = diesel::insert_into(schema::actors::table)
323 .values((
324 schema::actors::did.eq(did),
325 schema::actors::status.eq(types::ActorStatus::Active),
326 schema::actors::sync_state.eq(types::ActorSyncState::Partial),
327 ))
328 .returning(schema::actors::id)
329 .get_result::<i32>(&mut conn)
330 .await
331 .map_err(|e| StorageError::Query(e.to_string()))?;
332
333 Ok(new_actor_id as i64)
334 }
335 Err(e) => Err(StorageError::Query(e.to_string())),
336 }
337 */
338
339 Ok(actor_id)
340 }
341}