From 66bdc88013e6c6098da7026ce828d3b33235d527 Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Wed, 7 Jan 2026 08:44:57 +0100 Subject: [PATCH] fix(remote_read): NHCB not returned over remote read samples (#17794) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NHCB is native histograms with custom buckets. prompb is used for both remote write 1.0 and remote read. We do not support NHCB over remote write 1.0 , however we should absolutely support it for remote read. Prometheus remote write 1.0 client already refuses to send NHCB. Prometheus remote write 1.0 server accepts NHCB, but doesn't store custom values, corrupting the result. I'm now handling NHCB correctly, instead of refusing or corrupting. Signed-off-by: György Krajcsovits --- prompb/codec.go | 5 ++- prompb/rwcommon/codec_test.go | 21 +++++-------- storage/remote/read_handler_test.go | 48 ++++++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/prompb/codec.go b/prompb/codec.go index 9eb668a8e7..36490984a0 100644 --- a/prompb/codec.go +++ b/prompb/codec.go @@ -110,7 +110,7 @@ func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram { PositiveBuckets: h.GetPositiveCounts(), NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), NegativeBuckets: h.GetNegativeCounts(), - CustomValues: h.CustomValues, + CustomValues: h.CustomValues, // CustomValues are immutable. } } // Conversion from integer histogram. @@ -125,6 +125,7 @@ func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram { PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()), NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()), NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()), + CustomValues: h.CustomValues, // CustomValues are immutable. } } @@ -161,6 +162,7 @@ func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram { PositiveDeltas: h.PositiveBuckets, ResetHint: Histogram_ResetHint(h.CounterResetHint), Timestamp: timestamp, + CustomValues: h.CustomValues, // CustomValues are immutable. } } @@ -178,6 +180,7 @@ func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram PositiveCounts: fh.PositiveBuckets, ResetHint: Histogram_ResetHint(fh.CounterResetHint), Timestamp: timestamp, + CustomValues: fh.CustomValues, // CustomValues are immutable. } } diff --git a/prompb/rwcommon/codec_test.go b/prompb/rwcommon/codec_test.go index 2e0a72eff9..ee92581f59 100644 --- a/prompb/rwcommon/codec_test.go +++ b/prompb/rwcommon/codec_test.go @@ -198,17 +198,14 @@ func testFloatHistogram() histogram.FloatHistogram { func TestFromIntToFloatOrIntHistogram(t *testing.T) { t.Run("v1", func(t *testing.T) { - // v1 does not support nhcb. - testIntHistWithoutNHCB := testIntHistogram() - testIntHistWithoutNHCB.CustomValues = nil - testFloatHistWithoutNHCB := testFloatHistogram() - testFloatHistWithoutNHCB.CustomValues = nil + testIntHist := testIntHistogram() + testFloatHist := testFloatHistogram() - h := prompb.FromIntHistogram(123, &testIntHistWithoutNHCB) + h := prompb.FromIntHistogram(123, &testIntHist) require.False(t, h.IsFloatHistogram()) require.Equal(t, int64(123), h.Timestamp) - require.Equal(t, testIntHistWithoutNHCB, *h.ToIntHistogram()) - require.Equal(t, testFloatHistWithoutNHCB, *h.ToFloatHistogram()) + require.Equal(t, testIntHist, *h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) }) t.Run("v2", func(t *testing.T) { testIntHist := testIntHistogram() @@ -224,15 +221,13 @@ func TestFromIntToFloatOrIntHistogram(t *testing.T) { func TestFromFloatToFloatHistogram(t *testing.T) { t.Run("v1", func(t *testing.T) { - // v1 does not support nhcb. - testFloatHistWithoutNHCB := testFloatHistogram() - testFloatHistWithoutNHCB.CustomValues = nil + testFloatHist := testFloatHistogram() - h := prompb.FromFloatHistogram(123, &testFloatHistWithoutNHCB) + h := prompb.FromFloatHistogram(123, &testFloatHist) require.True(t, h.IsFloatHistogram()) require.Equal(t, int64(123), h.Timestamp) require.Nil(t, h.ToIntHistogram()) - require.Equal(t, testFloatHistWithoutNHCB, *h.ToFloatHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) }) t.Run("v2", func(t *testing.T) { testFloatHist := testFloatHistogram() diff --git a/storage/remote/read_handler_test.go b/storage/remote/read_handler_test.go index 255a037d1e..a59c940f30 100644 --- a/storage/remote/read_handler_test.go +++ b/storage/remote/read_handler_test.go @@ -15,7 +15,6 @@ package remote import ( "bytes" - "context" "errors" "io" "net/http" @@ -28,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/promql/promqltest" @@ -64,13 +64,19 @@ func TestSampledReadEndpoint(t *testing.T) { matcher3, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_histogram_metric1") require.NoError(t, err) + matcher4, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_nhcb_metric1") + require.NoError(t, err) + query1, err := ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"}) require.NoError(t, err) query2, err := ToQuery(0, 1, []*labels.Matcher{matcher3, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"}) require.NoError(t, err) - req := &prompb.ReadRequest{Queries: []*prompb.Query{query1, query2}} + query3, err := ToQuery(0, 1, []*labels.Matcher{matcher4, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"}) + require.NoError(t, err) + + req := &prompb.ReadRequest{Queries: []*prompb.Query{query1, query2, query3}} data, err := proto.Marshal(req) require.NoError(t, err) @@ -97,7 +103,7 @@ func TestSampledReadEndpoint(t *testing.T) { err = proto.Unmarshal(uncompressed, &resp) require.NoError(t, err) - require.Len(t, resp.Results, 2, "Expected 2 results.") + require.Len(t, resp.Results, 3, "Expected 3 results.") require.Equal(t, &prompb.QueryResult{ Timeseries: []*prompb.TimeSeries{ @@ -129,6 +135,33 @@ func TestSampledReadEndpoint(t *testing.T) { }, }, }, resp.Results[1]) + + require.Equal(t, &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_nhcb_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + }, + Histograms: []prompb.Histogram{{ + // We cannot use prompb.FromFloatHistogram as that's one + // of the things we are testing here. + Schema: histogram.CustomBucketsSchema, + Count: &prompb.Histogram_CountFloat{CountFloat: 5}, + Sum: 18.4, + ZeroCount: &prompb.Histogram_ZeroCountFloat{}, + PositiveSpans: []prompb.BucketSpan{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveCounts: []float64{1, 2, 1, 1}, + CustomValues: []float64{0, 1, 2, 3, 4}, + }}, + }, + }, + }, resp.Results[2]) } func BenchmarkStreamReadEndpoint(b *testing.B) { @@ -433,10 +466,17 @@ func TestStreamReadEndpoint(t *testing.T) { func addNativeHistogramsToTestSuite(t *testing.T, storage *teststorage.TestStorage, n int) { lbls := labels.FromStrings("__name__", "test_histogram_metric1", "baz", "qux") - app := storage.Appender(context.TODO()) + app := storage.Appender(t.Context()) for i, fh := range tsdbutil.GenerateTestFloatHistograms(n) { _, err := app.AppendHistogram(0, lbls, int64(i)*int64(60*time.Second/time.Millisecond), nil, fh) require.NoError(t, err) } + + lbls = labels.FromStrings("__name__", "test_nhcb_metric1", "baz", "qux") + for i, fh := range tsdbutil.GenerateTestCustomBucketsFloatHistograms(n) { + _, err := app.AppendHistogram(0, lbls, int64(i)*int64(60*time.Second/time.Millisecond), nil, fh) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) }