prometheus/tsdb/chunkenc/bstream.go
György Krajcsovits 5e5b14c04b
feat(chunkenc): replace xoroptst chunk encoding with xor2
XOR2 is based on https://github.com/prometheus/prometheus/pull/18238
With additional ST support.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
2026-03-06 14:35:06 +01:00

352 lines
9.5 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It received minor modifications to suit Prometheus's needs.
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
// All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunkenc
import (
"encoding/binary"
"io"
)
// bstream is a stream of bits backed by a byte slice.
// The write methods fill each byte starting from its most significant bit.
type bstream struct {
stream []byte // The data stream.
count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream). Zero means the next write appends a new byte.
}
// Reset makes b operate on stream and marks the last byte as having no
// writable bits left, so the next write appends a new byte after the
// existing data.
func (b *bstream) Reset(stream []byte) {
	b.stream, b.count = stream, 0
}
// bytes returns the byte slice backing the stream. The slice is returned
// as is, without copying.
func (b *bstream) bytes() []byte {
return b.stream
}
// bit is a single binary digit, represented as a bool.
type bit bool
// Named values for the two possible bits.
const (
zero bit = false
one bit = true
)
// writeBit appends a single bit to the stream. Bits are stored from the most
// significant position of each byte downwards.
func (b *bstream) writeBit(bit bit) {
	if b.count == 0 {
		// The last byte is full (or there is none yet): start a new one.
		b.stream = append(b.stream, 0)
		b.count = 8
	}

	// Position of the highest still-free bit in the last byte.
	shift := b.count - 1
	if bit {
		last := len(b.stream) - 1
		b.stream[last] |= 1 << shift
	}
	b.count = shift
}
// writeByte appends the eight bits of byt to the stream, most significant
// bit first. The number of free bits in the last byte is left unchanged.
func (b *bstream) writeByte(byt byte) {
	free := b.count
	if free == 0 {
		// Byte-aligned: byt becomes the new last byte unchanged.
		b.stream = append(b.stream, byt)
		return
	}

	// The leftmost `free` bits of byt complete the current last byte; the
	// remaining bits open a new byte, left-aligned.
	head, tail := byt>>(8-free), byt<<free
	b.stream[len(b.stream)-1] |= head
	b.stream = append(b.stream, tail)
}
// writeBits appends the nbits lowest-order bits of u to the stream, emitting
// them from the most significant of those bits to the least significant.
func (b *bstream) writeBits(u uint64, nbits int) {
	// Left-align the bits to be written so the next one is always on top.
	u <<= 64 - uint(nbits)

	// Emit whole bytes while at least eight bits remain.
	for ; nbits >= 8; nbits -= 8 {
		b.writeByte(byte(u >> 56))
		u <<= 8
	}

	// Emit the remaining (fewer than eight) bits one by one.
	for ; nbits > 0; nbits-- {
		b.writeBit(u>>63 == 1)
		u <<= 1
	}
}
// bstreamReader reads bits sequentially from a byte slice. Bytes are loaded
// from the slice into a uint64 buffer, and bits are served from that buffer.
type bstreamReader struct {
stream []byte // The bytes being read.
streamOffset int // The offset from which to read the next byte from the stream.
buffer uint64 // The current buffer, filled from the stream, containing up to 8 bytes from which to read bits.
valid uint8 // The number of right-most bits valid to read (from left) in the current 8 byte buffer.
last byte // A copy of the last byte of the stream.
}
// newBReader returns a reader positioned at the start of b.
func newBReader(b []byte) bstreamReader {
	r := bstreamReader{stream: b}
	if n := len(b); n > 0 {
		// The last byte of the stream can be updated later, so keep a copy
		// of its current value.
		r.last = b[n-1]
	}
	return r
}
// readBit returns the next bit of the stream, refilling the internal buffer
// when it is empty. It returns io.EOF when the buffer is empty and cannot be
// refilled.
func (b *bstreamReader) readBit() (bit, error) {
	if b.valid == 0 && !b.loadNextBuffer(1) {
		return false, io.EOF
	}
	return b.readBitFast()
}
// readBitFast returns the next bit using only the internal buffer; it never
// refills it. When the buffer holds no valid bits it returns io.EOF, in which
// case the caller should retry with readBit().
// Keep this function small and free of calls so the compiler can inline it.
func (b *bstreamReader) readBitFast() (bit, error) {
	if b.valid == 0 {
		return false, io.EOF
	}

	b.valid--
	return (b.buffer>>b.valid)&1 == 1, nil
}
// readBits constructs a uint64 with the nbits right-most bits
// read from the stream, and any other bits 0.
//
// The read may span two internal buffers: the bits left in the current
// buffer form the high-order part of the result and the rest is taken from
// the next buffer. It returns io.EOF when the stream does not hold nbits
// more bits; in that case the bits that were available have been consumed.
func (b *bstreamReader) readBits(nbits uint8) (uint64, error) {
	if b.valid == 0 {
		if !b.loadNextBuffer(nbits) {
			return 0, io.EOF
		}
	}

	if nbits <= b.valid {
		return b.readBitsFast(nbits)
	}

	// We have to read all remaining valid bits from the current buffer and a part from the next one.
	bitmask := (uint64(1) << b.valid) - 1
	nbits -= b.valid
	v := (b.buffer & bitmask) << nbits
	b.valid = 0

	if !b.loadNextBuffer(nbits) {
		return 0, io.EOF
	}
	if nbits > b.valid {
		// The stream ended before providing the missing bits. Without this
		// check b.valid-nbits would wrap around (uint8), yielding a bogus
		// value and a corrupted count of valid bits instead of an error.
		b.valid = 0
		return 0, io.EOF
	}

	bitmask = (uint64(1) << nbits) - 1
	v |= ((b.buffer >> (b.valid - nbits)) & bitmask)
	b.valid -= nbits

	return v, nil
}
// readBitsFast returns the next nbits bits using only the internal buffer; it
// never refills it. When the buffer holds fewer than nbits valid bits it
// returns io.EOF without consuming anything, in which case the caller should
// retry with readBits().
// Keep this function small and free of calls so the compiler can inline it.
func (b *bstreamReader) readBitsFast(nbits uint8) (uint64, error) {
	if b.valid < nbits {
		return 0, io.EOF
	}

	b.valid -= nbits
	return (b.buffer >> b.valid) & (uint64(1)<<nbits - 1), nil
}
// ReadByte reads the next 8 bits from the stream and returns them as a byte.
// Any error returned by readBits is passed on unchanged, with a zero byte.
func (b *bstreamReader) ReadByte() (byte, error) {
v, err := b.readBits(8)
if err != nil {
return 0, err
}
return byte(v), nil
}
// readXOR2Control decodes the XOR2 variable-length joint control prefix and
// returns a value in 0-5 identifying one of the six encoding cases:
//
//	0 → '0'     dod=0, val=0            (1 bit consumed)
//	1 → '10'    dod=0, val≠0            (2 bits consumed)
//	2 → '110'   dod≠0, 13-bit signed dod (3 bits consumed)
//	3 → '1110'  dod≠0, 20-bit signed dod (4 bits consumed)
//	4 → '11110' dod≠0, 64-bit escape    (5 bits consumed)
//	5 → '11111' dod=0, stale NaN        (5 bits consumed)
//
// When at least four bits are buffered, the prefix is decoded by peeking at
// them; a '1111' prefix needs a fifth bit to tell cases 4 and 5 apart.
// Otherwise the bits are read one at a time. Errors from readBit are
// returned unchanged.
func (b *bstreamReader) readXOR2Control() (uint8, error) {
	if b.valid < 4 {
		// Slow path: bits may span buffer boundaries. The result is the
		// number of leading one bits, capped at five.
		for ctrl := uint8(0); ctrl < 5; ctrl++ {
			next, err := b.readBit()
			if err != nil {
				return 0, err
			}
			if next == zero {
				return ctrl, nil
			}
		}
		return 5, nil
	}

	// Fast path: peek at the next four bits without consuming them.
	prefix := uint8((b.buffer >> (b.valid - 4)) & 0xf)
	switch {
	case prefix < 8: // '0xxx' → case 0.
		b.valid--
		return 0, nil
	case prefix < 12: // '10xx' → case 1.
		b.valid -= 2
		return 1, nil
	case prefix < 14: // '110x' → case 2.
		b.valid -= 3
		return 2, nil
	case prefix == 14: // '1110' → case 3.
		b.valid -= 4
		return 3, nil
	}

	// '1111': the fifth bit distinguishes cases 4 and 5.
	if b.valid >= 5 {
		b.valid -= 5
		return 4 + uint8((b.buffer>>b.valid)&1), nil
	}

	// The fifth bit lies beyond the current buffer: consume the four known
	// bits and fetch the fifth from the stream.
	b.valid -= 4
	fifth, err := b.readBit()
	if err != nil {
		return 0, err
	}
	if fifth == one {
		return 5, nil
	}
	return 4, nil
}
// loadNextBuffer replaces the internal buffer with the next bytes of the
// stream and reports whether any bytes were loaded (false means the stream
// is exhausted). nbits is the minimum number of bits the caller needs; more
// may be loaded when available, and fewer when the stream ends first.
func (b *bstreamReader) loadNextBuffer(nbits uint8) bool {
	remaining := len(b.stream) - b.streamOffset
	if remaining <= 0 {
		return false
	}

	// Most common case: more than 8 bytes are left in the stream, so a full
	// 8-byte word can be loaded at once. This branch never reads the very
	// last byte of the stream (which suffers race conditions due to
	// concurrent writes).
	if remaining > 8 {
		b.buffer = binary.BigEndian.Uint64(b.stream[b.streamOffset:])
		b.streamOffset += 8
		b.valid = 64
		return true
	}

	// 8 or fewer bytes are left. This path is slower but taken less often.
	// Load just enough bytes to cover nbits, limited to what is left.
	nbytes := int(nbits/8) + 1
	if nbytes > remaining {
		nbytes = remaining
	}

	var buf uint64
	fromStream := nbytes
	if nbytes == remaining {
		// This load reaches the end of the stream. There can be concurrent
		// writes happening on the very last byte, so take it from the copy
		// made at initialization time and read only the bytes before it
		// from the stream.
		buf = uint64(b.last)
		fromStream--
	}

	// Place the bytes in big-endian order: the first byte read ends up in
	// the most significant of the nbytes low-order bytes.
	shift := uint(8 * nbytes)
	for _, c := range b.stream[b.streamOffset : b.streamOffset+fromStream] {
		shift -= 8
		buf |= uint64(c) << shift
	}

	b.buffer = buf
	b.streamOffset += nbytes
	b.valid = uint8(nbytes * 8)
	return true
}