forked from
tangled.org/core
Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 phnotify "tangled.org/core/appview/notify/posthog"
25 "tangled.org/core/appview/oauth"
26 "tangled.org/core/appview/pages"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/validator"
29 xrpcclient "tangled.org/core/appview/xrpcclient"
30 "tangled.org/core/consts"
31 "tangled.org/core/eventconsumer"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/jetstream"
34 "tangled.org/core/log"
35 tlog "tangled.org/core/log"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38 "tangled.org/core/tid"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 "github.com/bluesky-social/indigo/atproto/atclient"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44 "github.com/bluesky-social/indigo/xrpc"
45 securejoin "github.com/cyphar/filepath-securejoin"
46 "github.com/go-chi/chi/v5"
47 "github.com/posthog/posthog-go"
48)
49
50type State struct {
51 db *db.DB
52 notifier notify.Notifier
53 indexer *indexer.Indexer
54 oauth *oauth.OAuth
55 enforcer *rbac.Enforcer
56 pages *pages.Pages
57 idResolver *idresolver.Resolver
58 mentionsResolver *mentions.Resolver
59 posthog posthog.Client
60 jc *jetstream.JetstreamClient
61 config *config.Config
62 repoResolver *reporesolver.RepoResolver
63 knotstream *eventconsumer.Consumer
64 spindlestream *eventconsumer.Consumer
65 logger *slog.Logger
66 validator *validator.Validator
67 cfClient *cloudflare.Client
68}
69
70func Make(ctx context.Context, config *config.Config) (*State, error) {
71 logger := tlog.FromContext(ctx)
72
73 d, err := db.Make(ctx, config.Core.DbPath)
74 if err != nil {
75 return nil, fmt.Errorf("failed to create db: %w", err)
76 }
77
78 indexer := indexer.New(log.SubLogger(logger, "indexer"))
79 err = indexer.Init(ctx, d)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create indexer: %w", err)
82 }
83
84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create enforcer: %w", err)
87 }
88
89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
90 if err != nil {
91 logger.Error("failed to create redis resolver", "err", err)
92 res = idresolver.DefaultResolver(config.Plc.PLCURL)
93 }
94
95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
96 if err != nil {
97 return nil, fmt.Errorf("failed to create posthog client: %w", err)
98 }
99
100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
102 if err != nil {
103 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
104 }
105 validator := validator.New(d, res, enforcer)
106
107 repoResolver := reporesolver.New(config, enforcer, d)
108
109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
110
111 wrapper := db.DbWrapper{Execer: d}
112 jc, err := jetstream.NewJetstreamClient(
113 config.Jetstream.Endpoint,
114 "appview",
115 []string{
116 tangled.GraphFollowNSID,
117 tangled.FeedStarNSID,
118 tangled.PublicKeyNSID,
119 tangled.RepoArtifactNSID,
120 tangled.ActorProfileNSID,
121 tangled.KnotMemberNSID,
122 tangled.SpindleMemberNSID,
123 tangled.SpindleNSID,
124 tangled.StringNSID,
125 tangled.RepoIssueNSID,
126 tangled.RepoIssueCommentNSID,
127 tangled.LabelDefinitionNSID,
128 tangled.LabelOpNSID,
129 },
130 nil,
131 tlog.SubLogger(logger, "jetstream"),
132 wrapper,
133 false,
134
135 // in-memory filter is inapplicable to appview so
136 // we'll never log dids anyway.
137 false,
138 )
139 if err != nil {
140 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
141 }
142
143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
145 }
146
147 ingester := appview.Ingester{
148 Db: wrapper,
149 Enforcer: enforcer,
150 IdResolver: res,
151 Config: config,
152 Logger: log.SubLogger(logger, "ingester"),
153 Validator: validator,
154 }
155 err = jc.StartJetstream(ctx, ingester.Ingest())
156 if err != nil {
157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
158 }
159
160 var notifiers []notify.Notifier
161
162 // Always add the database notifier
163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
164
165 // Add other notifiers in production only
166 if !config.Core.Dev {
167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
168 }
169 notifiers = append(notifiers, indexer)
170
171 // Add webhook notifier
172 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
173
174 notifier := notify.NewMergedNotifier(notifiers)
175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
176
177 var cfClient *cloudflare.Client
178 if config.Cloudflare.ApiToken != "" {
179 cfClient, err = cloudflare.New(config)
180 if err != nil {
181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
182 cfClient = nil
183 }
184 }
185
186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
187 if err != nil {
188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
189 }
190 knotstream.Start(ctx)
191
192 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
193 if err != nil {
194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
195 }
196 spindlestream.Start(ctx)
197
198 state := &State{
199 db: d,
200 notifier: notifier,
201 indexer: indexer,
202 oauth: oauth,
203 enforcer: enforcer,
204 pages: pages,
205 idResolver: res,
206 mentionsResolver: mentionsResolver,
207 posthog: posthog,
208 jc: jc,
209 config: config,
210 repoResolver: repoResolver,
211 knotstream: knotstream,
212 spindlestream: spindlestream,
213 logger: logger,
214 validator: validator,
215 cfClient: cfClient,
216 }
217
218 // fetch initial bluesky posts if configured
219 go fetchBskyPosts(ctx, res, config, d, logger)
220
221 return state, nil
222}
223
224func (s *State) Close() error {
225 // other close up logic goes here
226 return s.db.Close()
227}
228
229func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
230 w.Header().Set("Content-Type", "text/plain")
231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
232
233 securityTxt := `Contact: mailto:security@tangled.org
234Preferred-Languages: en
235Canonical: https://tangled.org/.well-known/security.txt
236Expires: 2030-01-01T21:59:00.000Z
237`
238 w.Write([]byte(securityTxt))
239}
240
241func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
242 w.Header().Set("Content-Type", "text/plain")
243 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
244
245 robotsTxt := `# Hello, Tanglers!
246User-agent: *
247Allow: /
248Disallow: /*/*/settings
249Disallow: /settings
250Disallow: /*/*/compare
251Disallow: /*/*/fork
252
253Crawl-delay: 1
254`
255 w.Write([]byte(robotsTxt))
256}
257
258func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
259 user := s.oauth.GetMultiAccountUser(r)
260 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
261 LoggedInUser: user,
262 })
263}
264
265func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
266 user := s.oauth.GetMultiAccountUser(r)
267 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
268 LoggedInUser: user,
269 })
270}
271
272func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
273 user := s.oauth.GetMultiAccountUser(r)
274 s.pages.Brand(w, pages.BrandParams{
275 LoggedInUser: user,
276 })
277}
278
279func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
280 user := s.oauth.GetMultiAccountUser(r)
281 if user == nil {
282 return
283 }
284
285 l := s.logger.With("handler", "UpgradeBanner")
286 l = l.With("did", user.Active.Did)
287
288 regs, err := db.GetRegistrations(
289 s.db,
290 orm.FilterEq("did", user.Active.Did),
291 orm.FilterEq("needs_upgrade", 1),
292 )
293 if err != nil {
294 l.Error("non-fatal: failed to get registrations", "err", err)
295 }
296
297 spindles, err := db.GetSpindles(
298 s.db,
299 orm.FilterEq("owner", user.Active.Did),
300 orm.FilterEq("needs_upgrade", 1),
301 )
302 if err != nil {
303 l.Error("non-fatal: failed to get spindles", "err", err)
304 }
305
306 if regs == nil && spindles == nil {
307 return
308 }
309
310 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
311 Registrations: regs,
312 Spindles: spindles,
313 })
314}
315
316func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
317 user := chi.URLParam(r, "user")
318 user = strings.TrimPrefix(user, "@")
319
320 if user == "" {
321 w.WriteHeader(http.StatusBadRequest)
322 return
323 }
324
325 id, err := s.idResolver.ResolveIdent(r.Context(), user)
326 if err != nil {
327 w.WriteHeader(http.StatusInternalServerError)
328 return
329 }
330
331 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
332 if err != nil {
333 s.logger.Error("failed to get public keys", "err", err)
334 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
335 return
336 }
337
338 if len(pubKeys) == 0 {
339 w.WriteHeader(http.StatusNoContent)
340 return
341 }
342
343 for _, k := range pubKeys {
344 key := strings.TrimRight(k.Key, "\n")
345 fmt.Fprintln(w, key)
346 }
347}
348
// validateRepoName checks that name is usable as a repository name and
// returns a user-facing error describing the first rule it breaks.
//
// Rules, in the order they are applied:
//   - not "." or "..", and no "/" or "\" anywhere
//   - does not start or end with "."
//   - only ASCII letters, digits, "-", "_" and "."
//   - no ".." sequence
//
// The empty string passes every rule; callers reject it separately.
func validateRepoName(name string) error {
	switch {
	case name == ".", name == "..", strings.ContainsAny(name, `/\`):
		return fmt.Errorf("Repository name contains invalid path characters")
	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		// with separators already excluded, a leading or trailing dot is
		// the only remaining way to form a relative path component
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	for _, r := range name {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
		case r == '-', r == '_', r == '.':
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
380
// stripGitExt returns name without a single trailing ".git"; names that do
// not end in ".git" are returned unchanged. The match is case-sensitive.
func stripGitExt(name string) string {
	const ext = ".git"
	if strings.HasSuffix(name, ext) {
		return name[:len(name)-len(ext)]
	}
	return name
}
384
385func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
386 switch r.Method {
387 case http.MethodGet:
388 user := s.oauth.GetMultiAccountUser(r)
389 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
390 if err != nil {
391 s.pages.Notice(w, "repo", "Invalid user account.")
392 return
393 }
394
395 s.pages.NewRepo(w, pages.NewRepoParams{
396 LoggedInUser: user,
397 Knots: knots,
398 })
399
400 case http.MethodPost:
401 l := s.logger.With("handler", "NewRepo")
402
403 user := s.oauth.GetMultiAccountUser(r)
404 l = l.With("did", user.Active.Did)
405
406 // form validation
407 domain := r.FormValue("domain")
408 if domain == "" {
409 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
410 return
411 }
412 l = l.With("knot", domain)
413
414 repoName := r.FormValue("name")
415 if repoName == "" {
416 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
417 return
418 }
419
420 if err := validateRepoName(repoName); err != nil {
421 s.pages.Notice(w, "repo", err.Error())
422 return
423 }
424 repoName = stripGitExt(repoName)
425 l = l.With("repoName", repoName)
426
427 defaultBranch := r.FormValue("branch")
428 if defaultBranch == "" {
429 defaultBranch = "main"
430 }
431 l = l.With("defaultBranch", defaultBranch)
432
433 description := r.FormValue("description")
434 if len([]rune(description)) > 140 {
435 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
436 return
437 }
438
439 // ACL validation
440 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
441 if err != nil || !ok {
442 l.Info("unauthorized")
443 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
444 return
445 }
446
447 // Check for existing repos
448 existingRepo, err := db.GetRepo(
449 s.db,
450 orm.FilterEq("did", user.Active.Did),
451 orm.FilterEq("name", repoName),
452 )
453 if err == nil && existingRepo != nil {
454 l.Info("repo exists")
455 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
456 return
457 }
458
459 // create atproto record for this repo
460 rkey := tid.TID()
461 repo := &models.Repo{
462 Did: user.Active.Did,
463 Name: repoName,
464 Knot: domain,
465 Rkey: rkey,
466 Description: description,
467 Created: time.Now(),
468 Labels: s.config.Label.DefaultLabelDefs,
469 }
470 record := repo.AsRecord()
471
472 atpClient, err := s.oauth.AuthorizedClient(r)
473 if err != nil {
474 l.Info("PDS write failed", "err", err)
475 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
476 return
477 }
478
479 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
480 Collection: tangled.RepoNSID,
481 Repo: user.Active.Did,
482 Rkey: rkey,
483 Record: &lexutil.LexiconTypeDecoder{
484 Val: &record,
485 },
486 })
487 if err != nil {
488 l.Info("PDS write failed", "err", err)
489 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
490 return
491 }
492
493 aturi := atresp.Uri
494 l = l.With("aturi", aturi)
495 l.Info("wrote to PDS")
496
497 tx, err := s.db.BeginTx(r.Context(), nil)
498 if err != nil {
499 l.Info("txn failed", "err", err)
500 s.pages.Notice(w, "repo", "Failed to save repository information.")
501 return
502 }
503
504 // The rollback function reverts a few things on failure:
505 // - the pending txn
506 // - the ACLs
507 // - the atproto record created
508 rollback := func() {
509 err1 := tx.Rollback()
510 err2 := s.enforcer.E.LoadPolicy()
511 err3 := rollbackRecord(context.Background(), aturi, atpClient)
512
513 // ignore txn complete errors, this is okay
514 if errors.Is(err1, sql.ErrTxDone) {
515 err1 = nil
516 }
517
518 if errs := errors.Join(err1, err2, err3); errs != nil {
519 l.Error("failed to rollback changes", "errs", errs)
520 return
521 }
522 }
523 defer rollback()
524
525 client, err := s.oauth.ServiceClient(
526 r,
527 oauth.WithService(domain),
528 oauth.WithLxm(tangled.RepoCreateNSID),
529 oauth.WithDev(s.config.Core.Dev),
530 )
531 if err != nil {
532 l.Error("service auth failed", "err", err)
533 s.pages.Notice(w, "repo", "Failed to reach PDS.")
534 return
535 }
536
537 xe := tangled.RepoCreate(
538 r.Context(),
539 client,
540 &tangled.RepoCreate_Input{
541 Rkey: rkey,
542 },
543 )
544 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
545 l.Error("xrpc error", "xe", xe)
546 s.pages.Notice(w, "repo", err.Error())
547 return
548 }
549
550 err = db.AddRepo(tx, repo)
551 if err != nil {
552 l.Error("db write failed", "err", err)
553 s.pages.Notice(w, "repo", "Failed to save repository information.")
554 return
555 }
556
557 // acls
558 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
559 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
560 if err != nil {
561 l.Error("acl setup failed", "err", err)
562 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
563 return
564 }
565
566 err = tx.Commit()
567 if err != nil {
568 l.Error("txn commit failed", "err", err)
569 http.Error(w, err.Error(), http.StatusInternalServerError)
570 return
571 }
572
573 err = s.enforcer.E.SavePolicy()
574 if err != nil {
575 l.Error("acl save failed", "err", err)
576 http.Error(w, err.Error(), http.StatusInternalServerError)
577 return
578 }
579
580 // reset the ATURI because the transaction completed successfully
581 aturi = ""
582
583 s.notifier.NewRepo(r.Context(), repo)
584 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
585 }
586}
587
588// this is used to rollback changes made to the PDS
589//
590// it is a no-op if the provided ATURI is empty
591func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
592 if aturi == "" {
593 return nil
594 }
595
596 parsed := syntax.ATURI(aturi)
597
598 collection := parsed.Collection().String()
599 repo := parsed.Authority().String()
600 rkey := parsed.RecordKey().String()
601
602 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
603 Collection: collection,
604 Repo: repo,
605 Rkey: rkey,
606 })
607 return err
608}
609
610func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
611 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
612 if err != nil {
613 return err
614 }
615 // already present
616 if len(defaultLabels) == len(defaults) {
617 return nil
618 }
619
620 labelDefs, err := models.FetchLabelDefs(r, defaults)
621 if err != nil {
622 return err
623 }
624
625 // Insert each label definition to the database
626 for _, labelDef := range labelDefs {
627 _, err = db.AddLabelDefinition(e, &labelDef)
628 if err != nil {
629 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
630 }
631 }
632
633 return nil
634}
635
636func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
637 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
638 if err != nil {
639 logger.Error("failed to resolve tangled.org DID", "err", err)
640 return
641 }
642
643 pdsEndpoint := resolved.PDSEndpoint()
644 if pdsEndpoint == "" {
645 logger.Error("no PDS endpoint found for tangled.sh DID")
646 return
647 }
648
649 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
650 if err != nil {
651 logger.Error("failed to create appassword session... skipping fetch", "err", err)
652 return
653 }
654
655 client := xrpc.Client{
656 Auth: &xrpc.AuthInfo{
657 AccessJwt: session.AccessJwt,
658 Did: session.Did,
659 },
660 Host: session.PdsEndpoint,
661 }
662
663 l := log.SubLogger(logger, "bluesky")
664
665 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
666 defer ticker.Stop()
667
668 for {
669 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
670 if err != nil {
671 l.Error("failed to fetch bluesky posts", "err", err)
672 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
673 l.Error("failed to insert bluesky posts", "err", err)
674 } else {
675 l.Info("inserted bluesky posts", "count", len(posts))
676 }
677
678 select {
679 case <-ticker.C:
680 case <-ctx.Done():
681 l.Info("stopping bluesky updater")
682 return
683 }
684 }
685}