Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "io/fs"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
16 "github.com/bluesky-social/indigo/atproto/data"
17 "github.com/bluesky-social/indigo/atproto/lexicon"
18 "github.com/bluesky-social/indigo/carstore"
19 lexutil "github.com/bluesky-social/indigo/lex/util"
20 "github.com/bluesky-social/indigo/models"
21 "github.com/bluesky-social/indigo/mst"
22 atrepo "github.com/bluesky-social/indigo/repo"
23 "github.com/bluesky-social/indigo/util"
24 "github.com/ipfs/go-cid"
25 cbg "github.com/whyrusleeping/cbor-gen"
26
27 "github.com/whyrusleeping/go-did"
28 "stream.place/streamplace/lexicons"
29 "stream.place/streamplace/pkg/config"
30 "stream.place/streamplace/pkg/log"
31 "stream.place/streamplace/pkg/model"
32 "stream.place/streamplace/pkg/spid"
33 "stream.place/streamplace/pkg/statedb"
34)
35
36var LexiconRepo *atrepo.Repo
37var LexiconPubMultibase string
38var RepoUser models.Uid = models.Uid(1)
39var CarStore carstore.CarStore
40var ActionCreate = "create"
41var ActionUpdate = "update"
42var ActionDelete = "delete"
43
// walkLexicons returns the raw contents of every file whose name ends in
// ".json" beneath path inside bundle, in the lexical order in which
// fs.WalkDir visits them.
//
// path is interpreted relative to the root of bundle. Leading and trailing
// slashes are ignored, so "", "/" and "." all select the whole bundle; any
// other value restricts the walk to that subdirectory. ctx is accepted for
// signature compatibility and is not consulted.
//
// The first error reported by the walk or by reading a file aborts the walk
// and is returned.
func walkLexicons(ctx context.Context, bundle fs.FS, path string) ([][]byte, error) {
	// io/fs paths are unrooted and slash-separated; "/" is not a valid
	// name, so normalise it (and the empty string) to the root ".".
	root := strings.Trim(path, "/")
	if root == "" {
		root = "."
	}

	ret := [][]byte{}
	err := fs.WalkDir(bundle, root, func(name string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() || !strings.HasSuffix(name, ".json") {
			return nil
		}
		lex, err := fs.ReadFile(bundle, name)
		if err != nil {
			return fmt.Errorf("failed to read lexicon file %q: %w", name, err)
		}
		ret = append(ret, lex)
		return nil
	})
	return ret, err
}
65
66type SchemaFileWrapper struct {
67 LexiconTypeID string `json:"$type,const=com.atproto.lexicon.schema" cborgen:"$type,const=com.atproto.lexicon.schema"`
68 SchemaFile lexicon.SchemaFile
69}
70
71func (sfw *SchemaFileWrapper) MarshalCBOR(w io.Writer) error {
72 bs, err := json.Marshal(sfw.SchemaFile)
73 if err != nil {
74 return err
75 }
76 mapObj, err := data.UnmarshalJSON(bs)
77 if err != nil {
78 return err
79 }
80 mapObj["$type"] = "com.atproto.lexicon.schema"
81 cbs, err := data.MarshalCBOR(mapObj)
82 if err != nil {
83 return err
84 }
85 _, err = w.Write(cbs)
86 if err != nil {
87 return err
88 }
89 return nil
90}
91
92func (sfw *SchemaFileWrapper) MarshalJSON() ([]byte, error) {
93 bs, err := json.Marshal(sfw.SchemaFile)
94 if err != nil {
95 return nil, err
96 }
97 mapObj, err := data.UnmarshalJSON(bs)
98 if err != nil {
99 return nil, err
100 }
101 mapObj["$type"] = "com.atproto.lexicon.schema"
102 bs, err = json.Marshal(mapObj)
103 if err != nil {
104 return nil, err
105 }
106 return bs, nil
107}
108
109type SPKeyManager struct {
110 priv *did.PrivKey
111}
112
113func (km *SPKeyManager) VerifyUserSignature(ctx context.Context, did string, sb []byte, sig []byte) error {
114 panic("NYI")
115}
116
117func (km *SPKeyManager) SignForUser(ctx context.Context, did string, sb []byte) ([]byte, error) {
118 return km.priv.Sign(sb)
119}
120
121var AllFiles fs.FS = lexicons.AllFiles
122
// Closer is anything exposing a Close method that reports an error. Its
// method set is exactly that of io.Closer.
type Closer interface {
	io.Closer
}
126
// NoopCloser has a Close method but holds nothing to release.
type NoopCloser struct{}

// Close does nothing and always returns nil.
func (*NoopCloser) Close() error { return nil }
132
133func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model, state *statedb.StatefulDB) (Closer, error) {
134 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo")
135 var err error
136
137 sqliteStore := &carstore.SQLiteStore{}
138
139 err = sqliteStore.Open(":memory:")
140 if err != nil {
141 return nil, err
142 }
143 CarStore = sqliteStore
144
145 var priv *atcrypto.PrivateKeyK256
146
147 keyBs, err := state.GetConfig("repo-key")
148 if err != nil {
149 return nil, err
150 }
151 if keyBs != nil {
152 // good path: we have a key in the stateful database
153 priv, err = atcrypto.ParsePrivateBytesK256(keyBs.Value)
154 if err != nil {
155 return nil, fmt.Errorf("failed to parse repo key from stateful database: %w", err)
156 }
157 } else {
158 // migration path: maybe we have an old one on disk.
159 exists, err := cli.DataFileExists([]string{"carstore", "repo.key"})
160 if err != nil {
161 return nil, err
162 }
163 if exists {
164 log.Warn(ctx, "found old repo key on disk, migrating to stateful database", "path", cli.DataFilePath([]string{"carstore", "repo.key"}))
165 buf := bytes.Buffer{}
166 err := cli.DataFileRead([]string{"carstore", "repo.key"}, &buf)
167 if err != nil {
168 return nil, err
169 }
170 priv, err = atcrypto.ParsePrivateBytesK256(buf.Bytes())
171 if err != nil {
172 return nil, fmt.Errorf("failed to read repo key from disk: %w", err)
173 }
174 } else {
175 priv, err = atcrypto.GeneratePrivateKeyK256()
176 if err != nil {
177 return nil, err
178 }
179 }
180 bs := priv.Bytes()
181 err = state.PutConfig("repo-key", bs)
182 if err != nil {
183 return nil, fmt.Errorf("failed to save repo key to stateful database: %w", err)
184 }
185 }
186
187 pub, err := priv.PublicKey()
188 if err != nil {
189 return nil, fmt.Errorf("failed to get public key from private key: %w", err)
190 }
191
192 LexiconPubMultibase = pub.Multibase()
193 signer := func(ctx context.Context, did string, sb []byte) ([]byte, error) {
194 return priv.HashAndSign(sb)
195 }
196
197 events, err := state.GetCommitEventsSince(cli.MyDID(), time.Time{})
198 if err != nil {
199 return nil, fmt.Errorf("failed to get commit events: %w", err)
200 }
201
202 var ses *carstore.DeltaSession
203 var currentRoot cid.Cid
204 var currentRev string
205
206 for _, event := range events {
207 evt, err := event.ToCommitEvent()
208 if err != nil {
209 return nil, fmt.Errorf("failed to convert event to commit event: %w", err)
210 }
211 currentRoot, ses, err = CarStore.ImportSlice(ctx, RepoUser, nil, evt.Blocks)
212 if err != nil {
213 return nil, fmt.Errorf("failed to import slice: %w", err)
214 }
215 currentRev = evt.Rev
216 }
217
218 if currentRoot == cid.Undef {
219 log.Warn(ctx, "no existing lexicon repo, creating new one")
220 ses, err = CarStore.NewDeltaSession(ctx, RepoUser, nil)
221 if err != nil {
222 return nil, fmt.Errorf("failed to create delta session: %w", err)
223 }
224 LexiconRepo = atrepo.NewRepo(ctx, cli.MyDID(), ses)
225 } else {
226 LexiconRepo, err = atrepo.OpenRepo(ctx, ses, currentRoot)
227 if err != nil {
228 return nil, fmt.Errorf("failed to open repo: %w", err)
229 }
230 }
231
232 LexiconPubMultibase = pub.Multibase()
233 lexs, err := walkLexicons(ctx, AllFiles, "/")
234 if err != nil {
235 return nil, fmt.Errorf("failed to walk lexicon files: %w", err)
236 }
237
238 ops := []*comatproto.SyncSubscribeRepos_RepoOp{}
239
240 for _, lex := range lexs {
241 lexFile := lexicon.SchemaFile{}
242 err := json.Unmarshal(lex, &lexFile)
243 if err != nil {
244 return nil, err
245 }
246 if !strings.HasPrefix(lexFile.ID, "place.stream") {
247 continue
248 }
249 sfw := &SchemaFileWrapper{SchemaFile: lexFile}
250 rpath := fmt.Sprintf("com.atproto.lexicon.schema/%s", lexFile.ID)
251 newCid, err := spid.GetCID(sfw)
252 if err != nil {
253 return nil, err
254 }
255 cidLink := lexutil.LexLink(*newCid)
256
257 oldCid, _, err := LexiconRepo.GetRecord(ctx, rpath)
258 if errors.Is(err, mst.ErrNotFound) {
259 _, err = LexiconRepo.PutRecord(ctx, rpath, sfw)
260 if err != nil {
261 return nil, err
262 }
263 log.Debug(ctx, "created new lexicon record", "rpath", rpath, "cid", newCid.String())
264 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{
265 Action: ActionCreate,
266 Path: rpath,
267 Cid: &cidLink,
268 })
269 } else if err != nil {
270 return nil, err
271 } else {
272 if newCid.Equals(oldCid) {
273 log.Debug(ctx, "new cid is the same as old cid, skipping lexicon record", "rpath", rpath, "cid", newCid.String())
274 continue
275 } else {
276 log.Debug(ctx, "new cid is different from old cid, updating lexicon record", "rpath", rpath, "old", oldCid.String(), "new", newCid.String())
277 _, err = LexiconRepo.UpdateRecord(ctx, rpath, sfw)
278 if err != nil {
279 return nil, err
280 }
281 oldLink := lexutil.LexLink(oldCid)
282 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{
283 Action: ActionUpdate,
284 Path: rpath,
285 Prev: &oldLink,
286 Cid: &cidLink,
287 })
288 }
289 }
290 currentRoot, currentRev, err = LexiconRepo.Commit(ctx, signer)
291 if err != nil {
292 return nil, fmt.Errorf("failed to commit: %w", err)
293 }
294
295 log.Debug(ctx, "LexiconRepo committed", "cid", currentRoot.String(), "rev", currentRev)
296 }
297 blocks, err := ses.CloseWithRoot(ctx, currentRoot, currentRev)
298 if err != nil {
299 return nil, fmt.Errorf("failed to close delta session: %w", err)
300 }
301 signed := LexiconRepo.SignedCommit()
302 if len(ops) > 0 {
303 log.Log(ctx, "created new lexicon commit for changes", "did", signed.Did, "data", signed.Data, "prev", signed.Prev, "rev", signed.Rev)
304 commit := &comatproto.SyncSubscribeRepos_Commit{
305 Repo: cli.MyDID(),
306 Blocks: blocks,
307 Rev: currentRev,
308 Commit: lexutil.LexLink(currentRoot),
309 Time: time.Now().Format(util.ISO8601),
310 Ops: ops,
311 TooBig: false,
312 }
313 err := state.CreateCommitEvent(commit, signed.Data.String())
314 if err != nil {
315 return nil, fmt.Errorf("failed to create commit event: %w", err)
316 }
317 }
318
319 return &NoopCloser{}, nil
320}
321
322func OpenLexiconRepo(ctx context.Context) (*atrepo.Repo, *carstore.DeltaSession, error) {
323 ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil)
324 if err != nil {
325 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to create delta session: %w", err)
326 }
327
328 base := ses.BaseCid()
329 if base == cid.Undef {
330 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: delta session has no base cid")
331 }
332
333 r, err := atrepo.OpenRepo(ctx, ses, base)
334 if err != nil {
335 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err)
336 }
337 return r, ses, nil
338}
339
340// Get record that handles special-casing for com.atproto.lexicon.schema
341func GetRecordCBOR(ctx context.Context, ses *carstore.DeltaSession, c cid.Cid, collection string, rkey string) (cbg.CBORMarshaler, error) {
342 b, err := ses.Get(ctx, c)
343 if err != nil {
344 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to get record for collection %q, rkey %q: %w", collection, rkey, err)
345 }
346 var val cbg.CBORMarshaler
347 if collection == "com.atproto.lexicon.schema" {
348 sfMap, err := data.UnmarshalCBOR(b.RawData())
349 if err != nil {
350 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err)
351 }
352 jbs, err := json.Marshal(sfMap)
353 if err != nil {
354 return nil, fmt.Errorf("failed to marshal schema file: %w", err)
355 }
356 sf := lexicon.SchemaFile{}
357 err = json.Unmarshal(jbs, &sf)
358 if err != nil {
359 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err)
360 }
361 val = &SchemaFileWrapper{
362 SchemaFile: sf,
363 }
364 } else {
365 val, err = lexutil.CborDecodeValue(b.RawData())
366 if err != nil {
367 return nil, fmt.Errorf("failed to decode record: %w", err)
368 }
369 }
370 return val, nil
371}