Monorepo for Tangled tangled.org

knotserver,knotmirror/xrpc: `sync.requestCrawl` support

knotmirror ingests repo creation events from the atproto relay through tap,
not from knotstream. Therefore, knotserver will call
`sync.requestCrawl`, optionally including the new repo's information, to
notify knotmirror of the repo creation event.

Knot will call `sync.requestCrawl` in the following cases:
- on startup
- when `/event` stream has failed
- on repo creation (mandatory)

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 16d4b9e6 e871bb17

verified
+256 -6
+57
knotmirror/hostutil/hostutil.go
··· 1 + package hostutil 2 + 3 + import ( 4 + "fmt" 5 + "net/url" 6 + "strings" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + ) 10 + 11 + func ParseHostname(raw string) (hsotname string, noSSL bool, err error) { 12 + // handle case of bare hostname 13 + if !strings.Contains(raw, "://") { 14 + if strings.HasPrefix(raw, "localhost:") { 15 + raw = "http://" + raw 16 + } else { 17 + raw = "https://" + raw 18 + } 19 + } 20 + 21 + u, err := url.Parse(raw) 22 + if err != nil { 23 + return "", false, fmt.Errorf("not a valid host URL: %w", err) 24 + } 25 + 26 + switch u.Scheme { 27 + case "https", "wss": 28 + noSSL = false 29 + case "http", "ws": 30 + noSSL = true 31 + default: 32 + return "", false, fmt.Errorf("unsupported URL scheme: %s", u.Scheme) 33 + } 34 + 35 + // 'localhost' (exact string) is allowed *with* a required port number; SSL is optional 36 + if u.Hostname() == "localhost" { 37 + if u.Port() == "" || !strings.HasPrefix(u.Host, "localhost:") { 38 + return "", false, fmt.Errorf("port number is required for localhost") 39 + } 40 + return u.Host, noSSL, nil 41 + } 42 + 43 + // port numbers not allowed otherwise 44 + if u.Port() != "" { 45 + return "", false, fmt.Errorf("port number not allowed for non-local names") 46 + } 47 + 48 + // check it is a real hostname (eg, not IP address or single-word alias) 49 + h, err := syntax.ParseHandle(u.Host) 50 + if err != nil { 51 + return "", false, fmt.Errorf("not a public hostname") 52 + } 53 + 54 + // lower-case in response 55 + return h.Normalize().String(), noSSL, nil 56 + } 57 +
+1 -1
knotmirror/knotmirror.go
··· 49 49 } 50 50 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 51 51 52 - xrpc := xrpc.New(logger, cfg, db, resolver) 53 52 knotstream := knotstream.NewKnotStream(logger, db, cfg) 54 53 crawler := NewCrawler(logger, db) 55 54 resyncer := NewResyncer(logger, db, gitm, cfg) 56 55 adminpage := NewAdminServer(logger, db, resyncer) 56 + xrpc := xrpc.New(logger, cfg, db, resolver, knotstream) 57 57 58 58 // maintain repository list with tap 59 59 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+2
knotmirror/resyncer.go
··· 281 281 282 282 repoUrl += "/info/refs?service=git-upload-pack" 283 283 284 + r.logger.Debug("checking knot reachability", "url", repoUrl) 285 + 284 286 client := http.Client{ 285 287 Timeout: 30 * time.Second, 286 288 }
+103
knotmirror/xrpc/sync_requestCrawl.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/atclient" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/bluesky-social/indigo/xrpc" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/hostutil" 16 + "tangled.org/core/knotmirror/models" 17 + ) 18 + 19 + func (x *Xrpc) RequestCrawl(w http.ResponseWriter, r *http.Request) { 20 + var input tangled.SyncRequestCrawl_Input 21 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 22 + panic("unimplemented") 23 + } 24 + 25 + ctx := r.Context() 26 + 27 + l := x.logger.With("input", input) 28 + 29 + hostname, noSSL, err := hostutil.ParseHostname(input.Hostname) 30 + if err != nil { 31 + l.Error("invalid hostname", "err", err) 32 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("hostname field empty or invalid: %s", input.Hostname)}) 33 + return 34 + } 35 + 36 + // TODO: check if host is Knot with knot.describeServer 37 + 38 + // store given repoAt to db 39 + // this will allow knotmirror to ingest repo creation event bypassing tap. 
40 + // this step won't be needed once we introduce did-for-repo 41 + // TODO(boltless): remove this section 42 + if input.EnsureRepo != nil { 43 + repoAt, err := syntax.ParseATURI(*input.EnsureRepo) 44 + if err != nil { 45 + l.Error("invalid repo at-uri", "err", err) 46 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", *input.EnsureRepo)}) 47 + return 48 + } 49 + owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 50 + if err != nil || owner.Handle.IsInvalidHandle() { 51 + l.Error("failed to resolve ident", "err", err, "owner", repoAt.Authority().String()) 52 + writeErr(w, fmt.Errorf("failed to resolve repo owner")) 53 + return 54 + } 55 + xrpcc := xrpc.Client{Host: owner.PDSEndpoint()} 56 + out, err := atproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 57 + if err != nil { 58 + l.Error("failed to get repo record", "err", err, "repo", repoAt) 59 + writeErr(w, fmt.Errorf("failed to get repo record")) 60 + return 61 + } 62 + record := out.Value.Val.(*tangled.Repo) 63 + 64 + knotUrl := record.Knot 65 + if !strings.Contains(record.Knot, "://") { 66 + if noSSL { 67 + knotUrl = "http://" + knotUrl 68 + } else { 69 + knotUrl = "https://" + knotUrl 70 + } 71 + } 72 + 73 + repo := &models.Repo{ 74 + Did: owner.DID, 75 + Rkey: repoAt.RecordKey(), 76 + Cid: (*syntax.CID)(out.Cid), 77 + Name: record.Name, 78 + KnotDomain: knotUrl, 79 + State: models.RepoStatePending, 80 + ErrorMsg: "", 81 + RetryAfter: 0, 82 + RetryCount: 0, 83 + } 84 + 85 + if err := db.UpsertRepo(ctx, x.db, repo); err != nil { 86 + l.Error("failed to upsert repo", "err", err) 87 + writeErr(w, err) 88 + return 89 + } 90 + } 91 + 92 + // subscribe to requested host 93 + if !x.ks.CheckIfSubscribed(hostname) { 94 + if err := x.ks.SubscribeHost(ctx, hostname, noSSL); err != nil { 95 + // TODO(boltless): return HostBanned on banned hosts 96 + 
l.Error("failed to subscribe host", "err", err) 97 + writeErr(w, err) 98 + return 99 + } 100 + } 101 + 102 + w.WriteHeader(http.StatusOK) 103 + }
+5 -1
knotmirror/xrpc/xrpc.go
··· 12 12 "tangled.org/core/api/tangled" 13 13 "tangled.org/core/idresolver" 14 14 "tangled.org/core/knotmirror/config" 15 + "tangled.org/core/knotmirror/knotstream" 15 16 "tangled.org/core/log" 16 17 ) 17 18 ··· 19 20 cfg *config.Config 20 21 db *sql.DB 21 22 resolver *idresolver.Resolver 23 + ks *knotstream.KnotStream 22 24 logger *slog.Logger 23 25 } 24 26 25 - func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver) *Xrpc { 27 + func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 26 28 return &Xrpc{ 27 29 cfg, 28 30 db, 29 31 resolver, 32 + ks, 30 33 log.SubLogger(logger, "xrpc"), 31 34 } 32 35 } ··· 47 50 r.Get("/"+tangled.GitTempListCommitsNSID, x.ListCommits) 48 51 r.Get("/"+tangled.GitTempListLanguagesNSID, x.ListLanguages) 49 52 r.Get("/"+tangled.GitTempListTagsNSID, x.ListTags) 53 + r.Post("/"+tangled.SyncRequestCrawlNSID, x.RequestCrawl) 50 54 51 55 return r 52 56 }
+5 -4
knotserver/config/config.go
··· 39 39 } 40 40 41 41 type Config struct { 42 - Repo Repo `env:",prefix=KNOT_REPO_"` 43 - Server Server `env:",prefix=KNOT_SERVER_"` 44 - Git Git `env:",prefix=KNOT_GIT_"` 45 - AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 42 + Repo Repo `env:",prefix=KNOT_REPO_"` 43 + Server Server `env:",prefix=KNOT_SERVER_"` 44 + Git Git `env:",prefix=KNOT_GIT_"` 45 + AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 46 + KnotMirrors []string `env:"KNOT_MIRRORS, default=https://mirror.tangled.network"` 46 47 } 47 48 48 49 func Load(ctx context.Context) (*Config, error) {
+29
knotserver/events.go
··· 7 7 "strconv" 8 8 "time" 9 9 10 + "github.com/bluesky-social/indigo/xrpc" 10 11 "github.com/gorilla/websocket" 12 + "tangled.org/core/api/tangled" 11 13 "tangled.org/core/log" 12 14 ) 13 15 ··· 61 63 return 62 64 } 63 65 66 + // try request crawl when connection closed 67 + defer func() { 68 + go func() { 69 + retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 70 + defer retryCancel() 71 + if err := h.requestCrawl(retryCtx); err != nil { 72 + l.Error("error requesting crawls", "err", err) 73 + } 74 + }() 75 + }() 76 + 64 77 for { 65 78 // wait for new data or timeout 66 79 select { ··· 118 131 119 132 return nil 120 133 } 134 + 135 + func (h *Knot) requestCrawl(ctx context.Context) error { 136 + h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors) 137 + input := &tangled.SyncRequestCrawl_Input{ 138 + Hostname: h.c.Server.Hostname, 139 + } 140 + for _, knotmirror := range h.c.KnotMirrors { 141 + xrpcc := xrpc.Client{Host: knotmirror} 142 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 143 + h.l.Error("error requesting crawl", "err", err) 144 + } else { 145 + h.l.Info("crawl requested successfully") 146 + } 147 + } 148 + return nil 149 + }
+16
knotserver/server.go
··· 5 5 "fmt" 6 6 "net/http" 7 7 8 + "github.com/bluesky-social/indigo/xrpc" 8 9 "github.com/urfave/cli/v3" 9 10 "tangled.org/core/api/tangled" 10 11 "tangled.org/core/hook" ··· 97 98 98 99 logger.Info("starting internal server", "address", c.Server.InternalListenAddr) 99 100 go http.ListenAndServe(c.Server.InternalListenAddr, imux) 101 + 102 + // TODO(boltless): too lazy here. should clear this up 103 + go func() { 104 + input := &tangled.SyncRequestCrawl_Input{ 105 + Hostname: c.Server.Hostname, 106 + } 107 + for _, knotmirror := range c.KnotMirrors { 108 + xrpcc := xrpc.Client{Host: knotmirror} 109 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 110 + logger.Error("error requesting crawl", "err", err) 111 + } else { 112 + logger.Info("crawl requested successfully") 113 + } 114 + } 115 + }() 100 116 101 117 logger.Info("starting main server", "address", c.Server.ListenAddr) 102 118 logger.Error("server error", "error", http.ListenAndServe(c.Server.ListenAddr, mux))
+26
knotserver/xrpc/create_repo.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "errors" 6 7 "fmt" 7 8 "net/http" 8 9 "path/filepath" 9 10 "strings" 11 + "time" 10 12 11 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 14 "github.com/bluesky-social/indigo/atproto/syntax" ··· 120 122 repoPath, 121 123 ) 122 124 125 + // request crawl for this repository 126 + repoAt := fmt.Sprintf("at://%s/%s/%s", actorDid, tangled.RepoNSID, rkey) 127 + go func() { 128 + rCtx, rCancel := context.WithTimeout(context.Background(), 10*time.Second) 129 + defer rCancel() 130 + h.requestCrawl(rCtx, &tangled.SyncRequestCrawl_Input{ 131 + Hostname: h.Config.Server.Hostname, 132 + EnsureRepo: &repoAt, 133 + }) 134 + }() 135 + 123 136 w.WriteHeader(http.StatusOK) 137 + } 138 + 139 + func (h *Xrpc) requestCrawl(ctx context.Context, input *tangled.SyncRequestCrawl_Input) error { 140 + h.Logger.Info("requesting crawl", "mirrors", h.Config.KnotMirrors) 141 + for _, knotmirror := range h.Config.KnotMirrors { 142 + xrpcc := xrpc.Client{Host: knotmirror} 143 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 144 + h.Logger.Error("error requesting crawl", "err", err) 145 + } else { 146 + h.Logger.Info("crawl requested successfully") 147 + } 148 + } 149 + return nil 124 150 } 125 151 126 152 func validateRepoName(name string) error {
+9
nix/modules/knot.nix
··· 115 115 ''; 116 116 }; 117 117 118 + knotmirrors = mkOption { 119 + type = types.listOf types.str; 120 + default = [ 121 + "https://mirror.tangled.network" 122 + ]; 123 + description = "List of knotmirror hosts to request crawl"; 124 + }; 125 + 118 126 server = { 119 127 listenAddr = mkOption { 120 128 type = types.str; ··· 263 271 "KNOT_SERVER_PLC_URL=${cfg.server.plcUrl}" 264 272 "KNOT_SERVER_JETSTREAM_ENDPOINT=${cfg.server.jetstreamEndpoint}" 265 273 "KNOT_SERVER_OWNER=${cfg.server.owner}" 274 + "KNOT_MIRRORS=${concatStringsSep "," cfg.knotmirrors}" 266 275 "KNOT_SERVER_LOG_DIDS=${ 267 276 if cfg.server.logDids 268 277 then "true"
+3
nix/vm.nix
··· 111 111 jetstreamEndpoint = jetstream; 112 112 listenAddr = "0.0.0.0:6444"; 113 113 }; 114 + knotmirrors = [ 115 + "http://localhost:7000" 116 + ]; 114 117 }; 115 118 services.tangled.spindle = { 116 119 enable = true;