Monorepo for Tangled
at master 686 lines 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 phnotify "tangled.org/core/appview/notify/posthog" 25 "tangled.org/core/appview/oauth" 26 "tangled.org/core/appview/pages" 27 "tangled.org/core/appview/reporesolver" 28 "tangled.org/core/appview/validator" 29 xrpcclient "tangled.org/core/appview/xrpcclient" 30 "tangled.org/core/consts" 31 "tangled.org/core/eventconsumer" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/jetstream" 34 "tangled.org/core/log" 35 tlog "tangled.org/core/log" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38 "tangled.org/core/tid" 39 40 comatproto "github.com/bluesky-social/indigo/api/atproto" 41 "github.com/bluesky-social/indigo/atproto/atclient" 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 "github.com/bluesky-social/indigo/xrpc" 45 securejoin "github.com/cyphar/filepath-securejoin" 46 "github.com/go-chi/chi/v5" 47 "github.com/posthog/posthog-go" 48) 49 50type State struct { 51 db *db.DB 52 notifier notify.Notifier 53 indexer *indexer.Indexer 54 oauth *oauth.OAuth 55 enforcer *rbac.Enforcer 56 pages *pages.Pages 57 idResolver *idresolver.Resolver 58 mentionsResolver *mentions.Resolver 59 posthog posthog.Client 60 jc *jetstream.JetstreamClient 61 config *config.Config 62 repoResolver *reporesolver.RepoResolver 63 knotstream *eventconsumer.Consumer 64 spindlestream *eventconsumer.Consumer 65 logger *slog.Logger 66 validator *validator.Validator 67 cfClient *cloudflare.Client 68} 69 70func Make(ctx context.Context, config *config.Config) (*State, error) { 71 logger := tlog.FromContext(ctx) 72 73 d, err := db.Make(ctx, config.Core.DbPath) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create db: %w", err) 76 } 77 78 indexer := indexer.New(log.SubLogger(logger, "indexer")) 79 err = indexer.Init(ctx, d) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create indexer: %w", err) 82 } 83 84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create enforcer: %w", err) 87 } 88 89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 90 if err != nil { 91 logger.Error("failed to create redis resolver", "err", err) 92 res = idresolver.DefaultResolver(config.Plc.PLCURL) 93 } 94 95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create posthog client: %w", err) 98 } 99 100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 102 if err != nil { 103 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 104 } 105 validator := validator.New(d, res, enforcer) 106 107 repoResolver := reporesolver.New(config, enforcer, d) 108 109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 110 111 wrapper := db.DbWrapper{Execer: d} 112 jc, err := jetstream.NewJetstreamClient( 113 config.Jetstream.Endpoint, 114 "appview", 115 []string{ 116 tangled.GraphFollowNSID, 117 tangled.FeedStarNSID, 118 tangled.PublicKeyNSID, 119 tangled.RepoArtifactNSID, 120 tangled.ActorProfileNSID, 121 tangled.KnotMemberNSID, 122 tangled.SpindleMemberNSID, 123 tangled.SpindleNSID, 124 tangled.StringNSID, 125 tangled.RepoIssueNSID, 126 tangled.RepoIssueCommentNSID, 127 tangled.LabelDefinitionNSID, 128 tangled.LabelOpNSID, 129 }, 130 nil, 131 tlog.SubLogger(logger, "jetstream"), 132 wrapper, 133 false, 134 135 // in-memory filter is inapplicable to appview so 136 // we'll never log dids anyway. 137 false, 138 ) 139 if err != nil { 140 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 141 } 142 143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 145 } 146 147 ingester := appview.Ingester{ 148 Db: wrapper, 149 Enforcer: enforcer, 150 IdResolver: res, 151 Config: config, 152 Logger: log.SubLogger(logger, "ingester"), 153 Validator: validator, 154 } 155 err = jc.StartJetstream(ctx, ingester.Ingest()) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 158 } 159 160 var notifiers []notify.Notifier 161 162 // Always add the database notifier 163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 164 165 // Add other notifiers in production only 166 if !config.Core.Dev { 167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 168 } 169 notifiers = append(notifiers, indexer) 170 171 // Add webhook notifier 172 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 173 174 notifier := notify.NewMergedNotifier(notifiers) 175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 176 177 var cfClient *cloudflare.Client 178 if config.Cloudflare.ApiToken != "" { 179 cfClient, err = cloudflare.New(config) 180 if err != nil { 181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 182 cfClient = nil 183 } 184 } 185 186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 187 if err != nil { 188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 189 } 190 knotstream.Start(ctx) 191 192 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 193 if err != nil { 194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 195 } 196 spindlestream.Start(ctx) 197 198 state := &State{ 199 db: d, 200 notifier: notifier, 201 indexer: indexer, 202 oauth: oauth, 203 enforcer: enforcer, 204 pages: pages, 205 idResolver: res, 206 mentionsResolver: mentionsResolver, 207 posthog: posthog, 208 jc: jc, 209 config: config, 210 repoResolver: repoResolver, 211 knotstream: knotstream, 212 spindlestream: spindlestream, 213 logger: logger, 214 validator: validator, 215 cfClient: cfClient, 216 } 217 218 // fetch initial bluesky posts if configured 219 go fetchBskyPosts(ctx, res, config, d, logger) 220 221 return state, nil 222} 223 224func (s *State) Close() error { 225 // other close up logic goes here 226 return s.db.Close() 227} 228 229func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 230 w.Header().Set("Content-Type", "text/plain") 231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 232 233 securityTxt := `Contact: mailto:security@tangled.org 234Preferred-Languages: en 235Canonical: https://tangled.org/.well-known/security.txt 236Expires: 2030-01-01T21:59:00.000Z 237` 238 w.Write([]byte(securityTxt)) 239} 240 241func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 242 w.Header().Set("Content-Type", "text/plain") 243 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 244 245 robotsTxt := `# Hello, Tanglers! 246User-agent: * 247Allow: / 248Disallow: /*/*/settings 249Disallow: /settings 250Disallow: /*/*/compare 251Disallow: /*/*/fork 252 253Crawl-delay: 1 254` 255 w.Write([]byte(robotsTxt)) 256} 257 258func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 259 user := s.oauth.GetMultiAccountUser(r) 260 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 261 LoggedInUser: user, 262 }) 263} 264 265func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 266 user := s.oauth.GetMultiAccountUser(r) 267 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 268 LoggedInUser: user, 269 }) 270} 271 272func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 273 user := s.oauth.GetMultiAccountUser(r) 274 s.pages.Brand(w, pages.BrandParams{ 275 LoggedInUser: user, 276 }) 277} 278 279func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 280 user := s.oauth.GetMultiAccountUser(r) 281 if user == nil { 282 return 283 } 284 285 l := s.logger.With("handler", "UpgradeBanner") 286 l = l.With("did", user.Active.Did) 287 288 regs, err := db.GetRegistrations( 289 s.db, 290 orm.FilterEq("did", user.Active.Did), 291 orm.FilterEq("needs_upgrade", 1), 292 ) 293 if err != nil { 294 l.Error("non-fatal: failed to get registrations", "err", err) 295 } 296 297 spindles, err := db.GetSpindles( 298 r.Context(), 299 s.db, 300 orm.FilterEq("owner", user.Active.Did), 301 orm.FilterEq("needs_upgrade", 1), 302 ) 303 if err != nil { 304 l.Error("non-fatal: failed to get spindles", "err", err) 305 } 306 307 if regs == nil && spindles == nil { 308 return 309 } 310 311 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 312 Registrations: regs, 313 Spindles: spindles, 314 }) 315} 316 317func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 318 user := chi.URLParam(r, "user") 319 user = strings.TrimPrefix(user, "@") 320 321 if user == "" { 322 w.WriteHeader(http.StatusBadRequest) 323 return 324 } 325 326 id, err := s.idResolver.ResolveIdent(r.Context(), user) 327 if err != nil { 328 w.WriteHeader(http.StatusInternalServerError) 329 return 330 } 331 332 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 333 if err != nil { 334 s.logger.Error("failed to get public keys", "err", err) 335 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 336 return 337 } 338 339 if len(pubKeys) == 0 { 340 w.WriteHeader(http.StatusNoContent) 341 return 342 } 343 344 for _, k := range pubKeys { 345 key := strings.TrimRight(k.Key, "\n") 346 fmt.Fprintln(w, key) 347 } 348} 349 350func validateRepoName(name string) error { 351 // check for path traversal attempts 352 if name == "." || name == ".." || 353 strings.Contains(name, "/") || strings.Contains(name, "\\") { 354 return fmt.Errorf("Repository name contains invalid path characters") 355 } 356 357 // check for sequences that could be used for traversal when normalized 358 if strings.Contains(name, "./") || strings.Contains(name, "../") || 359 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 360 return fmt.Errorf("Repository name contains invalid path sequence") 361 } 362 363 // then continue with character validation 364 for _, char := range name { 365 if !((char >= 'a' && char <= 'z') || 366 (char >= 'A' && char <= 'Z') || 367 (char >= '0' && char <= '9') || 368 char == '-' || char == '_' || char == '.') { 369 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 370 } 371 } 372 373 // additional check to prevent multiple sequential dots 374 if strings.Contains(name, "..") { 375 return fmt.Errorf("Repository name cannot contain sequential dots") 376 } 377 378 // if all checks pass 379 return nil 380} 381 382func stripGitExt(name string) string { 383 return strings.TrimSuffix(name, ".git") 384} 385 386func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 387 switch r.Method { 388 case http.MethodGet: 389 user := s.oauth.GetMultiAccountUser(r) 390 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 391 if err != nil { 392 s.pages.Notice(w, "repo", "Invalid user account.") 393 return 394 } 395 396 s.pages.NewRepo(w, pages.NewRepoParams{ 397 LoggedInUser: user, 398 Knots: knots, 399 }) 400 401 case http.MethodPost: 402 l := s.logger.With("handler", "NewRepo") 403 404 user := s.oauth.GetMultiAccountUser(r) 405 l = l.With("did", user.Active.Did) 406 407 // form validation 408 domain := r.FormValue("domain") 409 if domain == "" { 410 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 411 return 412 } 413 l = l.With("knot", domain) 414 415 repoName := r.FormValue("name") 416 if repoName == "" { 417 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 418 return 419 } 420 421 if err := validateRepoName(repoName); err != nil { 422 s.pages.Notice(w, "repo", err.Error()) 423 return 424 } 425 repoName = stripGitExt(repoName) 426 l = l.With("repoName", repoName) 427 428 defaultBranch := r.FormValue("branch") 429 if defaultBranch == "" { 430 defaultBranch = "main" 431 } 432 l = l.With("defaultBranch", defaultBranch) 433 434 description := r.FormValue("description") 435 if len([]rune(description)) > 140 { 436 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 437 return 438 } 439 440 // ACL validation 441 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 442 if err != nil || !ok { 443 l.Info("unauthorized") 444 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 445 return 446 } 447 448 // Check for existing repos 449 existingRepo, err := db.GetRepo( 450 s.db, 451 orm.FilterEq("did", user.Active.Did), 452 orm.FilterEq("name", repoName), 453 ) 454 if err == nil && existingRepo != nil { 455 l.Info("repo exists") 456 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 457 return 458 } 459 460 // create atproto record for this repo 461 rkey := tid.TID() 462 repo := &models.Repo{ 463 Did: user.Active.Did, 464 Name: repoName, 465 Knot: domain, 466 Rkey: rkey, 467 Description: description, 468 Created: time.Now(), 469 Labels: s.config.Label.DefaultLabelDefs, 470 } 471 record := repo.AsRecord() 472 473 atpClient, err := s.oauth.AuthorizedClient(r) 474 if err != nil { 475 l.Info("PDS write failed", "err", err) 476 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 477 return 478 } 479 480 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 481 Collection: tangled.RepoNSID, 482 Repo: user.Active.Did, 483 Rkey: rkey, 484 Record: &lexutil.LexiconTypeDecoder{ 485 Val: &record, 486 }, 487 }) 488 if err != nil { 489 l.Info("PDS write failed", "err", err) 490 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 491 return 492 } 493 494 aturi := atresp.Uri 495 l = l.With("aturi", aturi) 496 l.Info("wrote to PDS") 497 498 tx, err := s.db.BeginTx(r.Context(), nil) 499 if err != nil { 500 l.Info("txn failed", "err", err) 501 s.pages.Notice(w, "repo", "Failed to save repository information.") 502 return 503 } 504 505 // The rollback function reverts a few things on failure: 506 // - the pending txn 507 // - the ACLs 508 // - the atproto record created 509 rollback := func() { 510 err1 := tx.Rollback() 511 err2 := s.enforcer.E.LoadPolicy() 512 err3 := rollbackRecord(context.Background(), aturi, atpClient) 513 514 // ignore txn complete errors, this is okay 515 if errors.Is(err1, sql.ErrTxDone) { 516 err1 = nil 517 } 518 519 if errs := errors.Join(err1, err2, err3); errs != nil { 520 l.Error("failed to rollback changes", "errs", errs) 521 return 522 } 523 } 524 defer rollback() 525 526 client, err := s.oauth.ServiceClient( 527 r, 528 oauth.WithService(domain), 529 oauth.WithLxm(tangled.RepoCreateNSID), 530 oauth.WithDev(s.config.Core.Dev), 531 ) 532 if err != nil { 533 l.Error("service auth failed", "err", err) 534 s.pages.Notice(w, "repo", "Failed to reach PDS.") 535 return 536 } 537 538 xe := tangled.RepoCreate( 539 r.Context(), 540 client, 541 &tangled.RepoCreate_Input{ 542 Rkey: rkey, 543 }, 544 ) 545 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 546 l.Error("xrpc error", "xe", xe) 547 s.pages.Notice(w, "repo", err.Error()) 548 return 549 } 550 551 err = db.AddRepo(tx, repo) 552 if err != nil { 553 l.Error("db write failed", "err", err) 554 s.pages.Notice(w, "repo", "Failed to save repository information.") 555 return 556 } 557 558 // acls 559 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 560 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 561 if err != nil { 562 l.Error("acl setup failed", "err", err) 563 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 564 return 565 } 566 567 err = tx.Commit() 568 if err != nil { 569 l.Error("txn commit failed", "err", err) 570 http.Error(w, err.Error(), http.StatusInternalServerError) 571 return 572 } 573 574 err = s.enforcer.E.SavePolicy() 575 if err != nil { 576 l.Error("acl save failed", "err", err) 577 http.Error(w, err.Error(), http.StatusInternalServerError) 578 return 579 } 580 581 // reset the ATURI because the transaction completed successfully 582 aturi = "" 583 584 s.notifier.NewRepo(r.Context(), repo) 585 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 586 } 587} 588 589// this is used to rollback changes made to the PDS 590// 591// it is a no-op if the provided ATURI is empty 592func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 593 if aturi == "" { 594 return nil 595 } 596 597 parsed := syntax.ATURI(aturi) 598 599 collection := parsed.Collection().String() 600 repo := parsed.Authority().String() 601 rkey := parsed.RecordKey().String() 602 603 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 604 Collection: collection, 605 Repo: repo, 606 Rkey: rkey, 607 }) 608 return err 609} 610 611func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 612 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 613 if err != nil { 614 return err 615 } 616 // already present 617 if len(defaultLabels) == len(defaults) { 618 return nil 619 } 620 621 labelDefs, err := models.FetchLabelDefs(r, defaults) 622 if err != nil { 623 return err 624 } 625 626 // Insert each label definition to the database 627 for _, labelDef := range labelDefs { 628 _, err = db.AddLabelDefinition(e, &labelDef) 629 if err != nil { 630 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 631 } 632 } 633 634 return nil 635} 636 637func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 638 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 639 if err != nil { 640 logger.Error("failed to resolve tangled.org DID", "err", err) 641 return 642 } 643 644 pdsEndpoint := resolved.PDSEndpoint() 645 if pdsEndpoint == "" { 646 logger.Error("no PDS endpoint found for tangled.sh DID") 647 return 648 } 649 650 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 651 if err != nil { 652 logger.Error("failed to create appassword session... skipping fetch", "err", err) 653 return 654 } 655 656 client := xrpc.Client{ 657 Auth: &xrpc.AuthInfo{ 658 AccessJwt: session.AccessJwt, 659 Did: session.Did, 660 }, 661 Host: session.PdsEndpoint, 662 } 663 664 l := log.SubLogger(logger, "bluesky") 665 666 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 667 defer ticker.Stop() 668 669 for { 670 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 671 if err != nil { 672 l.Error("failed to fetch bluesky posts", "err", err) 673 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 674 l.Error("failed to insert bluesky posts", "err", err) 675 } else { 676 l.Info("inserted bluesky posts", "count", len(posts)) 677 } 678 679 select { 680 case <-ticker.C: 681 case <-ctx.Done(): 682 l.Info("stopping bluesky updater") 683 return 684 } 685 } 686}