Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "github.com/streamplace/oatproxy/pkg/oatproxy"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/aqtime"
17 "stream.place/streamplace/pkg/bus"
18 "stream.place/streamplace/pkg/config"
19 "stream.place/streamplace/pkg/livepeer"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/media"
22 "stream.place/streamplace/pkg/media/segchanman"
23 "stream.place/streamplace/pkg/model"
24 "stream.place/streamplace/pkg/renditions"
25 "stream.place/streamplace/pkg/spmetrics"
26 "stream.place/streamplace/pkg/streamplace"
27 "stream.place/streamplace/pkg/thumbnail"
28)
29
30type StreamSession struct {
31 mm *media.MediaManager
32 mod model.Model
33 cli *config.CLI
34 bus *bus.Bus
35 op *oatproxy.OATProxy
36 hls *media.M3U8
37 lp *livepeer.LivepeerSession
38 repoDID string
39 segmentChan chan struct{}
40 lastStatus time.Time
41 lastStatusLock sync.Mutex
42}
43
44func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error {
45
46 sid := livepeer.RandomTrailer(8)
47 ctx = log.WithLogValues(ctx, "sid", sid)
48 ctx, cancel := context.WithCancel(ctx)
49 log.Log(ctx, "starting stream session")
50 defer cancel()
51 spseg, err := not.Segment.ToStreamplaceSegment()
52 if err != nil {
53 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
54 }
55 var allRenditions renditions.Renditions
56
57 if ss.cli.LivepeerGatewayURL != "" {
58 allRenditions, err = renditions.GenerateRenditions(spseg)
59 } else {
60 allRenditions = []renditions.Rendition{}
61 }
62 if err != nil {
63 return err
64 }
65 if spseg.Duration == nil {
66 return fmt.Errorf("segment duration is required to calculate bitrate")
67 }
68 dur := time.Duration(*spseg.Duration)
69 byteLen := len(not.Data)
70 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
71 sourceRendition := renditions.Rendition{
72 Name: "source",
73 Bitrate: bitrate,
74 Width: spseg.Video[0].Width,
75 Height: spseg.Video[0].Height,
76 }
77 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
78 ss.hls = media.NewM3U8(allRenditions)
79
80 g, ctx := errgroup.WithContext(ctx)
81
82 // for _, r := range allRenditions {
83 // g.Go(func() error {
84 // for {
85 // if ctx.Err() != nil {
86 // return nil
87 // }
88 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
89 // if ctx.Err() != nil {
90 // return nil
91 // }
92 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
93 // time.Sleep(time.Second * 5)
94 // }
95 // })
96 // }
97
98 for {
99 select {
100 case <-ss.segmentChan:
101 // reset timer
102 case <-ctx.Done():
103 return g.Wait()
104 // case <-time.After(time.Minute * 1):
105 case <-time.After(time.Second * 60):
106 log.Log(ctx, "no new segments for 1 minute, shutting down")
107 cancel()
108 }
109 }
110}
111
112func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error {
113 if ctx.Err() != nil {
114 return nil
115 }
116 ss.segmentChan <- struct{}{}
117 aqt := aqtime.FromTime(not.Segment.StartTime)
118 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString())
119 err := ss.mod.CreateSegment(not.Segment)
120 if err != nil {
121 return fmt.Errorf("could not add segment to database: %w", err)
122 }
123 spseg, err := not.Segment.ToStreamplaceSegment()
124 if err != nil {
125 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
126 }
127
128 ss.bus.Publish(spseg.Creator, spseg)
129 go ss.TryAddToHLS(ctx, spseg, "source", not.Data)
130
131 if ss.cli.Thumbnail {
132 go func() {
133 err := ss.Thumbnail(ctx, spseg.Creator, not)
134 if err != nil {
135 log.Error(ctx, "could not create thumbnail", "error", err)
136 }
137 }()
138 }
139
140 go func() {
141 err := ss.UpdateStatus(ctx, spseg.Creator)
142 if err != nil {
143 log.Error(ctx, "could not update status", "error", err)
144 }
145 }()
146
147 if ss.cli.LivepeerGatewayURL != "" {
148 go func() {
149 start := time.Now()
150 err := ss.Transcode(ctx, spseg, not.Data)
151 took := time.Since(start)
152 if err != nil {
153 log.Error(ctx, "could not transcode", "error", err, "took", took)
154 } else {
155 log.Log(ctx, "transcoded segment", "took", took)
156 }
157 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds()))
158 }()
159 }
160
161 return nil
162}
163
164func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
165 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
166 locked := lock.TryLock()
167 if !locked {
168 // we're already generating a thumbnail for this user, skip
169 return nil
170 }
171 defer lock.Unlock()
172 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
173 if err != nil {
174 return err
175 }
176 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
177 // we have a thumbnail <60sec old, skip generating a new one
178 return nil
179 }
180 r := bytes.NewReader(not.Data)
181 aqt := aqtime.FromTime(not.Segment.StartTime)
182 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
183 if err != nil {
184 return err
185 }
186 defer fd.Close()
187 err = media.Thumbnail(ctx, r, fd, "jpeg")
188 if err != nil {
189 return err
190 }
191 thumb := &model.Thumbnail{
192 Format: "jpeg",
193 SegmentID: not.Segment.ID,
194 }
195 err = ss.mod.CreateThumbnail(thumb)
196 if err != nil {
197 return err
198 }
199 return nil
200}
201
202func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
203 if pv == nil {
204 return nil, fmt.Errorf("post view is nil")
205 }
206 rec, ok := pv.Record.Val.(*bsky.FeedPost)
207 if !ok {
208 return nil, fmt.Errorf("post view record is not a feed post")
209 }
210 if rec.Embed == nil {
211 return nil, fmt.Errorf("post view embed is nil")
212 }
213 if rec.Embed.EmbedExternal == nil {
214 return nil, fmt.Errorf("post view embed external view is nil")
215 }
216 if rec.Embed.EmbedExternal.External == nil {
217 return nil, fmt.Errorf("post view embed external is nil")
218 }
219 if rec.Embed.EmbedExternal.External.Thumb == nil {
220 return nil, fmt.Errorf("post view embed external thumb is nil")
221 }
222 return rec.Embed.EmbedExternal.External.Thumb, nil
223}
224
225func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
226 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
227 ss.lastStatusLock.Lock()
228 defer ss.lastStatusLock.Unlock()
229 if time.Since(ss.lastStatus) < time.Minute {
230 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
231 return nil
232 }
233
234 session, err := ss.mod.GetSessionByDID(repoDID)
235 if err != nil {
236 return fmt.Errorf("could not get session for repoDID: %w", err)
237 }
238 if session == nil {
239 return fmt.Errorf("no session found for repoDID: %s", repoDID)
240 }
241
242 session, err = ss.op.RefreshIfNeeded(session)
243 if err != nil {
244 return fmt.Errorf("could not refresh session for repoDID: %w", err)
245 }
246
247 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
248 if err != nil {
249 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
250 }
251 lsv, err := ls.ToLivestreamView()
252 if err != nil {
253 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
254 }
255
256 post, err := ss.mod.GetFeedPost(ls.PostCID)
257 if err != nil {
258 return fmt.Errorf("could not get feed post: %w", err)
259 }
260 if post == nil {
261 return fmt.Errorf("feed post not found for livestream: %w", err)
262 }
263 postView, err := post.ToBskyPostView()
264 if err != nil {
265 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
266 }
267 thumb, err := getThumbnailCID(postView)
268 if err != nil {
269 return fmt.Errorf("could not get thumbnail cid: %w", err)
270 }
271
272 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
273 if err != nil {
274 return fmt.Errorf("could not get repo for repoDID: %w", err)
275 }
276
277 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
278 if !ok {
279 return fmt.Errorf("livestream is not a streamplace livestream")
280 }
281
282 actorStatusEmbed := bsky.ActorStatus_Embed{
283 EmbedExternal: &bsky.EmbedExternal{
284 External: &bsky.EmbedExternal_External{
285 Title: lsr.Title,
286 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle),
287 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost),
288 Thumb: thumb,
289 },
290 },
291 }
292
293 duration := int64(2)
294 status := bsky.ActorStatus{
295 Status: "live",
296 DurationMinutes: &duration,
297 Embed: &actorStatusEmbed,
298 CreatedAt: time.Now().Format(time.RFC3339),
299 }
300
301 client, err := ss.op.GetXrpcClient(session)
302 if err != nil {
303 return fmt.Errorf("could not get xrpc client: %w", err)
304 }
305
306 var swapRecord *string
307 getOutput := atproto.RepoGetRecord_Output{}
308 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
309 "repo": repoDID,
310 "collection": "app.bsky.actor.status",
311 "rkey": "self",
312 }, nil, &getOutput)
313 if err != nil {
314 xErr, ok := err.(*xrpc.Error)
315 if !ok {
316 return fmt.Errorf("could not get record: %w", err)
317 }
318 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
319 return fmt.Errorf("could not get record: %w", err)
320 }
321 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
322 } else {
323 log.Debug(ctx, "got record", "record", getOutput)
324 swapRecord = getOutput.Cid
325 }
326
327 inp := atproto.RepoPutRecord_Input{
328 Collection: "app.bsky.actor.status",
329 Record: &util.LexiconTypeDecoder{Val: &status},
330 Rkey: "self",
331 Repo: repoDID,
332 SwapRecord: swapRecord,
333 }
334 out := atproto.RepoPutRecord_Output{}
335
336 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
337 if err != nil {
338 return fmt.Errorf("could not create record: %w", err)
339 }
340 log.Debug(ctx, "created status record", "out", out)
341
342 ss.lastStatus = time.Now()
343
344 return nil
345}
346
347func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
348 rs, err := renditions.GenerateRenditions(spseg)
349 if err != nil {
350 return fmt.Errorf("failed to generated renditions: %w", err)
351 }
352
353 if ss.lp == nil {
354 var err error
355 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL)
356 if err != nil {
357 return err
358 }
359
360 }
361 spmetrics.TranscodeAttemptsTotal.Inc()
362 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
363 if err != nil {
364 spmetrics.TranscodeErrorsTotal.Inc()
365 return err
366 }
367 if len(rs) != len(segs) {
368 spmetrics.TranscodeErrorsTotal.Inc()
369 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
370 }
371 spmetrics.TranscodeSuccessesTotal.Inc()
372 aqt, err := aqtime.FromString(spseg.StartTime)
373 if err != nil {
374 return err
375 }
376 for i, seg := range segs {
377 log.Debug(ctx, "publishing segment", "rendition", rs[i])
378 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
379 if err != nil {
380 return fmt.Errorf("failed to create transcoded segment file: %w", err)
381 }
382 defer fd.Close()
383 _, err = fd.Write(seg)
384 if err != nil {
385 return fmt.Errorf("failed to write transcoded segment file: %w", err)
386 }
387 go ss.TryAddToHLS(ctx, spseg, rs[i].Name, seg)
388 go ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{
389 Filepath: fd.Name(),
390 Data: seg,
391 })
392 }
393 return nil
394}
395
396func (ss *StreamSession) TryAddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) {
397 ctx = log.WithLogValues(ctx, "rendition", rendition)
398 err := ss.AddToHLS(ctx, spseg, rendition, data)
399 if err != nil {
400 log.Error(ctx, "could not add to hls", "error", err)
401 }
402}
403
404func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
405 buf := bytes.Buffer{}
406 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
407 if err != nil {
408 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
409 }
410 // newSeg := &streamplace.Segment{
411 // LexiconTypeID: "place.stream.segment",
412 // Id: spseg.Id,
413 // Creator: spseg.Creator,
414 // StartTime: spseg.StartTime,
415 // Duration: &dur,
416 // Audio: spseg.Audio,
417 // Video: spseg.Video,
418 // SigningKey: spseg.SigningKey,
419 // }
420 aqt, err := aqtime.FromString(spseg.StartTime)
421 if err != nil {
422 return fmt.Errorf("failed to parse segment start time: %w", err)
423 }
424 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
425 if err := ss.hls.GetRendition(rendition).NewSegment(&media.Segment{
426 Buf: &buf,
427 Duration: time.Duration(dur),
428 Time: aqt.Time(),
429 }); err != nil {
430 return fmt.Errorf("failed to create new segment: %w", err)
431 }
432
433 return nil
434}