[mirror] Scalable static site server for Git forges (like GitHub Pages)
10
fork

Configure Feed

Select the types of activity you want to include in your feed.

at latest 933 lines 28 kB view raw
1package git_pages 2 3import ( 4 "bytes" 5 "context" 6 "crypto/sha256" 7 "fmt" 8 "io" 9 "iter" 10 "net/http" 11 "path" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/c2h5oh/datasize" 17 "github.com/maypok86/otter/v2" 18 "github.com/minio/minio-go/v7" 19 "github.com/minio/minio-go/v7/pkg/credentials" 20 "github.com/prometheus/client_golang/prometheus" 21 "github.com/prometheus/client_golang/prometheus/promauto" 22) 23 24var ( 25 blobsDedupedCount prometheus.Counter 26 blobsDedupedBytes prometheus.Counter 27 28 blobCacheHitsCount prometheus.Counter 29 blobCacheHitsBytes prometheus.Counter 30 blobCacheMissesCount prometheus.Counter 31 blobCacheMissesBytes prometheus.Counter 32 blobCacheEvictionsCount prometheus.Counter 33 blobCacheEvictionsBytes prometheus.Counter 34 35 manifestCacheHitsCount prometheus.Counter 36 manifestCacheMissesCount prometheus.Counter 37 manifestCacheEvictionsCount prometheus.Counter 38 39 s3GetObjectDurationSeconds *prometheus.HistogramVec 40 s3GetObjectResponseCount *prometheus.CounterVec 41) 42 43func initS3BackendMetrics() { 44 blobsDedupedCount = promauto.NewCounter(prometheus.CounterOpts{ 45 Name: "git_pages_blobs_deduped", 46 Help: "Count of blobs deduplicated", 47 }) 48 blobsDedupedBytes = promauto.NewCounter(prometheus.CounterOpts{ 49 Name: "git_pages_blobs_deduped_bytes", 50 Help: "Total size in bytes of blobs deduplicated", 51 }) 52 53 blobCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{ 54 Name: "git_pages_blob_cache_hits_count", 55 Help: "Count of blobs that were retrieved from the cache", 56 }) 57 blobCacheHitsBytes = promauto.NewCounter(prometheus.CounterOpts{ 58 Name: "git_pages_blob_cache_hits_bytes", 59 Help: "Total size in bytes of blobs that were retrieved from the cache", 60 }) 61 blobCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{ 62 Name: "git_pages_blob_cache_misses_count", 63 Help: "Count of blobs that were not found in the cache (and were then successfully cached)", 64 }) 65 blobCacheMissesBytes = promauto.NewCounter(prometheus.CounterOpts{ 66 Name: "git_pages_blob_cache_misses_bytes", 67 Help: "Total size in bytes of blobs that were not found in the cache (and were then successfully cached)", 68 }) 69 blobCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{ 70 Name: "git_pages_blob_cache_evictions_count", 71 Help: "Count of blobs evicted from the cache", 72 }) 73 blobCacheEvictionsBytes = promauto.NewCounter(prometheus.CounterOpts{ 74 Name: "git_pages_blob_cache_evictions_bytes", 75 Help: "Total size in bytes of blobs evicted from the cache", 76 }) 77 78 manifestCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{ 79 Name: "git_pages_manifest_cache_hits_count", 80 Help: "Count of manifests that were retrieved from the cache", 81 }) 82 manifestCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{ 83 Name: "git_pages_manifest_cache_misses_count", 84 Help: "Count of manifests that were not found in the cache (and were then successfully cached)", 85 }) 86 manifestCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{ 87 Name: "git_pages_manifest_cache_evictions_count", 88 Help: "Count of manifests evicted from the cache", 89 }) 90 91 s3GetObjectDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ 92 Name: "git_pages_s3_get_object_duration_seconds", 93 Help: "Time to read a whole object from S3", 94 Buckets: []float64{.01, .025, .05, .1, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.5, 5, 10}, 95 96 NativeHistogramBucketFactor: 1.1, 97 NativeHistogramMaxBucketNumber: 100, 98 NativeHistogramMinResetDuration: 10 * time.Minute, 99 }, []string{"kind"}) 100 s3GetObjectResponseCount = promauto.NewCounterVec(prometheus.CounterOpts{ 101 Name: "git_pages_s3_get_object_responses_count", 102 Help: "Count of s3:GetObject responses", 103 }, []string{"kind", "code"}) 104} 105 106// Blobs can be safely cached indefinitely. They only need to be evicted to preserve memory. 107type CachedBlob struct { 108 blob []byte 109 mtime time.Time 110} 111 112func (c *CachedBlob) Weight() uint32 { return uint32(len(c.blob)) } 113 114// Manifests can only be cached for a short time to avoid serving stale content. Browser 115// page loads cause a large burst of manifest accesses that are essential for serving 116// `304 No Content` responses and these need to be handled very quickly, so both hits and 117// misses are cached. 118type CachedManifest struct { 119 manifest *Manifest 120 weight uint32 121 metadata ManifestMetadata 122 err error 123} 124 125func (c *CachedManifest) Weight() uint32 { return c.weight } 126 127type S3Backend struct { 128 client *minio.Client 129 bucket string 130 blobCache *observedCache[string, *CachedBlob] 131 siteCache *observedCache[string, *CachedManifest] 132 featureCache *otter.Cache[BackendFeature, bool] 133} 134 135var _ Backend = (*S3Backend)(nil) 136 137func makeCacheOptions[K comparable, V any]( 138 config *CacheConfig, 139 weigher func(K, V) uint32, 140) *otter.Options[K, V] { 141 options := &otter.Options[K, V]{} 142 if config.MaxSize != 0 { 143 options.MaximumWeight = config.MaxSize.Bytes() 144 options.Weigher = weigher 145 } 146 if config.MaxStale != 0 { 147 options.RefreshCalculator = otter.RefreshWriting[K, V]( 148 time.Duration(config.MaxAge)) 149 } 150 if config.MaxAge != 0 || config.MaxStale != 0 { 151 options.ExpiryCalculator = otter.ExpiryWriting[K, V]( 152 time.Duration(config.MaxAge + config.MaxStale)) 153 } 154 return options 155} 156 157func NewS3Backend(ctx context.Context, config *S3Config) (*S3Backend, error) { 158 client, err := minio.New(config.Endpoint, &minio.Options{ 159 Creds: credentials.NewStaticV4( 160 config.AccessKeyID, 161 config.SecretAccessKey, 162 "", 163 ), 164 Secure: !config.Insecure, 165 }) 166 if err != nil { 167 return nil, err 168 } 169 170 bucket := config.Bucket 171 exists, err := client.BucketExists(ctx, bucket) 172 if err != nil { 173 return nil, err 174 } else if !exists { 175 logc.Printf(ctx, "s3: create bucket %s\n", bucket) 176 177 err = client.MakeBucket(ctx, bucket, 178 minio.MakeBucketOptions{Region: config.Region}) 179 if err != nil { 180 return nil, err 181 } 182 183 err = (&S3Backend{client: client, bucket: bucket}). 184 EnableFeature(ctx, FeatureCheckDomainMarker) 185 if err != nil { 186 return nil, err 187 } 188 } 189 190 initS3BackendMetrics() 191 192 blobCacheMetrics := observedCacheMetrics{ 193 HitNumberCounter: blobCacheHitsCount, 194 HitWeightCounter: blobCacheHitsBytes, 195 MissNumberCounter: blobCacheMissesCount, 196 MissWeightCounter: blobCacheMissesBytes, 197 EvictionNumberCounter: blobCacheEvictionsCount, 198 EvictionWeightCounter: blobCacheEvictionsBytes, 199 } 200 blobCache, err := newObservedCache(makeCacheOptions(&config.BlobCache, 201 func(key string, value *CachedBlob) uint32 { return uint32(len(value.blob)) }), 202 blobCacheMetrics) 203 if err != nil { 204 return nil, err 205 } 206 207 siteCacheMetrics := observedCacheMetrics{ 208 HitNumberCounter: manifestCacheHitsCount, 209 MissNumberCounter: manifestCacheMissesCount, 210 EvictionNumberCounter: manifestCacheEvictionsCount, 211 } 212 siteCache, err := newObservedCache(makeCacheOptions(&config.SiteCache, 213 func(key string, value *CachedManifest) uint32 { return value.weight }), 214 siteCacheMetrics) 215 if err != nil { 216 return nil, err 217 } 218 219 featureCache, err := otter.New(&otter.Options[BackendFeature, bool]{ 220 RefreshCalculator: otter.RefreshWriting[BackendFeature, bool](10 * time.Minute), 221 }) 222 if err != nil { 223 return nil, err 224 } 225 226 return &S3Backend{client, bucket, blobCache, siteCache, featureCache}, nil 227} 228 229func (s3 *S3Backend) Backend() Backend { 230 return s3 231} 232 233func blobObjectName(name string) string { 234 return fmt.Sprintf("blob/%s", path.Join(splitBlobName(name)...)) 235} 236 237func storeFeatureObjectName(feature BackendFeature) string { 238 return fmt.Sprintf("meta/feature/%s", feature) 239} 240 241func (s3 *S3Backend) HasFeature(ctx context.Context, feature BackendFeature) bool { 242 loader := func(ctx context.Context, feature BackendFeature) (bool, error) { 243 _, err := s3.client.StatObject(ctx, s3.bucket, storeFeatureObjectName(feature), 244 minio.StatObjectOptions{}) 245 if err != nil { 246 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" { 247 logc.Printf(ctx, "s3 feature %q: disabled", feature) 248 return false, nil 249 } else { 250 return false, err 251 } 252 } 253 logc.Printf(ctx, "s3 feature %q: enabled", feature) 254 return true, nil 255 } 256 257 isOn, err := s3.featureCache.Get(ctx, feature, otter.LoaderFunc[BackendFeature, bool](loader)) 258 if err != nil { 259 err = fmt.Errorf("getting s3 backend feature %q: %w", feature, err) 260 ObserveError(err) 261 logc.Println(ctx, err) 262 return false 263 } 264 return isOn 265} 266 267func (s3 *S3Backend) EnableFeature(ctx context.Context, feature BackendFeature) error { 268 _, err := s3.client.PutObject(ctx, s3.bucket, storeFeatureObjectName(feature), 269 &bytes.Reader{}, 0, minio.PutObjectOptions{}) 270 return err 271} 272 273func (s3 *S3Backend) GetBlob( 274 ctx context.Context, name string, 275) ( 276 reader io.ReadSeeker, metadata BlobMetadata, err error, 277) { 278 loader := func(ctx context.Context, name string) (*CachedBlob, error) { 279 logc.Printf(ctx, "s3: get blob %s\n", name) 280 281 startTime := time.Now() 282 283 object, err := s3.client.GetObject(ctx, s3.bucket, blobObjectName(name), 284 minio.GetObjectOptions{}) 285 // Note that many errors (e.g. NoSuchKey) will be reported only after this point. 286 if err != nil { 287 return nil, err 288 } 289 defer object.Close() 290 291 data, err := io.ReadAll(object) 292 if err != nil { 293 return nil, err 294 } 295 296 stat, err := object.Stat() 297 if err != nil { 298 return nil, err 299 } 300 301 s3GetObjectDurationSeconds. 302 With(prometheus.Labels{"kind": "blob"}). 303 Observe(time.Since(startTime).Seconds()) 304 305 return &CachedBlob{data, stat.LastModified}, nil 306 } 307 308 observer := func(ctx context.Context, name string) (*CachedBlob, error) { 309 cached, err := loader(ctx, name) 310 var code = "OK" 311 if resp, ok := err.(minio.ErrorResponse); ok { 312 code = resp.Code 313 } 314 s3GetObjectResponseCount.With(prometheus.Labels{"kind": "blob", "code": code}).Inc() 315 return cached, err 316 } 317 318 var cached *CachedBlob 319 cached, err = s3.blobCache.Get(ctx, name, otter.LoaderFunc[string, *CachedBlob](observer)) 320 if err != nil { 321 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" { 322 err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key) 323 } 324 } else { 325 reader = bytes.NewReader(cached.blob) 326 metadata.Name = name 327 metadata.Size = int64(len(cached.blob)) 328 metadata.LastModified = cached.mtime 329 } 330 return 331} 332 333func (s3 *S3Backend) PutBlob(ctx context.Context, name string, data []byte) error { 334 logc.Printf(ctx, "s3: put blob %s (%s)\n", name, datasize.ByteSize(len(data)).HumanReadable()) 335 336 _, err := s3.client.StatObject(ctx, s3.bucket, blobObjectName(name), 337 minio.GetObjectOptions{}) 338 if err != nil { 339 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" { 340 _, err := s3.client.PutObject(ctx, s3.bucket, blobObjectName(name), 341 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}) 342 if err != nil { 343 return err 344 } else { 345 ObserveData(ctx, "blob.status", "created") 346 logc.Printf(ctx, "s3: put blob %s (created)\n", name) 347 return nil 348 } 349 } else { 350 return err 351 } 352 } else { 353 ObserveData(ctx, "blob.status", "exists") 354 logc.Printf(ctx, "s3: put blob %s (exists)\n", name) 355 blobsDedupedCount.Inc() 356 blobsDedupedBytes.Add(float64(len(data))) 357 return nil 358 } 359} 360 361func (s3 *S3Backend) DeleteBlob(ctx context.Context, name string) error { 362 logc.Printf(ctx, "s3: delete blob %s\n", name) 363 364 return s3.client.RemoveObject(ctx, s3.bucket, blobObjectName(name), 365 minio.RemoveObjectOptions{}) 366} 367 368func (s3 *S3Backend) EnumerateBlobs(ctx context.Context) iter.Seq2[BlobMetadata, error] { 369 return func(yield func(BlobMetadata, error) bool) { 370 logc.Print(ctx, "s3: enumerate blobs") 371 372 ctx, cancel := context.WithCancel(ctx) 373 defer cancel() 374 375 prefix := "blob/" 376 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{ 377 Prefix: prefix, 378 Recursive: true, 379 }) { 380 var metadata BlobMetadata 381 var err error 382 if err = object.Err; err == nil { 383 key := strings.TrimPrefix(object.Key, prefix) 384 if strings.HasSuffix(key, "/") { 385 continue // directory; skip 386 } else { 387 metadata.Name = joinBlobName(strings.Split(key, "/")) 388 metadata.Size = object.Size 389 metadata.LastModified = object.LastModified 390 } 391 } 392 if !yield(metadata, err) { 393 break 394 } 395 } 396 } 397} 398 399func manifestObjectName(name string) string { 400 return fmt.Sprintf("site/%s", name) 401} 402 403func stagedManifestObjectName(manifestData []byte) string { 404 return fmt.Sprintf("dirty/%x", sha256.Sum256(manifestData)) 405} 406 407type s3ManifestLoader struct { 408 s3 *S3Backend 409} 410 411func (l s3ManifestLoader) Load( 412 ctx context.Context, key string, 413) ( 414 *CachedManifest, error, 415) { 416 return l.load(ctx, key, nil) 417} 418 419func (l s3ManifestLoader) Reload( 420 ctx context.Context, key string, oldValue *CachedManifest, 421) ( 422 *CachedManifest, error, 423) { 424 return l.load(ctx, key, oldValue) 425} 426 427func (l s3ManifestLoader) load( 428 ctx context.Context, name string, oldManifest *CachedManifest, 429) ( 430 *CachedManifest, error, 431) { 432 logc.Printf(ctx, "s3: get manifest %s\n", name) 433 434 loader := func() (*CachedManifest, error) { 435 opts := minio.GetObjectOptions{} 436 if oldManifest != nil && oldManifest.metadata.ETag != "" { 437 opts.SetMatchETagExcept(oldManifest.metadata.ETag) 438 } 439 object, err := l.s3.client.GetObject(ctx, l.s3.bucket, manifestObjectName(name), opts) 440 // Note that many errors (e.g. NoSuchKey) will be reported only after this point. 441 if err != nil { 442 return nil, err 443 } 444 defer object.Close() 445 446 data, err := io.ReadAll(object) 447 if err != nil { 448 return nil, err 449 } 450 451 stat, err := object.Stat() 452 if err != nil { 453 return nil, err 454 } 455 456 manifest, err := DecodeManifest(data) 457 if err != nil { 458 return nil, err 459 } 460 461 metadata := ManifestMetadata{ 462 LastModified: stat.LastModified, 463 ETag: stat.ETag, 464 } 465 return &CachedManifest{manifest, uint32(len(data)), metadata, nil}, nil 466 } 467 468 observer := func() (*CachedManifest, error) { 469 cached, err := loader() 470 var code = "OK" 471 if resp, ok := err.(minio.ErrorResponse); ok { 472 code = resp.Code 473 } 474 s3GetObjectResponseCount.With(prometheus.Labels{"kind": "manifest", "code": code}).Inc() 475 return cached, err 476 } 477 478 startTime := time.Now() 479 cached, err := observer() 480 s3GetObjectDurationSeconds. 481 With(prometheus.Labels{"kind": "manifest"}). 482 Observe(time.Since(startTime).Seconds()) 483 484 if err != nil { 485 errResp := minio.ToErrorResponse(err) 486 if errResp.Code == "NoSuchKey" { 487 err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key) 488 return &CachedManifest{nil, 1, ManifestMetadata{}, err}, nil 489 } else if errResp.StatusCode == http.StatusNotModified && oldManifest != nil { 490 return oldManifest, nil 491 } else { 492 return nil, err 493 } 494 } else { 495 return cached, nil 496 } 497} 498 499func (s3 *S3Backend) GetManifest( 500 ctx context.Context, name string, opts GetManifestOptions, 501) ( 502 manifest *Manifest, metadata ManifestMetadata, err error, 503) { 504 if opts.BypassCache { 505 entry, found := s3.siteCache.Cache.GetEntry(name) 506 if found && entry.RefreshableAt().Before(time.Now()) { 507 s3.siteCache.Cache.Invalidate(name) 508 } 509 } 510 511 var cached *CachedManifest 512 cached, err = s3.siteCache.Get(ctx, name, s3ManifestLoader{s3}) 513 if err != nil { 514 return 515 } else { 516 // This could be `manifest, mtime, nil` or `nil, time.Time{}, ErrObjectNotFound`. 517 manifest, metadata, err = cached.manifest, cached.metadata, cached.err 518 return 519 } 520} 521 522func (s3 *S3Backend) StageManifest(ctx context.Context, manifest *Manifest) error { 523 data := EncodeManifest(manifest) 524 logc.Printf(ctx, "s3: stage manifest %x\n", sha256.Sum256(data)) 525 526 _, err := s3.client.PutObject(ctx, s3.bucket, stagedManifestObjectName(data), 527 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}) 528 return err 529} 530 531func domainFrozenObjectName(domain string) string { 532 return manifestObjectName(fmt.Sprintf("%s/.frozen", domain)) 533} 534 535func (s3 *S3Backend) checkDomainFrozen(ctx context.Context, domain string) error { 536 _, err := s3.client.StatObject(ctx, s3.bucket, domainFrozenObjectName(domain), 537 minio.GetObjectOptions{}) 538 if err == nil { 539 return ErrDomainFrozen 540 } else if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" { 541 return nil 542 } else { 543 return err 544 } 545} 546 547func (s3 *S3Backend) HasAtomicCAS(ctx context.Context) bool { 548 // Support for `If-Unmodified-Since:` or `If-Match:` for PutObject requests is very spotty: 549 // - AWS supports only `If-Match:`: 550 // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html 551 // - Minio supports `If-Match:`: 552 // https://blog.min.io/leading-the-way-minios-conditional-write-feature-for-modern-data-workloads/ 553 // - Tigris supports `If-Unmodified-Since:` and `If-Match:`, but only with `X-Tigris-Consistent: true`; 554 // https://www.tigrisdata.com/docs/objects/conditionals/ 555 // Note that the `X-Tigris-Consistent: true` header must be present on *every* transaction 556 // touching the object, not just on the CAS transactions. 557 // - Wasabi does not support either one and docs seem to suggest that the headers are ignored; 558 // - Garage does not support either one and source code suggests the headers are ignored. 559 // It seems that the only safe option is to not claim support for atomic CAS, and only do 560 // best-effort CAS implementation using HeadObject and PutObject/DeleteObject. 561 return false 562} 563 564func (s3 *S3Backend) checkManifestPrecondition( 565 ctx context.Context, name string, opts ModifyManifestOptions, 566) error { 567 if opts.IfUnmodifiedSince.IsZero() && opts.IfMatch == "" { 568 return nil 569 } 570 571 stat, err := s3.client.StatObject(ctx, s3.bucket, manifestObjectName(name), 572 minio.GetObjectOptions{}) 573 if err != nil { 574 return err 575 } 576 577 if !opts.IfUnmodifiedSince.IsZero() && stat.LastModified.Compare(opts.IfUnmodifiedSince) > 0 { 578 return fmt.Errorf("%w: If-Unmodified-Since", ErrPreconditionFailed) 579 } 580 if opts.IfMatch != "" && stat.ETag != opts.IfMatch { 581 return fmt.Errorf("%w: If-Match", ErrPreconditionFailed) 582 } 583 584 return nil 585} 586 587func (s3 *S3Backend) CommitManifest( 588 ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions, 589) error { 590 data := EncodeManifest(manifest) 591 logc.Printf(ctx, "s3: commit manifest %x -> %s", sha256.Sum256(data), name) 592 593 domain, _, _ := strings.Cut(name, "/") 594 if err := s3.checkDomainFrozen(ctx, domain); err != nil { 595 return err 596 } 597 598 if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil { 599 return err 600 } 601 602 // Remove staged object unconditionally (whether commit succeeded or failed), since 603 // the upper layer has to retry the complete operation anyway. 604 putOptions := minio.PutObjectOptions{} 605 putOptions.Header().Add("X-Tigris-Consistent", "true") 606 if opts.IfMatch != "" { 607 // Not guaranteed to do anything (see `HasAtomicCAS`), but let's try anyway; 608 // this is a "belt and suspenders" approach, together with `checkManifestPrecondition`. 609 // It does reliably work on MinIO at least. 610 putOptions.SetMatchETag(opts.IfMatch) 611 } 612 _, putErr := s3.client.PutObject(ctx, s3.bucket, manifestObjectName(name), 613 bytes.NewReader(data), int64(len(data)), putOptions) 614 removeErr := s3.client.RemoveObject(ctx, s3.bucket, stagedManifestObjectName(data), 615 minio.RemoveObjectOptions{}) 616 s3.siteCache.Cache.Invalidate(name) 617 if putErr != nil { 618 if errResp := minio.ToErrorResponse(putErr); errResp.Code == "PreconditionFailed" { 619 return ErrPreconditionFailed 620 } else { 621 return putErr 622 } 623 } else if removeErr != nil { 624 return removeErr 625 } else { 626 return nil 627 } 628} 629 630func (s3 *S3Backend) DeleteManifest( 631 ctx context.Context, name string, opts ModifyManifestOptions, 632) error { 633 logc.Printf(ctx, "s3: delete manifest %s\n", name) 634 635 domain, _, _ := strings.Cut(name, "/") 636 if err := s3.checkDomainFrozen(ctx, domain); err != nil { 637 return err 638 } 639 640 if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil { 641 return err 642 } 643 644 err := s3.client.RemoveObject(ctx, s3.bucket, manifestObjectName(name), 645 minio.RemoveObjectOptions{}) 646 if err != nil { 647 return err 648 } 649 s3.siteCache.Cache.Invalidate(name) 650 return s3.bumpLastDomainUpdateTimestamp(ctx) 651} 652 653func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] { 654 return func(yield func(*ManifestMetadata, error) bool) { 655 logc.Print(ctx, "s3: enumerate manifests") 656 657 ctx, cancel := context.WithCancel(ctx) 658 defer cancel() 659 660 prefix := "site/" 661 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{ 662 Prefix: prefix, 663 Recursive: true, 664 }) { 665 var metadata *ManifestMetadata 666 var err error 667 if err = object.Err; err == nil { 668 key := strings.TrimPrefix(object.Key, prefix) 669 _, project, _ := strings.Cut(key, "/") 670 if strings.HasSuffix(key, "/") { 671 continue // directory; skip 672 } else if project == "" || strings.HasPrefix(project, ".") && project != ".index" { 673 continue // internal; skip 674 } else { 675 metadata = &ManifestMetadata{ 676 Name: key, 677 Size: object.Size, 678 LastModified: object.LastModified, 679 ETag: object.ETag, 680 } 681 } 682 } 683 if !yield(metadata, err) { 684 break 685 } 686 } 687 } 688} 689 690// Limits the number of concurrent uploads, globally across the entire git-pages process. 691// Not currently configurable as there seems to be little need. 692var getAllManifestsSemaphore = make(chan struct{}, 64) 693 694func (s3 *S3Backend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] { 695 type result struct { 696 metadata *ManifestMetadata 697 manifest *Manifest 698 err error 699 } 700 701 resultsChan := make(chan result) 702 enumeratorCtx, cancel := context.WithCancel(ctx) 703 go func(ctx context.Context) { 704 wg := sync.WaitGroup{} 705 for metadata, err := range s3.EnumerateManifests(ctx) { 706 if err != nil { 707 resultsChan <- result{nil, nil, err} 708 } else { 709 getAllManifestsSemaphore <- struct{}{} // acquire 710 wg.Go(func() { 711 defer func() { <-getAllManifestsSemaphore }() // release 712 manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{}) 713 resultsChan <- result{metadata, manifest, err} 714 }) 715 } 716 } 717 wg.Wait() 718 close(resultsChan) 719 }(enumeratorCtx) 720 721 return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) { 722 for result := range resultsChan { 723 item := tuple[*ManifestMetadata, *Manifest]{result.metadata, result.manifest} 724 if !yield(item, result.err) { 725 cancel() 726 break 727 } 728 } 729 } 730} 731 732func domainCheckObjectName(domain string) string { 733 return manifestObjectName(fmt.Sprintf("%s/.exists", domain)) 734} 735 736func (s3 *S3Backend) CheckDomain(ctx context.Context, domain string) (exists bool, err error) { 737 logc.Printf(ctx, "s3: check domain %s\n", domain) 738 739 _, err = s3.client.StatObject(ctx, s3.bucket, domainCheckObjectName(domain), 740 minio.StatObjectOptions{}) 741 if err != nil { 742 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" { 743 exists, err = false, nil 744 } 745 } else { 746 exists = true 747 } 748 749 if !exists && !s3.HasFeature(ctx, FeatureCheckDomainMarker) { 750 ctx, cancel := context.WithCancel(ctx) 751 defer cancel() 752 753 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{ 754 Prefix: manifestObjectName(fmt.Sprintf("%s/", domain)), 755 }) { 756 if object.Err != nil { 757 return false, object.Err 758 } 759 return true, nil 760 } 761 return false, nil 762 } 763 764 return 765} 766 767func (s3 *S3Backend) CreateDomain(ctx context.Context, domain string) error { 768 logc.Printf(ctx, "s3: create domain %s\n", domain) 769 770 exists, err := s3.CheckDomain(ctx, domain) 771 if err != nil { 772 return err 773 } 774 775 _, err = s3.client.PutObject(ctx, s3.bucket, domainCheckObjectName(domain), 776 &bytes.Reader{}, 0, minio.PutObjectOptions{}) 777 if err != nil { 778 return err 779 } 780 if !exists { 781 err = s3.bumpLastDomainUpdateTimestamp(ctx) 782 } 783 return err 784} 785 786func (s3 *S3Backend) FreezeDomain(ctx context.Context, domain string) error { 787 logc.Printf(ctx, "s3: freeze domain %s\n", domain) 788 789 _, err := s3.client.PutObject(ctx, s3.bucket, domainFrozenObjectName(domain), 790 &bytes.Reader{}, 0, minio.PutObjectOptions{}) 791 return err 792 793} 794 795func (s3 *S3Backend) UnfreezeDomain(ctx context.Context, domain string) error { 796 logc.Printf(ctx, "s3: unfreeze domain %s\n", domain) 797 798 err := s3.client.RemoveObject(ctx, s3.bucket, domainFrozenObjectName(domain), 799 minio.RemoveObjectOptions{}) 800 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" { 801 return nil 802 } else { 803 return err 804 } 805} 806 807const lastDomainUpdateObjectName = "meta/last-domain-update" 808 809func (s3 *S3Backend) HaveDomainsChanged(ctx context.Context, since time.Time) (bool, error) { 810 info, err := s3.client.StatObject(ctx, s3.bucket, lastDomainUpdateObjectName, 811 minio.GetObjectOptions{}) 812 if err != nil { 813 return false, err 814 } 815 816 return info.LastModified.After(since), nil 817} 818 819func (s3 *S3Backend) bumpLastDomainUpdateTimestamp(ctx context.Context) error { 820 logc.Print(ctx, "s3: bumping last domain update timestamp") 821 _, err := s3.client.PutObject(ctx, s3.bucket, lastDomainUpdateObjectName, 822 &bytes.Reader{}, 0, minio.PutObjectOptions{}) 823 return err 824} 825 826func auditObjectName(id AuditID) string { 827 return fmt.Sprintf("audit/%s", id) 828} 829 830func (s3 *S3Backend) AppendAuditLog(ctx context.Context, id AuditID, record *AuditRecord) error { 831 logc.Printf(ctx, "s3: append audit %s\n", id) 832 833 name := auditObjectName(id) 834 data := EncodeAuditRecord(record) 835 836 options := minio.PutObjectOptions{} 837 options.SetMatchETagExcept("*") // may or may not be supported 838 _, err := s3.client.PutObject(ctx, s3.bucket, name, 839 bytes.NewReader(data), int64(len(data)), options) 840 if errResp := minio.ToErrorResponse(err); errResp.StatusCode == 412 { 841 panic(fmt.Errorf("audit ID collision: %s", name)) 842 } 843 return err 844} 845 846func (s3 *S3Backend) QueryAuditLog(ctx context.Context, id AuditID) (*AuditRecord, error) { 847 logc.Printf(ctx, "s3: read audit %s\n", id) 848 849 object, err := s3.client.GetObject(ctx, s3.bucket, auditObjectName(id), 850 minio.GetObjectOptions{}) 851 if err != nil { 852 return nil, err 853 } 854 defer object.Close() 855 856 data, err := io.ReadAll(object) 857 if err != nil { 858 return nil, err 859 } 860 861 return DecodeAuditRecord(data) 862} 863 864func (s3 *S3Backend) SearchAuditLog( 865 ctx context.Context, opts SearchAuditLogOptions, 866) iter.Seq2[AuditID, error] { 867 return func(yield func(AuditID, error) bool) { 868 logc.Printf(ctx, "s3: search audit\n") 869 870 ctx, cancel := context.WithCancel(ctx) 871 defer cancel() 872 873 prefix := "audit/" 874 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{ 875 Prefix: prefix, 876 }) { 877 var id AuditID 878 var err error 879 if object.Err != nil { 880 err = object.Err 881 } else { 882 id, err = ParseAuditID(strings.TrimPrefix(object.Key, prefix)) 883 } 884 if !yield(id, err) { 885 break 886 } 887 } 888 } 889} 890 891var getAuditLogRecordsSemaphore = make(chan struct{}, 64) 892 893func (s3 *S3Backend) GetAuditLogRecords( 894 ctx context.Context, ids iter.Seq2[AuditID, error], 895) iter.Seq2[*AuditRecord, error] { 896 return func(yield func(*AuditRecord, error) bool) { 897 resultsChan := make(chan tuple[*AuditRecord, error]) 898 enumeratorCtx, cancel := context.WithCancel(ctx) 899 defer cancel() 900 901 go func(ctx context.Context) { 902 wg := sync.WaitGroup{} 903 for id, err := range ids { 904 if err != nil { 905 resultsChan <- tuple[*AuditRecord, error]{nil, err} 906 } else { 907 getAuditLogRecordsSemaphore <- struct{}{} // acquire 908 wg.Go(func() { 909 defer func() { <-getAuditLogRecordsSemaphore }() // release 910 record, err := s3.QueryAuditLog(ctx, id) 911 resultsChan <- tuple[*AuditRecord, error]{record, err} 912 }) 913 } 914 } 915 wg.Wait() 916 close(resultsChan) 917 }(enumeratorCtx) 918 919 for result := range resultsChan { 920 record, err := result.Splat() 921 if !yield(record, err) { 922 break 923 } 924 } 925 } 926} 927 928func (s3 *S3Backend) ExpireAuditRecord(ctx context.Context, id AuditID) error { 929 logc.Printf(ctx, "s3: expire audit record %s\n", id) 930 931 return s3.client.RemoveObject(ctx, s3.bucket, auditObjectName(id), 932 minio.RemoveObjectOptions{}) 933}