A locally focused bluesky appview
1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8
9 "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 xrpclib "github.com/bluesky-social/indigo/xrpc"
13 "github.com/ipfs/go-cid"
14)
15
16type MissingRecordType string
17
18const (
19 MissingRecordTypeProfile MissingRecordType = "profile"
20 MissingRecordTypePost MissingRecordType = "post"
21 MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22)
23
24type MissingRecord struct {
25 Type MissingRecordType
26 Identifier string // DID for profiles, AT-URI for posts/feedgens
27 Wait bool
28
29 waitch chan struct{}
30}
31
32func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) {
33 if rec.Wait {
34 rec.waitch = make(chan struct{})
35 }
36
37 select {
38 case s.missingRecords <- rec:
39 case <-ctx.Done():
40 }
41
42 if rec.Wait {
43 select {
44 case <-rec.waitch:
45 case <-ctx.Done():
46 }
47 }
48}
49
50// Legacy methods for backward compatibility
51func (s *Server) addMissingProfile(ctx context.Context, did string) {
52 s.addMissingRecord(ctx, MissingRecord{
53 Type: MissingRecordTypeProfile,
54 Identifier: did,
55 })
56}
57
58func (s *Server) addMissingPost(ctx context.Context, uri string) {
59 slog.Info("adding missing post to fetch queue", "uri", uri)
60 s.addMissingRecord(ctx, MissingRecord{
61 Type: MissingRecordTypePost,
62 Identifier: uri,
63 })
64}
65
66func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
67 slog.Info("adding missing feed generator to fetch queue", "uri", uri)
68 s.addMissingRecord(ctx, MissingRecord{
69 Type: MissingRecordTypeFeedGenerator,
70 Identifier: uri,
71 })
72}
73
74func (s *Server) missingRecordFetcher() {
75 for rec := range s.missingRecords {
76 var err error
77 switch rec.Type {
78 case MissingRecordTypeProfile:
79 err = s.fetchMissingProfile(context.TODO(), rec.Identifier)
80 case MissingRecordTypePost:
81 err = s.fetchMissingPost(context.TODO(), rec.Identifier)
82 case MissingRecordTypeFeedGenerator:
83 err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
84 default:
85 slog.Error("unknown missing record type", "type", rec.Type)
86 continue
87 }
88
89 if err != nil {
90 slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
91 }
92
93 if rec.Wait {
94 close(rec.waitch)
95 }
96 }
97}
98
99func (s *Server) fetchMissingProfile(ctx context.Context, did string) error {
100 s.backend.addRelevantDid(did)
101
102 repo, err := s.backend.getOrCreateRepo(ctx, did)
103 if err != nil {
104 return err
105 }
106
107 resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
108 if err != nil {
109 return err
110 }
111
112 c := &xrpclib.Client{
113 Host: resp.PDSEndpoint(),
114 }
115
116 rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self")
117 if err != nil {
118 return err
119 }
120
121 prof, ok := rec.Value.Val.(*bsky.ActorProfile)
122 if !ok {
123 return fmt.Errorf("record we got back wasnt a profile somehow")
124 }
125
126 buf := new(bytes.Buffer)
127 if err := prof.MarshalCBOR(buf); err != nil {
128 return err
129 }
130
131 cc, err := cid.Decode(*rec.Cid)
132 if err != nil {
133 return err
134 }
135
136 return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
137}
138
139func (s *Server) fetchMissingPost(ctx context.Context, uri string) error {
140 puri, err := syntax.ParseATURI(uri)
141 if err != nil {
142 return fmt.Errorf("invalid AT URI: %s", uri)
143 }
144
145 did := puri.Authority().String()
146 collection := puri.Collection().String()
147 rkey := puri.RecordKey().String()
148
149 s.backend.addRelevantDid(did)
150
151 repo, err := s.backend.getOrCreateRepo(ctx, did)
152 if err != nil {
153 return err
154 }
155
156 resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
157 if err != nil {
158 return err
159 }
160
161 c := &xrpclib.Client{
162 Host: resp.PDSEndpoint(),
163 }
164
165 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
166 if err != nil {
167 return err
168 }
169
170 post, ok := rec.Value.Val.(*bsky.FeedPost)
171 if !ok {
172 return fmt.Errorf("record we got back wasn't a post somehow")
173 }
174
175 buf := new(bytes.Buffer)
176 if err := post.MarshalCBOR(buf); err != nil {
177 return err
178 }
179
180 cc, err := cid.Decode(*rec.Cid)
181 if err != nil {
182 return err
183 }
184
185 return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
186}
187
188func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
189 puri, err := syntax.ParseATURI(uri)
190 if err != nil {
191 return fmt.Errorf("invalid AT URI: %s", uri)
192 }
193
194 did := puri.Authority().String()
195 collection := puri.Collection().String()
196 rkey := puri.RecordKey().String()
197 s.backend.addRelevantDid(did)
198
199 repo, err := s.backend.getOrCreateRepo(ctx, did)
200 if err != nil {
201 return err
202 }
203
204 resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
205 if err != nil {
206 return err
207 }
208
209 c := &xrpclib.Client{
210 Host: resp.PDSEndpoint(),
211 }
212
213 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
214 if err != nil {
215 return err
216 }
217
218 feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
219 if !ok {
220 return fmt.Errorf("record we got back wasn't a feed generator somehow")
221 }
222
223 buf := new(bytes.Buffer)
224 if err := feedGen.MarshalCBOR(buf); err != nil {
225 return err
226 }
227
228 cc, err := cid.Decode(*rec.Cid)
229 if err != nil {
230 return err
231 }
232
233 return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
234}