···55 println!("cargo:rerun-if-changed=static/.vite/manifest.json");
6677 // Read Vite manifest to get hashed asset filenames
88- let manifest_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
99- .join("static/.vite/manifest.json");
88+ let manifest_path =
99+ std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("static/.vite/manifest.json");
10101111 let (bundle_js, bundle_css) = if manifest_path.exists() {
1212- let manifest_content = std::fs::read_to_string(&manifest_path)
1313- .expect("Failed to read Vite manifest");
1414- let manifest: serde_json::Value = serde_json::from_str(&manifest_content)
1515- .expect("Failed to parse Vite manifest");
1212+ let manifest_content =
1313+ std::fs::read_to_string(&manifest_path).expect("Failed to read Vite manifest");
1414+ let manifest: serde_json::Value =
1515+ serde_json::from_str(&manifest_content).expect("Failed to parse Vite manifest");
16161717 // Extract main entry point
1818- let main_entry = manifest.get("src/main.ts")
1818+ let main_entry = manifest
1919+ .get("src/main.ts")
1920 .expect("Missing src/main.ts in manifest");
20212121- let js_file = main_entry.get("file")
2222+ let js_file = main_entry
2323+ .get("file")
2224 .and_then(|v| v.as_str())
2325 .expect("Missing file in main entry");
24262525- let css_file = main_entry.get("css")
2727+ let css_file = main_entry
2828+ .get("css")
2629 .and_then(|v| v.as_array())
2730 .and_then(|arr| arr.first())
2831 .and_then(|v| v.as_str())
+1-4
src/atproto/lexicon/lfg.rs
···214214 let mut lfg = create_test_lfg();
215215 lfg.tags = vec![];
216216 assert!(lfg.validate().is_err());
217217- assert_eq!(
218218- lfg.validate().unwrap_err(),
219219- "At least one tag is required"
220220- );
217217+ assert_eq!(lfg.validate().unwrap_err(), "At least one tag is required");
221218 }
222219223220 #[test]
+3-1
src/atproto/utils.rs
···9999#[cfg(test)]
100100mod tests {
101101 use super::*;
102102- use atproto_record::lexicon::app::bsky::richtext::facet::{ByteSlice, Facet, Link, Mention, Tag};
102102+ use atproto_record::lexicon::app::bsky::richtext::facet::{
103103+ ByteSlice, Facet, Link, Mention, Tag,
104104+ };
103105 use std::collections::HashMap;
104106105107 #[test]
+7-6
src/bin/smokesignal.rs
···55};
66use atproto_oauth_axum::state::OAuthClientConfig;
77use smokesignal::processor::ContentFetcher;
88+use smokesignal::service::{ServiceDID, ServiceKey, build_service_document};
99+use smokesignal::storage::content::{CachedContentStorage, ContentStorage, FilesystemStorage};
810use smokesignal::tap_processor::TapProcessor;
911use smokesignal::task_search_indexer::SearchIndexer;
1010-use tokio_util::sync::CancellationToken;
1111-use smokesignal::service::{ServiceDID, ServiceKey, build_service_document};
1212-use smokesignal::storage::content::{CachedContentStorage, ContentStorage, FilesystemStorage};
1312use smokesignal::{
1413 http::{
1514 context::{AppEngine, WebContext},
···2221 cache::create_cache_pool,
2322 },
2423};
2424+use tokio_util::sync::CancellationToken;
25252626use chrono::Duration;
2727use smokesignal::config::OAuthBackendConfig;
···330330331331 // Create search indexer if enabled
332332 let search_indexer = if config.enable_opensearch && config.enable_task_opensearch {
333333- let opensearch_endpoint = config.opensearch_endpoint.as_ref().expect(
334334- "OPENSEARCH_ENDPOINT is required when search indexing is enabled",
335335- );
333333+ let opensearch_endpoint = config
334334+ .opensearch_endpoint
335335+ .as_ref()
336336+ .expect("OPENSEARCH_ENDPOINT is required when search indexing is enabled");
336337337338 match SearchIndexer::new(
338339 opensearch_endpoint,
+1-1
src/facets.rs
···12071207 Facet {
12081208 index: ByteSlice {
12091209 byte_start: 291, // Should be 290
12101210- byte_end: 324, // Should be 323 - but 324 > text.len() so this facet is SKIPPED
12101210+ byte_end: 324, // Should be 323 - but 324 > text.len() so this facet is SKIPPED
12111211 },
12121212 features: vec![FacetFeature::Link(Link {
12131213 uri: "https://atprotocalls.leaflet.pub/".to_string(),
···147147 let mut conn = match web_context.cache_pool.get().await {
148148 Ok(conn) => conn,
149149 Err(e) => {
150150- tracing::debug!(?e, "Failed to get Redis connection for AIP session cache write");
150150+ tracing::debug!(
151151+ ?e,
152152+ "Failed to get Redis connection for AIP session cache write"
153153+ );
151154 return;
152155 }
153156 };
+2-2
src/http/errors/mod.rs
···99pub mod import_error;
1010pub mod lfg_error;
1111pub mod login_error;
1212-pub mod profile_import_error;
1312pub mod middleware_errors;
1313+pub mod profile_import_error;
1414pub mod url_error;
1515pub mod view_event_error;
1616pub mod web_error;
···2222pub(crate) use import_error::ImportError;
2323pub(crate) use lfg_error::LfgError;
2424pub(crate) use login_error::LoginError;
2525-pub(crate) use profile_import_error::ProfileImportError;
2625pub(crate) use middleware_errors::WebSessionError;
2626+pub(crate) use profile_import_error::ProfileImportError;
2727pub(crate) use url_error::UrlError;
2828pub(crate) use view_event_error::ViewEventError;
2929pub(crate) use web_error::WebError;
+3-1
src/http/errors/web_error.rs
···166166 /// The contained string is the destination URL to redirect to after re-authentication.
167167 ///
168168 /// **Error Code:** `error-smokesignal-web-3`
169169- #[error("error-smokesignal-web-3 Session expired, re-authentication required (destination: {0})")]
169169+ #[error(
170170+ "error-smokesignal-web-3 Session expired, re-authentication required (destination: {0})"
171171+ )]
170172 SessionStale(String),
171173172174 /// An internal server error occurred.
···216216 aturi: &str,
217217) -> Result<()> {
218218 // Parse the AT-URI
219219- let parsed = ATURI::from_str(aturi)
220220- .with_context(|| format!("Invalid AT-URI: {}", aturi))?;
219219+ let parsed = ATURI::from_str(aturi).with_context(|| format!("Invalid AT-URI: {}", aturi))?;
221220222221 let did = &parsed.authority;
223222 let collection = &parsed.collection;
···264263 // Process the record based on its collection
265264 match collection.as_str() {
266265 COMMUNITY_EVENT_NSID => {
267267- handle_event_import(http_client, pool, content_storage, did, rkey, &cid, &record, pds_endpoint).await?;
266266+ handle_event_import(
267267+ http_client,
268268+ pool,
269269+ content_storage,
270270+ did,
271271+ rkey,
272272+ &cid,
273273+ &record,
274274+ pds_endpoint,
275275+ )
276276+ .await?;
268277 }
269278 COMMUNITY_RSVP_NSID => {
270279 handle_rsvp_import(pool, record_resolver, did, rkey, &cid, &record).await?;
271280 }
272281 PROFILE_NSID => {
273273- handle_profile_import(http_client, pool, content_storage, &document, did, rkey, &cid, &record, pds_endpoint).await?;
282282+ handle_profile_import(
283283+ http_client,
284284+ pool,
285285+ content_storage,
286286+ &document,
287287+ did,
288288+ rkey,
289289+ &cid,
290290+ &record,
291291+ pds_endpoint,
292292+ )
293293+ .await?;
274294 }
275295 ACCEPTANCE_NSID => {
276296 handle_acceptance_import(pool, did, rkey, &cid, &record).await?;
···334354 let all_media = event_record.media;
335355336356 for media in &all_media {
337337- if let Err(err) = download_media(http_client, content_storage, pds_endpoint, did, media).await {
357357+ if let Err(err) =
358358+ download_media(http_client, content_storage, pds_endpoint, did, media).await
359359+ {
338360 tracing::error!(error = ?err, "failed processing image");
339361 }
340362 }
···468490469491 // Download avatar and banner blobs if present
470492 if let Some(ref avatar) = profile_record.avatar
471471- && let Err(e) = download_avatar(http_client, content_storage, pds_endpoint, did, avatar).await
493493+ && let Err(e) =
494494+ download_avatar(http_client, content_storage, pds_endpoint, did, avatar).await
472495 {
473496 tracing::warn!(
474497 error = ?e,
···478501 }
479502480503 if let Some(ref banner) = profile_record.banner
481481- && let Err(e) = download_banner(http_client, content_storage, pds_endpoint, did, banner).await
504504+ && let Err(e) =
505505+ download_banner(http_client, content_storage, pds_endpoint, did, banner).await
482506 {
483507 tracing::warn!(
484508 error = ?e,
···742766 let image_path = format!("{}.png", blob_ref);
743767 tracing::info!(?image_path, "image_path");
744768745745- if content_storage
746746- .as_ref()
747747- .content_exists(&image_path)
748748- .await?
749749- {
769769+ if content_storage.as_ref().content_exists(&image_path).await? {
750770 tracing::info!(?image_path, "content exists");
751771 return Ok(());
752772 }
+6-5
src/http/mod.rs
···77pub mod event_form;
88pub mod event_validation;
99pub mod event_view;
1010+pub mod h3_utils;
1011pub mod handle_accept_rsvp;
1112pub mod handle_admin_content_list;
1213pub mod handle_admin_content_view;
1314pub mod handle_admin_denylist;
1415pub mod handle_admin_event;
1616+pub mod handle_admin_event_index;
1517pub mod handle_admin_events;
1618pub mod handle_admin_handles;
1719pub mod handle_admin_identity_profile;
1820pub mod handle_admin_import_event;
1921pub mod handle_admin_import_rsvp;
2022pub mod handle_admin_index;
2323+pub mod handle_admin_profile_index;
2124pub mod handle_admin_rsvp;
2225pub mod handle_admin_rsvp_accept;
2326pub mod handle_admin_rsvp_accepts;
2427pub mod handle_admin_rsvps;
2528pub mod handle_admin_search_index;
2626-pub mod handle_admin_event_index;
2727-pub mod handle_admin_profile_index;
2829pub mod handle_blob;
2930pub mod handle_bulk_accept_rsvps;
3031pub mod handle_content;
···3940pub mod handle_finalize_acceptance;
4041pub mod handle_geo_aggregation;
4142pub mod handle_health;
4242-pub mod handle_lfg;
4343-pub mod h3_utils;
4443pub mod handle_host_meta;
4544pub mod handle_import;
4645pub mod handle_index;
4646+pub mod handle_lfg;
4747pub mod handle_location;
4848pub mod handle_location_suggestions;
4949pub mod handle_mailgun_webhook;
···5858pub mod handle_preview_description;
5959pub mod handle_profile;
6060pub mod handle_quick_event;
6161-pub mod profile_import;
6261pub mod handle_search;
6362pub mod handle_set_language;
6463pub mod handle_settings;
···6766pub mod handle_view_event;
6867pub mod handle_wellknown;
6968pub mod handle_xrpc_get_event;
6969+pub mod handle_xrpc_get_rsvp;
7070pub mod handle_xrpc_link_attestation;
7171pub mod handle_xrpc_search_events;
7272pub mod handler_mcp;
···7777pub mod middleware_auth;
7878pub mod middleware_i18n;
7979pub mod pagination;
8080+pub mod profile_import;
8081pub mod rsvp_form;
8182pub mod server;
8283pub mod tab_selector;
+36-10
src/http/profile_import.rs
···1818use crate::{
1919 atproto::lexicon::{
2020 bluesky_profile::{BlueskyProfile, NSID as BLUESKY_PROFILE_NSID},
2121- profile::{Profile as SmokesignalProfile, NSID as SMOKESIGNAL_PROFILE_NSID},
2121+ profile::{NSID as SMOKESIGNAL_PROFILE_NSID, Profile as SmokesignalProfile},
2222 },
2323 facets::{FacetLimits, parse_facets_from_text},
2424- storage::{StoragePool, content::ContentStorage, profile::{profile_get_by_did, profile_insert}},
2424+ storage::{
2525+ StoragePool,
2626+ content::ContentStorage,
2727+ profile::{profile_get_by_did, profile_insert},
2828+ },
2529};
26302731use super::errors::ProfileImportError;
···133137 };
134138135139 // Parse facets from description
136136- if let Some(facets) = parse_facets_from_text(&truncated, identity_resolver.as_ref(), facet_limits).await {
140140+ if let Some(facets) =
141141+ parse_facets_from_text(&truncated, identity_resolver.as_ref(), facet_limits).await
142142+ {
137143 smokesignal_profile.facets = Some(facets);
138144 }
139145···183189 swap_commit: None,
184190 };
185191186186- let put_response = put_record(http_client, &Auth::DPoP(dpop_auth.clone()), pds_endpoint, put_request)
187187- .await
188188- .map_err(|e| ProfileImportError::PdsWriteFailed(e.to_string()))?;
192192+ let put_response = put_record(
193193+ http_client,
194194+ &Auth::DPoP(dpop_auth.clone()),
195195+ pds_endpoint,
196196+ put_request,
197197+ )
198198+ .await
199199+ .map_err(|e| ProfileImportError::PdsWriteFailed(e.to_string()))?;
189200190201 match put_response {
191202 PutRecordResponse::StrongRef { uri, cid, .. } => {
···198209 .unwrap_or(handle);
199210200211 // Store profile locally
201201- if let Err(e) = profile_insert(pool, &uri, &cid, did, display_name_for_db, &smokesignal_profile).await {
212212+ if let Err(e) = profile_insert(
213213+ pool,
214214+ &uri,
215215+ &cid,
216216+ did,
217217+ display_name_for_db,
218218+ &smokesignal_profile,
219219+ )
220220+ .await
221221+ {
202222 tracing::error!(did = %did, error = %e, "Failed to store imported profile locally");
203223 return Err(ProfileImportError::StorageFailed(e.to_string()));
204224 }
···243263 .map_err(|e| ProfileImportError::AvatarProcessFailed(e.to_string()))?;
244264245265 // Upload processed avatar to user's PDS
246246- let new_blob = upload_blob_to_pds(http_client, dpop_auth, pds_endpoint, &processed, "image/png")
247247- .await
248248- .map_err(|e| ProfileImportError::AvatarUploadFailed(e.to_string()))?;
266266+ let new_blob = upload_blob_to_pds(
267267+ http_client,
268268+ dpop_auth,
269269+ pds_endpoint,
270270+ &processed,
271271+ "image/png",
272272+ )
273273+ .await
274274+ .map_err(|e| ProfileImportError::AvatarUploadFailed(e.to_string()))?;
249275250276 // Store avatar locally in content storage
251277 let image_path = format!("{}.png", new_blob.inner.ref_.link);
···1313 }
14141515 // Try to load the image to ensure it's valid
1616- image::load_from_memory(data)
1717- .map_err(|e| ImageError::InvalidImageData(e.to_string()))?;
1616+ image::load_from_memory(data).map_err(|e| ImageError::InvalidImageData(e.to_string()))?;
18171918 Ok(())
2019}
···2221/// Process avatar image: validate 1:1 aspect ratio, resize to 400x400, convert to PNG
2322pub(crate) fn process_avatar(data: &[u8]) -> Result<Vec<u8>, ImageError> {
2423 // Load the image
2525- let img = image::load_from_memory(data)
2626- .map_err(|e| ImageError::AvatarLoadFailed(e.to_string()))?;
2424+ let img =
2525+ image::load_from_memory(data).map_err(|e| ImageError::AvatarLoadFailed(e.to_string()))?;
27262827 let (width, height) = img.dimensions();
2928···5251/// Process banner image: validate 16:9 aspect ratio, resize to 1600x900, convert to PNG
5352pub(crate) fn process_banner(data: &[u8]) -> Result<Vec<u8>, ImageError> {
5453 // Load the image
5555- let img = image::load_from_memory(data)
5656- .map_err(|e| ImageError::BannerLoadFailed(e.to_string()))?;
5454+ let img =
5555+ image::load_from_memory(data).map_err(|e| ImageError::BannerLoadFailed(e.to_string()))?;
57565857 let (width, height) = img.dimensions();
5958
+6-2
src/image_errors.rs
···8888 ///
8989 /// This error occurs when an event header image doesn't have the required
9090 /// 3:1 aspect ratio (allowing 10% deviation).
9191- #[error("error-smokesignal-image-10 Event header must have 3:1 aspect ratio: got {width}:{height}")]
9191+ #[error(
9292+ "error-smokesignal-image-10 Event header must have 3:1 aspect ratio: got {width}:{height}"
9393+ )]
9294 InvalidEventHeaderAspectRatio {
9395 /// The width of the image in pixels.
9496 width: u32,
···114116 ///
115117 /// This error occurs when a thumbnail image doesn't have the required
116118 /// square aspect ratio (allowing 5% deviation).
117117- #[error("error-smokesignal-image-13 Thumbnail must have 1:1 aspect ratio: got {width}:{height}")]
119119+ #[error(
120120+ "error-smokesignal-image-13 Thumbnail must have 1:1 aspect ratio: got {width}:{height}"
121121+ )]
118122 InvalidThumbnailAspectRatio {
119123 /// The width of the image in pixels.
120124 width: u32,
+2-2
src/lib.rs
···1818pub mod mcp;
1919pub mod processor;
2020pub mod processor_errors;
2121+pub mod profile_index;
2222+pub mod profile_index_errors;
2123pub mod record_resolver;
2224pub mod refresh_tokens_errors;
2325pub mod search_index;
2424-pub mod profile_index;
2525-pub mod profile_index_errors;
2626pub mod service;
2727pub mod stats;
2828pub mod storage;
+62-63
src/mcp/tools.rs
···11use anyhow::Result;
22-use atproto_client::com::atproto::repo::{put_record, PutRecordRequest, PutRecordResponse};
33-use atproto_identity::resolve::{parse_input, InputType};
22+use atproto_client::com::atproto::repo::{PutRecordRequest, PutRecordResponse, put_record};
33+use atproto_identity::resolve::{InputType, parse_input};
44use atproto_record::lexicon::community::lexicon::calendar::event::NSID as CommunityLexiconCalendarEventNSID;
55-use atproto_record::lexicon::community::lexicon::calendar::rsvp::{Rsvp, RsvpStatus, NSID as RsvpNSID};
55+use atproto_record::lexicon::community::lexicon::calendar::rsvp::{
66+ NSID as RsvpNSID, Rsvp, RsvpStatus,
77+};
68use chrono::Utc;
79use metrohash::MetroHash64;
810use std::hash::Hasher;
···1113use crate::config::OAuthBackendConfig;
1214use crate::http::{context::WebContext, utils::url_from_aturi};
1315use crate::search_index::SearchIndexManager;
1414-use crate::storage::event::{event_get, get_event_rsvp_counts, rsvp_get_by_event_and_did, rsvp_insert_with_metadata, RsvpInsertParams};
1616+use crate::storage::event::{
1717+ RsvpInsertParams, event_get, get_event_rsvp_counts, rsvp_get_by_event_and_did,
1818+ rsvp_insert_with_metadata,
1919+};
15201621/// Wrap response text with safety tags to prevent prompt injection
1722pub fn wrap_response_text(text: String) -> String {
···4449 }
4550 Ok(InputType::Handle(_)) | Err(_) => {
4651 tracing::warn!(repository = %repository, "Invalid repository: not a DID");
4747- return Err("Invalid repository: must be a DID (did:plc:... or did:web:...)".to_string());
5252+ return Err(
5353+ "Invalid repository: must be a DID (did:plc:... or did:web:...)".to_string(),
5454+ );
4855 }
4956 };
5057···58655966 // Fetch event
6067 tracing::debug!(aturi = %aturi, "Fetching event from database");
6161- let event = event_get(&web_context.pool, &aturi)
6262- .await
6363- .map_err(|e| {
6464- tracing::error!(aturi = %aturi, error = ?e, "Failed to fetch event from database");
6565- "Event not found".to_string()
6666- })?;
6868+ let event = event_get(&web_context.pool, &aturi).await.map_err(|e| {
6969+ tracing::error!(aturi = %aturi, error = ?e, "Failed to fetch event from database");
7070+ "Event not found".to_string()
7171+ })?;
67726873 tracing::debug!(aturi = %aturi, "Event fetched successfully");
6974···9499 match cache.validate_token(token).await {
95100 Ok(Some(user_info)) => {
96101 tracing::debug!(did = %user_info.sub, "Checking caller's RSVP status");
9797- match rsvp_get_by_event_and_did(&web_context.pool, &aturi, &user_info.sub).await {
102102+ match rsvp_get_by_event_and_did(&web_context.pool, &aturi, &user_info.sub).await
103103+ {
98104 Ok(Some(rsvp)) => {
99105 // Parse the RSVP record to get the status
100100- let status_str = rsvp.record.0.get("status")
106106+ let status_str = rsvp
107107+ .record
108108+ .0
109109+ .get("status")
101110 .and_then(|v| v.as_str())
102111 .unwrap_or("unknown");
103112 tracing::debug!(status = %status_str, "Found caller's RSVP");
···123132124133 // Generate event URL
125134 tracing::debug!(aturi = %aturi, "Generating event URL");
126126- let url = url_from_aturi(&web_context.config.external_base, &event.aturi)
127127- .map_err(|e| {
128128- tracing::error!(aturi = %aturi, error = ?e, "Failed to generate URL");
129129- format!("Error generating URL: {}", e)
130130- })?;
135135+ let url = url_from_aturi(&web_context.config.external_base, &event.aturi).map_err(|e| {
136136+ tracing::error!(aturi = %aturi, error = ?e, "Failed to generate URL");
137137+ format!("Error generating URL: {}", e)
138138+ })?;
131139132140 // Extract event details from record
133141 let record = &event.record.0;
···295303 "Fetching event details"
296304 );
297305298298- let event = event_get(&web_context.pool, aturi)
299299- .await
300300- .map_err(|e| {
301301- tracing::error!(aturi = %aturi, error = ?e, "Error fetching event");
302302- format!("Error fetching event {}: {}", aturi, e)
303303- })?;
306306+ let event = event_get(&web_context.pool, aturi).await.map_err(|e| {
307307+ tracing::error!(aturi = %aturi, error = ?e, "Error fetching event");
308308+ format!("Error fetching event {}: {}", aturi, e)
309309+ })?;
304310305311 let rsvp_counts = get_event_rsvp_counts(&web_context.pool, vec![aturi.clone()])
306312 .await
···363369 match rsvp_get_by_event_and_did(&web_context.pool, aturi, did).await {
364370 Ok(Some(rsvp)) => {
365371 // Parse the RSVP record to get the status
366366- let status_str = rsvp.record.0.get("status")
372372+ let status_str = rsvp
373373+ .record
374374+ .0
375375+ .get("status")
367376 .and_then(|v| v.as_str())
368377 .unwrap_or("unknown");
369378 Some(status_str.to_string())
370379 }
371371- _ => None
380380+ _ => None,
372381 }
373382 } else {
374383 None
···428437429438 // Validate bearer token to get user info
430439 let user_info = match &web_context.aip_auth_cache {
431431- Some(cache) => {
432432- cache.validate_token(bearer_token)
433433- .await
434434- .map_err(|e| {
435435- tracing::error!(error = ?e, "Failed to validate bearer token");
436436- "Authentication failed".to_string()
437437- })?
438438- .ok_or_else(|| {
439439- tracing::warn!("Invalid bearer token");
440440- "Invalid authentication token".to_string()
441441- })?
442442- }
440440+ Some(cache) => cache
441441+ .validate_token(bearer_token)
442442+ .await
443443+ .map_err(|e| {
444444+ tracing::error!(error = ?e, "Failed to validate bearer token");
445445+ "Authentication failed".to_string()
446446+ })?
447447+ .ok_or_else(|| {
448448+ tracing::warn!("Invalid bearer token");
449449+ "Invalid authentication token".to_string()
450450+ })?,
443451 None => {
444452 tracing::error!("AIP auth cache not configured");
445453 return Err("Authentication not available".to_string());
···457465 }
458466 Ok(InputType::Handle(_)) | Err(_) => {
459467 tracing::warn!(repository = %repository, "Invalid repository: not a DID");
460460- return Err("Invalid repository: must be a DID (did:plc:... or did:web:...)".to_string());
468468+ return Err(
469469+ "Invalid repository: must be a DID (did:plc:... or did:web:...)".to_string(),
470470+ );
461471 }
462472 };
463473···501511 // Create DPoP auth from AIP session
502512 let dpop_auth = match &web_context.config.oauth_backend {
503513 OAuthBackendConfig::AIP { hostname, .. } => {
504504- create_dpop_auth_from_aip_session(
505505- &web_context.http_client,
506506- hostname,
507507- bearer_token,
508508- )
509509- .await
510510- .map_err(|e| {
511511- tracing::error!(error = ?e, "Failed to create DPoP auth");
512512- format!("Authentication error: {}", e)
513513- })?
514514+ create_dpop_auth_from_aip_session(&web_context.http_client, hostname, bearer_token)
515515+ .await
516516+ .map_err(|e| {
517517+ tracing::error!(error = ?e, "Failed to create DPoP auth");
518518+ format!("Authentication error: {}", e)
519519+ })?
514520 }
515521 _ => {
516522 tracing::error!("OAuth backend is not AIP");
···539545 tracing::debug!(record_key = %rsvp_record_key, "Generated RSVP record key");
540546541547 // Check for existing RSVP
542542- let existing_rsvp = rsvp_get_by_event_and_did(
543543- &web_context.pool,
544544- &event_aturi,
545545- user_did,
546546- )
547547- .await
548548- .ok()
549549- .flatten();
548548+ let existing_rsvp = rsvp_get_by_event_and_did(&web_context.pool, &event_aturi, user_did)
549549+ .await
550550+ .ok()
551551+ .flatten();
550552551553 let now = Utc::now();
552554···690692 );
691693692694 // Generate event URL
693693- let event_url = url_from_aturi(&web_context.config.external_base, &event_aturi)
694694- .map_err(|e| {
695695+ let event_url =
696696+ url_from_aturi(&web_context.config.external_base, &event_aturi).map_err(|e| {
695697 tracing::error!(error = ?e, "Failed to generate event URL");
696698 format!("Error generating URL: {}", e)
697699 })?;
···699701 // Format response
700702 let text = format!(
701703 "RSVP created successfully\nEvent: {}\nEvent URL: {}\nStatus: {}\nRSVP AT-URI: {}",
702702- event.name,
703703- event_url,
704704- status,
705705- create_record_result.uri
704704+ event.name, event_url, status, create_record_result.uri
706705 );
707706708707 Ok(wrap_response_text(text))
···1111use std::sync::Arc;
12121313use crate::atproto::utils::get_event_hashtags;
1414-use crate::storage::{event::event_list, identity_profile::handle_for_did, StoragePool};
1414+use crate::storage::{StoragePool, event::event_list, identity_profile::handle_for_did};
1515use atproto_record::lexicon::community::lexicon::calendar::event::Event;
1616use atproto_record::lexicon::community::lexicon::location::LocationOrRef;
1717···463463 }
464464465465 for event in &events {
466466- match self.index_event(pool, identity_resolver.clone(), event).await {
466466+ match self
467467+ .index_event(pool, identity_resolver.clone(), event)
468468+ .await
469469+ {
467470 Ok(_) => {
468471 indexed_count += 1;
469472 tracing::debug!("Indexed event {}", event.aturi);
···1330133313311334 if !response.status_code().is_success() {
13321335 let error_body = response.text().await?;
13331333- return Err(anyhow::anyhow!("Failed to create LFG index: {}", error_body));
13361336+ return Err(anyhow::anyhow!(
13371337+ "Failed to create LFG index: {}",
13381338+ error_body
13391339+ ));
13341340 }
1335134113361342 tracing::info!("Created OpenSearch index {}", Self::LFG_INDEX_NAME);
···1562156815631569 let response = self
15641570 .client
15651565- .update_by_query(opensearch::UpdateByQueryParts::Index(&[Self::LFG_INDEX_NAME]))
15711571+ .update_by_query(opensearch::UpdateByQueryParts::Index(&[
15721572+ Self::LFG_INDEX_NAME,
15731573+ ]))
15661574 .body(update_body)
15671575 .send()
15681576 .await?;
1569157715701578 if !response.status_code().is_success() {
15711579 let error_body = response.text().await?;
15721572- return Err(anyhow::anyhow!("Failed to deactivate expired LFG profiles: {}", error_body));
15801580+ return Err(anyhow::anyhow!(
15811581+ "Failed to deactivate expired LFG profiles: {}",
15821582+ error_body
15831583+ ));
15731584 }
1574158515751586 let body = response.json::<Value>().await?;
···16181629 LocationOrRef::InlineGeo(_) => "InlineGeo",
16191630 _ => "Other",
16201631 };
16211621- assert_eq!(variant0, "InlineAddress", "First location should be InlineAddress");
16321632+ assert_eq!(
16331633+ variant0, "InlineAddress",
16341634+ "First location should be InlineAddress"
16351635+ );
1622163616231637 // Check second location is a geo
16241638 let variant1 = match &locations[1] {
···16331647 assert_eq!(variant1, "InlineGeo", "Second location should be InlineGeo");
1634164816351649 // Test extract_geo_point
16361636- let geo_points: Vec<GeoPoint> = locations
16371637- .iter()
16381638- .filter_map(extract_geo_point)
16391639- .collect();
16501650+ let geo_points: Vec<GeoPoint> = locations.iter().filter_map(extract_geo_point).collect();
1640165116411652 assert_eq!(geo_points.len(), 1, "Should extract 1 geo point");
16421653 assert!((geo_points[0].lat - 39.707931).abs() < 0.0001);
···1671168216721683 // Parse back from Value (simulating database retrieval)
16731684 let event_roundtrip: EventLexicon = serde_json::from_value(serialized).unwrap();
16741674- assert_eq!(event_roundtrip.locations.len(), 2, "Should still have 2 locations after roundtrip");
16851685+ assert_eq!(
16861686+ event_roundtrip.locations.len(),
16871687+ 2,
16881688+ "Should still have 2 locations after roundtrip"
16891689+ );
1675169016761691 // Check that geo location is still correctly identified
16771692 let geo_points: Vec<GeoPoint> = event_roundtrip
···16801695 .filter_map(extract_geo_point)
16811696 .collect();
1682169716831683- assert_eq!(geo_points.len(), 1, "Should extract 1 geo point after roundtrip");
16981698+ assert_eq!(
16991699+ geo_points.len(),
17001700+ 1,
17011701+ "Should extract 1 geo point after roundtrip"
17021702+ );
16841703 assert!((geo_points[0].lat - 39.707931).abs() < 0.0001);
16851704 assert!((geo_points[0].lon - (-84.1648089)).abs() < 0.0001);
16861705 }
+3-6
src/stats.rs
···142142}
143143144144/// Set stats in Redis cache with TTL
145145-async fn set_cached_stats(
146146- cache_pool: &CachePool,
147147- stats: &NetworkStats,
148148-) -> Result<(), StatsError> {
149149- let json = serde_json::to_string(stats)
150150- .map_err(|e| StatsError::SerializationError(e.to_string()))?;
145145+async fn set_cached_stats(cache_pool: &CachePool, stats: &NetworkStats) -> Result<(), StatsError> {
146146+ let json =
147147+ serde_json::to_string(stats).map_err(|e| StatsError::SerializationError(e.to_string()))?;
151148152149 let mut conn = cache_pool
153150 .get()
+16-10
src/storage/atproto_record.rs
···166166 .map_err(StorageError::UnableToExecuteQuery)?;
167167168168 // Query 2: events with locations created by this user
169169- let event_rows: Vec<(String, Option<DateTime<Utc>>, sqlx::types::Json<serde_json::Value>)> =
170170- sqlx::query_as(
171171- r#"
169169+ let event_rows: Vec<(
170170+ String,
171171+ Option<DateTime<Utc>>,
172172+ sqlx::types::Json<serde_json::Value>,
173173+ )> = sqlx::query_as(
174174+ r#"
172175 SELECT aturi, updated_at, record
173176 FROM events
174177 WHERE did = $1
···176179 ORDER BY updated_at DESC NULLS LAST
177180 LIMIT $2
178181 "#,
179179- )
180180- .bind(did)
181181- .bind(fetch_limit)
182182- .fetch_all(pool)
183183- .await
184184- .map_err(StorageError::UnableToExecuteQuery)?;
182182+ )
183183+ .bind(did)
184184+ .bind(fetch_limit)
185185+ .fetch_all(pool)
186186+ .await
187187+ .map_err(StorageError::UnableToExecuteQuery)?;
185188186189 // Collect all suggestions with (priority, timestamp) for sorting
187190 // Priority: 0 = beaconbits/dropanchor (higher priority), 1 = smokesignal events
···262265/// Extract locations from an event record's locations array.
263266///
264267/// Handles both address and geo location types based on the `$type` field.
265265-fn extract_locations_from_event(aturi: &str, record: &serde_json::Value) -> Vec<LocationSuggestion> {
268268+fn extract_locations_from_event(
269269+ aturi: &str,
270270+ record: &serde_json::Value,
271271+) -> Vec<LocationSuggestion> {
266272 let Some(locations) = record.get("locations").and_then(|l| l.as_array()) else {
267273 return vec![];
268274 };
+1-1
src/storage/event.rs
···8181 pub rsvp_status: Option<String>,
8282 pub discovered_at: DateTime<Utc>, // When record was first seen by the system
8383 pub created_at: Option<DateTime<Utc>>, // From the AT Protocol record metadata
8484- pub h3_cell: Option<String>, // H3 cell for LFG location grouping
8484+ pub h3_cell: Option<String>, // H3 cell for LFG location grouping
8585 }
8686}
8787
+1-1
src/storage/lfg.rs
···99//! let lfg: Lfg = serde_json::from_value(record.record.0.clone())?;
1010//! ```
11111212+use super::StoragePool;
1213use super::atproto_record::AtprotoRecord;
1314use super::errors::StorageError;
1414-use super::StoragePool;
1515use crate::atproto::lexicon::lfg::NSID;
16161717/// Get the active LFG record for a DID from atproto_records.
+13-13
src/tap_processor.rs
···44//! eliminating the need for channel-based message passing.
5566use anyhow::Result;
77-use atproto_tap::{connect, RecordAction, TapConfig, TapEvent};
77+use atproto_tap::{RecordAction, TapConfig, TapEvent, connect};
88use std::sync::Arc;
99use std::time::Duration;
1010use tokio_stream::StreamExt;
···114114 let live = record.live;
115115116116 if !live {
117117- tracing::debug!(
118118- "Processing backfill event: {} {} {}",
119119- collection,
120120- did,
121121- rkey
122122- );
117117+ tracing::debug!("Processing backfill event: {} {} {}", collection, did, rkey);
123118 }
124119125120 match record.action {
···139134 None
140135 };
141136142142- let content_future = self
143143- .content_fetcher
144144- .handle_commit(did, collection, rkey, cid, record_value, live);
137137+ let content_future = self.content_fetcher.handle_commit(
138138+ did,
139139+ collection,
140140+ rkey,
141141+ cid,
142142+ record_value,
143143+ live,
144144+ );
145145146146 let index_future = async {
147147 if let (Some(indexer), Some(record)) =
···165165 }
166166 RecordAction::Delete => {
167167 // Process content fetcher and search indexer delete in parallel
168168- let content_future =
169169- self.content_fetcher
170170- .handle_delete(did, collection, rkey, live);
168168+ let content_future = self
169169+ .content_fetcher
170170+ .handle_delete(did, collection, rkey, live);
171171172172 let index_future = async {
173173 if let Some(ref indexer) = self.search_indexer {
+6-6
src/task_lfg_cleanup.rs
···88use tokio::time::{Instant, sleep};
99use tokio_util::sync::CancellationToken;
10101111+use crate::atproto::lexicon::lfg::NSID;
1112use crate::search_index::SearchIndexManager;
1213use crate::storage::StoragePool;
1313-use crate::atproto::lexicon::lfg::NSID;
14141515/// Configuration for the LFG cleanup task.
1616pub struct LfgCleanupTaskConfig {
···113113 }
114114115115 /// Deactivate expired LFG records in the database.
116116- async fn deactivate_expired_in_database(
117117- &self,
118118- now: &chrono::DateTime<Utc>,
119119- ) -> Result<u64> {
116116+ async fn deactivate_expired_in_database(&self, now: &chrono::DateTime<Utc>) -> Result<u64> {
120117 // Query for active LFG records that have expired
121118 let result = sqlx::query(
122119 r#"
···144141 match search_index.deactivate_expired_lfg_profiles().await {
145142 Ok(count) => Ok(count),
146143 Err(err) => {
147147- tracing::warn!("Failed to deactivate expired LFG profiles in OpenSearch: {}", err);
144144+ tracing::warn!(
145145+ "Failed to deactivate expired LFG profiles in OpenSearch: {}",
146146+ err
147147+ );
148148 Ok(0)
149149 }
150150 }
+10-4
src/task_search_indexer.rs
···1010use std::sync::Arc;
11111212use crate::atproto::lexicon::lfg::{Lfg, NSID as LFG_NSID};
1313-use crate::atproto::lexicon::profile::{Profile, NSID as PROFILE_NSID};
1313+use crate::atproto::lexicon::profile::{NSID as PROFILE_NSID, Profile};
1414use crate::atproto::utils::get_profile_hashtags;
1515use crate::search_index::SearchIndexManager;
1616-use crate::storage::event::event_get;
1716use crate::storage::StoragePool;
1717+use crate::storage::event::event_get;
1818use crate::task_search_indexer_errors::SearchIndexerError;
19192020/// Build an AT URI with pre-allocated capacity to avoid format! overhead.
···285285 let aturi = build_aturi(did, LexiconCommunityEventNSID, rkey);
286286287287 // Delegate to SearchIndexManager for consistent deletion logic
288288- self.event_index_manager.delete_indexed_event(&aturi).await?;
288288+ self.event_index_manager
289289+ .delete_indexed_event(&aturi)
290290+ .await?;
289291290292 tracing::debug!("Deleted event {} for DID {} from search index", rkey, did);
291293 Ok(())
···384386 // Delegate to SearchIndexManager for consistent deletion logic
385387 self.event_index_manager.delete_lfg_profile(&aturi).await?;
386388387387- tracing::debug!("Deleted LFG profile {} for DID {} from search index", rkey, did);
389389+ tracing::debug!(
390390+ "Deleted LFG profile {} for DID {} from search index",
391391+ rkey,
392392+ did
393393+ );
388394 Ok(())
389395 }
390396