mirror of
https://github.com/borgbackup/borg.git
synced 2026-03-25 11:56:00 -04:00
this refactors umount code we already used for the testsuite into the platform module's namespace. also, it exposes that functionality via the cli api, so users can use it via "borg umount <mountpoint>", which is more consistent than using borg to mount and fusermount -u (or umount) to un-mount.
174 lines
5.8 KiB
Python
174 lines
5.8 KiB
Python
from contextlib import contextmanager
|
|
import filecmp
|
|
import os
|
|
import posix
|
|
import stat
|
|
import sys
|
|
import sysconfig
|
|
import time
|
|
import unittest
|
|
from ..xattr import get_all
|
|
from ..logger import setup_logging
|
|
from ..platform import umount
|
|
|
|
try:
|
|
import llfuse
|
|
# Does this version of llfuse support ns precision?
|
|
have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
|
|
except ImportError:
|
|
have_fuse_mtime_ns = False
|
|
|
|
has_lchflags = hasattr(os, 'lchflags')
|
|
|
|
|
|
# The mtime get/set precision varies on different OS and Python versions
|
|
if 'HAVE_FUTIMENS' in getattr(posix, '_have_functions', []):
|
|
st_mtime_ns_round = 0
|
|
elif 'HAVE_UTIMES' in sysconfig.get_config_vars():
|
|
st_mtime_ns_round = -6
|
|
else:
|
|
st_mtime_ns_round = -9
|
|
|
|
if sys.platform.startswith('netbsd'):
|
|
st_mtime_ns_round = -4 # only >1 microsecond resolution here?
|
|
|
|
# Ensure that the loggers exist for all tests
|
|
setup_logging()
|
|
|
|
|
|
def no_selinux(x):
|
|
# selinux fails our FUSE tests, thus ignore selinux xattrs
|
|
SELINUX_KEY = 'security.selinux'
|
|
if isinstance(x, dict):
|
|
return {k: v for k, v in x.items() if k != SELINUX_KEY}
|
|
if isinstance(x, list):
|
|
return [k for k in x if k != SELINUX_KEY]
|
|
|
|
|
|
class BaseTestCase(unittest.TestCase):
|
|
"""
|
|
"""
|
|
assert_in = unittest.TestCase.assertIn
|
|
assert_not_in = unittest.TestCase.assertNotIn
|
|
assert_equal = unittest.TestCase.assertEqual
|
|
assert_not_equal = unittest.TestCase.assertNotEqual
|
|
assert_raises = unittest.TestCase.assertRaises
|
|
assert_true = unittest.TestCase.assertTrue
|
|
|
|
@contextmanager
|
|
def assert_creates_file(self, path):
|
|
self.assert_true(not os.path.exists(path), '{} should not exist'.format(path))
|
|
yield
|
|
self.assert_true(os.path.exists(path), '{} should exist'.format(path))
|
|
|
|
def assert_dirs_equal(self, dir1, dir2, **kwargs):
|
|
diff = filecmp.dircmp(dir1, dir2)
|
|
self._assert_dirs_equal_cmp(diff, **kwargs)
|
|
|
|
def _assert_dirs_equal_cmp(self, diff, ignore_bsdflags=False, ignore_xattrs=False):
|
|
self.assert_equal(diff.left_only, [])
|
|
self.assert_equal(diff.right_only, [])
|
|
self.assert_equal(diff.diff_files, [])
|
|
self.assert_equal(diff.funny_files, [])
|
|
for filename in diff.common:
|
|
path1 = os.path.join(diff.left, filename)
|
|
path2 = os.path.join(diff.right, filename)
|
|
s1 = os.lstat(path1)
|
|
s2 = os.lstat(path2)
|
|
# Assume path2 is on FUSE if st_dev is different
|
|
fuse = s1.st_dev != s2.st_dev
|
|
attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
|
|
if has_lchflags and not ignore_bsdflags:
|
|
attrs.append('st_flags')
|
|
if not fuse or not os.path.isdir(path1):
|
|
# dir nlink is always 1 on our fuse filesystem
|
|
attrs.append('st_nlink')
|
|
d1 = [filename] + [getattr(s1, a) for a in attrs]
|
|
d2 = [filename] + [getattr(s2, a) for a in attrs]
|
|
# ignore st_rdev if file is not a block/char device, fixes #203
|
|
if not stat.S_ISCHR(d1[1]) and not stat.S_ISBLK(d1[1]):
|
|
d1[4] = None
|
|
if not stat.S_ISCHR(d2[1]) and not stat.S_ISBLK(d2[1]):
|
|
d2[4] = None
|
|
# Older versions of llfuse do not support ns precision properly
|
|
if fuse and not have_fuse_mtime_ns:
|
|
d1.append(round(s1.st_mtime_ns, -4))
|
|
d2.append(round(s2.st_mtime_ns, -4))
|
|
else:
|
|
d1.append(round(s1.st_mtime_ns, st_mtime_ns_round))
|
|
d2.append(round(s2.st_mtime_ns, st_mtime_ns_round))
|
|
if not ignore_xattrs:
|
|
d1.append(no_selinux(get_all(path1, follow_symlinks=False)))
|
|
d2.append(no_selinux(get_all(path2, follow_symlinks=False)))
|
|
self.assert_equal(d1, d2)
|
|
for sub_diff in diff.subdirs.values():
|
|
self._assert_dirs_equal_cmp(sub_diff, ignore_bsdflags=ignore_bsdflags, ignore_xattrs=ignore_xattrs)
|
|
|
|
@contextmanager
|
|
def fuse_mount(self, location, mountpoint, *options):
|
|
os.mkdir(mountpoint)
|
|
args = ['mount', location, mountpoint] + list(options)
|
|
self.cmd(*args, fork=True)
|
|
self.wait_for_mount(mountpoint)
|
|
yield
|
|
umount(mountpoint)
|
|
os.rmdir(mountpoint)
|
|
# Give the daemon some time to exit
|
|
time.sleep(.2)
|
|
|
|
def wait_for_mount(self, path, timeout=5):
|
|
"""Wait until a filesystem is mounted on `path`
|
|
"""
|
|
timeout += time.time()
|
|
while timeout > time.time():
|
|
if os.path.ismount(path):
|
|
return
|
|
time.sleep(.1)
|
|
raise Exception('wait_for_mount(%s) timeout' % path)
|
|
|
|
|
|
class changedir:
|
|
def __init__(self, dir):
|
|
self.dir = dir
|
|
|
|
def __enter__(self):
|
|
self.old = os.getcwd()
|
|
os.chdir(self.dir)
|
|
|
|
def __exit__(self, *args, **kw):
|
|
os.chdir(self.old)
|
|
|
|
|
|
class environment_variable:
|
|
def __init__(self, **values):
|
|
self.values = values
|
|
self.old_values = {}
|
|
|
|
def __enter__(self):
|
|
for k, v in self.values.items():
|
|
self.old_values[k] = os.environ.get(k)
|
|
if v is None:
|
|
os.environ.pop(k, None)
|
|
else:
|
|
os.environ[k] = v
|
|
|
|
def __exit__(self, *args, **kw):
|
|
for k, v in self.old_values.items():
|
|
if v is None:
|
|
os.environ.pop(k, None)
|
|
else:
|
|
os.environ[k] = v
|
|
|
|
|
|
class FakeInputs:
|
|
"""Simulate multiple user inputs, can be used as input() replacement"""
|
|
def __init__(self, inputs):
|
|
self.inputs = inputs
|
|
|
|
def __call__(self, prompt=None):
|
|
if prompt is not None:
|
|
print(prompt, end='')
|
|
try:
|
|
return self.inputs.pop(0)
|
|
except IndexError:
|
|
raise EOFError from None
|