Live video on the AT Protocol
at natb/command-errors 257 lines 8.3 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 _ "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/identity" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/repo" 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/ipfs/go-cid" 17 "go.opentelemetry.io/otel" 18 "stream.place/streamplace/pkg/aqhttp" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/model" 21) 22 23var SyncGetRepo = comatproto.SyncGetRepo 24 25func (atsync *ATProtoSynchronizer) SyncBlueskyRepoCached(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) { 26 ctx, span := otel.Tracer("signer").Start(ctx, "SyncBlueskyRepoCached") 27 defer span.End() 28 repo, err := mod.GetRepoByHandleOrDID(handle) 29 if err != nil { 30 return nil, fmt.Errorf("failed to get repo for %s: %w", handle, err) 31 } 32 if repo != nil { 33 return repo, nil 34 } 35 36 return atsync.SyncBlueskyRepo(ctx, handle, mod) 37} 38 39type mstNode struct { 40 rkey syntax.RecordKey 41 collection syntax.NSID 42} 43 44func (atsync *ATProtoSynchronizer) SyncBlueskyRepo(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) { 45 ident, err := atsync.resolveIdent(ctx, handle, true) 46 if err != nil { 47 return nil, fmt.Errorf("failed to resolve Bluesky handle %s: %w", handle, err) 48 } 49 50 ctx = log.WithLogValues(ctx, "did", ident.DID.String(), "handle", ident.Handle.String()) 51 52 handleLock := handleLocks.GetLock(ident.DID.String()) 53 handleLock.Lock() 54 defer handleLock.Unlock() 55 56 rev := "" 57 oldRepo, err := mod.GetRepo(ident.DID.String()) 58 if err != nil { 59 return nil, fmt.Errorf("failed to get DID record for %s: %w", ident.DID.String(), err) 60 } 61 if oldRepo != nil { 62 log.Log(ctx, "found existing DID record", "did", oldRepo.DID, "version", oldRepo.Version) 63 return oldRepo, nil 64 } else { 65 // create an empty repo while we sync. this is useful because we'll start monitoring the firehose for 66 // any new follows and such from this user while we're syncing, which can take a long time 67 newRepo := model.Repo{ 68 DID: ident.DID.String(), 69 PDS: ident.PDSEndpoint(), 70 Version: "", 71 Handle: ident.Handle.String(), 72 } 73 err = mod.UpdateRepo(&newRepo) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create empty DID record for %s: %w", ident.DID.String(), err) 76 } 77 err = atsync.StatefulDB.AddRepo(ident.DID.String()) 78 if err != nil { 79 return nil, fmt.Errorf("failed to add repo to stateful DB for %s: %w", ident.DID.String(), err) 80 } 81 } 82 83 log.Log(ctx, "resolved bluesky identity", "did", ident.DID, "handle", ident.Handle, "pds", ident.PDSEndpoint()) 84 pdsLock := pdsLocks.GetLock(ident.PDSEndpoint()) 85 xrpcc := xrpc.Client{ 86 Host: ident.PDSEndpoint(), 87 Client: &aqhttp.Client, 88 } 89 if xrpcc.Host == "" { 90 return nil, fmt.Errorf("no PDS endpoint found for Bluesky identity %s", handle) 91 } 92 pdsLock.Lock() 93 repoBytes, err := SyncGetRepo(ctx, &xrpcc, ident.DID.String(), rev) 94 pdsLock.Unlock() 95 if err != nil { 96 return nil, fmt.Errorf("failed to fetch repo for %s from PDS %s: %w", ident.DID.String(), xrpcc.Host, err) 97 } 98 99 // uncomment for saving new test cases: 100 101 // timestamp := time.Now().Unix() 102 // filename := fmt.Sprintf("%d.base64", timestamp) 103 // encodedBytes := base64.URLEncoding.EncodeToString(repoBytes) 104 // err = os.WriteFile(filename, []byte(encodedBytes), 0644) 105 // if err != nil { 106 // return nil, fmt.Errorf("failed to write encoded repo bytes to file: %w", err) 107 // } 108 109 log.Debug(ctx, "got diff", "bytes", len(repoBytes)) 110 111 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repoBytes)) 112 if err != nil { 113 return nil, fmt.Errorf("failed to parse repo CAR data for %s: %w", ident.DID.String(), err) 114 } 115 // extract DID from repo commit 116 sc := r.SignedCommit() 117 signerDID, err := syntax.ParseDID(sc.Did) 118 if err != nil { 119 return nil, fmt.Errorf("invalid DID in repo commit for %s: %w", ident.DID.String(), err) 120 } 121 if signerDID != ident.DID { 122 return nil, fmt.Errorf("signer DID %s does not match identity %s", signerDID, ident.DID.String()) 123 } 124 125 err = r.ForEach(ctx, "", func(k string, v cid.Cid) error { 126 nsid, rkey, err := syntax.ParseRepoPath(k) 127 if err != nil { 128 log.Warn(ctx, "failed to parse repo path", "k", k, "err", err) 129 return fmt.Errorf("could not parse repo path %s: %w", k, err) 130 } 131 _, bs, err := r.GetRecordBytes(ctx, k) 132 if err != nil { 133 log.Warn(ctx, "failed to get record bytes", "k", k, "rkey", rkey, "err", err) 134 return fmt.Errorf("could not retrieve record bytes for %s (rkey: %s): %w", k, rkey, err) 135 } 136 log.Debug(ctx, "record type", "key", k, "type", nsid.String()) 137 138 err = atsync.handleCreateUpdate(ctx, signerDID.String(), rkey, bs, v.String(), nsid, false, true) 139 if err != nil { 140 log.Warn(ctx, "failed to handle create update", "err", err) 141 // invalid CBOR and stuff should get ignored, so 142 // return fmt.Errorf("failed to process record update for %s (type: %s): %w", k, nsid.String(), err) 143 } 144 return nil 145 }) 146 if err != nil { 147 return nil, fmt.Errorf("failed to iterate over repo: %w", err) 148 } 149 150 newRepo := model.Repo{ 151 DID: ident.DID.String(), 152 PDS: ident.PDSEndpoint(), 153 Version: sc.Rev, 154 Handle: ident.Handle.String(), 155 } 156 err = mod.UpdateRepo(&newRepo) 157 if err != nil { 158 return nil, fmt.Errorf("failed to update DID record for %s: %w", sc.Did, err) 159 } 160 err = atsync.StatefulDB.AddRepo(ident.DID.String()) 161 if err != nil { 162 return nil, fmt.Errorf("failed to add repo to stateful DB for %s: %w", ident.DID.String(), err) 163 } 164 165 return &newRepo, nil 166} 167 168func (atsync *ATProtoSynchronizer) RefreshIdentity(ctx context.Context, did string) (*identity.Identity, error) { 169 id, err := atsync.resolveIdent(ctx, did, false) 170 if err != nil { 171 return nil, fmt.Errorf("failed to resolve ident: %w", err) 172 } 173 newRepo := model.Repo{ 174 DID: id.DID.String(), 175 PDS: id.PDSEndpoint(), 176 Handle: id.Handle.String(), 177 } 178 err = atsync.Model.UpdateRepo(&newRepo) 179 if err != nil { 180 return nil, fmt.Errorf("failed to update repo: %w", err) 181 } 182 return id, nil 183} 184 185func (atsync *ATProtoSynchronizer) resolveIdent(ctx context.Context, arg string, cached bool) (*identity.Identity, error) { 186 if atsync.PLCDirectory == nil { 187 atsync.PLCDirectory = CustomDirectory(atsync.CLI.PLCURL) 188 } 189 if atsync.CachedPLCDirectory == nil { 190 cachedDir := identity.NewCacheDirectory(atsync.PLCDirectory, 250_000, time.Hour*24, time.Minute*2, time.Minute*5) 191 atsync.CachedPLCDirectory = &cachedDir 192 } 193 dir := atsync.PLCDirectory 194 if cached { 195 dir = atsync.CachedPLCDirectory 196 } 197 id, err := syntax.ParseAtIdentifier(arg) 198 if err != nil { 199 return nil, err 200 } 201 202 resolvedID, err := dir.Lookup(ctx, *id) 203 if err != nil { 204 return nil, err 205 } 206 log.Log(ctx, "resolved ident", "id", resolvedID.DID.String(), "handle", resolvedID.Handle.String()) 207 208 return resolvedID, nil 209} 210func CustomDirectory(plcURL string) identity.Directory { 211 base := identity.BaseDirectory{ 212 PLCURL: plcURL, 213 HTTPClient: aqhttp.Client, 214 Resolver: net.Resolver{ 215 Dial: func(ctx context.Context, network, address string) (net.Conn, error) { 216 d := net.Dialer{Timeout: time.Second * 3} 217 return d.DialContext(ctx, network, address) 218 }, 219 }, 220 TryAuthoritativeDNS: true, 221 // primary Bluesky PDS instance only supports HTTP resolution method 222 SkipDNSDomainSuffixes: []string{".bsky.social"}, 223 } 224 return &base 225} 226 227func DIDDoc(host string) map[string]any { 228 return map[string]any{ 229 "@context": []string{ 230 "https://www.w3.org/ns/did/v1", 231 "https://w3id.org/security/multikey/v1", 232 "https://w3id.org/security/suites/secp256k1-2019/v1", 233 }, 234 "id": fmt.Sprintf("did:web:%s", host), 235 "alsoKnownAs": []string{}, 236 "service": []map[string]any{ 237 { 238 "id": "#bsky_fg", 239 "type": "BskyFeedGenerator", 240 "serviceEndpoint": fmt.Sprintf("https://%s", host), 241 }, 242 { 243 "id": "#atproto_pds", 244 "type": "AtprotoPersonalDataServer", 245 "serviceEndpoint": fmt.Sprintf("https://%s", host), 246 }, 247 }, 248 "verificationMethod": []map[string]any{ 249 { 250 "id": fmt.Sprintf("did:web:%s#atproto", host), 251 "type": "Multikey", 252 "controller": fmt.Sprintf("did:web:%s", host), 253 "publicKeyMultibase": LexiconPubMultibase, 254 }, 255 }, 256 } 257}