forked from
whitequark.org/git-pages
fork of whitequark.org/git-pages with mods for tangled
1package git_pages
2
3import (
4 "bytes"
5 "context"
6 "crypto/sha256"
7 "fmt"
8 "io"
9 "net/http"
10 "path"
11 "strings"
12 "time"
13
14 "github.com/c2h5oh/datasize"
15 "github.com/maypok86/otter/v2"
16 "github.com/minio/minio-go/v7"
17 "github.com/minio/minio-go/v7/pkg/credentials"
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promauto"
20)
21
22var (
23 blobsDedupedCount prometheus.Counter
24 blobsDedupedBytes prometheus.Counter
25
26 blobCacheHitsCount prometheus.Counter
27 blobCacheHitsBytes prometheus.Counter
28 blobCacheMissesCount prometheus.Counter
29 blobCacheMissesBytes prometheus.Counter
30 blobCacheEvictionsCount prometheus.Counter
31 blobCacheEvictionsBytes prometheus.Counter
32
33 manifestCacheHitsCount prometheus.Counter
34 manifestCacheMissesCount prometheus.Counter
35 manifestCacheEvictionsCount prometheus.Counter
36
37 s3GetObjectDurationSeconds *prometheus.HistogramVec
38 s3GetObjectResponseCount *prometheus.CounterVec
39)
40
41func initS3BackendMetrics() {
42 blobsDedupedCount = promauto.NewCounter(prometheus.CounterOpts{
43 Name: "git_pages_blobs_deduped",
44 Help: "Count of blobs deduplicated",
45 })
46 blobsDedupedBytes = promauto.NewCounter(prometheus.CounterOpts{
47 Name: "git_pages_blobs_deduped_bytes",
48 Help: "Total size in bytes of blobs deduplicated",
49 })
50
51 blobCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
52 Name: "git_pages_blob_cache_hits_count",
53 Help: "Count of blobs that were retrieved from the cache",
54 })
55 blobCacheHitsBytes = promauto.NewCounter(prometheus.CounterOpts{
56 Name: "git_pages_blob_cache_hits_bytes",
57 Help: "Total size in bytes of blobs that were retrieved from the cache",
58 })
59 blobCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
60 Name: "git_pages_blob_cache_misses_count",
61 Help: "Count of blobs that were not found in the cache (and were then successfully cached)",
62 })
63 blobCacheMissesBytes = promauto.NewCounter(prometheus.CounterOpts{
64 Name: "git_pages_blob_cache_misses_bytes",
65 Help: "Total size in bytes of blobs that were not found in the cache (and were then successfully cached)",
66 })
67 blobCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
68 Name: "git_pages_blob_cache_evictions_count",
69 Help: "Count of blobs evicted from the cache",
70 })
71 blobCacheEvictionsBytes = promauto.NewCounter(prometheus.CounterOpts{
72 Name: "git_pages_blob_cache_evictions_bytes",
73 Help: "Total size in bytes of blobs evicted from the cache",
74 })
75
76 manifestCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
77 Name: "git_pages_manifest_cache_hits_count",
78 Help: "Count of manifests that were retrieved from the cache",
79 })
80 manifestCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
81 Name: "git_pages_manifest_cache_misses_count",
82 Help: "Count of manifests that were not found in the cache (and were then successfully cached)",
83 })
84 manifestCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
85 Name: "git_pages_manifest_cache_evictions_count",
86 Help: "Count of manifests evicted from the cache",
87 })
88
89 s3GetObjectDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
90 Name: "git_pages_s3_get_object_duration_seconds",
91 Help: "Time to read a whole object from S3",
92 Buckets: []float64{.01, .025, .05, .1, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.5, 5, 10},
93
94 NativeHistogramBucketFactor: 1.1,
95 NativeHistogramMaxBucketNumber: 100,
96 NativeHistogramMinResetDuration: 10 * time.Minute,
97 }, []string{"kind"})
98 s3GetObjectResponseCount = promauto.NewCounterVec(prometheus.CounterOpts{
99 Name: "git_pages_s3_get_object_responses_count",
100 Help: "Count of s3:GetObject responses",
101 }, []string{"kind", "code"})
102}
103
// CachedBlob is a blob cache entry.
//
// Blobs can be safely cached indefinitely. They only need to be evicted to preserve memory.
type CachedBlob struct {
	blob  []byte    // complete object contents as read by GetBlob
	mtime time.Time // LastModified reported by the object's Stat in GetBlob
}
109
110func (c *CachedBlob) Weight() uint32 { return uint32(len(c.blob)) }
111
// CachedManifest is a site cache entry.
//
// Manifests can only be cached for a short time to avoid serving stale content. Browser
// page loads cause a large burst of manifest accesses that are essential for serving
// `304 Not Modified` responses and these need to be handled very quickly, so both hits and
// misses are cached.
type CachedManifest struct {
	manifest *Manifest // decoded manifest; nil for a cached miss
	weight   uint32    // cache weight: encoded size in bytes, or 1 for a cached miss
	mtime    time.Time // LastModified of the manifest object; zero for a cached miss
	etag     string    // ETag of the manifest object, sent as a conditional on reload
	err      error     // ErrObjectNotFound-wrapping error for a cached miss, else nil
}
123
124func (c *CachedManifest) Weight() uint32 { return c.weight }
125
// S3Backend stores blobs and manifests in a single S3 bucket, with in-memory
// caches in front of the reads.
type S3Backend struct {
	client       *minio.Client                          // S3 client used for every request
	bucket       string                                 // name of the bucket holding all objects
	blobCache    *observedCache[string, *CachedBlob]    // blob contents, keyed by blob name
	siteCache    *observedCache[string, *CachedManifest] // manifests and cached misses, keyed by manifest name
	featureCache *otter.Cache[BackendFeature, bool]     // whether each feature marker object exists
}

// Compile-time check that *S3Backend implements Backend.
var _ Backend = (*S3Backend)(nil)
135
136func makeCacheOptions[K comparable, V any](
137 config *CacheConfig,
138 weigher func(K, V) uint32,
139) *otter.Options[K, V] {
140 options := &otter.Options[K, V]{}
141 if config.MaxSize != 0 {
142 options.MaximumWeight = config.MaxSize.Bytes()
143 options.Weigher = weigher
144 }
145 if config.MaxStale != 0 {
146 options.RefreshCalculator = otter.RefreshWriting[K, V](
147 time.Duration(config.MaxAge))
148 }
149 if config.MaxAge != 0 || config.MaxStale != 0 {
150 options.ExpiryCalculator = otter.ExpiryWriting[K, V](
151 time.Duration(config.MaxAge + config.MaxStale))
152 }
153 return options
154}
155
156func NewS3Backend(ctx context.Context, config *S3Config) (*S3Backend, error) {
157 client, err := minio.New(config.Endpoint, &minio.Options{
158 Creds: credentials.NewStaticV4(
159 config.AccessKeyID,
160 config.SecretAccessKey,
161 "",
162 ),
163 Secure: !config.Insecure,
164 })
165 if err != nil {
166 return nil, err
167 }
168
169 bucket := config.Bucket
170 exists, err := client.BucketExists(ctx, bucket)
171 if err != nil {
172 return nil, err
173 } else if !exists {
174 logc.Printf(ctx, "s3: create bucket %s\n", bucket)
175
176 err = client.MakeBucket(ctx, bucket,
177 minio.MakeBucketOptions{Region: config.Region})
178 if err != nil {
179 return nil, err
180 }
181 }
182
183 initS3BackendMetrics()
184
185 blobCacheMetrics := observedCacheMetrics{
186 HitNumberCounter: blobCacheHitsCount,
187 HitWeightCounter: blobCacheHitsBytes,
188 MissNumberCounter: blobCacheMissesCount,
189 MissWeightCounter: blobCacheMissesBytes,
190 EvictionNumberCounter: blobCacheEvictionsCount,
191 EvictionWeightCounter: blobCacheEvictionsBytes,
192 }
193 blobCache, err := newObservedCache(makeCacheOptions(&config.BlobCache,
194 func(key string, value *CachedBlob) uint32 { return uint32(len(value.blob)) }),
195 blobCacheMetrics)
196 if err != nil {
197 return nil, err
198 }
199
200 siteCacheMetrics := observedCacheMetrics{
201 HitNumberCounter: manifestCacheHitsCount,
202 MissNumberCounter: manifestCacheMissesCount,
203 EvictionNumberCounter: manifestCacheEvictionsCount,
204 }
205 siteCache, err := newObservedCache(makeCacheOptions(&config.SiteCache,
206 func(key string, value *CachedManifest) uint32 { return value.weight }),
207 siteCacheMetrics)
208 if err != nil {
209 return nil, err
210 }
211
212 featureCache, err := otter.New(&otter.Options[BackendFeature, bool]{
213 RefreshCalculator: otter.RefreshWriting[BackendFeature, bool](10 * time.Minute),
214 })
215 if err != nil {
216 return nil, err
217 }
218
219 return &S3Backend{client, bucket, blobCache, siteCache, featureCache}, nil
220}
221
// Backend returns the receiver itself as a Backend.
func (s3 *S3Backend) Backend() Backend {
	return s3
}
225
226func blobObjectName(name string) string {
227 return fmt.Sprintf("blob/%s", path.Join(splitBlobName(name)...))
228}
229
// storeFeatureObjectName returns the object key of the marker whose existence
// means that feature is enabled (see HasFeature and EnableFeature).
func storeFeatureObjectName(feature BackendFeature) string {
	return fmt.Sprintf("meta/feature/%s", feature)
}
233
234func (s3 *S3Backend) HasFeature(ctx context.Context, feature BackendFeature) bool {
235 loader := func(ctx context.Context, feature BackendFeature) (bool, error) {
236 _, err := s3.client.StatObject(ctx, s3.bucket, storeFeatureObjectName(feature),
237 minio.StatObjectOptions{})
238 if err != nil {
239 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
240 logc.Printf(ctx, "s3 feature %q: disabled", feature)
241 return false, nil
242 } else {
243 return false, err
244 }
245 }
246 logc.Printf(ctx, "s3 feature %q: enabled", feature)
247 return true, nil
248 }
249
250 isOn, err := s3.featureCache.Get(ctx, feature, otter.LoaderFunc[BackendFeature, bool](loader))
251 if err != nil {
252 err = fmt.Errorf("getting s3 backend feature %q: %w", feature, err)
253 ObserveError(err)
254 logc.Println(ctx, err)
255 return false
256 }
257 return isOn
258}
259
260func (s3 *S3Backend) EnableFeature(ctx context.Context, feature BackendFeature) error {
261 _, err := s3.client.PutObject(ctx, s3.bucket, storeFeatureObjectName(feature),
262 &bytes.Reader{}, 0, minio.PutObjectOptions{})
263 return err
264}
265
266func (s3 *S3Backend) GetBlob(
267 ctx context.Context, name string,
268) (
269 reader io.ReadSeeker, size uint64, mtime time.Time, err error,
270) {
271 loader := func(ctx context.Context, name string) (*CachedBlob, error) {
272 logc.Printf(ctx, "s3: get blob %s\n", name)
273
274 startTime := time.Now()
275
276 object, err := s3.client.GetObject(ctx, s3.bucket, blobObjectName(name),
277 minio.GetObjectOptions{})
278 // Note that many errors (e.g. NoSuchKey) will be reported only after this point.
279 if err != nil {
280 return nil, err
281 }
282 defer object.Close()
283
284 data, err := io.ReadAll(object)
285 if err != nil {
286 return nil, err
287 }
288
289 stat, err := object.Stat()
290 if err != nil {
291 return nil, err
292 }
293
294 s3GetObjectDurationSeconds.
295 With(prometheus.Labels{"kind": "blob"}).
296 Observe(time.Since(startTime).Seconds())
297
298 return &CachedBlob{data, stat.LastModified}, nil
299 }
300
301 observer := func(ctx context.Context, name string) (*CachedBlob, error) {
302 cached, err := loader(ctx, name)
303 var code = "OK"
304 if resp, ok := err.(minio.ErrorResponse); ok {
305 code = resp.Code
306 }
307 s3GetObjectResponseCount.With(prometheus.Labels{"kind": "blob", "code": code}).Inc()
308 return cached, err
309 }
310
311 var cached *CachedBlob
312 cached, err = s3.blobCache.Get(ctx, name, otter.LoaderFunc[string, *CachedBlob](observer))
313 if err != nil {
314 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
315 err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
316 }
317 } else {
318 reader = bytes.NewReader(cached.blob)
319 size = uint64(len(cached.blob))
320 mtime = cached.mtime
321 }
322 return
323}
324
325func (s3 *S3Backend) PutBlob(ctx context.Context, name string, data []byte) error {
326 logc.Printf(ctx, "s3: put blob %s (%s)\n", name, datasize.ByteSize(len(data)).HumanReadable())
327
328 _, err := s3.client.StatObject(ctx, s3.bucket, blobObjectName(name),
329 minio.GetObjectOptions{})
330 if err != nil {
331 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
332 _, err := s3.client.PutObject(ctx, s3.bucket, blobObjectName(name),
333 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
334 if err != nil {
335 return err
336 } else {
337 ObserveData(ctx, "blob.status", "created")
338 logc.Printf(ctx, "s3: put blob %s (created)\n", name)
339 return nil
340 }
341 } else {
342 return err
343 }
344 } else {
345 ObserveData(ctx, "blob.status", "exists")
346 logc.Printf(ctx, "s3: put blob %s (exists)\n", name)
347 blobsDedupedCount.Inc()
348 blobsDedupedBytes.Add(float64(len(data)))
349 return nil
350 }
351}
352
353func (s3 *S3Backend) DeleteBlob(ctx context.Context, name string) error {
354 logc.Printf(ctx, "s3: delete blob %s\n", name)
355
356 return s3.client.RemoveObject(ctx, s3.bucket, blobObjectName(name),
357 minio.RemoveObjectOptions{})
358}
359
// manifestObjectName returns the object key of the named manifest, which is
// the name under the "site/" prefix.
func manifestObjectName(name string) string {
	return "site/" + name
}
363
// stagedManifestObjectName returns the object key of a staged manifest:
// "dirty/" followed by the lowercase hex SHA-256 digest of its encoded form.
func stagedManifestObjectName(manifestData []byte) string {
	digest := sha256.Sum256(manifestData)
	return fmt.Sprintf("dirty/%x", digest)
}
367
368func (s3 *S3Backend) ListManifests(ctx context.Context) (manifests []string, err error) {
369 logc.Print(ctx, "s3: list manifests")
370
371 ctx, cancel := context.WithCancel(ctx)
372 defer cancel()
373
374 prefix := manifestObjectName("")
375 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
376 Prefix: prefix,
377 Recursive: true,
378 }) {
379 if object.Err != nil {
380 return nil, object.Err
381 }
382 key := strings.TrimRight(strings.TrimPrefix(object.Key, prefix), "/")
383 if strings.Count(key, "/") > 1 {
384 continue
385 }
386 _, project, _ := strings.Cut(key, "/")
387 if project == "" || strings.HasPrefix(project, ".") && project != ".index" {
388 continue
389 }
390 manifests = append(manifests, key)
391 }
392
393 return
394}
395
// s3ManifestLoader is the loader handed to siteCache by GetManifest; it
// fetches manifests from the wrapped backend's bucket.
type s3ManifestLoader struct {
	s3 *S3Backend
}

// Load fetches the manifest for key with no previously cached value, so the
// request is unconditional.
func (l s3ManifestLoader) Load(
	ctx context.Context, key string,
) (
	*CachedManifest, error,
) {
	return l.load(ctx, key, nil)
}

// Reload fetches the manifest for key, passing the previously cached value so
// that load can make the request conditional on its ETag.
func (l s3ManifestLoader) Reload(
	ctx context.Context, key string, oldValue *CachedManifest,
) (
	*CachedManifest, error,
) {
	return l.load(ctx, key, oldValue)
}
415
416func (l s3ManifestLoader) load(
417 ctx context.Context, name string, oldManifest *CachedManifest,
418) (
419 *CachedManifest, error,
420) {
421 logc.Printf(ctx, "s3: get manifest %s\n", name)
422
423 loader := func() (*CachedManifest, error) {
424 opts := minio.GetObjectOptions{}
425 if oldManifest != nil && oldManifest.etag != "" {
426 opts.SetMatchETagExcept(oldManifest.etag)
427 }
428 object, err := l.s3.client.GetObject(ctx, l.s3.bucket, manifestObjectName(name), opts)
429 // Note that many errors (e.g. NoSuchKey) will be reported only after this point.
430 if err != nil {
431 return nil, err
432 }
433 defer object.Close()
434
435 data, err := io.ReadAll(object)
436 if err != nil {
437 return nil, err
438 }
439
440 stat, err := object.Stat()
441 if err != nil {
442 return nil, err
443 }
444
445 manifest, err := DecodeManifest(data)
446 if err != nil {
447 return nil, err
448 }
449
450 return &CachedManifest{manifest, uint32(len(data)), stat.LastModified, stat.ETag, nil}, nil
451 }
452
453 observer := func() (*CachedManifest, error) {
454 cached, err := loader()
455 var code = "OK"
456 if resp, ok := err.(minio.ErrorResponse); ok {
457 code = resp.Code
458 }
459 s3GetObjectResponseCount.With(prometheus.Labels{"kind": "manifest", "code": code}).Inc()
460 return cached, err
461 }
462
463 startTime := time.Now()
464 cached, err := observer()
465 s3GetObjectDurationSeconds.
466 With(prometheus.Labels{"kind": "manifest"}).
467 Observe(time.Since(startTime).Seconds())
468
469 if err != nil {
470 errResp := minio.ToErrorResponse(err)
471 if errResp.Code == "NoSuchKey" {
472 err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
473 return &CachedManifest{nil, 1, time.Time{}, "", err}, nil
474 } else if errResp.StatusCode == http.StatusNotModified && oldManifest != nil {
475 return oldManifest, nil
476 } else {
477 return nil, err
478 }
479 } else {
480 return cached, nil
481 }
482}
483
484func (s3 *S3Backend) GetManifest(
485 ctx context.Context, name string, opts GetManifestOptions,
486) (
487 manifest *Manifest, mtime time.Time, err error,
488) {
489 if opts.BypassCache {
490 entry, found := s3.siteCache.Cache.GetEntry(name)
491 if found && entry.RefreshableAt().Before(time.Now()) {
492 s3.siteCache.Cache.Invalidate(name)
493 }
494 }
495
496 var cached *CachedManifest
497 cached, err = s3.siteCache.Get(ctx, name, s3ManifestLoader{s3})
498 if err != nil {
499 return
500 } else {
501 // This could be `manifest, mtime, nil` or `nil, time.Time{}, ErrObjectNotFound`.
502 manifest, mtime, err = cached.manifest, cached.mtime, cached.err
503 return
504 }
505}
506
507func (s3 *S3Backend) StageManifest(ctx context.Context, manifest *Manifest) error {
508 data := EncodeManifest(manifest)
509 logc.Printf(ctx, "s3: stage manifest %x\n", sha256.Sum256(data))
510
511 _, err := s3.client.PutObject(ctx, s3.bucket, stagedManifestObjectName(data),
512 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
513 return err
514}
515
516func domainFrozenObjectName(domain string) string {
517 return manifestObjectName(fmt.Sprintf("%s/.frozen", domain))
518}
519
520func (s3 *S3Backend) checkDomainFrozen(ctx context.Context, domain string) error {
521 _, err := s3.client.GetObject(ctx, s3.bucket, domainFrozenObjectName(domain),
522 minio.GetObjectOptions{})
523 if err == nil {
524 return ErrDomainFrozen
525 } else if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
526 return nil
527 } else {
528 return err
529 }
530}
531
532func (s3 *S3Backend) CommitManifest(ctx context.Context, name string, manifest *Manifest) error {
533 data := EncodeManifest(manifest)
534 logc.Printf(ctx, "s3: commit manifest %x -> %s", sha256.Sum256(data), name)
535
536 _, domain, _ := strings.Cut(name, "/")
537 if err := s3.checkDomainFrozen(ctx, domain); err != nil {
538 return err
539 }
540
541 // Remove staged object unconditionally (whether commit succeeded or failed), since
542 // the upper layer has to retry the complete operation anyway.
543 _, putErr := s3.client.PutObject(ctx, s3.bucket, manifestObjectName(name),
544 bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
545 removeErr := s3.client.RemoveObject(ctx, s3.bucket, stagedManifestObjectName(data),
546 minio.RemoveObjectOptions{})
547 s3.siteCache.Cache.Invalidate(name)
548 if putErr != nil {
549 return putErr
550 } else if removeErr != nil {
551 return removeErr
552 } else {
553 return nil
554 }
555}
556
557func (s3 *S3Backend) DeleteManifest(ctx context.Context, name string) error {
558 logc.Printf(ctx, "s3: delete manifest %s\n", name)
559
560 _, domain, _ := strings.Cut(name, "/")
561 if err := s3.checkDomainFrozen(ctx, domain); err != nil {
562 return err
563 }
564
565 err := s3.client.RemoveObject(ctx, s3.bucket, manifestObjectName(name),
566 minio.RemoveObjectOptions{})
567 s3.siteCache.Cache.Invalidate(name)
568 return err
569}
570
571func domainCheckObjectName(domain string) string {
572 return manifestObjectName(fmt.Sprintf("%s/.exists", domain))
573}
574
575func (s3 *S3Backend) CheckDomain(ctx context.Context, domain string) (exists bool, err error) {
576 logc.Printf(ctx, "s3: check domain %s\n", domain)
577
578 _, err = s3.client.StatObject(ctx, s3.bucket, domainCheckObjectName(domain),
579 minio.StatObjectOptions{})
580 if err != nil {
581 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
582 exists, err = false, nil
583 }
584 } else {
585 exists = true
586 }
587
588 if !exists && !s3.HasFeature(ctx, FeatureCheckDomainMarker) {
589 ctx, cancel := context.WithCancel(ctx)
590 defer cancel()
591
592 for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
593 Prefix: manifestObjectName(fmt.Sprintf("%s/", domain)),
594 }) {
595 if object.Err != nil {
596 return false, object.Err
597 }
598 return true, nil
599 }
600 return false, nil
601 }
602
603 return
604}
605
606func (s3 *S3Backend) CreateDomain(ctx context.Context, domain string) error {
607 logc.Printf(ctx, "s3: create domain %s\n", domain)
608
609 _, err := s3.client.PutObject(ctx, s3.bucket, domainCheckObjectName(domain),
610 &bytes.Reader{}, 0, minio.PutObjectOptions{})
611 return err
612}
613
614func (s3 *S3Backend) FreezeDomain(ctx context.Context, domain string, freeze bool) error {
615 if freeze {
616 logc.Printf(ctx, "s3: freeze domain %s\n", domain)
617
618 _, err := s3.client.PutObject(ctx, s3.bucket, domainFrozenObjectName(domain),
619 &bytes.Reader{}, 0, minio.PutObjectOptions{})
620 return err
621 } else {
622 logc.Printf(ctx, "s3: thaw domain %s\n", domain)
623
624 err := s3.client.RemoveObject(ctx, s3.bucket, domainFrozenObjectName(domain),
625 minio.RemoveObjectOptions{})
626 if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
627 return nil
628 } else {
629 return err
630 }
631 }
632}