X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=43475f19fbdb486580b6c198a1a629698be8b90b;hb=e0842b448ce1d78c479eeec663900b9826b63bf6;hp=5adc5b068d17915f0c543144b66557b9fc355f30;hpb=3b5f3e7f4219b79410bd7b3e1a3d41be51f78f0a;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index 5adc5b0..43475f1 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # coding: utf-8 +# cython: language_level=3 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures # Copyright (C) 2017-2020 Sergey Matveev # @@ -74,9 +75,17 @@ tags simultaneously. There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc` functions, allowing you to easily create ``CONTEXT`` ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag -number. Pay attention that explicit tags always have *constructed* tag -(``tag_ctxc``), but implicit tags for primitive types are primitive -(``tag_ctxp``). +number. + +.. note:: + + EXPLICIT tags always have **constructed** tag. PyDERASN does not + explicitly check correctness of schema input here. + +.. note:: + + Implicit tags have **primitive** (``tag_ctxp``) encoding for + primitive values. :: @@ -340,7 +349,8 @@ Let's parse that output, human:: Only applicable to BER encoded data. If object has BER-specific encoding, then ``BER`` will be shown. It does not depend on indefinite length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING`` - (and its derivatives), ``SET``, ``SET OF`` could be BERed. + (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` + could be BERed. .. _definedby: @@ -418,7 +428,7 @@ defines_by_path context option ______________________________ Sometimes you either can not or do not want to explicitly set *defines* -in the scheme. You can dynamically apply those definitions when calling +in the schema. You can dynamically apply those definitions when calling ``.decode()`` method. Specify ``defines_by_path`` key in the :ref:`decode context `. Its @@ -490,8 +500,8 @@ constructed primitive types should be parsed successfully. * If object is encoded in BER form (not the DER one), then ``ber_encoded`` attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET - STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF`` - can contain it. + STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``, + ``UTCTime``, ``GeneralizedTime`` can contain it. * If object has an indefinite length encoding, then its ``lenindef`` attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``, ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can @@ -540,12 +550,12 @@ _______ Integer _______ .. autoclass:: pyderasn.Integer - :members: __init__ + :members: __init__, named BitString _________ .. autoclass:: pyderasn.BitString - :members: __init__ + :members: __init__, bit_len, named OctetString ___________ @@ -577,7 +587,7 @@ _____________ PrintableString _______________ .. autoclass:: pyderasn.PrintableString - :members: __init__ + :members: __init__, allow_asterisk, allow_ampersand UTCTime _______ @@ -587,6 +597,7 @@ _______ GeneralizedTime _______________ .. autoclass:: pyderasn.GeneralizedTime + :members: __init__, todatetime Special types ------------- @@ -594,7 +605,7 @@ Special types Choice ______ .. autoclass:: pyderasn.Choice - :members: __init__ + :members: __init__, choice, value PrimitiveTypes ______________ @@ -651,6 +662,115 @@ Various .. autoclass:: pyderasn.ObjNotReady .. autoclass:: pyderasn.InvalidValueType .. autoclass:: pyderasn.BoundsError + +.. _cmdline: + +Command-line usage +------------------ + +You can decode DER/BER files using command line abilities:: + + $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file + +If there is no schema for your file, then you can try parsing it without, +but of course IMPLICIT tags will often make it impossible. But result is +good enough for the certificate above:: + + $ python -m pyderasn path/to/file + 0 [1,3,1604] . >: SEQUENCE OF + 4 [1,3,1453] . . >: SEQUENCE OF + 8 [0,0, 5] . . . . >: [0] ANY + . . . . . A0:03:02:01:02 + 13 [1,1, 3] . . . . >: INTEGER 61595 + 18 [1,1, 13] . . . . >: SEQUENCE OF + 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [1,1, 0] . . . . . . >: NULL + 33 [1,3, 274] . . . . >: SEQUENCE OF + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6 + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + [...] + 1409 [1,1, 50] . . . . . . >: SEQUENCE OF + 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1 + 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes + . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16 + . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63 + . . . . . . . . . 61:2E:63:6F:6D:2F + 1461 [1,1, 13] . . >: SEQUENCE OF + 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [1,1, 0] . . . . >: NULL + 1476 [1,2, 129] . . >: BIT STRING 1024 bits + . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + +Human readable OIDs +___________________ + +If you have got dictionaries with ObjectIdentifiers, like example one +from ``tests/test_crts.py``:: + + stroid2name = { + "1.2.840.113549.1.1.1": "id-rsaEncryption", + "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption", + [...] + "2.5.4.10": "id-at-organizationName", + "2.5.4.11": "id-at-organizationalUnitName", + } + +then you can pass it to pretty printer to see human readable OIDs:: + + $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file + [...] + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6) + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + 50 [1,1, 18] . . . . . . >: SET OF + 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8) + 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + 70 [1,1, 18] . . . . . . >: SET OF + 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7) + 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + [...] + +Decode paths +____________ + +Each decoded element has so-called decode path: sequence of structure +names it is passing during the decode process. Each element has its own +unique path inside the whole ASN.1 tree. You can print it out with +``--print-decode-path`` option:: + + $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file + 0 [1,3,1604] Certificate SEQUENCE [] + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate] + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version] + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber] + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature] + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm] + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters] + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer] + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence] + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0] + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0] + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type] + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value] + . . . . . . . 13:02:45:53 + 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6] + [...] + +Now you can print only the specified tree, for example signature algorithm:: + + $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file + 18 [1,1, 13] AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL + . . 05:00 """ from codecs import getdecoder @@ -659,10 +779,11 @@ from collections import namedtuple from collections import OrderedDict from copy import copy from datetime import datetime +from datetime import timedelta from math import ceil -from os import environ from string import ascii_letters from string import digits +from sys import version_info from unicodedata import category as unicat from six import add_metaclass @@ -687,7 +808,7 @@ except ImportError: # pragma: no cover def colored(what, *args, **kwargs): return what -__version__ = "6.0" +__version__ = "6.3" __all__ = ( "Any", @@ -761,6 +882,20 @@ EOC = b"\x00\x00" EOC_LEN = len(EOC) LENINDEF = b"\x80" # length indefinite mark LENINDEF_PP_CHAR = "I" if PY2 else "∞" +NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__} +SET01 = frozenset("01") +DECIMALS = frozenset(digits) +DECIMAL_SIGNS = ".," + + +def pureint(value): + if not set(value) <= DECIMALS: + raise ValueError("non-pure integer") + return int(value) + +def fractions2float(fractions_raw): + pureint(fractions_raw) + return float("0." + fractions_raw) ######################################################################## @@ -1052,6 +1187,21 @@ class AutoAddSlots(type): return type.__new__(cls, name, bases, _dict) +BasicState = namedtuple("BasicState", ( + "version", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + @add_metaclass(AutoAddSlots) class Obj(object): """Common ASN.1 object class @@ -1124,17 +1274,16 @@ class Obj(object): def __setstate__(self, state): if state.version != __version__: raise ValueError("data is pickled by different PyDERASN version") - self.tag = self.tag_default - self._value = None - self._expl = None - self.default = None - self.optional = False - self.offset = 0 - self.llen = 0 - self.vlen = 0 - self.expl_lenindef = False - self.lenindef = False - self.ber_encoded = False + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded @property def tlen(self): @@ -1203,7 +1352,7 @@ class Obj(object): :param ctx: optional :ref:`context ` governing decoding process :param tag_only: decode only the tag, without length and contents (used only in Choice and Set structures, trying to - determine if tag satisfies the scheme) + determine if tag satisfies the schema) :param _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` before using it? :returns: (Obj, remaining data) @@ -1477,7 +1626,7 @@ PP = namedtuple("PP", ( "lenindef", "ber_encoded", "bered", -)) +), **NAMEDTUPLE_KWARGS) def _pp( @@ -1627,9 +1776,9 @@ def pp_console_row( with_colours, )) if with_blob: - if isinstance(pp.blob, binary_type): + if pp.blob.__class__ == binary_type: cols.append(hexenc(pp.blob)) - elif isinstance(pp.blob, tuple): + elif pp.blob.__class__ == tuple: cols.append(", ".join(pp.blob)) if pp.optional: cols.append(_colourize("OPTIONAL", "red", with_colours)) @@ -1649,12 +1798,12 @@ def pp_console_blob(pp, decode_path_len_decrease=0): decode_path_len = len(pp.decode_path) - decode_path_len_decrease if decode_path_len > 0: cols.append(" ." * (decode_path_len + 1)) - if isinstance(pp.blob, binary_type): + if pp.blob.__class__ == binary_type: blob = hexenc(pp.blob).upper() for i in six_xrange(0, len(blob), 32): chunk = blob[i:i + 32] yield " ".join(cols + [colonize_hex(chunk)]) - elif isinstance(pp.blob, tuple): + elif pp.blob.__class__ == tuple: yield " ".join(cols + [", ".join(pp.blob)]) @@ -1724,20 +1873,11 @@ def pprint( # ASN.1 primitive types ######################################################################## -BooleanState = namedtuple("BooleanState", ( - "version", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +BooleanState = namedtuple( + "BooleanState", + BasicState._fields + ("value",), + **NAMEDTUPLE_KWARGS +) class Boolean(Obj): @@ -1784,7 +1924,7 @@ class Boolean(Obj): self._value = default def _value_sanitize(self, value): - if isinstance(value, bool): + if value.__class__ == bool: return value if issubclass(value.__class__, Boolean): return value._value @@ -1797,7 +1937,6 @@ class Boolean(Obj): def __getstate__(self): return BooleanState( __version__, - self._value, self.tag, self._expl, self.default, @@ -1808,21 +1947,12 @@ class Boolean(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, ) def __setstate__(self, state): super(Boolean, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __nonzero__(self): self._assert_ready() @@ -1833,7 +1963,7 @@ class Boolean(Obj): return self._value def __eq__(self, their): - if isinstance(their, bool): + if their.__class__ == bool: return self._value == their if not issubclass(their.__class__, Boolean): return False @@ -1965,23 +2095,11 @@ class Boolean(Obj): yield pp -IntegerState = namedtuple("IntegerState", ( - "version", - "specs", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +IntegerState = namedtuple( + "IntegerState", + BasicState._fields + ("specs", "value", "bound_min", "bound_max"), + **NAMEDTUPLE_KWARGS +) class Integer(Obj): @@ -2047,7 +2165,7 @@ class Integer(Obj): super(Integer, self).__init__(impl, expl, default, optional, _decoded) self._value = value specs = getattr(self, "schema", {}) if _specs is None else _specs - self.specs = specs if isinstance(specs, dict) else dict(specs) + self.specs = specs if specs.__class__ == dict else dict(specs) self._bound_min, self._bound_max = getattr( self, "bounds", @@ -2071,7 +2189,7 @@ class Integer(Obj): pass elif issubclass(value.__class__, Integer): value = value._value - elif isinstance(value, str): + elif value.__class__ == str: value = self.specs.get(value) if value is None: raise ObjUnknown("integer value: %s" % value) @@ -2088,10 +2206,6 @@ class Integer(Obj): def __getstate__(self): return IntegerState( __version__, - self.specs, - self._value, - self._bound_min, - self._bound_max, self.tag, self._expl, self.default, @@ -2102,6 +2216,10 @@ class Integer(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + self._value, + self._bound_min, + self._bound_max, ) def __setstate__(self, state): @@ -2110,16 +2228,6 @@ class Integer(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __int__(self): self._assert_ready() @@ -2149,6 +2257,8 @@ class Integer(Obj): @property def named(self): + """Return named representation (if exists) of the value + """ for name, value in iteritems(self.specs): if value == self._value: return name @@ -2334,24 +2444,11 @@ class Integer(Obj): yield pp -SET01 = frozenset(("0", "1")) -BitStringState = namedtuple("BitStringState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "tag_constructed", - "defined", -)) +BitStringState = namedtuple( + "BitStringState", + BasicState._fields + ("specs", "value", "tag_constructed", "defined"), + **NAMEDTUPLE_KWARGS +) class BitString(Obj): @@ -2429,7 +2526,7 @@ class BitString(Obj): """ super(BitString, self).__init__(impl, expl, default, optional, _decoded) specs = getattr(self, "schema", {}) if _specs is None else _specs - self.specs = specs if isinstance(specs, dict) else dict(specs) + self.specs = specs if specs.__class__ == dict else dict(specs) self._value = None if value is None else self._value_sanitize(value) if default is not None: default = self._value_sanitize(default) @@ -2475,14 +2572,14 @@ class BitString(Obj): len(value) * 4, hexdec(value + ("" if len(value) % 2 == 0 else "0")), ) - if isinstance(value, binary_type): + if value.__class__ == binary_type: return (len(value) * 8, value) raise InvalidValueType((self.__class__, string_types, binary_type)) - if isinstance(value, tuple): + if value.__class__ == tuple: if ( len(value) == 2 and isinstance(value[0], integer_types) and - isinstance(value[1], binary_type) + value[1].__class__ == binary_type ): return value bits = [] @@ -2509,8 +2606,6 @@ class BitString(Obj): def __getstate__(self): return BitStringState( __version__, - self.specs, - self._value, self.tag, self._expl, self.default, @@ -2521,6 +2616,8 @@ class BitString(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + self._value, self.tag_constructed, self.defined, ) @@ -2529,16 +2626,6 @@ class BitString(Obj): super(BitString, self).__setstate__(state) self.specs = state.specs self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.tag_constructed = state.tag_constructed self.defined = state.defined @@ -2549,6 +2636,8 @@ class BitString(Obj): @property def bit_len(self): + """Returns number of bits in the string + """ self._assert_ready() return self._value[0] @@ -2557,7 +2646,7 @@ class BitString(Obj): return self._value[1] def __eq__(self, their): - if isinstance(their, bytes): + if their.__class__ == bytes: return self._value[1] == their if not issubclass(their.__class__, BitString): return False @@ -2569,6 +2658,10 @@ class BitString(Obj): @property def named(self): + """Named representation (if exists) of the bits + + :returns: [str(name), ...] + """ return [name for name, bit in iteritems(self.specs) if self[bit]] def __call__( @@ -2589,7 +2682,7 @@ class BitString(Obj): ) def __getitem__(self, key): - if isinstance(key, int): + if key.__class__ == int: bit_len, octets = self._value if key >= bit_len: return False @@ -2614,64 +2707,6 @@ class BitString(Obj): octets, )) - def _decode_chunk(self, lv, offset, decode_path): - try: - l, llen, v = len_decode(lv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l == 0: - raise NotEnoughData( - "zero length", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - pad_size = byte2int(v) - if l == 1 and pad_size != 0: - raise DecodeError( - "invalid empty value", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if pad_size > 7: - raise DecodeError( - "too big pad", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: - raise DecodeError( - "invalid pad", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - v, tail = v[:l], v[l:] - obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), - impl=self.tag, - expl=self._expl, - default=self.default, - optional=self.optional, - _specs=self.specs, - _decoded=(offset, llen, l), - ) - return obj, tail - def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) @@ -2685,23 +2720,8 @@ class BitString(Obj): if t == self.tag: if tag_only: # pragma: no cover return None - return self._decode_chunk(lv, offset, decode_path) - if t == self.tag_constructed: - if not ctx.get("bered", False): - raise DecodeError( - "unallowed BER constructed encoding", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: # pragma: no cover - return None - lenindef = False try: l, llen, v = len_decode(lv) - except LenIndefForm: - llen, l, v = 1, 0, lv[1:] - lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -2716,100 +2736,170 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - if not lenindef and l == 0: + if l == 0: raise NotEnoughData( "zero length", klass=self.__class__, decode_path=decode_path, offset=offset, ) - chunks = [] - sub_offset = offset + tlen + llen - vlen = 0 - while True: - if lenindef: - if v[:EOC_LEN].tobytes() == EOC: - break - else: - if vlen == l: - break - if vlen > l: - raise DecodeError( - "chunk out of bounds", - klass=self.__class__, - decode_path=decode_path + (str(len(chunks) - 1),), - offset=chunks[-1].offset, - ) - sub_decode_path = decode_path + (str(len(chunks)),) - try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) - except TagMismatch: - raise DecodeError( - "expected BitString encoded chunk", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - chunks.append(chunk) - sub_offset += chunk.tlvlen - vlen += chunk.tlvlen - v = v_tail - if len(chunks) == 0: + pad_size = byte2int(v) + if l == 1 and pad_size != 0: raise DecodeError( - "no chunks", + "invalid empty value", klass=self.__class__, decode_path=decode_path, offset=offset, ) - values = [] - bit_len = 0 - for chunk_i, chunk in enumerate(chunks[:-1]): - if chunk.bit_len % 8 != 0: - raise DecodeError( - "BitString chunk is not multiple of 8 bits", - klass=self.__class__, - decode_path=decode_path + (str(chunk_i),), - offset=chunk.offset, - ) - values.append(bytes(chunk)) - bit_len += chunk.bit_len - chunk_last = chunks[-1] - values.append(bytes(chunk_last)) - bit_len += chunk_last.bit_len + if pad_size > 7: + raise DecodeError( + "too big pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: + raise DecodeError( + "invalid pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] obj = self.__class__( - value=(bit_len, b"".join(values)), + value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, _specs=self.specs, - _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + _decoded=(offset, llen, l), ) - obj.lenindef = lenindef - obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - - def __repr__(self): - return pp_console_row(next(self.pps())) - - def pps(self, decode_path=()): - value = None - blob = None - if self.ready: - bit_len, blob = self._value - value = "%d bits" % bit_len + return obj, tail + if t != self.tag_constructed: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not ctx.get("bered", False): + raise DecodeError( + "unallowed BER constructed encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if tag_only: # pragma: no cover + return None + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not lenindef and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + "chunk out of bounds", + klass=self.__class__, + decode_path=decode_path + (str(len(chunks) - 1),), + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = BitString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ) + except TagMismatch: + raise DecodeError( + "expected BitString encoded chunk", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + "no chunks", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + values = [] + bit_len = 0 + for chunk_i, chunk in enumerate(chunks[:-1]): + if chunk.bit_len % 8 != 0: + raise DecodeError( + "BitString chunk is not multiple of 8 bits", + klass=self.__class__, + decode_path=decode_path + (str(chunk_i),), + offset=chunk.offset, + ) + values.append(bytes(chunk)) + bit_len += chunk.bit_len + chunk_last = chunks[-1] + values.append(bytes(chunk_last)) + bit_len += chunk_last.bit_len + obj = self.__class__( + value=(bit_len, b"".join(values)), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ) + obj.lenindef = lenindef + obj.ber_encoded = True + return obj, (v[EOC_LEN:] if lenindef else v) + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + value = None + blob = None + if self.ready: + bit_len, blob = self._value + value = "%d bits" % bit_len if len(self.specs) > 0: blob = tuple(self.named) yield _pp( @@ -2845,24 +2935,17 @@ class BitString(Obj): yield pp -OctetStringState = namedtuple("OctetStringState", ( - "version", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "tag_constructed", - "defined", -)) +OctetStringState = namedtuple( + "OctetStringState", + BasicState._fields + ( + "value", + "bound_min", + "bound_max", + "tag_constructed", + "defined", + ), + **NAMEDTUPLE_KWARGS +) class OctetString(Obj): @@ -2902,6 +2985,7 @@ class OctetString(Obj): default=None, optional=False, _decoded=(0, 0, 0), + ctx=None, ): """ :param value: set the value. Either binary type, or @@ -2940,7 +3024,7 @@ class OctetString(Obj): ) def _value_sanitize(self, value): - if isinstance(value, binary_type): + if value.__class__ == binary_type: pass elif issubclass(value.__class__, OctetString): value = value._value @@ -2957,9 +3041,6 @@ class OctetString(Obj): def __getstate__(self): return OctetStringState( __version__, - self._value, - self._bound_min, - self._bound_max, self.tag, self._expl, self.default, @@ -2970,6 +3051,9 @@ class OctetString(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, + self._bound_min, + self._bound_max, self.tag_constructed, self.defined, ) @@ -2979,16 +3063,6 @@ class OctetString(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.tag_constructed = state.tag_constructed self.defined = state.defined @@ -2997,7 +3071,7 @@ class OctetString(Obj): return self._value def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their if not issubclass(their.__class__, OctetString): return False @@ -3039,50 +3113,6 @@ class OctetString(Obj): self._value, )) - def _decode_chunk(self, lv, offset, decode_path): - try: - l, llen, v = len_decode(lv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - v, tail = v[:l], v[l:] - try: - obj = self.__class__( - value=v.tobytes(), - bounds=(self._bound_min, self._bound_max), - impl=self.tag, - expl=self._expl, - default=self.default, - optional=self.optional, - _decoded=(offset, llen, l), - ) - except DecodeError as err: - raise DecodeError( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - except BoundsError as err: - raise DecodeError( - msg=str(err), - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - return obj, tail - def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) @@ -3096,23 +3126,8 @@ class OctetString(Obj): if t == self.tag: if tag_only: return None - return self._decode_chunk(lv, offset, decode_path) - if t == self.tag_constructed: - if not ctx.get("bered", False): - raise DecodeError( - "unallowed BER constructed encoding", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return None - lenindef = False try: l, llen, v = len_decode(lv) - except LenIndefForm: - llen, l, v = 1, 0, lv[1:] - lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -3127,53 +3142,17 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - chunks = [] - sub_offset = offset + tlen + llen - vlen = 0 - while True: - if lenindef: - if v[:EOC_LEN].tobytes() == EOC: - break - else: - if vlen == l: - break - if vlen > l: - raise DecodeError( - "chunk out of bounds", - klass=self.__class__, - decode_path=decode_path + (str(len(chunks) - 1),), - offset=chunks[-1].offset, - ) - sub_decode_path = decode_path + (str(len(chunks)),) - try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) - except TagMismatch: - raise DecodeError( - "expected OctetString encoded chunk", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - chunks.append(chunk) - sub_offset += chunk.tlvlen - vlen += chunk.tlvlen - v = v_tail + v, tail = v[:l], v[l:] try: obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), + value=v.tobytes(), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, - _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + _decoded=(offset, llen, l), + ctx=ctx, ) except DecodeError as err: raise DecodeError( @@ -3189,14 +3168,108 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - obj.lenindef = lenindef - obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + return obj, tail + if t != self.tag_constructed: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not ctx.get("bered", False): + raise DecodeError( + "unallowed BER constructed encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return None + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + "chunk out of bounds", + klass=self.__class__, + decode_path=decode_path + (str(len(chunks) - 1),), + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = OctetString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ) + except TagMismatch: + raise DecodeError( + "expected OctetString encoded chunk", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + try: + obj = self.__class__( + value=b"".join(bytes(chunk) for chunk in chunks), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ctx=ctx, + ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj.lenindef = lenindef + obj.ber_encoded = True + return obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -3235,19 +3308,7 @@ class OctetString(Obj): yield pp -NullState = namedtuple("NullState", ( - "version", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS) class Null(Obj): @@ -3287,28 +3348,15 @@ class Null(Obj): __version__, self.tag, self._expl, - self.default, - self.optional, - self.offset, - self.llen, - self.vlen, - self.expl_lenindef, - self.lenindef, - self.ber_encoded, - ) - - def __setstate__(self, state): - super(Null, self).__setstate__(state) - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) def __eq__(self, their): if not issubclass(their.__class__, Null): @@ -3403,21 +3451,11 @@ class Null(Obj): yield pp -ObjectIdentifierState = namedtuple("ObjectIdentifierState", ( - "version", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "defines", -)) +ObjectIdentifierState = namedtuple( + "ObjectIdentifierState", + BasicState._fields + ("value", "defines"), + **NAMEDTUPLE_KWARGS +) class ObjectIdentifier(Obj): @@ -3486,10 +3524,10 @@ class ObjectIdentifier(Obj): self.defines = defines def __add__(self, their): + if their.__class__ == tuple: + return self.__class__(self._value + their) if isinstance(their, self.__class__): return self.__class__(self._value + their._value) - if isinstance(their, tuple): - return self.__class__(self._value + their) raise InvalidValueType((self.__class__, tuple)) def _value_sanitize(self, value): @@ -3497,10 +3535,10 @@ class ObjectIdentifier(Obj): return value._value if isinstance(value, string_types): try: - value = tuple(int(arc) for arc in value.split(".")) + value = tuple(pureint(arc) for arc in value.split(".")) except ValueError: raise InvalidOID("unacceptable arcs values") - if isinstance(value, tuple): + if value.__class__ == tuple: if len(value) < 2: raise InvalidOID("less than 2 arcs") first_arc = value[0] @@ -3511,6 +3549,8 @@ class ObjectIdentifier(Obj): pass else: raise InvalidOID("unacceptable first arc value") + if not all(arc >= 0 for arc in value): + raise InvalidOID("negative arc value") return value raise InvalidValueType((self.__class__, str, tuple)) @@ -3521,7 +3561,6 @@ class ObjectIdentifier(Obj): def __getstate__(self): return ObjectIdentifierState( __version__, - self._value, self.tag, self._expl, self.default, @@ -3532,22 +3571,13 @@ class ObjectIdentifier(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, self.defines, ) def __setstate__(self, state): super(ObjectIdentifier, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.defines = state.defines def __iter__(self): @@ -3566,7 +3596,7 @@ class ObjectIdentifier(Obj): ) def __eq__(self, their): - if isinstance(their, tuple): + if their.__class__ == tuple: return self._value == their if not issubclass(their.__class__, ObjectIdentifier): return False @@ -3802,7 +3832,7 @@ class Enumerated(Integer): def escape_control_unicode(c): - if unicat(c).startswith("C"): + if unicat(c)[0] == "C": c = repr(c).lstrip("u").strip("'") return c @@ -3876,9 +3906,9 @@ class CommonString(OctetString): value_decoded = None if isinstance(value, self.__class__): value_raw = value._value - elif isinstance(value, text_type): + elif value.__class__ == text_type: value_decoded = value - elif isinstance(value, binary_type): + elif value.__class__ == binary_type: value_raw = value else: raise InvalidValueType((self.__class__, text_type, binary_type)) @@ -3902,9 +3932,9 @@ class CommonString(OctetString): return value_raw def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their - if isinstance(their, text_type): + if their.__class__ == text_type: return self._value == their.encode(self.encoding) if not isinstance(their, self.__class__): return False @@ -3995,6 +4025,7 @@ class NumericString(AllowableCharsMixin, CommonString): PrintableStringState = namedtuple( "PrintableStringState", OctetStringState._fields + ("allowable_chars",), + **NAMEDTUPLE_KWARGS ) @@ -4029,6 +4060,7 @@ class PrintableString(AllowableCharsMixin, CommonString): default=None, optional=False, _decoded=(0, 0, 0), + ctx=None, allow_asterisk=False, allow_ampersand=False, ): @@ -4041,7 +4073,7 @@ class PrintableString(AllowableCharsMixin, CommonString): if allow_ampersand: self._allowable_chars |= self._ampersand super(PrintableString, self).__init__( - value, bounds, impl, expl, default, optional, _decoded, + value, bounds, impl, expl, default, optional, _decoded, ctx, ) @property @@ -4127,7 +4159,31 @@ LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ") LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ") -class UTCTime(CommonString): +class VisibleString(CommonString): + __slots__ = () + tag_default = tag_encode(26) + encoding = "ascii" + asn1_type_name = "VisibleString" + + +UTCTimeState = namedtuple( + "UTCTimeState", + OctetStringState._fields + ("ber_raw",), + **NAMEDTUPLE_KWARGS +) + + +def str_to_time_fractions(value): + v = pureint(value) + year, v = (v // 10**10), (v % 10**10) + month, v = (v // 10**8), (v % 10**8) + day, v = (v // 10**6), (v % 10**6) + hour, v = (v // 10**4), (v % 10**4) + minute, second = (v // 100), (v % 100) + return year, month, day, hour, minute, second + + +class UTCTime(VisibleString): """``UTCTime`` datetime type >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123)) @@ -4141,11 +4197,23 @@ class UTCTime(CommonString): >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime() datetime.datetime(1957, 9, 30, 22, 7, 50) + If BER encoded value was met, then ``ber_raw`` attribute will hold + its raw representation. + + .. warning:: + + Pay attention that UTCTime can not hold full year, so all years + having < 50 years are treated as 20xx, 19xx otherwise, according + to X.509 recommendation. + .. warning:: - BER encoding is unsupported. + No strict validation of UTC offsets are made, but very crude: + + * minutes are not exceeding 60 + * offset value is not exceeding 14 hours """ - __slots__ = () + __slots__ = ("ber_raw",) tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" @@ -4159,6 +4227,7 @@ class UTCTime(CommonString): optional=False, _decoded=(0, 0, 0), bounds=None, # dummy argument, workability for OctetString.decode + ctx=None, ): """ :param value: set the value. Either datetime type, or @@ -4169,13 +4238,15 @@ class UTCTime(CommonString): :param bool optional: is object ``OPTIONAL`` in sequence """ super(UTCTime, self).__init__( - None, None, impl, expl, default, optional, _decoded, + None, None, impl, expl, None, optional, _decoded, ctx, ) self._value = value + self.ber_raw = None if value is not None: - self._value = self._value_sanitize(value) + self._value, self.ber_raw = self._value_sanitize(value, ctx) + self.ber_encoded = self.ber_raw is not None if default is not None: - default = self._value_sanitize(default) + default, _ = self._value_sanitize(default) self.default = self.__class__( value=default, impl=self.tag, @@ -4183,6 +4254,45 @@ class UTCTime(CommonString): ) if self._value is None: self._value = default + optional = True + self.optional = optional + + def _strptime_bered(self, value): + year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00") + value = value[10:] + if len(value) == 0: + raise ValueError("no timezone") + year += 2000 if year < 50 else 1900 + decoded = datetime(year, month, day, hour, minute) + offset = 0 + if value[-1] == "Z": + value = value[:-1] + else: + if len(value) < 5: + raise ValueError("invalid UTC offset") + if value[-5] == "-": + sign = -1 + elif value[-5] == "+": + sign = 1 + else: + raise ValueError("invalid UTC offset") + v = pureint(value[-4:]) + offset, v = (60 * (v % 100)), v // 100 + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + offset += 3600 * v + if offset > 14 * 3600: + raise ValueError("too big UTC offset") + offset *= sign + value = value[:-5] + if len(value) == 0: + return offset, decoded + if len(value) != 2: + raise ValueError("invalid UTC offset seconds") + seconds = pureint(value) + if seconds >= 60: + raise ValueError("invalid seconds value") + return offset, decoded + timedelta(seconds=seconds) def _strptime(self, value): # datetime.strptime's format: %y%m%d%H%M%SZ @@ -4190,36 +4300,76 @@ class UTCTime(CommonString): raise ValueError("invalid UTCTime length") if value[-1] != "Z": raise ValueError("non UTC timezone") - return datetime( - 2000 + int(value[:2]), # %y - int(value[2:4]), # %m - int(value[4:6]), # %d - int(value[6:8]), # %H - int(value[8:10]), # %M - int(value[10:12]), # %S - ) + year, month, day, hour, minute, second = str_to_time_fractions(value[:-1]) + year += 2000 if year < 50 else 1900 + return datetime(year, month, day, hour, minute, second) - def _value_sanitize(self, value): - if isinstance(value, binary_type): + def _dt_sanitize(self, value): + if value.year < 1950 or value.year > 2049: + raise ValueError("UTCTime can hold only 1950-2049 years") + return value.replace(microsecond=0) + + def _value_sanitize(self, value, ctx=None): + if value.__class__ == binary_type: try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: raise DecodeError("invalid UTCTime encoding: %r" % err) + err = None try: - self._strptime(value_decoded) - except (TypeError, ValueError) as err: - raise DecodeError("invalid UTCTime format: %r" % err) - return value + return self._strptime(value_decoded), None + except (TypeError, ValueError) as _err: + err = _err + if (ctx is not None) and ctx.get("bered", False): + try: + offset, _value = self._strptime_bered(value_decoded) + _value = _value - timedelta(seconds=offset) + return self._dt_sanitize(_value), value + except (TypeError, ValueError, OverflowError) as _err: + err = _err + raise DecodeError( + "invalid %s format: %r" % (self.asn1_type_name, err), + klass=self.__class__, + ) if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - return value.strftime("%y%m%d%H%M%SZ").encode("ascii") + return value._value, None + if value.__class__ == datetime: + return self._dt_sanitize(value), None raise InvalidValueType((self.__class__, datetime)) + def _pp_value(self): + if self.ready: + value = self._value.isoformat() + if self.ber_encoded: + value += " (%s)" % self.ber_raw + return value + + def __unicode__(self): + if self.ready: + value = self._value.isoformat() + if self.ber_encoded: + value += " (%s)" % self.ber_raw + return value + return text_type(self._pp_value()) + + def __getstate__(self): + return UTCTimeState( + *super(UTCTime, self).__getstate__(), + **{"ber_raw": self.ber_raw} + ) + + def __setstate__(self, state): + super(UTCTime, self).__setstate__(state) + self.ber_raw = state.ber_raw + + def __bytes__(self): + self._assert_ready() + return self._encode_time() + def __eq__(self, their): - if isinstance(their, binary_type): - return self._value == their - if isinstance(their, datetime): + if their.__class__ == binary_type: + return self._encode_time() == their + if their.__class__ == datetime: return self.todatetime() == their if not isinstance(their, self.__class__): return False @@ -4229,25 +4379,16 @@ class UTCTime(CommonString): self._expl == their._expl ) - def todatetime(self): - """Convert to datetime + def _encode_time(self): + return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii") - :returns: datetime + def _encode(self): + self._assert_ready() + value = self._encode_time() + return b"".join((self.tag, len_encode(len(value)), value)) - Pay attention that UTCTime can not hold full year, so all years - having < 50 years are treated as 20xx, 19xx otherwise, according - to X.509 recomendation. - """ - value = self._strptime(self._value.decode("ascii")) - year = value.year % 100 - return datetime( - year=(2000 + year) if year < 50 else (1900 + year), - month=value.month, - day=value.day, - hour=value.hour, - minute=value.minute, - second=value.second, - ) + def todatetime(self): + return self._value def __repr__(self): return pp_console_row(next(self.pps())) @@ -4258,7 +4399,7 @@ class UTCTime(CommonString): asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - value=self.todatetime().isoformat() if self.ready else None, + value=self._pp_value(), optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -4293,13 +4434,19 @@ class GeneralizedTime(UTCTime): .. warning:: - BER encoding is unsupported. + Only microsecond fractions are supported in DER encoding. + :py:exc:`pyderasn.DecodeError` will be raised during decoding of + higher precision values. + + .. warning:: + + BER encoded data can loss information (accuracy) during decoding + because of float transformations. .. warning:: - Only microsecond fractions are supported. - :py:exc:`pyderasn.DecodeError` will be raised during decoding of - higher precision values. + Local times (without explicit timezone specification) are treated + as UTC one, no transformations are made. .. warning:: @@ -4309,20 +4456,74 @@ class GeneralizedTime(UTCTime): tag_default = tag_encode(24) asn1_type_name = "GeneralizedTime" + def _dt_sanitize(self, value): + return value + + def _strptime_bered(self, value): + if len(value) < 4 + 3 * 2: + raise ValueError("invalid GeneralizedTime") + year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000") + decoded = datetime(year, month, day, hour) + offset, value = 0, value[10:] + if len(value) == 0: + return offset, decoded + if value[-1] == "Z": + value = value[:-1] + else: + for char, sign in (("-", -1), ("+", 1)): + idx = value.rfind(char) + if idx == -1: + continue + offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx] + v = pureint(offset_raw) + if len(offset_raw) == 4: + offset, v = (60 * (v % 100)), v // 100 + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + elif len(offset_raw) == 2: + pass + else: + raise ValueError("invalid UTC offset") + offset += 3600 * v + if offset > 14 * 3600: + raise ValueError("too big UTC offset") + offset *= sign + break + if len(value) == 0: + return offset, decoded + if value[0] in DECIMAL_SIGNS: + return offset, ( + decoded + timedelta(seconds=3600 * fractions2float(value[1:])) + ) + if len(value) < 2: + raise ValueError("stripped minutes") + decoded += timedelta(seconds=60 * pureint(value[:2])) + value = value[2:] + if len(value) == 0: + return offset, decoded + if value[0] in DECIMAL_SIGNS: + return offset, ( + decoded + timedelta(seconds=60 * fractions2float(value[1:])) + ) + if len(value) < 2: + raise ValueError("stripped seconds") + decoded += timedelta(seconds=pureint(value[:2])) + value = value[2:] + if len(value) == 0: + return offset, decoded + if value[0] not in DECIMAL_SIGNS: + raise ValueError("invalid format after seconds") + return offset, ( + decoded + timedelta(microseconds=10**6 * fractions2float(value[1:])) + ) + def _strptime(self, value): l = len(value) if l == LEN_YYYYMMDDHHMMSSZ: # datetime.strptime's format: %Y%m%d%H%M%SZ if value[-1] != "Z": raise ValueError("non UTC timezone") - return datetime( - int(value[:4]), # %Y - int(value[4:6]), # %m - int(value[6:8]), # %d - int(value[8:10]), # %H - int(value[10:12]), # %M - int(value[12:14]), # %S - ) + return datetime(*str_to_time_fractions(value[:-1])) if l >= LEN_YYYYMMDDHHMMSSDMZ: # datetime.strptime's format: %Y%m%d%H%M%S.%fZ if value[-1] != "Z": @@ -4335,44 +4536,17 @@ class GeneralizedTime(UTCTime): us_len = len(us) if us_len > 6: raise ValueError("only microsecond fractions are supported") - us = int(us + ("0" * (6 - us_len))) - decoded = datetime( - int(value[:4]), # %Y - int(value[4:6]), # %m - int(value[6:8]), # %d - int(value[8:10]), # %H - int(value[10:12]), # %M - int(value[12:14]), # %S - us, # %f - ) - return decoded + us = pureint(us + ("0" * (6 - us_len))) + year, month, day, hour, minute, second = str_to_time_fractions(value[:14]) + return datetime(year, month, day, hour, minute, second, us) raise ValueError("invalid GeneralizedTime length") - def _value_sanitize(self, value): - if isinstance(value, binary_type): - try: - value_decoded = value.decode("ascii") - except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid GeneralizedTime encoding: %r" % err) - try: - self._strptime(value_decoded) - except (TypeError, ValueError) as err: - raise DecodeError( - "invalid GeneralizedTime format: %r" % err, - klass=self.__class__, - ) - return value - if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - encoded = value.strftime("%Y%m%d%H%M%S") - if value.microsecond > 0: - encoded = encoded + (".%06d" % value.microsecond).rstrip("0") - return (encoded + "Z").encode("ascii") - raise InvalidValueType((self.__class__, datetime)) - - def todatetime(self): - return self._strptime(self._value.decode("ascii")) + def _encode_time(self): + value = self._value + encoded = value.strftime("%Y%m%d%H%M%S") + if value.microsecond > 0: + encoded += (".%06d" % value.microsecond).rstrip("0") + return (encoded + "Z").encode("ascii") class GraphicString(CommonString): @@ -4382,13 +4556,6 @@ class GraphicString(CommonString): asn1_type_name = "GraphicString" -class VisibleString(CommonString): - __slots__ = () - tag_default = tag_encode(26) - encoding = "ascii" - asn1_type_name = "VisibleString" - - class ISO646String(VisibleString): __slots__ = () asn1_type_name = "ISO646String" @@ -4415,21 +4582,11 @@ class BMPString(CommonString): asn1_type_name = "BMPString" -ChoiceState = namedtuple("ChoiceState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +ChoiceState = namedtuple( + "ChoiceState", + BasicState._fields + ("specs", "value",), + **NAMEDTUPLE_KWARGS +) class Choice(Obj): @@ -4493,7 +4650,7 @@ class Choice(Obj): if len(schema) == 0: raise ValueError("schema must be specified") self.specs = ( - schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + schema if schema.__class__ == OrderedDict else OrderedDict(schema) ) self._value = None if value is not None: @@ -4508,7 +4665,7 @@ class Choice(Obj): self._value = copy(default_obj._value) def _value_sanitize(self, value): - if isinstance(value, tuple) and len(value) == 2: + if (value.__class__ == tuple) and len(value) == 2: choice, obj = value spec = self.specs.get(choice) if spec is None: @@ -4534,8 +4691,6 @@ class Choice(Obj): def __getstate__(self): return ChoiceState( __version__, - self.specs, - copy(self._value), self.tag, self._expl, self.default, @@ -4546,24 +4701,17 @@ class Choice(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + copy(self._value), ) def __setstate__(self, state): super(Choice, self).__setstate__(state) self.specs = state.specs self._value = state.value - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): - if isinstance(their, tuple) and len(their) == 2: + if (their.__class__ == tuple) and len(their) == 2: return self._value == their if not isinstance(their, self.__class__): return False @@ -4589,11 +4737,15 @@ class Choice(Obj): @property def choice(self): + """Name of the choice + """ self._assert_ready() return self._value[0] @property def value(self): + """Value of underlying choice + """ self._assert_ready() return self._value[1] @@ -4734,20 +4886,11 @@ class PrimitiveTypes(Choice): )) -AnyState = namedtuple("AnyState", ( - "version", - "value", - "tag", - "expl", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "defined", -)) +AnyState = namedtuple( + "AnyState", + BasicState._fields + ("value", "defined"), + **NAMEDTUPLE_KWARGS +) class Any(Obj): @@ -4784,7 +4927,7 @@ class Any(Obj): self.defined = None def _value_sanitize(self, value): - if isinstance(value, binary_type): + if value.__class__ == binary_type: return value if isinstance(value, self.__class__): return value._value @@ -4807,9 +4950,9 @@ class Any(Obj): def __getstate__(self): return AnyState( __version__, - self._value, self.tag, self._expl, + None, self.optional, self.offset, self.llen, @@ -4817,25 +4960,17 @@ class Any(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, self.defined, ) def __setstate__(self, state): super(Any, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.defined = state.defined def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their if issubclass(their.__class__, Any): return self._value == their._value @@ -4981,7 +5116,7 @@ def get_def_by_path(defines_by_path, sub_decode_path): if len(path) != len(sub_decode_path): continue for p1, p2 in zip(path, sub_decode_path): - if (p1 != any) and (p1 != p2): + if (not p1 is any) and (p1 != p2): break else: return define @@ -5012,21 +5147,11 @@ def abs_decode_path(decode_path, rel_path): return decode_path + rel_path -SequenceState = namedtuple("SequenceState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +SequenceState = namedtuple( + "SequenceState", + BasicState._fields + ("specs", "value",), + **NAMEDTUPLE_KWARGS +) class Sequence(Obj): @@ -5138,7 +5263,7 @@ class Sequence(Obj): if schema is None: schema = getattr(self, "schema", ()) self.specs = ( - schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + schema if schema.__class__ == OrderedDict else OrderedDict(schema) ) self._value = {} if value is not None: @@ -5181,8 +5306,6 @@ class Sequence(Obj): def __getstate__(self): return SequenceState( __version__, - self.specs, - {k: copy(v) for k, v in iteritems(self._value)}, self.tag, self._expl, self.default, @@ -5193,22 +5316,14 @@ class Sequence(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + {k: copy(v) for k, v in iteritems(self._value)}, ) def __setstate__(self, state): super(Sequence, self).__setstate__(state) self.specs = state.specs self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if not isinstance(their, self.__class__): @@ -5266,19 +5381,17 @@ class Sequence(Obj): return spec.default return None - def _encoded_values(self): - raws = [] + def _values_for_encoding(self): for name, spec in iteritems(self.specs): value = self._value.get(name) if value is None: if spec.optional: continue raise ObjNotReady(name) - raws.append(value.encode()) - return raws + yield value def _encode(self): - v = b"".join(self._encoded_values()) + v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) def _decode(self, tlv, offset, decode_path, ctx, tag_only): @@ -5513,8 +5626,8 @@ class Set(Sequence): .. _allow_unordered_set_ctx: DER prohibits unordered values encoding and will raise an error - during decode. If If :ref:`bered ` context option is set, - then no error will occure. Also you can disable strict values + during decode. If :ref:`bered ` context option is set, + then no error will occur. Also you can disable strict values ordering check by setting ``"allow_unordered_set": True`` :ref:`context ` option. """ @@ -5523,7 +5636,7 @@ class Set(Sequence): asn1_type_name = "SET" def _encode(self): - raws = self._encoded_values() + raws = [v.encode() for v in self._values_for_encoding()] raws.sort() v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) @@ -5664,34 +5777,23 @@ class Set(Sequence): tail = v[EOC_LEN:] obj.lenindef = True obj._value = values - if not obj.ready: - raise DecodeError( - "not all values are ready", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + for name, spec in iteritems(self.specs): + if name not in values and not spec.optional: + raise DecodeError( + "%s value is not ready" % name, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) obj.ber_encoded = ber_encoded return obj, tail -SequenceOfState = namedtuple("SequenceOfState", ( - "version", - "spec", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +SequenceOfState = namedtuple( + "SequenceOfState", + BasicState._fields + ("spec", "value", "bound_min", "bound_max"), + **NAMEDTUPLE_KWARGS +) class SequenceOf(Obj): @@ -5792,10 +5894,6 @@ class SequenceOf(Obj): def __getstate__(self): return SequenceOfState( __version__, - self.spec, - [copy(v) for v in self._value], - self._bound_min, - self._bound_max, self.tag, self._expl, self.default, @@ -5806,6 +5904,10 @@ class SequenceOf(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.spec, + [copy(v) for v in self._value], + self._bound_min, + self._bound_max, ) def __setstate__(self, state): @@ -5814,16 +5916,6 @@ class SequenceOf(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, self.__class__): @@ -5889,11 +5981,11 @@ class SequenceOf(Obj): def __getitem__(self, key): return self._value[key] - def _encoded_values(self): - return [v.encode() for v in self._value] + def _values_for_encoding(self): + return iter(self._value) def _encode(self): - v = b"".join(self._encoded_values()) + v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False): @@ -6056,7 +6148,7 @@ class SetOf(SequenceOf): asn1_type_name = "SET OF" def _encode(self): - raws = self._encoded_values() + raws = [v.encode() for v in self._values_for_encoding()] raws.sort() v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) @@ -6214,6 +6306,7 @@ def main(): # pragma: no cover if args.defines_by_path is not None: ctx["defines_by_path"] = obj_by_path(args.defines_by_path) obj, tail = schema().decode(der, ctx=ctx) + from os import environ print(pprinter( obj, oid_maps=oid_maps,