From d943295aabc2ab8c150c604b9208e40549240156 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timoth=C3=A9e=20Peignier?=
Date: Wed, 15 Apr 2026 15:02:39 -0700
Subject: [PATCH] Add quantiles to summaries
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Timothée Peignier
---
 lib/prometheus/client/formats/text.rb         |   5 +
 lib/prometheus/client/registry.rb             |   8 +-
 lib/prometheus/client/summary.rb              |  94 ++++++-
 lib/prometheus/client/summary/quantile.rb     | 239 ++++++++++++++++++
 spec/prometheus/client/formats/text_spec.rb   |  13 +
 .../client/summary/quantile_spec.rb           | 131 ++++++++++
 spec/prometheus/client/summary_spec.rb        |  59 +++++
 7 files changed, 544 insertions(+), 5 deletions(-)
 create mode 100644 lib/prometheus/client/summary/quantile.rb
 create mode 100644 spec/prometheus/client/summary/quantile_spec.rb

diff --git a/lib/prometheus/client/formats/text.rb b/lib/prometheus/client/formats/text.rb
index b735389c..fe863126 100644
--- a/lib/prometheus/client/formats/text.rb
+++ b/lib/prometheus/client/formats/text.rb
@@ -50,6 +50,11 @@ def representation(metric, label_set, value, &block)
           end
 
           def summary(name, set, value)
+            value.each do |k, v|
+              next if k == "sum" || k == "count"
+              yield metric(name, labels(set.merge(quantile: k)), v)
+            end
+
             l = labels(set)
             yield metric("#{name}_sum", l, value["sum"])
             yield metric("#{name}_count", l, value["count"])
diff --git a/lib/prometheus/client/registry.rb b/lib/prometheus/client/registry.rb
index 0b2f6e9a..b5ba4e38 100644
--- a/lib/prometheus/client/registry.rb
+++ b/lib/prometheus/client/registry.rb
@@ -45,11 +45,17 @@ def counter(name, docstring:, labels: [], preset_labels: {}, store_settings: {})
                              store_settings: store_settings))
       end
 
-      def summary(name, docstring:, labels: [], preset_labels: {}, store_settings: {})
+      def summary(name, docstring:, labels: [], preset_labels: {},
+                  objectives: {}, max_age: Summary::DEFAULT_MAX_AGE,
+                  age_buckets: Summary::DEFAULT_AGE_BUCKETS,
+                  store_settings: {})
         register(Summary.new(name,
                              docstring: docstring,
                              labels: labels,
                              preset_labels: preset_labels,
+                             objectives: objectives,
+                             max_age: max_age,
+                             age_buckets: age_buckets,
                              store_settings: store_settings))
       end
 
diff --git a/lib/prometheus/client/summary.rb b/lib/prometheus/client/summary.rb
index dff2f360..cfe3f68b 100644
--- a/lib/prometheus/client/summary.rb
+++ b/lib/prometheus/client/summary.rb
@@ -1,12 +1,56 @@
 # encoding: UTF-8
 
 require 'prometheus/client/metric'
+require 'prometheus/client/summary/quantile'
 
 module Prometheus
   module Client
     # Summary is an accumulator for samples. It captures Numeric data and
-    # provides the total count and sum of observations.
+    # provides the total count, sum of observations, and configurable
+    # quantile estimates.
     class Summary < Metric
+      attr_reader :objectives
+
+      DEFAULT_MAX_AGE = 600
+      DEFAULT_AGE_BUCKETS = 5
+
+      def initialize(name,
+                     docstring:,
+                     labels: [],
+                     preset_labels: {},
+                     objectives: {},
+                     max_age: DEFAULT_MAX_AGE,
+                     age_buckets: DEFAULT_AGE_BUCKETS,
+                     store_settings: {})
+        @objectives = objectives
+        @max_age = max_age
+        @age_buckets = age_buckets
+        @estimators = {}
+        @estimator_lock = Monitor.new
+
+        super(name,
+              docstring: docstring,
+              labels: labels,
+              preset_labels: preset_labels,
+              store_settings: store_settings)
+      end
+
+      def with_labels(labels)
+        new_metric = self.class.new(name,
+                                    docstring: docstring,
+                                    labels: @labels,
+                                    preset_labels: preset_labels.merge(labels),
+                                    objectives: @objectives,
+                                    max_age: @max_age,
+                                    age_buckets: @age_buckets,
+                                    store_settings: @store_settings)
+
+        new_metric.replace_internal_store(@store)
+        new_metric.replace_estimators(@estimators, @estimator_lock)
+
+        new_metric
+      end
+
       def type
         :summary
       end
@@ -24,30 +68,55 @@ def observe(value, labels: {})
           @store.increment(labels: base_label_set.merge(quantile: "count"), by: 1)
           @store.increment(labels: base_label_set.merge(quantile: "sum"), by: value)
         end
+
+        unless @objectives.empty?
+          estimator = estimator_for(base_label_set)
+          estimator.observe(value)
+        end
       end
 
-      # Returns a hash with "sum" and "count" as keys
+      # Returns a hash with "sum", "count", and quantile keys
       def get(labels: {})
         base_label_set = label_set_for(labels)
 
         internal_counters = ["count", "sum"]
 
-        @store.synchronize do
+        result = @store.synchronize do
           internal_counters.each_with_object({}) do |counter, acc|
             acc[counter] = @store.get(labels: base_label_set.merge(quantile: counter))
           end
         end
+
+        unless @objectives.empty?
+          estimator = estimator_for(base_label_set)
+          @objectives.each_key do |quantile|
+            result[quantile.to_s] = estimator.query(quantile)
+          end
+        end
+
+        result
       end
 
       # Returns all label sets with their values expressed as hashes with their sum/count
       def values
         values = @store.all_values
 
-        values.each_with_object({}) do |(label_set, v), acc|
+        result = values.each_with_object({}) do |(label_set, v), acc|
           actual_label_set = label_set.reject{|l| l == :quantile }
           acc[actual_label_set] ||= { "count" => 0.0, "sum" => 0.0 }
           acc[actual_label_set][label_set[:quantile]] = v
         end
+
+        unless @objectives.empty?
+          result.each do |label_set, hash|
+            estimator = estimator_for(label_set)
+            @objectives.each_key do |quantile|
+              hash[quantile.to_s] = estimator.query(quantile)
+            end
+          end
+        end
+
+        result
       end
 
       def init_label_set(labels)
@@ -59,11 +128,28 @@ def init_label_set(labels)
         end
       end
 
+      protected
+
+      def replace_estimators(estimators, lock)
+        @estimators = estimators
+        @estimator_lock = lock
+      end
+
       private
 
       def reserved_labels
         [:quantile]
       end
+
+      def estimator_for(label_set)
+        @estimator_lock.synchronize do
+          @estimators[label_set] ||= ::Prometheus::Client::Quantile::TimeWindowEstimator.new(
+            objectives: @objectives,
+            max_age: @max_age,
+            age_buckets: @age_buckets,
+          )
+        end
+      end
     end
   end
 end
diff --git a/lib/prometheus/client/summary/quantile.rb b/lib/prometheus/client/summary/quantile.rb
new file mode 100644
index 00000000..1cdff51e
--- /dev/null
+++ b/lib/prometheus/client/summary/quantile.rb
@@ -0,0 +1,239 @@
+# encoding: UTF-8
+
+module Prometheus
+  module Client
+    # Streaming quantile estimation used by Summary metrics.
+    module Quantile
+      # One entry of an Estimator's compressed sample list: +value+ is an
+      # observed value, +width+ the number of observations it stands for and
+      # +delta+ the uncertainty attached to its rank.
+      Sample = Struct.new(:value, :width, :delta)
+
+      # Number of raw observations an Estimator buffers before merging them
+      # into its sample list.
+      BUFFER_SIZE = 500
+
+      # Estimator keeps a compressed, value-ordered list of samples from which
+      # quantiles are estimated for a fixed set of objectives.
+      #
+      # +objectives+ maps each target quantile (e.g. 0.9) to its tolerated
+      # error (e.g. 0.01) and is expected to be non-empty.
+      #
+      # Instances do no locking of their own.
+      class Estimator
+        # Number of values observed since creation or the last #reset.
+        attr_reader :observations
+
+        def initialize(objectives)
+          @objectives = objectives
+          @buffer = []
+          @samples = []
+          @observations = 0
+        end
+
+        # Records +value+. Values are buffered and merged into the sample list
+        # once BUFFER_SIZE of them have accumulated.
+        def observe(value)
+          @buffer << value
+          @observations += 1
+          flush if @buffer.size >= BUFFER_SIZE
+        end
+
+        # Returns the estimated value at +quantile+ (between 0 and 1), or
+        # Float::NAN when nothing has been observed.
+        def query(quantile)
+          flush
+          return Float::NAN if @samples.empty?
+
+          desired = quantile * @observations
+          # Highest rank still acceptable for the answer; same for every sample.
+          upper = desired + (allowable_error(desired) / 2.0)
+          cumulative = 0
+
+          @samples.each_cons(2) do |sample, following|
+            cumulative += sample.width
+            return sample.value if cumulative + following.delta > upper
+          end
+
+          @samples.last.value
+        end
+
+        # Merges the buffered values into the sample list, then compresses it.
+        def flush
+          return if @buffer.empty?
+
+          @buffer.sort!
+          merge(@buffer)
+          @buffer.clear
+          compress
+        end
+
+        # Discards every buffered value and sample.
+        def reset
+          @buffer.clear
+          @samples.clear
+          @observations = 0
+        end
+
+        private
+
+        # Merges +sorted_values+ (ascending) into @samples, keeping the list
+        # ordered by value.
+        def merge(sorted_values)
+          if @samples.empty?
+            @samples = sorted_values.map { |v| Sample.new(v, 1, 0) }
+            return
+          end
+
+          merged = []
+          sample_idx = 0
+          value_idx = 0
+          # Rank (sum of widths) of everything already placed in +merged+.
+          cumulative = 0
+
+          while sample_idx < @samples.length && value_idx < sorted_values.length
+            s = @samples[sample_idx]
+
+            if sorted_values[value_idx] <= s.value
+              # compute_delta yields 0 for rank 1, i.e. for a new minimum.
+              merged << Sample.new(sorted_values[value_idx], 1, compute_delta(cumulative + 1))
+              value_idx += 1
+              # The inserted value occupies a rank too; count it so later
+              # insertions are not assigned too low a rank.
+              cumulative += 1
+            else
+              cumulative += s.width
+              merged << s
+              sample_idx += 1
+            end
+          end
+
+          # At most one of the two inputs still has entries left. Values above
+          # every existing sample are appended with delta 0.
+          merged.concat(@samples.drop(sample_idx))
+          sorted_values.drop(value_idx).each { |v| merged << Sample.new(v, 1, 0) }
+
+          @samples = merged
+        end
+
+        # Folds adjacent samples together where the allowed error permits,
+        # scanning from the high end. The first and last samples are never
+        # removed.
+        def compress
+          return if @samples.size < 3
+
+          i = @samples.size - 2
+          # Sum of the widths of the samples to the right of index i.
+          cumulative = @samples.last.width
+
+          while i >= 1
+            s = @samples[i]
+            next_s = @samples[i + 1]
+            cumulative_at_i = @observations - cumulative
+
+            if s.width + next_s.width + next_s.delta <= allowable_error(cumulative_at_i)
+              next_s.width += s.width
+              @samples.delete_at(i)
+            end
+
+            cumulative += s.width
+            i -= 1
+          end
+        end
+
+        # Rank uncertainty for a value inserted at +rank+; 0 at either extreme.
+        def compute_delta(rank)
+          return 0 if rank <= 1 || rank >= @observations
+          allowable_error(rank).floor
+        end
+
+        # Smallest error permitted at +rank+ across all objectives
+        # (Float::INFINITY when there are none).
+        def allowable_error(rank)
+          n = @observations.to_f
+          min_val = Float::INFINITY
+
+          @objectives.each do |quantile, epsilon|
+            if rank <= quantile * n
+              err = (2.0 * epsilon * rank) / quantile
+            else
+              err = (2.0 * epsilon * (n - rank)) / (1.0 - quantile)
+            end
+            min_val = err if err < min_val
+          end
+
+          min_val
+        end
+      end
+
+      # TimeWindowEstimator answers quantile queries over a sliding window of
+      # at most +max_age+ seconds.
+      #
+      # It keeps +age_buckets+ estimators which all receive every observation.
+      # Once per rotation interval (max_age / age_buckets seconds) the head
+      # estimator is cleared and the next one becomes the head, so the head is
+      # always the estimator that has gone longest without being cleared.
+      #
+      # Instances do no locking of their own.
+      class TimeWindowEstimator
+        def initialize(objectives:, max_age: 600, age_buckets: 5)
+          raise ArgumentError, 'max_age must be positive' unless max_age > 0
+          raise ArgumentError, 'age_buckets must be at least 1' unless age_buckets >= 1
+
+          @objectives = objectives
+          @max_age = max_age
+          @age_buckets = age_buckets
+          @streams = Array.new(age_buckets) { Estimator.new(objectives) }
+          @rotation_interval = max_age.to_f / age_buckets
+          @head = 0
+          @last_rotation = current_time
+        end
+
+        # Records +value+ in every bucket.
+        def observe(value)
+          rotate
+          @streams.each { |s| s.observe(value) }
+        end
+
+        # Returns the head bucket's estimate for +quantile+, or Float::NAN when
+        # that bucket holds no observations.
+        def query(quantile)
+          rotate
+          @streams[@head].query(quantile)
+        end
+
+        # Clears every bucket and restarts the rotation schedule.
+        def reset
+          @streams.each(&:reset)
+          @head = 0
+          @last_rotation = current_time
+        end
+
+        private
+
+        # Retires one head bucket per rotation interval elapsed since the last
+        # rotation.
+        def rotate
+          rotations = ((current_time - @last_rotation) / @rotation_interval).floor
+          return if rotations < 1
+
+          # Advance by whole intervals only, so the part of the current interval
+          # that has already elapsed is not forgotten.
+          @last_rotation += rotations * @rotation_interval
+
+          # Once @age_buckets buckets have been cleared nothing is left to clear.
+          [rotations, @age_buckets].min.times do
+            # Clear the expired head before advancing: the new head must be the
+            # bucket holding the most history, not a freshly emptied one.
+            @streams[@head].reset
+            @head = (@head + 1) % @age_buckets
+          end
+        end
+
+        def current_time
+          Process.clock_gettime(Process::CLOCK_MONOTONIC)
+        end
+      end
+    end
+  end
+end
diff --git a/spec/prometheus/client/formats/text_spec.rb b/spec/prometheus/client/formats/text_spec.rb
index d200ec1c..8a8d8e82 100644
--- a/spec/prometheus/client/formats/text_spec.rb
+++ b/spec/prometheus/client/formats/text_spec.rb
@@ -41,6 +41,13 @@
     92.times { qux.observe(0) }
     qux.observe(1243.21)
 
 
+    quux = registry.summary(:quux,
+                            docstring: 'quux description',
+                            labels: [:for],
+                            preset_labels: { for: 'test' },
+                            objectives: { 0.5 => 0.05, 0.9 => 0.01 })
+    1000.times { |i| quux.observe(i) }
+
     xuq = registry.histogram(:xuq,
                              docstring: 'xuq description',
@@ -69,6 +76,12 @@
 # HELP qux qux description
 qux_sum{for="sake",code="1"} 1243.21
 qux_count{for="sake",code="1"} 93.0
+# TYPE quux summary
+# HELP quux quux description
+quux{for="test",quantile="0.5"} 507
+quux{for="test",quantile="0.9"} 920
+quux_sum{for="test"} 499500.0
+quux_count{for="test"} 1000.0
 # TYPE xuq histogram
 # HELP xuq xuq description
 xuq_bucket{code="ah",le="10"} 1.0
diff --git a/spec/prometheus/client/summary/quantile_spec.rb b/spec/prometheus/client/summary/quantile_spec.rb
new file mode 100644
index 00000000..c5b19032
--- /dev/null
+++ b/spec/prometheus/client/summary/quantile_spec.rb
@@ -0,0 +1,131 @@
+# encoding: UTF-8
+
+require 'prometheus/client/summary/quantile'
+
+describe Prometheus::Client::Quantile do
+  let(:objectives) { { 0.5 => 0.05, 0.9 => 0.01, 0.99 => 0.001 } }
+
+  describe Prometheus::Client::Quantile::Estimator do
+    let(:estimator) { described_class.new(objectives) }
+
+    describe '#observe' do
+      it 'records observations' do
+        100.times { |i| estimator.observe(i) }
+        expect(estimator.observations).to eq(100)
+      end
+    end
+
+    describe '#query' do
+      it 'returns NaN for empty estimator' do
+        expect(estimator.query(0.5)).to be_nan
+      end
+
+      it 'returns the value for a single observation' do
+        estimator.observe(42)
+        expect(estimator.query(0.5)).to eq(42)
+      end
+
+      context 'with uniform distribution' do
+        before do
+          10_000.times { |i| estimator.observe(i) }
+        end
+
+        it 'estimates p50 within epsilon' do
+          result = estimator.query(0.5)
+          expect(result).to be_within(10_000 * 0.05 * 2).of(5_000)
+        end
+
+        it 'estimates p90 within epsilon' do
+          result = estimator.query(0.9)
+          expect(result).to be_within(10_000 * 0.01 * 2).of(9_000)
+        end
+
+        it 'estimates p99 within epsilon' do
+          result = estimator.query(0.99)
+          expect(result).to be_within(10_000 * 0.001 * 2).of(9_900)
+        end
+      end
+
+      context 'with small number of observations' do
+        before do
+          [1, 2, 3, 4, 5].each { |v| estimator.observe(v) }
+        end
+
+        it 'returns reasonable p50' do
+          result = estimator.query(0.5)
+          expect(result).to be_between(2, 4)
+        end
+
+        it 'returns reasonable p99' do
+          result = estimator.query(0.99)
+          expect(result).to be_between(4, 5)
+        end
+      end
+    end
+
+    describe '#flush' do
+      it 'flushes the buffer into samples' do
+        10.times { |i| estimator.observe(i) }
+        estimator.flush
+        expect(estimator.query(0.5)).to be_between(3, 7)
+      end
+    end
+
+    describe '#reset' do
+      it 'clears all state' do
+        100.times { |i| estimator.observe(i) }
+        estimator.reset
+        expect(estimator.observations).to eq(0)
+        expect(estimator.query(0.5)).to be_nan
+      end
+    end
+  end
+
+  describe Prometheus::Client::Quantile::TimeWindowEstimator do
+    let(:estimator) do
+      described_class.new(objectives: objectives, max_age: 10, age_buckets: 5)
+    end
+
+    describe '#observe and #query' do
+      it 'tracks observations' do
+        1000.times { |i| estimator.observe(i) }
+        result = estimator.query(0.5)
+        expect(result).to be_within(1000 * 0.05 * 2).of(500)
+      end
+
+      it 'returns NaN with no observations' do
+        expect(estimator.query(0.5)).to be_nan
+      end
+    end
+
+    describe '#reset' do
+      it 'clears all state' do
+        100.times { |i| estimator.observe(i) }
+        estimator.reset
+        expect(estimator.query(0.5)).to be_nan
+      end
+    end
+
+    describe 'window rotation' do
+      let(:clock) { { offset: 0.0 } }
+
+      before do
+        estimator # instantiate before taking control of its clock
+        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+        offsets = clock
+        allow(estimator).to receive(:current_time) { started_at + offsets[:offset] }
+        1000.times { |i| estimator.observe(i) }
+      end
+
+      it 'keeps earlier observations after a single rotation' do
+        clock[:offset] = 2.5 # just past one rotation interval (10s / 5 buckets)
+        expect(estimator.query(0.5)).to be_within(1000 * 0.05 * 2).of(500)
+      end
+
+      it 'forgets observations older than max_age' do
+        clock[:offset] = 10.5
+        expect(estimator.query(0.5)).to be_nan
+      end
+    end
+  end
+end
diff --git a/spec/prometheus/client/summary_spec.rb b/spec/prometheus/client/summary_spec.rb
index ba02ad36..5102889b 100644
--- a/spec/prometheus/client/summary_spec.rb
+++ b/spec/prometheus/client/summary_spec.rb
@@ -86,6 +86,65 @@
     end
   end
 
+  describe 'with quantile objectives' do
+    let(:objectives) { { 0.5 => 0.05, 0.9 => 0.01, 0.99 => 0.001 } }
+
+    let(:summary_with_objectives) do
+      Prometheus::Client::Summary.new(:bar_quantile,
+                                      docstring: 'bar description',
+                                      labels: expected_labels,
+                                      objectives: objectives)
+    end
+
+    describe '#observe' do
+      it 'records the given value with quantiles' do
+        1000.times { |i| summary_with_objectives.observe(i) }
+        result = summary_with_objectives.get
+        expect(result["count"]).to eq(1000.0)
+        expect(result["sum"]).to eq(499500.0)
+        expect(result["0.5"]).to be_within(1000 * 0.05 * 2).of(500)
+        expect(result["0.9"]).to be_within(1000 * 0.01 * 2).of(900)
+        expect(result["0.99"]).to be_within(1000 * 0.001 * 2).of(990)
+      end
+    end
+
+    describe '#get' do
+      it 'returns NaN for quantiles with no observations' do
+        result = summary_with_objectives.get
+        expect(result["count"]).to eq(0.0)
+        expect(result["sum"]).to eq(0.0)
+        expect(result["0.5"]).to be_nan
+        expect(result["0.9"]).to be_nan
+        expect(result["0.99"]).to be_nan
+      end
+    end
+
+    describe '#values' do
+      it 'includes quantile values' do
+        100.times { |i| summary_with_objectives.observe(i) }
+        vals = summary_with_objectives.values
+        label_set_values = vals[{}]
+        expect(label_set_values).to have_key("0.5")
+        expect(label_set_values).to have_key("0.9")
+        expect(label_set_values).to have_key("0.99")
+        expect(label_set_values).to have_key("count")
+        expect(label_set_values).to have_key("sum")
+      end
+    end
+
+    describe '#with_labels' do
+      let(:expected_labels) { [:foo] }
+
+      it 'passes through quantile settings' do
+        with_labels = summary_with_objectives.with_labels(foo: 'bar')
+        100.times { |i| with_labels.observe(i) }
+        result = with_labels.get(labels: { foo: 'bar' })
+        expect(result).to have_key("0.5")
+        expect(result["count"]).to eq(100.0)
+      end
+    end
+  end
+
   describe '#init_label_set' do
     context "with labels" do
       let(:expected_labels) { [:status] }