A locally focused bluesky appview
at master 4.5 kB view raw
1package backend 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log/slog" 8 9 "github.com/bluesky-social/indigo/api/atproto" 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 xrpclib "github.com/bluesky-social/indigo/xrpc" 13 "github.com/ipfs/go-cid" 14) 15 16type MissingRecordType string 17 18const ( 19 MissingRecordTypeProfile MissingRecordType = "profile" 20 MissingRecordTypePost MissingRecordType = "post" 21 MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator" 22 MissingRecordTypeUnknown MissingRecordType = "unknown" 23) 24 25type MissingRecord struct { 26 Type MissingRecordType 27 Identifier string // DID for profiles, AT-URI for posts/feedgens 28 Wait bool 29 30 waitch chan struct{} 31} 32 33func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) { 34 if rec.Wait { 35 rec.waitch = make(chan struct{}) 36 } 37 38 select { 39 case b.missingRecords <- rec: 40 case <-ctx.Done(): 41 } 42 43 if rec.Wait { 44 select { 45 case <-rec.waitch: 46 case <-ctx.Done(): 47 } 48 } 49} 50 51func (b *PostgresBackend) missingRecordFetcher() { 52 for rec := range b.missingRecords { 53 var err error 54 switch rec.Type { 55 case MissingRecordTypeProfile: 56 err = b.fetchMissingProfile(context.TODO(), rec.Identifier) 57 case MissingRecordTypePost: 58 err = b.fetchMissingPost(context.TODO(), rec.Identifier) 59 case MissingRecordTypeFeedGenerator: 60 err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier) 61 default: 62 slog.Error("unknown missing record type", "type", rec.Type) 63 continue 64 } 65 66 if err != nil { 67 slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err) 68 } 69 70 if rec.Wait { 71 close(rec.waitch) 72 } 73 } 74} 75 76func (b *PostgresBackend) fetchMissingProfile(ctx context.Context, did string) error { 77 b.AddRelevantDid(did) 78 79 repo, err := b.GetOrCreateRepo(ctx, did) 80 if err != nil { 81 return err 82 } 83 84 resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 85 if err != nil { 86 return err 87 } 88 89 c := &xrpclib.Client{ 90 Host: resp.PDSEndpoint(), 91 } 92 93 rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self") 94 if err != nil { 95 return err 96 } 97 98 prof, ok := rec.Value.Val.(*bsky.ActorProfile) 99 if !ok { 100 return fmt.Errorf("record we got back wasnt a profile somehow") 101 } 102 103 buf := new(bytes.Buffer) 104 if err := prof.MarshalCBOR(buf); err != nil { 105 return err 106 } 107 108 cc, err := cid.Decode(*rec.Cid) 109 if err != nil { 110 return err 111 } 112 113 return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc) 114} 115 116func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error { 117 puri, err := syntax.ParseATURI(uri) 118 if err != nil { 119 return fmt.Errorf("invalid AT URI: %s", uri) 120 } 121 122 did := puri.Authority().String() 123 collection := puri.Collection().String() 124 rkey := puri.RecordKey().String() 125 126 b.AddRelevantDid(did) 127 128 repo, err := b.GetOrCreateRepo(ctx, did) 129 if err != nil { 130 return err 131 } 132 133 resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 134 if err != nil { 135 return err 136 } 137 138 c := &xrpclib.Client{ 139 Host: resp.PDSEndpoint(), 140 } 141 142 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 143 if err != nil { 144 return err 145 } 146 147 post, ok := rec.Value.Val.(*bsky.FeedPost) 148 if !ok { 149 return fmt.Errorf("record we got back wasn't a post somehow") 150 } 151 152 buf := new(bytes.Buffer) 153 if err := post.MarshalCBOR(buf); err != nil { 154 return err 155 } 156 157 cc, err := cid.Decode(*rec.Cid) 158 if err != nil { 159 return err 160 } 161 162 return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc) 163} 164 165func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error { 166 puri, err := syntax.ParseATURI(uri) 167 if err != nil { 168 return fmt.Errorf("invalid AT URI: %s", uri) 169 } 170 171 did := puri.Authority().String() 172 collection := puri.Collection().String() 173 rkey := puri.RecordKey().String() 174 b.AddRelevantDid(did) 175 176 repo, err := b.GetOrCreateRepo(ctx, did) 177 if err != nil { 178 return err 179 } 180 181 resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 182 if err != nil { 183 return err 184 } 185 186 c := &xrpclib.Client{ 187 Host: resp.PDSEndpoint(), 188 } 189 190 rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 191 if err != nil { 192 return err 193 } 194 195 feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator) 196 if !ok { 197 return fmt.Errorf("record we got back wasn't a feed generator somehow") 198 } 199 200 buf := new(bytes.Buffer) 201 if err := feedGen.MarshalCBOR(buf); err != nil { 202 return err 203 } 204 205 cc, err := cid.Decode(*rec.Cid) 206 if err != nil { 207 return err 208 } 209 210 return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc) 211}