Mirror of https://github.com/kellyjonbrazil/jc.git — synced 2025-06-17 00:07:37 +02:00

initial working x509 cert parser

This commit is contained in:
Kelly Brazil
2022-07-01 15:48:29 -07:00
parent 430385fa63
commit 13388df603
27 changed files with 16863 additions and 9 deletions

View File

@ -622,7 +622,7 @@ def main():
try:
# differentiate between regular and streaming parsers
# streaming
# streaming (only supports UTF-8 string data for now)
if _parser_is_streaming(parser):
result = parser.parse(sys.stdin,
raw=raw,
@ -639,9 +639,17 @@ def main():
sys.exit(combined_exit_code(magic_exit_code, 0))
# regular
# regular (supports binary and UTF-8 string data)
else:
data = magic_stdout or sys.stdin.read()
data = magic_stdout or sys.stdin.buffer.read()
# convert to UTF-8, if possible. Otherwise, leave as bytes
try:
if isinstance(data, bytes):
data = data.decode('utf-8')
except UnicodeDecodeError:
pass
result = parser.parse(data,
raw=raw,
quiet=quiet)

View File

@ -110,6 +110,7 @@ parsers = [
'wc',
'who',
'xml',
'x509-cert',
'xrandr',
'yaml',
'zipinfo'

View File

@ -0,0 +1,47 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
from .version import __version__, __version_info__
__all__ = [
'__version__',
'__version_info__',
'load_order',
]
def load_order():
    """
    Returns a list of the module and sub-module names for asn1crypto in
    dependency load order, for the sake of live reloading code

    :return:
        A list of unicode strings of module names, as they would appear in
        sys.modules, ordered by which module should be reloaded first
    """
    # Private helpers first, then the public modules that build on them,
    # finishing with the package itself
    submodules = [
        '_errors',
        '_int',
        '_ordereddict',
        '_teletex_codec',
        '_types',
        '_inet',
        '_iri',
        'version',
        'pem',
        'util',
        'parser',
        'core',
        'algos',
        'keys',
        'x509',
        'crl',
        'csr',
        'ocsp',
        'cms',
        'pdf',
        'pkcs12',
        'tsp',
    ]
    return ['asn1crypto.' + name for name in submodules] + ['asn1crypto']

View File

@ -0,0 +1,54 @@
# coding: utf-8
"""
Exports the following items:
- unwrap()
- APIException()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import re
import textwrap
class APIException(Exception):
    """Raised when a caller uses an API that has been removed from asn1crypto."""

    pass
def unwrap(string, *params):
    """
    Takes a multi-line string and does the following:

     - dedents
     - converts newlines with text before and after into a single line
     - strips leading and trailing whitespace

    :param string:
        The string to format

    :param *params:
        Params to interpolate into the string

    :return:
        The formatted string
    """
    text = textwrap.dedent(string)

    # Join wrapped lines back into one, but keep line breaks that start
    # bulleted lists, ordered lists or "=" underline rows intact
    if '\n' in text:
        text = re.sub('(?<=\\S)\n(?=[^ \n\t\\d\\*\\-=])', ' ', text)

    if params:
        text = text % params

    return text.strip()

View File

@ -0,0 +1,170 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
import socket
import struct
from ._errors import unwrap
from ._types import byte_cls, bytes_to_list, str_cls, type_name
def inet_ntop(address_family, packed_ip):
    """
    Windows compatibility shim for socket.inet_ntop().

    :param address_family:
        socket.AF_INET for IPv4 or socket.AF_INET6 for IPv6

    :param packed_ip:
        A byte string of the network form of an IP address

    :return:
        A unicode string of the IP address
    """
    if address_family not in set([socket.AF_INET, socket.AF_INET6]):
        raise ValueError(unwrap(
            '''
            address_family must be socket.AF_INET (%s) or socket.AF_INET6 (%s),
            not %s
            ''',
            repr(socket.AF_INET),
            repr(socket.AF_INET6),
            repr(address_family)
        ))
    if not isinstance(packed_ip, byte_cls):
        raise TypeError(unwrap(
            '''
            packed_ip must be a byte string, not %s
            ''',
            type_name(packed_ip)
        ))
    # IPv4 packs to exactly 4 bytes, IPv6 to exactly 16
    required_len = 4 if address_family == socket.AF_INET else 16
    if len(packed_ip) != required_len:
        raise ValueError(unwrap(
            '''
            packed_ip must be %d bytes long - is %d
            ''',
            required_len,
            len(packed_ip)
        ))
    if address_family == socket.AF_INET:
        return '%d.%d.%d.%d' % tuple(bytes_to_list(packed_ip))
    # IPv6: view the 16 bytes as eight big-endian 16-bit groups
    octets = struct.unpack(b'!HHHHHHHH', packed_ip)
    # Record every run of zero groups so the longest can collapse to "::".
    # Maps run length -> start index of the FIRST run of that length, so
    # ties are broken in favor of the earliest run.
    runs_of_zero = {}
    longest_run = 0
    zero_index = None
    # The -1 sentinel ensures a zero run that reaches the end of the
    # address is closed out and recorded
    for i, octet in enumerate(octets + (-1,)):
        if octet != 0:
            if zero_index is not None:
                length = i - zero_index
                if length not in runs_of_zero:
                    runs_of_zero[length] = zero_index
                longest_run = max(longest_run, length)
                zero_index = None
        elif zero_index is None:
            zero_index = i
    # hex() output without the "0x" prefix (no zero padding)
    hexed = [hex(o)[2:] for o in octets]
    # "::" may only replace a run of two or more zero groups
    if longest_run < 2:
        return ':'.join(hexed)
    zero_start = runs_of_zero[longest_run]
    zero_end = zero_start + longest_run
    return ':'.join(hexed[:zero_start]) + '::' + ':'.join(hexed[zero_end:])
def inet_pton(address_family, ip_string):
    """
    Windows compatibility shim for socket.inet_pton().

    :param address_family:
        socket.AF_INET for IPv4 or socket.AF_INET6 for IPv6

    :param ip_string:
        A unicode string of an IP address

    :raises:
        ValueError - when the address family or the address string is invalid
        TypeError - when ip_string is not a unicode string

    :return:
        A byte string of the network form of the IP address
    """
    if address_family not in set([socket.AF_INET, socket.AF_INET6]):
        raise ValueError(unwrap(
            '''
            address_family must be socket.AF_INET (%s) or socket.AF_INET6 (%s),
            not %s
            ''',
            repr(socket.AF_INET),
            repr(socket.AF_INET6),
            repr(address_family)
        ))
    if not isinstance(ip_string, str_cls):
        raise TypeError(unwrap(
            '''
            ip_string must be a unicode string, not %s
            ''',
            type_name(ip_string)
        ))

    if address_family == socket.AF_INET:
        octets = ip_string.split('.')
        error = len(octets) != 4
        if not error:
            ints = []
            for o in octets:
                # Report non-numeric components via the descriptive
                # ValueError below instead of a raw int() error
                try:
                    o = int(o)
                except ValueError:
                    error = True
                    break
                if o > 255 or o < 0:
                    error = True
                    break
                ints.append(o)
        if error:
            raise ValueError(unwrap(
                '''
                ip_string must be a dotted string with four integers in the
                range of 0 to 255, got %s
                ''',
                repr(ip_string)
            ))
        return struct.pack(b'!BBBB', *ints)

    error = False
    omitted = ip_string.count('::')
    if omitted > 1:
        # "::" may appear at most once in an IPv6 address
        error = True
    elif omitted == 0:
        octets = ip_string.split(':')
        error = len(octets) != 8
    else:
        # Expand the "::" into the appropriate number of zero groups
        begin, end = ip_string.split('::')
        begin_octets = begin.split(':')
        end_octets = end.split(':')
        missing = 8 - len(begin_octets) - len(end_octets)
        octets = begin_octets + (['0'] * missing) + end_octets
    if not error:
        ints = []
        for o in octets:
            try:
                o = int(o, 16)
            except ValueError:
                error = True
                break
            if o > 65535 or o < 0:
                error = True
                break
            ints.append(o)
        # Only pack when every group parsed. Previously a group that failed
        # the range check still fell through to struct.pack() with a short
        # ints list, surfacing as struct.error instead of the ValueError
        # below.
        if not error:
            return struct.pack(b'!HHHHHHHH', *ints)
    raise ValueError(unwrap(
        '''
        ip_string must be a valid ipv6 string, got %s
        ''',
        repr(ip_string)
    ))

View File

@ -0,0 +1,22 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
def fill_width(bytes_, width):
    """
    Ensure a byte string representing a positive integer is a specific width
    (in bytes)

    :param bytes_:
        The integer byte string

    :param width:
        The desired width as an integer

    :return:
        A byte string of the width specified
    """
    shortfall = width - len(bytes_)
    if shortfall > 0:
        # Left-pad with zero bytes; an already-wide value passes through
        return (b'\x00' * shortfall) + bytes_
    return bytes_

View File

@ -0,0 +1,291 @@
# coding: utf-8
"""
Functions to convert unicode IRIs into ASCII byte string URIs and back. Exports
the following items:
- iri_to_uri()
- uri_to_iri()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from encodings import idna # noqa
import codecs
import re
import sys
from ._errors import unwrap
from ._types import byte_cls, str_cls, type_name, bytes_to_list, int_types
if sys.version_info < (3,):
from urlparse import urlsplit, urlunsplit
from urllib import (
quote as urlquote,
unquote as unquote_to_bytes,
)
else:
from urllib.parse import (
quote as urlquote,
unquote_to_bytes,
urlsplit,
urlunsplit,
)
def iri_to_uri(value, normalize=False):
    """
    Encodes a unicode IRI into an ASCII byte string URI

    :param value:
        A unicode string of an IRI

    :param normalize:
        A bool that controls URI normalization

    :return:
        A byte string of the ASCII-encoded URI
    """
    if not isinstance(value, str_cls):
        raise TypeError(unwrap(
            '''
            value must be a unicode string, not %s
            ''',
            type_name(value)
        ))
    scheme = None
    # Python 2.6 doesn't split properly if the URL doesn't start with http:// or https://
    if sys.version_info < (2, 7) and not value.startswith('http://') and not value.startswith('https://'):
        # Temporarily swap the real scheme for http:// so urlsplit() works,
        # then restore it after parsing
        real_prefix = None
        prefix_match = re.match('^[^:]*://', value)
        if prefix_match:
            real_prefix = prefix_match.group(0)
            value = 'http://' + value[len(real_prefix):]
        parsed = urlsplit(value)
        if real_prefix:
            value = real_prefix + value[7:]
            scheme = _urlquote(real_prefix[:-3])
    else:
        parsed = urlsplit(value)
    if scheme is None:
        scheme = _urlquote(parsed.scheme)
    hostname = parsed.hostname
    if hostname is not None:
        # Internationalized domain names become ASCII via IDNA encoding
        hostname = hostname.encode('idna')
    # RFC 3986 allows userinfo to contain sub-delims
    username = _urlquote(parsed.username, safe='!$&\'()*+,;=')
    password = _urlquote(parsed.password, safe='!$&\'()*+,;=')
    port = parsed.port
    if port is not None:
        port = str_cls(port).encode('ascii')
    # Reassemble the netloc from its individually quoted components
    netloc = b''
    if username is not None:
        netloc += username
        if password:
            netloc += b':' + password
        netloc += b'@'
    if hostname is not None:
        netloc += hostname
    if port is not None:
        default_http = scheme == b'http' and port == b'80'
        default_https = scheme == b'https' and port == b'443'
        # Normalization drops the port when it is the scheme's default
        if not normalize or (not default_http and not default_https):
            netloc += b':' + port
    # RFC 3986 allows a path to contain sub-delims, plus "@" and ":"
    path = _urlquote(parsed.path, safe='/!$&\'()*+,;=@:')
    # RFC 3986 allows the query to contain sub-delims, plus "@", ":" , "/" and "?"
    query = _urlquote(parsed.query, safe='/?!$&\'()*+,;=@:')
    # RFC 3986 allows the fragment to contain sub-delims, plus "@", ":" , "/" and "?"
    fragment = _urlquote(parsed.fragment, safe='/?!$&\'()*+,;=@:')
    if normalize and query is None and fragment is None and path == b'/':
        path = None
    # Python 2.7 compat
    if path is None:
        path = ''
    output = urlunsplit((scheme, netloc, path, query, fragment))
    if isinstance(output, str_cls):
        output = output.encode('latin1')
    return output
def uri_to_iri(value):
    """
    Converts an ASCII URI byte string into a unicode IRI

    :param value:
        An ASCII-encoded byte string of the URI

    :return:
        A unicode string of the IRI
    """
    if not isinstance(value, byte_cls):
        raise TypeError(unwrap(
            '''
            value must be a byte string, not %s
            ''',
            type_name(value)
        ))
    parsed = urlsplit(value)
    scheme = parsed.scheme
    if scheme is not None:
        scheme = scheme.decode('ascii')
    # ":" and "@" inside userinfo would change how the netloc parses, so
    # they are kept percent-encoded via remap
    username = _urlunquote(parsed.username, remap=[':', '@'])
    password = _urlunquote(parsed.password, remap=[':', '@'])
    hostname = parsed.hostname
    if hostname:
        # IDNA-decode internationalized domain names back to unicode
        hostname = hostname.decode('idna')
    port = parsed.port
    if port and not isinstance(port, int_types):
        port = port.decode('ascii')
    # Reassemble the netloc from its unquoted components
    netloc = ''
    if username is not None:
        netloc += username
        if password:
            netloc += ':' + password
        netloc += '@'
    if hostname is not None:
        netloc += hostname
    if port is not None:
        netloc += ':' + str_cls(port)
    path = _urlunquote(parsed.path, remap=['/'], preserve=True)
    query = _urlunquote(parsed.query, remap=['&', '='], preserve=True)
    fragment = _urlunquote(parsed.fragment)
    return urlunsplit((scheme, netloc, path, query, fragment))
def _iri_utf8_errors_handler(exc):
    """
    Error handler for decoding UTF-8 parts of a URI into an IRI. Leaves byte
    sequences encoded in %XX format, but as part of a unicode string.

    :param exc:
        The UnicodeDecodeError exception

    :return:
        A 2-element tuple of (replacement unicode string, integer index to
        resume at)
    """
    bad_bytes = exc.object[exc.start:exc.end]
    replacement = ''.join('%%%02x' % num for num in bytes_to_list(bad_bytes))
    return (replacement, exc.end)


codecs.register_error('iriutf8', _iri_utf8_errors_handler)
def _urlquote(string, safe=''):
    """
    Quotes a unicode string for use in a URL

    :param string:
        A unicode string

    :param safe:
        A unicode string of character to not encode

    :return:
        None (if string is None) or an ASCII byte string of the quoted string
    """
    if string is None or string == '':
        return None

    # Anything already hex quoted is pulled out of the URL and unquoted if
    # possible
    escapes = []
    if re.search('%[0-9a-fA-F]{2}', string):
        # Try to unquote any percent values, restoring them if they are not
        # valid UTF-8. Also, requote any safe chars since encoded versions of
        # those are functionally different than the unquoted ones.
        def _try_unescape(match):
            byte_string = unquote_to_bytes(match.group(0))
            unicode_string = byte_string.decode('utf-8', 'iriutf8')
            for safe_char in list(safe):
                unicode_string = unicode_string.replace(safe_char, '%%%02x' % ord(safe_char))
            return unicode_string
        string = re.sub('(?:%[0-9a-fA-F]{2})+', _try_unescape, string)

        # Once we have the minimal set of hex quoted values, removed them from
        # the string so that they are not double quoted
        def _extract_escape(match):
            escapes.append(match.group(0).encode('ascii'))
            # NUL placeholder; urlquote() below turns it into "%00", which
            # is swapped back for the saved escape at the end
            return '\x00'
        string = re.sub('%[0-9a-fA-F]{2}', _extract_escape, string)

    output = urlquote(string.encode('utf-8'), safe=safe.encode('utf-8'))
    if not isinstance(output, byte_cls):
        output = output.encode('ascii')

    # Restore the existing quoted values that we extracted
    if len(escapes) > 0:
        def _return_escape(_):
            return escapes.pop(0)
        output = re.sub(b'%00', _return_escape, output)

    return output
def _urlunquote(byte_string, remap=None, preserve=None):
    """
    Unquotes a URI portion from a byte string into unicode using UTF-8

    :param byte_string:
        A byte string of the data to unquote

    :param remap:
        A list of characters (as unicode) that should be re-mapped to a
        %XX encoding. This is used when characters are not valid in part of a
        URL.

    :param preserve:
        A bool - indicates that the chars to be remapped if they occur in
        non-hex form, should be preserved. E.g. / for URL path.

    :return:
        A unicode string
    """
    if byte_string is None:
        return byte_string
    if byte_string == b'':
        return ''
    if preserve:
        # Stash literal occurrences of the remap chars behind rarely-used
        # control characters so unquoting leaves them untouched; they are
        # swapped back at the end
        replacements = ['\x1A', '\x1C', '\x1D', '\x1E', '\x1F']
        preserve_unmap = {}
        for char in remap:
            replacement = replacements.pop(0)
            preserve_unmap[replacement] = char
            byte_string = byte_string.replace(char.encode('ascii'), replacement.encode('ascii'))
    byte_string = unquote_to_bytes(byte_string)
    if remap:
        # Re-encode chars that must not appear literally in this URL part
        for char in remap:
            byte_string = byte_string.replace(char.encode('ascii'), ('%%%02x' % ord(char)).encode('ascii'))
    output = byte_string.decode('utf-8', 'iriutf8')
    if preserve:
        for replacement, original in preserve_unmap.items():
            output = output.replace(replacement, original)
    return output

View File

@ -0,0 +1,135 @@
# Copyright (c) 2009 Raymond Hettinger
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
import sys
# Use the stdlib OrderedDict on Python 2.7+; otherwise fall back to the
# pure-Python backport below (Raymond Hettinger's recipe), which tracks
# insertion order with a circular doubly linked list of [key, prev, next]
# nodes stored in self.__map.
if not sys.version_info < (2, 7):
    from collections import OrderedDict

else:
    from UserDict import DictMixin

    class OrderedDict(dict, DictMixin):

        def __init__(self, *args, **kwds):
            if len(args) > 1:
                raise TypeError('expected at most 1 arguments, got %d' % len(args))
            try:
                # First construction has no __end yet, so clear() sets it up
                self.__end
            except AttributeError:
                self.clear()
            self.update(*args, **kwds)

        def clear(self):
            self.__end = end = []
            end += [None, end, end]         # sentinel node for doubly linked list
            self.__map = {}                 # key --> [key, prev, next]
            dict.clear(self)

        def __setitem__(self, key, value):
            if key not in self:
                # Link a new node just before the sentinel (the list tail)
                end = self.__end
                curr = end[1]
                curr[2] = end[1] = self.__map[key] = [key, curr, end]
            dict.__setitem__(self, key, value)

        def __delitem__(self, key):
            dict.__delitem__(self, key)
            # Unlink the node from the ordering list
            key, prev, next_ = self.__map.pop(key)
            prev[2] = next_
            next_[1] = prev

        def __iter__(self):
            # Walk forward from the sentinel, yielding keys in insert order
            end = self.__end
            curr = end[2]
            while curr is not end:
                yield curr[0]
                curr = curr[2]

        def __reversed__(self):
            # Walk backward from the sentinel
            end = self.__end
            curr = end[1]
            while curr is not end:
                yield curr[0]
                curr = curr[1]

        def popitem(self, last=True):
            if not self:
                raise KeyError('dictionary is empty')
            if last:
                key = reversed(self).next()
            else:
                key = iter(self).next()
            value = self.pop(key)
            return key, value

        def __reduce__(self):
            # Exclude the linked-list bookkeeping from the pickled state
            items = [[k, self[k]] for k in self]
            tmp = self.__map, self.__end
            del self.__map, self.__end
            inst_dict = vars(self).copy()
            self.__map, self.__end = tmp
            if inst_dict:
                return (self.__class__, (items,), inst_dict)
            return self.__class__, (items,)

        def keys(self):
            return list(self)

        # Derive the remaining mapping API from DictMixin
        setdefault = DictMixin.setdefault
        update = DictMixin.update
        pop = DictMixin.pop
        values = DictMixin.values
        items = DictMixin.items
        iterkeys = DictMixin.iterkeys
        itervalues = DictMixin.itervalues
        iteritems = DictMixin.iteritems

        def __repr__(self):
            if not self:
                return '%s()' % (self.__class__.__name__,)
            return '%s(%r)' % (self.__class__.__name__, self.items())

        def copy(self):
            return self.__class__(self)

        @classmethod
        def fromkeys(cls, iterable, value=None):
            d = cls()
            for key in iterable:
                d[key] = value
            return d

        def __eq__(self, other):
            if isinstance(other, OrderedDict):
                # Order matters when comparing two OrderedDicts
                if len(self) != len(other):
                    return False
                for p, q in zip(self.items(), other.items()):
                    if p != q:
                        return False
                return True
            return dict.__eq__(self, other)

        def __ne__(self, other):
            return not self == other

View File

@ -0,0 +1,331 @@
# coding: utf-8
"""
Implementation of the teletex T.61 codec. Exports the following items:
- register()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import codecs
class TeletexCodec(codecs.Codec):
    """Stateless encoder/decoder for the teletex (T.61) charmap."""

    def encode(self, input_, errors='strict'):
        return codecs.charmap_encode(input_, errors, ENCODING_TABLE)

    def decode(self, input_, errors='strict'):
        return codecs.charmap_decode(input_, errors, DECODING_TABLE)


class TeletexIncrementalEncoder(codecs.IncrementalEncoder):
    """Incremental encoder for the teletex (T.61) charmap."""

    def encode(self, input_, final=False):
        return codecs.charmap_encode(input_, self.errors, ENCODING_TABLE)[0]


class TeletexIncrementalDecoder(codecs.IncrementalDecoder):
    """Incremental decoder for the teletex (T.61) charmap."""

    def decode(self, input_, final=False):
        return codecs.charmap_decode(input_, self.errors, DECODING_TABLE)[0]


class TeletexStreamWriter(TeletexCodec, codecs.StreamWriter):
    """Stream writer for the teletex (T.61) charmap."""

    pass


class TeletexStreamReader(TeletexCodec, codecs.StreamReader):
    """Stream reader for the teletex (T.61) charmap."""

    pass


def teletex_search_function(name):
    """
    Search function for teletex codec that is passed to codecs.register()
    """
    if name == 'teletex':
        return codecs.CodecInfo(
            name='teletex',
            encode=TeletexCodec().encode,
            decode=TeletexCodec().decode,
            incrementalencoder=TeletexIncrementalEncoder,
            incrementaldecoder=TeletexIncrementalDecoder,
            streamreader=TeletexStreamReader,
            streamwriter=TeletexStreamWriter,
        )
    return None


def register():
    """
    Registers the teletex codec
    """
    codecs.register(teletex_search_function)
# http://en.wikipedia.org/wiki/ITU_T.61
DECODING_TABLE = (
'\u0000'
'\u0001'
'\u0002'
'\u0003'
'\u0004'
'\u0005'
'\u0006'
'\u0007'
'\u0008'
'\u0009'
'\u000A'
'\u000B'
'\u000C'
'\u000D'
'\u000E'
'\u000F'
'\u0010'
'\u0011'
'\u0012'
'\u0013'
'\u0014'
'\u0015'
'\u0016'
'\u0017'
'\u0018'
'\u0019'
'\u001A'
'\u001B'
'\u001C'
'\u001D'
'\u001E'
'\u001F'
'\u0020'
'\u0021'
'\u0022'
'\ufffe'
'\ufffe'
'\u0025'
'\u0026'
'\u0027'
'\u0028'
'\u0029'
'\u002A'
'\u002B'
'\u002C'
'\u002D'
'\u002E'
'\u002F'
'\u0030'
'\u0031'
'\u0032'
'\u0033'
'\u0034'
'\u0035'
'\u0036'
'\u0037'
'\u0038'
'\u0039'
'\u003A'
'\u003B'
'\u003C'
'\u003D'
'\u003E'
'\u003F'
'\u0040'
'\u0041'
'\u0042'
'\u0043'
'\u0044'
'\u0045'
'\u0046'
'\u0047'
'\u0048'
'\u0049'
'\u004A'
'\u004B'
'\u004C'
'\u004D'
'\u004E'
'\u004F'
'\u0050'
'\u0051'
'\u0052'
'\u0053'
'\u0054'
'\u0055'
'\u0056'
'\u0057'
'\u0058'
'\u0059'
'\u005A'
'\u005B'
'\ufffe'
'\u005D'
'\ufffe'
'\u005F'
'\ufffe'
'\u0061'
'\u0062'
'\u0063'
'\u0064'
'\u0065'
'\u0066'
'\u0067'
'\u0068'
'\u0069'
'\u006A'
'\u006B'
'\u006C'
'\u006D'
'\u006E'
'\u006F'
'\u0070'
'\u0071'
'\u0072'
'\u0073'
'\u0074'
'\u0075'
'\u0076'
'\u0077'
'\u0078'
'\u0079'
'\u007A'
'\ufffe'
'\u007C'
'\ufffe'
'\ufffe'
'\u007F'
'\u0080'
'\u0081'
'\u0082'
'\u0083'
'\u0084'
'\u0085'
'\u0086'
'\u0087'
'\u0088'
'\u0089'
'\u008A'
'\u008B'
'\u008C'
'\u008D'
'\u008E'
'\u008F'
'\u0090'
'\u0091'
'\u0092'
'\u0093'
'\u0094'
'\u0095'
'\u0096'
'\u0097'
'\u0098'
'\u0099'
'\u009A'
'\u009B'
'\u009C'
'\u009D'
'\u009E'
'\u009F'
'\u00A0'
'\u00A1'
'\u00A2'
'\u00A3'
'\u0024'
'\u00A5'
'\u0023'
'\u00A7'
'\u00A4'
'\ufffe'
'\ufffe'
'\u00AB'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\u00B0'
'\u00B1'
'\u00B2'
'\u00B3'
'\u00D7'
'\u00B5'
'\u00B6'
'\u00B7'
'\u00F7'
'\ufffe'
'\ufffe'
'\u00BB'
'\u00BC'
'\u00BD'
'\u00BE'
'\u00BF'
'\ufffe'
'\u0300'
'\u0301'
'\u0302'
'\u0303'
'\u0304'
'\u0306'
'\u0307'
'\u0308'
'\ufffe'
'\u030A'
'\u0327'
'\u0332'
'\u030B'
'\u0328'
'\u030C'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\ufffe'
'\u2126'
'\u00C6'
'\u00D0'
'\u00AA'
'\u0126'
'\ufffe'
'\u0132'
'\u013F'
'\u0141'
'\u00D8'
'\u0152'
'\u00BA'
'\u00DE'
'\u0166'
'\u014A'
'\u0149'
'\u0138'
'\u00E6'
'\u0111'
'\u00F0'
'\u0127'
'\u0131'
'\u0133'
'\u0140'
'\u0142'
'\u00F8'
'\u0153'
'\u00DF'
'\u00FE'
'\u0167'
'\u014B'
'\ufffe'
)
ENCODING_TABLE = codecs.charmap_build(DECODING_TABLE)

View File

@ -0,0 +1,46 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
import inspect
import sys
# Python 2/3 compatibility aliases: text type, binary type, integer types
# and byte-iteration helpers under version-independent names
if sys.version_info < (3,):
    str_cls = unicode  # noqa
    byte_cls = str
    int_types = (int, long)  # noqa

    def bytes_to_list(byte_string):
        # Python 2 byte strings iterate as 1-char str, so ord() is needed
        return [ord(b) for b in byte_string]

    chr_cls = chr

else:
    str_cls = str
    byte_cls = bytes
    int_types = int

    # Python 3 byte strings already iterate as ints
    bytes_to_list = list

    def chr_cls(num):
        # Single int -> single-byte bytes value
        return bytes([num])
def type_name(value):
    """
    Returns a user-readable name for the type of an object

    :param value:
        A value to get the type name of

    :return:
        A unicode string of the object's type name
    """
    cls = value if inspect.isclass(value) else value.__class__
    module = cls.__module__
    # Builtins read better without their module prefix
    if module in ('builtins', '__builtin__'):
        return cls.__name__
    return '%s.%s' % (module, cls.__name__)

File diff suppressed because it is too large Load Diff

1003
jc/parsers/asn1crypto/cms.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,536 @@
# coding: utf-8
"""
ASN.1 type classes for certificate revocation lists (CRL). Exports the
following items:
- CertificateList()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import hashlib
from .algos import SignedDigestAlgorithm
from .core import (
Boolean,
Enumerated,
GeneralizedTime,
Integer,
ObjectIdentifier,
OctetBitString,
ParsableOctetString,
Sequence,
SequenceOf,
)
from .x509 import (
AuthorityInfoAccessSyntax,
AuthorityKeyIdentifier,
CRLDistributionPoints,
DistributionPointName,
GeneralNames,
Name,
ReasonFlags,
Time,
)
# The structures in this file are taken from https://tools.ietf.org/html/rfc5280
class Version(Integer):
    # CRL version numbers: wire integer -> friendly name
    _map = {
        0: 'v1',
        1: 'v2',
        2: 'v3',
    }
class IssuingDistributionPoint(Sequence):
    # Describes what kinds of certificates/reasons this CRL covers
    _fields = [
        ('distribution_point', DistributionPointName, {'explicit': 0, 'optional': True}),
        ('only_contains_user_certs', Boolean, {'implicit': 1, 'default': False}),
        ('only_contains_ca_certs', Boolean, {'implicit': 2, 'default': False}),
        ('only_some_reasons', ReasonFlags, {'implicit': 3, 'optional': True}),
        ('indirect_crl', Boolean, {'implicit': 4, 'default': False}),
        ('only_contains_attribute_certs', Boolean, {'implicit': 5, 'default': False}),
    ]
class TBSCertListExtensionId(ObjectIdentifier):
    # OID -> friendly name for CRL-level extensions
    _map = {
        '2.5.29.18': 'issuer_alt_name',
        '2.5.29.20': 'crl_number',
        '2.5.29.27': 'delta_crl_indicator',
        '2.5.29.28': 'issuing_distribution_point',
        '2.5.29.35': 'authority_key_identifier',
        '2.5.29.46': 'freshest_crl',
        '1.3.6.1.5.5.7.1.1': 'authority_information_access',
    }
class TBSCertListExtension(Sequence):
    _fields = [
        ('extn_id', TBSCertListExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # extn_value is parsed with the spec selected by extn_id
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'issuer_alt_name': GeneralNames,
        'crl_number': Integer,
        'delta_crl_indicator': Integer,
        'issuing_distribution_point': IssuingDistributionPoint,
        'authority_key_identifier': AuthorityKeyIdentifier,
        'freshest_crl': CRLDistributionPoints,
        'authority_information_access': AuthorityInfoAccessSyntax,
    }
class TBSCertListExtensions(SequenceOf):
    # Sequence of CRL-level extensions
    _child_spec = TBSCertListExtension
class CRLReason(Enumerated):
    # Revocation reason codes; note that value 7 is not mapped
    _map = {
        0: 'unspecified',
        1: 'key_compromise',
        2: 'ca_compromise',
        3: 'affiliation_changed',
        4: 'superseded',
        5: 'cessation_of_operation',
        6: 'certificate_hold',
        8: 'remove_from_crl',
        9: 'privilege_withdrawn',
        10: 'aa_compromise',
    }

    @property
    def human_friendly(self):
        """
        :return:
            A unicode string with revocation description that is suitable to
            show to end-users. Starts with a lower case letter and phrased in
            such a way that it makes sense after the phrase "because of" or
            "due to".
        """
        return {
            'unspecified': 'an unspecified reason',
            'key_compromise': 'a compromised key',
            'ca_compromise': 'the CA being compromised',
            'affiliation_changed': 'an affiliation change',
            'superseded': 'certificate supersession',
            'cessation_of_operation': 'a cessation of operation',
            'certificate_hold': 'a certificate hold',
            'remove_from_crl': 'removal from the CRL',
            # Fixed typo: was 'privilege withdrawl'
            'privilege_withdrawn': 'privilege withdrawal',
            'aa_compromise': 'the AA being compromised',
        }[self.native]
class CRLEntryExtensionId(ObjectIdentifier):
    # OID -> friendly name for per-entry (per revoked cert) extensions
    _map = {
        '2.5.29.21': 'crl_reason',
        '2.5.29.23': 'hold_instruction_code',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
    }
class CRLEntryExtension(Sequence):
    _fields = [
        ('extn_id', CRLEntryExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # extn_value is parsed with the spec selected by extn_id
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl_reason': CRLReason,
        'hold_instruction_code': ObjectIdentifier,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
    }
class CRLEntryExtensions(SequenceOf):
    # Sequence of per-entry extensions
    _child_spec = CRLEntryExtension
class RevokedCertificate(Sequence):
    _fields = [
        ('user_certificate', Integer),
        ('revocation_date', Time),
        ('crl_entry_extensions', CRLEntryExtensions, {'optional': True}),
    ]

    # Lazy caches for parsed entry extensions. False means "extensions not
    # processed yet"; None means "processed, but extension absent".
    _processed_extensions = False
    _critical_extensions = None
    _crl_reason_value = None
    _invalidity_date_value = None
    _certificate_issuer_value = None
    _issuer_name = False

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a list
        of critical extensions
        """
        self._critical_extensions = set()
        for extension in self['crl_entry_extensions']:
            name = extension['extn_id'].native
            attribute_name = '_%s_value' % name
            # Only cache extensions this class has a slot for; criticality
            # is still tracked for every extension below
            if hasattr(self, attribute_name):
                setattr(self, attribute_name, extension['extn_value'].parsed)
            if extension['critical'].native:
                self._critical_extensions.add(name)
        self._processed_extensions = True

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """
        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def crl_reason_value(self):
        """
        This extension indicates the reason that a certificate was revoked.

        :return:
            None or a CRLReason object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._crl_reason_value

    @property
    def invalidity_date_value(self):
        """
        This extension indicates the suspected date/time the private key was
        compromised or the certificate became invalid. This would usually be
        before the revocation date, which is when the CA processed the
        revocation.

        :return:
            None or a GeneralizedTime object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._invalidity_date_value

    @property
    def certificate_issuer_value(self):
        """
        This extension indicates the issuer of the certificate in question,
        and is used in indirect CRLs. CRL entries without this extension are
        for certificates issued from the last seen issuer.

        :return:
            None or an x509.GeneralNames object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._certificate_issuer_value

    @property
    def issuer_name(self):
        """
        :return:
            None, or an asn1crypto.x509.Name object for the issuer of the cert
        """
        # False sentinel ensures the search below runs only once
        if self._issuer_name is False:
            self._issuer_name = None
            if self.certificate_issuer_value:
                for general_name in self.certificate_issuer_value:
                    if general_name.name == 'directory_name':
                        self._issuer_name = general_name.chosen
                        break
        return self._issuer_name
class RevokedCertificates(SequenceOf):
    # Sequence of revoked-certificate entries in a CRL
    _child_spec = RevokedCertificate
class TbsCertList(Sequence):
    # The to-be-signed portion of a certificate list (CRL)
    _fields = [
        ('version', Version, {'optional': True}),
        ('signature', SignedDigestAlgorithm),
        ('issuer', Name),
        ('this_update', Time),
        ('next_update', Time, {'optional': True}),
        ('revoked_certificates', RevokedCertificates, {'optional': True}),
        ('crl_extensions', TBSCertListExtensions, {'explicit': 0, 'optional': True}),
    ]
class CertificateList(Sequence):
_fields = [
('tbs_cert_list', TbsCertList),
('signature_algorithm', SignedDigestAlgorithm),
('signature', OctetBitString),
]
_processed_extensions = False
_critical_extensions = None
_issuer_alt_name_value = None
_crl_number_value = None
_delta_crl_indicator_value = None
_issuing_distribution_point_value = None
_authority_key_identifier_value = None
_freshest_crl_value = None
_authority_information_access_value = None
_issuer_cert_urls = None
_delta_crl_distribution_points = None
_sha1 = None
_sha256 = None
def _set_extensions(self):
"""
Sets common named extensions to private attributes and creates a list
of critical extensions
"""
self._critical_extensions = set()
for extension in self['tbs_cert_list']['crl_extensions']:
name = extension['extn_id'].native
attribute_name = '_%s_value' % name
if hasattr(self, attribute_name):
setattr(self, attribute_name, extension['extn_value'].parsed)
if extension['critical'].native:
self._critical_extensions.add(name)
self._processed_extensions = True
@property
def critical_extensions(self):
"""
Returns a set of the names (or OID if not a known extension) of the
extensions marked as critical
:return:
A set of unicode strings
"""
if not self._processed_extensions:
self._set_extensions()
return self._critical_extensions
@property
def issuer_alt_name_value(self):
"""
This extension allows associating one or more alternative names with
the issuer of the CRL.
:return:
None or an x509.GeneralNames object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._issuer_alt_name_value
@property
def crl_number_value(self):
"""
This extension adds a monotonically increasing number to the CRL and is
used to distinguish different versions of the CRL.
:return:
None or an Integer object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._crl_number_value
@property
def delta_crl_indicator_value(self):
"""
This extension indicates a CRL is a delta CRL, and contains the CRL
number of the base CRL that it is a delta from.
:return:
None or an Integer object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._delta_crl_indicator_value
@property
def issuing_distribution_point_value(self):
"""
This extension includes information about what types of revocations
and certificates are part of the CRL.
:return:
None or an IssuingDistributionPoint object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._issuing_distribution_point_value
@property
def authority_key_identifier_value(self):
"""
This extension helps in identifying the public key with which to
validate the authenticity of the CRL.
:return:
None or an AuthorityKeyIdentifier object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._authority_key_identifier_value
@property
def freshest_crl_value(self):
"""
This extension is used in complete CRLs to indicate where a delta CRL
may be located.
:return:
None or a CRLDistributionPoints object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._freshest_crl_value
@property
def authority_information_access_value(self):
"""
This extension is used to provide a URL with which to download the
certificate used to sign this CRL.
:return:
None or an AuthorityInfoAccessSyntax object
"""
if self._processed_extensions is False:
self._set_extensions()
return self._authority_information_access_value
    @property
    def issuer(self):
        """
        :return:
            An asn1crypto.x509.Name object for the issuer of the CRL
        """
        # Direct accessor into the parsed tbs_cert_list structure
        return self['tbs_cert_list']['issuer']
@property
def authority_key_identifier(self):
"""
:return:
None or a byte string of the key_identifier from the authority key
identifier extension
"""
if not self.authority_key_identifier_value:
return None
return self.authority_key_identifier_value['key_identifier'].native
@property
def issuer_cert_urls(self):
"""
:return:
A list of unicode strings that are URLs that should contain either
an individual DER-encoded X.509 certificate, or a DER-encoded CMS
message containing multiple certificates
"""
if self._issuer_cert_urls is None:
self._issuer_cert_urls = []
if self.authority_information_access_value:
for entry in self.authority_information_access_value:
if entry['access_method'].native == 'ca_issuers':
location = entry['access_location']
if location.name != 'uniform_resource_identifier':
continue
url = location.native
if url.lower()[0:7] == 'http://':
self._issuer_cert_urls.append(url)
return self._issuer_cert_urls
@property
def delta_crl_distribution_points(self):
"""
Returns delta CRL URLs - only applies to complete CRLs
:return:
A list of zero or more DistributionPoint objects
"""
if self._delta_crl_distribution_points is None:
self._delta_crl_distribution_points = []
if self.freshest_crl_value is not None:
for distribution_point in self.freshest_crl_value:
distribution_point_name = distribution_point['distribution_point']
# RFC 5280 indicates conforming CA should not use the relative form
if distribution_point_name.name == 'name_relative_to_crl_issuer':
continue
# This library is currently only concerned with HTTP-based CRLs
for general_name in distribution_point_name.chosen:
if general_name.name == 'uniform_resource_identifier':
self._delta_crl_distribution_points.append(distribution_point)
return self._delta_crl_distribution_points
    @property
    def signature(self):
        """
        :return:
            A byte string of the signature
        """
        # The top-level BIT STRING signature over tbs_cert_list
        return self['signature'].native
@property
def sha1(self):
"""
:return:
The SHA1 hash of the DER-encoded bytes of this certificate list
"""
if self._sha1 is None:
self._sha1 = hashlib.sha1(self.dump()).digest()
return self._sha1
@property
def sha256(self):
"""
:return:
The SHA-256 hash of the DER-encoded bytes of this certificate list
"""
if self._sha256 is None:
self._sha256 = hashlib.sha256(self.dump()).digest()
return self._sha256

View File

@ -0,0 +1,133 @@
# coding: utf-8
"""
ASN.1 type classes for certificate signing requests (CSR). Exports the
following items:
- CertificationRequest()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import SignedDigestAlgorithm
from .core import (
Any,
BitString,
BMPString,
Integer,
ObjectIdentifier,
OctetBitString,
Sequence,
SetOf,
UTF8String
)
from .keys import PublicKeyInfo
from .x509 import DirectoryString, Extensions, Name
# The structures in this file are taken from https://tools.ietf.org/html/rfc2986
# and https://tools.ietf.org/html/rfc2985
# CSR version number - RFC 2986 only defines v1
class Version(Integer):
    _map = {
        0: 'v1',
    }
# OID-to-name map for attributes allowed in a CertificationRequestInfo
class CSRAttributeType(ObjectIdentifier):
    _map = {
        '1.2.840.113549.1.9.7': 'challenge_password',
        '1.2.840.113549.1.9.9': 'extended_certificate_attributes',
        '1.2.840.113549.1.9.14': 'extension_request',
        # https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-wcce/a5eaae36-e9f3-4dc5-a687-bfa7115954f1
        '1.3.6.1.4.1.311.13.2.2': 'microsoft_enrollment_csp_provider',
        # https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-wcce/7c677cba-030d-48be-ba2b-01e407705f34
        '1.3.6.1.4.1.311.13.2.3': 'microsoft_os_version',
        # https://docs.microsoft.com/en-us/openspecs/windows_protocols/ms-wcce/64e5ff6d-c6dd-4578-92f7-b3d895f9b9c7
        '1.3.6.1.4.1.311.21.20': 'microsoft_request_client_info',
    }
class SetOfDirectoryString(SetOf):
    _child_spec = DirectoryString
# Generic attribute: an OID plus a set of values of unknown type
class Attribute(Sequence):
    _fields = [
        ('type', ObjectIdentifier),
        ('values', SetOf, {'spec': Any}),
    ]
class SetOfAttributes(SetOf):
    _child_spec = Attribute
class SetOfExtensions(SetOf):
    _child_spec = Extensions
# Value type for the microsoft_enrollment_csp_provider attribute
class MicrosoftEnrollmentCSProvider(Sequence):
    _fields = [
        ('keyspec', Integer),
        ('cspname', BMPString), # cryptographic service provider name
        ('signature', BitString),
    ]
class SetOfMicrosoftEnrollmentCSProvider(SetOf):
    _child_spec = MicrosoftEnrollmentCSProvider
# Value type for the microsoft_request_client_info attribute
class MicrosoftRequestClientInfo(Sequence):
    _fields = [
        ('clientid', Integer),
        ('machinename', UTF8String),
        ('username', UTF8String),
        ('processname', UTF8String),
    ]
class SetOfMicrosoftRequestClientInfo(SetOf):
    _child_spec = MicrosoftRequestClientInfo
# A single CSR attribute; the 'values' field is parsed according to 'type'
class CRIAttribute(Sequence):
    _fields = [
        ('type', CSRAttributeType),
        ('values', Any),
    ]
    # Instructs the Sequence machinery which spec to parse 'values' with
    _oid_pair = ('type', 'values')
    _oid_specs = {
        'challenge_password': SetOfDirectoryString,
        'extended_certificate_attributes': SetOfAttributes,
        'extension_request': SetOfExtensions,
        'microsoft_enrollment_csp_provider': SetOfMicrosoftEnrollmentCSProvider,
        'microsoft_os_version': SetOfDirectoryString,
        'microsoft_request_client_info': SetOfMicrosoftRequestClientInfo,
    }
class CRIAttributes(SetOf):
    _child_spec = CRIAttribute
# The to-be-signed portion of a CSR (RFC 2986 CertificationRequestInfo)
class CertificationRequestInfo(Sequence):
    _fields = [
        ('version', Version),
        ('subject', Name),
        ('subject_pk_info', PublicKeyInfo),
        ('attributes', CRIAttributes, {'implicit': 0, 'optional': True}),
    ]
# Top-level CSR structure: the signed info plus the signature over it
class CertificationRequest(Sequence):
    _fields = [
        ('certification_request_info', CertificationRequestInfo),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
    ]

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,703 @@
# coding: utf-8
"""
ASN.1 type classes for the online certificate status protocol (OCSP). Exports
the following items:
- OCSPRequest()
- OCSPResponse()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from ._errors import unwrap
from .algos import DigestAlgorithm, SignedDigestAlgorithm
from .core import (
Boolean,
Choice,
Enumerated,
GeneralizedTime,
IA5String,
Integer,
Null,
ObjectIdentifier,
OctetBitString,
OctetString,
ParsableOctetString,
Sequence,
SequenceOf,
)
from .crl import AuthorityInfoAccessSyntax, CRLReason
from .keys import PublicKeyAlgorithm
from .x509 import Certificate, GeneralName, GeneralNames, Name
# The structures in this file are taken from https://tools.ietf.org/html/rfc6960
# OCSP protocol version - RFC 6960 only defines v1
class Version(Integer):
    _map = {
        0: 'v1'
    }
# Identifies the certificate whose status is being requested/reported
class CertId(Sequence):
    _fields = [
        ('hash_algorithm', DigestAlgorithm),
        ('issuer_name_hash', OctetString),
        ('issuer_key_hash', OctetString),
        ('serial_number', Integer),
    ]
# Value of the service_locator request extension
class ServiceLocator(Sequence):
    _fields = [
        ('issuer', Name),
        ('locator', AuthorityInfoAccessSyntax),
    ]
# OIDs for extensions attached to an individual Request
class RequestExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.7': 'service_locator',
    }
class RequestExtension(Sequence):
    _fields = [
        ('extn_id', RequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]
    # extn_value is parsed using the spec selected by extn_id
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'service_locator': ServiceLocator,
    }
class RequestExtensions(SequenceOf):
    _child_spec = RequestExtension
class Request(Sequence):
    """
    A single certificate status request within an OCSP request
    """
    _fields = [
        ('req_cert', CertId),
        ('single_request_extensions', RequestExtensions, {'explicit': 0, 'optional': True}),
    ]
    # Lazily-populated caches, filled by _set_extensions() on first access
    _processed_extensions = False
    _critical_extensions = None
    _service_locator_value = None
    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a list
        of critical extensions
        """
        self._critical_extensions = set()
        for extension in self['single_request_extensions']:
            name = extension['extn_id'].native
            attribute_name = '_%s_value' % name
            # Only cache extensions this class declares a slot for
            if hasattr(self, attribute_name):
                setattr(self, attribute_name, extension['extn_value'].parsed)
            if extension['critical'].native:
                self._critical_extensions.add(name)
        self._processed_extensions = True
    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """
        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions
    @property
    def service_locator_value(self):
        """
        This extension is used when communicating with an OCSP responder that
        acts as a proxy for OCSP requests

        :return:
            None or a ServiceLocator object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._service_locator_value
class Requests(SequenceOf):
    _child_spec = Request
# OIDs of the response formats a client will accept
class ResponseType(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response',
    }
class AcceptableResponses(SequenceOf):
    _child_spec = ResponseType
# A signature algorithm the client prefers, optionally tied to a key algorithm
class PreferredSignatureAlgorithm(Sequence):
    _fields = [
        ('sig_identifier', SignedDigestAlgorithm),
        ('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
    ]
class PreferredSignatureAlgorithms(SequenceOf):
    _child_spec = PreferredSignatureAlgorithm
# OIDs for extensions attached to the whole TBSRequest
class TBSRequestExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
        '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
    }
class TBSRequestExtension(Sequence):
    _fields = [
        ('extn_id', TBSRequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]
    # extn_value is parsed using the spec selected by extn_id
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'acceptable_responses': AcceptableResponses,
        'preferred_signature_algorithms': PreferredSignatureAlgorithms,
    }
class TBSRequestExtensions(SequenceOf):
    _child_spec = TBSRequestExtension
# The to-be-signed portion of an OCSP request
class TBSRequest(Sequence):
    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('requestor_name', GeneralName, {'explicit': 1, 'optional': True}),
        ('request_list', Requests),
        ('request_extensions', TBSRequestExtensions, {'explicit': 2, 'optional': True}),
    ]
class Certificates(SequenceOf):
    _child_spec = Certificate
# Optional signature over the TBSRequest, with supporting certificates
class Signature(Sequence):
    _fields = [
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]
class OCSPRequest(Sequence):
    """
    Top-level OCSP request structure from RFC 6960
    """
    _fields = [
        ('tbs_request', TBSRequest),
        ('optional_signature', Signature, {'explicit': 0, 'optional': True}),
    ]
    # Lazily-populated caches, filled by _set_extensions() on first access
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _acceptable_responses_value = None
    _preferred_signature_algorithms_value = None
    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a list
        of critical extensions
        """
        self._critical_extensions = set()
        for extension in self['tbs_request']['request_extensions']:
            name = extension['extn_id'].native
            attribute_name = '_%s_value' % name
            # Only cache extensions this class declares a slot for
            if hasattr(self, attribute_name):
                setattr(self, attribute_name, extension['extn_value'].parsed)
            if extension['critical'].native:
                self._critical_extensions.add(name)
        self._processed_extensions = True
    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """
        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions
    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks by including a unique,
        random value with each request/response pair

        :return:
            None or an OctetString object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._nonce_value
    @property
    def acceptable_responses_value(self):
        """
        This extension is used to allow the client and server to communicate
        with alternative response formats other than just basic_ocsp_response,
        although no other formats are defined in the standard.

        :return:
            None or an AcceptableResponses object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._acceptable_responses_value
    @property
    def preferred_signature_algorithms_value(self):
        """
        This extension is used by the client to define what signature algorithms
        are preferred, including both the hash algorithm and the public key
        algorithm, with a level of detail down to even the public key algorithm
        parameters, such as curve name.

        :return:
            None or a PreferredSignatureAlgorithms object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._preferred_signature_algorithms_value
# Overall response status codes; value 4 is intentionally absent as it is
# not defined by RFC 6960
class OCSPResponseStatus(Enumerated):
    _map = {
        0: 'successful',
        1: 'malformed_request',
        2: 'internal_error',
        3: 'try_later',
        5: 'sign_required',
        6: 'unauthorized',
    }
# The responder identifies itself either by distinguished name or key hash
class ResponderId(Choice):
    _alternatives = [
        ('by_name', Name, {'explicit': 1}),
        ('by_key', OctetString, {'explicit': 2}),
    ]
# Custom class to return a meaningful .native attribute from CertStatus()
class StatusGood(Null):
    """
    ASN.1 NULL subclass whose .native is the string 'good' instead of None,
    so CertStatus choices produce readable values
    """
    def set(self, value):
        """
        Sets the value of the object

        :param value:
            None or 'good'
        """
        # Accept only the values that are semantically equivalent to NULL here
        if value is not None and value != 'good' and not isinstance(value, Null):
            raise ValueError(unwrap(
                '''
                value must be one of None, "good", not %s
                ''',
                repr(value)
            ))
        # NULL always has empty contents
        self.contents = b''
    @property
    def native(self):
        return 'good'
# Custom class to return a meaningful .native attribute from CertStatus()
class StatusUnknown(Null):
    """
    ASN.1 NULL subclass whose .native is the string 'unknown' instead of None,
    so CertStatus choices produce readable values
    """
    def set(self, value):
        """
        Sets the value of the object

        :param value:
            None or 'unknown'
        """
        # Accept only the values that are semantically equivalent to NULL here
        if value is not None and value != 'unknown' and not isinstance(value, Null):
            raise ValueError(unwrap(
                '''
                value must be one of None, "unknown", not %s
                ''',
                repr(value)
            ))
        # NULL always has empty contents
        self.contents = b''
    @property
    def native(self):
        return 'unknown'
# Details included when a certificate's status is 'revoked'
class RevokedInfo(Sequence):
    _fields = [
        ('revocation_time', GeneralizedTime),
        ('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}),
    ]
# The status of a single certificate: good, revoked or unknown
class CertStatus(Choice):
    _alternatives = [
        ('good', StatusGood, {'implicit': 0}),
        ('revoked', RevokedInfo, {'implicit': 1}),
        ('unknown', StatusUnknown, {'implicit': 2}),
    ]
# Identifies the CRL a revocation was sourced from
class CrlId(Sequence):
    _fields = [
        ('crl_url', IA5String, {'explicit': 0, 'optional': True}),
        ('crl_num', Integer, {'explicit': 1, 'optional': True}),
        ('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}),
    ]
# OIDs for extensions attached to an individual SingleResponse
class SingleResponseExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.3': 'crl',
        '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
        # These are CRLEntryExtension values from
        # https://tools.ietf.org/html/rfc5280
        '2.5.29.21': 'crl_reason',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
        # https://tools.ietf.org/html/rfc6962.html#page-13
        '1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list',
    }
class SingleResponseExtension(Sequence):
    _fields = [
        ('extn_id', SingleResponseExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]
    # extn_value is parsed using the spec selected by extn_id
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl': CrlId,
        'archive_cutoff': GeneralizedTime,
        'crl_reason': CRLReason,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
        'signed_certificate_timestamp_list': OctetString,
    }
class SingleResponseExtensions(SequenceOf):
    _child_spec = SingleResponseExtension
class SingleResponse(Sequence):
    """
    The status of a single certificate within an OCSP response
    """
    _fields = [
        ('cert_id', CertId),
        ('cert_status', CertStatus),
        ('this_update', GeneralizedTime),
        ('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}),
        ('single_extensions', SingleResponseExtensions, {'explicit': 1, 'optional': True}),
    ]
    # Lazily-populated caches, filled by _set_extensions() on first access
    _processed_extensions = False
    _critical_extensions = None
    _crl_value = None
    _archive_cutoff_value = None
    _crl_reason_value = None
    _invalidity_date_value = None
    _certificate_issuer_value = None
    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a list
        of critical extensions
        """
        self._critical_extensions = set()
        for extension in self['single_extensions']:
            name = extension['extn_id'].native
            attribute_name = '_%s_value' % name
            # Only cache extensions this class declares a slot for
            if hasattr(self, attribute_name):
                setattr(self, attribute_name, extension['extn_value'].parsed)
            if extension['critical'].native:
                self._critical_extensions.add(name)
        self._processed_extensions = True
    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """
        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions
    @property
    def crl_value(self):
        """
        This extension is used to locate the CRL that a certificate's revocation
        is contained within.

        :return:
            None or a CrlId object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._crl_value
    @property
    def archive_cutoff_value(self):
        """
        This extension is used to indicate the date at which an archived
        (historical) certificate status entry will no longer be available.

        :return:
            None or a GeneralizedTime object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._archive_cutoff_value
    @property
    def crl_reason_value(self):
        """
        This extension indicates the reason that a certificate was revoked.

        :return:
            None or a CRLReason object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._crl_reason_value
    @property
    def invalidity_date_value(self):
        """
        This extension indicates the suspected date/time the private key was
        compromised or the certificate became invalid. This would usually be
        before the revocation date, which is when the CA processed the
        revocation.

        :return:
            None or a GeneralizedTime object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._invalidity_date_value
    @property
    def certificate_issuer_value(self):
        """
        This extension indicates the issuer of the certificate in question.

        :return:
            None or an x509.GeneralNames object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._certificate_issuer_value
class Responses(SequenceOf):
    _child_spec = SingleResponse
# OIDs for extensions attached to the whole ResponseData
class ResponseDataExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
    }
class ResponseDataExtension(Sequence):
    _fields = [
        ('extn_id', ResponseDataExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]
    # extn_value is parsed using the spec selected by extn_id
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'extended_revoke': Null,
    }
class ResponseDataExtensions(SequenceOf):
    _child_spec = ResponseDataExtension
# The to-be-signed portion of a basic OCSP response
class ResponseData(Sequence):
    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('responder_id', ResponderId),
        ('produced_at', GeneralizedTime),
        ('responses', Responses),
        ('response_extensions', ResponseDataExtensions, {'explicit': 1, 'optional': True}),
    ]
# Signed response data plus the certificates needed to verify the signature
class BasicOCSPResponse(Sequence):
    _fields = [
        ('tbs_response_data', ResponseData),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]
# Wrapper that tags the encoded response with its format OID
class ResponseBytes(Sequence):
    _fields = [
        ('response_type', ResponseType),
        ('response', ParsableOctetString),
    ]
    _oid_pair = ('response_type', 'response')
    _oid_specs = {
        'basic_ocsp_response': BasicOCSPResponse,
    }
class OCSPResponse(Sequence):
    """
    Top-level OCSP response structure from RFC 6960
    """
    _fields = [
        ('response_status', OCSPResponseStatus),
        ('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}),
    ]
    # Lazily-populated caches, filled by _set_extensions() on first access
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _extended_revoke_value = None
    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a list
        of critical extensions
        """
        self._critical_extensions = set()
        # Extensions live inside the parsed BasicOCSPResponse payload
        for extension in self['response_bytes']['response'].parsed['tbs_response_data']['response_extensions']:
            name = extension['extn_id'].native
            attribute_name = '_%s_value' % name
            # Only cache extensions this class declares a slot for
            if hasattr(self, attribute_name):
                setattr(self, attribute_name, extension['extn_value'].parsed)
            if extension['critical'].native:
                self._critical_extensions.add(name)
        self._processed_extensions = True
    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """
        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions
    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks on the request/response
        exchange

        :return:
            None or an OctetString object
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._nonce_value
    @property
    def extended_revoke_value(self):
        """
        This extension is used to signal that the responder will return a
        "revoked" status for non-issued certificates.

        :return:
            None or a Null object (if present)
        """
        if self._processed_extensions is False:
            self._set_extensions()
        return self._extended_revoke_value
    @property
    def basic_ocsp_response(self):
        """
        A shortcut into the BasicOCSPResponse sequence

        :return:
            None or an asn1crypto.ocsp.BasicOCSPResponse object
        """
        return self['response_bytes']['response'].parsed
    @property
    def response_data(self):
        """
        A shortcut into the parsed, ResponseData sequence

        :return:
            None or an asn1crypto.ocsp.ResponseData object
        """
        return self['response_bytes']['response'].parsed['tbs_response_data']

View File

@ -0,0 +1,292 @@
# coding: utf-8
"""
Functions for parsing and dumping using the ASN.1 DER encoding. Exports the
following items:
- emit()
- parse()
- peek()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import sys
from ._types import byte_cls, chr_cls, type_name
from .util import int_from_bytes, int_to_bytes
# True when running under Python 2 - changes how individual bytes are read
_PY2 = sys.version_info <= (3,)
# Error template used when encoded data ends before a parse completes
_INSUFFICIENT_DATA_MESSAGE = 'Insufficient data - %s bytes requested but only %s available'
# Maximum nesting allowed when scanning indefinite-length values
_MAX_DEPTH = 10
def emit(class_, method, tag, contents):
    """
    Constructs a byte string of an ASN.1 DER-encoded value

    This is typically not useful. Instead, use one of the standard classes from
    asn1crypto.core, or construct a new class with specific fields, and call the
    .dump() method.

    :param class_:
        An integer ASN.1 class value: 0 (universal), 1 (application),
        2 (context), 3 (private)

    :param method:
        An integer ASN.1 method value: 0 (primitive), 1 (constructed)

    :param tag:
        An integer ASN.1 tag value

    :param contents:
        A byte string of the encoded byte contents

    :raises:
        TypeError - when a parameter is of the wrong type
        ValueError - when a parameter is out of range

    :return:
        A byte string of the ASN.1 DER value (header and contents)
    """
    # Validate every parameter before any encoding work is done
    if not isinstance(class_, int):
        raise TypeError('class_ must be an integer, not %s' % type_name(class_))
    if not (0 <= class_ <= 3):
        raise ValueError('class_ must be one of 0, 1, 2 or 3, not %s' % class_)
    if not isinstance(method, int):
        raise TypeError('method must be an integer, not %s' % type_name(method))
    if not (0 <= method <= 1):
        raise ValueError('method must be 0 or 1, not %s' % method)
    if not isinstance(tag, int):
        raise TypeError('tag must be an integer, not %s' % type_name(tag))
    if tag < 0:
        raise ValueError('tag must be greater than zero, not %s' % tag)
    if not isinstance(contents, byte_cls):
        raise TypeError('contents must be a byte string, not %s' % type_name(contents))
    header = _dump_header(class_, method, tag, contents)
    return header + contents
def parse(contents, strict=False):
    """
    Parses a byte string of ASN.1 BER/DER-encoded data.

    This is typically not useful. Instead, use one of the standard classes from
    asn1crypto.core, or construct a new class with specific fields, and call the
    .load() class method.

    :param contents:
        A byte string of BER/DER-encoded data

    :param strict:
        A boolean indicating if trailing data should be forbidden - if so, a
        ValueError will be raised when trailing data exists

    :raises:
        ValueError - when the contents do not contain an ASN.1 header or are truncated in some way
        TypeError - when contents is not a byte string

    :return:
        A 6-element tuple:
         - 0: integer class (0 to 3)
         - 1: integer method
         - 2: integer tag
         - 3: byte string header
         - 4: byte string content
         - 5: byte string trailer
    """
    if not isinstance(contents, byte_cls):
        raise TypeError('contents must be a byte string, not %s' % type_name(contents))
    total_len = len(contents)
    info, consumed = _parse(contents, total_len)
    # In strict mode, the entire input must be a single ASN.1 value
    if strict and consumed != total_len:
        raise ValueError('Extra data - %d bytes of trailing data were provided' % (total_len - consumed))
    return info
def peek(contents):
    """
    Parses a byte string of ASN.1 BER/DER-encoded data to find the length

    This is typically used to look into an encoded value to see how long the
    next chunk of ASN.1-encoded data is. Primarily it is useful when a
    value is a concatenation of multiple values.

    :param contents:
        A byte string of BER/DER-encoded data

    :raises:
        ValueError - when the contents do not contain an ASN.1 header or are truncated in some way
        TypeError - when contents is not a byte string

    :return:
        An integer with the number of bytes occupied by the ASN.1 value
    """
    if not isinstance(contents, byte_cls):
        raise TypeError('contents must be a byte string, not %s' % type_name(contents))
    # Only the consumed-byte count is needed; the parsed info is discarded
    _, consumed = _parse(contents, len(contents))
    return consumed
def _parse(encoded_data, data_len, pointer=0, lengths_only=False, depth=0):
    """
    Parses a byte string into component parts

    :param encoded_data:
        A byte string that contains BER-encoded data

    :param data_len:
        The integer length of the encoded data

    :param pointer:
        The index in the byte string to parse from

    :param lengths_only:
        A boolean to cause the call to return a 2-element tuple of the integer
        number of bytes in the header and the integer number of bytes in the
        contents. Internal use only.

    :param depth:
        The recursion depth when evaluating indefinite-length encoding.

    :return:
        A 2-element tuple:
         - 0: A tuple of (class_, method, tag, header, content, trailer)
         - 1: An integer indicating how many bytes were consumed
    """
    # Guard against maliciously deep nesting of indefinite-length values
    if depth > _MAX_DEPTH:
        raise ValueError('Indefinite-length recursion limit exceeded')
    start = pointer
    if data_len < pointer + 1:
        raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (1, data_len - pointer))
    # Identifier octet: class in bits 8-7, constructed flag in bit 6,
    # tag number in bits 5-1
    first_octet = ord(encoded_data[pointer]) if _PY2 else encoded_data[pointer]
    pointer += 1
    tag = first_octet & 31
    constructed = (first_octet >> 5) & 1
    # High tag number form: all five tag bits set means the tag continues in
    # subsequent octets, base 128, using the 8th bit as a continuation indicator
    if tag == 31:
        tag = 0
        while True:
            if data_len < pointer + 1:
                raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (1, data_len - pointer))
            num = ord(encoded_data[pointer]) if _PY2 else encoded_data[pointer]
            pointer += 1
            # A leading 0x80 continuation octet would add leading zeros
            if num == 0x80 and tag == 0:
                raise ValueError('Non-minimal tag encoding')
            tag *= 128
            tag += num & 127
            if num >> 7 == 0:
                break
        # Tags below 31 must use the low tag number form
        if tag < 31:
            raise ValueError('Non-minimal tag encoding')
    if data_len < pointer + 1:
        raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (1, data_len - pointer))
    length_octet = ord(encoded_data[pointer]) if _PY2 else encoded_data[pointer]
    pointer += 1
    trailer = b''
    if length_octet >> 7 == 0:
        # Short form: the low 7 bits are the content length
        contents_end = pointer + (length_octet & 127)
    else:
        length_octets = length_octet & 127
        if length_octets:
            # Long form: the low 7 bits give the number of length octets
            if data_len < pointer + length_octets:
                raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (length_octets, data_len - pointer))
            pointer += length_octets
            contents_end = pointer + int_from_bytes(encoded_data[pointer - length_octets:pointer], signed=False)
        else:
            # To properly parse indefinite length values, we need to scan forward
            # parsing headers until we find a value with a length of zero. If we
            # just scanned looking for \x00\x00, nested indefinite length values
            # would not work.
            if not constructed:
                raise ValueError('Indefinite-length element must be constructed')
            contents_end = pointer
            while data_len < contents_end + 2 or encoded_data[contents_end:contents_end+2] != b'\x00\x00':
                _, contents_end = _parse(encoded_data, data_len, contents_end, lengths_only=True, depth=depth+1)
            # Consume the end-of-contents marker
            contents_end += 2
            trailer = b'\x00\x00'
    if contents_end > data_len:
        raise ValueError(_INSUFFICIENT_DATA_MESSAGE % (contents_end - pointer, data_len - pointer))
    if lengths_only:
        return (pointer, contents_end)
    return (
        (
            first_octet >> 6,
            constructed,
            tag,
            encoded_data[start:pointer],
            encoded_data[pointer:contents_end-len(trailer)],
            trailer
        ),
        contents_end
    )
def _dump_header(class_, method, tag, contents):
    """
    Constructs the header bytes for an ASN.1 object

    :param class_:
        An integer ASN.1 class value: 0 (universal), 1 (application),
        2 (context), 3 (private)

    :param method:
        An integer ASN.1 method value: 0 (primitive), 1 (constructed)

    :param tag:
        An integer ASN.1 tag value

    :param contents:
        A byte string of the encoded byte contents

    :return:
        A byte string of the ASN.1 DER header
    """
    # Identifier octet: class in bits 8-7, method in bit 6
    identifier = (class_ << 6) | (method << 5)
    if tag < 31:
        # Low tag number form: tag fits in the identifier octet
        header = chr_cls(identifier | tag)
    else:
        # High tag number form: base-128 octets, high bit marks continuation,
        # built from least-significant group outward
        tag_octets = b''
        continuation = 0
        while tag > 0:
            tag_octets = chr_cls(continuation | (tag & 0x7f)) + tag_octets
            continuation = 0x80
            tag = tag >> 7
        header = chr_cls(identifier | 31) + tag_octets
    length = len(contents)
    if length <= 127:
        # Short form: single length octet
        header += chr_cls(length)
    else:
        # Long form: first octet holds the count of following length octets
        length_bytes = int_to_bytes(length)
        header += chr_cls(0x80 | len(length_bytes))
        header += length_bytes
    return header

View File

@ -0,0 +1,84 @@
# coding: utf-8
"""
ASN.1 type classes for PDF signature structures. Adds extra oid mapping and
value parsing to asn1crypto.x509.Extension() and asn1crypto.xms.CMSAttribute().
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .cms import CMSAttributeType, CMSAttribute
from .core import (
Boolean,
Integer,
Null,
ObjectIdentifier,
OctetString,
Sequence,
SequenceOf,
SetOf,
)
from .crl import CertificateList
from .ocsp import OCSPResponse
from .x509 import (
Extension,
ExtensionId,
GeneralName,
KeyPurposeId,
)
class AdobeArchiveRevInfo(Sequence):
_fields = [
('version', Integer)
]
class AdobeTimestamp(Sequence):
_fields = [
('version', Integer),
('location', GeneralName),
('requires_auth', Boolean, {'optional': True, 'default': False}),
]
class OtherRevInfo(Sequence):
_fields = [
('type', ObjectIdentifier),
('value', OctetString),
]
class SequenceOfCertificateList(SequenceOf):
_child_spec = CertificateList
class SequenceOfOCSPResponse(SequenceOf):
_child_spec = OCSPResponse
class SequenceOfOtherRevInfo(SequenceOf):
_child_spec = OtherRevInfo
class RevocationInfoArchival(Sequence):
_fields = [
('crl', SequenceOfCertificateList, {'explicit': 0, 'optional': True}),
('ocsp', SequenceOfOCSPResponse, {'explicit': 1, 'optional': True}),
('other_rev_info', SequenceOfOtherRevInfo, {'explicit': 2, 'optional': True}),
]
class SetOfRevocationInfoArchival(SetOf):
_child_spec = RevocationInfoArchival
ExtensionId._map['1.2.840.113583.1.1.9.2'] = 'adobe_archive_rev_info'
ExtensionId._map['1.2.840.113583.1.1.9.1'] = 'adobe_timestamp'
ExtensionId._map['1.2.840.113583.1.1.10'] = 'adobe_ppklite_credential'
Extension._oid_specs['adobe_archive_rev_info'] = AdobeArchiveRevInfo
Extension._oid_specs['adobe_timestamp'] = AdobeTimestamp
Extension._oid_specs['adobe_ppklite_credential'] = Null
KeyPurposeId._map['1.2.840.113583.1.1.5'] = 'pdf_signing'
CMSAttributeType._map['1.2.840.113583.1.1.8'] = 'adobe_revocation_info_archival'
CMSAttribute._oid_specs['adobe_revocation_info_archival'] = SetOfRevocationInfoArchival

View File

@ -0,0 +1,222 @@
# coding: utf-8
"""
Encoding DER to PEM and decoding PEM to DER. Exports the following items:
- armor()
- detect()
- unarmor()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import base64
import re
import sys
from ._errors import unwrap
from ._types import type_name as _type_name, str_cls, byte_cls
if sys.version_info < (3,):
from cStringIO import StringIO as BytesIO
else:
from io import BytesIO
def detect(byte_string):
    """
    Detect if a byte string seems to contain a PEM-encoded block

    :param byte_string:
        A byte string to look through

    :return:
        A boolean, indicating if a PEM-encoded block is contained in the byte
        string
    """

    if not isinstance(byte_string, byte_cls):
        raise TypeError(unwrap(
            '''
            byte_string must be a byte string, not %s
            ''',
            _type_name(byte_string)
        ))

    # Accept both the five-dash and the four-dash-plus-space BEGIN markers
    return b'-----BEGIN' in byte_string or b'---- BEGIN' in byte_string
def armor(type_name, der_bytes, headers=None):
    """
    Armors a DER-encoded byte string in PEM

    :param type_name:
        A unicode string that will be capitalized and placed in the header
        and footer of the block. E.g. "CERTIFICATE", "PRIVATE KEY", etc. This
        will appear as "-----BEGIN CERTIFICATE-----" and
        "-----END CERTIFICATE-----".

    :param der_bytes:
        A byte string to be armored

    :param headers:
        An OrderedDict of the header lines to write after the BEGIN line

    :raises:
        TypeError - when type_name or der_bytes are not the required types

    :return:
        A byte string of the PEM block
    """

    if not isinstance(der_bytes, byte_cls):
        # Pass the type name as an unwrap() parameter so this call is
        # consistent with every other unwrap() usage in this module
        raise TypeError(unwrap(
            '''
            der_bytes must be a byte string, not %s
            ''',
            _type_name(der_bytes)
        ))

    if not isinstance(type_name, str_cls):
        raise TypeError(unwrap(
            '''
            type_name must be a unicode string, not %s
            ''',
            _type_name(type_name)
        ))

    type_name = type_name.upper().encode('ascii')

    output = BytesIO()
    output.write(b'-----BEGIN ')
    output.write(type_name)
    output.write(b'-----\n')
    if headers:
        for key in headers:
            output.write(key.encode('ascii'))
            output.write(b': ')
            output.write(headers[key].encode('ascii'))
            output.write(b'\n')
        # A blank line separates the headers from the base64 payload
        output.write(b'\n')
    b64_bytes = base64.b64encode(der_bytes)
    b64_len = len(b64_bytes)
    # Wrap the base64 payload at 64 characters per line
    i = 0
    while i < b64_len:
        output.write(b64_bytes[i:i + 64])
        output.write(b'\n')
        i += 64
    output.write(b'-----END ')
    output.write(type_name)
    output.write(b'-----\n')

    return output.getvalue()
def _unarmor(pem_bytes):
    """
    Convert a PEM-encoded byte string into one or more DER-encoded byte strings

    :param pem_bytes:
        A byte string of the PEM-encoded data

    :raises:
        ValueError - when the pem_bytes do not appear to be PEM-encoded bytes

    :return:
        A generator of 3-element tuples in the format: (object_type, headers,
        der_bytes). The object_type is a unicode string of what is between
        "-----BEGIN " and "-----". Examples include: "CERTIFICATE",
        "PUBLIC KEY", "PRIVATE KEY". The headers is a dict containing any lines
        in the form "Name: Value" that are right after the begin line.
    """

    if not isinstance(pem_bytes, byte_cls):
        raise TypeError(unwrap(
            '''
            pem_bytes must be a byte string, not %s
            ''',
            _type_name(pem_bytes)
        ))

    # Valid states include: "trash", "headers", "body"
    state = 'trash'
    headers = {}
    base64_data = b''
    object_type = None

    found_start = False
    found_end = False

    for line in pem_bytes.splitlines(False):
        if line == b'':
            continue

        if state == "trash":
            # Look for a starting line since some CA cert bundle show the cert
            # info in a parsed format above each PEM block
            type_name_match = re.match(b'^(?:---- |-----)BEGIN ([A-Z0-9 ]+)(?: ----|-----)', line)
            if not type_name_match:
                continue
            object_type = type_name_match.group(1).decode('ascii')

            found_start = True
            state = 'headers'
            continue

        if state == 'headers':
            if line.find(b':') == -1:
                # First line without a ":" ends the headers; deliberately no
                # "continue" here so this same line is handled as body below
                state = 'body'
            else:
                decoded_line = line.decode('ascii')
                name, value = decoded_line.split(':', 1)
                headers[name] = value.strip()
                continue

        if state == 'body':
            if line[0:5] in (b'-----', b'---- '):
                # END marker: decode the accumulated base64 and emit the block,
                # then reset state so additional PEM blocks can be parsed
                der_bytes = base64.b64decode(base64_data)

                yield (object_type, headers, der_bytes)

                state = 'trash'
                headers = {}
                base64_data = b''
                object_type = None

                found_end = True
                continue

            base64_data += line

    if not found_start or not found_end:
        raise ValueError(unwrap(
            '''
            pem_bytes does not appear to contain PEM-encoded data - no
            BEGIN/END combination found
            '''
        ))
def unarmor(pem_bytes, multiple=False):
    """
    Convert a PEM-encoded byte string into a DER-encoded byte string

    :param pem_bytes:
        A byte string of the PEM-encoded data

    :param multiple:
        If True, function will return a generator

    :raises:
        ValueError - when the pem_bytes do not appear to be PEM-encoded bytes

    :return:
        A 3-element tuple (object_name, headers, der_bytes). The object_name is
        a unicode string of what is between "-----BEGIN " and "-----". Examples
        include: "CERTIFICATE", "PUBLIC KEY", "PRIVATE KEY". The headers is a
        dict containing any lines in the form "Name: Value" that are right
        after the begin line.
    """

    parsed = _unarmor(pem_bytes)

    if multiple:
        # Caller wants to iterate over every PEM block themselves
        return parsed
    # Single-block mode: pull just the first tuple out of the generator
    return next(parsed)

View File

@ -0,0 +1,193 @@
# coding: utf-8
"""
ASN.1 type classes for PKCS#12 files. Exports the following items:
- CertBag()
- CrlBag()
- Pfx()
- SafeBag()
- SecretBag()
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import DigestInfo
from .cms import ContentInfo, SignedData
from .core import (
Any,
BMPString,
Integer,
ObjectIdentifier,
OctetString,
ParsableOctetString,
Sequence,
SequenceOf,
SetOf,
)
from .keys import PrivateKeyInfo, EncryptedPrivateKeyInfo
from .x509 import Certificate, KeyPurposeId
# The structures in this file are taken from https://tools.ietf.org/html/rfc7292
class MacData(Sequence):
    """PKCS#12 MAC over the authenticated safe: digest, salt and iterations."""
    _fields = [
        ('mac', DigestInfo),
        ('mac_salt', OctetString),
        ('iterations', Integer, {'default': 1}),
    ]
class Version(Integer):
    """PKCS#12 version number - only v3 is defined."""
    _map = {
        3: 'v3'
    }
class AttributeType(ObjectIdentifier):
    """OIDs for PKCS#12 bag attributes."""
    _map = {
        # https://tools.ietf.org/html/rfc2985#page-18
        '1.2.840.113549.1.9.20': 'friendly_name',
        '1.2.840.113549.1.9.21': 'local_key_id',
        # https://support.microsoft.com/en-us/kb/287547
        '1.3.6.1.4.1.311.17.1': 'microsoft_local_machine_keyset',
        # https://github.com/frohoff/jdk8u-dev-jdk/blob/master/src/share/classes/sun/security/pkcs12/PKCS12KeyStore.java
        # this is a set of OIDs, representing key usage, the usual value is a SET of one element OID 2.5.29.37.0
        '2.16.840.1.113894.746875.1.1': 'trusted_key_usage',
    }
class SetOfAny(SetOf):
    """SET OF Any - fallback for unknown attribute values."""
    _child_spec = Any
class SetOfBMPString(SetOf):
    """SET OF BMPString."""
    _child_spec = BMPString
class SetOfOctetString(SetOf):
    """SET OF OctetString."""
    _child_spec = OctetString
class SetOfKeyPurposeId(SetOf):
    """SET OF KeyPurposeId, used by the trusted_key_usage attribute."""
    _child_spec = KeyPurposeId
class Attribute(Sequence):
    """PKCS#12 bag attribute - the spec for 'values' is selected at parse
    time from the attribute type OID via _spec_callbacks."""
    _fields = [
        ('type', AttributeType),
        ('values', None),
    ]

    _oid_specs = {
        'friendly_name': SetOfBMPString,
        'local_key_id': SetOfOctetString,
        # NOTE(review): 'microsoft_csp_name' has no corresponding entry in
        # AttributeType._map above, so this spec looks unreachable - confirm
        'microsoft_csp_name': SetOfBMPString,
        'trusted_key_usage': SetOfKeyPurposeId,
    }

    def _values_spec(self):
        # Unknown attribute types fall back to an untyped SET OF Any
        return self._oid_specs.get(self['type'].native, SetOfAny)

    _spec_callbacks = {
        'values': _values_spec
    }
class Attributes(SetOf):
    """SET OF Attribute."""
    _child_spec = Attribute
class Pfx(Sequence):
    """Top-level PKCS#12 structure: version, authenticated safe and
    optional MAC data."""
    _fields = [
        ('version', Version),
        ('auth_safe', ContentInfo),
        ('mac_data', MacData, {'optional': True})
    ]

    # Cache for the lazily parsed AuthenticatedSafe
    _authenticated_safe = None

    @property
    def authenticated_safe(self):
        """Lazily parse and cache the AuthenticatedSafe from auth_safe."""
        if self._authenticated_safe is None:
            content = self['auth_safe']['content']
            # SignedData wraps the payload one ContentInfo level deeper
            if isinstance(content, SignedData):
                content = content['content_info']['content']
            self._authenticated_safe = AuthenticatedSafe.load(content.native)
        return self._authenticated_safe
class AuthenticatedSafe(SequenceOf):
    """SEQUENCE OF ContentInfo holding the PKCS#12 payload."""
    _child_spec = ContentInfo
class BagId(ObjectIdentifier):
    """OIDs identifying the kind of a PKCS#12 SafeBag."""
    _map = {
        '1.2.840.113549.1.12.10.1.1': 'key_bag',
        '1.2.840.113549.1.12.10.1.2': 'pkcs8_shrouded_key_bag',
        '1.2.840.113549.1.12.10.1.3': 'cert_bag',
        '1.2.840.113549.1.12.10.1.4': 'crl_bag',
        '1.2.840.113549.1.12.10.1.5': 'secret_bag',
        '1.2.840.113549.1.12.10.1.6': 'safe_contents',
    }
class CertId(ObjectIdentifier):
    """OIDs for the certificate formats a CertBag can hold."""
    _map = {
        '1.2.840.113549.1.9.22.1': 'x509',
        '1.2.840.113549.1.9.22.2': 'sdsi',
    }
class CertBag(Sequence):
    """SafeBag payload carrying a certificate; x509 values parse into
    Certificate via the _oid_pair mapping."""
    _fields = [
        ('cert_id', CertId),
        ('cert_value', ParsableOctetString, {'explicit': 0}),
    ]

    _oid_pair = ('cert_id', 'cert_value')
    _oid_specs = {
        'x509': Certificate,
    }
class CrlBag(Sequence):
    """SafeBag payload carrying a CRL as an opaque octet string."""
    _fields = [
        ('crl_id', ObjectIdentifier),
        ('crl_value', OctetString, {'explicit': 0}),
    ]
class SecretBag(Sequence):
    """SafeBag payload carrying an arbitrary typed secret value."""
    _fields = [
        ('secret_type_id', ObjectIdentifier),
        ('secret_value', OctetString, {'explicit': 0}),
    ]
class SafeContents(SequenceOf):
    """SEQUENCE OF SafeBag; _child_spec is assigned after SafeBag is
    defined, since the two types reference each other."""
    pass
class SafeBag(Sequence):
    """A single PKCS#12 bag; bag_value parses into the type selected by
    bag_id via the _oid_pair mapping."""
    _fields = [
        ('bag_id', BagId),
        ('bag_value', Any, {'explicit': 0}),
        ('bag_attributes', Attributes, {'optional': True}),
    ]

    _oid_pair = ('bag_id', 'bag_value')
    _oid_specs = {
        'key_bag': PrivateKeyInfo,
        'pkcs8_shrouded_key_bag': EncryptedPrivateKeyInfo,
        'cert_bag': CertBag,
        'crl_bag': CrlBag,
        'secret_bag': SecretBag,
        'safe_contents': SafeContents
    }
SafeContents._child_spec = SafeBag

View File

@ -0,0 +1,310 @@
# coding: utf-8
"""
ASN.1 type classes for the time stamp protocol (TSP). Exports the following
items:
- TimeStampReq()
- TimeStampResp()
Also adds TimeStampedData() support to asn1crypto.cms.ContentInfo(),
TimeStampedData() and TSTInfo() support to
asn1crypto.cms.EncapsulatedContentInfo() and some oids and value parsers to
asn1crypto.cms.CMSAttribute().
Other type classes are defined that help compose the types listed above.
"""
from __future__ import unicode_literals, division, absolute_import, print_function
from .algos import DigestAlgorithm
from .cms import (
CMSAttribute,
CMSAttributeType,
ContentInfo,
ContentType,
EncapsulatedContentInfo,
)
from .core import (
Any,
BitString,
Boolean,
Choice,
GeneralizedTime,
IA5String,
Integer,
ObjectIdentifier,
OctetString,
Sequence,
SequenceOf,
SetOf,
UTF8String,
)
from .crl import CertificateList
from .x509 import (
Attributes,
CertificatePolicies,
GeneralName,
GeneralNames,
)
# The structures in this file are based on https://tools.ietf.org/html/rfc3161,
# https://tools.ietf.org/html/rfc4998, https://tools.ietf.org/html/rfc5544,
# https://tools.ietf.org/html/rfc5035, https://tools.ietf.org/html/rfc2634
class Version(Integer):
    """Version numbers used by the TSP structures in this module."""
    _map = {
        0: 'v0',
        1: 'v1',
        2: 'v2',
        3: 'v3',
        4: 'v4',
        5: 'v5',
    }
class MessageImprint(Sequence):
    """Digest algorithm plus the hash of the data being timestamped."""
    _fields = [
        ('hash_algorithm', DigestAlgorithm),
        ('hashed_message', OctetString),
    ]
class Accuracy(Sequence):
    """Accuracy of a timestamp as seconds/millis/micros components."""
    _fields = [
        ('seconds', Integer, {'optional': True}),
        ('millis', Integer, {'implicit': 0, 'optional': True}),
        ('micros', Integer, {'implicit': 1, 'optional': True}),
    ]
class Extension(Sequence):
    """Generic extension: OID, critical flag and opaque value octets."""
    _fields = [
        ('extn_id', ObjectIdentifier),
        ('critical', Boolean, {'default': False}),
        ('extn_value', OctetString),
    ]
class Extensions(SequenceOf):
    """SEQUENCE OF Extension."""
    _child_spec = Extension
class TSTInfo(Sequence):
    """Timestamp token info (RFC 3161) - the signed payload of a
    timestamp token."""
    _fields = [
        ('version', Version),
        ('policy', ObjectIdentifier),
        ('message_imprint', MessageImprint),
        ('serial_number', Integer),
        ('gen_time', GeneralizedTime),
        ('accuracy', Accuracy, {'optional': True}),
        ('ordering', Boolean, {'default': False}),
        ('nonce', Integer, {'optional': True}),
        ('tsa', GeneralName, {'explicit': 0, 'optional': True}),
        ('extensions', Extensions, {'implicit': 1, 'optional': True}),
    ]
class TimeStampReq(Sequence):
    """Timestamp request structure (RFC 3161)."""
    _fields = [
        ('version', Version),
        ('message_imprint', MessageImprint),
        ('req_policy', ObjectIdentifier, {'optional': True}),
        ('nonce', Integer, {'optional': True}),
        ('cert_req', Boolean, {'default': False}),
        ('extensions', Extensions, {'implicit': 0, 'optional': True}),
    ]
class PKIStatus(Integer):
    """Status codes returned in a timestamp response."""
    _map = {
        0: 'granted',
        1: 'granted_with_mods',
        2: 'rejection',
        3: 'waiting',
        4: 'revocation_warning',
        5: 'revocation_notification',
    }
class PKIFreeText(SequenceOf):
    """SEQUENCE OF UTF8String status messages."""
    _child_spec = UTF8String
class PKIFailureInfo(BitString):
    """Bit string of failure reason flags for a rejected request."""
    _map = {
        0: 'bad_alg',
        2: 'bad_request',
        5: 'bad_data_format',
        14: 'time_not_available',
        15: 'unaccepted_policy',
        16: 'unaccepted_extensions',
        17: 'add_info_not_available',
        25: 'system_failure',
    }
class PKIStatusInfo(Sequence):
    """Status code plus optional text and failure info."""
    _fields = [
        ('status', PKIStatus),
        ('status_string', PKIFreeText, {'optional': True}),
        ('fail_info', PKIFailureInfo, {'optional': True}),
    ]
class TimeStampResp(Sequence):
    """Timestamp response: status info plus the timestamp token."""
    _fields = [
        ('status', PKIStatusInfo),
        ('time_stamp_token', ContentInfo),
    ]
class MetaData(Sequence):
    """Metadata about timestamped data (RFC 5544)."""
    _fields = [
        ('hash_protected', Boolean),
        ('file_name', UTF8String, {'optional': True}),
        ('media_type', IA5String, {'optional': True}),
        ('other_meta_data', Attributes, {'optional': True}),
    ]
class TimeStampAndCRL(Sequence):
    """A timestamp plus an optional CRL for its verification."""
    _fields = [
        ('time_stamp', EncapsulatedContentInfo),
        ('crl', CertificateList, {'optional': True}),
    ]
class TimeStampTokenEvidence(SequenceOf):
    """SEQUENCE OF TimeStampAndCRL."""
    _child_spec = TimeStampAndCRL
class DigestAlgorithms(SequenceOf):
    """SEQUENCE OF DigestAlgorithm."""
    _child_spec = DigestAlgorithm
class EncryptionInfo(Sequence):
    """Generic (OID, value) pair describing encryption info."""
    _fields = [
        ('encryption_info_type', ObjectIdentifier),
        ('encryption_info_value', Any),
    ]
class PartialHashtree(SequenceOf):
    """SEQUENCE OF OctetString hashes (RFC 4998)."""
    _child_spec = OctetString
class PartialHashtrees(SequenceOf):
    """SEQUENCE OF PartialHashtree."""
    _child_spec = PartialHashtree
class ArchiveTimeStamp(Sequence):
    """A single archive timestamp (RFC 4998)."""
    _fields = [
        ('digest_algorithm', DigestAlgorithm, {'implicit': 0, 'optional': True}),
        ('attributes', Attributes, {'implicit': 1, 'optional': True}),
        ('reduced_hashtree', PartialHashtrees, {'implicit': 2, 'optional': True}),
        ('time_stamp', ContentInfo),
    ]
class ArchiveTimeStampSequence(SequenceOf):
    """SEQUENCE OF ArchiveTimeStamp."""
    _child_spec = ArchiveTimeStamp
class EvidenceRecord(Sequence):
    """Evidence record (RFC 4998) wrapping archive timestamp chains."""
    _fields = [
        ('version', Version),
        ('digest_algorithms', DigestAlgorithms),
        ('crypto_infos', Attributes, {'implicit': 0, 'optional': True}),
        ('encryption_info', EncryptionInfo, {'implicit': 1, 'optional': True}),
        ('archive_time_stamp_sequence', ArchiveTimeStampSequence),
    ]
class OtherEvidence(Sequence):
    """Generic (OID, value) pair for evidence types not modeled here."""
    _fields = [
        ('oe_type', ObjectIdentifier),
        ('oe_value', Any),
    ]
class Evidence(Choice):
    """CHOICE between timestamp token, evidence record or other evidence."""
    _alternatives = [
        ('tst_evidence', TimeStampTokenEvidence, {'implicit': 0}),
        ('ers_evidence', EvidenceRecord, {'implicit': 1}),
        ('other_evidence', OtherEvidence, {'implicit': 2}),
    ]
class TimeStampedData(Sequence):
    """Timestamped data structure (RFC 5544)."""
    _fields = [
        ('version', Version),
        ('data_uri', IA5String, {'optional': True}),
        ('meta_data', MetaData, {'optional': True}),
        ('content', OctetString, {'optional': True}),
        ('temporal_evidence', Evidence),
    ]
class IssuerSerial(Sequence):
    """Certificate issuer names plus serial number."""
    _fields = [
        ('issuer', GeneralNames),
        ('serial_number', Integer),
    ]
class ESSCertID(Sequence):
    """Certificate identifier by hash (RFC 2634)."""
    _fields = [
        ('cert_hash', OctetString),
        ('issuer_serial', IssuerSerial, {'optional': True}),
    ]
class ESSCertIDs(SequenceOf):
    """SEQUENCE OF ESSCertID."""
    _child_spec = ESSCertID
class SigningCertificate(Sequence):
    """Signing certificate attribute value (RFC 2634)."""
    _fields = [
        ('certs', ESSCertIDs),
        ('policies', CertificatePolicies, {'optional': True}),
    ]
class SetOfSigningCertificates(SetOf):
    """SET OF SigningCertificate, used as a CMS attribute value."""
    _child_spec = SigningCertificate
class ESSCertIDv2(Sequence):
    """Certificate identifier by hash, v2 with configurable digest
    (RFC 5035); defaults to SHA-256."""
    _fields = [
        ('hash_algorithm', DigestAlgorithm, {'default': {'algorithm': 'sha256'}}),
        ('cert_hash', OctetString),
        ('issuer_serial', IssuerSerial, {'optional': True}),
    ]
class ESSCertIDv2s(SequenceOf):
    """SEQUENCE OF ESSCertIDv2."""
    _child_spec = ESSCertIDv2
class SigningCertificateV2(Sequence):
    """Signing certificate v2 attribute value (RFC 5035)."""
    _fields = [
        ('certs', ESSCertIDv2s),
        ('policies', CertificatePolicies, {'optional': True}),
    ]
class SetOfSigningCertificatesV2(SetOf):
    """SET OF SigningCertificateV2, used as a CMS attribute value."""
    _child_spec = SigningCertificateV2
# Register the TSP content types and CMS attributes defined above with the
# generic CMS structures so they parse into the proper classes
EncapsulatedContentInfo._oid_specs['tst_info'] = TSTInfo
EncapsulatedContentInfo._oid_specs['timestamped_data'] = TimeStampedData
ContentInfo._oid_specs['timestamped_data'] = TimeStampedData
ContentType._map['1.2.840.113549.1.9.16.1.4'] = 'tst_info'
ContentType._map['1.2.840.113549.1.9.16.1.31'] = 'timestamped_data'
CMSAttributeType._map['1.2.840.113549.1.9.16.2.12'] = 'signing_certificate'
CMSAttribute._oid_specs['signing_certificate'] = SetOfSigningCertificates
CMSAttributeType._map['1.2.840.113549.1.9.16.2.47'] = 'signing_certificate_v2'
CMSAttribute._oid_specs['signing_certificate_v2'] = SetOfSigningCertificatesV2

View File

@ -0,0 +1,878 @@
# coding: utf-8
"""
Miscellaneous data helpers, including functions for converting integers to and
from bytes and UTC timezone. Exports the following items:
- OrderedDict()
- int_from_bytes()
- int_to_bytes()
- timezone.utc
- utc_with_dst
- create_timezone()
- inet_ntop()
- inet_pton()
- uri_to_iri()
- iri_to_uri()
"""
from __future__ import unicode_literals, division, absolute_import, print_function
import math
import sys
from datetime import datetime, date, timedelta, tzinfo
from ._errors import unwrap
from ._iri import iri_to_uri, uri_to_iri # noqa
from ._ordereddict import OrderedDict # noqa
from ._types import type_name
if sys.platform == 'win32':
from ._inet import inet_ntop, inet_pton
else:
from socket import inet_ntop, inet_pton # noqa
# Python 2
if sys.version_info <= (3,):

    def int_to_bytes(value, signed=False, width=None):
        """
        Converts an integer to a byte string

        :param value:
            The integer to convert

        :param signed:
            If the byte string should be encoded using two's complement

        :param width:
            If None, the minimal possible size (but at least 1),
            otherwise an integer of the byte width for the return value

        :return:
            A byte string
        """

        if value == 0 and width == 0:
            return b''

        # Handle negatives in two's complement
        is_neg = False
        if signed and value < 0:
            is_neg = True
            bits = int(math.ceil(len('%x' % abs(value)) / 2.0) * 8)
            value = (value + (1 << bits)) % (1 << bits)

        hex_str = '%x' % value
        if len(hex_str) & 1:
            hex_str = '0' + hex_str

        output = hex_str.decode('hex')

        # A leading 1 bit would read as negative, so prepend a zero byte
        if signed and not is_neg and ord(output[0:1]) & 0x80:
            output = b'\x00' + output

        if width is not None:
            if len(output) > width:
                raise OverflowError('int too big to convert')
            if is_neg:
                pad_char = b'\xFF'
            else:
                pad_char = b'\x00'
            output = (pad_char * (width - len(output))) + output
        elif is_neg and ord(output[0:1]) & 0x80 == 0:
            output = b'\xFF' + output

        return output

    def int_from_bytes(value, signed=False):
        """
        Converts a byte string to an integer

        :param value:
            The byte string to convert

        :param signed:
            If the byte string should be interpreted using two's complement

        :return:
            An integer
        """

        if value == b'':
            return 0

        num = long(value.encode("hex"), 16)  # noqa

        if not signed:
            return num

        # Check for sign bit and handle two's complement
        if ord(value[0:1]) & 0x80:
            bit_len = len(value) * 8
            return num - (1 << bit_len)

        return num

    class timezone(tzinfo):  # noqa
        """
        Implements datetime.timezone for py2.
        Only full minute offsets are supported.
        DST is not supported.
        """

        def __init__(self, offset, name=None):
            """
            :param offset:
                A timedelta with this timezone's offset from UTC

            :param name:
                Name of the timezone; if None, generate one.
            """

            if not timedelta(hours=-24) < offset < timedelta(hours=24):
                raise ValueError('Offset must be in [-23:59, 23:59]')
            if offset.seconds % 60 or offset.microseconds:
                raise ValueError('Offset must be full minutes')
            self._offset = offset

            if name is not None:
                self._name = name
            elif not offset:
                self._name = 'UTC'
            else:
                self._name = 'UTC' + _format_offset(offset)

        def __eq__(self, other):
            """
            Compare two timezones

            :param other:
                The other timezone to compare to

            :return:
                A boolean
            """

            if type(other) != timezone:
                return False
            return self._offset == other._offset

        def __getinitargs__(self):
            """
            Called by tzinfo.__reduce__ to support pickle and copy.

            :return:
                offset and name, to be used for __init__
            """

            return self._offset, self._name

        def tzname(self, dt):
            """
            :param dt:
                A datetime object; ignored.

            :return:
                Name of this timezone
            """

            return self._name

        def utcoffset(self, dt):
            """
            :param dt:
                A datetime object; ignored.

            :return:
                A timedelta object with the offset from UTC
            """

            return self._offset

        def dst(self, dt):
            """
            :param dt:
                A datetime object; ignored.

            :return:
                Zero timedelta
            """

            return timedelta(0)

    timezone.utc = timezone(timedelta(0))

# Python 3
else:

    from datetime import timezone  # noqa

    def int_to_bytes(value, signed=False, width=None):
        """
        Converts an integer to a byte string

        :param value:
            The integer to convert

        :param signed:
            If the byte string should be encoded using two's complement

        :param width:
            If None, the minimal possible size (but at least 1),
            otherwise an integer of the byte width for the return value

        :return:
            A byte string
        """

        if width is None:
            if signed:
                if value < 0:
                    bits_required = abs(value + 1).bit_length()
                else:
                    bits_required = value.bit_length()
                # Reserve one bit for the sign
                if bits_required % 8 == 0:
                    bits_required += 1
            else:
                bits_required = value.bit_length()
            width = math.ceil(bits_required / 8) or 1
        return value.to_bytes(width, byteorder='big', signed=signed)

    def int_from_bytes(value, signed=False):
        """
        Converts a byte string to an integer

        :param value:
            The byte string to convert

        :param signed:
            If the byte string should be interpreted using two's complement

        :return:
            An integer
        """

        return int.from_bytes(value, 'big', signed=signed)
def _format_offset(off):
"""
Format a timedelta into "[+-]HH:MM" format or "" for None
"""
if off is None:
return ''
mins = off.days * 24 * 60 + off.seconds // 60
sign = '-' if mins < 0 else '+'
return sign + '%02d:%02d' % divmod(abs(mins), 60)
class _UtcWithDst(tzinfo):
"""
Utc class where dst does not return None; required for astimezone
"""
def tzname(self, dt):
return 'UTC'
def utcoffset(self, dt):
return timedelta(0)
def dst(self, dt):
return timedelta(0)
# Shared UTC tzinfo instance whose dst() is a real timedelta, not None
utc_with_dst = _UtcWithDst()

# Cache of timezone objects keyed by their timedelta offset; see create_timezone()
_timezone_cache = {}
def create_timezone(offset):
    """
    Returns a new datetime.timezone object with the given offset.
    Uses cached objects if possible.

    :param offset:
        A datetime.timedelta object; It needs to be in full minutes and between -23:59 and +23:59.

    :return:
        A datetime.timezone object
    """

    cached = _timezone_cache.get(offset)
    if cached is None:
        # First request for this offset - build the tzinfo and memoize it
        cached = _timezone_cache[offset] = timezone(offset)
    return cached
class extended_date(object):
    """
    A datetime.datetime-like object that represents the year 0. This is just
    to handle 0000-01-01 found in some certificates. Python's datetime does
    not support year 0.

    The proleptic gregorian calendar repeats itself every 400 years. Therefore,
    the simplest way to format is to substitute year 2000.
    """

    def __init__(self, year, month, day):
        """
        :param year:
            The integer 0

        :param month:
            An integer from 1 to 12

        :param day:
            An integer from 1 to 31
        """

        if year != 0:
            raise ValueError('year must be 0')

        # Store internally as year 2000, which shares year 0's calendar layout
        self._y2k = date(2000, month, day)

    @property
    def year(self):
        """
        :return:
            The integer 0
        """

        return 0

    @property
    def month(self):
        """
        :return:
            An integer from 1 to 12
        """

        return self._y2k.month

    @property
    def day(self):
        """
        :return:
            An integer from 1 to 31
        """

        return self._y2k.day

    def strftime(self, format):
        """
        Formats the date using strftime()

        :param format:
            A strftime() format string

        :return:
            A str, the formatted date as a unicode string
            in Python 3 and a byte string in Python 2
        """

        # Format the date twice, once with year 2000, once with year 4000.
        # The only differences in the result will be in the millennium. Find them and replace by zeros.
        y2k = self._y2k.strftime(format)
        y4k = self._y2k.replace(year=4000).strftime(format)
        return ''.join('0' if (c2, c4) == ('2', '4') else c2 for c2, c4 in zip(y2k, y4k))

    def isoformat(self):
        """
        Formats the date as %Y-%m-%d

        :return:
            The date formatted to %Y-%m-%d as a unicode string in Python 3
            and a byte string in Python 2
        """

        return self.strftime('0000-%m-%d')

    def replace(self, year=None, month=None, day=None):
        """
        Returns a new datetime.date or asn1crypto.util.extended_date
        object with the specified components replaced

        :return:
            A datetime.date or asn1crypto.util.extended_date object
        """

        if year is None:
            year = self.year
        if month is None:
            month = self.month
        if day is None:
            day = self.day

        # Any non-zero year becomes a real datetime.date
        if year > 0:
            cls = date
        else:
            cls = extended_date

        return cls(
            year,
            month,
            day
        )

    def __str__(self):
        """
        :return:
            A str representing this extended_date, e.g. "0000-01-01"
        """

        return self.strftime('%Y-%m-%d')

    def __eq__(self, other):
        """
        Compare two extended_date objects

        :param other:
            The other extended_date to compare to

        :return:
            A boolean
        """

        # datetime.date object wouldn't compare equal because it can't be year 0
        if not isinstance(other, self.__class__):
            return False
        return self.__cmp__(other) == 0

    def __ne__(self, other):
        """
        Compare two extended_date objects

        :param other:
            The other extended_date to compare to

        :return:
            A boolean
        """

        return not self.__eq__(other)

    def _comparison_error(self, other):
        """
        Raises a TypeError about the other object not being suitable for
        comparison

        :param other:
            The object being compared to
        """

        raise TypeError(unwrap(
            '''
            An asn1crypto.util.extended_date object can only be compared to
            an asn1crypto.util.extended_date or datetime.date object, not %s
            ''',
            type_name(other)
        ))

    def __cmp__(self, other):
        """
        Compare two extended_date or datetime.date objects

        :param other:
            The other extended_date object to compare to

        :return:
            An integer smaller than, equal to, or larger than 0
        """

        # self is year 0, other is >= year 1
        if isinstance(other, date):
            return -1

        if not isinstance(other, self.__class__):
            self._comparison_error(other)

        if self._y2k < other._y2k:
            return -1
        if self._y2k > other._y2k:
            return 1
        return 0

    def __lt__(self, other):
        return self.__cmp__(other) < 0

    def __le__(self, other):
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        return self.__cmp__(other) >= 0
class extended_datetime(object):
    """
    A datetime.datetime-like object that represents the year 0. This is just
    to handle 0000-01-01 found in some certificates. Python's datetime does
    not support year 0.

    The proleptic gregorian calendar repeats itself every 400 years. Therefore,
    the simplest way to format is to substitute year 2000.
    """

    # There are 97 leap days during 400 years.
    DAYS_IN_400_YEARS = 400 * 365 + 97
    DAYS_IN_2000_YEARS = 5 * DAYS_IN_400_YEARS

    def __init__(self, year, *args, **kwargs):
        """
        :param year:
            The integer 0

        :param args:
            Other positional arguments; see datetime.datetime.

        :param kwargs:
            Other keyword arguments; see datetime.datetime.
        """

        if year != 0:
            raise ValueError('year must be 0')

        # Store internally as year 2000, which shares year 0's calendar layout
        self._y2k = datetime(2000, *args, **kwargs)

    @property
    def year(self):
        """
        :return:
            The integer 0
        """

        return 0

    @property
    def month(self):
        """
        :return:
            An integer from 1 to 12
        """

        return self._y2k.month

    @property
    def day(self):
        """
        :return:
            An integer from 1 to 31
        """

        return self._y2k.day

    @property
    def hour(self):
        """
        :return:
            An integer from 0 to 23
        """

        return self._y2k.hour

    @property
    def minute(self):
        """
        :return:
            An integer from 0 to 59
        """

        return self._y2k.minute

    @property
    def second(self):
        """
        :return:
            An integer from 0 to 59
        """

        return self._y2k.second

    @property
    def microsecond(self):
        """
        :return:
            An integer from 0 to 999999
        """

        return self._y2k.microsecond

    @property
    def tzinfo(self):
        """
        :return:
            If object is timezone aware, a datetime.tzinfo object, else None.
        """

        return self._y2k.tzinfo

    def utcoffset(self):
        """
        :return:
            If object is timezone aware, a datetime.timedelta object, else None.
        """

        return self._y2k.utcoffset()

    def time(self):
        """
        :return:
            A datetime.time object
        """

        return self._y2k.time()

    def date(self):
        """
        :return:
            An asn1crypto.util.extended_date of the date
        """

        return extended_date(0, self.month, self.day)

    def strftime(self, format):
        """
        Performs strftime(), always returning a str

        :param format:
            A strftime() format string

        :return:
            A str of the formatted datetime
        """

        # Format the datetime twice, once with year 2000, once with year 4000.
        # The only differences in the result will be in the millennium. Find them and replace by zeros.
        y2k = self._y2k.strftime(format)
        y4k = self._y2k.replace(year=4000).strftime(format)
        return ''.join('0' if (c2, c4) == ('2', '4') else c2 for c2, c4 in zip(y2k, y4k))

    def isoformat(self, sep='T'):
        """
        Formats the date as "%Y-%m-%d %H:%M:%S" with the sep param between the
        date and time portions

        :param sep:
            A single character of the separator to place between the date and
            time

        :return:
            The formatted datetime as a unicode string in Python 3 and a byte
            string in Python 2
        """

        s = '0000-%02d-%02d%c%02d:%02d:%02d' % (self.month, self.day, sep, self.hour, self.minute, self.second)
        if self.microsecond:
            s += '.%06d' % self.microsecond
        return s + _format_offset(self.utcoffset())

    def replace(self, year=None, *args, **kwargs):
        """
        Returns a new datetime.datetime or asn1crypto.util.extended_datetime
        object with the specified components replaced

        :param year:
            The new year to substitute. None to keep it.

        :param args:
            Other positional arguments; see datetime.datetime.replace.

        :param kwargs:
            Other keyword arguments; see datetime.datetime.replace.

        :return:
            A datetime.datetime or asn1crypto.util.extended_datetime object
        """

        # A non-zero year produces a real datetime.datetime
        if year:
            return self._y2k.replace(year, *args, **kwargs)

        return extended_datetime.from_y2k(self._y2k.replace(2000, *args, **kwargs))

    def astimezone(self, tz):
        """
        Convert this extended_datetime to another timezone.

        :param tz:
            A datetime.tzinfo object.

        :return:
            A new extended_datetime or datetime.datetime object
        """

        return extended_datetime.from_y2k(self._y2k.astimezone(tz))

    def timestamp(self):
        """
        Return POSIX timestamp. Only supported in python >= 3.3

        :return:
            A float representing the seconds since 1970-01-01 UTC. This will be a negative value.
        """

        # Shift the year-2000 timestamp back by exactly 2000 years of seconds
        return self._y2k.timestamp() - self.DAYS_IN_2000_YEARS * 86400

    def __str__(self):
        """
        :return:
            A str representing this extended_datetime, e.g. "0000-01-01 00:00:00.000001-10:00"
        """

        return self.isoformat(sep=' ')

    def __eq__(self, other):
        """
        Compare two extended_datetime objects

        :param other:
            The other extended_datetime to compare to

        :return:
            A boolean
        """

        # Only compare against other datetime or extended_datetime objects
        if not isinstance(other, (self.__class__, datetime)):
            return False

        # Offset-naive and offset-aware datetimes are never the same
        if (self.tzinfo is None) != (other.tzinfo is None):
            return False

        return self.__cmp__(other) == 0

    def __ne__(self, other):
        """
        Compare two extended_datetime objects

        :param other:
            The other extended_datetime to compare to

        :return:
            A boolean
        """

        return not self.__eq__(other)

    def _comparison_error(self, other):
        """
        Raises a TypeError about the other object not being suitable for
        comparison

        :param other:
            The object being compared to
        """

        raise TypeError(unwrap(
            '''
            An asn1crypto.util.extended_datetime object can only be compared to
            an asn1crypto.util.extended_datetime or datetime.datetime object,
            not %s
            ''',
            type_name(other)
        ))

    def __cmp__(self, other):
        """
        Compare two extended_datetime or datetime.datetime objects

        :param other:
            The other extended_datetime or datetime.datetime object to compare to

        :return:
            An integer smaller than, equal to, or larger than 0
        """

        if not isinstance(other, (self.__class__, datetime)):
            self._comparison_error(other)

        if (self.tzinfo is None) != (other.tzinfo is None):
            raise TypeError("can't compare offset-naive and offset-aware datetimes")

        # Compare via subtraction, which handles mixed types (see __sub__)
        diff = self - other
        zero = timedelta(0)
        if diff < zero:
            return -1
        if diff > zero:
            return 1
        return 0

    def __lt__(self, other):
        return self.__cmp__(other) < 0

    def __le__(self, other):
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        return self.__cmp__(other) >= 0

    def __add__(self, other):
        """
        Adds a timedelta

        :param other:
            A datetime.timedelta object to add.

        :return:
            A new extended_datetime or datetime.datetime object.
        """

        return extended_datetime.from_y2k(self._y2k + other)

    def __sub__(self, other):
        """
        Subtracts a timedelta or another datetime.

        :param other:
            A datetime.timedelta or datetime.datetime or extended_datetime object to subtract.

        :return:
            If a timedelta is passed, a new extended_datetime or datetime.datetime object.
            Else a datetime.timedelta object.
        """

        if isinstance(other, timedelta):
            return extended_datetime.from_y2k(self._y2k - other)

        if isinstance(other, extended_datetime):
            # Both sides carry the same 2000-year shift, which cancels out
            return self._y2k - other._y2k

        if isinstance(other, datetime):
            # Only self is shifted by 2000 years, so compensate for it
            return self._y2k - other - timedelta(days=self.DAYS_IN_2000_YEARS)

        return NotImplemented

    def __rsub__(self, other):
        return -(self - other)

    @classmethod
    def from_y2k(cls, value):
        """
        Revert substitution of year 2000.

        :param value:
            A datetime.datetime object which is 2000 years in the future.

        :return:
            A new extended_datetime or datetime.datetime object.
        """

        year = value.year - 2000

        # Any year above 0 is representable by a plain datetime.datetime
        if year > 0:
            new_cls = datetime
        else:
            new_cls = cls

        return new_cls(
            year,
            value.month,
            value.day,
            value.hour,
            value.minute,
            value.second,
            value.microsecond,
            value.tzinfo
        )

View File

@ -0,0 +1,6 @@
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function
# Version of the vendored asn1crypto library
__version__ = '1.5.1'
__version_info__ = (1, 5, 1)

File diff suppressed because it is too large Load Diff

181
jc/parsers/x509_cert.py Normal file
View File

@ -0,0 +1,181 @@
"""jc - JSON Convert X.509 Certificate format file parser
This parser will convert DER, PEM, and PKCS#12 encoded X.509 certificates.
Usage (cli):
$ cat certificate.pem | jc --x509-cert
Usage (module):
import jc
result = jc.parse('x509_cert', x509_cert_file_output)
Schema:
[
{
"x509_cert": string,
"bar": boolean,
"baz": integer
}
]
Examples:
$ cat certificate.pem | jc --x509-cert -p
[]
$ cat certificate.der | jc --x509-cert -p -r
[]
"""
import binascii
from collections import OrderedDict
from datetime import datetime
from typing import List, Dict, Union
import jc.utils
from jc.parsers.asn1crypto import pem, x509
class info():
    """Provides parser metadata (version, author, etc.)"""
    version = '1.0'
    description = 'X.509 certificate file parser'
    author = 'Kelly Brazil'
    author_email = 'kellyjonbrazil@gmail.com'
    # vendored library used for DER/PEM decoding
    details = 'Using the asn1crypto library at https://github.com/wbond/asn1crypto/releases/tag/1.5.1'
    # platforms this parser is expected to work on
    compatible = ['linux', 'darwin', 'cygwin', 'win32', 'aix', 'freebsd']


# expose the parser version at module level (jc parser convention)
__version__ = info.version
def _process(proc_data: List[Dict]) -> List[Dict]:
"""
Final processing to conform to the schema.
Parameters:
proc_data: (List of Dictionaries) raw structured data to process
Returns:
List of Dictionaries. Structured to conform to the schema.
"""
# process the data here
# rebuild output for added semantic information
# use helper functions in jc.utils for int, float, bool
# conversions and timestamps
return proc_data
def _b2a(byte_string: bytes) -> str:
return binascii.hexlify(byte_string, ':').decode('utf-8')
def _fix_objects(obj):
"""
Recursively traverse the nested dictionary or list and convert objects
into JSON serializable types.
"""
if isinstance(obj, set):
obj = list(obj)
if isinstance(obj, OrderedDict):
obj = dict(obj)
if isinstance(obj, datetime):
obj = int(round(obj.timestamp()))
if isinstance(obj, bytes):
obj = _b2a(obj)
if isinstance(obj, dict):
for k, v in obj.items():
if isinstance(v, datetime):
v = int(round(v.timestamp()))
obj.update({k: v})
continue
if isinstance(v, bytes):
v = _b2a(v)
obj.update({k: v})
continue
if isinstance(v, set):
v = list(v)
obj.update({k: v})
if isinstance(v, OrderedDict):
v = dict(v)
obj.update({k: v})
if isinstance(v, dict):
obj.update({k: _fix_objects(v)})
continue
if isinstance(v, list):
newlist =[]
for i in v:
newlist.append(_fix_objects(i))
obj.update({k: newlist})
continue
if isinstance(obj, list):
new_list = []
for i in obj:
new_list.append(_fix_objects(i))
obj = new_list
return obj
def parse(
    data: Union[str, bytes],
    raw: bool = False,
    quiet: bool = False
) -> List[Dict]:
    """
    Main text parsing function

    Parameters:

        data:        (string) text data to parse
        raw:         (boolean) unprocessed output if True
        quiet:       (boolean) suppress warning messages if True

    Returns:

        List of Dictionaries. Raw or processed structured data.
    """
    jc.utils.compatibility(__name__, info.compatible, quiet)
    raw_output: List = []

    if jc.utils.has_data(data):
        # pem.detect() needs bytes. cli.py auto-decodes stdin to UTF-8
        # when it can, so a str must be re-encoded here first.
        if isinstance(data, str):
            der_bytes = bytes(data, 'utf-8')
        else:
            der_bytes = data

        # PEM input is unarmored down to the DER payload; DER/PKCS#12
        # input is loaded as-is
        if pem.detect(der_bytes):
            _, _, der_bytes = pem.unarmor(der_bytes)

        certificate = x509.Certificate.load(der_bytes)

        # cert.native contains OrderedDicts, sets, datetimes, and bytes,
        # none of which are JSON serializable, so clean them up
        raw_output = _fix_objects(certificate.native)

    return raw_output if raw else _process(raw_output)

View File

@ -140,22 +140,28 @@ def compatibility(mod_name: str, compatible: List, quiet: bool = False) -> None:
])
def has_data(data: Union[str, bytes]) -> bool:
    """
    Checks if the string input contains data. If there are any
    non-whitespace characters then return `True`, else return `False`.

    For bytes, returns True if there is any data.

    Parameters:

        data:        (string, bytes) input to check whether it contains data

    Returns:

        Boolean      True if input string (data) contains non-whitespace
                     characters, otherwise False. For bytes data, returns
                     True if there is any data, otherwise False.
    """
    if isinstance(data, str):
        return bool(data and not data.isspace())

    # bytes: whitespace bytes are significant binary data, so any
    # content at all counts as data
    return bool(data)
def convert_to_int(value: Union[str, float]) -> Optional[int]:
"""