Monorepo for Tangled tangled.org

knotmirror: introduce knotmirror #1160

merged opened by boltless.me targeting master from sl/knotmirror

knotmirror is an external service that is intended to be used by appview. It will ingest all known git repos and provide xrpc methods to inspect them, so appview won't need to fetch individual knots on every page render.

Signed-off-by: Seongmin Lee git@boltless.me

Labels

None yet.

assignee

None yet.

Participants 3
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mh3qbj6xvc22
+582 -232
Interdiff #1 #2
+1 -1
.gitignore
··· 21 21 # Created if following hacking.md 22 22 genjwks.out 23 23 /nix/vm-data 24 - /mirror 25 24 blog/build/ 25 + /mirror
+9 -2
cmd/knotmirror/main.go
··· 10 10 "github.com/carlmjohnson/versioninfo" 11 11 "github.com/urfave/cli/v3" 12 12 "tangled.org/core/knotmirror" 13 + "tangled.org/core/knotmirror/config" 13 14 "tangled.org/core/log" 14 15 ) 15 16 ··· 46 47 } 47 48 48 49 func runKnotMirror(ctx context.Context, cmd *cli.Command) error { 49 - // TODO: generate Config from arguments & pass down to Run() 50 - return knotmirror.Run(ctx) 50 + logger := log.FromContext(ctx) 51 + cfg, err := config.Load(ctx) 52 + if err != nil { 53 + return err 54 + } 55 + 56 + logger.Debug("config loaded:", "config", cfg) 57 + return knotmirror.Run(ctx, cfg) 51 58 }
flake.nix

This patch was likely rebased, as context lines do not match.

go.mod

This patch was likely rebased, as context lines do not match.

go.sum

This patch was likely rebased, as context lines do not match.

+79 -29
knotmirror/adminpage.go
··· 3 3 import ( 4 4 "database/sql" 5 5 "embed" 6 + "fmt" 7 + "html" 6 8 "html/template" 7 9 "log/slog" 8 10 "net/http" 9 11 "strconv" 10 12 "time" 11 13 14 + "github.com/bluesky-social/indigo/atproto/syntax" 12 15 "github.com/go-chi/chi/v5" 13 16 "tangled.org/core/appview/pagination" 14 17 "tangled.org/core/knotmirror/db" 15 18 "tangled.org/core/knotmirror/models" 16 - "tangled.org/core/orm" 17 19 ) 18 20 19 21 //go:embed templates/*.html ··· 22 24 const repoPageSize = 20 23 25 24 26 type AdminServer struct { 25 - db *sql.DB 27 + db *sql.DB 28 + resyncer *Resyncer 29 + logger *slog.Logger 26 30 } 27 31 28 - func NewAdminServer(database *sql.DB) *AdminServer { 29 - return &AdminServer{db: database} 32 + func NewAdminServer(l *slog.Logger, database *sql.DB, resyncer *Resyncer) *AdminServer { 33 + return &AdminServer{ 34 + db: database, 35 + resyncer: resyncer, 36 + logger: l, 37 + } 30 38 } 31 39 32 40 func (s *AdminServer) Router() http.Handler { 33 41 r := chi.NewRouter() 34 42 r.Get("/repos", s.handleRepos()) 35 43 r.Get("/hosts", s.handleHosts()) 44 + 45 + r.Post("/api/triggerRepoResync", s.handleRepoResyncTrigger()) 46 + r.Post("/api/cancelRepoResync", s.handleRepoResyncCancel()) 36 47 return r 37 48 } 38 49 ··· 41 52 "add": func(a, b int) int { return a + b }, 42 53 "sub": func(a, b int) int { return a - b }, 43 54 "readt": func(ts int64) string { 44 - if ts == 0 { 55 + if ts <= 0 { 45 56 return "n/a" 46 57 } 47 58 return time.Unix(ts, 0).Format("2006-01-02 15:04") ··· 56 67 } 57 68 58 69 func (s *AdminServer) handleRepos() http.HandlerFunc { 59 - // TODO: prepare template 60 70 tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html")) 61 71 return func(w http.ResponseWriter, r *http.Request) { 62 72 pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 63 73 if pageNum < 1 { 64 74 pageNum = 1 65 75 } 66 - var ( 67 - did = r.URL.Query().Get("did") 68 - knot = r.URL.Query().Get("knot") 69 - state = r.URL.Query().Get("state") 70 - ) 71 - 72 76 page := pagination.Page{ 73 77 Offset: (pageNum - 1) * repoPageSize, 74 78 Limit: repoPageSize, 75 79 } 76 - var filters []orm.Filter 77 80 78 - if did != "" { 79 - filters = append(filters, orm.FilterEq("did", did)) 80 - } 81 - if knot != "" { 82 - filters = append(filters, orm.FilterEq("knot_domain", knot)) 83 - } 84 - if state != "" { 85 - filters = append(filters, orm.FilterEq("state", state)) 86 - } 81 + var ( 82 + did = r.URL.Query().Get("did") 83 + knot = r.URL.Query().Get("knot") 84 + state = r.URL.Query().Get("state") 85 + ) 87 86 88 - repos, err := db.ListRepos(r.Context(), s.db, page, filters...) 87 + repos, err := db.ListRepos(r.Context(), s.db, page, did, knot, state) 89 88 if err != nil { 90 89 http.Error(w, err.Error(), http.StatusInternalServerError) 91 90 } ··· 110 109 func (s *AdminServer) handleHosts() http.HandlerFunc { 111 110 tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/hosts.html")) 112 111 return func(w http.ResponseWriter, r *http.Request) { 113 - var status = r.URL.Query().Get("status") 114 - 115 - var filters []orm.Filter 116 - if status != "" { 117 - filters = append(filters, orm.FilterEq("status", status)) 112 + var status = models.HostStatus(r.URL.Query().Get("status")) 113 + if status == "" { 114 + status = models.HostStatusActive 118 115 } 119 116 120 - hosts, err := db.ListHosts(r.Context(), s.db, filters...) 117 + hosts, err := db.ListHosts(r.Context(), s.db, status) 121 118 if err != nil { 122 119 http.Error(w, err.Error(), http.StatusInternalServerError) 123 120 } ··· 130 127 } 131 128 } 132 129 } 130 + 131 + func (s *AdminServer) handleRepoResyncTrigger() http.HandlerFunc { 132 + return func(w http.ResponseWriter, r *http.Request) { 133 + var repoQuery = r.FormValue("repo") 134 + 135 + repo, err := syntax.ParseATURI(repoQuery) 136 + if err != nil || repo.RecordKey() == "" { 137 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 138 + return 139 + } 140 + 141 + if err := s.resyncer.TriggerResyncJob(r.Context(), repo); err != nil { 142 + s.logger.Error("failed to trigger resync job", "err", err) 143 + writeNotif(w, http.StatusInternalServerError, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 144 + return 145 + } 146 + writeNotif(w, http.StatusOK, "success") 147 + } 148 + } 149 + 150 + func (s *AdminServer) handleRepoResyncCancel() http.HandlerFunc { 151 + return func(w http.ResponseWriter, r *http.Request) { 152 + var repoQuery = r.FormValue("repo") 153 + 154 + repo, err := syntax.ParseATURI(repoQuery) 155 + if err != nil || repo.RecordKey() == "" { 156 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 157 + return 158 + } 159 + 160 + s.resyncer.CancelResyncJob(repo) 161 + writeNotif(w, http.StatusOK, "success") 162 + } 163 + } 164 + 165 + func writeNotif(w http.ResponseWriter, status int, msg string) { 166 + w.Header().Set("Content-Type", "text/html") 167 + w.WriteHeader(status) 168 + 169 + class := "info" 170 + switch { 171 + case status >= 500: 172 + class = "error" 173 + case status >= 400: 174 + class = "warn" 175 + } 176 + 177 + fmt.Fprintf(w, 178 + `<div hx-swap-oob="beforeend:#notifications"><div class="notif %s">%s</div></div>`, 179 + class, 180 + html.EscapeString(msg), 181 + ) 182 + }
+1 -1
knotmirror/config/config.go
··· 9 9 10 10 type Config struct { 11 11 TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 - DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 12 + DbUrl string `env:"MIRROR_DB_URL, required"` 13 13 KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified 14 14 KnotSSRF bool `env:"MIRROR_KNOT_SSRF, default=false"` 15 15 GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"`
knotmirror/crawler.go

This file has not been changed.

+48 -16
knotmirror/db/db.go
··· 4 4 "context" 5 5 "database/sql" 6 6 "fmt" 7 - "strings" 8 - _ "github.com/mattn/go-sqlite3" 7 + "time" 8 + 9 + _ "github.com/jackc/pgx/v5/stdlib" 9 10 ) 10 11 11 - func Make(ctx context.Context, dbPath string) (*sql.DB, error) { 12 - // https://github.com/mattn/go-sqlite3#connection-string 13 - opts := []string{ 14 - "_foreign_keys=1", 15 - "_journal_mode=WAL", 16 - "_synchronous=NORMAL", 17 - "_auto_vacuum=incremental", 12 + func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) { 13 + db, err := sql.Open("pgx", dbUrl) 14 + if err != nil { 15 + return nil, fmt.Errorf("opening db: %w", err) 18 16 } 19 17 20 - db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 21 - if err != nil { 22 - return nil, err 18 + db.SetMaxOpenConns(maxConns) 19 + db.SetMaxIdleConns(maxConns) 20 + db.SetConnMaxIdleTime(time.Hour) 21 + 22 + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) 23 + defer cancel() 24 + if err := db.PingContext(pingCtx); err != nil { 25 + db.Close() 26 + return nil, fmt.Errorf("ping db: %w", err) 23 27 } 24 28 25 29 conn, err := db.Conn(ctx) ··· 46 50 error_msg text, 47 51 retry_count integer not null default 0, 48 52 retry_after integer not null default 0, 53 + db_created_at timestamptz not null default now(), 54 + db_updated_at timestamptz not null default now(), 49 55 50 - unique(did, rkey) 56 + constraint repos_pkey primary key (did, rkey) 51 57 ); 52 58 53 59 -- knot hosts 54 60 create table if not exists hosts ( 55 61 hostname text not null, 56 - no_ssl integer not null default 0, 62 + no_ssl boolean not null default false, 57 63 status text not null default 'active', 58 - last_seq integer not null default -1, 64 + last_seq bigint not null default -1, 65 + db_created_at timestamptz not null default now(), 66 + db_updated_at timestamptz not null default now(), 59 67 60 - unique(hostname) 68 + constraint hosts_pkey primary key (hostname) 61 69 ); 70 + 71 + create index if not exists idx_repos_aturi on repos (at_uri); 72 + create index if not exists idx_repos_db_updated_at on repos (db_updated_at desc); 73 + create index if not exists idx_hosts_db_updated_at on hosts (db_updated_at desc); 74 + 75 + create or replace function set_updated_at() 76 + returns trigger as $$ 77 + begin 78 + new.db_updated_at = now(); 79 + return new; 80 + end; 81 + $$ language plpgsql; 82 + 83 + drop trigger if exists repos_set_updated_at on repos; 84 + create trigger repos_set_updated_at 85 + before update on repos 86 + for each row 87 + execute function set_updated_at(); 88 + 89 + drop trigger if exists hosts_set_updated_at on hosts; 90 + create trigger hosts_set_updated_at 91 + before update on hosts 92 + for each row 93 + execute function set_updated_at(); 62 94 `) 63 95 if err != nil { 64 96 return nil, fmt.Errorf("initializing db schema: %w", err)
+8 -22
knotmirror/db/hosts.go
··· 6 6 "errors" 7 7 "fmt" 8 8 "log" 9 - "strings" 10 9 11 10 "tangled.org/core/knotmirror/models" 12 - "tangled.org/core/orm" 13 11 ) 14 12 15 13 func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 16 14 if _, err := e.ExecContext(ctx, 17 15 `insert into hosts (hostname, no_ssl, status, last_seq) 18 - values (?, ?, ?, ?) 16 + values ($1, $2, $3, $4) 19 17 on conflict(hostname) do update set 20 18 no_ssl = excluded.no_ssl, 21 19 status = excluded.status, ··· 35 33 var host models.Host 36 34 if err := e.QueryRowContext(ctx, 37 35 `select hostname, no_ssl, status, last_seq 38 - from hosts where hostname = ?`, 36 + from hosts where hostname = $1`, 39 37 hostname, 40 38 ).Scan( 41 39 &host.Hostname, ··· 62 60 continue 63 61 } 64 62 if _, err := tx.ExecContext(ctx, 65 - `update hosts set last_seq = ? where hostname = ?`, 63 + `update hosts set last_seq = $1 where hostname = $2`, 66 64 cur.LastSeq, 67 65 cur.Hostname, 68 66 ); err != nil { 69 - log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq) 67 + log.Println("failed to persist host cursor", "host", cur.Hostname, "lastSeq", cur.LastSeq, "err", err) 70 68 } 71 69 } 72 70 return tx.Commit() 73 71 } 74 72 75 - func ListHosts(ctx context.Context, e *sql.DB, filters ...orm.Filter) ([]models.Host, error) { 76 - var conditions []string 77 - var args []any 78 - 79 - for _, filter := range filters { 80 - conditions = append(conditions, filter.Condition()) 81 - args = append(args, filter.Arg()...) 82 - } 83 - 84 - whereClause := "" 85 - if len(conditions) > 0 { 86 - whereClause = " where " + strings.Join(conditions, " and ") 87 - } 88 - 73 + func ListHosts(ctx context.Context, e *sql.DB, status models.HostStatus) ([]models.Host, error) { 89 74 rows, err := e.QueryContext(ctx, 90 75 `select hostname, no_ssl, status, last_seq 91 - from hosts` + whereClause, 92 - args..., 76 + from hosts 77 + where status = $1`, 78 + status, 93 79 ) 94 80 if err != nil { 95 81 return nil, fmt.Errorf("querying hosts: %w", err)
+24 -17
knotmirror/db/repos.go
··· 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 10 "tangled.org/core/appview/pagination" 11 11 "tangled.org/core/knotmirror/models" 12 - "tangled.org/core/orm" 13 12 ) 14 13 15 14 func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 16 15 if _, err := e.ExecContext(ctx, 17 16 `insert into repos (did, rkey, cid, name, knot_domain) 18 - values (?, ?, ?, ?, ?)`, 17 + values ($1, $2, $3, $4, $5)`, 19 18 did, rkey, cid, name, knot, 20 19 ); err != nil { 21 20 return fmt.Errorf("inserting repo: %w", err) ··· 26 25 func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 27 26 if _, err := e.ExecContext(ctx, 28 27 `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 29 - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 28 + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 30 29 on conflict(did, rkey) do update set 31 30 cid = excluded.cid, 32 31 name = excluded.name, ··· 58 57 func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 59 58 if _, err := e.ExecContext(ctx, 60 59 `update repos 61 - set state = ? 62 - where did = ? and rkey = ?`, 60 + set state = $1 61 + where did = $2 and rkey = $3`, 63 62 state, 64 63 did, rkey, 65 64 ); err != nil { ··· 70 69 71 70 func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 72 71 if _, err := e.ExecContext(ctx, 73 - `delete from repos where did = ? and rkey = ?`, 72 + `delete from repos where did = $1 and rkey = $2`, 74 73 did, 75 74 rkey, 76 75 ); err != nil { ··· 95 94 retry_count, 96 95 retry_after 97 96 from repos 98 - where did = ? and name = ?`, 97 + where did = $1 and name = $2`, 99 98 did, 100 99 name, 101 100 ).Scan( ··· 135 134 retry_count, 136 135 retry_after 137 136 from repos 138 - where at_uri = ?`, 137 + where at_uri = $1`, 139 138 aturi, 140 139 ).Scan( 141 140 &repo.Did, ··· 158 157 return &repo, nil 159 158 } 160 159 161 - func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 160 + func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) { 162 161 var conditions []string 163 162 var args []any 164 163 165 - for _, filter := range filters { 166 - conditions = append(conditions, filter.Condition()) 167 - args = append(args, filter.Arg()...) 164 + pageClause := "" 165 + if page.Limit > 0 { 166 + pageClause = " limit $1 offset $2 " 167 + args = append(args, page.Limit, page.Offset) 168 168 } 169 169 170 170 whereClause := "" 171 + if did != "" { 172 + conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1)) 173 + args = append(args, did) 174 + } 175 + if knot != "" { 176 + conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1)) 177 + args = append(args, knot) 178 + } 179 + if state != "" { 180 + conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1)) 181 + args = append(args, state) 182 + } 171 183 if len(conditions) > 0 { 172 184 whereClause = "WHERE " + conditions[0] 173 185 for _, condition := range conditions[1:] { 174 186 whereClause += " AND " + condition 175 187 } 176 188 } 177 - pageClause := "" 178 - if page.Limit > 0 { 179 - pageClause = " limit ? offset ? " 180 - args = append(args, page.Limit, page.Offset) 181 - } 182 189 183 190 query := ` 184 191 select
+221 -21
knotmirror/git.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 + "net/url" 8 + "os" 7 9 "os/exec" 10 + "path/filepath" 8 11 "regexp" 9 12 "strings" 10 13 11 14 "github.com/go-git/go-git/v5" 12 15 gitconfig "github.com/go-git/go-git/v5/config" 13 16 "github.com/go-git/go-git/v5/plumbing/transport" 17 + "tangled.org/core/knotmirror/models" 14 18 ) 15 19 16 - type GitMirrorClient interface { 17 - Clone(ctx context.Context, path, url string) error 18 - Fetch(ctx context.Context, path, url string) error 20 + type GitMirrorManager interface { 21 + Exist(repo *models.Repo) (bool, error) 22 + // RemoteSetUrl updates git repository 'origin' remote 23 + RemoteSetUrl(ctx context.Context, repo *models.Repo) error 24 + // Clone clones the repository as a mirror 25 + Clone(ctx context.Context, repo *models.Repo) error 26 + // Fetch fetches the repository 27 + Fetch(ctx context.Context, repo *models.Repo) error 28 + // Sync mirrors the repository. It will clone the repository if repository doesn't exist. 29 + Sync(ctx context.Context, repo *models.Repo) error 19 30 } 20 31 21 - type CliGitMirrorClient struct{} 32 + type CliGitMirrorManager struct { 33 + repoBasePath string 34 + knotUseSSL bool 35 + } 36 + 37 + func NewCliGitMirrorManager(repoBasePath string, knotUseSSL bool) *CliGitMirrorManager { 38 + return &CliGitMirrorManager{ 39 + repoBasePath, 40 + knotUseSSL, 41 + } 42 + } 43 + 44 + var _ GitMirrorManager = new(CliGitMirrorManager) 22 45 23 - var _ GitMirrorClient = new(CliGitMirrorClient) 46 + func (c *CliGitMirrorManager) makeRepoPath(repo *models.Repo) string { 47 + return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 48 + } 24 49 25 - func (c *CliGitMirrorClient) Clone(ctx context.Context, path, url string) error { 50 + func (c *CliGitMirrorManager) Exist(repo *models.Repo) (bool, error) { 51 + return isDir(c.makeRepoPath(repo)) 52 + } 53 + 54 + func (c *CliGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 55 + path := c.makeRepoPath(repo) 56 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 57 + if err != nil { 58 + return fmt.Errorf("constructing repo remote url: %w", err) 59 + } 60 + cmd := exec.CommandContext(ctx, "git", "-C", path, "remote", "set-url", "origin", url) 61 + if out, err := cmd.CombinedOutput(); err != nil { 62 + if ctx.Err() != nil { 63 + return ctx.Err() 64 + } 65 + msg := string(out) 66 + return fmt.Errorf("running 'git remote set-url origin %s': %w\n%s", url, err, msg) 67 + } 68 + return nil 69 + } 70 + 71 + func (c *CliGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 72 + path := c.makeRepoPath(repo) 73 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 74 + if err != nil { 75 + return fmt.Errorf("constructing repo remote url: %w", err) 76 + } 77 + return c.clone(ctx, path, url) 78 + } 79 + 80 + func (c *CliGitMirrorManager) clone(ctx context.Context, path, url string) error { 26 81 cmd := exec.CommandContext(ctx, "git", "clone", "--mirror", url, path) 27 82 if out, err := cmd.CombinedOutput(); err != nil { 28 83 if ctx.Err() != nil { 29 84 return ctx.Err() 30 85 } 31 86 msg := string(out) 32 - if classification := classifyError(msg); classification != nil { 87 + if classification := classifyCliError(msg); classification != nil { 33 88 return classification 34 89 } 35 - return fmt.Errorf("cloning repo: %w\n%s", err, msg) 90 + return fmt.Errorf("running 'git clone --mirror %s': %w\n%s", url, err, msg) 36 91 } 37 92 return nil 38 93 } 39 94 40 - func (c *CliGitMirrorClient) Fetch(ctx context.Context, path, url string) error { 95 + func (c *CliGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 96 + path := c.makeRepoPath(repo) 97 + return c.fetch(ctx, path) 98 + } 99 + 100 + func (c *CliGitMirrorManager) fetch(ctx context.Context, path string) error { 101 + // TODO: use `repo.Knot` instead of depending on origin 41 102 cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin") 42 103 if out, err := cmd.CombinedOutput(); err != nil { 43 104 if ctx.Err() != nil { 44 105 return ctx.Err() 45 106 } 46 - return fmt.Errorf("fetching repo: %w\n%s", err, string(out)) 107 + return fmt.Errorf("running 'git fetch': %w\n%s", err, string(out)) 108 + } 109 + return nil 110 + } 111 + 112 + func (c *CliGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 113 + path := c.makeRepoPath(repo) 114 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 115 + if err != nil { 116 + return fmt.Errorf("constructing repo remote url: %w", err) 117 + } 118 + 119 + exist, err := isDir(path) 120 + if err != nil { 121 + return fmt.Errorf("checking repo path: %w", err) 122 + } 123 + if !exist { 124 + if err := c.clone(ctx, path, url); err != nil { 125 + return fmt.Errorf("cloning repo: %w", err) 126 + } 127 + } else { 128 + if err := c.fetch(ctx, path); err != nil { 129 + return fmt.Errorf("fetching repo: %w", err) 130 + } 47 131 } 48 132 return nil 49 133 } 50 134 51 135 var ( 52 - ErrDNSFailure = errors.New("git: dns failure (could not resolve host)") 53 - ErrCertExpired = errors.New("git: certificate has expired") 54 - ErrRepoNotFound = errors.New("git: repository not found") 136 + ErrDNSFailure = errors.New("git: knot: dns failure (could not resolve host)") 137 + ErrCertExpired = errors.New("git: knot: certificate has expired") 138 + ErrCertMismatch = errors.New("git: knot: certificate hostname mismatch") 139 + ErrTLSHandshake = errors.New("git: knot: tls handshake failure") 140 + ErrHTTPStatus = errors.New("git: knot: request url returned error") 141 + ErrUnreachable = errors.New("git: knot: could not connect to server") 142 + ErrRepoNotFound = errors.New("git: repo: repository not found") 55 143 ) 56 144 57 145 var ( 58 - reDNS = regexp.MustCompile(`Could not resolve host:`) 146 + reDNSFailure = regexp.MustCompile(`Could not resolve host:`) 59 147 reCertExpired = regexp.MustCompile(`SSL certificate OpenSSL verify result: certificate has expired`) 60 - reRepoNotFound = regexp.MustCompile(`repository '.*' not found`) 148 + reCertMismatch = regexp.MustCompile(`SSL: no alternative certificate subject name matches target hostname`) 149 + reTLSHandshake = regexp.MustCompile(`TLS connect error: (.*)`) 150 + reHTTPStatus = regexp.MustCompile(`The requested URL returned error: (\d\d\d)`) 151 + reUnreachable = regexp.MustCompile(`Could not connect to server`) 152 + reRepoNotFound = regexp.MustCompile(`repository '.*?' not found`) 61 153 ) 62 154 63 - func classifyError(stderr string) error { 155 + // classifyCliError classifies git cli error message. It will return nil for unknown error messages 156 + func classifyCliError(stderr string) error { 64 157 msg := strings.TrimSpace(stderr) 158 + if m := reTLSHandshake.FindStringSubmatch(msg); len(m) > 1 { 159 + return fmt.Errorf("%w: %s", ErrTLSHandshake, m[1]) 160 + } 161 + if m := reHTTPStatus.FindStringSubmatch(msg); len(m) > 1 { 162 + return fmt.Errorf("%w: %s", ErrHTTPStatus, m[1]) 163 + } 65 164 switch { 66 - case reDNS.MatchString(msg): 165 + case reDNSFailure.MatchString(msg): 67 166 return ErrDNSFailure 68 167 case reCertExpired.MatchString(msg): 69 168 return ErrCertExpired 169 + case reCertMismatch.MatchString(msg): 170 + return ErrCertMismatch 171 + case reUnreachable.MatchString(msg): 172 + return ErrUnreachable 70 173 case reRepoNotFound.MatchString(msg): 71 174 return ErrRepoNotFound 72 175 } 73 176 return nil 74 177 } 75 178 76 - type GoGitMirrorClient struct{} 179 + type GoGitMirrorManager struct { 180 + repoBasePath string 181 + knotUseSSL bool 182 + } 183 + 184 + func NewGoGitMirrorClient(repoBasePath string, knotUseSSL bool) *GoGitMirrorManager { 185 + return &GoGitMirrorManager{ 186 + repoBasePath, 187 + knotUseSSL, 188 + } 189 + } 190 + 191 + var _ GitMirrorManager = new(GoGitMirrorManager) 192 + 193 + func (c *GoGitMirrorManager) makeRepoPath(repo *models.Repo) string { 194 + return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 195 + } 196 + 197 + func (c *GoGitMirrorManager) Exist(repo *models.Repo) (bool, error) { 198 + return isDir(c.makeRepoPath(repo)) 199 + } 77 200 78 - var _ GitMirrorClient = new(GoGitMirrorClient) 201 + func (c *GoGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 202 + panic("unimplemented") 203 + } 204 + 205 + func (c *GoGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 206 + path := c.makeRepoPath(repo) 207 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 208 + if err != nil { 209 + return fmt.Errorf("constructing repo remote url: %w", err) 210 + } 211 + return c.clone(ctx, path, url) 212 + } 79 213 80 - func (c *GoGitMirrorClient) Clone(ctx context.Context, path string, url string) error { 214 + func (c *GoGitMirrorManager) clone(ctx context.Context, path, url string) error { 81 215 _, err := git.PlainCloneContext(ctx, path, true, &git.CloneOptions{ 82 216 URL: url, 83 217 Mirror: true, ··· 88 222 return nil 89 223 } 90 224 91 - func (c *GoGitMirrorClient) Fetch(ctx context.Context, path string, url string) error { 225 + func (c *GoGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 226 + path := c.makeRepoPath(repo) 227 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 228 + if err != nil { 229 + return fmt.Errorf("constructing repo remote url: %w", err) 230 + } 231 + 232 + return c.fetch(ctx, path, url) 233 + } 234 + 235 + func (c *GoGitMirrorManager) fetch(ctx context.Context, path, url string) error { 92 236 gr, err := git.PlainOpen(path) 93 237 if err != nil { 94 238 return fmt.Errorf("opening local repo: %w", err) ··· 103 247 } 104 248 return nil 105 249 } 250 + 251 + func (c *GoGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 252 + path := c.makeRepoPath(repo) 253 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 254 + if err != nil { 255 + return fmt.Errorf("constructing repo remote url: %w", err) 256 + } 257 + 258 + exist, err := isDir(path) 259 + if err != nil { 260 + return fmt.Errorf("checking repo path: %w", err) 261 + } 262 + if !exist { 263 + if err := c.clone(ctx, path, url); err != nil { 264 + return fmt.Errorf("cloning repo: %w", err) 265 + } 266 + } else { 267 + if err := c.fetch(ctx, path, url); err != nil { 268 + return fmt.Errorf("fetching repo: %w", err) 269 + } 270 + } 271 + return nil 272 + } 273 + 274 + func makeRepoRemoteUrl(knot, didSlashRepo string, knotUseSSL bool) (string, error) { 275 + if !strings.Contains(knot, "://") { 276 + if knotUseSSL { 277 + knot = "https://" + knot 278 + } else { 279 + knot = "http://" + knot 280 + } 281 + } 282 + 283 + u, err := url.Parse(knot) 284 + if err != nil { 285 + return "", err 286 + } 287 + 288 + if u.Scheme != "http" && u.Scheme != "https" { 289 + return "", fmt.Errorf("unsupported scheme: %s", u.Scheme) 290 + } 291 + 292 + u = u.JoinPath(didSlashRepo) 293 + return u.String(), nil 294 + } 295 + 296 + func isDir(path string) (bool, error) { 297 + info, err := os.Stat(path) 298 + if err == nil && info.IsDir() { 299 + return true, nil 300 + } 301 + if os.IsNotExist(err) { 302 + return false, nil 303 + } 304 + return false, err 305 + }
+9 -12
knotmirror/knotmirror.go
··· 15 15 "tangled.org/core/log" 16 16 ) 17 17 18 - func Run(ctx context.Context) error { 18 + func Run(ctx context.Context, cfg *config.Config) error { 19 19 // make sure every services are cleaned up on fast return 20 20 ctx, cancel := context.WithCancel(ctx) 21 21 defer cancel() 22 22 23 23 logger := log.FromContext(ctx) 24 - cfg, err := config.Load(ctx) 25 - if err != nil { 26 - return fmt.Errorf("loading config: %w", err) 27 - } 28 24 29 - logger.Debug("config loaded:", "config", cfg) 30 - 31 - db, err := db.Make(ctx, cfg.DbPath) 25 + db, err := db.Make(ctx, cfg.DbUrl, 32) 32 26 if err != nil { 33 27 return fmt.Errorf("initializing db: %w", err) 34 28 } 35 29 30 + // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 31 + gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL) 32 + 36 33 res, err := db.ExecContext(ctx, 37 - `update repos set state = ? where state = ?`, 34 + `update repos set state = $1 where state = $2`, 38 35 models.RepoStateDesynchronized, 39 36 models.RepoStateResyncing, 40 37 ) ··· 49 46 50 47 knotstream := knotstream.NewKnotStream(logger, db, cfg) 51 48 crawler := NewCrawler(logger, db) 52 - resyncer := NewResyncer(logger, db, cfg) 53 - adminpage := NewAdminServer(db) 49 + resyncer := NewResyncer(logger, db, gitm, cfg) 50 + adminpage := NewAdminServer(logger, db, resyncer) 54 51 55 52 // maintain repository list with tap 56 53 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 57 - tap := NewTapClient(logger, cfg, db, knotstream) 54 + tap := NewTapClient(logger, cfg, db, gitm, knotstream) 58 55 59 56 // start metrics endpoint 60 57 go func() {
+1 -2
knotmirror/knotstream/knotstream.go
··· 11 11 "tangled.org/core/knotmirror/db" 12 12 "tangled.org/core/knotmirror/models" 13 13 "tangled.org/core/log" 14 - "tangled.org/core/orm" 15 14 ) 16 15 17 16 type KnotStream struct { ··· 71 70 } 72 71 73 72 func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 74 - hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active")) 73 + hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive) 75 74 if err != nil { 76 75 return fmt.Errorf("listing hosts: %w", err) 77 76 }
knotmirror/knotstream/metrics.go

This file has not been changed.

knotmirror/knotstream/scheduler.go

This file has not been changed.

+2 -2
knotmirror/knotstream/slurper.go
··· 150 150 l.Warn("dialing failed", "err", err, "backoff", backoff) 151 151 time.Sleep(sleepForBackoff(backoff)) 152 152 backoff++ 153 - if backoff > 15 { 153 + if backoff > 30 { 154 154 l.Warn("host does not appear to be online, disabling for now") 155 155 host.Status = models.HostStatusOffline 156 156 if err := db.UpsertHost(ctx, s.db, &host); err != nil { ··· 328 328 return 0 329 329 } 330 330 if b < 10 { 331 - return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000))) 331 + return time.Millisecond * time.Duration((50*b)+rand.Intn(500)) 332 332 } 333 333 return time.Second * 30 334 334 }
knotmirror/knotstream/subscription.go

This file has not been changed.

knotmirror/metrics.go

This file has not been changed.

+2 -9
knotmirror/models/models.go
··· 51 51 RepoStateError, 52 52 } 53 53 54 - func (s RepoState) AllStates() []RepoState { 55 - return []RepoState{ 56 - RepoStatePending, 57 - RepoStateDesynchronized, 58 - RepoStateResyncing, 59 - RepoStateActive, 60 - RepoStateSuspended, 61 - RepoStateError, 62 - } 54 + func (s RepoState) IsResyncing() bool { 55 + return s == RepoStateResyncing 63 56 } 64 57 65 58 type HostCursor struct {
knotmirror/readme.md

This file has not been changed.

+98 -88
knotmirror/resyncer.go
··· 7 7 "fmt" 8 8 "log/slog" 9 9 "math/rand" 10 - "net/url" 11 - "os" 12 - "path" 10 + "strings" 13 11 "sync" 14 12 "time" 15 13 ··· 23 21 type Resyncer struct { 24 22 logger *slog.Logger 25 23 db *sql.DB 24 + gitm GitMirrorManager 26 25 27 26 claimJobMu sync.Mutex 28 27 29 - repoBasePath string 30 - repoFetchTimeout time.Duration 31 - knotUseSSL bool 28 + runningJobs map[syntax.ATURI]context.CancelFunc 29 + runningJobsMu sync.Mutex 30 + 31 + repoFetchTimeout time.Duration 32 + manualResyncTimeout time.Duration 33 + parallelism int 32 34 33 - parallelism int 34 35 } 35 36 36 - func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 37 + func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 37 38 return &Resyncer{ 38 - logger: log.SubLogger(l, "resyncer"), 39 - db: db, 40 - repoBasePath: cfg.GitRepoBasePath, 41 - repoFetchTimeout: cfg.GitRepoFetchTimeout, 42 - knotUseSSL: cfg.KnotUseSSL, 43 - parallelism: cfg.ResyncParallelism, 39 + logger: log.SubLogger(l, "resyncer"), 40 + db: db, 41 + gitm: gitm, 42 + 43 + runningJobs: make(map[syntax.ATURI]context.CancelFunc), 44 + 45 + repoFetchTimeout: cfg.GitRepoFetchTimeout, 46 + manualResyncTimeout: 30 * time.Minute, 47 + parallelism: cfg.ResyncParallelism, 44 48 } 45 49 } 46 50 ··· 76 80 } 77 81 } 78 82 83 + func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 84 + r.runningJobsMu.Lock() 85 + defer r.runningJobsMu.Unlock() 86 + 87 + if _, exists := r.runningJobs[repo]; exists { 88 + return 89 + } 90 + r.runningJobs[repo] = cancel 91 + } 92 + 93 + func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 94 + r.runningJobsMu.Lock() 95 + defer r.runningJobsMu.Unlock() 96 + 97 + delete(r.runningJobs, repo) 98 + } 99 + 100 + func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 101 + r.runningJobsMu.Lock() 102 + defer r.runningJobsMu.Unlock() 103 + 104 + cancel, ok := r.runningJobs[repo] 105 + if !ok { 106 + return 107 + } 108 + delete(r.runningJobs, repo) 109 + cancel() 110 + } 111 + 112 + // TriggerResyncJob manually triggers the resync job 113 + func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 114 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 115 + if err != nil { 116 + return fmt.Errorf("failed to get repo: %w", err) 117 + } 118 + if repo == nil { 119 + return fmt.Errorf("repo not found: %s", repoAt) 120 + } 121 + 122 + if repo.State == models.RepoStateResyncing { 123 + return fmt.Errorf("repo already resyncing") 124 + } 125 + 126 + repo.State = models.RepoStatePending 127 + repo.RetryAfter = -1 // resyncer will prioritize this 128 + 129 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 130 + return fmt.Errorf("updating repo state to pending %w", err) 131 + } 132 + return nil 133 + } 134 + 79 135 func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 80 136 // use mutex to prevent duplicated jobs 81 137 r.claimJobMu.Lock() ··· 85 141 now := time.Now().Unix() 86 142 if err := r.db.QueryRowContext(ctx, 87 143 `update repos 88 - set state = ? 144 + set state = $1 89 145 where at_uri = ( 90 146 select at_uri from repos 91 - where state in (?, ?, ?) 92 - and (retry_after = 0 or retry_after < ?) 147 + where state in ($2, $3, $4) 148 + and (retry_after = -1 or retry_after = 0 or retry_after < $5) 149 + order by 150 + (retry_after = -1) desc, 151 + (retry_after = 0) desc, 152 + retry_after 93 153 limit 1 94 154 ) 95 155 returning at_uri ··· 115 175 resyncsStarted.Inc() 116 176 startTime := time.Now() 117 177 118 - success, err := r.doResync(ctx, repoAt) 178 + jobCtx, cancel := context.WithCancel(ctx) 179 + r.registerRunning(repoAt, cancel) 180 + defer r.unregisterRunning(repoAt) 181 + 182 + success, err := r.doResync(jobCtx, repoAt) 119 183 if !success { 120 184 resyncsFailed.Inc() 121 185 resyncDuration.Observe(time.Since(startTime).Seconds()) ··· 140 204 return false, nil 141 205 } 142 206 143 - repoPath := r.repoPath(repo) 144 - l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath) 145 - 146 - remoteUrl, err := r.repoRemoteURL(repo) 147 - if err != nil { 148 - return false, fmt.Errorf("parsing knot url: %w", err) 149 - } 150 - l = l.With("url", remoteUrl) 151 - 152 - ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 153 - defer cancel() 154 - 155 207 // TODO: check if Knot is on backoff list. If so, return (false, nil) 156 - // TODO: use r.repoFetchTimeout on fetch 157 208 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 158 209 159 - // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 160 - gitclient := &CliGitMirrorClient{} 161 - 162 - exist, err := isDir(repoPath) 163 - if err != nil { 164 - return false, fmt.Errorf("checking repo path: %w", err) 210 + timeout := r.repoFetchTimeout 211 + if repo.RetryAfter == -1 { 212 + timeout = r.manualResyncTimeout 165 213 } 166 - if !exist { 167 - if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil { 168 - return false, err 169 - } 170 - } else { 171 - if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil { 172 - return false, err 173 - } 214 + fetchCtx, cancel := context.WithTimeout(ctx, timeout) 215 + defer cancel() 216 + 217 + if err := r.gitm.Sync(fetchCtx, repo); err != nil { 218 + return false, err 174 219 } 175 220 176 221 // repo.GitRev = <processed git.refUpdate revision> ··· 205 250 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 206 251 } 207 252 253 + // start a 1 min & go up to 1 hr between retries 208 254 var retryCount = repo.RetryCount + 1 209 - var retryAfter int64 210 - if retryCount >= 10 { 211 - state = models.RepoStateSuspended 212 - errMsg = fmt.Sprintf("too many resync fails: %s", errMsg) 213 - retryAfter = 0 214 - } else { 215 - // start a 1 min & go up to 1 hr between retries 216 - retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 217 - } 255 + var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 256 + 257 + // remove null bytes 258 + errMsg = strings.ReplaceAll(errMsg, "\x00", "") 218 259 219 260 repo.State = state 220 261 repo.ErrorMsg = errMsg 221 262 repo.RetryCount = retryCount 222 263 repo.RetryAfter = retryAfter 223 - if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 224 - return dbErr 225 - } 226 - return err 227 - } 228 - 229 - func (r *Resyncer) repoPath(repo *models.Repo) string { 230 - return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 231 - } 232 - 233 - func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) { 234 - u, err := url.Parse(repo.KnotDomain) 235 - if err != nil { 236 - return "", err 237 - } 238 - if u.Scheme == "" { 239 - if r.knotUseSSL { 240 - u.Scheme = "https" 241 - } else { 242 - u.Scheme = "http" 243 - } 264 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 265 + return fmt.Errorf("failed to update repo state: %w", err) 244 266 } 245 - u = u.JoinPath(repo.DidSlashRepo()) 246 - return u.String(), nil 267 + return nil 247 268 } 248 269 249 270 func backoff(retries int, max int) time.Duration { ··· 251 272 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 252 273 return time.Second*time.Duration(dur) + jitter 253 274 } 254 - 255 - func isDir(path string) (bool, error) { 256 - info, err := os.Stat(path) 257 - if err == nil && info.IsDir() { 258 - return true, nil 259 - } 260 - if os.IsNotExist(err) { 261 - return false, nil 262 - } 263 - return false, err 264 - }
+22 -3
knotmirror/tapclient.go
··· 24 24 cfg *config.Config 25 25 tap tapc.Client 26 26 db *sql.DB 27 + gitm GitMirrorManager 27 28 ks *knotstream.KnotStream 28 29 } 29 30 30 - func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap { 31 + func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { 31 32 return &Tap{ 32 33 logger: log.SubLogger(l, "tapclient"), 33 34 cfg: cfg, 34 35 tap: tapc.NewClient(cfg.TapUrl, ""), 35 36 db: db, 37 + gitm: gitm, 36 38 ks: ks, 37 39 } 38 40 } ··· 87 89 errMsg = "suspending non-public knot" 88 90 } 89 91 90 - if err := db.UpsertRepo(ctx, t.db, &models.Repo{ 92 + repo := &models.Repo{ 91 93 Did: evt.Did, 92 94 Rkey: evt.Rkey, 93 95 Cid: evt.CID, ··· 95 97 KnotDomain: record.Knot, 96 98 State: status, 97 99 ErrorMsg: errMsg, 98 - }); err != nil { 100 + RetryAfter: 0, // clear retry info 101 + RetryCount: 0, 102 + } 103 + 104 + if evt.Action == tapc.RecordUpdateAction { 105 + exist, err := t.gitm.Exist(repo) 106 + if err != nil { 107 + return fmt.Errorf("checking git repo existance: %w", err) 108 + } 109 + if exist { 110 + // update git repo remote url 111 + if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { 112 + return fmt.Errorf("updating git repo remote url: %w", err) 113 + } 114 + } 115 + } 116 + 117 + if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 99 118 return fmt.Errorf("upserting repo to db: %w", err) 100 119 } 101 120
+29
knotmirror/templates/base.html
··· 11 11 th, td { text-align: left; padding: 8px; border: 1px solid #ddd; } 12 12 .pagination { margin-top: 20px; } 13 13 .filters { background: #f4f4f4; padding: 15px; margin-bottom: 20px; } 14 + #notifications { 15 + position: fixed; 16 + bottom: 8px; 17 + right: 8px; 18 + z-index: 1000; 19 + pointer-events: none; 20 + } 21 + .notif { 22 + pointer-events: auto; 23 + background: #333; 24 + color: #fff; 25 + padding: 2px 4px; 26 + margin: 4px 0; 27 + opacity: 0.95; 28 + } 29 + .notif.warn { background: #ed6c02 } 30 + .notif.error { background: #d32f2f } 14 31 </style> 15 32 </head> 16 33 <body> ··· 21 38 <main id="main"> 22 39 {{template "content" .}} 23 40 </main> 41 + <div id="notifications"></div> 42 + <script> 43 + document.body.addEventListener("htmx:oobBeforeSwap", (evt) => { 44 + evt.detail.fragment.querySelectorAll(".notif").forEach((el) => { 45 + console.debug("set timeout to notif element", el) 46 + setTimeout(() => { 47 + console.debug("clearing notif element", el); 48 + el.remove(); 49 + }, 10 * 1000); 50 + }); 51 + }); 52 + </script> 24 53 </body> 25 54 </html> 26 55 {{end}}
-1
knotmirror/templates/hosts.html
··· 11 11 hx-trigger="every 10s" 12 12 > 13 13 <select name="status"> 14 - <option value="">-- Statuse --</option> 15 14 {{ range const.AllHostStatuses }} 16 15 <option value="{{.}}" {{ if eq $.FilterByStatus . }}selected{{end}}>{{.}}</option> 17 16 {{ end }}
+16 -1
knotmirror/templates/repos.html
··· 25 25 26 26 <div id="table"> 27 27 <div class="repo-state-indicators"> 28 - {{range .FilterByState.AllStates}} 28 + {{range const.AllRepoStates}} 29 29 <span class="state-pill state-{{.}}"> 30 30 {{.}}: {{index $.RepoCounts .}} 31 31 </span> ··· 41 41 <th>Retry</th> 42 42 <th>Retry After</th> 43 43 <th>Error Message</th> 44 + <th>Action</th> 44 45 </tr> 45 46 </thead> 46 47 <tbody> ··· 53 54 <td>{{.RetryCount}}</td> 54 55 <td>{{readt .RetryAfter}}</td> 55 56 <td>{{.ErrorMsg}}</td> 57 + <td> 58 + <form 59 + {{ if .State.IsResyncing -}} 60 + hx-post="/api/cancelRepoResync" 61 + {{- else -}} 62 + hx-post="/api/triggerRepoResync" 63 + {{- end }} 64 + hx-swap="none" 65 + hx-disabled-elt="find button" 66 + > 67 + <input type="hidden" name="repo" value="{{.AtUri}}"> 68 + <button type="submit">{{ if .State.IsResyncing }}cancel{{ else }}resync{{ end }}</button> 69 + </form> 70 + </td> 56 71 </tr> 57 72 {{else}} 58 73 <tr><td colspan="99">No repositories found.</td></tr>
+12
nix/gomod2nix.toml
··· 398 398 [mod."github.com/ipfs/go-metrics-interface"] 399 399 version = "v0.3.0" 400 400 hash = "sha256-b3tp3jxecLmJEGx2kW7MiKGlAKPEWg/LJ7hXylSC8jQ=" 401 + [mod."github.com/jackc/pgpassfile"] 402 + version = "v1.0.0" 403 + hash = "sha256-H0nFbC34/3pZUFnuiQk9W7yvAMh6qJDrqvHp+akBPLM=" 404 + [mod."github.com/jackc/pgservicefile"] 405 + version = "v0.0.0-20240606120523-5a60cdf6a761" 406 + hash = "sha256-ETpGsLAA2wcm5xJBayr/mZrCE1YsWbnkbSSX3ptrFn0=" 407 + [mod."github.com/jackc/pgx/v5"] 408 + version = "v5.8.0" 409 + hash = "sha256-Mq5/A/Obcceu6kKxUv30DPC2ZaVvD8Iq/YtmLm1BVec=" 410 + [mod."github.com/jackc/puddle/v2"] 411 + version = "v2.2.2" 412 + hash = "sha256-IUxdu4JYfsCh/qlz2SiUWu7EVPHhyooiVA4oaS2Z6yk=" 401 413 [mod."github.com/json-iterator/go"] 402 414 version = "v1.1.12" 403 415 hash = "sha256-To8A0h+lbfZ/6zM+2PpRpY3+L6725OPC66lffq6fUoM="
-5
nix/pkgs/knot-mirror.nix
··· 1 1 { 2 2 buildGoApplication, 3 3 modules, 4 - sqlite-lib, 5 4 src, 6 5 }: 7 6 buildGoApplication { ··· 12 11 doCheck = false; 13 12 14 13 subPackages = ["cmd/knotmirror"]; 15 - tags = ["libsqlite3"]; 16 14 17 - env.CGO_CFLAGS = "-I ${sqlite-lib}/include "; 18 - env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib"; 19 - CGO_ENABLED = 1; 20 15 meta = { 21 16 mainProgram = "knotmirror"; 22 17 };

History

5 rounds 4 comments
sign up or login to add to the discussion
1 commit
expand
knotmirror: introduce knotmirror
2/3 failed, 1/3 success
expand
expand 0 comments
pull request successfully merged
1 commit
expand
knotmirror: introduce knotmirror
expand 0 comments
1 commit
expand
knotmirror: introduce knotmirror
expand 0 comments
1 commit
expand
knotmirror: introduce knotmirror
expand 4 comments

cmd/knotmirror/main.go:49 we can have the entire thing config'd via env vars, no need for more flags IMO. knotmirror/crawler.go:11: what does the Crawler do? knotmirror/models/models.go:45-63: do we need the func and the global?

will post this as a preliminary review, and do a further code review shortly, i can see that PRs above this stack mutate the knotmirror modules, such as the postgres migration. would appreciate if the stack were cleaned up to remove such mutations as it is slightly hard to follow.

no need for more flags IMO

The intention was to make knotmirror service embeddable by allowing full config as an input of knotmirror.Run(). And while doing that, I thought it would be nice if we can have cli flags to override the config.

what does the Crawler do?

Crawler came from tap.Crawler which periodically schedules the resync job for known repos. As we don't have a way to check the repository state, I leave the implementation blank. It isn't strictly necessary and not valuable to implement in current structure. See TAN-283 on how I am going to check repository sync state. So yeah, it does nothing right now.

do we need the func and the global?

Uh, I think I lost one of my changes while reordering commits... I will fix that.

would appreciate if the stack were cleaned up to remove such mutations as it is slightly hard to follow.

will do.

knotmirror/knotstream/slurper.go:331

I think this is counting nanoseconds instead of seconds. Intentional?

good catch. Seems like I brought a bug from indigo/relay. Sorry for that, should check more carefully.

1 commit
expand
knotmirror: introduce knotmirror
expand 0 comments