+10
spindle/db/db.go
+10
spindle/db/db.go
···
35
35
did text primary key
36
36
);
37
37
38
+
create table if not exists repos (
39
+
id integer primary key autoincrement,
40
+
knot text not null,
41
+
owner text not null,
42
+
name text not null,
43
+
addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
44
+
45
+
unique(owner, name)
46
+
);
47
+
38
48
-- status event for a single workflow
39
49
create table if not exists events (
40
50
rkey text not null,
+28
spindle/db/repos.go
+28
spindle/db/repos.go
···
1
+
package db
2
+
3
+
func (d *DB) AddRepo(knot, owner, name string) error {
4
+
_, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name)
5
+
return err
6
+
}
7
+
8
+
func (d *DB) Knots() ([]string, error) {
9
+
rows, err := d.Query(`select knot from repos`)
10
+
if err != nil {
11
+
return nil, err
12
+
}
13
+
14
+
var knots []string
15
+
for rows.Next() {
16
+
var knot string
17
+
if err := rows.Scan(&knot); err != nil {
18
+
return nil, err
19
+
}
20
+
knots = append(knots, knot)
21
+
}
22
+
23
+
if err = rows.Err(); err != nil {
24
+
return nil, err
25
+
}
26
+
27
+
return knots, nil
28
+
}
+52
-2
spindle/ingester.go
+52
-2
spindle/ingester.go
···
5
5
"encoding/json"
6
6
"fmt"
7
7
8
-
"github.com/bluesky-social/jetstream/pkg/models"
9
8
"tangled.sh/tangled.sh/core/api/tangled"
9
+
"tangled.sh/tangled.sh/core/knotclient"
10
+
11
+
"github.com/bluesky-social/jetstream/pkg/models"
10
12
)
11
13
12
14
type Ingester func(ctx context.Context, e *models.Event) error
···
29
31
switch e.Commit.Collection {
30
32
case tangled.SpindleMemberNSID:
31
33
s.ingestMember(ctx, e)
34
+
case tangled.RepoNSID:
35
+
s.ingestRepo(ctx, e)
32
36
}
33
37
34
38
return err
···
68
72
return fmt.Errorf("failed to enforce permissions: %w", err)
69
73
}
70
74
71
-
if err := s.e.AddMember(rbacDomain, record.Subject); err != nil {
75
+
if err := s.e.AddKnotMember(rbacDomain, record.Subject); err != nil {
72
76
l.Error("failed to add member", "error", err)
73
77
return fmt.Errorf("failed to add member: %w", err)
74
78
}
···
85
89
}
86
90
return nil
87
91
}
92
+
93
+
func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error {
94
+
var err error
95
+
96
+
l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
97
+
98
+
switch e.Commit.Operation {
99
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
100
+
raw := e.Commit.Record
101
+
record := tangled.Repo{}
102
+
err = json.Unmarshal(raw, &record)
103
+
if err != nil {
104
+
l.Error("invalid record", "error", err)
105
+
return err
106
+
}
107
+
108
+
domain := s.cfg.Server.Hostname
109
+
if s.cfg.Server.Dev {
110
+
domain = s.cfg.Server.ListenAddr
111
+
}
112
+
113
+
// no spindle configured for this repo
114
+
if record.Spindle == nil {
115
+
return nil
116
+
}
117
+
118
+
// this repo did not want this spindle
119
+
if *record.Spindle != domain {
120
+
return nil
121
+
}
122
+
123
+
// add this repo to the watch list
124
+
if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil {
125
+
l.Error("failed to add repo", "error", err)
126
+
return fmt.Errorf("failed to add repo: %w", err)
127
+
}
128
+
129
+
// add this knot to the event consumer
130
+
src := knotclient.NewEventSource(record.Knot)
131
+
s.ks.AddSource(context.Background(), src)
132
+
133
+
return nil
134
+
135
+
}
136
+
return nil
137
+
}