learn and share notes on atproto (wip) 🦉 malfestio.stormlightlabs.org/
readability solid axum atproto srs

feat: sync tracking persistence layer

Changed files
+1409 -9
crates
server
docs
migrations
+1
crates/server/src/lib.rs
··· 7 pub mod pds; 8 pub mod repository; 9 pub mod state; 10 pub mod well_known; 11 12 use axum::http::Method;
··· 7 pub mod pds; 8 pub mod repository; 9 pub mod state; 10 + pub mod sync_service; 11 pub mod well_known; 12 13 use axum::http::Method;
+53 -7
crates/server/src/pds/client.rs
··· 40 pub cid: String, 41 } 42 43 /// Request body for deleteRecord XRPC. 44 #[derive(Serialize)] 45 #[serde(rename_all = "camelCase")] ··· 124 } 125 126 /// Create or update a record in the repository. 127 - /// 128 - /// # Arguments 129 - /// 130 - /// * `did` - The user's DID (repository owner) 131 - /// * `collection` - The collection NSID (e.g., "org.stormlightlabs.malfestio.deck") 132 - /// * `rkey` - The record key (TID) 133 - /// * `record` - The record data as JSON 134 pub async fn put_record( 135 &self, did: &str, collection: &str, rkey: &str, record: serde_json::Value, 136 ) -> Result<AtUri, PdsError> { ··· 164 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 165 166 self.handle_response(response).await 167 } 168 169 /// Delete a record from the repository.
··· 40 pub cid: String, 41 } 42 43 + /// Response from getRecord XRPC. 44 + #[derive(Deserialize)] 45 + #[serde(rename_all = "camelCase")] 46 + pub struct GetRecordResponse { 47 + pub uri: String, 48 + pub cid: String, 49 + pub value: serde_json::Value, 50 + } 51 + 52 + /// Result of getting a record from PDS. 53 + #[derive(Debug, Clone)] 54 + pub struct GetRecordResult { 55 + pub uri: String, 56 + pub cid: String, 57 + pub value: serde_json::Value, 58 + } 59 + 60 /// Request body for deleteRecord XRPC. 61 #[derive(Serialize)] 62 #[serde(rename_all = "camelCase")] ··· 141 } 142 143 /// Create or update a record in the repository. 144 pub async fn put_record( 145 &self, did: &str, collection: &str, rkey: &str, record: serde_json::Value, 146 ) -> Result<AtUri, PdsError> { ··· 174 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 175 176 self.handle_response(response).await 177 + } 178 + 179 + /// Get a record from the repository. 180 + pub async fn get_record(&self, did: &str, collection: &str, rkey: &str) -> Result<GetRecordResult, PdsError> { 181 + let url = format!( 182 + "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 183 + self.pds_url, did, collection, rkey 184 + ); 185 + 186 + let mut request_builder = self.http_client.get(&url); 187 + 188 + if let Some(ref dpop_keypair) = self.dpop_keypair { 189 + let dpop_proof = dpop_keypair.generate_proof("GET", &url, Some(&self.access_token)); 190 + request_builder = request_builder 191 + .header("Authorization", format!("DPoP {}", self.access_token)) 192 + .header("DPoP", dpop_proof); 193 + } else { 194 + request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token)); 195 + } 196 + 197 + let response = request_builder 198 + .send() 199 + .await 200 + .map_err(|e| PdsError::NetworkError(e.to_string()))?; 201 + 202 + if response.status().is_success() { 203 + let get_response: GetRecordResponse = response 204 + .json() 205 + .await 206 + .map_err(|e| PdsError::NetworkError(format!("Failed to parse response: {}", e)))?; 207 + Ok(GetRecordResult { uri: get_response.uri, cid: get_response.cid, value: get_response.value }) 208 + } else { 209 + let status = response.status(); 210 + let body = response.text().await.unwrap_or_default(); 211 + Err(self.map_error_status(status, body)) 212 + } 213 } 214 215 /// Delete a record from the repository.
+1
crates/server/src/repository/mod.rs
··· 6 pub mod review; 7 pub mod search; 8 pub mod social;
··· 6 pub mod review; 7 pub mod search; 8 pub mod social; 9 + pub mod sync;
+749
crates/server/src/repository/sync.rs
···
··· 1 + //! Sync repository for tracking synchronization state. 2 + //! 3 + //! Manages the sync status of entities between local database and PDS. 4 + 5 + use std::str::FromStr; 6 + 7 + use async_trait::async_trait; 8 + use chrono::{DateTime, Utc}; 9 + use uuid::Uuid; 10 + 11 + /// Sync status for an entity. 12 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 13 + pub enum SyncStatus { 14 + /// Never synced to PDS 15 + LocalOnly, 16 + /// In sync with PDS 17 + Synced, 18 + /// Local changes need to be pushed 19 + PendingPush, 20 + /// Local and remote both changed (conflict) 21 + Conflict, 22 + } 23 + 24 + impl SyncStatus { 25 + pub fn as_str(&self) -> &'static str { 26 + match self { 27 + SyncStatus::LocalOnly => "local_only", 28 + SyncStatus::Synced => "synced", 29 + SyncStatus::PendingPush => "pending_push", 30 + SyncStatus::Conflict => "conflict", 31 + } 32 + } 33 + } 34 + 35 + impl FromStr for SyncStatus { 36 + type Err = std::fmt::Error; 37 + 38 + fn from_str(s: &str) -> Result<Self, Self::Err> { 39 + match s { 40 + "local_only" => Ok(SyncStatus::LocalOnly), 41 + "synced" => Ok(SyncStatus::Synced), 42 + "pending_push" => Ok(SyncStatus::PendingPush), 43 + "conflict" => Ok(SyncStatus::Conflict), 44 + _ => Err(std::fmt::Error), 45 + } 46 + } 47 + } 48 + 49 + impl std::fmt::Display for SyncStatus { 50 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 51 + f.write_str(self.as_str()) 52 + } 53 + } 54 + 55 + /// A pending sync operation. 56 + #[derive(Debug, Clone)] 57 + pub struct PendingSync { 58 + pub entity_type: String, 59 + pub entity_id: String, 60 + pub owner_did: String, 61 + pub version: i32, 62 + pub status: SyncStatus, 63 + } 64 + 65 + /// Sync metadata for an entity. 66 + #[derive(Debug, Clone)] 67 + pub struct SyncMetadata { 68 + pub entity_type: String, 69 + pub entity_id: String, 70 + pub version: i32, 71 + pub pds_cid: Option<String>, 72 + pub pds_uri: Option<String>, 73 + pub status: SyncStatus, 74 + pub last_synced_at: Option<DateTime<Utc>>, 75 + } 76 + 77 + /// Entry in the sync log. 78 + #[derive(Debug, Clone)] 79 + pub struct SyncLogEntry { 80 + pub id: String, 81 + pub owner_did: String, 82 + pub entity_type: String, 83 + pub entity_id: String, 84 + pub operation: String, 85 + pub status: String, 86 + pub pds_cid: Option<String>, 87 + pub error_message: Option<String>, 88 + pub created_at: DateTime<Utc>, 89 + pub completed_at: Option<DateTime<Utc>>, 90 + } 91 + 92 + /// Parameters for logging a sync operation. 93 + #[derive(Debug, Clone)] 94 + pub struct LogOperationParams<'a> { 95 + pub owner_did: &'a str, 96 + pub entity_type: &'a str, 97 + pub entity_id: &'a str, 98 + pub operation: &'a str, 99 + pub status: &'a str, 100 + pub pds_cid: Option<&'a str>, 101 + pub error_message: Option<&'a str>, 102 + } 103 + 104 + /// Error type for sync repository operations. 105 + #[derive(Debug)] 106 + pub enum SyncRepoError { 107 + DatabaseError(String), 108 + NotFound(String), 109 + InvalidArgument(String), 110 + } 111 + 112 + impl std::fmt::Display for SyncRepoError { 113 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 114 + match self { 115 + SyncRepoError::DatabaseError(e) => write!(f, "Database error: {}", e), 116 + SyncRepoError::NotFound(e) => write!(f, "Not found: {}", e), 117 + SyncRepoError::InvalidArgument(e) => write!(f, "Invalid argument: {}", e), 118 + } 119 + } 120 + } 121 + 122 + impl std::error::Error for SyncRepoError {} 123 + 124 + /// Repository trait for sync operations. 125 + #[async_trait] 126 + pub trait SyncRepository: Send + Sync { 127 + /// Get sync metadata for an entity. 128 + async fn get_sync_metadata(&self, entity_type: &str, id: &str) -> Result<SyncMetadata, SyncRepoError>; 129 + 130 + /// Mark an entity as synced with given PDS CID and URI. 131 + async fn mark_synced(&self, entity_type: &str, id: &str, pds_cid: &str, pds_uri: &str) 132 + -> Result<(), SyncRepoError>; 133 + 134 + /// Mark an entity as pending push. 135 + async fn mark_pending(&self, entity_type: &str, id: &str) -> Result<(), SyncRepoError>; 136 + 137 + /// Mark an entity as having a conflict. 138 + async fn mark_conflict(&self, entity_type: &str, id: &str) -> Result<(), SyncRepoError>; 139 + 140 + /// Get all pending items for a user. 141 + async fn get_pending_items(&self, owner_did: &str) -> Result<Vec<PendingSync>, SyncRepoError>; 142 + 143 + /// Get all conflicts for a user. 144 + async fn get_conflicts(&self, owner_did: &str) -> Result<Vec<PendingSync>, SyncRepoError>; 145 + 146 + /// Increment version for an entity (used when resolving conflicts). 147 + async fn increment_version(&self, entity_type: &str, id: &str) -> Result<i32, SyncRepoError>; 148 + 149 + /// Log a sync operation. 150 + async fn log_operation(&self, params: LogOperationParams<'_>) -> Result<String, SyncRepoError>; 151 + 152 + /// Mark a sync log entry as completed. 153 + async fn complete_log_entry( 154 + &self, log_id: &str, status: &str, pds_cid: Option<&str>, error_message: Option<&str>, 155 + ) -> Result<(), SyncRepoError>; 156 + } 157 + 158 + /// Database implementation of SyncRepository. 159 + pub struct DbSyncRepository { 160 + pool: crate::db::DbPool, 161 + } 162 + 163 + impl DbSyncRepository { 164 + pub fn new(pool: crate::db::DbPool) -> Self { 165 + Self { pool } 166 + } 167 + 168 + fn table_for_entity(&self, entity_type: &str) -> Result<&'static str, SyncRepoError> { 169 + match entity_type { 170 + "deck" => Ok("decks"), 171 + "card" => Ok("cards"), 172 + "note" => Ok("notes"), 173 + _ => Err(SyncRepoError::InvalidArgument(format!( 174 + "Unknown entity type: {}", 175 + entity_type 176 + ))), 177 + } 178 + } 179 + } 180 + 181 + #[async_trait] 182 + impl SyncRepository for DbSyncRepository { 183 + async fn get_sync_metadata(&self, entity_type: &str, id: &str) -> Result<SyncMetadata, SyncRepoError> { 184 + let table = self.table_for_entity(entity_type)?; 185 + let uuid = Uuid::parse_str(id).map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 186 + 187 + let client = self 188 + .pool 189 + .get() 190 + .await 191 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 192 + 193 + let query = format!( 194 + "SELECT version, pds_cid, pds_uri, sync_status::text, last_synced_at FROM {} WHERE id = $1", 195 + table 196 + ); 197 + 198 + let row = client 199 + .query_opt(&query, &[&uuid]) 200 + .await 201 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to query: {}", e)))? 202 + .ok_or_else(|| SyncRepoError::NotFound(format!("{} not found: {}", entity_type, id)))?; 203 + 204 + let version: i32 = row.get("version"); 205 + let pds_cid: Option<String> = row.get("pds_cid"); 206 + let pds_uri: Option<String> = row.get("pds_uri"); 207 + let status_str: String = row.get("sync_status"); 208 + let last_synced_at: Option<DateTime<Utc>> = row.get("last_synced_at"); 209 + 210 + Ok(SyncMetadata { 211 + entity_type: entity_type.to_string(), 212 + entity_id: id.to_string(), 213 + version, 214 + pds_cid, 215 + pds_uri, 216 + status: SyncStatus::from_str(&status_str).unwrap_or(SyncStatus::LocalOnly), 217 + last_synced_at, 218 + }) 219 + } 220 + 221 + async fn mark_synced( 222 + &self, entity_type: &str, id: &str, pds_cid: &str, pds_uri: &str, 223 + ) -> Result<(), SyncRepoError> { 224 + let table = self.table_for_entity(entity_type)?; 225 + let uuid = Uuid::parse_str(id).map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 226 + 227 + let client = self 228 + .pool 229 + .get() 230 + .await 231 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 232 + 233 + let query = format!( 234 + "UPDATE {} SET sync_status = 'synced', pds_cid = $1, pds_uri = $2, last_synced_at = NOW() WHERE id = $3", 235 + table 236 + ); 237 + 238 + client 239 + .execute(&query, &[&pds_cid, &pds_uri, &uuid]) 240 + .await 241 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to update: {}", e)))?; 242 + 243 + Ok(()) 244 + } 245 + 246 + async fn mark_pending(&self, entity_type: &str, id: &str) -> Result<(), SyncRepoError> { 247 + let table = self.table_for_entity(entity_type)?; 248 + let uuid = Uuid::parse_str(id).map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 249 + 250 + let client = self 251 + .pool 252 + .get() 253 + .await 254 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 255 + 256 + let query = format!("UPDATE {} SET sync_status = 'pending_push' WHERE id = $1", table); 257 + 258 + client 259 + .execute(&query, &[&uuid]) 260 + .await 261 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to update: {}", e)))?; 262 + 263 + Ok(()) 264 + } 265 + 266 + async fn mark_conflict(&self, entity_type: &str, id: &str) -> Result<(), SyncRepoError> { 267 + let table = self.table_for_entity(entity_type)?; 268 + let uuid = Uuid::parse_str(id).map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 269 + 270 + let client = self 271 + .pool 272 + .get() 273 + .await 274 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 275 + 276 + let query = format!("UPDATE {} SET sync_status = 'conflict' WHERE id = $1", table); 277 + 278 + client 279 + .execute(&query, &[&uuid]) 280 + .await 281 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to update: {}", e)))?; 282 + 283 + Ok(()) 284 + } 285 + 286 + async fn get_pending_items(&self, owner_did: &str) -> Result<Vec<PendingSync>, SyncRepoError> { 287 + let client = self 288 + .pool 289 + .get() 290 + .await 291 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 292 + 293 + let mut pending = Vec::new(); 294 + 295 + for (entity_type, table) in [("deck", "decks"), ("card", "cards"), ("note", "notes")] { 296 + let query = format!( 297 + "SELECT id, version, sync_status::text FROM {} WHERE owner_did = $1 AND sync_status = 'pending_push'", 298 + table 299 + ); 300 + 301 + let rows = client 302 + .query(&query, &[&owner_did]) 303 + .await 304 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to query: {}", e)))?; 305 + 306 + for row in rows { 307 + let id: Uuid = row.get("id"); 308 + let version: i32 = row.get("version"); 309 + let status_str: String = row.get("sync_status"); 310 + 311 + pending.push(PendingSync { 312 + entity_type: entity_type.to_string(), 313 + entity_id: id.to_string(), 314 + owner_did: owner_did.to_string(), 315 + version, 316 + status: SyncStatus::from_str(&status_str).unwrap_or(SyncStatus::PendingPush), 317 + }); 318 + } 319 + } 320 + 321 + Ok(pending) 322 + } 323 + 324 + async fn get_conflicts(&self, owner_did: &str) -> Result<Vec<PendingSync>, SyncRepoError> { 325 + let client = self 326 + .pool 327 + .get() 328 + .await 329 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 330 + 331 + let mut conflicts = Vec::new(); 332 + 333 + for (entity_type, table) in [("deck", "decks"), ("card", "cards"), ("note", "notes")] { 334 + let query = format!( 335 + "SELECT id, version, sync_status::text FROM {} WHERE owner_did = $1 AND sync_status = 'conflict'", 336 + table 337 + ); 338 + 339 + let rows = client 340 + .query(&query, &[&owner_did]) 341 + .await 342 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to query: {}", e)))?; 343 + 344 + for row in rows { 345 + let id: Uuid = row.get("id"); 346 + let version: i32 = row.get("version"); 347 + 348 + conflicts.push(PendingSync { 349 + entity_type: entity_type.to_string(), 350 + entity_id: id.to_string(), 351 + owner_did: owner_did.to_string(), 352 + version, 353 + status: SyncStatus::Conflict, 354 + }); 355 + } 356 + } 357 + 358 + Ok(conflicts) 359 + } 360 + 361 + async fn increment_version(&self, entity_type: &str, id: &str) -> Result<i32, SyncRepoError> { 362 + let table = self.table_for_entity(entity_type)?; 363 + let uuid = Uuid::parse_str(id).map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 364 + 365 + let client = self 366 + .pool 367 + .get() 368 + .await 369 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 370 + 371 + let query = format!( 372 + "UPDATE {} SET version = version + 1 WHERE id = $1 RETURNING version", 373 + table 374 + ); 375 + 376 + let row = client 377 + .query_one(&query, &[&uuid]) 378 + .await 379 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to update: {}", e)))?; 380 + 381 + let version: i32 = row.get("version"); 382 + Ok(version) 383 + } 384 + 385 + async fn log_operation(&self, params: LogOperationParams<'_>) -> Result<String, SyncRepoError> { 386 + let entity_uuid = Uuid::parse_str(params.entity_id) 387 + .map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 388 + 389 + let client = self 390 + .pool 391 + .get() 392 + .await 393 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 394 + 395 + let row = client 396 + .query_one( 397 + "INSERT INTO sync_log (owner_did, entity_type, entity_id, operation, status, pds_cid, error_message) 398 + VALUES ($1, $2, $3, $4, $5, $6, $7) 399 + RETURNING id", 400 + &[ 401 + &params.owner_did, 402 + &params.entity_type, 403 + &entity_uuid, 404 + &params.operation, 405 + &params.status, 406 + &params.pds_cid, 407 + &params.error_message, 408 + ], 409 + ) 410 + .await 411 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to insert log: {}", e)))?; 412 + 413 + let id: Uuid = row.get("id"); 414 + Ok(id.to_string()) 415 + } 416 + 417 + async fn complete_log_entry( 418 + &self, log_id: &str, status: &str, pds_cid: Option<&str>, error_message: Option<&str>, 419 + ) -> Result<(), SyncRepoError> { 420 + let uuid = 421 + Uuid::parse_str(log_id).map_err(|e| SyncRepoError::InvalidArgument(format!("Invalid UUID: {}", e)))?; 422 + 423 + let client = self 424 + .pool 425 + .get() 426 + .await 427 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to get connection: {}", e)))?; 428 + 429 + client 430 + .execute( 431 + "UPDATE sync_log 432 + SET status = $1, pds_cid = COALESCE($2, pds_cid), error_message = $3, completed_at = NOW() 433 + WHERE id = $4", 434 + &[&status, &pds_cid, &error_message, &uuid], 435 + ) 436 + .await 437 + .map_err(|e| SyncRepoError::DatabaseError(format!("Failed to update log: {}", e)))?; 438 + 439 + Ok(()) 440 + } 441 + } 442 + 443 + #[cfg(test)] 444 + pub mod mock { 445 + use super::*; 446 + use std::collections::HashMap; 447 + use std::sync::{Arc, Mutex}; 448 + 449 + #[derive(Clone)] 450 + pub struct MockSyncRepository { 451 + metadata: Arc<Mutex<HashMap<String, SyncMetadata>>>, 452 + logs: Arc<Mutex<Vec<SyncLogEntry>>>, 453 + } 454 + 455 + impl MockSyncRepository { 456 + pub fn new() -> Self { 457 + Self { metadata: Arc::new(Mutex::new(HashMap::new())), logs: Arc::new(Mutex::new(Vec::new())) } 458 + } 459 + 460 + pub fn with_metadata(metadata: Vec<SyncMetadata>) -> Self { 461 + let map: HashMap<String, SyncMetadata> = metadata 462 + .into_iter() 463 + .map(|m| (format!("{}:{}", m.entity_type, m.entity_id), m)) 464 + .collect(); 465 + Self { metadata: Arc::new(Mutex::new(map)), logs: Arc::new(Mutex::new(Vec::new())) } 466 + } 467 + 468 + fn key(entity_type: &str, id: &str) -> String { 469 + format!("{}:{}", entity_type, id) 470 + } 471 + } 472 + 473 + impl Default for MockSyncRepository { 474 + fn default() -> Self { 475 + Self::new() 476 + } 477 + } 478 + 479 + #[async_trait] 480 + impl SyncRepository for MockSyncRepository { 481 + async fn get_sync_metadata(&self, entity_type: &str, id: &str) -> Result<SyncMetadata, SyncRepoError> { 482 + let key = Self::key(entity_type, id); 483 + self.metadata 484 + .lock() 485 + .unwrap() 486 + .get(&key) 487 + .cloned() 488 + .ok_or_else(|| SyncRepoError::NotFound(format!("{} not found: {}", entity_type, id))) 489 + } 490 + 491 + async fn mark_synced( 492 + &self, entity_type: &str, id: &str, pds_cid: &str, pds_uri: &str, 493 + ) -> Result<(), SyncRepoError> { 494 + let key = Self::key(entity_type, id); 495 + let mut map = self.metadata.lock().unwrap(); 496 + if let Some(meta) = map.get_mut(&key) { 497 + meta.status = SyncStatus::Synced; 498 + meta.pds_cid = Some(pds_cid.to_string()); 499 + meta.pds_uri = Some(pds_uri.to_string()); 500 + meta.last_synced_at = Some(Utc::now()); 501 + } 502 + Ok(()) 503 + } 504 + 505 + async fn mark_pending(&self, entity_type: &str, id: &str) -> Result<(), SyncRepoError> { 506 + let key = Self::key(entity_type, id); 507 + let mut map = self.metadata.lock().unwrap(); 508 + if let Some(meta) = map.get_mut(&key) { 509 + meta.status = SyncStatus::PendingPush; 510 + } 511 + Ok(()) 512 + } 513 + 514 + async fn mark_conflict(&self, entity_type: &str, id: &str) -> Result<(), SyncRepoError> { 515 + let key = Self::key(entity_type, id); 516 + let mut map = self.metadata.lock().unwrap(); 517 + if let Some(meta) = map.get_mut(&key) { 518 + meta.status = SyncStatus::Conflict; 519 + } 520 + Ok(()) 521 + } 522 + 523 + async fn get_pending_items(&self, owner_did: &str) -> Result<Vec<PendingSync>, SyncRepoError> { 524 + let map = self.metadata.lock().unwrap(); 525 + let pending: Vec<_> = map 526 + .values() 527 + .filter(|m| m.status == SyncStatus::PendingPush) 528 + .map(|m| PendingSync { 529 + entity_type: m.entity_type.clone(), 530 + entity_id: m.entity_id.clone(), 531 + owner_did: owner_did.to_string(), 532 + version: m.version, 533 + status: m.status, 534 + }) 535 + .collect(); 536 + Ok(pending) 537 + } 538 + 539 + async fn get_conflicts(&self, owner_did: &str) -> Result<Vec<PendingSync>, SyncRepoError> { 540 + let map = self.metadata.lock().unwrap(); 541 + let conflicts: Vec<_> = map 542 + .values() 543 + .filter(|m| m.status == SyncStatus::Conflict) 544 + .map(|m| PendingSync { 545 + entity_type: m.entity_type.clone(), 546 + entity_id: m.entity_id.clone(), 547 + owner_did: owner_did.to_string(), 548 + version: m.version, 549 + status: m.status, 550 + }) 551 + .collect(); 552 + Ok(conflicts) 553 + } 554 + 555 + async fn increment_version(&self, entity_type: &str, id: &str) -> Result<i32, SyncRepoError> { 556 + let key = Self::key(entity_type, id); 557 + let mut map = self.metadata.lock().unwrap(); 558 + if let Some(meta) = map.get_mut(&key) { 559 + meta.version += 1; 560 + Ok(meta.version) 561 + } else { 562 + Err(SyncRepoError::NotFound(format!("{} not found: {}", entity_type, id))) 563 + } 564 + } 565 + 566 + async fn log_operation(&self, params: LogOperationParams<'_>) -> Result<String, SyncRepoError> { 567 + let id = Uuid::new_v4().to_string(); 568 + let entry = SyncLogEntry { 569 + id: id.clone(), 570 + owner_did: params.owner_did.to_string(), 571 + entity_type: params.entity_type.to_string(), 572 + entity_id: params.entity_id.to_string(), 573 + operation: params.operation.to_string(), 574 + status: params.status.to_string(), 575 + pds_cid: params.pds_cid.map(String::from), 576 + error_message: params.error_message.map(String::from), 577 + created_at: Utc::now(), 578 + completed_at: None, 579 + }; 580 + self.logs.lock().unwrap().push(entry); 581 + Ok(id) 582 + } 583 + 584 + async fn complete_log_entry( 585 + &self, log_id: &str, status: &str, pds_cid: Option<&str>, error_message: Option<&str>, 586 + ) -> Result<(), SyncRepoError> { 587 + let mut logs = self.logs.lock().unwrap(); 588 + if let Some(entry) = logs.iter_mut().find(|e| e.id == log_id) { 589 + entry.status = status.to_string(); 590 + entry.pds_cid = pds_cid.map(String::from).or(entry.pds_cid.clone()); 591 + entry.error_message = error_message.map(String::from); 592 + entry.completed_at = Some(Utc::now()); 593 + } 594 + Ok(()) 595 + } 596 + } 597 + } 598 + 599 + #[cfg(test)] 600 + mod tests { 601 + use super::mock::MockSyncRepository; 602 + use super::*; 603 + 604 + #[test] 605 + fn test_sync_status_as_str() { 606 + assert_eq!(SyncStatus::LocalOnly.as_str(), "local_only"); 607 + assert_eq!(SyncStatus::Synced.as_str(), "synced"); 608 + assert_eq!(SyncStatus::PendingPush.as_str(), "pending_push"); 609 + assert_eq!(SyncStatus::Conflict.as_str(), "conflict"); 610 + } 611 + 612 + #[test] 613 + fn test_sync_status_from_str() { 614 + assert_eq!(SyncStatus::from_str("local_only").unwrap(), SyncStatus::LocalOnly); 615 + assert_eq!(SyncStatus::from_str("synced").unwrap(), SyncStatus::Synced); 616 + assert_eq!(SyncStatus::from_str("pending_push").unwrap(), SyncStatus::PendingPush); 617 + assert_eq!(SyncStatus::from_str("conflict").unwrap(), SyncStatus::Conflict); 618 + assert!(SyncStatus::from_str("unknown").is_err()); 619 + } 620 + 621 + #[tokio::test] 622 + async fn test_mock_sync_repo_get_metadata() { 623 + let metadata = SyncMetadata { 624 + entity_type: "deck".to_string(), 625 + entity_id: "123".to_string(), 626 + version: 1, 627 + pds_cid: None, 628 + pds_uri: None, 629 + status: SyncStatus::LocalOnly, 630 + last_synced_at: None, 631 + }; 632 + let repo = MockSyncRepository::with_metadata(vec![metadata]); 633 + 634 + let result = repo.get_sync_metadata("deck", "123").await; 635 + assert!(result.is_ok()); 636 + let meta = result.unwrap(); 637 + assert_eq!(meta.entity_type, "deck"); 638 + assert_eq!(meta.version, 1); 639 + assert_eq!(meta.status, SyncStatus::LocalOnly); 640 + } 641 + 642 + #[tokio::test] 643 + async fn test_mock_sync_repo_mark_synced() { 644 + let metadata = SyncMetadata { 645 + entity_type: "deck".to_string(), 646 + entity_id: "123".to_string(), 647 + version: 1, 648 + pds_cid: None, 649 + pds_uri: None, 650 + status: SyncStatus::PendingPush, 651 + last_synced_at: None, 652 + }; 653 + let repo = MockSyncRepository::with_metadata(vec![metadata]); 654 + 655 + repo.mark_synced("deck", "123", "bafycid123", "at://did:plc:test/deck/123") 656 + .await 657 + .unwrap(); 658 + 659 + let meta = repo.get_sync_metadata("deck", "123").await.unwrap(); 660 + assert_eq!(meta.status, SyncStatus::Synced); 661 + assert_eq!(meta.pds_cid, Some("bafycid123".to_string())); 662 + assert!(meta.last_synced_at.is_some()); 663 + } 664 + 665 + #[tokio::test] 666 + async fn test_mock_sync_repo_increment_version() { 667 + let metadata = SyncMetadata { 668 + entity_type: "note".to_string(), 669 + entity_id: "456".to_string(), 670 + version: 5, 671 + pds_cid: None, 672 + pds_uri: None, 673 + status: SyncStatus::Synced, 674 + last_synced_at: None, 675 + }; 676 + let repo = MockSyncRepository::with_metadata(vec![metadata]); 677 + 678 + let new_version = repo.increment_version("note", "456").await.unwrap(); 679 + assert_eq!(new_version, 6); 680 + 681 + let meta = repo.get_sync_metadata("note", "456").await.unwrap(); 682 + assert_eq!(meta.version, 6); 683 + } 684 + 685 + #[tokio::test] 686 + async fn test_mock_sync_repo_log_operation() { 687 + let repo = MockSyncRepository::new(); 688 + 689 + let log_id = repo 690 + .log_operation(LogOperationParams { 691 + owner_did: "did:plc:test", 692 + entity_type: "deck", 693 + entity_id: "123e4567-e89b-12d3-a456-426614174000", 694 + operation: "push", 695 + status: "pending", 696 + pds_cid: None, 697 + error_message: None, 698 + }) 699 + .await 700 + .unwrap(); 701 + 702 + assert!(!log_id.is_empty()); 703 + 704 + repo.complete_log_entry(&log_id, "success", Some("bafycid"), None) 705 + .await 706 + .unwrap(); 707 + } 708 + 709 + #[tokio::test] 710 + async fn test_mock_sync_repo_get_pending() { 711 + let metadata = vec![ 712 + SyncMetadata { 713 + entity_type: "deck".to_string(), 714 + entity_id: "1".to_string(), 715 + version: 1, 716 + pds_cid: None, 717 + pds_uri: None, 718 + status: SyncStatus::PendingPush, 719 + last_synced_at: None, 720 + }, 721 + SyncMetadata { 722 + entity_type: "note".to_string(), 723 + entity_id: "2".to_string(), 724 + version: 1, 725 + pds_cid: None, 726 + pds_uri: None, 727 + status: SyncStatus::Synced, 728 + last_synced_at: None, 729 + }, 730 + ]; 731 + let repo = MockSyncRepository::with_metadata(metadata); 732 + 733 + let pending = repo.get_pending_items("did:plc:test").await.unwrap(); 734 + assert_eq!(pending.len(), 1); 735 + assert_eq!(pending[0].entity_type, "deck"); 736 + } 737 + 738 + #[test] 739 + fn test_sync_repo_error_display() { 740 + let err = SyncRepoError::DatabaseError("connection failed".to_string()); 741 + assert!(err.to_string().contains("Database error")); 742 + 743 + let err = SyncRepoError::NotFound("deck:123".to_string()); 744 + assert!(err.to_string().contains("Not found")); 745 + 746 + let err = SyncRepoError::InvalidArgument("bad uuid".to_string()); 747 + assert!(err.to_string().contains("Invalid argument")); 748 + } 749 + }
+518
crates/server/src/sync_service.rs
···
··· 1 + //! Sync service for coordinating bi-directional PDS synchronization. 2 + //! 3 + //! Handles push/pull operations and conflict resolution. 4 + 5 + use crate::middleware::auth::UserContext; 6 + use crate::pds::client::{GetRecordResult, PdsClient, PdsError}; 7 + use crate::pds::records::{prepare_card_record, prepare_deck_record, prepare_note_record}; 8 + use crate::repository::card::CardRepository; 9 + use crate::repository::deck::DeckRepository; 10 + use crate::repository::note::NoteRepository; 11 + use crate::repository::oauth::OAuthRepository; 12 + use crate::repository::sync::{LogOperationParams, SyncRepoError, SyncRepository, SyncStatus}; 13 + use std::str::FromStr; 14 + use std::sync::Arc; 15 + 16 + /// Result of a sync operation. 17 + #[derive(Debug, Clone)] 18 + pub struct SyncResult { 19 + pub entity_type: String, 20 + pub entity_id: String, 21 + pub pds_uri: Option<String>, 22 + pub pds_cid: Option<String>, 23 + pub new_version: i32, 24 + pub status: SyncStatus, 25 + } 26 + 27 + /// Conflict information for UI display. 28 + #[derive(Debug, Clone)] 29 + pub struct ConflictInfo { 30 + pub entity_type: String, 31 + pub entity_id: String, 32 + pub local_version: i32, 33 + pub remote_version: Option<i32>, 34 + pub local_updated_at: Option<String>, 35 + pub remote_updated_at: Option<String>, 36 + } 37 + 38 + /// Summary of sync status for a user. 39 + #[derive(Debug, Clone)] 40 + pub struct SyncStatusSummary { 41 + pub pending_count: usize, 42 + pub conflict_count: usize, 43 + pub pending_items: Vec<(String, String)>, 44 + pub conflicts: Vec<ConflictInfo>, 45 + } 46 + 47 + /// Conflict resolution strategy. 48 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 49 + pub enum ConflictStrategy { 50 + /// Use the most recently modified version (default) 51 + LastWriteWins, 52 + /// Keep local version, overwrite remote 53 + KeepLocal, 54 + /// Keep remote version, overwrite local 55 + KeepRemote, 56 + // TODO: MergeUI - Show UI for manual merge 57 + } 58 + 59 + impl ConflictStrategy { 60 + pub fn as_str(&self) -> &'static str { 61 + match self { 62 + ConflictStrategy::LastWriteWins => "last_write_wins", 63 + ConflictStrategy::KeepLocal => "keep_local", 64 + ConflictStrategy::KeepRemote => "keep_remote", 65 + } 66 + } 67 + } 68 + 69 + impl FromStr for ConflictStrategy { 70 + type Err = String; 71 + 72 + fn from_str(s: &str) -> Result<Self, Self::Err> { 73 + match s { 74 + "last_write_wins" => Ok(ConflictStrategy::LastWriteWins), 75 + "keep_local" => Ok(ConflictStrategy::KeepLocal), 76 + "keep_remote" => Ok(ConflictStrategy::KeepRemote), 77 + _ => Err(format!("Invalid conflict strategy: {}", s)), 78 + } 79 + } 80 + } 81 + 82 + /// Error type for sync operations. 83 + #[derive(Debug)] 84 + pub enum SyncError { 85 + /// Entity not found 86 + NotFound(String), 87 + /// Authentication required 88 + AuthRequired(String), 89 + /// No OAuth tokens available 90 + NoTokens(String), 91 + /// PDS operation failed 92 + PdsError(PdsError), 93 + /// Repository error 94 + RepoError(SyncRepoError), 95 + /// Invalid argument 96 + InvalidArgument(String), 97 + /// Conflict detected 98 + ConflictDetected(ConflictInfo), 99 + } 100 + 101 + impl std::fmt::Display for SyncError { 102 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 103 + match self { 104 + SyncError::NotFound(e) => write!(f, "Not found: {}", e), 105 + SyncError::AuthRequired(e) => write!(f, "Authentication required: {}", e), 106 + SyncError::NoTokens(e) => write!(f, "No OAuth tokens: {}", e), 107 + SyncError::PdsError(e) => write!(f, "PDS error: {}", e), 108 + SyncError::RepoError(e) => write!(f, "Repository error: {}", e), 109 + SyncError::InvalidArgument(e) => write!(f, "Invalid argument: {}", e), 110 + SyncError::ConflictDetected(c) => { 111 + write!(f, "Conflict detected for {}:{}", c.entity_type, c.entity_id) 112 + } 113 + } 114 + } 115 + } 116 + 117 + impl std::error::Error for SyncError {} 118 + 119 + impl From<SyncRepoError> for SyncError { 120 + fn from(e: SyncRepoError) -> Self { 121 + SyncError::RepoError(e) 122 + } 123 + } 124 + 125 + impl From<PdsError> for SyncError { 126 + fn from(e: PdsError) -> Self { 127 + SyncError::PdsError(e) 128 + } 129 + } 130 + 131 + /// Remote record data retrieved from PDS. 132 + #[derive(Debug, Clone)] 133 + pub struct RemoteRecord { 134 + pub uri: String, 135 + pub cid: String, 136 + pub value: serde_json::Value, 137 + } 138 + 139 + /// Sync service for coordinating sync operations. 140 + pub struct SyncService { 141 + sync_repo: Arc<dyn SyncRepository>, 142 + deck_repo: Arc<dyn DeckRepository>, 143 + card_repo: Arc<dyn CardRepository>, 144 + note_repo: Arc<dyn NoteRepository>, 145 + oauth_repo: Arc<dyn OAuthRepository>, 146 + } 147 + 148 + impl SyncService { 149 + pub fn new( 150 + sync_repo: Arc<dyn SyncRepository>, deck_repo: Arc<dyn DeckRepository>, card_repo: Arc<dyn CardRepository>, 151 + note_repo: Arc<dyn NoteRepository>, oauth_repo: Arc<dyn OAuthRepository>, 152 + ) -> Self { 153 + Self { sync_repo, deck_repo, card_repo, note_repo, oauth_repo } 154 + } 155 + 156 + /// Push a local deck to the user's PDS. 157 + pub async fn push_deck(&self, deck_id: &str, user_ctx: &UserContext) -> Result<SyncResult, SyncError> { 158 + // Log the operation 159 + let log_id = self 160 + .sync_repo 161 + .log_operation(LogOperationParams { 162 + owner_did: &user_ctx.did, 163 + entity_type: "deck", 164 + entity_id: deck_id, 165 + operation: "push", 166 + status: "pending", 167 + pds_cid: None, 168 + error_message: None, 169 + }) 170 + .await?; 171 + 172 + // Get PDS client 173 + let pds_client = self.get_pds_client(user_ctx).await?; 174 + 175 + // Get deck from repository 176 + let deck = self 177 + .deck_repo 178 + .get(deck_id) 179 + .await 180 + .map_err(|e| SyncError::NotFound(format!("Deck not found: {:?}", e)))?; 181 + 182 + // Get cards for the deck 183 + let cards = self 184 + .card_repo 185 + .list_by_deck(deck_id) 186 + .await 187 + .map_err(|e| SyncError::RepoError(SyncRepoError::DatabaseError(format!("{:?}", e))))?; 188 + 189 + // Push cards first, collect AT-URIs 190 + let mut card_at_uris = Vec::with_capacity(cards.len()); 191 + for card in &cards { 192 + let prepared = prepare_card_record(card, ""); // deck_ref filled later 193 + let at_uri = pds_client 194 + .put_record(&user_ctx.did, &prepared.collection, &prepared.rkey, prepared.record) 195 + .await?; 196 + card_at_uris.push(at_uri.to_string()); 197 + 198 + // Mark card as synced 199 + self.sync_repo 200 + .mark_synced("card", &card.id, "", &at_uri.to_string()) 201 + .await?; 202 + } 203 + 204 + // Push deck with card refs 205 + let prepared = prepare_deck_record(&deck, card_at_uris); 206 + let at_uri = pds_client 207 + .put_record(&user_ctx.did, &prepared.collection, &prepared.rkey, prepared.record) 208 + .await?; 209 + 210 + // Mark deck as synced 211 + self.sync_repo 212 + .mark_synced("deck", deck_id, "", &at_uri.to_string()) 213 + .await?; 214 + 215 + let metadata = self.sync_repo.get_sync_metadata("deck", deck_id).await?; 216 + 217 + // Complete log entry 218 + self.sync_repo 219 + .complete_log_entry(&log_id, "success", metadata.pds_cid.as_deref(), None) 220 + .await?; 221 + 222 + Ok(SyncResult { 223 + entity_type: "deck".to_string(), 224 + entity_id: deck_id.to_string(), 225 + pds_uri: Some(at_uri.to_string()), 226 + pds_cid: metadata.pds_cid, 227 + new_version: metadata.version, 228 + status: SyncStatus::Synced, 229 + }) 230 + } 231 + 232 + /// Push a local note to the user's PDS. 233 + pub async fn push_note(&self, note_id: &str, user_ctx: &UserContext) -> Result<SyncResult, SyncError> { 234 + // Log the operation 235 + let log_id = self 236 + .sync_repo 237 + .log_operation(LogOperationParams { 238 + owner_did: &user_ctx.did, 239 + entity_type: "note", 240 + entity_id: note_id, 241 + operation: "push", 242 + status: "pending", 243 + pds_cid: None, 244 + error_message: None, 245 + }) 246 + .await?; 247 + 248 + // Get PDS client 249 + let pds_client = self.get_pds_client(user_ctx).await?; 250 + 251 + // Get note from repository 252 + let note = self 253 + .note_repo 254 + .get(note_id, Some(&user_ctx.did)) 255 + .await 256 + .map_err(|e| SyncError::NotFound(format!("Note not found: {:?}", e)))?; 257 + 258 + let prepared = prepare_note_record(&note); 259 + let at_uri = pds_client 260 + .put_record(&user_ctx.did, &prepared.collection, &prepared.rkey, prepared.record) 261 + .await?; 262 + 263 + self.sync_repo 264 + .mark_synced("note", note_id, "", &at_uri.to_string()) 265 + .await?; 266 + 267 + let metadata = self.sync_repo.get_sync_metadata("note", note_id).await?; 268 + 269 + // Complete log entry 270 + self.sync_repo 271 + .complete_log_entry(&log_id, "success", metadata.pds_cid.as_deref(), None) 272 + .await?; 273 + 274 + Ok(SyncResult { 275 + entity_type: "note".to_string(), 276 + entity_id: note_id.to_string(), 277 + pds_uri: Some(at_uri.to_string()), 278 + pds_cid: metadata.pds_cid, 279 + new_version: metadata.version, 280 + status: SyncStatus::Synced, 281 + }) 282 + } 283 + 284 + /// Pull a record from the user's PDS. 285 + pub async fn pull_record( 286 + &self, entity_type: &str, at_uri: &str, user_ctx: &UserContext, 287 + ) -> Result<RemoteRecord, SyncError> { 288 + let parsed = malfestio_core::at_uri::AtUri::parse(at_uri) 289 + .map_err(|e| SyncError::InvalidArgument(format!("Invalid AT-URI: {}", e)))?; 290 + 291 + let log_id = self 292 + .sync_repo 293 + .log_operation(LogOperationParams { 294 + owner_did: &user_ctx.did, 295 + entity_type, 296 + entity_id: at_uri, 297 + operation: "pull", 298 + status: "pending", 299 + pds_cid: None, 300 + error_message: None, 301 + }) 302 + .await?; 303 + 304 + let pds_client = self.get_pds_client(user_ctx).await?; 305 + let result: GetRecordResult = pds_client 306 + .get_record(&parsed.authority, &parsed.collection, &parsed.rkey) 307 + .await 308 + .map_err(|e| { 309 + tracing::error!("Failed to pull record from PDS: {:?}", e); 310 + SyncError::PdsError(e) 311 + })?; 312 + 313 + self.sync_repo 314 + .complete_log_entry(&log_id, "success", Some(&result.cid), None) 315 + .await?; 316 + 317 + // TODO: Offline queue - Store pulled record in IndexedDB for offline access 318 + 319 + Ok(RemoteRecord { uri: result.uri, cid: result.cid, value: result.value }) 320 + } 321 + 322 + /// Check if there's a conflict between local and remote versions. 323 + pub async fn check_conflict( 324 + &self, entity_type: &str, entity_id: &str, remote_cid: &str, 325 + ) -> Result<bool, SyncError> { 326 + let metadata = self.sync_repo.get_sync_metadata(entity_type, entity_id).await?; 327 + 328 + let has_conflict = 329 + metadata.status == SyncStatus::PendingPush && metadata.pds_cid.as_deref() != Some(remote_cid); 330 + 331 + if has_conflict { 332 + self.sync_repo.mark_conflict(entity_type, entity_id).await?; 333 + } 334 + 335 + Ok(has_conflict) 336 + } 337 + 338 + /// Get sync status for a user. 339 + pub async fn get_sync_status(&self, user_ctx: &UserContext) -> Result<SyncStatusSummary, SyncError> { 340 + let pending = self.sync_repo.get_pending_items(&user_ctx.did).await?; 341 + let conflicts = self.sync_repo.get_conflicts(&user_ctx.did).await?; 342 + 343 + Ok(SyncStatusSummary { 344 + pending_count: pending.len(), 345 + conflict_count: conflicts.len(), 346 + pending_items: pending.into_iter().map(|p| (p.entity_type, p.entity_id)).collect(), 347 + conflicts: conflicts 348 + .into_iter() 349 + .map(|c| ConflictInfo { 350 + entity_type: c.entity_type, 351 + entity_id: c.entity_id, 352 + local_version: c.version, 353 + remote_version: None, 354 + local_updated_at: None, 355 + remote_updated_at: None, 356 + }) 357 + .collect(), 358 + }) 359 + } 360 + 361 + /// Resolve a conflict using the specified strategy. 362 + pub async fn resolve_conflict( 363 + &self, entity_type: &str, id: &str, strategy: ConflictStrategy, user_ctx: &UserContext, 364 + ) -> Result<SyncResult, SyncError> { 365 + let metadata = self.sync_repo.get_sync_metadata(entity_type, id).await?; 366 + 367 + if metadata.status != SyncStatus::Conflict { 368 + return Err(SyncError::InvalidArgument(format!( 369 + "Entity is not in conflict state: {}:{}", 370 + entity_type, id 371 + ))); 372 + } 373 + 374 + match strategy { 375 + ConflictStrategy::LastWriteWins | ConflictStrategy::KeepLocal => match entity_type { 376 + "deck" => self.push_deck(id, user_ctx).await, 377 + "note" => self.push_note(id, user_ctx).await, 378 + _ => Err(SyncError::InvalidArgument(format!( 379 + "Unknown entity type: {}", 380 + entity_type 381 + ))), 382 + }, 383 + ConflictStrategy::KeepRemote => { 384 + if let Some(pds_uri) = &metadata.pds_uri { 385 + let remote = self.pull_record(entity_type, pds_uri, user_ctx).await?; 386 + 387 + self.sync_repo 388 + .mark_synced(entity_type, id, &remote.cid, &remote.uri) 389 + .await?; 390 + 391 + let new_metadata = self.sync_repo.get_sync_metadata(entity_type, id).await?; 392 + 393 + Ok(SyncResult { 394 + entity_type: entity_type.to_string(), 395 + entity_id: id.to_string(), 396 + pds_uri: Some(remote.uri), 397 + pds_cid: Some(remote.cid), 398 + new_version: new_metadata.version, 399 + status: SyncStatus::Synced, 400 + }) 401 + } else { 402 + Err(SyncError::InvalidArgument("No PDS URI for remote record".to_string())) 403 + } 404 + } 405 + } 406 + } 407 + 408 + async fn get_pds_client(&self, user_ctx: &UserContext) -> Result<PdsClient, SyncError> { 409 + if user_ctx.has_dpop 410 + && let Ok(stored_token) = self.oauth_repo.get_tokens(&user_ctx.did).await 411 + && let Some(dpop_keypair) = stored_token.dpop_keypair() 412 + { 413 + Ok(PdsClient::new_with_dpop( 414 + stored_token.pds_url.clone(), 415 + stored_token.access_token.clone(), 416 + dpop_keypair, 417 + )) 418 + } else { 419 + Ok(PdsClient::new_bearer( 420 + user_ctx.pds_url.clone(), 421 + user_ctx.access_token.clone(), 422 + )) 423 + } 424 + } 425 + } 426 + 427 + #[cfg(test)] 428 + mod tests { 429 + use super::*; 430 + 431 + #[test] 432 + fn test_conflict_strategy_from_str() { 433 + assert_eq!( 434 + ConflictStrategy::from_str("last_write_wins"), 435 + Ok(ConflictStrategy::LastWriteWins) 436 + ); 437 + assert_eq!( 438 + ConflictStrategy::from_str("keep_local"), 439 + Ok(ConflictStrategy::KeepLocal) 440 + ); 441 + assert_eq!( 442 + ConflictStrategy::from_str("keep_remote"), 443 + Ok(ConflictStrategy::KeepRemote) 444 + ); 445 + assert!(ConflictStrategy::from_str("unknown").is_err()); 446 + } 447 + 448 + #[test] 449 + fn test_conflict_strategy_as_str() { 450 + assert_eq!(ConflictStrategy::LastWriteWins.as_str(), "last_write_wins"); 451 + assert_eq!(ConflictStrategy::KeepLocal.as_str(), "keep_local"); 452 + assert_eq!(ConflictStrategy::KeepRemote.as_str(), "keep_remote"); 453 + } 454 + 455 + #[test] 456 + fn test_sync_error_display() { 457 + let err = SyncError::NotFound("deck:123".to_string()); 458 + assert!(err.to_string().contains("Not found")); 459 + 460 + let err = SyncError::AuthRequired("missing token".to_string()); 461 + assert!(err.to_string().contains("Authentication required")); 462 + 463 + let err = SyncError::InvalidArgument("bad type".to_string()); 464 + assert!(err.to_string().contains("Invalid argument")); 465 + } 466 + 467 + #[test] 468 + fn test_sync_result_creation() { 469 + let result = SyncResult { 470 + entity_type: "deck".to_string(), 471 + entity_id: "123".to_string(), 472 + pds_uri: Some("at://did:plc:test/deck/tid".to_string()), 473 + pds_cid: Some("bafycid".to_string()), 474 + new_version: 2, 475 + status: SyncStatus::Synced, 476 + }; 477 + 478 + assert_eq!(result.entity_type, "deck"); 479 + assert_eq!(result.new_version, 2); 480 + assert_eq!(result.status, SyncStatus::Synced); 481 + } 482 + 483 + #[test] 484 + fn test_sync_status_summary() { 485 + let summary = SyncStatusSummary { 486 + pending_count: 3, 487 + conflict_count: 1, 488 + pending_items: vec![ 489 + ("deck".to_string(), "1".to_string()), 490 + ("note".to_string(), "2".to_string()), 491 + ], 492 + conflicts: vec![ConflictInfo { 493 + entity_type: "deck".to_string(), 494 + entity_id: "3".to_string(), 495 + local_version: 5, 496 + remote_version: Some(6), 497 + local_updated_at: None, 498 + remote_updated_at: None, 499 + }], 500 + }; 501 + 502 + assert_eq!(summary.pending_count, 3); 503 + assert_eq!(summary.conflict_count, 1); 504 + assert_eq!(summary.pending_items.len(), 2); 505 + } 506 + 507 + #[test] 508 + fn test_remote_record_creation() { 509 + let record = RemoteRecord { 510 + uri: "at://did:plc:test/deck/tid".to_string(), 511 + cid: "bafycid123".to_string(), 512 + value: serde_json::json!({"title": "Test"}), 513 + }; 514 + 515 + assert_eq!(record.uri, "at://did:plc:test/deck/tid"); 516 + assert!(record.value.get("title").is_some()); 517 + } 518 + }
+4 -2
docs/todo.md
··· 30 31 **Sync & Conflict Resolution:** 32 33 - - [ ] Bi-directional sync: local drafts → PDS records, PDS records → local cache 34 - - [ ] Conflict resolution strategy for concurrent edits (last-write-wins or merge UI) 35 - [ ] Offline queue for pending publishes 36 - [ ] Sync status UI indicators 37 38 **Deep Linking:**
··· 30 31 **Sync & Conflict Resolution:** 32 33 + - [x] Bi-directional sync infrastructure 34 + - [x] Conflict resolution strategy 35 + - [ ] API endpoints for sync operations 36 - [ ] Offline queue for pending publishes 37 + - [ ] Frontend sync store with IndexedDB persistence 38 - [ ] Sync status UI indicators 39 40 **Deep Linking:**
+83
migrations/015_2026_01_03_sync_tracking.sql
···
··· 1 + -- Sync tracking infrastructure for bi-directional PDS synchronization 2 + -- Adds version tracking and sync status to core tables 3 + 4 + -- Sync status enum (idempotent creation) 5 + DO $$ BEGIN 6 + CREATE TYPE sync_status AS ENUM ( 7 + 'local_only', -- Never synced to PDS 8 + 'synced', -- In sync with PDS 9 + 'pending_push', -- Local changes need to be pushed 10 + 'conflict' -- Local and remote both changed 11 + ); 12 + EXCEPTION 13 + WHEN duplicate_object THEN null; 14 + END $$; 15 + 16 + ALTER TABLE decks 17 + ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1, 18 + ADD COLUMN IF NOT EXISTS pds_cid TEXT, 19 + ADD COLUMN IF NOT EXISTS pds_uri TEXT, 20 + ADD COLUMN IF NOT EXISTS sync_status sync_status NOT NULL DEFAULT 'local_only', 21 + ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMPTZ; 22 + 23 + ALTER TABLE cards 24 + ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1, 25 + ADD COLUMN IF NOT EXISTS pds_cid TEXT, 26 + ADD COLUMN IF NOT EXISTS pds_uri TEXT, 27 + ADD COLUMN IF NOT EXISTS sync_status sync_status NOT NULL DEFAULT 'local_only', 28 + ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMPTZ; 29 + 30 + ALTER TABLE notes 31 + ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1, 32 + ADD COLUMN IF NOT EXISTS pds_cid TEXT, 33 + ADD COLUMN IF NOT EXISTS pds_uri TEXT, 34 + ADD COLUMN IF NOT EXISTS sync_status sync_status NOT NULL DEFAULT 'local_only', 35 + ADD COLUMN IF NOT EXISTS last_synced_at TIMESTAMPTZ; 36 + 37 + CREATE TABLE IF NOT EXISTS sync_log ( 38 + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), 39 + owner_did TEXT NOT NULL, 40 + entity_type TEXT NOT NULL, -- 'deck', 'card', 'note' 41 + entity_id UUID NOT NULL, 42 + operation TEXT NOT NULL, -- 'push', 'pull', 'conflict_resolve' 43 + status TEXT NOT NULL, -- 'pending', 'success', 'failed' 44 + pds_cid TEXT, 45 + error_message TEXT, 46 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 47 + completed_at TIMESTAMPTZ 48 + ); 49 + 50 + CREATE INDEX IF NOT EXISTS idx_sync_log_owner_did ON sync_log(owner_did); 51 + CREATE INDEX IF NOT EXISTS idx_sync_log_entity ON sync_log(entity_type, entity_id); 52 + CREATE INDEX IF NOT EXISTS idx_sync_log_status ON sync_log(status); 53 + CREATE INDEX IF NOT EXISTS idx_sync_log_created_at ON sync_log(created_at DESC); 54 + 55 + CREATE INDEX IF NOT EXISTS idx_decks_sync_status ON decks(sync_status) WHERE sync_status != 'synced'; 56 + CREATE INDEX IF NOT EXISTS idx_cards_sync_status ON cards(sync_status) WHERE sync_status != 'synced'; 57 + CREATE INDEX IF NOT EXISTS idx_notes_sync_status ON notes(sync_status) WHERE sync_status != 'synced'; 58 + 59 + CREATE OR REPLACE FUNCTION increment_version_on_update() 60 + RETURNS TRIGGER AS $$ 61 + BEGIN 62 + -- Only increment if content changed (not just sync metadata) 63 + IF (TG_TABLE_NAME = 'decks' AND (NEW.title != OLD.title OR NEW.description != OLD.description OR NEW.tags != OLD.tags)) OR 64 + (TG_TABLE_NAME = 'cards' AND (NEW.front != OLD.front OR NEW.back != OLD.back OR NEW.media_url IS DISTINCT FROM OLD.media_url)) OR 65 + (TG_TABLE_NAME = 'notes' AND (NEW.title != OLD.title OR NEW.body != OLD.body OR NEW.tags != OLD.tags)) THEN 66 + NEW.version = OLD.version + 1; 67 + -- Mark as pending push if it was synced 68 + IF OLD.sync_status = 'synced' THEN 69 + NEW.sync_status = 'pending_push'; 70 + END IF; 71 + END IF; 72 + RETURN NEW; 73 + END; 74 + $$ LANGUAGE plpgsql; 75 + 76 + CREATE TRIGGER increment_decks_version BEFORE UPDATE ON decks 77 + FOR EACH ROW EXECUTE FUNCTION increment_version_on_update(); 78 + 79 + CREATE TRIGGER increment_cards_version BEFORE UPDATE ON cards 80 + FOR EACH ROW EXECUTE FUNCTION increment_version_on_update(); 81 + 82 + CREATE TRIGGER increment_notes_version BEFORE UPDATE ON notes 83 + FOR EACH ROW EXECUTE FUNCTION increment_version_on_update();