forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 phnotify "tangled.org/core/appview/notify/posthog"
25 "tangled.org/core/appview/oauth"
26 "tangled.org/core/appview/pages"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/validator"
29 xrpcclient "tangled.org/core/appview/xrpcclient"
30 "tangled.org/core/consts"
31 "tangled.org/core/eventconsumer"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/jetstream"
34 "tangled.org/core/log"
35 tlog "tangled.org/core/log"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38 "tangled.org/core/tid"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 "github.com/bluesky-social/indigo/atproto/atclient"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44 "github.com/bluesky-social/indigo/xrpc"
45
46 "github.com/go-chi/chi/v5"
47 "github.com/posthog/posthog-go"
48)
49
50type State struct {
51 db *db.DB
52 notifier notify.Notifier
53 indexer *indexer.Indexer
54 oauth *oauth.OAuth
55 enforcer *rbac.Enforcer
56 pages *pages.Pages
57 idResolver *idresolver.Resolver
58 mentionsResolver *mentions.Resolver
59 posthog posthog.Client
60 jc *jetstream.JetstreamClient
61 config *config.Config
62 repoResolver *reporesolver.RepoResolver
63 knotstream *eventconsumer.Consumer
64 spindlestream *eventconsumer.Consumer
65 logger *slog.Logger
66 validator *validator.Validator
67 cfClient *cloudflare.Client
68}
69
70func Make(ctx context.Context, config *config.Config) (*State, error) {
71 logger := tlog.FromContext(ctx)
72
73 d, err := db.Make(ctx, config.Core.DbPath)
74 if err != nil {
75 return nil, fmt.Errorf("failed to create db: %w", err)
76 }
77
78 indexer := indexer.New(log.SubLogger(logger, "indexer"))
79 err = indexer.Init(ctx, d)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create indexer: %w", err)
82 }
83
84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create enforcer: %w", err)
87 }
88
89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
90 if err != nil {
91 logger.Error("failed to create redis resolver", "err", err)
92 res = idresolver.DefaultResolver(config.Plc.PLCURL)
93 }
94
95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
96 if err != nil {
97 return nil, fmt.Errorf("failed to create posthog client: %w", err)
98 }
99
100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
102 if err != nil {
103 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
104 }
105 validator := validator.New(d, res, enforcer)
106
107 repoResolver := reporesolver.New(config, enforcer, d)
108
109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
110
111 wrapper := db.DbWrapper{Execer: d}
112 jc, err := jetstream.NewJetstreamClient(
113 config.Jetstream.Endpoint,
114 "appview",
115 []string{
116 tangled.GraphFollowNSID,
117 tangled.FeedStarNSID,
118 tangled.PublicKeyNSID,
119 tangled.RepoArtifactNSID,
120 tangled.ActorProfileNSID,
121 tangled.KnotMemberNSID,
122 tangled.SpindleMemberNSID,
123 tangled.SpindleNSID,
124 tangled.StringNSID,
125 tangled.RepoIssueNSID,
126 tangled.RepoIssueCommentNSID,
127 tangled.LabelDefinitionNSID,
128 tangled.LabelOpNSID,
129 },
130 nil,
131 tlog.SubLogger(logger, "jetstream"),
132 wrapper,
133 false,
134
135 // in-memory filter is inapplicable to appview so
136 // we'll never log dids anyway.
137 false,
138 )
139 if err != nil {
140 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
141 }
142
143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
145 }
146
147 ingester := appview.Ingester{
148 Db: wrapper,
149 Enforcer: enforcer,
150 IdResolver: res,
151 Config: config,
152 Logger: log.SubLogger(logger, "ingester"),
153 Validator: validator,
154 }
155 err = jc.StartJetstream(ctx, ingester.Ingest())
156 if err != nil {
157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
158 }
159
160 var notifiers []notify.Notifier
161
162 // Always add the database notifier
163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
164
165 // Add other notifiers in production only
166 if !config.Core.Dev {
167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
168 }
169 notifiers = append(notifiers, indexer)
170
171 // Add webhook notifier
172 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
173
174 notifier := notify.NewMergedNotifier(notifiers)
175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
176
177 var cfClient *cloudflare.Client
178 if config.Cloudflare.ApiToken != "" {
179 cfClient, err = cloudflare.New(config)
180 if err != nil {
181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
182 cfClient = nil
183 }
184 }
185
186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
187 if err != nil {
188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
189 }
190 knotstream.Start(ctx)
191
192 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
193 if err != nil {
194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
195 }
196 spindlestream.Start(ctx)
197
198 state := &State{
199 db: d,
200 notifier: notifier,
201 indexer: indexer,
202 oauth: oauth,
203 enforcer: enforcer,
204 pages: pages,
205 idResolver: res,
206 mentionsResolver: mentionsResolver,
207 posthog: posthog,
208 jc: jc,
209 config: config,
210 repoResolver: repoResolver,
211 knotstream: knotstream,
212 spindlestream: spindlestream,
213 logger: logger,
214 validator: validator,
215 cfClient: cfClient,
216 }
217
218 // fetch initial bluesky posts if configured
219 go fetchBskyPosts(ctx, res, config, d, logger)
220
221 return state, nil
222}
223
224func (s *State) Close() error {
225 // other close up logic goes here
226 return s.db.Close()
227}
228
229func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
230 w.Header().Set("Content-Type", "text/plain")
231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
232
233 securityTxt := `Contact: mailto:security@tangled.org
234Preferred-Languages: en
235Canonical: https://tangled.org/.well-known/security.txt
236Expires: 2030-01-01T21:59:00.000Z
237`
238 w.Write([]byte(securityTxt))
239}
240
241func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
242 w.Header().Set("Content-Type", "text/plain")
243 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
244
245 robotsTxt := `# Hello, Tanglers!
246User-agent: *
247Allow: /
248Disallow: /*/*/settings
249Disallow: /settings
250Disallow: /*/*/compare
251Disallow: /*/*/fork
252
253Crawl-delay: 1
254`
255 w.Write([]byte(robotsTxt))
256}
257
258func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
259 user := s.oauth.GetMultiAccountUser(r)
260 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
261 LoggedInUser: user,
262 })
263}
264
265func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
266 user := s.oauth.GetMultiAccountUser(r)
267 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
268 LoggedInUser: user,
269 })
270}
271
272func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
273 user := s.oauth.GetMultiAccountUser(r)
274 s.pages.Brand(w, pages.BrandParams{
275 LoggedInUser: user,
276 })
277}
278
279func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
280 user := s.oauth.GetMultiAccountUser(r)
281 if user == nil {
282 return
283 }
284
285 l := s.logger.With("handler", "UpgradeBanner")
286 l = l.With("did", user.Active.Did)
287
288 regs, err := db.GetRegistrations(
289 s.db,
290 orm.FilterEq("did", user.Active.Did),
291 orm.FilterEq("needs_upgrade", 1),
292 )
293 if err != nil {
294 l.Error("non-fatal: failed to get registrations", "err", err)
295 }
296
297 spindles, err := db.GetSpindles(
298 r.Context(),
299 s.db,
300 orm.FilterEq("owner", user.Active.Did),
301 orm.FilterEq("needs_upgrade", 1),
302 )
303 if err != nil {
304 l.Error("non-fatal: failed to get spindles", "err", err)
305 }
306
307 if regs == nil && spindles == nil {
308 return
309 }
310
311 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
312 Registrations: regs,
313 Spindles: spindles,
314 })
315}
316
317func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
318 user := chi.URLParam(r, "user")
319 user = strings.TrimPrefix(user, "@")
320
321 if user == "" {
322 w.WriteHeader(http.StatusBadRequest)
323 return
324 }
325
326 id, err := s.idResolver.ResolveIdent(r.Context(), user)
327 if err != nil {
328 w.WriteHeader(http.StatusInternalServerError)
329 return
330 }
331
332 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
333 if err != nil {
334 s.logger.Error("failed to get public keys", "err", err)
335 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
336 return
337 }
338
339 if len(pubKeys) == 0 {
340 w.WriteHeader(http.StatusNoContent)
341 return
342 }
343
344 for _, k := range pubKeys {
345 key := strings.TrimRight(k.Key, "\n")
346 fmt.Fprintln(w, key)
347 }
348}
349
350func validateRepoName(name string) error {
351 // check for path traversal attempts
352 if name == "." || name == ".." ||
353 strings.Contains(name, "/") || strings.Contains(name, "\\") {
354 return fmt.Errorf("Repository name contains invalid path characters")
355 }
356
357 // check for sequences that could be used for traversal when normalized
358 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
359 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
360 return fmt.Errorf("Repository name contains invalid path sequence")
361 }
362
363 // then continue with character validation
364 for _, char := range name {
365 if !((char >= 'a' && char <= 'z') ||
366 (char >= 'A' && char <= 'Z') ||
367 (char >= '0' && char <= '9') ||
368 char == '-' || char == '_' || char == '.') {
369 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
370 }
371 }
372
373 // additional check to prevent multiple sequential dots
374 if strings.Contains(name, "..") {
375 return fmt.Errorf("Repository name cannot contain sequential dots")
376 }
377
378 // if all checks pass
379 return nil
380}
381
382func stripGitExt(name string) string {
383 return strings.TrimSuffix(name, ".git")
384}
385
386func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
387 switch r.Method {
388 case http.MethodGet:
389 user := s.oauth.GetMultiAccountUser(r)
390 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
391 if err != nil {
392 s.pages.Notice(w, "repo", "Invalid user account.")
393 return
394 }
395
396 s.pages.NewRepo(w, pages.NewRepoParams{
397 LoggedInUser: user,
398 Knots: knots,
399 })
400
401 case http.MethodPost:
402 l := s.logger.With("handler", "NewRepo")
403
404 user := s.oauth.GetMultiAccountUser(r)
405 l = l.With("did", user.Active.Did)
406
407 // form validation
408 domain := r.FormValue("domain")
409 if domain == "" {
410 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
411 return
412 }
413 l = l.With("knot", domain)
414
415 repoName := r.FormValue("name")
416 if repoName == "" {
417 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
418 return
419 }
420
421 if err := validateRepoName(repoName); err != nil {
422 s.pages.Notice(w, "repo", err.Error())
423 return
424 }
425 repoName = stripGitExt(repoName)
426 l = l.With("repoName", repoName)
427
428 defaultBranch := r.FormValue("branch")
429 if defaultBranch == "" {
430 defaultBranch = "main"
431 }
432 l = l.With("defaultBranch", defaultBranch)
433
434 description := r.FormValue("description")
435 if len([]rune(description)) > 140 {
436 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
437 return
438 }
439
440 // ACL validation
441 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
442 if err != nil || !ok {
443 l.Info("unauthorized")
444 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
445 return
446 }
447
448 // Check for existing repos
449 existingRepo, err := db.GetRepo(
450 s.db,
451 orm.FilterEq("did", user.Active.Did),
452 orm.FilterEq("name", repoName),
453 )
454 if err == nil && existingRepo != nil {
455 l.Info("repo exists")
456 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
457 return
458 }
459
460 rkey := tid.TID()
461
462 client, err := s.oauth.ServiceClient(
463 r,
464 oauth.WithService(domain),
465 oauth.WithLxm(tangled.RepoCreateNSID),
466 oauth.WithDev(s.config.Core.Dev),
467 )
468 if err != nil {
469 l.Error("service auth failed", "err", err)
470 s.pages.Notice(w, "repo", "Failed to reach knot server.")
471 return
472 }
473
474 input := &tangled.RepoCreate_Input{
475 Rkey: rkey,
476 Name: repoName,
477 DefaultBranch: &defaultBranch,
478 }
479 createResp, xe := tangled.RepoCreate(
480 r.Context(),
481 client,
482 input,
483 )
484 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
485 l.Error("xrpc error", "xe", xe)
486 s.pages.Notice(w, "repo", err.Error())
487 return
488 }
489
490 var repoDid string
491 if createResp != nil && createResp.RepoDid != nil {
492 repoDid = *createResp.RepoDid
493 }
494 if repoDid == "" {
495 l.Error("knot returned empty repo DID")
496 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
497 return
498 }
499
500 repo := &models.Repo{
501 Did: user.Active.Did,
502 Name: repoName,
503 Knot: domain,
504 Rkey: rkey,
505 Description: description,
506 Created: time.Now(),
507 Labels: s.config.Label.DefaultLabelDefs,
508 RepoDid: repoDid,
509 }
510 record := repo.AsRecord()
511
512 cleanupKnot := func() {
513 go func() {
514 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
515 for attempt, delay := range delays {
516 time.Sleep(delay)
517 deleteClient, dErr := s.oauth.ServiceClient(
518 r,
519 oauth.WithService(domain),
520 oauth.WithLxm(tangled.RepoDeleteNSID),
521 oauth.WithDev(s.config.Core.Dev),
522 )
523 if dErr != nil {
524 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
525 continue
526 }
527 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
528 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
529 Did: user.Active.Did,
530 Name: repoName,
531 Rkey: rkey,
532 }); dErr != nil {
533 cancel()
534 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
535 continue
536 }
537 cancel()
538 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
539 return
540 }
541 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
542 "did", user.Active.Did, "repo", repoName, "knot", domain)
543 }()
544 }
545
546 atpClient, err := s.oauth.AuthorizedClient(r)
547 if err != nil {
548 l.Info("PDS write failed", "err", err)
549 cleanupKnot()
550 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
551 return
552 }
553
554 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
555 Collection: tangled.RepoNSID,
556 Repo: user.Active.Did,
557 Rkey: rkey,
558 Record: &lexutil.LexiconTypeDecoder{
559 Val: &record,
560 },
561 })
562 if err != nil {
563 l.Info("PDS write failed", "err", err)
564 cleanupKnot()
565 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
566 return
567 }
568
569 aturi := atresp.Uri
570 l = l.With("aturi", aturi)
571 l.Info("wrote to PDS")
572
573 tx, err := s.db.BeginTx(r.Context(), nil)
574 if err != nil {
575 l.Info("txn failed", "err", err)
576 s.pages.Notice(w, "repo", "Failed to save repository information.")
577 return
578 }
579
580 rollback := func() {
581 err1 := tx.Rollback()
582 err2 := s.enforcer.E.LoadPolicy()
583 err3 := rollbackRecord(context.Background(), aturi, atpClient)
584
585 if errors.Is(err1, sql.ErrTxDone) {
586 err1 = nil
587 }
588
589 if errs := errors.Join(err1, err2, err3); errs != nil {
590 l.Error("failed to rollback changes", "errs", errs)
591 }
592
593 if aturi != "" {
594 cleanupKnot()
595 }
596 }
597 defer rollback()
598
599 err = db.AddRepo(tx, repo)
600 if err != nil {
601 l.Error("db write failed", "err", err)
602 s.pages.Notice(w, "repo", "Failed to save repository information.")
603 return
604 }
605
606 rbacPath := repo.RepoIdentifier()
607 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath)
608 if err != nil {
609 l.Error("acl setup failed", "err", err)
610 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
611 return
612 }
613
614 err = tx.Commit()
615 if err != nil {
616 l.Error("txn commit failed", "err", err)
617 http.Error(w, err.Error(), http.StatusInternalServerError)
618 return
619 }
620
621 err = s.enforcer.E.SavePolicy()
622 if err != nil {
623 l.Error("acl save failed", "err", err)
624 http.Error(w, err.Error(), http.StatusInternalServerError)
625 return
626 }
627
628 aturi = ""
629
630 s.notifier.NewRepo(r.Context(), repo)
631 if repoDid != "" {
632 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
633 } else {
634 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
635 }
636 }
637}
638
639// this is used to rollback changes made to the PDS
640//
641// it is a no-op if the provided ATURI is empty
642func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
643 if aturi == "" {
644 return nil
645 }
646
647 parsed := syntax.ATURI(aturi)
648
649 collection := parsed.Collection().String()
650 repo := parsed.Authority().String()
651 rkey := parsed.RecordKey().String()
652
653 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
654 Collection: collection,
655 Repo: repo,
656 Rkey: rkey,
657 })
658 return err
659}
660
661func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
662 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
663 if err != nil {
664 return err
665 }
666 // already present
667 if len(defaultLabels) == len(defaults) {
668 return nil
669 }
670
671 labelDefs, err := models.FetchLabelDefs(r, defaults)
672 if err != nil {
673 return err
674 }
675
676 // Insert each label definition to the database
677 for _, labelDef := range labelDefs {
678 _, err = db.AddLabelDefinition(e, &labelDef)
679 if err != nil {
680 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
681 }
682 }
683
684 return nil
685}
686
687func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
688 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
689 if err != nil {
690 logger.Error("failed to resolve tangled.org DID", "err", err)
691 return
692 }
693
694 pdsEndpoint := resolved.PDSEndpoint()
695 if pdsEndpoint == "" {
696 logger.Error("no PDS endpoint found for tangled.sh DID")
697 return
698 }
699
700 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
701 if err != nil {
702 logger.Error("failed to create appassword session... skipping fetch", "err", err)
703 return
704 }
705
706 client := xrpc.Client{
707 Auth: &xrpc.AuthInfo{
708 AccessJwt: session.AccessJwt,
709 Did: session.Did,
710 },
711 Host: session.PdsEndpoint,
712 }
713
714 l := log.SubLogger(logger, "bluesky")
715
716 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
717 defer ticker.Stop()
718
719 for {
720 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
721 if err != nil {
722 l.Error("failed to fetch bluesky posts", "err", err)
723 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
724 l.Error("failed to insert bluesky posts", "err", err)
725 } else {
726 l.Info("inserted bluesky posts", "count", len(posts))
727 }
728
729 select {
730 case <-ticker.C:
731 case <-ctx.Done():
732 l.Info("stopping bluesky updater")
733 return
734 }
735 }
736}