Monorepo for Tangled tangled.org

knotserver,knotmirror/xrpc: sync.requestCrawl support #1202

merged opened by boltless.me targeting master from sl/rvyxvsnxkkql

knotmirror ingests repo creation events from the atproto relay through tap, not from knotstream. Therefore, knotserver calls sync.requestCrawl, optionally including the new repo's information, to notify knotmirror of the repo creation event.

Knot will call sync.requestCrawl in the following cases:

  • on startup
  • when /event stream has failed
  • on repo creation (mandatory)

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mhk6tzq7xq22
+261 -6
Diff #6
+56
knotmirror/hostutil/hostutil.go
··· 1 + package hostutil 2 + 3 + import ( 4 + "fmt" 5 + "net/url" 6 + "strings" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + ) 10 + 11 + func ParseHostname(raw string) (hostname string, noSSL bool, err error) { 12 + // handle case of bare hostname 13 + if !strings.Contains(raw, "://") { 14 + if strings.HasPrefix(raw, "localhost:") { 15 + raw = "http://" + raw 16 + } else { 17 + raw = "https://" + raw 18 + } 19 + } 20 + 21 + u, err := url.Parse(raw) 22 + if err != nil { 23 + return "", false, fmt.Errorf("not a valid host URL: %w", err) 24 + } 25 + 26 + switch u.Scheme { 27 + case "https", "wss": 28 + noSSL = false 29 + case "http", "ws": 30 + noSSL = true 31 + default: 32 + return "", false, fmt.Errorf("unsupported URL scheme: %s", u.Scheme) 33 + } 34 + 35 + // 'localhost' (exact string) is allowed *with* a required port number; SSL is optional 36 + if u.Hostname() == "localhost" { 37 + if u.Port() == "" || !strings.HasPrefix(u.Host, "localhost:") { 38 + return "", false, fmt.Errorf("port number is required for localhost") 39 + } 40 + return u.Host, noSSL, nil 41 + } 42 + 43 + // port numbers not allowed otherwise 44 + if u.Port() != "" { 45 + return "", false, fmt.Errorf("port number not allowed for non-local names") 46 + } 47 + 48 + // check it is a real hostname (eg, not IP address or single-word alias) 49 + h, err := syntax.ParseHandle(u.Host) 50 + if err != nil { 51 + return "", false, fmt.Errorf("not a public hostname") 52 + } 53 + 54 + // lower-case in response 55 + return h.Normalize().String(), noSSL, nil 56 + }
+1 -1
knotmirror/knotmirror.go
··· 49 49 } 50 50 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 51 51 52 - xrpc := xrpc.New(logger, cfg, db, resolver) 53 52 knotstream := knotstream.NewKnotStream(logger, db, cfg) 54 53 crawler := NewCrawler(logger, db) 55 54 resyncer := NewResyncer(logger, db, gitm, cfg) 56 55 adminpage := NewAdminServer(logger, db, resyncer) 56 + xrpc := xrpc.New(logger, cfg, db, resolver, knotstream) 57 57 58 58 // maintain repository list with tap 59 59 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+2
knotmirror/resyncer.go
··· 281 281 282 282 repoUrl += "/info/refs?service=git-upload-pack" 283 283 284 + r.logger.Debug("checking knot reachability", "url", repoUrl) 285 + 284 286 client := http.Client{ 285 287 Timeout: 30 * time.Second, 286 288 }
+104
knotmirror/xrpc/sync_requestCrawl.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/atclient" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/bluesky-social/indigo/xrpc" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/hostutil" 16 + "tangled.org/core/knotmirror/models" 17 + ) 18 + 19 + func (x *Xrpc) RequestCrawl(w http.ResponseWriter, r *http.Request) { 20 + var input tangled.SyncRequestCrawl_Input 21 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 22 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: "failed to decode json body"}) 23 + return 24 + } 25 + 26 + ctx := r.Context() 27 + 28 + l := x.logger.With("input", input) 29 + 30 + hostname, noSSL, err := hostutil.ParseHostname(input.Hostname) 31 + if err != nil { 32 + l.Error("invalid hostname", "err", err) 33 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("hostname field empty or invalid: %s", input.Hostname)}) 34 + return 35 + } 36 + 37 + // TODO: check if host is Knot with knot.describeServer 38 + 39 + // store given repoAt to db 40 + // this will allow knotmirror to ingest repo creation event bypassing tap. 
41 + // this step won't be needed once we introduce did-for-repo 42 + // TODO(boltless): remove this section 43 + if input.EnsureRepo != nil { 44 + repoAt, err := syntax.ParseATURI(*input.EnsureRepo) 45 + if err != nil { 46 + l.Error("invalid repo at-uri", "err", err) 47 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", *input.EnsureRepo)}) 48 + return 49 + } 50 + owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 51 + if err != nil || owner.Handle.IsInvalidHandle() { 52 + l.Error("failed to resolve ident", "err", err, "owner", repoAt.Authority().String()) 53 + writeErr(w, fmt.Errorf("failed to resolve repo owner")) 54 + return 55 + } 56 + xrpcc := xrpc.Client{Host: owner.PDSEndpoint()} 57 + out, err := atproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 58 + if err != nil { 59 + l.Error("failed to get repo record", "err", err, "repo", repoAt) 60 + writeErr(w, fmt.Errorf("failed to get repo record")) 61 + return 62 + } 63 + record := out.Value.Val.(*tangled.Repo) 64 + 65 + knotUrl := record.Knot 66 + if !strings.Contains(record.Knot, "://") { 67 + if noSSL { 68 + knotUrl = "http://" + knotUrl 69 + } else { 70 + knotUrl = "https://" + knotUrl 71 + } 72 + } 73 + 74 + repo := &models.Repo{ 75 + Did: owner.DID, 76 + Rkey: repoAt.RecordKey(), 77 + Cid: (*syntax.CID)(out.Cid), 78 + Name: record.Name, 79 + KnotDomain: knotUrl, 80 + State: models.RepoStatePending, 81 + ErrorMsg: "", 82 + RetryAfter: 0, 83 + RetryCount: 0, 84 + } 85 + 86 + if err := db.UpsertRepo(ctx, x.db, repo); err != nil { 87 + l.Error("failed to upsert repo", "err", err) 88 + writeErr(w, err) 89 + return 90 + } 91 + } 92 + 93 + // subscribe to requested host 94 + if !x.ks.CheckIfSubscribed(hostname) { 95 + if err := x.ks.SubscribeHost(ctx, hostname, noSSL); err != nil { 96 + // TODO(boltless): return HostBanned on banned hosts 97 + 
l.Error("failed to subscribe host", "err", err) 98 + writeErr(w, err) 99 + return 100 + } 101 + } 102 + 103 + w.WriteHeader(http.StatusOK) 104 + }
+5 -1
knotmirror/xrpc/xrpc.go
··· 12 12 "tangled.org/core/api/tangled" 13 13 "tangled.org/core/idresolver" 14 14 "tangled.org/core/knotmirror/config" 15 + "tangled.org/core/knotmirror/knotstream" 15 16 "tangled.org/core/log" 16 17 ) 17 18 ··· 19 20 cfg *config.Config 20 21 db *sql.DB 21 22 resolver *idresolver.Resolver 23 + ks *knotstream.KnotStream 22 24 logger *slog.Logger 23 25 } 24 26 25 - func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver) *Xrpc { 27 + func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 26 28 return &Xrpc{ 27 29 cfg, 28 30 db, 29 31 resolver, 32 + ks, 30 33 log.SubLogger(logger, "xrpc"), 31 34 } 32 35 } ··· 47 50 r.Get("/"+tangled.GitTempListCommitsNSID, x.ListCommits) 48 51 r.Get("/"+tangled.GitTempListLanguagesNSID, x.ListLanguages) 49 52 r.Get("/"+tangled.GitTempListTagsNSID, x.ListTags) 53 + r.Post("/"+tangled.SyncRequestCrawlNSID, x.RequestCrawl) 50 54 51 55 return r 52 56 }
+5 -4
knotserver/config/config.go
··· 39 39 } 40 40 41 41 type Config struct { 42 - Repo Repo `env:",prefix=KNOT_REPO_"` 43 - Server Server `env:",prefix=KNOT_SERVER_"` 44 - Git Git `env:",prefix=KNOT_GIT_"` 45 - AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 42 + Repo Repo `env:",prefix=KNOT_REPO_"` 43 + Server Server `env:",prefix=KNOT_SERVER_"` 44 + Git Git `env:",prefix=KNOT_GIT_"` 45 + AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 46 + KnotMirrors []string `env:"KNOT_MIRRORS, default=https://mirror.tangled.network"` 46 47 } 47 48 48 49 func Load(ctx context.Context) (*Config, error) {
+29
knotserver/events.go
··· 7 7 "strconv" 8 8 "time" 9 9 10 + "github.com/bluesky-social/indigo/xrpc" 10 11 "github.com/gorilla/websocket" 12 + "tangled.org/core/api/tangled" 11 13 "tangled.org/core/log" 12 14 ) 13 15 ··· 61 63 return 62 64 } 63 65 66 + // try request crawl when connection closed 67 + defer func() { 68 + go func() { 69 + retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 70 + defer retryCancel() 71 + if err := h.requestCrawl(retryCtx); err != nil { 72 + l.Error("error requesting crawls", "err", err) 73 + } 74 + }() 75 + }() 76 + 64 77 for { 65 78 // wait for new data or timeout 66 79 select { ··· 118 131 119 132 return nil 120 133 } 134 + 135 + func (h *Knot) requestCrawl(ctx context.Context) error { 136 + h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors) 137 + input := &tangled.SyncRequestCrawl_Input{ 138 + Hostname: h.c.Server.Hostname, 139 + } 140 + for _, knotmirror := range h.c.KnotMirrors { 141 + xrpcc := xrpc.Client{Host: knotmirror} 142 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 143 + h.l.Error("error requesting crawl", "err", err) 144 + } else { 145 + h.l.Info("crawl requested successfully") 146 + } 147 + } 148 + return nil 149 + }
+16
knotserver/server.go
··· 5 5 "fmt" 6 6 "net/http" 7 7 8 + "github.com/bluesky-social/indigo/xrpc" 8 9 "github.com/urfave/cli/v3" 9 10 "tangled.org/core/api/tangled" 10 11 "tangled.org/core/hook" ··· 98 99 logger.Info("starting internal server", "address", c.Server.InternalListenAddr) 99 100 go http.ListenAndServe(c.Server.InternalListenAddr, imux) 100 101 102 + // TODO(boltless): too lazy here. should clear this up 103 + go func() { 104 + input := &tangled.SyncRequestCrawl_Input{ 105 + Hostname: c.Server.Hostname, 106 + } 107 + for _, knotmirror := range c.KnotMirrors { 108 + xrpcc := xrpc.Client{Host: knotmirror} 109 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 110 + logger.Error("error requesting crawl", "err", err) 111 + } else { 112 + logger.Info("crawl requested successfully") 113 + } 114 + } 115 + }() 116 + 101 117 logger.Info("starting main server", "address", c.Server.ListenAddr) 102 118 logger.Error("server error", "error", http.ListenAndServe(c.Server.ListenAddr, mux)) 103 119
+30
knotserver/xrpc/create_repo.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "errors" 6 7 "fmt" 7 8 "net/http" 8 9 "path/filepath" 9 10 "strings" 11 + "time" 10 12 11 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 14 "github.com/bluesky-social/indigo/atproto/syntax" ··· 120 122 repoPath, 121 123 ) 122 124 125 + // HACK: request crawl for this repository 126 + // Users won't want to sync entire network from their local knotmirror. 127 + // Therefore, to bypass the local tap, requestCrawl directly to the knotmirror. 128 + go func() { 129 + if h.Config.Server.Dev { 130 + repoAt := fmt.Sprintf("at://%s/%s/%s", actorDid, tangled.RepoNSID, rkey) 131 + rCtx, rCancel := context.WithTimeout(context.Background(), 10*time.Second) 132 + defer rCancel() 133 + h.requestCrawl(rCtx, &tangled.SyncRequestCrawl_Input{ 134 + Hostname: h.Config.Server.Hostname, 135 + EnsureRepo: &repoAt, 136 + }) 137 + } 138 + }() 139 + 123 140 w.WriteHeader(http.StatusOK) 124 141 } 125 142 143 + func (h *Xrpc) requestCrawl(ctx context.Context, input *tangled.SyncRequestCrawl_Input) error { 144 + h.Logger.Info("requesting crawl", "mirrors", h.Config.KnotMirrors) 145 + for _, knotmirror := range h.Config.KnotMirrors { 146 + xrpcc := xrpc.Client{Host: knotmirror} 147 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 148 + h.Logger.Error("error requesting crawl", "err", err) 149 + } else { 150 + h.Logger.Info("crawl requested successfully") 151 + } 152 + } 153 + return nil 154 + } 155 + 126 156 func validateRepoName(name string) error { 127 157 // check for path traversal attempts 128 158 if name == "." || name == ".." ||
+9
nix/modules/knot.nix
··· 115 115 ''; 116 116 }; 117 117 118 + knotmirrors = mkOption { 119 + type = types.listOf types.str; 120 + default = [ 121 + "https://mirror.tangled.network" 122 + ]; 123 + description = "List of knotmirror hosts to request crawl"; 124 + }; 125 + 118 126 server = { 119 127 listenAddr = mkOption { 120 128 type = types.str; ··· 263 271 "KNOT_SERVER_PLC_URL=${cfg.server.plcUrl}" 264 272 "KNOT_SERVER_JETSTREAM_ENDPOINT=${cfg.server.jetstreamEndpoint}" 265 273 "KNOT_SERVER_OWNER=${cfg.server.owner}" 274 + "KNOT_MIRRORS=${concatStringsSep "," cfg.knotmirrors}" 266 275 "KNOT_SERVER_LOG_DIDS=${ 267 276 if cfg.server.logDids 268 277 then "true"
+4
nix/vm.nix
··· 110 110 plcUrl = plcUrl; 111 111 jetstreamEndpoint = jetstream; 112 112 listenAddr = "0.0.0.0:6444"; 113 + dev = true; 113 114 }; 115 + knotmirrors = [ 116 + "http://localhost:7000" 117 + ]; 114 118 }; 115 119 services.tangled.spindle = { 116 120 enable = true;

History

7 rounds 4 comments
sign up or login to add to the discussion
1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
2/3 timeout, 1/3 success
expand
expand 1 comment
pull request successfully merged
1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
2/3 timeout, 1/3 success
expand
expand 3 comments

knotmirror/xrpc/sync_requestCrawl.go:22 this is unauthed right? so if this code goes through, I could curl -X POST /xrpc/sh.tangled.sync.requestCrawl -d "garbage" and crash knotmirror?

Thank you for checking. I should make a git hook to check unimplemented codes before push at this point...

1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
2/3 timeout, 1/3 success
expand
expand 0 comments
1 commit
expand
nix,knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments
1 commit
expand
knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments
1 commit
expand
knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments
1 commit
expand
knotserver,knotmirror/xrpc: sync.requestCrawl support
expand 0 comments