Live video on the AT Protocol
at next 841 lines 25 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/url" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/google/uuid" 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "golang.org/x/sync/errgroup" 18 "stream.place/streamplace/pkg/aqhttp" 19 "stream.place/streamplace/pkg/aqtime" 20 "stream.place/streamplace/pkg/bus" 21 "stream.place/streamplace/pkg/config" 22 "stream.place/streamplace/pkg/livepeer" 23 "stream.place/streamplace/pkg/localdb" 24 "stream.place/streamplace/pkg/log" 25 "stream.place/streamplace/pkg/media" 26 "stream.place/streamplace/pkg/model" 27 "stream.place/streamplace/pkg/renditions" 28 "stream.place/streamplace/pkg/replication" 29 "stream.place/streamplace/pkg/spmetrics" 30 "stream.place/streamplace/pkg/statedb" 31 "stream.place/streamplace/pkg/streamplace" 32 "stream.place/streamplace/pkg/thumbnail" 33) 34 35type StreamSession struct { 36 mm *media.MediaManager 37 mod model.Model 38 cli *config.CLI 39 bus *bus.Bus 40 op *oatproxy.OATProxy 41 hls *media.M3U8 42 lp *livepeer.LivepeerSession 43 repoDID string 44 segmentChan chan struct{} 45 lastStatus time.Time 46 lastStatusCID *string 47 lastOriginTime time.Time 48 localDB localdb.LocalDB 49 50 // Channels for background workers 51 statusUpdateChan chan struct{} // Signal to update status 52 originUpdateChan chan struct{} // Signal to update broadcast origin 53 54 g *errgroup.Group 55 started chan struct{} 56 ctx context.Context 57 packets []bus.PacketizedSegment 58 statefulDB *statedb.StatefulDB 59 replicator replication.Replicator 60} 61 62func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 63 ctx, cancel := context.WithCancel(ctx) 64 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc() 65 ss.g, ctx = errgroup.WithContext(ctx) 66 sid := livepeer.RandomTrailer(8) 67 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID) 68 ss.ctx = ctx 69 log.Log(ctx, "starting stream session") 70 defer cancel() 71 spseg, err := notif.Segment.ToStreamplaceSegment() 72 if err != nil { 73 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 74 } 75 var allRenditions renditions.Renditions 76 77 if ss.cli.LivepeerGatewayURL != "" { 78 allRenditions, err = renditions.GenerateRenditions(spseg) 79 } else { 80 allRenditions = []renditions.Rendition{} 81 } 82 if err != nil { 83 return err 84 } 85 if spseg.Duration == nil { 86 return fmt.Errorf("segment duration is required to calculate bitrate") 87 } 88 dur := time.Duration(*spseg.Duration) 89 byteLen := len(notif.Data) 90 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 91 sourceRendition := renditions.Rendition{ 92 Name: "source", 93 Bitrate: bitrate, 94 Width: spseg.Video[0].Width, 95 Height: spseg.Video[0].Height, 96 } 97 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 98 ss.hls = media.NewM3U8(allRenditions) 99 100 // for _, r := range allRenditions { 101 // g.Go(func() error { 102 // for { 103 // if ctx.Err() != nil { 104 // return nil 105 // } 106 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 107 // if ctx.Err() != nil { 108 // return nil 109 // } 110 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 111 // time.Sleep(time.Second * 5) 112 // } 113 // }) 114 // } 115 116 close(ss.started) 117 118 // Start background workers for status and origin updates 119 ss.g.Go(func() error { 120 return ss.statusUpdateLoop(ctx, spseg.Creator) 121 }) 122 ss.g.Go(func() error { 123 return ss.originUpdateLoop(ctx) 124 }) 125 126 if notif.Local { 127 ss.Go(ctx, func() error { 128 return ss.HandleMultistreamTargets(ctx) 129 }) 130 } 131 132 for { 133 select { 134 case <-ss.segmentChan: 135 // reset timer 136 case <-ctx.Done(): 137 // Signal all background workers to stop 138 return ss.g.Wait() 139 // case <-time.After(time.Minute * 1): 140 case <-time.After(ss.cli.StreamSessionTimeout): 141 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout) 142 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec() 143 for _, r := range allRenditions { 144 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 145 } 146 // Signal background workers to stop 147 if notif.Local { 148 ss.Go(ctx, func() error { 149 return ss.DeleteStatus(spseg.Creator) 150 }) 151 } 152 cancel() 153 } 154 } 155} 156 157// Execute a goroutine in the context of the stream session. Errors are 158// non-fatal; if you actually want to melt the universe on an error you 159// should panic() 160func (ss *StreamSession) Go(ctx context.Context, f func() error) { 161 <-ss.started 162 ss.g.Go(func() error { 163 err := f() 164 if err != nil { 165 log.Error(ctx, "error in stream_session goroutine", "error", err) 166 } 167 return nil 168 }) 169} 170 171func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 172 <-ss.started 173 go func() { 174 select { 175 case <-ss.ctx.Done(): 176 return 177 case ss.segmentChan <- struct{}{}: 178 } 179 }() 180 aqt := aqtime.FromTime(notif.Segment.StartTime) 181 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 182 notif.Segment.MediaData.Size = len(notif.Data) 183 err := ss.localDB.CreateSegment(notif.Segment) 184 if err != nil { 185 return fmt.Errorf("could not add segment to database: %w", err) 186 } 187 spseg, err := notif.Segment.ToStreamplaceSegment() 188 if err != nil { 189 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 190 } 191 192 ss.bus.Publish(spseg.Creator, spseg) 193 ss.Go(ctx, func() error { 194 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 195 Filepath: notif.Segment.ID, 196 Data: notif.Data, 197 }) 198 }) 199 200 if ss.cli.Thumbnail { 201 ss.Go(ctx, func() error { 202 return ss.Thumbnail(ctx, spseg.Creator, notif) 203 }) 204 } 205 206 if notif.Local { 207 ss.UpdateStatus(ctx, spseg.Creator) 208 ss.UpdateBroadcastOrigin(ctx) 209 } 210 211 if ss.cli.LivepeerGatewayURL != "" { 212 ss.Go(ctx, func() error { 213 start := time.Now() 214 err := ss.Transcode(ctx, spseg, notif.Data) 215 took := time.Since(start) 216 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 217 return err 218 }) 219 } 220 221 // trigger a notification blast if this is a new livestream 222 if notif.Metadata.Livestream != nil { 223 ss.Go(ctx, func() error { 224 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator) 225 if err != nil { 226 return fmt.Errorf("failed to get repo: %w", err) 227 } 228 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator) 229 if err != nil { 230 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 231 } 232 if livestreamModel == nil { 233 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator) 234 return nil 235 } 236 lsv, err := livestreamModel.ToLivestreamView() 237 if err != nil { 238 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 239 } 240 if !shouldNotify(lsv) { 241 log.Debug(ctx, "is not set to notify", "repoDID", spseg.Creator) 242 return nil 243 } 244 task := &statedb.NotificationTask{ 245 Livestream: lsv, 246 PDSURL: r.PDS, 247 } 248 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator) 249 if err != nil { 250 return fmt.Errorf("failed to get chat profile: %w", err) 251 } 252 if cp != nil { 253 spcp, err := cp.ToStreamplaceChatProfile() 254 if err != nil { 255 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 256 } 257 task.ChatProfile = spcp 258 } 259 260 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri))) 261 if err != nil { 262 log.Error(ctx, "failed to enqueue notification task", "err", err) 263 } 264 ss.UpdateStatus(ctx, spseg.Creator) 265 return nil 266 }) 267 } else { 268 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator) 269 } 270 271 return nil 272} 273 274func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool { 275 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 276 if !ok { 277 return true 278 } 279 if lsvr.NotificationSettings == nil { 280 return true 281 } 282 settings := lsvr.NotificationSettings 283 if settings.PushNotification == nil { 284 return true 285 } 286 return *settings.PushNotification 287} 288 289func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 290 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 291 locked := lock.TryLock() 292 if !locked { 293 // we're already generating a thumbnail for this user, skip 294 return nil 295 } 296 defer lock.Unlock() 297 oldThumb, err := ss.localDB.LatestThumbnailForUser(not.Segment.RepoDID) 298 if err != nil { 299 return err 300 } 301 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 302 // we have a thumbnail <60sec old, skip generating a new one 303 return nil 304 } 305 r := bytes.NewReader(not.Data) 306 aqt := aqtime.FromTime(not.Segment.StartTime) 307 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 308 if err != nil { 309 return err 310 } 311 defer fd.Close() 312 err = media.Thumbnail(ctx, r, fd, "jpeg") 313 if err != nil { 314 return err 315 } 316 thumb := &localdb.Thumbnail{ 317 Format: "jpeg", 318 SegmentID: not.Segment.ID, 319 } 320 err = ss.localDB.CreateThumbnail(thumb) 321 if err != nil { 322 return err 323 } 324 return nil 325} 326 327// UpdateStatus signals the background worker to update status (non-blocking) 328func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) { 329 select { 330 case ss.statusUpdateChan <- struct{}{}: 331 default: 332 // Channel full, signal already pending 333 } 334} 335 336// statusUpdateLoop runs as a background goroutine for the session lifetime 337func (ss *StreamSession) statusUpdateLoop(ctx context.Context, repoDID string) error { 338 ctx = log.WithLogValues(ctx, "func", "statusUpdateLoop") 339 for { 340 select { 341 case <-ctx.Done(): 342 return nil 343 case <-ss.statusUpdateChan: 344 if time.Since(ss.lastStatus) < time.Minute { 345 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 346 continue 347 } 348 if err := ss.doUpdateStatus(ctx, repoDID); err != nil { 349 log.Error(ctx, "failed to update status", "error", err) 350 } 351 } 352 } 353} 354 355// doUpdateStatus performs the actual status update work 356func (ss *StreamSession) doUpdateStatus(ctx context.Context, repoDID string) error { 357 ctx = log.WithLogValues(ctx, "func", "doUpdateStatus") 358 359 client, err := ss.GetClientByDID(repoDID) 360 if err != nil { 361 return fmt.Errorf("could not get xrpc client: %w", err) 362 } 363 364 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 365 if err != nil { 366 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 367 } 368 lsv, err := ls.ToLivestreamView() 369 if err != nil { 370 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 371 } 372 373 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 374 if !ok { 375 return fmt.Errorf("livestream is not a streamplace livestream") 376 } 377 thumb := lsvr.Thumb 378 379 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 380 if err != nil { 381 return fmt.Errorf("could not get repo for repoDID: %w", err) 382 } 383 384 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 385 if !ok { 386 return fmt.Errorf("livestream is not a streamplace livestream") 387 } 388 389 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 390 391 if lsr.CanonicalUrl != nil { 392 canonicalUrl = *lsr.CanonicalUrl 393 } 394 395 actorStatusEmbed := bsky.ActorStatus_Embed{ 396 EmbedExternal: &bsky.EmbedExternal{ 397 External: &bsky.EmbedExternal_External{ 398 Title: lsr.Title, 399 Uri: canonicalUrl, 400 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 401 Thumb: thumb, 402 }, 403 }, 404 } 405 406 duration := int64(10) 407 status := bsky.ActorStatus{ 408 Status: "app.bsky.actor.status#live", 409 DurationMinutes: &duration, 410 Embed: &actorStatusEmbed, 411 CreatedAt: time.Now().Format(time.RFC3339), 412 } 413 414 var swapRecord *string 415 getOutput := comatproto.RepoGetRecord_Output{} 416 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 417 "repo": repoDID, 418 "collection": "app.bsky.actor.status", 419 "rkey": "self", 420 }, nil, &getOutput) 421 if err != nil { 422 xErr, ok := err.(*xrpc.Error) 423 if !ok { 424 return fmt.Errorf("could not get record: %w", err) 425 } 426 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 427 return fmt.Errorf("could not get record: %w", err) 428 } 429 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 430 } else { 431 log.Debug(ctx, "got record", "record", getOutput) 432 swapRecord = getOutput.Cid 433 } 434 435 inp := comatproto.RepoPutRecord_Input{ 436 Collection: "app.bsky.actor.status", 437 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 438 Rkey: "self", 439 Repo: repoDID, 440 SwapRecord: swapRecord, 441 } 442 out := comatproto.RepoPutRecord_Output{} 443 444 ss.lastStatusCID = &out.Cid 445 446 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 447 if err != nil { 448 return fmt.Errorf("could not create record: %w", err) 449 } 450 log.Debug(ctx, "created status record", "out", out) 451 452 ss.lastStatus = time.Now() 453 454 return nil 455} 456 457func (ss *StreamSession) DeleteStatus(repoDID string) error { 458 // need a special extra context because the stream session context is already cancelled 459 // No lock needed - this runs during teardown after the background worker has exited 460 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 461 if ss.lastStatusCID == nil { 462 log.Debug(ctx, "no status cid to delete") 463 return nil 464 } 465 inp := comatproto.RepoDeleteRecord_Input{ 466 Collection: "app.bsky.actor.status", 467 Rkey: "self", 468 Repo: repoDID, 469 } 470 inp.SwapRecord = ss.lastStatusCID 471 out := comatproto.RepoDeleteRecord_Output{} 472 473 client, err := ss.GetClientByDID(repoDID) 474 if err != nil { 475 return fmt.Errorf("could not get xrpc client: %w", err) 476 } 477 478 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 479 if err != nil { 480 return fmt.Errorf("could not delete record: %w", err) 481 } 482 483 ss.lastStatusCID = nil 484 return nil 485} 486 487var originUpdateInterval = time.Second * 30 488 489// UpdateBroadcastOrigin signals the background worker to update origin (non-blocking) 490func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) { 491 select { 492 case ss.originUpdateChan <- struct{}{}: 493 default: 494 // Channel full, signal already pending 495 } 496} 497 498// originUpdateLoop runs as a background goroutine for the session lifetime 499func (ss *StreamSession) originUpdateLoop(ctx context.Context) error { 500 ctx = log.WithLogValues(ctx, "func", "originUpdateLoop") 501 for { 502 select { 503 case <-ctx.Done(): 504 return nil 505 case <-ss.originUpdateChan: 506 if time.Since(ss.lastOriginTime) < originUpdateInterval { 507 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 508 continue 509 } 510 if err := ss.doUpdateBroadcastOrigin(ctx); err != nil { 511 log.Error(ctx, "failed to update broadcast origin", "error", err) 512 } 513 } 514 } 515} 516 517// doUpdateBroadcastOrigin performs the actual broadcast origin update work 518func (ss *StreamSession) doUpdateBroadcastOrigin(ctx context.Context) error { 519 ctx = log.WithLogValues(ctx, "func", "doUpdateBroadcastOrigin") 520 521 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost) 522 origin := streamplace.BroadcastOrigin{ 523 Streamer: ss.repoDID, 524 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 525 Broadcaster: &broadcaster, 526 UpdatedAt: time.Now().UTC().Format(util.ISO8601), 527 } 528 err := ss.replicator.BuildOriginRecord(&origin) 529 if err != nil { 530 return fmt.Errorf("could not build origin record: %w", err) 531 } 532 533 client, err := ss.GetClientByDID(ss.repoDID) 534 if err != nil { 535 return fmt.Errorf("could not get xrpc client for repoDID: %w", err) 536 } 537 538 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 539 540 var swapRecord *string 541 getOutput := comatproto.RepoGetRecord_Output{} 542 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 543 "repo": ss.repoDID, 544 "collection": "place.stream.broadcast.origin", 545 "rkey": rkey, 546 }, nil, &getOutput) 547 if err != nil { 548 xErr, ok := err.(*xrpc.Error) 549 if !ok { 550 return fmt.Errorf("could not get record: %w", err) 551 } 552 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 553 return fmt.Errorf("could not get record: %w", err) 554 } 555 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 556 } else { 557 log.Debug(ctx, "got record", "record", getOutput) 558 swapRecord = getOutput.Cid 559 } 560 561 inp := comatproto.RepoPutRecord_Input{ 562 Collection: "place.stream.broadcast.origin", 563 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 564 Rkey: rkey, 565 Repo: ss.repoDID, 566 SwapRecord: swapRecord, 567 } 568 out := comatproto.RepoPutRecord_Output{} 569 570 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 571 if err != nil { 572 return fmt.Errorf("could not create record: %w", err) 573 } 574 575 ss.lastOriginTime = time.Now() 576 return nil 577} 578 579func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 580 rs, err := renditions.GenerateRenditions(spseg) 581 if err != nil { 582 return fmt.Errorf("failed to generated renditions: %w", err) 583 } 584 585 if ss.lp == nil { 586 var err error 587 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 588 if err != nil { 589 return err 590 } 591 592 } 593 spmetrics.TranscodeAttemptsTotal.Inc() 594 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 595 if err != nil { 596 spmetrics.TranscodeErrorsTotal.Inc() 597 return err 598 } 599 if len(rs) != len(segs) { 600 spmetrics.TranscodeErrorsTotal.Inc() 601 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 602 } 603 spmetrics.TranscodeSuccessesTotal.Inc() 604 aqt, err := aqtime.FromString(spseg.StartTime) 605 if err != nil { 606 return err 607 } 608 for i, seg := range segs { 609 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 610 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 611 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 612 if err != nil { 613 return fmt.Errorf("failed to create transcoded segment file: %w", err) 614 } 615 defer fd.Close() 616 _, err = fd.Write(seg) 617 if err != nil { 618 return fmt.Errorf("failed to write transcoded segment file: %w", err) 619 } 620 ss.Go(ctx, func() error { 621 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 622 Filepath: fd.Name(), 623 Data: seg, 624 }) 625 }) 626 627 } 628 return nil 629} 630 631func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 632 ss.Go(ctx, func() error { 633 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 634 }) 635 ss.Go(ctx, func() error { 636 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 637 }) 638 return nil 639} 640 641func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 642 packet, err := media.Packetize(ctx, seg) 643 if err != nil { 644 return fmt.Errorf("failed to packetize segment: %w", err) 645 } 646 seg.PacketizedData = packet 647 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 648 return nil 649} 650 651func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 652 buf := bytes.Buffer{} 653 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 654 if err != nil { 655 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 656 } 657 // newSeg := &streamplace.Segment{ 658 // LexiconTypeID: "place.stream.segment", 659 // Id: spseg.Id, 660 // Creator: spseg.Creator, 661 // StartTime: spseg.StartTime, 662 // Duration: &dur, 663 // Audio: spseg.Audio, 664 // Video: spseg.Video, 665 // SigningKey: spseg.SigningKey, 666 // } 667 aqt, err := aqtime.FromString(spseg.StartTime) 668 if err != nil { 669 return fmt.Errorf("failed to parse segment start time: %w", err) 670 } 671 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 672 rend, err := ss.hls.GetRendition(rendition) 673 if err != nil { 674 return fmt.Errorf("failed to get rendition: %w", err) 675 } 676 if err := rend.NewSegment(&media.Segment{ 677 Buf: &buf, 678 Duration: time.Duration(dur), 679 Time: aqt.Time(), 680 }); err != nil { 681 return fmt.Errorf("failed to create new segment: %w", err) 682 } 683 684 return nil 685} 686 687type XRPCClient interface { 688 Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error 689} 690 691func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) { 692 password, ok := ss.cli.DevAccountCreds[did] 693 if ok { 694 repo, err := ss.mod.GetRepoByHandleOrDID(did) 695 if err != nil { 696 return nil, fmt.Errorf("could not get repo by did: %w", err) 697 } 698 if repo == nil { 699 return nil, fmt.Errorf("repo not found for did: %s", did) 700 } 701 anonXRPCC := &xrpc.Client{ 702 Host: repo.PDS, 703 Client: &aqhttp.Client, 704 } 705 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{ 706 Identifier: repo.DID, 707 Password: password, 708 }) 709 if err != nil { 710 return nil, fmt.Errorf("could not create session: %w", err) 711 } 712 713 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 714 715 return &xrpc.Client{ 716 Host: repo.PDS, 717 Client: &aqhttp.Client, 718 Auth: &xrpc.AuthInfo{ 719 Did: repo.DID, 720 AccessJwt: session.AccessJwt, 721 RefreshJwt: session.RefreshJwt, 722 Handle: repo.Handle, 723 }, 724 }, nil 725 } 726 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 727 if err != nil { 728 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err) 729 } 730 if session == nil { 731 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 732 } 733 734 session, err = ss.op.RefreshIfNeeded(session) 735 if err != nil { 736 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err) 737 } 738 739 client, err := ss.op.GetXrpcClient(session) 740 if err != nil { 741 return nil, fmt.Errorf("could not get xrpc client: %w", err) 742 } 743 744 return client, nil 745} 746 747type runningMultistream struct { 748 cancel func() 749 key string 750 pushID string 751 url string 752} 753 754func sanitizeMultistreamTargetURL(uri string) string { 755 u, err := url.Parse(uri) 756 if err != nil { 757 return uri 758 } 759 u.Path = "/redacted" 760 return u.String() 761} 762 763// we're making an attempt here not to log (sensitive) stream keys, so we're 764// referencing by atproto URI 765func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error { 766 ctx = log.WithLogValues(ctx, "system", "multistreaming") 767 isTrue := true 768 // {target.Uri}:{rec.Url} -> runningMultistream 769 // no concurrency issues, it's only used from this one loop 770 running := map[string]*runningMultistream{} 771 for { 772 targets, err := ss.statefulDB.ListMultistreamTargets(ss.repoDID, 100, 0, &isTrue) 773 if err != nil { 774 return fmt.Errorf("failed to list multistream targets: %w", err) 775 } 776 currentRunning := map[string]bool{} 777 for _, targetView := range targets { 778 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget) 779 if !ok { 780 log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri) 781 continue 782 } 783 uu, err := uuid.NewV7() 784 if err != nil { 785 return err 786 } 787 ctx := log.WithLogValues(ctx, "url", sanitizeMultistreamTargetURL(rec.Url), "pushID", uu.String()) 788 key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url) 789 if running[key] == nil { 790 childCtx, childCancel := context.WithCancel(ctx) 791 ss.Go(ctx, func() error { 792 log.Log(ctx, "starting multistream target", "uri", targetView.Uri) 793 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending") 794 if err != nil { 795 log.Error(ctx, "failed to create multistream event", "error", err) 796 } 797 return ss.StartMultistreamTarget(childCtx, targetView) 798 }) 799 running[key] = &runningMultistream{ 800 cancel: childCancel, 801 key: key, 802 pushID: uu.String(), 803 url: sanitizeMultistreamTargetURL(rec.Url), 804 } 805 } 806 currentRunning[key] = true 807 } 808 for key := range running { 809 if !currentRunning[key] { 810 log.Log(ctx, "stopping multistream target", "url", sanitizeMultistreamTargetURL(running[key].url), "pushID", running[key].pushID) 811 running[key].cancel() 812 delete(running, key) 813 } 814 } 815 select { 816 case <-ctx.Done(): 817 return nil 818 case <-time.After(time.Second * 5): 819 continue 820 } 821 } 822} 823 824func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error { 825 for { 826 err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView) 827 if err != nil { 828 log.Error(ctx, "failed to push to RTMP server", "error", err) 829 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error") 830 if err != nil { 831 log.Error(ctx, "failed to create multistream event", "error", err) 832 } 833 } 834 select { 835 case <-ctx.Done(): 836 return nil 837 case <-time.After(time.Second * 5): 838 continue 839 } 840 } 841}