mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Merge 726aafa2be into 7769495a4a
This commit is contained in:
commit
e26673f8b8
12 changed files with 83 additions and 363 deletions
|
|
@ -33,7 +33,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/index"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
|
|
@ -297,12 +296,12 @@ func writeMetaFile(logger *slog.Logger, dir string, meta *BlockMeta) (int64, err
|
|||
|
||||
n, err := f.Write(jsonMeta)
|
||||
if err != nil {
|
||||
return 0, tsdb_errors.NewMulti(err, f.Close()).Err()
|
||||
return 0, errors.Join(err, f.Close())
|
||||
}
|
||||
|
||||
// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
|
||||
if err := f.Sync(); err != nil {
|
||||
return 0, tsdb_errors.NewMulti(err, f.Close()).Err()
|
||||
return 0, errors.Join(err, f.Close())
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return 0, err
|
||||
|
|
@ -344,7 +343,7 @@ func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDeco
|
|||
var closers []io.Closer
|
||||
defer func() {
|
||||
if err != nil {
|
||||
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
|
||||
err = errors.Join(err, closeAll(closers))
|
||||
}
|
||||
}()
|
||||
meta, sizeMeta, err := readMetaFile(dir)
|
||||
|
|
@ -398,11 +397,11 @@ func (pb *Block) Close() error {
|
|||
|
||||
pb.pendingReaders.Wait()
|
||||
|
||||
return tsdb_errors.NewMulti(
|
||||
return errors.Join(
|
||||
pb.chunkr.Close(),
|
||||
pb.indexr.Close(),
|
||||
pb.tombstones.Close(),
|
||||
).Err()
|
||||
)
|
||||
}
|
||||
|
||||
func (pb *Block) String() string {
|
||||
|
|
|
|||
|
|
@ -777,7 +777,7 @@ func sequenceFiles(dir string) ([]string, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
// closeAll closes all given closers while recording error in MultiError.
|
||||
// closeAll closes all given closers while recording all errors.
|
||||
func closeAll(cs []io.Closer) error {
|
||||
var errs []error
|
||||
for _, c := range cs {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/index"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
|
|
@ -572,16 +571,16 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
|
|||
return []ulid.ULID{uid}, nil
|
||||
}
|
||||
|
||||
errs := tsdb_errors.NewMulti(err)
|
||||
errs := []error{err}
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
for _, b := range bs {
|
||||
if err := b.setCompactionFailed(); err != nil {
|
||||
errs.Add(fmt.Errorf("setting compaction failed for block: %s: %w", b.Dir(), err))
|
||||
errs = append(errs, fmt.Errorf("setting compaction failed for block: %s: %w", b.Dir(), err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errs.Err()
|
||||
return nil, errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error) {
|
||||
|
|
@ -661,7 +660,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
|
|||
tmp := dir + tmpForCreationBlockDirSuffix
|
||||
var closers []io.Closer
|
||||
defer func(t time.Time) {
|
||||
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
|
||||
err = errors.Join(err, closeAll(closers))
|
||||
|
||||
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
|
||||
if err := os.RemoveAll(tmp); err != nil {
|
||||
|
|
@ -718,13 +717,13 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
|
|||
// though these are covered under defer. This is because in Windows,
|
||||
// you cannot delete these unless they are closed and the defer is to
|
||||
// make sure they are closed if the function exits due to an error above.
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
for _, w := range closers {
|
||||
errs.Add(w.Close())
|
||||
errs = append(errs, w.Close())
|
||||
}
|
||||
closers = closers[:0] // Avoid closing the writers twice in the defer.
|
||||
if errs.Err() != nil {
|
||||
return errs.Err()
|
||||
if err := errors.Join(errs...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Populated block is empty, so exit early.
|
||||
|
|
@ -803,11 +802,9 @@ func (DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compact
|
|||
overlapping bool
|
||||
)
|
||||
defer func() {
|
||||
errs := tsdb_errors.NewMulti(err)
|
||||
if cerr := tsdb_errors.CloseAll(closers); cerr != nil {
|
||||
errs.Add(fmt.Errorf("close: %w", cerr))
|
||||
if cerr := closeAll(closers); cerr != nil {
|
||||
err = errors.Join(err, fmt.Errorf("close: %w", cerr))
|
||||
}
|
||||
err = errs.Err()
|
||||
metrics.PopulatingBlocks.Set(0)
|
||||
}()
|
||||
metrics.PopulatingBlocks.Set(1)
|
||||
|
|
|
|||
|
|
@ -1421,7 +1421,6 @@ func TestCancelCompactions(t *testing.T) {
|
|||
|
||||
// Make sure that no blocks were marked as compaction failed.
|
||||
// This checks that the `context.Canceled` error is properly checked at all levels:
|
||||
// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
|
||||
// - callers should check with errors.Is() instead of ==.
|
||||
readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", promslog.NewNopLogger())
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
67
tsdb/db.go
67
tsdb/db.go
|
|
@ -41,7 +41,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
|
|
@ -538,11 +537,9 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
|
|||
return err
|
||||
}
|
||||
defer func() {
|
||||
errs := tsdb_errors.NewMulti(returnErr)
|
||||
if err := head.Close(); err != nil {
|
||||
errs.Add(fmt.Errorf("closing Head: %w", err))
|
||||
returnErr = errors.Join(returnErr, fmt.Errorf("closing Head: %w", err))
|
||||
}
|
||||
returnErr = errs.Err()
|
||||
}()
|
||||
// Set the min valid time for the ingested wal samples
|
||||
// to be no lower than the maxt of the last block.
|
||||
|
|
@ -697,13 +694,13 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
|
|||
db.logger.Warn("Closing block failed", "err", err, "block", b)
|
||||
}
|
||||
}
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
for ulid, err := range corrupted {
|
||||
if err != nil {
|
||||
errs.Add(fmt.Errorf("corrupted block %s: %w", ulid.String(), err))
|
||||
errs = append(errs, fmt.Errorf("corrupted block %s: %w", ulid.String(), err))
|
||||
}
|
||||
}
|
||||
return nil, errs.Err()
|
||||
return nil, errors.Join(errs...)
|
||||
}
|
||||
|
||||
if len(loadable) == 0 {
|
||||
|
|
@ -814,7 +811,7 @@ func (db *DBReadOnly) Close() error {
|
|||
}
|
||||
close(db.closed)
|
||||
|
||||
return tsdb_errors.CloseAll(db.closers)
|
||||
return closeAll(db.closers)
|
||||
}
|
||||
|
||||
// Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
|
||||
|
|
@ -934,11 +931,9 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
|
|||
}
|
||||
|
||||
close(db.donec) // DB is never run if it was an error, so close this channel here.
|
||||
errs := tsdb_errors.NewMulti(returnedErr)
|
||||
if err := db.Close(); err != nil {
|
||||
errs.Add(fmt.Errorf("close DB after failed startup: %w", err))
|
||||
returnedErr = errors.Join(returnedErr, fmt.Errorf("close DB after failed startup: %w", err))
|
||||
}
|
||||
returnedErr = errs.Err()
|
||||
}()
|
||||
|
||||
if db.blocksToDelete == nil {
|
||||
|
|
@ -1392,11 +1387,9 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
|
|||
|
||||
lastBlockMaxt := int64(math.MinInt64)
|
||||
defer func() {
|
||||
errs := tsdb_errors.NewMulti(returnErr)
|
||||
if err := db.head.truncateWAL(lastBlockMaxt); err != nil {
|
||||
errs.Add(fmt.Errorf("WAL truncation in Compact defer: %w", err))
|
||||
returnErr = errors.Join(returnErr, fmt.Errorf("WAL truncation in Compact defer: %w", err))
|
||||
}
|
||||
returnErr = errs.Err()
|
||||
}()
|
||||
|
||||
start := time.Now()
|
||||
|
|
@ -1521,13 +1514,13 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
|
|||
return fmt.Errorf("compact ooo head: %w", err)
|
||||
}
|
||||
if err := db.reloadBlocks(); err != nil {
|
||||
errs := tsdb_errors.NewMulti(err)
|
||||
errs := []error{err}
|
||||
for _, uid := range ulids {
|
||||
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
|
||||
errs.Add(errRemoveAll)
|
||||
errs = append(errs, errRemoveAll)
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("reloadBlocks blocks after failed compact ooo head: %w", errs.Err())
|
||||
return fmt.Errorf("reloadBlocks blocks after failed compact ooo head: %w", errors.Join(errs...))
|
||||
}
|
||||
|
||||
lastWBLFile, minOOOMmapRef := oooHead.LastWBLFile(), oooHead.LastMmapRef()
|
||||
|
|
@ -1612,13 +1605,15 @@ func (db *DB) compactHead(head *RangeHead) error {
|
|||
}
|
||||
|
||||
if err := db.reloadBlocks(); err != nil {
|
||||
multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
|
||||
errs := []error{
|
||||
fmt.Errorf("reloadBlocks blocks: %w", err),
|
||||
}
|
||||
for _, uid := range uids {
|
||||
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
|
||||
multiErr.Add(fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
|
||||
errs = append(errs, fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
|
||||
}
|
||||
}
|
||||
return multiErr.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
|
||||
return fmt.Errorf("head memory truncate: %w", err)
|
||||
|
|
@ -1708,13 +1703,13 @@ func (db *DB) compactBlocks() (err error) {
|
|||
}
|
||||
|
||||
if err := db.reloadBlocks(); err != nil {
|
||||
errs := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
|
||||
errs := []error{fmt.Errorf("reloadBlocks blocks: %w", err)}
|
||||
for _, uid := range uids {
|
||||
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
|
||||
errs.Add(fmt.Errorf("delete persisted block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
|
||||
errs = append(errs, fmt.Errorf("delete persisted block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
|
||||
}
|
||||
}
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1794,13 +1789,13 @@ func (db *DB) reloadBlocks() (err error) {
|
|||
}
|
||||
}
|
||||
db.mtx.RUnlock()
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
for ulid, err := range corrupted {
|
||||
if err != nil {
|
||||
errs.Add(fmt.Errorf("corrupted block %s: %w", ulid.String(), err))
|
||||
errs = append(errs, fmt.Errorf("corrupted block %s: %w", ulid.String(), err))
|
||||
}
|
||||
}
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
@ -2172,11 +2167,14 @@ func (db *DB) Close() error {
|
|||
g.Go(pb.Close)
|
||||
}
|
||||
|
||||
errs := tsdb_errors.NewMulti(g.Wait(), db.locker.Release())
|
||||
if db.head != nil {
|
||||
errs.Add(db.head.Close())
|
||||
errs := []error{
|
||||
g.Wait(),
|
||||
db.locker.Release(),
|
||||
}
|
||||
return errs.Err()
|
||||
if db.head != nil {
|
||||
errs = append(errs, db.head.Close())
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// DisableCompactions disables auto compactions.
|
||||
|
|
@ -2557,3 +2555,12 @@ func exponential(d, minD, maxD time.Duration) time.Duration {
|
|||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// closeAll closes all given closers while recording all errors.
|
||||
func closeAll(cs []io.Closer) error {
|
||||
var errs []error
|
||||
for _, c := range cs {
|
||||
errs = append(errs, c.Close())
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,109 +0,0 @@
|
|||
// Copyright The Prometheus Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package errors
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// multiError type allows combining multiple errors into one.
|
||||
type multiError []error
|
||||
|
||||
// NewMulti returns multiError with provided errors added if not nil.
|
||||
func NewMulti(errs ...error) multiError { //nolint:revive // unexported-return
|
||||
m := multiError{}
|
||||
m.Add(errs...)
|
||||
return m
|
||||
}
|
||||
|
||||
// Add adds single or many errors to the error list. Each error is added only if not nil.
|
||||
// If the error is a nonNilMultiError type, the errors inside nonNilMultiError are added to the main multiError.
|
||||
func (es *multiError) Add(errs ...error) {
|
||||
for _, err := range errs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
var merr nonNilMultiError
|
||||
if errors.As(err, &merr) {
|
||||
*es = append(*es, merr.errs...)
|
||||
continue
|
||||
}
|
||||
*es = append(*es, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Err returns the error list as an error or nil if it is empty.
|
||||
func (es multiError) Err() error {
|
||||
if len(es) == 0 {
|
||||
return nil
|
||||
}
|
||||
return nonNilMultiError{errs: es}
|
||||
}
|
||||
|
||||
// nonNilMultiError implements the error interface, and it represents
|
||||
// multiError with at least one error inside it.
|
||||
// This type is needed to make sure that nil is returned when no error is combined in multiError for err != nil
|
||||
// check to work.
|
||||
type nonNilMultiError struct {
|
||||
errs multiError
|
||||
}
|
||||
|
||||
// Error returns a concatenated string of the contained errors.
|
||||
func (es nonNilMultiError) Error() string {
|
||||
var buf bytes.Buffer
|
||||
|
||||
if len(es.errs) > 1 {
|
||||
fmt.Fprintf(&buf, "%d errors: ", len(es.errs))
|
||||
}
|
||||
|
||||
for i, err := range es.errs {
|
||||
if i != 0 {
|
||||
buf.WriteString("; ")
|
||||
}
|
||||
buf.WriteString(err.Error())
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// Is attempts to match the provided error against errors in the error list.
|
||||
//
|
||||
// This function allows errors.Is to traverse the values stored in the MultiError.
|
||||
// It returns true if any of the errors in the list match the target.
|
||||
func (es nonNilMultiError) Is(target error) bool {
|
||||
for _, err := range es.errs {
|
||||
if errors.Is(err, target) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Unwrap returns the list of errors contained in the multiError.
|
||||
func (es nonNilMultiError) Unwrap() []error {
|
||||
return es.errs
|
||||
}
|
||||
|
||||
// CloseAll closes all given closers while recording error in MultiError.
|
||||
func CloseAll(cs []io.Closer) error {
|
||||
errs := NewMulti()
|
||||
for _, c := range cs {
|
||||
errs.Add(c.Close())
|
||||
}
|
||||
return errs.Err()
|
||||
}
|
||||
|
|
@ -1,172 +0,0 @@
|
|||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package errors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMultiError_Is(t *testing.T) {
|
||||
customErr1 := errors.New("test error 1")
|
||||
customErr2 := errors.New("test error 2")
|
||||
|
||||
testCases := map[string]struct {
|
||||
sourceErrors []error
|
||||
target error
|
||||
is bool
|
||||
}{
|
||||
"adding a context cancellation doesn't lose the information": {
|
||||
sourceErrors: []error{context.Canceled},
|
||||
target: context.Canceled,
|
||||
is: true,
|
||||
},
|
||||
"adding multiple context cancellations doesn't lose the information": {
|
||||
sourceErrors: []error{context.Canceled, context.Canceled},
|
||||
target: context.Canceled,
|
||||
is: true,
|
||||
},
|
||||
"adding wrapped context cancellations doesn't lose the information": {
|
||||
sourceErrors: []error{errors.New("some error"), fmt.Errorf("some message: %w", context.Canceled)},
|
||||
target: context.Canceled,
|
||||
is: true,
|
||||
},
|
||||
"adding a nil error doesn't lose the information": {
|
||||
sourceErrors: []error{errors.New("some error"), fmt.Errorf("some message: %w", context.Canceled), nil},
|
||||
target: context.Canceled,
|
||||
is: true,
|
||||
},
|
||||
"errors with no context cancellation error are not a context canceled error": {
|
||||
sourceErrors: []error{errors.New("first error"), errors.New("second error")},
|
||||
target: context.Canceled,
|
||||
is: false,
|
||||
},
|
||||
"no errors are not a context canceled error": {
|
||||
sourceErrors: nil,
|
||||
target: context.Canceled,
|
||||
is: false,
|
||||
},
|
||||
"no errors are a nil error": {
|
||||
sourceErrors: nil,
|
||||
target: nil,
|
||||
is: true,
|
||||
},
|
||||
"nested multi-error contains customErr1": {
|
||||
sourceErrors: []error{
|
||||
customErr1,
|
||||
NewMulti(
|
||||
customErr2,
|
||||
fmt.Errorf("wrapped %w", context.Canceled),
|
||||
).Err(),
|
||||
},
|
||||
target: customErr1,
|
||||
is: true,
|
||||
},
|
||||
"nested multi-error contains customErr2": {
|
||||
sourceErrors: []error{
|
||||
customErr1,
|
||||
NewMulti(
|
||||
customErr2,
|
||||
fmt.Errorf("wrapped %w", context.Canceled),
|
||||
).Err(),
|
||||
},
|
||||
target: customErr2,
|
||||
is: true,
|
||||
},
|
||||
"nested multi-error contains wrapped context.Canceled": {
|
||||
sourceErrors: []error{
|
||||
customErr1,
|
||||
NewMulti(
|
||||
customErr2,
|
||||
fmt.Errorf("wrapped %w", context.Canceled),
|
||||
).Err(),
|
||||
},
|
||||
target: context.Canceled,
|
||||
is: true,
|
||||
},
|
||||
"nested multi-error does not contain context.DeadlineExceeded": {
|
||||
sourceErrors: []error{
|
||||
customErr1,
|
||||
NewMulti(
|
||||
customErr2,
|
||||
fmt.Errorf("wrapped %w", context.Canceled),
|
||||
).Err(),
|
||||
},
|
||||
target: context.DeadlineExceeded,
|
||||
is: false, // make sure we still return false in valid cases
|
||||
},
|
||||
}
|
||||
|
||||
for testName, testCase := range testCases {
|
||||
t.Run(testName, func(t *testing.T) {
|
||||
mErr := NewMulti(testCase.sourceErrors...)
|
||||
require.Equal(t, testCase.is, errors.Is(mErr.Err(), testCase.target))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultiError_As(t *testing.T) {
|
||||
tE1 := testError{"error cause 1"}
|
||||
tE2 := testError{"error cause 2"}
|
||||
var target testError
|
||||
testCases := map[string]struct {
|
||||
sourceErrors []error
|
||||
target error
|
||||
as bool
|
||||
}{
|
||||
"MultiError containing only a testError can be cast to that testError": {
|
||||
sourceErrors: []error{tE1},
|
||||
target: tE1,
|
||||
as: true,
|
||||
},
|
||||
"MultiError containing multiple testErrors can be cast to the first testError added": {
|
||||
sourceErrors: []error{tE1, tE2},
|
||||
target: tE1,
|
||||
as: true,
|
||||
},
|
||||
"MultiError containing multiple errors can be cast to the first testError added": {
|
||||
sourceErrors: []error{context.Canceled, tE1, context.DeadlineExceeded, tE2},
|
||||
target: tE1,
|
||||
as: true,
|
||||
},
|
||||
"MultiError not containing a testError cannot be cast to a testError": {
|
||||
sourceErrors: []error{context.Canceled, context.DeadlineExceeded},
|
||||
as: false,
|
||||
},
|
||||
}
|
||||
|
||||
for testName, testCase := range testCases {
|
||||
t.Run(testName, func(t *testing.T) {
|
||||
mErr := NewMulti(testCase.sourceErrors...).Err()
|
||||
if testCase.as {
|
||||
require.ErrorAs(t, mErr, &target)
|
||||
require.Equal(t, testCase.target, target)
|
||||
} else {
|
||||
require.NotErrorAs(t, mErr, &target)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testError struct {
|
||||
cause string
|
||||
}
|
||||
|
||||
func (e testError) Error() string {
|
||||
return fmt.Sprintf("testError[cause: %s]", e.cause)
|
||||
}
|
||||
13
tsdb/head.go
13
tsdb/head.go
|
|
@ -40,7 +40,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/index"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
|
|
@ -1812,17 +1811,17 @@ func (h *Head) Close() error {
|
|||
// takes samples from most recent head chunk.
|
||||
h.mmapHeadChunks()
|
||||
|
||||
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
|
||||
errs := h.chunkDiskMapper.Close()
|
||||
if h.wal != nil {
|
||||
errs.Add(h.wal.Close())
|
||||
errs = errors.Join(errs, h.wal.Close())
|
||||
}
|
||||
if h.wbl != nil {
|
||||
errs.Add(h.wbl.Close())
|
||||
errs = errors.Join(errs, h.wbl.Close())
|
||||
}
|
||||
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
|
||||
errs.Add(h.performChunkSnapshot())
|
||||
if errs == nil && h.opts.EnableMemorySnapshotOnShutdown {
|
||||
errs = errors.Join(errs, h.performChunkSnapshot())
|
||||
}
|
||||
return errs.Err()
|
||||
return errs
|
||||
}
|
||||
|
||||
// String returns an human readable representation of the TSDB head. It's important to
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/encoding"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
|
|
@ -1536,7 +1535,7 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
|
|||
return err
|
||||
}
|
||||
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
for _, fi := range files {
|
||||
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
|
||||
continue
|
||||
|
|
@ -1559,11 +1558,11 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
|
|||
|
||||
if idx < maxIndex || (idx == maxIndex && offset < maxOffset) {
|
||||
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
|
||||
errs.Add(err)
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned,
|
||||
|
|
@ -1751,14 +1750,14 @@ Outer:
|
|||
}
|
||||
|
||||
close(errChan)
|
||||
merr := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
if loopErr != nil {
|
||||
merr.Add(fmt.Errorf("decode loop: %w", loopErr))
|
||||
errs = append(errs, fmt.Errorf("decode loop: %w", loopErr))
|
||||
}
|
||||
for err := range errChan {
|
||||
merr.Add(fmt.Errorf("record processing: %w", err))
|
||||
errs = append(errs, fmt.Errorf("record processing: %w", err))
|
||||
}
|
||||
if err := merr.Err(); err != nil {
|
||||
if err := errors.Join(errs...); err != nil {
|
||||
return -1, -1, nil, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/index"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
|
|
@ -92,13 +91,13 @@ func (q *blockBaseQuerier) Close() error {
|
|||
return errors.New("block querier already closed")
|
||||
}
|
||||
|
||||
errs := tsdb_errors.NewMulti(
|
||||
errs := []error{
|
||||
q.index.Close(),
|
||||
q.chunks.Close(),
|
||||
q.tombstones.Close(),
|
||||
)
|
||||
}
|
||||
q.closed = true
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
type blockQuerier struct {
|
||||
|
|
|
|||
|
|
@ -15,13 +15,13 @@ package tsdb
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
|
|
@ -82,20 +82,22 @@ func repairBadIndexVersion(logger *slog.Logger, dir string) error {
|
|||
|
||||
// Set the 5th byte to 2 to indicate the correct file format version.
|
||||
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
|
||||
errs := tsdb_errors.NewMulti(
|
||||
fmt.Errorf("rewrite of index.repaired for block dir: %v: %w", d, err))
|
||||
if err := repl.Close(); err != nil {
|
||||
errs.Add(fmt.Errorf("close: %w", err))
|
||||
errs := []error{
|
||||
fmt.Errorf("rewrite of index.repaired for block dir: %v: %w", d, err),
|
||||
}
|
||||
return errs.Err()
|
||||
if err := repl.Close(); err != nil {
|
||||
errs = append(errs, fmt.Errorf("close: %w", err))
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
if err := repl.Sync(); err != nil {
|
||||
errs := tsdb_errors.NewMulti(
|
||||
fmt.Errorf("sync of index.repaired for block dir: %v: %w", d, err))
|
||||
if err := repl.Close(); err != nil {
|
||||
errs.Add(fmt.Errorf("close: %w", err))
|
||||
errs := []error{
|
||||
fmt.Errorf("sync of index.repaired for block dir: %v: %w", d, err),
|
||||
}
|
||||
return errs.Err()
|
||||
if err := repl.Close(); err != nil {
|
||||
errs = append(errs, fmt.Errorf("close: %w", err))
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
if err := repl.Close(); err != nil {
|
||||
return fmt.Errorf("close repaired index for block dir: %v: %w", d, err)
|
||||
|
|
|
|||
|
|
@ -550,7 +550,7 @@ func TestReaderData(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// closeAll closes all given closers while recording error in MultiError.
|
||||
// closeAll closes all given closers while recording all errors.
|
||||
func closeAll(cs []io.Closer) error {
|
||||
var errs []error
|
||||
for _, c := range cs {
|
||||
|
|
|
|||
Loading…
Reference in a new issue