Standalone 2.0

This commit is contained in:
Jakub Warmuz 2015-09-26 16:47:29 +00:00
parent ef3605730c
commit faa6cbdd71
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
10 changed files with 228 additions and 907 deletions

View file

@ -19,8 +19,6 @@ Note, that all annotated challenges act as a proxy objects::
"""
import logging
import OpenSSL
from acme import challenges
from acme import jose
@ -56,10 +54,10 @@ class DVSNI(AnnotatedChallenge):
__slots__ = ('challb', 'domain', 'account_key')
acme_type = challenges.DVSNI
def gen_cert_and_response(self, key_pem=None, bits=2048, alg=jose.RS256):
def gen_cert_and_response(self, key=None, bits=2048, alg=jose.RS256):
"""Generate a DVSNI cert and response.
:param bytes key_pem: Private PEM-encoded key used for
:param OpenSSL.crypto.PKey key: Private key used for
certificate generation. If none provided, a fresh key will
be generated.
:param int bits: Number of bits for fresh key generation.
@ -67,23 +65,15 @@ class DVSNI(AnnotatedChallenge):
:returns: ``(response, cert_pem, key_pem)`` tuple, where
``response`` is an instance of
`acme.challenges.DVSNIResponse`, ``cert_pem`` is the
PEM-encoded certificate and ``key_pem`` is PEM-encoded
private key.
`acme.challenges.DVSNIResponse`, ``cert`` is a certificate
(`OpenSSL.crypto.X509`) and ``key`` is a private key
(`OpenSSL.crypto.PKey`).
:rtype: tuple
"""
key = None if key_pem is None else OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key_pem)
response = self.challb.chall.gen_response(self.account_key, alg=alg)
cert, key = response.gen_cert(key=key, bits=bits)
cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert)
key_pem = OpenSSL.crypto.dump_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key)
return response, cert_pem, key_pem
return response, cert, key
class SimpleHTTP(AnnotatedChallenge):

View file

@ -80,3 +80,13 @@ class NotSupportedError(PluginError):
class RevokerError(Error):
"""Let's Encrypt Revoker error."""
class StandaloneBindError(Error):
"""Standalone plugin bind error."""
def __init__(self, socket_error, port):
super(StandaloneBindError, self).__init__(
"Problem binding to port {0}: {1}".format(port, socket_error))
self.socket_error = socket_error
self.port = port

View file

@ -8,11 +8,11 @@ import zope.interface
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt.plugins.standalone import authenticator
from letsencrypt.plugins import standalone
EP_SA = pkg_resources.EntryPoint(
"sa", "letsencrypt.plugins.standalone.authenticator",
attrs=("StandaloneAuthenticator",),
"sa", "letsencrypt.plugins.standalone",
attrs=("Authenticator",),
dist=mock.MagicMock(key="letsencrypt"))
@ -71,8 +71,7 @@ class PluginEntryPointTest(unittest.TestCase):
self.assertTrue(self.plugin_ep.entry_point is EP_SA)
self.assertEqual("sa", self.plugin_ep.name)
self.assertTrue(
self.plugin_ep.plugin_cls is authenticator.StandaloneAuthenticator)
self.assertTrue(self.plugin_ep.plugin_cls is standalone.Authenticator)
def test_init(self):
config = mock.MagicMock()
@ -174,8 +173,7 @@ class PluginsRegistryTest(unittest.TestCase):
with mock.patch("letsencrypt.plugins.disco.pkg_resources") as mock_pkg:
mock_pkg.iter_entry_points.return_value = iter([EP_SA])
plugins = PluginsRegistry.find_all()
self.assertTrue(plugins["sa"].plugin_cls
is authenticator.StandaloneAuthenticator)
self.assertTrue(plugins["sa"].plugin_cls is standalone.Authenticator)
self.assertTrue(plugins["sa"].entry_point is EP_SA)
def test_getitem(self):

View file

@ -0,0 +1,200 @@
"""Standalone Authenticator."""
import collections
import functools
import logging
import random
import socket
import threading
from six.moves import BaseHTTPServer # pylint: disable=import-error
import OpenSSL
import zope.interface
from acme import challenges
from acme import crypto_util as acme_crypto_util
from acme import standalone as acme_standalone
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt.plugins import common
from letsencrypt.plugins import util
logger = logging.getLogger(__name__)
class ServerManager(object):
"""Standalone servers manager."""
def __init__(self, certs, simple_http_resources):
self.servers = {}
self.certs = certs
self.simple_http_resources = simple_http_resources
def run(self, port, tls):
"""Run ACME server on specified ``port``."""
if port in self.servers:
return self.servers[port]
logger.debug("Starting new server at %s (tls=%s)", port, tls)
handler = acme_standalone.ACMERequestHandler.partial_init(
self.simple_http_resources)
if tls:
cls = functools.partial(
acme_standalone.HTTPSServer, certs=self.certs)
else:
cls = BaseHTTPServer.HTTPServer
try:
server = cls(('', port), handler)
except socket.error as error:
errors.StandaloneBindError(error, port)
stop = threading.Event()
thread = threading.Thread(
target=self._serve,
args=(server, stop),
)
thread.start()
self.servers[port] = (server, thread, stop)
return self.servers[port]
def _serve(self, server, stop):
while not stop.is_set():
server.handle_request()
def stop(self, port):
"""Stop ACME server running on the specified ``port``."""
server, thread, stop = self.servers[port]
stop.set()
# dummy request to terminate last handle_request()
sock = socket.socket()
try:
sock.connect(server.socket.getsockname())
except socket.error:
pass # thread is probably already finished
finally:
sock.close()
thread.join()
del self.servers[port]
def items(self):
"""Return a list of all port, server tuples."""
return self.servers.items()
class Authenticator(common.Plugin):
"""Standalone Authenticator.
This authenticator creates its own ephemeral TCP listener on the
necessary port in order to respond to incoming DVSNI and SimpleHTTP
challenges from the certificate authority. Therefore, it does not
rely on any existing server program.
"""
zope.interface.implements(interfaces.IAuthenticator)
zope.interface.classProvides(interfaces.IPluginFactory)
description = "Standalone Authenticator"
supported_challenges = set([challenges.SimpleHTTP, challenges.DVSNI])
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
# one self-signed key for all DVSNI and SimpleHTTP certificates
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, bits=2048)
# TODO: generate only when the first SimpleHTTP challenge is solved
self.simple_http_cert = acme_crypto_util.gen_ss_cert(
self.key, domains=["temp server"])
self.responses = {}
self.servers = {}
self.served = collections.defaultdict(set)
# Stuff below is shared across threads (i.e. servers read
# values, main thread writes). Due to the nature of Cython's
# GIL, the operations are safe, c.f.
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
self.certs = {}
self.simple_http_resources = set()
self.servers = ServerManager(self.certs, self.simple_http_resources)
def more_info(self): # pylint: disable=missing-docstring
return self.__doc__
def prepare(self): # pylint: disable=missing-docstring
if any(util.already_listening(port) for port in
(self.config.dvsni_port, self.config.simple_http_port)):
raise errors.MisconfigurationError(
"One of the (possibly) required ports is already taken taken.")
# TODO: add --chall-pref flag
def get_chall_pref(self, domain):
# pylint: disable=unused-argument,missing-docstring
chall_pref = list(self.supported_challenges)
random.shuffle(chall_pref) # 50% for each challenge
return chall_pref
def perform(self, achalls): # pylint: disable=missing-docstring
try:
return self.perform2(achalls)
except errors.StandaloneBindError as error:
display = zope.component.getUtility(interfaces.IDisplay)
if error.socket_error.errno == socket.errno.EACCES:
display.notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
"aren't running this program as "
"root).".format(error.port))
elif error.socket_error.errno == socket.errno.EADDRINUSE:
display.notification(
"Could not bind TCP port {0} because it is already in "
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(error.port))
else:
raise # XXX: How to handle unknown errors in binding?
def perform2(self, achalls):
"""Perform achallenges without IDisplay interaction."""
responses = []
tls = not self.config.no_simple_http_tls
for achall in achalls:
if isinstance(achall, achallenges.SimpleHTTP):
server, _, _ = self.servers.run(self.config.simple_http_port, tls=tls)
response, validation = achall.gen_response_and_validation(tls=tls)
self.simple_http_resources.add(
acme_standalone.SimpleHTTPRequestHandler.SimpleHTTPResource(
chall=achall.chall, response=response,
validation=validation))
cert = self.simple_http_cert
domain = achall.domain
else: # DVSNI
server, _, _ = self.servers.run(self.config.dvsni_port, tls=True)
response, cert, _ = achall.gen_cert_and_response(self.key)
domain = response.z_domain
self.certs[domain] = (self.key, cert)
self.responses[achall] = response
self.served[server].add(achall)
responses.append(response)
return responses
def cleanup(self, achalls): # pylint: disable=missing-docstring
# reduce self.served and close servers if none challenges are served
for server, server_achalls in self.served.items():
for achall in achalls:
if achall in server_achalls:
server_achalls.remove(achall)
for port, (server, _, _) in self.servers.items():
if not self.served[server]:
self.servers.stop(port)

View file

@ -1 +0,0 @@
"""Let's Encrypt Standalone Authenticator plugin."""

View file

@ -1,394 +0,0 @@
"""Standalone authenticator."""
import os
import signal
import socket
import sys
import time
import OpenSSL
import zope.component
import zope.interface
from acme import challenges
from letsencrypt import achallenges
from letsencrypt import crypto_util
from letsencrypt import interfaces
from letsencrypt.plugins import common
class StandaloneAuthenticator(common.Plugin):
# pylint: disable=too-many-instance-attributes
"""Standalone authenticator.
This authenticator creates its own ephemeral TCP listener on the
specified port in order to respond to incoming DVSNI challenges from
the certificate authority. Therefore, it does not rely on any
existing server program.
:param OpenSSL.crypto.PKey private_key: DVSNI challenge certificate
key.
:param sni_names: Mapping from z_domain (`bytes`) to PEM-encoded
certificate (`bytes`).
"""
zope.interface.implements(interfaces.IAuthenticator)
zope.interface.classProvides(interfaces.IPluginFactory)
description = "Standalone Authenticator"
def __init__(self, *args, **kwargs):
super(StandaloneAuthenticator, self).__init__(*args, **kwargs)
self.child_pid = None
self.parent_pid = os.getpid()
self.subproc_state = None
self.tasks = {}
self.sni_names = {}
self.sock = None
self.connection = None
self.key_pem = crypto_util.make_key(bits=2048)
self.private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, self.key_pem)
self.ssl_conn = None
def prepare(self):
"""There is nothing left to setup.
.. todo:: This should probably do the port check
"""
def client_signal_handler(self, sig, unused_frame):
"""Signal handler for the parent process.
This handler receives inter-process communication from the
child process in the form of Unix signals.
:param int sig: Which signal the process received.
"""
# subprocess to client READY: SIGIO
# subprocess to client INUSE: SIGUSR1
# subprocess to client CANTBIND: SIGUSR2
if sig == signal.SIGIO:
self.subproc_state = "ready"
elif sig == signal.SIGUSR1:
self.subproc_state = "inuse"
elif sig == signal.SIGUSR2:
self.subproc_state = "cantbind"
else:
# NOTREACHED
raise ValueError("Unexpected signal in signal handler")
def subproc_signal_handler(self, sig, unused_frame):
"""Signal handler for the child process.
This handler receives inter-process communication from the parent
process in the form of Unix signals.
:param int sig: Which signal the process received.
"""
# client to subprocess CLEANUP : SIGINT
if sig == signal.SIGINT:
try:
self.ssl_conn.shutdown()
self.ssl_conn.close()
except BaseException:
# There might not even be any currently active SSL connection.
pass
try:
self.connection.close()
except BaseException:
# There might not even be any currently active connection.
pass
try:
self.sock.close()
except BaseException:
# Various things can go wrong in the course of closing these
# connections, but none of them can clearly be usefully
# reported here and none of them should impede us from
# exiting as gracefully as possible.
pass
os.kill(self.parent_pid, signal.SIGUSR1)
sys.exit(0)
def sni_callback(self, connection):
"""Used internally to respond to incoming SNI names.
This method will set a new OpenSSL context object for this
connection when an incoming connection provides an SNI name
(in order to serve the appropriate certificate, if any).
:param connection: The TLS connection object on which the SNI
extension was received.
:type connection: :class:`OpenSSL.Connection`
"""
sni_name = connection.get_servername()
if sni_name in self.sni_names:
pem_cert = self.sni_names[sni_name]
else:
# TODO: Should we really present a certificate if we get an
# unexpected SNI name? Or should we just disconnect?
pem_cert = next(self.sni_names.itervalues())
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem_cert)
new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
new_ctx.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda: False)
new_ctx.use_certificate(cert)
new_ctx.use_privatekey(self.private_key)
connection.set_context(new_ctx)
def do_parent_process(self, port, delay_amount=5):
"""Perform the parent process side of the TCP listener task.
This should only be called by :meth:`start_listener`. We will
wait up to delay_amount seconds to hear from the child process
via a signal.
:param int port: Which TCP port to bind.
:param float delay_amount: How long in seconds to wait for the
subprocess to notify us whether it succeeded.
:returns: ``True`` or ``False`` according to whether we were notified
that the child process succeeded or failed in binding the port.
:rtype: bool
"""
display = zope.component.getUtility(interfaces.IDisplay)
start_time = time.time()
while time.time() < start_time + delay_amount:
if self.subproc_state == "ready":
return True
elif self.subproc_state == "inuse":
display.notification(
"Could not bind TCP port {0} because it is already in "
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(port))
return False
elif self.subproc_state == "cantbind":
display.notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
"aren't running this program as "
"root).".format(port))
return False
time.sleep(0.1)
display.notification(
"Subprocess unexpectedly timed out while trying to bind TCP "
"port {0}.".format(port))
return False
def do_child_process(self, port):
"""Perform the child process side of the TCP listener task.
This should only be called by :meth:`start_listener`.
Normally does not return; instead, the child process exits from
within this function or from within the child process signal
handler.
:param int port: Which TCP port to bind.
"""
signal.signal(signal.SIGINT, self.subproc_signal_handler)
self.sock = socket.socket()
# SO_REUSEADDR flag tells the kernel to reuse a local socket
# in TIME_WAIT state, without waiting for its natural timeout
# to expire.
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.sock.bind(("0.0.0.0", port))
except socket.error, error:
if error.errno == socket.errno.EACCES:
# Signal permissions denied to bind TCP port
os.kill(self.parent_pid, signal.SIGUSR2)
elif error.errno == socket.errno.EADDRINUSE:
# Signal TCP port is already in use
os.kill(self.parent_pid, signal.SIGUSR1)
else:
# XXX: How to handle unknown errors in binding?
raise error
sys.exit(1)
# XXX: We could use poll mechanism to handle simultaneous
# XXX: rather than sequential inbound TCP connections here
self.sock.listen(1)
# Signal that we've successfully bound TCP port
os.kill(self.parent_pid, signal.SIGIO)
while True:
self.connection, _ = self.sock.accept()
# The code below uses the PyOpenSSL bindings to respond to
# the client. This may expose us to bugs and vulnerabilities
# in OpenSSL (and creates additional dependencies).
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
ctx.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda: False)
pem_cert = self.tasks.values()[0]
first_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem_cert)
ctx.use_certificate(first_cert)
ctx.use_privatekey(self.private_key)
ctx.set_cipher_list("HIGH")
ctx.set_tlsext_servername_callback(self.sni_callback)
self.ssl_conn = OpenSSL.SSL.Connection(ctx, self.connection)
self.ssl_conn.set_accept_state()
self.ssl_conn.do_handshake()
self.ssl_conn.shutdown()
self.ssl_conn.close()
def start_listener(self, port):
"""Start listener.
Create a child process which will start a TCP listener on the
specified port to perform the specified DVSNI challenges.
:param int port: The TCP port to bind.
:returns: ``True`` or ``False`` to indicate success or failure creating
the subprocess.
:rtype: bool
"""
# In order to avoid a race condition, we set the signal handler
# that will be needed by the parent process now, and undo this
# action if we turn out to be the child process. (This needs
# to happen before the fork because the child will send one of
# these signals to the parent almost immediately after the
# fork, and the parent must already be ready to receive it.)
signal.signal(signal.SIGIO, self.client_signal_handler)
signal.signal(signal.SIGUSR1, self.client_signal_handler)
signal.signal(signal.SIGUSR2, self.client_signal_handler)
sys.stdout.flush()
fork_result = os.fork()
if fork_result:
# PARENT process (still the Let's Encrypt client process)
self.child_pid = fork_result
# do_parent_process() can return True or False to indicate
# reported success or failure creating the listener.
return self.do_parent_process(port)
else:
# CHILD process (the TCP listener subprocess)
# Undo the parent's signal handler settings, which aren't
# applicable to us.
signal.signal(signal.SIGIO, signal.SIG_DFL)
signal.signal(signal.SIGUSR1, signal.SIG_DFL)
signal.signal(signal.SIGUSR2, signal.SIG_DFL)
self.child_pid = os.getpid()
# do_child_process() is normally not expected to return but
# should terminate via sys.exit().
return self.do_child_process(port)
# IAuthenticator method implementations follow
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Get challenge preferences.
IAuthenticator interface method get_chall_pref.
Return a list of challenge types that this authenticator
can perform for this domain. In the case of the
StandaloneAuthenticator, the only challenge type that can ever
be performed is dvsni.
:returns: A list containing only 'dvsni'.
"""
return [challenges.DVSNI]
def perform(self, achalls):
"""Perform the challenge.
.. warning::
For the StandaloneAuthenticator, because there is no convenient
way to add additional requests, this should only be invoked
once; subsequent invocations are an error. To perform
validations for multiple independent sets of domains, a separate
StandaloneAuthenticator should be instantiated.
"""
if self.child_pid or self.tasks:
# We should not be willing to continue with perform
# if there were existing pending challenges.
raise ValueError(".perform() was called with pending tasks!")
results_if_success = []
results_if_failure = []
if not achalls or not isinstance(achalls, list):
raise ValueError(".perform() was called without challenge list")
# TODO: "bits" should be user-configurable
for achall in achalls:
if isinstance(achall, achallenges.DVSNI):
# We will attempt to do it
response, cert_pem, _ = achall.gen_cert_and_response(
key_pem=self.key_pem)
self.sni_names[response.z_domain] = cert_pem
self.tasks[achall.token] = cert_pem
results_if_success.append(response)
results_if_failure.append(None)
else:
# We will not attempt to do this challenge because it
# is not a type we can handle
results_if_success.append(False)
results_if_failure.append(False)
if not self.tasks:
raise ValueError("nothing for .perform() to do")
if util.already_listening(self.config.dvsni_port):
# If we know a process is already listening on this port,
# tell the user, and don't even attempt to bind it. (This
# test is Linux-specific and won't indicate that the port
# is bound if invoked on a different operating system.)
return results_if_failure
# Try to do the authentication; note that this creates
# the listener subprocess via os.fork()
if self.start_listener(self.config.dvsni_port):
return results_if_success
else:
# TODO: This should probably raise a DVAuthError exception
# rather than returning a list of None objects.
return results_if_failure
def cleanup(self, achalls):
"""Clean up.
If some challenges are removed from the list, the authenticator
socket will still respond to those challenges. Once all
challenges have been removed from the list, the listener is
deactivated and stops listening.
"""
# Remove this from pending tasks list
for achall in achalls:
assert isinstance(achall, achallenges.DVSNI)
if achall.token in self.tasks:
del self.tasks[achall.token]
else:
# Could not find the challenge to remove!
raise ValueError("could not find the challenge to remove")
if self.child_pid and not self.tasks:
# There are no remaining challenges, so
# try to shutdown self.child_pid cleanly.
# TODO: ignore any signals from child during this process
os.kill(self.child_pid, signal.SIGINT)
time.sleep(1)
# TODO: restore original signal handlers in parent process
# by resetting their actions to SIG_DFL
# print "TCP listener subprocess has been told to shut down"
def more_info(self): # pylint: disable=no-self-use
"""Human-readable string that describes the Authenticator."""
return ("The Standalone Authenticator uses PyOpenSSL to listen "
"on port {port} and perform DVSNI challenges. Once a "
"certificate is attained, it will be saved in the "
"(TODO) current working directory.{linesep}{linesep}"
"TCP port {port} must be available in order to use the "
"Standalone Authenticator.".format(
linesep=os.linesep, port=self.config.dvsni_port))

View file

@ -1 +0,0 @@
"""Let's Encrypt Standalone Tests"""

View file

@ -1,483 +0,0 @@
"""Tests for letsencrypt.plugins.standalone.authenticator."""
import os
import psutil
import signal
import socket
import unittest
import mock
import OpenSSL
from acme import challenges
from acme import jose
from letsencrypt import achallenges
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
ACCOUNT_KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
CHALL_KEY_PEM = test_util.load_vector("rsa512_key_2.pem")
CHALL_KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, CHALL_KEY_PEM)
CONFIG = mock.Mock(dvsni_port=5001)
# Classes based on to allow interrupting infinite loop under test
# after one iteration, based on.
# http://igorsobreira.com/2013/03/17/testing-infinite-loops.html
class _SocketAcceptOnlyNTimes(object):
# pylint: disable=too-few-public-methods
"""
Callable that will raise `CallableExhausted`
exception after `limit` calls, modified to also return
a tuple simulating the return values of a socket.accept()
call
"""
def __init__(self, limit):
self.limit = limit
self.calls = 0
def __call__(self):
self.calls += 1
if self.calls > self.limit:
raise CallableExhausted
# Modified here for a single use as socket.accept()
return (mock.MagicMock(), "ignored")
class CallableExhausted(Exception):
# pylint: disable=too-few-public-methods
"""Exception raised when a method is called more than the
specified number of times."""
class ChallPrefTest(unittest.TestCase):
"""Tests for chall_pref() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
def test_chall_pref(self):
self.assertEqual(self.authenticator.get_chall_pref("example.com"),
[challenges.DVSNI])
class SNICallbackTest(unittest.TestCase):
"""Tests for sni_callback() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.cert = achallenges.DVSNI(
challb=acme_util.DVSNI_P,
domain="example.com",
account_key=ACCOUNT_KEY
).gen_cert_and_response(key_pem=CHALL_KEY_PEM)[1]
self.authenticator.private_key = CHALL_KEY
self.authenticator.sni_names = {"abcdef.acme.invalid": self.cert}
self.authenticator.child_pid = 12345
def test_real_servername(self):
connection = mock.MagicMock()
connection.get_servername.return_value = "abcdef.acme.invalid"
self.authenticator.sni_callback(connection)
self.assertEqual(connection.set_context.call_count, 1)
called_ctx = connection.set_context.call_args[0][0]
self.assertTrue(isinstance(called_ctx, OpenSSL.SSL.Context))
def test_fake_servername(self):
"""Test behavior of SNI callback when an unexpected name is received.
(Currently the expected behavior in this case is to return the
"first" certificate with which the listener was configured,
although they are stored in an unordered data structure so
this might not be the one that was first in the challenge list
passed to the perform method. In the future, this might result
in dropping the connection instead.)"""
connection = mock.MagicMock()
connection.get_servername.return_value = "example.com"
self.authenticator.sni_callback(connection)
self.assertEqual(connection.set_context.call_count, 1)
called_ctx = connection.set_context.call_args[0][0]
self.assertTrue(isinstance(called_ctx, OpenSSL.SSL.Context))
class ClientSignalHandlerTest(unittest.TestCase):
"""Tests for client_signal_handler() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
self.authenticator.child_pid = 12345
def test_client_signal_handler(self):
self.assertTrue(self.authenticator.subproc_state is None)
self.authenticator.client_signal_handler(signal.SIGIO, None)
self.assertEqual(self.authenticator.subproc_state, "ready")
self.authenticator.client_signal_handler(signal.SIGUSR1, None)
self.assertEqual(self.authenticator.subproc_state, "inuse")
self.authenticator.client_signal_handler(signal.SIGUSR2, None)
self.assertEqual(self.authenticator.subproc_state, "cantbind")
# Testing the unreached path for a signal other than these
# specified (which can't occur in normal use because this
# function is only set as a signal handler for the above three
# signals).
self.assertRaises(
ValueError, self.authenticator.client_signal_handler,
signal.SIGPIPE, None)
class SubprocSignalHandlerTest(unittest.TestCase):
"""Tests for subproc_signal_handler() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
self.authenticator.child_pid = 12345
self.authenticator.parent_pid = 23456
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@mock.patch("letsencrypt.plugins.standalone.authenticator.sys.exit")
def test_subproc_signal_handler(self, mock_exit, mock_kill):
self.authenticator.ssl_conn = mock.MagicMock()
self.authenticator.connection = mock.MagicMock()
self.authenticator.sock = mock.MagicMock()
self.authenticator.subproc_signal_handler(signal.SIGINT, None)
self.assertEquals(self.authenticator.ssl_conn.shutdown.call_count, 1)
self.assertEquals(self.authenticator.ssl_conn.close.call_count, 1)
self.assertEquals(self.authenticator.connection.close.call_count, 1)
self.assertEquals(self.authenticator.sock.close.call_count, 1)
mock_kill.assert_called_once_with(
self.authenticator.parent_pid, signal.SIGUSR1)
mock_exit.assert_called_once_with(0)
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@mock.patch("letsencrypt.plugins.standalone.authenticator.sys.exit")
def test_subproc_signal_handler_trouble(self, mock_exit, mock_kill):
"""Test attempting to shut down a non-existent connection.
(This could occur because none was established or active at the
time the signal handler tried to perform the cleanup)."""
self.authenticator.ssl_conn = mock.MagicMock()
self.authenticator.connection = mock.MagicMock()
self.authenticator.sock = mock.MagicMock()
# AttributeError simulates the case where one of these properties
# is None because no connection exists. We raise it for
# ssl_conn.close() instead of ssl_conn.shutdown() for better code
# coverage.
self.authenticator.ssl_conn.close.side_effect = AttributeError("!")
self.authenticator.connection.close.side_effect = AttributeError("!")
self.authenticator.sock.close.side_effect = AttributeError("!")
self.authenticator.subproc_signal_handler(signal.SIGINT, None)
self.assertEquals(self.authenticator.ssl_conn.shutdown.call_count, 1)
self.assertEquals(self.authenticator.ssl_conn.close.call_count, 1)
self.assertEquals(self.authenticator.connection.close.call_count, 1)
self.assertEquals(self.authenticator.sock.close.call_count, 1)
mock_kill.assert_called_once_with(
self.authenticator.parent_pid, signal.SIGUSR1)
mock_exit.assert_called_once_with(0)
class PerformTest(unittest.TestCase):
"""Tests for perform() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.achall1 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(token=b"foo"), "pending"),
domain="foo.example.com", account_key=ACCOUNT_KEY)
self.achall2 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(token=b"bar"), "pending"),
domain="bar.example.com", account_key=ACCOUNT_KEY)
bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.achalls = [self.achall1, self.achall2, bad_achall]
def test_perform_when_already_listening(self):
self.authenticator.already_listening = mock.Mock()
self.authenticator.already_listening.return_value = True
result = self.authenticator.perform([self.achall1])
self.assertEqual(result, [None])
def test_can_perform(self):
"""What happens if start_listener() returns True."""
self.authenticator.start_listener = mock.Mock()
self.authenticator.start_listener.return_value = True
self.authenticator.already_listening = mock.Mock(return_value=False)
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(self.achall1.token in self.authenticator.tasks)
self.assertTrue(self.achall2.token in self.authenticator.tasks)
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertTrue(isinstance(result[0], challenges.ChallengeResponse))
self.assertTrue(isinstance(result[1], challenges.ChallengeResponse))
self.assertFalse(result[2])
self.authenticator.start_listener.assert_called_once_with(
CONFIG.dvsni_port)
def test_cannot_perform(self):
"""What happens if start_listener() returns False."""
self.authenticator.start_listener = mock.Mock()
self.authenticator.start_listener.return_value = False
self.authenticator.already_listening = mock.Mock(return_value=False)
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(self.achall1.token in self.authenticator.tasks)
self.assertTrue(self.achall2.token in self.authenticator.tasks)
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertEqual(result, [None, None, False])
self.authenticator.start_listener.assert_called_once_with(
CONFIG.dvsni_port)
def test_perform_with_pending_tasks(self):
self.authenticator.tasks = {"footoken.acme.invalid": "cert_data"}
extra_achall = acme_util.DVSNI_P
self.assertRaises(
ValueError, self.authenticator.perform, [extra_achall])
def test_perform_without_challenge_list(self):
extra_achall = acme_util.DVSNI_P
# This is wrong because a challenge must be specified.
self.assertRaises(ValueError, self.authenticator.perform, [])
# This is wrong because it must be a list, not a bare challenge.
self.assertRaises(
ValueError, self.authenticator.perform, extra_achall)
# This is wrong because the list must contain at least one challenge.
self.assertRaises(
ValueError, self.authenticator.perform, range(20))
class StartListenerTest(unittest.TestCase):
"""Tests for start_listener() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.fork")
def test_start_listener_fork_parent(self, mock_fork):
self.authenticator.do_parent_process = mock.Mock()
self.authenticator.do_parent_process.return_value = True
mock_fork.return_value = 22222
result = self.authenticator.start_listener(1717)
# start_listener is expected to return the True or False return
# value from do_parent_process.
self.assertTrue(result)
self.assertEqual(self.authenticator.child_pid, 22222)
self.authenticator.do_parent_process.assert_called_once_with(1717)
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.fork")
def test_start_listener_fork_child(self, mock_fork):
self.authenticator.do_parent_process = mock.Mock()
self.authenticator.do_child_process = mock.Mock()
mock_fork.return_value = 0
self.authenticator.start_listener(1717)
self.assertEqual(self.authenticator.child_pid, os.getpid())
self.authenticator.do_child_process.assert_called_once_with(1717)
class DoParentProcessTest(unittest.TestCase):
"""Tests for do_parent_process() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"zope.component.getUtility")
def test_do_parent_process_ok(self, mock_get_utility):
self.authenticator.subproc_state = "ready"
result = self.authenticator.do_parent_process(1717)
self.assertTrue(result)
self.assertEqual(mock_get_utility.call_count, 1)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"zope.component.getUtility")
def test_do_parent_process_inuse(self, mock_get_utility):
self.authenticator.subproc_state = "inuse"
result = self.authenticator.do_parent_process(1717)
self.assertFalse(result)
self.assertEqual(mock_get_utility.call_count, 1)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"zope.component.getUtility")
def test_do_parent_process_cantbind(self, mock_get_utility):
self.authenticator.subproc_state = "cantbind"
result = self.authenticator.do_parent_process(1717)
self.assertFalse(result)
self.assertEqual(mock_get_utility.call_count, 1)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"zope.component.getUtility")
def test_do_parent_process_timeout(self, mock_get_utility):
# Normally times out in 5 seconds and returns False. We can
# now set delay_amount to a lower value so that it times out
# faster than it would under normal use.
result = self.authenticator.do_parent_process(1717, delay_amount=1)
self.assertFalse(result)
self.assertEqual(mock_get_utility.call_count, 1)
class DoChildProcessTest(unittest.TestCase):
"""Tests for do_child_process() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.cert = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(token=b"abcdef"), "pending"),
domain="example.com", account_key=ACCOUNT_KEY).gen_cert_and_response(
key_pem=CHALL_KEY_PEM)[1]
self.authenticator.private_key = CHALL_KEY
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
self.authenticator.parent_pid = 12345
@mock.patch("letsencrypt.plugins.standalone.authenticator.socket.socket")
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@mock.patch("letsencrypt.plugins.standalone.authenticator.sys.exit")
def test_do_child_process_cantbind1(
self, mock_exit, mock_kill, mock_socket):
mock_exit.side_effect = IndentationError("subprocess would exit here")
eaccess = socket.error(socket.errno.EACCES, "Permission denied")
sample_socket = mock.MagicMock()
sample_socket.bind.side_effect = eaccess
mock_socket.return_value = sample_socket
# Using the IndentationError as an error that cannot easily be
# generated at runtime, to indicate the behavior of sys.exit has
# taken effect without actually causing the test process to exit.
# (Just replacing it with a no-op causes logic errors because the
# do_child_process code assumes that calling sys.exit() will
# cause subsequent code not to be executed.)
self.assertRaises(
IndentationError, self.authenticator.do_child_process, 1717)
mock_exit.assert_called_once_with(1)
mock_kill.assert_called_once_with(12345, signal.SIGUSR2)
@mock.patch("letsencrypt.plugins.standalone.authenticator.socket.socket")
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@mock.patch("letsencrypt.plugins.standalone.authenticator.sys.exit")
def test_do_child_process_cantbind2(self, mock_exit, mock_kill,
mock_socket):
mock_exit.side_effect = IndentationError("subprocess would exit here")
eaccess = socket.error(socket.errno.EADDRINUSE, "Port already in use")
sample_socket = mock.MagicMock()
sample_socket.bind.side_effect = eaccess
mock_socket.return_value = sample_socket
self.assertRaises(
IndentationError, self.authenticator.do_child_process, 1717)
mock_exit.assert_called_once_with(1)
mock_kill.assert_called_once_with(12345, signal.SIGUSR1)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"socket.socket")
def test_do_child_process_cantbind3(self, mock_socket):
"""Test case where attempt to bind socket results in an unhandled
socket error. (The expected behavior is arguably wrong because it
will crash the program; the reason for the expected behavior is
that we don't have a way to report arbitrary socket errors.)"""
eio = socket.error(socket.errno.EIO, "Imaginary unhandled error")
sample_socket = mock.MagicMock()
sample_socket.bind.side_effect = eio
mock_socket.return_value = sample_socket
self.assertRaises(
socket.error, self.authenticator.do_child_process, 1717)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"OpenSSL.SSL.Connection")
@mock.patch("letsencrypt.plugins.standalone.authenticator.socket.socket")
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
def test_do_child_process_success(
self, mock_kill, mock_socket, mock_connection):
sample_socket = mock.MagicMock()
sample_socket.accept.side_effect = _SocketAcceptOnlyNTimes(2)
mock_socket.return_value = sample_socket
mock_connection.return_value = mock.MagicMock()
self.assertRaises(
CallableExhausted, self.authenticator.do_child_process, 1717)
mock_socket.assert_called_once_with()
sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717))
sample_socket.listen.assert_called_once_with(1)
self.assertEqual(sample_socket.accept.call_count, 3)
mock_kill.assert_called_once_with(12345, signal.SIGIO)
# TODO: We could have some tests about the fact that the listener
# asks OpenSSL to negotiate a TLS connection (and correctly
# sets the SNI callback function).
class CleanupTest(unittest.TestCase):
"""Tests for cleanup() method."""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.achall = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(token=b"footoken"), "pending"),
domain="foo.example.com", account_key="key")
self.authenticator.tasks = {self.achall.token: "stuff"}
self.authenticator.child_pid = 12345
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@mock.patch("letsencrypt.plugins.standalone.authenticator.time.sleep")
def test_cleanup(self, mock_sleep, mock_kill):
mock_sleep.return_value = None
mock_kill.return_value = None
self.authenticator.cleanup([self.achall])
mock_kill.assert_called_once_with(12345, signal.SIGINT)
mock_sleep.assert_called_once_with(1)
def test_bad_cleanup(self):
self.assertRaises(
ValueError, self.authenticator.cleanup, [achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(token=b"badtoken"), "pending"),
domain="bad.example.com", account_key="key")])
class MoreInfoTest(unittest.TestCase):
"""Tests for more_info() method. (trivially)"""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import (
StandaloneAuthenticator)
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
def test_more_info(self):
"""Make sure exceptions aren't raised."""
self.authenticator.more_info()
class InitTest(unittest.TestCase):
"""Tests for more_info() method. (trivially)"""
def setUp(self):
from letsencrypt.plugins.standalone.authenticator import (
StandaloneAuthenticator)
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
def test_prepare(self):
"""Make sure exceptions aren't raised.
.. todo:: Add on more once things are setup appropriately.
"""
self.authenticator.prepare()
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -1,6 +1,8 @@
"""Tests for letsencrypt.achallenges."""
import unittest
import OpenSSL
from acme import challenges
from acme import jose
@ -22,10 +24,10 @@ class DVSNITest(unittest.TestCase):
self.assertEqual(self.challb.token, self.achall.token)
def test_gen_cert_and_response(self):
response, cert_pem, key_pem = self.achall.gen_cert_and_response()
response, cert, key = self.achall.gen_cert_and_response()
self.assertTrue(isinstance(response, challenges.DVSNIResponse))
self.assertTrue(isinstance(cert_pem, bytes))
self.assertTrue(isinstance(key_pem, bytes))
self.assertTrue(isinstance(cert, OpenSSL.crypto.X509))
self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
if __name__ == "__main__":

View file

@ -42,6 +42,7 @@ install_requires = [
'python2-pythondialog>=3.2.2rc1', # Debian squeeze support, cf. #280
'pytz',
'requests',
'six',
'zope.component',
'zope.interface',
]
@ -119,8 +120,7 @@ setup(
'manual = letsencrypt.plugins.manual:ManualAuthenticator',
# TODO: null should probably not be presented to the user
'null = letsencrypt.plugins.null:Installer',
'standalone = letsencrypt.plugins.standalone.authenticator'
':StandaloneAuthenticator',
'standalone = letsencrypt.plugins.standalone:Authenticator',
],
},