From 7c9decad1b0cca5a53ff318ad174267f3eaea2cb Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Tue, 23 Jun 2026 17:04:44 -0700 Subject: [PATCH] feat(git): instrument snapshot serve backend and server-side TTFB Add a backend dimension (disk/s3/...) and server-side time-to-first-byte to snapshot serves so snapshot-lookup latency can be attributed to a cache tier and split into cache-open vs first-chunk time. - Tiered.Open annotates the serving tier via an internal X-Cachew-Served-By header; serve handlers read it for the backend label. - New metrics: cachew.git.snapshot_serve_ttfb_seconds and cachew.git.snapshot_cache_open_duration_seconds. - serveReaderFast measures TTFB (sendfile for files, first-Read for stream readers) and snapshot serves/spans carry backend + ttfb attributes. Amp-Thread-ID: https://ampcode.com/threads/T-019ef6a9-a407-7389-bc43-001405e3ae9e Co-authored-by: Amp --- internal/cache/backend.go | 33 ++++++++ internal/cache/backend_test.go | 89 +++++++++++++++++++++ internal/cache/tiered.go | 7 ++ internal/strategy/git/metrics.go | 123 +++++++++++++++++++++--------- internal/strategy/git/snapshot.go | 97 +++++++++++++++++------ 5 files changed, 288 insertions(+), 61 deletions(-) create mode 100644 internal/cache/backend.go create mode 100644 internal/cache/backend_test.go diff --git a/internal/cache/backend.go b/internal/cache/backend.go new file mode 100644 index 00000000..f2304c04 --- /dev/null +++ b/internal/cache/backend.go @@ -0,0 +1,33 @@ +package cache + +import ( + "net/http" + "strings" +) + +// ServedByHeader is an internal response header set by the Tiered cache on a +// successful Open to report which backing tier produced the object (e.g. +// "disk" or "s3"). It lets serve handlers attribute latency to a specific +// tier. It is not forwarded to clients by the strategy handlers. +const ServedByHeader = "X-Cachew-Served-By" + +// BackendKind returns a coarse, low-cardinality identifier for a cache +// implementation, derived from the text before the first ":" in its String() +// form (e.g. "disk", "s3", "memory", "remote", "tiered"). It is suitable as a +// metric label value. +func BackendKind(c Cache) string { + if c == nil { + return "unknown" + } + kind, _, _ := strings.Cut(c.String(), ":") + return kind +} + +// BackendFromHeaders returns the serving tier reported via ServedByHeader, or +// "" when absent (e.g. a single-tier cache that does not annotate the tier). +func BackendFromHeaders(h http.Header) string { + if h == nil { + return "" + } + return h.Get(ServedByHeader) +} diff --git a/internal/cache/backend_test.go b/internal/cache/backend_test.go new file mode 100644 index 00000000..d34da81c --- /dev/null +++ b/internal/cache/backend_test.go @@ -0,0 +1,89 @@ +package cache_test + +import ( + "io" + "log/slog" + "net/http" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/block/cachew/internal/cache" + "github.com/block/cachew/internal/logging" +) + +func TestBackendKind(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + + disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + defer disk.Close() + assert.Equal(t, "disk", cache.BackendKind(disk)) + + mem, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + defer mem.Close() + assert.Equal(t, "memory", cache.BackendKind(mem)) + + tiered := cache.MaybeNewTiered(ctx, []cache.Cache{disk, mem}) + assert.Equal(t, "tiered", cache.BackendKind(tiered)) + + assert.Equal(t, "unknown", cache.BackendKind(nil)) +} + +func TestBackendFromHeaders(t *testing.T) { + assert.Equal(t, "", cache.BackendFromHeaders(nil)) + assert.Equal(t, "", cache.BackendFromHeaders(http.Header{})) + + h := http.Header{} + h.Set(cache.ServedByHeader, "s3") + assert.Equal(t, "s3", cache.BackendFromHeaders(h)) +} + +// TestTieredOpenAnnotatesServingTier verifies that Tiered.Open reports which +// tier produced the object via ServedByHeader, and that the annotation is not +// persisted into the backfilled lower-tier entry. +func TestTieredOpenAnnotatesServingTier(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + + lower, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}) + defer tiered.Close() + + key := cache.NewKey("served-by-test") + content := []byte("hello tier") + + // Seed only the upper tier so the first tiered Open hits upper and + // backfills lower. + w, err := upper.Create(ctx, key, nil, time.Minute) + assert.NoError(t, err) + _, err = w.Write(content) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + r, headers, err := tiered.Open(ctx, key) + assert.NoError(t, err) + data, err := io.ReadAll(r) + assert.NoError(t, err) + assert.NoError(t, r.Close()) + assert.Equal(t, content, data) + assert.Equal(t, "memory", cache.BackendFromHeaders(headers)) + + // The backfilled lower-tier entry must not carry the serving-tier + // annotation; a subsequent direct read of the lower tier has no such header. + lr, lowerHeaders, err := lower.Open(ctx, key) + assert.NoError(t, err) + assert.NoError(t, lr.Close()) + assert.Equal(t, "", cache.BackendFromHeaders(lowerHeaders)) + + // A direct read through the tiered cache now hits the (backfilled) lower + // tier and is annotated accordingly. + r2, headers2, err := tiered.Open(ctx, key) + assert.NoError(t, err) + assert.NoError(t, r2.Close()) + assert.Equal(t, "disk", cache.BackendFromHeaders(headers2)) +} diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 0880b4c1..2d9c3399 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -129,6 +129,13 @@ func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadClose if i > 0 { r = t.backfillReader(ctx, key, r, headers, t.caches[0]) } + // Annotate which tier served this Open so serve handlers can attribute + // latency to a specific backend. Set after backfillReader so the value + // is not persisted into the backfilled lower-tier entry. + if headers == nil { + headers = http.Header{} + } + headers.Set(ServedByHeader, BackendKind(t.caches[i])) return r, headers, nil } return nil, nil, errors.Join(errs...) diff --git a/internal/strategy/git/metrics.go b/internal/strategy/git/metrics.go index b95d806d..4e956e74 100644 --- a/internal/strategy/git/metrics.go +++ b/internal/strategy/git/metrics.go @@ -12,48 +12,56 @@ import ( "github.com/block/cachew/internal/metrics" ) +// backendNone labels serves whose bytes did not come from the object cache +// (e.g. on-demand generation), so the "backend" dimension is always populated. +const backendNone = "none" + type gitMetrics struct { - operationDuration metric.Float64Histogram - operationTotal metric.Int64Counter - requestTotal metric.Int64Counter - snapshotServeTotal metric.Int64Counter - snapshotServeSize metric.Float64Histogram - snapshotServeDuration metric.Float64Histogram - bundleServeTotal metric.Int64Counter - bundleServeSize metric.Float64Histogram - bundleServeDuration metric.Float64Histogram - ensureRefsTotal metric.Int64Counter - ensureRefsDuration metric.Float64Histogram - spoolWriterDuration metric.Float64Histogram - spoolFollowerWaitTotal metric.Int64Counter - spoolFollowerWait metric.Float64Histogram - repackPackCount metric.Float64Histogram - snapshotServeBandwidth metric.Float64Histogram - lfsPhaseDuration metric.Float64Histogram - lfsPhaseBytes metric.Float64Histogram + operationDuration metric.Float64Histogram + operationTotal metric.Int64Counter + requestTotal metric.Int64Counter + snapshotServeTotal metric.Int64Counter + snapshotServeSize metric.Float64Histogram + snapshotServeDuration metric.Float64Histogram + snapshotServeTTFB metric.Float64Histogram + snapshotCacheOpenLatency metric.Float64Histogram + bundleServeTotal metric.Int64Counter + bundleServeSize metric.Float64Histogram + bundleServeDuration metric.Float64Histogram + ensureRefsTotal metric.Int64Counter + ensureRefsDuration metric.Float64Histogram + spoolWriterDuration metric.Float64Histogram + spoolFollowerWaitTotal metric.Int64Counter + spoolFollowerWait metric.Float64Histogram + repackPackCount metric.Float64Histogram + snapshotServeBandwidth metric.Float64Histogram + lfsPhaseDuration metric.Float64Histogram + lfsPhaseBytes metric.Float64Histogram } func newGitMetrics() *gitMetrics { meter := otel.Meter("cachew.git") return &gitMetrics{ - operationDuration: metrics.NewHistogram(meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)", metrics.LatencyBuckets()), - operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"), - requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"), - snapshotServeTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.snapshot_serves_total", "{serves}", "Snapshot serve events by source (cache, spool, cold_cache, generated) and repository"), - snapshotServeSize: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bytes", "By", "Size of served snapshots in bytes", metrics.ByteBuckets()), - snapshotServeDuration: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_duration_seconds", "s", "Wall-clock duration of snapshot serves, from handler entry to last byte sent", metrics.LatencyBuckets()), - bundleServeTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.bundle_serves_total", "{serves}", "Bundle serve events by source (cache, generated, miss) and repository"), - bundleServeSize: metrics.NewHistogram(meter, "cachew.git.bundle_serve_bytes", "By", "Size of served bundles in bytes", metrics.ByteBuckets()), - bundleServeDuration: metrics.NewHistogram(meter, "cachew.git.bundle_serve_duration_seconds", "s", "Wall-clock duration of bundle serves, including any on-demand generation", metrics.LatencyBuckets()), - ensureRefsTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.ensure_refs_total", "{requests}", "EnsureRefs requests by fetched and status"), - ensureRefsDuration: metrics.NewHistogram(meter, "cachew.git.ensure_refs_duration_seconds", "s", "Duration of EnsureRefs requests, including any upstream fetch", metrics.FastLatencyBuckets()), - spoolWriterDuration: metrics.NewHistogram(meter, "cachew.git.spool_writer_duration_seconds", "s", "Time the snapshot spool writer spent producing the stream", metrics.LatencyBuckets()), - spoolFollowerWaitTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.spool_follower_waits_total", "{waits}", "Snapshot spool follower events, by outcome (served, writer_failed)"), - spoolFollowerWait: metrics.NewHistogram(meter, "cachew.git.spool_follower_wait_seconds", "s", "Time a snapshot spool follower spent waiting for the writer to publish headers", metrics.FastLatencyBuckets()), - repackPackCount: metrics.NewHistogram(meter, "cachew.git.repack_pack_count", "{packs}", "Pack file count observed before and after repack, by stage (before, after)", metrics.SmallCountBuckets()), - snapshotServeBandwidth: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bandwidth_mbps", "MiBy/s", "Per-request snapshot serve throughput in MiB/s, by source and repository", metrics.BandwidthMbpsBuckets()), - lfsPhaseDuration: metrics.NewHistogram(meter, "cachew.git.lfs_phase_duration_seconds", "s", "Duration of an LFS-snapshot generation phase (discover, clone, fetch, archive_upload), by status and repository", metrics.LatencyBuckets()), - lfsPhaseBytes: metrics.NewHistogram(meter, "cachew.git.lfs_phase_bytes", "By", "Bytes processed in an LFS-snapshot generation phase, by phase and repository (e.g. .git/lfs size after fetch)", metrics.ByteBuckets()), + operationDuration: metrics.NewHistogram(meter, "cachew.git.operation_duration_seconds", "s", "Duration of git operations (clone, fetch, repack, snapshot)", metrics.LatencyBuckets()), + operationTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.operations_total", "{operations}", "Total number of git operations"), + requestTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.requests_total", "{requests}", "Total number of git HTTP requests by type"), + snapshotServeTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.snapshot_serves_total", "{serves}", "Snapshot serve events by source (cache, spool, cold_cache, generated) and repository"), + snapshotServeSize: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bytes", "By", "Size of served snapshots in bytes", metrics.ByteBuckets()), + snapshotServeDuration: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_duration_seconds", "s", "Wall-clock duration of snapshot serves, from handler entry to last byte sent", metrics.LatencyBuckets()), + snapshotServeTTFB: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_ttfb_seconds", "s", "Server-side time-to-first-byte for snapshot serves, from handler entry to the first response byte, by source, backend and repository", metrics.LatencyBuckets()), + snapshotCacheOpenLatency: metrics.NewHistogram(meter, "cachew.git.snapshot_cache_open_duration_seconds", "s", "Duration of the snapshot cache Open (lookup/metadata/reader creation) before streaming, by backend, status and repository", metrics.LatencyBuckets()), + bundleServeTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.bundle_serves_total", "{serves}", "Bundle serve events by source (cache, generated, miss) and repository"), + bundleServeSize: metrics.NewHistogram(meter, "cachew.git.bundle_serve_bytes", "By", "Size of served bundles in bytes", metrics.ByteBuckets()), + bundleServeDuration: metrics.NewHistogram(meter, "cachew.git.bundle_serve_duration_seconds", "s", "Wall-clock duration of bundle serves, including any on-demand generation", metrics.LatencyBuckets()), + ensureRefsTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.ensure_refs_total", "{requests}", "EnsureRefs requests by fetched and status"), + ensureRefsDuration: metrics.NewHistogram(meter, "cachew.git.ensure_refs_duration_seconds", "s", "Duration of EnsureRefs requests, including any upstream fetch", metrics.FastLatencyBuckets()), + spoolWriterDuration: metrics.NewHistogram(meter, "cachew.git.spool_writer_duration_seconds", "s", "Time the snapshot spool writer spent producing the stream", metrics.LatencyBuckets()), + spoolFollowerWaitTotal: metrics.NewMetric[metric.Int64Counter](meter, "cachew.git.spool_follower_waits_total", "{waits}", "Snapshot spool follower events, by outcome (served, writer_failed)"), + spoolFollowerWait: metrics.NewHistogram(meter, "cachew.git.spool_follower_wait_seconds", "s", "Time a snapshot spool follower spent waiting for the writer to publish headers", metrics.FastLatencyBuckets()), + repackPackCount: metrics.NewHistogram(meter, "cachew.git.repack_pack_count", "{packs}", "Pack file count observed before and after repack, by stage (before, after)", metrics.SmallCountBuckets()), + snapshotServeBandwidth: metrics.NewHistogram(meter, "cachew.git.snapshot_serve_bandwidth_mbps", "MiBy/s", "Per-request snapshot serve throughput in MiB/s, by source and repository", metrics.BandwidthMbpsBuckets()), + lfsPhaseDuration: metrics.NewHistogram(meter, "cachew.git.lfs_phase_duration_seconds", "s", "Duration of an LFS-snapshot generation phase (discover, clone, fetch, archive_upload), by status and repository", metrics.LatencyBuckets()), + lfsPhaseBytes: metrics.NewHistogram(meter, "cachew.git.lfs_phase_bytes", "By", "Bytes processed in an LFS-snapshot generation phase, by phase and repository (e.g. .git/lfs size after fetch)", metrics.ByteBuckets()), } } @@ -78,9 +86,17 @@ func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) { // of relying on aggregate-over-time of bytes/duration sums. // // Source is one of: "cache", "cold_cache", "spool", "generated". -func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo string, sizeBytes int64, duration time.Duration) { +// +// Backend is the cache tier that produced the bytes ("disk", "s3", ...) for +// cache-backed serves, or "" / "none" when not applicable (e.g. on-demand +// generation that never read from the object cache). +func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, backend, repo string, sizeBytes int64, duration time.Duration) { + if backend == "" { + backend = backendNone + } attrs := metric.WithAttributes( attribute.String("source", source), + attribute.String("backend", backend), attribute.String("repository", repo), ) m.snapshotServeTotal.Add(ctx, 1, attrs) @@ -99,6 +115,39 @@ func (m *gitMetrics) recordSnapshotServe(ctx context.Context, source, repo strin } } +// recordSnapshotTTFB records server-side time-to-first-byte for a snapshot +// serve: the wall time from handler entry until the first response byte. For +// cache-backed serves this includes cache Open plus the latency until the +// backend (e.g. an S3 range reader) yields its first chunk, which is what a +// client observes as its snapshot "lookup" latency before the download begins. +func (m *gitMetrics) recordSnapshotTTFB(ctx context.Context, source, backend, repo string, ttfb time.Duration) { + if ttfb <= 0 { + return + } + if backend == "" { + backend = backendNone + } + m.snapshotServeTTFB.Record(ctx, ttfb.Seconds(), metric.WithAttributes( + attribute.String("source", source), + attribute.String("backend", backend), + attribute.String("repository", repo), + )) +} + +// recordSnapshotCacheOpen records the duration of the snapshot cache Open +// (lookup, metadata read and reader creation) that precedes streaming. Status +// is "hit", "miss" or "error". Backend is the serving tier on a hit. +func (m *gitMetrics) recordSnapshotCacheOpen(ctx context.Context, backend, repo, status string, duration time.Duration) { + if backend == "" { + backend = backendNone + } + m.snapshotCacheOpenLatency.Record(ctx, duration.Seconds(), metric.WithAttributes( + attribute.String("backend", backend), + attribute.String("repository", repo), + attribute.String("status", status), + )) +} + // recordBundleServe records a bundle serve event. Source is one of: // "cache" (served from object cache), "generated" (created on demand from the // local mirror), or "miss" (no bundle could be produced). diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 7cd5c689..c2f655c7 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -265,8 +265,9 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, if existing, loaded := s.coldSnapshotMu.LoadOrStore(upstreamURL, entry); loaded { winner := existing.(*coldSnapshotEntry) <-winner.done - reader, _, openErr := s.cache.Open(ctx, cacheKey) + reader, openHeaders, openErr := s.cache.Open(ctx, cacheKey) if openErr == nil && reader != nil { + backend := cache.BackendFromHeaders(openHeaders) winner.serving.Add(1) defer func() { _ = reader.Close() @@ -274,9 +275,10 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, }() logger.InfoContext(ctx, "Serving locally cached snapshot after waiting for in-flight fill", "upstream", upstreamURL) w.Header().Set("Content-Type", "application/zstd") - n, err := serveReaderFast(w, r, reader) - s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start)) - span.SetAttributes(attribute.String("cachew.source", "cold_cache"), attribute.Int64("cachew.bytes", n)) + n, ttfb, err := serveReaderFast(w, r, reader, start) + s.metrics.recordSnapshotTTFB(ctx, "cold_cache", backend, repoName, ttfb) + s.metrics.recordSnapshotServe(ctx, "cold_cache", backend, repoName, n, time.Since(start)) + span.SetAttributes(attribute.String("cachew.source", "cold_cache"), attribute.String("cachew.backend", backend), attribute.Int64("cachew.bytes", n), attribute.Float64("cachew.ttfb_seconds", ttfb.Seconds())) if err != nil { logger.WarnContext(ctx, "Failed to stream locally cached snapshot", "upstream", upstreamURL, "error", err) span.RecordError(err) @@ -289,13 +291,15 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, close(entry.done) s.coldSnapshotMu.Delete(upstreamURL) }() - reader, _, openErr := s.cache.Open(ctx, cacheKey) + reader, openHeaders, openErr := s.cache.Open(ctx, cacheKey) if openErr == nil && reader != nil { + backend := cache.BackendFromHeaders(openHeaders) logger.InfoContext(ctx, "Serving cached snapshot while mirror warms up", "upstream", upstreamURL) w.Header().Set("Content-Type", "application/zstd") - n, err := serveReaderFast(w, r, reader) - s.metrics.recordSnapshotServe(ctx, "cold_cache", repoName, n, time.Since(start)) - span.SetAttributes(attribute.String("cachew.source", "cold_cache"), attribute.Int64("cachew.bytes", n)) + n, ttfb, err := serveReaderFast(w, r, reader, start) + s.metrics.recordSnapshotTTFB(ctx, "cold_cache", backend, repoName, ttfb) + s.metrics.recordSnapshotServe(ctx, "cold_cache", backend, repoName, n, time.Since(start)) + span.SetAttributes(attribute.String("cachew.source", "cold_cache"), attribute.String("cachew.backend", backend), attribute.Int64("cachew.bytes", n), attribute.Float64("cachew.ttfb_seconds", ttfb.Seconds())) if err != nil { logger.WarnContext(ctx, "Failed to stream cached snapshot", "upstream", upstreamURL, "error", err) span.RecordError(err) @@ -320,11 +324,19 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } s.maybeBackgroundFetch(repo) + openStart := time.Now() reader, headers, err := s.cache.Open(ctx, cacheKey) - if err != nil && !errors.Is(err, os.ErrNotExist) { + openElapsed := time.Since(openStart) + switch { + case err != nil && !errors.Is(err, os.ErrNotExist): + s.metrics.recordSnapshotCacheOpen(ctx, cache.BackendFromHeaders(headers), repoName, "error", openElapsed) logger.ErrorContext(ctx, "Failed to open snapshot from cache", "upstream", upstreamURL, "error", err) http.Error(w, "Internal server error", http.StatusInternalServerError) return + case reader == nil: + s.metrics.recordSnapshotCacheOpen(ctx, "", repoName, "miss", openElapsed) + default: + s.metrics.recordSnapshotCacheOpen(ctx, cache.BackendFromHeaders(headers), repoName, "hit", openElapsed) } if reader == nil { @@ -372,7 +384,7 @@ func (s *Strategy) serveSnapshotHead(ctx context.Context, w http.ResponseWriter, } w.WriteHeader(status) - s.metrics.recordSnapshotServe(ctx, "head", repoName, 0, time.Since(start)) + s.metrics.recordSnapshotServe(ctx, "head", backendNone, repoName, 0, time.Since(start)) if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { span.SetAttributes(attribute.String("cachew.source", "head"), attribute.Int64("cachew.bytes", 0)) } @@ -384,7 +396,7 @@ func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWrit w.Header().Add(key, value) } } - if _, err := serveReaderFast(w, r, reader); err != nil { + if _, _, err := serveReaderFast(w, r, reader, time.Now()); err != nil { return errors.Wrap(err, "streaming artifact") } return nil @@ -393,20 +405,46 @@ func (s *Strategy) streamSnapshotArtifact(_ context.Context, w http.ResponseWrit // serveReaderFast serves the content from reader using the most efficient method // available. When reader is an *os.File, it uses http.ServeContent which enables // sendfile(2) zero-copy I/O and automatic Content-Length/Range support. For other -// reader types it falls back to io.Copy. Returns bytes served for metrics. -func serveReaderFast(w http.ResponseWriter, r *http.Request, reader io.Reader) (int64, error) { +// reader types it falls back to io.Copy. Returns bytes served and the +// server-side time-to-first-byte measured relative to start (handler entry). +func serveReaderFast(w http.ResponseWriter, r *http.Request, reader io.Reader, start time.Time) (n int64, ttfb time.Duration, err error) { if f, ok := reader.(*os.File); ok { - info, err := f.Stat() - if err != nil { - return 0, errors.Wrap(err, "stat file for serving") + info, statErr := f.Stat() + if statErr != nil { + return 0, 0, errors.Wrap(statErr, "stat file for serving") } + // A local file is served via sendfile(2) with an effectively immediate + // first byte, so TTFB is just the pre-stream work already elapsed. + ttfb = time.Since(start) // http.ServeContent handles Content-Length, Range requests, and uses // sendfile(2) for zero-copy transfer from file to socket. http.ServeContent(w, r, "", time.Time{}, f) - return info.Size(), nil + return info.Size(), ttfb, nil + } + fr := &firstByteReader{r: reader, start: start} + n, err = io.Copy(w, fr) + return n, fr.ttfb, errors.Wrap(err, "copy to response") +} + +// firstByteReader records the wall time, relative to start, at which the +// wrapped reader returns its first non-empty Read. For a non-file cache reader +// (e.g. an S3 range reader whose first Read blocks until the initial chunk is +// downloaded) this is when bytes first become available to write to the client, +// approximating server-side time-to-first-byte. +type firstByteReader struct { + r io.Reader + start time.Time + ttfb time.Duration + marked bool +} + +func (f *firstByteReader) Read(p []byte) (int, error) { + n, err := f.r.Read(p) + if !f.marked && n > 0 { + f.marked = true + f.ttfb = time.Since(f.start) } - n, err := io.Copy(w, reader) - return n, errors.Wrap(err, "copy to response") + return n, err //nolint:wrapcheck // must return unwrapped io.EOF per io.Reader contract } func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) { //nolint:funlen @@ -517,8 +555,11 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h } func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string, start time.Time) error { + backend := cache.BackendFromHeaders(headers) snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit") + mirrorHeadStart := time.Now() mirrorHead := s.getMirrorHead(ctx, repo) + mirrorHeadElapsed := time.Since(mirrorHeadStart) // Forward the snapshot commit to the client so it knows whether the // snapshot is fresh (no bundle URL = already at HEAD, skip freshen). @@ -556,15 +597,23 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW // (S3, memory, remote) fall through to io.Copy, so revalidate explicitly to // avoid streaming the full snapshot when the client already has it. var n int64 + var ttfb time.Duration var err error if status := httputil.CheckConditionals(r, headers.Get(cache.ETagKey)); status != 0 { w.WriteHeader(status) } else { - n, err = serveReaderFast(w, r, reader) + n, ttfb, err = serveReaderFast(w, r, reader, start) + s.metrics.recordSnapshotTTFB(ctx, "cache", backend, repoName, ttfb) } - s.metrics.recordSnapshotServe(ctx, "cache", repoName, n, time.Since(start)) + s.metrics.recordSnapshotServe(ctx, "cache", backend, repoName, n, time.Since(start)) if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { - span.SetAttributes(attribute.String("cachew.source", "cache"), attribute.Int64("cachew.bytes", n)) + span.SetAttributes( + attribute.String("cachew.source", "cache"), + attribute.String("cachew.backend", backend), + attribute.Int64("cachew.bytes", n), + attribute.Float64("cachew.ttfb_seconds", ttfb.Seconds()), + attribute.Float64("cachew.mirror_head_seconds", mirrorHeadElapsed.Seconds()), + ) } return errors.Wrap(err, "stream snapshot") } @@ -684,7 +733,7 @@ func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request return errors.Wrap(err, "snapshot spool read") } s.metrics.recordSpoolFollowerWait(ctx, repoName, "served", wait) - s.metrics.recordSnapshotServe(ctx, "spool", repoName, spool.Written(), time.Since(start)) + s.metrics.recordSnapshotServe(ctx, "spool", backendNone, repoName, spool.Written(), time.Since(start)) if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { span.SetAttributes(attribute.String("cachew.source", "spool"), attribute.Int64("cachew.bytes", spool.Written()), attribute.Float64("cachew.spool_wait_seconds", wait.Seconds())) @@ -698,7 +747,7 @@ func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request err := s.writeSnapshotSpool(w, r, repo, upstreamURL, repoName, entry) if err == nil { - s.metrics.recordSnapshotServe(ctx, "generated", repoName, entry.spool.Written(), time.Since(start)) + s.metrics.recordSnapshotServe(ctx, "generated", backendNone, repoName, entry.spool.Written(), time.Since(start)) if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { span.SetAttributes(attribute.String("cachew.source", "generated"), attribute.Int64("cachew.bytes", entry.spool.Written())) }