Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions internal/cache/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package cache

import (
"net/http"
"strings"
)

// ServedByHeader is an internal response header set by the Tiered cache on a
// successful Open to report which backing tier produced the object (e.g.
// "disk" or "s3"). Its value is the BackendKind of the serving tier, and it
// can be read back with BackendFromHeaders. It lets serve handlers attribute
// latency to a specific tier. It is not forwarded to clients by the strategy
// handlers.
const ServedByHeader = "X-Cachew-Served-By"

// BackendKind maps a cache implementation to a short, low-cardinality label
// that is safe to use as a metric attribute value.
//
// The label is whatever precedes the first ":" in the cache's String() form
// (e.g. "disk", "s3", "memory", "remote", "tiered"). When String() contains
// no ":" the whole string is returned. A nil cache yields "unknown".
func BackendKind(c Cache) string {
	if c == nil {
		return "unknown"
	}
	desc := c.String()
	// Keep only the prefix before the first separator, if there is one.
	if sep := strings.IndexByte(desc, ':'); sep >= 0 {
		return desc[:sep]
	}
	return desc
}

// BackendFromHeaders extracts the serving tier recorded under ServedByHeader.
// It returns "" when the header is missing (e.g. a single-tier cache that
// does not annotate the tier) or when h itself is nil.
func BackendFromHeaders(h http.Header) string {
	// http.Header.Get already returns "" for a nil map, so no separate nil
	// guard is required here.
	return h.Get(ServedByHeader)
}
89 changes: 89 additions & 0 deletions internal/cache/backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package cache_test

import (
"io"
"log/slog"
"net/http"
"testing"
"time"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/cache"
"github.com/block/cachew/internal/logging"
)

// TestBackendKind checks the label BackendKind derives for disk, memory and
// tiered caches, as well as the "unknown" fallback for a nil cache.
func TestBackendKind(t *testing.T) {
	_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})

	diskCache, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour})
	assert.NoError(t, err)
	defer diskCache.Close()

	memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
	assert.NoError(t, err)
	defer memCache.Close()

	// Each case pairs a cache with the kind it is expected to report.
	cases := []struct {
		want    string
		backend cache.Cache
	}{
		{want: "disk", backend: diskCache},
		{want: "memory", backend: memCache},
		{want: "tiered", backend: cache.MaybeNewTiered(ctx, []cache.Cache{diskCache, memCache})},
		{want: "unknown", backend: nil},
	}
	for _, tc := range cases {
		assert.Equal(t, tc.want, cache.BackendKind(tc.backend))
	}
}

// TestBackendFromHeaders checks that a missing annotation reads back as ""
// and that a present ServedByHeader value is returned unchanged.
func TestBackendFromHeaders(t *testing.T) {
	// Both a nil header map and an empty one carry no serving-tier value.
	for _, unannotated := range []http.Header{nil, http.Header{}} {
		assert.Equal(t, "", cache.BackendFromHeaders(unannotated))
	}

	annotated := http.Header{}
	annotated.Set(cache.ServedByHeader, "s3")
	assert.Equal(t, "s3", cache.BackendFromHeaders(annotated))
}

// TestTieredOpenAnnotatesServingTier checks that Tiered.Open labels the
// headers it returns with the tier that served the object (via
// ServedByHeader), and that this label does not end up stored in the entry
// backfilled into the lower tier.
func TestTieredOpenAnnotatesServingTier(t *testing.T) {
	_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})

	diskTier, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour})
	assert.NoError(t, err)
	memTier, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
	assert.NoError(t, err)
	stack := cache.MaybeNewTiered(ctx, []cache.Cache{diskTier, memTier})
	defer stack.Close()

	var (
		objKey  = cache.NewKey("served-by-test")
		payload = []byte("hello tier")
	)

	// Populate the memory (upper) tier alone, so the first Open through the
	// tiered cache is served from it and backfills the disk (lower) tier.
	seed, err := memTier.Create(ctx, objKey, nil, time.Minute)
	assert.NoError(t, err)
	_, err = seed.Write(payload)
	assert.NoError(t, err)
	assert.NoError(t, seed.Close())

	// First read: expected to come from the memory tier.
	firstBody, firstHeaders, err := stack.Open(ctx, objKey)
	assert.NoError(t, err)
	got, err := io.ReadAll(firstBody)
	assert.NoError(t, err)
	assert.NoError(t, firstBody.Close())
	assert.Equal(t, payload, got)
	assert.Equal(t, "memory", cache.BackendFromHeaders(firstHeaders))

	// Reading the disk tier directly must show no serving-tier annotation:
	// the backfilled entry is stored without it.
	diskBody, diskHeaders, err := diskTier.Open(ctx, objKey)
	assert.NoError(t, err)
	assert.NoError(t, diskBody.Close())
	assert.Equal(t, "", cache.BackendFromHeaders(diskHeaders))

	// Second read through the tiered cache: the backfilled disk tier now
	// serves the object, and the annotation says so.
	secondBody, secondHeaders, err := stack.Open(ctx, objKey)
	assert.NoError(t, err)
	assert.NoError(t, secondBody.Close())
	assert.Equal(t, "disk", cache.BackendFromHeaders(secondHeaders))
}
7 changes: 7 additions & 0 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadClose
if i > 0 {
r = t.backfillReader(ctx, key, r, headers, t.caches[0])
}
// Annotate which tier served this Open so serve handlers can attribute
// latency to a specific backend. Set after backfillReader so the value
// is not persisted into the backfilled lower-tier entry.
if headers == nil {
headers = http.Header{}
}
headers.Set(ServedByHeader, BackendKind(t.caches[i]))
return r, headers, nil
}
return nil, nil, errors.Join(errs...)
Expand Down
123 changes: 86 additions & 37 deletions internal/strategy/git/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,56 @@ import (
"github.com/block/cachew/internal/metrics"
)

// backendNone labels serves whose bytes did not come from the object cache
// (e.g. on-demand generation), so the "backend" dimension is always populated.
// The record* helpers substitute it whenever they are handed an empty backend.
const backendNone = "none"

// gitMetrics groups the OpenTelemetry instruments recorded by the git
// strategy. Each field is declared exactly once; instances are built by
// newGitMetrics.
type gitMetrics struct {
	// Git operations and HTTP requests.
	operationDuration metric.Float64Histogram
	operationTotal    metric.Int64Counter
	requestTotal      metric.Int64Counter

	// Snapshot serving: event count, size, total duration, server-side
	// time-to-first-byte, and the duration of the cache Open that precedes
	// streaming.
	snapshotServeTotal       metric.Int64Counter
	snapshotServeSize        metric.Float64Histogram
	snapshotServeDuration    metric.Float64Histogram
	snapshotServeTTFB        metric.Float64Histogram
	snapshotCacheOpenLatency metric.Float64Histogram

	// Bundle serving.
	bundleServeTotal    metric.Int64Counter
	bundleServeSize     metric.Float64Histogram
	bundleServeDuration metric.Float64Histogram

	// EnsureRefs requests.
	ensureRefsTotal    metric.Int64Counter
	ensureRefsDuration metric.Float64Histogram

	// Snapshot spool writer and followers.
	spoolWriterDuration    metric.Float64Histogram
	spoolFollowerWaitTotal metric.Int64Counter
	spoolFollowerWait      metric.Float64Histogram

	// Repack pack-file counts and per-request snapshot throughput.
	repackPackCount        metric.Float64Histogram
	snapshotServeBandwidth metric.Float64Histogram

	// LFS-snapshot generation phases.
	lfsPhaseDuration metric.Float64Histogram
	lfsPhaseBytes    metric.Float64Histogram
}

// newGitMetrics creates every instrument in gitMetrics on the "cachew.git"
// meter and returns the populated struct. Each field is initialised exactly
// once; repeating a key in the composite literal would not compile.
func newGitMetrics() *gitMetrics {
	meter := otel.Meter("cachew.git")
	return &gitMetrics{
		operationDuration:        metrics.NewHistogram(meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)", metrics.LatencyBuckets()),
		operationTotal:           metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"),
		requestTotal:             metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"),
		snapshotServeTotal:       metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.snapshot_serves_total", "{serves}", "Snapshot serve events by source (cache, spool, cold_cache, generated) and repository"),
		snapshotServeSize:        metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bytes", "By", "Size of served snapshots in bytes", metrics.ByteBuckets()),
		snapshotServeDuration:    metrics.NewHistogram(meter, "cachew.git.snapshot_serve_duration_seconds", "s", "Wall-clock duration of snapshot serves, from handler entry to last byte sent", metrics.LatencyBuckets()),
		snapshotServeTTFB:        metrics.NewHistogram(meter, "cachew.git.snapshot_serve_ttfb_seconds", "s", "Server-side time-to-first-byte for snapshot serves, from handler entry to the first response byte, by source, backend and repository", metrics.LatencyBuckets()),
		snapshotCacheOpenLatency: metrics.NewHistogram(meter, "cachew.git.snapshot_cache_open_duration_seconds", "s", "Duration of the snapshot cache Open (lookup/metadata/reader creation) before streaming, by backend, status and repository", metrics.LatencyBuckets()),
		bundleServeTotal:         metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.bundle_serves_total", "{serves}", "Bundle serve events by source (cache, generated, miss) and repository"),
		bundleServeSize:          metrics.NewHistogram(meter, "cachew.git.bundle_serve_bytes", "By", "Size of served bundles in bytes", metrics.ByteBuckets()),
		bundleServeDuration:      metrics.NewHistogram(meter, "cachew.git.bundle_serve_duration_seconds", "s", "Wall-clock duration of bundle serves, including any on-demand generation", metrics.LatencyBuckets()),
		ensureRefsTotal:          metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.ensure_refs_total", "{requests}", "EnsureRefs requests by fetched and status"),
		ensureRefsDuration:       metrics.NewHistogram(meter, "cachew.git.ensure_refs_duration_seconds", "s", "Duration of EnsureRefs requests, including any upstream fetch", metrics.FastLatencyBuckets()),
		spoolWriterDuration:      metrics.NewHistogram(meter, "cachew.git.spool_writer_duration_seconds", "s", "Time the snapshot spool writer spent producing the stream", metrics.LatencyBuckets()),
		spoolFollowerWaitTotal:   metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.spool_follower_waits_total", "{waits}", "Snapshot spool follower events, by outcome (served, writer_failed)"),
		spoolFollowerWait:        metrics.NewHistogram(meter, "cachew.git.spool_follower_wait_seconds", "s", "Time a snapshot spool follower spent waiting for the writer to publish headers", metrics.FastLatencyBuckets()),
		repackPackCount:          metrics.NewHistogram(meter, "cachew.git.repack_pack_count", "{packs}", "Pack file count observed before and after repack, by stage (before, after)", metrics.SmallCountBuckets()),
		snapshotServeBandwidth:   metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bandwidth_mbps", "MiBy/s", "Per-request snapshot serve throughput in MiB/s, by source and repository", metrics.BandwidthMbpsBuckets()),
		lfsPhaseDuration:         metrics.NewHistogram(meter, "cachew.git.lfs_phase_duration_seconds", "s", "Duration of an LFS-snapshot generation phase (discover, clone, fetch, archive_upload), by status and repository", metrics.LatencyBuckets()),
		lfsPhaseBytes:            metrics.NewHistogram(meter, "cachew.git.lfs_phase_bytes", "By", "Bytes processed in an LFS-snapshot generation phase, by phase and repository (e.g. .git/lfs size after fetch)", metrics.ByteBuckets()),
	}
}

Expand All @@ -78,9 +86,17 @@ func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) {
// of relying on aggregate-over-time of bytes/duration sums.
//
// Source is one of: "cache", "cold_cache", "spool", "generated".
func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo string, sizeBytes int64, duration time.Duration) {
//
// Backend is the cache tier that produced the bytes ("disk", "s3", ...) for
// cache-backed serves, or "" / "none" when not applicable (e.g. on-demand
// generation that never read from the object cache).
func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, backend, repo string, sizeBytes int64, duration time.Duration) {
if backend == "" {
backend = backendNone
}
attrs := metric.WithAttributes(
attribute.String("source", source),
attribute.String("backend", backend),
attribute.String("repository", repo),
)
m.snapshotServeTotal.Add(ctx, 1, attrs)
Expand All @@ -99,6 +115,39 @@ func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo strin
}
}

// recordSnapshotTTFB records the server-side time-to-first-byte of a snapshot
// serve, i.e. the wall time between handler entry and the first response
// byte. For cache-backed serves that covers the cache Open plus the wait for
// the backend (e.g. an S3 range reader) to yield its first chunk, which is
// what a client observes as snapshot "lookup" latency before the download
// begins.
//
// Non-positive ttfb values are dropped. An empty backend is recorded as
// backendNone. The sample carries "source", "backend" and "repository"
// attributes.
func (m *gitMetrics) recordSnapshotTTFB(ctx context.Context, source, backend, repo string, ttfb time.Duration) {
	if ttfb <= 0 {
		return
	}
	tier := backend
	if tier == "" {
		tier = backendNone
	}
	attrs := metric.WithAttributes(
		attribute.String("source", source),
		attribute.String("backend", tier),
		attribute.String("repository", repo),
	)
	m.snapshotServeTTFB.Record(ctx, ttfb.Seconds(), attrs)
}

// recordSnapshotCacheOpen records how long the snapshot cache Open (lookup,
// metadata read and reader creation) took before streaming started.
//
// status is "hit", "miss" or "error"; backend is the serving tier on a hit.
// An empty backend is recorded as backendNone. The sample carries "backend",
// "repository" and "status" attributes.
func (m *gitMetrics) recordSnapshotCacheOpen(ctx context.Context, backend, repo, status string, duration time.Duration) {
	tier := backend
	if tier == "" {
		tier = backendNone
	}
	attrs := metric.WithAttributes(
		attribute.String("backend", tier),
		attribute.String("repository", repo),
		attribute.String("status", status),
	)
	m.snapshotCacheOpenLatency.Record(ctx, duration.Seconds(), attrs)
}

// recordBundleServe records a bundle serve event. Source is one of:
// "cache" (served from object cache), "generated" (created on demand from the
// local mirror), or "miss" (no bundle could be produced).
Expand Down
Loading