repository: add set_chunk_index() for ChunkIndex-based pack routing, refs #8572

Replace _pack_info (session-scoped dict) with a borrowed ChunkIndex reference.
Cache passes its index via set_chunk_index(); get() routes correctly for all sessions.
This commit is contained in:
Mrityunjay Raj 2026-06-10 10:44:51 +05:30
parent 2553a705cc
commit 68491c4409
3 changed files with 67 additions and 5 deletions

View file

@ -679,6 +679,7 @@ class ChunksMixin:
def chunks(self):
if self._chunks is None:
self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
self.repository.set_chunk_index(self._chunks)
return self._chunks
def seen_chunk(self, id, size=None):
@ -869,6 +870,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
def wipe_cache(self):
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
self._chunks = ChunkIndex()
self.repository.set_chunk_index(self._chunks)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")

View file

@ -285,6 +285,7 @@ class Repository:
self.lock_wait = lock_wait
self.exclusive = exclusive
self._pack_writer = None
self._chunks = None # borrowed ChunkIndex reference, set by set_chunk_index()
def __repr__(self):
return f"<{self.__class__.__name__} {self._location}>"
@ -407,15 +408,19 @@ class Repository:
if lock:
self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire()
self._pack_writer = PackWriter(self.store, max_count=1)
self._chunks = None
self.opened = True
def flush(self):
"""Flush any buffered pack writer chunks. Returns pack_results (or None).
def set_chunk_index(self, chunks):
"""Set the ChunkIndex get() uses to resolve pack locations.
Callers that maintain a ChunkIndex must call this and pass the result to
chunks.update_pack_info() before closing, so index entries for the last
batch of chunks get real pack location values instead of UNKNOWN_*.
The caller retains ownership; Repository holds a borrowed reference.
Pass None to clear.
"""
self._chunks = chunks
def flush(self):
"""Flush any buffered pack writer chunks. Returns pack_results (or None)."""
if self._pack_writer is not None:
return self._pack_writer.flush()
return None
@ -429,6 +434,7 @@ class Repository:
if self.store_opened:
self.store.close()
self.store_opened = False
self._chunks = None
self.opened = False
def info(self):
@ -606,6 +612,10 @@ class Repository:
def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None):
self._lock_refresh()
pack_id = id # N=1: pack_id == chunk_id
if self._chunks is not None:
entry = self._chunks.get(id)
if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed
pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size
id_hex = bin_to_hex(id)
key = "packs/" + bin_to_hex(pack_id)
try:
@ -644,6 +654,8 @@ class Repository:
return None
def get_many(self, ids, read_data=True, raise_missing=True):
# N>1: set_chunk_index() must be called before any get() so locations from prior sessions
# are available; without it get() falls back to N=1 (pack_id == chunk_id) for every id.
for id_ in ids:
yield self.get(id_, read_data=read_data, raise_missing=raise_missing)

View file

@ -4,6 +4,7 @@ from hashlib import sha256
import pytest
from ..helpers import IntegrityError, Location, bin_to_hex
from ..hashindex import ChunkIndex
from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
from .hashindex_test import H
@ -210,6 +211,53 @@ def test_get_with_range(tmp_path):
assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2
def test_get_read_data_false_with_range(tmp_path):
# read_data=False with obj_size limits the load to the object boundary.
hdr_size = RepoObj.obj_header.size
chunk1 = fchunk(b"FIRST")
chunk2 = fchunk(b"SECOND")
pack = chunk1 + chunk2
pack_id = H(43)
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
repository.store_store("packs/" + bin_to_hex(pack_id), pack)
result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk1))
assert result == chunk1[:hdr_size] # empty meta, so header only
result2 = repository.get(pack_id, read_data=False, obj_offset=len(chunk1), obj_size=len(chunk2))
assert result2 == chunk2[:hdr_size]
def test_get_read_data_false_large_meta(tmp_path):
# When meta_size > extra_size (975 bytes), get() retries with a larger load.
hdr_size = RepoObj.obj_header.size
big_meta = b"M" * 1000 # 1000 > 975, forces the retry load
chunk = fchunk(b"DATA", meta=big_meta)
pack_id = H(44)
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
repository.store_store("packs/" + bin_to_hex(pack_id), chunk)
result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk))
assert result == chunk[: hdr_size + len(big_meta)]
def test_get_uses_chunk_index_location(tmp_path):
# get() routes to the correct pack and offset when a ChunkIndex is set via set_chunk_index().
chunk1 = fchunk(b"FIRST")
chunk2 = fchunk(b"SECOND")
pack = chunk1 + chunk2
pack_id = H(55)
id1, id2 = H(56), H(57)
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
# Inject the pack directly; bypasses PackWriter to test routing independently.
repository.store_store("packs/" + bin_to_hex(pack_id), pack)
chunks = ChunkIndex()
chunks.add(id1, len(chunk1))
chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))])
chunks.add(id2, len(chunk2))
chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))])
repository.set_chunk_index(chunks)
assert repository.get(id1) == chunk1
assert repository.get(id2) == chunk2
def test_pack_writer_final_partial_pack_uses_sha256():
# When max_count > 1, a final flush with only 1 piece must still use SHA256,
# not the N=1 pack_id == chunk_id hack.