Live video on the AT Protocol
at natb/loader-under-ui 412 lines 12 kB view raw
1package model 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22 BFrames bool `json:"bframes"` 23} 24 25type SegmentMediadataAudio struct { 26 Rate int `json:"rate"` 27 Channels int `json:"channels"` 28} 29 30type SegmentMediaData struct { 31 Video []*SegmentMediadataVideo `json:"video"` 32 Audio []*SegmentMediadataAudio `json:"audio"` 33 Duration int64 `json:"duration"` 34 Size int `json:"size"` 35} 36 37// Scan scan value into Jsonb, implements sql.Scanner interface 38func (j *SegmentMediaData) Scan(value any) error { 39 bytes, ok := value.([]byte) 40 if !ok { 41 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 42 } 43 44 result := SegmentMediaData{} 45 err := json.Unmarshal(bytes, &result) 46 *j = SegmentMediaData(result) 47 return err 48} 49 50// Value return json value, implement driver.Valuer interface 51func (j SegmentMediaData) Value() (driver.Value, error) { 52 return json.Marshal(j) 53} 54 55// ContentRights represents content rights and attribution information 56type ContentRights struct { 57 CopyrightNotice *string `json:"copyrightNotice,omitempty"` 58 CopyrightYear *int64 `json:"copyrightYear,omitempty"` 59 Creator *string `json:"creator,omitempty"` 60 CreditLine *string `json:"creditLine,omitempty"` 61 License *string `json:"license,omitempty"` 62} 63 64// Scan scan value into ContentRights, implements sql.Scanner interface 65func (c *ContentRights) Scan(value any) error { 66 if value == nil { 67 *c = ContentRights{} 68 return nil 69 } 70 bytes, ok := value.([]byte) 71 if !ok { 72 return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value)) 73 } 74 75 result := ContentRights{} 76 err := json.Unmarshal(bytes, &result) 77 *c = ContentRights(result) 78 return err 79} 80 81// Value return json value, implement driver.Valuer interface 82func (c ContentRights) Value() (driver.Value, error) { 83 return json.Marshal(c) 84} 85 86// DistributionPolicy represents distribution policy information 87type DistributionPolicy struct { 88 DeleteAfterSeconds *int64 `json:"deleteAfterSeconds,omitempty"` 89} 90 91// Scan scan value into DistributionPolicy, implements sql.Scanner interface 92func (d *DistributionPolicy) Scan(value any) error { 93 if value == nil { 94 *d = DistributionPolicy{} 95 return nil 96 } 97 bytes, ok := value.([]byte) 98 if !ok { 99 return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value)) 100 } 101 102 result := DistributionPolicy{} 103 err := json.Unmarshal(bytes, &result) 104 *d = DistributionPolicy(result) 105 return err 106} 107 108// Value return json value, implement driver.Valuer interface 109func (d DistributionPolicy) Value() (driver.Value, error) { 110 return json.Marshal(d) 111} 112 113// ContentWarningsSlice is a custom type for storing content warnings as JSON in the database 114type ContentWarningsSlice []string 115 116// Scan scan value into ContentWarningsSlice, implements sql.Scanner interface 117func (c *ContentWarningsSlice) Scan(value any) error { 118 if value == nil { 119 *c = ContentWarningsSlice{} 120 return nil 121 } 122 bytes, ok := value.([]byte) 123 if !ok { 124 return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value)) 125 } 126 127 result := ContentWarningsSlice{} 128 err := json.Unmarshal(bytes, &result) 129 *c = ContentWarningsSlice(result) 130 return err 131} 132 133// Value return json value, implement driver.Valuer interface 134func (c ContentWarningsSlice) Value() (driver.Value, error) { 135 return json.Marshal(c) 136} 137 138type Segment struct { 139 ID string `json:"id" gorm:"primaryKey"` 140 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 141 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 142 StartTime time.Time `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"` 143 RepoDID string `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"` 144 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 145 Title string `json:"title"` 146 Size int `json:"size" gorm:"column:size"` 147 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 148 ContentWarnings ContentWarningsSlice `json:"contentWarnings,omitempty"` 149 ContentRights *ContentRights `json:"contentRights,omitempty"` 150 DistributionPolicy *DistributionPolicy `json:"distributionPolicy,omitempty"` 151 DeleteAfter *time.Time `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"` 152} 153 154func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 155 aqt := aqtime.FromTime(s.StartTime) 156 if s.MediaData == nil { 157 return nil, fmt.Errorf("media data is nil") 158 } 159 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 160 return nil, fmt.Errorf("video data is nil") 161 } 162 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 163 return nil, fmt.Errorf("audio data is nil") 164 } 165 duration := s.MediaData.Duration 166 sizei64 := int64(s.Size) 167 168 // Convert model metadata to streamplace metadata 169 var contentRights *streamplace.MetadataContentRights 170 if s.ContentRights != nil { 171 contentRights = &streamplace.MetadataContentRights{ 172 CopyrightNotice: s.ContentRights.CopyrightNotice, 173 CopyrightYear: s.ContentRights.CopyrightYear, 174 Creator: s.ContentRights.Creator, 175 CreditLine: s.ContentRights.CreditLine, 176 License: s.ContentRights.License, 177 } 178 } 179 180 var contentWarnings *streamplace.MetadataContentWarnings 181 if len(s.ContentWarnings) > 0 { 182 contentWarnings = &streamplace.MetadataContentWarnings{ 183 Warnings: []string(s.ContentWarnings), 184 } 185 } 186 187 var distributionPolicy *streamplace.MetadataDistributionPolicy 188 if s.DistributionPolicy != nil && s.DistributionPolicy.DeleteAfterSeconds != nil { 189 distributionPolicy = &streamplace.MetadataDistributionPolicy{ 190 DeleteAfter: s.DistributionPolicy.DeleteAfterSeconds, 191 } 192 } 193 194 return &streamplace.Segment{ 195 LexiconTypeID: "place.stream.segment", 196 Creator: s.RepoDID, 197 Id: s.ID, 198 SigningKey: s.SigningKeyDID, 199 StartTime: string(aqt), 200 Duration: &duration, 201 Size: &sizei64, 202 ContentRights: contentRights, 203 ContentWarnings: contentWarnings, 204 DistributionPolicy: distributionPolicy, 205 Video: []*streamplace.Segment_Video{ 206 { 207 Codec: "h264", 208 Width: int64(s.MediaData.Video[0].Width), 209 Height: int64(s.MediaData.Video[0].Height), 210 Framerate: &streamplace.Segment_Framerate{ 211 Num: int64(s.MediaData.Video[0].FPSNum), 212 Den: int64(s.MediaData.Video[0].FPSDen), 213 }, 214 Bframes: &s.MediaData.Video[0].BFrames, 215 }, 216 }, 217 Audio: []*streamplace.Segment_Audio{ 218 { 219 Codec: "opus", 220 Rate: int64(s.MediaData.Audio[0].Rate), 221 Channels: int64(s.MediaData.Audio[0].Channels), 222 }, 223 }, 224 }, nil 225} 226 227func (m *DBModel) CreateSegment(seg *Segment) error { 228 err := m.DB.Model(Segment{}).Create(seg).Error 229 if err != nil { 230 return err 231 } 232 return nil 233} 234 235// should return the most recent segment for each user, ordered by most recent first 236// only includes segments from the last 30 seconds 237func (m *DBModel) MostRecentSegments() ([]Segment, error) { 238 var segments []Segment 239 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 240 241 err := m.DB.Table("segments"). 242 Select("segments.*"). 243 Where("start_time > ?", thirtySecondsAgo.UTC()). 244 Order("start_time DESC"). 245 Find(&segments).Error 246 if err != nil { 247 return nil, err 248 } 249 if segments == nil { 250 return []Segment{}, nil 251 } 252 253 segmentMap := make(map[string]Segment) 254 for _, seg := range segments { 255 prev, ok := segmentMap[seg.RepoDID] 256 if !ok { 257 segmentMap[seg.RepoDID] = seg 258 } else { 259 if seg.StartTime.After(prev.StartTime) { 260 segmentMap[seg.RepoDID] = seg 261 } 262 } 263 } 264 265 filteredSegments := []Segment{} 266 for _, seg := range segmentMap { 267 filteredSegments = append(filteredSegments, seg) 268 } 269 270 return filteredSegments, nil 271} 272 273func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 274 var seg Segment 275 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 276 if err != nil { 277 return nil, err 278 } 279 return &seg, nil 280} 281 282func (m *DBModel) FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) { 283 if len(repoDIDs) == 0 { 284 return []string{}, nil 285 } 286 287 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 288 289 var liveDIDs []string 290 291 err := m.DB.Table("segments"). 292 Select("DISTINCT repo_did"). 293 Where("repo_did IN ? AND start_time > ?", repoDIDs, thirtySecondsAgo.UTC()). 294 Pluck("repo_did", &liveDIDs).Error 295 296 if err != nil { 297 return nil, err 298 } 299 300 return liveDIDs, nil 301} 302 303func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) { 304 var segs []Segment 305 if before == nil { 306 later := time.Now().Add(1000 * time.Hour) 307 before = &later 308 } 309 if after == nil { 310 earlier := time.Time{} 311 after = &earlier 312 } 313 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 314 if err != nil { 315 return nil, err 316 } 317 return segs, nil 318} 319 320func (m *DBModel) GetSegment(id string) (*Segment, error) { 321 var seg Segment 322 323 err := m.DB.Model(&Segment{}). 324 Preload("Repo"). 325 Where("id = ?", id). 326 First(&seg).Error 327 328 if errors.Is(err, gorm.ErrRecordNotFound) { 329 return nil, nil 330 } 331 if err != nil { 332 return nil, err 333 } 334 335 return &seg, nil 336} 337 338func (m *DBModel) GetExpiredSegments(ctx context.Context) ([]Segment, error) { 339 340 var expiredSegments []Segment 341 now := time.Now() 342 err := m.DB. 343 Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()). 344 Find(&expiredSegments).Error 345 if err != nil { 346 return nil, err 347 } 348 349 return expiredSegments, nil 350} 351 352func (m *DBModel) DeleteSegment(ctx context.Context, id string) error { 353 return m.DB.Delete(&Segment{}, "id = ?", id).Error 354} 355 356func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 357 err := m.SegmentCleaner(ctx) 358 if err != nil { 359 return err 360 } 361 ticker := time.NewTicker(1 * time.Minute) 362 defer ticker.Stop() 363 364 for { 365 select { 366 case <-ctx.Done(): 367 return nil 368 case <-ticker.C: 369 err := m.SegmentCleaner(ctx) 370 if err != nil { 371 log.Error(ctx, "Failed to clean segments", "error", err) 372 } 373 } 374 } 375} 376 377func (m *DBModel) SegmentCleaner(ctx context.Context) error { 378 // Calculate the cutoff time (10 minutes ago) 379 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 380 381 // Find all unique repo_did values 382 var repoDIDs []string 383 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 384 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 385 return err 386 } 387 388 // For each user, keep their last 10 segments and delete older ones 389 for _, repoDID := range repoDIDs { 390 // Get IDs of the last 10 segments for this user 391 var keepSegmentIDs []string 392 if err := m.DB.Model(&Segment{}). 393 Where("repo_did = ?", repoDID). 394 Order("start_time DESC"). 395 Limit(10). 396 Pluck("id", &keepSegmentIDs).Error; err != nil { 397 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 398 return err 399 } 400 401 // Delete old segments except the ones we want to keep 402 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 403 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 404 405 if result.Error != nil { 406 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 407 } else if result.RowsAffected > 0 { 408 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 409 } 410 } 411 return nil 412}