tangled
alpha
login
or
join now
hailey.at
/
cocoon
An atproto PDS written in Go
85
fork
atom
overview
issues
pulls
pipelines
Compare changes
Choose any two refs to compare.
base:
main
hailey/try-fix-no-req-uri
hailey/totp
hailey/tidy
hailey/support-jwks-in-metadata
hailey/s3-blobstore
hailey/rm-locking-db
hailey/refactor-identity-package
hailey/move-sqlite-blockstore
hailey/fix-get-blocks
hailey/fix-dpop-nonce-err
hailey/deactivate-activate
hailey/db-ctx
hailey/cleanup-write-records
hailey/attempt-reconnect-websocket
v0.8.4
v0.8.3
v0.8.2
v0.8.1
v0.8.0
v0.7.2
list
v0.7.1
v0.7.0
v0.6.0
0.5.1
v0.5.1
0.5.0
0.4.4
0.4.3
0.4.2
0.4.1
0.4.0
0.3.6
0.3.5
0.3.4
0.3.3
0.3.2
0.3.1
0.3
0.2
0.1
0.0.6
0.0.5
0.0.4
0.0.3
0.0.2
v0.0.2
v0.0.1
compare:
main
hailey/try-fix-no-req-uri
hailey/totp
hailey/tidy
hailey/support-jwks-in-metadata
hailey/s3-blobstore
hailey/rm-locking-db
hailey/refactor-identity-package
hailey/move-sqlite-blockstore
hailey/fix-get-blocks
hailey/fix-dpop-nonce-err
hailey/deactivate-activate
hailey/db-ctx
hailey/cleanup-write-records
hailey/attempt-reconnect-websocket
v0.8.4
v0.8.3
v0.8.2
v0.8.1
v0.8.0
v0.7.2
list
v0.7.1
v0.7.0
v0.6.0
0.5.1
v0.5.1
0.5.0
0.4.4
0.4.3
0.4.2
0.4.1
0.4.0
0.3.6
0.3.5
0.3.4
0.3.3
0.3.2
0.3.1
0.3
0.2
0.1
0.0.6
0.0.5
0.0.4
0.0.3
0.0.2
v0.0.2
v0.0.1
go
+278
-10
7 changed files
expand all
collapse all
unified
split
go.mod
go.sum
models
models.go
server
handle_sync_subscribe_repos.go
persist.go
repo.go
server.go
+3
-3
go.mod
···
1
1
module github.com/haileyok/cocoon
2
2
3
3
-
go 1.24.5
3
3
+
go 1.25
4
4
5
5
require (
6
6
github.com/Azure/go-autorest/autorest/to v0.4.1
7
7
github.com/aws/aws-sdk-go v1.55.7
8
8
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934
9
9
-
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe
9
9
+
github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec
10
10
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
11
11
github.com/domodwyer/mailyak/v3 v3.6.2
12
12
github.com/go-pkgz/expirable-cache/v3 v3.0.0
···
42
42
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
43
43
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
44
44
github.com/beorn7/perks v1.0.1 // indirect
45
45
-
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
46
45
github.com/cespare/xxhash/v2 v2.3.0 // indirect
47
46
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
48
47
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
48
48
+
github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect
49
49
github.com/felixge/httpsnoop v1.0.4 // indirect
50
50
github.com/go-logr/logr v1.4.2 // indirect
51
51
github.com/go-logr/stdr v1.2.2 // indirect
+4
-4
go.sum
···
18
18
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
19
19
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 h1:btHMur2kTRgWEnCHn6LaI3BE9YRgsqTpwpJ1UdB7VEk=
20
20
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934/go.mod h1:LWamyZfbQGW7PaVc5jumFfjgrshJ5mXgDUnR6fK7+BI=
21
21
-
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo=
22
22
-
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck=
21
21
+
github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec h1:fubriMftMNEmb35sF07gDCsdUSEd0+EIDebt/+5oQRU=
22
22
+
github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec/go.mod h1:VG/LeqLGNI3Ew7lsYixajnZGFfWPv144qbUddh+Oyag=
23
23
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
24
24
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
25
25
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
26
26
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
27
27
-
github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc=
28
28
-
github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8=
29
27
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
30
28
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
31
29
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
···
40
38
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
41
39
github.com/domodwyer/mailyak/v3 v3.6.2 h1:x3tGMsyFhTCaxp6ycgR0FE/bu5QiNp+hetUuCOBXMn8=
42
40
github.com/domodwyer/mailyak/v3 v3.6.2/go.mod h1:lOm/u9CyCVWHeaAmHIdF4RiKVxKUT/H5XX10lIKAL6c=
41
41
+
github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg=
42
42
+
github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw=
43
43
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
44
44
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
45
45
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+8
models/models.go
···
136
136
PrivateKey []byte
137
137
CreatedAt time.Time `gorm:"index"`
138
138
}
139
139
+
140
140
+
type EventRecord struct {
141
141
+
Seq int64 `gorm:"primaryKey;autoIncrement:false"`
142
142
+
CreatedAt time.Time
143
143
+
Did string `gorm:"index"`
144
144
+
Type string
145
145
+
Data []byte
146
146
+
}
+13
-1
server/handle_sync_subscribe_repos.go
···
2
2
3
3
import (
4
4
"context"
5
5
+
"strconv"
5
6
"time"
6
7
7
8
"github.com/bluesky-social/indigo/events"
···
27
28
logger = logger.With("ident", ident)
28
29
logger.Info("new connection established")
29
30
31
31
+
var since *int64
32
32
+
if cursorStr := e.QueryParam("cursor"); cursorStr != "" {
33
33
+
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
34
34
+
if err != nil {
35
35
+
logger.Warn("invalid cursor parameter", "cursor", cursorStr, "err", err)
36
36
+
} else {
37
37
+
since = &cursor
38
38
+
logger.Info("subscribing with cursor", "cursor", cursor)
39
39
+
}
40
40
+
}
41
41
+
30
42
metrics.RelaysConnected.WithLabelValues(ident).Inc()
31
43
defer func() {
32
44
metrics.RelaysConnected.WithLabelValues(ident).Dec()
···
34
46
35
47
evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
36
48
return true
37
37
-
}, nil)
49
49
+
}, since)
38
50
if err != nil {
39
51
return err
40
52
}
+243
server/persist.go
···
1
1
+
package server
2
2
+
3
3
+
import (
4
4
+
"bytes"
5
5
+
"context"
6
6
+
"fmt"
7
7
+
"sync"
8
8
+
"time"
9
9
+
10
10
+
"github.com/bluesky-social/indigo/api/atproto"
11
11
+
"github.com/bluesky-social/indigo/events"
12
12
+
indigomodels "github.com/bluesky-social/indigo/models"
13
13
+
cbg "github.com/whyrusleeping/cbor-gen"
14
14
+
"gorm.io/gorm"
15
15
+
16
16
+
"github.com/haileyok/cocoon/models"
17
17
+
)
18
18
+
19
19
+
type DbPersister struct {
20
20
+
Db *gorm.DB
21
21
+
22
22
+
Lk sync.Mutex
23
23
+
Seq int64
24
24
+
25
25
+
Broadcast func(*events.XRPCStreamEvent)
26
26
+
27
27
+
// how long do we actually want to keep these things around
28
28
+
Retention time.Duration
29
29
+
}
30
30
+
31
31
+
func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) {
32
32
+
if err := db.AutoMigrate(&models.EventRecord{}); err != nil {
33
33
+
return nil, fmt.Errorf("failed to migrate EventRecord: %w", err)
34
34
+
}
35
35
+
36
36
+
if retention == 0 {
37
37
+
retention = 72 * time.Hour
38
38
+
}
39
39
+
40
40
+
p := &DbPersister{
41
41
+
Db: db,
42
42
+
Retention: retention,
43
43
+
}
44
44
+
45
45
+
// kind of hacky. we will try and get the latest one from the db, but if it doesn't exist...well we have a problem
46
46
+
// because the relay will already have _some_ value > 0 set as a cursor, we'll want to just set this to some high value
47
47
+
// we'll just grab a current unix timestamp and set that as the cursor
48
48
+
var lastEvent models.EventRecord
49
49
+
if err := db.Order("seq desc").Limit(1).First(&lastEvent).Error; err != nil {
50
50
+
if err != gorm.ErrRecordNotFound {
51
51
+
return nil, fmt.Errorf("failed to get last event seq: %w", err)
52
52
+
}
53
53
+
p.Seq = time.Now().Unix()
54
54
+
} else {
55
55
+
p.Seq = lastEvent.Seq
56
56
+
}
57
57
+
58
58
+
go p.cleanupRoutine()
59
59
+
60
60
+
return p, nil
61
61
+
}
62
62
+
63
63
+
func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
64
64
+
p.Broadcast = brc
65
65
+
}
66
66
+
67
67
+
func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
68
68
+
p.Lk.Lock()
69
69
+
defer p.Lk.Unlock()
70
70
+
71
71
+
p.Seq++
72
72
+
seq := p.Seq
73
73
+
74
74
+
var did string
75
75
+
var evtType string
76
76
+
77
77
+
switch {
78
78
+
case e.RepoCommit != nil:
79
79
+
e.RepoCommit.Seq = seq
80
80
+
did = e.RepoCommit.Repo
81
81
+
evtType = "commit"
82
82
+
case e.RepoSync != nil:
83
83
+
e.RepoSync.Seq = seq
84
84
+
did = e.RepoSync.Did
85
85
+
evtType = "sync"
86
86
+
case e.RepoIdentity != nil:
87
87
+
e.RepoIdentity.Seq = seq
88
88
+
did = e.RepoIdentity.Did
89
89
+
evtType = "identity"
90
90
+
case e.RepoAccount != nil:
91
91
+
e.RepoAccount.Seq = seq
92
92
+
did = e.RepoAccount.Did
93
93
+
evtType = "account"
94
94
+
default:
95
95
+
return fmt.Errorf("unknown event type")
96
96
+
}
97
97
+
98
98
+
data, err := serializeEvent(e)
99
99
+
if err != nil {
100
100
+
return fmt.Errorf("failed to serialize event: %w", err)
101
101
+
}
102
102
+
103
103
+
rec := &models.EventRecord{
104
104
+
Seq: seq,
105
105
+
CreatedAt: time.Now(),
106
106
+
Did: did,
107
107
+
Type: evtType,
108
108
+
Data: data,
109
109
+
}
110
110
+
111
111
+
if err := p.Db.Create(rec).Error; err != nil {
112
112
+
return fmt.Errorf("failed to persist event: %w", err)
113
113
+
}
114
114
+
115
115
+
if p.Broadcast != nil {
116
116
+
p.Broadcast(e)
117
117
+
}
118
118
+
119
119
+
return nil
120
120
+
}
121
121
+
122
122
+
func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
123
123
+
const pageSize = 500
124
124
+
125
125
+
cursor := since
126
126
+
for {
127
127
+
var records []models.EventRecord
128
128
+
if err := p.Db.WithContext(ctx).
129
129
+
Where("seq > ?", cursor).
130
130
+
Order("seq asc").
131
131
+
Limit(pageSize).
132
132
+
Find(&records).Error; err != nil {
133
133
+
return fmt.Errorf("failed to query events: %w", err)
134
134
+
}
135
135
+
136
136
+
if len(records) == 0 {
137
137
+
return nil
138
138
+
}
139
139
+
140
140
+
for _, rec := range records {
141
141
+
evt, err := deserializeEvent(rec.Type, rec.Data)
142
142
+
if err != nil {
143
143
+
return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err)
144
144
+
}
145
145
+
146
146
+
if err := cb(evt); err != nil {
147
147
+
return err
148
148
+
}
149
149
+
150
150
+
cursor = rec.Seq
151
151
+
}
152
152
+
153
153
+
if len(records) < pageSize {
154
154
+
return nil
155
155
+
}
156
156
+
}
157
157
+
}
158
158
+
159
159
+
func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error {
160
160
+
return nil
161
161
+
}
162
162
+
163
163
+
func (p *DbPersister) Flush(ctx context.Context) error {
164
164
+
return nil
165
165
+
}
166
166
+
167
167
+
func (p *DbPersister) Shutdown(ctx context.Context) error {
168
168
+
return nil
169
169
+
}
170
170
+
171
171
+
func (p *DbPersister) cleanupRoutine() {
172
172
+
ticker := time.NewTicker(time.Hour)
173
173
+
defer ticker.Stop()
174
174
+
175
175
+
for range ticker.C {
176
176
+
cutoff := time.Now().Add(-p.Retention)
177
177
+
if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil {
178
178
+
continue
179
179
+
}
180
180
+
}
181
181
+
}
182
182
+
183
183
+
func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) {
184
184
+
buf := new(bytes.Buffer)
185
185
+
cw := cbg.NewCborWriter(buf)
186
186
+
187
187
+
switch {
188
188
+
case e.RepoCommit != nil:
189
189
+
if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
190
190
+
return nil, err
191
191
+
}
192
192
+
case e.RepoSync != nil:
193
193
+
if err := e.RepoSync.MarshalCBOR(cw); err != nil {
194
194
+
return nil, err
195
195
+
}
196
196
+
case e.RepoIdentity != nil:
197
197
+
if err := e.RepoIdentity.MarshalCBOR(cw); err != nil {
198
198
+
return nil, err
199
199
+
}
200
200
+
case e.RepoAccount != nil:
201
201
+
if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
202
202
+
return nil, err
203
203
+
}
204
204
+
default:
205
205
+
return nil, fmt.Errorf("unknown event type")
206
206
+
}
207
207
+
208
208
+
return buf.Bytes(), nil
209
209
+
}
210
210
+
211
211
+
func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) {
212
212
+
r := bytes.NewReader(data)
213
213
+
cr := cbg.NewCborReader(r)
214
214
+
215
215
+
switch evtType {
216
216
+
case "commit":
217
217
+
evt := &atproto.SyncSubscribeRepos_Commit{}
218
218
+
if err := evt.UnmarshalCBOR(cr); err != nil {
219
219
+
return nil, err
220
220
+
}
221
221
+
return &events.XRPCStreamEvent{RepoCommit: evt}, nil
222
222
+
case "sync":
223
223
+
evt := &atproto.SyncSubscribeRepos_Sync{}
224
224
+
if err := evt.UnmarshalCBOR(cr); err != nil {
225
225
+
return nil, err
226
226
+
}
227
227
+
return &events.XRPCStreamEvent{RepoSync: evt}, nil
228
228
+
case "identity":
229
229
+
evt := &atproto.SyncSubscribeRepos_Identity{}
230
230
+
if err := evt.UnmarshalCBOR(cr); err != nil {
231
231
+
return nil, err
232
232
+
}
233
233
+
return &events.XRPCStreamEvent{RepoIdentity: evt}, nil
234
234
+
case "account":
235
235
+
evt := &atproto.SyncSubscribeRepos_Account{}
236
236
+
if err := evt.UnmarshalCBOR(cr); err != nil {
237
237
+
return nil, err
238
238
+
}
239
239
+
return &events.XRPCStreamEvent{RepoAccount: evt}, nil
240
240
+
default:
241
241
+
return nil, fmt.Errorf("unknown event type: %s", evtType)
242
242
+
}
243
243
+
}
+1
-1
server/repo.go
···
39
39
return &RepoMan{
40
40
s: s,
41
41
db: s.db,
42
42
-
clock: &clock,
42
42
+
clock: clock,
43
43
}
44
44
}
45
45
+6
-1
server/server.go
···
405
405
nonceSecret = maybeSecret
406
406
}
407
407
408
408
+
evtPersister, err := NewDbPersister(gdb, 72*time.Hour)
409
409
+
if err != nil {
410
410
+
return nil, fmt.Errorf("failed to create event persister: %w", err)
411
411
+
}
412
412
+
408
413
s := &Server{
409
414
http: h,
410
415
httpd: httpd,
···
429
434
BlockstoreVariant: args.BlockstoreVariant,
430
435
FallbackProxy: args.FallbackProxy,
431
436
},
432
432
-
evtman: events.NewEventManager(events.NewMemPersister()),
437
437
+
evtman: events.NewEventManager(evtPersister),
433
438
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
434
439
435
440
dbName: args.DbName,