Monorepo for Tangled tangled.org
at master 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 "tangled.org/core/appview/validator" 27 xrpcclient "tangled.org/core/appview/xrpcclient" 28 "tangled.org/core/eventconsumer" 29 "tangled.org/core/idresolver" 30 "tangled.org/core/jetstream" 31 "tangled.org/core/log" 32 tlog "tangled.org/core/log" 33 "tangled.org/core/orm" 34 "tangled.org/core/rbac" 35 "tangled.org/core/tid" 36 37 comatproto "github.com/bluesky-social/indigo/api/atproto" 38 atpclient "github.com/bluesky-social/indigo/atproto/client" 39 "github.com/bluesky-social/indigo/atproto/syntax" 40 lexutil "github.com/bluesky-social/indigo/lex/util" 41 securejoin "github.com/cyphar/filepath-securejoin" 42 "github.com/go-chi/chi/v5" 43 "github.com/posthog/posthog-go" 44) 45 46type State struct { 47 db *db.DB 48 notifier notify.Notifier 49 indexer *indexer.Indexer 50 oauth *oauth.OAuth 51 enforcer *rbac.Enforcer 52 pages *pages.Pages 53 idResolver *idresolver.Resolver 54 mentionsResolver *mentions.Resolver 55 posthog posthog.Client 56 jc *jetstream.JetstreamClient 57 config *config.Config 58 repoResolver *reporesolver.RepoResolver 59 knotstream *eventconsumer.Consumer 60 spindlestream *eventconsumer.Consumer 61 logger *slog.Logger 62 validator *validator.Validator 63} 64 65func Make(ctx context.Context, config *config.Config) (*State, error) { 66 logger := tlog.FromContext(ctx) 67 68 d, err := db.Make(ctx, config.Core.DbPath) 69 if err != nil { 70 return nil, fmt.Errorf("failed to create db: %w", err) 71 } 72 73 indexer := indexer.New(log.SubLogger(logger, "indexer")) 74 err = indexer.Init(ctx, d) 75 if err != nil { 76 return nil, fmt.Errorf("failed to create indexer: %w", err) 77 } 78 79 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create enforcer: %w", err) 82 } 83 84 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 85 if err != nil { 86 logger.Error("failed to create redis resolver", "err", err) 87 res = idresolver.DefaultResolver(config.Plc.PLCURL) 88 } 89 90 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 91 if err != nil { 92 return nil, fmt.Errorf("failed to create posthog client: %w", err) 93 } 94 95 pages := pages.NewPages(config, res, log.SubLogger(logger, "pages")) 96 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 97 if err != nil { 98 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 99 } 100 validator := validator.New(d, res, enforcer) 101 102 repoResolver := reporesolver.New(config, enforcer, d) 103 104 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 105 106 wrapper := db.DbWrapper{Execer: d} 107 jc, err := jetstream.NewJetstreamClient( 108 config.Jetstream.Endpoint, 109 "appview", 110 []string{ 111 tangled.GraphFollowNSID, 112 tangled.FeedStarNSID, 113 tangled.PublicKeyNSID, 114 tangled.RepoArtifactNSID, 115 tangled.ActorProfileNSID, 116 tangled.SpindleMemberNSID, 117 tangled.SpindleNSID, 118 tangled.StringNSID, 119 tangled.RepoIssueNSID, 120 tangled.RepoIssueCommentNSID, 121 tangled.LabelDefinitionNSID, 122 tangled.LabelOpNSID, 123 }, 124 nil, 125 tlog.SubLogger(logger, "jetstream"), 126 wrapper, 127 false, 128 129 // in-memory filter is inapplicalble to appview so 130 // we'll never log dids anyway. 131 false, 132 ) 133 if err != nil { 134 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 135 } 136 137 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 138 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 139 } 140 141 ingester := appview.Ingester{ 142 Db: wrapper, 143 Enforcer: enforcer, 144 IdResolver: res, 145 Config: config, 146 Logger: log.SubLogger(logger, "ingester"), 147 Validator: validator, 148 } 149 err = jc.StartJetstream(ctx, ingester.Ingest()) 150 if err != nil { 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 155 if err != nil { 156 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 157 } 158 knotstream.Start(ctx) 159 160 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 161 if err != nil { 162 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 163 } 164 spindlestream.Start(ctx) 165 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier 169 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 170 171 // Add other notifiers in production only 172 if !config.Core.Dev { 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify")) 177 178 state := &State{ 179 d, 180 notifier, 181 indexer, 182 oauth, 183 enforcer, 184 pages, 185 res, 186 mentionsResolver, 187 posthog, 188 jc, 189 config, 190 repoResolver, 191 knotstream, 192 spindlestream, 193 logger, 194 validator, 195 } 196 197 return state, nil 198} 199 200func (s *State) Close() error { 201 // other close up logic goes here 202 return s.db.Close() 203} 204 205func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 206 w.Header().Set("Content-Type", "text/plain") 207 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 208 209 robotsTxt := `User-agent: * 210Allow: / 211` 212 w.Write([]byte(robotsTxt)) 213} 214 215func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 216 user := s.oauth.GetUser(r) 217 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 218 LoggedInUser: user, 219 }) 220} 221 222func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 223 user := s.oauth.GetUser(r) 224 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 225 LoggedInUser: user, 226 }) 227} 228 229func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 230 user := s.oauth.GetUser(r) 231 s.pages.Brand(w, pages.BrandParams{ 232 LoggedInUser: user, 233 }) 234} 235 236func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 237 if s.oauth.GetUser(r) != nil { 238 s.Timeline(w, r) 239 return 240 } 241 s.Home(w, r) 242} 243 244func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 245 user := s.oauth.GetUser(r) 246 247 // TODO: set this flag based on the UI 248 filtered := false 249 250 var userDid string 251 if user != nil { 252 userDid = user.Did 253 } 254 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 255 if err != nil { 256 s.logger.Error("failed to make timeline", "err", err) 257 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 258 } 259 260 repos, err := db.GetTopStarredReposLastWeek(s.db) 261 if err != nil { 262 s.logger.Error("failed to get top starred repos", "err", err) 263 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 264 return 265 } 266 267 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 268 if err != nil { 269 // non-fatal 270 } 271 272 s.pages.Timeline(w, pages.TimelineParams{ 273 LoggedInUser: user, 274 Timeline: timeline, 275 Repos: repos, 276 GfiLabel: gfiLabel, 277 }) 278} 279 280func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 281 user := s.oauth.GetUser(r) 282 if user == nil { 283 return 284 } 285 286 l := s.logger.With("handler", "UpgradeBanner") 287 l = l.With("did", user.Did) 288 289 regs, err := db.GetRegistrations( 290 s.db, 291 orm.FilterEq("did", user.Did), 292 orm.FilterEq("needs_upgrade", 1), 293 ) 294 if err != nil { 295 l.Error("non-fatal: failed to get registrations", "err", err) 296 } 297 298 spindles, err := db.GetSpindles( 299 s.db, 300 orm.FilterEq("owner", user.Did), 301 orm.FilterEq("needs_upgrade", 1), 302 ) 303 if err != nil { 304 l.Error("non-fatal: failed to get spindles", "err", err) 305 } 306 307 if regs == nil && spindles == nil { 308 return 309 } 310 311 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 312 Registrations: regs, 313 Spindles: spindles, 314 }) 315} 316 317func (s *State) Home(w http.ResponseWriter, r *http.Request) { 318 // TODO: set this flag based on the UI 319 filtered := false 320 321 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 322 if err != nil { 323 s.logger.Error("failed to make timeline", "err", err) 324 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 325 return 326 } 327 328 repos, err := db.GetTopStarredReposLastWeek(s.db) 329 if err != nil { 330 s.logger.Error("failed to get top starred repos", "err", err) 331 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 332 return 333 } 334 335 s.pages.Home(w, pages.TimelineParams{ 336 LoggedInUser: nil, 337 Timeline: timeline, 338 Repos: repos, 339 }) 340} 341 342func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 343 user := chi.URLParam(r, "user") 344 user = strings.TrimPrefix(user, "@") 345 346 if user == "" { 347 w.WriteHeader(http.StatusBadRequest) 348 return 349 } 350 351 id, err := s.idResolver.ResolveIdent(r.Context(), user) 352 if err != nil { 353 w.WriteHeader(http.StatusInternalServerError) 354 return 355 } 356 357 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 358 if err != nil { 359 s.logger.Error("failed to get public keys", "err", err) 360 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 361 return 362 } 363 364 if len(pubKeys) == 0 { 365 w.WriteHeader(http.StatusNoContent) 366 return 367 } 368 369 for _, k := range pubKeys { 370 key := strings.TrimRight(k.Key, "\n") 371 fmt.Fprintln(w, key) 372 } 373} 374 375func validateRepoName(name string) error { 376 // check for path traversal attempts 377 if name == "." || name == ".." || 378 strings.Contains(name, "/") || strings.Contains(name, "\\") { 379 return fmt.Errorf("Repository name contains invalid path characters") 380 } 381 382 // check for sequences that could be used for traversal when normalized 383 if strings.Contains(name, "./") || strings.Contains(name, "../") || 384 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 385 return fmt.Errorf("Repository name contains invalid path sequence") 386 } 387 388 // then continue with character validation 389 for _, char := range name { 390 if !((char >= 'a' && char <= 'z') || 391 (char >= 'A' && char <= 'Z') || 392 (char >= '0' && char <= '9') || 393 char == '-' || char == '_' || char == '.') { 394 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 395 } 396 } 397 398 // additional check to prevent multiple sequential dots 399 if strings.Contains(name, "..") { 400 return fmt.Errorf("Repository name cannot contain sequential dots") 401 } 402 403 // if all checks pass 404 return nil 405} 406 407func stripGitExt(name string) string { 408 return strings.TrimSuffix(name, ".git") 409} 410 411func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 412 switch r.Method { 413 case http.MethodGet: 414 user := s.oauth.GetUser(r) 415 knots, err := s.enforcer.GetKnotsForUser(user.Did) 416 if err != nil { 417 s.pages.Notice(w, "repo", "Invalid user account.") 418 return 419 } 420 421 s.pages.NewRepo(w, pages.NewRepoParams{ 422 LoggedInUser: user, 423 Knots: knots, 424 }) 425 426 case http.MethodPost: 427 l := s.logger.With("handler", "NewRepo") 428 429 user := s.oauth.GetUser(r) 430 l = l.With("did", user.Did) 431 432 // form validation 433 domain := r.FormValue("domain") 434 if domain == "" { 435 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 436 return 437 } 438 l = l.With("knot", domain) 439 440 repoName := r.FormValue("name") 441 if repoName == "" { 442 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 443 return 444 } 445 446 if err := validateRepoName(repoName); err != nil { 447 s.pages.Notice(w, "repo", err.Error()) 448 return 449 } 450 repoName = stripGitExt(repoName) 451 l = l.With("repoName", repoName) 452 453 defaultBranch := r.FormValue("branch") 454 if defaultBranch == "" { 455 defaultBranch = "main" 456 } 457 l = l.With("defaultBranch", defaultBranch) 458 459 description := r.FormValue("description") 460 461 // ACL validation 462 ok, err := s.enforcer.E.Enforce(user.Did, domain, domain, "repo:create") 463 if err != nil || !ok { 464 l.Info("unauthorized") 465 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 466 return 467 } 468 469 // Check for existing repos 470 existingRepo, err := db.GetRepo( 471 s.db, 472 orm.FilterEq("did", user.Did), 473 orm.FilterEq("name", repoName), 474 ) 475 if err == nil && existingRepo != nil { 476 l.Info("repo exists") 477 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 478 return 479 } 480 481 // create atproto record for this repo 482 rkey := tid.TID() 483 repo := &models.Repo{ 484 Did: user.Did, 485 Name: repoName, 486 Knot: domain, 487 Rkey: rkey, 488 Description: description, 489 Created: time.Now(), 490 Labels: s.config.Label.DefaultLabelDefs, 491 } 492 record := repo.AsRecord() 493 494 atpClient, err := s.oauth.AuthorizedClient(r) 495 if err != nil { 496 l.Info("PDS write failed", "err", err) 497 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 498 return 499 } 500 501 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 502 Collection: tangled.RepoNSID, 503 Repo: user.Did, 504 Rkey: rkey, 505 Record: &lexutil.LexiconTypeDecoder{ 506 Val: &record, 507 }, 508 }) 509 if err != nil { 510 l.Info("PDS write failed", "err", err) 511 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 512 return 513 } 514 515 aturi := atresp.Uri 516 l = l.With("aturi", aturi) 517 l.Info("wrote to PDS") 518 519 tx, err := s.db.BeginTx(r.Context(), nil) 520 if err != nil { 521 l.Info("txn failed", "err", err) 522 s.pages.Notice(w, "repo", "Failed to save repository information.") 523 return 524 } 525 526 // The rollback function reverts a few things on failure: 527 // - the pending txn 528 // - the ACLs 529 // - the atproto record created 530 rollback := func() { 531 err1 := tx.Rollback() 532 err2 := s.enforcer.E.LoadPolicy() 533 err3 := rollbackRecord(context.Background(), aturi, atpClient) 534 535 // ignore txn complete errors, this is okay 536 if errors.Is(err1, sql.ErrTxDone) { 537 err1 = nil 538 } 539 540 if errs := errors.Join(err1, err2, err3); errs != nil { 541 l.Error("failed to rollback changes", "errs", errs) 542 return 543 } 544 } 545 defer rollback() 546 547 client, err := s.oauth.ServiceClient( 548 r, 549 oauth.WithService(domain), 550 oauth.WithLxm(tangled.RepoCreateNSID), 551 oauth.WithDev(s.config.Core.Dev), 552 ) 553 if err != nil { 554 l.Error("service auth failed", "err", err) 555 s.pages.Notice(w, "repo", "Failed to reach PDS.") 556 return 557 } 558 559 xe := tangled.RepoCreate( 560 r.Context(), 561 client, 562 &tangled.RepoCreate_Input{ 563 Rkey: rkey, 564 }, 565 ) 566 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 567 l.Error("xrpc error", "xe", xe) 568 s.pages.Notice(w, "repo", err.Error()) 569 return 570 } 571 572 err = db.AddRepo(tx, repo) 573 if err != nil { 574 l.Error("db write failed", "err", err) 575 s.pages.Notice(w, "repo", "Failed to save repository information.") 576 return 577 } 578 579 // acls 580 p, _ := securejoin.SecureJoin(user.Did, repoName) 581 err = s.enforcer.AddRepo(user.Did, domain, p) 582 if err != nil { 583 l.Error("acl setup failed", "err", err) 584 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 585 return 586 } 587 588 err = tx.Commit() 589 if err != nil { 590 l.Error("txn commit failed", "err", err) 591 http.Error(w, err.Error(), http.StatusInternalServerError) 592 return 593 } 594 595 err = s.enforcer.E.SavePolicy() 596 if err != nil { 597 l.Error("acl save failed", "err", err) 598 http.Error(w, err.Error(), http.StatusInternalServerError) 599 return 600 } 601 602 // reset the ATURI because the transaction completed successfully 603 aturi = "" 604 605 s.notifier.NewRepo(r.Context(), repo) 606 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Did, repoName)) 607 } 608} 609 610// this is used to rollback changes made to the PDS 611// 612// it is a no-op if the provided ATURI is empty 613func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 614 if aturi == "" { 615 return nil 616 } 617 618 parsed := syntax.ATURI(aturi) 619 620 collection := parsed.Collection().String() 621 repo := parsed.Authority().String() 622 rkey := parsed.RecordKey().String() 623 624 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 625 Collection: collection, 626 Repo: repo, 627 Rkey: rkey, 628 }) 629 return err 630} 631 632func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 633 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 634 if err != nil { 635 return err 636 } 637 // already present 638 if len(defaultLabels) == len(defaults) { 639 return nil 640 } 641 642 labelDefs, err := models.FetchLabelDefs(r, defaults) 643 if err != nil { 644 return err 645 } 646 647 // Insert each label definition to the database 648 for _, labelDef := range labelDefs { 649 _, err = db.AddLabelDefinition(e, &labelDef) 650 if err != nil { 651 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 652 } 653 } 654 655 return nil 656}