diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index c171079509..005d20b720 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"log/slog"
 	"math"
+	"time"
 
 	"github.com/prometheus/prometheus/model/exemplar"
 	"github.com/prometheus/prometheus/model/histogram"
@@ -117,10 +118,19 @@ func (a *initAppender) AppendSTZeroSample(ref storage.SeriesRef, lset labels.Lab
 // for a completely fresh head with an empty WAL.
 func (h *Head) initTime(t int64) {
 	if !h.minTime.CompareAndSwap(math.MaxInt64, t) {
+		// Another concurrent append is initializing the head time range.
+		// Wait until h.maxTime is swapped too, to avoid minTime/maxTime races.
+		antiDeadlockTimeout := time.After(500 * time.Millisecond)
+		for h.maxTime.Load() == math.MinInt64 {
+			select {
+			case <-antiDeadlockTimeout:
+				return
+			default:
+			}
+		}
 		return
 	}
 	// Ensure that max time is initialized to at least the min time we just set.
-	// Concurrent appenders may already have set it to a higher value.
 	h.maxTime.CompareAndSwap(math.MinInt64, t)
 }
 
diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go
index 6bb88bf16e..082d756e60 100644
--- a/tsdb/head_append_v2_test.go
+++ b/tsdb/head_append_v2_test.go
@@ -37,7 +37,6 @@ import (
 	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
-	"golang.org/x/sync/errgroup"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/exemplar"
@@ -56,207 +55,14 @@ import (
 	"github.com/prometheus/prometheus/util/testutil/synctest"
 )
 
-// TODO(bwplotka): Ensure non-ported tests are not deleted from db_test.go when removing AppenderV1 flow (#17632),
+// TODO(bwplotka): Ensure non-ported tests are not deleted from head_test.go when removing AppenderV1 flow (#17632),
 // for example:
 // * TestChunkNotFoundHeadGCRace
 // * TestHeadSeriesChunkRace
 // * TestHeadLabelValuesWithMatchers
 // * TestHeadLabelNamesWithMatchers
 // * TestHeadShardedPostings
-
-// TestHeadAppenderV2_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples,
-// this means in total it generates 4000 chunks because with a step of 15s there are 4 chunks per block per series.
-// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
-// returned results are correct.
-func TestHeadAppenderV2_HighConcurrencyReadAndWrite(t *testing.T) {
-	head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
-	defer func() {
-		require.NoError(t, head.Close())
-	}()
-
-	seriesCnt := 1000
-	readConcurrency := 2
-	writeConcurrency := 10
-	startTs := uint64(DefaultBlockDuration) // start at the second block relative to the unix epoch.
-	qryRange := uint64(5 * time.Minute.Milliseconds())
-	step := uint64(15 * time.Second / time.Millisecond)
-	endTs := startTs + uint64(DefaultBlockDuration)
-
-	labelSets := make([]labels.Labels, seriesCnt)
-	for i := range seriesCnt {
-		labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
-	}
-
-	head.Init(0)
-
-	g, ctx := errgroup.WithContext(context.Background())
-	whileNotCanceled := func(f func() (bool, error)) error {
-		for ctx.Err() == nil {
-			cont, err := f()
-			if err != nil {
-				return err
-			}
-			if !cont {
-				return nil
-			}
-		}
-		return nil
-	}
-
-	// Create one channel for each write worker, the channels will be used by the coordinator
-	// go routine to coordinate which timestamps each write worker has to write.
-	writerTsCh := make([]chan uint64, writeConcurrency)
-	for writerTsChIdx := range writerTsCh {
-		writerTsCh[writerTsChIdx] = make(chan uint64)
-	}
-
-	// workerReadyWg is used to synchronize the start of the test,
-	// we only start the test once all workers signal that they're ready.
-	var workerReadyWg sync.WaitGroup
-	workerReadyWg.Add(writeConcurrency + readConcurrency)
-
-	// Start the write workers.
-	for wid := range writeConcurrency {
-		// Create copy of workerID to be used by worker routine.
-		workerID := wid
-
-		g.Go(func() error {
-			// The label sets which this worker will write.
-			workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
-
-			// Signal that this worker is ready.
-			workerReadyWg.Done()
-
-			return whileNotCanceled(func() (bool, error) {
-				ts, ok := <-writerTsCh[workerID]
-				if !ok {
-					return false, nil
-				}
-
-				app := head.AppenderV2(ctx)
-				for i := range workerLabelSets {
-					// We also use the timestamp as the sample value.
-					_, err := app.Append(0, workerLabelSets[i], 0, int64(ts), float64(ts), nil, nil, storage.AOptions{})
-					if err != nil {
-						return false, fmt.Errorf("Error when appending to head: %w", err)
-					}
-				}
-
-				return true, app.Commit()
-			})
-		})
-	}
-
-	// queryHead is a helper to query the head for a given time range and labelset.
-	queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
-		q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
-		if err != nil {
-			return nil, err
-		}
-		return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
-	}
-
-	// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
-	readerTsCh := make(chan uint64)
-
-	// Start the read workers.
-	for wid := range readConcurrency {
-		// Create copy of threadID to be used by worker routine.
-		workerID := wid
-
-		g.Go(func() error {
-			querySeriesRef := (seriesCnt / readConcurrency) * workerID
-
-			// Signal that this worker is ready.
-			workerReadyWg.Done()
-
-			return whileNotCanceled(func() (bool, error) {
-				ts, ok := <-readerTsCh
-				if !ok {
-					return false, nil
-				}
-
-				querySeriesRef = (querySeriesRef + 1) % seriesCnt
-				lbls := labelSets[querySeriesRef]
-				// lbls has a single entry; extract it so we can run a query.
-				var lbl labels.Label
-				lbls.Range(func(l labels.Label) {
-					lbl = l
-				})
-				samples, err := queryHead(ts-qryRange, ts, lbl)
-				if err != nil {
-					return false, err
-				}
-
-				if len(samples) != 1 {
-					return false, fmt.Errorf("expected 1 series, got %d", len(samples))
-				}
-
-				series := lbls.String()
-				expectSampleCnt := qryRange/step + 1
-				if expectSampleCnt != uint64(len(samples[series])) {
-					return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
-				}
-
-				for sampleIdx, sample := range samples[series] {
-					expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
-					if sample.T() != int64(expectedValue) {
-						return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
-					}
-					if sample.F() != float64(expectedValue) {
-						return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.F())
-					}
-				}
-
-				return true, nil
-			})
-		})
-	}
-
-	// Start the coordinator go routine.
-	g.Go(func() error {
-		currTs := startTs
-
-		defer func() {
-			// End of the test, close all channels to stop the workers.
-			for _, ch := range writerTsCh {
-				close(ch)
-			}
-			close(readerTsCh)
-		}()
-
-		// Wait until all workers are ready to start the test.
-		workerReadyWg.Wait()
-		return whileNotCanceled(func() (bool, error) {
-			// Send the current timestamp to each of the writers.
-			for _, ch := range writerTsCh {
-				select {
-				case ch <- currTs:
-				case <-ctx.Done():
-					return false, nil
-				}
-			}
-
-			// Once data for at least has been ingested, send the current timestamp to the readers.
-			if currTs > startTs+qryRange {
-				select {
-				case readerTsCh <- currTs - step:
-				case <-ctx.Done():
-					return false, nil
-				}
-			}
-
-			currTs += step
-			if currTs > endTs {
-				return false, nil
-			}
-
-			return true, nil
-		})
-	})
-
-	require.NoError(t, g.Wait())
-}
+// * TestHead_HighConcurrencyReadAndWrite
 
 func TestHeadAppenderV2_WALMultiRef(t *testing.T) {
 	head, w := newTestHead(t, 1000, compression.None, false)
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index aee61602ff..7b8ae0ecbd 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -44,6 +44,7 @@ import (
 	"github.com/prometheus/prometheus/model/exemplar"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -471,195 +472,242 @@ func BenchmarkLoadRealWLs(b *testing.B) {
 	}
 }
 
+// TestHead_InitAppenderRace_ErrOutOfBounds tests for the init race between minTime and maxTime on concurrent appends to an empty head.
+// See: https://github.com/prometheus/prometheus/pull/17963
+func TestHead_InitAppenderRace_ErrOutOfBounds(t *testing.T) {
+	head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
+	require.NoError(t, head.Init(0))
+
+	ts := timestamp.FromTime(time.Now())
+	appendCycles := 100
+
+	g, ctx := errgroup.WithContext(t.Context())
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	for i := range 100 {
+		g.Go(func() error {
+			appends := 0
+			wg.Wait()
+			for ctx.Err() == nil && appends < appendCycles {
+				appends++
+				app := head.Appender(t.Context())
+				if _, err := app.Append(0, labels.FromStrings("__name__", strconv.Itoa(i)), ts, float64(ts)); err != nil {
+					return fmt.Errorf("error when appending to head: %w", err)
+				}
+				if err := app.Rollback(); err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+	wg.Done()
+	require.NoError(t, g.Wait())
+}
+
 // TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples,
 // this means in total it generates 4000 chunks because with a step of 15s there are 4 chunks per block per series.
 // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
 // returned results are correct.
 func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
-	head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
+	for _, appV2 := range []bool{false, true} {
+		t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) {
+			head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
 
-	seriesCnt := 1000
-	readConcurrency := 2
-	writeConcurrency := 10
-	startTs := uint64(DefaultBlockDuration) // start at the second block relative to the unix epoch.
-	qryRange := uint64(5 * time.Minute.Milliseconds())
-	step := uint64(15 * time.Second / time.Millisecond)
-	endTs := startTs + uint64(DefaultBlockDuration)
+			seriesCnt := 1000
+			readConcurrency := 2
+			writeConcurrency := 10
+			startTs := uint64(DefaultBlockDuration) // Start at the second block relative to the unix epoch.
+			qryRange := uint64(5 * time.Minute.Milliseconds())
+			step := uint64(15 * time.Second / time.Millisecond)
+			endTs := startTs + uint64(DefaultBlockDuration)
 
-	labelSets := make([]labels.Labels, seriesCnt)
-	for i := range seriesCnt {
-		labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
-	}
-
-	head.Init(0)
-
-	g, ctx := errgroup.WithContext(context.Background())
-	whileNotCanceled := func(f func() (bool, error)) error {
-		for ctx.Err() == nil {
-			cont, err := f()
-			if err != nil {
-				return err
+			labelSets := make([]labels.Labels, seriesCnt)
+			for i := range seriesCnt {
+				labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
 			}
-			if !cont {
+			require.NoError(t, head.Init(0))
+
+			g, ctx := errgroup.WithContext(t.Context())
+			whileNotCanceled := func(f func() (bool, error)) error {
+				for ctx.Err() == nil {
+					cont, err := f()
+					if err != nil {
+						return err
+					}
+					if !cont {
+						return nil
+					}
+				}
 				return nil
 			}
-		}
-		return nil
-	}
 
-	// Create one channel for each write worker, the channels will be used by the coordinator
-	// go routine to coordinate which timestamps each write worker has to write.
-	writerTsCh := make([]chan uint64, writeConcurrency)
-	for writerTsChIdx := range writerTsCh {
-		writerTsCh[writerTsChIdx] = make(chan uint64)
-	}
+			// Create one channel for each write worker, the channels will be used by the coordinator
+			// go routine to coordinate which timestamps each write worker has to write.
+			writerTsCh := make([]chan uint64, writeConcurrency)
+			for writerTsChIdx := range writerTsCh {
+				writerTsCh[writerTsChIdx] = make(chan uint64)
+			}
 
-	// workerReadyWg is used to synchronize the start of the test,
-	// we only start the test once all workers signal that they're ready.
-	var workerReadyWg sync.WaitGroup
-	workerReadyWg.Add(writeConcurrency + readConcurrency)
+			// workerReadyWg is used to synchronize the start of the test,
+			// we only start the test once all workers signal that they're ready.
+			var workerReadyWg sync.WaitGroup
+			workerReadyWg.Add(writeConcurrency + readConcurrency)
 
-	// Start the write workers.
-	for wid := range writeConcurrency {
-		// Create copy of workerID to be used by worker routine.
-		workerID := wid
+			// Start the write workers.
+			for wid := range writeConcurrency {
+				// Create copy of workerID to be used by worker routine.
+				workerID := wid
 
-		g.Go(func() error {
-			// The label sets which this worker will write.
-			workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
+				g.Go(func() error {
+					// The label sets which this worker will write.
+					workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
 
-			// Signal that this worker is ready.
-			workerReadyWg.Done()
+					// Signal that this worker is ready.
+					workerReadyWg.Done()
 
-			return whileNotCanceled(func() (bool, error) {
-				ts, ok := <-writerTsCh[workerID]
-				if !ok {
-					return false, nil
-				}
+					return whileNotCanceled(func() (bool, error) {
+						ts, ok := <-writerTsCh[workerID]
+						if !ok {
+							return false, nil
+						}
 
-				app := head.Appender(ctx)
-				for i := range workerLabelSets {
-					// We also use the timestamp as the sample value.
-					_, err := app.Append(0, workerLabelSets[i], int64(ts), float64(ts))
-					if err != nil {
-						return false, fmt.Errorf("Error when appending to head: %w", err)
-					}
-				}
+						if appV2 {
+							app := head.AppenderV2(ctx)
+							for i := range workerLabelSets {
+								// We also use the timestamp as the sample value.
+								if _, err := app.Append(0, workerLabelSets[i], 0, int64(ts), float64(ts), nil, nil, storage.AOptions{}); err != nil {
+									return false, fmt.Errorf("error when appending (V2) to head: %w", err)
+								}
+							}
+							return true, app.Commit()
+						}
 
-				return true, app.Commit()
-			})
-		})
-	}
-
-	// queryHead is a helper to query the head for a given time range and labelset.
-	queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
-		q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
-		if err != nil {
-			return nil, err
-		}
-		return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
-	}
-
-	// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
-	readerTsCh := make(chan uint64)
-
-	// Start the read workers.
-	for wid := range readConcurrency {
-		// Create copy of threadID to be used by worker routine.
-		workerID := wid
-
-		g.Go(func() error {
-			querySeriesRef := (seriesCnt / readConcurrency) * workerID
-
-			// Signal that this worker is ready.
-			workerReadyWg.Done()
-
-			return whileNotCanceled(func() (bool, error) {
-				ts, ok := <-readerTsCh
-				if !ok {
-					return false, nil
-				}
-
-				querySeriesRef = (querySeriesRef + 1) % seriesCnt
-				lbls := labelSets[querySeriesRef]
-				// lbls has a single entry; extract it so we can run a query.
-				var lbl labels.Label
-				lbls.Range(func(l labels.Label) {
-					lbl = l
+						app := head.Appender(ctx)
+						for i := range workerLabelSets {
+							// We also use the timestamp as the sample value.
+							if _, err := app.Append(0, workerLabelSets[i], int64(ts), float64(ts)); err != nil {
+								return false, fmt.Errorf("error when appending to head: %w", err)
+							}
+						}
+						return true, app.Commit()
+					})
 				})
-				samples, err := queryHead(ts-qryRange, ts, lbl)
+			}
+
+			// queryHead is a helper to query the head for a given time range and labelset.
+			queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
+				q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
 				if err != nil {
-					return false, err
+					return nil, err
 				}
+				return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
+			}
 
-				if len(samples) != 1 {
-					return false, fmt.Errorf("expected 1 series, got %d", len(samples))
-				}
+			// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
+			readerTsCh := make(chan uint64)
 
-				series := lbls.String()
-				expectSampleCnt := qryRange/step + 1
-				if expectSampleCnt != uint64(len(samples[series])) {
-					return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
-				}
+			// Start the read workers.
+			for wid := range readConcurrency {
+				// Create copy of threadID to be used by worker routine.
+				workerID := wid
 
-				for sampleIdx, sample := range samples[series] {
-					expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
-					if sample.T() != int64(expectedValue) {
-						return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
+				g.Go(func() error {
+					querySeriesRef := (seriesCnt / readConcurrency) * workerID
+
+					// Signal that this worker is ready.
+					workerReadyWg.Done()
+
+					return whileNotCanceled(func() (bool, error) {
+						ts, ok := <-readerTsCh
+						if !ok {
+							return false, nil
+						}
+
+						querySeriesRef = (querySeriesRef + 1) % seriesCnt
+						lbls := labelSets[querySeriesRef]
+						// lbls has a single entry; extract it so we can run a query.
+						var lbl labels.Label
+						lbls.Range(func(l labels.Label) {
+							lbl = l
+						})
+						samples, err := queryHead(ts-qryRange, ts, lbl)
+						if err != nil {
+							return false, err
+						}
+
+						if len(samples) != 1 {
+							return false, fmt.Errorf("expected 1 series, got %d", len(samples))
+						}
+
+						series := lbls.String()
+						expectSampleCnt := qryRange/step + 1
+						if expectSampleCnt != uint64(len(samples[series])) {
+							return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
+						}
+
+						for sampleIdx, sample := range samples[series] {
+							expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
+							if sample.T() != int64(expectedValue) {
+								return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
+							}
+							if sample.F() != float64(expectedValue) {
+								return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.F())
+							}
+						}
+
+						return true, nil
+					})
+				})
+			}
+
+			// Start the coordinator go routine.
+			g.Go(func() error {
+				currTs := startTs
+
+				defer func() {
+					// End of the test, close all channels to stop the workers.
+					for _, ch := range writerTsCh {
+						close(ch)
 					}
-					if sample.F() != float64(expectedValue) {
-						return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.F())
-					}
-				}
+					close(readerTsCh)
+				}()
 
-				return true, nil
+				// Wait until all workers are ready to start the test.
+				workerReadyWg.Wait()
+
+				return whileNotCanceled(func() (bool, error) {
+					// Send the current timestamp to each of the writers.
+					for _, ch := range writerTsCh {
+						select {
+						case ch <- currTs:
+						case <-ctx.Done():
+							return false, nil
+						}
+					}
+
+					// Once data for at least qryRange has been ingested, send the current timestamp to the readers.
+					if currTs > startTs+qryRange {
+						select {
+						case readerTsCh <- currTs - step:
+						case <-ctx.Done():
+							return false, nil
+						}
+					}
+
+					currTs += step
+					if currTs > endTs {
+						return false, nil
+					}
+
+					return true, nil
+				})
+			})
+
+			require.NoError(t, g.Wait())
+		})
+	}
-
-	// Start the coordinator go routine.
-	g.Go(func() error {
-		currTs := startTs
-
-		defer func() {
-			// End of the test, close all channels to stop the workers.
-			for _, ch := range writerTsCh {
-				close(ch)
-			}
-			close(readerTsCh)
-		}()
-
-		// Wait until all workers are ready to start the test.
-		workerReadyWg.Wait()
-		return whileNotCanceled(func() (bool, error) {
-			// Send the current timestamp to each of the writers.
-			for _, ch := range writerTsCh {
-				select {
-				case ch <- currTs:
-				case <-ctx.Done():
-					return false, nil
-				}
-			}
-
-			// Once data for at least has been ingested, send the current timestamp to the readers.
-			if currTs > startTs+qryRange {
-				select {
-				case readerTsCh <- currTs - step:
-				case <-ctx.Done():
-					return false, nil
-				}
-			}
-
-			currTs += step
-			if currTs > endTs {
-				return false, nil
-			}
-
-			return true, nil
-		})
-	})
-
-	require.NoError(t, g.Wait())
 }
 
 func TestHead_ReadWAL(t *testing.T) {
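
Reviewer note, not part of the patch: below is a minimal standalone sketch of the race the head_append.go change guards against. minTime and maxTime are initialized by two separate CAS calls, so a losing appender could previously return from initTime, load maxTime in the window before the winner swapped it, and wrongly reject an in-range sample (the ErrOutOfBounds symptom the new test exercises). Only minTime/maxTime/initTime mirror the real code; everything else here is illustrative.

// Standalone sketch (assumption-laden, not Prometheus code) of the init race
// that the head_append.go change guards against.
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"
)

type head struct {
	minTime, maxTime atomic.Int64
}

func (h *head) initTime(t int64) {
	if !h.minTime.CompareAndSwap(math.MaxInt64, t) {
		// Mirrors the patched code: a concurrent caller won the minTime CAS,
		// so spin until it also swaps maxTime, with a timeout as a safety
		// valve in case the winner is stalled.
		antiDeadlockTimeout := time.After(500 * time.Millisecond)
		for h.maxTime.Load() == math.MinInt64 {
			select {
			case <-antiDeadlockTimeout:
				return
			default:
			}
		}
		return
	}
	// Without the wait loop above, other appenders could observe the state
	// between this minTime CAS and the maxTime CAS below.
	h.maxTime.CompareAndSwap(math.MinInt64, t)
}

func main() {
	h := &head{}
	h.minTime.Store(math.MaxInt64) // Empty head: min/max hold sentinel values.
	h.maxTime.Store(math.MinInt64)

	var wg sync.WaitGroup
	for range 100 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.initTime(1000)
			// With the wait loop this only fires if the timeout expired;
			// without it, it reproduces the out-of-bounds symptom.
			if h.maxTime.Load() == math.MinInt64 {
				fmt.Println("observed uninitialized maxTime")
			}
		}()
	}
	wg.Wait()
}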