+1
go.mod
+1
go.mod
···
131
131
github.com/hashicorp/go-secure-stdlib/parseutil v0.2.0 // indirect
132
132
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
133
133
github.com/hashicorp/go-sockaddr v1.0.7 // indirect
134
+
github.com/hashicorp/go-version v1.8.0 // indirect
134
135
github.com/hashicorp/golang-lru v1.0.2 // indirect
135
136
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
136
137
github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
+2
go.sum
+2
go.sum
···
264
264
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
265
265
github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw=
266
266
github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw=
267
+
github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4=
268
+
github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
267
269
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
268
270
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
269
271
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+3
nix/gomod2nix.toml
+3
nix/gomod2nix.toml
···
304
304
[mod."github.com/hashicorp/go-sockaddr"]
305
305
version = "v1.0.7"
306
306
hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs="
307
+
[mod."github.com/hashicorp/go-version"]
308
+
version = "v1.8.0"
309
+
hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8="
307
310
[mod."github.com/hashicorp/golang-lru"]
308
311
version = "v1.0.2"
309
312
hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
+4
nix/modules/spindle.nix
+4
nix/modules/spindle.nix
···
1
1
{
2
2
config,
3
+
pkgs,
3
4
lib,
4
5
...
5
6
}: let
···
149
150
description = "spindle service";
150
151
after = ["network.target" "docker.service"];
151
152
wantedBy = ["multi-user.target"];
153
+
path = [
154
+
pkgs.git
155
+
];
152
156
serviceConfig = {
153
157
LogsDirectory = "spindle";
154
158
StateDirectory = "spindle";
+10
orm/orm.go
+10
orm/orm.go
···
20
20
}
21
21
defer tx.Rollback()
22
22
23
+
_, err = tx.Exec(`
24
+
create table if not exists migrations (
25
+
id integer primary key autoincrement,
26
+
name text unique
27
+
);
28
+
`)
29
+
if err != nil {
30
+
return fmt.Errorf("creating migrations table: %w", err)
31
+
}
32
+
23
33
var exists bool
24
34
err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists)
25
35
if err != nil {
+7
spindle/config/config.go
+7
spindle/config/config.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"path"
6
7
7
8
"github.com/bluesky-social/indigo/atproto/syntax"
8
9
"github.com/sethvargo/go-envconfig"
···
13
14
DBPath string `env:"DB_PATH, default=spindle.db"`
14
15
Hostname string `env:"HOSTNAME, required"`
15
16
JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"`
17
+
TapUrl string `env:"TAP_URL, required"`
16
18
PlcUrl string `env:"PLC_URL, default=https://plc.directory"`
17
19
Dev bool `env:"DEV, default=false"`
18
20
Owner syntax.DID `env:"OWNER, required"`
19
21
Secrets Secrets `env:",prefix=SECRETS_"`
20
22
LogDir string `env:"LOG_DIR, default=/var/log/spindle"`
23
+
DataDir string `env:"DATA_DIR, default=/var/lib/spindle"`
21
24
QueueSize int `env:"QUEUE_SIZE, default=100"`
22
25
MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time
23
26
}
24
27
25
28
func (s Server) Did() syntax.DID {
26
29
return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname))
30
+
}
31
+
32
+
func (s Server) RepoDir() string {
33
+
return path.Join(s.DataDir, "repos")
27
34
}
28
35
29
36
type Secrets struct {
+59
-18
spindle/db/db.go
+59
-18
spindle/db/db.go
···
1
1
package db
2
2
3
3
import (
4
+
"context"
4
5
"database/sql"
5
6
"strings"
6
7
8
+
"github.com/bluesky-social/indigo/atproto/syntax"
7
9
_ "github.com/mattn/go-sqlite3"
10
+
"tangled.org/core/log"
11
+
"tangled.org/core/orm"
8
12
)
9
13
10
14
// DB embeds a *sql.DB so the query helpers in this package can be declared
// as methods while the standard database/sql API stays directly available.
type DB struct {
11
15
*sql.DB
12
16
}
13
17
14
-
func Make(dbPath string) (*DB, error) {
18
+
func Make(ctx context.Context, dbPath string) (*DB, error) {
15
19
// https://github.com/mattn/go-sqlite3#connection-string
16
20
opts := []string{
17
21
"_foreign_keys=1",
···
19
23
"_synchronous=NORMAL",
20
24
"_auto_vacuum=incremental",
21
25
}
26
+
27
+
logger := log.FromContext(ctx)
28
+
logger = log.SubLogger(logger, "db")
22
29
23
30
db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
24
31
if err != nil {
25
32
return nil, err
26
33
}
27
34
28
-
// NOTE: If any other migration is added here, you MUST
29
-
// copy the pattern in appview: use a single sql.Conn
30
-
// for every migration.
35
+
conn, err := db.Conn(ctx)
36
+
if err != nil {
37
+
return nil, err
38
+
}
39
+
defer conn.Close()
31
40
32
41
_, err = db.Exec(`
33
42
create table if not exists _jetstream (
···
76
85
return nil, err
77
86
}
78
87
79
-
return &DB{db}, nil
80
-
}
88
+
// run migrations
89
+
90
+
// NOTE: this won't migrate existing records
91
+
// they will be fetched again with tap instead
92
+
orm.RunMigration(conn, logger, "add-rkey-to-repos", func(tx *sql.Tx) error {
93
+
// archive legacy repos (just in case)
94
+
_, err = tx.Exec(`alter table repos rename to repos_old`)
95
+
if err != nil {
96
+
return err
97
+
}
98
+
99
+
_, err := tx.Exec(`
100
+
create table repos_new (
101
+
-- identifiers
102
+
id integer primary key autoincrement,
103
+
did text not null,
104
+
rkey text not null,
105
+
at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored,
106
+
107
+
name text not null,
108
+
knot text not null,
109
+
110
+
addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
111
+
unique(did, rkey)
112
+
);
113
+
`)
114
+
if err != nil {
115
+
return err
116
+
}
117
+
118
+
return nil
119
+
})
81
120
82
-
func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
83
-
_, err := d.Exec(`
84
-
insert into _jetstream (id, last_time_us)
85
-
values (1, ?)
86
-
on conflict(id) do update set last_time_us = excluded.last_time_us
87
-
`, lastTimeUs)
88
-
return err
121
+
return &DB{db}, nil
89
122
}
90
123
91
-
func (d *DB) GetLastTimeUs() (int64, error) {
92
-
var lastTimeUs int64
93
-
row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`)
94
-
err := row.Scan(&lastTimeUs)
95
-
return lastTimeUs, err
124
+
func (d *DB) IsKnownDid(did syntax.DID) (bool, error) {
125
+
// is spindle member / repo collaborator
126
+
var exists bool
127
+
err := d.QueryRow(
128
+
`select exists (
129
+
select 1 from repo_collaborators where did = ?
130
+
union all
131
+
select 1 from spindle_members where did = ?
132
+
)`,
133
+
did,
134
+
did,
135
+
).Scan(&exists)
136
+
return exists, err
96
137
}
-44
spindle/db/known_dids.go
-44
spindle/db/known_dids.go
···
1
-
package db
2
-
3
-
func (d *DB) AddDid(did string) error {
4
-
_, err := d.Exec(`insert or ignore into known_dids (did) values (?)`, did)
5
-
return err
6
-
}
7
-
8
-
func (d *DB) RemoveDid(did string) error {
9
-
_, err := d.Exec(`delete from known_dids where did = ?`, did)
10
-
return err
11
-
}
12
-
13
-
func (d *DB) GetAllDids() ([]string, error) {
14
-
var dids []string
15
-
16
-
rows, err := d.Query(`select did from known_dids`)
17
-
if err != nil {
18
-
return nil, err
19
-
}
20
-
defer rows.Close()
21
-
22
-
for rows.Next() {
23
-
var did string
24
-
if err := rows.Scan(&did); err != nil {
25
-
return nil, err
26
-
}
27
-
dids = append(dids, did)
28
-
}
29
-
30
-
if err := rows.Err(); err != nil {
31
-
return nil, err
32
-
}
33
-
34
-
return dids, nil
35
-
}
36
-
37
-
func (d *DB) HasKnownDids() bool {
38
-
var count int
39
-
err := d.QueryRow(`select count(*) from known_dids`).Scan(&count)
40
-
if err != nil {
41
-
return false
42
-
}
43
-
return count > 0
44
-
}
+120
-11
spindle/db/repos.go
+120
-11
spindle/db/repos.go
···
1
1
package db
2
2
3
+
import "github.com/bluesky-social/indigo/atproto/syntax"
4
+
3
5
type Repo struct {
4
-
Knot string
5
-
Owner string
6
-
Name string
6
+
Did syntax.DID
7
+
Rkey syntax.RecordKey
8
+
Name string
9
+
Knot string
10
+
}
11
+
12
+
type RepoCollaborator struct {
13
+
Did syntax.DID
14
+
Rkey syntax.RecordKey
15
+
Repo syntax.ATURI
16
+
Subject syntax.DID
7
17
}
8
18
9
-
func (d *DB) AddRepo(knot, owner, name string) error {
10
-
_, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name)
19
+
func (d *DB) PutRepo(repo *Repo) error {
20
+
_, err := d.Exec(
21
+
`insert or ignore into repos (did, rkey, name, knot)
22
+
values (?, ?, ?, ?)
23
+
on conflict(did, rkey) do update set
24
+
name = excluded.name
25
+
knot = excluded.knot`,
26
+
repo.Did,
27
+
repo.Rkey,
28
+
repo.Name,
29
+
repo.Knot,
30
+
)
31
+
return err
32
+
}
33
+
34
+
func (d *DB) DeleteRepo(did syntax.DID, rkey syntax.RecordKey) error {
35
+
_, err := d.Exec(
36
+
`delete from repos where did = ? and rkey = ?`,
37
+
did,
38
+
rkey,
39
+
)
11
40
return err
12
41
}
13
42
···
34
63
return knots, nil
35
64
}
36
65
37
-
func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) {
66
+
func (d *DB) GetRepo(did syntax.DID, rkey syntax.RecordKey) (*Repo, error) {
38
67
var repo Repo
68
+
err := d.DB.QueryRow(
69
+
`select
70
+
did,
71
+
rkey,
72
+
name,
73
+
knot
74
+
from repos where did = ? and rkey = ?`,
75
+
did,
76
+
rkey,
77
+
).Scan(
78
+
&repo.Did,
79
+
&repo.Rkey,
80
+
&repo.Name,
81
+
&repo.Knot,
82
+
)
83
+
if err != nil {
84
+
return nil, err
85
+
}
86
+
return &repo, nil
87
+
}
39
88
40
-
query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?"
41
-
err := d.DB.QueryRow(query, knot, owner, name).
42
-
Scan(&repo.Knot, &repo.Owner, &repo.Name)
89
+
func (d *DB) GetRepoWithName(did syntax.DID, name string) (*Repo, error) {
90
+
var repo Repo
91
+
err := d.DB.QueryRow(
92
+
`select
93
+
did,
94
+
rkey,
95
+
name,
96
+
knot
97
+
from repos where did = ? and name = ?`,
98
+
did,
99
+
name,
100
+
).Scan(
101
+
&repo.Did,
102
+
&repo.Rkey,
103
+
&repo.Name,
104
+
&repo.Knot,
105
+
)
106
+
if err != nil {
107
+
return nil, err
108
+
}
109
+
return &repo, nil
110
+
}
111
+
112
+
func (d *DB) PutRepoCollaborator(collaborator *RepoCollaborator) error {
113
+
_, err := d.Exec(
114
+
`insert into repo_collaborators (did, rkey, repo, subject)
115
+
values (?, ?, ?, ?)
116
+
on conflict(did, rkey) do update set
117
+
repo = excluded.repo
118
+
subject = excluded.subject`,
119
+
collaborator.Did,
120
+
collaborator.Rkey,
121
+
collaborator.Repo,
122
+
collaborator.Subject,
123
+
)
124
+
return err
125
+
}
126
+
127
+
func (d *DB) RemoveRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) error {
128
+
_, err := d.Exec(
129
+
`delete from repo_collaborators where did = ? and rkey = ?`,
130
+
did,
131
+
rkey,
132
+
)
133
+
return err
134
+
}
43
135
136
+
func (d *DB) GetRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) {
137
+
var collaborator RepoCollaborator
138
+
err := d.DB.QueryRow(
139
+
`select
140
+
did,
141
+
rkey,
142
+
repo,
143
+
subject
144
+
from repo_collaborators
145
+
where did = ? and rkey = ?`,
146
+
did,
147
+
rkey,
148
+
).Scan(
149
+
&collaborator.Did,
150
+
&collaborator.Rkey,
151
+
&collaborator.Repo,
152
+
&collaborator.Subject,
153
+
)
44
154
if err != nil {
45
155
return nil, err
46
156
}
47
-
48
-
return &repo, nil
157
+
return &collaborator, nil
49
158
}
-276
spindle/ingester.go
-276
spindle/ingester.go
···
1
-
package spindle
2
-
3
-
import (
4
-
"context"
5
-
"encoding/json"
6
-
"errors"
7
-
"fmt"
8
-
"time"
9
-
10
-
"tangled.org/core/api/tangled"
11
-
"tangled.org/core/eventconsumer"
12
-
"tangled.org/core/spindle/db"
13
-
14
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
15
-
"github.com/bluesky-social/indigo/atproto/syntax"
16
-
"github.com/bluesky-social/indigo/xrpc"
17
-
"github.com/bluesky-social/jetstream/pkg/models"
18
-
)
19
-
20
-
type Ingester func(ctx context.Context, e *models.Event) error
21
-
22
-
func (s *Spindle) ingest() Ingester {
23
-
return func(ctx context.Context, e *models.Event) error {
24
-
var err error
25
-
defer func() {
26
-
eventTime := e.TimeUS
27
-
lastTimeUs := eventTime + 1
28
-
if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
29
-
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
30
-
}
31
-
}()
32
-
33
-
if e.Kind != models.EventKindCommit {
34
-
return nil
35
-
}
36
-
37
-
switch e.Commit.Collection {
38
-
case tangled.SpindleMemberNSID:
39
-
err = s.ingestMember(ctx, e)
40
-
case tangled.RepoNSID:
41
-
err = s.ingestRepo(ctx, e)
42
-
case tangled.RepoCollaboratorNSID:
43
-
err = s.ingestCollaborator(ctx, e)
44
-
}
45
-
46
-
if err != nil {
47
-
s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
48
-
}
49
-
50
-
return nil
51
-
}
52
-
}
53
-
54
-
func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
55
-
var err error
56
-
did := e.Did
57
-
rkey := e.Commit.RKey
58
-
59
-
l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
60
-
61
-
switch e.Commit.Operation {
62
-
case models.CommitOperationCreate, models.CommitOperationUpdate:
63
-
raw := e.Commit.Record
64
-
record := tangled.SpindleMember{}
65
-
err = json.Unmarshal(raw, &record)
66
-
if err != nil {
67
-
l.Error("invalid record", "error", err)
68
-
return err
69
-
}
70
-
71
-
domain := s.cfg.Server.Hostname
72
-
recordInstance := record.Instance
73
-
74
-
if recordInstance != domain {
75
-
l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
76
-
return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
77
-
}
78
-
79
-
ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did())
80
-
if err != nil || !ok {
81
-
l.Error("failed to add member", "did", did, "error", err)
82
-
return fmt.Errorf("failed to enforce permissions: %w", err)
83
-
}
84
-
85
-
if err := db.AddSpindleMember(s.db, db.SpindleMember{
86
-
Did: syntax.DID(did),
87
-
Rkey: rkey,
88
-
Instance: recordInstance,
89
-
Subject: syntax.DID(record.Subject),
90
-
Created: time.Now(),
91
-
}); err != nil {
92
-
l.Error("failed to add member", "error", err)
93
-
return fmt.Errorf("failed to add member: %w", err)
94
-
}
95
-
96
-
if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
97
-
l.Error("failed to add member", "error", err)
98
-
return fmt.Errorf("failed to add member: %w", err)
99
-
}
100
-
l.Info("added member from firehose", "member", record.Subject)
101
-
102
-
if err := s.db.AddDid(record.Subject); err != nil {
103
-
l.Error("failed to add did", "error", err)
104
-
return fmt.Errorf("failed to add did: %w", err)
105
-
}
106
-
s.jc.AddDid(record.Subject)
107
-
108
-
return nil
109
-
110
-
case models.CommitOperationDelete:
111
-
record, err := db.GetSpindleMember(s.db, did, rkey)
112
-
if err != nil {
113
-
l.Error("failed to find member", "error", err)
114
-
return fmt.Errorf("failed to find member: %w", err)
115
-
}
116
-
117
-
if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
118
-
l.Error("failed to remove member", "error", err)
119
-
return fmt.Errorf("failed to remove member: %w", err)
120
-
}
121
-
122
-
if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil {
123
-
l.Error("failed to add member", "error", err)
124
-
return fmt.Errorf("failed to add member: %w", err)
125
-
}
126
-
l.Info("added member from firehose", "member", record.Subject)
127
-
128
-
if err := s.db.RemoveDid(record.Subject.String()); err != nil {
129
-
l.Error("failed to add did", "error", err)
130
-
return fmt.Errorf("failed to add did: %w", err)
131
-
}
132
-
s.jc.RemoveDid(record.Subject.String())
133
-
134
-
}
135
-
return nil
136
-
}
137
-
138
-
func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
139
-
var err error
140
-
did := e.Did
141
-
142
-
l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
143
-
144
-
l.Info("ingesting repo record", "did", did)
145
-
146
-
switch e.Commit.Operation {
147
-
case models.CommitOperationCreate, models.CommitOperationUpdate:
148
-
raw := e.Commit.Record
149
-
record := tangled.Repo{}
150
-
err = json.Unmarshal(raw, &record)
151
-
if err != nil {
152
-
l.Error("invalid record", "error", err)
153
-
return err
154
-
}
155
-
156
-
domain := s.cfg.Server.Hostname
157
-
158
-
// no spindle configured for this repo
159
-
if record.Spindle == nil {
160
-
l.Info("no spindle configured", "name", record.Name)
161
-
return nil
162
-
}
163
-
164
-
// this repo did not want this spindle
165
-
if *record.Spindle != domain {
166
-
l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
167
-
return nil
168
-
}
169
-
170
-
// add this repo to the watch list
171
-
if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil {
172
-
l.Error("failed to add repo", "error", err)
173
-
return fmt.Errorf("failed to add repo: %w", err)
174
-
}
175
-
176
-
repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey))
177
-
178
-
// add repo to rbac
179
-
if err := s.e.AddRepo(repoAt); err != nil {
180
-
l.Error("failed to add repo to enforcer", "error", err)
181
-
return fmt.Errorf("failed to add repo: %w", err)
182
-
}
183
-
184
-
// add collaborators to rbac
185
-
if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil {
186
-
return err
187
-
}
188
-
189
-
// add this knot to the event consumer
190
-
src := eventconsumer.NewKnotSource(record.Knot)
191
-
s.ks.AddSource(context.Background(), src)
192
-
193
-
return nil
194
-
195
-
}
196
-
return nil
197
-
}
198
-
199
-
func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
200
-
var err error
201
-
202
-
l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
203
-
204
-
l.Info("ingesting collaborator record")
205
-
206
-
switch e.Commit.Operation {
207
-
case models.CommitOperationCreate, models.CommitOperationUpdate:
208
-
raw := e.Commit.Record
209
-
record := tangled.RepoCollaborator{}
210
-
err = json.Unmarshal(raw, &record)
211
-
if err != nil {
212
-
l.Error("invalid record", "error", err)
213
-
return err
214
-
}
215
-
216
-
subjectId, err := s.res.ResolveIdent(ctx, record.Subject)
217
-
if err != nil || subjectId.Handle.IsInvalidHandle() {
218
-
return err
219
-
}
220
-
221
-
repoAt, err := syntax.ParseATURI(record.Repo)
222
-
if err != nil {
223
-
l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
224
-
return nil
225
-
}
226
-
227
-
// check perms for this user
228
-
if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil {
229
-
return fmt.Errorf("insufficient permissions: %w", err)
230
-
}
231
-
232
-
// add collaborator to rbac
233
-
if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil {
234
-
l.Error("failed to add repo to enforcer", "error", err)
235
-
return fmt.Errorf("failed to add repo: %w", err)
236
-
}
237
-
238
-
return nil
239
-
}
240
-
return nil
241
-
}
242
-
243
-
func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error {
244
-
l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
245
-
246
-
l.Info("fetching and adding existing collaborators")
247
-
248
-
ident, err := s.res.ResolveIdent(ctx, repo.Authority().String())
249
-
if err != nil || ident.Handle.IsInvalidHandle() {
250
-
return fmt.Errorf("failed to resolve handle: %w", err)
251
-
}
252
-
253
-
xrpcc := xrpc.Client{
254
-
Host: ident.PDSEndpoint(),
255
-
}
256
-
257
-
resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false)
258
-
if err != nil {
259
-
return err
260
-
}
261
-
262
-
var errs error
263
-
for _, r := range resp.Records {
264
-
if r == nil {
265
-
continue
266
-
}
267
-
record := r.Value.Val.(*tangled.RepoCollaborator)
268
-
269
-
if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
270
-
l.Error("failed to add repo to enforcer", "error", err)
271
-
errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
272
-
}
273
-
}
274
-
275
-
return errs
276
-
}
+134
-31
spindle/server.go
+134
-31
spindle/server.go
···
1
1
package spindle
2
2
3
3
import (
4
+
"bytes"
4
5
"context"
5
6
_ "embed"
6
7
"encoding/json"
···
8
9
"log/slog"
9
10
"maps"
10
11
"net/http"
12
+
"os"
13
+
"os/exec"
14
+
"path"
15
+
"strings"
11
16
17
+
"github.com/bluesky-social/indigo/atproto/syntax"
12
18
"github.com/go-chi/chi/v5"
19
+
"github.com/hashicorp/go-version"
13
20
"tangled.org/core/api/tangled"
14
21
"tangled.org/core/eventconsumer"
15
22
"tangled.org/core/eventconsumer/cursor"
16
23
"tangled.org/core/idresolver"
17
-
"tangled.org/core/jetstream"
18
24
"tangled.org/core/log"
19
25
"tangled.org/core/notifier"
20
26
"tangled.org/core/rbac2"
···
26
32
"tangled.org/core/spindle/queue"
27
33
"tangled.org/core/spindle/secrets"
28
34
"tangled.org/core/spindle/xrpc"
35
+
"tangled.org/core/tap"
29
36
"tangled.org/core/xrpc/serviceauth"
30
37
)
31
38
···
33
40
var motd []byte
34
41
35
42
type Spindle struct {
36
-
jc *jetstream.JetstreamClient
43
+
tap *tap.Client
37
44
db *db.DB
38
45
e *rbac2.Enforcer
39
46
l *slog.Logger
···
50
57
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
51
58
logger := log.FromContext(ctx)
52
59
53
-
d, err := db.Make(cfg.Server.DBPath)
60
+
if err := ensureGitVersion(); err != nil {
61
+
return nil, fmt.Errorf("ensuring git version: %w", err)
62
+
}
63
+
64
+
d, err := db.Make(ctx, cfg.Server.DBPath)
54
65
if err != nil {
55
66
return nil, fmt.Errorf("failed to setup db: %w", err)
56
67
}
···
90
101
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
91
102
logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
92
103
93
-
collections := []string{
94
-
tangled.SpindleMemberNSID,
95
-
tangled.RepoNSID,
96
-
tangled.RepoCollaboratorNSID,
97
-
}
98
-
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
99
-
if err != nil {
100
-
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
101
-
}
102
-
jc.AddDid(cfg.Server.Owner.String())
103
-
104
-
// Check if the spindle knows about any Dids;
105
-
dids, err := d.GetAllDids()
106
-
if err != nil {
107
-
return nil, fmt.Errorf("failed to get all dids: %w", err)
108
-
}
109
-
for _, d := range dids {
110
-
jc.AddDid(d)
111
-
}
104
+
tap := tap.NewClient(cfg.Server.TapUrl, "")
112
105
113
106
resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
114
107
115
108
spindle := &Spindle{
116
-
jc: jc,
109
+
tap: &tap,
117
110
e: e,
118
111
db: d,
119
112
l: logger,
···
136
129
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
137
130
}
138
131
139
-
err = jc.StartJetstream(ctx, spindle.ingest())
140
-
if err != nil {
141
-
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
142
-
}
143
-
144
132
// for each incoming sh.tangled.pipeline, we execute
145
133
// spindle.processPipeline, which in turn enqueues the pipeline
146
134
// job in the above registered queue.
···
208
196
s.ks.Start(ctx)
209
197
}()
210
198
199
+
go func() {
200
+
s.l.Info("starting tap stream consumer")
201
+
s.tap.Connect(ctx, &tap.SimpleIndexer{
202
+
EventHandler: s.processEvent,
203
+
})
204
+
}()
205
+
211
206
s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
212
207
return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
213
208
}
···
267
262
}
268
263
269
264
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
265
+
l := log.FromContext(ctx).With("handler", "processKnotStream")
266
+
l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
270
267
if msg.Nsid == tangled.PipelineNSID {
268
+
return nil
271
269
tpl := tangled.Pipeline{}
272
270
err := json.Unmarshal(msg.EventJson, &tpl)
273
271
if err != nil {
···
288
286
}
289
287
290
288
// filter by repos
291
-
_, err = s.db.GetRepo(
292
-
tpl.TriggerMetadata.Repo.Knot,
293
-
tpl.TriggerMetadata.Repo.Did,
289
+
_, err = s.db.GetRepoWithName(
290
+
syntax.DID(tpl.TriggerMetadata.Repo.Did),
294
291
tpl.TriggerMetadata.Repo.Repo,
295
292
)
296
293
if err != nil {
···
369
366
} else {
370
367
s.l.Error("failed to enqueue pipeline: queue is full")
371
368
}
369
+
} else if msg.Nsid == tangled.GitRefUpdateNSID {
370
+
event := tangled.GitRefUpdate{}
371
+
if err := json.Unmarshal(msg.EventJson, &event); err != nil {
372
+
l.Error("error unmarshalling", "err", err)
373
+
return err
374
+
}
375
+
l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
376
+
377
+
// use event.RepoAt
378
+
// sync git repos in {data}/repos/{did}/sh.tangled.repo/{rkey}
379
+
// if it's nil, don't run pipeline. knot needs upgrade
380
+
// we will leave sh.tangled.pipeline.trigger for backward compatibility
381
+
382
+
// NOTE: we are blindly trusting the knot to return only repos it owns
383
+
repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
384
+
repoPath := s.newRepoPath(event.RepoDid, event.RepoName)
385
+
err := sparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha)
386
+
if err != nil {
387
+
l.Error("failed to sync git repo", "err", err)
388
+
return fmt.Errorf("sync git repo: %w", err)
389
+
}
390
+
l.Info("synced git repo")
391
+
392
+
// TODO: plan the pipeline
372
393
}
373
394
374
395
return nil
375
396
}
397
+
398
+
func (s *Spindle) newRepoPath(did, name string) string {
399
+
return path.Join(s.cfg.Server.RepoDir(), did, name)
400
+
}
401
+
402
+
func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
403
+
scheme := "https://"
404
+
if s.cfg.Server.Dev {
405
+
scheme = "http://"
406
+
}
407
+
return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
408
+
}
409
+
410
+
const RequiredVersion = "2.49.0"
411
+
412
+
func ensureGitVersion() error {
413
+
v, err := gitVersion()
414
+
if err != nil {
415
+
return fmt.Errorf("fetching git version: %w", err)
416
+
}
417
+
if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
418
+
return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
419
+
}
420
+
return nil
421
+
}
422
+
423
+
// TODO: move to "git" module shared between knot, appview & spindle
424
+
func gitVersion() (*version.Version, error) {
425
+
var buf bytes.Buffer
426
+
cmd := exec.Command("git", "version")
427
+
cmd.Stdout = &buf
428
+
cmd.Stderr = os.Stderr
429
+
err := cmd.Run()
430
+
if err != nil {
431
+
return nil, err
432
+
}
433
+
fields := strings.Fields(buf.String())
434
+
if len(fields) < 3 {
435
+
return nil, fmt.Errorf("invalid git version: %s", buf)
436
+
}
437
+
438
+
// version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1"
439
+
versionString := fields[2]
440
+
if pos := strings.Index(versionString, "windows"); pos >= 1 {
441
+
versionString = versionString[:pos-1]
442
+
}
443
+
return version.NewVersion(versionString)
444
+
}
445
+
446
+
func sparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error {
447
+
exist, err := isDir(path)
448
+
if err != nil {
449
+
return err
450
+
}
451
+
if !exist {
452
+
if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil {
453
+
return fmt.Errorf("git clone: %w", err)
454
+
}
455
+
if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", `'/.tangled/workflows'`).Run(); err != nil {
456
+
return fmt.Errorf("git sparse-checkout set: %w", err)
457
+
}
458
+
if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil {
459
+
return fmt.Errorf("git checkout: %w", err)
460
+
}
461
+
} else {
462
+
if err := exec.Command("git", "-C", path, "pull", "origin", rev).Run(); err != nil {
463
+
return fmt.Errorf("git pull: %w", err)
464
+
}
465
+
}
466
+
return nil
467
+
}
468
+
469
+
// isDir reports whether path exists and is a directory.
//
// A missing path, or a path that exists but is not a directory, yields
// (false, nil). Any other os.Stat failure is returned as the error.
func isDir(path string) (bool, error) {
	info, err := os.Stat(path)
	switch {
	case err == nil:
		return info.IsDir(), nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}
+281
spindle/tap.go
+281
spindle/tap.go
···
1
+
package spindle
2
+
3
+
import (
4
+
"context"
5
+
"encoding/json"
6
+
"fmt"
7
+
"time"
8
+
9
+
"github.com/bluesky-social/indigo/atproto/syntax"
10
+
"tangled.org/core/api/tangled"
11
+
"tangled.org/core/eventconsumer"
12
+
"tangled.org/core/spindle/db"
13
+
"tangled.org/core/tap"
14
+
)
15
+
16
+
func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error {
17
+
l := s.l.With("component", "tapIndexer")
18
+
19
+
var err error
20
+
switch evt.Type {
21
+
case tap.EvtRecord:
22
+
switch evt.Record.Collection.String() {
23
+
case tangled.SpindleMemberNSID:
24
+
err = s.processMember(ctx, evt)
25
+
case tangled.RepoNSID:
26
+
err = s.processRepo(ctx, evt)
27
+
case tangled.RepoCollaboratorNSID:
28
+
err = s.processCollaborator(ctx, evt)
29
+
case tangled.RepoPullNSID:
30
+
err = s.processPull(ctx, evt)
31
+
}
32
+
case tap.EvtIdentity:
33
+
// no-op
34
+
}
35
+
36
+
if err != nil {
37
+
l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
38
+
return err
39
+
}
40
+
return nil
41
+
}
42
+
43
+
// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated)
44
+
45
+
func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error {
46
+
l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
47
+
48
+
l.Info("processing spindle.member record")
49
+
50
+
// check perms for this user
51
+
if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
52
+
l.Warn("forbidden request", "did", evt.Record.Did, "error", err)
53
+
return nil
54
+
}
55
+
56
+
switch evt.Record.Action {
57
+
case tap.RecordCreateAction, tap.RecordUpdateAction:
58
+
record := tangled.SpindleMember{}
59
+
if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
60
+
return fmt.Errorf("parsing record: %w", err)
61
+
}
62
+
63
+
domain := s.cfg.Server.Hostname
64
+
if record.Instance != domain {
65
+
l.Info("domain mismatch", "domain", record.Instance, "expected", domain)
66
+
return nil
67
+
}
68
+
69
+
created, err := time.Parse(record.CreatedAt, time.RFC3339)
70
+
if err != nil {
71
+
created = time.Now()
72
+
}
73
+
if err := db.AddSpindleMember(s.db, db.SpindleMember{
74
+
Did: evt.Record.Did,
75
+
Rkey: evt.Record.Rkey.String(),
76
+
Instance: record.Instance,
77
+
Subject: syntax.DID(record.Subject),
78
+
Created: created,
79
+
}); err != nil {
80
+
l.Error("failed to add member", "error", err)
81
+
return fmt.Errorf("adding member to db: %w", err)
82
+
}
83
+
if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
84
+
return fmt.Errorf("adding member to rbac: %w", err)
85
+
}
86
+
if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
87
+
return fmt.Errorf("adding did to tap", err)
88
+
}
89
+
90
+
l.Info("added member", "member", record.Subject)
91
+
return nil
92
+
93
+
case tap.RecordDeleteAction:
94
+
var (
95
+
did = evt.Record.Did.String()
96
+
rkey = evt.Record.Rkey.String()
97
+
)
98
+
member, err := db.GetSpindleMember(s.db, did, rkey)
99
+
if err != nil {
100
+
return fmt.Errorf("finding member: %w", err)
101
+
}
102
+
103
+
if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
104
+
return fmt.Errorf("removing member from db: %w", err)
105
+
}
106
+
if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil {
107
+
return fmt.Errorf("removing member from rbac: %w", err)
108
+
}
109
+
if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil {
110
+
return fmt.Errorf("removing did from tap: %w", err)
111
+
}
112
+
113
+
l.Info("removed member", "member", member.Subject)
114
+
return nil
115
+
}
116
+
return nil
117
+
}
118
+
119
+
func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error {
120
+
l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
121
+
122
+
l.Info("processing collaborator record")
123
+
switch evt.Record.Action {
124
+
case tap.RecordCreateAction, tap.RecordUpdateAction:
125
+
record := tangled.RepoCollaborator{}
126
+
if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
127
+
l.Error("invalid record", "err", err)
128
+
return fmt.Errorf("parsing record: %w", err)
129
+
}
130
+
131
+
// check perms for this user
132
+
if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil {
133
+
l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
134
+
return nil
135
+
}
136
+
137
+
if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{
138
+
Did: evt.Record.Did,
139
+
Rkey: evt.Record.Rkey,
140
+
Repo: syntax.ATURI(record.Repo),
141
+
Subject: syntax.DID(record.Subject),
142
+
}); err != nil {
143
+
return fmt.Errorf("adding collaborator to db: %w", err)
144
+
}
145
+
if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
146
+
return fmt.Errorf("adding collaborator to rbac: %w", err)
147
+
}
148
+
if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
149
+
return fmt.Errorf("adding did to tap: %w", err)
150
+
}
151
+
152
+
l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo)
153
+
return nil
154
+
155
+
case tap.RecordDeleteAction:
156
+
// get existing collaborator
157
+
collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey)
158
+
if err != nil {
159
+
return fmt.Errorf("failed to get existing collaborator info: %w", err)
160
+
}
161
+
162
+
// check perms for this user
163
+
if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil {
164
+
l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
165
+
return nil
166
+
}
167
+
168
+
if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil {
169
+
return fmt.Errorf("removing collaborator from db: %w", err)
170
+
}
171
+
if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil {
172
+
return fmt.Errorf("removing collaborator from rbac: %w", err)
173
+
}
174
+
if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil {
175
+
return fmt.Errorf("removing did from tap: %w", err)
176
+
}
177
+
178
+
l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo)
179
+
return nil
180
+
}
181
+
return nil
182
+
}
183
+
184
+
func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error {
185
+
l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
186
+
187
+
l.Info("processing repo record")
188
+
189
+
// check perms for this user
190
+
if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
191
+
l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
192
+
return nil
193
+
}
194
+
195
+
switch evt.Record.Action {
196
+
case tap.RecordCreateAction, tap.RecordUpdateAction:
197
+
record := tangled.Repo{}
198
+
if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
199
+
return fmt.Errorf("parsing record: %w", err)
200
+
}
201
+
202
+
domain := s.cfg.Server.Hostname
203
+
if record.Spindle == nil || *record.Spindle != domain {
204
+
if record.Spindle == nil {
205
+
l.Info("spindle isn't configured", "name", record.Name)
206
+
} else {
207
+
l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
208
+
}
209
+
if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
210
+
return fmt.Errorf("deleting repo from db: %w", err)
211
+
}
212
+
return nil
213
+
}
214
+
215
+
if err := s.db.PutRepo(&db.Repo{
216
+
Did: evt.Record.Did,
217
+
Rkey: evt.Record.Rkey,
218
+
Name: record.Name,
219
+
Knot: record.Knot,
220
+
}); err != nil {
221
+
return fmt.Errorf("adding repo to db: %w", err)
222
+
}
223
+
224
+
if err := s.e.AddRepo(evt.Record.AtUri()); err != nil {
225
+
return fmt.Errorf("adding repo to rbac")
226
+
}
227
+
228
+
// add this knot to the event consumer
229
+
src := eventconsumer.NewKnotSource(record.Knot)
230
+
s.ks.AddSource(context.Background(), src)
231
+
232
+
l.Info("added repo", "repo", evt.Record.AtUri())
233
+
return nil
234
+
235
+
case tap.RecordDeleteAction:
236
+
// check perms for this user
237
+
if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil {
238
+
l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
239
+
return nil
240
+
}
241
+
242
+
if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
243
+
return fmt.Errorf("deleting repo from db: %w", err)
244
+
}
245
+
246
+
if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil {
247
+
return fmt.Errorf("deleting repo from rbac: %w", err)
248
+
}
249
+
250
+
l.Info("deleted repo", "repo", evt.Record.AtUri())
251
+
return nil
252
+
}
253
+
return nil
254
+
}
255
+
256
+
func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error {
257
+
l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
258
+
259
+
l.Info("processing pull record")
260
+
261
+
switch evt.Record.Action {
262
+
case tap.RecordCreateAction, tap.RecordUpdateAction:
263
+
// TODO
264
+
case tap.RecordDeleteAction:
265
+
// TODO
266
+
}
267
+
return nil
268
+
}
269
+
270
+
func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error {
271
+
known, err := s.db.IsKnownDid(syntax.DID(did))
272
+
if err != nil {
273
+
return fmt.Errorf("ensuring did known state: %w", err)
274
+
}
275
+
if !known {
276
+
if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil {
277
+
return fmt.Errorf("removing did from tap: %w", err)
278
+
}
279
+
}
280
+
return nil
281
+
}