Live video on the AT Protocol
at eli/postinstall-bad 266 lines 7.4 kB view raw
1package model 2 3import ( 4 "context" 5 "database/sql/driver" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "time" 10 11 "gorm.io/gorm" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/streamplace" 15) 16 17type SegmentMediadataVideo struct { 18 Width int `json:"width"` 19 Height int `json:"height"` 20 FPSNum int `json:"fpsNum"` 21 FPSDen int `json:"fpsDen"` 22} 23 24type SegmentMediadataAudio struct { 25 Rate int `json:"rate"` 26 Channels int `json:"channels"` 27} 28 29type SegmentMediaData struct { 30 Video []*SegmentMediadataVideo `json:"video"` 31 Audio []*SegmentMediadataAudio `json:"audio"` 32 Duration int64 `json:"duration"` 33} 34 35// Scan scan value into Jsonb, implements sql.Scanner interface 36func (j *SegmentMediaData) Scan(value interface{}) error { 37 bytes, ok := value.([]byte) 38 if !ok { 39 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 40 } 41 42 result := SegmentMediaData{} 43 err := json.Unmarshal(bytes, &result) 44 *j = SegmentMediaData(result) 45 return err 46} 47 48// Value return json value, implement driver.Valuer interface 49func (j SegmentMediaData) Value() (driver.Value, error) { 50 return json.Marshal(j) 51} 52 53type Segment struct { 54 ID string `json:"id" gorm:"primaryKey"` 55 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 56 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 57 StartTime time.Time `json:"startTime" gorm:"index:latest_segments"` 58 RepoDID string `json:"repoDID" gorm:"index:latest_segments;column:repo_did"` 59 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 60 Title string `json:"title"` 61 MediaData *SegmentMediaData `json:"mediaData,omitempty"` 62} 63 64func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 65 aqt := aqtime.FromTime(s.StartTime) 66 if s.MediaData == nil { 67 return nil, fmt.Errorf("media data is nil") 68 } 69 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 70 return nil, fmt.Errorf("video data is nil") 71 } 72 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 73 return nil, fmt.Errorf("audio data is nil") 74 } 75 duration := s.MediaData.Duration 76 return &streamplace.Segment{ 77 LexiconTypeID: "place.stream.segment", 78 Creator: s.RepoDID, 79 Id: s.ID, 80 SigningKey: s.SigningKeyDID, 81 StartTime: string(aqt), 82 Duration: &duration, 83 Video: []*streamplace.Segment_Video{ 84 { 85 Codec: "h264", 86 Width: int64(s.MediaData.Video[0].Width), 87 Height: int64(s.MediaData.Video[0].Height), 88 Framerate: &streamplace.Segment_Framerate{ 89 Num: int64(s.MediaData.Video[0].FPSNum), 90 Den: int64(s.MediaData.Video[0].FPSDen), 91 }, 92 }, 93 }, 94 Audio: []*streamplace.Segment_Audio{ 95 { 96 Codec: "opus", 97 Rate: int64(s.MediaData.Audio[0].Rate), 98 Channels: int64(s.MediaData.Audio[0].Channels), 99 }, 100 }, 101 }, nil 102} 103 104func (m *DBModel) CreateSegment(seg *Segment) error { 105 err := m.DB.Model(Segment{}).Create(seg).Error 106 if err != nil { 107 return err 108 } 109 return nil 110} 111 112// should return the most recent segment for each user, ordered by most recent first 113// only includes segments from the last 30 seconds 114func (m *DBModel) MostRecentSegments() ([]Segment, error) { 115 var segments []Segment 116 thirtySecondsAgo := time.Now().Add(-30 * time.Second) 117 118 err := m.DB.Table("segments"). 119 Select("segments.*"). 120 Where("id IN (?)", 121 m.DB.Table("segments"). 122 Select("id"). 123 Where("(repo_did, start_time) IN (?)", 124 m.DB.Table("segments"). 125 Select("repo_did, MAX(start_time)"). 126 Group("repo_did"))). 127 Order("start_time DESC"). 128 Joins("JOIN repos ON segments.repo_did = repos.did"). 129 Preload("Repo"). 130 Find(&segments).Error 131 132 if err != nil { 133 return nil, err 134 } 135 if segments == nil { 136 return []Segment{}, nil 137 } 138 139 filteredSegments := []Segment{} 140 for _, seg := range segments { 141 if seg.StartTime.After(thirtySecondsAgo) { 142 filteredSegments = append(filteredSegments, seg) 143 } 144 } 145 146 return filteredSegments, nil 147} 148 149func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 150 var seg Segment 151 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 152 if err != nil { 153 return nil, err 154 } 155 return &seg, nil 156} 157 158func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) { 159 var segs []Segment 160 if before == nil { 161 later := time.Now().Add(1000 * time.Hour) 162 before = &later 163 } 164 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 165 if err != nil { 166 return nil, err 167 } 168 return segs, nil 169} 170 171func (m *DBModel) GetLiveUsers() ([]Segment, error) { 172 var liveUsers []Segment 173 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time() 174 175 err := m.DB.Model(&Segment{}). 176 Preload("Repo"). 177 Where("start_time >= ?", thirtySecondsAgo). 178 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)"). 179 Order("start_time DESC"). 180 Find(&liveUsers).Error 181 182 if err != nil { 183 return nil, err 184 } 185 if liveUsers == nil { 186 return []Segment{}, nil 187 } 188 189 return liveUsers, nil 190} 191 192func (m *DBModel) GetSegment(id string) (*Segment, error) { 193 var seg Segment 194 195 err := m.DB.Model(&Segment{}). 196 Preload("Repo"). 197 Where("id = ?", id). 198 First(&seg).Error 199 200 if errors.Is(err, gorm.ErrRecordNotFound) { 201 return nil, nil 202 } 203 if err != nil { 204 return nil, err 205 } 206 207 return &seg, nil 208} 209 210func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 211 err := m.SegmentCleaner(ctx) 212 if err != nil { 213 return err 214 } 215 ticker := time.NewTicker(1 * time.Minute) 216 defer ticker.Stop() 217 218 for { 219 select { 220 case <-ctx.Done(): 221 return nil 222 case <-ticker.C: 223 err := m.SegmentCleaner(ctx) 224 if err != nil { 225 log.Error(ctx, "Failed to clean segments", "error", err) 226 } 227 } 228 } 229} 230 231func (m *DBModel) SegmentCleaner(ctx context.Context) error { 232 // Calculate the cutoff time (10 minutes ago) 233 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 234 235 // Find all unique repo_did values 236 var repoDIDs []string 237 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 238 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 239 return err 240 } 241 242 // For each user, keep their last 10 segments and delete older ones 243 for _, repoDID := range repoDIDs { 244 // Get IDs of the last 10 segments for this user 245 var keepSegmentIDs []string 246 if err := m.DB.Model(&Segment{}). 247 Where("repo_did = ?", repoDID). 248 Order("start_time DESC"). 249 Limit(10). 250 Pluck("id", &keepSegmentIDs).Error; err != nil { 251 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 252 return err 253 } 254 255 // Delete old segments except the ones we want to keep 256 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 257 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 258 259 if result.Error != nil { 260 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 261 } else if result.RowsAffected > 0 { 262 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 263 } 264 } 265 return nil 266}