tangled
alpha
login
or
join now
hailey.at
/
cocoon
An atproto PDS written in Go
85
fork
atom
overview
issues
pulls
pipelines
Compare changes
Choose any two refs to compare.
base:
main
hailey/try-fix-no-req-uri
hailey/totp
hailey/tidy
hailey/support-jwks-in-metadata
hailey/s3-blobstore
hailey/rm-locking-db
hailey/refactor-identity-package
hailey/move-sqlite-blockstore
hailey/fix-get-blocks
hailey/fix-dpop-nonce-err
hailey/deactivate-activate
hailey/db-ctx
hailey/cleanup-write-records
hailey/attempt-reconnect-websocket
v0.8.4
v0.8.3
v0.8.2
v0.8.1
v0.8.0
v0.7.2
list
v0.7.1
v0.7.0
v0.6.0
0.5.1
v0.5.1
0.5.0
0.4.4
0.4.3
0.4.2
0.4.1
0.4.0
0.3.6
0.3.5
0.3.4
0.3.3
0.3.2
0.3.1
0.3
0.2
0.1
0.0.6
0.0.5
0.0.4
0.0.3
0.0.2
v0.0.2
v0.0.1
compare:
main
hailey/try-fix-no-req-uri
hailey/totp
hailey/tidy
hailey/support-jwks-in-metadata
hailey/s3-blobstore
hailey/rm-locking-db
hailey/refactor-identity-package
hailey/move-sqlite-blockstore
hailey/fix-get-blocks
hailey/fix-dpop-nonce-err
hailey/deactivate-activate
hailey/db-ctx
hailey/cleanup-write-records
hailey/attempt-reconnect-websocket
v0.8.4
v0.8.3
v0.8.2
v0.8.1
v0.8.0
v0.7.2
list
v0.7.1
v0.7.0
v0.6.0
0.5.1
v0.5.1
0.5.0
0.4.4
0.4.3
0.4.2
0.4.1
0.4.0
0.3.6
0.3.5
0.3.4
0.3.3
0.3.2
0.3.1
0.3
0.2
0.1
0.0.6
0.0.5
0.0.4
0.0.3
0.0.2
v0.0.2
v0.0.1
go
+278
-10
7 changed files
expand all
collapse all
unified
split
go.mod
go.sum
models
models.go
server
handle_sync_subscribe_repos.go
persist.go
repo.go
server.go
+3
-3
go.mod
···
1
module github.com/haileyok/cocoon
2
3
-
go 1.24.5
4
5
require (
6
github.com/Azure/go-autorest/autorest/to v0.4.1
7
github.com/aws/aws-sdk-go v1.55.7
8
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934
9
-
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe
10
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
11
github.com/domodwyer/mailyak/v3 v3.6.2
12
github.com/go-pkgz/expirable-cache/v3 v3.0.0
···
42
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
43
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
44
github.com/beorn7/perks v1.0.1 // indirect
45
-
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
46
github.com/cespare/xxhash/v2 v2.3.0 // indirect
47
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
48
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
0
49
github.com/felixge/httpsnoop v1.0.4 // indirect
50
github.com/go-logr/logr v1.4.2 // indirect
51
github.com/go-logr/stdr v1.2.2 // indirect
···
1
module github.com/haileyok/cocoon
2
3
+
go 1.25
4
5
require (
6
github.com/Azure/go-autorest/autorest/to v0.4.1
7
github.com/aws/aws-sdk-go v1.55.7
8
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934
9
+
github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec
10
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
11
github.com/domodwyer/mailyak/v3 v3.6.2
12
github.com/go-pkgz/expirable-cache/v3 v3.0.0
···
42
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
43
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
44
github.com/beorn7/perks v1.0.1 // indirect
0
45
github.com/cespare/xxhash/v2 v2.3.0 // indirect
46
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
47
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
48
+
github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect
49
github.com/felixge/httpsnoop v1.0.4 // indirect
50
github.com/go-logr/logr v1.4.2 // indirect
51
github.com/go-logr/stdr v1.2.2 // indirect
+4
-4
go.sum
···
18
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
19
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 h1:btHMur2kTRgWEnCHn6LaI3BE9YRgsqTpwpJ1UdB7VEk=
20
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934/go.mod h1:LWamyZfbQGW7PaVc5jumFfjgrshJ5mXgDUnR6fK7+BI=
21
-
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo=
22
-
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck=
23
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
24
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
25
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
26
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
27
-
github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc=
28
-
github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8=
29
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
30
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
31
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
···
40
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
41
github.com/domodwyer/mailyak/v3 v3.6.2 h1:x3tGMsyFhTCaxp6ycgR0FE/bu5QiNp+hetUuCOBXMn8=
42
github.com/domodwyer/mailyak/v3 v3.6.2/go.mod h1:lOm/u9CyCVWHeaAmHIdF4RiKVxKUT/H5XX10lIKAL6c=
0
0
43
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
44
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
45
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
···
18
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
19
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 h1:btHMur2kTRgWEnCHn6LaI3BE9YRgsqTpwpJ1UdB7VEk=
20
github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934/go.mod h1:LWamyZfbQGW7PaVc5jumFfjgrshJ5mXgDUnR6fK7+BI=
21
+
github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec h1:fubriMftMNEmb35sF07gDCsdUSEd0+EIDebt/+5oQRU=
22
+
github.com/bluesky-social/indigo v0.0.0-20260203235305-a86f3ae1f8ec/go.mod h1:VG/LeqLGNI3Ew7lsYixajnZGFfWPv144qbUddh+Oyag=
23
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
24
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
25
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
26
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
0
0
27
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
28
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
29
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
···
38
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
39
github.com/domodwyer/mailyak/v3 v3.6.2 h1:x3tGMsyFhTCaxp6ycgR0FE/bu5QiNp+hetUuCOBXMn8=
40
github.com/domodwyer/mailyak/v3 v3.6.2/go.mod h1:lOm/u9CyCVWHeaAmHIdF4RiKVxKUT/H5XX10lIKAL6c=
41
+
github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg=
42
+
github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw=
43
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
44
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
45
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+8
models/models.go
···
136
PrivateKey []byte
137
CreatedAt time.Time `gorm:"index"`
138
}
0
0
0
0
0
0
0
0
···
136
PrivateKey []byte
137
CreatedAt time.Time `gorm:"index"`
138
}
139
+
140
+
type EventRecord struct {
141
+
Seq int64 `gorm:"primaryKey;autoIncrement:false"`
142
+
CreatedAt time.Time
143
+
Did string `gorm:"index"`
144
+
Type string
145
+
Data []byte
146
+
}
+13
-1
server/handle_sync_subscribe_repos.go
···
2
3
import (
4
"context"
0
5
"time"
6
7
"github.com/bluesky-social/indigo/events"
···
27
logger = logger.With("ident", ident)
28
logger.Info("new connection established")
29
0
0
0
0
0
0
0
0
0
0
0
30
metrics.RelaysConnected.WithLabelValues(ident).Inc()
31
defer func() {
32
metrics.RelaysConnected.WithLabelValues(ident).Dec()
···
34
35
evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
36
return true
37
-
}, nil)
38
if err != nil {
39
return err
40
}
···
2
3
import (
4
"context"
5
+
"strconv"
6
"time"
7
8
"github.com/bluesky-social/indigo/events"
···
28
logger = logger.With("ident", ident)
29
logger.Info("new connection established")
30
31
+
var since *int64
32
+
if cursorStr := e.QueryParam("cursor"); cursorStr != "" {
33
+
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
34
+
if err != nil {
35
+
logger.Warn("invalid cursor parameter", "cursor", cursorStr, "err", err)
36
+
} else {
37
+
since = &cursor
38
+
logger.Info("subscribing with cursor", "cursor", cursor)
39
+
}
40
+
}
41
+
42
metrics.RelaysConnected.WithLabelValues(ident).Inc()
43
defer func() {
44
metrics.RelaysConnected.WithLabelValues(ident).Dec()
···
46
47
evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
48
return true
49
+
}, since)
50
if err != nil {
51
return err
52
}
+243
server/persist.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
package server
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"sync"
8
+
"time"
9
+
10
+
"github.com/bluesky-social/indigo/api/atproto"
11
+
"github.com/bluesky-social/indigo/events"
12
+
indigomodels "github.com/bluesky-social/indigo/models"
13
+
cbg "github.com/whyrusleeping/cbor-gen"
14
+
"gorm.io/gorm"
15
+
16
+
"github.com/haileyok/cocoon/models"
17
+
)
18
+
19
+
type DbPersister struct {
20
+
Db *gorm.DB
21
+
22
+
Lk sync.Mutex
23
+
Seq int64
24
+
25
+
Broadcast func(*events.XRPCStreamEvent)
26
+
27
+
// how long do we actually want to keep these things around
28
+
Retention time.Duration
29
+
}
30
+
31
+
func NewDbPersister(db *gorm.DB, retention time.Duration) (*DbPersister, error) {
32
+
if err := db.AutoMigrate(&models.EventRecord{}); err != nil {
33
+
return nil, fmt.Errorf("failed to migrate EventRecord: %w", err)
34
+
}
35
+
36
+
if retention == 0 {
37
+
retention = 72 * time.Hour
38
+
}
39
+
40
+
p := &DbPersister{
41
+
Db: db,
42
+
Retention: retention,
43
+
}
44
+
45
+
// kind of hacky. we will try and get the latest one from the db, but if it doesn't exist...well we have a problem
46
+
// because the relay will already have _some_ value > 0 set as a cursor, we'll want to just set this to some high value
47
+
// we'll just grab a current unix timestamp and set that as the cursor
48
+
var lastEvent models.EventRecord
49
+
if err := db.Order("seq desc").Limit(1).First(&lastEvent).Error; err != nil {
50
+
if err != gorm.ErrRecordNotFound {
51
+
return nil, fmt.Errorf("failed to get last event seq: %w", err)
52
+
}
53
+
p.Seq = time.Now().Unix()
54
+
} else {
55
+
p.Seq = lastEvent.Seq
56
+
}
57
+
58
+
go p.cleanupRoutine()
59
+
60
+
return p, nil
61
+
}
62
+
63
+
func (p *DbPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
64
+
p.Broadcast = brc
65
+
}
66
+
67
+
func (p *DbPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
68
+
p.Lk.Lock()
69
+
defer p.Lk.Unlock()
70
+
71
+
p.Seq++
72
+
seq := p.Seq
73
+
74
+
var did string
75
+
var evtType string
76
+
77
+
switch {
78
+
case e.RepoCommit != nil:
79
+
e.RepoCommit.Seq = seq
80
+
did = e.RepoCommit.Repo
81
+
evtType = "commit"
82
+
case e.RepoSync != nil:
83
+
e.RepoSync.Seq = seq
84
+
did = e.RepoSync.Did
85
+
evtType = "sync"
86
+
case e.RepoIdentity != nil:
87
+
e.RepoIdentity.Seq = seq
88
+
did = e.RepoIdentity.Did
89
+
evtType = "identity"
90
+
case e.RepoAccount != nil:
91
+
e.RepoAccount.Seq = seq
92
+
did = e.RepoAccount.Did
93
+
evtType = "account"
94
+
default:
95
+
return fmt.Errorf("unknown event type")
96
+
}
97
+
98
+
data, err := serializeEvent(e)
99
+
if err != nil {
100
+
return fmt.Errorf("failed to serialize event: %w", err)
101
+
}
102
+
103
+
rec := &models.EventRecord{
104
+
Seq: seq,
105
+
CreatedAt: time.Now(),
106
+
Did: did,
107
+
Type: evtType,
108
+
Data: data,
109
+
}
110
+
111
+
if err := p.Db.Create(rec).Error; err != nil {
112
+
return fmt.Errorf("failed to persist event: %w", err)
113
+
}
114
+
115
+
if p.Broadcast != nil {
116
+
p.Broadcast(e)
117
+
}
118
+
119
+
return nil
120
+
}
121
+
122
+
func (p *DbPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
123
+
const pageSize = 500
124
+
125
+
cursor := since
126
+
for {
127
+
var records []models.EventRecord
128
+
if err := p.Db.WithContext(ctx).
129
+
Where("seq > ?", cursor).
130
+
Order("seq asc").
131
+
Limit(pageSize).
132
+
Find(&records).Error; err != nil {
133
+
return fmt.Errorf("failed to query events: %w", err)
134
+
}
135
+
136
+
if len(records) == 0 {
137
+
return nil
138
+
}
139
+
140
+
for _, rec := range records {
141
+
evt, err := deserializeEvent(rec.Type, rec.Data)
142
+
if err != nil {
143
+
return fmt.Errorf("failed to deserialize event %d: %w", rec.Seq, err)
144
+
}
145
+
146
+
if err := cb(evt); err != nil {
147
+
return err
148
+
}
149
+
150
+
cursor = rec.Seq
151
+
}
152
+
153
+
if len(records) < pageSize {
154
+
return nil
155
+
}
156
+
}
157
+
}
158
+
159
+
func (p *DbPersister) TakeDownRepo(ctx context.Context, uid indigomodels.Uid) error {
160
+
return nil
161
+
}
162
+
163
+
func (p *DbPersister) Flush(ctx context.Context) error {
164
+
return nil
165
+
}
166
+
167
+
func (p *DbPersister) Shutdown(ctx context.Context) error {
168
+
return nil
169
+
}
170
+
171
+
func (p *DbPersister) cleanupRoutine() {
172
+
ticker := time.NewTicker(time.Hour)
173
+
defer ticker.Stop()
174
+
175
+
for range ticker.C {
176
+
cutoff := time.Now().Add(-p.Retention)
177
+
if err := p.Db.Where("created_at < ?", cutoff).Delete(&models.EventRecord{}).Error; err != nil {
178
+
continue
179
+
}
180
+
}
181
+
}
182
+
183
+
func serializeEvent(e *events.XRPCStreamEvent) ([]byte, error) {
184
+
buf := new(bytes.Buffer)
185
+
cw := cbg.NewCborWriter(buf)
186
+
187
+
switch {
188
+
case e.RepoCommit != nil:
189
+
if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
190
+
return nil, err
191
+
}
192
+
case e.RepoSync != nil:
193
+
if err := e.RepoSync.MarshalCBOR(cw); err != nil {
194
+
return nil, err
195
+
}
196
+
case e.RepoIdentity != nil:
197
+
if err := e.RepoIdentity.MarshalCBOR(cw); err != nil {
198
+
return nil, err
199
+
}
200
+
case e.RepoAccount != nil:
201
+
if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
202
+
return nil, err
203
+
}
204
+
default:
205
+
return nil, fmt.Errorf("unknown event type")
206
+
}
207
+
208
+
return buf.Bytes(), nil
209
+
}
210
+
211
+
func deserializeEvent(evtType string, data []byte) (*events.XRPCStreamEvent, error) {
212
+
r := bytes.NewReader(data)
213
+
cr := cbg.NewCborReader(r)
214
+
215
+
switch evtType {
216
+
case "commit":
217
+
evt := &atproto.SyncSubscribeRepos_Commit{}
218
+
if err := evt.UnmarshalCBOR(cr); err != nil {
219
+
return nil, err
220
+
}
221
+
return &events.XRPCStreamEvent{RepoCommit: evt}, nil
222
+
case "sync":
223
+
evt := &atproto.SyncSubscribeRepos_Sync{}
224
+
if err := evt.UnmarshalCBOR(cr); err != nil {
225
+
return nil, err
226
+
}
227
+
return &events.XRPCStreamEvent{RepoSync: evt}, nil
228
+
case "identity":
229
+
evt := &atproto.SyncSubscribeRepos_Identity{}
230
+
if err := evt.UnmarshalCBOR(cr); err != nil {
231
+
return nil, err
232
+
}
233
+
return &events.XRPCStreamEvent{RepoIdentity: evt}, nil
234
+
case "account":
235
+
evt := &atproto.SyncSubscribeRepos_Account{}
236
+
if err := evt.UnmarshalCBOR(cr); err != nil {
237
+
return nil, err
238
+
}
239
+
return &events.XRPCStreamEvent{RepoAccount: evt}, nil
240
+
default:
241
+
return nil, fmt.Errorf("unknown event type: %s", evtType)
242
+
}
243
+
}
+1
-1
server/repo.go
···
39
return &RepoMan{
40
s: s,
41
db: s.db,
42
-
clock: &clock,
43
}
44
}
45
···
39
return &RepoMan{
40
s: s,
41
db: s.db,
42
+
clock: clock,
43
}
44
}
45
+6
-1
server/server.go
···
405
nonceSecret = maybeSecret
406
}
407
0
0
0
0
0
408
s := &Server{
409
http: h,
410
httpd: httpd,
···
429
BlockstoreVariant: args.BlockstoreVariant,
430
FallbackProxy: args.FallbackProxy,
431
},
432
-
evtman: events.NewEventManager(events.NewMemPersister()),
433
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
434
435
dbName: args.DbName,
···
405
nonceSecret = maybeSecret
406
}
407
408
+
evtPersister, err := NewDbPersister(gdb, 72*time.Hour)
409
+
if err != nil {
410
+
return nil, fmt.Errorf("failed to create event persister: %w", err)
411
+
}
412
+
413
s := &Server{
414
http: h,
415
httpd: httpd,
···
434
BlockstoreVariant: args.BlockstoreVariant,
435
FallbackProxy: args.FallbackProxy,
436
},
437
+
evtman: events.NewEventManager(evtPersister),
438
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
439
440
dbName: args.DbName,