Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "database/sql/driver"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "time"
10
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/aqtime"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/streamplace"
15)
16
17type SegmentMediadataVideo struct {
18 Width int `json:"width"`
19 Height int `json:"height"`
20 FPSNum int `json:"fpsNum"`
21 FPSDen int `json:"fpsDen"`
22 BFrames bool `json:"bframes"`
23}
24
25type SegmentMediadataAudio struct {
26 Rate int `json:"rate"`
27 Channels int `json:"channels"`
28}
29
30type SegmentMediaData struct {
31 Video []*SegmentMediadataVideo `json:"video"`
32 Audio []*SegmentMediadataAudio `json:"audio"`
33 Duration int64 `json:"duration"`
34 Size int `json:"size"`
35}
36
37// Scan scan value into Jsonb, implements sql.Scanner interface
38func (j *SegmentMediaData) Scan(value any) error {
39 bytes, ok := value.([]byte)
40 if !ok {
41 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
42 }
43
44 result := SegmentMediaData{}
45 err := json.Unmarshal(bytes, &result)
46 *j = SegmentMediaData(result)
47 return err
48}
49
50// Value return json value, implement driver.Valuer interface
51func (j SegmentMediaData) Value() (driver.Value, error) {
52 return json.Marshal(j)
53}
54
55// ContentRights represents content rights and attribution information
56type ContentRights struct {
57 CopyrightNotice *string `json:"copyrightNotice,omitempty"`
58 CopyrightYear *int64 `json:"copyrightYear,omitempty"`
59 Creator *string `json:"creator,omitempty"`
60 CreditLine *string `json:"creditLine,omitempty"`
61 License *string `json:"license,omitempty"`
62}
63
64// Scan scan value into ContentRights, implements sql.Scanner interface
65func (c *ContentRights) Scan(value any) error {
66 if value == nil {
67 *c = ContentRights{}
68 return nil
69 }
70 bytes, ok := value.([]byte)
71 if !ok {
72 return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value))
73 }
74
75 result := ContentRights{}
76 err := json.Unmarshal(bytes, &result)
77 *c = ContentRights(result)
78 return err
79}
80
81// Value return json value, implement driver.Valuer interface
82func (c ContentRights) Value() (driver.Value, error) {
83 return json.Marshal(c)
84}
85
86// DistributionPolicy represents distribution policy information
87type DistributionPolicy struct {
88 ExpiresAt *time.Time `json:"expiresAt,omitempty"`
89}
90
91// Scan scan value into DistributionPolicy, implements sql.Scanner interface
92func (d *DistributionPolicy) Scan(value any) error {
93 if value == nil {
94 *d = DistributionPolicy{}
95 return nil
96 }
97 bytes, ok := value.([]byte)
98 if !ok {
99 return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value))
100 }
101
102 result := DistributionPolicy{}
103 err := json.Unmarshal(bytes, &result)
104 *d = DistributionPolicy(result)
105 return err
106}
107
108// Value return json value, implement driver.Valuer interface
109func (d DistributionPolicy) Value() (driver.Value, error) {
110 return json.Marshal(d)
111}
112
113// ContentWarningsSlice is a custom type for storing content warnings as JSON in the database
114type ContentWarningsSlice []string
115
116// Scan scan value into ContentWarningsSlice, implements sql.Scanner interface
117func (c *ContentWarningsSlice) Scan(value any) error {
118 if value == nil {
119 *c = ContentWarningsSlice{}
120 return nil
121 }
122 bytes, ok := value.([]byte)
123 if !ok {
124 return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value))
125 }
126
127 result := ContentWarningsSlice{}
128 err := json.Unmarshal(bytes, &result)
129 *c = ContentWarningsSlice(result)
130 return err
131}
132
133// Value return json value, implement driver.Valuer interface
134func (c ContentWarningsSlice) Value() (driver.Value, error) {
135 return json.Marshal(c)
136}
137
138type Segment struct {
139 ID string `json:"id" gorm:"primaryKey"`
140 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"`
141 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
142 StartTime time.Time `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"`
143 RepoDID string `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"`
144 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
145 Title string `json:"title"`
146 Size int `json:"size" gorm:"column:size"`
147 MediaData *SegmentMediaData `json:"mediaData,omitempty"`
148 ContentWarnings ContentWarningsSlice `json:"contentWarnings,omitempty"`
149 ContentRights *ContentRights `json:"contentRights,omitempty"`
150 DistributionPolicy *DistributionPolicy `json:"distributionPolicy,omitempty"`
151 DeleteAfter *time.Time `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"`
152}
153
154func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
155 aqt := aqtime.FromTime(s.StartTime)
156 if s.MediaData == nil {
157 return nil, fmt.Errorf("media data is nil")
158 }
159 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
160 return nil, fmt.Errorf("video data is nil")
161 }
162 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
163 return nil, fmt.Errorf("audio data is nil")
164 }
165 duration := s.MediaData.Duration
166 sizei64 := int64(s.Size)
167
168 // Convert model metadata to streamplace metadata
169 var contentRights *streamplace.MetadataContentRights
170 if s.ContentRights != nil {
171 contentRights = &streamplace.MetadataContentRights{
172 CopyrightNotice: s.ContentRights.CopyrightNotice,
173 CopyrightYear: s.ContentRights.CopyrightYear,
174 Creator: s.ContentRights.Creator,
175 CreditLine: s.ContentRights.CreditLine,
176 License: s.ContentRights.License,
177 }
178 }
179
180 var contentWarnings *streamplace.MetadataContentWarnings
181 if len(s.ContentWarnings) > 0 {
182 contentWarnings = &streamplace.MetadataContentWarnings{
183 Warnings: []string(s.ContentWarnings),
184 }
185 }
186
187 var distributionPolicy *streamplace.MetadataDistributionPolicy
188 if s.DistributionPolicy != nil && s.DistributionPolicy.ExpiresAt != nil {
189 // Convert the absolute timestamp back to a duration (in seconds) from segment start
190 startTimeUnix := s.StartTime.Unix()
191 expiresAtUnix := s.DistributionPolicy.ExpiresAt.Unix()
192 deleteAfterSecs := expiresAtUnix - startTimeUnix
193 distributionPolicy = &streamplace.MetadataDistributionPolicy{
194 DeleteAfter: &deleteAfterSecs,
195 }
196 }
197
198 return &streamplace.Segment{
199 LexiconTypeID: "place.stream.segment",
200 Creator: s.RepoDID,
201 Id: s.ID,
202 SigningKey: s.SigningKeyDID,
203 StartTime: string(aqt),
204 Duration: &duration,
205 Size: &sizei64,
206 ContentRights: contentRights,
207 ContentWarnings: contentWarnings,
208 DistributionPolicy: distributionPolicy,
209 Video: []*streamplace.Segment_Video{
210 {
211 Codec: "h264",
212 Width: int64(s.MediaData.Video[0].Width),
213 Height: int64(s.MediaData.Video[0].Height),
214 Framerate: &streamplace.Segment_Framerate{
215 Num: int64(s.MediaData.Video[0].FPSNum),
216 Den: int64(s.MediaData.Video[0].FPSDen),
217 },
218 Bframes: &s.MediaData.Video[0].BFrames,
219 },
220 },
221 Audio: []*streamplace.Segment_Audio{
222 {
223 Codec: "opus",
224 Rate: int64(s.MediaData.Audio[0].Rate),
225 Channels: int64(s.MediaData.Audio[0].Channels),
226 },
227 },
228 }, nil
229}
230
231func (m *DBModel) CreateSegment(seg *Segment) error {
232 err := m.DB.Model(Segment{}).Create(seg).Error
233 if err != nil {
234 return err
235 }
236 return nil
237}
238
239// should return the most recent segment for each user, ordered by most recent first
240// only includes segments from the last 30 seconds
241func (m *DBModel) MostRecentSegments() ([]Segment, error) {
242 var segments []Segment
243 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
244
245 err := m.DB.Table("segments").
246 Select("segments.*").
247 Where("start_time > ?", thirtySecondsAgo.UTC()).
248 Order("start_time DESC").
249 Find(&segments).Error
250 if err != nil {
251 return nil, err
252 }
253 if segments == nil {
254 return []Segment{}, nil
255 }
256
257 segmentMap := make(map[string]Segment)
258 for _, seg := range segments {
259 prev, ok := segmentMap[seg.RepoDID]
260 if !ok {
261 segmentMap[seg.RepoDID] = seg
262 } else {
263 if seg.StartTime.After(prev.StartTime) {
264 segmentMap[seg.RepoDID] = seg
265 }
266 }
267 }
268
269 filteredSegments := []Segment{}
270 for _, seg := range segmentMap {
271 filteredSegments = append(filteredSegments, seg)
272 }
273
274 return filteredSegments, nil
275}
276
277func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
278 var seg Segment
279 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
280 if err != nil {
281 return nil, err
282 }
283 return &seg, nil
284}
285
286func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) {
287 var segs []Segment
288 if before == nil {
289 later := time.Now().Add(1000 * time.Hour)
290 before = &later
291 }
292 if after == nil {
293 earlier := time.Time{}
294 after = &earlier
295 }
296 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
297 if err != nil {
298 return nil, err
299 }
300 return segs, nil
301}
302
303func (m *DBModel) GetSegment(id string) (*Segment, error) {
304 var seg Segment
305
306 err := m.DB.Model(&Segment{}).
307 Preload("Repo").
308 Where("id = ?", id).
309 First(&seg).Error
310
311 if errors.Is(err, gorm.ErrRecordNotFound) {
312 return nil, nil
313 }
314 if err != nil {
315 return nil, err
316 }
317
318 return &seg, nil
319}
320
321func (m *DBModel) GetExpiredSegments(ctx context.Context) ([]Segment, error) {
322
323 var expiredSegments []Segment
324 now := time.Now()
325 err := m.DB.
326 Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()).
327 Find(&expiredSegments).Error
328 if err != nil {
329 return nil, err
330 }
331
332 return expiredSegments, nil
333}
334
335func (m *DBModel) DeleteSegment(ctx context.Context, id string) error {
336 return m.DB.Delete(&Segment{}, "id = ?", id).Error
337}
338
339func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
340 err := m.SegmentCleaner(ctx)
341 if err != nil {
342 return err
343 }
344 ticker := time.NewTicker(1 * time.Minute)
345 defer ticker.Stop()
346
347 for {
348 select {
349 case <-ctx.Done():
350 return nil
351 case <-ticker.C:
352 err := m.SegmentCleaner(ctx)
353 if err != nil {
354 log.Error(ctx, "Failed to clean segments", "error", err)
355 }
356 }
357 }
358}
359
360func (m *DBModel) SegmentCleaner(ctx context.Context) error {
361 // Calculate the cutoff time (10 minutes ago)
362 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()
363
364 // Find all unique repo_did values
365 var repoDIDs []string
366 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
367 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
368 return err
369 }
370
371 // For each user, keep their last 10 segments and delete older ones
372 for _, repoDID := range repoDIDs {
373 // Get IDs of the last 10 segments for this user
374 var keepSegmentIDs []string
375 if err := m.DB.Model(&Segment{}).
376 Where("repo_did = ?", repoDID).
377 Order("start_time DESC").
378 Limit(10).
379 Pluck("id", &keepSegmentIDs).Error; err != nil {
380 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
381 return err
382 }
383
384 // Delete old segments except the ones we want to keep
385 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
386 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})
387
388 if result.Error != nil {
389 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
390 } else if result.RowsAffected > 0 {
391 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
392 }
393 }
394 return nil
395}