Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/rtmp-rec 371 lines 10 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "io/fs" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 16 "github.com/bluesky-social/indigo/atproto/data" 17 "github.com/bluesky-social/indigo/atproto/lexicon" 18 "github.com/bluesky-social/indigo/carstore" 19 lexutil "github.com/bluesky-social/indigo/lex/util" 20 "github.com/bluesky-social/indigo/models" 21 "github.com/bluesky-social/indigo/mst" 22 atrepo "github.com/bluesky-social/indigo/repo" 23 "github.com/bluesky-social/indigo/util" 24 "github.com/ipfs/go-cid" 25 cbg "github.com/whyrusleeping/cbor-gen" 26 27 "github.com/whyrusleeping/go-did" 28 "stream.place/streamplace/lexicons" 29 "stream.place/streamplace/pkg/config" 30 "stream.place/streamplace/pkg/log" 31 "stream.place/streamplace/pkg/model" 32 "stream.place/streamplace/pkg/spid" 33 "stream.place/streamplace/pkg/statedb" 34) 35 36var LexiconRepo *atrepo.Repo 37var LexiconPubMultibase string 38var RepoUser models.Uid = models.Uid(1) 39var CarStore carstore.CarStore 40var ActionCreate = "create" 41var ActionUpdate = "update" 42var ActionDelete = "delete" 43 44func walkLexicons(ctx context.Context, bundle fs.FS, path string) ([][]byte, error) { 45 ret := [][]byte{} 46 err := fs.WalkDir(bundle, ".", func(path string, d fs.DirEntry, err error) error { 47 if err != nil { 48 return err 49 } 50 if d.IsDir() { 51 return nil 52 } 53 if !strings.HasSuffix(path, ".json") { 54 return nil 55 } 56 lex, err := fs.ReadFile(bundle, path) 57 if err != nil { 58 return err 59 } 60 ret = append(ret, lex) 61 return nil 62 }) 63 return ret, err 64} 65 66type SchemaFileWrapper struct { 67 LexiconTypeID string `json:"$type,const=com.atproto.lexicon.schema" cborgen:"$type,const=com.atproto.lexicon.schema"` 68 SchemaFile lexicon.SchemaFile 69} 70 71func (sfw *SchemaFileWrapper) MarshalCBOR(w io.Writer) error { 72 bs, err := json.Marshal(sfw.SchemaFile) 73 if err != nil { 74 return err 75 } 76 mapObj, err := data.UnmarshalJSON(bs) 77 if err != nil { 78 return err 79 } 80 mapObj["$type"] = "com.atproto.lexicon.schema" 81 cbs, err := data.MarshalCBOR(mapObj) 82 if err != nil { 83 return err 84 } 85 _, err = w.Write(cbs) 86 if err != nil { 87 return err 88 } 89 return nil 90} 91 92func (sfw *SchemaFileWrapper) MarshalJSON() ([]byte, error) { 93 bs, err := json.Marshal(sfw.SchemaFile) 94 if err != nil { 95 return nil, err 96 } 97 mapObj, err := data.UnmarshalJSON(bs) 98 if err != nil { 99 return nil, err 100 } 101 mapObj["$type"] = "com.atproto.lexicon.schema" 102 bs, err = json.Marshal(mapObj) 103 if err != nil { 104 return nil, err 105 } 106 return bs, nil 107} 108 109type SPKeyManager struct { 110 priv *did.PrivKey 111} 112 113func (km *SPKeyManager) VerifyUserSignature(ctx context.Context, did string, sb []byte, sig []byte) error { 114 panic("NYI") 115} 116 117func (km *SPKeyManager) SignForUser(ctx context.Context, did string, sb []byte) ([]byte, error) { 118 return km.priv.Sign(sb) 119} 120 121var AllFiles fs.FS = lexicons.AllFiles 122 123type Closer interface { 124 Close() error 125} 126 127type NoopCloser struct{} 128 129func (c *NoopCloser) Close() error { 130 return nil 131} 132 133func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model, state *statedb.StatefulDB) (Closer, error) { 134 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo") 135 var err error 136 137 sqliteStore := &carstore.SQLiteStore{} 138 139 err = sqliteStore.Open(":memory:") 140 if err != nil { 141 return nil, err 142 } 143 CarStore = sqliteStore 144 145 var priv *atcrypto.PrivateKeyK256 146 147 keyBs, err := state.GetConfig("repo-key") 148 if err != nil { 149 return nil, err 150 } 151 if keyBs != nil { 152 // good path: we have a key in the stateful database 153 priv, err = atcrypto.ParsePrivateBytesK256(keyBs.Value) 154 if err != nil { 155 return nil, fmt.Errorf("failed to parse repo key from stateful database: %w", err) 156 } 157 } else { 158 // migration path: maybe we have an old one on disk. 159 exists, err := cli.DataFileExists([]string{"carstore", "repo.key"}) 160 if err != nil { 161 return nil, err 162 } 163 if exists { 164 log.Warn(ctx, "found old repo key on disk, migrating to stateful database", "path", cli.DataFilePath([]string{"carstore", "repo.key"})) 165 buf := bytes.Buffer{} 166 err := cli.DataFileRead([]string{"carstore", "repo.key"}, &buf) 167 if err != nil { 168 return nil, err 169 } 170 priv, err = atcrypto.ParsePrivateBytesK256(buf.Bytes()) 171 if err != nil { 172 return nil, fmt.Errorf("failed to read repo key from disk: %w", err) 173 } 174 } else { 175 priv, err = atcrypto.GeneratePrivateKeyK256() 176 if err != nil { 177 return nil, err 178 } 179 } 180 bs := priv.Bytes() 181 err = state.PutConfig("repo-key", bs) 182 if err != nil { 183 return nil, fmt.Errorf("failed to save repo key to stateful database: %w", err) 184 } 185 } 186 187 pub, err := priv.PublicKey() 188 if err != nil { 189 return nil, fmt.Errorf("failed to get public key from private key: %w", err) 190 } 191 192 LexiconPubMultibase = pub.Multibase() 193 signer := func(ctx context.Context, did string, sb []byte) ([]byte, error) { 194 return priv.HashAndSign(sb) 195 } 196 197 events, err := state.GetCommitEventsSince(cli.MyDID(), time.Time{}) 198 if err != nil { 199 return nil, fmt.Errorf("failed to get commit events: %w", err) 200 } 201 202 var ses *carstore.DeltaSession 203 var currentRoot cid.Cid 204 var currentRev string 205 206 for _, event := range events { 207 evt, err := event.ToCommitEvent() 208 if err != nil { 209 return nil, fmt.Errorf("failed to convert event to commit event: %w", err) 210 } 211 currentRoot, ses, err = CarStore.ImportSlice(ctx, RepoUser, nil, evt.Blocks) 212 if err != nil { 213 return nil, fmt.Errorf("failed to import slice: %w", err) 214 } 215 currentRev = evt.Rev 216 } 217 218 if currentRoot == cid.Undef { 219 log.Warn(ctx, "no existing lexicon repo, creating new one") 220 ses, err = CarStore.NewDeltaSession(ctx, RepoUser, nil) 221 if err != nil { 222 return nil, fmt.Errorf("failed to create delta session: %w", err) 223 } 224 LexiconRepo = atrepo.NewRepo(ctx, cli.MyDID(), ses) 225 } else { 226 LexiconRepo, err = atrepo.OpenRepo(ctx, ses, currentRoot) 227 if err != nil { 228 return nil, fmt.Errorf("failed to open repo: %w", err) 229 } 230 } 231 232 LexiconPubMultibase = pub.Multibase() 233 lexs, err := walkLexicons(ctx, AllFiles, "/") 234 if err != nil { 235 return nil, fmt.Errorf("failed to walk lexicon files: %w", err) 236 } 237 238 ops := []*comatproto.SyncSubscribeRepos_RepoOp{} 239 240 for _, lex := range lexs { 241 lexFile := lexicon.SchemaFile{} 242 err := json.Unmarshal(lex, &lexFile) 243 if err != nil { 244 return nil, err 245 } 246 if !strings.HasPrefix(lexFile.ID, "place.stream") { 247 continue 248 } 249 sfw := &SchemaFileWrapper{SchemaFile: lexFile} 250 rpath := fmt.Sprintf("com.atproto.lexicon.schema/%s", lexFile.ID) 251 newCid, err := spid.GetCID(sfw) 252 if err != nil { 253 return nil, err 254 } 255 cidLink := lexutil.LexLink(*newCid) 256 257 oldCid, _, err := LexiconRepo.GetRecord(ctx, rpath) 258 if errors.Is(err, mst.ErrNotFound) { 259 _, err = LexiconRepo.PutRecord(ctx, rpath, sfw) 260 if err != nil { 261 return nil, err 262 } 263 log.Debug(ctx, "created new lexicon record", "rpath", rpath, "cid", newCid.String()) 264 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{ 265 Action: ActionCreate, 266 Path: rpath, 267 Cid: &cidLink, 268 }) 269 } else if err != nil { 270 return nil, err 271 } else { 272 if newCid.Equals(oldCid) { 273 log.Debug(ctx, "new cid is the same as old cid, skipping lexicon record", "rpath", rpath, "cid", newCid.String()) 274 continue 275 } else { 276 log.Debug(ctx, "new cid is different from old cid, updating lexicon record", "rpath", rpath, "old", oldCid.String(), "new", newCid.String()) 277 _, err = LexiconRepo.UpdateRecord(ctx, rpath, sfw) 278 if err != nil { 279 return nil, err 280 } 281 oldLink := lexutil.LexLink(oldCid) 282 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{ 283 Action: ActionUpdate, 284 Path: rpath, 285 Prev: &oldLink, 286 Cid: &cidLink, 287 }) 288 } 289 } 290 currentRoot, currentRev, err = LexiconRepo.Commit(ctx, signer) 291 if err != nil { 292 return nil, fmt.Errorf("failed to commit: %w", err) 293 } 294 295 log.Debug(ctx, "LexiconRepo committed", "cid", currentRoot.String(), "rev", currentRev) 296 } 297 blocks, err := ses.CloseWithRoot(ctx, currentRoot, currentRev) 298 if err != nil { 299 return nil, fmt.Errorf("failed to close delta session: %w", err) 300 } 301 signed := LexiconRepo.SignedCommit() 302 if len(ops) > 0 { 303 log.Log(ctx, "created new lexicon commit for changes", "did", signed.Did, "data", signed.Data, "prev", signed.Prev, "rev", signed.Rev) 304 commit := &comatproto.SyncSubscribeRepos_Commit{ 305 Repo: cli.MyDID(), 306 Blocks: blocks, 307 Rev: currentRev, 308 Commit: lexutil.LexLink(currentRoot), 309 Time: time.Now().Format(util.ISO8601), 310 Ops: ops, 311 TooBig: false, 312 } 313 err := state.CreateCommitEvent(commit, signed.Data.String()) 314 if err != nil { 315 return nil, fmt.Errorf("failed to create commit event: %w", err) 316 } 317 } 318 319 return &NoopCloser{}, nil 320} 321 322func OpenLexiconRepo(ctx context.Context) (*atrepo.Repo, *carstore.DeltaSession, error) { 323 ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil) 324 if err != nil { 325 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to create delta session: %w", err) 326 } 327 328 base := ses.BaseCid() 329 if base == cid.Undef { 330 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: delta session has no base cid") 331 } 332 333 r, err := atrepo.OpenRepo(ctx, ses, base) 334 if err != nil { 335 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err) 336 } 337 return r, ses, nil 338} 339 340// Get record that handles special-casing for com.atproto.lexicon.schema 341func GetRecordCBOR(ctx context.Context, ses *carstore.DeltaSession, c cid.Cid, collection string, rkey string) (cbg.CBORMarshaler, error) { 342 b, err := ses.Get(ctx, c) 343 if err != nil { 344 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to get record for collection %q, rkey %q: %w", collection, rkey, err) 345 } 346 var val cbg.CBORMarshaler 347 if collection == "com.atproto.lexicon.schema" { 348 sfMap, err := data.UnmarshalCBOR(b.RawData()) 349 if err != nil { 350 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err) 351 } 352 jbs, err := json.Marshal(sfMap) 353 if err != nil { 354 return nil, fmt.Errorf("failed to marshal schema file: %w", err) 355 } 356 sf := lexicon.SchemaFile{} 357 err = json.Unmarshal(jbs, &sf) 358 if err != nil { 359 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err) 360 } 361 val = &SchemaFileWrapper{ 362 SchemaFile: sf, 363 } 364 } else { 365 val, err = lexutil.CborDecodeValue(b.RawData()) 366 if err != nil { 367 return nil, fmt.Errorf("failed to decode record: %w", err) 368 } 369 } 370 return val, nil 371}