mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
remote write: simplify readability of timeseries filtering by using the slices package (#14318)
* simplify readability of timeseries filtering by using the slices package Signed-off-by: Callum Styan <callumstyan@gmail.com> * ensure that BenchmarkBuildTimeSeries doesn't account for the building of the actual proto in the benchmark results, we only care about the buildTimeSeries call Signed-off-by: Callum Styan <callumstyan@gmail.com> --------- Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
parent
2597a12080
commit
97e7ef802c
2 changed files with 14 additions and 14 deletions
|
|
@ -19,6 +19,7 @@ import (
|
|||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -2105,12 +2106,11 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda
|
|||
|
||||
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) ([]prompb.TimeSeries, *timeSeriesStats) {
|
||||
stats := newTimeSeriesStats()
|
||||
keepIdx := 0
|
||||
|
||||
for i, ts := range timeSeries {
|
||||
timeSeries = slices.DeleteFunc(timeSeries, func(ts prompb.TimeSeries) bool {
|
||||
if filter != nil && filter(ts) {
|
||||
stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0)
|
||||
continue
|
||||
return true
|
||||
}
|
||||
|
||||
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
|
||||
|
|
@ -2123,16 +2123,10 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
|
|||
if len(ts.Histograms) > 0 {
|
||||
stats.updateTimestamp(ts.Histograms[0].Timestamp)
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
if i != keepIdx {
|
||||
// We have to swap the kept timeseries with the one which should be dropped.
|
||||
// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
|
||||
timeSeries[keepIdx], timeSeries[i] = timeSeries[i], timeSeries[keepIdx]
|
||||
}
|
||||
keepIdx++
|
||||
}
|
||||
|
||||
return timeSeries[:keepIdx], stats
|
||||
return timeSeries, stats
|
||||
}
|
||||
|
||||
func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (_ []byte, highest, lowest int64, _ error) {
|
||||
|
|
|
|||
|
|
@ -871,7 +871,7 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
|
|||
return samples, series
|
||||
}
|
||||
|
||||
func createProtoTimeseriesWithOld(numSamples, baseTs int64, _ ...labels.Label) []prompb.TimeSeries {
|
||||
func createProtoTimeseriesWithOld(numSamples, baseTs int64) []prompb.TimeSeries {
|
||||
samples := make([]prompb.TimeSeries, numSamples)
|
||||
// use a fixed rand source so tests are consistent
|
||||
r := rand.New(rand.NewSource(99))
|
||||
|
|
@ -2365,8 +2365,14 @@ func BenchmarkBuildTimeSeries(b *testing.B) {
|
|||
// Send one sample per series, which is the typical remote_write case
|
||||
const numSamples = 10000
|
||||
filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
|
||||
originalSamples := createProtoTimeseriesWithOld(numSamples, 100)
|
||||
|
||||
b.ReportAllocs()
|
||||
for b.Loop() {
|
||||
samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
|
||||
b.StopTimer()
|
||||
samples := make([]prompb.TimeSeries, len(originalSamples))
|
||||
copy(samples, originalSamples)
|
||||
b.StartTimer()
|
||||
result, _ := buildTimeSeries(samples, filter)
|
||||
require.NotNil(b, result)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue