Monorepo for Tangled tangled.org

appview: refactor ingester, ingest spindle records

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 460e1816 f1a8e659

verified
Changed files
+213 -92
appview
+201 -91
appview/ingester.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 - "errors" 7 6 "fmt" 8 - "io" 9 - "log" 10 - "net/http" 11 - "strings" 7 + "log/slog" 12 8 "time" 13 9 14 10 "github.com/bluesky-social/indigo/atproto/syntax" ··· 16 12 "github.com/go-git/go-git/v5/plumbing" 17 13 "github.com/ipfs/go-cid" 18 14 "tangled.sh/tangled.sh/core/api/tangled" 15 + "tangled.sh/tangled.sh/core/appview/config" 19 16 "tangled.sh/tangled.sh/core/appview/db" 17 + "tangled.sh/tangled.sh/core/appview/idresolver" 18 + "tangled.sh/tangled.sh/core/appview/spindleverify" 20 19 "tangled.sh/tangled.sh/core/rbac" 21 20 ) 22 21 23 - type Ingester func(ctx context.Context, e *models.Event) error 22 + type Ingester struct { 23 + Db db.DbWrapper 24 + Enforcer *rbac.Enforcer 25 + IdResolver *idresolver.Resolver 26 + Config *config.Config 27 + Logger *slog.Logger 28 + } 29 + 30 + type processFunc func(ctx context.Context, e *models.Event) error 24 31 25 - func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester { 32 + func (i *Ingester) Ingest() processFunc { 26 33 return func(ctx context.Context, e *models.Event) error { 27 34 var err error 28 35 defer func() { 29 36 eventTime := e.TimeUS 30 37 lastTimeUs := eventTime + 1 31 - if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 38 + if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 32 39 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 33 40 } 34 41 }() ··· 39 46 40 47 switch e.Commit.Collection { 41 48 case tangled.GraphFollowNSID: 42 - ingestFollow(&d, e) 49 + err = i.ingestFollow(e) 43 50 case tangled.FeedStarNSID: 44 - ingestStar(&d, e) 51 + err = i.ingestStar(e) 45 52 case tangled.PublicKeyNSID: 46 - ingestPublicKey(&d, e) 53 + err = i.ingestPublicKey(e) 47 54 case tangled.RepoArtifactNSID: 48 - ingestArtifact(&d, e, enforcer) 55 + err = i.ingestArtifact(e) 49 56 case tangled.ActorProfileNSID: 50 - ingestProfile(&d, e) 57 + err = i.ingestProfile(e) 51 58 case tangled.SpindleMemberNSID: 52 - ingestSpindleMember(&d, e, enforcer) 59 + err = i.ingestSpindleMember(e) 53 60 case tangled.SpindleNSID: 54 - ingestSpindle(&d, e, true) // TODO: change this to dynamic 61 + err = i.ingestSpindle(e) 62 + } 63 + 64 + if err != nil { 65 + l := i.Logger.With("nsid", e.Commit.Collection) 66 + l.Error("error ingesting record", "err", err) 55 67 } 56 68 57 69 return err 58 70 } 59 71 } 60 72 61 - func ingestStar(d *db.DbWrapper, e *models.Event) error { 73 + func (i *Ingester) ingestStar(e *models.Event) error { 62 74 var err error 63 75 did := e.Did 64 76 77 + l := i.Logger.With("handler", "ingestStar") 78 + l = l.With("nsid", e.Commit.Collection) 79 + 65 80 switch e.Commit.Operation { 66 81 case models.CommitOperationCreate, models.CommitOperationUpdate: 67 82 var subjectUri syntax.ATURI ··· 70 85 record := tangled.FeedStar{} 71 86 err := json.Unmarshal(raw, &record) 72 87 if err != nil { 73 - log.Println("invalid record") 88 + l.Error("invalid record", "err", err) 74 89 return err 75 90 } 76 91 77 92 subjectUri, err = syntax.ParseATURI(record.Subject) 78 93 if err != nil { 79 - log.Println("invalid record") 94 + l.Error("invalid record", "err", err) 80 95 return err 81 96 } 82 - err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 97 + err = db.AddStar(i.Db, did, subjectUri, e.Commit.RKey) 83 98 case models.CommitOperationDelete: 84 - err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 99 + err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 85 100 } 86 101 87 102 if err != nil { ··· 91 106 return nil 92 107 } 93 108 94 - func ingestFollow(d *db.DbWrapper, e *models.Event) error { 109 + func (i *Ingester) ingestFollow(e *models.Event) error { 95 110 var err error 96 111 did := e.Did 112 + 113 + l := i.Logger.With("handler", "ingestFollow") 114 + l = l.With("nsid", e.Commit.Collection) 97 115 98 116 switch e.Commit.Operation { 99 117 case models.CommitOperationCreate, models.CommitOperationUpdate: ··· 101 119 record := tangled.GraphFollow{} 102 120 err = json.Unmarshal(raw, &record) 103 121 if err != nil { 104 - log.Println("invalid record") 122 + l.Error("invalid record", "err", err) 105 123 return err 106 124 } 107 125 108 126 subjectDid := record.Subject 109 - err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 127 + err = db.AddFollow(i.Db, did, subjectDid, e.Commit.RKey) 110 128 case models.CommitOperationDelete: 111 - err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 129 + err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 112 130 } 113 131 114 132 if err != nil { ··· 118 136 return nil 119 137 } 120 138 121 - func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 139 + func (i *Ingester) ingestPublicKey(e *models.Event) error { 122 140 did := e.Did 123 141 var err error 124 142 143 + l := i.Logger.With("handler", "ingestPublicKey") 144 + l = l.With("nsid", e.Commit.Collection) 145 + 125 146 switch e.Commit.Operation { 126 147 case models.CommitOperationCreate, models.CommitOperationUpdate: 127 - log.Println("processing add of pubkey") 148 + l.Debug("processing add of pubkey") 128 149 raw := json.RawMessage(e.Commit.Record) 129 150 record := tangled.PublicKey{} 130 151 err = json.Unmarshal(raw, &record) 131 152 if err != nil { 132 - log.Printf("invalid record: %s", err) 153 + l.Error("invalid record", "err", err) 133 154 return err 134 155 } 135 156 136 157 name := record.Name 137 158 key := record.Key 138 - err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 159 + err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 139 160 case models.CommitOperationDelete: 140 - log.Println("processing delete of pubkey") 141 - err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 161 + l.Debug("processing delete of pubkey") 162 + err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 142 163 } 143 164 144 165 if err != nil { ··· 148 169 return nil 149 170 } 150 171 151 - func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 172 + func (i *Ingester) ingestArtifact(e *models.Event) error { 152 173 did := e.Did 153 174 var err error 154 175 176 + l := i.Logger.With("handler", "ingestArtifact") 177 + l = l.With("nsid", e.Commit.Collection) 178 + 155 179 switch e.Commit.Operation { 156 180 case models.CommitOperationCreate, models.CommitOperationUpdate: 157 181 raw := json.RawMessage(e.Commit.Record) 158 182 record := tangled.RepoArtifact{} 159 183 err = json.Unmarshal(raw, &record) 160 184 if err != nil { 161 - log.Printf("invalid record: %s", err) 185 + l.Error("invalid record", "err", err) 162 186 return err 163 187 } 164 188 ··· 167 191 return err 168 192 } 169 193 170 - repo, err := db.GetRepoByAtUri(d, repoAt.String()) 194 + repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 171 195 if err != nil { 172 196 return err 173 197 } 174 198 175 - ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 199 + ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 176 200 if err != nil || !ok { 177 201 return err 178 202 } ··· 194 218 MimeType: record.Artifact.MimeType, 195 219 } 196 220 197 - err = db.AddArtifact(d, artifact) 221 + err = db.AddArtifact(i.Db, artifact) 198 222 case models.CommitOperationDelete: 199 - err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 223 + err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 200 224 } 201 225 202 226 if err != nil { ··· 206 230 return nil 207 231 } 208 232 209 - func ingestProfile(d *db.DbWrapper, e *models.Event) error { 233 + func (i *Ingester) ingestProfile(e *models.Event) error { 210 234 did := e.Did 211 235 var err error 236 + 237 + l := i.Logger.With("handler", "ingestProfile") 238 + l = l.With("nsid", e.Commit.Collection) 212 239 213 240 if e.Commit.RKey != "self" { 214 241 return fmt.Errorf("ingestProfile only ingests `self` record") ··· 220 247 record := tangled.ActorProfile{} 221 248 err = json.Unmarshal(raw, &record) 222 249 if err != nil { 223 - log.Printf("invalid record: %s", err) 250 + l.Error("invalid record", "err", err) 224 251 return err 225 252 } 226 253 ··· 267 294 PinnedRepos: pinned, 268 295 } 269 296 270 - ddb, ok := d.Execer.(*db.DB) 297 + ddb, ok := i.Db.Execer.(*db.DB) 271 298 if !ok { 272 299 return fmt.Errorf("failed to index profile record, invalid db cast") 273 300 } ··· 284 311 285 312 err = db.UpsertProfile(tx, &profile) 286 313 case models.CommitOperationDelete: 287 - err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 314 + err = db.DeleteArtifact(i.Db, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 288 315 } 289 316 290 317 if err != nil { ··· 294 321 return nil 295 322 } 296 323 297 - func ingestSpindleMember(_ *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 324 + func (i *Ingester) ingestSpindleMember(e *models.Event) error { 298 325 did := e.Did 299 326 var err error 300 327 328 + l := i.Logger.With("handler", "ingestSpindleMember") 329 + l = l.With("nsid", e.Commit.Collection) 330 + 301 331 switch e.Commit.Operation { 302 332 case models.CommitOperationCreate: 303 333 raw := json.RawMessage(e.Commit.Record) 304 334 record := tangled.SpindleMember{} 305 335 err = json.Unmarshal(raw, &record) 306 336 if err != nil { 307 - log.Printf("invalid record: %s", err) 337 + l.Error("invalid record", "err", err) 308 338 return err 309 339 } 310 340 311 341 // only spindle owner can invite to spindles 312 - ok, err := enforcer.IsSpindleInviteAllowed(did, record.Instance) 342 + ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 313 343 if err != nil || !ok { 314 344 return fmt.Errorf("failed to enforce permissions: %w", err) 315 345 } 316 346 317 - err = enforcer.AddSpindleMember(record.Instance, record.Subject) 347 + memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 318 348 if err != nil { 319 - return fmt.Errorf("failed to add member: %w", err) 349 + return err 350 + } 351 + 352 + if memberId.Handle.IsInvalidHandle() { 353 + return err 354 + } 355 + 356 + ddb, ok := i.Db.Execer.(*db.DB) 357 + if !ok { 358 + return fmt.Errorf("failed to index profile record, invalid db cast") 359 + } 360 + 361 + err = db.AddSpindleMember(ddb, db.SpindleMember{ 362 + Did: syntax.DID(did), 363 + Rkey: e.Commit.RKey, 364 + Instance: record.Instance, 365 + Subject: memberId.DID, 366 + }) 367 + if !ok { 368 + return fmt.Errorf("failed to add to db: %w", err) 369 + } 370 + 371 + err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 372 + if err != nil { 373 + return fmt.Errorf("failed to update ACLs: %w", err) 374 + } 375 + case models.CommitOperationDelete: 376 + rkey := e.Commit.RKey 377 + 378 + ddb, ok := i.Db.Execer.(*db.DB) 379 + if !ok { 380 + return fmt.Errorf("failed to index profile record, invalid db cast") 381 + } 382 + 383 + // get record from db first 384 + members, err := db.GetSpindleMembers( 385 + ddb, 386 + db.FilterEq("did", did), 387 + db.FilterEq("rkey", rkey), 388 + ) 389 + if err != nil || len(members) != 1 { 390 + return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 391 + } 392 + member := members[0] 393 + 394 + tx, err := ddb.Begin() 395 + if err != nil { 396 + return fmt.Errorf("failed to start txn: %w", err) 397 + } 398 + 399 + // remove record by rkey && update enforcer 400 + if err = db.RemoveSpindleMember( 401 + tx, 402 + db.FilterEq("did", did), 403 + db.FilterEq("rkey", rkey), 404 + ); err != nil { 405 + return fmt.Errorf("failed to remove from db: %w", err) 406 + } 407 + 408 + // update enforcer 409 + err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 410 + if err != nil { 411 + return fmt.Errorf("failed to update ACLs: %w", err) 412 + } 413 + 414 + if err = tx.Commit(); err != nil { 415 + return fmt.Errorf("failed to commit txn: %w", err) 416 + } 417 + 418 + if err = i.Enforcer.E.SavePolicy(); err != nil { 419 + return fmt.Errorf("failed to save ACLs: %w", err) 320 420 } 321 421 } 322 422 323 423 return nil 324 424 } 325 425 326 - func ingestSpindle(d *db.DbWrapper, e *models.Event, dev bool) error { 426 + func (i *Ingester) ingestSpindle(e *models.Event) error { 327 427 did := e.Did 328 428 var err error 329 429 430 + l := i.Logger.With("handler", "ingestSpindle") 431 + l = l.With("nsid", e.Commit.Collection) 432 + 330 433 switch e.Commit.Operation { 331 434 case models.CommitOperationCreate: 332 435 raw := json.RawMessage(e.Commit.Record) 333 436 record := tangled.Spindle{} 334 437 err = json.Unmarshal(raw, &record) 335 438 if err != nil { 336 - log.Printf("invalid record: %s", err) 439 + l.Error("invalid record", "err", err) 337 440 return err 338 441 } 339 442 340 - // this is a special record whose rkey is the instance of the spindle itself 341 443 instance := e.Commit.RKey 342 444 343 - owner, err := fetchOwner(context.TODO(), instance, dev) 445 + ddb, ok := i.Db.Execer.(*db.DB) 446 + if !ok { 447 + return fmt.Errorf("failed to index profile record, invalid db cast") 448 + } 449 + 450 + err := db.AddSpindle(ddb, db.Spindle{ 451 + Owner: syntax.DID(did), 452 + Instance: instance, 453 + }) 344 454 if err != nil { 345 - log.Printf("failed to verify owner of %s: %s", instance, err) 455 + l.Error("failed to add spindle to db", "err", err, "instance", instance) 346 456 return err 347 457 } 348 458 349 - // verify that the spindle owner points back to this did 350 - if owner != did { 351 - log.Printf("incorrect owner for domain: %s, %s != %s", instance, owner, did) 459 + err = spindleverify.RunVerification(context.Background(), instance, did, i.Config.Core.Dev) 460 + if err != nil { 461 + l.Error("failed to add spindle to db", "err", err, "instance", instance) 352 462 return err 353 463 } 354 464 355 - // mark this spindle as registered 356 - ddb, ok := d.Execer.(*db.DB) 465 + _, err = spindleverify.MarkVerified(ddb, i.Enforcer, instance, did) 466 + if err != nil { 467 + return fmt.Errorf("failed to mark verified: %w", err) 468 + } 469 + 470 + return nil 471 + 472 + case models.CommitOperationDelete: 473 + instance := e.Commit.RKey 474 + 475 + ddb, ok := i.Db.Execer.(*db.DB) 357 476 if !ok { 358 477 return fmt.Errorf("failed to index profile record, invalid db cast") 359 478 } 360 479 361 - _, err = db.VerifySpindle( 362 - ddb, 480 + tx, err := ddb.Begin() 481 + if err != nil { 482 + return err 483 + } 484 + defer func() { 485 + tx.Rollback() 486 + i.Enforcer.E.LoadPolicy() 487 + }() 488 + 489 + err = db.DeleteSpindle( 490 + tx, 363 491 db.FilterEq("owner", did), 364 492 db.FilterEq("instance", instance), 365 493 ) 494 + if err != nil { 495 + return err 496 + } 366 497 367 - return err 368 - } 498 + err = i.Enforcer.RemoveSpindle(instance) 499 + if err != nil { 500 + return err 501 + } 369 502 370 - return nil 371 - } 503 + err = tx.Commit() 504 + if err != nil { 505 + return err 506 + } 372 507 373 - func fetchOwner(ctx context.Context, domain string, dev bool) (string, error) { 374 - scheme := "https" 375 - if dev { 376 - scheme = "http" 377 - } 378 - 379 - url := fmt.Sprintf("%s://%s/owner", scheme, domain) 380 - req, err := http.NewRequest("GET", url, nil) 381 - if err != nil { 382 - return "", err 383 - } 384 - 385 - client := &http.Client{ 386 - Timeout: 1 * time.Second, 387 - } 388 - 389 - resp, err := client.Do(req.WithContext(ctx)) 390 - if err != nil || resp.StatusCode != 200 { 391 - return "", errors.New("failed to fetch /owner") 392 - } 393 - 394 - body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) // read atmost 1kb of data 395 - if err != nil { 396 - return "", fmt.Errorf("failed to read /owner response: %w", err) 397 - } 398 - 399 - did := strings.TrimSpace(string(body)) 400 - if did == "" { 401 - return "", errors.New("empty DID in /owner response") 508 + err = i.Enforcer.E.SavePolicy() 509 + if err != nil { 510 + return err 511 + } 402 512 } 403 513 404 - return did, nil 514 + return nil 405 515 }
+12 -1
appview/state/state.go
··· 31 31 "tangled.sh/tangled.sh/core/eventconsumer" 32 32 "tangled.sh/tangled.sh/core/jetstream" 33 33 "tangled.sh/tangled.sh/core/knotclient" 34 + tlog "tangled.sh/tangled.sh/core/log" 34 35 "tangled.sh/tangled.sh/core/rbac" 35 36 ) 36 37 ··· 93 94 tangled.PublicKeyNSID, 94 95 tangled.RepoArtifactNSID, 95 96 tangled.ActorProfileNSID, 97 + tangled.SpindleMemberNSID, 98 + tangled.SpindleNSID, 96 99 }, 97 100 nil, 98 101 slog.Default(), ··· 106 109 if err != nil { 107 110 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 108 111 } 109 - err = jc.StartJetstream(ctx, appview.Ingest(wrapper, enforcer)) 112 + 113 + ingester := appview.Ingester{ 114 + Db: wrapper, 115 + Enforcer: enforcer, 116 + IdResolver: res, 117 + Config: config, 118 + Logger: tlog.New("ingester"), 119 + } 120 + err = jc.StartJetstream(ctx, ingester.Ingest()) 110 121 if err != nil { 111 122 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 112 123 }