+10
spindle/db/db.go
+10
spindle/db/db.go
···
35
did text primary key
36
);
37
38
+
create table if not exists repos (
39
+
id integer primary key autoincrement,
40
+
knot text not null,
41
+
owner text not null,
42
+
name text not null,
43
+
addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
44
+
45
+
unique(owner, name)
46
+
);
47
+
48
-- status event for a single workflow
49
create table if not exists events (
50
rkey text not null,
+28
spindle/db/repos.go
+28
spindle/db/repos.go
···
···
1
+
package db
2
+
3
+
func (d *DB) AddRepo(knot, owner, name string) error {
4
+
_, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name)
5
+
return err
6
+
}
7
+
8
+
func (d *DB) Knots() ([]string, error) {
9
+
rows, err := d.Query(`select knot from repos`)
10
+
if err != nil {
11
+
return nil, err
12
+
}
13
+
14
+
var knots []string
15
+
for rows.Next() {
16
+
var knot string
17
+
if err := rows.Scan(&knot); err != nil {
18
+
return nil, err
19
+
}
20
+
knots = append(knots, knot)
21
+
}
22
+
23
+
if err = rows.Err(); err != nil {
24
+
return nil, err
25
+
}
26
+
27
+
return knots, nil
28
+
}
+52
-2
spindle/ingester.go
+52
-2
spindle/ingester.go
···
5
"encoding/json"
6
"fmt"
7
8
-
"github.com/bluesky-social/jetstream/pkg/models"
9
"tangled.sh/tangled.sh/core/api/tangled"
10
)
11
12
type Ingester func(ctx context.Context, e *models.Event) error
···
29
switch e.Commit.Collection {
30
case tangled.SpindleMemberNSID:
31
s.ingestMember(ctx, e)
32
}
33
34
return err
···
68
return fmt.Errorf("failed to enforce permissions: %w", err)
69
}
70
71
-
if err := s.e.AddMember(rbacDomain, record.Subject); err != nil {
72
l.Error("failed to add member", "error", err)
73
return fmt.Errorf("failed to add member: %w", err)
74
}
···
85
}
86
return nil
87
}
···
5
"encoding/json"
6
"fmt"
7
8
"tangled.sh/tangled.sh/core/api/tangled"
9
+
"tangled.sh/tangled.sh/core/knotclient"
10
+
11
+
"github.com/bluesky-social/jetstream/pkg/models"
12
)
13
14
type Ingester func(ctx context.Context, e *models.Event) error
···
31
switch e.Commit.Collection {
32
case tangled.SpindleMemberNSID:
33
s.ingestMember(ctx, e)
34
+
case tangled.RepoNSID:
35
+
s.ingestRepo(ctx, e)
36
}
37
38
return err
···
72
return fmt.Errorf("failed to enforce permissions: %w", err)
73
}
74
75
+
if err := s.e.AddKnotMember(rbacDomain, record.Subject); err != nil {
76
l.Error("failed to add member", "error", err)
77
return fmt.Errorf("failed to add member: %w", err)
78
}
···
89
}
90
return nil
91
}
92
+
93
+
func (s *Spindle) ingestRepo(_ context.Context, e *models.Event) error {
94
+
var err error
95
+
96
+
l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
97
+
98
+
switch e.Commit.Operation {
99
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
100
+
raw := e.Commit.Record
101
+
record := tangled.Repo{}
102
+
err = json.Unmarshal(raw, &record)
103
+
if err != nil {
104
+
l.Error("invalid record", "error", err)
105
+
return err
106
+
}
107
+
108
+
domain := s.cfg.Server.Hostname
109
+
if s.cfg.Server.Dev {
110
+
domain = s.cfg.Server.ListenAddr
111
+
}
112
+
113
+
// no spindle configured for this repo
114
+
if record.Spindle == nil {
115
+
return nil
116
+
}
117
+
118
+
// this repo did not want this spindle
119
+
if *record.Spindle != domain {
120
+
return nil
121
+
}
122
+
123
+
// add this repo to the watch list
124
+
if err := s.db.AddRepo(record.Knot, record.Owner, record.Name); err != nil {
125
+
l.Error("failed to add repo", "error", err)
126
+
return fmt.Errorf("failed to add repo: %w", err)
127
+
}
128
+
129
+
// add this knot to the event consumer
130
+
src := knotclient.NewEventSource(record.Knot)
131
+
s.ks.AddSource(context.Background(), src)
132
+
133
+
return nil
134
+
135
+
}
136
+
return nil
137
+
}