tangled
alpha
login
or
join now
quilling.dev
/
parakeet
forked from
parakeet.at/parakeet
1
fork
atom
Rust AppView - highly experimental!
1
fork
atom
overview
issues
pulls
pipelines
fix: use id cache
quilling.dev
4 months ago
c0921b07
27c4e18d
+301
-42
3 changed files
expand all
collapse all
unified
split
parakeet
src
db
graph.rs
db.rs
xrpc
app_bsky
graph
relations.rs
+2
-1
parakeet/src/db.rs
reviewed
···
39
39
pub use graph::{
40
40
get_actor_followers, get_actor_followers_by_id, get_actor_follows, get_actor_follows_by_id,
41
41
get_actor_lists, get_followed_by_batch, get_following_batch, get_list_id_by_uri, get_list_items,
42
42
-
get_mutual_followers, get_user_blocks, get_user_list_blocks, get_user_list_mutes, get_user_mutes,
42
42
+
get_mutual_followers, get_mutual_followers_by_id, get_user_blocks, get_user_list_blocks,
43
43
+
get_user_list_mutes, get_user_mutes,
43
44
};
44
45
pub use likes::{get_actor_likes, get_like_state, get_like_states, get_post_likes};
45
46
pub use notification_records::{get_follow_record, get_like_record, get_post_record, get_repost_record};
+45
parakeet/src/db/graph.rs
reviewed
···
386
386
})
387
387
}
388
388
389
389
+
/// Get known followers (mutual follows intersection) - OPTIMIZED VERSION
390
390
+
///
391
391
+
/// Returns list of (created_at, follower_actor_id) tuples for followers that viewer also follows.
392
392
+
/// The caller should resolve follower_actor_ids → DIDs via IdCache.
393
393
+
///
394
394
+
/// This version eliminates the 3x actors table JOINs that the non-optimized version has.
395
395
+
pub async fn get_mutual_followers_by_id(
396
396
+
conn: &mut AsyncPgConnection,
397
397
+
target_actor_id: i32,
398
398
+
viewer_actor_id: i32,
399
399
+
cursor_timestamp: Option<&chrono::DateTime<chrono::Utc>>,
400
400
+
limit: u8,
401
401
+
) -> QueryResult<Vec<(chrono::DateTime<chrono::Utc>, i32)>> {
402
402
+
#[derive(QueryableByName)]
403
403
+
struct MutualFollowerRow {
404
404
+
#[diesel(sql_type = Timestamptz)]
405
405
+
created_at: chrono::DateTime<chrono::Utc>,
406
406
+
#[diesel(sql_type = Integer)]
407
407
+
follower_actor_id: i32,
408
408
+
}
409
409
+
410
410
+
// OPTIMIZED: No actors JOINs! Uses actor_ids directly
411
411
+
diesel::sql_query(
412
412
+
"SELECT DISTINCT ON (f1.actor_id) tid_timestamp(f1.rkey) as created_at, f1.actor_id as follower_actor_id
413
413
+
FROM follows f1
414
414
+
INNER JOIN follows f2 ON f1.actor_id = f2.subject_actor_id
415
415
+
WHERE f1.subject_actor_id = $1
416
416
+
AND f2.actor_id = $2
417
417
+
AND ($3::timestamptz IS NULL OR tid_timestamp(f1.rkey) < $3)
418
418
+
ORDER BY f1.actor_id, f1.rkey DESC
419
419
+
LIMIT $4"
420
420
+
)
421
421
+
.bind::<Integer, _>(target_actor_id)
422
422
+
.bind::<Integer, _>(viewer_actor_id)
423
423
+
.bind::<Nullable<Timestamptz>, _>(cursor_timestamp)
424
424
+
.bind::<BigInt, _>(i64::from(limit))
425
425
+
.load::<MutualFollowerRow>(conn)
426
426
+
.await
427
427
+
.map(|rows| {
428
428
+
rows.into_iter()
429
429
+
.map(|r| (r.created_at, r.follower_actor_id))
430
430
+
.collect()
431
431
+
})
432
432
+
}
433
433
+
389
434
/// Get lists owned by an actor with cursor pagination
390
435
///
391
436
/// Returns list of (created_at, at_uri) tuples
+254
-41
parakeet/src/xrpc/app_bsky/graph/relations.rs
reviewed
···
72
72
// Parse TID cursor
73
73
let parsed_cursor = crate::xrpc::tid_cursor(query.cursor.as_ref());
74
74
75
75
-
// Parallelize: hydrate subject profile and query followers concurrently
76
76
-
let (subject_opt, results) = tokio::join!(
77
77
-
hyd.hydrate_profile(subj_did.clone()),
78
78
-
crate::db::get_actor_followers(&mut conn, &subj_did, parsed_cursor, limit)
79
79
-
);
75
75
+
// OPTIMIZATION: Resolve subject DID → actor_id via IdCache to use optimized query
76
76
+
let subject_actor_id_opt = state.id_cache.get_actor_id_only(&subj_did).await;
80
77
78
78
+
// Hydrate subject profile first (needed either way)
79
79
+
let subject_opt = hyd.hydrate_profile(subj_did.clone()).await;
81
80
let Some(subject) = subject_opt else {
82
81
return Err(Error::not_found());
83
82
};
84
83
85
85
-
let results = results?;
84
84
+
// Query followers using optimized or non-optimized path
85
85
+
let (cursor, dids) = if let Some(subject_actor_id) = subject_actor_id_opt {
86
86
+
// OPTIMIZED PATH: Use _by_id version (0 JOINs!)
87
87
+
let results = crate::db::get_actor_followers_by_id(&mut conn, subject_actor_id, parsed_cursor, limit).await?;
86
88
87
87
-
// Generate cursor from TID (base32-encoded rkey)
88
88
-
let cursor = results
89
89
-
.last()
90
90
-
.map(|row| parakeet_db::tid_util::encode_tid(row.0));
89
89
+
// Generate cursor from TID (base32-encoded rkey)
90
90
+
let cursor = results
91
91
+
.last()
92
92
+
.map(|row| parakeet_db::tid_util::encode_tid(row.0));
91
93
92
92
-
let dids = results.iter().map(|row| row.1.clone()).collect();
94
94
+
// Batch resolve actor_ids → DIDs via IdCache
95
95
+
let actor_ids: Vec<i32> = results.iter().map(|row| row.1).collect();
96
96
+
let unique_actor_ids: std::collections::HashSet<i32> = actor_ids.iter().copied().collect();
97
97
+
let actor_ids_vec: Vec<i32> = unique_actor_ids.into_iter().collect();
93
98
94
94
-
let mut profiles = hyd.hydrate_profiles(dids).await;
99
99
+
// Batch resolve via IdCache
100
100
+
let cached = state.id_cache.get_actor_data_many(&actor_ids_vec).await;
101
101
+
let mut actor_id_to_did: std::collections::HashMap<i32, String> = cached
102
102
+
.into_iter()
103
103
+
.map(|(id, data)| (id, data.did))
104
104
+
.collect();
95
105
96
96
-
let followers = results
106
106
+
// Query DB for cache misses
107
107
+
let missing: Vec<i32> = actor_ids_vec
108
108
+
.iter()
109
109
+
.filter(|id| !actor_id_to_did.contains_key(id))
110
110
+
.copied()
111
111
+
.collect();
112
112
+
113
113
+
if !missing.is_empty() {
114
114
+
use diesel::sql_types::{Array, Integer, Text};
115
115
+
use diesel_async::RunQueryDsl;
116
116
+
117
117
+
#[derive(diesel::QueryableByName)]
118
118
+
struct ActorRow {
119
119
+
#[diesel(sql_type = Integer)]
120
120
+
id: i32,
121
121
+
#[diesel(sql_type = Text)]
122
122
+
did: String,
123
123
+
}
124
124
+
125
125
+
let db_results: Vec<ActorRow> = diesel::sql_query(
126
126
+
"SELECT id, did FROM actors WHERE id = ANY($1)"
127
127
+
)
128
128
+
.bind::<Array<Integer>, _>(&missing)
129
129
+
.load(&mut conn)
130
130
+
.await
131
131
+
.map_err(|e| {
132
132
+
Error::new(
133
133
+
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
134
134
+
"DatabaseError",
135
135
+
Some(format!("Failed to resolve follower DIDs: {}", e)),
136
136
+
)
137
137
+
})?;
138
138
+
139
139
+
for row in db_results {
140
140
+
actor_id_to_did.insert(row.id, row.did);
141
141
+
}
142
142
+
}
143
143
+
144
144
+
// Map actor_ids to DIDs, preserving order
145
145
+
let dids: Vec<String> = actor_ids
146
146
+
.into_iter()
147
147
+
.filter_map(|id| actor_id_to_did.get(&id).cloned())
148
148
+
.collect();
149
149
+
150
150
+
(cursor, dids)
151
151
+
} else {
152
152
+
// NON-OPTIMIZED PATH: Use original version with JOINs
153
153
+
let results = crate::db::get_actor_followers(&mut conn, &subj_did, parsed_cursor, limit).await?;
154
154
+
155
155
+
// Generate cursor from TID (base32-encoded rkey)
156
156
+
let cursor = results
157
157
+
.last()
158
158
+
.map(|row| parakeet_db::tid_util::encode_tid(row.0));
159
159
+
160
160
+
let dids = results.iter().map(|row| row.1.clone()).collect();
161
161
+
162
162
+
(cursor, dids)
163
163
+
};
164
164
+
165
165
+
let mut profiles = hyd.hydrate_profiles(dids.clone()).await;
166
166
+
167
167
+
let followers = dids
97
168
.into_iter()
98
98
-
.filter_map(|row| profiles.remove(&row.1))
169
169
+
.filter_map(|did| profiles.remove(&did))
99
170
.collect();
100
171
101
172
Ok(Json(AppBskyGraphGetFollowersRes {
···
129
200
// Parse TID cursor
130
201
let parsed_cursor = crate::xrpc::tid_cursor(query.cursor.as_ref());
131
202
132
132
-
// Parallelize: hydrate subject profile and query follows concurrently
133
133
-
let (subject_opt, results) = tokio::join!(
134
134
-
hyd.hydrate_profile(subj_did.clone()),
135
135
-
crate::db::get_actor_follows(&mut conn, &subj_did, parsed_cursor, limit)
136
136
-
);
203
203
+
// OPTIMIZATION: Resolve subject DID → actor_id via IdCache to use optimized query
204
204
+
let subject_actor_id_opt = state.id_cache.get_actor_id_only(&subj_did).await;
137
205
206
206
+
// Hydrate subject profile first (needed either way)
207
207
+
let subject_opt = hyd.hydrate_profile(subj_did.clone()).await;
138
208
let Some(subject) = subject_opt else {
139
209
return Err(Error::not_found());
140
210
};
141
211
142
142
-
let results = results?;
212
212
+
// Query follows using optimized or non-optimized path
213
213
+
let (cursor, dids) = if let Some(subject_actor_id) = subject_actor_id_opt {
214
214
+
// OPTIMIZED PATH: Use _by_id version (0 JOINs!)
215
215
+
let results = crate::db::get_actor_follows_by_id(&mut conn, subject_actor_id, parsed_cursor, limit).await?;
216
216
+
217
217
+
// Generate cursor from TID (base32-encoded rkey)
218
218
+
let cursor = results
219
219
+
.last()
220
220
+
.map(|row| parakeet_db::tid_util::encode_tid(row.0));
221
221
+
222
222
+
// Batch resolve actor_ids → DIDs via IdCache
223
223
+
let actor_ids: Vec<i32> = results.iter().map(|row| row.1).collect();
224
224
+
let unique_actor_ids: std::collections::HashSet<i32> = actor_ids.iter().copied().collect();
225
225
+
let actor_ids_vec: Vec<i32> = unique_actor_ids.into_iter().collect();
226
226
+
227
227
+
// Batch resolve via IdCache
228
228
+
let cached = state.id_cache.get_actor_data_many(&actor_ids_vec).await;
229
229
+
let mut actor_id_to_did: std::collections::HashMap<i32, String> = cached
230
230
+
.into_iter()
231
231
+
.map(|(id, data)| (id, data.did))
232
232
+
.collect();
233
233
+
234
234
+
// Query DB for cache misses
235
235
+
let missing: Vec<i32> = actor_ids_vec
236
236
+
.iter()
237
237
+
.filter(|id| !actor_id_to_did.contains_key(id))
238
238
+
.copied()
239
239
+
.collect();
240
240
+
241
241
+
if !missing.is_empty() {
242
242
+
use diesel::sql_types::{Array, Integer, Text};
243
243
+
use diesel_async::RunQueryDsl;
244
244
+
245
245
+
#[derive(diesel::QueryableByName)]
246
246
+
struct ActorRow {
247
247
+
#[diesel(sql_type = Integer)]
248
248
+
id: i32,
249
249
+
#[diesel(sql_type = Text)]
250
250
+
did: String,
251
251
+
}
143
252
144
144
-
// Generate cursor from TID (base32-encoded rkey)
145
145
-
let cursor = results
146
146
-
.last()
147
147
-
.map(|row| parakeet_db::tid_util::encode_tid(row.0));
253
253
+
let db_results: Vec<ActorRow> = diesel::sql_query(
254
254
+
"SELECT id, did FROM actors WHERE id = ANY($1)"
255
255
+
)
256
256
+
.bind::<Array<Integer>, _>(&missing)
257
257
+
.load(&mut conn)
258
258
+
.await
259
259
+
.map_err(|e| {
260
260
+
Error::new(
261
261
+
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
262
262
+
"DatabaseError",
263
263
+
Some(format!("Failed to resolve followed DIDs: {}", e)),
264
264
+
)
265
265
+
})?;
148
266
149
149
-
let dids = results.iter().map(|row| row.1.clone()).collect();
267
267
+
for row in db_results {
268
268
+
actor_id_to_did.insert(row.id, row.did);
269
269
+
}
270
270
+
}
150
271
151
151
-
let mut profiles = hyd.hydrate_profiles(dids).await;
272
272
+
// Map actor_ids to DIDs, preserving order
273
273
+
let dids: Vec<String> = actor_ids
274
274
+
.into_iter()
275
275
+
.filter_map(|id| actor_id_to_did.get(&id).cloned())
276
276
+
.collect();
152
277
153
153
-
let follows = results
278
278
+
(cursor, dids)
279
279
+
} else {
280
280
+
// NON-OPTIMIZED PATH: Use original version with JOINs
281
281
+
let results = crate::db::get_actor_follows(&mut conn, &subj_did, parsed_cursor, limit).await?;
282
282
+
283
283
+
// Generate cursor from TID (base32-encoded rkey)
284
284
+
let cursor = results
285
285
+
.last()
286
286
+
.map(|row| parakeet_db::tid_util::encode_tid(row.0));
287
287
+
288
288
+
let dids = results.iter().map(|row| row.1.clone()).collect();
289
289
+
290
290
+
(cursor, dids)
291
291
+
};
292
292
+
293
293
+
let mut profiles = hyd.hydrate_profiles(dids.clone()).await;
294
294
+
295
295
+
let follows = dids
154
296
.into_iter()
155
155
-
.filter_map(|row| profiles.remove(&row.1))
297
297
+
.filter_map(|did| profiles.remove(&did))
156
298
.collect();
157
299
158
300
Ok(Json(AppBskyGraphGetFollowsRes {
···
284
426
let viewer_did = auth.0.clone();
285
427
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth));
286
428
287
287
-
// Resolve target actor
429
429
+
// Resolve target and viewer actors
288
430
let target_did = get_actor_did(&state.dataloaders, query.actor).await?;
289
431
290
432
let limit = query.limit.unwrap_or(50).clamp(1, 100);
291
433
let parsed_cursor = datetime_cursor(query.cursor.as_ref());
292
434
293
293
-
// Parallelize: hydrate subject profile and query mutual followers concurrently
294
294
-
let (subject_opt, results) = tokio::join!(
295
295
-
hyd.hydrate_profile(target_did.clone()),
296
296
-
crate::db::get_mutual_followers(&mut conn, &target_did, &viewer_did, parsed_cursor.as_ref(), limit)
297
297
-
);
435
435
+
// OPTIMIZATION: Resolve target and viewer DIDs → actor_ids via IdCache to use optimized query
436
436
+
let target_actor_id_opt = state.id_cache.get_actor_id_only(&target_did).await;
437
437
+
let viewer_actor_id_opt = state.id_cache.get_actor_id_only(&viewer_did).await;
298
438
439
439
+
// Hydrate subject profile first (needed either way)
440
440
+
let subject_opt = hyd.hydrate_profile(target_did.clone()).await;
299
441
let Some(subject) = subject_opt else {
300
442
return Err(Error::not_found());
301
443
};
302
444
303
303
-
let results = results?;
445
445
+
// Query known followers using optimized or non-optimized path
446
446
+
let (cursor, dids) = if let (Some(target_actor_id), Some(viewer_actor_id)) = (target_actor_id_opt, viewer_actor_id_opt) {
447
447
+
// OPTIMIZED PATH: Use _by_id version (eliminates 3 actors JOINs!)
448
448
+
let results = crate::db::get_mutual_followers_by_id(&mut conn, target_actor_id, viewer_actor_id, parsed_cursor.as_ref(), limit).await?;
449
449
+
450
450
+
// Generate cursor
451
451
+
let cursor = results.last().map(|row| row.0.to_rfc3339());
452
452
+
453
453
+
// Batch resolve actor_ids → DIDs via IdCache
454
454
+
let actor_ids: Vec<i32> = results.iter().map(|row| row.1).collect();
455
455
+
let unique_actor_ids: std::collections::HashSet<i32> = actor_ids.iter().copied().collect();
456
456
+
let actor_ids_vec: Vec<i32> = unique_actor_ids.into_iter().collect();
457
457
+
458
458
+
// Batch resolve via IdCache
459
459
+
let cached = state.id_cache.get_actor_data_many(&actor_ids_vec).await;
460
460
+
let mut actor_id_to_did: std::collections::HashMap<i32, String> = cached
461
461
+
.into_iter()
462
462
+
.map(|(id, data)| (id, data.did))
463
463
+
.collect();
464
464
+
465
465
+
// Query DB for cache misses
466
466
+
let missing: Vec<i32> = actor_ids_vec
467
467
+
.iter()
468
468
+
.filter(|id| !actor_id_to_did.contains_key(id))
469
469
+
.copied()
470
470
+
.collect();
471
471
+
472
472
+
if !missing.is_empty() {
473
473
+
use diesel::sql_types::{Array, Integer, Text};
474
474
+
use diesel_async::RunQueryDsl;
475
475
+
476
476
+
#[derive(diesel::QueryableByName)]
477
477
+
struct ActorRow {
478
478
+
#[diesel(sql_type = Integer)]
479
479
+
id: i32,
480
480
+
#[diesel(sql_type = Text)]
481
481
+
did: String,
482
482
+
}
483
483
+
484
484
+
let db_results: Vec<ActorRow> = diesel::sql_query(
485
485
+
"SELECT id, did FROM actors WHERE id = ANY($1)"
486
486
+
)
487
487
+
.bind::<Array<Integer>, _>(&missing)
488
488
+
.load(&mut conn)
489
489
+
.await
490
490
+
.map_err(|e| {
491
491
+
Error::new(
492
492
+
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
493
493
+
"DatabaseError",
494
494
+
Some(format!("Failed to resolve known follower DIDs: {}", e)),
495
495
+
)
496
496
+
})?;
304
497
305
305
-
// Generate cursor
306
306
-
let cursor = results.last().map(|row| row.0.to_rfc3339());
498
498
+
for row in db_results {
499
499
+
actor_id_to_did.insert(row.id, row.did);
500
500
+
}
501
501
+
}
502
502
+
503
503
+
// Map actor_ids to DIDs, preserving order
504
504
+
let dids: Vec<String> = actor_ids
505
505
+
.into_iter()
506
506
+
.filter_map(|id| actor_id_to_did.get(&id).cloned())
507
507
+
.collect();
508
508
+
509
509
+
(cursor, dids)
510
510
+
} else {
511
511
+
// NON-OPTIMIZED PATH: Use original version with 3 actors JOINs
512
512
+
let results = crate::db::get_mutual_followers(&mut conn, &target_did, &viewer_did, parsed_cursor.as_ref(), limit).await?;
513
513
+
514
514
+
// Generate cursor
515
515
+
let cursor = results.last().map(|row| row.0.to_rfc3339());
516
516
+
517
517
+
let dids = results.iter().map(|row| row.1.clone()).collect();
518
518
+
519
519
+
(cursor, dids)
520
520
+
};
307
521
308
522
// Hydrate profiles for known followers
309
309
-
let dids = results.iter().map(|row| row.1.clone()).collect();
310
310
-
let mut profiles = hyd.hydrate_profiles(dids).await;
523
523
+
let mut profiles = hyd.hydrate_profiles(dids.clone()).await;
311
524
312
525
// Maintain order from query
313
313
-
let followers = results
526
526
+
let followers = dids
314
527
.into_iter()
315
315
-
.filter_map(|row| profiles.remove(&row.1))
528
528
+
.filter_map(|did| profiles.remove(&did))
316
529
.collect();
317
530
318
531
Ok(Json(GetKnownFollowersRes {