// Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "database/sql/driver"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "time"
10
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/aqtime"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/streamplace"
15)
16
// SegmentMediadataVideo describes one video track in a segment's media
// metadata. Frame rate is stored as a rational (FPSNum/FPSDen).
type SegmentMediadataVideo struct {
	Width   int  `json:"width"`   // frame width in pixels
	Height  int  `json:"height"`  // frame height in pixels
	FPSNum  int  `json:"fpsNum"`  // frame-rate numerator
	FPSDen  int  `json:"fpsDen"`  // frame-rate denominator
	BFrames bool `json:"bframes"` // whether the stream contains B-frames
}
24
// SegmentMediadataAudio describes one audio track in a segment's media
// metadata.
type SegmentMediadataAudio struct {
	Rate     int `json:"rate"`     // sample rate in Hz — NOTE(review): presumed Hz; confirm against ingest
	Channels int `json:"channels"` // channel count
}
29
// SegmentMediaData is the JSON media metadata stored alongside a
// Segment row. It implements sql.Scanner and driver.Valuer so gorm can
// persist it as a JSON/JSONB column.
type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"` // segment duration — units not evident from this file; TODO confirm (ms?)
}
35
36// Scan scan value into Jsonb, implements sql.Scanner interface
37func (j *SegmentMediaData) Scan(value interface{}) error {
38 bytes, ok := value.([]byte)
39 if !ok {
40 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
41 }
42
43 result := SegmentMediaData{}
44 err := json.Unmarshal(bytes, &result)
45 *j = SegmentMediaData(result)
46 return err
47}
48
49// Value return json value, implement driver.Valuer interface
50func (j SegmentMediaData) Value() (driver.Value, error) {
51 return json.Marshal(j)
52}
53
// Segment is one stored chunk of a live stream, keyed by ID and
// associated with the publishing repo and the key that signed it.
// RepoDID + StartTime share the latest_segments index used by the
// "most recent segment per repo" queries below.
type Segment struct {
	ID            string            `json:"id" gorm:"primaryKey"`
	SigningKeyDID string            `json:"signingKeyDID" gorm:"column:signing_key_did"`
	SigningKey    *SigningKey       `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime     time.Time         `json:"startTime" gorm:"index:latest_segments"`
	RepoDID       string            `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
	Repo          *Repo             `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title         string            `json:"title"`
	MediaData     *SegmentMediaData `json:"mediaData,omitempty"` // JSON column; nil when no metadata was recorded
}
64
65func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
66 aqt := aqtime.FromTime(s.StartTime)
67 if s.MediaData == nil {
68 return nil, fmt.Errorf("media data is nil")
69 }
70 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
71 return nil, fmt.Errorf("video data is nil")
72 }
73 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
74 return nil, fmt.Errorf("audio data is nil")
75 }
76 duration := s.MediaData.Duration
77 return &streamplace.Segment{
78 LexiconTypeID: "place.stream.segment",
79 Creator: s.RepoDID,
80 Id: s.ID,
81 SigningKey: s.SigningKeyDID,
82 StartTime: string(aqt),
83 Duration: &duration,
84 Video: []*streamplace.Segment_Video{
85 {
86 Codec: "h264",
87 Width: int64(s.MediaData.Video[0].Width),
88 Height: int64(s.MediaData.Video[0].Height),
89 Framerate: &streamplace.Segment_Framerate{
90 Num: int64(s.MediaData.Video[0].FPSNum),
91 Den: int64(s.MediaData.Video[0].FPSDen),
92 },
93 Bframes: &s.MediaData.Video[0].BFrames,
94 },
95 },
96 Audio: []*streamplace.Segment_Audio{
97 {
98 Codec: "opus",
99 Rate: int64(s.MediaData.Audio[0].Rate),
100 Channels: int64(s.MediaData.Audio[0].Channels),
101 },
102 },
103 }, nil
104}
105
106func (m *DBModel) CreateSegment(seg *Segment) error {
107 err := m.DB.Model(Segment{}).Create(seg).Error
108 if err != nil {
109 return err
110 }
111 return nil
112}
113
114// should return the most recent segment for each user, ordered by most recent first
115// only includes segments from the last 30 seconds
116func (m *DBModel) MostRecentSegments() ([]Segment, error) {
117 var segments []Segment
118 thirtySecondsAgo := time.Now().Add(-30 * time.Second)
119
120 err := m.DB.Table("segments").
121 Select("segments.*").
122 Where("id IN (?)",
123 m.DB.Table("segments").
124 Select("id").
125 Where("(repo_did, start_time) IN (?)",
126 m.DB.Table("segments").
127 Select("repo_did, MAX(start_time)").
128 Group("repo_did"))).
129 Order("start_time DESC").
130 Joins("JOIN repos ON segments.repo_did = repos.did").
131 Preload("Repo").
132 Find(&segments).Error
133
134 if err != nil {
135 return nil, err
136 }
137 if segments == nil {
138 return []Segment{}, nil
139 }
140
141 filteredSegments := []Segment{}
142 for _, seg := range segments {
143 if seg.StartTime.After(thirtySecondsAgo) {
144 filteredSegments = append(filteredSegments, seg)
145 }
146 }
147
148 return filteredSegments, nil
149}
150
151func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
152 var seg Segment
153 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
154 if err != nil {
155 return nil, err
156 }
157 return &seg, nil
158}
159
160func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) {
161 var segs []Segment
162 if before == nil {
163 later := time.Now().Add(1000 * time.Hour)
164 before = &later
165 }
166 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
167 if err != nil {
168 return nil, err
169 }
170 return segs, nil
171}
172
173func (m *DBModel) GetLiveUsers() ([]Segment, error) {
174 var liveUsers []Segment
175 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()
176
177 err := m.DB.Model(&Segment{}).
178 Preload("Repo").
179 Where("start_time >= ?", thirtySecondsAgo).
180 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
181 Order("start_time DESC").
182 Find(&liveUsers).Error
183
184 if err != nil {
185 return nil, err
186 }
187 if liveUsers == nil {
188 return []Segment{}, nil
189 }
190
191 return liveUsers, nil
192}
193
194func (m *DBModel) GetSegment(id string) (*Segment, error) {
195 var seg Segment
196
197 err := m.DB.Model(&Segment{}).
198 Preload("Repo").
199 Where("id = ?", id).
200 First(&seg).Error
201
202 if errors.Is(err, gorm.ErrRecordNotFound) {
203 return nil, nil
204 }
205 if err != nil {
206 return nil, err
207 }
208
209 return &seg, nil
210}
211
212func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
213 err := m.SegmentCleaner(ctx)
214 if err != nil {
215 return err
216 }
217 ticker := time.NewTicker(1 * time.Minute)
218 defer ticker.Stop()
219
220 for {
221 select {
222 case <-ctx.Done():
223 return nil
224 case <-ticker.C:
225 err := m.SegmentCleaner(ctx)
226 if err != nil {
227 log.Error(ctx, "Failed to clean segments", "error", err)
228 }
229 }
230 }
231}
232
233func (m *DBModel) SegmentCleaner(ctx context.Context) error {
234 // Calculate the cutoff time (10 minutes ago)
235 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()
236
237 // Find all unique repo_did values
238 var repoDIDs []string
239 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
240 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
241 return err
242 }
243
244 // For each user, keep their last 10 segments and delete older ones
245 for _, repoDID := range repoDIDs {
246 // Get IDs of the last 10 segments for this user
247 var keepSegmentIDs []string
248 if err := m.DB.Model(&Segment{}).
249 Where("repo_did = ?", repoDID).
250 Order("start_time DESC").
251 Limit(10).
252 Pluck("id", &keepSegmentIDs).Error; err != nil {
253 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
254 return err
255 }
256
257 // Delete old segments except the ones we want to keep
258 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
259 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})
260
261 if result.Error != nil {
262 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
263 } else if result.RowsAffected > 0 {
264 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
265 }
266 }
267 return nil
268}