Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "time"
9
10 "github.com/bluesky-social/indigo/lex/util"
11 "github.com/bluesky-social/indigo/repo"
12 "github.com/ipfs/go-cid"
13 "github.com/labstack/echo/v4"
14 "github.com/multiformats/go-multihash"
15 "stream.place/streamplace/pkg/spmetrics"
16
17 placestreamtypes "stream.place/streamplace/pkg/streamplace"
18)
19
20func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) {
21 if userDID == "" {
22 return nil, echo.NewHTTPError(http.StatusBadRequest, "User DID is required")
23 }
24 var beforeTime *time.Time
25 if before != "" {
26 parsedTime, err := time.Parse(time.RFC3339, before)
27 if err != nil {
28 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format")
29 }
30 beforeTime = &parsedTime
31 }
32
33 segments, err := s.model.LatestSegmentsForUser(userDID, limit, beforeTime)
34 if err != nil {
35 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch segments")
36 }
37
38 // Convert segments to the expected output format
39 output := &placestreamtypes.LiveGetSegments_Output{
40 Segments: make([]*placestreamtypes.Segment_SegmentView, len(segments)),
41 }
42
43 for i, segment := range segments {
44 record, err := segment.ToStreamplaceSegment()
45 if err != nil {
46 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert segment to streamplace segment: %s", err))
47 }
48 c, err := getCID(record)
49 if err != nil {
50 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to get CID: %s", err))
51 }
52 ltd := &util.LexiconTypeDecoder{Val: record}
53
54 output.Segments[i] = &placestreamtypes.Segment_SegmentView{
55 Record: ltd,
56 Cid: c.String(),
57 }
58 }
59
60 return output, nil
61}
62
63func (s *Server) handlePlaceStreamLiveGetLiveUsers(ctx context.Context, before string, limit int) (*placestreamtypes.LiveGetLiveUsers_Output, error) {
64 var beforeTime *time.Time
65 if before != "" {
66 parsedTime, err := time.Parse(time.RFC3339, before)
67 if err != nil {
68 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format")
69 }
70 beforeTime = &parsedTime
71 }
72 ls, err := s.model.GetLatestLivestreams(limit, beforeTime)
73 if err != nil {
74 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch livestreams")
75 }
76
77 streams := make([]*placestreamtypes.Livestream_LivestreamView, len(ls))
78
79 for i, l := range ls {
80 stream, err := l.ToLivestreamView()
81 if err != nil {
82 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert livestream to streamplace livestream: %s", err))
83 }
84 viewers := spmetrics.GetViewCount(stream.Author.Did)
85 stream.ViewerCount = &placestreamtypes.Livestream_ViewerCount{
86 LexiconTypeID: "place.stream.livestream#viewerCount",
87 Count: int64(viewers),
88 }
89 streams[i] = stream
90 }
91
92 liveUsers := &placestreamtypes.LiveGetLiveUsers_Output{
93 Streams: streams,
94 }
95
96 return liveUsers, nil
97}
98
99func getCID(record repo.CborMarshaler) (*cid.Cid, error) {
100 builder := cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256)
101 buf := bytes.NewBuffer(nil)
102 err := record.MarshalCBOR(buf)
103 if err != nil {
104 return nil, err
105 }
106 c, err := builder.Sum(buf.Bytes())
107 if err != nil {
108 return nil, err
109 }
110 return &c, nil
111}