A locally focused bluesky appview
1package backend
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8
9 "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 xrpclib "github.com/bluesky-social/indigo/xrpc"
13 "github.com/ipfs/go-cid"
14)
15
16type MissingRecordType string
17
18const (
19 MissingRecordTypeProfile MissingRecordType = "profile"
20 MissingRecordTypePost MissingRecordType = "post"
21 MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22 MissingRecordTypeUnknown MissingRecordType = "unknown"
23)
24
25type MissingRecord struct {
26 Type MissingRecordType
27 Identifier string // DID for profiles, AT-URI for posts/feedgens
28 Wait bool
29
30 waitch chan struct{}
31}
32
33func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) {
34 if rec.Wait {
35 rec.waitch = make(chan struct{})
36 }
37
38 select {
39 case b.missingRecords <- rec:
40 case <-ctx.Done():
41 }
42
43 if rec.Wait {
44 select {
45 case <-rec.waitch:
46 case <-ctx.Done():
47 }
48 }
49}
50
51func (b *PostgresBackend) missingRecordFetcher() {
52 for rec := range b.missingRecords {
53 var err error
54 switch rec.Type {
55 case MissingRecordTypeProfile:
56 err = b.fetchMissingProfile(context.TODO(), rec.Identifier)
57 case MissingRecordTypePost:
58 err = b.fetchMissingPost(context.TODO(), rec.Identifier)
59 case MissingRecordTypeFeedGenerator:
60 err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
61 default:
62 slog.Error("unknown missing record type", "type", rec.Type)
63 continue
64 }
65
66 if err != nil {
67 slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
68 }
69
70 if rec.Wait {
71 close(rec.waitch)
72 }
73 }
74}
75
76func (b *PostgresBackend) fetchMissingProfile(ctx context.Context, did string) error {
77 b.AddRelevantDid(did)
78
79 repo, err := b.GetOrCreateRepo(ctx, did)
80 if err != nil {
81 return err
82 }
83
84 resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
85 if err != nil {
86 return err
87 }
88
89 c := &xrpclib.Client{
90 Host: resp.PDSEndpoint(),
91 }
92
93 rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self")
94 if err != nil {
95 return err
96 }
97
98 prof, ok := rec.Value.Val.(*bsky.ActorProfile)
99 if !ok {
100 return fmt.Errorf("record we got back wasnt a profile somehow")
101 }
102
103 buf := new(bytes.Buffer)
104 if err := prof.MarshalCBOR(buf); err != nil {
105 return err
106 }
107
108 cc, err := cid.Decode(*rec.Cid)
109 if err != nil {
110 return err
111 }
112
113 return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
114}
115
116func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error {
117 puri, err := syntax.ParseATURI(uri)
118 if err != nil {
119 return fmt.Errorf("invalid AT URI: %s", uri)
120 }
121
122 did := puri.Authority().String()
123 collection := puri.Collection().String()
124 rkey := puri.RecordKey().String()
125
126 b.AddRelevantDid(did)
127
128 repo, err := b.GetOrCreateRepo(ctx, did)
129 if err != nil {
130 return err
131 }
132
133 resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
134 if err != nil {
135 return err
136 }
137
138 c := &xrpclib.Client{
139 Host: resp.PDSEndpoint(),
140 }
141
142 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
143 if err != nil {
144 return err
145 }
146
147 post, ok := rec.Value.Val.(*bsky.FeedPost)
148 if !ok {
149 return fmt.Errorf("record we got back wasn't a post somehow")
150 }
151
152 buf := new(bytes.Buffer)
153 if err := post.MarshalCBOR(buf); err != nil {
154 return err
155 }
156
157 cc, err := cid.Decode(*rec.Cid)
158 if err != nil {
159 return err
160 }
161
162 return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
163}
164
165func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
166 puri, err := syntax.ParseATURI(uri)
167 if err != nil {
168 return fmt.Errorf("invalid AT URI: %s", uri)
169 }
170
171 did := puri.Authority().String()
172 collection := puri.Collection().String()
173 rkey := puri.RecordKey().String()
174 b.AddRelevantDid(did)
175
176 repo, err := b.GetOrCreateRepo(ctx, did)
177 if err != nil {
178 return err
179 }
180
181 resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
182 if err != nil {
183 return err
184 }
185
186 c := &xrpclib.Client{
187 Host: resp.PDSEndpoint(),
188 }
189
190 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
191 if err != nil {
192 return err
193 }
194
195 feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
196 if !ok {
197 return fmt.Errorf("record we got back wasn't a feed generator somehow")
198 }
199
200 buf := new(bytes.Buffer)
201 if err := feedGen.MarshalCBOR(buf); err != nil {
202 return err
203 }
204
205 cc, err := cid.Decode(*rec.Cid)
206 if err != nil {
207 return err
208 }
209
210 return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
211}