Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/streamplace/oatproxy/pkg/oatproxy"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/aqhttp"
18 "stream.place/streamplace/pkg/aqtime"
19 "stream.place/streamplace/pkg/bus"
20 "stream.place/streamplace/pkg/config"
21 "stream.place/streamplace/pkg/livepeer"
22 "stream.place/streamplace/pkg/log"
23 "stream.place/streamplace/pkg/media"
24 "stream.place/streamplace/pkg/model"
25 "stream.place/streamplace/pkg/renditions"
26 "stream.place/streamplace/pkg/replication"
27 "stream.place/streamplace/pkg/spmetrics"
28 "stream.place/streamplace/pkg/statedb"
29 "stream.place/streamplace/pkg/streamplace"
30 "stream.place/streamplace/pkg/thumbnail"
31)
32
33type StreamSession struct {
34 mm *media.MediaManager
35 mod model.Model
36 cli *config.CLI
37 bus *bus.Bus
38 op *oatproxy.OATProxy
39 hls *media.M3U8
40 lp *livepeer.LivepeerSession
41 repoDID string
42 segmentChan chan struct{}
43 lastStatus time.Time
44 lastStatusCID *string
45 lastStatusLock sync.Mutex
46 lastOriginTime time.Time
47 lastOriginLock sync.Mutex
48 g *errgroup.Group
49 started chan struct{}
50 ctx context.Context
51 packets []bus.PacketizedSegment
52 statefulDB *statedb.StatefulDB
53 replicator replication.Replicator
54}
55
56func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
57 ctx, cancel := context.WithCancel(ctx)
58 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Inc()
59 ss.g, ctx = errgroup.WithContext(ctx)
60 sid := livepeer.RandomTrailer(8)
61 ctx = log.WithLogValues(ctx, "sid", sid, "streamer", notif.Segment.RepoDID)
62 ss.ctx = ctx
63 log.Log(ctx, "starting stream session")
64 defer cancel()
65 spseg, err := notif.Segment.ToStreamplaceSegment()
66 if err != nil {
67 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
68 }
69 var allRenditions renditions.Renditions
70
71 if ss.cli.LivepeerGatewayURL != "" {
72 allRenditions, err = renditions.GenerateRenditions(spseg)
73 } else {
74 allRenditions = []renditions.Rendition{}
75 }
76 if err != nil {
77 return err
78 }
79 if spseg.Duration == nil {
80 return fmt.Errorf("segment duration is required to calculate bitrate")
81 }
82 dur := time.Duration(*spseg.Duration)
83 byteLen := len(notif.Data)
84 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
85 sourceRendition := renditions.Rendition{
86 Name: "source",
87 Bitrate: bitrate,
88 Width: spseg.Video[0].Width,
89 Height: spseg.Video[0].Height,
90 }
91 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
92 ss.hls = media.NewM3U8(allRenditions)
93
94 // for _, r := range allRenditions {
95 // g.Go(func() error {
96 // for {
97 // if ctx.Err() != nil {
98 // return nil
99 // }
100 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
101 // if ctx.Err() != nil {
102 // return nil
103 // }
104 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
105 // time.Sleep(time.Second * 5)
106 // }
107 // })
108 // }
109
110 close(ss.started)
111
112 for {
113 select {
114 case <-ss.segmentChan:
115 // reset timer
116 case <-ctx.Done():
117 return ss.g.Wait()
118 // case <-time.After(time.Minute * 1):
119 case <-time.After(ss.cli.StreamSessionTimeout):
120 log.Log(ctx, "stream session timeout, shutting down", "timeout", ss.cli.StreamSessionTimeout)
121 spmetrics.StreamSessions.WithLabelValues(notif.Segment.RepoDID).Dec()
122 for _, r := range allRenditions {
123 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
124 }
125 if notif.Local {
126 ss.Go(ctx, func() error {
127 return ss.DeleteStatus(spseg.Creator)
128 })
129 }
130 cancel()
131 }
132 }
133}
134
135// Execute a goroutine in the context of the stream session. Errors are
136// non-fatal; if you actually want to melt the universe on an error you
137// should panic()
138func (ss *StreamSession) Go(ctx context.Context, f func() error) {
139 <-ss.started
140 ss.g.Go(func() error {
141 err := f()
142 if err != nil {
143 log.Error(ctx, "error in stream_session goroutine", "error", err)
144 }
145 return nil
146 })
147}
148
149func (ss *StreamSession) NewSegment(ctx context.Context, notif *media.NewSegmentNotification) error {
150 <-ss.started
151 go func() {
152 select {
153 case <-ss.ctx.Done():
154 return
155 case ss.segmentChan <- struct{}{}:
156 }
157 }()
158 aqt := aqtime.FromTime(notif.Segment.StartTime)
159 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString())
160 notif.Segment.MediaData.Size = len(notif.Data)
161 err := ss.mod.CreateSegment(notif.Segment)
162 if err != nil {
163 return fmt.Errorf("could not add segment to database: %w", err)
164 }
165 spseg, err := notif.Segment.ToStreamplaceSegment()
166 if err != nil {
167 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
168 }
169
170 ss.bus.Publish(spseg.Creator, spseg)
171 ss.Go(ctx, func() error {
172 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
173 Filepath: notif.Segment.ID,
174 Data: notif.Data,
175 })
176 })
177
178 if ss.cli.Thumbnail {
179 ss.Go(ctx, func() error {
180 return ss.Thumbnail(ctx, spseg.Creator, notif)
181 })
182 }
183
184 if notif.Local {
185 ss.Go(ctx, func() error {
186 return ss.UpdateStatus(ctx, spseg.Creator)
187 })
188
189 ss.Go(ctx, func() error {
190 return ss.UpdateBroadcastOrigin(ctx)
191 })
192 }
193
194 if ss.cli.LivepeerGatewayURL != "" {
195 ss.Go(ctx, func() error {
196 start := time.Now()
197 err := ss.Transcode(ctx, spseg, notif.Data)
198 took := time.Since(start)
199 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
200 return err
201 })
202 }
203
204 // trigger a notification blast if this is a new livestream
205 if notif.Metadata.Livestream != nil {
206 ss.Go(ctx, func() error {
207 r, err := ss.mod.GetRepoByHandleOrDID(spseg.Creator)
208 if err != nil {
209 return fmt.Errorf("failed to get repo: %w", err)
210 }
211 livestreamModel, err := ss.mod.GetLatestLivestreamForRepo(spseg.Creator)
212 if err != nil {
213 return fmt.Errorf("failed to get latest livestream for repo: %w", err)
214 }
215 if livestreamModel == nil {
216 log.Warn(ctx, "no livestream found, skipping notification blast", "repoDID", spseg.Creator)
217 return nil
218 }
219 lsv, err := livestreamModel.ToLivestreamView()
220 if err != nil {
221 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
222 }
223 if !shouldNotify(lsv) {
224 log.Warn(ctx, "is not set to notify", "repoDID", spseg.Creator)
225 return nil
226 }
227 task := &statedb.NotificationTask{
228 Livestream: lsv,
229 PDSURL: r.PDS,
230 }
231 cp, err := ss.mod.GetChatProfile(ctx, spseg.Creator)
232 if err != nil {
233 return fmt.Errorf("failed to get chat profile: %w", err)
234 }
235 if cp != nil {
236 spcp, err := cp.ToStreamplaceChatProfile()
237 if err != nil {
238 return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err)
239 }
240 task.ChatProfile = spcp
241 }
242
243 _, err = ss.statefulDB.EnqueueTask(ctx, statedb.TaskNotification, task, statedb.WithTaskKey(fmt.Sprintf("notification-blast::%s", lsv.Uri)))
244 if err != nil {
245 log.Error(ctx, "failed to enqueue notification task", "err", err)
246 }
247 return ss.UpdateStatus(ctx, spseg.Creator)
248 })
249 } else {
250 log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator)
251 }
252
253 return nil
254}
255
256func shouldNotify(lsv *streamplace.Livestream_LivestreamView) bool {
257 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
258 if !ok {
259 return true
260 }
261 if lsvr.NotificationSettings == nil {
262 return true
263 }
264 settings := lsvr.NotificationSettings
265 if settings.PushNotification == nil {
266 return true
267 }
268 return *settings.PushNotification
269}
270
271func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
272 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
273 locked := lock.TryLock()
274 if !locked {
275 // we're already generating a thumbnail for this user, skip
276 return nil
277 }
278 defer lock.Unlock()
279 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
280 if err != nil {
281 return err
282 }
283 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
284 // we have a thumbnail <60sec old, skip generating a new one
285 return nil
286 }
287 r := bytes.NewReader(not.Data)
288 aqt := aqtime.FromTime(not.Segment.StartTime)
289 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
290 if err != nil {
291 return err
292 }
293 defer fd.Close()
294 err = media.Thumbnail(ctx, r, fd, "jpeg")
295 if err != nil {
296 return err
297 }
298 thumb := &model.Thumbnail{
299 Format: "jpeg",
300 SegmentID: not.Segment.ID,
301 }
302 err = ss.mod.CreateThumbnail(thumb)
303 if err != nil {
304 return err
305 }
306 return nil
307}
308
309func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
310 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
311 ss.lastStatusLock.Lock()
312 defer ss.lastStatusLock.Unlock()
313 if time.Since(ss.lastStatus) < time.Minute {
314 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
315 return nil
316 }
317
318 client, err := ss.GetClientByDID(repoDID)
319 if err != nil {
320 return fmt.Errorf("could not get xrpc client: %w", err)
321 }
322
323 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
324 if err != nil {
325 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
326 }
327 lsv, err := ls.ToLivestreamView()
328 if err != nil {
329 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
330 }
331
332 lsvr, ok := lsv.Record.Val.(*streamplace.Livestream)
333 if !ok {
334 return fmt.Errorf("livestream is not a streamplace livestream")
335 }
336 thumb := lsvr.Thumb
337
338 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
339 if err != nil {
340 return fmt.Errorf("could not get repo for repoDID: %w", err)
341 }
342
343 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
344 if !ok {
345 return fmt.Errorf("livestream is not a streamplace livestream")
346 }
347
348 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.BroadcasterHost, repo.Handle)
349
350 if lsr.CanonicalUrl != nil {
351 canonicalUrl = *lsr.CanonicalUrl
352 }
353
354 actorStatusEmbed := bsky.ActorStatus_Embed{
355 EmbedExternal: &bsky.EmbedExternal{
356 External: &bsky.EmbedExternal_External{
357 Title: lsr.Title,
358 Uri: canonicalUrl,
359 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.BroadcasterHost),
360 Thumb: thumb,
361 },
362 },
363 }
364
365 duration := int64(120)
366 status := bsky.ActorStatus{
367 Status: "app.bsky.actor.status#live",
368 DurationMinutes: &duration,
369 Embed: &actorStatusEmbed,
370 CreatedAt: time.Now().Format(time.RFC3339),
371 }
372
373 var swapRecord *string
374 getOutput := comatproto.RepoGetRecord_Output{}
375 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
376 "repo": repoDID,
377 "collection": "app.bsky.actor.status",
378 "rkey": "self",
379 }, nil, &getOutput)
380 if err != nil {
381 xErr, ok := err.(*xrpc.Error)
382 if !ok {
383 return fmt.Errorf("could not get record: %w", err)
384 }
385 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
386 return fmt.Errorf("could not get record: %w", err)
387 }
388 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
389 } else {
390 log.Debug(ctx, "got record", "record", getOutput)
391 swapRecord = getOutput.Cid
392 }
393
394 inp := comatproto.RepoPutRecord_Input{
395 Collection: "app.bsky.actor.status",
396 Record: &lexutil.LexiconTypeDecoder{Val: &status},
397 Rkey: "self",
398 Repo: repoDID,
399 SwapRecord: swapRecord,
400 }
401 out := comatproto.RepoPutRecord_Output{}
402
403 ss.lastStatusCID = &out.Cid
404
405 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
406 if err != nil {
407 return fmt.Errorf("could not create record: %w", err)
408 }
409 log.Debug(ctx, "created status record", "out", out)
410
411 ss.lastStatus = time.Now()
412
413 return nil
414}
415
416func (ss *StreamSession) DeleteStatus(repoDID string) error {
417 // need a special extra context because the stream session context is already cancelled
418 ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
419 ss.lastStatusLock.Lock()
420 defer ss.lastStatusLock.Unlock()
421 if ss.lastStatusCID == nil {
422 log.Debug(ctx, "no status cid to delete")
423 return nil
424 }
425 inp := comatproto.RepoDeleteRecord_Input{
426 Collection: "app.bsky.actor.status",
427 Rkey: "self",
428 Repo: repoDID,
429 }
430 inp.SwapRecord = ss.lastStatusCID
431 out := comatproto.RepoDeleteRecord_Output{}
432
433 client, err := ss.GetClientByDID(repoDID)
434 if err != nil {
435 return fmt.Errorf("could not get xrpc client: %w", err)
436 }
437
438 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.deleteRecord", map[string]any{}, inp, &out)
439 if err != nil {
440 return fmt.Errorf("could not delete record: %w", err)
441 }
442
443 ss.lastStatusCID = nil
444 return nil
445}
446
447var originUpdateInterval = time.Second * 30
448
449func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
450 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
451 ss.lastOriginLock.Lock()
452 defer ss.lastOriginLock.Unlock()
453 if time.Since(ss.lastOriginTime) < originUpdateInterval {
454 log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
455 return nil
456 }
457 broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
458 origin := streamplace.BroadcastOrigin{
459 Streamer: ss.repoDID,
460 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost),
461 Broadcaster: &broadcaster,
462 UpdatedAt: time.Now().UTC().Format(util.ISO8601),
463 }
464 err := ss.replicator.BuildOriginRecord(&origin)
465 if err != nil {
466 return fmt.Errorf("could not build origin record: %w", err)
467 }
468
469 client, err := ss.GetClientByDID(ss.repoDID)
470 if err != nil {
471 return fmt.Errorf("could not get xrpc client for repoDID: %w", err)
472 }
473
474 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost)
475
476 var swapRecord *string
477 getOutput := comatproto.RepoGetRecord_Output{}
478 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
479 "repo": ss.repoDID,
480 "collection": "place.stream.broadcast.origin",
481 "rkey": rkey,
482 }, nil, &getOutput)
483 if err != nil {
484 xErr, ok := err.(*xrpc.Error)
485 if !ok {
486 return fmt.Errorf("could not get record: %w", err)
487 }
488 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
489 return fmt.Errorf("could not get record: %w", err)
490 }
491 log.Debug(ctx, "record not found, creating", "repoDID", ss.repoDID)
492 } else {
493 log.Debug(ctx, "got record", "record", getOutput)
494 swapRecord = getOutput.Cid
495 }
496
497 inp := comatproto.RepoPutRecord_Input{
498 Collection: "place.stream.broadcast.origin",
499 Record: &lexutil.LexiconTypeDecoder{Val: &origin},
500 Rkey: rkey,
501 Repo: ss.repoDID,
502 SwapRecord: swapRecord,
503 }
504 out := comatproto.RepoPutRecord_Output{}
505
506 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
507 if err != nil {
508 return fmt.Errorf("could not create record: %w", err)
509 }
510
511 ss.lastOriginTime = time.Now()
512 return nil
513}
514
515func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
516 rs, err := renditions.GenerateRenditions(spseg)
517 if err != nil {
518 return fmt.Errorf("failed to generated renditions: %w", err)
519 }
520
521 if ss.lp == nil {
522 var err error
523 ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL)
524 if err != nil {
525 return err
526 }
527
528 }
529 spmetrics.TranscodeAttemptsTotal.Inc()
530 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
531 if err != nil {
532 spmetrics.TranscodeErrorsTotal.Inc()
533 return err
534 }
535 if len(rs) != len(segs) {
536 spmetrics.TranscodeErrorsTotal.Inc()
537 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
538 }
539 spmetrics.TranscodeSuccessesTotal.Inc()
540 aqt, err := aqtime.FromString(spseg.StartTime)
541 if err != nil {
542 return err
543 }
544 for i, seg := range segs {
545 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
546 log.Debug(ctx, "publishing segment", "rendition", rs[i])
547 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
548 if err != nil {
549 return fmt.Errorf("failed to create transcoded segment file: %w", err)
550 }
551 defer fd.Close()
552 _, err = fd.Write(seg)
553 if err != nil {
554 return fmt.Errorf("failed to write transcoded segment file: %w", err)
555 }
556 ss.Go(ctx, func() error {
557 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
558 Filepath: fd.Name(),
559 Data: seg,
560 })
561 })
562
563 }
564 return nil
565}
566
567func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
568 ss.Go(ctx, func() error {
569 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
570 })
571 ss.Go(ctx, func() error {
572 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
573 })
574 return nil
575}
576
577func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
578 packet, err := media.Packetize(ctx, seg)
579 if err != nil {
580 return fmt.Errorf("failed to packetize segment: %w", err)
581 }
582 seg.PacketizedData = packet
583 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
584 return nil
585}
586
587func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
588 buf := bytes.Buffer{}
589 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
590 if err != nil {
591 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
592 }
593 // newSeg := &streamplace.Segment{
594 // LexiconTypeID: "place.stream.segment",
595 // Id: spseg.Id,
596 // Creator: spseg.Creator,
597 // StartTime: spseg.StartTime,
598 // Duration: &dur,
599 // Audio: spseg.Audio,
600 // Video: spseg.Video,
601 // SigningKey: spseg.SigningKey,
602 // }
603 aqt, err := aqtime.FromString(spseg.StartTime)
604 if err != nil {
605 return fmt.Errorf("failed to parse segment start time: %w", err)
606 }
607 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
608 rend, err := ss.hls.GetRendition(rendition)
609 if err != nil {
610 return fmt.Errorf("failed to get rendition: %w", err)
611 }
612 if err := rend.NewSegment(&media.Segment{
613 Buf: &buf,
614 Duration: time.Duration(dur),
615 Time: aqt.Time(),
616 }); err != nil {
617 return fmt.Errorf("failed to create new segment: %w", err)
618 }
619
620 return nil
621}
622
623type XRPCClient interface {
624 Do(ctx context.Context, method string, contentType string, path string, queryParams map[string]any, body any, out any) error
625}
626
627func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) {
628 password, ok := ss.cli.DevAccountCreds[did]
629 if ok {
630 repo, err := ss.mod.GetRepoByHandleOrDID(did)
631 if err != nil {
632 return nil, fmt.Errorf("could not get repo by did: %w", err)
633 }
634 if repo == nil {
635 return nil, fmt.Errorf("repo not found for did: %s", did)
636 }
637 anonXRPCC := &xrpc.Client{
638 Host: repo.PDS,
639 Client: &aqhttp.Client,
640 }
641 session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{
642 Identifier: repo.DID,
643 Password: password,
644 })
645 if err != nil {
646 return nil, fmt.Errorf("could not create session: %w", err)
647 }
648
649 log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
650
651 return &xrpc.Client{
652 Host: repo.PDS,
653 Client: &aqhttp.Client,
654 Auth: &xrpc.AuthInfo{
655 Did: repo.DID,
656 AccessJwt: session.AccessJwt,
657 RefreshJwt: session.RefreshJwt,
658 Handle: repo.Handle,
659 },
660 }, nil
661 }
662 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID)
663 if err != nil {
664 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err)
665 }
666 if session == nil {
667 return nil, fmt.Errorf("no session found for repoDID: %s", ss.repoDID)
668 }
669
670 session, err = ss.op.RefreshIfNeeded(session)
671 if err != nil {
672 return nil, fmt.Errorf("could not refresh session for repoDID: %w", err)
673 }
674
675 client, err := ss.op.GetXrpcClient(session)
676 if err != nil {
677 return nil, fmt.Errorf("could not get xrpc client: %w", err)
678 }
679
680 return client, nil
681}