diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 2b26179e58..63cdfb36f4 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -19,6 +19,7 @@ import ( "fmt" "log/slog" "math" + "slices" "strconv" "sync" "time" @@ -2105,12 +2106,11 @@ func setAtomicToNewer(value *atomic.Int64, newValue int64) (previous int64, upda func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) ([]prompb.TimeSeries, *timeSeriesStats) { stats := newTimeSeriesStats() - keepIdx := 0 - for i, ts := range timeSeries { + timeSeries = slices.DeleteFunc(timeSeries, func(ts prompb.TimeSeries) bool { if filter != nil && filter(ts) { stats.recordDropped(len(ts.Samples) > 0, len(ts.Exemplars) > 0, len(ts.Histograms) > 0) - continue + return true } // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. @@ -2123,16 +2123,10 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri if len(ts.Histograms) > 0 { stats.updateTimestamp(ts.Histograms[0].Timestamp) } + return false + }) - if i != keepIdx { - // We have to swap the kept timeseries with the one which should be dropped. - // Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries). - timeSeries[keepIdx], timeSeries[i] = timeSeries[i], timeSeries[keepIdx] - } - keepIdx++ - } - - return timeSeries[:keepIdx], stats + return timeSeries, stats } func buildWriteRequest(logger *slog.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, filter func(prompb.TimeSeries) bool, buf compression.EncodeBuffer, compr compression.Type) (_ []byte, highest, lowest int64, _ error) { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index f1462b4406..a4b05d387a 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -871,7 +871,7 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([ return samples, series } -func createProtoTimeseriesWithOld(numSamples, baseTs int64, _ ...labels.Label) []prompb.TimeSeries { +func createProtoTimeseriesWithOld(numSamples, baseTs int64) []prompb.TimeSeries { samples := make([]prompb.TimeSeries, numSamples) // use a fixed rand source so tests are consistent r := rand.New(rand.NewSource(99)) @@ -2365,8 +2365,14 @@ func BenchmarkBuildTimeSeries(b *testing.B) { // Send one sample per series, which is the typical remote_write case const numSamples = 10000 filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) } + originalSamples := createProtoTimeseriesWithOld(numSamples, 100) + + b.ReportAllocs() for b.Loop() { - samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...) + b.StopTimer() + samples := make([]prompb.TimeSeries, len(originalSamples)) + copy(samples, originalSamples) + b.StartTimer() result, _ := buildTimeSeries(samples, filter) require.NotNil(b, result) }