Live video on the AT Protocol

Merge pull request #571 from streamplace/eli/handle-changes

atproto: index handle changes

authored by Eli Mallon and committed by GitHub dc9cb6c0 7075bc0e

+401 -17
+1 -1
Makefile
··· 359 359 && sed -i.bak 's/AppBskyGraphBlock\.Main/AppBskyGraphBlock\.Record/' $$(find ./js/streamplace/src/lexicons/types/place/stream -type f) \ 360 360 && sed -i.bak 's/PlaceStreamChatProfile\.Main/PlaceStreamChatProfile\.Record/' $$(find ./js/streamplace/src/lexicons/types/place/stream -type f) \ 361 361 && for x in $$(find ./js/streamplace/src/lexicons -type f -name '*.ts'); do \ 362 - echo 'import { AppBskyRichtextFacet, AppBskyGraphBlock, ComAtprotoRepoStrongRef, AppBskyActorDefs, ComAtprotoSyncListRepos, AppBskyActorGetProfile, AppBskyFeedGetFeedSkeleton, ComAtprotoIdentityResolveHandle, ComAtprotoModerationCreateReport, ComAtprotoRepoCreateRecord, ComAtprotoRepoDeleteRecord, ComAtprotoRepoDescribeRepo, ComAtprotoRepoGetRecord, ComAtprotoRepoListRecords, ComAtprotoRepoPutRecord, ComAtprotoRepoUploadBlob, ComAtprotoServerDescribeServer, ComAtprotoSyncGetRecord, ComAtprotoSyncListReposComAtprotoRepoCreateRecord, ComAtprotoRepoDeleteRecord, ComAtprotoRepoGetRecord, ComAtprotoRepoListRecords } from "@atproto/api"' >> $$x; \ 362 + echo 'import { AppBskyRichtextFacet, AppBskyGraphBlock, ComAtprotoRepoStrongRef, AppBskyActorDefs, ComAtprotoSyncListRepos, AppBskyActorGetProfile, AppBskyFeedGetFeedSkeleton, ComAtprotoIdentityResolveHandle, ComAtprotoModerationCreateReport, ComAtprotoRepoCreateRecord, ComAtprotoRepoDeleteRecord, ComAtprotoRepoDescribeRepo, ComAtprotoRepoGetRecord, ComAtprotoRepoListRecords, ComAtprotoRepoPutRecord, ComAtprotoRepoUploadBlob, ComAtprotoServerDescribeServer, ComAtprotoSyncGetRecord, ComAtprotoSyncListReposComAtprotoRepoCreateRecord, ComAtprotoRepoDeleteRecord, ComAtprotoRepoGetRecord, ComAtprotoRepoListRecords, ComAtprotoIdentityRefreshIdentity } from "@atproto/api"' >> $$x; \ 363 363 done \ 364 364 && npx prettier --write $$(find ./js/streamplace/src/lexicons -type f -name '*.ts') \ 365 365 && find . | grep bak$$ | xargs rm
+84
js/docs/src/content/docs/lex-reference/openapi.json
··· 849 849 } 850 850 } 851 851 }, 852 + "/xrpc/com.atproto.identity.refreshIdentity": { 853 + "post": { 854 + "summary": "Request that the server re-resolve an identity (DID and handle). The server may ignore this request, or require authentication, depending on the role, implementation, and policy of the server.", 855 + "operationId": "com.atproto.identity.refreshIdentity", 856 + "tags": ["com.atproto.identity"], 857 + "responses": { 858 + "200": { 859 + "description": "Success", 860 + "content": { 861 + "application/json": { 862 + "schema": { 863 + "$ref": "#/components/schemas/com.atproto.identity.defs_identityInfo" 864 + } 865 + } 866 + } 867 + }, 868 + "400": { 869 + "description": "Bad Request", 870 + "content": { 871 + "application/json": { 872 + "schema": { 873 + "type": "object", 874 + "required": ["error", "message"], 875 + "properties": { 876 + "error": { 877 + "type": "string", 878 + "oneOf": [ 879 + { 880 + "const": "HandleNotFound" 881 + }, 882 + { 883 + "const": "DidNotFound" 884 + }, 885 + { 886 + "const": "DidDeactivated" 887 + } 888 + ] 889 + }, 890 + "message": { 891 + "type": "string" 892 + } 893 + } 894 + } 895 + } 896 + } 897 + } 898 + }, 899 + "requestBody": { 900 + "required": true, 901 + "content": { 902 + "application/json": { 903 + "schema": { 904 + "type": "object", 905 + "properties": { 906 + "identifier": { 907 + "type": "string", 908 + "format": "at-identifier" 909 + } 910 + }, 911 + "required": ["identifier"] 912 + } 913 + } 914 + } 915 + } 916 + } 917 + }, 852 918 "/xrpc/com.atproto.identity.resolveHandle": { 853 919 "get": { 854 920 "summary": "Resolves an atproto handle (hostname) to a DID. Does not necessarily bi-directionally verify against the the DID document.", ··· 1720 1786 } 1721 1787 }, 1722 1788 "required": ["name"] 1789 + }, 1790 + "com.atproto.identity.defs_identityInfo": { 1791 + "type": "object", 1792 + "properties": { 1793 + "did": { 1794 + "type": "string", 1795 + "format": "did" 1796 + }, 1797 + "handle": { 1798 + "type": "string", 1799 + "description": "The validated handle of the account; or 'handle.invalid' if the handle did not bi-directionally match the DID document.", 1800 + "format": "handle" 1801 + }, 1802 + "didDoc": { 1803 + "description": "The complete DID document for the identity." 1804 + } 1805 + }, 1806 + "required": ["did", "handle", "didDoc"] 1723 1807 }, 1724 1808 "app.bsky.feed.defs_skeletonFeedPost": { 1725 1809 "type": "object",
+44
lexicons/com/atproto/identity/refreshIdentity.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "com.atproto.identity.refreshIdentity", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Request that the server re-resolve an identity (DID and handle). The server may ignore this request, or require authentication, depending on the role, implementation, and policy of the server.", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["identifier"], 13 + "properties": { 14 + "identifier": { 15 + "type": "string", 16 + "format": "at-identifier" 17 + } 18 + } 19 + } 20 + }, 21 + "output": { 22 + "encoding": "application/json", 23 + "schema": { 24 + "type": "ref", 25 + "ref": "com.atproto.identity.defs#identityInfo" 26 + } 27 + }, 28 + "errors": [ 29 + { 30 + "name": "HandleNotFound", 31 + "description": "The resolution process confirmed that the handle does not resolve to any DID." 32 + }, 33 + { 34 + "name": "DidNotFound", 35 + "description": "The DID resolution process confirmed that there is no current DID." 36 + }, 37 + { 38 + "name": "DidDeactivated", 39 + "description": "The DID previously existed, but has been deactivated." 40 + } 41 + ] 42 + } 43 + } 44 + }
+35 -6
pkg/atproto/atproto.go
··· 42 42 } 43 43 44 44 func (atsync *ATProtoSynchronizer) SyncBlueskyRepo(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) { 45 - ident, err := atsync.resolveIdent(ctx, handle) 45 + ident, err := atsync.resolveIdent(ctx, handle, true) 46 46 if err != nil { 47 47 return nil, fmt.Errorf("failed to resolve Bluesky handle %s: %w", handle, err) 48 48 } ··· 165 165 return &newRepo, nil 166 166 } 167 167 168 - func (atsync *ATProtoSynchronizer) resolveIdent(ctx context.Context, arg string) (*identity.Identity, error) { 168 + func (atsync *ATProtoSynchronizer) RefreshIdentity(ctx context.Context, did string) (*identity.Identity, error) { 169 + id, err := atsync.resolveIdent(ctx, did, false) 170 + if err != nil { 171 + return nil, fmt.Errorf("failed to resolve ident: %w", err) 172 + } 173 + newRepo := model.Repo{ 174 + DID: id.DID.String(), 175 + PDS: id.PDSEndpoint(), 176 + Handle: id.Handle.String(), 177 + } 178 + err = atsync.Model.UpdateRepo(&newRepo) 179 + if err != nil { 180 + return nil, fmt.Errorf("failed to update repo: %w", err) 181 + } 182 + return id, nil 183 + } 184 + 185 + func (atsync *ATProtoSynchronizer) resolveIdent(ctx context.Context, arg string, cached bool) (*identity.Identity, error) { 169 186 if atsync.PLCDirectory == nil { 170 187 atsync.PLCDirectory = CustomDirectory(atsync.CLI.PLCURL) 171 188 } 189 + if atsync.CachedPLCDirectory == nil { 190 + cachedDir := identity.NewCacheDirectory(atsync.PLCDirectory, 250_000, time.Hour*24, time.Minute*2, time.Minute*5) 191 + atsync.CachedPLCDirectory = &cachedDir 192 + } 193 + dir := atsync.PLCDirectory 194 + if cached { 195 + dir = atsync.CachedPLCDirectory 196 + } 172 197 id, err := syntax.ParseAtIdentifier(arg) 173 198 if err != nil { 174 199 return nil, err 175 200 } 176 201 177 - return atsync.PLCDirectory.Lookup(ctx, *id) 178 - } 202 + resolvedID, err := dir.Lookup(ctx, *id) 203 + if err != nil { 204 + return nil, err 205 + } 206 + log.Log(ctx, "resolved ident", "id", resolvedID.DID.String(), "handle", resolvedID.Handle.String()) 179 207 208 + return resolvedID, nil 209 + } 180 210 func CustomDirectory(plcURL string) identity.Directory { 181 211 base := identity.BaseDirectory{ 182 212 PLCURL: plcURL, ··· 191 221 // primary Bluesky PDS instance only supports HTTP resolution method 192 222 SkipDNSDomainSuffixes: []string{".bsky.social"}, 193 223 } 194 - cached := identity.NewCacheDirectory(&base, 250_000, time.Hour*24, time.Minute*2, time.Minute*5) 195 - return &cached 224 + return &base 196 225 } 197 226 198 227 func DIDDoc(host string) map[string]any {
+35 -8
pkg/atproto/firehose.go
··· 34 34 ) 35 35 36 36 type ATProtoSynchronizer struct { 37 - CLI *config.CLI 38 - Model model.Model 39 - StatefulDB *statedb.StatefulDB 40 - LastSeen time.Time 41 - LastEvent time.Time 42 - Noter notificationpkg.FirebaseNotifier 43 - Bus *bus.Bus 44 - PLCDirectory identity.Directory 37 + CLI *config.CLI 38 + Model model.Model 39 + StatefulDB *statedb.StatefulDB 40 + LastSeen time.Time 41 + LastEvent time.Time 42 + Noter notificationpkg.FirebaseNotifier 43 + Bus *bus.Bus 44 + PLCDirectory identity.Directory 45 + CachedPLCDirectory identity.Directory 45 46 } 46 47 47 48 func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error { ··· 97 98 rsc := &events.RepoStreamCallbacks{ 98 99 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 99 100 go atsync.handleCommitEventOps(ctx, evt) 101 + return nil 102 + }, 103 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 104 + go atsync.handleIdentityEventOps(ctx, evt) 100 105 return nil 101 106 }, 102 107 Error: func(evt *events.ErrorFrame) error { ··· 306 311 } 307 312 } 308 313 } 314 + 315 + func (atsync *ATProtoSynchronizer) handleIdentityEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) { 316 + handle := "" 317 + if evt.Handle != nil { 318 + handle = *evt.Handle 319 + } 320 + ctx = log.WithLogValues(ctx, "event", "identity", "did", evt.Did, "handle", handle, "func", "handleIdentityEventOps") 321 + r, err := atsync.Model.GetRepo(evt.Did) 322 + if err != nil { 323 + log.Error(ctx, "failed to get repo", "err", err) 324 + return 325 + } 326 + if r == nil { 327 + log.Debug(ctx, "no repo found for identity", "did", evt.Did) 328 + return 329 + } 330 + _, err = atsync.RefreshIdentity(ctx, evt.Did) 331 + if err != nil { 332 + log.Error(ctx, "failed to refresh ident", "err", err) 333 + return 334 + } 335 + }
+112
pkg/atproto/handle_test.go
··· 1 + package atproto 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + "testing" 8 + "time" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + lexutil "github.com/bluesky-social/indigo/lex/util" 12 + "github.com/bluesky-social/indigo/util" 13 + "github.com/stretchr/testify/require" 14 + "stream.place/streamplace/pkg/bus" 15 + "stream.place/streamplace/pkg/config" 16 + "stream.place/streamplace/pkg/devenv" 17 + "stream.place/streamplace/pkg/log" 18 + "stream.place/streamplace/pkg/model" 19 + "stream.place/streamplace/pkg/statedb" 20 + "stream.place/streamplace/pkg/streamplace" 21 + ) 22 + 23 + func TestHandleChange(t *testing.T) { 24 + ctx, cancel := context.WithCancel(context.Background()) 25 + defer cancel() 26 + 27 + dev := devenv.WithDevEnv(t) 28 + t.Logf("dev: %+v", dev) 29 + cli := config.CLI{ 30 + PublicHost: "example.com", 31 + DBURL: ":memory:", 32 + RelayHost: strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 33 + PLCURL: dev.PLCURL, 34 + } 35 + 36 + t.Logf("cli: %+v", cli) 37 + b := bus.NewBus() 38 + cli.DataDir = t.TempDir() 39 + mod, err := model.MakeDB(":memory:") 40 + require.NoError(t, err) 41 + state, err := statedb.MakeDB(context.Background(), &cli, nil, mod) 42 + require.NoError(t, err) 43 + atsync := &ATProtoSynchronizer{ 44 + CLI: &cli, 45 + StatefulDB: state, 46 + Model: mod, 47 + Bus: b, 48 + PLCDirectory: dev.TestDirectory(), 49 + } 50 + 51 + done := make(chan struct{}) 52 + 53 + go func() { 54 + err := atsync.StartFirehose(ctx) 55 + require.NoError(t, err) 56 + close(done) 57 + }() 58 + 59 + user := dev.CreateAccount(t) 60 + 61 + msg := &streamplace.ChatMessage{ 62 + LexiconTypeID: "place.stream.chat.message", 63 + Text: "Hello, world!", 64 + CreatedAt: time.Now().Add(-time.Second).Format(util.ISO8601), 65 + Streamer: user.DID, 66 + } 67 + 68 + _, err = comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{ 69 + Collection: "place.stream.chat.message", 70 + Repo: user.DID, 71 + Record: &lexutil.LexiconTypeDecoder{Val: msg}, 72 + }) 73 + require.NoError(t, err) 74 + 75 + var message *streamplace.ChatDefs_MessageView 76 + err = untilNoErrors(t, func() error { 77 + messages, err := mod.MostRecentChatMessages(user.DID) 78 + if err != nil { 79 + return err 80 + } 81 + if len(messages) != 1 { 82 + return fmt.Errorf("expected 2 messages, got %d", len(messages)) 83 + } 84 + message = messages[0] 85 + return nil 86 + }) 87 + require.NoError(t, err) 88 + require.Equal(t, user.Handle, message.Author.Handle) 89 + 90 + log.Log(ctx, "updating handle", "handle", "new-handle.test") 91 + err = comatproto.IdentityUpdateHandle(context.Background(), user.XRPC, &comatproto.IdentityUpdateHandle_Input{ 92 + Handle: "new-handle.test", 93 + }) 94 + require.NoError(t, err) 95 + 96 + err = untilNoErrors(t, func() error { 97 + messages, err := mod.MostRecentChatMessages(user.DID) 98 + if err != nil { 99 + return err 100 + } 101 + if len(messages) != 1 { 102 + return fmt.Errorf("expected 2 messages, got %d", len(messages)) 103 + } 104 + message = messages[0] 105 + if message.Author.Handle != "new-handle.test" { 106 + return fmt.Errorf("expected new handle, got %s", message.Author.Handle) 107 + } 108 + return nil 109 + }) 110 + require.NoError(t, err) 111 + require.Equal(t, message.Author.Handle, "new-handle.test") 112 + }
+1 -1
pkg/atproto/labeler_firehose.go
··· 54 54 func (atsync *ATProtoSynchronizer) StartLabelerFirehoseRetry(ctx context.Context, did string) error { 55 55 ctx = log.WithLogValues(ctx, "func", "StartLabelerFirehose") 56 56 57 - ident, err := atsync.resolveIdent(ctx, did) 57 + ident, err := atsync.resolveIdent(ctx, did, true) 58 58 if err != nil { 59 59 return fmt.Errorf("failed to resolve DID %s: %w", did, err) 60 60 }
+1 -1
pkg/config/config.go
··· 250 250 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 251 251 TimeFormat: time.RFC3339, 252 252 })), 253 - slogGorm.WithTraceAll(), 253 + // slogGorm.WithTraceAll(), 254 254 ) 255 255 256 256 func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
+57
pkg/devenv/devenv.go
··· 5 5 "context" 6 6 "encoding/json" 7 7 "fmt" 8 + "net" 9 + "net/http" 8 10 "os/exec" 9 11 "path/filepath" 10 12 "runtime" 13 + "strings" 11 14 "testing" 15 + "time" 12 16 13 17 comatproto "github.com/bluesky-social/indigo/api/atproto" 18 + "github.com/bluesky-social/indigo/atproto/identity" 14 19 "github.com/bluesky-social/indigo/xrpc" 15 20 "github.com/google/uuid" 16 21 "github.com/stretchr/testify/require" ··· 127 132 XRPC: xrpcc, 128 133 } 129 134 } 135 + 136 + // Custom RoundTripper for intercepting .test domain requests 137 + type TestRoundTripper struct { 138 + DevEnv *DevEnv 139 + } 140 + 141 + func (d *DevEnv) TestHTTPClient() *http.Client { 142 + return &http.Client{ 143 + Transport: d.TestRoundTripper(), 144 + } 145 + } 146 + 147 + func (d *DevEnv) TestRoundTripper() *TestRoundTripper { 148 + return &TestRoundTripper{DevEnv: d} 149 + } 150 + 151 + func (rt *TestRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { 152 + if strings.HasSuffix(req.URL.Hostname(), ".test") { 153 + log.Log(context.Background(), "intercepting .test domain request", "url", req.URL.String()) 154 + upstreamURL := fmt.Sprintf("%s%s", rt.DevEnv.PDSURL, req.URL.Path) 155 + upstreamReq, err := http.NewRequest(req.Method, upstreamURL, req.Body) 156 + if err != nil { 157 + return nil, err 158 + } 159 + upstreamReq.Header = req.Header 160 + upstreamReq.Host = req.URL.Hostname() 161 + upstreamResp, err := http.DefaultTransport.RoundTrip(upstreamReq) 162 + if err != nil { 163 + return nil, err 164 + } 165 + return upstreamResp, nil 166 + } 167 + // For non-.test domains, use the default transport 168 + return http.DefaultTransport.RoundTrip(req) 169 + } 170 + 171 + func (d *DevEnv) TestDirectory() identity.Directory { 172 + // We need to create a new directory with our custom client 173 + base := identity.BaseDirectory{ 174 + PLCURL: d.PLCURL, 175 + HTTPClient: *d.TestHTTPClient(), 176 + Resolver: net.Resolver{ 177 + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { 178 + d := net.Dialer{Timeout: time.Second * 3} 179 + return d.DialContext(ctx, network, address) 180 + }, 181 + }, 182 + TryAuthoritativeDNS: true, 183 + SkipDNSDomainSuffixes: []string{".bsky.social"}, 184 + } 185 + return &base 186 + }
+12
pkg/spxrpc/com_atproto_identity.go
··· 14 14 } 15 15 return &comatprototypes.IdentityResolveHandle_Output{Did: did}, nil 16 16 } 17 + 18 + func (s *Server) handleComAtprotoIdentityRefreshIdentity(ctx context.Context, body *comatprototypes.IdentityRefreshIdentity_Input) (*comatprototypes.IdentityDefs_IdentityInfo, error) { 19 + ident, err := s.ATSync.RefreshIdentity(ctx, body.Identifier) 20 + if err != nil { 21 + return nil, err 22 + } 23 + return &comatprototypes.IdentityDefs_IdentityInfo{ 24 + Did: ident.DID.String(), 25 + Handle: ident.Handle.String(), 26 + DidDoc: ident.DIDDocument(), 27 + }, nil 28 + }
+19
pkg/spxrpc/stubs.go
··· 62 62 } 63 63 64 64 func (s *Server) RegisterHandlersComAtproto(e *echo.Echo) error { 65 + e.POST("/xrpc/com.atproto.identity.refreshIdentity", s.HandleComAtprotoIdentityRefreshIdentity) 65 66 e.GET("/xrpc/com.atproto.identity.resolveHandle", s.HandleComAtprotoIdentityResolveHandle) 66 67 e.POST("/xrpc/com.atproto.moderation.createReport", s.HandleComAtprotoModerationCreateReport) 67 68 e.GET("/xrpc/com.atproto.repo.describeRepo", s.HandleComAtprotoRepoDescribeRepo) ··· 72 73 e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord) 73 74 e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) 74 75 return nil 76 + } 77 + 78 + func (s *Server) HandleComAtprotoIdentityRefreshIdentity(c echo.Context) error { 79 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoIdentityRefreshIdentity") 80 + defer span.End() 81 + 82 + var body comatprototypes.IdentityRefreshIdentity_Input 83 + if err := c.Bind(&body); err != nil { 84 + return err 85 + } 86 + var out *comatprototypes.IdentityDefs_IdentityInfo 87 + var handleErr error 88 + // func (s *Server) handleComAtprotoIdentityRefreshIdentity(ctx context.Context,body *comatprototypes.IdentityRefreshIdentity_Input) (*comatprototypes.IdentityDefs_IdentityInfo, error) 89 + out, handleErr = s.handleComAtprotoIdentityRefreshIdentity(ctx, &body) 90 + if handleErr != nil { 91 + return handleErr 92 + } 93 + return c.JSON(200, out) 75 94 } 76 95 77 96 func (s *Server) HandleComAtprotoIdentityResolveHandle(c echo.Context) error {