forked from tangled.org/core
Monorepo for Tangled

spindle: switch to tap from jetstream

spindle-tap will collect/stream record events from:
- users dynamically added by spindle (spindle members | collaborators of
repos using spindle)
- any users with `sh.tangled.repo.pull` collection

It might be bit inefficient considering it will also stream repo
creation events from PR authors due to second rule, but at least we now
have backfill logic and Sync 1.1 based syncing.

This inefficiency can be fixed later by modifying upstream tap cli or
embedding tap into spindle.

```
+--------- all tangled users --------+
| |
| +-- users known to spindle-tap --+ |
| | (PR author / manually added) | |
| | | |
| | +----------------------------+ | |
| | | users known to spindle | | |
| | | (members / collaborators) | | |
| | +----------------------------+ | |
| +--------------------------------+ |
+------------------------------------+
```

Close: <https://tangled.org/tangled.org/core/issues/341>

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 4fd20b2b 2f1dddb0

verified
Changed files
+430 -375
spindle
+1
spindle/config/config.go
··· 13 13 DBPath string `env:"DB_PATH, default=spindle.db"` 14 14 Hostname string `env:"HOSTNAME, required"` 15 15 JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 16 + TapUrl string `env:"TAP_URL, required"` 16 17 PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 18 Dev bool `env:"DEV, default=false"` 18 19 Owner syntax.DID `env:"OWNER, required"`
+14 -14
spindle/db/db.go
··· 5 5 "database/sql" 6 6 "strings" 7 7 8 + "github.com/bluesky-social/indigo/atproto/syntax" 8 9 _ "github.com/mattn/go-sqlite3" 9 10 "tangled.org/core/log" 10 11 "tangled.org/core/orm" ··· 120 121 return &DB{db}, nil 121 122 } 122 123 123 - func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 124 - _, err := d.Exec(` 125 - insert into _jetstream (id, last_time_us) 126 - values (1, ?) 127 - on conflict(id) do update set last_time_us = excluded.last_time_us 128 - `, lastTimeUs) 129 - return err 130 - } 131 - 132 - func (d *DB) GetLastTimeUs() (int64, error) { 133 - var lastTimeUs int64 134 - row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 135 - err := row.Scan(&lastTimeUs) 136 - return lastTimeUs, err 124 + func (d *DB) IsKnownDid(did syntax.DID) (bool, error) { 125 + // is spindle member / repo collaborator 126 + var exists bool 127 + err := d.QueryRow( 128 + `select exists ( 129 + select 1 from repo_collaborators where did = ? 130 + union all 131 + select 1 from spindle_members where did = ? 132 + )`, 133 + did, 134 + did, 135 + ).Scan(&exists) 136 + return exists, err 137 137 }
-44
spindle/db/known_dids.go
··· 1 - package db 2 - 3 - func (d *DB) AddDid(did string) error { 4 - _, err := d.Exec(`insert or ignore into known_dids (did) values (?)`, did) 5 - return err 6 - } 7 - 8 - func (d *DB) RemoveDid(did string) error { 9 - _, err := d.Exec(`delete from known_dids where did = ?`, did) 10 - return err 11 - } 12 - 13 - func (d *DB) GetAllDids() ([]string, error) { 14 - var dids []string 15 - 16 - rows, err := d.Query(`select did from known_dids`) 17 - if err != nil { 18 - return nil, err 19 - } 20 - defer rows.Close() 21 - 22 - for rows.Next() { 23 - var did string 24 - if err := rows.Scan(&did); err != nil { 25 - return nil, err 26 - } 27 - dids = append(dids, did) 28 - } 29 - 30 - if err := rows.Err(); err != nil { 31 - return nil, err 32 - } 33 - 34 - return dids, nil 35 - } 36 - 37 - func (d *DB) HasKnownDids() bool { 38 - var count int 39 - err := d.QueryRow(`select count(*) from known_dids`).Scan(&count) 40 - if err != nil { 41 - return false 42 - } 43 - return count > 0 44 - }
+120 -11
spindle/db/repos.go
··· 1 1 package db 2 2 3 + import "github.com/bluesky-social/indigo/atproto/syntax" 4 + 3 5 type Repo struct { 4 - Knot string 5 - Owner string 6 - Name string 6 + Did syntax.DID 7 + Rkey syntax.RecordKey 8 + Name string 9 + Knot string 10 + } 11 + 12 + type RepoCollaborator struct { 13 + Did syntax.DID 14 + Rkey syntax.RecordKey 15 + Repo syntax.ATURI 16 + Subject syntax.DID 7 17 } 8 18 9 - func (d *DB) AddRepo(knot, owner, name string) error { 10 - _, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name) 19 + func (d *DB) PutRepo(repo *Repo) error { 20 + _, err := d.Exec( 21 + `insert or ignore into repos (did, rkey, name, knot) 22 + values (?, ?, ?, ?) 23 + on conflict(did, rkey) do update set 24 + name = excluded.name 25 + knot = excluded.knot`, 26 + repo.Did, 27 + repo.Rkey, 28 + repo.Name, 29 + repo.Knot, 30 + ) 31 + return err 32 + } 33 + 34 + func (d *DB) DeleteRepo(did syntax.DID, rkey syntax.RecordKey) error { 35 + _, err := d.Exec( 36 + `delete from repos where did = ? and rkey = ?`, 37 + did, 38 + rkey, 39 + ) 11 40 return err 12 41 } 13 42 ··· 34 63 return knots, nil 35 64 } 36 65 37 - func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) { 66 + func (d *DB) GetRepo(did syntax.DID, rkey syntax.RecordKey) (*Repo, error) { 38 67 var repo Repo 68 + err := d.DB.QueryRow( 69 + `select 70 + did, 71 + rkey, 72 + name, 73 + knot 74 + from repos where did = ? and rkey = ?`, 75 + did, 76 + rkey, 77 + ).Scan( 78 + &repo.Did, 79 + &repo.Rkey, 80 + &repo.Name, 81 + &repo.Knot, 82 + ) 83 + if err != nil { 84 + return nil, err 85 + } 86 + return &repo, nil 87 + } 39 88 40 - query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?" 41 - err := d.DB.QueryRow(query, knot, owner, name). 42 - Scan(&repo.Knot, &repo.Owner, &repo.Name) 89 + func (d *DB) GetRepoWithName(did syntax.DID, name string) (*Repo, error) { 90 + var repo Repo 91 + err := d.DB.QueryRow( 92 + `select 93 + did, 94 + rkey, 95 + name, 96 + knot 97 + from repos where did = ? and name = ?`, 98 + did, 99 + name, 100 + ).Scan( 101 + &repo.Did, 102 + &repo.Rkey, 103 + &repo.Name, 104 + &repo.Knot, 105 + ) 106 + if err != nil { 107 + return nil, err 108 + } 109 + return &repo, nil 110 + } 111 + 112 + func (d *DB) PutRepoCollaborator(collaborator *RepoCollaborator) error { 113 + _, err := d.Exec( 114 + `insert into repo_collaborators (did, rkey, repo, subject) 115 + values (?, ?, ?, ?) 116 + on conflict(did, rkey) do update set 117 + repo = excluded.repo 118 + subject = excluded.subject`, 119 + collaborator.Did, 120 + collaborator.Rkey, 121 + collaborator.Repo, 122 + collaborator.Subject, 123 + ) 124 + return err 125 + } 126 + 127 + func (d *DB) RemoveRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) error { 128 + _, err := d.Exec( 129 + `delete from repo_collaborators where did = ? and rkey = ?`, 130 + did, 131 + rkey, 132 + ) 133 + return err 134 + } 43 135 136 + func (d *DB) GetRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) { 137 + var collaborator RepoCollaborator 138 + err := d.DB.QueryRow( 139 + `select 140 + did, 141 + rkey, 142 + repo, 143 + subject 144 + from repo_collaborators 145 + where did = ? and rkey = ?`, 146 + did, 147 + rkey, 148 + ).Scan( 149 + &collaborator.Did, 150 + &collaborator.Rkey, 151 + &collaborator.Repo, 152 + &collaborator.Subject, 153 + ) 44 154 if err != nil { 45 155 return nil, err 46 156 } 47 - 48 - return &repo, nil 157 + return &collaborator, nil 49 158 }
-276
spindle/ingester.go
··· 1 - package spindle 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "errors" 7 - "fmt" 8 - "time" 9 - 10 - "tangled.org/core/api/tangled" 11 - "tangled.org/core/eventconsumer" 12 - "tangled.org/core/spindle/db" 13 - 14 - comatproto "github.com/bluesky-social/indigo/api/atproto" 15 - "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/xrpc" 17 - "github.com/bluesky-social/jetstream/pkg/models" 18 - ) 19 - 20 - type Ingester func(ctx context.Context, e *models.Event) error 21 - 22 - func (s *Spindle) ingest() Ingester { 23 - return func(ctx context.Context, e *models.Event) error { 24 - var err error 25 - defer func() { 26 - eventTime := e.TimeUS 27 - lastTimeUs := eventTime + 1 28 - if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 29 - err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 30 - } 31 - }() 32 - 33 - if e.Kind != models.EventKindCommit { 34 - return nil 35 - } 36 - 37 - switch e.Commit.Collection { 38 - case tangled.SpindleMemberNSID: 39 - err = s.ingestMember(ctx, e) 40 - case tangled.RepoNSID: 41 - err = s.ingestRepo(ctx, e) 42 - case tangled.RepoCollaboratorNSID: 43 - err = s.ingestCollaborator(ctx, e) 44 - } 45 - 46 - if err != nil { 47 - s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 48 - } 49 - 50 - return nil 51 - } 52 - } 53 - 54 - func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 - var err error 56 - did := e.Did 57 - rkey := e.Commit.RKey 58 - 59 - l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 - 61 - switch e.Commit.Operation { 62 - case models.CommitOperationCreate, models.CommitOperationUpdate: 63 - raw := e.Commit.Record 64 - record := tangled.SpindleMember{} 65 - err = json.Unmarshal(raw, &record) 66 - if err != nil { 67 - l.Error("invalid record", "error", err) 68 - return err 69 - } 70 - 71 - domain := s.cfg.Server.Hostname 72 - recordInstance := record.Instance 73 - 74 - if recordInstance != domain { 75 - l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 - return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 - } 78 - 79 - ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did()) 80 - if err != nil || !ok { 81 - l.Error("failed to add member", "did", did, "error", err) 82 - return fmt.Errorf("failed to enforce permissions: %w", err) 83 - } 84 - 85 - if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 - Did: syntax.DID(did), 87 - Rkey: rkey, 88 - Instance: recordInstance, 89 - Subject: syntax.DID(record.Subject), 90 - Created: time.Now(), 91 - }); err != nil { 92 - l.Error("failed to add member", "error", err) 93 - return fmt.Errorf("failed to add member: %w", err) 94 - } 95 - 96 - if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 97 - l.Error("failed to add member", "error", err) 98 - return fmt.Errorf("failed to add member: %w", err) 99 - } 100 - l.Info("added member from firehose", "member", record.Subject) 101 - 102 - if err := s.db.AddDid(record.Subject); err != nil { 103 - l.Error("failed to add did", "error", err) 104 - return fmt.Errorf("failed to add did: %w", err) 105 - } 106 - s.jc.AddDid(record.Subject) 107 - 108 - return nil 109 - 110 - case models.CommitOperationDelete: 111 - record, err := db.GetSpindleMember(s.db, did, rkey) 112 - if err != nil { 113 - l.Error("failed to find member", "error", err) 114 - return fmt.Errorf("failed to find member: %w", err) 115 - } 116 - 117 - if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 - l.Error("failed to remove member", "error", err) 119 - return fmt.Errorf("failed to remove member: %w", err) 120 - } 121 - 122 - if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil { 123 - l.Error("failed to add member", "error", err) 124 - return fmt.Errorf("failed to add member: %w", err) 125 - } 126 - l.Info("added member from firehose", "member", record.Subject) 127 - 128 - if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 - l.Error("failed to add did", "error", err) 130 - return fmt.Errorf("failed to add did: %w", err) 131 - } 132 - s.jc.RemoveDid(record.Subject.String()) 133 - 134 - } 135 - return nil 136 - } 137 - 138 - func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 - var err error 140 - did := e.Did 141 - 142 - l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 143 - 144 - l.Info("ingesting repo record", "did", did) 145 - 146 - switch e.Commit.Operation { 147 - case models.CommitOperationCreate, models.CommitOperationUpdate: 148 - raw := e.Commit.Record 149 - record := tangled.Repo{} 150 - err = json.Unmarshal(raw, &record) 151 - if err != nil { 152 - l.Error("invalid record", "error", err) 153 - return err 154 - } 155 - 156 - domain := s.cfg.Server.Hostname 157 - 158 - // no spindle configured for this repo 159 - if record.Spindle == nil { 160 - l.Info("no spindle configured", "name", record.Name) 161 - return nil 162 - } 163 - 164 - // this repo did not want this spindle 165 - if *record.Spindle != domain { 166 - l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 167 - return nil 168 - } 169 - 170 - // add this repo to the watch list 171 - if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 172 - l.Error("failed to add repo", "error", err) 173 - return fmt.Errorf("failed to add repo: %w", err) 174 - } 175 - 176 - repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey)) 177 - 178 - // add repo to rbac 179 - if err := s.e.AddRepo(repoAt); err != nil { 180 - l.Error("failed to add repo to enforcer", "error", err) 181 - return fmt.Errorf("failed to add repo: %w", err) 182 - } 183 - 184 - // add collaborators to rbac 185 - if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil { 186 - return err 187 - } 188 - 189 - // add this knot to the event consumer 190 - src := eventconsumer.NewKnotSource(record.Knot) 191 - s.ks.AddSource(context.Background(), src) 192 - 193 - return nil 194 - 195 - } 196 - return nil 197 - } 198 - 199 - func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 200 - var err error 201 - 202 - l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 203 - 204 - l.Info("ingesting collaborator record") 205 - 206 - switch e.Commit.Operation { 207 - case models.CommitOperationCreate, models.CommitOperationUpdate: 208 - raw := e.Commit.Record 209 - record := tangled.RepoCollaborator{} 210 - err = json.Unmarshal(raw, &record) 211 - if err != nil { 212 - l.Error("invalid record", "error", err) 213 - return err 214 - } 215 - 216 - subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 217 - if err != nil || subjectId.Handle.IsInvalidHandle() { 218 - return err 219 - } 220 - 221 - repoAt, err := syntax.ParseATURI(record.Repo) 222 - if err != nil { 223 - l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 224 - return nil 225 - } 226 - 227 - // check perms for this user 228 - if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil { 229 - return fmt.Errorf("insufficient permissions: %w", err) 230 - } 231 - 232 - // add collaborator to rbac 233 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil { 234 - l.Error("failed to add repo to enforcer", "error", err) 235 - return fmt.Errorf("failed to add repo: %w", err) 236 - } 237 - 238 - return nil 239 - } 240 - return nil 241 - } 242 - 243 - func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error { 244 - l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 245 - 246 - l.Info("fetching and adding existing collaborators") 247 - 248 - ident, err := s.res.ResolveIdent(ctx, repo.Authority().String()) 249 - if err != nil || ident.Handle.IsInvalidHandle() { 250 - return fmt.Errorf("failed to resolve handle: %w", err) 251 - } 252 - 253 - xrpcc := xrpc.Client{ 254 - Host: ident.PDSEndpoint(), 255 - } 256 - 257 - resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false) 258 - if err != nil { 259 - return err 260 - } 261 - 262 - var errs error 263 - for _, r := range resp.Records { 264 - if r == nil { 265 - continue 266 - } 267 - record := r.Value.Val.(*tangled.RepoCollaborator) 268 - 269 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 270 - l.Error("failed to add repo to enforcer", "error", err) 271 - errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 272 - } 273 - } 274 - 275 - return errs 276 - }
+14 -30
spindle/server.go
··· 9 9 "maps" 10 10 "net/http" 11 11 12 + "github.com/bluesky-social/indigo/atproto/syntax" 12 13 "github.com/go-chi/chi/v5" 13 14 "tangled.org/core/api/tangled" 14 15 "tangled.org/core/eventconsumer" 15 16 "tangled.org/core/eventconsumer/cursor" 16 17 "tangled.org/core/idresolver" 17 - "tangled.org/core/jetstream" 18 18 "tangled.org/core/log" 19 19 "tangled.org/core/notifier" 20 20 "tangled.org/core/rbac2" ··· 26 26 "tangled.org/core/spindle/queue" 27 27 "tangled.org/core/spindle/secrets" 28 28 "tangled.org/core/spindle/xrpc" 29 + "tangled.org/core/tap" 29 30 "tangled.org/core/xrpc/serviceauth" 30 31 ) 31 32 ··· 33 34 var motd []byte 34 35 35 36 type Spindle struct { 36 - jc *jetstream.JetstreamClient 37 + tap *tap.Client 37 38 db *db.DB 38 39 e *rbac2.Enforcer 39 40 l *slog.Logger ··· 90 91 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 91 92 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 92 93 93 - collections := []string{ 94 - tangled.SpindleMemberNSID, 95 - tangled.RepoNSID, 96 - tangled.RepoCollaboratorNSID, 97 - } 98 - jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 99 - if err != nil { 100 - return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 101 - } 102 - jc.AddDid(cfg.Server.Owner.String()) 103 - 104 - // Check if the spindle knows about any Dids; 105 - dids, err := d.GetAllDids() 106 - if err != nil { 107 - return nil, fmt.Errorf("failed to get all dids: %w", err) 108 - } 109 - for _, d := range dids { 110 - jc.AddDid(d) 111 - } 94 + tap := tap.NewClient(cfg.Server.TapUrl, "") 112 95 113 96 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 114 97 115 98 spindle := &Spindle{ 116 - jc: jc, 99 + tap: &tap, 117 100 e: e, 118 101 db: d, 119 102 l: logger, ··· 134 117 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 135 118 if err != nil { 136 119 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 137 - } 138 - 139 - err = jc.StartJetstream(ctx, spindle.ingest()) 140 - if err != nil { 141 - return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 142 120 } 143 121 144 122 // for each incoming sh.tangled.pipeline, we execute ··· 208 186 s.ks.Start(ctx) 209 187 }() 210 188 189 + go func() { 190 + s.l.Info("starting tap stream consumer") 191 + s.tap.Connect(ctx, &tap.SimpleIndexer{ 192 + EventHandler: s.processEvent, 193 + }) 194 + }() 195 + 211 196 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 212 197 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 213 198 } ··· 288 273 } 289 274 290 275 // filter by repos 291 - _, err = s.db.GetRepo( 292 - tpl.TriggerMetadata.Repo.Knot, 293 - tpl.TriggerMetadata.Repo.Did, 276 + _, err = s.db.GetRepoWithName( 277 + syntax.DID(tpl.TriggerMetadata.Repo.Did), 294 278 tpl.TriggerMetadata.Repo.Repo, 295 279 ) 296 280 if err != nil {
+281
spindle/tap.go
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/eventconsumer" 12 + "tangled.org/core/spindle/db" 13 + "tangled.org/core/tap" 14 + ) 15 + 16 + func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 + l := s.l.With("component", "tapIndexer") 18 + 19 + var err error 20 + switch evt.Type { 21 + case tap.EvtRecord: 22 + switch evt.Record.Collection.String() { 23 + case tangled.SpindleMemberNSID: 24 + err = s.processMember(ctx, evt) 25 + case tangled.RepoNSID: 26 + err = s.processRepo(ctx, evt) 27 + case tangled.RepoCollaboratorNSID: 28 + err = s.processCollaborator(ctx, evt) 29 + case tangled.RepoPullNSID: 30 + err = s.processPull(ctx, evt) 31 + } 32 + case tap.EvtIdentity: 33 + // no-op 34 + } 35 + 36 + if err != nil { 37 + l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 38 + return err 39 + } 40 + return nil 41 + } 42 + 43 + // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 + 45 + func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 + 48 + l.Info("processing spindle.member record") 49 + 50 + // check perms for this user 51 + if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 52 + l.Warn("forbidden request", "did", evt.Record.Did, "error", err) 53 + return nil 54 + } 55 + 56 + switch evt.Record.Action { 57 + case tap.RecordCreateAction, tap.RecordUpdateAction: 58 + record := tangled.SpindleMember{} 59 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 + return fmt.Errorf("parsing record: %w", err) 61 + } 62 + 63 + domain := s.cfg.Server.Hostname 64 + if record.Instance != domain { 65 + l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 66 + return nil 67 + } 68 + 69 + created, err := time.Parse(record.CreatedAt, time.RFC3339) 70 + if err != nil { 71 + created = time.Now() 72 + } 73 + if err := db.AddSpindleMember(s.db, db.SpindleMember{ 74 + Did: evt.Record.Did, 75 + Rkey: evt.Record.Rkey.String(), 76 + Instance: record.Instance, 77 + Subject: syntax.DID(record.Subject), 78 + Created: created, 79 + }); err != nil { 80 + l.Error("failed to add member", "error", err) 81 + return fmt.Errorf("adding member to db: %w", err) 82 + } 83 + if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 84 + return fmt.Errorf("adding member to rbac: %w", err) 85 + } 86 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 + return fmt.Errorf("adding did to tap", err) 88 + } 89 + 90 + l.Info("added member", "member", record.Subject) 91 + return nil 92 + 93 + case tap.RecordDeleteAction: 94 + var ( 95 + did = evt.Record.Did.String() 96 + rkey = evt.Record.Rkey.String() 97 + ) 98 + member, err := db.GetSpindleMember(s.db, did, rkey) 99 + if err != nil { 100 + return fmt.Errorf("finding member: %w", err) 101 + } 102 + 103 + if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 104 + return fmt.Errorf("removing member from db: %w", err) 105 + } 106 + if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 107 + return fmt.Errorf("removing member from rbac: %w", err) 108 + } 109 + if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 + return fmt.Errorf("removing did from tap: %w", err) 111 + } 112 + 113 + l.Info("removed member", "member", member.Subject) 114 + return nil 115 + } 116 + return nil 117 + } 118 + 119 + func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 + 122 + l.Info("processing collaborator record") 123 + switch evt.Record.Action { 124 + case tap.RecordCreateAction, tap.RecordUpdateAction: 125 + record := tangled.RepoCollaborator{} 126 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 127 + l.Error("invalid record", "err", err) 128 + return fmt.Errorf("parsing record: %w", err) 129 + } 130 + 131 + // check perms for this user 132 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 133 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 134 + return nil 135 + } 136 + 137 + if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 138 + Did: evt.Record.Did, 139 + Rkey: evt.Record.Rkey, 140 + Repo: syntax.ATURI(record.Repo), 141 + Subject: syntax.DID(record.Subject), 142 + }); err != nil { 143 + return fmt.Errorf("adding collaborator to db: %w", err) 144 + } 145 + if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 146 + return fmt.Errorf("adding collaborator to rbac: %w", err) 147 + } 148 + if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 149 + return fmt.Errorf("adding did to tap: %w", err) 150 + } 151 + 152 + l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 153 + return nil 154 + 155 + case tap.RecordDeleteAction: 156 + // get existing collaborator 157 + collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 158 + if err != nil { 159 + return fmt.Errorf("failed to get existing collaborator info: %w", err) 160 + } 161 + 162 + // check perms for this user 163 + if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 164 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 165 + return nil 166 + } 167 + 168 + if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 169 + return fmt.Errorf("removing collaborator from db: %w", err) 170 + } 171 + if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 172 + return fmt.Errorf("removing collaborator from rbac: %w", err) 173 + } 174 + if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 175 + return fmt.Errorf("removing did from tap: %w", err) 176 + } 177 + 178 + l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 179 + return nil 180 + } 181 + return nil 182 + } 183 + 184 + func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 185 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 186 + 187 + l.Info("processing repo record") 188 + 189 + // check perms for this user 190 + if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 191 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 192 + return nil 193 + } 194 + 195 + switch evt.Record.Action { 196 + case tap.RecordCreateAction, tap.RecordUpdateAction: 197 + record := tangled.Repo{} 198 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 199 + return fmt.Errorf("parsing record: %w", err) 200 + } 201 + 202 + domain := s.cfg.Server.Hostname 203 + if record.Spindle == nil || *record.Spindle != domain { 204 + if record.Spindle == nil { 205 + l.Info("spindle isn't configured", "name", record.Name) 206 + } else { 207 + l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 208 + } 209 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 210 + return fmt.Errorf("deleting repo from db: %w", err) 211 + } 212 + return nil 213 + } 214 + 215 + if err := s.db.PutRepo(&db.Repo{ 216 + Did: evt.Record.Did, 217 + Rkey: evt.Record.Rkey, 218 + Name: record.Name, 219 + Knot: record.Knot, 220 + }); err != nil { 221 + return fmt.Errorf("adding repo to db: %w", err) 222 + } 223 + 224 + if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 225 + return fmt.Errorf("adding repo to rbac") 226 + } 227 + 228 + // add this knot to the event consumer 229 + src := eventconsumer.NewKnotSource(record.Knot) 230 + s.ks.AddSource(context.Background(), src) 231 + 232 + l.Info("added repo", "repo", evt.Record.AtUri()) 233 + return nil 234 + 235 + case tap.RecordDeleteAction: 236 + // check perms for this user 237 + if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 238 + l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 239 + return nil 240 + } 241 + 242 + if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 243 + return fmt.Errorf("deleting repo from db: %w", err) 244 + } 245 + 246 + if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 247 + return fmt.Errorf("deleting repo from rbac: %w", err) 248 + } 249 + 250 + l.Info("deleted repo", "repo", evt.Record.AtUri()) 251 + return nil 252 + } 253 + return nil 254 + } 255 + 256 + func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 257 + l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 258 + 259 + l.Info("processing pull record") 260 + 261 + switch evt.Record.Action { 262 + case tap.RecordCreateAction, tap.RecordUpdateAction: 263 + // TODO 264 + case tap.RecordDeleteAction: 265 + // TODO 266 + } 267 + return nil 268 + } 269 + 270 + func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 271 + known, err := s.db.IsKnownDid(syntax.DID(did)) 272 + if err != nil { 273 + return fmt.Errorf("ensuring did known state: %w", err) 274 + } 275 + if !known { 276 + if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 277 + return fmt.Errorf("removing did from tap: %w", err) 278 + } 279 + } 280 + return nil 281 + }