+70
events/events.go
+70
events/events.go
···
219
219
return obj.MarshalCBOR(cborWriter)
220
220
}
221
221
222
+
func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error {
223
+
var header EventHeader
224
+
if err := header.UnmarshalCBOR(r); err != nil {
225
+
return fmt.Errorf("reading header: %w", err)
226
+
}
227
+
switch header.Op {
228
+
case EvtKindMessage:
229
+
switch header.MsgType {
230
+
case "#commit":
231
+
var evt comatproto.SyncSubscribeRepos_Commit
232
+
if err := evt.UnmarshalCBOR(r); err != nil {
233
+
return fmt.Errorf("reading repoCommit event: %w", err)
234
+
}
235
+
xevt.RepoCommit = &evt
236
+
case "#handle":
237
+
var evt comatproto.SyncSubscribeRepos_Handle
238
+
if err := evt.UnmarshalCBOR(r); err != nil {
239
+
return err
240
+
}
241
+
xevt.RepoHandle = &evt
242
+
case "#identity":
243
+
var evt comatproto.SyncSubscribeRepos_Identity
244
+
if err := evt.UnmarshalCBOR(r); err != nil {
245
+
return err
246
+
}
247
+
xevt.RepoIdentity = &evt
248
+
case "#account":
249
+
var evt comatproto.SyncSubscribeRepos_Account
250
+
if err := evt.UnmarshalCBOR(r); err != nil {
251
+
return err
252
+
}
253
+
xevt.RepoAccount = &evt
254
+
case "#info":
255
+
// TODO: this might also be a LabelInfo (as opposed to RepoInfo)
256
+
var evt comatproto.SyncSubscribeRepos_Info
257
+
if err := evt.UnmarshalCBOR(r); err != nil {
258
+
return err
259
+
}
260
+
xevt.RepoInfo = &evt
261
+
case "#migrate":
262
+
var evt comatproto.SyncSubscribeRepos_Migrate
263
+
if err := evt.UnmarshalCBOR(r); err != nil {
264
+
return err
265
+
}
266
+
xevt.RepoMigrate = &evt
267
+
case "#tombstone":
268
+
var evt comatproto.SyncSubscribeRepos_Tombstone
269
+
if err := evt.UnmarshalCBOR(r); err != nil {
270
+
return err
271
+
}
272
+
xevt.RepoTombstone = &evt
273
+
case "#labels":
274
+
var evt comatproto.LabelSubscribeLabels_Labels
275
+
if err := evt.UnmarshalCBOR(r); err != nil {
276
+
return fmt.Errorf("reading Labels event: %w", err)
277
+
}
278
+
xevt.LabelLabels = &evt
279
+
}
280
+
case EvtKindErrorFrame:
281
+
var errframe ErrorFrame
282
+
if err := errframe.UnmarshalCBOR(r); err != nil {
283
+
return err
284
+
}
285
+
xevt.Error = &errframe
286
+
default:
287
+
return fmt.Errorf("unrecognized event stream type: %d", header.Op)
288
+
}
289
+
return nil
290
+
}
291
+
222
292
var ErrNoSeq = errors.New("event has no sequence number")
223
293
224
294
// serialize content into Preserialized cache
+63
-6
events/pebblepersist.go
+63
-6
events/pebblepersist.go
···
1
1
package events
2
2
3
3
import (
4
+
"bytes"
4
5
"context"
5
6
"encoding/binary"
6
7
"fmt"
···
30
31
}
31
32
blob := e.Preserialized
32
33
33
-
seq := e.Se
34
+
seq := e.Sequence()
35
+
if seq < 0 {
36
+
// drop event
37
+
// TODO: persist with longer key? {prev 8 byte key}{int32 extra counter}
38
+
return nil
39
+
}
34
40
35
41
var key [8]byte
36
-
binary.BigEndian.PutUint64(key, seq)
42
+
binary.BigEndian.PutUint64(key[:], uint64(seq))
43
+
44
+
err = pp.db.Set(key[:], blob, pebble.Sync)
37
45
38
-
return nil
46
+
return err
47
+
}
48
+
49
+
func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) {
50
+
blob, err := iter.ValueAndErr()
51
+
if err != nil {
52
+
return nil, err
53
+
}
54
+
br := bytes.NewReader(blob)
55
+
evt := new(XRPCStreamEvent)
56
+
err = evt.Deserialize(br)
57
+
if err != nil {
58
+
return nil, err
59
+
}
60
+
evt.Preserialized = bytes.Clone(blob)
61
+
return evt, nil
39
62
}
63
+
40
64
func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
65
+
var key [8]byte
66
+
binary.BigEndian.PutUint64(key[:], uint64(since))
67
+
68
+
iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{LowerBound: key[:]})
69
+
if err != nil {
70
+
return err
71
+
}
72
+
defer iter.Close()
73
+
74
+
for iter.First(); iter.Valid(); iter.Next() {
75
+
evt, err := eventFromPebbleIter(iter)
76
+
if err != nil {
77
+
return err
78
+
}
79
+
80
+
err = cb(evt)
81
+
if err != nil {
82
+
return err
83
+
}
84
+
}
85
+
41
86
return nil
42
87
}
43
88
func (pp *PebblePersist) TakeDownRepo(ctx context.Context, usr models.Uid) error {
89
+
// TODO: implement filter on playback to ignore taken-down-repos?
44
90
return nil
45
91
}
46
92
func (pp *PebblePersist) Flush(context.Context) error {
47
-
return nil
93
+
return pp.db.Flush()
48
94
}
49
95
func (pp *PebblePersist) Shutdown(context.Context) error {
50
-
return nil
96
+
err := pp.db.Close()
97
+
pp.db = nil
98
+
return err
51
99
}
52
100
53
101
func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent)) {
···
55
103
}
56
104
57
105
func (pp *PebblePersist) GetLast(ctx context.Context) (*XRPCStreamEvent, error) {
58
-
106
+
iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{})
107
+
if err != nil {
108
+
return nil, err
109
+
}
110
+
ok := iter.Last()
111
+
if !ok {
112
+
return nil, nil
113
+
}
114
+
evt, err := eventFromPebbleIter(iter)
115
+
return evt, nil
59
116
}