Live video on the AT Protocol
1package director
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/sync/errgroup"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/livepeer"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/model"
	"stream.place/streamplace/pkg/renditions"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/streamplace"
	"stream.place/streamplace/pkg/thumbnail"
)
28
29type StreamSession struct {
30 mm *media.MediaManager
31 mod model.Model
32 cli *config.CLI
33 bus *bus.Bus
34 op *oatproxy.OATProxy
35 hls *media.M3U8
36 lp *livepeer.LivepeerSession
37 repoDID string
38 segmentChan chan struct{}
39 lastStatus time.Time
40 lastStatusLock sync.Mutex
41 g *errgroup.Group
42 started chan struct{}
43 ctx context.Context
44 packets []bus.PacketizedSegment
45}
46
47func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error {
48 ctx, cancel := context.WithCancel(ctx)
49 ss.g, ctx = errgroup.WithContext(ctx)
50 sid := livepeer.RandomTrailer(8)
51 ctx = log.WithLogValues(ctx, "sid", sid)
52 ss.ctx = ctx
53 log.Log(ctx, "starting stream session")
54 defer cancel()
55 spseg, err := not.Segment.ToStreamplaceSegment()
56 if err != nil {
57 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
58 }
59 var allRenditions renditions.Renditions
60
61 if ss.cli.LivepeerGatewayURL != "" {
62 allRenditions, err = renditions.GenerateRenditions(spseg)
63 } else {
64 allRenditions = []renditions.Rendition{}
65 }
66 if err != nil {
67 return err
68 }
69 if spseg.Duration == nil {
70 return fmt.Errorf("segment duration is required to calculate bitrate")
71 }
72 dur := time.Duration(*spseg.Duration)
73 byteLen := len(not.Data)
74 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
75 sourceRendition := renditions.Rendition{
76 Name: "source",
77 Bitrate: bitrate,
78 Width: spseg.Video[0].Width,
79 Height: spseg.Video[0].Height,
80 }
81 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
82 ss.hls = media.NewM3U8(allRenditions)
83
84 // for _, r := range allRenditions {
85 // g.Go(func() error {
86 // for {
87 // if ctx.Err() != nil {
88 // return nil
89 // }
90 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
91 // if ctx.Err() != nil {
92 // return nil
93 // }
94 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
95 // time.Sleep(time.Second * 5)
96 // }
97 // })
98 // }
99
100 close(ss.started)
101
102 for {
103 select {
104 case <-ss.segmentChan:
105 // reset timer
106 case <-ctx.Done():
107 return ss.g.Wait()
108 // case <-time.After(time.Minute * 1):
109 case <-time.After(time.Second * 60):
110 log.Log(ctx, "no new segments for 1 minute, shutting down")
111 for _, r := range allRenditions {
112 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
113 }
114 cancel()
115 }
116 }
117}
118
119// Execute a goroutine in the context of the stream session. Errors are
120// non-fatal; if you actually want to melt the universe on an error you
121// should panic()
122func (ss *StreamSession) Go(ctx context.Context, f func() error) {
123 <-ss.started
124 ss.g.Go(func() error {
125 err := f()
126 if err != nil {
127 log.Error(ctx, "error in goroutine", "error", err)
128 }
129 return nil
130 })
131}
132
133func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error {
134 <-ss.started
135 go func() {
136 select {
137 case <-ss.ctx.Done():
138 return
139 case ss.segmentChan <- struct{}{}:
140 }
141 }()
142 aqt := aqtime.FromTime(not.Segment.StartTime)
143 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString())
144 err := ss.mod.CreateSegment(not.Segment)
145 if err != nil {
146 return fmt.Errorf("could not add segment to database: %w", err)
147 }
148 spseg, err := not.Segment.ToStreamplaceSegment()
149 if err != nil {
150 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
151 }
152
153 ss.bus.Publish(spseg.Creator, spseg)
154 ss.Go(ctx, func() error {
155 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
156 Filepath: not.Segment.ID,
157 Data: not.Data,
158 })
159 })
160
161 if ss.cli.Thumbnail {
162 ss.Go(ctx, func() error {
163 return ss.Thumbnail(ctx, spseg.Creator, not)
164 })
165 }
166
167 ss.Go(ctx, func() error {
168 return ss.UpdateStatus(ctx, spseg.Creator)
169 })
170
171 if ss.cli.LivepeerGatewayURL != "" {
172 ss.Go(ctx, func() error {
173 start := time.Now()
174 err := ss.Transcode(ctx, spseg, not.Data)
175 took := time.Since(start)
176 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
177 return err
178 })
179 }
180
181 return nil
182}
183
184func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
185 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
186 locked := lock.TryLock()
187 if !locked {
188 // we're already generating a thumbnail for this user, skip
189 return nil
190 }
191 defer lock.Unlock()
192 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
193 if err != nil {
194 return err
195 }
196 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
197 // we have a thumbnail <60sec old, skip generating a new one
198 return nil
199 }
200 r := bytes.NewReader(not.Data)
201 aqt := aqtime.FromTime(not.Segment.StartTime)
202 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
203 if err != nil {
204 return err
205 }
206 defer fd.Close()
207 err = media.Thumbnail(ctx, r, fd, "jpeg")
208 if err != nil {
209 return err
210 }
211 thumb := &model.Thumbnail{
212 Format: "jpeg",
213 SegmentID: not.Segment.ID,
214 }
215 err = ss.mod.CreateThumbnail(thumb)
216 if err != nil {
217 return err
218 }
219 return nil
220}
221
222func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
223 if pv == nil {
224 return nil, fmt.Errorf("post view is nil")
225 }
226 rec, ok := pv.Record.Val.(*bsky.FeedPost)
227 if !ok {
228 return nil, fmt.Errorf("post view record is not a feed post")
229 }
230 if rec.Embed == nil {
231 return nil, fmt.Errorf("post view embed is nil")
232 }
233 if rec.Embed.EmbedExternal == nil {
234 return nil, fmt.Errorf("post view embed external view is nil")
235 }
236 if rec.Embed.EmbedExternal.External == nil {
237 return nil, fmt.Errorf("post view embed external is nil")
238 }
239 if rec.Embed.EmbedExternal.External.Thumb == nil {
240 return nil, fmt.Errorf("post view embed external thumb is nil")
241 }
242 return rec.Embed.EmbedExternal.External.Thumb, nil
243}
244
245func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
246 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
247 ss.lastStatusLock.Lock()
248 defer ss.lastStatusLock.Unlock()
249 if time.Since(ss.lastStatus) < time.Minute {
250 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
251 return nil
252 }
253
254 session, err := ss.mod.GetSessionByDID(repoDID)
255 if err != nil {
256 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
257 }
258 if session == nil {
259 return fmt.Errorf("no session found for repoDID: %s", repoDID)
260 }
261
262 session, err = ss.op.RefreshIfNeeded(session)
263 if err != nil {
264 return fmt.Errorf("could not refresh session for repoDID: %w", err)
265 }
266
267 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
268 if err != nil {
269 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
270 }
271 lsv, err := ls.ToLivestreamView()
272 if err != nil {
273 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
274 }
275
276 post, err := ss.mod.GetFeedPost(ls.PostCID)
277 if err != nil {
278 return fmt.Errorf("could not get feed post: %w", err)
279 }
280 if post == nil {
281 return fmt.Errorf("feed post not found for livestream: %w", err)
282 }
283 postView, err := post.ToBskyPostView()
284 if err != nil {
285 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
286 }
287 thumb, err := getThumbnailCID(postView)
288 if err != nil {
289 return fmt.Errorf("could not get thumbnail cid: %w", err)
290 }
291
292 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
293 if err != nil {
294 return fmt.Errorf("could not get repo for repoDID: %w", err)
295 }
296
297 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
298 if !ok {
299 return fmt.Errorf("livestream is not a streamplace livestream")
300 }
301
302 actorStatusEmbed := bsky.ActorStatus_Embed{
303 EmbedExternal: &bsky.EmbedExternal{
304 External: &bsky.EmbedExternal_External{
305 Title: lsr.Title,
306 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle),
307 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost),
308 Thumb: thumb,
309 },
310 },
311 }
312
313 duration := int64(2)
314 status := bsky.ActorStatus{
315 Status: "live",
316 DurationMinutes: &duration,
317 Embed: &actorStatusEmbed,
318 CreatedAt: time.Now().Format(time.RFC3339),
319 }
320
321 client, err := ss.op.GetXrpcClient(session)
322 if err != nil {
323 return fmt.Errorf("could not get xrpc client: %w", err)
324 }
325
326 var swapRecord *string
327 getOutput := atproto.RepoGetRecord_Output{}
328 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
329 "repo": repoDID,
330 "collection": "app.bsky.actor.status",
331 "rkey": "self",
332 }, nil, &getOutput)
333 if err != nil {
334 xErr, ok := err.(*xrpc.Error)
335 if !ok {
336 return fmt.Errorf("could not get record: %w", err)
337 }
338 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
339 return fmt.Errorf("could not get record: %w", err)
340 }
341 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
342 } else {
343 log.Debug(ctx, "got record", "record", getOutput)
344 swapRecord = getOutput.Cid
345 }
346
347 inp := atproto.RepoPutRecord_Input{
348 Collection: "app.bsky.actor.status",
349 Record: &util.LexiconTypeDecoder{Val: &status},
350 Rkey: "self",
351 Repo: repoDID,
352 SwapRecord: swapRecord,
353 }
354 out := atproto.RepoPutRecord_Output{}
355
356 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
357 if err != nil {
358 return fmt.Errorf("could not create record: %w", err)
359 }
360 log.Debug(ctx, "created status record", "out", out)
361
362 ss.lastStatus = time.Now()
363
364 return nil
365}
366
367func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
368 rs, err := renditions.GenerateRenditions(spseg)
369 if err != nil {
370 return fmt.Errorf("failed to generated renditions: %w", err)
371 }
372
373 if ss.lp == nil {
374 var err error
375 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL)
376 if err != nil {
377 return err
378 }
379
380 }
381 spmetrics.TranscodeAttemptsTotal.Inc()
382 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
383 if err != nil {
384 spmetrics.TranscodeErrorsTotal.Inc()
385 return err
386 }
387 if len(rs) != len(segs) {
388 spmetrics.TranscodeErrorsTotal.Inc()
389 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
390 }
391 spmetrics.TranscodeSuccessesTotal.Inc()
392 aqt, err := aqtime.FromString(spseg.StartTime)
393 if err != nil {
394 return err
395 }
396 for i, seg := range segs {
397 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
398 log.Debug(ctx, "publishing segment", "rendition", rs[i])
399 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
400 if err != nil {
401 return fmt.Errorf("failed to create transcoded segment file: %w", err)
402 }
403 defer fd.Close()
404 _, err = fd.Write(seg)
405 if err != nil {
406 return fmt.Errorf("failed to write transcoded segment file: %w", err)
407 }
408 ss.Go(ctx, func() error {
409 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
410 Filepath: fd.Name(),
411 Data: seg,
412 })
413 })
414
415 }
416 return nil
417}
418
419func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
420 ss.Go(ctx, func() error {
421 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
422 })
423 ss.Go(ctx, func() error {
424 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
425 })
426 return nil
427}
428
429func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
430 packet, err := media.Packetize(ctx, seg)
431 if err != nil {
432 return fmt.Errorf("failed to packetize segment: %w", err)
433 }
434 seg.PacketizedData = packet
435 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
436 return nil
437}
438
439func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
440 buf := bytes.Buffer{}
441 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
442 if err != nil {
443 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
444 }
445 // newSeg := &streamplace.Segment{
446 // LexiconTypeID: "place.stream.segment",
447 // Id: spseg.Id,
448 // Creator: spseg.Creator,
449 // StartTime: spseg.StartTime,
450 // Duration: &dur,
451 // Audio: spseg.Audio,
452 // Video: spseg.Video,
453 // SigningKey: spseg.SigningKey,
454 // }
455 aqt, err := aqtime.FromString(spseg.StartTime)
456 if err != nil {
457 return fmt.Errorf("failed to parse segment start time: %w", err)
458 }
459 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
460 rend, err := ss.hls.GetRendition(rendition)
461 if err != nil {
462 return fmt.Errorf("failed to get rendition: %w", err)
463 }
464 if err := rend.NewSegment(&media.Segment{
465 Buf: &buf,
466 Duration: time.Duration(dur),
467 Time: aqt.Time(),
468 }); err != nil {
469 return fmt.Errorf("failed to create new segment: %w", err)
470 }
471
472 return nil
473}