use pep585 types everywhere and add a test (#10414)

this is the final part of
https://github.com/certbot/certbot/issues/10195. this fixes
https://github.com/certbot/certbot/issues/10195

the changes in the first commit were done automatically with the
command:
```
ruff check --fix --extend-select UP006 --unsafe-fixes
```
the second commit configures ruff to check for this to avoid regressions

thanks for bearing with me thru these somewhat large automatically
generated PRs ohemorange 🙏
This commit is contained in:
Brad Warren 2025-08-12 16:56:45 -07:00 committed by GitHub
parent 27b344c8d8
commit d5a2e9227c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 161 additions and 219 deletions

View file

@ -1,7 +1,6 @@
"""This module contains advanced assertions for the certbot integration tests."""
import os
from typing import Optional
from typing import Type
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
@ -22,7 +21,7 @@ SYSTEM_SID = 'S-1-5-18'
ADMINS_SID = 'S-1-5-32-544'
def assert_elliptic_key(key_path: str, curve: Type[EllipticCurve]) -> None:
def assert_elliptic_key(key_path: str, curve: type[EllipticCurve]) -> None:
"""
Asserts that the key at the given path is an EC key using the given curve.
:param key_path: path to key

View file

@ -4,7 +4,6 @@ import shutil
import sys
import tempfile
from typing import Iterable
from typing import Tuple
import pytest
@ -67,7 +66,7 @@ class IntegrationTestsContext:
"""Cleanup the integration test context."""
shutil.rmtree(self.workspace)
def certbot(self, args: Iterable[str], force_renew: bool = True) -> Tuple[str, str]:
def certbot(self, args: Iterable[str], force_renew: bool = True) -> tuple[str, str]:
"""
Execute certbot with given args, not renewing certificates by default.
:param args: args to pass to certbot

View file

@ -8,8 +8,6 @@ import shutil
import subprocess
import time
from typing import Generator
from typing import Tuple
from typing import Type
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1
@ -570,7 +568,7 @@ def test_reuse_key_allow_subset_of_names(context: IntegrationTestsContext) -> No
def test_new_key(context: IntegrationTestsContext) -> None:
"""Tests --new-key and its interactions with --reuse-key"""
def private_key(generation: int) -> Tuple[str, str]:
def private_key(generation: int) -> tuple[str, str]:
pk_path = join(context.config_dir, f'archive/{certname}/privkey{generation}.pem')
with open(pk_path, 'r') as file:
return file.read(), pk_path
@ -670,7 +668,7 @@ def test_default_rsa_size(context: IntegrationTestsContext) -> None:
('secp521r1', SECP521R1)]
)
def test_ecdsa_curves(context: IntegrationTestsContext, curve: str,
curve_cls: Type[EllipticCurve]) -> None:
curve_cls: type[EllipticCurve]) -> None:
"""Test issuance for each supported ECDSA curve"""
domain = context.get_domain('curve')
context.certbot([
@ -806,7 +804,7 @@ def test_revoke_and_unregister(context: IntegrationTestsContext) -> None:
('secp521r1', SECP521R1)]
)
def test_revoke_ecdsa_cert_key(
context: IntegrationTestsContext, curve: str, curve_cls: Type[EllipticCurve]) -> None:
context: IntegrationTestsContext, curve: str, curve_cls: type[EllipticCurve]) -> None:
"""Test revoking a certificate """
cert: str = context.get_domain('curve')
context.certbot([
@ -831,7 +829,7 @@ def test_revoke_ecdsa_cert_key(
('secp521r1', SECP521R1)]
)
def test_revoke_ecdsa_cert_key_delete(
context: IntegrationTestsContext, curve: str, curve_cls: Type[EllipticCurve]) -> None:
context: IntegrationTestsContext, curve: str, curve_cls: type[EllipticCurve]) -> None:
"""Test revoke and deletion for each supported curve type"""
cert: str = context.get_domain('curve')
context.certbot([

View file

@ -2,7 +2,6 @@
import os
import subprocess
from typing import Iterable
from typing import Tuple
import pytest
@ -36,7 +35,7 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext):
self._stop_nginx()
super().cleanup()
def certbot_test_nginx(self, args: Iterable[str]) -> Tuple[str, str]:
def certbot_test_nginx(self, args: Iterable[str]) -> tuple[str, str]:
"""
Main command to execute certbot using the nginx plugin.
:param list args: list of arguments to pass to nginx

View file

@ -2,7 +2,6 @@
import os
import ssl
from typing import Generator
from typing import List
import pytest
@ -33,7 +32,7 @@ def test_context(request: pytest.FixtureRequest) -> Generator[IntegrationTestsCo
'--preferred-challenges', 'http'
], {'default_server': False}),
], indirect=['context'])
def test_certificate_deployment(certname_pattern: str, params: List[str],
def test_certificate_deployment(certname_pattern: str, params: list[str],
context: IntegrationTestsContext) -> None:
"""
Test various scenarios to deploy a certificate to nginx using certbot.

View file

@ -4,7 +4,6 @@ import importlib.resources
import tempfile
from typing import Generator
from typing import Iterable
from typing import Tuple
import pytest
@ -24,7 +23,7 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext):
else: # Primary node
self._dns_xdist = request.config.dns_xdist # type: ignore[attr-defined]
def certbot_test_rfc2136(self, args: Iterable[str]) -> Tuple[str, str]:
def certbot_test_rfc2136(self, args: Iterable[str]) -> tuple[str, str]:
"""
Main command to execute certbot using the RFC2136 DNS authenticator.
:param list args: list of arguments to pass to Certbot

View file

@ -13,11 +13,8 @@ import time
from types import TracebackType
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
from typing import Type
# pylint: disable=wildcard-import,unused-wildcard-import
from certbot_integration_tests.utils import misc
@ -44,7 +41,7 @@ class ACMEServer:
ACMEServer is also a context manager, and so can be used to ensure ACME server is
started/stopped upon context enter/exit.
"""
def __init__(self, nodes: List[str], http_proxy: bool = True,
def __init__(self, nodes: list[str], http_proxy: bool = True,
stdout: bool = False, dns_server: Optional[str] = None,
http_01_port: Optional[int] = None) -> None:
"""
@ -60,7 +57,7 @@ class ACMEServer:
self._proxy = http_proxy
self._workspace = tempfile.mkdtemp()
self._processes: List[subprocess.Popen[bytes]] = []
self._processes: list[subprocess.Popen[bytes]] = []
self._stdout = sys.stdout if stdout else open(os.devnull, 'w') # pylint: disable=consider-using-with
self._dns_server = dns_server
self._http_01_port = DEFAULT_HTTP_01_PORT
@ -101,17 +98,17 @@ class ACMEServer:
self._stdout.close()
print('=> Test infrastructure stopped and cleaned up.')
def __enter__(self) -> Dict[str, Any]:
def __enter__(self) -> dict[str, Any]:
self.start()
return self.acme_xdist
def __exit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException],
def __exit__(self, exc_type: Optional[type[BaseException]], exc: Optional[BaseException],
traceback: Optional[TracebackType]) -> None:
self.stop()
def _construct_acme_xdist(self, nodes: List[str]) -> None:
def _construct_acme_xdist(self, nodes: list[str]) -> None:
"""Generate and return the acme_xdist dict"""
acme_xdist: Dict[str, Any] = {}
acme_xdist: dict[str, Any] = {}
# Directory and ACME port are set implicitly in the docker-compose.yml
# files of Pebble.
@ -161,14 +158,14 @@ class ACMEServer:
def _prepare_http_proxy(self) -> None:
"""Configure and launch an HTTP proxy"""
print(f'=> Configuring the HTTP proxy on port {self._http_01_port}...')
http_port_map = cast(Dict[str, int], self.acme_xdist['http_port'])
http_port_map = cast(dict[str, int], self.acme_xdist['http_port'])
mapping = {r'.+\.{0}\.wtf'.format(node): 'http://127.0.0.1:{0}'.format(port)
for node, port in http_port_map.items()}
command = [sys.executable, proxy.__file__, str(self._http_01_port), json.dumps(mapping)]
self._launch_process(command)
print('=> Finished configuring the HTTP proxy.')
def _launch_process(self, command: List[str], cwd: str = os.getcwd(),
def _launch_process(self, command: list[str], cwd: str = os.getcwd(),
env: Optional[Mapping[str, str]] = None,
force_stderr: bool = False) -> subprocess.Popen[bytes]:
"""Launch silently a subprocess OS command"""

View file

@ -4,10 +4,7 @@
import os
import subprocess
import sys
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
import certbot_integration_tests
from certbot_integration_tests.utils.constants import DEFAULT_HTTP_01_PORT
@ -15,9 +12,9 @@ from certbot_integration_tests.utils.constants import HTTPS_PORT
from certbot_integration_tests.utils.constants import PEBBLE_DIRECTORY_URL
def certbot_test(certbot_args: List[str], directory_url: Optional[str], http_01_port: int,
def certbot_test(certbot_args: list[str], directory_url: Optional[str], http_01_port: int,
https_port: int, config_dir: str, workspace: str,
force_renew: bool = True) -> Tuple[str, str]:
force_renew: bool = True) -> tuple[str, str]:
"""
Invoke the certbot executable available in PATH in a test context for the given args.
The test context consists in running certbot in debug mode, with various flags suitable
@ -45,7 +42,7 @@ def certbot_test(certbot_args: List[str], directory_url: Optional[str], http_01_
return proc.stdout, proc.stderr
def _prepare_environ(workspace: str) -> Dict[str, str]:
def _prepare_environ(workspace: str) -> dict[str, str]:
# pylint: disable=missing-function-docstring
new_environ = os.environ.copy()
@ -83,9 +80,9 @@ def _prepare_environ(workspace: str) -> Dict[str, str]:
return new_environ
def _prepare_args_env(certbot_args: List[str], directory_url: Optional[str], http_01_port: int,
def _prepare_args_env(certbot_args: list[str], directory_url: Optional[str], http_01_port: int,
https_port: int, config_dir: str, workspace: str,
force_renew: bool) -> Tuple[List[str], Dict[str, str]]:
force_renew: bool) -> tuple[list[str], dict[str, str]]:
new_environ = _prepare_environ(workspace)
additional_args = ['--no-random-sleep-on-renew']

View file

@ -11,10 +11,7 @@ import tempfile
import time
from types import TracebackType
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from certbot_integration_tests.utils import constants
@ -36,7 +33,7 @@ class DNSServer:
future to support parallelization (https://github.com/certbot/certbot/issues/8455).
"""
def __init__(self, unused_nodes: List[str], show_output: bool = False) -> None:
def __init__(self, unused_nodes: list[str], show_output: bool = False) -> None:
"""
Create an DNSServer instance.
:param list nodes: list of node names that will be setup by pytest xdist
@ -155,10 +152,10 @@ class DNSServer:
"Gave up waiting for DNS server {} to respond".format(BIND_BIND_ADDRESS)
)
def __start__(self) -> Dict[str, Any]:
def __start__(self) -> dict[str, Any]:
self.start()
return self.dns_xdist
def __exit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException],
def __exit__(self, exc_type: Optional[type[BaseException]], exc: Optional[BaseException],
traceback: Optional[TracebackType]) -> None:
self.stop()

View file

@ -20,9 +20,7 @@ import threading
import time
from typing import Generator
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from cryptography import x509
@ -103,7 +101,7 @@ def create_http_server(port: int) -> Generator[str, None, None]:
server.server_close()
def list_renewal_hooks_dirs(config_dir: str) -> List[str]:
def list_renewal_hooks_dirs(config_dir: str) -> list[str]:
"""
Find and return paths of all hook directories for the given certbot config directory
:param str config_dir: path to the certbot config directory
@ -159,7 +157,7 @@ set -e
@contextlib.contextmanager
def manual_http_hooks(http_server_root: str) -> Generator[Tuple[str, str], None, None]:
def manual_http_hooks(http_server_root: str) -> Generator[tuple[str, str], None, None]:
"""
Generate suitable http-01 hooks command for test purpose in the given HTTP
server webroot directory. These hooks command use temporary python scripts
@ -303,7 +301,7 @@ def echo(keyword: str, path: Optional[str] = None) -> str:
os.path.basename(sys.executable), keyword, ' >> "{0}"'.format(path) if path else '')
def get_acme_issuers() -> List[Certificate]:
def get_acme_issuers() -> list[Certificate]:
"""Gets the list of one or more issuer certificates from the ACME server used by the
context.
:param context: the testing context.

View file

@ -8,7 +8,7 @@ import platform
import stat
import zipfile
from contextlib import ExitStack
from typing import Optional, Tuple
from typing import Optional
import requests
@ -18,7 +18,7 @@ from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT
PEBBLE_VERSION = 'v2.8.0'
def fetch(workspace: str, http_01_port: int = DEFAULT_HTTP_01_PORT) -> Tuple[str, str, str]:
def fetch(workspace: str, http_01_port: int = DEFAULT_HTTP_01_PORT) -> tuple[str, str, str]:
# pylint: disable=missing-function-docstring
file_manager = ExitStack()
atexit.register(file_manager.close)

View file

@ -6,14 +6,13 @@ import json
import re
import sys
from typing import Mapping
from typing import Type
import requests
from certbot_integration_tests.utils.misc import GracefulTCPServer
def _create_proxy(mapping: Mapping[str, str]) -> Type[BaseHTTPServer.BaseHTTPRequestHandler]:
def _create_proxy(mapping: Mapping[str, str]) -> type[BaseHTTPServer.BaseHTTPRequestHandler]:
# pylint: disable=missing-function-docstring
class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# pylint: disable=missing-class-docstring

View file

@ -3,8 +3,6 @@ import argparse
import os
import shutil
import subprocess
from typing import Set
from typing import Tuple
from unittest import mock
from certbot import configuration
@ -80,7 +78,7 @@ def _get_server_root(config: str) -> str:
return os.path.join(config, subdirs[0].rstrip())
def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
def _get_names(config: str) -> tuple[set[str], set[str]]:
"""Returns all and testable domain names in config"""
all_names = set()
non_ip_names = set()

View file

@ -6,12 +6,8 @@ import os
import shutil
import tempfile
from typing import Iterable
from typing import List
from typing import Optional
from typing import overload
from typing import Set
from typing import Tuple
from typing import Type
from typing import Union
from acme import challenges
@ -50,8 +46,8 @@ class Proxy(interfaces.ConfiguratorProxy):
self.http_port = 80
self.https_port = 443
self._configurator: common.Configurator
self._all_names: Optional[Set[str]] = None
self._test_names: Optional[Set[str]] = None
self._all_names: Optional[set[str]] = None
self._test_names: Optional[set[str]] = None
def has_more_configs(self) -> bool:
"""Returns true if there are more configs to test"""
@ -70,14 +66,14 @@ class Proxy(interfaces.ConfiguratorProxy):
@overload
def copy_certs_and_keys(self, cert_path: str, key_path: str,
chain_path: str) -> Tuple[str, str, str]: ...
chain_path: str) -> tuple[str, str, str]: ...
@overload
def copy_certs_and_keys(self, cert_path: str, key_path: str,
chain_path: Optional[str]) -> Tuple[str, str, Optional[str]]: ...
chain_path: Optional[str]) -> tuple[str, str, Optional[str]]: ...
def copy_certs_and_keys(self, cert_path: str, key_path: str,
chain_path: Optional[str] = None) -> Tuple[str, str, Optional[str]]:
chain_path: Optional[str] = None) -> tuple[str, str, Optional[str]]:
"""Copies certs and keys into the temporary directory"""
cert_and_key_dir = os.path.join(self._temp_dir, "certs_and_keys")
if not os.path.isdir(cert_and_key_dir):
@ -94,13 +90,13 @@ class Proxy(interfaces.ConfiguratorProxy):
return cert, key, chain
def get_all_names_answer(self) -> Set[str]:
def get_all_names_answer(self) -> set[str]:
"""Returns the set of domain names that the plugin should find"""
if self._all_names:
return self._all_names
raise errors.Error("No configuration file loaded")
def get_testable_domain_names(self) -> Set[str]:
def get_testable_domain_names(self) -> set[str]:
"""Returns the set of domain names that can be tested against"""
if self._test_names:
return self._test_names
@ -115,20 +111,20 @@ class Proxy(interfaces.ConfiguratorProxy):
self._configurator.deploy_cert(
domain, cert_path, key_path, chain_path, fullchain_path)
def cleanup(self, achalls: List[AnnotatedChallenge]) -> None:
def cleanup(self, achalls: list[AnnotatedChallenge]) -> None:
self._configurator.cleanup(achalls)
def config_test(self) -> None:
self._configurator.config_test()
def enhance(self, domain: str, enhancement: str,
options: Optional[Union[List[str], str]] = None) -> None:
options: Optional[Union[list[str], str]] = None) -> None:
self._configurator.enhance(domain, enhancement, options)
def get_all_names(self) -> Iterable[str]:
return self._configurator.get_all_names()
def get_chall_pref(self, domain: str) -> Iterable[Type[Challenge]]:
def get_chall_pref(self, domain: str) -> Iterable[type[Challenge]]:
return self._configurator.get_chall_pref(domain)
@classmethod
@ -138,7 +134,7 @@ class Proxy(interfaces.ConfiguratorProxy):
def more_info(self) -> str:
return self._configurator.more_info()
def perform(self, achalls: List[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]:
def perform(self, achalls: list[AnnotatedChallenge]) -> list[challenges.ChallengeResponse]:
return self._configurator.perform(achalls)
def prepare(self) -> None:
@ -156,5 +152,5 @@ class Proxy(interfaces.ConfiguratorProxy):
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
self._configurator.save(title, temporary)
def supported_enhancements(self) -> List[str]:
def supported_enhancements(self) -> list[str]:
return self._configurator.supported_enhancements()

View file

@ -2,8 +2,6 @@
import os
import shutil
import subprocess
from typing import Set
from typing import Tuple
from certbot import configuration
from certbot_compatibility_test import errors
@ -64,9 +62,9 @@ def _get_server_root(config: str) -> str:
return os.path.join(config, subdirs[0].rstrip())
def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
def _get_names(config: str) -> tuple[set[str], set[str]]:
"""Returns all and testable domain names in config"""
all_names: Set[str] = set()
all_names: set[str] = set()
for root, _dirs, files in os.walk(config):
for this_file in files:
update_names = _get_server_names(root, this_file)
@ -75,7 +73,7 @@ def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
return all_names, non_ip_names
def _get_server_names(root: str, filename: str) -> Set[str]:
def _get_server_names(root: str, filename: str) -> set[str]:
"""Returns all names in a config file path"""
all_names = set()
with open(os.path.join(root, filename)) as f:

View file

@ -3,7 +3,6 @@ from abc import ABCMeta
from abc import abstractmethod
import argparse
from typing import cast
from typing import Set
from certbot import interfaces
from certbot.configuration import NamespaceConfig
@ -45,7 +44,7 @@ class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
"""Loads the next config and returns its name"""
@abstractmethod
def get_testable_domain_names(self) -> Set[str]:
def get_testable_domain_names(self) -> set[str]:
"""Returns the domain names that can be used in testing"""
@ -57,7 +56,7 @@ class InstallerProxy(PluginProxy, interfaces.Installer, metaclass=ABCMeta):
"""Wraps a Certbot installer"""
@abstractmethod
def get_all_names_answer(self) -> Set[str]:
def get_all_names_answer(self) -> set[str]:
"""Returns all names that should be found by the installer"""

View file

@ -10,13 +10,9 @@ import sys
import tempfile
import time
from typing import Any
from typing import Dict
from typing import Generator
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from cryptography.hazmat.primitives import serialization
from urllib3.util import connection
@ -42,7 +38,7 @@ tests that the plugin supports are performed.
"""
PLUGINS: Dict[str, Type[common.Proxy]] = {"apache": a_common.Proxy, "nginx": n_common.Proxy}
PLUGINS: dict[str, type[common.Proxy]] = {"apache": a_common.Proxy, "nginx": n_common.Proxy}
logger = logging.getLogger(__name__)
@ -103,9 +99,9 @@ def test_authenticator(plugin: common.Proxy, config: str, temp_dir: str) -> bool
return success
def _create_achalls(plugin: common.Proxy) -> List[achallenges.AnnotatedChallenge]:
def _create_achalls(plugin: common.Proxy) -> list[achallenges.AnnotatedChallenge]:
"""Returns a list of annotated challenges to test on plugin"""
achalls: List[achallenges.AnnotatedChallenge] = []
achalls: list[achallenges.AnnotatedChallenge] = []
names = plugin.get_testable_domain_names()
for domain in names:
prefs = plugin.get_chall_pref(domain)
@ -145,7 +141,7 @@ def test_installer(args: argparse.Namespace, plugin: common.Proxy, config: str,
return names_match and success and good_rollback
def test_deploy_cert(plugin: common.Proxy, temp_dir: str, domains: List[str]) -> bool:
def test_deploy_cert(plugin: common.Proxy, temp_dir: str, domains: list[str]) -> bool:
"""Tests deploy_cert returning True if the tests are successful"""
cert = crypto_util.make_self_signed_cert(util.KEY, domains)
cert_path = os.path.join(temp_dir, "cert.pem")
@ -187,7 +183,7 @@ def test_enhancements(plugin: common.Proxy, domains: Iterable[str]) -> bool:
"enhancements")
return False
domains_and_info: List[Tuple[str, List[bool]]] = [(domain, []) for domain in domains]
domains_and_info: list[tuple[str, list[bool]]] = [(domain, []) for domain in domains]
for domain, info in domains_and_info:
try:
@ -390,7 +386,7 @@ def _fake_dns_resolution(resolved_ip: str) -> Generator[None, None, None]:
"""Monkey patch urllib3 to make any hostname be resolved to the provided IP"""
_original_create_connection = connection.create_connection
def _patched_create_connection(address: Tuple[str, int],
def _patched_create_connection(address: tuple[str, int],
*args: Any, **kwargs: Any) -> socket.socket:
_, port = address
return _original_create_connection((resolved_ip, port), *args, **kwargs)

View file

@ -5,7 +5,6 @@ import socket
from typing import cast
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Union
from cryptography import x509
@ -146,7 +145,7 @@ def _probe_sni(name: bytes, host: bytes, port: int = 443) -> x509.Certificate:
# Enables multi-path probing (selection
# of source interface). See `socket.creation_connection` for more
# info. Available only in Python 2.7+.
source_address: Tuple[str, int] = ('', 0)
source_address: tuple[str, int] = ('', 0)
socket_kwargs = {'source_address': source_address}
try:
@ -157,7 +156,7 @@ def _probe_sni(name: bytes, host: bytes, port: int = 443) -> x509.Certificate:
source_address[1]
) if any(source_address) else ""
)
socket_tuple: Tuple[bytes, int] = (host, port)
socket_tuple: tuple[bytes, int] = (host, port)
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore[arg-type]
except OSError as error:
raise acme_errors.Error(error)

View file

@ -2,8 +2,6 @@
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import cast
@ -188,7 +186,7 @@ class _CloudflareClient:
"""
zone_name_guesses = dns_common.base_domain_name_guesses(domain)
zones: List[Dict[str, Any]] = []
zones: list[dict[str, Any]] = []
code = msg = None
for zone_name in zone_name_guesses:

View file

@ -2,7 +2,6 @@
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import cast
@ -253,7 +252,7 @@ class _GoogleClient:
except googleapiclient_errors.Error as e:
logger.warning('Encountered error deleting TXT record: %s', e)
def get_existing_txt_rrset(self, zone_id: str, record_name: str) -> Optional[Dict[str, Any]]:
def get_existing_txt_rrset(self, zone_id: str, record_name: str) -> Optional[dict[str, Any]]:
"""
Get existing TXT records from the RRset for the record name.

View file

@ -2,7 +2,6 @@
from __future__ import annotations
import sys
from typing import Optional
from typing import Tuple
import unittest
from unittest import mock
@ -103,7 +102,7 @@ class GoogleClientTest(unittest.TestCase):
def _setUp_client_with_mock(self,
zone_request_side_effect: list[dict[str, list[dict[str, str]]]],
rrs_list_side_effect: Optional[Error] = None
) -> Tuple['certbot_dns_google._internal.dns_google._GoogleClient', mock.MagicMock]:
) -> tuple['certbot_dns_google._internal.dns_google._GoogleClient', mock.MagicMock]:
from certbot_dns_google._internal.dns_google import _GoogleClient
pwd = os.path.dirname(__file__)

View file

@ -4,11 +4,7 @@ import logging
import time
from typing import Any
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import Iterable
from typing import List
from typing import Type
from typing import cast
import boto3
@ -45,7 +41,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
super().__init__(*args, **kwargs)
self.r53 = boto3.client("route53")
self._attempt_cleanup = False
self._resource_records: DefaultDict[str, List[Dict[str, str]]] = \
self._resource_records: collections.defaultdict[str, list[dict[str, str]]] = \
collections.defaultdict(list)
def more_info(self) -> str:
@ -56,7 +52,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
# This authenticator currently adds no extra arguments.
pass
def auth_hint(self, failed_achalls: List[achallenges.AnnotatedChallenge]) -> str:
def auth_hint(self, failed_achalls: list[achallenges.AnnotatedChallenge]) -> str:
return (
'The Certificate Authority failed to verify the DNS TXT records created by '
'--dns-route53. Ensure the above domains have their DNS hosted by AWS Route53.'
@ -65,10 +61,10 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
def prepare(self) -> None:
pass
def get_chall_pref(self, unused_domain: str) -> Iterable[Type[challenges.Challenge]]:
def get_chall_pref(self, unused_domain: str) -> Iterable[type[challenges.Challenge]]:
return [challenges.DNS01]
def perform(self, achalls: List[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]:
def perform(self, achalls: list[AnnotatedChallenge]) -> list[challenges.ChallengeResponse]:
self._attempt_cleanup = True
try:
@ -86,7 +82,7 @@ class Authenticator(common.Plugin, interfaces.Authenticator):
raise errors.PluginError("\n".join([str(e), INSTRUCTIONS]))
return [achall.response(achall.account_key) for achall in achalls]
def cleanup(self, achalls: List[achallenges.AnnotatedChallenge]) -> None:
def cleanup(self, achalls: list[achallenges.AnnotatedChallenge]) -> None:
if self._attempt_cleanup:
for achall in achalls:
domain = achall.domain

View file

@ -11,15 +11,10 @@ import tempfile
import time
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import Union
from typing import cast
@ -119,8 +114,8 @@ class NginxConfigurator(common.Configurator):
# List of vhosts configured per wildcard domain on this run.
# used by deploy_cert() and enhance()
self._wildcard_vhosts: Dict[str, List[obj.VirtualHost]] = {}
self._wildcard_redirect_vhosts: Dict[str, List[obj.VirtualHost]] = {}
self._wildcard_vhosts: dict[str, list[obj.VirtualHost]] = {}
self._wildcard_redirect_vhosts: dict[str, list[obj.VirtualHost]] = {}
# Add number of outstanding challenges
self._chall_out = 0
@ -286,7 +281,7 @@ class NginxConfigurator(common.Configurator):
self.save_notes += "\tssl_certificate_key %s\n" % key_path
def _choose_vhosts_wildcard(self, domain: str, prefer_ssl: bool,
no_ssl_filter_port: Optional[str] = None) -> List[obj.VirtualHost]:
no_ssl_filter_port: Optional[str] = None) -> list[obj.VirtualHost]:
"""Prompts user to choose vhosts to install a wildcard certificate for"""
if prefer_ssl:
vhosts_cache = self._wildcard_vhosts
@ -337,13 +332,13 @@ class NginxConfigurator(common.Configurator):
#######################
# Vhost parsing methods
#######################
def _choose_vhost_single(self, target_name: str) -> List[obj.VirtualHost]:
def _choose_vhost_single(self, target_name: str) -> list[obj.VirtualHost]:
matches = self._get_ranked_matches(target_name)
vhosts = [x for x in [self._select_best_name_match(matches)] if x is not None]
return vhosts
def choose_vhosts(self, target_name: str,
create_if_no_match: bool = False) -> List[obj.VirtualHost]:
create_if_no_match: bool = False) -> list[obj.VirtualHost]:
"""Chooses a virtual host based on the given domain name.
.. note:: This makes the vhost SSL-enabled if it isn't already. Follows
@ -387,7 +382,7 @@ class NginxConfigurator(common.Configurator):
return vhosts
def ipv6_info(self, host: str, port: str) -> Tuple[bool, bool]:
def ipv6_info(self, host: str, port: str) -> tuple[bool, bool]:
"""Returns tuple of booleans (ipv6_active, ipv6only_present)
ipv6_active is true if any server block listens ipv6 address in any port
@ -462,7 +457,7 @@ class NginxConfigurator(common.Configurator):
f"block for {domain}. Set the `server_name` directive "
"to use the Nginx installer.")
def _get_ranked_matches(self, target_name: str) -> List[Dict[str, Any]]:
def _get_ranked_matches(self, target_name: str) -> list[dict[str, Any]]:
"""Returns a ranked list of vhosts that match target_name.
The ranking gives preference to SSL vhosts.
@ -497,7 +492,7 @@ class NginxConfigurator(common.Configurator):
return cast(obj.VirtualHost, matches[0]['vhost'])
def _rank_matches_by_name(self, vhost_list: Iterable[obj.VirtualHost],
target_name: str) -> List[Dict[str, Any]]:
target_name: str) -> list[dict[str, Any]]:
"""Returns a ranked list of vhosts from vhost_list that match target_name.
This method should always be followed by a call to _select_best_name_match.
@ -535,7 +530,7 @@ class NginxConfigurator(common.Configurator):
return sorted(matches, key=lambda x: x['rank'])
def _rank_matches_by_name_and_ssl(self, vhost_list: Iterable[obj.VirtualHost],
target_name: str) -> List[Dict[str, Any]]:
target_name: str) -> list[dict[str, Any]]:
"""Returns a ranked list of vhosts from vhost_list that match target_name.
The ranking gives preference to SSLishness before name match level.
@ -552,7 +547,7 @@ class NginxConfigurator(common.Configurator):
match['rank'] += NO_SSL_MODIFIER
return sorted(matches, key=lambda x: x['rank'])
def choose_redirect_vhosts(self, target_name: str, port: str) -> List[obj.VirtualHost]:
def choose_redirect_vhosts(self, target_name: str, port: str) -> list[obj.VirtualHost]:
"""Chooses a single virtual host for redirect enhancement.
Chooses the vhost most closely matching target_name that is
@ -580,8 +575,8 @@ class NginxConfigurator(common.Configurator):
vhosts = [x for x in [self._select_best_name_match(matches)]if x is not None]
return vhosts
def choose_auth_vhosts(self, target_name: str) -> Tuple[List[obj.VirtualHost],
List[obj.VirtualHost]]:
def choose_auth_vhosts(self, target_name: str) -> tuple[list[obj.VirtualHost],
list[obj.VirtualHost]]:
"""Returns a list of HTTP and HTTPS vhosts with a server_name matching target_name.
If no HTTP vhost exists, one will be cloned from the default vhost. If that fails, no HTTP
@ -649,7 +644,7 @@ class NginxConfigurator(common.Configurator):
def _vhost_listening_on_port_no_ssl(self, vhost: obj.VirtualHost, port: str) -> bool:
return self._vhost_listening(vhost, port, False)
def _get_redirect_ranked_matches(self, target_name: str, port: str) -> List[Dict[str, Any]]:
def _get_redirect_ranked_matches(self, target_name: str, port: str) -> list[dict[str, Any]]:
"""Gets a ranked list of plaintextish port-listening vhosts matching target_name
Filter all hosts for those listening on port without using ssl.
@ -671,7 +666,7 @@ class NginxConfigurator(common.Configurator):
return self._rank_matches_by_name(matching_vhosts, target_name)
def get_all_names(self) -> Set[str]:
def get_all_names(self) -> set[str]:
"""Returns all names found in the Nginx Configuration.
:returns: All ServerNames, ServerAliases, and reverse DNS entries for
@ -679,7 +674,7 @@ class NginxConfigurator(common.Configurator):
:rtype: set
"""
all_names: Set[str] = set()
all_names: set[str] = set()
for vhost in self.parser.get_vhosts():
try:
@ -709,7 +704,7 @@ class NginxConfigurator(common.Configurator):
return util.get_filtered_names(all_names)
def _get_snakeoil_paths(self) -> Tuple[str, str]:
def _get_snakeoil_paths(self) -> tuple[str, str]:
"""Generate invalid certs that let us create ssl directives for Nginx"""
# TODO: generate only once
tmp_dir = os.path.join(self.config.work_dir, "snakeoil")
@ -745,7 +740,7 @@ class NginxConfigurator(common.Configurator):
# no addresses should have ssl turned on here
assert not vhost.ssl
addrs_to_insert: List[obj.Addr] = [
addrs_to_insert: list[obj.Addr] = [
obj.Addr.fromstring(f'{addr.get_addr()}:{https_port} ssl')
for addr in vhost.addrs
if addr.get_port() == str(http_port)
@ -764,8 +759,8 @@ class NginxConfigurator(common.Configurator):
if vhost.ipv4_enabled():
addrs_to_insert += [obj.Addr.fromstring(f'{https_port} ssl')]
addr_blocks: List[List[str]] = []
ipv6only_set_here: Set[Tuple[str, str]] = set()
addr_blocks: list[list[str]] = []
ipv6only_set_here: set[tuple[str, str]] = set()
for addr in addrs_to_insert:
host = addr.get_addr()
port = addr.get_port()
@ -809,12 +804,12 @@ class NginxConfigurator(common.Configurator):
##################################
# enhancement methods (Installer)
##################################
def supported_enhancements(self) -> List[str]:
def supported_enhancements(self) -> list[str]:
"""Returns currently supported enhancements."""
return ['redirect', 'ensure-http-header', 'staple-ocsp']
def enhance(self, domain: str, enhancement: str,
options: Optional[Union[str, List[str]]] = None) -> None:
options: Optional[Union[str, list[str]]] = None) -> None:
"""Enhance configuration.
:param str domain: domain to enhance
@ -835,7 +830,7 @@ class NginxConfigurator(common.Configurator):
test_redirect_block = _test_block_from_block(_redirect_block_for_domain(domain))
return vhost.contains_list(test_redirect_block)
def _set_http_header(self, domain: str, header_substring: Union[str, List[str], None]) -> None:
def _set_http_header(self, domain: str, header_substring: Union[str, list[str], None]) -> None:
"""Enables header identified by header_substring on domain.
If the vhost is listening plaintextishly, separates out the relevant
@ -884,8 +879,8 @@ class NginxConfigurator(common.Configurator):
self.parser.add_server_directives(
vhost, redirect_block, insert_at_top=True)
def _split_block(self, vhost: obj.VirtualHost, only_directives: Optional[List[str]] = None
) -> Tuple[obj.VirtualHost, obj.VirtualHost]:
def _split_block(self, vhost: obj.VirtualHost, only_directives: Optional[list[str]] = None
) -> tuple[obj.VirtualHost, obj.VirtualHost]:
"""Splits this "virtual host" (i.e. this nginx server block) into
separate HTTP and HTTPS blocks.
@ -919,7 +914,7 @@ class NginxConfigurator(common.Configurator):
return http_vhost, vhost
def _enable_redirect(self, domain: str,
unused_options: Optional[Union[str, List[str]]]) -> None:
unused_options: Optional[Union[str, list[str]]]) -> None:
"""Redirect all equivalent HTTP traffic to ssl_vhost.
If the vhost is listening plaintextishly, separate out the
@ -976,7 +971,7 @@ class NginxConfigurator(common.Configurator):
self.DEFAULT_LISTEN_PORT, vhost.filep)
def _enable_ocsp_stapling(self, domain: str,
chain_path: Optional[Union[str, List[str]]]) -> None:
chain_path: Optional[Union[str, list[str]]]) -> None:
"""Include OCSP response in TLS handshake
:param str domain: domain to enable OCSP response for
@ -1075,7 +1070,7 @@ class NginxConfigurator(common.Configurator):
"Unable to run %s -V" % self.conf('ctl'))
return text
def get_version(self) -> Tuple[int, ...]:
def get_version(self) -> tuple[int, ...]:
"""Return version of Nginx Server.
Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7))
@ -1133,7 +1128,7 @@ class NginxConfigurator(common.Configurator):
"""
text = self._nginx_version()
matches: List[str] = re.findall(r"running with OpenSSL ([^ ]+) ", text)
matches: list[str] = re.findall(r"running with OpenSSL ([^ ]+) ", text)
if not matches:
matches = re.findall(r"built with OpenSSL ([^ ]+) ", text)
if not matches:
@ -1225,13 +1220,13 @@ class NginxConfigurator(common.Configurator):
###########################################################################
# Challenges Section for Authenticator
###########################################################################
def get_chall_pref(self, unused_domain: str) -> List[Type[challenges.Challenge]]:
def get_chall_pref(self, unused_domain: str) -> list[type[challenges.Challenge]]:
"""Return list of challenge preferences."""
return [challenges.HTTP01]
# Entry point in main.py for performing challenges
def perform(self, achalls: List[achallenges.AnnotatedChallenge]
) -> List[challenges.ChallengeResponse]:
def perform(self, achalls: list[achallenges.AnnotatedChallenge]
) -> list[challenges.ChallengeResponse]:
"""Perform the configuration related challenge.
This function currently assumes all challenges will be fulfilled.
@ -1240,7 +1235,7 @@ class NginxConfigurator(common.Configurator):
"""
self._chall_out += len(achalls)
responses: List[Optional[challenges.ChallengeResponse]] = [None] * len(achalls)
responses: list[Optional[challenges.ChallengeResponse]] = [None] * len(achalls)
http_doer = http_01.NginxHttp01(self)
for i, achall in enumerate(achalls):
@ -1266,7 +1261,7 @@ class NginxConfigurator(common.Configurator):
return [response for response in responses if response]
# called after challenges are performed
def cleanup(self, achalls: List[achallenges.AnnotatedChallenge]) -> None:
def cleanup(self, achalls: list[achallenges.AnnotatedChallenge]) -> None:
"""Revert all challenges."""
self._chall_out -= len(achalls)
@ -1276,13 +1271,13 @@ class NginxConfigurator(common.Configurator):
self.restart()
def _test_block_from_block(block: List[Any]) -> List[Any]:
def _test_block_from_block(block: list[Any]) -> list[Any]:
test_block = nginxparser.UnspacedList(block)
parser.comment_directive(test_block, 0)
return test_block[:-1]
def _redirect_block_for_domain(domain: str) -> List[Any]:
def _redirect_block_for_domain(domain: str) -> list[Any]:
updated_domain = domain
match_symbol = '='
if util.is_wildcard_domain(domain):

View file

@ -1,7 +1,6 @@
"""nginx plugin constants."""
import platform
from typing import Any
from typing import Dict
FREEBSD_DARWIN_SERVER_ROOT = "/usr/local/etc/nginx"
LINUX_SERVER_ROOT = "/etc/nginx"
@ -14,7 +13,7 @@ elif platform.system() in ('NetBSD',):
else:
server_root_tmp = LINUX_SERVER_ROOT
CLI_DEFAULTS: Dict[str, Any] = {
CLI_DEFAULTS: dict[str, Any] = {
"server_root": server_root_tmp,
"ctl": "nginx",
"sleep_seconds": 1

View file

@ -1,7 +1,6 @@
"""Contains UI methods for Nginx operations."""
import logging
from typing import Iterable
from typing import List
from typing import Optional
from certbot.display import util as display_util
@ -10,7 +9,7 @@ from certbot_nginx._internal.obj import VirtualHost
logger = logging.getLogger(__name__)
def select_vhost_multiple(vhosts: Optional[Iterable[VirtualHost]]) -> List[VirtualHost]:
def select_vhost_multiple(vhosts: Optional[Iterable[VirtualHost]]) -> list[VirtualHost]:
"""Select multiple Vhosts to install the certificate for
:param vhosts: Available Nginx VirtualHosts
:type vhosts: :class:`list` of type `~obj.Vhost`
@ -32,7 +31,7 @@ def select_vhost_multiple(vhosts: Optional[Iterable[VirtualHost]]) -> List[Virtu
return []
def _reversemap_vhosts(names: Iterable[str], vhosts: Iterable[VirtualHost]) -> List[VirtualHost]:
def _reversemap_vhosts(names: Iterable[str], vhosts: Iterable[VirtualHost]) -> list[VirtualHost]:
"""Helper function for select_vhost_multiple for mapping string
representations back to actual vhost objects"""
return_vhosts = []

View file

@ -2,7 +2,6 @@
import logging
from typing import Any
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
@ -47,7 +46,7 @@ class NginxHttp01(common.ChallengePerformer):
self.challenge_conf = os.path.join(
configurator.config.config_dir, "le_http_01_cert_challenge.conf")
def perform(self) -> List[KeyAuthorizationChallengeResponse]:
def perform(self) -> list[KeyAuthorizationChallengeResponse]:
"""Perform a challenge on Nginx.
:returns: list of :class:`acme.challenges.KeyAuthorizationChallengeResponse`
@ -142,12 +141,12 @@ class NginxHttp01(common.ChallengePerformer):
with open(self.challenge_conf, "w", encoding="utf-8") as new_conf:
nginxparser.dump(config, new_conf)
def _default_listen_addresses(self) -> List[Addr]:
def _default_listen_addresses(self) -> list[Addr]:
"""Finds addresses for a challenge block to listen on.
:returns: list of :class:`certbot_nginx._internal.obj.Addr` to apply
:rtype: list
"""
addresses: List[Addr] = []
addresses: list[Addr] = []
default_addr = "%s" % self.configurator.config.http01_port
ipv6_addr = "[::]:{0}".format(
self.configurator.config.http01_port)
@ -175,7 +174,7 @@ class NginxHttp01(common.ChallengePerformer):
def _get_validation_path(self, achall: KeyAuthorizationAnnotatedChallenge) -> str:
return os.sep + os.path.join(challenges.HTTP01.URI_ROOT_PATH, achall.chall.encode("token"))
def _make_server_block(self, achall: KeyAuthorizationAnnotatedChallenge) -> List[Any]:
def _make_server_block(self, achall: KeyAuthorizationAnnotatedChallenge) -> list[Any]:
"""Creates a server block for a challenge.
:param achall: Annotated HTTP-01 challenge
@ -199,7 +198,7 @@ class NginxHttp01(common.ChallengePerformer):
return [['server'], block]
def _location_directive_for_achall(self, achall: KeyAuthorizationAnnotatedChallenge
) -> List[Any]:
) -> list[Any]:
validation = achall.validation(achall.account_key)
validation_path = self._get_validation_path(achall)
@ -209,7 +208,7 @@ class NginxHttp01(common.ChallengePerformer):
return location_directive
def _make_or_mod_server_block(self, achall: KeyAuthorizationAnnotatedChallenge
) -> Optional[List[Any]]:
) -> Optional[list[Any]]:
"""Modifies server blocks to respond to a challenge. Returns a new HTTP server block
to add to the configuration if an existing one can't be found.
@ -222,7 +221,7 @@ class NginxHttp01(common.ChallengePerformer):
"""
http_vhosts, https_vhosts = self.configurator.choose_auth_vhosts(achall.domain)
new_vhost: Optional[List[Any]] = None
new_vhost: Optional[list[Any]] = None
if not http_vhosts:
# Couldn't find either a matching name+port server block
# or a port+default_server block, so create a dummy block

View file

@ -8,10 +8,8 @@ from typing import Any
from typing import IO
from typing import Iterable
from typing import Iterator
from typing import List
from typing import overload
from typing import SupportsIndex
from typing import Tuple
from typing import Union
from pyparsing import Combine
@ -76,17 +74,17 @@ class RawNginxParser:
"""Returns the parsed tree."""
return self.script.parseString(self.source)
def as_list(self) -> List[Any]:
def as_list(self) -> list[Any]:
"""Returns the parsed tree as a list."""
return self.parse().asList()
class RawNginxDumper:
"""A class that dumps nginx configuration from the provided tree."""
def __init__(self, blocks: List[Any]) -> None:
def __init__(self, blocks: list[Any]) -> None:
self.blocks = blocks
def __iter__(self, blocks: typing.Optional[List[Any]] = None) -> Iterator[str]:
def __iter__(self, blocks: typing.Optional[list[Any]] = None) -> Iterator[str]:
"""Iterates the dumped nginx content."""
blocks = blocks or self.blocks
for b0 in blocks:
@ -120,7 +118,7 @@ def spacey(x: Any) -> bool:
return (isinstance(x, str) and x.isspace()) or x == ''
class UnspacedList(List[Any]):
class UnspacedList(list[Any]):
"""Wrap a list [of lists], making any whitespace entries magically invisible"""
def __init__(self, list_source: Iterable[Any]) -> None:
@ -142,15 +140,15 @@ class UnspacedList(List[Any]):
super().__delitem__(i)
@overload
def _coerce(self, inbound: None) -> Tuple[None, None]: ...
def _coerce(self, inbound: None) -> tuple[None, None]: ...
@overload
def _coerce(self, inbound: str) -> Tuple[str, str]: ...
def _coerce(self, inbound: str) -> tuple[str, str]: ...
@overload
def _coerce(self, inbound: List[Any]) -> Tuple["UnspacedList", List[Any]]: ...
def _coerce(self, inbound: list[Any]) -> tuple["UnspacedList", list[Any]]: ...
def _coerce(self, inbound: Any) -> Tuple[Any, Any]:
def _coerce(self, inbound: Any) -> tuple[Any, Any]:
"""
Coerce some inbound object to be appropriately usable in this object
@ -191,7 +189,7 @@ class UnspacedList(List[Any]):
super().extend(item)
self.dirty = True
def __add__(self, other: List[Any]) -> "UnspacedList":
def __add__(self, other: list[Any]) -> "UnspacedList":
new_list = copy.deepcopy(self)
new_list.extend(other)
new_list.dirty = True

View file

@ -1,10 +1,8 @@
"""Module contains classes used by the Nginx Configurator."""
import re
from typing import Any
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Union
from certbot.plugins import common
@ -178,7 +176,7 @@ class VirtualHost:
"""
def __init__(self, filep: str, addrs: Sequence[Addr], ssl: bool, enabled: bool,
names: Set[str], raw: List[Any], path: List[int]) -> None:
names: set[str], raw: list[Any], path: list[int]) -> None:
"""Initialize a VH."""
self.filep = filep
self.addrs = addrs
@ -226,7 +224,7 @@ class VirtualHost:
found = _find_directive(self.raw, ADD_HEADER_DIRECTIVE, header_name)
return found is not None
def contains_list(self, test: List[Any]) -> bool:
def contains_list(self, test: list[Any]) -> bool:
"""Determine if raw server block contains test list at top level
"""
for i in range(0, len(self.raw) - len(test) + 1):
@ -265,7 +263,7 @@ class VirtualHost:
https="Yes" if self.ssl else "No"))
def _find_directive(directives: Optional[Union[str, List[Any]]], directive_name: str,
def _find_directive(directives: Optional[Union[str, list[Any]]], directive_name: str,
match_content: Optional[Any] = None) -> Optional[Any]:
"""Find a directive of type directive_name in directives. If match_content is given,
Searches for `match_content` in the directive arguments.

View file

@ -8,14 +8,10 @@ import re
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
import pyparsing
@ -39,7 +35,7 @@ class NginxParser:
"""
def __init__(self, root: str) -> None:
self.parsed: Dict[str, UnspacedList] = {}
self.parsed: dict[str, UnspacedList] = {}
self.root = os.path.abspath(root)
self.config_root = self._find_config_root()
self._http_path: str | None = None
@ -108,12 +104,12 @@ class NginxParser:
return os.path.normpath(os.path.join(self.root, path))
return os.path.normpath(path)
def _build_addr_to_ssl(self) -> Dict[Tuple[str, str], bool]:
def _build_addr_to_ssl(self) -> dict[tuple[str, str], bool]:
"""Builds a map from address to whether it listens on ssl in any server block
"""
servers = self._get_raw_servers()
addr_to_ssl: Dict[Tuple[str, str], bool] = {}
addr_to_ssl: dict[tuple[str, str], bool] = {}
for server_list in servers.values():
for server, _ in server_list:
# Parse the server block to save addr info
@ -125,11 +121,11 @@ class NginxParser:
addr_to_ssl[addr_tuple] = addr.ssl or addr_to_ssl[addr_tuple]
return addr_to_ssl
def _get_raw_servers(self) -> Dict[str, Union[List[Any], UnspacedList]]:
def _get_raw_servers(self) -> dict[str, Union[list[Any], UnspacedList]]:
# pylint: disable=cell-var-from-loop
"""Get a map of unparsed all server blocks
"""
servers: Dict[str, Union[List[Any], nginxparser.UnspacedList]] = {}
servers: dict[str, Union[list[Any], nginxparser.UnspacedList]] = {}
for filename, tree in self.parsed.items():
servers[filename] = []
srv = servers[filename] # workaround undefined loop var in lambdas
@ -144,7 +140,7 @@ class NginxParser:
servers[filename][i] = (new_server, path)
return servers
def get_vhosts(self) -> List[obj.VirtualHost]:
def get_vhosts(self) -> list[obj.VirtualHost]:
"""Gets list of all 'virtual hosts' found in Nginx configuration.
Technically this is a misnomer because Nginx does not have virtual
hosts, it has 'server blocks'.
@ -276,7 +272,7 @@ class NginxParser:
except OSError:
logger.error("Could not open file for writing: %s", filename)
def parse_server(self, server: UnspacedList) -> Dict[str, Any]:
def parse_server(self, server: UnspacedList) -> dict[str, Any]:
"""Parses a list of server directives, accounting for global address sslishness.
:param list server: list of directives in a server block
@ -305,7 +301,7 @@ class NginxParser:
return False
def add_server_directives(self, vhost: obj.VirtualHost, directives: List[Any],
def add_server_directives(self, vhost: obj.VirtualHost, directives: list[Any],
insert_at_top: bool = False) -> None:
"""Add directives to the server block identified by vhost.
@ -327,7 +323,7 @@ class NginxParser:
self._modify_server_directives(vhost,
functools.partial(_add_directives, directives, insert_at_top))
def update_or_add_server_directives(self, vhost: obj.VirtualHost, directives: List[Any],
def update_or_add_server_directives(self, vhost: obj.VirtualHost, directives: list[Any],
insert_at_top: bool = False) -> None:
"""Add or replace directives in the server block identified by vhost.
@ -373,7 +369,7 @@ class NginxParser:
vhost.raw = new_server
def _modify_server_directives(self, vhost: obj.VirtualHost,
block_func: Callable[[List[Any]], None]) -> None:
block_func: Callable[[list[Any]], None]) -> None:
filename = vhost.filep
try:
result = self.parsed[filename]
@ -390,7 +386,7 @@ class NginxParser:
def duplicate_vhost(self, vhost_template: obj.VirtualHost,
remove_singleton_listen_params: bool = False,
only_directives: Optional[List[Any]] = None) -> obj.VirtualHost:
only_directives: Optional[list[Any]] = None) -> obj.VirtualHost:
"""Duplicate the vhost in the configuration files.
:param :class:`~certbot_nginx._internal.obj.VirtualHost` vhost_template: The vhost
@ -443,7 +439,7 @@ class NginxParser:
return new_vhost
def _parse_ssl_options(ssl_options: Optional[str]) -> List[UnspacedList]:
def _parse_ssl_options(ssl_options: Optional[str]) -> list[UnspacedList]:
if ssl_options is not None:
try:
with open(ssl_options, "r", encoding="utf-8") as _file:
@ -458,9 +454,9 @@ def _parse_ssl_options(ssl_options: Optional[str]) -> List[UnspacedList]:
return UnspacedList([])
def _do_for_subarray(entry: List[Any], condition: Callable[[List[Any]], bool],
func: Callable[[List[Any], List[int]], None],
path: Optional[List[int]] = None) -> None:
def _do_for_subarray(entry: list[Any], condition: Callable[[list[Any]], bool],
func: Callable[[list[Any], list[int]], None],
path: Optional[list[int]] = None) -> None:
"""Executes a function for a subarray of a nested array if it matches
the given condition.
@ -479,7 +475,7 @@ def _do_for_subarray(entry: List[Any], condition: Callable[[List[Any]], bool],
_do_for_subarray(item, condition, func, path + [index])
def get_best_match(target_name: str, names: Iterable[str]) -> Tuple[Optional[str], Optional[str]]:
def get_best_match(target_name: str, names: Iterable[str]) -> tuple[Optional[str], Optional[str]]:
"""Finds the best match for target_name out of names using the Nginx
name-matching rules (exact > longest wildcard starting with * >
longest wildcard ending with * > regex).
@ -595,7 +591,7 @@ def _is_ssl_on_directive(entry: Any) -> bool:
entry[1] == 'on')
def _add_directives(directives: List[Any], insert_at_top: bool,
def _add_directives(directives: list[Any], insert_at_top: bool,
block: UnspacedList) -> None:
"""Adds directives to a config block."""
for directive in directives:
@ -604,7 +600,7 @@ def _add_directives(directives: List[Any], insert_at_top: bool,
block.append(nginxparser.UnspacedList('\n'))
def _update_or_add_directives(directives: List[Any], insert_at_top: bool,
def _update_or_add_directives(directives: list[Any], insert_at_top: bool,
block: UnspacedList) -> None:
"""Adds or replaces directives in a config block."""
for directive in directives:
@ -783,8 +779,8 @@ def _remove_directives(directive_name: str, match_func: Callable[[Any], bool],
del block[location]
def _apply_global_addr_ssl(addr_to_ssl: Mapping[Tuple[str, str], bool],
parsed_server: Dict[str, Any]) -> None:
def _apply_global_addr_ssl(addr_to_ssl: Mapping[tuple[str, str], bool],
parsed_server: dict[str, Any]) -> None:
"""Apply global sslishness information to the parsed server block
"""
for addr in parsed_server['addrs']:
@ -793,16 +789,16 @@ def _apply_global_addr_ssl(addr_to_ssl: Mapping[Tuple[str, str], bool],
parsed_server['ssl'] = True
def _parse_server_raw(server: UnspacedList) -> Dict[str, Any]:
def _parse_server_raw(server: UnspacedList) -> dict[str, Any]:
"""Parses a list of server directives.
:param list server: list of directives in a server block
:rtype: dict
"""
addrs: Set[obj.Addr] = set()
addrs: set[obj.Addr] = set()
ssl: bool = False
names: Set[str] = set()
names: set[str] = set()
apply_ssl_to_all_addrs = False

View file

@ -9,11 +9,8 @@ import logging
from typing import Any
from typing import Callable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Type
from certbot import errors
@ -32,12 +29,12 @@ class Parsable:
__metaclass__ = abc.ABCMeta
def __init__(self, parent: Optional["Parsable"] = None):
self._data: List[Any] = []
self._data: list[Any] = []
self._tabs = None
self.parent = parent
@classmethod
def parsing_hooks(cls) -> Tuple[Type["Block"], Type["Sentence"], Type["Statements"]]:
def parsing_hooks(cls) -> tuple[type["Block"], type["Sentence"], type["Statements"]]:
"""Returns object types that this class should be able to `parse` recursively.
The order of the objects indicates the order in which the parser should
try to parse each subitem.
@ -57,7 +54,7 @@ class Parsable:
raise NotImplementedError()
@abc.abstractmethod
def parse(self, raw_list: List[Any], add_spaces: bool = False) -> None:
def parse(self, raw_list: list[Any], add_spaces: bool = False) -> None:
""" Loads information into this object from underlying raw_list structure.
Each Parsable object might make different assumptions about the structure of
raw_list.
@ -108,7 +105,7 @@ class Parsable:
"""
raise NotImplementedError()
def dump(self, include_spaces: bool = False) -> List[Any]:
def dump(self, include_spaces: bool = False) -> list[Any]:
""" Dumps back to pyparsing-like list tree. The opposite of `parse`.
Note: if this object has not been modified, `dump` with `include_spaces=True`
@ -153,7 +150,7 @@ class Statements(Parsable):
if self.parent is not None:
self._trailing_whitespace = "\n" + self.parent.get_tabs()
def parse(self, raw_list: List[Any], add_spaces: bool = False) -> None:
def parse(self, raw_list: list[Any], add_spaces: bool = False) -> None:
""" Parses a list of statements.
Expects all elements in `raw_list` to be parseable by `type(self).parsing_hooks`,
with an optional whitespace string at the last index of `raw_list`.
@ -173,7 +170,7 @@ class Statements(Parsable):
return self._data[0].get_tabs()
return ""
def dump(self, include_spaces: bool = False) -> List[Any]:
def dump(self, include_spaces: bool = False) -> list[Any]:
""" Dumps this object by first dumping each statement, then appending its
trailing whitespace (if `include_spaces` is set) """
data = super().dump(include_spaces)
@ -190,9 +187,9 @@ class Statements(Parsable):
# ======== End overridden functions
def _space_list(list_: Sequence[Any]) -> List[str]:
def _space_list(list_: Sequence[Any]) -> list[str]:
""" Inserts whitespace between adjacent non-whitespace tokens. """
spaced_statement: List[str] = []
spaced_statement: list[str] = []
for i in reversed(range(len(list_))):
spaced_statement.insert(0, list_[i])
if i > 0 and not list_[i].isspace() and not list_[i-1].isspace():
@ -217,7 +214,7 @@ class Sentence(Parsable):
return (isinstance(lists, list) and len(lists) > 0 and
all(isinstance(elem, str) for elem in lists))
def parse(self, raw_list: List[Any], add_spaces: bool = False) -> None:
def parse(self, raw_list: list[Any], add_spaces: bool = False) -> None:
""" Parses a list of string types into this object.
If add_spaces is set, adds whitespace tokens between adjacent non-whitespace tokens."""
if add_spaces:
@ -240,7 +237,7 @@ class Sentence(Parsable):
return
self._data.insert(0, "\n" + tabs)
def dump(self, include_spaces: bool = False) -> List[Any]:
def dump(self, include_spaces: bool = False) -> list[Any]:
""" Dumps this sentence. If include_spaces is set, includes whitespace tokens."""
if not include_spaces:
return self.words
@ -258,7 +255,7 @@ class Sentence(Parsable):
# ======== End overridden functions
@property
def words(self) -> List[str]:
def words(self) -> list[str]:
""" Iterates over words, but without spaces. Like Unspaced List. """
return [word.strip("\"\'") for word in self._data if not word.isspace()]
@ -311,7 +308,7 @@ class Block(Parsable):
if expanded:
yield from self.contents.iterate(expanded, match)
def parse(self, raw_list: List[Any], add_spaces: bool = False) -> None:
def parse(self, raw_list: list[Any], add_spaces: bool = False) -> None:
""" Parses a list that resembles a block.
The assumptions that this routine makes are:

View file

@ -3,7 +3,6 @@ import glob
import re
import shutil
import sys
from typing import List
import pytest
@ -125,7 +124,7 @@ class NginxParserTest(util.NginxTest):
([[[0], [3], [4]], [[5], [3], [0]]], [])]
for mylist, result in mylists:
paths: List[List[int]] = []
paths: list[list[int]] = []
parser._do_for_subarray(mylist,
lambda x: isinstance(x, list) and
len(x) >= 1 and

View file

@ -3,6 +3,9 @@ line-length = 100
extend-exclude = ['tools', 'letstest']
[lint]
# Check for PEP 585 style annotations to prevent regressions in
# https://github.com/certbot/certbot/issues/10195
extend-select = ["UP006"]
# Skip bare `except` rules (`E722`).
# Skip ambiguous variable name (`E741`).
ignore = ["E722", "E741",]