Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
at master 685 lines 18 kB view raw
package state

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"tangled.org/core/api/tangled"
	"tangled.org/core/appview"
	"tangled.org/core/appview/bsky"
	"tangled.org/core/appview/cloudflare"
	"tangled.org/core/appview/config"
	"tangled.org/core/appview/db"
	"tangled.org/core/appview/indexer"
	"tangled.org/core/appview/mentions"
	"tangled.org/core/appview/models"
	"tangled.org/core/appview/notify"
	dbnotify "tangled.org/core/appview/notify/db"
	phnotify "tangled.org/core/appview/notify/posthog"
	"tangled.org/core/appview/oauth"
	"tangled.org/core/appview/pages"
	"tangled.org/core/appview/reporesolver"
	"tangled.org/core/appview/validator"
	xrpcclient "tangled.org/core/appview/xrpcclient"
	"tangled.org/core/consts"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	tlog "tangled.org/core/log"
	"tangled.org/core/orm"
	"tangled.org/core/rbac"
	"tangled.org/core/tid"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/atproto/atclient"
	"github.com/bluesky-social/indigo/atproto/syntax"
	lexutil "github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/xrpc"
	securejoin "github.com/cyphar/filepath-securejoin"
	"github.com/go-chi/chi/v5"
	"github.com/posthog/posthog-go"
)

// State bundles the long-lived dependencies shared by the HTTP handlers in
// this package. It is assembled by Make and released with Close.
//
// Notes on individual fields, as populated by Make:
//   - db is opened from config.Core.DbPath and is what Close closes.
//   - notifier is the merged set of notifiers wrapped in a logging notifier.
//   - idResolver is the Redis-backed resolver, or the default resolver when
//     the Redis one could not be created.
//   - knotstream and spindlestream are already started when Make returns.
//   - cfClient is nil when no Cloudflare API token is configured or when the
//     client could not be created.
type State struct {
	db               *db.DB
	notifier         notify.Notifier
	indexer          *indexer.Indexer
	oauth            *oauth.OAuth
	enforcer         *rbac.Enforcer
	pages            *pages.Pages
	idResolver       *idresolver.Resolver
	mentionsResolver *mentions.Resolver
	posthog          posthog.Client
	jc               *jetstream.JetstreamClient
	config           *config.Config
	repoResolver     *reporesolver.RepoResolver
	knotstream       *eventconsumer.Consumer
	spindlestream    *eventconsumer.Consumer
	logger           *slog.Logger
	validator        *validator.Validator
	cfClient         *cloudflare.Client
}

70func Make(ctx context.Context, config *config.Config) (*State, error) { 71 logger := tlog.FromContext(ctx) 72 73 d, err := db.Make(ctx, config.Core.DbPath) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create db: %w", err) 76 } 77 78 indexer := indexer.New(log.SubLogger(logger, "indexer")) 79 err = indexer.Init(ctx, d) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create indexer: %w", err) 82 } 83 84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create enforcer: %w", err) 87 } 88 89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 90 if err != nil { 91 logger.Error("failed to create redis resolver", "err", err) 92 res = idresolver.DefaultResolver(config.Plc.PLCURL) 93 } 94 95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create posthog client: %w", err) 98 } 99 100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 102 if err != nil { 103 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 104 } 105 validator := validator.New(d, res, enforcer) 106 107 repoResolver := reporesolver.New(config, enforcer, d) 108 109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 110 111 wrapper := db.DbWrapper{Execer: d} 112 jc, err := jetstream.NewJetstreamClient( 113 config.Jetstream.Endpoint, 114 "appview", 115 []string{ 116 tangled.GraphFollowNSID, 117 tangled.FeedStarNSID, 118 tangled.PublicKeyNSID, 119 tangled.RepoArtifactNSID, 120 tangled.ActorProfileNSID, 121 tangled.KnotMemberNSID, 122 tangled.SpindleMemberNSID, 123 tangled.SpindleNSID, 124 tangled.StringNSID, 125 tangled.RepoIssueNSID, 126 tangled.RepoIssueCommentNSID, 127 tangled.LabelDefinitionNSID, 128 
tangled.LabelOpNSID, 129 }, 130 nil, 131 tlog.SubLogger(logger, "jetstream"), 132 wrapper, 133 false, 134 135 // in-memory filter is inapplicable to appview so 136 // we'll never log dids anyway. 137 false, 138 ) 139 if err != nil { 140 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 141 } 142 143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 145 } 146 147 ingester := appview.Ingester{ 148 Db: wrapper, 149 Enforcer: enforcer, 150 IdResolver: res, 151 Config: config, 152 Logger: log.SubLogger(logger, "ingester"), 153 Validator: validator, 154 } 155 err = jc.StartJetstream(ctx, ingester.Ingest()) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 158 } 159 160 var notifiers []notify.Notifier 161 162 // Always add the database notifier 163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 164 165 // Add other notifiers in production only 166 if !config.Core.Dev { 167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 168 } 169 notifiers = append(notifiers, indexer) 170 171 // Add webhook notifier 172 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 173 174 notifier := notify.NewMergedNotifier(notifiers) 175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 176 177 var cfClient *cloudflare.Client 178 if config.Cloudflare.ApiToken != "" { 179 cfClient, err = cloudflare.New(config) 180 if err != nil { 181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 182 cfClient = nil 183 } 184 } 185 186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 187 if err != nil { 188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 189 } 190 knotstream.Start(ctx) 191 192 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 193 if err 
!= nil { 194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 195 } 196 spindlestream.Start(ctx) 197 198 state := &State{ 199 db: d, 200 notifier: notifier, 201 indexer: indexer, 202 oauth: oauth, 203 enforcer: enforcer, 204 pages: pages, 205 idResolver: res, 206 mentionsResolver: mentionsResolver, 207 posthog: posthog, 208 jc: jc, 209 config: config, 210 repoResolver: repoResolver, 211 knotstream: knotstream, 212 spindlestream: spindlestream, 213 logger: logger, 214 validator: validator, 215 cfClient: cfClient, 216 } 217 218 // fetch initial bluesky posts if configured 219 go fetchBskyPosts(ctx, res, config, d, logger) 220 221 return state, nil 222} 223 224func (s *State) Close() error { 225 // other close up logic goes here 226 return s.db.Close() 227} 228 229func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) { 230 w.Header().Set("Content-Type", "text/plain") 231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 232 233 securityTxt := `Contact: mailto:security@tangled.org 234Preferred-Languages: en 235Canonical: https://tangled.org/.well-known/security.txt 236Expires: 2030-01-01T21:59:00.000Z 237` 238 w.Write([]byte(securityTxt)) 239} 240 241func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 242 w.Header().Set("Content-Type", "text/plain") 243 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 244 245 robotsTxt := `# Hello, Tanglers! 
246User-agent: * 247Allow: / 248Disallow: /*/*/settings 249Disallow: /settings 250Disallow: /*/*/compare 251Disallow: /*/*/fork 252 253Crawl-delay: 1 254` 255 w.Write([]byte(robotsTxt)) 256} 257 258func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 259 user := s.oauth.GetMultiAccountUser(r) 260 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 261 LoggedInUser: user, 262 }) 263} 264 265func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 266 user := s.oauth.GetMultiAccountUser(r) 267 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 268 LoggedInUser: user, 269 }) 270} 271 272func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 273 user := s.oauth.GetMultiAccountUser(r) 274 s.pages.Brand(w, pages.BrandParams{ 275 LoggedInUser: user, 276 }) 277} 278 279func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 280 user := s.oauth.GetMultiAccountUser(r) 281 if user == nil { 282 return 283 } 284 285 l := s.logger.With("handler", "UpgradeBanner") 286 l = l.With("did", user.Active.Did) 287 288 regs, err := db.GetRegistrations( 289 s.db, 290 orm.FilterEq("did", user.Active.Did), 291 orm.FilterEq("needs_upgrade", 1), 292 ) 293 if err != nil { 294 l.Error("non-fatal: failed to get registrations", "err", err) 295 } 296 297 spindles, err := db.GetSpindles( 298 s.db, 299 orm.FilterEq("owner", user.Active.Did), 300 orm.FilterEq("needs_upgrade", 1), 301 ) 302 if err != nil { 303 l.Error("non-fatal: failed to get spindles", "err", err) 304 } 305 306 if regs == nil && spindles == nil { 307 return 308 } 309 310 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 311 Registrations: regs, 312 Spindles: spindles, 313 }) 314} 315 316func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 317 user := chi.URLParam(r, "user") 318 user = strings.TrimPrefix(user, "@") 319 320 if user == "" { 321 w.WriteHeader(http.StatusBadRequest) 322 return 323 } 324 325 id, err := s.idResolver.ResolveIdent(r.Context(), user) 
326 if err != nil { 327 w.WriteHeader(http.StatusInternalServerError) 328 return 329 } 330 331 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 332 if err != nil { 333 s.logger.Error("failed to get public keys", "err", err) 334 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 335 return 336 } 337 338 if len(pubKeys) == 0 { 339 w.WriteHeader(http.StatusNoContent) 340 return 341 } 342 343 for _, k := range pubKeys { 344 key := strings.TrimRight(k.Key, "\n") 345 fmt.Fprintln(w, key) 346 } 347} 348 349func validateRepoName(name string) error { 350 // check for path traversal attempts 351 if name == "." || name == ".." || 352 strings.Contains(name, "/") || strings.Contains(name, "\\") { 353 return fmt.Errorf("Repository name contains invalid path characters") 354 } 355 356 // check for sequences that could be used for traversal when normalized 357 if strings.Contains(name, "./") || strings.Contains(name, "../") || 358 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 359 return fmt.Errorf("Repository name contains invalid path sequence") 360 } 361 362 // then continue with character validation 363 for _, char := range name { 364 if !((char >= 'a' && char <= 'z') || 365 (char >= 'A' && char <= 'Z') || 366 (char >= '0' && char <= '9') || 367 char == '-' || char == '_' || char == '.') { 368 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 369 } 370 } 371 372 // additional check to prevent multiple sequential dots 373 if strings.Contains(name, "..") { 374 return fmt.Errorf("Repository name cannot contain sequential dots") 375 } 376 377 // if all checks pass 378 return nil 379} 380 381func stripGitExt(name string) string { 382 return strings.TrimSuffix(name, ".git") 383} 384 385func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 386 switch r.Method { 387 case http.MethodGet: 388 user := s.oauth.GetMultiAccountUser(r) 389 knots, err := 
s.enforcer.GetKnotsForUser(user.Active.Did) 390 if err != nil { 391 s.pages.Notice(w, "repo", "Invalid user account.") 392 return 393 } 394 395 s.pages.NewRepo(w, pages.NewRepoParams{ 396 LoggedInUser: user, 397 Knots: knots, 398 }) 399 400 case http.MethodPost: 401 l := s.logger.With("handler", "NewRepo") 402 403 user := s.oauth.GetMultiAccountUser(r) 404 l = l.With("did", user.Active.Did) 405 406 // form validation 407 domain := r.FormValue("domain") 408 if domain == "" { 409 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 410 return 411 } 412 l = l.With("knot", domain) 413 414 repoName := r.FormValue("name") 415 if repoName == "" { 416 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 417 return 418 } 419 420 if err := validateRepoName(repoName); err != nil { 421 s.pages.Notice(w, "repo", err.Error()) 422 return 423 } 424 repoName = stripGitExt(repoName) 425 l = l.With("repoName", repoName) 426 427 defaultBranch := r.FormValue("branch") 428 if defaultBranch == "" { 429 defaultBranch = "main" 430 } 431 l = l.With("defaultBranch", defaultBranch) 432 433 description := r.FormValue("description") 434 if len([]rune(description)) > 140 { 435 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 436 return 437 } 438 439 // ACL validation 440 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 441 if err != nil || !ok { 442 l.Info("unauthorized") 443 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 444 return 445 } 446 447 // Check for existing repos 448 existingRepo, err := db.GetRepo( 449 s.db, 450 orm.FilterEq("did", user.Active.Did), 451 orm.FilterEq("name", repoName), 452 ) 453 if err == nil && existingRepo != nil { 454 l.Info("repo exists") 455 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 456 return 457 } 458 459 // create atproto record for this repo 460 rkey := tid.TID() 
461 repo := &models.Repo{ 462 Did: user.Active.Did, 463 Name: repoName, 464 Knot: domain, 465 Rkey: rkey, 466 Description: description, 467 Created: time.Now(), 468 Labels: s.config.Label.DefaultLabelDefs, 469 } 470 record := repo.AsRecord() 471 472 atpClient, err := s.oauth.AuthorizedClient(r) 473 if err != nil { 474 l.Info("PDS write failed", "err", err) 475 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 476 return 477 } 478 479 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 480 Collection: tangled.RepoNSID, 481 Repo: user.Active.Did, 482 Rkey: rkey, 483 Record: &lexutil.LexiconTypeDecoder{ 484 Val: &record, 485 }, 486 }) 487 if err != nil { 488 l.Info("PDS write failed", "err", err) 489 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 490 return 491 } 492 493 aturi := atresp.Uri 494 l = l.With("aturi", aturi) 495 l.Info("wrote to PDS") 496 497 tx, err := s.db.BeginTx(r.Context(), nil) 498 if err != nil { 499 l.Info("txn failed", "err", err) 500 s.pages.Notice(w, "repo", "Failed to save repository information.") 501 return 502 } 503 504 // The rollback function reverts a few things on failure: 505 // - the pending txn 506 // - the ACLs 507 // - the atproto record created 508 rollback := func() { 509 err1 := tx.Rollback() 510 err2 := s.enforcer.E.LoadPolicy() 511 err3 := rollbackRecord(context.Background(), aturi, atpClient) 512 513 // ignore txn complete errors, this is okay 514 if errors.Is(err1, sql.ErrTxDone) { 515 err1 = nil 516 } 517 518 if errs := errors.Join(err1, err2, err3); errs != nil { 519 l.Error("failed to rollback changes", "errs", errs) 520 return 521 } 522 } 523 defer rollback() 524 525 client, err := s.oauth.ServiceClient( 526 r, 527 oauth.WithService(domain), 528 oauth.WithLxm(tangled.RepoCreateNSID), 529 oauth.WithDev(s.config.Core.Dev), 530 ) 531 if err != nil { 532 l.Error("service auth failed", "err", err) 533 s.pages.Notice(w, "repo", "Failed to reach PDS.") 
534 return 535 } 536 537 xe := tangled.RepoCreate( 538 r.Context(), 539 client, 540 &tangled.RepoCreate_Input{ 541 Rkey: rkey, 542 }, 543 ) 544 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 545 l.Error("xrpc error", "xe", xe) 546 s.pages.Notice(w, "repo", err.Error()) 547 return 548 } 549 550 err = db.AddRepo(tx, repo) 551 if err != nil { 552 l.Error("db write failed", "err", err) 553 s.pages.Notice(w, "repo", "Failed to save repository information.") 554 return 555 } 556 557 // acls 558 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 559 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 560 if err != nil { 561 l.Error("acl setup failed", "err", err) 562 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 563 return 564 } 565 566 err = tx.Commit() 567 if err != nil { 568 l.Error("txn commit failed", "err", err) 569 http.Error(w, err.Error(), http.StatusInternalServerError) 570 return 571 } 572 573 err = s.enforcer.E.SavePolicy() 574 if err != nil { 575 l.Error("acl save failed", "err", err) 576 http.Error(w, err.Error(), http.StatusInternalServerError) 577 return 578 } 579 580 // reset the ATURI because the transaction completed successfully 581 aturi = "" 582 583 s.notifier.NewRepo(r.Context(), repo) 584 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 585 } 586} 587 588// this is used to rollback changes made to the PDS 589// 590// it is a no-op if the provided ATURI is empty 591func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error { 592 if aturi == "" { 593 return nil 594 } 595 596 parsed := syntax.ATURI(aturi) 597 598 collection := parsed.Collection().String() 599 repo := parsed.Authority().String() 600 rkey := parsed.RecordKey().String() 601 602 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 603 Collection: collection, 604 Repo: repo, 605 Rkey: rkey, 606 }) 607 return err 608} 609 610func BackfillDefaultDefs(e db.Execer, r 
*idresolver.Resolver, defaults []string) error { 611 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 612 if err != nil { 613 return err 614 } 615 // already present 616 if len(defaultLabels) == len(defaults) { 617 return nil 618 } 619 620 labelDefs, err := models.FetchLabelDefs(r, defaults) 621 if err != nil { 622 return err 623 } 624 625 // Insert each label definition to the database 626 for _, labelDef := range labelDefs { 627 _, err = db.AddLabelDefinition(e, &labelDef) 628 if err != nil { 629 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 630 } 631 } 632 633 return nil 634} 635 636func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 637 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 638 if err != nil { 639 logger.Error("failed to resolve tangled.org DID", "err", err) 640 return 641 } 642 643 pdsEndpoint := resolved.PDSEndpoint() 644 if pdsEndpoint == "" { 645 logger.Error("no PDS endpoint found for tangled.sh DID") 646 return 647 } 648 649 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 650 if err != nil { 651 logger.Error("failed to create appassword session... 
skipping fetch", "err", err) 652 return 653 } 654 655 client := xrpc.Client{ 656 Auth: &xrpc.AuthInfo{ 657 AccessJwt: session.AccessJwt, 658 Did: session.Did, 659 }, 660 Host: session.PdsEndpoint, 661 } 662 663 l := log.SubLogger(logger, "bluesky") 664 665 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 666 defer ticker.Stop() 667 668 for { 669 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 670 if err != nil { 671 l.Error("failed to fetch bluesky posts", "err", err) 672 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 673 l.Error("failed to insert bluesky posts", "err", err) 674 } else { 675 l.Info("inserted bluesky posts", "count", len(posts)) 676 } 677 678 select { 679 case <-ticker.C: 680 case <-ctx.Done(): 681 l.Info("stopping bluesky updater") 682 return 683 } 684 } 685}