An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

+278 -10
+3 -3
go.mod
··· 1 1 module github.com/haileyok/cocoon 2 2 3 - go 1.24.5 3 + go 1.25 4 4 5 5 require ( 6 6 github.com/Azure/go-autorest/autorest/to v0.4.1 7 7 github.com/aws/aws-sdk-go v1.55.7 8 8 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 9 - github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe 9 + github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec 10 10 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 11 11 github.com/domodwyer/mailyak/v3 v3.6.2 12 12 github.com/go-pkgz/expirable-cache/v3 v3.0.0 ··· 42 42 github.com/Azure/go-autorest v14.2.0+incompatible // indirect 43 43 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect 44 44 github.com/beorn7/perks v1.0.1 // indirect 45 - github.com/carlmjohnson/versioninfo v0.22.5 // indirect 46 45 github.com/cespare/xxhash/v2 v2.3.0 // indirect 47 46 github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect 48 47 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect 48 + github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect 49 49 github.com/felixge/httpsnoop v1.0.4 // indirect 50 50 github.com/go-logr/logr v1.4.2 // indirect 51 51 github.com/go-logr/stdr v1.2.2 // indirect
+4 -4
go.sum
··· 18 18 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= 19 19 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 h1:btHMur2kTRgWEnCHn6LaI3BE9YRgsqTpwpJ1UdB7VEk= 20 20 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934/go.mod h1:LWamyZfbQGW7PaVc5jumFfjgrshJ5mXgDUnR6fK7+BI= 21 - github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo= 22 - github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck= 21 + github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec h1:fubriMftMNEmb35sF07gDCsdUSEd0+EIDebt/+5oQRU= 22 + github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec/go.mod h1:VG/LeqLGNI3Ew7lsYixajnZGFfWPv144qbUddh+Oyag= 23 23 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= 24 24 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= 25 25 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc= 26 26 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= 27 - github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc= 28 - github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8= 29 27 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 30 28 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 31 29 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= ··· 40 38 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= 41 39 github.com/domodwyer/mailyak/v3 v3.6.2 h1:x3tGMsyFhTCaxp6ycgR0FE/bu5QiNp+hetUuCOBXMn8= 42 40 github.com/domodwyer/mailyak/v3 v3.6.2/go.mod h1:lOm/u9CyCVWHeaAmHIdF4RiKVxKUT/H5XX10lIKAL6c= 41 + github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg= 42 + github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw= 43 43 github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= 44 44 github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= 45 45 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+8
models/models.go
··· 136 136 PrivateKey []byte 137 137 CreatedAt time.Time `gorm:"index"` 138 138 } 139 + 140 + type EventRecord struct { 141 + Seq int64 `gorm:"primaryKey;autoIncrement:false"` 142 + CreatedAt time.Time 143 + Did string `gorm:"index"` 144 + Type string 145 + Data []byte 146 + }
+13 -1
server/handle_sync_subscribe_repos.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "strconv" 5 6 "time" 6 7 7 8 "github.com/bluesky-social/indigo/events" ··· 27 28 logger = logger.With("ident", ident) 28 29 logger.Info("new connection established") 29 30 31 + var since *int64 32 + if cursorStr := e.QueryParam("cursor"); cursorStr != "" { 33 + cursor, err := strconv.ParseInt(cursorStr, 10, 64) 34 + if err != nil { 35 + logger.Warn("invalid cursor parameter", "cursor", cursorStr, "err", err) 36 + } else { 37 + since = &cursor 38 + logger.Info("subscribing with cursor", "cursor", cursor) 39 + } 40 + } 41 + 30 42 metrics.RelaysConnected.WithLabelValues(ident).Inc() 31 43 defer func() { 32 44 metrics.RelaysConnected.WithLabelValues(ident).Dec() ··· 34 46 35 47 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 36 48 return true 37 - }, nil) 49 + }, since) 38 50 if err != nil { 39 51 return err 40 52 }
+243
server/persist.go
··· 1 + package server 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "sync" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/events" 12 + indigomodels "github.com/bluesky-social/indigo/models" 13 + cbg "github.com/whyrusleeping/cbor-gen" 14 + "gorm.io/gorm" 15 + 16 + "github.com/haileyok/cocoon/models" 17 + ) 18 + 19 + type DbPersister struct { 20 + Db *gorm.DB 21 + 22 + Lk sync.Mutex 23 + Seq int64 24 + 25 + Broadcast func(*events.XRPCStreamEvent) 26 + 27 + // how long do we actually want to keep these things around 28 + Retention time.Duration 29 + } 30 + 31 + func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) { 32 + if err := db.AutoMigrate(&models.EventRecord{}); err != nil { 33 + return nil, fmt.Errorf("failed to migrate EventRecord: %w", err) 34 + } 35 + 36 + if retention == 0 { 37 + retention = 72 * time.Hour 38 + } 39 + 40 + p := &DbPersister{ 41 + Db: db, 42 + Retention: retention, 43 + } 44 + 45 + // kind of hacky. we will try and get the latest one from the db, but if it doesn't exist...well we have a problem 46 + // because the relay will already have _some_ value > 0 set as a cursor, we'll want to just set this to some high value 47 + // we'll just grab a current unix timestamp and set that as the cursor 48 + var lastEvent models.EventRecord 49 + if err := db.Order("seq desc").Limit(1).First(&lastEvent).Error; err != nil { 50 + if err != gorm.ErrRecordNotFound { 51 + return nil, fmt.Errorf("failed to get last event seq: %w", err) 52 + } 53 + p.Seq = time.Now().Unix() 54 + } else { 55 + p.Seq = lastEvent.Seq 56 + } 57 + 58 + go p.cleanupRoutine() 59 + 60 + return p, nil 61 + } 62 + 63 + func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 64 + p.Broadcast = brc 65 + } 66 + 67 + func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 68 + p.Lk.Lock() 69 + defer p.Lk.Unlock() 70 + 71 + p.Seq++ 72 + seq := p.Seq 73 + 74 + var did string 75 + var evtType string 76 + 77 + switch { 78 + case e.RepoCommit != nil: 79 + e.RepoCommit.Seq = seq 80 + did = e.RepoCommit.Repo 81 + evtType = "commit" 82 + case e.RepoSync != nil: 83 + e.RepoSync.Seq = seq 84 + did = e.RepoSync.Did 85 + evtType = "sync" 86 + case e.RepoIdentity != nil: 87 + e.RepoIdentity.Seq = seq 88 + did = e.RepoIdentity.Did 89 + evtType = "identity" 90 + case e.RepoAccount != nil: 91 + e.RepoAccount.Seq = seq 92 + did = e.RepoAccount.Did 93 + evtType = "account" 94 + default: 95 + return fmt.Errorf("unknown event type") 96 + } 97 + 98 + data, err := serializeEvent(e) 99 + if err != nil { 100 + return fmt.Errorf("failed to serialize event: %w", err) 101 + } 102 + 103 + rec := &models.EventRecord{ 104 + Seq: seq, 105 + CreatedAt: time.Now(), 106 + Did: did, 107 + Type: evtType, 108 + Data: data, 109 + } 110 + 111 + if err := p.Db.Create(rec).Error; err != nil { 112 + return fmt.Errorf("failed to persist event: %w", err) 113 + } 114 + 115 + if p.Broadcast != nil { 116 + p.Broadcast(e) 117 + } 118 + 119 + return nil 120 + } 121 + 122 + func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 123 + const pageSize = 500 124 + 125 + cursor := since 126 + for { 127 + var records []models.EventRecord 128 + if err := p.Db.WithContext(ctx). 129 + Where("seq > ?", cursor). 130 + Order("seq asc"). 131 + Limit(pageSize). 132 + Find(&records).Error; err != nil { 133 + return fmt.Errorf("failed to query events: %w", err) 134 + } 135 + 136 + if len(records) == 0 { 137 + return nil 138 + } 139 + 140 + for _, rec := range records { 141 + evt, err := deserializeEvent(rec.Type, rec.Data) 142 + if err != nil { 143 + return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err) 144 + } 145 + 146 + if err := cb(evt); err != nil { 147 + return err 148 + } 149 + 150 + cursor = rec.Seq 151 + } 152 + 153 + if len(records) < pageSize { 154 + return nil 155 + } 156 + } 157 + } 158 + 159 + func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error { 160 + return nil 161 + } 162 + 163 + func (p *DbPersister) Flush(ctx context.Context) error { 164 + return nil 165 + } 166 + 167 + func (p *DbPersister) Shutdown(ctx context.Context) error { 168 + return nil 169 + } 170 + 171 + func (p *DbPersister) cleanupRoutine() { 172 + ticker := time.NewTicker(time.Hour) 173 + defer ticker.Stop() 174 + 175 + for range ticker.C { 176 + cutoff := time.Now().Add(-p.Retention) 177 + if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil { 178 + continue 179 + } 180 + } 181 + } 182 + 183 + func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) { 184 + buf := new(bytes.Buffer) 185 + cw := cbg.NewCborWriter(buf) 186 + 187 + switch { 188 + case e.RepoCommit != nil: 189 + if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 190 + return nil, err 191 + } 192 + case e.RepoSync != nil: 193 + if err := e.RepoSync.MarshalCBOR(cw); err != nil { 194 + return nil, err 195 + } 196 + case e.RepoIdentity != nil: 197 + if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 198 + return nil, err 199 + } 200 + case e.RepoAccount != nil: 201 + if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 202 + return nil, err 203 + } 204 + default: 205 + return nil, fmt.Errorf("unknown event type") 206 + } 207 + 208 + return buf.Bytes(), nil 209 + } 210 + 211 + func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) { 212 + r := bytes.NewReader(data) 213 + cr := cbg.NewCborReader(r) 214 + 215 + switch evtType { 216 + case "commit": 217 + evt := &atproto.SyncSubscribeRepos_Commit{} 218 + if err := evt.UnmarshalCBOR(cr); err != nil { 219 + return nil, err 220 + } 221 + return &events.XRPCStreamEvent{RepoCommit: evt}, nil 222 + case "sync": 223 + evt := &atproto.SyncSubscribeRepos_Sync{} 224 + if err := evt.UnmarshalCBOR(cr); err != nil { 225 + return nil, err 226 + } 227 + return &events.XRPCStreamEvent{RepoSync: evt}, nil 228 + case "identity": 229 + evt := &atproto.SyncSubscribeRepos_Identity{} 230 + if err := evt.UnmarshalCBOR(cr); err != nil { 231 + return nil, err 232 + } 233 + return &events.XRPCStreamEvent{RepoIdentity: evt}, nil 234 + case "account": 235 + evt := &atproto.SyncSubscribeRepos_Account{} 236 + if err := evt.UnmarshalCBOR(cr); err != nil { 237 + return nil, err 238 + } 239 + return &events.XRPCStreamEvent{RepoAccount: evt}, nil 240 + default: 241 + return nil, fmt.Errorf("unknown event type: %s", evtType) 242 + } 243 + }
+1 -1
server/repo.go
··· 39 39 return &RepoMan{ 40 40 s: s, 41 41 db: s.db, 42 - clock: &clock, 42 + clock: clock, 43 43 } 44 44 } 45 45
+6 -1
server/server.go
··· 405 405 nonceSecret = maybeSecret 406 406 } 407 407 408 + evtPersister, err := NewDbPersister(gdb, 72*time.Hour) 409 + if err != nil { 410 + return nil, fmt.Errorf("failed to create event persister: %w", err) 411 + } 412 + 408 413 s := &Server{ 409 414 http: h, 410 415 httpd: httpd, ··· 429 434 BlockstoreVariant: args.BlockstoreVariant, 430 435 FallbackProxy: args.FallbackProxy, 431 436 }, 432 - evtman: events.NewEventManager(events.NewMemPersister()), 437 + evtman: events.NewEventManager(evtPersister), 433 438 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), 434 439 435 440 dbName: args.DbName,