package diskpersist

import (
	"bufio"
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/models"
	arc "github.com/hashicorp/golang-lru/arc/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	cbg "github.com/whyrusleeping/cbor-gen"
	"gorm.io/gorm"
)

var log = slog.Default().With("system", "diskpersist")

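// DiskPersistence implements events.EventPersistence by appending framed
// CBOR events to sequentially numbered log files on disk, tracking the files
// themselves in a gorm-managed metadata table (LogFileRef) so playback can
// locate the file covering a given cursor.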
type DiskPersistence struct {
	primaryDir      string
	archiveDir      string
	eventsPerFile   int64
	writeBufferSize int
	retention       time.Duration

	meta *gorm.DB

	broadcast func(*events.XRPCStreamEvent)

	logfi *os.File

	curSeq int64

	uidCache *arc.ARCCache[models.Uid, string] // TODO: unused
	didCache *arc.ARCCache[string, models.Uid]

	writers *sync.Pool
	buffers *sync.Pool
	scratch []byte

	outbuf *bytes.Buffer
	evtbuf []persistJob

	shutdown chan struct{}

	lk sync.Mutex
}

type persistJob struct {
	Bytes  []byte
	Evt    *events.XRPCStreamEvent
	Buffer *bytes.Buffer // so we can put it back in the pool when we're done
}

type jobResult struct {
	Err error
	Seq int64
}

const (
	EvtFlagTakedown = 1 << iota
	EvtFlagRebased
)

var _ events.EventPersistence = (*DiskPersistence)(nil)

type DiskPersistOptions struct {
	UIDCacheSize    int
	DIDCacheSize    int
	EventsPerFile   int64
	WriteBufferSize int
	Retention       time.Duration
}

func DefaultDiskPersistOptions() *DiskPersistOptions {
	return &DiskPersistOptions{
		EventsPerFile:   10_000,
		UIDCacheSize:    1_000_000,
		DIDCacheSize:    1_000_000,
		WriteBufferSize: 50,
		Retention:       time.Hour * 24 * 3, // 3 days
	}
}

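// NewDiskPersistence opens (or creates) the event log under primaryDir,
// resumes the sequence counter from the newest log file, and starts the
// background flush and garbage-collection routines.
//
// A minimal wiring sketch; the sqlite driver, paths, and broadcast body are
// illustrative assumptions, not part of this package. Note the broadcaster
// must be set before the first Persist, since flushLog invokes it:
//
//	db, err := gorm.Open(sqlite.Open("diskpersist-meta.db"), &gorm.Config{})
//	if err != nil {
//		// handle error
//	}
//	dp, err := NewDiskPersistence("/data/events", "/data/events-archive", db, nil)
//	if err != nil {
//		// handle error
//	}
//	dp.SetEventBroadcaster(func(evt *events.XRPCStreamEvent) {
//		// fan the flushed event out to live subscribers
//	})
//	defer dp.Shutdown(context.Background())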
func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error) {
	if opts == nil {
		opts = DefaultDiskPersistOptions()
	}

	uidCache, err := arc.NewARC[models.Uid, string](opts.UIDCacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create uid cache: %w", err)
	}

	didCache, err := arc.NewARC[string, models.Uid](opts.DIDCacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create did cache: %w", err)
	}

	if err := db.AutoMigrate(&LogFileRef{}); err != nil {
		return nil, fmt.Errorf("failed to migrate LogFileRef table: %w", err)
	}

	bufpool := &sync.Pool{
		New: func() any {
			return new(bytes.Buffer)
		},
	}

	wrpool := &sync.Pool{
		New: func() any {
			return cbg.NewCborWriter(nil)
		},
	}

	dp := &DiskPersistence{
		meta:            db,
		primaryDir:      primaryDir,
		archiveDir:      archiveDir,
		buffers:         bufpool,
		retention:       opts.Retention,
		writers:         wrpool,
		uidCache:        uidCache,
		didCache:        didCache,
		eventsPerFile:   opts.EventsPerFile,
		scratch:         make([]byte, headerSize),
		outbuf:          new(bytes.Buffer),
		writeBufferSize: opts.WriteBufferSize,
		shutdown:        make(chan struct{}),
	}

	if err := dp.resumeLog(); err != nil {
		return nil, err
	}

	go dp.flushRoutine()

	go dp.garbageCollectRoutine()

	return dp, nil
}

type LogFileRef struct {
	gorm.Model
	Path     string
	Archived bool
	SeqStart int64
}

func (dp *DiskPersistence) resumeLog() error {
	var lfr LogFileRef
	if err := dp.meta.Order("seq_start desc").Limit(1).Find(&lfr).Error; err != nil {
		return err
	}

	if lfr.ID == 0 {
		// no files, start anew!
		return dp.initLogFile()
	}

	// 0 for the mode is fine since that is only used if O_CREAT is passed
	fi, err := os.OpenFile(filepath.Join(dp.primaryDir, lfr.Path), os.O_RDWR, 0)
	if err != nil {
		return err
	}

	seq, err := scanForLastSeq(fi, -1)
	if err != nil {
		return fmt.Errorf("failed to scan log file for last seqno: %w", err)
	}

	// curSeq is the next sequence number to hand out, so resume one past the
	// last event found in the log
	dp.curSeq = seq + 1
	dp.logfi = fi

	return nil
}

func (dp *DiskPersistence) initLogFile() error {
	if err := os.MkdirAll(dp.primaryDir, 0775); err != nil {
		return err
	}

	p := filepath.Join(dp.primaryDir, "evts-0")
	fi, err := os.Create(p)
	if err != nil {
		return err
	}

	if err := dp.meta.Create(&LogFileRef{
		Path:     "evts-0",
		SeqStart: 0,
	}).Error; err != nil {
		return err
	}

	dp.logfi = fi
	dp.curSeq = 1
	return nil
}

// swapLog swaps the current log file out for a new empty one
// must only be called while holding dp.lk
func (dp *DiskPersistence) swapLog(ctx context.Context) error {
	if err := dp.logfi.Close(); err != nil {
		return fmt.Errorf("failed to close current log file: %w", err)
	}

	fname := fmt.Sprintf("evts-%d", dp.curSeq)
	nextp := filepath.Join(dp.primaryDir, fname)

	fi, err := os.Create(nextp)
	if err != nil {
		return err
	}

	if err := dp.meta.Create(&LogFileRef{
		Path:     fname,
		SeqStart: dp.curSeq,
	}).Error; err != nil {
		return err
	}

	dp.logfi = fi
	return nil
}

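// scanForLastSeq walks the framed events in fi from its current offset and
// returns the sequence number of the last event seen. If end > 0, scanning
// stops at the first event whose seq exceeds end: the file offset is rewound
// to the start of that event's header and its seq is returned, leaving fi
// positioned for reading that event.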
func scanForLastSeq(fi *os.File, end int64) (int64, error) {
	scratch := make([]byte, headerSize)

	var lastSeq int64 = -1
	var offset int64
	for {
		eh, err := readHeader(fi, scratch)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return lastSeq, nil
			}
			return 0, err
		}

		if end > 0 && eh.Seq > end {
			// rewind to the start of this event's header
			n, err := fi.Seek(offset, io.SeekStart)
			if err != nil {
				return 0, err
			}

			if n != offset {
				return 0, fmt.Errorf("rewind seek failed")
			}

			return eh.Seq, nil
		}

		lastSeq = eh.Seq

		noff, err := fi.Seek(int64(eh.Len), io.SeekCurrent)
		if err != nil {
			return 0, err
		}

		if noff != offset+headerSize+int64(eh.Len) {
			// TODO: must recover from this
			return 0, fmt.Errorf("did not seek to next event properly")
		}

		offset = noff
	}
}

const (
	evtKindCommit    = 1
	evtKindHandle    = 2 // DEPRECATED
	evtKindTombstone = 3 // DEPRECATED
	evtKindIdentity  = 4
	evtKindAccount   = 5
	evtKindSync      = 6
)

var emptyHeader = make([]byte, headerSize)

func (dp *DiskPersistence) addJobToQueue(ctx context.Context, job persistJob) error {
	dp.lk.Lock()
	defer dp.lk.Unlock()

	if err := dp.doPersist(ctx, job); err != nil {
		return err
	}

	// TODO: for some reason replacing this constant with dp.writeBufferSize dramatically reduces perf...
	if len(dp.evtbuf) > 400 {
		if err := dp.flushLog(ctx); err != nil {
			return fmt.Errorf("failed to flush disk log: %w", err)
		}
	}

	return nil
}

func (dp *DiskPersistence) flushRoutine() {
	t := time.NewTicker(time.Millisecond * 100)

	for {
		ctx := context.Background()
		select {
		case <-dp.shutdown:
			return
		case <-t.C:
			dp.lk.Lock()
			if err := dp.flushLog(ctx); err != nil {
				// TODO: this happening is quite bad. Need a recovery strategy
				log.Error("failed to flush disk log", "err", err)
			}
			dp.lk.Unlock()
		}
	}
}

func (dp *DiskPersistence) flushLog(ctx context.Context) error {
	if len(dp.evtbuf) == 0 {
		return nil
	}

	_, err := io.Copy(dp.logfi, dp.outbuf)
	if err != nil {
		return err
	}

	dp.outbuf.Truncate(0)

	for _, ej := range dp.evtbuf {
		dp.broadcast(ej.Evt)
		ej.Buffer.Truncate(0)
		dp.buffers.Put(ej.Buffer)
	}

	dp.evtbuf = dp.evtbuf[:0]

	return nil
}

func (dp *DiskPersistence) garbageCollectRoutine() {
	t := time.NewTicker(time.Hour)

	for {
		ctx := context.Background()
		select {
		// Closing a channel can be listened to with multiple routines: https://goplay.tools/snippet/UcwbC0CeJAL
		case <-dp.shutdown:
			return
		case <-t.C:
			if errs := dp.garbageCollect(ctx); len(errs) > 0 {
				for _, err := range errs {
					log.Error("garbage collection error", "err", err)
				}
			}
		}
	}
}

var garbageCollectionsExecuted = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_executed",
	Help: "Number of garbage collections executed",
}, []string{})

var garbageCollectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_errors",
	Help: "Number of errors encountered during garbage collection",
}, []string{})

var refsGarbageCollected = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_refs_collected",
	Help: "Number of refs collected during garbage collection",
}, []string{})

var filesGarbageCollected = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_files_collected",
	Help: "Number of files collected during garbage collection",
}, []string{})

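// garbageCollect deletes log files whose refs predate the retention window.
// Each ref is removed from the metadata table before its file is unlinked,
// so playback never discovers a file that is about to disappear; the
// currently active log file is always skipped.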
func (dp *DiskPersistence) garbageCollect(ctx context.Context) []error {
	garbageCollectionsExecuted.WithLabelValues().Inc()

	// Grab refs created before the retention period
	var refs []LogFileRef
	var errs []error

	defer func() {
		garbageCollectionErrors.WithLabelValues().Add(float64(len(errs)))
	}()

	if err := dp.meta.WithContext(ctx).Find(&refs, "created_at < ?", time.Now().Add(-dp.retention)).Error; err != nil {
		return []error{err}
	}

	oldRefsFound := len(refs)
	refsDeleted := 0
	filesDeleted := 0

	// In the future if we want to support archiving, we could do that here instead of deleting
	for _, r := range refs {
		dp.lk.Lock()
		currentLogfile := dp.logfi.Name()
		dp.lk.Unlock()

		if filepath.Join(dp.primaryDir, r.Path) == currentLogfile {
			// Don't delete the current log file
			log.Info("skipping deletion of current log file")
			continue
		}

		// Delete the ref in the database to prevent playback from finding it
		if err := dp.meta.WithContext(ctx).Delete(&r).Error; err != nil {
			errs = append(errs, err)
			continue
		}
		refsDeleted++

		// Delete the file from disk
		if err := os.Remove(filepath.Join(dp.primaryDir, r.Path)); err != nil {
			errs = append(errs, err)
			continue
		}
		filesDeleted++
	}

	refsGarbageCollected.WithLabelValues().Add(float64(refsDeleted))
	filesGarbageCollected.WithLabelValues().Add(float64(filesDeleted))

	log.Info("garbage collection complete",
		"filesDeleted", filesDeleted,
		"refsDeleted", refsDeleted,
		"oldRefsFound", oldRefsFound,
	)

	return errs
}

func (dp *DiskPersistence) doPersist(ctx context.Context, j persistJob) error {
	b := j.Bytes
	e := j.Evt
	seq := dp.curSeq
	dp.curSeq++

	// Set sequence number in event header
	binary.LittleEndian.PutUint64(b[20:], uint64(seq))

	switch {
	case e.RepoCommit != nil:
		e.RepoCommit.Seq = seq
	case e.RepoSync != nil:
		e.RepoSync.Seq = seq
	case e.RepoIdentity != nil:
		e.RepoIdentity.Seq = seq
	case e.RepoAccount != nil:
		e.RepoAccount.Seq = seq
	default:
		// only these four event kinds get persisted right now;
		// we should not actually ever get here...
		return nil
	}

	// TODO: does this guarantee a full write?
	_, err := dp.outbuf.Write(b)
	if err != nil {
		return err
	}

	dp.evtbuf = append(dp.evtbuf, j)

	if seq%dp.eventsPerFile == 0 {
		if err := dp.flushLog(ctx); err != nil {
			return err
		}

		// time to roll the log file
		if err := dp.swapLog(ctx); err != nil {
			return err
		}
	}

	return nil
}

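// Persist CBOR-encodes the event into a pooled buffer behind a 28-byte
// header. Flags, kind, length, and user UID are filled in here; the sequence
// number is assigned later in doPersist while holding dp.lk, then the framed
// event is queued for the next flush.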
func (dp *DiskPersistence) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
	buffer := dp.buffers.Get().(*bytes.Buffer)
	cw := dp.writers.Get().(*cbg.CborWriter)
	cw.SetWriter(buffer)
	// return the pooled writer when done; the buffer itself is recycled by
	// flushLog after the event has been written out and broadcast
	defer dp.writers.Put(cw)

	buffer.Truncate(0)

	buffer.Write(emptyHeader)

	var did string
	var evtKind uint32
	switch {
	case e.RepoCommit != nil:
		evtKind = evtKindCommit
		did = e.RepoCommit.Repo
		if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
			return fmt.Errorf("failed to marshal: %w", err)
		}
	case e.RepoSync != nil:
		evtKind = evtKindSync
		did = e.RepoSync.Did
		if err := e.RepoSync.MarshalCBOR(cw); err != nil {
			return fmt.Errorf("failed to marshal: %w", err)
		}
	case e.RepoIdentity != nil:
		evtKind = evtKindIdentity
		did = e.RepoIdentity.Did
		if err := e.RepoIdentity.MarshalCBOR(cw); err != nil {
			return fmt.Errorf("failed to marshal: %w", err)
		}
	case e.RepoAccount != nil:
		evtKind = evtKindAccount
		did = e.RepoAccount.Did
		if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
			return fmt.Errorf("failed to marshal: %w", err)
		}
	default:
		// only these four event kinds get persisted right now
		return nil
	}

	usr, err := dp.uidForDid(ctx, did)
	if err != nil {
		return err
	}

	b := buffer.Bytes()

	// Set flags in header (no flags for now)
	binary.LittleEndian.PutUint32(b, 0)
	// Set event kind in header
	binary.LittleEndian.PutUint32(b[4:], evtKind)
	// Set event length in header
	binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize))
	// Set user UID in header
	binary.LittleEndian.PutUint64(b[12:], uint64(usr))

	return dp.addJobToQueue(ctx, persistJob{
		Bytes:  b,
		Evt:    e,
		Buffer: buffer,
	})
}

type evtHeader struct {
	Flags uint32
	Kind  uint32
	Seq   int64
	Usr   models.Uid
	Len   uint32
}

func (eh *evtHeader) Len64() int64 {
	return int64(eh.Len)
}

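// On-disk event framing (all fields little-endian), as written by Persist
// and doPersist and read back by readHeader:
//
//	offset 0:  flags (uint32)
//	offset 4:  kind  (uint32)
//	offset 8:  len   (uint32, body length excluding the header)
//	offset 12: usr   (uint64, models.Uid)
//	offset 20: seq   (uint64, sequence number)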
const headerSize = 4 + 4 + 4 + 8 + 8

func readHeader(r io.Reader, scratch []byte) (*evtHeader, error) {
	if len(scratch) < headerSize {
		return nil, fmt.Errorf("must pass scratch buffer of at least %d bytes", headerSize)
	}

	scratch = scratch[:headerSize]
	_, err := io.ReadFull(r, scratch)
	if err != nil {
		return nil, fmt.Errorf("reading header: %w", err)
	}

	flags := binary.LittleEndian.Uint32(scratch[:4])
	kind := binary.LittleEndian.Uint32(scratch[4:8])
	l := binary.LittleEndian.Uint32(scratch[8:12])
	usr := binary.LittleEndian.Uint64(scratch[12:20])
	seq := binary.LittleEndian.Uint64(scratch[20:28])

	return &evtHeader{
		Flags: flags,
		Kind:  kind,
		Len:   l,
		Usr:   models.Uid(usr),
		Seq:   int64(seq),
	}, nil
}

func (dp *DiskPersistence) writeHeader(ctx context.Context, flags uint32, kind uint32, l uint32, usr uint64, seq int64) error {
	binary.LittleEndian.PutUint32(dp.scratch, flags)
	binary.LittleEndian.PutUint32(dp.scratch[4:], kind)
	binary.LittleEndian.PutUint32(dp.scratch[8:], l)
	binary.LittleEndian.PutUint64(dp.scratch[12:], usr)
	binary.LittleEndian.PutUint64(dp.scratch[20:], uint64(seq))

	nw, err := dp.logfi.Write(dp.scratch)
	if err != nil {
		return err
	}

	if nw != headerSize {
		return fmt.Errorf("only wrote %d bytes for header", nw)
	}

	return nil
}

func (dp *DiskPersistence) uidForDid(ctx context.Context, did string) (models.Uid, error) {
	if uid, ok := dp.didCache.Get(did); ok {
		return uid, nil
	}

	var u models.ActorInfo
	if err := dp.meta.First(&u, "did = ?", did).Error; err != nil {
		return 0, err
	}

	dp.didCache.Add(did, u.Uid)

	return u.Uid, nil
}

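// Playback replays persisted events with sequence numbers after since,
// starting from the log file whose range covers the cursor. Because new log
// files can appear while a replay is running, it re-queries the metadata
// table and continues for up to ten rounds of catch-up.
//
// For example, resuming a subscriber from cursor 1234 (a sketch; the
// callback body is illustrative):
//
//	err := dp.Playback(ctx, 1234, func(evt *events.XRPCStreamEvent) error {
//		// forward evt to the reconnecting subscriber
//		return nil
//	})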
func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
	base := since - (since % dp.eventsPerFile)
	var logs []LogFileRef
	if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", base).Error; err != nil {
		return err
	}

	// Log files may fill up and roll over while we are replaying, so keep
	// re-querying for newer files, but bound the catch-up to 10 rounds
	for i := 0; i < 10; i++ {
		lastSeq, err := dp.PlaybackLogfiles(ctx, since, cb, logs)
		if err != nil {
			return err
		}

		// No lastSeq implies that we read until the end of known events
		if lastSeq == nil {
			break
		}

		if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", *lastSeq).Error; err != nil {
			return err
		}
		since = *lastSeq
	}

	return nil
}

func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) {
	for i, lf := range logFiles {
		lastSeq, err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb)
		if err != nil {
			return nil, err
		}
		// only the first file needs to seek to the cursor; later files are
		// read from their beginning
		since = 0
		if i == len(logFiles)-1 &&
			lastSeq != nil &&
			(*lastSeq-lf.SeqStart) == dp.eventsPerFile-1 {
			// There may be more log files to read since the last one was full
			return lastSeq, nil
		}
	}

	return nil, nil
}

func postDoNotEmit(flags uint32) bool {
	return flags&(EvtFlagRebased|EvtFlagTakedown) != 0
}

func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*events.XRPCStreamEvent) error) (*int64, error) {
	fi, err := os.OpenFile(fn, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	if since != 0 {
		lastSeq, err := scanForLastSeq(fi, since)
		if err != nil {
			return nil, err
		}
		if since > lastSeq {
			log.Error("playback cursor is greater than last seq of file checked",
				"since", since,
				"lastSeq", lastSeq,
				"filename", fn,
			)
			return nil, nil
		}
	}

	bufr := bufio.NewReader(fi)

	lastSeq := int64(0)

	scratch := make([]byte, headerSize)
	for {
		h, err := readHeader(bufr, scratch)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return &lastSeq, nil
			}

			return nil, err
		}

		lastSeq = h.Seq

		if postDoNotEmit(h.Flags) {
			// event taken down, skip
			_, err := io.CopyN(io.Discard, bufr, h.Len64()) // would be really nice if the buffered reader had a 'skip' method that does a seek under the hood
			if err != nil {
				return nil, fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err)
			}
			continue
		}

		switch h.Kind {
		case evtKindCommit:
			var evt atproto.SyncSubscribeRepos_Commit
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil {
				return nil, err
			}
		case evtKindSync:
			var evt atproto.SyncSubscribeRepos_Sync
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoSync: &evt}); err != nil {
				return nil, err
			}
		case evtKindIdentity:
			var evt atproto.SyncSubscribeRepos_Identity
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoIdentity: &evt}); err != nil {
				return nil, err
			}
		case evtKindAccount:
			var evt atproto.SyncSubscribeRepos_Account
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
				return nil, err
			}
		default:
			log.Warn("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind)
			return nil, fmt.Errorf("halting on unrecognized event kind")
		}
	}
}

type UserAction struct {
	gorm.Model

	Usr      models.Uid
	RebaseAt int64
	Takedown bool
}

func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error {
	/*
		if err := dp.meta.Create(&UserAction{
			Usr:      usr,
			Takedown: true,
		}).Error; err != nil {
			return err
		}
	*/

	return dp.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error {
		return dp.deleteEventsForUser(ctx, usr, fn)
	})
}

func (dp *DiskPersistence) forEachShardWithUserEvents(ctx context.Context, usr models.Uid, cb func(context.Context, string) error) error {
	var refs []LogFileRef
	if err := dp.meta.Order("created_at desc").Find(&refs).Error; err != nil {
		return err
	}

	for _, r := range refs {
		mhas, err := dp.refMaybeHasUserEvents(ctx, usr, r)
		if err != nil {
			return err
		}

		if mhas {
			var path string
			if r.Archived {
				path = filepath.Join(dp.archiveDir, r.Path)
			} else {
				path = filepath.Join(dp.primaryDir, r.Path)
			}

			if err := cb(ctx, path); err != nil {
				return err
			}
		}
	}

	return nil
}

func (dp *DiskPersistence) refMaybeHasUserEvents(ctx context.Context, usr models.Uid, ref LogFileRef) (bool, error) {
	// TODO: lazily computed bloom filters for users in each logfile
	return true, nil
}

// zeroReader yields an endless stream of zero bytes; mutateUserEventsInLog
// copies from it to blank out event bodies in place.
type zeroReader struct{}

func (zr *zeroReader) Read(p []byte) (n int, err error) {
	for i := range p {
		p[i] = 0
	}
	return len(p), nil
}

func (dp *DiskPersistence) deleteEventsForUser(ctx context.Context, usr models.Uid, fn string) error {
	return dp.mutateUserEventsInLog(ctx, usr, fn, EvtFlagTakedown, true)
}

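// mutateUserEventsInLog scans the log file at fn and, for every event owned
// by usr that does not already carry flag, ORs flag into the header in
// place. If zeroEvts is set, the event body is also overwritten with zeros;
// the flag write is synced first, so a crash cannot leave a blanked body
// behind an unflagged header.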
func (dp *DiskPersistence) mutateUserEventsInLog(ctx context.Context, usr models.Uid, fn string, flag uint32, zeroEvts bool) error {
	fi, err := os.OpenFile(fn, os.O_RDWR, 0)
	if err != nil {
		return fmt.Errorf("failed to open log file: %w", err)
	}
	defer fi.Close()
	defer fi.Sync()

	scratch := make([]byte, headerSize)
	var offset int64
	for {
		h, err := readHeader(fi, scratch)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}

			return err
		}

		if h.Usr == usr && h.Flags&flag == 0 {
			nflag := h.Flags | flag

			binary.LittleEndian.PutUint32(scratch, nflag)

			if _, err := fi.WriteAt(scratch[:4], offset); err != nil {
				return fmt.Errorf("failed to write updated flag value: %w", err)
			}

			if zeroEvts {
				// sync that write before blanking the event data
				if err := fi.Sync(); err != nil {
					return err
				}

				if _, err := fi.Seek(offset+headerSize, io.SeekStart); err != nil {
					return fmt.Errorf("failed to seek: %w", err)
				}

				_, err := io.CopyN(fi, &zeroReader{}, h.Len64())
				if err != nil {
					return err
				}
			}
		}

		offset += headerSize + h.Len64()
		_, err = fi.Seek(offset, io.SeekStart)
		if err != nil {
			return fmt.Errorf("failed to seek: %w", err)
		}
	}
}

func (dp *DiskPersistence) Flush(ctx context.Context) error {
	dp.lk.Lock()
	defer dp.lk.Unlock()
	if len(dp.evtbuf) > 0 {
		return dp.flushLog(ctx)
	}
	return nil
}

func (dp *DiskPersistence) Shutdown(ctx context.Context) error {
	close(dp.shutdown)
	if err := dp.Flush(ctx); err != nil {
		return err
	}

	return dp.logfi.Close()
}

func (dp *DiskPersistence) SetEventBroadcaster(f func(*events.XRPCStreamEvent)) {
	dp.broadcast = f
}