Monorepo for Tangled tangled.org
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/spindle/db" 13 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 "github.com/bluesky-social/jetstream/pkg/models" 18) 19 20type Ingester func(ctx context.Context, e *models.Event) error 21 22func (s *Spindle) ingest() Ingester { 23 return func(ctx context.Context, e *models.Event) error { 24 var err error 25 defer func() { 26 eventTime := e.TimeUS 27 lastTimeUs := eventTime + 1 28 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 29 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 30 } 31 }() 32 33 if e.Kind != models.EventKindCommit { 34 return nil 35 } 36 37 switch e.Commit.Collection { 38 case tangled.SpindleMemberNSID: 39 err = s.ingestMember(ctx, e) 40 case tangled.RepoNSID: 41 err = s.ingestRepo(ctx, e) 42 case tangled.RepoCollaboratorNSID: 43 err = s.ingestCollaborator(ctx, e) 44 } 45 46 if err != nil { 47 s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 48 } 49 50 return nil 51 } 52} 53 54func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 55 var err error 56 did := e.Did 57 rkey := e.Commit.RKey 58 59 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 60 61 switch e.Commit.Operation { 62 case models.CommitOperationCreate, models.CommitOperationUpdate: 63 raw := e.Commit.Record 64 record := tangled.SpindleMember{} 65 err = json.Unmarshal(raw, &record) 66 if err != nil { 67 l.Error("invalid record", "error", err) 68 return err 69 } 70 71 domain := s.cfg.Server.Hostname 72 recordInstance := record.Instance 73 74 if recordInstance != domain { 75 l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 76 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 77 } 78 79 ok, err := s.e.IsSpindleMemberInviteAllowed(syntax.DID(did), s.cfg.Server.Did()) 80 if err != nil || !ok { 81 l.Error("failed to add member", "did", did, "error", err) 82 return fmt.Errorf("failed to enforce permissions: %w", err) 83 } 84 85 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 86 Did: syntax.DID(did), 87 Rkey: rkey, 88 Instance: recordInstance, 89 Subject: syntax.DID(record.Subject), 90 Created: time.Now(), 91 }); err != nil { 92 l.Error("failed to add member", "error", err) 93 return fmt.Errorf("failed to add member: %w", err) 94 } 95 96 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 97 l.Error("failed to add member", "error", err) 98 return fmt.Errorf("failed to add member: %w", err) 99 } 100 l.Info("added member from firehose", "member", record.Subject) 101 102 if err := s.db.AddDid(record.Subject); err != nil { 103 l.Error("failed to add did", "error", err) 104 return fmt.Errorf("failed to add did: %w", err) 105 } 106 s.jc.AddDid(record.Subject) 107 108 return nil 109 110 case models.CommitOperationDelete: 111 record, err := db.GetSpindleMember(s.db, did, rkey) 112 if err != nil { 113 l.Error("failed to find member", "error", err) 114 return fmt.Errorf("failed to find member: %w", err) 115 } 116 117 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 118 l.Error("failed to remove member", "error", err) 119 return fmt.Errorf("failed to remove member: %w", err) 120 } 121 122 if err := s.e.RemoveSpindleMember(record.Subject, s.cfg.Server.Did()); err != nil { 123 l.Error("failed to add member", "error", err) 124 return fmt.Errorf("failed to add member: %w", err) 125 } 126 l.Info("added member from firehose", "member", record.Subject) 127 128 if err := s.db.RemoveDid(record.Subject.String()); err != nil { 129 l.Error("failed to add did", "error", err) 130 return fmt.Errorf("failed to add did: %w", err) 131 } 132 s.jc.RemoveDid(record.Subject.String()) 133 134 } 135 return nil 136} 137 138func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 139 var err error 140 did := e.Did 141 142 l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 143 144 l.Info("ingesting repo record", "did", did) 145 146 switch e.Commit.Operation { 147 case models.CommitOperationCreate, models.CommitOperationUpdate: 148 raw := e.Commit.Record 149 record := tangled.Repo{} 150 err = json.Unmarshal(raw, &record) 151 if err != nil { 152 l.Error("invalid record", "error", err) 153 return err 154 } 155 156 domain := s.cfg.Server.Hostname 157 158 // no spindle configured for this repo 159 if record.Spindle == nil { 160 l.Info("no spindle configured", "name", record.Name) 161 return nil 162 } 163 164 // this repo did not want this spindle 165 if *record.Spindle != domain { 166 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 167 return nil 168 } 169 170 // add this repo to the watch list 171 if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 172 l.Error("failed to add repo", "error", err) 173 return fmt.Errorf("failed to add repo: %w", err) 174 } 175 176 repoAt := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, e.Commit.Collection, e.Commit.RKey)) 177 178 // add repo to rbac 179 if err := s.e.AddRepo(repoAt); err != nil { 180 l.Error("failed to add repo to enforcer", "error", err) 181 return fmt.Errorf("failed to add repo: %w", err) 182 } 183 184 // add collaborators to rbac 185 if err := s.fetchAndAddCollaborators(ctx, repoAt); err != nil { 186 return err 187 } 188 189 // add this knot to the event consumer 190 src := eventconsumer.NewKnotSource(record.Knot) 191 s.ks.AddSource(context.Background(), src) 192 193 return nil 194 195 } 196 return nil 197} 198 199func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 200 var err error 201 202 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 203 204 l.Info("ingesting collaborator record") 205 206 switch e.Commit.Operation { 207 case models.CommitOperationCreate, models.CommitOperationUpdate: 208 raw := e.Commit.Record 209 record := tangled.RepoCollaborator{} 210 err = json.Unmarshal(raw, &record) 211 if err != nil { 212 l.Error("invalid record", "error", err) 213 return err 214 } 215 216 subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 217 if err != nil || subjectId.Handle.IsInvalidHandle() { 218 return err 219 } 220 221 repoAt, err := syntax.ParseATURI(record.Repo) 222 if err != nil { 223 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 224 return nil 225 } 226 227 // check perms for this user 228 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(syntax.DID(e.Did), repoAt); !ok || err != nil { 229 return fmt.Errorf("insufficient permissions: %w", err) 230 } 231 232 // add collaborator to rbac 233 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), repoAt); err != nil { 234 l.Error("failed to add repo to enforcer", "error", err) 235 return fmt.Errorf("failed to add repo: %w", err) 236 } 237 238 return nil 239 } 240 return nil 241} 242 243func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, repo syntax.ATURI) error { 244 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 245 246 l.Info("fetching and adding existing collaborators") 247 248 ident, err := s.res.ResolveIdent(ctx, repo.Authority().String()) 249 if err != nil || ident.Handle.IsInvalidHandle() { 250 return fmt.Errorf("failed to resolve handle: %w", err) 251 } 252 253 xrpcc := xrpc.Client{ 254 Host: ident.PDSEndpoint(), 255 } 256 257 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, ident.DID.String(), false) 258 if err != nil { 259 return err 260 } 261 262 var errs error 263 for _, r := range resp.Records { 264 if r == nil { 265 continue 266 } 267 record := r.Value.Val.(*tangled.RepoCollaborator) 268 269 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 270 l.Error("failed to add repo to enforcer", "error", err) 271 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 272 } 273 } 274 275 return errs 276}