mirror of
https://github.com/certbot/certbot.git
synced 2026-04-15 22:20:28 -04:00
Check version requirements on optional dependencies (#3618)
* Add and test activate function to acme. This function can be used to check if our optional dependencies are available and they meet our version requirements. * use activate in dns_resolver * use activate in dns_available() in challenges_test * Use activate in dns_resolver_test * Use activate in certbot.plugins.util_test * Use acme.util.activate for psutil * Better testing and handling of missing deps * Factored out *_available() code into a common function * Delayed exception caused from using acme.dns_resolver without dnspython until the function is called. This makes both production and testing code simpler. * Make a common subclass for already_listening tests * Simplify mocking of USE_PSUTIL in tests
This commit is contained in:
parent
20ac4aebaf
commit
052be6d4ba
10 changed files with 157 additions and 101 deletions
|
|
@ -9,6 +9,7 @@ from cryptography.hazmat.primitives import hashes
|
|||
import OpenSSL
|
||||
import requests
|
||||
|
||||
from acme import dns_resolver
|
||||
from acme import errors
|
||||
from acme import crypto_util
|
||||
from acme import fields
|
||||
|
|
@ -232,11 +233,11 @@ class DNS01Response(KeyAuthorizationChallengeResponse):
|
|||
logger.debug("Verifying %s at %s...", chall.typ, validation_domain_name)
|
||||
|
||||
try:
|
||||
from acme import dns_resolver
|
||||
except ImportError: # pragma: no cover
|
||||
txt_records = dns_resolver.txt_records_for_name(
|
||||
validation_domain_name)
|
||||
except errors.DependencyError:
|
||||
raise errors.DependencyError("Local validation for 'dns-01' "
|
||||
"challenges requires 'dnspython'")
|
||||
txt_records = dns_resolver.txt_records_for_name(validation_domain_name)
|
||||
exists = validation in txt_records
|
||||
if not exists:
|
||||
logger.debug("Key authorization from response (%r) doesn't match "
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from six.moves.urllib import parse as urllib_parse # pylint: disable=import-err
|
|||
from acme import errors
|
||||
from acme import jose
|
||||
from acme import test_util
|
||||
from acme.dns_resolver import DNS_REQUIREMENT
|
||||
|
||||
CERT = test_util.load_comparable_cert('cert.pem')
|
||||
KEY = jose.JWKRSA(key=test_util.load_rsa_private_key('rsa512_key.pem'))
|
||||
|
|
@ -76,20 +77,6 @@ class KeyAuthorizationChallengeResponseTest(unittest.TestCase):
|
|||
self.assertFalse(response.verify(self.chall, KEY.public_key()))
|
||||
|
||||
|
||||
def dns_available():
|
||||
"""Checks if dns can be imported.
|
||||
|
||||
:rtype: bool
|
||||
:returns: ``True`` if dns can be imported, otherwise, ``False``
|
||||
|
||||
"""
|
||||
try:
|
||||
import dns # pylint: disable=unused-variable
|
||||
except ImportError: # pragma: no cover
|
||||
return False
|
||||
return True # pragma: no cover
|
||||
|
||||
|
||||
class DNS01ResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
|
|
@ -122,7 +109,13 @@ class DNS01ResponseTest(unittest.TestCase):
|
|||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
|
||||
self.response.simple_verify(self.chall, "local", key2.public_key())
|
||||
|
||||
@test_util.skip_unless(dns_available(),
|
||||
@mock.patch('acme.dns_resolver.DNS_AVAILABLE', False)
|
||||
def test_simple_verify_without_dns(self):
|
||||
self.assertRaises(
|
||||
errors.DependencyError, self.response.simple_verify,
|
||||
self.chall, 'local', KEY.public_key())
|
||||
|
||||
@test_util.skip_unless(test_util.requirement_available(DNS_REQUIREMENT),
|
||||
"optional dependency dnspython is not available")
|
||||
def test_simple_verify_good_validation(self): # pragma: no cover
|
||||
with mock.patch(self.records_for_name_path) as mock_resolver:
|
||||
|
|
@ -133,7 +126,7 @@ class DNS01ResponseTest(unittest.TestCase):
|
|||
mock_resolver.assert_called_once_with(
|
||||
self.chall.validation_domain_name("local"))
|
||||
|
||||
@test_util.skip_unless(dns_available(),
|
||||
@test_util.skip_unless(test_util.requirement_available(DNS_REQUIREMENT),
|
||||
"optional dependency dnspython is not available")
|
||||
def test_simple_verify_good_validation_multitxts(self): # pragma: no cover
|
||||
with mock.patch(self.records_for_name_path) as mock_resolver:
|
||||
|
|
@ -144,7 +137,7 @@ class DNS01ResponseTest(unittest.TestCase):
|
|||
mock_resolver.assert_called_once_with(
|
||||
self.chall.validation_domain_name("local"))
|
||||
|
||||
@test_util.skip_unless(dns_available(),
|
||||
@test_util.skip_unless(test_util.requirement_available(DNS_REQUIREMENT),
|
||||
"optional dependency dnspython is not available")
|
||||
def test_simple_verify_bad_validation(self): # pragma: no cover
|
||||
with mock.patch(self.records_for_name_path) as mock_resolver:
|
||||
|
|
|
|||
|
|
@ -3,8 +3,20 @@ Required only for local validation of 'dns-01' challenges.
|
|||
"""
|
||||
import logging
|
||||
|
||||
import dns.resolver
|
||||
import dns.exception
|
||||
from acme import errors
|
||||
from acme import util
|
||||
|
||||
DNS_REQUIREMENT = 'dnspython>=1.12'
|
||||
|
||||
try:
|
||||
util.activate(DNS_REQUIREMENT)
|
||||
# pragma: no cover
|
||||
import dns.exception
|
||||
import dns.resolver
|
||||
DNS_AVAILABLE = True
|
||||
except errors.DependencyError: # pragma: no cover
|
||||
DNS_AVAILABLE = False
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -18,6 +30,9 @@ def txt_records_for_name(name):
|
|||
:rtype: list of unicode
|
||||
|
||||
"""
|
||||
if not DNS_AVAILABLE:
|
||||
raise errors.DependencyError(
|
||||
'{0} is required to use this function'.format(DNS_REQUIREMENT))
|
||||
try:
|
||||
dns_response = dns.resolver.query(name, 'TXT')
|
||||
except dns.resolver.NXDOMAIN as error:
|
||||
|
|
|
|||
|
|
@ -1,17 +1,16 @@
|
|||
"""Tests for acme.dns_resolver."""
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
from six.moves import reload_module # pylint: disable=import-error
|
||||
|
||||
from acme import errors
|
||||
from acme import test_util
|
||||
from acme.dns_resolver import DNS_REQUIREMENT
|
||||
|
||||
|
||||
try:
|
||||
if test_util.requirement_available(DNS_REQUIREMENT):
|
||||
import dns
|
||||
DNS_AVAILABLE = True # pragma: no cover
|
||||
except ImportError: # pragma: no cover
|
||||
DNS_AVAILABLE = False
|
||||
|
||||
|
||||
def create_txt_response(name, txt_records):
|
||||
|
|
@ -25,15 +24,18 @@ def create_txt_response(name, txt_records):
|
|||
return dns.rrset.from_text_list(name, 60, "IN", "TXT", txt_records)
|
||||
|
||||
|
||||
@test_util.skip_unless(DNS_AVAILABLE,
|
||||
"optional dependency dnspython is not available")
|
||||
class DnsResolverTestWithDns(unittest.TestCase):
|
||||
"""Tests for acme.dns_resolver when dns is available."""
|
||||
class TxtRecordsForNameTest(unittest.TestCase):
|
||||
"""Tests for acme.dns_resolver.txt_records_for_name."""
|
||||
@classmethod
|
||||
def _call(cls, name):
|
||||
from acme import dns_resolver
|
||||
return dns_resolver.txt_records_for_name(name)
|
||||
def _call(cls, *args, **kwargs):
|
||||
from acme.dns_resolver import txt_records_for_name
|
||||
return txt_records_for_name(*args, **kwargs)
|
||||
|
||||
|
||||
@test_util.skip_unless(test_util.requirement_available(DNS_REQUIREMENT),
|
||||
"optional dependency dnspython is not available")
|
||||
class TxtRecordsForNameWithDnsTest(TxtRecordsForNameTest):
|
||||
"""Tests for acme.dns_resolver.txt_records_for_name with dns."""
|
||||
@mock.patch("acme.dns_resolver.dns.resolver.query")
|
||||
def test_txt_records_for_name_with_single_response(self, mock_dns):
|
||||
mock_dns.return_value = create_txt_response('name', ['response'])
|
||||
|
|
@ -56,24 +58,19 @@ class DnsResolverTestWithDns(unittest.TestCase):
|
|||
self.assertEquals([], self._call('name'))
|
||||
|
||||
|
||||
class DnsResolverTestWithoutDns(unittest.TestCase):
|
||||
"""Tests for acme.dns_resolver when dns is unavailable."""
|
||||
class TxtRecordsForNameWithoutDnsTest(TxtRecordsForNameTest):
|
||||
"""Tests for acme.dns_resolver.txt_records_for_name without dns."""
|
||||
def setUp(self):
|
||||
self.dns_module = sys.modules['dns'] if 'dns' in sys.modules else None
|
||||
|
||||
if DNS_AVAILABLE:
|
||||
sys.modules['dns'] = None # pragma: no cover
|
||||
from acme import dns_resolver
|
||||
dns_resolver.DNS_AVAILABLE = False
|
||||
|
||||
def tearDown(self):
|
||||
if self.dns_module is not None:
|
||||
sys.modules['dns'] = self.dns_module # pragma: no cover
|
||||
from acme import dns_resolver
|
||||
reload_module(dns_resolver)
|
||||
|
||||
@classmethod
|
||||
def _import_dns(cls):
|
||||
import dns as failed_dns_import # pylint: disable=unused-variable
|
||||
|
||||
def test_import_error_is_raised(self):
|
||||
self.assertRaises(ImportError, self._import_dns)
|
||||
def test_exception_raised(self):
|
||||
self.assertRaises(
|
||||
errors.DependencyError, self._call, "example.org")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -11,7 +11,9 @@ from cryptography.hazmat.backends import default_backend
|
|||
from cryptography.hazmat.primitives import serialization
|
||||
import OpenSSL
|
||||
|
||||
from acme import errors
|
||||
from acme import jose
|
||||
from acme import util
|
||||
|
||||
|
||||
def vector_path(*names):
|
||||
|
|
@ -76,6 +78,20 @@ def load_pyopenssl_private_key(*names):
|
|||
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))
|
||||
|
||||
|
||||
def requirement_available(requirement):
|
||||
"""Checks if requirement can be imported.
|
||||
|
||||
:rtype: bool
|
||||
:returns: ``True`` iff requirement can be imported
|
||||
|
||||
"""
|
||||
try:
|
||||
util.activate(requirement)
|
||||
except errors.DependencyError: # pragma: no cover
|
||||
return False
|
||||
return True # pragma: no cover
|
||||
|
||||
|
||||
def skip_unless(condition, reason): # pragma: no cover
|
||||
"""Skip tests unless a condition holds.
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,25 @@
|
|||
"""ACME utilities."""
|
||||
import pkg_resources
|
||||
import six
|
||||
|
||||
from acme import errors
|
||||
|
||||
|
||||
def map_keys(dikt, func):
|
||||
"""Map dictionary keys."""
|
||||
return dict((func(key), value) for key, value in six.iteritems(dikt))
|
||||
|
||||
|
||||
def activate(requirement):
|
||||
"""Make requirement importable.
|
||||
|
||||
:param str requirement: the distribution and version to activate
|
||||
|
||||
:raises acme.errors.DependencyError: if cannot activate requirement
|
||||
|
||||
"""
|
||||
try:
|
||||
for distro in pkg_resources.require(requirement): # pylint: disable=not-callable
|
||||
distro.activate()
|
||||
except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict):
|
||||
raise errors.DependencyError('{0} is unavailable'.format(requirement))
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
"""Tests for acme.util."""
|
||||
import unittest
|
||||
|
||||
from acme import errors
|
||||
|
||||
|
||||
class MapKeysTest(unittest.TestCase):
|
||||
"""Tests for acme.util.map_keys."""
|
||||
|
|
@ -12,5 +14,21 @@ class MapKeysTest(unittest.TestCase):
|
|||
self.assertEqual({2: 2, 4: 4}, map_keys({1: 2, 3: 4}, lambda x: x + 1))
|
||||
|
||||
|
||||
class ActivateTest(unittest.TestCase):
|
||||
"""Tests for acme.util.activate."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs):
|
||||
from acme.util import activate
|
||||
return activate(*args, **kwargs)
|
||||
|
||||
def test_failure(self):
|
||||
self.assertRaises(errors.DependencyError, self._call, 'acme>99.0.0')
|
||||
|
||||
def test_success(self):
|
||||
self._call('acme')
|
||||
import acme as unused_acme
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -5,13 +5,19 @@ import socket
|
|||
|
||||
import zope.component
|
||||
|
||||
from acme import errors as acme_errors
|
||||
from acme import util as acme_util
|
||||
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
|
||||
PSUTIL_REQUIREMENT = "psutil>=2.2.1"
|
||||
|
||||
try:
|
||||
import psutil
|
||||
acme_util.activate(PSUTIL_REQUIREMENT)
|
||||
import psutil # pragma: no cover
|
||||
USE_PSUTIL = True
|
||||
except ImportError:
|
||||
except acme_errors.DependencyError: # pragma: no cover
|
||||
USE_PSUTIL = False
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
"""Tests for certbot.plugins.util."""
|
||||
import os
|
||||
import socket
|
||||
import unittest
|
||||
import sys
|
||||
|
||||
import mock
|
||||
from six.moves import reload_module # pylint: disable=import-error
|
||||
|
||||
from certbot.plugins.util import PSUTIL_REQUIREMENT
|
||||
from certbot.tests import test_util
|
||||
|
||||
|
||||
|
|
@ -34,71 +34,47 @@ class PathSurgeryTest(unittest.TestCase):
|
|||
self.assertTrue("/tmp" in os.environ["PATH"])
|
||||
|
||||
|
||||
class AlreadyListeningTestNoPsutil(unittest.TestCase):
|
||||
class AlreadyListeningTest(unittest.TestCase):
|
||||
"""Tests for certbot.plugins.already_listening."""
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs):
|
||||
from certbot.plugins.util import already_listening
|
||||
return already_listening(*args, **kwargs)
|
||||
|
||||
|
||||
class AlreadyListeningTestNoPsutil(AlreadyListeningTest):
|
||||
"""Tests for certbot.plugins.already_listening when
|
||||
psutil is not available"""
|
||||
def setUp(self):
|
||||
import certbot.plugins.util
|
||||
# Ensure we get importerror
|
||||
self.psutil = None
|
||||
if "psutil" in sys.modules:
|
||||
self.psutil = sys.modules['psutil']
|
||||
sys.modules['psutil'] = None
|
||||
# Reload hackery to ensure getting non-psutil version
|
||||
# loaded to memory
|
||||
reload_module(certbot.plugins.util)
|
||||
|
||||
def tearDown(self):
|
||||
# Need to reload the module to ensure
|
||||
# getting back to normal
|
||||
import certbot.plugins.util
|
||||
sys.modules["psutil"] = self.psutil
|
||||
reload_module(certbot.plugins.util)
|
||||
@classmethod
|
||||
def _call(cls, *args, **kwargs):
|
||||
with mock.patch("certbot.plugins.util.USE_PSUTIL", False):
|
||||
return super(
|
||||
AlreadyListeningTestNoPsutil, cls)._call(*args, **kwargs)
|
||||
|
||||
@mock.patch("certbot.plugins.util.zope.component.getUtility")
|
||||
def test_ports_available(self, mock_getutil):
|
||||
import certbot.plugins.util as plugins_util
|
||||
# Ensure we don't get error
|
||||
with mock.patch("socket.socket.bind"):
|
||||
self.assertFalse(plugins_util.already_listening(80))
|
||||
self.assertFalse(plugins_util.already_listening(80, True))
|
||||
self.assertFalse(self._call(80))
|
||||
self.assertFalse(self._call(80, True))
|
||||
self.assertEqual(mock_getutil.call_count, 0)
|
||||
|
||||
@mock.patch("certbot.plugins.util.zope.component.getUtility")
|
||||
def test_ports_blocked(self, mock_getutil):
|
||||
sys.modules["psutil"] = None
|
||||
import certbot.plugins.util as plugins_util
|
||||
import socket
|
||||
with mock.patch("socket.socket.bind", side_effect=socket.error):
|
||||
self.assertTrue(plugins_util.already_listening(80))
|
||||
self.assertTrue(plugins_util.already_listening(80, True))
|
||||
with mock.patch("socket.socket", side_effect=socket.error):
|
||||
self.assertFalse(plugins_util.already_listening(80))
|
||||
with mock.patch("certbot.plugins.util.socket.socket.bind") as mock_bind:
|
||||
mock_bind.side_effect = socket.error
|
||||
self.assertTrue(self._call(80))
|
||||
self.assertTrue(self._call(80, True))
|
||||
with mock.patch("certbot.plugins.util.socket.socket") as mock_socket:
|
||||
mock_socket.side_effect = socket.error
|
||||
self.assertFalse(self._call(80))
|
||||
self.assertEqual(mock_getutil.call_count, 2)
|
||||
|
||||
|
||||
def psutil_available():
|
||||
"""Checks if psutil can be imported.
|
||||
|
||||
:rtype: bool
|
||||
:returns: ``True`` if psutil can be imported, otherwise, ``False``
|
||||
|
||||
"""
|
||||
try:
|
||||
import psutil # pylint: disable=unused-variable
|
||||
except ImportError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@test_util.skip_unless(psutil_available(),
|
||||
@test_util.skip_unless(test_util.requirement_available(PSUTIL_REQUIREMENT),
|
||||
"optional dependency psutil is not available")
|
||||
class AlreadyListeningTestPsutil(unittest.TestCase):
|
||||
class AlreadyListeningTestPsutil(AlreadyListeningTest):
|
||||
"""Tests for certbot.plugins.already_listening."""
|
||||
def _call(self, *args, **kwargs):
|
||||
from certbot.plugins.util import already_listening
|
||||
return already_listening(*args, **kwargs)
|
||||
|
||||
@mock.patch("certbot.plugins.util.psutil.net_connections")
|
||||
@mock.patch("certbot.plugins.util.psutil.Process")
|
||||
@mock.patch("certbot.plugins.util.zope.component.getUtility")
|
||||
|
|
|
|||
|
|
@ -11,7 +11,9 @@ from cryptography.hazmat.backends import default_backend
|
|||
from cryptography.hazmat.primitives import serialization
|
||||
import OpenSSL
|
||||
|
||||
from acme import errors
|
||||
from acme import jose
|
||||
from acme import util
|
||||
|
||||
|
||||
def vector_path(*names):
|
||||
|
|
@ -76,6 +78,20 @@ def load_pyopenssl_private_key(*names):
|
|||
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))
|
||||
|
||||
|
||||
def requirement_available(requirement):
|
||||
"""Checks if requirement can be imported.
|
||||
|
||||
:rtype: bool
|
||||
:returns: ``True`` iff requirement can be imported
|
||||
|
||||
"""
|
||||
try:
|
||||
util.activate(requirement)
|
||||
except errors.DependencyError: # pragma: no cover
|
||||
return False
|
||||
return True # pragma: no cover
|
||||
|
||||
|
||||
def skip_unless(condition, reason): # pragma: no cover
|
||||
"""Skip tests unless a condition holds.
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue