Live video on the AT Protocol
1package director
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"golang.org/x/sync/errgroup"
	"stream.place/streamplace/pkg/aqtime"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/livepeer"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/media"
	"stream.place/streamplace/pkg/media/segchanman"
	"stream.place/streamplace/pkg/model"
	"stream.place/streamplace/pkg/renditions"
	"stream.place/streamplace/pkg/spmetrics"
	"stream.place/streamplace/pkg/streamplace"
	"stream.place/streamplace/pkg/thumbnail"
)
29
30type StreamSession struct {
31 mm *media.MediaManager
32 mod model.Model
33 cli *config.CLI
34 bus *bus.Bus
35 op *oatproxy.OATProxy
36 hls *media.M3U8
37 lp *livepeer.LivepeerSession
38 repoDID string
39 segmentChan chan struct{}
40 lastStatus time.Time
41 lastStatusLock sync.Mutex
42}
43
44func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error {
45
46 sid := livepeer.RandomTrailer(8)
47 ctx = log.WithLogValues(ctx, "sid", sid)
48 ctx, cancel := context.WithCancel(ctx)
49 log.Log(ctx, "starting stream session")
50 defer cancel()
51 spseg, err := not.Segment.ToStreamplaceSegment()
52 if err != nil {
53 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
54 }
55 var allRenditions renditions.Renditions
56
57 if ss.cli.LivepeerGatewayURL != "" {
58 allRenditions, err = renditions.GenerateRenditions(spseg)
59 } else {
60 allRenditions = []renditions.Rendition{}
61 }
62 if err != nil {
63 return err
64 }
65 if spseg.Duration == nil {
66 return fmt.Errorf("segment duration is required to calculate bitrate")
67 }
68 dur := time.Duration(*spseg.Duration)
69 byteLen := len(not.Data)
70 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
71 sourceRendition := renditions.Rendition{
72 Name: "source",
73 Bitrate: bitrate,
74 Width: spseg.Video[0].Width,
75 Height: spseg.Video[0].Height,
76 }
77 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
78 ss.hls = media.NewM3U8(allRenditions)
79
80 g, ctx := errgroup.WithContext(ctx)
81
82 // for _, r := range allRenditions {
83 // g.Go(func() error {
84 // for {
85 // if ctx.Err() != nil {
86 // return nil
87 // }
88 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
89 // if ctx.Err() != nil {
90 // return nil
91 // }
92 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
93 // time.Sleep(time.Second * 5)
94 // }
95 // })
96 // }
97
98 for {
99 select {
100 case <-ss.segmentChan:
101 // reset timer
102 case <-ctx.Done():
103 return g.Wait()
104 // case <-time.After(time.Minute * 1):
105 case <-time.After(time.Second * 60):
106 log.Log(ctx, "no new segments for 1 minute, shutting down")
107 cancel()
108 }
109 }
110}
111
112func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error {
113 if ctx.Err() != nil {
114 return nil
115 }
116 ss.segmentChan <- struct{}{}
117 aqt := aqtime.FromTime(not.Segment.StartTime)
118 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString())
119 err := ss.mod.CreateSegment(not.Segment)
120 if err != nil {
121 return fmt.Errorf("could not add segment to database: %w", err)
122 }
123 spseg, err := not.Segment.ToStreamplaceSegment()
124 if err != nil {
125 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
126 }
127
128 ss.bus.Publish(spseg.Creator, spseg)
129 go ss.TryAddToHLS(ctx, spseg, "source", not.Data)
130
131 if ss.cli.Thumbnail {
132 go func() {
133 err := ss.Thumbnail(ctx, spseg.Creator, not)
134 if err != nil {
135 log.Error(ctx, "could not create thumbnail", "error", err)
136 }
137 }()
138 }
139
140 go func() {
141 err := ss.UpdateStatus(ctx, spseg.Creator)
142 if err != nil {
143 log.Error(ctx, "could not update status", "error", err)
144 }
145 }()
146
147 if ss.cli.LivepeerGatewayURL != "" {
148 go func() {
149 start := time.Now()
150 err := ss.Transcode(ctx, spseg, not.Data)
151 took := time.Since(start)
152 if err != nil {
153 log.Error(ctx, "could not transcode", "error", err, "took", took)
154 } else {
155 log.Log(ctx, "transcoded segment", "took", took)
156 }
157 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(time.Since(start).Milliseconds()))
158 }()
159 }
160
161 return nil
162}
163
164func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
165 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
166 locked := lock.TryLock()
167 if !locked {
168 // we're already generating a thumbnail for this user, skip
169 return nil
170 }
171 defer lock.Unlock()
172 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
173 if err != nil {
174 return err
175 }
176 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
177 // we have a thumbnail <60sec old, skip generating a new one
178 return nil
179 }
180 r := bytes.NewReader(not.Data)
181 aqt := aqtime.FromTime(not.Segment.StartTime)
182 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "png")
183 if err != nil {
184 return err
185 }
186 defer fd.Close()
187 err = media.Thumbnail(ctx, r, fd)
188 if err != nil {
189 return err
190 }
191 thumb := &model.Thumbnail{
192 Format: "png",
193 SegmentID: not.Segment.ID,
194 }
195 err = ss.mod.CreateThumbnail(thumb)
196 if err != nil {
197 return err
198 }
199 return nil
200}
201
202func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
203 if pv == nil {
204 return nil, fmt.Errorf("post view is nil")
205 }
206 rec, ok := pv.Record.Val.(*bsky.FeedPost)
207 if !ok {
208 return nil, fmt.Errorf("post view record is not a feed post")
209 }
210 if rec.Embed == nil {
211 return nil, fmt.Errorf("post view embed is nil")
212 }
213 if rec.Embed.EmbedExternal == nil {
214 return nil, fmt.Errorf("post view embed external view is nil")
215 }
216 if rec.Embed.EmbedExternal.External == nil {
217 return nil, fmt.Errorf("post view embed external is nil")
218 }
219 if rec.Embed.EmbedExternal.External.Thumb == nil {
220 return nil, fmt.Errorf("post view embed external thumb is nil")
221 }
222 return rec.Embed.EmbedExternal.External.Thumb, nil
223}
224
225func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
226 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
227 ss.lastStatusLock.Lock()
228 defer ss.lastStatusLock.Unlock()
229 if time.Since(ss.lastStatus) < time.Minute {
230 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
231 return nil
232 }
233
234 session, err := ss.mod.GetSessionByDID(repoDID)
235 if err != nil {
236 return fmt.Errorf("could not get session for repoDID: %w", err)
237 }
238 if session == nil {
239 return fmt.Errorf("no session found for repoDID: %s", repoDID)
240 }
241
242 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
243 if err != nil {
244 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
245 }
246 lsv, err := ls.ToLivestreamView()
247 if err != nil {
248 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
249 }
250
251 post, err := ss.mod.GetFeedPost(ls.PostCID)
252 if err != nil {
253 return fmt.Errorf("could not get feed post: %w", err)
254 }
255 postView, err := post.ToBskyPostView()
256 if err != nil {
257 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
258 }
259 thumb, err := getThumbnailCID(postView)
260 if err != nil {
261 return fmt.Errorf("could not get thumbnail cid: %w", err)
262 }
263
264 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
265 if err != nil {
266 return fmt.Errorf("could not get repo for repoDID: %w", err)
267 }
268
269 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
270 if !ok {
271 return fmt.Errorf("livestream is not a streamplace livestream")
272 }
273
274 actorStatusEmbed := bsky.ActorStatus_Embed{
275 EmbedExternal: &bsky.EmbedExternal{
276 External: &bsky.EmbedExternal_External{
277 Title: lsr.Title,
278 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle),
279 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost),
280 Thumb: thumb,
281 },
282 },
283 }
284
285 duration := int64(2)
286 status := bsky.ActorStatus{
287 Status: "live",
288 DurationMinutes: &duration,
289 Embed: &actorStatusEmbed,
290 CreatedAt: time.Now().Format(time.RFC3339),
291 }
292
293 client, err := ss.op.GetXrpcClient(session)
294 if err != nil {
295 return fmt.Errorf("could not get xrpc client: %w", err)
296 }
297
298 var swapRecord *string
299 getOutput := atproto.RepoGetRecord_Output{}
300 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
301 "repo": repoDID,
302 "collection": "app.bsky.actor.status",
303 "rkey": "self",
304 }, nil, &getOutput)
305 if err != nil {
306 xErr, ok := err.(*xrpc.Error)
307 if !ok {
308 return fmt.Errorf("could not get record: %w", err)
309 }
310 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
311 return fmt.Errorf("could not get record: %w", err)
312 }
313 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
314 } else {
315 log.Debug(ctx, "got record", "record", getOutput)
316 swapRecord = getOutput.Cid
317 }
318
319 inp := atproto.RepoPutRecord_Input{
320 Collection: "app.bsky.actor.status",
321 Record: &util.LexiconTypeDecoder{Val: &status},
322 Rkey: "self",
323 Repo: repoDID,
324 SwapRecord: swapRecord,
325 }
326 out := atproto.RepoPutRecord_Output{}
327
328 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
329 if err != nil {
330 return fmt.Errorf("could not create record: %w", err)
331 }
332 log.Debug(ctx, "created status record", "out", out)
333
334 ss.lastStatus = time.Now()
335
336 return nil
337}
338
339func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
340 rs, err := renditions.GenerateRenditions(spseg)
341 if err != nil {
342 return fmt.Errorf("failed to generated renditions: %w", err)
343 }
344
345 if ss.lp == nil {
346 var err error
347 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL)
348 if err != nil {
349 return err
350 }
351
352 }
353 spmetrics.TranscodeAttemptsTotal.Inc()
354 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
355 if err != nil {
356 spmetrics.TranscodeErrorsTotal.Inc()
357 return err
358 }
359 if len(rs) != len(segs) {
360 spmetrics.TranscodeErrorsTotal.Inc()
361 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
362 }
363 spmetrics.TranscodeSuccessesTotal.Inc()
364 aqt, err := aqtime.FromString(spseg.StartTime)
365 if err != nil {
366 return err
367 }
368 for i, seg := range segs {
369 log.Debug(ctx, "publishing segment", "rendition", rs[i])
370 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
371 if err != nil {
372 return fmt.Errorf("failed to create transcoded segment file: %w", err)
373 }
374 defer fd.Close()
375 _, err = fd.Write(seg)
376 if err != nil {
377 return fmt.Errorf("failed to write transcoded segment file: %w", err)
378 }
379 go ss.TryAddToHLS(ctx, spseg, rs[i].Name, seg)
380 go ss.mm.PublishSegment(ctx, spseg.Creator, rs[i].Name, &segchanman.Seg{
381 Filepath: fd.Name(),
382 Data: seg,
383 })
384 }
385 return nil
386}
387
388func (ss *StreamSession) TryAddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) {
389 ctx = log.WithLogValues(ctx, "rendition", rendition)
390 err := ss.AddToHLS(ctx, spseg, rendition, data)
391 if err != nil {
392 log.Error(ctx, "could not add to hls", "error", err)
393 }
394}
395
396func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
397 buf := bytes.Buffer{}
398 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
399 if err != nil {
400 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
401 }
402 // newSeg := &streamplace.Segment{
403 // LexiconTypeID: "place.stream.segment",
404 // Id: spseg.Id,
405 // Creator: spseg.Creator,
406 // StartTime: spseg.StartTime,
407 // Duration: &dur,
408 // Audio: spseg.Audio,
409 // Video: spseg.Video,
410 // SigningKey: spseg.SigningKey,
411 // }
412 aqt, err := aqtime.FromString(spseg.StartTime)
413 if err != nil {
414 return fmt.Errorf("failed to parse segment start time: %w", err)
415 }
416 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
417 if err := ss.hls.GetRendition(rendition).NewSegment(&media.Segment{
418 Buf: &buf,
419 Duration: time.Duration(dur),
420 Time: aqt.Time(),
421 }); err != nil {
422 return fmt.Errorf("failed to create new segment: %w", err)
423 }
424
425 return nil
426}