pylint network2/messages2/fields

This commit is contained in:
Jakub Warmuz 2015-03-26 11:02:42 +00:00
parent ede635ad99
commit d304f53895
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
4 changed files with 27 additions and 27 deletions

View file

@ -28,7 +28,7 @@ logging.debug(regr)
authzr = net.request_challenges(
identifier=messages2.Identifier(
typ=messages2.IdentifierFQDN, value='example1.com'),
typ=messages2.IDENTIFIER_FQDN, value='example1.com'),
regr=regr)
logging.debug(authzr)

View file

@ -8,7 +8,7 @@ class RFC3339Field(jose.Field):
"""RFC3339 field encoder/decoder"""
@classmethod
def default_encoder(self, value):
def default_encoder(cls, value):
return pyrfc3339.generate(value)
@classmethod

View file

@ -1,12 +1,7 @@
"""ACME protocol v02 messages."""
import jsonschema
from letsencrypt.acme import challenges
from letsencrypt.acme import errors
from letsencrypt.acme import fields
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.acme import util
class Error(jose.JSONObjectWithFields, Exception):
@ -37,7 +32,7 @@ class Error(jose.JSONObjectWithFields, Exception):
@typ.decoder
def typ(value):
if not value.startswith(ERROR_TYPE_NAMESPACE):
raise errors.DeserializationError('Unrecognized error type')
raise jose.DeserializationError('Unrecognized error type')
return value[len(ERROR_TYPE_NAMESPACE):]
@ -75,18 +70,18 @@ class _Constant(jose.JSONDeSerializable):
class Status(_Constant):
"""ACME "status" field."""
POSSIBLE_NAMES = {}
StatusUnknown = Status('unknown')
StatusPending = Status('pending')
StatusProcessing = Status('processing')
StatusValid = Status('valid')
StatusInvalid = Status('invalid')
StatusRevoked = Status('revoked')
STATUS_UNKNOWN = Status('unknown')
STATUS_PENDING = Status('pending')
STATUS_PROCESSING = Status('processing')
STATUS_VALID = Status('valid')
STATUS_INVALID = Status('invalid')
STATUS_REVOKED = Status('revoked')
class IdentifierType(_Constant):
"""ACME identifier type."""
POSSIBLE_NAMES = {}
IdentifierFQDN = IdentifierType('dns') # IdentifierDNS in Boulder
IDENTIFIER_FQDN = IdentifierType('dns') # IdentifierDNS in Boulder
class Identifier(jose.JSONObjectWithFields):
@ -255,11 +250,11 @@ class Revocation(jose.JSONObjectWithFields):
if jobj == NOW:
return jobj
else:
return RFC3339Field.default_decoder(value)
return fields.RFC3339Field.default_decoder(value)
@revoke.encoder
def revoke(value):
if jobj == NOW:
return value
else:
return RFC3339Field.default_encoder(value)
return fields.RFC3339Field.default_encoder(value)

View file

@ -2,6 +2,7 @@
import datetime
import heapq
import httplib
import itertools
import logging
import time
@ -10,6 +11,7 @@ import werkzeug
import M2Crypto
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
from letsencrypt.acme import messages2
@ -119,7 +121,8 @@ class Network(object):
self._check_response(response, content_type)
return response
def _regr_from_response(self, response, uri=None, new_authz_uri=None):
@classmethod
def _regr_from_response(cls, response, uri=None, new_authz_uri=None):
terms_of_service = (
response.links['next']['url']
if 'terms-of-service' in response.links else None)
@ -136,7 +139,8 @@ class Network(object):
new_authz_uri=new_authz_uri,
terms_of_service=terms_of_service)
def register(self, contact=messages2.Registration._fields['contact'].default):
def register(self, contact=messages2.Registration._fields[
'contact'].default):
"""Register.
:returns: Registration Resource.
@ -231,7 +235,7 @@ class Network(object):
"""
response = self._post(challr.uri, self._wrap_in_jws(response))
if response.headers['location'] != challr.uri:
raise UnexpectedUpdate(response.headers['location'])
raise errors.UnexpectedUpdate(response.headers['location'])
updated_challr = challr.update(
body=challenges.Challenge.from_json(response.json()))
return updated_challr
@ -247,12 +251,13 @@ class Network(object):
return [self.answer_challenge(challr, response)
for challr, response in itertools.izip(challrs, responses)]
def _retry_after(self, response, mintime):
ra = response.headers.get('Retry-After', str(mintime))
@classmethod
def _retry_after(cls, response, mintime):
retry_after = response.headers.get('Retry-After', str(mintime))
try:
seconds = int(ra)
seconds = int(retry_after)
except ValueError:
return werkzeug.parse_date(ra)
return werkzeug.parse_date(retry_after) # pylint: disable=no-member
else:
return datetime.datetime.now() + datetime.timedelta(seconds=seconds)
@ -329,12 +334,12 @@ class Network(object):
# original Authorization Resource URI only
assert updated_authzr.uri == authzr
if updated_authzr.body.status != messages2.StatusValidated:
if updated_authzr.body.status != messages2.StatusValid:
# push back to the priority queue, with updated retry_after
heapq.heappush(waiting, (self._retry_after(
response, mintime=mintime), authzr))
return request_issuance(csr, authzrs), tuple(
return self.request_issuance(csr, authzrs), tuple(
updated[authzr] for authzr in authzrs)
def _get_cert(self, uri):
@ -357,7 +362,7 @@ class Network(object):
# "refresh cert", and this method integrated with self.refresh
response, cert = self._get_cert(certr.uri)
if not response.headers['location'] != certr.uri:
raise UnexpectedUpdate(response.text)
raise errors.UnexpectedUpdate(response.text)
return certr.update(body=cert)
def refresh(self, certr):