Live video on the AT Protocol
at natb/sync-client-time 395 lines 12 kB view raw
1package model 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22 BFrames bool `json:"bframes"` 23} 24 25type SegmentMediadataAudio struct { 26 Rate int `json:"rate"` 27 Channels int `json:"channels"` 28} 29 30type SegmentMediaData struct { 31 Video []*SegmentMediadataVideo `json:"video"` 32 Audio []*SegmentMediadataAudio `json:"audio"` 33 Duration int64 `json:"duration"` 34 Size int `json:"size"` 35} 36 37// Scan scan value into Jsonb, implements sql.Scanner interface 38func (j *SegmentMediaData) Scan(value any) error { 39 bytes, ok := value.([]byte) 40 if !ok { 41 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 42 } 43 44 result := SegmentMediaData{} 45 err := json.Unmarshal(bytes, &result) 46 *j = SegmentMediaData(result) 47 return err 48} 49 50// Value return json value, implement driver.Valuer interface 51func (j SegmentMediaData) Value() (driver.Value, error) { 52 return json.Marshal(j) 53} 54 55// ContentRights represents content rights and attribution information 56type ContentRights struct { 57 CopyrightNotice *string `json:"copyrightNotice,omitempty"` 58 CopyrightYear *int64 `json:"copyrightYear,omitempty"` 59 Creator *string `json:"creator,omitempty"` 60 CreditLine *string `json:"creditLine,omitempty"` 61 License *string `json:"license,omitempty"` 62} 63 64// Scan scan value into ContentRights, implements sql.Scanner interface 65func (c *ContentRights) Scan(value any) error { 66 if value == nil { 67 *c = ContentRights{} 68 return nil 69 } 70 bytes, ok := value.([]byte) 71 if !ok { 72 return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value)) 73 } 74 75 result := ContentRights{} 76 err := json.Unmarshal(bytes, &result) 77 *c = ContentRights(result) 78 return err 79} 80 81// Value return json value, implement driver.Valuer interface 82func (c ContentRights) Value() (driver.Value, error) { 83 return json.Marshal(c) 84} 85 86// DistributionPolicy represents distribution policy information 87type DistributionPolicy struct { 88 ExpiresAt *time.Time `json:"expiresAt,omitempty"` 89} 90 91// Scan scan value into DistributionPolicy, implements sql.Scanner interface 92func (d *DistributionPolicy) Scan(value any) error { 93 if value == nil { 94 *d = DistributionPolicy{} 95 return nil 96 } 97 bytes, ok := value.([]byte) 98 if !ok { 99 return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value)) 100 } 101 102 result := DistributionPolicy{} 103 err := json.Unmarshal(bytes, &result) 104 *d = DistributionPolicy(result) 105 return err 106} 107 108// Value return json value, implement driver.Valuer interface 109func (d DistributionPolicy) Value() (driver.Value, error) { 110 return json.Marshal(d) 111} 112 113// ContentWarningsSlice is a custom type for storing content warnings as JSON in the database 114type ContentWarningsSlice []string 115 116// Scan scan value into ContentWarningsSlice, implements sql.Scanner interface 117func (c *ContentWarningsSlice) Scan(value any) error { 118 if value == nil { 119 *c = ContentWarningsSlice{} 120 return nil 121 } 122 bytes, ok := value.([]byte) 123 if !ok { 124 return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value)) 125 } 126 127 result := ContentWarningsSlice{} 128 err := json.Unmarshal(bytes, &result) 129 *c = ContentWarningsSlice(result) 130 return err 131} 132 133// Value return json value, implement driver.Valuer interface 134func (c ContentWarningsSlice) Value() (driver.Value, error) { 135 return json.Marshal(c) 136} 137 138type Segment struct { 139 ID string `json:"id" gorm:"primaryKey"` 140 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 141 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 142 StartTime time.Time `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"` 143 RepoDID string `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"` 144 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 145 Title string `json:"title"` 146 Size int `json:"size" gorm:"column:size"` 147 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 148 ContentWarnings ContentWarningsSlice `json:"contentWarnings,omitempty"` 149 ContentRights *ContentRights `json:"contentRights,omitempty"` 150 DistributionPolicy *DistributionPolicy `json:"distributionPolicy,omitempty"` 151 DeleteAfter *time.Time `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"` 152} 153 154func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 155 aqt := aqtime.FromTime(s.StartTime) 156 if s.MediaData == nil { 157 return nil, fmt.Errorf("media data is nil") 158 } 159 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 160 return nil, fmt.Errorf("video data is nil") 161 } 162 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 163 return nil, fmt.Errorf("audio data is nil") 164 } 165 duration := s.MediaData.Duration 166 sizei64 := int64(s.Size) 167 168 // Convert model metadata to streamplace metadata 169 var contentRights *streamplace.MetadataContentRights 170 if s.ContentRights != nil { 171 contentRights = &streamplace.MetadataContentRights{ 172 CopyrightNotice: s.ContentRights.CopyrightNotice, 173 CopyrightYear: s.ContentRights.CopyrightYear, 174 Creator: s.ContentRights.Creator, 175 CreditLine: s.ContentRights.CreditLine, 176 License: s.ContentRights.License, 177 } 178 } 179 180 var contentWarnings *streamplace.MetadataContentWarnings 181 if len(s.ContentWarnings) > 0 { 182 contentWarnings = &streamplace.MetadataContentWarnings{ 183 Warnings: []string(s.ContentWarnings), 184 } 185 } 186 187 var distributionPolicy *streamplace.MetadataDistributionPolicy 188 if s.DistributionPolicy != nil && s.DistributionPolicy.ExpiresAt != nil { 189 // Convert the absolute timestamp back to a duration (in seconds) from segment start 190 startTimeUnix := s.StartTime.Unix() 191 expiresAtUnix := s.DistributionPolicy.ExpiresAt.Unix() 192 deleteAfterSecs := expiresAtUnix - startTimeUnix 193 distributionPolicy = &streamplace.MetadataDistributionPolicy{ 194 DeleteAfter: &deleteAfterSecs, 195 } 196 } 197 198 return &streamplace.Segment{ 199 LexiconTypeID: "place.stream.segment", 200 Creator: s.RepoDID, 201 Id: s.ID, 202 SigningKey: s.SigningKeyDID, 203 StartTime: string(aqt), 204 Duration: &duration, 205 Size: &sizei64, 206 ContentRights: contentRights, 207 ContentWarnings: contentWarnings, 208 DistributionPolicy: distributionPolicy, 209 Video: []*streamplace.Segment_Video{ 210 { 211 Codec: "h264", 212 Width: int64(s.MediaData.Video[0].Width), 213 Height: int64(s.MediaData.Video[0].Height), 214 Framerate: &streamplace.Segment_Framerate{ 215 Num: int64(s.MediaData.Video[0].FPSNum), 216 Den: int64(s.MediaData.Video[0].FPSDen), 217 }, 218 Bframes: &s.MediaData.Video[0].BFrames, 219 }, 220 }, 221 Audio: []*streamplace.Segment_Audio{ 222 { 223 Codec: "opus", 224 Rate: int64(s.MediaData.Audio[0].Rate), 225 Channels: int64(s.MediaData.Audio[0].Channels), 226 }, 227 }, 228 }, nil 229} 230 231func (m *DBModel) CreateSegment(seg *Segment) error { 232 err := m.DB.Model(Segment{}).Create(seg).Error 233 if err != nil { 234 return err 235 } 236 return nil 237} 238 239// should return the most recent segment for each user, ordered by most recent first 240// only includes segments from the last 30 seconds 241func (m *DBModel) MostRecentSegments() ([]Segment, error) { 242 var segments []Segment 243 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 244 245 err := m.DB.Table("segments"). 246 Select("segments.*"). 247 Where("start_time > ?", thirtySecondsAgo.UTC()). 248 Order("start_time DESC"). 249 Find(&segments).Error 250 if err != nil { 251 return nil, err 252 } 253 if segments == nil { 254 return []Segment{}, nil 255 } 256 257 segmentMap := make(map[string]Segment) 258 for _, seg := range segments { 259 prev, ok := segmentMap[seg.RepoDID] 260 if !ok { 261 segmentMap[seg.RepoDID] = seg 262 } else { 263 if seg.StartTime.After(prev.StartTime) { 264 segmentMap[seg.RepoDID] = seg 265 } 266 } 267 } 268 269 filteredSegments := []Segment{} 270 for _, seg := range segmentMap { 271 filteredSegments = append(filteredSegments, seg) 272 } 273 274 return filteredSegments, nil 275} 276 277func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 278 var seg Segment 279 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 280 if err != nil { 281 return nil, err 282 } 283 return &seg, nil 284} 285 286func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) { 287 var segs []Segment 288 if before == nil { 289 later := time.Now().Add(1000 * time.Hour) 290 before = &later 291 } 292 if after == nil { 293 earlier := time.Time{} 294 after = &earlier 295 } 296 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 297 if err != nil { 298 return nil, err 299 } 300 return segs, nil 301} 302 303func (m *DBModel) GetSegment(id string) (*Segment, error) { 304 var seg Segment 305 306 err := m.DB.Model(&Segment{}). 307 Preload("Repo"). 308 Where("id = ?", id). 309 First(&seg).Error 310 311 if errors.Is(err, gorm.ErrRecordNotFound) { 312 return nil, nil 313 } 314 if err != nil { 315 return nil, err 316 } 317 318 return &seg, nil 319} 320 321func (m *DBModel) GetExpiredSegments(ctx context.Context) ([]Segment, error) { 322 323 var expiredSegments []Segment 324 now := time.Now() 325 err := m.DB. 326 Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()). 327 Find(&expiredSegments).Error 328 if err != nil { 329 return nil, err 330 } 331 332 return expiredSegments, nil 333} 334 335func (m *DBModel) DeleteSegment(ctx context.Context, id string) error { 336 return m.DB.Delete(&Segment{}, "id = ?", id).Error 337} 338 339func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 340 err := m.SegmentCleaner(ctx) 341 if err != nil { 342 return err 343 } 344 ticker := time.NewTicker(1 * time.Minute) 345 defer ticker.Stop() 346 347 for { 348 select { 349 case <-ctx.Done(): 350 return nil 351 case <-ticker.C: 352 err := m.SegmentCleaner(ctx) 353 if err != nil { 354 log.Error(ctx, "Failed to clean segments", "error", err) 355 } 356 } 357 } 358} 359 360func (m *DBModel) SegmentCleaner(ctx context.Context) error { 361 // Calculate the cutoff time (10 minutes ago) 362 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 363 364 // Find all unique repo_did values 365 var repoDIDs []string 366 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 367 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 368 return err 369 } 370 371 // For each user, keep their last 10 segments and delete older ones 372 for _, repoDID := range repoDIDs { 373 // Get IDs of the last 10 segments for this user 374 var keepSegmentIDs []string 375 if err := m.DB.Model(&Segment{}). 376 Where("repo_did = ?", repoDID). 377 Order("start_time DESC"). 378 Limit(10). 379 Pluck("id", &keepSegmentIDs).Error; err != nil { 380 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 381 return err 382 } 383 384 // Delete old segments except the ones we want to keep 385 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 386 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 387 388 if result.Error != nil { 389 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 390 } else if result.RowsAffected > 0 { 391 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 392 } 393 } 394 return nil 395}