Live video on the AT Protocol
at eli/rtmprec 681 lines 20 kB view raw
1package director 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/streamplace/oatproxy/pkg/oatproxy" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/aqtime" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/livepeer" 22 "stream.place/streamplace/pkg/log" 23 "stream.place/streamplace/pkg/media" 24 "stream.place/streamplace/pkg/model" 25 "stream.place/streamplace/pkg/renditions" 26 "stream.place/streamplace/pkg/replication" 27 "stream.place/streamplace/pkg/spmetrics" 28 "stream.place/streamplace/pkg/statedb" 29 "stream.place/streamplace/pkg/streamplace" 30 "stream.place/streamplace/pkg/thumbnail" 31) 32 33type StreamSession struct { 34 mm *media.MediaManager 35 mod model.Model 36 cli *config.CLI 37 bus *bus.Bus 38 op *oatproxy.OATProxy 39 hls *media.M3U8 40 lp *livepeer.LivepeerSession 41 repoDID string 42 segmentChan chan struct{} 43 lastStatus time.Time 44 lastStatusCID *string 45 lastStatusLock sync.Mutex 46 lastOriginTime time.Time 47 lastOriginLock sync.Mutex 48 g *errgroup.Group 49 started chan struct{} 50 ctx context.Context 51 packets []bus.PacketizedSegment 52 statefulDB *statedb.StatefulDB 53 replicator replication.Replicator 54} 55 56func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { 57 ctx, cancel := context.WithCancel(ctx) 58 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc() 59 ss.g, ctx = errgroup.WithContext(ctx) 60 sid := livepeer.RandomTrailer(8) 61 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID) 62 ss.ctx = ctx 63 log.Log(ctx, "starting stream session") 64 defer cancel() 65 spseg, err := notif.Segment.ToStreamplaceSegment() 66 if err != nil { 67 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 68 } 69 var allRenditions renditions.Renditions 70 71 if ss.cli.LivepeerGatewayURL != "" { 72 allRenditions, err = renditions.GenerateRenditions(spseg) 73 } else { 74 allRenditions = []renditions.Rendition{} 75 } 76 if err != nil { 77 return err 78 } 79 if spseg.Duration == nil { 80 return fmt.Errorf("segment duration is required to calculate bitrate") 81 } 82 dur := time.Duration(*spseg.Duration) 83 byteLen := len(notif.Data) 84 bitrate := int(float64(byteLen) / dur.Seconds() * 8) 85 sourceRendition := renditions.Rendition{ 86 Name: "source", 87 Bitrate: bitrate, 88 Width: spseg.Video[0].Width, 89 Height: spseg.Video[0].Height, 90 } 91 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 92 ss.hls = media.NewM3U8(allRenditions) 93 94 // for _, r := range allRenditions { 95 // g.Go(func() error { 96 // for { 97 // if ctx.Err() != nil { 98 // return nil 99 // } 100 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls) 101 // if ctx.Err() != nil { 102 // return nil 103 // } 104 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err) 105 // time.Sleep(time.Second * 5) 106 // } 107 // }) 108 // } 109 110 close(ss.started) 111 112 for { 113 select { 114 case <-ss.segmentChan: 115 // reset timer 116 case <-ctx.Done(): 117 return ss.g.Wait() 118 // case <-time.After(time.Minute * 1): 119 case <-time.After(ss.cli.StreamSessionTimeout): 120 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout) 121 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec() 122 for _, r := range allRenditions { 123 ss.bus.EndSession(ctx, spseg.Creator, r.Name) 124 } 125 if notif.Local { 126 ss.Go(ctx, func() error { 127 return ss.DeleteStatus(spseg.Creator) 128 }) 129 } 130 cancel() 131 } 132 } 133} 134 135// Execute a goroutine in the context of the stream session. Errors are 136// non-fatal; if you actually want to melt the universe on an error you 137// should panic() 138func (ss *StreamSession) Go(ctx context.Context, f func() error) { 139 <-ss.started 140 ss.g.Go(func() error { 141 err := f() 142 if err != nil { 143 log.Error(ctx, "error in stream_session goroutine", "error", err) 144 } 145 return nil 146 }) 147} 148 149func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error { 150 <-ss.started 151 go func() { 152 select { 153 case <-ss.ctx.Done(): 154 return 155 case ss.segmentChan <- struct{}{}: 156 } 157 }() 158 aqt := aqtime.FromTime(notif.Segment.StartTime) 159 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 160 notif.Segment.MediaData.Size = len(notif.Data) 161 err := ss.mod.CreateSegment(notif.Segment) 162 if err != nil { 163 return fmt.Errorf("could not add segment to database: %w", err) 164 } 165 spseg, err := notif.Segment.ToStreamplaceSegment() 166 if err != nil { 167 return fmt.Errorf("could not convert segment to streamplace segment: %w", err) 168 } 169 170 ss.bus.Publish(spseg.Creator, spseg) 171 ss.Go(ctx, func() error { 172 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{ 173 Filepath: notif.Segment.ID, 174 Data: notif.Data, 175 }) 176 }) 177 178 if ss.cli.Thumbnail { 179 ss.Go(ctx, func() error { 180 return ss.Thumbnail(ctx, spseg.Creator, notif) 181 }) 182 } 183 184 if notif.Local { 185 ss.Go(ctx, func() error { 186 return ss.UpdateStatus(ctx, spseg.Creator) 187 }) 188 189 ss.Go(ctx, func() error { 190 return ss.UpdateBroadcastOrigin(ctx) 191 }) 192 } 193 194 if ss.cli.LivepeerGatewayURL != "" { 195 ss.Go(ctx, func() error { 196 start := time.Now() 197 err := ss.Transcode(ctx, spseg, notif.Data) 198 took := time.Since(start) 199 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds())) 200 return err 201 }) 202 } 203 204 // trigger a notification blast if this is a new livestream 205 if notif.Metadata.Livestream != nil { 206 ss.Go(ctx, func() error { 207 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator) 208 if err != nil { 209 return fmt.Errorf("failed to get repo: %w", err) 210 } 211 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator) 212 if err != nil { 213 return fmt.Errorf("failed to get latest livestream for repo: %w", err) 214 } 215 if livestreamModel == nil { 216 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator) 217 return nil 218 } 219 lsv, err := livestreamModel.ToLivestreamView() 220 if err != nil { 221 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 222 } 223 if !shouldNotify(lsv) { 224 log.Warn(ctx, "is not set to notify", "repoDID", spseg.Creator) 225 return nil 226 } 227 task := &statedb.NotificationTask{ 228 Livestream: lsv, 229 PDSURL: r.PDS, 230 } 231 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator) 232 if err != nil { 233 return fmt.Errorf("failed to get chat profile: %w", err) 234 } 235 if cp != nil { 236 spcp, err := cp.ToStreamplaceChatProfile() 237 if err != nil { 238 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 239 } 240 task.ChatProfile = spcp 241 } 242 243 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri))) 244 if err != nil { 245 log.Error(ctx, "failed to enqueue notification task", "err", err) 246 } 247 return ss.UpdateStatus(ctx, spseg.Creator) 248 }) 249 } else { 250 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator) 251 } 252 253 return nil 254} 255 256func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool { 257 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 258 if !ok { 259 return true 260 } 261 if lsvr.NotificationSettings == nil { 262 return true 263 } 264 settings := lsvr.NotificationSettings 265 if settings.PushNotification == nil { 266 return true 267 } 268 return *settings.PushNotification 269} 270 271func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error { 272 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID) 273 locked := lock.TryLock() 274 if !locked { 275 // we're already generating a thumbnail for this user, skip 276 return nil 277 } 278 defer lock.Unlock() 279 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 280 if err != nil { 281 return err 282 } 283 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute { 284 // we have a thumbnail <60sec old, skip generating a new one 285 return nil 286 } 287 r := bytes.NewReader(not.Data) 288 aqt := aqtime.FromTime(not.Segment.StartTime) 289 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg") 290 if err != nil { 291 return err 292 } 293 defer fd.Close() 294 err = media.Thumbnail(ctx, r, fd, "jpeg") 295 if err != nil { 296 return err 297 } 298 thumb := &model.Thumbnail{ 299 Format: "jpeg", 300 SegmentID: not.Segment.ID, 301 } 302 err = ss.mod.CreateThumbnail(thumb) 303 if err != nil { 304 return err 305 } 306 return nil 307} 308 309func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error { 310 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 311 ss.lastStatusLock.Lock() 312 defer ss.lastStatusLock.Unlock() 313 if time.Since(ss.lastStatus) < time.Minute { 314 log.Debug(ctx, "not updating status, last status was less than 1 minute ago") 315 return nil 316 } 317 318 client, err := ss.GetClientByDID(repoDID) 319 if err != nil { 320 return fmt.Errorf("could not get xrpc client: %w", err) 321 } 322 323 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID) 324 if err != nil { 325 return fmt.Errorf("could not get latest livestream for repoDID: %w", err) 326 } 327 lsv, err := ls.ToLivestreamView() 328 if err != nil { 329 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err) 330 } 331 332 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream) 333 if !ok { 334 return fmt.Errorf("livestream is not a streamplace livestream") 335 } 336 thumb := lsvr.Thumb 337 338 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID) 339 if err != nil { 340 return fmt.Errorf("could not get repo for repoDID: %w", err) 341 } 342 343 lsr, ok := lsv.Record.Val.(*streamplace.Livestream) 344 if !ok { 345 return fmt.Errorf("livestream is not a streamplace livestream") 346 } 347 348 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle) 349 350 if lsr.CanonicalUrl != nil { 351 canonicalUrl = *lsr.CanonicalUrl 352 } 353 354 actorStatusEmbed := bsky.ActorStatus_Embed{ 355 EmbedExternal: &bsky.EmbedExternal{ 356 External: &bsky.EmbedExternal_External{ 357 Title: lsr.Title, 358 Uri: canonicalUrl, 359 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost), 360 Thumb: thumb, 361 }, 362 }, 363 } 364 365 duration := int64(120) 366 status := bsky.ActorStatus{ 367 Status: "app.bsky.actor.status#live", 368 DurationMinutes: &duration, 369 Embed: &actorStatusEmbed, 370 CreatedAt: time.Now().Format(time.RFC3339), 371 } 372 373 var swapRecord *string 374 getOutput := comatproto.RepoGetRecord_Output{} 375 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 376 "repo": repoDID, 377 "collection": "app.bsky.actor.status", 378 "rkey": "self", 379 }, nil, &getOutput) 380 if err != nil { 381 xErr, ok := err.(*xrpc.Error) 382 if !ok { 383 return fmt.Errorf("could not get record: %w", err) 384 } 385 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 386 return fmt.Errorf("could not get record: %w", err) 387 } 388 log.Debug(ctx, "record not found, creating", "repoDID", repoDID) 389 } else { 390 log.Debug(ctx, "got record", "record", getOutput) 391 swapRecord = getOutput.Cid 392 } 393 394 inp := comatproto.RepoPutRecord_Input{ 395 Collection: "app.bsky.actor.status", 396 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 397 Rkey: "self", 398 Repo: repoDID, 399 SwapRecord: swapRecord, 400 } 401 out := comatproto.RepoPutRecord_Output{} 402 403 ss.lastStatusCID = &out.Cid 404 405 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 406 if err != nil { 407 return fmt.Errorf("could not create record: %w", err) 408 } 409 log.Debug(ctx, "created status record", "out", out) 410 411 ss.lastStatus = time.Now() 412 413 return nil 414} 415 416func (ss *StreamSession) DeleteStatus(repoDID string) error { 417 // need a special extra context because the stream session context is already cancelled 418 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID) 419 ss.lastStatusLock.Lock() 420 defer ss.lastStatusLock.Unlock() 421 if ss.lastStatusCID == nil { 422 log.Debug(ctx, "no status cid to delete") 423 return nil 424 } 425 inp := comatproto.RepoDeleteRecord_Input{ 426 Collection: "app.bsky.actor.status", 427 Rkey: "self", 428 Repo: repoDID, 429 } 430 inp.SwapRecord = ss.lastStatusCID 431 out := comatproto.RepoDeleteRecord_Output{} 432 433 client, err := ss.GetClientByDID(repoDID) 434 if err != nil { 435 return fmt.Errorf("could not get xrpc client: %w", err) 436 } 437 438 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out) 439 if err != nil { 440 return fmt.Errorf("could not delete record: %w", err) 441 } 442 443 ss.lastStatusCID = nil 444 return nil 445} 446 447var originUpdateInterval = time.Second * 30 448 449func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error { 450 ctx = log.WithLogValues(ctx, "func", "UpdateStatus") 451 ss.lastOriginLock.Lock() 452 defer ss.lastOriginLock.Unlock() 453 if time.Since(ss.lastOriginTime) < originUpdateInterval { 454 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago") 455 return nil 456 } 457 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost) 458 origin := streamplace.BroadcastOrigin{ 459 Streamer: ss.repoDID, 460 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 461 Broadcaster: &broadcaster, 462 UpdatedAt: time.Now().UTC().Format(util.ISO8601), 463 } 464 err := ss.replicator.BuildOriginRecord(&origin) 465 if err != nil { 466 return fmt.Errorf("could not build origin record: %w", err) 467 } 468 469 client, err := ss.GetClientByDID(ss.repoDID) 470 if err != nil { 471 return fmt.Errorf("could not get xrpc client for repoDID: %w", err) 472 } 473 474 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 475 476 var swapRecord *string 477 getOutput := comatproto.RepoGetRecord_Output{} 478 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 479 "repo": ss.repoDID, 480 "collection": "place.stream.broadcast.origin", 481 "rkey": rkey, 482 }, nil, &getOutput) 483 if err != nil { 484 xErr, ok := err.(*xrpc.Error) 485 if !ok { 486 return fmt.Errorf("could not get record: %w", err) 487 } 488 if xErr.StatusCode != 400 { // yes, they return "400" for "not found" 489 return fmt.Errorf("could not get record: %w", err) 490 } 491 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID) 492 } else { 493 log.Debug(ctx, "got record", "record", getOutput) 494 swapRecord = getOutput.Cid 495 } 496 497 inp := comatproto.RepoPutRecord_Input{ 498 Collection: "place.stream.broadcast.origin", 499 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 500 Rkey: rkey, 501 Repo: ss.repoDID, 502 SwapRecord: swapRecord, 503 } 504 out := comatproto.RepoPutRecord_Output{} 505 506 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 507 if err != nil { 508 return fmt.Errorf("could not create record: %w", err) 509 } 510 511 ss.lastOriginTime = time.Now() 512 return nil 513} 514 515func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 516 rs, err := renditions.GenerateRenditions(spseg) 517 if err != nil { 518 return fmt.Errorf("failed to generated renditions: %w", err) 519 } 520 521 if ss.lp == nil { 522 var err error 523 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 524 if err != nil { 525 return err 526 } 527 528 } 529 spmetrics.TranscodeAttemptsTotal.Inc() 530 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs) 531 if err != nil { 532 spmetrics.TranscodeErrorsTotal.Inc() 533 return err 534 } 535 if len(rs) != len(segs) { 536 spmetrics.TranscodeErrorsTotal.Inc() 537 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs)) 538 } 539 spmetrics.TranscodeSuccessesTotal.Inc() 540 aqt, err := aqtime.FromString(spseg.StartTime) 541 if err != nil { 542 return err 543 } 544 for i, seg := range segs { 545 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name) 546 log.Debug(ctx, "publishing segment", "rendition", rs[i]) 547 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name)) 548 if err != nil { 549 return fmt.Errorf("failed to create transcoded segment file: %w", err) 550 } 551 defer fd.Close() 552 _, err = fd.Write(seg) 553 if err != nil { 554 return fmt.Errorf("failed to write transcoded segment file: %w", err) 555 } 556 ss.Go(ctx, func() error { 557 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{ 558 Filepath: fd.Name(), 559 Data: seg, 560 }) 561 }) 562 563 } 564 return nil 565} 566 567func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 568 ss.Go(ctx, func() error { 569 return ss.AddToHLS(ctx, spseg, rendition, seg.Data) 570 }) 571 ss.Go(ctx, func() error { 572 return ss.AddToWebRTC(ctx, spseg, rendition, seg) 573 }) 574 return nil 575} 576 577func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error { 578 packet, err := media.Packetize(ctx, seg) 579 if err != nil { 580 return fmt.Errorf("failed to packetize segment: %w", err) 581 } 582 seg.PacketizedData = packet 583 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg) 584 return nil 585} 586 587func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error { 588 buf := bytes.Buffer{} 589 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf) 590 if err != nil { 591 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err) 592 } 593 // newSeg := &streamplace.Segment{ 594 // LexiconTypeID: "place.stream.segment", 595 // Id: spseg.Id, 596 // Creator: spseg.Creator, 597 // StartTime: spseg.StartTime, 598 // Duration: &dur, 599 // Audio: spseg.Audio, 600 // Video: spseg.Video, 601 // SigningKey: spseg.SigningKey, 602 // } 603 aqt, err := aqtime.FromString(spseg.StartTime) 604 if err != nil { 605 return fmt.Errorf("failed to parse segment start time: %w", err) 606 } 607 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len()) 608 rend, err := ss.hls.GetRendition(rendition) 609 if err != nil { 610 return fmt.Errorf("failed to get rendition: %w", err) 611 } 612 if err := rend.NewSegment(&media.Segment{ 613 Buf: &buf, 614 Duration: time.Duration(dur), 615 Time: aqt.Time(), 616 }); err != nil { 617 return fmt.Errorf("failed to create new segment: %w", err) 618 } 619 620 return nil 621} 622 623type XRPCClient interface { 624 Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error 625} 626 627func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) { 628 password, ok := ss.cli.DevAccountCreds[did] 629 if ok { 630 repo, err := ss.mod.GetRepoByHandleOrDID(did) 631 if err != nil { 632 return nil, fmt.Errorf("could not get repo by did: %w", err) 633 } 634 if repo == nil { 635 return nil, fmt.Errorf("repo not found for did: %s", did) 636 } 637 anonXRPCC := &xrpc.Client{ 638 Host: repo.PDS, 639 Client: &aqhttp.Client, 640 } 641 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{ 642 Identifier: repo.DID, 643 Password: password, 644 }) 645 if err != nil { 646 return nil, fmt.Errorf("could not create session: %w", err) 647 } 648 649 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 650 651 return &xrpc.Client{ 652 Host: repo.PDS, 653 Client: &aqhttp.Client, 654 Auth: &xrpc.AuthInfo{ 655 Did: repo.DID, 656 AccessJwt: session.AccessJwt, 657 RefreshJwt: session.RefreshJwt, 658 Handle: repo.Handle, 659 }, 660 }, nil 661 } 662 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 663 if err != nil { 664 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err) 665 } 666 if session == nil { 667 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID) 668 } 669 670 session, err = ss.op.RefreshIfNeeded(session) 671 if err != nil { 672 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err) 673 } 674 675 client, err := ss.op.GetXrpcClient(session) 676 if err != nil { 677 return nil, fmt.Errorf("could not get xrpc client: %w", err) 678 } 679 680 return client, nil 681}