Live video on the AT Protocol

oauth: successfully making posts and whatnot!

+232 -13
+5 -2
js/app/components/login/login.tsx
··· 12 import { Keyboard } from "react-native"; 13 import { useAppDispatch, useAppSelector } from "store/hooks"; 14 import { Button, Form, H3, Input, Sheet, Spinner, Text, View } from "tamagui"; 15 16 export default function Login() { 17 const dispatch = useAppDispatch(); ··· 52 ); 53 } 54 55 return ( 56 <View 57 f={1} ··· 75 <Button 76 width="100%" 77 onPress={async () => { 78 - const agent = new AtpBaseClient(`http://127.0.0.1:38080`); 79 const res = await agent.place.stream.account.login({ 80 handleOrDID: handle, 81 }); 82 - console.log(res); 83 // await dispatch(login(`https://${pds.url}`)); 84 }} 85 margin="$4"
··· 12 import { Keyboard } from "react-native"; 13 import { useAppDispatch, useAppSelector } from "store/hooks"; 14 import { Button, Form, H3, Input, Sheet, Spinner, Text, View } from "tamagui"; 15 + import useStreamplaceNode from "hooks/useStreamplaceNode"; 16 17 export default function Login() { 18 const dispatch = useAppDispatch(); ··· 53 ); 54 } 55 56 + const { url } = useStreamplaceNode(); 57 + 58 return ( 59 <View 60 f={1} ··· 78 <Button 79 width="100%" 80 onPress={async () => { 81 + const agent = new AtpBaseClient(url); 82 const res = await agent.place.stream.account.login({ 83 handleOrDID: handle, 84 }); 85 + window.location.href = res.data.redirectUrl; 86 // await dispatch(login(`https://${pds.url}`)); 87 }} 88 margin="$4"
+1 -1
pkg/atproto/client_metadata.go
··· 76 // } 77 78 if platform == "web" { 79 - meta.RedirectURIs = []string{fmt.Sprintf("https://%s/login", host)} 80 meta.ApplicationType = "web" 81 } else { 82 meta.RedirectURIs = []string{fmt.Sprintf("https://%s/api/app-return/%s", host, appBundleId)}
··· 76 // } 77 78 if platform == "web" { 79 + meta.RedirectURIs = []string{fmt.Sprintf("https://%s/xrpc/place.stream.account.oauthReturn", host)} 80 meta.ApplicationType = "web" 81 } else { 82 meta.RedirectURIs = []string{fmt.Sprintf("https://%s/api/app-return/%s", host, appBundleId)}
+134 -9
pkg/atproto/oauth.go
··· 2 3 import ( 4 "context" 5 "fmt" 6 "net/url" 7 8 oauth "github.com/haileyok/atproto-oauth-golang" 9 "github.com/haileyok/atproto-oauth-golang/helpers" 10 "stream.place/streamplace/pkg/config" 11 "stream.place/streamplace/pkg/log" 12 "stream.place/streamplace/pkg/streamplace" 13 ) 14 15 - func Login(ctx context.Context, cli *config.CLI, input *streamplace.AccountLogin_Input) (*streamplace.AccountDefs_LoginResponse, error) { 16 meta := GetMetadata("longos.iameli.link", "web", "") 17 oclient, err := oauth.NewClient(oauth.ClientArgs{ 18 ClientJwk: cli.JWK, ··· 21 }) 22 log.Log(ctx, "OAuth client information", "clientId", meta.ClientID, "redirectUri", meta.RedirectURIs[0]) 23 if err != nil { 24 - return nil, err 25 } 26 27 // If you already have a did or a URL, you can skip this step 28 did, err := resolveHandle(ctx, input.HandleOrDID) // returns did:plc:abc123 or did:web:test.com 29 if err != nil { 30 - return nil, err 31 } 32 33 // If you already have a URL, you can skip this step 34 service, err := resolveService(ctx, did) // returns https://pds.haileyok.com 35 if err != nil { 36 - return nil, err 37 } 38 39 authserver, err := oclient.ResolvePdsAuthServer(ctx, service) 40 if err != nil { 41 - return nil, err 42 } 43 44 authmeta, err := oclient.FetchAuthServerMetadata(ctx, authserver) 45 if err != nil { 46 - return nil, err 47 } 48 49 k, err := helpers.GenerateKey(nil) 50 if err != nil { 51 - return nil, err 52 } 53 54 // b, err := json.Marshal(k) ··· 58 59 parResp, err := oclient.SendParAuthRequest(ctx, authserver, authmeta, input.HandleOrDID, meta.Scope, k) 60 if err != nil { 61 - return nil, err 62 } 63 64 log.Log(ctx, "parResp", "parResp", parResp) 65 66 - u, _ := url.Parse(authmeta.AuthorizationEndpoint) 67 u.RawQuery = fmt.Sprintf("client_id=%s&request_uri=%s", url.QueryEscape(meta.ClientID), parResp.RequestUri) 68 str := u.String() 69 70 return &streamplace.AccountDefs_LoginResponse{ 71 RedirectUrl: str, 72 }, nil 73 }
··· 2 3 import ( 4 "context" 5 + "encoding/json" 6 "fmt" 7 "net/url" 8 + "time" 9 10 + "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/api/bsky" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "github.com/bluesky-social/indigo/lex/util" 14 + "github.com/bluesky-social/indigo/xrpc" 15 oauth "github.com/haileyok/atproto-oauth-golang" 16 "github.com/haileyok/atproto-oauth-golang/helpers" 17 + "github.com/lestrrat-go/jwx/v2/jwk" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/log" 20 + "stream.place/streamplace/pkg/model" 21 "stream.place/streamplace/pkg/streamplace" 22 ) 23 24 + func Login(ctx context.Context, cli *config.CLI, input *streamplace.AccountLogin_Input, mod model.Model) (*streamplace.AccountDefs_LoginResponse, error) { 25 meta := GetMetadata("longos.iameli.link", "web", "") 26 oclient, err := oauth.NewClient(oauth.ClientArgs{ 27 ClientJwk: cli.JWK, ··· 30 }) 31 log.Log(ctx, "OAuth client information", "clientId", meta.ClientID, "redirectUri", meta.RedirectURIs[0]) 32 if err != nil { 33 + return nil, fmt.Errorf("failed to create OAuth client: %w", err) 34 } 35 36 // If you already have a did or a URL, you can skip this step 37 did, err := resolveHandle(ctx, input.HandleOrDID) // returns did:plc:abc123 or did:web:test.com 38 if err != nil { 39 + return nil, fmt.Errorf("failed to resolve handle '%s': %w", input.HandleOrDID, err) 40 } 41 42 // If you already have a URL, you can skip this step 43 service, err := resolveService(ctx, did) // returns https://pds.haileyok.com 44 if err != nil { 45 + return nil, fmt.Errorf("failed to resolve service for DID '%s': %w", did, err) 46 } 47 48 authserver, err := oclient.ResolvePdsAuthServer(ctx, service) 49 if err != nil { 50 + return nil, fmt.Errorf("failed to resolve PDS auth server for service '%s': %w", service, err) 51 } 52 53 authmeta, err := oclient.FetchAuthServerMetadata(ctx, authserver) 54 if err != nil { 55 + return nil, fmt.Errorf("failed to fetch auth server metadata from '%s': %w", authserver, err) 56 } 57 58 k, err := helpers.GenerateKey(nil) 59 if err != nil { 60 + return nil, fmt.Errorf("failed to generate DPoP key: %w", err) 61 } 62 63 // b, err := json.Marshal(k) ··· 67 68 parResp, err := oclient.SendParAuthRequest(ctx, authserver, authmeta, input.HandleOrDID, meta.Scope, k) 69 if err != nil { 70 + return nil, fmt.Errorf("failed to send PAR auth request to '%s': %w", authserver, err) 71 } 72 73 log.Log(ctx, "parResp", "parResp", parResp) 74 75 + jwkJSON, err := json.Marshal(k) 76 + if err != nil { 77 + return nil, fmt.Errorf("failed to marshal DPoP key to JSON: %w", err) 78 + } 79 + 80 + u, err := url.Parse(authmeta.AuthorizationEndpoint) 81 + if err != nil { 82 + return nil, fmt.Errorf("failed to parse auth server metadata: %w", err) 83 + } 84 u.RawQuery = fmt.Sprintf("client_id=%s&request_uri=%s", url.QueryEscape(meta.ClientID), parResp.RequestUri) 85 str := u.String() 86 87 + err = mod.CreateOAuthSession(&model.OAuthSession{ 88 + State: parResp.State, 89 + RepoDID: did, 90 + PDSUrl: service, 91 + AuthServerIssuer: authserver, 92 + PKCEVerifier: parResp.PkceVerifier, 93 + DPoPNonce: parResp.DpopAuthserverNonce, 94 + DPoPPrivateJWK: jwkJSON, 95 + }) 96 + if err != nil { 97 + return nil, fmt.Errorf("failed to create OAuth session in database: %w", err) 98 + } 99 + 100 return &streamplace.AccountDefs_LoginResponse{ 101 RedirectUrl: str, 102 }, nil 103 } 104 + 105 + var xrpcClient *oauth.XrpcClient 106 + 107 + func getXrpcClient(mod model.Model) *oauth.XrpcClient { 108 + if xrpcClient == nil { 109 + xrpcClient = &oauth.XrpcClient{ 110 + OnDpopPdsNonceChanged: func(did, newNonce string) { 111 + // todo: update the nonce in the database... i guess we only have one session per user? 112 + }, 113 + } 114 + } 115 + return xrpcClient 116 + } 117 + 118 + func HandleOauthReturn(ctx context.Context, cli *config.CLI, code string, iss string, state string, mod model.Model) error { 119 + meta := GetMetadata("longos.iameli.link", "web", "") 120 + oclient, err := oauth.NewClient(oauth.ClientArgs{ 121 + ClientJwk: cli.JWK, 122 + ClientId: meta.ClientID, 123 + RedirectUri: meta.RedirectURIs[0], 124 + }) 125 + 126 + session, err := mod.GetOAuthSessionByState(state) 127 + if err != nil { 128 + return fmt.Errorf("failed to get OAuth session: %w", err) 129 + } 130 + if session == nil { 131 + return fmt.Errorf("no OAuth session found for state: %s", state) 132 + } 133 + 134 + if iss != session.AuthServerIssuer { 135 + return fmt.Errorf("issuer mismatch: %s != %s", iss, session.AuthServerIssuer) 136 + } 137 + 138 + key, err := jwk.ParseKey(session.DPoPPrivateJWK) 139 + if err != nil { 140 + return fmt.Errorf("failed to parse DPoP private JWK: %w", err) 141 + } 142 + 143 + itResp, err := oclient.InitialTokenRequest(ctx, code, iss, session.PKCEVerifier, session.DPoPNonce, key) 144 + if err != nil { 145 + return fmt.Errorf("failed to request initial token: %w", err) 146 + } 147 + now := time.Now() 148 + 149 + if itResp.Sub != session.RepoDID { 150 + return fmt.Errorf("sub mismatch: %s != %s", itResp.Sub, session.RepoDID) 151 + } 152 + 153 + if itResp.Scope != meta.Scope { 154 + return fmt.Errorf("scope mismatch: %s != %s", itResp.Scope, meta.Scope) 155 + } 156 + 157 + expiry := now.Add(time.Second * time.Duration(itResp.ExpiresIn)).UTC() 158 + session.AccessToken = itResp.AccessToken 159 + session.AccessTokenExp = expiry 160 + session.RefreshToken = itResp.RefreshToken 161 + err = mod.UpdateOAuthSession(session) 162 + if err != nil { 163 + return fmt.Errorf("failed to update OAuth session: %w", err) 164 + } 165 + 166 + log.Log(ctx, "itResp", "itResp", itResp) 167 + 168 + authArgs := &oauth.XrpcAuthedRequestArgs{ 169 + Did: session.RepoDID, 170 + AccessToken: session.AccessToken, 171 + PdsUrl: session.PDSUrl, 172 + Issuer: session.AuthServerIssuer, 173 + DpopPdsNonce: session.DPoPNonce, 174 + DpopPrivateJwk: key, 175 + } 176 + 177 + post := bsky.FeedPost{ 178 + Text: "hello from atproto golang oauth client", 179 + CreatedAt: syntax.DatetimeNow().String(), 180 + } 181 + 182 + input := atproto.RepoCreateRecord_Input{ 183 + Collection: "app.bsky.feed.post", 184 + Repo: authArgs.Did, 185 + Record: &util.LexiconTypeDecoder{Val: &post}, 186 + } 187 + 188 + xc := getXrpcClient(mod) 189 + 190 + var out atproto.RepoCreateRecord_Output 191 + if err := xc.Do(ctx, authArgs, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil { 192 + return err 193 + } 194 + 195 + log.Log(ctx, "out", "out", out) 196 + 197 + return nil 198 + }
+6
pkg/model/model.go
··· 79 80 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 81 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 82 } 83 84 func MakeDB(dbURL string) (Model, error) { ··· 135 Block{}, 136 ChatMessage{}, 137 ChatProfile{}, 138 } { 139 err = db.AutoMigrate(model) 140 if err != nil {
··· 79 80 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 81 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 82 + 83 + CreateOAuthSession(session *OAuthSession) error 84 + GetOAuthSessionByState(state string) (*OAuthSession, error) 85 + UpdateOAuthSession(session *OAuthSession) error 86 + DeleteOAuthSession(state string) error 87 } 88 89 func MakeDB(dbURL string) (Model, error) { ··· 140 Block{}, 141 ChatMessage{}, 142 ChatProfile{}, 143 + OAuthSession{}, 144 } { 145 err = db.AutoMigrate(model) 146 if err != nil {
+55
pkg/model/oauth_session.go
···
··· 1 + package model 2 + 3 + import ( 4 + "time" 5 + 6 + "gorm.io/gorm" 7 + ) 8 + 9 + // OAuthSession stores authentication data needed during the OAuth flow 10 + type OAuthSession struct { 11 + // ID string `gorm:"primarykey"` 12 + State string `gorm:"column:state;primarykey"` 13 + RepoDID string `gorm:"column:repo_did;index"` 14 + PDSUrl string `gorm:"column:pds_url"` 15 + AuthServerIssuer string `gorm:"column:auth_server_issuer"` 16 + PKCEVerifier string `gorm:"column:pkce_verifier"` 17 + DPoPNonce string `gorm:"column:dpop_nonce"` 18 + DPoPPrivateJWK []byte `gorm:"column:dpop_private_jwk;type:text"` 19 + AccessToken string `gorm:"column:access_token"` 20 + AccessTokenExp time.Time `gorm:"column:access_token_exp"` 21 + RefreshToken string `gorm:"column:refresh_token"` 22 + CreatedAt time.Time 23 + UpdatedAt time.Time 24 + DeletedAt gorm.DeletedAt `gorm:"index"` 25 + } 26 + 27 + func (m *DBModel) CreateOAuthSession(session *OAuthSession) error { 28 + return m.DB.Create(session).Error 29 + } 30 + 31 + func (m *DBModel) GetOAuthSessionByState(state string) (*OAuthSession, error) { 32 + var session OAuthSession 33 + err := m.DB.Where("state = ?", state).First(&session).Error 34 + if err != nil { 35 + return nil, err 36 + } 37 + return &session, nil 38 + } 39 + 40 + // func (m *DBModel) GetOAuthSessionByID(id string) (*OAuthSession, error) { 41 + // var session OAuthSession 42 + // err := m.DB.Where("id = ?", id).First(&session).Error 43 + // if err != nil { 44 + // return nil, err 45 + // } 46 + // return &session, nil 47 + // } 48 + 49 + func (m *DBModel) UpdateOAuthSession(session *OAuthSession) error { 50 + return m.DB.Save(session).Error 51 + } 52 + 53 + func (m *DBModel) DeleteOAuthSession(state string) error { 54 + return m.DB.Delete(&OAuthSession{}, "state = ?", state).Error 55 + }
+28 -1
pkg/spxrpc/account.go
··· 3 import ( 4 "context" 5 6 "stream.place/streamplace/pkg/atproto" 7 placestreamtypes "stream.place/streamplace/pkg/streamplace" 8 ) 9 10 func (s *Server) handlePlaceStreamAccountLogin(ctx context.Context, body *placestreamtypes.AccountLogin_Input) (*placestreamtypes.AccountDefs_LoginResponse, error) { 11 - return atproto.Login(ctx, s.cli, body) 12 }
··· 3 import ( 4 "context" 5 6 + "github.com/labstack/echo/v4" 7 + "go.opentelemetry.io/otel" 8 "stream.place/streamplace/pkg/atproto" 9 + "stream.place/streamplace/pkg/log" 10 placestreamtypes "stream.place/streamplace/pkg/streamplace" 11 ) 12 13 func (s *Server) handlePlaceStreamAccountLogin(ctx context.Context, body *placestreamtypes.AccountLogin_Input) (*placestreamtypes.AccountDefs_LoginResponse, error) { 14 + return atproto.Login(ctx, s.cli, body, s.model) 15 + } 16 + 17 + func (s *Server) handlePlaceStreamAccountOauthReturn(ctx context.Context, code string, iss string, state string) error { 18 + err := atproto.HandleOauthReturn(ctx, s.cli, code, iss, state, s.model) 19 + if err != nil { 20 + log.Error(ctx, "failed to handle OAuth return", "error", err) 21 + return err 22 + } 23 + return nil 24 + } 25 + 26 + func (s *Server) HandlePlaceStreamAccountOauthReturn(c echo.Context) error { 27 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamAccountOauthReturn") 28 + defer span.End() 29 + code := c.QueryParam("code") 30 + iss := c.QueryParam("iss") 31 + state := c.QueryParam("state") 32 + var handleErr error 33 + // func (s *Server) handlePlaceStreamAccountOauthReturn(ctx context.Context,code string,iss string,state string) (io.Reader, error) 34 + handleErr = s.handlePlaceStreamAccountOauthReturn(ctx, code, iss, state) 35 + if handleErr != nil { 36 + return handleErr 37 + } 38 + return c.Redirect(302, "https://longos.iameli.link/") 39 }
+3
pkg/spxrpc/spxrpc.go
··· 29 if err != nil { 30 return nil, err 31 } 32 return s, nil 33 } 34
··· 29 if err != nil { 30 return nil, err 31 } 32 + 33 + // this one we're handling manually because codegen doesn't support redirects 34 + e.GET("/xrpc/place.stream.account.oauthReturn", s.HandlePlaceStreamAccountOauthReturn) 35 return s, nil 36 } 37