[mirror] Scalable static site server for Git forges (like GitHub Pages)
1package git_pages
2
3import (
4 "bytes"
5 "context"
6 "crypto/sha256"
7 "fmt"
8 "io"
9 "iter"
10 "net/http"
11 "path"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/c2h5oh/datasize"
17 "github.com/maypok86/otter/v2"
18 "github.com/minio/minio-go/v7"
19 "github.com/minio/minio-go/v7/pkg/credentials"
20 "github.com/prometheus/client_golang/prometheus"
21 "github.com/prometheus/client_golang/prometheus/promauto"
22)
23
// Prometheus collectors used by the S3 backend. They are declared here and
// assigned by initS3BackendMetrics; until that runs they hold their zero values.
var (
	// Blob uploads skipped because the object was already present (see PutBlob).
	blobsDedupedCount prometheus.Counter
	blobsDedupedBytes prometheus.Counter

	// Blob cache statistics, by entry count and by total bytes.
	blobCacheHitsCount      prometheus.Counter
	blobCacheHitsBytes      prometheus.Counter
	blobCacheMissesCount    prometheus.Counter
	blobCacheMissesBytes    prometheus.Counter
	blobCacheEvictionsCount prometheus.Counter
	blobCacheEvictionsBytes prometheus.Counter

	// Manifest cache statistics, by entry count only.
	manifestCacheHitsCount      prometheus.Counter
	manifestCacheMissesCount    prometheus.Counter
	manifestCacheEvictionsCount prometheus.Counter

	// s3:GetObject latency (labelled by "kind") and response counts
	// (labelled by "kind" and "code").
	s3GetObjectDurationSeconds *prometheus.HistogramVec
	s3GetObjectResponseCount   *prometheus.CounterVec
)
42
43func initS3BackendMetrics() {
44 blobsDedupedCount = promauto.NewCounter(prometheus.CounterOpts{
45 Name: "git_pages_blobs_deduped",
46 Help: "Count of blobs deduplicated",
47 })
48 blobsDedupedBytes = promauto.NewCounter(prometheus.CounterOpts{
49 Name: "git_pages_blobs_deduped_bytes",
50 Help: "Total size in bytes of blobs deduplicated",
51 })
52
53 blobCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
54 Name: "git_pages_blob_cache_hits_count",
55 Help: "Count of blobs that were retrieved from the cache",
56 })
57 blobCacheHitsBytes = promauto.NewCounter(prometheus.CounterOpts{
58 Name: "git_pages_blob_cache_hits_bytes",
59 Help: "Total size in bytes of blobs that were retrieved from the cache",
60 })
61 blobCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
62 Name: "git_pages_blob_cache_misses_count",
63 Help: "Count of blobs that were not found in the cache (and were then successfully cached)",
64 })
65 blobCacheMissesBytes = promauto.NewCounter(prometheus.CounterOpts{
66 Name: "git_pages_blob_cache_misses_bytes",
67 Help: "Total size in bytes of blobs that were not found in the cache (and were then successfully cached)",
68 })
69 blobCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
70 Name: "git_pages_blob_cache_evictions_count",
71 Help: "Count of blobs evicted from the cache",
72 })
73 blobCacheEvictionsBytes = promauto.NewCounter(prometheus.CounterOpts{
74 Name: "git_pages_blob_cache_evictions_bytes",
75 Help: "Total size in bytes of blobs evicted from the cache",
76 })
77
78 manifestCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
79 Name: "git_pages_manifest_cache_hits_count",
80 Help: "Count of manifests that were retrieved from the cache",
81 })
82 manifestCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
83 Name: "git_pages_manifest_cache_misses_count",
84 Help: "Count of manifests that were not found in the cache (and were then successfully cached)",
85 })
86 manifestCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
87 Name: "git_pages_manifest_cache_evictions_count",
88 Help: "Count of manifests evicted from the cache",
89 })
90
91 s3GetObjectDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
92 Name: "git_pages_s3_get_object_duration_seconds",
93 Help: "Time to read a whole object from S3",
94 Buckets: []float64{.01, .025, .05, .1, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.5, 5, 10},
95
96 NativeHistogramBucketFactor: 1.1,
97 NativeHistogramMaxBucketNumber: 100,
98 NativeHistogramMinResetDuration: 10 * time.Minute,
99 }, []string{"kind"})
100 s3GetObjectResponseCount = promauto.NewCounterVec(prometheus.CounterOpts{
101 Name: "git_pages_s3_get_object_responses_count",
102 Help: "Count of s3:GetObject responses",
103 }, []string{"kind", "code"})
104}
105
// CachedBlob is an entry of the blob cache. Blobs can be safely cached indefinitely;
// the only reason to evict one is to preserve memory.
type CachedBlob struct {
	blob  []byte    // blob contents
	mtime time.Time // modification time reported alongside the blob
}

// Weight returns the length of the blob in bytes, converted to uint32.
func (c *CachedBlob) Weight() uint32 {
	size := len(c.blob)
	return uint32(size)
}
113
// CachedManifest is an entry of the site (manifest) cache.
//
// Manifests can only be cached for a short time to avoid serving stale content. Browser
// page loads cause a large burst of manifest accesses that are essential for serving
// `304 Not Modified` responses and these need to be handled very quickly, so both hits and
// misses are cached.
type CachedManifest struct {
	// Decoded manifest; nil when the entry records a miss.
	manifest *Manifest
	// Cache weight: the encoded manifest length for hits, 1 for misses.
	weight uint32
	// Modification time and ETag of the stored object; zero value for misses.
	metadata ManifestMetadata
	// Non-nil when the entry records a miss (wraps ErrObjectNotFound).
	err error
}

// Weight returns the weight recorded when the entry was created.
func (c *CachedManifest) Weight() uint32 { return c.weight }
126
// S3Backend stores blobs, manifests, feature markers, and audit records as
// objects in a single S3 bucket, with in-memory caches in front of reads.
type S3Backend struct {
	// S3 client used for every request.
	client *minio.Client
	// Name of the bucket that holds all objects.
	bucket string
	// Blob contents keyed by blob name.
	blobCache *observedCache[string, *CachedBlob]
	// Manifests (and cached misses) keyed by manifest name.
	siteCache *observedCache[string, *CachedManifest]
	// Whether a backend feature marker object exists, keyed by feature.
	featureCache *otter.Cache[BackendFeature, bool]
}

// Compile-time check that *S3Backend implements Backend.
var _ Backend = (*S3Backend)(nil)
136
137func makeCacheOptions[K comparable, V any](
138 config *CacheConfig,
139 weigher func(K, V) uint32,
140) *otter.Options[K, V] {
141 options := &otter.Options[K, V]{}
142 if config.MaxSize != 0 {
143 options.MaximumWeight = config.MaxSize.Bytes()
144 options.Weigher = weigher
145 }
146 if config.MaxStale != 0 {
147 options.RefreshCalculator = otter.RefreshWriting[K, V](
148 time.Duration(config.MaxAge))
149 }
150 if config.MaxAge != 0 || config.MaxStale != 0 {
151 options.ExpiryCalculator = otter.ExpiryWriting[K, V](
152 time.Duration(config.MaxAge + config.MaxStale))
153 }
154 return options
155}
156
157func NewS3Backend(ctx context.Context, config *S3Config) (*S3Backend, error) {
158 client, err := minio.New(config.Endpoint, &minio.Options{
159 Creds: credentials.NewStaticV4(
160 config.AccessKeyID,
161 config.SecretAccessKey,
162 "",
163 ),
164 Secure: !config.Insecure,
165 })
166 if err != nil {
167 return nil, err
168 }
169
170 bucket := config.Bucket
171 exists, err := client.BucketExists(ctx, bucket)
172 if err != nil {
173 return nil, err
174 } else if !exists {
175 logc.Printf(ctx, "s3: create bucket %s\n", bucket)
176
177 err = client.MakeBucket(ctx, bucket,
178 minio.MakeBucketOptions{Region: config.Region})
179 if err != nil {
180 return nil, err
181 }
182
183 err = (&S3Backend{client: client, bucket: bucket}).
184 EnableFeature(ctx, FeatureCheckDomainMarker)
185 if err != nil {
186 return nil, err
187 }
188 }
189
190 initS3BackendMetrics()
191
192 blobCacheMetrics := observedCacheMetrics{
193 HitNumberCounter: blobCacheHitsCount,
194 HitWeightCounter: blobCacheHitsBytes,
195 MissNumberCounter: blobCacheMissesCount,
196 MissWeightCounter: blobCacheMissesBytes,
197 EvictionNumberCounter: blobCacheEvictionsCount,
198 EvictionWeightCounter: blobCacheEvictionsBytes,
199 }
200 blobCache, err := newObservedCache(makeCacheOptions(&config.BlobCache,
201 func(key string, value *CachedBlob) uint32 { return uint32(len(value.blob)) }),
202 blobCacheMetrics)
203 if err != nil {
204 return nil, err
205 }
206
207 siteCacheMetrics := observedCacheMetrics{
208 HitNumberCounter: manifestCacheHitsCount,
209 MissNumberCounter: manifestCacheMissesCount,
210 EvictionNumberCounter: manifestCacheEvictionsCount,
211 }
212 siteCache, err := newObservedCache(makeCacheOptions(&config.SiteCache,
213 func(key string, value *CachedManifest) uint32 { return value.weight }),
214 siteCacheMetrics)
215 if err != nil {
216 return nil, err
217 }
218
219 featureCache, err := otter.New(&otter.Options[BackendFeature, bool]{
220 RefreshCalculator: otter.RefreshWriting[BackendFeature, bool](10 * time.Minute),
221 })
222 if err != nil {
223 return nil, err
224 }
225
226 return &S3Backend{client, bucket, blobCache, siteCache, featureCache}, nil
227}
228
// Backend returns the receiver itself as a Backend.
func (s3 *S3Backend) Backend() Backend {
	return s3
}
232
233func blobObjectName(name string) string {
234 return fmt.Sprintf("blob/%s", path.Join(splitBlobName(name)...))
235}
236
237func storeFeatureObjectName(feature BackendFeature) string {
238 return fmt.Sprintf("meta/feature/%s", feature)
239}
240
241func (s3 *S3Backend) HasFeature(ctx context.Context, feature BackendFeature) bool {
242 loader := func(ctx context.Context, feature BackendFeature) (bool, error) {
243 _, err := s3.client.StatObject(ctx, s3.bucket, storeFeatureObjectName(feature),
244 minio.StatObjectOptions{})
245 if err != nil {
246 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
247 logc.Printf(ctx, "s3 feature %q: disabled", feature)
248 return false, nil
249 } else {
250 return false, err
251 }
252 }
253 logc.Printf(ctx, "s3 feature %q: enabled", feature)
254 return true, nil
255 }
256
257 isOn, err := s3.featureCache.Get(ctx, feature, otter.LoaderFunc[BackendFeature, bool](loader))
258 if err != nil {
259 err = fmt.Errorf("getting s3 backend feature %q: %w", feature, err)
260 ObserveError(err)
261 logc.Println(ctx, err)
262 return false
263 }
264 return isOn
265}
266
267func (s3 *S3Backend) EnableFeature(ctx context.Context, feature BackendFeature) error {
268 _, err := s3.client.PutObject(ctx, s3.bucket, storeFeatureObjectName(feature),
269 &bytes.Reader{}, 0, minio.PutObjectOptions{})
270 return err
271}
272
273func (s3 *S3Backend) GetBlob(
274 ctx context.Context, name string,
275) (
276 reader io.ReadSeeker, metadata BlobMetadata, err error,
277) {
278 loader := func(ctx context.Context, name string) (*CachedBlob, error) {
279 logc.Printf(ctx, "s3: get blob %s\n", name)
280
281 startTime := time.Now()
282
283 object, err := s3.client.GetObject(ctx, s3.bucket, blobObjectName(name),
284 minio.GetObjectOptions{})
285 // Note that many errors (e.g. NoSuchKey) will be reported only after this point.
286 if err != nil {
287 return nil, err
288 }
289 defer object.Close()
290
291 data, err := io.ReadAll(object)
292 if err != nil {
293 return nil, err
294 }
295
296 stat, err := object.Stat()
297 if err != nil {
298 return nil, err
299 }
300
301 s3GetObjectDurationSeconds.
302 With(prometheus.Labels{"kind": "blob"}).
303 Observe(time.Since(startTime).Seconds())
304
305 return &CachedBlob{data, stat.LastModified}, nil
306 }
307
308 observer := func(ctx context.Context, name string) (*CachedBlob, error) {
309 cached, err := loader(ctx, name)
310 var code = "OK"
311 if resp, ok := err.(minio.ErrorResponse); ok {
312 code = resp.Code
313 }
314 s3GetObjectResponseCount.With(prometheus.Labels{"kind": "blob", "code": code}).Inc()
315 return cached, err
316 }
317
318 var cached *CachedBlob
319 cached, err = s3.blobCache.Get(ctx, name, otter.LoaderFunc[string, *CachedBlob](observer))
320 if err != nil {
321 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
322 err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
323 }
324 } else {
325 reader = bytes.NewReader(cached.blob)
326 metadata.Name = name
327 metadata.Size = int64(len(cached.blob))
328 metadata.LastModified = cached.mtime
329 }
330 return
331}
332
333func (s3 *S3Backend) PutBlob(ctx context.Context, name string, data []byte) error {
334 logc.Printf(ctx, "s3: put blob %s (%s)\n", name, datasize.ByteSize(len(data)).HumanReadable())
335
336 _, err := s3.client.StatObject(ctx, s3.bucket, blobObjectName(name),
337 minio.GetObjectOptions{})
338 if err != nil {
339 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
340 _, err := s3.client.PutObject(ctx, s3.bucket, blobObjectName(name),
341 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
342 if err != nil {
343 return err
344 } else {
345 ObserveData(ctx, "blob.status", "created")
346 logc.Printf(ctx, "s3: put blob %s (created)\n", name)
347 return nil
348 }
349 } else {
350 return err
351 }
352 } else {
353 ObserveData(ctx, "blob.status", "exists")
354 logc.Printf(ctx, "s3: put blob %s (exists)\n", name)
355 blobsDedupedCount.Inc()
356 blobsDedupedBytes.Add(float64(len(data)))
357 return nil
358 }
359}
360
361func (s3 *S3Backend) DeleteBlob(ctx context.Context, name string) error {
362 logc.Printf(ctx, "s3: delete blob %s\n", name)
363
364 return s3.client.RemoveObject(ctx, s3.bucket, blobObjectName(name),
365 minio.RemoveObjectOptions{})
366}
367
368func (s3 *S3Backend) EnumerateBlobs(ctx context.Context) iter.Seq2[BlobMetadata, error] {
369 return func(yield func(BlobMetadata, error) bool) {
370 logc.Print(ctx, "s3: enumerate blobs")
371
372 ctx, cancel := context.WithCancel(ctx)
373 defer cancel()
374
375 prefix := "blob/"
376 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
377 Prefix: prefix,
378 Recursive: true,
379 }) {
380 var metadata BlobMetadata
381 var err error
382 if err = object.Err; err == nil {
383 key := strings.TrimPrefix(object.Key, prefix)
384 if strings.HasSuffix(key, "/") {
385 continue // directory; skip
386 } else {
387 metadata.Name = joinBlobName(strings.Split(key, "/"))
388 metadata.Size = object.Size
389 metadata.LastModified = object.LastModified
390 }
391 }
392 if !yield(metadata, err) {
393 break
394 }
395 }
396 }
397}
398
// manifestObjectName returns the S3 object key of the named manifest, which is
// the name placed under the "site/" prefix.
func manifestObjectName(name string) string {
	objectName := fmt.Sprintf("site/%s", name)
	return objectName
}
402
// stagedManifestObjectName returns the S3 object key of a staged manifest:
// "dirty/" followed by the lowercase hex SHA-256 digest of the encoded manifest.
func stagedManifestObjectName(manifestData []byte) string {
	digest := sha256.Sum256(manifestData)
	return fmt.Sprintf("dirty/%x", digest)
}
406
407type s3ManifestLoader struct {
408 s3 *S3Backend
409}
410
411func (l s3ManifestLoader) Load(
412 ctx context.Context, key string,
413) (
414 *CachedManifest, error,
415) {
416 return l.load(ctx, key, nil)
417}
418
419func (l s3ManifestLoader) Reload(
420 ctx context.Context, key string, oldValue *CachedManifest,
421) (
422 *CachedManifest, error,
423) {
424 return l.load(ctx, key, oldValue)
425}
426
427func (l s3ManifestLoader) load(
428 ctx context.Context, name string, oldManifest *CachedManifest,
429) (
430 *CachedManifest, error,
431) {
432 logc.Printf(ctx, "s3: get manifest %s\n", name)
433
434 loader := func() (*CachedManifest, error) {
435 opts := minio.GetObjectOptions{}
436 if oldManifest != nil && oldManifest.metadata.ETag != "" {
437 opts.SetMatchETagExcept(oldManifest.metadata.ETag)
438 }
439 object, err := l.s3.client.GetObject(ctx, l.s3.bucket, manifestObjectName(name), opts)
440 // Note that many errors (e.g. NoSuchKey) will be reported only after this point.
441 if err != nil {
442 return nil, err
443 }
444 defer object.Close()
445
446 data, err := io.ReadAll(object)
447 if err != nil {
448 return nil, err
449 }
450
451 stat, err := object.Stat()
452 if err != nil {
453 return nil, err
454 }
455
456 manifest, err := DecodeManifest(data)
457 if err != nil {
458 return nil, err
459 }
460
461 metadata := ManifestMetadata{
462 LastModified: stat.LastModified,
463 ETag: stat.ETag,
464 }
465 return &CachedManifest{manifest, uint32(len(data)), metadata, nil}, nil
466 }
467
468 observer := func() (*CachedManifest, error) {
469 cached, err := loader()
470 var code = "OK"
471 if resp, ok := err.(minio.ErrorResponse); ok {
472 code = resp.Code
473 }
474 s3GetObjectResponseCount.With(prometheus.Labels{"kind": "manifest", "code": code}).Inc()
475 return cached, err
476 }
477
478 startTime := time.Now()
479 cached, err := observer()
480 s3GetObjectDurationSeconds.
481 With(prometheus.Labels{"kind": "manifest"}).
482 Observe(time.Since(startTime).Seconds())
483
484 if err != nil {
485 errResp := minio.ToErrorResponse(err)
486 if errResp.Code == "NoSuchKey" {
487 err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
488 return &CachedManifest{nil, 1, ManifestMetadata{}, err}, nil
489 } else if errResp.StatusCode == http.StatusNotModified && oldManifest != nil {
490 return oldManifest, nil
491 } else {
492 return nil, err
493 }
494 } else {
495 return cached, nil
496 }
497}
498
499func (s3 *S3Backend) GetManifest(
500 ctx context.Context, name string, opts GetManifestOptions,
501) (
502 manifest *Manifest, metadata ManifestMetadata, err error,
503) {
504 if opts.BypassCache {
505 entry, found := s3.siteCache.Cache.GetEntry(name)
506 if found && entry.RefreshableAt().Before(time.Now()) {
507 s3.siteCache.Cache.Invalidate(name)
508 }
509 }
510
511 var cached *CachedManifest
512 cached, err = s3.siteCache.Get(ctx, name, s3ManifestLoader{s3})
513 if err != nil {
514 return
515 } else {
516 // This could be `manifest, mtime, nil` or `nil, time.Time{}, ErrObjectNotFound`.
517 manifest, metadata, err = cached.manifest, cached.metadata, cached.err
518 return
519 }
520}
521
522func (s3 *S3Backend) StageManifest(ctx context.Context, manifest *Manifest) error {
523 data := EncodeManifest(manifest)
524 logc.Printf(ctx, "s3: stage manifest %x\n", sha256.Sum256(data))
525
526 _, err := s3.client.PutObject(ctx, s3.bucket, stagedManifestObjectName(data),
527 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
528 return err
529}
530
531func domainFrozenObjectName(domain string) string {
532 return manifestObjectName(fmt.Sprintf("%s/.frozen", domain))
533}
534
535func (s3 *S3Backend) checkDomainFrozen(ctx context.Context, domain string) error {
536 _, err := s3.client.StatObject(ctx, s3.bucket, domainFrozenObjectName(domain),
537 minio.GetObjectOptions{})
538 if err == nil {
539 return ErrDomainFrozen
540 } else if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
541 return nil
542 } else {
543 return err
544 }
545}
546
// HasAtomicCAS reports whether the backend guarantees atomic compare-and-swap
// on manifests. The S3 backend always answers false; the reasons are below.
func (s3 *S3Backend) HasAtomicCAS(ctx context.Context) bool {
	// Support for `If-Unmodified-Since:` or `If-Match:` for PutObject requests is very spotty:
	//  - AWS supports only `If-Match:`:
	//    https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
	//  - Minio supports `If-Match:`:
	//    https://blog.min.io/leading-the-way-minios-conditional-write-feature-for-modern-data-workloads/
	//  - Tigris supports `If-Unmodified-Since:` and `If-Match:`, but only with `X-Tigris-Consistent: true`;
	//    https://www.tigrisdata.com/docs/objects/conditionals/
	//    Note that the `X-Tigris-Consistent: true` header must be present on *every* transaction
	//    touching the object, not just on the CAS transactions.
	//  - Wasabi does not support either one and docs seem to suggest that the headers are ignored;
	//  - Garage does not support either one and source code suggests the headers are ignored.
	// It seems that the only safe option is to not claim support for atomic CAS, and only do
	// best-effort CAS implementation using HeadObject and PutObject/DeleteObject.
	return false
}
563
564func (s3 *S3Backend) checkManifestPrecondition(
565 ctx context.Context, name string, opts ModifyManifestOptions,
566) error {
567 if opts.IfUnmodifiedSince.IsZero() && opts.IfMatch == "" {
568 return nil
569 }
570
571 stat, err := s3.client.StatObject(ctx, s3.bucket, manifestObjectName(name),
572 minio.GetObjectOptions{})
573 if err != nil {
574 return err
575 }
576
577 if !opts.IfUnmodifiedSince.IsZero() && stat.LastModified.Compare(opts.IfUnmodifiedSince) > 0 {
578 return fmt.Errorf("%w: If-Unmodified-Since", ErrPreconditionFailed)
579 }
580 if opts.IfMatch != "" && stat.ETag != opts.IfMatch {
581 return fmt.Errorf("%w: If-Match", ErrPreconditionFailed)
582 }
583
584 return nil
585}
586
587func (s3 *S3Backend) CommitManifest(
588 ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
589) error {
590 data := EncodeManifest(manifest)
591 logc.Printf(ctx, "s3: commit manifest %x -> %s", sha256.Sum256(data), name)
592
593 domain, _, _ := strings.Cut(name, "/")
594 if err := s3.checkDomainFrozen(ctx, domain); err != nil {
595 return err
596 }
597
598 if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
599 return err
600 }
601
602 // Remove staged object unconditionally (whether commit succeeded or failed), since
603 // the upper layer has to retry the complete operation anyway.
604 putOptions := minio.PutObjectOptions{}
605 putOptions.Header().Add("X-Tigris-Consistent", "true")
606 if opts.IfMatch != "" {
607 // Not guaranteed to do anything (see `HasAtomicCAS`), but let's try anyway;
608 // this is a "belt and suspenders" approach, together with `checkManifestPrecondition`.
609 // It does reliably work on MinIO at least.
610 putOptions.SetMatchETag(opts.IfMatch)
611 }
612 _, putErr := s3.client.PutObject(ctx, s3.bucket, manifestObjectName(name),
613 bytes.NewReader(data), int64(len(data)), putOptions)
614 removeErr := s3.client.RemoveObject(ctx, s3.bucket, stagedManifestObjectName(data),
615 minio.RemoveObjectOptions{})
616 s3.siteCache.Cache.Invalidate(name)
617 if putErr != nil {
618 if errResp := minio.ToErrorResponse(putErr); errResp.Code == "PreconditionFailed" {
619 return ErrPreconditionFailed
620 } else {
621 return putErr
622 }
623 } else if removeErr != nil {
624 return removeErr
625 } else {
626 return nil
627 }
628}
629
630func (s3 *S3Backend) DeleteManifest(
631 ctx context.Context, name string, opts ModifyManifestOptions,
632) error {
633 logc.Printf(ctx, "s3: delete manifest %s\n", name)
634
635 domain, _, _ := strings.Cut(name, "/")
636 if err := s3.checkDomainFrozen(ctx, domain); err != nil {
637 return err
638 }
639
640 if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
641 return err
642 }
643
644 err := s3.client.RemoveObject(ctx, s3.bucket, manifestObjectName(name),
645 minio.RemoveObjectOptions{})
646 if err != nil {
647 return err
648 }
649 s3.siteCache.Cache.Invalidate(name)
650 return s3.bumpLastDomainUpdateTimestamp(ctx)
651}
652
653func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] {
654 return func(yield func(*ManifestMetadata, error) bool) {
655 logc.Print(ctx, "s3: enumerate manifests")
656
657 ctx, cancel := context.WithCancel(ctx)
658 defer cancel()
659
660 prefix := "site/"
661 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
662 Prefix: prefix,
663 Recursive: true,
664 }) {
665 var metadata *ManifestMetadata
666 var err error
667 if err = object.Err; err == nil {
668 key := strings.TrimPrefix(object.Key, prefix)
669 _, project, _ := strings.Cut(key, "/")
670 if strings.HasSuffix(key, "/") {
671 continue // directory; skip
672 } else if project == "" || strings.HasPrefix(project, ".") && project != ".index" {
673 continue // internal; skip
674 } else {
675 metadata = &ManifestMetadata{
676 Name: key,
677 Size: object.Size,
678 LastModified: object.LastModified,
679 ETag: object.ETag,
680 }
681 }
682 }
683 if !yield(metadata, err) {
684 break
685 }
686 }
687 }
688}
689
690// Limits the number of concurrent uploads, globally across the entire git-pages process.
691// Not currently configurable as there seems to be little need.
692var getAllManifestsSemaphore = make(chan struct{}, 64)
693
694func (s3 *S3Backend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] {
695 type result struct {
696 metadata *ManifestMetadata
697 manifest *Manifest
698 err error
699 }
700
701 resultsChan := make(chan result)
702 enumeratorCtx, cancel := context.WithCancel(ctx)
703 go func(ctx context.Context) {
704 wg := sync.WaitGroup{}
705 for metadata, err := range s3.EnumerateManifests(ctx) {
706 if err != nil {
707 resultsChan <- result{nil, nil, err}
708 } else {
709 getAllManifestsSemaphore <- struct{}{} // acquire
710 wg.Go(func() {
711 defer func() { <-getAllManifestsSemaphore }() // release
712 manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{})
713 resultsChan <- result{metadata, manifest, err}
714 })
715 }
716 }
717 wg.Wait()
718 close(resultsChan)
719 }(enumeratorCtx)
720
721 return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) {
722 for result := range resultsChan {
723 item := tuple[*ManifestMetadata, *Manifest]{result.metadata, result.manifest}
724 if !yield(item, result.err) {
725 cancel()
726 break
727 }
728 }
729 }
730}
731
732func domainCheckObjectName(domain string) string {
733 return manifestObjectName(fmt.Sprintf("%s/.exists", domain))
734}
735
736func (s3 *S3Backend) CheckDomain(ctx context.Context, domain string) (exists bool, err error) {
737 logc.Printf(ctx, "s3: check domain %s\n", domain)
738
739 _, err = s3.client.StatObject(ctx, s3.bucket, domainCheckObjectName(domain),
740 minio.StatObjectOptions{})
741 if err != nil {
742 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
743 exists, err = false, nil
744 }
745 } else {
746 exists = true
747 }
748
749 if !exists && !s3.HasFeature(ctx, FeatureCheckDomainMarker) {
750 ctx, cancel := context.WithCancel(ctx)
751 defer cancel()
752
753 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
754 Prefix: manifestObjectName(fmt.Sprintf("%s/", domain)),
755 }) {
756 if object.Err != nil {
757 return false, object.Err
758 }
759 return true, nil
760 }
761 return false, nil
762 }
763
764 return
765}
766
767func (s3 *S3Backend) CreateDomain(ctx context.Context, domain string) error {
768 logc.Printf(ctx, "s3: create domain %s\n", domain)
769
770 exists, err := s3.CheckDomain(ctx, domain)
771 if err != nil {
772 return err
773 }
774
775 _, err = s3.client.PutObject(ctx, s3.bucket, domainCheckObjectName(domain),
776 &bytes.Reader{}, 0, minio.PutObjectOptions{})
777 if err != nil {
778 return err
779 }
780 if !exists {
781 err = s3.bumpLastDomainUpdateTimestamp(ctx)
782 }
783 return err
784}
785
786func (s3 *S3Backend) FreezeDomain(ctx context.Context, domain string) error {
787 logc.Printf(ctx, "s3: freeze domain %s\n", domain)
788
789 _, err := s3.client.PutObject(ctx, s3.bucket, domainFrozenObjectName(domain),
790 &bytes.Reader{}, 0, minio.PutObjectOptions{})
791 return err
792
793}
794
795func (s3 *S3Backend) UnfreezeDomain(ctx context.Context, domain string) error {
796 logc.Printf(ctx, "s3: unfreeze domain %s\n", domain)
797
798 err := s3.client.RemoveObject(ctx, s3.bucket, domainFrozenObjectName(domain),
799 minio.RemoveObjectOptions{})
800 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
801 return nil
802 } else {
803 return err
804 }
805}
806
807const lastDomainUpdateObjectName = "meta/last-domain-update"
808
809func (s3 *S3Backend) HaveDomainsChanged(ctx context.Context, since time.Time) (bool, error) {
810 info, err := s3.client.StatObject(ctx, s3.bucket, lastDomainUpdateObjectName,
811 minio.GetObjectOptions{})
812 if err != nil {
813 return false, err
814 }
815
816 return info.LastModified.After(since), nil
817}
818
819func (s3 *S3Backend) bumpLastDomainUpdateTimestamp(ctx context.Context) error {
820 logc.Print(ctx, "s3: bumping last domain update timestamp")
821 _, err := s3.client.PutObject(ctx, s3.bucket, lastDomainUpdateObjectName,
822 &bytes.Reader{}, 0, minio.PutObjectOptions{})
823 return err
824}
825
826func auditObjectName(id AuditID) string {
827 return fmt.Sprintf("audit/%s", id)
828}
829
830func (s3 *S3Backend) AppendAuditLog(ctx context.Context, id AuditID, record *AuditRecord) error {
831 logc.Printf(ctx, "s3: append audit %s\n", id)
832
833 name := auditObjectName(id)
834 data := EncodeAuditRecord(record)
835
836 options := minio.PutObjectOptions{}
837 options.SetMatchETagExcept("*") // may or may not be supported
838 _, err := s3.client.PutObject(ctx, s3.bucket, name,
839 bytes.NewReader(data), int64(len(data)), options)
840 if errResp := minio.ToErrorResponse(err); errResp.StatusCode == 412 {
841 panic(fmt.Errorf("audit ID collision: %s", name))
842 }
843 return err
844}
845
846func (s3 *S3Backend) QueryAuditLog(ctx context.Context, id AuditID) (*AuditRecord, error) {
847 logc.Printf(ctx, "s3: read audit %s\n", id)
848
849 object, err := s3.client.GetObject(ctx, s3.bucket, auditObjectName(id),
850 minio.GetObjectOptions{})
851 if err != nil {
852 return nil, err
853 }
854 defer object.Close()
855
856 data, err := io.ReadAll(object)
857 if err != nil {
858 return nil, err
859 }
860
861 return DecodeAuditRecord(data)
862}
863
864func (s3 *S3Backend) SearchAuditLog(
865 ctx context.Context, opts SearchAuditLogOptions,
866) iter.Seq2[AuditID, error] {
867 return func(yield func(AuditID, error) bool) {
868 logc.Printf(ctx, "s3: search audit\n")
869
870 ctx, cancel := context.WithCancel(ctx)
871 defer cancel()
872
873 prefix := "audit/"
874 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
875 Prefix: prefix,
876 }) {
877 var id AuditID
878 var err error
879 if object.Err != nil {
880 err = object.Err
881 } else {
882 id, err = ParseAuditID(strings.TrimPrefix(object.Key, prefix))
883 }
884 if !yield(id, err) {
885 break
886 }
887 }
888 }
889}
890
891var getAuditLogRecordsSemaphore = make(chan struct{}, 64)
892
893func (s3 *S3Backend) GetAuditLogRecords(
894 ctx context.Context, ids iter.Seq2[AuditID, error],
895) iter.Seq2[*AuditRecord, error] {
896 return func(yield func(*AuditRecord, error) bool) {
897 resultsChan := make(chan tuple[*AuditRecord, error])
898 enumeratorCtx, cancel := context.WithCancel(ctx)
899 defer cancel()
900
901 go func(ctx context.Context) {
902 wg := sync.WaitGroup{}
903 for id, err := range ids {
904 if err != nil {
905 resultsChan <- tuple[*AuditRecord, error]{nil, err}
906 } else {
907 getAuditLogRecordsSemaphore <- struct{}{} // acquire
908 wg.Go(func() {
909 defer func() { <-getAuditLogRecordsSemaphore }() // release
910 record, err := s3.QueryAuditLog(ctx, id)
911 resultsChan <- tuple[*AuditRecord, error]{record, err}
912 })
913 }
914 }
915 wg.Wait()
916 close(resultsChan)
917 }(enumeratorCtx)
918
919 for result := range resultsChan {
920 record, err := result.Splat()
921 if !yield(record, err) {
922 break
923 }
924 }
925 }
926}
927
928func (s3 *S3Backend) ExpireAuditRecord(ctx context.Context, id AuditID) error {
929 logc.Printf(ctx, "s3: expire audit record %s\n", id)
930
931 return s3.client.RemoveObject(ctx, s3.bucket, auditObjectName(id),
932 minio.RemoveObjectOptions{})
933}