Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.2 69 lines 1.7 kB view raw
1package storage 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "os" 8 "time" 9 10 "golang.org/x/sync/errgroup" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/model" 15) 16 17const moderationRetention = 120 * time.Second 18 19func StartSegmentCleaner(ctx context.Context, mod model.Model, cli *config.CLI) error { 20 ctx = log.WithLogValues(ctx, "func", "StartSegmentCleaner") 21 g, ctx := errgroup.WithContext(ctx) 22 g.Go(func() error { 23 for { 24 select { 25 case <-ctx.Done(): 26 return nil 27 case <-time.After(60 * time.Second): 28 expiredSegments, err := mod.GetExpiredSegments(ctx) 29 if err != nil { 30 return err 31 } 32 log.Log(ctx, "Cleaning expired segments", "count", len(expiredSegments)) 33 for _, seg := range expiredSegments { 34 g.Go(func() error { 35 err := deleteSegment(ctx, mod, cli, seg) 36 if err != nil { 37 log.Error(ctx, "Failed to delete segment", "error", err) 38 } 39 return nil 40 }) 41 42 } 43 } 44 } 45 }) 46 47 return g.Wait() 48} 49 50func deleteSegment(ctx context.Context, mod model.Model, cli *config.CLI, seg model.Segment) error { 51 if time.Since(seg.StartTime) < moderationRetention { 52 log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime)) 53 return nil 54 } 55 aqt := aqtime.FromTime(seg.StartTime) 56 fpath, err := cli.SegmentFilePath(seg.RepoDID, fmt.Sprintf("%s.%s", aqt.FileSafeString(), "mp4")) 57 if err != nil { 58 return err 59 } 60 err = os.Remove(fpath) 61 if err != nil && !errors.Is(err, os.ErrNotExist) { 62 return err 63 } 64 err = mod.DeleteSegment(ctx, seg.ID) 65 if err != nil { 66 return err 67 } 68 return nil 69}