Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "database/sql/driver"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "time"
10
11 "github.com/bluesky-social/indigo/lex/util"
12 "gorm.io/gorm"
13 "stream.place/streamplace/pkg/aqtime"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/streamplace"
16)
17
// SegmentMediadataVideo describes a single video track of a segment as it is
// stored inside the segment's JSON media data.
type SegmentMediadataVideo struct {
	// Width of the video frame (presumably in pixels; confirm against the producer).
	Width int `json:"width"`
	// Height of the video frame (presumably in pixels; confirm against the producer).
	Height int `json:"height"`

	// FPSNum is the numerator of the frame rate fraction.
	FPSNum int `json:"fpsNum"`
	// FPSDen is the denominator of the frame rate fraction.
	FPSDen int `json:"fpsDen"`
}
24
// SegmentMediadataAudio describes a single audio track of a segment as it is
// stored inside the segment's JSON media data.
type SegmentMediadataAudio struct {
	// Rate is the audio rate (presumably the sample rate in Hz; confirm against the producer).
	Rate int `json:"rate"`

	// Channels is the number of audio channels.
	Channels int `json:"channels"`
}
29
30type SegmentMediaData struct {
31 Video []*SegmentMediadataVideo `json:"video"`
32 Audio []*SegmentMediadataAudio `json:"audio"`
33 Duration int64 `json:"duration"`
34}
35
36// Scan scan value into Jsonb, implements sql.Scanner interface
37func (j *SegmentMediaData) Scan(value interface{}) error {
38 bytes, ok := value.([]byte)
39 if !ok {
40 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
41 }
42
43 result := SegmentMediaData{}
44 err := json.Unmarshal(bytes, &result)
45 *j = SegmentMediaData(result)
46 return err
47}
48
49// Value return json value, implement driver.Valuer interface
50func (j SegmentMediaData) Value() (driver.Value, error) {
51 return json.Marshal(j)
52}
53
54type Segment struct {
55 ID string `json:"id" gorm:"primaryKey"`
56 SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"`
57 SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
58 StartTime time.Time `json:"startTime" gorm:"index:latest_segments"`
59 RepoDID string `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
60 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
61 Title string `json:"title"`
62 MediaData *SegmentMediaData `json:"mediaData,omitempty"`
63}
64
65func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
66 aqt := aqtime.FromTime(s.StartTime)
67 if s.MediaData == nil {
68 return nil, fmt.Errorf("media data is nil")
69 }
70 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
71 return nil, fmt.Errorf("video data is nil")
72 }
73 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
74 return nil, fmt.Errorf("audio data is nil")
75 }
76 duration := s.MediaData.Duration
77 return &streamplace.Segment{
78 LexiconTypeID: "place.stream.segment",
79 Creator: s.RepoDID,
80 Id: s.ID,
81 SigningKey: s.SigningKeyDID,
82 StartTime: string(aqt),
83 Duration: &duration,
84 Video: []*streamplace.Segment_Video{
85 {
86 Codec: "h264",
87 Width: int64(s.MediaData.Video[0].Width),
88 Height: int64(s.MediaData.Video[0].Height),
89 Framerate: &streamplace.Segment_Framerate{
90 Num: int64(s.MediaData.Video[0].FPSNum),
91 Den: int64(s.MediaData.Video[0].FPSDen),
92 },
93 },
94 },
95 Audio: []*streamplace.Segment_Audio{
96 {
97 Codec: "opus",
98 Rate: int64(s.MediaData.Audio[0].Rate),
99 Channels: int64(s.MediaData.Audio[0].Channels),
100 },
101 },
102 }, nil
103}
104
105func (m *DBModel) CreateSegment(seg *Segment) error {
106 err := m.DB.Model(Segment{}).Create(seg).Error
107 if err != nil {
108 return err
109 }
110 return nil
111}
112
113// should return the most recent segment for each user, ordered by most recent first
114// only includes segments from the last 30 seconds
115func (m *DBModel) MostRecentSegments() ([]Segment, error) {
116 var segments []Segment
117 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
118
119 err := m.DB.Table("segments").
120 Select("segments.*").
121 Where("id IN (?)",
122 m.DB.Table("segments").
123 Select("id").
124 Where("(repo_did, start_time) IN (?)",
125 m.DB.Table("segments").
126 Select("repo_did, MAX(start_time)").
127 Group("repo_did"))).
128 Order("start_time DESC").
129 Joins("JOIN repos ON segments.repo_did = repos.did").
130 Preload("Repo").
131 Find(&segments).Error
132
133 if err != nil {
134 return nil, err
135 }
136 if segments == nil {
137 return []Segment{}, nil
138 }
139
140 filteredSegments := []Segment{}
141 for _, seg := range segments {
142 if seg.StartTime.After(thirtySecondsAgo) {
143 filteredSegments = append(filteredSegments, seg)
144 }
145 }
146
147 return filteredSegments, nil
148}
149
150func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
151 var seg Segment
152 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
153 if err != nil {
154 return nil, err
155 }
156 return &seg, nil
157}
158
159func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) {
160 var segs []Segment
161 if before == nil {
162 later := time.Now().Add(1000 * time.Hour)
163 before = &later
164 }
165 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
166 if err != nil {
167 return nil, err
168 }
169 return segs, nil
170}
171
172func (m *DBModel) GetLiveUsers() ([]Segment, error) {
173 var liveUsers []Segment
174 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()
175
176 err := m.DB.Model(&Segment{}).
177 Preload("Repo").
178 Where("start_time >= ?", thirtySecondsAgo).
179 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
180 Order("start_time DESC").
181 Find(&liveUsers).Error
182
183 if err != nil {
184 return nil, err
185 }
186 if liveUsers == nil {
187 return []Segment{}, nil
188 }
189
190 return liveUsers, nil
191}
192
193func (m *DBModel) GetSegment(id string) (*Segment, error) {
194 var seg Segment
195
196 err := m.DB.Model(&Segment{}).
197 Preload("Repo").
198 Where("id = ?", id).
199 First(&seg).Error
200
201 if errors.Is(err, gorm.ErrRecordNotFound) {
202 return nil, nil
203 }
204 if err != nil {
205 return nil, err
206 }
207
208 return &seg, nil
209}
210
211func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
212 err := m.SegmentCleaner(ctx)
213 if err != nil {
214 return err
215 }
216 ticker := time.NewTicker(1 * time.Minute)
217 defer ticker.Stop()
218
219 for {
220 select {
221 case <-ctx.Done():
222 return nil
223 case <-ticker.C:
224 err := m.SegmentCleaner(ctx)
225 if err != nil {
226 log.Error(ctx, "Failed to clean segments", "error", err)
227 }
228 }
229 }
230}
231
232func (m *DBModel) SegmentCleaner(ctx context.Context) error {
233 // Calculate the cutoff time (10 minutes ago)
234 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()
235
236 // Find all unique repo_did values
237 var repoDIDs []string
238 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
239 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
240 return err
241 }
242
243 // For each user, keep their last 10 segments and delete older ones
244 for _, repoDID := range repoDIDs {
245 // Get IDs of the last 10 segments for this user
246 var keepSegmentIDs []string
247 if err := m.DB.Model(&Segment{}).
248 Where("repo_did = ?", repoDID).
249 Order("start_time DESC").
250 Limit(10).
251 Pluck("id", &keepSegmentIDs).Error; err != nil {
252 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
253 return err
254 }
255
256 // Delete old segments except the ones we want to keep
257 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
258 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})
259
260 if result.Error != nil {
261 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
262 } else if result.RowsAffected > 0 {
263 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
264 }
265 }
266 return nil
267}
268
269func (m *DBModel) MostRecentSegmentsWithStreamInfo() ([]SegmentWithStreamInfo, error) {
270 var recentSegments []Segment
271 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
272
273 // fetch recent segments made <30s ago
274 err := m.DB.Table("segments").
275 Select("segments.*").
276 Where("segments.id IN (?) AND segments.start_time > ?",
277 m.DB.Table("segments").
278 Select("id").
279 Where("(repo_did, start_time) IN (?)",
280 m.DB.Table("segments").
281 Select("repo_did, MAX(start_time)").
282 Group("repo_did")),
283 thirtySecondsAgo).
284 Order("segments.start_time DESC").
285 Preload("Repo"). // Preload Repo for the Segment
286 Find(&recentSegments).Error
287
288 if err != nil {
289 return nil, fmt.Errorf("error fetching recent segments: %w", err)
290 }
291
292 if len(recentSegments) == 0 {
293 return []SegmentWithStreamInfo{}, nil
294 }
295
296 // get all repo DIDs
297 repoDIDs := make([]string, 0, len(recentSegments))
298 repoDIDSet := make(map[string]struct{})
299 for _, seg := range recentSegments {
300 if _, exists := repoDIDSet[seg.RepoDID]; !exists {
301 repoDIDs = append(repoDIDs, seg.RepoDID)
302 repoDIDSet[seg.RepoDID] = struct{}{}
303 }
304 }
305
306 // fetch latest stream record for each DID
307 var latestLivestreams []Livestream
308 if len(repoDIDs) > 0 {
309 subQuery := m.DB.Table("livestreams").
310 Select("repo_did, MAX(created_at) as max_created_at").
311 Where("repo_did IN (?)", repoDIDs).
312 Group("repo_did")
313
314 err = m.DB.Table("livestreams as ls").
315 Joins("JOIN (?) latest_ls ON ls.repo_did = latest_ls.repo_did AND ls.created_at = latest_ls.max_created_at", subQuery).
316 Where("ls.repo_did IN (?)", repoDIDs).
317 Preload("Repo"). // Preload Repo for the Livestream
318 Find(&latestLivestreams).Error
319
320 if err != nil {
321 return nil, fmt.Errorf("error fetching latest livestreams for repos: %w", err)
322 }
323 }
324
325 // map livestreams for easy lookup below, convert to LivestreamView
326 livestreamMap := make(map[string]*streamplace.Livestream_LivestreamView)
327 for i := range latestLivestreams {
328 view, err := latestLivestreams[i].ToLivestreamView()
329 if err != nil {
330 log.Error(context.Background(), "Couldn't convert Livestream object to LivestreamView, skipping", "error", err)
331 } else {
332 // extract inner information
333 livestreamMap[latestLivestreams[i].RepoDID] = view
334 }
335 }
336
337 // finally, make the resulting SegmentWithStreamInfos
338 resultWithStreamInfo := make([]SegmentWithStreamInfo, len(recentSegments))
339 for i, seg := range recentSegments {
340 view, ok := livestreamMap[seg.RepoDID]
341 if !ok {
342 log.Error(context.Background(), "No livestream view found for repo_did", "repo_did", seg.RepoDID)
343 continue
344 }
345 resultWithStreamInfo[i] = SegmentWithStreamInfo{
346 Segment: seg,
347 LivestreamView: view.Record,
348 }
349 }
350
351 return resultWithStreamInfo, nil
352}
353
354type SegmentWithStreamInfo struct {
355 Segment
356 // possibly incorrect/weird type
357 LivestreamView *util.LexiconTypeDecoder `json:"streamRecord,omitempty"`
358}