+12
-9
hydration/post.go
+12
-9
hydration/post.go
···
17
18
// PostInfo contains hydrated post information
19
type PostInfo struct {
20
URI string
21
Cid string
22
Post *bsky.FeedPost
···
39
ctx, span := tracer.Start(ctx, "hydratePost")
40
defer span.End()
41
42
autoFetch, _ := ctx.Value("auto-fetch").(bool)
43
44
authorDid := extractDIDFromURI(uri)
···
47
return nil, err
48
}
49
50
-
// Query post from database
51
-
var dbPost models.Post
52
-
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
53
-
return nil, fmt.Errorf("failed to query post: %w", err)
54
-
}
55
-
56
if dbPost.NotFound || len(dbPost.Raw) == 0 {
57
if autoFetch {
58
h.AddMissingRecord(uri, true)
···
75
76
var wg sync.WaitGroup
77
78
-
// Get author DID
79
-
80
-
authorDID := extractDIDFromURI(uri)
81
82
// Get engagement counts
83
var likes, reposts, replies int
···
121
wg.Wait()
122
123
info := &PostInfo{
124
URI: uri,
125
Cid: dbPost.Cid,
126
Post: &feedPost,
···
17
18
// PostInfo contains hydrated post information
19
type PostInfo struct {
20
+
ID uint
21
URI string
22
Cid string
23
Post *bsky.FeedPost
···
40
ctx, span := tracer.Start(ctx, "hydratePost")
41
defer span.End()
42
43
+
p, err := h.backend.GetPostByUri(ctx, uri, "*")
44
+
if err != nil {
45
+
return nil, err
46
+
}
47
+
48
+
return h.HydratePostDB(ctx, uri, p, viewerDID)
49
+
}
50
+
51
+
func (h *Hydrator) HydratePostDB(ctx context.Context, uri string, dbPost *models.Post, viewerDID string) (*PostInfo, error) {
52
autoFetch, _ := ctx.Value("auto-fetch").(bool)
53
54
authorDid := extractDIDFromURI(uri)
···
57
return nil, err
58
}
59
60
if dbPost.NotFound || len(dbPost.Raw) == 0 {
61
if autoFetch {
62
h.AddMissingRecord(uri, true)
···
79
80
var wg sync.WaitGroup
81
82
+
authorDID := r.Did
83
84
// Get engagement counts
85
var likes, reposts, replies int
···
123
wg.Wait()
124
125
info := &PostInfo{
126
+
ID: dbPost.ID,
127
URI: uri,
128
Cid: dbPost.Cid,
129
Post: &feedPost,
+10
hydration/utils.go
+10
hydration/utils.go
···
5
"fmt"
6
7
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
"github.com/whyrusleeping/market/models"
9
)
10
11
func (h *Hydrator) NormalizeUri(ctx context.Context, uri string) (string, error) {
···
28
29
return fmt.Sprintf("at://%s/%s/%s", did, puri.Collection().String(), puri.RecordKey().String()), nil
30
}
31
+
32
+
func (h *Hydrator) UriForPost(ctx context.Context, p *models.Post) (string, error) {
33
+
r, err := h.backend.GetRepoByID(ctx, p.Author)
34
+
if err != nil {
35
+
return "", err
36
+
}
37
+
38
+
return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey), nil
39
+
}
+11
-11
xrpc/feed/getPostThread.go
+11
-11
xrpc/feed/getPostThread.go
···
15
func HandleGetPostThread(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
16
uriParam := c.QueryParam("uri")
17
if uriParam == "" {
18
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
19
"error": "InvalidRequest",
20
"message": "uri parameter is required",
21
})
···
27
// Hydrate the requested post
28
postInfo, err := hydrator.HydratePost(ctx, uriParam, viewer)
29
if err != nil {
30
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
31
"error": "NotFound",
32
"message": "post not found",
33
})
···
74
uri: uri,
75
replyTo: tp.ReplyTo,
76
inThread: tp.InThread,
77
-
replies: []interface{}{},
78
}
79
}
80
···
98
}
99
100
if rootNode == nil {
101
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
102
"error": "NotFound",
103
"message": "thread root not found",
104
})
···
107
// Build the response by traversing the tree
108
thread := buildThreadView(ctx, db, rootNode, postsByID, hydrator, viewer, nil)
109
110
-
return c.JSON(http.StatusOK, map[string]interface{}{
111
"thread": thread,
112
})
113
}
···
117
uri string
118
replyTo uint
119
inThread uint
120
-
replies []interface{}
121
}
122
123
-
func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent interface{}) interface{} {
124
// Hydrate this post
125
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
126
if err != nil {
127
// Return a notFound post
128
-
return map[string]interface{}{
129
"$type": "app.bsky.feed.defs#notFoundPost",
130
"uri": node.uri,
131
}
···
134
// Hydrate author
135
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
136
if err != nil {
137
-
return map[string]interface{}{
138
"$type": "app.bsky.feed.defs#notFoundPost",
139
"uri": node.uri,
140
}
141
}
142
143
// Build replies
144
-
var replies []interface{}
145
for _, replyNode := range node.replies {
146
if rn, ok := replyNode.(*threadPostNode); ok {
147
replyView := buildThreadView(ctx, db, rn, allNodes, hydrator, viewer, nil)
···
150
}
151
152
// Build the thread view post
153
-
var repliesForView interface{}
154
if len(replies) > 0 {
155
repliesForView = replies
156
}
···
15
func HandleGetPostThread(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
16
uriParam := c.QueryParam("uri")
17
if uriParam == "" {
18
+
return c.JSON(http.StatusBadRequest, map[string]any{
19
"error": "InvalidRequest",
20
"message": "uri parameter is required",
21
})
···
27
// Hydrate the requested post
28
postInfo, err := hydrator.HydratePost(ctx, uriParam, viewer)
29
if err != nil {
30
+
return c.JSON(http.StatusNotFound, map[string]any{
31
"error": "NotFound",
32
"message": "post not found",
33
})
···
74
uri: uri,
75
replyTo: tp.ReplyTo,
76
inThread: tp.InThread,
77
+
replies: []any{},
78
}
79
}
80
···
98
}
99
100
if rootNode == nil {
101
+
return c.JSON(http.StatusNotFound, map[string]any{
102
"error": "NotFound",
103
"message": "thread root not found",
104
})
···
107
// Build the response by traversing the tree
108
thread := buildThreadView(ctx, db, rootNode, postsByID, hydrator, viewer, nil)
109
110
+
return c.JSON(http.StatusOK, map[string]any{
111
"thread": thread,
112
})
113
}
···
117
uri string
118
replyTo uint
119
inThread uint
120
+
replies []any
121
}
122
123
+
func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent any) any {
124
// Hydrate this post
125
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
126
if err != nil {
127
// Return a notFound post
128
+
return map[string]any{
129
"$type": "app.bsky.feed.defs#notFoundPost",
130
"uri": node.uri,
131
}
···
134
// Hydrate author
135
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
136
if err != nil {
137
+
return map[string]any{
138
"$type": "app.bsky.feed.defs#notFoundPost",
139
"uri": node.uri,
140
}
141
}
142
143
// Build replies
144
+
var replies []any
145
for _, replyNode := range node.replies {
146
if rn, ok := replyNode.(*threadPostNode); ok {
147
replyView := buildThreadView(ctx, db, rn, allNodes, hydrator, viewer, nil)
···
150
}
151
152
// Build the thread view post
153
+
var repliesForView any
154
if len(replies) > 0 {
155
repliesForView = replies
156
}
+135
-133
xrpc/unspecced/getPostThreadV2.go
+135
-133
xrpc/unspecced/getPostThreadV2.go
···
1
package unspecced
2
3
import (
4
"context"
5
"fmt"
6
"log/slog"
···
11
"github.com/labstack/echo/v4"
12
"github.com/whyrusleeping/konbini/hydration"
13
"github.com/whyrusleeping/konbini/views"
14
"gorm.io/gorm"
15
)
16
···
69
})
70
}
71
72
-
// Determine the root post ID for the thread
73
-
rootPostID := anchorPostInfo.InThread
74
-
if rootPostID == 0 {
75
-
// This post is the root - get its ID
76
-
var postID uint
77
-
db.Raw(`
78
-
SELECT id FROM posts
79
-
WHERE author = (SELECT id FROM repos WHERE did = ?)
80
-
AND rkey = ?
81
-
`, extractDIDFromURI(anchorUri), extractRkeyFromURI(anchorUri)).Scan(&postID)
82
-
rootPostID = postID
83
}
84
85
-
// Query all posts in this thread
86
-
type threadPostRow struct {
87
-
ID uint
88
-
Rkey string
89
-
ReplyTo uint
90
-
InThread uint
91
-
AuthorDid string
92
}
93
-
var threadPosts []threadPostRow
94
-
db.Raw(`
95
-
SELECT p.id, p.rkey, p.reply_to, p.in_thread, r.did as author_did
96
-
FROM posts p
97
-
JOIN repos r ON r.id = p.author
98
-
WHERE (p.id = ? OR p.in_thread = ?)
99
-
AND p.not_found = false
100
-
ORDER BY p.created ASC
101
-
`, rootPostID, rootPostID).Scan(&threadPosts)
102
103
-
// Build a map of posts by ID
104
-
postsByID := make(map[uint]*threadNode)
105
-
for _, tp := range threadPosts {
106
-
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", tp.AuthorDid, tp.Rkey)
107
-
postsByID[tp.ID] = &threadNode{
108
-
id: tp.ID,
109
-
uri: uri,
110
-
replyTo: tp.ReplyTo,
111
-
inThread: tp.InThread,
112
-
children: []*threadNode{},
113
-
}
114
-
}
115
-
116
-
// Build parent-child relationships
117
-
for _, node := range postsByID {
118
-
if node.replyTo != 0 {
119
-
parent := postsByID[node.replyTo]
120
-
if parent != nil {
121
-
parent.children = append(parent.children, node)
122
-
}
123
-
}
124
-
}
125
-
126
-
// Find the anchor node
127
-
anchorID := uint(0)
128
-
for id, node := range postsByID {
129
-
if node.uri == anchorUri {
130
-
anchorID = id
131
-
break
132
-
}
133
-
}
134
135
-
if anchorID == 0 {
136
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
137
-
"error": "NotFound",
138
-
"message": "anchor post not found in thread",
139
-
})
140
}
141
142
-
anchorNode := postsByID[anchorID]
143
144
// Build flat thread items list
145
var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem
···
147
148
// Add parents if requested
149
if above {
150
-
parents := collectParents(anchorNode, postsByID)
151
-
for i := len(parents) - 1; i >= 0; i-- {
152
-
depth := int64(-(len(parents) - i))
153
-
item := buildThreadItem(ctx, hydrator, parents[i], depth, viewer)
154
if item != nil {
155
threadItems = append(threadItems, item)
156
}
157
}
158
}
159
160
// Add anchor post (depth 0)
161
-
anchorItem := buildThreadItem(ctx, hydrator, anchorNode, 0, viewer)
162
if anchorItem != nil {
163
threadItems = append(threadItems, anchorItem)
164
}
165
166
// Add replies below anchor
167
if below > 0 {
168
-
replies, hasMore := collectReplies(ctx, hydrator, anchorNode, 1, below, branchingFactor, sort, viewer)
169
threadItems = append(threadItems, replies...)
170
-
hasOtherReplies = hasMore
171
}
172
173
return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{
···
176
})
177
}
178
179
-
type threadNode struct {
180
-
id uint
181
-
uri string
182
-
replyTo uint
183
-
inThread uint
184
-
children []*threadNode
185
-
}
186
-
187
-
func collectParents(node *threadNode, allNodes map[uint]*threadNode) []*threadNode {
188
-
var parents []*threadNode
189
-
current := node
190
-
for current.replyTo != 0 {
191
-
parent := allNodes[current.replyTo]
192
-
if parent == nil {
193
-
break
194
-
}
195
-
parents = append(parents, parent)
196
-
current = parent
197
}
198
-
return parents
199
-
}
200
201
-
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, currentDepth, maxDepth, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, bool) {
202
-
var items []*bsky.UnspeccedGetPostThreadV2_ThreadItem
203
-
hasMore := false
204
205
-
if currentDepth > maxDepth {
206
-
return items, false
207
-
}
208
-
209
-
// Sort children based on sort parameter
210
-
children := node.children
211
-
// TODO: Actually sort based on the sort parameter (newest/oldest/top)
212
-
// For now, just use the order we have
213
214
-
// Limit to branchingFactor
215
-
limit := int(branchingFactor)
216
-
if len(children) > limit {
217
-
hasMore = true
218
-
children = children[:limit]
219
}
220
221
-
for _, child := range children {
222
-
item := buildThreadItem(ctx, hydrator, child, currentDepth, viewer)
223
-
if item != nil {
224
-
items = append(items, item)
225
226
-
// Recursively collect replies
227
-
if currentDepth < maxDepth {
228
-
childReplies, childHasMore := collectReplies(ctx, hydrator, child, currentDepth+1, maxDepth, branchingFactor, sort, viewer)
229
-
items = append(items, childReplies...)
230
-
if childHasMore {
231
-
hasMore = true
232
-
}
233
-
}
234
}
235
}
236
237
-
return items, hasMore
238
-
}
239
-
240
-
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
241
// Hydrate the post
242
-
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
243
if err != nil {
244
// Return not found item
245
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
246
Depth: depth,
···
256
// Hydrate author
257
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
258
if err != nil {
259
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
260
Depth: depth,
261
Uri: node.uri,
···
319
return string(parts)
320
}
321
322
-
func extractRkeyFromURI(uri string) string {
323
-
// URI format: at://did:plc:xxx/collection/rkey
324
-
if len(uri) < 5 || uri[:5] != "at://" {
325
-
return ""
326
}
327
-
// Find last slash
328
-
for i := len(uri) - 1; i >= 5; i-- {
329
-
if uri[i] == '/' {
330
-
return uri[i+1:]
331
}
332
}
333
-
return ""
334
}
···
1
package unspecced
2
3
import (
4
+
"bytes"
5
"context"
6
"fmt"
7
"log/slog"
···
12
"github.com/labstack/echo/v4"
13
"github.com/whyrusleeping/konbini/hydration"
14
"github.com/whyrusleeping/konbini/views"
15
+
"github.com/whyrusleeping/market/models"
16
"gorm.io/gorm"
17
)
18
···
71
})
72
}
73
74
+
threadID := anchorPostInfo.InThread
75
+
if threadID == 0 {
76
+
threadID = anchorPostInfo.ID
77
}
78
79
+
var threadPosts []*models.Post
80
+
if err := db.Raw("SELECT * FROM posts WHERE in_thread = ? OR id = ?", threadID, anchorPostInfo.ID).Scan(&threadPosts).Error; err != nil {
81
+
return err
82
}
83
84
+
fmt.Println("GOT THREAD POSTS: ", len(threadPosts))
85
86
+
treeNodes, err := buildThreadTree(ctx, hydrator, db, threadPosts)
87
+
if err != nil {
88
+
return fmt.Errorf("failed to construct tree: %w", err)
89
}
90
91
+
anchor := treeNodes[anchorPostInfo.ID]
92
93
// Build flat thread items list
94
var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem
···
96
97
// Add parents if requested
98
if above {
99
+
parent := anchor.parent
100
+
depth := int64(-1)
101
+
for parent != nil {
102
+
if parent.missing {
103
+
fmt.Println("Parent missing: ", depth)
104
+
item := &bsky.UnspeccedGetPostThreadV2_ThreadItem{
105
+
Depth: depth,
106
+
Uri: parent.uri,
107
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
108
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
109
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
110
+
},
111
+
},
112
+
}
113
+
114
+
threadItems = append(threadItems, item)
115
+
break
116
+
}
117
+
118
+
item := buildThreadItem(ctx, hydrator, parent, depth, viewer)
119
if item != nil {
120
threadItems = append(threadItems, item)
121
}
122
+
123
+
parent = parent.parent
124
+
depth--
125
}
126
}
127
128
// Add anchor post (depth 0)
129
+
anchorItem := buildThreadItem(ctx, hydrator, anchor, 0, viewer)
130
if anchorItem != nil {
131
threadItems = append(threadItems, anchorItem)
132
}
133
134
// Add replies below anchor
135
if below > 0 {
136
+
replies, err := collectReplies(ctx, hydrator, anchor, 0, below, branchingFactor, sort, viewer)
137
+
if err != nil {
138
+
return err
139
+
}
140
threadItems = append(threadItems, replies...)
141
+
//hasOtherReplies = hasMore
142
}
143
144
return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{
···
147
})
148
}
149
150
+
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, curnode *threadTree, depth int64, below int64, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, error) {
151
+
if below == 0 {
152
+
return nil, nil
153
}
154
155
+
var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem
156
+
for _, child := range curnode.children {
157
+
out = append(out, buildThreadItem(ctx, hydrator, child, depth+1, viewer))
158
+
if child.missing {
159
+
continue
160
+
}
161
162
+
sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer)
163
+
if err != nil {
164
+
return nil, err
165
+
}
166
167
+
out = append(out, sub...)
168
}
169
170
+
return out, nil
171
+
}
172
173
+
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadTree, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
174
+
if node.missing {
175
+
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
176
+
Depth: depth,
177
+
Uri: node.uri,
178
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
179
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
180
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
181
+
},
182
+
},
183
}
184
}
185
186
// Hydrate the post
187
+
postInfo, err := hydrator.HydratePostDB(ctx, node.uri, node.val, viewer)
188
if err != nil {
189
+
slog.Error("failed to hydrate post in thread item", "uri", node.uri, "error", err)
190
// Return not found item
191
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
192
Depth: depth,
···
202
// Hydrate author
203
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
204
if err != nil {
205
+
slog.Error("failed to hydrate actor in thread item", "author", postInfo.Author, "error", err)
206
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
207
Depth: depth,
208
Uri: node.uri,
···
266
return string(parts)
267
}
268
269
+
type threadTree struct {
270
+
parent *threadTree
271
+
children []*threadTree
272
+
273
+
val *models.Post
274
+
275
+
missing bool
276
+
277
+
uri string
278
+
cid string
279
+
}
280
+
281
+
func buildThreadTree(ctx context.Context, hydrator *hydration.Hydrator, db *gorm.DB, posts []*models.Post) (map[uint]*threadTree, error) {
282
+
nodes := make(map[uint]*threadTree)
283
+
for _, p := range posts {
284
+
puri, err := hydrator.UriForPost(ctx, p)
285
+
if err != nil {
286
+
return nil, err
287
+
}
288
+
289
+
t := &threadTree{
290
+
val: p,
291
+
uri: puri,
292
+
}
293
+
294
+
nodes[p.ID] = t
295
}
296
+
297
+
missing := make(map[uint]*threadTree)
298
+
for _, node := range nodes {
299
+
if node.val.ReplyTo == 0 {
300
+
continue
301
+
}
302
+
303
+
pnode, ok := nodes[node.val.ReplyTo]
304
+
if !ok {
305
+
pnode = &threadTree{
306
+
missing: true,
307
+
}
308
+
missing[node.val.ReplyTo] = pnode
309
+
310
+
var bspost bsky.FeedPost
311
+
if err := bspost.UnmarshalCBOR(bytes.NewReader(node.val.Raw)); err != nil {
312
+
return nil, err
313
+
}
314
+
315
+
if bspost.Reply == nil || bspost.Reply.Parent == nil {
316
+
return nil, fmt.Errorf("node with parent had no parent in object")
317
+
}
318
+
319
+
pnode.uri = bspost.Reply.Parent.Uri
320
+
pnode.cid = bspost.Reply.Parent.Cid
321
+
322
+
/* Maybe we could force hydrate these?
323
+
hydrator.AddMissingRecord(puri, true)
324
+
*/
325
}
326
+
327
+
pnode.children = append(pnode.children, node)
328
+
node.parent = pnode
329
}
330
+
331
+
for k, v := range missing {
332
+
nodes[k] = v
333
+
}
334
+
335
+
return nodes, nil
336
}