Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 _ "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/identity"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/repo"
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/ipfs/go-cid"
17 "go.opentelemetry.io/otel"
18 "stream.place/streamplace/pkg/aqhttp"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/model"
21)
22
23var SyncGetRepo = comatproto.SyncGetRepo
24
25func (atsync *ATProtoSynchronizer) SyncBlueskyRepoCached(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) {
26 ctx, span := otel.Tracer("signer").Start(ctx, "SyncBlueskyRepoCached")
27 defer span.End()
28 repo, err := mod.GetRepoByHandleOrDID(handle)
29 if err != nil {
30 return nil, fmt.Errorf("failed to get repo for %s: %w", handle, err)
31 }
32 if repo != nil {
33 return repo, nil
34 }
35
36 return atsync.SyncBlueskyRepo(ctx, handle, mod)
37}
38
39type mstNode struct {
40 rkey syntax.RecordKey
41 collection syntax.NSID
42}
43
44func (atsync *ATProtoSynchronizer) SyncBlueskyRepo(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) {
45 ident, err := atsync.resolveIdent(ctx, handle, true)
46 if err != nil {
47 return nil, fmt.Errorf("failed to resolve Bluesky handle %s: %w", handle, err)
48 }
49
50 ctx = log.WithLogValues(ctx, "did", ident.DID.String(), "handle", ident.Handle.String())
51
52 handleLock := handleLocks.GetLock(ident.DID.String())
53 handleLock.Lock()
54 defer handleLock.Unlock()
55
56 rev := ""
57 oldRepo, err := mod.GetRepo(ident.DID.String())
58 if err != nil {
59 return nil, fmt.Errorf("failed to get DID record for %s: %w", ident.DID.String(), err)
60 }
61 if oldRepo != nil {
62 log.Log(ctx, "found existing DID record", "did", oldRepo.DID, "version", oldRepo.Version)
63 return oldRepo, nil
64 } else {
65 // create an empty repo while we sync. this is useful because we'll start monitoring the firehose for
66 // any new follows and such from this user while we're syncing, which can take a long time
67 newRepo := model.Repo{
68 DID: ident.DID.String(),
69 PDS: ident.PDSEndpoint(),
70 Version: "",
71 Handle: ident.Handle.String(),
72 }
73 err = mod.UpdateRepo(&newRepo)
74 if err != nil {
75 return nil, fmt.Errorf("failed to create empty DID record for %s: %w", ident.DID.String(), err)
76 }
77 err = atsync.StatefulDB.AddRepo(ident.DID.String())
78 if err != nil {
79 return nil, fmt.Errorf("failed to add repo to stateful DB for %s: %w", ident.DID.String(), err)
80 }
81 }
82
83 log.Log(ctx, "resolved bluesky identity", "did", ident.DID, "handle", ident.Handle, "pds", ident.PDSEndpoint())
84 pdsLock := pdsLocks.GetLock(ident.PDSEndpoint())
85 xrpcc := xrpc.Client{
86 Host: ident.PDSEndpoint(),
87 Client: &aqhttp.Client,
88 }
89 if xrpcc.Host == "" {
90 return nil, fmt.Errorf("no PDS endpoint found for Bluesky identity %s", handle)
91 }
92 pdsLock.Lock()
93 repoBytes, err := SyncGetRepo(ctx, &xrpcc, ident.DID.String(), rev)
94 pdsLock.Unlock()
95 if err != nil {
96 return nil, fmt.Errorf("failed to fetch repo for %s from PDS %s: %w", ident.DID.String(), xrpcc.Host, err)
97 }
98
99 // uncomment for saving new test cases:
100
101 // timestamp := time.Now().Unix()
102 // filename := fmt.Sprintf("%d.base64", timestamp)
103 // encodedBytes := base64.URLEncoding.EncodeToString(repoBytes)
104 // err = os.WriteFile(filename, []byte(encodedBytes), 0644)
105 // if err != nil {
106 // return nil, fmt.Errorf("failed to write encoded repo bytes to file: %w", err)
107 // }
108
109 log.Debug(ctx, "got diff", "bytes", len(repoBytes))
110
111 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repoBytes))
112 if err != nil {
113 return nil, fmt.Errorf("failed to parse repo CAR data for %s: %w", ident.DID.String(), err)
114 }
115 // extract DID from repo commit
116 sc := r.SignedCommit()
117 signerDID, err := syntax.ParseDID(sc.Did)
118 if err != nil {
119 return nil, fmt.Errorf("invalid DID in repo commit for %s: %w", ident.DID.String(), err)
120 }
121 if signerDID != ident.DID {
122 return nil, fmt.Errorf("signer DID %s does not match identity %s", signerDID, ident.DID.String())
123 }
124
125 err = r.ForEach(ctx, "", func(k string, v cid.Cid) error {
126 nsid, rkey, err := syntax.ParseRepoPath(k)
127 if err != nil {
128 log.Warn(ctx, "failed to parse repo path", "k", k, "err", err)
129 return fmt.Errorf("could not parse repo path %s: %w", k, err)
130 }
131 _, bs, err := r.GetRecordBytes(ctx, k)
132 if err != nil {
133 log.Warn(ctx, "failed to get record bytes", "k", k, "rkey", rkey, "err", err)
134 return fmt.Errorf("could not retrieve record bytes for %s (rkey: %s): %w", k, rkey, err)
135 }
136 log.Debug(ctx, "record type", "key", k, "type", nsid.String())
137
138 err = atsync.handleCreateUpdate(ctx, signerDID.String(), rkey, bs, v.String(), nsid, false, true)
139 if err != nil {
140 log.Warn(ctx, "failed to handle create update", "err", err)
141 // invalid CBOR and stuff should get ignored, so
142 // return fmt.Errorf("failed to process record update for %s (type: %s): %w", k, nsid.String(), err)
143 }
144 return nil
145 })
146 if err != nil {
147 return nil, fmt.Errorf("failed to iterate over repo: %w", err)
148 }
149
150 newRepo := model.Repo{
151 DID: ident.DID.String(),
152 PDS: ident.PDSEndpoint(),
153 Version: sc.Rev,
154 Handle: ident.Handle.String(),
155 }
156 err = mod.UpdateRepo(&newRepo)
157 if err != nil {
158 return nil, fmt.Errorf("failed to update DID record for %s: %w", sc.Did, err)
159 }
160 err = atsync.StatefulDB.AddRepo(ident.DID.String())
161 if err != nil {
162 return nil, fmt.Errorf("failed to add repo to stateful DB for %s: %w", ident.DID.String(), err)
163 }
164
165 return &newRepo, nil
166}
167
168func (atsync *ATProtoSynchronizer) RefreshIdentity(ctx context.Context, did string) (*identity.Identity, error) {
169 id, err := atsync.resolveIdent(ctx, did, false)
170 if err != nil {
171 return nil, fmt.Errorf("failed to resolve ident: %w", err)
172 }
173 newRepo := model.Repo{
174 DID: id.DID.String(),
175 PDS: id.PDSEndpoint(),
176 Handle: id.Handle.String(),
177 }
178 err = atsync.Model.UpdateRepo(&newRepo)
179 if err != nil {
180 return nil, fmt.Errorf("failed to update repo: %w", err)
181 }
182 return id, nil
183}
184
185func (atsync *ATProtoSynchronizer) resolveIdent(ctx context.Context, arg string, cached bool) (*identity.Identity, error) {
186 if atsync.PLCDirectory == nil {
187 atsync.PLCDirectory = CustomDirectory(atsync.CLI.PLCURL)
188 }
189 if atsync.CachedPLCDirectory == nil {
190 cachedDir := identity.NewCacheDirectory(atsync.PLCDirectory, 250_000, time.Hour*24, time.Minute*2, time.Minute*5)
191 atsync.CachedPLCDirectory = &cachedDir
192 }
193 dir := atsync.PLCDirectory
194 if cached {
195 dir = atsync.CachedPLCDirectory
196 }
197 id, err := syntax.ParseAtIdentifier(arg)
198 if err != nil {
199 return nil, err
200 }
201
202 resolvedID, err := dir.Lookup(ctx, *id)
203 if err != nil {
204 return nil, err
205 }
206 log.Log(ctx, "resolved ident", "id", resolvedID.DID.String(), "handle", resolvedID.Handle.String())
207
208 return resolvedID, nil
209}
210func CustomDirectory(plcURL string) identity.Directory {
211 base := identity.BaseDirectory{
212 PLCURL: plcURL,
213 HTTPClient: aqhttp.Client,
214 Resolver: net.Resolver{
215 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
216 d := net.Dialer{Timeout: time.Second * 3}
217 return d.DialContext(ctx, network, address)
218 },
219 },
220 TryAuthoritativeDNS: true,
221 // primary Bluesky PDS instance only supports HTTP resolution method
222 SkipDNSDomainSuffixes: []string{".bsky.social"},
223 }
224 return &base
225}
226
227func DIDDoc(host string) map[string]any {
228 return map[string]any{
229 "@context": []string{
230 "https://www.w3.org/ns/did/v1",
231 "https://w3id.org/security/multikey/v1",
232 "https://w3id.org/security/suites/secp256k1-2019/v1",
233 },
234 "id": fmt.Sprintf("did:web:%s", host),
235 "alsoKnownAs": []string{},
236 "service": []map[string]any{
237 {
238 "id": "#bsky_fg",
239 "type": "BskyFeedGenerator",
240 "serviceEndpoint": fmt.Sprintf("https://%s", host),
241 },
242 {
243 "id": "#atproto_pds",
244 "type": "AtprotoPersonalDataServer",
245 "serviceEndpoint": fmt.Sprintf("https://%s", host),
246 },
247 },
248 "verificationMethod": []map[string]any{
249 {
250 "id": fmt.Sprintf("did:web:%s#atproto", host),
251 "type": "Multikey",
252 "controller": fmt.Sprintf("did:web:%s", host),
253 "publicKeyMultibase": LexiconPubMultibase,
254 },
255 },
256 }
257}