Live video on the AT Protocol
1package storage
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "time"
9
10 "golang.org/x/sync/errgroup"
11 "stream.place/streamplace/pkg/aqtime"
12 "stream.place/streamplace/pkg/config"
13 "stream.place/streamplace/pkg/localdb"
14 "stream.place/streamplace/pkg/log"
15)
16
// moderationRetention is the minimum age, measured from a segment's StartTime,
// that a segment must reach before deleteSegment will remove it; younger
// segments are skipped.
// NOTE(review): the name suggests this window exists so recent segments stay
// available for moderation — confirm.
const moderationRetention = 120 * time.Second
18
19func StartSegmentCleaner(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI) error {
20 ctx = log.WithLogValues(ctx, "func", "StartSegmentCleaner")
21 g, ctx := errgroup.WithContext(ctx)
22 g.Go(func() error {
23 for {
24 select {
25 case <-ctx.Done():
26 return nil
27 case <-time.After(60 * time.Second):
28 expiredSegments, err := localDB.GetExpiredSegments(ctx)
29 if err != nil {
30 return err
31 }
32 log.Log(ctx, "Cleaning expired segments", "count", len(expiredSegments))
33 for _, seg := range expiredSegments {
34 g.Go(func() error {
35 err := deleteSegment(ctx, localDB, cli, seg)
36 if err != nil {
37 log.Error(ctx, "Failed to delete segment", "error", err)
38 }
39 return nil
40 })
41
42 }
43 }
44 }
45 })
46
47 return g.Wait()
48}
49
50func deleteSegment(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI, seg localdb.Segment) error {
51 if time.Since(seg.StartTime) < moderationRetention {
52 log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime))
53 return nil
54 }
55 aqt := aqtime.FromTime(seg.StartTime)
56 fpath, err := cli.SegmentFilePath(seg.RepoDID, fmt.Sprintf("%s.%s", aqt.FileSafeString(), "mp4"))
57 if err != nil {
58 return err
59 }
60 err = os.Remove(fpath)
61 if err != nil && !errors.Is(err, os.ErrNotExist) {
62 return err
63 }
64 err = localDB.DeleteSegment(ctx, seg.ID)
65 if err != nil {
66 return err
67 }
68 return nil
69}