fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package director
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "github.com/streamplace/oatproxy/pkg/oatproxy"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/aqtime"
17 "stream.place/streamplace/pkg/bus"
18 "stream.place/streamplace/pkg/config"
19 "stream.place/streamplace/pkg/livepeer"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/media"
22 "stream.place/streamplace/pkg/model"
23 "stream.place/streamplace/pkg/renditions"
24 "stream.place/streamplace/pkg/spmetrics"
25 "stream.place/streamplace/pkg/streamplace"
26 "stream.place/streamplace/pkg/thumbnail"
27)
28
29type StreamSession struct {
30 mm *media.MediaManager
31 mod model.Model
32 cli *config.CLI
33 bus *bus.Bus
34 op *oatproxy.OATProxy
35 hls *media.M3U8
36 lp *livepeer.LivepeerSession
37 repoDID string
38 segmentChan chan struct{}
39 lastStatus time.Time
40 lastStatusLock sync.Mutex
41 g *errgroup.Group
42 started chan struct{}
43 ctx context.Context
44 packets []bus.PacketizedSegment
45}
46
47func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error {
48 ctx, cancel := context.WithCancel(ctx)
49 ss.g, ctx = errgroup.WithContext(ctx)
50 sid := livepeer.RandomTrailer(8)
51 ctx = log.WithLogValues(ctx, "sid", sid)
52 ss.ctx = ctx
53 log.Log(ctx, "starting stream session")
54 defer cancel()
55 spseg, err := not.Segment.ToStreamplaceSegment()
56 if err != nil {
57 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
58 }
59 var allRenditions renditions.Renditions
60
61 if ss.cli.LivepeerGatewayURL != "" {
62 allRenditions, err = renditions.GenerateRenditions(spseg)
63 } else {
64 allRenditions = []renditions.Rendition{}
65 }
66 if err != nil {
67 return err
68 }
69 if spseg.Duration == nil {
70 return fmt.Errorf("segment duration is required to calculate bitrate")
71 }
72 dur := time.Duration(*spseg.Duration)
73 byteLen := len(not.Data)
74 bitrate := int(float64(byteLen) / dur.Seconds() * 8)
75 sourceRendition := renditions.Rendition{
76 Name: "source",
77 Bitrate: bitrate,
78 Width: spseg.Video[0].Width,
79 Height: spseg.Video[0].Height,
80 }
81 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...)
82 ss.hls = media.NewM3U8(allRenditions)
83
84 // for _, r := range allRenditions {
85 // g.Go(func() error {
86 // for {
87 // if ctx.Err() != nil {
88 // return nil
89 // }
90 // err := ss.mm.ToHLS(ctx, spseg.Creator, r.Name, ss.hls)
91 // if ctx.Err() != nil {
92 // return nil
93 // }
94 // log.Warn(ctx, "hls failed, retrying in 5 seconds", "error", err)
95 // time.Sleep(time.Second * 5)
96 // }
97 // })
98 // }
99
100 close(ss.started)
101
102 for {
103 select {
104 case <-ss.segmentChan:
105 // reset timer
106 case <-ctx.Done():
107 return ss.g.Wait()
108 // case <-time.After(time.Minute * 1):
109 case <-time.After(time.Second * 60):
110 log.Log(ctx, "no new segments for 1 minute, shutting down")
111 for _, r := range allRenditions {
112 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
113 }
114 cancel()
115 }
116 }
117}
118
119// Execute a goroutine in the context of the stream session. Errors are
120// non-fatal; if you actually want to melt the universe on an error you
121// should panic()
122func (ss *StreamSession) Go(ctx context.Context, f func() error) {
123 <-ss.started
124 ss.g.Go(func() error {
125 err := f()
126 if err != nil {
127 log.Error(ctx, "error in goroutine", "error", err)
128 }
129 return nil
130 })
131}
132
133func (ss *StreamSession) NewSegment(ctx context.Context, not *media.NewSegmentNotification) error {
134 <-ss.started
135 go func() {
136 select {
137 case <-ss.ctx.Done():
138 return
139 case ss.segmentChan <- struct{}{}:
140 }
141 }()
142 aqt := aqtime.FromTime(not.Segment.StartTime)
143 ctx = log.WithLogValues(ctx, "segID", not.Segment.ID, "repoDID", not.Segment.RepoDID, "timestamp", aqt.FileSafeString())
144 err := ss.mod.CreateSegment(not.Segment)
145 if err != nil {
146 return fmt.Errorf("could not add segment to database: %w", err)
147 }
148 spseg, err := not.Segment.ToStreamplaceSegment()
149 if err != nil {
150 return fmt.Errorf("could not convert segment to streamplace segment: %w", err)
151 }
152
153 ss.bus.Publish(spseg.Creator, spseg)
154 ss.Go(ctx, func() error {
155 return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
156 Filepath: not.Segment.ID,
157 Data: not.Data,
158 })
159 })
160
161 if ss.cli.Thumbnail {
162 ss.Go(ctx, func() error {
163 return ss.Thumbnail(ctx, spseg.Creator, not)
164 })
165 }
166
167 ss.Go(ctx, func() error {
168 return ss.UpdateStatus(ctx, spseg.Creator)
169 })
170
171 if ss.cli.LivepeerGatewayURL != "" {
172 ss.Go(ctx, func() error {
173 start := time.Now()
174 err := ss.Transcode(ctx, spseg, not.Data)
175 took := time.Since(start)
176 spmetrics.QueuedTranscodeDuration.WithLabelValues(spseg.Creator).Set(float64(took.Milliseconds()))
177 return err
178 })
179 }
180
181 return nil
182}
183
184func (ss *StreamSession) Thumbnail(ctx context.Context, repoDID string, not *media.NewSegmentNotification) error {
185 lock := thumbnail.GetThumbnailLock(not.Segment.RepoDID)
186 locked := lock.TryLock()
187 if !locked {
188 // we're already generating a thumbnail for this user, skip
189 return nil
190 }
191 defer lock.Unlock()
192 oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID)
193 if err != nil {
194 return err
195 }
196 if oldThumb != nil && not.Segment.StartTime.Sub(oldThumb.Segment.StartTime) < time.Minute {
197 // we have a thumbnail <60sec old, skip generating a new one
198 return nil
199 }
200 r := bytes.NewReader(not.Data)
201 aqt := aqtime.FromTime(not.Segment.StartTime)
202 fd, err := ss.cli.SegmentFileCreate(not.Segment.RepoDID, aqt, "jpeg")
203 if err != nil {
204 return err
205 }
206 defer fd.Close()
207 err = media.Thumbnail(ctx, r, fd, "jpeg")
208 if err != nil {
209 return err
210 }
211 thumb := &model.Thumbnail{
212 Format: "jpeg",
213 SegmentID: not.Segment.ID,
214 }
215 err = ss.mod.CreateThumbnail(thumb)
216 if err != nil {
217 return err
218 }
219 return nil
220}
221
222func getThumbnailCID(pv *bsky.FeedDefs_PostView) (*util.LexBlob, error) {
223 if pv == nil {
224 return nil, fmt.Errorf("post view is nil")
225 }
226 rec, ok := pv.Record.Val.(*bsky.FeedPost)
227 if !ok {
228 return nil, fmt.Errorf("post view record is not a feed post")
229 }
230 if rec.Embed == nil {
231 return nil, fmt.Errorf("post view embed is nil")
232 }
233 if rec.Embed.EmbedExternal == nil {
234 return nil, fmt.Errorf("post view embed external view is nil")
235 }
236 if rec.Embed.EmbedExternal.External == nil {
237 return nil, fmt.Errorf("post view embed external is nil")
238 }
239 if rec.Embed.EmbedExternal.External.Thumb == nil {
240 return nil, fmt.Errorf("post view embed external thumb is nil")
241 }
242 return rec.Embed.EmbedExternal.External.Thumb, nil
243}
244
245func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
246 ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
247 ss.lastStatusLock.Lock()
248 defer ss.lastStatusLock.Unlock()
249 if time.Since(ss.lastStatus) < time.Minute {
250 log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
251 return nil
252 }
253
254 session, err := ss.mod.GetSessionByDID(repoDID)
255 if err != nil {
256 return fmt.Errorf("could not get OAuth session for repoDID: %w", err)
257 }
258 if session == nil {
259 return fmt.Errorf("no session found for repoDID: %s", repoDID)
260 }
261
262 session, err = ss.op.RefreshIfNeeded(session)
263 if err != nil {
264 return fmt.Errorf("could not refresh session for repoDID: %w", err)
265 }
266
267 ls, err := ss.mod.GetLatestLivestreamForRepo(repoDID)
268 if err != nil {
269 return fmt.Errorf("could not get latest livestream for repoDID: %w", err)
270 }
271 lsv, err := ls.ToLivestreamView()
272 if err != nil {
273 return fmt.Errorf("could not convert livestream to streamplace livestream: %w", err)
274 }
275
276 post, err := ss.mod.GetFeedPost(ls.PostCID)
277 if err != nil {
278 return fmt.Errorf("could not get feed post: %w", err)
279 }
280 if post == nil {
281 return fmt.Errorf("feed post not found for livestream: %w", err)
282 }
283 postView, err := post.ToBskyPostView()
284 if err != nil {
285 return fmt.Errorf("could not convert feed post to bsky post view: %w", err)
286 }
287 thumb, err := getThumbnailCID(postView)
288 if err != nil {
289 return fmt.Errorf("could not get thumbnail cid: %w", err)
290 }
291
292 repo, err := ss.mod.GetRepoByHandleOrDID(repoDID)
293 if err != nil {
294 return fmt.Errorf("could not get repo for repoDID: %w", err)
295 }
296
297 lsr, ok := lsv.Record.Val.(*streamplace.Livestream)
298 if !ok {
299 return fmt.Errorf("livestream is not a streamplace livestream")
300 }
301
302 actorStatusEmbed := bsky.ActorStatus_Embed{
303 EmbedExternal: &bsky.EmbedExternal{
304 External: &bsky.EmbedExternal_External{
305 Title: lsr.Title,
306 Uri: fmt.Sprintf("https://%s/%s", ss.cli.PublicHost, repo.Handle),
307 Description: fmt.Sprintf("@%s is 🔴LIVE on %s", repo.Handle, ss.cli.PublicHost),
308 Thumb: thumb,
309 },
310 },
311 }
312
313 duration := int64(2)
314 status := bsky.ActorStatus{
315 Status: "live",
316 DurationMinutes: &duration,
317 Embed: &actorStatusEmbed,
318 CreatedAt: time.Now().Format(time.RFC3339),
319 }
320
321 client, err := ss.op.GetXrpcClient(session)
322 if err != nil {
323 return fmt.Errorf("could not get xrpc client: %w", err)
324 }
325
326 var swapRecord *string
327 getOutput := atproto.RepoGetRecord_Output{}
328 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{
329 "repo": repoDID,
330 "collection": "app.bsky.actor.status",
331 "rkey": "self",
332 }, nil, &getOutput)
333 if err != nil {
334 xErr, ok := err.(*xrpc.Error)
335 if !ok {
336 return fmt.Errorf("could not get record: %w", err)
337 }
338 if xErr.StatusCode != 400 { // yes, they return "400" for "not found"
339 return fmt.Errorf("could not get record: %w", err)
340 }
341 log.Debug(ctx, "record not found, creating", "repoDID", repoDID)
342 } else {
343 log.Debug(ctx, "got record", "record", getOutput)
344 swapRecord = getOutput.Cid
345 }
346
347 inp := atproto.RepoPutRecord_Input{
348 Collection: "app.bsky.actor.status",
349 Record: &util.LexiconTypeDecoder{Val: &status},
350 Rkey: "self",
351 Repo: repoDID,
352 SwapRecord: swapRecord,
353 }
354 out := atproto.RepoPutRecord_Output{}
355
356 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
357 if err != nil {
358 return fmt.Errorf("could not create record: %w", err)
359 }
360 log.Debug(ctx, "created status record", "out", out)
361
362 ss.lastStatus = time.Now()
363
364 return nil
365}
366
367func (ss *StreamSession) Transcode(ctx context.Context, spseg *streamplace.Segment, data []byte) error {
368 rs, err := renditions.GenerateRenditions(spseg)
369 if err != nil {
370 return fmt.Errorf("failed to generated renditions: %w", err)
371 }
372
373 if ss.lp == nil {
374 var err error
375 ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL)
376 if err != nil {
377 return err
378 }
379
380 }
381 spmetrics.TranscodeAttemptsTotal.Inc()
382 segs, err := ss.lp.PostSegmentToGateway(ctx, data, spseg, rs)
383 if err != nil {
384 spmetrics.TranscodeErrorsTotal.Inc()
385 return err
386 }
387 if len(rs) != len(segs) {
388 spmetrics.TranscodeErrorsTotal.Inc()
389 return fmt.Errorf("expected %d renditions, got %d", len(rs), len(segs))
390 }
391 spmetrics.TranscodeSuccessesTotal.Inc()
392 aqt, err := aqtime.FromString(spseg.StartTime)
393 if err != nil {
394 return err
395 }
396 for i, seg := range segs {
397 ctx := log.WithLogValues(ctx, "rendition", rs[i].Name)
398 log.Debug(ctx, "publishing segment", "rendition", rs[i])
399 fd, err := ss.cli.SegmentFileCreate(spseg.Creator, aqt, fmt.Sprintf("%s.mp4", rs[i].Name))
400 if err != nil {
401 return fmt.Errorf("failed to create transcoded segment file: %w", err)
402 }
403 defer fd.Close()
404 _, err = fd.Write(seg)
405 if err != nil {
406 return fmt.Errorf("failed to write transcoded segment file: %w", err)
407 }
408 ss.Go(ctx, func() error {
409 return ss.AddPlaybackSegment(ctx, spseg, rs[i].Name, &bus.Seg{
410 Filepath: fd.Name(),
411 Data: seg,
412 })
413 })
414
415 }
416 return nil
417}
418
419func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
420 ss.Go(ctx, func() error {
421 return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
422 })
423 ss.Go(ctx, func() error {
424 return ss.AddToWebRTC(ctx, spseg, rendition, seg)
425 })
426 return nil
427}
428
429func (ss *StreamSession) AddToWebRTC(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
430 packet, err := media.Packetize(ctx, seg)
431 if err != nil {
432 return fmt.Errorf("failed to packetize segment: %w", err)
433 }
434 seg.PacketizedData = packet
435 ss.bus.PublishSegment(ctx, spseg.Creator, rendition, seg)
436 return nil
437}
438
439func (ss *StreamSession) AddToHLS(ctx context.Context, spseg *streamplace.Segment, rendition string, data []byte) error {
440 buf := bytes.Buffer{}
441 dur, err := media.MP4ToMPEGTS(ctx, bytes.NewReader(data), &buf)
442 if err != nil {
443 return fmt.Errorf("failed to convert MP4 to MPEG-TS: %w", err)
444 }
445 // newSeg := &streamplace.Segment{
446 // LexiconTypeID: "place.stream.segment",
447 // Id: spseg.Id,
448 // Creator: spseg.Creator,
449 // StartTime: spseg.StartTime,
450 // Duration: &dur,
451 // Audio: spseg.Audio,
452 // Video: spseg.Video,
453 // SigningKey: spseg.SigningKey,
454 // }
455 aqt, err := aqtime.FromString(spseg.StartTime)
456 if err != nil {
457 return fmt.Errorf("failed to parse segment start time: %w", err)
458 }
459 log.Debug(ctx, "transmuxed to mpegts, adding to hls", "rendition", rendition, "size", buf.Len())
460 rend, err := ss.hls.GetRendition(rendition)
461 if err != nil {
462 return fmt.Errorf("failed to get rendition: %w", err)
463 }
464 if err := rend.NewSegment(&media.Segment{
465 Buf: &buf,
466 Duration: time.Duration(dur),
467 Time: aqt.Time(),
468 }); err != nil {
469 return fmt.Errorf("failed to create new segment: %w", err)
470 }
471
472 return nil
473}