Monorepo for Tangled tangled.org

knotserver,knotmirror/xrpc: sync.requestCrawl support #1202

merged opened by boltless.me targeting master from sl/rvyxvsnxkkql

knotmirror ingests repo creation event from atproto relay through tap, not from knotstream. Therefore, knotserver will request sync.requestCrawl with optional new repo information to notify the repo creation event.

Knot will call sync.requestCrawl on following cases:

  • on startup
  • when /event stream has failed
  • on repo creation (mandatory)

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mhk6tzq7xq22
+256 -6
Diff #2
+57
knotmirror/hostutil/hostutil.go
··· 1 + package hostutil 2 + 3 + import ( 4 + "fmt" 5 + "net/url" 6 + "strings" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + ) 10 + 11 + func ParseHostname(raw string) (hsotname string, noSSL bool, err error) { 12 + // handle case of bare hostname 13 + if !strings.Contains(raw, "://") { 14 + if strings.HasPrefix(raw, "localhost:") { 15 + raw = "http://" + raw 16 + } else { 17 + raw = "https://" + raw 18 + } 19 + } 20 + 21 + u, err := url.Parse(raw) 22 + if err != nil { 23 + return "", false, fmt.Errorf("not a valid host URL: %w", err) 24 + } 25 + 26 + switch u.Scheme { 27 + case "https", "wss": 28 + noSSL = false 29 + case "http", "ws": 30 + noSSL = true 31 + default: 32 + return "", false, fmt.Errorf("unsupported URL scheme: %s", u.Scheme) 33 + } 34 + 35 + // 'localhost' (exact string) is allowed *with* a required port number; SSL is optional 36 + if u.Hostname() == "localhost" { 37 + if u.Port() == "" || !strings.HasPrefix(u.Host, "localhost:") { 38 + return "", false, fmt.Errorf("port number is required for localhost") 39 + } 40 + return u.Host, noSSL, nil 41 + } 42 + 43 + // port numbers not allowed otherwise 44 + if u.Port() != "" { 45 + return "", false, fmt.Errorf("port number not allowed for non-local names") 46 + } 47 + 48 + // check it is a real hostname (eg, not IP address or single-word alias) 49 + h, err := syntax.ParseHandle(u.Host) 50 + if err != nil { 51 + return "", false, fmt.Errorf("not a public hostname") 52 + } 53 + 54 + // lower-case in response 55 + return h.Normalize().String(), noSSL, nil 56 + } 57 +
+1 -1
knotmirror/knotmirror.go
··· 49 49 } 50 50 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 51 51 52 - xrpc := xrpc.New(logger, cfg, db, resolver) 53 52 knotstream := knotstream.NewKnotStream(logger, db, cfg) 54 53 crawler := NewCrawler(logger, db) 55 54 resyncer := NewResyncer(logger, db, gitm, cfg) 56 55 adminpage := NewAdminServer(logger, db, resyncer) 56 + xrpc := xrpc.New(logger, cfg, db, resolver, knotstream) 57 57 58 58 // maintain repository list with tap 59 59 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+2
knotmirror/resyncer.go
··· 281 281 282 282 repoUrl += "/info/refs?service=git-upload-pack" 283 283 284 + r.logger.Debug("checking knot reachability", "url", repoUrl) 285 + 284 286 client := http.Client{ 285 287 Timeout: 30 * time.Second, 286 288 }
+103
knotmirror/xrpc/sync_requestCrawl.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/atclient" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/bluesky-social/indigo/xrpc" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/hostutil" 16 + "tangled.org/core/knotmirror/models" 17 + ) 18 + 19 + func (x *Xrpc) RequestCrawl(w http.ResponseWriter, r *http.Request) { 20 + var input tangled.SyncRequestCrawl_Input 21 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 22 + panic("unimplemented") 23 + } 24 + 25 + ctx := r.Context() 26 + 27 + l := x.logger.With("input", input) 28 + 29 + hostname, noSSL, err := hostutil.ParseHostname(input.Hostname) 30 + if err != nil { 31 + l.Error("invalid hostname", "err", err) 32 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("hostname field empty or invalid: %s", input.Hostname)}) 33 + return 34 + } 35 + 36 + // TODO: check if host is Knot with knot.describeServer 37 + 38 + // store given repoAt to db 39 + // this will allow knotmirror to ingest repo creation event bypassing tap. 40 + // this step won't be needed once we introduce did-for-repo 41 + // TODO(boltless): remove this section 42 + if input.EnsureRepo != nil { 43 + repoAt, err := syntax.ParseATURI(*input.EnsureRepo) 44 + if err != nil { 45 + l.Error("invalid repo at-uri", "err", err) 46 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", *input.EnsureRepo)}) 47 + return 48 + } 49 + owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 50 + if err != nil || owner.Handle.IsInvalidHandle() { 51 + l.Error("failed to resolve ident", "err", err, "owner", repoAt.Authority().String()) 52 + writeErr(w, fmt.Errorf("failed to resolve repo owner")) 53 + return 54 + } 55 + xrpcc := xrpc.Client{Host: owner.PDSEndpoint()} 56 + out, err := atproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 57 + if err != nil { 58 + l.Error("failed to get repo record", "err", err, "repo", repoAt) 59 + writeErr(w, fmt.Errorf("failed to get repo record")) 60 + return 61 + } 62 + record := out.Value.Val.(*tangled.Repo) 63 + 64 + knotUrl := record.Knot 65 + if !strings.Contains(record.Knot, "://") { 66 + if noSSL { 67 + knotUrl = "http://" + knotUrl 68 + } else { 69 + knotUrl = "https://" + knotUrl 70 + } 71 + } 72 + 73 + repo := &models.Repo{ 74 + Did: owner.DID, 75 + Rkey: repoAt.RecordKey(), 76 + Cid: (*syntax.CID)(out.Cid), 77 + Name: record.Name, 78 + KnotDomain: knotUrl, 79 + State: models.RepoStatePending, 80 + ErrorMsg: "", 81 + RetryAfter: 0, 82 + RetryCount: 0, 83 + } 84 + 85 + if err := db.UpsertRepo(ctx, x.db, repo); err != nil { 86 + l.Error("failed to upsert repo", "err", err) 87 + writeErr(w, err) 88 + return 89 + } 90 + } 91 + 92 + // subscribe to requested host 93 + if !x.ks.CheckIfSubscribed(hostname) { 94 + if err := x.ks.SubscribeHost(ctx, hostname, noSSL); err != nil { 95 + // TODO(boltless): return HostBanned on banned hosts 96 + l.Error("failed to subscribe host", "err", err) 97 + writeErr(w, err) 98 + return 99 + } 100 + } 101 + 102 + w.WriteHeader(http.StatusOK) 103 + }
+5 -1
knotmirror/xrpc/xrpc.go
··· 12 12 "tangled.org/core/api/tangled" 13 13 "tangled.org/core/idresolver" 14 14 "tangled.org/core/knotmirror/config" 15 + "tangled.org/core/knotmirror/knotstream" 15 16 "tangled.org/core/log" 16 17 ) 17 18 ··· 19 20 cfg *config.Config 20 21 db *sql.DB 21 22 resolver *idresolver.Resolver 23 + ks *knotstream.KnotStream 22 24 logger *slog.Logger 23 25 } 24 26 25 - func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver) *Xrpc { 27 + func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 26 28 return &Xrpc{ 27 29 cfg, 28 30 db, 29 31 resolver, 32 + ks, 30 33 log.SubLogger(logger, "xrpc"), 31 34 } 32 35 } ··· 47 50 r.Get("/"+tangled.GitTempListCommitsNSID, x.ListCommits) 48 51 r.Get("/"+tangled.GitTempListLanguagesNSID, x.ListLanguages) 49 52 r.Get("/"+tangled.GitTempListTagsNSID, x.ListTags) 53 + r.Post("/"+tangled.SyncRequestCrawlNSID, x.RequestCrawl) 50 54 51 55 return r 52 56 }
+5 -4
knotserver/config/config.go
··· 39 39 } 40 40 41 41 type Config struct { 42 - Repo Repo `env:",prefix=KNOT_REPO_"` 43 - Server Server `env:",prefix=KNOT_SERVER_"` 44 - Git Git `env:",prefix=KNOT_GIT_"` 45 - AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 42 + Repo Repo `env:",prefix=KNOT_REPO_"` 43 + Server Server `env:",prefix=KNOT_SERVER_"` 44 + Git Git `env:",prefix=KNOT_GIT_"` 45 + AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 46 + KnotMirrors []string `env:"KNOT_MIRRORS, default=https://mirror.tangled.network"` 46 47 } 47 48 48 49 func Load(ctx context.Context) (*Config, error) {
+29
knotserver/events.go
··· 7 7 "strconv" 8 8 "time" 9 9 10 + "github.com/bluesky-social/indigo/xrpc" 10 11 "github.com/gorilla/websocket" 12 + "tangled.org/core/api/tangled" 11 13 "tangled.org/core/log" 12 14 ) 13 15 ··· 61 63 return 62 64 } 63 65 66 + // try request crawl when connection closed 67 + defer func() { 68 + go func() { 69 + retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 70 + defer retryCancel() 71 + if err := h.requestCrawl(retryCtx); err != nil { 72 + l.Error("error requesting crawls", "err", err) 73 + } 74 + }() 75 + }() 76 + 64 77 for { 65 78 // wait for new data or timeout 66 79 select { ··· 118 131 119 132 return nil 120 133 } 134 + 135 + func (h *Knot) requestCrawl(ctx context.Context) error { 136 + h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors) 137 + input := &tangled.SyncRequestCrawl_Input{ 138 + Hostname: h.c.Server.Hostname, 139 + } 140 + for _, knotmirror := range h.c.KnotMirrors { 141 + xrpcc := xrpc.Client{Host: knotmirror} 142 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 143 + h.l.Error("error requesting crawl", "err", err) 144 + } else { 145 + h.l.Info("crawl requested successfully") 146 + } 147 + } 148 + return nil 149 + }
+16
knotserver/server.go
··· 5 5 "fmt" 6 6 "net/http" 7 7 8 + "github.com/bluesky-social/indigo/xrpc" 8 9 "github.com/urfave/cli/v3" 9 10 "tangled.org/core/api/tangled" 10 11 "tangled.org/core/hook" ··· 98 99 logger.Info("starting internal server", "address", c.Server.InternalListenAddr) 99 100 go http.ListenAndServe(c.Server.InternalListenAddr, imux) 100 101 102 + // TODO(boltless): too lazy here. should clear this up 103 + go func() { 104 + input := &tangled.SyncRequestCrawl_Input{ 105 + Hostname: c.Server.Hostname, 106 + } 107 + for _, knotmirror := range c.KnotMirrors { 108 + xrpcc := xrpc.Client{Host: knotmirror} 109 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 110 + logger.Error("error requesting crawl", "err", err) 111 + } else { 112 + logger.Info("crawl requested successfully") 113 + } 114 + } 115 + }() 116 + 101 117 logger.Info("starting main server", "address", c.Server.ListenAddr) 102 118 logger.Error("server error", "error", http.ListenAndServe(c.Server.ListenAddr, mux)) 103 119
+26
knotserver/xrpc/create_repo.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "errors" 6 7 "fmt" 7 8 "net/http" 8 9 "path/filepath" 9 10 "strings" 11 + "time" 10 12 11 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 14 "github.com/bluesky-social/indigo/atproto/syntax" ··· 120 122 repoPath, 121 123 ) 122 124 125 + // request crawl for this repository 126 + repoAt := fmt.Sprintf("at://%s/%s/%s", actorDid, tangled.RepoNSID, rkey) 127 + go func() { 128 + rCtx, rCancel := context.WithTimeout(context.Background(), 10*time.Second) 129 + defer rCancel() 130 + h.requestCrawl(rCtx, &tangled.SyncRequestCrawl_Input{ 131 + Hostname: h.Config.Server.Hostname, 132 + EnsureRepo: &repoAt, 133 + }) 134 + }() 135 + 123 136 w.WriteHeader(http.StatusOK) 124 137 } 125 138 139 + func (h *Xrpc) requestCrawl(ctx context.Context, input *tangled.SyncRequestCrawl_Input) error { 140 + h.Logger.Info("requesting crawl", "mirrors", h.Config.KnotMirrors) 141 + for _, knotmirror := range h.Config.KnotMirrors { 142 + xrpcc := xrpc.Client{Host: knotmirror} 143 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 144 + h.Logger.Error("error requesting crawl", "err", err) 145 + } else { 146 + h.Logger.Info("crawl requested successfully") 147 + } 148 + } 149 + return nil 150 + } 151 + 126 152 func validateRepoName(name string) error { 127 153 // check for path traversal attempts 128 154 if name == "." || name == ".." ||
+9
nix/modules/knot.nix
··· 115 115 ''; 116 116 }; 117 117 118 + knotmirrors = mkOption { 119 + type = types.listOf types.str; 120 + default = [ 121 + "https://mirror.tangled.network" 122 + ]; 123 + description = "List of knotmirror hosts to request crawl"; 124 + }; 125 + 118 126 server = { 119 127 listenAddr = mkOption { 120 128 type = types.str; ··· 263 271 "KNOT_SERVER_PLC_URL=${cfg.server.plcUrl}" 264 272 "KNOT_SERVER_JETSTREAM_ENDPOINT=${cfg.server.jetstreamEndpoint}" 265 273 "KNOT_SERVER_OWNER=${cfg.server.owner}" 274 + "KNOT_MIRRORS=${concatStringsSep "," cfg.knotmirrors}" 266 275 "KNOT_SERVER_LOG_DIDS=${ 267 276 if cfg.server.logDids 268 277 then "true"
+3
nix/vm.nix
··· 111 111 jetstreamEndpoint = jetstream; 112 112 listenAddr = "0.0.0.0:6444"; 113 113 }; 114 + knotmirrors = [ 115 + "http://localhost:7000" 116 + ]; 114 117 }; 115 118 services.tangled.spindle = { 116 119 enable = true;

History

7 rounds 4 comments
sign up or login to add to the discussion
1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
2/3 timeout, 1/3 success
expand
expand 1 comment
pull request successfully merged
1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
2/3 timeout, 1/3 success
expand
expand 3 comments

knotmirror/xrpc/sync_requestCrawl.go:22 this is unauthed right? so if this code goes through, I could curl -X POST /xrpc/sh.tangled.sync.requestCrawl -d "garbage" and crash knotmirror?

Thank you for checking. I should make a git hook to check unimplemented codes before push at this point...

1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
2/3 timeout, 1/3 success
expand
expand 0 comments
1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments
1 commit
expand
knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments
1 commit
expand
knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments
1 commit
expand
knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments