Rust AppView - highly experimental!

feat: get labeler/feedgen stats

Changed files
+513 -93
consumer
src
database_writer
db
operations
tests
migrations
2025-11-27-143034_add_like_counts_to_feedgens_labelers
parakeet
src
parakeet-db
+10 -4
consumer/src/database_writer/workers.rs
··· 88 88 tracing::info!(inserted = inserted.len(), "Bulk COPY feedgen_likes completed"); 89 89 total_inserted += inserted.len(); 90 90 91 - // Generate deltas for feedgen likes (currently not tracked in aggregates) 92 - // Future: Could add FeedGen aggregate target 91 + // Update feedgen like_count aggregates 92 + let feedgen_ids: Vec<i64> = inserted.iter().map(|like| like.feedgen_id).collect(); 93 + if let Err(e) = crate::db::operations::increment_feedgen_like_counts(&txn, &feedgen_ids).await { 94 + tracing::error!(error = ?e, "Failed to update feedgen like_count aggregates"); 95 + } 93 96 } 94 97 Err(e) => { 95 98 tracing::error!(error = ?e, "Failed to bulk COPY feedgen_likes"); ··· 104 107 tracing::info!(inserted = inserted.len(), "Bulk COPY labeler_likes completed"); 105 108 total_inserted += inserted.len(); 106 109 107 - // Generate deltas for labeler likes (currently not tracked in aggregates) 108 - // Future: Could add Labeler aggregate target 110 + // Update labeler like_count aggregates 111 + let labeler_actor_ids: Vec<i32> = inserted.iter().map(|like| like.labeler_actor_id).collect(); 112 + if let Err(e) = crate::db::operations::increment_labeler_like_counts(&txn, &labeler_actor_ids).await { 113 + tracing::error!(error = ?e, "Failed to update labeler like_count aggregates"); 114 + } 109 115 } 110 116 Err(e) => { 111 117 tracing::error!(error = ?e, "Failed to bulk COPY labeler_likes");
+1 -1
consumer/src/db/operations/feed.rs
··· 28 28 mod feedgen; 29 29 30 30 // Re-export all public functions for external use 31 - pub use feedgen::{feedgen_delete, feedgen_upsert}; 31 + pub use feedgen::{feedgen_delete, feedgen_upsert, increment_feedgen_like_counts, decrement_feedgen_like_count}; 32 32 pub use helpers::{ensure_list_id, get_actor_id}; 33 33 pub use like::{like_delete, like_insert}; 34 34 pub use post::{post_delete, post_insert};
+42
consumer/src/db/operations/feed/feedgen.rs
··· 101 101 .await 102 102 .wrap_err_with(|| format!("Failed to delete feedgen for actor_id:{} rkey:{}", actor_id, rkey)) 103 103 } 104 + 105 + /// Increment like_count for multiple feedgens (bulk operation) 106 + /// 107 + /// This is called after bulk inserting feedgen_likes to update the aggregate counts. 108 + /// Uses a single UPDATE statement with unnest for efficiency. 109 + pub async fn increment_feedgen_like_counts<C: GenericClient>( 110 + conn: &C, 111 + feedgen_ids: &[i64], 112 + ) -> Result<u64> { 113 + if feedgen_ids.is_empty() { 114 + return Ok(0); 115 + } 116 + 117 + conn.execute( 118 + "UPDATE feedgens 119 + SET like_count = like_count + counts.count 120 + FROM ( 121 + SELECT unnest($1::bigint[]) as id, unnest($2::int[]) as count 122 + ) AS counts 123 + WHERE feedgens.id = counts.id", 124 + &[&feedgen_ids, &vec![1i32; feedgen_ids.len()]], 125 + ) 126 + .await 127 + .wrap_err("Failed to increment feedgen like_count aggregates") 128 + } 129 + 130 + /// Decrement like_count for a single feedgen 131 + /// 132 + /// This is called when deleting a feedgen_like record. 133 + pub async fn decrement_feedgen_like_count<C: GenericClient>( 134 + conn: &C, 135 + feedgen_id: i64, 136 + ) -> Result<u64> { 137 + conn.execute( 138 + "UPDATE feedgens 139 + SET like_count = GREATEST(like_count - 1, 0) 140 + WHERE id = $1", 141 + &[&feedgen_id], 142 + ) 143 + .await 144 + .wrap_err("Failed to decrement feedgen like_count") 145 + }
+42
consumer/src/db/operations/labeler.rs
··· 119 119 .await 120 120 .wrap_err_with(|| format!("Failed to delete labeler for actor_id:{}", actor_id)) 121 121 } 122 + 123 + /// Increment like_count for multiple labelers (bulk operation) 124 + /// 125 + /// This is called after bulk inserting labeler_likes to update the aggregate counts. 126 + /// Uses a single UPDATE statement with unnest for efficiency. 127 + pub async fn increment_labeler_like_counts<C: GenericClient>( 128 + conn: &C, 129 + labeler_actor_ids: &[i32], 130 + ) -> Result<u64> { 131 + if labeler_actor_ids.is_empty() { 132 + return Ok(0); 133 + } 134 + 135 + conn.execute( 136 + "UPDATE labelers 137 + SET like_count = like_count + counts.count 138 + FROM ( 139 + SELECT unnest($1::int[]) as actor_id, unnest($2::int[]) as count 140 + ) AS counts 141 + WHERE labelers.actor_id = counts.actor_id", 142 + &[&labeler_actor_ids, &vec![1i32; labeler_actor_ids.len()]], 143 + ) 144 + .await 145 + .wrap_err("Failed to increment labeler like_count aggregates") 146 + } 147 + 148 + /// Decrement like_count for a single labeler 149 + /// 150 + /// This is called when deleting a labeler_like record. 151 + pub async fn decrement_labeler_like_count<C: GenericClient>( 152 + conn: &C, 153 + labeler_actor_id: i32, 154 + ) -> Result<u64> { 155 + conn.execute( 156 + "UPDATE labelers 157 + SET like_count = GREATEST(like_count - 1, 0) 158 + WHERE actor_id = $1", 159 + &[&labeler_actor_id], 160 + ) 161 + .await 162 + .wrap_err("Failed to decrement labeler like_count") 163 + }
+352
consumer/tests/feedgen_labeler_operations_test.rs
··· 3 3 //! Tests coverage for: 4 4 //! - `feedgen_upsert` (insert and update paths) 5 5 //! - `feedgen_delete` 6 + //! - `increment_feedgen_like_counts` (bulk increment aggregates) 7 + //! - `decrement_feedgen_like_count` (single decrement with GREATEST protection) 6 8 //! - `labeler_upsert` (creates labeler and maintains label definitions) 7 9 //! - `labeler_delete` 10 + //! - `increment_labeler_like_counts` (bulk increment aggregates) 11 + //! - `decrement_labeler_like_count` (single decrement with GREATEST protection) 8 12 9 13 mod common; 10 14 ··· 624 628 tx.rollback().await.wrap_err("Failed to rollback")?; 625 629 Ok(()) 626 630 } 631 + 632 + // ======================================== 633 + // Feed Generator Like Count Tests 634 + // ======================================== 635 + 636 + #[tokio::test] 637 + async fn test_increment_feedgen_like_counts() -> eyre::Result<()> { 638 + let pool = test_pool(); 639 + let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 640 + let tx = conn 641 + .transaction() 642 + .await 643 + .wrap_err("Failed to start transaction")?; 644 + 645 + // Create actors 646 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedgenowner4") 647 + .await 648 + .wrap_err("Failed to ensure owner actor")?; 649 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedservice4") 650 + .await 651 + .wrap_err("Failed to ensure service actor")?; 652 + 653 + // Insert feedgen 654 + let feedgen = AppBskyFeedGenerator { 655 + did: "did:plc:feedservice4".to_string(), 656 + display_name: "Test Feed".to_string(), 657 + description: None, 658 + description_facets: None, 659 + avatar: None, 660 + accepts_interactions: None, 661 + labels: None, 662 + content_mode: None, 663 + created_at: Utc::now(), 664 + }; 665 + 666 + let (service_actor_id, _, _) = 667 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedservice4").await?; 668 + let (owner_actor_id, _, _) = 669 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedgenowner4").await?; 670 + let rkey = "testfeed"; 671 + feed::feedgen_upsert(&tx, owner_actor_id, rkey, test_cid(), service_actor_id, feedgen) 672 + .await 673 + .wrap_err("Failed to insert feedgen")?; 674 + 675 + // Get feedgen_id 676 + let feedgen_id: i64 = tx 677 + .query_one( 678 + "SELECT id FROM feedgens WHERE actor_id = $1 AND rkey = $2", 679 + &[&owner_actor_id, &rkey], 680 + ) 681 + .await 682 + .wrap_err("Failed to get feedgen_id")? 683 + .get(0); 684 + 685 + // Verify initial like_count is 0 686 + let initial_count: i32 = tx 687 + .query_one( 688 + "SELECT like_count FROM feedgens WHERE id = $1", 689 + &[&feedgen_id], 690 + ) 691 + .await 692 + .wrap_err("Failed to get initial like_count")? 693 + .get(0); 694 + assert_eq!(initial_count, 0, "Initial like_count should be 0"); 695 + 696 + // Increment like count (simulating 3 likes for the same feedgen) 697 + let result = feed::increment_feedgen_like_counts(&tx, &[feedgen_id, feedgen_id, feedgen_id]) 698 + .await 699 + .wrap_err("Failed to increment feedgen like_counts")?; 700 + assert_eq!(result, 1, "Should update 1 row (the feedgen itself)"); 701 + 702 + // Verify like_count was incremented 703 + let updated_count: i32 = tx 704 + .query_one( 705 + "SELECT like_count FROM feedgens WHERE id = $1", 706 + &[&feedgen_id], 707 + ) 708 + .await 709 + .wrap_err("Failed to get updated like_count")? 710 + .get(0); 711 + assert_eq!(updated_count, 3, "like_count should be incremented to 3"); 712 + 713 + tx.rollback().await.wrap_err("Failed to rollback")?; 714 + Ok(()) 715 + } 716 + 717 + #[tokio::test] 718 + async fn test_decrement_feedgen_like_count() -> eyre::Result<()> { 719 + let pool = test_pool(); 720 + let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 721 + let tx = conn 722 + .transaction() 723 + .await 724 + .wrap_err("Failed to start transaction")?; 725 + 726 + // Create actors 727 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedgenowner5") 728 + .await 729 + .wrap_err("Failed to ensure owner actor")?; 730 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedservice5") 731 + .await 732 + .wrap_err("Failed to ensure service actor")?; 733 + 734 + // Insert feedgen 735 + let feedgen = AppBskyFeedGenerator { 736 + did: "did:plc:feedservice5".to_string(), 737 + display_name: "Test Feed".to_string(), 738 + description: None, 739 + description_facets: None, 740 + avatar: None, 741 + accepts_interactions: None, 742 + labels: None, 743 + content_mode: None, 744 + created_at: Utc::now(), 745 + }; 746 + 747 + let (service_actor_id, _, _) = 748 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedservice5").await?; 749 + let (owner_actor_id, _, _) = 750 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedgenowner5").await?; 751 + let rkey = "testfeed"; 752 + feed::feedgen_upsert(&tx, owner_actor_id, rkey, test_cid(), service_actor_id, feedgen) 753 + .await 754 + .wrap_err("Failed to insert feedgen")?; 755 + 756 + // Get feedgen_id 757 + let feedgen_id: i64 = tx 758 + .query_one( 759 + "SELECT id FROM feedgens WHERE actor_id = $1 AND rkey = $2", 760 + &[&owner_actor_id, &rkey], 761 + ) 762 + .await 763 + .wrap_err("Failed to get feedgen_id")? 764 + .get(0); 765 + 766 + // Set like_count to 5 manually 767 + tx.execute( 768 + "UPDATE feedgens SET like_count = 5 WHERE id = $1", 769 + &[&feedgen_id], 770 + ) 771 + .await 772 + .wrap_err("Failed to set initial like_count")?; 773 + 774 + // Decrement like count 775 + let result = feed::decrement_feedgen_like_count(&tx, feedgen_id) 776 + .await 777 + .wrap_err("Failed to decrement feedgen like_count")?; 778 + assert_eq!(result, 1, "Should update 1 row"); 779 + 780 + // Verify like_count was decremented 781 + let updated_count: i32 = tx 782 + .query_one( 783 + "SELECT like_count FROM feedgens WHERE id = $1", 784 + &[&feedgen_id], 785 + ) 786 + .await 787 + .wrap_err("Failed to get updated like_count")? 788 + .get(0); 789 + assert_eq!(updated_count, 4, "like_count should be decremented to 4"); 790 + 791 + // Test that it doesn't go below 0 792 + for _ in 0..10 { 793 + feed::decrement_feedgen_like_count(&tx, feedgen_id) 794 + .await 795 + .wrap_err("Failed to decrement")?; 796 + } 797 + 798 + let final_count: i32 = tx 799 + .query_one( 800 + "SELECT like_count FROM feedgens WHERE id = $1", 801 + &[&feedgen_id], 802 + ) 803 + .await 804 + .wrap_err("Failed to get final like_count")? 805 + .get(0); 806 + assert_eq!( 807 + final_count, 0, 808 + "like_count should not go below 0 (GREATEST protection)" 809 + ); 810 + 811 + tx.rollback().await.wrap_err("Failed to rollback")?; 812 + Ok(()) 813 + } 814 + 815 + // ======================================== 816 + // Labeler Like Count Tests 817 + // ======================================== 818 + 819 + #[tokio::test] 820 + async fn test_increment_labeler_like_counts() -> eyre::Result<()> { 821 + let pool = test_pool(); 822 + let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 823 + let tx = conn 824 + .transaction() 825 + .await 826 + .wrap_err("Failed to start transaction")?; 827 + 828 + // Create actor 829 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:labeler4") 830 + .await 831 + .wrap_err("Failed to ensure labeler actor")?; 832 + 833 + // Insert labeler 834 + let labeler = AppBskyLabelerService { 835 + policies: LabelerPolicy { 836 + label_values: vec!["test".to_string()], 837 + label_value_definitions: vec![LabelValueDefinition { 838 + identifier: "test".to_string(), 839 + severity: Severity::Inform, 840 + blurs: Blurs::None, 841 + default_setting: None, 842 + adult_only: None, 843 + locales: vec![], 844 + }], 845 + }, 846 + labels: None, 847 + reason_types: None, 848 + subject_types: None, 849 + subject_collections: None, 850 + created_at: Utc::now(), 851 + }; 852 + 853 + let (actor_id, _, _) = 854 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:labeler4").await?; 855 + labeler::labeler_upsert(&tx, actor_id, "did:plc:labeler4", test_cid(), labeler) 856 + .await 857 + .wrap_err("Failed to insert labeler")?; 858 + 859 + // Verify initial like_count is 0 860 + let initial_count: i32 = tx 861 + .query_one( 862 + "SELECT like_count FROM labelers WHERE actor_id = $1", 863 + &[&actor_id], 864 + ) 865 + .await 866 + .wrap_err("Failed to get initial like_count")? 867 + .get(0); 868 + assert_eq!(initial_count, 0, "Initial like_count should be 0"); 869 + 870 + // Increment like count (simulating 2 likes for the same labeler) 871 + let result = labeler::increment_labeler_like_counts(&tx, &[actor_id, actor_id]) 872 + .await 873 + .wrap_err("Failed to increment labeler like_counts")?; 874 + assert_eq!(result, 1, "Should update 1 row (the labeler itself)"); 875 + 876 + // Verify like_count was incremented 877 + let updated_count: i32 = tx 878 + .query_one( 879 + "SELECT like_count FROM labelers WHERE actor_id = $1", 880 + &[&actor_id], 881 + ) 882 + .await 883 + .wrap_err("Failed to get updated like_count")? 884 + .get(0); 885 + assert_eq!(updated_count, 2, "like_count should be incremented to 2"); 886 + 887 + tx.rollback().await.wrap_err("Failed to rollback")?; 888 + Ok(()) 889 + } 890 + 891 + #[tokio::test] 892 + async fn test_decrement_labeler_like_count() -> eyre::Result<()> { 893 + let pool = test_pool(); 894 + let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 895 + let tx = conn 896 + .transaction() 897 + .await 898 + .wrap_err("Failed to start transaction")?; 899 + 900 + // Create actor 901 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:labeler5") 902 + .await 903 + .wrap_err("Failed to ensure labeler actor")?; 904 + 905 + // Insert labeler 906 + let labeler = AppBskyLabelerService { 907 + policies: LabelerPolicy { 908 + label_values: vec!["test".to_string()], 909 + label_value_definitions: vec![LabelValueDefinition { 910 + identifier: "test".to_string(), 911 + severity: Severity::Inform, 912 + blurs: Blurs::None, 913 + default_setting: None, 914 + adult_only: None, 915 + locales: vec![], 916 + }], 917 + }, 918 + labels: None, 919 + reason_types: None, 920 + subject_types: None, 921 + subject_collections: None, 922 + created_at: Utc::now(), 923 + }; 924 + 925 + let (actor_id, _, _) = 926 + consumer::db::operations::feed::get_actor_id(&tx, "did:plc:labeler5").await?; 927 + labeler::labeler_upsert(&tx, actor_id, "did:plc:labeler5", test_cid(), labeler) 928 + .await 929 + .wrap_err("Failed to insert labeler")?; 930 + 931 + // Set like_count to 3 manually 932 + tx.execute( 933 + "UPDATE labelers SET like_count = 3 WHERE actor_id = $1", 934 + &[&actor_id], 935 + ) 936 + .await 937 + .wrap_err("Failed to set initial like_count")?; 938 + 939 + // Decrement like count 940 + let result = labeler::decrement_labeler_like_count(&tx, actor_id) 941 + .await 942 + .wrap_err("Failed to decrement labeler like_count")?; 943 + assert_eq!(result, 1, "Should update 1 row"); 944 + 945 + // Verify like_count was decremented 946 + let updated_count: i32 = tx 947 + .query_one( 948 + "SELECT like_count FROM labelers WHERE actor_id = $1", 949 + &[&actor_id], 950 + ) 951 + .await 952 + .wrap_err("Failed to get updated like_count")? 953 + .get(0); 954 + assert_eq!(updated_count, 2, "like_count should be decremented to 2"); 955 + 956 + // Test that it doesn't go below 0 957 + for _ in 0..10 { 958 + labeler::decrement_labeler_like_count(&tx, actor_id) 959 + .await 960 + .wrap_err("Failed to decrement")?; 961 + } 962 + 963 + let final_count: i32 = tx 964 + .query_one( 965 + "SELECT like_count FROM labelers WHERE actor_id = $1", 966 + &[&actor_id], 967 + ) 968 + .await 969 + .wrap_err("Failed to get final like_count")? 970 + .get(0); 971 + assert_eq!( 972 + final_count, 0, 973 + "like_count should not go below 0 (GREATEST protection)" 974 + ); 975 + 976 + tx.rollback().await.wrap_err("Failed to rollback")?; 977 + Ok(()) 978 + }
+7
migrations/2025-11-27-143034_add_like_counts_to_feedgens_labelers/down.sql
··· 1 + -- Drop indexes 2 + DROP INDEX IF EXISTS idx_labelers_like_count_desc; 3 + DROP INDEX IF EXISTS idx_feedgens_like_count_desc; 4 + 5 + -- Drop columns 6 + ALTER TABLE labelers DROP COLUMN IF EXISTS like_count; 7 + ALTER TABLE feedgens DROP COLUMN IF EXISTS like_count;
+27
migrations/2025-11-27-143034_add_like_counts_to_feedgens_labelers/up.sql
··· 1 + -- Add like_count column to feedgens table 2 + ALTER TABLE feedgens ADD COLUMN like_count INTEGER NOT NULL DEFAULT 0; 3 + 4 + -- Add like_count column to labelers table 5 + ALTER TABLE labelers ADD COLUMN like_count INTEGER NOT NULL DEFAULT 0; 6 + 7 + -- Create index for ranking feedgens by likes (descending) 8 + CREATE INDEX idx_feedgens_like_count_desc ON feedgens(like_count DESC); 9 + 10 + -- Create index for ranking labelers by likes (descending) 11 + CREATE INDEX idx_labelers_like_count_desc ON labelers(like_count DESC); 12 + 13 + -- Backfill existing like counts for feedgens 14 + UPDATE feedgens f 15 + SET like_count = ( 16 + SELECT COUNT(*)::int 17 + FROM feedgen_likes fl 18 + WHERE fl.feedgen_id = f.id 19 + ); 20 + 21 + -- Backfill existing like counts for labelers 22 + UPDATE labelers l 23 + SET like_count = ( 24 + SELECT COUNT(*)::int 25 + FROM labeler_likes ll 26 + WHERE ll.labeler_actor_id = l.actor_id 27 + );
+2
parakeet-db/src/models.rs
··· 376 376 pub avatar_cid: Option<Vec<u8>>, // 32-byte CID 377 377 pub accepts_interactions: bool, 378 378 pub status: FeedgenStatus, // ENUM: complete | stub | deleted 379 + pub like_count: i32, // Aggregated count of likes (maintained by database_writer) 379 380 } 380 381 381 382 // ============================================================================= ··· 536 537 pub subject_types: Option<array_helpers::SubjectTypeArray>, // ENUM array: account | record | chat 537 538 pub subject_collections: Option<array_helpers::TextArray>, // Collection names (as TEXT since generic) 538 539 pub status: LabelerStatus, // ENUM: complete | stub | deleted 540 + pub like_count: i32, // Aggregated count of likes (maintained by database_writer) 539 541 } 540 542 541 543 #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable)]
+2
parakeet-db/src/schema.rs
··· 271 271 avatar_cid -> Nullable<Bytea>, 272 272 accepts_interactions -> Bool, 273 273 status -> FeedgenStatus, 274 + like_count -> Int4, 274 275 } 275 276 } 276 277 ··· 356 357 subject_types -> Nullable<Array<Nullable<SubjectType>>>, 357 358 subject_collections -> Nullable<Array<Nullable<Text>>>, 358 359 status -> LabelerStatus, 360 + like_count -> Int4, 359 361 } 360 362 } 361 363
+21 -50
parakeet/src/loaders/feed.rs
··· 44 44 f.avatar_cid, 45 45 f.accepts_interactions, 46 46 f.status, 47 + f.like_count, 47 48 'at://' || owner.did || '/app.bsky.feed.generator/' || f.rkey::text as at_uri, 48 49 owner.did as owner_did, 49 50 service.did as service_did ··· 94 95 accepts_interactions: bool, 95 96 #[diesel(sql_type = parakeet_db::schema::sql_types::FeedgenStatus)] 96 97 status: parakeet_db::types::FeedgenStatus, 98 + #[diesel(sql_type = diesel::sql_types::Integer)] 99 + like_count: i32, 97 100 #[diesel(sql_type = diesel::sql_types::Text)] 98 101 at_uri: String, 99 102 #[diesel(sql_type = diesel::sql_types::Text)] ··· 112 115 tracing::error!("feedgen load failed: {e}"); 113 116 vec![] 114 117 }); 115 - 116 - // Query like counts for all feedgens 117 - let feedgen_ids: Vec<i64> = res.iter().map(|r| r.id).collect(); 118 - let like_counts: HashMap<i64, i32> = if !feedgen_ids.is_empty() { 119 - #[derive(diesel::QueryableByName)] 120 - struct LikeCountRow { 121 - #[diesel(sql_type = diesel::sql_types::BigInt)] 122 - feedgen_id: i64, 123 - #[diesel(sql_type = diesel::sql_types::BigInt)] 124 - like_count: i64, 125 - } 126 - 127 - let count_query = "SELECT feedgen_id, COUNT(*)::bigint as like_count 128 - FROM feedgen_likes 129 - WHERE feedgen_id = ANY($1) 130 - GROUP BY feedgen_id"; 131 - 132 - diesel_async::RunQueryDsl::load::<LikeCountRow>( 133 - diesel::sql_query(count_query) 134 - .bind::<diesel::sql_types::Array<diesel::sql_types::BigInt>, _>(&feedgen_ids), 135 - &mut conn, 136 - ) 137 - .await 138 - .unwrap_or_else(|e| { 139 - tracing::error!("feedgen like count query failed: {e}"); 140 - vec![] 141 - }) 142 - .into_iter() 143 - .map(|row| (row.feedgen_id, row.like_count as i32)) 144 - .collect() 145 - } else { 146 - HashMap::new() 147 - }; 148 118 149 119 HashMap::from_iter(res.into_iter().map(|row| { 150 120 let enriched = EnrichedFeedGen { ··· 163 133 avatar_cid: row.avatar_cid, 164 134 accepts_interactions: row.accepts_interactions, 165 135 status: row.status, 136 + like_count: row.like_count, 166 137 }, 167 138 at_uri: row.at_uri.clone(), 168 139 owner: row.owner_did, 169 140 service_did: row.service_did, 170 141 cid: row.cid, 171 142 created_at: row.created_at, 172 - like_count: like_counts.get(&row.id).copied().unwrap_or(0), 143 + like_count: row.like_count, 173 144 }; 174 145 (row.at_uri, enriched) 175 146 })) ··· 180 151 /// 181 152 /// This loader is specifically for loading JUST like counts for feedgens and labelers 182 153 /// when we need to rank them by likes before hydrating the full data. 154 + /// 155 + /// Like counts are stored as aggregated columns (feedgens.like_count, labelers.like_count) 156 + /// and are maintained by the database_writer when likes are inserted/deleted. 157 + /// 183 158 /// For posts, use the parakeet-index stats service via LikeRecordLoader. 184 159 pub struct LikeLoader(pub(super) Pool<AsyncPgConnection>); 185 160 impl BatchFn<String, i32> for LikeLoader { ··· 210 185 211 186 let mut result = HashMap::new(); 212 187 213 - // Query feedgen like counts 188 + // Query feedgen like counts from aggregated column 214 189 if !feedgen_uris.is_empty() { 215 190 #[derive(diesel::QueryableByName)] 216 191 struct FeedgenLikeCount { 217 192 #[diesel(sql_type = diesel::sql_types::Text)] 218 193 uri: String, 219 - #[diesel(sql_type = diesel::sql_types::BigInt)] 220 - like_count: i64, 194 + #[diesel(sql_type = diesel::sql_types::Integer)] 195 + like_count: i32, 221 196 } 222 197 223 198 let feedgen_query = "SELECT 224 199 'at://' || a.did || '/app.bsky.feed.generator/' || f.rkey::text as uri, 225 - COUNT(*)::bigint as like_count 200 + f.like_count 226 201 FROM feedgens f 227 202 INNER JOIN actors a ON f.owner_actor_id = a.id 228 - INNER JOIN feedgen_likes fl ON fl.feedgen_id = f.id 229 - WHERE 'at://' || a.did || '/app.bsky.feed.generator/' || f.rkey::text = ANY($1) 230 - GROUP BY a.did, f.rkey"; 203 + WHERE 'at://' || a.did || '/app.bsky.feed.generator/' || f.rkey::text = ANY($1)"; 231 204 232 205 let feedgen_counts: Vec<FeedgenLikeCount> = diesel_async::RunQueryDsl::load( 233 206 diesel::sql_query(feedgen_query) ··· 241 214 }); 242 215 243 216 for row in feedgen_counts { 244 - result.insert(row.uri, row.like_count as i32); 217 + result.insert(row.uri, row.like_count); 245 218 } 246 219 } 247 220 248 - // Query labeler like counts 221 + // Query labeler like counts from aggregated column 249 222 if !labeler_uris.is_empty() { 250 223 #[derive(diesel::QueryableByName)] 251 224 struct LabelerLikeCount { 252 225 #[diesel(sql_type = diesel::sql_types::Text)] 253 226 uri: String, 254 - #[diesel(sql_type = diesel::sql_types::BigInt)] 255 - like_count: i64, 227 + #[diesel(sql_type = diesel::sql_types::Integer)] 228 + like_count: i32, 256 229 } 257 230 258 231 let labeler_query = "SELECT 259 232 'at://' || a.did || '/app.bsky.labeler.service/self' as uri, 260 - COUNT(*)::bigint as like_count 233 + l.like_count 261 234 FROM labelers l 262 235 INNER JOIN actors a ON l.actor_id = a.id 263 - INNER JOIN labeler_likes ll ON ll.labeler_actor_id = l.actor_id 264 - WHERE 'at://' || a.did || '/app.bsky.labeler.service/self' = ANY($1) 265 - GROUP BY a.did, l.actor_id"; 236 + WHERE 'at://' || a.did || '/app.bsky.labeler.service/self' = ANY($1)"; 266 237 267 238 let labeler_counts: Vec<LabelerLikeCount> = diesel_async::RunQueryDsl::load( 268 239 diesel::sql_query(labeler_query) ··· 276 247 }); 277 248 278 249 for row in labeler_counts { 279 - result.insert(row.uri, row.like_count as i32); 250 + result.insert(row.uri, row.like_count); 280 251 } 281 252 } 282 253
+7 -38
parakeet/src/loaders/labeler.rs
··· 12 12 /// This function is public for testing purposes. 13 13 pub fn build_labeler_records_query(actor_ids_str: &str) -> String { 14 14 format!( 15 - "SELECT actor_id, cid, created_at 15 + "SELECT actor_id, cid, created_at, like_count 16 16 FROM labelers 17 17 WHERE actor_id IN ({})", 18 18 actor_ids_str ··· 134 134 cid: Vec<u8>, 135 135 #[diesel(sql_type = Timestamptz)] 136 136 created_at: chrono::DateTime<chrono::Utc>, 137 + #[diesel(sql_type = Integer)] 138 + like_count: i32, 137 139 } 138 140 139 141 let records: Vec<RecordRow> = diesel_async::RunQueryDsl::load( ··· 142 144 ) 143 145 .await 144 146 .unwrap_or_default(); 145 - let record_by_actor: HashMap<i32, (Vec<u8>, chrono::DateTime<chrono::Utc>)> = 146 - records.into_iter().map(|row| (row.actor_id, (row.cid, row.created_at))).collect(); 147 + let record_by_actor: HashMap<i32, (Vec<u8>, chrono::DateTime<chrono::Utc>, i32)> = 148 + records.into_iter().map(|row| (row.actor_id, (row.cid, row.created_at, row.like_count))).collect(); 147 149 148 150 // Load label definitions 149 151 let defs: Vec<models::LabelerDef> = diesel_async::RunQueryDsl::load( ··· 160 162 defs_by_actor.entry(def.labeler_actor_id).or_default().push(def); 161 163 } 162 164 163 - // Load like counts for these labelers 164 - let like_counts: HashMap<i32, i32> = if !labeler_actor_ids.is_empty() { 165 - #[derive(diesel::QueryableByName)] 166 - struct LikeCountRow { 167 - #[diesel(sql_type = diesel::sql_types::Integer)] 168 - labeler_actor_id: i32, 169 - #[diesel(sql_type = diesel::sql_types::BigInt)] 170 - like_count: i64, 171 - } 172 - 173 - let count_query = "SELECT labeler_actor_id, COUNT(*)::bigint as like_count 174 - FROM labeler_likes 175 - WHERE labeler_actor_id = ANY($1) 176 - GROUP BY labeler_actor_id"; 177 - 178 - diesel_async::RunQueryDsl::load::<LikeCountRow>( 179 - diesel::sql_query(count_query) 180 - .bind::<diesel::sql_types::Array<diesel::sql_types::Integer>, _>(&labeler_actor_ids), 181 - &mut conn, 182 - ) 183 - .await 184 - .unwrap_or_else(|e| { 185 - tracing::error!("labeler like count query failed: {e}"); 186 - vec![] 187 - }) 188 - .into_iter() 189 - .map(|row| (row.labeler_actor_id, row.like_count as i32)) 190 - .collect() 191 - } else { 192 - HashMap::new() 193 - }; 194 - 195 165 // Build result map: DID -> (EnrichedLabeler, Vec<LabelerDef>) 196 166 labelers 197 167 .into_iter() 198 168 .filter_map(|labeler| { 199 169 let actor_id = labeler.actor_id; 200 170 let did = did_by_actor.get(&actor_id)?.clone(); 201 - let (cid, created_at) = record_by_actor.get(&actor_id)?; 171 + let (cid, created_at, like_count) = record_by_actor.get(&actor_id)?; 202 172 let defs = defs_by_actor.remove(&actor_id).unwrap_or_default(); 203 - let like_count = like_counts.get(&actor_id).copied().unwrap_or(0); 204 173 205 174 let enriched = EnrichedLabeler { 206 175 labeler, 207 176 did: did.clone(), 208 177 cid: cid.clone(), 209 178 created_at: *created_at, 210 - like_count, 179 + like_count: *like_count, 211 180 }; 212 181 Some((did, (enriched, defs))) 213 182 })