X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=5f0c0343ccd0d4153d29e6328fde4a73882229ec;hb=ac86b0fe801de06f9b861579e5d566d3cdb3236b;hp=2213e7804b60ae3b403dc09c25f6ad536ef08af0;hpb=795d767d71d08311fe4e57efaa7521455db1d574;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index 2213e78..5f0c034 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,12 +1,11 @@ #!/usr/bin/env python # coding: utf-8 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures -# Copyright (C) 2017-2018 Sergey Matveev +# Copyright (C) 2017-2020 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. +# published by the Free Software Foundation, version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -14,8 +13,7 @@ # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public -# License along with this program. If not, see -# . +# License along with this program. If not, see . """Python ASN.1 DER/BER codec with abstract structures This library allows you to marshal various structures in ASN.1 DER @@ -239,6 +237,105 @@ all object ``repr``. But it is easy to write custom formatters. >>> print(pprint(obj)) 0 [1,1, 2] INTEGER -12345 +.. _pprint_example: + +Example certificate:: + + >>> print(pprint(crt)) + 0 [1,3,1604] Certificate SEQUENCE + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:02:45:53 + [...] + 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE + 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL + . . . 05:00 + 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits + . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + + Trailing data: 0a + +Let's parse that output, human:: + + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 1 2 3 4 5 6 7 8 9 10 11 + +:: + + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 9 10 + +:: + + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 8 9 10 + +:: + + 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes + ^ ^ ^ ^ ^ + 12 13 14 9 10 + +:0: + Offset of the object, where its DER/BER encoding begins. + Pay attention that it does **not** include explicit tag. +:1: + If explicit tag exists, then this is its length (tag + encoded length). +:2: + Length of object's tag. For example CHOICE does not have its own tag, + so it is zero. +:3: + Length of encoded length. +:4: + Length of encoded value. +:5: + Visual indentation to show the depth of object in the hierarchy. +:6: + Object's name inside SEQUENCE/CHOICE. +:7: + If either IMPLICIT or EXPLICIT tag is set, then it will be shown + here. "IMPLICIT" is omitted. +:8: + Object's class name, if set. Omitted if it is just an ordinary simple + value (like with ``algorithm`` in example above). +:9: + Object's ASN.1 type. +:10: + Object's value, if set. Can consist of multiple words (like OCTET/BIT + STRINGs above). We see ``v3`` value in Version, because it is named. + ``rdnSequence`` is the choice of CHOICE type. +:11: + Possible other flags like OPTIONAL and DEFAULT, if value equals to the + default one, specified in the schema. +:12: + Shows does object contains any kind of BER encoded data (possibly + Sequence holding BER-encoded underlying value). +:13: + Only applicable to BER encoded data. Indefinite length encoding mark. +:14: + Only applicable to BER encoded data. If object has BER-specific + encoding, then ``BER`` will be shown. It does not depend on indefinite + length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING`` + (and its derivatives), ``SET``, ``SET OF`` could be BERed. + + .. _definedby: DEFINED BY @@ -249,6 +346,8 @@ DEFINED BY some previously met ObjectIdentifier. This library provides ability to specify mapping between some OID and field that must be decoded with specific specification. +.. _defines: + defines kwarg _____________ @@ -322,15 +421,15 @@ value must be sequence of following tuples:: where ``decode_path`` is a tuple holding so-called decode path to the exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply -``defines``, holding exactly the same value as accepted in its keyword -argument. +``defines``, holding exactly the same value as accepted in its +:ref:`keyword argument `. For example, again for CMS, you want to automatically decode ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse`` structures it may hold. Also, automatically decode ``controlSequence`` of ``PKIResponse``:: - content_info, tail = ContentInfo().decode(data, defines_by_path=( + content_info, tail = ContentInfo().decode(data, ctx={"defines_by_path": ( ( ("contentType",), ((("content",), {id_signedData: SignedData()}),), @@ -365,7 +464,7 @@ of ``PKIResponse``:: id_cmc_transactionId: TransactionId(), })), ), - )) + )}) Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``. First function is useful for path construction when some automatic @@ -382,15 +481,22 @@ DER. But you can optionally enable BER decoding with setting ``bered`` :ref:`context ` argument to True. Indefinite lengths and constructed primitive types should be parsed successfully. -* If object is encoded in BER form (not the DER one), then ``bered`` +* If object is encoded in BER form (not the DER one), then ``ber_encoded`` attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET - STRING``, ``SEQUENCE``, ``SET``, ``SET OF`` can contain it. + STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF`` + can contain it. * If object has an indefinite length encoding, then its ``lenindef`` attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``, ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can contain it. * If object has an indefinite length encoded explicit tag, then ``expl_lenindef`` is set to True. +* If object has either any of BER-related encoding (explicit tag + indefinite length, object's indefinite length, BER-encoding) or any + underlying component has that kind of encoding, then ``bered`` + attribute is set to True. For example SignedData CMS can have + ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but + ``ContentInfo:content:signerInfos:*:signedAttrs`` won't. EOC (end-of-contents) token's length is taken in advance in object's value length. @@ -411,6 +517,11 @@ lengths will be invalid in that case. This option should be used only for skipping some decode errors, just to see the decoded structure somehow. +Base Obj +-------- +.. autoclass:: pyderasn.Obj + :members: + Primitive types --------------- @@ -456,6 +567,10 @@ NumericString _____________ .. autoclass:: pyderasn.NumericString +PrintableString +_______________ +.. autoclass:: pyderasn.PrintableString + UTCTime _______ .. autoclass:: pyderasn.UTCTime @@ -509,16 +624,17 @@ Various ------- .. autofunction:: pyderasn.abs_decode_path +.. autofunction:: pyderasn.colonize_hex .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode .. autofunction:: pyderasn.tag_decode .. autofunction:: pyderasn.tag_ctxp .. autofunction:: pyderasn.tag_ctxc -.. autoclass:: pyderasn.Obj .. autoclass:: pyderasn.DecodeError :members: __init__ .. autoclass:: pyderasn.NotEnoughData +.. autoclass:: pyderasn.ExceedingData .. autoclass:: pyderasn.LenIndefForm .. autoclass:: pyderasn.TagMismatch .. autoclass:: pyderasn.InvalidLength @@ -533,9 +649,11 @@ from codecs import getdecoder from codecs import getencoder from collections import namedtuple from collections import OrderedDict +from copy import copy from datetime import datetime from math import ceil from os import environ +from string import ascii_letters from string import digits from six import add_metaclass @@ -545,18 +663,22 @@ from six import indexbytes from six import int2byte from six import integer_types from six import iterbytes +from six import iteritems +from six import itervalues from six import PY2 from six import string_types from six import text_type +from six import unichr as six_unichr from six.moves import xrange as six_xrange try: from termcolor import colored -except ImportError: - def colored(what, *args): +except ImportError: # pragma: no cover + def colored(what, *args, **kwargs): return what +__version__ = "5.6" __all__ = ( "Any", @@ -568,6 +690,7 @@ __all__ = ( "DecodeError", "DecodePathDefBy", "Enumerated", + "ExceedingData", "GeneralizedTime", "GeneralString", "GraphicString", @@ -635,7 +758,11 @@ LENINDEF_PP_CHAR = "I" if PY2 else "∞" # Errors ######################################################################## -class DecodeError(Exception): +class ASN1Error(ValueError): + pass + + +class DecodeError(ASN1Error): def __init__(self, msg="", klass=None, decode_path=(), offset=0): """ :param str msg: reason of decode failing @@ -674,6 +801,18 @@ class NotEnoughData(DecodeError): pass +class ExceedingData(ASN1Error): + def __init__(self, nbytes): + super(ExceedingData, self).__init__() + self.nbytes = nbytes + + def __str__(self): + return "%d trailing bytes" % self.nbytes + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + class LenIndefForm(DecodeError): pass @@ -690,7 +829,7 @@ class InvalidOID(DecodeError): pass -class ObjUnknown(ValueError): +class ObjUnknown(ASN1Error): def __init__(self, name): super(ObjUnknown, self).__init__() self.name = name @@ -702,7 +841,7 @@ class ObjUnknown(ValueError): return "%s(%s)" % (self.__class__.__name__, self) -class ObjNotReady(ValueError): +class ObjNotReady(ASN1Error): def __init__(self, name): super(ObjNotReady, self).__init__() self.name = name @@ -714,7 +853,7 @@ class ObjNotReady(ValueError): return "%s(%s)" % (self.__class__.__name__, self) -class InvalidValueType(ValueError): +class InvalidValueType(ASN1Error): def __init__(self, expected_types): super(InvalidValueType, self).__init__() self.expected_types = expected_types @@ -728,7 +867,7 @@ class InvalidValueType(ValueError): return "%s(%s)" % (self.__class__.__name__, self) -class BoundsError(ValueError): +class BoundsError(ASN1Error): def __init__(self, bound_min, value, bound_max): super(BoundsError, self).__init__() self.bound_min = bound_min @@ -899,9 +1038,9 @@ def len_decode(data): ######################################################################## class AutoAddSlots(type): - def __new__(mcs, name, bases, _dict): + def __new__(cls, name, bases, _dict): _dict["__slots__"] = _dict.get("__slots__", ()) - return type.__new__(mcs, name, bases, _dict) + return type.__new__(cls, name, bases, _dict) @add_metaclass(AutoAddSlots) @@ -922,7 +1061,7 @@ class Obj(object): "vlen", "expl_lenindef", "lenindef", - "bered", + "ber_encoded", ) def __init__( @@ -944,7 +1083,7 @@ class Obj(object): self.default = None self.expl_lenindef = False self.lenindef = False - self.bered = False + self.ber_encoded = False @property def ready(self): # pragma: no cover @@ -956,6 +1095,12 @@ class Obj(object): if not self.ready: raise ObjNotReady(self.__class__.__name__) + @property + def bered(self): + """Is either object or any elements inside is BER encoded? + """ + return self.expl_lenindef or self.lenindef or self.ber_encoded + @property def decoded(self): """Is object decoded? @@ -969,10 +1114,14 @@ class Obj(object): @property def tlen(self): + """See :ref:`decoding` + """ return len(self.tag) @property def tlvlen(self): + """See :ref:`decoding` + """ return self.tlen + self.llen + self.vlen def __str__(self): # pragma: no cover @@ -997,6 +1146,10 @@ class Obj(object): raise NotImplementedError() def encode(self): + """Encode the structure + + :returns: DER representation + """ raw = self._encode() if self._expl is None: return raw @@ -1010,6 +1163,7 @@ class Obj(object): decode_path=(), ctx=None, tag_only=False, + _ctx_immutable=True, ): """Decode the data @@ -1017,14 +1171,19 @@ class Obj(object): :param int offset: initial data's offset :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise - :param ctx: optional :ref:`context ` governing decoding process. + :param ctx: optional :ref:`context ` governing decoding process :param tag_only: decode only the tag, without length and contents (used only in Choice and Set structures, trying to determine if tag satisfies the scheme) + :param _ctx_immutable: do we need to copy ``ctx`` before using it :returns: (Obj, remaining data) + + .. seealso:: :ref:`decoding` """ if ctx is None: ctx = {} + elif _ctx_immutable: + ctx = copy(ctx) tlv = memoryview(data) if self._expl is None: result = self._decode( @@ -1035,7 +1194,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: - return + return None obj, tail = result else: try: @@ -1072,8 +1231,8 @@ class Obj(object): ctx=ctx, tag_only=tag_only, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None obj, tail = result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: @@ -1107,8 +1266,8 @@ class Obj(object): ctx=ctx, tag_only=tag_only, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None obj, tail = result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( @@ -1119,46 +1278,87 @@ class Obj(object): ) return obj, (tail if leavemm else tail.tobytes()) + def decod(self, data, offset=0, decode_path=(), ctx=None): + """Decode the data, check that tail is empty + + :raises ExceedingData: if tail is not empty + + This is just a wrapper over :py:meth:`pyderasn.Obj.decode` + (decode without tail) that also checks that there is no + trailing data left. + """ + obj, tail = self.decode( + data, + offset=offset, + decode_path=decode_path, + ctx=ctx, + leavemm=True, + ) + if len(tail) > 0: + raise ExceedingData(len(tail)) + return obj + @property def expled(self): + """See :ref:`decoding` + """ return self._expl is not None @property def expl_tag(self): + """See :ref:`decoding` + """ return self._expl @property def expl_tlen(self): + """See :ref:`decoding` + """ return len(self._expl) @property def expl_llen(self): + """See :ref:`decoding` + """ if self.expl_lenindef: return 1 return len(len_encode(self.tlvlen)) @property def expl_offset(self): + """See :ref:`decoding` + """ return self.offset - self.expl_tlen - self.expl_llen @property def expl_vlen(self): + """See :ref:`decoding` + """ return self.tlvlen @property def expl_tlvlen(self): + """See :ref:`decoding` + """ return self.expl_tlen + self.expl_llen + self.expl_vlen @property def fulloffset(self): + """See :ref:`decoding` + """ return self.expl_offset if self.expled else self.offset @property def fulllen(self): + """See :ref:`decoding` + """ return self.expl_tlvlen if self.expled else self.tlvlen def pps_lenindef(self, decode_path): - if self.lenindef: + if self.lenindef and not ( + getattr(self, "defined", None) is not None and + self.defined[1].lenindef + ): yield _pp( asn1_type_name="EOC", obj_name="", @@ -1170,6 +1370,7 @@ class Obj(object): tlen=1, llen=1, vlen=0, + ber_encoded=True, bered=True, ) if self.expl_lenindef: @@ -1181,6 +1382,7 @@ class Obj(object): tlen=1, llen=1, vlen=0, + ber_encoded=True, bered=True, ) @@ -1213,6 +1415,7 @@ class DecodePathDefBy(object): ######################################################################## PP = namedtuple("PP", ( + "obj", "asn1_type_name", "obj_name", "decode_path", @@ -1232,11 +1435,13 @@ PP = namedtuple("PP", ( "expl_vlen", "expl_lenindef", "lenindef", + "ber_encoded", "bered", )) def _pp( + obj=None, asn1_type_name="unknown", obj_name="unknown", decode_path=(), @@ -1256,9 +1461,11 @@ def _pp( expl_vlen=None, expl_lenindef=False, lenindef=False, + ber_encoded=False, bered=False, ): return PP( + obj, asn1_type_name, obj_name, decode_path, @@ -1278,6 +1485,7 @@ def _pp( expl_vlen, expl_lenindef, lenindef, + ber_encoded, bered, ) @@ -1286,9 +1494,15 @@ def _colourize(what, colour, with_colours, attrs=("bold",)): return colored(what, colour, attrs=attrs) if with_colours else what +def colonize_hex(hexed): + """Separate hexadecimal string with colons + """ + return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2)) + + def pp_console_row( pp, - oids=None, + oid_maps=(), with_offsets=False, with_blob=True, with_colours=False, @@ -1305,7 +1519,9 @@ def pp_console_row( ), LENINDEF_PP_CHAR if pp.expl_lenindef else " ", ) - cols.append(_colourize(col, "red", with_colours, ())) + col = _colourize(col, "red", with_colours, ()) + col += _colourize("B", "red", with_colours) if pp.bered else " " + cols.append(col) col = "[%d,%d,%4d]%s" % ( pp.tlen, pp.llen, @@ -1321,14 +1537,18 @@ def pp_console_row( if isinstance(ent, DecodePathDefBy): cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",))) value = str(ent.defined_by) + oid_name = None if ( - oids is not None and + len(oid_maps) > 0 and ent.defined_by.asn1_type_name == - ObjectIdentifier.asn1_type_name and - value in oids + ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("%s:" % oids[value], "green", with_colours)) - else: + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("%s:" % oid_name, "green", with_colours)) + break + if oid_name is None: cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",))) else: cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",))) @@ -1342,18 +1562,30 @@ def pp_console_row( cols.append(_colourize(col, "blue", with_colours)) if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper(): cols.append(_colourize(pp.obj_name, "magenta", with_colours)) - if pp.bered: + if pp.ber_encoded: cols.append(_colourize("BER", "red", with_colours)) cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours)) if pp.value is not None: value = pp.value cols.append(_colourize(value, "white", with_colours, ("reverse",))) if ( - oids is not None and - pp.asn1_type_name == ObjectIdentifier.asn1_type_name and - value in oids + len(oid_maps) > 0 and + pp.asn1_type_name == ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("(%s)" % oids[value], "green", with_colours)) + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("(%s)" % oid_name, "green", with_colours)) + break + if pp.asn1_type_name == Integer.asn1_type_name: + hex_repr = hex(int(pp.obj._value))[2:].upper() + if len(hex_repr) % 2 != 0: + hex_repr = "0" + hex_repr + cols.append(_colourize( + "(%s)" % colonize_hex(hex_repr), + "green", + with_colours, + )) if with_blob: if isinstance(pp.blob, binary_type): cols.append(hexenc(pp.blob)) @@ -1373,24 +1605,22 @@ def pp_console_row( def pp_console_blob(pp, decode_path_len_decrease=0): - cols = [" " * len("XXXXXYYZ [X,X,XXXX]Z")] + cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")] decode_path_len = len(pp.decode_path) - decode_path_len_decrease if decode_path_len > 0: cols.append(" ." * (decode_path_len + 1)) if isinstance(pp.blob, binary_type): blob = hexenc(pp.blob).upper() - for i in range(0, len(blob), 32): + for i in six_xrange(0, len(blob), 32): chunk = blob[i:i + 32] - yield " ".join(cols + [":".join( - chunk[j:j + 2] for j in range(0, len(chunk), 2) - )]) + yield " ".join(cols + [colonize_hex(chunk)]) elif isinstance(pp.blob, tuple): yield " ".join(cols + [", ".join(pp.blob)]) def pprint( obj, - oids=None, + oid_maps=(), big_blobs=False, with_colours=False, with_decode_path=False, @@ -1399,8 +1629,9 @@ def pprint( """Pretty print object :param Obj obj: object you want to pretty print - :param oids: ``OID <-> humand readable string`` dictionary. When OID - from it is met, then its humand readable form is printed + :param oid_maps: list of ``OID <-> humand readable string`` dictionary. + When OID from it is met, then its humand readable form + is printed :param big_blobs: if large binary objects are met (like OctetString values), do we need to print them too, on separate lines @@ -1422,7 +1653,7 @@ def pprint( if big_blobs: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -1437,7 +1668,7 @@ def pprint( else: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=True, with_colours=with_colours, @@ -1498,10 +1729,10 @@ class Boolean(Obj): self._value = default def _value_sanitize(self, value): - if issubclass(value.__class__, Boolean): - return value._value if isinstance(value, bool): return value + if issubclass(value.__class__, Boolean): + return value._value raise InvalidValueType((self.__class__, bool)) @property @@ -1518,6 +1749,9 @@ class Boolean(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __nonzero__(self): @@ -1580,7 +1814,7 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1605,14 +1839,14 @@ class Boolean(Obj): offset=offset, ) first_octet = byte2int(v) - bered = False + ber_encoded = False if first_octet == 0: value = False elif first_octet == 0xFF: value = True elif ctx.get("bered", False): value = True - bered = True + ber_encoded = True else: raise DecodeError( "unacceptable Boolean value", @@ -1628,7 +1862,7 @@ class Boolean(Obj): optional=self.optional, _decoded=(offset, 1, 1), ) - obj.bered = bered + obj.ber_encoded = ber_encoded return obj, v[1:] def __repr__(self): @@ -1636,6 +1870,7 @@ class Boolean(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -1653,6 +1888,7 @@ class Boolean(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + ber_encoded=self.ber_encoded, bered=self.bered, ) for pp in self.pps_lenindef(decode_path): @@ -1742,10 +1978,10 @@ class Integer(Obj): self._value = default def _value_sanitize(self, value): - if issubclass(value.__class__, Integer): - value = value._value - elif isinstance(value, integer_types): + if isinstance(value, integer_types): pass + elif issubclass(value.__class__, Integer): + value = value._value elif isinstance(value, str): value = self.specs.get(value) if value is None: @@ -1772,6 +2008,9 @@ class Integer(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __int__(self): @@ -1802,9 +2041,10 @@ class Integer(Obj): @property def named(self): - for name, value in self.specs.items(): + for name, value in iteritems(self.specs): if value == self._value: return name + return None def __call__( self, @@ -1884,7 +2124,7 @@ class Integer(Obj): offset=offset, ) if tag_only: - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -1962,6 +2202,7 @@ class Integer(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -1979,11 +2220,15 @@ class Integer(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + bered=self.bered, ) for pp in self.pps_lenindef(decode_path): yield pp +SET01 = frozenset(("0", "1")) + + class BitString(Obj): """``BIT STRING`` bit string type @@ -2089,8 +2334,6 @@ class BitString(Obj): return bit_len, bytes(octets) def _value_sanitize(self, value): - if issubclass(value.__class__, BitString): - return value._value if isinstance(value, (string_types, binary_type)): if ( isinstance(value, string_types) and @@ -2098,10 +2341,10 @@ class BitString(Obj): ): if value.endswith("'B"): value = value[1:-2] - if not set(value) <= set(("0", "1")): + if not frozenset(value) <= SET01: raise ValueError("B's coding contains unacceptable chars") return self._bits2octets(value) - elif value.endswith("'H"): + if value.endswith("'H"): value = value[1:-2] return ( len(value) * 4, @@ -2109,8 +2352,7 @@ class BitString(Obj): ) if isinstance(value, binary_type): return (len(value) * 8, value) - else: - raise InvalidValueType((self.__class__, string_types, binary_type)) + raise InvalidValueType((self.__class__, string_types, binary_type)) if isinstance(value, tuple): if ( len(value) == 2 and @@ -2126,11 +2368,13 @@ class BitString(Obj): bits.append(bit) if len(bits) == 0: return self._bits2octets("") - bits = set(bits) + bits = frozenset(bits) return self._bits2octets("".join( ("1" if bit in bits else "0") for bit in six_xrange(max(bits) + 1) )) + if issubclass(value.__class__, BitString): + return value._value raise InvalidValueType((self.__class__, binary_type, string_types)) @property @@ -2150,6 +2394,9 @@ class BitString(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __iter__(self): @@ -2179,7 +2426,7 @@ class BitString(Obj): @property def named(self): - return [name for name, bit in self.specs.items() if self[bit]] + return [name for name, bit in iteritems(self.specs) if self[bit]] def __call__( self, @@ -2224,7 +2471,7 @@ class BitString(Obj): octets, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): + def _decode_chunk(self, lv, offset, decode_path): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2293,9 +2540,9 @@ class BitString(Obj): offset=offset, ) if t == self.tag: - if tag_only: - return - return self._decode_chunk(lv, offset, decode_path, ctx) + if tag_only: # pragma: no cover + return None + return self._decode_chunk(lv, offset, decode_path) if t == self.tag_constructed: if not ctx.get("bered", False): raise DecodeError( @@ -2304,8 +2551,8 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None lenindef = False try: l, llen, v = len_decode(lv) @@ -2358,6 +2605,7 @@ class BitString(Obj): decode_path=sub_decode_path, leavemm=True, ctx=ctx, + _ctx_immutable=False, ) except TagMismatch: raise DecodeError( @@ -2402,7 +2650,7 @@ class BitString(Obj): _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) obj.lenindef = lenindef - obj.bered = True + obj.ber_encoded = True return obj, (v[EOC_LEN:] if lenindef else v) raise TagMismatch( klass=self.__class__, @@ -2422,6 +2670,7 @@ class BitString(Obj): if len(self.specs) > 0: blob = tuple(self.named) yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2441,6 +2690,7 @@ class BitString(Obj): expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, lenindef=self.lenindef, + ber_encoded=self.ber_encoded, bered=self.bered, ) defined_by, defined = self.defined or (None, None) @@ -2533,10 +2783,10 @@ class OctetString(Obj): ) def _value_sanitize(self, value): - if issubclass(value.__class__, OctetString): - value = value._value - elif isinstance(value, binary_type): + if isinstance(value, binary_type): pass + elif issubclass(value.__class__, OctetString): + value = value._value else: raise InvalidValueType((self.__class__, bytes)) if not self._bound_min <= len(value) <= self._bound_max: @@ -2559,6 +2809,9 @@ class OctetString(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __bytes__(self): @@ -2608,7 +2861,7 @@ class OctetString(Obj): self._value, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): + def _decode_chunk(self, lv, offset, decode_path): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2664,8 +2917,8 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return - return self._decode_chunk(lv, offset, decode_path, ctx) + return None + return self._decode_chunk(lv, offset, decode_path) if t == self.tag_constructed: if not ctx.get("bered", False): raise DecodeError( @@ -2675,7 +2928,7 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False try: l, llen, v = len_decode(lv) @@ -2721,6 +2974,7 @@ class OctetString(Obj): decode_path=sub_decode_path, leavemm=True, ctx=ctx, + _ctx_immutable=False, ) except TagMismatch: raise DecodeError( @@ -2758,7 +3012,7 @@ class OctetString(Obj): offset=offset, ) obj.lenindef = lenindef - obj.bered = True + obj.ber_encoded = True return obj, (v[EOC_LEN:] if lenindef else v) raise TagMismatch( klass=self.__class__, @@ -2771,6 +3025,7 @@ class OctetString(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2790,6 +3045,7 @@ class OctetString(Obj): expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, lenindef=self.lenindef, + ber_encoded=self.ber_encoded, bered=self.bered, ) defined_by, defined = self.defined or (None, None) @@ -2842,6 +3098,9 @@ class Null(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __eq__(self, their): @@ -2884,8 +3143,8 @@ class Null(Obj): decode_path=decode_path, offset=offset, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -2915,6 +3174,7 @@ class Null(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2930,6 +3190,7 @@ class Null(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + bered=self.bered, ) for pp in self.pps_lenindef(decode_path): yield pp @@ -3050,6 +3311,9 @@ class ObjectIdentifier(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __iter__(self): @@ -3134,8 +3398,8 @@ class ObjectIdentifier(Obj): decode_path=decode_path, offset=offset, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3161,11 +3425,17 @@ class ObjectIdentifier(Obj): ) v, tail = v[:l], v[l:] arcs = [] + ber_encoded = False while len(v) > 0: i = 0 arc = 0 while True: octet = indexbytes(v, i) + if i == 0 and octet == 0x80: + if ctx.get("bered", False): + ber_encoded = True + else: + raise DecodeError("non normalized arc encoding") arc = (arc << 7) | (octet & 0x7F) if octet & 0x80 == 0: arcs.append(arc) @@ -3197,6 +3467,8 @@ class ObjectIdentifier(Obj): optional=self.optional, _decoded=(offset, llen, l), ) + if ber_encoded: + obj.ber_encoded = True return obj, tail def __repr__(self): @@ -3204,6 +3476,7 @@ class ObjectIdentifier(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -3221,6 +3494,8 @@ class ObjectIdentifier(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + ber_encoded=self.ber_encoded, + bered=self.bered, ) for pp in self.pps_lenindef(decode_path): yield pp @@ -3263,7 +3538,10 @@ class Enumerated(Integer): if isinstance(value, self.__class__): value = value._value elif isinstance(value, integer_types): - if value not in list(self.specs.values()): + for _value in itervalues(self.specs): + if _value == value: + break + else: raise DecodeError( "unknown integer value: %s" % value, klass=self.__class__, @@ -3288,6 +3566,9 @@ class Enumerated(Integer): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __call__( @@ -3429,6 +3710,7 @@ class CommonString(OctetString): if self.ready: value = hexenc(bytes(self)) if no_unicode else self.__unicode__() yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -3446,6 +3728,8 @@ class CommonString(OctetString): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + ber_encoded=self.ber_encoded, + bered=self.bered, ) for pp in self.pps_lenindef(decode_path): yield pp @@ -3458,29 +3742,57 @@ class UTF8String(CommonString): asn1_type_name = "UTF8String" -class NumericString(CommonString): +class AllowableCharsMixin(object): + @property + def allowable_chars(self): + if PY2: + return self._allowable_chars + return frozenset(six_unichr(c) for c in self._allowable_chars) + + +class NumericString(AllowableCharsMixin, CommonString): """Numeric string - Its value is properly sanitized: only ASCII digits can be stored. + Its value is properly sanitized: only ASCII digits with spaces can + be stored. + + >>> NumericString().allowable_chars + frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ']) """ __slots__ = () tag_default = tag_encode(18) encoding = "ascii" asn1_type_name = "NumericString" - allowable_chars = set(digits.encode("ascii")) + _allowable_chars = frozenset(digits.encode("ascii") + b" ") def _value_sanitize(self, value): value = super(NumericString, self)._value_sanitize(value) - if not set(value) <= self.allowable_chars: + if not frozenset(value) <= self._allowable_chars: raise DecodeError("non-numeric value") return value -class PrintableString(CommonString): +class PrintableString(AllowableCharsMixin, CommonString): + """Printable string + + Its value is properly sanitized: see X.680 41.4 table 10. + + >>> PrintableString().allowable_chars + frozenset([' ', "'", ..., 'z']) + """ __slots__ = () tag_default = tag_encode(19) encoding = "ascii" asn1_type_name = "PrintableString" + _allowable_chars = frozenset( + (ascii_letters + digits + " '()+,-./:=?").encode("ascii") + ) + + def _value_sanitize(self, value): + value = super(PrintableString, self)._value_sanitize(value) + if not frozenset(value) <= self._allowable_chars: + raise DecodeError("non-printable value") + return value class TeletexString(CommonString): @@ -3527,14 +3839,16 @@ class UTCTime(CommonString): datetime.datetime(2017, 9, 30, 22, 7, 50) >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime() datetime.datetime(1957, 9, 30, 22, 7, 50) + + .. warning:: + + BER encoding is unsupported. """ __slots__ = () tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" - fmt = "%y%m%d%H%M%SZ" - def __init__( self, value=None, @@ -3573,24 +3887,36 @@ class UTCTime(CommonString): if self._value is None: self._value = default + def _strptime(self, value): + # datetime.strptime's format: %y%m%d%H%M%SZ + if len(value) != LEN_YYMMDDHHMMSSZ: + raise ValueError("invalid UTCTime length") + if value[-1] != "Z": + raise ValueError("non UTC timezone") + return datetime( + 2000 + int(value[:2]), # %y + int(value[2:4]), # %m + int(value[4:6]), # %d + int(value[6:8]), # %H + int(value[8:10]), # %M + int(value[10:12]), # %S + ) + def _value_sanitize(self, value): - if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - return value.strftime(self.fmt).encode("ascii") if isinstance(value, binary_type): try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid UTCTime encoding") - if len(value_decoded) == LEN_YYMMDDHHMMSSZ: - try: - datetime.strptime(value_decoded, self.fmt) - except (TypeError, ValueError): - raise DecodeError("invalid UTCTime format") - return value - else: - raise DecodeError("invalid UTCTime length") + raise DecodeError("invalid UTCTime encoding: %r" % err) + try: + self._strptime(value_decoded) + except (TypeError, ValueError) as err: + raise DecodeError("invalid UTCTime format: %r" % err) + return value + if isinstance(value, self.__class__): + return value._value + if isinstance(value, datetime): + return value.strftime("%y%m%d%H%M%SZ").encode("ascii") raise InvalidValueType((self.__class__, datetime)) def __eq__(self, their): @@ -3615,7 +3941,7 @@ class UTCTime(CommonString): having < 50 years are treated as 20xx, 19xx otherwise, according to X.509 recomendation. """ - value = datetime.strptime(self._value.decode("ascii"), self.fmt) + value = self._strptime(self._value.decode("ascii")) year = value.year % 100 return datetime( year=(2000 + year) if year < 50 else (1900 + year), @@ -3631,6 +3957,7 @@ class UTCTime(CommonString): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -3648,6 +3975,8 @@ class UTCTime(CommonString): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + ber_encoded=self.ber_encoded, + bered=self.bered, ) for pp in self.pps_lenindef(decode_path): yield pp @@ -3664,54 +3993,85 @@ class GeneralizedTime(UTCTime): '20170930220750.000123Z' >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50)) GeneralizedTime GeneralizedTime 2057-09-30T22:07:50 + + .. warning:: + + BER encoding is unsupported. + + .. warning:: + + Only microsecond fractions are supported. + :py:exc:`pyderasn.DecodeError` will be raised during decoding of + higher precision values. """ __slots__ = () tag_default = tag_encode(24) asn1_type_name = "GeneralizedTime" - fmt = "%Y%m%d%H%M%SZ" - fmt_ms = "%Y%m%d%H%M%S.%fZ" + def _strptime(self, value): + l = len(value) + if l == LEN_YYYYMMDDHHMMSSZ: + # datetime.strptime's format: %y%m%d%H%M%SZ + if value[-1] != "Z": + raise ValueError("non UTC timezone") + return datetime( + int(value[:4]), # %Y + int(value[4:6]), # %m + int(value[6:8]), # %d + int(value[8:10]), # %H + int(value[10:12]), # %M + int(value[12:14]), # %S + ) + if l >= LEN_YYYYMMDDHHMMSSDMZ: + # datetime.strptime's format: %Y%m%d%H%M%S.%fZ + if value[-1] != "Z": + raise ValueError("non UTC timezone") + if value[14] != ".": + raise ValueError("no fractions separator") + us = value[15:-1] + if us[-1] == "0": + raise ValueError("trailing zero") + us_len = len(us) + if us_len > 6: + raise ValueError("only microsecond fractions are supported") + us = int(us + ("0" * (6 - us_len))) + decoded = datetime( + int(value[:4]), # %Y + int(value[4:6]), # %m + int(value[6:8]), # %d + int(value[8:10]), # %H + int(value[10:12]), # %M + int(value[12:14]), # %S + us, # %f + ) + return decoded + raise ValueError("invalid GeneralizedTime length") def _value_sanitize(self, value): - if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - return value.strftime( - self.fmt_ms if value.microsecond > 0 else self.fmt - ).encode("ascii") if isinstance(value, binary_type): try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid GeneralizedTime encoding") - if len(value_decoded) == LEN_YYYYMMDDHHMMSSZ: - try: - datetime.strptime(value_decoded, self.fmt) - except (TypeError, ValueError): - raise DecodeError( - "invalid GeneralizedTime (without ms) format", - ) - return value - elif len(value_decoded) >= LEN_YYYYMMDDHHMMSSDMZ: - try: - datetime.strptime(value_decoded, self.fmt_ms) - except (TypeError, ValueError): - raise DecodeError( - "invalid GeneralizedTime (with ms) format", - ) - return value - else: + raise DecodeError("invalid GeneralizedTime encoding: %r" % err) + try: + self._strptime(value_decoded) + except (TypeError, ValueError) as err: raise DecodeError( - "invalid GeneralizedTime length", + "invalid GeneralizedTime format: %r" % err, klass=self.__class__, ) + return value + if isinstance(value, self.__class__): + return value._value + if isinstance(value, datetime): + encoded = value.strftime("%Y%m%d%H%M%S") + if value.microsecond > 0: + encoded = encoded + (".%06d" % value.microsecond).rstrip("0") + return (encoded + "Z").encode("ascii") raise InvalidValueType((self.__class__, datetime)) def todatetime(self): - value = self._value.decode("ascii") - if len(value) == LEN_YYYYMMDDHHMMSSZ: - return datetime.strptime(value, self.fmt) - return datetime.strptime(value, self.fmt_ms) + return self._strptime(self._value.decode("ascii")) class GraphicString(CommonString): @@ -3830,8 +4190,6 @@ class Choice(Obj): self._value = default_obj.copy()._value def _value_sanitize(self, value): - if isinstance(value, self.__class__): - return value._value if isinstance(value, tuple) and len(value) == 2: choice, obj = value spec = self.specs.get(choice) @@ -3840,12 +4198,21 @@ class Choice(Obj): if not isinstance(obj, spec.__class__): raise InvalidValueType((spec,)) return (choice, spec(obj)) + if isinstance(value, self.__class__): + return value._value raise InvalidValueType((self.__class__, tuple)) @property def ready(self): return self._value is not None and self._value[1].ready + @property + def bered(self): + return self.expl_lenindef or ( + (self._value is not None) and + self._value[1].bered + ) + def copy(self): obj = self.__class__(schema=self.specs) obj._expl = self._expl @@ -3854,6 +4221,9 @@ class Choice(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded value = self._value if value is not None: obj._value = (value[0], value[1].copy()) @@ -3925,7 +4295,7 @@ class Choice(Obj): return self._value[1].encode() def _decode(self, tlv, offset, decode_path, ctx, tag_only): - for choice, spec in self.specs.items(): + for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: spec.decode( @@ -3935,6 +4305,7 @@ class Choice(Obj): decode_path=sub_decode_path, ctx=ctx, tag_only=True, + _ctx_immutable=False, ) except TagMismatch: continue @@ -3945,14 +4316,15 @@ class Choice(Obj): decode_path=decode_path, offset=offset, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None value, tail = spec.decode( tlv, offset=offset, leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) obj = self.__class__( schema=self.specs, @@ -3962,8 +4334,6 @@ class Choice(Obj): _decoded=(offset, 0, value.fulllen), ) obj._value = (choice, value) - obj.lenindef = value.lenindef - obj.bered = value.bered return obj, tail def __repr__(self): @@ -3974,6 +4344,7 @@ class Choice(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -3987,7 +4358,6 @@ class Choice(Obj): llen=self.llen, vlen=self.vlen, expl_lenindef=self.expl_lenindef, - lenindef=self.lenindef, bered=self.bered, ) if self.ready: @@ -4065,18 +4435,26 @@ class Any(Obj): self.defined = None def _value_sanitize(self, value): + if isinstance(value, binary_type): + return value if isinstance(value, self.__class__): return value._value if isinstance(value, Obj): return value.encode() - if isinstance(value, binary_type): - return value raise InvalidValueType((self.__class__, Obj, binary_type)) @property def ready(self): return self._value is not None + @property + def bered(self): + if self.expl_lenindef or self.lenindef: + return True + if self.defined is None: + return False + return self.defined[1].bered + def copy(self): obj = self.__class__() obj._value = self._value @@ -4086,6 +4464,9 @@ class Any(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded return obj def __eq__(self, their): @@ -4149,6 +4530,7 @@ class Any(Obj): decode_path=decode_path + (str(chunk_i),), leavemm=True, ctx=ctx, + _ctx_immutable=False, ) vlen += chunk.tlvlen sub_offset += chunk.tlvlen @@ -4193,6 +4575,7 @@ class Any(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -4211,6 +4594,7 @@ class Any(Obj): expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, lenindef=self.lenindef, + bered=self.bered, ) defined_by, defined = self.defined or (None, None) if defined_by is not None: @@ -4241,8 +4625,7 @@ def get_def_by_path(defines_by_path, sub_decode_path): def abs_decode_path(decode_path, rel_path): """Create an absolute decode path from current and relative ones - :param decode_path: current decode path, starting point. - Tuple of strings + :param decode_path: current decode path, starting point. Tuple of strings :param rel_path: relative path to ``decode_path``. Tuple of strings. If first tuple's element is "/", then treat it as an absolute path, ignoring ``decode_path`` as @@ -4397,17 +4780,22 @@ class Sequence(Obj): @property def ready(self): - for name, spec in self.specs.items(): + for name, spec in iteritems(self.specs): value = self._value.get(name) if value is None: if spec.optional: continue return False - else: - if not value.ready: - return False + if not value.ready: + return False return True + @property + def bered(self): + if self.expl_lenindef or self.lenindef or self.ber_encoded: + return True + return any(value.bered for value in itervalues(self._value)) + def copy(self): obj = self.__class__(schema=self.specs) obj.tag = self.tag @@ -4417,7 +4805,10 @@ class Sequence(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen - obj._value = {k: v.copy() for k, v in self._value.items()} + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded + obj._value = {k: v.copy() for k, v in iteritems(self._value)} return obj def __eq__(self, their): @@ -4478,7 +4869,7 @@ class Sequence(Obj): def _encoded_values(self): raws = [] - for name, spec in self.specs.items(): + for name, spec in iteritems(self.specs): value = self._value.get(name) if value is None: if spec.optional: @@ -4507,8 +4898,8 @@ class Sequence(Obj): decode_path=decode_path, offset=offset, ) - if tag_only: - return + if tag_only: # pragma: no cover + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -4542,9 +4933,9 @@ class Sequence(Obj): vlen = 0 sub_offset = offset + tlen + llen values = {} - bered = False + ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) - for name, spec in self.specs.items(): + for name, spec in iteritems(self.specs): if spec.optional and ( (lenindef and v[:EOC_LEN].tobytes() == EOC) or len(v) == 0 @@ -4558,9 +4949,10 @@ class Sequence(Obj): leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) - except TagMismatch: - if spec.optional: + except TagMismatch as err: + if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise @@ -4582,6 +4974,7 @@ class Sequence(Obj): leavemm=True, decode_path=sub_sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) if len(defined_tail) > 0: raise DecodeError( @@ -4601,6 +4994,7 @@ class Sequence(Obj): leavemm=True, decode_path=sub_decode_path + (DecodePathDefBy(defined_by),), ctx=ctx, + _ctx_immutable=False, ) if len(defined_tail) > 0: raise DecodeError( @@ -4617,7 +5011,7 @@ class Sequence(Obj): v = v_tail if spec.default is not None and value == spec.default: if ctx_bered or ctx_allow_default_values: - bered = True + ber_encoded = True else: raise DecodeError( "DEFAULT value met", @@ -4667,7 +5061,7 @@ class Sequence(Obj): ) obj._value = values obj.lenindef = lenindef - obj.bered = bered + obj.ber_encoded = ber_encoded return obj, tail def __repr__(self): @@ -4682,6 +5076,7 @@ class Sequence(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -4699,6 +5094,8 @@ class Sequence(Obj): expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, lenindef=self.lenindef, + ber_encoded=self.ber_encoded, + bered=self.bered, ) for name in self.specs: value = self._value.get(name) @@ -4732,6 +5129,9 @@ class Set(Sequence): v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) + def _specs_items(self): + return iteritems(self.specs) + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) @@ -4749,7 +5149,7 @@ class Set(Sequence): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -4782,15 +5182,15 @@ class Set(Sequence): vlen = 0 sub_offset = offset + tlen + llen values = {} - bered = False + ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) - specs_items = self.specs.items + while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in specs_items(): + for name, spec in self._specs_items(): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -4800,6 +5200,7 @@ class Set(Sequence): decode_path=sub_decode_path, ctx=ctx, tag_only=True, + _ctx_immutable=False, ) except TagMismatch: continue @@ -4816,11 +5217,12 @@ class Set(Sequence): leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) value_len = value.fulllen if value_prev.tobytes() > v[:value_len].tobytes(): if ctx_bered or ctx_allow_unordered_set: - bered = True + ber_encoded = True else: raise DecodeError( "unordered " + self.asn1_type_name, @@ -4831,7 +5233,7 @@ class Set(Sequence): if spec.default is None or value != spec.default: pass elif ctx_bered or ctx_allow_default_values: - bered = True + ber_encoded = True else: raise DecodeError( "DEFAULT value met", @@ -4870,7 +5272,7 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - obj.bered = bered + obj.ber_encoded = ber_encoded return obj, tail @@ -4969,6 +5371,12 @@ class SequenceOf(Obj): def ready(self): return all(v.ready for v in self._value) + @property + def bered(self): + if self.expl_lenindef or self.lenindef or self.ber_encoded: + return True + return any(v.bered for v in self._value) + def copy(self): obj = self.__class__(schema=self.spec) obj._bound_min = self._bound_min @@ -4980,6 +5388,9 @@ class SequenceOf(Obj): obj.offset = self.offset obj.llen = self.llen obj.vlen = self.vlen + obj.expl_lenindef = self.expl_lenindef + obj.lenindef = self.lenindef + obj.ber_encoded = self.ber_encoded obj._value = [v.copy() for v in self._value] return obj @@ -5071,7 +5482,7 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5107,7 +5518,7 @@ class SequenceOf(Obj): _value = [] ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) - bered = False + ber_encoded = False spec = self.spec while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: @@ -5119,12 +5530,13 @@ class SequenceOf(Obj): leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) value_len = value.fulllen if ordering_check: if value_prev.tobytes() > v[:value_len].tobytes(): if ctx_bered or ctx_allow_unordered_set: - bered = True + ber_encoded = True else: raise DecodeError( "unordered " + self.asn1_type_name, @@ -5165,7 +5577,7 @@ class SequenceOf(Obj): ) obj.lenindef = True tail = v[EOC_LEN:] - obj.bered = bered + obj.ber_encoded = ber_encoded return obj, tail def __repr__(self): @@ -5176,6 +5588,7 @@ class SequenceOf(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -5193,6 +5606,8 @@ class SequenceOf(Obj): expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, lenindef=self.lenindef, + ber_encoded=self.ber_encoded, + bered=self.bered, ) for i, value in enumerate(self._value): yield value.pps(decode_path=decode_path + (str(i),)) @@ -5249,7 +5664,7 @@ def generic_decoder(): # pragma: no cover choice = PrimitiveTypes() choice.specs["SequenceOf"] = SequenceOf(schema=choice) choice.specs["SetOf"] = SetOf(schema=choice) - for i in range(31): + for i in six_xrange(31): choice.specs["SequenceOf%d" % i] = SequenceOf( schema=choice, expl=tag_ctxc(i), @@ -5263,7 +5678,7 @@ def generic_decoder(): # pragma: no cover def pprint_any( obj, - oids=None, + oid_maps=(), with_colours=False, with_decode_path=False, decode_path_only=(), @@ -5283,7 +5698,7 @@ def generic_decoder(): # pragma: no cover pp = _pp(**pp_kwargs) yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -5313,7 +5728,7 @@ def main(): # pragma: no cover ) parser.add_argument( "--oids", - help="Python path to dictionary with OIDs", + help="Python paths to dictionary with OIDs, comma separated", ) parser.add_argument( "--schema", @@ -5351,7 +5766,10 @@ def main(): # pragma: no cover args.DERFile.seek(args.skip) der = memoryview(args.DERFile.read()) args.DERFile.close() - oids = obj_by_path(args.oids) if args.oids else {} + oid_maps = ( + [obj_by_path(_path) for _path in (args.oids or "").split(",")] + if args.oids else () + ) if args.schema: schema = obj_by_path(args.schema) from functools import partial @@ -5367,8 +5785,8 @@ def main(): # pragma: no cover obj, tail = schema().decode(der, ctx=ctx) print(pprinter( obj, - oids=oids, - with_colours=True if environ.get("NO_COLOR") is None else False, + oid_maps=oid_maps, + with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, decode_path_only=( () if args.decode_path_only is None else