Live video on the AT Protocol
at eli/rtmp-push 763 lines 23 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/streamplace/oatproxy/pkg/oatproxy" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/aqtime" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/livepeer" 22 "stream.place/streamplace/pkg/log" 23 "stream.place/streamplace/pkg/media" 24 "stream.place/streamplace/pkg/model" 25 "stream.place/streamplace/pkg/renditions" 26 "stream.place/streamplace/pkg/replication" 27 "stream.place/streamplace/pkg/spmetrics" 28 "stream.place/streamplace/pkg/statedb" 29 "stream.place/streamplace/pkg/streamplace" 30 "stream.place/streamplace/pkg/thumbnail" 31) 32 33type StreamSession struct { 34 mm *media.MediaManager 35 mod model.Model 36 cli *config.CLI 37 bus *bus.Bus 38 op *oatproxy.OATProxy 39 hls *media.M3U8 40 lp *livepeer.LivepeerSession 41 repoDID string 42 segmentChan chan struct{} 43 lastStatus time.Time 44 lastStatusCID *string 45 lastStatusLock sync.Mutex 46 lastOriginTime time.Time 47 lastOriginLock sync.Mutex 48 g *errgroup.Group 49 started chan struct{} 50 ctx context.Context 51 packets []bus.PacketizedSegment 52 statefulDB *statedb.StatefulDB 53 replicator replication.Replicator 54} 55 56func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 57 ctx, cancel := context.WithCancel(ctx) 58 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc() 59 ss.g, ctx = errgroup.WithContext(ctx) 60 sid := livepeer.RandomTrailer(8) 61 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID) 62 ss.ctx = ctx 63 log.Log(ctx, "starting stream session") 64 defer cancel() 65 spseg, err := notif.Segment.ToStreamplaceSegment() 66 if err != nil { 67 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 68 } 69 var allRenditions renditions.Renditions 70 71 if ss.cli.LivepeerGatewayURL != "" { 72 allRenditions, err = renditions.GenerateRenditions(spseg) 73 } else { 74 allRenditions = []renditions.Rendition{} 75 } 76 if err != nil { 77 return err 78 } 79 if spseg.Duration == nil { 80 return fmt.Errorf("segment duration is required to calculate bitrate") 81 } 82 dur := time.Duration(*spseg.Duration) 83 byteLen := len(notif.Data) 84 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 85 sourceRendition := renditions.Rendition{ 86 Name: "source", 87 Bitrate: bitrate, 88 Width: spseg.Video[0].Width, 89 Height: spseg.Video[0].Height, 90 } 91 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 92 ss.hls = media.NewM3U8(allRenditions) 93 94 // for _, r := range allRenditions { 95 // g.Go(func() error { 96 // for { 97 // if ctx.Err() != nil { 98 // return nil 99 // } 100 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 101 // if ctx.Err() != nil { 102 // return nil 103 // } 104 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 105 // time.Sleep(time.Second * 5) 106 // } 107 // }) 108 // } 109 110 close(ss.started) 111 112 ss.Go(ctx, func() error { 113 return ss.HandleMultistreamTargets(ctx) 114 }) 115 116 for { 117 select { 118 case <-ss.segmentChan: 119 // reset timer 120 case <-ctx.Done(): 121 return ss.g.Wait() 122 // case <-time.After(time.Minute * 1): 123 case <-time.After(ss.cli.StreamSessionTimeout): 124 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout) 125 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec() 126 for _, r := range allRenditions { 127 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 128 } 129 if notif.Local { 130 ss.Go(ctx, func() error { 131 return ss.DeleteStatus(spseg.Creator) 132 }) 133 } 134 cancel() 135 } 136 } 137} 138 139// Execute a goroutine in the context of the stream session. Errors are 140// non-fatal; if you actually want to melt the universe on an error you 141// should panic() 142func (ss *StreamSession) Go(ctx context.Context, f func() error) { 143 <-ss.started 144 ss.g.Go(func() error { 145 err := f() 146 if err != nil { 147 log.Error(ctx, "error in stream_session goroutine", "error", err) 148 } 149 return nil 150 }) 151} 152 153func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 154 <-ss.started 155 go func() { 156 select { 157 case <-ss.ctx.Done(): 158 return 159 case ss.segmentChan <- struct{}{}: 160 } 161 }() 162 aqt := aqtime.FromTime(notif.Segment.StartTime) 163 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 164 notif.Segment.MediaData.Size = len(notif.Data) 165 err := ss.mod.CreateSegment(notif.Segment) 166 if err != nil { 167 return fmt.Errorf("could not add segment to database: %w", err) 168 } 169 spseg, err := notif.Segment.ToStreamplaceSegment() 170 if err != nil { 171 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 172 } 173 174 ss.bus.Publish(spseg.Creator, spseg) 175 ss.Go(ctx, func() error { 176 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 177 Filepath: notif.Segment.ID, 178 Data: notif.Data, 179 }) 180 }) 181 182 if ss.cli.Thumbnail { 183 ss.Go(ctx, func() error { 184 return ss.Thumbnail(ctx, spseg.Creator, notif) 185 }) 186 } 187 188 if notif.Local { 189 ss.Go(ctx, func() error { 190 return ss.UpdateStatus(ctx, spseg.Creator) 191 }) 192 193 ss.Go(ctx, func() error { 194 return ss.UpdateBroadcastOrigin(ctx) 195 }) 196 } 197 198 if ss.cli.LivepeerGatewayURL != "" { 199 ss.Go(ctx, func() error { 200 start := time.Now() 201 err := ss.Transcode(ctx, spseg, notif.Data) 202 took := time.Since(start) 203 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 204 return err 205 }) 206 } 207 208 // trigger a notification blast if this is a new livestream 209 if notif.Metadata.Livestream != nil { 210 ss.Go(ctx, func() error { 211 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator) 212 if err != nil { 213 return fmt.Errorf("failed to get repo: %w", err) 214 } 215 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator) 216 if err != nil { 217 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 218 } 219 if livestreamModel == nil { 220 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator) 221 return nil 222 } 223 lsv, err := livestreamModel.ToLivestreamView() 224 if err != nil { 225 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 226 } 227 if !shouldNotify(lsv) { 228 log.Debug(ctx, "is not set to notify", "repoDID", spseg.Creator) 229 return nil 230 } 231 task := &statedb.NotificationTask{ 232 Livestream: lsv, 233 PDSURL: r.PDS, 234 } 235 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator) 236 if err != nil { 237 return fmt.Errorf("failed to get chat profile: %w", err) 238 } 239 if cp != nil { 240 spcp, err := cp.ToStreamplaceChatProfile() 241 if err != nil { 242 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 243 } 244 task.ChatProfile = spcp 245 } 246 247 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri))) 248 if err != nil { 249 log.Error(ctx, "failed to enqueue notification task", "err", err) 250 } 251 return ss.UpdateStatus(ctx, spseg.Creator) 252 }) 253 } else { 254 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator) 255 } 256 257 return nil 258} 259 260func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool { 261 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 262 if !ok { 263 return true 264 } 265 if lsvr.NotificationSettings == nil { 266 return true 267 } 268 settings := lsvr.NotificationSettings 269 if settings.PushNotification == nil { 270 return true 271 } 272 return *settings.PushNotification 273} 274 275func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 276 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 277 locked := lock.TryLock() 278 if !locked { 279 // we're already generating a thumbnail for this user, skip 280 return nil 281 } 282 defer lock.Unlock() 283 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 284 if err != nil { 285 return err 286 } 287 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 288 // we have a thumbnail <60sec old, skip generating a new one 289 return nil 290 } 291 r := bytes.NewReader(not.Data) 292 aqt := aqtime.FromTime(not.Segment.StartTime) 293 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 294 if err != nil { 295 return err 296 } 297 defer fd.Close() 298 err = media.Thumbnail(ctx, r, fd, "jpeg") 299 if err != nil { 300 return err 301 } 302 thumb := &model.Thumbnail{ 303 Format: "jpeg", 304 SegmentID: not.Segment.ID, 305 } 306 err = ss.mod.CreateThumbnail(thumb) 307 if err != nil { 308 return err 309 } 310 return nil 311} 312 313func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 314 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 315 ss.lastStatusLock.Lock() 316 defer ss.lastStatusLock.Unlock() 317 if time.Since(ss.lastStatus) < time.Minute { 318 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 319 return nil 320 } 321 322 client, err := ss.GetClientByDID(repoDID) 323 if err != nil { 324 return fmt.Errorf("could not get xrpc client: %w", err) 325 } 326 327 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 328 if err != nil { 329 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 330 } 331 lsv, err := ls.ToLivestreamView() 332 if err != nil { 333 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 334 } 335 336 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 337 if !ok { 338 return fmt.Errorf("livestream is not a streamplace livestream") 339 } 340 thumb := lsvr.Thumb 341 342 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 343 if err != nil { 344 return fmt.Errorf("could not get repo for repoDID: %w", err) 345 } 346 347 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 348 if !ok { 349 return fmt.Errorf("livestream is not a streamplace livestream") 350 } 351 352 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 353 354 if lsr.CanonicalUrl != nil { 355 canonicalUrl = *lsr.CanonicalUrl 356 } 357 358 actorStatusEmbed := bsky.ActorStatus_Embed{ 359 EmbedExternal: &bsky.EmbedExternal{ 360 External: &bsky.EmbedExternal_External{ 361 Title: lsr.Title, 362 Uri: canonicalUrl, 363 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 364 Thumb: thumb, 365 }, 366 }, 367 } 368 369 duration := int64(120) 370 status := bsky.ActorStatus{ 371 Status: "app.bsky.actor.status#live", 372 DurationMinutes: &duration, 373 Embed: &actorStatusEmbed, 374 CreatedAt: time.Now().Format(time.RFC3339), 375 } 376 377 var swapRecord *string 378 getOutput := comatproto.RepoGetRecord_Output{} 379 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 380 "repo": repoDID, 381 "collection": "app.bsky.actor.status", 382 "rkey": "self", 383 }, nil, &getOutput) 384 if err != nil { 385 xErr, ok := err.(*xrpc.Error) 386 if !ok { 387 return fmt.Errorf("could not get record: %w", err) 388 } 389 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 390 return fmt.Errorf("could not get record: %w", err) 391 } 392 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 393 } else { 394 log.Debug(ctx, "got record", "record", getOutput) 395 swapRecord = getOutput.Cid 396 } 397 398 inp := comatproto.RepoPutRecord_Input{ 399 Collection: "app.bsky.actor.status", 400 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 401 Rkey: "self", 402 Repo: repoDID, 403 SwapRecord: swapRecord, 404 } 405 out := comatproto.RepoPutRecord_Output{} 406 407 ss.lastStatusCID = &out.Cid 408 409 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 410 if err != nil { 411 return fmt.Errorf("could not create record: %w", err) 412 } 413 log.Debug(ctx, "created status record", "out", out) 414 415 ss.lastStatus = time.Now() 416 417 return nil 418} 419 420func (ss *StreamSession) DeleteStatus(repoDID string) error { 421 // need a special extra context because the stream session context is already cancelled 422 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 423 ss.lastStatusLock.Lock() 424 defer ss.lastStatusLock.Unlock() 425 if ss.lastStatusCID == nil { 426 log.Debug(ctx, "no status cid to delete") 427 return nil 428 } 429 inp := comatproto.RepoDeleteRecord_Input{ 430 Collection: "app.bsky.actor.status", 431 Rkey: "self", 432 Repo: repoDID, 433 } 434 inp.SwapRecord = ss.lastStatusCID 435 out := comatproto.RepoDeleteRecord_Output{} 436 437 client, err := ss.GetClientByDID(repoDID) 438 if err != nil { 439 return fmt.Errorf("could not get xrpc client: %w", err) 440 } 441 442 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 443 if err != nil { 444 return fmt.Errorf("could not delete record: %w", err) 445 } 446 447 ss.lastStatusCID = nil 448 return nil 449} 450 451var originUpdateInterval = time.Second * 30 452 453func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error { 454 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 455 ss.lastOriginLock.Lock() 456 defer ss.lastOriginLock.Unlock() 457 if time.Since(ss.lastOriginTime) < originUpdateInterval { 458 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 459 return nil 460 } 461 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost) 462 origin := streamplace.BroadcastOrigin{ 463 Streamer: ss.repoDID, 464 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 465 Broadcaster: &broadcaster, 466 UpdatedAt: time.Now().UTC().Format(util.ISO8601), 467 } 468 err := ss.replicator.BuildOriginRecord(&origin) 469 if err != nil { 470 return fmt.Errorf("could not build origin record: %w", err) 471 } 472 473 client, err := ss.GetClientByDID(ss.repoDID) 474 if err != nil { 475 return fmt.Errorf("could not get xrpc client for repoDID: %w", err) 476 } 477 478 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 479 480 var swapRecord *string 481 getOutput := comatproto.RepoGetRecord_Output{} 482 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 483 "repo": ss.repoDID, 484 "collection": "place.stream.broadcast.origin", 485 "rkey": rkey, 486 }, nil, &getOutput) 487 if err != nil { 488 xErr, ok := err.(*xrpc.Error) 489 if !ok { 490 return fmt.Errorf("could not get record: %w", err) 491 } 492 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 493 return fmt.Errorf("could not get record: %w", err) 494 } 495 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 496 } else { 497 log.Debug(ctx, "got record", "record", getOutput) 498 swapRecord = getOutput.Cid 499 } 500 501 inp := comatproto.RepoPutRecord_Input{ 502 Collection: "place.stream.broadcast.origin", 503 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 504 Rkey: rkey, 505 Repo: ss.repoDID, 506 SwapRecord: swapRecord, 507 } 508 out := comatproto.RepoPutRecord_Output{} 509 510 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 511 if err != nil { 512 return fmt.Errorf("could not create record: %w", err) 513 } 514 515 ss.lastOriginTime = time.Now() 516 return nil 517} 518 519func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 520 rs, err := renditions.GenerateRenditions(spseg) 521 if err != nil { 522 return fmt.Errorf("failed to generated renditions: %w", err) 523 } 524 525 if ss.lp == nil { 526 var err error 527 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 528 if err != nil { 529 return err 530 } 531 532 } 533 spmetrics.TranscodeAttemptsTotal.Inc() 534 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 535 if err != nil { 536 spmetrics.TranscodeErrorsTotal.Inc() 537 return err 538 } 539 if len(rs) != len(segs) { 540 spmetrics.TranscodeErrorsTotal.Inc() 541 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 542 } 543 spmetrics.TranscodeSuccessesTotal.Inc() 544 aqt, err := aqtime.FromString(spseg.StartTime) 545 if err != nil { 546 return err 547 } 548 for i, seg := range segs { 549 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 550 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 551 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 552 if err != nil { 553 return fmt.Errorf("failed to create transcoded segment file: %w", err) 554 } 555 defer fd.Close() 556 _, err = fd.Write(seg) 557 if err != nil { 558 return fmt.Errorf("failed to write transcoded segment file: %w", err) 559 } 560 ss.Go(ctx, func() error { 561 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 562 Filepath: fd.Name(), 563 Data: seg, 564 }) 565 }) 566 567 } 568 return nil 569} 570 571func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 572 ss.Go(ctx, func() error { 573 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 574 }) 575 ss.Go(ctx, func() error { 576 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 577 }) 578 return nil 579} 580 581func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 582 packet, err := media.Packetize(ctx, seg) 583 if err != nil { 584 return fmt.Errorf("failed to packetize segment: %w", err) 585 } 586 seg.PacketizedData = packet 587 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 588 return nil 589} 590 591func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 592 buf := bytes.Buffer{} 593 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 594 if err != nil { 595 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 596 } 597 // newSeg := &streamplace.Segment{ 598 // LexiconTypeID: "place.stream.segment", 599 // Id: spseg.Id, 600 // Creator: spseg.Creator, 601 // StartTime: spseg.StartTime, 602 // Duration: &dur, 603 // Audio: spseg.Audio, 604 // Video: spseg.Video, 605 // SigningKey: spseg.SigningKey, 606 // } 607 aqt, err := aqtime.FromString(spseg.StartTime) 608 if err != nil { 609 return fmt.Errorf("failed to parse segment start time: %w", err) 610 } 611 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 612 rend, err := ss.hls.GetRendition(rendition) 613 if err != nil { 614 return fmt.Errorf("failed to get rendition: %w", err) 615 } 616 if err := rend.NewSegment(&media.Segment{ 617 Buf: &buf, 618 Duration: time.Duration(dur), 619 Time: aqt.Time(), 620 }); err != nil { 621 return fmt.Errorf("failed to create new segment: %w", err) 622 } 623 624 return nil 625} 626 627type XRPCClient interface { 628 Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error 629} 630 631func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) { 632 password, ok := ss.cli.DevAccountCreds[did] 633 if ok { 634 repo, err := ss.mod.GetRepoByHandleOrDID(did) 635 if err != nil { 636 return nil, fmt.Errorf("could not get repo by did: %w", err) 637 } 638 if repo == nil { 639 return nil, fmt.Errorf("repo not found for did: %s", did) 640 } 641 anonXRPCC := &xrpc.Client{ 642 Host: repo.PDS, 643 Client: &aqhttp.Client, 644 } 645 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{ 646 Identifier: repo.DID, 647 Password: password, 648 }) 649 if err != nil { 650 return nil, fmt.Errorf("could not create session: %w", err) 651 } 652 653 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 654 655 return &xrpc.Client{ 656 Host: repo.PDS, 657 Client: &aqhttp.Client, 658 Auth: &xrpc.AuthInfo{ 659 Did: repo.DID, 660 AccessJwt: session.AccessJwt, 661 RefreshJwt: session.RefreshJwt, 662 Handle: repo.Handle, 663 }, 664 }, nil 665 } 666 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 667 if err != nil { 668 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err) 669 } 670 if session == nil { 671 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 672 } 673 674 session, err = ss.op.RefreshIfNeeded(session) 675 if err != nil { 676 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err) 677 } 678 679 client, err := ss.op.GetXrpcClient(session) 680 if err != nil { 681 return nil, fmt.Errorf("could not get xrpc client: %w", err) 682 } 683 684 return client, nil 685} 686 687type runningMultistream struct { 688 cancel func() 689 uri string 690} 691 692// we're making an attempt here not to log (sensitive) stream keys, so we're 693// referencing by atproto URI 694func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error { 695 ctx = log.WithLogValues(ctx, "system", "multistreaming") 696 isTrue := true 697 // {target.Uri}:{rec.Url} -> runningMultistream 698 // no concurrency issues, it's only used from this one loop 699 running := map[string]*runningMultistream{} 700 for { 701 targets, err := ss.statefulDB.ListMultistreamTargets(ss.repoDID, 100, 0, &isTrue) 702 if err != nil { 703 return fmt.Errorf("failed to list multistream targets: %w", err) 704 } 705 currentRunning := map[string]bool{} 706 for _, targetView := range targets { 707 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget) 708 if !ok { 709 log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri) 710 continue 711 } 712 key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url) 713 if running[key] == nil { 714 childCtx, childCancel := context.WithCancel(ctx) 715 ss.Go(ctx, func() error { 716 log.Log(ctx, "starting multistream target", "uri", targetView.Uri) 717 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending") 718 if err != nil { 719 log.Error(ctx, "failed to create multistream event", "error", err) 720 } 721 return ss.StartMultistreamTarget(childCtx, targetView) 722 }) 723 running[key] = &runningMultistream{ 724 cancel: childCancel, 725 uri: key, 726 } 727 } 728 currentRunning[key] = true 729 } 730 for key := range running { 731 if !currentRunning[key] { 732 log.Log(ctx, "stopping multistream target", "uri", running[key].uri) 733 running[key].cancel() 734 delete(running, key) 735 } 736 } 737 select { 738 case <-ctx.Done(): 739 return nil 740 case <-time.After(time.Second * 5): 741 continue 742 } 743 } 744} 745 746func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error { 747 for { 748 err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView) 749 if err != nil { 750 log.Error(ctx, "failed to push to RTMP server", "error", err) 751 err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error") 752 if err != nil { 753 log.Error(ctx, "failed to create multistream event", "error", err) 754 } 755 } 756 select { 757 case <-ctx.Done(): 758 return nil 759 case <-time.After(time.Second * 5): 760 continue 761 } 762 } 763}