Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 1293 lines 32 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/appview/cloudflare" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 "tangled.org/core/appview/oauth" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/reporesolver" 25 "tangled.org/core/appview/validator" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/ogre" 30 "tangled.org/core/orm" 31 "tangled.org/core/rbac" 32 "tangled.org/core/tid" 33 "tangled.org/core/xrpc/serviceauth" 34 35 comatproto "github.com/bluesky-social/indigo/api/atproto" 36 "github.com/bluesky-social/indigo/atproto/atclient" 37 "github.com/bluesky-social/indigo/atproto/syntax" 38 lexutil "github.com/bluesky-social/indigo/lex/util" 39 40 "github.com/go-chi/chi/v5" 41) 42 43type Repo struct { 44 repoResolver *reporesolver.RepoResolver 45 idResolver *idresolver.Resolver 46 config *config.Config 47 oauth *oauth.OAuth 48 pages *pages.Pages 49 spindlestream *eventconsumer.Consumer 50 db *db.DB 51 enforcer *rbac.Enforcer 52 notifier notify.Notifier 53 logger *slog.Logger 54 serviceAuth *serviceauth.ServiceAuth 55 validator *validator.Validator 56 cfClient *cloudflare.Client 57 ogreClient *ogre.Client 58} 59 60func New( 61 oauth *oauth.OAuth, 62 repoResolver *reporesolver.RepoResolver, 63 pages *pages.Pages, 64 spindlestream *eventconsumer.Consumer, 65 idResolver *idresolver.Resolver, 66 db *db.DB, 67 config *config.Config, 68 notifier notify.Notifier, 69 enforcer *rbac.Enforcer, 70 logger *slog.Logger, 71 validator *validator.Validator, 72 cfClient *cloudflare.Client, 73) *Repo { 74 return &Repo{ 75 oauth: oauth, 76 repoResolver: repoResolver, 77 pages: pages, 78 idResolver: idResolver, 79 config: config, 80 spindlestream: spindlestream, 81 db: db, 82 notifier: notifier, 83 enforcer: enforcer, 84 logger: logger, 85 validator: validator, 86 cfClient: cfClient, 87 ogreClient: ogre.NewClient(config.Ogre.Host), 88 } 89} 90 91// modify the spindle configured for this repo 92func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 93 user := rp.oauth.GetMultiAccountUser(r) 94 l := rp.logger.With("handler", "EditSpindle") 95 l = l.With("did", user.Active.Did) 96 97 errorId := "operation-error" 98 fail := func(msg string, err error) { 99 l.Error(msg, "err", err) 100 rp.pages.Notice(w, errorId, msg) 101 } 102 103 f, err := rp.repoResolver.Resolve(r) 104 if err != nil { 105 fail("Failed to resolve repo. Try again later", err) 106 return 107 } 108 109 newSpindle := r.FormValue("spindle") 110 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 111 client, err := rp.oauth.AuthorizedClient(r) 112 if err != nil { 113 fail("Failed to authorize. Try again later.", err) 114 return 115 } 116 117 if !removingSpindle { 118 // ensure that this is a valid spindle for this user 119 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did) 120 if err != nil { 121 fail("Failed to find spindles. Try again later.", err) 122 return 123 } 124 125 if !slices.Contains(validSpindles, newSpindle) { 126 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 127 return 128 } 129 } 130 131 newRepo := *f 132 newRepo.Spindle = newSpindle 133 record := newRepo.AsRecord() 134 135 spindlePtr := &newSpindle 136 if removingSpindle { 137 spindlePtr = nil 138 newRepo.Spindle = "" 139 } 140 141 // optimistic update 142 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 143 if err != nil { 144 fail("Failed to update spindle. Try again later.", err) 145 return 146 } 147 148 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 149 if err != nil { 150 fail("Failed to update spindle, no record found on PDS.", err) 151 return 152 } 153 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 154 Collection: tangled.RepoNSID, 155 Repo: newRepo.Did, 156 Rkey: newRepo.Rkey, 157 SwapRecord: ex.Cid, 158 Record: &lexutil.LexiconTypeDecoder{ 159 Val: &record, 160 }, 161 }) 162 163 if err != nil { 164 fail("Failed to update spindle, unable to save to PDS.", err) 165 return 166 } 167 168 if !removingSpindle { 169 // add this spindle to spindle stream 170 rp.spindlestream.AddSource( 171 context.Background(), 172 eventconsumer.NewSpindleSource(newSpindle), 173 ) 174 } 175 176 rp.pages.HxRefresh(w) 177} 178 179func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 180 user := rp.oauth.GetMultiAccountUser(r) 181 l := rp.logger.With("handler", "AddLabel") 182 l = l.With("did", user.Active.Did) 183 184 f, err := rp.repoResolver.Resolve(r) 185 if err != nil { 186 l.Error("failed to get repo and knot", "err", err) 187 return 188 } 189 190 errorId := "add-label-error" 191 fail := func(msg string, err error) { 192 l.Error(msg, "err", err) 193 rp.pages.Notice(w, errorId, msg) 194 } 195 196 // get form values for label definition 197 name := r.FormValue("name") 198 concreteType := r.FormValue("valueType") 199 valueFormat := r.FormValue("valueFormat") 200 enumValues := r.FormValue("enumValues") 201 scope := r.Form["scope"] 202 color := r.FormValue("color") 203 multiple := r.FormValue("multiple") == "true" 204 205 var variants []string 206 for part := range strings.SplitSeq(enumValues, ",") { 207 if part = strings.TrimSpace(part); part != "" { 208 variants = append(variants, part) 209 } 210 } 211 212 if concreteType == "" { 213 concreteType = "null" 214 } 215 216 format := models.ValueTypeFormatAny 217 if valueFormat == "did" { 218 format = models.ValueTypeFormatDid 219 } 220 221 valueType := models.ValueType{ 222 Type: models.ConcreteType(concreteType), 223 Format: format, 224 Enum: variants, 225 } 226 227 label := models.LabelDefinition{ 228 Did: user.Active.Did, 229 Rkey: tid.TID(), 230 Name: name, 231 ValueType: valueType, 232 Scope: scope, 233 Color: &color, 234 Multiple: multiple, 235 Created: time.Now(), 236 } 237 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 238 fail(err.Error(), err) 239 return 240 } 241 242 // announce this relation into the firehose, store into owners' pds 243 client, err := rp.oauth.AuthorizedClient(r) 244 if err != nil { 245 fail(err.Error(), err) 246 return 247 } 248 249 // emit a labelRecord 250 labelRecord := label.AsRecord() 251 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 252 Collection: tangled.LabelDefinitionNSID, 253 Repo: label.Did, 254 Rkey: label.Rkey, 255 Record: &lexutil.LexiconTypeDecoder{ 256 Val: &labelRecord, 257 }, 258 }) 259 // invalid record 260 if err != nil { 261 fail("Failed to write record to PDS.", err) 262 return 263 } 264 265 aturi := resp.Uri 266 l = l.With("at-uri", aturi) 267 l.Info("wrote label record to PDS") 268 269 // update the repo to subscribe to this label 270 newRepo := *f 271 newRepo.Labels = append(newRepo.Labels, aturi) 272 repoRecord := newRepo.AsRecord() 273 274 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 275 if err != nil { 276 fail("Failed to update labels, no record found on PDS.", err) 277 return 278 } 279 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 280 Collection: tangled.RepoNSID, 281 Repo: newRepo.Did, 282 Rkey: newRepo.Rkey, 283 SwapRecord: ex.Cid, 284 Record: &lexutil.LexiconTypeDecoder{ 285 Val: &repoRecord, 286 }, 287 }) 288 if err != nil { 289 fail("Failed to update labels for repo.", err) 290 return 291 } 292 293 tx, err := rp.db.BeginTx(r.Context(), nil) 294 if err != nil { 295 fail("Failed to add label.", err) 296 return 297 } 298 299 rollback := func() { 300 err1 := tx.Rollback() 301 err2 := rollbackRecord(context.Background(), aturi, client) 302 303 // ignore txn complete errors, this is okay 304 if errors.Is(err1, sql.ErrTxDone) { 305 err1 = nil 306 } 307 308 if errs := errors.Join(err1, err2); errs != nil { 309 l.Error("failed to rollback changes", "errs", errs) 310 return 311 } 312 } 313 defer rollback() 314 315 _, err = db.AddLabelDefinition(tx, &label) 316 if err != nil { 317 fail("Failed to add label.", err) 318 return 319 } 320 321 if err = db.SubscribeLabel(tx, &models.RepoLabel{ 322 RepoAt: f.RepoAt(), 323 LabelAt: label.AtUri(), 324 }); err != nil { 325 fail("Failed to subscribe to label.", err) 326 return 327 } 328 329 err = tx.Commit() 330 if err != nil { 331 fail("Failed to add label.", err) 332 return 333 } 334 335 // clear aturi when everything is successful 336 aturi = "" 337 338 rp.pages.HxRefresh(w) 339} 340 341func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 342 user := rp.oauth.GetMultiAccountUser(r) 343 l := rp.logger.With("handler", "DeleteLabel") 344 l = l.With("did", user.Active.Did) 345 346 f, err := rp.repoResolver.Resolve(r) 347 if err != nil { 348 l.Error("failed to get repo and knot", "err", err) 349 return 350 } 351 352 errorId := "label-operation" 353 fail := func(msg string, err error) { 354 l.Error(msg, "err", err) 355 rp.pages.Notice(w, errorId, msg) 356 } 357 358 // get form values 359 labelId := r.FormValue("label-id") 360 361 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 362 if err != nil { 363 fail("Failed to find label definition.", err) 364 return 365 } 366 367 client, err := rp.oauth.AuthorizedClient(r) 368 if err != nil { 369 fail(err.Error(), err) 370 return 371 } 372 373 // delete label record from PDS 374 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 375 Collection: tangled.LabelDefinitionNSID, 376 Repo: label.Did, 377 Rkey: label.Rkey, 378 }) 379 if err != nil { 380 fail("Failed to delete label record from PDS.", err) 381 return 382 } 383 384 // update repo record to remove the label reference 385 newRepo := *f 386 var updated []string 387 removedAt := label.AtUri().String() 388 for _, l := range newRepo.Labels { 389 if l != removedAt { 390 updated = append(updated, l) 391 } 392 } 393 newRepo.Labels = updated 394 repoRecord := newRepo.AsRecord() 395 396 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 397 if err != nil { 398 fail("Failed to update labels, no record found on PDS.", err) 399 return 400 } 401 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 402 Collection: tangled.RepoNSID, 403 Repo: newRepo.Did, 404 Rkey: newRepo.Rkey, 405 SwapRecord: ex.Cid, 406 Record: &lexutil.LexiconTypeDecoder{ 407 Val: &repoRecord, 408 }, 409 }) 410 if err != nil { 411 fail("Failed to update repo record.", err) 412 return 413 } 414 415 // transaction for DB changes 416 tx, err := rp.db.BeginTx(r.Context(), nil) 417 if err != nil { 418 fail("Failed to delete label.", err) 419 return 420 } 421 defer tx.Rollback() 422 423 err = db.UnsubscribeLabel( 424 tx, 425 orm.FilterEq("repo_at", f.RepoAt()), 426 orm.FilterEq("label_at", removedAt), 427 ) 428 if err != nil { 429 fail("Failed to unsubscribe label.", err) 430 return 431 } 432 433 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 434 if err != nil { 435 fail("Failed to delete label definition.", err) 436 return 437 } 438 439 err = tx.Commit() 440 if err != nil { 441 fail("Failed to delete label.", err) 442 return 443 } 444 445 // everything succeeded 446 rp.pages.HxRefresh(w) 447} 448 449func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 450 user := rp.oauth.GetMultiAccountUser(r) 451 l := rp.logger.With("handler", "SubscribeLabel") 452 l = l.With("did", user.Active.Did) 453 454 f, err := rp.repoResolver.Resolve(r) 455 if err != nil { 456 l.Error("failed to get repo and knot", "err", err) 457 return 458 } 459 460 if err := r.ParseForm(); err != nil { 461 l.Error("invalid form", "err", err) 462 return 463 } 464 465 errorId := "default-label-operation" 466 fail := func(msg string, err error) { 467 l.Error(msg, "err", err) 468 rp.pages.Notice(w, errorId, msg) 469 } 470 471 labelAts := r.Form["label"] 472 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 473 if err != nil { 474 fail("Failed to subscribe to label.", err) 475 return 476 } 477 478 newRepo := *f 479 newRepo.Labels = append(newRepo.Labels, labelAts...) 480 481 // dedup 482 slices.Sort(newRepo.Labels) 483 newRepo.Labels = slices.Compact(newRepo.Labels) 484 485 repoRecord := newRepo.AsRecord() 486 487 client, err := rp.oauth.AuthorizedClient(r) 488 if err != nil { 489 fail(err.Error(), err) 490 return 491 } 492 493 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 494 if err != nil { 495 fail("Failed to update labels, no record found on PDS.", err) 496 return 497 } 498 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 499 Collection: tangled.RepoNSID, 500 Repo: newRepo.Did, 501 Rkey: newRepo.Rkey, 502 SwapRecord: ex.Cid, 503 Record: &lexutil.LexiconTypeDecoder{ 504 Val: &repoRecord, 505 }, 506 }) 507 508 tx, err := rp.db.Begin() 509 if err != nil { 510 fail("Failed to subscribe to label.", err) 511 return 512 } 513 defer tx.Rollback() 514 515 for _, l := range labelAts { 516 err = db.SubscribeLabel(tx, &models.RepoLabel{ 517 RepoAt: f.RepoAt(), 518 LabelAt: syntax.ATURI(l), 519 }) 520 if err != nil { 521 fail("Failed to subscribe to label.", err) 522 return 523 } 524 } 525 526 if err := tx.Commit(); err != nil { 527 fail("Failed to subscribe to label.", err) 528 return 529 } 530 531 // everything succeeded 532 rp.pages.HxRefresh(w) 533} 534 535func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 536 user := rp.oauth.GetMultiAccountUser(r) 537 l := rp.logger.With("handler", "UnsubscribeLabel") 538 l = l.With("did", user.Active.Did) 539 540 f, err := rp.repoResolver.Resolve(r) 541 if err != nil { 542 l.Error("failed to get repo and knot", "err", err) 543 return 544 } 545 546 if err := r.ParseForm(); err != nil { 547 l.Error("invalid form", "err", err) 548 return 549 } 550 551 errorId := "default-label-operation" 552 fail := func(msg string, err error) { 553 l.Error(msg, "err", err) 554 rp.pages.Notice(w, errorId, msg) 555 } 556 557 labelAts := r.Form["label"] 558 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 559 if err != nil { 560 fail("Failed to unsubscribe to label.", err) 561 return 562 } 563 564 // update repo record to remove the label reference 565 newRepo := *f 566 var updated []string 567 for _, l := range newRepo.Labels { 568 if !slices.Contains(labelAts, l) { 569 updated = append(updated, l) 570 } 571 } 572 newRepo.Labels = updated 573 repoRecord := newRepo.AsRecord() 574 575 client, err := rp.oauth.AuthorizedClient(r) 576 if err != nil { 577 fail(err.Error(), err) 578 return 579 } 580 581 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 582 if err != nil { 583 fail("Failed to update labels, no record found on PDS.", err) 584 return 585 } 586 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 587 Collection: tangled.RepoNSID, 588 Repo: newRepo.Did, 589 Rkey: newRepo.Rkey, 590 SwapRecord: ex.Cid, 591 Record: &lexutil.LexiconTypeDecoder{ 592 Val: &repoRecord, 593 }, 594 }) 595 596 err = db.UnsubscribeLabel( 597 rp.db, 598 orm.FilterEq("repo_at", f.RepoAt()), 599 orm.FilterIn("label_at", labelAts), 600 ) 601 if err != nil { 602 fail("Failed to unsubscribe label.", err) 603 return 604 } 605 606 // everything succeeded 607 rp.pages.HxRefresh(w) 608} 609 610func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 611 l := rp.logger.With("handler", "LabelPanel") 612 613 f, err := rp.repoResolver.Resolve(r) 614 if err != nil { 615 l.Error("failed to get repo and knot", "err", err) 616 return 617 } 618 619 subjectStr := r.FormValue("subject") 620 subject, err := syntax.ParseATURI(subjectStr) 621 if err != nil { 622 l.Error("failed to get repo and knot", "err", err) 623 return 624 } 625 626 labelDefs, err := db.GetLabelDefinitions( 627 rp.db, 628 orm.FilterIn("at_uri", f.Labels), 629 orm.FilterContains("scope", subject.Collection().String()), 630 ) 631 if err != nil { 632 l.Error("failed to fetch label defs", "err", err) 633 return 634 } 635 636 defs := make(map[string]*models.LabelDefinition) 637 for _, l := range labelDefs { 638 defs[l.AtUri().String()] = &l 639 } 640 641 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 642 if err != nil { 643 l.Error("failed to build label state", "err", err) 644 return 645 } 646 state := states[subject] 647 648 user := rp.oauth.GetMultiAccountUser(r) 649 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 650 LoggedInUser: user, 651 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 652 Defs: defs, 653 Subject: subject.String(), 654 State: state, 655 }) 656} 657 658func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 659 l := rp.logger.With("handler", "EditLabelPanel") 660 661 f, err := rp.repoResolver.Resolve(r) 662 if err != nil { 663 l.Error("failed to get repo and knot", "err", err) 664 return 665 } 666 667 subjectStr := r.FormValue("subject") 668 subject, err := syntax.ParseATURI(subjectStr) 669 if err != nil { 670 l.Error("failed to get repo and knot", "err", err) 671 return 672 } 673 674 labelDefs, err := db.GetLabelDefinitions( 675 rp.db, 676 orm.FilterIn("at_uri", f.Labels), 677 orm.FilterContains("scope", subject.Collection().String()), 678 ) 679 if err != nil { 680 l.Error("failed to fetch labels", "err", err) 681 return 682 } 683 684 defs := make(map[string]*models.LabelDefinition) 685 for _, l := range labelDefs { 686 defs[l.AtUri().String()] = &l 687 } 688 689 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 690 if err != nil { 691 l.Error("failed to build label state", "err", err) 692 return 693 } 694 state := states[subject] 695 696 user := rp.oauth.GetMultiAccountUser(r) 697 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 698 LoggedInUser: user, 699 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 700 Defs: defs, 701 Subject: subject.String(), 702 State: state, 703 }) 704} 705 706func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 707 user := rp.oauth.GetMultiAccountUser(r) 708 l := rp.logger.With("handler", "AddCollaborator") 709 l = l.With("did", user.Active.Did) 710 711 f, err := rp.repoResolver.Resolve(r) 712 if err != nil { 713 l.Error("failed to get repo and knot", "err", err) 714 return 715 } 716 717 errorId := "add-collaborator-error" 718 fail := func(msg string, err error) { 719 l.Error(msg, "err", err) 720 rp.pages.Notice(w, errorId, msg) 721 } 722 723 collaborator := r.FormValue("collaborator") 724 if collaborator == "" { 725 fail("Invalid form.", nil) 726 return 727 } 728 729 // remove a single leading `@`, to make @handle work with ResolveIdent 730 collaborator = strings.TrimPrefix(collaborator, "@") 731 732 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 733 if err != nil { 734 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 735 return 736 } 737 738 if collaboratorIdent.DID.String() == user.Active.Did { 739 fail("You seem to be adding yourself as a collaborator.", nil) 740 return 741 } 742 l = l.With("collaborator", collaboratorIdent.Handle) 743 l = l.With("knot", f.Knot) 744 745 // announce this relation into the firehose, store into owners' pds 746 client, err := rp.oauth.AuthorizedClient(r) 747 if err != nil { 748 fail("Failed to write to PDS.", err) 749 return 750 } 751 752 // emit a record 753 currentUser := rp.oauth.GetMultiAccountUser(r) 754 rkey := tid.TID() 755 createdAt := time.Now() 756 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 757 Collection: tangled.RepoCollaboratorNSID, 758 Repo: currentUser.Active.Did, 759 Rkey: rkey, 760 Record: &lexutil.LexiconTypeDecoder{ 761 Val: repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt), 762 }, 763 }) 764 // invalid record 765 if err != nil { 766 fail("Failed to write record to PDS.", err) 767 return 768 } 769 770 aturi := resp.Uri 771 l = l.With("at-uri", aturi) 772 l.Info("wrote record to PDS") 773 774 tx, err := rp.db.BeginTx(r.Context(), nil) 775 if err != nil { 776 fail("Failed to add collaborator.", err) 777 return 778 } 779 780 rollback := func() { 781 err1 := tx.Rollback() 782 err2 := rp.enforcer.E.LoadPolicy() 783 err3 := rollbackRecord(context.Background(), aturi, client) 784 785 // ignore txn complete errors, this is okay 786 if errors.Is(err1, sql.ErrTxDone) { 787 err1 = nil 788 } 789 790 if errs := errors.Join(err1, err2, err3); errs != nil { 791 l.Error("failed to rollback changes", "errs", errs) 792 return 793 } 794 } 795 defer rollback() 796 797 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()) 798 if err != nil { 799 fail("Failed to add collaborator permissions.", err) 800 return 801 } 802 803 err = db.AddCollaborator(tx, models.Collaborator{ 804 Did: syntax.DID(currentUser.Active.Did), 805 Rkey: rkey, 806 SubjectDid: collaboratorIdent.DID, 807 RepoAt: f.RepoAt(), 808 Created: createdAt, 809 }) 810 if err != nil { 811 fail("Failed to add collaborator.", err) 812 return 813 } 814 815 err = tx.Commit() 816 if err != nil { 817 fail("Failed to add collaborator.", err) 818 return 819 } 820 821 err = rp.enforcer.E.SavePolicy() 822 if err != nil { 823 fail("Failed to update collaborator permissions.", err) 824 return 825 } 826 827 // clear aturi to when everything is successful 828 aturi = "" 829 830 rp.pages.HxRefresh(w) 831} 832 833func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 834 user := rp.oauth.GetMultiAccountUser(r) 835 l := rp.logger.With("handler", "DeleteRepo") 836 837 noticeId := "operation-error" 838 f, err := rp.repoResolver.Resolve(r) 839 if err != nil { 840 l.Error("failed to get repo and knot", "err", err) 841 return 842 } 843 844 // remove record from pds 845 atpClient, err := rp.oauth.AuthorizedClient(r) 846 if err != nil { 847 l.Error("failed to get authorized client", "err", err) 848 return 849 } 850 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 851 Collection: tangled.RepoNSID, 852 Repo: user.Active.Did, 853 Rkey: f.Rkey, 854 }) 855 if err != nil { 856 l.Error("failed to delete record", "err", err) 857 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 858 return 859 } 860 l.Info("removed repo record", "aturi", f.RepoAt().String()) 861 862 client, err := rp.oauth.ServiceClient( 863 r, 864 oauth.WithService(f.Knot), 865 oauth.WithLxm(tangled.RepoDeleteNSID), 866 oauth.WithDev(rp.config.Core.Dev), 867 ) 868 if err != nil { 869 l.Error("failed to connect to knot server", "err", err) 870 return 871 } 872 873 err = tangled.RepoDelete( 874 r.Context(), 875 client, 876 &tangled.RepoDelete_Input{ 877 Did: f.Did, 878 Name: f.Name, 879 Rkey: f.Rkey, 880 }, 881 ) 882 if err := xrpcclient.HandleXrpcErr(err); err != nil { 883 rp.pages.Notice(w, noticeId, err.Error()) 884 return 885 } 886 l.Info("deleted repo from knot") 887 888 tx, err := rp.db.BeginTx(r.Context(), nil) 889 if err != nil { 890 l.Error("failed to start tx") 891 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 892 return 893 } 894 defer func() { 895 tx.Rollback() 896 err = rp.enforcer.E.LoadPolicy() 897 if err != nil { 898 l.Error("failed to rollback policies") 899 } 900 }() 901 902 // remove collaborator RBAC 903 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot) 904 if err != nil { 905 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 906 return 907 } 908 for _, c := range repoCollaborators { 909 did := c[0] 910 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier()) 911 } 912 l.Info("removed collaborators") 913 914 // remove repo RBAC 915 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier()) 916 if err != nil { 917 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 918 return 919 } 920 921 // remove repo from db 922 err = db.RemoveRepo(tx, f.Did, f.Name) 923 if err != nil { 924 rp.pages.Notice(w, noticeId, "Failed to update appview") 925 return 926 } 927 l.Info("removed repo from db") 928 929 err = tx.Commit() 930 if err != nil { 931 l.Error("failed to commit changes", "err", err) 932 http.Error(w, err.Error(), http.StatusInternalServerError) 933 return 934 } 935 936 err = rp.enforcer.E.SavePolicy() 937 if err != nil { 938 l.Error("failed to update ACLs", "err", err) 939 http.Error(w, err.Error(), http.StatusInternalServerError) 940 return 941 } 942 943 rp.notifier.DeleteRepo(r.Context(), f) 944 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 945} 946 947func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 948 l := rp.logger.With("handler", "SyncRepoFork") 949 950 ref := chi.URLParam(r, "ref") 951 ref, _ = url.PathUnescape(ref) 952 953 user := rp.oauth.GetMultiAccountUser(r) 954 f, err := rp.repoResolver.Resolve(r) 955 if err != nil { 956 l.Error("failed to resolve source repo", "err", err) 957 return 958 } 959 960 switch r.Method { 961 case http.MethodPost: 962 client, err := rp.oauth.ServiceClient( 963 r, 964 oauth.WithService(f.Knot), 965 oauth.WithLxm(tangled.RepoForkSyncNSID), 966 oauth.WithDev(rp.config.Core.Dev), 967 ) 968 if err != nil { 969 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 970 return 971 } 972 973 if f.Source == "" { 974 rp.pages.Notice(w, "repo", "This repository is not a fork.") 975 return 976 } 977 978 err = tangled.RepoForkSync( 979 r.Context(), 980 client, 981 &tangled.RepoForkSync_Input{ 982 Did: user.Active.Did, 983 Name: f.Name, 984 Source: f.Source, 985 Branch: ref, 986 }, 987 ) 988 if err := xrpcclient.HandleXrpcErr(err); err != nil { 989 rp.pages.Notice(w, "repo", err.Error()) 990 return 991 } 992 993 rp.pages.HxRefresh(w) 994 return 995 } 996} 997 998func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 999 l := rp.logger.With("handler", "ForkRepo") 1000 1001 user := rp.oauth.GetMultiAccountUser(r) 1002 f, err := rp.repoResolver.Resolve(r) 1003 if err != nil { 1004 l.Error("failed to resolve source repo", "err", err) 1005 return 1006 } 1007 1008 switch r.Method { 1009 case http.MethodGet: 1010 user := rp.oauth.GetMultiAccountUser(r) 1011 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did) 1012 if err != nil { 1013 rp.pages.Notice(w, "repo", "Invalid user account.") 1014 return 1015 } 1016 1017 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1018 LoggedInUser: user, 1019 Knots: knots, 1020 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1021 }) 1022 1023 case http.MethodPost: 1024 l := rp.logger.With("handler", "ForkRepo") 1025 1026 targetKnot := r.FormValue("knot") 1027 if targetKnot == "" { 1028 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1029 return 1030 } 1031 l = l.With("targetKnot", targetKnot) 1032 1033 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create") 1034 if err != nil || !ok { 1035 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1036 return 1037 } 1038 1039 // choose a name for a fork 1040 forkName := r.FormValue("repo_name") 1041 if forkName == "" { 1042 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1043 return 1044 } 1045 1046 // this check is *only* to see if the forked repo name already exists 1047 // in the user's account. 1048 existingRepo, err := db.GetRepo( 1049 rp.db, 1050 orm.FilterEq("did", user.Active.Did), 1051 orm.FilterEq("name", forkName), 1052 ) 1053 if err != nil { 1054 if !errors.Is(err, sql.ErrNoRows) { 1055 l.Error("error fetching existing repo from db", "err", err) 1056 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1057 return 1058 } 1059 } else if existingRepo != nil { 1060 // repo with this name already exists 1061 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1062 return 1063 } 1064 l = l.With("forkName", forkName) 1065 1066 uri := "https" 1067 if rp.config.Core.Dev { 1068 uri = "http" 1069 } 1070 1071 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier()) 1072 l = l.With("cloneUrl", forkSourceUrl) 1073 1074 rkey := tid.TID() 1075 1076 // TODO: this could coordinate better with the knot to recieve a clone status 1077 client, err := rp.oauth.ServiceClient( 1078 r, 1079 oauth.WithService(targetKnot), 1080 oauth.WithLxm(tangled.RepoCreateNSID), 1081 oauth.WithDev(rp.config.Core.Dev), 1082 oauth.WithTimeout(time.Second*20), 1083 ) 1084 if err != nil { 1085 l.Error("could not create service client", "err", err) 1086 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1087 return 1088 } 1089 1090 forkInput := &tangled.RepoCreate_Input{ 1091 Rkey: rkey, 1092 Name: forkName, 1093 Source: &forkSourceUrl, 1094 } 1095 createResp, createErr := tangled.RepoCreate( 1096 r.Context(), 1097 client, 1098 forkInput, 1099 ) 1100 if err := xrpcclient.HandleXrpcErr(createErr); err != nil { 1101 rp.pages.Notice(w, "repo", err.Error()) 1102 return 1103 } 1104 1105 var repoDid string 1106 if createResp != nil && createResp.RepoDid != nil { 1107 repoDid = *createResp.RepoDid 1108 } 1109 if repoDid == "" { 1110 l.Error("knot returned empty repo DID for fork") 1111 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 1112 return 1113 } 1114 1115 forkSource := f.RepoAt().String() 1116 if f.RepoDid != "" { 1117 forkSource = f.RepoDid 1118 } 1119 1120 repo := &models.Repo{ 1121 Did: user.Active.Did, 1122 Name: forkName, 1123 Knot: targetKnot, 1124 Rkey: rkey, 1125 Source: forkSource, 1126 Description: f.Description, 1127 Created: time.Now(), 1128 Labels: rp.config.Label.DefaultLabelDefs, 1129 RepoDid: repoDid, 1130 } 1131 record := repo.AsRecord() 1132 1133 cleanupKnot := func() { 1134 go func() { 1135 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 1136 for attempt, delay := range delays { 1137 time.Sleep(delay) 1138 deleteClient, dErr := rp.oauth.ServiceClient( 1139 r, 1140 oauth.WithService(targetKnot), 1141 oauth.WithLxm(tangled.RepoDeleteNSID), 1142 oauth.WithDev(rp.config.Core.Dev), 1143 ) 1144 if dErr != nil { 1145 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 1146 continue 1147 } 1148 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1149 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 1150 Did: user.Active.Did, 1151 Name: forkName, 1152 Rkey: rkey, 1153 }); dErr != nil { 1154 cancel() 1155 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr) 1156 continue 1157 } 1158 cancel() 1159 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1) 1160 return 1161 } 1162 l.Error("exhausted retries for knot cleanup, fork may be orphaned", 1163 "did", user.Active.Did, "fork", forkName, "knot", targetKnot) 1164 }() 1165 } 1166 1167 atpClient, err := rp.oauth.AuthorizedClient(r) 1168 if err != nil { 1169 l.Error("failed to create xrpcclient", "err", err) 1170 cleanupKnot() 1171 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1172 return 1173 } 1174 1175 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1176 Collection: tangled.RepoNSID, 1177 Repo: user.Active.Did, 1178 Rkey: rkey, 1179 Record: &lexutil.LexiconTypeDecoder{ 1180 Val: &record, 1181 }, 1182 }) 1183 if err != nil { 1184 l.Error("failed to write to PDS", "err", err) 1185 cleanupKnot() 1186 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1187 return 1188 } 1189 1190 aturi := atresp.Uri 1191 l = l.With("aturi", aturi) 1192 l.Info("wrote to PDS") 1193 1194 tx, err := rp.db.BeginTx(r.Context(), nil) 1195 if err != nil { 1196 l.Info("txn failed", "err", err) 1197 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1198 return 1199 } 1200 1201 rollback := func() { 1202 err1 := tx.Rollback() 1203 err2 := rp.enforcer.E.LoadPolicy() 1204 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1205 1206 if errors.Is(err1, sql.ErrTxDone) { 1207 err1 = nil 1208 } 1209 1210 if errs := errors.Join(err1, err2, err3); errs != nil { 1211 l.Error("failed to rollback changes", "errs", errs) 1212 } 1213 1214 if aturi != "" { 1215 cleanupKnot() 1216 } 1217 } 1218 defer rollback() 1219 1220 err = db.AddRepo(tx, repo) 1221 if err != nil { 1222 l.Error("failed to AddRepo", "err", err) 1223 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1224 return 1225 } 1226 1227 rbacPath := repo.RepoIdentifier() 1228 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, rbacPath) 1229 if err != nil { 1230 l.Error("failed to add ACLs", "err", err) 1231 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1232 return 1233 } 1234 1235 err = tx.Commit() 1236 if err != nil { 1237 l.Error("failed to commit changes", "err", err) 1238 http.Error(w, err.Error(), http.StatusInternalServerError) 1239 return 1240 } 1241 1242 err = rp.enforcer.E.SavePolicy() 1243 if err != nil { 1244 l.Error("failed to update ACLs", "err", err) 1245 http.Error(w, err.Error(), http.StatusInternalServerError) 1246 return 1247 } 1248 1249 aturi = "" 1250 1251 rp.notifier.NewRepo(r.Context(), repo) 1252 if repoDid != "" { 1253 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 1254 } else { 1255 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1256 } 1257 } 1258} 1259 1260// this is used to rollback changes made to the PDS 1261// 1262// it is a no-op if the provided ATURI is empty 1263func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 1264 if aturi == "" { 1265 return nil 1266 } 1267 1268 parsed := syntax.ATURI(aturi) 1269 1270 collection := parsed.Collection().String() 1271 repo := parsed.Authority().String() 1272 rkey := parsed.RecordKey().String() 1273 1274 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1275 Collection: collection, 1276 Repo: repo, 1277 Rkey: rkey, 1278 }) 1279 return err 1280} 1281 1282func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator { 1283 rec := &tangled.RepoCollaborator{ 1284 Subject: subject, 1285 CreatedAt: createdAt.Format(time.RFC3339), 1286 } 1287 s := string(f.RepoAt()) 1288 rec.Repo = &s 1289 if f.RepoDid != "" { 1290 rec.RepoDid = &f.RepoDid 1291 } 1292 return rec 1293}