mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Refactor various tsdb sub-packages (#17847)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Migrate various tsdb related packages from `tsdb/errors` to the standard library `errors` package. Signed-off-by: SuperQ <superq@gmail.com>
This commit is contained in:
parent
f331aa6d14
commit
72a23934ad
8 changed files with 54 additions and 42 deletions
|
|
@ -37,7 +37,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
|
|
@ -798,7 +797,7 @@ func (db *DB) Close() error {
|
|||
|
||||
db.metrics.Unregister()
|
||||
|
||||
return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
|
||||
return errors.Join(db.locker.Release(), db.wal.Close())
|
||||
}
|
||||
|
||||
type appenderBase struct {
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import (
|
|||
"strconv"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
|
|
@ -431,13 +430,15 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all
|
|||
}
|
||||
defer func() {
|
||||
if returnErr != nil {
|
||||
errs := tsdb_errors.NewMulti(returnErr)
|
||||
errs := []error{
|
||||
returnErr,
|
||||
}
|
||||
if f != nil {
|
||||
errs.Add(f.Close())
|
||||
errs = append(errs, f.Close())
|
||||
}
|
||||
// Calling RemoveAll on a non-existent file does not return error.
|
||||
errs.Add(os.RemoveAll(ptmp))
|
||||
returnErr = errs.Err()
|
||||
errs = append(errs, os.RemoveAll(ptmp))
|
||||
returnErr = errors.Join(errs...)
|
||||
}
|
||||
}()
|
||||
if allocSize > 0 {
|
||||
|
|
@ -665,10 +666,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
|
|||
for _, fn := range files {
|
||||
f, err := fileutil.OpenMmapFile(fn)
|
||||
if err != nil {
|
||||
return nil, tsdb_errors.NewMulti(
|
||||
return nil, errors.Join(
|
||||
fmt.Errorf("mmap files: %w", err),
|
||||
tsdb_errors.CloseAll(cs),
|
||||
).Err()
|
||||
closeAll(cs),
|
||||
)
|
||||
}
|
||||
cs = append(cs, f)
|
||||
bs = append(bs, realByteSlice(f.Bytes()))
|
||||
|
|
@ -676,16 +677,16 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
|
|||
|
||||
reader, err := newReader(bs, cs, pool)
|
||||
if err != nil {
|
||||
return nil, tsdb_errors.NewMulti(
|
||||
return nil, errors.Join(
|
||||
err,
|
||||
tsdb_errors.CloseAll(cs),
|
||||
).Err()
|
||||
closeAll(cs),
|
||||
)
|
||||
}
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (s *Reader) Close() error {
|
||||
return tsdb_errors.CloseAll(s.cs)
|
||||
return closeAll(s.cs)
|
||||
}
|
||||
|
||||
// Size returns the size of the chunks.
|
||||
|
|
@ -774,3 +775,12 @@ func sequenceFiles(dir string) ([]string, error) {
|
|||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// closeAll closes all given closers while recording error in MultiError.
|
||||
func closeAll(cs []io.Closer) error {
|
||||
var errs []error
|
||||
for _, c := range cs {
|
||||
errs = append(errs, c.Close())
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ import (
|
|||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
|
|
@ -304,7 +303,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
|||
cdm.closers = map[int]io.Closer{}
|
||||
defer func() {
|
||||
if returnErr != nil {
|
||||
returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
|
||||
returnErr = errors.Join(returnErr, closeAllFromMap(cdm.closers))
|
||||
|
||||
cdm.mmappedChunkFiles = nil
|
||||
cdm.closers = nil
|
||||
|
|
@ -614,7 +613,7 @@ func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
|
|||
// The file should not be closed if there is no error,
|
||||
// its kept open in the ChunkDiskMapper.
|
||||
if returnErr != nil {
|
||||
returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
|
||||
returnErr = errors.Join(returnErr, newFile.Close())
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -970,7 +969,7 @@ func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
|
|||
}
|
||||
cdm.readPathMtx.RUnlock()
|
||||
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
// Cut a new file only if the current file has some chunks.
|
||||
if cdm.curFileSize() > HeadChunkFileHeaderSize {
|
||||
// There is a known race condition here because between the check of curFileSize() and the call to CutNewFile()
|
||||
|
|
@ -979,7 +978,7 @@ func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
|
|||
cdm.CutNewFile()
|
||||
}
|
||||
pendingDeletes, err := cdm.deleteFiles(removedFiles)
|
||||
errs.Add(err)
|
||||
errs = append(errs, err)
|
||||
|
||||
if len(chkFileIndices) == len(removedFiles) {
|
||||
// All files were deleted. Reset the current sequence.
|
||||
|
|
@ -1003,7 +1002,7 @@ func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
|
|||
cdm.evtlPosMtx.Unlock()
|
||||
}
|
||||
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// deleteFiles deletes the given file sequences in order of the sequence.
|
||||
|
|
@ -1098,23 +1097,23 @@ func (cdm *ChunkDiskMapper) Close() error {
|
|||
}
|
||||
cdm.closed = true
|
||||
|
||||
errs := tsdb_errors.NewMulti(
|
||||
errs := []error{
|
||||
closeAllFromMap(cdm.closers),
|
||||
cdm.finalizeCurFile(),
|
||||
cdm.dir.Close(),
|
||||
)
|
||||
}
|
||||
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
|
||||
cdm.closers = map[int]io.Closer{}
|
||||
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func closeAllFromMap(cs map[int]io.Closer) error {
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
for _, c := range cs {
|
||||
errs.Add(c.Close())
|
||||
errs = append(errs, c.Close())
|
||||
}
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
const inBufferShards = 128 // 128 is a randomly chosen number.
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/tsdb/encoding"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
|
|
@ -1007,10 +1006,10 @@ func NewFileReader(path string, decoder PostingsDecoder) (*Reader, error) {
|
|||
}
|
||||
r, err := newReader(realByteSlice(f.Bytes()), f, decoder)
|
||||
if err != nil {
|
||||
return nil, tsdb_errors.NewMulti(
|
||||
return nil, errors.Join(
|
||||
err,
|
||||
f.Close(),
|
||||
).Err()
|
||||
)
|
||||
}
|
||||
|
||||
return r, nil
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ import (
|
|||
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/encoding"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
|
|
@ -128,7 +127,7 @@ func WriteFile(logger *slog.Logger, dir string, tr Reader) (int64, error) {
|
|||
size += n
|
||||
|
||||
if err := f.Sync(); err != nil {
|
||||
return 0, tsdb_errors.NewMulti(err, f.Close()).Err()
|
||||
return 0, errors.Join(err, f.Close())
|
||||
}
|
||||
|
||||
if err = f.Close(); err != nil {
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
|
|
@ -94,10 +93,9 @@ func (l *DirLocker) Release() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
errs := tsdb_errors.NewMulti()
|
||||
errs.Add(l.releaser.Release())
|
||||
errs.Add(os.Remove(l.path))
|
||||
releaserErr := l.releaser.Release()
|
||||
removeErr := os.Remove(l.path)
|
||||
|
||||
l.releaser = nil
|
||||
return errs.Err()
|
||||
return errors.Join(releaserErr, removeErr)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ import (
|
|||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
|
|
@ -71,14 +70,14 @@ func DeleteCheckpoints(dir string, maxIndex int) error {
|
|||
return err
|
||||
}
|
||||
|
||||
errs := tsdb_errors.NewMulti()
|
||||
var errs []error
|
||||
for _, checkpoint := range checkpoints {
|
||||
if checkpoint.index >= maxIndex {
|
||||
break
|
||||
}
|
||||
errs.Add(os.RemoveAll(filepath.Join(dir, checkpoint.name)))
|
||||
errs = append(errs, os.RemoveAll(filepath.Join(dir, checkpoint.name)))
|
||||
}
|
||||
return errs.Err()
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// CheckpointPrefix is the prefix used for checkpoint files.
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
|
|
@ -32,7 +33,6 @@ import (
|
|||
"github.com/prometheus/common/promslog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/util/compression"
|
||||
)
|
||||
|
||||
|
|
@ -287,7 +287,7 @@ func (m *multiReadCloser) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
func (m *multiReadCloser) Close() error {
|
||||
return tsdb_errors.NewMulti(tsdb_errors.CloseAll(m.closers)).Err()
|
||||
return errors.Join(closeAll(m.closers))
|
||||
}
|
||||
|
||||
func allSegments(dir string) (io.ReadCloser, error) {
|
||||
|
|
@ -549,3 +549,12 @@ func TestReaderData(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// closeAll closes all given closers while recording error in MultiError.
|
||||
func closeAll(cs []io.Closer) error {
|
||||
var errs []error
|
||||
for _, c := range cs {
|
||||
errs = append(errs, c.Close())
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue