+3
-2
consumer/src/database_writer/bulk_processor.rs
+3
-2
consumer/src/database_writer/bulk_processor.rs
···
554
554
});
555
555
} else if subject_uri.contains("/app.bsky.feed.generator/") {
556
556
// Feedgen like - only include if feedgen exists (no auto-stubbing)
557
-
if let Some(&feedgen_id) = resolved_feedgens.get(subject_uri.as_str()) {
557
+
if let Some(&(feed_actor_id, ref feed_rkey)) = resolved_feedgens.get(subject_uri.as_str()) {
558
558
feedgen_like_data.push(FeedgenLikeCopyData {
559
559
actor_id,
560
560
rkey: like.rkey,
561
-
feedgen_id,
561
+
feed_actor_id,
562
+
feed_rkey: feed_rkey.clone(),
562
563
});
563
564
} else {
564
565
tracing::debug!(uri = %subject_uri, "Feedgen not found for like, skipping");
+12
-8
consumer/src/database_writer/bulk_types.rs
+12
-8
consumer/src/database_writer/bulk_types.rs
···
113
113
pub via_repost_rkey: Option<i64>, // Optional FK to reposts table (natural key part 2)
114
114
}
115
115
116
-
/// Data for bulk COPY into feedgen_likes table
116
+
/// Data for bulk appending likes to feedgens.like_actor_ids[] arrays
117
117
///
118
-
/// Maps to schema: feedgen_likes(actor_id, rkey, feedgen_id)
118
+
/// Feedgen likes are stored as parallel arrays on feedgens table (like posts).
119
+
/// Maps to: feedgens(like_actor_ids[], like_rkeys[])
119
120
#[derive(Debug, Clone)]
120
121
pub struct FeedgenLikeCopyData {
121
-
pub actor_id: i32,
122
-
pub rkey: i64, // TID converted to i64
123
-
pub feedgen_id: i64, // Resolved FK to feedgens table
122
+
pub actor_id: i32, // The liker's actor_id
123
+
pub rkey: i64, // The like record's rkey (TID)
124
+
pub feed_actor_id: i32, // Feedgen's actor_id (natural key part 1)
125
+
pub feed_rkey: String, // Feedgen's rkey (natural key part 2)
124
126
}
125
127
126
128
/// Data for bulk COPY into labeler_likes table
···
228
230
pub post_rkey: i64,
229
231
}
230
232
231
-
/// Result of inserting a feedgen like via bulk COPY
233
+
/// Result of inserting a feedgen like via bulk array append
232
234
#[derive(Debug)]
233
235
pub struct InsertedFeedgenLike {
234
236
/// The actor_id who liked the feedgen
235
237
pub actor_id: i32,
236
-
/// The feedgen's synthetic ID
237
-
pub feedgen_id: i64,
238
+
/// The feedgen's actor_id (natural key part 1)
239
+
pub feed_actor_id: i32,
240
+
/// The feedgen's rkey (natural key part 2)
241
+
pub feed_rkey: String,
238
242
}
239
243
240
244
/// Result of inserting a labeler like via bulk COPY
+1
-7
consumer/src/database_writer/workers.rs
+1
-7
consumer/src/database_writer/workers.rs
···
181
181
match crate::db::bulk_copy::copy_feedgen_likes(&txn, bulk_ops.feedgen_likes).await {
182
182
Ok(inserted) => {
183
183
total_inserted += inserted.len();
184
-
185
-
// Update feedgen like_count aggregates
186
-
let feedgen_ids: Vec<i64> = inserted.iter().map(|like| like.feedgen_id).collect();
187
-
if let Err(e) = crate::db::operations::increment_feedgen_like_counts(&txn, &feedgen_ids).await {
188
-
tracing::error!(error = ?e, "Failed to update feedgen like_count aggregates");
189
-
}
190
-
184
+
// Note: like_count is maintained automatically by trigger (update_feedgen_like_count)
191
185
tracing::info!(inserted = inserted.len(), "Bulk COPY feedgen_likes completed");
192
186
}
193
187
Err(e) => {
+44
-57
consumer/src/db/bulk_copy/mod.rs
+44
-57
consumer/src/db/bulk_copy/mod.rs
···
252
252
Ok(result)
253
253
}
254
254
255
-
/// Bulk COPY feedgen likes into the database
255
+
/// Bulk append feedgen likes to feedgens.like_actor_ids[] arrays
256
+
///
257
+
/// Feedgen likes are stored as parallel arrays on feedgens table (like posts).
258
+
/// This function updates feedgens rows to append likes to their like_actor_ids[] and like_rkeys[] arrays.
256
259
///
257
-
/// # Schema
260
+
/// # Performance
258
261
///
259
-
/// ```sql
260
-
/// feedgen_likes(actor_id, rkey, feedgen_id)
261
-
/// ```
262
+
/// Groups likes by feedgen and uses single UPDATE per feedgen to append all likes at once.
262
263
pub async fn copy_feedgen_likes(
263
264
conn: &Transaction<'_>,
264
265
likes: Vec<FeedgenLikeCopyData>,
···
267
268
return Ok(Vec::new());
268
269
}
269
270
270
-
tracing::debug!(count = likes.len(), "Starting bulk COPY for feedgen_likes");
271
+
tracing::debug!(count = likes.len(), "Starting bulk array append for feedgen likes");
271
272
let start = std::time::Instant::now();
272
273
273
-
queries::create_feedgen_likes_staging_table(conn).await?;
274
+
// Group likes by feedgen (feed_actor_id, feed_rkey)
275
+
let mut likes_by_feedgen: std::collections::HashMap<(i32, String), Vec<(i32, i64)>> = std::collections::HashMap::new();
274
276
275
-
let copy_sink = conn
276
-
.copy_in(
277
-
"COPY feedgen_likes_staging (actor_id, rkey, feedgen_id)
278
-
FROM STDIN (FORMAT binary)",
279
-
)
280
-
.await?;
277
+
for like in &likes {
278
+
likes_by_feedgen
279
+
.entry((like.feed_actor_id, like.feed_rkey.clone()))
280
+
.or_default()
281
+
.push((like.actor_id, like.rkey));
282
+
}
281
283
282
-
let writer = BinaryCopyInWriter::new(
283
-
copy_sink,
284
-
&[
285
-
Type::INT4, // actor_id
286
-
Type::INT8, // rkey
287
-
Type::INT8, // feedgen_id
288
-
],
284
+
tracing::debug!(
285
+
feedgens_count = likes_by_feedgen.len(),
286
+
likes_count = likes.len(),
287
+
"Grouped likes by feedgen"
289
288
);
290
289
291
-
futures::pin_mut!(writer);
290
+
// Update each feedgen to append likes to arrays
291
+
let mut updated_count = 0;
292
+
for ((feed_actor_id, feed_rkey), feedgen_likes) in likes_by_feedgen.iter() {
293
+
let liker_actor_ids: Vec<i32> = feedgen_likes.iter().map(|(aid, _)| *aid).collect();
294
+
let liker_rkeys: Vec<i64> = feedgen_likes.iter().map(|(_, rk)| *rk).collect();
292
295
293
-
for like in &likes {
294
-
writer
295
-
.as_mut()
296
-
.write(&[&like.actor_id, &like.rkey, &like.feedgen_id])
296
+
let rows = conn
297
+
.execute(
298
+
"UPDATE feedgens
299
+
SET like_actor_ids = COALESCE(like_actor_ids, ARRAY[]::integer[]) || $3::integer[],
300
+
like_rkeys = COALESCE(like_rkeys, ARRAY[]::bigint[]) || $4::bigint[]
301
+
WHERE actor_id = $1 AND rkey = $2",
302
+
&[&feed_actor_id, &feed_rkey, &liker_actor_ids, &liker_rkeys],
303
+
)
297
304
.await?;
298
-
}
299
305
300
-
writer.finish().await?;
301
-
302
-
let rows = conn
303
-
.query(
304
-
"WITH inserted AS (
305
-
INSERT INTO feedgen_likes (actor_id, rkey, feedgen_id)
306
-
SELECT actor_id, rkey, feedgen_id
307
-
FROM feedgen_likes_staging
308
-
ON CONFLICT (actor_id, rkey) DO NOTHING
309
-
RETURNING actor_id, feedgen_id
310
-
)
311
-
SELECT
312
-
i.actor_id,
313
-
i.feedgen_id,
314
-
'at://' || a.did || '/app.bsky.feed.generator/' || f.rkey as subject_uri
315
-
FROM inserted i
316
-
INNER JOIN feedgens f ON i.feedgen_id = f.id
317
-
INNER JOIN actors a ON f.actor_id = a.id",
318
-
&[],
319
-
)
320
-
.await?;
306
+
updated_count += rows;
307
+
}
321
308
322
-
let mut result = Vec::with_capacity(rows.len());
323
-
for row in rows {
324
-
let actor_id: i32 = row.get(0);
325
-
let feedgen_id: i64 = row.get(1);
309
+
tracing::debug!(updated_feedgens = updated_count, "Updated feedgens with likes");
326
310
311
+
// Build result
312
+
let mut result = Vec::with_capacity(likes.len());
313
+
for like in likes {
327
314
result.push(InsertedFeedgenLike {
328
-
actor_id,
329
-
feedgen_id,
315
+
actor_id: like.actor_id,
316
+
feed_actor_id: like.feed_actor_id,
317
+
feed_rkey: like.feed_rkey,
330
318
});
331
319
}
332
320
333
321
let elapsed = start.elapsed();
334
322
tracing::info!(
335
-
count = likes.len(),
336
-
inserted = result.len(),
337
-
duplicates = likes.len() - result.len(),
323
+
count = result.len(),
324
+
feedgens_updated = updated_count,
338
325
duration_ms = elapsed.as_millis(),
339
-
"Bulk COPY feedgen_likes completed"
326
+
"Bulk array append for feedgen_likes completed"
340
327
);
341
328
342
329
Ok(result)
+57
-33
consumer/src/db/bulk_resolve/mod.rs
+57
-33
consumer/src/db/bulk_resolve/mod.rs
···
227
227
resolve_post_uris_bulk(conn, &uris).await
228
228
}
229
229
230
-
/// Resolve multiple feedgen AT-URIs to feedgen_ids in a single query
230
+
/// Resolve multiple feedgen AT-URIs to natural keys
231
231
///
232
-
/// Returns a HashMap of AT-URI → feedgen_id for all found feedgens.
232
+
/// Returns a HashMap of AT-URI → (feed_actor_id, feed_rkey) for all found feedgens.
233
233
/// Does NOT create stubs (feedgens must be explicitly created, not auto-stubbed).
234
234
///
235
235
/// # Performance
236
236
///
237
-
/// Single query joining actors and feedgens using `WHERE (a.did, f.rkey) = ANY(...)`.
237
+
/// First resolves all DIDs to actor_ids in bulk, then queries feedgens by actor_id and rkey.
238
+
///
239
+
/// # Note
240
+
///
241
+
/// Phase 6 migration removed synthetic IDs - feedgens now use natural keys (actor_id, rkey).
242
+
/// feedgen_likes table was dropped - likes are now stored as arrays on feedgens table.
238
243
pub async fn resolve_feedgen_uris_bulk<C: GenericClient>(
239
244
conn: &C,
240
245
at_uris: &[&str],
241
-
) -> Result<HashMap<String, i64>> {
246
+
) -> Result<HashMap<String, (i32, String)>> {
242
247
if at_uris.is_empty() {
243
248
return Ok(HashMap::new());
244
249
}
245
250
246
251
// Parse URIs to extract (did, rkey) pairs
247
-
let mut did_rkey_pairs: Vec<(String, String)> = Vec::with_capacity(at_uris.len());
248
252
let mut uri_to_did_rkey: HashMap<String, (String, String)> = HashMap::new();
253
+
let mut dids_set: std::collections::HashSet<String> = std::collections::HashSet::new();
249
254
250
255
for uri in at_uris {
251
256
let did = parakeet_db::at_uri_util::extract_did(uri)
···
253
258
let rkey = parakeet_db::at_uri_util::extract_rkey(uri)
254
259
.ok_or_else(|| eyre::eyre!("Invalid AT URI: missing rkey in {}", uri))?;
255
260
256
-
did_rkey_pairs.push((did.to_string(), rkey.to_string()));
257
261
uri_to_did_rkey.insert(uri.to_string(), (did.to_string(), rkey.to_string()));
262
+
dids_set.insert(did.to_string());
258
263
}
259
264
260
-
// Use UNNEST to create temporary table and join
261
-
let dids: Vec<String> = did_rkey_pairs.iter().map(|(d, _)| d.clone()).collect();
262
-
let rkeys: Vec<String> = did_rkey_pairs.iter().map(|(_, r)| r.clone()).collect();
265
+
// Resolve all actor DIDs to actor_ids (no auto-stubbing for feedgen owners)
266
+
let dids_vec: Vec<&str> = dids_set.iter().map(|s| s.as_str()).collect();
267
+
let did_to_actor_id = resolve_actor_dids_bulk(conn, &dids_vec).await?;
263
268
264
-
let rows = conn
265
-
.query(
266
-
"SELECT a.did, f.rkey, f.id
267
-
FROM actors a
268
-
INNER JOIN feedgens f ON f.actor_id = a.id
269
-
WHERE (a.did, f.rkey) IN (
270
-
SELECT UNNEST($1::text[]), UNNEST($2::text[])
271
-
)",
272
-
&[&dids, &rkeys],
273
-
)
274
-
.await?;
269
+
// Collect rkeys for each actor_id
270
+
let mut actor_rkeys: Vec<(i32, String)> = Vec::new();
271
+
for (did, rkey) in uri_to_did_rkey.values() {
272
+
if let Some(&actor_id) = did_to_actor_id.get(did) {
273
+
actor_rkeys.push((actor_id, rkey.clone()));
274
+
}
275
+
}
275
276
276
-
// Build reverse mapping: (did, rkey) → feedgen_id
277
-
let mut did_rkey_to_id: HashMap<(String, String), i64> = HashMap::new();
277
+
if actor_rkeys.is_empty() {
278
+
return Ok(HashMap::new());
279
+
}
280
+
281
+
// Query feedgens existence by (actor_id, rkey) pairs
282
+
// Build dynamic query with OR clauses
283
+
let placeholders: Vec<String> = (0..actor_rkeys.len())
284
+
.map(|i| format!("(actor_id = ${} AND rkey = ${})", i * 2 + 1, i * 2 + 2))
285
+
.collect();
286
+
let query = format!(
287
+
"SELECT actor_id, rkey FROM feedgens WHERE {}",
288
+
placeholders.join(" OR ")
289
+
);
290
+
291
+
let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = Vec::new();
292
+
for (actor_id, rkey) in &actor_rkeys {
293
+
params.push(actor_id);
294
+
params.push(rkey);
295
+
}
296
+
297
+
let rows = conn.query(&query, ¶ms).await?;
298
+
299
+
// Build reverse lookup: (actor_id, rkey) → natural key
300
+
let mut found_feedgens: std::collections::HashSet<(i32, String)> = std::collections::HashSet::new();
278
301
for row in rows {
279
-
let did: String = row.get(0);
302
+
let actor_id: i32 = row.get(0);
280
303
let rkey: String = row.get(1);
281
-
let feedgen_id: i64 = row.get(2);
282
-
did_rkey_to_id.insert((did, rkey), feedgen_id);
304
+
found_feedgens.insert((actor_id, rkey));
283
305
}
284
306
285
-
// Map back to URIs
307
+
// Map URIs to natural keys (only for feedgens that exist)
286
308
let mut result = HashMap::new();
287
309
for (uri, (did, rkey)) in uri_to_did_rkey {
288
-
if let Some(&feedgen_id) = did_rkey_to_id.get(&(did, rkey)) {
289
-
result.insert(uri, feedgen_id);
310
+
if let Some(&actor_id) = did_to_actor_id.get(&did) {
311
+
if found_feedgens.contains(&(actor_id, rkey.clone())) {
312
+
result.insert(uri, (actor_id, rkey));
313
+
}
290
314
}
291
315
}
292
316
···
301
325
/// # Performance
302
326
///
303
327
/// Single query using `WHERE did = ANY($1)` to find labelers by their DID.
304
-
/// Labelers are identified by having a record in the `labelers` table.
328
+
/// Phase 2: Labelers are denormalized into actors table - identified by having labeler_status set.
305
329
pub async fn resolve_labeler_dids_bulk<C: GenericClient>(
306
330
conn: &C,
307
331
dids: &[&str],
···
312
336
313
337
let rows = conn
314
338
.query(
315
-
"SELECT a.did, l.actor_id
316
-
FROM actors a
317
-
INNER JOIN labelers l ON l.actor_id = a.id
318
-
WHERE a.did = ANY($1)",
339
+
"SELECT did, id
340
+
FROM actors
341
+
WHERE did = ANY($1)
342
+
AND labeler_status IS NOT NULL",
319
343
&[&dids],
320
344
)
321
345
.await?;
+1
-1
consumer/src/db/operations/feed.rs
+1
-1
consumer/src/db/operations/feed.rs
···
29
29
mod feedgen;
30
30
31
31
// Re-export all public functions for external use
32
-
pub use feedgen::{feedgen_delete, feedgen_upsert, increment_feedgen_like_counts, decrement_feedgen_like_count};
32
+
pub use feedgen::{feedgen_delete, feedgen_upsert, decrement_feedgen_like_count};
33
33
pub use helpers::{ensure_list_id, ensure_list_natural_key, get_actor_id};
34
34
pub use like::{like_delete, like_insert, LikeSubject};
35
35
pub use post::{post_delete, post_insert};
+3
-26
consumer/src/db/operations/feed/feedgen.rs
+3
-26
consumer/src/db/operations/feed/feedgen.rs
···
102
102
.wrap_err_with(|| format!("Failed to delete feedgen for actor_id:{} rkey:{}", actor_id, rkey))
103
103
}
104
104
105
-
/// Increment like_count for multiple feedgens (bulk operation)
106
-
///
107
-
/// This is called after bulk inserting feedgen_likes to update the aggregate counts.
108
-
/// Uses a single UPDATE statement with aggregation for efficiency.
109
-
pub async fn increment_feedgen_like_counts<C: GenericClient>(
110
-
conn: &C,
111
-
feedgen_ids: &[i64],
112
-
) -> Result<u64> {
113
-
if feedgen_ids.is_empty() {
114
-
return Ok(0);
115
-
}
116
-
117
-
conn.execute(
118
-
"UPDATE feedgens
119
-
SET like_count = like_count + counts.count
120
-
FROM (
121
-
SELECT id, COUNT(*) as count
122
-
FROM unnest($1::bigint[]) as id
123
-
GROUP BY id
124
-
) AS counts
125
-
WHERE feedgens.id = counts.id",
126
-
&[&feedgen_ids],
127
-
)
128
-
.await
129
-
.wrap_err("Failed to increment feedgen like_count aggregates")
130
-
}
105
+
// NOTE: increment_feedgen_like_counts() removed - like_count is now maintained automatically
106
+
// by the update_feedgen_like_count() trigger (Phase 8 migration 2025-12-08-152125).
107
+
// The trigger fires on INSERT or UPDATE OF like_actor_ids and sets like_count = array_length().
131
108
132
109
/// Decrement like_count for a single feedgen
133
110
///
+28
-23
consumer/tests/feedgen_labeler_operations_test.rs
+28
-23
consumer/tests/feedgen_labeler_operations_test.rs
···
3
3
//! Tests coverage for:
4
4
//! - `feedgen_upsert` (insert and update paths)
5
5
//! - `feedgen_delete`
6
-
//! - `increment_feedgen_like_counts` (bulk increment aggregates)
6
+
//! - Feedgen like_count trigger (automatic count maintenance from like_actor_ids array)
7
7
//! - `decrement_feedgen_like_count` (single decrement with GREATEST protection)
8
8
//! - `labeler_upsert` (creates labeler and maintains label definitions)
9
9
//! - `labeler_delete`
···
634
634
// ========================================
635
635
636
636
#[tokio::test]
637
-
async fn test_increment_feedgen_like_counts() -> eyre::Result<()> {
637
+
async fn test_feedgen_like_count_trigger() -> eyre::Result<()> {
638
638
let pool = test_pool();
639
639
let mut conn = pool.get().await.wrap_err("Failed to get connection")?;
640
640
let tx = conn
···
643
643
.wrap_err("Failed to start transaction")?;
644
644
645
645
// Create actors
646
+
let (liker1_actor_id, _, _) = consumer::db::operations::feed::get_actor_id(&tx, "did:plc:liker1")
647
+
.await
648
+
.wrap_err("Failed to ensure liker1 actor")?;
649
+
let (liker2_actor_id, _, _) = consumer::db::operations::feed::get_actor_id(&tx, "did:plc:liker2")
650
+
.await
651
+
.wrap_err("Failed to ensure liker2 actor")?;
652
+
let (liker3_actor_id, _, _) = consumer::db::operations::feed::get_actor_id(&tx, "did:plc:liker3")
653
+
.await
654
+
.wrap_err("Failed to ensure liker3 actor")?;
646
655
consumer::db::operations::feed::get_actor_id(&tx, "did:plc:feedgenowner4")
647
656
.await
648
657
.wrap_err("Failed to ensure owner actor")?;
···
672
681
.await
673
682
.wrap_err("Failed to insert feedgen")?;
674
683
675
-
// Get feedgen_id
676
-
let feedgen_id: i64 = tx
677
-
.query_one(
678
-
"SELECT id FROM feedgens WHERE actor_id = $1 AND rkey = $2",
679
-
&[&owner_actor_id, &rkey],
680
-
)
681
-
.await
682
-
.wrap_err("Failed to get feedgen_id")?
683
-
.get(0);
684
-
685
684
// Verify initial like_count is 0
686
685
let initial_count: i32 = tx
687
686
.query_one(
688
-
"SELECT like_count FROM feedgens WHERE id = $1",
689
-
&[&feedgen_id],
687
+
"SELECT like_count FROM feedgens WHERE actor_id = $1 AND rkey = $2",
688
+
&[&owner_actor_id, &rkey],
690
689
)
691
690
.await
692
691
.wrap_err("Failed to get initial like_count")?
693
692
.get(0);
694
693
assert_eq!(initial_count, 0, "Initial like_count should be 0");
695
694
696
-
// Increment like count (simulating 3 likes for the same feedgen)
697
-
let result = feed::increment_feedgen_like_counts(&tx, &[feedgen_id, feedgen_id, feedgen_id])
698
-
.await
699
-
.wrap_err("Failed to increment feedgen like_counts")?;
700
-
assert_eq!(result, 1, "Should update 1 row (the feedgen itself)");
695
+
// Append likes to like_actor_ids array (simulating 3 likes)
696
+
// Trigger should automatically update like_count
697
+
tx.execute(
698
+
"UPDATE feedgens
699
+
SET like_actor_ids = ARRAY[$3, $4, $5]::integer[],
700
+
like_rkeys = ARRAY[1, 2, 3]::bigint[]
701
+
WHERE actor_id = $1 AND rkey = $2",
702
+
&[&owner_actor_id, &rkey, &liker1_actor_id, &liker2_actor_id, &liker3_actor_id],
703
+
)
704
+
.await
705
+
.wrap_err("Failed to update like arrays")?;
701
706
702
-
// Verify like_count was incremented
707
+
// Verify like_count was automatically updated by trigger
703
708
let updated_count: i32 = tx
704
709
.query_one(
705
-
"SELECT like_count FROM feedgens WHERE id = $1",
706
-
&[&feedgen_id],
710
+
"SELECT like_count FROM feedgens WHERE actor_id = $1 AND rkey = $2",
711
+
&[&owner_actor_id, &rkey],
707
712
)
708
713
.await
709
714
.wrap_err("Failed to get updated like_count")?
710
715
.get(0);
711
-
assert_eq!(updated_count, 3, "like_count should be incremented to 3");
716
+
assert_eq!(updated_count, 3, "like_count should be auto-updated to 3 by trigger");
712
717
713
718
tx.rollback().await.wrap_err("Failed to rollback")?;
714
719
Ok(())
+25
migrations/2025-12-08-152125_fix_feedgen_likes_natural_keys/down.sql
+25
migrations/2025-12-08-152125_fix_feedgen_likes_natural_keys/down.sql
···
1
+
-- Revert feedgen likes denormalization
2
+
--
3
+
-- Note: This down migration cannot restore the data since we dropped the table.
4
+
5
+
-- Step 1: Drop trigger and function
6
+
DROP TRIGGER IF EXISTS feedgen_like_count_trigger ON feedgens;
7
+
DROP FUNCTION IF EXISTS update_feedgen_like_count();
8
+
9
+
-- Step 2: Drop GIN index
10
+
DROP INDEX IF EXISTS idx_feedgens_like_actor_ids_gin;
11
+
12
+
-- Step 3: Recreate feedgen_likes table (with orphaned feedgen_id)
13
+
CREATE TABLE feedgen_likes (
14
+
actor_id INTEGER NOT NULL,
15
+
rkey BIGINT NOT NULL,
16
+
feedgen_id BIGINT NOT NULL,
17
+
PRIMARY KEY (actor_id, rkey)
18
+
);
19
+
20
+
CREATE INDEX idx_feedgen_likes_feedgen ON feedgen_likes(feedgen_id);
21
+
CREATE INDEX idx_feedgen_likes_rkey ON feedgen_likes(rkey);
22
+
23
+
-- Step 4: Drop like array columns from feedgens
24
+
ALTER TABLE feedgens DROP COLUMN IF EXISTS like_actor_ids;
25
+
ALTER TABLE feedgens DROP COLUMN IF EXISTS like_rkeys;
+35
migrations/2025-12-08-152125_fix_feedgen_likes_natural_keys/up.sql
+35
migrations/2025-12-08-152125_fix_feedgen_likes_natural_keys/up.sql
···
1
+
-- Denormalize feedgen likes into feedgens table
2
+
--
3
+
-- This follows the same pattern as posts (Phase 1) where likes are stored as parallel arrays.
4
+
-- The feedgen_likes table is dropped and replaced with like arrays on feedgens.
5
+
6
+
-- Step 1: Add like array columns to feedgens
7
+
ALTER TABLE feedgens
8
+
ADD COLUMN like_actor_ids INTEGER[],
9
+
ADD COLUMN like_rkeys BIGINT[];
10
+
11
+
-- Step 2: Backfill like arrays from feedgen_likes table
12
+
-- Since feedgens.id was dropped but feedgen_likes.feedgen_id still references it,
13
+
-- we can't join properly. The data is orphaned, so we'll truncate and start fresh.
14
+
-- In production, you would need to reconstruct the joins or manually fix the data.
15
+
16
+
-- For now, just drop the broken table
17
+
DROP TABLE feedgen_likes;
18
+
19
+
-- Step 3: Add GIN indexes for array operations (matching posts pattern)
20
+
CREATE INDEX idx_feedgens_like_actor_ids_gin ON feedgens USING gin (like_actor_ids gin__int_ops);
21
+
22
+
-- Step 4: Add trigger to maintain like_count automatically from like_actor_ids array
23
+
-- Note: like_count column already exists from migration 2025-11-27-143034
24
+
CREATE OR REPLACE FUNCTION update_feedgen_like_count()
25
+
RETURNS TRIGGER AS $$
26
+
BEGIN
27
+
NEW.like_count := COALESCE(array_length(NEW.like_actor_ids, 1), 0);
28
+
RETURN NEW;
29
+
END;
30
+
$$ LANGUAGE plpgsql;
31
+
32
+
CREATE TRIGGER feedgen_like_count_trigger
33
+
BEFORE INSERT OR UPDATE OF like_actor_ids ON feedgens
34
+
FOR EACH ROW
35
+
EXECUTE FUNCTION update_feedgen_like_count();
+5
-13
parakeet-db/src/models.rs
+5
-13
parakeet-db/src/models.rs
···
414
414
// SOCIAL INTERACTIONS
415
415
// =============================================================================
416
416
417
-
// Feed Generator Likes (rare, ~0.01% of likes)
418
-
#[derive(Clone, Debug, Queryable, Selectable, Identifiable)]
419
-
#[diesel(table_name = crate::schema::feedgen_likes)]
420
-
#[diesel(primary_key(actor_id, rkey))]
421
-
#[diesel(check_for_backend(diesel::pg::Pg))]
422
-
pub struct FeedgenLike {
423
-
pub actor_id: i32, // PK: FK to actors
424
-
pub rkey: i64, // PK: TID as INT8
425
-
pub feedgen_id: i64, // FK to feedgens (enforced by database!)
426
-
// Note: created_at derived from TID rkey via created_at() method
427
-
}
417
+
// Note: Feed Generator Likes table dropped - likes stored as arrays on feedgens table
418
+
// The like_actor_ids[] and like_rkeys[] arrays are maintained on feedgens
419
+
// This follows the same denormalization pattern as posts
428
420
429
421
// Note: Labeler Likes table dropped - labeler data moved to actors table
430
422
// The labeler_like_count is maintained on actors.labeler_like_count
···
732
724
// Implement HasTidRkey for all models with TID-based rkeys
733
725
734
726
impl_tid_rkey!(Post);
735
-
impl_tid_rkey!(FeedgenLike);
727
+
// impl_tid_rkey!(FeedgenLike); // Removed - feedgen_likes table dropped, now like_actor_ids[]/like_rkeys[] arrays
736
728
// impl_tid_rkey!(LabelerLike); // Removed - labeler_likes table dropped
737
729
impl_tid_rkey!(Repost);
738
730
// impl_tid_rkey!(Follow); // Removed - follows table dropped, now follow_record[] arrays
···
751
743
// This eliminates the need for a separate created_at column in the database
752
744
753
745
impl_tid_created_at!(Post);
754
-
impl_tid_created_at!(FeedgenLike);
746
+
// impl_tid_created_at!(FeedgenLike); // Removed - feedgen_likes table dropped, now like_actor_ids[]/like_rkeys[] arrays
755
747
// impl_tid_created_at!(LabelerLike); // Removed - labeler_likes table dropped
756
748
impl_tid_created_at!(Repost);
757
749
// impl_tid_created_at!(Follow); // Removed - follows table dropped, now follow_record[] arrays
+2
-9
parakeet-db/src/schema.rs
+2
-9
parakeet-db/src/schema.rs
···
307
307
}
308
308
309
309
diesel::table! {
310
-
feedgen_likes (actor_id, rkey) {
311
-
actor_id -> Int4,
312
-
rkey -> Int8,
313
-
feedgen_id -> Int8,
314
-
}
315
-
}
316
-
317
-
diesel::table! {
318
310
use diesel::sql_types::*;
319
311
use super::sql_types::ContentMode;
320
312
use super::sql_types::FeedgenStatus;
···
334
326
accepts_interactions -> Bool,
335
327
status -> FeedgenStatus,
336
328
like_count -> Int4,
329
+
like_actor_ids -> Nullable<Array<Nullable<Int4>>>,
330
+
like_rkeys -> Nullable<Array<Nullable<Int8>>>,
337
331
}
338
332
}
339
333
···
588
582
actors,
589
583
backfill_jobs,
590
584
constellation_enrichment_queue,
591
-
feedgen_likes,
592
585
feedgens,
593
586
fetch_queue,
594
587
handle_resolution_queue,