fork of indigo with slightly nicer lexgen
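
// Package diskpersist persists firehose events to a set of append-only log
// files on disk, with per-file metadata tracked in a gorm database so that
// playback can locate the file containing a given sequence number.
//
// Each record in a log file is a fixed 28-byte little-endian header followed
// by the CBOR-encoded event body (see headerSize, readHeader, and Persist):
//
//	[ flags u32 | kind u32 | len u32 | usr u64 | seq u64 ][ len bytes of CBOR ]
//
// Log files are named "evts-<seqStart>" and rolled every eventsPerFile events.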
package diskpersist

import (
	"bufio"
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/models"
	arc "github.com/hashicorp/golang-lru/arc/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	cbg "github.com/whyrusleeping/cbor-gen"
	"gorm.io/gorm"
)

var log = slog.Default().With("system", "diskpersist")

type DiskPersistence struct {
	primaryDir      string
	archiveDir      string
	eventsPerFile   int64
	writeBufferSize int
	retention       time.Duration

	meta *gorm.DB

	broadcast func(*events.XRPCStreamEvent)

	logfi *os.File

	curSeq int64

	uidCache *arc.ARCCache[models.Uid, string] // TODO: unused
	didCache *arc.ARCCache[string, models.Uid]

	writers *sync.Pool
	buffers *sync.Pool
	scratch []byte

	outbuf *bytes.Buffer
	evtbuf []persistJob

	shutdown chan struct{}

	lk sync.Mutex
}

type persistJob struct {
	Bytes  []byte
	Evt    *events.XRPCStreamEvent
	Buffer *bytes.Buffer // so we can put it back in the pool when we're done
}

type jobResult struct {
	Err error
	Seq int64
}

const (
	EvtFlagTakedown = 1 << iota
	EvtFlagRebased
)

var _ (events.EventPersistence) = (*DiskPersistence)(nil)

type DiskPersistOptions struct {
	UIDCacheSize    int
	DIDCacheSize    int
	EventsPerFile   int64
	WriteBufferSize int
	Retention       time.Duration
}

func DefaultDiskPersistOptions() *DiskPersistOptions {
	return &DiskPersistOptions{
		EventsPerFile:   10_000,
		UIDCacheSize:    1_000_000,
		DIDCacheSize:    1_000_000,
		WriteBufferSize: 50,
		Retention:       time.Hour * 24 * 3, // 3 days
	}
}

func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error) {
	if opts == nil {
		opts = DefaultDiskPersistOptions()
	}

	uidCache, err := arc.NewARC[models.Uid, string](opts.UIDCacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create uid cache: %w", err)
	}

	didCache, err := arc.NewARC[string, models.Uid](opts.DIDCacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create did cache: %w", err)
	}

	if err := db.AutoMigrate(&LogFileRef{}); err != nil {
		return nil, fmt.Errorf("failed to migrate LogFileRef: %w", err)
	}

	bufpool := &sync.Pool{
		New: func() any {
			return new(bytes.Buffer)
		},
	}

	wrpool := &sync.Pool{
		New: func() any {
			return cbg.NewCborWriter(nil)
		},
	}

	dp := &DiskPersistence{
		meta:            db,
		primaryDir:      primaryDir,
		archiveDir:      archiveDir,
		buffers:         bufpool,
		retention:       opts.Retention,
		writers:         wrpool,
		uidCache:        uidCache,
		didCache:        didCache,
		eventsPerFile:   opts.EventsPerFile,
		scratch:         make([]byte, headerSize),
		outbuf:          new(bytes.Buffer),
		writeBufferSize: opts.WriteBufferSize,
		shutdown:        make(chan struct{}),
	}

	if err := dp.resumeLog(); err != nil {
		return nil, err
	}

	go dp.flushRoutine()

	go dp.garbageCollectRoutine()

	return dp, nil
}

type LogFileRef struct {
	gorm.Model
	Path     string
	Archived bool
	SeqStart int64
}
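
// A minimal construction sketch (the paths and the gorm.Open call are
// illustrative, not part of this package). Note that SetEventBroadcaster
// must be called before the first Persist, since flushLog invokes the
// broadcast callback for every buffered event:
//
//	db, err := gorm.Open(sqlite.Open("diskpersist.db"), &gorm.Config{})
//	if err != nil { ... }
//	dp, err := NewDiskPersistence("/data/events", "/data/events-archive", db, nil)
//	if err != nil { ... }
//	dp.SetEventBroadcaster(func(e *events.XRPCStreamEvent) {
//		// fan the event out to live subscribers
//	})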

func (dp *DiskPersistence) resumeLog() error {
	var lfr LogFileRef
	if err := dp.meta.Order("seq_start desc").Limit(1).Find(&lfr).Error; err != nil {
		return err
	}

	if lfr.ID == 0 {
		// no log files yet, start anew!
		return dp.initLogFile()
	}

	// 0 for the mode is fine since that is only used if O_CREAT is passed
	fi, err := os.OpenFile(filepath.Join(dp.primaryDir, lfr.Path), os.O_RDWR, 0)
	if err != nil {
		return err
	}

	seq, err := scanForLastSeq(fi, -1)
	if err != nil {
		return fmt.Errorf("failed to scan log file for last seqno: %w", err)
	}

	// resume one past the last persisted seq so we never reuse it
	dp.curSeq = seq + 1
	dp.logfi = fi

	return nil
}

func (dp *DiskPersistence) initLogFile() error {
	if err := os.MkdirAll(dp.primaryDir, 0775); err != nil {
		return err
	}

	p := filepath.Join(dp.primaryDir, "evts-0")
	fi, err := os.Create(p)
	if err != nil {
		return err
	}

	if err := dp.meta.Create(&LogFileRef{
		Path:     "evts-0",
		SeqStart: 0,
	}).Error; err != nil {
		return err
	}

	dp.logfi = fi
	dp.curSeq = 1
	return nil
}

// swapLog swaps the current log file out for a new empty one.
// Must only be called while holding dp.lk.
func (dp *DiskPersistence) swapLog(ctx context.Context) error {
	if err := dp.logfi.Close(); err != nil {
		return fmt.Errorf("failed to close current log file: %w", err)
	}

	fname := fmt.Sprintf("evts-%d", dp.curSeq)
	nextp := filepath.Join(dp.primaryDir, fname)

	fi, err := os.Create(nextp)
	if err != nil {
		return err
	}

	if err := dp.meta.Create(&LogFileRef{
		Path:     fname,
		SeqStart: dp.curSeq,
	}).Error; err != nil {
		return err
	}

	dp.logfi = fi
	return nil
}

func scanForLastSeq(fi *os.File, end int64) (int64, error) {
	scratch := make([]byte, headerSize)

	var lastSeq int64 = -1
	var offset int64
	for {
		eh, err := readHeader(fi, scratch)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return lastSeq, nil
			}
			return 0, err
		}

		if end > 0 && eh.Seq > end {
			// rewind to the start of this header so the caller can read this event next
			n, err := fi.Seek(offset, io.SeekStart)
			if err != nil {
				return 0, err
			}

			if n != offset {
				return 0, fmt.Errorf("rewind seek failed")
			}

			return eh.Seq, nil
		}

		lastSeq = eh.Seq

		noff, err := fi.Seek(int64(eh.Len), io.SeekCurrent)
		if err != nil {
			return 0, err
		}

		if noff != offset+headerSize+int64(eh.Len) {
			// TODO: must recover from this
			return 0, fmt.Errorf("did not seek to next event properly")
		}

		offset = noff
	}
}

const (
	evtKindCommit    = 1
	evtKindHandle    = 2 // DEPRECATED
	evtKindTombstone = 3 // DEPRECATED
	evtKindIdentity  = 4
	evtKindAccount   = 5
	evtKindSync      = 6
)

var emptyHeader = make([]byte, headerSize)

func (dp *DiskPersistence) addJobToQueue(ctx context.Context, job persistJob) error {
	dp.lk.Lock()
	defer dp.lk.Unlock()

	if err := dp.doPersist(ctx, job); err != nil {
		return err
	}

	// TODO: for some reason replacing this constant with dp.writeBufferSize dramatically reduces perf...
	if len(dp.evtbuf) > 400 {
		if err := dp.flushLog(ctx); err != nil {
			return fmt.Errorf("failed to flush disk log: %w", err)
		}
	}

	return nil
}
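
// Write path overview: Persist serializes the event into a pooled buffer and
// hands it to addJobToQueue, which stamps the sequence number and appends the
// bytes to outbuf under dp.lk. Data actually reaches the log file in flushLog,
// which runs when the event buffer grows past the threshold above, when a log
// file rolls, and on the 100ms ticker below. Events are only broadcast to
// subscribers after their bytes have been copied to the file.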

func (dp *DiskPersistence) flushRoutine() {
	t := time.NewTicker(time.Millisecond * 100)

	for {
		ctx := context.Background()
		select {
		case <-dp.shutdown:
			return
		case <-t.C:
			dp.lk.Lock()
			if err := dp.flushLog(ctx); err != nil {
				// TODO: this happening is quite bad. Need a recovery strategy
				log.Error("failed to flush disk log", "err", err)
			}
			dp.lk.Unlock()
		}
	}
}

func (dp *DiskPersistence) flushLog(ctx context.Context) error {
	if len(dp.evtbuf) == 0 {
		return nil
	}

	_, err := io.Copy(dp.logfi, dp.outbuf)
	if err != nil {
		return err
	}

	dp.outbuf.Truncate(0)

	for _, ej := range dp.evtbuf {
		dp.broadcast(ej.Evt)
		ej.Buffer.Truncate(0)
		dp.buffers.Put(ej.Buffer)
	}

	dp.evtbuf = dp.evtbuf[:0]

	return nil
}

func (dp *DiskPersistence) garbageCollectRoutine() {
	t := time.NewTicker(time.Hour)

	for {
		ctx := context.Background()
		select {
		// Closing a channel can be listened to with multiple routines: https://goplay.tools/snippet/UcwbC0CeJAL
		case <-dp.shutdown:
			return
		case <-t.C:
			if errs := dp.garbageCollect(ctx); len(errs) > 0 {
				for _, err := range errs {
					log.Error("garbage collection error", "err", err)
				}
			}
		}
	}
}

var garbageCollectionsExecuted = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_executed",
	Help: "Number of garbage collections executed",
}, []string{})

var garbageCollectionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_errors",
	Help: "Number of errors encountered during garbage collection",
}, []string{})

var refsGarbageCollected = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_refs_collected",
	Help: "Number of refs collected during garbage collection",
}, []string{})

var filesGarbageCollected = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "disk_persister_garbage_collections_files_collected",
	Help: "Number of files collected during garbage collection",
}, []string{})

func (dp *DiskPersistence) garbageCollect(ctx context.Context) []error {
	garbageCollectionsExecuted.WithLabelValues().Inc()

	// Grab refs created before the retention period
	var refs []LogFileRef
	var errs []error

	defer func() {
		garbageCollectionErrors.WithLabelValues().Add(float64(len(errs)))
	}()

	if err := dp.meta.WithContext(ctx).Find(&refs, "created_at < ?", time.Now().Add(-dp.retention)).Error; err != nil {
		return []error{err}
	}

	oldRefsFound := len(refs)
	refsDeleted := 0
	filesDeleted := 0

	// In the future, if we want to support archiving, we could do that here instead of deleting
	for _, r := range refs {
		dp.lk.Lock()
		currentLogfile := dp.logfi.Name()
		dp.lk.Unlock()

		if filepath.Join(dp.primaryDir, r.Path) == currentLogfile {
			// Don't delete the current log file
			log.Info("skipping deletion of current log file")
			continue
		}

		// Delete the ref in the database to prevent playback from finding it
		if err := dp.meta.WithContext(ctx).Delete(&r).Error; err != nil {
			errs = append(errs, err)
			continue
		}
		refsDeleted++

		// Delete the file from disk
		if err := os.Remove(filepath.Join(dp.primaryDir, r.Path)); err != nil {
			errs = append(errs, err)
			continue
		}
		filesDeleted++
	}

	refsGarbageCollected.WithLabelValues().Add(float64(refsDeleted))
	filesGarbageCollected.WithLabelValues().Add(float64(filesDeleted))

	log.Info("garbage collection complete",
		"filesDeleted", filesDeleted,
		"refsDeleted", refsDeleted,
		"oldRefsFound", oldRefsFound,
	)

	return errs
}
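
// Sequence assignment and file rolling are coupled: the log rolls immediately
// after persisting an event whose seq is a multiple of eventsPerFile, so each
// later file's SeqStart is one past such a multiple. Playback depends on this
// when it rounds a cursor down with `since - (since % eventsPerFile)`: the
// file containing `since` always has seq_start >= that base, so Playback's
// `seq_start >= ?` query cannot skip it.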
"oldRefsFound", oldRefsFound, 442 ) 443 444 return errs 445} 446 447func (dp *DiskPersistence) doPersist(ctx context.Context, j persistJob) error { 448 b := j.Bytes 449 e := j.Evt 450 seq := dp.curSeq 451 dp.curSeq++ 452 453 // Set sequence number in event header 454 binary.LittleEndian.PutUint64(b[20:], uint64(seq)) 455 456 switch { 457 case e.RepoCommit != nil: 458 e.RepoCommit.Seq = seq 459 case e.RepoSync != nil: 460 e.RepoSync.Seq = seq 461 case e.RepoIdentity != nil: 462 e.RepoIdentity.Seq = seq 463 case e.RepoAccount != nil: 464 e.RepoAccount.Seq = seq 465 default: 466 // only those three get peristed right now 467 // we should not actually ever get here... 468 return nil 469 } 470 471 // TODO: does this guarantee a full write? 472 _, err := dp.outbuf.Write(b) 473 if err != nil { 474 return err 475 } 476 477 dp.evtbuf = append(dp.evtbuf, j) 478 479 if seq%dp.eventsPerFile == 0 { 480 if err := dp.flushLog(ctx); err != nil { 481 return err 482 } 483 484 // time to roll the log file 485 if err := dp.swapLog(ctx); err != nil { 486 return err 487 } 488 } 489 490 return nil 491} 492 493func (dp *DiskPersistence) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 494 buffer := dp.buffers.Get().(*bytes.Buffer) 495 cw := dp.writers.Get().(*cbg.CborWriter) 496 cw.SetWriter(buffer) 497 498 buffer.Truncate(0) 499 500 buffer.Write(emptyHeader) 501 502 var did string 503 var evtKind uint32 504 switch { 505 case e.RepoCommit != nil: 506 evtKind = evtKindCommit 507 did = e.RepoCommit.Repo 508 if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 509 return fmt.Errorf("failed to marshal: %w", err) 510 } 511 case e.RepoSync != nil: 512 evtKind = evtKindSync 513 did = e.RepoSync.Did 514 if err := e.RepoSync.MarshalCBOR(cw); err != nil { 515 return fmt.Errorf("failed to marshal: %w", err) 516 } 517 case e.RepoIdentity != nil: 518 evtKind = evtKindIdentity 519 did = e.RepoIdentity.Did 520 if err := e.RepoIdentity.MarshalCBOR(cw); err != nil { 521 return fmt.Errorf("failed to marshal: %w", err) 522 } 523 case e.RepoAccount != nil: 524 evtKind = evtKindAccount 525 did = e.RepoAccount.Did 526 if err := e.RepoAccount.MarshalCBOR(cw); err != nil { 527 return fmt.Errorf("failed to marshal: %w", err) 528 } 529 default: 530 return nil 531 // only those two get peristed right now 532 } 533 534 usr, err := dp.uidForDid(ctx, did) 535 if err != nil { 536 return err 537 } 538 539 b := buffer.Bytes() 540 541 // Set flags in header (no flags for now) 542 binary.LittleEndian.PutUint32(b, 0) 543 // Set event kind in header 544 binary.LittleEndian.PutUint32(b[4:], evtKind) 545 // Set event length in header 546 binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 547 // Set user UID in header 548 binary.LittleEndian.PutUint64(b[12:], uint64(usr)) 549 550 return dp.addJobToQueue(ctx, persistJob{ 551 Bytes: b, 552 Evt: e, 553 Buffer: buffer, 554 }) 555} 556 557type evtHeader struct { 558 Flags uint32 559 Kind uint32 560 Seq int64 561 Usr models.Uid 562 Len uint32 563} 564 565func (eh *evtHeader) Len64() int64 { 566 return int64(eh.Len) 567} 568 569const headerSize = 4 + 4 + 4 + 8 + 8 570 571func readHeader(r io.Reader, scratch []byte) (*evtHeader, error) { 572 if len(scratch) < headerSize { 573 return nil, fmt.Errorf("must pass scratch buffer of at least %d bytes", headerSize) 574 } 575 576 scratch = scratch[:headerSize] 577 _, err := io.ReadFull(r, scratch) 578 if err != nil { 579 return nil, fmt.Errorf("reading header: %w", err) 580 } 581 582 flags := binary.LittleEndian.Uint32(scratch[:4]) 583 kind 

func readHeader(r io.Reader, scratch []byte) (*evtHeader, error) {
	if len(scratch) < headerSize {
		return nil, fmt.Errorf("must pass scratch buffer of at least %d bytes", headerSize)
	}

	scratch = scratch[:headerSize]
	_, err := io.ReadFull(r, scratch)
	if err != nil {
		return nil, fmt.Errorf("reading header: %w", err)
	}

	flags := binary.LittleEndian.Uint32(scratch[:4])
	kind := binary.LittleEndian.Uint32(scratch[4:8])
	l := binary.LittleEndian.Uint32(scratch[8:12])
	usr := binary.LittleEndian.Uint64(scratch[12:20])
	seq := binary.LittleEndian.Uint64(scratch[20:28])

	return &evtHeader{
		Flags: flags,
		Kind:  kind,
		Len:   l,
		Usr:   models.Uid(usr),
		Seq:   int64(seq),
	}, nil
}

func (dp *DiskPersistence) writeHeader(ctx context.Context, flags uint32, kind uint32, l uint32, usr uint64, seq int64) error {
	binary.LittleEndian.PutUint32(dp.scratch, flags)
	binary.LittleEndian.PutUint32(dp.scratch[4:], kind)
	binary.LittleEndian.PutUint32(dp.scratch[8:], l)
	binary.LittleEndian.PutUint64(dp.scratch[12:], usr)
	binary.LittleEndian.PutUint64(dp.scratch[20:], uint64(seq))

	nw, err := dp.logfi.Write(dp.scratch)
	if err != nil {
		return err
	}

	if nw != headerSize {
		return fmt.Errorf("only wrote %d bytes for header", nw)
	}

	return nil
}

func (dp *DiskPersistence) uidForDid(ctx context.Context, did string) (models.Uid, error) {
	if uid, ok := dp.didCache.Get(did); ok {
		return uid, nil
	}

	var u models.ActorInfo
	if err := dp.meta.First(&u, "did = ?", did).Error; err != nil {
		return 0, err
	}

	dp.didCache.Add(did, u.Uid)

	return u.Uid, nil
}

func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
	base := since - (since % dp.eventsPerFile)
	var logs []LogFileRef
	if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", base).Error; err != nil {
		return err
	}

	for i := 0; i < 10; i++ {
		lastSeq, err := dp.PlaybackLogfiles(ctx, since, cb, logs)
		if err != nil {
			return err
		}

		// No lastSeq implies that we read until the end of known events
		if lastSeq == nil {
			break
		}

		if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", *lastSeq).Error; err != nil {
			return err
		}
		since = *lastSeq
	}

	return nil
}

func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) {
	for i, lf := range logFiles {
		lastSeq, err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb)
		if err != nil {
			return nil, err
		}
		since = 0
		if i == len(logFiles)-1 &&
			lastSeq != nil &&
			(*lastSeq-lf.SeqStart) == dp.eventsPerFile-1 {
			// There may be more log files to read since the last one was full
			return lastSeq, nil
		}
	}

	return nil, nil
}

func postDoNotEmit(flags uint32) bool {
	if flags&(EvtFlagRebased|EvtFlagTakedown) != 0 {
		return true
	}

	return false
}
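
// readEventsFrom replays a single log file. The cursor is exclusive: when
// since != 0, scanForLastSeq leaves the file positioned at the first event
// with seq greater than since, so an event with seq == since is not
// re-emitted. A usage sketch for the overall entry point (the callback body
// is illustrative):
//
//	err := dp.Playback(ctx, cursor, func(e *events.XRPCStreamEvent) error {
//		// forward e to the reconnecting subscriber; a non-nil error halts playback
//		return nil
//	})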

func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*events.XRPCStreamEvent) error) (*int64, error) {
	fi, err := os.OpenFile(fn, os.O_RDONLY, 0)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	if since != 0 {
		lastSeq, err := scanForLastSeq(fi, since)
		if err != nil {
			return nil, err
		}
		if since > lastSeq {
			log.Error("playback cursor is greater than last seq of file checked",
				"since", since,
				"lastSeq", lastSeq,
				"filename", fn,
			)
			return nil, nil
		}
	}

	bufr := bufio.NewReader(fi)

	lastSeq := int64(0)

	scratch := make([]byte, headerSize)
	for {
		h, err := readHeader(bufr, scratch)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return &lastSeq, nil
			}

			return nil, err
		}

		lastSeq = h.Seq

		if postDoNotEmit(h.Flags) {
			// event taken down, skip
			// (it would be really nice if the buffered reader had a 'skip' method that does a seek under the hood)
			if _, err := io.CopyN(io.Discard, bufr, h.Len64()); err != nil {
				return nil, fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err)
			}
			continue
		}

		switch h.Kind {
		case evtKindCommit:
			var evt atproto.SyncSubscribeRepos_Commit
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil {
				return nil, err
			}
		case evtKindSync:
			var evt atproto.SyncSubscribeRepos_Sync
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoSync: &evt}); err != nil {
				return nil, err
			}
		case evtKindIdentity:
			var evt atproto.SyncSubscribeRepos_Identity
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoIdentity: &evt}); err != nil {
				return nil, err
			}
		case evtKindAccount:
			var evt atproto.SyncSubscribeRepos_Account
			if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
				return nil, err
			}
			evt.Seq = h.Seq
			if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
				return nil, err
			}
		default:
			log.Warn("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind)
			return nil, fmt.Errorf("halting on unrecognized event kind")
		}
	}
}

type UserAction struct {
	gorm.Model

	Usr      models.Uid
	RebaseAt int64
	Takedown bool
}

func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error {
	/*
		if err := p.meta.Create(&UserAction{
			Usr:      usr,
			Takedown: true,
		}).Error; err != nil {
			return err
		}
	*/

	return dp.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error {
		if err := dp.deleteEventsForUser(ctx, usr, fn); err != nil {
			return err
		}

		return nil
	})
}

func (dp *DiskPersistence) forEachShardWithUserEvents(ctx context.Context, usr models.Uid, cb func(context.Context, string) error) error {
	var refs []LogFileRef
	if err := dp.meta.Order("created_at desc").Find(&refs).Error; err != nil {
		return err
	}

	for _, r := range refs {
		mhas, err := dp.refMaybeHasUserEvents(ctx, usr, r)
		if err != nil {
			return err
		}

		if mhas {
			var path string
			if r.Archived {
				path = filepath.Join(dp.archiveDir, r.Path)
			} else {
				path = filepath.Join(dp.primaryDir, r.Path)
			}

			if err := cb(ctx, path); err != nil {
				return err
			}
		}
	}

	return nil
}

func (dp *DiskPersistence) refMaybeHasUserEvents(ctx context.Context, usr models.Uid, ref LogFileRef) (bool, error) {
	// TODO: lazily computed bloom filters for users in each logfile
	return true, nil
}

type zeroReader struct{}

func (zr *zeroReader) Read(p []byte) (n int, err error) {
	for i := range p {
		p[i] = 0
	}
	return len(p), nil
}
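
// Takedowns are applied in place rather than by rewriting the log: for each
// matching record, mutateUserEventsInLog sets EvtFlagTakedown in the header,
// fsyncs, and then overwrites the event body with zeros (via zeroReader).
// Record sizes never change, so later offsets stay valid, and playback skips
// flagged records via postDoNotEmit.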

func (dp *DiskPersistence) deleteEventsForUser(ctx context.Context, usr models.Uid, fn string) error {
	return dp.mutateUserEventsInLog(ctx, usr, fn, EvtFlagTakedown, true)
}

func (dp *DiskPersistence) mutateUserEventsInLog(ctx context.Context, usr models.Uid, fn string, flag uint32, zeroEvts bool) error {
	fi, err := os.OpenFile(fn, os.O_RDWR, 0)
	if err != nil {
		return fmt.Errorf("failed to open log file: %w", err)
	}
	defer fi.Close()
	defer fi.Sync()

	scratch := make([]byte, headerSize)
	var offset int64
	for {
		h, err := readHeader(fi, scratch)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}

			return err
		}

		if h.Usr == usr && h.Flags&flag == 0 {
			nflag := h.Flags | flag

			binary.LittleEndian.PutUint32(scratch, nflag)

			if _, err := fi.WriteAt(scratch[:4], offset); err != nil {
				return fmt.Errorf("failed to write updated flag value: %w", err)
			}

			if zeroEvts {
				// sync that write before blanking the event data
				if err := fi.Sync(); err != nil {
					return err
				}

				if _, err := fi.Seek(offset+headerSize, io.SeekStart); err != nil {
					return fmt.Errorf("failed to seek: %w", err)
				}

				if _, err := io.CopyN(fi, &zeroReader{}, h.Len64()); err != nil {
					return err
				}
			}
		}

		offset += headerSize + h.Len64()
		if _, err := fi.Seek(offset, io.SeekStart); err != nil {
			return fmt.Errorf("failed to seek: %w", err)
		}
	}
}

func (dp *DiskPersistence) Flush(ctx context.Context) error {
	dp.lk.Lock()
	defer dp.lk.Unlock()
	if len(dp.evtbuf) > 0 {
		return dp.flushLog(ctx)
	}
	return nil
}

func (dp *DiskPersistence) Shutdown(ctx context.Context) error {
	close(dp.shutdown)
	if err := dp.Flush(ctx); err != nil {
		return err
	}

	dp.logfi.Close()
	return nil
}

func (dp *DiskPersistence) SetEventBroadcaster(f func(*events.XRPCStreamEvent)) {
	dp.broadcast = f
}
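
// Shutdown ordering note: close(dp.shutdown) stops the flush and GC tickers,
// and the final Flush drains any buffered events to disk before the log file
// is closed. A Persist racing with Shutdown can still buffer (or attempt to
// flush) after the file is closed, so callers will generally want to stop
// producers before calling Shutdown.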