Live video on the AT Protocol
at natb/command-errors 410 lines 12 kB view raw
1package localdb 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22 BFrames bool `json:"bframes"` 23} 24 25type SegmentMediadataAudio struct { 26 Rate int `json:"rate"` 27 Channels int `json:"channels"` 28} 29 30type SegmentMediaData struct { 31 Video []*SegmentMediadataVideo `json:"video"` 32 Audio []*SegmentMediadataAudio `json:"audio"` 33 Duration int64 `json:"duration"` 34 Size int `json:"size"` 35} 36 37// Scan scan value into Jsonb, implements sql.Scanner interface 38func (j *SegmentMediaData) Scan(value any) error { 39 bytes, ok := value.([]byte) 40 if !ok { 41 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 42 } 43 44 result := SegmentMediaData{} 45 err := json.Unmarshal(bytes, &result) 46 *j = SegmentMediaData(result) 47 return err 48} 49 50// Value return json value, implement driver.Valuer interface 51func (j SegmentMediaData) Value() (driver.Value, error) { 52 return json.Marshal(j) 53} 54 55// ContentRights represents content rights and attribution information 56type ContentRights struct { 57 CopyrightNotice *string `json:"copyrightNotice,omitempty"` 58 CopyrightYear *int64 `json:"copyrightYear,omitempty"` 59 Creator *string `json:"creator,omitempty"` 60 CreditLine *string `json:"creditLine,omitempty"` 61 License *string `json:"license,omitempty"` 62} 63 64// Scan scan value into ContentRights, implements sql.Scanner interface 65func (c *ContentRights) Scan(value any) error { 66 if value == nil { 67 *c = ContentRights{} 68 return nil 69 } 70 bytes, ok := value.([]byte) 71 if !ok { 72 return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value)) 73 } 74 75 result := ContentRights{} 76 err := json.Unmarshal(bytes, &result) 77 *c = ContentRights(result) 78 return err 79} 80 81// Value return json value, implement driver.Valuer interface 82func (c ContentRights) Value() (driver.Value, error) { 83 return json.Marshal(c) 84} 85 86// DistributionPolicy represents distribution policy information 87type DistributionPolicy struct { 88 DeleteAfterSeconds *int64 `json:"deleteAfterSeconds,omitempty"` 89} 90 91// Scan scan value into DistributionPolicy, implements sql.Scanner interface 92func (d *DistributionPolicy) Scan(value any) error { 93 if value == nil { 94 *d = DistributionPolicy{} 95 return nil 96 } 97 bytes, ok := value.([]byte) 98 if !ok { 99 return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value)) 100 } 101 102 result := DistributionPolicy{} 103 err := json.Unmarshal(bytes, &result) 104 *d = DistributionPolicy(result) 105 return err 106} 107 108// Value return json value, implement driver.Valuer interface 109func (d DistributionPolicy) Value() (driver.Value, error) { 110 return json.Marshal(d) 111} 112 113// ContentWarningsSlice is a custom type for storing content warnings as JSON in the database 114type ContentWarningsSlice []string 115 116// Scan scan value into ContentWarningsSlice, implements sql.Scanner interface 117func (c *ContentWarningsSlice) Scan(value any) error { 118 if value == nil { 119 *c = ContentWarningsSlice{} 120 return nil 121 } 122 bytes, ok := value.([]byte) 123 if !ok { 124 return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value)) 125 } 126 127 result := ContentWarningsSlice{} 128 err := json.Unmarshal(bytes, &result) 129 *c = ContentWarningsSlice(result) 130 return err 131} 132 133// Value return json value, implement driver.Valuer interface 134func (c ContentWarningsSlice) Value() (driver.Value, error) { 135 return json.Marshal(c) 136} 137 138type Segment struct { 139 ID string `json:"id" gorm:"primaryKey"` 140 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 141 StartTime time.Time `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"` 142 RepoDID string `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"` 143 Title string `json:"title"` 144 Size int `json:"size" gorm:"column:size"` 145 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 146 ContentWarnings ContentWarningsSlice `json:"contentWarnings,omitempty"` 147 ContentRights *ContentRights `json:"contentRights,omitempty"` 148 DistributionPolicy *DistributionPolicy `json:"distributionPolicy,omitempty"` 149 DeleteAfter *time.Time `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"` 150} 151 152func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 153 aqt := aqtime.FromTime(s.StartTime) 154 if s.MediaData == nil { 155 return nil, fmt.Errorf("media data is nil") 156 } 157 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 158 return nil, fmt.Errorf("video data is nil") 159 } 160 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 161 return nil, fmt.Errorf("audio data is nil") 162 } 163 duration := s.MediaData.Duration 164 sizei64 := int64(s.Size) 165 166 // Convert model metadata to streamplace metadata 167 var contentRights *streamplace.MetadataContentRights 168 if s.ContentRights != nil { 169 contentRights = &streamplace.MetadataContentRights{ 170 CopyrightNotice: s.ContentRights.CopyrightNotice, 171 CopyrightYear: s.ContentRights.CopyrightYear, 172 Creator: s.ContentRights.Creator, 173 CreditLine: s.ContentRights.CreditLine, 174 License: s.ContentRights.License, 175 } 176 } 177 178 var contentWarnings *streamplace.MetadataContentWarnings 179 if len(s.ContentWarnings) > 0 { 180 contentWarnings = &streamplace.MetadataContentWarnings{ 181 Warnings: []string(s.ContentWarnings), 182 } 183 } 184 185 var distributionPolicy *streamplace.MetadataDistributionPolicy 186 if s.DistributionPolicy != nil && s.DistributionPolicy.DeleteAfterSeconds != nil { 187 distributionPolicy = &streamplace.MetadataDistributionPolicy{ 188 DeleteAfter: s.DistributionPolicy.DeleteAfterSeconds, 189 } 190 } 191 192 return &streamplace.Segment{ 193 LexiconTypeID: "place.stream.segment", 194 Creator: s.RepoDID, 195 Id: s.ID, 196 SigningKey: s.SigningKeyDID, 197 StartTime: string(aqt), 198 Duration: &duration, 199 Size: &sizei64, 200 ContentRights: contentRights, 201 ContentWarnings: contentWarnings, 202 DistributionPolicy: distributionPolicy, 203 Video: []*streamplace.Segment_Video{ 204 { 205 Codec: "h264", 206 Width: int64(s.MediaData.Video[0].Width), 207 Height: int64(s.MediaData.Video[0].Height), 208 Framerate: &streamplace.Segment_Framerate{ 209 Num: int64(s.MediaData.Video[0].FPSNum), 210 Den: int64(s.MediaData.Video[0].FPSDen), 211 }, 212 Bframes: &s.MediaData.Video[0].BFrames, 213 }, 214 }, 215 Audio: []*streamplace.Segment_Audio{ 216 { 217 Codec: "opus", 218 Rate: int64(s.MediaData.Audio[0].Rate), 219 Channels: int64(s.MediaData.Audio[0].Channels), 220 }, 221 }, 222 }, nil 223} 224 225func (m *LocalDatabase) CreateSegment(seg *Segment) error { 226 err := m.DB.Model(Segment{}).Create(seg).Error 227 if err != nil { 228 return err 229 } 230 return nil 231} 232 233// should return the most recent segment for each user, ordered by most recent first 234// only includes segments from the last 30 seconds 235func (m *LocalDatabase) MostRecentSegments() ([]Segment, error) { 236 var segments []Segment 237 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 238 239 err := m.DB.Table("segments"). 240 Select("segments.*"). 241 Where("start_time > ?", thirtySecondsAgo.UTC()). 242 Order("start_time DESC"). 243 Find(&segments).Error 244 if err != nil { 245 return nil, err 246 } 247 if segments == nil { 248 return []Segment{}, nil 249 } 250 251 segmentMap := make(map[string]Segment) 252 for _, seg := range segments { 253 prev, ok := segmentMap[seg.RepoDID] 254 if !ok { 255 segmentMap[seg.RepoDID] = seg 256 } else { 257 if seg.StartTime.After(prev.StartTime) { 258 segmentMap[seg.RepoDID] = seg 259 } 260 } 261 } 262 263 filteredSegments := []Segment{} 264 for _, seg := range segmentMap { 265 filteredSegments = append(filteredSegments, seg) 266 } 267 268 return filteredSegments, nil 269} 270 271func (m *LocalDatabase) LatestSegmentForUser(user string) (*Segment, error) { 272 var seg Segment 273 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 274 if err != nil { 275 return nil, err 276 } 277 return &seg, nil 278} 279 280func (m *LocalDatabase) FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) { 281 if len(repoDIDs) == 0 { 282 return []string{}, nil 283 } 284 285 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 286 287 var liveDIDs []string 288 289 err := m.DB.Table("segments"). 290 Select("DISTINCT repo_did"). 291 Where("repo_did IN ? AND start_time > ?", repoDIDs, thirtySecondsAgo.UTC()). 292 Pluck("repo_did", &liveDIDs).Error 293 294 if err != nil { 295 return nil, err 296 } 297 298 return liveDIDs, nil 299} 300 301func (m *LocalDatabase) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) { 302 var segs []Segment 303 if before == nil { 304 later := time.Now().Add(1000 * time.Hour) 305 before = &later 306 } 307 if after == nil { 308 earlier := time.Time{} 309 after = &earlier 310 } 311 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 312 if err != nil { 313 return nil, err 314 } 315 return segs, nil 316} 317 318func (m *LocalDatabase) GetSegment(id string) (*Segment, error) { 319 var seg Segment 320 321 err := m.DB.Model(&Segment{}). 322 Preload("Repo"). 323 Where("id = ?", id). 324 First(&seg).Error 325 326 if errors.Is(err, gorm.ErrRecordNotFound) { 327 return nil, nil 328 } 329 if err != nil { 330 return nil, err 331 } 332 333 return &seg, nil 334} 335 336func (m *LocalDatabase) GetExpiredSegments(ctx context.Context) ([]Segment, error) { 337 338 var expiredSegments []Segment 339 now := time.Now() 340 err := m.DB. 341 Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()). 342 Find(&expiredSegments).Error 343 if err != nil { 344 return nil, err 345 } 346 347 return expiredSegments, nil 348} 349 350func (m *LocalDatabase) DeleteSegment(ctx context.Context, id string) error { 351 return m.DB.Delete(&Segment{}, "id = ?", id).Error 352} 353 354func (m *LocalDatabase) StartSegmentCleaner(ctx context.Context) error { 355 err := m.SegmentCleaner(ctx) 356 if err != nil { 357 return err 358 } 359 ticker := time.NewTicker(1 * time.Minute) 360 defer ticker.Stop() 361 362 for { 363 select { 364 case <-ctx.Done(): 365 return nil 366 case <-ticker.C: 367 err := m.SegmentCleaner(ctx) 368 if err != nil { 369 log.Error(ctx, "Failed to clean segments", "error", err) 370 } 371 } 372 } 373} 374 375func (m *LocalDatabase) SegmentCleaner(ctx context.Context) error { 376 // Calculate the cutoff time (10 minutes ago) 377 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 378 379 // Find all unique repo_did values 380 var repoDIDs []string 381 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 382 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 383 return err 384 } 385 386 // For each user, keep their last 10 segments and delete older ones 387 for _, repoDID := range repoDIDs { 388 // Get IDs of the last 10 segments for this user 389 var keepSegmentIDs []string 390 if err := m.DB.Model(&Segment{}). 391 Where("repo_did = ?", repoDID). 392 Order("start_time DESC"). 393 Limit(10). 394 Pluck("id", &keepSegmentIDs).Error; err != nil { 395 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 396 return err 397 } 398 399 // Delete old segments except the ones we want to keep 400 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 401 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 402 403 if result.Error != nil { 404 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 405 } else if result.RowsAffected > 0 { 406 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 407 } 408 } 409 return nil 410}