···2121 <div class="col-span-1 md:col-span-2">
2222 <h2 class="text-sm pb-2 uppercase font-bold">SSH Keys</h2>
2323 <p class="text-gray-500 dark:text-gray-400">
2424- SSH public keys added here will be broadcasted to knots that you are a member of,
2424+ SSH public keys added here will be broadcasted to knots that you are a member of,
2525 allowing you to push to repositories there.
2626 </p>
2727 </div>
···6363 hx-swap="none"
6464 class="flex flex-col gap-2"
6565>
6666- <p class="uppercase p-0">ADD SSH KEY</p>
6666+ <label for="key-name" class="uppercase p-0">
6767+ add ssh key
6868+ </label>
6769 <p class="text-sm text-gray-500 dark:text-gray-400">SSH keys allow you to push to repositories in knots you're a member of.</p>
6870 <input
6971 type="text"
+43-60
appview/pages/templates/user/signup.html
···11-{{ define "user/signup" }}
22- <!doctype html>
33- <html lang="en" class="dark:bg-gray-900">
44- <head>
55- <meta charset="UTF-8" />
66- <meta name="viewport" content="width=device-width, initial-scale=1.0" />
77-      <meta property="og:title" content="signup · tangled" />
88- <meta property="og:url" content="https://tangled.org/signup" />
99- <meta property="og:description" content="sign up for tangled" />
1010- <script src="/static/htmx.min.js"></script>
1111- <link rel="manifest" href="/pwa-manifest.json" />
1212- <link rel="stylesheet" href="/static/tw.css?{{ cssContentHash }}" type="text/css" />
1313- <title>sign up · tangled</title>
11+{{ define "title" }} signup {{ end }}
1421515- <script src="https://challenges.cloudflare.com/turnstile/v0/api.js" async defer></script>
1616- </head>
1717- <body class="flex items-center justify-center min-h-screen">
1818- <main class="max-w-md px-6 -mt-4">
1919- <h1 class="flex place-content-center text-2xl font-semibold italic dark:text-white" >
2020- {{ template "fragments/logotype" }}
2121- </h1>
2222- <h2 class="text-center text-xl italic dark:text-white">tightly-knit social coding.</h2>
2323- <form
2424- class="mt-4 max-w-sm mx-auto"
2525- hx-post="/signup"
2626- hx-swap="none"
2727- hx-disabled-elt="#signup-button"
2828- >
2929- <div class="flex flex-col mt-2">
3030- <label for="email">email</label>
3131- <input
3232- type="email"
3333- id="email"
3434- name="email"
3535- tabindex="4"
3636- required
3737- placeholder="jason@bourne.co"
3838- />
3939- </div>
4040- <span class="text-sm text-gray-500 mt-1">
4141- You will receive an email with an invite code. Enter your
4242- invite code, desired username, and password in the next
4343- page to complete your registration.
4444- </span>
4545- <div class="w-full mt-4 text-center">
4646- <div class="cf-turnstile" data-sitekey="{{ .CloudflareSiteKey }}" data-size="flexible"></div>
4747- </div>
4848- <button class="btn text-base w-full my-2 mt-6" type="submit" id="signup-button" tabindex="7" >
4949- <span>join now</span>
5050- </button>
5151- <p class="text-sm text-gray-500">
5252- Already have an AT Protocol account? <a href="/login" class="underline">Login to Tangled</a>.
5353- </p>
33+{{ define "extrameta" }}
44+ <script src="https://challenges.cloudflare.com/turnstile/v0/api.js" async defer></script>
55+{{ end }}
66+77+{{ define "content" }}
88+ <form
99+ class="mt-4 max-w-sm mx-auto group"
1010+ hx-post="/signup"
1111+ hx-swap="none"
1212+ hx-disabled-elt="#signup-button"
1313+ >
1414+ <div class="flex flex-col mt-2">
1515+ <label for="email">email</label>
1616+ <input
1717+ type="email"
1818+ id="email"
1919+ name="email"
2020+ tabindex="4"
2121+ required
2222+ placeholder="jason@bourne.co"
2323+ />
2424+ </div>
2525+ <span class="text-sm text-gray-500 mt-1">
2626+ You will receive an email with an invite code. Enter your
2727+ invite code, desired username, and password in the next
2828+ page to complete your registration.
2929+ </span>
3030+ <div class="w-full mt-4 text-center">
3131+ <div class="cf-turnstile" data-sitekey="{{ .CloudflareSiteKey }}" data-size="flexible"></div>
3232+ </div>
3333+ <button class="btn text-base w-full my-2 mt-6" type="submit" id="signup-button" tabindex="7" >
3434+ {{ i "loader-circle" "size-4 animate-spin hidden group-[.htmx-request]:inline" }}
3535+ <span class="inline group-[.htmx-request]:hidden">join now</span>
3636+ </button>
3737+ <p class="text-sm text-gray-500">
3838+ Already have an AT Protocol account? <a href="/login" class="underline">Login to Tangled</a>.
3939+ </p>
54405555- <p id="signup-msg" class="error w-full"></p>
5656- <p class="text-sm text-gray-500 pt-4">
5757- By signing up, you agree to our <a href="/terms" class="underline">Terms of Service</a> and <a href="/privacy" class="underline">Privacy Policy</a>.
5858- </p>
5959- </form>
6060- </main>
6161- </body>
6262- </html>
4141+ <p id="signup-msg" class="error w-full"></p>
4242+ <p class="text-sm text-gray-500 pt-4">
4343+ By signing up, you agree to our <a href="/terms" class="underline">Terms of Service</a> and <a href="/privacy" class="underline">Privacy Policy</a>.
4444+ </p>
4545+ </form>
6346{{ end }}
···11-# how to setup local appview dev environment
22-33-Appview requires several microservices from knot and spindle to entire atproto infra. This test environment is implemented under nixos vm.
44-55-1. copy `contrib/example.env` to `.env`, fill it and source it
66-2. run vm
77- ```bash
88- nix run --impure .#vm
99- ```
1010-3. trust the generated cert from host machine
1111- ```bash
1212- # for macos
1313- sudo security add-trusted-cert -d -r trustRoot \
1414- -k /Library/Keychains/System.keychain \
1515- ./nix/vm-data/caddy/.local/share/caddy/pki/authorities/local/root.crt
1616- ```
1717-4. create test accounts with valid emails (use [`create-test-account.sh`](./scripts/create-test-account.sh))
1818-5. create default labels (use [`setup-const-records`](./scripts/setup-const-records.sh))
1919-6. restart vm with correct owner-did
2020-2121-for git-https, you should change your local git config:
2222-```
2323-[http "https://knot.tngl.boltless.dev"]
2424- sslCAPath = /Users/boltless/repo/tangled/nix/vm-data/caddy/.local/share/caddy/pki/authorities/local/
2525-```
-68
contrib/scripts/create-test-account.sh
···11-#!/bin/bash
22-set -o errexit
33-set -o nounset
44-set -o pipefail
55-66-source "$(dirname "$0")/../pds.env"
77-88-# PDS_HOSTNAME=
99-# PDS_ADMIN_PASSWORD=
1010-1111-# curl a URL and fail if the request fails.
1212-function curl_cmd_get {
1313- curl --fail --silent --show-error "$@"
1414-}
1515-1616-# curl a URL and fail if the request fails.
1717-function curl_cmd_post {
1818- curl --fail --silent --show-error --request POST --header "Content-Type: application/json" "$@"
1919-}
2020-2121-# curl a URL but do not fail if the request fails.
2222-function curl_cmd_post_nofail {
2323- curl --silent --show-error --request POST --header "Content-Type: application/json" "$@"
2424-}
2525-2626-USERNAME="${1:-}"
2727-2828-if [[ "${USERNAME}" == "" ]]; then
2929- read -p "Enter a username: " USERNAME
3030-fi
3131-3232-if [[ "${USERNAME}" == "" ]]; then
3333- echo "ERROR: missing USERNAME parameter." >/dev/stderr
3434- echo "Usage: $0 ${SUBCOMMAND} <USERNAME>" >/dev/stderr
3535- exit 1
3636-fi
3737-3838-EMAIL=${USERNAME}@${PDS_HOSTNAME}
3939-4040-PASSWORD="password"
4141-INVITE_CODE="$(curl_cmd_post \
4242- --user "admin:${PDS_ADMIN_PASSWORD}" \
4343- --data '{"useCount": 1}' \
4444- "https://${PDS_HOSTNAME}/xrpc/com.atproto.server.createInviteCode" | jq --raw-output '.code'
4545-)"
4646-RESULT="$(curl_cmd_post_nofail \
4747- --data "{\"email\":\"${EMAIL}\", \"handle\":\"${USERNAME}.${PDS_HOSTNAME}\", \"password\":\"${PASSWORD}\", \"inviteCode\":\"${INVITE_CODE}\"}" \
4848- "https://${PDS_HOSTNAME}/xrpc/com.atproto.server.createAccount"
4949-)"
5050-5151-DID="$(echo $RESULT | jq --raw-output '.did')"
5252-if [[ "${DID}" != did:* ]]; then
5353- ERR="$(echo ${RESULT} | jq --raw-output '.message')"
5454- echo "ERROR: ${ERR}" >/dev/stderr
5555- echo "Usage: $0 <EMAIL> <HANDLE>" >/dev/stderr
5656- exit 1
5757-fi
5858-5959-echo
6060-echo "Account created successfully!"
6161-echo "-----------------------------"
6262-echo "Handle : ${USERNAME}.${PDS_HOSTNAME}"
6363-echo "DID : ${DID}"
6464-echo "Password : ${PASSWORD}"
6565-echo "-----------------------------"
6666-echo "This is a test account with an insecure password."
6767-echo "Make sure it's only used for development."
6868-echo
···375375KNOT_SERVER_LISTEN_ADDR=127.0.0.1:5555
376376```
377377378378-If you run a Linux distribution that uses systemd, you can use the provided
379379-service file to run the server. Copy
380380-[`knotserver.service`](/systemd/knotserver.service)
378378+If you run a Linux distribution that uses systemd, you can
379379+use the provided service file to run the server. Copy
380380+[`knotserver.service`](https://tangled.org/tangled.org/core/blob/master/systemd/knotserver.service)
381381to `/etc/systemd/system/`. Then, run:
382382383383```
···501501502502Note that you should add a newline at the end if setting a non-empty message
503503since the knot won't do this for you.
504504+505505+## Troubleshooting
506506+507507+If you run your own knot, you may run into some of these
508508+common issues. You can always join the
509509+[IRC](https://web.libera.chat/#tangled) or
510510+[Discord](https://chat.tangled.org/) if this section does
511511+not help.
512512+513513+### Unable to push
515515+If you are unable to push to your knot or repository (a combined sketch of these checks follows the list):
516516+517517+1. First, ensure that you have added your SSH public key to
518518+ your account
519519+2. Check to see that your knot has synced the key by running
520520+ `knot keys`
521521+3. Check to see if git is supplying the correct private key
522522+ when pushing: `GIT_SSH_COMMAND="ssh -v" git push ...`
523523+4. Check to see if `sshd` on the knot is rejecting the push
524524+ for some reason: `journalctl -xeu ssh` (or `sshd`,
525525+   depending on your machine). These logs are unavailable when
526526+   running in Docker.
527527+5. Check to see if the knot itself is rejecting the push.
528528+   Depending on your setup, the logs might be in one of the
529529+   following paths:
530530+ * `/tmp/knotguard.log`
531531+ * `/home/git/log`
532532+ * `/home/git/guard.log`
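
Taken together, a minimal debugging pass might look like the
sketch below; the remote and branch names, the `sshd` unit
name, and the log paths are assumptions that depend on your
setup.

```
# on the knot: confirm that your key has been synced
knot keys

# on your machine: see which key git offers (remote/branch are examples)
GIT_SSH_COMMAND="ssh -v" git push origin main

# on the knot: check sshd and knot logs (paths vary by setup)
journalctl -xeu ssh
tail -n 50 /tmp/knotguard.log /home/git/log /home/git/guard.log
```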
504533505534# Spindles
506535···15611590Refer to the [jujutsu
15621591documentation](https://jj-vcs.github.io/jj/latest/config/#commit-trailers)
15631592for more information.
15931593+15941594+# Troubleshooting guide
15951595+15961596+## Login issues
15971597+15981598+Owing to the distributed nature of OAuth on AT Protocol, you
15991599+may run into issues with logging in. If you run a
16001600+self-hosted PDS:
16021602+- You may need to ensure that your PDS is time-synced using
16031603+  NTP (see the sketch after this list):
16041604+ * Enable the `ntpd` service
16051605+ * Run `ntpd -qg` to synchronize your clock
16061606+- You may need to increase the default request timeout:
16071607+ `NODE_OPTIONS="--network-family-autoselection-attempt-timeout=500"`
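
As a rough sketch for a systemd-based PDS host: the service
name and the environment file location are assumptions about
your install and may differ.

```
# one-shot clock sync, then keep the clock synced
# (the NTP service may be named ntpd, ntp, or chrony)
ntpd -qg
systemctl enable --now ntpd

# raise the request timeout; /pds/pds.env is an assumed env file location
echo 'NODE_OPTIONS="--network-family-autoselection-attempt-timeout=500"' >> /pds/pds.env
systemctl restart pds
```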
16081608+16091609+## Empty punchcard
16111611+For Tangled to register commits that you make across the
16121612+network, you need to set up one of the following:
16141614+- The committer email should be a verified email associated
16151615+  with your account. You can add and verify emails on the
16161616+  settings page.
16171617+- Or, the committer email should be set to your account's
16181618+  DID: `git config user.email "did:plc:foobar"`. You can find
16191619+  your account's DID on the settings page (see the sketch below).
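
For example, to use the DID only inside a particular checkout
(a sketch; `did:plc:foobar` is the placeholder from above,
substitute your own DID):

```
# inside the repository: repo-local setting, other projects are untouched
git config user.email "did:plc:foobar"

# or apply it to every repository on this machine
git config --global user.email "did:plc:foobar"
```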
16201620+16211621+## Commit is not marked as verified
16221622+16231623+Presently, Tangled only supports SSH commit signatures.
16241624+16251625+To sign commits using an SSH key with git:
16261626+16271627+```
16281628+git config --global gpg.format ssh
16291629+git config --global user.signingkey ~/.ssh/tangled-key
16301630+```
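
The two settings above only pick the key; nothing is signed
until signing is turned on. A follow-up sketch, assuming the
public key sits next to the private one as
`~/.ssh/tangled-key.pub` and with `you@example.com` standing
in for your committer email:

```
# sign every commit automatically (otherwise pass -S to individual commits)
git config --global commit.gpgsign true

# optional: verify signatures locally with an allowed-signers file
git config --global gpg.ssh.allowedSignersFile ~/.ssh/allowed_signers
echo "you@example.com $(cat ~/.ssh/tangled-key.pub)" >> ~/.ssh/allowed_signers
git log --show-signature -1
```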
16311631+16321632+To sign commits using an SSH key with jj, add this to your
16331633+config:
16341634+16351635+```
16361636+[signing]
16371637+behavior = "own"
16381638+backend = "ssh"
16391639+key = "~/.ssh/tangled-key"
16401640+```
16411641+16421642+## Self-hosted knot issues
16431643+16441644+If you need help troubleshooting a self-hosted knot, check
16451645+out the [knot troubleshooting
16461646+guide](/knot-self-hosting-guide.html#troubleshooting).
···11+package spindle
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "errors"
77+ "fmt"
88+ "time"
99+1010+ "tangled.org/core/api/tangled"
1111+ "tangled.org/core/eventconsumer"
1212+ "tangled.org/core/rbac"
1313+ "tangled.org/core/spindle/db"
1414+1515+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1616+ "github.com/bluesky-social/indigo/atproto/identity"
1717+ "github.com/bluesky-social/indigo/atproto/syntax"
1818+ "github.com/bluesky-social/indigo/xrpc"
1919+ "github.com/bluesky-social/jetstream/pkg/models"
2020+ securejoin "github.com/cyphar/filepath-securejoin"
2121+)
2222+2323+type Ingester func(ctx context.Context, e *models.Event) error
2424+2525+func (s *Spindle) ingest() Ingester {
2626+ return func(ctx context.Context, e *models.Event) error {
2727+ var err error
2828+ defer func() {
2929+ eventTime := e.TimeUS
3030+ lastTimeUs := eventTime + 1
3131+ if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
3232+			s.l.Error("(deferred) failed to save last time us", "err", err)
3333+ }
3434+ }()
3535+3636+ if e.Kind != models.EventKindCommit {
3737+ return nil
3838+ }
3939+4040+ switch e.Commit.Collection {
4141+ case tangled.SpindleMemberNSID:
4242+ err = s.ingestMember(ctx, e)
4343+ case tangled.RepoNSID:
4444+ err = s.ingestRepo(ctx, e)
4545+ case tangled.RepoCollaboratorNSID:
4646+ err = s.ingestCollaborator(ctx, e)
4747+ }
4848+4949+ if err != nil {
5050+ s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
5151+ }
5252+5353+ return nil
5454+ }
5555+}
5656+5757+func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
5858+ var err error
5959+ did := e.Did
6060+ rkey := e.Commit.RKey
6161+6262+ l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
6363+6464+ switch e.Commit.Operation {
6565+ case models.CommitOperationCreate, models.CommitOperationUpdate:
6666+ raw := e.Commit.Record
6767+ record := tangled.SpindleMember{}
6868+ err = json.Unmarshal(raw, &record)
6969+ if err != nil {
7070+ l.Error("invalid record", "error", err)
7171+ return err
7272+ }
7373+7474+ domain := s.cfg.Server.Hostname
7575+ recordInstance := record.Instance
7676+7777+ if recordInstance != domain {
7878+ l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
7979+ return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
8080+ }
8181+8282+ ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
8383+ if err != nil || !ok {
8484+ l.Error("failed to add member", "did", did, "error", err)
8585+ return fmt.Errorf("failed to enforce permissions: %w", err)
8686+ }
8787+8888+ if err := db.AddSpindleMember(s.db, db.SpindleMember{
8989+ Did: syntax.DID(did),
9090+ Rkey: rkey,
9191+ Instance: recordInstance,
9292+ Subject: syntax.DID(record.Subject),
9393+ Created: time.Now(),
9494+ }); err != nil {
9595+ l.Error("failed to add member", "error", err)
9696+ return fmt.Errorf("failed to add member: %w", err)
9797+ }
9898+9999+ if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
100100+ l.Error("failed to add member", "error", err)
101101+ return fmt.Errorf("failed to add member: %w", err)
102102+ }
103103+ l.Info("added member from firehose", "member", record.Subject)
104104+105105+ if err := s.db.AddDid(record.Subject); err != nil {
106106+ l.Error("failed to add did", "error", err)
107107+ return fmt.Errorf("failed to add did: %w", err)
108108+ }
109109+ s.jc.AddDid(record.Subject)
110110+111111+ return nil
112112+113113+ case models.CommitOperationDelete:
114114+ record, err := db.GetSpindleMember(s.db, did, rkey)
115115+ if err != nil {
116116+ l.Error("failed to find member", "error", err)
117117+ return fmt.Errorf("failed to find member: %w", err)
118118+ }
119119+120120+ if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
121121+ l.Error("failed to remove member", "error", err)
122122+ return fmt.Errorf("failed to remove member: %w", err)
123123+ }
124124+125125+ if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
126126+			l.Error("failed to remove member", "error", err)
127127+			return fmt.Errorf("failed to remove member: %w", err)
128128+		}
129129+		l.Info("removed member from firehose", "member", record.Subject)
130130+131131+ if err := s.db.RemoveDid(record.Subject.String()); err != nil {
132132+			l.Error("failed to remove did", "error", err)
133133+			return fmt.Errorf("failed to remove did: %w", err)
134134+ }
135135+ s.jc.RemoveDid(record.Subject.String())
136136+137137+ }
138138+ return nil
139139+}
140140+141141+func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
142142+ var err error
143143+ did := e.Did
144144+145145+ l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
146146+147147+ l.Info("ingesting repo record", "did", did)
148148+149149+ switch e.Commit.Operation {
150150+ case models.CommitOperationCreate, models.CommitOperationUpdate:
151151+ raw := e.Commit.Record
152152+ record := tangled.Repo{}
153153+ err = json.Unmarshal(raw, &record)
154154+ if err != nil {
155155+ l.Error("invalid record", "error", err)
156156+ return err
157157+ }
158158+159159+ domain := s.cfg.Server.Hostname
160160+161161+ // no spindle configured for this repo
162162+ if record.Spindle == nil {
163163+ l.Info("no spindle configured", "name", record.Name)
164164+ return nil
165165+ }
166166+167167+ // this repo did not want this spindle
168168+ if *record.Spindle != domain {
169169+ l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
170170+ return nil
171171+ }
172172+173173+ // add this repo to the watch list
174174+ if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil {
175175+ l.Error("failed to add repo", "error", err)
176176+ return fmt.Errorf("failed to add repo: %w", err)
177177+ }
178178+179179+ didSlashRepo, err := securejoin.SecureJoin(did, record.Name)
180180+ if err != nil {
181181+ return err
182182+ }
183183+184184+ // add repo to rbac
185185+ if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil {
186186+ l.Error("failed to add repo to enforcer", "error", err)
187187+ return fmt.Errorf("failed to add repo: %w", err)
188188+ }
189189+190190+ // add collaborators to rbac
191191+ owner, err := s.res.ResolveIdent(ctx, did)
192192+ if err != nil || owner.Handle.IsInvalidHandle() {
193193+ return err
194194+ }
195195+ if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
196196+ return err
197197+ }
198198+199199+ // add this knot to the event consumer
200200+ src := eventconsumer.NewKnotSource(record.Knot)
201201+ s.ks.AddSource(context.Background(), src)
202202+203203+ return nil
204204+205205+ }
206206+ return nil
207207+}
208208+209209+func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
210210+ var err error
211211+212212+ l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
213213+214214+ l.Info("ingesting collaborator record")
215215+216216+ switch e.Commit.Operation {
217217+ case models.CommitOperationCreate, models.CommitOperationUpdate:
218218+ raw := e.Commit.Record
219219+ record := tangled.RepoCollaborator{}
220220+ err = json.Unmarshal(raw, &record)
221221+ if err != nil {
222222+ l.Error("invalid record", "error", err)
223223+ return err
224224+ }
225225+226226+ subjectId, err := s.res.ResolveIdent(ctx, record.Subject)
227227+ if err != nil || subjectId.Handle.IsInvalidHandle() {
228228+ return err
229229+ }
230230+231231+ repoAt, err := syntax.ParseATURI(record.Repo)
232232+ if err != nil {
233233+ l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
234234+ return nil
235235+ }
236236+237237+ // TODO: get rid of this entirely
238238+ // resolve this aturi to extract the repo record
239239+ owner, err := s.res.ResolveIdent(ctx, repoAt.Authority().String())
240240+ if err != nil || owner.Handle.IsInvalidHandle() {
241241+ return fmt.Errorf("failed to resolve handle: %w", err)
242242+ }
243243+244244+ xrpcc := xrpc.Client{
245245+ Host: owner.PDSEndpoint(),
246246+ }
247247+248248+ resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
249249+ if err != nil {
250250+ return err
251251+ }
252252+253253+ repo := resp.Value.Val.(*tangled.Repo)
254254+ didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
255255+256256+ // check perms for this user
257257+ if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
258258+ return fmt.Errorf("insufficient permissions: %w", err)
259259+ }
260260+261261+ // add collaborator to rbac
262262+ if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
263263+ l.Error("failed to add repo to enforcer", "error", err)
264264+ return fmt.Errorf("failed to add repo: %w", err)
265265+ }
266266+267267+ return nil
268268+ }
269269+ return nil
270270+}
271271+272272+func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
273273+ l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
274274+275275+ l.Info("fetching and adding existing collaborators")
276276+277277+ xrpcc := xrpc.Client{
278278+ Host: owner.PDSEndpoint(),
279279+ }
280280+281281+ resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
282282+ if err != nil {
283283+ return err
284284+ }
285285+286286+ var errs error
287287+ for _, r := range resp.Records {
288288+ if r == nil {
289289+ continue
290290+ }
291291+ record := r.Value.Val.(*tangled.RepoCollaborator)
292292+293293+ if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
294294+			l.Error("failed to add collaborator to enforcer", "error", err)
295295+			errs = errors.Join(errs, fmt.Errorf("failed to add collaborator: %w", err))
296296+ }
297297+ }
298298+299299+ return errs
300300+}
-93
spindle/models/adapter.go
···11-package models
22-33-import (
44- "context"
55-66- "github.com/bluesky-social/indigo/atproto/syntax"
77-)
88-99-// Adapter is the core of the spindle. It can use its own way to configure and
1010-// run the workflows. The workflow definition can be either yaml files in git
1111-// repositories or even from dedicated web UI.
1212-//
1313-// An adapter is expected to be hold all created workflow runs.
1414-type Adapter interface {
1515- // Init intializes the adapter
1616- Init() error
1717-1818- // Shutdown gracefully shuts down background jobs
1919- Shutdown(ctx context.Context) error
2020-2121- // SetupRepo ensures adapter connected to the repository.
2222- // This usually includes adding repository watcher that does sparse-clone.
2323- SetupRepo(ctx context.Context, repo syntax.ATURI) error
2424-2525- // ListWorkflowDefs parses and returns all workflow definitions in the given
2626- // repository at the specified revision
2727- ListWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]WorkflowDef, error)
2828-2929- // EvaluateEvent consumes a trigger event and returns a list of triggered
3030- // workflow runs. It is expected to return immediately after scheduling the
3131- // workflows.
3232- EvaluateEvent(ctx context.Context, event Event) ([]WorkflowRun, error)
3333-3434- // GetActiveWorkflowRun returns current state of specific workflow run.
3535- // This method will be called regularly for active workflow runs.
3636- GetActiveWorkflowRun(ctx context.Context, runId syntax.ATURI) (WorkflowRun, error)
3737-3838-3939-4040-4141- // NOTE: baisically I'm not sure about this method.
4242- // How to properly sync workflow.run states?
4343- //
4444- // for adapters with external engine, they will hold every past
4545- // workflow.run objects.
4646- // for adapters with internal engine, they... should also hold every
4747- // past workflow.run objects..?
4848- //
4949- // problem:
5050- // when spindle suffer downtime (spindle server shutdown),
5151- // external `workflow.run`s might be unsynced in "running" or "pending" state
5252- // same for internal `workflow.run`s.
5353- //
5454- // BUT, spindle itself is holding the runs,
5555- // so it already knows unsynced workflows (=workflows not finished)
5656- // therefore, it can just fetch them again.
5757- // for adapters with internal engines, they will fail to fetch previous
5858- // run.
5959- // Leaving spindle to mark the run as "Lost" or "Failed".
6060- // Because of _lacking_ adaters, spindle should be able to manually
6161- // mark unknown runs with "lost" state.
6262- //
6363- // GetWorkflowRun : used to get background crawling
6464- // XCodeCloud: ok
6565- // Nixery: (will fail if unknown) -> spindle will mark workflow as failed anyways
6666- // StreamWorkflowRun : used to notify real-time updates
6767- // XCodeCloud: ok (but old events will be lost)
6868- // Nixery: same. old events on spindle downtime will be lost
6969- //
7070- //
7171- // To avoid this, each adapters should hold outbox buffer
7272- //
7373- // |
7474- // v
7575-7676- // StreamWorkflowRun(ctx context.Context) <-chan WorkflowRun
7777-7878-7979- // ListActiveWorkflowRuns returns current list of active workflow runs.
8080- // Runs where status is either Pending or Running
8181- ListActiveWorkflowRuns(ctx context.Context) ([]WorkflowRun, error)
8282- SubscribeWorkflowRun(ctx context.Context) <-chan WorkflowRun
8383-8484-8585-8686-8787- // StreamWorkflowRunLogs streams logs for a running workflow execution
8888- StreamWorkflowRunLogs(ctx context.Context, runId syntax.ATURI, handle func(line LogLine) error) error
8989-9090- // CancelWorkflowRun attempts to stop a running workflow execution.
9191- // It won't do anything when the workflow has already completed.
9292- CancelWorkflowRun(ctx context.Context, runId syntax.ATURI) error
9393-}
···11-package models
22-33-import (
44- "fmt"
55- "slices"
66-77- "github.com/bluesky-social/indigo/atproto/syntax"
88- "tangled.org/core/api/tangled"
99-)
1010-1111-// `sh.tangled.ci.event`
1212-type Event struct {
1313- SourceRepo syntax.ATURI // repository to find the workflow definition
1414- SourceSha string // sha to find the workflow definition
1515- TargetSha string // sha to run the workflow
1616- // union type of:
1717- // 1. PullRequestEvent
1818- // 2. PushEvent
1919- // 3. ManualEvent
2020-}
2121-2222-func (e *Event) AsRecord() tangled.CiEvent {
2323- // var meta tangled.CiEvent_Meta
2424- // return tangled.CiEvent{
2525- // Meta: &meta,
2626- // }
2727- panic("unimplemented")
2828-}
2929-3030-// `sh.tangled.ci.pipeline`
3131-//
3232-// Pipeline is basically a group of workflows triggered by single event.
3333-type Pipeline2 struct {
3434- Did syntax.DID
3535- Rkey syntax.RecordKey
3636-3737- Event Event // event that triggered the pipeline
3838- WorkflowRuns []WorkflowRun // workflow runs inside this pipeline
3939-}
4040-4141-func (p *Pipeline2) AtUri() syntax.ATURI {
4242- return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", p.Did, tangled.CiPipelineNSID, p.Rkey))
4343-}
4444-4545-func (p *Pipeline2) AsRecord() tangled.CiPipeline {
4646- event := p.Event.AsRecord()
4747- runs := make([]string, len(p.WorkflowRuns))
4848- for i, run := range p.WorkflowRuns {
4949- runs[i] = run.AtUri().String()
5050- }
5151- return tangled.CiPipeline{
5252- Event: &event,
5353- WorkflowRuns: runs,
5454- }
5555-}
5656-5757-// `sh.tangled.ci.workflow.run`
5858-type WorkflowRun struct {
5959- Did syntax.DID
6060- Rkey syntax.RecordKey
6161-6262- AdapterId string // adapter id
6363- Name string // name of workflow run (not workflow definition name!)
6464- Status WorkflowStatus // workflow status
6565- // TODO: can add some custom fields like adapter-specific log-id
6666-}
6767-6868-func (r WorkflowRun) WithStatus(status WorkflowStatus) WorkflowRun {
6969- return WorkflowRun{
7070- Did: r.Did,
7171- Rkey: r.Rkey,
7272- AdapterId: r.AdapterId,
7373- Name: r.Name,
7474- Status: status,
7575- }
7676-}
7777-7878-func (r *WorkflowRun) AtUri() syntax.ATURI {
7979- return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.CiWorkflowRunNSID, r.Rkey))
8080-}
8181-8282-func (r *WorkflowRun) AsRecord() tangled.CiWorkflowRun {
8383- statusStr := string(r.Status)
8484- return tangled.CiWorkflowRun{
8585- Adapter: r.AdapterId,
8686- Name: r.Name,
8787- Status: &statusStr,
8888- }
8989-}
9090-9191-// `sh.tangled.ci.workflow.status`
9292-type WorkflowStatus string
9393-9494-var (
9595- WorkflowStatusPending WorkflowStatus = "pending"
9696- WorkflowStatusRunning WorkflowStatus = "running"
9797- WorkflowStatusFailed WorkflowStatus = "failed"
9898- WorkflowStatusCancelled WorkflowStatus = "cancelled"
9999- WorkflowStatusSuccess WorkflowStatus = "success"
100100- WorkflowStatusTimeout WorkflowStatus = "timeout"
101101-102102- activeStatuses [2]WorkflowStatus = [2]WorkflowStatus{
103103- WorkflowStatusPending,
104104- WorkflowStatusRunning,
105105- }
106106-)
107107-108108-func (s WorkflowStatus) IsActive() bool {
109109- return slices.Contains(activeStatuses[:], s)
110110-}
111111-112112-func (s WorkflowStatus) IsFinish() bool {
113113- return !s.IsActive()
114114-}
115115-116116-// `sh.tangled.ci.workflow.def`
117117-//
118118-// Brief information of the workflow definition. A workflow can be defined in
119119-// any form. This is a common info struct for any workflow definitions
120120-type WorkflowDef struct {
121121- AdapterId string // adapter id
122122- Name string // name or the workflow (usually the yml file name)
123123- When any // events the workflow is listening to
124124-}
-40
spindle/pipeline.go
···11-package spindle
22-33-import (
44- "context"
55-66- "tangled.org/core/spindle/models"
77-)
88-99-// createPipeline creates a pipeline from given event.
1010-// It will call `EvaluateEvent` for all adapters, gather the triggered workflow
1111-// runs, and constuct a pipeline record from them. pipeline record. It will
1212-// return nil if no workflow run has triggered.
1313-//
1414-// NOTE: This method won't fail. If `adapter.EvaluateEvent` returns an error,
1515-// the error will be logged but won't bubble-up.
1616-//
1717-// NOTE: Adapters might create sub-event on its own for workflows triggered by
1818-// other workflow runs.
1919-func (s *Spindle) createPipeline(ctx context.Context, event models.Event) (*models.Pipeline2) {
2020- l := s.l
2121-2222- pipeline := models.Pipeline2{
2323- Event: event,
2424- }
2525-2626- // TODO: run in parallel
2727- for id, adapter := range s.adapters {
2828- runs, err := adapter.EvaluateEvent(ctx, event)
2929- if err != nil {
3030- l.Error("failed to process trigger from adapter '%s': %w", id, err)
3131- }
3232- pipeline.WorkflowRuns = append(pipeline.WorkflowRuns, runs...)
3333- }
3434-3535- if len(pipeline.WorkflowRuns) == 0 {
3636- return nil
3737- }
3838-3939- return &pipeline
4040-}
-169
spindle/repomanager/repomanager.go
···11-package repomanager
22-33-import (
44- "bufio"
55- "bytes"
66- "context"
77- "errors"
88- "fmt"
99- "os"
1010- "os/exec"
1111- "path/filepath"
1212- "slices"
1313- "strings"
1414-1515- "github.com/bluesky-social/indigo/atproto/syntax"
1616- "github.com/go-git/go-git/v5"
1717- "github.com/go-git/go-git/v5/config"
1818- "github.com/go-git/go-git/v5/plumbing/object"
1919- kgit "tangled.org/core/knotserver/git"
2020- "tangled.org/core/types"
2121-)
2222-2323-// RepoManager manages a `sh.tangled.repo` record with its git context.
2424-// It can be used to efficiently fetch the filetree of the repository.
2525-type RepoManager struct {
2626- repoDir string
2727- // TODO: it would be nice if RepoManager can be configured with different
2828- // strategies:
2929- // - use db as an only source for repo records
3030- // - use atproto if record doesn't exist from the db
3131- // - always use atproto
3232- // hmm do we need `RepoStore` interface?
3333- // now `DbRepoStore` and `AtprotoRepoStore` can implement both.
3434- // all `RepoStore` objects will hold `KnotStore` interface, so they can
3535- // source the knot store if needed.
3636-3737- // but now we can't do complex queries like "get repo with issue count"
3838- // that kind of queries will be done directly from `appview.DB` struct
3939- // is graphql better tech for atproto?
4040-}
4141-4242-func New(repoDir string) RepoManager {
4343- return RepoManager{
4444- repoDir: repoDir,
4545- }
4646-}
4747-4848-// TODO: RepoManager can return file tree from repoAt & rev
4949-// It will start syncing the repository if doesn't exist
5050-5151-// RegisterRepo starts sparse-syncing repository with paths
5252-func (m *RepoManager) RegisterRepo(ctx context.Context, repoAt syntax.ATURI, paths []string) error {
5353- repoPath := m.repoPath(repoAt)
5454- exist, err := isDir(repoPath)
5555- if err != nil {
5656- return fmt.Errorf("checking dir info: %w", err)
5757- }
5858- var sparsePaths []string
5959- if !exist {
6060- // init bare git repo
6161- repo, err := git.PlainInit(repoPath, true)
6262- if err != nil {
6363- return fmt.Errorf("initializing repo: %w", err)
6464- }
6565- _, err = repo.CreateRemote(&config.RemoteConfig{
6666- Name: "origin",
6767- URLs: []string{m.repoCloneUrl(repoAt)},
6868- })
6969- if err != nil {
7070- return fmt.Errorf("configuring repo remote: %w", err)
7171- }
7272- } else {
7373- // get sparse-checkout list
7474- sparsePaths, err = func(path string) ([]string, error) {
7575- var stdout bytes.Buffer
7676- listCmd := exec.Command("git", "-C", path, "sparse-checkout", "list")
7777- listCmd.Stdout = &stdout
7878- if err := listCmd.Run(); err != nil {
7979- return nil, err
8080- }
8181-8282- var sparseList []string
8383- scanner := bufio.NewScanner(&stdout)
8484- for scanner.Scan() {
8585- line := strings.TrimSpace(scanner.Text())
8686- if line == "" {
8787- continue
8888- }
8989- sparseList = append(sparseList, line)
9090- }
9191- if err := scanner.Err(); err != nil {
9292- return nil, fmt.Errorf("scanning stdout: %w", err)
9393- }
9494-9595- return sparseList, nil
9696- }(repoPath)
9797- if err != nil {
9898- return fmt.Errorf("parsing sparse-checkout list: %w", err)
9999- }
100100-101101- // add paths to sparse-checkout list
102102- for _, path := range paths {
103103- sparsePaths = append(sparsePaths, path)
104104- }
105105- sparsePaths = slices.Collect(slices.Values(sparsePaths))
106106- }
107107-108108- // set sparse-checkout list
109109- args := append([]string{"-C", repoPath, "sparse-checkout", "set", "--no-cone"}, sparsePaths...)
110110- if err := exec.Command("git", args...).Run(); err != nil {
111111- return fmt.Errorf("setting sparse-checkout list: %w", err)
112112- }
113113- return nil
114114-}
115115-116116-// SyncRepo sparse-fetch specific rev of the repo
117117-func (m *RepoManager) SyncRepo(ctx context.Context, repo syntax.ATURI, rev string) error {
118118- // TODO: fetch repo with rev.
119119- panic("unimplemented")
120120-}
121121-122122-func (m *RepoManager) Open(repo syntax.ATURI, rev string) (*kgit.GitRepo, error) {
123123- // TODO: don't depend on knot/git
124124- return kgit.Open(m.repoPath(repo), rev)
125125-}
126126-127127-func (m *RepoManager) FileTree(ctx context.Context, repo syntax.ATURI, rev, path string) ([]types.NiceTree, error) {
128128- if err := m.SyncRepo(ctx, repo, rev); err != nil {
129129- return nil, fmt.Errorf("syncing git repo")
130130- }
131131- gr, err := m.Open(repo, rev)
132132- if err != nil {
133133- return nil, err
134134- }
135135- dir, err := gr.FileTree(ctx, path)
136136- if err != nil {
137137- if errors.Is(err, object.ErrDirectoryNotFound) {
138138- return nil, nil
139139- }
140140- return nil, fmt.Errorf("loading file tree: %w", err)
141141- }
142142- return dir, err
143143-}
144144-145145-func (m *RepoManager) repoPath(repo syntax.ATURI) string {
146146- return filepath.Join(
147147- m.repoDir,
148148- repo.Authority().String(),
149149- repo.Collection().String(),
150150- repo.RecordKey().String(),
151151- )
152152-}
153153-154154-func (m *RepoManager) repoCloneUrl(repo syntax.ATURI) string {
155155- // 1. get repo & knot models from db. fetch it if doesn't exist
156156- // 2. construct https clone url
157157- panic("unimplemented")
158158-}
159159-160160-func isDir(path string) (bool, error) {
161161- info, err := os.Stat(path)
162162- if err == nil && info.IsDir() {
163163- return true, nil
164164- }
165165- if os.IsNotExist(err) {
166166- return false, nil
167167- }
168168- return false, err
169169-}
+150-223
spindle/server.go
···44 "context"
55 _ "embed"
66 "encoding/json"
77- "errors"
87 "fmt"
98 "log/slog"
109 "maps"
1110 "net/http"
1212- "path/filepath"
1311 "sync"
14121515- "github.com/bluesky-social/indigo/atproto/syntax"
1613 "github.com/go-chi/chi/v5"
1717- "github.com/go-git/go-git/v5/plumbing/object"
1818- "github.com/hashicorp/go-version"
1914 "tangled.org/core/api/tangled"
2015 "tangled.org/core/eventconsumer"
2116 "tangled.org/core/eventconsumer/cursor"
2217 "tangled.org/core/idresolver"
2323- kgit "tangled.org/core/knotserver/git"
1818+ "tangled.org/core/jetstream"
2419 "tangled.org/core/log"
2520 "tangled.org/core/notifier"
2626- "tangled.org/core/rbac2"
2121+ "tangled.org/core/rbac"
2722 "tangled.org/core/spindle/config"
2823 "tangled.org/core/spindle/db"
2924 "tangled.org/core/spindle/engine"
3025 "tangled.org/core/spindle/engines/nixery"
3131- "tangled.org/core/spindle/git"
3226 "tangled.org/core/spindle/models"
3327 "tangled.org/core/spindle/queue"
3428 "tangled.org/core/spindle/secrets"
3529 "tangled.org/core/spindle/xrpc"
3636- "tangled.org/core/tap"
3737- "tangled.org/core/tid"
3838- "tangled.org/core/workflow"
3930 "tangled.org/core/xrpc/serviceauth"
4031)
41324233//go:embed motd
4334var defaultMotd []byte
44353636+const (
3737+ rbacDomain = "thisserver"
3838+)
3939+4540type Spindle struct {
4646- tap *tap.Client
4141+ jc *jetstream.JetstreamClient
4742 db *db.DB
4848- e *rbac2.Enforcer
4343+ e *rbac.Enforcer
4944 l *slog.Logger
5045 n *notifier.Notifier
5146 engs map[string]models.Engine
5252- adapters map[string]models.Adapter
5347 jq *queue.Queue
5448 cfg *config.Config
5549 ks *eventconsumer.Consumer
···6357func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
6458 logger := log.FromContext(ctx)
65596666- if err := ensureGitVersion(); err != nil {
6767- return nil, fmt.Errorf("ensuring git version: %w", err)
6868- }
6969-7070- d, err := db.Make(ctx, cfg.Server.DBPath())
6060+ d, err := db.Make(cfg.Server.DBPath)
7161 if err != nil {
7262 return nil, fmt.Errorf("failed to setup db: %w", err)
7363 }
74647575- e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
6565+ e, err := rbac.NewEnforcer(cfg.Server.DBPath)
7666 if err != nil {
7767 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
7868 }
6969+ e.E.EnableAutoSave(true)
79708071 n := notifier.New()
8172···9586 }
9687 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
9788 case "sqlite", "":
9898- vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
8989+ vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
9990 if err != nil {
10091 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
10192 }
102102- logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
9393+ logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
10394 default:
10495 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
10596 }
···10798 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
10899 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
109100110110- tap := tap.NewClient(cfg.Server.TapUrl, "")
101101+ collections := []string{
102102+ tangled.SpindleMemberNSID,
103103+ tangled.RepoNSID,
104104+ tangled.RepoCollaboratorNSID,
105105+ }
106106+ jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
107107+ if err != nil {
108108+ return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
109109+ }
110110+ jc.AddDid(cfg.Server.Owner)
111111+112112+ // Check if the spindle knows about any Dids;
113113+ dids, err := d.GetAllDids()
114114+ if err != nil {
115115+ return nil, fmt.Errorf("failed to get all dids: %w", err)
116116+ }
117117+ for _, d := range dids {
118118+ jc.AddDid(d)
119119+ }
111120112121 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
113122114123 spindle := &Spindle{
115115- tap: &tap,
124124+ jc: jc,
116125 e: e,
117126 db: d,
118127 l: logger,
···125134 motd: defaultMotd,
126135 }
127136128128- err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
137137+ err = e.AddSpindle(rbacDomain)
138138+ if err != nil {
139139+ return nil, fmt.Errorf("failed to set rbac domain: %w", err)
140140+ }
141141+ err = spindle.configureOwner()
129142 if err != nil {
130143 return nil, err
131144 }
132145 logger.Info("owner set", "did", cfg.Server.Owner)
133146134134- cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
147147+ cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
135148 if err != nil {
136149 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
137150 }
138151139139- // spindle listen to knot stream for sh.tangled.git.refUpdate
140140- // which will sync the local workflow files in spindle and enqueues the
141141- // pipeline job for on-push workflows
152152+ err = jc.StartJetstream(ctx, spindle.ingest())
153153+ if err != nil {
154154+ return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
155155+ }
156156+157157+ // for each incoming sh.tangled.pipeline, we execute
158158+ // spindle.processPipeline, which in turn enqueues the pipeline
159159+ // job in the above registered queue.
142160 ccfg := eventconsumer.NewConsumerConfig()
143161 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
144162 ccfg.Dev = cfg.Server.Dev
145145- ccfg.ProcessFunc = spindle.processKnotStream
163163+ ccfg.ProcessFunc = spindle.processPipeline
146164 ccfg.CursorStore = cursorStore
147165 knownKnots, err := d.Knots()
148166 if err != nil {
···183201}
184202185203// Enforcer returns the RBAC enforcer instance.
186186-func (s *Spindle) Enforcer() *rbac2.Enforcer {
204204+func (s *Spindle) Enforcer() *rbac.Enforcer {
187205 return s.e
188206}
189207···217235 s.ks.Start(ctx)
218236 }()
219237220220- // ensure server owner is tracked
221221- if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
222222- return err
223223- }
224224-225225- go func() {
226226- s.l.Info("starting tap stream consumer")
227227- s.tap.Connect(ctx, &tap.SimpleIndexer{
228228- EventHandler: s.processEvent,
229229- })
230230- }()
231231-232238 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
233239 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
234240}
···287293 return x.Router()
288294}
289295290290-func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
291291- l := log.FromContext(ctx).With("handler", "processKnotStream")
292292- l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
293293- if msg.Nsid == tangled.GitRefUpdateNSID {
294294- event := tangled.GitRefUpdate{}
295295- if err := json.Unmarshal(msg.EventJson, &event); err != nil {
296296- l.Error("error unmarshalling", "err", err)
296296+func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
297297+ if msg.Nsid == tangled.PipelineNSID {
298298+ tpl := tangled.Pipeline{}
299299+ err := json.Unmarshal(msg.EventJson, &tpl)
300300+ if err != nil {
301301+		s.l.Error("error unmarshalling", "err", err)
297302 return err
298303 }
299299- l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
300304301301- // resolve repo name to rkey
302302- // TODO: git.refUpdate should respond with rkey instead of repo name
303303- repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
304304- if err != nil {
305305- return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
305305+ if tpl.TriggerMetadata == nil {
306306+ return fmt.Errorf("no trigger metadata found")
306307 }
307308308308- // NOTE: we are blindly trusting the knot that it will return only repos it own
309309- repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
310310- repoPath := s.newRepoPath(repo.Did, repo.Rkey)
311311- if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
312312- return fmt.Errorf("sync git repo: %w", err)
309309+ if tpl.TriggerMetadata.Repo == nil {
310310+ return fmt.Errorf("no repo data found")
313311 }
314314- l.Info("synced git repo")
315312316316- compiler := workflow.Compiler{
317317- Trigger: tangled.Pipeline_TriggerMetadata{
318318- Kind: string(workflow.TriggerKindPush),
319319- Push: &tangled.Pipeline_PushTriggerData{
320320- Ref: event.Ref,
321321- OldSha: event.OldSha,
322322- NewSha: event.NewSha,
323323- },
324324- Repo: &tangled.Pipeline_TriggerRepo{
325325- Did: repo.Did.String(),
326326- Knot: repo.Knot,
327327- Repo: repo.Name,
328328- },
329329- },
313313+ if src.Key() != tpl.TriggerMetadata.Repo.Knot {
314314+ return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
330315 }
331316332332- // load workflow definitions from rev (without spindle context)
333333- rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha)
317317+ // filter by repos
318318+ _, err = s.db.GetRepo(
319319+ tpl.TriggerMetadata.Repo.Knot,
320320+ tpl.TriggerMetadata.Repo.Did,
321321+ tpl.TriggerMetadata.Repo.Repo,
322322+ )
334323 if err != nil {
335335- return fmt.Errorf("loading pipeline: %w", err)
336336- }
337337- if len(rawPipeline) == 0 {
338338- l.Info("no workflow definition find for the repo. skipping the event")
339339- return nil
340340- }
341341- tpl := compiler.Compile(compiler.Parse(rawPipeline))
342342- // TODO: pass compile error to workflow log
343343- for _, w := range compiler.Diagnostics.Errors {
344344- l.Error(w.String())
345345- }
346346- for _, w := range compiler.Diagnostics.Warnings {
347347- l.Warn(w.String())
324324+ return fmt.Errorf("failed to get repo: %w", err)
348325 }
349326350327 pipelineId := models.PipelineId{
351351- Knot: tpl.TriggerMetadata.Repo.Knot,
352352- Rkey: tid.TID(),
353353- }
354354- if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
355355- l.Error("failed to create pipeline event", "err", err)
356356- return nil
357357- }
358358- err = s.processPipeline(ctx, tpl, pipelineId)
359359- if err != nil {
360360- return err
328328+ Knot: src.Key(),
329329+ Rkey: msg.Rkey,
361330 }
362362- }
363331364364- return nil
365365-}
332332+ workflows := make(map[models.Engine][]models.Workflow)
333333+334334+ // Build pipeline environment variables once for all workflows
335335+ pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
366336367367-func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) {
368368- if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil {
369369- return nil, fmt.Errorf("syncing git repo: %w", err)
370370- }
371371- gr, err := kgit.Open(repoPath, rev)
372372- if err != nil {
373373- return nil, fmt.Errorf("opening git repo: %w", err)
374374- }
337337+ for _, w := range tpl.Workflows {
338338+ if w != nil {
339339+ if _, ok := s.engs[w.Engine]; !ok {
340340+ err = s.db.StatusFailed(models.WorkflowId{
341341+ PipelineId: pipelineId,
342342+ Name: w.Name,
343343+ }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
344344+ if err != nil {
345345+ return fmt.Errorf("db.StatusFailed: %w", err)
346346+ }
375347376376- workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
377377- if errors.Is(err, object.ErrDirectoryNotFound) {
378378- // return empty RawPipeline when directory doesn't exist
379379- return nil, nil
380380- } else if err != nil {
381381- return nil, fmt.Errorf("loading file tree: %w", err)
382382- }
348348+ continue
349349+ }
383350384384- var rawPipeline workflow.RawPipeline
385385- for _, e := range workflowDir {
386386- if !e.IsFile() {
387387- continue
388388- }
351351+ eng := s.engs[w.Engine]
389352390390- fpath := filepath.Join(workflow.WorkflowDir, e.Name)
391391- contents, err := gr.RawContent(fpath)
392392- if err != nil {
393393- return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
394394- }
353353+ if _, ok := workflows[eng]; !ok {
354354+ workflows[eng] = []models.Workflow{}
355355+ }
395356396396- rawPipeline = append(rawPipeline, workflow.RawWorkflow{
397397- Name: e.Name,
398398- Contents: contents,
399399- })
400400- }
357357+ ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
358358+ if err != nil {
359359+ return fmt.Errorf("init workflow: %w", err)
360360+ }
401361402402- return rawPipeline, nil
403403-}
362362+ // inject TANGLED_* env vars after InitWorkflow
363363+ // This prevents user-defined env vars from overriding them
364364+ if ewf.Environment == nil {
365365+ ewf.Environment = make(map[string]string)
366366+ }
367367+ maps.Copy(ewf.Environment, pipelineEnv)
404368405405-func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error {
406406- // Build pipeline environment variables once for all workflows
407407- pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
369369+ workflows[eng] = append(workflows[eng], *ewf)
408370409409- // filter & init workflows
410410- workflows := make(map[models.Engine][]models.Workflow)
411411- for _, w := range tpl.Workflows {
412412- if w == nil {
413413- continue
414414- }
415415- if _, ok := s.engs[w.Engine]; !ok {
416416- err := s.db.StatusFailed(models.WorkflowId{
417417- PipelineId: pipelineId,
418418- Name: w.Name,
419419- }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
420420- if err != nil {
421421- return fmt.Errorf("db.StatusFailed: %w", err)
371371+ err = s.db.StatusPending(models.WorkflowId{
372372+ PipelineId: pipelineId,
373373+ Name: w.Name,
374374+ }, s.n)
375375+ if err != nil {
376376+ return fmt.Errorf("db.StatusPending: %w", err)
377377+ }
422378 }
423423-424424- continue
425379 }
426380427427- eng := s.engs[w.Engine]
428428-429429- if _, ok := workflows[eng]; !ok {
430430- workflows[eng] = []models.Workflow{}
431431- }
432432-433433- ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
434434- if err != nil {
435435- return fmt.Errorf("init workflow: %w", err)
436436- }
437437-438438- // inject TANGLED_* env vars after InitWorkflow
439439- // This prevents user-defined env vars from overriding them
440440- if ewf.Environment == nil {
441441- ewf.Environment = make(map[string]string)
381381+ ok := s.jq.Enqueue(queue.Job{
382382+ Run: func() error {
383383+ engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
384384+ RepoOwner: tpl.TriggerMetadata.Repo.Did,
385385+ RepoName: tpl.TriggerMetadata.Repo.Repo,
386386+ Workflows: workflows,
387387+ }, pipelineId)
388388+ return nil
389389+ },
390390+ OnFail: func(jobError error) {
391391+ s.l.Error("pipeline run failed", "error", jobError)
392392+ },
393393+ })
394394+ if ok {
395395+ s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
396396+ } else {
397397+ s.l.Error("failed to enqueue pipeline: queue is full")
442398 }
443443- maps.Copy(ewf.Environment, pipelineEnv)
444444-445445- workflows[eng] = append(workflows[eng], *ewf)
446399 }
447400448448- // enqueue pipeline
449449- ok := s.jq.Enqueue(queue.Job{
450450- Run: func() error {
451451- engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
452452- RepoOwner: tpl.TriggerMetadata.Repo.Did,
453453- RepoName: tpl.TriggerMetadata.Repo.Repo,
454454- Workflows: workflows,
455455- }, pipelineId)
456456- return nil
457457- },
458458- OnFail: func(jobError error) {
459459- s.l.Error("pipeline run failed", "error", jobError)
460460- },
461461- })
462462- if !ok {
463463- return fmt.Errorf("failed to enqueue pipeline: queue is full")
464464- }
465465- s.l.Info("pipeline enqueued successfully", "id", pipelineId)
466466-467467- // emit StatusPending for all workflows here (after successful enqueue)
468468- for _, ewfs := range workflows {
469469- for _, ewf := range ewfs {
470470- err := s.db.StatusPending(models.WorkflowId{
471471- PipelineId: pipelineId,
472472- Name: ewf.Name,
473473- }, s.n)
474474- if err != nil {
475475- return fmt.Errorf("db.StatusPending: %w", err)
476476- }
477477- }
478478- }
479401 return nil
480402}
481403482482-// newRepoPath creates a path to store repository by its did and rkey.
483483-// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
484484-func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
485485- return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
486486-}
404404+func (s *Spindle) configureOwner() error {
405405+ cfgOwner := s.cfg.Server.Owner
487406488488-func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
489489- scheme := "https://"
490490- if s.cfg.Server.Dev {
491491- scheme = "http://"
407407+ existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
408408+ if err != nil {
409409+ return err
492410 }
493493- return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
494494-}
495411496496-const RequiredVersion = "2.49.0"
412412+ switch len(existing) {
413413+ case 0:
414414+ // no owner configured, continue
415415+ case 1:
416416+ // find existing owner
417417+ existingOwner := existing[0]
497418498498-func ensureGitVersion() error {
499499- v, err := git.Version()
500500- if err != nil {
501501- return fmt.Errorf("fetching git version: %w", err)
419419+ // no ownership change, this is okay
420420+ if existingOwner == s.cfg.Server.Owner {
421421+ break
422422+ }
423423+424424+ // remove existing owner
425425+ err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
426426+ if err != nil {
427427+			return err
428428+ }
429429+ default:
430430+ return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
502431 }
503503- if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
504504- return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
505505- }
506506- return nil
432432+433433+ return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
507434}
-391
spindle/tap.go
···11-package spindle
22-33-import (
44- "context"
55- "encoding/json"
66- "fmt"
77- "time"
88-99- "github.com/bluesky-social/indigo/atproto/syntax"
1010- "tangled.org/core/api/tangled"
1111- "tangled.org/core/eventconsumer"
1212- "tangled.org/core/spindle/db"
1313- "tangled.org/core/spindle/git"
1414- "tangled.org/core/spindle/models"
1515- "tangled.org/core/tap"
1616- "tangled.org/core/tid"
1717- "tangled.org/core/workflow"
1818-)
1919-2020-func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error {
2121- l := s.l.With("component", "tapIndexer")
2222-2323- var err error
2424- switch evt.Type {
2525- case tap.EvtRecord:
2626- switch evt.Record.Collection.String() {
2727- case tangled.SpindleMemberNSID:
2828- err = s.processMember(ctx, evt)
2929- case tangled.RepoNSID:
3030- err = s.processRepo(ctx, evt)
3131- case tangled.RepoCollaboratorNSID:
3232- err = s.processCollaborator(ctx, evt)
3333- case tangled.RepoPullNSID:
3434- err = s.processPull(ctx, evt)
3535- }
3636- case tap.EvtIdentity:
3737- // no-op
3838- }
3939-4040- if err != nil {
4141- l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
4242- return err
4343- }
4444- return nil
4545-}
4646-4747-// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated)
4848-4949-func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error {
5050- l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
5151-5252- l.Info("processing spindle.member record")
5353-5454- // only listen to members
5555- if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
5656- l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err)
5757- return nil
5858- }
5959-
6060- switch evt.Record.Action {
6161- case tap.RecordCreateAction, tap.RecordUpdateAction:
6262- record := tangled.SpindleMember{}
6363- if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
6464- return fmt.Errorf("parsing record: %w", err)
6565- }
6666-
6767- domain := s.cfg.Server.Hostname
6868- if record.Instance != domain {
6969- l.Info("domain mismatch", "domain", record.Instance, "expected", domain)
7070- return nil
7171- }
7272-
7373- created, err := time.Parse(record.CreatedAt, time.RFC3339)
7474- if err != nil {
7575- created = time.Now()
7676- }
7777- if err := db.AddSpindleMember(s.db, db.SpindleMember{
7878- Did: evt.Record.Did,
7979- Rkey: evt.Record.Rkey.String(),
8080- Instance: record.Instance,
8181- Subject: syntax.DID(record.Subject),
8282- Created: created,
8383- }); err != nil {
8484- l.Error("failed to add member", "error", err)
8585- return fmt.Errorf("adding member to db: %w", err)
8686- }
8787- if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
8888- return fmt.Errorf("adding member to rbac: %w", err)
8989- }
9090- if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
9191- return fmt.Errorf("adding did to tap: %w", err)
9292- }
9393-
9494- l.Info("added member", "member", record.Subject)
9595- return nil
9696-
9797- case tap.RecordDeleteAction:
9898- var (
9999- did = evt.Record.Did.String()
100100- rkey = evt.Record.Rkey.String()
101101- )
102102- member, err := db.GetSpindleMember(s.db, did, rkey)
103103- if err != nil {
104104- return fmt.Errorf("finding member: %w", err)
105105- }
106106-
107107- if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
108108- return fmt.Errorf("removing member from db: %w", err)
109109- }
110110- if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil {
111111- return fmt.Errorf("removing member from rbac: %w", err)
112112- }
113113- if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil {
114114- return fmt.Errorf("removing did from tap: %w", err)
115115- }
116116-
117117- l.Info("removed member", "member", member.Subject)
118118- return nil
119119- }
120120- return nil
121121-}
122122-
123123-func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error {
124124- l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
125125-
126126- l.Info("processing repo.collaborator record")
127127-
128128- // only listen to members
129129- if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
130130- l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
131131- return nil
132132- }
133133-
134134- switch evt.Record.Action {
135135- case tap.RecordCreateAction, tap.RecordUpdateAction:
136136- record := tangled.RepoCollaborator{}
137137- if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
138138- l.Error("invalid record", "err", err)
139139- return fmt.Errorf("parsing record: %w", err)
140140- }
141141-
142142- // retry later if target repo is not ingested yet
143143- if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil {
144144- l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err)
145145- return fmt.Errorf("target repo is unknown")
146146- }
147147-
148148- // check perms for this user
149149- if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil {
150150- l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
151151- return nil
152152- }
153153-
154154- if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{
155155- Did: evt.Record.Did,
156156- Rkey: evt.Record.Rkey,
157157- Repo: syntax.ATURI(record.Repo),
158158- Subject: syntax.DID(record.Subject),
159159- }); err != nil {
160160- return fmt.Errorf("adding collaborator to db: %w", err)
161161- }
162162- if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
163163- return fmt.Errorf("adding collaborator to rbac: %w", err)
164164- }
165165- if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
166166- return fmt.Errorf("adding did to tap: %w", err)
167167- }
168168-
169169- l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo)
170170- return nil
171171-
172172- case tap.RecordDeleteAction:
173173- // get existing collaborator
174174- collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey)
175175- if err != nil {
176176- return fmt.Errorf("failed to get existing collaborator info: %w", err)
177177- }
178178-
179179- // check perms for this user
180180- if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil {
181181- l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err)
182182- return nil
183183- }
184184-
185185- if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil {
186186- return fmt.Errorf("removing collaborator from db: %w", err)
187187- }
188188- if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil {
189189- return fmt.Errorf("removing collaborator from rbac: %w", err)
190190- }
191191- if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil {
192192- return fmt.Errorf("removing did from tap: %w", err)
193193- }
194194-
195195- l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo)
196196- return nil
197197- }
198198- return nil
199199-}
200200-
201201-func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error {
202202- l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
203203-
204204- l.Info("processing repo record")
205205-
206206- // only listen to members
207207- if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
208208- l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err)
209209- return nil
210210- }
211211-
212212- switch evt.Record.Action {
213213- case tap.RecordCreateAction, tap.RecordUpdateAction:
214214- record := tangled.Repo{}
215215- if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
216216- return fmt.Errorf("parsing record: %w", err)
217217- }
218218-
219219- domain := s.cfg.Server.Hostname
220220- if record.Spindle == nil || *record.Spindle != domain {
221221- if record.Spindle == nil {
222222- l.Info("spindle isn't configured", "name", record.Name)
223223- } else {
224224- l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
225225- }
226226- if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
227227- return fmt.Errorf("deleting repo from db: %w", err)
228228- }
229229- return nil
230230- }
231231-
232232- repo := &db.Repo{
233233- Did: evt.Record.Did,
234234- Rkey: evt.Record.Rkey,
235235- Name: record.Name,
236236- Knot: record.Knot,
237237- }
238238-
239239- if err := s.db.PutRepo(repo); err != nil {
240240- return fmt.Errorf("adding repo to db: %w", err)
241241- }
242242-
243243- if err := s.e.AddRepo(evt.Record.AtUri()); err != nil {
244244- return fmt.Errorf("adding repo to rbac")
245245- }
246246-
247247- // add this knot to the event consumer
248248- src := eventconsumer.NewKnotSource(record.Knot)
249249- s.ks.AddSource(context.Background(), src)
250250-
251251- // setup sparse sync
252252- repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name)
253253- repoPath := s.newRepoPath(repo.Did, repo.Rkey)
254254- if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil {
255255- return fmt.Errorf("setting up sparse-clone git repo: %w", err)
256256- }
257257-
258258- l.Info("added repo", "repo", evt.Record.AtUri())
259259- return nil
260260-
261261- case tap.RecordDeleteAction:
262262- // check perms for this user
263263- if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil {
264264- l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err)
265265- return nil
266266- }
267267-
268268- if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
269269- return fmt.Errorf("deleting repo from db: %w", err)
270270- }
271271-
272272- if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil {
273273- return fmt.Errorf("deleting repo from rbac: %w", err)
274274- }
275275-
276276- l.Info("deleted repo", "repo", evt.Record.AtUri())
277277- return nil
278278- }
279279- return nil
280280-}
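Before the sparse sync, processRepo above derives two locations from the record: an on-disk path in the /data/repos/<did>/sh.tangled.repo/<rkey> layout (newRepoPath) and an HTTP(S) clone URL on the owning knot (newRepoCloneUrl). A standalone sketch of both, with a made-up base directory, knot host and rkey:

    package main

    import (
        "fmt"
        "path/filepath"
    )

    // repoPath mirrors the /data/repos/<did>/sh.tangled.repo/<rkey> layout from
    // newRepoPath; the base directory here is a made-up stand-in for RepoDir().
    func repoPath(base, did, rkey string) string {
        return filepath.Join(base, did, "sh.tangled.repo", rkey)
    }

    // repoCloneURL mirrors newRepoCloneUrl: plain http in dev mode, https otherwise.
    func repoCloneURL(dev bool, knot, did, name string) string {
        scheme := "https://"
        if dev {
            scheme = "http://"
        }
        return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
    }

    func main() {
        // the knot host and rkey below are hypothetical values for illustration
        fmt.Println(repoPath("/data/repos", "did:plc:foo", "3l5qmdnkqs22f"))
        fmt.Println(repoCloneURL(false, "knot.example.org", "did:plc:foo", "myrepo"))
    }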
281281-
282282-func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error {
283283- l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
284284-
285285- l.Info("processing pull record")
286286-
287287- // only listen to live events
288288- if !evt.Record.Live {
289289- l.Info("skipping backfill event", "event", evt.Record.AtUri())
290290- return nil
291291- }
292292-
293293- switch evt.Record.Action {
294294- case tap.RecordCreateAction, tap.RecordUpdateAction:
295295- record := tangled.RepoPull{}
296296- if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
297297- l.Error("invalid record", "err", err)
298298- return fmt.Errorf("parsing record: %w", err)
299299- }
300300-
301301- // ignore legacy records
302302- if record.Target == nil {
303303- l.Info("ignoring pull record: target repo is nil")
304304- return nil
305305- }
306306-
307307- // ignore patch-based and fork-based PRs
308308- if record.Source == nil || record.Source.Repo != nil {
309309- l.Info("ignoring pull record: not a branch-based pull request")
310310- return nil
311311- }
312312-
313313- // skip if target repo is unknown
314314- repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo))
315315- if err != nil {
316316- l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err)
317317- return fmt.Errorf("target repo is unknown")
318318- }
319319-
320320- compiler := workflow.Compiler{
321321- Trigger: tangled.Pipeline_TriggerMetadata{
322322- Kind: string(workflow.TriggerKindPullRequest),
323323- PullRequest: &tangled.Pipeline_PullRequestTriggerData{
324324- Action: "create",
325325- SourceBranch: record.Source.Branch,
326326- SourceSha: record.Source.Sha,
327327- TargetBranch: record.Target.Branch,
328328- },
329329- Repo: &tangled.Pipeline_TriggerRepo{
330330- Did: repo.Did.String(),
331331- Knot: repo.Knot,
332332- Repo: repo.Name,
333333- },
334334- },
335335- }
336336-
337337- repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name)
338338- repoPath := s.newRepoPath(repo.Did, repo.Rkey)
339339-
340340- // load workflow definitions from rev (without spindle context)
341341- rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha)
342342- if err != nil {
343343- // don't retry
344344- l.Error("failed loading pipeline", "err", err)
345345- return nil
346346- }
347347- if len(rawPipeline) == 0 {
348348- l.Info("no workflow definition find for the repo. skipping the event")
349349- return nil
350350- }
351351- tpl := compiler.Compile(compiler.Parse(rawPipeline))
352352- // TODO: pass compile error to workflow log
353353- for _, w := range compiler.Diagnostics.Errors {
354354- l.Error(w.String())
355355- }
356356- for _, w := range compiler.Diagnostics.Warnings {
357357- l.Warn(w.String())
358358- }
359359-
360360- pipelineId := models.PipelineId{
361361- Knot: tpl.TriggerMetadata.Repo.Knot,
362362- Rkey: tid.TID(),
363363- }
364364- if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
365365- l.Error("failed to create pipeline event", "err", err)
366366- return nil
367367- }
368368- err = s.processPipeline(ctx, tpl, pipelineId)
369369- if err != nil {
370370- // don't retry
371371- l.Error("failed processing pipeline", "err", err)
372372- return nil
373373- }
374374- case tap.RecordDeleteAction:
375375- // no-op
376376- }
377377- return nil
378378-}
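processPull above only compiles a pipeline for live, branch-based pulls whose target repo is already indexed; legacy and patch- or fork-based pulls are dropped, and an unknown target is returned as an error so the event is retried. A compressed sketch of those guards using toy types (pull, shouldRunCI and the at-uris here are illustrative, not the lexicon types):

    package main

    import "fmt"

    // Toy shapes carrying only the fields the guards above inspect.
    type pullTarget struct {
        Repo   string
        Branch string
    }

    type pullSource struct {
        Branch string
        Sha    string
        Repo   *string // non-nil for fork-based pulls
    }

    type pull struct {
        Target *pullTarget // nil on legacy records
        Source *pullSource // nil on patch-based pulls
    }

    // shouldRunCI mirrors the checks above: skip legacy records, skip patch- and
    // fork-based pulls, and report unknown targets so the event can be retried.
    func shouldRunCI(p pull, targetKnown bool) (run bool, reason string) {
        if p.Target == nil {
            return false, "legacy record: no target"
        }
        if p.Source == nil || p.Source.Repo != nil {
            return false, "not a branch-based pull"
        }
        if !targetKnown {
            return false, "target repo not ingested yet: retry"
        }
        return true, "branch-based pull against a known repo"
    }

    func main() {
        // made-up at-uris for illustration
        fork := "at://did:plc:bar/sh.tangled.repo/fork"
        cases := []pull{
            {},
            {
                Target: &pullTarget{Repo: "at://did:plc:foo/sh.tangled.repo/x", Branch: "main"},
                Source: &pullSource{Branch: "feature", Sha: "deadbeef", Repo: &fork},
            },
            {
                Target: &pullTarget{Repo: "at://did:plc:foo/sh.tangled.repo/x", Branch: "main"},
                Source: &pullSource{Branch: "feature", Sha: "deadbeef"},
            },
        }
        for _, c := range cases {
            fmt.Println(shouldRunCI(c, true))
        }
    }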
379379-
380380-func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error {
381381- known, err := s.db.IsKnownDid(syntax.DID(did))
382382- if err != nil {
383383- return fmt.Errorf("ensuring did known state: %w", err)
384384- }
385385- if !known {
386386- if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil {
387387- return fmt.Errorf("removing did from tap: %w", err)
388388- }
389389- }
390390- return nil
391391-}