Rust AppView - highly experimental!

feat: denormalize strategy; labelers

+362 -254
+49 -30
consumer/src/db/labels.rs
··· 17 17 return Ok(0); // Actor doesn't exist yet 18 18 }; 19 19 20 - // drop any label defs not currently in the list 21 - let _ = conn 22 - .execute( 23 - "DELETE FROM labeler_defs WHERE labeler_actor_id=$1 AND NOT label_identifier = any($2)", 24 - &[&labeler_actor_id, &rec.policies.label_values], 25 - ) 26 - .await?; 27 - 20 + // Build labeler_defs array from label values and definitions 21 + // Maps label_identifier -> definition 28 22 let definitions = rec 29 23 .policies 30 24 .label_value_definitions ··· 32 26 .map(|def| (def.identifier.clone(), def)) 33 27 .collect::<HashMap<String, &LabelValueDefinition>>(); 34 28 29 + // Build arrays of values for each composite field 30 + let mut label_identifiers = Vec::new(); 31 + let mut severities = Vec::new(); 32 + let mut blurs_vals = Vec::new(); 33 + let mut default_settings = Vec::new(); 34 + let mut adult_onlys = Vec::new(); 35 + let mut locales_vals = Vec::new(); 36 + 35 37 for label in &rec.policies.label_values { 36 38 let definition = definitions.get(label); 37 39 38 - let severity = definition.map(|v| v.severity.to_string()); 39 - let blurs = definition.map(|v| v.blurs.to_string()); 40 - let default_setting = definition 41 - .and_then(|v| v.default_setting) 42 - .map(|v| v.to_string()); 43 - let adult_only = definition.and_then(|v| v.adult_only).unwrap_or_default(); 44 - let locales = definition.and_then(|v| serde_json::to_value(&v.locales).ok()); 40 + label_identifiers.push(label.clone()); 41 + severities.push(definition.map(|v| v.severity.to_string())); 42 + blurs_vals.push(definition.map(|v| v.blurs.to_string())); 43 + default_settings.push(definition.and_then(|v| v.default_setting).map(|v| v.to_string())); 44 + adult_onlys.push(definition.and_then(|v| v.adult_only).unwrap_or_default()); 45 + locales_vals.push(definition.and_then(|v| serde_json::to_value(&v.locales).ok())); 46 + } 45 47 46 - let _ = conn 47 - .execute( 48 - include_str!("sql/label_defs_upsert.sql"), 49 - &[ 50 - &labeler_actor_id, 51 - &label, 52 - &severity, 53 - &blurs, 54 - &default_setting, 55 - &adult_only, 56 - &locales, 57 - ], 58 - ) 59 - .await?; 60 - } 48 + // Update labeler_defs array on actors table using composite type constructor 49 + // ROW(...) constructs the composite type, ARRAY[...] builds the array 50 + conn.execute( 51 + "UPDATE actors 52 + SET labeler_defs = ( 53 + SELECT ARRAY_AGG( 54 + ROW( 55 + label_identifier, 56 + severity::text::label_severity, 57 + blurs::text::label_blurs, 58 + default_setting::text::label_default_setting, 59 + adult_only, 60 + locales, 61 + NOW() 62 + )::labeler_def_record 63 + ORDER BY idx 64 + ) 65 + FROM unnest($2::text[], $3::text[], $4::text[], $5::text[], $6::boolean[], $7::jsonb[]) 66 + WITH ORDINALITY AS t(label_identifier, severity, blurs, default_setting, adult_only, locales, idx) 67 + ) 68 + WHERE id = $1", 69 + &[ 70 + &labeler_actor_id, 71 + &label_identifiers, 72 + &severities, 73 + &blurs_vals, 74 + &default_settings, 75 + &adult_onlys, 76 + &locales_vals, 77 + ], 78 + ) 79 + .await?; 61 80 62 81 Ok(0) 63 82 }
+29 -21
consumer/src/db/operations/labeler.rs
··· 8 8 /// 9 9 /// This function: 10 10 /// 1. Gets/creates actor_id for the DID 11 - /// 2. Inserts stub labeler if not found (status='stub') 11 + /// 2. Sets labeler_cid and labeler_status='stub' on actors table if not already set 12 12 /// 3. Returns actor_id 13 13 /// 14 14 /// Uses advisory locks to prevent concurrent transactions from racing on the same labeler URI. 15 15 /// 16 - /// Labelers have a 1:1 relationship with actors (actor_id is the PK), 17 - /// so the actor_id serves as both the labeler identifier and return value. 16 + /// Labelers are now denormalized into the actors table with labeler_* columns. 18 17 pub async fn ensure_labeler_stub<C: GenericClient>( 19 18 conn: &C, 20 19 did: &str, ··· 36 35 // Get/create actor_id (discard allowlist status and was_created, not needed for labelers) 37 36 let actor_id = crate::db::actor::ensure_actor_id(conn, did, None, None, chrono::Utc::now()).await?; 38 37 39 - // Use CTE to SELECT first, then conditionally INSERT only if not found 40 - // This prevents unnecessary sequence consumption when labeler stub already exists 41 - // Still uses ON CONFLICT for race condition safety between concurrent transactions 38 + // Set labeler_cid and labeler_status on actors table if not already set 39 + // Only update if labeler_cid is NULL (stub not yet created) 42 40 conn.execute( 43 - "WITH existing AS ( 44 - SELECT actor_id FROM labelers WHERE actor_id = $1 45 - ) 46 - INSERT INTO labelers (actor_id, cid, created_at, status) 47 - SELECT $1, $2, NOW(), 'stub'::labeler_status 48 - WHERE NOT EXISTS (SELECT 1 FROM existing) 49 - ON CONFLICT (actor_id) DO NOTHING", 41 + "UPDATE actors 42 + SET labeler_cid = $2, 43 + labeler_created_at = NOW(), 44 + labeler_status = 'stub'::labeler_status, 45 + labeler_like_count = 0 46 + WHERE id = $1 AND labeler_cid IS NULL", 50 47 &[&actor_id, &cid_digest], 51 48 ) 52 49 .await?; ··· 108 105 109 106 pub async fn labeler_delete<C: GenericClient>(conn: &C, actor_id: i32) -> Result<u64> { 110 107 // Labeler records always use rkey "self", so no rkey parameter needed 108 + // Now sets labeler_* columns to NULL instead of deleting from separate table 111 109 112 110 conn.execute( 113 - "DELETE FROM labelers 114 - WHERE actor_id = $1", 111 + "UPDATE actors 112 + SET labeler_cid = NULL, 113 + labeler_created_at = NULL, 114 + labeler_reasons = NULL, 115 + labeler_subject_types = NULL, 116 + labeler_subject_collections = NULL, 117 + labeler_status = NULL, 118 + labeler_like_count = NULL, 119 + labeler_defs = NULL 120 + WHERE id = $1", 115 121 &[&actor_id], 116 122 ) 117 123 .await ··· 122 128 /// 123 129 /// This is called after bulk inserting labeler_likes to update the aggregate counts. 124 130 /// Uses a single UPDATE statement with aggregation for efficiency. 131 + /// Now updates actors.labeler_like_count instead of labelers.like_count 125 132 pub async fn increment_labeler_like_counts<C: GenericClient>( 126 133 conn: &C, 127 134 labeler_actor_ids: &[i32], ··· 131 138 } 132 139 133 140 conn.execute( 134 - "UPDATE labelers 135 - SET like_count = like_count + counts.count 141 + "UPDATE actors 142 + SET labeler_like_count = COALESCE(labeler_like_count, 0) + counts.count 136 143 FROM ( 137 144 SELECT actor_id, COUNT(*) as count 138 145 FROM unnest($1::int[]) as actor_id 139 146 GROUP BY actor_id 140 147 ) AS counts 141 - WHERE labelers.actor_id = counts.actor_id", 148 + WHERE actors.id = counts.actor_id", 142 149 &[&labeler_actor_ids], 143 150 ) 144 151 .await ··· 148 155 /// Decrement like_count for a single labeler 149 156 /// 150 157 /// This is called when deleting a labeler_like record. 158 + /// Now updates actors.labeler_like_count instead of labelers.like_count 151 159 pub async fn decrement_labeler_like_count<C: GenericClient>( 152 160 conn: &C, 153 161 labeler_actor_id: i32, 154 162 ) -> Result<u64> { 155 163 conn.execute( 156 - "UPDATE labelers 157 - SET like_count = GREATEST(like_count - 1, 0) 158 - WHERE actor_id = $1", 164 + "UPDATE actors 165 + SET labeler_like_count = GREATEST(COALESCE(labeler_like_count, 0) - 1, 0) 166 + WHERE id = $1", 159 167 &[&labeler_actor_id], 160 168 ) 161 169 .await
+12 -14
consumer/src/db/sql/label_service_upsert.sql
··· 1 - -- Insert/update labeler service with self-contained schema (no records table) 1 + -- Insert/update labeler service on actors table (denormalized) 2 2 -- Parameters: $1=actor_id, $2=cid(bytea), $3=reasons, $4=subject_types, $5=subject_collections 3 3 -- NOTE: actor_id is provided by dispatcher after ensuring actor exists 4 - INSERT INTO labelers (actor_id, cid, reasons, subject_types, subject_collections) 5 - SELECT 6 - $1, -- actor_id (provided by dispatcher) 7 - $2::bytea, -- cid (embedded) 8 - $3::text[]::reason_type[], 9 - $4::text[]::subject_type[], 10 - $5 11 - ON CONFLICT (actor_id) DO UPDATE SET 12 - cid=EXCLUDED.cid, 13 - reasons=EXCLUDED.reasons, 14 - subject_types=EXCLUDED.subject_types, 15 - subject_collections=EXCLUDED.subject_collections, 16 - status='complete'::labeler_status 4 + -- Sets labeler_* columns on actors table instead of separate labelers table 5 + UPDATE actors 6 + SET 7 + labeler_cid = $2::bytea, 8 + labeler_created_at = COALESCE(labeler_created_at, NOW()), 9 + labeler_reasons = $3::text[]::reason_type[], 10 + labeler_subject_types = $4::text[]::subject_type[], 11 + labeler_subject_collections = $5, 12 + labeler_status = 'complete'::labeler_status, 13 + labeler_like_count = COALESCE(labeler_like_count, 0) 14 + WHERE id = $1
+70
migrations/2025-12-08-004705_denormalize_labelers/down.sql
··· 1 + -- Revert Phase 2: Restore labelers tables from actors 2 + 3 + -- Recreate labelers table 4 + CREATE TABLE labelers ( 5 + actor_id INTEGER PRIMARY KEY, 6 + cid BYTEA NOT NULL, 7 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 8 + reasons reason_type[], 9 + subject_types subject_type[], 10 + subject_collections TEXT[], 11 + status labeler_status NOT NULL DEFAULT 'complete', 12 + like_count INTEGER NOT NULL DEFAULT 0 13 + ); 14 + 15 + CREATE INDEX idx_labelers_like_count_desc ON labelers(like_count DESC); 16 + 17 + -- Recreate labeler_defs table 18 + CREATE TABLE labeler_defs ( 19 + labeler_actor_id INTEGER NOT NULL, 20 + label_identifier TEXT NOT NULL, 21 + severity label_severity, 22 + blurs label_blurs, 23 + default_setting label_default_setting, 24 + adult_only BOOLEAN NOT NULL DEFAULT FALSE, 25 + locales JSONB, 26 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 27 + PRIMARY KEY (labeler_actor_id, label_identifier) 28 + ); 29 + 30 + -- Recreate labeler_likes table 31 + CREATE TABLE labeler_likes ( 32 + actor_id INTEGER NOT NULL, 33 + rkey BIGINT NOT NULL, 34 + labeler_actor_id INTEGER NOT NULL, 35 + PRIMARY KEY (actor_id, rkey) 36 + ); 37 + 38 + CREATE INDEX idx_labeler_likes_labeler ON labeler_likes(labeler_actor_id); 39 + CREATE INDEX idx_labeler_likes_rkey ON labeler_likes(rkey); 40 + 41 + -- Restore data from actors table 42 + INSERT INTO labelers (actor_id, cid, created_at, reasons, subject_types, subject_collections, status, like_count) 43 + SELECT id, labeler_cid, labeler_created_at, labeler_reasons, labeler_subject_types, labeler_subject_collections, labeler_status, labeler_like_count 44 + FROM actors 45 + WHERE labeler_cid IS NOT NULL; 46 + 47 + -- Restore labeler_defs from array 48 + INSERT INTO labeler_defs (labeler_actor_id, label_identifier, severity, blurs, default_setting, adult_only, locales, created_at) 49 + SELECT 50 + a.id, 51 + (def).label_identifier, 52 + (def).severity, 53 + (def).blurs, 54 + (def).default_setting, 55 + (def).adult_only, 56 + (def).locales, 57 + (def).created_at 58 + FROM actors a, unnest(a.labeler_defs) AS def 59 + WHERE a.labeler_defs IS NOT NULL; 60 + 61 + -- Drop labeler columns from actors 62 + ALTER TABLE actors 63 + DROP COLUMN labeler_defs, 64 + DROP COLUMN labeler_like_count, 65 + DROP COLUMN labeler_status, 66 + DROP COLUMN labeler_subject_collections, 67 + DROP COLUMN labeler_subject_types, 68 + DROP COLUMN labeler_reasons, 69 + DROP COLUMN labeler_created_at, 70 + DROP COLUMN labeler_cid;
+53
migrations/2025-12-08-004705_denormalize_labelers/up.sql
··· 1 + -- Phase 2: Denormalize labelers into actors table 2 + -- Move labeler data from separate tables into actors table with composite type arrays 3 + 4 + -- Add labeler columns to actors (flatten 1:1 relationship) 5 + ALTER TABLE actors 6 + ADD COLUMN labeler_cid BYTEA, 7 + ADD COLUMN labeler_created_at TIMESTAMPTZ, 8 + ADD COLUMN labeler_reasons reason_type[], 9 + ADD COLUMN labeler_subject_types subject_type[], 10 + ADD COLUMN labeler_subject_collections TEXT[], 11 + ADD COLUMN labeler_status labeler_status, 12 + ADD COLUMN labeler_like_count INTEGER DEFAULT 0; 13 + 14 + -- Add labeler_defs array (composite type) 15 + ALTER TABLE actors 16 + ADD COLUMN labeler_defs labeler_def_record[]; 17 + 18 + -- Backfill labeler data from labelers table 19 + UPDATE actors a 20 + SET 21 + labeler_cid = l.cid, 22 + labeler_created_at = l.created_at, 23 + labeler_reasons = l.reasons, 24 + labeler_subject_types = l.subject_types, 25 + labeler_subject_collections = l.subject_collections, 26 + labeler_status = l.status, 27 + labeler_like_count = l.like_count 28 + FROM labelers l 29 + WHERE a.id = l.actor_id; 30 + 31 + -- Backfill labeler_defs array (using composite type) 32 + UPDATE actors a 33 + SET labeler_defs = ( 34 + SELECT ARRAY_AGG( 35 + ROW( 36 + ld.label_identifier, 37 + ld.severity, 38 + ld.blurs, 39 + ld.default_setting, 40 + ld.adult_only, 41 + ld.locales, 42 + ld.created_at 43 + )::labeler_def_record 44 + ORDER BY ld.created_at 45 + ) 46 + FROM labeler_defs ld 47 + WHERE ld.labeler_actor_id = a.id 48 + ); 49 + 50 + -- Drop old tables (labeler_likes will be handled later if needed for arrays) 51 + DROP TABLE labeler_defs; 52 + DROP TABLE labeler_likes; 53 + DROP TABLE labelers;
+50 -2
parakeet-db/src/composite_types.rs
··· 25 25 26 26 use crate::schema::sql_types::{ 27 27 PostExtEmbed, PostVideoEmbed, PostImageEmbed, PostFacetEmbed, PostVideoCaption, 28 - FollowRecord, MuteRecord, BlockRecord, BookmarkRecord, ThreadMuteRecord, 29 - ListMuteRecord, ListBlockRecord, LabelerDefRecord, PostLabel, ActorLabel, 28 + LabelerDefRecord, 29 + // Note: Other composite types will be added when used in Phase 3-7: 30 + // FollowRecord, MuteRecord, BlockRecord, BookmarkRecord, ThreadMuteRecord, 31 + // ListMuteRecord, ListBlockRecord, PostLabel, ActorLabel, 30 32 }; 33 + 34 + // Placeholder SQL types for composite types not yet used in tables 35 + // These will be auto-generated by diesel once the columns are added 36 + #[allow(dead_code)] 37 + mod placeholder_sql_types { 38 + use diesel::query_builder::QueryId; 39 + use diesel::sql_types::SqlType; 40 + 41 + #[derive(QueryId, SqlType)] 42 + #[diesel(postgres_type(name = "follow_record"))] 43 + pub struct FollowRecord; 44 + 45 + #[derive(QueryId, SqlType)] 46 + #[diesel(postgres_type(name = "mute_record"))] 47 + pub struct MuteRecord; 48 + 49 + #[derive(QueryId, SqlType)] 50 + #[diesel(postgres_type(name = "block_record"))] 51 + pub struct BlockRecord; 52 + 53 + #[derive(QueryId, SqlType)] 54 + #[diesel(postgres_type(name = "bookmark_record"))] 55 + pub struct BookmarkRecord; 56 + 57 + #[derive(QueryId, SqlType)] 58 + #[diesel(postgres_type(name = "thread_mute_record"))] 59 + pub struct ThreadMuteRecord; 60 + 61 + #[derive(QueryId, SqlType)] 62 + #[diesel(postgres_type(name = "list_mute_record"))] 63 + pub struct ListMuteRecord; 64 + 65 + #[derive(QueryId, SqlType)] 66 + #[diesel(postgres_type(name = "list_block_record"))] 67 + pub struct ListBlockRecord; 68 + 69 + #[derive(QueryId, SqlType)] 70 + #[diesel(postgres_type(name = "post_label"))] 71 + pub struct PostLabel; 72 + 73 + #[derive(QueryId, SqlType)] 74 + #[diesel(postgres_type(name = "actor_label"))] 75 + pub struct ActorLabel; 76 + } 77 + 78 + use placeholder_sql_types::*; 31 79 use crate::types::{ 32 80 ImageMimeType, VideoMimeType, FacetType, CaptionMimeType, LanguageCode, 33 81 LabelSeverity, LabelBlurs, LabelDefaultSetting,
+18 -42
parakeet-db/src/models.rs
··· 34 34 // 35 35 // ============================================================================= 36 36 37 + use crate::composite_types::LabelerDef; 37 38 use crate::tid_util::{decode_tid, encode_tid, TidError}; 38 39 use crate::types::*; 39 40 use chrono::prelude::*; ··· 151 152 pub lists_count: Option<i16>, 152 153 pub feeds_count: Option<i16>, 153 154 pub starterpacks_count: Option<i16>, 155 + // Labeler fields (from labelers table, denormalized - NULL for non-labelers) 156 + pub labeler_cid: Option<Vec<u8>>, 157 + pub labeler_created_at: Option<DateTime<Utc>>, 158 + pub labeler_reasons: Option<Vec<Option<ReasonType>>>, 159 + pub labeler_subject_types: Option<Vec<Option<SubjectType>>>, 160 + pub labeler_subject_collections: Option<Vec<Option<String>>>, 161 + pub labeler_status: Option<LabelerStatus>, 162 + pub labeler_like_count: Option<i32>, 163 + pub labeler_defs: Option<Vec<Option<LabelerDef>>>, 154 164 } 155 165 156 166 // AllowlistEntry model removed - allowlist table dropped in favor of actors.sync_state ··· 414 424 // Note: created_at derived from TID rkey via created_at() method 415 425 } 416 426 417 - // Labeler Likes (rare, ~0.01% of likes) 418 - #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 419 - #[diesel(table_name = crate::schema::labeler_likes)] 420 - #[diesel(primary_key(actor_id, rkey))] 421 - #[diesel(check_for_backend(diesel::pg::Pg))] 422 - pub struct LabelerLike { 423 - pub actor_id: i32, // PK: FK to actors (liker) 424 - pub rkey: i64, // PK: TID as INT8 425 - pub labeler_actor_id: i32, // FK to actors (labeler service) 426 - // Note: created_at derived from TID rkey via created_at() method 427 - } 427 + // Note: Labeler Likes table dropped - labeler data moved to actors table 428 + // The labeler_like_count is maintained on actors.labeler_like_count 429 + // Individual like records (labeler_likes table) were dropped for simplicity 428 430 429 431 #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 430 432 #[diesel(table_name = crate::schema::reposts)] ··· 643 645 // MODERATION & LABELS 644 646 // ============================================================================= 645 647 646 - #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 647 - #[diesel(table_name = crate::schema::labelers)] 648 - #[diesel(primary_key(actor_id))] 649 - #[diesel(check_for_backend(diesel::pg::Pg))] 650 - pub struct Labeler { 651 - pub actor_id: i32, // PK: FK to actors (no rkey, always 'self') 652 - pub cid: Vec<u8>, // 32-byte CID digest 653 - pub created_at: DateTime<Utc>, // From AT Protocol record 654 - pub reasons: Option<array_helpers::ReasonTypeArray>, // ENUM array: spam | violation | misleading | etc. 655 - pub subject_types: Option<array_helpers::SubjectTypeArray>, // ENUM array: account | record | chat 656 - pub subject_collections: Option<array_helpers::TextArray>, // Collection names (as TEXT since generic) 657 - pub status: LabelerStatus, // ENUM: complete | stub | deleted 658 - pub like_count: i32, // Aggregated count of likes (maintained by database_writer) 659 - } 660 - 661 - #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable)] 662 - #[diesel(table_name = crate::schema::labeler_defs)] 663 - #[diesel(primary_key(labeler_actor_id, label_identifier))] 664 - #[diesel(check_for_backend(diesel::pg::Pg))] 665 - pub struct LabelerDef { 666 - pub labeler_actor_id: i32, // PK: FK to actors 667 - pub label_identifier: String, // PK: Label name 668 - pub severity: Option<LabelSeverity>, // ENUM: inform | alert | none 669 - pub blurs: Option<LabelBlurs>, // ENUM: content | media | none 670 - pub default_setting: Option<LabelDefaultSetting>, // ENUM: ignore | warn | hide 671 - pub adult_only: bool, 672 - pub locales: Option<serde_json::Value>, 673 - pub created_at: DateTime<Utc>, 674 - } 648 + // Note: Labeler and LabelerDef structs removed 649 + // Labeler data is now stored directly on actors table with labeler_* columns 650 + // LabelerDef is now a composite type in composite_types.rs, stored as labeler_defs array on actors 675 651 676 652 #[derive(Clone, Debug)] 677 653 pub struct Label { ··· 825 801 826 802 impl_tid_rkey!(Post); 827 803 impl_tid_rkey!(FeedgenLike); 828 - impl_tid_rkey!(LabelerLike); 804 + // impl_tid_rkey!(LabelerLike); // Removed - labeler_likes table dropped 829 805 impl_tid_rkey!(Repost); 830 806 impl_tid_rkey!(Follow); 831 807 impl_tid_rkey!(Block); ··· 844 820 845 821 impl_tid_created_at!(Post); 846 822 impl_tid_created_at!(FeedgenLike); 847 - impl_tid_created_at!(LabelerLike); 823 + // impl_tid_created_at!(LabelerLike); // Removed - labeler_likes table dropped 848 824 impl_tid_created_at!(Repost); 849 825 impl_tid_created_at!(Follow); 850 826 impl_tid_created_at!(Block);
+28 -51
parakeet-db/src/schema.rs
··· 50 50 pub struct LabelSeverity; 51 51 52 52 #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)] 53 + #[diesel(postgres_type(name = "labeler_def_record"))] 54 + pub struct LabelerDefRecord; 55 + 56 + #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)] 53 57 #[diesel(postgres_type(name = "labeler_status"))] 54 58 pub struct LabelerStatus; 55 59 ··· 131 135 pub struct ListBlockRecord; 132 136 133 137 #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)] 134 - #[diesel(postgres_type(name = "labeler_def_record"))] 135 - pub struct LabelerDefRecord; 136 - 137 - #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)] 138 138 #[diesel(postgres_type(name = "post_label"))] 139 139 pub struct PostLabel; 140 140 ··· 204 204 use super::sql_types::ImageMimeType; 205 205 use super::sql_types::ChatAllowIncoming; 206 206 use super::sql_types::NotifAllowSubscriptions; 207 + use super::sql_types::ReasonType; 208 + use super::sql_types::SubjectType; 209 + use super::sql_types::LabelerStatus; 210 + use super::sql_types::LabelerDefRecord; 207 211 208 212 actors (id) { 209 213 id -> Int4, ··· 246 250 lists_count -> Nullable<Int2>, 247 251 feeds_count -> Nullable<Int2>, 248 252 starterpacks_count -> Nullable<Int2>, 253 + labeler_cid -> Nullable<Bytea>, 254 + labeler_created_at -> Nullable<Timestamptz>, 255 + labeler_reasons -> Nullable<Array<Nullable<ReasonType>>>, 256 + labeler_subject_types -> Nullable<Array<Nullable<SubjectType>>>, 257 + labeler_subject_collections -> Nullable<Array<Nullable<Text>>>, 258 + labeler_status -> Nullable<LabelerStatus>, 259 + labeler_like_count -> Nullable<Int4>, 260 + labeler_defs -> Nullable<Array<Nullable<LabelerDefRecord>>>, 249 261 } 250 262 } 251 263 ··· 368 380 } 369 381 370 382 diesel::table! { 371 - use diesel::sql_types::*; 372 - use super::sql_types::LabelSeverity; 373 - use super::sql_types::LabelBlurs; 374 - use super::sql_types::LabelDefaultSetting; 375 - 376 - labeler_defs (labeler_actor_id, label_identifier) { 377 - labeler_actor_id -> Int4, 378 - label_identifier -> Text, 379 - severity -> Nullable<LabelSeverity>, 380 - blurs -> Nullable<LabelBlurs>, 381 - default_setting -> Nullable<LabelDefaultSetting>, 382 - adult_only -> Bool, 383 - locales -> Nullable<Jsonb>, 384 - created_at -> Timestamptz, 385 - } 386 - } 387 - 388 - diesel::table! { 389 - labeler_likes (actor_id, rkey) { 390 - actor_id -> Int4, 391 - rkey -> Int8, 392 - labeler_actor_id -> Int4, 393 - } 394 - } 395 - 396 - diesel::table! { 397 - use diesel::sql_types::*; 398 - use super::sql_types::ReasonType; 399 - use super::sql_types::SubjectType; 400 - use super::sql_types::LabelerStatus; 401 - 402 - labelers (actor_id) { 403 - actor_id -> Int4, 404 - cid -> Bytea, 405 - created_at -> Timestamptz, 406 - reasons -> Nullable<Array<Nullable<ReasonType>>>, 407 - subject_types -> Nullable<Array<Nullable<SubjectType>>>, 408 - subject_collections -> Nullable<Array<Nullable<Text>>>, 409 - status -> LabelerStatus, 410 - like_count -> Int4, 411 - } 412 - } 413 - 414 - diesel::table! { 415 383 labels (labeler_actor_id, label, uri) { 416 384 labeler_actor_id -> Int4, 417 385 label -> Text, ··· 511 479 stat_type -> PostStatType, 512 480 value -> Int4, 513 481 updated_at -> Timestamptz, 482 + } 483 + } 484 + 485 + diesel::table! { 486 + post_likes (actor_id, rkey) { 487 + actor_id -> Int4, 488 + rkey -> Int8, 489 + post_actor_id -> Int4, 490 + post_rkey -> Int8, 491 + via_repost_actor_id -> Nullable<Int4>, 492 + via_repost_rkey -> Nullable<Int8>, 514 493 } 515 494 } 516 495 ··· 686 665 follows, 687 666 handle_resolution_queue, 688 667 jetstream_cursors, 689 - labeler_defs, 690 - labeler_likes, 691 - labelers, 692 668 labels, 693 669 list_blocks, 694 670 list_items, ··· 697 673 mutes, 698 674 notifications, 699 675 post_aggregate_stats, 676 + post_likes, 700 677 posts, 701 678 reposts, 702 679 spatial_ref_sys,
+7 -5
parakeet/src/hydration/labeler.rs
··· 36 36 37 37 fn build_view_detailed( 38 38 enriched: EnrichedLabeler, 39 - defs: Vec<models::LabelerDef>, 39 + defs: Vec<parakeet_db::composite_types::LabelerDef>, 40 40 creator: ProfileView, 41 41 labels: Vec<models::Label>, 42 42 viewer: Option<LabelerViewerState>, 43 43 likes: Option<i32>, 44 44 ) -> LabelerViewDetailed { 45 - let reason_types = enriched.labeler.reasons.map(|v| { 45 + let reason_types = enriched.reasons.map(|v| { 46 46 v.iter() 47 + .flatten() 47 48 .filter_map(|v| ReasonType::from_str(&v.to_string()).ok()) 48 49 .collect() 49 50 }); ··· 73 74 }) 74 75 }) 75 76 .collect(); 76 - let subject_types = enriched.labeler.subject_types.map(|v| { 77 + let subject_types = enriched.subject_types.map(|v| { 77 78 v.iter() 79 + .flatten() 78 80 .filter_map(|v| SubjectType::from_str(&v.to_string()).ok()) 79 81 .collect() 80 82 }); 81 - let subject_collections = enriched.labeler.subject_collections.map(|v| { 82 - v.iter().map(|rt| rt.to_string()).collect() 83 + let subject_collections = enriched.subject_collections.map(|v| { 84 + v.iter().flatten().map(|rt| rt.to_string()).collect() 83 85 }); 84 86 85 87 LabelerViewDetailed {
+46 -89
parakeet/src/loaders/labeler.rs
··· 7 7 use parakeet_db::{models, schema}; 8 8 use std::collections::HashMap; 9 9 10 - /// Build SQL query for loading labeler record metadata 10 + /// Build SQL query for loading labeler record metadata from actors table 11 11 /// 12 12 /// This function is public for testing purposes. 13 13 pub fn build_labeler_records_query(actor_ids_str: &str) -> String { 14 14 format!( 15 - "SELECT actor_id, cid, created_at, like_count 16 - FROM labelers 17 - WHERE actor_id IN ({})", 15 + "SELECT id as actor_id, labeler_cid as cid, labeler_created_at as created_at, labeler_like_count as like_count 16 + FROM actors 17 + WHERE id IN ({}) AND labeler_cid IS NOT NULL", 18 18 actor_ids_str 19 19 ) 20 20 } ··· 65 65 ORDER BY l.created_at" 66 66 } 67 67 68 - // Enriched Labeler with reconstructed fields 68 + // Enriched Labeler with reconstructed fields from Actor 69 + // Note: Labeler data is now stored directly on actors table with labeler_* columns 69 70 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] 70 71 pub struct EnrichedLabeler { 71 - pub labeler: models::Labeler, 72 + pub actor_id: i32, 72 73 pub did: String, 73 74 pub cid: Vec<u8>, 74 75 pub created_at: chrono::DateTime<chrono::Utc>, 75 - pub like_count: i32, // Count of likes from labeler_likes table 76 + pub reasons: Option<Vec<Option<parakeet_db::types::ReasonType>>>, 77 + pub subject_types: Option<Vec<Option<parakeet_db::types::SubjectType>>>, 78 + pub subject_collections: Option<Vec<Option<String>>>, 79 + pub status: parakeet_db::types::LabelerStatus, 80 + pub like_count: i32, 76 81 } 77 82 78 83 pub struct LabelServiceLoader(pub(super) Pool<AsyncPgConnection>); 79 - pub type LabelServiceLoaderRet = (EnrichedLabeler, Vec<models::LabelerDef>); 84 + pub type LabelServiceLoaderRet = (EnrichedLabeler, Vec<parakeet_db::composite_types::LabelerDef>); 80 85 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 81 86 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 82 87 let mut conn = self.0.get().await.unwrap(); 83 88 84 - // Load labelers using Diesel DSL (loads from labelers table directly) 85 - let labelers: Vec<models::Labeler> = diesel_async::RunQueryDsl::load( 86 - schema::labelers::table 87 - .inner_join(schema::actors::table.on(schema::labelers::actor_id.eq(schema::actors::id))) 89 + // Load labelers from actors table (actors with labeler_cid IS NOT NULL) 90 + let actors: Vec<models::Actor> = diesel_async::RunQueryDsl::load( 91 + schema::actors::table 88 92 .filter(schema::actors::did.eq_any(keys)) 89 - .filter(schema::labelers::status.eq(parakeet_db::types::LabelerStatus::Complete)) 90 - .select(models::Labeler::as_select()), 93 + .filter(schema::actors::labeler_cid.is_not_null()) 94 + .filter(schema::actors::labeler_status.eq(parakeet_db::types::LabelerStatus::Complete)) 95 + .select(models::Actor::as_select()), 91 96 &mut conn, 92 97 ) 93 98 .await ··· 96 101 vec![] 97 102 }); 98 103 99 - if labelers.is_empty() { 104 + if actors.is_empty() { 100 105 return HashMap::new(); 101 106 } 102 107 103 - // Get actor IDs for loading other data 104 - let labeler_actor_ids: Vec<i32> = labelers.iter().map(|l| l.actor_id).collect(); 105 - 106 - // Load DIDs for these labelers 107 - let actors: Vec<(i32, String)> = diesel_async::RunQueryDsl::load( 108 - schema::actors::table 109 - .filter(schema::actors::id.eq_any(&labeler_actor_ids)) 110 - .select((schema::actors::id, schema::actors::did)), 111 - &mut conn, 112 - ) 113 - .await 114 - .unwrap_or_default(); 115 - let did_by_actor: HashMap<i32, String> = actors.into_iter().collect(); 116 - 117 - // Load record metadata (cid, created_at) for these labelers 118 - // Using raw SQL because RecordType doesn't implement Clone 119 - use diesel::sql_types::{Binary, Integer, Timestamptz}; 120 - 121 - let actor_ids_str = labeler_actor_ids 122 - .iter() 123 - .map(|id| id.to_string()) 124 - .collect::<Vec<_>>() 125 - .join(","); 126 - 127 - let query = build_labeler_records_query(&actor_ids_str); 128 - 129 - #[derive(diesel::QueryableByName)] 130 - struct RecordRow { 131 - #[diesel(sql_type = Integer)] 132 - actor_id: i32, 133 - #[diesel(sql_type = Binary)] 134 - cid: Vec<u8>, 135 - #[diesel(sql_type = Timestamptz)] 136 - created_at: chrono::DateTime<chrono::Utc>, 137 - #[diesel(sql_type = Integer)] 138 - like_count: i32, 139 - } 140 - 141 - let records: Vec<RecordRow> = diesel_async::RunQueryDsl::load( 142 - diesel::sql_query(query), 143 - &mut conn, 144 - ) 145 - .await 146 - .unwrap_or_default(); 147 - let record_by_actor: HashMap<i32, (Vec<u8>, chrono::DateTime<chrono::Utc>, i32)> = 148 - records.into_iter().map(|row| (row.actor_id, (row.cid, row.created_at, row.like_count))).collect(); 149 - 150 - // Load label definitions 151 - let defs: Vec<models::LabelerDef> = diesel_async::RunQueryDsl::load( 152 - schema::labeler_defs::table 153 - .filter(schema::labeler_defs::labeler_actor_id.eq_any(&labeler_actor_ids)), 154 - &mut conn, 155 - ) 156 - .await 157 - .unwrap_or_default(); 158 - 159 - // Group definitions by labeler_actor_id 160 - let mut defs_by_actor: HashMap<i32, Vec<models::LabelerDef>> = HashMap::new(); 161 - for def in defs { 162 - defs_by_actor.entry(def.labeler_actor_id).or_default().push(def); 163 - } 164 - 165 108 // Build result map: DID -> (EnrichedLabeler, Vec<LabelerDef>) 166 - labelers 109 + actors 167 110 .into_iter() 168 - .filter_map(|labeler| { 169 - let actor_id = labeler.actor_id; 170 - let did = did_by_actor.get(&actor_id)?.clone(); 171 - let (cid, created_at, like_count) = record_by_actor.get(&actor_id)?; 172 - let defs = defs_by_actor.remove(&actor_id).unwrap_or_default(); 111 + .filter_map(|actor| { 112 + // Extract labeler fields (all should be present if labeler_cid IS NOT NULL) 113 + let cid = actor.labeler_cid?; 114 + let created_at = actor.labeler_created_at?; 115 + let status = actor.labeler_status?; 116 + let like_count = actor.labeler_like_count.unwrap_or(0); 117 + 118 + // Extract labeler_defs array and filter out NULLs 119 + let defs: Vec<parakeet_db::composite_types::LabelerDef> = actor 120 + .labeler_defs 121 + .unwrap_or_default() 122 + .into_iter() 123 + .flatten() 124 + .collect(); 173 125 174 126 let enriched = EnrichedLabeler { 175 - labeler, 176 - did: did.clone(), 177 - cid: cid.clone(), 178 - created_at: *created_at, 179 - like_count: *like_count, 127 + actor_id: actor.id, 128 + did: actor.did.clone(), 129 + cid, 130 + created_at, 131 + reasons: actor.labeler_reasons, 132 + subject_types: actor.labeler_subject_types, 133 + subject_collections: actor.labeler_subject_collections, 134 + status, 135 + like_count, 180 136 }; 181 - Some((did, (enriched, defs))) 137 + 138 + Some((actor.did, (enriched, defs))) 182 139 }) 183 140 .collect() 184 141 }