mirror of
https://github.com/borgbackup/borg.git
synced 2026-03-24 19:34:23 -04:00
When borg mount is used without -f/--foreground (so the FUSE borg process is started daemonized in the background), it did not display the rc of the main process, even when --show-rc was given. Now it does display the rc of the main process. Note that this is more of a consistency fix than being super useful, because the main "action" happens in the background daemon process, not in the main process.
729 lines
33 KiB
Python
import errno
|
|
import functools
|
|
import io
|
|
import os
|
|
import stat
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from collections import defaultdict
|
|
from signal import SIGINT
|
|
|
|
from .fuse_impl import llfuse, has_pyfuse3
|
|
|
|
|
|
if has_pyfuse3:
|
|
import trio
|
|
|
|
def async_wrapper(fn):
|
|
@functools.wraps(fn)
|
|
async def wrapper(*args, **kwargs):
|
|
return fn(*args, **kwargs)
|
|
return wrapper
|
|
else:
|
|
trio = None
|
|
|
|
def async_wrapper(fn):
|
|
return fn
|
|
|
|
|
|
from .logger import create_logger
|
|
logger = create_logger()
|
|
|
|
from .crypto.low_level import blake2b_128
|
|
from .archiver import Archiver
|
|
from .archive import Archive, get_item_uid_gid
|
|
from .hashindex import FuseVersionsIndex
|
|
from .helpers import daemonize, daemonizing, hardlinkable, signal_handler, format_file_size, Error
|
|
from .helpers import msgpack
|
|
from .item import Item
|
|
from .lrucache import LRUCache
|
|
from .platform import uid2user, gid2group
|
|
from .platformflags import is_darwin
|
|
from .remote import RemoteRepository
|
|
|
|
|
|
def fuse_main():
    if has_pyfuse3:
        try:
            trio.run(llfuse.main)
        except:
            return 1  # TODO return signal number if it was killed by signal
        else:
            return None
    else:
        return llfuse.main(workers=1)
|
|
|
|
|
|
# size of some LRUCaches (1 element per simultaneously open file)
|
|
# note: _inode_cache might have rather large elements - Item.chunks can be large!
|
|
# also, simultaneously reading too many files should be avoided anyway.
|
|
# thus, do not set FILES to high values.
|
|
FILES = 4
|
|
|
|
|
|
class ItemCache:
|
|
"""
|
|
This is the "meat" of the file system's metadata storage.
|
|
|
|
This class generates inode numbers that efficiently index items in archives,
|
|
and retrieves items from these inode numbers.
|
|
"""
|
|
|
|
# 2 MiB are approximately ~230000 items (depends on the average number of items per metadata chunk).
|
|
#
|
|
# Since growing a bytearray has to copy it, growing it will converge to O(n^2), however,
|
|
# this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
|
|
# use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
|
|
# to resize it in the first place; that's free).
|
|
GROW_META_BY = 2 * 1024 * 1024
|
|
|
|
indirect_entry_struct = struct.Struct('=cII')
|
|
assert indirect_entry_struct.size == 9
|
|
|
|
def __init__(self, decrypted_repository):
|
|
self.decrypted_repository = decrypted_repository
|
|
# self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
|
|
# It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
|
|
# unices did this).
|
|
# The meta-array contains chunk IDs and item entries (described in iter_archive_items).
|
|
# The chunk IDs are referenced by item entries through relative offsets,
|
|
# which are bounded by the metadata chunk size.
|
|
self.meta = bytearray()
|
|
# The current write offset in self.meta
|
|
self.write_offset = 0
|
|
|
|
# Offset added to meta-indices, resulting in inodes,
|
|
# or subtracted from inodes, resulting in meta-indices.
|
|
# XXX: Merge FuseOperations.items and ItemCache to avoid
|
|
# this implicit limitation / hack (on the number of synthetic inodes, degenerate
|
|
# cases can inflate their number far beyond the number of archives).
|
|
self.offset = 1000000
|
|
|
|
# A temporary file that contains direct items, i.e. items directly cached in this layer.
|
|
# These are items that span more than one chunk and thus cannot be efficiently cached
|
|
# by the object cache (self.decrypted_repository), which would require variable-length structures;
|
|
# possible but not worth the effort, see iter_archive_items.
|
|
self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
|
|
|
|
# A small LRU cache for chunks requested by ItemCache.get() from the object cache,
|
|
# this significantly speeds up directory traversal and similar operations which
|
|
# tend to re-read the same chunks over and over.
|
|
# The capacity is kept low because increasing it does not provide any significant advantage,
|
|
# but makes LRUCache's square behaviour noticeable and consumes more memory.
|
|
self.chunks = LRUCache(capacity=10, dispose=lambda _: None)
|
|
|
|
# Instrumentation
|
|
# Count of indirect items, i.e. data is cached in the object cache, not directly in this cache
|
|
self.indirect_items = 0
|
|
# Count of direct items, i.e. data is in self.fd
|
|
self.direct_items = 0
|
|
|
|
def get(self, inode):
|
|
offset = inode - self.offset
|
|
if offset < 0:
|
|
raise ValueError('ItemCache.get() called with an invalid inode number')
|
|
if self.meta[offset] == ord(b'I'):
|
|
_, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
|
|
chunk_id_offset = offset - chunk_id_relative_offset
|
|
# bytearray slices are bytearrays as well, explicitly convert to bytes()
|
|
chunk_id = bytes(self.meta[chunk_id_offset:chunk_id_offset + 32])
|
|
chunk = self.chunks.get(chunk_id)
|
|
if not chunk:
|
|
csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
|
|
self.chunks[chunk_id] = chunk
|
|
data = memoryview(chunk)[chunk_offset:]
|
|
unpacker = msgpack.Unpacker()
|
|
unpacker.feed(data)
|
|
return Item(internal_dict=next(unpacker))
|
|
elif self.meta[offset] == ord(b'S'):
|
|
fd_offset = int.from_bytes(self.meta[offset + 1:offset + 9], 'little')
|
|
self.fd.seek(fd_offset, io.SEEK_SET)
|
|
return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
|
|
else:
|
|
raise ValueError('Invalid entry type in self.meta')
|
|
|
|
def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=False):
|
|
unpacker = msgpack.Unpacker()
|
|
|
|
# Current offset in the metadata stream, which consists of all metadata chunks glued together
|
|
stream_offset = 0
|
|
# Offset of the current chunk in the metadata stream
|
|
chunk_begin = 0
|
|
# Length of the chunk preceding the current chunk
|
|
last_chunk_length = 0
|
|
msgpacked_bytes = b''
|
|
|
|
write_offset = self.write_offset
|
|
meta = self.meta
|
|
pack_indirect_into = self.indirect_entry_struct.pack_into
|
|
|
|
for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
|
|
# Store the chunk ID in the meta-array
|
|
if write_offset + 32 >= len(meta):
|
|
self.meta = meta = meta + bytes(self.GROW_META_BY)
|
|
meta[write_offset:write_offset + 32] = key
|
|
current_id_offset = write_offset
|
|
write_offset += 32
|
|
|
|
chunk_begin += last_chunk_length
|
|
last_chunk_length = len(data)
|
|
|
|
unpacker.feed(data)
|
|
while True:
|
|
try:
|
|
item = unpacker.unpack()
|
|
need_more_data = False
|
|
except msgpack.OutOfData:
|
|
need_more_data = True
|
|
|
|
start = stream_offset - chunk_begin
|
|
# tell() is not helpful for the need_more_data case, but we know it is the remainder
|
|
# of the data in that case. in the other case, tell() works as expected.
|
|
length = (len(data) - start) if need_more_data else (unpacker.tell() - stream_offset)
|
|
msgpacked_bytes += data[start:start+length]
|
|
stream_offset += length
|
|
|
|
if need_more_data:
|
|
# Need more data, feed the next chunk
|
|
break
|
|
|
|
item = Item(internal_dict=item)
|
|
if filter and not filter(item) or not consider_part_files and 'part' in item:
|
|
msgpacked_bytes = b''
|
|
continue
|
|
|
|
current_item = msgpacked_bytes
|
|
current_item_length = len(current_item)
|
|
current_spans_chunks = stream_offset - current_item_length < chunk_begin
|
|
msgpacked_bytes = b''
|
|
|
|
if write_offset + 9 >= len(meta):
|
|
self.meta = meta = meta + bytes(self.GROW_META_BY)
|
|
|
|
# item entries in the meta-array come in two different flavours, both nine bytes long.
|
|
# (1) for items that span chunks:
|
|
#
|
|
# 'S' + 8 byte offset into the self.fd file, where the msgpacked item starts.
|
|
#
|
|
# (2) for items that are completely contained in one chunk, which usually is the great majority
|
|
# (about 700:1 for system backups)
|
|
#
|
|
# 'I' + 4 byte offset where the chunk ID is + 4 byte offset in the chunk
|
|
# where the msgpacked items starts
|
|
#
|
|
# The chunk ID offset is the number of bytes _back_ from the start of the entry, i.e.:
|
|
#
|
|
# |Chunk ID| .... |S1234abcd|
|
|
# ^------ offset ----------^
|
|
|
|
if current_spans_chunks:
|
|
pos = self.fd.seek(0, io.SEEK_END)
|
|
self.fd.write(current_item)
|
|
meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
|
|
self.direct_items += 1
|
|
else:
|
|
item_offset = stream_offset - current_item_length - chunk_begin
|
|
pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
|
|
self.indirect_items += 1
|
|
inode = write_offset + self.offset
|
|
write_offset += 9
|
|
|
|
yield inode, item
|
|
|
|
self.write_offset = write_offset
|
|
|
|
|
|
class FuseBackend:
|
|
"""Virtual filesystem based on archive(s) to provide information to fuse
|
|
"""
|
|
|
|
def __init__(self, key, manifest, repository, args, decrypted_repository):
|
|
self.repository_uncached = repository
|
|
self._args = args
|
|
self.numeric_ids = args.numeric_ids
|
|
self._manifest = manifest
|
|
self.key = key
|
|
# Maps inode numbers to Item instances. This is used for synthetic inodes, i.e. file-system objects that are
|
|
# made up and are not contained in the archives. For example archive directories or intermediate directories
|
|
# not contained in archives.
|
|
self._items = {}
|
|
# cache up to <FILES> Items
|
|
self._inode_cache = LRUCache(capacity=FILES, dispose=lambda _: None)
|
|
# _inode_count is the current count of synthetic inodes, i.e. those in self._items
|
|
self.inode_count = 0
|
|
# Maps inode numbers to the inode number of the parent
|
|
self.parent = {}
|
|
# Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
|
|
# i.e. this contains all dirents of everything that is mounted. (It becomes really big).
|
|
self.contents = defaultdict(dict)
|
|
self.default_uid = os.getuid()
|
|
self.default_gid = os.getgid()
|
|
self.default_dir = None
|
|
# Archives to be loaded when first accessed, mapped by their placeholder inode
|
|
self.pending_archives = {}
|
|
self.cache = ItemCache(decrypted_repository)
|
|
self.allow_damaged_files = False
|
|
self.versions = False
|
|
self.uid_forced = None
|
|
self.gid_forced = None
|
|
self.umask = 0
|
|
|
|
def _create_filesystem(self):
|
|
self._create_dir(parent=1) # first call, create root dir (inode == 1)
|
|
if self._args.location.archive:
|
|
if self.versions:
|
|
raise Error("for versions view, do not specify a single archive, "
|
|
"but always give the repository as location.")
|
|
self._process_archive(self._args.location.archive)
|
|
else:
|
|
self.versions_index = FuseVersionsIndex()
|
|
for archive in self._manifest.archives.list_considering(self._args):
|
|
if self.versions:
|
|
# process archives immediately
|
|
self._process_archive(archive.name)
|
|
else:
|
|
# lazily load archives, create archive placeholder inode
|
|
archive_inode = self._create_dir(parent=1, mtime=int(archive.ts.timestamp() * 1e9))
|
|
self.contents[1][os.fsencode(archive.name)] = archive_inode
|
|
self.pending_archives[archive_inode] = archive.name
|
|
|
|
def get_item(self, inode):
|
|
item = self._inode_cache.get(inode)
|
|
if item is not None:
|
|
return item
|
|
try:
|
|
# this is a cheap get-from-dictionary operation, no need to cache the result.
|
|
return self._items[inode]
|
|
except KeyError:
|
|
# while self.cache does some internal caching, it has still quite some overhead, so we cache the result.
|
|
item = self.cache.get(inode)
|
|
self._inode_cache[inode] = item
|
|
return item
|
|
|
|
def check_pending_archive(self, inode):
|
|
# Check if this is an archive we need to load
|
|
archive_name = self.pending_archives.pop(inode, None)
|
|
if archive_name is not None:
|
|
self._process_archive(archive_name, [os.fsencode(archive_name)])
|
|
|
|
def _allocate_inode(self):
|
|
self.inode_count += 1
|
|
return self.inode_count
|
|
|
|
def _create_dir(self, parent, mtime=None):
|
|
"""Create directory
|
|
"""
|
|
ino = self._allocate_inode()
|
|
if mtime is not None:
|
|
self._items[ino] = Item(internal_dict=self.default_dir.as_dict())
|
|
self._items[ino].mtime = mtime
|
|
else:
|
|
self._items[ino] = self.default_dir
|
|
self.parent[ino] = parent
|
|
return ino
|
|
|
|
def find_inode(self, path, prefix=[]):
|
|
segments = prefix + path.split(b'/')
|
|
inode = 1
|
|
for segment in segments:
|
|
inode = self.contents[inode][segment]
|
|
return inode
|
|
|
|
def _process_archive(self, archive_name, prefix=[]):
|
|
"""Build FUSE inode hierarchy from archive metadata
|
|
"""
|
|
self.file_versions = {} # for versions mode: original path -> version
|
|
t0 = time.perf_counter()
|
|
archive = Archive(self.repository_uncached, self.key, self._manifest, archive_name,
|
|
consider_part_files=self._args.consider_part_files)
|
|
strip_components = self._args.strip_components
|
|
matcher = Archiver.build_matcher(self._args.patterns, self._args.paths)
|
|
partial_extract = not matcher.empty() or strip_components
|
|
hardlink_masters = {} if partial_extract else None
|
|
|
|
def peek_and_store_hardlink_masters(item, matched):
|
|
if (partial_extract and not matched and hardlinkable(item.mode) and
|
|
item.get('hardlink_master', True) and 'source' not in item):
|
|
hardlink_masters[item.get('path')] = (item.get('chunks'), None)
|
|
|
|
filter = Archiver.build_filter(matcher, peek_and_store_hardlink_masters, strip_components)
|
|
for item_inode, item in self.cache.iter_archive_items(archive.metadata.items, filter=filter,
|
|
consider_part_files=self._args.consider_part_files):
|
|
if strip_components:
|
|
item.path = os.sep.join(item.path.split(os.sep)[strip_components:])
|
|
path = os.fsencode(item.path)
|
|
is_dir = stat.S_ISDIR(item.mode)
|
|
if is_dir:
|
|
try:
|
|
# This can happen if an archive was created with a command line like
|
|
# $ borg create ... dir1/file dir1
|
|
# In this case the code below will have created a default_dir inode for dir1 already.
|
|
inode = self.find_inode(path, prefix)
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self._items[inode] = item
|
|
continue
|
|
segments = prefix + path.split(b'/')
|
|
parent = 1
|
|
for segment in segments[:-1]:
|
|
parent = self._process_inner(segment, parent)
|
|
self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode,
|
|
hardlink_masters, strip_components)
|
|
duration = time.perf_counter() - t0
|
|
logger.debug('fuse: _process_archive completed in %.1f s for archive %s', duration, archive.name)
|
|
|
|
def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode, hardlink_masters, stripped_components):
|
|
path = item.path
|
|
del item.path # save some space
|
|
hardlink_masters = hardlink_masters or {}
|
|
|
|
def file_version(item, path):
|
|
if 'chunks' in item:
|
|
file_id = blake2b_128(path)
|
|
current_version, previous_id = self.versions_index.get(file_id, (0, None))
|
|
|
|
contents_id = blake2b_128(b''.join(chunk_id for chunk_id, _, _ in item.chunks))
|
|
|
|
if contents_id != previous_id:
|
|
current_version += 1
|
|
self.versions_index[file_id] = current_version, contents_id
|
|
|
|
return current_version
|
|
|
|
def make_versioned_name(name, version, add_dir=False):
|
|
if add_dir:
|
|
# add intermediate directory with same name as filename
|
|
path_fname = name.rsplit(b'/', 1)
|
|
name += b'/' + path_fname[-1]
|
|
# keep original extension at end to avoid confusing tools
|
|
name, ext = os.path.splitext(name)
|
|
version_enc = os.fsencode('.%05d' % version)
|
|
return name + version_enc + ext
|
|
|
|
if 'source' in item and hardlinkable(item.mode):
|
|
source = os.sep.join(item.source.split(os.sep)[stripped_components:])
|
|
chunks, link_target = hardlink_masters.get(item.source, (None, source))
|
|
if link_target:
|
|
# Hard link was extracted previously, just link
|
|
link_target = os.fsencode(link_target)
|
|
if self.versions:
|
|
# adjust link target name with version
|
|
version = self.file_versions[link_target]
|
|
link_target = make_versioned_name(link_target, version, add_dir=True)
|
|
try:
|
|
inode = self.find_inode(link_target, prefix)
|
|
except KeyError:
|
|
logger.warning('Skipping broken hard link: %s -> %s', path, source)
|
|
return
|
|
item = self.get_item(inode)
|
|
item.nlink = item.get('nlink', 1) + 1
|
|
self._items[inode] = item
|
|
elif chunks is not None:
|
|
# assign chunks to this item, since the item which had the chunks was not extracted
|
|
item.chunks = chunks
|
|
inode = item_inode
|
|
self._items[inode] = item
|
|
if hardlink_masters:
|
|
# Update master entry with extracted item path, so that following hardlinks don't extract twice.
|
|
hardlink_masters[item.source] = (None, path)
|
|
else:
|
|
inode = item_inode
|
|
|
|
if self.versions and not is_dir:
|
|
parent = self._process_inner(name, parent)
|
|
enc_path = os.fsencode(path)
|
|
version = file_version(item, enc_path)
|
|
if version is not None:
|
|
# regular file, with contents - maybe a hardlink master
|
|
name = make_versioned_name(name, version)
|
|
self.file_versions[enc_path] = version
|
|
|
|
self.parent[inode] = parent
|
|
if name:
|
|
self.contents[parent][name] = inode
|
|
|
|
def _process_inner(self, name, parent_inode):
|
|
dir = self.contents[parent_inode]
|
|
if name in dir:
|
|
inode = dir[name]
|
|
else:
|
|
inode = self._create_dir(parent_inode)
|
|
if name:
|
|
dir[name] = inode
|
|
return inode
|
|
|
|
|
|
class FuseOperations(llfuse.Operations, FuseBackend):
|
|
"""Export archive as a FUSE filesystem
|
|
"""
|
|
|
|
def __init__(self, key, repository, manifest, args, decrypted_repository):
|
|
llfuse.Operations.__init__(self)
|
|
FuseBackend.__init__(self, key, manifest, repository, args, decrypted_repository)
|
|
self.decrypted_repository = decrypted_repository
|
|
data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
|
|
logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
|
|
self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
|
|
self._last_pos = LRUCache(capacity=FILES, dispose=lambda _: None)
|
|
|
|
def sig_info_handler(self, sig_no, stack):
|
|
logger.debug('fuse: %d synth inodes, %d edges (%s)',
|
|
self.inode_count, len(self.parent),
|
|
# getsizeof is the size of the dict itself; key and value are two small-ish integers,
|
|
# which are shared due to code structure (this has been verified).
|
|
format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self.inode_count)))
|
|
logger.debug('fuse: %d pending archives', len(self.pending_archives))
|
|
logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s',
|
|
self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items,
|
|
format_file_size(sys.getsizeof(self.cache.meta)),
|
|
format_file_size(os.stat(self.cache.fd.fileno()).st_size))
|
|
logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
|
|
format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
|
|
self.decrypted_repository.log_instrumentation()
|
|
|
|
def mount(self, mountpoint, mount_options, foreground=False, show_rc=False):
|
|
"""Mount filesystem on *mountpoint* with *mount_options*."""
|
|
|
|
def pop_option(options, key, present, not_present, wanted_type, int_base=0):
|
|
assert isinstance(options, list) # we mutate this
|
|
for idx, option in enumerate(options):
|
|
if option == key:
|
|
options.pop(idx)
|
|
return present
|
|
if option.startswith(key + '='):
|
|
options.pop(idx)
|
|
value = option.split('=', 1)[1]
|
|
if wanted_type is bool:
|
|
v = value.lower()
|
|
if v in ('y', 'yes', 'true', '1'):
|
|
return True
|
|
if v in ('n', 'no', 'false', '0'):
|
|
return False
|
|
raise ValueError('unsupported value in option: %s' % option)
|
|
if wanted_type is int:
|
|
try:
|
|
return int(value, base=int_base)
|
|
except ValueError:
|
|
raise ValueError('unsupported value in option: %s' % option) from None
|
|
try:
|
|
return wanted_type(value)
|
|
except ValueError:
|
|
raise ValueError('unsupported value in option: %s' % option) from None
|
|
else:
|
|
return not_present
|
|
|
|
# default_permissions enables permission checking by the kernel. Without
|
|
# this, any umask (or uid/gid) would not have an effect and this could
|
|
# cause security issues if used with allow_other mount option.
|
|
# When not using allow_other or allow_root, access is limited to the
|
|
# mounting user anyway.
|
|
options = ['fsname=borgfs', 'ro', 'default_permissions']
|
|
if mount_options:
|
|
options.extend(mount_options.split(','))
|
|
if is_darwin:
|
|
# macFUSE supports a volname mount option to give what Finder displays on the desktop / in directory list.
|
|
volname = pop_option(options, 'volname', '', '', str)
|
|
# if the user did not specify it, we make something up,
|
|
# because otherwise it would be "macFUSE Volume 0 (Python)", #7690.
|
|
volname = volname or f"{os.path.basename(mountpoint)} (borgfs)"
|
|
options.append(f"volname={volname}")
|
|
ignore_permissions = pop_option(options, 'ignore_permissions', True, False, bool)
|
|
if ignore_permissions:
|
|
# in case users have a use-case that requires NOT giving "default_permissions",
|
|
# this is enabled by the custom "ignore_permissions" mount option which just
|
|
# removes "default_permissions" again:
|
|
pop_option(options, 'default_permissions', True, False, bool)
|
|
self.allow_damaged_files = pop_option(options, 'allow_damaged_files', True, False, bool)
|
|
self.versions = pop_option(options, 'versions', True, False, bool)
|
|
self.uid_forced = pop_option(options, 'uid', None, None, int)
|
|
self.gid_forced = pop_option(options, 'gid', None, None, int)
|
|
self.umask = pop_option(options, 'umask', 0, 0, int, int_base=8) # umask is octal, e.g. 222 or 0222
|
|
dir_uid = self.uid_forced if self.uid_forced is not None else self.default_uid
|
|
dir_gid = self.gid_forced if self.gid_forced is not None else self.default_gid
|
|
dir_user = uid2user(dir_uid)
|
|
dir_group = gid2group(dir_gid)
|
|
assert isinstance(dir_user, str)
|
|
assert isinstance(dir_group, str)
|
|
dir_mode = 0o40755 & ~self.umask
|
|
self.default_dir = Item(mode=dir_mode, mtime=int(time.time() * 1e9),
|
|
user=dir_user, group=dir_group, uid=dir_uid, gid=dir_gid)
|
|
self._create_filesystem()
|
|
llfuse.init(self, mountpoint, options)
|
|
if not foreground:
|
|
if isinstance(self.repository_uncached, RemoteRepository):
|
|
daemonize()
|
|
else:
|
|
with daemonizing(show_rc=show_rc) as (old_id, new_id):
|
|
# local repo: the locking process' PID is changing, migrate it:
|
|
logger.debug('fuse: mount local repo, going to background: migrating lock.')
|
|
self.repository_uncached.migrate_lock(old_id, new_id)
|
|
|
|
# If the file system crashes, we do not want to umount because in that
|
|
# case the mountpoint suddenly appears to become empty. This can have
|
|
# nasty consequences, imagine the user has e.g. an active rsync mirror
|
|
# job - seeing the mountpoint empty, rsync would delete everything in the
|
|
# mirror.
|
|
umount = False
|
|
try:
|
|
with signal_handler('SIGUSR1', self.sig_info_handler), \
|
|
signal_handler('SIGINFO', self.sig_info_handler):
|
|
signal = fuse_main()
|
|
# no crash and no signal (or it's ^C and we're in the foreground) -> umount request
|
|
umount = (signal is None or (signal == SIGINT and foreground))
|
|
finally:
|
|
llfuse.close(umount)
|
|
|
|
@async_wrapper
|
|
def statfs(self, ctx=None):
|
|
stat_ = llfuse.StatvfsData()
|
|
stat_.f_bsize = 512
|
|
stat_.f_frsize = 512
|
|
stat_.f_blocks = 0
|
|
stat_.f_bfree = 0
|
|
stat_.f_bavail = 0
|
|
stat_.f_files = 0
|
|
stat_.f_ffree = 0
|
|
stat_.f_favail = 0
|
|
stat_.f_namemax = 255 # == NAME_MAX (depends on archive source OS / FS)
|
|
return stat_
|
|
|
|
def _getattr(self, inode, ctx=None):
|
|
item = self.get_item(inode)
|
|
entry = llfuse.EntryAttributes()
|
|
entry.st_ino = inode
|
|
entry.generation = 0
|
|
entry.entry_timeout = 300
|
|
entry.attr_timeout = 300
|
|
entry.st_mode = item.mode & ~self.umask
|
|
entry.st_nlink = item.get('nlink', 1)
|
|
entry.st_uid, entry.st_gid = get_item_uid_gid(item, numeric=self.numeric_ids,
|
|
uid_default=self.default_uid, gid_default=self.default_gid,
|
|
uid_forced=self.uid_forced, gid_forced=self.gid_forced)
|
|
entry.st_rdev = item.get('rdev', 0)
|
|
entry.st_size = item.get_size()
|
|
entry.st_blksize = 512
|
|
entry.st_blocks = (entry.st_size + entry.st_blksize - 1) // entry.st_blksize
|
|
# note: older archives only have mtime (not atime nor ctime)
|
|
entry.st_mtime_ns = mtime_ns = item.mtime
|
|
entry.st_atime_ns = item.get('atime', mtime_ns)
|
|
entry.st_ctime_ns = item.get('ctime', mtime_ns)
|
|
entry.st_birthtime_ns = item.get('birthtime', mtime_ns)
|
|
return entry
|
|
|
|
@async_wrapper
|
|
def getattr(self, inode, ctx=None):
|
|
return self._getattr(inode, ctx=ctx)
|
|
|
|
@async_wrapper
|
|
def listxattr(self, inode, ctx=None):
|
|
item = self.get_item(inode)
|
|
return item.get('xattrs', {}).keys()
|
|
|
|
@async_wrapper
|
|
def getxattr(self, inode, name, ctx=None):
|
|
item = self.get_item(inode)
|
|
try:
|
|
return item.get('xattrs', {})[name] or b''
|
|
except KeyError:
|
|
raise llfuse.FUSEError(llfuse.ENOATTR) from None
|
|
|
|
@async_wrapper
|
|
def lookup(self, parent_inode, name, ctx=None):
|
|
self.check_pending_archive(parent_inode)
|
|
if name == b'.':
|
|
inode = parent_inode
|
|
elif name == b'..':
|
|
inode = self.parent[parent_inode]
|
|
else:
|
|
inode = self.contents[parent_inode].get(name)
|
|
if not inode:
|
|
raise llfuse.FUSEError(errno.ENOENT)
|
|
return self._getattr(inode)
|
|
|
|
@async_wrapper
|
|
def open(self, inode, flags, ctx=None):
|
|
if not self.allow_damaged_files:
|
|
item = self.get_item(inode)
|
|
if 'chunks_healthy' in item:
|
|
# Processed archive items don't carry the path anymore; for converting the inode
|
|
# to the path we'd either have to store the inverse of the current structure,
|
|
# or search the entire archive. So we just don't print it. It's easy to correlate anyway.
|
|
logger.warning('File has damaged (all-zero) chunks. Try running borg check --repair. '
|
|
'Mount with allow_damaged_files to read damaged files.')
|
|
raise llfuse.FUSEError(errno.EIO)
|
|
return llfuse.FileInfo(fh=inode) if has_pyfuse3 else inode
|
|
|
|
@async_wrapper
|
|
def opendir(self, inode, ctx=None):
|
|
self.check_pending_archive(inode)
|
|
return inode
|
|
|
|
@async_wrapper
|
|
def read(self, fh, offset, size):
|
|
parts = []
|
|
item = self.get_item(fh)
|
|
|
|
# optimize for linear reads:
|
|
# we cache the chunk number and the in-file offset of the chunk in _last_pos[fh]
|
|
chunk_no, chunk_offset = self._last_pos.get(fh, (0, 0))
|
|
if chunk_offset > offset:
|
|
# this is not a linear read, so we lost track and need to start from beginning again...
|
|
chunk_no, chunk_offset = (0, 0)
|
|
|
|
offset -= chunk_offset
|
|
chunks = item.chunks
|
|
# note: using index iteration to avoid frequently copying big (sub)lists by slicing
|
|
for idx in range(chunk_no, len(chunks)):
|
|
id, s, csize = chunks[idx]
|
|
if s < offset:
|
|
offset -= s
|
|
chunk_offset += s
|
|
chunk_no += 1
|
|
continue
|
|
n = min(size, s - offset)
|
|
if id in self.data_cache:
|
|
data = self.data_cache[id]
|
|
if offset + n == len(data):
|
|
# evict fully read chunk from cache
|
|
del self.data_cache[id]
|
|
else:
|
|
data = self.key.decrypt(id, self.repository_uncached.get(id))
|
|
if offset + n < len(data):
|
|
# chunk was only partially read, cache it
|
|
self.data_cache[id] = data
|
|
parts.append(data[offset:offset + n])
|
|
offset = 0
|
|
size -= n
|
|
if not size:
|
|
if fh in self._last_pos:
|
|
self._last_pos.upd(fh, (chunk_no, chunk_offset))
|
|
else:
|
|
self._last_pos[fh] = (chunk_no, chunk_offset)
|
|
break
|
|
return b''.join(parts)
|
|
|
|
# note: we can't have a generator (with yield) and not a generator (async) in the same method
|
|
if has_pyfuse3:
|
|
async def readdir(self, fh, off, token):
|
|
entries = [(b'.', fh), (b'..', self.parent[fh])]
|
|
entries.extend(self.contents[fh].items())
|
|
for i, (name, inode) in enumerate(entries[off:], off):
|
|
attrs = self._getattr(inode)
|
|
if not llfuse.readdir_reply(token, name, attrs, i + 1):
|
|
break
|
|
|
|
else:
|
|
def readdir(self, fh, off):
|
|
entries = [(b'.', fh), (b'..', self.parent[fh])]
|
|
entries.extend(self.contents[fh].items())
|
|
for i, (name, inode) in enumerate(entries[off:], off):
|
|
attrs = self._getattr(inode)
|
|
yield name, attrs, i + 1
|
|
|
|
@async_wrapper
|
|
def readlink(self, inode, ctx=None):
|
|
item = self.get_item(inode)
|
|
return os.fsencode(item.source)
|