Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

add RTMPS support (#172)

* rtmp: cheeky mistserver sidecar container?

* rtmp: signer cache for mistserver

* rtmps: clean up, add docker build

* media: move mkv_ingest to its own file

* media: clean up MKVIngest

* rtmps: add frontend

* docker: fix mistserver dir

authored by

Eli Mallon and committed by
Natalie B
cec8711f 14455c47

+674 -190
+19
.gitlab-ci.yml
··· 140 140 --destination "$CI_REGISTRY_IMAGE:$STREAMPLACE_BRANCH-amd64" 141 141 --destination "$CI_REGISTRY_IMAGE:$STREAMPLACE_VERSION-amd64" 142 142 143 + build-docker-mistserver: 144 + stage: build 145 + interruptible: true 146 + needs: 147 + - job: build 148 + artifacts: true 149 + image: 150 + name: gcr.io/kaniko-project/executor:v1.14.0-debug 151 + entrypoint: [""] 152 + timeout: 2 hours 153 + script: 154 + - /kaniko/executor 155 + --build-arg TARGETARCH=amd64 156 + --build-arg STREAMPLACE_URL=$STREAMPLACE_URL_LINUX_AMD64 157 + --context "${CI_PROJECT_DIR}" 158 + --dockerfile "${CI_PROJECT_DIR}/docker/mistserver.Dockerfile" 159 + --destination "$CI_REGISTRY_IMAGE:$STREAMPLACE_BRANCH-mistserver" 160 + --destination "$CI_REGISTRY_IMAGE:$STREAMPLACE_VERSION-mistserver" 161 + 143 162 build-docker-arm64: 144 163 stage: build 145 164 interruptible: true
+11
Makefile
··· 471 471 in-container: docker-build-builder 472 472 $(DOCKER_BIN) run $(DOCKER_OPTS) -v $$(pwd):$$(pwd) -w $$(pwd) --rm $(DOCKER_REF) bash -c "$(IN_CONTAINER_CMD)" 473 473 474 + STREAMPLACE_URL?=https://git.stream.place/streamplace/streamplace/-/package_files/10122/download 474 475 .PHONY: docker-release 475 476 docker-release: 476 477 cd docker \ 477 478 && docker build -f release.Dockerfile \ 478 479 --build-arg TARGETARCH=$(BUILDARCH) \ 480 + --build-arg STREAMPLACE_URL=$(STREAMPLACE_URL) \ 479 481 -t dist.stream.place/streamplace/streamplace \ 482 + . 483 + 484 + .PHONY: docker-mistserver 485 + docker-mistserver: 486 + cd docker \ 487 + && docker build -f mistserver.Dockerfile \ 488 + --build-arg TARGETARCH=$(BUILDARCH) \ 489 + --build-arg STREAMPLACE_URL=$(STREAMPLACE_URL) \ 490 + -t dist.stream.place/streamplace/streamplace:mistserver \ 480 491 . 481 492 482 493 .PHONY: ci-upload
+13
docker/mistserver.Dockerfile
··· 1 + ARG TARGETARCH 2 + FROM --platform=linux/$TARGETARCH ubuntu:24.04 3 + RUN apt update && apt install -y curl 4 + ARG STREAMPLACE_URL 5 + ENV STREAMPLACE_URL $STREAMPLACE_URL 6 + # strip the -cloudflare suffix from the url; we're on the git server we don't need to leave 7 + RUN export LOCAL_URL="$(echo $STREAMPLACE_URL | sed 's/-cloudflare//')" && echo "downloading $LOCAL_URL" && cd /usr/local/bin && curl -L "$LOCAL_URL" | tar xzv 8 + 9 + RUN apt-get update && apt-get install -y curl 10 + RUN curl -o - https://releases.mistserver.org/is/mistserver_64V3.6.1.tar.gz 2>/dev/null | sh 11 + RUN mkdir -p /config 12 + ADD ./docker/mistserver.json /config/mistserver.json 13 + CMD ["MistController", "-c", "/config/mistserver.json"]
+149
docker/mistserver.json
··· 1 + { 2 + "account": { 3 + "streamplace": { 4 + "password": "9bc4bd49515c5ade1fa94f8301c24473" 5 + } 6 + }, 7 + "auto_push": null, 8 + "bandwidth": { 9 + "exceptions": [ 10 + "::1", 11 + "127.0.0.0/8", 12 + "10.0.0.0/8", 13 + "192.168.0.0/16", 14 + "172.16.0.0/12" 15 + ] 16 + }, 17 + "config": { 18 + "accesslog": "LOG", 19 + "controller": { 20 + "interface": "127.0.0.1", 21 + "port": null, 22 + "username": null 23 + }, 24 + "defaultStream": null, 25 + "limits": null, 26 + "prometheus": "", 27 + "protocols": [ 28 + { 29 + "connector": "AAC" 30 + }, 31 + { 32 + "connector": "CMAF" 33 + }, 34 + { 35 + "connector": "EBML" 36 + }, 37 + { 38 + "connector": "FLAC" 39 + }, 40 + { 41 + "connector": "FLV" 42 + }, 43 + { 44 + "connector": "H264" 45 + }, 46 + { 47 + "connector": "HDS" 48 + }, 49 + { 50 + "connector": "HLS" 51 + }, 52 + { 53 + "connector": "HTTPTS" 54 + }, 55 + { 56 + "connector": "JPG" 57 + }, 58 + { 59 + "connector": "JSON" 60 + }, 61 + { 62 + "connector": "MP3" 63 + }, 64 + { 65 + "connector": "MP4" 66 + }, 67 + { 68 + "connector": "OGG" 69 + }, 70 + { 71 + "connector": "RTMP", 72 + "interface": "127.0.0.1", 73 + "port": 31935 74 + }, 75 + { 76 + "connector": "SDP" 77 + }, 78 + { 79 + "connector": "SubRip" 80 + }, 81 + { 82 + "connector": "WAV" 83 + }, 84 + { 85 + "connector": "HTTP", 86 + "interface": "127.0.0.1", 87 + "port": 28080, 88 + "pubaddr": [] 89 + } 90 + ], 91 + "serverid": null, 92 + "sessionInputMode": 15, 93 + "sessionOutputMode": 15, 94 + "sessionStreamInfoMode": 1, 95 + "sessionUnspecifiedMode": 0, 96 + "sessionViewerMode": 14, 97 + "tknMode": 15, 98 + "triggers": { 99 + "PUSH_REWRITE": [ 100 + { 101 + "handler": "http://127.0.0.1:39090/mist-trigger", 102 + "streams": [], 103 + "sync": true 104 + } 105 + ] 106 + }, 107 + "trustedproxy": [] 108 + }, 109 + "extwriters": null, 110 + "push_settings": { 111 + "maxspeed": 0, 112 + "wait": 3 113 + }, 114 + "streams": { 115 + "stream": { 116 + "debug": 5, 117 + "name": "stream", 118 + "processes": [ 119 + { 120 + "debug": 5, 121 + "exec": "streamplace live $wildcard", 122 + "exit_unmask": false, 123 + "inconsequential": false, 124 + "process": "MKVExec", 125 + "restart_type": "fixed" 126 + } 127 + ], 128 + "source": "push://", 129 + "stop_sessions": false, 130 + "tags": [] 131 + } 132 + }, 133 + "ui_settings": { 134 + "HTTPUrl": "http://127.0.0.1:28080/", 135 + "sort_autopushes": { 136 + "by": "Stream", 137 + "dir": 1 138 + }, 139 + "sort_pushes": { 140 + "by": "Statistics", 141 + "dir": 1 142 + }, 143 + "sortstreams": { 144 + "by": "name", 145 + "dir": 1 146 + } 147 + }, 148 + "variables": null 149 + }
+86 -24
js/app/components/live-dashboard/stream-key.tsx
··· 10 10 import { useAppDispatch, useAppSelector } from "store/hooks"; 11 11 import { View, Paragraph, Button, Text } from "tamagui"; 12 12 import { Redirect } from "components/aqlink"; 13 - import Waiting from "./waiting"; 14 13 const Row = ({ children }: { children: React.ReactNode }) => { 15 14 return ( 16 15 <View w="100%" f={1} fd="row" padding="$4"> ··· 36 35 }; 37 36 38 37 export default function StreamKeyScreen() { 38 + const [protocol, setProtocol] = useState("whip"); 39 39 const isReady = useAppSelector(selectIsReady); 40 40 if (!isReady) { 41 41 return <Loading />; ··· 49 49 if (!userProfile) { 50 50 return <Loading />; 51 51 } 52 + 52 53 return ( 53 54 <View 54 55 f={1} ··· 61 62 > 62 63 <View w="100%" maxWidth={600}> 63 64 <Row> 64 - <Left> 65 - <Paragraph>Service</Paragraph> 66 - </Left> 67 - <Right> 68 - <Paragraph>WHIP</Paragraph> 69 - </Right> 65 + <Button 66 + marginHorizontal={10} 67 + backgroundColor={ 68 + protocol === "whip" ? "$accentBackground" : "$grey2" 69 + } 70 + onPress={() => setProtocol("whip")} 71 + > 72 + WHIP 73 + </Button> 74 + <Button 75 + marginHorizontal={10} 76 + backgroundColor={ 77 + protocol === "rtmp" ? "$accentBackground" : "$grey2" 78 + } 79 + onPress={() => setProtocol("rtmp")} 80 + > 81 + RTMP (beta) 82 + </Button> 70 83 </Row> 71 - <Row> 72 - <Left> 73 - <Paragraph>Server</Paragraph> 74 - </Left> 75 - <Right> 76 - <Paragraph>{url}</Paragraph> 77 - </Right> 78 - </Row> 79 - <Row> 80 - <Left> 81 - <Paragraph>Bearer Token</Paragraph> 82 - </Left> 83 - <Right> 84 - <StreamKey /> 85 - </Right> 86 - </Row> 84 + {protocol === "whip" && <WHIPDescription url={url} />} 85 + {protocol === "rtmp" && <RTMPDescription url={url} />} 87 86 <Row> 88 87 <Left> 89 88 <Paragraph>Output Settings</Paragraph> ··· 99 98 </Right> 100 99 </Row> 101 100 </View> 102 - <Waiting /> 103 101 </View> 102 + ); 103 + } 104 + 105 + export function WHIPDescription({ url }: { url: string }) { 106 + return ( 107 + <> 108 + <Row> 109 + <Left> 110 + <Paragraph>Service</Paragraph> 111 + </Left> 112 + <Right> 113 + <Paragraph>WHIP</Paragraph> 114 + </Right> 115 + </Row> 116 + <Row> 117 + <Left> 118 + <Paragraph>Server</Paragraph> 119 + </Left> 120 + <Right> 121 + <Paragraph>{url}</Paragraph> 122 + </Right> 123 + </Row> 124 + <Row> 125 + <Left> 126 + <Paragraph>Bearer Token</Paragraph> 127 + </Left> 128 + <Right> 129 + <StreamKey /> 130 + </Right> 131 + </Row> 132 + </> 133 + ); 134 + } 135 + 136 + export function RTMPDescription({ url }: { url: string }) { 137 + const u = new URL(url); 138 + const rtmpUrl = `rtmps://${u.host}:1935/live`; 139 + return ( 140 + <> 141 + <Row> 142 + <Left> 143 + <Paragraph>Service</Paragraph> 144 + </Left> 145 + <Right> 146 + <Paragraph>Custom...</Paragraph> 147 + </Right> 148 + </Row> 149 + <Row> 150 + <Left> 151 + <Paragraph>Server</Paragraph> 152 + </Left> 153 + <Right> 154 + <Paragraph>{rtmpUrl}</Paragraph> 155 + </Right> 156 + </Row> 157 + <Row> 158 + <Left> 159 + <Paragraph>Stream Key</Paragraph> 160 + </Left> 161 + <Right> 162 + <StreamKey /> 163 + </Right> 164 + </Row> 165 + </> 104 166 ); 105 167 } 106 168
+4 -4
js/app/src/screens/live-dashboard.tsx
··· 12 12 import { useLiveUser } from "hooks/useLiveUser"; 13 13 import StreamKeyScreen from "components/live-dashboard/stream-key"; 14 14 import { VideoElementProvider } from "contexts/VideoElementContext"; 15 + import { Camera, FerrisWheel, X } from "@tamagui/lucide-icons"; 16 + import { H6, Text } from "tamagui"; 17 + import Waiting from "components/live-dashboard/waiting"; 18 + import { selectTelemetry } from "features/streamplace/streamplaceSlice"; 15 19 16 20 enum StreamSource { 17 21 Start, ··· 100 104 ); 101 105 } 102 106 103 - import { Camera, FerrisWheel, X } from "@tamagui/lucide-icons"; 104 - import { H6, Text } from "tamagui"; 105 - import Waiting from "components/live-dashboard/waiting"; 106 - import { selectTelemetry } from "features/streamplace/streamplaceSlice"; 107 107 const elems = [ 108 108 { 109 109 title: "Stream your camera!",
+5 -2
pkg/api/api.go
··· 59 59 60 60 connTracker *WebsocketTracker 61 61 62 - limiters map[string]*rate.Limiter 63 - limitersMu sync.Mutex 62 + limiters map[string]*rate.Limiter 63 + limitersMu sync.Mutex 64 + SignerCache map[string]media.MediaSigner 65 + SignerCacheMu sync.Mutex 64 66 } 65 67 66 68 type WebsocketTracker struct { ··· 87 89 Director: d, 88 90 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 89 91 limiters: make(map[string]*rate.Limiter), 92 + SignerCache: make(map[string]media.MediaSigner), 90 93 } 91 94 a.Mimes, err = updater.GetMimes() 92 95 if err != nil {
+45 -7
pkg/api/api_internal.go
··· 25 25 "golang.org/x/sync/errgroup" 26 26 "stream.place/streamplace/pkg/errors" 27 27 "stream.place/streamplace/pkg/log" 28 + "stream.place/streamplace/pkg/media" 28 29 "stream.place/streamplace/pkg/mist/mistconfig" 29 30 "stream.place/streamplace/pkg/mist/misttriggers" 30 31 "stream.place/streamplace/pkg/model" ··· 55 56 func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 56 57 router := httprouter.New() 57 58 broker := misttriggers.NewTriggerBroker() 58 - broker.OnPushOutStart(func(ctx context.Context, payload *misttriggers.PushOutStartPayload) (string, error) { 59 - return payload.URL, nil 60 - }) 59 + 61 60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 62 61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 62 + // Extract the last part of the URL path 63 + urlPath := payload.URL.Path 64 + parts := strings.Split(urlPath, "/") 65 + lastPart := "" 66 + if len(parts) > 0 { 67 + lastPart = parts[len(parts)-1] 68 + } 69 + mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 70 + if err != nil { 71 + return "", err 72 + } 63 73 64 74 ms := time.Now().UnixMilli() 65 - out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, payload.StreamName, ms) 75 + out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, mediaSigner.Streamer(), ms) 76 + a.SignerCacheMu.Lock() 77 + a.SignerCache[mediaSigner.Streamer()] = mediaSigner 78 + a.SignerCacheMu.Unlock() 79 + log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 66 80 67 81 return out, nil 68 82 }) ··· 239 253 }) 240 254 241 255 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 256 + key := p.ByName("key") 242 257 log.Log(ctx, "stream start") 243 - err := a.MediaManager.IngestStream(ctx, r.Body, a.MediaSigner) 258 + 259 + var mediaSigner media.MediaSigner 260 + var ok bool 261 + var err error 262 + parts := strings.Split(key, "_") 263 + 264 + if len(parts) == 2 { 265 + a.SignerCacheMu.Lock() 266 + mediaSigner, ok = a.SignerCache[parts[0]] 267 + a.SignerCacheMu.Unlock() 268 + if !ok { 269 + log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 270 + errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 271 + return 272 + } 273 + } else { 274 + mediaSigner, err = a.MakeMediaSigner(ctx, key) 275 + if err != nil { 276 + errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 277 + return 278 + } 279 + } 280 + 281 + err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 244 282 245 283 if err != nil { 246 284 log.Log(ctx, "stream error", "error", err) ··· 251 289 } 252 290 253 291 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 254 - router.POST("/stream/:key", handleIncomingStream) 255 - router.PUT("/stream/:key", handleIncomingStream) 292 + router.POST("/live/:key", handleIncomingStream) 293 + router.PUT("/live/:key", handleIncomingStream) 256 294 257 295 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 258 296 id := p.ByName("id")
+2 -89
pkg/api/playback.go
··· 4 4 "bufio" 5 5 "bytes" 6 6 "context" 7 - "crypto" 8 7 "fmt" 9 8 "io" 10 9 "net/http" ··· 12 11 "strings" 13 12 "time" 14 13 15 - atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 16 - "github.com/decred/dcrd/dcrec/secp256k1" 17 14 "github.com/julienschmidt/httprouter" 18 - "github.com/mr-tron/base58" 19 15 "github.com/pion/webrtc/v4" 20 16 "golang.org/x/sync/errgroup" 21 17 "stream.place/streamplace/pkg/aqtime" 22 - "stream.place/streamplace/pkg/atproto" 23 18 "stream.place/streamplace/pkg/constants" 24 19 "stream.place/streamplace/pkg/errors" 25 - apierrors "stream.place/streamplace/pkg/errors" 26 20 "stream.place/streamplace/pkg/log" 27 - "stream.place/streamplace/pkg/media" 28 21 "stream.place/streamplace/pkg/spmetrics" 29 22 ) 30 23 ··· 208 201 encoded = strings.TrimSpace(encoded) 209 202 } 210 203 211 - if len(encoded) < 2 || encoded[0] != 'z' { 212 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil) 213 - return 214 - } 215 - 216 - var addrBytes []byte 217 - var didBytes []byte 218 - priv, err := atcrypto.ParsePrivateMultibase(encoded) 219 - if err == nil { 220 - addrBytes = priv.Bytes() 221 - } else { 222 - decoded, err := base58.Decode(encoded[1:]) 223 - if err != nil { 224 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a base58btc string)", nil) 225 - return 226 - } 227 - addrBytes = decoded[:32] 228 - didBytes = decoded[32:] 229 - priv, err = atcrypto.ParsePrivateBytesK256(addrBytes) 230 - if err != nil { 231 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid atcrypto)", err) 232 - return 233 - } 234 - } 235 - 236 - key, _ := secp256k1.PrivKeyFromBytes(addrBytes) 237 - if key == nil { 238 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", nil) 239 - return 240 - } 241 - var signer crypto.Signer = key.ToECDSA() 242 - pub, err := priv.PublicKey() 243 - if err != nil { 244 - apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (could not parse as atcrypto)", err) 245 - return 246 - } 247 - 248 - did := string(didBytes) 249 - 250 - if did != "" { 251 - repo, err := a.ATSync.SyncBlueskyRepo(ctx, did, a.Model) 252 - if err != nil { 253 - apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 254 - return 255 - } 256 - err = a.CLI.StreamIsAllowed(repo.DID) 257 - if err != nil { 258 - apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 259 - return 260 - } 261 - signingKey, err := a.Model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 262 - if err != nil { 263 - apierrors.WriteHTTPUnauthorized(w, "signing key not found", err) 264 - return 265 - } 266 - if signingKey == nil { 267 - apierrors.WriteHTTPUnauthorized(w, "signing key not found", nil) 268 - return 269 - } 270 - } else { 271 - atkey, err := atproto.ParsePubKey(signer.Public()) 272 - if err != nil { 273 - apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 274 - return 275 - } 276 - did = atkey.DIDKey() 277 - err = a.CLI.StreamIsAllowed(did) 278 - if err != nil { 279 - apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 280 - return 281 - } 282 - } 283 - 284 - ctx = log.WithLogValues(ctx, "did", did) 285 - 286 - var mediaSigner media.MediaSigner 287 - if a.CLI.ExternalSigning { 288 - mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes) 289 - } else { 290 - mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer) 291 - } 204 + mediaSigner, err := a.MakeMediaSigner(ctx, encoded) 292 205 if err != nil { 293 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 206 + errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 294 207 return 295 208 } 296 209
+92
pkg/api/stream_key.go
··· 1 + package api 2 + 3 + import ( 4 + "context" 5 + "crypto" 6 + "fmt" 7 + 8 + atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 9 + "github.com/decred/dcrd/dcrec/secp256k1" 10 + "github.com/mr-tron/base58" 11 + "stream.place/streamplace/pkg/atproto" 12 + "stream.place/streamplace/pkg/log" 13 + "stream.place/streamplace/pkg/media" 14 + ) 15 + 16 + func (a *StreamplaceAPI) MakeMediaSigner(ctx context.Context, keyStr string) (media.MediaSigner, error) { 17 + if len(keyStr) < 2 || keyStr[0] != 'z' { 18 + return nil, fmt.Errorf("invalid authorization key (not a multibase base58btc string)") 19 + } 20 + 21 + var addrBytes []byte 22 + var didBytes []byte 23 + priv, err := atcrypto.ParsePrivateMultibase(keyStr) 24 + if err == nil { 25 + addrBytes = priv.Bytes() 26 + } else { 27 + decoded, err := base58.Decode(keyStr[1:]) 28 + if err != nil { 29 + return nil, fmt.Errorf("invalid authorization key (not a base58btc string)") 30 + } 31 + addrBytes = decoded[:32] 32 + didBytes = decoded[32:] 33 + priv, err = atcrypto.ParsePrivateBytesK256(addrBytes) 34 + if err != nil { 35 + return nil, fmt.Errorf("invalid authorization key (not valid atproto): %w", err) 36 + } 37 + } 38 + 39 + key, _ := secp256k1.PrivKeyFromBytes(addrBytes) 40 + if key == nil { 41 + return nil, fmt.Errorf("invalid authorization key (not valid secp256k1)") 42 + } 43 + var signer crypto.Signer = key.ToECDSA() 44 + pub, err := priv.PublicKey() 45 + if err != nil { 46 + return nil, fmt.Errorf("invalid authorization key (could not parse as atproto): %w", err) 47 + } 48 + 49 + did := string(didBytes) 50 + 51 + if did != "" { 52 + repo, err := a.ATSync.SyncBlueskyRepo(ctx, did, a.Model) 53 + if err != nil { 54 + return nil, fmt.Errorf("could not resolve streamplace key: %w", err) 55 + } 56 + err = a.CLI.StreamIsAllowed(repo.DID) 57 + if err != nil { 58 + return nil, fmt.Errorf("user is not allowed to stream: %w", err) 59 + } 60 + signingKey, err := a.Model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 61 + if err != nil { 62 + return nil, fmt.Errorf("signing key not found: %w", err) 63 + } 64 + if signingKey == nil { 65 + return nil, fmt.Errorf("signing key not found") 66 + } 67 + } else { 68 + atkey, err := atproto.ParsePubKey(signer.Public()) 69 + if err != nil { 70 + return nil, fmt.Errorf("invalid authorization key (not valid secp256k1): %w", err) 71 + } 72 + did = atkey.DIDKey() 73 + err = a.CLI.StreamIsAllowed(did) 74 + if err != nil { 75 + return nil, fmt.Errorf("user is not allowed to stream: %w", err) 76 + } 77 + } 78 + 79 + ctx = log.WithLogValues(ctx, "did", did) 80 + 81 + var mediaSigner media.MediaSigner 82 + if a.CLI.ExternalSigning { 83 + mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes) 84 + } else { 85 + mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer) 86 + } 87 + if err != nil { 88 + return nil, fmt.Errorf("invalid authorization key (not valid secp256k1): %w", err) 89 + } 90 + 91 + return mediaSigner, nil 92 + }
+45
pkg/cmd/live.go
··· 1 + package cmd 2 + 3 + import ( 4 + "fmt" 5 + "io" 6 + "net/http" 7 + "os" 8 + ) 9 + 10 + func Live(streamKey string) error { 11 + // Create the URL for the live stream endpoint 12 + url := fmt.Sprintf("http://127.0.0.1:39090/live/%s", streamKey) 13 + 14 + // Create a new HTTP request with POST method 15 + req, err := http.NewRequest("POST", url, os.Stdin) 16 + if err != nil { 17 + return fmt.Errorf("error creating request: %w", err) 18 + } 19 + 20 + // Set appropriate headers if needed 21 + req.Header.Set("Content-Type", "video/x-matroska") // Assuming MKV format, adjust if needed 22 + 23 + // Create HTTP client and send the request 24 + client := &http.Client{} 25 + resp, err := client.Do(req) 26 + if err != nil { 27 + return fmt.Errorf("error sending stream: %w", err) 28 + } 29 + defer resp.Body.Close() 30 + 31 + // Check response status 32 + if resp.StatusCode != http.StatusOK { 33 + body, _ := io.ReadAll(resp.Body) 34 + return fmt.Errorf("server returned non-OK status: %d %s - %s", 35 + resp.StatusCode, resp.Status, string(body)) 36 + } 37 + 38 + // Copy response to stdout (if any) 39 + _, err = io.Copy(os.Stdout, resp.Body) 40 + if err != nil { 41 + return fmt.Errorf("error reading response: %w", err) 42 + } 43 + 44 + return nil 45 + }
+16
pkg/cmd/streamplace.go
··· 28 28 "stream.place/streamplace/pkg/notifications" 29 29 "stream.place/streamplace/pkg/replication" 30 30 "stream.place/streamplace/pkg/replication/boring" 31 + "stream.place/streamplace/pkg/rtmps" 31 32 v0 "stream.place/streamplace/pkg/schema/v0" 32 33 "stream.place/streamplace/pkg/spmetrics" 33 34 ··· 79 80 os.Exit(1) 80 81 } 81 82 return Stream(os.Args[2]) 83 + } 84 + 85 + if len(os.Args) > 1 && os.Args[1] == "live" { 86 + if len(os.Args) != 3 { 87 + fmt.Println("usage: streamplace live [stream-key]") 88 + os.Exit(1) 89 + } 90 + return Live(os.Args[2]) 82 91 } 83 92 84 93 if len(os.Args) > 1 && os.Args[1] == "sign" { ··· 153 162 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 10, "rate limit for requests per second per ip") 154 163 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 10, "rate limit burst for requests per ip") 155 164 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 165 + fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 166 + fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 156 167 version := fs.Bool("version", false, "print version and exit") 157 168 158 169 if runtime.GOOS == "linux" { ··· 351 362 group.Go(func() error { 352 363 return a.ServeHTTPRedirect(ctx) 353 364 }) 365 + if cli.RTMPServerAddon != "" { 366 + group.Go(func() error { 367 + return rtmps.ServeRTMPS(ctx, &cli) 368 + }) 369 + } 354 370 } else { 355 371 group.Go(func() error { 356 372 return a.ServeHTTP(ctx)
+2
pkg/config/config.go
··· 56 56 HttpAddr string 57 57 HttpInternalAddr string 58 58 HttpsAddr string 59 + RtmpsAddr string 59 60 Secure bool 60 61 NoMist bool 61 62 MistAdminPort int ··· 90 91 Thumbnail bool 91 92 SmearAudio bool 92 93 ExternalSigning bool 94 + RTMPServerAddon string 93 95 TracingEndpoint string 94 96 PublicHost string 95 97 RateLimitPerSecond int
-64
pkg/media/gstreamer.go
··· 157 157 return nil 158 158 } 159 159 160 - func (mm *MediaManager) IngestStream(ctx context.Context, input io.Reader, ms MediaSigner) error { 161 - ctx, cancel := context.WithCancel(ctx) 162 - defer cancel() 163 - pipelineSlice := []string{ 164 - "appsrc name=streamsrc ! matroskademux name=demux", 165 - "demux. ! queue ! h264parse name=parse", 166 - "demux. ! queue ! aacparse name=audioparse", 167 - } 168 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 169 - if err != nil { 170 - return fmt.Errorf("error creating IngestStream pipeline: %w", err) 171 - } 172 - defer runtime.KeepAlive(pipeline) 173 - srcele, err := pipeline.GetElementByName("streamsrc") 174 - if err != nil { 175 - return err 176 - } 177 - // defer runtime.KeepAlive(srcele) 178 - src := app.SrcFromElement(srcele) 179 - src.SetCallbacks(&app.SourceCallbacks{ 180 - NeedDataFunc: ReaderNeedData(ctx, input), 181 - }) 182 - parseEle, err := pipeline.GetElementByName("parse") 183 - if err != nil { 184 - return err 185 - } 186 - 187 - signer, err := mm.SegmentAndSignElem(ctx, ms) 188 - if err != nil { 189 - return err 190 - } 191 - 192 - err = pipeline.Add(signer) 193 - if err != nil { 194 - return err 195 - } 196 - err = parseEle.Link(signer) 197 - if err != nil { 198 - return err 199 - } 200 - audioparse, err := pipeline.GetElementByName("audioparse") 201 - if err != nil { 202 - return err 203 - } 204 - err = audioparse.Link(signer) 205 - if err != nil { 206 - return err 207 - } 208 - 209 - go func() { 210 - HandleBusMessages(ctx, pipeline) 211 - cancel() 212 - }() 213 - 214 - err = pipeline.SetState(gst.StatePlaying) 215 - if err != nil { 216 - return err 217 - } 218 - 219 - <-ctx.Done() 220 - 221 - return nil 222 - } 223 - 224 160 const TESTSRC_WIDTH = 1280 225 161 const TESTSRC_HEIGHT = 720 226 162 const QR_SIZE = 256
+86
pkg/media/mkv_ingest.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "strings" 8 + 9 + "github.com/go-gst/go-gst/gst" 10 + "github.com/go-gst/go-gst/gst/app" 11 + "stream.place/streamplace/pkg/log" 12 + ) 13 + 14 + // ingest a H264+AAC MKV stream (prolly from an RTMP server) 15 + func (mm *MediaManager) MKVIngest(ctx context.Context, input io.Reader, ms MediaSigner) error { 16 + ctx, cancel := context.WithCancel(ctx) 17 + defer cancel() 18 + pipelineSlice := []string{ 19 + "appsrc name=streamsrc ! matroskademux name=demux", 20 + "demux. ! queue ! h264parse name=parse", 21 + "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 22 + } 23 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 24 + if err != nil { 25 + return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 26 + } 27 + 28 + srcele, err := pipeline.GetElementByName("streamsrc") 29 + if err != nil { 30 + return err 31 + } 32 + // defer runtime.KeepAlive(srcele) 33 + src := app.SrcFromElement(srcele) 34 + src.SetCallbacks(&app.SourceCallbacks{ 35 + NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 36 + }) 37 + parseEle, err := pipeline.GetElementByName("parse") 38 + if err != nil { 39 + return err 40 + } 41 + 42 + signer, err := mm.SegmentAndSignElem(ctx, ms) 43 + if err != nil { 44 + return err 45 + } 46 + 47 + err = pipeline.Add(signer) 48 + if err != nil { 49 + return err 50 + } 51 + err = parseEle.Link(signer) 52 + if err != nil { 53 + return err 54 + } 55 + audioenc, err := pipeline.GetElementByName("audioenc") 56 + if err != nil { 57 + return err 58 + } 59 + err = audioenc.Link(signer) 60 + if err != nil { 61 + return err 62 + } 63 + 64 + busErr := make(chan error) 65 + go func() { 66 + err := HandleBusMessages(ctx, pipeline) 67 + cancel() 68 + busErr <- err 69 + }() 70 + 71 + err = pipeline.SetState(gst.StatePlaying) 72 + if err != nil { 73 + return err 74 + } 75 + 76 + defer func() { 77 + err := pipeline.SetState(gst.StateNull) 78 + if err != nil { 79 + log.Error(ctx, "error setting pipeline to null state", "error", err) 80 + } 81 + }() 82 + 83 + <-busErr 84 + 85 + return nil 86 + }
+99
pkg/rtmps/rtmps.go
··· 1 + package rtmps 2 + 3 + import ( 4 + "context" 5 + "crypto/tls" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "net" 10 + "sync" 11 + 12 + "stream.place/streamplace/pkg/config" 13 + "stream.place/streamplace/pkg/log" 14 + ) 15 + 16 + // passthrough RTMPS TLS terminator to external RTMP server 17 + func ServeRTMPS(ctx context.Context, cli *config.CLI) error { 18 + if cli.RTMPServerAddon == "" { 19 + return fmt.Errorf("RTMP server address not configured") 20 + } 21 + 22 + cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath) 23 + if err != nil { 24 + return fmt.Errorf("failed to load TLS certificate: %w", err) 25 + } 26 + 27 + tlsConfig := &tls.Config{ 28 + Certificates: []tls.Certificate{cert}, 29 + MinVersion: tls.VersionTLS12, 30 + } 31 + 32 + listener, err := tls.Listen("tcp", cli.RtmpsAddr, tlsConfig) 33 + if err != nil { 34 + return fmt.Errorf("failed to create RTMPS listener: %w", err) 35 + } 36 + 37 + log.Log(ctx, "rtmps server starting", 38 + "addr", cli.RtmpsAddr, 39 + "forwarding_to", cli.RTMPServerAddon) 40 + 41 + go func() { 42 + <-ctx.Done() 43 + listener.Close() 44 + }() 45 + 46 + for { 47 + conn, err := listener.Accept() 48 + if err != nil { 49 + // Check if the context was canceled, which means we're shutting down 50 + select { 51 + case <-ctx.Done(): 52 + return nil 53 + default: 54 + log.Error(ctx, "error accepting RTMPS connection", "error", err) 55 + continue 56 + } 57 + } 58 + 59 + go func(clientConn net.Conn) { 60 + defer clientConn.Close() 61 + 62 + rtmpConn, err := net.Dial("tcp", cli.RTMPServerAddon) 63 + if err != nil { 64 + log.Error(ctx, "failed to connect to RTMP server", "error", err) 65 + return 66 + } 67 + defer rtmpConn.Close() 68 + 69 + // Create a wait group to wait for both copy operations to complete 70 + var wg sync.WaitGroup 71 + wg.Add(2) 72 + 73 + // Copy from client to RTMP server 74 + go func() { 75 + defer wg.Done() 76 + _, err := io.Copy(rtmpConn, clientConn) 77 + if err != nil && !errors.Is(err, io.EOF) { 78 + log.Error(ctx, "error copying from client to RTMP server", "error", err) 79 + } 80 + // Signal the other goroutine to stop by closing the connection 81 + rtmpConn.Close() 82 + }() 83 + 84 + // Copy from RTMP server to client 85 + go func() { 86 + defer wg.Done() 87 + _, err := io.Copy(clientConn, rtmpConn) 88 + if err != nil && !errors.Is(err, io.EOF) { 89 + log.Error(ctx, "error copying from RTMP server to client", "error", err) 90 + } 91 + // Signal the other goroutine to stop by closing the connection 92 + clientConn.Close() 93 + }() 94 + 95 + // Wait for both copy operations to complete 96 + wg.Wait() 97 + }(conn) 98 + } 99 + }