From ea0ddb3fc956ee5102f35ba34844482bcb81db02 Mon Sep 17 00:00:00 2001 From: Georges Chaudy Date: Fri, 11 Jul 2025 11:10:19 +0200 Subject: [PATCH] unistore: refactor get to return a reader (#107951) --- pkg/storage/unified/resource/datastore.go | 6 +-- pkg/storage/unified/resource/eventstore.go | 13 ++++--- pkg/storage/unified/resource/kv.go | 30 +++++--------- pkg/storage/unified/resource/kv_test.go | 12 +++--- pkg/storage/unified/resource/metadata.go | 6 +-- pkg/storage/unified/resource/metadata_test.go | 7 ++-- pkg/storage/unified/testing/kv.go | 39 +++++++++---------- 7 files changed, 48 insertions(+), 65 deletions(-) diff --git a/pkg/storage/unified/resource/datastore.go b/pkg/storage/unified/resource/datastore.go index 836008117ba..5838b585b8f 100644 --- a/pkg/storage/unified/resource/datastore.go +++ b/pkg/storage/unified/resource/datastore.go @@ -239,11 +239,7 @@ func (d *dataStore) Get(ctx context.Context, key DataKey) (io.ReadCloser, error) return nil, fmt.Errorf("invalid data key: %w", err) } - obj, err := d.kv.Get(ctx, dataSection, key.String()) - if err != nil { - return nil, err - } - return obj.Value, nil + return d.kv.Get(ctx, dataSection, key.String()) } func (d *dataStore) Save(ctx context.Context, key DataKey, value io.Reader) error { diff --git a/pkg/storage/unified/resource/eventstore.go b/pkg/storage/unified/resource/eventstore.go index 6070d1a6eeb..e1f4b565952 100644 --- a/pkg/storage/unified/resource/eventstore.go +++ b/pkg/storage/unified/resource/eventstore.go @@ -147,16 +147,15 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) { return Event{}, fmt.Errorf("invalid event key: %w", err) } - obj, err := n.kv.Get(ctx, eventsSection, key.String()) + reader, err := n.kv.Get(ctx, eventsSection, key.String()) if err != nil { return Event{}, err } + defer func() { _ = reader.Close() }() var event Event - if err = json.NewDecoder(obj.Value).Decode(&event); err != nil { - _ = obj.Value.Close() + if err = json.NewDecoder(reader).Decode(&event); err != nil { return Event{}, err } - defer func() { _ = obj.Value.Close() }() return event, nil } @@ -173,14 +172,16 @@ func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Eve if err != nil { return } - obj, err := n.kv.Get(ctx, eventsSection, key) + reader, err := n.kv.Get(ctx, eventsSection, key) if err != nil { return } var event Event - if err := json.NewDecoder(obj.Value).Decode(&event); err != nil { + if err := json.NewDecoder(reader).Decode(&event); err != nil { + _ = reader.Close() return } + _ = reader.Close() if !yield(event, nil) { return } diff --git a/pkg/storage/unified/resource/kv.go b/pkg/storage/unified/resource/kv.go index 21343984c8c..0e9c7a13d0e 100644 --- a/pkg/storage/unified/resource/kv.go +++ b/pkg/storage/unified/resource/kv.go @@ -29,18 +29,12 @@ type ListOptions struct { Limit int64 // maximum number of results to return. 0 means no limit. } -// KVObject represents a key-value object -type KVObject struct { - Key string // the key of the object within the section - Value io.ReadCloser // the value of the object -} - type KV interface { // Keys returns all the keys in the store Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] - // Get retrieves a key-value pair from the store - Get(ctx context.Context, section string, key string) (KVObject, error) + // Get retrieves the value for a key from the store + Get(ctx context.Context, section string, key string) (io.ReadCloser, error) // Save a new value Save(ctx context.Context, section string, key string, value io.Reader) error @@ -67,16 +61,16 @@ func NewBadgerKV(db *badger.DB) *badgerKV { } } -func (k *badgerKV) Get(ctx context.Context, section string, key string) (KVObject, error) { +func (k *badgerKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) { if k.db.IsClosed() { - return KVObject{}, fmt.Errorf("database is closed") + return nil, fmt.Errorf("database is closed") } txn := k.db.NewTransaction(false) defer txn.Discard() if section == "" { - return KVObject{}, fmt.Errorf("section is required") + return nil, fmt.Errorf("section is required") } key = section + "/" + key @@ -84,24 +78,18 @@ func (k *badgerKV) Get(ctx context.Context, section string, key string) (KVObjec item, err := txn.Get([]byte(key)) if err != nil { if errors.Is(err, badger.ErrKeyNotFound) { - return KVObject{}, ErrNotFound + return nil, ErrNotFound } - return KVObject{}, err - } - - out := KVObject{ - Key: string(item.Key())[len(section)+1:], + return nil, err } // Get the value and create a reader from it value, err := item.ValueCopy(nil) if err != nil { - return KVObject{}, err + return nil, err } - out.Value = io.NopCloser(bytes.NewReader(value)) - - return out, nil + return io.NopCloser(bytes.NewReader(value)), nil } func (k *badgerKV) Save(ctx context.Context, section string, key string, value io.Reader) error { diff --git a/pkg/storage/unified/resource/kv_test.go b/pkg/storage/unified/resource/kv_test.go index a2e3544db7c..6b7ead40879 100644 --- a/pkg/storage/unified/resource/kv_test.go +++ b/pkg/storage/unified/resource/kv_test.go @@ -116,20 +116,20 @@ func TestBadgerKV_UnderlyingStorage(t *testing.T) { require.NoError(t, err) // Verify KV interface returns correct values for each section - obj1, err := kv.Get(ctx, section1, key) + reader1, err := kv.Get(ctx, section1, key) require.NoError(t, err) - val1, err := io.ReadAll(obj1.Value) + val1, err := io.ReadAll(reader1) require.NoError(t, err) require.Equal(t, value1, string(val1)) - err = obj1.Value.Close() + err = reader1.Close() require.NoError(t, err) - obj2, err := kv.Get(ctx, section2, key) + reader2, err := kv.Get(ctx, section2, key) require.NoError(t, err) - val2, err := io.ReadAll(obj2.Value) + val2, err := io.ReadAll(reader2) require.NoError(t, err) require.Equal(t, value2, string(val2)) - err = obj2.Value.Close() + err = reader2.Close() require.NoError(t, err) }) diff --git a/pkg/storage/unified/resource/metadata.go b/pkg/storage/unified/resource/metadata.go index 0ed7d4ec493..ba0ea429faf 100644 --- a/pkg/storage/unified/resource/metadata.go +++ b/pkg/storage/unified/resource/metadata.go @@ -206,15 +206,15 @@ func (d *metadataStore) Get(ctx context.Context, key MetaDataKey) (MetaData, err return MetaData{}, fmt.Errorf("invalid metadata key: %w", err) } - obj, err := d.kv.Get(ctx, metaSection, key.String()) + reader, err := d.kv.Get(ctx, metaSection, key.String()) if err != nil { return MetaData{}, err } defer func() { - _ = obj.Value.Close() + _ = reader.Close() }() var meta MetaData - err = json.NewDecoder(obj.Value).Decode(&meta) + err = json.NewDecoder(reader).Decode(&meta) return meta, err } diff --git a/pkg/storage/unified/resource/metadata_test.go b/pkg/storage/unified/resource/metadata_test.go index 70c3ea26a32..2e4816a3c32 100644 --- a/pkg/storage/unified/resource/metadata_test.go +++ b/pkg/storage/unified/resource/metadata_test.go @@ -110,11 +110,12 @@ func TestMetadataStore_Save(t *testing.T) { }) require.NoError(t, err) // Verify in the kv store that the metadata is saved - retrievedObj, err := store.kv.Get(ctx, metaSection, key.String()) + reader, err := store.kv.Get(ctx, metaSection, key.String()) require.NoError(t, err) - assert.Equal(t, key.String(), retrievedObj.Key) var retrivedMeta MetaData - actualData, err := io.ReadAll(retrievedObj.Value) + actualData, err := io.ReadAll(reader) + require.NoError(t, err) + err = reader.Close() require.NoError(t, err) err = json.Unmarshal(actualData, &retrivedMeta) require.NoError(t, err) diff --git a/pkg/storage/unified/testing/kv.go b/pkg/storage/unified/testing/kv.go index 0c3b745fdf0..60b69d38497 100644 --- a/pkg/storage/unified/testing/kv.go +++ b/pkg/storage/unified/testing/kv.go @@ -85,17 +85,16 @@ func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) { require.NoError(t, err) // Now get it - obj, err := kv.Get(ctx, section, "existing-key") + reader, err := kv.Get(ctx, section, "existing-key") require.NoError(t, err) - assert.Equal(t, "existing-key", obj.Key) // Read the value - value, err := io.ReadAll(obj.Value) + value, err := io.ReadAll(reader) require.NoError(t, err) assert.Equal(t, testValue, string(value)) // Close the value reader - err = obj.Value.Close() + err = reader.Close() require.NoError(t, err) }) @@ -122,14 +121,13 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { require.NoError(t, err) // Verify it was saved - obj, err := kv.Get(ctx, section, "new-key") + reader, err := kv.Get(ctx, section, "new-key") require.NoError(t, err) - assert.Equal(t, "new-key", obj.Key) - value, err := io.ReadAll(obj.Value) + value, err := io.ReadAll(reader) require.NoError(t, err) assert.Equal(t, testValue, string(value)) - err = obj.Value.Close() + err = reader.Close() require.NoError(t, err) }) @@ -144,13 +142,13 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { require.NoError(t, err) // Verify it was updated - obj, err := kv.Get(ctx, section, "overwrite-key") + reader, err := kv.Get(ctx, section, "overwrite-key") require.NoError(t, err) - value, err := io.ReadAll(obj.Value) + value, err := io.ReadAll(reader) require.NoError(t, err) assert.Equal(t, newValue, string(value)) - err = obj.Value.Close() + err = reader.Close() require.NoError(t, err) }) @@ -166,13 +164,13 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { require.NoError(t, err) // Verify binary data - obj, err := kv.Get(ctx, section, "binary-key") + reader, err := kv.Get(ctx, section, "binary-key") require.NoError(t, err) - value, err := io.ReadAll(obj.Value) + value, err := io.ReadAll(reader) require.NoError(t, err) assert.Equal(t, binaryData, value) - err = obj.Value.Close() + err = reader.Close() require.NoError(t, err) }) @@ -182,15 +180,14 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) { require.NoError(t, err) // Verify it was saved with empty data - obj, err := kv.Get(ctx, section, "empty-key") + reader, err := kv.Get(ctx, section, "empty-key") require.NoError(t, err) - assert.Equal(t, "empty-key", obj.Key) - value, err := io.ReadAll(obj.Value) + value, err := io.ReadAll(reader) require.NoError(t, err) assert.Equal(t, "", string(value)) assert.Len(t, value, 0) - err = obj.Value.Close() + err = reader.Close() require.NoError(t, err) }) } @@ -416,14 +413,14 @@ func runTestKVConcurrent(t *testing.T, kv resource.KV, nsPrefix string) { } // Get immediately - obj, err := kv.Get(ctx, section, key) + reader, err := kv.Get(ctx, section, key) if err != nil { return } - readValue, err := io.ReadAll(obj.Value) + readValue, err := io.ReadAll(reader) require.NoError(t, err) - err = obj.Value.Close() + err = reader.Close() require.NoError(t, err) assert.Equal(t, value, string(readValue)) }