// Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/url"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/google/uuid"
16 "github.com/streamplace/oatproxy/pkg/oatproxy"
17 "golang.org/x/sync/errgroup"
18 "stream.place/streamplace/pkg/aqhttp"
19 "stream.place/streamplace/pkg/aqtime"
20 "stream.place/streamplace/pkg/bus"
21 "stream.place/streamplace/pkg/config"
22 "stream.place/streamplace/pkg/livepeer"
23 "stream.place/streamplace/pkg/log"
24 "stream.place/streamplace/pkg/media"
25 "stream.place/streamplace/pkg/model"
26 "stream.place/streamplace/pkg/renditions"
27 "stream.place/streamplace/pkg/replication"
28 "stream.place/streamplace/pkg/spmetrics"
29 "stream.place/streamplace/pkg/statedb"
30 "stream.place/streamplace/pkg/streamplace"
31 "stream.place/streamplace/pkg/thumbnail"
32)
33
// StreamSession owns all per-streamer state for one live session: media
// plumbing, the HLS muxer, background status/origin workers, and the
// segment-inactivity timeout loop driven from Start.
type StreamSession struct {
	mm             *media.MediaManager
	mod            model.Model
	cli            *config.CLI
	bus            *bus.Bus
	op             *oatproxy.OATProxy
	hls            *media.M3U8              // built in Start from the rendition set
	lp             *livepeer.LivepeerSession // lazily created on first Transcode
	repoDID        string
	segmentChan    chan struct{} // pinged on every new segment to reset the timeout
	lastStatus     time.Time     // time of last successful actor-status put
	lastStatusCID  *string       // CID of the status record, used for swap-on-delete
	lastOriginTime time.Time     // time of last successful broadcast-origin put

	// Channels for background workers
	statusUpdateChan chan struct{} // Signal to update status
	originUpdateChan chan struct{} // Signal to update broadcast origin

	g       *errgroup.Group // collects every goroutine launched for this session
	started chan struct{}   // closed by Start once the session is initialized
	ctx     context.Context // session-lifetime context, set in Start
	packets []bus.PacketizedSegment
	statefulDB *statedb.StatefulDB
	replicator replication.Replicator
}
59
60func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
61 ctx, cancel := context.WithCancel(ctx)
62 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc()
63 ss.g, ctx = errgroup.WithContext(ctx)
64 sid := livepeer.RandomTrailer(8)
65 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID)
66 ss.ctx = ctx
67 log.Log(ctx, "starting stream session")
68 defer cancel()
69 spseg, err := notif.Segment.ToStreamplaceSegment()
70 if err != nil {
71 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
72 }
73 var allRenditions renditions.Renditions
74
75 if ss.cli.LivepeerGatewayURL != "" {
76 allRenditions, err = renditions.GenerateRenditions(spseg)
77 } else {
78 allRenditions = []renditions.Rendition{}
79 }
80 if err != nil {
81 return err
82 }
83 if spseg.Duration == nil {
84 return fmt.Errorf("segment duration is required to calculate bitrate")
85 }
86 dur := time.Duration(*spseg.Duration)
87 byteLen := len(notif.Data)
88 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
89 sourceRendition := renditions.Rendition{
90 Name: "source",
91 Bitrate: bitrate,
92 Width: spseg.Video[0].Width,
93 Height: spseg.Video[0].Height,
94 }
95 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
96 ss.hls = media.NewM3U8(allRenditions)
97
98 // for _, r := range allRenditions {
99 // g.Go(func() error {
100 // for {
101 // if ctx.Err() != nil {
102 // return nil
103 // }
104 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
105 // if ctx.Err() != nil {
106 // return nil
107 // }
108 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
109 // time.Sleep(time.Second * 5)
110 // }
111 // })
112 // }
113
114 close(ss.started)
115
116 // Start background workers for status and origin updates
117 ss.g.Go(func() error {
118 return ss.statusUpdateLoop(ctx, spseg.Creator)
119 })
120 ss.g.Go(func() error {
121 return ss.originUpdateLoop(ctx)
122 })
123
124 ss.Go(ctx, func() error {
125 return ss.HandleMultistreamTargets(ctx)
126 })
127
128 for {
129 select {
130 case <-ss.segmentChan:
131 // reset timer
132 case <-ctx.Done():
133 // Signal all background workers to stop
134 return ss.g.Wait()
135 // case <-time.After(time.Minute * 1):
136 case <-time.After(ss.cli.StreamSessionTimeout):
137 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout)
138 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec()
139 for _, r := range allRenditions {
140 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
141 }
142 // Signal background workers to stop
143 if notif.Local {
144 ss.Go(ctx, func() error {
145 return ss.DeleteStatus(spseg.Creator)
146 })
147 }
148 cancel()
149 }
150 }
151}
152
153// Execute a goroutine in the context of the stream session. Errors are
154// non-fatal; if you actually want to melt the universe on an error you
155// should panic()
156func (ss *StreamSession) Go(ctx context.Context, f func() error) {
157 <-ss.started
158 ss.g.Go(func() error {
159 err := f()
160 if err != nil {
161 log.Error(ctx, "error in stream_session goroutine", "error", err)
162 }
163 return nil
164 })
165}
166
// NewSegment ingests one incoming media segment: it persists the segment,
// publishes it on the bus, queues playback (HLS/WebRTC), and kicks off
// optional work — thumbnailing, PDS status/origin updates, Livepeer
// transcoding, and a notification blast for new livestreams. Blocks until
// Start has initialized the session.
func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
	<-ss.started
	// ping the session loop so the inactivity timeout resets; done from a
	// goroutine so a slow loop never blocks ingest
	go func() {
		select {
		case <-ss.ctx.Done():
			return
		case ss.segmentChan <- struct{}{}:
		}
	}()
	aqt := aqtime.FromTime(notif.Segment.StartTime)
	ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
	notif.Segment.MediaData.Size = len(notif.Data)
	err := ss.mod.CreateSegment(notif.Segment)
	if err != nil {
		return fmt.Errorf("could not add segment to database: %w", err)
	}
	spseg, err := notif.Segment.ToStreamplaceSegment()
	if err != nil {
		return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
	}

	ss.bus.Publish(spseg.Creator, spseg)
	// queue the source rendition for playback
	ss.Go(ctx, func() error {
		return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
			Filepath: notif.Segment.ID,
			Data:     notif.Data,
		})
	})

	if ss.cli.Thumbnail {
		ss.Go(ctx, func() error {
			return ss.Thumbnail(ctx, spseg.Creator, notif)
		})
	}

	// only the node that ingested the segment writes PDS records
	if notif.Local {
		ss.UpdateStatus(ctx, spseg.Creator)
		ss.UpdateBroadcastOrigin(ctx)
	}

	if ss.cli.LivepeerGatewayURL != "" {
		ss.Go(ctx, func() error {
			start := time.Now()
			err := ss.Transcode(ctx, spseg, notif.Data)
			took := time.Since(start)
			spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
			return err
		})
	}

	// trigger a notification blast if this is a new livestream
	if notif.Metadata.Livestream != nil {
		ss.Go(ctx, func() error {
			r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator)
			if err != nil {
				return fmt.Errorf("failed to get repo: %w", err)
			}
			livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator)
			if err != nil {
				return fmt.Errorf("failed to get latest livestream for repo: %w", err)
			}
			if livestreamModel == nil {
				log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator)
				return nil
			}
			lsv, err := livestreamModel.ToLivestreamView()
			if err != nil {
				return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
			}
			if !shouldNotify(lsv) {
				log.Debug(ctx, "is not set to notify", "repoDID", spseg.Creator)
				return nil
			}
			task := &statedb.NotificationTask{
				Livestream: lsv,
				PDSURL:     r.PDS,
			}
			cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator)
			if err != nil {
				return fmt.Errorf("failed to get chat profile: %w", err)
			}
			if cp != nil {
				spcp, err := cp.ToStreamplaceChatProfile()
				if err != nil {
					return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
				}
				task.ChatProfile = spcp
			}

			// the task key dedupes blasts per livestream URI
			_, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri)))
			if err != nil {
				log.Error(ctx, "failed to enqueue notification task", "err", err)
			}
			ss.UpdateStatus(ctx, spseg.Creator)
			return nil
		})
	} else {
		log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator)
	}

	return nil
}
269
270func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool {
271 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
272 if !ok {
273 return true
274 }
275 if lsvr.NotificationSettings == nil {
276 return true
277 }
278 settings := lsvr.NotificationSettings
279 if settings.PushNotification == nil {
280 return true
281 }
282 return *settings.PushNotification
283}
284
// Thumbnail generates a JPEG thumbnail from the segment's data and records
// it in the database. At most one thumbnail is generated per streamer at a
// time, and at most one per minute of segment time.
func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
	// per-streamer lock: skip rather than queue if a generation is in flight
	lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
	locked := lock.TryLock()
	if !locked {
		// we're already generating a thumbnail for this user, skip
		return nil
	}
	defer lock.Unlock()
	oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
	if err != nil {
		return err
	}
	if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
		// we have a thumbnail <60sec old, skip generating a new one
		return nil
	}
	r := bytes.NewReader(not.Data)
	aqt := aqtime.FromTime(not.Segment.StartTime)
	// the thumbnail is stored alongside the segment files
	fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
	if err != nil {
		return err
	}
	defer fd.Close()
	err = media.Thumbnail(ctx, r, fd, "jpeg")
	if err != nil {
		return err
	}
	thumb := &model.Thumbnail{
		Format:    "jpeg",
		SegmentID: not.Segment.ID,
	}
	err = ss.mod.CreateThumbnail(thumb)
	if err != nil {
		return err
	}
	return nil
}
322
// UpdateStatus signals the background worker to update status (non-blocking)
func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) {
	// non-blocking send on a size-bounded signal channel: if a signal is
	// already pending, the worker will run soon anyway
	select {
	case ss.statusUpdateChan <- struct{}{}:
	default:
		// Channel full, signal already pending
	}
}
331
332// statusUpdateLoop runs as a background goroutine for the session lifetime
333func (ss *StreamSession) statusUpdateLoop(ctx context.Context, repoDID string) error {
334 ctx = log.WithLogValues(ctx, "func", "statusUpdateLoop")
335 for {
336 select {
337 case <-ctx.Done():
338 return nil
339 case <-ss.statusUpdateChan:
340 if time.Since(ss.lastStatus) < time.Minute {
341 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
342 continue
343 }
344 if err := ss.doUpdateStatus(ctx, repoDID); err != nil {
345 log.Error(ctx, "failed to update status", "error", err)
346 }
347 }
348 }
349}
350
351// doUpdateStatus performs the actual status update work
352func (ss *StreamSession) doUpdateStatus(ctx context.Context, repoDID string) error {
353 ctx = log.WithLogValues(ctx, "func", "doUpdateStatus")
354
355 client, err := ss.GetClientByDID(repoDID)
356 if err != nil {
357 return fmt.Errorf("could not get xrpc client: %w", err)
358 }
359
360 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
361 if err != nil {
362 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
363 }
364 lsv, err := ls.ToLivestreamView()
365 if err != nil {
366 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
367 }
368
369 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
370 if !ok {
371 return fmt.Errorf("livestream is not a streamplace livestream")
372 }
373 thumb := lsvr.Thumb
374
375 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
376 if err != nil {
377 return fmt.Errorf("could not get repo for repoDID: %w", err)
378 }
379
380 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
381 if !ok {
382 return fmt.Errorf("livestream is not a streamplace livestream")
383 }
384
385 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
386
387 if lsr.CanonicalUrl != nil {
388 canonicalUrl = *lsr.CanonicalUrl
389 }
390
391 actorStatusEmbed := bsky.ActorStatus_Embed{
392 EmbedExternal: &bsky.EmbedExternal{
393 External: &bsky.EmbedExternal_External{
394 Title: lsr.Title,
395 Uri: canonicalUrl,
396 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
397 Thumb: thumb,
398 },
399 },
400 }
401
402 duration := int64(10)
403 status := bsky.ActorStatus{
404 Status: "app.bsky.actor.status#live",
405 DurationMinutes: &duration,
406 Embed: &actorStatusEmbed,
407 CreatedAt: time.Now().Format(time.RFC3339),
408 }
409
410 var swapRecord *string
411 getOutput := comatproto.RepoGetRecord_Output{}
412 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
413 "repo": repoDID,
414 "collection": "app.bsky.actor.status",
415 "rkey": "self",
416 }, nil, &getOutput)
417 if err != nil {
418 xErr, ok := err.(*xrpc.Error)
419 if !ok {
420 return fmt.Errorf("could not get record: %w", err)
421 }
422 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
423 return fmt.Errorf("could not get record: %w", err)
424 }
425 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
426 } else {
427 log.Debug(ctx, "got record", "record", getOutput)
428 swapRecord = getOutput.Cid
429 }
430
431 inp := comatproto.RepoPutRecord_Input{
432 Collection: "app.bsky.actor.status",
433 Record: &lexutil.LexiconTypeDecoder{Val: &status},
434 Rkey: "self",
435 Repo: repoDID,
436 SwapRecord: swapRecord,
437 }
438 out := comatproto.RepoPutRecord_Output{}
439
440 ss.lastStatusCID = &out.Cid
441
442 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
443 if err != nil {
444 return fmt.Errorf("could not create record: %w", err)
445 }
446 log.Debug(ctx, "created status record", "out", out)
447
448 ss.lastStatus = time.Now()
449
450 return nil
451}
452
453func (ss *StreamSession) DeleteStatus(repoDID string) error {
454 // need a special extra context because the stream session context is already cancelled
455 // No lock needed - this runs during teardown after the background worker has exited
456 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
457 if ss.lastStatusCID == nil {
458 log.Debug(ctx, "no status cid to delete")
459 return nil
460 }
461 inp := comatproto.RepoDeleteRecord_Input{
462 Collection: "app.bsky.actor.status",
463 Rkey: "self",
464 Repo: repoDID,
465 }
466 inp.SwapRecord = ss.lastStatusCID
467 out := comatproto.RepoDeleteRecord_Output{}
468
469 client, err := ss.GetClientByDID(repoDID)
470 if err != nil {
471 return fmt.Errorf("could not get xrpc client: %w", err)
472 }
473
474 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
475 if err != nil {
476 return fmt.Errorf("could not delete record: %w", err)
477 }
478
479 ss.lastStatusCID = nil
480 return nil
481}
482
// originUpdateInterval is the minimum time between broadcast-origin record
// updates on the streamer's PDS.
var originUpdateInterval = time.Second * 30
484
// UpdateBroadcastOrigin signals the background worker to update origin (non-blocking)
func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) {
	// non-blocking send on the signal channel: if a signal is already
	// pending, the worker will run soon anyway
	select {
	case ss.originUpdateChan <- struct{}{}:
	default:
		// Channel full, signal already pending
	}
}
493
494// originUpdateLoop runs as a background goroutine for the session lifetime
495func (ss *StreamSession) originUpdateLoop(ctx context.Context) error {
496 ctx = log.WithLogValues(ctx, "func", "originUpdateLoop")
497 for {
498 select {
499 case <-ctx.Done():
500 return nil
501 case <-ss.originUpdateChan:
502 if time.Since(ss.lastOriginTime) < originUpdateInterval {
503 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
504 continue
505 }
506 if err := ss.doUpdateBroadcastOrigin(ctx); err != nil {
507 log.Error(ctx, "failed to update broadcast origin", "error", err)
508 }
509 }
510 }
511}
512
// doUpdateBroadcastOrigin writes a place.stream.broadcast.origin record to
// the streamer's PDS, advertising which server and broadcaster currently
// originate this stream, using compare-and-swap against any existing record.
func (ss *StreamSession) doUpdateBroadcastOrigin(ctx context.Context) error {
	ctx = log.WithLogValues(ctx, "func", "doUpdateBroadcastOrigin")

	broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
	origin := streamplace.BroadcastOrigin{
		Streamer:    ss.repoDID,
		Server:      fmt.Sprintf("did:web:%s", ss.cli.ServerHost),
		Broadcaster: &broadcaster,
		UpdatedAt:   time.Now().UTC().Format(util.ISO8601),
	}
	err := ss.replicator.BuildOriginRecord(&origin)
	if err != nil {
		return fmt.Errorf("could not build origin record: %w", err)
	}

	client, err := ss.GetClientByDID(ss.repoDID)
	if err != nil {
		return fmt.Errorf("could not get xrpc client for repoDID: %w", err)
	}

	// the record key is scoped to this server, so multiple servers can each
	// publish their own origin record for the same streamer
	rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost)

	// fetch the existing record (if any) so the put can swap on its CID
	var swapRecord *string
	getOutput := comatproto.RepoGetRecord_Output{}
	err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
		"repo":       ss.repoDID,
		"collection": "place.stream.broadcast.origin",
		"rkey":       rkey,
	}, nil, &getOutput)
	if err != nil {
		xErr, ok := err.(*xrpc.Error)
		if !ok {
			return fmt.Errorf("could not get record: %w", err)
		}
		if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
			return fmt.Errorf("could not get record: %w", err)
		}
		log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
	} else {
		log.Debug(ctx, "got record", "record", getOutput)
		swapRecord = getOutput.Cid
	}

	inp := comatproto.RepoPutRecord_Input{
		Collection: "place.stream.broadcast.origin",
		Record:     &lexutil.LexiconTypeDecoder{Val: &origin},
		Rkey:       rkey,
		Repo:       ss.repoDID,
		SwapRecord: swapRecord,
	}
	out := comatproto.RepoPutRecord_Output{}

	err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
	if err != nil {
		return fmt.Errorf("could not create record: %w", err)
	}

	ss.lastOriginTime = time.Now()
	return nil
}
574
575func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
576 rs, err := renditions.GenerateRenditions(spseg)
577 if err != nil {
578 return fmt.Errorf("failed to generated renditions: %w", err)
579 }
580
581 if ss.lp == nil {
582 var err error
583 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
584 if err != nil {
585 return err
586 }
587
588 }
589 spmetrics.TranscodeAttemptsTotal.Inc()
590 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
591 if err != nil {
592 spmetrics.TranscodeErrorsTotal.Inc()
593 return err
594 }
595 if len(rs) != len(segs) {
596 spmetrics.TranscodeErrorsTotal.Inc()
597 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
598 }
599 spmetrics.TranscodeSuccessesTotal.Inc()
600 aqt, err := aqtime.FromString(spseg.StartTime)
601 if err != nil {
602 return err
603 }
604 for i, seg := range segs {
605 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
606 log.Debug(ctx, "publishing segment", "rendition", rs[i])
607 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
608 if err != nil {
609 return fmt.Errorf("failed to create transcoded segment file: %w", err)
610 }
611 defer fd.Close()
612 _, err = fd.Write(seg)
613 if err != nil {
614 return fmt.Errorf("failed to write transcoded segment file: %w", err)
615 }
616 ss.Go(ctx, func() error {
617 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
618 Filepath: fd.Name(),
619 Data: seg,
620 })
621 })
622
623 }
624 return nil
625}
626
627func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
628 ss.Go(ctx, func() error {
629 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
630 })
631 ss.Go(ctx, func() error {
632 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
633 })
634 return nil
635}
636
637func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
638 packet, err := media.Packetize(ctx, seg)
639 if err != nil {
640 return fmt.Errorf("failed to packetize segment: %w", err)
641 }
642 seg.PacketizedData = packet
643 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
644 return nil
645}
646
647func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
648 buf := bytes.Buffer{}
649 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
650 if err != nil {
651 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
652 }
653 // newSeg := &streamplace.Segment{
654 // LexiconTypeID: "place.stream.segment",
655 // Id: spseg.Id,
656 // Creator: spseg.Creator,
657 // StartTime: spseg.StartTime,
658 // Duration: &dur,
659 // Audio: spseg.Audio,
660 // Video: spseg.Video,
661 // SigningKey: spseg.SigningKey,
662 // }
663 aqt, err := aqtime.FromString(spseg.StartTime)
664 if err != nil {
665 return fmt.Errorf("failed to parse segment start time: %w", err)
666 }
667 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
668 rend, err := ss.hls.GetRendition(rendition)
669 if err != nil {
670 return fmt.Errorf("failed to get rendition: %w", err)
671 }
672 if err := rend.NewSegment(&media.Segment{
673 Buf: &buf,
674 Duration: time.Duration(dur),
675 Time: aqt.Time(),
676 }); err != nil {
677 return fmt.Errorf("failed to create new segment: %w", err)
678 }
679
680 return nil
681}
682
// XRPCClient is the minimal XRPC surface this package needs to read and
// write PDS records; satisfied by *xrpc.Client (see GetClientByDID).
type XRPCClient interface {
	Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error
}
686
687func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) {
688 password, ok := ss.cli.DevAccountCreds[did]
689 if ok {
690 repo, err := ss.mod.GetRepoByHandleOrDID(did)
691 if err != nil {
692 return nil, fmt.Errorf("could not get repo by did: %w", err)
693 }
694 if repo == nil {
695 return nil, fmt.Errorf("repo not found for did: %s", did)
696 }
697 anonXRPCC := &xrpc.Client{
698 Host: repo.PDS,
699 Client: &aqhttp.Client,
700 }
701 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{
702 Identifier: repo.DID,
703 Password: password,
704 })
705 if err != nil {
706 return nil, fmt.Errorf("could not create session: %w", err)
707 }
708
709 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
710
711 return &xrpc.Client{
712 Host: repo.PDS,
713 Client: &aqhttp.Client,
714 Auth: &xrpc.AuthInfo{
715 Did: repo.DID,
716 AccessJwt: session.AccessJwt,
717 RefreshJwt: session.RefreshJwt,
718 Handle: repo.Handle,
719 },
720 }, nil
721 }
722 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
723 if err != nil {
724 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err)
725 }
726 if session == nil {
727 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
728 }
729
730 session, err = ss.op.RefreshIfNeeded(session)
731 if err != nil {
732 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err)
733 }
734
735 client, err := ss.op.GetXrpcClient(session)
736 if err != nil {
737 return nil, fmt.Errorf("could not get xrpc client: %w", err)
738 }
739
740 return client, nil
741}
742
// runningMultistream tracks one active outbound RTMP push.
type runningMultistream struct {
	cancel func() // stops the push goroutine
	key    string // "{target.Uri}:{rec.Url}", the map key in HandleMultistreamTargets
	pushID string // random ID used only for log correlation
	url    string // sanitized URL, safe to log
}
749
// sanitizeMultistreamTargetURL strips everything that could carry a stream
// key from a target URL before it is logged: the path, the query string,
// the fragment, and any userinfo. Unparseable input is returned unchanged.
// The original only replaced the path, leaking keys passed via query
// parameters (e.g. ?key=...) or embedded credentials.
func sanitizeMultistreamTargetURL(uri string) string {
	u, err := url.Parse(uri)
	if err != nil {
		return uri
	}
	u.Path = "/redacted"
	u.RawPath = ""
	u.RawQuery = ""
	u.Fragment = ""
	u.User = nil
	return u.String()
}
758
759// we're making an attempt here not to log (sensitive) stream keys, so we're
760// referencing by atproto URI
761func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error {
762 ctx = log.WithLogValues(ctx, "system", "multistreaming")
763 isTrue := true
764 // {target.Uri}:{rec.Url} -> runningMultistream
765 // no concurrency issues, it's only used from this one loop
766 running := map[string]*runningMultistream{}
767 for {
768 targets, err := ss.statefulDB.ListMultistreamTargets(ss.repoDID, 100, 0, &isTrue)
769 if err != nil {
770 return fmt.Errorf("failed to list multistream targets: %w", err)
771 }
772 currentRunning := map[string]bool{}
773 for _, targetView := range targets {
774 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
775 if !ok {
776 log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri)
777 continue
778 }
779 uu, err := uuid.NewV7()
780 if err != nil {
781 return err
782 }
783 ctx := log.WithLogValues(ctx, "url", sanitizeMultistreamTargetURL(rec.Url), "pushID", uu.String())
784 key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url)
785 if running[key] == nil {
786 childCtx, childCancel := context.WithCancel(ctx)
787 ss.Go(ctx, func() error {
788 log.Log(ctx, "starting multistream target", "uri", targetView.Uri)
789 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending")
790 if err != nil {
791 log.Error(ctx, "failed to create multistream event", "error", err)
792 }
793 return ss.StartMultistreamTarget(childCtx, targetView)
794 })
795 running[key] = &runningMultistream{
796 cancel: childCancel,
797 key: key,
798 pushID: uu.String(),
799 url: sanitizeMultistreamTargetURL(rec.Url),
800 }
801 }
802 currentRunning[key] = true
803 }
804 for key := range running {
805 if !currentRunning[key] {
806 log.Log(ctx, "stopping multistream target", "url", sanitizeMultistreamTargetURL(running[key].url), "pushID", running[key].pushID)
807 running[key].cancel()
808 delete(running, key)
809 }
810 }
811 select {
812 case <-ctx.Done():
813 return nil
814 case <-time.After(time.Second * 5):
815 continue
816 }
817 }
818}
819
820func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error {
821 for {
822 err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView)
823 if err != nil {
824 log.Error(ctx, "failed to push to RTMP server", "error", err)
825 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error")
826 if err != nil {
827 log.Error(ctx, "failed to create multistream event", "error", err)
828 }
829 }
830 select {
831 case <-ctx.Done():
832 return nil
833 case <-time.After(time.Second * 5):
834 continue
835 }
836 }
837}