Live video on the AT Protocol

package model

import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"gorm.io/gorm"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/streamplace"
)

// SegmentMediadataVideo describes a single video track within a segment.
type SegmentMediadataVideo struct {
	Width  int `json:"width"`
	Height int `json:"height"`
	FPSNum int `json:"fpsNum"`
	FPSDen int `json:"fpsDen"`
}

// SegmentMediadataAudio describes a single audio track within a segment.
type SegmentMediadataAudio struct {
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}

// SegmentMediaData is the media metadata attached to a Segment, stored as JSONB.
type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"`
}

// Scan implements the sql.Scanner interface, unmarshaling a JSONB column
// into a SegmentMediaData.
func (j *SegmentMediaData) Scan(value interface{}) error {
	bytes, ok := value.([]byte)
	if !ok {
		return fmt.Errorf("failed to unmarshal JSONB value: %v", value)
	}

	result := SegmentMediaData{}
	if err := json.Unmarshal(bytes, &result); err != nil {
		return err
	}
	*j = result
	return nil
}

// Value implements the driver.Valuer interface, marshaling the struct to JSON.
func (j SegmentMediaData) Value() (driver.Value, error) {
	return json.Marshal(j)
}
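
// Round-trip sketch (illustrative, not part of the original file): Value and
// Scan carry SegmentMediaData through its JSON encoding, which is what lets
// GORM store it in a JSONB column and read it back:
//
//	md := SegmentMediaData{
//		Duration: 2000,
//		Video:    []*SegmentMediadataVideo{{Width: 1920, Height: 1080, FPSNum: 30, FPSDen: 1}},
//	}
//	v, _ := md.Value() // []byte of JSON
//	var out SegmentMediaData
//	_ = out.Scan(v)    // out now equals md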

// Segment is the database model for a single chunk of live video. StartTime
// and RepoDID share the composite latest_segments index used by the
// "latest segment per user" queries below.
type Segment struct {
	ID            string            `json:"id" gorm:"primaryKey"`
	SigningKeyDID string            `json:"signingKeyDID" gorm:"column:signing_key_did"`
	SigningKey    *SigningKey       `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime     time.Time         `json:"startTime" gorm:"index:latest_segments"`
	RepoDID       string            `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
	Repo          *Repo             `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title         string            `json:"title"`
	MediaData     *SegmentMediaData `json:"mediaData,omitempty"`
}
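
// Hypothetical setup sketch (not from the original file): with GORM, a model
// like this is typically created or updated via AutoMigrate during startup:
//
//	if err := db.AutoMigrate(&Segment{}); err != nil {
//		// handle migration failure
//	}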

// ToStreamplaceSegment converts a database Segment into a place.stream.segment
// lexicon record. Only the first video and first audio track are used, and
// both must be present.
func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
	aqt := aqtime.FromTime(s.StartTime)
	if s.MediaData == nil {
		return nil, fmt.Errorf("media data is nil")
	}
	if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
		return nil, fmt.Errorf("video data is nil")
	}
	if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
		return nil, fmt.Errorf("audio data is nil")
	}
	duration := s.MediaData.Duration
	return &streamplace.Segment{
		LexiconTypeID: "place.stream.segment",
		Creator:       s.RepoDID,
		Id:            s.ID,
		SigningKey:    s.SigningKeyDID,
		StartTime:     string(aqt),
		Duration:      &duration,
		Video: []*streamplace.Segment_Video{
			{
				Codec:  "h264",
				Width:  int64(s.MediaData.Video[0].Width),
				Height: int64(s.MediaData.Video[0].Height),
				Framerate: &streamplace.Segment_Framerate{
					Num: int64(s.MediaData.Video[0].FPSNum),
					Den: int64(s.MediaData.Video[0].FPSDen),
				},
			},
		},
		Audio: []*streamplace.Segment_Audio{
			{
				Codec:    "opus",
				Rate:     int64(s.MediaData.Audio[0].Rate),
				Channels: int64(s.MediaData.Audio[0].Channels),
			},
		},
	}, nil
}
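
// Caller sketch (hypothetical): converting a freshly loaded segment into a
// lexicon record for publishing:
//
//	spSeg, err := seg.ToStreamplaceSegment()
//	if err != nil {
//		// MediaData was nil, or had no video/audio track
//	}
//	// spSeg can now be written to the repo as a place.stream.segment record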

// CreateSegment inserts a new segment row.
func (m *DBModel) CreateSegment(seg *Segment) error {
	return m.DB.Create(seg).Error
}
111
112// should return the most recent segment for each user, ordered by most recent first
113// only includes segments from the last 30 seconds
114func (m *DBModel) MostRecentSegments() ([]Segment, error) {
115 var segments []Segment
116 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
117
118 err := m.DB.Table("segments").
119 Select("segments.*").
120 Where("id IN (?)",
121 m.DB.Table("segments").
122 Select("id").
123 Where("(repo_did, start_time) IN (?)",
124 m.DB.Table("segments").
125 Select("repo_did, MAX(start_time)").
126 Group("repo_did"))).
127 Order("start_time DESC").
128 Joins("JOIN repos ON segments.repo_did = repos.did").
129 Preload("Repo").
130 Find(&segments).Error
131
132 if err != nil {
133 return nil, err
134 }
135 if segments == nil {
136 return []Segment{}, nil
137 }
138
139 filteredSegments := []Segment{}
140 for _, seg := range segments {
141 if seg.StartTime.After(thirtySecondsAgo) {
142 filteredSegments = append(filteredSegments, seg)
143 }
144 }
145
146 return filteredSegments, nil
147}
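
// Roughly the SQL the query above produces (exact form varies by GORM version
// and SQL dialect):
//
//	SELECT segments.* FROM segments
//	JOIN repos ON segments.repo_did = repos.did
//	WHERE (repo_did, start_time) IN (
//		SELECT repo_did, MAX(start_time) FROM segments GROUP BY repo_did
//	)
//	ORDER BY start_time DESC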

// LatestSegmentForUser returns the newest segment for the given repo DID.
// If the user has no segments, the error is gorm.ErrRecordNotFound.
func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
	var seg Segment
	err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
	if err != nil {
		return nil, err
	}
	return &seg, nil
}
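
// Caller sketch (hypothetical): "no segments yet" surfaces as
// gorm.ErrRecordNotFound rather than a nil error with a nil Segment:
//
//	seg, err := m.LatestSegmentForUser(did)
//	if errors.Is(err, gorm.ErrRecordNotFound) {
//		// user has never streamed
//	}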

// LatestSegmentsForUser returns up to limit segments for the given repo DID,
// newest first, with start_time strictly between after and before. Nil bounds
// default to "no bound".
func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) {
	var segs []Segment
	if before == nil {
		// Far-future sentinel so the range predicate stays simple.
		later := time.Now().Add(1000 * time.Hour)
		before = &later
	}
	if after == nil {
		earlier := time.Time{}
		after = &earlier
	}
	err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
	if err != nil {
		return nil, err
	}
	return segs, nil
}
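
// Pagination sketch (hypothetical caller): walk backwards through a user's
// history, 50 segments per page, using the oldest StartTime on each page as
// the next "before" cursor:
//
//	var cursor *time.Time
//	for {
//		page, err := m.LatestSegmentsForUser(did, 50, cursor, nil)
//		if err != nil || len(page) == 0 {
//			break
//		}
//		t := page[len(page)-1].StartTime
//		cursor = &t
//	}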

// GetLiveUsers returns the newest segment per repo, restricted to repos whose
// newest segment is at most 30 seconds old, i.e. the set of currently live
// users.
func (m *DBModel) GetLiveUsers() ([]Segment, error) {
	var liveUsers []Segment
	thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("start_time >= ?", thirtySecondsAgo).
		Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
		Order("start_time DESC").
		Find(&liveUsers).Error

	if err != nil {
		return nil, err
	}
	if liveUsers == nil {
		return []Segment{}, nil
	}

	return liveUsers, nil
}

// GetSegment returns the segment with the given ID, or (nil, nil) if no such
// segment exists.
func (m *DBModel) GetSegment(id string) (*Segment, error) {
	var seg Segment

	err := m.DB.Model(&Segment{}).
		Preload("Repo").
		Where("id = ?", id).
		First(&seg).Error

	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &seg, nil
}

// StartSegmentCleaner runs SegmentCleaner once immediately, then once per
// minute until the context is canceled. It blocks, so it is typically run in
// its own goroutine.
func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
	err := m.SegmentCleaner(ctx)
	if err != nil {
		return err
	}
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			err := m.SegmentCleaner(ctx)
			if err != nil {
				log.Error(ctx, "Failed to clean segments", "error", err)
			}
		}
	}
}
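
// Startup sketch (hypothetical caller): the cleaner blocks, so it is run in
// its own goroutine and stopped by canceling the context:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go func() {
//		if err := m.StartSegmentCleaner(ctx); err != nil {
//			log.Error(ctx, "segment cleaner exited", "error", err)
//		}
//	}()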

// SegmentCleaner deletes old segments. A segment is removed only if it is
// older than 10 minutes AND not among its user's 10 most recent segments, so
// every user keeps at least a short history.
func (m *DBModel) SegmentCleaner(ctx context.Context) error {
	// Calculate the cutoff time (10 minutes ago)
	cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()

	// Find all unique repo_did values
	var repoDIDs []string
	if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
		log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
		return err
	}

	// For each user, keep their last 10 segments and delete older ones
	for _, repoDID := range repoDIDs {
		// Get IDs of the last 10 segments for this user
		var keepSegmentIDs []string
		if err := m.DB.Model(&Segment{}).
			Where("repo_did = ?", repoDID).
			Order("start_time DESC").
			Limit(10).
			Pluck("id", &keepSegmentIDs).Error; err != nil {
			log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
			return err
		}

		// Delete old segments except the ones we want to keep
		result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
			repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})

		if result.Error != nil {
			log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
		} else if result.RowsAffected > 0 {
			log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
		}
	}
	return nil
}
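
// Per repo, the delete above amounts to (exact SQL varies by dialect):
//
//	DELETE FROM segments
//	WHERE repo_did = ? AND start_time < ? AND id NOT IN (?, ..., ?)
//
// with the NOT IN list holding that user's 10 newest segment IDs, so a live
// stream's recent segments are never reaped.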