Live video on the AT Protocol
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "github.com/streamplace/oatproxy/pkg/oatproxy"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/aqtime"
17 "stream.place/streamplace/pkg/bus"
18 "stream.place/streamplace/pkg/config"
19 "stream.place/streamplace/pkg/livepeer"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/media"
22 "stream.place/streamplace/pkg/model"
23 "stream.place/streamplace/pkg/renditions"
24 "stream.place/streamplace/pkg/spmetrics"
25 "stream.place/streamplace/pkg/statedb"
26 "stream.place/streamplace/pkg/streamplace"
27 "stream.place/streamplace/pkg/thumbnail"
28)
29
30type StreamSession struct {
31 mm *media.MediaManager
32 mod model.Model
33 cli *config.CLI
34 bus *bus.Bus
35 op *oatproxy.OATProxy
36 hls *media.M3U8
37 lp *livepeer.LivepeerSession
38 repoDID string
39 segmentChan chan struct{}
40 lastStatus time.Time
41 lastStatusLock sync.Mutex
42 g *errgroup.Group
43 started chan struct{}
44 ctx context.Context
45 packets []bus.PacketizedSegment
46 statefulDB *statedb.StatefulDB
47}
48
49func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error {
50 ctx, cancel := context.WithCancel(ctx)
51 ss.g, ctx = errgroup.WithContext(ctx)
52 sid := livepeer.RandomTrailer(8)
53 ctx = log.WithLogValues(ctx, "sid", sid)
54 ss.ctx = ctx
55 log.Log(ctx, "starting stream session")
56 defer cancel()
57 spseg, err := not.Segment.ToStreamplaceSegment()
58 if err != nil {
59 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
60 }
61 var allRenditions renditions.Renditions
62
63 if ss.cli.LivepeerGatewayURL != "" {
64 allRenditions, err = renditions.GenerateRenditions(spseg)
65 } else {
66 allRenditions = []renditions.Rendition{}
67 }
68 if err != nil {
69 return err
70 }
71 if spseg.Duration == nil {
72 return fmt.Errorf("segment duration is required to calculate bitrate")
73 }
74 dur := time.Duration(*spseg.Duration)
75 byteLen := len(not.Data)
76 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
77 sourceRendition := renditions.Rendition{
78 Name: "source",
79 Bitrate: bitrate,
80 Width: spseg.Video[0].Width,
81 Height: spseg.Video[0].Height,
82 }
83 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
84 ss.hls = media.NewM3U8(allRenditions)
85
86 // for _, r := range allRenditions {
87 // g.Go(func() error {
88 // for {
89 // if ctx.Err() != nil {
90 // return nil
91 // }
92 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
93 // if ctx.Err() != nil {
94 // return nil
95 // }
96 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
97 // time.Sleep(time.Second * 5)
98 // }
99 // })
100 // }
101
102 close(ss.started)
103
104 for {
105 select {
106 case <-ss.segmentChan:
107 // reset timer
108 case <-ctx.Done():
109 return ss.g.Wait()
110 // case <-time.After(time.Minute * 1):
111 case <-time.After(time.Second * 60):
112 log.Log(ctx, "no new segments for 1 minute, shutting down")
113 for _, r := range allRenditions {
114 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
115 }
116 cancel()
117 }
118 }
119}
120
121// Execute a goroutine in the context of the stream session. Errors are
122// non-fatal; if you actually want to melt the universe on an error you
123// should panic()
124func (ss *StreamSession) Go(ctx context.Context, f func() error) {
125 <-ss.started
126 ss.g.Go(func() error {
127 err := f()
128 if err != nil {
129 log.Error(ctx, "error in goroutine", "error", err)
130 }
131 return nil
132 })
133}
134
135func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error {
136 <-ss.started
137 go func() {
138 select {
139 case <-ss.ctx.Done():
140 return
141 case ss.segmentChan <- struct{}{}:
142 }
143 }()
144 aqt := aqtime.FromTime(not.Segment.StartTime)
145 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString())
146 not.Segment.MediaData.Size = len(not.Data)
147 err := ss.mod.CreateSegment(not.Segment)
148 if err != nil {
149 return fmt.Errorf("could not add segment to database: %w", err)
150 }
151 spseg, err := not.Segment.ToStreamplaceSegment()
152 if err != nil {
153 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
154 }
155
156 ss.bus.Publish(spseg.Creator, spseg)
157 ss.Go(ctx, func() error {
158 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
159 Filepath: not.Segment.ID,
160 Data: not.Data,
161 })
162 })
163
164 if ss.cli.Thumbnail {
165 ss.Go(ctx, func() error {
166 return ss.Thumbnail(ctx, spseg.Creator, not)
167 })
168 }
169
170 ss.Go(ctx, func() error {
171 return ss.UpdateStatus(ctx, spseg.Creator)
172 })
173
174 if ss.cli.LivepeerGatewayURL != "" {
175 ss.Go(ctx, func() error {
176 start := time.Now()
177 err := ss.Transcode(ctx, spseg, not.Data)
178 took := time.Since(start)
179 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
180 return err
181 })
182 }
183
184 return nil
185}
186
187func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
188 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
189 locked := lock.TryLock()
190 if !locked {
191 // we're already generating a thumbnail for this user, skip
192 return nil
193 }
194 defer lock.Unlock()
195 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
196 if err != nil {
197 return err
198 }
199 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
200 // we have a thumbnail <60sec old, skip generating a new one
201 return nil
202 }
203 r := bytes.NewReader(not.Data)
204 aqt := aqtime.FromTime(not.Segment.StartTime)
205 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
206 if err != nil {
207 return err
208 }
209 defer fd.Close()
210 err = media.Thumbnail(ctx, r, fd, "jpeg")
211 if err != nil {
212 return err
213 }
214 thumb := &model.Thumbnail{
215 Format: "jpeg",
216 SegmentID: not.Segment.ID,
217 }
218 err = ss.mod.CreateThumbnail(thumb)
219 if err != nil {
220 return err
221 }
222 return nil
223}
224
225func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
226 if pv == nil {
227 return nil, fmt.Errorf("post view is nil")
228 }
229 rec, ok := pv.Record.Val.(*bsky.FeedPost)
230 if !ok {
231 return nil, fmt.Errorf("post view record is not a feed post")
232 }
233 if rec.Embed == nil {
234 return nil, fmt.Errorf("post view embed is nil")
235 }
236 if rec.Embed.EmbedExternal == nil {
237 return nil, fmt.Errorf("post view embed external view is nil")
238 }
239 if rec.Embed.EmbedExternal.External == nil {
240 return nil, fmt.Errorf("post view embed external is nil")
241 }
242 if rec.Embed.EmbedExternal.External.Thumb == nil {
243 return nil, fmt.Errorf("post view embed external thumb is nil")
244 }
245 return rec.Embed.EmbedExternal.External.Thumb, nil
246}
247
248func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
249 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
250 ss.lastStatusLock.Lock()
251 defer ss.lastStatusLock.Unlock()
252 if time.Since(ss.lastStatus) < time.Minute {
253 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
254 return nil
255 }
256
257 session, err := ss.statefulDB.GetSessionByDID(repoDID)
258 if err != nil {
259 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
260 }
261 if session == nil {
262 return fmt.Errorf("no session found for repoDID: %s", repoDID)
263 }
264
265 session, err = ss.op.RefreshIfNeeded(session)
266 if err != nil {
267 return fmt.Errorf("could not refresh session for repoDID: %w", err)
268 }
269
270 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
271 if err != nil {
272 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
273 }
274 lsv, err := ls.ToLivestreamView()
275 if err != nil {
276 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
277 }
278
279 post, err := ss.mod.GetFeedPost(ls.PostCID)
280 if err != nil {
281 return fmt.Errorf("could not get feed post: %w", err)
282 }
283 if post == nil {
284 return fmt.Errorf("feed post not found for livestream: %w", err)
285 }
286 postView, err := post.ToBskyPostView()
287 if err != nil {
288 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
289 }
290 thumb, err := getThumbnailCID(postView)
291 if err != nil {
292 return fmt.Errorf("could not get thumbnail cid: %w", err)
293 }
294
295 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
296 if err != nil {
297 return fmt.Errorf("could not get repo for repoDID: %w", err)
298 }
299
300 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
301 if !ok {
302 return fmt.Errorf("livestream is not a streamplace livestream")
303 }
304
305 canonicalUrl := fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle)
306
307 if lsr.CanonicalUrl != nil {
308 canonicalUrl = *lsr.CanonicalUrl
309 }
310
311 actorStatusEmbed := bsky.ActorStatus_Embed{
312 EmbedExternal: &bsky.EmbedExternal{
313 External: &bsky.EmbedExternal_External{
314 Title: lsr.Title,
315 Uri: canonicalUrl,
316 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost),
317 Thumb: thumb,
318 },
319 },
320 }
321
322 duration := int64(2)
323 status := bsky.ActorStatus{
324 Status: "app.bsky.actor.status#live",
325 DurationMinutes: &duration,
326 Embed: &actorStatusEmbed,
327 CreatedAt: time.Now().Format(time.RFC3339),
328 }
329
330 client, err := ss.op.GetXrpcClient(session)
331 if err != nil {
332 return fmt.Errorf("could not get xrpc client: %w", err)
333 }
334
335 var swapRecord *string
336 getOutput := atproto.RepoGetRecord_Output{}
337 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
338 "repo": repoDID,
339 "collection": "app.bsky.actor.status",
340 "rkey": "self",
341 }, nil, &getOutput)
342 if err != nil {
343 xErr, ok := err.(*xrpc.Error)
344 if !ok {
345 return fmt.Errorf("could not get record: %w", err)
346 }
347 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
348 return fmt.Errorf("could not get record: %w", err)
349 }
350 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
351 } else {
352 log.Debug(ctx, "got record", "record", getOutput)
353 swapRecord = getOutput.Cid
354 }
355
356 inp := atproto.RepoPutRecord_Input{
357 Collection: "app.bsky.actor.status",
358 Record: &util.LexiconTypeDecoder{Val: &status},
359 Rkey: "self",
360 Repo: repoDID,
361 SwapRecord: swapRecord,
362 }
363 out := atproto.RepoPutRecord_Output{}
364
365 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
366 if err != nil {
367 return fmt.Errorf("could not create record: %w", err)
368 }
369 log.Debug(ctx, "created status record", "out", out)
370
371 ss.lastStatus = time.Now()
372
373 return nil
374}
375
376func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
377 rs, err := renditions.GenerateRenditions(spseg)
378 if err != nil {
379 return fmt.Errorf("failed to generated renditions: %w", err)
380 }
381
382 if ss.lp == nil {
383 var err error
384 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL)
385 if err != nil {
386 return err
387 }
388
389 }
390 spmetrics.TranscodeAttemptsTotal.Inc()
391 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
392 if err != nil {
393 spmetrics.TranscodeErrorsTotal.Inc()
394 return err
395 }
396 if len(rs) != len(segs) {
397 spmetrics.TranscodeErrorsTotal.Inc()
398 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
399 }
400 spmetrics.TranscodeSuccessesTotal.Inc()
401 aqt, err := aqtime.FromString(spseg.StartTime)
402 if err != nil {
403 return err
404 }
405 for i, seg := range segs {
406 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
407 log.Debug(ctx, "publishing segment", "rendition", rs[i])
408 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
409 if err != nil {
410 return fmt.Errorf("failed to create transcoded segment file: %w", err)
411 }
412 defer fd.Close()
413 _, err = fd.Write(seg)
414 if err != nil {
415 return fmt.Errorf("failed to write transcoded segment file: %w", err)
416 }
417 ss.Go(ctx, func() error {
418 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
419 Filepath: fd.Name(),
420 Data: seg,
421 })
422 })
423
424 }
425 return nil
426}
427
428func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
429 ss.Go(ctx, func() error {
430 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
431 })
432 ss.Go(ctx, func() error {
433 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
434 })
435 return nil
436}
437
438func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
439 packet, err := media.Packetize(ctx, seg)
440 if err != nil {
441 return fmt.Errorf("failed to packetize segment: %w", err)
442 }
443 seg.PacketizedData = packet
444 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
445 return nil
446}
447
448func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
449 buf := bytes.Buffer{}
450 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
451 if err != nil {
452 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
453 }
454 // newSeg := &streamplace.Segment{
455 // LexiconTypeID: "place.stream.segment",
456 // Id: spseg.Id,
457 // Creator: spseg.Creator,
458 // StartTime: spseg.StartTime,
459 // Duration: &dur,
460 // Audio: spseg.Audio,
461 // Video: spseg.Video,
462 // SigningKey: spseg.SigningKey,
463 // }
464 aqt, err := aqtime.FromString(spseg.StartTime)
465 if err != nil {
466 return fmt.Errorf("failed to parse segment start time: %w", err)
467 }
468 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
469 rend, err := ss.hls.GetRendition(rendition)
470 if err != nil {
471 return fmt.Errorf("failed to get rendition: %w", err)
472 }
473 if err := rend.NewSegment(&media.Segment{
474 Buf: &buf,
475 Duration: time.Duration(dur),
476 Time: aqt.Time(),
477 }); err != nil {
478 return fmt.Errorf("failed to create new segment: %w", err)
479 }
480
481 return nil
482}