Signed-off-by: Lewis <lewis@tangled.org>
+526
-41
Diff
round #5
+167
-2
appview/db/repos.go
+167
-2
appview/db/repos.go
···
596
596
return GetRepo(e, orm.FilterEq("repo_did", repoDid))
597
597
}
598
598
599
+
func EnqueuePdsRewritesForRepo(tx *sql.Tx, repoDid, repoAtUri string) error {
600
+
type record struct {
601
+
userDidCol string
602
+
table string
603
+
nsid string
604
+
fkCol string
605
+
}
606
+
sources := []record{
607
+
{"did", "repos", "sh.tangled.repo", "at_uri"},
608
+
{"did", "issues", "sh.tangled.repo.issue", "repo_at"},
609
+
{"owner_did", "pulls", "sh.tangled.repo.pull", "repo_at"},
610
+
{"did", "collaborators", "sh.tangled.repo.collaborator", "repo_at"},
611
+
{"did", "artifacts", "sh.tangled.repo.artifact", "repo_at"},
612
+
{"did", "stars", "sh.tangled.feed.star", "subject_at"},
613
+
}
614
+
615
+
for _, src := range sources {
616
+
rows, err := tx.Query(
617
+
fmt.Sprintf(`SELECT %s, rkey FROM %s WHERE %s = ?`, src.userDidCol, src.table, src.fkCol),
618
+
repoAtUri,
619
+
)
620
+
if err != nil {
621
+
return fmt.Errorf("query %s for pds rewrites: %w", src.table, err)
622
+
}
623
+
624
+
var pairs []struct{ did, rkey string }
625
+
for rows.Next() {
626
+
var d, r string
627
+
if scanErr := rows.Scan(&d, &r); scanErr != nil {
628
+
rows.Close()
629
+
return fmt.Errorf("scan %s for pds rewrites: %w", src.table, scanErr)
630
+
}
631
+
pairs = append(pairs, struct{ did, rkey string }{d, r})
632
+
}
633
+
rows.Close()
634
+
if rowsErr := rows.Err(); rowsErr != nil {
635
+
return fmt.Errorf("iterate %s for pds rewrites: %w", src.table, rowsErr)
636
+
}
637
+
638
+
for _, p := range pairs {
639
+
if err := EnqueuePdsRewrite(tx, p.did, repoDid, src.nsid, p.rkey, repoAtUri); err != nil {
640
+
return fmt.Errorf("enqueue pds rewrite for %s/%s: %w", src.table, p.rkey, err)
641
+
}
642
+
}
643
+
}
644
+
645
+
profileRows, err := tx.Query(
646
+
`SELECT DISTINCT did FROM profile_pinned_repositories WHERE at_uri = ?`,
647
+
repoAtUri,
648
+
)
649
+
if err != nil {
650
+
return fmt.Errorf("query profile_pinned_repositories for pds rewrites: %w", err)
651
+
}
652
+
var profileDids []string
653
+
for profileRows.Next() {
654
+
var d string
655
+
if scanErr := profileRows.Scan(&d); scanErr != nil {
656
+
profileRows.Close()
657
+
return fmt.Errorf("scan profile_pinned_repositories for pds rewrites: %w", scanErr)
658
+
}
659
+
profileDids = append(profileDids, d)
660
+
}
661
+
profileRows.Close()
662
+
if profileRowsErr := profileRows.Err(); profileRowsErr != nil {
663
+
return fmt.Errorf("iterate profile_pinned_repositories for pds rewrites: %w", profileRowsErr)
664
+
}
665
+
666
+
for _, d := range profileDids {
667
+
if err := EnqueuePdsRewrite(tx, d, repoDid, "sh.tangled.actor.profile", "self", repoAtUri); err != nil {
668
+
return fmt.Errorf("enqueue pds rewrite for profile/%s: %w", d, err)
669
+
}
670
+
}
671
+
672
+
return nil
673
+
}
674
+
675
+
// PdsRewrite is one pending row of pds_rewrite_status: a record on a
// user's PDS that still references a repo by its old at-uri and is
// queued to be rewritten to reference the repo's DID instead.
type PdsRewrite struct {
	Id         int    // primary key of the status row
	RepoDid    string // DID the record should reference after the rewrite
	RecordNsid string // collection NSID of the record to rewrite
	RecordRkey string // rkey of the record to rewrite
	OldRepoAt  string // at-uri the record currently references
}
682
+
683
+
func GetPendingPdsRewrites(e Execer, userDid string) ([]PdsRewrite, error) {
684
+
rows, err := e.Query(
685
+
`SELECT id, repo_did, record_nsid, record_rkey, old_repo_at
686
+
FROM pds_rewrite_status
687
+
WHERE user_did = ? AND status = 'pending'`,
688
+
userDid,
689
+
)
690
+
if err != nil {
691
+
return nil, err
692
+
}
693
+
defer rows.Close()
694
+
695
+
var rewrites []PdsRewrite
696
+
for rows.Next() {
697
+
var r PdsRewrite
698
+
if err := rows.Scan(&r.Id, &r.RepoDid, &r.RecordNsid, &r.RecordRkey, &r.OldRepoAt); err != nil {
699
+
return nil, err
700
+
}
701
+
rewrites = append(rewrites, r)
702
+
}
703
+
return rewrites, rows.Err()
704
+
}
705
+
706
+
func CompletePdsRewrite(e Execer, id int) error {
707
+
_, err := e.Exec(
708
+
`UPDATE pds_rewrite_status SET status = 'done', updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?`,
709
+
id,
710
+
)
711
+
return err
712
+
}
713
+
599
714
// EnqueuePdsRewrite records that the record (recordNsid, recordRkey) on
// userDid's PDS must be rewritten to reference repoDid instead of
// oldRepoAt. If a row already exists for the same (user, nsid, rkey),
// it is reset to 'pending' with the new repo_did/old_repo_at — so a
// re-assigned DID supersedes any earlier, possibly completed, rewrite.
func EnqueuePdsRewrite(e Execer, userDid, repoDid, recordNsid, recordRkey, oldRepoAt string) error {
	_, err := e.Exec(
		`INSERT INTO pds_rewrite_status
		(user_did, repo_did, record_nsid, record_rkey, old_repo_at, status)
		VALUES (?, ?, ?, ?, ?, 'pending')
		ON CONFLICT(user_did, record_nsid, record_rkey) DO UPDATE SET
			status = 'pending',
			repo_did = excluded.repo_did,
			old_repo_at = excluded.old_repo_at,
			updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')`,
		userDid, repoDid, recordNsid, recordRkey, oldRepoAt,
	)
	return err
}
608
728
729
+
// CascadeRepoDid propagates a repo's newly assigned DID into every
// appview table that references the repo by its legacy at-uri. It must
// run inside the transaction that processes the didAssign event.
func CascadeRepoDid(tx *sql.Tx, repoAtUri, repoDid string) error {
	// tables whose rows carry a repo_did column keyed by an at-uri column
	targets := []struct{ table, column string }{
		{"repos", "at_uri"},
		{"issues", "repo_at"},
		{"pulls", "repo_at"},
		{"collaborators", "repo_at"},
		{"artifacts", "repo_at"},
		{"webhooks", "repo_at"},
		{"pull_comments", "repo_at"},
		{"repo_issue_seqs", "repo_at"},
		{"repo_pull_seqs", "repo_at"},
		{"repo_languages", "repo_at"},
		{"repo_labels", "repo_at"},
		{"profile_pinned_repositories", "at_uri"},
	}

	for _, t := range targets {
		if _, err := tx.Exec(
			fmt.Sprintf(`UPDATE %s SET repo_did = ? WHERE %s = ?`, t.table, t.column),
			repoDid, repoAtUri,
		); err != nil {
			return fmt.Errorf("cascade repo_did to %s: %w", t.table, err)
		}
	}

	// stars reference the repo through subject_did/subject_at instead of
	// repo_did, so they get their own statement
	if _, err := tx.Exec(
		`UPDATE stars SET subject_did = ? WHERE subject_at = ?`,
		repoDid, repoAtUri,
	); err != nil {
		return fmt.Errorf("cascade subject_did to stars: %w", err)
	}

	// repos.source holds another repo reference (presumably the fork
	// upstream — confirm); rewrite any rows pointing at the old at-uri
	if _, err := tx.Exec(
		`UPDATE repos SET source = ? WHERE source = ?`,
		repoDid, repoAtUri,
	); err != nil {
		return fmt.Errorf("cascade repo_did to repos.source: %w", err)
	}

	return nil
}
773
+
609
774
func UpdateDescription(e Execer, repoAt, newDescription string) error {
610
775
_, err := e.Exec(
611
776
`update repos set description = ? where at_uri = ?`, newDescription, repoAt)
+140
appview/oauth/handler.go
+140
appview/oauth/handler.go
···
14
14
15
15
comatproto "github.com/bluesky-social/indigo/api/atproto"
16
16
"github.com/bluesky-social/indigo/atproto/auth/oauth"
17
+
atpclient "github.com/bluesky-social/indigo/atproto/client"
17
18
lexutil "github.com/bluesky-social/indigo/lex/util"
18
19
xrpc "github.com/bluesky-social/indigo/xrpc"
19
20
"github.com/go-chi/chi/v5"
···
95
96
go o.addToDefaultSpindle(sessData.AccountDID.String())
96
97
go o.ensureTangledProfile(sessData)
97
98
go o.autoClaimTnglShDomain(sessData.AccountDID.String())
99
+
go o.drainPdsRewrites(sessData)
98
100
99
101
if !o.Config.Core.Dev {
100
102
err = o.Posthog.Enqueue(posthog.Capture{
···
273
275
l.Debug("successfully created empty Tangled profile on PDS and DB")
274
276
}
275
277
278
+
func (o *OAuth) drainPdsRewrites(sessData *oauth.ClientSessionData) {
279
+
ctx := context.Background()
280
+
did := sessData.AccountDID.String()
281
+
l := o.Logger.With("did", did, "handler", "drainPdsRewrites")
282
+
283
+
rewrites, err := db.GetPendingPdsRewrites(o.Db, did)
284
+
if err != nil {
285
+
l.Error("failed to get pending rewrites", "err", err)
286
+
return
287
+
}
288
+
if len(rewrites) == 0 {
289
+
return
290
+
}
291
+
292
+
l.Info("draining pending PDS rewrites", "count", len(rewrites))
293
+
294
+
sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID)
295
+
if err != nil {
296
+
l.Error("failed to resume session for PDS rewrites", "err", err)
297
+
return
298
+
}
299
+
client := sess.APIClient()
300
+
301
+
for _, rw := range rewrites {
302
+
if err := o.rewritePdsRecord(ctx, client, did, rw); err != nil {
303
+
l.Error("failed to rewrite PDS record",
304
+
"nsid", rw.RecordNsid,
305
+
"rkey", rw.RecordRkey,
306
+
"repo_did", rw.RepoDid,
307
+
"err", err)
308
+
continue
309
+
}
310
+
311
+
if err := db.CompletePdsRewrite(o.Db, rw.Id); err != nil {
312
+
l.Error("failed to mark rewrite complete", "id", rw.Id, "err", err)
313
+
}
314
+
}
315
+
}
316
+
317
+
// rewritePdsRecord rewrites one record on the user's PDS so its repo
// reference uses the repo's DID instead of the legacy at-uri. The record
// is fetched, mutated in place according to its NSID, and written back
// with a compare-and-swap on the fetched CID so a concurrent edit is not
// clobbered.
func (o *OAuth) rewritePdsRecord(ctx context.Context, client *atpclient.APIClient, userDid string, rw db.PdsRewrite) error {
	ex, err := comatproto.RepoGetRecord(ctx, client, "", rw.RecordNsid, userDid, rw.RecordRkey)
	if err != nil {
		return fmt.Errorf("get record: %w", err)
	}

	val := ex.Value.Val
	repoDid := rw.RepoDid

	switch rw.RecordNsid {
	case tangled.RepoNSID:
		rec, ok := val.(*tangled.Repo)
		if !ok {
			return fmt.Errorf("unexpected type for repo record")
		}
		rec.RepoDid = &repoDid

	case tangled.RepoIssueNSID:
		rec, ok := val.(*tangled.RepoIssue)
		if !ok {
			return fmt.Errorf("unexpected type for issue record")
		}
		// set the DID reference and drop the legacy at-uri field
		rec.RepoDid = &repoDid
		rec.Repo = nil

	case tangled.RepoPullNSID:
		rec, ok := val.(*tangled.RepoPull)
		if !ok {
			return fmt.Errorf("unexpected type for pull record")
		}
		rec.Target.RepoDid = &repoDid
		if rec.Target != nil {
			rec.Target.RepoDid = &repoDid
			rec.Target.Repo = nil
		}
		// the source side may reference a different repo entirely; only
		// rewrite it when it points at the repo being migrated
		if rec.Source != nil && rec.Source.Repo != nil && *rec.Source.Repo == rw.OldRepoAt {
			rec.Source.RepoDid = &repoDid
			rec.Source.Repo = nil
		}

	case tangled.RepoCollaboratorNSID:
		rec, ok := val.(*tangled.RepoCollaborator)
		if !ok {
			return fmt.Errorf("unexpected type for collaborator record")
		}
		rec.RepoDid = &repoDid
		rec.Repo = nil

	case tangled.RepoArtifactNSID:
		rec, ok := val.(*tangled.RepoArtifact)
		if !ok {
			return fmt.Errorf("unexpected type for artifact record")
		}
		rec.RepoDid = &repoDid
		rec.Repo = nil

	case tangled.FeedStarNSID:
		rec, ok := val.(*tangled.FeedStar)
		if !ok {
			return fmt.Errorf("unexpected type for star record")
		}
		rec.SubjectDid = &repoDid
		rec.Subject = nil

	case tangled.ActorProfileNSID:
		rec, ok := val.(*tangled.ActorProfile)
		if !ok {
			return fmt.Errorf("unexpected type for profile record")
		}
		// migrate each pinned at-uri whose repo already has a DID in the
		// appview DB; pins that cannot be resolved yet stay in the legacy
		// at-uri list for a later pass
		var dids []string
		var remaining []string
		for _, pinUri := range rec.PinnedRepositories {
			repo, repoErr := db.GetRepoByAtUri(o.Db, pinUri)
			if repoErr != nil || repo.RepoDid == "" {
				remaining = append(remaining, pinUri)
				continue
			}
			dids = append(dids, repo.RepoDid)
		}
		rec.PinnedRepositoryDids = append(rec.PinnedRepositoryDids, dids...)
		rec.PinnedRepositories = remaining

	default:
		return fmt.Errorf("unsupported NSID for PDS rewrite: %s", rw.RecordNsid)
	}

	// SwapRecord makes the put a CAS: it fails if the record changed on
	// the PDS since we fetched it above.
	_, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{
		Collection: rw.RecordNsid,
		Repo:       userDid,
		Rkey:       rw.RecordRkey,
		SwapRecord: ex.Cid,
		Record:     &lexutil.LexiconTypeDecoder{Val: val},
	})
	if err != nil {
		return fmt.Errorf("put record: %w", err)
	}

	return nil
}
415
+
276
416
// create a AppPasswordSession using apppasswords
277
417
type AppPasswordSession struct {
278
418
AccessJwt string `json:"accessJwt"`
+85
appview/state/knotstream.go
+85
appview/state/knotstream.go
···
20
20
"tangled.org/core/appview/sites"
21
21
ec "tangled.org/core/eventconsumer"
22
22
"tangled.org/core/eventconsumer/cursor"
23
+
knotdb "tangled.org/core/knotserver/db"
23
24
"tangled.org/core/log"
24
25
"tangled.org/core/orm"
25
26
"tangled.org/core/rbac"
···
74
75
return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
75
76
case tangled.PipelineNSID:
76
77
return ingestPipeline(d, source, msg)
78
+
case knotdb.RepoDIDAssignNSID:
79
+
return ingestDIDAssign(d, enforcer, source, msg, ctx)
77
80
}
78
81
79
82
return nil
···
374
377
375
378
return nil
376
379
}
380
+
381
+
func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error {
382
+
logger := log.FromContext(ctx)
383
+
384
+
var record knotdb.RepoDIDAssign
385
+
if err := json.Unmarshal(msg.EventJson, &record); err != nil {
386
+
return fmt.Errorf("unmarshal didAssign: %w", err)
387
+
}
388
+
389
+
if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
390
+
return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
391
+
record.RepoDid, record.OwnerDid, record.RepoName)
392
+
}
393
+
394
+
logger.Info("processing didAssign event",
395
+
"repo_did", record.RepoDid,
396
+
"owner_did", record.OwnerDid,
397
+
"repo_name", record.RepoName)
398
+
399
+
repos, err := db.GetRepos(d, 1,
400
+
orm.FilterEq("did", record.OwnerDid),
401
+
orm.FilterEq("name", record.RepoName),
402
+
)
403
+
if err != nil || len(repos) == 0 {
404
+
logger.Warn("didAssign for unknown repo, skipping",
405
+
"owner_did", record.OwnerDid,
406
+
"repo_name", record.RepoName)
407
+
return nil
408
+
}
409
+
repo := repos[0]
410
+
repoAtUri := repo.RepoAt().String()
411
+
knot := source.Key()
412
+
legacyResource := record.OwnerDid + "/" + record.RepoName
413
+
414
+
if repo.RepoDid != record.RepoDid {
415
+
tx, err := d.Begin()
416
+
if err != nil {
417
+
return fmt.Errorf("begin didAssign txn: %w", err)
418
+
}
419
+
defer tx.Rollback()
420
+
421
+
if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
422
+
return fmt.Errorf("cascade repo_did: %w", err)
423
+
}
424
+
425
+
if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
426
+
return fmt.Errorf("enqueue pds rewrites: %w", err)
427
+
}
428
+
429
+
if err := tx.Commit(); err != nil {
430
+
return fmt.Errorf("commit didAssign txn: %w", err)
431
+
}
432
+
}
433
+
434
+
if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
435
+
return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
436
+
}
437
+
if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
438
+
return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
439
+
}
440
+
441
+
collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri))
442
+
if collabErr != nil {
443
+
return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
444
+
}
445
+
for _, c := range collabs {
446
+
collabDid := c.SubjectDid.String()
447
+
if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
448
+
return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
449
+
}
450
+
if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
451
+
return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
452
+
}
453
+
}
454
+
455
+
logger.Info("didAssign processed successfully",
456
+
"repo_did", record.RepoDid,
457
+
"owner_did", record.OwnerDid,
458
+
"repo_name", record.RepoName)
459
+
460
+
return nil
461
+
}
+10
knotserver/db/didassign.go
+10
knotserver/db/didassign.go
+24
knotserver/db/events.go
+24
knotserver/db/events.go
···
1
1
package db
2
2
3
3
import (
4
+
"encoding/json"
4
5
"fmt"
5
6
"time"
6
7
8
+
"github.com/bluesky-social/indigo/atproto/syntax"
7
9
"tangled.org/core/notifier"
8
10
)
9
11
12
+
var tidClock = syntax.NewTIDClock(0)
13
+
10
14
type Event struct {
11
15
Rkey string `json:"rkey"`
12
16
Nsid string `json:"nsid"`
···
29
33
return err
30
34
}
31
35
36
+
// EmitDIDAssign publishes a repoDIDAssign event on the knot event
// stream, announcing that the repo (ownerDid/repoName) has been assigned
// repoDid. oldRepoAt lets consumers map the repo's legacy at-uri to the
// new DID.
func (d *DB) EmitDIDAssign(n *notifier.Notifier, ownerDid, repoName, repoDid, oldRepoAt string) error {
	payload := RepoDIDAssign{
		OwnerDid:  ownerDid,
		RepoName:  repoName,
		RepoDid:   repoDid,
		OldRepoAt: oldRepoAt,
	}

	eventJson, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal didAssign event: %w", err)
	}

	// TID rkeys sort chronologically, matching the stream's cursor order
	return d.InsertEvent(Event{
		Rkey:      tidClock.Next().String(),
		Nsid:      RepoDIDAssignNSID,
		EventJson: string(eventJson),
	}, n)
}
55
+
32
56
func (d *DB) GetEvents(cursor int64) ([]Event, error) {
33
57
whereClause := ""
34
58
args := []any{}
+26
-14
knotserver/events.go
+26
-14
knotserver/events.go
···
54
54
cursor = defaultCursor
55
55
}
56
56
57
-
// complete backfill first before going to live data
58
57
l.Debug("going through backfill", "cursor", cursor)
59
-
if err := h.streamOps(conn, &cursor); err != nil {
58
+
if err := h.drainBackfill(conn, &cursor, 10_000); err != nil {
60
59
l.Error("failed to backfill", "err", err)
61
60
return
62
61
}
···
68
67
l.Debug("stopping stream: client closed connection")
69
68
return
70
69
case <-ch:
71
-
// we have been notified of new data
72
70
l.Debug("going through live data", "cursor", cursor)
73
-
if err := h.streamOps(conn, &cursor); err != nil {
71
+
if _, err := h.streamOps(conn, &cursor); err != nil {
74
72
l.Error("failed to stream", "err", err)
75
73
return
76
74
}
···
83
81
}
84
82
}
85
83
86
-
func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error {
84
+
// drainBackfill repeatedly streams historical events to the client until
// a batch comes back short, meaning the backfill has caught up to live
// data. maxBatches bounds the loop so a pathological cursor cannot spin
// forever; hitting the cap is logged but still treated as success so the
// caller proceeds to the live stream.
func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error {
	for range maxBatches {
		n, err := h.streamOps(conn, cursor)
		if err != nil {
			return err
		}
		// NOTE(review): assumes a full page from GetEvents is at least
		// 100 events — confirm against that query's LIMIT
		if n < 100 {
			return nil
		}
	}
	h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor)
	return nil
}
97
+
98
+
// streamOps sends every stored event newer than *cursor to the websocket
// client and advances the cursor past each delivered event. It returns
// the number of events sent so callers can detect a short (final) batch
// during backfill.
func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) {
	events, err := h.db.GetEvents(*cursor)
	if err != nil {
		h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
		return 0, err
	}

	for _, event := range events {
		// EventJson is stored as a string; decode it first so it nests
		// as a JSON object (not a quoted string) in the outgoing frame
		var eventJson map[string]any
		err := json.Unmarshal([]byte(event.EventJson), &eventJson)
		if err != nil {
			h.l.Error("failed to unmarshal event", "err", err)
			return 0, err
		}

		jsonMsg, err := json.Marshal(map[string]any{
			"rkey":    event.Rkey,
			"nsid":    event.Nsid,
			"event":   eventJson,
			"created": event.Created,
		})
		if err != nil {
			h.l.Error("failed to marshal record", "err", err)
			return 0, err
		}

		if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
			h.l.Debug("err", "err", err)
			return 0, err
		}
		// the cursor tracks the Created value of the last delivered event
		*cursor = event.Created
	}

	return len(events), nil
}
+15
-5
knotserver/git.go
+15
-5
knotserver/git.go
···
5
5
"fmt"
6
6
"io"
7
7
"net/http"
8
+
"os"
9
+
"path/filepath"
8
10
"strings"
9
11
12
+
securejoin "github.com/cyphar/filepath-securejoin"
10
13
"github.com/go-chi/chi/v5"
11
14
"tangled.org/core/knotserver/git/service"
12
15
)
···
24
27
}
25
28
26
29
repoDid, err := h.db.GetRepoDid(did, name)
27
-
if err != nil {
28
-
return "", "", fmt.Errorf("repo not found: %w", err)
30
+
if err == nil {
31
+
repoPath, _, _, resolveErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
32
+
if resolveErr == nil {
33
+
return repoPath, name, nil
34
+
}
29
35
}
30
-
repoPath, _, _, err := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
31
-
if err != nil {
32
-
return "", "", fmt.Errorf("repo not found: %w", err)
36
+
37
+
repoPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(did, name))
38
+
if joinErr != nil {
39
+
return "", "", fmt.Errorf("repo not found: %w", joinErr)
40
+
}
41
+
if _, statErr := os.Stat(repoPath); statErr != nil {
42
+
return "", "", fmt.Errorf("repo not found: %w", statErr)
33
43
}
34
44
return repoPath, name, nil
35
45
}
+28
-13
knotserver/internal.go
+28
-13
knotserver/internal.go
···
6
6
"fmt"
7
7
"log/slog"
8
8
"net/http"
9
+
"os"
9
10
"path/filepath"
10
11
"strings"
11
12
13
+
securejoin "github.com/cyphar/filepath-securejoin"
12
14
"github.com/go-chi/chi/v5"
13
15
"github.com/go-chi/chi/v5/middleware"
14
16
"github.com/go-git/go-git/v5/plumbing"
···
124
126
ownerDid := repoOwnerIdent.DID.String()
125
127
repoName := components[1]
126
128
repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
127
-
if didErr != nil {
128
-
w.WriteHeader(http.StatusNotFound)
129
-
l.Error("repo DID not found", "owner", ownerDid, "name", repoName, "err", didErr)
130
-
fmt.Fprintln(w, "repo not found")
131
-
return
132
-
}
133
-
repoPath, _, _, lookupErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
134
-
if lookupErr != nil {
135
-
w.WriteHeader(http.StatusNotFound)
136
-
l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr)
137
-
fmt.Fprintln(w, "repo not found")
138
-
return
129
+
var repoPath string
130
+
if didErr == nil {
131
+
var lookupErr error
132
+
repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
133
+
if lookupErr != nil {
134
+
w.WriteHeader(http.StatusNotFound)
135
+
l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr)
136
+
fmt.Fprintln(w, "repo not found")
137
+
return
138
+
}
139
+
rbacResource = repoDid
140
+
} else {
141
+
legacyPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(ownerDid, repoName))
142
+
if joinErr != nil {
143
+
w.WriteHeader(http.StatusNotFound)
144
+
fmt.Fprintln(w, "repo not found")
145
+
return
146
+
}
147
+
if _, statErr := os.Stat(legacyPath); statErr != nil {
148
+
w.WriteHeader(http.StatusNotFound)
149
+
l.Error("repo not found on disk (legacy)", "owner", ownerDid, "name", repoName)
150
+
fmt.Fprintln(w, "repo not found")
151
+
return
152
+
}
153
+
repoPath = legacyPath
154
+
rbacResource = ownerDid + "/" + repoName
139
155
}
140
-
rbacResource = repoDid
141
156
rel, relErr := filepath.Rel(h.c.Repo.ScanPath, repoPath)
142
157
if relErr != nil {
143
158
w.WriteHeader(http.StatusInternalServerError)
+28
-4
knotserver/xrpc/xrpc.go
+28
-4
knotserver/xrpc/xrpc.go
···
4
4
"encoding/json"
5
5
"log/slog"
6
6
"net/http"
7
+
"os"
8
+
"path/filepath"
7
9
"strings"
8
10
11
+
securejoin "github.com/cyphar/filepath-securejoin"
12
+
"github.com/go-chi/chi/v5"
9
13
"tangled.org/core/api/tangled"
10
14
"tangled.org/core/idresolver"
11
15
"tangled.org/core/jetstream"
···
15
19
"tangled.org/core/rbac"
16
20
xrpcerr "tangled.org/core/xrpc/errors"
17
21
"tangled.org/core/xrpc/serviceauth"
18
-
19
-
"github.com/go-chi/chi/v5"
20
22
)
21
23
22
24
type Xrpc struct {
···
85
87
)
86
88
}
87
89
88
-
repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo)
89
-
if err != nil {
90
+
if !strings.Contains(repo, "/") {
91
+
repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo)
92
+
if err != nil {
93
+
return "", xrpcerr.RepoNotFoundError
94
+
}
95
+
return repoPath, nil
96
+
}
97
+
98
+
parts := strings.SplitN(repo, "/", 2)
99
+
ownerDid, repoName := parts[0], parts[1]
100
+
101
+
repoDid, err := x.Db.GetRepoDid(ownerDid, repoName)
102
+
if err == nil {
103
+
repoPath, _, _, resolveErr := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repoDid)
104
+
if resolveErr == nil {
105
+
return repoPath, nil
106
+
}
107
+
}
108
+
109
+
repoPath, joinErr := securejoin.SecureJoin(x.Config.Repo.ScanPath, filepath.Join(ownerDid, repoName))
110
+
if joinErr != nil {
111
+
return "", xrpcerr.RepoNotFoundError
112
+
}
113
+
if _, statErr := os.Stat(repoPath); statErr != nil {
90
114
return "", xrpcerr.RepoNotFoundError
91
115
}
92
116
return repoPath, nil
+2
-2
rbac/rbac.go
+2
-2
rbac/rbac.go
···
34
34
)
35
35
36
36
// Enforcer wraps a casbin SyncedEnforcer, whose internal locking makes
// policy reads and writes safe for concurrent use.
type Enforcer struct {
	E *casbin.SyncedEnforcer
}
39
39
40
40
func NewEnforcer(path string) (*Enforcer, error) {
···
53
53
return nil, err
54
54
}
55
55
56
-
e, err := casbin.NewEnforcer(m, a)
56
+
e, err := casbin.NewSyncedEnforcer(m, a)
57
57
if err != nil {
58
58
return nil, err
59
59
}
History
12 rounds
1 comment
oyster.cafe
submitted
#11
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- go.mod:34
- go.sum:339
expand 0 comments
oyster.cafe
submitted
#10
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#9
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 1 comment
oyster.cafe
submitted
#8
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#7
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#6
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#5
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#4
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
knotserver/xrpc/xrpc.go:88 — can you absorb this?
knotserver/db/events.go:12 — can be replaced with tid.TID()
appview/oauth/handler.go:340 — I think we should keep that information just in case, especially considering users cannot find the target repo without ingesting the entire knotstream event history.
Everything else LGTM!