Signed-off-by: Seongmin Lee git@boltless.me
+245
-103
Diff
round #1
+212
-21
knotmirror/git.go
+212
-21
knotmirror/git.go
···
4
4
"context"
5
5
"errors"
6
6
"fmt"
7
+
"net/url"
8
+
"os"
7
9
"os/exec"
10
+
"path/filepath"
8
11
"regexp"
9
12
"strings"
10
13
11
14
"github.com/go-git/go-git/v5"
12
15
gitconfig "github.com/go-git/go-git/v5/config"
13
16
"github.com/go-git/go-git/v5/plumbing/transport"
17
+
"tangled.org/core/knotmirror/models"
14
18
)
15
19
16
-
type GitMirrorClient interface {
17
-
Clone(ctx context.Context, path, url string) error
18
-
Fetch(ctx context.Context, path, url string) error
20
+
type GitMirrorManager interface {
21
+
// RemoteSetUrl updates git repository 'origin' remote
22
+
RemoteSetUrl(ctx context.Context, repo *models.Repo) error
23
+
// Clone clones the repository as a mirror
24
+
Clone(ctx context.Context, repo *models.Repo) error
25
+
// Fetch fetches the repository
26
+
Fetch(ctx context.Context, repo *models.Repo) error
27
+
// Sync mirrors the repository. It will clone the repository if repository doesn't exist.
28
+
Sync(ctx context.Context, repo *models.Repo) error
19
29
}
20
30
21
-
type CliGitMirrorClient struct{}
31
+
// CliGitMirrorManager is a GitMirrorManager that shells out to the git
// command-line client.
type CliGitMirrorManager struct {
	// repoBasePath is the root directory under which mirrors are stored.
	repoBasePath string

	// knotUseSSL picks https (true) or http (false) for knots that are given
	// without an explicit scheme.
	knotUseSSL bool
}
35
+
36
+
func NewCliGitMirrorManager(repoBasePath string, knotUseSSL bool) *CliGitMirrorManager {
37
+
return &CliGitMirrorManager{
38
+
repoBasePath,
39
+
knotUseSSL,
40
+
}
41
+
}
42
+
43
+
var _ GitMirrorManager = new(CliGitMirrorManager)
22
44
23
-
var _ GitMirrorClient = new(CliGitMirrorClient)
45
+
func (c *CliGitMirrorManager) makeRepoPath(repo *models.Repo) string {
46
+
return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String())
47
+
}
48
+
49
+
func (c *CliGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error {
50
+
path := c.makeRepoPath(repo)
51
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
52
+
if err != nil {
53
+
return fmt.Errorf("constructing repo remote url: %w", err)
54
+
}
55
+
cmd := exec.CommandContext(ctx, "git", "-C", path, "remote", "set-url", "origin", url)
56
+
if out, err := cmd.CombinedOutput(); err != nil {
57
+
if ctx.Err() != nil {
58
+
return ctx.Err()
59
+
}
60
+
msg := string(out)
61
+
return fmt.Errorf("running 'git remote set-url origin %s': %w\n%s", url, err, msg)
62
+
}
63
+
return nil
64
+
}
24
65
25
-
func (c *CliGitMirrorClient) Clone(ctx context.Context, path, url string) error {
66
+
func (c *CliGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error {
67
+
path := c.makeRepoPath(repo)
68
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
69
+
if err != nil {
70
+
return fmt.Errorf("constructing repo remote url: %w", err)
71
+
}
72
+
return c.clone(ctx, path, url)
73
+
}
74
+
75
+
func (c *CliGitMirrorManager) clone(ctx context.Context, path, url string) error {
26
76
cmd := exec.CommandContext(ctx, "git", "clone", "--mirror", url, path)
27
77
if out, err := cmd.CombinedOutput(); err != nil {
28
78
if ctx.Err() != nil {
29
79
return ctx.Err()
30
80
}
31
81
msg := string(out)
32
-
if classification := classifyError(msg); classification != nil {
82
+
if classification := classifyCliError(msg); classification != nil {
33
83
return classification
34
84
}
35
-
return fmt.Errorf("cloning repo: %w\n%s", err, msg)
85
+
return fmt.Errorf("running 'git clone --mirror %s': %w\n%s", url, err, msg)
36
86
}
37
87
return nil
38
88
}
39
89
40
-
func (c *CliGitMirrorClient) Fetch(ctx context.Context, path, url string) error {
90
+
func (c *CliGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error {
91
+
path := c.makeRepoPath(repo)
92
+
return c.fetch(ctx, path)
93
+
}
94
+
95
+
func (c *CliGitMirrorManager) fetch(ctx context.Context, path string) error {
96
+
// TODO: use `repo.Knot` instead of depending on origin
41
97
cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin")
42
98
if out, err := cmd.CombinedOutput(); err != nil {
43
99
if ctx.Err() != nil {
44
100
return ctx.Err()
45
101
}
46
-
return fmt.Errorf("fetching repo: %w\n%s", err, string(out))
102
+
return fmt.Errorf("running 'git fetch': %w\n%s", err, string(out))
103
+
}
104
+
return nil
105
+
}
106
+
107
+
func (c *CliGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error {
108
+
path := c.makeRepoPath(repo)
109
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
110
+
if err != nil {
111
+
return fmt.Errorf("constructing repo remote url: %w", err)
112
+
}
113
+
114
+
exist, err := isDir(path)
115
+
if err != nil {
116
+
return fmt.Errorf("checking repo path: %w", err)
117
+
}
118
+
if !exist {
119
+
if err := c.clone(ctx, path, url); err != nil {
120
+
return fmt.Errorf("cloning repo: %w", err)
121
+
}
122
+
} else {
123
+
if err := c.fetch(ctx, path); err != nil {
124
+
return fmt.Errorf("fetching repo: %w", err)
125
+
}
47
126
}
48
127
return nil
49
128
}
50
129
51
130
// Sentinel errors for recognized git failures. The "git: knot:" prefix marks
// problems reaching or talking to the knot; "git: repo:" marks problems with
// the repository itself.
var (
	// ErrDNSFailure: the knot host name could not be resolved.
	ErrDNSFailure = errors.New("git: knot: dns failure (could not resolve host)")
	// ErrCertExpired: the knot's TLS certificate has expired.
	ErrCertExpired = errors.New("git: knot: certificate has expired")
	// ErrCertMismatch: the knot's TLS certificate does not cover its host name.
	ErrCertMismatch = errors.New("git: knot: certificate hostname mismatch")
	// ErrTLSHandshake: the TLS connection could not be established.
	ErrTLSHandshake = errors.New("git: knot: tls handshake failure")
	// ErrHTTPStatus: the requested URL answered with an HTTP error status.
	ErrHTTPStatus = errors.New("git: knot: request url returned error")
	// ErrUnreachable: no connection to the knot could be made.
	ErrUnreachable = errors.New("git: knot: could not connect to server")
	// ErrRepoNotFound: the repository was not found.
	ErrRepoNotFound = errors.New("git: repo: repository not found")
)
56
139
57
140
// Patterns matched against git CLI output to recognize known failures.
// They are compiled once at package scope.
var (
	// Host name resolution failed.
	reDNSFailure = regexp.MustCompile(`Could not resolve host:`)
	// Server certificate has expired.
	reCertExpired = regexp.MustCompile(`SSL certificate OpenSSL verify result: certificate has expired`)
	// Server certificate does not match the target host name.
	reCertMismatch = regexp.MustCompile(`SSL: no alternative certificate subject name matches target hostname`)
	// TLS connection failure; group 1 captures the rest of the message.
	reTLSHandshake = regexp.MustCompile(`TLS connect error: (.*)`)
	// HTTP error response; group 1 captures the three-digit status code.
	reHTTPStatus = regexp.MustCompile(`The requested URL returned error: (\d\d\d)`)
	// No connection to the server could be made.
	reUnreachable = regexp.MustCompile(`Could not connect to server`)
	// Repository was not found (non-greedy match on the quoted name).
	reRepoNotFound = regexp.MustCompile(`repository '.*?' not found`)
)
62
149
63
-
func classifyError(stderr string) error {
150
+
// classifyCliError classifies git cli error message. It will return nil for unknown error messages
151
+
func classifyCliError(stderr string) error {
64
152
msg := strings.TrimSpace(stderr)
153
+
if m := reTLSHandshake.FindStringSubmatch(msg); len(m) > 1 {
154
+
return fmt.Errorf("%w: %s", ErrTLSHandshake, m[1])
155
+
}
156
+
if m := reHTTPStatus.FindStringSubmatch(msg); len(m) > 1 {
157
+
return fmt.Errorf("%w: %s", ErrHTTPStatus, m[1])
158
+
}
65
159
switch {
66
-
case reDNS.MatchString(msg):
160
+
case reDNSFailure.MatchString(msg):
67
161
return ErrDNSFailure
68
162
case reCertExpired.MatchString(msg):
69
163
return ErrCertExpired
164
+
case reCertMismatch.MatchString(msg):
165
+
return ErrCertMismatch
166
+
case reUnreachable.MatchString(msg):
167
+
return ErrUnreachable
70
168
case reRepoNotFound.MatchString(msg):
71
169
return ErrRepoNotFound
72
170
}
73
171
return nil
74
172
}
75
173
76
-
type GoGitMirrorClient struct{}
174
+
// GoGitMirrorManager is a GitMirrorManager built on the go-git library
// instead of the git command-line client.
type GoGitMirrorManager struct {
	// repoBasePath is the root directory under which mirrors are stored.
	repoBasePath string

	// knotUseSSL picks https (true) or http (false) for knots that are given
	// without an explicit scheme.
	knotUseSSL bool
}
178
+
179
+
func NewGoGitMirrorClient(repoBasePath string, knotUseSSL bool) *GoGitMirrorManager {
180
+
return &GoGitMirrorManager{
181
+
repoBasePath,
182
+
knotUseSSL,
183
+
}
184
+
}
185
+
186
+
var _ GitMirrorManager = new(GoGitMirrorManager)
187
+
188
+
func (c *GoGitMirrorManager) makeRepoPath(repo *models.Repo) string {
189
+
return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String())
190
+
}
191
+
192
+
func (c *GoGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error {
193
+
panic("unimplemented")
194
+
}
77
195
78
-
var _ GitMirrorClient = new(GoGitMirrorClient)
196
+
func (c *GoGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error {
197
+
path := c.makeRepoPath(repo)
198
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
199
+
if err != nil {
200
+
return fmt.Errorf("constructing repo remote url: %w", err)
201
+
}
202
+
return c.clone(ctx, path, url)
203
+
}
79
204
80
-
func (c *GoGitMirrorClient) Clone(ctx context.Context, path string, url string) error {
205
+
func (c *GoGitMirrorManager) clone(ctx context.Context, path, url string) error {
81
206
_, err := git.PlainCloneContext(ctx, path, true, &git.CloneOptions{
82
207
URL: url,
83
208
Mirror: true,
···
88
213
return nil
89
214
}
90
215
91
-
func (c *GoGitMirrorClient) Fetch(ctx context.Context, path string, url string) error {
216
+
func (c *GoGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error {
217
+
path := c.makeRepoPath(repo)
218
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
219
+
if err != nil {
220
+
return fmt.Errorf("constructing repo remote url: %w", err)
221
+
}
222
+
223
+
return c.fetch(ctx, path, url)
224
+
}
225
+
226
+
func (c *GoGitMirrorManager) fetch(ctx context.Context, path, url string) error {
92
227
gr, err := git.PlainOpen(path)
93
228
if err != nil {
94
229
return fmt.Errorf("opening local repo: %w", err)
···
103
238
}
104
239
return nil
105
240
}
241
+
242
+
func (c *GoGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error {
243
+
path := c.makeRepoPath(repo)
244
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
245
+
if err != nil {
246
+
return fmt.Errorf("constructing repo remote url: %w", err)
247
+
}
248
+
249
+
exist, err := isDir(path)
250
+
if err != nil {
251
+
return fmt.Errorf("checking repo path: %w", err)
252
+
}
253
+
if !exist {
254
+
if err := c.clone(ctx, path, url); err != nil {
255
+
return fmt.Errorf("cloning repo: %w", err)
256
+
}
257
+
} else {
258
+
if err := c.fetch(ctx, path, url); err != nil {
259
+
return fmt.Errorf("fetching repo: %w", err)
260
+
}
261
+
}
262
+
return nil
263
+
}
264
+
265
+
// makeRepoRemoteUrl builds the git remote URL for a repository hosted on a knot.
//
// knot is either a bare host (optionally with port), in which case the scheme
// is chosen by knotUseSSL (https when true, http when false), or a full URL
// with an explicit scheme. didSlashRepo is appended to the knot's path.
//
// It returns an error when knot cannot be parsed, uses a scheme other than
// http/https, or has no host. The host check matters because joining a path
// onto a host-less URL would serialize as "https://<didSlashRepo>", silently
// turning the first path segment into the host.
func makeRepoRemoteUrl(knot, didSlashRepo string, knotUseSSL bool) (string, error) {
	if !strings.Contains(knot, "://") {
		if knotUseSSL {
			knot = "https://" + knot
		} else {
			knot = "http://" + knot
		}
	}

	u, err := url.Parse(knot)
	if err != nil {
		return "", err
	}

	if u.Scheme != "http" && u.Scheme != "https" {
		return "", fmt.Errorf("unsupported scheme: %s", u.Scheme)
	}
	if u.Host == "" {
		return "", fmt.Errorf("missing host in knot url %q", knot)
	}

	u = u.JoinPath(didSlashRepo)
	return u.String(), nil
}
286
+
287
+
// isDir reports whether path exists and is a directory. A missing path, or a
// path that exists but is not a directory, yields (false, nil); any other
// stat failure is returned as the error.
func isDir(path string) (bool, error) {
	info, err := os.Stat(path)
	switch {
	case err == nil:
		return info.IsDir(), nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}
+5
-2
knotmirror/knotmirror.go
+5
-2
knotmirror/knotmirror.go
···
36
36
return fmt.Errorf("initializing db: %w", err)
37
37
}
38
38
39
+
// NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
40
+
gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL)
41
+
39
42
resolver := idresolver.DefaultResolver(cfg.PlcUrl)
40
43
41
44
res, err := db.ExecContext(ctx,
···
55
58
xrpc := xrpc.New(logger, cfg, db, resolver)
56
59
knotstream := knotstream.NewKnotStream(logger, db, cfg)
57
60
crawler := NewCrawler(logger, db)
58
-
resyncer := NewResyncer(logger, db, cfg)
61
+
resyncer := NewResyncer(logger, db, gitm, cfg)
59
62
adminpage := NewAdminServer(db)
60
63
61
64
// maintain repository list with tap
62
65
// NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
63
-
tap := NewTapClient(logger, cfg, db, knotstream)
66
+
tap := NewTapClient(logger, cfg, db, gitm, knotstream)
64
67
65
68
// start http server
66
69
go func() {
+12
-77
knotmirror/resyncer.go
+12
-77
knotmirror/resyncer.go
···
7
7
"fmt"
8
8
"log/slog"
9
9
"math/rand"
10
-
"net/url"
11
-
"os"
12
-
"path"
13
10
"strings"
14
11
"sync"
15
12
"time"
···
24
21
type Resyncer struct {
25
22
logger *slog.Logger
26
23
db *sql.DB
24
+
gitm GitMirrorManager
27
25
28
26
claimJobMu sync.Mutex
29
27
30
-
repoBasePath string
31
28
repoFetchTimeout time.Duration
32
-
knotUseSSL bool
33
29
34
30
parallelism int
35
31
}
36
32
37
-
func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer {
33
+
func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer {
38
34
return &Resyncer{
39
-
logger: log.SubLogger(l, "resyncer"),
40
-
db: db,
41
-
repoBasePath: cfg.GitRepoBasePath,
35
+
logger: log.SubLogger(l, "resyncer"),
36
+
db: db,
37
+
gitm: gitm,
38
+
42
39
repoFetchTimeout: cfg.GitRepoFetchTimeout,
43
-
knotUseSSL: cfg.KnotUseSSL,
44
40
parallelism: cfg.ResyncParallelism,
45
41
}
46
42
}
···
141
137
return false, nil
142
138
}
143
139
144
-
repoPath := r.repoPath(repo)
145
-
l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath)
146
-
147
-
remoteUrl, err := r.repoRemoteURL(repo)
148
-
if err != nil {
149
-
return false, fmt.Errorf("parsing knot url: %w", err)
150
-
}
151
-
l = l.With("url", remoteUrl)
152
-
153
-
ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout)
154
-
defer cancel()
155
-
156
140
// TODO: check if Knot is on backoff list. If so, return (false, nil)
157
-
// TODO: use r.repoFetchTimeout on fetch
158
141
// TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list
159
142
160
-
// NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
161
-
gitclient := &CliGitMirrorClient{}
143
+
fetchCtx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout)
144
+
defer cancel()
162
145
163
-
exist, err := isDir(repoPath)
164
-
if err != nil {
165
-
return false, fmt.Errorf("checking repo path: %w", err)
166
-
}
167
-
if !exist {
168
-
if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil {
169
-
return false, err
170
-
}
171
-
} else {
172
-
if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil {
173
-
return false, err
174
-
}
146
+
if err := r.gitm.Sync(fetchCtx, repo); err != nil {
147
+
return false, err
175
148
}
176
149
177
150
// repo.GitRev = <processed git.refUpdate revision>
···
206
179
return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
207
180
}
208
181
182
+
// start at 1 min & go up to 1 hr between retries
209
183
var retryCount = repo.RetryCount + 1
210
-
var retryAfter int64
211
-
if retryCount >= 10 {
212
-
state = models.RepoStateSuspended
213
-
errMsg = fmt.Sprintf("too many resync fails: %s", errMsg)
214
-
retryAfter = 0
215
-
} else {
216
-
// start a 1 min & go up to 1 hr between retries
217
-
retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
218
-
}
184
+
var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
219
185
220
186
// remove null bytes
221
187
errMsg = strings.ReplaceAll(errMsg, "\x00", "")
···
230
196
return err
231
197
}
232
198
233
-
func (r *Resyncer) repoPath(repo *models.Repo) string {
234
-
return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String())
235
-
}
236
-
237
-
func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) {
238
-
u, err := url.Parse(repo.KnotDomain)
239
-
if err != nil {
240
-
return "", err
241
-
}
242
-
if u.Scheme == "" {
243
-
if r.knotUseSSL {
244
-
u.Scheme = "https"
245
-
} else {
246
-
u.Scheme = "http"
247
-
}
248
-
}
249
-
u = u.JoinPath(repo.DidSlashRepo())
250
-
return u.String(), nil
251
-
}
252
-
253
199
// backoff returns an exponential delay of min(2^retries, max) seconds plus a
// random jitter of less than one second.
//
// The exponent is built by capped doubling instead of `1<<retries`: a plain
// shift wraps to zero or a negative value once retries reaches the width of
// int (collapsing the delay to just the jitter) and panics for a negative
// shift count. Negative retries are treated as zero.
func backoff(retries int, max int) time.Duration {
	dur := 1
	// Stop doubling as soon as the cap is reached so dur cannot overflow.
	for i := 0; i < retries && dur < max; i++ {
		dur <<= 1
	}
	if dur > max {
		dur = max
	}
	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(dur) + jitter
}
258
-
259
-
func isDir(path string) (bool, error) {
260
-
info, err := os.Stat(path)
261
-
if err == nil && info.IsDir() {
262
-
return true, nil
263
-
}
264
-
if os.IsNotExist(err) {
265
-
return false, nil
266
-
}
267
-
return false, err
268
-
}
+16
-3
knotmirror/tapclient.go
+16
-3
knotmirror/tapclient.go
···
24
24
cfg *config.Config
25
25
tap tapc.Client
26
26
db *sql.DB
27
+
gitm GitMirrorManager
27
28
ks *knotstream.KnotStream
28
29
}
29
30
30
-
func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap {
31
+
func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap {
31
32
return &Tap{
32
33
logger: log.SubLogger(l, "tapclient"),
33
34
cfg: cfg,
34
35
tap: tapc.NewClient(cfg.TapUrl, ""),
35
36
db: db,
37
+
gitm: gitm,
36
38
ks: ks,
37
39
}
38
40
}
···
87
89
errMsg = "suspending non-public knot"
88
90
}
89
91
90
-
if err := db.UpsertRepo(ctx, t.db, &models.Repo{
92
+
repo := &models.Repo{
91
93
Did: evt.Did,
92
94
Rkey: evt.Rkey,
93
95
Cid: evt.CID,
···
95
97
KnotDomain: record.Knot,
96
98
State: status,
97
99
ErrorMsg: errMsg,
98
-
}); err != nil {
100
+
RetryAfter: 0, // clear retry info
101
+
RetryCount: 0,
102
+
}
103
+
104
+
if evt.Action == tapc.RecordUpdateAction {
105
+
// update git repo remote url
106
+
if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil {
107
+
return fmt.Errorf("updating git repo remote url: %w", err)
108
+
}
109
+
}
110
+
111
+
if err := db.UpsertRepo(ctx, t.db, repo); err != nil {
99
112
return fmt.Errorf("upserting repo to db: %w", err)
100
113
}
101
114
History
2 rounds
0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
knotmirror: move git logic into GitMirrorManager
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 0 comments
This pull has been deleted (possibly by jj abandon or jj squash)
boltless.me
submitted
#0
1 commit
expand
collapse
knotmirror: move git logic into GitMirrorManager
Signed-off-by: Seongmin Lee <git@boltless.me>