Live video on the AT Protocol
at natb/fix-fullscreen 253 lines 7.0 kB view raw
1package model 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22} 23 24type SegmentMediadataAudio struct { 25 Rate int `json:"rate"` 26 Channels int `json:"channels"` 27} 28 29type SegmentMediaData struct { 30 Video []*SegmentMediadataVideo `json:"video"` 31 Audio []*SegmentMediadataAudio `json:"audio"` 32 Duration int64 `json:"duration"` 33} 34 35// Scan scan value into Jsonb, implements sql.Scanner interface 36func (j *SegmentMediaData) Scan(value interface{}) error { 37 bytes, ok := value.([]byte) 38 if !ok { 39 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 40 } 41 42 result := SegmentMediaData{} 43 err := json.Unmarshal(bytes, &result) 44 *j = SegmentMediaData(result) 45 return err 46} 47 48// Value return json value, implement driver.Valuer interface 49func (j SegmentMediaData) Value() (driver.Value, error) { 50 return json.Marshal(j) 51} 52 53type Segment struct { 54 ID string `json:"id" gorm:"primaryKey"` 55 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 56 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 57 StartTime time.Time `json:"startTime" gorm:"index:latest_segments"` 58 RepoDID string `json:"repoDID" gorm:"index:latest_segments;column:repo_did"` 59 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 60 Title string `json:"title"` 61 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 62} 63 64func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 65 aqt := aqtime.FromTime(s.StartTime) 66 if s.MediaData == nil { 67 return nil, fmt.Errorf("media data is nil") 68 } 69 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 70 return nil, fmt.Errorf("video data is nil") 71 } 72 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 73 return nil, fmt.Errorf("audio data is nil") 74 } 75 duration := s.MediaData.Duration 76 return &streamplace.Segment{ 77 LexiconTypeID: "place.stream.segment", 78 Creator: s.RepoDID, 79 Id: s.ID, 80 SigningKey: s.SigningKeyDID, 81 StartTime: string(aqt), 82 Duration: &duration, 83 Video: []*streamplace.Segment_Video{ 84 { 85 Codec: "h264", 86 Width: int64(s.MediaData.Video[0].Width), 87 Height: int64(s.MediaData.Video[0].Height), 88 Framerate: &streamplace.Segment_Framerate{ 89 Num: int64(s.MediaData.Video[0].FPSNum), 90 Den: int64(s.MediaData.Video[0].FPSDen), 91 }, 92 }, 93 }, 94 Audio: []*streamplace.Segment_Audio{ 95 { 96 Codec: "opus", 97 Rate: int64(s.MediaData.Audio[0].Rate), 98 Channels: int64(s.MediaData.Audio[0].Channels), 99 }, 100 }, 101 }, nil 102} 103 104func (m *DBModel) CreateSegment(seg *Segment) error { 105 err := m.DB.Model(Segment{}).Create(seg).Error 106 if err != nil { 107 return err 108 } 109 return nil 110} 111 112// should return the most recent segment for each user, ordered by most recent first 113// only includes segments from the last 30 seconds 114func (m *DBModel) MostRecentSegments() ([]Segment, error) { 115 var segments []Segment 116 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 117 118 err := m.DB.Table("segments"). 119 Select("segments.*"). 120 Where("id IN (?)", 121 m.DB.Table("segments"). 122 Select("id"). 123 Where("(repo_did, start_time) IN (?)", 124 m.DB.Table("segments"). 125 Select("repo_did, MAX(start_time)"). 126 Group("repo_did"))). 127 Order("start_time DESC"). 128 Joins("JOIN repos ON segments.repo_did = repos.did"). 129 Preload("Repo"). 130 Find(&segments).Error 131 132 if err != nil { 133 return nil, err 134 } 135 if segments == nil { 136 return []Segment{}, nil 137 } 138 139 filteredSegments := []Segment{} 140 for _, seg := range segments { 141 if seg.StartTime.After(thirtySecondsAgo) { 142 filteredSegments = append(filteredSegments, seg) 143 } 144 } 145 146 return filteredSegments, nil 147} 148 149func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 150 var seg Segment 151 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 152 if err != nil { 153 return nil, err 154 } 155 return &seg, nil 156} 157 158func (m *DBModel) GetLiveUsers() ([]Segment, error) { 159 var liveUsers []Segment 160 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time() 161 162 err := m.DB.Model(&Segment{}). 163 Preload("Repo"). 164 Where("start_time >= ?", thirtySecondsAgo). 165 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)"). 166 Order("start_time DESC"). 167 Find(&liveUsers).Error 168 169 if err != nil { 170 return nil, err 171 } 172 if liveUsers == nil { 173 return []Segment{}, nil 174 } 175 176 return liveUsers, nil 177} 178 179func (m *DBModel) GetSegment(id string) (*Segment, error) { 180 var seg Segment 181 182 err := m.DB.Model(&Segment{}). 183 Preload("Repo"). 184 Where("id = ?", id). 185 First(&seg).Error 186 187 if errors.Is(err, gorm.ErrRecordNotFound) { 188 return nil, nil 189 } 190 if err != nil { 191 return nil, err 192 } 193 194 return &seg, nil 195} 196 197func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 198 err := m.SegmentCleaner(ctx) 199 if err != nil { 200 return err 201 } 202 ticker := time.NewTicker(1 * time.Minute) 203 defer ticker.Stop() 204 205 for { 206 select { 207 case <-ctx.Done(): 208 return nil 209 case <-ticker.C: 210 err := m.SegmentCleaner(ctx) 211 if err != nil { 212 log.Error(ctx, "Failed to clean segments", "error", err) 213 } 214 } 215 } 216} 217 218func (m *DBModel) SegmentCleaner(ctx context.Context) error { 219 // Calculate the cutoff time (10 minutes ago) 220 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 221 222 // Find all unique repo_did values 223 var repoDIDs []string 224 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 225 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 226 return err 227 } 228 229 // For each user, keep their last 10 segments and delete older ones 230 for _, repoDID := range repoDIDs { 231 // Get IDs of the last 10 segments for this user 232 var keepSegmentIDs []string 233 if err := m.DB.Model(&Segment{}). 234 Where("repo_did = ?", repoDID). 235 Order("start_time DESC"). 236 Limit(10). 237 Pluck("id", &keepSegmentIDs).Error; err != nil { 238 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 239 return err 240 } 241 242 // Delete old segments except the ones we want to keep 243 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 244 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 245 246 if result.Error != nil { 247 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 248 } else if result.RowsAffected > 0 { 249 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 250 } 251 } 252 return nil 253}