Live video on the AT Protocol
1package director
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/sync/errgroup"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/livepeer"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/model"
	"stream.place/streamplace/pkg/renditions"
	"stream.place/streamplace/pkg/replication/iroh_replicator"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/statedb"
	"stream.place/streamplace/pkg/streamplace"
	"stream.place/streamplace/pkg/thumbnail"
)
30
31type StreamSession struct {
32 mm *media.MediaManager
33 mod model.Model
34 cli *config.CLI
35 bus *bus.Bus
36 op *oatproxy.OATProxy
37 hls *media.M3U8
38 lp *livepeer.LivepeerSession
39 repoDID string
40 segmentChan chan struct{}
41 lastStatus time.Time
42 lastStatusCID *string
43 lastStatusLock sync.Mutex
44 lastOriginTime time.Time
45 lastOriginLock sync.Mutex
46 g *errgroup.Group
47 started chan struct{}
48 ctx context.Context
49 packets []bus.PacketizedSegment
50 statefulDB *statedb.StatefulDB
51 swarm *iroh_replicator.IrohSwarm
52}
53
54func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
55 ctx, cancel := context.WithCancel(ctx)
56 ss.g, ctx = errgroup.WithContext(ctx)
57 sid := livepeer.RandomTrailer(8)
58 ctx = log.WithLogValues(ctx, "sid", sid)
59 ss.ctx = ctx
60 log.Log(ctx, "starting stream session")
61 defer cancel()
62 spseg, err := notif.Segment.ToStreamplaceSegment()
63 if err != nil {
64 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
65 }
66 var allRenditions renditions.Renditions
67
68 if ss.cli.LivepeerGatewayURL != "" {
69 allRenditions, err = renditions.GenerateRenditions(spseg)
70 } else {
71 allRenditions = []renditions.Rendition{}
72 }
73 if err != nil {
74 return err
75 }
76 if spseg.Duration == nil {
77 return fmt.Errorf("segment duration is required to calculate bitrate")
78 }
79 dur := time.Duration(*spseg.Duration)
80 byteLen := len(notif.Data)
81 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
82 sourceRendition := renditions.Rendition{
83 Name: "source",
84 Bitrate: bitrate,
85 Width: spseg.Video[0].Width,
86 Height: spseg.Video[0].Height,
87 }
88 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
89 ss.hls = media.NewM3U8(allRenditions)
90
91 // for _, r := range allRenditions {
92 // g.Go(func() error {
93 // for {
94 // if ctx.Err() != nil {
95 // return nil
96 // }
97 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
98 // if ctx.Err() != nil {
99 // return nil
100 // }
101 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
102 // time.Sleep(time.Second * 5)
103 // }
104 // })
105 // }
106
107 close(ss.started)
108
109 for {
110 select {
111 case <-ss.segmentChan:
112 // reset timer
113 case <-ctx.Done():
114 return ss.g.Wait()
115 // case <-time.After(time.Minute * 1):
116 case <-time.After(time.Second * 60):
117 log.Log(ctx, "no new segments for 1 minute, shutting down")
118 for _, r := range allRenditions {
119 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
120 }
121 if notif.Local {
122 ss.Go(ctx, func() error {
123 return ss.DeleteStatus(spseg.Creator)
124 })
125 }
126 cancel()
127 }
128 }
129}
130
131// Execute a goroutine in the context of the stream session. Errors are
132// non-fatal; if you actually want to melt the universe on an error you
133// should panic()
134func (ss *StreamSession) Go(ctx context.Context, f func() error) {
135 <-ss.started
136 ss.g.Go(func() error {
137 err := f()
138 if err != nil {
139 log.Error(ctx, "error in goroutine", "error", err)
140 }
141 return nil
142 })
143}
144
145func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
146 <-ss.started
147 go func() {
148 select {
149 case <-ss.ctx.Done():
150 return
151 case ss.segmentChan <- struct{}{}:
152 }
153 }()
154 aqt := aqtime.FromTime(notif.Segment.StartTime)
155 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
156 notif.Segment.MediaData.Size = len(notif.Data)
157 err := ss.mod.CreateSegment(notif.Segment)
158 if err != nil {
159 return fmt.Errorf("could not add segment to database: %w", err)
160 }
161 spseg, err := notif.Segment.ToStreamplaceSegment()
162 if err != nil {
163 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
164 }
165
166 ss.bus.Publish(spseg.Creator, spseg)
167 ss.Go(ctx, func() error {
168 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
169 Filepath: notif.Segment.ID,
170 Data: notif.Data,
171 })
172 })
173
174 if ss.cli.Thumbnail {
175 ss.Go(ctx, func() error {
176 return ss.Thumbnail(ctx, spseg.Creator, notif)
177 })
178 }
179
180 if notif.Local {
181 ss.Go(ctx, func() error {
182 return ss.UpdateStatus(ctx, spseg.Creator)
183 })
184
185 ss.Go(ctx, func() error {
186 return ss.UpdateBroadcastOrigin(ctx)
187 })
188 }
189
190 if ss.cli.LivepeerGatewayURL != "" {
191 ss.Go(ctx, func() error {
192 start := time.Now()
193 err := ss.Transcode(ctx, spseg, notif.Data)
194 took := time.Since(start)
195 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
196 return err
197 })
198 }
199
200 return nil
201}
202
203func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
204 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
205 locked := lock.TryLock()
206 if !locked {
207 // we're already generating a thumbnail for this user, skip
208 return nil
209 }
210 defer lock.Unlock()
211 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
212 if err != nil {
213 return err
214 }
215 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
216 // we have a thumbnail <60sec old, skip generating a new one
217 return nil
218 }
219 r := bytes.NewReader(not.Data)
220 aqt := aqtime.FromTime(not.Segment.StartTime)
221 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
222 if err != nil {
223 return err
224 }
225 defer fd.Close()
226 err = media.Thumbnail(ctx, r, fd, "jpeg")
227 if err != nil {
228 return err
229 }
230 thumb := &model.Thumbnail{
231 Format: "jpeg",
232 SegmentID: not.Segment.ID,
233 }
234 err = ss.mod.CreateThumbnail(thumb)
235 if err != nil {
236 return err
237 }
238 return nil
239}
240
241func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
242 if pv == nil {
243 return nil, fmt.Errorf("post view is nil")
244 }
245 rec, ok := pv.Record.Val.(*bsky.FeedPost)
246 if !ok {
247 return nil, fmt.Errorf("post view record is not a feed post")
248 }
249 if rec.Embed == nil {
250 return nil, fmt.Errorf("post view embed is nil")
251 }
252 if rec.Embed.EmbedExternal == nil {
253 return nil, fmt.Errorf("post view embed external view is nil")
254 }
255 if rec.Embed.EmbedExternal.External == nil {
256 return nil, fmt.Errorf("post view embed external is nil")
257 }
258 if rec.Embed.EmbedExternal.External.Thumb == nil {
259 return nil, fmt.Errorf("post view embed external thumb is nil")
260 }
261 return rec.Embed.EmbedExternal.External.Thumb, nil
262}
263
264func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
265 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
266 ss.lastStatusLock.Lock()
267 defer ss.lastStatusLock.Unlock()
268 if time.Since(ss.lastStatus) < time.Minute {
269 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
270 return nil
271 }
272
273 session, err := ss.statefulDB.GetSessionByDID(repoDID)
274 if err != nil {
275 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
276 }
277 if session == nil {
278 return fmt.Errorf("no session found for repoDID: %s", repoDID)
279 }
280
281 session, err = ss.op.RefreshIfNeeded(session)
282 if err != nil {
283 return fmt.Errorf("could not refresh session for repoDID: %w", err)
284 }
285
286 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
287 if err != nil {
288 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
289 }
290 lsv, err := ls.ToLivestreamView()
291 if err != nil {
292 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
293 }
294
295 post, err := ss.mod.GetFeedPost(ls.PostURI)
296 if err != nil {
297 return fmt.Errorf("could not get feed post: %w", err)
298 }
299 if post == nil {
300 return fmt.Errorf("feed post not found for livestream: %w", err)
301 }
302 postView, err := post.ToBskyPostView()
303 if err != nil {
304 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
305 }
306 thumb, err := getThumbnailCID(postView)
307 if err != nil {
308 return fmt.Errorf("could not get thumbnail cid: %w", err)
309 }
310
311 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
312 if err != nil {
313 return fmt.Errorf("could not get repo for repoDID: %w", err)
314 }
315
316 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
317 if !ok {
318 return fmt.Errorf("livestream is not a streamplace livestream")
319 }
320
321 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
322
323 if lsr.CanonicalUrl != nil {
324 canonicalUrl = *lsr.CanonicalUrl
325 }
326
327 actorStatusEmbed := bsky.ActorStatus_Embed{
328 EmbedExternal: &bsky.EmbedExternal{
329 External: &bsky.EmbedExternal_External{
330 Title: lsr.Title,
331 Uri: canonicalUrl,
332 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
333 Thumb: thumb,
334 },
335 },
336 }
337
338 duration := int64(120)
339 status := bsky.ActorStatus{
340 Status: "app.bsky.actor.status#live",
341 DurationMinutes: &duration,
342 Embed: &actorStatusEmbed,
343 CreatedAt: time.Now().Format(time.RFC3339),
344 }
345
346 client, err := ss.op.GetXrpcClient(session)
347 if err != nil {
348 return fmt.Errorf("could not get xrpc client: %w", err)
349 }
350
351 var swapRecord *string
352 getOutput := atproto.RepoGetRecord_Output{}
353 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
354 "repo": repoDID,
355 "collection": "app.bsky.actor.status",
356 "rkey": "self",
357 }, nil, &getOutput)
358 if err != nil {
359 xErr, ok := err.(*xrpc.Error)
360 if !ok {
361 return fmt.Errorf("could not get record: %w", err)
362 }
363 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
364 return fmt.Errorf("could not get record: %w", err)
365 }
366 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
367 } else {
368 log.Debug(ctx, "got record", "record", getOutput)
369 swapRecord = getOutput.Cid
370 }
371
372 inp := atproto.RepoPutRecord_Input{
373 Collection: "app.bsky.actor.status",
374 Record: &util.LexiconTypeDecoder{Val: &status},
375 Rkey: "self",
376 Repo: repoDID,
377 SwapRecord: swapRecord,
378 }
379 out := atproto.RepoPutRecord_Output{}
380
381 ss.lastStatusCID = &out.Cid
382
383 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
384 if err != nil {
385 return fmt.Errorf("could not create record: %w", err)
386 }
387 log.Debug(ctx, "created status record", "out", out)
388
389 ss.lastStatus = time.Now()
390
391 return nil
392}
393
394func (ss *StreamSession) DeleteStatus(repoDID string) error {
395 // need a special extra context because the stream session context is already cancelled
396 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
397 ss.lastStatusLock.Lock()
398 defer ss.lastStatusLock.Unlock()
399 if ss.lastStatusCID == nil {
400 log.Debug(ctx, "no status cid to delete")
401 return nil
402 }
403 inp := atproto.RepoDeleteRecord_Input{
404 Collection: "app.bsky.actor.status",
405 Rkey: "self",
406 Repo: repoDID,
407 }
408 inp.SwapRecord = ss.lastStatusCID
409 out := atproto.RepoDeleteRecord_Output{}
410
411 session, err := ss.statefulDB.GetSessionByDID(repoDID)
412 if err != nil {
413 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
414 }
415 if session == nil {
416 return fmt.Errorf("no session found for repoDID: %s", repoDID)
417 }
418
419 session, err = ss.op.RefreshIfNeeded(session)
420 if err != nil {
421 return fmt.Errorf("could not refresh session for repoDID: %w", err)
422 }
423
424 client, err := ss.op.GetXrpcClient(session)
425 if err != nil {
426 return fmt.Errorf("could not get xrpc client: %w", err)
427 }
428
429 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
430 if err != nil {
431 return fmt.Errorf("could not delete record: %w", err)
432 }
433
434 ss.lastStatusCID = nil
435 return nil
436}
437
438var originUpdateInterval = time.Second * 30
439
440func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
441 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
442 ss.lastOriginLock.Lock()
443 defer ss.lastOriginLock.Unlock()
444 if time.Since(ss.lastOriginTime) < originUpdateInterval {
445 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
446 return nil
447 }
448 origin := streamplace.BroadcastOrigin{
449 Streamer: ss.repoDID,
450 Server: fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost),
451 UpdatedAt: time.Now().Format(time.RFC3339),
452 IrohTicket: &ss.swarm.NodeTicket,
453 }
454
455 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
456 if err != nil {
457 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
458 }
459 if session == nil {
460 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
461 }
462
463 session, err = ss.op.RefreshIfNeeded(session)
464 if err != nil {
465 return fmt.Errorf("could not refresh session for repoDID: %w", err)
466 }
467
468 client, err := ss.op.GetXrpcClient(session)
469 if err != nil {
470 return fmt.Errorf("could not get xrpc client: %w", err)
471 }
472
473 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost)
474
475 var swapRecord *string
476 getOutput := atproto.RepoGetRecord_Output{}
477 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
478 "repo": ss.repoDID,
479 "collection": "place.stream.broadcast.origin",
480 "rkey": rkey,
481 }, nil, &getOutput)
482 if err != nil {
483 xErr, ok := err.(*xrpc.Error)
484 if !ok {
485 return fmt.Errorf("could not get record: %w", err)
486 }
487 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
488 return fmt.Errorf("could not get record: %w", err)
489 }
490 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
491 } else {
492 log.Debug(ctx, "got record", "record", getOutput)
493 swapRecord = getOutput.Cid
494 }
495
496 inp := atproto.RepoPutRecord_Input{
497 Collection: "place.stream.broadcast.origin",
498 Record: &util.LexiconTypeDecoder{Val: &origin},
499 Rkey: fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost),
500 Repo: ss.repoDID,
501 SwapRecord: swapRecord,
502 }
503 out := atproto.RepoPutRecord_Output{}
504
505 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
506 if err != nil {
507 return fmt.Errorf("could not create record: %w", err)
508 }
509
510 ss.lastOriginTime = time.Now()
511 return nil
512}
513
514func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
515 rs, err := renditions.GenerateRenditions(spseg)
516 if err != nil {
517 return fmt.Errorf("failed to generated renditions: %w", err)
518 }
519
520 if ss.lp == nil {
521 var err error
522 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
523 if err != nil {
524 return err
525 }
526
527 }
528 spmetrics.TranscodeAttemptsTotal.Inc()
529 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
530 if err != nil {
531 spmetrics.TranscodeErrorsTotal.Inc()
532 return err
533 }
534 if len(rs) != len(segs) {
535 spmetrics.TranscodeErrorsTotal.Inc()
536 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
537 }
538 spmetrics.TranscodeSuccessesTotal.Inc()
539 aqt, err := aqtime.FromString(spseg.StartTime)
540 if err != nil {
541 return err
542 }
543 for i, seg := range segs {
544 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
545 log.Debug(ctx, "publishing segment", "rendition", rs[i])
546 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
547 if err != nil {
548 return fmt.Errorf("failed to create transcoded segment file: %w", err)
549 }
550 defer fd.Close()
551 _, err = fd.Write(seg)
552 if err != nil {
553 return fmt.Errorf("failed to write transcoded segment file: %w", err)
554 }
555 ss.Go(ctx, func() error {
556 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
557 Filepath: fd.Name(),
558 Data: seg,
559 })
560 })
561
562 }
563 return nil
564}
565
566func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
567 ss.Go(ctx, func() error {
568 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
569 })
570 ss.Go(ctx, func() error {
571 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
572 })
573 return nil
574}
575
576func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
577 packet, err := media.Packetize(ctx, seg)
578 if err != nil {
579 return fmt.Errorf("failed to packetize segment: %w", err)
580 }
581 seg.PacketizedData = packet
582 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
583 return nil
584}
585
586func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
587 buf := bytes.Buffer{}
588 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
589 if err != nil {
590 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
591 }
592 // newSeg := &streamplace.Segment{
593 // LexiconTypeID: "place.stream.segment",
594 // Id: spseg.Id,
595 // Creator: spseg.Creator,
596 // StartTime: spseg.StartTime,
597 // Duration: &dur,
598 // Audio: spseg.Audio,
599 // Video: spseg.Video,
600 // SigningKey: spseg.SigningKey,
601 // }
602 aqt, err := aqtime.FromString(spseg.StartTime)
603 if err != nil {
604 return fmt.Errorf("failed to parse segment start time: %w", err)
605 }
606 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
607 rend, err := ss.hls.GetRendition(rendition)
608 if err != nil {
609 return fmt.Errorf("failed to get rendition: %w", err)
610 }
611 if err := rend.NewSegment(&media.Segment{
612 Buf: &buf,
613 Duration: time.Duration(dur),
614 Time: aqt.Time(),
615 }); err != nil {
616 return fmt.Errorf("failed to create new segment: %w", err)
617 }
618
619 return nil
620}