# borgbackup/borg/cache.py

import configparser
from .remote import cache_if_remote
from collections import namedtuple
import os
import stat
import threading
from binascii import hexlify, unhexlify
import shutil
from .key import PlaintextKey
from .logger import create_logger
logger = create_logger()
from .helpers import Error, get_cache_dir, decode_dict, int_to_bigint, \
bigint_to_int, format_file_size, yes
from .locking import UpgradableLock
from .hashindex import ChunkIndex
import msgpack


class Cache:
"""Client Side cache
"""
class RepositoryReplay(Error):
"""Cache is newer than repository, refusing to continue"""
class CacheInitAbortedError(Error):
"""Cache initialization aborted"""
class RepositoryAccessAborted(Error):
"""Repository access aborted"""
class EncryptionMethodMismatch(Error):
"""Repository encryption method changed since last access, refusing to continue"""
@staticmethod
def break_lock(repository, path=None):
path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
UpgradableLock(os.path.join(path, 'lock'), exclusive=True).break_lock()
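
    # A minimal usage sketch (not part of the original module; ``repository``,
    # ``key``, ``manifest``, ``chunk_id``, ``data`` and ``stats`` are assumed to
    # come from the usual borg setup code):
    #
    #     with Cache(repository, key, manifest, do_files=True) as cache:
    #         cache.add_chunk(chunk_id, data, stats)  # deduplicates internally
    #         cache.commit()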
class ChunkSizeNotReady(Exception):
"""computation of some chunk size is not yet finished"""
def __init__(self, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
lock_wait=None):
self.lock = None
self.timestamp = None
self.thread_lock = threading.Lock()
self.txn_active = False
self.repository = repository
self.key = key
self.manifest = manifest
self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii'))
self.do_files = do_files
# Warn user before sending data to a never seen before unencrypted repository
if not os.path.exists(self.path):
if warn_if_unencrypted and isinstance(key, PlaintextKey):
msg = ("Warning: Attempting to access a previously unknown unencrypted repository!" +
"\n" +
"Do you want to continue? [yN] ")
if not yes(msg, false_msg="Aborting.", env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK'):
raise self.CacheInitAbortedError()
self.create()
self.open(lock_wait=lock_wait)
# Warn user before sending data to a relocated repository
if self.previous_location and self.previous_location != repository._location.canonical_path():
msg = ("Warning: The repository at location {} was previously located at {}".format(repository._location.canonical_path(), self.previous_location) +
"\n" +
"Do you want to continue? [yN] ")
if not yes(msg, false_msg="Aborting.", env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
raise self.RepositoryAccessAborted()
if sync and self.manifest.id != self.manifest_id:
# If repository is older than the cache something fishy is going on
if self.timestamp and self.timestamp > manifest.timestamp:
raise self.RepositoryReplay()
# Make sure an encrypted repository has not been swapped for an unencrypted repository
if self.key_type is not None and self.key_type != str(key.TYPE):
raise self.EncryptionMethodMismatch()
self.sync()
self.commit()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __str__(self):
fmt = """\
All archives: {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}
Unique chunks Total chunks
Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
return fmt.format(self.format_tuple())
def format_tuple(self):
# XXX: this should really be moved down to `hashindex.pyx`
Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks', 'total_chunks'])
stats = Summary(*self.chunks.summarize())._asdict()
for field in ['total_size', 'total_csize', 'unique_csize']:
stats[field] = format_file_size(stats[field])
return Summary(**stats)
def create(self):
"""Create a new empty cache at `self.path`
"""
os.makedirs(self.path)
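        # a cache directory consists of: a README, a 'config' file (version,
        # repository id, manifest id), an empty 'chunks' ChunkIndex, a
        # 'chunks.archive.d/' directory for per-archive indexes and an empty
        # 'files' cache -- all created below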
with open(os.path.join(self.path, 'README'), 'w') as fd:
fd.write('This is a Borg cache')
config = configparser.ConfigParser(interpolation=None)
config.add_section('cache')
config.set('cache', 'version', '1')
config.set('cache', 'repository', hexlify(self.repository.id).decode('ascii'))
config.set('cache', 'manifest', '')
with open(os.path.join(self.path, 'config'), 'w') as fd:
config.write(fd)
ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
with open(os.path.join(self.path, 'files'), 'wb') as fd:
pass # empty file
def destroy(self):
"""destroy the cache at `self.path`
"""
self.close()
os.remove(os.path.join(self.path, 'config')) # kill config first
shutil.rmtree(self.path)
def _do_open(self):
self.config = configparser.ConfigParser(interpolation=None)
config_path = os.path.join(self.path, 'config')
self.config.read(config_path)
try:
cache_version = self.config.getint('cache', 'version')
wanted_version = 1
if cache_version != wanted_version:
raise Exception('%s has unexpected cache version %d (wanted: %d).' % (
config_path, cache_version, wanted_version))
except configparser.NoSectionError:
raise Exception('%s does not look like a Borg cache.' % config_path) from None
self.id = self.config.get('cache', 'repository')
self.manifest_id = unhexlify(self.config.get('cache', 'manifest'))
self.timestamp = self.config.get('cache', 'timestamp', fallback=None)
self.key_type = self.config.get('cache', 'key_type', fallback=None)
self.previous_location = self.config.get('cache', 'previous_location', fallback=None)
self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
self.files = None
def open(self, lock_wait=None):
if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache.' % self.path)
self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
self.rollback()
def close(self):
if self.lock is not None:
self.lock.release()
self.lock = None
def _read_files(self):
self.files = {}
self._newest_mtime = 0
logger.info('reading files cache')
with open(os.path.join(self.path, 'files'), 'rb') as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
for path_hash, item in u:
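                    # bump the entry age on every load; file_known_and_unchanged()
                    # resets it to 0 and commit() drops entries whose age reached 10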
item[0] += 1
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(item)
def begin_txn(self):
# Initialize transaction snapshot
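        # snapshot protocol: copy config/chunks/files into txn.tmp, then rename
        # it atomically to txn.active; commit() removes the snapshot again,
        # rollback() restores from it if no commit happened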
txn_dir = os.path.join(self.path, 'txn.tmp')
os.mkdir(txn_dir)
shutil.copy(os.path.join(self.path, 'config'), txn_dir)
shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
shutil.copy(os.path.join(self.path, 'files'), txn_dir)
os.rename(os.path.join(self.path, 'txn.tmp'),
os.path.join(self.path, 'txn.active'))
self.txn_active = True
def commit(self):
"""Commit transaction
"""
if not self.txn_active:
return
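        # write the files cache (dropping stale entries), update the config
        # (manifest id, timestamp, key type, previous location) and the chunks
        # index, then discard the txn.active snapshot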
if self.files is not None:
with open(os.path.join(self.path, 'files'), 'wb') as fd:
for path_hash, item in self.files.items():
# Discard cached files with the newest mtime to avoid
# issues with filesystem snapshots and mtime precision
item = msgpack.unpackb(item)
if item[0] < 10 and bigint_to_int(item[3]) < self._newest_mtime:
msgpack.pack((path_hash, item), fd)
self.config.set('cache', 'manifest', hexlify(self.manifest.id).decode('ascii'))
self.config.set('cache', 'timestamp', self.manifest.timestamp)
self.config.set('cache', 'key_type', str(self.key.TYPE))
self.config.set('cache', 'previous_location', self.repository._location.canonical_path())
with open(os.path.join(self.path, 'config'), 'w') as fd:
self.config.write(fd)
self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
os.rename(os.path.join(self.path, 'txn.active'),
os.path.join(self.path, 'txn.tmp'))
shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
self.txn_active = False
def rollback(self):
"""Roll back partial and aborted transactions
"""
# Remove partial transaction
if os.path.exists(os.path.join(self.path, 'txn.tmp')):
shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
# Roll back active transaction
txn_dir = os.path.join(self.path, 'txn.active')
if os.path.exists(txn_dir):
shutil.copy(os.path.join(txn_dir, 'config'), self.path)
shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
shutil.copy(os.path.join(txn_dir, 'files'), self.path)
os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
if os.path.exists(os.path.join(self.path, 'txn.tmp')):
shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
self.txn_active = False
self._do_open()
def sync(self):
"""Re-synchronize chunks cache with repository.
        Maintains a directory of known backup archive indexes, so it only
        needs to fetch info from the repository and build a chunk index once
        per backup archive.
        If out of sync, missing archive indexes are added, outdated indexes
        are removed, and a new master chunks index is built by merging all
        archive indexes.
"""
archive_path = os.path.join(self.path, 'chunks.archive.d')
def mkpath(id, suffix=''):
id_hex = hexlify(id).decode('ascii')
path = os.path.join(archive_path, id_hex + suffix)
return path.encode('utf-8')
def cached_archives():
if self.do_cache:
fns = os.listdir(archive_path)
# filenames with 64 hex digits == 256bit
return set(unhexlify(fn) for fn in fns if len(fn) == 64)
else:
return set()
def repo_archives():
return set(info[b'id'] for info in self.manifest.archives.values())
def cleanup_outdated(ids):
for id in ids:
os.unlink(mkpath(id))
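        # build a ChunkIndex for a single archive: account for the archive
        # metadata chunk itself, each item metadata chunk and each content
        # chunk referenced by the items; if self.do_cache is set, persist the
        # result under chunks.archive.d/<archive id>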
def fetch_and_build_idx(archive_id, repository, key):
chunk_idx = ChunkIndex()
cdata = repository.get(archive_id)
data = key.decrypt(archive_id, cdata)
chunk_idx.add(archive_id, 1, len(data), len(cdata))
archive = msgpack.unpackb(data)
if archive[b'version'] != 1:
raise Exception('Unknown archive metadata version')
decode_dict(archive, (b'name',))
unpacker = msgpack.Unpacker()
for item_id, chunk in zip(archive[b'items'], repository.get_many(archive[b'items'])):
data = key.decrypt(item_id, chunk)
chunk_idx.add(item_id, 1, len(data), len(chunk))
unpacker.feed(data)
for item in unpacker:
if not isinstance(item, dict):
logger.error('Error: Did not get expected metadata dict - archive corrupted!')
continue
if b'chunks' in item:
for chunk_id, size, csize in item[b'chunks']:
chunk_idx.add(chunk_id, 1, size, csize)
if self.do_cache:
fn = mkpath(archive_id)
fn_tmp = mkpath(archive_id, suffix='.tmp')
try:
chunk_idx.write(fn_tmp)
except Exception:
os.unlink(fn_tmp)
else:
os.rename(fn_tmp, fn)
return chunk_idx
def lookup_name(archive_id):
for name, info in self.manifest.archives.items():
if info[b'id'] == archive_id:
return name
def create_master_idx(chunk_idx):
logger.info('Synchronizing chunks cache...')
cached_ids = cached_archives()
archive_ids = repo_archives()
logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
len(archive_ids), len(cached_ids),
len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
# deallocates old hashindex, creates empty hashindex:
chunk_idx.clear()
cleanup_outdated(cached_ids - archive_ids)
if archive_ids:
chunk_idx = None
for archive_id in archive_ids:
archive_name = lookup_name(archive_id)
if archive_id in cached_ids:
archive_chunk_idx_path = mkpath(archive_id)
logger.info("Reading cached archive chunk index for %s ..." % archive_name)
archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
else:
logger.info('Fetching and building archive index for %s ...' % archive_name)
archive_chunk_idx = fetch_and_build_idx(archive_id, repository, self.key)
logger.info("Merging into master chunks index ...")
if chunk_idx is None:
# we just use the first archive's idx as starting point,
# to avoid growing the hash table from 0 size and also
# to save 1 merge call.
chunk_idx = archive_chunk_idx
else:
chunk_idx.merge(archive_chunk_idx)
logger.info('Done.')
return chunk_idx
def legacy_cleanup():
"""bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except OSError:
                pass  # nothing to delete
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except OSError:
                pass  # nothing to delete
            try:
                os.mkdir(archive_path)
            except OSError:
                pass  # directory already exists
self.begin_txn()
with cache_if_remote(self.repository) as repository:
legacy_cleanup()
# TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
# this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
self.do_cache = os.path.isdir(archive_path)
self.chunks = create_master_idx(self.chunks)
def add_chunk(self, id, data, stats):
if not self.txn_active:
self.begin_txn()
size = len(data)
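        # deduplication: if the chunk is already known, just bump its refcount
        # instead of encrypting and storing it again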
if self.seen_chunk(id, size):
return self.chunk_incref(id, stats)
data = self.key.encrypt(data)
csize = len(data)
self.repository.put(id, data, wait=False)
self.chunks[id] = (1, size, csize)
stats.update(size, csize, True)
return id, size, csize
def chunk_modify(self, id, count=None, delta=None, size=None, csize=None):
"""modify a self.chunks entry, return the new value.
must be thread safe.
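        note: a real count/size/csize may only replace a 0 placeholder value
        (e.g. the csize == 0 set by seen_or_announce_chunk), see the assertions.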
"""
with self.thread_lock:
_count, _size, _csize = self.chunks[id]
modified = False
if size is not None and size != _size:
assert _size == 0
_size = size
modified = True
if csize is not None and csize != _csize:
assert _csize == 0
_csize = csize
modified = True
if count is not None and count != _count:
assert _count == 0
_count = count
modified = True
if delta is not None and delta != 0:
_count += delta
assert _count >= 0
modified = True
if modified:
self.chunks[id] = _count, _size, _csize
return _count, _size, _csize
def add_chunk_nostats(self, cchunk, id, size, csize):
# do not update stats here, see postprocess
if not self.txn_active:
self.begin_txn()
new_chunk = cchunk is not None
if new_chunk:
# note: count = 1 already set in seen_or_announce_chunk
_, size, csize = self.chunk_modify(id, size=size, csize=csize)
self.repository.put(id, cchunk, wait=False)
else:
# note: csize might be still 0 (not yet computed) here
_, size, csize = self.chunk_modify(id, delta=1, size=size)
return size, csize, new_chunk
def postprocess_results(self, size_infos, results, stats):
        # We need to do some post-processing:
        # - duplicate chunks may have csize not set yet (still 0) due to the
        #   multi-threaded processing; every such entry must get the correct
        #   csize from the duplicate chunk that already has it.
        # - we need to reconstruct the correct order of the chunks.
        # - we need to fix the stats now that we have the correct csize.
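        # expected shapes (inferred from the code below): results holds
        # (ordering key, id, new_chunk) tuples from the worker threads and
        # sorting restores the original order; size_infos maps id -> (size, csize)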
chunks = []
for _, id, new_chunk in sorted(results):
try:
size, csize = size_infos[id]
except KeyError:
raise self.ChunkSizeNotReady
chunks.append((id, size, csize, new_chunk))
# do another pass after we have made sure we have all size info
results = []
for id, size, csize, new_chunk in chunks:
stats.update(size, csize, new_chunk)
results.append((id, size, csize))
return results
def seen_chunk(self, id, size=None):
refcount, stored_size, _ = self.chunks.get(id, (0, None, None))
if size is not None and stored_size is not None and size != stored_size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
id, stored_size, size))
return refcount
def seen_or_announce_chunk(self, id, size):
"""return True if we have seen the chunk <id> already (thus, we already have it or will have it soon).
        if we have not seen it yet, announce its (future) availability and return False.
must be thread safe.
"""
with self.thread_lock:
try:
# did we see this id already (and is count > 0)?
count, _size, _csize = self.chunks[id]
if size != _size:
raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
id, _size, size))
return count > 0
except KeyError:
# announce that we will put this chunk soon,
# so that deduplication knows we already have it.
self.chunks[id] = 1, size, 0
return False
def chunk_incref(self, id, stats):
if not self.txn_active:
self.begin_txn()
count, size, csize = self.chunks[id]
self.chunks[id] = (count + 1, size, csize)
stats.update(size, csize, False)
return id, size, csize
def chunk_decref(self, id, stats):
if not self.txn_active:
self.begin_txn()
count, size, csize = self.chunks[id]
if count == 1:
del self.chunks[id]
self.repository.delete(id, wait=False)
stats.update(-size, -csize, True)
else:
self.chunks[id] = (count - 1, size, csize)
stats.update(-size, -csize, False)
def file_known_and_unchanged(self, path_hash, st):
if not (self.do_files and stat.S_ISREG(st.st_mode)):
return None
if self.files is None:
self._read_files()
entry = self.files.get(path_hash)
if not entry:
return None
entry = msgpack.unpackb(entry)
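        # entry layout: [age, inode, size, mtime (bigint), chunk ids], see memorize_file()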
if entry[2] == st.st_size and bigint_to_int(entry[3]) == st.st_mtime_ns and entry[1] == st.st_ino:
# reset entry age
entry[0] = 0
self.files[path_hash] = msgpack.packb(entry)
return entry[4]
else:
return None
def memorize_file(self, path_hash, st, ids):
if not (self.do_files and stat.S_ISREG(st.st_mode)):
return
# Entry: Age, inode, size, mtime, chunk ids
mtime_ns = st.st_mtime_ns
self.files[path_hash] = msgpack.packb((0, st.st_ino, st.st_size, int_to_bigint(mtime_ns), ids))
self._newest_mtime = max(self._newest_mtime, mtime_ns)