Monorepo for Tangled
at sl/comment 689 lines 18 kB view raw
package state

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"tangled.org/core/api/tangled"
	"tangled.org/core/appview"
	"tangled.org/core/appview/bsky"
	"tangled.org/core/appview/cloudflare"
	"tangled.org/core/appview/config"
	"tangled.org/core/appview/db"
	"tangled.org/core/appview/indexer"
	"tangled.org/core/appview/mentions"
	"tangled.org/core/appview/models"
	"tangled.org/core/appview/notify"
	dbnotify "tangled.org/core/appview/notify/db"
	phnotify "tangled.org/core/appview/notify/posthog"
	"tangled.org/core/appview/oauth"
	"tangled.org/core/appview/pages"
	"tangled.org/core/appview/reporesolver"
	"tangled.org/core/appview/validator"
	xrpcclient "tangled.org/core/appview/xrpcclient"
	"tangled.org/core/consts"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	tlog "tangled.org/core/log"
	"tangled.org/core/orm"
	"tangled.org/core/rbac"
	"tangled.org/core/tid"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/atproto/atclient"
	"github.com/bluesky-social/indigo/atproto/syntax"
	lexutil "github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/xrpc"
	securejoin "github.com/cyphar/filepath-securejoin"
	"github.com/go-chi/chi/v5"
	"github.com/posthog/posthog-go"
)

// State bundles the long-lived dependencies shared by the appview's HTTP
// handlers. It is constructed by Make and released with Close.
type State struct {
	db               *db.DB                     // application database; closed by Close
	notifier         notify.Notifier            // merged notifier wrapped in a logging notifier (see Make)
	indexer          *indexer.Indexer           // initialised against db in Make; also registered as a notifier
	oauth            *oauth.OAuth               // resolves the logged-in user and builds authorized/service clients
	enforcer         *rbac.Enforcer             // RBAC enforcer used for ACL checks and repo policies
	pages            *pages.Pages               // renders pages and in-page notices
	idResolver       *idresolver.Resolver       // Redis-backed resolver, or the default resolver if Redis setup failed
	mentionsResolver *mentions.Resolver         // also handed to the jetstream ingester in Make
	posthog          posthog.Client             // PostHog client built from config.Posthog
	jc               *jetstream.JetstreamClient // jetstream client started in Make
	config           *config.Config             // configuration passed to Make
	repoResolver     *reporesolver.RepoResolver // built from config, enforcer and db in Make
	knotstream       *eventconsumer.Consumer    // knot event consumer; started in Make
	spindlestream    *eventconsumer.Consumer    // spindle event consumer; started in Make
	logger           *slog.Logger               // base logger taken from the context given to Make
	validator        *validator.Validator       // built from db, resolver and enforcer in Make
	cfClient         *cloudflare.Client         // nil when no Cloudflare API token is set or client creation failed
}

70func Make(ctx context.Context, config *config.Config) (*State, error) { 71 logger := tlog.FromContext(ctx) 72 73 d, err := db.Make(ctx, config.Core.DbPath) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create db: %w", err) 76 } 77 78 indexer := indexer.New(log.SubLogger(logger, "indexer")) 79 err = indexer.Init(ctx, d) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create indexer: %w", err) 82 } 83 84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create enforcer: %w", err) 87 } 88 89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 90 if err != nil { 91 logger.Error("failed to create redis resolver", "err", err) 92 res = idresolver.DefaultResolver(config.Plc.PLCURL) 93 } 94 95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create posthog client: %w", err) 98 } 99 100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 102 if err != nil { 103 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 104 } 105 validator := validator.New(d, res, enforcer) 106 107 repoResolver := reporesolver.New(config, enforcer, d) 108 109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 110 111 wrapper := db.DbWrapper{Execer: d} 112 jc, err := jetstream.NewJetstreamClient( 113 config.Jetstream.Endpoint, 114 "appview", 115 []string{ 116 tangled.GraphFollowNSID, 117 tangled.FeedStarNSID, 118 tangled.PublicKeyNSID, 119 tangled.RepoArtifactNSID, 120 tangled.ActorProfileNSID, 121 tangled.KnotMemberNSID, 122 tangled.SpindleMemberNSID, 123 tangled.SpindleNSID, 124 tangled.StringNSID, 125 tangled.RepoIssueNSID, 126 tangled.RepoIssueCommentNSID, 127 tangled.FeedCommentNSID, 128 
tangled.LabelDefinitionNSID, 129 tangled.LabelOpNSID, 130 }, 131 nil, 132 tlog.SubLogger(logger, "jetstream"), 133 wrapper, 134 false, 135 136 // in-memory filter is inapplicable to appview so 137 // we'll never log dids anyway. 138 false, 139 ) 140 if err != nil { 141 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 142 } 143 144 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 145 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 146 } 147 148 var notifiers []notify.Notifier 149 150 // Always add the database notifier 151 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 152 153 // Add other notifiers in production only 154 if !config.Core.Dev { 155 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 156 } 157 notifiers = append(notifiers, indexer) 158 159 // Add webhook notifier 160 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 161 162 notifier := notify.NewMergedNotifier(notifiers) 163 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 164 165 ingester := appview.Ingester{ 166 Db: wrapper, 167 Enforcer: enforcer, 168 IdResolver: res, 169 Config: config, 170 Logger: log.SubLogger(logger, "ingester"), 171 Validator: validator, 172 MentionsResolver: mentionsResolver, 173 Notifier: notifier, 174 } 175 err = jc.StartJetstream(ctx, ingester.Ingest()) 176 if err != nil { 177 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 178 } 179 180 var cfClient *cloudflare.Client 181 if config.Cloudflare.ApiToken != "" { 182 cfClient, err = cloudflare.New(config) 183 if err != nil { 184 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 185 cfClient = nil 186 } 187 } 188 189 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 190 if err != nil { 191 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 192 } 193 
knotstream.Start(ctx) 194 195 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 196 if err != nil { 197 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 198 } 199 spindlestream.Start(ctx) 200 201 state := &State{ 202 db: d, 203 notifier: notifier, 204 indexer: indexer, 205 oauth: oauth, 206 enforcer: enforcer, 207 pages: pages, 208 idResolver: res, 209 mentionsResolver: mentionsResolver, 210 posthog: posthog, 211 jc: jc, 212 config: config, 213 repoResolver: repoResolver, 214 knotstream: knotstream, 215 spindlestream: spindlestream, 216 logger: logger, 217 validator: validator, 218 cfClient: cfClient, 219 } 220 221 // fetch initial bluesky posts if configured 222 go fetchBskyPosts(ctx, res, config, d, logger) 223 224 return state, nil 225} 226 227func (s *State) Close() error { 228 // other close up logic goes here 229 return s.db.Close() 230} 231 232func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 233 w.Header().Set("Content-Type", "text/plain") 234 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 235 236 securityTxt := `Contact: mailto:security@tangled.org 237Preferred-Languages: en 238Canonical: https://tangled.org/.well-known/security.txt 239Expires: 2030-01-01T21:59:00.000Z 240` 241 w.Write([]byte(securityTxt)) 242} 243 244func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 245 w.Header().Set("Content-Type", "text/plain") 246 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 247 248 robotsTxt := `# Hello, Tanglers! 
249User-agent: * 250Allow: / 251Disallow: /*/*/settings 252Disallow: /settings 253Disallow: /*/*/compare 254Disallow: /*/*/fork 255 256Crawl-delay: 1 257` 258 w.Write([]byte(robotsTxt)) 259} 260 261func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 262 user := s.oauth.GetMultiAccountUser(r) 263 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 264 LoggedInUser: user, 265 }) 266} 267 268func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 269 user := s.oauth.GetMultiAccountUser(r) 270 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 271 LoggedInUser: user, 272 }) 273} 274 275func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 276 user := s.oauth.GetMultiAccountUser(r) 277 s.pages.Brand(w, pages.BrandParams{ 278 LoggedInUser: user, 279 }) 280} 281 282func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 283 user := s.oauth.GetMultiAccountUser(r) 284 if user == nil { 285 return 286 } 287 288 l := s.logger.With("handler", "UpgradeBanner") 289 l = l.With("did", user.Active.Did) 290 291 regs, err := db.GetRegistrations( 292 s.db, 293 orm.FilterEq("did", user.Active.Did), 294 orm.FilterEq("needs_upgrade", 1), 295 ) 296 if err != nil { 297 l.Error("non-fatal: failed to get registrations", "err", err) 298 } 299 300 spindles, err := db.GetSpindles( 301 r.Context(), 302 s.db, 303 orm.FilterEq("owner", user.Active.Did), 304 orm.FilterEq("needs_upgrade", 1), 305 ) 306 if err != nil { 307 l.Error("non-fatal: failed to get spindles", "err", err) 308 } 309 310 if regs == nil && spindles == nil { 311 return 312 } 313 314 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 315 Registrations: regs, 316 Spindles: spindles, 317 }) 318} 319 320func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 321 user := chi.URLParam(r, "user") 322 user = strings.TrimPrefix(user, "@") 323 324 if user == "" { 325 w.WriteHeader(http.StatusBadRequest) 326 return 327 } 328 329 id, err := 
s.idResolver.ResolveIdent(r.Context(), user) 330 if err != nil { 331 w.WriteHeader(http.StatusInternalServerError) 332 return 333 } 334 335 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 336 if err != nil { 337 s.logger.Error("failed to get public keys", "err", err) 338 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 339 return 340 } 341 342 if len(pubKeys) == 0 { 343 w.WriteHeader(http.StatusNoContent) 344 return 345 } 346 347 for _, k := range pubKeys { 348 key := strings.TrimRight(k.Key, "\n") 349 fmt.Fprintln(w, key) 350 } 351} 352 353func validateRepoName(name string) error { 354 // check for path traversal attempts 355 if name == "." || name == ".." || 356 strings.Contains(name, "/") || strings.Contains(name, "\\") { 357 return fmt.Errorf("Repository name contains invalid path characters") 358 } 359 360 // check for sequences that could be used for traversal when normalized 361 if strings.Contains(name, "./") || strings.Contains(name, "../") || 362 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 363 return fmt.Errorf("Repository name contains invalid path sequence") 364 } 365 366 // then continue with character validation 367 for _, char := range name { 368 if !((char >= 'a' && char <= 'z') || 369 (char >= 'A' && char <= 'Z') || 370 (char >= '0' && char <= '9') || 371 char == '-' || char == '_' || char == '.') { 372 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 373 } 374 } 375 376 // additional check to prevent multiple sequential dots 377 if strings.Contains(name, "..") { 378 return fmt.Errorf("Repository name cannot contain sequential dots") 379 } 380 381 // if all checks pass 382 return nil 383} 384 385func stripGitExt(name string) string { 386 return strings.TrimSuffix(name, ".git") 387} 388 389func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 390 switch r.Method { 391 case http.MethodGet: 392 user := 
s.oauth.GetMultiAccountUser(r) 393 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 394 if err != nil { 395 s.pages.Notice(w, "repo", "Invalid user account.") 396 return 397 } 398 399 s.pages.NewRepo(w, pages.NewRepoParams{ 400 LoggedInUser: user, 401 Knots: knots, 402 }) 403 404 case http.MethodPost: 405 l := s.logger.With("handler", "NewRepo") 406 407 user := s.oauth.GetMultiAccountUser(r) 408 l = l.With("did", user.Active.Did) 409 410 // form validation 411 domain := r.FormValue("domain") 412 if domain == "" { 413 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 414 return 415 } 416 l = l.With("knot", domain) 417 418 repoName := r.FormValue("name") 419 if repoName == "" { 420 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 421 return 422 } 423 424 if err := validateRepoName(repoName); err != nil { 425 s.pages.Notice(w, "repo", err.Error()) 426 return 427 } 428 repoName = stripGitExt(repoName) 429 l = l.With("repoName", repoName) 430 431 defaultBranch := r.FormValue("branch") 432 if defaultBranch == "" { 433 defaultBranch = "main" 434 } 435 l = l.With("defaultBranch", defaultBranch) 436 437 description := r.FormValue("description") 438 if len([]rune(description)) > 140 { 439 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 440 return 441 } 442 443 // ACL validation 444 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 445 if err != nil || !ok { 446 l.Info("unauthorized") 447 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 448 return 449 } 450 451 // Check for existing repos 452 existingRepo, err := db.GetRepo( 453 s.db, 454 orm.FilterEq("did", user.Active.Did), 455 orm.FilterEq("name", repoName), 456 ) 457 if err == nil && existingRepo != nil { 458 l.Info("repo exists") 459 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 460 return 461 } 462 463 // create 
atproto record for this repo 464 rkey := tid.TID() 465 repo := &models.Repo{ 466 Did: user.Active.Did, 467 Name: repoName, 468 Knot: domain, 469 Rkey: rkey, 470 Description: description, 471 Created: time.Now(), 472 Labels: s.config.Label.DefaultLabelDefs, 473 } 474 record := repo.AsRecord() 475 476 atpClient, err := s.oauth.AuthorizedClient(r) 477 if err != nil { 478 l.Info("PDS write failed", "err", err) 479 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 480 return 481 } 482 483 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 484 Collection: tangled.RepoNSID, 485 Repo: user.Active.Did, 486 Rkey: rkey, 487 Record: &lexutil.LexiconTypeDecoder{ 488 Val: &record, 489 }, 490 }) 491 if err != nil { 492 l.Info("PDS write failed", "err", err) 493 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 494 return 495 } 496 497 aturi := atresp.Uri 498 l = l.With("aturi", aturi) 499 l.Info("wrote to PDS") 500 501 tx, err := s.db.BeginTx(r.Context(), nil) 502 if err != nil { 503 l.Info("txn failed", "err", err) 504 s.pages.Notice(w, "repo", "Failed to save repository information.") 505 return 506 } 507 508 // The rollback function reverts a few things on failure: 509 // - the pending txn 510 // - the ACLs 511 // - the atproto record created 512 rollback := func() { 513 err1 := tx.Rollback() 514 err2 := s.enforcer.E.LoadPolicy() 515 err3 := rollbackRecord(context.Background(), aturi, atpClient) 516 517 // ignore txn complete errors, this is okay 518 if errors.Is(err1, sql.ErrTxDone) { 519 err1 = nil 520 } 521 522 if errs := errors.Join(err1, err2, err3); errs != nil { 523 l.Error("failed to rollback changes", "errs", errs) 524 return 525 } 526 } 527 defer rollback() 528 529 client, err := s.oauth.ServiceClient( 530 r, 531 oauth.WithService(domain), 532 oauth.WithLxm(tangled.RepoCreateNSID), 533 oauth.WithDev(s.config.Core.Dev), 534 ) 535 if err != nil { 536 l.Error("service auth failed", "err", err) 
537 s.pages.Notice(w, "repo", "Failed to reach PDS.") 538 return 539 } 540 541 xe := tangled.RepoCreate( 542 r.Context(), 543 client, 544 &tangled.RepoCreate_Input{ 545 Rkey: rkey, 546 }, 547 ) 548 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 549 l.Error("xrpc error", "xe", xe) 550 s.pages.Notice(w, "repo", err.Error()) 551 return 552 } 553 554 err = db.AddRepo(tx, repo) 555 if err != nil { 556 l.Error("db write failed", "err", err) 557 s.pages.Notice(w, "repo", "Failed to save repository information.") 558 return 559 } 560 561 // acls 562 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 563 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 564 if err != nil { 565 l.Error("acl setup failed", "err", err) 566 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 567 return 568 } 569 570 err = tx.Commit() 571 if err != nil { 572 l.Error("txn commit failed", "err", err) 573 http.Error(w, err.Error(), http.StatusInternalServerError) 574 return 575 } 576 577 err = s.enforcer.E.SavePolicy() 578 if err != nil { 579 l.Error("acl save failed", "err", err) 580 http.Error(w, err.Error(), http.StatusInternalServerError) 581 return 582 } 583 584 // reset the ATURI because the transaction completed successfully 585 aturi = "" 586 587 s.notifier.NewRepo(r.Context(), repo) 588 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 589 } 590} 591 592// this is used to rollback changes made to the PDS 593// 594// it is a no-op if the provided ATURI is empty 595func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 596 if aturi == "" { 597 return nil 598 } 599 600 parsed := syntax.ATURI(aturi) 601 602 collection := parsed.Collection().String() 603 repo := parsed.Authority().String() 604 rkey := parsed.RecordKey().String() 605 606 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 607 Collection: collection, 608 Repo: repo, 609 Rkey: rkey, 610 }) 611 return err 
612} 613 614func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 615 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 616 if err != nil { 617 return err 618 } 619 // already present 620 if len(defaultLabels) == len(defaults) { 621 return nil 622 } 623 624 labelDefs, err := models.FetchLabelDefs(r, defaults) 625 if err != nil { 626 return err 627 } 628 629 // Insert each label definition to the database 630 for _, labelDef := range labelDefs { 631 _, err = db.AddLabelDefinition(e, &labelDef) 632 if err != nil { 633 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 634 } 635 } 636 637 return nil 638} 639 640func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 641 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 642 if err != nil { 643 logger.Error("failed to resolve tangled.org DID", "err", err) 644 return 645 } 646 647 pdsEndpoint := resolved.PDSEndpoint() 648 if pdsEndpoint == "" { 649 logger.Error("no PDS endpoint found for tangled.sh DID") 650 return 651 } 652 653 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 654 if err != nil { 655 logger.Error("failed to create appassword session... 
skipping fetch", "err", err) 656 return 657 } 658 659 client := xrpc.Client{ 660 Auth: &xrpc.AuthInfo{ 661 AccessJwt: session.AccessJwt, 662 Did: session.Did, 663 }, 664 Host: session.PdsEndpoint, 665 } 666 667 l := log.SubLogger(logger, "bluesky") 668 669 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 670 defer ticker.Stop() 671 672 for { 673 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 674 if err != nil { 675 l.Error("failed to fetch bluesky posts", "err", err) 676 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 677 l.Error("failed to insert bluesky posts", "err", err) 678 } else { 679 l.Info("inserted bluesky posts", "count", len(posts)) 680 } 681 682 select { 683 case <-ticker.C: 684 case <-ctx.Done(): 685 l.Info("stopping bluesky updater") 686 return 687 } 688 } 689}