This commit is contained in:
🇺🇦 Sviatoslav Sydorenko (Святослав Сидоренко) 2026-02-03 19:16:27 -06:00 committed by GitHub
commit 208d5ee8f0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 480 additions and 266 deletions

View file

@ -53,33 +53,12 @@ if t.TYPE_CHECKING:
ConcreteArtifactsManager,
)
ManifestKeysType = t.Literal[
'collection_info', 'file_manifest_file', 'format',
]
FileMetaKeysType = t.Literal[
'name',
'ftype',
'chksum_type',
'chksum_sha256',
'format',
]
CollectionInfoKeysType = t.Literal[
# collection meta:
'namespace', 'name', 'version',
'authors', 'readme',
'tags', 'description',
'license', 'license_file',
'dependencies',
'repository', 'documentation',
'homepage', 'issues',
# files meta:
FileMetaKeysType,
]
ManifestValueType = t.Dict[CollectionInfoKeysType, t.Union[int, str, t.List[str], t.Dict[str, str], None]]
CollectionManifestType = t.Dict[ManifestKeysType, ManifestValueType]
FileManifestEntryType = t.Dict[FileMetaKeysType, t.Union[str, int, None]]
FilesManifestType = t.Dict[t.Literal['files', 'format'], t.Union[t.List[FileManifestEntryType], int]]
from ._types import (
CollectionManifestType,
DisplayQueueType,
DisplayThreadProto,
FilesManifestType,
)
import ansible.constants as C
from ansible.errors import AnsibleError
@ -130,8 +109,8 @@ from ansible.utils.collection_loader import AnsibleCollectionRef
from ansible.utils.display import Display
from ansible.utils.hashing import secure_hash, secure_hash_s
from ._types import ManifestMetadataType
display = Display()
MANIFEST_FORMAT = 1
MANIFEST_FILENAME = 'MANIFEST.json'
@ -141,12 +120,27 @@ ModifiedContent = namedtuple('ModifiedContent', ['filename', 'expected', 'instal
SIGNATURE_COUNT_RE = r"^(?P<strict>\+)?(?:(?P<count>\d+)|(?P<all>all))$"
class DisplayThread:
def __init__(self, display_queue: DisplayQueueType) -> None:
self.display_queue = display_queue
def __getattr__(self, attr: str) -> t.Callable:
def call_display(*args, **kwargs) -> None:
self.display_queue.put((attr, args, kwargs))
return call_display
display: Display | DisplayThreadProto = Display()
@dataclass
class ManifestControl:
directives: list[str] = None
directives: list[str]
omit_default_directives: bool = False
def __post_init__(self):
def __post_init__(self) -> None:
# Allow a dict representing this dataclass to be splatted directly.
# Requires attrs to have a default value, so anything with a default
# of None is swapped for its, potentially mutable, default
@ -156,25 +150,31 @@ class ManifestControl:
class CollectionSignatureError(Exception):
def __init__(self, reasons=None, stdout=None, rc=None, ignore=False):
def __init__(
self,
reasons: t.Iterable | None = None,
stdout: str | None = None,
rc: int | None = None,
ignore: bool = False,
) -> None:
self.reasons = reasons
self.stdout = stdout
self.rc = rc
self.ignore = ignore
self._reason_wrapper = None
self._reason_wrapper: textwrap.TextWrapper | None = None
def _report_unexpected(self, collection_name):
def _report_unexpected(self, collection_name: str) -> str:
return (
f"Unexpected error for '{collection_name}': "
f"GnuPG signature verification failed with the return code {self.rc} and output {self.stdout}"
)
def _report_expected(self, collection_name):
def _report_expected(self, collection_name: str) -> str:
header = f"Signature verification failed for '{collection_name}' (return code {self.rc}):"
return header + self._format_reasons()
def _format_reasons(self):
def _format_reasons(self) -> str:
if self._reason_wrapper is None:
self._reason_wrapper = textwrap.TextWrapper(
initial_indent=" * ", # 6 chars
@ -183,12 +183,12 @@ class CollectionSignatureError(Exception):
wrapped_reasons = [
'\n'.join(self._reason_wrapper.wrap(reason))
for reason in self.reasons
for reason in self.reasons or ()
]
return '\n' + '\n'.join(wrapped_reasons)
def report(self, collection_name):
def report(self, collection_name: str) -> str:
if self.reasons:
return self._report_expected(collection_name)
@ -202,8 +202,11 @@ class CollectionVerifyResult:
self.success = True
def verify_local_collection(local_collection, remote_collection, artifacts_manager):
# type: (Candidate, t.Optional[Candidate], ConcreteArtifactsManager) -> CollectionVerifyResult
def verify_local_collection(
local_collection: Candidate,
remote_collection: Candidate | None,
artifacts_manager: ConcreteArtifactsManager,
) -> CollectionVerifyResult:
"""Verify integrity of the locally installed collection.
:param local_collection: Collection being checked.
@ -221,7 +224,7 @@ def verify_local_collection(local_collection, remote_collection, artifacts_manag
format(path=to_text(local_collection.src)),
)
modified_content = [] # type: list[ModifiedContent]
modified_content: list[ModifiedContent] = []
verify_local_only = remote_collection is None
@ -377,8 +380,14 @@ def verify_local_collection(local_collection, remote_collection, artifacts_manag
return result
def verify_file_signatures(fqcn, manifest_file, detached_signatures, keyring, required_successful_count, ignore_signature_errors):
# type: (str, str, list[str], str, str, list[str]) -> bool
def verify_file_signatures(
fqcn: str,
manifest_file: str,
detached_signatures: list[str],
keyring: str,
required_successful_count: str,
ignore_signature_errors: list[str],
) -> bool:
successful = 0
error_messages = []
@ -427,8 +436,12 @@ def verify_file_signatures(fqcn, manifest_file, detached_signatures, keyring, re
return verified
def verify_file_signature(manifest_file, detached_signature, keyring, ignore_signature_errors):
# type: (str, str, str, list[str]) -> None
def verify_file_signature(
manifest_file: str,
detached_signature: str,
keyring: str,
ignore_signature_errors: list[str],
) -> None:
"""Run the gpg command and parse any errors. Raises CollectionSignatureError on failure."""
gpg_result, gpg_verification_rc = run_gpg_verify(manifest_file, detached_signature, keyring, display)
@ -459,8 +472,7 @@ def verify_file_signature(manifest_file, detached_signature, keyring, ignore_sig
return None
def build_collection(u_collection_path, u_output_path, force):
# type: (str, str, bool) -> str
def build_collection(u_collection_path: str, u_output_path: str, force: bool) -> str:
"""Creates the Ansible collection artifact in a .tar.gz file.
:param u_collection_path: The path to the collection to build. This should be the directory that contains the
@ -478,11 +490,14 @@ def build_collection(u_collection_path, u_output_path, force):
collection_manifest = _build_manifest(**collection_meta)
file_manifest = _build_files_manifest(
b_collection_path,
collection_meta['namespace'], # type: ignore[arg-type]
collection_meta['name'], # type: ignore[arg-type]
collection_meta['build_ignore'], # type: ignore[arg-type]
collection_meta['manifest'], # type: ignore[arg-type]
collection_meta['license_file'], # type: ignore[arg-type]
collection_meta['namespace'],
collection_meta['name'],
t.cast(list[str], collection_meta.get('build_ignore', [])),
t.cast(
ManifestMetadataType | t.Type[Sentinel],
collection_meta.get('manifest', Sentinel),
),
collection_meta['license_file'],
)
artifact_tarball_file_name = '{ns!s}-{name!s}-{ver!s}.tar.gz'.format(
@ -508,13 +523,13 @@ def build_collection(u_collection_path, u_output_path, force):
def download_collections(
collections, # type: t.Iterable[Requirement]
output_path, # type: str
apis, # type: t.Iterable[GalaxyAPI]
no_deps, # type: bool
allow_pre_release, # type: bool
artifacts_manager, # type: ConcreteArtifactsManager
): # type: (...) -> None
collections: t.Iterable[Requirement],
output_path: str,
apis: t.Iterable[GalaxyAPI],
no_deps: bool,
allow_pre_release: bool,
artifacts_manager: ConcreteArtifactsManager,
) -> None:
"""Download Ansible collections as their tarball from a Galaxy server to the path specified and creates a requirements
file of the downloaded requirements to be used for an install.
@ -609,7 +624,7 @@ def download_collections(
req_fd.write(yaml_bytes)
def publish_collection(collection_path, api, wait, timeout):
def publish_collection(collection_path, api, wait, timeout) -> None:
"""Publish an Ansible collection tarball into an Ansible Galaxy server.
:param collection_path: The path to the collection tarball to publish.
@ -634,20 +649,20 @@ def publish_collection(collection_path, api, wait, timeout):
def install_collections(
collections, # type: t.Iterable[Requirement]
output_path, # type: str
apis, # type: t.Iterable[GalaxyAPI]
ignore_errors, # type: bool
no_deps, # type: bool
force, # type: bool
force_deps, # type: bool
upgrade, # type: bool
allow_pre_release, # type: bool
artifacts_manager, # type: ConcreteArtifactsManager
disable_gpg_verify, # type: bool
offline, # type: bool
read_requirement_paths, # type: set[str]
): # type: (...) -> None
collections: t.Iterable[Requirement],
output_path: str,
apis: t.Iterable[GalaxyAPI],
ignore_errors: bool,
no_deps: bool,
force: bool,
force_deps: bool,
upgrade: bool,
allow_pre_release: bool,
artifacts_manager: ConcreteArtifactsManager,
disable_gpg_verify: bool,
offline: bool,
read_requirement_paths: set[str],
) -> None:
"""Install Ansible collections to the path specified.
:param collections: The collections to install.
@ -776,7 +791,7 @@ def install_collections(
# NOTE: imported in ansible.cli.galaxy
def validate_collection_name(name): # type: (str) -> str
def validate_collection_name(name: str) -> str:
"""Validates the collection name as an input from the user or a requirements file fit the requirements.
:param name: The input name with optional range specifier split by ':'.
@ -793,7 +808,7 @@ def validate_collection_name(name): # type: (str) -> str
# NOTE: imported in ansible.cli.galaxy
def validate_collection_path(collection_path): # type: (str) -> str
def validate_collection_path(collection_path: str) -> str:
"""Ensure a given path ends with 'ansible_collections'
:param collection_path: The path that should end in 'ansible_collections'
@ -807,13 +822,13 @@ def validate_collection_path(collection_path): # type: (str) -> str
def verify_collections(
collections, # type: t.Iterable[Requirement]
search_paths, # type: t.Iterable[str]
apis, # type: t.Iterable[GalaxyAPI]
ignore_errors, # type: bool
local_verify_only, # type: bool
artifacts_manager, # type: ConcreteArtifactsManager
): # type: (...) -> list[CollectionVerifyResult]
collections: t.Iterable[Requirement],
search_paths: t.Iterable[str],
apis: t.Iterable[GalaxyAPI],
ignore_errors: bool,
local_verify_only: bool,
artifacts_manager: ConcreteArtifactsManager,
) -> list[CollectionVerifyResult]:
r"""Verify the integrity of locally installed collections.
:param collections: The collections to check.
@ -824,7 +839,7 @@ def verify_collections(
:param artifacts_manager: Artifacts manager.
:return: list of CollectionVerifyResult objects describing the results of each collection verification
"""
results = [] # type: list[CollectionVerifyResult]
results: list[CollectionVerifyResult] = []
api_proxy = MultiGalaxyAPIProxy(apis, artifacts_manager)
@ -954,7 +969,7 @@ def _tempdir():
@contextmanager
def _display_progress(msg=None):
def _display_progress(msg: str | None = None) -> t.Iterator[None]:
config_display = C.GALAXY_DISPLAY_PROGRESS
display_wheel = sys.stdout.isatty() if config_display is None else config_display
@ -966,7 +981,7 @@ def _display_progress(msg=None):
yield
return
def progress(display_queue, actual_display):
def progress(display_queue: DisplayQueueType, actual_display: Display) -> None:
actual_display.debug("Starting display_progress display thread")
t = threading.current_thread()
@ -989,21 +1004,10 @@ def _display_progress(msg=None):
actual_display.debug("Received end signal for display_progress display thread")
return
class DisplayThread(object):
def __init__(self, display_queue):
self.display_queue = display_queue
def __getattr__(self, attr):
def call_display(*args, **kwargs):
self.display_queue.put((attr, args, kwargs))
return call_display
# Temporary override the global display class with our own which add the calls to a queue for the thread to call.
old_display = display
try:
display_queue = queue.Queue()
display_queue: DisplayQueueType = queue.Queue()
display = DisplayThread(display_queue)
t = threading.Thread(target=progress, args=(display_queue, old_display))
t.daemon = True
@ -1021,7 +1025,7 @@ def _display_progress(msg=None):
display = old_display
def _verify_file_hash(b_path, filename, expected_hash, error_queue):
def _verify_file_hash(b_path, filename: str, expected_hash, error_queue: list[ModifiedContent]) -> None:
b_file_path = to_bytes(os.path.join(to_text(b_path), filename), errors='surrogate_or_strict')
if not os.path.isfile(b_file_path):
@ -1034,7 +1038,7 @@ def _verify_file_hash(b_path, filename, expected_hash, error_queue):
error_queue.append(ModifiedContent(filename=filename, expected=expected_hash, installed=actual_hash))
def _make_manifest():
def _make_manifest() -> FilesManifestType:
return {
'files': [
{
@ -1049,7 +1053,7 @@ def _make_manifest():
}
def _make_entry(name, ftype, chksum_type='sha256', chksum=None):
def _make_entry(name: str, ftype: str, chksum_type='sha256', chksum=None):
return {
'name': name,
'ftype': ftype,
@ -1059,9 +1063,14 @@ def _make_entry(name, ftype, chksum_type='sha256', chksum=None):
}
def _build_files_manifest(b_collection_path, namespace, name, ignore_patterns,
manifest_control, license_file):
# type: (bytes, str, str, list[str], dict[str, t.Any], t.Optional[str]) -> FilesManifestType
def _build_files_manifest(
b_collection_path: bytes,
namespace: str,
name: str,
ignore_patterns: list[str],
manifest_control: ManifestMetadataType | t.Type[Sentinel],
license_file: str | None,
) -> FilesManifestType:
if ignore_patterns and manifest_control is not Sentinel:
raise AnsibleError('"build_ignore" and "manifest" are mutually exclusive')
@ -1070,24 +1079,25 @@ def _build_files_manifest(b_collection_path, namespace, name, ignore_patterns,
b_collection_path,
namespace,
name,
manifest_control,
t.cast(ManifestMetadataType, manifest_control), # no narrowing??
license_file,
)
return _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patterns)
def _build_files_manifest_distlib(b_collection_path, namespace, name, manifest_control,
license_file):
# type: (bytes, str, str, dict[str, t.Any], t.Optional[str]) -> FilesManifestType
def _build_files_manifest_distlib(
b_collection_path: bytes,
namespace: str,
name: str,
manifest_control: ManifestMetadataType,
license_file: str | None,
) -> FilesManifestType:
if not HAS_DISTLIB:
raise AnsibleError('Use of "manifest" requires the python "distlib" library')
if manifest_control is None:
manifest_control = {}
try:
control = ManifestControl(**manifest_control)
control = ManifestControl(**(manifest_control or {'directives': []}))
except TypeError as ex:
raise AnsibleError(f'Invalid "manifest" provided: {ex}')
@ -1180,8 +1190,12 @@ def _build_files_manifest_distlib(b_collection_path, namespace, name, manifest_c
return manifest
def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patterns):
# type: (bytes, str, str, list[str]) -> FilesManifestType
def _build_files_manifest_walk(
b_collection_path: bytes,
namespace: str,
name: str,
ignore_patterns: list[str],
) -> FilesManifestType:
# We always ignore .pyc and .retry files as well as some well known version control directories. The ignore
# patterns can be extended by the build_ignore key in galaxy.yml
b_ignore_patterns = [
@ -1207,7 +1221,7 @@ def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patter
b_rel_base_dir = os.path.relpath(b_path, common_prefix)
return b_rel_base_dir.lstrip(os.path.sep.encode())
def _walk(b_path, b_top_level_dir):
def _walk(b_path: bytes, b_top_level_dir: bytes) -> None:
b_rel_base_dir = _discover_relative_base_directory(b_path, b_top_level_dir)
for b_item in os.listdir(b_path):
b_abs_path = os.path.join(b_path, b_item)
@ -1254,9 +1268,23 @@ def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patter
# FIXME: accept a dict produced from `galaxy.yml` instead of separate args
def _build_manifest(namespace, name, version, authors, readme, tags, description, license_file,
dependencies, repository, documentation, homepage, issues, **kwargs):
manifest = {
def _build_manifest(
namespace: str,
name: str,
version: str | None,
authors: list[str],
readme: str,
tags: list[str],
description: str,
license_file: str,
dependencies: dict[str, str],
repository: str,
documentation: str,
homepage: str,
issues: str,
**kwargs: str,
) -> CollectionManifestType:
return {
'collection_info': {
'namespace': namespace,
'name': name,
@ -1283,19 +1311,29 @@ def _build_manifest(namespace, name, version, authors, readme, tags, description
'format': MANIFEST_FORMAT,
}
return manifest
def _build_collection_tar(
b_collection_path, # type: bytes
b_tar_path, # type: bytes
collection_manifest, # type: CollectionManifestType
file_manifest, # type: FilesManifestType
): # type: (...) -> str
b_collection_path: bytes,
b_tar_path: bytes,
collection_manifest: CollectionManifestType,
file_manifest: FilesManifestType,
) -> str:
"""Build a tar.gz collection artifact from the manifest data."""
files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict')
collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256)
collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict')
collection_manifest_with_hash = {
**collection_manifest,
'file_manifest_file': {
**collection_manifest['file_manifest_file'],
'chksum_sha256': secure_hash_s(
files_manifest_json,
hash_func=sha256,
),
},
}
collection_manifest_json = to_bytes(
json.dumps(collection_manifest_with_hash, indent=True),
errors='surrogate_or_strict',
)
with _tempdir() as b_temp_path:
b_tar_filepath = os.path.join(b_temp_path, os.path.basename(b_tar_path))
@ -1310,7 +1348,7 @@ def _build_collection_tar(
tar_info.mode = S_IRWU_RG_RO
tar_file.addfile(tarinfo=tar_info, fileobj=b_io)
for file_info in file_manifest['files']: # type: ignore[union-attr]
for file_info in file_manifest['files']:
if file_info['name'] == '.':
continue
@ -1318,7 +1356,7 @@ def _build_collection_tar(
filename = to_native(file_info['name'], errors='surrogate_or_strict')
b_src_path = os.path.join(b_collection_path, to_bytes(filename, errors='surrogate_or_strict'))
def reset_stat(tarinfo):
def reset_stat(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo:
if tarinfo.type != tarfile.SYMTYPE:
existing_is_exec = tarinfo.mode & stat.S_IXUSR
tarinfo.mode = S_IRWXU_RXG_RXO if existing_is_exec or tarinfo.isdir() else S_IRWU_RG_RO
@ -1358,7 +1396,12 @@ def _build_collection_tar(
return tar_path
def _build_collection_dir(b_collection_path, b_collection_output, collection_manifest, file_manifest):
def _build_collection_dir(
b_collection_path: bytes,
b_collection_output: bytes,
collection_manifest: CollectionManifestType,
file_manifest: FilesManifestType,
) -> str:
"""Build a collection directory from the manifest data.
This should follow the same pattern as _build_collection_tar.
@ -1366,8 +1409,20 @@ def _build_collection_dir(b_collection_path, b_collection_output, collection_man
os.makedirs(b_collection_output, mode=S_IRWXU_RXG_RXO)
files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict')
collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256)
collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict')
collection_manifest_with_hash = {
**collection_manifest,
'file_manifest_file': {
**collection_manifest['file_manifest_file'],
'chksum_sha256': secure_hash_s(
files_manifest_json,
hash_func=sha256,
),
},
}
collection_manifest_json = to_bytes(
json.dumps(collection_manifest_with_hash, indent=True),
errors='surrogate_or_strict',
)
# Write contents to the files
for name, b in [(MANIFEST_FILENAME, collection_manifest_json), ('FILES.json', files_manifest_json)]:
@ -1407,7 +1462,7 @@ def _build_collection_dir(b_collection_path, b_collection_output, collection_man
return collection_output
def _normalize_collection_path(path):
def _normalize_collection_path(path: pathlib.Path | str) -> pathlib.Path:
str_path = path.as_posix() if isinstance(path, pathlib.Path) else path
return pathlib.Path(
# This is annoying, but GalaxyCLI._resolve_path did it
@ -1415,18 +1470,24 @@ def _normalize_collection_path(path):
).expanduser().absolute()
def find_existing_collections(path_filter, artifacts_manager, namespace_filter=None, collection_filter=None, dedupe=True):
def find_existing_collections(
path_filter: str | t.Sequence[str],
artifacts_manager: ConcreteArtifactsManager,
namespace_filter: str | t.Sequence[str] | None = None,
collection_filter: str | t.Sequence[str] | None = None,
dedupe: bool = True,
) -> t.Iterable[Candidate]:
"""Locate all collections under a given path.
:param path: Collection dirs layout search path.
:param artifacts_manager: Artifacts manager.
"""
if path_filter and not is_sequence(path_filter):
path_filter = [path_filter]
path_filter = [t.cast(str, path_filter)]
if namespace_filter and not is_sequence(namespace_filter):
namespace_filter = [namespace_filter]
namespace_filter = [t.cast(str, namespace_filter)]
if collection_filter and not is_sequence(collection_filter):
collection_filter = [collection_filter]
collection_filter = [t.cast(str, collection_filter)]
paths = set()
for path in files('ansible_collections').glob('*/*/'):
@ -1483,8 +1544,11 @@ def find_existing_collections(path_filter, artifacts_manager, namespace_filter=N
yield req
def install(collection, path, artifacts_manager): # FIXME: mv to dataclasses?
# type: (Candidate, str, ConcreteArtifactsManager) -> None
def install( # FIXME: mv to dataclasses?
collection: Candidate,
path: str,
artifacts_manager: ConcreteArtifactsManager,
) -> None:
"""Install a collection under a given path.
:param collection: Collection to be installed.
@ -1529,8 +1593,11 @@ def install(collection, path, artifacts_manager): # FIXME: mv to dataclasses?
)
def write_source_metadata(collection, b_collection_path, artifacts_manager):
# type: (Candidate, bytes, ConcreteArtifactsManager) -> None
def write_source_metadata(
collection: Candidate,
b_collection_path: bytes,
artifacts_manager: ConcreteArtifactsManager,
) -> None:
source_data = artifacts_manager.get_galaxy_artifact_source_info(collection)
b_yaml_source_data = to_bytes(yaml_dump(source_data), errors='surrogate_or_strict')
@ -1552,7 +1619,7 @@ def write_source_metadata(collection, b_collection_path, artifacts_manager):
raise
def remove_source_metadata(collection, b_collection_path):
def remove_source_metadata(collection, b_collection_path) -> None:
pattern = f"{collection.namespace}.{collection.name}-*.info"
info_path = os.path.join(
b_collection_path,
@ -1568,8 +1635,13 @@ def remove_source_metadata(collection, b_collection_path):
pass
def verify_artifact_manifest(manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors):
# type: (str, list[str], str, str, list[str]) -> None
def verify_artifact_manifest(
manifest_file: str,
signatures: list[str],
keyring: str,
required_signature_count: str,
ignore_signature_errors: list[str],
) -> None:
failed_verify = False
coll_path_parts = to_text(manifest_file, errors='surrogate_or_strict').split(os.path.sep)
collection_name = '%s.%s' % (coll_path_parts[-3], coll_path_parts[-2]) # get 'ns' and 'coll' from /path/to/ns/coll/MANIFEST.json
@ -1578,7 +1650,7 @@ def verify_artifact_manifest(manifest_file, signatures, keyring, required_signat
display.vvvv(f"GnuPG signature verification succeeded for {collection_name}")
def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring, required_signature_count, ignore_signature_errors):
def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring, required_signature_count, ignore_signature_errors) -> None:
"""Install a collection from tarball under a given path.
:param b_coll_targz_path: Collection tarball to be installed.
@ -1627,7 +1699,7 @@ def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatur
raise
def install_src(collection, b_collection_path, b_collection_output_path, artifacts_manager):
def install_src(collection, b_collection_path, b_collection_output_path, artifacts_manager) -> None:
r"""Install the collection from source control into given dir.
Generates the Ansible collection artifact data from a galaxy.yml and
@ -1669,7 +1741,7 @@ def install_src(collection, b_collection_path, b_collection_output_path, artifac
)
def _extract_tar_dir(tar, dirname, b_dest):
def _extract_tar_dir(tar: tarfile.TarFile, dirname, b_dest) -> None:
""" Extracts a directory from a collection tar. """
dirname = to_native(dirname, errors='surrogate_or_strict')
@ -1687,7 +1759,7 @@ def _extract_tar_dir(tar, dirname, b_dest):
b_link_path = to_bytes(tar_member.linkname, errors='surrogate_or_strict')
if not _is_child_path(b_link_path, b_dest, link_name=b_dir_path):
raise AnsibleError("Cannot extract symlink '%s' in collection: path points to location outside of "
"collection '%s'" % (to_native(dirname), b_link_path))
"collection %r" % (to_native(dirname), b_link_path))
os.symlink(b_link_path, b_dir_path)
@ -1696,7 +1768,13 @@ def _extract_tar_dir(tar, dirname, b_dest):
os.mkdir(b_dir_path, S_IRWXU_RXG_RXO)
def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
def _extract_tar_file(
tar: tarfile.TarFile,
filename: str,
b_dest: bytes,
b_temp_path: bytes,
expected_hash=None,
) -> None:
""" Extracts a file from a collection tar. """
with _get_tar_file_member(tar, filename) as (tar_member, tar_obj):
if tar_member.type == tarfile.SYMTYPE:
@ -1725,12 +1803,15 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
b_link_path = to_bytes(tar_member.linkname, errors='surrogate_or_strict')
if not _is_child_path(b_link_path, b_dest, link_name=b_dest_filepath):
raise AnsibleError("Cannot extract symlink '%s' in collection: path points to location outside of "
"collection '%s'" % (to_native(filename), b_link_path))
"collection %r" % (to_native(filename), b_link_path))
os.symlink(b_link_path, b_dest_filepath)
else:
shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath)
shutil.move(
to_native(tmpfile_obj.name, errors='surrogate_or_strict'),
to_native(b_dest_filepath, errors='surrogate_or_strict'),
)
# Default to rw-r--r-- and only add execute if the tar file has execute.
tar_member = tar.getmember(to_native(filename, errors='surrogate_or_strict'))
@ -1741,7 +1822,10 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
os.chmod(b_dest_filepath, new_mode)
def _get_tar_file_member(tar, filename):
def _get_tar_file_member(
tar: tarfile.TarFile,
filename: str,
) -> t.ContextManager[tuple[tarfile.TarInfo, t.IO[bytes] | None]]:
n_filename = to_native(filename, errors='surrogate_or_strict')
try:
member = tar.getmember(n_filename)
@ -1753,7 +1837,7 @@ def _get_tar_file_member(tar, filename):
return _tarfile_extract(tar, member)
def _get_json_from_tar_file(b_path, filename):
def _get_json_from_tar_file(b_path: bytes, filename: str) -> dict:
file_contents = ''
with tarfile.open(b_path, mode='r') as collection_tar:
@ -1767,19 +1851,19 @@ def _get_json_from_tar_file(b_path, filename):
return json.loads(file_contents)
def _get_tar_file_hash(b_path, filename):
def _get_tar_file_hash(b_path: bytes, filename: str) -> str:
with tarfile.open(b_path, mode='r') as collection_tar:
with _get_tar_file_member(collection_tar, filename) as (dummy, tar_obj):
return _consume_file(tar_obj)
def _get_file_hash(b_path, filename): # type: (bytes, str) -> str
def _get_file_hash(b_path: bytes, filename: str) -> str:
filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict'))
with open(filepath, 'rb') as fp:
return _consume_file(fp)
def _is_child_path(path, parent_path, link_name=None):
def _is_child_path(path: bytes, parent_path: bytes, link_name=None) -> bool:
""" Checks that path is a path within the parent_path specified. """
b_path = to_bytes(path, errors='surrogate_or_strict')
@ -1793,16 +1877,16 @@ def _is_child_path(path, parent_path, link_name=None):
def _resolve_depenency_map(
requested_requirements, # type: t.Iterable[Requirement]
galaxy_apis, # type: t.Iterable[GalaxyAPI]
concrete_artifacts_manager, # type: ConcreteArtifactsManager
preferred_candidates, # type: t.Iterable[Candidate] | None
no_deps, # type: bool
allow_pre_release, # type: bool
upgrade, # type: bool
include_signatures, # type: bool
offline, # type: bool
): # type: (...) -> dict[str, Candidate]
requested_requirements: t.Iterable[Requirement],
galaxy_apis: t.Iterable[GalaxyAPI],
concrete_artifacts_manager: ConcreteArtifactsManager,
preferred_candidates: t.Iterable[Candidate] | None,
no_deps: bool,
allow_pre_release: bool,
upgrade: bool,
include_signatures: bool,
offline: bool,
) -> dict[str, Candidate]:
"""Return the resolved dependency map."""
if not HAS_RESOLVELIB:
raise AnsibleError("Failed to import resolvelib, check that a supported version is installed")

View file

@ -0,0 +1,98 @@
# Copyright: (c) 2025, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""A collection of shared types for ``ansible.galaxy.collection``."""
from __future__ import annotations
import queue as _q
import typing as _t
import ansible.module_utils.compat.typing as _tc
from ansible.module_utils.common.sentinel import Sentinel as _Sentinel
DisplayQueueItemType: _t.TypeAlias = tuple[
str,
tuple[_t.Any, ...],
dict[str, _t.Any],
]
DisplayQueueType: _t.TypeAlias = _q.Queue[DisplayQueueItemType]
class DisplayThreadProto(_t.Protocol):
def __init__(self, display_queue: DisplayQueueType) -> None:
...
def __getattr__(self, attr: str) -> _t.Callable:
...
# FIXME: Use `TypedDict` from `typing_extension` with `closed=True` once
# FIXME: it's fixed for subclasses.
# Ref: https://github.com/python/typing_extensions/issues/686
class ManifestMetadataType(_t.TypedDict, total=False):
directives: _tc.ReadOnly[_t.Required[list[str]]]
omit_default_directives: _tc.ReadOnly[bool]
class _CollectionInfoTypeBase(_t.TypedDict, total=False):
namespace: _tc.ReadOnly[_t.Required[str]]
name: _tc.ReadOnly[_t.Required[str]]
# NOTE: `version: null` is only allowed for `galaxy.yml`
# NOTE: and not `MANIFEST.json`. The use-case for it is collections
# NOTE: that generate the version from Git before building a
# NOTE: distributable tarball artifact.
version: _tc.ReadOnly[_t.Required[str | None]]
authors: _tc.ReadOnly[_t.Required[list[str]]]
readme: _tc.ReadOnly[_t.Required[str]]
tags: _tc.ReadOnly[list[str]]
description: _tc.ReadOnly[str]
license: _tc.ReadOnly[str]
license_file: _tc.ReadOnly[str]
dependencies: _tc.ReadOnly[dict[str, str]]
repository: _tc.ReadOnly[str]
documentation: _tc.ReadOnly[str]
homepage: _tc.ReadOnly[str]
issues: _tc.ReadOnly[str]
# FIXME: Use `TypedDict` from `typing_extension` with `closed=True` once
# FIXME: it's fixed for subclasses.
# Ref: https://github.com/python/typing_extensions/issues/686
class _CollectionInfoWithBuildIgnoreType(_CollectionInfoTypeBase):
# `build_ignore` is mutually exclusive with `manifest`
build_ignore: _tc.ReadOnly[list[str]]
# FIXME: Use `TypedDict` from `typing_extension` with `closed=True` once
# FIXME: it's fixed for subclasses.
# Ref: https://github.com/python/typing_extensions/issues/686
class _CollectionInfoWithManifestType(_CollectionInfoTypeBase):
# `manifest` is mutually exclusive with `build_ignore`
manifest: _tc.ReadOnly[ManifestMetadataType | _t.Type[_Sentinel]]
CollectionInfoType = (
_CollectionInfoTypeBase
| _CollectionInfoWithBuildIgnoreType
| _CollectionInfoWithManifestType
)
class _FileManifestEntryType(_t.TypedDict):
name: _tc.ReadOnly[str]
ftype: _tc.ReadOnly[str]
chksum_type: _tc.ReadOnly[str | None]
chksum_sha256: _tc.ReadOnly[str | None]
format: _tc.ReadOnly[int]
class CollectionManifestType(_t.TypedDict):
collection_info: CollectionInfoType
file_manifest_file: _FileManifestEntryType
format: int
class FilesManifestType(_t.TypedDict):
files: list[_FileManifestEntryType]
format: int

View file

@ -42,6 +42,8 @@ from ansible.utils.display import Display
import ansible.constants as C
from ._types import CollectionInfoType
display = Display()
@ -61,29 +63,36 @@ class ConcreteArtifactsManager:
* caching all of above
* retrieving the metadata out of the downloaded artifacts
"""
def __init__(self, b_working_directory, validate_certs=True, keyring=None, timeout=60, required_signature_count=None, ignore_signature_errors=None):
# type: (bytes, bool, str, int, str, list[str]) -> None
def __init__(
self,
b_working_directory: bytes,
validate_certs: bool = True,
keyring: str | None = None,
timeout: int = 60,
required_signature_count: str | None = None,
ignore_signature_errors: list[str] | None = None,
) -> None:
"""Initialize ConcreteArtifactsManager caches and constraints."""
self._validate_certs = validate_certs # type: bool
self._artifact_cache = {} # type: dict[bytes, bytes]
self._galaxy_artifact_cache = {} # type: dict[Candidate | Requirement, bytes]
self._artifact_meta_cache = {} # type: dict[bytes, dict[str, str | list[str] | dict[str, str] | None | t.Type[Sentinel]]]
self._galaxy_collection_cache = {} # type: dict[Candidate | Requirement, tuple[str, str, GalaxyToken]]
self._galaxy_collection_origin_cache = {} # type: dict[Candidate, tuple[str, list[dict[str, str]]]]
self._b_working_directory = b_working_directory # type: bytes
self._supplemental_signature_cache = {} # type: dict[str, str]
self._keyring = keyring # type: str
self.timeout = timeout # type: int
self._required_signature_count = required_signature_count # type: str
self._ignore_signature_errors = ignore_signature_errors # type: list[str]
self._require_build_metadata = True # type: bool
self._validate_certs = validate_certs
self._artifact_cache: dict[bytes, bytes] = {}
self._galaxy_artifact_cache: dict[Candidate | Requirement, bytes] = {}
self._artifact_meta_cache: dict[bytes, CollectionInfoType] = {}
self._galaxy_collection_cache: dict[Candidate | Requirement, tuple[str, str, GalaxyToken]] = {}
self._galaxy_collection_origin_cache: dict[Candidate, tuple[str, list[dict[str, str]]]] = {}
self._b_working_directory = b_working_directory
self._supplemental_signature_cache: dict[str, str] = {}
self._keyring = keyring
self.timeout = timeout
self._required_signature_count = required_signature_count
self._ignore_signature_errors = ignore_signature_errors
self._require_build_metadata = True
@property
def keyring(self):
def keyring(self) -> str | None:
return self._keyring
@property
def required_successful_signature_count(self):
def required_successful_signature_count(self) -> str | None:
return self._required_signature_count
@property
@ -93,17 +102,17 @@ class ConcreteArtifactsManager:
return self._ignore_signature_errors
@property
def require_build_metadata(self):
# type: () -> bool
def require_build_metadata(self) -> bool:
return self._require_build_metadata
@require_build_metadata.setter
def require_build_metadata(self, value):
# type: (bool) -> None
def require_build_metadata(self, value: bool) -> None:
self._require_build_metadata = value
def get_galaxy_artifact_source_info(self, collection):
# type: (Candidate) -> dict[str, t.Union[str, list[dict[str, str]]]]
def get_galaxy_artifact_source_info(
self,
collection: Candidate,
) -> dict[str, str | list[dict[str, str]]]:
server = collection.src.api_server
try:
@ -126,8 +135,10 @@ class ConcreteArtifactsManager:
"signatures": signatures,
}
def get_galaxy_artifact_path(self, collection):
# type: (t.Union[Candidate, Requirement]) -> bytes
def get_galaxy_artifact_path(
self,
collection: Candidate | Requirement,
) -> bytes:
"""Given a Galaxy-stored collection, return a cached path.
If it's not yet on disk, this method downloads the artifact first.
@ -157,7 +168,7 @@ class ConcreteArtifactsManager:
expected_hash=sha256_hash,
validate_certs=self._validate_certs,
token=token,
) # type: bytes
)
except URLError as err:
raise AnsibleError(
'Failed to download collection tar '
@ -190,8 +201,7 @@ class ConcreteArtifactsManager:
self._galaxy_artifact_cache[collection] = b_artifact_path
return b_artifact_path
def get_artifact_path(self, collection):
# type: (Collection) -> bytes
def get_artifact_path(self, collection: Collection) -> bytes:
"""Given a concrete collection pointer, return a cached path.
If it's not yet on disk, this method downloads the artifact first.
@ -252,22 +262,27 @@ class ConcreteArtifactsManager:
self._artifact_cache[collection.src] = b_artifact_path
return b_artifact_path
def get_artifact_path_from_unknown(self, collection):
# type: (Candidate) -> bytes
def get_artifact_path_from_unknown(self, collection: Candidate) -> bytes:
if collection.is_concrete_artifact:
return self.get_artifact_path(collection)
return self.get_galaxy_artifact_path(collection)
def _get_direct_collection_namespace(self, collection):
# type: (Candidate) -> t.Optional[str]
return self.get_direct_collection_meta(collection)['namespace'] # type: ignore[return-value]
def _get_direct_collection_namespace(
self,
collection: Collection,
) -> str | None:
return self.get_direct_collection_meta(collection)['namespace']
def _get_direct_collection_name(self, collection):
# type: (Collection) -> t.Optional[str]
return self.get_direct_collection_meta(collection)['name'] # type: ignore[return-value]
def _get_direct_collection_name(
self,
collection: Collection,
) -> str | None:
return self.get_direct_collection_meta(collection)['name']
def get_direct_collection_fqcn(self, collection):
# type: (Collection) -> t.Optional[str]
def get_direct_collection_fqcn(
self,
collection: Collection,
) -> str | None:
"""Extract FQCN from the given on-disk collection artifact.
If the collection is virtual, ``None`` is returned instead
@ -277,26 +292,29 @@ class ConcreteArtifactsManager:
# NOTE: should it be something like "<virtual>"?
return None
return '.'.join(( # type: ignore[type-var]
self._get_direct_collection_namespace(collection), # type: ignore[arg-type]
return '.'.join((
self._get_direct_collection_namespace(collection),
self._get_direct_collection_name(collection),
))
def get_direct_collection_version(self, collection):
# type: (Collection) -> str
def get_direct_collection_version(self, collection: Collection) -> str:
"""Extract version from the given on-disk collection artifact."""
return self.get_direct_collection_meta(collection)['version'] # type: ignore[return-value]
return self.get_direct_collection_meta(collection)['version']
def get_direct_collection_dependencies(self, collection):
# type: (t.Union[Candidate, Requirement]) -> dict[str, str]
def get_direct_collection_dependencies(
self,
collection: Candidate | Requirement,
) -> dict[str, str]:
"""Extract deps from the given on-disk collection artifact."""
collection_dependencies = self.get_direct_collection_meta(collection)['dependencies']
if collection_dependencies is None:
collection_dependencies = {}
return collection_dependencies # type: ignore[return-value]
return collection_dependencies
def get_direct_collection_meta(self, collection):
# type: (Collection) -> dict[str, t.Union[str, dict[str, str], list[str], None, t.Type[Sentinel]]]
def get_direct_collection_meta(
self,
collection: Collection,
) -> CollectionInfoType:
"""Extract meta from the given on-disk collection artifact."""
try: # FIXME: use unique collection identifier as a cache key?
return self._artifact_meta_cache[collection.src]
@ -320,6 +338,8 @@ class ConcreteArtifactsManager:
'namespace': None,
'dependencies': {to_native(b_artifact_path): '*'},
'version': '*',
'authors': [], # required in `_CollectionInfoTypeBase`
'readme': '', # required in `_CollectionInfoTypeBase`
}
elif collection.is_subdirs:
collection_meta = {
@ -331,6 +351,8 @@ class ConcreteArtifactsManager:
'*',
),
'version': '*',
'authors': [], # required in `_CollectionInfoTypeBase`
'readme': '', # required in `_CollectionInfoTypeBase`
}
else:
raise RuntimeError
@ -338,8 +360,15 @@ class ConcreteArtifactsManager:
self._artifact_meta_cache[collection.src] = collection_meta
return collection_meta
def save_collection_source(self, collection, url, sha256_hash, token, signatures_url, signatures):
# type: (Candidate, str, str, GalaxyToken, str, list[dict[str, str]]) -> None
def save_collection_source(
self,
collection: Candidate,
url: str,
sha256_hash: str,
token: GalaxyToken,
signatures_url: str,
signatures: list[dict[str, str]],
) -> None:
"""Store collection URL, SHA256 hash and Galaxy API token.
This is a hook that is supposed to be called before attempting to
@ -352,13 +381,13 @@ class ConcreteArtifactsManager:
@contextmanager
def under_tmpdir(
cls,
temp_dir_base, # type: str
validate_certs=True, # type: bool
keyring=None, # type: str
required_signature_count=None, # type: str
ignore_signature_errors=None, # type: list[str]
require_build_metadata=True, # type: bool
): # type: (...) -> t.Iterator[ConcreteArtifactsManager]
temp_dir_base: str,
validate_certs: bool = True,
keyring: str | None = None,
required_signature_count: str | None = None,
ignore_signature_errors: list[str] | None = None,
require_build_metadata: bool = True,
) -> t.Iterator[ConcreteArtifactsManager]:
"""Custom ConcreteArtifactsManager constructor with temp dir.
This method returns a context manager that allocates and cleans
@ -410,7 +439,7 @@ def parse_scm(collection, version):
return name, version, path, fragment
def _extract_collection_from_git(repo_url, coll_ver, b_path):
def _extract_collection_from_git(repo_url, coll_ver, b_path: bytes) -> bytes:
name, version, git_url, fragment = parse_scm(repo_url, coll_ver)
b_checkout_path = mkdtemp(
dir=b_path,
@ -467,8 +496,14 @@ def _extract_collection_from_git(repo_url, coll_ver, b_path):
backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40),
should_retry_error=should_retry_error
)
def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeout=60):
# type: (str, bytes, t.Optional[str], bool, GalaxyToken, int) -> bytes
def _download_file(
url: str,
b_path: bytes,
expected_hash: str | None,
validate_certs: bool,
token: GalaxyToken | None = None,
timeout: int = 60,
) -> bytes:
# ^ NOTE: used in download and verify_collections ^
b_tarball_name = to_bytes(
url.rsplit('/', 1)[1], errors='surrogate_or_strict',
@ -478,7 +513,7 @@ def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeo
b_tarball_dir = mkdtemp(
dir=b_path,
prefix=b'-'.join((b_file_name, b'')),
) # type: bytes
)
b_file_path = os.path.join(b_tarball_dir, b_tarball_name)
@ -493,7 +528,7 @@ def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeo
timeout=timeout
)
with open(b_file_path, 'wb') as download_file: # type: t.BinaryIO
with open(b_file_path, 'wb') as download_file:
actual_hash = _consume_file(resp, write_to=download_file)
if expected_hash:
@ -508,8 +543,10 @@ def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeo
return b_file_path
def _consume_file(read_from, write_to=None):
# type: (t.BinaryIO, t.BinaryIO) -> str
def _consume_file(
read_from: t.IO[bytes],
write_to: t.IO[bytes] | None = None,
) -> str:
bufsize = 65536
sha256_digest = sha256()
data = read_from.read(bufsize)
@ -524,21 +561,20 @@ def _consume_file(read_from, write_to=None):
def _normalize_galaxy_yml_manifest(
galaxy_yml, # type: dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
b_galaxy_yml_path, # type: bytes
require_build_metadata=True, # type: bool
):
# type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
galaxy_yml_schema = (
get_collections_galaxy_meta_info()
) # type: list[dict[str, t.Any]] # FIXME: <--
# FIXME: 👆maybe precise type: list[dict[str, t.Union[bool, str, list[str]]]]
galaxy_yml: dict[
str,
str | list[str] | dict[str, str] | None | t.Type[Sentinel],
],
b_galaxy_yml_path: bytes,
require_build_metadata: bool = True,
) -> CollectionInfoType:
galaxy_yml_schema = get_collections_galaxy_meta_info()
mandatory_keys = set()
string_keys = set() # type: set[str]
list_keys = set() # type: set[str]
dict_keys = set() # type: set[str]
sentinel_keys = set() # type: set[str]
mandatory_keys: set[str] = set()
string_keys: set[str] = set()
list_keys: set[str] = set()
dict_keys: set[str] = set()
sentinel_keys: set[str] = set()
for info in galaxy_yml_schema:
if info.get('required', False):
@ -599,13 +635,13 @@ def _normalize_galaxy_yml_manifest(
if not galaxy_yml.get('version'):
galaxy_yml['version'] = '*'
return galaxy_yml
return t.cast(CollectionInfoType, galaxy_yml)
def _get_meta_from_dir(
b_path, # type: bytes
require_build_metadata=True, # type: bool
): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
b_path: bytes,
require_build_metadata: bool = True,
) -> CollectionInfoType:
try:
return _get_meta_from_installed_dir(b_path)
except LookupError:
@ -613,9 +649,9 @@ def _get_meta_from_dir(
def _get_meta_from_src_dir(
b_path, # type: bytes
require_build_metadata=True, # type: bool
): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
b_path: bytes,
require_build_metadata: bool = True,
) -> CollectionInfoType:
galaxy_yml = os.path.join(b_path, _GALAXY_YAML)
if not os.path.isfile(galaxy_yml):
raise LookupError(
@ -647,9 +683,9 @@ def _get_meta_from_src_dir(
def _get_json_from_installed_dir(
b_path, # type: bytes
filename, # type: str
): # type: (...) -> dict
b_path: bytes,
filename: str,
) -> dict:
b_json_filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict'))
@ -673,9 +709,7 @@ def _get_json_from_installed_dir(
return manifest
def _get_meta_from_installed_dir(
b_path, # type: bytes
): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
def _get_meta_from_installed_dir(b_path: bytes) -> CollectionInfoType:
manifest = _get_json_from_installed_dir(b_path, MANIFEST_FILENAME)
collection_info = manifest['collection_info']
@ -694,9 +728,7 @@ def _get_meta_from_installed_dir(
return collection_info
def _get_meta_from_tar(
b_path, # type: bytes
): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]]
def _get_meta_from_tar(b_path: bytes) -> CollectionInfoType:
if not os.path.exists(b_path):
raise AnsibleError(
f"Unable to find collection artifact file at '{to_native(b_path)}'."
@ -708,7 +740,7 @@ def _get_meta_from_tar(
format(path=to_native(b_path)),
)
with tarfile.open(b_path, mode='r') as collection_tar: # type: tarfile.TarFile
with tarfile.open(b_path, mode='r') as collection_tar:
try:
member = collection_tar.getmember(MANIFEST_FILENAME)
except KeyError:
@ -746,10 +778,9 @@ def _get_meta_from_tar(
@contextmanager
def _tarfile_extract(
tar, # type: tarfile.TarFile
member, # type: tarfile.TarInfo
):
# type: (...) -> t.Iterator[tuple[tarfile.TarInfo, t.Optional[t.IO[bytes]]]]
tar: tarfile.TarFile,
member: tarfile.TarInfo,
) -> t.Iterator[tuple[tarfile.TarInfo, t.IO[bytes] | None]]:
tar_obj = tar.extractfile(member)
try:
yield member, tar_obj

View file

@ -19,11 +19,12 @@ from urllib.error import HTTPError, URLError
if t.TYPE_CHECKING:
from ansible.utils.display import Display
from ._types import DisplayQueueType, DisplayThreadProto
def get_signature_from_source(
source: str,
display: Display | None = None,
display: Display | DisplayThreadProto | None = None,
) -> str:
if display is not None:
display.vvvv(f"Using signature at {source}")
@ -47,7 +48,7 @@ def run_gpg_verify(
manifest_file: str,
signature: str,
keyring: str,
display: Display,
display: Display | DisplayThreadProto,
) -> tuple[str, int]:
status_fd_read, status_fd_write = os.pipe()