Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/url"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/google/uuid"
16 "github.com/streamplace/oatproxy/pkg/oatproxy"
17 "golang.org/x/sync/errgroup"
18 "stream.place/streamplace/pkg/aqhttp"
19 "stream.place/streamplace/pkg/aqtime"
20 "stream.place/streamplace/pkg/bus"
21 "stream.place/streamplace/pkg/config"
22 "stream.place/streamplace/pkg/livepeer"
23 "stream.place/streamplace/pkg/localdb"
24 "stream.place/streamplace/pkg/log"
25 "stream.place/streamplace/pkg/media"
26 "stream.place/streamplace/pkg/model"
27 "stream.place/streamplace/pkg/renditions"
28 "stream.place/streamplace/pkg/replication"
29 "stream.place/streamplace/pkg/spmetrics"
30 "stream.place/streamplace/pkg/statedb"
31 "stream.place/streamplace/pkg/streamplace"
32 "stream.place/streamplace/pkg/thumbnail"
33)
34
// StreamSession holds the state of one streamer's live session: the services
// it talks to, the throttling state for status and origin records, and the
// errgroup that owns the session's goroutines.
type StreamSession struct {
	mm             *media.MediaManager     // used for RTMP pushes to multistream targets
	mod            model.Model             // repo, livestream and chat profile lookups
	cli            *config.CLI             // runtime configuration (hosts, timeouts, feature flags)
	bus            *bus.Bus                // segment publication and session-end notifications
	op             *oatproxy.OATProxy      // refreshes OAuth sessions and builds XRPC clients
	hls            *media.M3U8             // built in Start from the source and transcoded renditions
	lp             *livepeer.LivepeerSession // created lazily by Transcode
	repoDID        string                  // DID of the streamer this session belongs to
	segmentChan    chan struct{}           // NewSegment sends here; Start uses it to restart its inactivity timeout
	lastStatus     time.Time               // time of the last successful status put; throttles statusUpdateLoop
	lastStatusCID  *string                 // CID of the last status record put; DeleteStatus uses it as SwapRecord
	lastOriginTime time.Time               // time of the last successful origin put; throttles originUpdateLoop
	localDB        localdb.LocalDB         // segment and thumbnail storage

	// Channels for background workers
	statusUpdateChan chan struct{} // Signal to update status
	originUpdateChan chan struct{} // Signal to update broadcast origin

	g          *errgroup.Group // owns every goroutine started through Go and Start
	started    chan struct{}   // closed by Start once the session is set up; Go and NewSegment block on it
	ctx        context.Context // session context, assigned in Start
	packets    []bus.PacketizedSegment
	statefulDB *statedb.StatefulDB // OAuth sessions, multistream targets/events and the task queue
	replicator replication.Replicator // fills in broadcast origin records
}
61
62func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
63 ctx, cancel := context.WithCancel(ctx)
64 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc()
65 ss.g, ctx = errgroup.WithContext(ctx)
66 sid := livepeer.RandomTrailer(8)
67 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID)
68 ss.ctx = ctx
69 log.Log(ctx, "starting stream session")
70 defer cancel()
71 spseg, err := notif.Segment.ToStreamplaceSegment()
72 if err != nil {
73 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
74 }
75 var allRenditions renditions.Renditions
76
77 if ss.cli.LivepeerGatewayURL != "" {
78 allRenditions, err = renditions.GenerateRenditions(spseg)
79 } else {
80 allRenditions = []renditions.Rendition{}
81 }
82 if err != nil {
83 return err
84 }
85 if spseg.Duration == nil {
86 return fmt.Errorf("segment duration is required to calculate bitrate")
87 }
88 dur := time.Duration(*spseg.Duration)
89 byteLen := len(notif.Data)
90 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
91 sourceRendition := renditions.Rendition{
92 Name: "source",
93 Bitrate: bitrate,
94 Width: spseg.Video[0].Width,
95 Height: spseg.Video[0].Height,
96 }
97 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
98 ss.hls = media.NewM3U8(allRenditions)
99
100 // for _, r := range allRenditions {
101 // g.Go(func() error {
102 // for {
103 // if ctx.Err() != nil {
104 // return nil
105 // }
106 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
107 // if ctx.Err() != nil {
108 // return nil
109 // }
110 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
111 // time.Sleep(time.Second * 5)
112 // }
113 // })
114 // }
115
116 close(ss.started)
117
118 // Start background workers for status and origin updates
119 ss.g.Go(func() error {
120 return ss.statusUpdateLoop(ctx, spseg.Creator)
121 })
122 ss.g.Go(func() error {
123 return ss.originUpdateLoop(ctx)
124 })
125
126 if notif.Local {
127 ss.Go(ctx, func() error {
128 return ss.HandleMultistreamTargets(ctx)
129 })
130 }
131
132 for {
133 select {
134 case <-ss.segmentChan:
135 // reset timer
136 case <-ctx.Done():
137 // Signal all background workers to stop
138 return ss.g.Wait()
139 // case <-time.After(time.Minute * 1):
140 case <-time.After(ss.cli.StreamSessionTimeout):
141 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout)
142 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec()
143 for _, r := range allRenditions {
144 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
145 }
146 // Signal background workers to stop
147 if notif.Local {
148 ss.Go(ctx, func() error {
149 return ss.DeleteStatus(spseg.Creator)
150 })
151 }
152 cancel()
153 }
154 }
155}
156
157// Execute a goroutine in the context of the stream session. Errors are
158// non-fatal; if you actually want to melt the universe on an error you
159// should panic()
160func (ss *StreamSession) Go(ctx context.Context, f func() error) {
161 <-ss.started
162 ss.g.Go(func() error {
163 err := f()
164 if err != nil {
165 log.Error(ctx, "error in stream_session goroutine", "error", err)
166 }
167 return nil
168 })
169}
170
171func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
172 <-ss.started
173 go func() {
174 select {
175 case <-ss.ctx.Done():
176 return
177 case ss.segmentChan <- struct{}{}:
178 }
179 }()
180 aqt := aqtime.FromTime(notif.Segment.StartTime)
181 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
182 notif.Segment.MediaData.Size = len(notif.Data)
183 err := ss.localDB.CreateSegment(notif.Segment)
184 if err != nil {
185 return fmt.Errorf("could not add segment to database: %w", err)
186 }
187 spseg, err := notif.Segment.ToStreamplaceSegment()
188 if err != nil {
189 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
190 }
191
192 ss.bus.Publish(spseg.Creator, spseg)
193 ss.Go(ctx, func() error {
194 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
195 Filepath: notif.Segment.ID,
196 Data: notif.Data,
197 })
198 })
199
200 if ss.cli.Thumbnail {
201 ss.Go(ctx, func() error {
202 return ss.Thumbnail(ctx, spseg.Creator, notif)
203 })
204 }
205
206 if notif.Local {
207 ss.UpdateStatus(ctx, spseg.Creator)
208 ss.UpdateBroadcastOrigin(ctx)
209 }
210
211 if ss.cli.LivepeerGatewayURL != "" {
212 ss.Go(ctx, func() error {
213 start := time.Now()
214 err := ss.Transcode(ctx, spseg, notif.Data)
215 took := time.Since(start)
216 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
217 return err
218 })
219 }
220
221 // trigger a notification blast if this is a new livestream
222 if notif.Metadata.Livestream != nil {
223 ss.Go(ctx, func() error {
224 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator)
225 if err != nil {
226 return fmt.Errorf("failed to get repo: %w", err)
227 }
228 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator)
229 if err != nil {
230 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
231 }
232 if livestreamModel == nil {
233 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator)
234 return nil
235 }
236 lsv, err := livestreamModel.ToLivestreamView()
237 if err != nil {
238 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
239 }
240 if !shouldNotify(lsv) {
241 log.Debug(ctx, "is not set to notify", "repoDID", spseg.Creator)
242 return nil
243 }
244 task := &statedb.NotificationTask{
245 Livestream: lsv,
246 PDSURL: r.PDS,
247 }
248 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator)
249 if err != nil {
250 return fmt.Errorf("failed to get chat profile: %w", err)
251 }
252 if cp != nil {
253 spcp, err := cp.ToStreamplaceChatProfile()
254 if err != nil {
255 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
256 }
257 task.ChatProfile = spcp
258 }
259
260 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri)))
261 if err != nil {
262 log.Error(ctx, "failed to enqueue notification task", "err", err)
263 }
264 ss.UpdateStatus(ctx, spseg.Creator)
265 return nil
266 })
267 } else {
268 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator)
269 }
270
271 return nil
272}
273
274func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool {
275 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
276 if !ok {
277 return true
278 }
279 if lsvr.NotificationSettings == nil {
280 return true
281 }
282 settings := lsvr.NotificationSettings
283 if settings.PushNotification == nil {
284 return true
285 }
286 return *settings.PushNotification
287}
288
289func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
290 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
291 locked := lock.TryLock()
292 if !locked {
293 // we're already generating a thumbnail for this user, skip
294 return nil
295 }
296 defer lock.Unlock()
297 oldThumb, err := ss.localDB.LatestThumbnailForUser(not.Segment.RepoDID)
298 if err != nil {
299 return err
300 }
301 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
302 // we have a thumbnail <60sec old, skip generating a new one
303 return nil
304 }
305 r := bytes.NewReader(not.Data)
306 aqt := aqtime.FromTime(not.Segment.StartTime)
307 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
308 if err != nil {
309 return err
310 }
311 defer fd.Close()
312 err = media.Thumbnail(ctx, r, fd, "jpeg")
313 if err != nil {
314 return err
315 }
316 thumb := &localdb.Thumbnail{
317 Format: "jpeg",
318 SegmentID: not.Segment.ID,
319 }
320 err = ss.localDB.CreateThumbnail(thumb)
321 if err != nil {
322 return err
323 }
324 return nil
325}
326
327// UpdateStatus signals the background worker to update status (non-blocking)
328func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) {
329 select {
330 case ss.statusUpdateChan <- struct{}{}:
331 default:
332 // Channel full, signal already pending
333 }
334}
335
336// statusUpdateLoop runs as a background goroutine for the session lifetime
337func (ss *StreamSession) statusUpdateLoop(ctx context.Context, repoDID string) error {
338 ctx = log.WithLogValues(ctx, "func", "statusUpdateLoop")
339 for {
340 select {
341 case <-ctx.Done():
342 return nil
343 case <-ss.statusUpdateChan:
344 if time.Since(ss.lastStatus) < time.Minute {
345 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
346 continue
347 }
348 if err := ss.doUpdateStatus(ctx, repoDID); err != nil {
349 log.Error(ctx, "failed to update status", "error", err)
350 }
351 }
352 }
353}
354
355// doUpdateStatus performs the actual status update work
356func (ss *StreamSession) doUpdateStatus(ctx context.Context, repoDID string) error {
357 ctx = log.WithLogValues(ctx, "func", "doUpdateStatus")
358
359 client, err := ss.GetClientByDID(repoDID)
360 if err != nil {
361 return fmt.Errorf("could not get xrpc client: %w", err)
362 }
363
364 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
365 if err != nil {
366 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
367 }
368 lsv, err := ls.ToLivestreamView()
369 if err != nil {
370 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
371 }
372
373 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
374 if !ok {
375 return fmt.Errorf("livestream is not a streamplace livestream")
376 }
377 thumb := lsvr.Thumb
378
379 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
380 if err != nil {
381 return fmt.Errorf("could not get repo for repoDID: %w", err)
382 }
383
384 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
385 if !ok {
386 return fmt.Errorf("livestream is not a streamplace livestream")
387 }
388
389 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
390
391 if lsr.CanonicalUrl != nil {
392 canonicalUrl = *lsr.CanonicalUrl
393 }
394
395 actorStatusEmbed := bsky.ActorStatus_Embed{
396 EmbedExternal: &bsky.EmbedExternal{
397 External: &bsky.EmbedExternal_External{
398 Title: lsr.Title,
399 Uri: canonicalUrl,
400 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
401 Thumb: thumb,
402 },
403 },
404 }
405
406 duration := int64(10)
407 status := bsky.ActorStatus{
408 Status: "app.bsky.actor.status#live",
409 DurationMinutes: &duration,
410 Embed: &actorStatusEmbed,
411 CreatedAt: time.Now().Format(time.RFC3339),
412 }
413
414 var swapRecord *string
415 getOutput := comatproto.RepoGetRecord_Output{}
416 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
417 "repo": repoDID,
418 "collection": "app.bsky.actor.status",
419 "rkey": "self",
420 }, nil, &getOutput)
421 if err != nil {
422 xErr, ok := err.(*xrpc.Error)
423 if !ok {
424 return fmt.Errorf("could not get record: %w", err)
425 }
426 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
427 return fmt.Errorf("could not get record: %w", err)
428 }
429 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
430 } else {
431 log.Debug(ctx, "got record", "record", getOutput)
432 swapRecord = getOutput.Cid
433 }
434
435 inp := comatproto.RepoPutRecord_Input{
436 Collection: "app.bsky.actor.status",
437 Record: &lexutil.LexiconTypeDecoder{Val: &status},
438 Rkey: "self",
439 Repo: repoDID,
440 SwapRecord: swapRecord,
441 }
442 out := comatproto.RepoPutRecord_Output{}
443
444 ss.lastStatusCID = &out.Cid
445
446 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
447 if err != nil {
448 return fmt.Errorf("could not create record: %w", err)
449 }
450 log.Debug(ctx, "created status record", "out", out)
451
452 ss.lastStatus = time.Now()
453
454 return nil
455}
456
457func (ss *StreamSession) DeleteStatus(repoDID string) error {
458 // need a special extra context because the stream session context is already cancelled
459 // No lock needed - this runs during teardown after the background worker has exited
460 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
461 if ss.lastStatusCID == nil {
462 log.Debug(ctx, "no status cid to delete")
463 return nil
464 }
465 inp := comatproto.RepoDeleteRecord_Input{
466 Collection: "app.bsky.actor.status",
467 Rkey: "self",
468 Repo: repoDID,
469 }
470 inp.SwapRecord = ss.lastStatusCID
471 out := comatproto.RepoDeleteRecord_Output{}
472
473 client, err := ss.GetClientByDID(repoDID)
474 if err != nil {
475 return fmt.Errorf("could not get xrpc client: %w", err)
476 }
477
478 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
479 if err != nil {
480 return fmt.Errorf("could not delete record: %w", err)
481 }
482
483 ss.lastStatusCID = nil
484 return nil
485}
486
// originUpdateInterval is the minimum time originUpdateLoop lets pass between
// two broadcast origin updates; signals arriving sooner are skipped.
var originUpdateInterval = time.Second * 30
488
489// UpdateBroadcastOrigin signals the background worker to update origin (non-blocking)
490func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) {
491 select {
492 case ss.originUpdateChan <- struct{}{}:
493 default:
494 // Channel full, signal already pending
495 }
496}
497
498// originUpdateLoop runs as a background goroutine for the session lifetime
499func (ss *StreamSession) originUpdateLoop(ctx context.Context) error {
500 ctx = log.WithLogValues(ctx, "func", "originUpdateLoop")
501 for {
502 select {
503 case <-ctx.Done():
504 return nil
505 case <-ss.originUpdateChan:
506 if time.Since(ss.lastOriginTime) < originUpdateInterval {
507 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
508 continue
509 }
510 if err := ss.doUpdateBroadcastOrigin(ctx); err != nil {
511 log.Error(ctx, "failed to update broadcast origin", "error", err)
512 }
513 }
514 }
515}
516
517// doUpdateBroadcastOrigin performs the actual broadcast origin update work
518func (ss *StreamSession) doUpdateBroadcastOrigin(ctx context.Context) error {
519 ctx = log.WithLogValues(ctx, "func", "doUpdateBroadcastOrigin")
520
521 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
522 origin := streamplace.BroadcastOrigin{
523 Streamer: ss.repoDID,
524 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost),
525 Broadcaster: &broadcaster,
526 UpdatedAt: time.Now().UTC().Format(util.ISO8601),
527 }
528 err := ss.replicator.BuildOriginRecord(&origin)
529 if err != nil {
530 return fmt.Errorf("could not build origin record: %w", err)
531 }
532
533 client, err := ss.GetClientByDID(ss.repoDID)
534 if err != nil {
535 return fmt.Errorf("could not get xrpc client for repoDID: %w", err)
536 }
537
538 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost)
539
540 var swapRecord *string
541 getOutput := comatproto.RepoGetRecord_Output{}
542 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
543 "repo": ss.repoDID,
544 "collection": "place.stream.broadcast.origin",
545 "rkey": rkey,
546 }, nil, &getOutput)
547 if err != nil {
548 xErr, ok := err.(*xrpc.Error)
549 if !ok {
550 return fmt.Errorf("could not get record: %w", err)
551 }
552 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
553 return fmt.Errorf("could not get record: %w", err)
554 }
555 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
556 } else {
557 log.Debug(ctx, "got record", "record", getOutput)
558 swapRecord = getOutput.Cid
559 }
560
561 inp := comatproto.RepoPutRecord_Input{
562 Collection: "place.stream.broadcast.origin",
563 Record: &lexutil.LexiconTypeDecoder{Val: &origin},
564 Rkey: rkey,
565 Repo: ss.repoDID,
566 SwapRecord: swapRecord,
567 }
568 out := comatproto.RepoPutRecord_Output{}
569
570 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
571 if err != nil {
572 return fmt.Errorf("could not create record: %w", err)
573 }
574
575 ss.lastOriginTime = time.Now()
576 return nil
577}
578
579func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
580 rs, err := renditions.GenerateRenditions(spseg)
581 if err != nil {
582 return fmt.Errorf("failed to generated renditions: %w", err)
583 }
584
585 if ss.lp == nil {
586 var err error
587 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
588 if err != nil {
589 return err
590 }
591
592 }
593 spmetrics.TranscodeAttemptsTotal.Inc()
594 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
595 if err != nil {
596 spmetrics.TranscodeErrorsTotal.Inc()
597 return err
598 }
599 if len(rs) != len(segs) {
600 spmetrics.TranscodeErrorsTotal.Inc()
601 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
602 }
603 spmetrics.TranscodeSuccessesTotal.Inc()
604 aqt, err := aqtime.FromString(spseg.StartTime)
605 if err != nil {
606 return err
607 }
608 for i, seg := range segs {
609 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
610 log.Debug(ctx, "publishing segment", "rendition", rs[i])
611 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
612 if err != nil {
613 return fmt.Errorf("failed to create transcoded segment file: %w", err)
614 }
615 defer fd.Close()
616 _, err = fd.Write(seg)
617 if err != nil {
618 return fmt.Errorf("failed to write transcoded segment file: %w", err)
619 }
620 ss.Go(ctx, func() error {
621 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
622 Filepath: fd.Name(),
623 Data: seg,
624 })
625 })
626
627 }
628 return nil
629}
630
631func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
632 ss.Go(ctx, func() error {
633 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
634 })
635 ss.Go(ctx, func() error {
636 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
637 })
638 return nil
639}
640
641func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
642 packet, err := media.Packetize(ctx, seg)
643 if err != nil {
644 return fmt.Errorf("failed to packetize segment: %w", err)
645 }
646 seg.PacketizedData = packet
647 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
648 return nil
649}
650
651func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
652 buf := bytes.Buffer{}
653 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
654 if err != nil {
655 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
656 }
657 // newSeg := &streamplace.Segment{
658 // LexiconTypeID: "place.stream.segment",
659 // Id: spseg.Id,
660 // Creator: spseg.Creator,
661 // StartTime: spseg.StartTime,
662 // Duration: &dur,
663 // Audio: spseg.Audio,
664 // Video: spseg.Video,
665 // SigningKey: spseg.SigningKey,
666 // }
667 aqt, err := aqtime.FromString(spseg.StartTime)
668 if err != nil {
669 return fmt.Errorf("failed to parse segment start time: %w", err)
670 }
671 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
672 rend, err := ss.hls.GetRendition(rendition)
673 if err != nil {
674 return fmt.Errorf("failed to get rendition: %w", err)
675 }
676 if err := rend.NewSegment(&media.Segment{
677 Buf: &buf,
678 Duration: time.Duration(dur),
679 Time: aqt.Time(),
680 }); err != nil {
681 return fmt.Errorf("failed to create new segment: %w", err)
682 }
683
684 return nil
685}
686
// XRPCClient is the one XRPC operation the stream session needs. Both kinds
// of client returned by GetClientByDID are used through it: the
// password-authenticated *xrpc.Client for dev accounts and the client built
// by the OAuth proxy.
type XRPCClient interface {
	// Do performs one XRPC call of the given method (e.g. xrpc.Query or
	// xrpc.Procedure) against path, sending queryParams and body, and decodes
	// the response into out.
	Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error
}
690
691func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) {
692 password, ok := ss.cli.DevAccountCreds[did]
693 if ok {
694 repo, err := ss.mod.GetRepoByHandleOrDID(did)
695 if err != nil {
696 return nil, fmt.Errorf("could not get repo by did: %w", err)
697 }
698 if repo == nil {
699 return nil, fmt.Errorf("repo not found for did: %s", did)
700 }
701 anonXRPCC := &xrpc.Client{
702 Host: repo.PDS,
703 Client: &aqhttp.Client,
704 }
705 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{
706 Identifier: repo.DID,
707 Password: password,
708 })
709 if err != nil {
710 return nil, fmt.Errorf("could not create session: %w", err)
711 }
712
713 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
714
715 return &xrpc.Client{
716 Host: repo.PDS,
717 Client: &aqhttp.Client,
718 Auth: &xrpc.AuthInfo{
719 Did: repo.DID,
720 AccessJwt: session.AccessJwt,
721 RefreshJwt: session.RefreshJwt,
722 Handle: repo.Handle,
723 },
724 }, nil
725 }
726 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
727 if err != nil {
728 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err)
729 }
730 if session == nil {
731 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
732 }
733
734 session, err = ss.op.RefreshIfNeeded(session)
735 if err != nil {
736 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err)
737 }
738
739 client, err := ss.op.GetXrpcClient(session)
740 if err != nil {
741 return nil, fmt.Errorf("could not get xrpc client: %w", err)
742 }
743
744 return client, nil
745}
746
// runningMultistream is HandleMultistreamTargets' bookkeeping entry for one
// multistream push that is currently active.
type runningMultistream struct {
	cancel func() // cancels the context the push goroutine runs under
	key    string // "{target URI}:{target URL}", the key in the running map
	pushID string // UUID attached to this push's log lines
	url    string // sanitized target URL, safe to log
}
753
// sanitizeMultistreamTargetURL returns a form of uri that is safe to log.
// Only the scheme and host (including port) are kept: the path is replaced
// with "/redacted", and userinfo, query string and fragment are dropped,
// since any of them may carry a stream key. If uri cannot be parsed the
// fixed string "redacted" is returned, so the raw input never reaches a log.
func sanitizeMultistreamTargetURL(uri string) string {
	u, err := url.Parse(uri)
	if err != nil {
		return "redacted"
	}
	u.User = nil
	u.Opaque = ""
	u.Path = "/redacted"
	u.RawPath = ""
	u.ForceQuery = false
	u.RawQuery = ""
	u.Fragment = ""
	u.RawFragment = ""
	return u.String()
}
762
// HandleMultistreamTargets keeps the set of running multistream pushes in
// sync with the streamer's configured targets. Every five seconds it lists
// the targets, starts a push goroutine for each one that is not yet running,
// and cancels the pushes whose target is no longer listed. It returns nil
// when ctx is done and an error if listing targets or generating a push ID
// fails.
//
// we're making an attempt here not to log (sensitive) stream keys, so we're
// referencing by atproto URI
func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error {
	ctx = log.WithLogValues(ctx, "system", "multistreaming")
	isTrue := true
	// {target.Uri}:{rec.Url} -> runningMultistream
	// no concurrency issues, it's only used from this one loop
	running := map[string]*runningMultistream{}
	for {
		// NOTE(review): the trailing &isTrue presumably restricts the listing
		// to active targets, and 100/0 look like limit/offset — confirm
		// against ListMultistreamTargets.
		targets, err := ss.statefulDB.ListMultistreamTargets(ss.repoDID, 100, 0, &isTrue)
		if err != nil {
			return fmt.Errorf("failed to list multistream targets: %w", err)
		}
		// Keys seen in this pass; anything in running but not in here gets stopped.
		currentRunning := map[string]bool{}
		for _, targetView := range targets {
			rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
			if !ok {
				log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri)
				continue
			}
			uu, err := uuid.NewV7()
			if err != nil {
				return err
			}
			// Shadow ctx for this target so log lines carry the sanitized URL and push ID.
			ctx := log.WithLogValues(ctx, "url", sanitizeMultistreamTargetURL(rec.Url), "pushID", uu.String())
			// The key includes the raw URL, so it is never logged.
			key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url)
			if running[key] == nil {
				// The push runs under its own cancellable context so it can be
				// stopped individually when the target disappears.
				childCtx, childCancel := context.WithCancel(ctx)
				ss.Go(ctx, func() error {
					log.Log(ctx, "starting multistream target", "uri", targetView.Uri)
					err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending")
					if err != nil {
						log.Error(ctx, "failed to create multistream event", "error", err)
					}
					return ss.StartMultistreamTarget(childCtx, targetView)
				})
				running[key] = &runningMultistream{
					cancel: childCancel,
					key:    key,
					pushID: uu.String(),
					url:    sanitizeMultistreamTargetURL(rec.Url),
				}
			}
			currentRunning[key] = true
		}
		// Stop pushes whose target was not in this pass's listing.
		for key := range running {
			if !currentRunning[key] {
				log.Log(ctx, "stopping multistream target", "url", sanitizeMultistreamTargetURL(running[key].url), "pushID", running[key].pushID)
				running[key].cancel()
				delete(running, key)
			}
		}
		// Poll again in five seconds unless the session has ended.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(time.Second * 5):
			continue
		}
	}
}
823
824func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error {
825 for {
826 err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView)
827 if err != nil {
828 log.Error(ctx, "failed to push to RTMP server", "error", err)
829 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error")
830 if err != nil {
831 log.Error(ctx, "failed to create multistream event", "error", err)
832 }
833 }
834 select {
835 case <-ctx.Done():
836 return nil
837 case <-time.After(time.Second * 5):
838 continue
839 }
840 }
841}