Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "io/fs"
11 "strings"
12 "time"
13
14 comatproto "github.com/bluesky-social/indigo/api/atproto"
15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
16 "github.com/bluesky-social/indigo/atproto/data"
17 "github.com/bluesky-social/indigo/atproto/lexicon"
18 "github.com/bluesky-social/indigo/carstore"
19 lexutil "github.com/bluesky-social/indigo/lex/util"
20 "github.com/bluesky-social/indigo/models"
21 "github.com/bluesky-social/indigo/mst"
22 atrepo "github.com/bluesky-social/indigo/repo"
23 "github.com/bluesky-social/indigo/util"
24 "github.com/ipfs/go-cid"
25 cbg "github.com/whyrusleeping/cbor-gen"
26 "gorm.io/driver/sqlite"
27 "gorm.io/gorm"
28
29 "github.com/whyrusleeping/go-did"
30 "stream.place/streamplace/lexicons"
31 "stream.place/streamplace/pkg/config"
32 "stream.place/streamplace/pkg/log"
33 "stream.place/streamplace/pkg/model"
34)
35
36var LexiconRepo *atrepo.Repo
37var LexiconPubMultibase string
38var RepoUser models.Uid = models.Uid(1)
39var CarStore carstore.CarStore
40var ActionCreate = "create"
41var ActionUpdate = "update"
42var ActionDelete = "delete"
43
44func walkLexicons(ctx context.Context, bundle fs.FS, path string) ([][]byte, error) {
45 ret := [][]byte{}
46 err := fs.WalkDir(bundle, ".", func(path string, d fs.DirEntry, err error) error {
47 if err != nil {
48 return err
49 }
50 if d.IsDir() {
51 return nil
52 }
53 if !strings.HasSuffix(path, ".json") {
54 return nil
55 }
56 lex, err := fs.ReadFile(bundle, path)
57 if err != nil {
58 return err
59 }
60 ret = append(ret, lex)
61 return nil
62 })
63 return ret, err
64}
65
66type SchemaFileWrapper struct {
67 LexiconTypeID string `json:"$type,const=com.atproto.lexicon.schema" cborgen:"$type,const=com.atproto.lexicon.schema"`
68 SchemaFile lexicon.SchemaFile
69}
70
71func (sfw *SchemaFileWrapper) MarshalCBOR(w io.Writer) error {
72 bs, err := json.Marshal(sfw.SchemaFile)
73 if err != nil {
74 return err
75 }
76 mapObj, err := data.UnmarshalJSON(bs)
77 if err != nil {
78 return err
79 }
80 mapObj["$type"] = "com.atproto.lexicon.schema"
81 cbs, err := data.MarshalCBOR(mapObj)
82 if err != nil {
83 return err
84 }
85 _, err = w.Write(cbs)
86 if err != nil {
87 return err
88 }
89 return nil
90}
91
92func (sfw *SchemaFileWrapper) MarshalJSON() ([]byte, error) {
93 bs, err := json.Marshal(sfw.SchemaFile)
94 if err != nil {
95 return nil, err
96 }
97 mapObj, err := data.UnmarshalJSON(bs)
98 if err != nil {
99 return nil, err
100 }
101 mapObj["$type"] = "com.atproto.lexicon.schema"
102 bs, err = json.Marshal(mapObj)
103 if err != nil {
104 return nil, err
105 }
106 return bs, nil
107}
108
109type SPKeyManager struct {
110 priv *did.PrivKey
111}
112
113func (km *SPKeyManager) VerifyUserSignature(ctx context.Context, did string, sb []byte, sig []byte) error {
114 panic("NYI")
115}
116
117func (km *SPKeyManager) SignForUser(ctx context.Context, did string, sb []byte) ([]byte, error) {
118 return km.priv.Sign(sb)
119}
120
121var AllFiles fs.FS = lexicons.AllFiles
122
123type Closer interface {
124 Close() error
125}
126
127func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model) (Closer, error) {
128 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo")
129 fd, err := cli.DataFileCreate([]string{"carstore", "empty"}, true)
130 if err != nil {
131 return nil, err
132 }
133 sqlitePath := cli.DataFilePath([]string{"carstore", "meta.sqlite"})
134
135 db, err := gorm.Open(sqlite.Open(sqlitePath))
136 if err != nil {
137 return nil, err
138 }
139 err = fd.Close()
140 if err != nil {
141 return nil, err
142 }
143 CarStore, err = carstore.NewCarStore(db, []string{
144 cli.DataFilePath([]string{"carstore"}),
145 })
146 if err != nil {
147 return nil, err
148 }
149
150 sqlDB, err := db.DB()
151 if err != nil {
152 return nil, err
153 }
154
155 var priv *atcrypto.PrivateKeyK256
156 exists, err := cli.DataFileExists([]string{"carstore", "repo.key"})
157 if err != nil {
158 return nil, err
159 }
160 if exists {
161 buf := bytes.Buffer{}
162 err := cli.DataFileRead([]string{"carstore", "repo.key"}, &buf)
163 if err != nil {
164 return nil, err
165 }
166 priv, err = atcrypto.ParsePrivateBytesK256(buf.Bytes())
167 if err != nil {
168 return nil, err
169 }
170 } else {
171 priv, err = atcrypto.GeneratePrivateKeyK256()
172 if err != nil {
173 return nil, err
174 }
175 bs := priv.Bytes()
176 err = cli.DataFileWrite([]string{"carstore", "repo.key"}, bytes.NewReader(bs), true)
177 if err != nil {
178 return nil, err
179 }
180 }
181
182 pub, err := priv.PublicKey()
183 if err != nil {
184 return nil, fmt.Errorf("failed to get public key from private key: %w", err)
185 }
186
187 LexiconPubMultibase = pub.Multibase()
188 signer := func(ctx context.Context, did string, sb []byte) ([]byte, error) {
189 return priv.HashAndSign(sb)
190 }
191
192 ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil)
193 if err != nil {
194 return nil, fmt.Errorf("failed to create delta session: %w", err)
195 }
196
197 currentRoot, err := CarStore.GetUserRepoHead(ctx, RepoUser)
198 if err != nil {
199 return nil, fmt.Errorf("failed to get user repo head: %w", err)
200 }
201 currentRev := ""
202
203 if currentRoot == cid.Undef {
204 LexiconRepo = atrepo.NewRepo(ctx, cli.MyDID(), ses)
205 } else {
206 LexiconRepo, err = atrepo.OpenRepo(ctx, ses, currentRoot)
207 if err != nil {
208 return nil, fmt.Errorf("failed to open repo: %w", err)
209 }
210 currentRev, err = CarStore.GetUserRepoRev(ctx, RepoUser)
211 if err != nil {
212 return nil, fmt.Errorf("failed to get user repo rev: %w", err)
213 }
214 }
215
216 LexiconPubMultibase = pub.Multibase()
217 lexs, err := walkLexicons(ctx, AllFiles, "/")
218 if err != nil {
219 return nil, fmt.Errorf("failed to walk lexicon files: %w", err)
220 }
221
222 ops := []*comatproto.SyncSubscribeRepos_RepoOp{}
223
224 for _, lex := range lexs {
225 lexFile := lexicon.SchemaFile{}
226 err := json.Unmarshal(lex, &lexFile)
227 if err != nil {
228 return nil, err
229 }
230 if !strings.HasPrefix(lexFile.ID, "place.stream") {
231 continue
232 }
233 sfw := &SchemaFileWrapper{SchemaFile: lexFile}
234 rpath := fmt.Sprintf("com.atproto.lexicon.schema/%s", lexFile.ID)
235 newCid, err := GetCID(sfw)
236 if err != nil {
237 return nil, err
238 }
239 cidLink := lexutil.LexLink(*newCid)
240
241 oldCid, _, err := LexiconRepo.GetRecord(ctx, rpath)
242 if errors.Is(err, mst.ErrNotFound) {
243 _, err = LexiconRepo.PutRecord(ctx, rpath, sfw)
244 if err != nil {
245 return nil, err
246 }
247 log.Debug(ctx, "created new lexicon record", "rpath", rpath, "cid", newCid.String())
248 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{
249 Action: ActionCreate,
250 Path: rpath,
251 Cid: &cidLink,
252 })
253 } else if err != nil {
254 return nil, err
255 } else {
256 if newCid.Equals(oldCid) {
257 log.Debug(ctx, "new cid is the same as old cid, skipping lexicon record", "rpath", rpath, "cid", newCid.String())
258 continue
259 } else {
260 log.Debug(ctx, "new cid is different from old cid, updating lexicon record", "rpath", rpath, "old", oldCid.String(), "new", newCid.String())
261 _, err = LexiconRepo.UpdateRecord(ctx, rpath, sfw)
262 if err != nil {
263 return nil, err
264 }
265 oldLink := lexutil.LexLink(oldCid)
266 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{
267 Action: ActionUpdate,
268 Path: rpath,
269 Prev: &oldLink,
270 Cid: &cidLink,
271 })
272 }
273 }
274 currentRoot, currentRev, err = LexiconRepo.Commit(ctx, signer)
275 if err != nil {
276 return nil, fmt.Errorf("failed to commit: %w", err)
277 }
278
279 log.Debug(ctx, "LexiconRepo committed", "cid", currentRoot.String(), "rev", currentRev)
280 }
281 blocks, err := ses.CloseWithRoot(ctx, currentRoot, currentRev)
282 if err != nil {
283 return nil, fmt.Errorf("failed to close delta session: %w", err)
284 }
285 signed := LexiconRepo.SignedCommit()
286 if len(ops) > 0 {
287 log.Log(ctx, "created new lexicon commit for changes", "did", signed.Did, "data", signed.Data, "prev", signed.Prev, "rev", signed.Rev)
288 commit := &comatproto.SyncSubscribeRepos_Commit{
289 Repo: cli.MyDID(),
290 Blocks: blocks,
291 Rev: currentRev,
292 Commit: lexutil.LexLink(currentRoot),
293 Time: time.Now().Format(util.ISO8601),
294 Ops: ops,
295 TooBig: false,
296 }
297 err := mod.CreateCommitEvent(commit, signed.Data.String())
298 if err != nil {
299 return nil, fmt.Errorf("failed to create commit event: %w", err)
300 }
301 }
302
303 return sqlDB, nil
304}
305
306func OpenLexiconRepo(ctx context.Context) (*atrepo.Repo, *carstore.DeltaSession, error) {
307 ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil)
308 if err != nil {
309 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to create delta session: %w", err)
310 }
311
312 base := ses.BaseCid()
313 if base == cid.Undef {
314 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: delta session has no base cid")
315 }
316
317 r, err := atrepo.OpenRepo(ctx, ses, base)
318 if err != nil {
319 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err)
320 }
321 return r, ses, nil
322}
323
324// Get record that handles special-casing for com.atproto.lexicon.schema
325func GetRecordCBOR(ctx context.Context, ses *carstore.DeltaSession, c cid.Cid, collection string, rkey string) (cbg.CBORMarshaler, error) {
326 b, err := ses.Get(ctx, c)
327 if err != nil {
328 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to get record for collection %q, rkey %q: %w", collection, rkey, err)
329 }
330 var val cbg.CBORMarshaler
331 if collection == "com.atproto.lexicon.schema" {
332 sfMap, err := data.UnmarshalCBOR(b.RawData())
333 if err != nil {
334 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err)
335 }
336 jbs, err := json.Marshal(sfMap)
337 if err != nil {
338 return nil, fmt.Errorf("failed to marshal schema file: %w", err)
339 }
340 sf := lexicon.SchemaFile{}
341 err = json.Unmarshal(jbs, &sf)
342 if err != nil {
343 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err)
344 }
345 val = &SchemaFileWrapper{
346 SchemaFile: sf,
347 }
348 } else {
349 val, err = lexutil.CborDecodeValue(b.RawData())
350 if err != nil {
351 return nil, fmt.Errorf("failed to decode record: %w", err)
352 }
353 }
354 return val, nil
355}