+3
events.go
+3
events.go
···
18
"github.com/jackc/pgx/v5/pgconn"
19
"github.com/jackc/pgx/v5/pgxpool"
20
"gorm.io/gorm"
21
+
22
+
. "github.com/whyrusleeping/konbini/models"
23
)
24
25
type PostgresBackend struct {
···
647
Author: repo.ID,
648
Rkey: rkey,
649
Did: rec.Did,
650
+
Raw: recb,
651
}).Error; err != nil {
652
return err
653
}
+2
handlers.go
+2
handlers.go
+13
-2
hydration/hydrator.go
+13
-2
hydration/hydrator.go
···
10
db *gorm.DB
11
dir identity.Directory
12
13
-
missingActorCallback func(string)
14
-
missingPostCallback func(string)
15
}
16
17
// NewHydrator creates a new Hydrator
···
29
func (h *Hydrator) addMissingActor(did string) {
30
if h.missingActorCallback != nil {
31
h.missingActorCallback(did)
32
}
33
}
34
···
10
db *gorm.DB
11
dir identity.Directory
12
13
+
missingActorCallback func(string)
14
+
missingPostCallback func(string)
15
+
missingFeedGeneratorCallback func(string)
16
}
17
18
// NewHydrator creates a new Hydrator
···
30
func (h *Hydrator) addMissingActor(did string) {
31
if h.missingActorCallback != nil {
32
h.missingActorCallback(did)
33
+
}
34
+
}
35
+
36
+
func (h *Hydrator) SetMissingFeedGeneratorCallback(fn func(string)) {
37
+
h.missingFeedGeneratorCallback = fn
38
+
}
39
+
40
+
func (h *Hydrator) AddMissingFeedGenerator(uri string) {
41
+
if h.missingFeedGeneratorCallback != nil {
42
+
h.missingFeedGeneratorCallback(uri)
43
}
44
}
45
+29
hydration/utils.go
+29
hydration/utils.go
···
···
1
+
package hydration
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
7
+
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
)
9
+
10
+
func (h *Hydrator) NormalizeUri(ctx context.Context, uri string) (string, error) {
11
+
puri, err := syntax.ParseATURI(uri)
12
+
if err != nil {
13
+
return "", fmt.Errorf("invalid uri: %w", err)
14
+
}
15
+
16
+
var did string
17
+
if !puri.Authority().IsDID() {
18
+
resp, err := h.dir.LookupHandle(ctx, syntax.Handle(puri.Authority().String()))
19
+
if err != nil {
20
+
return "", err
21
+
}
22
+
23
+
did = resp.DID.String()
24
+
} else {
25
+
did = puri.Authority().String()
26
+
}
27
+
28
+
return fmt.Sprintf("at://%s/%s/%s", did, puri.Collection().String(), puri.RecordKey().String()), nil
29
+
}
+10
-5
main.go
+10
-5
main.go
···
30
"github.com/urfave/cli/v2"
31
"github.com/whyrusleeping/konbini/xrpc"
32
"gorm.io/gorm/logger"
33
)
34
35
var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
···
151
client: cc,
152
dir: dir,
153
154
-
missingProfiles: make(chan string, 1024),
155
-
missingPosts: make(chan string, 1024),
156
}
157
fmt.Println("MY DID: ", s.mydid)
158
···
199
200
go s.missingProfileFetcher()
201
go s.missingPostFetcher()
202
203
seqno, err := loadLastSeq(db, "firehose_seq")
204
if err != nil {
···
223
seqLk sync.Mutex
224
lastSeq int64
225
226
-
mpLk sync.Mutex
227
-
missingProfiles chan string
228
-
missingPosts chan string
229
}
230
231
func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
···
30
"github.com/urfave/cli/v2"
31
"github.com/whyrusleeping/konbini/xrpc"
32
"gorm.io/gorm/logger"
33
+
34
+
. "github.com/whyrusleeping/konbini/models"
35
)
36
37
var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
···
153
client: cc,
154
dir: dir,
155
156
+
missingProfiles: make(chan string, 1024),
157
+
missingPosts: make(chan string, 1024),
158
+
missingFeedGenerators: make(chan string, 1024),
159
}
160
fmt.Println("MY DID: ", s.mydid)
161
···
202
203
go s.missingProfileFetcher()
204
go s.missingPostFetcher()
205
+
go s.missingFeedGeneratorFetcher()
206
207
seqno, err := loadLastSeq(db, "firehose_seq")
208
if err != nil {
···
227
seqLk sync.Mutex
228
lastSeq int64
229
230
+
mpLk sync.Mutex
231
+
missingProfiles chan string
232
+
missingPosts chan string
233
+
missingFeedGenerators chan string
234
}
235
236
func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
+64
missing.go
+64
missing.go
···
131
132
return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
133
}
134
+
135
+
func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
136
+
slog.Info("adding missing feed generator to fetch queue", "uri", uri)
137
+
select {
138
+
case s.missingFeedGenerators <- uri:
139
+
case <-ctx.Done():
140
+
}
141
+
}
142
+
143
+
func (s *Server) missingFeedGeneratorFetcher() {
144
+
for uri := range s.missingFeedGenerators {
145
+
if err := s.fetchMissingFeedGenerator(context.TODO(), uri); err != nil {
146
+
log.Warn("failed to fetch missing feed generator", "uri", uri, "error", err)
147
+
}
148
+
}
149
+
}
150
+
151
+
func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
152
+
// Parse AT URI: at://did:plc:xxx/app.bsky.feed.generator/rkey
153
+
puri, err := syntax.ParseATURI(uri)
154
+
if err != nil {
155
+
return fmt.Errorf("invalid AT URI: %s", uri)
156
+
}
157
+
158
+
did := puri.Authority().String()
159
+
collection := puri.Collection().String()
160
+
rkey := puri.RecordKey().String()
161
+
162
+
repo, err := s.backend.getOrCreateRepo(ctx, did)
163
+
if err != nil {
164
+
return err
165
+
}
166
+
167
+
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
168
+
if err != nil {
169
+
return err
170
+
}
171
+
172
+
c := &xrpclib.Client{
173
+
Host: resp.PDSEndpoint(),
174
+
}
175
+
176
+
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
177
+
if err != nil {
178
+
return err
179
+
}
180
+
181
+
feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
182
+
if !ok {
183
+
return fmt.Errorf("record we got back wasn't a feed generator somehow")
184
+
}
185
+
186
+
buf := new(bytes.Buffer)
187
+
if err := feedGen.MarshalCBOR(buf); err != nil {
188
+
return err
189
+
}
190
+
191
+
cc, err := cid.Decode(*rec.Cid)
192
+
if err != nil {
193
+
return err
194
+
}
195
+
196
+
return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
197
+
}
+1
-1
models.go
models/models.go
+1
-1
models.go
models/models.go
+6
pgbackend.go
+6
pgbackend.go
···
17
"gorm.io/gorm"
18
"gorm.io/gorm/clause"
19
"gorm.io/gorm/logger"
20
+
21
+
. "github.com/whyrusleeping/konbini/models"
22
)
23
24
func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) {
···
406
func (b *PostgresBackend) TrackMissingActor(did string) {
407
b.s.addMissingProfile(context.TODO(), did)
408
}
409
+
410
+
func (b *PostgresBackend) TrackMissingFeedGenerator(uri string) {
411
+
b.s.addMissingFeedGenerator(context.TODO(), uri)
412
+
}
+2
seqno.go
+2
seqno.go
+136
-5
views/feed.go
+136
-5
views/feed.go
···
1
package views
2
3
import (
4
"github.com/bluesky-social/indigo/api/bsky"
5
"github.com/bluesky-social/indigo/lex/util"
6
"github.com/whyrusleeping/konbini/hydration"
···
40
}
41
}
42
43
-
// TODO: Add embed handling - need to convert embed types to proper views
44
-
// if post.Post.Embed != nil {
45
-
// view.Embed = formatEmbed(post.Post.Embed)
46
-
// }
47
48
return view
49
}
···
56
}
57
58
// ThreadViewPost builds a thread view post (app.bsky.feed.defs#threadViewPost)
59
-
func ThreadViewPost(post *hydration.PostInfo, author *hydration.ActorInfo, parent, replies interface{}) *bsky.FeedDefs_ThreadViewPost {
60
view := &bsky.FeedDefs_ThreadViewPost{
61
LexiconTypeID: "app.bsky.feed.defs#threadViewPost",
62
Post: PostView(post, author),
···
67
68
return view
69
}
···
1
package views
2
3
import (
4
+
"fmt"
5
+
6
"github.com/bluesky-social/indigo/api/bsky"
7
"github.com/bluesky-social/indigo/lex/util"
8
"github.com/whyrusleeping/konbini/hydration"
···
42
}
43
}
44
45
+
// Add embed handling
46
+
if post.Post.Embed != nil {
47
+
view.Embed = formatEmbed(post.Post.Embed, post.Author)
48
+
}
49
50
return view
51
}
···
58
}
59
60
// ThreadViewPost builds a thread view post (app.bsky.feed.defs#threadViewPost)
61
+
func ThreadViewPost(post *hydration.PostInfo, author *hydration.ActorInfo, parent, replies any) *bsky.FeedDefs_ThreadViewPost {
62
view := &bsky.FeedDefs_ThreadViewPost{
63
LexiconTypeID: "app.bsky.feed.defs#threadViewPost",
64
Post: PostView(post, author),
···
69
70
return view
71
}
72
+
73
+
func formatEmbed(embed *bsky.FeedPost_Embed, authorDID string) *bsky.FeedDefs_PostView_Embed {
74
+
if embed == nil {
75
+
return nil
76
+
}
77
+
78
+
result := &bsky.FeedDefs_PostView_Embed{}
79
+
80
+
// Handle images
81
+
if embed.EmbedImages != nil {
82
+
viewImages := make([]*bsky.EmbedImages_ViewImage, len(embed.EmbedImages.Images))
83
+
for i, img := range embed.EmbedImages.Images {
84
+
// Convert blob to CDN URLs
85
+
fullsize := ""
86
+
thumb := ""
87
+
if img.Image != nil {
88
+
// CDN URL format for feed images
89
+
cid := img.Image.Ref.String()
90
+
fullsize = fmt.Sprintf("https://cdn.bsky.app/img/feed_fullsize/plain/%s/%s@jpeg", authorDID, cid)
91
+
thumb = fmt.Sprintf("https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s@jpeg", authorDID, cid)
92
+
}
93
+
94
+
viewImages[i] = &bsky.EmbedImages_ViewImage{
95
+
Alt: img.Alt,
96
+
AspectRatio: img.AspectRatio,
97
+
Fullsize: fullsize,
98
+
Thumb: thumb,
99
+
}
100
+
}
101
+
result.EmbedImages_View = &bsky.EmbedImages_View{
102
+
LexiconTypeID: "app.bsky.embed.images#view",
103
+
Images: viewImages,
104
+
}
105
+
return result
106
+
}
107
+
108
+
// Handle external links
109
+
if embed.EmbedExternal != nil && embed.EmbedExternal.External != nil {
110
+
// Convert blob thumb to CDN URL if present
111
+
var thumbURL *string
112
+
if embed.EmbedExternal.External.Thumb != nil {
113
+
// CDN URL for external link thumbnails
114
+
cid := embed.EmbedExternal.External.Thumb.Ref.String()
115
+
url := fmt.Sprintf("https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s@jpeg", authorDID, cid)
116
+
thumbURL = &url
117
+
}
118
+
119
+
result.EmbedExternal_View = &bsky.EmbedExternal_View{
120
+
LexiconTypeID: "app.bsky.embed.external#view",
121
+
External: &bsky.EmbedExternal_ViewExternal{
122
+
Uri: embed.EmbedExternal.External.Uri,
123
+
Title: embed.EmbedExternal.External.Title,
124
+
Description: embed.EmbedExternal.External.Description,
125
+
Thumb: thumbURL,
126
+
},
127
+
}
128
+
return result
129
+
}
130
+
131
+
// Handle video
132
+
if embed.EmbedVideo != nil {
133
+
// TODO: Implement video embed view
134
+
// This would require converting video blob to CDN URLs and playlist URLs
135
+
return nil
136
+
}
137
+
138
+
// Handle record (quote posts, etc.)
139
+
if embed.EmbedRecord != nil {
140
+
// TODO: Implement record embed view
141
+
// This requires hydrating the embedded record, which is complex
142
+
// For now, return nil to skip these embeds
143
+
return nil
144
+
}
145
+
146
+
// Handle record with media (quote post with images/external)
147
+
if embed.EmbedRecordWithMedia != nil {
148
+
// TODO: Implement record with media embed view
149
+
// This combines record hydration with media conversion
150
+
return nil
151
+
}
152
+
153
+
return nil
154
+
}
155
+
156
+
// GeneratorView builds a feed generator view (app.bsky.feed.defs#generatorView)
157
+
func GeneratorView(uri, cid string, record *bsky.FeedGenerator, creator *hydration.ActorInfo, likeCount int64, viewerLike string, indexedAt string) *bsky.FeedDefs_GeneratorView {
158
+
view := &bsky.FeedDefs_GeneratorView{
159
+
LexiconTypeID: "app.bsky.feed.defs#generatorView",
160
+
Uri: uri,
161
+
Cid: cid,
162
+
Did: record.Did,
163
+
Creator: ProfileView(creator),
164
+
DisplayName: record.DisplayName,
165
+
Description: record.Description,
166
+
IndexedAt: indexedAt,
167
+
}
168
+
169
+
// Add optional fields
170
+
if record.Avatar != nil {
171
+
avatarURL := fmt.Sprintf("https://cdn.bsky.app/img/avatar/plain/%s/%s@jpeg", creator.DID, record.Avatar.Ref.String())
172
+
view.Avatar = &avatarURL
173
+
}
174
+
175
+
if record.DescriptionFacets != nil && len(record.DescriptionFacets) > 0 {
176
+
view.DescriptionFacets = record.DescriptionFacets
177
+
}
178
+
179
+
if record.AcceptsInteractions != nil {
180
+
view.AcceptsInteractions = record.AcceptsInteractions
181
+
}
182
+
183
+
if record.ContentMode != nil {
184
+
view.ContentMode = record.ContentMode
185
+
}
186
+
187
+
// Add like count if present
188
+
if likeCount > 0 {
189
+
view.LikeCount = &likeCount
190
+
}
191
+
192
+
// Add viewer state if viewer has liked
193
+
if viewerLike != "" {
194
+
view.Viewer = &bsky.FeedDefs_GeneratorViewerState{
195
+
Like: &viewerLike,
196
+
}
197
+
}
198
+
199
+
return view
200
+
}
+2
-15
xrpc/actor/getProfiles.go
+2
-15
xrpc/actor/getProfiles.go
···
3
import (
4
"net/http"
5
6
"github.com/labstack/echo/v4"
7
"github.com/whyrusleeping/konbini/hydration"
8
"github.com/whyrusleeping/konbini/views"
···
28
ctx := c.Request().Context()
29
30
// Resolve all actors to DIDs and hydrate profiles
31
-
profiles := make([]interface{}, 0)
32
for _, actor := range actors {
33
// Resolve actor to DID
34
did, err := hydrator.ResolveDID(ctx, actor)
···
43
// Skip actors that can't be hydrated
44
continue
45
}
46
-
47
-
// Get counts for the profile
48
-
type counts struct {
49
-
Followers int
50
-
Follows int
51
-
Posts int
52
-
}
53
-
var c counts
54
-
db.Raw(`
55
-
SELECT
56
-
(SELECT COUNT(*) FROM follows WHERE subject = (SELECT id FROM repos WHERE did = ?)) as followers,
57
-
(SELECT COUNT(*) FROM follows WHERE author = (SELECT id FROM repos WHERE did = ?)) as follows,
58
-
(SELECT COUNT(*) FROM posts WHERE author = (SELECT id FROM repos WHERE did = ?)) as posts
59
-
`, did, did, did).Scan(&c)
60
61
profiles = append(profiles, views.ProfileViewDetailed(actorInfo))
62
}
···
3
import (
4
"net/http"
5
6
+
"github.com/bluesky-social/indigo/api/bsky"
7
"github.com/labstack/echo/v4"
8
"github.com/whyrusleeping/konbini/hydration"
9
"github.com/whyrusleeping/konbini/views"
···
29
ctx := c.Request().Context()
30
31
// Resolve all actors to DIDs and hydrate profiles
32
+
profiles := make([]*bsky.ActorDefs_ProfileViewDetailed, 0, len(actors))
33
for _, actor := range actors {
34
// Resolve actor to DID
35
did, err := hydrator.ResolveDID(ctx, actor)
···
44
// Skip actors that can't be hydrated
45
continue
46
}
47
48
profiles = append(profiles, views.ProfileViewDetailed(actorInfo))
49
}
+59
-25
xrpc/feed/getAuthorFeed.go
+59
-25
xrpc/feed/getAuthorFeed.go
···
1
package feed
2
3
import (
4
"net/http"
5
"strconv"
6
"time"
7
8
"github.com/labstack/echo/v4"
9
"github.com/whyrusleeping/konbini/hydration"
10
"github.com/whyrusleeping/konbini/views"
11
"gorm.io/gorm"
12
)
13
14
// HandleGetAuthorFeed implements app.bsky.feed.getAuthorFeed
15
func HandleGetAuthorFeed(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
16
actorParam := c.QueryParam("actor")
17
if actorParam == "" {
18
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
19
"error": "InvalidRequest",
20
"message": "actor parameter is required",
21
})
···
49
// Resolve actor to DID
50
did, err := hydrator.ResolveDID(ctx, actorParam)
51
if err != nil {
52
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
53
"error": "ActorNotFound",
54
"message": "actor not found",
55
})
···
87
`
88
}
89
90
-
type postRow struct {
91
-
URI string
92
-
AuthorID uint
93
-
}
94
var rows []postRow
95
if err := db.Raw(query, did, cursor, limit).Scan(&rows).Error; err != nil {
96
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
97
"error": "InternalError",
98
"message": "failed to query author feed",
99
})
100
}
101
102
-
// Hydrate posts
103
-
feed := make([]interface{}, 0)
104
-
for _, row := range rows {
105
-
postInfo, err := hydrator.HydratePost(ctx, row.URI, viewer)
106
-
if err != nil {
107
-
continue
108
-
}
109
-
110
-
// Hydrate author
111
-
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
112
-
if err != nil {
113
-
continue
114
-
}
115
-
116
-
feedItem := views.FeedViewPost(postInfo, authorInfo)
117
-
feed = append(feed, feedItem)
118
-
}
119
120
// Generate next cursor
121
var nextCursor string
···
130
}
131
}
132
133
-
return c.JSON(http.StatusOK, map[string]interface{}{
134
"feed": feed,
135
"cursor": nextCursor,
136
})
137
}
···
1
package feed
2
3
import (
4
+
"context"
5
+
"log/slog"
6
"net/http"
7
"strconv"
8
+
"sync"
9
"time"
10
11
+
"github.com/bluesky-social/indigo/api/bsky"
12
"github.com/labstack/echo/v4"
13
"github.com/whyrusleeping/konbini/hydration"
14
"github.com/whyrusleeping/konbini/views"
15
"gorm.io/gorm"
16
)
17
18
+
type postRow struct {
19
+
URI string
20
+
AuthorID uint
21
+
}
22
+
23
// HandleGetAuthorFeed implements app.bsky.feed.getAuthorFeed
24
func HandleGetAuthorFeed(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
25
actorParam := c.QueryParam("actor")
26
if actorParam == "" {
27
+
return c.JSON(http.StatusBadRequest, map[string]any{
28
"error": "InvalidRequest",
29
"message": "actor parameter is required",
30
})
···
58
// Resolve actor to DID
59
did, err := hydrator.ResolveDID(ctx, actorParam)
60
if err != nil {
61
+
return c.JSON(http.StatusBadRequest, map[string]any{
62
"error": "ActorNotFound",
63
"message": "actor not found",
64
})
···
96
`
97
}
98
99
var rows []postRow
100
if err := db.Raw(query, did, cursor, limit).Scan(&rows).Error; err != nil {
101
+
return c.JSON(http.StatusInternalServerError, map[string]any{
102
"error": "InternalError",
103
"message": "failed to query author feed",
104
})
105
}
106
107
+
feed := hydratePostRows(ctx, hydrator, viewer, rows)
108
109
// Generate next cursor
110
var nextCursor string
···
119
}
120
}
121
122
+
return c.JSON(http.StatusOK, map[string]any{
123
"feed": feed,
124
"cursor": nextCursor,
125
})
126
}
127
+
128
+
func hydratePostRows(ctx context.Context, hydrator *hydration.Hydrator, viewer string, rows []postRow) []*bsky.FeedDefs_FeedViewPost {
129
+
// Hydrate posts
130
+
var wg sync.WaitGroup
131
+
132
+
var outLk sync.Mutex
133
+
feed := make([]*bsky.FeedDefs_FeedViewPost, len(rows))
134
+
for i, row := range rows {
135
+
wg.Add(1)
136
+
go func(i int, row postRow) {
137
+
defer wg.Done()
138
+
139
+
postInfo, err := hydrator.HydratePost(ctx, row.URI, viewer)
140
+
if err != nil {
141
+
slog.Error("failed to hydrate post", "uri", row.URI, "error", err)
142
+
return
143
+
}
144
+
145
+
// Hydrate author
146
+
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
147
+
if err != nil {
148
+
slog.Error("failed to hydrate actor", "actor", postInfo.Author, "error", err)
149
+
return
150
+
}
151
+
152
+
feedItem := views.FeedViewPost(postInfo, authorInfo)
153
+
outLk.Lock()
154
+
feed[i] = feedItem
155
+
outLk.Unlock()
156
+
}(i, row)
157
+
}
158
+
wg.Wait()
159
+
160
+
x := 0
161
+
for i := 0; i < len(feed); i++ {
162
+
if feed[i] != nil {
163
+
feed[x] = feed[i]
164
+
x++
165
+
continue
166
+
}
167
+
}
168
+
feed = feed[:x]
169
+
170
+
return feed
171
+
}
+158
xrpc/feed/getFeed.go
+158
xrpc/feed/getFeed.go
···
···
1
+
package feed
2
+
3
+
import (
4
+
"bytes"
5
+
"log/slog"
6
+
"net/http"
7
+
"strconv"
8
+
9
+
"github.com/bluesky-social/indigo/api/bsky"
10
+
"github.com/bluesky-social/indigo/atproto/identity"
11
+
"github.com/bluesky-social/indigo/atproto/syntax"
12
+
"github.com/bluesky-social/indigo/xrpc"
13
+
"github.com/labstack/echo/v4"
14
+
"github.com/whyrusleeping/konbini/hydration"
15
+
"github.com/whyrusleeping/konbini/views"
16
+
"github.com/whyrusleeping/market/models"
17
+
"gorm.io/gorm"
18
+
)
19
+
20
+
// HandleGetFeed implements app.bsky.feed.getFeed
21
+
// Gets posts from a custom feed generator
22
+
func HandleGetFeed(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator, dir identity.Directory) error {
23
+
// Parse parameters
24
+
feedURI := c.QueryParam("feed")
25
+
if feedURI == "" {
26
+
return c.JSON(http.StatusBadRequest, map[string]interface{}{
27
+
"error": "InvalidRequest",
28
+
"message": "feed parameter is required",
29
+
})
30
+
}
31
+
32
+
// Parse limit
33
+
limit := int64(50)
34
+
if limitParam := c.QueryParam("limit"); limitParam != "" {
35
+
if l, err := strconv.ParseInt(limitParam, 10, 64); err == nil && l > 0 && l <= 100 {
36
+
limit = l
37
+
}
38
+
}
39
+
40
+
// Parse cursor
41
+
cursor := c.QueryParam("cursor")
42
+
43
+
ctx := c.Request().Context()
44
+
viewer := getUserDID(c)
45
+
46
+
// Extract feed generator DID and rkey from URI
47
+
// URI format: at://did:plc:xxx/app.bsky.feed.generator/rkey
48
+
did := extractDIDFromURI(feedURI)
49
+
rkey := extractRkeyFromURI(feedURI)
50
+
51
+
if did == "" || rkey == "" {
52
+
return c.JSON(http.StatusBadRequest, map[string]interface{}{
53
+
"error": "InvalidRequest",
54
+
"message": "invalid feed URI format",
55
+
})
56
+
}
57
+
58
+
// Check if feed generator exists in database
59
+
var feedGen models.FeedGenerator
60
+
if err := db.Raw(`
61
+
SELECT * FROM feed_generators fg WHERE fg.author = (select id from repos where did = ?) AND fg.rkey = ?
62
+
`, did, rkey).Scan(&feedGen).Error; err != nil {
63
+
return err
64
+
}
65
+
66
+
if feedGen.ID == 0 {
67
+
hydrator.AddMissingFeedGenerator(feedURI)
68
+
return c.JSON(http.StatusNotFound, map[string]interface{}{
69
+
"error": "NotFound",
70
+
"message": "feed generator not found",
71
+
})
72
+
}
73
+
74
+
// Decode the feed generator record to get the service DID
75
+
var feedGenRecord bsky.FeedGenerator
76
+
if err := feedGenRecord.UnmarshalCBOR(bytes.NewReader(feedGen.Raw)); err != nil {
77
+
slog.Error("failed to decode feed generator record", "error", err)
78
+
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
79
+
"error": "InternalError",
80
+
"message": "failed to decode feed generator record",
81
+
})
82
+
}
83
+
84
+
// Parse the service DID
85
+
serviceDID, err := syntax.ParseDID(feedGenRecord.Did)
86
+
if err != nil {
87
+
slog.Error("invalid service DID in feed generator", "error", err, "did", feedGenRecord.Did)
88
+
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
89
+
"error": "InternalError",
90
+
"message": "invalid service DID",
91
+
})
92
+
}
93
+
94
+
// Resolve the service DID to get its endpoint
95
+
serviceIdent, err := dir.LookupDID(ctx, serviceDID)
96
+
if err != nil {
97
+
slog.Error("failed to resolve service DID", "error", err, "did", serviceDID)
98
+
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
99
+
"error": "InternalError",
100
+
"message": "failed to resolve service endpoint",
101
+
})
102
+
}
103
+
104
+
serviceEndpoint := serviceIdent.GetServiceEndpoint("bsky_fg")
105
+
if serviceEndpoint == "" {
106
+
slog.Error("service has no bsky_fg endpoint", "did", serviceDID)
107
+
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
108
+
"error": "InternalError",
109
+
"message": "service has no endpoint",
110
+
})
111
+
}
112
+
113
+
// Create XRPC client for the feed generator service
114
+
client := &xrpc.Client{
115
+
Host: serviceEndpoint,
116
+
}
117
+
118
+
// Call getFeedSkeleton on the service
119
+
skeleton, err := bsky.FeedGetFeedSkeleton(ctx, client, cursor, feedURI, limit)
120
+
if err != nil {
121
+
slog.Error("failed to call getFeedSkeleton", "error", err, "service", serviceEndpoint)
122
+
// Return empty feed on error rather than failing completely
123
+
return c.JSON(http.StatusOK, &bsky.FeedGetFeed_Output{
124
+
Feed: make([]*bsky.FeedDefs_FeedViewPost, 0),
125
+
})
126
+
}
127
+
128
+
// Hydrate the posts from the skeleton
129
+
posts := make([]*bsky.FeedDefs_FeedViewPost, 0, len(skeleton.Feed))
130
+
for _, skeletonPost := range skeleton.Feed {
131
+
postURI, err := syntax.ParseATURI(skeletonPost.Post)
132
+
if err != nil {
133
+
slog.Warn("invalid post URI in skeleton", "uri", skeletonPost.Post, "error", err)
134
+
continue
135
+
}
136
+
137
+
postInfo, err := hydrator.HydratePost(ctx, string(postURI), viewer)
138
+
if err != nil {
139
+
slog.Warn("failed to hydrate post", "uri", postURI, "error", err)
140
+
continue
141
+
}
142
+
143
+
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
144
+
if err != nil {
145
+
slog.Warn("failed to hydrate author", "did", postInfo.Author, "error", err)
146
+
continue
147
+
}
148
+
149
+
posts = append(posts, views.FeedViewPost(postInfo, authorInfo))
150
+
}
151
+
152
+
output := &bsky.FeedGetFeed_Output{
153
+
Feed: posts,
154
+
Cursor: skeleton.Cursor,
155
+
}
156
+
157
+
return c.JSON(http.StatusOK, output)
158
+
}
+161
xrpc/feed/getFeedGenerator.go
+161
xrpc/feed/getFeedGenerator.go
···
···
1
+
package feed
2
+
3
+
import (
4
+
"bytes"
5
+
"log/slog"
6
+
"net/http"
7
+
"time"
8
+
9
+
"github.com/bluesky-social/indigo/api/bsky"
10
+
"github.com/bluesky-social/indigo/atproto/identity"
11
+
"github.com/bluesky-social/indigo/atproto/syntax"
12
+
cid "github.com/ipfs/go-cid"
13
+
"github.com/labstack/echo/v4"
14
+
mh "github.com/multiformats/go-multihash"
15
+
"github.com/whyrusleeping/konbini/hydration"
16
+
"github.com/whyrusleeping/konbini/views"
17
+
"gorm.io/gorm"
18
+
)
19
+
20
+
// HandleGetFeedGenerator implements app.bsky.feed.getFeedGenerator
21
+
func HandleGetFeedGenerator(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator, dir identity.Directory) error {
22
+
ctx := c.Request().Context()
23
+
24
+
// Parse parameters
25
+
feedURI := c.QueryParam("feed")
26
+
if feedURI == "" {
27
+
return c.JSON(http.StatusBadRequest, map[string]any{
28
+
"error": "InvalidRequest",
29
+
"message": "feed parameter is required",
30
+
})
31
+
}
32
+
33
+
nu, err := hydrator.NormalizeUri(ctx, feedURI)
34
+
if err != nil {
35
+
return err
36
+
}
37
+
feedURI = nu
38
+
39
+
viewer := getUserDID(c)
40
+
_ = viewer
41
+
42
+
// Extract feed generator DID and rkey from URI
43
+
did := extractDIDFromURI(feedURI)
44
+
rkey := extractRkeyFromURI(feedURI)
45
+
46
+
if did == "" || rkey == "" {
47
+
return c.JSON(http.StatusBadRequest, map[string]any{
48
+
"error": "InvalidRequest",
49
+
"message": "invalid feed URI format",
50
+
})
51
+
}
52
+
53
+
// Query feed generator from database
54
+
type feedGenRow struct {
55
+
ID uint
56
+
Did string
57
+
Raw []byte
58
+
AuthorDid string
59
+
Indexed time.Time
60
+
}
61
+
var feedGen feedGenRow
62
+
err = db.Raw(`
63
+
SELECT fg.id, fg.did, fg.raw, r.did as author_did, indexed
64
+
FROM feed_generators fg
65
+
JOIN repos r ON r.id = fg.author
66
+
WHERE r.did = ? AND fg.rkey = ?
67
+
`, did, rkey).Scan(&feedGen).Error
68
+
69
+
if err != nil || feedGen.ID == 0 {
70
+
// Track this missing feed generator for fetching
71
+
hydrator.AddMissingFeedGenerator(feedURI)
72
+
73
+
return c.JSON(http.StatusNotFound, map[string]any{
74
+
"error": "NotFound",
75
+
"message": "feed generator not found",
76
+
})
77
+
}
78
+
79
+
// Decode the feed generator record
80
+
var feedGenRecord bsky.FeedGenerator
81
+
if err := feedGenRecord.UnmarshalCBOR(bytes.NewReader(feedGen.Raw)); err != nil {
82
+
slog.Error("failed to decode feed generator record", "error", err)
83
+
return c.JSON(http.StatusInternalServerError, map[string]any{
84
+
"error": "InternalError",
85
+
"message": "failed to decode feed generator record",
86
+
})
87
+
}
88
+
89
+
// Compute CID from raw bytes
90
+
hash, err := mh.Sum(feedGen.Raw, mh.SHA2_256, -1)
91
+
if err != nil {
92
+
slog.Error("failed to hash record", "error", err)
93
+
return c.JSON(http.StatusInternalServerError, map[string]any{
94
+
"error": "InternalError",
95
+
"message": "failed to compute CID",
96
+
})
97
+
}
98
+
recordCid := cid.NewCidV1(cid.DagCBOR, hash).String()
99
+
100
+
// Hydrate the creator
101
+
creatorInfo, err := hydrator.HydrateActor(ctx, feedGen.AuthorDid)
102
+
if err != nil {
103
+
slog.Error("failed to hydrate creator", "error", err, "did", feedGen.AuthorDid)
104
+
return c.JSON(http.StatusInternalServerError, map[string]any{
105
+
"error": "InternalError",
106
+
"message": "failed to hydrate creator",
107
+
})
108
+
}
109
+
110
+
// Count likes for this feed generator
111
+
var likeCount int64
112
+
113
+
// Check if viewer has liked this feed generator
114
+
viewerLike := ""
115
+
116
+
// Validate the service DID (check if it's resolvable)
117
+
serviceDID, err := syntax.ParseDID(feedGenRecord.Did)
118
+
if err != nil {
119
+
slog.Error("invalid service DID in feed generator", "error", err, "did", feedGenRecord.Did)
120
+
return c.JSON(http.StatusInternalServerError, map[string]any{
121
+
"error": "InternalError",
122
+
"message": "invalid service DID",
123
+
})
124
+
}
125
+
126
+
// Try to resolve the service DID to check if it's online/valid
127
+
isOnline := true
128
+
isValid := true
129
+
serviceIdent, err := dir.LookupDID(ctx, serviceDID)
130
+
if err != nil {
131
+
slog.Warn("failed to resolve service DID", "error", err, "did", serviceDID)
132
+
isOnline = false
133
+
isValid = false
134
+
} else {
135
+
// Check if service has an endpoint
136
+
serviceEndpoint := serviceIdent.PDSEndpoint()
137
+
if serviceEndpoint == "" {
138
+
slog.Warn("service has no PDS endpoint", "did", serviceDID)
139
+
isValid = false
140
+
}
141
+
}
142
+
143
+
// Build the generator view
144
+
generatorView := views.GeneratorView(
145
+
feedURI,
146
+
recordCid,
147
+
&feedGenRecord,
148
+
creatorInfo,
149
+
likeCount,
150
+
viewerLike,
151
+
feedGen.Indexed.Format(time.RFC3339),
152
+
)
153
+
154
+
output := &bsky.FeedGetFeedGenerator_Output{
155
+
View: generatorView,
156
+
IsOnline: isOnline,
157
+
IsValid: isValid,
158
+
}
159
+
160
+
return c.JSON(http.StatusOK, output)
161
+
}
+5
-27
xrpc/feed/getTimeline.go
+5
-27
xrpc/feed/getTimeline.go
···
1
package feed
2
3
import (
4
-
"log/slog"
5
"net/http"
6
"strconv"
7
"time"
8
9
"github.com/labstack/echo/v4"
10
"github.com/whyrusleeping/konbini/hydration"
11
-
"github.com/whyrusleeping/konbini/views"
12
"gorm.io/gorm"
13
)
14
···
16
func HandleGetTimeline(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
17
viewer := getUserDID(c)
18
if viewer == "" {
19
-
return c.JSON(http.StatusUnauthorized, map[string]interface{}{
20
"error": "AuthenticationRequired",
21
"message": "authentication required",
22
})
···
43
// Get viewer's repo ID
44
var viewerRepoID uint
45
if err := db.Raw("SELECT id FROM repos WHERE did = ?", viewer).Scan(&viewerRepoID).Error; err != nil {
46
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
47
"error": "InternalError",
48
"message": "failed to load viewer",
49
})
50
}
51
52
// Query posts from followed users
53
-
type postRow struct {
54
-
URI string
55
-
AuthorID uint
56
-
}
57
var rows []postRow
58
err := db.Raw(`
59
SELECT
···
70
`, viewerRepoID, cursor, limit).Scan(&rows).Error
71
72
if err != nil {
73
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
74
"error": "InternalError",
75
"message": "failed to query timeline",
76
})
77
}
78
79
// Hydrate posts
80
-
feed := make([]interface{}, 0)
81
-
for _, row := range rows {
82
-
postInfo, err := hydrator.HydratePost(ctx, row.URI, viewer)
83
-
if err != nil {
84
-
continue
85
-
}
86
-
87
-
// Hydrate author
88
-
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
89
-
if err != nil {
90
-
slog.Error("failed to hydrate actor", "author", postInfo.Author, "error", err)
91
-
continue
92
-
}
93
-
94
-
feedItem := views.FeedViewPost(postInfo, authorInfo)
95
-
feed = append(feed, feedItem)
96
-
}
97
98
// Generate next cursor
99
var nextCursor string
···
111
}
112
}
113
114
-
return c.JSON(http.StatusOK, map[string]interface{}{
115
"feed": feed,
116
"cursor": nextCursor,
117
})
···
1
package feed
2
3
import (
4
"net/http"
5
"strconv"
6
"time"
7
8
"github.com/labstack/echo/v4"
9
"github.com/whyrusleeping/konbini/hydration"
10
"gorm.io/gorm"
11
)
12
···
14
func HandleGetTimeline(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
15
viewer := getUserDID(c)
16
if viewer == "" {
17
+
return c.JSON(http.StatusUnauthorized, map[string]any{
18
"error": "AuthenticationRequired",
19
"message": "authentication required",
20
})
···
41
// Get viewer's repo ID
42
var viewerRepoID uint
43
if err := db.Raw("SELECT id FROM repos WHERE did = ?", viewer).Scan(&viewerRepoID).Error; err != nil {
44
+
return c.JSON(http.StatusInternalServerError, map[string]any{
45
"error": "InternalError",
46
"message": "failed to load viewer",
47
})
48
}
49
50
// Query posts from followed users
51
var rows []postRow
52
err := db.Raw(`
53
SELECT
···
64
`, viewerRepoID, cursor, limit).Scan(&rows).Error
65
66
if err != nil {
67
+
return c.JSON(http.StatusInternalServerError, map[string]any{
68
"error": "InternalError",
69
"message": "failed to query timeline",
70
})
71
}
72
73
// Hydrate posts
74
+
feed := hydratePostRows(ctx, hydrator, viewer, rows)
75
76
// Generate next cursor
77
var nextCursor string
···
89
}
90
}
91
92
+
return c.JSON(http.StatusOK, map[string]any{
93
"feed": feed,
94
"cursor": nextCursor,
95
})
+150
-20
xrpc/notification/listNotifications.go
+150
-20
xrpc/notification/listNotifications.go
···
1
package notification
2
3
import (
4
"net/http"
5
"strconv"
6
7
"github.com/labstack/echo/v4"
8
"github.com/whyrusleeping/konbini/hydration"
9
"github.com/whyrusleeping/konbini/views"
10
"gorm.io/gorm"
11
)
12
···
84
}
85
86
// Hydrate notifications
87
-
notifications := make([]interface{}, 0)
88
for _, row := range rows {
89
authorInfo, err := hydrator.HydrateActor(ctx, row.AuthorDid)
90
if err != nil {
91
continue
92
}
93
94
-
notif := map[string]interface{}{
95
-
"uri": row.Source,
96
-
"author": views.ProfileView(authorInfo),
97
-
"reason": mapNotifKind(row.Kind),
98
-
"record": nil, // Could hydrate the source record here
99
-
"isRead": false,
100
-
"indexedAt": row.CreatedAt,
101
-
"labels": []interface{}{},
102
}
103
104
-
// Only include CID if we have one (required field)
105
-
if row.SourceCid != "" {
106
-
notif["cid"] = row.SourceCid
107
-
} else {
108
-
// Skip notifications without CIDs as they're invalid
109
continue
110
}
111
112
notifications = append(notifications, notif)
113
}
114
115
// Generate next cursor
116
-
var nextCursor string
117
if len(rows) > 0 {
118
-
nextCursor = strconv.FormatUint(uint64(rows[len(rows)-1].ID), 10)
119
}
120
121
-
return c.JSON(http.StatusOK, map[string]interface{}{
122
-
"notifications": notifications,
123
-
"cursor": nextCursor,
124
-
})
125
}
126
127
// HandleGetUnreadCount implements app.bsky.notification.getUnreadCount
···
175
return "repost"
176
case "mention":
177
return "mention"
178
default:
179
return kind
180
}
181
}
···
1
package notification
2
3
import (
4
+
"bytes"
5
+
"fmt"
6
"net/http"
7
"strconv"
8
+
"time"
9
10
+
"github.com/bluesky-social/indigo/api/atproto"
11
+
"github.com/bluesky-social/indigo/api/bsky"
12
+
"github.com/bluesky-social/indigo/lex/util"
13
+
lexutil "github.com/bluesky-social/indigo/lex/util"
14
"github.com/labstack/echo/v4"
15
"github.com/whyrusleeping/konbini/hydration"
16
"github.com/whyrusleeping/konbini/views"
17
+
"github.com/whyrusleeping/market/models"
18
"gorm.io/gorm"
19
)
20
···
92
}
93
94
// Hydrate notifications
95
+
notifications := make([]*bsky.NotificationListNotifications_Notification, 0)
96
for _, row := range rows {
97
authorInfo, err := hydrator.HydrateActor(ctx, row.AuthorDid)
98
if err != nil {
99
continue
100
}
101
102
+
// Skip notifications without CIDs as they're invalid
103
+
if row.SourceCid == "" {
104
+
continue
105
}
106
107
+
// Fetch and decode the raw record
108
+
recordDecoder, err := fetchNotificationRecord(db, row.Source, row.Kind)
109
+
if err != nil {
110
continue
111
}
112
113
+
notif := &bsky.NotificationListNotifications_Notification{
114
+
Uri: row.Source,
115
+
Cid: row.SourceCid,
116
+
Author: views.ProfileView(authorInfo),
117
+
Reason: mapNotifKind(row.Kind),
118
+
Record: recordDecoder,
119
+
IsRead: false,
120
+
IndexedAt: row.CreatedAt,
121
+
}
122
+
123
notifications = append(notifications, notif)
124
}
125
126
// Generate next cursor
127
+
var cursorPtr *string
128
if len(rows) > 0 {
129
+
cursor := strconv.FormatUint(uint64(rows[len(rows)-1].ID), 10)
130
+
cursorPtr = &cursor
131
+
}
132
+
133
+
output := &bsky.NotificationListNotifications_Output{
134
+
Notifications: notifications,
135
+
Cursor: cursorPtr,
136
}
137
138
+
return c.JSON(http.StatusOK, output)
139
}
140
141
// HandleGetUnreadCount implements app.bsky.notification.getUnreadCount
···
189
return "repost"
190
case "mention":
191
return "mention"
192
+
case "follow":
193
+
return "follow"
194
default:
195
return kind
196
}
197
}
198
+
199
+
// fetchNotificationRecord fetches and decodes the raw record for a notification
200
+
func fetchNotificationRecord(db *gorm.DB, sourceURI string, kind string) (*util.LexiconTypeDecoder, error) {
201
+
// Parse the source URI to extract DID and rkey
202
+
// URI format: at://did:plc:xxx/collection/rkey
203
+
did := extractDIDFromURI(sourceURI)
204
+
rkey := extractRkeyFromURI(sourceURI)
205
+
206
+
if did == "" || rkey == "" {
207
+
return nil, fmt.Errorf("invalid source URI")
208
+
}
209
+
210
+
var raw []byte
211
+
var err error
212
+
213
+
// Fetch raw data based on notification kind
214
+
switch kind {
215
+
case "reply", "mention", "quote":
216
+
// These reference posts
217
+
err = db.Raw(`
218
+
SELECT p.raw
219
+
FROM posts p
220
+
JOIN repos r ON r.id = p.author
221
+
WHERE r.did = ? AND p.rkey = ?
222
+
`, did, rkey).Scan(&raw).Error
223
+
224
+
case "like":
225
+
// we don't store the raw like objects, so we just reconstruct it here...
226
+
// These reference like records
227
+
var like models.Like
228
+
err = db.Raw(`
229
+
SELECT *
230
+
FROM likes l
231
+
JOIN repos r ON r.id = l.author
232
+
WHERE r.did = ? AND l.rkey = ?
233
+
`, did, rkey).Scan(&like).Error
234
+
235
+
lk := bsky.FeedLike{
236
+
CreatedAt: like.Created.Format(time.RFC3339),
237
+
Subject: &atproto.RepoStrongRef{
238
+
Cid: "",
239
+
Uri: "",
240
+
},
241
+
}
242
+
buf := new(bytes.Buffer)
243
+
if err := lk.MarshalCBOR(buf); err != nil {
244
+
return nil, fmt.Errorf("failed to marshal reconstructed like: %w", err)
245
+
}
246
+
raw = buf.Bytes()
247
+
248
+
case "repost":
249
+
// These reference repost records
250
+
err = db.Raw(`
251
+
SELECT r.raw
252
+
FROM reposts r
253
+
JOIN repos repo ON repo.id = r.author
254
+
WHERE repo.did = ? AND r.rkey = ?
255
+
`, did, rkey).Scan(&raw).Error
256
+
257
+
case "follow":
258
+
// These reference follow records
259
+
err = db.Raw(`
260
+
SELECT f.raw
261
+
FROM follows f
262
+
JOIN repos r ON r.id = f.author
263
+
WHERE r.did = ? AND f.rkey = ?
264
+
`, did, rkey).Scan(&raw).Error
265
+
266
+
default:
267
+
return nil, fmt.Errorf("unknown notification kind: %s", kind)
268
+
}
269
+
270
+
if err != nil || len(raw) == 0 {
271
+
return nil, fmt.Errorf("failed to fetch record: %w", err)
272
+
}
273
+
274
+
// Decode the CBOR data
275
+
decoded, err := lexutil.CborDecodeValue(raw)
276
+
if err != nil {
277
+
return nil, fmt.Errorf("failed to decode CBOR: %w", err)
278
+
}
279
+
280
+
return &util.LexiconTypeDecoder{
281
+
Val: decoded,
282
+
}, nil
283
+
}
284
+
285
+
func extractDIDFromURI(uri string) string {
286
+
// URI format: at://did:plc:xxx/collection/rkey
287
+
if len(uri) < 5 || uri[:5] != "at://" {
288
+
return ""
289
+
}
290
+
parts := []rune(uri[5:])
291
+
for i, r := range parts {
292
+
if r == '/' {
293
+
return string(parts[:i])
294
+
}
295
+
}
296
+
return string(parts)
297
+
}
298
+
299
+
func extractRkeyFromURI(uri string) string {
300
+
// URI format: at://did:plc:xxx/collection/rkey
301
+
if len(uri) < 5 || uri[:5] != "at://" {
302
+
return ""
303
+
}
304
+
// Find last slash
305
+
for i := len(uri) - 1; i >= 5; i-- {
306
+
if uri[i] == '/' {
307
+
return uri[i+1:]
308
+
}
309
+
}
310
+
return ""
311
+
}
+11
xrpc/server.go
+11
xrpc/server.go
···
32
// Add methods as needed for data access
33
34
TrackMissingActor(did string)
35
}
36
37
// NewServer creates a new XRPC server
···
60
}
61
62
s.hydrator.SetMissingActorCallback(backend.TrackMissingActor)
63
64
// Register XRPC endpoints
65
s.registerEndpoints()
···
124
xrpcGroup.GET("/app.bsky.feed.getActorLikes", func(c echo.Context) error {
125
return feed.HandleGetActorLikes(c, s.db, s.hydrator)
126
}, s.requireAuth)
127
128
// app.bsky.graph.*
129
xrpcGroup.GET("/app.bsky.graph.getFollows", func(c echo.Context) error {
···
166
})
167
xrpcGroup.GET("/app.bsky.unspecced.getTrendingTopics", func(c echo.Context) error {
168
return unspecced.HandleGetTrendingTopics(c)
169
})
170
}
171
···
32
// Add methods as needed for data access
33
34
TrackMissingActor(did string)
35
+
TrackMissingFeedGenerator(uri string)
36
}
37
38
// NewServer creates a new XRPC server
···
61
}
62
63
s.hydrator.SetMissingActorCallback(backend.TrackMissingActor)
64
+
s.hydrator.SetMissingFeedGeneratorCallback(backend.TrackMissingFeedGenerator)
65
66
// Register XRPC endpoints
67
s.registerEndpoints()
···
126
xrpcGroup.GET("/app.bsky.feed.getActorLikes", func(c echo.Context) error {
127
return feed.HandleGetActorLikes(c, s.db, s.hydrator)
128
}, s.requireAuth)
129
+
xrpcGroup.GET("/app.bsky.feed.getFeed", func(c echo.Context) error {
130
+
return feed.HandleGetFeed(c, s.db, s.hydrator, s.dir)
131
+
})
132
+
xrpcGroup.GET("/app.bsky.feed.getFeedGenerator", func(c echo.Context) error {
133
+
return feed.HandleGetFeedGenerator(c, s.db, s.hydrator, s.dir)
134
+
})
135
136
// app.bsky.graph.*
137
xrpcGroup.GET("/app.bsky.graph.getFollows", func(c echo.Context) error {
···
174
})
175
xrpcGroup.GET("/app.bsky.unspecced.getTrendingTopics", func(c echo.Context) error {
176
return unspecced.HandleGetTrendingTopics(c)
177
+
})
178
+
xrpcGroup.GET("/app.bsky.unspecced.getPostThreadV2", func(c echo.Context) error {
179
+
return unspecced.HandleGetPostThreadV2(c, s.db, s.hydrator)
180
})
181
}
182
+333
xrpc/unspecced/getPostThreadV2.go
+333
xrpc/unspecced/getPostThreadV2.go
···
···
1
+
package unspecced
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"log/slog"
7
+
"net/http"
8
+
"strconv"
9
+
10
+
"github.com/bluesky-social/indigo/api/bsky"
11
+
"github.com/labstack/echo/v4"
12
+
"github.com/whyrusleeping/konbini/hydration"
13
+
"github.com/whyrusleeping/konbini/views"
14
+
"gorm.io/gorm"
15
+
)
16
+
17
+
// HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2
18
+
func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
19
+
ctx := c.Request().Context()
20
+
21
+
// Parse parameters
22
+
anchorRaw := c.QueryParam("anchor")
23
+
if anchorRaw == "" {
24
+
return c.JSON(http.StatusBadRequest, map[string]interface{}{
25
+
"error": "InvalidRequest",
26
+
"message": "anchor parameter is required",
27
+
})
28
+
}
29
+
30
+
anchorUri, err := hydrator.NormalizeUri(ctx, anchorRaw)
31
+
if err != nil {
32
+
return err
33
+
}
34
+
35
+
// Parse optional parameters with defaults
36
+
above := c.QueryParam("above") != "false" // default true
37
+
38
+
below := int64(6) // default
39
+
if belowParam := c.QueryParam("below"); belowParam != "" {
40
+
if b, err := strconv.ParseInt(belowParam, 10, 64); err == nil && b >= 0 && b <= 20 {
41
+
below = b
42
+
}
43
+
}
44
+
45
+
branchingFactor := int64(10) // default
46
+
if bfParam := c.QueryParam("branchingFactor"); bfParam != "" {
47
+
if bf, err := strconv.ParseInt(bfParam, 10, 64); err == nil && bf > 0 {
48
+
branchingFactor = bf
49
+
}
50
+
}
51
+
52
+
_ = c.QueryParam("prioritizeFollowedUsers") == "true" // TODO: implement prioritization
53
+
54
+
sort := c.QueryParam("sort")
55
+
if sort == "" {
56
+
sort = "newest"
57
+
}
58
+
59
+
viewer := getUserDID(c)
60
+
61
+
// Hydrate the anchor post
62
+
anchorPostInfo, err := hydrator.HydratePost(ctx, anchorUri, viewer)
63
+
if err != nil {
64
+
slog.Error("failed to hydrate post", "error", err, "anchor", anchorUri)
65
+
return c.JSON(http.StatusNotFound, map[string]interface{}{
66
+
"error": "NotFound",
67
+
"message": "anchor post not found",
68
+
})
69
+
}
70
+
71
+
// Determine the root post ID for the thread
72
+
rootPostID := anchorPostInfo.InThread
73
+
if rootPostID == 0 {
74
+
// This post is the root - get its ID
75
+
var postID uint
76
+
db.Raw(`
77
+
SELECT id FROM posts
78
+
WHERE author = (SELECT id FROM repos WHERE did = ?)
79
+
AND rkey = ?
80
+
`, extractDIDFromURI(anchorUri), extractRkeyFromURI(anchorUri)).Scan(&postID)
81
+
rootPostID = postID
82
+
}
83
+
84
+
// Query all posts in this thread
85
+
type threadPostRow struct {
86
+
ID uint
87
+
Rkey string
88
+
ReplyTo uint
89
+
InThread uint
90
+
AuthorDid string
91
+
}
92
+
var threadPosts []threadPostRow
93
+
db.Raw(`
94
+
SELECT p.id, p.rkey, p.reply_to, p.in_thread, r.did as author_did
95
+
FROM posts p
96
+
JOIN repos r ON r.id = p.author
97
+
WHERE (p.id = ? OR p.in_thread = ?)
98
+
AND p.not_found = false
99
+
ORDER BY p.created ASC
100
+
`, rootPostID, rootPostID).Scan(&threadPosts)
101
+
102
+
// Build a map of posts by ID
103
+
postsByID := make(map[uint]*threadNode)
104
+
for _, tp := range threadPosts {
105
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", tp.AuthorDid, tp.Rkey)
106
+
postsByID[tp.ID] = &threadNode{
107
+
id: tp.ID,
108
+
uri: uri,
109
+
replyTo: tp.ReplyTo,
110
+
inThread: tp.InThread,
111
+
children: []*threadNode{},
112
+
}
113
+
}
114
+
115
+
// Build parent-child relationships
116
+
for _, node := range postsByID {
117
+
if node.replyTo != 0 {
118
+
parent := postsByID[node.replyTo]
119
+
if parent != nil {
120
+
parent.children = append(parent.children, node)
121
+
}
122
+
}
123
+
}
124
+
125
+
// Find the anchor node
126
+
anchorID := uint(0)
127
+
for id, node := range postsByID {
128
+
if node.uri == anchorUri {
129
+
anchorID = id
130
+
break
131
+
}
132
+
}
133
+
134
+
if anchorID == 0 {
135
+
return c.JSON(http.StatusNotFound, map[string]interface{}{
136
+
"error": "NotFound",
137
+
"message": "anchor post not found in thread",
138
+
})
139
+
}
140
+
141
+
anchorNode := postsByID[anchorID]
142
+
143
+
// Build flat thread items list
144
+
var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem
145
+
hasOtherReplies := false
146
+
147
+
// Add parents if requested
148
+
if above {
149
+
parents := collectParents(anchorNode, postsByID)
150
+
for i := len(parents) - 1; i >= 0; i-- {
151
+
depth := int64(-(len(parents) - i))
152
+
item := buildThreadItem(ctx, hydrator, parents[i], depth, viewer)
153
+
if item != nil {
154
+
threadItems = append(threadItems, item)
155
+
}
156
+
}
157
+
}
158
+
159
+
// Add anchor post (depth 0)
160
+
anchorItem := buildThreadItem(ctx, hydrator, anchorNode, 0, viewer)
161
+
if anchorItem != nil {
162
+
threadItems = append(threadItems, anchorItem)
163
+
}
164
+
165
+
// Add replies below anchor
166
+
if below > 0 {
167
+
replies, hasMore := collectReplies(ctx, hydrator, anchorNode, 1, below, branchingFactor, sort, viewer)
168
+
threadItems = append(threadItems, replies...)
169
+
hasOtherReplies = hasMore
170
+
}
171
+
172
+
return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{
173
+
Thread: threadItems,
174
+
HasOtherReplies: hasOtherReplies,
175
+
})
176
+
}
177
+
178
+
type threadNode struct {
179
+
id uint
180
+
uri string
181
+
replyTo uint
182
+
inThread uint
183
+
children []*threadNode
184
+
}
185
+
186
+
func collectParents(node *threadNode, allNodes map[uint]*threadNode) []*threadNode {
187
+
var parents []*threadNode
188
+
current := node
189
+
for current.replyTo != 0 {
190
+
parent := allNodes[current.replyTo]
191
+
if parent == nil {
192
+
break
193
+
}
194
+
parents = append(parents, parent)
195
+
current = parent
196
+
}
197
+
return parents
198
+
}
199
+
200
+
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, currentDepth, maxDepth, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, bool) {
201
+
var items []*bsky.UnspeccedGetPostThreadV2_ThreadItem
202
+
hasMore := false
203
+
204
+
if currentDepth > maxDepth {
205
+
return items, false
206
+
}
207
+
208
+
// Sort children based on sort parameter
209
+
children := node.children
210
+
// TODO: Actually sort based on the sort parameter (newest/oldest/top)
211
+
// For now, just use the order we have
212
+
213
+
// Limit to branchingFactor
214
+
limit := int(branchingFactor)
215
+
if len(children) > limit {
216
+
hasMore = true
217
+
children = children[:limit]
218
+
}
219
+
220
+
for _, child := range children {
221
+
item := buildThreadItem(ctx, hydrator, child, currentDepth, viewer)
222
+
if item != nil {
223
+
items = append(items, item)
224
+
225
+
// Recursively collect replies
226
+
if currentDepth < maxDepth {
227
+
childReplies, childHasMore := collectReplies(ctx, hydrator, child, currentDepth+1, maxDepth, branchingFactor, sort, viewer)
228
+
items = append(items, childReplies...)
229
+
if childHasMore {
230
+
hasMore = true
231
+
}
232
+
}
233
+
}
234
+
}
235
+
236
+
return items, hasMore
237
+
}
238
+
239
+
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
240
+
// Hydrate the post
241
+
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
242
+
if err != nil {
243
+
// Return not found item
244
+
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
245
+
Depth: depth,
246
+
Uri: node.uri,
247
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
248
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
249
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
250
+
},
251
+
},
252
+
}
253
+
}
254
+
255
+
// Hydrate author
256
+
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
257
+
if err != nil {
258
+
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
259
+
Depth: depth,
260
+
Uri: node.uri,
261
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
262
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
263
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
264
+
},
265
+
},
266
+
}
267
+
}
268
+
269
+
// Build post view
270
+
postView := views.PostView(postInfo, authorInfo)
271
+
272
+
// Calculate moreReplies count
273
+
moreReplies := int64(0)
274
+
if len(node.children) > 0 {
275
+
// This is a simplified calculation - actual count would need more complex logic
276
+
moreReplies = int64(len(node.children))
277
+
}
278
+
279
+
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
280
+
Depth: depth,
281
+
Uri: node.uri,
282
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
283
+
UnspeccedDefs_ThreadItemPost: &bsky.UnspeccedDefs_ThreadItemPost{
284
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemPost",
285
+
Post: postView,
286
+
HiddenByThreadgate: false,
287
+
MoreParents: false,
288
+
MoreReplies: moreReplies,
289
+
MutedByViewer: false,
290
+
OpThread: false, // TODO: Calculate this properly
291
+
},
292
+
},
293
+
}
294
+
}
295
+
296
+
func getUserDID(c echo.Context) string {
297
+
did := c.Get("viewer")
298
+
if did == nil {
299
+
return ""
300
+
}
301
+
if s, ok := did.(string); ok {
302
+
return s
303
+
}
304
+
return ""
305
+
}
306
+
307
+
func extractDIDFromURI(uri string) string {
308
+
// URI format: at://did:plc:xxx/collection/rkey
309
+
if len(uri) < 5 || uri[:5] != "at://" {
310
+
return ""
311
+
}
312
+
parts := []rune(uri[5:])
313
+
for i, r := range parts {
314
+
if r == '/' {
315
+
return string(parts[:i])
316
+
}
317
+
}
318
+
return string(parts)
319
+
}
320
+
321
+
func extractRkeyFromURI(uri string) string {
322
+
// URI format: at://did:plc:xxx/collection/rkey
323
+
if len(uri) < 5 || uri[:5] != "at://" {
324
+
return ""
325
+
}
326
+
// Find last slash
327
+
for i := len(uri) - 1; i >= 5; i-- {
328
+
if uri[i] == '/' {
329
+
return uri[i+1:]
330
+
}
331
+
}
332
+
return ""
333
+
}