A locally focused bluesky appview
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 392be17462b02d89a1ca0aa713f436caa5046525 234 lines 5.2 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log/slog" 8 9 "github.com/bluesky-social/indigo/api/atproto" 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 xrpclib "github.com/bluesky-social/indigo/xrpc" 13 "github.com/ipfs/go-cid" 14) 15 16type MissingRecordType string 17 18const ( 19 MissingRecordTypeProfile MissingRecordType = "profile" 20 MissingRecordTypePost MissingRecordType = "post" 21 MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator" 22) 23 24type MissingRecord struct { 25 Type MissingRecordType 26 Identifier string // DID for profiles, AT-URI for posts/feedgens 27 Wait bool 28 29 waitch chan struct{} 30} 31 32func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) { 33 if rec.Wait { 34 rec.waitch = make(chan struct{}) 35 } 36 37 select { 38 case s.missingRecords <- rec: 39 case <-ctx.Done(): 40 } 41 42 if rec.Wait { 43 select { 44 case <-rec.waitch: 45 case <-ctx.Done(): 46 } 47 } 48} 49 50// Legacy methods for backward compatibility 51func (s *Server) addMissingProfile(ctx context.Context, did string) { 52 s.addMissingRecord(ctx, MissingRecord{ 53 Type: MissingRecordTypeProfile, 54 Identifier: did, 55 }) 56} 57 58func (s *Server) addMissingPost(ctx context.Context, uri string) { 59 slog.Info("adding missing post to fetch queue", "uri", uri) 60 s.addMissingRecord(ctx, MissingRecord{ 61 Type: MissingRecordTypePost, 62 Identifier: uri, 63 }) 64} 65 66func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) { 67 slog.Info("adding missing feed generator to fetch queue", "uri", uri) 68 s.addMissingRecord(ctx, MissingRecord{ 69 Type: MissingRecordTypeFeedGenerator, 70 Identifier: uri, 71 }) 72} 73 74func (s *Server) missingRecordFetcher() { 75 for rec := range s.missingRecords { 76 var err error 77 switch rec.Type { 78 case MissingRecordTypeProfile: 79 err = s.fetchMissingProfile(context.TODO(), rec.Identifier) 80 case MissingRecordTypePost: 81 err = s.fetchMissingPost(context.TODO(), rec.Identifier) 82 case MissingRecordTypeFeedGenerator: 83 err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier) 84 default: 85 slog.Error("unknown missing record type", "type", rec.Type) 86 continue 87 } 88 89 if err != nil { 90 slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err) 91 } 92 93 if rec.Wait { 94 close(rec.waitch) 95 } 96 } 97} 98 99func (s *Server) fetchMissingProfile(ctx context.Context, did string) error { 100 s.backend.addRelevantDid(did) 101 102 repo, err := s.backend.getOrCreateRepo(ctx, did) 103 if err != nil { 104 return err 105 } 106 107 resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 108 if err != nil { 109 return err 110 } 111 112 c := &xrpclib.Client{ 113 Host: resp.PDSEndpoint(), 114 } 115 116 rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self") 117 if err != nil { 118 return err 119 } 120 121 prof, ok := rec.Value.Val.(*bsky.ActorProfile) 122 if !ok { 123 return fmt.Errorf("record we got back wasnt a profile somehow") 124 } 125 126 buf := new(bytes.Buffer) 127 if err := prof.MarshalCBOR(buf); err != nil { 128 return err 129 } 130 131 cc, err := cid.Decode(*rec.Cid) 132 if err != nil { 133 return err 134 } 135 136 return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc) 137} 138 139func (s *Server) fetchMissingPost(ctx context.Context, uri string) error { 140 puri, err := syntax.ParseATURI(uri) 141 if err != nil { 142 return fmt.Errorf("invalid AT URI: %s", uri) 143 } 144 145 did := puri.Authority().String() 146 collection := puri.Collection().String() 147 rkey := puri.RecordKey().String() 148 149 s.backend.addRelevantDid(did) 150 151 repo, err := s.backend.getOrCreateRepo(ctx, did) 152 if err != nil { 153 return err 154 } 155 156 resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 157 if err != nil { 158 return err 159 } 160 161 c := &xrpclib.Client{ 162 Host: resp.PDSEndpoint(), 163 } 164 165 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 166 if err != nil { 167 return err 168 } 169 170 post, ok := rec.Value.Val.(*bsky.FeedPost) 171 if !ok { 172 return fmt.Errorf("record we got back wasn't a post somehow") 173 } 174 175 buf := new(bytes.Buffer) 176 if err := post.MarshalCBOR(buf); err != nil { 177 return err 178 } 179 180 cc, err := cid.Decode(*rec.Cid) 181 if err != nil { 182 return err 183 } 184 185 return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc) 186} 187 188func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error { 189 puri, err := syntax.ParseATURI(uri) 190 if err != nil { 191 return fmt.Errorf("invalid AT URI: %s", uri) 192 } 193 194 did := puri.Authority().String() 195 collection := puri.Collection().String() 196 rkey := puri.RecordKey().String() 197 s.backend.addRelevantDid(did) 198 199 repo, err := s.backend.getOrCreateRepo(ctx, did) 200 if err != nil { 201 return err 202 } 203 204 resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 205 if err != nil { 206 return err 207 } 208 209 c := &xrpclib.Client{ 210 Host: resp.PDSEndpoint(), 211 } 212 213 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 214 if err != nil { 215 return err 216 } 217 218 feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator) 219 if !ok { 220 return fmt.Errorf("record we got back wasn't a feed generator somehow") 221 } 222 223 buf := new(bytes.Buffer) 224 if err := feedGen.MarshalCBOR(buf); err != nil { 225 return err 226 } 227 228 cc, err := cid.Decode(*rec.Cid) 229 if err != nil { 230 return err 231 } 232 233 return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc) 234}