forked from hailey.at/cocoon
An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

-163
blockstore/blockstore.go
··· 1 - package blockstore 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - 7 - "github.com/bluesky-social/indigo/atproto/syntax" 8 - "github.com/haileyok/cocoon/internal/db" 9 - "github.com/haileyok/cocoon/models" 10 - blocks "github.com/ipfs/go-block-format" 11 - "github.com/ipfs/go-cid" 12 - "gorm.io/gorm/clause" 13 - ) 14 - 15 - type SqliteBlockstore struct { 16 - db *db.DB 17 - did string 18 - readonly bool 19 - inserts map[cid.Cid]blocks.Block 20 - } 21 - 22 - func New(did string, db *db.DB) *SqliteBlockstore { 23 - return &SqliteBlockstore{ 24 - did: did, 25 - db: db, 26 - readonly: false, 27 - inserts: map[cid.Cid]blocks.Block{}, 28 - } 29 - } 30 - 31 - func NewReadOnly(did string, db *db.DB) *SqliteBlockstore { 32 - return &SqliteBlockstore{ 33 - did: did, 34 - db: db, 35 - readonly: true, 36 - inserts: map[cid.Cid]blocks.Block{}, 37 - } 38 - } 39 - 40 - func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { 41 - var block models.Block 42 - 43 - maybeBlock, ok := bs.inserts[cid] 44 - if ok { 45 - return maybeBlock, nil 46 - } 47 - 48 - if err := bs.db.Raw("SELECT * FROM blocks WHERE did = ? AND cid = ?", nil, bs.did, cid.Bytes()).Scan(&block).Error; err != nil { 49 - return nil, err 50 - } 51 - 52 - b, err := blocks.NewBlockWithCid(block.Value, cid) 53 - if err != nil { 54 - return nil, err 55 - } 56 - 57 - return b, nil 58 - } 59 - 60 - func (bs *SqliteBlockstore) Put(ctx context.Context, block blocks.Block) error { 61 - bs.inserts[block.Cid()] = block 62 - 63 - if bs.readonly { 64 - return nil 65 - } 66 - 67 - b := models.Block{ 68 - Did: bs.did, 69 - Cid: block.Cid().Bytes(), 70 - Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 71 - Value: block.RawData(), 72 - } 73 - 74 - if err := bs.db.Create(&b, []clause.Expression{clause.OnConflict{ 75 - Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 76 - UpdateAll: true, 77 - }}).Error; err != nil { 78 - return err 79 - } 80 - 81 - return nil 82 - } 83 - 84 - func (bs *SqliteBlockstore) DeleteBlock(context.Context, cid.Cid) error { 85 - panic("not implemented") 86 - } 87 - 88 - func (bs *SqliteBlockstore) Has(context.Context, cid.Cid) (bool, error) { 89 - panic("not implemented") 90 - } 91 - 92 - func (bs *SqliteBlockstore) GetSize(context.Context, cid.Cid) (int, error) { 93 - panic("not implemented") 94 - } 95 - 96 - func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 - tx := bs.db.BeginDangerously() 98 - 99 - for _, block := range blocks { 100 - bs.inserts[block.Cid()] = block 101 - 102 - if bs.readonly { 103 - continue 104 - } 105 - 106 - b := models.Block{ 107 - Did: bs.did, 108 - Cid: block.Cid().Bytes(), 109 - Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 110 - Value: block.RawData(), 111 - } 112 - 113 - if err := tx.Clauses(clause.OnConflict{ 114 - Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 115 - UpdateAll: true, 116 - }).Create(&b).Error; err != nil { 117 - tx.Rollback() 118 - return err 119 - } 120 - } 121 - 122 - if bs.readonly { 123 - return nil 124 - } 125 - 126 - tx.Commit() 127 - 128 - return nil 129 - } 130 - 131 - func (bs *SqliteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 132 - panic("not implemented") 133 - } 134 - 135 - func (bs *SqliteBlockstore) HashOnRead(enabled bool) { 136 - panic("not implemented") 137 - } 138 - 139 - func (bs *SqliteBlockstore) UpdateRepo(ctx context.Context, root cid.Cid, rev string) error { 140 - if err := bs.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, bs.did).Error; err != nil { 141 - return err 142 - } 143 - 144 - return nil 145 - } 146 - 147 - func (bs *SqliteBlockstore) Execute(ctx context.Context) error { 148 - if !bs.readonly { 149 - return fmt.Errorf("blockstore was not readonly") 150 - } 151 - 152 - bs.readonly = false 153 - for _, b := range bs.inserts { 154 - bs.Put(ctx, b) 155 - } 156 - bs.readonly = true 157 - 158 - return nil 159 - } 160 - 161 - func (bs *SqliteBlockstore) GetLog() map[cid.Cid]blocks.Block { 162 - return bs.inserts 163 - }
···
+7
cmd/cocoon/main.go
··· 136 EnvVars: []string{"COCOON_DEFAULT_ATPROTO_PROXY"}, 137 Value: "did:web:api.bsky.app#bsky_appview", 138 }, 139 }, 140 Commands: []*cli.Command{ 141 runServe, ··· 158 Usage: "Start the cocoon PDS", 159 Flags: []cli.Flag{}, 160 Action: func(cmd *cli.Context) error { 161 s, err := server.New(&server.Args{ 162 Addr: cmd.String("addr"), 163 DbName: cmd.String("db-name"), ··· 185 }, 186 SessionSecret: cmd.String("session-secret"), 187 DefaultAtprotoProxy: cmd.String("default-atproto-proxy"), 188 }) 189 if err != nil { 190 fmt.Printf("error creating cocoon: %v", err)
··· 136 EnvVars: []string{"COCOON_DEFAULT_ATPROTO_PROXY"}, 137 Value: "did:web:api.bsky.app#bsky_appview", 138 }, 139 + &cli.StringFlag{ 140 + Name: "blockstore-variant", 141 + EnvVars: []string{"COCOON_BLOCKSTORE_VARIANT"}, 142 + Value: "sqlite", 143 + }, 144 }, 145 Commands: []*cli.Command{ 146 runServe, ··· 163 Usage: "Start the cocoon PDS", 164 Flags: []cli.Flag{}, 165 Action: func(cmd *cli.Context) error { 166 + 167 s, err := server.New(&server.Args{ 168 Addr: cmd.String("addr"), 169 DbName: cmd.String("db-name"), ··· 191 }, 192 SessionSecret: cmd.String("session-secret"), 193 DefaultAtprotoProxy: cmd.String("default-atproto-proxy"), 194 + BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")), 195 }) 196 if err != nil { 197 fmt.Printf("error creating cocoon: %v", err)
+73 -54
identity/identity.go
··· 13 "github.com/bluesky-social/indigo/util" 14 ) 15 16 - func ResolveHandle(ctx context.Context, cli *http.Client, handle string) (string, error) { 17 - if cli == nil { 18 - cli = util.RobustHTTPClient() 19 - } 20 - 21 - var did string 22 - 23 - _, err := syntax.ParseHandle(handle) 24 if err != nil { 25 - return "", err 26 } 27 28 - recs, err := net.LookupTXT(fmt.Sprintf("_atproto.%s", handle)) 29 - if err == nil { 30 - for _, rec := range recs { 31 - if strings.HasPrefix(rec, "did=") { 32 - did = strings.Split(rec, "did=")[1] 33 - break 34 } 35 } 36 - } else { 37 - fmt.Printf("erorr getting txt records: %v\n", err) 38 } 39 40 - if did == "" { 41 - req, err := http.NewRequestWithContext( 42 - ctx, 43 - "GET", 44 - fmt.Sprintf("https://%s/.well-known/atproto-did", handle), 45 - nil, 46 - ) 47 - if err != nil { 48 - return "", nil 49 - } 50 51 - resp, err := http.DefaultClient.Do(req) 52 - if err != nil { 53 - return "", nil 54 - } 55 - defer resp.Body.Close() 56 57 - if resp.StatusCode != http.StatusOK { 58 - io.Copy(io.Discard, resp.Body) 59 - return "", fmt.Errorf("unable to resolve handle") 60 - } 61 62 - b, err := io.ReadAll(resp.Body) 63 - if err != nil { 64 - return "", err 65 - } 66 67 - maybeDid := string(b) 68 69 - if _, err := syntax.ParseDID(maybeDid); err != nil { 70 - return "", fmt.Errorf("unable to resolve handle") 71 - } 72 73 - did = maybeDid 74 } 75 76 - return did, nil 77 } 78 79 - func FetchDidDoc(ctx context.Context, cli *http.Client, did string) (*DidDoc, error) { 80 if cli == nil { 81 cli = util.RobustHTTPClient() 82 } 83 84 - var ustr string 85 if strings.HasPrefix(did, "did:plc:") { 86 - ustr = fmt.Sprintf("https://plc.directory/%s", did) 87 } else if strings.HasPrefix(did, "did:web:") { 88 - ustr = fmt.Sprintf("https://%s/.well-known/did.json", strings.TrimPrefix(did, "did:web:")) 89 } else { 90 - return nil, fmt.Errorf("did was not a supported did type") 91 } 92 93 req, err := http.NewRequestWithContext(ctx, "GET", ustr, nil) ··· 95 return nil, err 96 } 97 98 - resp, err := http.DefaultClient.Do(req) 99 if err != nil { 100 return nil, err 101 } ··· 103 104 if resp.StatusCode != 200 { 105 io.Copy(io.Discard, resp.Body) 106 - return nil, fmt.Errorf("could not find identity in plc registry") 107 } 108 109 var diddoc DidDoc ··· 127 return nil, err 128 } 129 130 - resp, err := http.DefaultClient.Do(req) 131 if err != nil { 132 return nil, err 133 }
··· 13 "github.com/bluesky-social/indigo/util" 14 ) 15 16 + func ResolveHandleFromTXT(ctx context.Context, handle string) (string, error) { 17 + name := fmt.Sprintf("_atproto.%s", handle) 18 + recs, err := net.LookupTXT(name) 19 if err != nil { 20 + return "", fmt.Errorf("handle could not be resolved via txt: %w", err) 21 } 22 23 + for _, rec := range recs { 24 + if strings.HasPrefix(rec, "did=") { 25 + maybeDid := strings.Split(rec, "did=")[1] 26 + if _, err := syntax.ParseDID(maybeDid); err == nil { 27 + return maybeDid, nil 28 } 29 } 30 } 31 32 + return "", fmt.Errorf("handle could not be resolved via txt: no record found") 33 + } 34 35 + func ResolveHandleFromWellKnown(ctx context.Context, cli *http.Client, handle string) (string, error) { 36 + ustr := fmt.Sprintf("https://%s/.well=known/atproto-did", handle) 37 + req, err := http.NewRequestWithContext( 38 + ctx, 39 + "GET", 40 + ustr, 41 + nil, 42 + ) 43 + if err != nil { 44 + return "", fmt.Errorf("handle could not be resolved via web: %w", err) 45 + } 46 47 + resp, err := cli.Do(req) 48 + if err != nil { 49 + return "", fmt.Errorf("handle could not be resolved via web: %w", err) 50 + } 51 + defer resp.Body.Close() 52 53 + b, err := io.ReadAll(resp.Body) 54 + if err != nil { 55 + return "", fmt.Errorf("handle could not be resolved via web: %w", err) 56 + } 57 58 + if resp.StatusCode != http.StatusOK { 59 + return "", fmt.Errorf("handle could not be resolved via web: invalid status code %d", resp.StatusCode) 60 + } 61 62 + maybeDid := string(b) 63 64 + if _, err := syntax.ParseDID(maybeDid); err != nil { 65 + return "", fmt.Errorf("handle could not be resolved via web: invalid did in document") 66 } 67 68 + return maybeDid, nil 69 } 70 71 + func ResolveHandle(ctx context.Context, cli *http.Client, handle string) (string, error) { 72 if cli == nil { 73 cli = util.RobustHTTPClient() 74 } 75 76 + _, err := syntax.ParseHandle(handle) 77 + if err != nil { 78 + return "", err 79 + } 80 + 81 + if maybeDidFromTxt, err := ResolveHandleFromTXT(ctx, handle); err == nil { 82 + return maybeDidFromTxt, nil 83 + } 84 + 85 + if maybeDidFromWeb, err := ResolveHandleFromWellKnown(ctx, cli, handle); err == nil { 86 + return maybeDidFromWeb, nil 87 + } 88 + 89 + return "", fmt.Errorf("handle could not be resolved") 90 + } 91 + 92 + func DidToDocUrl(did string) (string, error) { 93 if strings.HasPrefix(did, "did:plc:") { 94 + return fmt.Sprintf("https://plc.directory/%s", did), nil 95 } else if strings.HasPrefix(did, "did:web:") { 96 + return fmt.Sprintf("https://%s/.well-known/did.json", strings.TrimPrefix(did, "did:web:")), nil 97 } else { 98 + return "", fmt.Errorf("did was not a supported did type") 99 + } 100 + } 101 + 102 + func FetchDidDoc(ctx context.Context, cli *http.Client, did string) (*DidDoc, error) { 103 + if cli == nil { 104 + cli = util.RobustHTTPClient() 105 + } 106 + 107 + ustr, err := DidToDocUrl(did) 108 + if err != nil { 109 + return nil, err 110 } 111 112 req, err := http.NewRequestWithContext(ctx, "GET", ustr, nil) ··· 114 return nil, err 115 } 116 117 + resp, err := cli.Do(req) 118 if err != nil { 119 return nil, err 120 } ··· 122 123 if resp.StatusCode != 200 { 124 io.Copy(io.Discard, resp.Body) 125 + return nil, fmt.Errorf("unable to find did doc at url. did: %s. url: %s", did, ustr) 126 } 127 128 var diddoc DidDoc ··· 146 return nil, err 147 } 148 149 + resp, err := cli.Do(req) 150 if err != nil { 151 return nil, err 152 }
+16 -5
identity/passport.go
··· 19 type Passport struct { 20 h *http.Client 21 bc BackingCache 22 - lk sync.Mutex 23 } 24 25 func NewPassport(h *http.Client, bc BackingCache) *Passport { ··· 30 return &Passport{ 31 h: h, 32 bc: bc, 33 - lk: sync.Mutex{}, 34 } 35 } 36 ··· 38 skipCache, _ := ctx.Value("skip-cache").(bool) 39 40 if !skipCache { 41 cached, ok := p.bc.GetDoc(did) 42 if ok { 43 return cached, nil 44 } 45 } 46 47 - p.lk.Lock() // this is pretty pathetic, and i should rethink this. but for now, fuck it 48 - defer p.lk.Unlock() 49 - 50 doc, err := FetchDidDoc(ctx, p.h, did) 51 if err != nil { 52 return nil, err 53 } 54 55 p.bc.PutDoc(did, doc) 56 57 return doc, nil 58 } ··· 61 skipCache, _ := ctx.Value("skip-cache").(bool) 62 63 if !skipCache { 64 cached, ok := p.bc.GetDid(handle) 65 if ok { 66 return cached, nil 67 } ··· 72 return "", err 73 } 74 75 p.bc.PutDid(handle, did) 76 77 return did, nil 78 } 79 80 func (p *Passport) BustDoc(ctx context.Context, did string) error { 81 return p.bc.BustDoc(did) 82 } 83 84 func (p *Passport) BustDid(ctx context.Context, handle string) error { 85 return p.bc.BustDid(handle) 86 }
··· 19 type Passport struct { 20 h *http.Client 21 bc BackingCache 22 + mu sync.RWMutex 23 } 24 25 func NewPassport(h *http.Client, bc BackingCache) *Passport { ··· 30 return &Passport{ 31 h: h, 32 bc: bc, 33 } 34 } 35 ··· 37 skipCache, _ := ctx.Value("skip-cache").(bool) 38 39 if !skipCache { 40 + p.mu.RLock() 41 cached, ok := p.bc.GetDoc(did) 42 + p.mu.RUnlock() 43 + 44 if ok { 45 return cached, nil 46 } 47 } 48 49 + // TODO: should coalesce requests here 50 doc, err := FetchDidDoc(ctx, p.h, did) 51 if err != nil { 52 return nil, err 53 } 54 55 + p.mu.Lock() 56 p.bc.PutDoc(did, doc) 57 + p.mu.Unlock() 58 59 return doc, nil 60 } ··· 63 skipCache, _ := ctx.Value("skip-cache").(bool) 64 65 if !skipCache { 66 + p.mu.RLock() 67 cached, ok := p.bc.GetDid(handle) 68 + p.mu.RUnlock() 69 + 70 if ok { 71 return cached, nil 72 } ··· 77 return "", err 78 } 79 80 + p.mu.Lock() 81 p.bc.PutDid(handle, did) 82 + p.mu.Unlock() 83 84 return did, nil 85 } 86 87 func (p *Passport) BustDoc(ctx context.Context, did string) error { 88 + p.mu.Lock() 89 + defer p.mu.Unlock() 90 return p.bc.BustDoc(did) 91 } 92 93 func (p *Passport) BustDid(ctx context.Context, handle string) error { 94 + p.mu.Lock() 95 + defer p.mu.Unlock() 96 return p.bc.BustDid(handle) 97 }
+1 -1
oauth/client/manager.go
··· 289 return nil, errors.New("at least one `redirect_uri` is required") 290 } 291 292 - if metadata.ApplicationType == "native" && metadata.TokenEndpointAuthMethod == "none" { 293 return nil, errors.New("native clients must authenticate using `none` method") 294 } 295
··· 289 return nil, errors.New("at least one `redirect_uri` is required") 290 } 291 292 + if metadata.ApplicationType == "native" && metadata.TokenEndpointAuthMethod != "none" { 293 return nil, errors.New("native clients must authenticate using `none` method") 294 } 295
+77
recording_blockstore/recording_blockstore.go
···
··· 1 + package recording_blockstore 2 + 3 + import ( 4 + "context" 5 + 6 + blockformat "github.com/ipfs/go-block-format" 7 + "github.com/ipfs/go-cid" 8 + blockstore "github.com/ipfs/go-ipfs-blockstore" 9 + ) 10 + 11 + type RecordingBlockstore struct { 12 + base blockstore.Blockstore 13 + 14 + inserts map[cid.Cid]blockformat.Block 15 + } 16 + 17 + func New(base blockstore.Blockstore) *RecordingBlockstore { 18 + return &RecordingBlockstore{ 19 + base: base, 20 + inserts: make(map[cid.Cid]blockformat.Block), 21 + } 22 + } 23 + 24 + func (bs *RecordingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { 25 + return bs.base.Has(ctx, c) 26 + } 27 + 28 + func (bs *RecordingBlockstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { 29 + return bs.base.Get(ctx, c) 30 + } 31 + 32 + func (bs *RecordingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { 33 + return bs.base.GetSize(ctx, c) 34 + } 35 + 36 + func (bs *RecordingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { 37 + return bs.base.DeleteBlock(ctx, c) 38 + } 39 + 40 + func (bs *RecordingBlockstore) Put(ctx context.Context, block blockformat.Block) error { 41 + if err := bs.base.Put(ctx, block); err != nil { 42 + return err 43 + } 44 + bs.inserts[block.Cid()] = block 45 + return nil 46 + } 47 + 48 + func (bs *RecordingBlockstore) PutMany(ctx context.Context, blocks []blockformat.Block) error { 49 + if err := bs.base.PutMany(ctx, blocks); err != nil { 50 + return err 51 + } 52 + 53 + for _, b := range blocks { 54 + bs.inserts[b.Cid()] = b 55 + } 56 + 57 + return nil 58 + } 59 + 60 + func (bs *RecordingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 61 + return bs.AllKeysChan(ctx) 62 + } 63 + 64 + func (bs *RecordingBlockstore) HashOnRead(enabled bool) { 65 + } 66 + 67 + func (bs *RecordingBlockstore) GetLogMap() map[cid.Cid]blockformat.Block { 68 + return bs.inserts 69 + } 70 + 71 + func (bs *RecordingBlockstore) GetLogArray() []blockformat.Block { 72 + var blocks []blockformat.Block 73 + for _, b := range bs.inserts { 74 + blocks = append(blocks, b) 75 + } 76 + return blocks 77 + }
+30
server/blockstore_variant.go
···
··· 1 + package server 2 + 3 + import ( 4 + "github.com/haileyok/cocoon/sqlite_blockstore" 5 + blockstore "github.com/ipfs/go-ipfs-blockstore" 6 + ) 7 + 8 + type BlockstoreVariant int 9 + 10 + const ( 11 + BlockstoreVariantSqlite = iota 12 + ) 13 + 14 + func MustReturnBlockstoreVariant(maybeBsv string) BlockstoreVariant { 15 + switch maybeBsv { 16 + case "sqlite": 17 + return BlockstoreVariantSqlite 18 + default: 19 + panic("invalid blockstore variant provided") 20 + } 21 + } 22 + 23 + func (s *Server) getBlockstore(did string) blockstore.Blockstore { 24 + switch s.config.BlockstoreVariant { 25 + case BlockstoreVariantSqlite: 26 + return sqlite_blockstore.New(did, s.db) 27 + default: 28 + return sqlite_blockstore.New(did, s.db) 29 + } 30 + }
+2 -3
server/handle_import_repo.go
··· 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/repo" 12 - "github.com/haileyok/cocoon/blockstore" 13 "github.com/haileyok/cocoon/internal/helpers" 14 "github.com/haileyok/cocoon/models" 15 blocks "github.com/ipfs/go-block-format" ··· 27 return helpers.ServerError(e, nil) 28 } 29 30 - bs := blockstore.New(urepo.Repo.Did, s.db) 31 32 cs, err := car.NewCarReader(bytes.NewReader(b)) 33 if err != nil { ··· 107 return helpers.ServerError(e, nil) 108 } 109 110 - if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil { 111 s.logger.Error("error updating repo after commit", "error", err) 112 return helpers.ServerError(e, nil) 113 }
··· 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/repo" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 blocks "github.com/ipfs/go-block-format" ··· 26 return helpers.ServerError(e, nil) 27 } 28 29 + bs := s.getBlockstore(urepo.Repo.Did) 30 31 cs, err := car.NewCarReader(bytes.NewReader(b)) 32 if err != nil { ··· 106 return helpers.ServerError(e, nil) 107 } 108 109 + if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 110 s.logger.Error("error updating repo after commit", "error", err) 111 return helpers.ServerError(e, nil) 112 }
+2 -3
server/handle_server_create_account.go
··· 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/repo" 16 "github.com/bluesky-social/indigo/util" 17 - "github.com/haileyok/cocoon/blockstore" 18 "github.com/haileyok/cocoon/internal/helpers" 19 "github.com/haileyok/cocoon/models" 20 "github.com/labstack/echo/v4" ··· 177 } 178 179 if customDidHeader == "" { 180 - bs := blockstore.New(signupDid, s.db) 181 r := repo.NewRepo(context.TODO(), signupDid, bs) 182 183 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) ··· 186 return helpers.ServerError(e, nil) 187 } 188 189 - if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil { 190 s.logger.Error("error updating repo after commit", "error", err) 191 return helpers.ServerError(e, nil) 192 }
··· 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/repo" 16 "github.com/bluesky-social/indigo/util" 17 "github.com/haileyok/cocoon/internal/helpers" 18 "github.com/haileyok/cocoon/models" 19 "github.com/labstack/echo/v4" ··· 176 } 177 178 if customDidHeader == "" { 179 + bs := s.getBlockstore(signupDid) 180 r := repo.NewRepo(context.TODO(), signupDid, bs) 181 182 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) ··· 185 return helpers.ServerError(e, nil) 186 } 187 188 + if err := s.UpdateRepo(context.TODO(), urepo.Did, root, rev); err != nil { 189 s.logger.Error("error updating repo after commit", "error", err) 190 return helpers.ServerError(e, nil) 191 }
+8 -6
server/handle_server_get_service_auth.go
··· 19 20 type ServerGetServiceAuthRequest struct { 21 Aud string `query:"aud" validate:"required,atproto-did"` 22 - Exp int64 `query:"exp"` 23 - Lxm string `query:"lxm" validate:"required,atproto-nsid"` 24 } 25 26 func (s *Server) handleServerGetServiceAuth(e echo.Context) error { ··· 34 return helpers.InputError(e, nil) 35 } 36 37 now := time.Now().Unix() 38 - if req.Exp == 0 { 39 - req.Exp = now + 60 // default 40 } 41 42 if req.Lxm == "com.atproto.server.getServiceAuth" { ··· 44 } 45 46 maxExp := now + (60 * 30) 47 - if req.Exp > maxExp { 48 return helpers.InputError(e, to.StringPtr("expiration too big. smoller please")) 49 } 50 ··· 68 "aud": req.Aud, 69 "lxm": req.Lxm, 70 "jti": uuid.NewString(), 71 - "exp": req.Exp, 72 "iat": now, 73 } 74 pj, err := json.Marshal(payload)
··· 19 20 type ServerGetServiceAuthRequest struct { 21 Aud string `query:"aud" validate:"required,atproto-did"` 22 + // exp should be a float, as some clients will send a non-integer expiration 23 + Exp float64 `query:"exp"` 24 + Lxm string `query:"lxm" validate:"required,atproto-nsid"` 25 } 26 27 func (s *Server) handleServerGetServiceAuth(e echo.Context) error { ··· 35 return helpers.InputError(e, nil) 36 } 37 38 + exp := int64(req.Exp) 39 now := time.Now().Unix() 40 + if exp == 0 { 41 + exp = now + 60 // default 42 } 43 44 if req.Lxm == "com.atproto.server.getServiceAuth" { ··· 46 } 47 48 maxExp := now + (60 * 30) 49 + if exp > maxExp { 50 return helpers.InputError(e, to.StringPtr("expiration too big. smoller please")) 51 } 52 ··· 70 "aud": req.Aud, 71 "lxm": req.Lxm, 72 "jti": uuid.NewString(), 73 + "exp": exp, 74 "iat": now, 75 } 76 pj, err := json.Marshal(payload)
+1 -2
server/handle_sync_get_blocks.go
··· 6 "strings" 7 8 "github.com/bluesky-social/indigo/carstore" 9 - "github.com/haileyok/cocoon/blockstore" 10 "github.com/haileyok/cocoon/internal/helpers" 11 "github.com/ipfs/go-cid" 12 cbor "github.com/ipfs/go-ipld-cbor" ··· 54 return helpers.ServerError(e, nil) 55 } 56 57 - bs := blockstore.New(urepo.Repo.Did, s.db) 58 59 for _, c := range cids { 60 b, err := bs.Get(context.TODO(), c)
··· 6 "strings" 7 8 "github.com/bluesky-social/indigo/carstore" 9 "github.com/haileyok/cocoon/internal/helpers" 10 "github.com/ipfs/go-cid" 11 cbor "github.com/ipfs/go-ipld-cbor" ··· 53 return helpers.ServerError(e, nil) 54 } 55 56 + bs := s.getBlockstore(urepo.Repo.Did) 57 58 for _, c := range cids { 59 b, err := bs.Get(context.TODO(), c)
+12 -12
server/repo.go
··· 16 "github.com/bluesky-social/indigo/events" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 "github.com/bluesky-social/indigo/repo" 19 - "github.com/bluesky-social/indigo/util" 20 - "github.com/haileyok/cocoon/blockstore" 21 "github.com/haileyok/cocoon/internal/db" 22 "github.com/haileyok/cocoon/models" 23 blocks "github.com/ipfs/go-block-format" 24 "github.com/ipfs/go-cid" 25 cbor "github.com/ipfs/go-ipld-cbor" ··· 103 return nil, err 104 } 105 106 - dbs := blockstore.New(urepo.Did, rm.db) 107 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 108 109 entries := []models.Record{} ··· 274 } 275 } 276 277 - for _, op := range dbs.GetLog() { 278 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 279 return nil, err 280 } ··· 324 }, 325 }) 326 327 - if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil { 328 return nil, err 329 } 330 ··· 345 return cid.Undef, nil, err 346 } 347 348 - dbs := blockstore.New(urepo.Did, rm.db) 349 - bs := util.NewLoggingBstore(dbs) 350 351 r, err := repo.OpenRepo(context.TODO(), bs, c) 352 if err != nil { ··· 358 return cid.Undef, nil, err 359 } 360 361 - return c, bs.GetLoggedBlocks(), nil 362 } 363 364 func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { ··· 414 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 415 } 416 417 - var deepiter func(interface{}) error 418 - deepiter = func(item interface{}) error { 419 switch val := item.(type) { 420 - case map[string]interface{}: 421 if val["$type"] == "blob" { 422 if ref, ok := val["ref"].(string); ok { 423 c, err := cid.Parse(ref) ··· 430 return deepiter(v) 431 } 432 } 433 - case []interface{}: 434 for _, v := range val { 435 deepiter(v) 436 }
··· 16 "github.com/bluesky-social/indigo/events" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 "github.com/bluesky-social/indigo/repo" 19 "github.com/haileyok/cocoon/internal/db" 20 "github.com/haileyok/cocoon/models" 21 + "github.com/haileyok/cocoon/recording_blockstore" 22 blocks "github.com/ipfs/go-block-format" 23 "github.com/ipfs/go-cid" 24 cbor "github.com/ipfs/go-ipld-cbor" ··· 102 return nil, err 103 } 104 105 + dbs := rm.s.getBlockstore(urepo.Did) 106 + bs := recording_blockstore.New(dbs) 107 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 108 109 entries := []models.Record{} ··· 274 } 275 } 276 277 + for _, op := range bs.GetLogMap() { 278 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 279 return nil, err 280 } ··· 324 }, 325 }) 326 327 + if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { 328 return nil, err 329 } 330 ··· 345 return cid.Undef, nil, err 346 } 347 348 + dbs := rm.s.getBlockstore(urepo.Did) 349 + bs := recording_blockstore.New(dbs) 350 351 r, err := repo.OpenRepo(context.TODO(), bs, c) 352 if err != nil { ··· 358 return cid.Undef, nil, err 359 } 360 361 + return c, bs.GetLogArray(), nil 362 } 363 364 func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { ··· 414 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 415 } 416 417 + var deepiter func(any) error 418 + deepiter = func(item any) error { 419 switch val := item.(type) { 420 + case map[string]any: 421 if val["$type"] == "blob" { 422 if ref, ok := val["ref"].(string); ok { 423 c, err := cid.Parse(ref) ··· 430 return deepiter(v) 431 } 432 } 433 + case []any: 434 for _, v := range val { 435 deepiter(v) 436 }
+13
server/server.go
··· 38 "github.com/haileyok/cocoon/oauth/dpop" 39 "github.com/haileyok/cocoon/oauth/provider" 40 "github.com/haileyok/cocoon/plc" 41 echo_session "github.com/labstack/echo-contrib/session" 42 "github.com/labstack/echo/v4" 43 "github.com/labstack/echo/v4/middleware" ··· 104 SessionSecret string 105 106 DefaultAtprotoProxy string 107 } 108 109 type config struct { ··· 117 SmtpEmail string 118 SmtpName string 119 DefaultAtprotoProxy string 120 } 121 122 type CustomValidator struct { ··· 349 SmtpName: args.SmtpName, 350 SmtpEmail: args.SmtpEmail, 351 DefaultAtprotoProxy: args.DefaultAtprotoProxy, 352 }, 353 evtman: events.NewEventManager(events.NewMemPersister()), 354 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), ··· 641 go s.doBackup() 642 } 643 }
··· 38 "github.com/haileyok/cocoon/oauth/dpop" 39 "github.com/haileyok/cocoon/oauth/provider" 40 "github.com/haileyok/cocoon/plc" 41 + "github.com/ipfs/go-cid" 42 echo_session "github.com/labstack/echo-contrib/session" 43 "github.com/labstack/echo/v4" 44 "github.com/labstack/echo/v4/middleware" ··· 105 SessionSecret string 106 107 DefaultAtprotoProxy string 108 + 109 + BlockstoreVariant BlockstoreVariant 110 } 111 112 type config struct { ··· 120 SmtpEmail string 121 SmtpName string 122 DefaultAtprotoProxy string 123 + BlockstoreVariant BlockstoreVariant 124 } 125 126 type CustomValidator struct { ··· 353 SmtpName: args.SmtpName, 354 SmtpEmail: args.SmtpEmail, 355 DefaultAtprotoProxy: args.DefaultAtprotoProxy, 356 + BlockstoreVariant: args.BlockstoreVariant, 357 }, 358 evtman: events.NewEventManager(events.NewMemPersister()), 359 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), ··· 646 go s.doBackup() 647 } 648 } 649 + 650 + func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error { 651 + if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil { 652 + return err 653 + } 654 + 655 + return nil 656 + }
+155
sqlite_blockstore/sqlite_blockstore.go
···
··· 1 + package sqlite_blockstore 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + "github.com/haileyok/cocoon/internal/db" 9 + "github.com/haileyok/cocoon/models" 10 + blocks "github.com/ipfs/go-block-format" 11 + "github.com/ipfs/go-cid" 12 + "gorm.io/gorm/clause" 13 + ) 14 + 15 + type SqliteBlockstore struct { 16 + db *db.DB 17 + did string 18 + readonly bool 19 + inserts map[cid.Cid]blocks.Block 20 + } 21 + 22 + func New(did string, db *db.DB) *SqliteBlockstore { 23 + return &SqliteBlockstore{ 24 + did: did, 25 + db: db, 26 + readonly: false, 27 + inserts: map[cid.Cid]blocks.Block{}, 28 + } 29 + } 30 + 31 + func NewReadOnly(did string, db *db.DB) *SqliteBlockstore { 32 + return &SqliteBlockstore{ 33 + did: did, 34 + db: db, 35 + readonly: true, 36 + inserts: map[cid.Cid]blocks.Block{}, 37 + } 38 + } 39 + 40 + func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { 41 + var block models.Block 42 + 43 + maybeBlock, ok := bs.inserts[cid] 44 + if ok { 45 + return maybeBlock, nil 46 + } 47 + 48 + if err := bs.db.Raw("SELECT * FROM blocks WHERE did = ? AND cid = ?", nil, bs.did, cid.Bytes()).Scan(&block).Error; err != nil { 49 + return nil, err 50 + } 51 + 52 + b, err := blocks.NewBlockWithCid(block.Value, cid) 53 + if err != nil { 54 + return nil, err 55 + } 56 + 57 + return b, nil 58 + } 59 + 60 + func (bs *SqliteBlockstore) Put(ctx context.Context, block blocks.Block) error { 61 + bs.inserts[block.Cid()] = block 62 + 63 + if bs.readonly { 64 + return nil 65 + } 66 + 67 + b := models.Block{ 68 + Did: bs.did, 69 + Cid: block.Cid().Bytes(), 70 + Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 71 + Value: block.RawData(), 72 + } 73 + 74 + if err := bs.db.Create(&b, []clause.Expression{clause.OnConflict{ 75 + Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 76 + UpdateAll: true, 77 + }}).Error; err != nil { 78 + return err 79 + } 80 + 81 + return nil 82 + } 83 + 84 + func (bs *SqliteBlockstore) DeleteBlock(context.Context, cid.Cid) error { 85 + panic("not implemented") 86 + } 87 + 88 + func (bs *SqliteBlockstore) Has(context.Context, cid.Cid) (bool, error) { 89 + panic("not implemented") 90 + } 91 + 92 + func (bs *SqliteBlockstore) GetSize(context.Context, cid.Cid) (int, error) { 93 + panic("not implemented") 94 + } 95 + 96 + func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 + tx := bs.db.BeginDangerously() 98 + 99 + for _, block := range blocks { 100 + bs.inserts[block.Cid()] = block 101 + 102 + if bs.readonly { 103 + continue 104 + } 105 + 106 + b := models.Block{ 107 + Did: bs.did, 108 + Cid: block.Cid().Bytes(), 109 + Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 110 + Value: block.RawData(), 111 + } 112 + 113 + if err := tx.Clauses(clause.OnConflict{ 114 + Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 115 + UpdateAll: true, 116 + }).Create(&b).Error; err != nil { 117 + tx.Rollback() 118 + return err 119 + } 120 + } 121 + 122 + if bs.readonly { 123 + return nil 124 + } 125 + 126 + tx.Commit() 127 + 128 + return nil 129 + } 130 + 131 + func (bs *SqliteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 132 + panic("not implemented") 133 + } 134 + 135 + func (bs *SqliteBlockstore) HashOnRead(enabled bool) { 136 + panic("not implemented") 137 + } 138 + 139 + func (bs *SqliteBlockstore) Execute(ctx context.Context) error { 140 + if !bs.readonly { 141 + return fmt.Errorf("blockstore was not readonly") 142 + } 143 + 144 + bs.readonly = false 145 + for _, b := range bs.inserts { 146 + bs.Put(ctx, b) 147 + } 148 + bs.readonly = true 149 + 150 + return nil 151 + } 152 + 153 + func (bs *SqliteBlockstore) GetLog() map[cid.Cid]blocks.Block { 154 + return bs.inserts 155 + }