Live video on the AT Protocol

replication: implement websocket replication

+519 -16
+13 -7
js/docs/src/content/docs/lex-reference/broadcast/place-stream-broadcast-origin.md
··· 20 21 **Record Properties:** 22 23 - | Name | Type | Req'd | Description | Constraints | 24 - | ------------- | -------- | ----- | -------------------------------------------------------------------------- | ------------------ | 25 - | `streamer` | `string` | ✅ | DID of the streamer whose livestream is being published | Format: `did` | 26 - | `server` | `string` | ✅ | did of the server that's currently rebroadcasting the livestream | Format: `did` | 27 - | `broadcaster` | `string` | ❌ | did of the broadcaster that operates the server syndicating the livestream | Format: `did` | 28 - | `updatedAt` | `string` | ✅ | Periodically updated timestamp when this origin last saw a livestream | Format: `datetime` | 29 - | `irohTicket` | `string` | ❌ | Iroh ticket that can be used to access the livestream from the server | Max Length: 2048 | 30 31 --- 32 ··· 69 "type": "string", 70 "maxLength": 2048, 71 "description": "Iroh ticket that can be used to access the livestream from the server" 72 } 73 } 74 }
··· 20 21 **Record Properties:** 22 23 + | Name | Type | Req'd | Description | Constraints | 24 + | -------------- | -------- | ----- | -------------------------------------------------------------------------- | ------------------ | 25 + | `streamer` | `string` | ✅ | DID of the streamer whose livestream is being published | Format: `did` | 26 + | `server` | `string` | ✅ | did of the server that's currently rebroadcasting the livestream | Format: `did` | 27 + | `broadcaster` | `string` | ❌ | did of the broadcaster that operates the server syndicating the livestream | Format: `did` | 28 + | `updatedAt` | `string` | ✅ | Periodically updated timestamp when this origin last saw a livestream | Format: `datetime` | 29 + | `irohTicket` | `string` | ❌ | Iroh ticket that can be used to access the livestream from the server | Max Length: 2048 | 30 + | `websocketURL` | `string` | ❌ | URL of the websocket endpoint for the livestream | Format: `uri` | 31 32 --- 33 ··· 70 "type": "string", 71 "maxLength": 2048, 72 "description": "Iroh ticket that can be used to access the livestream from the server" 73 + }, 74 + "websocketURL": { 75 + "type": "string", 76 + "format": "uri", 77 + "description": "URL of the websocket endpoint for the livestream" 78 } 79 } 80 }
+76
js/docs/src/content/docs/lex-reference/live/place-stream-live-subscribesegments.md
···
··· 1 + --- 2 + title: place.stream.live.subscribeSegments 3 + description: Reference for the place.stream.live.subscribeSegments lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `subscription` 15 + 16 + Subscribe to a stream's new segments as they come in! 17 + 18 + **Parameters:** 19 + 20 + | Name | Type | Req'd | Description | Constraints | 21 + | ---------- | -------- | ----- | --------------------------------------- | ----------- | 22 + | `streamer` | `string` | ✅ | The DID of the streamer to subscribe to | | 23 + 24 + **Message:** 25 + 26 + - **Schema:** 27 + 28 + **Schema Type:** Union of:<br/>&nbsp;&nbsp;[`#segment`](#segment) 29 + 30 + --- 31 + 32 + <a name="segment"></a> 33 + 34 + ### `segment` 35 + 36 + **Type:** `bytes` 37 + 38 + MP4 file of a user's signed livestream segment 39 + 40 + --- 41 + 42 + ## Lexicon Source 43 + 44 + ```json 45 + { 46 + "lexicon": 1, 47 + "id": "place.stream.live.subscribeSegments", 48 + "defs": { 49 + "main": { 50 + "type": "subscription", 51 + "description": "Subscribe to a stream's new segments as they come in!", 52 + "parameters": { 53 + "type": "params", 54 + "required": ["streamer"], 55 + "properties": { 56 + "streamer": { 57 + "type": "string", 58 + "description": "The DID of the streamer to subscribe to" 59 + } 60 + } 61 + }, 62 + "message": { 63 + "schema": { 64 + "type": "union", 65 + "refs": ["#segment"] 66 + } 67 + }, 68 + "errors": [] 69 + }, 70 + "segment": { 71 + "type": "bytes", 72 + "description": "MP4 file of a user's signed livestream segment" 73 + } 74 + } 75 + } 76 + ```
+35
js/docs/src/content/docs/lex-reference/openapi.json
··· 652 ] 653 } 654 }, 655 "/xrpc/place.stream.graph.getFollowingUser": { 656 "get": { 657 "summary": "Get whether or not user A is following user B.", ··· 2097 "record": {} 2098 }, 2099 "required": ["cid", "record"] 2100 }, 2101 "com.atproto.repo.strongRef": { 2102 "type": "object",
··· 652 ] 653 } 654 }, 655 + "/xrpc/place.stream.live.subscribeSegments": { 656 + "get": { 657 + "summary": "Subscribe to a stream's new segments as they come in!", 658 + "operationId": "place.stream.live.subscribeSegments", 659 + "tags": ["place.stream.live"], 660 + "x-websocket": true, 661 + "responses": {}, 662 + "parameters": [ 663 + { 664 + "name": "streamer", 665 + "in": "query", 666 + "required": true, 667 + "description": "The DID of the streamer to subscribe to", 668 + "schema": { 669 + "type": "string", 670 + "description": "The DID of the streamer to subscribe to" 671 + } 672 + } 673 + ], 674 + "x-websocket-message": { 675 + "schema": { 676 + "oneOf": [ 677 + { 678 + "$ref": "#/components/schemas/place.stream.live.subscribeSegments_segment" 679 + } 680 + ] 681 + } 682 + } 683 + } 684 + }, 685 "/xrpc/place.stream.graph.getFollowingUser": { 686 "get": { 687 "summary": "Get whether or not user A is following user B.", ··· 2127 "record": {} 2128 }, 2129 "required": ["cid", "record"] 2130 + }, 2131 + "place.stream.live.subscribeSegments_segment": { 2132 + "type": "string", 2133 + "format": "byte", 2134 + "description": "MP4 file of a user's signed livestream segment" 2135 }, 2136 "com.atproto.repo.strongRef": { 2137 "type": "object",
+5
lexicons/place/stream/broadcast/origin.json
··· 34 "type": "string", 35 "maxLength": 2048, 36 "description": "Iroh ticket that can be used to access the livestream from the server" 37 } 38 } 39 }
··· 34 "type": "string", 35 "maxLength": 2048, 36 "description": "Iroh ticket that can be used to access the livestream from the server" 37 + }, 38 + "websocketURL": { 39 + "type": "string", 40 + "format": "uri", 41 + "description": "URL of the websocket endpoint for the livestream" 42 } 43 } 44 }
+31
lexicons/place/stream/live/subscribeSegments.json
···
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.live.subscribeSegments", 4 + "defs": { 5 + "main": { 6 + "type": "subscription", 7 + "description": "Subscribe to a stream's new segments as they come in!", 8 + "parameters": { 9 + "type": "params", 10 + "required": ["streamer"], 11 + "properties": { 12 + "streamer": { 13 + "type": "string", 14 + "description": "The DID of the streamer to subscribe to" 15 + } 16 + } 17 + }, 18 + "message": { 19 + "schema": { 20 + "type": "union", 21 + "refs": ["#segment"] 22 + } 23 + }, 24 + "errors": [] 25 + }, 26 + "segment": { 27 + "type": "bytes", 28 + "description": "MP4 file of a user's signed livestream segment" 29 + } 30 + } 31 + }
+1 -1
pkg/api/api.go
··· 145 Recorder: metrics.NewRecorder(metrics.Config{}), 146 }) 147 var xrpc http.Handler 148 - xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync) 149 if err != nil { 150 return nil, err 151 }
··· 145 Recorder: metrics.NewRecorder(metrics.Config{}), 146 }) 147 var xrpc http.Handler 148 + xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus) 149 if err != nil { 150 return nil, err 151 }
+4
pkg/cmd/streamplace.go
··· 38 "stream.place/streamplace/pkg/notifications" 39 "stream.place/streamplace/pkg/replication" 40 "stream.place/streamplace/pkg/replication/iroh_replicator" 41 "stream.place/streamplace/pkg/rtmps" 42 v0 "stream.place/streamplace/pkg/schema/v0" 43 "stream.place/streamplace/pkg/spmetrics" ··· 440 if err != nil { 441 return err 442 } 443 } 444 445 op := oatproxy.New(&oatproxy.Config{
··· 38 "stream.place/streamplace/pkg/notifications" 39 "stream.place/streamplace/pkg/replication" 40 "stream.place/streamplace/pkg/replication/iroh_replicator" 41 + "stream.place/streamplace/pkg/replication/websocketrep" 42 "stream.place/streamplace/pkg/rtmps" 43 v0 "stream.place/streamplace/pkg/schema/v0" 44 "stream.place/streamplace/pkg/spmetrics" ··· 441 if err != nil { 442 return err 443 } 444 + } 445 + if slices.Contains(cli.Replicators, config.ReplicatorWebsocket) { 446 + replicator = websocketrep.NewWebsocketReplicator(b, mod, mm) 447 } 448 449 op := oatproxy.New(&oatproxy.Config{
+5 -3
pkg/config/config.go
··· 132 DevAccountCreds map[string]string 133 StreamSessionTimeout time.Duration 134 Replicators []string 135 } 136 137 // ContentFilters represents the content filtering configuration ··· 146 } 147 148 const ( 149 - ReplicatorHTTP string = "http" 150 - ReplicatorIroh string = "iroh" 151 ) 152 153 func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { ··· 224 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 225 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 226 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 227 - cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorIroh}, "list of replication protocols to use (http, iroh)") 228 229 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 230 _ = starter.NewLivepeerConfig(lpFlags)
··· 132 DevAccountCreds map[string]string 133 StreamSessionTimeout time.Duration 134 Replicators []string 135 + WebsocketURL string 136 } 137 138 // ContentFilters represents the content filtering configuration ··· 147 } 148 149 const ( 150 + ReplicatorWebsocket string = "websocket" 151 + ReplicatorIroh string = "iroh" 152 ) 153 154 func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { ··· 225 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 226 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 227 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 228 + cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 229 + fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 230 231 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 232 _ = starter.NewLivepeerConfig(lpFlags)
+8
pkg/media/validate.go
··· 41 if err != nil { 42 return err 43 } 44 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 45 if err != nil { 46 return err
··· 41 if err != nil { 42 return err 43 } 44 + oldSeg, err := mm.model.GetSegment(*maniCert.Manifest.Label) 45 + if err != nil { 46 + return err 47 + } 48 + if oldSeg != nil { 49 + log.Warn(ctx, "segment already exists, skipping", "segmentID", *maniCert.Manifest.Label) 50 + return nil 51 + } 52 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 53 if err != nil { 54 return err
+4 -1
pkg/multitest/multitest_test.go
··· 112 for _, acct := range dev.Accounts { 113 devAccountCreds = append(devAccountCreds, fmt.Sprintf("%s=%s", acct.DID, acct.Password)) 114 } 115 env := map[string]string{ 116 - "SP_HTTP_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 117 "SP_HTTP_INTERNAL_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 118 "SP_RELAY_HOST": strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 119 "SP_PLC_URL": dev.PLCURL, ··· 122 "SP_STREAM_SESSION_TIMEOUT": "3s", 123 "SP_COLOR": "true", 124 "RUST_LOG": os.Getenv("RUST_LOG"), 125 } 126 _, file, _, _ := runtime.Caller(0) 127 buildDir := fmt.Sprintf("build-%s-%s", runtime.GOOS, runtime.GOARCH)
··· 112 for _, acct := range dev.Accounts { 113 devAccountCreds = append(devAccountCreds, fmt.Sprintf("%s=%s", acct.DID, acct.Password)) 114 } 115 + apiPort := nextPort() 116 env := map[string]string{ 117 + "SP_HTTP_ADDR": fmt.Sprintf("127.0.0.1:%d", apiPort), 118 "SP_HTTP_INTERNAL_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 119 "SP_RELAY_HOST": strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 120 "SP_PLC_URL": dev.PLCURL, ··· 123 "SP_STREAM_SESSION_TIMEOUT": "3s", 124 "SP_COLOR": "true", 125 "RUST_LOG": os.Getenv("RUST_LOG"), 126 + "SP_BROADCASTER_HOST": fmt.Sprintf("%s.example.com", name), 127 + "SP_WEBSOCKET_URL": fmt.Sprintf("ws://127.0.0.1:%d", apiPort), 128 } 129 _, file, _, _ := runtime.Caller(0) 130 buildDir := fmt.Sprintf("build-%s-%s", runtime.GOOS, runtime.GOARCH)
+199
pkg/replication/websocketrep/websocket_replicator.go
···
··· 1 + package websocketrep 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "net/url" 8 + "sync" 9 + 10 + "github.com/gorilla/websocket" 11 + "golang.org/x/sync/errgroup" 12 + "stream.place/streamplace/pkg/bus" 13 + "stream.place/streamplace/pkg/config" 14 + "stream.place/streamplace/pkg/log" 15 + "stream.place/streamplace/pkg/media" 16 + "stream.place/streamplace/pkg/model" 17 + "stream.place/streamplace/pkg/streamplace" 18 + ) 19 + 20 + type WebsocketReplicator struct { 21 + bus *bus.Bus 22 + cli *config.CLI 23 + mod model.Model 24 + conns map[string]bool 25 + connsMutex sync.RWMutex 26 + group *errgroup.Group 27 + mm *media.MediaManager 28 + } 29 + 30 + func NewWebsocketReplicator(bus *bus.Bus, mod model.Model, mm *media.MediaManager) *WebsocketReplicator { 31 + return &WebsocketReplicator{ 32 + bus: bus, 33 + mod: mod, 34 + conns: make(map[string]bool), 35 + connsMutex: sync.RWMutex{}, 36 + mm: mm, 37 + } 38 + } 39 + 40 + func (r *WebsocketReplicator) Start(ctx context.Context, cli *config.CLI) error { 41 + r.cli = cli 42 + _ = r.getMyWebsocketURL() // panic check 43 + r.group, ctx = errgroup.WithContext(ctx) 44 + return r.startBusSubscribe(ctx) 45 + } 46 + 47 + func (r *WebsocketReplicator) startBusSubscribe(ctx context.Context) error { 48 + // start subscription first so we're buffering new origins 49 + busCh := r.bus.Subscribe("") 50 + originViews, err := r.mod.GetRecentBroadcastOrigins(ctx) 51 + if err != nil { 52 + return fmt.Errorf("failed to get recent broadcast origins: %w", err) 53 + } 54 + for _, view := range originViews { 55 + err = r.handleOriginMessage(ctx, view) 56 + if err != nil { 57 + log.Error(ctx, "could not check origin", "error", err) 58 + } 59 + } 60 + log.Log(ctx, "Resumed recent broadcast origins", "count", len(originViews)) 61 + for { 62 + select { 63 + case <-ctx.Done(): 64 + return ctx.Err() 65 + case msg := <-busCh: 66 + if view, ok := msg.(*streamplace.BroadcastDefs_BroadcastOriginView); ok { 67 + log.Debug(ctx, "got broadcast origin view", "view", view) 68 + err = r.handleOriginMessage(ctx, view) 69 + if err != nil { 70 + log.Error(ctx, "could not handle origin message", "error", err) 71 + } 72 + } 73 + } 74 + } 75 + } 76 + 77 + func (r *WebsocketReplicator) handleOriginMessage(ctx context.Context, view *streamplace.BroadcastDefs_BroadcastOriginView) error { 78 + origin, ok := view.Record.Val.(*streamplace.BroadcastOrigin) 79 + if !ok { 80 + return fmt.Errorf("record is not a BroadcastOrigin") 81 + } 82 + ctx = log.WithLogValues(ctx, "streamer", view.Author.Did) 83 + if origin.WebsocketURL == nil { 84 + return fmt.Errorf("origin has no websocket URL author=%s", view.Author.Did) 85 + } 86 + if r.hasConnection(origin.Streamer) { 87 + log.Debug(ctx, "already has connection") 88 + return nil 89 + } 90 + myURL := r.getMyWebsocketURL() 91 + u, err := url.Parse(*origin.WebsocketURL) 92 + if err != nil { 93 + return fmt.Errorf("could not parse origin websocket URL: %w", err) 94 + } 95 + if u.Host == myURL.Host { 96 + log.Debug(ctx, "origin websocket URL is on this node, skipping") 97 + return nil 98 + } 99 + r.group.Go(func() error { 100 + err := r.openWebsocket(ctx, view) 101 + log.Error(ctx, "websocket connection error", "error", err) 102 + return nil 103 + }) 104 + return nil 105 + } 106 + 107 + func (r *WebsocketReplicator) openWebsocket(ctx context.Context, view *streamplace.BroadcastDefs_BroadcastOriginView) error { 108 + err := r.tryConnection(view.Author.Did) 109 + if err != nil { 110 + return err 111 + } 112 + defer r.removeConnection(view.Author.Did) 113 + origin, ok := view.Record.Val.(*streamplace.BroadcastOrigin) 114 + if !ok { 115 + return fmt.Errorf("record is not a BroadcastOrigin") 116 + } 117 + if origin.WebsocketURL == nil { 118 + return fmt.Errorf("origin has no websocket URL") 119 + } 120 + conn, _, err := websocket.DefaultDialer.Dial(*origin.WebsocketURL, nil) 121 + if err != nil { 122 + return fmt.Errorf("could not dial websocket: %w", err) 123 + } 124 + defer conn.Close() 125 + for { 126 + typ, msg, err := conn.ReadMessage() 127 + if err != nil { 128 + log.Error(ctx, "could not read message", "error", err) 129 + return fmt.Errorf("could not read message: %w", err) 130 + } 131 + if typ != websocket.BinaryMessage { 132 + log.Error(ctx, "expected binary message", "type", typ) 133 + return fmt.Errorf("expected binary message") 134 + } 135 + log.Debug(ctx, "received message", "type", typ, "length", len(msg)) 136 + err = r.mm.ValidateMP4(context.Background(), bytes.NewReader(msg), false) 137 + if err != nil { 138 + return fmt.Errorf("could not validate segment: %w", err) 139 + } 140 + } 141 + } 142 + 143 + func (r *WebsocketReplicator) hasConnection(origin string) bool { 144 + r.connsMutex.RLock() 145 + defer r.connsMutex.RUnlock() 146 + return r.conns[origin] 147 + } 148 + 149 + func (r *WebsocketReplicator) tryConnection(origin string) error { 150 + r.connsMutex.Lock() 151 + defer r.connsMutex.Unlock() 152 + if _, ok := r.conns[origin]; ok { 153 + return fmt.Errorf("connection already exists") 154 + } 155 + r.conns[origin] = true 156 + return nil 157 + } 158 + 159 + func (r *WebsocketReplicator) removeConnection(origin string) { 160 + r.connsMutex.Lock() 161 + defer r.connsMutex.Unlock() 162 + delete(r.conns, origin) 163 + } 164 + 165 + // we're pull-based, nothing to do here 166 + func (r *WebsocketReplicator) SendSegment(ctx context.Context, seg *media.NewSegmentNotification) error { 167 + return nil 168 + } 169 + 170 + func (r *WebsocketReplicator) BuildOriginRecord(origin *streamplace.BroadcastOrigin) error { 171 + u := r.getMyWebsocketURL() 172 + u.Path = "/xrpc/place.stream.live.subscribeSegments" 173 + u.RawQuery = url.Values{ 174 + "streamer": []string{origin.Streamer}, 175 + }.Encode() 176 + 177 + urlStr := u.String() 178 + origin.WebsocketURL = &urlStr 179 + return nil 180 + } 181 + 182 + func (r *WebsocketReplicator) getMyWebsocketURL() *url.URL { 183 + if r.cli.WebsocketURL != "" { 184 + u, err := url.Parse(r.cli.WebsocketURL) 185 + // chill to panic, we're going to check this on boot 186 + if err != nil { 187 + panic("invalid websocket override URL: " + r.cli.WebsocketURL) 188 + } 189 + return u 190 + } 191 + u := url.URL{ 192 + Scheme: "ws", 193 + Host: r.cli.ServerHost, 194 + } 195 + if r.cli.Secure { 196 + u.Scheme = "wss" 197 + } 198 + return &u 199 + }
+6 -1
pkg/spmetrics/spmetrics.go
··· 75 76 var WebsocketsOpen = promauto.NewGauge(prometheus.GaugeOpts{ 77 Name: "streamplace_websockets_open", 78 - Help: "number of open websockets", 79 }) 80 81 var SegmentSubscriptionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
··· 75 76 var WebsocketsOpen = promauto.NewGauge(prometheus.GaugeOpts{ 77 Name: "streamplace_websockets_open", 78 + Help: "number of open playback websockets", 79 + }) 80 + 81 + var ReplicationWebsocketsOpen = promauto.NewGauge(prometheus.GaugeOpts{ 82 + Name: "streamplace_replication_websockets_open", 83 + Help: "number of open replication websockets", 84 }) 85 86 var SegmentSubscriptionsOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
+61
pkg/spxrpc/place_stream_live.go
··· 7 "time" 8 9 "github.com/bluesky-social/indigo/lex/util" 10 "github.com/labstack/echo/v4" 11 "stream.place/streamplace/pkg/spid" 12 "stream.place/streamplace/pkg/spmetrics" 13 14 placestreamtypes "stream.place/streamplace/pkg/streamplace" 15 ) 16 17 func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) { 18 if userDID == "" { ··· 92 93 return liveUsers, nil 94 }
··· 7 "time" 8 9 "github.com/bluesky-social/indigo/lex/util" 10 + "github.com/gorilla/websocket" 11 "github.com/labstack/echo/v4" 12 + "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/spid" 14 "stream.place/streamplace/pkg/spmetrics" 15 16 placestreamtypes "stream.place/streamplace/pkg/streamplace" 17 ) 18 + 19 + var replicationUpgrader = websocket.Upgrader{ 20 + ReadBufferSize: 1024, 21 + WriteBufferSize: 1024 * 1024 * 10, // 10MB 22 + CheckOrigin: func(r *http.Request) bool { 23 + return true 24 + }, 25 + } 26 27 func (s *Server) handlePlaceStreamLiveGetSegments(ctx context.Context, before string, limit int, userDID string) (*placestreamtypes.LiveGetSegments_Output, error) { 28 if userDID == "" { ··· 102 103 return liveUsers, nil 104 } 105 + 106 + func (s *Server) handlePlaceStreamLiveSubscribeSegments(c echo.Context) error { 107 + user := c.QueryParam("streamer") 108 + if user == "" { 109 + return echo.NewHTTPError(http.StatusBadRequest, "User DID is required") 110 + } 111 + spmetrics.ReplicationWebsocketsOpen.Inc() 112 + defer spmetrics.ReplicationWebsocketsOpen.Dec() 113 + ws, err := replicationUpgrader.Upgrade(c.Response(), c.Request(), nil) 114 + if err != nil { 115 + return err 116 + } 117 + defer ws.Close() 118 + 119 + ctx, cancel := context.WithCancel(c.Request().Context()) 120 + defer cancel() 121 + go func() { 122 + 123 + segChan := s.bus.SubscribeSegmentBuf(ctx, user, "source", 2) 124 + defer s.bus.UnsubscribeSegment(ctx, user, "source", segChan) 125 + for { 126 + select { 127 + case <-ctx.Done(): 128 + log.Debug(ctx, "exiting segment reader") 129 + return 130 + case file := <-segChan.C: 131 + log.Debug(ctx, "got segment", "file", file.Filepath) 132 + err := ws.WriteMessage(websocket.BinaryMessage, file.Data) 133 + if err != nil { 134 + log.Error(ctx, "could not write message", "error", err) 135 + cancel() 136 + return 137 + } 138 + } 139 + } 140 + 141 + }() 142 + 143 + for { 144 + if ctx.Err() != nil { 145 + return ctx.Err() 146 + } 147 + // Read 148 + _, msg, err := ws.ReadMessage() 149 + if err != nil { 150 + c.Logger().Error(err) 151 + return err 152 + } 153 + log.Debug(c.Request().Context(), "received message", "message", string(msg)) 154 + } 155 + }
+5 -1
pkg/spxrpc/spxrpc.go
··· 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/config" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/model" ··· 29 OGImageCache *cache.Cache 30 ATSync *atproto.ATProtoSynchronizer 31 statefulDB *statedb.StatefulDB 32 } 33 34 - func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer) (*Server, error) { 35 e := echo.New() 36 s := &Server{ 37 e: e, ··· 40 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup 41 ATSync: atsync, 42 statefulDB: statefulDB, 43 } 44 e.Use(s.ErrorHandlingMiddleware()) 45 e.Use(s.ContextPreservingMiddleware()) ··· 61 return c.JSON(http.StatusOK, map[string]string{"version": cli.Build.Version}) 62 }) 63 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleComAtprotoSyncSubscribeRepos) 64 e.GET("/xrpc/*", s.HandleWildcard) 65 e.POST("/xrpc/*", s.HandleWildcard) 66 return s, nil
··· 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/atproto" 19 + "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/model" ··· 30 OGImageCache *cache.Cache 31 ATSync *atproto.ATProtoSynchronizer 32 statefulDB *statedb.StatefulDB 33 + bus *bus.Bus 34 } 35 36 + func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus) (*Server, error) { 37 e := echo.New() 38 s := &Server{ 39 e: e, ··· 42 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup 43 ATSync: atsync, 44 statefulDB: statefulDB, 45 + bus: bus, 46 } 47 e.Use(s.ErrorHandlingMiddleware()) 48 e.Use(s.ContextPreservingMiddleware()) ··· 64 return c.JSON(http.StatusOK, map[string]string{"version": cli.Build.Version}) 65 }) 66 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleComAtprotoSyncSubscribeRepos) 67 + e.GET("/xrpc/place.stream.live.subscribeSegments", s.handlePlaceStreamLiveSubscribeSegments) 68 e.GET("/xrpc/*", s.HandleWildcard) 69 e.POST("/xrpc/*", s.HandleWildcard) 70 return s, nil
+2
pkg/streamplace/broadcastorigin.go
··· 24 Streamer string `json:"streamer" cborgen:"streamer"` 25 // updatedAt: Periodically updated timestamp when this origin last saw a livestream 26 UpdatedAt string `json:"updatedAt" cborgen:"updatedAt"` 27 }
··· 24 Streamer string `json:"streamer" cborgen:"streamer"` 25 // updatedAt: Periodically updated timestamp when this origin last saw a livestream 26 UpdatedAt string `json:"updatedAt" cborgen:"updatedAt"` 27 + // websocketURL: URL of the websocket endpoint for the livestream 28 + WebsocketURL *string `json:"websocketURL,omitempty" cborgen:"websocketURL,omitempty"` 29 }
+59 -2
pkg/streamplace/cbor_gen.go
··· 3455 } 3456 3457 cw := cbg.NewCborWriter(w) 3458 - fieldCount := 6 3459 3460 if t.Broadcaster == nil { 3461 fieldCount-- 3462 } 3463 3464 if t.IrohTicket == nil { 3465 fieldCount-- 3466 } 3467 ··· 3620 } 3621 } 3622 } 3623 return nil 3624 } 3625 ··· 3648 3649 n := extra 3650 3651 - nameBuf := make([]byte, 11) 3652 for i := uint64(0); i < n; i++ { 3653 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 3654 if err != nil { ··· 3748 } 3749 3750 t.Broadcaster = (*string)(&sval) 3751 } 3752 } 3753
··· 3455 } 3456 3457 cw := cbg.NewCborWriter(w) 3458 + fieldCount := 7 3459 3460 if t.Broadcaster == nil { 3461 fieldCount-- 3462 } 3463 3464 if t.IrohTicket == nil { 3465 + fieldCount-- 3466 + } 3467 + 3468 + if t.WebsocketURL == nil { 3469 fieldCount-- 3470 } 3471 ··· 3624 } 3625 } 3626 } 3627 + 3628 + // t.WebsocketURL (string) (string) 3629 + if t.WebsocketURL != nil { 3630 + 3631 + if len("websocketURL") > 1000000 { 3632 + return xerrors.Errorf("Value in field \"websocketURL\" was too long") 3633 + } 3634 + 3635 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("websocketURL"))); err != nil { 3636 + return err 3637 + } 3638 + if _, err := cw.WriteString(string("websocketURL")); err != nil { 3639 + return err 3640 + } 3641 + 3642 + if t.WebsocketURL == nil { 3643 + if _, err := cw.Write(cbg.CborNull); err != nil { 3644 + return err 3645 + } 3646 + } else { 3647 + if len(*t.WebsocketURL) > 1000000 { 3648 + return xerrors.Errorf("Value in field t.WebsocketURL was too long") 3649 + } 3650 + 3651 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.WebsocketURL))); err != nil { 3652 + return err 3653 + } 3654 + if _, err := cw.WriteString(string(*t.WebsocketURL)); err != nil { 3655 + return err 3656 + } 3657 + } 3658 + } 3659 return nil 3660 } 3661 ··· 3684 3685 n := extra 3686 3687 + nameBuf := make([]byte, 12) 3688 for i := uint64(0); i < n; i++ { 3689 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 3690 if err != nil { ··· 3784 } 3785 3786 t.Broadcaster = (*string)(&sval) 3787 + } 3788 + } 3789 + // t.WebsocketURL (string) (string) 3790 + case "websocketURL": 3791 + 3792 + { 3793 + b, err := cr.ReadByte() 3794 + if err != nil { 3795 + return err 3796 + } 3797 + if b != cbg.CborNull[0] { 3798 + if err := cr.UnreadByte(); err != nil { 3799 + return err 3800 + } 3801 + 3802 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 3803 + if err != nil { 3804 + return err 3805 + } 3806 + 3807 + t.WebsocketURL = (*string)(&sval) 3808 } 3809 } 3810
+5
pkg/streamplace/livesubscribeSegments.go
···
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package streamplace 4 + 5 + // schema: place.stream.live.subscribeSegments