Live video on the AT Protocol
at next 69 lines 1.7 kB view raw
1package storage 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "os" 8 "time" 9 10 "golang.org/x/sync/errgroup" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/localdb" 14 "stream.place/streamplace/pkg/log" 15) 16 17const moderationRetention = 120 * time.Second 18 19func StartSegmentCleaner(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI) error { 20 ctx = log.WithLogValues(ctx, "func", "StartSegmentCleaner") 21 g, ctx := errgroup.WithContext(ctx) 22 g.Go(func() error { 23 for { 24 select { 25 case <-ctx.Done(): 26 return nil 27 case <-time.After(60 * time.Second): 28 expiredSegments, err := localDB.GetExpiredSegments(ctx) 29 if err != nil { 30 return err 31 } 32 log.Log(ctx, "Cleaning expired segments", "count", len(expiredSegments)) 33 for _, seg := range expiredSegments { 34 g.Go(func() error { 35 err := deleteSegment(ctx, localDB, cli, seg) 36 if err != nil { 37 log.Error(ctx, "Failed to delete segment", "error", err) 38 } 39 return nil 40 }) 41 42 } 43 } 44 } 45 }) 46 47 return g.Wait() 48} 49 50func deleteSegment(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI, seg localdb.Segment) error { 51 if time.Since(seg.StartTime) < moderationRetention { 52 log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime)) 53 return nil 54 } 55 aqt := aqtime.FromTime(seg.StartTime) 56 fpath, err := cli.SegmentFilePath(seg.RepoDID, fmt.Sprintf("%s.%s", aqt.FileSafeString(), "mp4")) 57 if err != nil { 58 return err 59 } 60 err = os.Remove(fpath) 61 if err != nil && !errors.Is(err, os.ErrNotExist) { 62 return err 63 } 64 err = localDB.DeleteSegment(ctx, seg.ID) 65 if err != nil { 66 return err 67 } 68 return nil 69}