From 00eabe340e8b511ecea5d9c00dd97269ceb02c3a Mon Sep 17 00:00:00 2001 From: Sviatoslav Sydorenko Date: Tue, 7 Oct 2025 21:04:19 +0200 Subject: [PATCH] Type-annotate `ansible.galaxy.collection` This patch is a result of `pyrefly autotype` with *a lot* of post-processing. --- lib/ansible/galaxy/collection/__init__.py | 414 +++++++++++------- lib/ansible/galaxy/collection/_types.py | 98 +++++ .../collection/concrete_artifact_manager.py | 229 +++++----- lib/ansible/galaxy/collection/gpg.py | 5 +- 4 files changed, 480 insertions(+), 266 deletions(-) create mode 100644 lib/ansible/galaxy/collection/_types.py diff --git a/lib/ansible/galaxy/collection/__init__.py b/lib/ansible/galaxy/collection/__init__.py index 433af9f5efa..eb376250430 100644 --- a/lib/ansible/galaxy/collection/__init__.py +++ b/lib/ansible/galaxy/collection/__init__.py @@ -53,33 +53,12 @@ if t.TYPE_CHECKING: ConcreteArtifactsManager, ) - ManifestKeysType = t.Literal[ - 'collection_info', 'file_manifest_file', 'format', - ] - FileMetaKeysType = t.Literal[ - 'name', - 'ftype', - 'chksum_type', - 'chksum_sha256', - 'format', - ] - CollectionInfoKeysType = t.Literal[ - # collection meta: - 'namespace', 'name', 'version', - 'authors', 'readme', - 'tags', 'description', - 'license', 'license_file', - 'dependencies', - 'repository', 'documentation', - 'homepage', 'issues', - - # files meta: - FileMetaKeysType, - ] - ManifestValueType = t.Dict[CollectionInfoKeysType, t.Union[int, str, t.List[str], t.Dict[str, str], None]] - CollectionManifestType = t.Dict[ManifestKeysType, ManifestValueType] - FileManifestEntryType = t.Dict[FileMetaKeysType, t.Union[str, int, None]] - FilesManifestType = t.Dict[t.Literal['files', 'format'], t.Union[t.List[FileManifestEntryType], int]] + from ._types import ( + CollectionManifestType, + DisplayQueueType, + DisplayThreadProto, + FilesManifestType, + ) import ansible.constants as C from ansible.errors import AnsibleError @@ -130,8 +109,8 @@ from ansible.utils.collection_loader import AnsibleCollectionRef from ansible.utils.display import Display from ansible.utils.hashing import secure_hash, secure_hash_s +from ._types import ManifestMetadataType -display = Display() MANIFEST_FORMAT = 1 MANIFEST_FILENAME = 'MANIFEST.json' @@ -141,12 +120,27 @@ ModifiedContent = namedtuple('ModifiedContent', ['filename', 'expected', 'instal SIGNATURE_COUNT_RE = r"^(?P\+)?(?:(?P\d+)|(?Pall))$" +class DisplayThread: + + def __init__(self, display_queue: DisplayQueueType) -> None: + self.display_queue = display_queue + + def __getattr__(self, attr: str) -> t.Callable: + def call_display(*args, **kwargs) -> None: + self.display_queue.put((attr, args, kwargs)) + + return call_display + + +display: Display | DisplayThreadProto = Display() + + @dataclass class ManifestControl: - directives: list[str] = None + directives: list[str] omit_default_directives: bool = False - def __post_init__(self): + def __post_init__(self) -> None: # Allow a dict representing this dataclass to be splatted directly. # Requires attrs to have a default value, so anything with a default # of None is swapped for its, potentially mutable, default @@ -156,25 +150,31 @@ class ManifestControl: class CollectionSignatureError(Exception): - def __init__(self, reasons=None, stdout=None, rc=None, ignore=False): + def __init__( + self, + reasons: t.Iterable | None = None, + stdout: str | None = None, + rc: int | None = None, + ignore: bool = False, + ) -> None: self.reasons = reasons self.stdout = stdout self.rc = rc self.ignore = ignore - self._reason_wrapper = None + self._reason_wrapper: textwrap.TextWrapper | None = None - def _report_unexpected(self, collection_name): + def _report_unexpected(self, collection_name: str) -> str: return ( f"Unexpected error for '{collection_name}': " f"GnuPG signature verification failed with the return code {self.rc} and output {self.stdout}" ) - def _report_expected(self, collection_name): + def _report_expected(self, collection_name: str) -> str: header = f"Signature verification failed for '{collection_name}' (return code {self.rc}):" return header + self._format_reasons() - def _format_reasons(self): + def _format_reasons(self) -> str: if self._reason_wrapper is None: self._reason_wrapper = textwrap.TextWrapper( initial_indent=" * ", # 6 chars @@ -183,12 +183,12 @@ class CollectionSignatureError(Exception): wrapped_reasons = [ '\n'.join(self._reason_wrapper.wrap(reason)) - for reason in self.reasons + for reason in self.reasons or () ] return '\n' + '\n'.join(wrapped_reasons) - def report(self, collection_name): + def report(self, collection_name: str) -> str: if self.reasons: return self._report_expected(collection_name) @@ -202,8 +202,11 @@ class CollectionVerifyResult: self.success = True -def verify_local_collection(local_collection, remote_collection, artifacts_manager): - # type: (Candidate, t.Optional[Candidate], ConcreteArtifactsManager) -> CollectionVerifyResult +def verify_local_collection( + local_collection: Candidate, + remote_collection: Candidate | None, + artifacts_manager: ConcreteArtifactsManager, +) -> CollectionVerifyResult: """Verify integrity of the locally installed collection. :param local_collection: Collection being checked. @@ -221,7 +224,7 @@ def verify_local_collection(local_collection, remote_collection, artifacts_manag format(path=to_text(local_collection.src)), ) - modified_content = [] # type: list[ModifiedContent] + modified_content: list[ModifiedContent] = [] verify_local_only = remote_collection is None @@ -377,8 +380,14 @@ def verify_local_collection(local_collection, remote_collection, artifacts_manag return result -def verify_file_signatures(fqcn, manifest_file, detached_signatures, keyring, required_successful_count, ignore_signature_errors): - # type: (str, str, list[str], str, str, list[str]) -> bool +def verify_file_signatures( + fqcn: str, + manifest_file: str, + detached_signatures: list[str], + keyring: str, + required_successful_count: str, + ignore_signature_errors: list[str], +) -> bool: successful = 0 error_messages = [] @@ -427,8 +436,12 @@ def verify_file_signatures(fqcn, manifest_file, detached_signatures, keyring, re return verified -def verify_file_signature(manifest_file, detached_signature, keyring, ignore_signature_errors): - # type: (str, str, str, list[str]) -> None +def verify_file_signature( + manifest_file: str, + detached_signature: str, + keyring: str, + ignore_signature_errors: list[str], +) -> None: """Run the gpg command and parse any errors. Raises CollectionSignatureError on failure.""" gpg_result, gpg_verification_rc = run_gpg_verify(manifest_file, detached_signature, keyring, display) @@ -459,8 +472,7 @@ def verify_file_signature(manifest_file, detached_signature, keyring, ignore_sig return None -def build_collection(u_collection_path, u_output_path, force): - # type: (str, str, bool) -> str +def build_collection(u_collection_path: str, u_output_path: str, force: bool) -> str: """Creates the Ansible collection artifact in a .tar.gz file. :param u_collection_path: The path to the collection to build. This should be the directory that contains the @@ -478,11 +490,14 @@ def build_collection(u_collection_path, u_output_path, force): collection_manifest = _build_manifest(**collection_meta) file_manifest = _build_files_manifest( b_collection_path, - collection_meta['namespace'], # type: ignore[arg-type] - collection_meta['name'], # type: ignore[arg-type] - collection_meta['build_ignore'], # type: ignore[arg-type] - collection_meta['manifest'], # type: ignore[arg-type] - collection_meta['license_file'], # type: ignore[arg-type] + collection_meta['namespace'], + collection_meta['name'], + t.cast(list[str], collection_meta.get('build_ignore', [])), + t.cast( + ManifestMetadataType | t.Type[Sentinel], + collection_meta.get('manifest', Sentinel), + ), + collection_meta['license_file'], ) artifact_tarball_file_name = '{ns!s}-{name!s}-{ver!s}.tar.gz'.format( @@ -508,13 +523,13 @@ def build_collection(u_collection_path, u_output_path, force): def download_collections( - collections, # type: t.Iterable[Requirement] - output_path, # type: str - apis, # type: t.Iterable[GalaxyAPI] - no_deps, # type: bool - allow_pre_release, # type: bool - artifacts_manager, # type: ConcreteArtifactsManager -): # type: (...) -> None + collections: t.Iterable[Requirement], + output_path: str, + apis: t.Iterable[GalaxyAPI], + no_deps: bool, + allow_pre_release: bool, + artifacts_manager: ConcreteArtifactsManager, +) -> None: """Download Ansible collections as their tarball from a Galaxy server to the path specified and creates a requirements file of the downloaded requirements to be used for an install. @@ -609,7 +624,7 @@ def download_collections( req_fd.write(yaml_bytes) -def publish_collection(collection_path, api, wait, timeout): +def publish_collection(collection_path, api, wait, timeout) -> None: """Publish an Ansible collection tarball into an Ansible Galaxy server. :param collection_path: The path to the collection tarball to publish. @@ -634,20 +649,20 @@ def publish_collection(collection_path, api, wait, timeout): def install_collections( - collections, # type: t.Iterable[Requirement] - output_path, # type: str - apis, # type: t.Iterable[GalaxyAPI] - ignore_errors, # type: bool - no_deps, # type: bool - force, # type: bool - force_deps, # type: bool - upgrade, # type: bool - allow_pre_release, # type: bool - artifacts_manager, # type: ConcreteArtifactsManager - disable_gpg_verify, # type: bool - offline, # type: bool - read_requirement_paths, # type: set[str] -): # type: (...) -> None + collections: t.Iterable[Requirement], + output_path: str, + apis: t.Iterable[GalaxyAPI], + ignore_errors: bool, + no_deps: bool, + force: bool, + force_deps: bool, + upgrade: bool, + allow_pre_release: bool, + artifacts_manager: ConcreteArtifactsManager, + disable_gpg_verify: bool, + offline: bool, + read_requirement_paths: set[str], +) -> None: """Install Ansible collections to the path specified. :param collections: The collections to install. @@ -776,7 +791,7 @@ def install_collections( # NOTE: imported in ansible.cli.galaxy -def validate_collection_name(name): # type: (str) -> str +def validate_collection_name(name: str) -> str: """Validates the collection name as an input from the user or a requirements file fit the requirements. :param name: The input name with optional range specifier split by ':'. @@ -793,7 +808,7 @@ def validate_collection_name(name): # type: (str) -> str # NOTE: imported in ansible.cli.galaxy -def validate_collection_path(collection_path): # type: (str) -> str +def validate_collection_path(collection_path: str) -> str: """Ensure a given path ends with 'ansible_collections' :param collection_path: The path that should end in 'ansible_collections' @@ -807,13 +822,13 @@ def validate_collection_path(collection_path): # type: (str) -> str def verify_collections( - collections, # type: t.Iterable[Requirement] - search_paths, # type: t.Iterable[str] - apis, # type: t.Iterable[GalaxyAPI] - ignore_errors, # type: bool - local_verify_only, # type: bool - artifacts_manager, # type: ConcreteArtifactsManager -): # type: (...) -> list[CollectionVerifyResult] + collections: t.Iterable[Requirement], + search_paths: t.Iterable[str], + apis: t.Iterable[GalaxyAPI], + ignore_errors: bool, + local_verify_only: bool, + artifacts_manager: ConcreteArtifactsManager, +) -> list[CollectionVerifyResult]: r"""Verify the integrity of locally installed collections. :param collections: The collections to check. @@ -824,7 +839,7 @@ def verify_collections( :param artifacts_manager: Artifacts manager. :return: list of CollectionVerifyResult objects describing the results of each collection verification """ - results = [] # type: list[CollectionVerifyResult] + results: list[CollectionVerifyResult] = [] api_proxy = MultiGalaxyAPIProxy(apis, artifacts_manager) @@ -949,7 +964,7 @@ def _tempdir(): @contextmanager -def _display_progress(msg=None): +def _display_progress(msg: str | None = None) -> t.Iterator[None]: config_display = C.GALAXY_DISPLAY_PROGRESS display_wheel = sys.stdout.isatty() if config_display is None else config_display @@ -961,7 +976,7 @@ def _display_progress(msg=None): yield return - def progress(display_queue, actual_display): + def progress(display_queue: DisplayQueueType, actual_display: Display) -> None: actual_display.debug("Starting display_progress display thread") t = threading.current_thread() @@ -984,21 +999,10 @@ def _display_progress(msg=None): actual_display.debug("Received end signal for display_progress display thread") return - class DisplayThread(object): - - def __init__(self, display_queue): - self.display_queue = display_queue - - def __getattr__(self, attr): - def call_display(*args, **kwargs): - self.display_queue.put((attr, args, kwargs)) - - return call_display - # Temporary override the global display class with our own which add the calls to a queue for the thread to call. old_display = display try: - display_queue = queue.Queue() + display_queue: DisplayQueueType = queue.Queue() display = DisplayThread(display_queue) t = threading.Thread(target=progress, args=(display_queue, old_display)) t.daemon = True @@ -1016,7 +1020,7 @@ def _display_progress(msg=None): display = old_display -def _verify_file_hash(b_path, filename, expected_hash, error_queue): +def _verify_file_hash(b_path, filename: str, expected_hash, error_queue: list[ModifiedContent]) -> None: b_file_path = to_bytes(os.path.join(to_text(b_path), filename), errors='surrogate_or_strict') if not os.path.isfile(b_file_path): @@ -1029,7 +1033,7 @@ def _verify_file_hash(b_path, filename, expected_hash, error_queue): error_queue.append(ModifiedContent(filename=filename, expected=expected_hash, installed=actual_hash)) -def _make_manifest(): +def _make_manifest() -> FilesManifestType: return { 'files': [ { @@ -1044,7 +1048,7 @@ def _make_manifest(): } -def _make_entry(name, ftype, chksum_type='sha256', chksum=None): +def _make_entry(name: str, ftype: str, chksum_type='sha256', chksum=None): return { 'name': name, 'ftype': ftype, @@ -1054,9 +1058,14 @@ def _make_entry(name, ftype, chksum_type='sha256', chksum=None): } -def _build_files_manifest(b_collection_path, namespace, name, ignore_patterns, - manifest_control, license_file): - # type: (bytes, str, str, list[str], dict[str, t.Any], t.Optional[str]) -> FilesManifestType +def _build_files_manifest( + b_collection_path: bytes, + namespace: str, + name: str, + ignore_patterns: list[str], + manifest_control: ManifestMetadataType | t.Type[Sentinel], + license_file: str | None, +) -> FilesManifestType: if ignore_patterns and manifest_control is not Sentinel: raise AnsibleError('"build_ignore" and "manifest" are mutually exclusive') @@ -1065,24 +1074,25 @@ def _build_files_manifest(b_collection_path, namespace, name, ignore_patterns, b_collection_path, namespace, name, - manifest_control, + t.cast(ManifestMetadataType, manifest_control), # no narrowing?? license_file, ) return _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patterns) -def _build_files_manifest_distlib(b_collection_path, namespace, name, manifest_control, - license_file): - # type: (bytes, str, str, dict[str, t.Any], t.Optional[str]) -> FilesManifestType +def _build_files_manifest_distlib( + b_collection_path: bytes, + namespace: str, + name: str, + manifest_control: ManifestMetadataType, + license_file: str | None, +) -> FilesManifestType: if not HAS_DISTLIB: raise AnsibleError('Use of "manifest" requires the python "distlib" library') - if manifest_control is None: - manifest_control = {} - try: - control = ManifestControl(**manifest_control) + control = ManifestControl(**(manifest_control or {'directives': []})) except TypeError as ex: raise AnsibleError(f'Invalid "manifest" provided: {ex}') @@ -1175,8 +1185,12 @@ def _build_files_manifest_distlib(b_collection_path, namespace, name, manifest_c return manifest -def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patterns): - # type: (bytes, str, str, list[str]) -> FilesManifestType +def _build_files_manifest_walk( + b_collection_path: bytes, + namespace: str, + name: str, + ignore_patterns: list[str], +) -> FilesManifestType: # We always ignore .pyc and .retry files as well as some well known version control directories. The ignore # patterns can be extended by the build_ignore key in galaxy.yml b_ignore_patterns = [ @@ -1202,7 +1216,7 @@ def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patter b_rel_base_dir = os.path.relpath(b_path, common_prefix) return b_rel_base_dir.lstrip(os.path.sep.encode()) - def _walk(b_path, b_top_level_dir): + def _walk(b_path: bytes, b_top_level_dir: bytes) -> None: b_rel_base_dir = _discover_relative_base_directory(b_path, b_top_level_dir) for b_item in os.listdir(b_path): b_abs_path = os.path.join(b_path, b_item) @@ -1249,9 +1263,23 @@ def _build_files_manifest_walk(b_collection_path, namespace, name, ignore_patter # FIXME: accept a dict produced from `galaxy.yml` instead of separate args -def _build_manifest(namespace, name, version, authors, readme, tags, description, license_file, - dependencies, repository, documentation, homepage, issues, **kwargs): - manifest = { +def _build_manifest( + namespace: str, + name: str, + version: str | None, + authors: list[str], + readme: str, + tags: list[str], + description: str, + license_file: str, + dependencies: dict[str, str], + repository: str, + documentation: str, + homepage: str, + issues: str, + **kwargs: str, +) -> CollectionManifestType: + return { 'collection_info': { 'namespace': namespace, 'name': name, @@ -1278,19 +1306,29 @@ def _build_manifest(namespace, name, version, authors, readme, tags, description 'format': MANIFEST_FORMAT, } - return manifest - def _build_collection_tar( - b_collection_path, # type: bytes - b_tar_path, # type: bytes - collection_manifest, # type: CollectionManifestType - file_manifest, # type: FilesManifestType -): # type: (...) -> str + b_collection_path: bytes, + b_tar_path: bytes, + collection_manifest: CollectionManifestType, + file_manifest: FilesManifestType, +) -> str: """Build a tar.gz collection artifact from the manifest data.""" files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict') - collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256) - collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict') + collection_manifest_with_hash = { + **collection_manifest, + 'file_manifest_file': { + **collection_manifest['file_manifest_file'], + 'chksum_sha256': secure_hash_s( + files_manifest_json, + hash_func=sha256, + ), + }, + } + collection_manifest_json = to_bytes( + json.dumps(collection_manifest_with_hash, indent=True), + errors='surrogate_or_strict', + ) with _tempdir() as b_temp_path: b_tar_filepath = os.path.join(b_temp_path, os.path.basename(b_tar_path)) @@ -1305,7 +1343,7 @@ def _build_collection_tar( tar_info.mode = S_IRWU_RG_RO tar_file.addfile(tarinfo=tar_info, fileobj=b_io) - for file_info in file_manifest['files']: # type: ignore[union-attr] + for file_info in file_manifest['files']: if file_info['name'] == '.': continue @@ -1313,7 +1351,7 @@ def _build_collection_tar( filename = to_native(file_info['name'], errors='surrogate_or_strict') b_src_path = os.path.join(b_collection_path, to_bytes(filename, errors='surrogate_or_strict')) - def reset_stat(tarinfo): + def reset_stat(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo: if tarinfo.type != tarfile.SYMTYPE: existing_is_exec = tarinfo.mode & stat.S_IXUSR tarinfo.mode = S_IRWXU_RXG_RXO if existing_is_exec or tarinfo.isdir() else S_IRWU_RG_RO @@ -1353,7 +1391,12 @@ def _build_collection_tar( return tar_path -def _build_collection_dir(b_collection_path, b_collection_output, collection_manifest, file_manifest): +def _build_collection_dir( + b_collection_path: bytes, + b_collection_output: bytes, + collection_manifest: CollectionManifestType, + file_manifest: FilesManifestType, +) -> str: """Build a collection directory from the manifest data. This should follow the same pattern as _build_collection_tar. @@ -1361,8 +1404,20 @@ def _build_collection_dir(b_collection_path, b_collection_output, collection_man os.makedirs(b_collection_output, mode=S_IRWXU_RXG_RXO) files_manifest_json = to_bytes(json.dumps(file_manifest, indent=True), errors='surrogate_or_strict') - collection_manifest['file_manifest_file']['chksum_sha256'] = secure_hash_s(files_manifest_json, hash_func=sha256) - collection_manifest_json = to_bytes(json.dumps(collection_manifest, indent=True), errors='surrogate_or_strict') + collection_manifest_with_hash = { + **collection_manifest, + 'file_manifest_file': { + **collection_manifest['file_manifest_file'], + 'chksum_sha256': secure_hash_s( + files_manifest_json, + hash_func=sha256, + ), + }, + } + collection_manifest_json = to_bytes( + json.dumps(collection_manifest_with_hash, indent=True), + errors='surrogate_or_strict', + ) # Write contents to the files for name, b in [(MANIFEST_FILENAME, collection_manifest_json), ('FILES.json', files_manifest_json)]: @@ -1402,7 +1457,7 @@ def _build_collection_dir(b_collection_path, b_collection_output, collection_man return collection_output -def _normalize_collection_path(path): +def _normalize_collection_path(path: pathlib.Path | str) -> pathlib.Path: str_path = path.as_posix() if isinstance(path, pathlib.Path) else path return pathlib.Path( # This is annoying, but GalaxyCLI._resolve_path did it @@ -1410,18 +1465,24 @@ def _normalize_collection_path(path): ).expanduser().absolute() -def find_existing_collections(path_filter, artifacts_manager, namespace_filter=None, collection_filter=None, dedupe=True): +def find_existing_collections( + path_filter: str | t.Sequence[str], + artifacts_manager: ConcreteArtifactsManager, + namespace_filter: str | t.Sequence[str] | None = None, + collection_filter: str | t.Sequence[str] | None = None, + dedupe: bool = True, +) -> t.Iterable[Candidate]: """Locate all collections under a given path. :param path: Collection dirs layout search path. :param artifacts_manager: Artifacts manager. """ if path_filter and not is_sequence(path_filter): - path_filter = [path_filter] + path_filter = [t.cast(str, path_filter)] if namespace_filter and not is_sequence(namespace_filter): - namespace_filter = [namespace_filter] + namespace_filter = [t.cast(str, namespace_filter)] if collection_filter and not is_sequence(collection_filter): - collection_filter = [collection_filter] + collection_filter = [t.cast(str, collection_filter)] paths = set() for path in files('ansible_collections').glob('*/*/'): @@ -1474,8 +1535,11 @@ def find_existing_collections(path_filter, artifacts_manager, namespace_filter=N yield req -def install(collection, path, artifacts_manager): # FIXME: mv to dataclasses? - # type: (Candidate, str, ConcreteArtifactsManager) -> None +def install( # FIXME: mv to dataclasses? + collection: Candidate, + path: str, + artifacts_manager: ConcreteArtifactsManager, +) -> None: """Install a collection under a given path. :param collection: Collection to be installed. @@ -1520,8 +1584,11 @@ def install(collection, path, artifacts_manager): # FIXME: mv to dataclasses? ) -def write_source_metadata(collection, b_collection_path, artifacts_manager): - # type: (Candidate, bytes, ConcreteArtifactsManager) -> None +def write_source_metadata( + collection: Candidate, + b_collection_path: bytes, + artifacts_manager: ConcreteArtifactsManager, +) -> None: source_data = artifacts_manager.get_galaxy_artifact_source_info(collection) b_yaml_source_data = to_bytes(yaml_dump(source_data), errors='surrogate_or_strict') @@ -1543,7 +1610,7 @@ def write_source_metadata(collection, b_collection_path, artifacts_manager): raise -def remove_source_metadata(collection, b_collection_path): +def remove_source_metadata(collection, b_collection_path) -> None: pattern = f"{collection.namespace}.{collection.name}-*.info" info_path = os.path.join( b_collection_path, @@ -1559,8 +1626,13 @@ def remove_source_metadata(collection, b_collection_path): pass -def verify_artifact_manifest(manifest_file, signatures, keyring, required_signature_count, ignore_signature_errors): - # type: (str, list[str], str, str, list[str]) -> None +def verify_artifact_manifest( + manifest_file: str, + signatures: list[str], + keyring: str, + required_signature_count: str, + ignore_signature_errors: list[str], +) -> None: failed_verify = False coll_path_parts = to_text(manifest_file, errors='surrogate_or_strict').split(os.path.sep) collection_name = '%s.%s' % (coll_path_parts[-3], coll_path_parts[-2]) # get 'ns' and 'coll' from /path/to/ns/coll/MANIFEST.json @@ -1569,7 +1641,7 @@ def verify_artifact_manifest(manifest_file, signatures, keyring, required_signat display.vvvv(f"GnuPG signature verification succeeded for {collection_name}") -def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring, required_signature_count, ignore_signature_errors): +def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatures, keyring, required_signature_count, ignore_signature_errors) -> None: """Install a collection from tarball under a given path. :param b_coll_targz_path: Collection tarball to be installed. @@ -1618,7 +1690,7 @@ def install_artifact(b_coll_targz_path, b_collection_path, b_temp_path, signatur raise -def install_src(collection, b_collection_path, b_collection_output_path, artifacts_manager): +def install_src(collection, b_collection_path, b_collection_output_path, artifacts_manager) -> None: r"""Install the collection from source control into given dir. Generates the Ansible collection artifact data from a galaxy.yml and @@ -1660,7 +1732,7 @@ def install_src(collection, b_collection_path, b_collection_output_path, artifac ) -def _extract_tar_dir(tar, dirname, b_dest): +def _extract_tar_dir(tar: tarfile.TarFile, dirname, b_dest) -> None: """ Extracts a directory from a collection tar. """ dirname = to_native(dirname, errors='surrogate_or_strict') @@ -1678,7 +1750,7 @@ def _extract_tar_dir(tar, dirname, b_dest): b_link_path = to_bytes(tar_member.linkname, errors='surrogate_or_strict') if not _is_child_path(b_link_path, b_dest, link_name=b_dir_path): raise AnsibleError("Cannot extract symlink '%s' in collection: path points to location outside of " - "collection '%s'" % (to_native(dirname), b_link_path)) + "collection %r" % (to_native(dirname), b_link_path)) os.symlink(b_link_path, b_dir_path) @@ -1687,7 +1759,13 @@ def _extract_tar_dir(tar, dirname, b_dest): os.mkdir(b_dir_path, S_IRWXU_RXG_RXO) -def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None): +def _extract_tar_file( + tar: tarfile.TarFile, + filename: str, + b_dest: bytes, + b_temp_path: bytes, + expected_hash=None, +) -> None: """ Extracts a file from a collection tar. """ with _get_tar_file_member(tar, filename) as (tar_member, tar_obj): if tar_member.type == tarfile.SYMTYPE: @@ -1716,12 +1794,15 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None): b_link_path = to_bytes(tar_member.linkname, errors='surrogate_or_strict') if not _is_child_path(b_link_path, b_dest, link_name=b_dest_filepath): raise AnsibleError("Cannot extract symlink '%s' in collection: path points to location outside of " - "collection '%s'" % (to_native(filename), b_link_path)) + "collection %r" % (to_native(filename), b_link_path)) os.symlink(b_link_path, b_dest_filepath) else: - shutil.move(to_bytes(tmpfile_obj.name, errors='surrogate_or_strict'), b_dest_filepath) + shutil.move( + to_native(tmpfile_obj.name, errors='surrogate_or_strict'), + to_native(b_dest_filepath, errors='surrogate_or_strict'), + ) # Default to rw-r--r-- and only add execute if the tar file has execute. tar_member = tar.getmember(to_native(filename, errors='surrogate_or_strict')) @@ -1732,7 +1813,10 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None): os.chmod(b_dest_filepath, new_mode) -def _get_tar_file_member(tar, filename): +def _get_tar_file_member( + tar: tarfile.TarFile, + filename: str, +) -> t.ContextManager[tuple[tarfile.TarInfo, t.IO[bytes] | None]]: n_filename = to_native(filename, errors='surrogate_or_strict') try: member = tar.getmember(n_filename) @@ -1744,7 +1828,7 @@ def _get_tar_file_member(tar, filename): return _tarfile_extract(tar, member) -def _get_json_from_tar_file(b_path, filename): +def _get_json_from_tar_file(b_path: bytes, filename: str) -> dict: file_contents = '' with tarfile.open(b_path, mode='r') as collection_tar: @@ -1758,19 +1842,19 @@ def _get_json_from_tar_file(b_path, filename): return json.loads(file_contents) -def _get_tar_file_hash(b_path, filename): +def _get_tar_file_hash(b_path: bytes, filename: str) -> str: with tarfile.open(b_path, mode='r') as collection_tar: with _get_tar_file_member(collection_tar, filename) as (dummy, tar_obj): return _consume_file(tar_obj) -def _get_file_hash(b_path, filename): # type: (bytes, str) -> str +def _get_file_hash(b_path: bytes, filename: str) -> str: filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict')) with open(filepath, 'rb') as fp: return _consume_file(fp) -def _is_child_path(path, parent_path, link_name=None): +def _is_child_path(path: bytes, parent_path: bytes, link_name=None) -> bool: """ Checks that path is a path within the parent_path specified. """ b_path = to_bytes(path, errors='surrogate_or_strict') @@ -1784,16 +1868,16 @@ def _is_child_path(path, parent_path, link_name=None): def _resolve_depenency_map( - requested_requirements, # type: t.Iterable[Requirement] - galaxy_apis, # type: t.Iterable[GalaxyAPI] - concrete_artifacts_manager, # type: ConcreteArtifactsManager - preferred_candidates, # type: t.Iterable[Candidate] | None - no_deps, # type: bool - allow_pre_release, # type: bool - upgrade, # type: bool - include_signatures, # type: bool - offline, # type: bool -): # type: (...) -> dict[str, Candidate] + requested_requirements: t.Iterable[Requirement], + galaxy_apis: t.Iterable[GalaxyAPI], + concrete_artifacts_manager: ConcreteArtifactsManager, + preferred_candidates: t.Iterable[Candidate] | None, + no_deps: bool, + allow_pre_release: bool, + upgrade: bool, + include_signatures: bool, + offline: bool, +) -> dict[str, Candidate]: """Return the resolved dependency map.""" if not HAS_RESOLVELIB: raise AnsibleError("Failed to import resolvelib, check that a supported version is installed") diff --git a/lib/ansible/galaxy/collection/_types.py b/lib/ansible/galaxy/collection/_types.py new file mode 100644 index 00000000000..541fa67b028 --- /dev/null +++ b/lib/ansible/galaxy/collection/_types.py @@ -0,0 +1,98 @@ +# Copyright: (c) 2025, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +"""A collection of shared types for ``ansible.galaxy.collection``.""" + +from __future__ import annotations + +import queue as _q +import typing as _t + +import ansible.module_utils.compat.typing as _tc +from ansible.module_utils.common.sentinel import Sentinel as _Sentinel + + +DisplayQueueItemType: _t.TypeAlias = tuple[ + str, + tuple[_t.Any, ...], + dict[str, _t.Any], +] +DisplayQueueType: _t.TypeAlias = _q.Queue[DisplayQueueItemType] + + +class DisplayThreadProto(_t.Protocol): + def __init__(self, display_queue: DisplayQueueType) -> None: + ... + + def __getattr__(self, attr: str) -> _t.Callable: + ... + + +# FIXME: Use `TypedDict` from `typing_extension` with `closed=True` once +# FIXME: it's fixed for subclasses. +# Ref: https://github.com/python/typing_extensions/issues/686 +class ManifestMetadataType(_t.TypedDict, total=False): + directives: _tc.ReadOnly[_t.Required[list[str]]] + omit_default_directives: _tc.ReadOnly[bool] + + +class _CollectionInfoTypeBase(_t.TypedDict, total=False): + namespace: _tc.ReadOnly[_t.Required[str]] + name: _tc.ReadOnly[_t.Required[str]] + # NOTE: `version: null` is only allowed for `galaxy.yml` + # NOTE: and not `MANIFEST.json`. The use-case for it is collections + # NOTE: that generate the version from Git before building a + # NOTE: distributable tarball artifact. + version: _tc.ReadOnly[_t.Required[str | None]] + authors: _tc.ReadOnly[_t.Required[list[str]]] + readme: _tc.ReadOnly[_t.Required[str]] + tags: _tc.ReadOnly[list[str]] + description: _tc.ReadOnly[str] + license: _tc.ReadOnly[str] + license_file: _tc.ReadOnly[str] + dependencies: _tc.ReadOnly[dict[str, str]] + repository: _tc.ReadOnly[str] + documentation: _tc.ReadOnly[str] + homepage: _tc.ReadOnly[str] + issues: _tc.ReadOnly[str] + + +# FIXME: Use `TypedDict` from `typing_extension` with `closed=True` once +# FIXME: it's fixed for subclasses. +# Ref: https://github.com/python/typing_extensions/issues/686 +class _CollectionInfoWithBuildIgnoreType(_CollectionInfoTypeBase): + # `build_ignore` is mutually exclusive with `manifest` + build_ignore: _tc.ReadOnly[list[str]] + + +# FIXME: Use `TypedDict` from `typing_extension` with `closed=True` once +# FIXME: it's fixed for subclasses. +# Ref: https://github.com/python/typing_extensions/issues/686 +class _CollectionInfoWithManifestType(_CollectionInfoTypeBase): + # `manifest` is mutually exclusive with `build_ignore` + manifest: _tc.ReadOnly[ManifestMetadataType | _t.Type[_Sentinel]] + + +CollectionInfoType = ( + _CollectionInfoTypeBase + | _CollectionInfoWithBuildIgnoreType + | _CollectionInfoWithManifestType +) + + +class _FileManifestEntryType(_t.TypedDict): + name: _tc.ReadOnly[str] + ftype: _tc.ReadOnly[str] + chksum_type: _tc.ReadOnly[str | None] + chksum_sha256: _tc.ReadOnly[str | None] + format: _tc.ReadOnly[int] + + +class CollectionManifestType(_t.TypedDict): + collection_info: CollectionInfoType + file_manifest_file: _FileManifestEntryType + format: int + + +class FilesManifestType(_t.TypedDict): + files: list[_FileManifestEntryType] + format: int diff --git a/lib/ansible/galaxy/collection/concrete_artifact_manager.py b/lib/ansible/galaxy/collection/concrete_artifact_manager.py index 1659bc46b49..07412a54935 100644 --- a/lib/ansible/galaxy/collection/concrete_artifact_manager.py +++ b/lib/ansible/galaxy/collection/concrete_artifact_manager.py @@ -42,6 +42,8 @@ from ansible.utils.display import Display import ansible.constants as C +from ._types import CollectionInfoType + display = Display() @@ -61,29 +63,36 @@ class ConcreteArtifactsManager: * caching all of above * retrieving the metadata out of the downloaded artifacts """ - def __init__(self, b_working_directory, validate_certs=True, keyring=None, timeout=60, required_signature_count=None, ignore_signature_errors=None): - # type: (bytes, bool, str, int, str, list[str]) -> None + def __init__( + self, + b_working_directory: bytes, + validate_certs: bool = True, + keyring: str | None = None, + timeout: int = 60, + required_signature_count: str | None = None, + ignore_signature_errors: list[str] | None = None, + ) -> None: """Initialize ConcreteArtifactsManager caches and constraints.""" - self._validate_certs = validate_certs # type: bool - self._artifact_cache = {} # type: dict[bytes, bytes] - self._galaxy_artifact_cache = {} # type: dict[Candidate | Requirement, bytes] - self._artifact_meta_cache = {} # type: dict[bytes, dict[str, str | list[str] | dict[str, str] | None | t.Type[Sentinel]]] - self._galaxy_collection_cache = {} # type: dict[Candidate | Requirement, tuple[str, str, GalaxyToken]] - self._galaxy_collection_origin_cache = {} # type: dict[Candidate, tuple[str, list[dict[str, str]]]] - self._b_working_directory = b_working_directory # type: bytes - self._supplemental_signature_cache = {} # type: dict[str, str] - self._keyring = keyring # type: str - self.timeout = timeout # type: int - self._required_signature_count = required_signature_count # type: str - self._ignore_signature_errors = ignore_signature_errors # type: list[str] - self._require_build_metadata = True # type: bool + self._validate_certs = validate_certs + self._artifact_cache: dict[bytes, bytes] = {} + self._galaxy_artifact_cache: dict[Candidate | Requirement, bytes] = {} + self._artifact_meta_cache: dict[bytes, CollectionInfoType] = {} + self._galaxy_collection_cache: dict[Candidate | Requirement, tuple[str, str, GalaxyToken]] = {} + self._galaxy_collection_origin_cache: dict[Candidate, tuple[str, list[dict[str, str]]]] = {} + self._b_working_directory = b_working_directory + self._supplemental_signature_cache: dict[str, str] = {} + self._keyring = keyring + self.timeout = timeout + self._required_signature_count = required_signature_count + self._ignore_signature_errors = ignore_signature_errors + self._require_build_metadata = True @property - def keyring(self): + def keyring(self) -> str | None: return self._keyring @property - def required_successful_signature_count(self): + def required_successful_signature_count(self) -> str | None: return self._required_signature_count @property @@ -93,17 +102,17 @@ class ConcreteArtifactsManager: return self._ignore_signature_errors @property - def require_build_metadata(self): - # type: () -> bool + def require_build_metadata(self) -> bool: return self._require_build_metadata @require_build_metadata.setter - def require_build_metadata(self, value): - # type: (bool) -> None + def require_build_metadata(self, value: bool) -> None: self._require_build_metadata = value - def get_galaxy_artifact_source_info(self, collection): - # type: (Candidate) -> dict[str, t.Union[str, list[dict[str, str]]]] + def get_galaxy_artifact_source_info( + self, + collection: Candidate, + ) -> dict[str, str | list[dict[str, str]]]: server = collection.src.api_server try: @@ -126,8 +135,10 @@ class ConcreteArtifactsManager: "signatures": signatures, } - def get_galaxy_artifact_path(self, collection): - # type: (t.Union[Candidate, Requirement]) -> bytes + def get_galaxy_artifact_path( + self, + collection: Candidate | Requirement, + ) -> bytes: """Given a Galaxy-stored collection, return a cached path. If it's not yet on disk, this method downloads the artifact first. @@ -157,7 +168,7 @@ class ConcreteArtifactsManager: expected_hash=sha256_hash, validate_certs=self._validate_certs, token=token, - ) # type: bytes + ) except URLError as err: raise AnsibleError( 'Failed to download collection tar ' @@ -190,8 +201,7 @@ class ConcreteArtifactsManager: self._galaxy_artifact_cache[collection] = b_artifact_path return b_artifact_path - def get_artifact_path(self, collection): - # type: (Collection) -> bytes + def get_artifact_path(self, collection: Collection) -> bytes: """Given a concrete collection pointer, return a cached path. If it's not yet on disk, this method downloads the artifact first. @@ -252,22 +262,27 @@ class ConcreteArtifactsManager: self._artifact_cache[collection.src] = b_artifact_path return b_artifact_path - def get_artifact_path_from_unknown(self, collection): - # type: (Candidate) -> bytes + def get_artifact_path_from_unknown(self, collection: Candidate) -> bytes: if collection.is_concrete_artifact: return self.get_artifact_path(collection) return self.get_galaxy_artifact_path(collection) - def _get_direct_collection_namespace(self, collection): - # type: (Candidate) -> t.Optional[str] - return self.get_direct_collection_meta(collection)['namespace'] # type: ignore[return-value] + def _get_direct_collection_namespace( + self, + collection: Collection, + ) -> str | None: + return self.get_direct_collection_meta(collection)['namespace'] - def _get_direct_collection_name(self, collection): - # type: (Collection) -> t.Optional[str] - return self.get_direct_collection_meta(collection)['name'] # type: ignore[return-value] + def _get_direct_collection_name( + self, + collection: Collection, + ) -> str | None: + return self.get_direct_collection_meta(collection)['name'] - def get_direct_collection_fqcn(self, collection): - # type: (Collection) -> t.Optional[str] + def get_direct_collection_fqcn( + self, + collection: Collection, + ) -> str | None: """Extract FQCN from the given on-disk collection artifact. If the collection is virtual, ``None`` is returned instead @@ -277,26 +292,29 @@ class ConcreteArtifactsManager: # NOTE: should it be something like ""? return None - return '.'.join(( # type: ignore[type-var] - self._get_direct_collection_namespace(collection), # type: ignore[arg-type] + return '.'.join(( + self._get_direct_collection_namespace(collection), self._get_direct_collection_name(collection), )) - def get_direct_collection_version(self, collection): - # type: (Collection) -> str + def get_direct_collection_version(self, collection: Collection) -> str: """Extract version from the given on-disk collection artifact.""" - return self.get_direct_collection_meta(collection)['version'] # type: ignore[return-value] + return self.get_direct_collection_meta(collection)['version'] - def get_direct_collection_dependencies(self, collection): - # type: (t.Union[Candidate, Requirement]) -> dict[str, str] + def get_direct_collection_dependencies( + self, + collection: Candidate | Requirement, + ) -> dict[str, str]: """Extract deps from the given on-disk collection artifact.""" collection_dependencies = self.get_direct_collection_meta(collection)['dependencies'] if collection_dependencies is None: collection_dependencies = {} - return collection_dependencies # type: ignore[return-value] + return collection_dependencies - def get_direct_collection_meta(self, collection): - # type: (Collection) -> dict[str, t.Union[str, dict[str, str], list[str], None, t.Type[Sentinel]]] + def get_direct_collection_meta( + self, + collection: Collection, + ) -> CollectionInfoType: """Extract meta from the given on-disk collection artifact.""" try: # FIXME: use unique collection identifier as a cache key? return self._artifact_meta_cache[collection.src] @@ -320,6 +338,8 @@ class ConcreteArtifactsManager: 'namespace': None, 'dependencies': {to_native(b_artifact_path): '*'}, 'version': '*', + 'authors': [], # required in `_CollectionInfoTypeBase` + 'readme': '', # required in `_CollectionInfoTypeBase` } elif collection.is_subdirs: collection_meta = { @@ -331,6 +351,8 @@ class ConcreteArtifactsManager: '*', ), 'version': '*', + 'authors': [], # required in `_CollectionInfoTypeBase` + 'readme': '', # required in `_CollectionInfoTypeBase` } else: raise RuntimeError @@ -338,8 +360,15 @@ class ConcreteArtifactsManager: self._artifact_meta_cache[collection.src] = collection_meta return collection_meta - def save_collection_source(self, collection, url, sha256_hash, token, signatures_url, signatures): - # type: (Candidate, str, str, GalaxyToken, str, list[dict[str, str]]) -> None + def save_collection_source( + self, + collection: Candidate, + url: str, + sha256_hash: str, + token: GalaxyToken, + signatures_url: str, + signatures: list[dict[str, str]], + ) -> None: """Store collection URL, SHA256 hash and Galaxy API token. This is a hook that is supposed to be called before attempting to @@ -352,13 +381,13 @@ class ConcreteArtifactsManager: @contextmanager def under_tmpdir( cls, - temp_dir_base, # type: str - validate_certs=True, # type: bool - keyring=None, # type: str - required_signature_count=None, # type: str - ignore_signature_errors=None, # type: list[str] - require_build_metadata=True, # type: bool - ): # type: (...) -> t.Iterator[ConcreteArtifactsManager] + temp_dir_base: str, + validate_certs: bool = True, + keyring: str | None = None, + required_signature_count: str | None = None, + ignore_signature_errors: list[str] | None = None, + require_build_metadata: bool = True, + ) -> t.Iterator[ConcreteArtifactsManager]: """Custom ConcreteArtifactsManager constructor with temp dir. This method returns a context manager that allocates and cleans @@ -410,7 +439,7 @@ def parse_scm(collection, version): return name, version, path, fragment -def _extract_collection_from_git(repo_url, coll_ver, b_path): +def _extract_collection_from_git(repo_url, coll_ver, b_path: bytes) -> bytes: name, version, git_url, fragment = parse_scm(repo_url, coll_ver) b_checkout_path = mkdtemp( dir=b_path, @@ -467,8 +496,14 @@ def _extract_collection_from_git(repo_url, coll_ver, b_path): backoff_iterator=generate_jittered_backoff(retries=6, delay_base=2, delay_threshold=40), should_retry_error=should_retry_error ) -def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeout=60): - # type: (str, bytes, t.Optional[str], bool, GalaxyToken, int) -> bytes +def _download_file( + url: str, + b_path: bytes, + expected_hash: str | None, + validate_certs: bool, + token: GalaxyToken | None = None, + timeout: int = 60, +) -> bytes: # ^ NOTE: used in download and verify_collections ^ b_tarball_name = to_bytes( url.rsplit('/', 1)[1], errors='surrogate_or_strict', @@ -478,7 +513,7 @@ def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeo b_tarball_dir = mkdtemp( dir=b_path, prefix=b'-'.join((b_file_name, b'')), - ) # type: bytes + ) b_file_path = os.path.join(b_tarball_dir, b_tarball_name) @@ -493,7 +528,7 @@ def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeo timeout=timeout ) - with open(b_file_path, 'wb') as download_file: # type: t.BinaryIO + with open(b_file_path, 'wb') as download_file: actual_hash = _consume_file(resp, write_to=download_file) if expected_hash: @@ -508,8 +543,10 @@ def _download_file(url, b_path, expected_hash, validate_certs, token=None, timeo return b_file_path -def _consume_file(read_from, write_to=None): - # type: (t.BinaryIO, t.BinaryIO) -> str +def _consume_file( + read_from: t.IO[bytes], + write_to: t.IO[bytes] | None = None, +) -> str: bufsize = 65536 sha256_digest = sha256() data = read_from.read(bufsize) @@ -524,21 +561,20 @@ def _consume_file(read_from, write_to=None): def _normalize_galaxy_yml_manifest( - galaxy_yml, # type: dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]] - b_galaxy_yml_path, # type: bytes - require_build_metadata=True, # type: bool -): - # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]] - galaxy_yml_schema = ( - get_collections_galaxy_meta_info() - ) # type: list[dict[str, t.Any]] # FIXME: <-- - # FIXME: 👆maybe precise type: list[dict[str, t.Union[bool, str, list[str]]]] + galaxy_yml: dict[ + str, + str | list[str] | dict[str, str] | None | t.Type[Sentinel], + ], + b_galaxy_yml_path: bytes, + require_build_metadata: bool = True, +) -> CollectionInfoType: + galaxy_yml_schema = get_collections_galaxy_meta_info() - mandatory_keys = set() - string_keys = set() # type: set[str] - list_keys = set() # type: set[str] - dict_keys = set() # type: set[str] - sentinel_keys = set() # type: set[str] + mandatory_keys: set[str] = set() + string_keys: set[str] = set() + list_keys: set[str] = set() + dict_keys: set[str] = set() + sentinel_keys: set[str] = set() for info in galaxy_yml_schema: if info.get('required', False): @@ -599,13 +635,13 @@ def _normalize_galaxy_yml_manifest( if not galaxy_yml.get('version'): galaxy_yml['version'] = '*' - return galaxy_yml + return t.cast(CollectionInfoType, galaxy_yml) def _get_meta_from_dir( - b_path, # type: bytes - require_build_metadata=True, # type: bool -): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]] + b_path: bytes, + require_build_metadata: bool = True, +) -> CollectionInfoType: try: return _get_meta_from_installed_dir(b_path) except LookupError: @@ -613,9 +649,9 @@ def _get_meta_from_dir( def _get_meta_from_src_dir( - b_path, # type: bytes - require_build_metadata=True, # type: bool -): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]] + b_path: bytes, + require_build_metadata: bool = True, +) -> CollectionInfoType: galaxy_yml = os.path.join(b_path, _GALAXY_YAML) if not os.path.isfile(galaxy_yml): raise LookupError( @@ -647,9 +683,9 @@ def _get_meta_from_src_dir( def _get_json_from_installed_dir( - b_path, # type: bytes - filename, # type: str -): # type: (...) -> dict + b_path: bytes, + filename: str, +) -> dict: b_json_filepath = os.path.join(b_path, to_bytes(filename, errors='surrogate_or_strict')) @@ -673,9 +709,7 @@ def _get_json_from_installed_dir( return manifest -def _get_meta_from_installed_dir( - b_path, # type: bytes -): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]] +def _get_meta_from_installed_dir(b_path: bytes) -> CollectionInfoType: manifest = _get_json_from_installed_dir(b_path, MANIFEST_FILENAME) collection_info = manifest['collection_info'] @@ -694,9 +728,7 @@ def _get_meta_from_installed_dir( return collection_info -def _get_meta_from_tar( - b_path, # type: bytes -): # type: (...) -> dict[str, t.Union[str, list[str], dict[str, str], None, t.Type[Sentinel]]] +def _get_meta_from_tar(b_path: bytes) -> CollectionInfoType: if not os.path.exists(b_path): raise AnsibleError( f"Unable to find collection artifact file at '{to_native(b_path)}'." @@ -708,7 +740,7 @@ def _get_meta_from_tar( format(path=to_native(b_path)), ) - with tarfile.open(b_path, mode='r') as collection_tar: # type: tarfile.TarFile + with tarfile.open(b_path, mode='r') as collection_tar: try: member = collection_tar.getmember(MANIFEST_FILENAME) except KeyError: @@ -746,10 +778,9 @@ def _get_meta_from_tar( @contextmanager def _tarfile_extract( - tar, # type: tarfile.TarFile - member, # type: tarfile.TarInfo -): - # type: (...) -> t.Iterator[tuple[tarfile.TarInfo, t.Optional[t.IO[bytes]]]] + tar: tarfile.TarFile, + member: tarfile.TarInfo, +) -> t.Iterator[tuple[tarfile.TarInfo, t.IO[bytes] | None]]: tar_obj = tar.extractfile(member) try: yield member, tar_obj diff --git a/lib/ansible/galaxy/collection/gpg.py b/lib/ansible/galaxy/collection/gpg.py index 55a9c10347f..c2c7e55715a 100644 --- a/lib/ansible/galaxy/collection/gpg.py +++ b/lib/ansible/galaxy/collection/gpg.py @@ -19,11 +19,12 @@ from urllib.error import HTTPError, URLError if t.TYPE_CHECKING: from ansible.utils.display import Display + from ._types import DisplayQueueType, DisplayThreadProto def get_signature_from_source( source: str, - display: Display | None = None, + display: Display | DisplayThreadProto | None = None, ) -> str: if display is not None: display.vvvv(f"Using signature at {source}") @@ -47,7 +48,7 @@ def run_gpg_verify( manifest_file: str, signature: str, keyring: str, - display: Display, + display: Display | DisplayThreadProto, ) -> tuple[str, int]: status_fd_read, status_fd_write = os.pipe()