forked from
tangled.org/core
Monorepo for Tangled
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 "tangled.org/core/appview/oauth"
23 "tangled.org/core/appview/pages"
24 "tangled.org/core/appview/reporesolver"
25 "tangled.org/core/appview/validator"
26 xrpcclient "tangled.org/core/appview/xrpcclient"
27 "tangled.org/core/eventconsumer"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/ogre"
30 "tangled.org/core/orm"
31 "tangled.org/core/rbac"
32 "tangled.org/core/tid"
33 "tangled.org/core/xrpc/serviceauth"
34
35 comatproto "github.com/bluesky-social/indigo/api/atproto"
36 "github.com/bluesky-social/indigo/atproto/atclient"
37 "github.com/bluesky-social/indigo/atproto/syntax"
38 lexutil "github.com/bluesky-social/indigo/lex/util"
39
40 "github.com/go-chi/chi/v5"
41)
42
43type Repo struct {
44 repoResolver *reporesolver.RepoResolver
45 idResolver *idresolver.Resolver
46 config *config.Config
47 oauth *oauth.OAuth
48 pages *pages.Pages
49 spindlestream *eventconsumer.Consumer
50 db *db.DB
51 enforcer *rbac.Enforcer
52 notifier notify.Notifier
53 logger *slog.Logger
54 serviceAuth *serviceauth.ServiceAuth
55 validator *validator.Validator
56 cfClient *cloudflare.Client
57 ogreClient *ogre.Client
58}
59
60func New(
61 oauth *oauth.OAuth,
62 repoResolver *reporesolver.RepoResolver,
63 pages *pages.Pages,
64 spindlestream *eventconsumer.Consumer,
65 idResolver *idresolver.Resolver,
66 db *db.DB,
67 config *config.Config,
68 notifier notify.Notifier,
69 enforcer *rbac.Enforcer,
70 logger *slog.Logger,
71 validator *validator.Validator,
72 cfClient *cloudflare.Client,
73) *Repo {
74 return &Repo{
75 oauth: oauth,
76 repoResolver: repoResolver,
77 pages: pages,
78 idResolver: idResolver,
79 config: config,
80 spindlestream: spindlestream,
81 db: db,
82 notifier: notifier,
83 enforcer: enforcer,
84 logger: logger,
85 validator: validator,
86 cfClient: cfClient,
87 ogreClient: ogre.NewClient(config.Ogre.Host),
88 }
89}
90
91// modify the spindle configured for this repo
92func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
93 user := rp.oauth.GetMultiAccountUser(r)
94 l := rp.logger.With("handler", "EditSpindle")
95 l = l.With("did", user.Active.Did)
96
97 errorId := "operation-error"
98 fail := func(msg string, err error) {
99 l.Error(msg, "err", err)
100 rp.pages.Notice(w, errorId, msg)
101 }
102
103 f, err := rp.repoResolver.Resolve(r)
104 if err != nil {
105 fail("Failed to resolve repo. Try again later", err)
106 return
107 }
108
109 newSpindle := r.FormValue("spindle")
110 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
111 client, err := rp.oauth.AuthorizedClient(r)
112 if err != nil {
113 fail("Failed to authorize. Try again later.", err)
114 return
115 }
116
117 if !removingSpindle {
118 // ensure that this is a valid spindle for this user
119 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
120 if err != nil {
121 fail("Failed to find spindles. Try again later.", err)
122 return
123 }
124
125 if !slices.Contains(validSpindles, newSpindle) {
126 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
127 return
128 }
129 }
130
131 newRepo := *f
132 newRepo.Spindle = newSpindle
133 record := newRepo.AsRecord()
134
135 spindlePtr := &newSpindle
136 if removingSpindle {
137 spindlePtr = nil
138 newRepo.Spindle = ""
139 }
140
141 // optimistic update
142 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
143 if err != nil {
144 fail("Failed to update spindle. Try again later.", err)
145 return
146 }
147
148 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
149 if err != nil {
150 fail("Failed to update spindle, no record found on PDS.", err)
151 return
152 }
153 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
154 Collection: tangled.RepoNSID,
155 Repo: newRepo.Did,
156 Rkey: newRepo.Rkey,
157 SwapRecord: ex.Cid,
158 Record: &lexutil.LexiconTypeDecoder{
159 Val: &record,
160 },
161 })
162
163 if err != nil {
164 fail("Failed to update spindle, unable to save to PDS.", err)
165 return
166 }
167
168 if !removingSpindle {
169 // add this spindle to spindle stream
170 rp.spindlestream.AddSource(
171 context.Background(),
172 eventconsumer.NewSpindleSource(newSpindle),
173 )
174 }
175
176 rp.pages.HxRefresh(w)
177}
178
179func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
180 user := rp.oauth.GetMultiAccountUser(r)
181 l := rp.logger.With("handler", "AddLabel")
182 l = l.With("did", user.Active.Did)
183
184 f, err := rp.repoResolver.Resolve(r)
185 if err != nil {
186 l.Error("failed to get repo and knot", "err", err)
187 return
188 }
189
190 errorId := "add-label-error"
191 fail := func(msg string, err error) {
192 l.Error(msg, "err", err)
193 rp.pages.Notice(w, errorId, msg)
194 }
195
196 // get form values for label definition
197 name := r.FormValue("name")
198 concreteType := r.FormValue("valueType")
199 valueFormat := r.FormValue("valueFormat")
200 enumValues := r.FormValue("enumValues")
201 scope := r.Form["scope"]
202 color := r.FormValue("color")
203 multiple := r.FormValue("multiple") == "true"
204
205 var variants []string
206 for part := range strings.SplitSeq(enumValues, ",") {
207 if part = strings.TrimSpace(part); part != "" {
208 variants = append(variants, part)
209 }
210 }
211
212 if concreteType == "" {
213 concreteType = "null"
214 }
215
216 format := models.ValueTypeFormatAny
217 if valueFormat == "did" {
218 format = models.ValueTypeFormatDid
219 }
220
221 valueType := models.ValueType{
222 Type: models.ConcreteType(concreteType),
223 Format: format,
224 Enum: variants,
225 }
226
227 label := models.LabelDefinition{
228 Did: user.Active.Did,
229 Rkey: tid.TID(),
230 Name: name,
231 ValueType: valueType,
232 Scope: scope,
233 Color: &color,
234 Multiple: multiple,
235 Created: time.Now(),
236 }
237 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
238 fail(err.Error(), err)
239 return
240 }
241
242 // announce this relation into the firehose, store into owners' pds
243 client, err := rp.oauth.AuthorizedClient(r)
244 if err != nil {
245 fail(err.Error(), err)
246 return
247 }
248
249 // emit a labelRecord
250 labelRecord := label.AsRecord()
251 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
252 Collection: tangled.LabelDefinitionNSID,
253 Repo: label.Did,
254 Rkey: label.Rkey,
255 Record: &lexutil.LexiconTypeDecoder{
256 Val: &labelRecord,
257 },
258 })
259 // invalid record
260 if err != nil {
261 fail("Failed to write record to PDS.", err)
262 return
263 }
264
265 aturi := resp.Uri
266 l = l.With("at-uri", aturi)
267 l.Info("wrote label record to PDS")
268
269 // update the repo to subscribe to this label
270 newRepo := *f
271 newRepo.Labels = append(newRepo.Labels, aturi)
272 repoRecord := newRepo.AsRecord()
273
274 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
275 if err != nil {
276 fail("Failed to update labels, no record found on PDS.", err)
277 return
278 }
279 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
280 Collection: tangled.RepoNSID,
281 Repo: newRepo.Did,
282 Rkey: newRepo.Rkey,
283 SwapRecord: ex.Cid,
284 Record: &lexutil.LexiconTypeDecoder{
285 Val: &repoRecord,
286 },
287 })
288 if err != nil {
289 fail("Failed to update labels for repo.", err)
290 return
291 }
292
293 tx, err := rp.db.BeginTx(r.Context(), nil)
294 if err != nil {
295 fail("Failed to add label.", err)
296 return
297 }
298
299 rollback := func() {
300 err1 := tx.Rollback()
301 err2 := rollbackRecord(context.Background(), aturi, client)
302
303 // ignore txn complete errors, this is okay
304 if errors.Is(err1, sql.ErrTxDone) {
305 err1 = nil
306 }
307
308 if errs := errors.Join(err1, err2); errs != nil {
309 l.Error("failed to rollback changes", "errs", errs)
310 return
311 }
312 }
313 defer rollback()
314
315 _, err = db.AddLabelDefinition(tx, &label)
316 if err != nil {
317 fail("Failed to add label.", err)
318 return
319 }
320
321 if err = db.SubscribeLabel(tx, &models.RepoLabel{
322 RepoAt: f.RepoAt(),
323 LabelAt: label.AtUri(),
324 }); err != nil {
325 fail("Failed to subscribe to label.", err)
326 return
327 }
328
329 err = tx.Commit()
330 if err != nil {
331 fail("Failed to add label.", err)
332 return
333 }
334
335 // clear aturi when everything is successful
336 aturi = ""
337
338 rp.pages.HxRefresh(w)
339}
340
341func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
342 user := rp.oauth.GetMultiAccountUser(r)
343 l := rp.logger.With("handler", "DeleteLabel")
344 l = l.With("did", user.Active.Did)
345
346 f, err := rp.repoResolver.Resolve(r)
347 if err != nil {
348 l.Error("failed to get repo and knot", "err", err)
349 return
350 }
351
352 errorId := "label-operation"
353 fail := func(msg string, err error) {
354 l.Error(msg, "err", err)
355 rp.pages.Notice(w, errorId, msg)
356 }
357
358 // get form values
359 labelId := r.FormValue("label-id")
360
361 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
362 if err != nil {
363 fail("Failed to find label definition.", err)
364 return
365 }
366
367 client, err := rp.oauth.AuthorizedClient(r)
368 if err != nil {
369 fail(err.Error(), err)
370 return
371 }
372
373 // delete label record from PDS
374 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
375 Collection: tangled.LabelDefinitionNSID,
376 Repo: label.Did,
377 Rkey: label.Rkey,
378 })
379 if err != nil {
380 fail("Failed to delete label record from PDS.", err)
381 return
382 }
383
384 // update repo record to remove the label reference
385 newRepo := *f
386 var updated []string
387 removedAt := label.AtUri().String()
388 for _, l := range newRepo.Labels {
389 if l != removedAt {
390 updated = append(updated, l)
391 }
392 }
393 newRepo.Labels = updated
394 repoRecord := newRepo.AsRecord()
395
396 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
397 if err != nil {
398 fail("Failed to update labels, no record found on PDS.", err)
399 return
400 }
401 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
402 Collection: tangled.RepoNSID,
403 Repo: newRepo.Did,
404 Rkey: newRepo.Rkey,
405 SwapRecord: ex.Cid,
406 Record: &lexutil.LexiconTypeDecoder{
407 Val: &repoRecord,
408 },
409 })
410 if err != nil {
411 fail("Failed to update repo record.", err)
412 return
413 }
414
415 // transaction for DB changes
416 tx, err := rp.db.BeginTx(r.Context(), nil)
417 if err != nil {
418 fail("Failed to delete label.", err)
419 return
420 }
421 defer tx.Rollback()
422
423 err = db.UnsubscribeLabel(
424 tx,
425 orm.FilterEq("repo_at", f.RepoAt()),
426 orm.FilterEq("label_at", removedAt),
427 )
428 if err != nil {
429 fail("Failed to unsubscribe label.", err)
430 return
431 }
432
433 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
434 if err != nil {
435 fail("Failed to delete label definition.", err)
436 return
437 }
438
439 err = tx.Commit()
440 if err != nil {
441 fail("Failed to delete label.", err)
442 return
443 }
444
445 // everything succeeded
446 rp.pages.HxRefresh(w)
447}
448
449func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
450 user := rp.oauth.GetMultiAccountUser(r)
451 l := rp.logger.With("handler", "SubscribeLabel")
452 l = l.With("did", user.Active.Did)
453
454 f, err := rp.repoResolver.Resolve(r)
455 if err != nil {
456 l.Error("failed to get repo and knot", "err", err)
457 return
458 }
459
460 if err := r.ParseForm(); err != nil {
461 l.Error("invalid form", "err", err)
462 return
463 }
464
465 errorId := "default-label-operation"
466 fail := func(msg string, err error) {
467 l.Error(msg, "err", err)
468 rp.pages.Notice(w, errorId, msg)
469 }
470
471 labelAts := r.Form["label"]
472 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
473 if err != nil {
474 fail("Failed to subscribe to label.", err)
475 return
476 }
477
478 newRepo := *f
479 newRepo.Labels = append(newRepo.Labels, labelAts...)
480
481 // dedup
482 slices.Sort(newRepo.Labels)
483 newRepo.Labels = slices.Compact(newRepo.Labels)
484
485 repoRecord := newRepo.AsRecord()
486
487 client, err := rp.oauth.AuthorizedClient(r)
488 if err != nil {
489 fail(err.Error(), err)
490 return
491 }
492
493 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
494 if err != nil {
495 fail("Failed to update labels, no record found on PDS.", err)
496 return
497 }
498 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
499 Collection: tangled.RepoNSID,
500 Repo: newRepo.Did,
501 Rkey: newRepo.Rkey,
502 SwapRecord: ex.Cid,
503 Record: &lexutil.LexiconTypeDecoder{
504 Val: &repoRecord,
505 },
506 })
507
508 tx, err := rp.db.Begin()
509 if err != nil {
510 fail("Failed to subscribe to label.", err)
511 return
512 }
513 defer tx.Rollback()
514
515 for _, l := range labelAts {
516 err = db.SubscribeLabel(tx, &models.RepoLabel{
517 RepoAt: f.RepoAt(),
518 LabelAt: syntax.ATURI(l),
519 })
520 if err != nil {
521 fail("Failed to subscribe to label.", err)
522 return
523 }
524 }
525
526 if err := tx.Commit(); err != nil {
527 fail("Failed to subscribe to label.", err)
528 return
529 }
530
531 // everything succeeded
532 rp.pages.HxRefresh(w)
533}
534
535func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
536 user := rp.oauth.GetMultiAccountUser(r)
537 l := rp.logger.With("handler", "UnsubscribeLabel")
538 l = l.With("did", user.Active.Did)
539
540 f, err := rp.repoResolver.Resolve(r)
541 if err != nil {
542 l.Error("failed to get repo and knot", "err", err)
543 return
544 }
545
546 if err := r.ParseForm(); err != nil {
547 l.Error("invalid form", "err", err)
548 return
549 }
550
551 errorId := "default-label-operation"
552 fail := func(msg string, err error) {
553 l.Error(msg, "err", err)
554 rp.pages.Notice(w, errorId, msg)
555 }
556
557 labelAts := r.Form["label"]
558 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
559 if err != nil {
560 fail("Failed to unsubscribe to label.", err)
561 return
562 }
563
564 // update repo record to remove the label reference
565 newRepo := *f
566 var updated []string
567 for _, l := range newRepo.Labels {
568 if !slices.Contains(labelAts, l) {
569 updated = append(updated, l)
570 }
571 }
572 newRepo.Labels = updated
573 repoRecord := newRepo.AsRecord()
574
575 client, err := rp.oauth.AuthorizedClient(r)
576 if err != nil {
577 fail(err.Error(), err)
578 return
579 }
580
581 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
582 if err != nil {
583 fail("Failed to update labels, no record found on PDS.", err)
584 return
585 }
586 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
587 Collection: tangled.RepoNSID,
588 Repo: newRepo.Did,
589 Rkey: newRepo.Rkey,
590 SwapRecord: ex.Cid,
591 Record: &lexutil.LexiconTypeDecoder{
592 Val: &repoRecord,
593 },
594 })
595
596 err = db.UnsubscribeLabel(
597 rp.db,
598 orm.FilterEq("repo_at", f.RepoAt()),
599 orm.FilterIn("label_at", labelAts),
600 )
601 if err != nil {
602 fail("Failed to unsubscribe label.", err)
603 return
604 }
605
606 // everything succeeded
607 rp.pages.HxRefresh(w)
608}
609
610func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
611 l := rp.logger.With("handler", "LabelPanel")
612
613 f, err := rp.repoResolver.Resolve(r)
614 if err != nil {
615 l.Error("failed to get repo and knot", "err", err)
616 return
617 }
618
619 subjectStr := r.FormValue("subject")
620 subject, err := syntax.ParseATURI(subjectStr)
621 if err != nil {
622 l.Error("failed to get repo and knot", "err", err)
623 return
624 }
625
626 labelDefs, err := db.GetLabelDefinitions(
627 rp.db,
628 orm.FilterIn("at_uri", f.Labels),
629 orm.FilterContains("scope", subject.Collection().String()),
630 )
631 if err != nil {
632 l.Error("failed to fetch label defs", "err", err)
633 return
634 }
635
636 defs := make(map[string]*models.LabelDefinition)
637 for _, l := range labelDefs {
638 defs[l.AtUri().String()] = &l
639 }
640
641 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
642 if err != nil {
643 l.Error("failed to build label state", "err", err)
644 return
645 }
646 state := states[subject]
647
648 user := rp.oauth.GetMultiAccountUser(r)
649 rp.pages.LabelPanel(w, pages.LabelPanelParams{
650 LoggedInUser: user,
651 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
652 Defs: defs,
653 Subject: subject.String(),
654 State: state,
655 })
656}
657
658func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
659 l := rp.logger.With("handler", "EditLabelPanel")
660
661 f, err := rp.repoResolver.Resolve(r)
662 if err != nil {
663 l.Error("failed to get repo and knot", "err", err)
664 return
665 }
666
667 subjectStr := r.FormValue("subject")
668 subject, err := syntax.ParseATURI(subjectStr)
669 if err != nil {
670 l.Error("failed to get repo and knot", "err", err)
671 return
672 }
673
674 labelDefs, err := db.GetLabelDefinitions(
675 rp.db,
676 orm.FilterIn("at_uri", f.Labels),
677 orm.FilterContains("scope", subject.Collection().String()),
678 )
679 if err != nil {
680 l.Error("failed to fetch labels", "err", err)
681 return
682 }
683
684 defs := make(map[string]*models.LabelDefinition)
685 for _, l := range labelDefs {
686 defs[l.AtUri().String()] = &l
687 }
688
689 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
690 if err != nil {
691 l.Error("failed to build label state", "err", err)
692 return
693 }
694 state := states[subject]
695
696 user := rp.oauth.GetMultiAccountUser(r)
697 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
698 LoggedInUser: user,
699 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
700 Defs: defs,
701 Subject: subject.String(),
702 State: state,
703 })
704}
705
706func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
707 user := rp.oauth.GetMultiAccountUser(r)
708 l := rp.logger.With("handler", "AddCollaborator")
709 l = l.With("did", user.Active.Did)
710
711 f, err := rp.repoResolver.Resolve(r)
712 if err != nil {
713 l.Error("failed to get repo and knot", "err", err)
714 return
715 }
716
717 errorId := "add-collaborator-error"
718 fail := func(msg string, err error) {
719 l.Error(msg, "err", err)
720 rp.pages.Notice(w, errorId, msg)
721 }
722
723 collaborator := r.FormValue("collaborator")
724 if collaborator == "" {
725 fail("Invalid form.", nil)
726 return
727 }
728
729 // remove a single leading `@`, to make @handle work with ResolveIdent
730 collaborator = strings.TrimPrefix(collaborator, "@")
731
732 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
733 if err != nil {
734 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
735 return
736 }
737
738 if collaboratorIdent.DID.String() == user.Active.Did {
739 fail("You seem to be adding yourself as a collaborator.", nil)
740 return
741 }
742 l = l.With("collaborator", collaboratorIdent.Handle)
743 l = l.With("knot", f.Knot)
744
745 // announce this relation into the firehose, store into owners' pds
746 client, err := rp.oauth.AuthorizedClient(r)
747 if err != nil {
748 fail("Failed to write to PDS.", err)
749 return
750 }
751
752 // emit a record
753 currentUser := rp.oauth.GetMultiAccountUser(r)
754 rkey := tid.TID()
755 createdAt := time.Now()
756 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
757 Collection: tangled.RepoCollaboratorNSID,
758 Repo: currentUser.Active.Did,
759 Rkey: rkey,
760 Record: &lexutil.LexiconTypeDecoder{
761 Val: repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt),
762 },
763 })
764 // invalid record
765 if err != nil {
766 fail("Failed to write record to PDS.", err)
767 return
768 }
769
770 aturi := resp.Uri
771 l = l.With("at-uri", aturi)
772 l.Info("wrote record to PDS")
773
774 tx, err := rp.db.BeginTx(r.Context(), nil)
775 if err != nil {
776 fail("Failed to add collaborator.", err)
777 return
778 }
779
780 rollback := func() {
781 err1 := tx.Rollback()
782 err2 := rp.enforcer.E.LoadPolicy()
783 err3 := rollbackRecord(context.Background(), aturi, client)
784
785 // ignore txn complete errors, this is okay
786 if errors.Is(err1, sql.ErrTxDone) {
787 err1 = nil
788 }
789
790 if errs := errors.Join(err1, err2, err3); errs != nil {
791 l.Error("failed to rollback changes", "errs", errs)
792 return
793 }
794 }
795 defer rollback()
796
797 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier())
798 if err != nil {
799 fail("Failed to add collaborator permissions.", err)
800 return
801 }
802
803 err = db.AddCollaborator(tx, models.Collaborator{
804 Did: syntax.DID(currentUser.Active.Did),
805 Rkey: rkey,
806 SubjectDid: collaboratorIdent.DID,
807 RepoAt: f.RepoAt(),
808 Created: createdAt,
809 })
810 if err != nil {
811 fail("Failed to add collaborator.", err)
812 return
813 }
814
815 err = tx.Commit()
816 if err != nil {
817 fail("Failed to add collaborator.", err)
818 return
819 }
820
821 err = rp.enforcer.E.SavePolicy()
822 if err != nil {
823 fail("Failed to update collaborator permissions.", err)
824 return
825 }
826
827 // clear aturi to when everything is successful
828 aturi = ""
829
830 rp.pages.HxRefresh(w)
831}
832
833func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
834 user := rp.oauth.GetMultiAccountUser(r)
835 l := rp.logger.With("handler", "DeleteRepo")
836
837 noticeId := "operation-error"
838 f, err := rp.repoResolver.Resolve(r)
839 if err != nil {
840 l.Error("failed to get repo and knot", "err", err)
841 return
842 }
843
844 // remove record from pds
845 atpClient, err := rp.oauth.AuthorizedClient(r)
846 if err != nil {
847 l.Error("failed to get authorized client", "err", err)
848 return
849 }
850 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
851 Collection: tangled.RepoNSID,
852 Repo: user.Active.Did,
853 Rkey: f.Rkey,
854 })
855 if err != nil {
856 l.Error("failed to delete record", "err", err)
857 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
858 return
859 }
860 l.Info("removed repo record", "aturi", f.RepoAt().String())
861
862 client, err := rp.oauth.ServiceClient(
863 r,
864 oauth.WithService(f.Knot),
865 oauth.WithLxm(tangled.RepoDeleteNSID),
866 oauth.WithDev(rp.config.Core.Dev),
867 )
868 if err != nil {
869 l.Error("failed to connect to knot server", "err", err)
870 return
871 }
872
873 err = tangled.RepoDelete(
874 r.Context(),
875 client,
876 &tangled.RepoDelete_Input{
877 Did: f.Did,
878 Name: f.Name,
879 Rkey: f.Rkey,
880 },
881 )
882 if err := xrpcclient.HandleXrpcErr(err); err != nil {
883 rp.pages.Notice(w, noticeId, err.Error())
884 return
885 }
886 l.Info("deleted repo from knot")
887
888 tx, err := rp.db.BeginTx(r.Context(), nil)
889 if err != nil {
890 l.Error("failed to start tx")
891 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
892 return
893 }
894 defer func() {
895 tx.Rollback()
896 err = rp.enforcer.E.LoadPolicy()
897 if err != nil {
898 l.Error("failed to rollback policies")
899 }
900 }()
901
902 // remove collaborator RBAC
903 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot)
904 if err != nil {
905 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
906 return
907 }
908 for _, c := range repoCollaborators {
909 did := c[0]
910 rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier())
911 }
912 l.Info("removed collaborators")
913
914 // remove repo RBAC
915 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier())
916 if err != nil {
917 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
918 return
919 }
920
921 // remove repo from db
922 err = db.RemoveRepo(tx, f.Did, f.Name)
923 if err != nil {
924 rp.pages.Notice(w, noticeId, "Failed to update appview")
925 return
926 }
927 l.Info("removed repo from db")
928
929 err = tx.Commit()
930 if err != nil {
931 l.Error("failed to commit changes", "err", err)
932 http.Error(w, err.Error(), http.StatusInternalServerError)
933 return
934 }
935
936 err = rp.enforcer.E.SavePolicy()
937 if err != nil {
938 l.Error("failed to update ACLs", "err", err)
939 http.Error(w, err.Error(), http.StatusInternalServerError)
940 return
941 }
942
943 rp.notifier.DeleteRepo(r.Context(), f)
944 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
945}
946
947func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
948 l := rp.logger.With("handler", "SyncRepoFork")
949
950 ref := chi.URLParam(r, "ref")
951 ref, _ = url.PathUnescape(ref)
952
953 user := rp.oauth.GetMultiAccountUser(r)
954 f, err := rp.repoResolver.Resolve(r)
955 if err != nil {
956 l.Error("failed to resolve source repo", "err", err)
957 return
958 }
959
960 switch r.Method {
961 case http.MethodPost:
962 client, err := rp.oauth.ServiceClient(
963 r,
964 oauth.WithService(f.Knot),
965 oauth.WithLxm(tangled.RepoForkSyncNSID),
966 oauth.WithDev(rp.config.Core.Dev),
967 )
968 if err != nil {
969 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
970 return
971 }
972
973 if f.Source == "" {
974 rp.pages.Notice(w, "repo", "This repository is not a fork.")
975 return
976 }
977
978 err = tangled.RepoForkSync(
979 r.Context(),
980 client,
981 &tangled.RepoForkSync_Input{
982 Did: user.Active.Did,
983 Name: f.Name,
984 Source: f.Source,
985 Branch: ref,
986 },
987 )
988 if err := xrpcclient.HandleXrpcErr(err); err != nil {
989 rp.pages.Notice(w, "repo", err.Error())
990 return
991 }
992
993 rp.pages.HxRefresh(w)
994 return
995 }
996}
997
998func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
999 l := rp.logger.With("handler", "ForkRepo")
1000
1001 user := rp.oauth.GetMultiAccountUser(r)
1002 f, err := rp.repoResolver.Resolve(r)
1003 if err != nil {
1004 l.Error("failed to resolve source repo", "err", err)
1005 return
1006 }
1007
1008 switch r.Method {
1009 case http.MethodGet:
1010 user := rp.oauth.GetMultiAccountUser(r)
1011 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
1012 if err != nil {
1013 rp.pages.Notice(w, "repo", "Invalid user account.")
1014 return
1015 }
1016
1017 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1018 LoggedInUser: user,
1019 Knots: knots,
1020 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1021 })
1022
1023 case http.MethodPost:
1024 l := rp.logger.With("handler", "ForkRepo")
1025
1026 targetKnot := r.FormValue("knot")
1027 if targetKnot == "" {
1028 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1029 return
1030 }
1031 l = l.With("targetKnot", targetKnot)
1032
1033 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1034 if err != nil || !ok {
1035 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1036 return
1037 }
1038
1039 // choose a name for a fork
1040 forkName := r.FormValue("repo_name")
1041 if forkName == "" {
1042 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1043 return
1044 }
1045
1046 // this check is *only* to see if the forked repo name already exists
1047 // in the user's account.
1048 existingRepo, err := db.GetRepo(
1049 rp.db,
1050 orm.FilterEq("did", user.Active.Did),
1051 orm.FilterEq("name", forkName),
1052 )
1053 if err != nil {
1054 if !errors.Is(err, sql.ErrNoRows) {
1055 l.Error("error fetching existing repo from db", "err", err)
1056 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1057 return
1058 }
1059 } else if existingRepo != nil {
1060 // repo with this name already exists
1061 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1062 return
1063 }
1064 l = l.With("forkName", forkName)
1065
1066 uri := "https"
1067 if rp.config.Core.Dev {
1068 uri = "http"
1069 }
1070
1071 forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier())
1072 l = l.With("cloneUrl", forkSourceUrl)
1073
1074 rkey := tid.TID()
1075
1076 // TODO: this could coordinate better with the knot to recieve a clone status
1077 client, err := rp.oauth.ServiceClient(
1078 r,
1079 oauth.WithService(targetKnot),
1080 oauth.WithLxm(tangled.RepoCreateNSID),
1081 oauth.WithDev(rp.config.Core.Dev),
1082 oauth.WithTimeout(time.Second*20),
1083 )
1084 if err != nil {
1085 l.Error("could not create service client", "err", err)
1086 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1087 return
1088 }
1089
1090 forkInput := &tangled.RepoCreate_Input{
1091 Rkey: rkey,
1092 Name: forkName,
1093 Source: &forkSourceUrl,
1094 }
1095 createResp, createErr := tangled.RepoCreate(
1096 r.Context(),
1097 client,
1098 forkInput,
1099 )
1100 if err := xrpcclient.HandleXrpcErr(createErr); err != nil {
1101 rp.pages.Notice(w, "repo", err.Error())
1102 return
1103 }
1104
1105 var repoDid string
1106 if createResp != nil && createResp.RepoDid != nil {
1107 repoDid = *createResp.RepoDid
1108 }
1109 if repoDid == "" {
1110 l.Error("knot returned empty repo DID for fork")
1111 rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
1112 return
1113 }
1114
1115 forkSource := f.RepoAt().String()
1116 if f.RepoDid != "" {
1117 forkSource = f.RepoDid
1118 }
1119
1120 repo := &models.Repo{
1121 Did: user.Active.Did,
1122 Name: forkName,
1123 Knot: targetKnot,
1124 Rkey: rkey,
1125 Source: forkSource,
1126 Description: f.Description,
1127 Created: time.Now(),
1128 Labels: rp.config.Label.DefaultLabelDefs,
1129 RepoDid: repoDid,
1130 }
1131 record := repo.AsRecord()
1132
1133 cleanupKnot := func() {
1134 go func() {
1135 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
1136 for attempt, delay := range delays {
1137 time.Sleep(delay)
1138 deleteClient, dErr := rp.oauth.ServiceClient(
1139 r,
1140 oauth.WithService(targetKnot),
1141 oauth.WithLxm(tangled.RepoDeleteNSID),
1142 oauth.WithDev(rp.config.Core.Dev),
1143 )
1144 if dErr != nil {
1145 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
1146 continue
1147 }
1148 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1149 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
1150 Did: user.Active.Did,
1151 Name: forkName,
1152 Rkey: rkey,
1153 }); dErr != nil {
1154 cancel()
1155 l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr)
1156 continue
1157 }
1158 cancel()
1159 l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1)
1160 return
1161 }
1162 l.Error("exhausted retries for knot cleanup, fork may be orphaned",
1163 "did", user.Active.Did, "fork", forkName, "knot", targetKnot)
1164 }()
1165 }
1166
1167 atpClient, err := rp.oauth.AuthorizedClient(r)
1168 if err != nil {
1169 l.Error("failed to create xrpcclient", "err", err)
1170 cleanupKnot()
1171 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1172 return
1173 }
1174
1175 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1176 Collection: tangled.RepoNSID,
1177 Repo: user.Active.Did,
1178 Rkey: rkey,
1179 Record: &lexutil.LexiconTypeDecoder{
1180 Val: &record,
1181 },
1182 })
1183 if err != nil {
1184 l.Error("failed to write to PDS", "err", err)
1185 cleanupKnot()
1186 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1187 return
1188 }
1189
1190 aturi := atresp.Uri
1191 l = l.With("aturi", aturi)
1192 l.Info("wrote to PDS")
1193
1194 tx, err := rp.db.BeginTx(r.Context(), nil)
1195 if err != nil {
1196 l.Info("txn failed", "err", err)
1197 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1198 return
1199 }
1200
1201 rollback := func() {
1202 err1 := tx.Rollback()
1203 err2 := rp.enforcer.E.LoadPolicy()
1204 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1205
1206 if errors.Is(err1, sql.ErrTxDone) {
1207 err1 = nil
1208 }
1209
1210 if errs := errors.Join(err1, err2, err3); errs != nil {
1211 l.Error("failed to rollback changes", "errs", errs)
1212 }
1213
1214 if aturi != "" {
1215 cleanupKnot()
1216 }
1217 }
1218 defer rollback()
1219
1220 err = db.AddRepo(tx, repo)
1221 if err != nil {
1222 l.Error("failed to AddRepo", "err", err)
1223 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1224 return
1225 }
1226
1227 rbacPath := repo.RepoIdentifier()
1228 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, rbacPath)
1229 if err != nil {
1230 l.Error("failed to add ACLs", "err", err)
1231 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1232 return
1233 }
1234
1235 err = tx.Commit()
1236 if err != nil {
1237 l.Error("failed to commit changes", "err", err)
1238 http.Error(w, err.Error(), http.StatusInternalServerError)
1239 return
1240 }
1241
1242 err = rp.enforcer.E.SavePolicy()
1243 if err != nil {
1244 l.Error("failed to update ACLs", "err", err)
1245 http.Error(w, err.Error(), http.StatusInternalServerError)
1246 return
1247 }
1248
1249 aturi = ""
1250
1251 rp.notifier.NewRepo(r.Context(), repo)
1252 if repoDid != "" {
1253 rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
1254 } else {
1255 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1256 }
1257 }
1258}
1259
1260// this is used to rollback changes made to the PDS
1261//
1262// it is a no-op if the provided ATURI is empty
1263func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
1264 if aturi == "" {
1265 return nil
1266 }
1267
1268 parsed := syntax.ATURI(aturi)
1269
1270 collection := parsed.Collection().String()
1271 repo := parsed.Authority().String()
1272 rkey := parsed.RecordKey().String()
1273
1274 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1275 Collection: collection,
1276 Repo: repo,
1277 Rkey: rkey,
1278 })
1279 return err
1280}
1281
1282func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator {
1283 rec := &tangled.RepoCollaborator{
1284 Subject: subject,
1285 CreatedAt: createdAt.Format(time.RFC3339),
1286 }
1287 s := string(f.RepoAt())
1288 rec.Repo = &s
1289 if f.RepoDid != "" {
1290 rec.RepoDid = &f.RepoDid
1291 }
1292 return rec
1293}