From dd876a40ed79efaddc7dfcb9a192c5361260f32f Mon Sep 17 00:00:00 2001 From: Jonathan Vanasco Date: Wed, 2 Apr 2025 13:53:47 -0400 Subject: [PATCH] Feature acme cryptography 2 (#10245) redoing https://github.com/certbot/certbot/pull/10174 but lots of mergecommits and ff wanted; so test in a clean environment --- acme/acme/_internal/tests/challenges_test.py | 2 +- acme/acme/_internal/tests/crypto_util_test.py | 138 +++------------- acme/acme/_internal/tests/jose_test.py | 1 - acme/acme/_internal/tests/messages_test.py | 6 +- acme/acme/_internal/tests/standalone_test.py | 6 +- acme/acme/_internal/tests/test_util.py | 46 +++--- acme/acme/_internal/tests/util_test.py | 1 - acme/acme/challenges.py | 24 ++- acme/acme/client.py | 20 +-- acme/acme/crypto_util.py | 156 +++--------------- acme/acme/messages.py | 31 ++-- acme/examples/http01_example.py | 6 +- acme/setup.py | 6 +- .../certbot_compatibility_test/validator.py | 2 +- certbot/CHANGELOG.md | 10 +- certbot/certbot/_internal/client.py | 5 +- certbot/certbot/_internal/main.py | 5 +- certbot/certbot/_internal/storage.py | 3 +- .../_internal/tests/crypto_util_test.py | 62 ++----- certbot/certbot/_internal/tests/main_test.py | 55 ++---- certbot/certbot/crypto_util.py | 52 ------ certbot/certbot/tests/util.py | 34 ++-- certbot/setup.py | 5 +- tools/oldest_constraints.txt | 2 +- tools/requirements.txt | 2 +- 25 files changed, 176 insertions(+), 504 deletions(-) diff --git a/acme/acme/_internal/tests/challenges_test.py b/acme/acme/_internal/tests/challenges_test.py index 98461dc51..1d5536469 100644 --- a/acme/acme/_internal/tests/challenges_test.py +++ b/acme/acme/_internal/tests/challenges_test.py @@ -13,7 +13,7 @@ import requests from acme import errors from acme._internal.tests import test_util -CERT = test_util.load_comparable_cert('cert.pem') +CERT = test_util.load_cert('cert.pem') KEY = jose.JWKRSA(key=test_util.load_rsa_private_key('rsa512_key.pem')) diff --git a/acme/acme/_internal/tests/crypto_util_test.py b/acme/acme/_internal/tests/crypto_util_test.py index fc37f591f..ee42b3f43 100644 --- a/acme/acme/_internal/tests/crypto_util_test.py +++ b/acme/acme/_internal/tests/crypto_util_test.py @@ -11,8 +11,6 @@ import unittest from unittest import mock import warnings -import josepy as jose -import OpenSSL import pytest from cryptography import x509 from cryptography.hazmat.primitives import serialization @@ -33,10 +31,10 @@ class SSLSocketAndProbeSNITest(unittest.TestCase): """Tests for acme.crypto_util.SSLSocket/probe_sni.""" def setUp(self): - self.cert = test_util.load_comparable_cert('rsa2048_cert.pem') + self.cert = test_util.load_cert('rsa2048_cert.pem') key = test_util.load_pyopenssl_private_key('rsa2048_key.pem') # pylint: disable=protected-access - certs = {b'foo': (key, self.cert.wrapped)} + certs = {b'foo': (key, self.cert)} from acme.crypto_util import SSLSocket @@ -58,8 +56,7 @@ class SSLSocketAndProbeSNITest(unittest.TestCase): def _probe(self, name): from acme.crypto_util import probe_sni - return jose.ComparableX509(probe_sni( - name, host='127.0.0.1', port=self.port)) + return probe_sni(name, host='127.0.0.1', port=self.port) def _start_server(self): self.server_thread.start() @@ -97,48 +94,29 @@ class SSLSocketTest(unittest.TestCase): _ = SSLSocket(None) -class PyOpenSSLCertOrReqAllNamesTest(unittest.TestCase): - """Test for acme.crypto_util._pyopenssl_cert_or_req_all_names.""" +class MiscTests(unittest.TestCase): + + def test_dump_cryptography_chain(self): + from acme.crypto_util import dump_cryptography_chain + + cert1 = test_util.load_cert('rsa2048_cert.pem') + cert2 = test_util.load_cert('rsa4096_cert.pem') + + chain = [cert1, cert2] + dumped = dump_cryptography_chain(chain) + + # default is PEM encoding Encoding.PEM + assert isinstance(dumped, bytes) + + +class CryptographyCertOrReqSANTest(unittest.TestCase): + """Test for acme.crypto_util._cryptography_cert_or_req_san.""" @classmethod def _call(cls, loader, name): # pylint: disable=protected-access - from acme.crypto_util import _pyopenssl_cert_or_req_all_names - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='acme.crypto_util._pyopenssl_cert_or_req_all_names is deprecated *' - ) - return _pyopenssl_cert_or_req_all_names(loader(name)) - - def _call_cert(self, name): - return self._call(test_util.load_cert, name) - - def test_cert_one_san_no_common(self): - assert self._call_cert('cert-nocn.der') == \ - ['no-common-name.badssl.com'] - - def test_cert_no_sans_yes_common(self): - assert self._call_cert('cert.pem') == ['example.com'] - - def test_cert_two_sans_yes_common(self): - assert self._call_cert('cert-san.pem') == \ - ['example.com', 'www.example.com'] - - -class PyOpenSSLCertOrReqSANTest(unittest.TestCase): - """Test for acme.crypto_util._pyopenssl_cert_or_req_san.""" - - @classmethod - def _call(cls, loader, name): - # pylint: disable=protected-access - from acme.crypto_util import _pyopenssl_cert_or_req_san - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='acme.crypto_util._pyopenssl_cert_or_req_san is deprecated *' - ) - return _pyopenssl_cert_or_req_san(loader(name)) + from acme.crypto_util import _cryptography_cert_or_req_san + return _cryptography_cert_or_req_san(loader(name)) @classmethod def _get_idn_names(cls): @@ -285,41 +263,6 @@ class GenMakeSelfSignedCertTest(unittest.TestCase): self.assertIn(extension, cert.extensions) -class GenSsCertTest(unittest.TestCase): - """Test for gen_ss_cert (generation of self-signed cert).""" - - - def setUp(self): - self.cert_count = 5 - self.serial_num: List[int] = [] - self.key = OpenSSL.crypto.PKey() - self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) - - def test_sn_collisions(self): - from acme.crypto_util import gen_ss_cert - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - for _ in range(self.cert_count): - cert = gen_ss_cert(self.key, ['dummy'], force_san=True, - ips=[ipaddress.ip_address("10.10.10.10")]) - self.serial_num.append(cert.get_serial_number()) - assert len(set(self.serial_num)) >= self.cert_count - - def test_no_name(self): - from acme.crypto_util import gen_ss_cert - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - with pytest.raises(AssertionError): - gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")]) - gen_ss_cert(self.key) - - def test_no_ips(self): - from acme.crypto_util import gen_ss_cert - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - gen_ss_cert(self.key, ['dummy']) - - class MakeCSRTest(unittest.TestCase): """Test for standalone functions.""" @@ -395,42 +338,5 @@ class MakeCSRTest(unittest.TestCase): make_csr(privkey_pem, ["a.example"]) -class DumpPyopensslChainTest(unittest.TestCase): - """Test for dump_pyopenssl_chain.""" - - @classmethod - def _call(cls, loaded): - # pylint: disable=protected-access - from acme.crypto_util import dump_pyopenssl_chain - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='acme.crypto_util.dump_pyopenssl_chain is *' - ) - return dump_pyopenssl_chain(loaded) - - def test_dump_pyopenssl_chain(self): - names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem'] - loaded = [test_util.load_cert(name) for name in names] - length = sum( - len(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)) - for cert in loaded) - assert len(self._call(loaded)) == length - - def test_dump_pyopenssl_chain_wrapped(self): - names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem'] - loaded = [test_util.load_cert(name) for name in names] - wrap_func = jose.ComparableX509 - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='The next major version of josepy *' - ) - wrapped = [wrap_func(cert) for cert in loaded] - dump_func = OpenSSL.crypto.dump_certificate - length = sum(len(dump_func(OpenSSL.crypto.FILETYPE_PEM, cert)) for cert in loaded) - assert len(self._call(wrapped)) == length - - if __name__ == '__main__': sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover diff --git a/acme/acme/_internal/tests/jose_test.py b/acme/acme/_internal/tests/jose_test.py index 216bc69ca..74654434d 100644 --- a/acme/acme/_internal/tests/jose_test.py +++ b/acme/acme/_internal/tests/jose_test.py @@ -1,7 +1,6 @@ """Tests for acme.jose shim.""" import importlib import sys -import unittest import pytest diff --git a/acme/acme/_internal/tests/messages_test.py b/acme/acme/_internal/tests/messages_test.py index 76757f19c..ae656bd0c 100644 --- a/acme/acme/_internal/tests/messages_test.py +++ b/acme/acme/_internal/tests/messages_test.py @@ -1,10 +1,8 @@ """Tests for acme.messages.""" -import contextlib import sys from typing import Dict import unittest from unittest import mock -import warnings import josepy as jose import pytest @@ -12,8 +10,8 @@ import pytest from acme import challenges from acme._internal.tests import test_util -CERT = test_util.load_comparable_cert('cert.der') -CSR = test_util.load_comparable_csr('csr.der') +CERT = test_util.load_cert('cert.der') +CSR = test_util.load_csr('csr.der') KEY = test_util.load_rsa_private_key('rsa512_key.pem') diff --git a/acme/acme/_internal/tests/standalone_test.py b/acme/acme/_internal/tests/standalone_test.py index a5e060cde..05cac9728 100644 --- a/acme/acme/_internal/tests/standalone_test.py +++ b/acme/acme/_internal/tests/standalone_test.py @@ -144,8 +144,8 @@ class TLSALPN01ServerTest(unittest.TestCase): # cert = crypto_util.probe_sni( # b'localhost', host=host, port=port, timeout=1) # # Expect normal cert when connecting without ALPN. - # self.assertEqual(jose.ComparableX509(cert), - # jose.ComparableX509(self.certs[b'localhost'][1])) + # self.assertEqual(cert, + # self.certs[b'localhost'][1]) def test_challenge_certs(self): host, port = self.server.socket.getsockname()[:2] @@ -153,7 +153,7 @@ class TLSALPN01ServerTest(unittest.TestCase): b'localhost', host=host, port=port, timeout=1, alpn_protocols=[b"acme-tls/1"]) # Expect challenge cert when connecting with ALPN. - assert cert.to_cryptography() == self.challenge_certs[b'localhost'][1] + assert cert == self.challenge_certs[b'localhost'][1] def test_bad_alpn(self): host, port = self.server.socket.getsockname()[:2] diff --git a/acme/acme/_internal/tests/test_util.py b/acme/acme/_internal/tests/test_util.py index 7a2789ba6..fe016e12d 100644 --- a/acme/acme/_internal/tests/test_util.py +++ b/acme/acme/_internal/tests/test_util.py @@ -5,8 +5,9 @@ """ import importlib.resources import os -import sys +from typing import Callable +from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization import josepy as jose @@ -21,37 +22,36 @@ def load_vector(*names): return vector_ref.read_bytes() -def _guess_loader(filename, loader_pem, loader_der): +def _guess_loader(filename: str, loader_pem: Callable, loader_der: Callable) -> Callable: _, ext = os.path.splitext(filename) - if ext.lower() == '.pem': + if ext.lower() == ".pem": return loader_pem - elif ext.lower() == '.der': + elif ext.lower() == ".der": return loader_der - raise ValueError("Loader could not be recognized based on extension") # pragma: no cover + else: # pragma: no cover + raise ValueError("Loader could not be recognized based on extension") -def load_cert(*names): +def _guess_pyopenssl_loader(filename: str, loader_pem: int, loader_der: int) -> int: + _, ext = os.path.splitext(filename) + if ext.lower() == ".pem": + return loader_pem + else: # pragma: no cover + raise ValueError("Loader could not be recognized based on extension") + + +def load_cert(*names: str) -> x509.Certificate: """Load certificate.""" loader = _guess_loader( - names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) - return crypto.load_certificate(loader, load_vector(*names)) + names[-1], x509.load_pem_x509_certificate, x509.load_der_x509_certificate + ) + return loader(load_vector(*names)) -def load_comparable_cert(*names): - """Load ComparableX509 cert.""" - return jose.ComparableX509(load_cert(*names)) - - -def load_csr(*names): +def load_csr(*names: str) -> x509.CertificateSigningRequest: """Load certificate request.""" - loader = _guess_loader( - names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) - return crypto.load_certificate_request(loader, load_vector(*names)) - - -def load_comparable_csr(*names): - """Load ComparableX509 certificate request.""" - return jose.ComparableX509(load_csr(*names)) + loader = _guess_loader(names[-1], x509.load_pem_x509_csr, x509.load_der_x509_csr) + return loader(load_vector(*names)) def load_rsa_private_key(*names): @@ -72,6 +72,6 @@ def load_ecdsa_private_key(*names): def load_pyopenssl_private_key(*names): """Load pyOpenSSL private key.""" - loader = _guess_loader( + loader = _guess_pyopenssl_loader( names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) return crypto.load_privatekey(loader, load_vector(*names)) diff --git a/acme/acme/_internal/tests/util_test.py b/acme/acme/_internal/tests/util_test.py index 8918755fe..4bda9933b 100644 --- a/acme/acme/_internal/tests/util_test.py +++ b/acme/acme/_internal/tests/util_test.py @@ -1,6 +1,5 @@ """Tests for acme.util.""" import sys -import unittest import pytest diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 161637c5f..4204adfa4 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -419,7 +419,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): return hashlib.sha256(self.key_authorization.encode('utf-8')).digest() def gen_cert(self, domain: str, key: Optional[crypto.PKey] = None, bits: int = 2048 - ) -> Tuple[crypto.X509, crypto.PKey]: + ) -> Tuple[x509.Certificate, crypto.PKey]: """Generate tls-alpn-01 certificate. :param str domain: Domain verified by the challenge. @@ -428,7 +428,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): fresh key will be generated. :param int bits: Number of bits for newly generated key. - :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` + :rtype: `tuple` of `x509.Certificate` and `OpenSSL.crypto.PKey` """ if key is None: @@ -450,10 +450,10 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): force_san=True, extensions=[acme_extension] ) - return crypto.X509.from_cryptography(cert), key + return cert, key def probe_cert(self, domain: str, host: Optional[str] = None, - port: Optional[int] = None) -> crypto.X509: + port: Optional[int] = None) -> x509.Certificate: """Probe tls-alpn-01 challenge certificate. :param str domain: domain being validated, required. @@ -470,20 +470,17 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): return crypto_util.probe_sni(host=host.encode(), port=port, name=domain.encode(), alpn_protocols=[self.ACME_TLS_1_PROTOCOL]) - def verify_cert(self, domain: str, cert: Union[x509.Certificate, crypto.X509]) -> bool: + def verify_cert(self, domain: str, cert: x509.Certificate, ) -> bool: """Verify tls-alpn-01 challenge certificate. :param str domain: Domain name being validated. :param cert: Challenge certificate. - :type cert: `cryptography.x509.Certificate` or `OpenSSL.crypto.X509` + :type cert: `cryptography.x509.Certificate` :returns: Whether the certificate was successfully verified. :rtype: bool """ - if not isinstance(cert, x509.Certificate): - cert = cert.to_cryptography() - names = crypto_util.get_names_from_subject_and_extensions( cert.subject, cert.extensions ) @@ -506,7 +503,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): # pylint: disable=too-many-arguments def simple_verify(self, chall: 'TLSALPN01', domain: str, account_public_key: jose.JWK, - cert: Optional[crypto.X509] = None, host: Optional[str] = None, + cert: Optional[x509.Certificate] = None, host: Optional[str] = None, port: Optional[int] = None) -> bool: """Simple verify. @@ -516,7 +513,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): :param .challenges.TLSALPN01 chall: Corresponding challenge. :param str domain: Domain name being validated. :param JWK account_public_key: - :param OpenSSL.crypto.X509 cert: Optional certificate. If not + :param x509.Certificate cert: Optional certificate. If not provided (``None``) certificate will be retrieved using `probe_cert`. :param string host: IP address used to probe the certificate. @@ -547,7 +544,8 @@ class TLSALPN01(KeyAuthorizationChallenge): response_cls = TLSALPN01Response typ = response_cls.typ - def validation(self, account_key: jose.JWK, **kwargs: Any) -> Tuple[crypto.X509, crypto.PKey]: + def validation(self, account_key: jose.JWK, + **kwargs: Any) -> Tuple[x509.Certificate, crypto.PKey]: """Generate validation. :param JWK account_key: @@ -556,7 +554,7 @@ class TLSALPN01(KeyAuthorizationChallenge): in certificate generation. If not provided (``None``), then fresh key will be generated. - :rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey` + :rtype: `tuple` of `x509.Certificate` and `OpenSSL.crypto.PKey` """ # TODO: Remove cast when response() is generic. diff --git a/acme/acme/client.py b/acme/acme/client.py index e5f610f2c..ede6ccac1 100644 --- a/acme/acme/client.py +++ b/acme/acme/client.py @@ -14,12 +14,10 @@ from typing import Optional from typing import Set from typing import Tuple from typing import Union -import warnings from cryptography import x509 import josepy as jose -import OpenSSL import requests from requests.adapters import HTTPAdapter from requests.utils import parse_header_links @@ -227,12 +225,8 @@ class ClientV2: :returns: updated order :rtype: messages.OrderResource """ - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, orderr.csr_pem) - with warnings.catch_warnings(): - warnings.filterwarnings('ignore', - message='The next major version of josepy will remove josepy.util.ComparableX509') - wrapped_csr = messages.CertificateRequest(csr=jose.ComparableX509(csr)) + csr = x509.load_pem_x509_csr(orderr.csr_pem) + wrapped_csr = messages.CertificateRequest(csr=csr) res = self._post(orderr.body.finalize, wrapped_csr) orderr = orderr.update(body=messages.Order.from_json(res.json())) return orderr @@ -285,11 +279,10 @@ class ClientV2: self.begin_finalization(orderr) return self.poll_finalization(orderr, deadline, fetch_alternative_chains) - def revoke(self, cert: jose.ComparableX509, rsn: int) -> None: + def revoke(self, cert: x509.Certificate, rsn: int) -> None: """Revoke certificate. - :param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in - `.ComparableX509` + :param x509.Certificate cert: `x509.Certificate` :param int rsn: Reason code for certificate revocation. @@ -477,11 +470,10 @@ class ClientV2: return datetime.datetime.now() + datetime.timedelta(seconds=seconds) - def _revoke(self, cert: jose.ComparableX509, rsn: int, url: str) -> None: + def _revoke(self, cert: x509.Certificate, rsn: int, url: str) -> None: """Revoke certificate. - :param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in - `.ComparableX509` + :param .x509.Certificate cert: `x509.Certificate` :param int rsn: Reason code for certificate revocation. diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index 560a13403..c20c49c7d 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -1,16 +1,15 @@ """Crypto utilities.""" -import binascii import contextlib import enum from datetime import datetime, timedelta, timezone import ipaddress import logging -import os import socket import typing from typing import Any from typing import Callable from typing import List +from typing import Literal from typing import Mapping from typing import Optional from typing import Sequence @@ -21,12 +20,11 @@ from typing import Union from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec, ed25519, ed448, types -import josepy as jose +from cryptography.hazmat.primitives.serialization import Encoding from OpenSSL import crypto from OpenSSL import SSL from acme import errors -import warnings logger = logging.getLogger(__name__) @@ -49,13 +47,13 @@ class Format(enum.IntEnum): DER = crypto.FILETYPE_ASN1 PEM = crypto.FILETYPE_PEM - def to_cryptography_encoding(self) -> serialization.Encoding: + def to_cryptography_encoding(self) -> Encoding: """Converts the Format to the corresponding cryptography `Encoding`. """ if self == Format.DER: - return serialization.Encoding.DER + return Encoding.DER else: - return serialization.Encoding.PEM + return Encoding.PEM _KeyAndCert = Union[ @@ -74,6 +72,7 @@ class _DefaultCertSelection: return self.certs.get(server_name, None) return None # pragma: no cover + class SSLSocket: # pylint: disable=too-few-public-methods """SSL wrapper for sockets. @@ -135,6 +134,8 @@ class SSLSocket: # pylint: disable=too-few-public-methods new_context = SSL.Context(self.method) new_context.set_min_proto_version(SSL.TLS1_2_VERSION) new_context.use_privatekey(key) + if isinstance(cert, x509.Certificate): + cert = crypto.X509.from_cryptography(cert) new_context.use_certificate(cert) if self.alpn_selection is not None: new_context.set_alpn_select_callback(self.alpn_selection) @@ -196,7 +197,7 @@ class SSLSocket: # pylint: disable=too-few-public-methods def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # pylint: disable=too-many-arguments method: int = _DEFAULT_SSL_METHOD, source_address: Tuple[str, int] = ('', 0), - alpn_protocols: Optional[Sequence[bytes]] = None) -> crypto.X509: + alpn_protocols: Optional[Sequence[bytes]] = None) -> x509.Certificate: """Probe SNI server for SSL certificate. :param bytes name: Byte string to send as the server name in the @@ -214,7 +215,7 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # :raises acme.errors.Error: In case of any problems. :returns: SSL certificate presented by the server. - :rtype: OpenSSL.crypto.X509 + :rtype: cryptography.x509.Certificate """ context = SSL.Context(method) @@ -248,7 +249,7 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # raise errors.Error(error) cert = client_ssl.get_peer_certificate() assert cert # Appease mypy. We would have crashed out by now if there was no certificate. - return cert + return cert.to_cryptography() # Even *more* annoyingly, due to a mypy bug, we can't use Union[] types in @@ -321,7 +322,7 @@ def make_csr( ) csr = builder.sign(private_key, hashes.SHA256()) - return csr.public_bytes(serialization.Encoding.PEM) + return csr.public_bytes(Encoding.PEM) def get_names_from_subject_and_extensions( @@ -358,32 +359,16 @@ def get_names_from_subject_and_extensions( return [cns[0]] + [d for d in dns_names if d != cns[0]] -def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req] - ) -> List[str]: - """ - Deprecated - .. deprecated: 3.2.1 - """ - warnings.warn( - "acme.crypto_util._pyopenssl_cert_or_req_all_names is deprecated and " - "will be removed in the next major release of Certbot.", - DeprecationWarning, - stacklevel=2 - ) - cert_or_req = loaded_cert_or_req.to_cryptography() - return get_names_from_subject_and_extensions( - cert_or_req.subject, cert_or_req.extensions - ) - - -def _pyopenssl_cert_or_req_san(cert_or_req: Union[crypto.X509, crypto.X509Req]) -> List[str]: +def _cryptography_cert_or_req_san( + cert_or_req: Union[x509.Certificate, x509.CertificateSigningRequest], +) -> List[str]: """Get Subject Alternative Names from certificate or CSR using pyOpenSSL. .. note:: Although this is `acme` internal API, it is used by `letsencrypt`. :param cert_or_req: Certificate or CSR. - :type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`. + :type cert_or_req: `x509.Certificate` or `x509.CertificateSigningRequest`. :returns: A list of Subject Alternative Names that is DNS. :rtype: `list` of `str` @@ -391,13 +376,8 @@ def _pyopenssl_cert_or_req_san(cert_or_req: Union[crypto.X509, crypto.X509Req]) Deprecated .. deprecated: 3.2.1 """ - warnings.warn( - "acme.crypto_util._pyopenssl_cert_or_req_san is deprecated and " - "will be removed in the next major release of Certbot.", - DeprecationWarning, - stacklevel=2 - ) - exts = cert_or_req.to_cryptography().extensions + # ???: is this translation needed? + exts = cert_or_req.extensions try: san_ext = exts.get_extension_for_class(x509.SubjectAlternativeName) except x509.ExtensionNotFound: @@ -486,85 +466,13 @@ def make_self_signed_cert(private_key: types.CertificateIssuerPrivateKeyTypes, return builder.sign(private_key, hashes.SHA256()) -def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None, - not_before: Optional[int] = None, - validity: int = (7 * 24 * 60 * 60), force_san: bool = True, - extensions: Optional[List[crypto.X509Extension]] = None, - ips: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None - ) -> crypto.X509: - """Generate new self-signed certificate. - - :type domains: `list` of `str` - :param OpenSSL.crypto.PKey key: - :param bool force_san: - :param extensions: List of additional extensions to include in the cert. - :type extensions: `list` of `OpenSSL.crypto.X509Extension` - :type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`) - - If more than one domain is provided, all of the domains are put into - ``subjectAltName`` X.509 extension and first domain is set as the - subject CN. If only one domain is provided no ``subjectAltName`` - extension is used, unless `force_san` is ``True``. - - .. deprecated: 2.10.0 - """ - warnings.warn( - "acme.crypto_util.gen_ss_cert is deprecated and will be removed in the " - "next major release of Certbot. Please use " - "acme.crypto_util.make_self_signed_cert instead.", DeprecationWarning, - stacklevel=2 - ) - assert domains or ips, "Must provide one or more hostnames or IPs for the cert." - - cert = crypto.X509() - cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16)) - cert.set_version(2) - - if extensions is None: - extensions = [] - if domains is None: - domains = [] - if ips is None: - ips = [] - extensions.append( - crypto.X509Extension( - b"basicConstraints", True, b"CA:TRUE, pathlen:0"), - ) - - if len(domains) > 0: - cert.get_subject().CN = domains[0] - # TODO: what to put into cert.get_subject()? - cert.set_issuer(cert.get_subject()) - - sanlist = [] - for address in domains: - sanlist.append('DNS:' + address) - for ip in ips: - sanlist.append('IP:' + ip.exploded) - san_string = ', '.join(sanlist).encode('ascii') - if force_san or len(domains) > 1 or len(ips) > 0: - extensions.append(crypto.X509Extension( - b"subjectAltName", - critical=False, - value=san_string - )) - - cert.add_extensions(extensions) - - cert.gmtime_adj_notBefore(0 if not_before is None else not_before) - cert.gmtime_adj_notAfter(validity) - - cert.set_pubkey(key) - cert.sign(key, "sha256") - return cert - - -def dump_pyopenssl_chain(chain: Union[List[jose.ComparableX509], List[crypto.X509]], - filetype: Union[Format, int] = Format.PEM) -> bytes: +def dump_cryptography_chain( + chain: List[x509.Certificate], + encoding: Literal[Encoding.PEM, Encoding.DER] = Encoding.PEM, +) -> bytes: """Dump certificate chain into a bundle. - :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in - :class:`josepy.util.ComparableX509`). + :param list chain: List of `cryptography.x509.Certificate`. :returns: certificate chain bundle :rtype: bytes @@ -572,24 +480,12 @@ def dump_pyopenssl_chain(chain: Union[List[jose.ComparableX509], List[crypto.X50 Deprecated .. deprecated: 3.2.1 """ - warnings.warn( - "acme.crypto_util.dump_pyopenssl_chain is deprecated and " - "will be removed in the next major release of Certbot.", - DeprecationWarning, - stacklevel=2 - ) # XXX: returns empty string when no chain is available, which # shuts up RenewableCert, but might not be the best solution... - filetype = Format(filetype) - def _dump_cert(cert: Union[jose.ComparableX509, crypto.X509]) -> bytes: - if isinstance(cert, jose.ComparableX509): - if isinstance(cert.wrapped, crypto.X509Req): - raise errors.Error("Unexpected CSR provided.") # pragma: no cover - cert = cert.wrapped + def _dump_cert(cert: x509.Certificate) -> bytes: + return cert.public_bytes(encoding) - return cert.to_cryptography().public_bytes(filetype.to_cryptography_encoding()) - - # assumes that OpenSSL.crypto.dump_certificate includes ending + # assumes that x509.Certificate.public_bytes includes ending # newline character return b"".join(_dump_cert(cert) for cert in chain) diff --git a/acme/acme/messages.py b/acme/acme/messages.py index 46593d82c..b26082181 100644 --- a/acme/acme/messages.py +++ b/acme/acme/messages.py @@ -12,7 +12,8 @@ from typing import Optional from typing import Tuple from typing import Type from typing import TypeVar -import warnings + +from cryptography import x509 import josepy as jose @@ -580,45 +581,33 @@ class AuthorizationResource(ResourceWithURI): class CertificateRequest(jose.JSONObjectWithFields): """ACME newOrder request. - :ivar jose.ComparableX509 csr: - `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509` + :ivar x509.CertificateSigningRequest csr: `x509.CertificateSigningRequest` """ - with warnings.catch_warnings(): - warnings.filterwarnings('ignore', - message='The next major version of josepy will remove josepy.util.ComparableX509') - csr: jose.ComparableX509 = jose.field( - 'csr', decoder=jose.decode_csr, encoder=jose.encode_csr) + csr: x509.CertificateSigningRequest = jose.field( + 'csr', decoder=jose.decode_csr, encoder=jose.encode_csr) class CertificateResource(ResourceWithURI): """Certificate Resource. - :ivar josepy.util.ComparableX509 body: - `OpenSSL.crypto.X509` wrapped in `.ComparableX509` + :ivar x509.Certificate body: `x509.Certificate` :ivar str cert_chain_uri: URI found in the 'up' ``Link`` header :ivar tuple authzrs: `tuple` of `AuthorizationResource`. """ - with warnings.catch_warnings(): - warnings.filterwarnings('ignore', - message='The next major version of josepy will remove josepy.util.ComparableX509') - cert_chain_uri: str = jose.field('cert_chain_uri') + cert_chain_uri: str = jose.field('cert_chain_uri') authzrs: Tuple[AuthorizationResource, ...] = jose.field('authzrs') class Revocation(jose.JSONObjectWithFields): """Revocation message. - :ivar jose.ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in - `jose.ComparableX509` + :ivar x509.Certificate certificate: `x509.Certificate` """ - with warnings.catch_warnings(): - warnings.filterwarnings('ignore', - message='The next major version of josepy will remove josepy.util.ComparableX509') - certificate: jose.ComparableX509 = jose.field( - 'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert) + certificate: x509.Certificate = jose.field( + 'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert) reason: int = jose.field('reason') diff --git a/acme/examples/http01_example.py b/acme/examples/http01_example.py index 4a240afa0..0598fda41 100644 --- a/acme/examples/http01_example.py +++ b/acme/examples/http01_example.py @@ -200,11 +200,7 @@ def example_http(): # Revoke certificate - fullchain_com = jose.ComparableX509( - OpenSSL.crypto.X509.from_cryptography( - x509.load_pem_x509_certificate(fullchain_pem) - ) - ) + fullchain_com = x509.load_pem_x509_certificate(fullchain_pem) try: client_acme.revoke(fullchain_com, 0) # revocation reason = 0 diff --git a/acme/setup.py b/acme/setup.py index 0b607d7b3..b15ef4221 100644 --- a/acme/setup.py +++ b/acme/setup.py @@ -1,5 +1,3 @@ -import sys - from setuptools import find_packages from setuptools import setup @@ -7,9 +5,7 @@ version = '4.0.0.dev0' install_requires = [ 'cryptography>=43.0.0', - # Josepy 2+ may introduce backward incompatible changes by droping usage of - # deprecated PyOpenSSL APIs. - 'josepy>=1.13.0, <2', + 'josepy>=2.0.0', # PyOpenSSL>=25.0.0 is just needed to satisfy mypy right now so this dependency can probably be # relaxed to >=24.0.0 if needed. 'PyOpenSSL>=25.0.0', diff --git a/certbot-compatibility-test/certbot_compatibility_test/validator.py b/certbot-compatibility-test/certbot_compatibility_test/validator.py index 5fc226e4c..a51339468 100644 --- a/certbot-compatibility-test/certbot_compatibility_test/validator.py +++ b/certbot-compatibility-test/certbot_compatibility_test/validator.py @@ -34,7 +34,7 @@ class Validator: name = name if isinstance(name, bytes) else name.encode() try: - presented_cert = crypto_util.probe_sni(name, host, port).to_cryptography() + presented_cert = crypto_util.probe_sni(name, host, port) except acme_errors.Error as error: logger.exception(str(error)) return False diff --git a/certbot/CHANGELOG.md b/certbot/CHANGELOG.md index 5dadb384f..586fcd0c5 100644 --- a/certbot/CHANGELOG.md +++ b/certbot/CHANGELOG.md @@ -15,6 +15,14 @@ Certbot adheres to [Semantic Versioning](https://semver.org/). renewal at 30 days before expiration. The config field renew_before_expiry still overrides this default. +* removed `acme.crypto_util._pyopenssl_cert_or_req_all_names` +* removed `acme.crypto_util._pyopenssl_cert_or_req_san` +* removed `acme.crypto_util.dump_pyopenssl_chain` +* removed `acme.crypto_util.gen_ss_cert` +* removed `certbot.crypto_util.dump_pyopenssl_chain` +* removed `certbot.crypto_util.pyopenssl_load_certificate` + + ### Fixed * Moved `RewriteEngine on` directive added during apache http01 authentication @@ -33,9 +41,9 @@ More details about these changes can be found on our GitHub repo. * The --register-unsafely-without-email flag is no longer needed in non-interactive mode. * In interactive mode, pressing Enter at the email prompt will register without an email. -* deprecated `acme.crypto_util.dump_pyopenssl_chain` * deprecated `acme.crypto_util._pyopenssl_cert_or_req_all_names` * deprecated `acme.crypto_util._pyopenssl_cert_or_req_san` +* deprecated `acme.crypto_util.dump_pyopenssl_chain` * deprecated `certbot.crypto_util.dump_pyopenssl_chain` * deprecated `certbot.crypto_util.pyopenssl_load_certificate` diff --git a/certbot/certbot/_internal/client.py b/certbot/certbot/_internal/client.py index f794eef8c..e35d7aa68 100644 --- a/certbot/certbot/_internal/client.py +++ b/certbot/certbot/_internal/client.py @@ -48,8 +48,9 @@ from certbot.interfaces import AccountStorage logger = logging.getLogger(__name__) -def acme_from_config_key(config: configuration.NamespaceConfig, key: jose.JWK, - regr: Optional[messages.RegistrationResource] = None +def acme_from_config_key(config: configuration.NamespaceConfig, + key: jose.JWK, + regr: Optional[messages.RegistrationResource] = None, ) -> acme_client.ClientV2: """Wrangle ACME client construction""" if key.typ == 'EC': diff --git a/certbot/certbot/_internal/main.py b/certbot/certbot/_internal/main.py index c1352b49e..aee68ac67 100644 --- a/certbot/certbot/_internal/main.py +++ b/certbot/certbot/_internal/main.py @@ -17,6 +17,7 @@ from typing import TypeVar from typing import Union import configobj +from cryptography import x509 import josepy as jose from josepy import b64 @@ -1364,10 +1365,10 @@ def revoke(config: configuration.NamespaceConfig, acme = client.acme_from_config_key(config, acc.key, acc.regr) with open(config.cert_path, 'rb') as f: - cert = crypto_util.pyopenssl_load_certificate(f.read())[0] + cert = x509.load_pem_x509_certificate(f.read()) logger.debug("Reason code for revocation: %s", config.reason) try: - acme.revoke(jose.ComparableX509(cert), config.reason) + acme.revoke(cert, config.reason) _delete_if_appropriate(config) except acme_errors.ClientError as e: return str(e) diff --git a/certbot/certbot/_internal/storage.py b/certbot/certbot/_internal/storage.py index 6b7444398..a03848ae6 100644 --- a/certbot/certbot/_internal/storage.py +++ b/certbot/certbot/_internal/storage.py @@ -1106,8 +1106,7 @@ class RenewableCert(interfaces.RenewableCert): logger.debug("Writing chain to %s.", target["chain"]) f_b.write(chain) with open(target["fullchain"], "wb") as f_b: - # assumes that OpenSSL.crypto.dump_certificate includes - # ending newline character + # assumes the cert includes ending newline character logger.debug("Writing full chain to %s.", target["fullchain"]) f_b.write(cert + chain) diff --git a/certbot/certbot/_internal/tests/crypto_util_test.py b/certbot/certbot/_internal/tests/crypto_util_test.py index 67b358da9..e8af06bee 100644 --- a/certbot/certbot/_internal/tests/crypto_util_test.py +++ b/certbot/certbot/_internal/tests/crypto_util_test.py @@ -1,19 +1,17 @@ """Tests for certbot.crypto_util.""" -import binascii import logging import re import sys import unittest from unittest import mock -import warnings -import OpenSSL import pytest from cryptography import x509 -from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.serialization import Encoding +from acme import crypto_util as acme_crypto_util from certbot import errors from certbot import util from certbot.compat import filesystem @@ -143,7 +141,7 @@ class ImportCSRFileTest(unittest.TestCase): data = test_util.load_vector('csr_512.der') data_pem = test_util.load_vector('csr_512.pem') - assert (OpenSSL.crypto.FILETYPE_PEM, + assert (acme_crypto_util.Format.PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), @@ -154,7 +152,7 @@ class ImportCSRFileTest(unittest.TestCase): csrfile = test_util.vector_path('csr_512.pem') data = test_util.load_vector('csr_512.pem') - assert (OpenSSL.crypto.FILETYPE_PEM, + assert (acme_crypto_util.Format.PEM, util.CSR(file=csrfile, data=data, form="pem"), @@ -396,39 +394,8 @@ class GetNamesFromReqTest(unittest.TestCase): self._call(test_util.load_vector('csr-6sans_512.pem')) def test_der(self): - from OpenSSL.crypto import FILETYPE_ASN1 assert ['Example.com'] == \ - self._call(test_util.load_vector('csr_512.der'), typ=FILETYPE_ASN1) - - -class CertLoaderTest(unittest.TestCase): - """Tests for certbot.crypto_util.pyopenssl_load_certificate""" - - def test_load_valid_cert(self): - from certbot.crypto_util import pyopenssl_load_certificate - - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='certbot.crypto_util.pyopenssl_load_certificate is *' - ) - cert, file_type = pyopenssl_load_certificate(CERT) - - assert file_type == OpenSSL.crypto.FILETYPE_PEM - assert binascii.unhexlify( - cert.digest("sha256").replace(b":", b"") - ) == x509.load_pem_x509_certificate(CERT).fingerprint(hashes.SHA256()) - - def test_load_invalid_cert(self): - from certbot.crypto_util import pyopenssl_load_certificate - bad_cert_data = CERT.replace(b"BEGIN CERTIFICATE", b"ASDFASDFASDF!!!") - with pytest.raises(errors.Error): - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='certbot.crypto_util.pyopenssl_load_certificate is *' - ) - pyopenssl_load_certificate(bad_cert_data) + self._call(test_util.load_vector('csr_512.der'), typ=acme_crypto_util.Format.DER) class NotBeforeTest(unittest.TestCase): @@ -460,20 +427,19 @@ class Sha256sumTest(unittest.TestCase): class CertAndChainFromFullchainTest(unittest.TestCase): """Tests for certbot.crypto_util.cert_and_chain_from_fullchain""" - def _parse_and_reencode_pem(self, cert_pem): - from OpenSSL import crypto - return crypto.dump_certificate(crypto.FILETYPE_PEM, - crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)).decode() + def _parse_and_reencode_pem(self, cert_pem:str)->str: + cert = x509.load_pem_x509_certificate(cert_pem.encode()) + return cert.public_bytes(Encoding.PEM).decode() def test_cert_and_chain_from_fullchain(self): - cert_pem = CERT.decode() - chain_pem = cert_pem + SS_CERT.decode() - fullchain_pem = cert_pem + chain_pem - spacey_fullchain_pem = cert_pem + u'\n' + chain_pem - crlf_fullchain_pem = fullchain_pem.replace(u'\n', u'\r\n') + cert_pem: str = CERT.decode() + chain_pem: str = cert_pem + SS_CERT.decode() + fullchain_pem: str = cert_pem + chain_pem + spacey_fullchain_pem: str = cert_pem + u'\n' + chain_pem + crlf_fullchain_pem: str = fullchain_pem.replace(u'\n', u'\r\n') # In the ACME v1 code path, the fullchain is constructed by loading cert+chain DERs - # and using OpenSSL to dump them, so here we confirm that OpenSSL is producing certs + # and using OpenSSL to dump them, so here we confirm that cryptography is producing certs # that will be parseable by cert_and_chain_from_fullchain. acmev1_fullchain_pem = self._parse_and_reencode_pem(cert_pem) + \ self._parse_and_reencode_pem(cert_pem) + self._parse_and_reencode_pem(SS_CERT.decode()) diff --git a/certbot/certbot/_internal/tests/main_test.py b/certbot/certbot/_internal/tests/main_test.py index 59fb594fb..6693a0500 100644 --- a/certbot/certbot/_internal/tests/main_test.py +++ b/certbot/certbot/_internal/tests/main_test.py @@ -14,15 +14,14 @@ import traceback from typing import List import unittest from unittest import mock -import warnings import configobj +from cryptography import x509 import josepy as jose import pytest import pytz from acme.messages import Error as acme_error -from certbot import crypto_util from certbot import errors from certbot import interfaces from certbot import util @@ -219,6 +218,7 @@ class RunTest(test_util.ConfigTestCase): with pytest.raises(errors.NotSupportedError): main.run(self.config, plugins) + class CertonlyTest(unittest.TestCase): """Tests for certbot._internal.main.certonly.""" @@ -436,12 +436,7 @@ class RevokeTest(test_util.TempDirTestCase): config = cli.prepare_and_parse_args(plugins, args) from certbot._internal.main import revoke - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='certbot.crypto_util.pyopenssl_load_certificate is *' - ) - revoke(config, plugins) + revoke(config, plugins) @mock.patch('certbot._internal.main._delete_if_appropriate') @mock.patch('certbot._internal.main.client.acme_client') @@ -1801,23 +1796,16 @@ class MainTest(test_util.ConfigTestCase): mock_delete_if_appropriate): mock_delete_if_appropriate.return_value = False server = 'foo.bar' - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='certbot.crypto_util.pyopenssl_load_certificate is *' - ) - self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH, - '--server', server, 'revoke']) - with open(RSA2048_KEY_PATH, 'rb') as f: - assert mock_acme_client.ClientV2.call_count == 1 - assert mock_acme_client.ClientNetwork.call_args[0][0] == \ - jose.JWK.load(f.read()) - with open(SS_CERT_PATH, 'rb') as f: - cert = crypto_util.pyopenssl_load_certificate(f.read())[0] - mock_revoke = mock_acme_client.ClientV2().revoke - mock_revoke.assert_called_once_with( - jose.ComparableX509(cert), - mock.ANY) + self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH, + '--server', server, 'revoke']) + with open(RSA2048_KEY_PATH, 'rb') as f: + assert mock_acme_client.ClientV2.call_count == 1 + assert mock_acme_client.ClientNetwork.call_args[0][0] == \ + jose.JWK.load(f.read()) + with open(SS_CERT_PATH, 'rb') as f: + cert = x509.load_pem_x509_certificate(f.read()) + mock_revoke = mock_acme_client.ClientV2().revoke + mock_revoke.assert_called_once_with(cert, mock.ANY) def test_revoke_with_key_mismatch(self): server = 'foo.bar' @@ -1831,18 +1819,11 @@ class MainTest(test_util.ConfigTestCase): mock_delete_if_appropriate): mock_delete_if_appropriate.return_value = False mock_determine_account.return_value = (mock.MagicMock(), None) - with warnings.catch_warnings(): - warnings.filterwarnings( - 'ignore', - message='certbot.crypto_util.pyopenssl_load_certificate is *' - ) - _, _, _, client = self._call(['--cert-path', CERT, 'revoke']) - with open(CERT) as f: - cert = crypto_util.pyopenssl_load_certificate(f.read())[0] - mock_revoke = client.acme_from_config_key().revoke - mock_revoke.assert_called_once_with( - jose.ComparableX509(cert), - mock.ANY) + _, _, _, client = self._call(['--cert-path', CERT, 'revoke']) + with open(CERT, 'rb') as f: + cert = x509.load_pem_x509_certificate(f.read()) + mock_revoke = client.acme_from_config_key().revoke + mock_revoke.assert_called_once_with(cert, mock.ANY) @mock.patch('certbot._internal.log.post_arg_parse_setup') def test_register(self, _): diff --git a/certbot/certbot/crypto_util.py b/certbot/certbot/crypto_util.py index c22c70ab4..b5f1d047d 100644 --- a/certbot/certbot/crypto_util.py +++ b/certbot/certbot/crypto_util.py @@ -14,7 +14,6 @@ from typing import Set from typing import Tuple from typing import TYPE_CHECKING from typing import Union -import warnings from cryptography import x509 from cryptography.exceptions import InvalidSignature @@ -32,8 +31,6 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import NoEncryption from cryptography.hazmat.primitives.serialization import PrivateFormat -import josepy -from OpenSSL import crypto from OpenSSL import SSL from acme import crypto_util as acme_crypto_util @@ -391,32 +388,6 @@ def verify_fullchain(renewable_cert: interfaces.RenewableCert) -> None: raise e -def pyopenssl_load_certificate(data: bytes) -> Tuple[crypto.X509, int]: - """Load PEM/DER certificate. - - :raises errors.Error: - - Deprecated - .. deprecated: 3.2.1 - """ - warnings.warn( - "certbot.crypto_util.pyopenssl_load_certificate is deprecated and " - "will be removed in the next major release of Certbot.", - DeprecationWarning, - stacklevel=2 - ) - - openssl_errors = [] - - for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1): - try: - return crypto.load_certificate(file_type, data), file_type - except crypto.Error as error: # TODO: other errors? - openssl_errors.append(error) - raise errors.Error("Unable to load: {0}".format(",".join( - str(error) for error in openssl_errors))) - - def get_sans_from_cert( cert: bytes, typ: Union[acme_crypto_util.Format, int] = acme_crypto_util.Format.PEM ) -> List[str]: @@ -491,29 +462,6 @@ def get_names_from_req( ) -def dump_pyopenssl_chain( - chain: Union[List[crypto.X509], List[josepy.ComparableX509]], - filetype: Union[acme_crypto_util.Format, int] = acme_crypto_util.Format.PEM, -) -> bytes: - """Dump certificate chain into a bundle. - - :param list chain: List of `crypto.X509` (or wrapped in - :class:`josepy.util.ComparableX509`). - - Deprecated - .. deprecated: 3.2.1 - """ - warnings.warn( - "certbot.crypto_util.dump_pyopenssl_chain is deprecated and " - "will be removed in the next major release of Certbot.", - DeprecationWarning, - stacklevel=2 - ) - # XXX: returns empty string when no chain is available, which - # shuts up RenewableCert, but might not be the best solution... - return acme_crypto_util.dump_pyopenssl_chain(chain, filetype) - - def notBefore(cert_path: str) -> datetime.datetime: """When does the cert at cert_path start being valid? diff --git a/certbot/certbot/tests/util.py b/certbot/certbot/tests/util.py index 021581d7c..f23bc5416 100644 --- a/certbot/certbot/tests/util.py +++ b/certbot/certbot/tests/util.py @@ -22,6 +22,7 @@ from typing import Union import unittest from unittest import mock +from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey @@ -98,7 +99,7 @@ def load_vector(*names: str) -> bytes: return data -def _guess_loader(filename: str, loader_pem: int, loader_der: int) -> int: +def _guess_loader(filename: str, loader_pem: Callable, loader_der: Callable) -> Callable: _, ext = os.path.splitext(filename) if ext.lower() == '.pem': return loader_pem @@ -107,23 +108,12 @@ def _guess_loader(filename: str, loader_pem: int, loader_der: int) -> int: raise ValueError("Loader could not be recognized based on extension") # pragma: no cover -def load_cert(*names: str) -> crypto.X509: +def load_cert(*names: str) -> x509.Certificate: """Load certificate.""" loader = _guess_loader( - names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) - return crypto.load_certificate(loader, load_vector(*names)) - - -def load_csr(*names: str) -> crypto.X509Req: - """Load certificate request.""" - loader = _guess_loader( - names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) - return crypto.load_certificate_request(loader, load_vector(*names)) - - -def load_comparable_csr(*names: str) -> jose.ComparableX509: - """Load ComparableX509 certificate request.""" - return jose.ComparableX509(load_csr(*names)) + names[-1], x509.load_pem_x509_certificate, x509.load_der_x509_certificate + ) + return loader(load_vector(*names)) def load_jose_rsa_private_key_pem(*names: str) -> jose.ComparableRSAKey: @@ -131,9 +121,19 @@ def load_jose_rsa_private_key_pem(*names: str) -> jose.ComparableRSAKey: return jose.ComparableRSAKey(load_rsa_private_key_pem(*names)) +def _guess_loader_pyopenssl(filename: str, loader_pem: int, loader_der: int) -> int: + # note: used by `load_rsa_private_key_pem` + _, ext = os.path.splitext(filename) + if ext.lower() == '.pem': + return loader_pem + elif ext.lower() == '.der': + return loader_der + raise ValueError("Loader could not be recognized based on extension") # pragma: no cover + + def load_rsa_private_key_pem(*names: str) -> RSAPrivateKey: """Load RSA private key.""" - loader = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) + loader = _guess_loader_pyopenssl(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) loader_fn: Callable[..., Any] if loader == crypto.FILETYPE_PEM: loader_fn = serialization.load_pem_private_key diff --git a/certbot/setup.py b/certbot/setup.py index 70a1ecc14..7685b7839 100644 --- a/certbot/setup.py +++ b/certbot/setup.py @@ -5,6 +5,7 @@ import re from setuptools import find_packages from setuptools import setup + def read_file(filename, encoding='utf8'): """Read unicode from given file.""" with codecs.open(filename, encoding=encoding) as fd: @@ -33,9 +34,7 @@ install_requires = [ 'cryptography>=43.0.0', 'distro>=1.0.1', 'importlib_metadata>=4.6; python_version < "3.10"', - # Josepy 2+ may introduce backward incompatible changes by droping usage of - # deprecated PyOpenSSL APIs. - 'josepy>=1.13.0, <2', + 'josepy>=2.0.0', 'parsedatetime>=2.4', 'pyrfc3339', 'pytz>=2019.3', diff --git a/tools/oldest_constraints.txt b/tools/oldest_constraints.txt index 087a00fb0..26269150e 100644 --- a/tools/oldest_constraints.txt +++ b/tools/oldest_constraints.txt @@ -37,7 +37,7 @@ iniconfig==2.0.0 ; python_version >= "3.9" and python_version < "3.10" ipaddress==1.0.16 ; python_version >= "3.9" and python_version < "3.10" isort==6.0.0 ; python_version >= "3.9" and python_version < "3.10" jmespath==0.10.0 ; python_version >= "3.9" and python_version < "3.10" -josepy==1.15.0 ; python_version >= "3.9" and python_version < "3.10" +josepy==2.0.0 ; python_version >= "3.9" and python_version < "3.10" jsonlines==4.0.0 ; python_version >= "3.9" and python_version < "3.10" mccabe==0.7.0 ; python_version >= "3.9" and python_version < "3.10" mypy-extensions==1.0.0 ; python_version >= "3.9" and python_version < "3.10" diff --git a/tools/requirements.txt b/tools/requirements.txt index d9323ab18..8c44f4e03 100644 --- a/tools/requirements.txt +++ b/tools/requirements.txt @@ -73,7 +73,7 @@ jedi==0.19.2 ; python_version >= "3.9" and python_version < "4.0" jeepney==0.8.0 ; python_version >= "3.9" and python_version < "4.0" and sys_platform == "linux" jinja2==3.1.5 ; python_version >= "3.9" and python_version < "4.0" jmespath==1.0.1 ; python_version >= "3.9" and python_version < "4.0" -josepy==1.15.0 ; python_version >= "3.9" and python_version < "4.0" +josepy==2.0.0 ; python_version >= "3.9" and python_version < "4.0" jsonlines==4.0.0 ; python_version >= "3.9" and python_version < "4.0" jsonpickle==4.0.1 ; python_version >= "3.9" and python_version < "4.0" keyring==25.6.0 ; python_version >= "3.9" and python_version < "4.0"