Live video on the AT Protocol
at eli/get-segments
package model

import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/bluesky-social/indigo/lex/util"
	"gorm.io/gorm"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/streamplace"
)

type SegmentMediadataVideo struct {
	Width  int `json:"width"`
	Height int `json:"height"`
	FPSNum int `json:"fpsNum"`
	FPSDen int `json:"fpsDen"`
}

type SegmentMediadataAudio struct {
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}

type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"`
}

// Scan unmarshals a JSONB value into SegmentMediaData, implementing the sql.Scanner interface.
func (j *SegmentMediaData) Scan(value interface{}) error {
	bytes, ok := value.([]byte)
	if !ok {
		return fmt.Errorf("failed to unmarshal JSONB value: %v", value)
	}

	result := SegmentMediaData{}
	err := json.Unmarshal(bytes, &result)
	*j = result
	return err
}

// Value marshals SegmentMediaData to JSON, implementing the driver.Valuer interface.
func (j SegmentMediaData) Value() (driver.Value, error) {
	return json.Marshal(j)
}

// Segment is a single chunk of a user's live video as stored in the database.
type Segment struct {
	ID            string            `json:"id" gorm:"primaryKey"`
	SigningKeyDID string            `json:"signingKeyDID" gorm:"column:signing_key_did"`
	SigningKey    *SigningKey       `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime     time.Time         `json:"startTime" gorm:"index:latest_segments"`
	RepoDID       string            `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
	Repo          *Repo             `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title         string            `json:"title"`
	MediaData     *SegmentMediaData `json:"mediaData,omitempty"`
}

// ToStreamplaceSegment converts a database Segment into its place.stream.segment lexicon record.
func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
	if s.MediaData == nil {
		return nil, fmt.Errorf("media data is nil")
	}
	if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
		return nil, fmt.Errorf("video data is nil")
	}
	if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
		return nil, fmt.Errorf("audio data is nil")
	}
	aqt := aqtime.FromTime(s.StartTime)
	duration := s.MediaData.Duration
	return &streamplace.Segment{
		LexiconTypeID: "place.stream.segment",
		Creator:       s.RepoDID,
		Id:            s.ID,
		SigningKey:    s.SigningKeyDID,
		StartTime:     string(aqt),
		Duration:      &duration,
		Video: []*streamplace.Segment_Video{
			{
				Codec:  "h264",
				Width:  int64(s.MediaData.Video[0].Width),
				Height: int64(s.MediaData.Video[0].Height),
				Framerate: &streamplace.Segment_Framerate{
					Num: int64(s.MediaData.Video[0].FPSNum),
					Den: int64(s.MediaData.Video[0].FPSDen),
				},
			},
		},
		Audio: []*streamplace.Segment_Audio{
			{
				Codec:    "opus",
				Rate:     int64(s.MediaData.Audio[0].Rate),
				Channels: int64(s.MediaData.Audio[0].Channels),
			},
		},
	}, nil
}

// CreateSegment inserts a new segment row.
func (m *DBModel) CreateSegment(seg *Segment) error {
	return m.DB.Model(Segment{}).Create(seg).Error
}

// MostRecentSegments returns the most recent segment for each user, ordered newest first.
// Only segments started within the last 30 seconds are included.
func (m *DBModel) MostRecentSegments() ([]Segment, error) {
	var segments []Segment
	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	err := m.DB.Table("segments").
		Select("segments.*").
		Where("id IN (?)",
			m.DB.Table("segments").
				Select("id").
				Where("(repo_did, start_time) IN (?)",
					m.DB.Table("segments").
						Select("repo_did, MAX(start_time)").
						Group("repo_did"))).
		Order("start_time DESC").
		Joins("JOIN repos ON segments.repo_did = repos.did").
		Preload("Repo").
		Find(&segments).Error

	if err != nil {
		return nil, err
	}
	if segments == nil {
		return []Segment{}, nil
	}

	// the 30-second liveness window is applied in Go rather than in the query
	filteredSegments := []Segment{}
	for _, seg := range segments {
		if seg.StartTime.After(thirtySecondsAgo) {
			filteredSegments = append(filteredSegments, seg)
		}
	}

	return filteredSegments, nil
}

// LatestSegmentForUser returns the newest segment for a single repo DID.
func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
	var seg Segment
	err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
	if err != nil {
		return nil, err
	}
	return &seg, nil
}

// LatestSegmentsForUser returns up to limit segments for a user, newest first,
// optionally paginated with a "before" cursor.
func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) {
	var segs []Segment
	if before == nil {
		// no cursor provided; use a time far in the future so nothing is excluded
		later := time.Now().Add(1000 * time.Hour)
		before = &later
	}
	err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
	if err != nil {
		return nil, err
	}
	return segs, nil
}

// GetLiveUsers returns the latest segment for every user whose most recent
// segment started within the last 30 seconds.
func (m *DBModel) GetLiveUsers() ([]Segment, error) {
	var liveUsers []Segment
	thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("start_time >= ?", thirtySecondsAgo).
		Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
		Order("start_time DESC").
		Find(&liveUsers).Error

	if err != nil {
		return nil, err
	}
	if liveUsers == nil {
		return []Segment{}, nil
	}

	return liveUsers, nil
}

// GetSegment returns a single segment by ID, or nil if it does not exist.
func (m *DBModel) GetSegment(id string) (*Segment, error) {
	var seg Segment

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("id = ?", id).
		First(&seg).Error

	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &seg, nil
}

// StartSegmentCleaner runs the segment cleaner once immediately, then every
// minute until ctx is cancelled.
func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
	err := m.SegmentCleaner(ctx)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			err := m.SegmentCleaner(ctx)
			if err != nil {
				log.Error(ctx, "Failed to clean segments", "error", err)
			}
		}
	}
}

// SegmentCleaner deletes segments older than 10 minutes, always keeping each
// user's 10 most recent.
func (m *DBModel) SegmentCleaner(ctx context.Context) error {
	// Calculate the cutoff time (10 minutes ago)
	cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()

	// Find all unique repo_did values
	var repoDIDs []string
	if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
		log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
		return err
	}

	// For each user, keep their last 10 segments and delete older ones
	for _, repoDID := range repoDIDs {
		// Get IDs of the last 10 segments for this user
		var keepSegmentIDs []string
		if err := m.DB.Model(&Segment{}).
			Where("repo_did = ?", repoDID).
			Order("start_time DESC").
			Limit(10).
			Pluck("id", &keepSegmentIDs).Error; err != nil {
			log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
			return err
		}

		// Delete old segments except the ones we want to keep
		result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
			repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})

		if result.Error != nil {
			log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
		} else if result.RowsAffected > 0 {
			log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
		}
	}
	return nil
}

// MostRecentSegmentsWithStreamInfo returns the most recent segment for each
// currently-live user, paired with that user's latest livestream record.
func (m *DBModel) MostRecentSegmentsWithStreamInfo() ([]SegmentWithStreamInfo, error) {
	var recentSegments []Segment
	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	// fetch recent segments made <30s ago
	err := m.DB.Table("segments").
		Select("segments.*").
		Where("segments.id IN (?) AND segments.start_time > ?",
			m.DB.Table("segments").
				Select("id").
				Where("(repo_did, start_time) IN (?)",
					m.DB.Table("segments").
						Select("repo_did, MAX(start_time)").
						Group("repo_did")),
			thirtySecondsAgo).
		Order("segments.start_time DESC").
		Preload("Repo"). // Preload Repo for the Segment
		Find(&recentSegments).Error

	if err != nil {
		return nil, fmt.Errorf("error fetching recent segments: %w", err)
	}

	if len(recentSegments) == 0 {
		return []SegmentWithStreamInfo{}, nil
	}

	// collect the distinct repo DIDs from the recent segments
	repoDIDs := make([]string, 0, len(recentSegments))
	repoDIDSet := make(map[string]struct{})
	for _, seg := range recentSegments {
		if _, exists := repoDIDSet[seg.RepoDID]; !exists {
			repoDIDs = append(repoDIDs, seg.RepoDID)
			repoDIDSet[seg.RepoDID] = struct{}{}
		}
	}

	// fetch the latest livestream record for each DID
	var latestLivestreams []Livestream
	if len(repoDIDs) > 0 {
		subQuery := m.DB.Table("livestreams").
			Select("repo_did, MAX(created_at) as max_created_at").
			Where("repo_did IN (?)", repoDIDs).
			Group("repo_did")

		err = m.DB.Table("livestreams as ls").
			Joins("JOIN (?) latest_ls ON ls.repo_did = latest_ls.repo_did AND ls.created_at = latest_ls.max_created_at", subQuery).
			Where("ls.repo_did IN (?)", repoDIDs).
			Preload("Repo"). // Preload Repo for the Livestream
			Find(&latestLivestreams).Error

		if err != nil {
			return nil, fmt.Errorf("error fetching latest livestreams for repos: %w", err)
		}
	}

	// map livestreams by repo DID for easy lookup below, converting each to a LivestreamView
	livestreamMap := make(map[string]*streamplace.Livestream_LivestreamView)
	for i := range latestLivestreams {
		view, err := latestLivestreams[i].ToLivestreamView()
		if err != nil {
			log.Error(context.Background(), "Couldn't convert Livestream object to LivestreamView, skipping", "error", err)
		} else {
			livestreamMap[latestLivestreams[i].RepoDID] = view
		}
	}

	// finally, pair each recent segment with its livestream view; segments
	// without a matching view are skipped rather than emitted as zero values
	resultWithStreamInfo := make([]SegmentWithStreamInfo, 0, len(recentSegments))
	for _, seg := range recentSegments {
		view, ok := livestreamMap[seg.RepoDID]
		if !ok {
			log.Error(context.Background(), "No livestream view found for repo_did", "repo_did", seg.RepoDID)
			continue
		}
		resultWithStreamInfo = append(resultWithStreamInfo, SegmentWithStreamInfo{
			Segment:        seg,
			LivestreamView: view.Record,
		})
	}

	return resultWithStreamInfo, nil
}

// SegmentWithStreamInfo is a Segment plus the creator's latest livestream record.
type SegmentWithStreamInfo struct {
	Segment
	// possibly incorrect/weird type
	LivestreamView *util.LexiconTypeDecoder `json:"streamRecord,omitempty"`
}
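To make the intended read path concrete, here is a minimal sketch of how a caller could drive these helpers: ask the model for each currently-live user's newest segment, then convert every row into its place.stream.segment lexicon record. This is illustrative only; the import path stream.place/streamplace/pkg/model, the package name example, and the ListLiveSegments helper are assumptions, not code from the repo.

package example

import (
	"fmt"

	"stream.place/streamplace/pkg/model"       // assumed import path for the package above
	"stream.place/streamplace/pkg/streamplace" // lexicon types, as imported by the segment model
)

// ListLiveSegments is a hypothetical helper: it queries the model for each
// live user's newest segment and converts the rows into place.stream.segment
// lexicon records.
func ListLiveSegments(m *model.DBModel) ([]*streamplace.Segment, error) {
	rows, err := m.GetLiveUsers()
	if err != nil {
		return nil, fmt.Errorf("querying live users: %w", err)
	}

	records := make([]*streamplace.Segment, 0, len(rows))
	for _, row := range rows {
		rec, err := row.ToStreamplaceSegment()
		if err != nil {
			// rows without media data fail conversion; skip them rather than
			// failing the whole listing
			continue
		}
		records = append(records, rec)
	}
	return records, nil
}

A caller would construct the DBModel however the application normally wires it up and hand the resulting records to whatever serves or publishes them.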