+26
consumer/src/db/copy.rs
+26
consumer/src/db/copy.rs
···
198
198
199
199
writer.finish().await?;
200
200
201
+
let threadgated: Vec<(String, String, DateTime<Utc>)> = conn
202
+
.query(
203
+
"SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL",
204
+
&[],
205
+
)
206
+
.await?
207
+
.into_iter()
208
+
.map(|v| (v.get(0), v.get(1), v.get(2))).collect();
209
+
210
+
for (root, post, created_at) in dbg!(threadgated) {
211
+
match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
212
+
Ok(true) => {
213
+
conn.execute(
214
+
"UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1",
215
+
&[&post],
216
+
)
217
+
.await?;
218
+
}
219
+
Ok(false) => continue,
220
+
Err(e) => {
221
+
tracing::error!("failed to check threadgate enforcement: {e}");
222
+
continue;
223
+
}
224
+
}
225
+
}
226
+
201
227
conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[])
202
228
.await
203
229
}
+111
consumer/src/db/record.rs
+111
consumer/src/db/record.rs
···
333
333
let embed = rec.embed.as_ref().map(|v| v.as_str());
334
334
let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype());
335
335
336
+
// if there is a root, we need to check for the presence of a threadgate.
337
+
let violates_threadgate = match &root_uri {
338
+
Some(root) => {
339
+
post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await?
340
+
}
341
+
None => false,
342
+
};
343
+
336
344
let tags = merge_tags(tags, rec.tags);
337
345
338
346
let count = conn
···
354
362
&embed,
355
363
&embed_subtype,
356
364
&mentions,
365
+
&violates_threadgate,
357
366
&rec.created_at,
358
367
],
359
368
)
···
369
378
pub async fn post_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
370
379
conn.execute("DELETE FROM posts WHERE at_uri=$1", &[&at_uri])
371
380
.await
381
+
}
382
+
383
+
pub async fn post_enforce_threadgate<C: GenericClient>(
384
+
conn: &mut C,
385
+
root: &str,
386
+
post_author: &str,
387
+
post_created_at: DateTime<Utc>,
388
+
is_backfill: bool,
389
+
) -> PgResult<bool> {
390
+
// check if the root and the current post are the same author
391
+
// strip "at://" then break into parts by '/'
392
+
let parts = root[5..].split('/').collect::<Vec<_>>();
393
+
let root_author = parts[0];
394
+
if root_author == post_author {
395
+
return Ok(false);
396
+
}
397
+
398
+
let tg_data = threadgate_get(conn, root).await?;
399
+
400
+
let Some((created_at, allow, allow_lists)) = tg_data else {
401
+
return Ok(false);
402
+
};
403
+
404
+
// when backfilling, there's no point continuing if the record is dated before the threadgate
405
+
if is_backfill && post_created_at < created_at {
406
+
return Ok(false);
407
+
}
408
+
409
+
if allow.is_empty() {
410
+
return Ok(true);
411
+
}
412
+
413
+
let allow: HashSet<String> = HashSet::from_iter(allow);
414
+
415
+
if allow.contains("app.bsky.feed.threadgate#followerRule")
416
+
|| allow.contains("app.bsky.feed.threadgate#followingRule")
417
+
{
418
+
let profile_state: Option<(bool, bool)> = conn
419
+
.query_opt(
420
+
"SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2",
421
+
&[&root_author, &post_author],
422
+
)
423
+
.await?
424
+
.map(|v| (v.get(0), v.get(1)));
425
+
426
+
if let Some((following, followed)) = profile_state {
427
+
if allow.contains("app.bsky.feed.threadgate#followerRule") && followed {
428
+
return Ok(false);
429
+
}
430
+
431
+
if allow.contains("app.bsky.feed.threadgate#followingRule") && following {
432
+
return Ok(false);
433
+
}
434
+
}
435
+
}
436
+
437
+
// check mentions
438
+
if allow.contains("app.bsky.feed.threadgate#mentionRule") {
439
+
let mentions: Vec<String> = conn
440
+
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
441
+
.await?
442
+
.map(|r| r.get(0))
443
+
.unwrap_or_default();
444
+
445
+
if mentions.contains(&post_author.to_owned()) {
446
+
return Ok(false);
447
+
}
448
+
}
449
+
450
+
if allow.contains("app.bsky.feed.threadgate#listRule") {
451
+
if allow_lists.is_empty() {
452
+
return Ok(true);
453
+
}
454
+
455
+
let count: i64 = conn
456
+
.query_one(
457
+
"SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2",
458
+
&[&allow_lists, &post_author],
459
+
)
460
+
.await?
461
+
.get(0);
462
+
if count == 0 {
463
+
return Ok(true);
464
+
}
465
+
}
466
+
467
+
Ok(false)
372
468
}
373
469
374
470
pub async fn post_get_info_for_delete<C: GenericClient>(
···
729
825
pub async fn status_delete<C: GenericClient>(conn: &mut C, did: &str) -> PgExecResult {
730
826
conn.execute("DELETE FROM statuses WHERE did=$1", &[&did])
731
827
.await
828
+
}
829
+
830
+
async fn threadgate_get<C: GenericClient>(
831
+
conn: &mut C,
832
+
post: &str,
833
+
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
834
+
let res = conn
835
+
.query_opt(
836
+
"SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL",
837
+
&[&post],
838
+
)
839
+
.await?
840
+
.map(|v| (v.get(0), v.get(1), v.get(2)));
841
+
842
+
Ok(res)
732
843
}
733
844
734
845
pub async fn threadgate_upsert<C: GenericClient>(
+2
-2
consumer/src/db/sql/post_insert.sql
+2
-2
consumer/src/db/sql/post_insert.sql
···
1
1
INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri,
2
-
root_cid, embed, embed_subtype, mentions, created_at)
3
-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
2
+
root_cid, embed, embed_subtype, mentions, violates_threadgate, created_at)
3
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
4
4
ON CONFLICT DO NOTHING
+2
-1
migrations/2025-09-27-171241_post-tweaks/down.sql
+2
-1
migrations/2025-09-27-171241_post-tweaks/down.sql
+2
-1
migrations/2025-09-27-171241_post-tweaks/up.sql
+2
-1
migrations/2025-09-27-171241_post-tweaks/up.sql