Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "github.com/streamplace/oatproxy/pkg/oatproxy"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/aqtime"
17 "stream.place/streamplace/pkg/bus"
18 "stream.place/streamplace/pkg/config"
19 "stream.place/streamplace/pkg/livepeer"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/media"
22 "stream.place/streamplace/pkg/model"
23 "stream.place/streamplace/pkg/renditions"
24 "stream.place/streamplace/pkg/replication/iroh_replicator"
25 "stream.place/streamplace/pkg/spmetrics"
26 "stream.place/streamplace/pkg/statedb"
27 "stream.place/streamplace/pkg/streamplace"
28 "stream.place/streamplace/pkg/thumbnail"
29)
30
31type StreamSession struct {
32 mm *media.MediaManager
33 mod model.Model
34 cli *config.CLI
35 bus *bus.Bus
36 op *oatproxy.OATProxy
37 hls *media.M3U8
38 lp *livepeer.LivepeerSession
39 repoDID string
40 segmentChan chan struct{}
41 lastStatus time.Time
42 lastStatusLock sync.Mutex
43 lastOriginTime time.Time
44 lastOriginLock sync.Mutex
45 g *errgroup.Group
46 started chan struct{}
47 ctx context.Context
48 packets []bus.PacketizedSegment
49 statefulDB *statedb.StatefulDB
50 swarm *iroh_replicator.IrohSwarm
51}
52
53func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
54 ctx, cancel := context.WithCancel(ctx)
55 ss.g, ctx = errgroup.WithContext(ctx)
56 sid := livepeer.RandomTrailer(8)
57 ctx = log.WithLogValues(ctx, "sid", sid)
58 ss.ctx = ctx
59 log.Log(ctx, "starting stream session")
60 defer cancel()
61 spseg, err := notif.Segment.ToStreamplaceSegment()
62 if err != nil {
63 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
64 }
65 var allRenditions renditions.Renditions
66
67 if ss.cli.LivepeerGatewayURL != "" {
68 allRenditions, err = renditions.GenerateRenditions(spseg)
69 } else {
70 allRenditions = []renditions.Rendition{}
71 }
72 if err != nil {
73 return err
74 }
75 if spseg.Duration == nil {
76 return fmt.Errorf("segment duration is required to calculate bitrate")
77 }
78 dur := time.Duration(*spseg.Duration)
79 byteLen := len(notif.Data)
80 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
81 sourceRendition := renditions.Rendition{
82 Name: "source",
83 Bitrate: bitrate,
84 Width: spseg.Video[0].Width,
85 Height: spseg.Video[0].Height,
86 }
87 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
88 ss.hls = media.NewM3U8(allRenditions)
89
90 // for _, r := range allRenditions {
91 // g.Go(func() error {
92 // for {
93 // if ctx.Err() != nil {
94 // return nil
95 // }
96 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
97 // if ctx.Err() != nil {
98 // return nil
99 // }
100 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
101 // time.Sleep(time.Second * 5)
102 // }
103 // })
104 // }
105
106 close(ss.started)
107
108 for {
109 select {
110 case <-ss.segmentChan:
111 // reset timer
112 case <-ctx.Done():
113 return ss.g.Wait()
114 // case <-time.After(time.Minute * 1):
115 case <-time.After(time.Second * 60):
116 log.Log(ctx, "no new segments for 1 minute, shutting down")
117 for _, r := range allRenditions {
118 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
119 }
120 cancel()
121 }
122 }
123}
124
125// Execute a goroutine in the context of the stream session. Errors are
126// non-fatal; if you actually want to melt the universe on an error you
127// should panic()
128func (ss *StreamSession) Go(ctx context.Context, f func() error) {
129 <-ss.started
130 ss.g.Go(func() error {
131 err := f()
132 if err != nil {
133 log.Error(ctx, "error in goroutine", "error", err)
134 }
135 return nil
136 })
137}
138
139func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
140 <-ss.started
141 go func() {
142 select {
143 case <-ss.ctx.Done():
144 return
145 case ss.segmentChan <- struct{}{}:
146 }
147 }()
148 aqt := aqtime.FromTime(notif.Segment.StartTime)
149 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
150 notif.Segment.MediaData.Size = len(notif.Data)
151 err := ss.mod.CreateSegment(notif.Segment)
152 if err != nil {
153 return fmt.Errorf("could not add segment to database: %w", err)
154 }
155 spseg, err := notif.Segment.ToStreamplaceSegment()
156 if err != nil {
157 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
158 }
159
160 ss.bus.Publish(spseg.Creator, spseg)
161 ss.Go(ctx, func() error {
162 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
163 Filepath: notif.Segment.ID,
164 Data: notif.Data,
165 })
166 })
167
168 if ss.cli.Thumbnail {
169 ss.Go(ctx, func() error {
170 return ss.Thumbnail(ctx, spseg.Creator, notif)
171 })
172 }
173
174 ss.Go(ctx, func() error {
175 return ss.UpdateStatus(ctx, spseg.Creator)
176 })
177
178 if notif.Local {
179 ss.Go(ctx, func() error {
180 return ss.UpdateBroadcastOrigin(ctx)
181 })
182 }
183
184 if ss.cli.LivepeerGatewayURL != "" {
185 ss.Go(ctx, func() error {
186 start := time.Now()
187 err := ss.Transcode(ctx, spseg, notif.Data)
188 took := time.Since(start)
189 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
190 return err
191 })
192 }
193
194 return nil
195}
196
197func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
198 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
199 locked := lock.TryLock()
200 if !locked {
201 // we're already generating a thumbnail for this user, skip
202 return nil
203 }
204 defer lock.Unlock()
205 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
206 if err != nil {
207 return err
208 }
209 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
210 // we have a thumbnail <60sec old, skip generating a new one
211 return nil
212 }
213 r := bytes.NewReader(not.Data)
214 aqt := aqtime.FromTime(not.Segment.StartTime)
215 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
216 if err != nil {
217 return err
218 }
219 defer fd.Close()
220 err = media.Thumbnail(ctx, r, fd, "jpeg")
221 if err != nil {
222 return err
223 }
224 thumb := &model.Thumbnail{
225 Format: "jpeg",
226 SegmentID: not.Segment.ID,
227 }
228 err = ss.mod.CreateThumbnail(thumb)
229 if err != nil {
230 return err
231 }
232 return nil
233}
234
235func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
236 if pv == nil {
237 return nil, fmt.Errorf("post view is nil")
238 }
239 rec, ok := pv.Record.Val.(*bsky.FeedPost)
240 if !ok {
241 return nil, fmt.Errorf("post view record is not a feed post")
242 }
243 if rec.Embed == nil {
244 return nil, fmt.Errorf("post view embed is nil")
245 }
246 if rec.Embed.EmbedExternal == nil {
247 return nil, fmt.Errorf("post view embed external view is nil")
248 }
249 if rec.Embed.EmbedExternal.External == nil {
250 return nil, fmt.Errorf("post view embed external is nil")
251 }
252 if rec.Embed.EmbedExternal.External.Thumb == nil {
253 return nil, fmt.Errorf("post view embed external thumb is nil")
254 }
255 return rec.Embed.EmbedExternal.External.Thumb, nil
256}
257
258func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
259 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
260 ss.lastStatusLock.Lock()
261 defer ss.lastStatusLock.Unlock()
262 if time.Since(ss.lastStatus) < time.Minute {
263 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
264 return nil
265 }
266
267 session, err := ss.statefulDB.GetSessionByDID(repoDID)
268 if err != nil {
269 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
270 }
271 if session == nil {
272 return fmt.Errorf("no session found for repoDID: %s", repoDID)
273 }
274
275 session, err = ss.op.RefreshIfNeeded(session)
276 if err != nil {
277 return fmt.Errorf("could not refresh session for repoDID: %w", err)
278 }
279
280 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
281 if err != nil {
282 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
283 }
284 lsv, err := ls.ToLivestreamView()
285 if err != nil {
286 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
287 }
288
289 post, err := ss.mod.GetFeedPost(ls.PostURI)
290 if err != nil {
291 return fmt.Errorf("could not get feed post: %w", err)
292 }
293 if post == nil {
294 return fmt.Errorf("feed post not found for livestream: %w", err)
295 }
296 postView, err := post.ToBskyPostView()
297 if err != nil {
298 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
299 }
300 thumb, err := getThumbnailCID(postView)
301 if err != nil {
302 return fmt.Errorf("could not get thumbnail cid: %w", err)
303 }
304
305 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
306 if err != nil {
307 return fmt.Errorf("could not get repo for repoDID: %w", err)
308 }
309
310 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
311 if !ok {
312 return fmt.Errorf("livestream is not a streamplace livestream")
313 }
314
315 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
316
317 if lsr.CanonicalUrl != nil {
318 canonicalUrl = *lsr.CanonicalUrl
319 }
320
321 actorStatusEmbed := bsky.ActorStatus_Embed{
322 EmbedExternal: &bsky.EmbedExternal{
323 External: &bsky.EmbedExternal_External{
324 Title: lsr.Title,
325 Uri: canonicalUrl,
326 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
327 Thumb: thumb,
328 },
329 },
330 }
331
332 duration := int64(2)
333 status := bsky.ActorStatus{
334 Status: "app.bsky.actor.status#live",
335 DurationMinutes: &duration,
336 Embed: &actorStatusEmbed,
337 CreatedAt: time.Now().Format(time.RFC3339),
338 }
339
340 client, err := ss.op.GetXrpcClient(session)
341 if err != nil {
342 return fmt.Errorf("could not get xrpc client: %w", err)
343 }
344
345 var swapRecord *string
346 getOutput := atproto.RepoGetRecord_Output{}
347 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
348 "repo": repoDID,
349 "collection": "app.bsky.actor.status",
350 "rkey": "self",
351 }, nil, &getOutput)
352 if err != nil {
353 xErr, ok := err.(*xrpc.Error)
354 if !ok {
355 return fmt.Errorf("could not get record: %w", err)
356 }
357 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
358 return fmt.Errorf("could not get record: %w", err)
359 }
360 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
361 } else {
362 log.Debug(ctx, "got record", "record", getOutput)
363 swapRecord = getOutput.Cid
364 }
365
366 inp := atproto.RepoPutRecord_Input{
367 Collection: "app.bsky.actor.status",
368 Record: &util.LexiconTypeDecoder{Val: &status},
369 Rkey: "self",
370 Repo: repoDID,
371 SwapRecord: swapRecord,
372 }
373 out := atproto.RepoPutRecord_Output{}
374
375 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
376 if err != nil {
377 return fmt.Errorf("could not create record: %w", err)
378 }
379 log.Debug(ctx, "created status record", "out", out)
380
381 ss.lastStatus = time.Now()
382
383 return nil
384}
385
386var originUpdateInterval = time.Second * 30
387
388func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
389 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
390 ss.lastOriginLock.Lock()
391 defer ss.lastOriginLock.Unlock()
392 if time.Since(ss.lastOriginTime) < originUpdateInterval {
393 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
394 return nil
395 }
396 origin := streamplace.BroadcastOrigin{
397 Streamer: ss.repoDID,
398 Server: fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost),
399 UpdatedAt: time.Now().Format(time.RFC3339),
400 IrohTicket: &ss.swarm.NodeTicket,
401 }
402
403 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
404 if err != nil {
405 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
406 }
407 if session == nil {
408 return fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
409 }
410
411 session, err = ss.op.RefreshIfNeeded(session)
412 if err != nil {
413 return fmt.Errorf("could not refresh session for repoDID: %w", err)
414 }
415
416 client, err := ss.op.GetXrpcClient(session)
417 if err != nil {
418 return fmt.Errorf("could not get xrpc client: %w", err)
419 }
420
421 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost)
422
423 var swapRecord *string
424 getOutput := atproto.RepoGetRecord_Output{}
425 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
426 "repo": ss.repoDID,
427 "collection": "place.stream.broadcast.origin",
428 "rkey": rkey,
429 }, nil, &getOutput)
430 if err != nil {
431 xErr, ok := err.(*xrpc.Error)
432 if !ok {
433 return fmt.Errorf("could not get record: %w", err)
434 }
435 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
436 return fmt.Errorf("could not get record: %w", err)
437 }
438 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
439 } else {
440 log.Debug(ctx, "got record", "record", getOutput)
441 swapRecord = getOutput.Cid
442 }
443
444 inp := atproto.RepoPutRecord_Input{
445 Collection: "place.stream.broadcast.origin",
446 Record: &util.LexiconTypeDecoder{Val: &origin},
447 Rkey: fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.BroadcasterHost),
448 Repo: ss.repoDID,
449 SwapRecord: swapRecord,
450 }
451 out := atproto.RepoPutRecord_Output{}
452
453 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
454 if err != nil {
455 return fmt.Errorf("could not create record: %w", err)
456 }
457
458 ss.lastOriginTime = time.Now()
459 return nil
460}
461
462func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
463 rs, err := renditions.GenerateRenditions(spseg)
464 if err != nil {
465 return fmt.Errorf("failed to generated renditions: %w", err)
466 }
467
468 if ss.lp == nil {
469 var err error
470 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
471 if err != nil {
472 return err
473 }
474
475 }
476 spmetrics.TranscodeAttemptsTotal.Inc()
477 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
478 if err != nil {
479 spmetrics.TranscodeErrorsTotal.Inc()
480 return err
481 }
482 if len(rs) != len(segs) {
483 spmetrics.TranscodeErrorsTotal.Inc()
484 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
485 }
486 spmetrics.TranscodeSuccessesTotal.Inc()
487 aqt, err := aqtime.FromString(spseg.StartTime)
488 if err != nil {
489 return err
490 }
491 for i, seg := range segs {
492 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
493 log.Debug(ctx, "publishing segment", "rendition", rs[i])
494 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
495 if err != nil {
496 return fmt.Errorf("failed to create transcoded segment file: %w", err)
497 }
498 defer fd.Close()
499 _, err = fd.Write(seg)
500 if err != nil {
501 return fmt.Errorf("failed to write transcoded segment file: %w", err)
502 }
503 ss.Go(ctx, func() error {
504 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
505 Filepath: fd.Name(),
506 Data: seg,
507 })
508 })
509
510 }
511 return nil
512}
513
514func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
515 ss.Go(ctx, func() error {
516 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
517 })
518 ss.Go(ctx, func() error {
519 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
520 })
521 return nil
522}
523
524func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
525 packet, err := media.Packetize(ctx, seg)
526 if err != nil {
527 return fmt.Errorf("failed to packetize segment: %w", err)
528 }
529 seg.PacketizedData = packet
530 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
531 return nil
532}
533
534func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
535 buf := bytes.Buffer{}
536 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
537 if err != nil {
538 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
539 }
540 // newSeg := &streamplace.Segment{
541 // LexiconTypeID: "place.stream.segment",
542 // Id: spseg.Id,
543 // Creator: spseg.Creator,
544 // StartTime: spseg.StartTime,
545 // Duration: &dur,
546 // Audio: spseg.Audio,
547 // Video: spseg.Video,
548 // SigningKey: spseg.SigningKey,
549 // }
550 aqt, err := aqtime.FromString(spseg.StartTime)
551 if err != nil {
552 return fmt.Errorf("failed to parse segment start time: %w", err)
553 }
554 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
555 rend, err := ss.hls.GetRendition(rendition)
556 if err != nil {
557 return fmt.Errorf("failed to get rendition: %w", err)
558 }
559 if err := rend.NewSegment(&media.Segment{
560 Buf: &buf,
561 Duration: time.Duration(dur),
562 Time: aqt.Time(),
563 }); err != nil {
564 return fmt.Errorf("failed to create new segment: %w", err)
565 }
566
567 return nil
568}