Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/docs-separation 837 lines 25 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/url" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/google/uuid" 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "golang.org/x/sync/errgroup" 18 "stream.place/streamplace/pkg/aqhttp" 19 "stream.place/streamplace/pkg/aqtime" 20 "stream.place/streamplace/pkg/bus" 21 "stream.place/streamplace/pkg/config" 22 "stream.place/streamplace/pkg/livepeer" 23 "stream.place/streamplace/pkg/log" 24 "stream.place/streamplace/pkg/media" 25 "stream.place/streamplace/pkg/model" 26 "stream.place/streamplace/pkg/renditions" 27 "stream.place/streamplace/pkg/replication" 28 "stream.place/streamplace/pkg/spmetrics" 29 "stream.place/streamplace/pkg/statedb" 30 "stream.place/streamplace/pkg/streamplace" 31 "stream.place/streamplace/pkg/thumbnail" 32) 33 34type StreamSession struct { 35 mm *media.MediaManager 36 mod model.Model 37 cli *config.CLI 38 bus *bus.Bus 39 op *oatproxy.OATProxy 40 hls *media.M3U8 41 lp *livepeer.LivepeerSession 42 repoDID string 43 segmentChan chan struct{} 44 lastStatus time.Time 45 lastStatusCID *string 46 lastOriginTime time.Time 47 48 // Channels for background workers 49 statusUpdateChan chan struct{} // Signal to update status 50 originUpdateChan chan struct{} // Signal to update broadcast origin 51 52 g *errgroup.Group 53 started chan struct{} 54 ctx context.Context 55 packets []bus.PacketizedSegment 56 statefulDB *statedb.StatefulDB 57 replicator replication.Replicator 58} 59 60func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 61 ctx, cancel := context.WithCancel(ctx) 62 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc() 63 ss.g, ctx = errgroup.WithContext(ctx) 64 sid := livepeer.RandomTrailer(8) 65 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID) 66 ss.ctx = ctx 67 log.Log(ctx, "starting stream session") 68 defer cancel() 69 spseg, err := notif.Segment.ToStreamplaceSegment() 70 if err != nil { 71 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 72 } 73 var allRenditions renditions.Renditions 74 75 if ss.cli.LivepeerGatewayURL != "" { 76 allRenditions, err = renditions.GenerateRenditions(spseg) 77 } else { 78 allRenditions = []renditions.Rendition{} 79 } 80 if err != nil { 81 return err 82 } 83 if spseg.Duration == nil { 84 return fmt.Errorf("segment duration is required to calculate bitrate") 85 } 86 dur := time.Duration(*spseg.Duration) 87 byteLen := len(notif.Data) 88 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 89 sourceRendition := renditions.Rendition{ 90 Name: "source", 91 Bitrate: bitrate, 92 Width: spseg.Video[0].Width, 93 Height: spseg.Video[0].Height, 94 } 95 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 96 ss.hls = media.NewM3U8(allRenditions) 97 98 // for _, r := range allRenditions { 99 // g.Go(func() error { 100 // for { 101 // if ctx.Err() != nil { 102 // return nil 103 // } 104 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 105 // if ctx.Err() != nil { 106 // return nil 107 // } 108 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 109 // time.Sleep(time.Second * 5) 110 // } 111 // }) 112 // } 113 114 close(ss.started) 115 116 // Start background workers for status and origin updates 117 ss.g.Go(func() error { 118 return ss.statusUpdateLoop(ctx, spseg.Creator) 119 }) 120 ss.g.Go(func() error { 121 return ss.originUpdateLoop(ctx) 122 }) 123 124 ss.Go(ctx, func() error { 125 return ss.HandleMultistreamTargets(ctx) 126 }) 127 128 for { 129 select { 130 case <-ss.segmentChan: 131 // reset timer 132 case <-ctx.Done(): 133 // Signal all background workers to stop 134 return ss.g.Wait() 135 // case <-time.After(time.Minute * 1): 136 case <-time.After(ss.cli.StreamSessionTimeout): 137 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout) 138 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec() 139 for _, r := range allRenditions { 140 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 141 } 142 // Signal background workers to stop 143 if notif.Local { 144 ss.Go(ctx, func() error { 145 return ss.DeleteStatus(spseg.Creator) 146 }) 147 } 148 cancel() 149 } 150 } 151} 152 153// Execute a goroutine in the context of the stream session. Errors are 154// non-fatal; if you actually want to melt the universe on an error you 155// should panic() 156func (ss *StreamSession) Go(ctx context.Context, f func() error) { 157 <-ss.started 158 ss.g.Go(func() error { 159 err := f() 160 if err != nil { 161 log.Error(ctx, "error in stream_session goroutine", "error", err) 162 } 163 return nil 164 }) 165} 166 167func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 168 <-ss.started 169 go func() { 170 select { 171 case <-ss.ctx.Done(): 172 return 173 case ss.segmentChan <- struct{}{}: 174 } 175 }() 176 aqt := aqtime.FromTime(notif.Segment.StartTime) 177 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 178 notif.Segment.MediaData.Size = len(notif.Data) 179 err := ss.mod.CreateSegment(notif.Segment) 180 if err != nil { 181 return fmt.Errorf("could not add segment to database: %w", err) 182 } 183 spseg, err := notif.Segment.ToStreamplaceSegment() 184 if err != nil { 185 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 186 } 187 188 ss.bus.Publish(spseg.Creator, spseg) 189 ss.Go(ctx, func() error { 190 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 191 Filepath: notif.Segment.ID, 192 Data: notif.Data, 193 }) 194 }) 195 196 if ss.cli.Thumbnail { 197 ss.Go(ctx, func() error { 198 return ss.Thumbnail(ctx, spseg.Creator, notif) 199 }) 200 } 201 202 if notif.Local { 203 ss.UpdateStatus(ctx, spseg.Creator) 204 ss.UpdateBroadcastOrigin(ctx) 205 } 206 207 if ss.cli.LivepeerGatewayURL != "" { 208 ss.Go(ctx, func() error { 209 start := time.Now() 210 err := ss.Transcode(ctx, spseg, notif.Data) 211 took := time.Since(start) 212 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 213 return err 214 }) 215 } 216 217 // trigger a notification blast if this is a new livestream 218 if notif.Metadata.Livestream != nil { 219 ss.Go(ctx, func() error { 220 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator) 221 if err != nil { 222 return fmt.Errorf("failed to get repo: %w", err) 223 } 224 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator) 225 if err != nil { 226 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 227 } 228 if livestreamModel == nil { 229 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator) 230 return nil 231 } 232 lsv, err := livestreamModel.ToLivestreamView() 233 if err != nil { 234 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 235 } 236 if !shouldNotify(lsv) { 237 log.Debug(ctx, "is not set to notify", "repoDID", spseg.Creator) 238 return nil 239 } 240 task := &statedb.NotificationTask{ 241 Livestream: lsv, 242 PDSURL: r.PDS, 243 } 244 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator) 245 if err != nil { 246 return fmt.Errorf("failed to get chat profile: %w", err) 247 } 248 if cp != nil { 249 spcp, err := cp.ToStreamplaceChatProfile() 250 if err != nil { 251 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 252 } 253 task.ChatProfile = spcp 254 } 255 256 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri))) 257 if err != nil { 258 log.Error(ctx, "failed to enqueue notification task", "err", err) 259 } 260 ss.UpdateStatus(ctx, spseg.Creator) 261 return nil 262 }) 263 } else { 264 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator) 265 } 266 267 return nil 268} 269 270func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool { 271 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 272 if !ok { 273 return true 274 } 275 if lsvr.NotificationSettings == nil { 276 return true 277 } 278 settings := lsvr.NotificationSettings 279 if settings.PushNotification == nil { 280 return true 281 } 282 return *settings.PushNotification 283} 284 285func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 286 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 287 locked := lock.TryLock() 288 if !locked { 289 // we're already generating a thumbnail for this user, skip 290 return nil 291 } 292 defer lock.Unlock() 293 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 294 if err != nil { 295 return err 296 } 297 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 298 // we have a thumbnail <60sec old, skip generating a new one 299 return nil 300 } 301 r := bytes.NewReader(not.Data) 302 aqt := aqtime.FromTime(not.Segment.StartTime) 303 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 304 if err != nil { 305 return err 306 } 307 defer fd.Close() 308 err = media.Thumbnail(ctx, r, fd, "jpeg") 309 if err != nil { 310 return err 311 } 312 thumb := &model.Thumbnail{ 313 Format: "jpeg", 314 SegmentID: not.Segment.ID, 315 } 316 err = ss.mod.CreateThumbnail(thumb) 317 if err != nil { 318 return err 319 } 320 return nil 321} 322 323// UpdateStatus signals the background worker to update status (non-blocking) 324func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) { 325 select { 326 case ss.statusUpdateChan <- struct{}{}: 327 default: 328 // Channel full, signal already pending 329 } 330} 331 332// statusUpdateLoop runs as a background goroutine for the session lifetime 333func (ss *StreamSession) statusUpdateLoop(ctx context.Context, repoDID string) error { 334 ctx = log.WithLogValues(ctx, "func", "statusUpdateLoop") 335 for { 336 select { 337 case <-ctx.Done(): 338 return nil 339 case <-ss.statusUpdateChan: 340 if time.Since(ss.lastStatus) < time.Minute { 341 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 342 continue 343 } 344 if err := ss.doUpdateStatus(ctx, repoDID); err != nil { 345 log.Error(ctx, "failed to update status", "error", err) 346 } 347 } 348 } 349} 350 351// doUpdateStatus performs the actual status update work 352func (ss *StreamSession) doUpdateStatus(ctx context.Context, repoDID string) error { 353 ctx = log.WithLogValues(ctx, "func", "doUpdateStatus") 354 355 client, err := ss.GetClientByDID(repoDID) 356 if err != nil { 357 return fmt.Errorf("could not get xrpc client: %w", err) 358 } 359 360 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 361 if err != nil { 362 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 363 } 364 lsv, err := ls.ToLivestreamView() 365 if err != nil { 366 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 367 } 368 369 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 370 if !ok { 371 return fmt.Errorf("livestream is not a streamplace livestream") 372 } 373 thumb := lsvr.Thumb 374 375 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 376 if err != nil { 377 return fmt.Errorf("could not get repo for repoDID: %w", err) 378 } 379 380 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 381 if !ok { 382 return fmt.Errorf("livestream is not a streamplace livestream") 383 } 384 385 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 386 387 if lsr.CanonicalUrl != nil { 388 canonicalUrl = *lsr.CanonicalUrl 389 } 390 391 actorStatusEmbed := bsky.ActorStatus_Embed{ 392 EmbedExternal: &bsky.EmbedExternal{ 393 External: &bsky.EmbedExternal_External{ 394 Title: lsr.Title, 395 Uri: canonicalUrl, 396 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 397 Thumb: thumb, 398 }, 399 }, 400 } 401 402 duration := int64(10) 403 status := bsky.ActorStatus{ 404 Status: "app.bsky.actor.status#live", 405 DurationMinutes: &duration, 406 Embed: &actorStatusEmbed, 407 CreatedAt: time.Now().Format(time.RFC3339), 408 } 409 410 var swapRecord *string 411 getOutput := comatproto.RepoGetRecord_Output{} 412 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 413 "repo": repoDID, 414 "collection": "app.bsky.actor.status", 415 "rkey": "self", 416 }, nil, &getOutput) 417 if err != nil { 418 xErr, ok := err.(*xrpc.Error) 419 if !ok { 420 return fmt.Errorf("could not get record: %w", err) 421 } 422 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 423 return fmt.Errorf("could not get record: %w", err) 424 } 425 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 426 } else { 427 log.Debug(ctx, "got record", "record", getOutput) 428 swapRecord = getOutput.Cid 429 } 430 431 inp := comatproto.RepoPutRecord_Input{ 432 Collection: "app.bsky.actor.status", 433 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 434 Rkey: "self", 435 Repo: repoDID, 436 SwapRecord: swapRecord, 437 } 438 out := comatproto.RepoPutRecord_Output{} 439 440 ss.lastStatusCID = &out.Cid 441 442 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 443 if err != nil { 444 return fmt.Errorf("could not create record: %w", err) 445 } 446 log.Debug(ctx, "created status record", "out", out) 447 448 ss.lastStatus = time.Now() 449 450 return nil 451} 452 453func (ss *StreamSession) DeleteStatus(repoDID string) error { 454 // need a special extra context because the stream session context is already cancelled 455 // No lock needed - this runs during teardown after the background worker has exited 456 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 457 if ss.lastStatusCID == nil { 458 log.Debug(ctx, "no status cid to delete") 459 return nil 460 } 461 inp := comatproto.RepoDeleteRecord_Input{ 462 Collection: "app.bsky.actor.status", 463 Rkey: "self", 464 Repo: repoDID, 465 } 466 inp.SwapRecord = ss.lastStatusCID 467 out := comatproto.RepoDeleteRecord_Output{} 468 469 client, err := ss.GetClientByDID(repoDID) 470 if err != nil { 471 return fmt.Errorf("could not get xrpc client: %w", err) 472 } 473 474 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 475 if err != nil { 476 return fmt.Errorf("could not delete record: %w", err) 477 } 478 479 ss.lastStatusCID = nil 480 return nil 481} 482 483var originUpdateInterval = time.Second * 30 484 485// UpdateBroadcastOrigin signals the background worker to update origin (non-blocking) 486func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) { 487 select { 488 case ss.originUpdateChan <- struct{}{}: 489 default: 490 // Channel full, signal already pending 491 } 492} 493 494// originUpdateLoop runs as a background goroutine for the session lifetime 495func (ss *StreamSession) originUpdateLoop(ctx context.Context) error { 496 ctx = log.WithLogValues(ctx, "func", "originUpdateLoop") 497 for { 498 select { 499 case <-ctx.Done(): 500 return nil 501 case <-ss.originUpdateChan: 502 if time.Since(ss.lastOriginTime) < originUpdateInterval { 503 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 504 continue 505 } 506 if err := ss.doUpdateBroadcastOrigin(ctx); err != nil { 507 log.Error(ctx, "failed to update broadcast origin", "error", err) 508 } 509 } 510 } 511} 512 513// doUpdateBroadcastOrigin performs the actual broadcast origin update work 514func (ss *StreamSession) doUpdateBroadcastOrigin(ctx context.Context) error { 515 ctx = log.WithLogValues(ctx, "func", "doUpdateBroadcastOrigin") 516 517 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost) 518 origin := streamplace.BroadcastOrigin{ 519 Streamer: ss.repoDID, 520 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 521 Broadcaster: &broadcaster, 522 UpdatedAt: time.Now().UTC().Format(util.ISO8601), 523 } 524 err := ss.replicator.BuildOriginRecord(&origin) 525 if err != nil { 526 return fmt.Errorf("could not build origin record: %w", err) 527 } 528 529 client, err := ss.GetClientByDID(ss.repoDID) 530 if err != nil { 531 return fmt.Errorf("could not get xrpc client for repoDID: %w", err) 532 } 533 534 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 535 536 var swapRecord *string 537 getOutput := comatproto.RepoGetRecord_Output{} 538 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 539 "repo": ss.repoDID, 540 "collection": "place.stream.broadcast.origin", 541 "rkey": rkey, 542 }, nil, &getOutput) 543 if err != nil { 544 xErr, ok := err.(*xrpc.Error) 545 if !ok { 546 return fmt.Errorf("could not get record: %w", err) 547 } 548 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 549 return fmt.Errorf("could not get record: %w", err) 550 } 551 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 552 } else { 553 log.Debug(ctx, "got record", "record", getOutput) 554 swapRecord = getOutput.Cid 555 } 556 557 inp := comatproto.RepoPutRecord_Input{ 558 Collection: "place.stream.broadcast.origin", 559 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 560 Rkey: rkey, 561 Repo: ss.repoDID, 562 SwapRecord: swapRecord, 563 } 564 out := comatproto.RepoPutRecord_Output{} 565 566 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 567 if err != nil { 568 return fmt.Errorf("could not create record: %w", err) 569 } 570 571 ss.lastOriginTime = time.Now() 572 return nil 573} 574 575func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 576 rs, err := renditions.GenerateRenditions(spseg) 577 if err != nil { 578 return fmt.Errorf("failed to generated renditions: %w", err) 579 } 580 581 if ss.lp == nil { 582 var err error 583 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 584 if err != nil { 585 return err 586 } 587 588 } 589 spmetrics.TranscodeAttemptsTotal.Inc() 590 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 591 if err != nil { 592 spmetrics.TranscodeErrorsTotal.Inc() 593 return err 594 } 595 if len(rs) != len(segs) { 596 spmetrics.TranscodeErrorsTotal.Inc() 597 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 598 } 599 spmetrics.TranscodeSuccessesTotal.Inc() 600 aqt, err := aqtime.FromString(spseg.StartTime) 601 if err != nil { 602 return err 603 } 604 for i, seg := range segs { 605 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 606 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 607 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 608 if err != nil { 609 return fmt.Errorf("failed to create transcoded segment file: %w", err) 610 } 611 defer fd.Close() 612 _, err = fd.Write(seg) 613 if err != nil { 614 return fmt.Errorf("failed to write transcoded segment file: %w", err) 615 } 616 ss.Go(ctx, func() error { 617 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 618 Filepath: fd.Name(), 619 Data: seg, 620 }) 621 }) 622 623 } 624 return nil 625} 626 627func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 628 ss.Go(ctx, func() error { 629 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 630 }) 631 ss.Go(ctx, func() error { 632 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 633 }) 634 return nil 635} 636 637func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 638 packet, err := media.Packetize(ctx, seg) 639 if err != nil { 640 return fmt.Errorf("failed to packetize segment: %w", err) 641 } 642 seg.PacketizedData = packet 643 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 644 return nil 645} 646 647func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 648 buf := bytes.Buffer{} 649 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 650 if err != nil { 651 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 652 } 653 // newSeg := &streamplace.Segment{ 654 // LexiconTypeID: "place.stream.segment", 655 // Id: spseg.Id, 656 // Creator: spseg.Creator, 657 // StartTime: spseg.StartTime, 658 // Duration: &dur, 659 // Audio: spseg.Audio, 660 // Video: spseg.Video, 661 // SigningKey: spseg.SigningKey, 662 // } 663 aqt, err := aqtime.FromString(spseg.StartTime) 664 if err != nil { 665 return fmt.Errorf("failed to parse segment start time: %w", err) 666 } 667 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 668 rend, err := ss.hls.GetRendition(rendition) 669 if err != nil { 670 return fmt.Errorf("failed to get rendition: %w", err) 671 } 672 if err := rend.NewSegment(&media.Segment{ 673 Buf: &buf, 674 Duration: time.Duration(dur), 675 Time: aqt.Time(), 676 }); err != nil { 677 return fmt.Errorf("failed to create new segment: %w", err) 678 } 679 680 return nil 681} 682 683type XRPCClient interface { 684 Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error 685} 686 687func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) { 688 password, ok := ss.cli.DevAccountCreds[did] 689 if ok { 690 repo, err := ss.mod.GetRepoByHandleOrDID(did) 691 if err != nil { 692 return nil, fmt.Errorf("could not get repo by did: %w", err) 693 } 694 if repo == nil { 695 return nil, fmt.Errorf("repo not found for did: %s", did) 696 } 697 anonXRPCC := &xrpc.Client{ 698 Host: repo.PDS, 699 Client: &aqhttp.Client, 700 } 701 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{ 702 Identifier: repo.DID, 703 Password: password, 704 }) 705 if err != nil { 706 return nil, fmt.Errorf("could not create session: %w", err) 707 } 708 709 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 710 711 return &xrpc.Client{ 712 Host: repo.PDS, 713 Client: &aqhttp.Client, 714 Auth: &xrpc.AuthInfo{ 715 Did: repo.DID, 716 AccessJwt: session.AccessJwt, 717 RefreshJwt: session.RefreshJwt, 718 Handle: repo.Handle, 719 }, 720 }, nil 721 } 722 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 723 if err != nil { 724 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err) 725 } 726 if session == nil { 727 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 728 } 729 730 session, err = ss.op.RefreshIfNeeded(session) 731 if err != nil { 732 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err) 733 } 734 735 client, err := ss.op.GetXrpcClient(session) 736 if err != nil { 737 return nil, fmt.Errorf("could not get xrpc client: %w", err) 738 } 739 740 return client, nil 741} 742 743type runningMultistream struct { 744 cancel func() 745 key string 746 pushID string 747 url string 748} 749 750func sanitizeMultistreamTargetURL(uri string) string { 751 u, err := url.Parse(uri) 752 if err != nil { 753 return uri 754 } 755 u.Path = "/redacted" 756 return u.String() 757} 758 759// we're making an attempt here not to log (sensitive) stream keys, so we're 760// referencing by atproto URI 761func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error { 762 ctx = log.WithLogValues(ctx, "system", "multistreaming") 763 isTrue := true 764 // {target.Uri}:{rec.Url} -> runningMultistream 765 // no concurrency issues, it's only used from this one loop 766 running := map[string]*runningMultistream{} 767 for { 768 targets, err := ss.statefulDB.ListMultistreamTargets(ss.repoDID, 100, 0, &isTrue) 769 if err != nil { 770 return fmt.Errorf("failed to list multistream targets: %w", err) 771 } 772 currentRunning := map[string]bool{} 773 for _, targetView := range targets { 774 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget) 775 if !ok { 776 log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri) 777 continue 778 } 779 uu, err := uuid.NewV7() 780 if err != nil { 781 return err 782 } 783 ctx := log.WithLogValues(ctx, "url", sanitizeMultistreamTargetURL(rec.Url), "pushID", uu.String()) 784 key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url) 785 if running[key] == nil { 786 childCtx, childCancel := context.WithCancel(ctx) 787 ss.Go(ctx, func() error { 788 log.Log(ctx, "starting multistream target", "uri", targetView.Uri) 789 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending") 790 if err != nil { 791 log.Error(ctx, "failed to create multistream event", "error", err) 792 } 793 return ss.StartMultistreamTarget(childCtx, targetView) 794 }) 795 running[key] = &runningMultistream{ 796 cancel: childCancel, 797 key: key, 798 pushID: uu.String(), 799 url: sanitizeMultistreamTargetURL(rec.Url), 800 } 801 } 802 currentRunning[key] = true 803 } 804 for key := range running { 805 if !currentRunning[key] { 806 log.Log(ctx, "stopping multistream target", "url", sanitizeMultistreamTargetURL(running[key].url), "pushID", running[key].pushID) 807 running[key].cancel() 808 delete(running, key) 809 } 810 } 811 select { 812 case <-ctx.Done(): 813 return nil 814 case <-time.After(time.Second * 5): 815 continue 816 } 817 } 818} 819 820func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error { 821 for { 822 err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView) 823 if err != nil { 824 log.Error(ctx, "failed to push to RTMP server", "error", err) 825 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error") 826 if err != nil { 827 log.Error(ctx, "failed to create multistream event", "error", err) 828 } 829 } 830 select { 831 case <-ctx.Done(): 832 return nil 833 case <-time.After(time.Second * 5): 834 continue 835 } 836 } 837}