Live video on the AT Protocol
package model

import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"gorm.io/gorm"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/streamplace"
)

// SegmentMediadataVideo describes a single video track within a segment.
type SegmentMediadataVideo struct {
	Width  int `json:"width"`
	Height int `json:"height"`
	FPSNum int `json:"fpsNum"`
	FPSDen int `json:"fpsDen"`
}

// SegmentMediadataAudio describes a single audio track within a segment.
type SegmentMediadataAudio struct {
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}

// SegmentMediaData is the JSON blob stored alongside each segment, carrying
// track metadata and the segment duration.
type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"`
}

// Scan unmarshals a raw JSONB value into SegmentMediaData, implementing the
// sql.Scanner interface so GORM can read the column.
func (j *SegmentMediaData) Scan(value interface{}) error {
	bytes, ok := value.([]byte)
	if !ok {
		return fmt.Errorf("failed to unmarshal JSONB value: %v", value)
	}
	return json.Unmarshal(bytes, j)
}

// Value marshals SegmentMediaData to JSON, implementing the driver.Valuer
// interface so GORM can write the column.
func (j SegmentMediaData) Value() (driver.Value, error) {
	return json.Marshal(j)
}
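
// mediaDataRoundTrip is a minimal sketch (not part of the original file)
// showing how the Scanner/Valuer pair above lets media metadata round-trip
// through a JSON database column. The sample values are invented for
// illustration.
func mediaDataRoundTrip() error {
	md := SegmentMediaData{
		Video:    []*SegmentMediadataVideo{{Width: 1920, Height: 1080, FPSNum: 30, FPSDen: 1}},
		Audio:    []*SegmentMediadataAudio{{Rate: 48000, Channels: 2}},
		Duration: 2000,
	}
	raw, err := md.Value() // JSON bytes as they would be written to the database
	if err != nil {
		return err
	}
	var decoded SegmentMediaData
	return decoded.Scan(raw) // JSON bytes read back into the struct
}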

// Segment is the database row for one chunk of live video, tied to the repo
// and signing key DIDs that produced it.
type Segment struct {
	ID            string            `json:"id" gorm:"primaryKey"`
	SigningKeyDID string            `json:"signingKeyDID" gorm:"column:signing_key_did"`
	SigningKey    *SigningKey       `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime     time.Time         `json:"startTime" gorm:"index:latest_segments"`
	RepoDID       string            `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
	Repo          *Repo             `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title         string            `json:"title"`
	MediaData     *SegmentMediaData `json:"mediaData,omitempty"`
}

// ToStreamplaceSegment converts a database Segment into its
// place.stream.segment lexicon record, validating that media metadata is
// present first.
func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
	aqt := aqtime.FromTime(s.StartTime)
	if s.MediaData == nil {
		return nil, errors.New("media data is nil")
	}
	if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
		return nil, errors.New("video data is nil")
	}
	if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
		return nil, errors.New("audio data is nil")
	}
	duration := s.MediaData.Duration
	return &streamplace.Segment{
		LexiconTypeID: "place.stream.segment",
		Creator:       s.RepoDID,
		Id:            s.ID,
		SigningKey:    s.SigningKeyDID,
		StartTime:     string(aqt),
		Duration:      &duration,
		Video: []*streamplace.Segment_Video{
			{
				Codec:  "h264",
				Width:  int64(s.MediaData.Video[0].Width),
				Height: int64(s.MediaData.Video[0].Height),
				Framerate: &streamplace.Segment_Framerate{
					Num: int64(s.MediaData.Video[0].FPSNum),
					Den: int64(s.MediaData.Video[0].FPSDen),
				},
			},
		},
		Audio: []*streamplace.Segment_Audio{
			{
				Codec:    "opus",
				Rate:     int64(s.MediaData.Audio[0].Rate),
				Channels: int64(s.MediaData.Audio[0].Channels),
			},
		},
	}, nil
}
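
// publishSegmentSketch is a hedged example (not part of the original file) of
// how a caller might turn a stored segment into its place.stream.segment
// lexicon record; marshaling the record to JSON for publishing is an
// assumption about the caller's needs.
func publishSegmentSketch(m *DBModel, id string) ([]byte, error) {
	seg, err := m.GetSegment(id)
	if err != nil {
		return nil, err
	}
	if seg == nil {
		return nil, fmt.Errorf("segment %s not found", id)
	}
	record, err := seg.ToStreamplaceSegment()
	if err != nil {
		return nil, err
	}
	return json.Marshal(record)
}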

// CreateSegment inserts a new segment row.
func (m *DBModel) CreateSegment(seg *Segment) error {
	return m.DB.Model(Segment{}).Create(seg).Error
}

// MostRecentSegments returns the most recent segment for each user, ordered
// most recent first. Only segments from the last 30 seconds are included.
func (m *DBModel) MostRecentSegments() ([]Segment, error) {
	var segments []Segment
	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	// For each repo, select the segment with the latest start_time: the
	// row-value IN subquery pins each repo_did to its newest segment.
	err := m.DB.Table("segments").
		Select("segments.*").
		Where("id IN (?)",
			m.DB.Table("segments").
				Select("id").
				Where("(repo_did, start_time) IN (?)",
					m.DB.Table("segments").
						Select("repo_did, MAX(start_time)").
						Group("repo_did"))).
		Order("start_time DESC").
		Joins("JOIN repos ON segments.repo_did = repos.did").
		Preload("Repo").
		Find(&segments).Error

	if err != nil {
		return nil, err
	}
	if segments == nil {
		return []Segment{}, nil
	}

	// The 30-second liveness window is applied in Go rather than in SQL.
	filteredSegments := []Segment{}
	for _, seg := range segments {
		if seg.StartTime.After(thirtySecondsAgo) {
			filteredSegments = append(filteredSegments, seg)
		}
	}

	return filteredSegments, nil
}
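
// For readers following the GORM chain above, the generated query is roughly
// the following SQL (an approximation, assuming a dialect with row-value IN
// support such as Postgres or SQLite):
//
//	SELECT segments.* FROM segments
//	JOIN repos ON segments.repo_did = repos.did
//	WHERE id IN (
//	    SELECT id FROM segments WHERE (repo_did, start_time) IN (
//	        SELECT repo_did, MAX(start_time) FROM segments GROUP BY repo_did))
//	ORDER BY start_time DESC;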

// LatestSegmentForUser returns the newest segment for the given repo DID.
func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
	var seg Segment
	err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
	if err != nil {
		return nil, err
	}
	return &seg, nil
}

// GetLiveUsers returns the latest segment for each user that has produced a
// segment within the last 30 seconds, i.e. each user who is currently live.
func (m *DBModel) GetLiveUsers() ([]Segment, error) {
	var liveUsers []Segment
	thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("start_time >= ?", thirtySecondsAgo).
		// Correlated subquery: keep only each repo's newest segment.
		Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
		Order("start_time DESC").
		Find(&liveUsers).Error

	if err != nil {
		return nil, err
	}
	if liveUsers == nil {
		return []Segment{}, nil
	}

	return liveUsers, nil
}
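
// liveNowSketch is a minimal sketch (not part of the original file) of
// consuming GetLiveUsers; it touches only fields defined in this file, since
// the shape of Repo is not shown here.
func liveNowSketch(ctx context.Context, m *DBModel) {
	live, err := m.GetLiveUsers()
	if err != nil {
		log.Error(ctx, "Failed to list live users", "error", err)
		return
	}
	for _, seg := range live {
		log.Log(ctx, "Live now", "repo_did", seg.RepoDID, "since", seg.StartTime)
	}
}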

// GetSegment fetches a segment by ID, returning (nil, nil) if no such segment
// exists.
func (m *DBModel) GetSegment(id string) (*Segment, error) {
	var seg Segment

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("id = ?", id).
		First(&seg).Error

	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &seg, nil
}

// StartSegmentCleaner runs SegmentCleaner once immediately, then every minute
// until the context is cancelled.
func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
	err := m.SegmentCleaner(ctx)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			err := m.SegmentCleaner(ctx)
			if err != nil {
				log.Error(ctx, "Failed to clean segments", "error", err)
			}
		}
	}
}
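
// startCleanerSketch is a hedged example (not part of the original file) of
// wiring StartSegmentCleaner into a service: run it in a goroutine at startup
// and it keeps cleaning until the context is cancelled.
func startCleanerSketch(ctx context.Context, m *DBModel) {
	go func() {
		if err := m.StartSegmentCleaner(ctx); err != nil {
			log.Error(ctx, "Segment cleaner exited", "error", err)
		}
	}()
}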

// SegmentCleaner deletes stale segments: for each user, segments older than
// 10 minutes are removed, except that the 10 most recent are always kept.
func (m *DBModel) SegmentCleaner(ctx context.Context) error {
	// Calculate the cutoff time (10 minutes ago)
	cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()

	// Find all unique repo_did values
	var repoDIDs []string
	if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
		log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
		return err
	}

	// For each user, keep their last 10 segments and delete older ones
	for _, repoDID := range repoDIDs {
		// Get IDs of the last 10 segments for this user. Since repoDID came
		// from the segments table, this list should never be empty here.
		var keepSegmentIDs []string
		if err := m.DB.Model(&Segment{}).
			Where("repo_did = ?", repoDID).
			Order("start_time DESC").
			Limit(10).
			Pluck("id", &keepSegmentIDs).Error; err != nil {
			log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
			return err
		}

		// Delete old segments except the ones we want to keep
		result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
			repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})

		if result.Error != nil {
			log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
		} else if result.RowsAffected > 0 {
			log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
		}
	}
	return nil
}