Signed-off-by: Seongmin Lee git@boltless.me
+97
-94
Diff
round #1
+4
go.mod
+4
go.mod
···
35
35
github.com/hiddeco/sshsig v0.2.0
36
36
github.com/hpcloud/tail v1.0.0
37
37
github.com/ipfs/go-cid v0.5.0
38
+
github.com/jackc/pgx/v5 v5.8.0
38
39
github.com/mattn/go-sqlite3 v1.14.24
39
40
github.com/microcosm-cc/bluemonday v1.0.27
40
41
github.com/openbao/openbao/api/v2 v2.3.0
···
160
161
github.com/ipfs/go-log v1.0.5 // indirect
161
162
github.com/ipfs/go-log/v2 v2.6.0 // indirect
162
163
github.com/ipfs/go-metrics-interface v0.3.0 // indirect
164
+
github.com/jackc/pgpassfile v1.0.0 // indirect
165
+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
166
+
github.com/jackc/puddle/v2 v2.2.2 // indirect
163
167
github.com/json-iterator/go v1.1.12 // indirect
164
168
github.com/kevinburke/ssh_config v1.2.0 // indirect
165
169
github.com/klauspost/compress v1.18.0 // indirect
+8
go.sum
+8
go.sum
···
347
347
github.com/ipfs/go-log/v2 v2.6.0/go.mod h1:p+Efr3qaY5YXpx9TX7MoLCSEZX5boSWj9wh86P5HJa8=
348
348
github.com/ipfs/go-metrics-interface v0.3.0 h1:YwG7/Cy4R94mYDUuwsBfeziJCVm9pBMJ6q/JR9V40TU=
349
349
github.com/ipfs/go-metrics-interface v0.3.0/go.mod h1:OxxQjZDGocXVdyTPocns6cOLwHieqej/jos7H4POwoY=
350
+
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
351
+
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
352
+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
353
+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
354
+
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
355
+
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
356
+
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
357
+
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
350
358
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
351
359
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
352
360
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+7
-25
knotmirror/adminpage.go
+7
-25
knotmirror/adminpage.go
···
13
13
"tangled.org/core/appview/pagination"
14
14
"tangled.org/core/knotmirror/db"
15
15
"tangled.org/core/knotmirror/models"
16
-
"tangled.org/core/orm"
17
16
)
18
17
19
18
//go:embed templates/*.html
···
56
55
}
57
56
58
57
func (s *AdminServer) handleRepos() http.HandlerFunc {
59
-
// TODO: prepare template
60
58
tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html"))
61
59
return func(w http.ResponseWriter, r *http.Request) {
62
60
pageNum, _ := strconv.Atoi(r.URL.Query().Get("page"))
63
61
if pageNum < 1 {
64
62
pageNum = 1
65
63
}
66
-
var (
67
-
did = r.URL.Query().Get("did")
68
-
knot = r.URL.Query().Get("knot")
69
-
state = r.URL.Query().Get("state")
70
-
)
71
-
72
64
page := pagination.Page{
73
65
Offset: (pageNum - 1) * repoPageSize,
74
66
Limit: repoPageSize,
75
67
}
76
-
var filters []orm.Filter
77
68
78
-
if did != "" {
79
-
filters = append(filters, orm.FilterEq("did", did))
80
-
}
81
-
if knot != "" {
82
-
filters = append(filters, orm.FilterEq("knot_domain", knot))
83
-
}
84
-
if state != "" {
85
-
filters = append(filters, orm.FilterEq("state", state))
86
-
}
69
+
var (
70
+
did = r.URL.Query().Get("did")
71
+
knot = r.URL.Query().Get("knot")
72
+
state = r.URL.Query().Get("state")
73
+
)
87
74
88
-
repos, err := db.ListRepos(r.Context(), s.db, page, filters...)
75
+
repos, err := db.ListRepos(r.Context(), s.db, page, did, knot, state)
89
76
if err != nil {
90
77
http.Error(w, err.Error(), http.StatusInternalServerError)
91
78
}
···
112
99
return func(w http.ResponseWriter, r *http.Request) {
113
100
var status = r.URL.Query().Get("status")
114
101
115
-
var filters []orm.Filter
116
-
if status != "" {
117
-
filters = append(filters, orm.FilterEq("status", status))
118
-
}
119
-
120
-
hosts, err := db.ListHosts(r.Context(), s.db, filters...)
102
+
hosts, err := db.ListHosts(r.Context(), s.db, models.HostStatus(status))
121
103
if err != nil {
122
104
http.Error(w, err.Error(), http.StatusInternalServerError)
123
105
}
+1
-1
knotmirror/config/config.go
+1
-1
knotmirror/config/config.go
···
9
9
10
10
type Config struct {
11
11
TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"`
12
-
DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"`
12
+
DbUrl string `env:"MIRROR_DB_URL, required"`
13
13
KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified
14
14
KnotSSRF bool `env:"MIRROR_KNOT_SSRF, default=false"`
15
15
GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"`
+22
-16
knotmirror/db/db.go
+22
-16
knotmirror/db/db.go
···
4
4
"context"
5
5
"database/sql"
6
6
"fmt"
7
-
"strings"
8
-
_ "github.com/mattn/go-sqlite3"
7
+
"time"
8
+
9
+
_ "github.com/jackc/pgx/v5/stdlib"
9
10
)
10
11
11
-
func Make(ctx context.Context, dbPath string) (*sql.DB, error) {
12
-
// https://github.com/mattn/go-sqlite3#connection-string
13
-
opts := []string{
14
-
"_foreign_keys=1",
15
-
"_journal_mode=WAL",
16
-
"_synchronous=NORMAL",
17
-
"_auto_vacuum=incremental",
12
+
func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) {
13
+
db, err := sql.Open("pgx", dbUrl)
14
+
if err != nil {
15
+
return nil, fmt.Errorf("opening db: %w", err)
18
16
}
19
17
20
-
db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
21
-
if err != nil {
22
-
return nil, err
18
+
db.SetMaxOpenConns(maxConns)
19
+
db.SetMaxIdleConns(maxConns)
20
+
db.SetConnMaxIdleTime(time.Hour)
21
+
22
+
pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
23
+
defer cancel()
24
+
if err := db.PingContext(pingCtx); err != nil {
25
+
db.Close()
26
+
return nil, fmt.Errorf("ping db: %w", err)
23
27
}
24
28
25
29
conn, err := db.Conn(ctx)
···
47
51
retry_count integer not null default 0,
48
52
retry_after integer not null default 0,
49
53
50
-
unique(did, rkey)
54
+
constraint repos_pkey primary key (did, rkey)
51
55
);
52
56
53
57
-- knot hosts
54
58
create table if not exists hosts (
55
59
hostname text not null,
56
-
no_ssl integer not null default 0,
60
+
no_ssl boolean not null default false,
57
61
status text not null default 'active',
58
-
last_seq integer not null default -1,
62
+
last_seq bigint not null default -1,
59
63
60
-
unique(hostname)
64
+
constraint hosts_pkey primary key (hostname)
61
65
);
66
+
67
+
create index if not exists idx_repos_aturi on repos (at_uri);
62
68
`)
63
69
if err != nil {
64
70
return nil, fmt.Errorf("initializing db schema: %w", err)
+8
-22
knotmirror/db/hosts.go
+8
-22
knotmirror/db/hosts.go
···
6
6
"errors"
7
7
"fmt"
8
8
"log"
9
-
"strings"
10
9
11
10
"tangled.org/core/knotmirror/models"
12
-
"tangled.org/core/orm"
13
11
)
14
12
15
13
func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error {
16
14
if _, err := e.ExecContext(ctx,
17
15
`insert into hosts (hostname, no_ssl, status, last_seq)
18
-
values (?, ?, ?, ?)
16
+
values ($1, $2, $3, $4)
19
17
on conflict(hostname) do update set
20
18
no_ssl = excluded.no_ssl,
21
19
status = excluded.status,
···
35
33
var host models.Host
36
34
if err := e.QueryRowContext(ctx,
37
35
`select hostname, no_ssl, status, last_seq
38
-
from hosts where hostname = ?`,
36
+
from hosts where hostname = $1`,
39
37
hostname,
40
38
).Scan(
41
39
&host.Hostname,
···
62
60
continue
63
61
}
64
62
if _, err := tx.ExecContext(ctx,
65
-
`update hosts set last_seq = ? where hostname = ?`,
63
+
`update hosts set last_seq = $1 where hostname = $2`,
66
64
cur.LastSeq,
67
65
cur.Hostname,
68
66
); err != nil {
69
-
log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq)
67
+
log.Println("failed to persist host cursor", "host", cur.Hostname, "lastSeq", cur.LastSeq, "err", err)
70
68
}
71
69
}
72
70
return tx.Commit()
73
71
}
74
72
75
-
func ListHosts(ctx context.Context, e *sql.DB, filters ...orm.Filter) ([]models.Host, error) {
76
-
var conditions []string
77
-
var args []any
78
-
79
-
for _, filter := range filters {
80
-
conditions = append(conditions, filter.Condition())
81
-
args = append(args, filter.Arg()...)
82
-
}
83
-
84
-
whereClause := ""
85
-
if len(conditions) > 0 {
86
-
whereClause = " where " + strings.Join(conditions, " and ")
87
-
}
88
-
73
+
func ListHosts(ctx context.Context, e *sql.DB, status models.HostStatus) ([]models.Host, error) {
89
74
rows, err := e.QueryContext(ctx,
90
75
`select hostname, no_ssl, status, last_seq
91
-
from hosts` + whereClause,
92
-
args...,
76
+
from hosts
77
+
where status = $1`,
78
+
status,
93
79
)
94
80
if err != nil {
95
81
return nil, fmt.Errorf("querying hosts: %w", err)
+24
-17
knotmirror/db/repos.go
+24
-17
knotmirror/db/repos.go
···
9
9
"github.com/bluesky-social/indigo/atproto/syntax"
10
10
"tangled.org/core/appview/pagination"
11
11
"tangled.org/core/knotmirror/models"
12
-
"tangled.org/core/orm"
13
12
)
14
13
15
14
func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error {
16
15
if _, err := e.ExecContext(ctx,
17
16
`insert into repos (did, rkey, cid, name, knot_domain)
18
-
values (?, ?, ?, ?, ?)`,
17
+
values ($1, $2, $3, $4, $5)`,
19
18
did, rkey, cid, name, knot,
20
19
); err != nil {
21
20
return fmt.Errorf("inserting repo: %w", err)
···
26
25
func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error {
27
26
if _, err := e.ExecContext(ctx,
28
27
`insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after)
29
-
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
28
+
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
30
29
on conflict(did, rkey) do update set
31
30
cid = excluded.cid,
32
31
name = excluded.name,
···
58
57
func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error {
59
58
if _, err := e.ExecContext(ctx,
60
59
`update repos
61
-
set state = ?
62
-
where did = ? and rkey = ?`,
60
+
set state = $1
61
+
where did = $2 and rkey = $3`,
63
62
state,
64
63
did, rkey,
65
64
); err != nil {
···
70
69
71
70
func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error {
72
71
if _, err := e.ExecContext(ctx,
73
-
`delete from repos where did = ? and rkey = ?`,
72
+
`delete from repos where did = $1 and rkey = $2`,
74
73
did,
75
74
rkey,
76
75
); err != nil {
···
95
94
retry_count,
96
95
retry_after
97
96
from repos
98
-
where did = ? and name = ?`,
97
+
where did = $1 and name = $2`,
99
98
did,
100
99
name,
101
100
).Scan(
···
135
134
retry_count,
136
135
retry_after
137
136
from repos
138
-
where at_uri = ?`,
137
+
where at_uri = $1`,
139
138
aturi,
140
139
).Scan(
141
140
&repo.Did,
···
158
157
return &repo, nil
159
158
}
160
159
161
-
func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) {
160
+
func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) {
162
161
var conditions []string
163
162
var args []any
164
163
165
-
for _, filter := range filters {
166
-
conditions = append(conditions, filter.Condition())
167
-
args = append(args, filter.Arg()...)
164
+
pageClause := ""
165
+
if page.Limit > 0 {
166
+
pageClause = " limit $1 offset $2 "
167
+
args = append(args, page.Limit, page.Offset)
168
168
}
169
169
170
170
whereClause := ""
171
+
if did != "" {
172
+
conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1))
173
+
args = append(args, did)
174
+
}
175
+
if knot != "" {
176
+
conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1))
177
+
args = append(args, knot)
178
+
}
179
+
if state != "" {
180
+
conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1))
181
+
args = append(args, state)
182
+
}
171
183
if len(conditions) > 0 {
172
184
whereClause = "WHERE " + conditions[0]
173
185
for _, condition := range conditions[1:] {
174
186
whereClause += " AND " + condition
175
187
}
176
188
}
177
-
pageClause := ""
178
-
if page.Limit > 0 {
179
-
pageClause = " limit ? offset ? "
180
-
args = append(args, page.Limit, page.Offset)
181
-
}
182
189
183
190
query := `
184
191
select
+2
-2
knotmirror/knotmirror.go
+2
-2
knotmirror/knotmirror.go
···
28
28
29
29
logger.Debug("config loaded:", "config", cfg)
30
30
31
-
db, err := db.Make(ctx, cfg.DbPath)
31
+
db, err := db.Make(ctx, cfg.DbUrl, 32)
32
32
if err != nil {
33
33
return fmt.Errorf("initializing db: %w", err)
34
34
}
35
35
36
36
res, err := db.ExecContext(ctx,
37
-
`update repos set state = ? where state = ?`,
37
+
`update repos set state = $1 where state = $2`,
38
38
models.RepoStateDesynchronized,
39
39
models.RepoStateResyncing,
40
40
)
+1
-2
knotmirror/knotstream/knotstream.go
+1
-2
knotmirror/knotstream/knotstream.go
···
11
11
"tangled.org/core/knotmirror/db"
12
12
"tangled.org/core/knotmirror/models"
13
13
"tangled.org/core/log"
14
-
"tangled.org/core/orm"
15
14
)
16
15
17
16
type KnotStream struct {
···
71
70
}
72
71
73
72
func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error {
74
-
hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active"))
73
+
hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive)
75
74
if err != nil {
76
75
return fmt.Errorf("listing hosts: %w", err)
77
76
}
+8
-4
knotmirror/resyncer.go
+8
-4
knotmirror/resyncer.go
···
10
10
"net/url"
11
11
"os"
12
12
"path"
13
+
"strings"
13
14
"sync"
14
15
"time"
15
16
···
85
86
now := time.Now().Unix()
86
87
if err := r.db.QueryRowContext(ctx,
87
88
`update repos
88
-
set state = ?
89
+
set state = $1
89
90
where at_uri = (
90
91
select at_uri from repos
91
-
where state in (?, ?, ?)
92
-
and (retry_after = 0 or retry_after < ?)
92
+
where state in ($2, $3, $4)
93
+
and (retry_after = 0 or retry_after < $5)
93
94
limit 1
94
95
)
95
96
returning at_uri
···
216
217
retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
217
218
}
218
219
220
+
// remove null bytes
221
+
errMsg = strings.ReplaceAll(errMsg, "\x00", "")
222
+
219
223
repo.State = state
220
224
repo.ErrorMsg = errMsg
221
225
repo.RetryCount = retryCount
222
226
repo.RetryAfter = retryAfter
223
227
if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
224
-
return dbErr
228
+
return fmt.Errorf("failed to update repo state: %w", err)
225
229
}
226
230
return err
227
231
}
+12
nix/gomod2nix.toml
+12
nix/gomod2nix.toml
···
398
398
[mod."github.com/ipfs/go-metrics-interface"]
399
399
version = "v0.3.0"
400
400
hash = "sha256-b3tp3jxecLmJEGx2kW7MiKGlAKPEWg/LJ7hXylSC8jQ="
401
+
[mod."github.com/jackc/pgpassfile"]
402
+
version = "v1.0.0"
403
+
hash = "sha256-H0nFbC34/3pZUFnuiQk9W7yvAMh6qJDrqvHp+akBPLM="
404
+
[mod."github.com/jackc/pgservicefile"]
405
+
version = "v0.0.0-20240606120523-5a60cdf6a761"
406
+
hash = "sha256-ETpGsLAA2wcm5xJBayr/mZrCE1YsWbnkbSSX3ptrFn0="
407
+
[mod."github.com/jackc/pgx/v5"]
408
+
version = "v5.8.0"
409
+
hash = "sha256-Mq5/A/Obcceu6kKxUv30DPC2ZaVvD8Iq/YtmLm1BVec="
410
+
[mod."github.com/jackc/puddle/v2"]
411
+
version = "v2.2.2"
412
+
hash = "sha256-IUxdu4JYfsCh/qlz2SiUWu7EVPHhyooiVA4oaS2Z6yk="
401
413
[mod."github.com/json-iterator/go"]
402
414
version = "v1.1.12"
403
415
hash = "sha256-To8A0h+lbfZ/6zM+2PpRpY3+L6725OPC66lffq6fUoM="
-5
nix/pkgs/knot-mirror.nix
-5
nix/pkgs/knot-mirror.nix
···
1
1
{
2
2
buildGoApplication,
3
3
modules,
4
-
sqlite-lib,
5
4
src,
6
5
}:
7
6
buildGoApplication {
···
12
11
doCheck = false;
13
12
14
13
subPackages = ["cmd/knotmirror"];
15
-
tags = ["libsqlite3"];
16
14
17
-
env.CGO_CFLAGS = "-I ${sqlite-lib}/include ";
18
-
env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib";
19
-
CGO_ENABLED = 1;
20
15
meta = {
21
16
mainProgram = "knotmirror";
22
17
};
History
2 rounds
3 comments
boltless.me
submitted
#1
1 commit
expand
collapse
knotmirror: switch to postgres for concurrent writes
Signed-off-by: Seongmin Lee <git@boltless.me>
expand 3 comments
Ah, that's my mistake. I was going make error message more verbose there. will fix.
nix/pkgs/knot-mirror.nix:4: super minor nit: can be absorbed to older commit.
This pull has been deleted (possibly by jj abandon or jj squash)
boltless.me
submitted
#0
1 commit
expand
collapse
knotmirror: switch to postgres for concurrent writes
Signed-off-by: Seongmin Lee <git@boltless.me>
knotmirror/resyncer.go:228why change from dbErr to err here?