+1
lexica/src/app_bsky/mod.rs
+1
lexica/src/app_bsky/mod.rs
+33
lexica/src/app_bsky/unspecced.rs
+33
lexica/src/app_bsky/unspecced.rs
···
1
+
use crate::app_bsky::feed::{BlockedAuthor, PostView};
2
+
use serde::Serialize;
3
+
4
+
#[derive(Clone, Debug, Serialize)]
5
+
pub struct ThreadV2Item {
6
+
pub uri: String,
7
+
pub depth: i32,
8
+
pub value: ThreadV2ItemType,
9
+
}
10
+
11
+
#[derive(Clone, Debug, Serialize)]
12
+
#[serde(tag = "$type")]
13
+
pub enum ThreadV2ItemType {
14
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemPost")]
15
+
Post(ThreadItemPost),
16
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")]
17
+
NoUnauthenticated {},
18
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")]
19
+
NotFound {},
20
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")]
21
+
Blocked { author: BlockedAuthor },
22
+
}
23
+
24
+
#[derive(Clone, Debug, Serialize)]
25
+
#[serde(rename_all = "camelCase")]
26
+
pub struct ThreadItemPost {
27
+
pub post: PostView,
28
+
pub more_parents: bool,
29
+
pub more_replies: i32,
30
+
pub op_thread: bool,
31
+
pub hidden_by_threadgate: bool,
32
+
pub muted_by_viewer: bool,
33
+
}
+95
-1
parakeet/src/db.rs
+95
-1
parakeet/src/db.rs
···
1
1
use diesel::prelude::*;
2
-
use diesel::sql_types::{Array, Bool, Nullable, Text};
2
+
use diesel::sql_types::{Array, Bool, Integer, Nullable, Text};
3
3
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4
4
use parakeet_db::{schema, types};
5
+
use parakeet_db::models::TextArray;
5
6
6
7
pub async fn get_actor_status(
7
8
conn: &mut AsyncPgConnection,
···
196
197
.await
197
198
.optional()
198
199
}
200
+
201
+
#[derive(Debug, QueryableByName)]
202
+
#[diesel(check_for_backend(diesel::pg::Pg))]
203
+
#[allow(unused)]
204
+
pub struct ThreadItem {
205
+
#[diesel(sql_type = Text)]
206
+
pub at_uri: String,
207
+
#[diesel(sql_type = Nullable<Text>)]
208
+
pub parent_uri: Option<String>,
209
+
#[diesel(sql_type = Nullable<Text>)]
210
+
pub root_uri: Option<String>,
211
+
#[diesel(sql_type = Integer)]
212
+
pub depth: i32,
213
+
}
214
+
215
+
pub async fn get_thread_children(
216
+
conn: &mut AsyncPgConnection,
217
+
uri: &str,
218
+
depth: i32,
219
+
) -> QueryResult<Vec<ThreadItem>> {
220
+
diesel::sql_query(include_str!("sql/thread.sql"))
221
+
.bind::<Text, _>(uri)
222
+
.bind::<Integer, _>(depth)
223
+
.load(conn)
224
+
.await
225
+
}
226
+
227
+
pub async fn get_thread_children_branching(
228
+
conn: &mut AsyncPgConnection,
229
+
uri: &str,
230
+
depth: i32,
231
+
branching_factor: i32,
232
+
) -> QueryResult<Vec<ThreadItem>> {
233
+
diesel::sql_query(include_str!("sql/thread_branching.sql"))
234
+
.bind::<Text, _>(uri)
235
+
.bind::<Integer, _>(depth)
236
+
.bind::<Integer, _>(branching_factor)
237
+
.load(conn)
238
+
.await
239
+
}
240
+
241
+
#[derive(Debug, QueryableByName)]
242
+
#[diesel(check_for_backend(diesel::pg::Pg))]
243
+
pub struct HiddenThreadChildItem {
244
+
#[diesel(sql_type = Text)]
245
+
pub at_uri: String,
246
+
}
247
+
248
+
pub async fn get_thread_children_hidden(
249
+
conn: &mut AsyncPgConnection,
250
+
uri: &str,
251
+
root: &str,
252
+
) -> QueryResult<Vec<HiddenThreadChildItem>> {
253
+
diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql"))
254
+
.bind::<Text, _>(uri)
255
+
.bind::<Text, _>(root)
256
+
.load(conn)
257
+
.await
258
+
}
259
+
260
+
pub async fn get_thread_parents(
261
+
conn: &mut AsyncPgConnection,
262
+
uri: &str,
263
+
height: i32,
264
+
) -> QueryResult<Vec<ThreadItem>> {
265
+
diesel::sql_query(include_str!("sql/thread_parent.sql"))
266
+
.bind::<Text, _>(uri)
267
+
.bind::<Integer, _>(height)
268
+
.load(conn)
269
+
.await
270
+
}
271
+
272
+
pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> {
273
+
schema::posts::table
274
+
.select(schema::posts::root_uri)
275
+
.find(&uri)
276
+
.get_result(conn)
277
+
.await
278
+
.optional()
279
+
.map(|v| v.flatten())
280
+
}
281
+
282
+
pub async fn get_threadgate_hiddens(
283
+
conn: &mut AsyncPgConnection,
284
+
uri: &str,
285
+
) -> QueryResult<Option<TextArray>> {
286
+
schema::threadgates::table
287
+
.select(schema::threadgates::hidden_replies)
288
+
.find(&uri)
289
+
.get_result(conn)
290
+
.await
291
+
.optional()
292
+
}
+5
-9
parakeet/src/hydration/labeler.rs
+5
-9
parakeet/src/hydration/labeler.rs
···
42
42
likes: Option<i32>,
43
43
) -> LabelerViewDetailed {
44
44
let reason_types = labeler.reasons.map(|v| {
45
-
v.into_iter()
46
-
.flatten()
47
-
.filter_map(|v| ReasonType::from_str(&v).ok())
45
+
v.iter()
46
+
.filter_map(|v| ReasonType::from_str(v).ok())
48
47
.collect()
49
48
});
50
49
···
74
73
})
75
74
.collect();
76
75
let subject_types = labeler.subject_types.map(|v| {
77
-
v.into_iter()
78
-
.flatten()
79
-
.filter_map(|v| SubjectType::from_str(&v).ok())
76
+
v.iter()
77
+
.filter_map(|v| SubjectType::from_str(v).ok())
80
78
.collect()
81
79
});
82
-
let subject_collections = labeler
83
-
.subject_collections
84
-
.map(|v| v.into_iter().flatten().collect());
80
+
let subject_collections = labeler.subject_collections.map(Vec::from);
85
81
86
82
LabelerViewDetailed {
87
83
uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
+3
-3
parakeet/src/hydration/posts.rs
+3
-3
parakeet/src/hydration/posts.rs
···
89
89
let threadgate = threadgate?;
90
90
91
91
let lists = match threadgate.allowed_lists.as_ref() {
92
-
Some(allowed_lists) => allowed_lists.iter().flatten().cloned().collect(),
92
+
Some(allowed_lists) => allowed_lists.clone().into(),
93
93
None => Vec::new(),
94
94
};
95
95
let lists = self.hydrate_lists_basic(lists).await;
···
106
106
) -> HashMap<String, ThreadgateView> {
107
107
let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| {
108
108
if let Some(lists) = &c.allowed_lists {
109
-
acc.extend(lists.iter().flatten().cloned());
109
+
acc.extend(lists.clone().0);
110
110
}
111
111
acc
112
112
});
···
118
118
let this_lists = match &threadgate.allowed_lists {
119
119
Some(allowed_lists) => allowed_lists
120
120
.iter()
121
-
.filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned()))
121
+
.filter_map(|v| lists.get(v).cloned())
122
122
.collect(),
123
123
None => Vec::new(),
124
124
};
+3
-7
parakeet/src/hydration/starter_packs.rs
+3
-7
parakeet/src/hydration/starter_packs.rs
···
96
96
let feeds = sp
97
97
.feeds
98
98
.clone()
99
-
.unwrap_or_default()
100
-
.into_iter()
101
-
.flatten()
102
-
.collect();
103
-
let feeds = self.hydrate_feedgens(feeds).await.into_values().collect();
99
+
.unwrap_or_default();
100
+
let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect();
104
101
105
102
Some(build_spview(sp, creator, labels, list, feeds))
106
103
}
···
119
116
let feeds = packs
120
117
.values()
121
118
.filter_map(|pack| pack.feeds.clone())
122
-
.flat_map(|feeds| feeds.into_iter().flatten())
119
+
.flat_map(Vec::from)
123
120
.collect();
124
121
125
122
let creators = self.hydrate_profiles_basic(creators).await;
···
133
130
let list = lists.get(&pack.list).cloned();
134
131
let feeds = pack.feeds.as_ref().map(|v| {
135
132
v.iter()
136
-
.flatten()
137
133
.filter_map(|feed| feeds.get(feed).cloned())
138
134
.collect()
139
135
});
+4
-1
parakeet/src/loaders.rs
+4
-1
parakeet/src/loaders.rs
···
4
4
use dataloader::async_cached::Loader;
5
5
use dataloader::non_cached::Loader as NonCachedLoader;
6
6
use dataloader::BatchFn;
7
+
use diesel::dsl::sql;
7
8
use diesel::prelude::*;
8
9
use diesel_async::pooled_connection::deadpool::Pool;
9
10
use diesel_async::{AsyncPgConnection, RunQueryDsl};
···
368
369
let mut conn = self.0.get().await.unwrap();
369
370
370
371
let res = schema::posts::table
371
-
.left_join(schema::threadgates::table)
372
+
.left_join(schema::threadgates::table.on(
373
+
schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")),
374
+
))
372
375
.select((
373
376
models::Post::as_select(),
374
377
Option::<models::Threadgate>::as_select(),
+1
-1
parakeet/src/sql/thread.sql
+1
-1
parakeet/src/sql/thread.sql
+13
parakeet/src/sql/thread_branching.sql
+13
parakeet/src/sql/thread_branching.sql
···
1
+
with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth
2
+
from posts
3
+
where parent_uri = $1
4
+
and violates_threadgate = FALSE
5
+
union all
6
+
(select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
7
+
from posts p
8
+
join thread on p.parent_uri = thread.at_uri
9
+
where thread.depth <= $2
10
+
and violates_threadgate = FALSE
11
+
LIMIT $3))
12
+
select *
13
+
from thread;
+2
-24
parakeet/src/xrpc/app_bsky/feed/posts.rs
+2
-24
parakeet/src/xrpc/app_bsky/feed/posts.rs
···
361
361
pub threadgate: Option<ThreadgateView>,
362
362
}
363
363
364
-
#[derive(Debug, QueryableByName)]
365
-
#[diesel(check_for_backend(diesel::pg::Pg))]
366
-
struct ThreadItem {
367
-
#[diesel(sql_type = diesel::sql_types::Text)]
368
-
at_uri: String,
369
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
370
-
parent_uri: Option<String>,
371
-
// #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
372
-
// root_uri: Option<String>,
373
-
#[diesel(sql_type = diesel::sql_types::Integer)]
374
-
depth: i32,
375
-
}
376
-
377
364
pub async fn get_post_thread(
378
365
State(state): State<GlobalState>,
379
366
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
···
409
396
}
410
397
}
411
398
412
-
let replies = diesel::sql_query(include_str!("../../../sql/thread.sql"))
413
-
.bind::<diesel::sql_types::Text, _>(&uri)
414
-
.bind::<diesel::sql_types::Integer, _>(depth as i32)
415
-
.load::<ThreadItem>(&mut conn)
416
-
.await?;
417
-
418
-
let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql"))
419
-
.bind::<diesel::sql_types::Text, _>(&uri)
420
-
.bind::<diesel::sql_types::Integer, _>(parent_height as i32)
421
-
.load::<ThreadItem>(&mut conn)
422
-
.await?;
399
+
let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?;
400
+
let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?;
423
401
424
402
let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect();
425
403
let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect();
+3
parakeet/src/xrpc/app_bsky/mod.rs
+3
parakeet/src/xrpc/app_bsky/mod.rs
···
6
6
mod feed;
7
7
mod graph;
8
8
mod labeler;
9
+
mod unspecced;
9
10
10
11
#[rustfmt::skip]
11
12
pub fn routes() -> Router<crate::GlobalState> {
···
64
65
// TODO: app.bsky.notification.putActivitySubscriptions
65
66
// TODO: app.bsky.notification.putPreferences
66
67
// TODO: app.bsky.notification.putPreferencesV2
68
+
.route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2))
69
+
.route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2))
67
70
}
68
71
69
72
async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
···
1
+
pub mod thread_v2;
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
···
1
+
use crate::db::ThreadItem;
2
+
use crate::hydration::StatefulHydrator;
3
+
use crate::xrpc::error::{Error, XrpcResult};
4
+
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
5
+
use crate::xrpc::normalise_at_uri;
6
+
use crate::GlobalState;
7
+
use axum::extract::{Query, State};
8
+
use axum::Json;
9
+
use itertools::Itertools;
10
+
use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView};
11
+
use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType};
12
+
use serde::{Deserialize, Serialize};
13
+
use std::cmp::Ordering;
14
+
use std::collections::{HashMap, HashSet};
15
+
16
+
const THREAD_PARENTS: usize = 50;
17
+
const DEFAULT_BRANCHING: u32 = 10;
18
+
const DEFAULT_DEPTH: u32 = 6;
19
+
20
+
#[derive(Copy, Clone, Debug, Default, Deserialize)]
21
+
#[serde(rename_all = "lowercase")]
22
+
pub enum PostThreadSort {
23
+
Newest,
24
+
#[default]
25
+
Oldest,
26
+
Top,
27
+
}
28
+
29
+
#[derive(Debug, Deserialize)]
30
+
#[serde(rename_all = "camelCase")]
31
+
pub struct GetPostThreadV2Req {
32
+
pub anchor: String,
33
+
pub above: Option<bool>,
34
+
pub below: Option<u32>,
35
+
pub branching_factor: Option<u32>,
36
+
#[serde(default)]
37
+
pub sort: PostThreadSort,
38
+
}
39
+
40
+
#[derive(Debug, Serialize)]
41
+
#[serde(rename_all = "camelCase")]
42
+
pub struct GetPostThreadV2Res {
43
+
pub thread: Vec<ThreadV2Item>,
44
+
#[serde(skip_serializing_if = "Option::is_none")]
45
+
pub threadgate: Option<ThreadgateView>,
46
+
pub has_other_replies: bool,
47
+
}
48
+
49
+
pub async fn get_post_thread_v2(
50
+
State(state): State<GlobalState>,
51
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
52
+
maybe_auth: Option<AtpAuth>,
53
+
Query(query): Query<GetPostThreadV2Req>,
54
+
) -> XrpcResult<Json<GetPostThreadV2Res>> {
55
+
let mut conn = state.pool.get().await?;
56
+
let maybe_did = maybe_auth.clone().map(|v| v.0);
57
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
58
+
59
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
60
+
let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32;
61
+
let branching_factor = query
62
+
.branching_factor
63
+
.unwrap_or(DEFAULT_BRANCHING)
64
+
.clamp(0, 100) as i32;
65
+
66
+
let anchor = hyd
67
+
.hydrate_post(uri.clone())
68
+
.await
69
+
.ok_or(Error::not_found())?;
70
+
71
+
if let Some(v) = &anchor.author.viewer {
72
+
if v.blocked_by || v.blocking.is_some() {
73
+
let block = ThreadV2ItemType::Blocked {
74
+
author: BlockedAuthor {
75
+
did: anchor.author.did,
76
+
viewer: anchor.author.viewer,
77
+
},
78
+
};
79
+
80
+
return Ok(Json(GetPostThreadV2Res {
81
+
thread: vec![ThreadV2Item {
82
+
uri,
83
+
depth: 0,
84
+
value: block,
85
+
}],
86
+
threadgate: anchor.threadgate,
87
+
has_other_replies: false,
88
+
}));
89
+
}
90
+
}
91
+
92
+
// get the root post URI (if there is one) and return its author's DID.
93
+
let root_uri = crate::db::get_root_post(&mut conn, &uri)
94
+
.await?
95
+
.unwrap_or(uri.clone());
96
+
let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0];
97
+
98
+
let replies =
99
+
crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1)
100
+
.await?;
101
+
let reply_uris = replies
102
+
.iter()
103
+
.map(|item| item.at_uri.clone())
104
+
.collect::<Vec<_>>();
105
+
106
+
// bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents.
107
+
let parents = match query.above.unwrap_or(true) {
108
+
true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?,
109
+
false => vec![],
110
+
};
111
+
let parent_uris = parents
112
+
.iter()
113
+
.map(|item| item.at_uri.clone())
114
+
.collect::<Vec<_>>();
115
+
116
+
let (mut replies_hyd, mut parents_hyd) = tokio::join!(
117
+
hyd.hydrate_posts(reply_uris),
118
+
hyd.hydrate_posts(parent_uris),
119
+
);
120
+
121
+
let threadgate = anchor.threadgate.clone();
122
+
let hidden: HashSet<_, std::hash::RandomState> = match &threadgate {
123
+
Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?,
124
+
None => None,
125
+
}
126
+
.map(|hiddens| HashSet::from_iter(Vec::from(hiddens)))
127
+
.unwrap_or_default();
128
+
129
+
let root_has_more = parents.len() > THREAD_PARENTS;
130
+
let mut is_op_thread = true;
131
+
132
+
let mut thread = Vec::with_capacity(1 + replies.len() + parents.len());
133
+
134
+
thread.extend(
135
+
parents
136
+
.into_iter()
137
+
.tail(THREAD_PARENTS)
138
+
.enumerate()
139
+
.map(|(idx, item)| {
140
+
let value = parents_hyd
141
+
.remove(&item.at_uri)
142
+
.map(|post| {
143
+
if let Some(v) = &post.author.viewer {
144
+
if v.blocked_by || v.blocking.is_some() {
145
+
return ThreadV2ItemType::Blocked {
146
+
author: BlockedAuthor {
147
+
did: post.author.did,
148
+
viewer: post.author.viewer,
149
+
},
150
+
};
151
+
}
152
+
}
153
+
154
+
let op_thread = (is_op_thread
155
+
|| item.root_uri.is_none() && item.parent_uri.is_none())
156
+
&& post.author.did == root_did;
157
+
158
+
ThreadV2ItemType::Post(ThreadItemPost {
159
+
post,
160
+
more_parents: idx == 0 && root_has_more,
161
+
more_replies: 0,
162
+
op_thread,
163
+
hidden_by_threadgate: false,
164
+
muted_by_viewer: false,
165
+
})
166
+
})
167
+
.unwrap_or(ThreadV2ItemType::NotFound {});
168
+
169
+
ThreadV2Item {
170
+
uri: item.at_uri,
171
+
depth: -item.depth - 1,
172
+
value,
173
+
}
174
+
}),
175
+
);
176
+
177
+
is_op_thread = is_op_thread && anchor.author.did == root_did;
178
+
thread.push(ThreadV2Item {
179
+
uri: uri.clone(),
180
+
depth: 0,
181
+
value: ThreadV2ItemType::Post(ThreadItemPost {
182
+
post: anchor,
183
+
more_parents: false,
184
+
more_replies: 0,
185
+
op_thread: is_op_thread,
186
+
hidden_by_threadgate: false,
187
+
muted_by_viewer: false,
188
+
}),
189
+
});
190
+
191
+
let mut replies_grouped = replies
192
+
.into_iter()
193
+
.into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default());
194
+
195
+
// start with the anchor
196
+
let (children, has_other_replies) = build_thread_children(
197
+
&mut replies_grouped,
198
+
&mut replies_hyd,
199
+
&hidden,
200
+
&uri,
201
+
is_op_thread,
202
+
1,
203
+
&BuildThreadChildrenOpts {
204
+
root_did,
205
+
sort: query.sort,
206
+
maybe_did: &maybe_did,
207
+
max_depth: depth,
208
+
},
209
+
);
210
+
thread.extend(children);
211
+
212
+
Ok(Json(GetPostThreadV2Res {
213
+
thread,
214
+
threadgate,
215
+
has_other_replies,
216
+
}))
217
+
}
218
+
219
+
#[derive(Debug, Deserialize)]
220
+
#[serde(rename_all = "camelCase")]
221
+
pub struct GetPostThreadOtherV2Req {
222
+
pub anchor: String,
223
+
}
224
+
225
+
#[derive(Debug, Serialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
pub struct GetPostThreadOtherV2Res {
228
+
pub thread: Vec<ThreadV2Item>,
229
+
}
230
+
231
+
pub async fn get_post_thread_other_v2(
232
+
State(state): State<GlobalState>,
233
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
234
+
maybe_auth: Option<AtpAuth>,
235
+
Query(query): Query<GetPostThreadOtherV2Req>,
236
+
) -> XrpcResult<Json<GetPostThreadOtherV2Res>> {
237
+
let mut conn = state.pool.get().await?;
238
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
239
+
240
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
241
+
242
+
let root = crate::db::get_root_post(&mut conn, &uri)
243
+
.await?
244
+
.unwrap_or(uri.clone());
245
+
246
+
// this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE
247
+
let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?;
248
+
let reply_uris = replies
249
+
.into_iter()
250
+
.map(|item| item.at_uri)
251
+
.collect::<Vec<_>>();
252
+
let thread = hyd
253
+
.hydrate_posts(reply_uris)
254
+
.await
255
+
.into_iter()
256
+
.filter(|(_, post)| match &post.author.viewer {
257
+
Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false,
258
+
_ => true,
259
+
})
260
+
.map(|(uri, post)| {
261
+
let post = ThreadItemPost {
262
+
post,
263
+
more_parents: false,
264
+
more_replies: 0,
265
+
op_thread: false,
266
+
hidden_by_threadgate: true,
267
+
muted_by_viewer: false,
268
+
};
269
+
270
+
ThreadV2Item {
271
+
uri,
272
+
depth: 1,
273
+
value: ThreadV2ItemType::Post(post),
274
+
}
275
+
})
276
+
.collect();
277
+
278
+
Ok(Json(GetPostThreadOtherV2Res { thread }))
279
+
}
280
+
281
+
#[derive(Debug)]
282
+
struct BuildThreadChildrenOpts<'a> {
283
+
root_did: &'a str,
284
+
sort: PostThreadSort,
285
+
maybe_did: &'a Option<String>,
286
+
max_depth: i32,
287
+
}
288
+
289
+
fn build_thread_children(
290
+
grouped_replies: &mut HashMap<String, Vec<ThreadItem>>,
291
+
replies_hyd: &mut HashMap<String, PostView>,
292
+
hidden: &HashSet<String>,
293
+
parent: &str,
294
+
is_op_thread: bool,
295
+
depth: i32,
296
+
opts: &BuildThreadChildrenOpts,
297
+
) -> (Vec<ThreadV2Item>, bool) {
298
+
let mut has_other_replies = false;
299
+
300
+
let Some(replies) = grouped_replies.remove(parent) else {
301
+
return (Vec::default(), has_other_replies);
302
+
};
303
+
304
+
let replies = replies
305
+
.into_iter()
306
+
.filter_map(|item| replies_hyd.remove(&item.at_uri))
307
+
.sorted_by(sort_replies(&opts.sort));
308
+
309
+
let mut out = Vec::new();
310
+
311
+
for post in replies {
312
+
let reply_count = grouped_replies
313
+
.get(&post.uri)
314
+
.map(|v| v.len())
315
+
.unwrap_or_default();
316
+
let at_max = depth == opts.max_depth;
317
+
let more_replies = if at_max { reply_count } else { 0 };
318
+
let op_thread = is_op_thread && post.author.did == opts.root_did;
319
+
320
+
// shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies...
321
+
if let Some(v) = &post.author.viewer {
322
+
if v.blocked_by || v.blocking.is_some() {
323
+
continue;
324
+
}
325
+
}
326
+
327
+
// check if the post is hidden AND we're NOT the author (hidden posts still show for their author)
328
+
if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) {
329
+
// post is hidden - do not ~pass go~ push to the thread.
330
+
if depth == 1 {
331
+
has_other_replies = true;
332
+
}
333
+
continue;
334
+
}
335
+
336
+
let uri = post.uri.clone();
337
+
out.push(ThreadV2Item {
338
+
uri: post.uri.clone(),
339
+
depth,
340
+
value: ThreadV2ItemType::Post(ThreadItemPost {
341
+
post,
342
+
more_parents: false,
343
+
more_replies: more_replies as i32,
344
+
op_thread,
345
+
hidden_by_threadgate: false,
346
+
muted_by_viewer: false,
347
+
}),
348
+
});
349
+
350
+
if !at_max {
351
+
// we don't care about has_other_replies when recursing
352
+
let (children, _) = build_thread_children(
353
+
grouped_replies,
354
+
replies_hyd,
355
+
hidden,
356
+
&uri,
357
+
op_thread,
358
+
depth + 1,
359
+
opts,
360
+
);
361
+
362
+
out.extend(children);
363
+
}
364
+
}
365
+
366
+
(out, has_other_replies)
367
+
}
368
+
369
+
fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> {
370
+
move |a: &PostView, b: &PostView| match sort {
371
+
PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at),
372
+
PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at),
373
+
PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count),
374
+
}
375
+
}
376
+
377
+
fn did_is_cur(cur: &Option<String>, did: &String) -> bool {
378
+
match cur {
379
+
Some(cur) => did == cur,
380
+
None => false,
381
+
}
382
+
}
+1
-1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
+1
-1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
+55
-13
parakeet-db/src/models.rs
+55
-13
parakeet-db/src/models.rs
···
137
137
138
138
pub content: String,
139
139
pub facets: Option<serde_json::Value>,
140
-
pub languages: Vec<Option<String>>,
141
-
pub tags: Vec<Option<String>>,
140
+
pub languages: not_null_vec::TextArray,
141
+
pub tags: not_null_vec::TextArray,
142
142
143
143
pub parent_uri: Option<String>,
144
144
pub parent_cid: Option<String>,
···
148
148
pub embed: Option<String>,
149
149
pub embed_subtype: Option<String>,
150
150
151
-
pub mentions: Option<Vec<Option<String>>>,
151
+
pub mentions: Option<not_null_vec::TextArray>,
152
152
pub violates_threadgate: bool,
153
153
154
154
pub created_at: DateTime<Utc>,
···
236
236
pub cid: String,
237
237
pub post_uri: String,
238
238
239
-
pub detached: Vec<Option<String>>,
240
-
pub rules: Vec<Option<String>>,
239
+
pub detached: not_null_vec::TextArray,
240
+
pub rules: not_null_vec::TextArray,
241
241
242
242
pub created_at: DateTime<Utc>,
243
243
pub indexed_at: NaiveDateTime,
···
252
252
pub cid: String,
253
253
pub post_uri: String,
254
254
255
-
pub hidden_replies: Vec<Option<String>>,
256
-
pub allow: Option<Vec<Option<String>>>,
257
-
pub allowed_lists: Option<Vec<Option<String>>>,
255
+
pub hidden_replies: not_null_vec::TextArray,
256
+
pub allow: Option<not_null_vec::TextArray>,
257
+
pub allowed_lists: Option<not_null_vec::TextArray>,
258
258
259
259
pub record: serde_json::Value,
260
260
···
276
276
pub description: Option<String>,
277
277
pub description_facets: Option<serde_json::Value>,
278
278
pub list: String,
279
-
pub feeds: Option<Vec<Option<String>>>,
279
+
pub feeds: Option<not_null_vec::TextArray>,
280
280
281
281
pub created_at: DateTime<Utc>,
282
282
pub indexed_at: NaiveDateTime,
···
290
290
pub did: String,
291
291
pub cid: String,
292
292
293
-
pub reasons: Option<Vec<Option<String>>>,
294
-
pub subject_types: Option<Vec<Option<String>>>,
295
-
pub subject_collections: Option<Vec<Option<String>>>,
293
+
pub reasons: Option<not_null_vec::TextArray>,
294
+
pub subject_types: Option<not_null_vec::TextArray>,
295
+
pub subject_collections: Option<not_null_vec::TextArray>,
296
296
297
297
pub created_at: NaiveDateTime,
298
298
pub indexed_at: NaiveDateTime,
···
402
402
pub subject: String,
403
403
pub subject_cid: Option<String>,
404
404
pub subject_type: String,
405
-
pub tags: Vec<Option<String>>,
405
+
pub tags: not_null_vec::TextArray,
406
406
pub created_at: DateTime<Utc>,
407
407
}
408
408
···
430
430
pub typ: String,
431
431
pub sort_at: DateTime<Utc>,
432
432
}
433
+
434
+
pub use not_null_vec::TextArray;
435
+
mod not_null_vec {
436
+
use diesel::deserialize::FromSql;
437
+
use diesel::pg::Pg;
438
+
use diesel::sql_types::{Array, Nullable, Text};
439
+
use diesel::{deserialize, FromSqlRow};
440
+
use serde::{Deserialize, Serialize};
441
+
use std::ops::{Deref, DerefMut};
442
+
443
+
#[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)]
444
+
#[diesel(sql_type = Array<Nullable<Text>>)]
445
+
pub struct TextArray(pub Vec<String>);
446
+
447
+
impl FromSql<Array<Nullable<Text>>, Pg> for TextArray {
448
+
fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> {
449
+
let vec_with_nulls =
450
+
<Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?;
451
+
Ok(TextArray(vec_with_nulls.into_iter().flatten().collect()))
452
+
}
453
+
}
454
+
455
+
impl Deref for TextArray {
456
+
type Target = Vec<String>;
457
+
458
+
fn deref(&self) -> &Self::Target {
459
+
&self.0
460
+
}
461
+
}
462
+
463
+
impl DerefMut for TextArray {
464
+
fn deref_mut(&mut self) -> &mut Self::Target {
465
+
&mut self.0
466
+
}
467
+
}
468
+
469
+
impl From<TextArray> for Vec<String> {
470
+
fn from(v: TextArray) -> Vec<String> {
471
+
v.0
472
+
}
473
+
}
474
+
}