// Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "database/sql/driver"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "time"
10
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/aqtime"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/streamplace"
15)
16
// SegmentMediadataVideo describes one video track in a segment's media
// metadata. It is serialized to JSON inside SegmentMediaData.
type SegmentMediadataVideo struct {
	Width  int `json:"width"`
	Height int `json:"height"`
	// Frame rate expressed as a rational: FPSNum / FPSDen
	// (e.g. 30000/1001 for 29.97 fps).
	FPSNum int `json:"fpsNum"`
	FPSDen int `json:"fpsDen"`
	// BFrames presumably reports whether the track contains B-frames —
	// TODO(review): confirm against the producer that sets it.
	BFrames bool `json:"bframes"`
}
24
// SegmentMediadataAudio describes one audio track in a segment's media
// metadata. It is serialized to JSON inside SegmentMediaData.
type SegmentMediadataAudio struct {
	// Rate is the sample rate; Channels is the channel count.
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}
29
// SegmentMediaData is the media metadata stored alongside a Segment row.
// It is persisted as a JSON(B) column via the Scan/Value implementations
// below.
type SegmentMediaData struct {
	Video []*SegmentMediadataVideo `json:"video"`
	Audio []*SegmentMediadataAudio `json:"audio"`
	// Duration of the segment; units are not evident from this file —
	// TODO(review): confirm (ms vs ns) against ToStreamplaceSegment callers.
	Duration int64 `json:"duration"`
	Size     int   `json:"size"`
}
36
37// Scan scan value into Jsonb, implements sql.Scanner interface
38func (j *SegmentMediaData) Scan(value any) error {
39 bytes, ok := value.([]byte)
40 if !ok {
41 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
42 }
43
44 result := SegmentMediaData{}
45 err := json.Unmarshal(bytes, &result)
46 *j = SegmentMediaData(result)
47 return err
48}
49
50// Value return json value, implement driver.Valuer interface
51func (j SegmentMediaData) Value() (driver.Value, error) {
52 return json.Marshal(j)
53}
54
// Segment is one stored video segment row. Each row belongs to a repo
// (the streaming user, by DID) and records which key signed it; the
// latest_segments composite index serves the per-user recency queries
// below (MostRecentSegments, GetLiveUsers).
type Segment struct {
	ID            string      `json:"id" gorm:"primaryKey"`
	SigningKeyDID string      `json:"signingKeyDID" gorm:"column:signing_key_did"`
	SigningKey    *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime     time.Time   `json:"startTime" gorm:"index:latest_segments"`
	RepoDID       string      `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
	Repo          *Repo       `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title         string      `json:"title"`
	Size          int         `json:"size" gorm:"column:size"`
	// MediaData is stored as a JSON(B) column; see Scan/Value above.
	MediaData *SegmentMediaData `json:"mediaData,omitempty"`
}
66
67func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
68 aqt := aqtime.FromTime(s.StartTime)
69 if s.MediaData == nil {
70 return nil, fmt.Errorf("media data is nil")
71 }
72 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
73 return nil, fmt.Errorf("video data is nil")
74 }
75 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
76 return nil, fmt.Errorf("audio data is nil")
77 }
78 duration := s.MediaData.Duration
79 sizei64 := int64(s.Size)
80 return &streamplace.Segment{
81 LexiconTypeID: "place.stream.segment",
82 Creator: s.RepoDID,
83 Id: s.ID,
84 SigningKey: s.SigningKeyDID,
85 StartTime: string(aqt),
86 Duration: &duration,
87 Size: &sizei64,
88 Video: []*streamplace.Segment_Video{
89 {
90 Codec: "h264",
91 Width: int64(s.MediaData.Video[0].Width),
92 Height: int64(s.MediaData.Video[0].Height),
93 Framerate: &streamplace.Segment_Framerate{
94 Num: int64(s.MediaData.Video[0].FPSNum),
95 Den: int64(s.MediaData.Video[0].FPSDen),
96 },
97 Bframes: &s.MediaData.Video[0].BFrames,
98 },
99 },
100 Audio: []*streamplace.Segment_Audio{
101 {
102 Codec: "opus",
103 Rate: int64(s.MediaData.Audio[0].Rate),
104 Channels: int64(s.MediaData.Audio[0].Channels),
105 },
106 },
107 }, nil
108}
109
110func (m *DBModel) CreateSegment(seg *Segment) error {
111 err := m.DB.Model(Segment{}).Create(seg).Error
112 if err != nil {
113 return err
114 }
115 return nil
116}
117
118// should return the most recent segment for each user, ordered by most recent first
119// only includes segments from the last 30 seconds
120func (m *DBModel) MostRecentSegments() ([]Segment, error) {
121 var segments []Segment
122 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
123
124 err := m.DB.Table("segments").
125 Select("segments.*").
126 Where("id IN (?)",
127 m.DB.Table("segments").
128 Select("id").
129 Where("(repo_did, start_time) IN (?)",
130 m.DB.Table("segments").
131 Select("repo_did, MAX(start_time)").
132 Group("repo_did"))).
133 Order("start_time DESC").
134 Joins("JOIN repos ON segments.repo_did = repos.did").
135 Preload("Repo").
136 Find(&segments).Error
137
138 if err != nil {
139 return nil, err
140 }
141 if segments == nil {
142 return []Segment{}, nil
143 }
144
145 filteredSegments := []Segment{}
146 for _, seg := range segments {
147 if seg.StartTime.After(thirtySecondsAgo) {
148 filteredSegments = append(filteredSegments, seg)
149 }
150 }
151
152 return filteredSegments, nil
153}
154
155func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
156 var seg Segment
157 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
158 if err != nil {
159 return nil, err
160 }
161 return &seg, nil
162}
163
164func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) {
165 var segs []Segment
166 if before == nil {
167 later := time.Now().Add(1000 * time.Hour)
168 before = &later
169 }
170 if after == nil {
171 earlier := time.Time{}
172 after = &earlier
173 }
174 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
175 if err != nil {
176 return nil, err
177 }
178 return segs, nil
179}
180
181func (m *DBModel) GetLiveUsers() ([]Segment, error) {
182 var liveUsers []Segment
183 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()
184
185 err := m.DB.Model(&Segment{}).
186 Preload("Repo").
187 Where("start_time >= ?", thirtySecondsAgo).
188 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
189 Order("start_time DESC").
190 Find(&liveUsers).Error
191
192 if err != nil {
193 return nil, err
194 }
195 if liveUsers == nil {
196 return []Segment{}, nil
197 }
198
199 return liveUsers, nil
200}
201
202func (m *DBModel) GetSegment(id string) (*Segment, error) {
203 var seg Segment
204
205 err := m.DB.Model(&Segment{}).
206 Preload("Repo").
207 Where("id = ?", id).
208 First(&seg).Error
209
210 if errors.Is(err, gorm.ErrRecordNotFound) {
211 return nil, nil
212 }
213 if err != nil {
214 return nil, err
215 }
216
217 return &seg, nil
218}
219
220func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
221 err := m.SegmentCleaner(ctx)
222 if err != nil {
223 return err
224 }
225 ticker := time.NewTicker(1 * time.Minute)
226 defer ticker.Stop()
227
228 for {
229 select {
230 case <-ctx.Done():
231 return nil
232 case <-ticker.C:
233 err := m.SegmentCleaner(ctx)
234 if err != nil {
235 log.Error(ctx, "Failed to clean segments", "error", err)
236 }
237 }
238 }
239}
240
241func (m *DBModel) SegmentCleaner(ctx context.Context) error {
242 // Calculate the cutoff time (10 minutes ago)
243 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()
244
245 // Find all unique repo_did values
246 var repoDIDs []string
247 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
248 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
249 return err
250 }
251
252 // For each user, keep their last 10 segments and delete older ones
253 for _, repoDID := range repoDIDs {
254 // Get IDs of the last 10 segments for this user
255 var keepSegmentIDs []string
256 if err := m.DB.Model(&Segment{}).
257 Where("repo_did = ?", repoDID).
258 Order("start_time DESC").
259 Limit(10).
260 Pluck("id", &keepSegmentIDs).Error; err != nil {
261 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
262 return err
263 }
264
265 // Delete old segments except the ones we want to keep
266 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
267 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})
268
269 if result.Error != nil {
270 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
271 } else if result.RowsAffected > 0 {
272 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
273 }
274 }
275 return nil
276}