Monorepo for Tangled
tangled.org
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 "tangled.org/core/appview/validator"
27 xrpcclient "tangled.org/core/appview/xrpcclient"
28 "tangled.org/core/eventconsumer"
29 "tangled.org/core/idresolver"
30 "tangled.org/core/jetstream"
31 "tangled.org/core/log"
32 tlog "tangled.org/core/log"
33 "tangled.org/core/orm"
34 "tangled.org/core/rbac"
35 "tangled.org/core/tid"
36
37 comatproto "github.com/bluesky-social/indigo/api/atproto"
38 atpclient "github.com/bluesky-social/indigo/atproto/client"
39 "github.com/bluesky-social/indigo/atproto/syntax"
40 lexutil "github.com/bluesky-social/indigo/lex/util"
41 securejoin "github.com/cyphar/filepath-securejoin"
42 "github.com/go-chi/chi/v5"
43 "github.com/posthog/posthog-go"
44)
45
46type State struct {
47 db *db.DB
48 notifier notify.Notifier
49 indexer *indexer.Indexer
50 oauth *oauth.OAuth
51 enforcer *rbac.Enforcer
52 pages *pages.Pages
53 idResolver *idresolver.Resolver
54 mentionsResolver *mentions.Resolver
55 posthog posthog.Client
56 jc *jetstream.JetstreamClient
57 config *config.Config
58 repoResolver *reporesolver.RepoResolver
59 knotstream *eventconsumer.Consumer
60 spindlestream *eventconsumer.Consumer
61 logger *slog.Logger
62 validator *validator.Validator
63}
64
65func Make(ctx context.Context, config *config.Config) (*State, error) {
66 logger := tlog.FromContext(ctx)
67
68 d, err := db.Make(ctx, config.Core.DbPath)
69 if err != nil {
70 return nil, fmt.Errorf("failed to create db: %w", err)
71 }
72
73 indexer := indexer.New(log.SubLogger(logger, "indexer"))
74 err = indexer.Init(ctx, d)
75 if err != nil {
76 return nil, fmt.Errorf("failed to create indexer: %w", err)
77 }
78
79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create enforcer: %w", err)
82 }
83
84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
85 if err != nil {
86 logger.Error("failed to create redis resolver", "err", err)
87 res = idresolver.DefaultResolver(config.Plc.PLCURL)
88 }
89
90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
91 if err != nil {
92 return nil, fmt.Errorf("failed to create posthog client: %w", err)
93 }
94
95 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages"))
96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
97 if err != nil {
98 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
99 }
100 validator := validator.New(d, res, enforcer)
101
102 repoResolver := reporesolver.New(config, enforcer, d)
103
104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
105
106 wrapper := db.DbWrapper{Execer: d}
107 jc, err := jetstream.NewJetstreamClient(
108 config.Jetstream.Endpoint,
109 "appview",
110 []string{
111 tangled.GraphFollowNSID,
112 tangled.FeedStarNSID,
113 tangled.PublicKeyNSID,
114 tangled.RepoArtifactNSID,
115 tangled.ActorProfileNSID,
116 tangled.SpindleMemberNSID,
117 tangled.SpindleNSID,
118 tangled.StringNSID,
119 tangled.RepoIssueNSID,
120 tangled.RepoIssueCommentNSID,
121 tangled.LabelDefinitionNSID,
122 tangled.LabelOpNSID,
123 },
124 nil,
125 tlog.SubLogger(logger, "jetstream"),
126 wrapper,
127 false,
128
129 // in-memory filter is inapplicalble to appview so
130 // we'll never log dids anyway.
131 false,
132 )
133 if err != nil {
134 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
135 }
136
137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
139 }
140
141 ingester := appview.Ingester{
142 Db: wrapper,
143 Enforcer: enforcer,
144 IdResolver: res,
145 Config: config,
146 Logger: log.SubLogger(logger, "ingester"),
147 Validator: validator,
148 }
149 err = jc.StartJetstream(ctx, ingester.Ingest())
150 if err != nil {
151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
152 }
153
154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
155 if err != nil {
156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
157 }
158 knotstream.Start(ctx)
159
160 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
161 if err != nil {
162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
163 }
164 spindlestream.Start(ctx)
165
166 var notifiers []notify.Notifier
167
168 // Always add the database notifier
169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
170
171 // Add other notifiers in production only
172 if !config.Core.Dev {
173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174 }
175 notifiers = append(notifiers, indexer)
176 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify"))
177
178 state := &State{
179 d,
180 notifier,
181 indexer,
182 oauth,
183 enforcer,
184 pages,
185 res,
186 mentionsResolver,
187 posthog,
188 jc,
189 config,
190 repoResolver,
191 knotstream,
192 spindlestream,
193 logger,
194 validator,
195 }
196
197 return state, nil
198}
199
200func (s *State) Close() error {
201 // other close up logic goes here
202 return s.db.Close()
203}
204
205func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
206 w.Header().Set("Content-Type", "text/plain")
207 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
208
209 robotsTxt := `User-agent: *
210Allow: /
211`
212 w.Write([]byte(robotsTxt))
213}
214
215func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
216 user := s.oauth.GetUser(r)
217 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
218 LoggedInUser: user,
219 })
220}
221
222func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
223 user := s.oauth.GetUser(r)
224 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
225 LoggedInUser: user,
226 })
227}
228
229func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
230 user := s.oauth.GetUser(r)
231 s.pages.Brand(w, pages.BrandParams{
232 LoggedInUser: user,
233 })
234}
235
236func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
237 if s.oauth.GetUser(r) != nil {
238 s.Timeline(w, r)
239 return
240 }
241 s.Home(w, r)
242}
243
244func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
245 user := s.oauth.GetUser(r)
246
247 // TODO: set this flag based on the UI
248 filtered := false
249
250 var userDid string
251 if user != nil {
252 userDid = user.Did
253 }
254 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
255 if err != nil {
256 s.logger.Error("failed to make timeline", "err", err)
257 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
258 }
259
260 repos, err := db.GetTopStarredReposLastWeek(s.db)
261 if err != nil {
262 s.logger.Error("failed to get top starred repos", "err", err)
263 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
264 return
265 }
266
267 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
268 if err != nil {
269 // non-fatal
270 }
271
272 s.pages.Timeline(w, pages.TimelineParams{
273 LoggedInUser: user,
274 Timeline: timeline,
275 Repos: repos,
276 GfiLabel: gfiLabel,
277 })
278}
279
280func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
281 user := s.oauth.GetUser(r)
282 if user == nil {
283 return
284 }
285
286 l := s.logger.With("handler", "UpgradeBanner")
287 l = l.With("did", user.Did)
288
289 regs, err := db.GetRegistrations(
290 s.db,
291 orm.FilterEq("did", user.Did),
292 orm.FilterEq("needs_upgrade", 1),
293 )
294 if err != nil {
295 l.Error("non-fatal: failed to get registrations", "err", err)
296 }
297
298 spindles, err := db.GetSpindles(
299 s.db,
300 orm.FilterEq("owner", user.Did),
301 orm.FilterEq("needs_upgrade", 1),
302 )
303 if err != nil {
304 l.Error("non-fatal: failed to get spindles", "err", err)
305 }
306
307 if regs == nil && spindles == nil {
308 return
309 }
310
311 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
312 Registrations: regs,
313 Spindles: spindles,
314 })
315}
316
317func (s *State) Home(w http.ResponseWriter, r *http.Request) {
318 // TODO: set this flag based on the UI
319 filtered := false
320
321 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
322 if err != nil {
323 s.logger.Error("failed to make timeline", "err", err)
324 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
325 return
326 }
327
328 repos, err := db.GetTopStarredReposLastWeek(s.db)
329 if err != nil {
330 s.logger.Error("failed to get top starred repos", "err", err)
331 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
332 return
333 }
334
335 s.pages.Home(w, pages.TimelineParams{
336 LoggedInUser: nil,
337 Timeline: timeline,
338 Repos: repos,
339 })
340}
341
342func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
343 user := chi.URLParam(r, "user")
344 user = strings.TrimPrefix(user, "@")
345
346 if user == "" {
347 w.WriteHeader(http.StatusBadRequest)
348 return
349 }
350
351 id, err := s.idResolver.ResolveIdent(r.Context(), user)
352 if err != nil {
353 w.WriteHeader(http.StatusInternalServerError)
354 return
355 }
356
357 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
358 if err != nil {
359 s.logger.Error("failed to get public keys", "err", err)
360 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
361 return
362 }
363
364 if len(pubKeys) == 0 {
365 w.WriteHeader(http.StatusNoContent)
366 return
367 }
368
369 for _, k := range pubKeys {
370 key := strings.TrimRight(k.Key, "\n")
371 fmt.Fprintln(w, key)
372 }
373}
374
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-',
// '_' and '.', does not start or end with '.', and contains no "..". Path
// separators ('/' and '\') and the special names "." and ".." are rejected
// up front so the name can never be used for path traversal.
//
// It returns nil for a valid name and otherwise an error describing the first
// rule that was violated. The messages are capitalised because they are shown
// to the user verbatim.
func validateRepoName(name string) error {
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return errors.New("Repository name contains invalid path characters")
	}

	// Leading or trailing dots. Sequences such as "./" and "../" need no
	// separate check: any name containing '/' was already rejected above.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z',
			char >= 'A' && char <= 'Z',
			char >= '0' && char <= '9',
			char == '-', char == '_', char == '.':
			// allowed
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	return nil
}
406
// stripGitExt returns name without a single trailing ".git"; names that do
// not end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const gitExt = ".git"
	if !strings.HasSuffix(name, gitExt) {
		return name
	}
	return name[:len(name)-len(gitExt)]
}
410
411func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
412 switch r.Method {
413 case http.MethodGet:
414 user := s.oauth.GetUser(r)
415 knots, err := s.enforcer.GetKnotsForUser(user.Did)
416 if err != nil {
417 s.pages.Notice(w, "repo", "Invalid user account.")
418 return
419 }
420
421 s.pages.NewRepo(w, pages.NewRepoParams{
422 LoggedInUser: user,
423 Knots: knots,
424 })
425
426 case http.MethodPost:
427 l := s.logger.With("handler", "NewRepo")
428
429 user := s.oauth.GetUser(r)
430 l = l.With("did", user.Did)
431
432 // form validation
433 domain := r.FormValue("domain")
434 if domain == "" {
435 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
436 return
437 }
438 l = l.With("knot", domain)
439
440 repoName := r.FormValue("name")
441 if repoName == "" {
442 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
443 return
444 }
445
446 if err := validateRepoName(repoName); err != nil {
447 s.pages.Notice(w, "repo", err.Error())
448 return
449 }
450 repoName = stripGitExt(repoName)
451 l = l.With("repoName", repoName)
452
453 defaultBranch := r.FormValue("branch")
454 if defaultBranch == "" {
455 defaultBranch = "main"
456 }
457 l = l.With("defaultBranch", defaultBranch)
458
459 description := r.FormValue("description")
460
461 // ACL validation
462 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create")
463 if err != nil || !ok {
464 l.Info("unauthorized")
465 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
466 return
467 }
468
469 // Check for existing repos
470 existingRepo, err := db.GetRepo(
471 s.db,
472 orm.FilterEq("did", user.Did),
473 orm.FilterEq("name", repoName),
474 )
475 if err == nil && existingRepo != nil {
476 l.Info("repo exists")
477 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
478 return
479 }
480
481 // create atproto record for this repo
482 rkey := tid.TID()
483 repo := &models.Repo{
484 Did: user.Did,
485 Name: repoName,
486 Knot: domain,
487 Rkey: rkey,
488 Description: description,
489 Created: time.Now(),
490 Labels: s.config.Label.DefaultLabelDefs,
491 }
492 record := repo.AsRecord()
493
494 atpClient, err := s.oauth.AuthorizedClient(r)
495 if err != nil {
496 l.Info("PDS write failed", "err", err)
497 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
498 return
499 }
500
501 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
502 Collection: tangled.RepoNSID,
503 Repo: user.Did,
504 Rkey: rkey,
505 Record: &lexutil.LexiconTypeDecoder{
506 Val: &record,
507 },
508 })
509 if err != nil {
510 l.Info("PDS write failed", "err", err)
511 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
512 return
513 }
514
515 aturi := atresp.Uri
516 l = l.With("aturi", aturi)
517 l.Info("wrote to PDS")
518
519 tx, err := s.db.BeginTx(r.Context(), nil)
520 if err != nil {
521 l.Info("txn failed", "err", err)
522 s.pages.Notice(w, "repo", "Failed to save repository information.")
523 return
524 }
525
526 // The rollback function reverts a few things on failure:
527 // - the pending txn
528 // - the ACLs
529 // - the atproto record created
530 rollback := func() {
531 err1 := tx.Rollback()
532 err2 := s.enforcer.E.LoadPolicy()
533 err3 := rollbackRecord(context.Background(), aturi, atpClient)
534
535 // ignore txn complete errors, this is okay
536 if errors.Is(err1, sql.ErrTxDone) {
537 err1 = nil
538 }
539
540 if errs := errors.Join(err1, err2, err3); errs != nil {
541 l.Error("failed to rollback changes", "errs", errs)
542 return
543 }
544 }
545 defer rollback()
546
547 client, err := s.oauth.ServiceClient(
548 r,
549 oauth.WithService(domain),
550 oauth.WithLxm(tangled.RepoCreateNSID),
551 oauth.WithDev(s.config.Core.Dev),
552 )
553 if err != nil {
554 l.Error("service auth failed", "err", err)
555 s.pages.Notice(w, "repo", "Failed to reach PDS.")
556 return
557 }
558
559 xe := tangled.RepoCreate(
560 r.Context(),
561 client,
562 &tangled.RepoCreate_Input{
563 Rkey: rkey,
564 },
565 )
566 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
567 l.Error("xrpc error", "xe", xe)
568 s.pages.Notice(w, "repo", err.Error())
569 return
570 }
571
572 err = db.AddRepo(tx, repo)
573 if err != nil {
574 l.Error("db write failed", "err", err)
575 s.pages.Notice(w, "repo", "Failed to save repository information.")
576 return
577 }
578
579 // acls
580 p, _ := securejoin.SecureJoin(user.Did, repoName)
581 err = s.enforcer.AddRepo(user.Did, domain, p)
582 if err != nil {
583 l.Error("acl setup failed", "err", err)
584 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
585 return
586 }
587
588 err = tx.Commit()
589 if err != nil {
590 l.Error("txn commit failed", "err", err)
591 http.Error(w, err.Error(), http.StatusInternalServerError)
592 return
593 }
594
595 err = s.enforcer.E.SavePolicy()
596 if err != nil {
597 l.Error("acl save failed", "err", err)
598 http.Error(w, err.Error(), http.StatusInternalServerError)
599 return
600 }
601
602 // reset the ATURI because the transaction completed successfully
603 aturi = ""
604
605 s.notifier.NewRepo(r.Context(), repo)
606 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName))
607 }
608}
609
610// this is used to rollback changes made to the PDS
611//
612// it is a no-op if the provided ATURI is empty
613func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
614 if aturi == "" {
615 return nil
616 }
617
618 parsed := syntax.ATURI(aturi)
619
620 collection := parsed.Collection().String()
621 repo := parsed.Authority().String()
622 rkey := parsed.RecordKey().String()
623
624 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
625 Collection: collection,
626 Repo: repo,
627 Rkey: rkey,
628 })
629 return err
630}
631
632func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
633 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
634 if err != nil {
635 return err
636 }
637 // already present
638 if len(defaultLabels) == len(defaults) {
639 return nil
640 }
641
642 labelDefs, err := models.FetchLabelDefs(r, defaults)
643 if err != nil {
644 return err
645 }
646
647 // Insert each label definition to the database
648 for _, labelDef := range labelDefs {
649 _, err = db.AddLabelDefinition(e, &labelDef)
650 if err != nil {
651 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
652 }
653 }
654
655 return nil
656}