// Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "database/sql/driver"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "time"
10
11 "gorm.io/gorm"
12 "stream.place/streamplace/pkg/aqtime"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/streamplace"
15)
16
// SegmentMediadataVideo describes one video track of a segment: pixel
// dimensions plus the framerate expressed as a num/den rational
// (e.g. 30000/1001 for 29.97fps).
type SegmentMediadataVideo struct {
	Width  int `json:"width"`
	Height int `json:"height"`
	FPSNum int `json:"fpsNum"`
	FPSDen int `json:"fpsDen"`
}
23
// SegmentMediadataAudio describes one audio track of a segment.
type SegmentMediadataAudio struct {
	// Rate is the sample rate; presumably Hz (e.g. 48000) — TODO confirm against writers.
	Rate     int `json:"rate"`
	Channels int `json:"channels"`
}
28
// SegmentMediaData is the JSON-serialized media description stored on a
// Segment row: zero or more video and audio tracks plus a duration.
// Duration's unit is not established here — TODO confirm (ms vs ns) against
// the code that populates it.
type SegmentMediaData struct {
	Video    []*SegmentMediadataVideo `json:"video"`
	Audio    []*SegmentMediadataAudio `json:"audio"`
	Duration int64                    `json:"duration"`
}
34
35// Scan scan value into Jsonb, implements sql.Scanner interface
36func (j *SegmentMediaData) Scan(value interface{}) error {
37 bytes, ok := value.([]byte)
38 if !ok {
39 return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value))
40 }
41
42 result := SegmentMediaData{}
43 err := json.Unmarshal(bytes, &result)
44 *j = SegmentMediaData(result)
45 return err
46}
47
48// Value return json value, implement driver.Valuer interface
49func (j SegmentMediaData) Value() (driver.Value, error) {
50 return json.Marshal(j)
51}
52
// Segment is the database row for one chunk of a live stream. Each segment
// belongs to a repo (the streaming user, by DID) and records which key
// signed it; the composite index on (start_time, repo_did) backs the
// "latest segment per user" queries below.
type Segment struct {
	ID            string            `json:"id" gorm:"primaryKey"`
	SigningKeyDID string            `json:"signingKeyDID" gorm:"column:signing_key_did"`
	// SigningKey is the association resolved from SigningKeyDID (preloaded on demand).
	SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"`
	StartTime  time.Time   `json:"startTime" gorm:"index:latest_segments"`
	RepoDID    string      `json:"repoDID" gorm:"index:latest_segments;column:repo_did"`
	// Repo is the association resolved from RepoDID (preloaded on demand).
	Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"`
	Title string `json:"title"`
	// MediaData is stored as a JSON column via the Scan/Value methods above.
	MediaData *SegmentMediaData `json:"mediaData,omitempty"`
}
63
64func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) {
65 aqt := aqtime.FromTime(s.StartTime)
66 if s.MediaData == nil {
67 return nil, fmt.Errorf("media data is nil")
68 }
69 if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil {
70 return nil, fmt.Errorf("video data is nil")
71 }
72 if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil {
73 return nil, fmt.Errorf("audio data is nil")
74 }
75 duration := s.MediaData.Duration
76 return &streamplace.Segment{
77 LexiconTypeID: "place.stream.segment",
78 Creator: s.RepoDID,
79 Id: s.ID,
80 SigningKey: s.SigningKeyDID,
81 StartTime: string(aqt),
82 Duration: &duration,
83 Video: []*streamplace.Segment_Video{
84 {
85 Codec: "h264",
86 Width: int64(s.MediaData.Video[0].Width),
87 Height: int64(s.MediaData.Video[0].Height),
88 Framerate: &streamplace.Segment_Framerate{
89 Num: int64(s.MediaData.Video[0].FPSNum),
90 Den: int64(s.MediaData.Video[0].FPSDen),
91 },
92 },
93 },
94 Audio: []*streamplace.Segment_Audio{
95 {
96 Codec: "opus",
97 Rate: int64(s.MediaData.Audio[0].Rate),
98 Channels: int64(s.MediaData.Audio[0].Channels),
99 },
100 },
101 }, nil
102}
103
104func (m *DBModel) CreateSegment(seg *Segment) error {
105 err := m.DB.Model(Segment{}).Create(seg).Error
106 if err != nil {
107 return err
108 }
109 return nil
110}
111
// MostRecentSegments returns the single most recent segment for each user
// (repo), ordered newest-first, restricted to segments that started within
// the last 30 seconds.
func (m *DBModel) MostRecentSegments() ([]Segment, error) {
	var segments []Segment
	thirtySecondsAgo := time.Now().Add(-30 * time.Second)

	// Innermost subquery: per-repo MAX(start_time). Middle subquery maps
	// those (repo_did, start_time) pairs back to segment ids. Outer query
	// fetches the full rows; the JOIN on repos drops segments whose repo
	// row no longer exists.
	err := m.DB.Table("segments").
		Select("segments.*").
		Where("id IN (?)",
			m.DB.Table("segments").
				Select("id").
				Where("(repo_did, start_time) IN (?)",
					m.DB.Table("segments").
						Select("repo_did, MAX(start_time)").
						Group("repo_did"))).
		Order("start_time DESC").
		Joins("JOIN repos ON segments.repo_did = repos.did").
		Preload("Repo").
		Find(&segments).Error

	if err != nil {
		return nil, err
	}
	// Normalize nil to an empty slice so callers (and JSON encoding) see [].
	if segments == nil {
		return []Segment{}, nil
	}

	// NOTE(review): the 30-second window is applied here in Go rather than
	// in the SQL above, so the query still returns one row per repo before
	// filtering; compare GetLiveUsers, which filters in the query.
	filteredSegments := []Segment{}
	for _, seg := range segments {
		if seg.StartTime.After(thirtySecondsAgo) {
			filteredSegments = append(filteredSegments, seg)
		}
	}

	return filteredSegments, nil
}
148
149func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) {
150 var seg Segment
151 err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error
152 if err != nil {
153 return nil, err
154 }
155 return &seg, nil
156}
157
158func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) {
159 var segs []Segment
160 if before == nil {
161 later := time.Now().Add(1000 * time.Hour)
162 before = &later
163 }
164 err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error
165 if err != nil {
166 return nil, err
167 }
168 return segs, nil
169}
170
171func (m *DBModel) GetLiveUsers() ([]Segment, error) {
172 var liveUsers []Segment
173 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time()
174
175 err := m.DB.Model(&Segment{}).
176 Preload("Repo").
177 Where("start_time >= ?", thirtySecondsAgo).
178 Where("start_time = (SELECT MAX(start_time) FROM segments s2 WHERE s2.repo_did = segments.repo_did)").
179 Order("start_time DESC").
180 Find(&liveUsers).Error
181
182 if err != nil {
183 return nil, err
184 }
185 if liveUsers == nil {
186 return []Segment{}, nil
187 }
188
189 return liveUsers, nil
190}
191
192func (m *DBModel) GetSegment(id string) (*Segment, error) {
193 var seg Segment
194
195 err := m.DB.Model(&Segment{}).
196 Preload("Repo").
197 Where("id = ?", id).
198 First(&seg).Error
199
200 if errors.Is(err, gorm.ErrRecordNotFound) {
201 return nil, nil
202 }
203 if err != nil {
204 return nil, err
205 }
206
207 return &seg, nil
208}
209
210func (m *DBModel) StartSegmentCleaner(ctx context.Context) error {
211 err := m.SegmentCleaner(ctx)
212 if err != nil {
213 return err
214 }
215 ticker := time.NewTicker(1 * time.Minute)
216 defer ticker.Stop()
217
218 for {
219 select {
220 case <-ctx.Done():
221 return nil
222 case <-ticker.C:
223 err := m.SegmentCleaner(ctx)
224 if err != nil {
225 log.Error(ctx, "Failed to clean segments", "error", err)
226 }
227 }
228 }
229}
230
231func (m *DBModel) SegmentCleaner(ctx context.Context) error {
232 // Calculate the cutoff time (10 minutes ago)
233 cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time()
234
235 // Find all unique repo_did values
236 var repoDIDs []string
237 if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil {
238 log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err)
239 return err
240 }
241
242 // For each user, keep their last 10 segments and delete older ones
243 for _, repoDID := range repoDIDs {
244 // Get IDs of the last 10 segments for this user
245 var keepSegmentIDs []string
246 if err := m.DB.Model(&Segment{}).
247 Where("repo_did = ?", repoDID).
248 Order("start_time DESC").
249 Limit(10).
250 Pluck("id", &keepSegmentIDs).Error; err != nil {
251 log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err)
252 return err
253 }
254
255 // Delete old segments except the ones we want to keep
256 result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?",
257 repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{})
258
259 if result.Error != nil {
260 log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error)
261 } else if result.RowsAffected > 0 {
262 log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected)
263 }
264 }
265 return nil
266}