mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
fix: fix rare race on empty head.initialized() vs head.initTime() (#17963)
* fix: fix rare race on empty head.initized() vs head.initTime() Relates to https://github.com/prometheus/prometheus/issues/17941 Signed-off-by: bwplotka <bwplotka@gmail.com> * Apply suggestions from code review Co-authored-by: Owen Williams <owen.williams@grafana.com> Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * addressed comments Signed-off-by: bwplotka <bwplotka@gmail.com> --------- Signed-off-by: bwplotka <bwplotka@gmail.com> Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> Co-authored-by: Owen Williams <owen.williams@grafana.com>
This commit is contained in:
parent
9657c23c37
commit
eefa6178fb
3 changed files with 221 additions and 357 deletions
|
|
@ -19,6 +19,7 @@ import (
|
|||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
|
|
@ -117,10 +118,19 @@ func (a *initAppender) AppendSTZeroSample(ref storage.SeriesRef, lset labels.Lab
|
|||
// for a completely fresh head with an empty WAL.
|
||||
func (h *Head) initTime(t int64) {
|
||||
if !h.minTime.CompareAndSwap(math.MaxInt64, t) {
|
||||
// Concurrent appends that are initializing.
|
||||
// Wait until h.maxTime is swapped to avoid minTime/maxTime races.
|
||||
antiDeadlockTimeout := time.After(500 * time.Millisecond)
|
||||
for h.maxTime.Load() == math.MinInt64 {
|
||||
select {
|
||||
case <-antiDeadlockTimeout:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
// Ensure that max time is initialized to at least the min time we just set.
|
||||
// Concurrent appenders may already have set it to a higher value.
|
||||
h.maxTime.CompareAndSwap(math.MinInt64, t)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ import (
|
|||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
|
|
@ -56,207 +55,14 @@ import (
|
|||
"github.com/prometheus/prometheus/util/testutil/synctest"
|
||||
)
|
||||
|
||||
// TODO(bwplotka): Ensure non-ported tests are not deleted from db_test.go when removing AppenderV1 flow (#17632),
|
||||
// TODO(bwplotka): Ensure non-ported tests are not deleted from head_test.go when removing AppenderV1 flow (#17632),
|
||||
// for example:
|
||||
// * TestChunkNotFoundHeadGCRace
|
||||
// * TestHeadSeriesChunkRace
|
||||
// * TestHeadLabelValuesWithMatchers
|
||||
// * TestHeadLabelNamesWithMatchers
|
||||
// * TestHeadShardedPostings
|
||||
|
||||
// TestHeadAppenderV2_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples,
|
||||
// this means in total it generates 4000 chunks because with a step of 15s there are 4 chunks per block per series.
|
||||
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
|
||||
// returned results are correct.
|
||||
func TestHeadAppenderV2_HighConcurrencyReadAndWrite(t *testing.T) {
|
||||
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
|
||||
defer func() {
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
|
||||
seriesCnt := 1000
|
||||
readConcurrency := 2
|
||||
writeConcurrency := 10
|
||||
startTs := uint64(DefaultBlockDuration) // start at the second block relative to the unix epoch.
|
||||
qryRange := uint64(5 * time.Minute.Milliseconds())
|
||||
step := uint64(15 * time.Second / time.Millisecond)
|
||||
endTs := startTs + uint64(DefaultBlockDuration)
|
||||
|
||||
labelSets := make([]labels.Labels, seriesCnt)
|
||||
for i := range seriesCnt {
|
||||
labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
|
||||
}
|
||||
|
||||
head.Init(0)
|
||||
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
whileNotCanceled := func(f func() (bool, error)) error {
|
||||
for ctx.Err() == nil {
|
||||
cont, err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !cont {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create one channel for each write worker, the channels will be used by the coordinator
|
||||
// go routine to coordinate which timestamps each write worker has to write.
|
||||
writerTsCh := make([]chan uint64, writeConcurrency)
|
||||
for writerTsChIdx := range writerTsCh {
|
||||
writerTsCh[writerTsChIdx] = make(chan uint64)
|
||||
}
|
||||
|
||||
// workerReadyWg is used to synchronize the start of the test,
|
||||
// we only start the test once all workers signal that they're ready.
|
||||
var workerReadyWg sync.WaitGroup
|
||||
workerReadyWg.Add(writeConcurrency + readConcurrency)
|
||||
|
||||
// Start the write workers.
|
||||
for wid := range writeConcurrency {
|
||||
// Create copy of workerID to be used by worker routine.
|
||||
workerID := wid
|
||||
|
||||
g.Go(func() error {
|
||||
// The label sets which this worker will write.
|
||||
workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
|
||||
|
||||
// Signal that this worker is ready.
|
||||
workerReadyWg.Done()
|
||||
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
ts, ok := <-writerTsCh[workerID]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
app := head.AppenderV2(ctx)
|
||||
for i := range workerLabelSets {
|
||||
// We also use the timestamp as the sample value.
|
||||
_, err := app.Append(0, workerLabelSets[i], 0, int64(ts), float64(ts), nil, nil, storage.AOptions{})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Error when appending to head: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, app.Commit()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// queryHead is a helper to query the head for a given time range and labelset.
|
||||
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
|
||||
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
|
||||
}
|
||||
|
||||
// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
|
||||
readerTsCh := make(chan uint64)
|
||||
|
||||
// Start the read workers.
|
||||
for wid := range readConcurrency {
|
||||
// Create copy of threadID to be used by worker routine.
|
||||
workerID := wid
|
||||
|
||||
g.Go(func() error {
|
||||
querySeriesRef := (seriesCnt / readConcurrency) * workerID
|
||||
|
||||
// Signal that this worker is ready.
|
||||
workerReadyWg.Done()
|
||||
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
ts, ok := <-readerTsCh
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
querySeriesRef = (querySeriesRef + 1) % seriesCnt
|
||||
lbls := labelSets[querySeriesRef]
|
||||
// lbls has a single entry; extract it so we can run a query.
|
||||
var lbl labels.Label
|
||||
lbls.Range(func(l labels.Label) {
|
||||
lbl = l
|
||||
})
|
||||
samples, err := queryHead(ts-qryRange, ts, lbl)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(samples) != 1 {
|
||||
return false, fmt.Errorf("expected 1 series, got %d", len(samples))
|
||||
}
|
||||
|
||||
series := lbls.String()
|
||||
expectSampleCnt := qryRange/step + 1
|
||||
if expectSampleCnt != uint64(len(samples[series])) {
|
||||
return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
|
||||
}
|
||||
|
||||
for sampleIdx, sample := range samples[series] {
|
||||
expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
|
||||
if sample.T() != int64(expectedValue) {
|
||||
return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
|
||||
}
|
||||
if sample.F() != float64(expectedValue) {
|
||||
return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.F())
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Start the coordinator go routine.
|
||||
g.Go(func() error {
|
||||
currTs := startTs
|
||||
|
||||
defer func() {
|
||||
// End of the test, close all channels to stop the workers.
|
||||
for _, ch := range writerTsCh {
|
||||
close(ch)
|
||||
}
|
||||
close(readerTsCh)
|
||||
}()
|
||||
|
||||
// Wait until all workers are ready to start the test.
|
||||
workerReadyWg.Wait()
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
// Send the current timestamp to each of the writers.
|
||||
for _, ch := range writerTsCh {
|
||||
select {
|
||||
case ch <- currTs:
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Once data for at least <qryRange> has been ingested, send the current timestamp to the readers.
|
||||
if currTs > startTs+qryRange {
|
||||
select {
|
||||
case readerTsCh <- currTs - step:
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
currTs += step
|
||||
if currTs > endTs {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
}
|
||||
// * TestHead_HighConcurrencyReadAndWrite
|
||||
|
||||
func TestHeadAppenderV2_WALMultiRef(t *testing.T) {
|
||||
head, w := newTestHead(t, 1000, compression.None, false)
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ import (
|
|||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/timestamp"
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
|
@ -471,195 +472,242 @@ func BenchmarkLoadRealWLs(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestHead_InitAppenderRace_ErrOutOfBounds tests against init races with maxTime vs minTime on empty head concurrent appends.
|
||||
// See: https://github.com/prometheus/prometheus/pull/17963
|
||||
func TestHead_InitAppenderRace_ErrOutOfBounds(t *testing.T) {
|
||||
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
ts := timestamp.FromTime(time.Now())
|
||||
appendCycles := 100
|
||||
|
||||
g, ctx := errgroup.WithContext(t.Context())
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
for i := range 100 {
|
||||
g.Go(func() error {
|
||||
appends := 0
|
||||
wg.Wait()
|
||||
for ctx.Err() == nil && appends < appendCycles {
|
||||
appends++
|
||||
app := head.Appender(t.Context())
|
||||
if _, err := app.Append(0, labels.FromStrings("__name__", strconv.Itoa(i)), ts, float64(ts)); err != nil {
|
||||
return fmt.Errorf("error when appending to head: %w", err)
|
||||
}
|
||||
if err := app.Rollback(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
wg.Done()
|
||||
require.NoError(t, g.Wait())
|
||||
}
|
||||
|
||||
// TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples,
|
||||
// this means in total it generates 4000 chunks because with a step of 15s there are 4 chunks per block per series.
|
||||
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
|
||||
// returned results are correct.
|
||||
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
|
||||
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
|
||||
for _, appV2 := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) {
|
||||
head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
|
||||
|
||||
seriesCnt := 1000
|
||||
readConcurrency := 2
|
||||
writeConcurrency := 10
|
||||
startTs := uint64(DefaultBlockDuration) // start at the second block relative to the unix epoch.
|
||||
qryRange := uint64(5 * time.Minute.Milliseconds())
|
||||
step := uint64(15 * time.Second / time.Millisecond)
|
||||
endTs := startTs + uint64(DefaultBlockDuration)
|
||||
seriesCnt := 1000
|
||||
readConcurrency := 2
|
||||
writeConcurrency := 10
|
||||
startTs := uint64(DefaultBlockDuration) // Start at the second block relative to the unix epoch.
|
||||
qryRange := uint64(5 * time.Minute.Milliseconds())
|
||||
step := uint64(15 * time.Second / time.Millisecond)
|
||||
endTs := startTs + uint64(DefaultBlockDuration)
|
||||
|
||||
labelSets := make([]labels.Labels, seriesCnt)
|
||||
for i := range seriesCnt {
|
||||
labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
|
||||
}
|
||||
|
||||
head.Init(0)
|
||||
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
whileNotCanceled := func(f func() (bool, error)) error {
|
||||
for ctx.Err() == nil {
|
||||
cont, err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
labelSets := make([]labels.Labels, seriesCnt)
|
||||
for i := range seriesCnt {
|
||||
labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
|
||||
}
|
||||
if !cont {
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
g, ctx := errgroup.WithContext(t.Context())
|
||||
whileNotCanceled := func(f func() (bool, error)) error {
|
||||
for ctx.Err() == nil {
|
||||
cont, err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !cont {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create one channel for each write worker, the channels will be used by the coordinator
|
||||
// go routine to coordinate which timestamps each write worker has to write.
|
||||
writerTsCh := make([]chan uint64, writeConcurrency)
|
||||
for writerTsChIdx := range writerTsCh {
|
||||
writerTsCh[writerTsChIdx] = make(chan uint64)
|
||||
}
|
||||
// Create one channel for each write worker, the channels will be used by the coordinator
|
||||
// go routine to coordinate which timestamps each write worker has to write.
|
||||
writerTsCh := make([]chan uint64, writeConcurrency)
|
||||
for writerTsChIdx := range writerTsCh {
|
||||
writerTsCh[writerTsChIdx] = make(chan uint64)
|
||||
}
|
||||
|
||||
// workerReadyWg is used to synchronize the start of the test,
|
||||
// we only start the test once all workers signal that they're ready.
|
||||
var workerReadyWg sync.WaitGroup
|
||||
workerReadyWg.Add(writeConcurrency + readConcurrency)
|
||||
// workerReadyWg is used to synchronize the start of the test,
|
||||
// we only start the test once all workers signal that they're ready.
|
||||
var workerReadyWg sync.WaitGroup
|
||||
workerReadyWg.Add(writeConcurrency + readConcurrency)
|
||||
|
||||
// Start the write workers.
|
||||
for wid := range writeConcurrency {
|
||||
// Create copy of workerID to be used by worker routine.
|
||||
workerID := wid
|
||||
// Start the write workers.
|
||||
for wid := range writeConcurrency {
|
||||
// Create copy of workerID to be used by worker routine.
|
||||
workerID := wid
|
||||
|
||||
g.Go(func() error {
|
||||
// The label sets which this worker will write.
|
||||
workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
|
||||
g.Go(func() error {
|
||||
// The label sets which this worker will write.
|
||||
workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
|
||||
|
||||
// Signal that this worker is ready.
|
||||
workerReadyWg.Done()
|
||||
// Signal that this worker is ready.
|
||||
workerReadyWg.Done()
|
||||
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
ts, ok := <-writerTsCh[workerID]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
ts, ok := <-writerTsCh[workerID]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
app := head.Appender(ctx)
|
||||
for i := range workerLabelSets {
|
||||
// We also use the timestamp as the sample value.
|
||||
_, err := app.Append(0, workerLabelSets[i], int64(ts), float64(ts))
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Error when appending to head: %w", err)
|
||||
}
|
||||
}
|
||||
if appV2 {
|
||||
app := head.AppenderV2(ctx)
|
||||
for i := range workerLabelSets {
|
||||
// We also use the timestamp as the sample value.
|
||||
if _, err := app.Append(0, workerLabelSets[i], 0, int64(ts), float64(ts), nil, nil, storage.AOptions{}); err != nil {
|
||||
return false, fmt.Errorf("error when appending (V2) to head: %w", err)
|
||||
}
|
||||
}
|
||||
return true, app.Commit()
|
||||
}
|
||||
|
||||
return true, app.Commit()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// queryHead is a helper to query the head for a given time range and labelset.
|
||||
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
|
||||
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
|
||||
}
|
||||
|
||||
// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
|
||||
readerTsCh := make(chan uint64)
|
||||
|
||||
// Start the read workers.
|
||||
for wid := range readConcurrency {
|
||||
// Create copy of threadID to be used by worker routine.
|
||||
workerID := wid
|
||||
|
||||
g.Go(func() error {
|
||||
querySeriesRef := (seriesCnt / readConcurrency) * workerID
|
||||
|
||||
// Signal that this worker is ready.
|
||||
workerReadyWg.Done()
|
||||
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
ts, ok := <-readerTsCh
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
querySeriesRef = (querySeriesRef + 1) % seriesCnt
|
||||
lbls := labelSets[querySeriesRef]
|
||||
// lbls has a single entry; extract it so we can run a query.
|
||||
var lbl labels.Label
|
||||
lbls.Range(func(l labels.Label) {
|
||||
lbl = l
|
||||
app := head.Appender(ctx)
|
||||
for i := range workerLabelSets {
|
||||
// We also use the timestamp as the sample value.
|
||||
if _, err := app.Append(0, workerLabelSets[i], int64(ts), float64(ts)); err != nil {
|
||||
return false, fmt.Errorf("error when appending to head: %w", err)
|
||||
}
|
||||
}
|
||||
return true, app.Commit()
|
||||
})
|
||||
})
|
||||
samples, err := queryHead(ts-qryRange, ts, lbl)
|
||||
}
|
||||
|
||||
// queryHead is a helper to query the head for a given time range and labelset.
|
||||
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
|
||||
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
|
||||
if err != nil {
|
||||
return false, err
|
||||
return nil, err
|
||||
}
|
||||
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
|
||||
}
|
||||
|
||||
if len(samples) != 1 {
|
||||
return false, fmt.Errorf("expected 1 series, got %d", len(samples))
|
||||
}
|
||||
// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
|
||||
readerTsCh := make(chan uint64)
|
||||
|
||||
series := lbls.String()
|
||||
expectSampleCnt := qryRange/step + 1
|
||||
if expectSampleCnt != uint64(len(samples[series])) {
|
||||
return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
|
||||
}
|
||||
// Start the read workers.
|
||||
for wid := range readConcurrency {
|
||||
// Create copy of threadID to be used by worker routine.
|
||||
workerID := wid
|
||||
|
||||
for sampleIdx, sample := range samples[series] {
|
||||
expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
|
||||
if sample.T() != int64(expectedValue) {
|
||||
return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
|
||||
g.Go(func() error {
|
||||
querySeriesRef := (seriesCnt / readConcurrency) * workerID
|
||||
|
||||
// Signal that this worker is ready.
|
||||
workerReadyWg.Done()
|
||||
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
ts, ok := <-readerTsCh
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
querySeriesRef = (querySeriesRef + 1) % seriesCnt
|
||||
lbls := labelSets[querySeriesRef]
|
||||
// lbls has a single entry; extract it so we can run a query.
|
||||
var lbl labels.Label
|
||||
lbls.Range(func(l labels.Label) {
|
||||
lbl = l
|
||||
})
|
||||
samples, err := queryHead(ts-qryRange, ts, lbl)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(samples) != 1 {
|
||||
return false, fmt.Errorf("expected 1 series, got %d", len(samples))
|
||||
}
|
||||
|
||||
series := lbls.String()
|
||||
expectSampleCnt := qryRange/step + 1
|
||||
if expectSampleCnt != uint64(len(samples[series])) {
|
||||
return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
|
||||
}
|
||||
|
||||
for sampleIdx, sample := range samples[series] {
|
||||
expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
|
||||
if sample.T() != int64(expectedValue) {
|
||||
return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
|
||||
}
|
||||
if sample.F() != float64(expectedValue) {
|
||||
return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.F())
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Start the coordinator go routine.
|
||||
g.Go(func() error {
|
||||
currTs := startTs
|
||||
|
||||
defer func() {
|
||||
// End of the test, close all channels to stop the workers.
|
||||
for _, ch := range writerTsCh {
|
||||
close(ch)
|
||||
}
|
||||
if sample.F() != float64(expectedValue) {
|
||||
return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.F())
|
||||
}
|
||||
}
|
||||
close(readerTsCh)
|
||||
}()
|
||||
|
||||
return true, nil
|
||||
// Wait until all workers are ready to start the test.
|
||||
workerReadyWg.Wait()
|
||||
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
// Send the current timestamp to each of the writers.
|
||||
for _, ch := range writerTsCh {
|
||||
select {
|
||||
case ch <- currTs:
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Once data for at least <qryRange> has been ingested, send the current timestamp to the readers.
|
||||
if currTs > startTs+qryRange {
|
||||
select {
|
||||
case readerTsCh <- currTs - step:
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
currTs += step
|
||||
if currTs > endTs {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
})
|
||||
}
|
||||
|
||||
// Start the coordinator go routine.
|
||||
g.Go(func() error {
|
||||
currTs := startTs
|
||||
|
||||
defer func() {
|
||||
// End of the test, close all channels to stop the workers.
|
||||
for _, ch := range writerTsCh {
|
||||
close(ch)
|
||||
}
|
||||
close(readerTsCh)
|
||||
}()
|
||||
|
||||
// Wait until all workers are ready to start the test.
|
||||
workerReadyWg.Wait()
|
||||
return whileNotCanceled(func() (bool, error) {
|
||||
// Send the current timestamp to each of the writers.
|
||||
for _, ch := range writerTsCh {
|
||||
select {
|
||||
case ch <- currTs:
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Once data for at least <qryRange> has been ingested, send the current timestamp to the readers.
|
||||
if currTs > startTs+qryRange {
|
||||
select {
|
||||
case readerTsCh <- currTs - step:
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
currTs += step
|
||||
if currTs > endTs {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
}
|
||||
|
||||
func TestHead_ReadWAL(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue