diff --git a/src/borg/cache.py b/src/borg/cache.py index 992fda667..8399ab287 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -679,6 +679,7 @@ class ChunksMixin: def chunks(self): if self._chunks is None: self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) + self.repository.set_chunk_index(self._chunks) return self._chunks def seen_chunk(self, id, size=None): @@ -869,6 +870,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): def wipe_cache(self): logger.warning("Discarding incompatible cache and forcing a cache rebuild") self._chunks = ChunkIndex() + self.repository.set_chunk_index(self._chunks) self.cache_config.manifest_id = "" self.cache_config._config.set("cache", "manifest", "") diff --git a/src/borg/repository.py b/src/borg/repository.py index f8b0ffee5..403234efa 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -285,6 +285,7 @@ class Repository: self.lock_wait = lock_wait self.exclusive = exclusive self._pack_writer = None + self._chunks = None # borrowed ChunkIndex reference, set by set_chunk_index() def __repr__(self): return f"<{self.__class__.__name__} {self._location}>" @@ -407,15 +408,19 @@ class Repository: if lock: self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire() self._pack_writer = PackWriter(self.store, max_count=1) + self._chunks = None self.opened = True - def flush(self): - """Flush any buffered pack writer chunks. Returns pack_results (or None). + def set_chunk_index(self, chunks): + """Set the ChunkIndex get() uses to resolve pack locations. - Callers that maintain a ChunkIndex must call this and pass the result to - chunks.update_pack_info() before closing, so index entries for the last - batch of chunks get real pack location values instead of UNKNOWN_*. + The caller retains ownership; Repository holds a borrowed reference. + Pass None to clear. """ + self._chunks = chunks + + def flush(self): + """Flush any buffered pack writer chunks. Returns pack_results (or None).""" if self._pack_writer is not None: return self._pack_writer.flush() return None @@ -429,6 +434,7 @@ class Repository: if self.store_opened: self.store.close() self.store_opened = False + self._chunks = None self.opened = False def info(self): @@ -606,6 +612,10 @@ class Repository: def get(self, id, read_data=True, raise_missing=True, obj_offset=0, obj_size=None): self._lock_refresh() pack_id = id # N=1: pack_id == chunk_id + if self._chunks is not None: + entry = self._chunks.get(id) + if entry is not None and entry.pack_id != UNKNOWN_BYTES32: # UNKNOWN: buffered, not yet flushed + pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) key = "packs/" + bin_to_hex(pack_id) try: @@ -644,6 +654,8 @@ class Repository: return None def get_many(self, ids, read_data=True, raise_missing=True): + # N>1: set_chunk_index() must be called before any get() so locations from prior sessions + # are available; without it get() falls back to N=1 (pack_id == chunk_id) for every id. for id_ in ids: yield self.get(id_, read_data=read_data, raise_missing=raise_missing) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 04de1f37d..5852f3fd8 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -4,6 +4,7 @@ from hashlib import sha256 import pytest from ..helpers import IntegrityError, Location, bin_to_hex +from ..hashindex import ChunkIndex from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -210,6 +211,53 @@ def test_get_with_range(tmp_path): assert repository.get(pack_id, obj_offset=len(chunk1), obj_size=len(chunk2)) == chunk2 +def test_get_read_data_false_with_range(tmp_path): + # read_data=False with obj_size limits the load to the object boundary. + hdr_size = RepoObj.obj_header.size + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(43) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk1)) + assert result == chunk1[:hdr_size] # empty meta, so header only + result2 = repository.get(pack_id, read_data=False, obj_offset=len(chunk1), obj_size=len(chunk2)) + assert result2 == chunk2[:hdr_size] + + +def test_get_read_data_false_large_meta(tmp_path): + # When meta_size > extra_size (975 bytes), get() retries with a larger load. + hdr_size = RepoObj.obj_header.size + big_meta = b"M" * 1000 # 1000 > 975, forces the retry load + chunk = fchunk(b"DATA", meta=big_meta) + pack_id = H(44) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.store_store("packs/" + bin_to_hex(pack_id), chunk) + result = repository.get(pack_id, read_data=False, obj_offset=0, obj_size=len(chunk)) + assert result == chunk[: hdr_size + len(big_meta)] + + +def test_get_uses_chunk_index_location(tmp_path): + # get() routes to the correct pack and offset when a ChunkIndex is set via set_chunk_index(). + chunk1 = fchunk(b"FIRST") + chunk2 = fchunk(b"SECOND") + pack = chunk1 + chunk2 + pack_id = H(55) + id1, id2 = H(56), H(57) + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + # Inject the pack directly; bypasses PackWriter to test routing independently. + repository.store_store("packs/" + bin_to_hex(pack_id), pack) + chunks = ChunkIndex() + chunks.add(id1, len(chunk1)) + chunks.update_pack_info([(id1, pack_id, 0, len(chunk1))]) + chunks.add(id2, len(chunk2)) + chunks.update_pack_info([(id2, pack_id, len(chunk1), len(chunk2))]) + repository.set_chunk_index(chunks) + assert repository.get(id1) == chunk1 + assert repository.get(id2) == chunk2 + + def test_pack_writer_final_partial_pack_uses_sha256(): # When max_count > 1, a final flush with only 1 piece must still use SHA256, # not the N=1 pack_id == chunk_id hack.