Live video on the AT Protocol
at natb/add-problems-back 276 lines 7.8 kB view raw
1package model 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22 BFrames bool `json:"bframes"` 23} 24 25type SegmentMediadataAudio struct { 26 Rate int `json:"rate"` 27 Channels int `json:"channels"` 28} 29 30type SegmentMediaData struct { 31 Video []*SegmentMediadataVideo `json:"video"` 32 Audio []*SegmentMediadataAudio `json:"audio"` 33 Duration int64 `json:"duration"` 34 Size int `json:"size"` 35} 36 37// Scan scan value into Jsonb, implements sql.Scanner interface 38func (j *SegmentMediaData) Scan(value any) error { 39 bytes, ok := value.([]byte) 40 if !ok { 41 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 42 } 43 44 result := SegmentMediaData{} 45 err := json.Unmarshal(bytes, &result) 46 *j = SegmentMediaData(result) 47 return err 48} 49 50// Value return json value, implement driver.Valuer interface 51func (j SegmentMediaData) Value() (driver.Value, error) { 52 return json.Marshal(j) 53} 54 55type Segment struct { 56 ID string `json:"id" gorm:"primaryKey"` 57 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 58 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 59 StartTime time.Time `json:"startTime" gorm:"index:latest_segments"` 60 RepoDID string `json:"repoDID" gorm:"index:latest_segments;column:repo_did"` 61 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 62 Title string `json:"title"` 63 Size int `json:"size" gorm:"column:size"` 64 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 65} 66 67func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 68 aqt := aqtime.FromTime(s.StartTime) 69 if s.MediaData == nil { 70 return nil, fmt.Errorf("media data is nil") 71 } 72 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 73 return nil, fmt.Errorf("video data is nil") 74 } 75 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 76 return nil, fmt.Errorf("audio data is nil") 77 } 78 duration := s.MediaData.Duration 79 sizei64 := int64(s.Size) 80 return &streamplace.Segment{ 81 LexiconTypeID: "place.stream.segment", 82 Creator: s.RepoDID, 83 Id: s.ID, 84 SigningKey: s.SigningKeyDID, 85 StartTime: string(aqt), 86 Duration: &duration, 87 Size: &sizei64, 88 Video: []*streamplace.Segment_Video{ 89 { 90 Codec: "h264", 91 Width: int64(s.MediaData.Video[0].Width), 92 Height: int64(s.MediaData.Video[0].Height), 93 Framerate: &streamplace.Segment_Framerate{ 94 Num: int64(s.MediaData.Video[0].FPSNum), 95 Den: int64(s.MediaData.Video[0].FPSDen), 96 }, 97 Bframes: &s.MediaData.Video[0].BFrames, 98 }, 99 }, 100 Audio: []*streamplace.Segment_Audio{ 101 { 102 Codec: "opus", 103 Rate: int64(s.MediaData.Audio[0].Rate), 104 Channels: int64(s.MediaData.Audio[0].Channels), 105 }, 106 }, 107 }, nil 108} 109 110func (m *DBModel) CreateSegment(seg *Segment) error { 111 err := m.DB.Model(Segment{}).Create(seg).Error 112 if err != nil { 113 return err 114 } 115 return nil 116} 117 118// should return the most recent segment for each user, ordered by most recent first 119// only includes segments from the last 30 seconds 120func (m *DBModel) MostRecentSegments() ([]Segment, error) { 121 var segments []Segment 122 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 123 124 err := m.DB.Table("segments"). 125 Select("segments.*"). 126 Where("id IN (?)", 127 m.DB.Table("segments"). 128 Select("id"). 129 Where("(repo_did, start_time) IN (?)", 130 m.DB.Table("segments"). 131 Select("repo_did, MAX(start_time)"). 132 Group("repo_did"))). 133 Order("start_time DESC"). 134 Joins("JOIN repos ON segments.repo_did = repos.did"). 135 Preload("Repo"). 136 Find(&segments).Error 137 138 if err != nil { 139 return nil, err 140 } 141 if segments == nil { 142 return []Segment{}, nil 143 } 144 145 filteredSegments := []Segment{} 146 for _, seg := range segments { 147 if seg.StartTime.After(thirtySecondsAgo) { 148 filteredSegments = append(filteredSegments, seg) 149 } 150 } 151 152 return filteredSegments, nil 153} 154 155func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 156 var seg Segment 157 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 158 if err != nil { 159 return nil, err 160 } 161 return &seg, nil 162} 163 164func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) { 165 var segs []Segment 166 if before == nil { 167 later := time.Now().Add(1000 * time.Hour) 168 before = &later 169 } 170 if after == nil { 171 earlier := time.Time{} 172 after = &earlier 173 } 174 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 175 if err != nil { 176 return nil, err 177 } 178 return segs, nil 179} 180 181func (m *DBModel) GetLiveUsers() ([]Segment, error) { 182 var liveUsers []Segment 183 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time() 184 185 err := m.DB.Model(&Segment{}). 186 Preload("Repo"). 187 Where("start_time >= ?", thirtySecondsAgo). 188 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)"). 189 Order("start_time DESC"). 190 Find(&liveUsers).Error 191 192 if err != nil { 193 return nil, err 194 } 195 if liveUsers == nil { 196 return []Segment{}, nil 197 } 198 199 return liveUsers, nil 200} 201 202func (m *DBModel) GetSegment(id string) (*Segment, error) { 203 var seg Segment 204 205 err := m.DB.Model(&Segment{}). 206 Preload("Repo"). 207 Where("id = ?", id). 208 First(&seg).Error 209 210 if errors.Is(err, gorm.ErrRecordNotFound) { 211 return nil, nil 212 } 213 if err != nil { 214 return nil, err 215 } 216 217 return &seg, nil 218} 219 220func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 221 err := m.SegmentCleaner(ctx) 222 if err != nil { 223 return err 224 } 225 ticker := time.NewTicker(1 * time.Minute) 226 defer ticker.Stop() 227 228 for { 229 select { 230 case <-ctx.Done(): 231 return nil 232 case <-ticker.C: 233 err := m.SegmentCleaner(ctx) 234 if err != nil { 235 log.Error(ctx, "Failed to clean segments", "error", err) 236 } 237 } 238 } 239} 240 241func (m *DBModel) SegmentCleaner(ctx context.Context) error { 242 // Calculate the cutoff time (10 minutes ago) 243 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 244 245 // Find all unique repo_did values 246 var repoDIDs []string 247 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 248 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 249 return err 250 } 251 252 // For each user, keep their last 10 segments and delete older ones 253 for _, repoDID := range repoDIDs { 254 // Get IDs of the last 10 segments for this user 255 var keepSegmentIDs []string 256 if err := m.DB.Model(&Segment{}). 257 Where("repo_did = ?", repoDID). 258 Order("start_time DESC"). 259 Limit(10). 260 Pluck("id", &keepSegmentIDs).Error; err != nil { 261 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 262 return err 263 } 264 265 // Delete old segments except the ones we want to keep 266 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 267 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 268 269 if result.Error != nil { 270 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 271 } else if result.RowsAffected > 0 { 272 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 273 } 274 } 275 return nil 276}