Live video on the AT Protocol
package model

import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"gorm.io/gorm"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/streamplace"
)

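// SegmentMediadataVideo describes a single video track in a segment's media metadata.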
type SegmentMediadataVideo struct {
	Width   int  `json:"width"`
	Height  int  `json:"height"`
	FPSNum  int  `json:"fpsNum"`
	FPSDen  int  `json:"fpsDen"`
	BFrames bool `json:"bframes"`
}

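// SegmentMediadataAudio describes a single audio track in a segment's media metadata.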
type SegmentMediadataAudio struct {
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}

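// SegmentMediaData holds the probed track metadata plus the duration and size of a segment; it is stored as a JSON column.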
type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"`
	Size     int                      `json:"size"`
}

// Scan unmarshals a JSON database value into SegmentMediaData, implementing the sql.Scanner interface
func (j *SegmentMediaData) Scan(value any) error {
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
	}

	result := SegmentMediaData{}
	err := json.Unmarshal(bytes, &result)
	*j = SegmentMediaData(result)
	return err
}

// Value returns the JSON encoding of the media data, implementing the driver.Valuer interface
func (j SegmentMediaData) Value() (driver.Value, error) {
	return json.Marshal(j)
}

// ContentRights represents content rights and attribution information
type ContentRights struct {
	CopyrightNotice *string `json:"copyrightNotice,omitempty"`
	CopyrightYear   *int64  `json:"copyrightYear,omitempty"`
	Creator         *string `json:"creator,omitempty"`
	CreditLine      *string `json:"creditLine,omitempty"`
	License         *string `json:"license,omitempty"`
}

// Scan unmarshals a JSON database value into ContentRights, implementing the sql.Scanner interface
func (c *ContentRights) Scan(value any) error {
	if value == nil {
		*c = ContentRights{}
		return nil
	}
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value))
	}

	result := ContentRights{}
	err := json.Unmarshal(bytes, &result)
	*c = ContentRights(result)
	return err
}

// Value returns the JSON encoding of the content rights, implementing the driver.Valuer interface
func (c ContentRights) Value() (driver.Value, error) {
	return json.Marshal(c)
}

// DistributionPolicy represents distribution policy information
type DistributionPolicy struct {
	DeleteAfterSeconds *int64 `json:"deleteAfterSeconds,omitempty"`
}

// Scan unmarshals a JSON database value into DistributionPolicy, implementing the sql.Scanner interface
func (d *DistributionPolicy) Scan(value any) error {
	if value == nil {
		*d = DistributionPolicy{}
		return nil
	}
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value))
	}

	result := DistributionPolicy{}
	err := json.Unmarshal(bytes, &result)
	*d = DistributionPolicy(result)
	return err
}

// Value returns the JSON encoding of the distribution policy, implementing the driver.Valuer interface
func (d DistributionPolicy) Value() (driver.Value, error) {
	return json.Marshal(d)
}

// ContentWarningsSlice is a custom type for storing content warnings as JSON in the database
type ContentWarningsSlice []string

// Scan unmarshals a JSON database value into ContentWarningsSlice, implementing the sql.Scanner interface
func (c *ContentWarningsSlice) Scan(value any) error {
	if value == nil {
		*c = ContentWarningsSlice{}
		return nil
	}
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value))
	}

	result := ContentWarningsSlice{}
	err := json.Unmarshal(bytes, &result)
	*c = ContentWarningsSlice(result)
	return err
}

// Value returns the JSON encoding of the content warnings, implementing the driver.Valuer interface
func (c ContentWarningsSlice) Value() (driver.Value, error) {
	return json.Marshal(c)
}

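// Segment is the database model for a stored video segment and its metadata.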
type Segment struct {
	ID                 string               `json:"id" gorm:"primaryKey"`
	SigningKeyDID      string               `json:"signingKeyDID" gorm:"column:signing_key_did"`
	SigningKey         *SigningKey          `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime          time.Time            `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"`
	RepoDID            string               `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"`
	Repo               *Repo                `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title              string               `json:"title"`
	Size               int                  `json:"size" gorm:"column:size"`
	MediaData          *SegmentMediaData    `json:"mediaData,omitempty"`
	ContentWarnings    ContentWarningsSlice `json:"contentWarnings,omitempty"`
	ContentRights      *ContentRights       `json:"contentRights,omitempty"`
	DistributionPolicy *DistributionPolicy  `json:"distributionPolicy,omitempty"`
	DeleteAfter        *time.Time           `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"`
}

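// ToStreamplaceSegment converts the stored segment into its place.stream.segment lexicon representation, returning an error if the media metadata is missing.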
func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
	aqt := aqtime.FromTime(s.StartTime)
	if s.MediaData == nil {
		return nil, fmt.Errorf("media data is nil")
	}
	if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
		return nil, fmt.Errorf("video data is nil")
	}
	if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
		return nil, fmt.Errorf("audio data is nil")
	}
	duration := s.MediaData.Duration
	sizei64 := int64(s.Size)

	// Convert model metadata to streamplace metadata
	var contentRights *streamplace.MetadataContentRights
	if s.ContentRights != nil {
		contentRights = &streamplace.MetadataContentRights{
			CopyrightNotice: s.ContentRights.CopyrightNotice,
			CopyrightYear: s.ContentRights.CopyrightYear,
			Creator: s.ContentRights.Creator,
			CreditLine: s.ContentRights.CreditLine,
			License: s.ContentRights.License,
		}
	}

	var contentWarnings *streamplace.MetadataContentWarnings
	if len(s.ContentWarnings) > 0 {
		contentWarnings = &streamplace.MetadataContentWarnings{
			Warnings: []string(s.ContentWarnings),
		}
	}

	var distributionPolicy *streamplace.MetadataDistributionPolicy
	if s.DistributionPolicy != nil && s.DistributionPolicy.DeleteAfterSeconds != nil {
		distributionPolicy = &streamplace.MetadataDistributionPolicy{
			DeleteAfter: s.DistributionPolicy.DeleteAfterSeconds,
		}
	}

	return &streamplace.Segment{
		LexiconTypeID: "place.stream.segment",
		Creator: s.RepoDID,
		Id: s.ID,
		SigningKey: s.SigningKeyDID,
		StartTime: string(aqt),
		Duration: &duration,
		Size: &sizei64,
		ContentRights: contentRights,
		ContentWarnings: contentWarnings,
		DistributionPolicy: distributionPolicy,
		Video: []*streamplace.Segment_Video{
			{
				Codec: "h264",
				Width: int64(s.MediaData.Video[0].Width),
				Height: int64(s.MediaData.Video[0].Height),
				Framerate: &streamplace.Segment_Framerate{
					Num: int64(s.MediaData.Video[0].FPSNum),
					Den: int64(s.MediaData.Video[0].FPSDen),
				},
				Bframes: &s.MediaData.Video[0].BFrames,
			},
		},
		Audio: []*streamplace.Segment_Audio{
			{
				Codec: "opus",
				Rate: int64(s.MediaData.Audio[0].Rate),
				Channels: int64(s.MediaData.Audio[0].Channels),
			},
		},
	}, nil
}

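// CreateSegment inserts a new segment row into the database.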
func (m *DBModel) CreateSegment(seg *Segment) error {
	err := m.DB.Model(Segment{}).Create(seg).Error
	if err != nil {
		return err
	}
	return nil
}

// MostRecentSegments returns the most recent segment for each user, ordered
// most recent first. Only segments started within the last 30 seconds are
// included.
func (m *DBModel) MostRecentSegments() ([]Segment, error) {
	var segments []Segment
	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	err := m.DB.Table("segments").
		Select("segments.*").
		Where("start_time > ?", thirtySecondsAgo.UTC()).
		Order("start_time DESC").
		Find(&segments).Error
	if err != nil {
		return nil, err
	}
	if segments == nil {
		return []Segment{}, nil
	}

	// The query is ordered newest first, so the first segment we see for each
	// repo DID is that user's most recent one; keep it and skip the rest. This
	// also preserves the most-recent-first ordering of the result.
	seen := make(map[string]bool)
	filteredSegments := []Segment{}
	for _, seg := range segments {
		if seen[seg.RepoDID] {
			continue
		}
		seen[seg.RepoDID] = true
		filteredSegments = append(filteredSegments, seg)
	}

	return filteredSegments, nil
}

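// LatestSegmentForUser returns the newest segment for the given repo DID.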
func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
	var seg Segment
	err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
	if err != nil {
		return nil, err
	}
	return &seg, nil
}

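// FilterLiveRepoDIDs returns the subset of the given repo DIDs that have a segment starting within the last 30 seconds.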
func (m *DBModel) FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) {
	if len(repoDIDs) == 0 {
		return []string{}, nil
	}

	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	var liveDIDs []string

	err := m.DB.Table("segments").
		Select("DISTINCT repo_did").
		Where("repo_did IN ? AND start_time > ?", repoDIDs, thirtySecondsAgo.UTC()).
		Pluck("repo_did", &liveDIDs).Error

	if err != nil {
		return nil, err
	}

	return liveDIDs, nil
}

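// LatestSegmentsForUser returns up to limit segments for the given repo DID, newest first, optionally bounded by the before and after times.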
func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) {
	var segs []Segment
	if before == nil {
		later := time.Now().Add(1000 * time.Hour)
		before = &later
	}
	if after == nil {
		earlier := time.Time{}
		after = &earlier
	}
	err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
	if err != nil {
		return nil, err
	}
	return segs, nil
}

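// GetSegment returns the segment with the given ID, with its Repo preloaded, or nil if no such segment exists.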
func (m *DBModel) GetSegment(id string) (*Segment, error) {
	var seg Segment

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("id = ?", id).
		First(&seg).Error

	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &seg, nil
}

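// GetExpiredSegments returns all segments whose delete_after time has already passed.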
func (m *DBModel) GetExpiredSegments(ctx context.Context) ([]Segment, error) {
	var expiredSegments []Segment
	now := time.Now()
	err := m.DB.
		Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()).
		Find(&expiredSegments).Error
	if err != nil {
		return nil, err
	}

	return expiredSegments, nil
}

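// DeleteSegment removes the segment with the given ID.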
func (m *DBModel) DeleteSegment(ctx context.Context, id string) error {
	return m.DB.Delete(&Segment{}, "id = ?", id).Error
}

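// StartSegmentCleaner runs SegmentCleaner immediately and then once per minute until the context is canceled.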
func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
	err := m.SegmentCleaner(ctx)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			err := m.SegmentCleaner(ctx)
			if err != nil {
				log.Error(ctx, "Failed to clean segments", "error", err)
			}
		}
	}
}

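// SegmentCleaner deletes, for each user, segments older than ten minutes while always keeping that user's ten most recent segments.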
func (m *DBModel) SegmentCleaner(ctx context.Context) error {
	// Calculate the cutoff time (10 minutes ago)
	cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()

	// Find all unique repo_did values
	var repoDIDs []string
	if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
		log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
		return err
	}

	// For each user, keep their last 10 segments and delete older ones
	for _, repoDID := range repoDIDs {
		// Get IDs of the last 10 segments for this user
		var keepSegmentIDs []string
		if err := m.DB.Model(&Segment{}).
			Where("repo_did = ?", repoDID).
			Order("start_time DESC").
			Limit(10).
			Pluck("id", &keepSegmentIDs).Error; err != nil {
			log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
			return err
		}

		// Delete old segments except the ones we want to keep
		result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
			repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})

		if result.Error != nil {
			log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
		} else if result.RowsAffected > 0 {
			log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
		}
	}
	return nil
}