forked from tangled.org/core
Monorepo for Tangled

Compare changes

Choose any two refs to compare.

+1
flake.nix
··· 306 306 imports = [./nix/modules/spindle.nix]; 307 307 308 308 services.tangled.spindle.package = lib.mkDefault self.packages.${pkgs.stdenv.hostPlatform.system}.spindle; 309 + services.tangled.spindle.tap-package = lib.mkDefault self.packages.${pkgs.system}.tap; 309 310 }; 310 311 nixosModules.did-method-plc = { 311 312 lib,
+1
go.mod
··· 131 131 github.com/hashicorp/go-secure-stdlib/parseutil v0.2.0 // indirect 132 132 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect 133 133 github.com/hashicorp/go-sockaddr v1.0.7 // indirect 134 + github.com/hashicorp/go-version v1.8.0 // indirect 134 135 github.com/hashicorp/golang-lru v1.0.2 // indirect 135 136 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 136 137 github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
+2
go.sum
··· 264 264 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= 265 265 github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= 266 266 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= 267 + github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= 268 + github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= 267 269 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 268 270 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 269 271 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+3
nix/gomod2nix.toml
··· 304 304 [mod."github.com/hashicorp/go-sockaddr"] 305 305 version = "v1.0.7" 306 306 hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs=" 307 + [mod."github.com/hashicorp/go-version"] 308 + version = "v1.8.0" 309 + hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8=" 307 310 [mod."github.com/hashicorp/golang-lru"] 308 311 version = "v1.0.2" 309 312 hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
+35
nix/modules/spindle.nix
··· 1 1 { 2 2 config, 3 + pkgs, 3 4 lib, 4 5 ... 5 6 }: let ··· 16 17 package = mkOption { 17 18 type = types.package; 18 19 description = "Package to use for the spindle"; 20 + }; 21 + tap-package = mkOption { 22 + type = types.package; 23 + description = "Package to use for the spindle"; 24 + }; 25 + 26 + atpRelayUrl = mkOption { 27 + type = types.str; 28 + default = "https://relay1.us-east.bsky.network"; 29 + description = "atproto relay"; 19 30 }; 20 31 21 32 server = { ··· 114 125 config = mkIf cfg.enable { 115 126 virtualisation.docker.enable = true; 116 127 128 + systemd.services.spindle-tap = { 129 + description = "spindle tap service"; 130 + after = ["network.target" "docker.service"]; 131 + wantedBy = ["multi-user.target"]; 132 + serviceConfig = { 133 + LogsDirectory = "spindle-tap"; 134 + StateDirectory = "spindle-tap"; 135 + Environment = [ 136 + "TAP_BIND=:2480" 137 + "TAP_PLC_URL=${cfg.server.plcUrl}" 138 + "TAP_RELAY_URL=${cfg.atpRelayUrl}" 139 + "TAP_COLLECTION_FILTERS=${concatStringsSep "," [ 140 + "sh.tangled.repo" 141 + "sh.tangled.repo.collaborator" 142 + "sh.tangled.spindle.member" 143 + ]}" 144 + ]; 145 + ExecStart = "${getExe cfg.tap-package} run"; 146 + }; 147 + }; 148 + 117 149 systemd.services.spindle = { 118 150 description = "spindle service"; 119 151 after = ["network.target" "docker.service"]; 120 152 wantedBy = ["multi-user.target"]; 153 + path = [ 154 + pkgs.git 155 + ]; 121 156 serviceConfig = { 122 157 LogsDirectory = "spindle"; 123 158 StateDirectory = "spindle";
+2
nix/vm.nix
··· 19 19 20 20 plcUrl = envVarOr "TANGLED_VM_PLC_URL" "https://plc.directory"; 21 21 jetstream = envVarOr "TANGLED_VM_JETSTREAM_ENDPOINT" "wss://jetstream1.us-west.bsky.network/subscribe"; 22 + relayUrl = envVarOr "TANGLED_VM_RELAY_URL" "https://relay1.us-east.bsky.network"; 22 23 in 23 24 nixpkgs.lib.nixosSystem { 24 25 inherit system; ··· 95 96 }; 96 97 services.tangled.spindle = { 97 98 enable = true; 99 + atpRelayUrl = relayUrl; 98 100 server = { 99 101 owner = envVar "TANGLED_VM_SPINDLE_OWNER"; 100 102 hostname = envVarOr "TANGLED_VM_SPINDLE_HOST" "localhost:6555";
+10
orm/orm.go
··· 20 20 } 21 21 defer tx.Rollback() 22 22 23 + _, err = tx.Exec(` 24 + create table if not exists migrations ( 25 + id integer primary key autoincrement, 26 + name text unique 27 + ); 28 + `) 29 + if err != nil { 30 + return fmt.Errorf("creating migrations table: %w", err) 31 + } 32 + 23 33 var exists bool 24 34 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 25 35 if err != nil {
+7
spindle/config/config.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "path" 6 7 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 8 9 "github.com/sethvargo/go-envconfig" ··· 13 14 DBPath string `env:"DB_PATH, default=spindle.db"` 14 15 Hostname string `env:"HOSTNAME, required"` 15 16 JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 17 + TapUrl string `env:"TAP_URL, required"` 16 18 PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 19 Dev bool `env:"DEV, default=false"` 18 20 Owner syntax.DID `env:"OWNER, required"` 19 21 Secrets Secrets `env:",prefix=SECRETS_"` 20 22 LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 23 + DataDir string `env:"DATA_DIR, default=/var/lib/spindle"` 21 24 QueueSize int `env:"QUEUE_SIZE, default=100"` 22 25 MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 23 26 } 24 27 25 28 func (s Server) Did() syntax.DID { 26 29 return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname)) 30 + } 31 + 32 + func (s Server) RepoDir() string { 33 + return path.Join(s.DataDir, "repos") 27 34 } 28 35 29 36 type Secrets struct {
+59 -18
spindle/db/db.go
··· 1 1 package db 2 2 3 3 import ( 4 + "context" 4 5 "database/sql" 5 6 "strings" 6 7 8 + "github.com/bluesky-social/indigo/atproto/syntax" 7 9 _ "github.com/mattn/go-sqlite3" 10 + "tangled.org/core/log" 11 + "tangled.org/core/orm" 8 12 ) 9 13 10 14 type DB struct { 11 15 *sql.DB 12 16 } 13 17 14 - func Make(dbPath string) (*DB, error) { 18 + func Make(ctx context.Context, dbPath string) (*DB, error) { 15 19 // https://github.com/mattn/go-sqlite3#connection-string 16 20 opts := []string{ 17 21 "_foreign_keys=1", ··· 19 23 "_synchronous=NORMAL", 20 24 "_auto_vacuum=incremental", 21 25 } 26 + 27 + logger := log.FromContext(ctx) 28 + logger = log.SubLogger(logger, "db") 22 29 23 30 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 24 31 if err != nil { 25 32 return nil, err 26 33 } 27 34 28 - // NOTE: If any other migration is added here, you MUST 29 - // copy the pattern in appview: use a single sql.Conn 30 - // for every migration. 35 + conn, err := db.Conn(ctx) 36 + if err != nil { 37 + return nil, err 38 + } 39 + defer conn.Close() 31 40 32 41 _, err = db.Exec(` 33 42 create table if not exists _jetstream ( ··· 76 85 return nil, err 77 86 } 78 87 79 - return &DB{db}, nil 80 - } 88 + // run migrations 89 + 90 + // NOTE: this won't migrate existing records 91 + // they will be fetched again with tap instead 92 + orm.RunMigration(conn, logger, "add-rkey-to-repos", func(tx *sql.Tx) error { 93 + // archive legacy repos (just in case) 94 + _, err = tx.Exec(`alter table repos rename to repos_old`) 95 + if err != nil { 96 + return err 97 + } 98 + 99 + _, err := tx.Exec(` 100 + create table repos_new ( 101 + -- identifiers 102 + id integer primary key autoincrement, 103 + did text not null, 104 + rkey text not null, 105 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 106 + 107 + name text not null, 108 + knot text not null, 109 + 110 + addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 111 + unique(did, rkey) 112 + ); 113 + `) 114 + if err != nil { 115 + return err 116 + } 117 + 118 + return nil 119 + }) 81 120 82 - func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 83 - _, err := d.Exec(` 84 - insert into _jetstream (id, last_time_us) 85 - values (1, ?) 86 - on conflict(id) do update set last_time_us = excluded.last_time_us 87 - `, lastTimeUs) 88 - return err 121 + return &DB{db}, nil 89 122 } 90 123 91 - func (d *DB) GetLastTimeUs() (int64, error) { 92 - var lastTimeUs int64 93 - row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 94 - err := row.Scan(&lastTimeUs) 95 - return lastTimeUs, err 124 + func (d *DB) IsKnownDid(did syntax.DID) (bool, error) { 125 + // is spindle member / repo collaborator 126 + var exists bool 127 + err := d.QueryRow( 128 + `select exists ( 129 + select 1 from repo_collaborators where did = ? 130 + union all 131 + select 1 from spindle_members where did = ? 132 + )`, 133 + did, 134 + did, 135 + ).Scan(&exists) 136 + return exists, err 96 137 }
-44
spindle/db/known_dids.go
··· 1 - package db 2 - 3 - func (d *DB) AddDid(did string) error { 4 - _, err := d.Exec(`insert or ignore into known_dids (did) values (?)`, did) 5 - return err 6 - } 7 - 8 - func (d *DB) RemoveDid(did string) error { 9 - _, err := d.Exec(`delete from known_dids where did = ?`, did) 10 - return err 11 - } 12 - 13 - func (d *DB) GetAllDids() ([]string, error) { 14 - var dids []string 15 - 16 - rows, err := d.Query(`select did from known_dids`) 17 - if err != nil { 18 - return nil, err 19 - } 20 - defer rows.Close() 21 - 22 - for rows.Next() { 23 - var did string 24 - if err := rows.Scan(&did); err != nil { 25 - return nil, err 26 - } 27 - dids = append(dids, did) 28 - } 29 - 30 - if err := rows.Err(); err != nil { 31 - return nil, err 32 - } 33 - 34 - return dids, nil 35 - } 36 - 37 - func (d *DB) HasKnownDids() bool { 38 - var count int 39 - err := d.QueryRow(`select count(*) from known_dids`).Scan(&count) 40 - if err != nil { 41 - return false 42 - } 43 - return count > 0 44 - }
+120 -11
spindle/db/repos.go
··· 1 1 package db 2 2 3 + import "github.com/bluesky-social/indigo/atproto/syntax" 4 + 3 5 type Repo struct { 4 - Knot string 5 - Owner string 6 - Name string 6 + Did syntax.DID 7 + Rkey syntax.RecordKey 8 + Name string 9 + Knot string 10 + } 11 + 12 + type RepoCollaborator struct { 13 + Did syntax.DID 14 + Rkey syntax.RecordKey 15 + Repo syntax.ATURI 16 + Subject syntax.DID 7 17 } 8 18 9 - func (d *DB) AddRepo(knot, owner, name string) error { 10 - _, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name) 19 + func (d *DB) PutRepo(repo *Repo) error { 20 + _, err := d.Exec( 21 + `insert or ignore into repos (did, rkey, name, knot) 22 + values (?, ?, ?, ?) 23 + on conflict(did, rkey) do update set 24 + name = excluded.name 25 + knot = excluded.knot`, 26 + repo.Did, 27 + repo.Rkey, 28 + repo.Name, 29 + repo.Knot, 30 + ) 31 + return err 32 + } 33 + 34 + func (d *DB) DeleteRepo(did syntax.DID, rkey syntax.RecordKey) error { 35 + _, err := d.Exec( 36 + `delete from repos where did = ? and rkey = ?`, 37 + did, 38 + rkey, 39 + ) 11 40 return err 12 41 } 13 42 ··· 34 63 return knots, nil 35 64 } 36 65 37 - func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) { 66 + func (d *DB) GetRepo(did syntax.DID, rkey syntax.RecordKey) (*Repo, error) { 38 67 var repo Repo 68 + err := d.DB.QueryRow( 69 + `select 70 + did, 71 + rkey, 72 + name, 73 + knot 74 + from repos where did = ? and rkey = ?`, 75 + did, 76 + rkey, 77 + ).Scan( 78 + &repo.Did, 79 + &repo.Rkey, 80 + &repo.Name, 81 + &repo.Knot, 82 + ) 83 + if err != nil { 84 + return nil, err 85 + } 86 + return &repo, nil 87 + } 39 88 40 - query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?" 41 - err := d.DB.QueryRow(query, knot, owner, name). 42 - Scan(&repo.Knot, &repo.Owner, &repo.Name) 89 + func (d *DB) GetRepoWithName(did syntax.DID, name string) (*Repo, error) { 90 + var repo Repo 91 + err := d.DB.QueryRow( 92 + `select 93 + did, 94 + rkey, 95 + name, 96 + knot 97 + from repos where did = ? and name = ?`, 98 + did, 99 + name, 100 + ).Scan( 101 + &repo.Did, 102 + &repo.Rkey, 103 + &repo.Name, 104 + &repo.Knot, 105 + ) 106 + if err != nil { 107 + return nil, err 108 + } 109 + return &repo, nil 110 + } 111 + 112 + func (d *DB) PutRepoCollaborator(collaborator *RepoCollaborator) error { 113 + _, err := d.Exec( 114 + `insert into repo_collaborators (did, rkey, repo, subject) 115 + values (?, ?, ?, ?) 116 + on conflict(did, rkey) do update set 117 + repo = excluded.repo 118 + subject = excluded.subject`, 119 + collaborator.Did, 120 + collaborator.Rkey, 121 + collaborator.Repo, 122 + collaborator.Subject, 123 + ) 124 + return err 125 + } 126 + 127 + func (d *DB) RemoveRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) error { 128 + _, err := d.Exec( 129 + `delete from repo_collaborators where did = ? and rkey = ?`, 130 + did, 131 + rkey, 132 + ) 133 + return err 134 + } 43 135 136 + func (d *DB) GetRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) { 137 + var collaborator RepoCollaborator 138 + err := d.DB.QueryRow( 139 + `select 140 + did, 141 + rkey, 142 + repo, 143 + subject 144 + from repo_collaborators 145 + where did = ? and rkey = ?`, 146 + did, 147 + rkey, 148 + ).Scan( 149 + &collaborator.Did, 150 + &collaborator.Rkey, 151 + &collaborator.Repo, 152 + &collaborator.Subject, 153 + ) 44 154 if err != nil { 45 155 return nil, err 46 156 } 47 - 48 - return &repo, nil 157 + return &collaborator, nil 49 158 }
-276
spindle/ingester.go
··· 1 - package spindle 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "errors" 7 - "fmt" 8 - "time" 9 - 10 - "tangled.org/core/api/tangled" 11 - "tangled.org/core/eventconsumer" 12 - "tangled.org/core/spindle/db" 13 - 14 - comatproto "github.com/bluesky-social/indigo/api/atproto" 15 - "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/xrpc" 17 - "github.com/bluesky-social/jetstream/pkg/models" 18 - ) 19 - 20 - type Ingester func(ctx context.Context, e *models.Event) error 21 - 22 - func (s *Spindle) ingest() Ingester { 23 - return func(ctx context.Context, e *models.Event) error { 24 - var err error 25 - defer func() { 26 - eventTime := e.TimeUS 27 - lastTimeUs := eventTime + 1 28 - if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 29 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 30 - } 31 - }() 32 - 33 - if e.Kind != models.EventKindCommit { 34 - return nil 35 - } 36 - 37 - switch e.Commit.Collection { 38 - case tangled.SpindleMemberNSID: 39 - err = s.ingestMember(ctx, e) 40 - case tangled.RepoNSID: 41 - err = s.ingestRepo(ctx, e) 42 - case tangled.RepoCollaboratorNSID: 43 - err = s.ingestCollaborator(ctx, e) 44 - } 45 - 46 - if err != nil { 47 - s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 48 - } 49 - 50 - return nil 51 - } 52 - } 53 - 54 - func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 - var err error 56 - did := e.Did 57 - rkey := e.Commit.RKey 58 - 59 - l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 - 61 - switch e.Commit.Operation { 62 - case models.CommitOperationCreate, models.CommitOperationUpdate: 63 - raw := e.Commit.Record 64 - record := tangled.SpindleMember{} 65 - err = json.Unmarshal(raw, &record) 66 - if err != nil { 67 - l.Error("invalid record", "error", err) 68 - return err 69 - } 70 - 71 - domain := s.cfg.Server.Hostname 72 - recordInstance := record.Instance 73 - 74 - if recordInstance != domain { 75 - l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 - return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 - } 78 - 79 - ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did()) 80 - if err != nil || !ok { 81 - l.Error("failed to add member", "did", did, "error", err) 82 - return fmt.Errorf("failed to enforce permissions: %w", err) 83 - } 84 - 85 - if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 - Did: syntax.DID(did), 87 - Rkey: rkey, 88 - Instance: recordInstance, 89 - Subject: syntax.DID(record.Subject), 90 - Created: time.Now(), 91 - }); err != nil { 92 - l.Error("failed to add member", "error", err) 93 - return fmt.Errorf("failed to add member: %w", err) 94 - } 95 - 96 - if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 97 - l.Error("failed to add member", "error", err) 98 - return fmt.Errorf("failed to add member: %w", err) 99 - } 100 - l.Info("added member from firehose", "member", record.Subject) 101 - 102 - if err := s.db.AddDid(record.Subject); err != nil { 103 - l.Error("failed to add did", "error", err) 104 - return fmt.Errorf("failed to add did: %w", err) 105 - } 106 - s.jc.AddDid(record.Subject) 107 - 108 - return nil 109 - 110 - case models.CommitOperationDelete: 111 - record, err := db.GetSpindleMember(s.db, did, rkey) 112 - if err != nil { 113 - l.Error("failed to find member", "error", err) 114 - return fmt.Errorf("failed to find member: %w", err) 115 - } 116 - 117 - if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 - l.Error("failed to remove member", "error", err) 119 - return fmt.Errorf("failed to remove member: %w", err) 120 - } 121 - 122 - if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil { 123 - l.Error("failed to add member", "error", err) 124 - return fmt.Errorf("failed to add member: %w", err) 125 - } 126 - l.Info("added member from firehose", "member", record.Subject) 127 - 128 - if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 - l.Error("failed to add did", "error", err) 130 - return fmt.Errorf("failed to add did: %w", err) 131 - } 132 - s.jc.RemoveDid(record.Subject.String()) 133 - 134 - } 135 - return nil 136 - } 137 - 138 - func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 - var err error 140 - did := e.Did 141 - 142 - l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 143 - 144 - l.Info("ingesting repo record", "did", did) 145 - 146 - switch e.Commit.Operation { 147 - case models.CommitOperationCreate, models.CommitOperationUpdate: 148 - raw := e.Commit.Record 149 - record := tangled.Repo{} 150 - err = json.Unmarshal(raw, &record) 151 - if err != nil { 152 - l.Error("invalid record", "error", err) 153 - return err 154 - } 155 - 156 - domain := s.cfg.Server.Hostname 157 - 158 - // no spindle configured for this repo 159 - if record.Spindle == nil { 160 - l.Info("no spindle configured", "name", record.Name) 161 - return nil 162 - } 163 - 164 - // this repo did not want this spindle 165 - if *record.Spindle != domain { 166 - l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 167 - return nil 168 - } 169 - 170 - // add this repo to the watch list 171 - if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 172 - l.Error("failed to add repo", "error", err) 173 - return fmt.Errorf("failed to add repo: %w", err) 174 - } 175 - 176 - repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey)) 177 - 178 - // add repo to rbac 179 - if err := s.e.AddRepo(repoAt); err != nil { 180 - l.Error("failed to add repo to enforcer", "error", err) 181 - return fmt.Errorf("failed to add repo: %w", err) 182 - } 183 - 184 - // add collaborators to rbac 185 - if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil { 186 - return err 187 - } 188 - 189 - // add this knot to the event consumer 190 - src := eventconsumer.NewKnotSource(record.Knot) 191 - s.ks.AddSource(context.Background(), src) 192 - 193 - return nil 194 - 195 - } 196 - return nil 197 - } 198 - 199 - func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 200 - var err error 201 - 202 - l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 203 - 204 - l.Info("ingesting collaborator record") 205 - 206 - switch e.Commit.Operation { 207 - case models.CommitOperationCreate, models.CommitOperationUpdate: 208 - raw := e.Commit.Record 209 - record := tangled.RepoCollaborator{} 210 - err = json.Unmarshal(raw, &record) 211 - if err != nil { 212 - l.Error("invalid record", "error", err) 213 - return err 214 - } 215 - 216 - subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 217 - if err != nil || subjectId.Handle.IsInvalidHandle() { 218 - return err 219 - } 220 - 221 - repoAt, err := syntax.ParseATURI(record.Repo) 222 - if err != nil { 223 - l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 224 - return nil 225 - } 226 - 227 - // check perms for this user 228 - if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil { 229 - return fmt.Errorf("insufficient permissions: %w", err) 230 - } 231 - 232 - // add collaborator to rbac 233 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil { 234 - l.Error("failed to add repo to enforcer", "error", err) 235 - return fmt.Errorf("failed to add repo: %w", err) 236 - } 237 - 238 - return nil 239 - } 240 - return nil 241 - } 242 - 243 - func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error { 244 - l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 245 - 246 - l.Info("fetching and adding existing collaborators") 247 - 248 - ident, err := s.res.ResolveIdent(ctx, repo.Authority().String()) 249 - if err != nil || ident.Handle.IsInvalidHandle() { 250 - return fmt.Errorf("failed to resolve handle: %w", err) 251 - } 252 - 253 - xrpcc := xrpc.Client{ 254 - Host: ident.PDSEndpoint(), 255 - } 256 - 257 - resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false) 258 - if err != nil { 259 - return err 260 - } 261 - 262 - var errs error 263 - for _, r := range resp.Records { 264 - if r == nil { 265 - continue 266 - } 267 - record := r.Value.Val.(*tangled.RepoCollaborator) 268 - 269 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 270 - l.Error("failed to add repo to enforcer", "error", err) 271 - errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 272 - } 273 - } 274 - 275 - return errs 276 - }
+134 -31
spindle/server.go
··· 1 1 package spindle 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 _ "embed" 6 7 "encoding/json" ··· 8 9 "log/slog" 9 10 "maps" 10 11 "net/http" 12 + "os" 13 + "os/exec" 14 + "path" 15 + "strings" 11 16 17 + "github.com/bluesky-social/indigo/atproto/syntax" 12 18 "github.com/go-chi/chi/v5" 19 + "github.com/hashicorp/go-version" 13 20 "tangled.org/core/api/tangled" 14 21 "tangled.org/core/eventconsumer" 15 22 "tangled.org/core/eventconsumer/cursor" 16 23 "tangled.org/core/idresolver" 17 - "tangled.org/core/jetstream" 18 24 "tangled.org/core/log" 19 25 "tangled.org/core/notifier" 20 26 "tangled.org/core/rbac2" ··· 26 32 "tangled.org/core/spindle/queue" 27 33 "tangled.org/core/spindle/secrets" 28 34 "tangled.org/core/spindle/xrpc" 35 + "tangled.org/core/tap" 29 36 "tangled.org/core/xrpc/serviceauth" 30 37 ) 31 38 ··· 33 40 var motd []byte 34 41 35 42 type Spindle struct { 36 - jc *jetstream.JetstreamClient 43 + tap *tap.Client 37 44 db *db.DB 38 45 e *rbac2.Enforcer 39 46 l *slog.Logger ··· 50 57 func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 51 58 logger := log.FromContext(ctx) 52 59 53 - d, err := db.Make(cfg.Server.DBPath) 60 + if err := ensureGitVersion(); err != nil { 61 + return nil, fmt.Errorf("ensuring git version: %w", err) 62 + } 63 + 64 + d, err := db.Make(ctx, cfg.Server.DBPath) 54 65 if err != nil { 55 66 return nil, fmt.Errorf("failed to setup db: %w", err) 56 67 } ··· 90 101 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 91 102 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 92 103 93 - collections := []string{ 94 - tangled.SpindleMemberNSID, 95 - tangled.RepoNSID, 96 - tangled.RepoCollaboratorNSID, 97 - } 98 - jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 99 - if err != nil { 100 - return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 101 - } 102 - jc.AddDid(cfg.Server.Owner.String()) 103 - 104 - // Check if the spindle knows about any Dids; 105 - dids, err := d.GetAllDids() 106 - if err != nil { 107 - return nil, fmt.Errorf("failed to get all dids: %w", err) 108 - } 109 - for _, d := range dids { 110 - jc.AddDid(d) 111 - } 104 + tap := tap.NewClient(cfg.Server.TapUrl, "") 112 105 113 106 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 114 107 115 108 spindle := &Spindle{ 116 - jc: jc, 109 + tap: &tap, 117 110 e: e, 118 111 db: d, 119 112 l: logger, ··· 136 129 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 137 130 } 138 131 139 - err = jc.StartJetstream(ctx, spindle.ingest()) 140 - if err != nil { 141 - return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 142 - } 143 - 144 132 // for each incoming sh.tangled.pipeline, we execute 145 133 // spindle.processPipeline, which in turn enqueues the pipeline 146 134 // job in the above registered queue. ··· 208 196 s.ks.Start(ctx) 209 197 }() 210 198 199 + go func() { 200 + s.l.Info("starting tap stream consumer") 201 + s.tap.Connect(ctx, &tap.SimpleIndexer{ 202 + EventHandler: s.processEvent, 203 + }) 204 + }() 205 + 211 206 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 212 207 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 213 208 } ··· 267 262 } 268 263 269 264 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 265 + l := log.FromContext(ctx).With("handler", "processKnotStream") 266 + l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 270 267 if msg.Nsid == tangled.PipelineNSID { 268 + return nil 271 269 tpl := tangled.Pipeline{} 272 270 err := json.Unmarshal(msg.EventJson, &tpl) 273 271 if err != nil { ··· 288 286 } 289 287 290 288 // filter by repos 291 - _, err = s.db.GetRepo( 292 - tpl.TriggerMetadata.Repo.Knot, 293 - tpl.TriggerMetadata.Repo.Did, 289 + _, err = s.db.GetRepoWithName( 290 + syntax.DID(tpl.TriggerMetadata.Repo.Did), 294 291 tpl.TriggerMetadata.Repo.Repo, 295 292 ) 296 293 if err != nil { ··· 369 366 } else { 370 367 s.l.Error("failed to enqueue pipeline: queue is full") 371 368 } 369 + } else if msg.Nsid == tangled.GitRefUpdateNSID { 370 + event := tangled.GitRefUpdate{} 371 + if err := json.Unmarshal(msg.EventJson, &event); err != nil { 372 + l.Error("error unmarshalling", "err", err) 373 + return err 374 + } 375 + l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 376 + 377 + // use event.RepoAt 378 + // sync git repos in {data}/repos/{did}/sh.tangled.repo/{rkey} 379 + // if it's nil, don't run pipeline. knot needs upgrade 380 + // we will leave sh.tangled.pipeline.trigger for backward compatibility 381 + 382 + // NOTE: we are blindly trusting the knot that it will return only repos it own 383 + repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 384 + repoPath := s.newRepoPath(event.RepoDid, event.RepoName) 385 + err := sparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha) 386 + if err != nil { 387 + l.Error("failed to sync git repo", "err", err) 388 + return fmt.Errorf("sync git repo: %w", err) 389 + } 390 + l.Info("synced git repo") 391 + 392 + // TODO: plan the pipeline 372 393 } 373 394 374 395 return nil 375 396 } 397 + 398 + func (s *Spindle) newRepoPath(did, name string) string { 399 + return path.Join(s.cfg.Server.RepoDir(), did, name) 400 + } 401 + 402 + func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 403 + scheme := "https://" 404 + if s.cfg.Server.Dev { 405 + scheme = "http://" 406 + } 407 + return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 408 + } 409 + 410 + const RequiredVersion = "2.49.0" 411 + 412 + func ensureGitVersion() error { 413 + v, err := gitVersion() 414 + if err != nil { 415 + return fmt.Errorf("fetching git version: %w", err) 416 + } 417 + if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 418 + return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 419 + } 420 + return nil 421 + } 422 + 423 + // TODO: move to "git" module shared between knot, appview & spindle 424 + func gitVersion() (*version.Version, error) { 425 + var buf bytes.Buffer 426 + cmd := exec.Command("git", "version") 427 + cmd.Stdout = &buf 428 + cmd.Stderr = os.Stderr 429 + err := cmd.Run() 430 + if err != nil { 431 + return nil, err 432 + } 433 + fields := strings.Fields(buf.String()) 434 + if len(fields) < 3 { 435 + return nil, fmt.Errorf("invalid git version: %s", buf) 436 + } 437 + 438 + // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1" 439 + versionString := fields[2] 440 + if pos := strings.Index(versionString, "windows"); pos >= 1 { 441 + versionString = versionString[:pos-1] 442 + } 443 + return version.NewVersion(versionString) 444 + } 445 + 446 + func sparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error { 447 + exist, err := isDir(path) 448 + if err != nil { 449 + return err 450 + } 451 + if !exist { 452 + if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil { 453 + return fmt.Errorf("git clone: %w", err) 454 + } 455 + if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", `'/.tangled/workflows'`).Run(); err != nil { 456 + return fmt.Errorf("git sparse-checkout set: %w", err) 457 + } 458 + if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil { 459 + return fmt.Errorf("git checkout: %w", err) 460 + } 461 + } else { 462 + if err := exec.Command("git", "-C", path, "pull", "origin", rev).Run(); err != nil { 463 + return fmt.Errorf("git pull: %w", err) 464 + } 465 + } 466 + return nil 467 + } 468 + 469 + func isDir(path string) (bool, error) { 470 + info, err := os.Stat(path) 471 + if err == nil && info.IsDir() { 472 + return true, nil 473 + } 474 + if os.IsNotExist(err) { 475 + return false, nil 476 + } 477 + return false, err 478 + }
+281
spindle/tap.go
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/eventconsumer" 12 + "tangled.org/core/spindle/db" 13 + "tangled.org/core/tap" 14 + ) 15 + 16 + func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 + l := s.l.With("component", "tapIndexer") 18 + 19 + var err error 20 + switch evt.Type { 21 + case tap.EvtRecord: 22 + switch evt.Record.Collection.String() { 23 + case tangled.SpindleMemberNSID: 24 + err = s.processMember(ctx, evt) 25 + case tangled.RepoNSID: 26 + err = s.processRepo(ctx, evt) 27 + case tangled.RepoCollaboratorNSID: 28 + err = s.processCollaborator(ctx, evt) 29 + case tangled.RepoPullNSID: 30 + err = s.processPull(ctx, evt) 31 + } 32 + case tap.EvtIdentity: 33 + // no-op 34 + } 35 + 36 + if err != nil { 37 + l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 38 + return err 39 + } 40 + return nil 41 + } 42 + 43 + // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 + 45 + func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 + 48 + l.Info("processing spindle.member record") 49 + 50 + // check perms for this user 51 + if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 52 + l.Warn("forbidden request", "did", evt.Record.Did, "error", err) 53 + return nil 54 + } 55 + 56 + switch evt.Record.Action { 57 + case tap.RecordCreateAction, tap.RecordUpdateAction: 58 + record := tangled.SpindleMember{} 59 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 + return fmt.Errorf("parsing record: %w", err) 61 + } 62 + 63 + domain := s.cfg.Server.Hostname 64 + if record.Instance != domain { 65 + l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 66 + return nil 67 + } 68 + 69 + created, err := time.Parse(record.CreatedAt, time.RFC3339) 70 + if err != nil { 71 + created = time.Now() 72 + } 73 + if err := db.AddSpindleMember(s.db, db.SpindleMember{ 74 + Did: evt.Record.Did, 75 + Rkey: evt.Record.Rkey.String(), 76 + Instance: record.Instance, 77 + Subject: syntax.DID(record.Subject), 78 + Created: created, 79 + }); err != nil { 80 + l.Error("failed to add member", "error", err) 81 + return fmt.Errorf("adding member to db: %w", err) 82 + } 83 + if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 84 + return fmt.Errorf("adding member to rbac: %w", err) 85 + } 86 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 + return fmt.Errorf("adding did to tap", err) 88 + } 89 + 90 + l.Info("added member", "member", record.Subject) 91 + return nil 92 + 93 + case tap.RecordDeleteAction: 94 + var ( 95 + did = evt.Record.Did.String() 96 + rkey = evt.Record.Rkey.String() 97 + ) 98 + member, err := db.GetSpindleMember(s.db, did, rkey) 99 + if err != nil { 100 + return fmt.Errorf("finding member: %w", err) 101 + } 102 + 103 + if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 104 + return fmt.Errorf("removing member from db: %w", err) 105 + } 106 + if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 107 + return fmt.Errorf("removing member from rbac: %w", err) 108 + } 109 + if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 + return fmt.Errorf("removing did from tap: %w", err) 111 + } 112 + 113 + l.Info("removed member", "member", member.Subject) 114 + return nil 115 + } 116 + return nil 117 + } 118 + 119 + func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 + 122 + l.Info("processing collaborator record") 123 + switch evt.Record.Action { 124 + case tap.RecordCreateAction, tap.RecordUpdateAction: 125 + record := tangled.RepoCollaborator{} 126 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 127 + l.Error("invalid record", "err", err) 128 + return fmt.Errorf("parsing record: %w", err) 129 + } 130 + 131 + // check perms for this user 132 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 133 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 134 + return nil 135 + } 136 + 137 + if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 138 + Did: evt.Record.Did, 139 + Rkey: evt.Record.Rkey, 140 + Repo: syntax.ATURI(record.Repo), 141 + Subject: syntax.DID(record.Subject), 142 + }); err != nil { 143 + return fmt.Errorf("adding collaborator to db: %w", err) 144 + } 145 + if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 146 + return fmt.Errorf("adding collaborator to rbac: %w", err) 147 + } 148 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 149 + return fmt.Errorf("adding did to tap: %w", err) 150 + } 151 + 152 + l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 153 + return nil 154 + 155 + case tap.RecordDeleteAction: 156 + // get existing collaborator 157 + collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 158 + if err != nil { 159 + return fmt.Errorf("failed to get existing collaborator info: %w", err) 160 + } 161 + 162 + // check perms for this user 163 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 164 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 165 + return nil 166 + } 167 + 168 + if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 169 + return fmt.Errorf("removing collaborator from db: %w", err) 170 + } 171 + if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 172 + return fmt.Errorf("removing collaborator from rbac: %w", err) 173 + } 174 + if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 175 + return fmt.Errorf("removing did from tap: %w", err) 176 + } 177 + 178 + l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 179 + return nil 180 + } 181 + return nil 182 + } 183 + 184 + func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 185 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 186 + 187 + l.Info("processing repo record") 188 + 189 + // check perms for this user 190 + if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 191 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 192 + return nil 193 + } 194 + 195 + switch evt.Record.Action { 196 + case tap.RecordCreateAction, tap.RecordUpdateAction: 197 + record := tangled.Repo{} 198 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 199 + return fmt.Errorf("parsing record: %w", err) 200 + } 201 + 202 + domain := s.cfg.Server.Hostname 203 + if record.Spindle == nil || *record.Spindle != domain { 204 + if record.Spindle == nil { 205 + l.Info("spindle isn't configured", "name", record.Name) 206 + } else { 207 + l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 208 + } 209 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 210 + return fmt.Errorf("deleting repo from db: %w", err) 211 + } 212 + return nil 213 + } 214 + 215 + if err := s.db.PutRepo(&db.Repo{ 216 + Did: evt.Record.Did, 217 + Rkey: evt.Record.Rkey, 218 + Name: record.Name, 219 + Knot: record.Knot, 220 + }); err != nil { 221 + return fmt.Errorf("adding repo to db: %w", err) 222 + } 223 + 224 + if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 225 + return fmt.Errorf("adding repo to rbac") 226 + } 227 + 228 + // add this knot to the event consumer 229 + src := eventconsumer.NewKnotSource(record.Knot) 230 + s.ks.AddSource(context.Background(), src) 231 + 232 + l.Info("added repo", "repo", evt.Record.AtUri()) 233 + return nil 234 + 235 + case tap.RecordDeleteAction: 236 + // check perms for this user 237 + if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 238 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 239 + return nil 240 + } 241 + 242 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 243 + return fmt.Errorf("deleting repo from db: %w", err) 244 + } 245 + 246 + if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 247 + return fmt.Errorf("deleting repo from rbac: %w", err) 248 + } 249 + 250 + l.Info("deleted repo", "repo", evt.Record.AtUri()) 251 + return nil 252 + } 253 + return nil 254 + } 255 + 256 + func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 257 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 258 + 259 + l.Info("processing pull record") 260 + 261 + switch evt.Record.Action { 262 + case tap.RecordCreateAction, tap.RecordUpdateAction: 263 + // TODO 264 + case tap.RecordDeleteAction: 265 + // TODO 266 + } 267 + return nil 268 + } 269 + 270 + func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 271 + known, err := s.db.IsKnownDid(syntax.DID(did)) 272 + if err != nil { 273 + return fmt.Errorf("ensuring did known state: %w", err) 274 + } 275 + if !known { 276 + if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 277 + return fmt.Errorf("removing did from tap: %w", err) 278 + } 279 + } 280 + return nil 281 + }
+18
tap/simpleIndexer.go
··· 1 + package tap 2 + 3 + import "context" 4 + 5 + type SimpleIndexer struct { 6 + EventHandler func(ctx context.Context, evt Event) error 7 + ErrorHandler func(ctx context.Context, err error) 8 + } 9 + 10 + var _ Handler = (*SimpleIndexer)(nil) 11 + 12 + func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error { 13 + return i.EventHandler(ctx, evt) 14 + } 15 + 16 + func (i *SimpleIndexer) OnError(ctx context.Context, err error) { 17 + i.ErrorHandler(ctx, err) 18 + }
+169
tap/tap.go
··· 1 + /// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 + 3 + package tap 4 + 5 + import ( 6 + "bytes" 7 + "context" 8 + "encoding/json" 9 + "fmt" 10 + "net/http" 11 + "net/url" 12 + 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + "github.com/gorilla/websocket" 15 + "tangled.org/core/log" 16 + ) 17 + 18 + // type WebsocketOptions struct { 19 + // maxReconnectSeconds int 20 + // heartbeatIntervalMs int 21 + // // onReconnectError 22 + // } 23 + 24 + type Handler interface { 25 + OnEvent(ctx context.Context, evt Event) error 26 + OnError(ctx context.Context, err error) 27 + } 28 + 29 + type Client struct { 30 + Url string 31 + AdminPassword string 32 + HTTPClient *http.Client 33 + } 34 + 35 + func NewClient(url, adminPassword string) Client { 36 + return Client{ 37 + Url: url, 38 + AdminPassword: adminPassword, 39 + HTTPClient: &http.Client{}, 40 + } 41 + } 42 + 43 + func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 44 + body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 45 + if err != nil { 46 + return err 47 + } 48 + req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 49 + if err != nil { 50 + return err 51 + } 52 + req.SetBasicAuth("admin", c.AdminPassword) 53 + req.Header.Set("Content-Type", "application/json") 54 + 55 + resp, err := c.HTTPClient.Do(req) 56 + if err != nil { 57 + return err 58 + } 59 + defer resp.Body.Close() 60 + if resp.StatusCode != http.StatusOK { 61 + return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 62 + } 63 + return nil 64 + } 65 + 66 + func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 67 + body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 68 + if err != nil { 69 + return err 70 + } 71 + req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 72 + if err != nil { 73 + return err 74 + } 75 + req.SetBasicAuth("admin", c.AdminPassword) 76 + req.Header.Set("Content-Type", "application/json") 77 + 78 + resp, err := c.HTTPClient.Do(req) 79 + if err != nil { 80 + return err 81 + } 82 + defer resp.Body.Close() 83 + if resp.StatusCode != http.StatusOK { 84 + return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 85 + } 86 + return nil 87 + } 88 + 89 + func (c *Client) Connect(ctx context.Context, handler Handler) error { 90 + l := log.FromContext(ctx) 91 + 92 + u, err := url.Parse(c.Url) 93 + if err != nil { 94 + return err 95 + } 96 + if u.Scheme == "https" { 97 + u.Scheme = "wss" 98 + } else { 99 + u.Scheme = "ws" 100 + } 101 + u.Path = "/channel" 102 + 103 + // TODO: set auth on dial 104 + 105 + url := u.String() 106 + 107 + // var backoff int 108 + // for { 109 + // select { 110 + // case <-ctx.Done(): 111 + // return ctx.Err() 112 + // default: 113 + // } 114 + // 115 + // header := http.Header{ 116 + // "Authorization": []string{""}, 117 + // } 118 + // conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 119 + // if err != nil { 120 + // l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 121 + // time.Sleep(time.Duration(5+backoff) * time.Second) 122 + // backoff++ 123 + // 124 + // continue 125 + // } else { 126 + // backoff = 0 127 + // } 128 + // 129 + // l.Info("event subscription response", "code", res.StatusCode) 130 + // } 131 + 132 + // TODO: keep websocket connection alive 133 + conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil) 134 + if err != nil { 135 + return err 136 + } 137 + defer conn.Close() 138 + 139 + for { 140 + select { 141 + case <-ctx.Done(): 142 + return ctx.Err() 143 + default: 144 + } 145 + _, message, err := conn.ReadMessage() 146 + if err != nil { 147 + return err 148 + } 149 + 150 + var ev Event 151 + if err := json.Unmarshal(message, &ev); err != nil { 152 + handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 153 + continue 154 + } 155 + if err := handler.OnEvent(ctx, ev); err != nil { 156 + handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 157 + continue 158 + } 159 + 160 + ack := map[string]any{ 161 + "type": "ack", 162 + "id": ev.ID, 163 + } 164 + if err := conn.WriteJSON(ack); err != nil { 165 + l.Warn("failed to send ack", "err", err) 166 + continue 167 + } 168 + } 169 + }
+62
tap/types.go
··· 1 + package tap 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + ) 9 + 10 + type EventType string 11 + 12 + const ( 13 + EvtRecord EventType = "record" 14 + EvtIdentity EventType = "identity" 15 + ) 16 + 17 + type Event struct { 18 + ID int64 `json:"id"` 19 + Type EventType `json:"type"` 20 + Record *RecordEventData `json:"record,omitempty"` 21 + Identity *IdentityEventData `json:"identity,omitempty"` 22 + } 23 + 24 + type RecordEventData struct { 25 + Live bool `json:"live"` 26 + Did syntax.DID `json:"did"` 27 + Rev string `json:"rev"` 28 + Collection syntax.NSID `json:"collection"` 29 + Rkey syntax.RecordKey `json:"rkey"` 30 + Action RecordAction `json:"action"` 31 + Record json.RawMessage `json:"record,omitempty"` 32 + CID *syntax.CID `json:"cid,omitempty"` 33 + } 34 + 35 + func (r *RecordEventData) AtUri() syntax.ATURI { 36 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey)) 37 + } 38 + 39 + type RecordAction string 40 + 41 + const ( 42 + RecordCreateAction RecordAction = "create" 43 + RecordUpdateAction RecordAction = "update" 44 + RecordDeleteAction RecordAction = "delete" 45 + ) 46 + 47 + type IdentityEventData struct { 48 + DID syntax.DID `json:"did"` 49 + Handle string `json:"handle"` 50 + IsActive bool `json:"is_active"` 51 + Status RepoStatus `json:"status"` 52 + } 53 + 54 + type RepoStatus string 55 + 56 + const ( 57 + RepoStatusActive RepoStatus = "active" 58 + RepoStatusTakendown RepoStatus = "takendown" 59 + RepoStatusSuspended RepoStatus = "suspended" 60 + RepoStatusDeactivated RepoStatus = "deactivated" 61 + RepoStatusDeleted RepoStatus = "deleted" 62 + )