integrate mount2_cmds_test into mount_cmds_test

Updated mount_cmds_test.py to work with both llfuse/pyfuse3 and mfusepy
by checking for either implementation in skip conditions.

mfusepy: 2 test fails due to hardlink implementation differences
This commit is contained in:
Thomas Waldmann 2025-11-26 00:40:44 +01:00
parent ead93b6d12
commit 176dec80f3
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 18 additions and 399 deletions

View file

@ -1,392 +0,0 @@
# this is testing the mount/umount commands with mfusepy implementation
import errno
import os
import sys
import time
import subprocess
from contextlib import contextmanager
from unittest.mock import patch
import pytest
from ...constants import * # NOQA
from ...helpers import flags_noatime, flags_normal
from .. import has_lchflags, changedir
from .. import same_ts_ns
from ..platform.platform_test import fakeroot_detected
from . import (
RK_ENCRYPTION,
cmd,
assert_dirs_equal,
create_test_files,
generate_archiver_tests,
create_src_archive,
open_archive,
src_file,
create_regular_file,
)
from . import requires_hardlinks, _extract_hardlinks_setup, are_hardlinks_supported
try:
import mfusepy
except ImportError:
mfusepy = None
pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA
@contextmanager
def fuse_mount2(archiver, mountpoint, *args, **kwargs):
os.makedirs(mountpoint, exist_ok=True)
# We use subprocess to run borg mount to ensure it runs in a separate process
# and we can control it via signals if needed.
# We use --foreground to keep it running.
cmd_args = ["mount", "--foreground"]
# We need to construct the command line carefully.
# args might contain options or paths.
# Usage: fuse_mount2(archiver, mountpoint, options...)
# The repo path is archiver.repository_path
# If we want to mount a specific archive: fuse_mount2(archiver, mountpoint, "-a", "archive_name", ...)
# The mount command uses: borg mount --repo REPO [options] MOUNTPOINT
location = archiver.repository_path
# Check if we have extra args that look like options
# Just pass all args to the command
# We put mountpoint first, then --repo location, then all other args
# This supports: borg mount [options] MOUNTPOINT --repo LOCATION [more options]
borg_cmd = [sys.executable, "-m", "borg"]
full_cmd = borg_cmd + cmd_args + [mountpoint, "--repo", location] + list(args)
# The mount command supports various options like -a/--match-archives, -o, paths, etc.
# All options are passed through in args.
# Command: borg mount [options] MOUNTPOINT --repo=LOCATION
borg_cmd = [sys.executable, "-m", "borg"]
# We pass mountpoint as positional arg, and repo as --repo
# options and other_args are passed as is
# full_cmd constructed above
env = os.environ.copy()
# Set BORG_FUSE_IMPL to use mfusepy implementation
env["BORG_FUSE_IMPL"] = "mfusepy"
# env["BORG_REPO"] = archiver.repository_location # Not needed if --repo is used, but keeps it safe?
# Actually, if we use --repo, we don't need BORG_REPO env var for the command,
# but we might need it for other things?
# Let's keep it but --repo should take precedence or be used.
env["BORG_RELOCATED_REPO_ACCESS_IS_OK"] = "yes"
# p = subprocess.Popen(full_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# For debugging, let's inherit stderr
# p = subprocess.Popen(full_cmd, env=env, stdout=subprocess.PIPE, stderr=None)
log_file_path = os.path.join(os.getcwd(), "mount2.log")
log_file = open(log_file_path, "w")
p = subprocess.Popen(full_cmd, env=env, stdout=log_file, stderr=log_file)
# Wait for mount
timeout = 5
start = time.time()
while time.time() - start < timeout:
if os.path.ismount(mountpoint):
break
time.sleep(0.1)
else:
# Timeout or failed
p.terminate()
p.wait()
log_file.close()
with open(log_file_path, "r") as f:
output = f.read()
print("Mount failed to appear. Output:", output, file=sys.stderr)
# We might want to raise, but let's yield to let the test fail with a better error
# or maybe the test expects failure?
try:
yield
finally:
if not log_file.closed:
log_file.close()
if os.path.ismount(mountpoint):
# Try to umount
subprocess.call(["umount", mountpoint])
# If that fails (e.g. busy), we might need force or fusermount -u
if os.path.ismount(mountpoint):
subprocess.call(["fusermount", "-u", "-z", mountpoint])
p.terminate()
p.wait()
# Cleanup mountpoint dir if empty
try:
os.rmdir(mountpoint)
except OSError:
pass
def test_mount2_missing_mfuse(archivers, request):
archiver = request.getfixturevalue(archivers)
# Ensure mfuse is NOT in sys.modules or is None
with patch.dict(sys.modules, {"mfusepy": None}):
cmd(archiver, "repo-create", RK_ENCRYPTION)
cmd(archiver, "create", "archive", "input")
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
os.makedirs(mountpoint, exist_ok=True)
from ...helpers import CommandError
# Set BORG_FUSE_IMPL to mfusepy, but it won't be available
env = os.environ.copy()
env["BORG_FUSE_IMPL"] = "mfusepy"
try:
# This should fail because mfusepy is not available
cmd(archiver, "mount", "--repo", archiver.repository_path, "-a", "archive", mountpoint, fork=True, env=env)
except CommandError:
# We expect it to fail because mfuse is missing
# The error message might vary depending on how it's handled
pass
except Exception:
pass
@requires_hardlinks
@pytest.mark.skipif(mfusepy is None, reason="mfusepy not installed")
def test_fuse_mount_hardlinks(archivers, request):
archiver = request.getfixturevalue(archivers)
_extract_hardlinks_setup(archiver)
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
# we need to get rid of permissions checking because fakeroot causes issues with it.
# On all platforms, borg defaults to "default_permissions" and we need to get rid of it via "ignore_permissions".
# On macOS (darwin), we additionally need "defer_permissions" to switch off the checks in osxfuse.
if sys.platform == "darwin":
ignore_perms = ["-o", "ignore_permissions,defer_permissions"]
else:
ignore_perms = ["-o", "ignore_permissions"]
with fuse_mount2(archiver, mountpoint, "-a", "test", "--strip-components=2", *ignore_perms):
with changedir(os.path.join(mountpoint, "test")):
assert os.stat("hardlink").st_nlink == 2
assert os.stat("subdir/hardlink").st_nlink == 2
assert open("subdir/hardlink", "rb").read() == b"123456"
assert os.stat("aaaa").st_nlink == 2
assert os.stat("source2").st_nlink == 2
with fuse_mount2(archiver, mountpoint, "input/dir1", "-a", "test", *ignore_perms):
with changedir(os.path.join(mountpoint, "test")):
assert os.stat("input/dir1/hardlink").st_nlink == 2
assert os.stat("input/dir1/subdir/hardlink").st_nlink == 2
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
assert os.stat("input/dir1/aaaa").st_nlink == 2
assert os.stat("input/dir1/source2").st_nlink == 2
with fuse_mount2(archiver, mountpoint, "-a", "test", *ignore_perms):
with changedir(os.path.join(mountpoint, "test")):
assert os.stat("input/source").st_nlink == 4
assert os.stat("input/abba").st_nlink == 4
assert os.stat("input/dir1/hardlink").st_nlink == 4
assert os.stat("input/dir1/subdir/hardlink").st_nlink == 4
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
@pytest.mark.skipif(mfusepy is None, reason="mfusepy not installed")
def test_fuse_duplicate_name(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
cmd(archiver, "create", "duplicate", "input")
cmd(archiver, "create", "duplicate", "input")
cmd(archiver, "create", "unique1", "input")
cmd(archiver, "create", "unique2", "input")
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
# mount the whole repository, archives show up as toplevel directories:
with fuse_mount2(archiver, mountpoint):
path = os.path.join(mountpoint)
dirs = os.listdir(path)
assert len(set(dirs)) == 4 # there must be 4 unique dir names for 4 archives
assert "unique1" in dirs # if an archive has a unique name, do not append the archive id
assert "unique2" in dirs
@pytest.mark.skipif(mfusepy is None, reason="mfusepy not installed")
def test_fuse_allow_damaged_files(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
create_src_archive(archiver, "archive")
# Get rid of a chunk
archive, repository = open_archive(archiver.repository_path, "archive")
with repository:
for item in archive.iter_items():
if item.path.endswith(src_file):
repository.delete(item.chunks[-1].id)
path = item.path # store full path for later
break
else:
assert False # missed the file
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
with fuse_mount2(archiver, mountpoint, "-a", "archive"):
with open(os.path.join(mountpoint, "archive", path), "rb") as f:
with pytest.raises(OSError) as excinfo:
f.read()
assert excinfo.value.errno == errno.EIO
with fuse_mount2(archiver, mountpoint, "-a", "archive", "-o", "allow_damaged_files"):
with open(os.path.join(mountpoint, "archive", path), "rb") as f:
# no exception raised, missing data will be all-zero
data = f.read()
assert data.endswith(b"\0\0")
@pytest.mark.skipif(mfusepy is None, reason="mfusepy not installed")
def test_fuse_versions_view(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
create_regular_file(archiver.input_path, "test", contents=b"first")
if are_hardlinks_supported():
create_regular_file(archiver.input_path, "hardlink1", contents=b"123456")
os.link("input/hardlink1", "input/hardlink2")
os.link("input/hardlink1", "input/hardlink3")
cmd(archiver, "create", "archive1", "input")
create_regular_file(archiver.input_path, "test", contents=b"second")
cmd(archiver, "create", "archive2", "input")
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
# mount the whole repository, archive contents shall show up in versioned view:
with fuse_mount2(archiver, mountpoint, "-o", "versions"):
path = os.path.join(mountpoint, "input", "test") # filename shows up as directory ...
files = os.listdir(path)
assert all(f.startswith("test.") for f in files) # ... with files test.xxxxx in there
assert {b"first", b"second"} == {open(os.path.join(path, f), "rb").read() for f in files}
if are_hardlinks_supported():
hl1 = os.path.join(mountpoint, "input", "hardlink1", "hardlink1.00001")
hl2 = os.path.join(mountpoint, "input", "hardlink2", "hardlink2.00001")
hl3 = os.path.join(mountpoint, "input", "hardlink3", "hardlink3.00001")
# Note: In fuse2.py versions mode, hardlinks don't share inodes due to Node architecture
# but they do have correct nlink counts and content
# assert os.stat(hl1).st_ino == os.stat(hl2).st_ino == os.stat(hl3).st_ino
assert os.stat(hl1).st_nlink == 3
assert os.stat(hl2).st_nlink == 3
assert os.stat(hl3).st_nlink == 3
assert open(hl3, "rb").read() == b"123456"
# similar again, but exclude the 1st hard link:
with fuse_mount2(archiver, mountpoint, "-o", "versions", "-e", "input/hardlink1"):
if are_hardlinks_supported():
hl2 = os.path.join(mountpoint, "input", "hardlink2", "hardlink2.00001")
hl3 = os.path.join(mountpoint, "input", "hardlink3", "hardlink3.00001")
# Note: Same limitation as above
# assert os.stat(hl2).st_ino == os.stat(hl3).st_ino
assert os.stat(hl2).st_nlink == 2
assert os.stat(hl3).st_nlink == 2
assert open(hl3, "rb").read() == b"123456"
@pytest.mark.skipif(mfusepy is None, reason="mfusepy not installed")
def test_fuse_mount_options(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
create_src_archive(archiver, "arch11")
create_src_archive(archiver, "arch12")
create_src_archive(archiver, "arch21")
create_src_archive(archiver, "arch22")
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
with fuse_mount2(archiver, mountpoint, "--first=2", "--sort-by=name"):
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12"]
with fuse_mount2(archiver, mountpoint, "--last=2", "--sort-by=name"):
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch21", "arch22"]
with fuse_mount2(archiver, mountpoint, "--match-archives=sh:arch1*"):
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12"]
with fuse_mount2(archiver, mountpoint, "--match-archives=sh:arch2*"):
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch21", "arch22"]
with fuse_mount2(archiver, mountpoint, "--match-archives=sh:arch*"):
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12", "arch21", "arch22"]
with fuse_mount2(archiver, mountpoint, "--match-archives=nope"):
assert sorted(os.listdir(os.path.join(mountpoint))) == []
def test_fuse2(archivers, request):
archiver = request.getfixturevalue(archivers)
if archiver.EXE and fakeroot_detected():
pytest.skip("test_fuse with the binary is not compatible with fakeroot")
def has_noatime(some_file):
atime_before = os.stat(some_file).st_atime_ns
try:
os.close(os.open(some_file, flags_noatime))
except PermissionError:
return False
else:
atime_after = os.stat(some_file).st_atime_ns
noatime_used = flags_noatime != flags_normal
return noatime_used and atime_before == atime_after
cmd(archiver, "repo-create", RK_ENCRYPTION)
create_test_files(archiver.input_path)
have_noatime = has_noatime("input/file1")
cmd(archiver, "create", "--atime", "archive", "input")
cmd(archiver, "create", "--atime", "archive2", "input")
if has_lchflags:
os.remove(os.path.join("input", "flagfile"))
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
# Mount specific archive
with fuse_mount2(archiver, mountpoint, "-a", "archive"):
# Check if archive is listed
assert "archive" in os.listdir(mountpoint)
# Check contents
assert_dirs_equal(
archiver.input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
)
# Check details of a file
in_fn = "input/file1"
out_fn = os.path.join(mountpoint, "archive", "input", "file1")
sti1 = os.stat(in_fn)
sto1 = os.stat(out_fn)
assert sti1.st_mode == sto1.st_mode
assert sti1.st_uid == sto1.st_uid
assert sti1.st_gid == sto1.st_gid
assert sti1.st_size == sto1.st_size
# Check timestamps (nanosecond resolution)
# We enabled use_ns = True, so we expect high precision if supported
assert same_ts_ns(sti1.st_mtime * 1e9, sto1.st_mtime * 1e9)
assert same_ts_ns(sti1.st_ctime * 1e9, sto1.st_ctime * 1e9)
if have_noatime:
assert same_ts_ns(sti1.st_atime * 1e9, sto1.st_atime * 1e9)
# Read content
with open(in_fn, "rb") as f1, open(out_fn, "rb") as f2:
assert f1.read() == f2.read()
# Mount whole repository
with fuse_mount2(archiver, mountpoint):
assert_dirs_equal(
archiver.input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
)
assert_dirs_equal(
archiver.input_path, os.path.join(mountpoint, "archive2", "input"), ignore_flags=True, ignore_xattrs=True
)
# Ignore permissions
with fuse_mount2(archiver, mountpoint, "-o", "ignore_permissions"):
assert_dirs_equal(
archiver.input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
)
# Allow damaged files
with fuse_mount2(archiver, mountpoint, "-o", "allow_damaged_files"):
assert_dirs_equal(
archiver.input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
)

View file

@ -1,3 +1,9 @@
# This file tests the mount/umount commands.
# The FUSE implementation used depends on the BORG_FUSE_IMPL environment variable:
# - BORG_FUSE_IMPL=pyfuse3,llfuse: Tests run with llfuse/pyfuse3 (skipped if not available)
# - BORG_FUSE_IMPL=mfusepy: Tests run with mfusepy (skipped if not available)
# The tox configuration (pyproject.toml) runs these tests with different BORG_FUSE_IMPL settings.
import errno
import os
import stat
@ -5,6 +11,11 @@ import sys
import pytest
try:
import mfusepy
except ImportError:
mfusepy = None
from ... import xattr, platform
from ...constants import * # NOQA
from ...platform import ENOATTR
@ -21,7 +32,7 @@ pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds
@requires_hardlinks
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_fuse_mount_hardlinks(archivers, request):
archiver = request.getfixturevalue(archivers)
_extract_hardlinks_setup(archiver)
@ -59,7 +70,7 @@ def test_fuse_mount_hardlinks(archivers, request):
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_fuse(archivers, request):
archiver = request.getfixturevalue(archivers)
if archiver.EXE and fakeroot_detected():
@ -167,7 +178,7 @@ def test_fuse(archivers, request):
raise
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_fuse_versions_view(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
@ -201,7 +212,7 @@ def test_fuse_versions_view(archivers, request):
assert open(hl3, "rb").read() == b"123456"
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_fuse_duplicate_name(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
@ -219,7 +230,7 @@ def test_fuse_duplicate_name(archivers, request):
assert "unique2" in dirs
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_fuse_allow_damaged_files(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
@ -249,7 +260,7 @@ def test_fuse_allow_damaged_files(archivers, request):
assert data.endswith(b"\0\0")
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_fuse_mount_options(archivers, request):
archiver = request.getfixturevalue(archivers)
cmd(archiver, "repo-create", RK_ENCRYPTION)
@ -272,7 +283,7 @@ def test_fuse_mount_options(archivers, request):
assert sorted(os.listdir(os.path.join(mountpoint))) == []
@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
@pytest.mark.skipif(not llfuse and not mfusepy, reason="FUSE not available")
def test_migrate_lock_alive(archivers, request):
"""Both old_id and new_id must not be stale during lock migration / daemonization."""
archiver = request.getfixturevalue(archivers)