fixed deleted record handling

Orual c11d8208 247da442

+56 -5
+1 -1
crates/weaver-index/migrations/clickhouse/001_raw_records.sql
··· 50 ) 51 ) 52 ENGINE = ReplacingMergeTree(indexed_at) 53 - ORDER BY (collection, did, rkey, event_time) 54 SETTINGS deduplicate_merge_projection_mode = 'drop';
··· 50 ) 51 ) 52 ENGINE = ReplacingMergeTree(indexed_at) 53 + ORDER BY (collection, did, rkey, event_time, indexed_at) 54 SETTINGS deduplicate_merge_projection_mode = 'drop';
+5 -3
crates/weaver-index/src/clickhouse/client.rs
··· 107 rkey: &str, 108 ) -> Result<Option<RecordRow>, IndexError> { 109 // FINAL ensures ReplacingMergeTree deduplication is applied 110 let query = r#" 111 - SELECT cid, record 112 FROM raw_records FINAL 113 WHERE did = ? 114 AND collection = ? 115 AND rkey = ? 116 - AND operation != 'delete' 117 - ORDER BY event_time DESC 118 LIMIT 1 119 "#; 120 ··· 283 pub struct RecordRow { 284 pub cid: String, 285 pub record: String, // JSON string 286 } 287 288 /// Record with rkey from raw_records (for listRecords)
··· 107 rkey: &str, 108 ) -> Result<Option<RecordRow>, IndexError> { 109 // FINAL ensures ReplacingMergeTree deduplication is applied 110 + // Order by event_time first (firehose data wins), then indexed_at as tiebreaker 111 + // Include deletes so we can return not-found for deleted records 112 let query = r#" 113 + SELECT cid, record, operation 114 FROM raw_records FINAL 115 WHERE did = ? 116 AND collection = ? 117 AND rkey = ? 118 + ORDER BY event_time DESC, indexed_at DESC 119 LIMIT 1 120 "#; 121 ··· 284 pub struct RecordRow { 285 pub cid: String, 286 pub record: String, // JSON string 287 + pub operation: String, 288 } 289 290 /// Record with rkey from raw_records (for listRecords)
+50 -1
crates/weaver-index/src/endpoints/repo.rs
··· 96 })?; 97 98 if let Some(row) = cached { 99 // Cache hit - return from ClickHouse 100 let value: Data<'_> = serde_json::from_str(&row.record).map_err(|e| { 101 tracing::error!("Failed to parse record JSON: {}", e); ··· 103 })?; 104 105 let uri_str = format!("at://{}/{}/{}", did, collection, rkey); 106 - let uri = AtUri::new_owned(uri_str).map_err(|e| { 107 tracing::error!("Failed to construct AT URI: {}", e); 108 XrpcErrorResponse::internal_error("Failed to construct URI") 109 })?; ··· 112 tracing::error!("Invalid CID in database: {}", e); 113 XrpcErrorResponse::internal_error("Invalid CID stored") 114 })?; 115 116 return Ok(Json( 117 GetRecordOutput {
··· 96 })?; 97 98 if let Some(row) = cached { 99 + // Check if record was deleted 100 + if row.operation == "delete" { 101 + return Err(XrpcErrorResponse::not_found("Record not found")); 102 + } 103 + 104 // Cache hit - return from ClickHouse 105 let value: Data<'_> = serde_json::from_str(&row.record).map_err(|e| { 106 tracing::error!("Failed to parse record JSON: {}", e); ··· 108 })?; 109 110 let uri_str = format!("at://{}/{}/{}", did, collection, rkey); 111 + let uri = AtUri::new_owned(uri_str.clone()).map_err(|e| { 112 tracing::error!("Failed to construct AT URI: {}", e); 113 XrpcErrorResponse::internal_error("Failed to construct URI") 114 })?; ··· 117 tracing::error!("Invalid CID in database: {}", e); 118 XrpcErrorResponse::internal_error("Invalid CID stored") 119 })?; 120 + 121 + // Stale-while-revalidate: check freshness in background 122 + let cached_cid = row.cid.clone(); 123 + let clickhouse = state.clickhouse.clone(); 124 + let resolver = state.resolver.clone(); 125 + let did_str = did.as_str().to_string(); 126 + let collection_str = collection.to_string(); 127 + let rkey_str = rkey.to_string(); 128 + 129 + tokio::spawn(async move { 130 + let uri = match AtUri::new_owned(uri_str) { 131 + Ok(u) => u, 132 + Err(_) => return, 133 + }; 134 + 135 + let upstream = match resolver.fetch_record_slingshot(&uri).await { 136 + Ok(r) => r, 137 + Err(e) => { 138 + tracing::debug!("Background revalidation fetch failed: {}", e); 139 + return; 140 + } 141 + }; 142 + 143 + // Check if CID changed 144 + let upstream_cid = upstream 145 + .cid 146 + .as_ref() 147 + .map(|c| c.as_str()) 148 + .unwrap_or_default(); 149 + 150 + if upstream_cid != cached_cid && !upstream_cid.is_empty() { 151 + let record_json = serde_json::to_string(&upstream.value).unwrap_or_default(); 152 + if !record_json.is_empty() { 153 + if let Err(e) = clickhouse 154 + .insert_record(&did_str, &collection_str, &rkey_str, upstream_cid, &record_json) 155 + .await 156 + { 157 + tracing::warn!("Failed to update stale cache entry: {}", e); 158 + } else { 159 + tracing::debug!("Updated stale cache entry for {}", uri); 160 + } 161 + } 162 + } 163 + }); 164 165 return Ok(Json( 166 GetRecordOutput {