Live video on the AT Protocol

oproxy: implemented authorize

+183 -95
js/docs/src/content/docs/guides/start-streaming/obs-multistream.jpg

This is a binary file and will not be displayed.

+1 -1
pkg/atproto/resolution.go pkg/oproxy/resolution.go
··· 1 - package atproto 1 + package oproxy 2 2 3 3 import ( 4 4 "context"
+19 -33
pkg/oproxy/handlers.go
··· 5 5 "fmt" 6 6 "net/http" 7 7 "net/url" 8 - "time" 9 8 10 9 "github.com/labstack/echo/v4" 11 10 "go.opentelemetry.io/otel" ··· 61 60 } 62 61 63 62 func (o *OProxy) HandleOAuthPAR(c echo.Context) error { 63 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleOAuthPAR") 64 + defer span.End() 64 65 c.Response().Header().Set("Access-Control-Allow-Origin", "*") 65 66 var par PAR 66 67 if err := json.NewDecoder(c.Request().Body).Decode(&par); err != nil { ··· 72 73 return echo.NewHTTPError(http.StatusUnauthorized, "DPoP header is required") 73 74 } 74 75 75 - resp, err := o.NewPAR(c.Request().Context(), &par, dpopHeader) 76 + resp, err := o.NewPAR(ctx, &par, dpopHeader) 76 77 if err != nil { 77 78 return echo.NewHTTPError(http.StatusBadRequest, err.Error()) 78 79 } ··· 80 81 } 81 82 82 83 func (o *OProxy) HandleOAuthAuthorize(c echo.Context) error { 83 - w.Header().Set("Access-Control-Allow-Origin", "*") 84 - query := r.URL.Query() 85 - parID := query.Get("request_uri") 86 - if parID == "" { 87 - apierrors.WriteHTTPBadRequest(w, "request_uri is required", nil) 88 - return 89 - } 90 - par, err := a.Model.GetPAR(parID) 91 - if err != nil { 92 - apierrors.WriteHTTPInternalServerError(w, "could not get par", err) 93 - return 94 - } 95 - if par == nil { 96 - apierrors.WriteHTTPBadRequest(w, "par not found", nil) 97 - return 98 - } 99 - if par.ExpiresAt.Before(time.Now()) { 100 - apierrors.WriteHTTPBadRequest(w, "par expired", nil) 101 - return 84 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleOAuthAuthorize") 85 + defer span.End() 86 + c.Response().Header().Set("Access-Control-Allow-Origin", "*") 87 + requestURI := c.QueryParam("request_uri") 88 + if requestURI == "" { 89 + return echo.NewHTTPError(http.StatusBadRequest, "request_uri is required") 102 90 } 103 - if par.LoginHint == "" { 104 - apierrors.WriteHTTPBadRequest(w, "login hint is required", nil) 105 - return 91 + clientID := c.QueryParam("client_id") 92 + if clientID == "" { 93 + return echo.NewHTTPError(http.StatusBadRequest, "client_id is required") 106 94 } 107 - redirectURL, err := atproto.Login(ctx, a.CLI, par, a.Model) 95 + redirectURL, err := o.Authorize(ctx, clientID, requestURI) 108 96 if err != nil { 109 - apierrors.WriteHTTPInternalServerError(w, "could not login", err) 110 - return 97 + return err 111 98 } 112 - http.Redirect(w, r, redirectURL, http.StatusTemporaryRedirect) 113 - 99 + return c.Redirect(http.StatusTemporaryRedirect, redirectURL) 114 100 } 115 101 116 102 func (o *OProxy) HandleOAuthReturn(c echo.Context) error { 117 - ctx, span := otel.Tracer("server").Start(ctx, "HandlePlaceStreamAccountOauthReturn") 103 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleOAuthReturn") 118 104 defer span.End() 119 - code := r.URL.Query().Get("code") 120 - iss := r.URL.Query().Get("iss") 121 - state := r.URL.Query().Get("state") 105 + code := c.QueryParam("code") 106 + iss := c.QueryParam("iss") 107 + state := c.QueryParam("state") 122 108 upstreamSession, err := atproto.HandleOauthReturn(ctx, a.CLI, code, iss, state, a.Model) 123 109 if err != nil { 124 110 apierrors.WriteHTTPInternalServerError(w, "could not handle oauth return", err)
+23
pkg/oproxy/helpers.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 + "strings" 5 6 6 7 "github.com/google/uuid" 7 8 ) ··· 17 18 } 18 19 return fmt.Sprintf("%s-%s", prefix, uu.String()) 19 20 } 21 + 22 + var urnPrefix = "urn:ietf:params:oauth:request_uri:" 23 + 24 + func makeURN(jkt string) string { 25 + uu, err := uuid.NewV7() 26 + if err != nil { 27 + panic(err) 28 + } 29 + return fmt.Sprintf("%s%s_%s", urnPrefix, jkt, uu.String()) 30 + } 31 + 32 + // urn --> jkt, uu 33 + func parseURN(urn string) (string, string, error) { 34 + if !strings.HasPrefix(urn, urnPrefix) { 35 + return "", "", fmt.Errorf("invalid URN: %s", urn) 36 + } 37 + parts := strings.Split(urn[len(urnPrefix):], "_") 38 + if len(parts) != 2 { 39 + return "", "", fmt.Errorf("invalid URN: %s", urn) 40 + } 41 + return parts[0], parts[1], nil 42 + }
+1 -5
pkg/oproxy/oauth_downstream.go
··· 300 300 301 301 // proof is valid, get public key to use as primary key of oauth session 302 302 jkt := proof.PublicKey() 303 - uu, err := uuid.NewV7() 304 - if err != nil { 305 - panic(err) 306 - } 307 303 308 - urn := fmt.Sprintf("urn:ietf:params:oauth:request_uri:%s", uu.String()) 304 + urn := makeURN(jkt) 309 305 310 306 err = o.createOAuthSession(jkt, &OAuthSession{ 311 307 DownstreamDPoPJKT: jkt,
+57 -14
pkg/oproxy/oauth_session.go
··· 2 2 3 3 import ( 4 4 "time" 5 - 6 - "gorm.io/gorm" 7 5 ) 8 6 9 7 // OAuthSession stores authentication data needed during the OAuth flow 10 8 type OAuthSession struct { 11 - DID string `gorm:"column:repo_did;index"` 9 + DID string `gorm:"column:repo_did;index"` 10 + PDSUrl string `gorm:"column:pds_url;index"` 12 11 13 12 // Upstream fields 14 13 UpstreamState string `gorm:"column:upstream_state;index"` 15 14 UpstreamAuthServerIssuer string `gorm:"column:upstream_auth_server_issuer"` 16 15 UpstreamPKCEVerifier string `gorm:"column:upstream_pkce_verifier"` 17 16 UpstreamDPoPNonce string `gorm:"column:upstream_dpop_nonce"` 18 - UpstreamDPoPPrivateJWK []byte `gorm:"column:upstream_dpop_private_jwk;type:text"` 17 + UpstreamDPoPPrivateJWK string `gorm:"column:upstream_dpop_private_jwk;type:text"` 19 18 UpstreamAccessToken string `gorm:"column:upstream_access_token"` 20 19 UpstreamAccessTokenExp time.Time `gorm:"column:upstream_access_token_exp"` 21 20 UpstreamRefreshToken string `gorm:"column:upstream_refresh_token"` 22 21 23 22 // Downstream fields 24 - DownstreamDPoPNonce string `gorm:"column:downstream_dpop_nonce"` 25 - DownstreamDPoPJKT string `gorm:"column:downstream_dpop_jkt;primaryKey"` 26 - DownstreamAccessToken string `gorm:"column:downstream_access_token;index"` 27 - DownstreamRefreshToken string `gorm:"column:downstream_refresh_token;index"` 28 - DownstreamAuthorizationCode string `gorm:"column:downstream_authorization_code;index"` 29 - DownstreamState string `gorm:"column:downstream_state"` 30 - DownstreamScope string `gorm:"column:downstream_scope"` 31 - DownstreamCodeChallenge string `gorm:"column:downstream_code_challenge"` 32 - DownstreamPARRequestURI string `gorm:"column:downstream_par_request_uri"` 23 + DownstreamDPoPNonce string `gorm:"column:downstream_dpop_nonce"` 24 + DownstreamDPoPJKT string `gorm:"column:downstream_dpop_jkt;primaryKey"` 25 + DownstreamAccessToken string `gorm:"column:downstream_access_token;index"` 26 + DownstreamRefreshToken string `gorm:"column:downstream_refresh_token;index"` 27 + DownstreamAuthorizationCode string `gorm:"column:downstream_authorization_code;index"` 28 + DownstreamState string `gorm:"column:downstream_state"` 29 + DownstreamScope string `gorm:"column:downstream_scope"` 30 + DownstreamCodeChallenge string `gorm:"column:downstream_code_challenge"` 31 + DownstreamPARRequestURI string `gorm:"column:downstream_par_request_uri"` 32 + DownstreamPARUsedAt *time.Time `gorm:"column:downstream_par_used_at"` 33 33 34 34 RevokedAt *time.Time `gorm:"column:revoked_at"` 35 35 CreatedAt time.Time 36 36 UpdatedAt time.Time 37 - DeletedAt gorm.DeletedAt `gorm:"index"` 38 37 } 39 38 39 + // for gorm. this is prettier than "o_auth_sessions" 40 40 func (o *OAuthSession) TableName() string { 41 41 return "oauth_sessions" 42 + } 43 + 44 + type OAuthSessionStatus string 45 + 46 + const ( 47 + // PAR has been created, but not yet used 48 + OAuthSessionStatePARCreated OAuthSessionStatus = "par-created" 49 + // PAR has been used, but maybe upstream will fail for some reason 50 + OAuthSessionStatePARUsed OAuthSessionStatus = "par-used" 51 + // PAR has been used, we're waiting to hear back from upstream 52 + OAuthSessionStateUpstream OAuthSessionStatus = "upstream" 53 + // Upstream came back, we've issued the user a code but it hasn't been used yet 54 + OAuthSessionStateDownstream OAuthSessionStatus = "downstream" 55 + // Code has been used, everything is good 56 + OAuthSessionStateReady OAuthSessionStatus = "ready" 57 + // For any reason we're done. Revoked or expired 58 + OAuthSessionStateRejected OAuthSessionStatus = "rejected" 59 + ) 60 + 61 + func (o *OAuthSession) Status() OAuthSessionStatus { 62 + if o.RevokedAt != nil { 63 + return OAuthSessionStateRejected 64 + } 65 + if o.UpstreamAccessTokenExp.Before(time.Now()) { 66 + return OAuthSessionStateRejected 67 + } 68 + if o.DownstreamAccessToken != "" { 69 + return OAuthSessionStateReady 70 + } 71 + if o.DownstreamAuthorizationCode != "" { 72 + return OAuthSessionStateDownstream 73 + } 74 + if o.UpstreamDPoPPrivateJWK != "" { 75 + return OAuthSessionStateUpstream 76 + } 77 + if o.DownstreamPARUsedAt != nil { 78 + return OAuthSessionStatePARUsed 79 + } 80 + if o.DownstreamPARRequestURI != "" { 81 + return OAuthSessionStatePARCreated 82 + } 83 + // todo: this should never happen, log a warning? panic? 84 + return OAuthSessionStateRejected 42 85 } 43 86 44 87 // func (m *DBModel) CreateOAuthSession(session *OAuthSession) error {
+66 -41
pkg/oproxy/oauth_upstream.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "fmt" 7 + "net/http" 7 8 "net/url" 8 9 "time" 9 10 ··· 11 12 "github.com/bluesky-social/indigo/xrpc" 12 13 oauth "github.com/haileyok/atproto-oauth-golang" 13 14 "github.com/haileyok/atproto-oauth-golang/helpers" 15 + "github.com/labstack/echo/v4" 14 16 "github.com/lestrrat-go/jwx/v2/jwk" 15 17 "stream.place/streamplace/pkg/config" 16 - "stream.place/streamplace/pkg/log" 17 18 "stream.place/streamplace/pkg/model" 18 19 ) 19 20 20 - func Login(ctx context.Context, cli *config.CLI, downstreamPAR *model.PAR, mod model.Model) (string, error) { 21 - meta := GetUpstreamMetadata("longos.iameli.link", "web", "") 21 + // downstream --> upstream transition; attempt to send user to the upstream auth server 22 + func (o *OProxy) Authorize(ctx context.Context, requestURI, clientID string) (string, error) { 23 + downstreamMeta := o.GetDownstreamMetadata() 24 + if downstreamMeta.ClientID != clientID { 25 + return "", echo.NewHTTPError(http.StatusBadRequest, "client ID mismatch") 26 + } 27 + 28 + jkt, _, err := parseURN(requestURI) 29 + if err != nil { 30 + return "", echo.NewHTTPError(http.StatusBadRequest, err.Error()) 31 + } 32 + 33 + session, err := o.loadOAuthSession(jkt) 34 + if err != nil { 35 + return "", echo.NewHTTPError(http.StatusBadRequest, err.Error()) 36 + } 37 + 38 + if session == nil { 39 + return "", echo.NewHTTPError(http.StatusBadRequest, "no session found") 40 + } 41 + 42 + if session.Status() != OAuthSessionStatePARCreated { 43 + return "", echo.NewHTTPError(http.StatusBadRequest, "session is not in par-created state") 44 + } 45 + 46 + if session.DownstreamPARRequestURI != requestURI { 47 + return "", echo.NewHTTPError(http.StatusBadRequest, "request URI mismatch") 48 + } 49 + 50 + now := time.Now() 51 + session.DownstreamPARUsedAt = &now 52 + err = o.updateOAuthSession(jkt, session) 53 + if err != nil { 54 + return "", echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update OAuth session: %s", err)) 55 + } 56 + 57 + upstreamMeta := o.GetUpstreamMetadata() 22 58 oclient, err := oauth.NewClient(oauth.ClientArgs{ 23 - ClientJwk: cli.JWK, 24 - ClientId: meta.ClientID, 25 - RedirectUri: meta.RedirectURIs[0], 59 + ClientJwk: o.jwk, 60 + ClientId: upstreamMeta.ClientID, 61 + RedirectUri: upstreamMeta.RedirectURIs[0], 26 62 }) 27 - log.Log(ctx, "OAuth client information", "clientId", meta.ClientID, "redirectUri", meta.RedirectURIs[0]) 28 63 if err != nil { 29 - return "", fmt.Errorf("failed to create OAuth client: %w", err) 64 + return "", echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to create OAuth client: %s", err)) 30 65 } 31 66 32 - // If you already have a did or a URL, you can skip this step 33 - did, err := resolveHandle(ctx, downstreamPAR.LoginHint) // returns did:plc:abc123 or did:web:test.com 34 - if err != nil { 35 - return "", fmt.Errorf("failed to resolve handle '%s': %w", downstreamPAR.LoginHint, err) 36 - } 67 + // did, err := resolveHandle(ctx, session.DID) 68 + // if err != nil { 69 + // return "", echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("failed to resolve handle '%s': %s", session.DID, err)) 70 + // } 37 71 38 - // If you already have a URL, you can skip this step 39 - service, err := resolveService(ctx, did) // returns https://pds.haileyok.com 72 + service, err := resolveService(ctx, session.DID) 40 73 if err != nil { 41 - return "", fmt.Errorf("failed to resolve service for DID '%s': %w", did, err) 74 + return "", echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("failed to resolve service for DID '%s': %s", session.DID, err)) 42 75 } 43 76 44 77 authserver, err := oclient.ResolvePdsAuthServer(ctx, service) 45 78 if err != nil { 46 - return "", fmt.Errorf("failed to resolve PDS auth server for service '%s': %w", service, err) 79 + return "", echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("failed to resolve PDS auth server for service '%s': %s", service, err)) 47 80 } 48 81 49 82 authmeta, err := oclient.FetchAuthServerMetadata(ctx, authserver) 50 83 if err != nil { 51 - return "", fmt.Errorf("failed to fetch auth server metadata from '%s': %w", authserver, err) 84 + return "", echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("failed to fetch auth server metadata from '%s': %s", authserver, err)) 52 85 } 53 86 54 87 k, err := helpers.GenerateKey(nil) 55 88 if err != nil { 56 - return "", fmt.Errorf("failed to generate DPoP key: %w", err) 89 + return "", echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to generate DPoP key: %s", err)) 57 90 } 58 91 59 - // b, err := json.Marshal(k) 60 - // if err != nil { 61 - // return "", err 62 - // } 63 - 64 - parResp, err := oclient.SendParAuthRequest(ctx, authserver, authmeta, downstreamPAR.LoginHint, meta.Scope, k) 92 + parResp, err := oclient.SendParAuthRequest(ctx, authserver, authmeta, session.DID, upstreamMeta.Scope, k) 65 93 if err != nil { 66 - return "", fmt.Errorf("failed to send PAR auth request to '%s': %w", authserver, err) 94 + return "", echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("failed to send PAR auth request to '%s': %s", authserver, err)) 67 95 } 68 96 69 - log.Log(ctx, "parResp", "parResp", parResp) 70 - 71 97 jwkJSON, err := json.Marshal(k) 72 98 if err != nil { 73 99 return "", fmt.Errorf("failed to marshal DPoP key to JSON: %w", err) ··· 77 103 if err != nil { 78 104 return "", fmt.Errorf("failed to parse auth server metadata: %w", err) 79 105 } 80 - u.RawQuery = fmt.Sprintf("client_id=%s&request_uri=%s", url.QueryEscape(meta.ClientID), parResp.RequestUri) 106 + u.RawQuery = fmt.Sprintf("client_id=%s&request_uri=%s", url.QueryEscape(upstreamMeta.ClientID), parResp.RequestUri) 81 107 str := u.String() 82 108 83 - err = mod.CreateOAuthSession(&model.OAuthSession{ 84 - UpstreamState: parResp.State, 85 - RepoDID: did, 86 - PDSUrl: service, 87 - UpstreamAuthServerIssuer: authserver, 88 - UpstreamPKCEVerifier: parResp.PkceVerifier, 89 - UpstreamDPoPNonce: parResp.DpopAuthserverNonce, 90 - UpstreamDPoPPrivateJWK: jwkJSON, 91 - DownstreamPARID: downstreamPAR.ID, 92 - }) 109 + session.DID = session.DID 110 + session.PDSUrl = service 111 + session.UpstreamState = parResp.State 112 + session.UpstreamAuthServerIssuer = authserver 113 + session.UpstreamPKCEVerifier = parResp.PkceVerifier 114 + session.UpstreamDPoPNonce = parResp.DpopAuthserverNonce 115 + session.UpstreamDPoPPrivateJWK = string(jwkJSON) 116 + 117 + err = o.updateOAuthSession(jkt, session) 93 118 if err != nil { 94 - return "", fmt.Errorf("failed to create OAuth session in database: %w", err) 119 + return "", echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update OAuth session: %s", err)) 95 120 } 96 121 97 122 return str, nil 98 123 } 99 124 100 - func HandleOauthReturn(ctx context.Context, cli *config.CLI, code string, iss string, state string, mod model.Model) (*model.OAuthSession, error) { 125 + func Return(ctx context.Context, code string, iss string, state string) (*model.OAuthSession, error) { 101 126 meta := GetUpstreamMetadata("longos.iameli.link", "web", "") 102 127 oclient, err := oauth.NewClient(oauth.ClientArgs{ 103 128 ClientJwk: cli.JWK,
+16 -1
pkg/oproxy/oproxy.go
··· 1 1 package oproxy 2 2 3 - import "github.com/labstack/echo/v4" 3 + import ( 4 + "log/slog" 5 + "os" 6 + 7 + "github.com/labstack/echo/v4" 8 + "github.com/lestrrat-go/jwx/v2/jwk" 9 + ) 4 10 5 11 type OProxy struct { 6 12 createOAuthSession func(id string, session *OAuthSession) error ··· 9 15 e *echo.Echo 10 16 host string 11 17 scope string 18 + jwk jwk.Key 19 + slog *slog.Logger 12 20 } 13 21 14 22 type Config struct { ··· 17 25 LoadOAuthSession func(id string) (*OAuthSession, error) 18 26 Host string 19 27 Scope string 28 + JWK jwk.Key 29 + Slog *slog.Logger 20 30 } 21 31 22 32 func New(conf *Config) *OProxy { 23 33 e := echo.New() 34 + mySlog := conf.Slog 35 + if mySlog == nil { 36 + mySlog = slog.New(slog.NewTextHandler(os.Stderr, nil)) 37 + } 24 38 return &OProxy{ 25 39 createOAuthSession: conf.CreateOAuthSession, 26 40 updateOAuthSession: conf.UpdateOAuthSession, ··· 28 42 e: e, 29 43 host: conf.Host, 30 44 scope: conf.Scope, 45 + jwk: conf.JWK, 31 46 } 32 47 }