+1
-245
Cargo.lock
+1
-245
Cargo.lock
···
125
125
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
126
126
127
127
[[package]]
128
-
name = "async-channel"
129
-
version = "1.9.0"
130
-
source = "registry+https://github.com/rust-lang/crates.io-index"
131
-
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
132
-
dependencies = [
133
-
"concurrent-queue",
134
-
"event-listener 2.5.3",
135
-
"futures-core",
136
-
]
137
-
138
-
[[package]]
139
-
name = "async-channel"
140
-
version = "2.3.1"
141
-
source = "registry+https://github.com/rust-lang/crates.io-index"
142
-
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
143
-
dependencies = [
144
-
"concurrent-queue",
145
-
"event-listener-strategy",
146
-
"futures-core",
147
-
"pin-project-lite",
148
-
]
149
-
150
-
[[package]]
151
128
name = "async-compression"
152
129
version = "0.4.22"
153
130
source = "registry+https://github.com/rust-lang/crates.io-index"
···
161
138
]
162
139
163
140
[[package]]
164
-
name = "async-executor"
165
-
version = "1.13.1"
166
-
source = "registry+https://github.com/rust-lang/crates.io-index"
167
-
checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec"
168
-
dependencies = [
169
-
"async-task",
170
-
"concurrent-queue",
171
-
"fastrand",
172
-
"futures-lite",
173
-
"slab",
174
-
]
175
-
176
-
[[package]]
177
-
name = "async-global-executor"
178
-
version = "2.4.1"
179
-
source = "registry+https://github.com/rust-lang/crates.io-index"
180
-
checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c"
181
-
dependencies = [
182
-
"async-channel 2.3.1",
183
-
"async-executor",
184
-
"async-io",
185
-
"async-lock",
186
-
"blocking",
187
-
"futures-lite",
188
-
"once_cell",
189
-
]
190
-
191
-
[[package]]
192
-
name = "async-io"
193
-
version = "2.4.0"
194
-
source = "registry+https://github.com/rust-lang/crates.io-index"
195
-
checksum = "43a2b323ccce0a1d90b449fd71f2a06ca7faa7c54c2751f06c9bd851fc061059"
196
-
dependencies = [
197
-
"async-lock",
198
-
"cfg-if",
199
-
"concurrent-queue",
200
-
"futures-io",
201
-
"futures-lite",
202
-
"parking",
203
-
"polling",
204
-
"rustix",
205
-
"slab",
206
-
"tracing",
207
-
"windows-sys 0.59.0",
208
-
]
209
-
210
-
[[package]]
211
-
name = "async-lock"
212
-
version = "3.4.0"
213
-
source = "registry+https://github.com/rust-lang/crates.io-index"
214
-
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
215
-
dependencies = [
216
-
"event-listener 5.4.0",
217
-
"event-listener-strategy",
218
-
"pin-project-lite",
219
-
]
220
-
221
-
[[package]]
222
141
name = "async-recursion"
223
142
version = "1.1.1"
224
143
source = "registry+https://github.com/rust-lang/crates.io-index"
···
228
147
"quote",
229
148
"syn",
230
149
]
231
-
232
-
[[package]]
233
-
name = "async-std"
234
-
version = "1.13.0"
235
-
source = "registry+https://github.com/rust-lang/crates.io-index"
236
-
checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615"
237
-
dependencies = [
238
-
"async-channel 1.9.0",
239
-
"async-global-executor",
240
-
"async-io",
241
-
"async-lock",
242
-
"crossbeam-utils",
243
-
"futures-channel",
244
-
"futures-core",
245
-
"futures-io",
246
-
"futures-lite",
247
-
"gloo-timers",
248
-
"kv-log-macro",
249
-
"log",
250
-
"memchr",
251
-
"once_cell",
252
-
"pin-project-lite",
253
-
"pin-utils",
254
-
"slab",
255
-
"wasm-bindgen-futures",
256
-
]
257
-
258
-
[[package]]
259
-
name = "async-task"
260
-
version = "4.7.1"
261
-
source = "registry+https://github.com/rust-lang/crates.io-index"
262
-
checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
263
150
264
151
[[package]]
265
152
name = "async-trait"
···
488
375
]
489
376
490
377
[[package]]
491
-
name = "blocking"
492
-
version = "1.6.1"
493
-
source = "registry+https://github.com/rust-lang/crates.io-index"
494
-
checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea"
495
-
dependencies = [
496
-
"async-channel 2.3.1",
497
-
"async-task",
498
-
"futures-io",
499
-
"futures-lite",
500
-
"piper",
501
-
]
502
-
503
-
[[package]]
504
378
name = "brotli"
505
379
version = "7.0.0"
506
380
source = "registry+https://github.com/rust-lang/crates.io-index"
···
727
601
]
728
602
729
603
[[package]]
730
-
name = "concurrent-queue"
731
-
version = "2.5.0"
732
-
source = "registry+https://github.com/rust-lang/crates.io-index"
733
-
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
734
-
dependencies = [
735
-
"crossbeam-utils",
736
-
]
737
-
738
-
[[package]]
739
604
name = "const-oid"
740
605
version = "0.9.6"
741
606
source = "registry+https://github.com/rust-lang/crates.io-index"
···
968
833
name = "dataloader"
969
834
version = "0.18.0"
970
835
dependencies = [
971
-
"async-std",
972
836
"futures",
973
837
"tokio",
974
838
]
···
1256
1120
]
1257
1121
1258
1122
[[package]]
1259
-
name = "event-listener"
1260
-
version = "2.5.3"
1261
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1262
-
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
1263
-
1264
-
[[package]]
1265
-
name = "event-listener"
1266
-
version = "5.4.0"
1267
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1268
-
checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae"
1269
-
dependencies = [
1270
-
"concurrent-queue",
1271
-
"parking",
1272
-
"pin-project-lite",
1273
-
]
1274
-
1275
-
[[package]]
1276
-
name = "event-listener-strategy"
1277
-
version = "0.5.3"
1278
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1279
-
checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2"
1280
-
dependencies = [
1281
-
"event-listener 5.4.0",
1282
-
"pin-project-lite",
1283
-
]
1284
-
1285
-
[[package]]
1286
1123
name = "eyre"
1287
1124
version = "0.6.12"
1288
1125
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1453
1290
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
1454
1291
1455
1292
[[package]]
1456
-
name = "futures-lite"
1457
-
version = "2.6.0"
1458
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1459
-
checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532"
1460
-
dependencies = [
1461
-
"fastrand",
1462
-
"futures-core",
1463
-
"futures-io",
1464
-
"parking",
1465
-
"pin-project-lite",
1466
-
]
1467
-
1468
-
[[package]]
1469
1293
name = "futures-macro"
1470
1294
version = "0.3.31"
1471
1295
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1564
1388
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
1565
1389
1566
1390
[[package]]
1567
-
name = "gloo-timers"
1568
-
version = "0.3.0"
1569
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1570
-
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
1571
-
dependencies = [
1572
-
"futures-channel",
1573
-
"futures-core",
1574
-
"js-sys",
1575
-
"wasm-bindgen",
1576
-
]
1577
-
1578
-
[[package]]
1579
1391
name = "group"
1580
1392
version = "0.13.0"
1581
1393
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1665
1477
version = "0.3.9"
1666
1478
source = "registry+https://github.com/rust-lang/crates.io-index"
1667
1479
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
1668
-
1669
-
[[package]]
1670
-
name = "hermit-abi"
1671
-
version = "0.4.0"
1672
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1673
-
checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc"
1674
1480
1675
1481
[[package]]
1676
1482
name = "hex"
···
2248
2054
]
2249
2055
2250
2056
[[package]]
2251
-
name = "kv-log-macro"
2252
-
version = "1.0.7"
2253
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2254
-
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
2255
-
dependencies = [
2256
-
"log",
2257
-
]
2258
-
2259
-
[[package]]
2260
2057
name = "lazy_static"
2261
2058
version = "1.5.0"
2262
2059
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2361
2158
version = "0.4.25"
2362
2159
source = "registry+https://github.com/rust-lang/crates.io-index"
2363
2160
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
2364
-
dependencies = [
2365
-
"value-bag",
2366
-
]
2367
2161
2368
2162
[[package]]
2369
2163
name = "lru-cache"
···
2655
2449
source = "registry+https://github.com/rust-lang/crates.io-index"
2656
2450
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
2657
2451
dependencies = [
2658
-
"hermit-abi 0.3.9",
2452
+
"hermit-abi",
2659
2453
"libc",
2660
2454
]
2661
2455
···
2824
2618
]
2825
2619
2826
2620
[[package]]
2827
-
name = "parking"
2828
-
version = "2.2.1"
2829
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2830
-
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
2831
-
2832
-
[[package]]
2833
2621
name = "parking_lot"
2834
2622
version = "0.11.2"
2835
2623
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2992
2780
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
2993
2781
2994
2782
[[package]]
2995
-
name = "piper"
2996
-
version = "0.2.4"
2997
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2998
-
checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066"
2999
-
dependencies = [
3000
-
"atomic-waker",
3001
-
"fastrand",
3002
-
"futures-io",
3003
-
]
3004
-
3005
-
[[package]]
3006
2783
name = "pkcs1"
3007
2784
version = "0.7.5"
3008
2785
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3028
2805
version = "0.3.31"
3029
2806
source = "registry+https://github.com/rust-lang/crates.io-index"
3030
2807
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
3031
-
3032
-
[[package]]
3033
-
name = "polling"
3034
-
version = "3.7.4"
3035
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3036
-
checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f"
3037
-
dependencies = [
3038
-
"cfg-if",
3039
-
"concurrent-queue",
3040
-
"hermit-abi 0.4.0",
3041
-
"pin-project-lite",
3042
-
"rustix",
3043
-
"tracing",
3044
-
"windows-sys 0.59.0",
3045
-
]
3046
2808
3047
2809
[[package]]
3048
2810
name = "portable-atomic"
···
4688
4450
version = "0.1.1"
4689
4451
source = "registry+https://github.com/rust-lang/crates.io-index"
4690
4452
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
4691
-
4692
-
[[package]]
4693
-
name = "value-bag"
4694
-
version = "1.10.0"
4695
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4696
-
checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2"
4697
4453
4698
4454
[[package]]
4699
4455
name = "vcpkg"
+6
-1
consumer/src/backfill/mod.rs
+6
-1
consumer/src/backfill/mod.rs
···
275
275
follows: Vec<(String, String, DateTime<Utc>)>,
276
276
list_items: Vec<(String, records::AppBskyGraphListItem)>,
277
277
verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>,
278
+
threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last.
278
279
records: Vec<(String, Cid)>,
279
280
}
280
281
281
282
impl CopyStore {
282
283
async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> {
283
284
db::copy::copy_likes(t, did, self.likes).await?;
284
-
db::copy::copy_posts(t, did, self.posts).await?;
285
285
db::copy::copy_reposts(t, did, self.reposts).await?;
286
286
db::copy::copy_blocks(t, did, self.blocks).await?;
287
287
db::copy::copy_follows(t, did, self.follows).await?;
288
288
db::copy::copy_list_items(t, self.list_items).await?;
289
289
db::copy::copy_verification(t, did, self.verifications).await?;
290
+
db::copy::copy_posts(t, did, self.posts).await?;
291
+
for (at_uri, cid, record) in self.threadgates {
292
+
db::threadgate_enforce_backfill(t, did, &record).await?;
293
+
db::threadgate_upsert(t, &at_uri, cid, record).await?;
294
+
}
290
295
db::copy::copy_records(t, did, self.records).await?;
291
296
292
297
Ok(())
+11
-1
consumer/src/backfill/repo.rs
+11
-1
consumer/src/backfill/repo.rs
···
4
4
};
5
5
use crate::indexer::records;
6
6
use crate::indexer::types::{AggregateDeltaStore, RecordTypes};
7
+
use crate::utils::at_uri_is_by;
7
8
use crate::{db, indexer};
8
9
use deadpool_postgres::Transaction;
9
10
use ipld_core::cid::Cid;
···
144
145
db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?;
145
146
}
146
147
if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) {
147
-
db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?;
148
+
db::post_embed_insert(t, &at_uri, embed, rec.created_at, true).await?;
148
149
}
149
150
150
151
deltas.incr(did, AggregateType::ProfilePost).await;
···
165
166
copies
166
167
.reposts
167
168
.push((rkey.to_string(), rec.subject, rec.via, rec.created_at));
169
+
}
170
+
RecordTypes::AppBskyFeedThreadgate(record) => {
171
+
if !at_uri_is_by(&record.post, did) {
172
+
tracing::warn!("tried to create a threadgate on a post we don't control!");
173
+
return Ok(());
174
+
}
175
+
176
+
copies.push_record(&at_uri, cid);
177
+
copies.threadgates.push((at_uri, cid, record));
168
178
}
169
179
RecordTypes::AppBskyGraphBlock(rec) => {
170
180
copies.push_record(&at_uri, cid);
+38
-3
consumer/src/db/copy.rs
+38
-3
consumer/src/db/copy.rs
···
1
1
use super::PgExecResult;
2
2
use crate::indexer::records;
3
-
use crate::utils::strongref_to_parts;
3
+
use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts};
4
4
use chrono::prelude::*;
5
5
use deadpool_postgres::Transaction;
6
6
use futures::pin_mut;
···
119
119
.await
120
120
}
121
121
122
-
const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)";
122
+
const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)";
123
123
const POST_TYPES: &[Type] = &[
124
124
Type::TEXT,
125
125
Type::TEXT,
···
135
135
Type::TEXT,
136
136
Type::TEXT,
137
137
Type::TEXT,
138
+
Type::TEXT_ARRAY,
138
139
Type::TIMESTAMP,
139
140
];
140
141
pub async fn copy_posts(
···
159
160
160
161
for (at_uri, cid, post) in data {
161
162
let record = serde_json::to_value(&post).unwrap();
163
+
let (mentions, tags) = post
164
+
.facets
165
+
.as_ref()
166
+
.map(|v| extract_mentions_and_tags(v))
167
+
.unzip();
162
168
let facets = post.facets.and_then(|v| serde_json::to_value(v).ok());
163
169
let embed = post.embed.as_ref().map(|v| v.as_str());
164
170
let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype());
165
171
let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent));
166
172
let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root));
173
+
174
+
let tags = merge_tags(tags, post.tags);
167
175
168
176
let writer = writer.as_mut();
169
177
writer
···
175
183
&post.text,
176
184
&facets,
177
185
&post.langs.unwrap_or_default(),
178
-
&post.tags.unwrap_or_default(),
186
+
&tags,
179
187
&parent_uri,
180
188
&parent_cid,
181
189
&root_uri,
182
190
&root_cid,
183
191
&embed,
184
192
&embed_subtype,
193
+
&mentions,
185
194
&post.created_at.naive_utc(),
186
195
])
187
196
.await?;
188
197
}
189
198
190
199
writer.finish().await?;
200
+
201
+
let threadgated: Vec<(String, String, DateTime<Utc>)> = conn
202
+
.query(
203
+
"SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL",
204
+
&[],
205
+
)
206
+
.await?
207
+
.into_iter()
208
+
.map(|v| (v.get(0), v.get(1), v.get(2))).collect();
209
+
210
+
for (root, post, created_at) in threadgated {
211
+
match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
212
+
Ok(true) => {
213
+
conn.execute(
214
+
"UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1",
215
+
&[&post],
216
+
)
217
+
.await?;
218
+
}
219
+
Ok(false) => continue,
220
+
Err(e) => {
221
+
tracing::error!("failed to check threadgate enforcement: {e}");
222
+
continue;
223
+
}
224
+
}
225
+
}
191
226
192
227
conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[])
193
228
.await
+208
consumer/src/db/gates.rs
+208
consumer/src/db/gates.rs
···
1
+
use super::{PgExecResult, PgResult};
2
+
use crate::indexer::records::{
3
+
AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING,
4
+
THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION,
5
+
};
6
+
use chrono::prelude::*;
7
+
use chrono::{DateTime, Utc};
8
+
use deadpool_postgres::GenericClient;
9
+
use std::collections::HashSet;
10
+
11
+
pub async fn post_enforce_threadgate<C: GenericClient>(
12
+
conn: &mut C,
13
+
root: &str,
14
+
post_author: &str,
15
+
post_created_at: DateTime<Utc>,
16
+
is_backfill: bool,
17
+
) -> PgResult<bool> {
18
+
// check if the root and the current post are the same author
19
+
// strip "at://" then break into parts by '/'
20
+
let parts = root[5..].split('/').collect::<Vec<_>>();
21
+
let root_author = parts[0];
22
+
if root_author == post_author {
23
+
return Ok(false);
24
+
}
25
+
26
+
let tg_data = super::threadgate_get(conn, root).await?;
27
+
28
+
let Some((created_at, allow, allow_lists)) = tg_data else {
29
+
return Ok(false);
30
+
};
31
+
32
+
// when backfilling, there's no point continuing if the record is dated before the threadgate
33
+
if is_backfill && post_created_at < created_at {
34
+
return Ok(false);
35
+
}
36
+
37
+
if allow.is_empty() {
38
+
return Ok(true);
39
+
}
40
+
41
+
let allow: HashSet<String> = HashSet::from_iter(allow);
42
+
43
+
if allow.contains(THREADGATE_RULE_FOLLOWER) || allow.contains(THREADGATE_RULE_FOLLOWING) {
44
+
let profile_state: Option<(bool, bool)> = conn
45
+
.query_opt(
46
+
"SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2",
47
+
&[&root_author, &post_author],
48
+
)
49
+
.await?
50
+
.map(|v| (v.get(0), v.get(1)));
51
+
52
+
if let Some((following, followed)) = profile_state {
53
+
if allow.contains(THREADGATE_RULE_FOLLOWER) && followed {
54
+
return Ok(false);
55
+
}
56
+
57
+
if allow.contains(THREADGATE_RULE_FOLLOWING) && following {
58
+
return Ok(false);
59
+
}
60
+
}
61
+
}
62
+
63
+
// check mentions
64
+
if allow.contains(THREADGATE_RULE_MENTION) {
65
+
let mentions: Vec<String> = conn
66
+
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
67
+
.await?
68
+
.map(|r| r.get(0))
69
+
.unwrap_or_default();
70
+
71
+
if mentions.contains(&post_author.to_owned()) {
72
+
return Ok(false);
73
+
}
74
+
}
75
+
76
+
if allow.contains(THREADGATE_RULE_LIST) {
77
+
if allow_lists.is_empty() {
78
+
return Ok(true);
79
+
}
80
+
81
+
let count: i64 = conn
82
+
.query_one(
83
+
"SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2",
84
+
&[&allow_lists, &post_author],
85
+
)
86
+
.await?
87
+
.get(0);
88
+
if count != 0 {
89
+
return Ok(false);
90
+
}
91
+
}
92
+
93
+
Ok(true)
94
+
}
95
+
96
+
pub async fn postgate_maintain_detaches<C: GenericClient>(
97
+
conn: &mut C,
98
+
post: &str,
99
+
detached: &[String],
100
+
disable_effective: Option<NaiveDateTime>,
101
+
) -> PgExecResult {
102
+
conn.execute(
103
+
"SELECT maintain_postgates($1, $2, $3)",
104
+
&[&post, &detached, &disable_effective],
105
+
)
106
+
.await
107
+
}
108
+
109
+
// variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB
110
+
pub async fn threadgate_enforce_backfill<C: GenericClient>(
111
+
conn: &mut C,
112
+
root_author: &str,
113
+
threadgate: &AppBskyFeedThreadgate,
114
+
) -> PgExecResult {
115
+
// pull out allow - if it's None we can skip this gate.
116
+
let Some(allow) = threadgate.allow.as_ref() else {
117
+
return Ok(0);
118
+
};
119
+
120
+
let root = &threadgate.post;
121
+
122
+
if allow.is_empty() {
123
+
// blind update everything
124
+
return conn.execute(
125
+
"UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3",
126
+
&[&root, &root_author, &threadgate.created_at],
127
+
).await;
128
+
}
129
+
130
+
// pull authors with our root_uri where the author is not the root author and are dated after created_at
131
+
// this is mutable because we'll remove ALLOWED dids
132
+
let mut dids: HashSet<String> = conn
133
+
.query(
134
+
"SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3",
135
+
&[&root, &root_author, &threadgate.created_at],
136
+
)
137
+
.await?
138
+
.into_iter()
139
+
.map(|row| row.get(0))
140
+
.collect();
141
+
142
+
// this will be empty if there are no replies.
143
+
if dids.is_empty() {
144
+
return Ok(0);
145
+
}
146
+
147
+
let allowed_lists = allow
148
+
.iter()
149
+
.filter_map(|rule| match rule {
150
+
ThreadgateRule::List { list } => Some(list),
151
+
_ => None,
152
+
})
153
+
.collect::<Vec<_>>();
154
+
155
+
let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str()));
156
+
157
+
if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() {
158
+
let current_dids: Vec<_> = dids.iter().collect();
159
+
160
+
let res = conn.query(
161
+
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL",
162
+
&[&root_author, ¤t_dids]
163
+
).await?;
164
+
165
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
166
+
}
167
+
168
+
if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() {
169
+
let current_dids: Vec<_> = dids.iter().collect();
170
+
171
+
let res = conn.query(
172
+
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL",
173
+
&[&root_author, ¤t_dids]
174
+
).await?;
175
+
176
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
177
+
}
178
+
179
+
if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() {
180
+
let mentions: Vec<String> = conn
181
+
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
182
+
.await?
183
+
.map(|r| r.get(0))
184
+
.unwrap_or_default();
185
+
186
+
dids = &dids - &HashSet::from_iter(mentions);
187
+
}
188
+
189
+
if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() {
190
+
let current_dids: Vec<_> = dids.iter().collect();
191
+
192
+
let res = conn
193
+
.query(
194
+
"SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)",
195
+
&[&allowed_lists, ¤t_dids],
196
+
)
197
+
.await?;
198
+
199
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
200
+
}
201
+
202
+
let dids = dids.into_iter().collect::<Vec<_>>();
203
+
204
+
conn.execute(
205
+
"UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3",
206
+
&[&threadgate.post, &dids, &threadgate.created_at]
207
+
).await
208
+
}
+2
consumer/src/db/mod.rs
+2
consumer/src/db/mod.rs
+85
-41
consumer/src/db/record.rs
+85
-41
consumer/src/db/record.rs
···
1
1
use super::{PgExecResult, PgOptResult, PgResult};
2
2
use crate::indexer::records::*;
3
-
use crate::utils::{blob_ref, strongref_to_parts};
3
+
use crate::utils::{blob_ref, extract_mentions_and_tags, merge_tags, strongref_to_parts};
4
4
use chrono::prelude::*;
5
5
use deadpool_postgres::GenericClient;
6
6
use ipld_core::cid::Cid;
7
7
use lexica::community_lexicon::bookmarks::Bookmark;
8
+
use std::collections::HashSet;
8
9
9
10
pub async fn record_upsert<C: GenericClient>(
10
11
conn: &mut C,
···
317
318
repo: &str,
318
319
cid: Cid,
319
320
rec: AppBskyFeedPost,
321
+
is_backfill: bool,
320
322
) -> PgExecResult {
321
323
let cid = cid.to_string();
322
324
let record = serde_json::to_value(&rec).unwrap();
325
+
let (mentions, tags) = rec
326
+
.facets
327
+
.as_ref()
328
+
.map(|v| extract_mentions_and_tags(v))
329
+
.unzip();
323
330
let facets = rec.facets.and_then(|v| serde_json::to_value(v).ok());
324
331
let (parent_uri, parent_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.parent));
325
332
let (root_uri, root_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.root));
326
333
let embed = rec.embed.as_ref().map(|v| v.as_str());
327
334
let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype());
328
335
336
+
// if there is a root, we need to check for the presence of a threadgate.
337
+
let violates_threadgate = match &root_uri {
338
+
Some(root) => {
339
+
super::post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await?
340
+
}
341
+
None => false,
342
+
};
343
+
344
+
let tags = merge_tags(tags, rec.tags);
345
+
329
346
let count = conn
330
347
.execute(
331
348
include_str!("sql/post_insert.sql"),
···
337
354
&rec.text,
338
355
&facets,
339
356
&rec.langs.unwrap_or_default(),
340
-
&rec.tags.unwrap_or_default(),
357
+
&tags,
341
358
&parent_uri,
342
359
&parent_cid,
343
360
&root_uri,
344
361
&root_cid,
345
362
&embed,
346
363
&embed_subtype,
364
+
&mentions,
365
+
&violates_threadgate,
347
366
&rec.created_at,
348
367
],
349
368
)
350
369
.await?;
351
370
352
371
if let Some(embed) = rec.embed.and_then(|embed| embed.into_bsky()) {
353
-
post_embed_insert(conn, at_uri, embed, rec.created_at).await?;
372
+
post_embed_insert(conn, at_uri, embed, rec.created_at, is_backfill).await?;
354
373
}
355
374
356
375
Ok(count)
···
380
399
post: &str,
381
400
embed: AppBskyEmbed,
382
401
created_at: DateTime<Utc>,
402
+
is_backfill: bool,
383
403
) -> PgExecResult {
384
404
match embed {
385
405
AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await,
386
406
AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await,
387
407
AppBskyEmbed::External(embed) => post_embed_external_insert(conn, post, embed).await,
388
408
AppBskyEmbed::Record(embed) => {
389
-
post_embed_record_insert(conn, post, embed, created_at).await
409
+
post_embed_record_insert(conn, post, embed, created_at, is_backfill).await
390
410
}
391
411
AppBskyEmbed::RecordWithMedia(embed) => {
392
-
post_embed_record_insert(conn, post, embed.record, created_at).await?;
412
+
post_embed_record_insert(conn, post, embed.record, created_at, is_backfill).await?;
393
413
match *embed.media {
394
414
AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await,
395
415
AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await,
···
476
496
).await
477
497
}
478
498
499
+
const PG_DISABLE_RULE: &str = "app.bsky.feed.postgate#disableRule";
479
500
async fn post_embed_record_insert<C: GenericClient>(
480
501
conn: &mut C,
481
502
post: &str,
482
503
embed: AppBskyEmbedRecord,
483
504
post_created_at: DateTime<Utc>,
505
+
is_backfill: bool,
484
506
) -> PgExecResult {
485
507
// strip "at://" then break into parts by '/'
486
508
let parts = embed.record.uri[5..].split('/').collect::<Vec<_>>();
487
509
488
510
let detached = if parts[1] == "app.bsky.feed.post" {
489
-
let postgate_effective: Option<DateTime<Utc>> = conn
490
-
.query_opt(
491
-
"SELECT created_at FROM postgates WHERE post_uri=$1",
492
-
&[&post],
493
-
)
494
-
.await?
495
-
.map(|v| v.get(0));
511
+
let pg_data = postgate_get(conn, post).await?;
496
512
497
-
postgate_effective
498
-
.map(|v| Utc::now().min(post_created_at) > v)
499
-
.unwrap_or_default()
513
+
if let Some((effective, detached, rules)) = pg_data {
514
+
let detached: HashSet<String> = HashSet::from_iter(detached);
515
+
let rules: HashSet<String> = HashSet::from_iter(rules);
516
+
let compare_date = match is_backfill {
517
+
true => post_created_at,
518
+
false => Utc::now(),
519
+
};
520
+
521
+
detached.contains(post) || (rules.contains(PG_DISABLE_RULE) && compare_date > effective)
522
+
} else {
523
+
false
524
+
}
500
525
} else {
501
526
false
502
527
};
···
507
532
).await
508
533
}
509
534
535
+
async fn postgate_get<C: GenericClient>(
536
+
conn: &mut C,
537
+
post: &str,
538
+
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
539
+
let res = conn
540
+
.query_opt(
541
+
"SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
542
+
&[&post],
543
+
)
544
+
.await?
545
+
.map(|v| (v.get(0), v.get(1), v.get(2)));
546
+
547
+
Ok(res)
548
+
}
549
+
510
550
pub async fn postgate_upsert<C: GenericClient>(
511
551
conn: &mut C,
512
552
at_uri: &str,
···
536
576
pub async fn postgate_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
537
577
conn.execute("DELETE FROM postgates WHERE at_uri=$1", &[&at_uri])
538
578
.await
539
-
}
540
-
541
-
pub async fn postgate_maintain_detaches<C: GenericClient>(
542
-
conn: &mut C,
543
-
post: &str,
544
-
detached: &[String],
545
-
disable_effective: Option<NaiveDateTime>,
546
-
) -> PgExecResult {
547
-
conn.execute(
548
-
"SELECT maintain_postgates($1, $2, $3)",
549
-
&[&post, &detached, &disable_effective],
550
-
)
551
-
.await
552
579
}
553
580
554
581
pub async fn profile_upsert<C: GenericClient>(
···
700
727
.await
701
728
}
702
729
730
+
pub async fn threadgate_get<C: GenericClient>(
731
+
conn: &mut C,
732
+
post: &str,
733
+
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
734
+
let res = conn
735
+
.query_opt(
736
+
"SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL",
737
+
&[&post],
738
+
)
739
+
.await?
740
+
.map(|v| (v.get(0), v.get(1), v.get(2)));
741
+
742
+
Ok(res)
743
+
}
744
+
703
745
pub async fn threadgate_upsert<C: GenericClient>(
704
746
conn: &mut C,
705
747
at_uri: &str,
···
708
750
) -> PgExecResult {
709
751
let record = serde_json::to_value(&rec).unwrap();
710
752
711
-
let allowed_lists = rec
712
-
.allow
713
-
.iter()
714
-
.filter_map(|rule| match rule {
715
-
ThreadgateRule::List { list } => Some(list.clone()),
716
-
_ => None,
717
-
})
718
-
.collect::<Vec<_>>();
753
+
let allowed_lists = rec.allow.as_ref().map(|allow| {
754
+
allow
755
+
.iter()
756
+
.filter_map(|rule| match rule {
757
+
ThreadgateRule::List { list } => Some(list.clone()),
758
+
_ => None,
759
+
})
760
+
.collect::<Vec<_>>()
761
+
});
719
762
720
-
let allow = rec
721
-
.allow
722
-
.into_iter()
723
-
.map(|v| v.as_str().to_string())
724
-
.collect::<Vec<_>>();
763
+
let allow = rec.allow.map(|allow| {
764
+
allow
765
+
.into_iter()
766
+
.map(|v| v.as_str().to_string())
767
+
.collect::<Vec<_>>()
768
+
});
725
769
726
770
conn.execute(
727
771
include_str!("sql/threadgate_upsert.sql"),
+2
-2
consumer/src/db/sql/post_insert.sql
+2
-2
consumer/src/db/sql/post_insert.sql
···
1
1
INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri,
2
-
root_cid, embed, embed_subtype, created_at)
3
-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
2
+
root_cid, embed, embed_subtype, mentions, violates_threadgate, created_at)
3
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
4
4
ON CONFLICT DO NOTHING
+1
-1
consumer/src/indexer/mod.rs
+1
-1
consumer/src/indexer/mod.rs
···
625
625
});
626
626
627
627
let labels = record.labels.clone();
628
-
db::post_insert(conn, at_uri, repo, cid, record).await?;
628
+
db::post_insert(conn, at_uri, repo, cid, record, false).await?;
629
629
if let Some(labels) = labels {
630
630
db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?;
631
631
}
+10
-6
consumer/src/indexer/records.rs
+10
-6
consumer/src/indexer/records.rs
···
267
267
pub struct AppBskyFeedThreadgate {
268
268
pub post: String,
269
269
pub created_at: DateTime<Utc>,
270
-
#[serde(default)]
271
-
pub allow: Vec<ThreadgateRule>,
270
+
pub allow: Option<Vec<ThreadgateRule>>,
272
271
#[serde(default)]
273
272
pub hidden_replies: Vec<String>,
274
273
}
274
+
275
+
pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule";
276
+
pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule";
277
+
pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule";
278
+
pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule";
275
279
276
280
#[derive(Debug, Deserialize, Serialize)]
277
281
#[serde(tag = "$type")]
···
289
293
impl ThreadgateRule {
290
294
pub fn as_str(&self) -> &'static str {
291
295
match self {
292
-
ThreadgateRule::Mention => "app.bsky.feed.threadgate#mentionRule",
293
-
ThreadgateRule::Follower => "app.bsky.feed.threadgate#followerRule",
294
-
ThreadgateRule::Following => "app.bsky.feed.threadgate#followingRule",
295
-
ThreadgateRule::List { .. } => "app.bsky.feed.threadgate#listRule",
296
+
ThreadgateRule::Mention => THREADGATE_RULE_MENTION,
297
+
ThreadgateRule::Follower => THREADGATE_RULE_FOLLOWER,
298
+
ThreadgateRule::Following => THREADGATE_RULE_FOLLOWING,
299
+
ThreadgateRule::List { .. } => THREADGATE_RULE_LIST,
296
300
}
297
301
}
298
302
}
+31
consumer/src/utils.rs
+31
consumer/src/utils.rs
···
1
+
use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter};
1
2
use lexica::{Blob, StrongRef};
2
3
use serde::{Deserialize, Deserializer};
3
4
···
39
40
40
41
did == split_aturi[2]
41
42
}
43
+
44
+
pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec<String>, Vec<String>) {
45
+
let (mentions, tags) = from
46
+
.iter()
47
+
.flat_map(|v| {
48
+
v.features.iter().map(|facet| match facet {
49
+
FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None),
50
+
FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)),
51
+
_ => (None, None),
52
+
})
53
+
})
54
+
.unzip::<_, _, Vec<_>, Vec<_>>();
55
+
56
+
let mentions = mentions.into_iter().flatten().cloned().collect();
57
+
let tags = tags.into_iter().flatten().cloned().collect();
58
+
59
+
(mentions, tags)
60
+
}
61
+
62
+
pub fn merge_tags<T>(t1: Option<Vec<T>>, t2: Option<Vec<T>>) -> Vec<T> {
63
+
match (t1, t2) {
64
+
(Some(t1), None) => t1,
65
+
(None, Some(t2)) => t2,
66
+
(Some(mut t1), Some(t2)) => {
67
+
t1.extend(t2);
68
+
t1
69
+
}
70
+
_ => Vec::default(),
71
+
}
72
+
}
+2
-12
dataloader-rs/Cargo.toml
+2
-12
dataloader-rs/Cargo.toml
···
2
2
name = "dataloader"
3
3
version = "0.18.0"
4
4
edition = "2021"
5
-
authors = ["cksac <cs.cksac@gmail.com>", "Lily"]
5
+
authors = ["cksac <cs.cksac@gmail.com>", "Mia"]
6
6
description = "Rust implementation of Facebook's DataLoader using async-await."
7
7
keywords = ["batcher", "dataloader", "cache"]
8
8
categories = ["asynchronous", "caching"]
···
15
15
[badges]
16
16
travis-ci = { repository = "/cksac/dataloader-rs" }
17
17
18
-
[features]
19
-
default = ["runtime-async-std"]
20
-
runtime-async-std = [
21
-
"async-std",
22
-
]
23
-
runtime-tokio = [
24
-
"tokio"
25
-
]
26
-
27
18
[dependencies]
28
-
async-std = { version = "1", optional = true }
29
-
tokio = { version = "1", features = [ "sync", "rt" ], optional = true }
19
+
tokio = { version = "1", features = [ "sync", "rt" ] }
30
20
31
21
[dev-dependencies]
32
22
futures = "0.3"
-13
dataloader-rs/src/runtime.rs
-13
dataloader-rs/src/runtime.rs
···
1
-
// runtime-async-std
2
-
#[cfg(feature = "runtime-async-std")]
3
-
pub type Arc<T> = async_std::sync::Arc<T>;
4
-
5
-
#[cfg(feature = "runtime-async-std")]
6
-
pub type Mutex<T> = async_std::sync::Mutex<T>;
7
-
8
-
#[cfg(feature = "runtime-async-std")]
9
-
pub use async_std::task::yield_now;
10
-
11
1
// runtime-tokio
12
-
#[cfg(feature = "runtime-tokio")]
13
2
pub type Arc<T> = std::sync::Arc<T>;
14
3
15
-
#[cfg(feature = "runtime-tokio")]
16
4
pub type Mutex<T> = tokio::sync::Mutex<T>;
17
5
18
-
#[cfg(feature = "runtime-tokio")]
19
6
pub use tokio::task::yield_now;
+4
lexica/src/app_bsky/actor.rs
+4
lexica/src/app_bsky/actor.rs
···
160
160
pub verification: Option<VerificationState>,
161
161
#[serde(skip_serializing_if = "Option::is_none")]
162
162
pub status: Option<StatusView>,
163
+
#[serde(skip_serializing_if = "Option::is_none")]
164
+
pub pronouns: Option<String>,
163
165
164
166
pub created_at: DateTime<Utc>,
165
167
}
···
186
188
pub verification: Option<VerificationState>,
187
189
#[serde(skip_serializing_if = "Option::is_none")]
188
190
pub status: Option<StatusView>,
191
+
#[serde(skip_serializing_if = "Option::is_none")]
192
+
pub pronouns: Option<String>,
189
193
190
194
pub created_at: DateTime<Utc>,
191
195
pub indexed_at: NaiveDateTime,
+1
lexica/src/app_bsky/mod.rs
+1
lexica/src/app_bsky/mod.rs
+33
lexica/src/app_bsky/unspecced.rs
+33
lexica/src/app_bsky/unspecced.rs
···
1
+
use crate::app_bsky::feed::{BlockedAuthor, PostView};
2
+
use serde::Serialize;
3
+
4
+
#[derive(Clone, Debug, Serialize)]
5
+
pub struct ThreadV2Item {
6
+
pub uri: String,
7
+
pub depth: i32,
8
+
pub value: ThreadV2ItemType,
9
+
}
10
+
11
+
#[derive(Clone, Debug, Serialize)]
12
+
#[serde(tag = "$type")]
13
+
pub enum ThreadV2ItemType {
14
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemPost")]
15
+
Post(ThreadItemPost),
16
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")]
17
+
NoUnauthenticated {},
18
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")]
19
+
NotFound {},
20
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")]
21
+
Blocked { author: BlockedAuthor },
22
+
}
23
+
24
+
#[derive(Clone, Debug, Serialize)]
25
+
#[serde(rename_all = "camelCase")]
26
+
pub struct ThreadItemPost {
27
+
pub post: PostView,
28
+
pub more_parents: bool,
29
+
pub more_replies: i32,
30
+
pub op_thread: bool,
31
+
pub hidden_by_threadgate: bool,
32
+
pub muted_by_viewer: bool,
33
+
}
+2
-2
migrations/2025-02-16-142357_posts/up.sql
+2
-2
migrations/2025-02-16-142357_posts/up.sql
+15
migrations/2025-09-27-171241_post-tweaks/down.sql
+15
migrations/2025-09-27-171241_post-tweaks/down.sql
···
1
+
alter table posts
2
+
drop column mentions,
3
+
drop column violates_threadgate;
4
+
5
+
drop trigger t_author_feed_ins_post on posts;
6
+
drop trigger t_author_feed_del_post on posts;
7
+
drop trigger t_author_feed_ins_repost on reposts;
8
+
drop trigger t_author_feed_del_repost on reposts;
9
+
10
+
drop function f_author_feed_ins_post;
11
+
drop function f_author_feed_del_post;
12
+
drop function f_author_feed_ins_repost;
13
+
drop function f_author_feed_del_repost;
14
+
15
+
drop table author_feeds;
+79
migrations/2025-09-27-171241_post-tweaks/up.sql
+79
migrations/2025-09-27-171241_post-tweaks/up.sql
···
1
+
alter table posts
2
+
add column mentions text[],
3
+
add column violates_threadgate bool not null default false;
4
+
5
+
create table author_feeds
6
+
(
7
+
uri text primary key,
8
+
cid text not null,
9
+
post text not null,
10
+
did text not null,
11
+
typ text not null,
12
+
sort_at timestamptz not null
13
+
);
14
+
15
+
-- author_feeds post triggers
16
+
create function f_author_feed_ins_post() returns trigger
17
+
language plpgsql as
18
+
$$
19
+
begin
20
+
insert into author_feeds (uri, cid, post, did, typ, sort_at)
21
+
VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at)
22
+
on conflict do nothing;
23
+
return NEW;
24
+
end;
25
+
$$;
26
+
27
+
create trigger t_author_feed_ins_post
28
+
before insert
29
+
on posts
30
+
for each row
31
+
execute procedure f_author_feed_ins_post();
32
+
33
+
create function f_author_feed_del_post() returns trigger
34
+
language plpgsql as
35
+
$$
36
+
begin
37
+
delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post';
38
+
return OLD;
39
+
end;
40
+
$$;
41
+
42
+
create trigger t_author_feed_del_post
43
+
before delete
44
+
on posts
45
+
for each row
46
+
execute procedure f_author_feed_del_post();
47
+
48
+
-- author_feeds repost triggers
49
+
create function f_author_feed_ins_repost() returns trigger
50
+
language plpgsql as
51
+
$$
52
+
begin
53
+
insert into author_feeds (uri, cid, post, did, typ, sort_at)
54
+
VALUES ('at://' || NEW.did || 'app.bsky.feed.repost' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at)
55
+
on conflict do nothing;
56
+
return NEW;
57
+
end;
58
+
$$;
59
+
60
+
create trigger t_author_feed_ins_repost
61
+
before insert
62
+
on reposts
63
+
for each row
64
+
execute procedure f_author_feed_ins_repost();
65
+
66
+
create function f_author_feed_del_repost() returns trigger
67
+
language plpgsql as
68
+
$$
69
+
begin
70
+
delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost';
71
+
return OLD;
72
+
end;
73
+
$$;
74
+
75
+
create trigger t_author_feed_del_repost
76
+
before delete
77
+
on reposts
78
+
for each row
79
+
execute procedure f_author_feed_del_repost();
+1
-1
parakeet/Cargo.toml
+1
-1
parakeet/Cargo.toml
···
9
9
axum-extra = { version = "0.10.0", features = ["query", "typed-header"] }
10
10
base64 = "0.22"
11
11
chrono = { version = "0.4.39", features = ["serde"] }
12
-
dataloader = { path = "../dataloader-rs", default-features = false, features = ["runtime-tokio"] }
12
+
dataloader = { path = "../dataloader-rs" }
13
13
deadpool = { version = "0.12.1", features = ["managed"] }
14
14
did-resolver = { path = "../did-resolver" }
15
15
diesel = { version = "2.2.6", features = ["chrono", "serde_json"] }
+95
-1
parakeet/src/db.rs
+95
-1
parakeet/src/db.rs
···
1
1
use diesel::prelude::*;
2
-
use diesel::sql_types::{Array, Bool, Nullable, Text};
2
+
use diesel::sql_types::{Array, Bool, Integer, Nullable, Text};
3
3
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4
4
use parakeet_db::{schema, types};
5
+
use parakeet_db::models::TextArray;
5
6
6
7
pub async fn get_actor_status(
7
8
conn: &mut AsyncPgConnection,
···
196
197
.await
197
198
.optional()
198
199
}
200
+
201
+
#[derive(Debug, QueryableByName)]
202
+
#[diesel(check_for_backend(diesel::pg::Pg))]
203
+
#[allow(unused)]
204
+
pub struct ThreadItem {
205
+
#[diesel(sql_type = Text)]
206
+
pub at_uri: String,
207
+
#[diesel(sql_type = Nullable<Text>)]
208
+
pub parent_uri: Option<String>,
209
+
#[diesel(sql_type = Nullable<Text>)]
210
+
pub root_uri: Option<String>,
211
+
#[diesel(sql_type = Integer)]
212
+
pub depth: i32,
213
+
}
214
+
215
+
pub async fn get_thread_children(
216
+
conn: &mut AsyncPgConnection,
217
+
uri: &str,
218
+
depth: i32,
219
+
) -> QueryResult<Vec<ThreadItem>> {
220
+
diesel::sql_query(include_str!("sql/thread.sql"))
221
+
.bind::<Text, _>(uri)
222
+
.bind::<Integer, _>(depth)
223
+
.load(conn)
224
+
.await
225
+
}
226
+
227
+
pub async fn get_thread_children_branching(
228
+
conn: &mut AsyncPgConnection,
229
+
uri: &str,
230
+
depth: i32,
231
+
branching_factor: i32,
232
+
) -> QueryResult<Vec<ThreadItem>> {
233
+
diesel::sql_query(include_str!("sql/thread_branching.sql"))
234
+
.bind::<Text, _>(uri)
235
+
.bind::<Integer, _>(depth)
236
+
.bind::<Integer, _>(branching_factor)
237
+
.load(conn)
238
+
.await
239
+
}
240
+
241
+
#[derive(Debug, QueryableByName)]
242
+
#[diesel(check_for_backend(diesel::pg::Pg))]
243
+
pub struct HiddenThreadChildItem {
244
+
#[diesel(sql_type = Text)]
245
+
pub at_uri: String,
246
+
}
247
+
248
+
pub async fn get_thread_children_hidden(
249
+
conn: &mut AsyncPgConnection,
250
+
uri: &str,
251
+
root: &str,
252
+
) -> QueryResult<Vec<HiddenThreadChildItem>> {
253
+
diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql"))
254
+
.bind::<Text, _>(uri)
255
+
.bind::<Text, _>(root)
256
+
.load(conn)
257
+
.await
258
+
}
259
+
260
+
pub async fn get_thread_parents(
261
+
conn: &mut AsyncPgConnection,
262
+
uri: &str,
263
+
height: i32,
264
+
) -> QueryResult<Vec<ThreadItem>> {
265
+
diesel::sql_query(include_str!("sql/thread_parent.sql"))
266
+
.bind::<Text, _>(uri)
267
+
.bind::<Integer, _>(height)
268
+
.load(conn)
269
+
.await
270
+
}
271
+
272
+
pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> {
273
+
schema::posts::table
274
+
.select(schema::posts::root_uri)
275
+
.find(&uri)
276
+
.get_result(conn)
277
+
.await
278
+
.optional()
279
+
.map(|v| v.flatten())
280
+
}
281
+
282
+
pub async fn get_threadgate_hiddens(
283
+
conn: &mut AsyncPgConnection,
284
+
uri: &str,
285
+
) -> QueryResult<Option<TextArray>> {
286
+
schema::threadgates::table
287
+
.select(schema::threadgates::hidden_replies)
288
+
.find(&uri)
289
+
.get_result(conn)
290
+
.await
291
+
.optional()
292
+
}
+5
-9
parakeet/src/hydration/labeler.rs
+5
-9
parakeet/src/hydration/labeler.rs
···
42
42
likes: Option<i32>,
43
43
) -> LabelerViewDetailed {
44
44
let reason_types = labeler.reasons.map(|v| {
45
-
v.into_iter()
46
-
.flatten()
47
-
.filter_map(|v| ReasonType::from_str(&v).ok())
45
+
v.iter()
46
+
.filter_map(|v| ReasonType::from_str(v).ok())
48
47
.collect()
49
48
});
50
49
···
74
73
})
75
74
.collect();
76
75
let subject_types = labeler.subject_types.map(|v| {
77
-
v.into_iter()
78
-
.flatten()
79
-
.filter_map(|v| SubjectType::from_str(&v).ok())
76
+
v.iter()
77
+
.filter_map(|v| SubjectType::from_str(v).ok())
80
78
.collect()
81
79
});
82
-
let subject_collections = labeler
83
-
.subject_collections
84
-
.map(|v| v.into_iter().flatten().collect());
80
+
let subject_collections = labeler.subject_collections.map(Vec::from);
85
81
86
82
LabelerViewDetailed {
87
83
uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
+8
parakeet/src/hydration/list.rs
+8
parakeet/src/hydration/list.rs
···
78
78
}
79
79
80
80
pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> {
81
+
if lists.is_empty() {
82
+
return HashMap::new();
83
+
}
84
+
81
85
let labels = self.get_label_many(&lists).await;
82
86
let viewers = self.get_list_viewer_states(&lists).await;
83
87
let lists = self.loaders.list.load_many(lists).await;
···
103
107
}
104
108
105
109
pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> {
110
+
if lists.is_empty() {
111
+
return HashMap::new();
112
+
}
113
+
106
114
let labels = self.get_label_many(&lists).await;
107
115
let viewers = self.get_list_viewer_states(&lists).await;
108
116
let lists = self.loaders.list.load_many(lists).await;
+173
-92
parakeet/src/hydration/posts.rs
+173
-92
parakeet/src/hydration/posts.rs
···
3
3
use lexica::app_bsky::actor::ProfileViewBasic;
4
4
use lexica::app_bsky::embed::Embed;
5
5
use lexica::app_bsky::feed::{
6
-
BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView,
6
+
BlockedAuthor, FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView, PostViewerState,
7
+
ReplyRef, ReplyRefPost, ThreadgateView,
7
8
};
8
9
use lexica::app_bsky::graph::ListViewBasic;
9
10
use lexica::app_bsky::RecordStats;
···
32
33
}
33
34
}
34
35
36
+
type HydratePostsRet = (
37
+
models::Post,
38
+
ProfileViewBasic,
39
+
Vec<models::Label>,
40
+
Option<Embed>,
41
+
Option<ThreadgateView>,
42
+
Option<PostViewerState>,
43
+
Option<PostStats>,
44
+
);
45
+
35
46
fn build_postview(
36
-
post: models::Post,
37
-
author: ProfileViewBasic,
38
-
labels: Vec<models::Label>,
39
-
embed: Option<Embed>,
40
-
threadgate: Option<ThreadgateView>,
41
-
viewer: Option<PostViewerState>,
42
-
stats: Option<PostStats>,
47
+
(post, author, labels, embed, threadgate, viewer, stats): HydratePostsRet,
43
48
) -> PostView {
44
49
let stats = stats
45
50
.map(|stats| RecordStats {
···
83
88
) -> Option<ThreadgateView> {
84
89
let threadgate = threadgate?;
85
90
86
-
let lists = threadgate
87
-
.allowed_lists
88
-
.iter()
89
-
.flatten()
90
-
.cloned()
91
-
.collect::<Vec<_>>();
91
+
let lists = match threadgate.allowed_lists.as_ref() {
92
+
Some(allowed_lists) => allowed_lists.clone().into(),
93
+
None => Vec::new(),
94
+
};
92
95
let lists = self.hydrate_lists_basic(lists).await;
93
96
94
97
Some(build_threadgate_view(
···
102
105
threadgates: Vec<models::Threadgate>,
103
106
) -> HashMap<String, ThreadgateView> {
104
107
let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| {
105
-
acc.extend(c.allowed_lists.iter().flatten().cloned());
108
+
if let Some(lists) = &c.allowed_lists {
109
+
acc.extend(lists.clone().0);
110
+
}
106
111
acc
107
112
});
108
113
let lists = self.hydrate_lists_basic(lists).await;
···
110
115
threadgates
111
116
.into_iter()
112
117
.map(|threadgate| {
113
-
let this_lists = threadgate
114
-
.allowed_lists
115
-
.iter()
116
-
.filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned()))
117
-
.collect();
118
+
let this_lists = match &threadgate.allowed_lists {
119
+
Some(allowed_lists) => allowed_lists
120
+
.iter()
121
+
.filter_map(|v| lists.get(v).cloned())
122
+
.collect(),
123
+
None => Vec::new(),
124
+
};
118
125
119
126
(
120
127
threadgate.at_uri.clone(),
···
133
140
let threadgate = self.hydrate_threadgate(threadgate).await;
134
141
let labels = self.get_label(&post.at_uri).await;
135
142
136
-
Some(build_postview(
143
+
Some(build_postview((
137
144
post, author, labels, embed, threadgate, viewer, stats,
138
-
))
145
+
)))
139
146
}
140
147
141
-
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
148
+
async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> {
142
149
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
143
150
let posts = self.loaders.posts.load_many(posts).await;
144
151
···
148
155
.unzip::<_, _, Vec<_>, Vec<_>>();
149
156
let authors = self.hydrate_profiles_basic(authors).await;
150
157
151
-
let post_labels = self.get_label_many(&post_uris).await;
152
-
let viewer_data = self.get_post_viewer_states(&post_uris).await;
158
+
let mut post_labels = self.get_label_many(&post_uris).await;
159
+
let mut viewer_data = self.get_post_viewer_states(&post_uris).await;
153
160
154
161
let threadgates = posts
155
162
.values()
···
157
164
.collect();
158
165
let threadgates = self.hydrate_threadgates(threadgates).await;
159
166
160
-
let embeds = self.hydrate_embeds(post_uris).await;
167
+
let mut embeds = self.hydrate_embeds(post_uris).await;
161
168
162
169
posts
163
170
.into_iter()
164
171
.filter_map(|(uri, (post, threadgate))| {
165
-
let author = authors.get(&post.did)?;
166
-
let embed = embeds.get(&uri).cloned();
172
+
let author = authors.get(&post.did)?.clone();
173
+
let embed = embeds.remove(&uri);
167
174
let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned());
168
-
let labels = post_labels.get(&uri).cloned().unwrap_or_default();
175
+
let labels = post_labels.remove(&uri).unwrap_or_default();
169
176
let stats = stats.get(&uri).cloned();
170
-
let viewer = viewer_data.get(&uri).cloned();
177
+
let viewer = viewer_data.remove(&uri);
171
178
172
179
Some((
173
180
uri,
174
-
build_postview(
175
-
post,
176
-
author.to_owned(),
177
-
labels,
178
-
embed,
179
-
threadgate,
180
-
viewer,
181
-
stats,
182
-
),
181
+
(post, author, labels, embed, threadgate, viewer, stats),
183
182
))
184
183
})
185
184
.collect()
186
185
}
187
186
188
-
pub async fn hydrate_feed_posts(&self, posts: Vec<String>) -> HashMap<String, FeedViewPost> {
189
-
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
190
-
let posts = self.loaders.posts.load_many(posts).await;
187
+
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
188
+
self.hydrate_posts_inner(posts)
189
+
.await
190
+
.into_iter()
191
+
.map(|(uri, data)| (uri, build_postview(data)))
192
+
.collect()
193
+
}
191
194
192
-
let (authors, post_uris) = posts
193
-
.values()
194
-
.map(|(post, _)| (post.did.clone(), post.at_uri.clone()))
195
-
.unzip::<_, _, Vec<_>, Vec<_>>();
196
-
let authors = self.hydrate_profiles_basic(authors).await;
197
-
198
-
let post_labels = self.get_label_many(&post_uris).await;
199
-
let viewer_data = self.get_post_viewer_states(&post_uris).await;
200
-
let embeds = self.hydrate_embeds(post_uris).await;
195
+
pub async fn hydrate_feed_posts(
196
+
&self,
197
+
posts: Vec<RawFeedItem>,
198
+
author_threads_only: bool,
199
+
) -> Vec<FeedViewPost> {
200
+
let post_uris = posts
201
+
.iter()
202
+
.map(|item| item.post_uri().to_string())
203
+
.collect::<Vec<_>>();
204
+
let mut posts_hyd = self.hydrate_posts_inner(post_uris).await;
201
205
202
-
let reply_refs = posts
206
+
// we shouldn't show the parent when the post violates a threadgate.
207
+
let reply_refs = posts_hyd
203
208
.values()
204
-
.flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()])
209
+
.filter(|(post, ..)| !post.violates_threadgate)
210
+
.flat_map(|(post, ..)| [post.parent_uri.clone(), post.root_uri.clone()])
205
211
.flatten()
206
212
.collect::<Vec<_>>();
213
+
let reply_posts = self.hydrate_posts(reply_refs).await;
207
214
208
-
let reply_posts = self.hydrate_posts(reply_refs).await;
215
+
let repost_profiles = posts
216
+
.iter()
217
+
.filter_map(|item| item.repost_by())
218
+
.collect::<Vec<_>>();
219
+
let profiles_hydrated = self.hydrate_profiles_basic(repost_profiles).await;
209
220
210
221
posts
211
222
.into_iter()
212
-
.filter_map(|(post_uri, (post, _))| {
213
-
let author = authors.get(&post.did)?;
223
+
.filter_map(|item| {
224
+
let post = posts_hyd.remove(item.post_uri())?;
225
+
let context = item.context();
226
+
227
+
let reply = if let RawFeedItem::Post { .. } = item {
228
+
let root_uri = post.0.root_uri.as_ref();
229
+
let parent_uri = post.0.parent_uri.as_ref();
230
+
231
+
let (root, parent) = if author_threads_only {
232
+
if root_uri.is_some() && parent_uri.is_some() {
233
+
let root = root_uri.and_then(|uri| posts_hyd.get(uri))?;
234
+
let parent = parent_uri.and_then(|uri| posts_hyd.get(uri))?;
235
+
236
+
let root = build_postview(root.clone());
237
+
let parent = build_postview(parent.clone());
214
238
215
-
let root = post.root_uri.as_ref().and_then(|uri| reply_posts.get(uri));
216
-
let parent = post
217
-
.parent_uri
218
-
.as_ref()
219
-
.and_then(|uri| reply_posts.get(uri));
239
+
(Some(root), Some(parent))
240
+
} else {
241
+
(None, None)
242
+
}
243
+
} else {
244
+
let root = root_uri.and_then(|uri| reply_posts.get(uri)).cloned();
245
+
let parent = parent_uri.and_then(|uri| reply_posts.get(uri)).cloned();
220
246
221
-
let reply = if post.parent_uri.is_some() && post.root_uri.is_some() {
222
-
Some(ReplyRef {
223
-
root: root.cloned().map(postview_to_replyref).unwrap_or(
224
-
ReplyRefPost::NotFound {
225
-
uri: post.root_uri.as_ref().unwrap().clone(),
226
-
not_found: true,
227
-
},
228
-
),
229
-
parent: parent.cloned().map(postview_to_replyref).unwrap_or(
230
-
ReplyRefPost::NotFound {
231
-
uri: post.parent_uri.as_ref().unwrap().clone(),
232
-
not_found: true,
233
-
},
234
-
),
235
-
grandparent_author: None,
236
-
})
247
+
(root, parent)
248
+
};
249
+
250
+
if root_uri.is_some() || parent_uri.is_some() {
251
+
Some(ReplyRef {
252
+
root: root.map(postview_to_replyref).unwrap_or(
253
+
ReplyRefPost::NotFound {
254
+
uri: root_uri.unwrap().to_owned(),
255
+
not_found: true,
256
+
},
257
+
),
258
+
parent: parent.map(postview_to_replyref).unwrap_or(
259
+
ReplyRefPost::NotFound {
260
+
uri: parent_uri.unwrap().to_owned(),
261
+
not_found: true,
262
+
},
263
+
),
264
+
grandparent_author: None,
265
+
})
266
+
} else {
267
+
None
268
+
}
237
269
} else {
238
270
None
239
271
};
240
272
241
-
let embed = embeds.get(&post_uri).cloned();
242
-
let labels = post_labels.get(&post_uri).cloned().unwrap_or_default();
243
-
let stats = stats.get(&post_uri).cloned();
244
-
let viewer = viewer_data.get(&post_uri).cloned();
245
-
let post =
246
-
build_postview(post, author.to_owned(), labels, embed, None, viewer, stats);
273
+
let reason = match item {
274
+
RawFeedItem::Repost { uri, by, at, .. } => {
275
+
Some(FeedViewPostReason::Repost(FeedReasonRepost {
276
+
by: profiles_hydrated.get(&by).cloned()?,
277
+
uri: Some(uri),
278
+
cid: None,
279
+
indexed_at: at,
280
+
}))
281
+
}
282
+
RawFeedItem::Pin { .. } => Some(FeedViewPostReason::Pin),
283
+
_ => None,
284
+
};
247
285
248
-
Some((
249
-
post_uri,
250
-
FeedViewPost {
251
-
post,
252
-
reply,
253
-
reason: None,
254
-
feed_context: None,
255
-
},
256
-
))
286
+
let post = build_postview(post);
287
+
288
+
Some(FeedViewPost {
289
+
post,
290
+
reply,
291
+
reason,
292
+
feed_context: context,
293
+
})
257
294
})
258
295
.collect()
259
296
}
···
297
334
_ => ReplyRefPost::Post(post),
298
335
}
299
336
}
337
+
338
+
#[derive(Debug)]
339
+
pub enum RawFeedItem {
340
+
Pin {
341
+
uri: String,
342
+
context: Option<String>,
343
+
},
344
+
Post {
345
+
uri: String,
346
+
context: Option<String>,
347
+
},
348
+
Repost {
349
+
uri: String,
350
+
post: String,
351
+
by: String,
352
+
at: chrono::DateTime<chrono::Utc>,
353
+
context: Option<String>,
354
+
},
355
+
}
356
+
357
+
impl RawFeedItem {
358
+
fn post_uri(&self) -> &str {
359
+
match self {
360
+
RawFeedItem::Pin { uri, .. } => uri,
361
+
RawFeedItem::Post { uri, .. } => uri,
362
+
RawFeedItem::Repost { post, .. } => post,
363
+
}
364
+
}
365
+
366
+
fn repost_by(&self) -> Option<String> {
367
+
match self {
368
+
RawFeedItem::Repost { by, .. } => Some(by.clone()),
369
+
_ => None,
370
+
}
371
+
}
372
+
373
+
fn context(&self) -> Option<String> {
374
+
match self {
375
+
RawFeedItem::Pin { context, .. } => context.clone(),
376
+
RawFeedItem::Post { context, .. } => context.clone(),
377
+
RawFeedItem::Repost { context, .. } => context.clone(),
378
+
}
379
+
}
380
+
}
+2
parakeet/src/hydration/profile.rs
+2
parakeet/src/hydration/profile.rs
···
201
201
labels: map_labels(labels),
202
202
verification,
203
203
status,
204
+
pronouns: profile.pronouns,
204
205
created_at: profile.created_at.and_utc(),
205
206
}
206
207
}
···
229
230
labels: map_labels(labels),
230
231
verification,
231
232
status,
233
+
pronouns: profile.pronouns,
232
234
created_at: profile.created_at.and_utc(),
233
235
indexed_at: profile.indexed_at,
234
236
}
+3
-7
parakeet/src/hydration/starter_packs.rs
+3
-7
parakeet/src/hydration/starter_packs.rs
···
96
96
let feeds = sp
97
97
.feeds
98
98
.clone()
99
-
.unwrap_or_default()
100
-
.into_iter()
101
-
.flatten()
102
-
.collect();
103
-
let feeds = self.hydrate_feedgens(feeds).await.into_values().collect();
99
+
.unwrap_or_default();
100
+
let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect();
104
101
105
102
Some(build_spview(sp, creator, labels, list, feeds))
106
103
}
···
119
116
let feeds = packs
120
117
.values()
121
118
.filter_map(|pack| pack.feeds.clone())
122
-
.flat_map(|feeds| feeds.into_iter().flatten())
119
+
.flat_map(Vec::from)
123
120
.collect();
124
121
125
122
let creators = self.hydrate_profiles_basic(creators).await;
···
133
130
let list = lists.get(&pack.list).cloned();
134
131
let feeds = pack.feeds.as_ref().map(|v| {
135
132
v.iter()
136
-
.flatten()
137
133
.filter_map(|feed| feeds.get(feed).cloned())
138
134
.collect()
139
135
});
+4
-1
parakeet/src/loaders.rs
+4
-1
parakeet/src/loaders.rs
···
4
4
use dataloader::async_cached::Loader;
5
5
use dataloader::non_cached::Loader as NonCachedLoader;
6
6
use dataloader::BatchFn;
7
+
use diesel::dsl::sql;
7
8
use diesel::prelude::*;
8
9
use diesel_async::pooled_connection::deadpool::Pool;
9
10
use diesel_async::{AsyncPgConnection, RunQueryDsl};
···
368
369
let mut conn = self.0.get().await.unwrap();
369
370
370
371
let res = schema::posts::table
371
-
.left_join(schema::threadgates::table)
372
+
.left_join(schema::threadgates::table.on(
373
+
schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")),
374
+
))
372
375
.select((
373
376
models::Post::as_select(),
374
377
Option::<models::Threadgate>::as_select(),
+3
-3
parakeet/src/sql/thread.sql
+3
-3
parakeet/src/sql/thread.sql
···
1
-
with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth
1
+
with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth
2
2
from posts
3
-
where parent_uri = $1
3
+
where parent_uri = $1 and violates_threadgate=FALSE
4
4
union all
5
5
select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
6
6
from posts p
7
7
join thread on p.parent_uri = thread.at_uri
8
-
where thread.depth <= $2)
8
+
where thread.depth <= $2 and p.violates_threadgate=FALSE)
9
9
select *
10
10
from thread
11
11
order by depth desc;
+13
parakeet/src/sql/thread_branching.sql
+13
parakeet/src/sql/thread_branching.sql
···
1
+
with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth
2
+
from posts
3
+
where parent_uri = $1
4
+
and violates_threadgate = FALSE
5
+
union all
6
+
(select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
7
+
from posts p
8
+
join thread on p.parent_uri = thread.at_uri
9
+
where thread.depth <= $2
10
+
and violates_threadgate = FALSE
11
+
LIMIT $3))
12
+
select *
13
+
from thread;
+4
-2
parakeet/src/sql/thread_parent.sql
+4
-2
parakeet/src/sql/thread_parent.sql
···
1
1
with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth
2
2
from posts
3
-
where at_uri = (select parent_uri from posts where at_uri = $1)
3
+
where
4
+
at_uri = (select parent_uri from posts where at_uri = $1 and violates_threadgate = FALSE)
4
5
union all
5
6
select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1
6
7
from posts p
7
8
join parents on p.at_uri = parents.parent_uri
8
-
where parents.depth <= $2)
9
+
where parents.depth <= $2
10
+
and p.violates_threadgate = FALSE)
9
11
select *
10
12
from parents
11
13
order by depth desc;
+7
-8
parakeet/src/xrpc/app_bsky/feed/likes.rs
+7
-8
parakeet/src/xrpc/app_bsky/feed/likes.rs
···
1
+
use crate::hydration::posts::RawFeedItem;
1
2
use crate::hydration::StatefulHydrator;
2
3
use crate::xrpc::error::{Error, XrpcResult};
3
4
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
···
57
58
.last()
58
59
.map(|(last, _)| last.timestamp_millis().to_string());
59
60
60
-
let at_uris = results
61
+
let raw_feed = results
61
62
.iter()
62
-
.map(|(_, uri)| uri.clone())
63
+
.map(|(_, uri)| RawFeedItem::Post {
64
+
uri: uri.clone(),
65
+
context: None,
66
+
})
63
67
.collect::<Vec<_>>();
64
68
65
-
let mut posts = hyd.hydrate_feed_posts(at_uris).await;
66
-
67
-
let feed: Vec<_> = results
68
-
.into_iter()
69
-
.filter_map(|(_, uri)| posts.remove(&uri))
70
-
.collect();
69
+
let feed = hyd.hydrate_feed_posts(raw_feed, false).await;
71
70
72
71
Ok(Json(FeedRes { cursor, feed }))
73
72
}
+85
-116
parakeet/src/xrpc/app_bsky/feed/posts.rs
+85
-116
parakeet/src/xrpc/app_bsky/feed/posts.rs
···
1
+
use crate::hydration::posts::RawFeedItem;
1
2
use crate::hydration::StatefulHydrator;
2
3
use crate::xrpc::app_bsky::graph::lists::ListWithCursorQuery;
3
4
use crate::xrpc::error::{Error, XrpcResult};
···
16
17
use diesel_async::{AsyncPgConnection, RunQueryDsl};
17
18
use lexica::app_bsky::actor::ProfileView;
18
19
use lexica::app_bsky::feed::{
19
-
BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason,
20
-
PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView,
20
+
BlockedAuthor, FeedSkeletonResponse, FeedViewPost, PostView, SkeletonReason, ThreadViewPost,
21
+
ThreadViewPostType, ThreadgateView,
21
22
};
22
-
use parakeet_db::schema;
23
+
use parakeet_db::{models, schema};
23
24
use reqwest::Url;
24
25
use serde::{Deserialize, Serialize};
25
26
use std::collections::HashMap;
···
113
114
114
115
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
115
116
116
-
let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect();
117
117
let repost_skeleton = skeleton
118
118
.feed
119
119
.iter()
···
122
122
_ => None,
123
123
})
124
124
.collect::<Vec<_>>();
125
+
let mut repost_data = get_skeleton_repost_data(&mut conn, repost_skeleton).await;
125
126
126
-
let mut posts = hyd.hydrate_feed_posts(at_uris).await;
127
-
let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await;
128
-
129
-
let feed = skeleton
127
+
let raw_feed = skeleton
130
128
.feed
131
129
.into_iter()
132
-
.filter_map(|item| {
133
-
let mut post = posts.remove(&item.post)?;
134
-
let reason = match item.reason {
135
-
Some(SkeletonReason::Repost { repost }) => {
136
-
repost_data.remove(&repost).map(FeedViewPostReason::Repost)
137
-
}
138
-
Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin),
139
-
_ => None,
140
-
};
141
-
142
-
post.reason = reason;
143
-
post.feed_context = item.feed_context;
144
-
145
-
Some(post)
130
+
.filter_map(|v| match v.reason {
131
+
Some(SkeletonReason::Repost { repost }) => {
132
+
repost_data
133
+
.remove_entry(&repost)
134
+
.map(|(uri, (by, at))| RawFeedItem::Repost {
135
+
uri,
136
+
post: v.post,
137
+
by,
138
+
at: at.and_utc(),
139
+
context: v.feed_context,
140
+
})
141
+
}
142
+
Some(SkeletonReason::Pin {}) => Some(RawFeedItem::Pin {
143
+
uri: v.post,
144
+
context: v.feed_context,
145
+
}),
146
+
None => Some(RawFeedItem::Post {
147
+
uri: v.post,
148
+
context: v.feed_context,
149
+
}),
146
150
})
147
151
.collect();
152
+
153
+
let feed = hyd.hydrate_feed_posts(raw_feed, false).await;
148
154
149
155
Ok(Json(FeedRes {
150
156
cursor: skeleton.cursor,
···
152
158
}))
153
159
}
154
160
155
-
#[derive(Debug, Deserialize)]
161
+
#[derive(Debug, Default, Eq, PartialEq, Deserialize)]
156
162
#[serde(rename_all = "snake_case")]
157
163
pub enum GetAuthorFeedFilter {
164
+
#[default]
158
165
PostsWithReplies,
159
166
PostsNoReplies,
160
167
PostsWithMedia,
161
168
PostsAndAuthorThreads,
162
169
PostsWithVideo,
163
-
}
164
-
165
-
impl Default for GetAuthorFeedFilter {
166
-
fn default() -> Self {
167
-
Self::PostsWithReplies
168
-
}
169
170
}
170
171
171
172
#[derive(Debug, Deserialize)]
···
209
210
210
211
let pin = match query.include_pins && query.cursor.is_none() {
211
212
false => None,
212
-
true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? {
213
-
Some(post) => hyd.hydrate_post(post).await,
214
-
None => None,
215
-
},
213
+
true => crate::db::get_pinned_post_uri(&mut conn, &did).await?,
216
214
};
217
215
218
216
let limit = query.limit.unwrap_or(50).clamp(1, 100);
219
217
220
-
let mut posts_query = schema::posts::table
221
-
.select((schema::posts::created_at, schema::posts::at_uri))
222
-
.filter(schema::posts::did.eq(did))
218
+
let mut posts_query = schema::author_feeds::table
219
+
.select(models::AuthorFeedItem::as_select())
220
+
.left_join(schema::posts::table.on(schema::posts::at_uri.eq(schema::author_feeds::post)))
221
+
.filter(schema::author_feeds::did.eq(&did))
223
222
.into_boxed();
224
223
225
224
if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) {
226
-
posts_query = posts_query.filter(schema::posts::created_at.lt(cursor));
225
+
posts_query = posts_query.filter(schema::author_feeds::sort_at.lt(cursor));
227
226
}
228
227
228
+
let author_threads_only = query.filter == GetAuthorFeedFilter::PostsAndAuthorThreads;
229
229
posts_query = match query.filter {
230
-
GetAuthorFeedFilter::PostsWithReplies => posts_query,
230
+
GetAuthorFeedFilter::PostsWithReplies => {
231
+
posts_query.filter(schema::author_feeds::typ.eq("post"))
232
+
}
231
233
GetAuthorFeedFilter::PostsNoReplies => {
232
234
posts_query.filter(schema::posts::parent_uri.is_null())
233
235
}
234
-
GetAuthorFeedFilter::PostsWithMedia => posts_query.filter(embed_type_filter(&[
235
-
"app.bsky.embed.video",
236
-
"app.bsky.embed.images",
237
-
])),
236
+
GetAuthorFeedFilter::PostsWithMedia => posts_query.filter(
237
+
embed_type_filter(&["app.bsky.embed.video", "app.bsky.embed.images"])
238
+
.and(schema::author_feeds::typ.eq("post")),
239
+
),
238
240
GetAuthorFeedFilter::PostsAndAuthorThreads => posts_query.filter(
239
241
(schema::posts::parent_uri
240
-
.like(format!("at://{}/%", &query.actor))
242
+
.like(format!("at://{did}/%"))
241
243
.or(schema::posts::parent_uri.is_null()))
242
244
.and(
243
245
schema::posts::root_uri
244
-
.like(format!("at://{}/%", &query.actor))
246
+
.like(format!("at://{did}/%"))
245
247
.or(schema::posts::root_uri.is_null()),
246
248
),
247
249
),
248
-
GetAuthorFeedFilter::PostsWithVideo => {
249
-
posts_query.filter(embed_type_filter(&["app.bsky.embed.video"]))
250
-
}
250
+
GetAuthorFeedFilter::PostsWithVideo => posts_query.filter(
251
+
embed_type_filter(&["app.bsky.embed.video"]).and(schema::author_feeds::typ.eq("post")),
252
+
),
251
253
};
252
254
253
255
let results = posts_query
254
-
.order(schema::posts::created_at.desc())
256
+
.order(schema::author_feeds::sort_at.desc())
255
257
.limit(limit as i64)
256
-
.load::<(chrono::DateTime<chrono::Utc>, String)>(&mut conn)
258
+
.load(&mut conn)
257
259
.await?;
258
260
259
261
let cursor = results
260
262
.last()
261
-
.map(|(last, _)| last.timestamp_millis().to_string());
263
+
.map(|item| item.sort_at.timestamp_millis().to_string());
262
264
263
-
let at_uris = results
264
-
.iter()
265
-
.map(|(_, uri)| uri.clone())
265
+
let mut raw_feed = results
266
+
.into_iter()
267
+
.filter_map(|item| match &*item.typ {
268
+
"post" => Some(RawFeedItem::Post {
269
+
uri: item.post,
270
+
context: None,
271
+
}),
272
+
"repost" => Some(RawFeedItem::Repost {
273
+
uri: item.uri,
274
+
post: item.post,
275
+
by: item.did,
276
+
at: item.sort_at,
277
+
context: None,
278
+
}),
279
+
_ => None,
280
+
})
266
281
.collect::<Vec<_>>();
267
282
268
-
let mut posts = hyd.hydrate_feed_posts(at_uris).await;
269
-
270
-
let mut feed: Vec<_> = results
271
-
.into_iter()
272
-
.filter_map(|(_, uri)| posts.remove(&uri))
273
-
.collect();
274
-
275
283
if let Some(post) = pin {
276
-
feed.insert(
284
+
raw_feed.insert(
277
285
0,
278
-
FeedViewPost {
279
-
post,
280
-
reply: None,
281
-
reason: Some(FeedViewPostReason::Pin),
282
-
feed_context: None,
286
+
RawFeedItem::Pin {
287
+
uri: post,
288
+
context: None,
283
289
},
284
290
);
285
291
}
292
+
293
+
let feed = hyd.hydrate_feed_posts(raw_feed, author_threads_only).await;
286
294
287
295
Ok(Json(FeedRes { cursor, feed }))
288
296
}
···
325
333
.last()
326
334
.map(|(last, _)| last.timestamp_millis().to_string());
327
335
328
-
let at_uris = results
336
+
let raw_feed = results
329
337
.iter()
330
-
.map(|(_, uri)| uri.clone())
338
+
.map(|(_, uri)| RawFeedItem::Post {
339
+
uri: uri.clone(),
340
+
context: None,
341
+
})
331
342
.collect::<Vec<_>>();
332
343
333
-
let mut posts = hyd.hydrate_feed_posts(at_uris).await;
334
-
335
-
let feed = results
336
-
.into_iter()
337
-
.filter_map(|(_, uri)| posts.remove(&uri))
338
-
.collect();
344
+
let feed = hyd.hydrate_feed_posts(raw_feed, false).await;
339
345
340
346
Ok(Json(FeedRes { cursor, feed }))
341
347
}
···
355
361
pub threadgate: Option<ThreadgateView>,
356
362
}
357
363
358
-
#[derive(Debug, QueryableByName)]
359
-
#[diesel(check_for_backend(diesel::pg::Pg))]
360
-
struct ThreadItem {
361
-
#[diesel(sql_type = diesel::sql_types::Text)]
362
-
at_uri: String,
363
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
364
-
parent_uri: Option<String>,
365
-
// #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
366
-
// root_uri: Option<String>,
367
-
#[diesel(sql_type = diesel::sql_types::Integer)]
368
-
depth: i32,
369
-
}
370
-
371
364
pub async fn get_post_thread(
372
365
State(state): State<GlobalState>,
373
366
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
···
403
396
}
404
397
}
405
398
406
-
let replies = diesel::sql_query(include_str!("../../../sql/thread.sql"))
407
-
.bind::<diesel::sql_types::Text, _>(&uri)
408
-
.bind::<diesel::sql_types::Integer, _>(depth as i32)
409
-
.load::<ThreadItem>(&mut conn)
410
-
.await?;
411
-
412
-
let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql"))
413
-
.bind::<diesel::sql_types::Text, _>(&uri)
414
-
.bind::<diesel::sql_types::Integer, _>(parent_height as i32)
415
-
.load::<ThreadItem>(&mut conn)
416
-
.await?;
399
+
let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?;
400
+
let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?;
417
401
418
402
let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect();
419
403
let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect();
···
669
653
}
670
654
}
671
655
672
-
async fn get_skeleton_repost_data<'a>(
656
+
async fn get_skeleton_repost_data(
673
657
conn: &mut AsyncPgConnection,
674
-
hyd: &StatefulHydrator<'a>,
675
658
reposts: Vec<String>,
676
-
) -> HashMap<String, FeedReasonRepost> {
659
+
) -> HashMap<String, (String, NaiveDateTime)> {
677
660
let Ok(repost_data) = schema::records::table
678
661
.select((
679
662
schema::records::at_uri,
···
687
670
return HashMap::new();
688
671
};
689
672
690
-
let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect();
691
-
let profiles = hyd.hydrate_profiles_basic(profiles).await;
692
-
693
673
repost_data
694
674
.into_iter()
695
-
.filter_map(|(uri, did, indexed_at)| {
696
-
let by = profiles.get(&did).cloned()?;
697
-
698
-
let repost = FeedReasonRepost {
699
-
by,
700
-
uri: Some(uri.clone()),
701
-
cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it.
702
-
indexed_at: indexed_at.and_utc(),
703
-
};
704
-
705
-
Some((uri, repost))
706
-
})
675
+
.map(|(uri, did, at)| (uri, (did, at)))
707
676
.collect()
708
677
}
709
678
+3
parakeet/src/xrpc/app_bsky/mod.rs
+3
parakeet/src/xrpc/app_bsky/mod.rs
···
6
6
mod feed;
7
7
mod graph;
8
8
mod labeler;
9
+
mod unspecced;
9
10
10
11
#[rustfmt::skip]
11
12
pub fn routes() -> Router<crate::GlobalState> {
···
64
65
// TODO: app.bsky.notification.putActivitySubscriptions
65
66
// TODO: app.bsky.notification.putPreferences
66
67
// TODO: app.bsky.notification.putPreferencesV2
68
+
.route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2))
69
+
.route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2))
67
70
}
68
71
69
72
async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
···
1
+
pub mod thread_v2;
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
···
1
+
use crate::db::ThreadItem;
2
+
use crate::hydration::StatefulHydrator;
3
+
use crate::xrpc::error::{Error, XrpcResult};
4
+
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
5
+
use crate::xrpc::normalise_at_uri;
6
+
use crate::GlobalState;
7
+
use axum::extract::{Query, State};
8
+
use axum::Json;
9
+
use itertools::Itertools;
10
+
use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView};
11
+
use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType};
12
+
use serde::{Deserialize, Serialize};
13
+
use std::cmp::Ordering;
14
+
use std::collections::{HashMap, HashSet};
15
+
16
+
const THREAD_PARENTS: usize = 50;
17
+
const DEFAULT_BRANCHING: u32 = 10;
18
+
const DEFAULT_DEPTH: u32 = 6;
19
+
20
+
#[derive(Copy, Clone, Debug, Default, Deserialize)]
21
+
#[serde(rename_all = "lowercase")]
22
+
pub enum PostThreadSort {
23
+
Newest,
24
+
#[default]
25
+
Oldest,
26
+
Top,
27
+
}
28
+
29
+
#[derive(Debug, Deserialize)]
30
+
#[serde(rename_all = "camelCase")]
31
+
pub struct GetPostThreadV2Req {
32
+
pub anchor: String,
33
+
pub above: Option<bool>,
34
+
pub below: Option<u32>,
35
+
pub branching_factor: Option<u32>,
36
+
#[serde(default)]
37
+
pub sort: PostThreadSort,
38
+
}
39
+
40
+
#[derive(Debug, Serialize)]
41
+
#[serde(rename_all = "camelCase")]
42
+
pub struct GetPostThreadV2Res {
43
+
pub thread: Vec<ThreadV2Item>,
44
+
#[serde(skip_serializing_if = "Option::is_none")]
45
+
pub threadgate: Option<ThreadgateView>,
46
+
pub has_other_replies: bool,
47
+
}
48
+
49
+
pub async fn get_post_thread_v2(
50
+
State(state): State<GlobalState>,
51
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
52
+
maybe_auth: Option<AtpAuth>,
53
+
Query(query): Query<GetPostThreadV2Req>,
54
+
) -> XrpcResult<Json<GetPostThreadV2Res>> {
55
+
let mut conn = state.pool.get().await?;
56
+
let maybe_did = maybe_auth.clone().map(|v| v.0);
57
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
58
+
59
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
60
+
let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32;
61
+
let branching_factor = query
62
+
.branching_factor
63
+
.unwrap_or(DEFAULT_BRANCHING)
64
+
.clamp(0, 100) as i32;
65
+
66
+
let anchor = hyd
67
+
.hydrate_post(uri.clone())
68
+
.await
69
+
.ok_or(Error::not_found())?;
70
+
71
+
if let Some(v) = &anchor.author.viewer {
72
+
if v.blocked_by || v.blocking.is_some() {
73
+
let block = ThreadV2ItemType::Blocked {
74
+
author: BlockedAuthor {
75
+
did: anchor.author.did,
76
+
viewer: anchor.author.viewer,
77
+
},
78
+
};
79
+
80
+
return Ok(Json(GetPostThreadV2Res {
81
+
thread: vec![ThreadV2Item {
82
+
uri,
83
+
depth: 0,
84
+
value: block,
85
+
}],
86
+
threadgate: anchor.threadgate,
87
+
has_other_replies: false,
88
+
}));
89
+
}
90
+
}
91
+
92
+
// get the root post URI (if there is one) and return its author's DID.
93
+
let root_uri = crate::db::get_root_post(&mut conn, &uri)
94
+
.await?
95
+
.unwrap_or(uri.clone());
96
+
let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0];
97
+
98
+
let replies =
99
+
crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1)
100
+
.await?;
101
+
let reply_uris = replies
102
+
.iter()
103
+
.map(|item| item.at_uri.clone())
104
+
.collect::<Vec<_>>();
105
+
106
+
// bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents.
107
+
let parents = match query.above.unwrap_or(true) {
108
+
true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?,
109
+
false => vec![],
110
+
};
111
+
let parent_uris = parents
112
+
.iter()
113
+
.map(|item| item.at_uri.clone())
114
+
.collect::<Vec<_>>();
115
+
116
+
let (mut replies_hyd, mut parents_hyd) = tokio::join!(
117
+
hyd.hydrate_posts(reply_uris),
118
+
hyd.hydrate_posts(parent_uris),
119
+
);
120
+
121
+
let threadgate = anchor.threadgate.clone();
122
+
let hidden: HashSet<_, std::hash::RandomState> = match &threadgate {
123
+
Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?,
124
+
None => None,
125
+
}
126
+
.map(|hiddens| HashSet::from_iter(Vec::from(hiddens)))
127
+
.unwrap_or_default();
128
+
129
+
let root_has_more = parents.len() > THREAD_PARENTS;
130
+
let mut is_op_thread = true;
131
+
132
+
let mut thread = Vec::with_capacity(1 + replies.len() + parents.len());
133
+
134
+
thread.extend(
135
+
parents
136
+
.into_iter()
137
+
.tail(THREAD_PARENTS)
138
+
.enumerate()
139
+
.map(|(idx, item)| {
140
+
let value = parents_hyd
141
+
.remove(&item.at_uri)
142
+
.map(|post| {
143
+
if let Some(v) = &post.author.viewer {
144
+
if v.blocked_by || v.blocking.is_some() {
145
+
return ThreadV2ItemType::Blocked {
146
+
author: BlockedAuthor {
147
+
did: post.author.did,
148
+
viewer: post.author.viewer,
149
+
},
150
+
};
151
+
}
152
+
}
153
+
154
+
let op_thread = (is_op_thread
155
+
|| item.root_uri.is_none() && item.parent_uri.is_none())
156
+
&& post.author.did == root_did;
157
+
158
+
ThreadV2ItemType::Post(ThreadItemPost {
159
+
post,
160
+
more_parents: idx == 0 && root_has_more,
161
+
more_replies: 0,
162
+
op_thread,
163
+
hidden_by_threadgate: false,
164
+
muted_by_viewer: false,
165
+
})
166
+
})
167
+
.unwrap_or(ThreadV2ItemType::NotFound {});
168
+
169
+
ThreadV2Item {
170
+
uri: item.at_uri,
171
+
depth: -item.depth - 1,
172
+
value,
173
+
}
174
+
}),
175
+
);
176
+
177
+
is_op_thread = is_op_thread && anchor.author.did == root_did;
178
+
thread.push(ThreadV2Item {
179
+
uri: uri.clone(),
180
+
depth: 0,
181
+
value: ThreadV2ItemType::Post(ThreadItemPost {
182
+
post: anchor,
183
+
more_parents: false,
184
+
more_replies: 0,
185
+
op_thread: is_op_thread,
186
+
hidden_by_threadgate: false,
187
+
muted_by_viewer: false,
188
+
}),
189
+
});
190
+
191
+
let mut replies_grouped = replies
192
+
.into_iter()
193
+
.into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default());
194
+
195
+
// start with the anchor
196
+
let (children, has_other_replies) = build_thread_children(
197
+
&mut replies_grouped,
198
+
&mut replies_hyd,
199
+
&hidden,
200
+
&uri,
201
+
is_op_thread,
202
+
1,
203
+
&BuildThreadChildrenOpts {
204
+
root_did,
205
+
sort: query.sort,
206
+
maybe_did: &maybe_did,
207
+
max_depth: depth,
208
+
},
209
+
);
210
+
thread.extend(children);
211
+
212
+
Ok(Json(GetPostThreadV2Res {
213
+
thread,
214
+
threadgate,
215
+
has_other_replies,
216
+
}))
217
+
}
218
+
219
+
#[derive(Debug, Deserialize)]
220
+
#[serde(rename_all = "camelCase")]
221
+
pub struct GetPostThreadOtherV2Req {
222
+
pub anchor: String,
223
+
}
224
+
225
+
#[derive(Debug, Serialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
pub struct GetPostThreadOtherV2Res {
228
+
pub thread: Vec<ThreadV2Item>,
229
+
}
230
+
231
+
pub async fn get_post_thread_other_v2(
232
+
State(state): State<GlobalState>,
233
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
234
+
maybe_auth: Option<AtpAuth>,
235
+
Query(query): Query<GetPostThreadOtherV2Req>,
236
+
) -> XrpcResult<Json<GetPostThreadOtherV2Res>> {
237
+
let mut conn = state.pool.get().await?;
238
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
239
+
240
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
241
+
242
+
let root = crate::db::get_root_post(&mut conn, &uri)
243
+
.await?
244
+
.unwrap_or(uri.clone());
245
+
246
+
// this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE
247
+
let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?;
248
+
let reply_uris = replies
249
+
.into_iter()
250
+
.map(|item| item.at_uri)
251
+
.collect::<Vec<_>>();
252
+
let thread = hyd
253
+
.hydrate_posts(reply_uris)
254
+
.await
255
+
.into_iter()
256
+
.filter(|(_, post)| match &post.author.viewer {
257
+
Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false,
258
+
_ => true,
259
+
})
260
+
.map(|(uri, post)| {
261
+
let post = ThreadItemPost {
262
+
post,
263
+
more_parents: false,
264
+
more_replies: 0,
265
+
op_thread: false,
266
+
hidden_by_threadgate: true,
267
+
muted_by_viewer: false,
268
+
};
269
+
270
+
ThreadV2Item {
271
+
uri,
272
+
depth: 1,
273
+
value: ThreadV2ItemType::Post(post),
274
+
}
275
+
})
276
+
.collect();
277
+
278
+
Ok(Json(GetPostThreadOtherV2Res { thread }))
279
+
}
280
+
281
+
#[derive(Debug)]
282
+
struct BuildThreadChildrenOpts<'a> {
283
+
root_did: &'a str,
284
+
sort: PostThreadSort,
285
+
maybe_did: &'a Option<String>,
286
+
max_depth: i32,
287
+
}
288
+
289
+
fn build_thread_children(
290
+
grouped_replies: &mut HashMap<String, Vec<ThreadItem>>,
291
+
replies_hyd: &mut HashMap<String, PostView>,
292
+
hidden: &HashSet<String>,
293
+
parent: &str,
294
+
is_op_thread: bool,
295
+
depth: i32,
296
+
opts: &BuildThreadChildrenOpts,
297
+
) -> (Vec<ThreadV2Item>, bool) {
298
+
let mut has_other_replies = false;
299
+
300
+
let Some(replies) = grouped_replies.remove(parent) else {
301
+
return (Vec::default(), has_other_replies);
302
+
};
303
+
304
+
let replies = replies
305
+
.into_iter()
306
+
.filter_map(|item| replies_hyd.remove(&item.at_uri))
307
+
.sorted_by(sort_replies(&opts.sort));
308
+
309
+
let mut out = Vec::new();
310
+
311
+
for post in replies {
312
+
let reply_count = grouped_replies
313
+
.get(&post.uri)
314
+
.map(|v| v.len())
315
+
.unwrap_or_default();
316
+
let at_max = depth == opts.max_depth;
317
+
let more_replies = if at_max { reply_count } else { 0 };
318
+
let op_thread = is_op_thread && post.author.did == opts.root_did;
319
+
320
+
// shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies...
321
+
if let Some(v) = &post.author.viewer {
322
+
if v.blocked_by || v.blocking.is_some() {
323
+
continue;
324
+
}
325
+
}
326
+
327
+
// check if the post is hidden AND we're NOT the author (hidden posts still show for their author)
328
+
if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) {
329
+
// post is hidden - do not ~pass go~ push to the thread.
330
+
if depth == 1 {
331
+
has_other_replies = true;
332
+
}
333
+
continue;
334
+
}
335
+
336
+
let uri = post.uri.clone();
337
+
out.push(ThreadV2Item {
338
+
uri: post.uri.clone(),
339
+
depth,
340
+
value: ThreadV2ItemType::Post(ThreadItemPost {
341
+
post,
342
+
more_parents: false,
343
+
more_replies: more_replies as i32,
344
+
op_thread,
345
+
hidden_by_threadgate: false,
346
+
muted_by_viewer: false,
347
+
}),
348
+
});
349
+
350
+
if !at_max {
351
+
// we don't care about has_other_replies when recursing
352
+
let (children, _) = build_thread_children(
353
+
grouped_replies,
354
+
replies_hyd,
355
+
hidden,
356
+
&uri,
357
+
op_thread,
358
+
depth + 1,
359
+
opts,
360
+
);
361
+
362
+
out.extend(children);
363
+
}
364
+
}
365
+
366
+
(out, has_other_replies)
367
+
}
368
+
369
+
fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> {
370
+
move |a: &PostView, b: &PostView| match sort {
371
+
PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at),
372
+
PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at),
373
+
PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count),
374
+
}
375
+
}
376
+
377
+
fn did_is_cur(cur: &Option<String>, did: &String) -> bool {
378
+
match cur {
379
+
Some(cur) => did == cur,
380
+
None => false,
381
+
}
382
+
}
+1
-1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
+1
-1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
+70
-12
parakeet-db/src/models.rs
+70
-12
parakeet-db/src/models.rs
···
137
137
138
138
pub content: String,
139
139
pub facets: Option<serde_json::Value>,
140
-
pub languages: Vec<Option<String>>,
141
-
pub tags: Vec<Option<String>>,
140
+
pub languages: not_null_vec::TextArray,
141
+
pub tags: not_null_vec::TextArray,
142
142
143
143
pub parent_uri: Option<String>,
144
144
pub parent_cid: Option<String>,
···
147
147
148
148
pub embed: Option<String>,
149
149
pub embed_subtype: Option<String>,
150
+
151
+
pub mentions: Option<not_null_vec::TextArray>,
152
+
pub violates_threadgate: bool,
150
153
151
154
pub created_at: DateTime<Utc>,
152
155
pub indexed_at: NaiveDateTime,
···
233
236
pub cid: String,
234
237
pub post_uri: String,
235
238
236
-
pub detached: Vec<Option<String>>,
237
-
pub rules: Vec<Option<String>>,
239
+
pub detached: not_null_vec::TextArray,
240
+
pub rules: not_null_vec::TextArray,
238
241
239
242
pub created_at: DateTime<Utc>,
240
243
pub indexed_at: NaiveDateTime,
···
249
252
pub cid: String,
250
253
pub post_uri: String,
251
254
252
-
pub hidden_replies: Vec<Option<String>>,
253
-
pub allow: Vec<Option<String>>,
254
-
pub allowed_lists: Vec<Option<String>>,
255
+
pub hidden_replies: not_null_vec::TextArray,
256
+
pub allow: Option<not_null_vec::TextArray>,
257
+
pub allowed_lists: Option<not_null_vec::TextArray>,
255
258
256
259
pub record: serde_json::Value,
257
260
···
273
276
pub description: Option<String>,
274
277
pub description_facets: Option<serde_json::Value>,
275
278
pub list: String,
276
-
pub feeds: Option<Vec<Option<String>>>,
279
+
pub feeds: Option<not_null_vec::TextArray>,
277
280
278
281
pub created_at: DateTime<Utc>,
279
282
pub indexed_at: NaiveDateTime,
···
287
290
pub did: String,
288
291
pub cid: String,
289
292
290
-
pub reasons: Option<Vec<Option<String>>>,
291
-
pub subject_types: Option<Vec<Option<String>>>,
292
-
pub subject_collections: Option<Vec<Option<String>>>,
293
+
pub reasons: Option<not_null_vec::TextArray>,
294
+
pub subject_types: Option<not_null_vec::TextArray>,
295
+
pub subject_collections: Option<not_null_vec::TextArray>,
293
296
294
297
pub created_at: NaiveDateTime,
295
298
pub indexed_at: NaiveDateTime,
···
399
402
pub subject: String,
400
403
pub subject_cid: Option<String>,
401
404
pub subject_type: String,
402
-
pub tags: Vec<Option<String>>,
405
+
pub tags: not_null_vec::TextArray,
403
406
pub created_at: DateTime<Utc>,
404
407
}
405
408
···
414
417
pub subject_type: &'a str,
415
418
pub tags: Vec<String>,
416
419
}
420
+
421
+
#[derive(Debug, Queryable, Selectable, Identifiable)]
422
+
#[diesel(table_name = crate::schema::author_feeds)]
423
+
#[diesel(primary_key(uri))]
424
+
#[diesel(check_for_backend(diesel::pg::Pg))]
425
+
pub struct AuthorFeedItem {
426
+
pub uri: String,
427
+
pub cid: String,
428
+
pub post: String,
429
+
pub did: String,
430
+
pub typ: String,
431
+
pub sort_at: DateTime<Utc>,
432
+
}
433
+
434
+
pub use not_null_vec::TextArray;
435
+
mod not_null_vec {
436
+
use diesel::deserialize::FromSql;
437
+
use diesel::pg::Pg;
438
+
use diesel::sql_types::{Array, Nullable, Text};
439
+
use diesel::{deserialize, FromSqlRow};
440
+
use serde::{Deserialize, Serialize};
441
+
use std::ops::{Deref, DerefMut};
442
+
443
+
#[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)]
444
+
#[diesel(sql_type = Array<Nullable<Text>>)]
445
+
pub struct TextArray(pub Vec<String>);
446
+
447
+
impl FromSql<Array<Nullable<Text>>, Pg> for TextArray {
448
+
fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> {
449
+
let vec_with_nulls =
450
+
<Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?;
451
+
Ok(TextArray(vec_with_nulls.into_iter().flatten().collect()))
452
+
}
453
+
}
454
+
455
+
impl Deref for TextArray {
456
+
type Target = Vec<String>;
457
+
458
+
fn deref(&self) -> &Self::Target {
459
+
&self.0
460
+
}
461
+
}
462
+
463
+
impl DerefMut for TextArray {
464
+
fn deref_mut(&mut self) -> &mut Self::Target {
465
+
&mut self.0
466
+
}
467
+
}
468
+
469
+
impl From<TextArray> for Vec<String> {
470
+
fn from(v: TextArray) -> Vec<String> {
471
+
v.0
472
+
}
473
+
}
474
+
}
+16
-2
parakeet-db/src/schema.rs
+16
-2
parakeet-db/src/schema.rs
···
13
13
}
14
14
15
15
diesel::table! {
16
+
author_feeds (uri) {
17
+
uri -> Text,
18
+
cid -> Text,
19
+
post -> Text,
20
+
did -> Text,
21
+
typ -> Text,
22
+
sort_at -> Timestamptz,
23
+
}
24
+
}
25
+
26
+
diesel::table! {
16
27
backfill (repo, repo_ver) {
17
28
repo -> Text,
18
29
repo_ver -> Text,
···
284
295
embed_subtype -> Nullable<Text>,
285
296
created_at -> Timestamptz,
286
297
indexed_at -> Timestamp,
298
+
mentions -> Nullable<Array<Nullable<Text>>>,
299
+
violates_threadgate -> Bool,
287
300
}
288
301
}
289
302
···
378
391
cid -> Text,
379
392
post_uri -> Text,
380
393
hidden_replies -> Array<Nullable<Text>>,
381
-
allow -> Array<Nullable<Text>>,
382
-
allowed_lists -> Array<Nullable<Text>>,
394
+
allow -> Nullable<Array<Nullable<Text>>>,
395
+
allowed_lists -> Nullable<Array<Nullable<Text>>>,
383
396
record -> Jsonb,
384
397
created_at -> Timestamptz,
385
398
indexed_at -> Timestamp,
···
429
442
430
443
diesel::allow_tables_to_appear_in_same_query!(
431
444
actors,
445
+
author_feeds,
432
446
backfill,
433
447
backfill_jobs,
434
448
blocks,