Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/AGENT_PROMPT_TASK_1.2.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/AGENT_PROMPT_TASK_1.2.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/AGENT_PROMPT_TASK_1.2.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/AGENT_PROMPT_TASK_1.2.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/TASK_1.2_COMPLETION_SUMMARY.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/TASK_1.2_COMPLETION_SUMMARY.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/TASK_1.2_COMPLETION_SUMMARY.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/TASK_1.2_COMPLETION_SUMMARY.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_COMPLETION_REPORT.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_COMPLETION_REPORT.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_COMPLETION_REPORT.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_COMPLETION_REPORT.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_Nominatim_Client_Integration_Log.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_Nominatim_Client_Integration_Log.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_Nominatim_Client_Integration_Log.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.1_Nominatim_Client_Integration_Log.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.2_Venue_Search_Service_Implementation_Log.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.2_Venue_Search_Service_Implementation_Log.md
Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.2_Venue_Search_Service_Implementation_Log.md
backup/Geocode-project/Memory/Phase_1_Nominatim_Client_Caching_Infrastructure/Task_1.2_Venue_Search_Service_Implementation_Log.md
Memory/Phase_2_Backend_API_Lexicon_Integration/TASK_2.1_PROMPT.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/TASK_2.1_PROMPT.md
Memory/Phase_2_Backend_API_Lexicon_Integration/TASK_2.1_PROMPT.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/TASK_2.1_PROMPT.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task2.2_prompt.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task2.2_prompt.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task2.2_prompt.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task2.2_prompt.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Compatibility_Venue_Enhancement_Log.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Compatibility_Venue_Enhancement_Log.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Compatibility_Venue_Enhancement_Log.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Compatibility_Venue_Enhancement_Log.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Infrastructure_Analysis_Report.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Infrastructure_Analysis_Report.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Infrastructure_Analysis_Report.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.1_Lexicon_Infrastructure_Analysis_Report.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.2_Event_Location_API_Enhancement_Log.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.2_Event_Location_API_Enhancement_Log.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.2_Event_Location_API_Enhancement_Log.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.2_Event_Location_API_Enhancement_Log.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.3_Progressive_Geocoding_Implementation_Log.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.3_Progressive_Geocoding_Implementation_Log.md
Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.3_Progressive_Geocoding_Implementation_Log.md
backup/Geocode-project/Memory/Phase_2_Backend_API_Lexicon_Integration/Task_2.3_Progressive_Geocoding_Implementation_Log.md
Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Log.md
backup/Geocode-project/Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Log.md
Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Log.md
backup/Geocode-project/Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Log.md
Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Prompt.md
backup/Geocode-project/Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Prompt.md
Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Prompt.md
backup/Geocode-project/Memory/Phase_3_Frontend_Integration_User_Experience/Task_3.1_Venue_Search_UI_Components_Prompt.md
Memory/README.md
backup/Geocode-project/Memory/README.md
Memory/README.md
backup/Geocode-project/Memory/README.md
docs/TASK_3_1_IMPLEMENTATION_SUMMARY.md
docs/geocoding/TASK_3_1_IMPLEMENTATION_SUMMARY.md
docs/TASK_3_1_IMPLEMENTATION_SUMMARY.md
docs/geocoding/TASK_3_1_IMPLEMENTATION_SUMMARY.md
docs/VENUE_SEARCH_QUICK_START.md
docs/geocoding/VENUE_SEARCH_QUICK_START.md
docs/VENUE_SEARCH_QUICK_START.md
docs/geocoding/VENUE_SEARCH_QUICK_START.md
+1533
docs/img-labeler/BLUESKY_LABELING_IMPLEMENTATION_PLAN.md
+1533
docs/img-labeler/BLUESKY_LABELING_IMPLEMENTATION_PLAN.md
···
1
+
# AT Protocol Image Labeling Implementation Plan
2
+
3
+
**Project:** Plaquetournante-dev Event Management System
4
+
**Feature:** Event Image Upload Moderation & Labeling with AT Protocol Integration
5
+
**Date:** 18 juin 2025
6
+
**Status:** Planning Phase - Updated with Current AT Protocol Specifications
7
+
**Scope:** Event Image Uploads Only
8
+
9
+
## Executive Summary
10
+
11
+
This document outlines the implementation plan for integrating AT Protocol's labeling system specifically for event image uploads in our ATProto-based event management platform. The implementation will use proper AT Protocol blob upload, labeler subscription mechanisms, and the com.atproto.label namespace for content moderation, providing automated image analysis, real-time content labeling, and user-controlled moderation preferences while maintaining seamless user experience.
12
+
13
+
**Key Updates for Current AT Protocol Specifications:**
14
+
- Proper implementation of `com.atproto.repo.uploadBlob` for image storage
15
+
- Integration with labeler services via HTTP headers (`atproto-accept-labelers`)
16
+
- Compliance with `com.atproto.label.defs` label structure specifications
17
+
- Self-labeling support using AT Protocol standard label values
18
+
- User preference management aligned with `app.bsky.actor.defs` preferences schema
19
+
20
+
## Table of Contents
21
+
22
+
1. [Project Context](#project-context)
23
+
2. [Technical Architecture](#technical-architecture)
24
+
3. [Implementation Phases](#implementation-phases)
25
+
4. [Component Specifications](#component-specifications)
26
+
5. [Database Schema Design](#database-schema-design)
27
+
6. [API Integration Points](#api-integration-points)
28
+
7. [User Experience Design](#user-experience-design)
29
+
8. [Performance Considerations](#performance-considerations)
30
+
9. [Testing Strategy](#testing-strategy)
31
+
10. [Deployment Strategy](#deployment-strategy)
32
+
11. [Timeline & Milestones](#timeline--milestones)
33
+
34
+
## Project Context
35
+
36
+
### Current System Analysis
37
+
38
+
Our event management system built on ATProto includes:
39
+
- OAuth authentication with Bluesky PDS (`/src/atproto/atrium_auth.rs`)
40
+
- Event creation and management system
41
+
- PostgreSQL database with migration system
42
+
- Image upload handling for events
43
+
- Bilingual support (English/French Canadian)
44
+
- Admin interface for content management
45
+
46
+
### Integration Goals
47
+
48
+
1. **AT Protocol Compliance**: Full compliance with AT Protocol blob upload and labeling specifications
49
+
2. **Image Content Safety**: Detect inappropriate content using moderation.bsky.app as primary labeler
50
+
3. **Real-time Feedback**: Immediate labeling during upload process via `atproto-accept-labelers` headers
51
+
4. **User Autonomy**: User-controlled moderation preferences and labeler subscriptions
52
+
5. **Self-Labeling Support**: Allow event creators to self-label their image content
53
+
6. **Event Quality**: Maintain high-quality event imagery standards through transparent moderation
54
+
7. **Performance**: Target 2-5 second image analysis response times with proper caching
55
+
8. **Metadata Security**: Strip sensitive image metadata before blob upload as per AT Protocol recommendations
56
+
9. **Custom Lexicon**: Create community.lexicon.calendar.event.image lexicon via RFC process
57
+
10. **Data Consistency**: Implement transaction compensation patterns for AT Protocol + PostgreSQL consistency
58
+
59
+
## Technical Architecture
60
+
61
+
### Core Components Overview
62
+
63
+
```
64
+
┌─────────────────────────────────────────────────────────────┐
65
+
│ Event Creation UI │
66
+
├─────────────────────────────────────────────────────────────┤
67
+
│ Image Upload Component │
68
+
├─────────────────────────────────────────────────────────────┤
69
+
│ HTTP Handler Layer │
70
+
├─────────────────────────────────────────────────────────────┤
71
+
│ Image Upload Handler with Moderation │
72
+
├─────────────────────────────────────────────────────────────┤
73
+
│ Image Labeling Service │
74
+
├─────────────────────────────────────────────────────────────┤
75
+
│ Bluesky Client │ Image Cache │ Rate Limiter │ Label Parser │
76
+
├─────────────────────────────────────────────────────────────┤
77
+
│ Database & Storage Layer │
78
+
├─────────────────────────────────────────────────────────────┤
79
+
│ Event Data │ Image Labels │ Upload Queue │ Cache Store │
80
+
└─────────────────────────────────────────────────────────────┘
81
+
```
82
+
83
+
### Integration Points
84
+
85
+
1. **Event Image Upload**: Integrate labeling during image upload
86
+
2. **Real-time Feedback**: Show labeling results immediately
87
+
3. **Event Display**: Display appropriate labels on event images
88
+
4. **Admin Review**: Allow admin override for borderline cases
89
+
5. **Background Processing**: Handle batch labeling for existing images
90
+
91
+
## Implementation Phases
92
+
93
+
### Phase 1: AT Protocol Labeling Service Integration (Weeks 1-2)
94
+
95
+
**Deliverables:**
96
+
- AT Protocol labeler client with proper header-based subscription
97
+
- Implementation of `com.atproto.label.defs` label structure
98
+
- Labeler service discovery and subscription management
99
+
- Database schema for AT Protocol-compliant label storage
100
+
- Rate limiting and caching for labeler API calls
101
+
- Integration with existing OAuth client for AT Protocol authentication
102
+
103
+
**Key Files:**
104
+
```
105
+
/src/services/atproto_labeling/mod.rs
106
+
/src/services/atproto_labeling/labeler_client.rs
107
+
/src/services/atproto_labeling/label_types.rs
108
+
/src/services/atproto_labeling/subscription_manager.rs
109
+
/migrations/20250618000001_atproto_labels.sql
110
+
```
111
+
112
+
### Phase 2: Blob Upload & Record Creation Integration (Weeks 3-4)
113
+
114
+
**Deliverables:**
115
+
- Proper `com.atproto.repo.uploadBlob` implementation with metadata stripping
116
+
- Image record creation using structured lexicons
117
+
- Real-time label retrieval during upload using subscribed labelers
118
+
- Self-labeling support with `com.atproto.label.defs#selfLabels`
119
+
- Enhanced error handling for AT Protocol-specific failures
120
+
- CDN URL generation for image display from blob references
121
+
122
+
**Key Files:**
123
+
```
124
+
/src/http/handle_atproto_image_upload.rs
125
+
/src/services/atproto_labeling/blob_upload.rs
126
+
/src/services/atproto_labeling/record_creation.rs
127
+
/src/services/atproto_labeling/metadata_stripper.rs
128
+
/src/models/atproto_image_record.rs
129
+
/src/atproto/lexicon/community_lexicon_calendar_event_image.rs
130
+
```
131
+
132
+
### Phase 3: User Experience & Preferences (Weeks 5-6)
133
+
134
+
**Deliverables:**
135
+
- User moderation preferences interface aligned with `app.bsky.actor.defs`
136
+
- Labeler subscription management with per-labeler preferences
137
+
- Content filtering engine based on AT Protocol moderation patterns
138
+
- HTMX Self-labeling i18n UI for content creators during upload
139
+
- Localized preference descriptions and label explanations
140
+
- Integration with existing OAuth flow for preference persistence
141
+
142
+
**Key Files:**
143
+
```
144
+
/src/services/user_moderation/mod.rs
145
+
/src/services/user_moderation/preferences.rs
146
+
/src/services/atproto_labeling/moderation_engine.rs
147
+
/templates/labeler_preferences.*.html
148
+
/src/http/handle_labeler_subscriptions.rs
149
+
/i18n/*/atproto_moderation.ftl
150
+
/static/labeler-subscription.css
151
+
```
152
+
153
+
## User Preference Integration
154
+
155
+
### 1. User Moderation Preferences Service (`/src/services/user_preferences/mod.rs`)
156
+
157
+
**Purpose**: Manage user content filtering preferences according to AT Protocol standards.
158
+
159
+
**Core Types (from AT Protocol documentation):**
160
+
```rust
161
+
#[derive(Debug, Clone, Serialize, Deserialize)]
162
+
pub struct ModerationPrefs {
163
+
pub adult_content_enabled: bool,
164
+
pub labels: HashMap<String, LabelPreference>, // Global label settings
165
+
pub labelers: Vec<ModerationPrefsLabeler>,
166
+
pub muted_words: Vec<MutedWord>,
167
+
pub hidden_posts: Vec<String>,
168
+
}
169
+
170
+
#[derive(Debug, Clone, Serialize, Deserialize)]
171
+
pub struct ModerationPrefsLabeler {
172
+
pub did: String,
173
+
pub labels: HashMap<String, LabelPreference>,
174
+
}
175
+
176
+
#[derive(Debug, Clone, Serialize, Deserialize)]
177
+
pub enum LabelPreference {
178
+
#[serde(rename = "hide")]
179
+
Hide, // Filter from listings, blur content
180
+
#[serde(rename = "warn")]
181
+
Warn, // Show warning overlay
182
+
#[serde(rename = "ignore")]
183
+
Ignore, // Show content normally
184
+
}
185
+
```
186
+
187
+
**User Preferences Management:**
188
+
```rust
189
+
impl UserModerationService {
190
+
pub async fn get_user_preferences(&self, user_did: &str) -> Result<ModerationPrefs> {
191
+
// Get from database or use defaults
192
+
let prefs = self.db.get_user_preferences(user_did).await?
193
+
.unwrap_or_else(|| self.get_default_preferences());
194
+
195
+
Ok(prefs)
196
+
}
197
+
198
+
pub async fn update_user_preferences(
199
+
&self,
200
+
user_did: &str,
201
+
prefs: ModerationPrefs
202
+
) -> Result<()> {
203
+
// Update labeler subscriptions via HTTP headers
204
+
self.update_labeler_subscriptions(&prefs.labelers).await?;
205
+
206
+
// Save to database
207
+
self.db.save_user_preferences(user_did, prefs).await?;
208
+
209
+
Ok(())
210
+
}
211
+
212
+
pub async fn subscribe_to_labeler(
213
+
&self,
214
+
user_did: &str,
215
+
labeler_did: &str,
216
+
label_preferences: HashMap<String, LabelPreference>
217
+
) -> Result<()> {
218
+
let mut prefs = self.get_user_preferences(user_did).await?;
219
+
220
+
// Add new labeler subscription
221
+
prefs.labelers.push(ModerationPrefsLabeler {
222
+
did: labeler_did.to_string(),
223
+
labels: label_preferences,
224
+
});
225
+
226
+
self.update_user_preferences(user_did, prefs).await?;
227
+
Ok(())
228
+
}
229
+
}
230
+
```
231
+
232
+
### 2. Content Moderation Engine (`/src/services/moderation/mod.rs`)
233
+
234
+
**Purpose**: Apply user preferences to filter and moderate content display.
235
+
236
+
**Moderation Decision Engine:**
237
+
```rust
238
+
#[derive(Debug, Clone)]
239
+
pub struct ModerationDecision {
240
+
pub filter: bool, // Remove from listings
241
+
pub blur: bool, // Put behind cover/warning
242
+
pub alert: bool, // Show danger warning
243
+
pub inform: bool, // Show neutral info
244
+
pub no_override: bool, // Disable clicking through warning
245
+
}
246
+
247
+
impl ModerationEngine {
248
+
pub async fn moderate_event_image(
249
+
&self,
250
+
image_labels: &[Label],
251
+
user_preferences: &ModerationPrefs,
252
+
context: &str // "contentList", "contentView", "contentMedia"
253
+
) -> ModerationDecision {
254
+
let mut decision = ModerationDecision::default();
255
+
256
+
for label in image_labels {
257
+
let preference = self.get_label_preference(label, user_preferences);
258
+
259
+
match (&label.val, preference, context) {
260
+
// Global labels with special behavior
261
+
("!hide", _, _) => {
262
+
decision.filter = true;
263
+
decision.blur = true;
264
+
decision.no_override = true;
265
+
}
266
+
("!warn", _, _) => {
267
+
decision.blur = true;
268
+
decision.alert = true;
269
+
}
270
+
("porn", LabelPreference::Hide, _) if !user_preferences.adult_content_enabled => {
271
+
decision.filter = true;
272
+
decision.blur = true;
273
+
decision.no_override = true;
274
+
}
275
+
("porn", LabelPreference::Warn, "contentMedia") => {
276
+
decision.blur = true;
277
+
decision.alert = true;
278
+
}
279
+
("sexual", LabelPreference::Warn, _) => {
280
+
decision.blur = true;
281
+
decision.inform = true;
282
+
}
283
+
("nudity", LabelPreference::Ignore, _) => {
284
+
// Show normally
285
+
}
286
+
_ => {
287
+
// Apply custom labeler preferences
288
+
self.apply_custom_labeler_preference(label, user_preferences, &mut decision);
289
+
}
290
+
}
291
+
}
292
+
293
+
decision
294
+
}
295
+
296
+
fn get_label_preference(&self, label: &Label, prefs: &ModerationPrefs) -> LabelPreference {
297
+
// Check if it's from a subscribed labeler
298
+
for labeler in &prefs.labelers {
299
+
if labeler.did == label.src {
300
+
if let Some(pref) = labeler.labels.get(&label.val) {
301
+
return pref.clone();
302
+
}
303
+
}
304
+
}
305
+
306
+
// Check global label preferences
307
+
prefs.labels.get(&label.val)
308
+
.cloned()
309
+
.unwrap_or(LabelPreference::Warn) // Default to warn
310
+
}
311
+
}
312
+
```
313
+
## Component Specifications
314
+
315
+
### 1. AT Protocol Labeler Client (`/src/services/atproto_labeling/labeler_client.rs`)
316
+
317
+
**Purpose**: Interface with AT Protocol labeling system using current API specifications and proper labeler subscription mechanisms.
318
+
319
+
**Key Features:**
320
+
- Header-based labeler subscription using `atproto-accept-labelers`
321
+
- Proper label fetching using `com.atproto.label.queryLabels`
322
+
- Real-time label subscription via `com.atproto.label.subscribeLabels`
323
+
- Integration with existing OAuth infrastructure
324
+
- Labeler service discovery and definition caching
325
+
326
+
**Core Implementation:**
327
+
```rust
328
+
use crate::atproto::atrium_auth::AtriumOAuthManager;
329
+
use atrium_api::com::atproto::label::{queryLabels, subscribeLabels};
330
+
use atrium_api::types::string::Datetime;
331
+
332
+
pub struct ATProtoLabelerClient {
333
+
oauth_manager: Arc<AtriumOAuthManager>,
334
+
subscribed_labelers: Arc<RwLock<Vec<String>>>, // DIDs of subscribed labelers
335
+
http_client: reqwest::Client,
336
+
label_cache: Arc<LabelCache>,
337
+
}
338
+
339
+
impl ATProtoLabelerClient {
340
+
/// Query labels for a specific AT URI from subscribed labelers
341
+
pub async fn query_labels_for_uri(&self, uri: &str) -> Result<Vec<Label>> {
342
+
let agent = self.oauth_manager.get_atproto_client().await?;
343
+
344
+
let query_input = queryLabels::Input {
345
+
uri_patterns: vec![uri.to_string()],
346
+
sources: Some(self.get_subscribed_labelers().await),
347
+
limit: Some(50),
348
+
cursor: None,
349
+
};
350
+
351
+
let response = agent.api.com.atproto.label.query_labels(query_input).await?;
352
+
Ok(response.labels)
353
+
}
354
+
355
+
/// Create image record with proper self-labeling support
356
+
pub async fn create_image_record_with_labels(
357
+
&self,
358
+
blob_ref: BlobRef,
359
+
metadata: ImageMetadata,
360
+
self_labels: Option<Vec<SelfLabel>>
361
+
) -> Result<CreateRecordOutput> {
362
+
let agent = self.oauth_manager.get_atproto_client().await?;
363
+
364
+
let record = ImageRecord {
365
+
blob: blob_ref,
366
+
alt: metadata.alt_text,
367
+
aspect_ratio: metadata.aspect_ratio,
368
+
labels: self_labels,
369
+
created_at: Datetime::now(),
370
+
};
371
+
372
+
let create_input = CreateRecordInput {
373
+
repo: agent.session.as_ref().unwrap().did.clone(),
374
+
collection: "community.lexicon.calendar.event.image".to_string(),
375
+
rkey: None,
376
+
validate: Some(true),
377
+
record: serde_json::to_value(record)?,
378
+
swap_commit: None,
379
+
};
380
+
381
+
agent.api.com.atproto.repo.create_record(create_input).await
382
+
}
383
+
384
+
/// Set proper labeler headers for requests
385
+
pub fn add_labeler_headers(&self, request: &mut reqwest::Request) {
386
+
let labelers = self.subscribed_labelers.read().unwrap();
387
+
if !labelers.is_empty() {
388
+
let labeler_header = labelers.join(",");
389
+
request.headers_mut().insert(
390
+
"atproto-accept-labelers",
391
+
HeaderValue::from_str(&labeler_header).unwrap()
392
+
);
393
+
}
394
+
}
395
+
}
396
+
```
397
+
398
+
**Labeler Service Discovery:**
399
+
```rust
400
+
impl ATProtoLabelerClient {
401
+
/// Discover available labeler services
402
+
pub async fn discover_labelers(&self, query: Option<&str>) -> Result<Vec<LabelerService>> {
403
+
// In practice, this would query a directory or use search APIs
404
+
// For now, return well-known labelers
405
+
let well_known_labelers = vec![
406
+
LabelerService {
407
+
did: "did:plc:ar7c4by46qjdydhdevvrndac".to_string(), // Bluesky Moderation Service
408
+
handle: "bsky.social".to_string(),
409
+
display_name: "Bluesky Moderation".to_string(),
410
+
description: "Official Bluesky content moderation".to_string(),
411
+
policies: self.fetch_labeler_policies("did:plc:ar7c4by46qjdydhdevvrndac").await?,
412
+
}
413
+
];
414
+
415
+
Ok(well_known_labelers)
416
+
}
417
+
418
+
/// Fetch labeler policies and supported labels
419
+
pub async fn fetch_labeler_policies(&self, labeler_did: &str) -> Result<LabelerPolicies> {
420
+
// Use existing cache or fetch from labeler service record
421
+
if let Some(cached) = self.label_cache.get_labeler_policies(labeler_did).await? {
422
+
return Ok(cached);
423
+
}
424
+
425
+
let agent = self.oauth_manager.get_atproto_client().await?;
426
+
let record = agent.get_record(&format!("at://{}/app.bsky.labeler.service/self", labeler_did)).await?;
427
+
428
+
let service: LabelerServiceRecord = serde_json::from_value(record.value)?;
429
+
430
+
// Cache for 6 hours as recommended
431
+
self.label_cache.cache_labeler_policies(labeler_did, &service.policies, Duration::hours(6)).await?;
432
+
433
+
Ok(service.policies)
434
+
}
435
+
}
436
+
```
437
+
438
+
### 2. AT Protocol Label Types (`/src/services/atproto_labeling/label_types.rs`)
439
+
440
+
**Purpose**: Define AT Protocol-compliant label handling based on current specifications and `com.atproto.label.defs`.
441
+
442
+
**AT Protocol Label Structure (Current Specification):**
443
+
```rust
444
+
#[derive(Debug, Clone, Serialize, Deserialize)]
445
+
pub struct Label {
446
+
/// The DID of the actor who created this label
447
+
pub src: String,
448
+
/// AT URI of the record, repository (account), or other resource
449
+
pub uri: String,
450
+
/// Optionally, CID specifying the specific version of 'uri' resource
451
+
pub cid: Option<String>,
452
+
/// The short string name of the value or type of this label
453
+
pub val: String,
454
+
/// If true, this is a negation label, overwriting a previous label
455
+
pub neg: Option<bool>,
456
+
/// Timestamp when this label was created (ISO 8601)
457
+
pub cts: String,
458
+
}
459
+
460
+
#[derive(Debug, Clone, Serialize, Deserialize)]
461
+
pub struct SelfLabels {
462
+
pub values: Vec<SelfLabel>,
463
+
}
464
+
465
+
#[derive(Debug, Clone, Serialize, Deserialize)]
466
+
pub struct SelfLabel {
467
+
pub val: String,
468
+
}
469
+
```
470
+
471
+
**Global Label Values (Per Current AT Protocol Documentation):**
472
+
```rust
473
+
pub mod global_labels {
474
+
/// Cannot be clicked through, filters from listings completely
475
+
pub const HIDE: &str = "!hide";
476
+
/// Generic warning, can be clicked through
477
+
pub const WARN: &str = "!warn";
478
+
/// Content inaccessible to logged-out users
479
+
pub const NO_UNAUTHENTICATED: &str = "!no-unauthenticated";
480
+
/// Adult content warning on images, 18+ restricted
481
+
pub const PORN: &str = "porn";
482
+
/// Less intense sexual content than porn
483
+
pub const SEXUAL: &str = "sexual";
484
+
/// Violence/gore content
485
+
pub const GRAPHIC_MEDIA: &str = "graphic-media";
486
+
/// Artistic nudity, not 18+ restricted
487
+
pub const NUDITY: &str = "nudity";
488
+
}
489
+
490
+
/// Self-labeling values that users can apply to their own content
491
+
pub const SELF_LABEL_VALUES: &[&str] = &[
492
+
global_labels::NO_UNAUTHENTICATED,
493
+
global_labels::PORN,
494
+
global_labels::SEXUAL,
495
+
global_labels::NUDITY,
496
+
global_labels::GRAPHIC_MEDIA,
497
+
];
498
+
```
499
+
500
+
**Label Behavior Definitions (Based on AT Protocol Standards):**
501
+
```rust
502
+
#[derive(Debug, Clone, Serialize, Deserialize)]
503
+
pub struct LabelValueDefinition {
504
+
pub identifier: String,
505
+
pub severity: LabelSeverity,
506
+
pub blurs: BlurBehavior,
507
+
pub default_setting: DefaultSetting,
508
+
pub adult_only: bool,
509
+
pub locales: Vec<LabelLocale>,
510
+
}
511
+
512
+
#[derive(Debug, Clone, Serialize, Deserialize)]
513
+
pub enum LabelSeverity {
514
+
#[serde(rename = "alert")]
515
+
Alert, // "Danger" warning with red styling
516
+
#[serde(rename = "inform")]
517
+
Inform, // "Info" neutral warning
518
+
#[serde(rename = "none")]
519
+
None, // No warning UI
520
+
}
521
+
522
+
#[derive(Debug, Clone, Serialize, Deserialize)]
523
+
pub enum BlurBehavior {
524
+
#[serde(rename = "content")]
525
+
Content, // Hide content completely, requires click-through
526
+
#[serde(rename = "media")]
527
+
Media, // Blur/hide images in content only
528
+
#[serde(rename = "none")]
529
+
None, // No visual hiding
530
+
}
531
+
532
+
#[derive(Debug, Clone, Serialize, Deserialize)]
533
+
pub enum DefaultSetting {
534
+
#[serde(rename = "hide")]
535
+
Hide, // Remove from feeds, hide content
536
+
#[serde(rename = "warn")]
537
+
Warn, // Show with warning overlay
538
+
#[serde(rename = "ignore")]
539
+
Ignore, // Show content normally
540
+
}
541
+
```
542
+
543
+
**Label Processing for Images:**
544
+
```rust
545
+
impl ImageLabelProcessor {
546
+
/// Determine if upload should be blocked based on AT Protocol global labels
547
+
pub fn should_block_upload(&self, labels: &[Label]) -> bool {
548
+
labels.iter().any(|label| {
549
+
matches!(label.val.as_str(),
550
+
global_labels::HIDE |
551
+
global_labels::PORN // Block porn uploads by default
552
+
)
553
+
})
554
+
}
555
+
556
+
/// Check if user should be warned about content
557
+
pub fn should_warn_user(&self, labels: &[Label]) -> bool {
558
+
labels.iter().any(|label| {
559
+
matches!(label.val.as_str(),
560
+
global_labels::WARN |
561
+
global_labels::SEXUAL |
562
+
global_labels::GRAPHIC_MEDIA |
563
+
global_labels::NUDITY
564
+
)
565
+
})
566
+
}
567
+
568
+
/// Generate upload feedback based on applied labels
569
+
pub fn get_upload_feedback(&self, labels: &[Label]) -> UploadFeedback {
570
+
if self.should_block_upload(labels) {
571
+
UploadFeedback::Blocked {
572
+
reason: "Content violates platform guidelines".to_string(),
573
+
labels: labels.to_vec()
574
+
}
575
+
} else if self.should_warn_user(labels) {
576
+
UploadFeedback::Warning {
577
+
message: "Content contains material that may be sensitive".to_string(),
578
+
labels: labels.to_vec(),
579
+
can_proceed: true,
580
+
}
581
+
} else {
582
+
UploadFeedback::Approved
583
+
}
584
+
}
585
+
586
+
/// Validate self-labels are in allowed set
587
+
pub fn validate_self_labels(&self, self_labels: &[SelfLabel]) -> Result<(), ValidationError> {
588
+
for label in self_labels {
589
+
if !SELF_LABEL_VALUES.contains(&label.val.as_str()) {
590
+
return Err(ValidationError::InvalidSelfLabel(label.val.clone()));
591
+
}
592
+
}
593
+
Ok(())
594
+
}
595
+
}
596
+
```
597
+
598
+
### 3. AT Protocol Image Upload Handler (`/src/http/handle_atproto_image_upload.rs`)
599
+
600
+
**Purpose**: Implement proper AT Protocol blob upload with metadata stripping and labeling integration.
601
+
602
+
**Lexicon Definition for Event Images:**
603
+
```json
604
+
{
605
+
"lexicon": 1,
606
+
"id": "community.lexicon.calendar.event.image",
607
+
"defs": {
608
+
"main": {
609
+
"type": "record",
610
+
"description": "Event image record with AT Protocol compliance",
611
+
"key": "tid",
612
+
"record": {
613
+
"type": "object",
614
+
"required": ["blob", "createdAt"],
615
+
"properties": {
616
+
"blob": {
617
+
"type": "blob",
618
+
"accept": ["image/png", "image/jpeg", "image/webp"],
619
+
"maxSize": 1000000
620
+
},
621
+
"alt": {
622
+
"type": "string",
623
+
"maxLength": 300,
624
+
"description": "Alt text for accessibility"
625
+
},
626
+
"aspectRatio": {
627
+
"type": "object",
628
+
"properties": {
629
+
"width": {"type": "integer", "minimum": 1},
630
+
"height": {"type": "integer", "minimum": 1}
631
+
}
632
+
},
633
+
"labels": {
634
+
"$ref": "com.atproto.label.defs#selfLabels"
635
+
},
636
+
"createdAt": {
637
+
"type": "string",
638
+
"format": "datetime"
639
+
}
640
+
}
641
+
}
642
+
}
643
+
}
644
+
}
645
+
```
646
+
647
+
**Core Upload Workflow with Metadata Stripping:**
648
+
```rust
649
+
use atrium_api::com::atproto::repo::{uploadBlob, createRecord};
650
+
651
+
pub async fn handle_atproto_image_upload(
652
+
State(app_state): State<AppState>,
653
+
Extension(auth): Extension<Auth>,
654
+
Multipart(mut multipart): Multipart,
655
+
) -> Result<impl IntoResponse, WebError> {
656
+
// 1. Validate and process image
657
+
let image_data = validate_and_process_image(multipart).await?;
658
+
659
+
// 2. Strip metadata for privacy/security (per AT Protocol recommendations)
660
+
let cleaned_image_data = strip_image_metadata(&image_data.bytes)?;
661
+
662
+
// 3. Get user's AT Protocol client with proper session
663
+
let atproto_client = app_state.oauth_manager
664
+
.get_atproto_client()
665
+
.await?;
666
+
667
+
// 4. Upload blob to user's PDS
668
+
let blob_upload_result = upload_image_blob(
669
+
&atproto_client,
670
+
&cleaned_image_data,
671
+
&image_data.mime_type
672
+
).await?;
673
+
674
+
// 5. Validate self-labels if provided
675
+
if let Some(ref self_labels) = image_data.self_labels {
676
+
app_state.label_processor.validate_self_labels(self_labels)?;
677
+
}
678
+
679
+
// 6. Create image record with blob reference and self-labels
680
+
let image_record = CreateImageRecord {
681
+
blob: blob_upload_result.blob.clone(),
682
+
alt: image_data.alt_text,
683
+
aspect_ratio: image_data.aspect_ratio,
684
+
labels: image_data.self_labels.map(|labels| SelfLabels { values: labels }),
685
+
created_at: Datetime::now(),
686
+
};
687
+
688
+
// 7. Create record in user's repository let record_result = create_image_record(
689
+
&atproto_client,
690
+
image_record
691
+
).await?;
692
+
693
+
// 8. Query labels from subscribed labelers using proper headers
694
+
let applied_labels = query_labels_for_record(
695
+
&app_state.labeler_client,
696
+
&record_result.uri
697
+
).await?;
698
+
699
+
// 9. Process labels and determine upload outcome
700
+
let upload_result = ProcessedUploadResult {
701
+
at_uri: record_result.uri,
702
+
blob_ref: blob_upload_result.blob,
703
+
cid: record_result.cid,
704
+
labels: applied_labels.clone(),
705
+
display_url: generate_cdn_url(&blob_upload_result.blob),
706
+
moderation_decision: app_state.label_processor.get_upload_feedback(&applied_labels),
707
+
};
708
+
709
+
Ok(Json(upload_result))
710
+
}
711
+
```
712
+
713
+
**Metadata Stripping Implementation:**
714
+
```rust
715
+
use image::{ImageFormat, DynamicImage};
716
+
717
+
/// Strip potentially sensitive metadata from images before upload
718
+
/// This implements AT Protocol recommendation for privacy protection
719
+
pub fn strip_image_metadata(image_bytes: &[u8]) -> Result<Vec<u8>, ImageProcessingError> {
720
+
// Load image
721
+
let img = image::load_from_memory(image_bytes)?;
722
+
723
+
// Determine format
724
+
let format = image::guess_format(image_bytes)?;
725
+
726
+
// Re-encode without metadata
727
+
let mut output = Vec::new();
728
+
match format {
729
+
ImageFormat::Jpeg => {
730
+
img.write_to(&mut std::io::Cursor::new(&mut output), ImageFormat::Jpeg)?;
731
+
}
732
+
ImageFormat::Png => {
733
+
img.write_to(&mut std::io::Cursor::new(&mut output), ImageFormat::Png)?;
734
+
}
735
+
ImageFormat::WebP => {
736
+
img.write_to(&mut std::io::Cursor::new(&mut output), ImageFormat::WebP)?;
737
+
}
738
+
_ => return Err(ImageProcessingError::UnsupportedFormat),
739
+
}
740
+
741
+
Ok(output)
742
+
}
743
+
```
744
+
745
+
**Proper AT Protocol Blob Upload:**
746
+
```rust
747
+
async fn upload_image_blob(
748
+
atproto_client: &AtpAgent<MemoryStore<(), Object<OutputData>>, DpopAwareHttpClient>,
749
+
image_data: &[u8],
750
+
mime_type: &str,
751
+
) -> Result<uploadBlob::Output, ImageUploadError> {
752
+
let upload_input = uploadBlob::Input {
753
+
blob: image_data.to_vec(),
754
+
};
755
+
756
+
// Set content type header manually if needed
757
+
let mut request = atproto_client.api.com.atproto.repo.upload_blob(upload_input);
758
+
759
+
let blob_result = request.await
760
+
.map_err(|e| ImageUploadError::BlobUploadFailed(e.to_string()))?;
761
+
762
+
// Verify blob metadata
763
+
if blob_result.blob.mime_type != mime_type {
764
+
return Err(ImageUploadError::MimeTypeMismatch {
765
+
expected: mime_type.to_string(),
766
+
received: blob_result.blob.mime_type,
767
+
});
768
+
}
769
+
770
+
Ok(blob_result)
771
+
}
772
+
773
+
async fn create_image_record(
774
+
atproto_client: &AtpAgent<MemoryStore<(), Object<OutputData>>, DpopAwareHttpClient>,
775
+
image_record: CreateImageRecord,
776
+
) -> Result<createRecord::Output, ImageUploadError> {
777
+
let create_input = createRecord::Input {
778
+
repo: atproto_client.session.as_ref().unwrap().did.clone(),
779
+
collection: "community.lexicon.calendar.event.image".to_string(),
780
+
rkey: None, // Let PDS generate the record key
781
+
validate: Some(true),
782
+
record: serde_json::to_value(image_record)?,
783
+
swap_commit: None,
784
+
};
785
+
786
+
atproto_client.api.com.atproto.repo.create_record(create_input).await
787
+
.map_err(|e| ImageUploadError::RecordCreationFailed(e.to_string()))
788
+
}
789
+
```
790
+
791
+
### 4. AT Protocol Label Cache System (`/src/services/atproto_labeling/cache.rs`)
792
+
793
+
**Purpose**: Cache label definitions and applied labels according to AT Protocol patterns with proper TTL management.
794
+
795
+
**Cache Architecture Aligned with AT Protocol:**
796
+
```rust
797
+
pub struct ATProtoLabelCache {
798
+
redis_client: Redis,
799
+
local_cache: LRU<String, CachedLabels>,
800
+
labeler_definitions_cache: LRU<String, CachedLabelerDefinitions>,
801
+
subscription_cache: LRU<String, CachedSubscription>,
802
+
}
803
+
804
+
#[derive(Debug, Clone)]
805
+
pub struct CachedLabels {
806
+
pub at_uri: String,
807
+
pub labels: Vec<Label>,
808
+
pub cached_at: DateTime<Utc>,
809
+
pub ttl: Duration,
810
+
pub labeler_sources: Vec<String>, // Which labelers provided these labels
811
+
}
812
+
813
+
#[derive(Debug, Clone)]
814
+
pub struct CachedLabelerDefinitions {
815
+
pub labeler_did: String,
816
+
pub policies: LabelerPolicies,
817
+
pub label_definitions: Vec<LabelValueDefinition>,
818
+
pub cached_at: DateTime<Utc>,
819
+
pub ttl: Duration, // 6 hours as per AT Protocol recommendations
820
+
}
821
+
822
+
#[derive(Debug, Clone)]
823
+
pub struct CachedSubscription {
824
+
pub user_did: String,
825
+
pub subscribed_labelers: Vec<String>,
826
+
pub cached_at: DateTime<Utc>,
827
+
pub ttl: Duration,
828
+
}
829
+
```
830
+
831
+
**Cache Strategy (per AT Protocol recommendations):**
832
+
```rust
833
+
impl ATProtoLabelCache {
834
+
/// Get cached labels for an AT URI with proper TTL handling
835
+
pub async fn get_labels(&self, at_uri: &str) -> Option<Vec<Label>> {
836
+
// Try local cache first (fastest)
837
+
if let Some(cached) = self.local_cache.get(at_uri) {
838
+
if cached.cached_at + cached.ttl > Utc::now() {
839
+
return Some(cached.labels.clone());
840
+
}
841
+
}
842
+
843
+
// Try Redis cache (shared across instances)
844
+
if let Ok(Some(cached)) = self.redis_client.get::<CachedLabels>(at_uri).await {
845
+
if cached.cached_at + cached.ttl > Utc::now() {
846
+
// Refresh local cache
847
+
self.local_cache.put(at_uri.to_string(), cached.clone());
848
+
return Some(cached.labels);
849
+
}
850
+
}
851
+
852
+
None
853
+
}
854
+
855
+
/// Cache labels with appropriate TTL based on source
856
+
pub async fn cache_labels(
857
+
&self,
858
+
at_uri: &str,
859
+
labels: Vec<Label>,
860
+
labeler_sources: Vec<String>
861
+
) -> Result<()> {
862
+
// Determine TTL based on label types and sources
863
+
let ttl = self.determine_label_ttl(&labels, &labeler_sources);
864
+
865
+
let cached = CachedLabels {
866
+
at_uri: at_uri.to_string(),
867
+
labels: labels.clone(),
868
+
cached_at: Utc::now(),
869
+
ttl,
870
+
labeler_sources,
871
+
};
872
+
873
+
// Store in both caches
874
+
self.local_cache.put(at_uri.to_string(), cached.clone());
875
+
876
+
// Redis cache with expiration
877
+
let redis_key = format!("labels:{}", at_uri);
878
+
self.redis_client.set_ex(redis_key, cached, ttl.num_seconds() as u64).await?;
879
+
880
+
Ok(())
881
+
}
882
+
883
+
/// Cache labeler definitions with 6-hour TTL as recommended
884
+
pub async fn cache_labeler_definitions(
885
+
&self,
886
+
labeler_did: &str,
887
+
policies: LabelerPolicies,
888
+
definitions: Vec<LabelValueDefinition>
889
+
) -> Result<()> {
890
+
let cached = CachedLabelerDefinitions {
891
+
labeler_did: labeler_did.to_string(),
892
+
policies,
893
+
label_definitions: definitions,
894
+
cached_at: Utc::now(),
895
+
ttl: Duration::hours(6), // AT Protocol recommendation
896
+
};
897
+
898
+
// Store in local cache
899
+
self.labeler_definitions_cache.put(labeler_did.to_string(), cached.clone());
900
+
901
+
// Store in Redis with expiration
902
+
let redis_key = format!("labeler_def:{}", labeler_did);
903
+
self.redis_client.set_ex(redis_key, cached, 6 * 3600).await?; // 6 hours
904
+
905
+
Ok(())
906
+
}
907
+
908
+
/// Determine appropriate TTL based on label content and usage patterns
909
+
fn determine_label_ttl(&self, labels: &[Label], sources: &[String]) -> Duration {
910
+
// Global labels (!hide, !warn) should have shorter TTL for rapid updates
911
+
let has_global_labels = labels.iter().any(|l| l.val.starts_with('!'));
912
+
913
+
// High-severity labels should refresh more frequently
914
+
let has_high_severity = labels.iter().any(|l| {
915
+
matches!(l.val.as_str(),
916
+
global_labels::HIDE |
917
+
global_labels::PORN |
918
+
global_labels::GRAPHIC_MEDIA
919
+
)
920
+
});
921
+
922
+
match (has_global_labels, has_high_severity) {
923
+
(true, _) => Duration::hours(1), // Global labels - 1 hour (reduced from 15min)
924
+
(_, true) => Duration::hours(4), // High severity labels - 4 hours
925
+
_ => Duration::hours(12), // Standard labels - 12 hours (increased)
926
+
}
927
+
}
928
+
929
+
/// Get labeler definitions with cache fallback
930
+
pub async fn get_labeler_definitions(&self, labeler_did: &str) -> Option<CachedLabelerDefinitions> {
931
+
// Check local cache first
932
+
if let Some(cached) = self.labeler_definitions_cache.get(labeler_did) {
933
+
if cached.cached_at + cached.ttl > Utc::now() {
934
+
return Some(cached.clone());
935
+
}
936
+
}
937
+
938
+
// Check Redis cache
939
+
let redis_key = format!("labeler_def:{}", labeler_did);
940
+
if let Ok(Some(cached)) = self.redis_client.get::<CachedLabelerDefinitions>(&redis_key).await {
941
+
if cached.cached_at + cached.ttl > Utc::now() {
942
+
self.labeler_definitions_cache.put(labeler_did.to_string(), cached.clone());
943
+
return Some(cached);
944
+
}
945
+
}
946
+
947
+
None
948
+
}
949
+
950
+
/// Cache user's labeler subscriptions
951
+
pub async fn cache_user_subscriptions(
952
+
&self,
953
+
user_did: &str,
954
+
subscribed_labelers: Vec<String>
955
+
) -> Result<()> {
956
+
let cached = CachedSubscription {
957
+
user_did: user_did.to_string(),
958
+
subscribed_labelers,
959
+
cached_at: Utc::now(),
960
+
ttl: Duration::hours(1), // User prefs change less frequently
961
+
};
962
+
963
+
// Store in local cache
964
+
self.subscription_cache.put(user_did.to_string(), cached.clone());
965
+
966
+
// Store in Redis
967
+
let redis_key = format!("user_subs:{}", user_did);
968
+
self.redis_client.set_ex(redis_key, cached, 3600).await?; // 1 hour
969
+
970
+
Ok(())
971
+
}
972
+
973
+
/// Invalidate cache for specific AT URI (e.g., when content is updated)
974
+
pub async fn invalidate_labels(&self, at_uri: &str) -> Result<()> {
975
+
// Remove from local cache
976
+
self.local_cache.pop(at_uri);
977
+
978
+
// Remove from Redis
979
+
let redis_key = format!("labels:{}", at_uri);
980
+
self.redis_client.del(redis_key).await?;
981
+
982
+
Ok(())
983
+
}
984
+
}
985
+
```
986
+
987
+
## Database Schema Design
988
+
989
+
### AT Protocol Label Storage (Compliant with Current Specifications)
990
+
```sql
991
+
-- /migrations/20250618000001_atproto_labels.sql
992
+
CREATE TABLE atproto_labels (
993
+
id BIGSERIAL PRIMARY KEY,
994
+
995
+
-- Core AT Protocol Label fields (matching com.atproto.label.defs)
996
+
src VARCHAR(255) NOT NULL, -- DID of labeler who created this label
997
+
uri VARCHAR(500) NOT NULL, -- AT URI of labeled resource
998
+
cid VARCHAR(100), -- Optional CID for specific version
999
+
val VARCHAR(100) NOT NULL, -- Label value (e.g., "porn", "nudity")
1000
+
neg BOOLEAN DEFAULT FALSE, -- Negation flag
1001
+
cts TIMESTAMPTZ NOT NULL, -- Label creation timestamp
1002
+
1003
+
-- Local metadata for caching and performance
1004
+
cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1005
+
expires_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() + INTERVAL '6 hours'),
1006
+
labeler_priority INTEGER DEFAULT 0, -- For handling conflicting labels
1007
+
1008
+
-- Indexes for efficient querying
1009
+
CONSTRAINT unique_label UNIQUE(src, uri, cid, val),
1010
+
INDEX idx_atproto_labels_uri (uri),
1011
+
INDEX idx_atproto_labels_src (src),
1012
+
INDEX idx_atproto_labels_val (val),
1013
+
INDEX idx_atproto_labels_cts (cts),
1014
+
INDEX idx_atproto_labels_expires (expires_at)
1015
+
);
1016
+
1017
+
-- Event to AT URI mapping (updated for AT Protocol)
1018
+
CREATE TABLE event_image_records (
1019
+
id BIGSERIAL PRIMARY KEY,
1020
+
event_id BIGINT NOT NULL REFERENCES events(id) ON DELETE CASCADE,
1021
+
1022
+
-- AT Protocol record references
1023
+
at_uri VARCHAR(500) NOT NULL UNIQUE, -- AT URI of image record
1024
+
record_key VARCHAR(100) NOT NULL, -- Record key (rkey)
1025
+
collection VARCHAR(255) NOT NULL DEFAULT 'community.lexicon.calendar.event.image',
1026
+
1027
+
-- Blob reference for display
1028
+
blob_cid VARCHAR(100) NOT NULL, -- CID of the blob
1029
+
blob_mime_type VARCHAR(50) NOT NULL, -- MIME type from blob metadata
1030
+
1031
+
-- Image metadata
1032
+
alt_text TEXT,
1033
+
aspect_ratio_width INTEGER,
1034
+
aspect_ratio_height INTEGER,
1035
+
1036
+
-- Self-labels as JSON array
1037
+
self_labels JSONB DEFAULT '[]', -- [{"val": "nudity"}, {"val": "!no-unauthenticated"}]
1038
+
1039
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1040
+
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1041
+
1042
+
INDEX idx_event_image_records_event (event_id),
1043
+
INDEX idx_event_image_records_at_uri (at_uri),
1044
+
INDEX idx_event_image_records_collection (collection),
1045
+
INDEX idx_event_image_records_self_labels USING GIN (self_labels)
1046
+
);
1047
+
```
1048
+
1049
+
### Labeler Service Registry (For Discovery and Subscription)
1050
+
```sql
1051
+
-- /migrations/20250618000002_labeler_services.sql
1052
+
CREATE TABLE labeler_services (
1053
+
id BIGSERIAL PRIMARY KEY,
1054
+
1055
+
-- Labeler identification
1056
+
did VARCHAR(255) NOT NULL UNIQUE, -- Labeler's DID
1057
+
handle VARCHAR(255), -- Human-readable handle
1058
+
service_url VARCHAR(500), -- Labeler service endpoint
1059
+
1060
+
-- Service metadata
1061
+
display_name VARCHAR(200),
1062
+
description TEXT,
1063
+
terms_of_service_url VARCHAR(500),
1064
+
privacy_policy_url VARCHAR(500),
1065
+
1066
+
-- Supported labels and policies
1067
+
label_values JSONB NOT NULL DEFAULT '[]', -- Supported label values
1068
+
label_definitions JSONB NOT NULL DEFAULT '[]', -- LabelValueDefinition objects
1069
+
subject_types JSONB DEFAULT '["repo", "record"]', -- What can be labeled
1070
+
subject_collections JSONB DEFAULT '["app.bsky.feed.post", "community.lexicon.calendar.event.image"]', -- ["app.bsky.feed.post", "community.lexicon.calendar.event.image"]
1071
+
reason_types JSONB DEFAULT '["spam", "violation", "misleading", "sexual", "rude"]', -- ["spam", "violation", "misleading", "sexual", "rude"]
1072
+
1073
+
-- Service status
1074
+
is_active BOOLEAN DEFAULT TRUE,
1075
+
last_checked_at TIMESTAMPTZ,
1076
+
response_time_ms INTEGER,
1077
+
1078
+
-- Cache metadata
1079
+
policies_cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1080
+
policies_expires_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() + INTERVAL '6 hours'),
1081
+
1082
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1083
+
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1084
+
1085
+
INDEX idx_labeler_services_did (did),
1086
+
INDEX idx_labeler_services_active (is_active),
1087
+
INDEX idx_labeler_services_expires (policies_expires_at)
1088
+
);
1089
+
```
1090
+
1091
+
### User Moderation Preferences (Aligned with app.bsky.actor.defs)
1092
+
```sql
1093
+
-- /migrations/20250618000003_user_moderation_prefs.sql
1094
+
CREATE TABLE user_moderation_preferences (
1095
+
user_did VARCHAR(255) PRIMARY KEY,
1096
+
1097
+
-- Global moderation settings
1098
+
adult_content_enabled BOOLEAN DEFAULT FALSE,
1099
+
1100
+
-- Global label preferences (applies to all labelers unless overridden)
1101
+
global_label_prefs JSONB NOT NULL DEFAULT '{}', -- {label_val: "hide"|"warn"|"ignore"}
1102
+
1103
+
-- Labeler subscriptions with per-labeler preferences
1104
+
labeler_subscriptions JSONB NOT NULL DEFAULT '[]', -- [{did: "", labels: {label_val: setting}}]
1105
+
1106
+
-- Content filtering preferences
1107
+
muted_words JSONB DEFAULT '[]', -- Array of muted word patterns
1108
+
hidden_posts JSONB DEFAULT '[]', -- Array of hidden post AT URIs
1109
+
hidden_reposts JSONB DEFAULT '[]', -- Array of hidden repost AT URIs
1110
+
1111
+
-- Privacy settings
1112
+
hide_from_unauthenticated BOOLEAN DEFAULT FALSE,
1113
+
1114
+
-- Metadata
1115
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1116
+
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
1117
+
1118
+
INDEX idx_user_moderation_did (user_did),
1119
+
INDEX idx_user_moderation_updated (updated_at)
1120
+
);
1121
+
1122
+
-- Insert default preferences template
1123
+
INSERT INTO user_moderation_preferences (
1124
+
user_did,
1125
+
global_label_prefs,
1126
+
labeler_subscriptions
1127
+
) VALUES (
1128
+
'default_template',
1129
+
jsonb_build_object(
1130
+
'porn', 'hide',
1131
+
'sexual', 'warn',
1132
+
'graphic-media', 'warn',
1133
+
'nudity', 'ignore',
1134
+
'!hide', 'hide',
1135
+
'!warn', 'warn',
1136
+
'!no-unauthenticated', 'hide'
1137
+
),
1138
+
jsonb_build_array(
1139
+
jsonb_build_object(
1140
+
'did', 'did:plc:ar7c4by46qjdydhdevvrndac',
1141
+
'labels', jsonb_build_object(
1142
+
'porn', 'hide',
1143
+
'sexual', 'warn',
1144
+
'graphic-media', 'warn'
1145
+
)
1146
+
)
1147
+
)
1148
+
) ON CONFLICT (user_did) DO NOTHING;
1149
+
```
1150
+
1151
+
### Self-Labels Tracking for Events
1152
+
```sql
1153
+
-- /migrations/20250618000004_event_self_labels.sql
1154
+
-- Update events table to track self-labeling
1155
+
ALTER TABLE events ADD COLUMN image_self_labels JSONB DEFAULT '[]';
1156
+
ALTER TABLE events ADD COLUMN requires_login BOOLEAN DEFAULT FALSE; -- For !no-unauthenticated
1157
+
1158
+
-- Index for querying events by self-labels
1159
+
CREATE INDEX idx_events_self_labels ON events USING GIN (image_self_labels);
1160
+
CREATE INDEX idx_events_requires_login ON events (requires_login);
1161
+
1162
+
-- View for events with resolved labels (both self-labels and external labels)
1163
+
CREATE VIEW events_with_labels AS
1164
+
SELECT
1165
+
e.*,
1166
+
eir.at_uri as image_at_uri,
1167
+
eir.blob_cid as image_blob_cid,
1168
+
eir.self_labels as image_self_labels,
1169
+
COALESCE(
1170
+
jsonb_agg(
1171
+
jsonb_build_object(
1172
+
'src', al.src,
1173
+
'val', al.val,
1174
+
'neg', al.neg,
1175
+
'cts', al.cts
1176
+
)
1177
+
) FILTER (WHERE al.id IS NOT NULL),
1178
+
'[]'::jsonb
1179
+
) as external_labels
1180
+
FROM events e
1181
+
LEFT JOIN event_image_records eir ON e.id = eir.event_id
1182
+
LEFT JOIN atproto_labels al ON al.uri = eir.at_uri
1183
+
GROUP BY e.id, eir.at_uri, eir.blob_cid, eir.self_labels;
1184
+
```
1185
+
1186
+
## API Integration Points
1187
+
1188
+
### 1. AT Protocol Labeler Integration
1189
+
1190
+
**Current AT Protocol Label Structure (per `com.atproto.label.defs`):**
1191
+
```rust
1192
+
#[derive(Debug, Clone, Serialize, Deserialize)]
1193
+
pub struct Label {
1194
+
/// The DID of the actor who created this label
1195
+
pub src: String,
1196
+
/// AT URI of the record, repository (account), or other resource
1197
+
pub uri: String,
1198
+
/// Optionally, CID specifying the specific version of 'uri' resource
1199
+
pub cid: Option<String>,
1200
+
/// The short string name of the value or type of this label
1201
+
pub val: String,
1202
+
/// If true, this is a negation label, overwriting a previous label
1203
+
pub neg: Option<bool>,
1204
+
/// Timestamp when this label was created (ISO 8601)
1205
+
pub cts: String,
1206
+
}
1207
+
1208
+
#[derive(Debug, Clone, Serialize, Deserialize)]
1209
+
pub struct SelfLabels {
1210
+
pub values: Vec<SelfLabel>,
1211
+
}
1212
+
1213
+
#[derive(Debug, Clone, Serialize, Deserialize)]
1214
+
pub struct SelfLabel {
1215
+
pub val: String,
1216
+
}
1217
+
```
1218
+
1219
+
**Global Label Values (Current AT Protocol Standard):**
1220
+
- `!hide` - Content must be hidden, cannot be clicked through, filters from listings
1221
+
- `!warn` - Generic warning, can be clicked through
1222
+
- `!no-unauthenticated` - Content inaccessible to logged-out users
1223
+
- `porn` - Adult content warning, 18+ restricted
1224
+
- `sexual` - Sexual content, less intense than porn
1225
+
- `graphic-media` - Violence/gore content
1226
+
- `nudity` - Artistic nudity, not 18+ restricted
1227
+
1228
+
### 2. Labeler Subscription Using HTTP Headers
1229
+
1230
+
**Confirmed AT Protocol Integration Details**
1231
+
1232
+
**Labeler Header Support (Confirmed)**
1233
+
1234
+
Based on AT Protocol documentation, the `atproto-accept-labelers` header **is supported** with this syntax:
1235
+
```
1236
+
atproto-accept-labelers: did:web:mod.example.com;redact, did:plc:abc123, did:plc:xyz789
1237
+
```
1238
+
1239
+
**Implementation:**
1240
+
```rust
1241
+
// Confirmed working pattern from AT Protocol specs
1242
+
const LABELER_ACCEPT_HEADER: &str = "atproto-accept-labelers";
1243
+
1244
+
impl LabelerSubscriptionManager {
1245
+
/// Add labeler subscription headers with confirmed syntax
1246
+
pub fn add_labeler_headers(&self, request: &mut reqwest::Request, user_did: &str) {
1247
+
if let Some(subscriptions) = self.user_subscriptions.read().unwrap().get(user_did) {
1248
+
if !subscriptions.is_empty() {
1249
+
// Format: did1, did2, did3;flags
1250
+
let labeler_header = subscriptions.join(", ");
1251
+
request.headers_mut().insert(
1252
+
LABELER_ACCEPT_HEADER,
1253
+
HeaderValue::from_str(&labeler_header).unwrap()
1254
+
);
1255
+
}
1256
+
}
1257
+
}
1258
+
}
1259
+
```
1260
+
1261
+
### 3. Primary Labeler Service
1262
+
1263
+
**moderation.bsky.app Details:**
1264
+
- DID: `did:plc:ar7c4by46qjdydhdevvrndac` (Bluesky Moderation Service)
1265
+
- Primary labeler for initial implementation
1266
+
- Fallback strategy: Direct connection to moderation.bsky.app if other labelers fail
1267
+
- Supported labels: Standard AT Protocol global labels (`!hide`, `!warn`, `porn`, `sexual`, `nudity`, `graphic-media`)
1268
+
1269
+
### 4. Error Handling & Rollback Strategy
1270
+
1271
+
**Transaction Compensation Pattern (Research-Based):**
1272
+
Since AT Protocol doesn't have built-in transaction rollback, we implement compensation:
1273
+
1274
+
```rust
1275
+
pub struct ImageUploadTransaction {
1276
+
pub blob_ref: Option<BlobRef>,
1277
+
pub record_uri: Option<String>,
1278
+
pub postgres_id: Option<i64>,
1279
+
pub cache_keys: Vec<String>,
1280
+
}
1281
+
1282
+
impl ImageUploadTransaction {
1283
+
/// Rollback strategy for failed uploads
1284
+
pub async fn compensate(&self, atproto_client: &AtpAgent) -> Result<()> {
1285
+
// 1. Delete AT Protocol record if created
1286
+
if let Some(uri) = &self.record_uri {
1287
+
let _ = atproto_client.api.com.atproto.repo.delete_record(DeleteRecordInput {
1288
+
repo: atproto_client.session.as_ref().unwrap().did.clone(),
1289
+
collection: "community.lexicon.calendar.event.image".to_string(),
1290
+
rkey: extract_rkey_from_uri(uri)?,
1291
+
}).await; // Best effort - blobs remain in PDS temporarily
1292
+
}
1293
+
1294
+
// 2. Remove PostgreSQL entries
1295
+
if let Some(id) = self.postgres_id {
1296
+
sqlx::query!("DELETE FROM event_image_records WHERE id = $1", id)
1297
+
.execute(&self.db)
1298
+
.await?;
1299
+
}
1300
+
1301
+
// 3. Clear cache entries
1302
+
for key in &self.cache_keys {
1303
+
self.cache.invalidate(key).await?;
1304
+
}
1305
+
1306
+
Ok(())
1307
+
}
1308
+
}
1309
+
1310
+
/// Upload with compensation on failure
1311
+
pub async fn upload_with_compensation(
1312
+
upload_data: ImageUploadData,
1313
+
atproto_client: &AtpAgent,
1314
+
db: &PgPool,
1315
+
) -> Result<UploadResult, UploadError> {
1316
+
let mut transaction = ImageUploadTransaction::default();
1317
+
1318
+
// Step 1: Upload blob
1319
+
let blob_result = match upload_blob_to_pds(&upload_data, atproto_client).await {
1320
+
Ok(result) => {
1321
+
transaction.blob_ref = Some(result.blob.clone());
1322
+
result
1323
+
}
1324
+
Err(e) => return Err(UploadError::BlobUploadFailed(e)),
1325
+
};
1326
+
1327
+
// Step 2: Create AT Protocol record
1328
+
let record_result = match create_atproto_record(&blob_result, atproto_client).await {
1329
+
Ok(result) => {
1330
+
transaction.record_uri = Some(result.uri.clone());
1331
+
result
1332
+
}
1333
+
Err(e) => {
1334
+
transaction.compensate(atproto_client).await?;
1335
+
return Err(UploadError::RecordCreationFailed(e));
1336
+
}
1337
+
};
1338
+
1339
+
// Step 3: Store in PostgreSQL
1340
+
let postgres_id = match store_in_postgres(&record_result, db).await {
1341
+
Ok(id) => {
1342
+
transaction.postgres_id = Some(id);
1343
+
id
1344
+
}
1345
+
Err(e) => {
1346
+
transaction.compensate(atproto_client).await?;
1347
+
return Err(UploadError::DatabaseStorageFailed(e));
1348
+
}
1349
+
};
1350
+
1351
+
Ok(UploadResult {
1352
+
at_uri: record_result.uri,
1353
+
blob_cid: blob_result.blob.ref_link,
1354
+
postgres_id,
1355
+
})
1356
+
}
1357
+
```
1358
+
1359
+
### 4. Cache Strategy Research Results
1360
+
1361
+
**Optimized TTL Strategy:**
1362
+
Based on AT Protocol patterns and realistic usage:
1363
+
1364
+
```rust
1365
+
impl ATProtoLabelCache {
1366
+
/// Determine appropriate TTL based on label content and usage patterns
1367
+
fn determine_label_ttl(&self, labels: &[Label], sources: &[String]) -> Duration {
1368
+
// Global labels (!hide, !warn) should have shorter TTL for rapid updates
1369
+
let has_global_labels = labels.iter().any(|l| l.val.starts_with('!'));
1370
+
1371
+
// High-severity labels should refresh more frequently
1372
+
let has_high_severity = labels.iter().any(|l| {
1373
+
matches!(l.val.as_str(),
1374
+
global_labels::HIDE |
1375
+
global_labels::PORN |
1376
+
global_labels::GRAPHIC_MEDIA
1377
+
)
1378
+
});
1379
+
1380
+
match (has_global_labels, has_high_severity) {
1381
+
(true, _) => Duration::hours(1), // Global labels - 1 hour (reduced from 15min)
1382
+
(_, true) => Duration::hours(4), // High severity labels - 4 hours
1383
+
_ => Duration::hours(12), // Standard labels - 12 hours (increased)
1384
+
}
1385
+
}
1386
+
1387
+
/// Cache invalidation strategy for distributed instances
1388
+
pub async fn invalidate_distributed(&self, at_uri: &str) -> Result<()> {
1389
+
// Use Redis pub/sub for cache invalidation across instances
1390
+
let invalidation_message = CacheInvalidationMessage {
1391
+
at_uri: at_uri.to_string(),
1392
+
timestamp: Utc::now(),
1393
+
reason: "content_updated".to_string(),
1394
+
};
1395
+
1396
+
self.redis_client
1397
+
.publish("cache_invalidation", serde_json::to_string(&invalidation_message)?)
1398
+
.await?;
1399
+
1400
+
// Also invalidate local cache immediately
1401
+
self.local_cache.pop(at_uri);
1402
+
1403
+
Ok(())
1404
+
}
1405
+
}
1406
+
```
1407
+
## Outstanding Research & Implementation Questions
1408
+
1409
+
### 1. Critical Technical Decisions
1410
+
1411
+
**Question: Data Consistency Strategy**
1412
+
- **Status**: Requires investigation
1413
+
- **Decision**: Implement transaction compensation pattern with detailed logging
1414
+
- **Rollback Strategy**:
1415
+
1. Log all operations with unique transaction ID
1416
+
2. Implement compensation functions for each step
1417
+
3. Use circuit breaker pattern for external API failures
1418
+
4. Background cleanup process for orphaned resources
1419
+
1420
+
**Question: Cache TTL Optimization**
1421
+
- **Status**: Research needed
1422
+
- **Initial Strategy**: Conservative TTLs (1-12 hours) with invalidation on content changes
1423
+
- **Monitoring**: Track cache hit rates, API call frequency, user experience metrics
1424
+
- **Optimization**: Adjust TTLs based on actual usage patterns and performance data
1425
+
1426
+
**Question: Distributed Cache Invalidation**
1427
+
- **Status**: Requires implementation
1428
+
- **Solution**: Redis pub/sub with fallback to polling
1429
+
- **Pattern**: Event-driven invalidation with eventual consistency model
1430
+
1431
+
### 2. Performance Baseline Research
1432
+
1433
+
**Realistic Response Time Targets (Based on Workflow Complexity):**
1434
+
1435
+
1. **Blob Upload to PDS**: 1-3 seconds
1436
+
- Network latency to user's PDS
1437
+
- Image metadata stripping: ~100ms
1438
+
- File size optimization: ~200ms
1439
+
1440
+
2. **AT Protocol Record Creation**: 500ms-1s
1441
+
- Authentication token validation
1442
+
- Record validation and creation
1443
+
- PDS persistence
1444
+
1445
+
3. **Label Fetching from moderation.bsky.app**: 500ms-2s
1446
+
- API call with proper headers
1447
+
- Label processing and validation
1448
+
- Cache storage
1449
+
1450
+
4. **PostgreSQL Operations**: 50-200ms
1451
+
- Event-image relationship storage
1452
+
- Index updates and queries
1453
+
1454
+
**Total Realistic Target: 2-5 seconds** for complete upload workflow
1455
+
1456
+
### 3. Small Market Considerations (10M Population)
1457
+
1458
+
**Load Estimates:**
1459
+
- Concurrent users: ~1,000-5,000 peak
1460
+
- Image uploads: ~50-200 per minute peak
1461
+
- Cache storage: ~10GB for image labels
1462
+
- Database growth: ~1M records per year
1463
+
1464
+
**Infrastructure Scaling:**
1465
+
- Single Redis instance sufficient initially
1466
+
- PostgreSQL connection pooling (20-50 connections)
1467
+
- Rate limiting well within moderation.bsky.app capacity
1468
+
- ServeProxy CDN rate limits adequate for market size
1469
+
1470
+
### 4. RFC Process Timeline
1471
+
1472
+
**Lexicon Submission Strategy:**
1473
+
1. **Week 1**: Draft initial lexicon specification
1474
+
2. **Week 2**: Community feedback period
1475
+
3. **Week 3-4**: Iteration based on feedback
1476
+
4. **Week 5**: Formal submission to AT Protocol working group
1477
+
5. **Week 6-8**: Implementation while awaiting approval
1478
+
1479
+
**Fallback Plan**: Use existing image embedding patterns if RFC delayed
1480
+
1481
+
### 5. Feature Flag Implementation
1482
+
1483
+
**Rollout Control:**
1484
+
```rust
1485
+
#[derive(Debug, Clone)]
1486
+
pub struct ATProtoImageFeatureFlags {
1487
+
pub enabled_for_admins: bool,
1488
+
pub enabled_for_percentage: f32, // 0.0 to 1.0
1489
+
pub labeling_enabled: bool,
1490
+
pub cdn_integration_enabled: bool,
1491
+
pub self_labeling_enabled: bool,
1492
+
}
1493
+
1494
+
impl ATProtoImageFeatureFlags {
1495
+
pub fn is_enabled_for_user(&self, user_id: i64, is_admin: bool) -> bool {
1496
+
if is_admin && self.enabled_for_admins {
1497
+
return true;
1498
+
}
1499
+
1500
+
// Deterministic percentage rollout based on user ID
1501
+
let hash = user_id as f32 / i64::MAX as f32;
1502
+
hash <= self.enabled_for_percentage
1503
+
}
1504
+
}
1505
+
```
1506
+
1507
+
### 6. Monitoring & Alerting Strategy
1508
+
1509
+
**Key Metrics to Track:**
1510
+
- Upload success rate (target: >95%)
1511
+
- Average upload time (target: <5 seconds)
1512
+
- Label accuracy (manual review sample)
1513
+
- Cache hit rate (target: >80%)
1514
+
- Error rate by component
1515
+
- User satisfaction scores
1516
+
1517
+
**Alert Thresholds:**
1518
+
- Critical: Upload failure rate >10% for 5 minutes
1519
+
- Warning: Average upload time >7 seconds for 10 minutes
1520
+
- Info: Cache hit rate <70% for 1 hour
1521
+
1522
+
### 7. Business Continuity Planning
1523
+
1524
+
**Fallback Strategies:**
1525
+
1. **AT Protocol Unavailable**: Temporary local image storage with batch sync
1526
+
2. **moderation.bsky.app Down**: Basic keyword filtering + admin review queue
1527
+
3. **ServeProxy CDN Issues**: Direct blob serving from PDS
1528
+
4. **PostgreSQL Issues**: Read-only mode with cached data
1529
+
1530
+
**Data Recovery:**
1531
+
- Daily PostgreSQL backups
1532
+
- AT Protocol records serve as source of truth
1533
+
- Image regeneration from blobs if cache corrupted