Live video on the AT Protocol
package localdb

import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"gorm.io/gorm"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/streamplace"
)

// SegmentMediadataVideo describes a single video track within a segment.
type SegmentMediadataVideo struct {
	Width   int  `json:"width"`
	Height  int  `json:"height"`
	FPSNum  int  `json:"fpsNum"`
	FPSDen  int  `json:"fpsDen"`
	BFrames bool `json:"bframes"`
}

// SegmentMediadataAudio describes a single audio track within a segment.
type SegmentMediadataAudio struct {
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}

// SegmentMediaData is the probed media metadata for a segment, stored as a JSON column.
type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"`
	Size     int                      `json:"size"`
}

// Scan scans a JSON value into SegmentMediaData, implementing the sql.Scanner interface.
func (j *SegmentMediaData) Scan(value any) error {
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("failed to unmarshal JSONB value:", value))
	}

	result := SegmentMediaData{}
	err := json.Unmarshal(bytes, &result)
	*j = result
	return err
}

// Value returns the JSON encoding, implementing the driver.Valuer interface.
func (j SegmentMediaData) Value() (driver.Value, error) {
	return json.Marshal(j)
}

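// The Scanner/Valuer pair above (and the equivalent pairs below for
// ContentRights, DistributionPolicy, and ContentWarningsSlice) is what lets
// GORM persist these structs as a single JSON column. A minimal round-trip
// sketch, assuming a plain []byte column value; the literal values are
// illustrative only:
//
//	md := SegmentMediaData{
//		Video:    []*SegmentMediadataVideo{{Width: 1920, Height: 1080, FPSNum: 30, FPSDen: 1}},
//		Audio:    []*SegmentMediadataAudio{{Rate: 48000, Channels: 2}},
//		Duration: 2000,
//	}
//	v, _ := md.Value()           // JSON bytes written to the DB column
//	var decoded SegmentMediaData
//	_ = decoded.Scan(v.([]byte)) // reverses the encoding on read
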
// ContentRights represents content rights and attribution information.
type ContentRights struct {
	CopyrightNotice *string `json:"copyrightNotice,omitempty"`
	CopyrightYear   *int64  `json:"copyrightYear,omitempty"`
	Creator         *string `json:"creator,omitempty"`
	CreditLine      *string `json:"creditLine,omitempty"`
	License         *string `json:"license,omitempty"`
}

// Scan scans a JSON value into ContentRights, implementing the sql.Scanner interface.
func (c *ContentRights) Scan(value any) error {
	if value == nil {
		*c = ContentRights{}
		return nil
	}
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("failed to unmarshal ContentRights value:", value))
	}

	result := ContentRights{}
	err := json.Unmarshal(bytes, &result)
	*c = result
	return err
}

// Value returns the JSON encoding, implementing the driver.Valuer interface.
func (c ContentRights) Value() (driver.Value, error) {
	return json.Marshal(c)
}

// DistributionPolicy represents distribution policy information.
type DistributionPolicy struct {
	DeleteAfterSeconds *int64 `json:"deleteAfterSeconds,omitempty"`
}

// Scan scans a JSON value into DistributionPolicy, implementing the sql.Scanner interface.
func (d *DistributionPolicy) Scan(value any) error {
	if value == nil {
		*d = DistributionPolicy{}
		return nil
	}
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("failed to unmarshal DistributionPolicy value:", value))
	}

	result := DistributionPolicy{}
	err := json.Unmarshal(bytes, &result)
	*d = result
	return err
}

// Value returns the JSON encoding, implementing the driver.Valuer interface.
func (d DistributionPolicy) Value() (driver.Value, error) {
	return json.Marshal(d)
}

// ContentWarningsSlice is a custom type for storing content warnings as JSON in the database.
type ContentWarningsSlice []string

// Scan scans a JSON value into ContentWarningsSlice, implementing the sql.Scanner interface.
func (c *ContentWarningsSlice) Scan(value any) error {
	if value == nil {
		*c = ContentWarningsSlice{}
		return nil
	}
	bytes, ok := value.([]byte)
	if !ok {
		return errors.New(fmt.Sprint("failed to unmarshal ContentWarningsSlice value:", value))
	}

	result := ContentWarningsSlice{}
	err := json.Unmarshal(bytes, &result)
	*c = result
	return err
}

// Value returns the JSON encoding, implementing the driver.Valuer interface.
func (c ContentWarningsSlice) Value() (driver.Value, error) {
	return json.Marshal(c)
}

// Segment is the local database representation of a single media segment.
type Segment struct {
	ID                 string               `json:"id" gorm:"primaryKey"`
	SigningKeyDID      string               `json:"signingKeyDID" gorm:"column:signing_key_did"`
	StartTime          time.Time            `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"`
	RepoDID            string               `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"`
	Title              string               `json:"title"`
	Size               int                  `json:"size" gorm:"column:size"`
	MediaData          *SegmentMediaData    `json:"mediaData,omitempty"`
	ContentWarnings    ContentWarningsSlice `json:"contentWarnings,omitempty"`
	ContentRights      *ContentRights       `json:"contentRights,omitempty"`
	DistributionPolicy *DistributionPolicy  `json:"distributionPolicy,omitempty"`
	DeleteAfter        *time.Time           `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"`
}

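// A hypothetical Segment value, sketched for illustration only; the ID, DIDs,
// and media numbers below are made up, and only the struct fields above are
// real:
//
//	seg := Segment{
//		ID:            "segment-id",       // placeholder record key
//		RepoDID:       "did:plc:example",  // placeholder creator DID
//		SigningKeyDID: "did:key:example",  // placeholder signing key DID
//		StartTime:     time.Now().UTC(),
//		Size:          1_500_000,
//		MediaData: &SegmentMediaData{
//			Video: []*SegmentMediadataVideo{{Width: 1280, Height: 720, FPSNum: 30, FPSDen: 1}},
//			Audio: []*SegmentMediadataAudio{{Rate: 48000, Channels: 2}},
//		},
//	}
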
// ToStreamplaceSegment converts a local Segment row into a place.stream.segment
// lexicon record. It requires probed media data with at least one video and one
// audio track.
func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
	aqt := aqtime.FromTime(s.StartTime)
	if s.MediaData == nil {
		return nil, fmt.Errorf("media data is nil")
	}
	if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
		return nil, fmt.Errorf("video data is nil")
	}
	if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
		return nil, fmt.Errorf("audio data is nil")
	}
	duration := s.MediaData.Duration
	sizei64 := int64(s.Size)

	// Convert model metadata to streamplace metadata
	var contentRights *streamplace.MetadataContentRights
	if s.ContentRights != nil {
		contentRights = &streamplace.MetadataContentRights{
			CopyrightNotice: s.ContentRights.CopyrightNotice,
			CopyrightYear:   s.ContentRights.CopyrightYear,
			Creator:         s.ContentRights.Creator,
			CreditLine:      s.ContentRights.CreditLine,
			License:         s.ContentRights.License,
		}
	}

	var contentWarnings *streamplace.MetadataContentWarnings
	if len(s.ContentWarnings) > 0 {
		contentWarnings = &streamplace.MetadataContentWarnings{
			Warnings: []string(s.ContentWarnings),
		}
	}

	var distributionPolicy *streamplace.MetadataDistributionPolicy
	if s.DistributionPolicy != nil && s.DistributionPolicy.DeleteAfterSeconds != nil {
		distributionPolicy = &streamplace.MetadataDistributionPolicy{
			DeleteAfter: s.DistributionPolicy.DeleteAfterSeconds,
		}
	}

	return &streamplace.Segment{
		LexiconTypeID:      "place.stream.segment",
		Creator:            s.RepoDID,
		Id:                 s.ID,
		SigningKey:         s.SigningKeyDID,
		StartTime:          string(aqt),
		Duration:           &duration,
		Size:               &sizei64,
		ContentRights:      contentRights,
		ContentWarnings:    contentWarnings,
		DistributionPolicy: distributionPolicy,
		Video: []*streamplace.Segment_Video{
			{
				Codec:  "h264",
				Width:  int64(s.MediaData.Video[0].Width),
				Height: int64(s.MediaData.Video[0].Height),
				Framerate: &streamplace.Segment_Framerate{
					Num: int64(s.MediaData.Video[0].FPSNum),
					Den: int64(s.MediaData.Video[0].FPSDen),
				},
				Bframes: &s.MediaData.Video[0].BFrames,
			},
		},
		Audio: []*streamplace.Segment_Audio{
			{
				Codec:    "opus",
				Rate:     int64(s.MediaData.Audio[0].Rate),
				Channels: int64(s.MediaData.Audio[0].Channels),
			},
		},
	}, nil
}

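// Sketch of how a caller might use ToStreamplaceSegment when publishing a
// segment record; the surrounding seg value and error handling are assumptions,
// not part of this package:
//
//	record, err := seg.ToStreamplaceSegment()
//	if err != nil {
//		// MediaData, or its first video/audio track, was missing
//		return err
//	}
//	// record.LexiconTypeID is "place.stream.segment"; the record can now be
//	// written to the creator's repo as an AT Protocol record.
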
// CreateSegment inserts a new segment row into the local database.
func (m *LocalDatabase) CreateSegment(seg *Segment) error {
	return m.DB.Model(Segment{}).Create(seg).Error
}

// MostRecentSegments returns the most recent segment for each user, considering
// only segments that started within the last 30 seconds.
func (m *LocalDatabase) MostRecentSegments() ([]Segment, error) {
	var segments []Segment
	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	err := m.DB.Table("segments").
		Select("segments.*").
		Where("start_time > ?", thirtySecondsAgo.UTC()).
		Order("start_time DESC").
		Find(&segments).Error
	if err != nil {
		return nil, err
	}
	if segments == nil {
		return []Segment{}, nil
	}

	// Keep only the newest segment per repo DID.
	segmentMap := make(map[string]Segment)
	for _, seg := range segments {
		prev, ok := segmentMap[seg.RepoDID]
		if !ok || seg.StartTime.After(prev.StartTime) {
			segmentMap[seg.RepoDID] = seg
		}
	}

	filteredSegments := []Segment{}
	for _, seg := range segmentMap {
		filteredSegments = append(filteredSegments, seg)
	}

	return filteredSegments, nil
}

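// Usage sketch: listing who is currently live. The db variable stands in for a
// *LocalDatabase and is an assumption of this example:
//
//	segments, err := db.MostRecentSegments()
//	if err != nil {
//		return err
//	}
//	for _, seg := range segments {
//		fmt.Println(seg.RepoDID, "is live as of", seg.StartTime)
//	}
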
// LatestSegmentForUser returns the most recent segment for the given repo DID.
func (m *LocalDatabase) LatestSegmentForUser(user string) (*Segment, error) {
	var seg Segment
	err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
	if err != nil {
		return nil, err
	}
	return &seg, nil
}

// FilterLiveRepoDIDs returns the subset of the given repo DIDs that have a
// segment starting within the last 30 seconds.
func (m *LocalDatabase) FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) {
	if len(repoDIDs) == 0 {
		return []string{}, nil
	}

	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	var liveDIDs []string
	err := m.DB.Table("segments").
		Select("DISTINCT repo_did").
		Where("repo_did IN ? AND start_time > ?", repoDIDs, thirtySecondsAgo.UTC()).
		Pluck("repo_did", &liveDIDs).Error
	if err != nil {
		return nil, err
	}

	return liveDIDs, nil
}

// LatestSegmentsForUser returns up to limit segments for the given repo DID,
// newest first, optionally bounded by the before and after timestamps.
func (m *LocalDatabase) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) {
	var segs []Segment
	if before == nil {
		later := time.Now().Add(1000 * time.Hour)
		before = &later
	}
	if after == nil {
		earlier := time.Time{}
		after = &earlier
	}
	err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
	if err != nil {
		return nil, err
	}
	return segs, nil
}

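// Usage sketch for paging backwards through a user's segments; did, the page
// size, and the loop shape are illustrative assumptions:
//
//	var cursor *time.Time
//	for {
//		segs, err := db.LatestSegmentsForUser(did, 50, cursor, nil)
//		if err != nil || len(segs) == 0 {
//			break
//		}
//		// segs are newest-first; the oldest StartTime becomes the next "before" bound
//		last := segs[len(segs)-1].StartTime
//		cursor = &last
//	}
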
// GetSegment returns the segment with the given ID, or nil if it does not exist.
func (m *LocalDatabase) GetSegment(id string) (*Segment, error) {
	var seg Segment

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("id = ?", id).
		First(&seg).Error

	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &seg, nil
}

// GetExpiredSegments returns all segments whose delete_after timestamp has passed.
func (m *LocalDatabase) GetExpiredSegments(ctx context.Context) ([]Segment, error) {
	var expiredSegments []Segment
	now := time.Now()
	err := m.DB.
		Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()).
		Find(&expiredSegments).Error
	if err != nil {
		return nil, err
	}

	return expiredSegments, nil
}

// DeleteSegment removes the segment with the given ID.
func (m *LocalDatabase) DeleteSegment(ctx context.Context, id string) error {
	return m.DB.Delete(&Segment{}, "id = ?", id).Error
}

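// A caller could combine GetExpiredSegments and DeleteSegment to enforce the
// delete_after policy; a minimal sketch, where the db variable and the error
// handling are assumptions rather than part of this package:
//
//	expired, err := db.GetExpiredSegments(ctx)
//	if err != nil {
//		return err
//	}
//	for _, seg := range expired {
//		if err := db.DeleteSegment(ctx, seg.ID); err != nil {
//			log.Error(ctx, "failed to delete expired segment", "id", seg.ID, "error", err)
//		}
//	}
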
// StartSegmentCleaner runs the segment cleaner immediately and then once per
// minute until the context is cancelled.
func (m *LocalDatabase) StartSegmentCleaner(ctx context.Context) error {
	err := m.SegmentCleaner(ctx)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			err := m.SegmentCleaner(ctx)
			if err != nil {
				log.Error(ctx, "Failed to clean segments", "error", err)
			}
		}
	}
}

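// StartSegmentCleaner blocks until ctx is cancelled, so a caller would
// typically run it in its own goroutine; a sketch, with the surrounding setup
// assumed:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go func() {
//		if err := db.StartSegmentCleaner(ctx); err != nil {
//			log.Error(ctx, "segment cleaner stopped", "error", err)
//		}
//	}()
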
// SegmentCleaner removes stale segments: for each user it keeps the 10 newest
// segments and deletes anything older than the 10-minute cutoff.
func (m *LocalDatabase) SegmentCleaner(ctx context.Context) error {
	// Calculate the cutoff time (10 minutes ago)
	cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()

	// Find all unique repo_did values
	var repoDIDs []string
	if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
		log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
		return err
	}

	// For each user, keep their last 10 segments and delete older ones
	for _, repoDID := range repoDIDs {
		// Get IDs of the last 10 segments for this user
		var keepSegmentIDs []string
		if err := m.DB.Model(&Segment{}).
			Where("repo_did = ?", repoDID).
			Order("start_time DESC").
			Limit(10).
			Pluck("id", &keepSegmentIDs).Error; err != nil {
			log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
			return err
		}

		// Delete old segments except the ones we want to keep
		result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
			repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})

		if result.Error != nil {
			log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
		} else if result.RowsAffected > 0 {
			log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
		}
	}
	return nil
}