// Forked from tangled.org/core, the monorepo for Tangled.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 phnotify "tangled.org/core/appview/notify/posthog"
25 "tangled.org/core/appview/oauth"
26 "tangled.org/core/appview/pages"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/validator"
29 xrpcclient "tangled.org/core/appview/xrpcclient"
30 "tangled.org/core/consts"
31 "tangled.org/core/eventconsumer"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/jetstream"
34 "tangled.org/core/log"
35 tlog "tangled.org/core/log"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38 "tangled.org/core/tid"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 "github.com/bluesky-social/indigo/atproto/atclient"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44 "github.com/bluesky-social/indigo/xrpc"
45 securejoin "github.com/cyphar/filepath-securejoin"
46 "github.com/go-chi/chi/v5"
47 "github.com/posthog/posthog-go"
48)
49
50type State struct {
51 db *db.DB
52 notifier notify.Notifier
53 indexer *indexer.Indexer
54 oauth *oauth.OAuth
55 enforcer *rbac.Enforcer
56 pages *pages.Pages
57 idResolver *idresolver.Resolver
58 mentionsResolver *mentions.Resolver
59 posthog posthog.Client
60 jc *jetstream.JetstreamClient
61 config *config.Config
62 repoResolver *reporesolver.RepoResolver
63 knotstream *eventconsumer.Consumer
64 spindlestream *eventconsumer.Consumer
65 logger *slog.Logger
66 validator *validator.Validator
67 cfClient *cloudflare.Client
68}
69
70func Make(ctx context.Context, config *config.Config) (*State, error) {
71 logger := tlog.FromContext(ctx)
72
73 d, err := db.Make(ctx, config.Core.DbPath)
74 if err != nil {
75 return nil, fmt.Errorf("failed to create db: %w", err)
76 }
77
78 indexer := indexer.New(log.SubLogger(logger, "indexer"))
79 err = indexer.Init(ctx, d)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create indexer: %w", err)
82 }
83
84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create enforcer: %w", err)
87 }
88
89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
90 if err != nil {
91 logger.Error("failed to create redis resolver", "err", err)
92 res = idresolver.DefaultResolver(config.Plc.PLCURL)
93 }
94
95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
96 if err != nil {
97 return nil, fmt.Errorf("failed to create posthog client: %w", err)
98 }
99
100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
102 if err != nil {
103 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
104 }
105 validator := validator.New(d, res, enforcer)
106
107 repoResolver := reporesolver.New(config, enforcer, d)
108
109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
110
111 wrapper := db.DbWrapper{Execer: d}
112 jc, err := jetstream.NewJetstreamClient(
113 config.Jetstream.Endpoint,
114 "appview",
115 []string{
116 tangled.GraphFollowNSID,
117 tangled.FeedStarNSID,
118 tangled.PublicKeyNSID,
119 tangled.RepoArtifactNSID,
120 tangled.ActorProfileNSID,
121 tangled.KnotMemberNSID,
122 tangled.SpindleMemberNSID,
123 tangled.SpindleNSID,
124 tangled.StringNSID,
125 tangled.RepoIssueNSID,
126 tangled.RepoIssueCommentNSID,
127 tangled.FeedCommentNSID,
128 tangled.LabelDefinitionNSID,
129 tangled.LabelOpNSID,
130 },
131 nil,
132 tlog.SubLogger(logger, "jetstream"),
133 wrapper,
134 false,
135
136 // in-memory filter is inapplicable to appview so
137 // we'll never log dids anyway.
138 false,
139 )
140 if err != nil {
141 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
142 }
143
144 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
145 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
146 }
147
148 var notifiers []notify.Notifier
149
150 // Always add the database notifier
151 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
152
153 // Add other notifiers in production only
154 if !config.Core.Dev {
155 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
156 }
157 notifiers = append(notifiers, indexer)
158
159 // Add webhook notifier
160 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
161
162 notifier := notify.NewMergedNotifier(notifiers)
163 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
164
165 ingester := appview.Ingester{
166 Db: wrapper,
167 Enforcer: enforcer,
168 IdResolver: res,
169 Config: config,
170 Logger: log.SubLogger(logger, "ingester"),
171 Validator: validator,
172 MentionsResolver: mentionsResolver,
173 Notifier: notifier,
174 }
175 err = jc.StartJetstream(ctx, ingester.Ingest())
176 if err != nil {
177 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
178 }
179
180 var cfClient *cloudflare.Client
181 if config.Cloudflare.ApiToken != "" {
182 cfClient, err = cloudflare.New(config)
183 if err != nil {
184 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
185 cfClient = nil
186 }
187 }
188
189 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
190 if err != nil {
191 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
192 }
193 knotstream.Start(ctx)
194
195 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
196 if err != nil {
197 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
198 }
199 spindlestream.Start(ctx)
200
201 state := &State{
202 db: d,
203 notifier: notifier,
204 indexer: indexer,
205 oauth: oauth,
206 enforcer: enforcer,
207 pages: pages,
208 idResolver: res,
209 mentionsResolver: mentionsResolver,
210 posthog: posthog,
211 jc: jc,
212 config: config,
213 repoResolver: repoResolver,
214 knotstream: knotstream,
215 spindlestream: spindlestream,
216 logger: logger,
217 validator: validator,
218 cfClient: cfClient,
219 }
220
221 // fetch initial bluesky posts if configured
222 go fetchBskyPosts(ctx, res, config, d, logger)
223
224 return state, nil
225}
226
227func (s *State) Close() error {
228 // other close up logic goes here
229 return s.db.Close()
230}
231
232func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
233 w.Header().Set("Content-Type", "text/plain")
234 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
235
236 securityTxt := `Contact: mailto:security@tangled.org
237Preferred-Languages: en
238Canonical: https://tangled.org/.well-known/security.txt
239Expires: 2030-01-01T21:59:00.000Z
240`
241 w.Write([]byte(securityTxt))
242}
243
244func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
245 w.Header().Set("Content-Type", "text/plain")
246 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
247
248 robotsTxt := `# Hello, Tanglers!
249User-agent: *
250Allow: /
251Disallow: /*/*/settings
252Disallow: /settings
253Disallow: /*/*/compare
254Disallow: /*/*/fork
255
256Crawl-delay: 1
257`
258 w.Write([]byte(robotsTxt))
259}
260
261func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
262 user := s.oauth.GetMultiAccountUser(r)
263 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
264 LoggedInUser: user,
265 })
266}
267
268func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
269 user := s.oauth.GetMultiAccountUser(r)
270 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
271 LoggedInUser: user,
272 })
273}
274
275func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
276 user := s.oauth.GetMultiAccountUser(r)
277 s.pages.Brand(w, pages.BrandParams{
278 LoggedInUser: user,
279 })
280}
281
282func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
283 user := s.oauth.GetMultiAccountUser(r)
284 if user == nil {
285 return
286 }
287
288 l := s.logger.With("handler", "UpgradeBanner")
289 l = l.With("did", user.Active.Did)
290
291 regs, err := db.GetRegistrations(
292 s.db,
293 orm.FilterEq("did", user.Active.Did),
294 orm.FilterEq("needs_upgrade", 1),
295 )
296 if err != nil {
297 l.Error("non-fatal: failed to get registrations", "err", err)
298 }
299
300 spindles, err := db.GetSpindles(
301 r.Context(),
302 s.db,
303 orm.FilterEq("owner", user.Active.Did),
304 orm.FilterEq("needs_upgrade", 1),
305 )
306 if err != nil {
307 l.Error("non-fatal: failed to get spindles", "err", err)
308 }
309
310 if regs == nil && spindles == nil {
311 return
312 }
313
314 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
315 Registrations: regs,
316 Spindles: spindles,
317 })
318}
319
320func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
321 user := chi.URLParam(r, "user")
322 user = strings.TrimPrefix(user, "@")
323
324 if user == "" {
325 w.WriteHeader(http.StatusBadRequest)
326 return
327 }
328
329 id, err := s.idResolver.ResolveIdent(r.Context(), user)
330 if err != nil {
331 w.WriteHeader(http.StatusInternalServerError)
332 return
333 }
334
335 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
336 if err != nil {
337 s.logger.Error("failed to get public keys", "err", err)
338 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
339 return
340 }
341
342 if len(pubKeys) == 0 {
343 w.WriteHeader(http.StatusNoContent)
344 return
345 }
346
347 for _, k := range pubKeys {
348 key := strings.TrimRight(k.Key, "\n")
349 fmt.Fprintln(w, key)
350 }
351}
352
// validateRepoName checks that name is acceptable as a repository name and
// returns a descriptive error when it is not.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and contains no "..". Path
// separators ('/' and '\') are rejected outright.
//
// The messages are capitalised full phrases because callers show err.Error()
// directly to the user.
func validateRepoName(name string) error {
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return errors.New("Repository name contains invalid path characters")
	}

	// Leading or trailing dots could be used for traversal when normalized.
	// ("./" and "../" need no separate check: '/' is already rejected above.)
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case 'a' <= char && char <= 'z':
		case 'A' <= char && char <= 'Z':
		case '0' <= char && char <= '9':
		case char == '-', char == '_', char == '.':
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
384
// stripGitExt returns name with a single trailing ".git" removed; names
// without that suffix are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if strings.HasSuffix(name, ext) {
		return name[:len(name)-len(ext)]
	}
	return name
}
388
389func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
390 switch r.Method {
391 case http.MethodGet:
392 user := s.oauth.GetMultiAccountUser(r)
393 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
394 if err != nil {
395 s.pages.Notice(w, "repo", "Invalid user account.")
396 return
397 }
398
399 s.pages.NewRepo(w, pages.NewRepoParams{
400 LoggedInUser: user,
401 Knots: knots,
402 })
403
404 case http.MethodPost:
405 l := s.logger.With("handler", "NewRepo")
406
407 user := s.oauth.GetMultiAccountUser(r)
408 l = l.With("did", user.Active.Did)
409
410 // form validation
411 domain := r.FormValue("domain")
412 if domain == "" {
413 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
414 return
415 }
416 l = l.With("knot", domain)
417
418 repoName := r.FormValue("name")
419 if repoName == "" {
420 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
421 return
422 }
423
424 if err := validateRepoName(repoName); err != nil {
425 s.pages.Notice(w, "repo", err.Error())
426 return
427 }
428 repoName = stripGitExt(repoName)
429 l = l.With("repoName", repoName)
430
431 defaultBranch := r.FormValue("branch")
432 if defaultBranch == "" {
433 defaultBranch = "main"
434 }
435 l = l.With("defaultBranch", defaultBranch)
436
437 description := r.FormValue("description")
438 if len([]rune(description)) > 140 {
439 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
440 return
441 }
442
443 // ACL validation
444 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
445 if err != nil || !ok {
446 l.Info("unauthorized")
447 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
448 return
449 }
450
451 // Check for existing repos
452 existingRepo, err := db.GetRepo(
453 s.db,
454 orm.FilterEq("did", user.Active.Did),
455 orm.FilterEq("name", repoName),
456 )
457 if err == nil && existingRepo != nil {
458 l.Info("repo exists")
459 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
460 return
461 }
462
463 // create atproto record for this repo
464 rkey := tid.TID()
465 repo := &models.Repo{
466 Did: user.Active.Did,
467 Name: repoName,
468 Knot: domain,
469 Rkey: rkey,
470 Description: description,
471 Created: time.Now(),
472 Labels: s.config.Label.DefaultLabelDefs,
473 }
474 record := repo.AsRecord()
475
476 atpClient, err := s.oauth.AuthorizedClient(r)
477 if err != nil {
478 l.Info("PDS write failed", "err", err)
479 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
480 return
481 }
482
483 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
484 Collection: tangled.RepoNSID,
485 Repo: user.Active.Did,
486 Rkey: rkey,
487 Record: &lexutil.LexiconTypeDecoder{
488 Val: &record,
489 },
490 })
491 if err != nil {
492 l.Info("PDS write failed", "err", err)
493 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
494 return
495 }
496
497 aturi := atresp.Uri
498 l = l.With("aturi", aturi)
499 l.Info("wrote to PDS")
500
501 tx, err := s.db.BeginTx(r.Context(), nil)
502 if err != nil {
503 l.Info("txn failed", "err", err)
504 s.pages.Notice(w, "repo", "Failed to save repository information.")
505 return
506 }
507
508 // The rollback function reverts a few things on failure:
509 // - the pending txn
510 // - the ACLs
511 // - the atproto record created
512 rollback := func() {
513 err1 := tx.Rollback()
514 err2 := s.enforcer.E.LoadPolicy()
515 err3 := rollbackRecord(context.Background(), aturi, atpClient)
516
517 // ignore txn complete errors, this is okay
518 if errors.Is(err1, sql.ErrTxDone) {
519 err1 = nil
520 }
521
522 if errs := errors.Join(err1, err2, err3); errs != nil {
523 l.Error("failed to rollback changes", "errs", errs)
524 return
525 }
526 }
527 defer rollback()
528
529 client, err := s.oauth.ServiceClient(
530 r,
531 oauth.WithService(domain),
532 oauth.WithLxm(tangled.RepoCreateNSID),
533 oauth.WithDev(s.config.Core.Dev),
534 )
535 if err != nil {
536 l.Error("service auth failed", "err", err)
537 s.pages.Notice(w, "repo", "Failed to reach PDS.")
538 return
539 }
540
541 xe := tangled.RepoCreate(
542 r.Context(),
543 client,
544 &tangled.RepoCreate_Input{
545 Rkey: rkey,
546 },
547 )
548 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
549 l.Error("xrpc error", "xe", xe)
550 s.pages.Notice(w, "repo", err.Error())
551 return
552 }
553
554 err = db.AddRepo(tx, repo)
555 if err != nil {
556 l.Error("db write failed", "err", err)
557 s.pages.Notice(w, "repo", "Failed to save repository information.")
558 return
559 }
560
561 // acls
562 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
563 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
564 if err != nil {
565 l.Error("acl setup failed", "err", err)
566 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
567 return
568 }
569
570 err = tx.Commit()
571 if err != nil {
572 l.Error("txn commit failed", "err", err)
573 http.Error(w, err.Error(), http.StatusInternalServerError)
574 return
575 }
576
577 err = s.enforcer.E.SavePolicy()
578 if err != nil {
579 l.Error("acl save failed", "err", err)
580 http.Error(w, err.Error(), http.StatusInternalServerError)
581 return
582 }
583
584 // reset the ATURI because the transaction completed successfully
585 aturi = ""
586
587 s.notifier.NewRepo(r.Context(), repo)
588 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
589 }
590}
591
592// this is used to rollback changes made to the PDS
593//
594// it is a no-op if the provided ATURI is empty
595func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
596 if aturi == "" {
597 return nil
598 }
599
600 parsed := syntax.ATURI(aturi)
601
602 collection := parsed.Collection().String()
603 repo := parsed.Authority().String()
604 rkey := parsed.RecordKey().String()
605
606 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
607 Collection: collection,
608 Repo: repo,
609 Rkey: rkey,
610 })
611 return err
612}
613
614func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
615 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
616 if err != nil {
617 return err
618 }
619 // already present
620 if len(defaultLabels) == len(defaults) {
621 return nil
622 }
623
624 labelDefs, err := models.FetchLabelDefs(r, defaults)
625 if err != nil {
626 return err
627 }
628
629 // Insert each label definition to the database
630 for _, labelDef := range labelDefs {
631 _, err = db.AddLabelDefinition(e, &labelDef)
632 if err != nil {
633 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
634 }
635 }
636
637 return nil
638}
639
640func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
641 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
642 if err != nil {
643 logger.Error("failed to resolve tangled.org DID", "err", err)
644 return
645 }
646
647 pdsEndpoint := resolved.PDSEndpoint()
648 if pdsEndpoint == "" {
649 logger.Error("no PDS endpoint found for tangled.sh DID")
650 return
651 }
652
653 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
654 if err != nil {
655 logger.Error("failed to create appassword session... skipping fetch", "err", err)
656 return
657 }
658
659 client := xrpc.Client{
660 Auth: &xrpc.AuthInfo{
661 AccessJwt: session.AccessJwt,
662 Did: session.Did,
663 },
664 Host: session.PdsEndpoint,
665 }
666
667 l := log.SubLogger(logger, "bluesky")
668
669 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
670 defer ticker.Stop()
671
672 for {
673 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
674 if err != nil {
675 l.Error("failed to fetch bluesky posts", "err", err)
676 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
677 l.Error("failed to insert bluesky posts", "err", err)
678 } else {
679 l.Info("inserted bluesky posts", "count", len(posts))
680 }
681
682 select {
683 case <-ticker.C:
684 case <-ctx.Done():
685 l.Info("stopping bluesky updater")
686 return
687 }
688 }
689}