-34
api/tangled/pipelinecancelPipeline.go
-34
api/tangled/pipelinecancelPipeline.go
···
1
-
// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
2
-
3
-
package tangled
4
-
5
-
// schema: sh.tangled.pipeline.cancelPipeline
6
-
7
-
import (
8
-
"context"
9
-
10
-
"github.com/bluesky-social/indigo/lex/util"
11
-
)
12
-
13
-
const (
14
-
PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline"
15
-
)
16
-
17
-
// PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call.
18
-
type PipelineCancelPipeline_Input struct {
19
-
// pipeline: pipeline at-uri
20
-
Pipeline string `json:"pipeline" cborgen:"pipeline"`
21
-
// repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet
22
-
Repo string `json:"repo" cborgen:"repo"`
23
-
// workflow: workflow name
24
-
Workflow string `json:"workflow" cborgen:"workflow"`
25
-
}
26
-
27
-
// PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline".
28
-
func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error {
29
-
if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil {
30
-
return err
31
-
}
32
-
33
-
return nil
34
-
}
+6
-6
appview/db/pipeline.go
+6
-6
appview/db/pipeline.go
···
6
6
"strings"
7
7
"time"
8
8
9
-
"github.com/bluesky-social/indigo/atproto/syntax"
10
9
"tangled.org/core/appview/models"
11
10
"tangled.org/core/orm"
12
11
)
···
217
216
}
218
217
defer rows.Close()
219
218
220
-
pipelines := make(map[syntax.ATURI]models.Pipeline)
219
+
pipelines := make(map[string]models.Pipeline)
221
220
for rows.Next() {
222
221
var p models.Pipeline
223
222
var t models.Trigger
···
254
253
p.Trigger = &t
255
254
p.Statuses = make(map[string]models.WorkflowStatus)
256
255
257
-
pipelines[p.AtUri()] = p
256
+
k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
257
+
pipelines[k] = p
258
258
}
259
259
260
260
// get all statuses
···
314
314
return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
315
315
}
316
316
317
-
pipelineAt := ps.PipelineAt()
317
+
key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
318
318
319
319
// extract
320
-
pipeline, ok := pipelines[pipelineAt]
320
+
pipeline, ok := pipelines[key]
321
321
if !ok {
322
322
continue
323
323
}
···
331
331
332
332
// reassign
333
333
pipeline.Statuses[ps.Workflow] = statuses
334
-
pipelines[pipelineAt] = pipeline
334
+
pipelines[key] = pipeline
335
335
}
336
336
337
337
var all []models.Pipeline
-10
appview/models/pipeline.go
-10
appview/models/pipeline.go
···
1
1
package models
2
2
3
3
import (
4
-
"fmt"
5
4
"slices"
6
5
"time"
7
6
8
7
"github.com/bluesky-social/indigo/atproto/syntax"
9
8
"github.com/go-git/go-git/v5/plumbing"
10
-
"tangled.org/core/api/tangled"
11
9
spindle "tangled.org/core/spindle/models"
12
10
"tangled.org/core/workflow"
13
11
)
···
25
23
// populate when querying for reverse mappings
26
24
Trigger *Trigger
27
25
Statuses map[string]WorkflowStatus
28
-
}
29
-
30
-
func (p *Pipeline) AtUri() syntax.ATURI {
31
-
return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", p.Knot, tangled.PipelineNSID, p.Rkey))
32
26
}
33
27
34
28
type WorkflowStatus struct {
···
134
128
Error *string
135
129
ExitCode int
136
130
}
137
-
138
-
func (ps *PipelineStatus) PipelineAt() syntax.ATURI {
139
-
return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", ps.PipelineKnot, tangled.PipelineNSID, ps.PipelineRkey))
140
-
}
+63
-22
appview/pages/templates/layouts/fragments/topbar.html
+63
-22
appview/pages/templates/layouts/fragments/topbar.html
···
47
47
{{ end }}
48
48
49
49
{{ define "profileDropdown" }}
50
-
<details class="relative inline-block text-left nav-dropdown">
51
-
<summary class="cursor-pointer list-none flex items-center gap-1">
52
-
{{ $user := .Did }}
53
-
<img
54
-
src="{{ tinyAvatar $user }}"
55
-
alt=""
56
-
class="rounded-full h-6 w-6 border border-gray-300 dark:border-gray-700"
57
-
/>
58
-
<span class="hidden md:inline">{{ $user | resolve | truncateAt30 }}</span>
59
-
</summary>
60
-
<div class="absolute flex flex-col right-0 mt-4 p-4 rounded w-48 bg-white dark:bg-gray-800 dark:text-white border border-gray-200 dark:border-gray-700">
61
-
<a href="/{{ $user }}">profile</a>
62
-
<a href="/{{ $user }}?tab=repos">repositories</a>
63
-
<a href="/{{ $user }}?tab=strings">strings</a>
64
-
<a href="/settings">settings</a>
50
+
{{ $user := .Did }}
51
+
<button type="button" popovertarget="navigation-popover" class="site-navigation-dropdown-trigger" aria-label="Open site navigation dropdown">
52
+
<img
53
+
src="{{ tinyAvatar $user }}"
54
+
alt=""
55
+
class="rounded-full h-6 w-6 border border-gray-300 dark:border-gray-700"
56
+
/>
57
+
</button>
58
+
<div popover="auto" id="navigation-popover" class="site-navigation-popover shadow-md border border-gray-200 rounded p-2 bg-white dark:bg-gray-800 dark:text-white dark:border-gray-700">
59
+
<div class="flex gap-2 py-2">
60
+
<img
61
+
src="{{ tinyAvatar $user }}"
62
+
alt=""
63
+
class="rounded-full h-6 w-6 border border-gray-300 dark:border-gray-700"
64
+
/>
65
+
<p>{{ $user | resolve | truncateAt30 }}</p>
66
+
</div>
67
+
<hr class="h-1 w-full mb-1 mt-2 dark:border-gray-700" />
68
+
<ul id="navigation-menu-popover">
69
+
<li>
70
+
<a href="/{{ $user }}">
71
+
{{ i "user" "w-4 h-4" }}
72
+
<span>profile</span>
73
+
</a>
74
+
</li>
75
+
<li>
76
+
<a href="/{{ $user }}?tab=repos">
77
+
{{ i "book-marked" "w-4 h-4" }}
78
+
<span>repositories</span>
79
+
</a>
80
+
</li>
81
+
<li>
82
+
<a href="/{{ $user }}?tab=strings">
83
+
{{ i "spool" "w-4 h-4" }}
84
+
<span>strings</span>
85
+
</a>
86
+
</li>
87
+
<li>
88
+
<a href="/settings">
89
+
{{ i "settings" "w-4 h-4" }}
90
+
<span>settings</span>
91
+
</a>
92
+
</li>
93
+
<hr class="h-1 w-full mb-1 mt-2 dark:border-gray-700" />
94
+
<li>
65
95
<a href="#"
66
-
hx-post="/logout"
67
-
hx-swap="none"
68
-
class="text-red-400 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300">
69
-
logout
70
-
</a>
71
-
</div>
72
-
</details>
96
+
hx-post="/logout"
97
+
hx-swap="none"
98
+
class="text-red-400 flex gap-2 items-center hover:bg-red-50 hover:text-red-700 px-2 py-2 rounded-sm dark:hover:bg-red-700 dark:hover:text-red-50">
99
+
{{ i "arrow-right-from-line" "w-4 h-4" }}
100
+
<span>logout</span>
101
+
</a>
102
+
</li>
103
+
</ul>
104
+
</div>
73
105
74
106
<script>
75
107
document.addEventListener('click', function(event) {
···
80
112
}
81
113
});
82
114
});
115
+
116
+
const navigationPopoverLinks = document.querySelectorAll("#navigation-menu-popover li a");
117
+
const currentPageURL = window.location.href
118
+
navigationPopoverLinks.forEach(link => {
119
+
const navigationPopoverLinkURL = link.href
120
+
if (navigationPopoverLinkURL === currentPageURL) {
121
+
link.ariaCurrent = "page"
122
+
}
123
+
})
83
124
</script>
84
125
{{ end }}
-10
appview/pages/templates/repo/pipelines/workflow.html
-10
appview/pages/templates/repo/pipelines/workflow.html
···
12
12
{{ block "sidebar" . }} {{ end }}
13
13
</div>
14
14
<div class="col-span-1 md:col-span-3">
15
-
<div class="flex justify-end mb-2">
16
-
<button
17
-
class="btn"
18
-
hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel"
19
-
hx-swap="none"
20
-
{{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}}
21
-
disabled
22
-
{{- end }}
23
-
>Cancel</button>
24
-
</div>
25
15
{{ block "logs" . }} {{ end }}
26
16
</div>
27
17
</section>
-82
appview/pipelines/pipelines.go
-82
appview/pipelines/pipelines.go
···
4
4
"bytes"
5
5
"context"
6
6
"encoding/json"
7
-
"fmt"
8
7
"log/slog"
9
8
"net/http"
10
9
"strings"
11
10
"time"
12
11
13
-
"tangled.org/core/api/tangled"
14
12
"tangled.org/core/appview/config"
15
13
"tangled.org/core/appview/db"
16
-
"tangled.org/core/appview/models"
17
14
"tangled.org/core/appview/oauth"
18
15
"tangled.org/core/appview/pages"
19
16
"tangled.org/core/appview/reporesolver"
···
44
41
r.Get("/", p.Index)
45
42
r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
46
43
r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
47
-
r.Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel)
48
44
49
45
return r
50
46
}
···
318
314
}
319
315
}
320
316
}
321
-
}
322
-
323
-
func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) {
324
-
l := p.logger.With("handler", "Cancel")
325
-
326
-
var (
327
-
pipelineId = chi.URLParam(r, "pipeline")
328
-
workflow = chi.URLParam(r, "workflow")
329
-
)
330
-
if pipelineId == "" || workflow == "" {
331
-
http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
332
-
return
333
-
}
334
-
335
-
f, err := p.repoResolver.Resolve(r)
336
-
if err != nil {
337
-
l.Error("failed to get repo and knot", "err", err)
338
-
http.Error(w, "bad repo/knot", http.StatusBadRequest)
339
-
return
340
-
}
341
-
342
-
pipeline, err := func() (models.Pipeline, error) {
343
-
ps, err := db.GetPipelineStatuses(
344
-
p.db,
345
-
1,
346
-
orm.FilterEq("repo_owner", f.Did),
347
-
orm.FilterEq("repo_name", f.Name),
348
-
orm.FilterEq("knot", f.Knot),
349
-
orm.FilterEq("id", pipelineId),
350
-
)
351
-
if err != nil {
352
-
return models.Pipeline{}, err
353
-
}
354
-
if len(ps) != 1 {
355
-
return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps))
356
-
}
357
-
return ps[0], nil
358
-
}()
359
-
if err != nil {
360
-
l.Error("pipeline query failed", "err", err)
361
-
http.Error(w, "pipeline not found", http.StatusNotFound)
362
-
}
363
-
var (
364
-
spindle = f.Spindle
365
-
knot = f.Knot
366
-
rkey = pipeline.Rkey
367
-
)
368
-
369
-
if spindle == "" || knot == "" || rkey == "" {
370
-
http.Error(w, "invalid repo info", http.StatusBadRequest)
371
-
return
372
-
}
373
-
374
-
spindleClient, err := p.oauth.ServiceClient(
375
-
r,
376
-
oauth.WithService(f.Spindle),
377
-
oauth.WithLxm(tangled.PipelineCancelPipelineNSID),
378
-
oauth.WithExp(60),
379
-
oauth.WithDev(p.config.Core.Dev),
380
-
oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time
381
-
)
382
-
383
-
err = tangled.PipelineCancelPipeline(
384
-
r.Context(),
385
-
spindleClient,
386
-
&tangled.PipelineCancelPipeline_Input{
387
-
Repo: string(f.RepoAt()),
388
-
Pipeline: pipeline.AtUri().String(),
389
-
Workflow: workflow,
390
-
},
391
-
)
392
-
errorId := "pipeline-action"
393
-
if err != nil {
394
-
l.Error("failed to cancel pipeline", "err", err)
395
-
p.pages.Notice(w, errorId, "Failed to add secret.")
396
-
return
397
-
}
398
-
l.Debug("canceled pipeline", "uri", pipeline.AtUri())
399
317
}
400
318
401
319
// either a message or an error
+2
-30
flake.nix
+2
-30
flake.nix
···
91
91
spindle = self.callPackage ./nix/pkgs/spindle.nix {};
92
92
knot-unwrapped = self.callPackage ./nix/pkgs/knot-unwrapped.nix {};
93
93
knot = self.callPackage ./nix/pkgs/knot.nix {};
94
-
did-method-plc = self.callPackage ./nix/pkgs/did-method-plc.nix {};
95
-
bluesky-jetstream = self.callPackage ./nix/pkgs/bluesky-jetstream.nix {};
96
-
bluesky-relay = self.callPackage ./nix/pkgs/bluesky-relay.nix {};
97
-
tap = self.callPackage ./nix/pkgs/tap.nix {};
98
94
});
99
95
in {
100
96
overlays.default = final: prev: {
101
-
inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview did-method-plc bluesky-jetstream bluesky-relay tap;
97
+
inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview;
102
98
};
103
99
104
100
packages = forAllSystems (system: let
···
107
103
staticPackages = mkPackageSet pkgs.pkgsStatic;
108
104
crossPackages = mkPackageSet pkgs.pkgsCross.gnu64.pkgsStatic;
109
105
in {
110
-
inherit (packages) appview appview-static-files lexgen goat spindle knot knot-unwrapped sqlite-lib did-method-plc bluesky-jetstream bluesky-relay tap;
106
+
inherit (packages) appview appview-static-files lexgen goat spindle knot knot-unwrapped sqlite-lib;
111
107
112
108
pkgsStatic-appview = staticPackages.appview;
113
109
pkgsStatic-knot = staticPackages.knot;
···
306
302
imports = [./nix/modules/spindle.nix];
307
303
308
304
services.tangled.spindle.package = lib.mkDefault self.packages.${pkgs.stdenv.hostPlatform.system}.spindle;
309
-
};
310
-
nixosModules.did-method-plc = {
311
-
lib,
312
-
pkgs,
313
-
...
314
-
}: {
315
-
imports = [./nix/modules/did-method-plc.nix];
316
-
services.did-method-plc.package = lib.mkDefault self.packages.${pkgs.system}.did-method-plc;
317
-
};
318
-
nixosModules.bluesky-relay = {
319
-
lib,
320
-
pkgs,
321
-
...
322
-
}: {
323
-
imports = [./nix/modules/bluesky-relay.nix];
324
-
services.bluesky-relay.package = lib.mkDefault self.packages.${pkgs.system}.bluesky-relay;
325
-
};
326
-
nixosModules.bluesky-jetstream = {
327
-
lib,
328
-
pkgs,
329
-
...
330
-
}: {
331
-
imports = [./nix/modules/bluesky-jetstream.nix];
332
-
services.bluesky-jetstream.package = lib.mkDefault self.packages.${pkgs.system}.bluesky-jetstream;
333
305
};
334
306
};
335
307
}
+19
input.css
+19
input.css
···
89
89
@apply no-underline text-black hover:underline hover:text-gray-800 dark:text-white dark:hover:text-gray-300;
90
90
}
91
91
92
+
#navigation-menu-popover li:not(:last-of-type) a {
93
+
@apply flex gap-2 items-center px-2 pb-2 pt-1.5 rounded-sm hover:bg-green-50 hover:text-green-700 no-underline dark:hover:text-green-50 dark:hover:bg-green-700;
94
+
}
95
+
96
+
a[hx-post="/logout"] {
97
+
@apply no-underline;
98
+
}
99
+
92
100
label {
93
101
@apply block text-gray-900 text-sm font-bold py-2 uppercase dark:text-gray-100;
94
102
}
···
962
970
color: #f9fafb;
963
971
}
964
972
}
973
+
974
+
.site-navigation-dropdown-trigger {
975
+
anchor-name: --dropdown-trigger;
976
+
}
977
+
978
+
.site-navigation-popover {
979
+
margin: 0;
980
+
inset: auto;
981
+
position-anchor: --dropdown-trigger;
982
+
position-area: bottom left;
983
+
}
-33
lexicons/pipeline/cancelPipeline.json
-33
lexicons/pipeline/cancelPipeline.json
···
1
-
{
2
-
"lexicon": 1,
3
-
"id": "sh.tangled.pipeline.cancelPipeline",
4
-
"defs": {
5
-
"main": {
6
-
"type": "procedure",
7
-
"description": "Cancel a running pipeline",
8
-
"input": {
9
-
"encoding": "application/json",
10
-
"schema": {
11
-
"type": "object",
12
-
"required": ["repo", "pipeline", "workflow"],
13
-
"properties": {
14
-
"repo": {
15
-
"type": "string",
16
-
"format": "at-uri",
17
-
"description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet"
18
-
},
19
-
"pipeline": {
20
-
"type": "string",
21
-
"format": "at-uri",
22
-
"description": "pipeline at-uri"
23
-
},
24
-
"workflow": {
25
-
"type": "string",
26
-
"description": "workflow name"
27
-
}
28
-
}
29
-
}
30
-
}
31
-
}
32
-
}
33
-
}
-64
nix/modules/bluesky-jetstream.nix
-64
nix/modules/bluesky-jetstream.nix
···
1
-
{
2
-
config,
3
-
pkgs,
4
-
lib,
5
-
...
6
-
}: let
7
-
cfg = config.services.bluesky-jetstream;
8
-
in
9
-
with lib; {
10
-
options.services.bluesky-jetstream = {
11
-
enable = mkEnableOption "jetstream server";
12
-
package = mkPackageOption pkgs "bluesky-jetstream" {};
13
-
14
-
# dataDir = mkOption {
15
-
# type = types.str;
16
-
# default = "/var/lib/jetstream";
17
-
# description = "directory to store data (pebbleDB)";
18
-
# };
19
-
livenessTtl = mkOption {
20
-
type = types.int;
21
-
default = 15;
22
-
description = "time to restart when no event detected (seconds)";
23
-
};
24
-
websocketUrl = mkOption {
25
-
type = types.str;
26
-
default = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos";
27
-
description = "full websocket path to the ATProto SubscribeRepos XRPC endpoint";
28
-
};
29
-
};
30
-
config = mkIf cfg.enable {
31
-
systemd.services.bluesky-jetstream = {
32
-
description = "bluesky jetstream";
33
-
after = ["network.target" "pds.service"];
34
-
wantedBy = ["multi-user.target"];
35
-
36
-
serviceConfig = {
37
-
User = "jetstream";
38
-
Group = "jetstream";
39
-
StateDirectory = "jetstream";
40
-
StateDirectoryMode = "0755";
41
-
# preStart = ''
42
-
# mkdir -p "${cfg.dataDir}"
43
-
# chown -R jetstream:jetstream "${cfg.dataDir}"
44
-
# '';
45
-
# WorkingDirectory = cfg.dataDir;
46
-
Environment = [
47
-
"JETSTREAM_DATA_DIR=/var/lib/jetstream/data"
48
-
"JETSTREAM_LIVENESS_TTL=${toString cfg.livenessTtl}s"
49
-
"JETSTREAM_WS_URL=${cfg.websocketUrl}"
50
-
];
51
-
ExecStart = getExe cfg.package;
52
-
Restart = "always";
53
-
RestartSec = 5;
54
-
};
55
-
};
56
-
users = {
57
-
users.jetstream = {
58
-
group = "jetstream";
59
-
isSystemUser = true;
60
-
};
61
-
groups.jetstream = {};
62
-
};
63
-
};
64
-
}
-48
nix/modules/bluesky-relay.nix
-48
nix/modules/bluesky-relay.nix
···
1
-
{
2
-
config,
3
-
pkgs,
4
-
lib,
5
-
...
6
-
}: let
7
-
cfg = config.services.bluesky-relay;
8
-
in
9
-
with lib; {
10
-
options.services.bluesky-relay = {
11
-
enable = mkEnableOption "relay server";
12
-
package = mkPackageOption pkgs "bluesky-relay" {};
13
-
};
14
-
config = mkIf cfg.enable {
15
-
systemd.services.bluesky-relay = {
16
-
description = "bluesky relay";
17
-
after = ["network.target" "pds.service"];
18
-
wantedBy = ["multi-user.target"];
19
-
20
-
serviceConfig = {
21
-
User = "relay";
22
-
Group = "relay";
23
-
StateDirectory = "relay";
24
-
StateDirectoryMode = "0755";
25
-
Environment = [
26
-
"RELAY_ADMIN_PASSWORD=password"
27
-
"RELAY_PLC_HOST=https://plc.tngl.boltless.dev"
28
-
"DATABASE_URL=sqlite:///var/lib/relay/relay.sqlite"
29
-
"RELAY_IP_BIND=:2470"
30
-
"RELAY_PERSIST_DIR=/var/lib/relay"
31
-
"RELAY_DISABLE_REQUEST_CRAWL=0"
32
-
"RELAY_INITIAL_SEQ_NUMBER=1"
33
-
"RELAY_ALLOW_INSECURE_HOSTS=1"
34
-
];
35
-
ExecStart = "${getExe cfg.package} serve";
36
-
Restart = "always";
37
-
RestartSec = 5;
38
-
};
39
-
};
40
-
users = {
41
-
users.relay = {
42
-
group = "relay";
43
-
isSystemUser = true;
44
-
};
45
-
groups.relay = {};
46
-
};
47
-
};
48
-
}
-76
nix/modules/did-method-plc.nix
-76
nix/modules/did-method-plc.nix
···
1
-
{
2
-
config,
3
-
pkgs,
4
-
lib,
5
-
...
6
-
}: let
7
-
cfg = config.services.did-method-plc;
8
-
in
9
-
with lib; {
10
-
options.services.did-method-plc = {
11
-
enable = mkEnableOption "did-method-plc server";
12
-
package = mkPackageOption pkgs "did-method-plc" {};
13
-
};
14
-
config = mkIf cfg.enable {
15
-
services.postgresql = {
16
-
enable = true;
17
-
package = pkgs.postgresql_14;
18
-
ensureDatabases = ["plc"];
19
-
ensureUsers = [
20
-
{
21
-
name = "pg";
22
-
# ensurePermissions."DATABASE plc" = "ALL PRIVILEGES";
23
-
}
24
-
];
25
-
authentication = ''
26
-
local all all trust
27
-
host all all 127.0.0.1/32 trust
28
-
'';
29
-
};
30
-
systemd.services.did-method-plc = {
31
-
description = "did-method-plc";
32
-
33
-
after = ["postgresql.service"];
34
-
wants = ["postgresql.service"];
35
-
wantedBy = ["multi-user.target"];
36
-
37
-
environment = let
38
-
db_creds_json = builtins.toJSON {
39
-
username = "pg";
40
-
password = "";
41
-
host = "127.0.0.1";
42
-
port = 5432;
43
-
};
44
-
in {
45
-
# TODO: inherit from config
46
-
DEBUG_MODE = "1";
47
-
LOG_ENABLED = "true";
48
-
LOG_LEVEL = "debug";
49
-
LOG_DESTINATION = "1";
50
-
ENABLE_MIGRATIONS = "true";
51
-
DB_CREDS_JSON = db_creds_json;
52
-
DB_MIGRATE_CREDS_JSON = db_creds_json;
53
-
PLC_VERSION = "0.0.1";
54
-
PORT = "8080";
55
-
};
56
-
57
-
serviceConfig = {
58
-
ExecStart = getExe cfg.package;
59
-
User = "plc";
60
-
Group = "plc";
61
-
StateDirectory = "plc";
62
-
StateDirectoryMode = "0755";
63
-
Restart = "always";
64
-
65
-
# Hardening
66
-
};
67
-
};
68
-
users = {
69
-
users.plc = {
70
-
group = "plc";
71
-
isSystemUser = true;
72
-
};
73
-
groups.plc = {};
74
-
};
75
-
};
76
-
}
-20
nix/pkgs/bluesky-jetstream.nix
-20
nix/pkgs/bluesky-jetstream.nix
···
1
-
{
2
-
buildGoModule,
3
-
fetchFromGitHub,
4
-
}:
5
-
buildGoModule {
6
-
pname = "bluesky-jetstream";
7
-
version = "0.1.0";
8
-
src = fetchFromGitHub {
9
-
owner = "bluesky-social";
10
-
repo = "jetstream";
11
-
rev = "7d7efa58d7f14101a80ccc4f1085953948b7d5de";
12
-
sha256 = "sha256-1e9SL/8gaDPMA4YZed51ffzgpkptbMd0VTbTTDbPTFw=";
13
-
};
14
-
subPackages = ["cmd/jetstream"];
15
-
vendorHash = "sha256-/21XJQH6fo9uPzlABUAbdBwt1O90odmppH6gXu2wkiQ=";
16
-
doCheck = false;
17
-
meta = {
18
-
mainProgram = "jetstream";
19
-
};
20
-
}
-20
nix/pkgs/bluesky-relay.nix
-20
nix/pkgs/bluesky-relay.nix
···
1
-
{
2
-
buildGoModule,
3
-
fetchFromGitHub,
4
-
}:
5
-
buildGoModule {
6
-
pname = "bluesky-relay";
7
-
version = "0.1.0";
8
-
src = fetchFromGitHub {
9
-
owner = "boltlessengineer";
10
-
repo = "indigo";
11
-
rev = "b769ea60b7dde5e2bd0b8ee3ce8462a0c0e596fe";
12
-
sha256 = "sha256-jHRY825TBYaH1WkKFUoNbo4UlMSyuHvCGjYPiBnKo44=";
13
-
};
14
-
subPackages = ["cmd/relay"];
15
-
vendorHash = "sha256-UOedwNYnM8Jx6B7Y9tFcZX8IeUBESAFAPTRYk7n0yo8=";
16
-
doCheck = false;
17
-
meta = {
18
-
mainProgram = "relay";
19
-
};
20
-
}
-65
nix/pkgs/did-method-plc.nix
-65
nix/pkgs/did-method-plc.nix
···
1
-
# inspired by https://github.com/NixOS/nixpkgs/blob/333bfb7c258fab089a834555ea1c435674c459b4/pkgs/by-name/ga/gatsby-cli/package.nix
2
-
{
3
-
lib,
4
-
stdenv,
5
-
fetchFromGitHub,
6
-
fetchYarnDeps,
7
-
yarnConfigHook,
8
-
yarnBuildHook,
9
-
nodejs,
10
-
makeBinaryWrapper,
11
-
}:
12
-
stdenv.mkDerivation (finalAttrs: {
13
-
pname = "did-method-plc";
14
-
version = "0.0.1";
15
-
16
-
src = fetchFromGitHub {
17
-
owner = "did-method-plc";
18
-
repo = "did-method-plc";
19
-
rev = "158ba5535ac3da4fd4309954bde41deab0b45972";
20
-
sha256 = "sha256-O5smubbrnTDMCvL6iRyMXkddr5G7YHxkQRVMRULHanQ=";
21
-
};
22
-
postPatch = ''
23
-
# remove dd-trace dependency
24
-
sed -i '3d' packages/server/service/index.js
25
-
'';
26
-
27
-
yarnOfflineCache = fetchYarnDeps {
28
-
yarnLock = finalAttrs.src + "/yarn.lock";
29
-
hash = "sha256-g8GzaAbWSnWwbQjJMV2DL5/ZlWCCX0sRkjjvX3tqU4Y=";
30
-
};
31
-
32
-
nativeBuildInputs = [
33
-
yarnConfigHook
34
-
yarnBuildHook
35
-
nodejs
36
-
makeBinaryWrapper
37
-
];
38
-
yarnBuildScript = "lerna";
39
-
yarnBuildFlags = [
40
-
"run"
41
-
"build"
42
-
"--scope"
43
-
"@did-plc/server"
44
-
"--include-dependencies"
45
-
];
46
-
47
-
installPhase = ''
48
-
runHook preInstall
49
-
50
-
mkdir -p $out/lib/node_modules/
51
-
mv packages/ $out/lib/packages/
52
-
mv node_modules/* $out/lib/node_modules/
53
-
54
-
makeWrapper ${lib.getExe nodejs} $out/bin/plc \
55
-
--add-flags $out/lib/packages/server/service/index.js \
56
-
--add-flags --enable-source-maps \
57
-
--set NODE_PATH $out/lib/node_modules
58
-
59
-
runHook postInstall
60
-
'';
61
-
62
-
meta = {
63
-
mainProgram = "plc";
64
-
};
65
-
})
-20
nix/pkgs/tap.nix
-20
nix/pkgs/tap.nix
···
1
-
{
2
-
buildGoModule,
3
-
fetchFromGitHub,
4
-
}:
5
-
buildGoModule {
6
-
pname = "tap";
7
-
version = "0.1.0";
8
-
src = fetchFromGitHub {
9
-
owner = "bluesky-social";
10
-
repo = "indigo";
11
-
rev = "f92cb29224fcc60f666b20ee3514e431a58ff811";
12
-
sha256 = "sha256-35ltXnq0SJeo3j33D7Nndbcnw5XWBJLRrmZ+nCmZVQw=";
13
-
};
14
-
subPackages = ["cmd/tap"];
15
-
vendorHash = "sha256-UOedwNYnM8Jx6B7Y9tFcZX8IeUBESAFAPTRYk7n0yo8=";
16
-
doCheck = false;
17
-
meta = {
18
-
mainProgram = "tap";
19
-
};
20
-
}
-144
rbac2/rbac2.go
-144
rbac2/rbac2.go
···
1
-
package rbac2
2
-
3
-
import (
4
-
"database/sql"
5
-
"fmt"
6
-
7
-
adapter "github.com/Blank-Xu/sql-adapter"
8
-
"github.com/bluesky-social/indigo/atproto/syntax"
9
-
"github.com/casbin/casbin/v2"
10
-
"github.com/casbin/casbin/v2/model"
11
-
"github.com/casbin/casbin/v2/util"
12
-
"tangled.org/core/api/tangled"
13
-
)
14
-
15
-
const (
16
-
Model = `
17
-
[request_definition]
18
-
r = sub, dom, obj, act
19
-
20
-
[policy_definition]
21
-
p = sub, dom, obj, act
22
-
23
-
[role_definition]
24
-
g = _, _, _
25
-
26
-
[policy_effect]
27
-
e = some(where (p.eft == allow))
28
-
29
-
[matchers]
30
-
m = g(r.sub, p.sub, r.dom) && keyMatch4(r.dom, p.dom) && r.obj == p.obj && r.act == p.act
31
-
`
32
-
)
33
-
34
-
type Enforcer struct {
35
-
e *casbin.Enforcer
36
-
}
37
-
38
-
func NewEnforcer(path string) (*Enforcer, error) {
39
-
m, err := model.NewModelFromString(Model)
40
-
if err != nil {
41
-
return nil, err
42
-
}
43
-
44
-
db, err := sql.Open("sqlite3", path+"?_foreign_keys=1")
45
-
if err != nil {
46
-
return nil, err
47
-
}
48
-
49
-
a, err := adapter.NewAdapter(db, "sqlite3", "acl")
50
-
if err != nil {
51
-
return nil, err
52
-
}
53
-
54
-
e, err := casbin.NewEnforcer(m, a)
55
-
if err != nil {
56
-
return nil, err
57
-
}
58
-
59
-
if err := seedTangledPolicies(e); err != nil {
60
-
return nil, err
61
-
}
62
-
63
-
return &Enforcer{e}, nil
64
-
}
65
-
66
-
func seedTangledPolicies(e *casbin.Enforcer) error {
67
-
// policies
68
-
aturi := func(nsid string) string {
69
-
return fmt.Sprintf("at://{did}/%s/{rkey}", nsid)
70
-
}
71
-
72
-
_, err := e.AddPoliciesEx([][]string{
73
-
// sub | dom | obj | act
74
-
{"repo:owner", aturi(tangled.RepoNSID), "/", "write"},
75
-
{"repo:owner", aturi(tangled.RepoNSID), "/collaborator", "write"}, // invite
76
-
{"repo:collaborator", aturi(tangled.RepoNSID), "/settings", "write"},
77
-
{"repo:collaborator", aturi(tangled.RepoNSID), "/git", "write"}, // git push
78
-
79
-
{"server:owner", "/knot/{did}", "/member", "write"}, // invite
80
-
{"server:member", "/knot/{did}", "/git", "write"},
81
-
82
-
{"server:owner", "/spindle/{did}", "/member", "write"}, // invite
83
-
})
84
-
if err != nil {
85
-
return err
86
-
}
87
-
88
-
// grouping policies
89
-
// TODO(boltless): define our own matcher to replace keyMatch4
90
-
e.AddNamedDomainMatchingFunc("g", "keyMatch4", util.KeyMatch4)
91
-
_, err = e.AddGroupingPoliciesEx([][]string{
92
-
// sub | role | dom
93
-
{"repo:owner", "repo:collaborator", aturi(tangled.RepoNSID)},
94
-
95
-
// using '/knot/' prefix here because knot/spindle identifiers don't
96
-
// include the collection type
97
-
{"server:owner", "server:member", "/knot/{did}"},
98
-
{"server:owner", "server:member", "/spindle/{did}"},
99
-
})
100
-
return err
101
-
}
102
-
103
-
func (e *Enforcer) hasImplicitRoleForUser(name string, role string, domain ...string) (bool, error) {
104
-
roles, err := e.e.GetImplicitRolesForUser(name, domain...)
105
-
if err != nil {
106
-
return false, err
107
-
}
108
-
for _, r := range roles {
109
-
if r == role {
110
-
return true, nil
111
-
}
112
-
}
113
-
return false, nil
114
-
}
115
-
116
-
// setRoleForUser sets single user role for specified domain.
117
-
// All existing users with that role will be removed.
118
-
func (e *Enforcer) setRoleForUser(name string, role string, domain ...string) error {
119
-
currentUsers, err := e.e.GetUsersForRole(role, domain...)
120
-
if err != nil {
121
-
return err
122
-
}
123
-
124
-
for _, oldUser := range currentUsers {
125
-
_, err = e.e.DeleteRoleForUser(oldUser, role, domain...)
126
-
if err != nil {
127
-
return err
128
-
}
129
-
}
130
-
131
-
_, err = e.e.AddRoleForUser(name, role, domain...)
132
-
return err
133
-
}
134
-
135
-
// validateAtUri enforeces AT-URI to have valid did as authority and match collection NSID.
136
-
func validateAtUri(uri syntax.ATURI, expected string) error {
137
-
if !uri.Authority().IsDID() {
138
-
return fmt.Errorf("expected at-uri with did")
139
-
}
140
-
if expected != "" && uri.Collection().String() != expected {
141
-
return fmt.Errorf("incorrect repo at-uri collection nsid '%s' (expected '%s')", uri.Collection(), expected)
142
-
}
143
-
return nil
144
-
}
-115
rbac2/rbac2_test.go
-115
rbac2/rbac2_test.go
···
1
-
package rbac2_test
2
-
3
-
import (
4
-
"testing"
5
-
6
-
"github.com/bluesky-social/indigo/atproto/syntax"
7
-
_ "github.com/mattn/go-sqlite3"
8
-
"github.com/stretchr/testify/assert"
9
-
"tangled.org/core/rbac2"
10
-
)
11
-
12
-
func setup(t *testing.T) *rbac2.Enforcer {
13
-
enforcer, err := rbac2.NewEnforcer(":memory:")
14
-
assert.NoError(t, err)
15
-
16
-
return enforcer
17
-
}
18
-
19
-
func TestRepoOwnerPermissions(t *testing.T) {
20
-
var (
21
-
e = setup(t)
22
-
ok bool
23
-
err error
24
-
fooRepo = syntax.ATURI("at://did:plc:foo/sh.tangled.repo/reporkey")
25
-
fooUser = syntax.DID("did:plc:foo")
26
-
)
27
-
28
-
assert.NoError(t, e.AddRepo(fooRepo))
29
-
30
-
ok, err = e.IsRepoOwner(fooUser, fooRepo)
31
-
assert.NoError(t, err)
32
-
assert.True(t, ok, "repo author should be repo owner")
33
-
34
-
ok, err = e.IsRepoWriteAllowed(fooUser, fooRepo)
35
-
assert.NoError(t, err)
36
-
assert.True(t, ok, "repo owner should be able to modify the repo itself")
37
-
38
-
ok, err = e.IsRepoCollaborator(fooUser, fooRepo)
39
-
assert.NoError(t, err)
40
-
assert.True(t, ok, "repo owner should inherit role role:collaborator")
41
-
42
-
ok, err = e.IsRepoSettingsWriteAllowed(fooUser, fooRepo)
43
-
assert.NoError(t, err)
44
-
assert.True(t, ok, "repo owner should inherit collaborator permissions")
45
-
}
46
-
47
-
func TestRepoCollaboratorPermissions(t *testing.T) {
48
-
var (
49
-
e = setup(t)
50
-
ok bool
51
-
err error
52
-
fooRepo = syntax.ATURI("at://did:plc:foo/sh.tangled.repo/reporkey")
53
-
barUser = syntax.DID("did:plc:bar")
54
-
)
55
-
56
-
assert.NoError(t, e.AddRepo(fooRepo))
57
-
assert.NoError(t, e.AddRepoCollaborator(barUser, fooRepo))
58
-
59
-
ok, err = e.IsRepoCollaborator(barUser, fooRepo)
60
-
assert.NoError(t, err)
61
-
assert.True(t, ok, "should set repo collaborator")
62
-
63
-
ok, err = e.IsRepoSettingsWriteAllowed(barUser, fooRepo)
64
-
assert.NoError(t, err)
65
-
assert.True(t, ok, "repo collaborator should be able to edit repo settings")
66
-
67
-
ok, err = e.IsRepoWriteAllowed(barUser, fooRepo)
68
-
assert.NoError(t, err)
69
-
assert.False(t, ok, "repo collaborator shouldn't be able to modify the repo itself")
70
-
}
71
-
72
-
func TestGetByRole(t *testing.T) {
73
-
var (
74
-
e = setup(t)
75
-
err error
76
-
fooRepo = syntax.ATURI("at://did:plc:foo/sh.tangled.repo/reporkey")
77
-
owner = syntax.DID("did:plc:foo")
78
-
collaborator1 = syntax.DID("did:plc:bar")
79
-
collaborator2 = syntax.DID("did:plc:baz")
80
-
)
81
-
82
-
assert.NoError(t, e.AddRepo(fooRepo))
83
-
assert.NoError(t, e.AddRepoCollaborator(collaborator1, fooRepo))
84
-
assert.NoError(t, e.AddRepoCollaborator(collaborator2, fooRepo))
85
-
86
-
collaborators, err := e.GetRepoCollaborators(fooRepo)
87
-
assert.NoError(t, err)
88
-
assert.ElementsMatch(t, []syntax.DID{
89
-
owner,
90
-
collaborator1,
91
-
collaborator2,
92
-
}, collaborators)
93
-
}
94
-
95
-
func TestSpindleOwnerPermissions(t *testing.T) {
96
-
var (
97
-
e = setup(t)
98
-
ok bool
99
-
err error
100
-
spindle = syntax.DID("did:web:spindle.example.com")
101
-
owner = syntax.DID("did:plc:foo")
102
-
member = syntax.DID("did:plc:bar")
103
-
)
104
-
105
-
assert.NoError(t, e.SetSpindleOwner(owner, spindle))
106
-
assert.NoError(t, e.AddSpindleMember(member, spindle))
107
-
108
-
ok, err = e.IsSpindleMemberInviteAllowed(owner, spindle)
109
-
assert.NoError(t, err)
110
-
assert.True(t, ok, "spindle owner can invite members")
111
-
112
-
ok, err = e.IsSpindleMemberInviteAllowed(member, spindle)
113
-
assert.NoError(t, err)
114
-
assert.False(t, ok, "spindle member cannot invite members")
115
-
}
-91
rbac2/repo.go
-91
rbac2/repo.go
···
1
-
package rbac2
2
-
3
-
import (
4
-
"slices"
5
-
"strings"
6
-
7
-
"github.com/bluesky-social/indigo/atproto/syntax"
8
-
"tangled.org/core/api/tangled"
9
-
)
10
-
11
-
// AddRepo adds new repo with its owner to rbac enforcer
12
-
func (e *Enforcer) AddRepo(repo syntax.ATURI) error {
13
-
if err := validateAtUri(repo, tangled.RepoNSID); err != nil {
14
-
return err
15
-
}
16
-
user := repo.Authority()
17
-
18
-
return e.setRoleForUser(user.String(), "repo:owner", repo.String())
19
-
}
20
-
21
-
// DeleteRepo deletes all policies related to the repo
22
-
func (e *Enforcer) DeleteRepo(repo syntax.ATURI) error {
23
-
if err := validateAtUri(repo, tangled.RepoNSID); err != nil {
24
-
return err
25
-
}
26
-
27
-
_, err := e.e.DeleteDomains(repo.String())
28
-
return err
29
-
}
30
-
31
-
// AddRepoCollaborator adds new collaborator to the repo
32
-
func (e *Enforcer) AddRepoCollaborator(user syntax.DID, repo syntax.ATURI) error {
33
-
if err := validateAtUri(repo, tangled.RepoNSID); err != nil {
34
-
return err
35
-
}
36
-
37
-
_, err := e.e.AddRoleForUser(user.String(), "repo:collaborator", repo.String())
38
-
return err
39
-
}
40
-
41
-
// RemoveRepoCollaborator removes the collaborator from the repo.
42
-
// This won't remove inherited roles like repository owner.
43
-
func (e *Enforcer) RemoveRepoCollaborator(user syntax.DID, repo syntax.ATURI) error {
44
-
if err := validateAtUri(repo, tangled.RepoNSID); err != nil {
45
-
return err
46
-
}
47
-
48
-
_, err := e.e.DeleteRoleForUser(user.String(), "repo:collaborator", repo.String())
49
-
return err
50
-
}
51
-
52
-
func (e *Enforcer) GetRepoCollaborators(repo syntax.ATURI) ([]syntax.DID, error) {
53
-
var collaborators []syntax.DID
54
-
members, err := e.e.GetImplicitUsersForRole("repo:collaborator", repo.String())
55
-
if err != nil {
56
-
return nil, err
57
-
}
58
-
for _, m := range members {
59
-
if !strings.HasPrefix(m, "did:") { // skip non-user subjects like 'repo:owner'
60
-
continue
61
-
}
62
-
collaborators = append(collaborators, syntax.DID(m))
63
-
}
64
-
65
-
slices.Sort(collaborators)
66
-
return slices.Compact(collaborators), nil
67
-
}
68
-
69
-
func (e *Enforcer) IsRepoOwner(user syntax.DID, repo syntax.ATURI) (bool, error) {
70
-
return e.e.HasRoleForUser(user.String(), "repo:owner", repo.String())
71
-
}
72
-
73
-
func (e *Enforcer) IsRepoCollaborator(user syntax.DID, repo syntax.ATURI) (bool, error) {
74
-
return e.hasImplicitRoleForUser(user.String(), "repo:collaborator", repo.String())
75
-
}
76
-
77
-
func (e *Enforcer) IsRepoWriteAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) {
78
-
return e.e.Enforce(user.String(), repo.String(), "#/", "write")
79
-
}
80
-
81
-
func (e *Enforcer) IsRepoSettingsWriteAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) {
82
-
return e.e.Enforce(user.String(), repo.String(), "#/settings", "write")
83
-
}
84
-
85
-
func (e *Enforcer) IsRepoCollaboratorInviteAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) {
86
-
return e.e.Enforce(user.String(), repo.String(), "#/collaborator", "write")
87
-
}
88
-
89
-
func (e *Enforcer) IsRepoGitPushAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) {
90
-
return e.e.Enforce(user.String(), repo.String(), "#/git", "write")
91
-
}
-29
rbac2/spindle.go
-29
rbac2/spindle.go
···
1
-
package rbac2
2
-
3
-
import "github.com/bluesky-social/indigo/atproto/syntax"
4
-
5
-
func (e *Enforcer) SetSpindleOwner(user syntax.DID, spindle syntax.DID) error {
6
-
return e.setRoleForUser(user.String(), "server:owner", intoSpindle(spindle))
7
-
}
8
-
9
-
func (e *Enforcer) IsSpindleMember(user syntax.DID, spindle syntax.DID) (bool, error) {
10
-
return e.e.HasRoleForUser(user.String(), "server:member", spindle.String())
11
-
}
12
-
13
-
func (e *Enforcer) AddSpindleMember(user syntax.DID, spindle syntax.DID) error {
14
-
_, err := e.e.AddRoleForUser(user.String(), "server:member", intoSpindle(spindle))
15
-
return err
16
-
}
17
-
18
-
func (e *Enforcer) RemoveSpindleMember(user syntax.DID, spindle syntax.DID) error {
19
-
_, err := e.e.DeleteRoleForUser(user.String(), "server:member", intoSpindle(spindle))
20
-
return err
21
-
}
22
-
23
-
func (e *Enforcer) IsSpindleMemberInviteAllowed(user syntax.DID, spindle syntax.DID) (bool, error) {
24
-
return e.e.Enforce(user.String(), intoSpindle(spindle), "#/member", "write")
25
-
}
26
-
27
-
func intoSpindle(did syntax.DID) string {
28
-
return "/spindle/" + did.String()
29
-
}
+11
-11
spindle/config/config.go
+11
-11
spindle/config/config.go
···
9
9
)
10
10
11
11
type Server struct {
12
-
ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"`
13
-
DBPath string `env:"DB_PATH, default=spindle.db"`
14
-
Hostname string `env:"HOSTNAME, required"`
15
-
JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"`
16
-
PlcUrl string `env:"PLC_URL, default=https://plc.directory"`
17
-
Dev bool `env:"DEV, default=false"`
18
-
Owner syntax.DID `env:"OWNER, required"`
19
-
Secrets Secrets `env:",prefix=SECRETS_"`
20
-
LogDir string `env:"LOG_DIR, default=/var/log/spindle"`
21
-
QueueSize int `env:"QUEUE_SIZE, default=100"`
22
-
MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time
12
+
ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"`
13
+
DBPath string `env:"DB_PATH, default=spindle.db"`
14
+
Hostname string `env:"HOSTNAME, required"`
15
+
JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"`
16
+
PlcUrl string `env:"PLC_URL, default=https://plc.directory"`
17
+
Dev bool `env:"DEV, default=false"`
18
+
Owner string `env:"OWNER, required"`
19
+
Secrets Secrets `env:",prefix=SECRETS_"`
20
+
LogDir string `env:"LOG_DIR, default=/var/log/spindle"`
21
+
QueueSize int `env:"QUEUE_SIZE, default=100"`
22
+
MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time
23
23
}
24
24
25
25
func (s Server) Did() syntax.DID {
+18
-6
spindle/db/events.go
+18
-6
spindle/db/events.go
···
18
18
EventJson string `json:"event"`
19
19
}
20
20
21
-
func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error {
21
+
func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error {
22
22
_, err := d.Exec(
23
23
`insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`,
24
24
event.Rkey,
···
70
70
return evts, nil
71
71
}
72
72
73
+
func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error {
74
+
eventJson, err := json.Marshal(s)
75
+
if err != nil {
76
+
return err
77
+
}
78
+
79
+
event := Event{
80
+
Rkey: rkey,
81
+
Nsid: tangled.PipelineStatusNSID,
82
+
Created: time.Now().UnixNano(),
83
+
EventJson: string(eventJson),
84
+
}
85
+
86
+
return d.InsertEvent(event, n)
87
+
}
88
+
73
89
func (d *DB) createStatusEvent(
74
90
workflowId models.WorkflowId,
75
91
statusKind models.StatusKind,
···
100
116
EventJson: string(eventJson),
101
117
}
102
118
103
-
return d.insertEvent(event, n)
119
+
return d.InsertEvent(event, n)
104
120
105
121
}
106
122
···
148
164
149
165
func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
150
166
return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n)
151
-
}
152
-
153
-
func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error {
154
-
return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n)
155
167
}
156
168
157
169
func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
+10
-24
spindle/engines/nixery/engine.go
+10
-24
spindle/engines/nixery/engine.go
···
179
179
return err
180
180
}
181
181
e.registerCleanup(wid, func(ctx context.Context) error {
182
-
err := e.docker.NetworkRemove(ctx, networkName(wid))
183
-
if err != nil {
184
-
return fmt.Errorf("removing network: %w", err)
185
-
}
186
-
return nil
182
+
return e.docker.NetworkRemove(ctx, networkName(wid))
187
183
})
188
184
189
185
addl := wf.Data.(addlFields)
···
233
229
return fmt.Errorf("creating container: %w", err)
234
230
}
235
231
e.registerCleanup(wid, func(ctx context.Context) error {
236
-
err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
232
+
err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
237
233
if err != nil {
238
-
return fmt.Errorf("stopping container: %w", err)
234
+
return err
239
235
}
240
236
241
-
err = e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
237
+
return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
242
238
RemoveVolumes: true,
243
239
RemoveLinks: false,
244
240
Force: false,
245
241
})
246
-
if err != nil {
247
-
return fmt.Errorf("removing container: %w", err)
248
-
}
249
-
return nil
250
242
})
251
243
252
244
err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
···
402
394
}
403
395
404
396
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
405
-
fns := e.drainCleanups(wid)
397
+
e.cleanupMu.Lock()
398
+
key := wid.String()
399
+
400
+
fns := e.cleanup[key]
401
+
delete(e.cleanup, key)
402
+
e.cleanupMu.Unlock()
406
403
407
404
for _, fn := range fns {
408
405
if err := fn(ctx); err != nil {
···
418
415
419
416
key := wid.String()
420
417
e.cleanup[key] = append(e.cleanup[key], fn)
421
-
}
422
-
423
-
func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
424
-
e.cleanupMu.Lock()
425
-
key := wid.String()
426
-
427
-
fns := e.cleanup[key]
428
-
delete(e.cleanup, key)
429
-
e.cleanupMu.Unlock()
430
-
431
-
return fns
432
418
}
433
419
434
420
func networkName(wid models.WorkflowId) string {
+41
-17
spindle/ingester.go
+41
-17
spindle/ingester.go
···
9
9
10
10
"tangled.org/core/api/tangled"
11
11
"tangled.org/core/eventconsumer"
12
+
"tangled.org/core/rbac"
12
13
"tangled.org/core/spindle/db"
13
14
14
15
comatproto "github.com/bluesky-social/indigo/api/atproto"
16
+
"github.com/bluesky-social/indigo/atproto/identity"
15
17
"github.com/bluesky-social/indigo/atproto/syntax"
16
18
"github.com/bluesky-social/indigo/xrpc"
17
19
"github.com/bluesky-social/jetstream/pkg/models"
20
+
securejoin "github.com/cyphar/filepath-securejoin"
18
21
)
19
22
20
23
type Ingester func(ctx context.Context, e *models.Event) error
···
76
79
return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
77
80
}
78
81
79
-
ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did())
82
+
ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
80
83
if err != nil || !ok {
81
84
l.Error("failed to add member", "did", did, "error", err)
82
85
return fmt.Errorf("failed to enforce permissions: %w", err)
···
93
96
return fmt.Errorf("failed to add member: %w", err)
94
97
}
95
98
96
-
if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
99
+
if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
97
100
l.Error("failed to add member", "error", err)
98
101
return fmt.Errorf("failed to add member: %w", err)
99
102
}
···
119
122
return fmt.Errorf("failed to remove member: %w", err)
120
123
}
121
124
122
-
if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil {
125
+
if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
123
126
l.Error("failed to add member", "error", err)
124
127
return fmt.Errorf("failed to add member: %w", err)
125
128
}
···
173
176
return fmt.Errorf("failed to add repo: %w", err)
174
177
}
175
178
176
-
repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey))
179
+
didSlashRepo, err := securejoin.SecureJoin(did, record.Name)
180
+
if err != nil {
181
+
return err
182
+
}
177
183
178
184
// add repo to rbac
179
-
if err := s.e.AddRepo(repoAt); err != nil {
185
+
if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil {
180
186
l.Error("failed to add repo to enforcer", "error", err)
181
187
return fmt.Errorf("failed to add repo: %w", err)
182
188
}
183
189
184
190
// add collaborators to rbac
185
-
if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil {
191
+
owner, err := s.res.ResolveIdent(ctx, did)
192
+
if err != nil || owner.Handle.IsInvalidHandle() {
193
+
return err
194
+
}
195
+
if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
186
196
return err
187
197
}
188
198
···
224
234
return nil
225
235
}
226
236
237
+
// TODO: get rid of this entirely
238
+
// resolve this aturi to extract the repo record
239
+
owner, err := s.res.ResolveIdent(ctx, repoAt.Authority().String())
240
+
if err != nil || owner.Handle.IsInvalidHandle() {
241
+
return fmt.Errorf("failed to resolve handle: %w", err)
242
+
}
243
+
244
+
xrpcc := xrpc.Client{
245
+
Host: owner.PDSEndpoint(),
246
+
}
247
+
248
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
249
+
if err != nil {
250
+
return err
251
+
}
252
+
253
+
repo := resp.Value.Val.(*tangled.Repo)
254
+
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
255
+
227
256
// check perms for this user
228
-
if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil {
257
+
if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
229
258
return fmt.Errorf("insufficient permissions: %w", err)
230
259
}
231
260
232
261
// add collaborator to rbac
233
-
if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil {
262
+
if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
234
263
l.Error("failed to add repo to enforcer", "error", err)
235
264
return fmt.Errorf("failed to add repo: %w", err)
236
265
}
···
240
269
return nil
241
270
}
242
271
243
-
func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error {
272
+
func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
244
273
l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
245
274
246
275
l.Info("fetching and adding existing collaborators")
247
276
248
-
ident, err := s.res.ResolveIdent(ctx, repo.Authority().String())
249
-
if err != nil || ident.Handle.IsInvalidHandle() {
250
-
return fmt.Errorf("failed to resolve handle: %w", err)
251
-
}
252
-
253
277
xrpcc := xrpc.Client{
254
-
Host: ident.PDSEndpoint(),
278
+
Host: owner.PDSEndpoint(),
255
279
}
256
280
257
-
resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false)
281
+
resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
258
282
if err != nil {
259
283
return err
260
284
}
···
266
290
}
267
291
record := r.Value.Val.(*tangled.RepoCollaborator)
268
292
269
-
if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
293
+
if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
270
294
l.Error("failed to add repo to enforcer", "error", err)
271
295
errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
272
296
}
+1
-1
spindle/models/pipeline_env.go
+1
-1
spindle/models/pipeline_env.go
+51
-11
spindle/server.go
+51
-11
spindle/server.go
···
17
17
"tangled.org/core/jetstream"
18
18
"tangled.org/core/log"
19
19
"tangled.org/core/notifier"
20
-
"tangled.org/core/rbac2"
20
+
"tangled.org/core/rbac"
21
21
"tangled.org/core/spindle/config"
22
22
"tangled.org/core/spindle/db"
23
23
"tangled.org/core/spindle/engine"
···
32
32
//go:embed motd
33
33
var motd []byte
34
34
35
+
const (
36
+
rbacDomain = "thisserver"
37
+
)
38
+
35
39
type Spindle struct {
36
40
jc *jetstream.JetstreamClient
37
41
db *db.DB
38
-
e *rbac2.Enforcer
42
+
e *rbac.Enforcer
39
43
l *slog.Logger
40
44
n *notifier.Notifier
41
45
engs map[string]models.Engine
···
55
59
return nil, fmt.Errorf("failed to setup db: %w", err)
56
60
}
57
61
58
-
e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
62
+
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
59
63
if err != nil {
60
64
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
61
65
}
66
+
e.E.EnableAutoSave(true)
62
67
63
68
n := notifier.New()
64
69
···
99
104
if err != nil {
100
105
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
101
106
}
102
-
jc.AddDid(cfg.Server.Owner.String())
107
+
jc.AddDid(cfg.Server.Owner)
103
108
104
109
// Check if the spindle knows about any Dids;
105
110
dids, err := d.GetAllDids()
···
125
130
vault: vault,
126
131
}
127
132
128
-
err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
133
+
err = e.AddSpindle(rbacDomain)
134
+
if err != nil {
135
+
return nil, fmt.Errorf("failed to set rbac domain: %w", err)
136
+
}
137
+
err = spindle.configureOwner()
129
138
if err != nil {
130
139
return nil, err
131
140
}
···
188
197
}
189
198
190
199
// Enforcer returns the RBAC enforcer instance.
191
-
func (s *Spindle) Enforcer() *rbac2.Enforcer {
200
+
func (s *Spindle) Enforcer() *rbac.Enforcer {
192
201
return s.e
193
202
}
194
203
···
259
268
Config: s.cfg,
260
269
Resolver: s.res,
261
270
Vault: s.vault,
262
-
Notifier: s.Notifier(),
263
271
ServiceAuth: serviceAuth,
264
272
}
265
273
···
294
302
tpl.TriggerMetadata.Repo.Repo,
295
303
)
296
304
if err != nil {
297
-
return fmt.Errorf("failed to get repo: %w", err)
305
+
return err
298
306
}
299
307
300
308
pipelineId := models.PipelineId{
···
315
323
Name: w.Name,
316
324
}, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
317
325
if err != nil {
318
-
return fmt.Errorf("db.StatusFailed: %w", err)
326
+
return err
319
327
}
320
328
321
329
continue
···
329
337
330
338
ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
331
339
if err != nil {
332
-
return fmt.Errorf("init workflow: %w", err)
340
+
return err
333
341
}
334
342
335
343
// inject TANGLED_* env vars after InitWorkflow
···
346
354
Name: w.Name,
347
355
}, s.n)
348
356
if err != nil {
349
-
return fmt.Errorf("db.StatusPending: %w", err)
357
+
return err
350
358
}
351
359
}
352
360
}
···
373
381
374
382
return nil
375
383
}
384
+
385
+
func (s *Spindle) configureOwner() error {
386
+
cfgOwner := s.cfg.Server.Owner
387
+
388
+
existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
389
+
if err != nil {
390
+
return err
391
+
}
392
+
393
+
switch len(existing) {
394
+
case 0:
395
+
// no owner configured, continue
396
+
case 1:
397
+
// find existing owner
398
+
existingOwner := existing[0]
399
+
400
+
// no ownership change, this is okay
401
+
if existingOwner == s.cfg.Server.Owner {
402
+
break
403
+
}
404
+
405
+
// remove existing owner
406
+
err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
407
+
if err != nil {
408
+
return nil
409
+
}
410
+
default:
411
+
return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
412
+
}
413
+
414
+
return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
415
+
}
+2
-1
spindle/xrpc/add_secret.go
+2
-1
spindle/xrpc/add_secret.go
···
11
11
"github.com/bluesky-social/indigo/xrpc"
12
12
securejoin "github.com/cyphar/filepath-securejoin"
13
13
"tangled.org/core/api/tangled"
14
+
"tangled.org/core/rbac"
14
15
"tangled.org/core/spindle/secrets"
15
16
xrpcerr "tangled.org/core/xrpc/errors"
16
17
)
···
67
68
return
68
69
}
69
70
70
-
if ok, err := x.Enforcer.IsRepoSettingsWriteAllowed(actorDid, repoAt); !ok || err != nil {
71
+
if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil {
71
72
l.Error("insufficent permissions", "did", actorDid.String())
72
73
writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized)
73
74
return
+2
-1
spindle/xrpc/list_secrets.go
+2
-1
spindle/xrpc/list_secrets.go
···
11
11
"github.com/bluesky-social/indigo/xrpc"
12
12
securejoin "github.com/cyphar/filepath-securejoin"
13
13
"tangled.org/core/api/tangled"
14
+
"tangled.org/core/rbac"
14
15
"tangled.org/core/spindle/secrets"
15
16
xrpcerr "tangled.org/core/xrpc/errors"
16
17
)
···
62
63
return
63
64
}
64
65
65
-
if ok, err := x.Enforcer.IsRepoSettingsWriteAllowed(actorDid, repoAt); !ok || err != nil {
66
+
if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil {
66
67
l.Error("insufficent permissions", "did", actorDid.String())
67
68
writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized)
68
69
return
+1
-1
spindle/xrpc/owner.go
+1
-1
spindle/xrpc/owner.go
-72
spindle/xrpc/pipeline_cancelPipeline.go
-72
spindle/xrpc/pipeline_cancelPipeline.go
···
1
-
package xrpc
2
-
3
-
import (
4
-
"encoding/json"
5
-
"fmt"
6
-
"net/http"
7
-
"strings"
8
-
9
-
"github.com/bluesky-social/indigo/atproto/syntax"
10
-
"tangled.org/core/api/tangled"
11
-
"tangled.org/core/spindle/models"
12
-
xrpcerr "tangled.org/core/xrpc/errors"
13
-
)
14
-
15
-
func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) {
16
-
l := x.Logger
17
-
fail := func(e xrpcerr.XrpcError) {
18
-
l.Error("failed", "kind", e.Tag, "error", e.Message)
19
-
writeError(w, e, http.StatusBadRequest)
20
-
}
21
-
l.Debug("cancel pipeline")
22
-
23
-
actorDid, ok := r.Context().Value(ActorDid).(syntax.DID)
24
-
if !ok {
25
-
fail(xrpcerr.MissingActorDidError)
26
-
return
27
-
}
28
-
29
-
var input tangled.PipelineCancelPipeline_Input
30
-
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
31
-
fail(xrpcerr.GenericError(err))
32
-
return
33
-
}
34
-
35
-
aturi := syntax.ATURI(input.Pipeline)
36
-
wid := models.WorkflowId{
37
-
PipelineId: models.PipelineId{
38
-
Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"),
39
-
Rkey: aturi.RecordKey().String(),
40
-
},
41
-
Name: input.Workflow,
42
-
}
43
-
l.Debug("cancel pipeline", "wid", wid)
44
-
45
-
// unfortunately we have to resolve repo-at here
46
-
repoAt, err := syntax.ParseATURI(input.Repo)
47
-
if err != nil {
48
-
fail(xrpcerr.InvalidRepoError(input.Repo))
49
-
return
50
-
}
51
-
52
-
isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid, repoAt)
53
-
if err != nil || !isRepoOwner {
54
-
fail(xrpcerr.AccessControlError(actorDid.String()))
55
-
return
56
-
}
57
-
for _, engine := range x.Engines {
58
-
l.Debug("destorying workflow", "wid", wid)
59
-
err = engine.DestroyWorkflow(r.Context(), wid)
60
-
if err != nil {
61
-
fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err)))
62
-
return
63
-
}
64
-
err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier)
65
-
if err != nil {
66
-
fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err)))
67
-
return
68
-
}
69
-
}
70
-
71
-
w.WriteHeader(http.StatusOK)
72
-
}
+2
-1
spindle/xrpc/remove_secret.go
+2
-1
spindle/xrpc/remove_secret.go
···
10
10
"github.com/bluesky-social/indigo/xrpc"
11
11
securejoin "github.com/cyphar/filepath-securejoin"
12
12
"tangled.org/core/api/tangled"
13
+
"tangled.org/core/rbac"
13
14
"tangled.org/core/spindle/secrets"
14
15
xrpcerr "tangled.org/core/xrpc/errors"
15
16
)
···
61
62
return
62
63
}
63
64
64
-
if ok, err := x.Enforcer.IsRepoSettingsWriteAllowed(actorDid, repoAt); !ok || err != nil {
65
+
if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil {
65
66
l.Error("insufficent permissions", "did", actorDid.String())
66
67
writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized)
67
68
return
+2
-5
spindle/xrpc/xrpc.go
+2
-5
spindle/xrpc/xrpc.go
···
10
10
11
11
"tangled.org/core/api/tangled"
12
12
"tangled.org/core/idresolver"
13
-
"tangled.org/core/notifier"
14
-
"tangled.org/core/rbac2"
13
+
"tangled.org/core/rbac"
15
14
"tangled.org/core/spindle/config"
16
15
"tangled.org/core/spindle/db"
17
16
"tangled.org/core/spindle/models"
···
25
24
type Xrpc struct {
26
25
Logger *slog.Logger
27
26
Db *db.DB
28
-
Enforcer *rbac2.Enforcer
27
+
Enforcer *rbac.Enforcer
29
28
Engines map[string]models.Engine
30
29
Config *config.Config
31
30
Resolver *idresolver.Resolver
32
31
Vault secrets.Manager
33
-
Notifier *notifier.Notifier
34
32
ServiceAuth *serviceauth.ServiceAuth
35
33
}
36
34
···
43
41
r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret)
44
42
r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret)
45
43
r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets)
46
-
r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline)
47
44
})
48
45
49
46
// service query endpoints (no auth required)
-18
tap/simpleIndexer.go
-18
tap/simpleIndexer.go
···
1
-
package tap
2
-
3
-
import "context"
4
-
5
-
type SimpleIndexer struct {
6
-
EventHandler func(ctx context.Context, evt Event) error
7
-
ErrorHandler func(ctx context.Context, err error)
8
-
}
9
-
10
-
var _ Handler = (*SimpleIndexer)(nil)
11
-
12
-
func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error {
13
-
return i.EventHandler(ctx, evt)
14
-
}
15
-
16
-
func (i *SimpleIndexer) OnError(ctx context.Context, err error) {
17
-
i.ErrorHandler(ctx, err)
18
-
}
-169
tap/tap.go
-169
tap/tap.go
···
1
-
/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
-
3
-
package tap
4
-
5
-
import (
6
-
"bytes"
7
-
"context"
8
-
"encoding/json"
9
-
"fmt"
10
-
"net/http"
11
-
"net/url"
12
-
13
-
"github.com/bluesky-social/indigo/atproto/syntax"
14
-
"github.com/gorilla/websocket"
15
-
"tangled.org/core/log"
16
-
)
17
-
18
-
// type WebsocketOptions struct {
19
-
// maxReconnectSeconds int
20
-
// heartbeatIntervalMs int
21
-
// // onReconnectError
22
-
// }
23
-
24
-
type Handler interface {
25
-
OnEvent(ctx context.Context, evt Event) error
26
-
OnError(ctx context.Context, err error)
27
-
}
28
-
29
-
type Client struct {
30
-
Url string
31
-
AdminPassword string
32
-
HTTPClient *http.Client
33
-
}
34
-
35
-
func NewClient(url, adminPassword string) Client {
36
-
return Client{
37
-
Url: url,
38
-
AdminPassword: adminPassword,
39
-
HTTPClient: &http.Client{},
40
-
}
41
-
}
42
-
43
-
func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
44
-
body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
45
-
if err != nil {
46
-
return err
47
-
}
48
-
req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
49
-
if err != nil {
50
-
return err
51
-
}
52
-
req.SetBasicAuth("admin", c.AdminPassword)
53
-
req.Header.Set("Content-Type", "application/json")
54
-
55
-
resp, err := c.HTTPClient.Do(req)
56
-
if err != nil {
57
-
return err
58
-
}
59
-
defer resp.Body.Close()
60
-
if resp.StatusCode != http.StatusOK {
61
-
return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
62
-
}
63
-
return nil
64
-
}
65
-
66
-
func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
67
-
body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
68
-
if err != nil {
69
-
return err
70
-
}
71
-
req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
72
-
if err != nil {
73
-
return err
74
-
}
75
-
req.SetBasicAuth("admin", c.AdminPassword)
76
-
req.Header.Set("Content-Type", "application/json")
77
-
78
-
resp, err := c.HTTPClient.Do(req)
79
-
if err != nil {
80
-
return err
81
-
}
82
-
defer resp.Body.Close()
83
-
if resp.StatusCode != http.StatusOK {
84
-
return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
85
-
}
86
-
return nil
87
-
}
88
-
89
-
func (c *Client) Connect(ctx context.Context, handler Handler) error {
90
-
l := log.FromContext(ctx)
91
-
92
-
u, err := url.Parse(c.Url)
93
-
if err != nil {
94
-
return err
95
-
}
96
-
if u.Scheme == "https" {
97
-
u.Scheme = "wss"
98
-
} else {
99
-
u.Scheme = "ws"
100
-
}
101
-
u.Path = "/channel"
102
-
103
-
// TODO: set auth on dial
104
-
105
-
url := u.String()
106
-
107
-
// var backoff int
108
-
// for {
109
-
// select {
110
-
// case <-ctx.Done():
111
-
// return ctx.Err()
112
-
// default:
113
-
// }
114
-
//
115
-
// header := http.Header{
116
-
// "Authorization": []string{""},
117
-
// }
118
-
// conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
119
-
// if err != nil {
120
-
// l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
121
-
// time.Sleep(time.Duration(5+backoff) * time.Second)
122
-
// backoff++
123
-
//
124
-
// continue
125
-
// } else {
126
-
// backoff = 0
127
-
// }
128
-
//
129
-
// l.Info("event subscription response", "code", res.StatusCode)
130
-
// }
131
-
132
-
// TODO: keep websocket connection alive
133
-
conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil)
134
-
if err != nil {
135
-
return err
136
-
}
137
-
defer conn.Close()
138
-
139
-
for {
140
-
select {
141
-
case <-ctx.Done():
142
-
return ctx.Err()
143
-
default:
144
-
}
145
-
_, message, err := conn.ReadMessage()
146
-
if err != nil {
147
-
return err
148
-
}
149
-
150
-
var ev Event
151
-
if err := json.Unmarshal(message, &ev); err != nil {
152
-
handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
153
-
continue
154
-
}
155
-
if err := handler.OnEvent(ctx, ev); err != nil {
156
-
handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
157
-
continue
158
-
}
159
-
160
-
ack := map[string]any{
161
-
"type": "ack",
162
-
"id": ev.ID,
163
-
}
164
-
if err := conn.WriteJSON(ack); err != nil {
165
-
l.Warn("failed to send ack", "err", err)
166
-
continue
167
-
}
168
-
}
169
-
}
-62
tap/types.go
-62
tap/types.go
···
1
-
package tap
2
-
3
-
import (
4
-
"encoding/json"
5
-
"fmt"
6
-
7
-
"github.com/bluesky-social/indigo/atproto/syntax"
8
-
)
9
-
10
-
type EventType string
11
-
12
-
const (
13
-
EvtRecord EventType = "record"
14
-
EvtIdentity EventType = "identity"
15
-
)
16
-
17
-
type Event struct {
18
-
ID int64 `json:"id"`
19
-
Type EventType `json:"type"`
20
-
Record *RecordEventData `json:"record,omitempty"`
21
-
Identity *IdentityEventData `json:"identity,omitempty"`
22
-
}
23
-
24
-
type RecordEventData struct {
25
-
Live bool `json:"live"`
26
-
Did syntax.DID `json:"did"`
27
-
Rev string `json:"rev"`
28
-
Collection syntax.NSID `json:"collection"`
29
-
Rkey syntax.RecordKey `json:"rkey"`
30
-
Action RecordAction `json:"action"`
31
-
Record json.RawMessage `json:"record,omitempty"`
32
-
CID *syntax.CID `json:"cid,omitempty"`
33
-
}
34
-
35
-
func (r *RecordEventData) AtUri() syntax.ATURI {
36
-
return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey))
37
-
}
38
-
39
-
type RecordAction string
40
-
41
-
const (
42
-
RecordCreateAction RecordAction = "create"
43
-
RecordUpdateAction RecordAction = "update"
44
-
RecordDeleteAction RecordAction = "delete"
45
-
)
46
-
47
-
type IdentityEventData struct {
48
-
DID syntax.DID `json:"did"`
49
-
Handle string `json:"handle"`
50
-
IsActive bool `json:"is_active"`
51
-
Status RepoStatus `json:"status"`
52
-
}
53
-
54
-
type RepoStatus string
55
-
56
-
const (
57
-
RepoStatusActive RepoStatus = "active"
58
-
RepoStatusTakendown RepoStatus = "takendown"
59
-
RepoStatusSuspended RepoStatus = "suspended"
60
-
RepoStatusDeactivated RepoStatus = "deactivated"
61
-
RepoStatusDeleted RepoStatus = "deleted"
62
-
)