mirror of
https://github.com/certbot/certbot.git
synced 2026-05-25 02:48:54 -04:00
Feature acme cryptography 2 (#10245)
redoing https://github.com/certbot/certbot/pull/10174 but lots of mergecommits and ff wanted; so test in a clean environment
This commit is contained in:
parent
7a90cdd231
commit
dd876a40ed
25 changed files with 176 additions and 504 deletions
|
|
@ -13,7 +13,7 @@ import requests
|
|||
from acme import errors
|
||||
from acme._internal.tests import test_util
|
||||
|
||||
CERT = test_util.load_comparable_cert('cert.pem')
|
||||
CERT = test_util.load_cert('cert.pem')
|
||||
KEY = jose.JWKRSA(key=test_util.load_rsa_private_key('rsa512_key.pem'))
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -11,8 +11,6 @@ import unittest
|
|||
from unittest import mock
|
||||
import warnings
|
||||
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
import pytest
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
|
@ -33,10 +31,10 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
|
|||
"""Tests for acme.crypto_util.SSLSocket/probe_sni."""
|
||||
|
||||
def setUp(self):
|
||||
self.cert = test_util.load_comparable_cert('rsa2048_cert.pem')
|
||||
self.cert = test_util.load_cert('rsa2048_cert.pem')
|
||||
key = test_util.load_pyopenssl_private_key('rsa2048_key.pem')
|
||||
# pylint: disable=protected-access
|
||||
certs = {b'foo': (key, self.cert.wrapped)}
|
||||
certs = {b'foo': (key, self.cert)}
|
||||
|
||||
from acme.crypto_util import SSLSocket
|
||||
|
||||
|
|
@ -58,8 +56,7 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
|
|||
|
||||
def _probe(self, name):
|
||||
from acme.crypto_util import probe_sni
|
||||
return jose.ComparableX509(probe_sni(
|
||||
name, host='127.0.0.1', port=self.port))
|
||||
return probe_sni(name, host='127.0.0.1', port=self.port)
|
||||
|
||||
def _start_server(self):
|
||||
self.server_thread.start()
|
||||
|
|
@ -97,48 +94,29 @@ class SSLSocketTest(unittest.TestCase):
|
|||
_ = SSLSocket(None)
|
||||
|
||||
|
||||
class PyOpenSSLCertOrReqAllNamesTest(unittest.TestCase):
|
||||
"""Test for acme.crypto_util._pyopenssl_cert_or_req_all_names."""
|
||||
class MiscTests(unittest.TestCase):
|
||||
|
||||
def test_dump_cryptography_chain(self):
|
||||
from acme.crypto_util import dump_cryptography_chain
|
||||
|
||||
cert1 = test_util.load_cert('rsa2048_cert.pem')
|
||||
cert2 = test_util.load_cert('rsa4096_cert.pem')
|
||||
|
||||
chain = [cert1, cert2]
|
||||
dumped = dump_cryptography_chain(chain)
|
||||
|
||||
# default is PEM encoding Encoding.PEM
|
||||
assert isinstance(dumped, bytes)
|
||||
|
||||
|
||||
class CryptographyCertOrReqSANTest(unittest.TestCase):
|
||||
"""Test for acme.crypto_util._cryptography_cert_or_req_san."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, loader, name):
|
||||
# pylint: disable=protected-access
|
||||
from acme.crypto_util import _pyopenssl_cert_or_req_all_names
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='acme.crypto_util._pyopenssl_cert_or_req_all_names is deprecated *'
|
||||
)
|
||||
return _pyopenssl_cert_or_req_all_names(loader(name))
|
||||
|
||||
def _call_cert(self, name):
|
||||
return self._call(test_util.load_cert, name)
|
||||
|
||||
def test_cert_one_san_no_common(self):
|
||||
assert self._call_cert('cert-nocn.der') == \
|
||||
['no-common-name.badssl.com']
|
||||
|
||||
def test_cert_no_sans_yes_common(self):
|
||||
assert self._call_cert('cert.pem') == ['example.com']
|
||||
|
||||
def test_cert_two_sans_yes_common(self):
|
||||
assert self._call_cert('cert-san.pem') == \
|
||||
['example.com', 'www.example.com']
|
||||
|
||||
|
||||
class PyOpenSSLCertOrReqSANTest(unittest.TestCase):
|
||||
"""Test for acme.crypto_util._pyopenssl_cert_or_req_san."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, loader, name):
|
||||
# pylint: disable=protected-access
|
||||
from acme.crypto_util import _pyopenssl_cert_or_req_san
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='acme.crypto_util._pyopenssl_cert_or_req_san is deprecated *'
|
||||
)
|
||||
return _pyopenssl_cert_or_req_san(loader(name))
|
||||
from acme.crypto_util import _cryptography_cert_or_req_san
|
||||
return _cryptography_cert_or_req_san(loader(name))
|
||||
|
||||
@classmethod
|
||||
def _get_idn_names(cls):
|
||||
|
|
@ -285,41 +263,6 @@ class GenMakeSelfSignedCertTest(unittest.TestCase):
|
|||
self.assertIn(extension, cert.extensions)
|
||||
|
||||
|
||||
class GenSsCertTest(unittest.TestCase):
|
||||
"""Test for gen_ss_cert (generation of self-signed cert)."""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.cert_count = 5
|
||||
self.serial_num: List[int] = []
|
||||
self.key = OpenSSL.crypto.PKey()
|
||||
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
|
||||
|
||||
def test_sn_collisions(self):
|
||||
from acme.crypto_util import gen_ss_cert
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
for _ in range(self.cert_count):
|
||||
cert = gen_ss_cert(self.key, ['dummy'], force_san=True,
|
||||
ips=[ipaddress.ip_address("10.10.10.10")])
|
||||
self.serial_num.append(cert.get_serial_number())
|
||||
assert len(set(self.serial_num)) >= self.cert_count
|
||||
|
||||
def test_no_name(self):
|
||||
from acme.crypto_util import gen_ss_cert
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
with pytest.raises(AssertionError):
|
||||
gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")])
|
||||
gen_ss_cert(self.key)
|
||||
|
||||
def test_no_ips(self):
|
||||
from acme.crypto_util import gen_ss_cert
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
gen_ss_cert(self.key, ['dummy'])
|
||||
|
||||
|
||||
class MakeCSRTest(unittest.TestCase):
|
||||
"""Test for standalone functions."""
|
||||
|
||||
|
|
@ -395,42 +338,5 @@ class MakeCSRTest(unittest.TestCase):
|
|||
make_csr(privkey_pem, ["a.example"])
|
||||
|
||||
|
||||
class DumpPyopensslChainTest(unittest.TestCase):
|
||||
"""Test for dump_pyopenssl_chain."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, loaded):
|
||||
# pylint: disable=protected-access
|
||||
from acme.crypto_util import dump_pyopenssl_chain
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='acme.crypto_util.dump_pyopenssl_chain is *'
|
||||
)
|
||||
return dump_pyopenssl_chain(loaded)
|
||||
|
||||
def test_dump_pyopenssl_chain(self):
|
||||
names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem']
|
||||
loaded = [test_util.load_cert(name) for name in names]
|
||||
length = sum(
|
||||
len(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
|
||||
for cert in loaded)
|
||||
assert len(self._call(loaded)) == length
|
||||
|
||||
def test_dump_pyopenssl_chain_wrapped(self):
|
||||
names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem']
|
||||
loaded = [test_util.load_cert(name) for name in names]
|
||||
wrap_func = jose.ComparableX509
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='The next major version of josepy *'
|
||||
)
|
||||
wrapped = [wrap_func(cert) for cert in loaded]
|
||||
dump_func = OpenSSL.crypto.dump_certificate
|
||||
length = sum(len(dump_func(OpenSSL.crypto.FILETYPE_PEM, cert)) for cert in loaded)
|
||||
assert len(self._call(wrapped)) == length
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
"""Tests for acme.jose shim."""
|
||||
import importlib
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
"""Tests for acme.messages."""
|
||||
import contextlib
|
||||
import sys
|
||||
from typing import Dict
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import warnings
|
||||
|
||||
import josepy as jose
|
||||
import pytest
|
||||
|
|
@ -12,8 +10,8 @@ import pytest
|
|||
from acme import challenges
|
||||
from acme._internal.tests import test_util
|
||||
|
||||
CERT = test_util.load_comparable_cert('cert.der')
|
||||
CSR = test_util.load_comparable_csr('csr.der')
|
||||
CERT = test_util.load_cert('cert.der')
|
||||
CSR = test_util.load_csr('csr.der')
|
||||
KEY = test_util.load_rsa_private_key('rsa512_key.pem')
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -144,8 +144,8 @@ class TLSALPN01ServerTest(unittest.TestCase):
|
|||
# cert = crypto_util.probe_sni(
|
||||
# b'localhost', host=host, port=port, timeout=1)
|
||||
# # Expect normal cert when connecting without ALPN.
|
||||
# self.assertEqual(jose.ComparableX509(cert),
|
||||
# jose.ComparableX509(self.certs[b'localhost'][1]))
|
||||
# self.assertEqual(cert,
|
||||
# self.certs[b'localhost'][1])
|
||||
|
||||
def test_challenge_certs(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
|
|
@ -153,7 +153,7 @@ class TLSALPN01ServerTest(unittest.TestCase):
|
|||
b'localhost', host=host, port=port, timeout=1,
|
||||
alpn_protocols=[b"acme-tls/1"])
|
||||
# Expect challenge cert when connecting with ALPN.
|
||||
assert cert.to_cryptography() == self.challenge_certs[b'localhost'][1]
|
||||
assert cert == self.challenge_certs[b'localhost'][1]
|
||||
|
||||
def test_bad_alpn(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
|
|
|
|||
|
|
@ -5,8 +5,9 @@
|
|||
"""
|
||||
import importlib.resources
|
||||
import os
|
||||
import sys
|
||||
from typing import Callable
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import josepy as jose
|
||||
|
|
@ -21,37 +22,36 @@ def load_vector(*names):
|
|||
return vector_ref.read_bytes()
|
||||
|
||||
|
||||
def _guess_loader(filename, loader_pem, loader_der):
|
||||
def _guess_loader(filename: str, loader_pem: Callable, loader_der: Callable) -> Callable:
|
||||
_, ext = os.path.splitext(filename)
|
||||
if ext.lower() == '.pem':
|
||||
if ext.lower() == ".pem":
|
||||
return loader_pem
|
||||
elif ext.lower() == '.der':
|
||||
elif ext.lower() == ".der":
|
||||
return loader_der
|
||||
raise ValueError("Loader could not be recognized based on extension") # pragma: no cover
|
||||
else: # pragma: no cover
|
||||
raise ValueError("Loader could not be recognized based on extension")
|
||||
|
||||
|
||||
def load_cert(*names):
|
||||
def _guess_pyopenssl_loader(filename: str, loader_pem: int, loader_der: int) -> int:
|
||||
_, ext = os.path.splitext(filename)
|
||||
if ext.lower() == ".pem":
|
||||
return loader_pem
|
||||
else: # pragma: no cover
|
||||
raise ValueError("Loader could not be recognized based on extension")
|
||||
|
||||
|
||||
def load_cert(*names: str) -> x509.Certificate:
|
||||
"""Load certificate."""
|
||||
loader = _guess_loader(
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_certificate(loader, load_vector(*names))
|
||||
names[-1], x509.load_pem_x509_certificate, x509.load_der_x509_certificate
|
||||
)
|
||||
return loader(load_vector(*names))
|
||||
|
||||
|
||||
def load_comparable_cert(*names):
|
||||
"""Load ComparableX509 cert."""
|
||||
return jose.ComparableX509(load_cert(*names))
|
||||
|
||||
|
||||
def load_csr(*names):
|
||||
def load_csr(*names: str) -> x509.CertificateSigningRequest:
|
||||
"""Load certificate request."""
|
||||
loader = _guess_loader(
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_certificate_request(loader, load_vector(*names))
|
||||
|
||||
|
||||
def load_comparable_csr(*names):
|
||||
"""Load ComparableX509 certificate request."""
|
||||
return jose.ComparableX509(load_csr(*names))
|
||||
loader = _guess_loader(names[-1], x509.load_pem_x509_csr, x509.load_der_x509_csr)
|
||||
return loader(load_vector(*names))
|
||||
|
||||
|
||||
def load_rsa_private_key(*names):
|
||||
|
|
@ -72,6 +72,6 @@ def load_ecdsa_private_key(*names):
|
|||
|
||||
def load_pyopenssl_private_key(*names):
|
||||
"""Load pyOpenSSL private key."""
|
||||
loader = _guess_loader(
|
||||
loader = _guess_pyopenssl_loader(
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_privatekey(loader, load_vector(*names))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
"""Tests for acme.util."""
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
|
|||
|
|
@ -419,7 +419,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
|||
return hashlib.sha256(self.key_authorization.encode('utf-8')).digest()
|
||||
|
||||
def gen_cert(self, domain: str, key: Optional[crypto.PKey] = None, bits: int = 2048
|
||||
) -> Tuple[crypto.X509, crypto.PKey]:
|
||||
) -> Tuple[x509.Certificate, crypto.PKey]:
|
||||
"""Generate tls-alpn-01 certificate.
|
||||
|
||||
:param str domain: Domain verified by the challenge.
|
||||
|
|
@ -428,7 +428,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
|||
fresh key will be generated.
|
||||
:param int bits: Number of bits for newly generated key.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
|
||||
:rtype: `tuple` of `x509.Certificate` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
if key is None:
|
||||
|
|
@ -450,10 +450,10 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
|||
force_san=True,
|
||||
extensions=[acme_extension]
|
||||
)
|
||||
return crypto.X509.from_cryptography(cert), key
|
||||
return cert, key
|
||||
|
||||
def probe_cert(self, domain: str, host: Optional[str] = None,
|
||||
port: Optional[int] = None) -> crypto.X509:
|
||||
port: Optional[int] = None) -> x509.Certificate:
|
||||
"""Probe tls-alpn-01 challenge certificate.
|
||||
|
||||
:param str domain: domain being validated, required.
|
||||
|
|
@ -470,20 +470,17 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
|||
return crypto_util.probe_sni(host=host.encode(), port=port, name=domain.encode(),
|
||||
alpn_protocols=[self.ACME_TLS_1_PROTOCOL])
|
||||
|
||||
def verify_cert(self, domain: str, cert: Union[x509.Certificate, crypto.X509]) -> bool:
|
||||
def verify_cert(self, domain: str, cert: x509.Certificate, ) -> bool:
|
||||
"""Verify tls-alpn-01 challenge certificate.
|
||||
|
||||
:param str domain: Domain name being validated.
|
||||
:param cert: Challenge certificate.
|
||||
:type cert: `cryptography.x509.Certificate` or `OpenSSL.crypto.X509`
|
||||
:type cert: `cryptography.x509.Certificate`
|
||||
|
||||
:returns: Whether the certificate was successfully verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not isinstance(cert, x509.Certificate):
|
||||
cert = cert.to_cryptography()
|
||||
|
||||
names = crypto_util.get_names_from_subject_and_extensions(
|
||||
cert.subject, cert.extensions
|
||||
)
|
||||
|
|
@ -506,7 +503,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
|||
|
||||
# pylint: disable=too-many-arguments
|
||||
def simple_verify(self, chall: 'TLSALPN01', domain: str, account_public_key: jose.JWK,
|
||||
cert: Optional[crypto.X509] = None, host: Optional[str] = None,
|
||||
cert: Optional[x509.Certificate] = None, host: Optional[str] = None,
|
||||
port: Optional[int] = None) -> bool:
|
||||
"""Simple verify.
|
||||
|
||||
|
|
@ -516,7 +513,7 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
|||
:param .challenges.TLSALPN01 chall: Corresponding challenge.
|
||||
:param str domain: Domain name being validated.
|
||||
:param JWK account_public_key:
|
||||
:param OpenSSL.crypto.X509 cert: Optional certificate. If not
|
||||
:param x509.Certificate cert: Optional certificate. If not
|
||||
provided (``None``) certificate will be retrieved using
|
||||
`probe_cert`.
|
||||
:param string host: IP address used to probe the certificate.
|
||||
|
|
@ -547,7 +544,8 @@ class TLSALPN01(KeyAuthorizationChallenge):
|
|||
response_cls = TLSALPN01Response
|
||||
typ = response_cls.typ
|
||||
|
||||
def validation(self, account_key: jose.JWK, **kwargs: Any) -> Tuple[crypto.X509, crypto.PKey]:
|
||||
def validation(self, account_key: jose.JWK,
|
||||
**kwargs: Any) -> Tuple[x509.Certificate, crypto.PKey]:
|
||||
"""Generate validation.
|
||||
|
||||
:param JWK account_key:
|
||||
|
|
@ -556,7 +554,7 @@ class TLSALPN01(KeyAuthorizationChallenge):
|
|||
in certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
|
||||
:rtype: `tuple` of `x509.Certificate` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
# TODO: Remove cast when response() is generic.
|
||||
|
|
|
|||
|
|
@ -14,12 +14,10 @@ from typing import Optional
|
|||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
from cryptography import x509
|
||||
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from requests.utils import parse_header_links
|
||||
|
|
@ -227,12 +225,8 @@ class ClientV2:
|
|||
:returns: updated order
|
||||
:rtype: messages.OrderResource
|
||||
"""
|
||||
csr = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, orderr.csr_pem)
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings('ignore',
|
||||
message='The next major version of josepy will remove josepy.util.ComparableX509')
|
||||
wrapped_csr = messages.CertificateRequest(csr=jose.ComparableX509(csr))
|
||||
csr = x509.load_pem_x509_csr(orderr.csr_pem)
|
||||
wrapped_csr = messages.CertificateRequest(csr=csr)
|
||||
res = self._post(orderr.body.finalize, wrapped_csr)
|
||||
orderr = orderr.update(body=messages.Order.from_json(res.json()))
|
||||
return orderr
|
||||
|
|
@ -285,11 +279,10 @@ class ClientV2:
|
|||
self.begin_finalization(orderr)
|
||||
return self.poll_finalization(orderr, deadline, fetch_alternative_chains)
|
||||
|
||||
def revoke(self, cert: jose.ComparableX509, rsn: int) -> None:
|
||||
def revoke(self, cert: x509.Certificate, rsn: int) -> None:
|
||||
"""Revoke certificate.
|
||||
|
||||
:param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in
|
||||
`.ComparableX509`
|
||||
:param x509.Certificate cert: `x509.Certificate`
|
||||
|
||||
:param int rsn: Reason code for certificate revocation.
|
||||
|
||||
|
|
@ -477,11 +470,10 @@ class ClientV2:
|
|||
|
||||
return datetime.datetime.now() + datetime.timedelta(seconds=seconds)
|
||||
|
||||
def _revoke(self, cert: jose.ComparableX509, rsn: int, url: str) -> None:
|
||||
def _revoke(self, cert: x509.Certificate, rsn: int, url: str) -> None:
|
||||
"""Revoke certificate.
|
||||
|
||||
:param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in
|
||||
`.ComparableX509`
|
||||
:param .x509.Certificate cert: `x509.Certificate`
|
||||
|
||||
:param int rsn: Reason code for certificate revocation.
|
||||
|
||||
|
|
|
|||
|
|
@ -1,16 +1,15 @@
|
|||
"""Crypto utilities."""
|
||||
import binascii
|
||||
import contextlib
|
||||
import enum
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import ipaddress
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import typing
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import List
|
||||
from typing import Literal
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
|
|
@ -21,12 +20,11 @@ from typing import Union
|
|||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec, ed25519, ed448, types
|
||||
import josepy as jose
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from OpenSSL import crypto
|
||||
from OpenSSL import SSL
|
||||
|
||||
from acme import errors
|
||||
import warnings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -49,13 +47,13 @@ class Format(enum.IntEnum):
|
|||
DER = crypto.FILETYPE_ASN1
|
||||
PEM = crypto.FILETYPE_PEM
|
||||
|
||||
def to_cryptography_encoding(self) -> serialization.Encoding:
|
||||
def to_cryptography_encoding(self) -> Encoding:
|
||||
"""Converts the Format to the corresponding cryptography `Encoding`.
|
||||
"""
|
||||
if self == Format.DER:
|
||||
return serialization.Encoding.DER
|
||||
return Encoding.DER
|
||||
else:
|
||||
return serialization.Encoding.PEM
|
||||
return Encoding.PEM
|
||||
|
||||
|
||||
_KeyAndCert = Union[
|
||||
|
|
@ -74,6 +72,7 @@ class _DefaultCertSelection:
|
|||
return self.certs.get(server_name, None)
|
||||
return None # pragma: no cover
|
||||
|
||||
|
||||
class SSLSocket: # pylint: disable=too-few-public-methods
|
||||
"""SSL wrapper for sockets.
|
||||
|
||||
|
|
@ -135,6 +134,8 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
new_context = SSL.Context(self.method)
|
||||
new_context.set_min_proto_version(SSL.TLS1_2_VERSION)
|
||||
new_context.use_privatekey(key)
|
||||
if isinstance(cert, x509.Certificate):
|
||||
cert = crypto.X509.from_cryptography(cert)
|
||||
new_context.use_certificate(cert)
|
||||
if self.alpn_selection is not None:
|
||||
new_context.set_alpn_select_callback(self.alpn_selection)
|
||||
|
|
@ -196,7 +197,7 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
|
||||
def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # pylint: disable=too-many-arguments
|
||||
method: int = _DEFAULT_SSL_METHOD, source_address: Tuple[str, int] = ('', 0),
|
||||
alpn_protocols: Optional[Sequence[bytes]] = None) -> crypto.X509:
|
||||
alpn_protocols: Optional[Sequence[bytes]] = None) -> x509.Certificate:
|
||||
"""Probe SNI server for SSL certificate.
|
||||
|
||||
:param bytes name: Byte string to send as the server name in the
|
||||
|
|
@ -214,7 +215,7 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
|
|||
:raises acme.errors.Error: In case of any problems.
|
||||
|
||||
:returns: SSL certificate presented by the server.
|
||||
:rtype: OpenSSL.crypto.X509
|
||||
:rtype: cryptography.x509.Certificate
|
||||
|
||||
"""
|
||||
context = SSL.Context(method)
|
||||
|
|
@ -248,7 +249,7 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
|
|||
raise errors.Error(error)
|
||||
cert = client_ssl.get_peer_certificate()
|
||||
assert cert # Appease mypy. We would have crashed out by now if there was no certificate.
|
||||
return cert
|
||||
return cert.to_cryptography()
|
||||
|
||||
|
||||
# Even *more* annoyingly, due to a mypy bug, we can't use Union[] types in
|
||||
|
|
@ -321,7 +322,7 @@ def make_csr(
|
|||
)
|
||||
|
||||
csr = builder.sign(private_key, hashes.SHA256())
|
||||
return csr.public_bytes(serialization.Encoding.PEM)
|
||||
return csr.public_bytes(Encoding.PEM)
|
||||
|
||||
|
||||
def get_names_from_subject_and_extensions(
|
||||
|
|
@ -358,32 +359,16 @@ def get_names_from_subject_and_extensions(
|
|||
return [cns[0]] + [d for d in dns_names if d != cns[0]]
|
||||
|
||||
|
||||
def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req]
|
||||
) -> List[str]:
|
||||
"""
|
||||
Deprecated
|
||||
.. deprecated: 3.2.1
|
||||
"""
|
||||
warnings.warn(
|
||||
"acme.crypto_util._pyopenssl_cert_or_req_all_names is deprecated and "
|
||||
"will be removed in the next major release of Certbot.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
cert_or_req = loaded_cert_or_req.to_cryptography()
|
||||
return get_names_from_subject_and_extensions(
|
||||
cert_or_req.subject, cert_or_req.extensions
|
||||
)
|
||||
|
||||
|
||||
def _pyopenssl_cert_or_req_san(cert_or_req: Union[crypto.X509, crypto.X509Req]) -> List[str]:
|
||||
def _cryptography_cert_or_req_san(
|
||||
cert_or_req: Union[x509.Certificate, x509.CertificateSigningRequest],
|
||||
) -> List[str]:
|
||||
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.
|
||||
|
||||
.. note:: Although this is `acme` internal API, it is used by
|
||||
`letsencrypt`.
|
||||
|
||||
:param cert_or_req: Certificate or CSR.
|
||||
:type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.
|
||||
:type cert_or_req: `x509.Certificate` or `x509.CertificateSigningRequest`.
|
||||
|
||||
:returns: A list of Subject Alternative Names that is DNS.
|
||||
:rtype: `list` of `str`
|
||||
|
|
@ -391,13 +376,8 @@ def _pyopenssl_cert_or_req_san(cert_or_req: Union[crypto.X509, crypto.X509Req])
|
|||
Deprecated
|
||||
.. deprecated: 3.2.1
|
||||
"""
|
||||
warnings.warn(
|
||||
"acme.crypto_util._pyopenssl_cert_or_req_san is deprecated and "
|
||||
"will be removed in the next major release of Certbot.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
exts = cert_or_req.to_cryptography().extensions
|
||||
# ???: is this translation needed?
|
||||
exts = cert_or_req.extensions
|
||||
try:
|
||||
san_ext = exts.get_extension_for_class(x509.SubjectAlternativeName)
|
||||
except x509.ExtensionNotFound:
|
||||
|
|
@ -486,85 +466,13 @@ def make_self_signed_cert(private_key: types.CertificateIssuerPrivateKeyTypes,
|
|||
return builder.sign(private_key, hashes.SHA256())
|
||||
|
||||
|
||||
def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
|
||||
not_before: Optional[int] = None,
|
||||
validity: int = (7 * 24 * 60 * 60), force_san: bool = True,
|
||||
extensions: Optional[List[crypto.X509Extension]] = None,
|
||||
ips: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None
|
||||
) -> crypto.X509:
|
||||
"""Generate new self-signed certificate.
|
||||
|
||||
:type domains: `list` of `str`
|
||||
:param OpenSSL.crypto.PKey key:
|
||||
:param bool force_san:
|
||||
:param extensions: List of additional extensions to include in the cert.
|
||||
:type extensions: `list` of `OpenSSL.crypto.X509Extension`
|
||||
:type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`)
|
||||
|
||||
If more than one domain is provided, all of the domains are put into
|
||||
``subjectAltName`` X.509 extension and first domain is set as the
|
||||
subject CN. If only one domain is provided no ``subjectAltName``
|
||||
extension is used, unless `force_san` is ``True``.
|
||||
|
||||
.. deprecated: 2.10.0
|
||||
"""
|
||||
warnings.warn(
|
||||
"acme.crypto_util.gen_ss_cert is deprecated and will be removed in the "
|
||||
"next major release of Certbot. Please use "
|
||||
"acme.crypto_util.make_self_signed_cert instead.", DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
assert domains or ips, "Must provide one or more hostnames or IPs for the cert."
|
||||
|
||||
cert = crypto.X509()
|
||||
cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16))
|
||||
cert.set_version(2)
|
||||
|
||||
if extensions is None:
|
||||
extensions = []
|
||||
if domains is None:
|
||||
domains = []
|
||||
if ips is None:
|
||||
ips = []
|
||||
extensions.append(
|
||||
crypto.X509Extension(
|
||||
b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
|
||||
)
|
||||
|
||||
if len(domains) > 0:
|
||||
cert.get_subject().CN = domains[0]
|
||||
# TODO: what to put into cert.get_subject()?
|
||||
cert.set_issuer(cert.get_subject())
|
||||
|
||||
sanlist = []
|
||||
for address in domains:
|
||||
sanlist.append('DNS:' + address)
|
||||
for ip in ips:
|
||||
sanlist.append('IP:' + ip.exploded)
|
||||
san_string = ', '.join(sanlist).encode('ascii')
|
||||
if force_san or len(domains) > 1 or len(ips) > 0:
|
||||
extensions.append(crypto.X509Extension(
|
||||
b"subjectAltName",
|
||||
critical=False,
|
||||
value=san_string
|
||||
))
|
||||
|
||||
cert.add_extensions(extensions)
|
||||
|
||||
cert.gmtime_adj_notBefore(0 if not_before is None else not_before)
|
||||
cert.gmtime_adj_notAfter(validity)
|
||||
|
||||
cert.set_pubkey(key)
|
||||
cert.sign(key, "sha256")
|
||||
return cert
|
||||
|
||||
|
||||
def dump_pyopenssl_chain(chain: Union[List[jose.ComparableX509], List[crypto.X509]],
|
||||
filetype: Union[Format, int] = Format.PEM) -> bytes:
|
||||
def dump_cryptography_chain(
|
||||
chain: List[x509.Certificate],
|
||||
encoding: Literal[Encoding.PEM, Encoding.DER] = Encoding.PEM,
|
||||
) -> bytes:
|
||||
"""Dump certificate chain into a bundle.
|
||||
|
||||
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
|
||||
:class:`josepy.util.ComparableX509`).
|
||||
:param list chain: List of `cryptography.x509.Certificate`.
|
||||
|
||||
:returns: certificate chain bundle
|
||||
:rtype: bytes
|
||||
|
|
@ -572,24 +480,12 @@ def dump_pyopenssl_chain(chain: Union[List[jose.ComparableX509], List[crypto.X50
|
|||
Deprecated
|
||||
.. deprecated: 3.2.1
|
||||
"""
|
||||
warnings.warn(
|
||||
"acme.crypto_util.dump_pyopenssl_chain is deprecated and "
|
||||
"will be removed in the next major release of Certbot.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
# XXX: returns empty string when no chain is available, which
|
||||
# shuts up RenewableCert, but might not be the best solution...
|
||||
|
||||
filetype = Format(filetype)
|
||||
def _dump_cert(cert: Union[jose.ComparableX509, crypto.X509]) -> bytes:
|
||||
if isinstance(cert, jose.ComparableX509):
|
||||
if isinstance(cert.wrapped, crypto.X509Req):
|
||||
raise errors.Error("Unexpected CSR provided.") # pragma: no cover
|
||||
cert = cert.wrapped
|
||||
def _dump_cert(cert: x509.Certificate) -> bytes:
|
||||
return cert.public_bytes(encoding)
|
||||
|
||||
return cert.to_cryptography().public_bytes(filetype.to_cryptography_encoding())
|
||||
|
||||
# assumes that OpenSSL.crypto.dump_certificate includes ending
|
||||
# assumes that x509.Certificate.public_bytes includes ending
|
||||
# newline character
|
||||
return b"".join(_dump_cert(cert) for cert in chain)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,8 @@ from typing import Optional
|
|||
from typing import Tuple
|
||||
from typing import Type
|
||||
from typing import TypeVar
|
||||
import warnings
|
||||
|
||||
from cryptography import x509
|
||||
|
||||
import josepy as jose
|
||||
|
||||
|
|
@ -580,45 +581,33 @@ class AuthorizationResource(ResourceWithURI):
|
|||
class CertificateRequest(jose.JSONObjectWithFields):
|
||||
"""ACME newOrder request.
|
||||
|
||||
:ivar jose.ComparableX509 csr:
|
||||
`OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
|
||||
:ivar x509.CertificateSigningRequest csr: `x509.CertificateSigningRequest`
|
||||
|
||||
"""
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings('ignore',
|
||||
message='The next major version of josepy will remove josepy.util.ComparableX509')
|
||||
csr: jose.ComparableX509 = jose.field(
|
||||
'csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
|
||||
csr: x509.CertificateSigningRequest = jose.field(
|
||||
'csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
|
||||
|
||||
|
||||
class CertificateResource(ResourceWithURI):
|
||||
"""Certificate Resource.
|
||||
|
||||
:ivar josepy.util.ComparableX509 body:
|
||||
`OpenSSL.crypto.X509` wrapped in `.ComparableX509`
|
||||
:ivar x509.Certificate body: `x509.Certificate`
|
||||
:ivar str cert_chain_uri: URI found in the 'up' ``Link`` header
|
||||
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
|
||||
|
||||
"""
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings('ignore',
|
||||
message='The next major version of josepy will remove josepy.util.ComparableX509')
|
||||
cert_chain_uri: str = jose.field('cert_chain_uri')
|
||||
cert_chain_uri: str = jose.field('cert_chain_uri')
|
||||
authzrs: Tuple[AuthorizationResource, ...] = jose.field('authzrs')
|
||||
|
||||
|
||||
class Revocation(jose.JSONObjectWithFields):
|
||||
"""Revocation message.
|
||||
|
||||
:ivar jose.ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in
|
||||
`jose.ComparableX509`
|
||||
:ivar x509.Certificate certificate: `x509.Certificate`
|
||||
|
||||
"""
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings('ignore',
|
||||
message='The next major version of josepy will remove josepy.util.ComparableX509')
|
||||
certificate: jose.ComparableX509 = jose.field(
|
||||
'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert)
|
||||
certificate: x509.Certificate = jose.field(
|
||||
'certificate', decoder=jose.decode_cert, encoder=jose.encode_cert)
|
||||
reason: int = jose.field('reason')
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -200,11 +200,7 @@ def example_http():
|
|||
|
||||
# Revoke certificate
|
||||
|
||||
fullchain_com = jose.ComparableX509(
|
||||
OpenSSL.crypto.X509.from_cryptography(
|
||||
x509.load_pem_x509_certificate(fullchain_pem)
|
||||
)
|
||||
)
|
||||
fullchain_com = x509.load_pem_x509_certificate(fullchain_pem)
|
||||
|
||||
try:
|
||||
client_acme.revoke(fullchain_com, 0) # revocation reason = 0
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
import sys
|
||||
|
||||
from setuptools import find_packages
|
||||
from setuptools import setup
|
||||
|
||||
|
|
@ -7,9 +5,7 @@ version = '4.0.0.dev0'
|
|||
|
||||
install_requires = [
|
||||
'cryptography>=43.0.0',
|
||||
# Josepy 2+ may introduce backward incompatible changes by droping usage of
|
||||
# deprecated PyOpenSSL APIs.
|
||||
'josepy>=1.13.0, <2',
|
||||
'josepy>=2.0.0',
|
||||
# PyOpenSSL>=25.0.0 is just needed to satisfy mypy right now so this dependency can probably be
|
||||
# relaxed to >=24.0.0 if needed.
|
||||
'PyOpenSSL>=25.0.0',
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ class Validator:
|
|||
name = name if isinstance(name, bytes) else name.encode()
|
||||
|
||||
try:
|
||||
presented_cert = crypto_util.probe_sni(name, host, port).to_cryptography()
|
||||
presented_cert = crypto_util.probe_sni(name, host, port)
|
||||
except acme_errors.Error as error:
|
||||
logger.exception(str(error))
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -15,6 +15,14 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
|
|||
renewal at 30 days before expiration. The config field renew_before_expiry
|
||||
still overrides this default.
|
||||
|
||||
* removed `acme.crypto_util._pyopenssl_cert_or_req_all_names`
|
||||
* removed `acme.crypto_util._pyopenssl_cert_or_req_san`
|
||||
* removed `acme.crypto_util.dump_pyopenssl_chain`
|
||||
* removed `acme.crypto_util.gen_ss_cert`
|
||||
* removed `certbot.crypto_util.dump_pyopenssl_chain`
|
||||
* removed `certbot.crypto_util.pyopenssl_load_certificate`
|
||||
|
||||
|
||||
### Fixed
|
||||
|
||||
* Moved `RewriteEngine on` directive added during apache http01 authentication
|
||||
|
|
@ -33,9 +41,9 @@ More details about these changes can be found on our GitHub repo.
|
|||
|
||||
* The --register-unsafely-without-email flag is no longer needed in non-interactive mode.
|
||||
* In interactive mode, pressing Enter at the email prompt will register without an email.
|
||||
* deprecated `acme.crypto_util.dump_pyopenssl_chain`
|
||||
* deprecated `acme.crypto_util._pyopenssl_cert_or_req_all_names`
|
||||
* deprecated `acme.crypto_util._pyopenssl_cert_or_req_san`
|
||||
* deprecated `acme.crypto_util.dump_pyopenssl_chain`
|
||||
* deprecated `certbot.crypto_util.dump_pyopenssl_chain`
|
||||
* deprecated `certbot.crypto_util.pyopenssl_load_certificate`
|
||||
|
||||
|
|
|
|||
|
|
@ -48,8 +48,9 @@ from certbot.interfaces import AccountStorage
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def acme_from_config_key(config: configuration.NamespaceConfig, key: jose.JWK,
|
||||
regr: Optional[messages.RegistrationResource] = None
|
||||
def acme_from_config_key(config: configuration.NamespaceConfig,
|
||||
key: jose.JWK,
|
||||
regr: Optional[messages.RegistrationResource] = None,
|
||||
) -> acme_client.ClientV2:
|
||||
"""Wrangle ACME client construction"""
|
||||
if key.typ == 'EC':
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ from typing import TypeVar
|
|||
from typing import Union
|
||||
|
||||
import configobj
|
||||
from cryptography import x509
|
||||
import josepy as jose
|
||||
from josepy import b64
|
||||
|
||||
|
|
@ -1364,10 +1365,10 @@ def revoke(config: configuration.NamespaceConfig,
|
|||
acme = client.acme_from_config_key(config, acc.key, acc.regr)
|
||||
|
||||
with open(config.cert_path, 'rb') as f:
|
||||
cert = crypto_util.pyopenssl_load_certificate(f.read())[0]
|
||||
cert = x509.load_pem_x509_certificate(f.read())
|
||||
logger.debug("Reason code for revocation: %s", config.reason)
|
||||
try:
|
||||
acme.revoke(jose.ComparableX509(cert), config.reason)
|
||||
acme.revoke(cert, config.reason)
|
||||
_delete_if_appropriate(config)
|
||||
except acme_errors.ClientError as e:
|
||||
return str(e)
|
||||
|
|
|
|||
|
|
@ -1106,8 +1106,7 @@ class RenewableCert(interfaces.RenewableCert):
|
|||
logger.debug("Writing chain to %s.", target["chain"])
|
||||
f_b.write(chain)
|
||||
with open(target["fullchain"], "wb") as f_b:
|
||||
# assumes that OpenSSL.crypto.dump_certificate includes
|
||||
# ending newline character
|
||||
# assumes the cert includes ending newline character
|
||||
logger.debug("Writing full chain to %s.", target["fullchain"])
|
||||
f_b.write(cert + chain)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,19 +1,17 @@
|
|||
"""Tests for certbot.crypto_util."""
|
||||
import binascii
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import warnings
|
||||
|
||||
import OpenSSL
|
||||
import pytest
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot.compat import filesystem
|
||||
|
|
@ -143,7 +141,7 @@ class ImportCSRFileTest(unittest.TestCase):
|
|||
data = test_util.load_vector('csr_512.der')
|
||||
data_pem = test_util.load_vector('csr_512.pem')
|
||||
|
||||
assert (OpenSSL.crypto.FILETYPE_PEM,
|
||||
assert (acme_crypto_util.Format.PEM,
|
||||
util.CSR(file=csrfile,
|
||||
data=data_pem,
|
||||
form="pem"),
|
||||
|
|
@ -154,7 +152,7 @@ class ImportCSRFileTest(unittest.TestCase):
|
|||
csrfile = test_util.vector_path('csr_512.pem')
|
||||
data = test_util.load_vector('csr_512.pem')
|
||||
|
||||
assert (OpenSSL.crypto.FILETYPE_PEM,
|
||||
assert (acme_crypto_util.Format.PEM,
|
||||
util.CSR(file=csrfile,
|
||||
data=data,
|
||||
form="pem"),
|
||||
|
|
@ -396,39 +394,8 @@ class GetNamesFromReqTest(unittest.TestCase):
|
|||
self._call(test_util.load_vector('csr-6sans_512.pem'))
|
||||
|
||||
def test_der(self):
|
||||
from OpenSSL.crypto import FILETYPE_ASN1
|
||||
assert ['Example.com'] == \
|
||||
self._call(test_util.load_vector('csr_512.der'), typ=FILETYPE_ASN1)
|
||||
|
||||
|
||||
class CertLoaderTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.pyopenssl_load_certificate"""
|
||||
|
||||
def test_load_valid_cert(self):
|
||||
from certbot.crypto_util import pyopenssl_load_certificate
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='certbot.crypto_util.pyopenssl_load_certificate is *'
|
||||
)
|
||||
cert, file_type = pyopenssl_load_certificate(CERT)
|
||||
|
||||
assert file_type == OpenSSL.crypto.FILETYPE_PEM
|
||||
assert binascii.unhexlify(
|
||||
cert.digest("sha256").replace(b":", b"")
|
||||
) == x509.load_pem_x509_certificate(CERT).fingerprint(hashes.SHA256())
|
||||
|
||||
def test_load_invalid_cert(self):
|
||||
from certbot.crypto_util import pyopenssl_load_certificate
|
||||
bad_cert_data = CERT.replace(b"BEGIN CERTIFICATE", b"ASDFASDFASDF!!!")
|
||||
with pytest.raises(errors.Error):
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='certbot.crypto_util.pyopenssl_load_certificate is *'
|
||||
)
|
||||
pyopenssl_load_certificate(bad_cert_data)
|
||||
self._call(test_util.load_vector('csr_512.der'), typ=acme_crypto_util.Format.DER)
|
||||
|
||||
|
||||
class NotBeforeTest(unittest.TestCase):
|
||||
|
|
@ -460,20 +427,19 @@ class Sha256sumTest(unittest.TestCase):
|
|||
class CertAndChainFromFullchainTest(unittest.TestCase):
|
||||
"""Tests for certbot.crypto_util.cert_and_chain_from_fullchain"""
|
||||
|
||||
def _parse_and_reencode_pem(self, cert_pem):
|
||||
from OpenSSL import crypto
|
||||
return crypto.dump_certificate(crypto.FILETYPE_PEM,
|
||||
crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)).decode()
|
||||
def _parse_and_reencode_pem(self, cert_pem:str)->str:
|
||||
cert = x509.load_pem_x509_certificate(cert_pem.encode())
|
||||
return cert.public_bytes(Encoding.PEM).decode()
|
||||
|
||||
def test_cert_and_chain_from_fullchain(self):
|
||||
cert_pem = CERT.decode()
|
||||
chain_pem = cert_pem + SS_CERT.decode()
|
||||
fullchain_pem = cert_pem + chain_pem
|
||||
spacey_fullchain_pem = cert_pem + u'\n' + chain_pem
|
||||
crlf_fullchain_pem = fullchain_pem.replace(u'\n', u'\r\n')
|
||||
cert_pem: str = CERT.decode()
|
||||
chain_pem: str = cert_pem + SS_CERT.decode()
|
||||
fullchain_pem: str = cert_pem + chain_pem
|
||||
spacey_fullchain_pem: str = cert_pem + u'\n' + chain_pem
|
||||
crlf_fullchain_pem: str = fullchain_pem.replace(u'\n', u'\r\n')
|
||||
|
||||
# In the ACME v1 code path, the fullchain is constructed by loading cert+chain DERs
|
||||
# and using OpenSSL to dump them, so here we confirm that OpenSSL is producing certs
|
||||
# and using OpenSSL to dump them, so here we confirm that cryptography is producing certs
|
||||
# that will be parseable by cert_and_chain_from_fullchain.
|
||||
acmev1_fullchain_pem = self._parse_and_reencode_pem(cert_pem) + \
|
||||
self._parse_and_reencode_pem(cert_pem) + self._parse_and_reencode_pem(SS_CERT.decode())
|
||||
|
|
|
|||
|
|
@ -14,15 +14,14 @@ import traceback
|
|||
from typing import List
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import warnings
|
||||
|
||||
import configobj
|
||||
from cryptography import x509
|
||||
import josepy as jose
|
||||
import pytest
|
||||
import pytz
|
||||
|
||||
from acme.messages import Error as acme_error
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
|
|
@ -219,6 +218,7 @@ class RunTest(test_util.ConfigTestCase):
|
|||
with pytest.raises(errors.NotSupportedError):
|
||||
main.run(self.config, plugins)
|
||||
|
||||
|
||||
class CertonlyTest(unittest.TestCase):
|
||||
"""Tests for certbot._internal.main.certonly."""
|
||||
|
||||
|
|
@ -436,12 +436,7 @@ class RevokeTest(test_util.TempDirTestCase):
|
|||
config = cli.prepare_and_parse_args(plugins, args)
|
||||
|
||||
from certbot._internal.main import revoke
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='certbot.crypto_util.pyopenssl_load_certificate is *'
|
||||
)
|
||||
revoke(config, plugins)
|
||||
revoke(config, plugins)
|
||||
|
||||
@mock.patch('certbot._internal.main._delete_if_appropriate')
|
||||
@mock.patch('certbot._internal.main.client.acme_client')
|
||||
|
|
@ -1801,23 +1796,16 @@ class MainTest(test_util.ConfigTestCase):
|
|||
mock_delete_if_appropriate):
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
server = 'foo.bar'
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='certbot.crypto_util.pyopenssl_load_certificate is *'
|
||||
)
|
||||
self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH,
|
||||
'--server', server, 'revoke'])
|
||||
with open(RSA2048_KEY_PATH, 'rb') as f:
|
||||
assert mock_acme_client.ClientV2.call_count == 1
|
||||
assert mock_acme_client.ClientNetwork.call_args[0][0] == \
|
||||
jose.JWK.load(f.read())
|
||||
with open(SS_CERT_PATH, 'rb') as f:
|
||||
cert = crypto_util.pyopenssl_load_certificate(f.read())[0]
|
||||
mock_revoke = mock_acme_client.ClientV2().revoke
|
||||
mock_revoke.assert_called_once_with(
|
||||
jose.ComparableX509(cert),
|
||||
mock.ANY)
|
||||
self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH,
|
||||
'--server', server, 'revoke'])
|
||||
with open(RSA2048_KEY_PATH, 'rb') as f:
|
||||
assert mock_acme_client.ClientV2.call_count == 1
|
||||
assert mock_acme_client.ClientNetwork.call_args[0][0] == \
|
||||
jose.JWK.load(f.read())
|
||||
with open(SS_CERT_PATH, 'rb') as f:
|
||||
cert = x509.load_pem_x509_certificate(f.read())
|
||||
mock_revoke = mock_acme_client.ClientV2().revoke
|
||||
mock_revoke.assert_called_once_with(cert, mock.ANY)
|
||||
|
||||
def test_revoke_with_key_mismatch(self):
|
||||
server = 'foo.bar'
|
||||
|
|
@ -1831,18 +1819,11 @@ class MainTest(test_util.ConfigTestCase):
|
|||
mock_delete_if_appropriate):
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
mock_determine_account.return_value = (mock.MagicMock(), None)
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='certbot.crypto_util.pyopenssl_load_certificate is *'
|
||||
)
|
||||
_, _, _, client = self._call(['--cert-path', CERT, 'revoke'])
|
||||
with open(CERT) as f:
|
||||
cert = crypto_util.pyopenssl_load_certificate(f.read())[0]
|
||||
mock_revoke = client.acme_from_config_key().revoke
|
||||
mock_revoke.assert_called_once_with(
|
||||
jose.ComparableX509(cert),
|
||||
mock.ANY)
|
||||
_, _, _, client = self._call(['--cert-path', CERT, 'revoke'])
|
||||
with open(CERT, 'rb') as f:
|
||||
cert = x509.load_pem_x509_certificate(f.read())
|
||||
mock_revoke = client.acme_from_config_key().revoke
|
||||
mock_revoke.assert_called_once_with(cert, mock.ANY)
|
||||
|
||||
@mock.patch('certbot._internal.log.post_arg_parse_setup')
|
||||
def test_register(self, _):
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ from typing import Set
|
|||
from typing import Tuple
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
|
@ -32,8 +31,6 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
|
|||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from cryptography.hazmat.primitives.serialization import NoEncryption
|
||||
from cryptography.hazmat.primitives.serialization import PrivateFormat
|
||||
import josepy
|
||||
from OpenSSL import crypto
|
||||
from OpenSSL import SSL
|
||||
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
|
|
@ -391,32 +388,6 @@ def verify_fullchain(renewable_cert: interfaces.RenewableCert) -> None:
|
|||
raise e
|
||||
|
||||
|
||||
def pyopenssl_load_certificate(data: bytes) -> Tuple[crypto.X509, int]:
|
||||
"""Load PEM/DER certificate.
|
||||
|
||||
:raises errors.Error:
|
||||
|
||||
Deprecated
|
||||
.. deprecated: 3.2.1
|
||||
"""
|
||||
warnings.warn(
|
||||
"certbot.crypto_util.pyopenssl_load_certificate is deprecated and "
|
||||
"will be removed in the next major release of Certbot.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
|
||||
openssl_errors = []
|
||||
|
||||
for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
|
||||
try:
|
||||
return crypto.load_certificate(file_type, data), file_type
|
||||
except crypto.Error as error: # TODO: other errors?
|
||||
openssl_errors.append(error)
|
||||
raise errors.Error("Unable to load: {0}".format(",".join(
|
||||
str(error) for error in openssl_errors)))
|
||||
|
||||
|
||||
def get_sans_from_cert(
|
||||
cert: bytes, typ: Union[acme_crypto_util.Format, int] = acme_crypto_util.Format.PEM
|
||||
) -> List[str]:
|
||||
|
|
@ -491,29 +462,6 @@ def get_names_from_req(
|
|||
)
|
||||
|
||||
|
||||
def dump_pyopenssl_chain(
|
||||
chain: Union[List[crypto.X509], List[josepy.ComparableX509]],
|
||||
filetype: Union[acme_crypto_util.Format, int] = acme_crypto_util.Format.PEM,
|
||||
) -> bytes:
|
||||
"""Dump certificate chain into a bundle.
|
||||
|
||||
:param list chain: List of `crypto.X509` (or wrapped in
|
||||
:class:`josepy.util.ComparableX509`).
|
||||
|
||||
Deprecated
|
||||
.. deprecated: 3.2.1
|
||||
"""
|
||||
warnings.warn(
|
||||
"certbot.crypto_util.dump_pyopenssl_chain is deprecated and "
|
||||
"will be removed in the next major release of Certbot.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
# XXX: returns empty string when no chain is available, which
|
||||
# shuts up RenewableCert, but might not be the best solution...
|
||||
return acme_crypto_util.dump_pyopenssl_chain(chain, filetype)
|
||||
|
||||
|
||||
def notBefore(cert_path: str) -> datetime.datetime:
|
||||
"""When does the cert at cert_path start being valid?
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ from typing import Union
|
|||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
||||
|
|
@ -98,7 +99,7 @@ def load_vector(*names: str) -> bytes:
|
|||
return data
|
||||
|
||||
|
||||
def _guess_loader(filename: str, loader_pem: int, loader_der: int) -> int:
|
||||
def _guess_loader(filename: str, loader_pem: Callable, loader_der: Callable) -> Callable:
|
||||
_, ext = os.path.splitext(filename)
|
||||
if ext.lower() == '.pem':
|
||||
return loader_pem
|
||||
|
|
@ -107,23 +108,12 @@ def _guess_loader(filename: str, loader_pem: int, loader_der: int) -> int:
|
|||
raise ValueError("Loader could not be recognized based on extension") # pragma: no cover
|
||||
|
||||
|
||||
def load_cert(*names: str) -> crypto.X509:
|
||||
def load_cert(*names: str) -> x509.Certificate:
|
||||
"""Load certificate."""
|
||||
loader = _guess_loader(
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_certificate(loader, load_vector(*names))
|
||||
|
||||
|
||||
def load_csr(*names: str) -> crypto.X509Req:
|
||||
"""Load certificate request."""
|
||||
loader = _guess_loader(
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_certificate_request(loader, load_vector(*names))
|
||||
|
||||
|
||||
def load_comparable_csr(*names: str) -> jose.ComparableX509:
|
||||
"""Load ComparableX509 certificate request."""
|
||||
return jose.ComparableX509(load_csr(*names))
|
||||
names[-1], x509.load_pem_x509_certificate, x509.load_der_x509_certificate
|
||||
)
|
||||
return loader(load_vector(*names))
|
||||
|
||||
|
||||
def load_jose_rsa_private_key_pem(*names: str) -> jose.ComparableRSAKey:
|
||||
|
|
@ -131,9 +121,19 @@ def load_jose_rsa_private_key_pem(*names: str) -> jose.ComparableRSAKey:
|
|||
return jose.ComparableRSAKey(load_rsa_private_key_pem(*names))
|
||||
|
||||
|
||||
def _guess_loader_pyopenssl(filename: str, loader_pem: int, loader_der: int) -> int:
|
||||
# note: used by `load_rsa_private_key_pem`
|
||||
_, ext = os.path.splitext(filename)
|
||||
if ext.lower() == '.pem':
|
||||
return loader_pem
|
||||
elif ext.lower() == '.der':
|
||||
return loader_der
|
||||
raise ValueError("Loader could not be recognized based on extension") # pragma: no cover
|
||||
|
||||
|
||||
def load_rsa_private_key_pem(*names: str) -> RSAPrivateKey:
|
||||
"""Load RSA private key."""
|
||||
loader = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
loader = _guess_loader_pyopenssl(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
loader_fn: Callable[..., Any]
|
||||
if loader == crypto.FILETYPE_PEM:
|
||||
loader_fn = serialization.load_pem_private_key
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import re
|
|||
from setuptools import find_packages
|
||||
from setuptools import setup
|
||||
|
||||
|
||||
def read_file(filename, encoding='utf8'):
|
||||
"""Read unicode from given file."""
|
||||
with codecs.open(filename, encoding=encoding) as fd:
|
||||
|
|
@ -33,9 +34,7 @@ install_requires = [
|
|||
'cryptography>=43.0.0',
|
||||
'distro>=1.0.1',
|
||||
'importlib_metadata>=4.6; python_version < "3.10"',
|
||||
# Josepy 2+ may introduce backward incompatible changes by droping usage of
|
||||
# deprecated PyOpenSSL APIs.
|
||||
'josepy>=1.13.0, <2',
|
||||
'josepy>=2.0.0',
|
||||
'parsedatetime>=2.4',
|
||||
'pyrfc3339',
|
||||
'pytz>=2019.3',
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ iniconfig==2.0.0 ; python_version >= "3.9" and python_version < "3.10"
|
|||
ipaddress==1.0.16 ; python_version >= "3.9" and python_version < "3.10"
|
||||
isort==6.0.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
jmespath==0.10.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
josepy==1.15.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
josepy==2.0.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
jsonlines==4.0.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
mccabe==0.7.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
mypy-extensions==1.0.0 ; python_version >= "3.9" and python_version < "3.10"
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ jedi==0.19.2 ; python_version >= "3.9" and python_version < "4.0"
|
|||
jeepney==0.8.0 ; python_version >= "3.9" and python_version < "4.0" and sys_platform == "linux"
|
||||
jinja2==3.1.5 ; python_version >= "3.9" and python_version < "4.0"
|
||||
jmespath==1.0.1 ; python_version >= "3.9" and python_version < "4.0"
|
||||
josepy==1.15.0 ; python_version >= "3.9" and python_version < "4.0"
|
||||
josepy==2.0.0 ; python_version >= "3.9" and python_version < "4.0"
|
||||
jsonlines==4.0.0 ; python_version >= "3.9" and python_version < "4.0"
|
||||
jsonpickle==4.0.1 ; python_version >= "3.9" and python_version < "4.0"
|
||||
keyring==25.6.0 ; python_version >= "3.9" and python_version < "4.0"
|
||||
|
|
|
|||
Loading…
Reference in a new issue