Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/streamplace/oatproxy/pkg/oatproxy"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/bus"
19 "stream.place/streamplace/pkg/config"
20 "stream.place/streamplace/pkg/livepeer"
21 "stream.place/streamplace/pkg/log"
22 "stream.place/streamplace/pkg/media"
23 "stream.place/streamplace/pkg/model"
24 "stream.place/streamplace/pkg/renditions"
25 "stream.place/streamplace/pkg/replication/iroh_replicator"
26 "stream.place/streamplace/pkg/spmetrics"
27 "stream.place/streamplace/pkg/statedb"
28 "stream.place/streamplace/pkg/streamplace"
29 "stream.place/streamplace/pkg/thumbnail"
30)
31
32type StreamSession struct {
33 mm *media.MediaManager
34 mod model.Model
35 cli *config.CLI
36 bus *bus.Bus
37 op *oatproxy.OATProxy
38 hls *media.M3U8
39 lp *livepeer.LivepeerSession
40 repoDID string
41 segmentChan chan struct{}
42 lastStatus time.Time
43 lastStatusCID *string
44 lastStatusLock sync.Mutex
45 lastOriginTime time.Time
46 lastOriginLock sync.Mutex
47 g *errgroup.Group
48 started chan struct{}
49 ctx context.Context
50 packets []bus.PacketizedSegment
51 statefulDB *statedb.StatefulDB
52 swarm *iroh_replicator.IrohSwarm
53}
54
55func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
56 ctx, cancel := context.WithCancel(ctx)
57 ss.g, ctx = errgroup.WithContext(ctx)
58 sid := livepeer.RandomTrailer(8)
59 ctx = log.WithLogValues(ctx, "sid", sid)
60 ss.ctx = ctx
61 log.Log(ctx, "starting stream session")
62 defer cancel()
63 spseg, err := notif.Segment.ToStreamplaceSegment()
64 if err != nil {
65 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
66 }
67 var allRenditions renditions.Renditions
68
69 if ss.cli.LivepeerGatewayURL != "" {
70 allRenditions, err = renditions.GenerateRenditions(spseg)
71 } else {
72 allRenditions = []renditions.Rendition{}
73 }
74 if err != nil {
75 return err
76 }
77 if spseg.Duration == nil {
78 return fmt.Errorf("segment duration is required to calculate bitrate")
79 }
80 dur := time.Duration(*spseg.Duration)
81 byteLen := len(notif.Data)
82 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
83 sourceRendition := renditions.Rendition{
84 Name: "source",
85 Bitrate: bitrate,
86 Width: spseg.Video[0].Width,
87 Height: spseg.Video[0].Height,
88 }
89 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
90 ss.hls = media.NewM3U8(allRenditions)
91
92 // for _, r := range allRenditions {
93 // g.Go(func() error {
94 // for {
95 // if ctx.Err() != nil {
96 // return nil
97 // }
98 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
99 // if ctx.Err() != nil {
100 // return nil
101 // }
102 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
103 // time.Sleep(time.Second * 5)
104 // }
105 // })
106 // }
107
108 close(ss.started)
109
110 for {
111 select {
112 case <-ss.segmentChan:
113 // reset timer
114 case <-ctx.Done():
115 return ss.g.Wait()
116 // case <-time.After(time.Minute * 1):
117 case <-time.After(time.Second * 60):
118 log.Log(ctx, "no new segments for 1 minute, shutting down")
119 for _, r := range allRenditions {
120 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
121 }
122 if notif.Local {
123 ss.Go(ctx, func() error {
124 return ss.DeleteStatus(spseg.Creator)
125 })
126 }
127 cancel()
128 }
129 }
130}
131
132// Execute a goroutine in the context of the stream session. Errors are
133// non-fatal; if you actually want to melt the universe on an error you
134// should panic()
135func (ss *StreamSession) Go(ctx context.Context, f func() error) {
136 <-ss.started
137 ss.g.Go(func() error {
138 err := f()
139 if err != nil {
140 log.Error(ctx, "error in goroutine", "error", err)
141 }
142 return nil
143 })
144}
145
146func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
147 <-ss.started
148 go func() {
149 select {
150 case <-ss.ctx.Done():
151 return
152 case ss.segmentChan <- struct{}{}:
153 }
154 }()
155 aqt := aqtime.FromTime(notif.Segment.StartTime)
156 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
157 notif.Segment.MediaData.Size = len(notif.Data)
158 err := ss.mod.CreateSegment(notif.Segment)
159 if err != nil {
160 return fmt.Errorf("could not add segment to database: %w", err)
161 }
162 spseg, err := notif.Segment.ToStreamplaceSegment()
163 if err != nil {
164 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
165 }
166
167 ss.bus.Publish(spseg.Creator, spseg)
168 ss.Go(ctx, func() error {
169 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
170 Filepath: notif.Segment.ID,
171 Data: notif.Data,
172 })
173 })
174
175 if ss.cli.Thumbnail {
176 ss.Go(ctx, func() error {
177 return ss.Thumbnail(ctx, spseg.Creator, notif)
178 })
179 }
180
181 if notif.Local {
182 ss.Go(ctx, func() error {
183 return ss.UpdateStatus(ctx, spseg.Creator)
184 })
185
186 ss.Go(ctx, func() error {
187 return ss.UpdateBroadcastOrigin(ctx)
188 })
189 }
190
191 if ss.cli.LivepeerGatewayURL != "" {
192 ss.Go(ctx, func() error {
193 start := time.Now()
194 err := ss.Transcode(ctx, spseg, notif.Data)
195 took := time.Since(start)
196 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
197 return err
198 })
199 }
200
201 return nil
202}
203
204func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
205 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
206 locked := lock.TryLock()
207 if !locked {
208 // we're already generating a thumbnail for this user, skip
209 return nil
210 }
211 defer lock.Unlock()
212 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
213 if err != nil {
214 return err
215 }
216 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
217 // we have a thumbnail <60sec old, skip generating a new one
218 return nil
219 }
220 r := bytes.NewReader(not.Data)
221 aqt := aqtime.FromTime(not.Segment.StartTime)
222 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
223 if err != nil {
224 return err
225 }
226 defer fd.Close()
227 err = media.Thumbnail(ctx, r, fd, "jpeg")
228 if err != nil {
229 return err
230 }
231 thumb := &model.Thumbnail{
232 Format: "jpeg",
233 SegmentID: not.Segment.ID,
234 }
235 err = ss.mod.CreateThumbnail(thumb)
236 if err != nil {
237 return err
238 }
239 return nil
240}
241
242func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*lexutil.LexBlob, error) {
243 if pv == nil {
244 return nil, fmt.Errorf("post view is nil")
245 }
246 rec, ok := pv.Record.Val.(*bsky.FeedPost)
247 if !ok {
248 return nil, fmt.Errorf("post view record is not a feed post")
249 }
250 if rec.Embed == nil {
251 return nil, fmt.Errorf("post view embed is nil")
252 }
253 if rec.Embed.EmbedExternal == nil {
254 return nil, fmt.Errorf("post view embed external view is nil")
255 }
256 if rec.Embed.EmbedExternal.External == nil {
257 return nil, fmt.Errorf("post view embed external is nil")
258 }
259 if rec.Embed.EmbedExternal.External.Thumb == nil {
260 return nil, fmt.Errorf("post view embed external thumb is nil")
261 }
262 return rec.Embed.EmbedExternal.External.Thumb, nil
263}
264
265func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
266 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
267 ss.lastStatusLock.Lock()
268 defer ss.lastStatusLock.Unlock()
269 if time.Since(ss.lastStatus) < time.Minute {
270 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
271 return nil
272 }
273
274 session, err := ss.statefulDB.GetSessionByDID(repoDID)
275 if err != nil {
276 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
277 }
278 if session == nil {
279 return fmt.Errorf("no session found for repoDID: %s", repoDID)
280 }
281
282 session, err = ss.op.RefreshIfNeeded(session)
283 if err != nil {
284 return fmt.Errorf("could not refresh session for repoDID: %w", err)
285 }
286
287 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
288 if err != nil {
289 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
290 }
291 lsv, err := ls.ToLivestreamView()
292 if err != nil {
293 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
294 }
295
296 post, err := ss.mod.GetFeedPost(ls.PostURI)
297 if err != nil {
298 return fmt.Errorf("could not get feed post: %w", err)
299 }
300 if post == nil {
301 return fmt.Errorf("feed post not found for livestream: %w", err)
302 }
303 postView, err := post.ToBskyPostView()
304 if err != nil {
305 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
306 }
307 thumb, err := getThumbnailCID(postView)
308 if err != nil {
309 return fmt.Errorf("could not get thumbnail cid: %w", err)
310 }
311
312 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
313 if err != nil {
314 return fmt.Errorf("could not get repo for repoDID: %w", err)
315 }
316
317 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
318 if !ok {
319 return fmt.Errorf("livestream is not a streamplace livestream")
320 }
321
322 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
323
324 if lsr.CanonicalUrl != nil {
325 canonicalUrl = *lsr.CanonicalUrl
326 }
327
328 actorStatusEmbed := bsky.ActorStatus_Embed{
329 EmbedExternal: &bsky.EmbedExternal{
330 External: &bsky.EmbedExternal_External{
331 Title: lsr.Title,
332 Uri: canonicalUrl,
333 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
334 Thumb: thumb,
335 },
336 },
337 }
338
339 duration := int64(120)
340 status := bsky.ActorStatus{
341 Status: "app.bsky.actor.status#live",
342 DurationMinutes: &duration,
343 Embed: &actorStatusEmbed,
344 CreatedAt: time.Now().Format(time.RFC3339),
345 }
346
347 client, err := ss.op.GetXrpcClient(session)
348 if err != nil {
349 return fmt.Errorf("could not get xrpc client: %w", err)
350 }
351
352 var swapRecord *string
353 getOutput := atproto.RepoGetRecord_Output{}
354 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
355 "repo": repoDID,
356 "collection": "app.bsky.actor.status",
357 "rkey": "self",
358 }, nil, &getOutput)
359 if err != nil {
360 xErr, ok := err.(*xrpc.Error)
361 if !ok {
362 return fmt.Errorf("could not get record: %w", err)
363 }
364 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
365 return fmt.Errorf("could not get record: %w", err)
366 }
367 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
368 } else {
369 log.Debug(ctx, "got record", "record", getOutput)
370 swapRecord = getOutput.Cid
371 }
372
373 inp := atproto.RepoPutRecord_Input{
374 Collection: "app.bsky.actor.status",
375 Record: &lexutil.LexiconTypeDecoder{Val: &status},
376 Rkey: "self",
377 Repo: repoDID,
378 SwapRecord: swapRecord,
379 }
380 out := atproto.RepoPutRecord_Output{}
381
382 ss.lastStatusCID = &out.Cid
383
384 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
385 if err != nil {
386 return fmt.Errorf("could not create record: %w", err)
387 }
388 log.Debug(ctx, "created status record", "out", out)
389
390 ss.lastStatus = time.Now()
391
392 return nil
393}
394
395func (ss *StreamSession) DeleteStatus(repoDID string) error {
396 // need a special extra context because the stream session context is already cancelled
397 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
398 ss.lastStatusLock.Lock()
399 defer ss.lastStatusLock.Unlock()
400 if ss.lastStatusCID == nil {
401 log.Debug(ctx, "no status cid to delete")
402 return nil
403 }
404 inp := atproto.RepoDeleteRecord_Input{
405 Collection: "app.bsky.actor.status",
406 Rkey: "self",
407 Repo: repoDID,
408 }
409 inp.SwapRecord = ss.lastStatusCID
410 out := atproto.RepoDeleteRecord_Output{}
411
412 session, err := ss.statefulDB.GetSessionByDID(repoDID)
413 if err != nil {
414 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
415 }
416 if session == nil {
417 return fmt.Errorf("no session found for repoDID: %s", repoDID)
418 }
419
420 session, err = ss.op.RefreshIfNeeded(session)
421 if err != nil {
422 return fmt.Errorf("could not refresh session for repoDID: %w", err)
423 }
424
425 client, err := ss.op.GetXrpcClient(session)
426 if err != nil {
427 return fmt.Errorf("could not get xrpc client: %w", err)
428 }
429
430 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
431 if err != nil {
432 return fmt.Errorf("could not delete record: %w", err)
433 }
434
435 ss.lastStatusCID = nil
436 return nil
437}
438
439var originUpdateInterval = time.Second * 30
440
441func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
442 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
443 ss.lastOriginLock.Lock()
444 defer ss.lastOriginLock.Unlock()
445 if time.Since(ss.lastOriginTime) < originUpdateInterval {
446 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
447 return nil
448 }
449 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
450 origin := streamplace.BroadcastOrigin{
451 Streamer: ss.repoDID,
452 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost),
453 Broadcaster: &broadcaster,
454 UpdatedAt: time.Now().Format(util.ISO8601),
455 IrohTicket: &ss.swarm.NodeTicket,
456 }
457
458 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
459 if err != nil {
460 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
461 }
462 if session == nil {
463 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
464 }
465
466 session, err = ss.op.RefreshIfNeeded(session)
467 if err != nil {
468 return fmt.Errorf("could not refresh session for repoDID: %w", err)
469 }
470
471 client, err := ss.op.GetXrpcClient(session)
472 if err != nil {
473 return fmt.Errorf("could not get xrpc client: %w", err)
474 }
475
476 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost)
477
478 var swapRecord *string
479 getOutput := atproto.RepoGetRecord_Output{}
480 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
481 "repo": ss.repoDID,
482 "collection": "place.stream.broadcast.origin",
483 "rkey": rkey,
484 }, nil, &getOutput)
485 if err != nil {
486 xErr, ok := err.(*xrpc.Error)
487 if !ok {
488 return fmt.Errorf("could not get record: %w", err)
489 }
490 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
491 return fmt.Errorf("could not get record: %w", err)
492 }
493 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
494 } else {
495 log.Debug(ctx, "got record", "record", getOutput)
496 swapRecord = getOutput.Cid
497 }
498
499 inp := atproto.RepoPutRecord_Input{
500 Collection: "place.stream.broadcast.origin",
501 Record: &lexutil.LexiconTypeDecoder{Val: &origin},
502 Rkey: fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost),
503 Repo: ss.repoDID,
504 SwapRecord: swapRecord,
505 }
506 out := atproto.RepoPutRecord_Output{}
507
508 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
509 if err != nil {
510 return fmt.Errorf("could not create record: %w", err)
511 }
512
513 ss.lastOriginTime = time.Now()
514 return nil
515}
516
517func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
518 rs, err := renditions.GenerateRenditions(spseg)
519 if err != nil {
520 return fmt.Errorf("failed to generated renditions: %w", err)
521 }
522
523 if ss.lp == nil {
524 var err error
525 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
526 if err != nil {
527 return err
528 }
529
530 }
531 spmetrics.TranscodeAttemptsTotal.Inc()
532 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
533 if err != nil {
534 spmetrics.TranscodeErrorsTotal.Inc()
535 return err
536 }
537 if len(rs) != len(segs) {
538 spmetrics.TranscodeErrorsTotal.Inc()
539 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
540 }
541 spmetrics.TranscodeSuccessesTotal.Inc()
542 aqt, err := aqtime.FromString(spseg.StartTime)
543 if err != nil {
544 return err
545 }
546 for i, seg := range segs {
547 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
548 log.Debug(ctx, "publishing segment", "rendition", rs[i])
549 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
550 if err != nil {
551 return fmt.Errorf("failed to create transcoded segment file: %w", err)
552 }
553 defer fd.Close()
554 _, err = fd.Write(seg)
555 if err != nil {
556 return fmt.Errorf("failed to write transcoded segment file: %w", err)
557 }
558 ss.Go(ctx, func() error {
559 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
560 Filepath: fd.Name(),
561 Data: seg,
562 })
563 })
564
565 }
566 return nil
567}
568
569func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
570 ss.Go(ctx, func() error {
571 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
572 })
573 ss.Go(ctx, func() error {
574 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
575 })
576 return nil
577}
578
579func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
580 packet, err := media.Packetize(ctx, seg)
581 if err != nil {
582 return fmt.Errorf("failed to packetize segment: %w", err)
583 }
584 seg.PacketizedData = packet
585 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
586 return nil
587}
588
589func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
590 buf := bytes.Buffer{}
591 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
592 if err != nil {
593 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
594 }
595 // newSeg := &streamplace.Segment{
596 // LexiconTypeID: "place.stream.segment",
597 // Id: spseg.Id,
598 // Creator: spseg.Creator,
599 // StartTime: spseg.StartTime,
600 // Duration: &dur,
601 // Audio: spseg.Audio,
602 // Video: spseg.Video,
603 // SigningKey: spseg.SigningKey,
604 // }
605 aqt, err := aqtime.FromString(spseg.StartTime)
606 if err != nil {
607 return fmt.Errorf("failed to parse segment start time: %w", err)
608 }
609 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
610 rend, err := ss.hls.GetRendition(rendition)
611 if err != nil {
612 return fmt.Errorf("failed to get rendition: %w", err)
613 }
614 if err := rend.NewSegment(&media.Segment{
615 Buf: &buf,
616 Duration: time.Duration(dur),
617 Time: aqt.Time(),
618 }); err != nil {
619 return fmt.Errorf("failed to create new segment: %w", err)
620 }
621
622 return nil
623}