Live video on the AT Protocol
1package spxrpc
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 "time"
8
9 "github.com/bluesky-social/indigo/lex/util"
10 "github.com/labstack/echo/v4"
11 "stream.place/streamplace/pkg/spid"
12 "stream.place/streamplace/pkg/spmetrics"
13
14 placestreamtypes "stream.place/streamplace/pkg/streamplace"
15)
16
17func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) {
18 if userDID == "" {
19 return nil, echo.NewHTTPError(http.StatusBadRequest, "User DID is required")
20 }
21 var beforeTime *time.Time
22 if before != "" {
23 parsedTime, err := time.Parse(time.RFC3339, before)
24 if err != nil {
25 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format")
26 }
27 beforeTime = &parsedTime
28 }
29
30 segments, err := s.model.LatestSegmentsForUser(userDID, limit, beforeTime, nil)
31 if err != nil {
32 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch segments")
33 }
34
35 // Convert segments to the expected output format
36 output := &placestreamtypes.LiveGetSegments_Output{
37 Segments: make([]*placestreamtypes.Segment_SegmentView, len(segments)),
38 }
39
40 for i, segment := range segments {
41 record, err := segment.ToStreamplaceSegment()
42 if err != nil {
43 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert segment to streamplace segment: %s", err))
44 }
45 c, err := spid.GetCID(record)
46 if err != nil {
47 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to get CID: %s", err))
48 }
49 ltd := &util.LexiconTypeDecoder{Val: record}
50
51 output.Segments[i] = &placestreamtypes.Segment_SegmentView{
52 Record: ltd,
53 Cid: c.String(),
54 }
55 }
56
57 return output, nil
58}
59
60func (s *Server) handlePlaceStreamLiveGetLiveUsers(ctx context.Context, before string, limit int) (*placestreamtypes.LiveGetLiveUsers_Output, error) {
61 var beforeTime *time.Time
62 if before != "" {
63 parsedTime, err := time.Parse(time.RFC3339, before)
64 if err != nil {
65 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format")
66 }
67 beforeTime = &parsedTime
68 }
69 ls, err := s.model.GetLatestLivestreams(limit, beforeTime)
70 if err != nil {
71 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch livestreams")
72 }
73
74 streams := make([]*placestreamtypes.Livestream_LivestreamView, len(ls))
75
76 for i, l := range ls {
77 stream, err := l.ToLivestreamView()
78 if err != nil {
79 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert livestream to streamplace livestream: %s", err))
80 }
81 viewers := spmetrics.GetViewCount(stream.Author.Did)
82 stream.ViewerCount = &placestreamtypes.Livestream_ViewerCount{
83 LexiconTypeID: "place.stream.livestream#viewerCount",
84 Count: int64(viewers),
85 }
86 streams[i] = stream
87 }
88
89 liveUsers := &placestreamtypes.LiveGetLiveUsers_Output{
90 Streams: streams,
91 }
92
93 return liveUsers, nil
94}