ingester: ingest sh.tangled.repo records from jetstream #112

closed
opened by rockorager.dev targeting master

Ingest sh.tangled.repo records off the jetstream. This enables third-party clients to create tangled.sh git repositories on a user's PDS and tangled to pick up the record and create the repo on the knot, and in the appview db.

Changed files
+235 -7
appview
+55
appview/db/repos.go
··· 3 3 import ( 4 4 "database/sql" 5 5 "fmt" 6 + "strings" 6 7 "time" 7 8 8 9 "github.com/bluesky-social/indigo/atproto/syntax" ··· 397 398 398 399 return nil 399 400 } 401 + 402 + // ValidateRepo ensures the repo has a knot specified, a valid name, and does 403 + // not exist on the knot 404 + func ValidateRepo(e Execer, repo *Repo) error { 405 + if repo.Knot == "" { 406 + return fmt.Errorf("missing knot domain") 407 + } 408 + 409 + if err := validateRepoName(repo.Name); err != nil { 410 + return err 411 + } 412 + 413 + existingRepo, _ := GetRepo(e, repo.Did, repo.Name) 414 + if existingRepo != nil { 415 + return fmt.Errorf("A repo by this name already exists on %s", existingRepo.Knot) 416 + } 417 + 418 + return nil 419 + } 420 + 421 + func validateRepoName(name string) error { 422 + if name == "" { 423 + return fmt.Errorf("Repository name cannot be empty") 424 + } 425 + // check for path traversal attempts 426 + if name == "." || name == ".." || 427 + strings.Contains(name, "/") || strings.Contains(name, "\\") { 428 + return fmt.Errorf("Repository name contains invalid path characters") 429 + } 430 + 431 + // check for sequences that could be used for traversal when normalized 432 + if strings.Contains(name, "./") || strings.Contains(name, "../") || 433 + strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 434 + return fmt.Errorf("Repository name contains invalid path sequence") 435 + } 436 + 437 + // then continue with character validation 438 + for _, char := range name { 439 + if !((char >= 'a' && char <= 'z') || 440 + (char >= 'A' && char <= 'Z') || 441 + (char >= '0' && char <= '9') || 442 + char == '-' || char == '_' || char == '.') { 443 + return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 444 + } 445 + } 446 + 447 + // additional check to prevent multiple sequential dots 448 + if strings.Contains(name, "..") { 449 + return fmt.Errorf("Repository name cannot contain sequential dots") 450 + } 451 + 452 + // if all checks pass 453 + return nil 454 + }
+178 -1
appview/ingester.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "log" 8 + "net/http" 8 9 "time" 9 10 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 11 12 "github.com/bluesky-social/jetstream/pkg/models" 13 + securejoin "github.com/cyphar/filepath-securejoin" 12 14 "github.com/go-git/go-git/v5/plumbing" 13 15 "github.com/ipfs/go-cid" 14 16 "tangled.sh/tangled.sh/core/api/tangled" 15 17 "tangled.sh/tangled.sh/core/appview/db" 18 + "tangled.sh/tangled.sh/core/appview/knotclient" 16 19 "tangled.sh/tangled.sh/core/rbac" 17 20 ) 18 21 19 22 type Ingester func(ctx context.Context, e *models.Event) error 20 23 21 - func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester { 24 + func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer, dev bool) Ingester { 22 25 return func(ctx context.Context, e *models.Event) error { 23 26 var err error 24 27 defer func() { ··· 44 47 ingestArtifact(&d, e, enforcer) 45 48 case tangled.ActorProfileNSID: 46 49 ingestProfile(&d, e) 50 + case tangled.RepoNSID: 51 + ingestRepo(ctx, &d, e, enforcer, dev) 47 52 } 48 53 49 54 return err ··· 285 290 286 291 return nil 287 292 } 293 + 294 + func ingestRepo( 295 + ctx context.Context, 296 + d *db.DbWrapper, 297 + e *models.Event, 298 + enforcer *rbac.Enforcer, 299 + dev bool, 300 + ) error { 301 + did := e.Did 302 + 303 + raw := json.RawMessage(e.Commit.Record) 304 + record := tangled.Repo{} 305 + if err := json.Unmarshal(raw, &record); err != nil { 306 + return err 307 + } 308 + 309 + switch e.Commit.Operation { 310 + case models.CommitOperationCreate, models.CommitOperationUpdate: 311 + var description string 312 + if record.Description != nil { 313 + description = *record.Description 314 + } 315 + 316 + created, err := time.Parse(time.RFC3339, record.CreatedAt) 317 + if err != nil { 318 + return fmt.Errorf("invalid createdAt format: %q", record.CreatedAt) 319 + } 320 + 321 + repo := &db.Repo{ 322 + Did: did, 323 + Name: record.Name, 324 + Knot: record.Knot, 325 + Rkey: e.Commit.RKey, 326 + Created: created, 327 + AtUri: fmt.Sprintf("at://%s/%s/%s", did, e.Kind, e.Commit.RKey), 328 + Description: description, 329 + } 330 + 331 + if err := db.ValidateRepo(d, repo); err != nil { 332 + return err 333 + } 334 + 335 + ok, err := enforcer.E.Enforce(did, record.Knot, record.Knot, "repo:create") 336 + if err != nil || !ok { 337 + return fmt.Errorf("insufficient permissions to create a repo on this knot") 338 + } 339 + 340 + secret, err := db.GetRegistrationKey(d, record.Knot) 341 + if err != nil { 342 + return err 343 + } 344 + 345 + client, err := knotclient.NewSignedClient(record.Knot, secret, dev) 346 + if err != nil { 347 + return err 348 + } 349 + 350 + // NOTE: The sh.tangled.repo lexicon has no branch field. For 351 + // repos we ingest via the jetstream we always default to "main" 352 + defaultBranch := "main" 353 + 354 + resp, err := client.NewRepo(did, record.Name, defaultBranch) 355 + if err != nil { 356 + return err 357 + } 358 + 359 + switch resp.StatusCode { 360 + case http.StatusConflict: 361 + return fmt.Errorf("repo with that name already exists") 362 + case http.StatusInternalServerError: 363 + return fmt.Errorf("failed to create repo") 364 + case http.StatusNoContent: 365 + // continue 366 + } 367 + 368 + ddb, ok := d.Execer.(*db.DB) 369 + if !ok { 370 + return fmt.Errorf("failed to index profile record, invalid db cast") 371 + } 372 + 373 + tx, err := ddb.BeginTx(ctx, nil) 374 + if err != nil { 375 + return fmt.Errorf("failed to start transaction") 376 + } 377 + 378 + defer func() { 379 + tx.Rollback() 380 + err = enforcer.E.LoadPolicy() 381 + if err != nil { 382 + log.Println("failed to rollback policies") 383 + } 384 + }() 385 + 386 + if err := db.AddRepo(tx, repo); err != nil { 387 + return err 388 + } 389 + 390 + // acls 391 + p, _ := securejoin.SecureJoin(did, record.Name) 392 + if err := enforcer.AddRepo(did, record.Knot, p); err != nil { 393 + return err 394 + } 395 + 396 + if err := tx.Commit(); err != nil { 397 + return err 398 + } 399 + 400 + if err := enforcer.E.SavePolicy(); err != nil { 401 + return err 402 + } 403 + case models.CommitOperationDelete: 404 + secret, err := db.GetRegistrationKey(d, record.Knot) 405 + if err != nil { 406 + return err 407 + } 408 + 409 + client, err := knotclient.NewSignedClient(record.Knot, secret, dev) 410 + if err != nil { 411 + return err 412 + } 413 + 414 + // We don't do anything with the response from the knot. This is 415 + // a fire and forget 416 + _, _ = client.RemoveRepo(did, record.Name) 417 + 418 + ddb, ok := d.Execer.(*db.DB) 419 + if !ok { 420 + return fmt.Errorf("failed to index profile record, invalid db cast") 421 + } 422 + 423 + tx, err := ddb.BeginTx(ctx, nil) 424 + if err != nil { 425 + return fmt.Errorf("failed to start transaction") 426 + } 427 + defer func() { 428 + tx.Rollback() 429 + err = enforcer.E.LoadPolicy() 430 + if err != nil { 431 + log.Println("failed to rollback policies") 432 + } 433 + }() 434 + 435 + p, _ := securejoin.SecureJoin(did, record.Name) 436 + collaborators, err := enforcer.E.GetImplicitUsersForResourceByDomain(p, record.Knot) 437 + if err != nil { 438 + return err 439 + } 440 + 441 + for _, c := range collaborators { 442 + cDid := c[0] 443 + enforcer.RemoveCollaborator(cDid, record.Knot, record.Name) 444 + } 445 + 446 + if err := enforcer.RemoveRepo(did, record.Knot, p); err != nil { 447 + return err 448 + } 449 + 450 + if err := db.RemoveRepo(d, did, record.Name); err != nil { 451 + return err 452 + } 453 + 454 + if err := tx.Commit(); err != nil { 455 + return err 456 + } 457 + 458 + if err := enforcer.E.SavePolicy(); err != nil { 459 + return err 460 + } 461 + } 462 + 463 + return nil 464 + }
+2 -6
appview/state/repo.go
··· 283 283 }, 284 284 }, 285 285 }) 286 - 287 286 if err != nil { 288 287 log.Println("failed to perferom update-description query", err) 289 288 // failed to get record ··· 716 715 } 717 716 718 717 w.Write([]byte(fmt.Sprint("added collaborator: ", collaboratorIdent.Handle.String()))) 719 - 720 718 } 721 719 722 720 func (s *State) DeleteRepo(w http.ResponseWriter, r *http.Request) { ··· 1167 1165 IssueOwnerHandle: issueOwnerIdent.Handle.String(), 1168 1166 DidHandleMap: didHandleMap, 1169 1167 }) 1170 - 1171 1168 } 1172 1169 1173 1170 func (s *State) CloseIssue(w http.ResponseWriter, r *http.Request) { ··· 1223 1220 }, 1224 1221 }, 1225 1222 }) 1226 - 1227 1223 if err != nil { 1228 1224 log.Println("failed to update issue state", err) 1229 1225 s.pages.Notice(w, "issue-action", "Failed to close issue. Try again later.") ··· 1564 1560 return 1565 1561 1566 1562 } 1567 - 1568 1563 } 1569 1564 1570 1565 func (s *State) DeleteIssueComment(w http.ResponseWriter, r *http.Request) { ··· 1938 1933 CreatedAt: createdAt, 1939 1934 Owner: user.Did, 1940 1935 Source: &sourceAt, 1941 - }}, 1936 + }, 1937 + }, 1942 1938 }) 1943 1939 if err != nil { 1944 1940 log.Printf("failed to create record: %s", err)