Signed-off-by: Lewis <lewis@tangled.org>
+527
-41
Diff
round #1
+168
-2
appview/db/repos.go
+168
-2
appview/db/repos.go
···
596
596
return GetRepo(e, orm.FilterEq("repo_did", repoDid))
597
597
}
598
598
599
+
func EnqueuePdsRewritesForRepo(tx *sql.Tx, repoDid, repoAtUri string) error {
600
+
type record struct {
601
+
userDidCol string
602
+
table string
603
+
nsid string
604
+
fkCol string
605
+
}
606
+
sources := []record{
607
+
{"did", "repos", "sh.tangled.repo", "at_uri"},
608
+
{"did", "issues", "sh.tangled.repo.issue", "repo_at"},
609
+
{"owner_did", "pulls", "sh.tangled.repo.pull", "repo_at"},
610
+
{"did", "collaborators", "sh.tangled.repo.collaborator", "repo_at"},
611
+
{"did", "artifacts", "sh.tangled.repo.artifact", "repo_at"},
612
+
{"did", "stars", "sh.tangled.feed.star", "subject_at"},
613
+
}
614
+
615
+
for _, src := range sources {
616
+
rows, err := tx.Query(
617
+
fmt.Sprintf(`SELECT %s, rkey FROM %s WHERE %s = ?`, src.userDidCol, src.table, src.fkCol),
618
+
repoAtUri,
619
+
)
620
+
if err != nil {
621
+
return fmt.Errorf("query %s for pds rewrites: %w", src.table, err)
622
+
}
623
+
624
+
var pairs []struct{ did, rkey string }
625
+
for rows.Next() {
626
+
var d, r string
627
+
if scanErr := rows.Scan(&d, &r); scanErr != nil {
628
+
rows.Close()
629
+
return fmt.Errorf("scan %s for pds rewrites: %w", src.table, scanErr)
630
+
}
631
+
pairs = append(pairs, struct{ did, rkey string }{d, r})
632
+
}
633
+
rows.Close()
634
+
if rowsErr := rows.Err(); rowsErr != nil {
635
+
return fmt.Errorf("iterate %s for pds rewrites: %w", src.table, rowsErr)
636
+
}
637
+
638
+
for _, p := range pairs {
639
+
if err := EnqueuePdsRewrite(tx, p.did, repoDid, src.nsid, p.rkey, repoAtUri); err != nil {
640
+
return fmt.Errorf("enqueue pds rewrite for %s/%s: %w", src.table, p.rkey, err)
641
+
}
642
+
}
643
+
}
644
+
645
+
profileRows, err := tx.Query(
646
+
`SELECT DISTINCT did FROM profile_pinned_repositories WHERE at_uri = ?`,
647
+
repoAtUri,
648
+
)
649
+
if err != nil {
650
+
return fmt.Errorf("query profile_pinned_repositories for pds rewrites: %w", err)
651
+
}
652
+
var profileDids []string
653
+
for profileRows.Next() {
654
+
var d string
655
+
if scanErr := profileRows.Scan(&d); scanErr != nil {
656
+
profileRows.Close()
657
+
return fmt.Errorf("scan profile_pinned_repositories for pds rewrites: %w", scanErr)
658
+
}
659
+
profileDids = append(profileDids, d)
660
+
}
661
+
profileRows.Close()
662
+
if profileRowsErr := profileRows.Err(); profileRowsErr != nil {
663
+
return fmt.Errorf("iterate profile_pinned_repositories for pds rewrites: %w", profileRowsErr)
664
+
}
665
+
666
+
for _, d := range profileDids {
667
+
if err := EnqueuePdsRewrite(tx, d, repoDid, "sh.tangled.actor.profile", "self", repoAtUri); err != nil {
668
+
return fmt.Errorf("enqueue pds rewrite for profile/%s: %w", d, err)
669
+
}
670
+
}
671
+
672
+
return nil
673
+
}
674
+
675
+
// PdsRewrite is one pending record rewrite, backed by a row in the
// pds_rewrite_status table: a record in UserDid's PDS (identified by
// collection NSID and rkey) that still references a repo by OldRepoAt and
// should be rewritten to reference RepoDid instead.
type PdsRewrite struct {
	Id         int    // row id in pds_rewrite_status
	UserDid    string // DID of the user whose PDS record must be rewritten
	RepoDid    string // new repo DID to write into the record
	RecordNsid string // collection NSID of the record
	RecordRkey string // rkey of the record
	OldRepoAt  string // old repo at-uri currently embedded in the record
}
683
+
684
+
func GetPendingPdsRewrites(e Execer, userDid string) ([]PdsRewrite, error) {
685
+
rows, err := e.Query(
686
+
`SELECT id, user_did, repo_did, record_nsid, record_rkey, old_repo_at
687
+
FROM pds_rewrite_status
688
+
WHERE user_did = ? AND status = 'pending'`,
689
+
userDid,
690
+
)
691
+
if err != nil {
692
+
return nil, err
693
+
}
694
+
defer rows.Close()
695
+
696
+
var rewrites []PdsRewrite
697
+
for rows.Next() {
698
+
var r PdsRewrite
699
+
if err := rows.Scan(&r.Id, &r.UserDid, &r.RepoDid, &r.RecordNsid, &r.RecordRkey, &r.OldRepoAt); err != nil {
700
+
return nil, err
701
+
}
702
+
rewrites = append(rewrites, r)
703
+
}
704
+
return rewrites, rows.Err()
705
+
}
706
+
707
+
func CompletePdsRewrite(e Execer, id int) error {
708
+
_, err := e.Exec(
709
+
`UPDATE pds_rewrite_status SET status = 'done', updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?`,
710
+
id,
711
+
)
712
+
return err
713
+
}
714
+
599
715
// EnqueuePdsRewrite records that the (recordNsid, recordRkey) record in
// userDid's PDS needs rewriting to reference repoDid instead of oldRepoAt.
// Re-enqueueing an already-tracked record resets its status to 'pending' and
// refreshes the target repo DID and old at-uri, so a previously completed
// rewrite is retried with the latest values.
func EnqueuePdsRewrite(e Execer, userDid, repoDid, recordNsid, recordRkey, oldRepoAt string) error {
	_, err := e.Exec(
		`INSERT INTO pds_rewrite_status
		(user_did, repo_did, record_nsid, record_rkey, old_repo_at, status)
		VALUES (?, ?, ?, ?, ?, 'pending')
		ON CONFLICT(user_did, record_nsid, record_rkey) DO UPDATE SET
		status = 'pending',
		repo_did = excluded.repo_did,
		old_repo_at = excluded.old_repo_at,
		updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')`,
		userDid, repoDid, recordNsid, recordRkey, oldRepoAt,
	)
	return err
}
608
729
730
+
// CascadeRepoDid propagates a newly assigned repo DID to every appview table
// that references the repo by its at-uri. Rows stay keyed by at-uri; only
// their repo_did (or equivalent) column is updated.
func CascadeRepoDid(tx *sql.Tx, repoAtUri, repoDid string) error {
	// Tables with a repo_did column, paired with the column holding the repo
	// at-uri used to select the affected rows.
	// NOTE(review): assumes every listed table (including
	// profile_pinned_repositories) actually has a repo_did column — confirm
	// against the schema/migrations.
	updates := []struct{ table, column string }{
		{"repos", "at_uri"},
		{"issues", "repo_at"},
		{"pulls", "repo_at"},
		{"collaborators", "repo_at"},
		{"artifacts", "repo_at"},
		{"webhooks", "repo_at"},
		{"pull_comments", "repo_at"},
		{"repo_issue_seqs", "repo_at"},
		{"repo_pull_seqs", "repo_at"},
		{"repo_languages", "repo_at"},
		{"repo_labels", "repo_at"},
		{"profile_pinned_repositories", "at_uri"},
	}

	for _, u := range updates {
		_, err := tx.Exec(
			fmt.Sprintf(`UPDATE %s SET repo_did = ? WHERE %s = ?`, u.table, u.column),
			repoDid, repoAtUri,
		)
		if err != nil {
			return fmt.Errorf("cascade repo_did to %s: %w", u.table, err)
		}
	}

	// stars reference the repo via subject_at/subject_did rather than
	// repo_at/repo_did, so they are updated separately.
	_, err := tx.Exec(
		`UPDATE stars SET subject_did = ? WHERE subject_at = ?`,
		repoDid, repoAtUri,
	)
	if err != nil {
		return fmt.Errorf("cascade subject_did to stars: %w", err)
	}

	// repos.source can also hold the old repo at-uri (presumably for forks
	// pointing at their upstream — TODO confirm); rewrite it to the DID too.
	_, err = tx.Exec(
		`UPDATE repos SET source = ? WHERE source = ?`,
		repoDid, repoAtUri,
	)
	if err != nil {
		return fmt.Errorf("cascade repo_did to repos.source: %w", err)
	}

	return nil
}
774
+
609
775
func UpdateDescription(e Execer, repoAt, newDescription string) error {
610
776
_, err := e.Exec(
611
777
`update repos set description = ? where at_uri = ?`, newDescription, repoAt)
+140
appview/oauth/handler.go
+140
appview/oauth/handler.go
···
13
13
14
14
comatproto "github.com/bluesky-social/indigo/api/atproto"
15
15
"github.com/bluesky-social/indigo/atproto/auth/oauth"
16
+
atpclient "github.com/bluesky-social/indigo/atproto/client"
16
17
lexutil "github.com/bluesky-social/indigo/lex/util"
17
18
"github.com/go-chi/chi/v5"
18
19
"github.com/posthog/posthog-go"
···
91
92
go o.addToDefaultKnot(sessData.AccountDID.String())
92
93
go o.addToDefaultSpindle(sessData.AccountDID.String())
93
94
go o.ensureTangledProfile(sessData)
95
+
go o.drainPdsRewrites(sessData)
94
96
95
97
if !o.Config.Core.Dev {
96
98
err = o.Posthog.Enqueue(posthog.Capture{
···
243
245
l.Debug("successfully created empty Tangled profile on PDS and DB")
244
246
}
245
247
248
+
// drainPdsRewrites runs after login and pushes any pending PDS record
// rewrites for the user: records in their PDS that still reference a repo by
// its old at-uri are rewritten to reference the repo DID instead.
//
// Failures on individual records are logged and skipped so one bad record
// does not block the rest; skipped rewrites remain 'pending' and are retried
// on the next login.
func (o *OAuth) drainPdsRewrites(sessData *oauth.ClientSessionData) {
	ctx := context.Background()
	did := sessData.AccountDID.String()
	l := o.Logger.With("did", did, "handler", "drainPdsRewrites")

	rewrites, err := db.GetPendingPdsRewrites(o.Db, did)
	if err != nil {
		l.Error("failed to get pending rewrites", "err", err)
		return
	}
	if len(rewrites) == 0 {
		return
	}

	l.Info("draining pending PDS rewrites", "count", len(rewrites))

	// resume the user's OAuth session to obtain an authenticated PDS client
	sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID)
	if err != nil {
		l.Error("failed to resume session for PDS rewrites", "err", err)
		return
	}
	client := sess.APIClient()

	for _, rw := range rewrites {
		if err := o.rewritePdsRecord(ctx, client, did, rw); err != nil {
			l.Error("failed to rewrite PDS record",
				"nsid", rw.RecordNsid,
				"rkey", rw.RecordRkey,
				"repo_did", rw.RepoDid,
				"err", err)
			continue
		}

		// only mark done after the PDS write succeeded; a failure here just
		// means the rewrite is re-attempted (idempotently) next login
		if err := db.CompletePdsRewrite(o.Db, rw.Id); err != nil {
			l.Error("failed to mark rewrite complete", "id", rw.Id, "err", err)
		}
	}
}
286
+
287
+
// rewritePdsRecord fetches one record from the user's PDS, swaps its repo
// reference from the old at-uri to the new repo DID, and writes it back with
// a compare-and-swap on the record CID (SwapRecord) so a concurrent edit is
// not clobbered.
func (o *OAuth) rewritePdsRecord(ctx context.Context, client *atpclient.APIClient, userDid string, rw db.PdsRewrite) error {
	ex, err := comatproto.RepoGetRecord(ctx, client, "", rw.RecordNsid, userDid, rw.RecordRkey)
	if err != nil {
		return fmt.Errorf("get record: %w", err)
	}

	val := ex.Value.Val
	repoDid := rw.RepoDid

	// each collection stores the repo reference in a different field; set
	// the DID field and clear the legacy at-uri field
	switch rw.RecordNsid {
	case tangled.RepoNSID:
		rec, ok := val.(*tangled.Repo)
		if !ok {
			return fmt.Errorf("unexpected type for repo record")
		}
		rec.RepoDid = &repoDid

	case tangled.RepoIssueNSID:
		rec, ok := val.(*tangled.RepoIssue)
		if !ok {
			return fmt.Errorf("unexpected type for issue record")
		}
		rec.RepoDid = &repoDid
		rec.Repo = nil

	case tangled.RepoPullNSID:
		rec, ok := val.(*tangled.RepoPull)
		if !ok {
			return fmt.Errorf("unexpected type for pull record")
		}
		if rec.Target != nil {
			rec.Target.RepoDid = &repoDid
			rec.Target.Repo = nil
		}
		// the source repo is only rewritten when it points at the repo being
		// migrated (same-repo pulls); cross-repo sources are left alone
		if rec.Source != nil && rec.Source.Repo != nil && *rec.Source.Repo == rw.OldRepoAt {
			rec.Source.RepoDid = &repoDid
			rec.Source.Repo = nil
		}

	case tangled.RepoCollaboratorNSID:
		rec, ok := val.(*tangled.RepoCollaborator)
		if !ok {
			return fmt.Errorf("unexpected type for collaborator record")
		}
		rec.RepoDid = &repoDid
		rec.Repo = nil

	case tangled.RepoArtifactNSID:
		rec, ok := val.(*tangled.RepoArtifact)
		if !ok {
			return fmt.Errorf("unexpected type for artifact record")
		}
		rec.RepoDid = &repoDid
		rec.Repo = nil

	case tangled.FeedStarNSID:
		rec, ok := val.(*tangled.FeedStar)
		if !ok {
			return fmt.Errorf("unexpected type for star record")
		}
		rec.SubjectDid = &repoDid
		rec.Subject = nil

	case tangled.ActorProfileNSID:
		rec, ok := val.(*tangled.ActorProfile)
		if !ok {
			return fmt.Errorf("unexpected type for profile record")
		}
		// migrate pinned-repo at-uris that resolve to a repo DID; pins we
		// cannot resolve stay in PinnedRepositories untouched.
		// NOTE(review): resolved at-uris are dropped from PinnedRepositories
		// here — consider keeping them as well, since users cannot recover
		// the target repo from a DID without ingesting the full knotstream.
		var dids []string
		var remaining []string
		for _, pinUri := range rec.PinnedRepositories {
			repo, repoErr := db.GetRepoByAtUri(o.Db, pinUri)
			if repoErr != nil || repo.RepoDid == "" {
				remaining = append(remaining, pinUri)
				continue
			}
			dids = append(dids, repo.RepoDid)
		}
		rec.PinnedRepositoryDids = append(rec.PinnedRepositoryDids, dids...)
		rec.PinnedRepositories = remaining

	default:
		return fmt.Errorf("unsupported NSID for PDS rewrite: %s", rw.RecordNsid)
	}

	_, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{
		Collection: rw.RecordNsid,
		Repo:       userDid,
		Rkey:       rw.RecordRkey,
		SwapRecord: ex.Cid, // CAS: fail if the record changed since the get
		Record:     &lexutil.LexiconTypeDecoder{Val: val},
	})
	if err != nil {
		return fmt.Errorf("put record: %w", err)
	}

	return nil
}
385
+
246
386
// create a AppPasswordSession using apppasswords
247
387
type AppPasswordSession struct {
248
388
AccessJwt string `json:"accessJwt"`
+85
appview/state/knotstream.go
+85
appview/state/knotstream.go
···
18
18
"tangled.org/core/appview/models"
19
19
ec "tangled.org/core/eventconsumer"
20
20
"tangled.org/core/eventconsumer/cursor"
21
+
knotdb "tangled.org/core/knotserver/db"
21
22
"tangled.org/core/log"
22
23
"tangled.org/core/orm"
23
24
"tangled.org/core/rbac"
···
72
73
return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
73
74
case tangled.PipelineNSID:
74
75
return ingestPipeline(d, source, msg)
76
+
case knotdb.RepoDIDAssignNSID:
77
+
return ingestDIDAssign(d, enforcer, source, msg, ctx)
75
78
}
76
79
77
80
return nil
···
307
310
308
311
return nil
309
312
}
313
+
314
+
func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error {
315
+
logger := log.FromContext(ctx)
316
+
317
+
var record knotdb.RepoDIDAssign
318
+
if err := json.Unmarshal(msg.EventJson, &record); err != nil {
319
+
return fmt.Errorf("unmarshal didAssign: %w", err)
320
+
}
321
+
322
+
if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" {
323
+
return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q",
324
+
record.RepoDid, record.OwnerDid, record.RepoName)
325
+
}
326
+
327
+
logger.Info("processing didAssign event",
328
+
"repo_did", record.RepoDid,
329
+
"owner_did", record.OwnerDid,
330
+
"repo_name", record.RepoName)
331
+
332
+
repos, err := db.GetRepos(d, 1,
333
+
orm.FilterEq("did", record.OwnerDid),
334
+
orm.FilterEq("name", record.RepoName),
335
+
)
336
+
if err != nil || len(repos) == 0 {
337
+
logger.Warn("didAssign for unknown repo, skipping",
338
+
"owner_did", record.OwnerDid,
339
+
"repo_name", record.RepoName)
340
+
return nil
341
+
}
342
+
repo := repos[0]
343
+
repoAtUri := repo.RepoAt().String()
344
+
knot := source.Key()
345
+
legacyResource := record.OwnerDid + "/" + record.RepoName
346
+
347
+
if repo.RepoDid != record.RepoDid {
348
+
tx, err := d.Begin()
349
+
if err != nil {
350
+
return fmt.Errorf("begin didAssign txn: %w", err)
351
+
}
352
+
defer tx.Rollback()
353
+
354
+
if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil {
355
+
return fmt.Errorf("cascade repo_did: %w", err)
356
+
}
357
+
358
+
if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil {
359
+
return fmt.Errorf("enqueue pds rewrites: %w", err)
360
+
}
361
+
362
+
if err := tx.Commit(); err != nil {
363
+
return fmt.Errorf("commit didAssign txn: %w", err)
364
+
}
365
+
}
366
+
367
+
if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil {
368
+
return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err)
369
+
}
370
+
if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil {
371
+
return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err)
372
+
}
373
+
374
+
collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri))
375
+
if collabErr != nil {
376
+
return fmt.Errorf("get collaborators for RBAC update: %w", collabErr)
377
+
}
378
+
for _, c := range collabs {
379
+
collabDid := c.SubjectDid.String()
380
+
if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil {
381
+
return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err)
382
+
}
383
+
if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil {
384
+
return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err)
385
+
}
386
+
}
387
+
388
+
logger.Info("didAssign processed successfully",
389
+
"repo_did", record.RepoDid,
390
+
"owner_did", record.OwnerDid,
391
+
"repo_name", record.RepoName)
392
+
393
+
return nil
394
+
}
+10
knotserver/db/didassign.go
+10
knotserver/db/didassign.go
+24
knotserver/db/events.go
+24
knotserver/db/events.go
···
1
1
package db
2
2
3
3
import (
4
+
"encoding/json"
4
5
"fmt"
5
6
"time"
6
7
8
+
"github.com/bluesky-social/indigo/atproto/syntax"
7
9
"tangled.org/core/notifier"
8
10
)
9
11
12
+
var tidClock = syntax.NewTIDClock(0)
13
+
10
14
type Event struct {
11
15
Rkey string `json:"rkey"`
12
16
Nsid string `json:"nsid"`
···
29
33
return err
30
34
}
31
35
36
+
func (d *DB) EmitDIDAssign(n *notifier.Notifier, ownerDid, repoName, repoDid, oldRepoAt string) error {
37
+
payload := RepoDIDAssign{
38
+
OwnerDid: ownerDid,
39
+
RepoName: repoName,
40
+
RepoDid: repoDid,
41
+
OldRepoAt: oldRepoAt,
42
+
}
43
+
44
+
eventJson, err := json.Marshal(payload)
45
+
if err != nil {
46
+
return fmt.Errorf("marshal didAssign event: %w", err)
47
+
}
48
+
49
+
return d.InsertEvent(Event{
50
+
Rkey: tidClock.Next().String(),
51
+
Nsid: RepoDIDAssignNSID,
52
+
EventJson: string(eventJson),
53
+
}, n)
54
+
}
55
+
32
56
func (d *DB) GetEvents(cursor int64) ([]Event, error) {
33
57
whereClause := ""
34
58
args := []any{}
+26
-14
knotserver/events.go
+26
-14
knotserver/events.go
···
54
54
cursor = defaultCursor
55
55
}
56
56
57
-
// complete backfill first before going to live data
58
57
l.Debug("going through backfill", "cursor", cursor)
59
-
if err := h.streamOps(conn, &cursor); err != nil {
58
+
if err := h.drainBackfill(conn, &cursor, 10_000); err != nil {
60
59
l.Error("failed to backfill", "err", err)
61
60
return
62
61
}
···
68
67
l.Debug("stopping stream: client closed connection")
69
68
return
70
69
case <-ch:
71
-
// we have been notified of new data
72
70
l.Debug("going through live data", "cursor", cursor)
73
-
if err := h.streamOps(conn, &cursor); err != nil {
71
+
if _, err := h.streamOps(conn, &cursor); err != nil {
74
72
l.Error("failed to stream", "err", err)
75
73
return
76
74
}
···
83
81
}
84
82
}
85
83
86
-
func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error {
84
+
// drainBackfill streams batches of historical events to the client until a
// batch comes back smaller than a full page, which signals the backfill has
// caught up to live data. maxBatches bounds the loop so a client that can
// never catch up does not spin forever; hitting the bound is logged but not
// treated as an error, since the live loop will continue from the cursor.
//
// NOTE(review): the `n < 100` check assumes GetEvents pages in batches of
// 100 — confirm, and ideally share that constant with GetEvents.
func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error {
	for range maxBatches {
		n, err := h.streamOps(conn, cursor)
		if err != nil {
			return err
		}
		if n < 100 {
			return nil
		}
	}
	h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor)
	return nil
}
97
+
98
+
// streamOps sends every event newer than *cursor over the websocket, in
// order, advancing *cursor past each successfully delivered event. It
// returns the number of events fetched in this batch so callers can tell
// whether the backfill is exhausted.
func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) {
	events, err := h.db.GetEvents(*cursor)
	if err != nil {
		h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
		return 0, err
	}

	for _, event := range events {
		// re-parse the stored event JSON so it nests as an object (not a
		// string) inside the outgoing frame
		var eventJson map[string]any
		err := json.Unmarshal([]byte(event.EventJson), &eventJson)
		if err != nil {
			h.l.Error("failed to unmarshal event", "err", err)
			return 0, err
		}

		jsonMsg, err := json.Marshal(map[string]any{
			"rkey":    event.Rkey,
			"nsid":    event.Nsid,
			"event":   eventJson,
			"created": event.Created,
		})
		if err != nil {
			h.l.Error("failed to marshal record", "err", err)
			return 0, err
		}

		if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
			h.l.Debug("err", "err", err)
			return 0, err
		}
		// the cursor only advances after a successful write, so a failed
		// send is retried from the same position on the next call
		*cursor = event.Created
	}

	return len(events), nil
}
+15
-5
knotserver/git.go
+15
-5
knotserver/git.go
···
5
5
"fmt"
6
6
"io"
7
7
"net/http"
8
+
"os"
9
+
"path/filepath"
8
10
"strings"
9
11
12
+
securejoin "github.com/cyphar/filepath-securejoin"
10
13
"github.com/go-chi/chi/v5"
11
14
"tangled.org/core/knotserver/git/service"
12
15
)
···
24
27
}
25
28
26
29
repoDid, err := h.db.GetRepoDid(did, name)
27
-
if err != nil {
28
-
return "", "", fmt.Errorf("repo not found: %w", err)
30
+
if err == nil {
31
+
repoPath, _, _, resolveErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
32
+
if resolveErr == nil {
33
+
return repoPath, name, nil
34
+
}
29
35
}
30
-
repoPath, _, _, err := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
31
-
if err != nil {
32
-
return "", "", fmt.Errorf("repo not found: %w", err)
36
+
37
+
repoPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(did, name))
38
+
if joinErr != nil {
39
+
return "", "", fmt.Errorf("repo not found: %w", joinErr)
40
+
}
41
+
if _, statErr := os.Stat(repoPath); statErr != nil {
42
+
return "", "", fmt.Errorf("repo not found: %w", statErr)
33
43
}
34
44
return repoPath, name, nil
35
45
}
+28
-13
knotserver/internal.go
+28
-13
knotserver/internal.go
···
6
6
"fmt"
7
7
"log/slog"
8
8
"net/http"
9
+
"os"
9
10
"path/filepath"
10
11
"strings"
11
12
13
+
securejoin "github.com/cyphar/filepath-securejoin"
12
14
"github.com/go-chi/chi/v5"
13
15
"github.com/go-chi/chi/v5/middleware"
14
16
"github.com/go-git/go-git/v5/plumbing"
···
124
126
ownerDid := repoOwnerIdent.DID.String()
125
127
repoName := components[1]
126
128
repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName)
127
-
if didErr != nil {
128
-
w.WriteHeader(http.StatusNotFound)
129
-
l.Error("repo DID not found", "owner", ownerDid, "name", repoName, "err", didErr)
130
-
fmt.Fprintln(w, "repo not found")
131
-
return
132
-
}
133
-
repoPath, _, _, lookupErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
134
-
if lookupErr != nil {
135
-
w.WriteHeader(http.StatusNotFound)
136
-
l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr)
137
-
fmt.Fprintln(w, "repo not found")
138
-
return
129
+
var repoPath string
130
+
if didErr == nil {
131
+
var lookupErr error
132
+
repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid)
133
+
if lookupErr != nil {
134
+
w.WriteHeader(http.StatusNotFound)
135
+
l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr)
136
+
fmt.Fprintln(w, "repo not found")
137
+
return
138
+
}
139
+
rbacResource = repoDid
140
+
} else {
141
+
legacyPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(ownerDid, repoName))
142
+
if joinErr != nil {
143
+
w.WriteHeader(http.StatusNotFound)
144
+
fmt.Fprintln(w, "repo not found")
145
+
return
146
+
}
147
+
if _, statErr := os.Stat(legacyPath); statErr != nil {
148
+
w.WriteHeader(http.StatusNotFound)
149
+
l.Error("repo not found on disk (legacy)", "owner", ownerDid, "name", repoName)
150
+
fmt.Fprintln(w, "repo not found")
151
+
return
152
+
}
153
+
repoPath = legacyPath
154
+
rbacResource = ownerDid + "/" + repoName
139
155
}
140
-
rbacResource = repoDid
141
156
rel, relErr := filepath.Rel(h.c.Repo.ScanPath, repoPath)
142
157
if relErr != nil {
143
158
w.WriteHeader(http.StatusInternalServerError)
+28
-4
knotserver/xrpc/xrpc.go
+28
-4
knotserver/xrpc/xrpc.go
···
4
4
"encoding/json"
5
5
"log/slog"
6
6
"net/http"
7
+
"os"
8
+
"path/filepath"
7
9
"strings"
8
10
11
+
securejoin "github.com/cyphar/filepath-securejoin"
12
+
"github.com/go-chi/chi/v5"
9
13
"tangled.org/core/api/tangled"
10
14
"tangled.org/core/idresolver"
11
15
"tangled.org/core/jetstream"
···
15
19
"tangled.org/core/rbac"
16
20
xrpcerr "tangled.org/core/xrpc/errors"
17
21
"tangled.org/core/xrpc/serviceauth"
18
-
19
-
"github.com/go-chi/chi/v5"
20
22
)
21
23
22
24
type Xrpc struct {
···
85
87
)
86
88
}
87
89
88
-
repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo)
89
-
if err != nil {
90
+
if !strings.Contains(repo, "/") {
91
+
repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo)
92
+
if err != nil {
93
+
return "", xrpcerr.RepoNotFoundError
94
+
}
95
+
return repoPath, nil
96
+
}
97
+
98
+
parts := strings.SplitN(repo, "/", 2)
99
+
ownerDid, repoName := parts[0], parts[1]
100
+
101
+
repoDid, err := x.Db.GetRepoDid(ownerDid, repoName)
102
+
if err == nil {
103
+
repoPath, _, _, resolveErr := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repoDid)
104
+
if resolveErr == nil {
105
+
return repoPath, nil
106
+
}
107
+
}
108
+
109
+
repoPath, joinErr := securejoin.SecureJoin(x.Config.Repo.ScanPath, filepath.Join(ownerDid, repoName))
110
+
if joinErr != nil {
111
+
return "", xrpcerr.RepoNotFoundError
112
+
}
113
+
if _, statErr := os.Stat(repoPath); statErr != nil {
90
114
return "", xrpcerr.RepoNotFoundError
91
115
}
92
116
return repoPath, nil
+2
-2
rbac/rbac.go
+2
-2
rbac/rbac.go
···
34
34
)
35
35
36
36
// Enforcer wraps a casbin SyncedEnforcer, whose internal locking allows
// policy reads and mutations from concurrent goroutines (e.g. event
// consumers) without external synchronization.
type Enforcer struct {
	E *casbin.SyncedEnforcer
}
39
39
40
40
func NewEnforcer(path string) (*Enforcer, error) {
···
53
53
return nil, err
54
54
}
55
55
56
-
e, err := casbin.NewEnforcer(m, a)
56
+
e, err := casbin.NewSyncedEnforcer(m, a)
57
57
if err != nil {
58
58
return nil, err
59
59
}
History
12 rounds
1 comment
oyster.cafe
submitted
#11
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- go.mod:34
- go.sum:339
expand 0 comments
oyster.cafe
submitted
#10
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#9
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 1 comment
oyster.cafe
submitted
#8
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#7
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#6
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#5
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#4
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
appview, knotserver: add didAssign event and PDS rewrite on login
Signed-off-by: Lewis <lewis@tangled.org>
knotserver/xrpc/xrpc.go:88 — can you absorb this?
knotserver/db/events.go:12 — can be replaced with tid.TID()
appview/oauth/handler.go:340 — I think we should better keep that information just in case, especially considering users cannot find the target repo without ingesting the entire knotstream events.
Everything else LGTM!