Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/docs-openapi 111 lines 3.4 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "time" 9 10 "github.com/bluesky-social/indigo/lex/util" 11 "github.com/bluesky-social/indigo/repo" 12 "github.com/ipfs/go-cid" 13 "github.com/labstack/echo/v4" 14 "github.com/multiformats/go-multihash" 15 "stream.place/streamplace/pkg/spmetrics" 16 17 placestreamtypes "stream.place/streamplace/pkg/streamplace" 18) 19 20func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) { 21 if userDID == "" { 22 return nil, echo.NewHTTPError(http.StatusBadRequest, "User DID is required") 23 } 24 var beforeTime *time.Time 25 if before != "" { 26 parsedTime, err := time.Parse(time.RFC3339, before) 27 if err != nil { 28 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format") 29 } 30 beforeTime = &parsedTime 31 } 32 33 segments, err := s.model.LatestSegmentsForUser(userDID, limit, beforeTime) 34 if err != nil { 35 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch segments") 36 } 37 38 // Convert segments to the expected output format 39 output := &placestreamtypes.LiveGetSegments_Output{ 40 Segments: make([]*placestreamtypes.Segment_SegmentView, len(segments)), 41 } 42 43 for i, segment := range segments { 44 record, err := segment.ToStreamplaceSegment() 45 if err != nil { 46 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert segment to streamplace segment: %s", err)) 47 } 48 c, err := getCID(record) 49 if err != nil { 50 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to get CID: %s", err)) 51 } 52 ltd := &util.LexiconTypeDecoder{Val: record} 53 54 output.Segments[i] = &placestreamtypes.Segment_SegmentView{ 55 Record: ltd, 56 Cid: c.String(), 57 } 58 } 59 60 return output, nil 61} 62 63func (s *Server) handlePlaceStreamLiveGetLiveUsers(ctx context.Context, before string, limit int) (*placestreamtypes.LiveGetLiveUsers_Output, error) { 64 var beforeTime *time.Time 65 if before != "" { 66 parsedTime, err := time.Parse(time.RFC3339, before) 67 if err != nil { 68 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format") 69 } 70 beforeTime = &parsedTime 71 } 72 ls, err := s.model.GetLatestLivestreams(limit, beforeTime) 73 if err != nil { 74 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch livestreams") 75 } 76 77 streams := make([]*placestreamtypes.Livestream_LivestreamView, len(ls)) 78 79 for i, l := range ls { 80 stream, err := l.ToLivestreamView() 81 if err != nil { 82 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert livestream to streamplace livestream: %s", err)) 83 } 84 viewers := spmetrics.GetViewCount(stream.Author.Did) 85 stream.ViewerCount = &placestreamtypes.Livestream_ViewerCount{ 86 LexiconTypeID: "place.stream.livestream#viewerCount", 87 Count: int64(viewers), 88 } 89 streams[i] = stream 90 } 91 92 liveUsers := &placestreamtypes.LiveGetLiveUsers_Output{ 93 Streams: streams, 94 } 95 96 return liveUsers, nil 97} 98 99func getCID(record repo.CborMarshaler) (*cid.Cid, error) { 100 builder := cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256) 101 buf := bytes.NewBuffer(nil) 102 err := record.MarshalCBOR(buf) 103 if err != nil { 104 return nil, err 105 } 106 c, err := builder.Sum(buf.Bytes()) 107 if err != nil { 108 return nil, err 109 } 110 return &c, nil 111}