+69
-131
consumer/src/database_writer/operations/executor.rs
+69
-131
consumer/src/database_writer/operations/executor.rs
···
12
12
//! Operations return aggregate deltas ONLY when they actually affect rows in the database.
13
13
//! This makes stats idempotent - duplicate events won't double-count.
14
14
15
-
use super::{cache, ActorStatsDeltas, DatabaseOperation, PostStatsDeltas};
15
+
use super::{cache, ActorStatsDeltas, DatabaseOperation};
16
16
use eyre::WrapErr as _;
17
17
use ipld_core::cid::Cid;
18
-
19
-
/// Helper to create a single post stats delta
20
-
fn like_delta(actor_id: i32, rkey: i64, delta: i32) -> Vec<((i32, i64), PostStatsDeltas)> {
21
-
vec![((actor_id, rkey), PostStatsDeltas { like_delta: delta, ..Default::default() })]
22
-
}
23
-
24
-
/// Helper to create repost delta
25
-
fn repost_delta(actor_id: i32, rkey: i64, delta: i32) -> Vec<((i32, i64), PostStatsDeltas)> {
26
-
vec![((actor_id, rkey), PostStatsDeltas { repost_delta: delta, ..Default::default() })]
27
-
}
28
-
29
-
/// Helper to create reply delta
30
-
fn reply_delta(actor_id: i32, rkey: i64, delta: i32) -> Vec<((i32, i64), PostStatsDeltas)> {
31
-
vec![((actor_id, rkey), PostStatsDeltas { reply_delta: delta, ..Default::default() })]
32
-
}
33
-
34
-
/// Helper to create quote delta
35
-
fn quote_delta(actor_id: i32, rkey: i64, delta: i32) -> Vec<((i32, i64), PostStatsDeltas)> {
36
-
vec![((actor_id, rkey), PostStatsDeltas { quote_delta: delta, ..Default::default() })]
37
-
}
38
18
39
19
// Actor stats delta helpers for denormalized stats in actors table
40
20
···
240
220
if let Some(parent_uri) = parent_uri.as_ref() {
241
221
// Resolve parent URI to natural key
242
222
if let Ok(Some((parent_actor_id, parent_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, parent_uri).await {
243
-
deltas.extend(reply_delta(parent_actor_id, parent_rkey, 1));
244
-
// Invalidate parent post cache since reply_count changed
223
+
// Append to parent's reply arrays (array-only tracking)
224
+
let _ = conn.execute(
225
+
"UPDATE posts
226
+
SET reply_actor_ids = COALESCE(reply_actor_ids, ARRAY[]::integer[]) || $3,
227
+
reply_rkeys = COALESCE(reply_rkeys, ARRAY[]::bigint[]) || $4
228
+
WHERE (actor_id, rkey) = ($1, $2)
229
+
AND NOT ($3 = ANY(COALESCE(reply_actor_ids, ARRAY[])))",
230
+
&[&parent_actor_id, &parent_rkey, &actor_id, &rkey],
231
+
).await;
232
+
// Invalidate parent post cache since reply array changed
245
233
cache_invalidations.push(cache::invalidate_post(parent_uri));
246
234
}
247
235
}
248
236
249
-
// If this quotes/embeds a post, increment that post's quote count
237
+
// If this quotes/embeds a post, append to that post's quote arrays
250
238
if let Some(quoted_uri) = embed_uri.as_ref() {
251
239
// Resolve quoted URI to natural key
252
240
if let Ok(Some((quoted_actor_id, quoted_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, quoted_uri).await {
253
-
deltas.extend(quote_delta(quoted_actor_id, quoted_rkey, 1));
254
-
// Invalidate quoted post cache since quote_count changed
241
+
// Append to quoted post's quote arrays (array-only tracking)
242
+
let _ = conn.execute(
243
+
"UPDATE posts
244
+
SET quote_actor_ids = COALESCE(quote_actor_ids, ARRAY[]::integer[]) || $3,
245
+
quote_rkeys = COALESCE(quote_rkeys, ARRAY[]::bigint[]) || $4
246
+
WHERE (actor_id, rkey) = ($1, $2)
247
+
AND NOT ($3 = ANY(COALESCE(quote_actor_ids, ARRAY[])))",
248
+
&["ed_actor_id, "ed_rkey, &actor_id, &rkey],
249
+
).await;
250
+
// Invalidate quoted post cache since quote array changed
255
251
cache_invalidations.push(cache::invalidate_post(quoted_uri));
256
252
}
257
253
}
···
286
282
}
287
283
288
284
if rows > 0 {
289
-
// Only send delta if we actually inserted a like (not a duplicate)
290
-
// Resolve subject URI to natural key
291
-
let deltas = if let Ok(Some((subject_actor_id, subject_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, &subject_uri).await {
292
-
like_delta(subject_actor_id, subject_rkey, 1)
293
-
} else {
294
-
vec![]
295
-
};
285
+
// Note: like arrays are updated directly in db::like_insert via LikeArrayOp
286
+
// No deltas needed with array-only tracking
296
287
297
-
// Invalidate post cache since like_count changed
288
+
// Invalidate post cache since like array changed
298
289
let cache_invalidations = vec![cache::invalidate_post(&subject_uri)];
299
290
300
-
Ok((deltas, vec![], cache_invalidations))
291
+
Ok((vec![], vec![], cache_invalidations))
301
292
} else {
302
293
Ok((vec![], vec![], vec![]))
303
294
}
···
328
319
}
329
320
330
321
if rows > 0 {
331
-
// Only send delta if we actually inserted a repost
332
-
// Resolve subject URI to natural key
333
-
let deltas = if let Ok(Some((subject_actor_id, subject_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, &subject_uri).await {
334
-
repost_delta(subject_actor_id, subject_rkey, 1)
335
-
} else {
336
-
vec![]
337
-
};
322
+
// Note: repost arrays will be updated in db::repost_insert (dual-write to reposts table + post arrays)
323
+
// No deltas needed with array-only tracking
338
324
339
-
// Invalidate post cache since repost_count changed
325
+
// Invalidate post cache since repost array changed
340
326
let cache_invalidations = vec![cache::invalidate_post(&subject_uri)];
341
327
342
-
Ok((deltas, vec![], cache_invalidations))
328
+
Ok((vec![], vec![], cache_invalidations))
343
329
} else {
344
330
Ok((vec![], vec![], vec![]))
345
331
}
···
884
870
if let Some(subject) = db::like_delete(conn, rkey, actor_id).await? {
885
871
match subject {
886
872
db::LikeSubject::Post { actor_id: post_actor_id, rkey: post_rkey } => {
887
-
// Decrement post like_count
888
-
deltas.extend(like_delta(post_actor_id, post_rkey, -1));
873
+
// Note: like arrays are updated directly in db::like_delete via array_remove()
874
+
// No deltas needed with array-only tracking
889
875
890
876
// Invalidate post cache
891
877
if let Ok(post_did) = db::actor_did_from_id(conn, post_actor_id).await {
···
936
922
CollectionType::BskyFeedRepost => {
937
923
// Returns post URI if deleted
938
924
if let Some(post_uri) = db::repost_delete(conn, rkey, actor_id).await? {
939
-
// Resolve post URI to natural key for delta
940
-
if let Ok(Some((post_actor_id, post_rkey))) = crate::db::id_resolution::resolve_post_uri(conn, &post_uri).await {
941
-
deltas.extend(repost_delta(post_actor_id, post_rkey, -1));
942
-
}
925
+
// Note: repost arrays will be updated in db::repost_delete (remove from post arrays)
926
+
// No deltas needed with array-only tracking
943
927
944
-
// Invalidate post cache since repost_count changed
928
+
// Invalidate post cache since repost array changed
945
929
cache_invalidations.push(cache::invalidate_post(&post_uri));
946
930
947
931
// Remove PostgreSQL notification (recipient = author of reposted post)
···
1008
992
// Decrement actor's post count
1009
993
actor_deltas.extend(actor_post_delta(actor_id, -1));
1010
994
1011
-
// Decrement reply count for parent if this was a reply
995
+
// Remove from parent's reply arrays if this was a reply
1012
996
if let Some((parent_actor_id, parent_rkey)) = parent_key {
1013
-
deltas.extend(reply_delta(parent_actor_id, parent_rkey, -1));
997
+
// Remove from reply arrays using array_remove()
998
+
let _ = conn.execute(
999
+
"UPDATE posts
1000
+
SET reply_actor_ids = array_remove(reply_actor_ids, $3),
1001
+
reply_rkeys = array_remove(reply_rkeys, $4)
1002
+
WHERE (actor_id, rkey) = ($1, $2)",
1003
+
&[&parent_actor_id, &parent_rkey, &actor_id, &rkey],
1004
+
).await;
1014
1005
1015
-
// Invalidate parent post cache since reply_count changed
1006
+
// Invalidate parent post cache since reply array changed
1016
1007
// Construct URI using actor cache
1017
1008
if let Ok(parent_did) = db::actor_did_from_id(conn, parent_actor_id).await {
1018
1009
let parent_uri = format!("at://{}/app.bsky.feed.post/{}",
···
1022
1013
}
1023
1014
}
1024
1015
1025
-
// Decrement quote count if this quoted a record
1016
+
// Remove from embedded post's quote arrays if this quoted a record
1026
1017
if let Some((embed_actor_id, embed_rkey)) = embedded_key {
1027
-
deltas.extend(quote_delta(embed_actor_id, embed_rkey, -1));
1018
+
// Remove from quote arrays using array_remove()
1019
+
let _ = conn.execute(
1020
+
"UPDATE posts
1021
+
SET quote_actor_ids = array_remove(quote_actor_ids, $3),
1022
+
quote_rkeys = array_remove(quote_rkeys, $4)
1023
+
WHERE (actor_id, rkey) = ($1, $2)",
1024
+
&[&embed_actor_id, &embed_rkey, &actor_id, &rkey],
1025
+
).await;
1028
1026
1029
-
// Invalidate quoted post cache since quote_count changed
1027
+
// Invalidate quoted post cache since quote array changed
1030
1028
// Construct URI using actor cache
1031
1029
if let Ok(embed_did) = db::actor_did_from_id(conn, embed_actor_id).await {
1032
1030
let embed_uri = format!("at://{}/app.bsky.feed.post/{}",
···
1171
1169
Ok((vec![], vec![], vec![]))
1172
1170
}
1173
1171
DatabaseOperation::EnsureMinimumPostStats {
1174
-
post_uri,
1175
-
min_likes,
1176
-
min_reposts,
1177
-
min_quotes,
1178
-
min_replies,
1172
+
post_uri: _,
1173
+
min_likes: _,
1174
+
min_reposts: _,
1175
+
min_quotes: _,
1176
+
min_replies: _,
1179
1177
} => {
1180
-
// Resolve post_uri to post natural key
1181
-
let (post_actor_id, post_rkey) = match crate::db::id_resolution::resolve_uri_to_id(conn, &post_uri).await {
1182
-
Ok(crate::db::id_resolution::ResolvedId::Post(actor_id, rkey)) => (actor_id, rkey),
1183
-
Ok(crate::db::id_resolution::ResolvedId::Actor(_)) => {
1184
-
tracing::error!("EnsureMinimumPostStats received actor DID instead of post URI: {}", post_uri);
1185
-
return Ok((vec![], vec![], vec![]));
1186
-
}
1187
-
Err(e) => {
1188
-
tracing::debug!("Post not found for EnsureMinimumPostStats (may not exist yet): {}: {}", post_uri, e);
1189
-
return Ok((vec![], vec![], vec![]));
1190
-
}
1191
-
};
1192
-
1193
-
// Get current stats from denormalized columns
1194
-
let current_stats = conn.query_one(
1195
-
"SELECT like_count, reply_count, repost_count, quote_count FROM posts WHERE actor_id = $1 AND rkey = $2",
1196
-
&[&post_actor_id, &post_rkey],
1197
-
).await;
1198
-
1199
-
let (current_likes, current_replies, current_reposts, current_quotes) = match current_stats {
1200
-
Ok(row) => {
1201
-
let likes: Option<i16> = row.get(0);
1202
-
let replies: Option<i16> = row.get(1);
1203
-
let reposts: Option<i16> = row.get(2);
1204
-
let quotes: Option<i16> = row.get(3);
1205
-
(
1206
-
likes.unwrap_or(0) as u64,
1207
-
replies.unwrap_or(0) as u64,
1208
-
reposts.unwrap_or(0) as u64,
1209
-
quotes.unwrap_or(0) as u64,
1210
-
)
1211
-
}
1212
-
Err(e) => {
1213
-
tracing::warn!("Failed to get post stats from database for {}: {}", post_uri, e);
1214
-
return Ok((vec![], vec![], vec![]));
1215
-
}
1216
-
};
1217
-
1218
-
// Calculate deltas needed to reach minimums and build PostStatsDeltas
1219
-
let mut delta_likes = 0;
1220
-
let mut delta_replies = 0;
1221
-
let mut delta_reposts = 0;
1222
-
let mut delta_quotes = 0;
1223
-
1224
-
if min_likes > current_likes {
1225
-
delta_likes = (min_likes - current_likes) as i32;
1226
-
}
1227
-
if min_replies > current_replies {
1228
-
delta_replies = (min_replies - current_replies) as i32;
1229
-
}
1230
-
if min_reposts > current_reposts {
1231
-
delta_reposts = (min_reposts - current_reposts) as i32;
1232
-
}
1233
-
if min_quotes > current_quotes {
1234
-
delta_quotes = (min_quotes - current_quotes) as i32;
1235
-
}
1236
-
1237
-
// Return deltas if any are non-zero
1238
-
if delta_likes != 0 || delta_replies != 0 || delta_reposts != 0 || delta_quotes != 0 {
1239
-
let deltas = vec![((post_actor_id, post_rkey), PostStatsDeltas {
1240
-
like_delta: delta_likes,
1241
-
reply_delta: delta_replies,
1242
-
repost_delta: delta_reposts,
1243
-
quote_delta: delta_quotes,
1244
-
})];
1245
-
Ok((deltas, vec![], vec![]))
1246
-
} else {
1247
-
Ok((vec![], vec![], vec![]))
1248
-
}
1178
+
// DEPRECATED: EnsureMinimumPostStats is incompatible with array-only tracking.
1179
+
// With arrays, we can't fabricate engagement entries to reach minimum counts
1180
+
// (we'd need actual actor IDs and rkeys, not just synthetic counts).
1181
+
// The Constellation enrichment feature that used this may need to be redesigned
1182
+
// or removed entirely.
1183
+
//
1184
+
// For now, this operation is a no-op to avoid breaking callers.
1185
+
// TODO: Remove this operation entirely once callers are updated.
1186
+
Ok((vec![], vec![], vec![]))
1249
1187
}
1250
1188
}
1251
1189
}
+4
-6
consumer/src/database_writer/operations/types.rs
+4
-6
consumer/src/database_writer/operations/types.rs
···
252
252
},
253
253
}
254
254
255
-
/// Post stats deltas for batch updating denormalized stats columns
256
-
/// Accumulates deltas per post (actor_id, rkey) during operation processing
255
+
/// Post stats deltas (DEPRECATED - now using array-only tracking)
256
+
/// This struct is kept for backwards compatibility during migration but is no longer used.
257
+
/// All functions should return empty HashMaps/Vecs for this type.
257
258
#[derive(Default, Debug, Clone)]
258
259
pub struct PostStatsDeltas {
259
-
pub like_delta: i32,
260
-
pub reply_delta: i32,
261
-
pub repost_delta: i32,
262
-
pub quote_delta: i32,
260
+
// All fields removed - engagement is now tracked via arrays only
263
261
}
264
262
265
263
/// Actor stats deltas for batch updating denormalized stats columns in actors table
+17
-144
consumer/src/database_writer/workers.rs
+17
-144
consumer/src/database_writer/workers.rs
···
72
72
&pds_cache,
73
73
).await {
74
74
Ok((stats_deltas, actor_stats_deltas)) => {
75
-
// Batch update post stats
76
-
if !stats_deltas.is_empty() {
77
-
if let Err(e) = batch_update_post_stats(&mut conn_mut, stats_deltas).await {
78
-
tracing::error!(worker = name, error = ?e, repo = %repo, "Failed to batch update post stats after bulk COPY");
79
-
counter!("bulk_writer_stats_update_error").increment(1);
80
-
}
81
-
}
75
+
// Note: Post stats (likes, replies, quotes, reposts) are now tracked via arrays,
76
+
// updated directly during operation processing - no batch updates needed
77
+
82
78
// Batch update actor stats
83
79
if !actor_stats_deltas.is_empty() {
84
80
if let Err(e) = batch_update_actor_stats(&mut conn_mut, actor_stats_deltas).await {
···
148
144
let mut stats_deltas: HashMap<(i32, i64), super::PostStatsDeltas> = HashMap::new();
149
145
let mut actor_stats_deltas: HashMap<i32, super::ActorStatsDeltas> = HashMap::new();
150
146
151
-
// Helper to accumulate deltas
152
-
let mut accumulate_delta = |actor_id: i32, rkey: i64, delta: super::PostStatsDeltas| {
153
-
stats_deltas.entry((actor_id, rkey))
154
-
.and_modify(|existing| {
155
-
existing.like_delta += delta.like_delta;
156
-
existing.reply_delta += delta.reply_delta;
157
-
existing.repost_delta += delta.repost_delta;
158
-
existing.quote_delta += delta.quote_delta;
159
-
})
160
-
.or_insert(delta);
147
+
// Helper to accumulate deltas (DEPRECATED - arrays updated directly now)
148
+
let mut accumulate_delta = |_actor_id: i32, _rkey: i64, _delta: super::PostStatsDeltas| {
149
+
// No-op: engagement is now tracked via arrays, updated directly during operations
161
150
};
162
151
163
152
// Helper to accumulate actor deltas
···
180
169
let op_elapsed = op_start.elapsed();
181
170
total_inserted += inserted.len();
182
171
183
-
// Generate deltas: increment like count for each liked post
184
-
for like in inserted {
185
-
accumulate_delta(like.post_actor_id, like.post_rkey, super::PostStatsDeltas {
186
-
like_delta: 1,
187
-
..Default::default()
188
-
});
189
-
}
172
+
// Note: Like arrays are updated directly in bulk_copy, no deltas needed
173
+
// accumulate_delta is now a no-op
190
174
191
175
tracing::info!(
192
176
count = count,
···
268
252
Ok(inserted) => {
269
253
total_inserted += inserted.len();
270
254
271
-
// Generate deltas: increment repost count for each reposted post
272
-
for repost in &inserted {
273
-
accumulate_delta(repost.post_actor_id, repost.post_rkey, super::PostStatsDeltas {
274
-
repost_delta: 1,
275
-
..Default::default()
276
-
});
277
-
}
255
+
// Note: Repost arrays are updated directly in bulk_copy, no deltas needed
256
+
// accumulate_delta is now a no-op
278
257
279
258
tracing::info!(inserted = inserted.len(), "Bulk COPY reposts completed");
280
259
}
···
323
302
324
303
// Generate deltas for posts
325
304
for post_data in &bulk_ops.posts {
326
-
// Increment reply count on parent post
327
-
if let (Some(parent_actor_id), Some(parent_rkey)) =
328
-
(post_data.parent_post_actor_id, post_data.parent_post_rkey) {
329
-
accumulate_delta(parent_actor_id, parent_rkey, super::PostStatsDeltas {
330
-
reply_delta: 1,
331
-
..Default::default()
332
-
});
333
-
}
334
-
335
-
// Increment quote count on embedded post
336
-
if let (Some(embedded_actor_id), Some(embedded_rkey)) =
337
-
(post_data.embedded_post_actor_id, post_data.embedded_post_rkey) {
338
-
accumulate_delta(embedded_actor_id, embedded_rkey, super::PostStatsDeltas {
339
-
quote_delta: 1,
340
-
..Default::default()
341
-
});
342
-
}
305
+
// Note: Reply/quote arrays are updated directly in bulk_copy, no deltas needed
306
+
// accumulate_delta is now a no-op
343
307
}
344
308
345
309
// Execute child table inserts
···
411
375
/// Takes accumulated deltas and updates denormalized stats columns in posts table.
412
376
/// Uses CASE-batched UPDATEs for efficiency (500 posts per UPDATE statement).
413
377
/// Now uses the consolidated PostUpdate infrastructure.
414
-
async fn batch_update_post_stats(
415
-
conn: &mut deadpool_postgres::Object,
416
-
deltas: std::collections::HashMap<(i32, i64), super::PostStatsDeltas>,
417
-
) -> eyre::Result<()> {
418
-
if deltas.is_empty() {
419
-
return Ok(());
420
-
}
421
-
422
-
let start = std::time::Instant::now();
423
-
let total_posts = deltas.len();
424
-
425
-
// Convert deltas to Vec and sort by (actor_id, rkey) to ensure consistent lock ordering
426
-
// This prevents deadlocks when multiple workers update overlapping posts
427
-
let mut deltas_vec: Vec<((i32, i64), super::PostStatsDeltas)> = deltas.into_iter().collect();
428
-
deltas_vec.sort_by_key(|((actor_id, rkey), _)| (*actor_id, *rkey));
429
-
430
-
const BATCH_SIZE: usize = 500;
431
-
let batch_count = (deltas_vec.len() + BATCH_SIZE - 1) / BATCH_SIZE;
432
-
433
-
let mut total_updated = 0;
434
-
435
-
// Process deltas in batches using PostUpdate
436
-
for (batch_idx, chunk) in deltas_vec.chunks(BATCH_SIZE).enumerate() {
437
-
let batch_start = std::time::Instant::now();
438
-
439
-
// Convert to PostUpdate format
440
-
let mut count_deltas = std::collections::HashMap::new();
441
-
let mut post_keys = Vec::new();
442
-
443
-
for ((actor_id, rkey), delta) in chunk {
444
-
post_keys.push((*actor_id, *rkey));
445
-
count_deltas.insert(
446
-
(*actor_id, *rkey),
447
-
crate::db::operations::CountDeltas {
448
-
like_count: Some(delta.like_delta),
449
-
reply_count: Some(delta.reply_delta),
450
-
repost_count: Some(delta.repost_delta),
451
-
quote_count: Some(delta.quote_delta),
452
-
},
453
-
);
454
-
}
455
-
456
-
// Use consolidated PostUpdate API
457
-
let result = crate::db::operations::PostUpdate {
458
-
target: crate::db::operations::PostUpdateTarget::Batch { keys: post_keys },
459
-
count_deltas: Some(count_deltas),
460
-
..Default::default()
461
-
}
462
-
.execute(conn)
463
-
.await?;
464
-
465
-
let updated = match result {
466
-
crate::db::operations::PostUpdateResult::Count(n) => n,
467
-
_ => unreachable!("PostUpdate with no RETURNING should return Count"),
468
-
};
469
-
470
-
total_updated += updated;
471
-
472
-
tracing::debug!(
473
-
batch = batch_idx + 1,
474
-
batch_size = chunk.len(),
475
-
updated = updated,
476
-
duration_ms = batch_start.elapsed().as_millis(),
477
-
"CASE-batched post stats UPDATE completed (via PostUpdate)"
478
-
);
479
-
}
480
-
481
-
let elapsed = start.elapsed();
482
-
tracing::debug!(
483
-
posts_updated = total_updated,
484
-
deltas_count = total_posts,
485
-
batches = batch_count,
486
-
batch_size = BATCH_SIZE,
487
-
duration_ms = elapsed.as_millis(),
488
-
"Batch updated post stats (CASE-batched via PostUpdate)"
489
-
);
490
-
Ok(())
491
-
}
492
-
493
378
/// Batch update actor stats in the database
494
379
/// Updates denormalized stats columns in actors table (followers_count, following_count, posts_count, etc.)
495
380
///
···
1374
1259
let mut actor_stats_deltas: std::collections::HashMap<i32, super::ActorStatsDeltas> = std::collections::HashMap::new();
1375
1260
let mut event_cache_invalidations = Vec::new();
1376
1261
1377
-
// Helper to accumulate deltas
1378
-
let mut accumulate_delta = |actor_id: i32, rkey: i64, delta: super::PostStatsDeltas| {
1379
-
stats_deltas.entry((actor_id, rkey))
1380
-
.and_modify(|existing| {
1381
-
existing.like_delta += delta.like_delta;
1382
-
existing.reply_delta += delta.reply_delta;
1383
-
existing.repost_delta += delta.repost_delta;
1384
-
existing.quote_delta += delta.quote_delta;
1385
-
})
1386
-
.or_insert(delta);
1262
+
// Helper to accumulate deltas (DEPRECATED - arrays updated directly now)
1263
+
let mut accumulate_delta = |_actor_id: i32, _rkey: i64, _delta: super::PostStatsDeltas| {
1264
+
// No-op: engagement is now tracked via arrays, updated directly during operations
1387
1265
};
1388
1266
1389
1267
// Helper to accumulate actor deltas
···
1488
1366
);
1489
1367
}
1490
1368
1491
-
// Batch update post stats in the database
1492
-
if !stats_deltas.is_empty() {
1493
-
if let Err(e) = batch_update_post_stats(&mut conn, stats_deltas).await {
1494
-
tracing::error!(worker = name, "Failed to batch update post stats: {}", e);
1495
-
counter!("batch_writer_stats_update_error").increment(1);
1496
-
}
1497
-
}
1369
+
// Note: Post stats (likes, replies, quotes, reposts) are now tracked via arrays,
1370
+
// updated directly during operation processing - no batch updates needed
1498
1371
1499
1372
// Batch update actor stats in the database
1500
1373
if !actor_stats_deltas.is_empty() {
+1
-1
consumer/src/db/operations/feed.rs
+1
-1
consumer/src/db/operations/feed.rs
···
33
33
pub use helpers::{ensure_list_id, get_actor_id};
34
34
pub use like::{like_delete, like_insert, LikeSubject};
35
35
pub use post::{post_delete, post_insert};
36
-
pub use post_update::{PostUpdate, PostUpdateTarget, PostUpdateReturning, PostUpdateResult, LikeArrayOp, CountDeltas, RecordDetachedOp};
36
+
pub use post_update::{PostUpdate, PostUpdateTarget, PostUpdateReturning, PostUpdateResult, LikeArrayOp, RecordDetachedOp};
37
37
pub use postgate::{postgate_delete, postgate_upsert};
38
38
pub use repost::{repost_delete, repost_insert};
39
39
pub use threadgate::{threadgate_delete, threadgate_get, threadgate_upsert};
+2
-2
consumer/src/db/operations/feed/like.rs
+2
-2
consumer/src/db/operations/feed/like.rs
···
237
237
) -> Result<Option<LikeSubject>> {
238
238
// Try removing from posts arrays first (most common case)
239
239
// Use array_remove() - much simpler than manual array filtering!
240
+
// Note: like_count column removed in array-only migration
240
241
let res = conn
241
242
.query_opt(
242
243
"UPDATE posts
243
244
SET
244
245
like_actor_ids = array_remove(like_actor_ids, $2),
245
246
like_rkeys = array_remove(like_rkeys, $1),
246
-
like_via_repost_data = like_via_repost_data - $2::text,
247
-
like_count = cardinality(array_remove(like_actor_ids, $2))
247
+
like_via_repost_data = like_via_repost_data - $2::text
248
248
WHERE $2 = ANY(like_actor_ids)
249
249
AND $1 = ANY(like_rkeys)
250
250
RETURNING actor_id, rkey",
+6
-97
consumer/src/db/operations/feed/post_update.rs
+6
-97
consumer/src/db/operations/feed/post_update.rs
···
70
70
},
71
71
}
72
72
73
-
/// Count deltas for batch updates
74
-
#[derive(Debug, Clone, Default)]
75
-
pub struct CountDeltas {
76
-
pub like_count: Option<i32>,
77
-
pub reply_count: Option<i32>,
78
-
pub repost_count: Option<i32>,
79
-
pub quote_count: Option<i32>,
80
-
}
81
-
82
73
/// Postgate detach calculation
83
74
#[derive(Debug, Clone)]
84
75
pub enum RecordDetachedOp {
···
125
116
/// Field updates (all optional)
126
117
pub status: Option<PostStatus>,
127
118
pub like_array_op: Option<LikeArrayOp>,
128
-
pub count_deltas: Option<HashMap<(i32, i64), CountDeltas>>,
129
119
pub record_detached: Option<RecordDetachedOp>,
130
120
pub tokens: Option<Vec<String>>,
131
121
···
142
132
},
143
133
status: None,
144
134
like_array_op: None,
145
-
count_deltas: None,
146
135
record_detached: None,
147
136
tokens: None,
148
137
returning: PostUpdateReturning::None,
···
300
289
param_idx += 3;
301
290
}
302
291
303
-
// Update like_count
304
-
clauses.push(format!(
305
-
"like_count = cardinality(like_actor_ids || ${})",
306
-
actor_param_idx
307
-
));
292
+
// Note: like_count column removed in array-only migration
293
+
// Count is now computed on-demand via array_length(like_actor_ids)
308
294
}
309
295
310
296
LikeArrayOp::AppendBatch { likes_by_post } => {
···
366
352
case_repost_data
367
353
));
368
354
369
-
clauses.push("like_count = cardinality(like_actor_ids) + 1".to_string());
355
+
// Note: like_count column removed in array-only migration
356
+
// Count is now computed on-demand via array_length(like_actor_ids)
370
357
}
371
358
372
359
LikeArrayOp::Remove {
···
390
377
actor_param_idx
391
378
));
392
379
393
-
// Update like_count
394
-
clauses.push(format!(
395
-
"like_count = cardinality(array_remove(like_actor_ids, ${}))",
396
-
actor_param_idx
397
-
));
380
+
// Note: like_count column removed in array-only migration
381
+
// Count is now computed on-demand via array_length(like_actor_ids)
398
382
}
399
383
}
400
-
}
401
-
402
-
// Count deltas (batch updates with CASE expressions)
403
-
if let Some(deltas_map) = &self.count_deltas {
404
-
// Build CASE expressions for each count field
405
-
// Format: like_count = CASE (actor_id, rkey) WHEN (1, 2) THEN LEAST(32767, GREATEST(0, COALESCE(like_count, 0) + delta)) WHEN ... END
406
-
407
-
let mut case_like = String::new();
408
-
let mut case_reply = String::new();
409
-
let mut case_repost = String::new();
410
-
let mut case_quote = String::new();
411
-
412
-
for (idx, ((post_actor_id, post_rkey), deltas)) in deltas_map.iter().enumerate() {
413
-
if idx > 0 {
414
-
case_like.push(' ');
415
-
case_reply.push(' ');
416
-
case_repost.push(' ');
417
-
case_quote.push(' ');
418
-
}
419
-
420
-
// Build CASE clauses with bounds checking (0-32767 for smallint)
421
-
if let Some(like_delta) = deltas.like_count {
422
-
case_like.push_str(&format!(
423
-
"WHEN ({}, {}) THEN LEAST(32767, GREATEST(0, COALESCE(like_count, 0) + {}))::smallint",
424
-
post_actor_id, post_rkey, like_delta
425
-
));
426
-
} else {
427
-
case_like.push_str(&format!(
428
-
"WHEN ({}, {}) THEN like_count",
429
-
post_actor_id, post_rkey
430
-
));
431
-
}
432
-
433
-
if let Some(reply_delta) = deltas.reply_count {
434
-
case_reply.push_str(&format!(
435
-
"WHEN ({}, {}) THEN LEAST(32767, GREATEST(0, COALESCE(reply_count, 0) + {}))::smallint",
436
-
post_actor_id, post_rkey, reply_delta
437
-
));
438
-
} else {
439
-
case_reply.push_str(&format!(
440
-
"WHEN ({}, {}) THEN reply_count",
441
-
post_actor_id, post_rkey
442
-
));
443
-
}
444
-
445
-
if let Some(repost_delta) = deltas.repost_count {
446
-
case_repost.push_str(&format!(
447
-
"WHEN ({}, {}) THEN LEAST(32767, GREATEST(0, COALESCE(repost_count, 0) + {}))::smallint",
448
-
post_actor_id, post_rkey, repost_delta
449
-
));
450
-
} else {
451
-
case_repost.push_str(&format!(
452
-
"WHEN ({}, {}) THEN repost_count",
453
-
post_actor_id, post_rkey
454
-
));
455
-
}
456
-
457
-
if let Some(quote_delta) = deltas.quote_count {
458
-
case_quote.push_str(&format!(
459
-
"WHEN ({}, {}) THEN LEAST(32767, GREATEST(0, COALESCE(quote_count, 0) + {}))::smallint",
460
-
post_actor_id, post_rkey, quote_delta
461
-
));
462
-
} else {
463
-
case_quote.push_str(&format!(
464
-
"WHEN ({}, {}) THEN quote_count",
465
-
post_actor_id, post_rkey
466
-
));
467
-
}
468
-
}
469
-
470
-
// Add SET clauses with CASE expressions
471
-
clauses.push(format!("like_count = CASE (actor_id, rkey) {} END", case_like));
472
-
clauses.push(format!("reply_count = CASE (actor_id, rkey) {} END", case_reply));
473
-
clauses.push(format!("repost_count = CASE (actor_id, rkey) {} END", case_repost));
474
-
clauses.push(format!("quote_count = CASE (actor_id, rkey) {} END", case_quote));
475
384
}
476
385
477
386
// Record detached (postgate)
+3
-9
consumer/tests/feed_operations_test.rs
+3
-9
consumer/tests/feed_operations_test.rs
···
512
512
);
513
513
assert_eq!(result.wrap_err("Operation failed")?, 1, "Should insert 1 like");
514
514
515
-
// Verify like was created in embedded arrays
515
+
// Verify like was created in embedded arrays (array-only tracking)
516
516
let like_row = tx
517
517
.query_one(
518
518
"SELECT
519
519
CASE WHEN $2 = ANY(p.like_actor_ids) THEN 1 ELSE 0 END as like_exists,
520
-
p.like_count,
521
520
array_length(p.like_actor_ids, 1) as array_len
522
521
FROM posts p
523
522
INNER JOIN actors pa ON p.actor_id = pa.id
···
528
527
.wrap_err("Failed to query like")?;
529
528
530
529
let like_exists: i32 = like_row.get(0);
531
-
let like_count: Option<i32> = like_row.get(1);
532
-
let array_len: Option<i32> = like_row.get(2);
530
+
let array_len: Option<i32> = like_row.get(1);
533
531
534
532
assert_eq!(like_exists, 1, "Liker should be in like_actor_ids array");
535
-
assert_eq!(like_count, Some(1), "like_count should be 1");
536
533
assert_eq!(array_len, Some(1), "like_actor_ids array should have 1 element");
537
534
Ok(())
538
535
}
···
630
627
.query_one(
631
628
"SELECT
632
629
CASE WHEN $2 = ANY(p.like_actor_ids) THEN 1 ELSE 0 END as like_exists,
633
-
p.like_count,
634
630
array_length(p.like_actor_ids, 1) as array_len
635
631
FROM posts p
636
632
INNER JOIN actors pa ON p.actor_id = pa.id
···
641
637
.wrap_err("Failed to query post")?;
642
638
643
639
let like_exists: i32 = like_row.get(0);
644
-
let like_count: Option<i32> = like_row.get(1);
645
-
let array_len: Option<i32> = like_row.get(2);
640
+
let array_len: Option<i32> = like_row.get(1);
646
641
647
642
assert_eq!(like_exists, 0, "Liker should NOT be in like_actor_ids array");
648
-
assert_eq!(like_count, Some(0), "like_count should be 0 after deletion");
649
643
assert!(array_len.is_none() || array_len == Some(0), "like_actor_ids array should be empty or NULL");
650
644
Ok(())
651
645
}
+39
migrations/2025-12-06-224848_array_only_engagement/down.sql
+39
migrations/2025-12-06-224848_array_only_engagement/down.sql
···
1
+
-- Rollback: Restore count columns from arrays
2
+
-- This migration reverses the array-only engagement tracking
3
+
4
+
-- Step 1: Restore count columns
5
+
ALTER TABLE posts
6
+
ADD COLUMN like_count integer DEFAULT 0 NOT NULL,
7
+
ADD COLUMN reply_count integer DEFAULT 0 NOT NULL,
8
+
ADD COLUMN repost_count integer DEFAULT 0 NOT NULL,
9
+
ADD COLUMN quote_count integer DEFAULT 0 NOT NULL;
10
+
11
+
-- Step 2: Restore counts from arrays
12
+
-- Use COALESCE to handle NULL arrays (treat as 0)
13
+
UPDATE posts SET
14
+
like_count = COALESCE(array_length(like_actor_ids, 1), 0),
15
+
reply_count = COALESCE(array_length(reply_actor_ids, 1), 0),
16
+
repost_count = COALESCE(array_length(repost_actor_ids, 1), 0),
17
+
quote_count = COALESCE(array_length(quote_actor_ids, 1), 0);
18
+
19
+
-- Step 3: Restore indexes on count columns
20
+
CREATE INDEX idx_posts_like_count_desc ON posts (like_count DESC);
21
+
CREATE INDEX idx_posts_engagement_score ON posts (
22
+
(like_count + reply_count * 2 + repost_count) DESC
23
+
);
24
+
25
+
-- Step 4: Drop array columns and indexes (keep like arrays from previous migration)
26
+
DROP INDEX IF EXISTS idx_posts_reply_actor_ids_gin;
27
+
DROP INDEX IF EXISTS idx_posts_quote_actor_ids_gin;
28
+
DROP INDEX IF EXISTS idx_posts_repost_actor_ids_gin;
29
+
30
+
ALTER TABLE posts
31
+
DROP COLUMN reply_actor_ids,
32
+
DROP COLUMN reply_rkeys,
33
+
DROP COLUMN quote_actor_ids,
34
+
DROP COLUMN quote_rkeys,
35
+
DROP COLUMN repost_actor_ids,
36
+
DROP COLUMN repost_rkeys;
37
+
38
+
-- Note: We keep like_actor_ids and like_rkeys from the previous migration
39
+
-- The like_count column is now restored and will be maintained alongside arrays
+133
migrations/2025-12-06-224848_array_only_engagement/up.sql
+133
migrations/2025-12-06-224848_array_only_engagement/up.sql
···
1
+
-- Array-Only Engagement Tracking Migration
2
+
-- Replace {like,reply,repost,quote}_count columns with array-only tracking
3
+
-- Compute counts on-demand via array_length()
4
+
5
+
-- Step 1: Add new array columns for replies, quotes, and reposts
6
+
-- (like arrays already exist from previous migration)
7
+
ALTER TABLE posts
8
+
ADD COLUMN reply_actor_ids integer[],
9
+
ADD COLUMN reply_rkeys bigint[],
10
+
ADD COLUMN quote_actor_ids integer[],
11
+
ADD COLUMN quote_rkeys bigint[],
12
+
ADD COLUMN repost_actor_ids integer[],
13
+
ADD COLUMN repost_rkeys bigint[];
14
+
15
+
-- Step 2: Backfill replies from posts.parent_post_* relationships
16
+
-- Group by parent post and collect all reply actor_ids and rkeys
17
+
UPDATE posts p
18
+
SET
19
+
reply_actor_ids = COALESCE(r.actor_ids, ARRAY[]::integer[]),
20
+
reply_rkeys = COALESCE(r.rkeys, ARRAY[]::bigint[])
21
+
FROM (
22
+
SELECT
23
+
parent_post_actor_id,
24
+
parent_post_rkey,
25
+
array_agg(actor_id ORDER BY rkey) as actor_ids,
26
+
array_agg(rkey ORDER BY rkey) as rkeys
27
+
FROM posts
28
+
WHERE parent_post_actor_id IS NOT NULL
29
+
GROUP BY parent_post_actor_id, parent_post_rkey
30
+
) r
31
+
WHERE p.actor_id = r.parent_post_actor_id
32
+
AND p.rkey = r.parent_post_rkey;
33
+
34
+
-- Step 3: Backfill quotes from posts.embedded_post_* relationships
35
+
-- Quotes are posts with embedded_post but NO parent (replies have parents)
36
+
UPDATE posts p
37
+
SET
38
+
quote_actor_ids = COALESCE(q.actor_ids, ARRAY[]::integer[]),
39
+
quote_rkeys = COALESCE(q.rkeys, ARRAY[]::bigint[])
40
+
FROM (
41
+
SELECT
42
+
embedded_post_actor_id,
43
+
embedded_post_rkey,
44
+
array_agg(actor_id ORDER BY rkey) as actor_ids,
45
+
array_agg(rkey ORDER BY rkey) as rkeys
46
+
FROM posts
47
+
WHERE embedded_post_actor_id IS NOT NULL
48
+
AND parent_post_actor_id IS NULL -- Quotes don't have parents (not replies)
49
+
GROUP BY embedded_post_actor_id, embedded_post_rkey
50
+
) q
51
+
WHERE p.actor_id = q.embedded_post_actor_id
52
+
AND p.rkey = q.embedded_post_rkey;
53
+
54
+
-- Step 4: Backfill reposts from reposts table
55
+
UPDATE posts p
56
+
SET
57
+
repost_actor_ids = COALESCE(r.actor_ids, ARRAY[]::integer[]),
58
+
repost_rkeys = COALESCE(r.rkeys, ARRAY[]::bigint[])
59
+
FROM (
60
+
SELECT
61
+
post_actor_id,
62
+
post_rkey,
63
+
array_agg(actor_id ORDER BY rkey) as actor_ids,
64
+
array_agg(rkey ORDER BY rkey) as rkeys
65
+
FROM reposts
66
+
GROUP BY post_actor_id, post_rkey
67
+
) r
68
+
WHERE p.actor_id = r.post_actor_id
69
+
AND p.rkey = r.post_rkey;
70
+
71
+
-- Step 5: Create GIN indexes for array lookups
72
+
-- These enable fast "who replied/quoted/reposted?" queries
73
+
CREATE INDEX idx_posts_reply_actor_ids_gin ON posts USING GIN (reply_actor_ids);
74
+
CREATE INDEX idx_posts_quote_actor_ids_gin ON posts USING GIN (quote_actor_ids);
75
+
CREATE INDEX idx_posts_repost_actor_ids_gin ON posts USING GIN (repost_actor_ids);
76
+
77
+
-- Step 6: Validate backfill (should return 0 rows with mismatches)
78
+
-- This ensures arrays match the existing count columns before we drop them
79
+
DO $$
80
+
DECLARE
81
+
mismatches integer;
82
+
like_mismatches integer;
83
+
reply_mismatches integer;
84
+
quote_mismatches integer;
85
+
repost_mismatches integer;
86
+
BEGIN
87
+
-- Check reply counts
88
+
SELECT COUNT(*) INTO reply_mismatches
89
+
FROM posts
90
+
WHERE COALESCE(array_length(reply_actor_ids, 1), 0) != COALESCE(reply_count, 0);
91
+
92
+
-- Check quote counts
93
+
SELECT COUNT(*) INTO quote_mismatches
94
+
FROM posts
95
+
WHERE COALESCE(array_length(quote_actor_ids, 1), 0) != COALESCE(quote_count, 0);
96
+
97
+
-- Check repost counts
98
+
SELECT COUNT(*) INTO repost_mismatches
99
+
FROM posts
100
+
WHERE COALESCE(array_length(repost_actor_ids, 1), 0) != COALESCE(repost_count, 0);
101
+
102
+
-- Check like counts (should already be correct from previous migration)
103
+
SELECT COUNT(*) INTO like_mismatches
104
+
FROM posts
105
+
WHERE COALESCE(array_length(like_actor_ids, 1), 0) != COALESCE(like_count, 0);
106
+
107
+
mismatches := like_mismatches + reply_mismatches + quote_mismatches + repost_mismatches;
108
+
109
+
IF mismatches > 0 THEN
110
+
-- WARNING instead of EXCEPTION: Count columns may be stale (that's why we're migrating!)
111
+
-- Arrays are built from actual FK relationships, so they're the source of truth.
112
+
RAISE WARNING 'Array backfill differs from old counts: % total (likes: %, replies: %, quotes: %, reposts: %). This is expected if counts drifted from reality.',
113
+
mismatches, like_mismatches, reply_mismatches, quote_mismatches, repost_mismatches;
114
+
RAISE NOTICE 'Arrays are built from actual relationships and will be the new source of truth.';
115
+
ELSE
116
+
RAISE NOTICE 'Backfill validation passed: all counts match arrays';
117
+
RAISE NOTICE ' - Like arrays: correct';
118
+
RAISE NOTICE ' - Reply arrays: correct';
119
+
RAISE NOTICE ' - Quote arrays: correct';
120
+
RAISE NOTICE ' - Repost arrays: correct';
121
+
END IF;
122
+
END $$;
123
+
124
+
-- Step 7: Drop old indexes that depend on count columns
125
+
DROP INDEX IF EXISTS idx_posts_like_count_desc;
126
+
DROP INDEX IF EXISTS idx_posts_engagement_score;
127
+
128
+
-- Step 8: Drop count columns (arrays are now the single source of truth)
129
+
ALTER TABLE posts
130
+
DROP COLUMN like_count,
131
+
DROP COLUMN reply_count,
132
+
DROP COLUMN repost_count,
133
+
DROP COLUMN quote_count;
+38
-13
parakeet-db/src/models.rs
+38
-13
parakeet-db/src/models.rs
···
205
205
pub facet_7: Option<crate::composite_types::FacetEmbed>,
206
206
pub facet_8: Option<crate::composite_types::FacetEmbed>,
207
207
pub mentions: Option<Vec<Option<i32>>>, // Array of actor_ids
208
-
// Aggregate stats (denormalized for performance, NULL = 0)
209
-
// INTEGER in PostgreSQL = Option<i32> in Rust (changed from SMALLINT to prevent overflow)
210
-
pub like_count: Option<i32>,
211
-
pub reply_count: Option<i32>,
212
-
pub repost_count: Option<i32>,
213
-
pub quote_count: Option<i32>,
214
-
// Embedded likes
215
-
// like_actor_ids[i] pairs with like_rkeys[i]
208
+
// Embedded engagement arrays (array-only tracking, compute counts via array_length())
209
+
// Likes: like_actor_ids[i] pairs with like_rkeys[i]
216
210
pub like_actor_ids: Option<Vec<i32>>, // [actor1, actor2, ...] (who liked)
217
211
pub like_rkeys: Option<Vec<i64>>, // [rkey1, rkey2, ...] (when they liked)
218
212
// Via repost tracking as JSONB: {"30": {"actor_id": 15, "rkey": 999}, ...}
219
213
// Key: liker's actor_id (as string), Value: {actor_id, rkey} of repost they came via
220
214
pub like_via_repost_data: Option<serde_json::Value>,
221
-
// Note: created_at derived from TID rkey via created_at() method
215
+
// Replies: reply_actor_ids[i] pairs with reply_rkeys[i]
216
+
pub reply_actor_ids: Option<Vec<i32>>, // [actor1, actor2, ...] (who replied)
217
+
pub reply_rkeys: Option<Vec<i64>>, // [rkey1, rkey2, ...] (when they replied)
218
+
// Quotes: quote_actor_ids[i] pairs with quote_rkeys[i]
219
+
pub quote_actor_ids: Option<Vec<i32>>, // [actor1, actor2, ...] (who quoted)
220
+
pub quote_rkeys: Option<Vec<i64>>, // [rkey1, rkey2, ...] (when they quoted)
221
+
// Reposts: repost_actor_ids[i] pairs with repost_rkeys[i]
222
+
pub repost_actor_ids: Option<Vec<i32>>, // [actor1, actor2, ...] (who reposted)
223
+
pub repost_rkeys: Option<Vec<i64>>, // [rkey1, rkey2, ...] (when they reposted)
224
+
// Note: created_at derived from TID rkey via created_at() method
222
225
}
223
226
224
227
impl Post {
···
285
288
.map(|v| v.len())
286
289
.unwrap_or(0)
287
290
}
291
+
292
+
// Count helpers (compute from array lengths, replacing old count columns)
293
+
294
+
/// Get like count from array length
295
+
pub fn like_count(&self) -> usize {
296
+
self.like_actor_ids.as_ref().map_or(0, |v| v.len())
297
+
}
298
+
299
+
/// Get reply count from array length
300
+
pub fn reply_count(&self) -> usize {
301
+
self.reply_actor_ids.as_ref().map_or(0, |v| v.len())
302
+
}
303
+
304
+
/// Get quote count from array length
305
+
pub fn quote_count(&self) -> usize {
306
+
self.quote_actor_ids.as_ref().map_or(0, |v| v.len())
307
+
}
308
+
309
+
/// Get repost count from array length
310
+
pub fn repost_count(&self) -> usize {
311
+
self.repost_actor_ids.as_ref().map_or(0, |v| v.len())
312
+
}
288
313
}
289
314
290
315
/// Represents a single like extracted from post arrays
···
315
340
impl PostStats {
316
341
pub fn from_post(post: &Post) -> Self {
317
342
Self {
318
-
likes: post.like_count.unwrap_or(0) as i32,
319
-
replies: post.reply_count.unwrap_or(0) as i32,
320
-
reposts: post.repost_count.unwrap_or(0) as i32,
321
-
quotes: post.quote_count.unwrap_or(0) as i32,
343
+
likes: post.like_count() as i32,
344
+
replies: post.reply_count() as i32,
345
+
reposts: post.repost_count() as i32,
346
+
quotes: post.quote_count() as i32,
322
347
}
323
348
}
324
349
+6
-16
parakeet-db/src/schema.rs
+6
-16
parakeet-db/src/schema.rs
···
474
474
}
475
475
476
476
diesel::table! {
477
-
post_likes (actor_id, rkey) {
478
-
actor_id -> Int4,
479
-
rkey -> Int8,
480
-
post_actor_id -> Int4,
481
-
post_rkey -> Int8,
482
-
via_repost_actor_id -> Nullable<Int4>,
483
-
via_repost_rkey -> Nullable<Int8>,
484
-
}
485
-
}
486
-
487
-
diesel::table! {
488
477
postgate_detached (post_actor_id, post_rkey, detached_post_actor_id, detached_post_rkey) {
489
478
post_actor_id -> Int4,
490
479
post_rkey -> Int8,
···
551
540
facet_7 -> Nullable<PostFacetEmbed>,
552
541
facet_8 -> Nullable<PostFacetEmbed>,
553
542
mentions -> Nullable<Array<Nullable<Int4>>>,
554
-
like_count -> Nullable<Int4>,
555
-
reply_count -> Nullable<Int4>,
556
-
repost_count -> Nullable<Int4>,
557
-
quote_count -> Nullable<Int4>,
558
543
like_actor_ids -> Nullable<Array<Int4>>,
559
544
like_rkeys -> Nullable<Array<Int8>>,
560
545
like_via_repost_data -> Nullable<Jsonb>,
546
+
reply_actor_ids -> Nullable<Array<Int4>>,
547
+
reply_rkeys -> Nullable<Array<Int8>>,
548
+
quote_actor_ids -> Nullable<Array<Int4>>,
549
+
quote_rkeys -> Nullable<Array<Int8>>,
550
+
repost_actor_ids -> Nullable<Array<Int4>>,
551
+
repost_rkeys -> Nullable<Array<Int8>>,
561
552
}
562
553
}
563
554
···
712
703
mutes,
713
704
notifications,
714
705
post_aggregate_stats,
715
-
post_likes,
716
706
postgate_detached,
717
707
postgates,
718
708
posts,
+1
-1
parakeet/src/loaders/mod.rs
+1
-1
parakeet/src/loaders/mod.rs
···
19
19
pub use labeler::{EnrichedLabeler, LabelLoader, LabelServiceLoader, LabelServiceLoaderRet};
20
20
pub use list::{EnrichedList, ListLoader, ListLoaderRet, ListStateLoader};
21
21
pub use misc::{EnrichedStarterPack, EnrichedVerification, StarterPackLoader, StarterPackLoaderRet, VerificationLoader};
22
-
pub use post::{EnrichedThreadgate, HydratedPost, PostLoader, PostLoaderRet, PostStateLoader};
22
+
pub use post::{EnrichedThreadgate, HydratedPost, PostLoader, PostLoaderRet, PostStateLoader, PostWithComputed};
23
23
pub use profile::{
24
24
EnrichedStatus, HandleLoader, Profile, ProfileByIdLoader, ProfileLoader, ProfileLoaderRet, ProfileStateLoader, ProfileStatsLoader,
25
25
};
+118
-111
parakeet/src/loaders/post.rs
+118
-111
parakeet/src/loaders/post.rs
···
59
59
}
60
60
}
61
61
62
+
/// Struct to capture SQL query results from build_posts_batch_query()
63
+
///
64
+
/// This struct MUST match the SELECT columns in build_posts_batch_query() exactly.
65
+
/// It's public for testing - tests can use this to validate the SQL query is executable.
66
+
/// If the query changes but this struct doesn't (or vice versa), tests will fail.
67
+
#[derive(diesel::QueryableByName)]
68
+
#[allow(dead_code, reason = "Diesel QueryableByName requires all SQL columns even if unused")]
69
+
pub struct PostWithComputed {
70
+
#[diesel(sql_type = diesel::sql_types::Integer)]
71
+
pub actor_id: i32,
72
+
#[diesel(sql_type = diesel::sql_types::BigInt)]
73
+
pub rkey: i64,
74
+
#[diesel(sql_type = diesel::sql_types::Binary)]
75
+
pub cid: Vec<u8>,
76
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
77
+
pub content: Option<Vec<u8>>,
78
+
#[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<parakeet_db::schema::sql_types::LanguageCode>>)]
79
+
pub langs: Vec<Option<parakeet_db::types::LanguageCode>>,
80
+
#[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::sql_types::Text>>)]
81
+
pub tags: Vec<Option<String>>,
82
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
83
+
pub parent_post_actor_id: Option<i32>,
84
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
85
+
pub parent_post_rkey: Option<i64>,
86
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
87
+
pub root_post_actor_id: Option<i32>,
88
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
89
+
pub root_post_rkey: Option<i64>,
90
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::EmbedType>)]
91
+
pub embed_type: Option<parakeet_db::types::EmbedType>,
92
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::EmbedType>)]
93
+
pub embed_subtype: Option<parakeet_db::types::EmbedType>,
94
+
#[diesel(sql_type = diesel::sql_types::Bool)]
95
+
pub violates_threadgate: bool,
96
+
#[diesel(sql_type = parakeet_db::schema::sql_types::PostStatus)]
97
+
pub status: parakeet_db::types::PostStatus,
98
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostVideoEmbed>)]
99
+
pub video_embed: Option<parakeet_db::composite_types::VideoEmbed>,
100
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostExtEmbed>)]
101
+
pub ext_embed: Option<parakeet_db::composite_types::ExtEmbed>,
102
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
103
+
pub image_1: Option<parakeet_db::composite_types::ImageEmbed>,
104
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
105
+
pub image_2: Option<parakeet_db::composite_types::ImageEmbed>,
106
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
107
+
pub image_3: Option<parakeet_db::composite_types::ImageEmbed>,
108
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
109
+
pub image_4: Option<parakeet_db::composite_types::ImageEmbed>,
110
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
111
+
pub embedded_post_actor_id: Option<i32>,
112
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
113
+
pub embedded_post_rkey: Option<i64>,
114
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bool>)]
115
+
pub record_detached: Option<bool>,
116
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
117
+
pub facet_1: Option<parakeet_db::composite_types::FacetEmbed>,
118
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
119
+
pub facet_2: Option<parakeet_db::composite_types::FacetEmbed>,
120
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
121
+
pub facet_3: Option<parakeet_db::composite_types::FacetEmbed>,
122
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
123
+
pub facet_4: Option<parakeet_db::composite_types::FacetEmbed>,
124
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
125
+
pub facet_5: Option<parakeet_db::composite_types::FacetEmbed>,
126
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
127
+
pub facet_6: Option<parakeet_db::composite_types::FacetEmbed>,
128
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
129
+
pub facet_7: Option<parakeet_db::composite_types::FacetEmbed>,
130
+
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
131
+
pub facet_8: Option<parakeet_db::composite_types::FacetEmbed>,
132
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::Integer>>)]
133
+
pub like_actor_ids: Option<Vec<i32>>,
134
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::BigInt>>)]
135
+
pub like_rkeys: Option<Vec<i64>>,
136
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Jsonb>)]
137
+
pub like_via_repost_data: Option<serde_json::Value>,
138
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::Integer>>)]
139
+
pub reply_actor_ids: Option<Vec<i32>>,
140
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::BigInt>>)]
141
+
pub reply_rkeys: Option<Vec<i64>>,
142
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::Integer>>)]
143
+
pub quote_actor_ids: Option<Vec<i32>>,
144
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::BigInt>>)]
145
+
pub quote_rkeys: Option<Vec<i64>>,
146
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::Integer>>)]
147
+
pub repost_actor_ids: Option<Vec<i32>>,
148
+
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<diesel::sql_types::BigInt>>)]
149
+
pub repost_rkeys: Option<Vec<i64>>,
150
+
}
151
+
62
152
/// Build SQL query for batch loading posts with computed fields
63
153
///
64
154
/// This function is public for testing purposes.
···
106
196
p.facet_6,
107
197
p.facet_7,
108
198
p.facet_8,
109
-
-- Aggregate stats (denormalized columns)
110
-
p.like_count,
111
-
p.reply_count,
112
-
p.repost_count,
113
-
p.quote_count
199
+
-- Engagement arrays (array-only tracking)
200
+
p.like_actor_ids,
201
+
p.like_rkeys,
202
+
p.like_via_repost_data,
203
+
p.reply_actor_ids,
204
+
p.reply_rkeys,
205
+
p.quote_actor_ids,
206
+
p.quote_rkeys,
207
+
p.repost_actor_ids,
208
+
p.repost_rkeys
114
209
FROM posts p
115
210
INNER JOIN unnest($1::integer[], $2::bigint[]) AS lookup(lookup_actor_id, lookup_rkey)
116
211
ON p.actor_id = lookup.lookup_actor_id AND p.rkey = lookup.lookup_rkey
···
449
544
450
545
let query = build_posts_batch_query();
451
546
452
-
// Define a struct to capture the SQL results (simplified - no LEFT JOIN fields)
453
-
#[derive(diesel::QueryableByName)]
454
-
#[allow(dead_code, reason = "Diesel QueryableByName requires all SQL columns even if unused")]
455
-
struct PostWithComputed {
456
-
// Actual fields from posts table
457
-
#[diesel(sql_type = diesel::sql_types::Integer)]
458
-
actor_id: i32,
459
-
#[diesel(sql_type = diesel::sql_types::BigInt)]
460
-
rkey: i64,
461
-
#[diesel(sql_type = diesel::sql_types::Binary)]
462
-
cid: Vec<u8>,
463
-
// Note: created_at computed from rkey in Rust via tid_to_datetime()
464
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
465
-
content: Option<Vec<u8>>, // Compressed content (BYTEA)
466
-
#[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<parakeet_db::schema::sql_types::LanguageCode>>)]
467
-
langs: Vec<Option<parakeet_db::types::LanguageCode>>,
468
-
#[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::sql_types::Text>>)]
469
-
tags: Vec<Option<String>>,
470
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
471
-
parent_post_actor_id: Option<i32>,
472
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
473
-
parent_post_rkey: Option<i64>,
474
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
475
-
root_post_actor_id: Option<i32>,
476
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
477
-
root_post_rkey: Option<i64>,
478
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::EmbedType>)]
479
-
embed_type: Option<parakeet_db::types::EmbedType>,
480
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::EmbedType>)]
481
-
embed_subtype: Option<parakeet_db::types::EmbedType>,
482
-
#[diesel(sql_type = diesel::sql_types::Bool)]
483
-
violates_threadgate: bool,
484
-
#[diesel(sql_type = parakeet_db::schema::sql_types::PostStatus)]
485
-
status: parakeet_db::types::PostStatus,
486
-
// Embed composite fields
487
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostVideoEmbed>)]
488
-
video_embed: Option<parakeet_db::composite_types::VideoEmbed>,
489
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostExtEmbed>)]
490
-
ext_embed: Option<parakeet_db::composite_types::ExtEmbed>,
491
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
492
-
image_1: Option<parakeet_db::composite_types::ImageEmbed>,
493
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
494
-
image_2: Option<parakeet_db::composite_types::ImageEmbed>,
495
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
496
-
image_3: Option<parakeet_db::composite_types::ImageEmbed>,
497
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostImageEmbed>)]
498
-
image_4: Option<parakeet_db::composite_types::ImageEmbed>,
499
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
500
-
embedded_post_actor_id: Option<i32>,
501
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
502
-
embedded_post_rkey: Option<i64>,
503
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bool>)]
504
-
record_detached: Option<bool>,
505
-
// Facet composite fields (loaded inline to avoid separate query)
506
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
507
-
facet_1: Option<parakeet_db::composite_types::FacetEmbed>,
508
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
509
-
facet_2: Option<parakeet_db::composite_types::FacetEmbed>,
510
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
511
-
facet_3: Option<parakeet_db::composite_types::FacetEmbed>,
512
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
513
-
facet_4: Option<parakeet_db::composite_types::FacetEmbed>,
514
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
515
-
facet_5: Option<parakeet_db::composite_types::FacetEmbed>,
516
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
517
-
facet_6: Option<parakeet_db::composite_types::FacetEmbed>,
518
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
519
-
facet_7: Option<parakeet_db::composite_types::FacetEmbed>,
520
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::PostFacetEmbed>)]
521
-
facet_8: Option<parakeet_db::composite_types::FacetEmbed>,
522
-
// Aggregate stats (denormalized columns - changed from SmallInt to Integer)
523
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
524
-
like_count: Option<i32>,
525
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
526
-
reply_count: Option<i32>,
527
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
528
-
repost_count: Option<i32>,
529
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)]
530
-
quote_count: Option<i32>,
531
-
// Note: author_did comes from posts_with_keys, not from query result
532
-
}
547
+
// PostWithComputed struct is now defined at module level for reuse in tests
533
548
534
549
let posts_query_start = std::time::Instant::now();
535
550
let posts_with_computed: Vec<PostWithComputed> = diesel_async::RunQueryDsl::load(
···
896
911
facet_7: None,
897
912
facet_8: None,
898
913
mentions: None,
899
-
// Denormalized stats (loaded from columns)
900
-
like_count: p.like_count,
901
-
reply_count: p.reply_count,
902
-
repost_count: p.repost_count,
903
-
quote_count: p.quote_count,
904
-
// Embedded likes (not loaded for hydration - viewer state loaded separately)
905
-
like_actor_ids: None,
906
-
like_rkeys: None,
907
-
like_via_repost_data: None,
914
+
// Engagement arrays (array-only tracking, counts computed via helper methods)
915
+
like_actor_ids: p.like_actor_ids.clone(),
916
+
like_rkeys: p.like_rkeys.clone(),
917
+
like_via_repost_data: p.like_via_repost_data.clone(),
918
+
reply_actor_ids: p.reply_actor_ids.clone(),
919
+
reply_rkeys: p.reply_rkeys.clone(),
920
+
quote_actor_ids: p.quote_actor_ids.clone(),
921
+
quote_rkeys: p.quote_rkeys.clone(),
922
+
repost_actor_ids: p.repost_actor_ids.clone(),
923
+
repost_rkeys: p.repost_rkeys.clone(),
908
924
};
909
925
910
926
// Encode TIDs using Rust utility functions
···
983
999
p.facet_6,
984
1000
p.facet_7,
985
1001
p.facet_8,
986
-
// Stats data (denormalized columns)
987
-
p.like_count,
988
-
p.reply_count,
989
-
p.repost_count,
990
-
p.quote_count,
991
1002
))
992
1003
})
993
1004
.collect();
···
997
1008
// Collect all mention actor IDs from facets to batch lookup DIDs
998
1009
let facets_start = std::time::Instant::now();
999
1010
let mut mention_actor_ids: Vec<i32> = Vec::new();
1000
-
for (_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8, _, _, _, _) in &posts_with_computed_data {
1011
+
for (_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8) in &posts_with_computed_data {
1001
1012
for facet in [facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8].iter().filter_map(|f| f.as_ref()) {
1002
1013
if let Some(actor_id) = facet.mention_actor_id {
1003
1014
mention_actor_ids.push(actor_id);
···
1023
1034
let threadgates_start = std::time::Instant::now();
1024
1035
let post_keys: Vec<(i32, i64)> = posts_with_computed_data
1025
1036
.iter()
1026
-
.map(|(post, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _)| (post.actor_id, post.rkey))
1037
+
.map(|(post, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _)| (post.actor_id, post.rkey))
1027
1038
.collect();
1028
1039
1029
1040
// Load threadgates separately - need to reconstruct post URIs from post_id FK
···
1216
1227
facet_6,
1217
1228
facet_7,
1218
1229
facet_8,
1219
-
like_count,
1220
-
reply_count,
1221
-
repost_count,
1222
-
quote_count,
1223
1230
)| {
1224
1231
// OPTIMIZED: Process facets from inline composite fields (no separate query)
1225
1232
let facet_vec = vec![facet_1, facet_2, facet_3, facet_4, facet_5, facet_6, facet_7, facet_8];
···
1258
1265
&created_at,
1259
1266
);
1260
1267
1261
-
// Create PostStats from denormalized columns
1268
+
// Create PostStats from array lengths using helper methods
1262
1269
let stats = parakeet_db::models::PostStats {
1263
-
likes: like_count.unwrap_or(0),
1264
-
replies: reply_count.unwrap_or(0),
1265
-
reposts: repost_count.unwrap_or(0),
1266
-
quotes: quote_count.unwrap_or(0),
1270
+
likes: post.like_count() as i32,
1271
+
replies: post.reply_count() as i32,
1272
+
reposts: post.repost_count() as i32,
1273
+
quotes: post.quote_count() as i32,
1267
1274
};
1268
1275
1269
1276
let hydrated_post = HydratedPost {
+3
-50
parakeet/tests/loaders_test.rs
+3
-50
parakeet/tests/loaders_test.rs
···
236
236
// Use the actual PostLoader query builder (no SQL duplication!)
237
237
let query = parakeet::loaders::build_posts_batch_query();
238
238
239
-
#[derive(diesel::QueryableByName)]
240
-
#[allow(dead_code)]
241
-
struct PostWithComputed {
242
-
#[diesel(sql_type = diesel::sql_types::BigInt)]
243
-
id: i64,
244
-
#[diesel(sql_type = diesel::sql_types::Integer)]
245
-
actor_id: i32,
246
-
#[diesel(sql_type = diesel::sql_types::BigInt)]
247
-
rkey: i64,
248
-
#[diesel(sql_type = diesel::sql_types::Binary)]
249
-
cid: Vec<u8>,
250
-
#[diesel(sql_type = diesel::sql_types::Timestamptz)]
251
-
created_at: chrono::DateTime<chrono::Utc>,
252
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
253
-
content: Option<Vec<u8>>,
254
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::SmallInt>)]
255
-
content_version: Option<i16>,
256
-
#[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<parakeet_db::schema::sql_types::LanguageCode>>)]
257
-
langs: Vec<Option<parakeet_db::types::LanguageCode>>,
258
-
#[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::sql_types::Text>>)]
259
-
tags: Vec<Option<String>>,
260
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
261
-
parent_post_id: Option<i64>,
262
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
263
-
root_post_id: Option<i64>,
264
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::EmbedType>)]
265
-
embed_type: Option<parakeet_db::types::EmbedType>,
266
-
#[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::EmbedType>)]
267
-
embed_subtype: Option<parakeet_db::types::EmbedType>,
268
-
#[diesel(sql_type = diesel::sql_types::Bool)]
269
-
violates_threadgate: bool,
270
-
#[diesel(sql_type = parakeet_db::schema::sql_types::PostStatus)]
271
-
status: parakeet_db::types::PostStatus,
272
-
#[diesel(sql_type = diesel::sql_types::Text)]
273
-
author_did: String,
274
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
275
-
parent_did: Option<String>,
276
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
277
-
parent_rkey: Option<i64>,
278
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
279
-
parent_cid_bytes: Option<Vec<u8>>,
280
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
281
-
root_did: Option<String>,
282
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
283
-
root_rkey: Option<i64>,
284
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
285
-
root_cid_bytes: Option<Vec<u8>>,
286
-
}
287
-
239
+
// Use the production PostWithComputed struct (no duplication!)
240
+
// If SQL query changes but struct doesn't match, this test will fail
288
241
let actor_ids: Vec<i32> = vec![];
289
242
let min_rkey = 0i64;
290
243
let max_rkey = i64::MAX;
···
294
247
.bind::<diesel::sql_types::Array<diesel::sql_types::BigInt>, _>(&rkeys)
295
248
.bind::<diesel::sql_types::BigInt, _>(min_rkey)
296
249
.bind::<diesel::sql_types::BigInt, _>(max_rkey)
297
-
.load::<PostWithComputed>(&mut conn)
250
+
.load::<parakeet::loaders::PostWithComputed>(&mut conn)
298
251
.await
299
252
.wrap_err("PostLoader main query SQL failed")?;
300
253