Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# Changelog

## master / unreleased

## 1.21.0 in progress
## 1.21.0 2026-04-24

* [CHANGE] Ruler: Graduate Ruler API from experimental. #7312
* Flag: Renamed `-experimental.ruler.enable-api` to `-ruler.enable-api`. The old flag is kept as deprecated.
Expand Down Expand Up @@ -71,6 +69,7 @@
* [ENHANCEMENT] Parquet: Support sharded parquet files in parquet converter and queryable. #7189
* [ENHANCEMENT] Compactor: Add graceful period for compaction groups to prevent compacting recently written blocks. #7182
* [ENHANCEMENT] Query Engine: Add projection pushdown optimizer for improved query performance. #7141
* [ENHANCEMENT] Distributor: Optimize memory allocations by pooling PreallocWriteRequestV2 and preserving the capacity of the Symbols slice during resets. #7404
* [ENHANCEMENT] Ruler: Allow ExternalPusher and ExternalQueryable to be specified separately. #7224
* [BUGFIX] Distributor: Add bounds checking for symbol references in Remote Write V2 requests to prevent panics when UnitRef or HelpRef exceed the symbols array length. #7290
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.21.0-rc.1
1.21.0
2 changes: 1 addition & 1 deletion docs/getting-started/.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CORTEX_VERSION=v1.20.1
CORTEX_VERSION=v1.21.0
GRAFANA_VERSION=10.4.2
PROMETHEUS_VERSION=v3.2.1
SEAWEEDFS_VERSION=3.67
Expand Down
8 changes: 8 additions & 0 deletions pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
return writeRequestPoolV2.Get().(*PreallocWriteRequestV2)
}

// Reset implements proto.Message. It differs from the generated Reset by
// keeping the backing array of Symbols alive, so a pooled request can refill
// the slice without a fresh allocation on the next Unmarshal.
func (p *PreallocWriteRequestV2) Reset() {
	syms := p.Symbols[:0] // remember the backing array, length zeroed
	p.WriteRequestV2.Reset()
	p.Symbols = syms
	p.data = nil
}

// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
// ReuseSliceV2 should be called once done.
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {
Expand Down
66 changes: 66 additions & 0 deletions pkg/cortexpb/timeseriesv2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cortexpb

import (
"fmt"
"testing"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -84,6 +85,71 @@ func TestReuseWriteRequestV2(t *testing.T) {
assert.Nil(t, req.Timeseries)
}

// TestPreallocWriteRequestV2Reset verifies the custom Reset: Symbols keeps its
// backing array while every other WriteRequestV2 field returns to its zero value.
func TestPreallocWriteRequestV2Reset(t *testing.T) {
	t.Run("preserves Symbols capacity", func(t *testing.T) {
		const symbolsCap = 100
		var req PreallocWriteRequestV2
		req.Symbols = make([]string, 0, symbolsCap)
		req.Symbols = append(req.Symbols, "a", "b", "c")

		// Address of the first backing-array element, taken through the full
		// capacity so it survives the length truncation done by Reset.
		backing := req.Symbols[:cap(req.Symbols)]
		first := &backing[0]

		req.Reset()

		assert.Equal(t, 0, len(req.Symbols), "Symbols length should be 0 after Reset")
		assert.Equal(t, symbolsCap, cap(req.Symbols), "Symbols capacity should be preserved after Reset")
		assert.Same(t, first, &req.Symbols[:cap(req.Symbols)][0], "Symbols backing array should be reused after Reset")
	})

	t.Run("clears non-Symbols WriteRequestV2 fields", func(t *testing.T) {
		payload := []byte{1, 2, 3}
		req := PreallocWriteRequestV2{
			WriteRequestV2: WriteRequestV2{
				Source:                  RULE,
				SkipLabelNameValidation: true,
				Timeseries:              []PreallocTimeseriesV2{{TimeSeriesV2: &TimeSeriesV2{}}},
			},
			data: &payload,
		}

		req.Reset()

		assert.Equal(t, SourceEnum(0), req.Source)
		assert.False(t, req.SkipLabelNameValidation)
		assert.Nil(t, req.Timeseries)
		assert.Nil(t, req.data)
	})

	t.Run("Unmarshal after Reset reuses Symbols backing array", func(t *testing.T) {
		const symbolsCount = 50
		src := &WriteRequestV2{Symbols: make([]string, 0, symbolsCount)}
		for i := 0; i < symbolsCount; i++ {
			src.Symbols = append(src.Symbols, fmt.Sprintf("symbol_%04d", i))
		}
		data, err := src.Marshal()
		require.NoError(t, err)

		var req PreallocWriteRequestV2
		// Capacity is twice what Unmarshal needs, so reuse is possible.
		req.Symbols = make([]string, 0, symbolsCount*2)

		// Simulate Reset in util.ParseProtoReader()
		req.Reset()
		reusable := req.Symbols[:cap(req.Symbols)]
		wantPtr := &reusable[0]
		wantCap := cap(req.Symbols)

		require.NoError(t, req.WriteRequestV2.Unmarshal(data))
		assert.Equal(t, symbolsCount, len(req.Symbols))
		assert.Equal(t, wantCap, cap(req.Symbols), "capacity should not change: Unmarshal reused the existing backing array")
		assert.Same(t, wantPtr, &req.Symbols[:cap(req.Symbols)][0], "backing array pointer should be identical: no new allocation occurred")
	})
}

func BenchmarkMarshallWriteRequestV2(b *testing.B) {
ts := PreallocTimeseriesV2SliceFromPool()

Expand Down
11 changes: 4 additions & 7 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,11 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
return
}

var req cortexpb.PreallocWriteRequestV2
req := cortexpb.PreallocWriteRequestV2FromPool()
// v1 request is put back into the pool by the Distributor.
defer func() {
cortexpb.ReuseWriteRequestV2(&req)
req.Free()
}()
defer cortexpb.ReuseWriteRequestV2(req)

err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, req, util.RawSnappy)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -112,7 +109,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool,
req.Source = cortexpb.API
}

v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID))
v1Req, err := convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID))
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
119 changes: 117 additions & 2 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// benchMaxRecvMsgSize is the max message size used in benchmarks.
const benchMaxRecvMsgSize = 100 * 1024 * 1024

var (
testHistogram = histogram.Histogram{
Schema: 2,
Expand All @@ -42,6 +46,51 @@ var (
}
)

// makeV2ReqWithSeriesAndSymbols builds a PRW2 request carrying seriesNum
// identical series whose symbol table holds roughly symbolCount entries
// (never fewer than the five base symbols every request needs).
func makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount int) *cortexpb.PreallocWriteRequestV2 {
	const baseSymbols = 5 // "", "__name__", "bench_metric", "help text", "unit"
	if symbolCount < baseSymbols {
		symbolCount = baseSymbols
	}

	syms := make([]string, 0, symbolCount)
	syms = append(syms, "", "__name__", "bench_metric", "help text", "unit")

	// Each extra label contributes a name/value pair, so two symbols apiece.
	pairs := (symbolCount - baseSymbols) / 2
	for i := 0; i < pairs; i++ {
		syms = append(syms, fmt.Sprintf("lbl_%d", i), fmt.Sprintf("val_%d", i))
	}

	refs := []uint32{1, 2} // __name__ = "bench_metric"
	for i := 0; i < pairs; i++ {
		nameIdx := uint32(baseSymbols + i*2)
		refs = append(refs, nameIdx, nameIdx+1)
	}

	series := make([]cortexpb.PreallocTimeseriesV2, seriesNum)
	for i := range series {
		series[i] = cortexpb.PreallocTimeseriesV2{
			TimeSeriesV2: &cortexpb.TimeSeriesV2{
				LabelsRefs: refs,
				Metadata: cortexpb.MetadataV2{
					Type:    cortexpb.METRIC_TYPE_GAUGE,
					HelpRef: 3,
					UnitRef: 4,
				},
				Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 10}},
			},
		}
	}

	return &cortexpb.PreallocWriteRequestV2{
		WriteRequestV2: cortexpb.WriteRequestV2{
			Symbols:    syms,
			Timeseries: series,
		},
	}
}

func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
ts := make([]cortexpb.PreallocTimeseriesV2, 0, num)
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
Expand All @@ -50,8 +99,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 {
TimeSeriesV2: &cortexpb.TimeSeriesV2{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
Metadata: cortexpb.MetadataV2{
Type: cortexpb.METRIC_TYPE_GAUGE,

Type: cortexpb.METRIC_TYPE_GAUGE,
HelpRef: 15,
UnitRef: 16,
},
Expand Down Expand Up @@ -175,6 +223,73 @@ func Benchmark_convertV2RequestToV1(b *testing.B) {
}
}

// makeEncodedPRW2Body marshals a synthetic PRW2 request and snappy-compresses
// it, returning the wire payload and its length for ParseProtoReader.
func makeEncodedPRW2Body(b *testing.B, seriesNum, symbolCount int) (body []byte, contentLength int) {
	b.Helper()
	raw, err := makeV2ReqWithSeriesAndSymbols(seriesNum, symbolCount).Marshal()
	if err != nil {
		b.Fatal(err)
	}
	body = snappy.Encode(nil, raw)
	return body, len(body)
}

// runPRW2HandleFromPool simulates handlePRW2 with a pooled request: the
// PreallocWriteRequestV2 is taken from the sync.Pool and returned afterwards.
func runPRW2HandleFromPool(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error {
	req := cortexpb.PreallocWriteRequestV2FromPool()
	defer cortexpb.ReuseWriteRequestV2(req)

	err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, req, util.RawSnappy)
	if err != nil {
		return err
	}
	_, err = convertV2RequestToV1(req, overrides.EnableTypeAndUnitLabels(userID))
	return err
}

// runPRW2HandleFromScratch simulates handlePRW2 with a stack-declared request
// instead of the sync.Pool; ReuseWriteRequestV2 still recycles the timeseries.
func runPRW2HandleFromScratch(ctx context.Context, body []byte, contentLength int, overrides *validation.Overrides, userID string) error {
	var req cortexpb.PreallocWriteRequestV2
	defer cortexpb.ReuseWriteRequestV2(&req)

	err := util.ParseProtoReader(ctx, bytes.NewReader(body), contentLength, benchMaxRecvMsgSize, &req, util.RawSnappy)
	if err != nil {
		return err
	}
	_, err = convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID))
	return err
}

// Benchmark_HandlePRW2_PoolVsScratch compares two allocation strategies for the PRW2 parse path.
// - pool: req := cortexpb.PreallocWriteRequestV2FromPool() + defer ReuseWriteRequestV2(req)
// - scratch: var req cortexpb.PreallocWriteRequestV2 + defer ReuseWriteRequestV2(&req)
// Sub-benchmark names are stable ("pool/symbols=N" vs "scratch/symbols=N") so the
// two strategies can be compared per symbol count with benchstat.
func Benchmark_HandlePRW2_PoolVsScratch(b *testing.B) {
	var limits validation.Limits
	flagext.DefaultValues(&limits)
	overrides := validation.NewOverrides(limits, nil)

	userID := "bench-user"
	seriesNum := 100
	ctx := user.InjectOrgID(context.Background(), userID)

	// Vary only the symbol-table size; series count stays fixed so the symbol
	// slice reuse in Reset is the variable under test.
	for _, symCount := range []int{32, 128, 512, 2048, 4096} {
		// Encode once per symbol count, outside the timed loops, so only
		// decompress+unmarshal+convert is measured.
		body, contentLength := makeEncodedPRW2Body(b, seriesNum, symCount)
		name := fmt.Sprintf("symbols=%d", symCount)

		b.Run("pool/"+name, func(b *testing.B) {
			b.ReportAllocs()
			for b.Loop() {
				require.NoError(b, runPRW2HandleFromPool(ctx, body, contentLength, overrides, userID))
			}
		})

		b.Run("scratch/"+name, func(b *testing.B) {
			b.ReportAllocs()
			for b.Loop() {
				require.NoError(b, runPRW2HandleFromScratch(ctx, body, contentLength, overrides, userID))
			}
		})
	}
}

func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) {
symbols := []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes", "__type__", "exist type", "__unit__", "exist unit"}
samples := []cortexpb.Sample{
Expand Down
Loading