forked from hailey.at/cocoon
An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

+63 -59
README.md
··· 5 5 6 6 Cocoon is a PDS implementation in Go. It is highly experimental, and is not ready for any production use. 7 7 8 - ### Impmlemented Endpoints 8 + ## Implemented Endpoints 9 9 10 10 > [!NOTE] 11 - Just because something is implemented doesn't mean it is finisehd. Tons of these are returning bad errors, don't do validation properly, etc. I'll make a "second pass" checklist at some point to do all of that. 11 + Just because something is implemented doesn't mean it is finished. Tons of these are returning bad errors, don't do validation properly, etc. I'll make a "second pass" checklist at some point to do all of that. 12 12 13 - #### Identity 14 - - [ ] com.atproto.identity.getRecommendedDidCredentials 15 - - [ ] com.atproto.identity.requestPlcOperationSignature 16 - - [x] com.atproto.identity.resolveHandle 17 - - [ ] com.atproto.identity.signPlcOperation 18 - - [ ] com.atproto.identity.submitPlcOperatioin 19 - - [x] com.atproto.identity.updateHandle 13 + ### Identity 20 14 21 - #### Repo 22 - - [x] com.atproto.repo.applyWrites 23 - - [x] com.atproto.repo.createRecord 24 - - [x] com.atproto.repo.putRecord 25 - - [x] com.atproto.repo.deleteRecord 26 - - [x] com.atproto.repo.describeRepo 27 - - [x] com.atproto.repo.getRecord 28 - - [x] com.atproto.repo.importRepo (Works "okay". You still have to handle PLC operations on your own when migrating. Use with extreme caution.) 29 - - [x] com.atproto.repo.listRecords 30 - - [ ] com.atproto.repo.listMissingBlobs 15 + - [ ] `com.atproto.identity.getRecommendedDidCredentials` 16 + - [ ] `com.atproto.identity.requestPlcOperationSignature` 17 + - [x] `com.atproto.identity.resolveHandle` 18 + - [ ] `com.atproto.identity.signPlcOperation` 19 + - [ ] `com.atproto.identity.submitPlcOperation` 20 + - [x] `com.atproto.identity.updateHandle` 31 21 32 - #### Server 33 - - [ ] com.atproto.server.activateAccount 34 - - [x] com.atproto.server.checkAccountStatus 35 - - [x] com.atproto.server.confirmEmail 36 - - [x] com.atproto.server.createAccount 37 - - [x] com.atproto.server.createInviteCode 38 - - [x] com.atproto.server.createInviteCodes 39 - - [ ] com.atproto.server.deactivateAccount 40 - - [ ] com.atproto.server.deleteAccount 41 - - [x] com.atproto.server.deleteSession 42 - - [x] com.atproto.server.describeServer 43 - - [ ] com.atproto.server.getAccountInviteCodes 44 - - [ ] com.atproto.server.getServiceAuth 45 - - ~[ ] com.atproto.server.listAppPasswords~ - not going to add app passwords 46 - - [x] com.atproto.server.refreshSession 47 - - [ ] com.atproto.server.requestAccountDelete 48 - - [x] com.atproto.server.requestEmailConfirmation 49 - - [x] com.atproto.server.requestEmailUpdate 50 - - [x] com.atproto.server.requestPasswordReset 51 - - [ ] com.atproto.server.reserveSigningKey 52 - - [x] com.atproto.server.resetPassword 53 - - ~[ ] com.atproto.server.revokeAppPassword~ - not going to add app passwords 54 - - [x] com.atproto.server.updateEmail 22 + ### Repo 55 23 56 - #### Sync 57 - - [x] com.atproto.sync.getBlob 58 - - [x] com.atproto.sync.getBlocks 59 - - [x] com.atproto.sync.getLatestCommit 60 - - [x] com.atproto.sync.getRecord 61 - - [x] com.atproto.sync.getRepoStatus 62 - - [x] com.atproto.sync.getRepo 63 - - [x] com.atproto.sync.listBlobs 64 - - [x] com.atproto.sync.listRepos 65 - - ~[ ] com.atproto.sync.notifyOfUpdate~ - BGS doesn't even have this implemented lol 66 - - [x] com.atproto.sync.requestCrawl 67 - - [x] com.atproto.sync.subscribeRepos 24 + - [x] `com.atproto.repo.applyWrites` 25 + - [x] `com.atproto.repo.createRecord` 26 + - [x] `com.atproto.repo.putRecord` 27 + - [x] `com.atproto.repo.deleteRecord` 28 + - [x] `com.atproto.repo.describeRepo` 29 + - [x] `com.atproto.repo.getRecord` 30 + - [x] `com.atproto.repo.importRepo` (Works "okay". You still have to handle PLC operations on your own when migrating. Use with extreme caution.) 31 + - [x] `com.atproto.repo.listRecords` 32 + - [ ] `com.atproto.repo.listMissingBlobs` 33 + 34 + ### Server 35 + 36 + - [ ] `com.atproto.server.activateAccount` 37 + - [x] `com.atproto.server.checkAccountStatus` 38 + - [x] `com.atproto.server.confirmEmail` 39 + - [x] `com.atproto.server.createAccount` 40 + - [x] `com.atproto.server.createInviteCode` 41 + - [x] `com.atproto.server.createInviteCodes` 42 + - [ ] `com.atproto.server.deactivateAccount` 43 + - [ ] `com.atproto.server.deleteAccount` 44 + - [x] `com.atproto.server.deleteSession` 45 + - [x] `com.atproto.server.describeServer` 46 + - [ ] `com.atproto.server.getAccountInviteCodes` 47 + - [ ] `com.atproto.server.getServiceAuth` 48 + - ~~[ ] `com.atproto.server.listAppPasswords`~~ - not going to add app passwords 49 + - [x] `com.atproto.server.refreshSession` 50 + - [ ] `com.atproto.server.requestAccountDelete` 51 + - [x] `com.atproto.server.requestEmailConfirmation` 52 + - [x] `com.atproto.server.requestEmailUpdate` 53 + - [x] `com.atproto.server.requestPasswordReset` 54 + - [ ] `com.atproto.server.reserveSigningKey` 55 + - [x] `com.atproto.server.resetPassword` 56 + - ~~[] `com.atproto.server.revokeAppPassword`~~ - not going to add app passwords 57 + - [x] `com.atproto.server.updateEmail` 68 58 69 - #### Other 70 - - [ ] com.atproto.label.queryLabels 71 - - [x] com.atproto.moderation.createReport (Note: this should be handled by proxying, not actually implemented in the PDS) 72 - - [x] app.bsky.actor.getPreferences 73 - - [x] app.bsky.actor.putPreferences 59 + ### Sync 74 60 61 + - [x] `com.atproto.sync.getBlob` 62 + - [x] `com.atproto.sync.getBlocks` 63 + - [x] `com.atproto.sync.getLatestCommit` 64 + - [x] `com.atproto.sync.getRecord` 65 + - [x] `com.atproto.sync.getRepoStatus` 66 + - [x] `com.atproto.sync.getRepo` 67 + - [x] `com.atproto.sync.listBlobs` 68 + - [x] `com.atproto.sync.listRepos` 69 + - ~~[ ] `com.atproto.sync.notifyOfUpdate`~~ - BGS doesn't even have this implemented lol 70 + - [x] `com.atproto.sync.requestCrawl` 71 + - [x] `com.atproto.sync.subscribeRepos` 72 + 73 + ### Other 74 + 75 + - [ ] `com.atproto.label.queryLabels` 76 + - [x] `com.atproto.moderation.createReport` (Note: this should be handled by proxying, not actually implemented in the PDS) 77 + - [x] `app.bsky.actor.getPreferences` 78 + - [x] `app.bsky.actor.putPreferences` 75 79 76 80 ## License 77 81
-163
blockstore/blockstore.go
··· 1 - package blockstore 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - 7 - "github.com/bluesky-social/indigo/atproto/syntax" 8 - "github.com/haileyok/cocoon/internal/db" 9 - "github.com/haileyok/cocoon/models" 10 - blocks "github.com/ipfs/go-block-format" 11 - "github.com/ipfs/go-cid" 12 - "gorm.io/gorm/clause" 13 - ) 14 - 15 - type SqliteBlockstore struct { 16 - db *db.DB 17 - did string 18 - readonly bool 19 - inserts map[cid.Cid]blocks.Block 20 - } 21 - 22 - func New(did string, db *db.DB) *SqliteBlockstore { 23 - return &SqliteBlockstore{ 24 - did: did, 25 - db: db, 26 - readonly: false, 27 - inserts: map[cid.Cid]blocks.Block{}, 28 - } 29 - } 30 - 31 - func NewReadOnly(did string, db *db.DB) *SqliteBlockstore { 32 - return &SqliteBlockstore{ 33 - did: did, 34 - db: db, 35 - readonly: true, 36 - inserts: map[cid.Cid]blocks.Block{}, 37 - } 38 - } 39 - 40 - func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { 41 - var block models.Block 42 - 43 - maybeBlock, ok := bs.inserts[cid] 44 - if ok { 45 - return maybeBlock, nil 46 - } 47 - 48 - if err := bs.db.Raw("SELECT * FROM blocks WHERE did = ? AND cid = ?", nil, bs.did, cid.Bytes()).Scan(&block).Error; err != nil { 49 - return nil, err 50 - } 51 - 52 - b, err := blocks.NewBlockWithCid(block.Value, cid) 53 - if err != nil { 54 - return nil, err 55 - } 56 - 57 - return b, nil 58 - } 59 - 60 - func (bs *SqliteBlockstore) Put(ctx context.Context, block blocks.Block) error { 61 - bs.inserts[block.Cid()] = block 62 - 63 - if bs.readonly { 64 - return nil 65 - } 66 - 67 - b := models.Block{ 68 - Did: bs.did, 69 - Cid: block.Cid().Bytes(), 70 - Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 71 - Value: block.RawData(), 72 - } 73 - 74 - if err := bs.db.Create(&b, []clause.Expression{clause.OnConflict{ 75 - Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 76 - UpdateAll: true, 77 - }}).Error; err != nil { 78 - return err 79 - } 80 - 81 - return nil 82 - } 83 - 84 - func (bs *SqliteBlockstore) DeleteBlock(context.Context, cid.Cid) error { 85 - panic("not implemented") 86 - } 87 - 88 - func (bs *SqliteBlockstore) Has(context.Context, cid.Cid) (bool, error) { 89 - panic("not implemented") 90 - } 91 - 92 - func (bs *SqliteBlockstore) GetSize(context.Context, cid.Cid) (int, error) { 93 - panic("not implemented") 94 - } 95 - 96 - func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 - tx := bs.db.BeginDangerously() 98 - 99 - for _, block := range blocks { 100 - bs.inserts[block.Cid()] = block 101 - 102 - if bs.readonly { 103 - continue 104 - } 105 - 106 - b := models.Block{ 107 - Did: bs.did, 108 - Cid: block.Cid().Bytes(), 109 - Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 110 - Value: block.RawData(), 111 - } 112 - 113 - if err := tx.Clauses(clause.OnConflict{ 114 - Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 115 - UpdateAll: true, 116 - }).Create(&b).Error; err != nil { 117 - tx.Rollback() 118 - return err 119 - } 120 - } 121 - 122 - if bs.readonly { 123 - return nil 124 - } 125 - 126 - tx.Commit() 127 - 128 - return nil 129 - } 130 - 131 - func (bs *SqliteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 132 - panic("not implemented") 133 - } 134 - 135 - func (bs *SqliteBlockstore) HashOnRead(enabled bool) { 136 - panic("not implemented") 137 - } 138 - 139 - func (bs *SqliteBlockstore) UpdateRepo(ctx context.Context, root cid.Cid, rev string) error { 140 - if err := bs.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, bs.did).Error; err != nil { 141 - return err 142 - } 143 - 144 - return nil 145 - } 146 - 147 - func (bs *SqliteBlockstore) Execute(ctx context.Context) error { 148 - if !bs.readonly { 149 - return fmt.Errorf("blockstore was not readonly") 150 - } 151 - 152 - bs.readonly = false 153 - for _, b := range bs.inserts { 154 - bs.Put(ctx, b) 155 - } 156 - bs.readonly = true 157 - 158 - return nil 159 - } 160 - 161 - func (bs *SqliteBlockstore) GetLog() map[cid.Cid]blocks.Block { 162 - return bs.inserts 163 - }
+7
cmd/cocoon/main.go
··· 136 136 EnvVars: []string{"COCOON_DEFAULT_ATPROTO_PROXY"}, 137 137 Value: "did:web:api.bsky.app#bsky_appview", 138 138 }, 139 + &cli.StringFlag{ 140 + Name: "blockstore-variant", 141 + EnvVars: []string{"COCOON_BLOCKSTORE_VARIANT"}, 142 + Value: "sqlite", 143 + }, 139 144 }, 140 145 Commands: []*cli.Command{ 141 146 runServe, ··· 158 163 Usage: "Start the cocoon PDS", 159 164 Flags: []cli.Flag{}, 160 165 Action: func(cmd *cli.Context) error { 166 + 161 167 s, err := server.New(&server.Args{ 162 168 Addr: cmd.String("addr"), 163 169 DbName: cmd.String("db-name"), ··· 185 191 }, 186 192 SessionSecret: cmd.String("session-secret"), 187 193 DefaultAtprotoProxy: cmd.String("default-atproto-proxy"), 194 + BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")), 188 195 }) 189 196 if err != nil { 190 197 fmt.Printf("error creating cocoon: %v", err)
+45
cspell.json
··· 1 + { 2 + "version": "0.2", 3 + "language": "en", 4 + "words": [ 5 + "atproto", 6 + "bsky", 7 + "Cocoon", 8 + "PDS", 9 + "Plc", 10 + "plc", 11 + "repo", 12 + "InviteCodes", 13 + "InviteCode", 14 + "Invite", 15 + "Signin", 16 + "Signout", 17 + "JWKS", 18 + "dpop", 19 + "BGS", 20 + "pico", 21 + "picocss", 22 + "par", 23 + "blobs", 24 + "blob", 25 + "did", 26 + "DID", 27 + "OAuth", 28 + "oauth", 29 + "par", 30 + "Cocoon", 31 + "memcache", 32 + "db", 33 + "helpers", 34 + "middleware", 35 + "repo", 36 + "static", 37 + "pico", 38 + "picocss", 39 + "MIT", 40 + "Go" 41 + ], 42 + "ignorePaths": [ 43 + "server/static/pico.css" 44 + ] 45 + }
+1
go.mod
··· 14 14 github.com/google/uuid v1.4.0 15 15 github.com/gorilla/sessions v1.4.0 16 16 github.com/gorilla/websocket v1.5.1 17 + github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b 17 18 github.com/hashicorp/golang-lru/v2 v2.0.7 18 19 github.com/ipfs/go-block-format v0.2.0 19 20 github.com/ipfs/go-cid v0.4.1
+2
go.sum
··· 91 91 github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= 92 92 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= 93 93 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= 94 + github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b h1:wDUNC2eKiL35DbLvsDhiblTUXHxcOPwQSCzi7xpQUN4= 95 + github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b/go.mod h1:VzxiSdG6j1pi7rwGm/xYI5RbtpBgM8sARDXlvEvxlu0= 94 96 github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= 95 97 github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= 96 98 github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
+73 -54
identity/identity.go
··· 13 13 "github.com/bluesky-social/indigo/util" 14 14 ) 15 15 16 - func ResolveHandle(ctx context.Context, cli *http.Client, handle string) (string, error) { 17 - if cli == nil { 18 - cli = util.RobustHTTPClient() 19 - } 20 - 21 - var did string 22 - 23 - _, err := syntax.ParseHandle(handle) 16 + func ResolveHandleFromTXT(ctx context.Context, handle string) (string, error) { 17 + name := fmt.Sprintf("_atproto.%s", handle) 18 + recs, err := net.LookupTXT(name) 24 19 if err != nil { 25 - return "", err 20 + return "", fmt.Errorf("handle could not be resolved via txt: %w", err) 26 21 } 27 22 28 - recs, err := net.LookupTXT(fmt.Sprintf("_atproto.%s", handle)) 29 - if err == nil { 30 - for _, rec := range recs { 31 - if strings.HasPrefix(rec, "did=") { 32 - did = strings.Split(rec, "did=")[1] 33 - break 23 + for _, rec := range recs { 24 + if strings.HasPrefix(rec, "did=") { 25 + maybeDid := strings.Split(rec, "did=")[1] 26 + if _, err := syntax.ParseDID(maybeDid); err == nil { 27 + return maybeDid, nil 34 28 } 35 29 } 36 - } else { 37 - fmt.Printf("erorr getting txt records: %v\n", err) 38 30 } 39 31 40 - if did == "" { 41 - req, err := http.NewRequestWithContext( 42 - ctx, 43 - "GET", 44 - fmt.Sprintf("https://%s/.well-known/atproto-did", handle), 45 - nil, 46 - ) 47 - if err != nil { 48 - return "", nil 49 - } 32 + return "", fmt.Errorf("handle could not be resolved via txt: no record found") 33 + } 50 34 51 - resp, err := http.DefaultClient.Do(req) 52 - if err != nil { 53 - return "", nil 54 - } 55 - defer resp.Body.Close() 35 + func ResolveHandleFromWellKnown(ctx context.Context, cli *http.Client, handle string) (string, error) { 36 + ustr := fmt.Sprintf("https://%s/.well=known/atproto-did", handle) 37 + req, err := http.NewRequestWithContext( 38 + ctx, 39 + "GET", 40 + ustr, 41 + nil, 42 + ) 43 + if err != nil { 44 + return "", fmt.Errorf("handle could not be resolved via web: %w", err) 45 + } 56 46 57 - if resp.StatusCode != http.StatusOK { 58 - io.Copy(io.Discard, resp.Body) 59 - return "", fmt.Errorf("unable to resolve handle") 60 - } 47 + resp, err := cli.Do(req) 48 + if err != nil { 49 + return "", fmt.Errorf("handle could not be resolved via web: %w", err) 50 + } 51 + defer resp.Body.Close() 61 52 62 - b, err := io.ReadAll(resp.Body) 63 - if err != nil { 64 - return "", err 65 - } 53 + b, err := io.ReadAll(resp.Body) 54 + if err != nil { 55 + return "", fmt.Errorf("handle could not be resolved via web: %w", err) 56 + } 66 57 67 - maybeDid := string(b) 58 + if resp.StatusCode != http.StatusOK { 59 + return "", fmt.Errorf("handle could not be resolved via web: invalid status code %d", resp.StatusCode) 60 + } 68 61 69 - if _, err := syntax.ParseDID(maybeDid); err != nil { 70 - return "", fmt.Errorf("unable to resolve handle") 71 - } 62 + maybeDid := string(b) 72 63 73 - did = maybeDid 64 + if _, err := syntax.ParseDID(maybeDid); err != nil { 65 + return "", fmt.Errorf("handle could not be resolved via web: invalid did in document") 74 66 } 75 67 76 - return did, nil 68 + return maybeDid, nil 77 69 } 78 70 79 - func FetchDidDoc(ctx context.Context, cli *http.Client, did string) (*DidDoc, error) { 71 + func ResolveHandle(ctx context.Context, cli *http.Client, handle string) (string, error) { 80 72 if cli == nil { 81 73 cli = util.RobustHTTPClient() 82 74 } 83 75 84 - var ustr string 76 + _, err := syntax.ParseHandle(handle) 77 + if err != nil { 78 + return "", err 79 + } 80 + 81 + if maybeDidFromTxt, err := ResolveHandleFromTXT(ctx, handle); err == nil { 82 + return maybeDidFromTxt, nil 83 + } 84 + 85 + if maybeDidFromWeb, err := ResolveHandleFromWellKnown(ctx, cli, handle); err == nil { 86 + return maybeDidFromWeb, nil 87 + } 88 + 89 + return "", fmt.Errorf("handle could not be resolved") 90 + } 91 + 92 + func DidToDocUrl(did string) (string, error) { 85 93 if strings.HasPrefix(did, "did:plc:") { 86 - ustr = fmt.Sprintf("https://plc.directory/%s", did) 94 + return fmt.Sprintf("https://plc.directory/%s", did), nil 87 95 } else if strings.HasPrefix(did, "did:web:") { 88 - ustr = fmt.Sprintf("https://%s/.well-known/did.json", strings.TrimPrefix(did, "did:web:")) 96 + return fmt.Sprintf("https://%s/.well-known/did.json", strings.TrimPrefix(did, "did:web:")), nil 89 97 } else { 90 - return nil, fmt.Errorf("did was not a supported did type") 98 + return "", fmt.Errorf("did was not a supported did type") 99 + } 100 + } 101 + 102 + func FetchDidDoc(ctx context.Context, cli *http.Client, did string) (*DidDoc, error) { 103 + if cli == nil { 104 + cli = util.RobustHTTPClient() 105 + } 106 + 107 + ustr, err := DidToDocUrl(did) 108 + if err != nil { 109 + return nil, err 91 110 } 92 111 93 112 req, err := http.NewRequestWithContext(ctx, "GET", ustr, nil) ··· 95 114 return nil, err 96 115 } 97 116 98 - resp, err := http.DefaultClient.Do(req) 117 + resp, err := cli.Do(req) 99 118 if err != nil { 100 119 return nil, err 101 120 } ··· 103 122 104 123 if resp.StatusCode != 200 { 105 124 io.Copy(io.Discard, resp.Body) 106 - return nil, fmt.Errorf("could not find identity in plc registry") 125 + return nil, fmt.Errorf("unable to find did doc at url. did: %s. url: %s", did, ustr) 107 126 } 108 127 109 128 var diddoc DidDoc ··· 127 146 return nil, err 128 147 } 129 148 130 - resp, err := http.DefaultClient.Do(req) 149 + resp, err := cli.Do(req) 131 150 if err != nil { 132 151 return nil, err 133 152 }
+16 -5
identity/passport.go
··· 19 19 type Passport struct { 20 20 h *http.Client 21 21 bc BackingCache 22 - lk sync.Mutex 22 + mu sync.RWMutex 23 23 } 24 24 25 25 func NewPassport(h *http.Client, bc BackingCache) *Passport { ··· 30 30 return &Passport{ 31 31 h: h, 32 32 bc: bc, 33 - lk: sync.Mutex{}, 34 33 } 35 34 } 36 35 ··· 38 37 skipCache, _ := ctx.Value("skip-cache").(bool) 39 38 40 39 if !skipCache { 40 + p.mu.RLock() 41 41 cached, ok := p.bc.GetDoc(did) 42 + p.mu.RUnlock() 43 + 42 44 if ok { 43 45 return cached, nil 44 46 } 45 47 } 46 48 47 - p.lk.Lock() // this is pretty pathetic, and i should rethink this. but for now, fuck it 48 - defer p.lk.Unlock() 49 - 49 + // TODO: should coalesce requests here 50 50 doc, err := FetchDidDoc(ctx, p.h, did) 51 51 if err != nil { 52 52 return nil, err 53 53 } 54 54 55 + p.mu.Lock() 55 56 p.bc.PutDoc(did, doc) 57 + p.mu.Unlock() 56 58 57 59 return doc, nil 58 60 } ··· 61 63 skipCache, _ := ctx.Value("skip-cache").(bool) 62 64 63 65 if !skipCache { 66 + p.mu.RLock() 64 67 cached, ok := p.bc.GetDid(handle) 68 + p.mu.RUnlock() 69 + 65 70 if ok { 66 71 return cached, nil 67 72 } ··· 72 77 return "", err 73 78 } 74 79 80 + p.mu.Lock() 75 81 p.bc.PutDid(handle, did) 82 + p.mu.Unlock() 76 83 77 84 return did, nil 78 85 } 79 86 80 87 func (p *Passport) BustDoc(ctx context.Context, did string) error { 88 + p.mu.Lock() 89 + defer p.mu.Unlock() 81 90 return p.bc.BustDoc(did) 82 91 } 83 92 84 93 func (p *Passport) BustDid(ctx context.Context, handle string) error { 94 + p.mu.Lock() 95 + defer p.mu.Unlock() 85 96 return p.bc.BustDid(handle) 86 97 }
+1 -1
oauth/client/manager.go
··· 289 289 return nil, errors.New("at least one `redirect_uri` is required") 290 290 } 291 291 292 - if metadata.ApplicationType == "native" && metadata.TokenEndpointAuthMethod == "none" { 292 + if metadata.ApplicationType == "native" && metadata.TokenEndpointAuthMethod != "none" { 293 293 return nil, errors.New("native clients must authenticate using `none` method") 294 294 } 295 295
+32
oauth/helpers.go
··· 4 4 "errors" 5 5 "fmt" 6 6 "net/url" 7 + "time" 7 8 8 9 "github.com/haileyok/cocoon/internal/helpers" 9 10 "github.com/haileyok/cocoon/oauth/constants" 11 + "github.com/haileyok/cocoon/oauth/provider" 10 12 ) 11 13 12 14 func GenerateCode() string { ··· 46 48 47 49 return reqId, nil 48 50 } 51 + 52 + type SessionAgeResult struct { 53 + SessionAge time.Duration 54 + RefreshAge time.Duration 55 + SessionExpired bool 56 + RefreshExpired bool 57 + } 58 + 59 + func GetSessionAgeFromToken(t provider.OauthToken) SessionAgeResult { 60 + sessionLifetime := constants.PublicClientSessionLifetime 61 + refreshLifetime := constants.PublicClientRefreshLifetime 62 + if t.ClientAuth.Method != "none" { 63 + sessionLifetime = constants.ConfidentialClientSessionLifetime 64 + refreshLifetime = constants.ConfidentialClientRefreshLifetime 65 + } 66 + 67 + res := SessionAgeResult{} 68 + 69 + res.SessionAge = time.Since(t.CreatedAt) 70 + if res.SessionAge > sessionLifetime { 71 + res.SessionExpired = true 72 + } 73 + 74 + refreshAge := time.Since(t.UpdatedAt) 75 + if refreshAge > refreshLifetime { 76 + res.RefreshExpired = true 77 + } 78 + 79 + return res 80 + }
+2
oauth/provider/models.go
··· 65 65 Code string `gorm:"index"` 66 66 Token string `gorm:"uniqueIndex"` 67 67 RefreshToken string `gorm:"uniqueIndex"` 68 + Ip string 68 69 } 69 70 70 71 type OauthAuthorizationRequest struct { ··· 78 79 Sub *string 79 80 Code *string 80 81 Accepted *bool 82 + Ip string 81 83 }
+77
recording_blockstore/recording_blockstore.go
··· 1 + package recording_blockstore 2 + 3 + import ( 4 + "context" 5 + 6 + blockformat "github.com/ipfs/go-block-format" 7 + "github.com/ipfs/go-cid" 8 + blockstore "github.com/ipfs/go-ipfs-blockstore" 9 + ) 10 + 11 + type RecordingBlockstore struct { 12 + base blockstore.Blockstore 13 + 14 + inserts map[cid.Cid]blockformat.Block 15 + } 16 + 17 + func New(base blockstore.Blockstore) *RecordingBlockstore { 18 + return &RecordingBlockstore{ 19 + base: base, 20 + inserts: make(map[cid.Cid]blockformat.Block), 21 + } 22 + } 23 + 24 + func (bs *RecordingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { 25 + return bs.base.Has(ctx, c) 26 + } 27 + 28 + func (bs *RecordingBlockstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { 29 + return bs.base.Get(ctx, c) 30 + } 31 + 32 + func (bs *RecordingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { 33 + return bs.base.GetSize(ctx, c) 34 + } 35 + 36 + func (bs *RecordingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { 37 + return bs.base.DeleteBlock(ctx, c) 38 + } 39 + 40 + func (bs *RecordingBlockstore) Put(ctx context.Context, block blockformat.Block) error { 41 + if err := bs.base.Put(ctx, block); err != nil { 42 + return err 43 + } 44 + bs.inserts[block.Cid()] = block 45 + return nil 46 + } 47 + 48 + func (bs *RecordingBlockstore) PutMany(ctx context.Context, blocks []blockformat.Block) error { 49 + if err := bs.base.PutMany(ctx, blocks); err != nil { 50 + return err 51 + } 52 + 53 + for _, b := range blocks { 54 + bs.inserts[b.Cid()] = b 55 + } 56 + 57 + return nil 58 + } 59 + 60 + func (bs *RecordingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 61 + return bs.AllKeysChan(ctx) 62 + } 63 + 64 + func (bs *RecordingBlockstore) HashOnRead(enabled bool) { 65 + } 66 + 67 + func (bs *RecordingBlockstore) GetLogMap() map[cid.Cid]blockformat.Block { 68 + return bs.inserts 69 + } 70 + 71 + func (bs *RecordingBlockstore) GetLogArray() []blockformat.Block { 72 + var blocks []blockformat.Block 73 + for _, b := range bs.inserts { 74 + blocks = append(blocks, b) 75 + } 76 + return blocks 77 + }
+30
server/blockstore_variant.go
··· 1 + package server 2 + 3 + import ( 4 + "github.com/haileyok/cocoon/sqlite_blockstore" 5 + blockstore "github.com/ipfs/go-ipfs-blockstore" 6 + ) 7 + 8 + type BlockstoreVariant int 9 + 10 + const ( 11 + BlockstoreVariantSqlite = iota 12 + ) 13 + 14 + func MustReturnBlockstoreVariant(maybeBsv string) BlockstoreVariant { 15 + switch maybeBsv { 16 + case "sqlite": 17 + return BlockstoreVariantSqlite 18 + default: 19 + panic("invalid blockstore variant provided") 20 + } 21 + } 22 + 23 + func (s *Server) getBlockstore(did string) blockstore.Blockstore { 24 + switch s.config.BlockstoreVariant { 25 + case BlockstoreVariantSqlite: 26 + return sqlite_blockstore.New(did, s.db) 27 + default: 28 + return sqlite_blockstore.New(did, s.db) 29 + } 30 + }
+37 -7
server/handle_account.go
··· 3 3 import ( 4 4 "time" 5 5 6 + "github.com/haileyok/cocoon/oauth" 7 + "github.com/haileyok/cocoon/oauth/constants" 6 8 "github.com/haileyok/cocoon/oauth/provider" 9 + "github.com/hako/durafmt" 7 10 "github.com/labstack/echo/v4" 8 11 ) 9 12 10 13 func (s *Server) handleAccount(e echo.Context) error { 14 + ctx := e.Request().Context() 11 15 repo, sess, err := s.getSessionRepoOrErr(e) 12 16 if err != nil { 13 17 return e.Redirect(303, "/account/signin") 14 18 } 15 19 16 - now := time.Now() 20 + oldestPossibleSession := time.Now().Add(constants.ConfidentialClientSessionLifetime) 17 21 18 22 var tokens []provider.OauthToken 19 - if err := s.db.Raw("SELECT * FROM oauth_tokens WHERE sub = ? AND expires_at >= ? ORDER BY created_at ASC", nil, repo.Repo.Did, now).Scan(&tokens).Error; err != nil { 23 + if err := s.db.Raw("SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil { 20 24 s.logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err) 21 25 sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error") 22 26 sess.Save(e.Request(), e.Response()) ··· 25 29 }) 26 30 } 27 31 32 + var filtered []provider.OauthToken 33 + for _, t := range tokens { 34 + ageRes := oauth.GetSessionAgeFromToken(t) 35 + if ageRes.SessionExpired { 36 + continue 37 + } 38 + filtered = append(filtered, t) 39 + } 40 + 41 + now := time.Now() 42 + 28 43 tokenInfo := []map[string]string{} 29 44 for _, t := range tokens { 45 + ageRes := oauth.GetSessionAgeFromToken(t) 46 + maxTime := constants.PublicClientSessionLifetime 47 + if t.ClientAuth.Method != "none" { 48 + maxTime = constants.ConfidentialClientSessionLifetime 49 + } 50 + 51 + var clientName string 52 + metadata, err := s.oauthProvider.ClientManager.GetClient(ctx, t.ClientId) 53 + if err != nil { 54 + clientName = t.ClientId 55 + } else { 56 + clientName = metadata.Metadata.ClientName 57 + } 58 + 30 59 tokenInfo = append(tokenInfo, map[string]string{ 31 - "ClientId": t.ClientId, 32 - "CreatedAt": t.CreatedAt.Format("02 Jan 06 15:04 MST"), 33 - "UpdatedAt": t.CreatedAt.Format("02 Jan 06 15:04 MST"), 34 - "ExpiresAt": t.CreatedAt.Format("02 Jan 06 15:04 MST"), 35 - "Token": t.Token, 60 + "ClientName": clientName, 61 + "Age": durafmt.Parse(ageRes.SessionAge).LimitFirstN(2).String(), 62 + "LastUpdated": durafmt.Parse(now.Sub(t.UpdatedAt)).LimitFirstN(2).String(), 63 + "ExpiresIn": durafmt.Parse(now.Add(maxTime).Sub(now)).LimitFirstN(2).String(), 64 + "Token": t.Token, 65 + "Ip": t.Ip, 36 66 }) 37 67 } 38 68
+2 -3
server/handle_import_repo.go
··· 9 9 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/repo" 12 - "github.com/haileyok/cocoon/blockstore" 13 12 "github.com/haileyok/cocoon/internal/helpers" 14 13 "github.com/haileyok/cocoon/models" 15 14 blocks "github.com/ipfs/go-block-format" ··· 27 26 return helpers.ServerError(e, nil) 28 27 } 29 28 30 - bs := blockstore.New(urepo.Repo.Did, s.db) 29 + bs := s.getBlockstore(urepo.Repo.Did) 31 30 32 31 cs, err := car.NewCarReader(bytes.NewReader(b)) 33 32 if err != nil { ··· 107 106 return helpers.ServerError(e, nil) 108 107 } 109 108 110 - if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil { 109 + if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 111 110 s.logger.Error("error updating repo after commit", "error", err) 112 111 return helpers.ServerError(e, nil) 113 112 }
+1 -1
server/handle_oauth_authorize.go
··· 113 113 114 114 code := oauth.GenerateCode() 115 115 116 - if err := s.db.Exec("UPDATE oauth_authorization_requests SET sub = ?, code = ?, accepted = ? WHERE request_id = ?", nil, repo.Repo.Did, code, true, reqId).Error; err != nil { 116 + if err := s.db.Exec("UPDATE oauth_authorization_requests SET sub = ?, code = ?, accepted = ?, ip = ? WHERE request_id = ?", nil, repo.Repo.Did, code, true, e.RealIP(), reqId).Error; err != nil { 117 117 s.logger.Error("error updating authorization request", "error", err) 118 118 return helpers.ServerError(e, nil) 119 119 }
+4 -10
server/handle_oauth_token.go
··· 157 157 Code: *authReq.Code, 158 158 Token: accessString, 159 159 RefreshToken: refreshToken, 160 + Ip: authReq.Ip, 160 161 }, nil).Error; err != nil { 161 162 s.logger.Error("error creating token in db", "error", err) 162 163 return helpers.ServerError(e, nil) ··· 203 204 return helpers.InputError(e, to.StringPtr("dpop proof does not match expected jkt")) 204 205 } 205 206 206 - sessionLifetime := constants.PublicClientSessionLifetime 207 - refreshLifetime := constants.PublicClientRefreshLifetime 208 - if clientAuth.Method != "none" { 209 - sessionLifetime = constants.ConfidentialClientSessionLifetime 210 - refreshLifetime = constants.ConfidentialClientRefreshLifetime 211 - } 207 + ageRes := oauth.GetSessionAgeFromToken(oauthToken) 212 208 213 - sessionAge := time.Since(oauthToken.CreatedAt) 214 - if sessionAge > sessionLifetime { 209 + if ageRes.SessionExpired { 215 210 return helpers.InputError(e, to.StringPtr("Session expired")) 216 211 } 217 212 218 - refreshAge := time.Since(oauthToken.UpdatedAt) 219 - if refreshAge > refreshLifetime { 213 + if ageRes.RefreshExpired { 220 214 return helpers.InputError(e, to.StringPtr("Refresh token expired")) 221 215 } 222 216
+1 -1
server/handle_proxy.go
··· 43 43 } 44 44 } 45 45 46 - return endpoint, "", nil 46 + return endpoint, svcDid, nil 47 47 } 48 48 49 49 func (s *Server) handleProxy(e echo.Context) error {
+2 -3
server/handle_server_create_account.go
··· 14 14 "github.com/bluesky-social/indigo/events" 15 15 "github.com/bluesky-social/indigo/repo" 16 16 "github.com/bluesky-social/indigo/util" 17 - "github.com/haileyok/cocoon/blockstore" 18 17 "github.com/haileyok/cocoon/internal/helpers" 19 18 "github.com/haileyok/cocoon/models" 20 19 "github.com/labstack/echo/v4" ··· 177 176 } 178 177 179 178 if customDidHeader == "" { 180 - bs := blockstore.New(signupDid, s.db) 179 + bs := s.getBlockstore(signupDid) 181 180 r := repo.NewRepo(context.TODO(), signupDid, bs) 182 181 183 182 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) ··· 186 185 return helpers.ServerError(e, nil) 187 186 } 188 187 189 - if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil { 188 + if err := s.UpdateRepo(context.TODO(), urepo.Did, root, rev); err != nil { 190 189 s.logger.Error("error updating repo after commit", "error", err) 191 190 return helpers.ServerError(e, nil) 192 191 }
+8 -6
server/handle_server_get_service_auth.go
··· 19 19 20 20 type ServerGetServiceAuthRequest struct { 21 21 Aud string `query:"aud" validate:"required,atproto-did"` 22 - Exp int64 `query:"exp"` 23 - Lxm string `query:"lxm" validate:"required,atproto-nsid"` 22 + // exp should be a float, as some clients will send a non-integer expiration 23 + Exp float64 `query:"exp"` 24 + Lxm string `query:"lxm" validate:"required,atproto-nsid"` 24 25 } 25 26 26 27 func (s *Server) handleServerGetServiceAuth(e echo.Context) error { ··· 34 35 return helpers.InputError(e, nil) 35 36 } 36 37 38 + exp := int64(req.Exp) 37 39 now := time.Now().Unix() 38 - if req.Exp == 0 { 39 - req.Exp = now + 60 // default 40 + if exp == 0 { 41 + exp = now + 60 // default 40 42 } 41 43 42 44 if req.Lxm == "com.atproto.server.getServiceAuth" { ··· 44 46 } 45 47 46 48 maxExp := now + (60 * 30) 47 - if req.Exp > maxExp { 49 + if exp > maxExp { 48 50 return helpers.InputError(e, to.StringPtr("expiration too big. smoller please")) 49 51 } 50 52 ··· 68 70 "aud": req.Aud, 69 71 "lxm": req.Lxm, 70 72 "jti": uuid.NewString(), 71 - "exp": req.Exp, 73 + "exp": exp, 72 74 "iat": now, 73 75 } 74 76 pj, err := json.Marshal(payload)
+1 -2
server/handle_sync_get_blocks.go
··· 6 6 "strings" 7 7 8 8 "github.com/bluesky-social/indigo/carstore" 9 - "github.com/haileyok/cocoon/blockstore" 10 9 "github.com/haileyok/cocoon/internal/helpers" 11 10 "github.com/ipfs/go-cid" 12 11 cbor "github.com/ipfs/go-ipld-cbor" ··· 54 53 return helpers.ServerError(e, nil) 55 54 } 56 55 57 - bs := blockstore.New(urepo.Repo.Did, s.db) 56 + bs := s.getBlockstore(urepo.Repo.Did) 58 57 59 58 for _, c := range cids { 60 59 b, err := bs.Get(context.TODO(), c)
+13 -13
server/repo.go
··· 16 16 "github.com/bluesky-social/indigo/events" 17 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 18 "github.com/bluesky-social/indigo/repo" 19 - "github.com/bluesky-social/indigo/util" 20 - "github.com/haileyok/cocoon/blockstore" 21 19 "github.com/haileyok/cocoon/internal/db" 22 20 "github.com/haileyok/cocoon/models" 21 + "github.com/haileyok/cocoon/recording_blockstore" 23 22 blocks "github.com/ipfs/go-block-format" 24 23 "github.com/ipfs/go-cid" 25 24 cbor "github.com/ipfs/go-ipld-cbor" ··· 103 102 return nil, err 104 103 } 105 104 106 - dbs := blockstore.New(urepo.Did, rm.db) 105 + dbs := rm.s.getBlockstore(urepo.Did) 106 + bs := recording_blockstore.New(dbs) 107 107 r, err := repo.OpenRepo(context.TODO(), dbs, rootcid) 108 108 109 109 entries := []models.Record{} ··· 274 274 } 275 275 } 276 276 277 - for _, op := range dbs.GetLog() { 277 + for _, op := range bs.GetLogMap() { 278 278 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 279 279 return nil, err 280 280 } ··· 318 318 Rev: rev, 319 319 Since: &urepo.Rev, 320 320 Commit: lexutil.LexLink(newroot), 321 - Time: time.Now().Format(util.ISO8601), 321 + Time: time.Now().Format(time.RFC3339Nano), 322 322 Ops: ops, 323 323 TooBig: false, 324 324 }, 325 325 }) 326 326 327 - if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil { 327 + if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { 328 328 return nil, err 329 329 } 330 330 ··· 345 345 return cid.Undef, nil, err 346 346 } 347 347 348 - dbs := blockstore.New(urepo.Did, rm.db) 349 - bs := util.NewLoggingBstore(dbs) 348 + dbs := rm.s.getBlockstore(urepo.Did) 349 + bs := recording_blockstore.New(dbs) 350 350 351 351 r, err := repo.OpenRepo(context.TODO(), bs, c) 352 352 if err != nil { ··· 358 358 return cid.Undef, nil, err 359 359 } 360 360 361 - return c, bs.GetLoggedBlocks(), nil 361 + return c, bs.GetLogArray(), nil 362 362 } 363 363 364 364 func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) { ··· 414 414 return nil, fmt.Errorf("error unmarshaling cbor: %w", err) 415 415 } 416 416 417 - var deepiter func(interface{}) error 418 - deepiter = func(item interface{}) error { 417 + var deepiter func(any) error 418 + deepiter = func(item any) error { 419 419 switch val := item.(type) { 420 - case map[string]interface{}: 420 + case map[string]any: 421 421 if val["$type"] == "blob" { 422 422 if ref, ok := val["ref"].(string); ok { 423 423 c, err := cid.Parse(ref) ··· 430 430 return deepiter(v) 431 431 } 432 432 } 433 - case []interface{}: 433 + case []any: 434 434 for _, v := range val { 435 435 deepiter(v) 436 436 }
+13
server/server.go
··· 38 38 "github.com/haileyok/cocoon/oauth/dpop" 39 39 "github.com/haileyok/cocoon/oauth/provider" 40 40 "github.com/haileyok/cocoon/plc" 41 + "github.com/ipfs/go-cid" 41 42 echo_session "github.com/labstack/echo-contrib/session" 42 43 "github.com/labstack/echo/v4" 43 44 "github.com/labstack/echo/v4/middleware" ··· 104 105 SessionSecret string 105 106 106 107 DefaultAtprotoProxy string 108 + 109 + BlockstoreVariant BlockstoreVariant 107 110 } 108 111 109 112 type config struct { ··· 117 120 SmtpEmail string 118 121 SmtpName string 119 122 DefaultAtprotoProxy string 123 + BlockstoreVariant BlockstoreVariant 120 124 } 121 125 122 126 type CustomValidator struct { ··· 349 353 SmtpName: args.SmtpName, 350 354 SmtpEmail: args.SmtpEmail, 351 355 DefaultAtprotoProxy: args.DefaultAtprotoProxy, 356 + BlockstoreVariant: args.BlockstoreVariant, 352 357 }, 353 358 evtman: events.NewEventManager(events.NewMemPersister()), 354 359 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), ··· 641 646 go s.doBackup() 642 647 } 643 648 } 649 + 650 + func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error { 651 + if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil { 652 + return err 653 + } 654 + 655 + return nil 656 + }
+5 -4
server/templates/account.html
··· 24 24 </div> 25 25 {{ else }} {{ range .Tokens }} 26 26 <div class="base-container"> 27 - <h4>{{ .ClientId }}</h4> 28 - <p>Created: {{ .CreatedAt }}</p> 29 - <p>Updated: {{ .UpdatedAt }}</p> 30 - <p>Expires: {{ .ExpiresAt }}</p> 27 + <h4>{{ .ClientName }}</h4> 28 + <p>Session Age: {{ .Age}}</p> 29 + <p>Last Updated: {{ .LastUpdated }} ago</p> 30 + <p>Expires In: {{ .ExpiresIn }}</p> 31 + <p>IP Address: {{ .Ip }}</p> 31 32 <form action="/account/revoke" method="post"> 32 33 <input type="hidden" name="token" value="{{ .Token }}" /> 33 34 <button type="submit" value="">Revoke</button>
+155
sqlite_blockstore/sqlite_blockstore.go
··· 1 + package sqlite_blockstore 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + "github.com/haileyok/cocoon/internal/db" 9 + "github.com/haileyok/cocoon/models" 10 + blocks "github.com/ipfs/go-block-format" 11 + "github.com/ipfs/go-cid" 12 + "gorm.io/gorm/clause" 13 + ) 14 + 15 + type SqliteBlockstore struct { 16 + db *db.DB 17 + did string 18 + readonly bool 19 + inserts map[cid.Cid]blocks.Block 20 + } 21 + 22 + func New(did string, db *db.DB) *SqliteBlockstore { 23 + return &SqliteBlockstore{ 24 + did: did, 25 + db: db, 26 + readonly: false, 27 + inserts: map[cid.Cid]blocks.Block{}, 28 + } 29 + } 30 + 31 + func NewReadOnly(did string, db *db.DB) *SqliteBlockstore { 32 + return &SqliteBlockstore{ 33 + did: did, 34 + db: db, 35 + readonly: true, 36 + inserts: map[cid.Cid]blocks.Block{}, 37 + } 38 + } 39 + 40 + func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { 41 + var block models.Block 42 + 43 + maybeBlock, ok := bs.inserts[cid] 44 + if ok { 45 + return maybeBlock, nil 46 + } 47 + 48 + if err := bs.db.Raw("SELECT * FROM blocks WHERE did = ? AND cid = ?", nil, bs.did, cid.Bytes()).Scan(&block).Error; err != nil { 49 + return nil, err 50 + } 51 + 52 + b, err := blocks.NewBlockWithCid(block.Value, cid) 53 + if err != nil { 54 + return nil, err 55 + } 56 + 57 + return b, nil 58 + } 59 + 60 + func (bs *SqliteBlockstore) Put(ctx context.Context, block blocks.Block) error { 61 + bs.inserts[block.Cid()] = block 62 + 63 + if bs.readonly { 64 + return nil 65 + } 66 + 67 + b := models.Block{ 68 + Did: bs.did, 69 + Cid: block.Cid().Bytes(), 70 + Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 71 + Value: block.RawData(), 72 + } 73 + 74 + if err := bs.db.Create(&b, []clause.Expression{clause.OnConflict{ 75 + Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 76 + UpdateAll: true, 77 + }}).Error; err != nil { 78 + return err 79 + } 80 + 81 + return nil 82 + } 83 + 84 + func (bs *SqliteBlockstore) DeleteBlock(context.Context, cid.Cid) error { 85 + panic("not implemented") 86 + } 87 + 88 + func (bs *SqliteBlockstore) Has(context.Context, cid.Cid) (bool, error) { 89 + panic("not implemented") 90 + } 91 + 92 + func (bs *SqliteBlockstore) GetSize(context.Context, cid.Cid) (int, error) { 93 + panic("not implemented") 94 + } 95 + 96 + func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 + tx := bs.db.BeginDangerously() 98 + 99 + for _, block := range blocks { 100 + bs.inserts[block.Cid()] = block 101 + 102 + if bs.readonly { 103 + continue 104 + } 105 + 106 + b := models.Block{ 107 + Did: bs.did, 108 + Cid: block.Cid().Bytes(), 109 + Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this 110 + Value: block.RawData(), 111 + } 112 + 113 + if err := tx.Clauses(clause.OnConflict{ 114 + Columns: []clause.Column{{Name: "did"}, {Name: "cid"}}, 115 + UpdateAll: true, 116 + }).Create(&b).Error; err != nil { 117 + tx.Rollback() 118 + return err 119 + } 120 + } 121 + 122 + if bs.readonly { 123 + return nil 124 + } 125 + 126 + tx.Commit() 127 + 128 + return nil 129 + } 130 + 131 + func (bs *SqliteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { 132 + panic("not implemented") 133 + } 134 + 135 + func (bs *SqliteBlockstore) HashOnRead(enabled bool) { 136 + panic("not implemented") 137 + } 138 + 139 + func (bs *SqliteBlockstore) Execute(ctx context.Context) error { 140 + if !bs.readonly { 141 + return fmt.Errorf("blockstore was not readonly") 142 + } 143 + 144 + bs.readonly = false 145 + for _, b := range bs.inserts { 146 + bs.Put(ctx, b) 147 + } 148 + bs.readonly = true 149 + 150 + return nil 151 + } 152 + 153 + func (bs *SqliteBlockstore) GetLog() map[cid.Cid]blocks.Block { 154 + return bs.inserts 155 + }