feat(tsdb): new AppenderV2 and AtST interface for chunks

No implementation yet. Just to test the shape of the interface.
AtST is implemented for trivial cases, anything else is hard coded
to return 0.

Ref: https://github.com/prometheus/prometheus/issues/17791

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2026-01-12 13:14:54 +01:00
parent c7bc56cf6c
commit 70bc06718d
No known key found for this signature in database
GPG key ID: 47A8F9CE80FD7C7F
29 changed files with 295 additions and 145 deletions

View file

@ -3747,12 +3747,12 @@ func TestHistogramRateWithFloatStaleness(t *testing.T) {
recoded bool
)
newc, recoded, app, err = app.AppendHistogram(nil, 0, h1.Copy(), false)
newc, recoded, app, err = app.AppendHistogram(nil, 0, 0, h1.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)
newc, recoded, _, err = app.AppendHistogram(nil, 10, h1.Copy(), false)
newc, recoded, _, err = app.AppendHistogram(nil, 0, 10, h1.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)
@ -3762,7 +3762,7 @@ func TestHistogramRateWithFloatStaleness(t *testing.T) {
app, err = c2.Appender()
require.NoError(t, err)
app.Append(20, math.Float64frombits(value.StaleNaN))
app.Append(0, 20, math.Float64frombits(value.StaleNaN))
// Make a chunk with two normal histograms that have zero value.
h2 := histogram.Histogram{
@ -3773,12 +3773,12 @@ func TestHistogramRateWithFloatStaleness(t *testing.T) {
app, err = c3.Appender()
require.NoError(t, err)
newc, recoded, app, err = app.AppendHistogram(nil, 30, h2.Copy(), false)
newc, recoded, app, err = app.AppendHistogram(nil, 0, 30, h2.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)
newc, recoded, _, err = app.AppendHistogram(nil, 40, h2.Copy(), false)
newc, recoded, _, err = app.AppendHistogram(nil, 0, 40, h2.Copy(), false)
require.NoError(t, err)
require.False(t, recoded)
require.Nil(t, newc)

View file

@ -235,4 +235,6 @@ func (h *histogramIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64,
func (*histogramIterator) AtT() int64 { return 0 }
func (*histogramIterator) AtST() int64 { return 0 }
func (*histogramIterator) Err() error { return nil }

View file

@ -487,6 +487,11 @@ func (ssi *storageSeriesIterator) AtT() int64 {
return ssi.currT
}
// TODO(krajorama): implement AtST.
func (*storageSeriesIterator) AtST() int64 {
return 0
}
func (ssi *storageSeriesIterator) Next() chunkenc.ValueType {
if ssi.currH != nil {
ssi.iHistograms++

View file

@ -697,12 +697,14 @@ func TestQueryForStateSeries(t *testing.T) {
{
selectMockFunction: func(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.TestSeriesSet(storage.MockSeries(
nil,
[]int64{1, 2, 3},
[]float64{1, 2, 3},
[]string{"__name__", "ALERTS_FOR_STATE", "alertname", "TestRule", "severity", "critical"},
))
},
expectedSeries: storage.MockSeries(
nil,
[]int64{1, 2, 3},
[]float64{1, 2, 3},
[]string{"__name__", "ALERTS_FOR_STATE", "alertname", "TestRule", "severity", "critical"},

View file

@ -171,6 +171,11 @@ func (s fSample) T() int64 {
return s.t
}
// TODO(krajorama): implement ST.
func (fSample) ST() int64 {
return 0
}
func (s fSample) F() float64 {
return s.f
}
@ -200,6 +205,11 @@ func (s hSample) T() int64 {
return s.t
}
// TODO(krajorama): implement ST.
func (hSample) ST() int64 {
return 0
}
func (hSample) F() float64 {
panic("F() called for hSample")
}
@ -229,6 +239,11 @@ func (s fhSample) T() int64 {
return s.t
}
// TODO(krajorama): implement ST.
func (fhSample) ST() int64 {
return 0
}
func (fhSample) F() float64 {
panic("F() called for fhSample")
}

View file

@ -402,6 +402,10 @@ func (*mockSeriesIterator) AtT() int64 {
return 0 // Not really mocked.
}
func (*mockSeriesIterator) AtST() int64 {
return 0 // Not really mocked.
}
type fakeSeriesIterator struct {
nsamples int64
step int64
@ -428,6 +432,10 @@ func (it *fakeSeriesIterator) AtT() int64 {
return it.idx * it.step
}
func (*fakeSeriesIterator) AtST() int64 {
return 0 // No start timestamps in this fake iterator.
}
func (it *fakeSeriesIterator) Next() chunkenc.ValueType {
it.idx++
if it.idx >= it.nsamples {

View file

@ -473,9 +473,10 @@ type Series interface {
}
type mockSeries struct {
timestamps []int64
values []float64
labelSet []string
startTimestamps []int64
timestamps []int64
values []float64
labelSet []string
}
func (s mockSeries) Labels() labels.Labels {
@ -483,15 +484,19 @@ func (s mockSeries) Labels() labels.Labels {
}
func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return chunkenc.MockSeriesIterator(s.timestamps, s.values)
return chunkenc.MockSeriesIterator(s.startTimestamps, s.timestamps, s.values)
}
// MockSeries returns a series with custom timestamps, values and labelSet.
func MockSeries(timestamps []int64, values []float64, labelSet []string) Series {
// MockSeries returns a series with custom start timestamp, timestamps, values,
// and labelSet.
// Start timestamps is optional, pass nil or empty slice to indicate no start
// timestamps.
func MockSeries(startTimestamps, timestamps []int64, values []float64, labelSet []string) Series {
return mockSeries{
timestamps: timestamps,
values: values,
labelSet: labelSet,
startTimestamps: startTimestamps,
timestamps: timestamps,
values: values,
labelSet: labelSet,
}
}

View file

@ -23,7 +23,7 @@ import (
)
func TestMockSeries(t *testing.T) {
s := storage.MockSeries([]int64{1, 2, 3}, []float64{1, 2, 3}, []string{"__name__", "foo"})
s := storage.MockSeries(nil, []int64{1, 2, 3}, []float64{1, 2, 3}, []string{"__name__", "foo"})
it := s.Iterator(nil)
ts := []int64{}
vs := []float64{}
@ -35,3 +35,20 @@ func TestMockSeries(t *testing.T) {
require.Equal(t, []int64{1, 2, 3}, ts)
require.Equal(t, []float64{1, 2, 3}, vs)
}
func TestMockSeriesWithST(t *testing.T) {
s := storage.MockSeries([]int64{0, 1, 2}, []int64{1, 2, 3}, []float64{1, 2, 3}, []string{"__name__", "foo"})
it := s.Iterator(nil)
ts := []int64{}
vs := []float64{}
st := []int64{}
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
ts = append(ts, t)
vs = append(vs, v)
st = append(st, it.AtST())
}
require.Equal(t, []int64{1, 2, 3}, ts)
require.Equal(t, []float64{1, 2, 3}, vs)
require.Equal(t, []int64{0, 1, 2}, st)
}

View file

@ -599,6 +599,13 @@ func (c *chainSampleIterator) AtT() int64 {
return c.curr.AtT()
}
func (c *chainSampleIterator) AtST() int64 {
if c.curr == nil {
panic("chainSampleIterator.AtST called before first .Next or after .Next returned false.")
}
return c.curr.AtST()
}
func (c *chainSampleIterator) Next() chunkenc.ValueType {
var (
currT int64

View file

@ -1716,6 +1716,10 @@ func (errIterator) AtT() int64 {
return 0
}
func (errIterator) AtST() int64 {
return 0
}
func (e errIterator) Err() error {
return e.err
}

View file

@ -564,6 +564,12 @@ func (c *concreteSeriesIterator) AtT() int64 {
return c.series.floats[c.floatsCur].Timestamp
}
// TODO(krajorama): implement AtST. Maybe. concreteSeriesIterator is used
// for turning query results into an iterable, but query results do not have ST.
func (*concreteSeriesIterator) AtST() int64 {
return 0
}
const noTS = int64(math.MaxInt64)
// Next implements chunkenc.Iterator.
@ -832,6 +838,11 @@ func (it *chunkedSeriesIterator) AtT() int64 {
return it.cur.AtT()
}
// TODO(krajorama): test AtST once we have a chunk format that provides ST.
func (it *chunkedSeriesIterator) AtST() int64 {
return it.cur.AtST()
}
func (it *chunkedSeriesIterator) Err() error {
return it.err
}

View file

@ -1146,7 +1146,7 @@ func buildTestChunks(t *testing.T) []prompb.Chunk {
minTimeMs := time
for j := range numSamplesPerTestChunk {
a.Append(time, float64(i+j))
a.Append(0, time, float64(i+j))
time += int64(1000)
}

View file

@ -138,6 +138,11 @@ func (it *listSeriesIterator) AtT() int64 {
return s.T()
}
func (it *listSeriesIterator) AtST() int64 {
s := it.samples.Get(it.idx)
return s.ST()
}
func (it *listSeriesIterator) Next() chunkenc.ValueType {
it.idx++
if it.idx >= it.samples.Len() {
@ -355,18 +360,20 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
lastType = typ
var (
t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
st, t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
)
switch typ {
case chunkenc.ValFloat:
t, v = seriesIter.At()
app.Append(t, v)
st = seriesIter.AtST()
app.Append(st, t, v)
case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram(nil)
newChk, recoded, app, err = app.AppendHistogram(nil, t, h, false)
st = seriesIter.AtST()
newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false)
if err != nil {
return errChunksIterator{err: err}
}
@ -381,7 +388,8 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
}
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram(nil)
newChk, recoded, app, err = app.AppendFloatHistogram(nil, t, fh, false)
st = seriesIter.AtST()
newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false)
if err != nil {
return errChunksIterator{err: err}
}

View file

@ -99,9 +99,9 @@ type Iterable interface {
Iterator(Iterator) Iterator
}
// Appender adds sample pairs to a chunk.
// Appender adds sample with start timestamp, timestamp, and value to a chunk.
type Appender interface {
Append(int64, float64)
Append(st, t int64, v float64)
// AppendHistogram and AppendFloatHistogram append a histogram sample to a histogram or float histogram chunk.
// Appending a histogram may require creating a completely new chunk or recoding (changing) the current chunk.
@ -114,8 +114,8 @@ type Appender interface {
// The returned bool isRecoded can be used to distinguish between the new Chunk c being a completely new Chunk
// or the current Chunk recoded to a new Chunk.
// The Appender app that can be used for the next append is always returned.
AppendHistogram(prev *HistogramAppender, t int64, h *histogram.Histogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)
AppendFloatHistogram(prev *FloatHistogramAppender, t int64, h *histogram.FloatHistogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)
AppendHistogram(prev *HistogramAppender, st, t int64, h *histogram.Histogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)
AppendFloatHistogram(prev *FloatHistogramAppender, st, t int64, h *histogram.FloatHistogram, appendOnly bool) (c Chunk, isRecoded bool, app Appender, err error)
}
// Iterator is a simple iterator that can only get the next value.
@ -151,6 +151,10 @@ type Iterator interface {
// AtT returns the current timestamp.
// Before the iterator has advanced, the behaviour is unspecified.
AtT() int64
// AtST returns the current start timestamp.
// Return 0 if the start timestamp is not implemented or not set.
// Before the iterator has advanced, the behaviour is unspecified.
AtST() int64
// Err returns the current error. It should be used only after the
// iterator is exhausted, i.e. `Next` or `Seek` have returned ValNone.
Err() error
@ -208,25 +212,30 @@ func (v ValueType) NewChunk() (Chunk, error) {
}
}
// MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values.
func MockSeriesIterator(timestamps []int64, values []float64) Iterator {
// MockSeriesIterator returns an iterator for a mock series with custom
// start timestamp, timestamps, and values.
// Start timestamps is optional, pass nil or empty slice to indicate no start
// timestamps.
func MockSeriesIterator(startTimestamps, timestamps []int64, values []float64) Iterator {
return &mockSeriesIterator{
timeStamps: timestamps,
values: values,
currIndex: -1,
startTimestamps: startTimestamps,
timestamps: timestamps,
values: values,
currIndex: -1,
}
}
type mockSeriesIterator struct {
timeStamps []int64
values []float64
currIndex int
timestamps []int64
startTimestamps []int64
values []float64
currIndex int
}
func (*mockSeriesIterator) Seek(int64) ValueType { return ValNone }
func (it *mockSeriesIterator) At() (int64, float64) {
return it.timeStamps[it.currIndex], it.values[it.currIndex]
return it.timestamps[it.currIndex], it.values[it.currIndex]
}
func (*mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
@ -238,11 +247,18 @@ func (*mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *
}
func (it *mockSeriesIterator) AtT() int64 {
return it.timeStamps[it.currIndex]
return it.timestamps[it.currIndex]
}
func (it *mockSeriesIterator) AtST() int64 {
if len(it.startTimestamps) == 0 {
return 0
}
return it.startTimestamps[it.currIndex]
}
func (it *mockSeriesIterator) Next() ValueType {
if it.currIndex < len(it.timeStamps)-1 {
if it.currIndex < len(it.timestamps)-1 {
it.currIndex++
return ValFloat
}
@ -268,8 +284,9 @@ func (nopIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogra
func (nopIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return math.MinInt64, nil
}
func (nopIterator) AtT() int64 { return math.MinInt64 }
func (nopIterator) Err() error { return nil }
func (nopIterator) AtT() int64 { return math.MinInt64 }
func (nopIterator) AtST() int64 { return 0 }
func (nopIterator) Err() error { return nil }
// Pool is used to create and reuse chunk references to avoid allocations.
type Pool interface {

View file

@ -65,7 +65,7 @@ func testChunk(t *testing.T, c Chunk) {
require.NoError(t, err)
}
app.Append(ts, v)
app.Append(0, ts, v)
exp = append(exp, pair{t: ts, v: v})
}
@ -226,7 +226,7 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
if j > 250 {
break
}
a.Append(p.t, p.v)
a.Append(0, p.t, p.v)
j++
}
}
@ -303,7 +303,7 @@ func benchmarkAppender(b *testing.B, deltas func() (int64, float64), newChunk fu
b.Fatalf("get appender: %s", err)
}
for _, p := range exp {
a.Append(p.t, p.v)
a.Append(0, p.t, p.v)
}
}
}

View file

@ -195,7 +195,7 @@ func (a *FloatHistogramAppender) NumSamples() int {
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (*FloatHistogramAppender) Append(int64, float64) {
func (*FloatHistogramAppender) Append(int64, int64, float64) {
panic("appended a float sample to a histogram chunk")
}
@ -682,11 +682,11 @@ func (*FloatHistogramAppender) recodeHistogram(
}
}
func (*FloatHistogramAppender) AppendHistogram(*HistogramAppender, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
func (*FloatHistogramAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float histogram chunk")
}
func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppender, t int64, h *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppender, _, t int64, h *histogram.FloatHistogram, appendOnly bool) (Chunk, bool, Appender, error) {
if a.NumSamples() == 0 {
a.appendFloatHistogram(t, h)
if h.CounterResetHint == histogram.GaugeType {
@ -938,6 +938,10 @@ func (it *floatHistogramIterator) AtT() int64 {
return it.t
}
func (*floatHistogramIterator) AtST() int64 {
return 0
}
func (it *floatHistogramIterator) Err() error {
return it.err
}

View file

@ -63,7 +63,7 @@ func TestFirstFloatHistogramExplicitCounterReset(t *testing.T) {
chk := NewFloatHistogramChunk()
app, err := chk.Appender()
require.NoError(t, err)
newChk, recoded, newApp, err := app.AppendFloatHistogram(nil, 0, h, false)
newChk, recoded, newApp, err := app.AppendFloatHistogram(nil, 0, 0, h, false)
require.NoError(t, err)
require.Nil(t, newChk)
require.False(t, recoded)
@ -101,7 +101,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
},
NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8)
}
chk, _, app, err := app.AppendFloatHistogram(nil, ts, h.ToFloat(nil), false)
chk, _, app, err := app.AppendFloatHistogram(nil, 0, ts, h.ToFloat(nil), false)
require.NoError(t, err)
require.Nil(t, chk)
exp = append(exp, floatResult{t: ts, h: h.ToFloat(nil)})
@ -115,7 +115,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
chk, _, _, err = app.AppendFloatHistogram(nil, ts, h.ToFloat(nil), false)
chk, _, _, err = app.AppendFloatHistogram(nil, 0, ts, h.ToFloat(nil), false)
require.NoError(t, err)
require.Nil(t, chk)
expH := h.ToFloat(nil)
@ -134,7 +134,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
chk, _, _, err = app.AppendFloatHistogram(nil, ts, h.ToFloat(nil), false)
chk, _, _, err = app.AppendFloatHistogram(nil, 0, ts, h.ToFloat(nil), false)
require.NoError(t, err)
require.Nil(t, chk)
expH = h.ToFloat(nil)
@ -224,7 +224,7 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
NegativeBuckets: []int64{1},
}
chk, _, app, err := app.AppendFloatHistogram(nil, ts1, h1.ToFloat(nil), false)
chk, _, app, err := app.AppendFloatHistogram(nil, 0, ts1, h1.ToFloat(nil), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
@ -260,7 +260,7 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
chk, _, _, err = app.AppendFloatHistogram(nil, ts2, h2.ToFloat(nil), false)
chk, _, _, err = app.AppendFloatHistogram(nil, 0, ts2, h2.ToFloat(nil), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 2, c.NumSamples())
@ -330,7 +330,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
ts := int64(1234567890)
chk, _, app, err := app.AppendFloatHistogram(nil, ts, h.Copy(), false)
chk, _, app, err := app.AppendFloatHistogram(nil, 0, ts, h.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
@ -557,7 +557,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
nextChunk := NewFloatHistogramChunk()
app, err := nextChunk.Appender()
require.NoError(t, err)
newChunk, recoded, newApp, err := app.AppendFloatHistogram(hApp, ts+1, h2, false)
newChunk, recoded, newApp, err := app.AppendFloatHistogram(hApp, 0, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
@ -575,7 +575,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
nextChunk := NewFloatHistogramChunk()
app, err := nextChunk.Appender()
require.NoError(t, err)
newChunk, recoded, newApp, err := app.AppendFloatHistogram(hApp, ts+1, h2, false)
newChunk, recoded, newApp, err := app.AppendFloatHistogram(hApp, 0, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
@ -602,7 +602,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
nextChunk := NewFloatHistogramChunk()
app, err := nextChunk.Appender()
require.NoError(t, err)
newChunk, recoded, newApp, err := app.AppendFloatHistogram(hApp, ts+1, h2, false)
newChunk, recoded, newApp, err := app.AppendFloatHistogram(hApp, 0, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
@ -717,7 +717,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
func assertNewFloatHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *FloatHistogramAppender, ts int64, h *histogram.FloatHistogram, expectHeader CounterResetHeader, expectHint histogram.CounterResetHint) {
oldChunkBytes := oldChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, ts, h, false)
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, 0, ts, h, false)
require.Equal(t, oldChunkBytes, oldChunk.Bytes()) // Sanity check that previous chunk is untouched.
require.NoError(t, err)
require.NotNil(t, newChunk)
@ -732,7 +732,7 @@ func assertNewFloatHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *Fl
func assertNoNewFloatHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *FloatHistogramAppender, ts int64, h *histogram.FloatHistogram, expectHeader CounterResetHeader) {
oldChunkBytes := oldChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, ts, h, false)
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, 0, ts, h, false)
require.Greater(t, len(oldChunk.Bytes()), len(oldChunkBytes)) // Check that current chunk is bigger than previously.
require.NoError(t, err)
require.Nil(t, newChunk)
@ -745,7 +745,7 @@ func assertNoNewFloatHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *
func assertRecodedFloatHistogramChunkOnAppend(t *testing.T, prevChunk Chunk, hApp *FloatHistogramAppender, ts int64, h *histogram.FloatHistogram, expectHeader CounterResetHeader) {
prevChunkBytes := prevChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, ts, h, false)
newChunk, recoded, newAppender, err := hApp.AppendFloatHistogram(nil, 0, ts, h, false)
require.Equal(t, prevChunkBytes, prevChunk.Bytes()) // Sanity check that previous chunk is untouched. This may change in the future if we implement in-place recoding.
require.NoError(t, err)
require.NotNil(t, newChunk)
@ -959,7 +959,7 @@ func TestFloatHistogramChunkAppendableWithEmptySpan(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
_, _, _, err = app.AppendFloatHistogram(nil, 1, tc.h1, true)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 1, tc.h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*FloatHistogramAppender)
@ -1019,7 +1019,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
ts := int64(1234567890)
chk, _, app, err := app.AppendFloatHistogram(nil, ts, h.Copy(), false)
chk, _, app, err := app.AppendFloatHistogram(nil, 0, ts, h.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
@ -1259,7 +1259,7 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
h := tsdbutil.GenerateTestFloatHistogram(0)
var isRecoded bool
c, isRecoded, app, err = app.AppendFloatHistogram(nil, 1, h, true)
c, isRecoded, app, err = app.AppendFloatHistogram(nil, 0, 1, h, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.NoError(t, err)
@ -1267,7 +1267,7 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
// Add erroring histogram.
h2 := h.Copy()
h2.Schema++
c, isRecoded, _, err = app.AppendFloatHistogram(nil, 2, h2, true)
c, isRecoded, _, err = app.AppendFloatHistogram(nil, 0, 2, h2, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.EqualError(t, err, "float histogram schema change")
@ -1281,7 +1281,7 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
h := tsdbutil.GenerateTestFloatHistogram(0)
var isRecoded bool
c, isRecoded, app, err = app.AppendFloatHistogram(nil, 1, h, true)
c, isRecoded, app, err = app.AppendFloatHistogram(nil, 0, 1, h, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.NoError(t, err)
@ -1289,7 +1289,7 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
// Add erroring histogram.
h2 := h.Copy()
h2.CounterResetHint = histogram.CounterReset
c, isRecoded, _, err = app.AppendFloatHistogram(nil, 2, h2, true)
c, isRecoded, _, err = app.AppendFloatHistogram(nil, 0, 2, h2, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.EqualError(t, err, "float histogram counter reset")
@ -1303,7 +1303,7 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
h := tsdbutil.GenerateTestCustomBucketsFloatHistogram(0)
var isRecoded bool
c, isRecoded, app, err = app.AppendFloatHistogram(nil, 1, h, true)
c, isRecoded, app, err = app.AppendFloatHistogram(nil, 0, 1, h, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.NoError(t, err)
@ -1311,7 +1311,7 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
// Add erroring histogram.
h2 := h.Copy()
h2.CustomValues = []float64{0, 1, 2, 3, 4, 5, 6, 7}
c, isRecoded, _, err = app.AppendFloatHistogram(nil, 2, h2, true)
c, isRecoded, _, err = app.AppendFloatHistogram(nil, 0, 2, h2, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.EqualError(t, err, "float histogram counter reset")
@ -1344,10 +1344,10 @@ func TestFloatHistogramUniqueSpansAfterNext(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 0, h1, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h2, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
@ -1390,10 +1390,10 @@ func TestFloatHistogramUniqueCustomValuesAfterNext(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 0, h1, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h2, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
@ -1435,7 +1435,7 @@ func TestFloatHistogramEmptyBucketsWithGaps(t *testing.T) {
c := NewFloatHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h1, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 1, h1, false)
require.NoError(t, err)
h2 := &histogram.FloatHistogram{
@ -1448,7 +1448,7 @@ func TestFloatHistogramEmptyBucketsWithGaps(t *testing.T) {
}
require.NoError(t, h2.Validate())
newC, recoded, _, err := app.AppendFloatHistogram(nil, 2, h2, false)
newC, recoded, _, err := app.AppendFloatHistogram(nil, 0, 2, h2, false)
require.NoError(t, err)
require.True(t, recoded)
require.NotNil(t, newC)
@ -1483,7 +1483,7 @@ func TestFloatHistogramIteratorFailIfSchemaInValid(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 1, h, false)
require.NoError(t, err)
it := c.Iterator(nil)
@ -1512,7 +1512,7 @@ func TestFloatHistogramIteratorReduceSchema(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h, false)
_, _, _, err = app.AppendFloatHistogram(nil, 0, 1, h, false)
require.NoError(t, err)
it := c.Iterator(nil)

View file

@ -219,7 +219,7 @@ func (a *HistogramAppender) NumSamples() int {
// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (*HistogramAppender) Append(int64, float64) {
func (*HistogramAppender) Append(int64, int64, float64) {
panic("appended a float sample to a histogram chunk")
}
@ -734,11 +734,11 @@ func (a *HistogramAppender) writeSumDelta(v float64) {
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
}
func (*HistogramAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
func (*HistogramAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a histogram chunk")
}
func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, _, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
if a.NumSamples() == 0 {
a.appendHistogram(t, h)
if h.CounterResetHint == histogram.GaugeType {
@ -1075,6 +1075,10 @@ func (it *histogramIterator) AtT() int64 {
return it.t
}
func (*histogramIterator) AtST() int64 {
return 0
}
func (it *histogramIterator) Err() error {
return it.err
}

View file

@ -64,7 +64,7 @@ func TestFirstHistogramExplicitCounterReset(t *testing.T) {
chk := NewHistogramChunk()
app, err := chk.Appender()
require.NoError(t, err)
newChk, recoded, newApp, err := app.AppendHistogram(nil, 0, h, false)
newChk, recoded, newApp, err := app.AppendHistogram(nil, 0, 0, h, false)
require.NoError(t, err)
require.Nil(t, newChk)
require.False(t, recoded)
@ -102,7 +102,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
},
NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8)
}
chk, _, app, err := app.AppendHistogram(nil, ts, h, false)
chk, _, app, err := app.AppendHistogram(nil, 0, ts, h, false)
require.NoError(t, err)
require.Nil(t, chk)
exp = append(exp, result{t: ts, h: h, fh: h.ToFloat(nil)})
@ -116,7 +116,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
chk, _, _, err = app.AppendHistogram(nil, ts, h, false)
chk, _, _, err = app.AppendHistogram(nil, 0, ts, h, false)
require.NoError(t, err)
require.Nil(t, chk)
hExp := h.Copy()
@ -135,7 +135,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
chk, _, _, err = app.AppendHistogram(nil, ts, h, false)
chk, _, _, err = app.AppendHistogram(nil, 0, ts, h, false)
require.NoError(t, err)
require.Nil(t, chk)
hExp = h.Copy()
@ -235,7 +235,7 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
NegativeBuckets: []int64{1},
}
chk, _, app, err := app.AppendHistogram(nil, ts1, h1, false)
chk, _, app, err := app.AppendHistogram(nil, 0, ts1, h1, false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
@ -271,7 +271,7 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
require.True(t, ok) // Only new buckets came in.
require.Equal(t, NotCounterReset, cr)
c, app = hApp.recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
chk, _, _, err = app.AppendHistogram(nil, ts2, h2, false)
chk, _, _, err = app.AppendHistogram(nil, 0, ts2, h2, false)
require.NoError(t, err)
require.Nil(t, chk)
@ -344,7 +344,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
ts := int64(1234567890)
chk, _, app, err := app.AppendHistogram(nil, ts, h.Copy(), false)
chk, _, app, err := app.AppendHistogram(nil, 0, ts, h.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
@ -581,7 +581,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
nextChunk := NewHistogramChunk()
app, err := nextChunk.Appender()
require.NoError(t, err)
newChunk, recoded, newApp, err := app.AppendHistogram(hApp, ts+1, h2, false)
newChunk, recoded, newApp, err := app.AppendHistogram(hApp, 0, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
@ -599,7 +599,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
nextChunk := NewHistogramChunk()
app, err := nextChunk.Appender()
require.NoError(t, err)
newChunk, recoded, newApp, err := app.AppendHistogram(hApp, ts+1, h2, false)
newChunk, recoded, newApp, err := app.AppendHistogram(hApp, 0, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
@ -629,7 +629,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
nextChunk := NewHistogramChunk()
app, err := nextChunk.Appender()
require.NoError(t, err)
newChunk, recoded, newApp, err := app.AppendHistogram(hApp, ts+1, h2, false)
newChunk, recoded, newApp, err := app.AppendHistogram(hApp, 0, ts+1, h2, false)
require.NoError(t, err)
require.Nil(t, newChunk)
require.False(t, recoded)
@ -776,7 +776,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
func assertNewHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *HistogramAppender, ts int64, h *histogram.Histogram, expectHeader CounterResetHeader, expectHint histogram.CounterResetHint) {
oldChunkBytes := oldChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, ts, h, false)
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, 0, ts, h, false)
require.Equal(t, oldChunkBytes, oldChunk.Bytes()) // Sanity check that previous chunk is untouched.
require.NoError(t, err)
require.NotNil(t, newChunk)
@ -791,7 +791,7 @@ func assertNewHistogramChunkOnAppend(t *testing.T, oldChunk Chunk, hApp *Histogr
func assertNoNewHistogramChunkOnAppend(t *testing.T, currChunk Chunk, hApp *HistogramAppender, ts int64, h *histogram.Histogram, expectHeader CounterResetHeader) {
prevChunkBytes := currChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, ts, h, false)
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, 0, ts, h, false)
require.Greater(t, len(currChunk.Bytes()), len(prevChunkBytes)) // Check that current chunk is bigger than previously.
require.NoError(t, err)
require.Nil(t, newChunk)
@ -804,7 +804,7 @@ func assertNoNewHistogramChunkOnAppend(t *testing.T, currChunk Chunk, hApp *Hist
func assertRecodedHistogramChunkOnAppend(t *testing.T, prevChunk Chunk, hApp *HistogramAppender, ts int64, h *histogram.Histogram, expectHeader CounterResetHeader) {
prevChunkBytes := prevChunk.Bytes()
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, ts, h, false)
newChunk, recoded, newAppender, err := hApp.AppendHistogram(nil, 0, ts, h, false)
require.Equal(t, prevChunkBytes, prevChunk.Bytes()) // Sanity check that previous chunk is untouched. This may change in the future if we implement in-place recoding.
require.NoError(t, err)
require.NotNil(t, newChunk)
@ -1029,7 +1029,7 @@ func TestHistogramChunkAppendableWithEmptySpan(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, c.NumSamples())
_, _, _, err = app.AppendHistogram(nil, 1, tc.h1, true)
_, _, _, err = app.AppendHistogram(nil, 1, 0, tc.h1, true)
require.NoError(t, err)
require.Equal(t, 1, c.NumSamples())
hApp, _ := app.(*HistogramAppender)
@ -1172,7 +1172,7 @@ func TestAtFloatHistogram(t *testing.T) {
app, err := chk.Appender()
require.NoError(t, err)
for i := range input {
newc, _, _, err := app.AppendHistogram(nil, int64(i), &input[i], false)
newc, _, _, err := app.AppendHistogram(nil, 0, int64(i), &input[i], false)
require.NoError(t, err)
require.Nil(t, newc)
}
@ -1230,7 +1230,7 @@ func TestHistogramChunkAppendableGauge(t *testing.T) {
ts := int64(1234567890)
chk, _, app, err := app.AppendHistogram(nil, ts, h.Copy(), false)
chk, _, app, err := app.AppendHistogram(nil, 0, ts, h.Copy(), false)
require.NoError(t, err)
require.Nil(t, chk)
require.Equal(t, 1, c.NumSamples())
@ -1471,7 +1471,7 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
h := tsdbutil.GenerateTestHistogram(0)
var isRecoded bool
c, isRecoded, app, err = app.AppendHistogram(nil, 1, h, true)
c, isRecoded, app, err = app.AppendHistogram(nil, 0, 1, h, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.NoError(t, err)
@ -1479,7 +1479,7 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
// Add erroring histogram.
h2 := h.Copy()
h2.Schema++
c, isRecoded, _, err = app.AppendHistogram(nil, 2, h2, true)
c, isRecoded, _, err = app.AppendHistogram(nil, 0, 2, h2, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.EqualError(t, err, "histogram schema change")
@ -1493,7 +1493,7 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
h := tsdbutil.GenerateTestHistogram(0)
var isRecoded bool
c, isRecoded, app, err = app.AppendHistogram(nil, 1, h, true)
c, isRecoded, app, err = app.AppendHistogram(nil, 0, 1, h, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.NoError(t, err)
@ -1501,7 +1501,7 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
// Add erroring histogram.
h2 := h.Copy()
h2.CounterResetHint = histogram.CounterReset
c, isRecoded, _, err = app.AppendHistogram(nil, 2, h2, true)
c, isRecoded, _, err = app.AppendHistogram(nil, 0, 2, h2, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.EqualError(t, err, "histogram counter reset")
@ -1515,7 +1515,7 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
h := tsdbutil.GenerateTestCustomBucketsHistogram(0)
var isRecoded bool
c, isRecoded, app, err = app.AppendHistogram(nil, 1, h, true)
c, isRecoded, app, err = app.AppendHistogram(nil, 0, 1, h, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.NoError(t, err)
@ -1523,7 +1523,7 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
// Add erroring histogram.
h2 := h.Copy()
h2.CustomValues = []float64{0, 1, 2, 3, 4, 5, 6, 7}
c, isRecoded, _, err = app.AppendHistogram(nil, 2, h2, true)
c, isRecoded, _, err = app.AppendHistogram(nil, 0, 2, h2, true)
require.Nil(t, c)
require.False(t, isRecoded)
require.EqualError(t, err, "histogram counter reset")
@ -1556,10 +1556,10 @@ func TestHistogramUniqueSpansAfterNextWithAtHistogram(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
_, _, _, err = app.AppendHistogram(nil, 0, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
@ -1607,10 +1607,10 @@ func TestHistogramUniqueSpansAfterNextWithAtFloatHistogram(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
_, _, _, err = app.AppendHistogram(nil, 0, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
@ -1653,10 +1653,10 @@ func TestHistogramCustomValuesInternedAfterNextWithAtHistogram(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
_, _, _, err = app.AppendHistogram(nil, 0, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
@ -1699,10 +1699,10 @@ func TestHistogramCustomValuesInternedAfterNextWithAtFloatHistogram(t *testing.T
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
_, _, _, err = app.AppendHistogram(nil, 0, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
@ -1754,7 +1754,7 @@ func BenchmarkAppendable(b *testing.B) {
b.Fatal(err)
}
_, _, _, err = app.AppendHistogram(nil, 1, h, true)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h, true)
if err != nil {
b.Fatal(err)
}
@ -1791,7 +1791,7 @@ func TestIntHistogramEmptyBucketsWithGaps(t *testing.T) {
c := NewHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h1, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h1, false)
require.NoError(t, err)
h2 := &histogram.Histogram{
@ -1804,7 +1804,7 @@ func TestIntHistogramEmptyBucketsWithGaps(t *testing.T) {
}
require.NoError(t, h2.Validate())
newC, recoded, _, err := app.AppendHistogram(nil, 2, h2, false)
newC, recoded, _, err := app.AppendHistogram(nil, 0, 2, h2, false)
require.NoError(t, err)
require.True(t, recoded)
require.NotNil(t, newC)
@ -1839,7 +1839,7 @@ func TestHistogramIteratorFailIfSchemaInValid(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h, false)
require.NoError(t, err)
it := c.Iterator(nil)
@ -1868,7 +1868,7 @@ func TestHistogramIteratorReduceSchema(t *testing.T) {
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h, false)
_, _, _, err = app.AppendHistogram(nil, 0, 1, h, false)
require.NoError(t, err)
it := c.Iterator(nil)

View file

@ -158,7 +158,7 @@ type xorAppender struct {
trailing uint8
}
func (a *xorAppender) Append(t int64, v float64) {
func (a *xorAppender) Append(_, t int64, v float64) {
var tDelta uint64
num := binary.BigEndian.Uint16(a.b.bytes())
switch num {
@ -225,11 +225,11 @@ func (a *xorAppender) writeVDelta(v float64) {
xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}
func (*xorAppender) AppendHistogram(*HistogramAppender, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
func (*xorAppender) AppendHistogram(*HistogramAppender, int64, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float chunk")
}
func (*xorAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
func (*xorAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a float chunk")
}
@ -277,6 +277,10 @@ func (it *xorIterator) AtT() int64 {
return it.t
}
func (*xorIterator) AtST() int64 {
return 0
}
func (it *xorIterator) Err() error {
return it.err
}

View file

@ -24,7 +24,7 @@ func BenchmarkXorRead(b *testing.B) {
app, err := c.Appender()
require.NoError(b, err)
for i := int64(0); i < 120*1000; i += 1000 {
app.Append(i, float64(i)+float64(i)/10+float64(i)/100+float64(i)/1000)
app.Append(0, i, float64(i)+float64(i)/10+float64(i)/100+float64(i)/1000)
}
b.ReportAllocs()

View file

@ -135,6 +135,7 @@ type Meta struct {
}
// ChunkFromSamples requires all samples to have the same type.
// TODO(krajorama): test with ST when chunk formats support it.
func ChunkFromSamples(s []Sample) (Meta, error) {
return ChunkFromSamplesGeneric(SampleSlice(s))
}
@ -164,9 +165,9 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
for i := 0; i < s.Len(); i++ {
switch sampleType {
case chunkenc.ValFloat:
ca.Append(s.Get(i).T(), s.Get(i).F())
ca.Append(s.Get(i).ST(), s.Get(i).T(), s.Get(i).F())
case chunkenc.ValHistogram:
newChunk, _, ca, err = ca.AppendHistogram(nil, s.Get(i).T(), s.Get(i).H(), false)
newChunk, _, ca, err = ca.AppendHistogram(nil, s.Get(i).ST(), s.Get(i).T(), s.Get(i).H(), false)
if err != nil {
return emptyChunk, err
}
@ -174,7 +175,7 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
return emptyChunk, errors.New("did not expect to start a second chunk")
}
case chunkenc.ValFloatHistogram:
newChunk, _, ca, err = ca.AppendFloatHistogram(nil, s.Get(i).T(), s.Get(i).FH(), false)
newChunk, _, ca, err = ca.AppendFloatHistogram(nil, s.Get(i).ST(), s.Get(i).T(), s.Get(i).FH(), false)
if err != nil {
return emptyChunk, err
}

View file

@ -559,7 +559,7 @@ func randomChunk(t *testing.T) chunkenc.Chunk {
app, err := chunk.Appender()
require.NoError(t, err)
for range length {
app.Append(rand.Int63(), rand.Float64())
app.Append(0, rand.Int63(), rand.Float64())
}
return chunk
}

View file

@ -25,6 +25,7 @@ type Samples interface {
type Sample interface {
T() int64
ST() int64
F() float64
H() *histogram.Histogram
FH() *histogram.FloatHistogram
@ -38,16 +39,20 @@ func (s SampleSlice) Get(i int) Sample { return s[i] }
func (s SampleSlice) Len() int { return len(s) }
type sample struct {
t int64
f float64
h *histogram.Histogram
fh *histogram.FloatHistogram
st, t int64
f float64
h *histogram.Histogram
fh *histogram.FloatHistogram
}
func (s sample) T() int64 {
return s.t
}
func (s sample) ST() int64 {
return s.st
}
func (s sample) F() float64 {
return s.f
}

View file

@ -2111,7 +2111,10 @@ func newSample(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHi
return sample{t, v, h, fh}
}
func (s sample) T() int64 { return s.t }
func (s sample) T() int64 { return s.t }
// TODO(krajorama): implement ST.
func (sample) ST() int64 { return 0 }
func (s sample) F() float64 { return s.f }
func (s sample) H() *histogram.Histogram { return s.h }
func (s sample) FH() *histogram.FloatHistogram { return s.fh }

View file

@ -1843,7 +1843,8 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
s.app.Append(t, v)
// TODO(krajorama): pass ST.
s.app.Append(0, t, v)
c.maxTime = t
@ -1885,7 +1886,8 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
prevApp = nil
}
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, t, h, false) // false=request a new chunk if needed
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed
s.lastHistogramValue = h
s.lastFloatHistogramValue = nil
@ -1942,7 +1944,8 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
prevApp = nil
}
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, t, fh, false) // False means request a new chunk if needed.
// TODO(krajorama): pass ST.
newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed.
s.lastHistogramValue = nil
s.lastFloatHistogramValue = fh

View file

@ -125,7 +125,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
}
switch encoding {
case chunkenc.EncXOR:
app.Append(s.t, s.f)
// TODO(krajorama): pass ST.
app.Append(0, s.t, s.f)
case chunkenc.EncHistogram:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
@ -133,7 +134,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
newChunk chunkenc.Chunk
recoded bool
)
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false)
// TODO(krajorama): pass ST.
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
@ -148,7 +150,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
newChunk chunkenc.Chunk
recoded bool
)
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false)
// TODO(krajorama): pass ST.
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})

View file

@ -788,6 +788,11 @@ func (p *populateWithDelSeriesIterator) AtT() int64 {
return p.curr.AtT()
}
// AtST TODO(krajorama): test AtST() when chunks support it.
func (p *populateWithDelSeriesIterator) AtST() int64 {
return p.curr.AtST()
}
func (p *populateWithDelSeriesIterator) Err() error {
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
return err
@ -862,6 +867,7 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
// populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This
// should be called if the samples in p.currDelIter only form one chunk.
// TODO(krajorama): test ST when chunks support it.
func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone {
@ -877,7 +883,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
var (
newChunk chunkenc.Chunk
app chunkenc.Appender
t int64
st, t int64
err error
)
switch valueType {
@ -893,7 +899,8 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
}
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram(nil)
_, _, app, err = app.AppendHistogram(nil, t, h, true)
st = p.currDelIter.AtST()
_, _, app, err = app.AppendHistogram(nil, st, t, h, true)
if err != nil {
break
}
@ -910,7 +917,8 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
}
var v float64
t, v = p.currDelIter.At()
app.Append(t, v)
st = p.currDelIter.AtST()
app.Append(st, t, v)
}
case chunkenc.ValFloatHistogram:
newChunk = chunkenc.NewFloatHistogramChunk()
@ -924,7 +932,8 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
}
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram(nil)
_, _, app, err = app.AppendFloatHistogram(nil, t, h, true)
st = p.currDelIter.AtST()
_, _, app, err = app.AppendFloatHistogram(nil, st, t, h, true)
if err != nil {
break
}
@ -950,6 +959,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
// populateChunksFromIterable reads the samples from currDelIter to create
// chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first
// chunk.
// TODO(krajorama): test ST when chunks support it.
func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
p.chunksFromIterable = p.chunksFromIterable[:0]
p.chunksFromIterableIdx = -1
@ -965,7 +975,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
var (
// t is the timestamp for the current sample.
t int64
st, t int64
cmint int64
cmaxt int64
@ -1004,23 +1014,26 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
{
var v float64
t, v = p.currDelIter.At()
app.Append(t, v)
st = p.currDelIter.AtST()
app.Append(st, t, v)
}
case chunkenc.ValHistogram:
{
var v *histogram.Histogram
t, v = p.currDelIter.AtHistogram(nil)
st = p.currDelIter.AtST()
// No need to set prevApp as AppendHistogram will set the
// counter reset header for the appender that's returned.
newChunk, recoded, app, err = app.AppendHistogram(nil, t, v, false)
newChunk, recoded, app, err = app.AppendHistogram(nil, st, t, v, false)
}
case chunkenc.ValFloatHistogram:
{
var v *histogram.FloatHistogram
t, v = p.currDelIter.AtFloatHistogram(nil)
st = p.currDelIter.AtST()
// No need to set prevApp as AppendHistogram will set the
// counter reset header for the appender that's returned.
newChunk, recoded, app, err = app.AppendFloatHistogram(nil, t, v, false)
newChunk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, v, false)
}
}
@ -1202,6 +1215,11 @@ func (it *DeletedIterator) AtT() int64 {
return it.Iter.AtT()
}
// AtST TODO(krajorama): test AtST() when chunks support it.
func (it *DeletedIterator) AtST() int64 {
return it.Iter.AtST()
}
func (it *DeletedIterator) Seek(t int64) chunkenc.ValueType {
if it.Iter.Err() != nil {
return chunkenc.ValNone

View file

@ -141,7 +141,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
app, _ := chunk.Appender()
for _, smpl := range chk {
require.NotNil(t, smpl.fh, "chunk can only contain one type of sample")
_, _, _, err := app.AppendFloatHistogram(nil, smpl.t, smpl.fh, true)
_, _, _, err := app.AppendFloatHistogram(nil, 0, smpl.t, smpl.fh, true)
require.NoError(t, err, "chunk should be appendable")
}
chkReader[chunkRef] = chunk
@ -150,7 +150,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
app, _ := chunk.Appender()
for _, smpl := range chk {
require.NotNil(t, smpl.h, "chunk can only contain one type of sample")
_, _, _, err := app.AppendHistogram(nil, smpl.t, smpl.h, true)
_, _, _, err := app.AppendHistogram(nil, 0, smpl.t, smpl.h, true)
require.NoError(t, err, "chunk should be appendable")
}
chkReader[chunkRef] = chunk
@ -160,7 +160,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
for _, smpl := range chk {
require.Nil(t, smpl.h, "chunk can only contain one type of sample")
require.Nil(t, smpl.fh, "chunk can only contain one type of sample")
app.Append(smpl.t, smpl.f)
app.Append(0, smpl.t, smpl.f)
}
chkReader[chunkRef] = chunk
}
@ -790,6 +790,10 @@ func (it *mockSampleIterator) AtT() int64 {
return it.s[it.idx].T()
}
func (it *mockSampleIterator) AtST() int64 {
return it.s[it.idx].ST()
}
func (it *mockSampleIterator) Next() chunkenc.ValueType {
if it.idx < len(it.s)-1 {
it.idx++
@ -2096,7 +2100,7 @@ func TestDeletedIterator(t *testing.T) {
for i := range 1000 {
act[i].t = int64(i)
act[i].f = rand.Float64()
app.Append(act[i].t, act[i].f)
app.Append(0, act[i].t, act[i].f)
}
cases := []struct {
@ -2156,7 +2160,7 @@ func TestDeletedIterator_WithSeek(t *testing.T) {
for i := range 1000 {
act[i].t = int64(i)
act[i].f = float64(i)
app.Append(act[i].t, act[i].f)
app.Append(0, act[i].t, act[i].f)
}
cases := []struct {