Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat-iroh-replicator 355 lines 9.6 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "io/fs" 11 "strings" 12 "time" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 16 "github.com/bluesky-social/indigo/atproto/data" 17 "github.com/bluesky-social/indigo/atproto/lexicon" 18 "github.com/bluesky-social/indigo/carstore" 19 lexutil "github.com/bluesky-social/indigo/lex/util" 20 "github.com/bluesky-social/indigo/models" 21 "github.com/bluesky-social/indigo/mst" 22 atrepo "github.com/bluesky-social/indigo/repo" 23 "github.com/bluesky-social/indigo/util" 24 "github.com/ipfs/go-cid" 25 cbg "github.com/whyrusleeping/cbor-gen" 26 "gorm.io/driver/sqlite" 27 "gorm.io/gorm" 28 29 "github.com/whyrusleeping/go-did" 30 "stream.place/streamplace/lexicons" 31 "stream.place/streamplace/pkg/config" 32 "stream.place/streamplace/pkg/log" 33 "stream.place/streamplace/pkg/model" 34) 35 36var LexiconRepo *atrepo.Repo 37var LexiconPubMultibase string 38var RepoUser models.Uid = models.Uid(1) 39var CarStore carstore.CarStore 40var ActionCreate = "create" 41var ActionUpdate = "update" 42var ActionDelete = "delete" 43 44func walkLexicons(ctx context.Context, bundle fs.FS, path string) ([][]byte, error) { 45 ret := [][]byte{} 46 err := fs.WalkDir(bundle, ".", func(path string, d fs.DirEntry, err error) error { 47 if err != nil { 48 return err 49 } 50 if d.IsDir() { 51 return nil 52 } 53 if !strings.HasSuffix(path, ".json") { 54 return nil 55 } 56 lex, err := fs.ReadFile(bundle, path) 57 if err != nil { 58 return err 59 } 60 ret = append(ret, lex) 61 return nil 62 }) 63 return ret, err 64} 65 66type SchemaFileWrapper struct { 67 LexiconTypeID string `json:"$type,const=com.atproto.lexicon.schema" cborgen:"$type,const=com.atproto.lexicon.schema"` 68 SchemaFile lexicon.SchemaFile 69} 70 71func (sfw *SchemaFileWrapper) MarshalCBOR(w io.Writer) error { 72 bs, err := json.Marshal(sfw.SchemaFile) 73 if err != nil { 74 return err 75 } 76 mapObj, err := data.UnmarshalJSON(bs) 77 if err != nil { 78 return err 79 } 80 mapObj["$type"] = "com.atproto.lexicon.schema" 81 cbs, err := data.MarshalCBOR(mapObj) 82 if err != nil { 83 return err 84 } 85 _, err = w.Write(cbs) 86 if err != nil { 87 return err 88 } 89 return nil 90} 91 92func (sfw *SchemaFileWrapper) MarshalJSON() ([]byte, error) { 93 bs, err := json.Marshal(sfw.SchemaFile) 94 if err != nil { 95 return nil, err 96 } 97 mapObj, err := data.UnmarshalJSON(bs) 98 if err != nil { 99 return nil, err 100 } 101 mapObj["$type"] = "com.atproto.lexicon.schema" 102 bs, err = json.Marshal(mapObj) 103 if err != nil { 104 return nil, err 105 } 106 return bs, nil 107} 108 109type SPKeyManager struct { 110 priv *did.PrivKey 111} 112 113func (km *SPKeyManager) VerifyUserSignature(ctx context.Context, did string, sb []byte, sig []byte) error { 114 panic("NYI") 115} 116 117func (km *SPKeyManager) SignForUser(ctx context.Context, did string, sb []byte) ([]byte, error) { 118 return km.priv.Sign(sb) 119} 120 121var AllFiles fs.FS = lexicons.AllFiles 122 123type Closer interface { 124 Close() error 125} 126 127func MakeLexiconRepo(ctx context.Context, cli *config.CLI, mod model.Model) (Closer, error) { 128 ctx = log.WithLogValues(ctx, "func", "MakeLexiconRepo") 129 fd, err := cli.DataFileCreate([]string{"carstore", "empty"}, true) 130 if err != nil { 131 return nil, err 132 } 133 sqlitePath := cli.DataFilePath([]string{"carstore", "meta.sqlite"}) 134 135 db, err := gorm.Open(sqlite.Open(sqlitePath)) 136 if err != nil { 137 return nil, err 138 } 139 err = fd.Close() 140 if err != nil { 141 return nil, err 142 } 143 CarStore, err = carstore.NewCarStore(db, []string{ 144 cli.DataFilePath([]string{"carstore"}), 145 }) 146 if err != nil { 147 return nil, err 148 } 149 150 sqlDB, err := db.DB() 151 if err != nil { 152 return nil, err 153 } 154 155 var priv *atcrypto.PrivateKeyK256 156 exists, err := cli.DataFileExists([]string{"carstore", "repo.key"}) 157 if err != nil { 158 return nil, err 159 } 160 if exists { 161 buf := bytes.Buffer{} 162 err := cli.DataFileRead([]string{"carstore", "repo.key"}, &buf) 163 if err != nil { 164 return nil, err 165 } 166 priv, err = atcrypto.ParsePrivateBytesK256(buf.Bytes()) 167 if err != nil { 168 return nil, err 169 } 170 } else { 171 priv, err = atcrypto.GeneratePrivateKeyK256() 172 if err != nil { 173 return nil, err 174 } 175 bs := priv.Bytes() 176 err = cli.DataFileWrite([]string{"carstore", "repo.key"}, bytes.NewReader(bs), true) 177 if err != nil { 178 return nil, err 179 } 180 } 181 182 pub, err := priv.PublicKey() 183 if err != nil { 184 return nil, fmt.Errorf("failed to get public key from private key: %w", err) 185 } 186 187 LexiconPubMultibase = pub.Multibase() 188 signer := func(ctx context.Context, did string, sb []byte) ([]byte, error) { 189 return priv.HashAndSign(sb) 190 } 191 192 ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil) 193 if err != nil { 194 return nil, fmt.Errorf("failed to create delta session: %w", err) 195 } 196 197 currentRoot, err := CarStore.GetUserRepoHead(ctx, RepoUser) 198 if err != nil { 199 return nil, fmt.Errorf("failed to get user repo head: %w", err) 200 } 201 currentRev := "" 202 203 if currentRoot == cid.Undef { 204 LexiconRepo = atrepo.NewRepo(ctx, cli.MyDID(), ses) 205 } else { 206 LexiconRepo, err = atrepo.OpenRepo(ctx, ses, currentRoot) 207 if err != nil { 208 return nil, fmt.Errorf("failed to open repo: %w", err) 209 } 210 currentRev, err = CarStore.GetUserRepoRev(ctx, RepoUser) 211 if err != nil { 212 return nil, fmt.Errorf("failed to get user repo rev: %w", err) 213 } 214 } 215 216 LexiconPubMultibase = pub.Multibase() 217 lexs, err := walkLexicons(ctx, AllFiles, "/") 218 if err != nil { 219 return nil, fmt.Errorf("failed to walk lexicon files: %w", err) 220 } 221 222 ops := []*comatproto.SyncSubscribeRepos_RepoOp{} 223 224 for _, lex := range lexs { 225 lexFile := lexicon.SchemaFile{} 226 err := json.Unmarshal(lex, &lexFile) 227 if err != nil { 228 return nil, err 229 } 230 if !strings.HasPrefix(lexFile.ID, "place.stream") { 231 continue 232 } 233 sfw := &SchemaFileWrapper{SchemaFile: lexFile} 234 rpath := fmt.Sprintf("com.atproto.lexicon.schema/%s", lexFile.ID) 235 newCid, err := GetCID(sfw) 236 if err != nil { 237 return nil, err 238 } 239 cidLink := lexutil.LexLink(*newCid) 240 241 oldCid, _, err := LexiconRepo.GetRecord(ctx, rpath) 242 if errors.Is(err, mst.ErrNotFound) { 243 _, err = LexiconRepo.PutRecord(ctx, rpath, sfw) 244 if err != nil { 245 return nil, err 246 } 247 log.Debug(ctx, "created new lexicon record", "rpath", rpath, "cid", newCid.String()) 248 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{ 249 Action: ActionCreate, 250 Path: rpath, 251 Cid: &cidLink, 252 }) 253 } else if err != nil { 254 return nil, err 255 } else { 256 if newCid.Equals(oldCid) { 257 log.Debug(ctx, "new cid is the same as old cid, skipping lexicon record", "rpath", rpath, "cid", newCid.String()) 258 continue 259 } else { 260 log.Debug(ctx, "new cid is different from old cid, updating lexicon record", "rpath", rpath, "old", oldCid.String(), "new", newCid.String()) 261 _, err = LexiconRepo.UpdateRecord(ctx, rpath, sfw) 262 if err != nil { 263 return nil, err 264 } 265 oldLink := lexutil.LexLink(oldCid) 266 ops = append(ops, &comatproto.SyncSubscribeRepos_RepoOp{ 267 Action: ActionUpdate, 268 Path: rpath, 269 Prev: &oldLink, 270 Cid: &cidLink, 271 }) 272 } 273 } 274 currentRoot, currentRev, err = LexiconRepo.Commit(ctx, signer) 275 if err != nil { 276 return nil, fmt.Errorf("failed to commit: %w", err) 277 } 278 279 log.Debug(ctx, "LexiconRepo committed", "cid", currentRoot.String(), "rev", currentRev) 280 } 281 blocks, err := ses.CloseWithRoot(ctx, currentRoot, currentRev) 282 if err != nil { 283 return nil, fmt.Errorf("failed to close delta session: %w", err) 284 } 285 signed := LexiconRepo.SignedCommit() 286 if len(ops) > 0 { 287 log.Log(ctx, "created new lexicon commit for changes", "did", signed.Did, "data", signed.Data, "prev", signed.Prev, "rev", signed.Rev) 288 commit := &comatproto.SyncSubscribeRepos_Commit{ 289 Repo: cli.MyDID(), 290 Blocks: blocks, 291 Rev: currentRev, 292 Commit: lexutil.LexLink(currentRoot), 293 Time: time.Now().Format(util.ISO8601), 294 Ops: ops, 295 TooBig: false, 296 } 297 err := mod.CreateCommitEvent(commit, signed.Data.String()) 298 if err != nil { 299 return nil, fmt.Errorf("failed to create commit event: %w", err) 300 } 301 } 302 303 return sqlDB, nil 304} 305 306func OpenLexiconRepo(ctx context.Context) (*atrepo.Repo, *carstore.DeltaSession, error) { 307 ses, err := CarStore.NewDeltaSession(ctx, RepoUser, nil) 308 if err != nil { 309 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to create delta session: %w", err) 310 } 311 312 base := ses.BaseCid() 313 if base == cid.Undef { 314 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: delta session has no base cid") 315 } 316 317 r, err := atrepo.OpenRepo(ctx, ses, base) 318 if err != nil { 319 return nil, nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to open repo: %w", err) 320 } 321 return r, ses, nil 322} 323 324// Get record that handles special-casing for com.atproto.lexicon.schema 325func GetRecordCBOR(ctx context.Context, ses *carstore.DeltaSession, c cid.Cid, collection string, rkey string) (cbg.CBORMarshaler, error) { 326 b, err := ses.Get(ctx, c) 327 if err != nil { 328 return nil, fmt.Errorf("handleComAtprotoRepoListRecords: failed to get record for collection %q, rkey %q: %w", collection, rkey, err) 329 } 330 var val cbg.CBORMarshaler 331 if collection == "com.atproto.lexicon.schema" { 332 sfMap, err := data.UnmarshalCBOR(b.RawData()) 333 if err != nil { 334 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err) 335 } 336 jbs, err := json.Marshal(sfMap) 337 if err != nil { 338 return nil, fmt.Errorf("failed to marshal schema file: %w", err) 339 } 340 sf := lexicon.SchemaFile{} 341 err = json.Unmarshal(jbs, &sf) 342 if err != nil { 343 return nil, fmt.Errorf("failed to unmarshal schema file: %w", err) 344 } 345 val = &SchemaFileWrapper{ 346 SchemaFile: sf, 347 } 348 } else { 349 val, err = lexutil.CborDecodeValue(b.RawData()) 350 if err != nil { 351 return nil, fmt.Errorf("failed to decode record: %w", err) 352 } 353 } 354 return val, nil 355}