forked from hailey.at/cocoon
An atproto PDS written in Go

refactor blockstore to be extensible

+1 -9
blockstore/blockstore.go sqlite_blockstore/sqlite_blockstore.go
··· 1 - package blockstore 1 + package sqlite_blockstore 2 2 3 3 import ( 4 4 "context" ··· 134 134 135 135 func (bs *SqliteBlockstore) HashOnRead(enabled bool) { 136 136 panic("not implemented") 137 - } 138 - 139 - func (bs *SqliteBlockstore) UpdateRepo(ctx context.Context, root cid.Cid, rev string) error { 140 - if err := bs.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, bs.did).Error; err != nil { 141 - return err 142 - } 143 - 144 - return nil 145 137 } 146 138 147 139 func (bs *SqliteBlockstore) Execute(ctx context.Context) error {
+77
recording_blockstore/recording_blockstore.go
··· 1 + package recording_blockstore 2 + 3 + import ( 4 + "context" 5 + 6 + blockformat "github.com/ipfs/go-block-format" 7 + "github.com/ipfs/go-cid" 8 + blockstore "github.com/ipfs/go-ipfs-blockstore" 9 + ) 10 + 11 + type RecordingBlockstore struct { 12 + base blockstore.Blockstore 13 + 14 + inserts map[cid.Cid]blockformat.Block 15 + } 16 + 17 + func New(base blockstore.Blockstore) *RecordingBlockstore { 18 + return &RecordingBlockstore{ 19 + base: base, 20 + inserts: make(map[cid.Cid]blockformat.Block), 21 + } 22 + } 23 + 24 + func (bs *RecordingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { 25 + return bs.base.Has(ctx, c) 26 + } 27 + 28 + func (bs *RecordingBlockstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { 29 + return bs.base.Get(ctx, c) 30 + } 31 + 32 + func (bs *RecordingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { 33 + return bs.base.GetSize(ctx, c) 34 + } 35 + 36 + func (bs *RecordingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { 37 + return bs.base.DeleteBlock(ctx, c) 38 + } 39 + 40 + func (bs *RecordingBlockstore) Put(ctx context.Context, block blockformat.Block) error { 41 + if err := bs.base.Put(ctx, block); err != nil { 42 + return err 43 + } 44 + bs.inserts[block.Cid()] = block 45 + return nil 46 + } 47 + 48 + func (bs *RecordingBlockstore) PutMany(ctx context.Context, blocks []blockformat.Block) error { 49 + if err := bs.base.PutMany(ctx, blocks); err != nil { 50 + return err 51 + } 52 + 53 + for _, b := range blocks { 54 + bs.inserts[b.Cid()] = b 55 + } 56 + 57 + return nil 58 + } 59 + 60 + func (bs *RecordingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 61 + return bs.AllKeysChan(ctx) 62 + } 63 + 64 + func (bs *RecordingBlockstore) HashOnRead(enabled bool) { 65 + } 66 + 67 + func (bs *RecordingBlockstore) GetLogMap() map[cid.Cid]blockformat.Block { 68 + return bs.inserts 69 + } 70 + 71 + func (bs *RecordingBlockstore) GetLogArray() []blockformat.Block { 72 + var blocks []blockformat.Block 73 + for _, b := range bs.inserts { 74 + blocks = append(blocks, b) 75 + } 76 + return blocks 77 + }
+2 -3
server/handle_import_repo.go
··· 9 9 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/repo" 12 - "github.com/haileyok/cocoon/blockstore" 13 12 "github.com/haileyok/cocoon/internal/helpers" 14 13 "github.com/haileyok/cocoon/models" 15 14 blocks "github.com/ipfs/go-block-format" ··· 27 26 return helpers.ServerError(e, nil) 28 27 } 29 28 30 - bs := blockstore.New(urepo.Repo.Did, s.db) 29 + bs := s.createBlockstore(urepo.Repo.Did) 31 30 32 31 cs, err := car.NewCarReader(bytes.NewReader(b)) 33 32 if err != nil { ··· 107 106 return helpers.ServerError(e, nil) 108 107 } 109 108 110 - if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil { 109 + if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 111 110 s.logger.Error("error updating repo after commit", "error", err) 112 111 return helpers.ServerError(e, nil) 113 112 }
+2 -3
server/handle_server_create_account.go
··· 14 14 "github.com/bluesky-social/indigo/events" 15 15 "github.com/bluesky-social/indigo/repo" 16 16 "github.com/bluesky-social/indigo/util" 17 - "github.com/haileyok/cocoon/blockstore" 18 17 "github.com/haileyok/cocoon/internal/helpers" 19 18 "github.com/haileyok/cocoon/models" 20 19 "github.com/labstack/echo/v4" ··· 177 176 } 178 177 179 178 if customDidHeader == "" { 180 - bs := blockstore.New(signupDid, s.db) 179 + bs := s.createBlockstore(signupDid) 181 180 r := repo.NewRepo(context.TODO(), signupDid, bs) 182 181 183 182 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) ··· 186 185 return helpers.ServerError(e, nil) 187 186 } 188 187 189 - if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil { 188 + if err := s.UpdateRepo(context.TODO(), urepo.Did, root, rev); err != nil { 190 189 s.logger.Error("error updating repo after commit", "error", err) 191 190 return helpers.ServerError(e, nil) 192 191 }
+1 -2
server/handle_sync_get_blocks.go
··· 6 6 "strings" 7 7 8 8 "github.com/bluesky-social/indigo/carstore" 9 - "github.com/haileyok/cocoon/blockstore" 10 9 "github.com/haileyok/cocoon/internal/helpers" 11 10 "github.com/ipfs/go-cid" 12 11 cbor "github.com/ipfs/go-ipld-cbor" ··· 54 53 return helpers.ServerError(e, nil) 55 54 } 56 55 57 - bs := blockstore.New(urepo.Repo.Did, s.db) 56 + bs := s.createBlockstore(urepo.Repo.Did) 58 57 59 58 for _, c := range cids { 60 59 b, err := bs.Get(context.TODO(), c)
+12 -12
server/repo.go
··· 16 16 "github.com/bluesky-social/indigo/events" 17 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 18 "github.com/bluesky-social/indigo/repo" 19 - "github.com/bluesky-social/indigo/util" 20 - "github.com/haileyok/cocoon/blockstore" 21 19 "github.com/haileyok/cocoon/internal/db" 22 20 "github.com/haileyok/cocoon/models" 21 + "github.com/haileyok/cocoon/recording_blockstore" 23 22 blocks "github.com/ipfs/go-block-format" 24 23 "github.com/ipfs/go-cid" 25 24 cbor "github.com/ipfs/go-ipld-cbor" ··· 103 102 return nil, err 104 103 } 105 104 106 - dbs := blockstore.New(urepo.Did, rm.db) 105 + dbs := rm.s.createBlockstore(urepo.Did) 106 + bs := recording_blockstore.New(dbs) 107 107 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 108 108 109 109 entries := []models.Record{} ··· 274 274 } 275 275 } 276 276 277 - for _, op := range dbs.GetLog() { 277 + for _, op := range bs.GetLogMap() { 278 278 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 279 279 return nil, err 280 280 } ··· 324 324 }, 325 325 }) 326 326 327 - if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil { 327 + if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { 328 328 return nil, err 329 329 } 330 330 ··· 345 345 return cid.Undef, nil, err 346 346 } 347 347 348 - dbs := blockstore.New(urepo.Did, rm.db) 349 - bs := util.NewLoggingBstore(dbs) 348 + dbs := rm.s.createBlockstore(urepo.Did) 349 + bs := recording_blockstore.New(dbs) 350 350 351 351 r, err := repo.OpenRepo(context.TODO(), bs, c) 352 352 if err != nil { ··· 358 358 return cid.Undef, nil, err 359 359 } 360 360 361 - return c, bs.GetLoggedBlocks(), nil 361 + return c, bs.GetLogArray(), nil 362 362 } 363 363 364 364 func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { ··· 414 414 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 415 415 } 416 416 417 - var deepiter func(interface{}) error 418 - deepiter = func(item interface{}) error { 417 + var deepiter func(any) error 418 + deepiter = func(item any) error { 419 419 switch val := item.(type) { 420 - case map[string]interface{}: 420 + case map[string]any: 421 421 if val["$type"] == "blob" { 422 422 if ref, ok := val["ref"].(string); ok { 423 423 c, err := cid.Parse(ref) ··· 430 430 return deepiter(v) 431 431 } 432 432 } 433 - case []interface{}: 433 + case []any: 434 434 for _, v := range val { 435 435 deepiter(v) 436 436 }
+16
server/server.go
··· 38 38 "github.com/haileyok/cocoon/oauth/dpop" 39 39 "github.com/haileyok/cocoon/oauth/provider" 40 40 "github.com/haileyok/cocoon/plc" 41 + "github.com/haileyok/cocoon/sqlite_blockstore" 42 + "github.com/ipfs/go-cid" 43 + blockstore "github.com/ipfs/go-ipfs-blockstore" 41 44 echo_session "github.com/labstack/echo-contrib/session" 42 45 "github.com/labstack/echo/v4" 43 46 "github.com/labstack/echo/v4/middleware" ··· 641 644 go s.doBackup() 642 645 } 643 646 } 647 + 648 + func (s *Server) createBlockstore(did string) blockstore.Blockstore { 649 + // TODO: eventually configurable blockstore types here 650 + return sqlite_blockstore.New(did, s.db) 651 + } 652 + 653 + func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error { 654 + if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil { 655 + return err 656 + } 657 + 658 + return nil 659 + }