Live video on the AT Protocol
at eli/problem-detection 268 lines 7.5 kB view raw
1package model 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22 BFrames bool `json:"bframes"` 23} 24 25type SegmentMediadataAudio struct { 26 Rate int `json:"rate"` 27 Channels int `json:"channels"` 28} 29 30type SegmentMediaData struct { 31 Video []*SegmentMediadataVideo `json:"video"` 32 Audio []*SegmentMediadataAudio `json:"audio"` 33 Duration int64 `json:"duration"` 34} 35 36// Scan scan value into Jsonb, implements sql.Scanner interface 37func (j *SegmentMediaData) Scan(value interface{}) error { 38 bytes, ok := value.([]byte) 39 if !ok { 40 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 41 } 42 43 result := SegmentMediaData{} 44 err := json.Unmarshal(bytes, &result) 45 *j = SegmentMediaData(result) 46 return err 47} 48 49// Value return json value, implement driver.Valuer interface 50func (j SegmentMediaData) Value() (driver.Value, error) { 51 return json.Marshal(j) 52} 53 54type Segment struct { 55 ID string `json:"id" gorm:"primaryKey"` 56 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 57 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 58 StartTime time.Time `json:"startTime" gorm:"index:latest_segments"` 59 RepoDID string `json:"repoDID" gorm:"index:latest_segments;column:repo_did"` 60 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 61 Title string `json:"title"` 62 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 63} 64 65func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 66 aqt := aqtime.FromTime(s.StartTime) 67 if s.MediaData == nil { 68 return nil, fmt.Errorf("media data is nil") 69 } 70 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 71 return nil, fmt.Errorf("video data is nil") 72 } 73 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 74 return nil, fmt.Errorf("audio data is nil") 75 } 76 duration := s.MediaData.Duration 77 return &streamplace.Segment{ 78 LexiconTypeID: "place.stream.segment", 79 Creator: s.RepoDID, 80 Id: s.ID, 81 SigningKey: s.SigningKeyDID, 82 StartTime: string(aqt), 83 Duration: &duration, 84 Video: []*streamplace.Segment_Video{ 85 { 86 Codec: "h264", 87 Width: int64(s.MediaData.Video[0].Width), 88 Height: int64(s.MediaData.Video[0].Height), 89 Framerate: &streamplace.Segment_Framerate{ 90 Num: int64(s.MediaData.Video[0].FPSNum), 91 Den: int64(s.MediaData.Video[0].FPSDen), 92 }, 93 Bframes: &s.MediaData.Video[0].BFrames, 94 }, 95 }, 96 Audio: []*streamplace.Segment_Audio{ 97 { 98 Codec: "opus", 99 Rate: int64(s.MediaData.Audio[0].Rate), 100 Channels: int64(s.MediaData.Audio[0].Channels), 101 }, 102 }, 103 }, nil 104} 105 106func (m *DBModel) CreateSegment(seg *Segment) error { 107 err := m.DB.Model(Segment{}).Create(seg).Error 108 if err != nil { 109 return err 110 } 111 return nil 112} 113 114// should return the most recent segment for each user, ordered by most recent first 115// only includes segments from the last 30 seconds 116func (m *DBModel) MostRecentSegments() ([]Segment, error) { 117 var segments []Segment 118 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 119 120 err := m.DB.Table("segments"). 121 Select("segments.*"). 122 Where("id IN (?)", 123 m.DB.Table("segments"). 124 Select("id"). 125 Where("(repo_did, start_time) IN (?)", 126 m.DB.Table("segments"). 127 Select("repo_did, MAX(start_time)"). 128 Group("repo_did"))). 129 Order("start_time DESC"). 130 Joins("JOIN repos ON segments.repo_did = repos.did"). 131 Preload("Repo"). 132 Find(&segments).Error 133 134 if err != nil { 135 return nil, err 136 } 137 if segments == nil { 138 return []Segment{}, nil 139 } 140 141 filteredSegments := []Segment{} 142 for _, seg := range segments { 143 if seg.StartTime.After(thirtySecondsAgo) { 144 filteredSegments = append(filteredSegments, seg) 145 } 146 } 147 148 return filteredSegments, nil 149} 150 151func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 152 var seg Segment 153 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 154 if err != nil { 155 return nil, err 156 } 157 return &seg, nil 158} 159 160func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) { 161 var segs []Segment 162 if before == nil { 163 later := time.Now().Add(1000 * time.Hour) 164 before = &later 165 } 166 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 167 if err != nil { 168 return nil, err 169 } 170 return segs, nil 171} 172 173func (m *DBModel) GetLiveUsers() ([]Segment, error) { 174 var liveUsers []Segment 175 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time() 176 177 err := m.DB.Model(&Segment{}). 178 Preload("Repo"). 179 Where("start_time >= ?", thirtySecondsAgo). 180 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)"). 181 Order("start_time DESC"). 182 Find(&liveUsers).Error 183 184 if err != nil { 185 return nil, err 186 } 187 if liveUsers == nil { 188 return []Segment{}, nil 189 } 190 191 return liveUsers, nil 192} 193 194func (m *DBModel) GetSegment(id string) (*Segment, error) { 195 var seg Segment 196 197 err := m.DB.Model(&Segment{}). 198 Preload("Repo"). 199 Where("id = ?", id). 200 First(&seg).Error 201 202 if errors.Is(err, gorm.ErrRecordNotFound) { 203 return nil, nil 204 } 205 if err != nil { 206 return nil, err 207 } 208 209 return &seg, nil 210} 211 212func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 213 err := m.SegmentCleaner(ctx) 214 if err != nil { 215 return err 216 } 217 ticker := time.NewTicker(1 * time.Minute) 218 defer ticker.Stop() 219 220 for { 221 select { 222 case <-ctx.Done(): 223 return nil 224 case <-ticker.C: 225 err := m.SegmentCleaner(ctx) 226 if err != nil { 227 log.Error(ctx, "Failed to clean segments", "error", err) 228 } 229 } 230 } 231} 232 233func (m *DBModel) SegmentCleaner(ctx context.Context) error { 234 // Calculate the cutoff time (10 minutes ago) 235 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 236 237 // Find all unique repo_did values 238 var repoDIDs []string 239 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 240 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 241 return err 242 } 243 244 // For each user, keep their last 10 segments and delete older ones 245 for _, repoDID := range repoDIDs { 246 // Get IDs of the last 10 segments for this user 247 var keepSegmentIDs []string 248 if err := m.DB.Model(&Segment{}). 249 Where("repo_did = ?", repoDID). 250 Order("start_time DESC"). 251 Limit(10). 252 Pluck("id", &keepSegmentIDs).Error; err != nil { 253 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 254 return err 255 } 256 257 // Delete old segments except the ones we want to keep 258 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 259 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 260 261 if result.Error != nil { 262 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 263 } else if result.RowsAffected > 0 { 264 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 265 } 266 } 267 return nil 268}