Monorepo for Tangled
at master 736 lines 20 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 phnotify "tangled.org/core/appview/notify/posthog" 25 "tangled.org/core/appview/oauth" 26 "tangled.org/core/appview/pages" 27 "tangled.org/core/appview/reporesolver" 28 "tangled.org/core/appview/validator" 29 xrpcclient "tangled.org/core/appview/xrpcclient" 30 "tangled.org/core/consts" 31 "tangled.org/core/eventconsumer" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/jetstream" 34 "tangled.org/core/log" 35 tlog "tangled.org/core/log" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38 "tangled.org/core/tid" 39 40 comatproto "github.com/bluesky-social/indigo/api/atproto" 41 "github.com/bluesky-social/indigo/atproto/atclient" 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 "github.com/bluesky-social/indigo/xrpc" 45 46 "github.com/go-chi/chi/v5" 47 "github.com/posthog/posthog-go" 48) 49 50type State struct { 51 db *db.DB 52 notifier notify.Notifier 53 indexer *indexer.Indexer 54 oauth *oauth.OAuth 55 enforcer *rbac.Enforcer 56 pages *pages.Pages 57 idResolver *idresolver.Resolver 58 mentionsResolver *mentions.Resolver 59 posthog posthog.Client 60 jc *jetstream.JetstreamClient 61 config *config.Config 62 repoResolver *reporesolver.RepoResolver 63 knotstream *eventconsumer.Consumer 64 spindlestream *eventconsumer.Consumer 65 logger *slog.Logger 66 validator *validator.Validator 67 cfClient *cloudflare.Client 68} 69 70func Make(ctx context.Context, config *config.Config) (*State, error) { 71 logger := tlog.FromContext(ctx) 72 73 d, err := db.Make(ctx, config.Core.DbPath) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create db: %w", err) 76 } 77 78 indexer := indexer.New(log.SubLogger(logger, "indexer")) 79 err = indexer.Init(ctx, d) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create indexer: %w", err) 82 } 83 84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create enforcer: %w", err) 87 } 88 89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 90 if err != nil { 91 logger.Error("failed to create redis resolver", "err", err) 92 res = idresolver.DefaultResolver(config.Plc.PLCURL) 93 } 94 95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create posthog client: %w", err) 98 } 99 100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 102 if err != nil { 103 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 104 } 105 validator := validator.New(d, res, enforcer) 106 107 repoResolver := reporesolver.New(config, enforcer, d) 108 109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 110 111 wrapper := db.DbWrapper{Execer: d} 112 jc, err := jetstream.NewJetstreamClient( 113 config.Jetstream.Endpoint, 114 "appview", 115 []string{ 116 tangled.GraphFollowNSID, 117 tangled.FeedStarNSID, 118 tangled.PublicKeyNSID, 119 tangled.RepoArtifactNSID, 120 tangled.ActorProfileNSID, 121 tangled.KnotMemberNSID, 122 tangled.SpindleMemberNSID, 123 tangled.SpindleNSID, 124 tangled.StringNSID, 125 tangled.RepoIssueNSID, 126 tangled.RepoIssueCommentNSID, 127 tangled.LabelDefinitionNSID, 128 tangled.LabelOpNSID, 129 }, 130 nil, 131 tlog.SubLogger(logger, "jetstream"), 132 wrapper, 133 false, 134 135 // in-memory filter is inapplicable to appview so 136 // we'll never log dids anyway. 137 false, 138 ) 139 if err != nil { 140 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 141 } 142 143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 145 } 146 147 ingester := appview.Ingester{ 148 Db: wrapper, 149 Enforcer: enforcer, 150 IdResolver: res, 151 Config: config, 152 Logger: log.SubLogger(logger, "ingester"), 153 Validator: validator, 154 } 155 err = jc.StartJetstream(ctx, ingester.Ingest()) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 158 } 159 160 var notifiers []notify.Notifier 161 162 // Always add the database notifier 163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 164 165 // Add other notifiers in production only 166 if !config.Core.Dev { 167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 168 } 169 notifiers = append(notifiers, indexer) 170 171 // Add webhook notifier 172 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 173 174 notifier := notify.NewMergedNotifier(notifiers) 175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 176 177 var cfClient *cloudflare.Client 178 if config.Cloudflare.ApiToken != "" { 179 cfClient, err = cloudflare.New(config) 180 if err != nil { 181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 182 cfClient = nil 183 } 184 } 185 186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 187 if err != nil { 188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 189 } 190 knotstream.Start(ctx) 191 192 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 193 if err != nil { 194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 195 } 196 spindlestream.Start(ctx) 197 198 state := &State{ 199 db: d, 200 notifier: notifier, 201 indexer: indexer, 202 oauth: oauth, 203 enforcer: enforcer, 204 pages: pages, 205 idResolver: res, 206 mentionsResolver: mentionsResolver, 207 posthog: posthog, 208 jc: jc, 209 config: config, 210 repoResolver: repoResolver, 211 knotstream: knotstream, 212 spindlestream: spindlestream, 213 logger: logger, 214 validator: validator, 215 cfClient: cfClient, 216 } 217 218 // fetch initial bluesky posts if configured 219 go fetchBskyPosts(ctx, res, config, d, logger) 220 221 return state, nil 222} 223 224func (s *State) Close() error { 225 // other close up logic goes here 226 return s.db.Close() 227} 228 229func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 230 w.Header().Set("Content-Type", "text/plain") 231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 232 233 securityTxt := `Contact: mailto:security@tangled.org 234Preferred-Languages: en 235Canonical: https://tangled.org/.well-known/security.txt 236Expires: 2030-01-01T21:59:00.000Z 237` 238 w.Write([]byte(securityTxt)) 239} 240 241func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 242 w.Header().Set("Content-Type", "text/plain") 243 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 244 245 robotsTxt := `# Hello, Tanglers! 246User-agent: * 247Allow: / 248Disallow: /*/*/settings 249Disallow: /settings 250Disallow: /*/*/compare 251Disallow: /*/*/fork 252 253Crawl-delay: 1 254` 255 w.Write([]byte(robotsTxt)) 256} 257 258func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 259 user := s.oauth.GetMultiAccountUser(r) 260 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 261 LoggedInUser: user, 262 }) 263} 264 265func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 266 user := s.oauth.GetMultiAccountUser(r) 267 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 268 LoggedInUser: user, 269 }) 270} 271 272func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 273 user := s.oauth.GetMultiAccountUser(r) 274 s.pages.Brand(w, pages.BrandParams{ 275 LoggedInUser: user, 276 }) 277} 278 279func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 280 user := s.oauth.GetMultiAccountUser(r) 281 if user == nil { 282 return 283 } 284 285 l := s.logger.With("handler", "UpgradeBanner") 286 l = l.With("did", user.Active.Did) 287 288 regs, err := db.GetRegistrations( 289 s.db, 290 orm.FilterEq("did", user.Active.Did), 291 orm.FilterEq("needs_upgrade", 1), 292 ) 293 if err != nil { 294 l.Error("non-fatal: failed to get registrations", "err", err) 295 } 296 297 spindles, err := db.GetSpindles( 298 r.Context(), 299 s.db, 300 orm.FilterEq("owner", user.Active.Did), 301 orm.FilterEq("needs_upgrade", 1), 302 ) 303 if err != nil { 304 l.Error("non-fatal: failed to get spindles", "err", err) 305 } 306 307 if regs == nil && spindles == nil { 308 return 309 } 310 311 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 312 Registrations: regs, 313 Spindles: spindles, 314 }) 315} 316 317func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 318 user := chi.URLParam(r, "user") 319 user = strings.TrimPrefix(user, "@") 320 321 if user == "" { 322 w.WriteHeader(http.StatusBadRequest) 323 return 324 } 325 326 id, err := s.idResolver.ResolveIdent(r.Context(), user) 327 if err != nil { 328 w.WriteHeader(http.StatusInternalServerError) 329 return 330 } 331 332 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 333 if err != nil { 334 s.logger.Error("failed to get public keys", "err", err) 335 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 336 return 337 } 338 339 if len(pubKeys) == 0 { 340 w.WriteHeader(http.StatusNoContent) 341 return 342 } 343 344 for _, k := range pubKeys { 345 key := strings.TrimRight(k.Key, "\n") 346 fmt.Fprintln(w, key) 347 } 348} 349 350func validateRepoName(name string) error { 351 // check for path traversal attempts 352 if name == "." || name == ".." || 353 strings.Contains(name, "/") || strings.Contains(name, "\\") { 354 return fmt.Errorf("Repository name contains invalid path characters") 355 } 356 357 // check for sequences that could be used for traversal when normalized 358 if strings.Contains(name, "./") || strings.Contains(name, "../") || 359 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 360 return fmt.Errorf("Repository name contains invalid path sequence") 361 } 362 363 // then continue with character validation 364 for _, char := range name { 365 if !((char >= 'a' && char <= 'z') || 366 (char >= 'A' && char <= 'Z') || 367 (char >= '0' && char <= '9') || 368 char == '-' || char == '_' || char == '.') { 369 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 370 } 371 } 372 373 // additional check to prevent multiple sequential dots 374 if strings.Contains(name, "..") { 375 return fmt.Errorf("Repository name cannot contain sequential dots") 376 } 377 378 // if all checks pass 379 return nil 380} 381 382func stripGitExt(name string) string { 383 return strings.TrimSuffix(name, ".git") 384} 385 386func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 387 switch r.Method { 388 case http.MethodGet: 389 user := s.oauth.GetMultiAccountUser(r) 390 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 391 if err != nil { 392 s.pages.Notice(w, "repo", "Invalid user account.") 393 return 394 } 395 396 s.pages.NewRepo(w, pages.NewRepoParams{ 397 LoggedInUser: user, 398 Knots: knots, 399 }) 400 401 case http.MethodPost: 402 l := s.logger.With("handler", "NewRepo") 403 404 user := s.oauth.GetMultiAccountUser(r) 405 l = l.With("did", user.Active.Did) 406 407 // form validation 408 domain := r.FormValue("domain") 409 if domain == "" { 410 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 411 return 412 } 413 l = l.With("knot", domain) 414 415 repoName := r.FormValue("name") 416 if repoName == "" { 417 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 418 return 419 } 420 421 if err := validateRepoName(repoName); err != nil { 422 s.pages.Notice(w, "repo", err.Error()) 423 return 424 } 425 repoName = stripGitExt(repoName) 426 l = l.With("repoName", repoName) 427 428 defaultBranch := r.FormValue("branch") 429 if defaultBranch == "" { 430 defaultBranch = "main" 431 } 432 l = l.With("defaultBranch", defaultBranch) 433 434 description := r.FormValue("description") 435 if len([]rune(description)) > 140 { 436 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 437 return 438 } 439 440 // ACL validation 441 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 442 if err != nil || !ok { 443 l.Info("unauthorized") 444 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 445 return 446 } 447 448 // Check for existing repos 449 existingRepo, err := db.GetRepo( 450 s.db, 451 orm.FilterEq("did", user.Active.Did), 452 orm.FilterEq("name", repoName), 453 ) 454 if err == nil && existingRepo != nil { 455 l.Info("repo exists") 456 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 457 return 458 } 459 460 rkey := tid.TID() 461 462 client, err := s.oauth.ServiceClient( 463 r, 464 oauth.WithService(domain), 465 oauth.WithLxm(tangled.RepoCreateNSID), 466 oauth.WithDev(s.config.Core.Dev), 467 ) 468 if err != nil { 469 l.Error("service auth failed", "err", err) 470 s.pages.Notice(w, "repo", "Failed to reach knot server.") 471 return 472 } 473 474 input := &tangled.RepoCreate_Input{ 475 Rkey: rkey, 476 Name: repoName, 477 DefaultBranch: &defaultBranch, 478 } 479 createResp, xe := tangled.RepoCreate( 480 r.Context(), 481 client, 482 input, 483 ) 484 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 485 l.Error("xrpc error", "xe", xe) 486 s.pages.Notice(w, "repo", err.Error()) 487 return 488 } 489 490 var repoDid string 491 if createResp != nil && createResp.RepoDid != nil { 492 repoDid = *createResp.RepoDid 493 } 494 if repoDid == "" { 495 l.Error("knot returned empty repo DID") 496 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 497 return 498 } 499 500 repo := &models.Repo{ 501 Did: user.Active.Did, 502 Name: repoName, 503 Knot: domain, 504 Rkey: rkey, 505 Description: description, 506 Created: time.Now(), 507 Labels: s.config.Label.DefaultLabelDefs, 508 RepoDid: repoDid, 509 } 510 record := repo.AsRecord() 511 512 cleanupKnot := func() { 513 go func() { 514 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 515 for attempt, delay := range delays { 516 time.Sleep(delay) 517 deleteClient, dErr := s.oauth.ServiceClient( 518 r, 519 oauth.WithService(domain), 520 oauth.WithLxm(tangled.RepoDeleteNSID), 521 oauth.WithDev(s.config.Core.Dev), 522 ) 523 if dErr != nil { 524 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 525 continue 526 } 527 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 528 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 529 Did: user.Active.Did, 530 Name: repoName, 531 Rkey: rkey, 532 }); dErr != nil { 533 cancel() 534 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 535 continue 536 } 537 cancel() 538 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 539 return 540 } 541 l.Error("exhausted retries for knot cleanup, repo may be orphaned", 542 "did", user.Active.Did, "repo", repoName, "knot", domain) 543 }() 544 } 545 546 atpClient, err := s.oauth.AuthorizedClient(r) 547 if err != nil { 548 l.Info("PDS write failed", "err", err) 549 cleanupKnot() 550 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 551 return 552 } 553 554 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 555 Collection: tangled.RepoNSID, 556 Repo: user.Active.Did, 557 Rkey: rkey, 558 Record: &lexutil.LexiconTypeDecoder{ 559 Val: &record, 560 }, 561 }) 562 if err != nil { 563 l.Info("PDS write failed", "err", err) 564 cleanupKnot() 565 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 566 return 567 } 568 569 aturi := atresp.Uri 570 l = l.With("aturi", aturi) 571 l.Info("wrote to PDS") 572 573 tx, err := s.db.BeginTx(r.Context(), nil) 574 if err != nil { 575 l.Info("txn failed", "err", err) 576 s.pages.Notice(w, "repo", "Failed to save repository information.") 577 return 578 } 579 580 rollback := func() { 581 err1 := tx.Rollback() 582 err2 := s.enforcer.E.LoadPolicy() 583 err3 := rollbackRecord(context.Background(), aturi, atpClient) 584 585 if errors.Is(err1, sql.ErrTxDone) { 586 err1 = nil 587 } 588 589 if errs := errors.Join(err1, err2, err3); errs != nil { 590 l.Error("failed to rollback changes", "errs", errs) 591 } 592 593 if aturi != "" { 594 cleanupKnot() 595 } 596 } 597 defer rollback() 598 599 err = db.AddRepo(tx, repo) 600 if err != nil { 601 l.Error("db write failed", "err", err) 602 s.pages.Notice(w, "repo", "Failed to save repository information.") 603 return 604 } 605 606 rbacPath := repo.RepoIdentifier() 607 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 608 if err != nil { 609 l.Error("acl setup failed", "err", err) 610 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 611 return 612 } 613 614 err = tx.Commit() 615 if err != nil { 616 l.Error("txn commit failed", "err", err) 617 http.Error(w, err.Error(), http.StatusInternalServerError) 618 return 619 } 620 621 err = s.enforcer.E.SavePolicy() 622 if err != nil { 623 l.Error("acl save failed", "err", err) 624 http.Error(w, err.Error(), http.StatusInternalServerError) 625 return 626 } 627 628 aturi = "" 629 630 s.notifier.NewRepo(r.Context(), repo) 631 if repoDid != "" { 632 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 633 } else { 634 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 635 } 636 } 637} 638 639// this is used to rollback changes made to the PDS 640// 641// it is a no-op if the provided ATURI is empty 642func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 643 if aturi == "" { 644 return nil 645 } 646 647 parsed := syntax.ATURI(aturi) 648 649 collection := parsed.Collection().String() 650 repo := parsed.Authority().String() 651 rkey := parsed.RecordKey().String() 652 653 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 654 Collection: collection, 655 Repo: repo, 656 Rkey: rkey, 657 }) 658 return err 659} 660 661func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 662 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 663 if err != nil { 664 return err 665 } 666 // already present 667 if len(defaultLabels) == len(defaults) { 668 return nil 669 } 670 671 labelDefs, err := models.FetchLabelDefs(r, defaults) 672 if err != nil { 673 return err 674 } 675 676 // Insert each label definition to the database 677 for _, labelDef := range labelDefs { 678 _, err = db.AddLabelDefinition(e, &labelDef) 679 if err != nil { 680 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 681 } 682 } 683 684 return nil 685} 686 687func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 688 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 689 if err != nil { 690 logger.Error("failed to resolve tangled.org DID", "err", err) 691 return 692 } 693 694 pdsEndpoint := resolved.PDSEndpoint() 695 if pdsEndpoint == "" { 696 logger.Error("no PDS endpoint found for tangled.sh DID") 697 return 698 } 699 700 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 701 if err != nil { 702 logger.Error("failed to create appassword session... skipping fetch", "err", err) 703 return 704 } 705 706 client := xrpc.Client{ 707 Auth: &xrpc.AuthInfo{ 708 AccessJwt: session.AccessJwt, 709 Did: session.Did, 710 }, 711 Host: session.PdsEndpoint, 712 } 713 714 l := log.SubLogger(logger, "bluesky") 715 716 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 717 defer ticker.Stop() 718 719 for { 720 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 721 if err != nil { 722 l.Error("failed to fetch bluesky posts", "err", err) 723 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 724 l.Error("failed to insert bluesky posts", "err", err) 725 } else { 726 l.Info("inserted bluesky posts", "count", len(posts)) 727 } 728 729 select { 730 case <-ticker.C: 731 case <-ctx.Done(): 732 l.Info("stopping bluesky updater") 733 return 734 } 735 } 736}