Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/streamplace/oatproxy/pkg/oatproxy"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/aqhttp"
18 "stream.place/streamplace/pkg/aqtime"
19 "stream.place/streamplace/pkg/bus"
20 "stream.place/streamplace/pkg/config"
21 "stream.place/streamplace/pkg/livepeer"
22 "stream.place/streamplace/pkg/log"
23 "stream.place/streamplace/pkg/media"
24 "stream.place/streamplace/pkg/model"
25 "stream.place/streamplace/pkg/renditions"
26 "stream.place/streamplace/pkg/replication"
27 "stream.place/streamplace/pkg/spmetrics"
28 "stream.place/streamplace/pkg/statedb"
29 "stream.place/streamplace/pkg/streamplace"
30 "stream.place/streamplace/pkg/thumbnail"
31)
32
33type StreamSession struct {
34 mm *media.MediaManager
35 mod model.Model
36 cli *config.CLI
37 bus *bus.Bus
38 op *oatproxy.OATProxy
39 hls *media.M3U8
40 lp *livepeer.LivepeerSession
41 repoDID string
42 segmentChan chan struct{}
43 lastStatus time.Time
44 lastStatusCID *string
45 lastStatusLock sync.Mutex
46 lastOriginTime time.Time
47 lastOriginLock sync.Mutex
48 g *errgroup.Group
49 started chan struct{}
50 ctx context.Context
51 packets []bus.PacketizedSegment
52 statefulDB *statedb.StatefulDB
53 replicator replication.Replicator
54}
55
56func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
57 ctx, cancel := context.WithCancel(ctx)
58 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc()
59 ss.g, ctx = errgroup.WithContext(ctx)
60 sid := livepeer.RandomTrailer(8)
61 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID)
62 ss.ctx = ctx
63 log.Log(ctx, "starting stream session")
64 defer cancel()
65 spseg, err := notif.Segment.ToStreamplaceSegment()
66 if err != nil {
67 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
68 }
69 var allRenditions renditions.Renditions
70
71 if ss.cli.LivepeerGatewayURL != "" {
72 allRenditions, err = renditions.GenerateRenditions(spseg)
73 } else {
74 allRenditions = []renditions.Rendition{}
75 }
76 if err != nil {
77 return err
78 }
79 if spseg.Duration == nil {
80 return fmt.Errorf("segment duration is required to calculate bitrate")
81 }
82 dur := time.Duration(*spseg.Duration)
83 byteLen := len(notif.Data)
84 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
85 sourceRendition := renditions.Rendition{
86 Name: "source",
87 Bitrate: bitrate,
88 Width: spseg.Video[0].Width,
89 Height: spseg.Video[0].Height,
90 }
91 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
92 ss.hls = media.NewM3U8(allRenditions)
93
94 // for _, r := range allRenditions {
95 // g.Go(func() error {
96 // for {
97 // if ctx.Err() != nil {
98 // return nil
99 // }
100 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
101 // if ctx.Err() != nil {
102 // return nil
103 // }
104 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
105 // time.Sleep(time.Second * 5)
106 // }
107 // })
108 // }
109
110 close(ss.started)
111
112 ss.Go(ctx, func() error {
113 return ss.HandleMultistreamTargets(ctx)
114 })
115
116 for {
117 select {
118 case <-ss.segmentChan:
119 // reset timer
120 case <-ctx.Done():
121 return ss.g.Wait()
122 // case <-time.After(time.Minute * 1):
123 case <-time.After(ss.cli.StreamSessionTimeout):
124 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout)
125 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec()
126 for _, r := range allRenditions {
127 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
128 }
129 if notif.Local {
130 ss.Go(ctx, func() error {
131 return ss.DeleteStatus(spseg.Creator)
132 })
133 }
134 cancel()
135 }
136 }
137}
138
139// Execute a goroutine in the context of the stream session. Errors are
140// non-fatal; if you actually want to melt the universe on an error you
141// should panic()
142func (ss *StreamSession) Go(ctx context.Context, f func() error) {
143 <-ss.started
144 ss.g.Go(func() error {
145 err := f()
146 if err != nil {
147 log.Error(ctx, "error in stream_session goroutine", "error", err)
148 }
149 return nil
150 })
151}
152
153func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
154 <-ss.started
155 go func() {
156 select {
157 case <-ss.ctx.Done():
158 return
159 case ss.segmentChan <- struct{}{}:
160 }
161 }()
162 aqt := aqtime.FromTime(notif.Segment.StartTime)
163 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
164 notif.Segment.MediaData.Size = len(notif.Data)
165 err := ss.mod.CreateSegment(notif.Segment)
166 if err != nil {
167 return fmt.Errorf("could not add segment to database: %w", err)
168 }
169 spseg, err := notif.Segment.ToStreamplaceSegment()
170 if err != nil {
171 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
172 }
173
174 ss.bus.Publish(spseg.Creator, spseg)
175 ss.Go(ctx, func() error {
176 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
177 Filepath: notif.Segment.ID,
178 Data: notif.Data,
179 })
180 })
181
182 if ss.cli.Thumbnail {
183 ss.Go(ctx, func() error {
184 return ss.Thumbnail(ctx, spseg.Creator, notif)
185 })
186 }
187
188 if notif.Local {
189 ss.Go(ctx, func() error {
190 return ss.UpdateStatus(ctx, spseg.Creator)
191 })
192
193 ss.Go(ctx, func() error {
194 return ss.UpdateBroadcastOrigin(ctx)
195 })
196 }
197
198 if ss.cli.LivepeerGatewayURL != "" {
199 ss.Go(ctx, func() error {
200 start := time.Now()
201 err := ss.Transcode(ctx, spseg, notif.Data)
202 took := time.Since(start)
203 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
204 return err
205 })
206 }
207
208 // trigger a notification blast if this is a new livestream
209 if notif.Metadata.Livestream != nil {
210 ss.Go(ctx, func() error {
211 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator)
212 if err != nil {
213 return fmt.Errorf("failed to get repo: %w", err)
214 }
215 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator)
216 if err != nil {
217 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
218 }
219 if livestreamModel == nil {
220 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator)
221 return nil
222 }
223 lsv, err := livestreamModel.ToLivestreamView()
224 if err != nil {
225 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
226 }
227 if !shouldNotify(lsv) {
228 log.Debug(ctx, "is not set to notify", "repoDID", spseg.Creator)
229 return nil
230 }
231 task := &statedb.NotificationTask{
232 Livestream: lsv,
233 PDSURL: r.PDS,
234 }
235 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator)
236 if err != nil {
237 return fmt.Errorf("failed to get chat profile: %w", err)
238 }
239 if cp != nil {
240 spcp, err := cp.ToStreamplaceChatProfile()
241 if err != nil {
242 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
243 }
244 task.ChatProfile = spcp
245 }
246
247 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri)))
248 if err != nil {
249 log.Error(ctx, "failed to enqueue notification task", "err", err)
250 }
251 return ss.UpdateStatus(ctx, spseg.Creator)
252 })
253 } else {
254 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator)
255 }
256
257 return nil
258}
259
260func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool {
261 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
262 if !ok {
263 return true
264 }
265 if lsvr.NotificationSettings == nil {
266 return true
267 }
268 settings := lsvr.NotificationSettings
269 if settings.PushNotification == nil {
270 return true
271 }
272 return *settings.PushNotification
273}
274
275func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
276 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
277 locked := lock.TryLock()
278 if !locked {
279 // we're already generating a thumbnail for this user, skip
280 return nil
281 }
282 defer lock.Unlock()
283 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
284 if err != nil {
285 return err
286 }
287 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
288 // we have a thumbnail <60sec old, skip generating a new one
289 return nil
290 }
291 r := bytes.NewReader(not.Data)
292 aqt := aqtime.FromTime(not.Segment.StartTime)
293 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
294 if err != nil {
295 return err
296 }
297 defer fd.Close()
298 err = media.Thumbnail(ctx, r, fd, "jpeg")
299 if err != nil {
300 return err
301 }
302 thumb := &model.Thumbnail{
303 Format: "jpeg",
304 SegmentID: not.Segment.ID,
305 }
306 err = ss.mod.CreateThumbnail(thumb)
307 if err != nil {
308 return err
309 }
310 return nil
311}
312
313func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
314 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
315 ss.lastStatusLock.Lock()
316 defer ss.lastStatusLock.Unlock()
317 if time.Since(ss.lastStatus) < time.Minute {
318 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
319 return nil
320 }
321
322 client, err := ss.GetClientByDID(repoDID)
323 if err != nil {
324 return fmt.Errorf("could not get xrpc client: %w", err)
325 }
326
327 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
328 if err != nil {
329 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
330 }
331 lsv, err := ls.ToLivestreamView()
332 if err != nil {
333 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
334 }
335
336 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
337 if !ok {
338 return fmt.Errorf("livestream is not a streamplace livestream")
339 }
340 thumb := lsvr.Thumb
341
342 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
343 if err != nil {
344 return fmt.Errorf("could not get repo for repoDID: %w", err)
345 }
346
347 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
348 if !ok {
349 return fmt.Errorf("livestream is not a streamplace livestream")
350 }
351
352 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
353
354 if lsr.CanonicalUrl != nil {
355 canonicalUrl = *lsr.CanonicalUrl
356 }
357
358 actorStatusEmbed := bsky.ActorStatus_Embed{
359 EmbedExternal: &bsky.EmbedExternal{
360 External: &bsky.EmbedExternal_External{
361 Title: lsr.Title,
362 Uri: canonicalUrl,
363 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
364 Thumb: thumb,
365 },
366 },
367 }
368
369 duration := int64(120)
370 status := bsky.ActorStatus{
371 Status: "app.bsky.actor.status#live",
372 DurationMinutes: &duration,
373 Embed: &actorStatusEmbed,
374 CreatedAt: time.Now().Format(time.RFC3339),
375 }
376
377 var swapRecord *string
378 getOutput := comatproto.RepoGetRecord_Output{}
379 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
380 "repo": repoDID,
381 "collection": "app.bsky.actor.status",
382 "rkey": "self",
383 }, nil, &getOutput)
384 if err != nil {
385 xErr, ok := err.(*xrpc.Error)
386 if !ok {
387 return fmt.Errorf("could not get record: %w", err)
388 }
389 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
390 return fmt.Errorf("could not get record: %w", err)
391 }
392 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
393 } else {
394 log.Debug(ctx, "got record", "record", getOutput)
395 swapRecord = getOutput.Cid
396 }
397
398 inp := comatproto.RepoPutRecord_Input{
399 Collection: "app.bsky.actor.status",
400 Record: &lexutil.LexiconTypeDecoder{Val: &status},
401 Rkey: "self",
402 Repo: repoDID,
403 SwapRecord: swapRecord,
404 }
405 out := comatproto.RepoPutRecord_Output{}
406
407 ss.lastStatusCID = &out.Cid
408
409 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
410 if err != nil {
411 return fmt.Errorf("could not create record: %w", err)
412 }
413 log.Debug(ctx, "created status record", "out", out)
414
415 ss.lastStatus = time.Now()
416
417 return nil
418}
419
420func (ss *StreamSession) DeleteStatus(repoDID string) error {
421 // need a special extra context because the stream session context is already cancelled
422 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
423 ss.lastStatusLock.Lock()
424 defer ss.lastStatusLock.Unlock()
425 if ss.lastStatusCID == nil {
426 log.Debug(ctx, "no status cid to delete")
427 return nil
428 }
429 inp := comatproto.RepoDeleteRecord_Input{
430 Collection: "app.bsky.actor.status",
431 Rkey: "self",
432 Repo: repoDID,
433 }
434 inp.SwapRecord = ss.lastStatusCID
435 out := comatproto.RepoDeleteRecord_Output{}
436
437 client, err := ss.GetClientByDID(repoDID)
438 if err != nil {
439 return fmt.Errorf("could not get xrpc client: %w", err)
440 }
441
442 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
443 if err != nil {
444 return fmt.Errorf("could not delete record: %w", err)
445 }
446
447 ss.lastStatusCID = nil
448 return nil
449}
450
// originUpdateInterval is the minimum time UpdateBroadcastOrigin lets pass
// between two writes of the broadcast origin record.
var originUpdateInterval = 30 * time.Second
452
453func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
454 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
455 ss.lastOriginLock.Lock()
456 defer ss.lastOriginLock.Unlock()
457 if time.Since(ss.lastOriginTime) < originUpdateInterval {
458 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
459 return nil
460 }
461 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
462 origin := streamplace.BroadcastOrigin{
463 Streamer: ss.repoDID,
464 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost),
465 Broadcaster: &broadcaster,
466 UpdatedAt: time.Now().UTC().Format(util.ISO8601),
467 }
468 err := ss.replicator.BuildOriginRecord(&origin)
469 if err != nil {
470 return fmt.Errorf("could not build origin record: %w", err)
471 }
472
473 client, err := ss.GetClientByDID(ss.repoDID)
474 if err != nil {
475 return fmt.Errorf("could not get xrpc client for repoDID: %w", err)
476 }
477
478 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost)
479
480 var swapRecord *string
481 getOutput := comatproto.RepoGetRecord_Output{}
482 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
483 "repo": ss.repoDID,
484 "collection": "place.stream.broadcast.origin",
485 "rkey": rkey,
486 }, nil, &getOutput)
487 if err != nil {
488 xErr, ok := err.(*xrpc.Error)
489 if !ok {
490 return fmt.Errorf("could not get record: %w", err)
491 }
492 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
493 return fmt.Errorf("could not get record: %w", err)
494 }
495 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
496 } else {
497 log.Debug(ctx, "got record", "record", getOutput)
498 swapRecord = getOutput.Cid
499 }
500
501 inp := comatproto.RepoPutRecord_Input{
502 Collection: "place.stream.broadcast.origin",
503 Record: &lexutil.LexiconTypeDecoder{Val: &origin},
504 Rkey: rkey,
505 Repo: ss.repoDID,
506 SwapRecord: swapRecord,
507 }
508 out := comatproto.RepoPutRecord_Output{}
509
510 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
511 if err != nil {
512 return fmt.Errorf("could not create record: %w", err)
513 }
514
515 ss.lastOriginTime = time.Now()
516 return nil
517}
518
519func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
520 rs, err := renditions.GenerateRenditions(spseg)
521 if err != nil {
522 return fmt.Errorf("failed to generated renditions: %w", err)
523 }
524
525 if ss.lp == nil {
526 var err error
527 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
528 if err != nil {
529 return err
530 }
531
532 }
533 spmetrics.TranscodeAttemptsTotal.Inc()
534 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
535 if err != nil {
536 spmetrics.TranscodeErrorsTotal.Inc()
537 return err
538 }
539 if len(rs) != len(segs) {
540 spmetrics.TranscodeErrorsTotal.Inc()
541 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
542 }
543 spmetrics.TranscodeSuccessesTotal.Inc()
544 aqt, err := aqtime.FromString(spseg.StartTime)
545 if err != nil {
546 return err
547 }
548 for i, seg := range segs {
549 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
550 log.Debug(ctx, "publishing segment", "rendition", rs[i])
551 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
552 if err != nil {
553 return fmt.Errorf("failed to create transcoded segment file: %w", err)
554 }
555 defer fd.Close()
556 _, err = fd.Write(seg)
557 if err != nil {
558 return fmt.Errorf("failed to write transcoded segment file: %w", err)
559 }
560 ss.Go(ctx, func() error {
561 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
562 Filepath: fd.Name(),
563 Data: seg,
564 })
565 })
566
567 }
568 return nil
569}
570
571func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
572 ss.Go(ctx, func() error {
573 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
574 })
575 ss.Go(ctx, func() error {
576 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
577 })
578 return nil
579}
580
581func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
582 packet, err := media.Packetize(ctx, seg)
583 if err != nil {
584 return fmt.Errorf("failed to packetize segment: %w", err)
585 }
586 seg.PacketizedData = packet
587 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
588 return nil
589}
590
591func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
592 buf := bytes.Buffer{}
593 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
594 if err != nil {
595 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
596 }
597 // newSeg := &streamplace.Segment{
598 // LexiconTypeID: "place.stream.segment",
599 // Id: spseg.Id,
600 // Creator: spseg.Creator,
601 // StartTime: spseg.StartTime,
602 // Duration: &dur,
603 // Audio: spseg.Audio,
604 // Video: spseg.Video,
605 // SigningKey: spseg.SigningKey,
606 // }
607 aqt, err := aqtime.FromString(spseg.StartTime)
608 if err != nil {
609 return fmt.Errorf("failed to parse segment start time: %w", err)
610 }
611 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
612 rend, err := ss.hls.GetRendition(rendition)
613 if err != nil {
614 return fmt.Errorf("failed to get rendition: %w", err)
615 }
616 if err := rend.NewSegment(&media.Segment{
617 Buf: &buf,
618 Duration: time.Duration(dur),
619 Time: aqt.Time(),
620 }); err != nil {
621 return fmt.Errorf("failed to create new segment: %w", err)
622 }
623
624 return nil
625}
626
// XRPCClient is the one XRPC operation the stream session needs. Both
// client kinds returned by GetClientByDID (the password-authenticated
// *xrpc.Client and the OAuth client from the proxy) are used through it.
type XRPCClient interface {
	// Do performs a single XRPC call: method is passed xrpc.Query or
	// xrpc.Procedure by the callers in this file, path is the NSID of the
	// endpoint, and out receives the decoded response.
	Do(ctx context.Context, method, contentType, path string, queryParams map[string]any, body, out any) error
}
630
631func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) {
632 password, ok := ss.cli.DevAccountCreds[did]
633 if ok {
634 repo, err := ss.mod.GetRepoByHandleOrDID(did)
635 if err != nil {
636 return nil, fmt.Errorf("could not get repo by did: %w", err)
637 }
638 if repo == nil {
639 return nil, fmt.Errorf("repo not found for did: %s", did)
640 }
641 anonXRPCC := &xrpc.Client{
642 Host: repo.PDS,
643 Client: &aqhttp.Client,
644 }
645 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{
646 Identifier: repo.DID,
647 Password: password,
648 })
649 if err != nil {
650 return nil, fmt.Errorf("could not create session: %w", err)
651 }
652
653 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
654
655 return &xrpc.Client{
656 Host: repo.PDS,
657 Client: &aqhttp.Client,
658 Auth: &xrpc.AuthInfo{
659 Did: repo.DID,
660 AccessJwt: session.AccessJwt,
661 RefreshJwt: session.RefreshJwt,
662 Handle: repo.Handle,
663 },
664 }, nil
665 }
666 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
667 if err != nil {
668 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err)
669 }
670 if session == nil {
671 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
672 }
673
674 session, err = ss.op.RefreshIfNeeded(session)
675 if err != nil {
676 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err)
677 }
678
679 client, err := ss.op.GetXrpcClient(session)
680 if err != nil {
681 return nil, fmt.Errorf("could not get xrpc client: %w", err)
682 }
683
684 return client, nil
685}
686
// runningMultistream is the bookkeeping entry HandleMultistreamTargets
// keeps for every multistream push it has started.
type runningMultistream struct {
	// cancel stops the push by cancelling its child context.
	cancel func()
	// uri is the value logged when the push is stopped.
	uri string
}
691
692// we're making an attempt here not to log (sensitive) stream keys, so we're
693// referencing by atproto URI
694func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error {
695 ctx = log.WithLogValues(ctx, "system", "multistreaming")
696 isTrue := true
697 // {target.Uri}:{rec.Url} -> runningMultistream
698 // no concurrency issues, it's only used from this one loop
699 running := map[string]*runningMultistream{}
700 for {
701 targets, err := ss.statefulDB.ListMultistreamTargets(ss.repoDID, 100, 0, &isTrue)
702 if err != nil {
703 return fmt.Errorf("failed to list multistream targets: %w", err)
704 }
705 currentRunning := map[string]bool{}
706 for _, targetView := range targets {
707 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
708 if !ok {
709 log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri)
710 continue
711 }
712 key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url)
713 if running[key] == nil {
714 childCtx, childCancel := context.WithCancel(ctx)
715 ss.Go(ctx, func() error {
716 log.Log(ctx, "starting multistream target", "uri", targetView.Uri)
717 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending")
718 if err != nil {
719 log.Error(ctx, "failed to create multistream event", "error", err)
720 }
721 return ss.StartMultistreamTarget(childCtx, targetView)
722 })
723 running[key] = &runningMultistream{
724 cancel: childCancel,
725 uri: key,
726 }
727 }
728 currentRunning[key] = true
729 }
730 for key := range running {
731 if !currentRunning[key] {
732 log.Log(ctx, "stopping multistream target", "uri", running[key].uri)
733 running[key].cancel()
734 delete(running, key)
735 }
736 }
737 select {
738 case <-ctx.Done():
739 return nil
740 case <-time.After(time.Second * 5):
741 continue
742 }
743 }
744}
745
746func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error {
747 for {
748 err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView)
749 if err != nil {
750 log.Error(ctx, "failed to push to RTMP server", "error", err)
751 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error")
752 if err != nil {
753 log.Error(ctx, "failed to create multistream event", "error", err)
754 }
755 }
756 select {
757 case <-ctx.Done():
758 return nil
759 case <-time.After(time.Second * 5):
760 continue
761 }
762 }
763}