mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
fix(remote_read): NHCB not returned over remote read samples (#17794)
NHCB is native histograms with custom buckets. prompb is used for both remote write 1.0 and remote read. We do not support NHCB over remote write 1.0 , however we should absolutely support it for remote read. Prometheus remote write 1.0 client already refuses to send NHCB. Prometheus remote write 1.0 server accepts NHCB, but doesn't store custom values, corrupting the result. I'm now handling NHCB correctly, instead of refusing or corrupting. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
6286e3fb55
commit
66bdc88013
3 changed files with 56 additions and 18 deletions
|
|
@ -110,7 +110,7 @@ func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
|
|||
PositiveBuckets: h.GetPositiveCounts(),
|
||||
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||
NegativeBuckets: h.GetNegativeCounts(),
|
||||
CustomValues: h.CustomValues,
|
||||
CustomValues: h.CustomValues, // CustomValues are immutable.
|
||||
}
|
||||
}
|
||||
// Conversion from integer histogram.
|
||||
|
|
@ -125,6 +125,7 @@ func (h Histogram) ToFloatHistogram() *histogram.FloatHistogram {
|
|||
PositiveBuckets: deltasToCounts(h.GetPositiveDeltas()),
|
||||
NegativeSpans: spansProtoToSpans(h.GetNegativeSpans()),
|
||||
NegativeBuckets: deltasToCounts(h.GetNegativeDeltas()),
|
||||
CustomValues: h.CustomValues, // CustomValues are immutable.
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -161,6 +162,7 @@ func FromIntHistogram(timestamp int64, h *histogram.Histogram) Histogram {
|
|||
PositiveDeltas: h.PositiveBuckets,
|
||||
ResetHint: Histogram_ResetHint(h.CounterResetHint),
|
||||
Timestamp: timestamp,
|
||||
CustomValues: h.CustomValues, // CustomValues are immutable.
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -178,6 +180,7 @@ func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) Histogram
|
|||
PositiveCounts: fh.PositiveBuckets,
|
||||
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
|
||||
Timestamp: timestamp,
|
||||
CustomValues: fh.CustomValues, // CustomValues are immutable.
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -198,17 +198,14 @@ func testFloatHistogram() histogram.FloatHistogram {
|
|||
|
||||
func TestFromIntToFloatOrIntHistogram(t *testing.T) {
|
||||
t.Run("v1", func(t *testing.T) {
|
||||
// v1 does not support nhcb.
|
||||
testIntHistWithoutNHCB := testIntHistogram()
|
||||
testIntHistWithoutNHCB.CustomValues = nil
|
||||
testFloatHistWithoutNHCB := testFloatHistogram()
|
||||
testFloatHistWithoutNHCB.CustomValues = nil
|
||||
testIntHist := testIntHistogram()
|
||||
testFloatHist := testFloatHistogram()
|
||||
|
||||
h := prompb.FromIntHistogram(123, &testIntHistWithoutNHCB)
|
||||
h := prompb.FromIntHistogram(123, &testIntHist)
|
||||
require.False(t, h.IsFloatHistogram())
|
||||
require.Equal(t, int64(123), h.Timestamp)
|
||||
require.Equal(t, testIntHistWithoutNHCB, *h.ToIntHistogram())
|
||||
require.Equal(t, testFloatHistWithoutNHCB, *h.ToFloatHistogram())
|
||||
require.Equal(t, testIntHist, *h.ToIntHistogram())
|
||||
require.Equal(t, testFloatHist, *h.ToFloatHistogram())
|
||||
})
|
||||
t.Run("v2", func(t *testing.T) {
|
||||
testIntHist := testIntHistogram()
|
||||
|
|
@ -224,15 +221,13 @@ func TestFromIntToFloatOrIntHistogram(t *testing.T) {
|
|||
|
||||
func TestFromFloatToFloatHistogram(t *testing.T) {
|
||||
t.Run("v1", func(t *testing.T) {
|
||||
// v1 does not support nhcb.
|
||||
testFloatHistWithoutNHCB := testFloatHistogram()
|
||||
testFloatHistWithoutNHCB.CustomValues = nil
|
||||
testFloatHist := testFloatHistogram()
|
||||
|
||||
h := prompb.FromFloatHistogram(123, &testFloatHistWithoutNHCB)
|
||||
h := prompb.FromFloatHistogram(123, &testFloatHist)
|
||||
require.True(t, h.IsFloatHistogram())
|
||||
require.Equal(t, int64(123), h.Timestamp)
|
||||
require.Nil(t, h.ToIntHistogram())
|
||||
require.Equal(t, testFloatHistWithoutNHCB, *h.ToFloatHistogram())
|
||||
require.Equal(t, testFloatHist, *h.ToFloatHistogram())
|
||||
})
|
||||
t.Run("v2", func(t *testing.T) {
|
||||
testFloatHist := testFloatHistogram()
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ package remote
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
|
|
@ -28,6 +27,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/promql/promqltest"
|
||||
|
|
@ -64,13 +64,19 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
matcher3, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_histogram_metric1")
|
||||
require.NoError(t, err)
|
||||
|
||||
matcher4, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_nhcb_metric1")
|
||||
require.NoError(t, err)
|
||||
|
||||
query1, err := ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
|
||||
require.NoError(t, err)
|
||||
|
||||
query2, err := ToQuery(0, 1, []*labels.Matcher{matcher3, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
|
||||
require.NoError(t, err)
|
||||
|
||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{query1, query2}}
|
||||
query3, err := ToQuery(0, 1, []*labels.Matcher{matcher4, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
|
||||
require.NoError(t, err)
|
||||
|
||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{query1, query2, query3}}
|
||||
data, err := proto.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
@ -97,7 +103,7 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
err = proto.Unmarshal(uncompressed, &resp)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, resp.Results, 2, "Expected 2 results.")
|
||||
require.Len(t, resp.Results, 3, "Expected 3 results.")
|
||||
|
||||
require.Equal(t, &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
|
|
@ -129,6 +135,33 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}, resp.Results[1])
|
||||
|
||||
require.Equal(t, &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "test_nhcb_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
},
|
||||
Histograms: []prompb.Histogram{{
|
||||
// We cannot use prompb.FromFloatHistogram as that's one
|
||||
// of the things we are testing here.
|
||||
Schema: histogram.CustomBucketsSchema,
|
||||
Count: &prompb.Histogram_CountFloat{CountFloat: 5},
|
||||
Sum: 18.4,
|
||||
ZeroCount: &prompb.Histogram_ZeroCountFloat{},
|
||||
PositiveSpans: []prompb.BucketSpan{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveCounts: []float64{1, 2, 1, 1},
|
||||
CustomValues: []float64{0, 1, 2, 3, 4},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}, resp.Results[2])
|
||||
}
|
||||
|
||||
func BenchmarkStreamReadEndpoint(b *testing.B) {
|
||||
|
|
@ -433,10 +466,17 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
func addNativeHistogramsToTestSuite(t *testing.T, storage *teststorage.TestStorage, n int) {
|
||||
lbls := labels.FromStrings("__name__", "test_histogram_metric1", "baz", "qux")
|
||||
|
||||
app := storage.Appender(context.TODO())
|
||||
app := storage.Appender(t.Context())
|
||||
for i, fh := range tsdbutil.GenerateTestFloatHistograms(n) {
|
||||
_, err := app.AppendHistogram(0, lbls, int64(i)*int64(60*time.Second/time.Millisecond), nil, fh)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
lbls = labels.FromStrings("__name__", "test_nhcb_metric1", "baz", "qux")
|
||||
for i, fh := range tsdbutil.GenerateTestCustomBucketsFloatHistograms(n) {
|
||||
_, err := app.AppendHistogram(0, lbls, int64(i)*int64(60*time.Second/time.Millisecond), nil, fh)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue