Live video on the AT Protocol

xrpc: add place.stream.live.getSegments

+325 -2
+1 -1
js/app/features/bluesky/blueskySlice.tsx
··· 300 301 console.log("getting profiles"); 302 return await bskyAgent.getProfiles({ 303 - actors: actors, 304 }); 305 }, 306 {
··· 300 301 console.log("getting profiles"); 302 return await bskyAgent.getProfiles({ 303 + actors: [actors.join(",")], 304 }); 305 }, 306 {
+88
js/docs/src/content/docs/lex-reference/live/place-stream-live-getsegments.md
···
··· 1 + --- 2 + title: place.stream.live.getSegments 3 + description: Reference for the place.stream.live.getSegments lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `query` 15 + 16 + Get a list of livestream segments for a user 17 + 18 + **Parameters:** 19 + 20 + | Name | Type | Req'd | Description | Constraints | 21 + | --------- | --------- | ----- | ----------------------------------------- | ------------------------------------- | 22 + | `userDID` | `string` | ✅ | The DID of the potentially-following user | Format: `did` | 23 + | `limit` | `integer` | ❌ | | Min: 1<br/>Max: 100<br/>Default: `50` | 24 + | `before` | `string` | ❌ | | Format: `datetime` | 25 + 26 + **Output:** 27 + 28 + - **Encoding:** `application/json` 29 + - **Schema:** 30 + 31 + **Schema Type:** `object` 32 + 33 + | Name | Type | Req'd | Description | Constraints | 34 + | ---------- | ---------------------------------------------------------------------------------------------- | ----- | ----------- | ----------- | 35 + | `segments` | Array of [`place.stream.segment#segmentView`](/lex-reference/place-stream-segment#segmentview) | ❌ | | | 36 + 37 + --- 38 + 39 + ## Lexicon Source 40 + 41 + ```json 42 + { 43 + "lexicon": 1, 44 + "id": "place.stream.live.getSegments", 45 + "defs": { 46 + "main": { 47 + "type": "query", 48 + "description": "Get a list of livestream segments for a user", 49 + "parameters": { 50 + "type": "params", 51 + "required": ["userDID"], 52 + "properties": { 53 + "userDID": { 54 + "type": "string", 55 + "format": "did", 56 + "description": "The DID of the potentially-following user" 57 + }, 58 + "limit": { 59 + "type": "integer", 60 + "minimum": 1, 61 + "maximum": 100, 62 + "default": 50 63 + }, 64 + "before": { 65 + "type": "string", 66 + "format": "datetime" 67 + } 68 + } 69 + }, 70 + "output": { 71 + "encoding": "application/json", 72 + "schema": { 73 + "type": "object", 74 + "properties": { 75 + "segments": { 76 + "type": "array", 77 + "items": { 78 + "type": "ref", 79 + "ref": "place.stream.segment#segmentView" 80 + } 81 + } 82 + } 83 + } 84 + } 85 + } 86 + } 87 + } 88 + ```
+28
js/docs/src/content/docs/lex-reference/place-stream-segment.md
··· 79 80 --- 81 82 ## Lexicon Source 83 84 ```json ··· 177 }, 178 "den": { 179 "type": "integer" 180 } 181 } 182 }
··· 79 80 --- 81 82 + <a name="segmentview"></a> 83 + 84 + ### `segmentView` 85 + 86 + **Type:** `object` 87 + 88 + **Properties:** 89 + 90 + | Name | Type | Req'd | Description | Constraints | 91 + | -------- | --------- | ----- | ----------- | ------------- | 92 + | `cid` | `string` | ✅ | | Format: `cid` | 93 + | `record` | `unknown` | ✅ | | | 94 + 95 + --- 96 + 97 ## Lexicon Source 98 99 ```json ··· 192 }, 193 "den": { 194 "type": "integer" 195 + } 196 + } 197 + }, 198 + "segmentView": { 199 + "type": "object", 200 + "required": ["cid", "record"], 201 + "properties": { 202 + "cid": { 203 + "type": "string", 204 + "format": "cid" 205 + }, 206 + "record": { 207 + "type": "unknown" 208 } 209 } 210 }
+43
lexicons/place/stream/live/getSegments.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.live.getSegments", 4 + "defs": { 5 + "main": { 6 + "type": "query", 7 + "description": "Get a list of livestream segments for a user", 8 + "parameters": { 9 + "type": "params", 10 + "required": ["userDID"], 11 + "properties": { 12 + "userDID": { 13 + "type": "string", 14 + "format": "did", 15 + "description": "The DID of the potentially-following user" 16 + }, 17 + "limit": { 18 + "type": "integer", 19 + "minimum": 1, 20 + "maximum": 100, 21 + "default": 50 22 + }, 23 + "before": { "type": "string", "format": "datetime" } 24 + } 25 + }, 26 + "output": { 27 + "encoding": "application/json", 28 + "schema": { 29 + "type": "object", 30 + "properties": { 31 + "segments": { 32 + "type": "array", 33 + "items": { 34 + "type": "ref", 35 + "ref": "place.stream.segment#segmentView" 36 + } 37 + } 38 + } 39 + } 40 + } 41 + } 42 + } 43 + }
+8
lexicons/place/stream/segment.json
··· 77 "num": { "type": "integer" }, 78 "den": { "type": "integer" } 79 } 80 } 81 } 82 }
··· 77 "num": { "type": "integer" }, 78 "den": { "type": "integer" } 79 } 80 + }, 81 + "segmentView": { 82 + "type": "object", 83 + "required": ["cid", "record"], 84 + "properties": { 85 + "cid": { "type": "string", "format": "cid" }, 86 + "record": { "type": "unknown" } 87 + } 88 } 89 } 90 }
+1
pkg/model/model.go
··· 37 MostRecentSegments() ([]Segment, error) 38 MostRecentSegmentsWithStreamInfo() ([]SegmentWithStreamInfo, error) 39 LatestSegmentForUser(user string) (*Segment, error) 40 CreateThumbnail(thumb *Thumbnail) error 41 LatestThumbnailForUser(user string) (*Thumbnail, error) 42 GetSegment(id string) (*Segment, error)
··· 37 MostRecentSegments() ([]Segment, error) 38 MostRecentSegmentsWithStreamInfo() ([]SegmentWithStreamInfo, error) 39 LatestSegmentForUser(user string) (*Segment, error) 40 + LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) 41 CreateThumbnail(thumb *Thumbnail) error 42 LatestThumbnailForUser(user string) (*Thumbnail, error) 43 GetSegment(id string) (*Segment, error)
+19 -1
pkg/model/segment.go
··· 156 return &seg, nil 157 } 158 159 func (m *DBModel) GetLiveUsers() ([]Segment, error) { 160 var liveUsers []Segment 161 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time() ··· 324 // finally, make the resulting SegmentWithStreamInfos 325 resultWithStreamInfo := make([]SegmentWithStreamInfo, len(recentSegments)) 326 for i, seg := range recentSegments { 327 resultWithStreamInfo[i] = SegmentWithStreamInfo{ 328 Segment: seg, 329 - LivestreamView: livestreamMap[seg.RepoDID].Record, 330 } 331 } 332
··· 156 return &seg, nil 157 } 158 159 + func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) { 160 + var segs []Segment 161 + if before == nil { 162 + later := time.Now().Add(1000 * time.Hour) 163 + before = &later 164 + } 165 + err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ?", user, before.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 166 + if err != nil { 167 + return nil, err 168 + } 169 + return segs, nil 170 + } 171 + 172 func (m *DBModel) GetLiveUsers() ([]Segment, error) { 173 var liveUsers []Segment 174 thirtySecondsAgo := aqtime.FromTime(time.Now().Add(-30 * time.Second)).Time() ··· 337 // finally, make the resulting SegmentWithStreamInfos 338 resultWithStreamInfo := make([]SegmentWithStreamInfo, len(recentSegments)) 339 for i, seg := range recentSegments { 340 + view, ok := livestreamMap[seg.RepoDID] 341 + if !ok { 342 + log.Error(context.Background(), "No livestream view found for repo_did", "repo_did", seg.RepoDID) 343 + continue 344 + } 345 resultWithStreamInfo[i] = SegmentWithStreamInfo{ 346 Segment: seg, 347 + LivestreamView: view.Record, 348 } 349 } 350
+70
pkg/spxrpc/place_stream_live.go
···
··· 1 + package spxrpc 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "net/http" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/lex/util" 11 + "github.com/bluesky-social/indigo/repo" 12 + "github.com/ipfs/go-cid" 13 + "github.com/labstack/echo/v4" 14 + "github.com/multiformats/go-multihash" 15 + placestreamtypes "stream.place/streamplace/pkg/streamplace" 16 + ) 17 + 18 + func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) { 19 + var beforeTime *time.Time 20 + if before != "" { 21 + parsedTime, err := time.Parse(time.RFC3339, before) 22 + if err != nil { 23 + return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid 'before' parameter: must be RFC3339 format") 24 + } 25 + beforeTime = &parsedTime 26 + } 27 + 28 + segments, err := s.model.LatestSegmentsForUser(userDID, limit, beforeTime) 29 + if err != nil { 30 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch segments") 31 + } 32 + 33 + // Convert segments to the expected output format 34 + output := &placestreamtypes.LiveGetSegments_Output{ 35 + Segments: make([]*placestreamtypes.Segment_SegmentView, len(segments)), 36 + } 37 + 38 + for i, segment := range segments { 39 + record, err := segment.ToStreamplaceSegment() 40 + if err != nil { 41 + return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to convert segment to streamplace segment: %s", err)) 42 + } 43 + c, err := getCID(record) 44 + if err != nil { 45 + return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Failed to get CID: %s", err)) 46 + } 47 + ltd := &util.LexiconTypeDecoder{Val: record} 48 + 49 + output.Segments[i] = &placestreamtypes.Segment_SegmentView{ 50 + Record: ltd, 51 + Cid: c.String(), 52 + } 53 + } 54 + 55 + return output, nil 56 + } 57 + 58 + func getCID(record repo.CborMarshaler) (*cid.Cid, error) { 59 + builder := cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256) 60 + buf := bytes.NewBuffer(nil) 61 + err := record.MarshalCBOR(buf) 62 + if err != nil { 63 + return nil, err 64 + } 65 + c, err := builder.Sum(buf.Bytes()) 66 + if err != nil { 67 + return nil, err 68 + } 69 + return &c, nil 70 + }
+27
pkg/spxrpc/stubs.go
··· 97 98 func (s *Server) RegisterHandlersPlaceStream(e *echo.Echo) error { 99 e.GET("/xrpc/place.stream.graph.getFollowingUser", s.HandlePlaceStreamGraphGetFollowingUser) 100 return nil 101 } 102 ··· 109 var handleErr error 110 // func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context,subjectDID string,userDID string) (*placestreamtypes.GraphGetFollowingUser_Output, error) 111 out, handleErr = s.handlePlaceStreamGraphGetFollowingUser(ctx, subjectDID, userDID) 112 if handleErr != nil { 113 return handleErr 114 }
··· 97 98 func (s *Server) RegisterHandlersPlaceStream(e *echo.Echo) error { 99 e.GET("/xrpc/place.stream.graph.getFollowingUser", s.HandlePlaceStreamGraphGetFollowingUser) 100 + e.GET("/xrpc/place.stream.live.getSegments", s.HandlePlaceStreamLiveGetSegments) 101 return nil 102 } 103 ··· 110 var handleErr error 111 // func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context,subjectDID string,userDID string) (*placestreamtypes.GraphGetFollowingUser_Output, error) 112 out, handleErr = s.handlePlaceStreamGraphGetFollowingUser(ctx, subjectDID, userDID) 113 + if handleErr != nil { 114 + return handleErr 115 + } 116 + return c.JSON(200, out) 117 + } 118 + 119 + func (s *Server) HandlePlaceStreamLiveGetSegments(c echo.Context) error { 120 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamLiveGetSegments") 121 + defer span.End() 122 + before := c.QueryParam("before") 123 + 124 + var limit int 125 + if p := c.QueryParam("limit"); p != "" { 126 + var err error 127 + limit, err = strconv.Atoi(p) 128 + if err != nil { 129 + return err 130 + } 131 + } else { 132 + limit = 50 133 + } 134 + userDID := c.QueryParam("userDID") 135 + var out *placestreamtypes.LiveGetSegments_Output 136 + var handleErr error 137 + // func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context,before string,limit int,userDID string) (*placestreamtypes.LiveGetSegments_Output, error) 138 + out, handleErr = s.handlePlaceStreamLiveGetSegments(ctx, before, limit, userDID) 139 if handleErr != nil { 140 return handleErr 141 }
+34
pkg/streamplace/livegetSegments.go
···
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package streamplace 4 + 5 + // schema: place.stream.live.getSegments 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/xrpc" 11 + ) 12 + 13 + // LiveGetSegments_Output is the output of a place.stream.live.getSegments call. 14 + type LiveGetSegments_Output struct { 15 + Segments []*Segment_SegmentView `json:"segments,omitempty" cborgen:"segments,omitempty"` 16 + } 17 + 18 + // LiveGetSegments calls the XRPC method "place.stream.live.getSegments". 19 + // 20 + // userDID: The DID of the potentially-following user 21 + func LiveGetSegments(ctx context.Context, c *xrpc.Client, before string, limit int64, userDID string) (*LiveGetSegments_Output, error) { 22 + var out LiveGetSegments_Output 23 + 24 + params := map[string]interface{}{ 25 + "before": before, 26 + "limit": limit, 27 + "userDID": userDID, 28 + } 29 + if err := c.Do(ctx, xrpc.Query, "", "place.stream.live.getSegments", params, nil, &out); err != nil { 30 + return nil, err 31 + } 32 + 33 + return &out, nil 34 + }
+6
pkg/streamplace/streamsegment.go
··· 40 Num int64 `json:"num" cborgen:"num"` 41 } 42 43 // Segment_Video is a "video" in the place.stream.segment schema. 44 type Segment_Video struct { 45 Codec string `json:"codec" cborgen:"codec"`
··· 40 Num int64 `json:"num" cborgen:"num"` 41 } 42 43 + // Segment_SegmentView is a "segmentView" in the place.stream.segment schema. 44 + type Segment_SegmentView struct { 45 + Cid string `json:"cid" cborgen:"cid"` 46 + Record *util.LexiconTypeDecoder `json:"record" cborgen:"record"` 47 + } 48 + 49 // Segment_Video is a "video" in the place.stream.segment schema. 50 type Segment_Video struct { 51 Codec string `json:"codec" cborgen:"codec"`