X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=pyderasn.py;h=ed5b763ccef4123c790d52569d9b376ef2acf168;hp=9a8ccddfa319df841e98a94ac4950f9b6f308c3d;hb=ad6ac8681ad5479b7274165a056624433e0ca2f1;hpb=e7a4b259f8cd580f52160c17842d797fc3ddf7dc diff --git a/pyderasn.py b/pyderasn.py index 9a8ccdd..ed5b763 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # coding: utf-8 +# cython: language_level=3 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures -# Copyright (C) 2017-2018 Sergey Matveev +# Copyright (C) 2017-2020 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. +# published by the Free Software Foundation, version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -14,8 +14,7 @@ # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public -# License along with this program. If not, see -# . +# License along with this program. If not, see . """Python ASN.1 DER/BER codec with abstract structures This library allows you to marshal various structures in ASN.1 DER @@ -23,7 +22,7 @@ format, unmarshal them in BER/CER/DER ones. >>> i = Integer(123) >>> raw = i.encode() - >>> Integer().decode(raw) == i + >>> Integer().decod(raw) == i True There are primitive types, holding single values @@ -67,10 +66,11 @@ ____ Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is the default tag used during coding process. You can override it with -either ``IMPLICIT`` (using ``impl`` keyword argument), or -``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments take -raw binary string, containing that tag. You can **not** set implicit and -explicit tags simultaneously. +either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl`` +class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword +argument or ``expl`` class attribute). Both arguments take raw binary +string, containing that tag. You can **not** set implicit and explicit +tags simultaneously. There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc` functions, allowing you to easily create ``CONTEXT`` @@ -163,21 +163,27 @@ All objects have ``ready`` boolean property, that tells if object is ready to be encoded. If that kind of action is performed on unready object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised. -All objects have ``copy()`` method, that returns their copy, that can be +All objects are friendly to ``copy.copy()`` and copied objects can be safely mutated. +Also all objects can be safely ``pickle``-d, but pay attention that +pickling among different PyDERASN versions is prohibited. + .. _decoding: Decoding -------- -Decoding is performed using ``decode()`` method. ``offset`` optional -argument could be used to set initial object's offset in the binary -data, for convenience. It returns decoded object and remaining -unmarshalled data (tail). Internally all work is done on +Decoding is performed using :py:meth:`pyderasn.Obj.decode` method. +``offset`` optional argument could be used to set initial object's +offset in the binary data, for convenience. It returns decoded object +and remaining unmarshalled data (tail). Internally all work is done on ``memoryview(data)``, and you can leave returning tail as a memoryview, by specifying ``leavemm=True`` argument. +Also note convenient :py:meth:`pyderasn.Obj.decod` method, that +immediately checks and raises if there is non-empty tail. + When object is decoded, ``decoded`` property is true and you can safely use following properties: @@ -207,9 +213,9 @@ When error occurs, :py:exc:`pyderasn.DecodeError` is raised. Context _______ -You can specify so called context keyword argument during ``decode()`` -invocation. It is dictionary containing various options governing -decoding process. +You can specify so called context keyword argument during +:py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing +various options governing decoding process. Currently available context options: @@ -239,6 +245,106 @@ all object ``repr``. But it is easy to write custom formatters. >>> print(pprint(obj)) 0 [1,1, 2] INTEGER -12345 +.. _pprint_example: + +Example certificate:: + + >>> print(pprint(crt)) + 0 [1,3,1604] Certificate SEQUENCE + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:02:45:53 + [...] + 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE + 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL + . . . 05:00 + 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits + . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + + Trailing data: 0a + +Let's parse that output, human:: + + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 1 2 3 4 5 6 7 8 9 10 11 + +:: + + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 9 10 + +:: + + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 8 9 10 + +:: + + 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes + ^ ^ ^ ^ ^ + 12 13 14 9 10 + +:0: + Offset of the object, where its DER/BER encoding begins. + Pay attention that it does **not** include explicit tag. +:1: + If explicit tag exists, then this is its length (tag + encoded length). +:2: + Length of object's tag. For example CHOICE does not have its own tag, + so it is zero. +:3: + Length of encoded length. +:4: + Length of encoded value. +:5: + Visual indentation to show the depth of object in the hierarchy. +:6: + Object's name inside SEQUENCE/CHOICE. +:7: + If either IMPLICIT or EXPLICIT tag is set, then it will be shown + here. "IMPLICIT" is omitted. +:8: + Object's class name, if set. Omitted if it is just an ordinary simple + value (like with ``algorithm`` in example above). +:9: + Object's ASN.1 type. +:10: + Object's value, if set. Can consist of multiple words (like OCTET/BIT + STRINGs above). We see ``v3`` value in Version, because it is named. + ``rdnSequence`` is the choice of CHOICE type. +:11: + Possible other flags like OPTIONAL and DEFAULT, if value equals to the + default one, specified in the schema. +:12: + Shows does object contains any kind of BER encoded data (possibly + Sequence holding BER-encoded underlying value). +:13: + Only applicable to BER encoded data. Indefinite length encoding mark. +:14: + Only applicable to BER encoded data. If object has BER-specific + encoding, then ``BER`` will be shown. It does not depend on indefinite + length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING`` + (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` + could be BERed. + + .. _definedby: DEFINED BY @@ -249,6 +355,8 @@ DEFINED BY some previously met ObjectIdentifier. This library provides ability to specify mapping between some OID and field that must be decoded with specific specification. +.. _defines: + defines kwarg _____________ @@ -322,15 +430,15 @@ value must be sequence of following tuples:: where ``decode_path`` is a tuple holding so-called decode path to the exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply -``defines``, holding exactly the same value as accepted in its keyword -argument. +``defines``, holding exactly the same value as accepted in its +:ref:`keyword argument `. For example, again for CMS, you want to automatically decode ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse`` structures it may hold. Also, automatically decode ``controlSequence`` of ``PKIResponse``:: - content_info, tail = ContentInfo().decode(data, defines_by_path=( + content_info = ContentInfo().decod(data, ctx={"defines_by_path": ( ( ("contentType",), ((("content",), {id_signedData: SignedData()}),), @@ -365,7 +473,7 @@ of ``PKIResponse``:: id_cmc_transactionId: TransactionId(), })), ), - )) + )}) Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``. First function is useful for path construction when some automatic @@ -384,7 +492,8 @@ constructed primitive types should be parsed successfully. * If object is encoded in BER form (not the DER one), then ``ber_encoded`` attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET - STRING``, ``SEQUENCE``, ``SET``, ``SET OF`` can contain it. + STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``, + ``UTCTime``, ``GeneralizedTime`` can contain it. * If object has an indefinite length encoding, then its ``lenindef`` attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``, ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can @@ -417,6 +526,11 @@ lengths will be invalid in that case. This option should be used only for skipping some decode errors, just to see the decoded structure somehow. +Base Obj +-------- +.. autoclass:: pyderasn.Obj + :members: + Primitive types --------------- @@ -462,6 +576,11 @@ NumericString _____________ .. autoclass:: pyderasn.NumericString +PrintableString +_______________ +.. autoclass:: pyderasn.PrintableString + :members: __init__ + UTCTime _______ .. autoclass:: pyderasn.UTCTime @@ -515,16 +634,17 @@ Various ------- .. autofunction:: pyderasn.abs_decode_path +.. autofunction:: pyderasn.colonize_hex .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode .. autofunction:: pyderasn.tag_decode .. autofunction:: pyderasn.tag_ctxp .. autofunction:: pyderasn.tag_ctxc -.. autoclass:: pyderasn.Obj .. autoclass:: pyderasn.DecodeError :members: __init__ .. autoclass:: pyderasn.NotEnoughData +.. autoclass:: pyderasn.ExceedingData .. autoclass:: pyderasn.LenIndefForm .. autoclass:: pyderasn.TagMismatch .. autoclass:: pyderasn.InvalidLength @@ -539,11 +659,14 @@ from codecs import getdecoder from codecs import getencoder from collections import namedtuple from collections import OrderedDict +from copy import copy from datetime import datetime +from datetime import timedelta from math import ceil from os import environ from string import ascii_letters from string import digits +from unicodedata import category as unicat from six import add_metaclass from six import binary_type @@ -552,18 +675,22 @@ from six import indexbytes from six import int2byte from six import integer_types from six import iterbytes +from six import iteritems +from six import itervalues from six import PY2 from six import string_types from six import text_type +from six import unichr as six_unichr from six.moves import xrange as six_xrange try: from termcolor import colored except ImportError: # pragma: no cover - def colored(what, *args): + def colored(what, *args, **kwargs): return what +__version__ = "6.2" __all__ = ( "Any", @@ -575,6 +702,7 @@ __all__ = ( "DecodeError", "DecodePathDefBy", "Enumerated", + "ExceedingData", "GeneralizedTime", "GeneralString", "GraphicString", @@ -636,13 +764,31 @@ EOC = b"\x00\x00" EOC_LEN = len(EOC) LENINDEF = b"\x80" # length indefinite mark LENINDEF_PP_CHAR = "I" if PY2 else "∞" +NAMEDTUPLE_KWARGS = {} if PY2 else {"module": __name__} +SET01 = frozenset("01") +DECIMALS = frozenset(digits) +DECIMAL_SIGNS = ".," + + +def pureint(value): + if not set(value) <= DECIMALS: + raise ValueError("non-pure integer") + return int(value) + +def fractions2float(fractions_raw): + pureint(fractions_raw) + return float("0." + fractions_raw) ######################################################################## # Errors ######################################################################## -class DecodeError(Exception): +class ASN1Error(ValueError): + pass + + +class DecodeError(ASN1Error): def __init__(self, msg="", klass=None, decode_path=(), offset=0): """ :param str msg: reason of decode failing @@ -681,6 +827,18 @@ class NotEnoughData(DecodeError): pass +class ExceedingData(ASN1Error): + def __init__(self, nbytes): + super(ExceedingData, self).__init__() + self.nbytes = nbytes + + def __str__(self): + return "%d trailing bytes" % self.nbytes + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + class LenIndefForm(DecodeError): pass @@ -697,7 +855,7 @@ class InvalidOID(DecodeError): pass -class ObjUnknown(ValueError): +class ObjUnknown(ASN1Error): def __init__(self, name): super(ObjUnknown, self).__init__() self.name = name @@ -709,7 +867,7 @@ class ObjUnknown(ValueError): return "%s(%s)" % (self.__class__.__name__, self) -class ObjNotReady(ValueError): +class ObjNotReady(ASN1Error): def __init__(self, name): super(ObjNotReady, self).__init__() self.name = name @@ -721,7 +879,7 @@ class ObjNotReady(ValueError): return "%s(%s)" % (self.__class__.__name__, self) -class InvalidValueType(ValueError): +class InvalidValueType(ASN1Error): def __init__(self, expected_types): super(InvalidValueType, self).__init__() self.expected_types = expected_types @@ -735,7 +893,7 @@ class InvalidValueType(ValueError): return "%s(%s)" % (self.__class__.__name__, self) -class BoundsError(ValueError): +class BoundsError(ASN1Error): def __init__(self, bound_min, value, bound_max): super(BoundsError, self).__init__() self.bound_min = bound_min @@ -906,9 +1064,9 @@ def len_decode(data): ######################################################################## class AutoAddSlots(type): - def __new__(mcs, name, bases, _dict): + def __new__(cls, name, bases, _dict): _dict["__slots__"] = _dict.get("__slots__", ()) - return type.__new__(mcs, name, bases, _dict) + return type.__new__(cls, name, bases, _dict) @add_metaclass(AutoAddSlots) @@ -975,17 +1133,36 @@ class Obj(object): """ return (self.llen + self.vlen) > 0 - def copy(self): # pragma: no cover - """Make a copy of object, safe to be mutated + def __getstate__(self): # pragma: no cover + """Used for making safe to be mutable pickleable copies """ raise NotImplementedError() + def __setstate__(self, state): + if state.version != __version__: + raise ValueError("data is pickled by different PyDERASN version") + self.tag = self.tag_default + self._value = None + self._expl = None + self.default = None + self.optional = False + self.offset = 0 + self.llen = 0 + self.vlen = 0 + self.expl_lenindef = False + self.lenindef = False + self.ber_encoded = False + @property def tlen(self): + """See :ref:`decoding` + """ return len(self.tag) @property def tlvlen(self): + """See :ref:`decoding` + """ return self.tlen + self.llen + self.vlen def __str__(self): # pragma: no cover @@ -1010,11 +1187,20 @@ class Obj(object): raise NotImplementedError() def encode(self): + """Encode the structure + + :returns: DER representation + """ raw = self._encode() if self._expl is None: return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def hexencode(self): + """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` + """ + return hexenc(self.encode()) + def decode( self, data, @@ -1023,6 +1209,7 @@ class Obj(object): decode_path=(), ctx=None, tag_only=False, + _ctx_immutable=True, ): """Decode the data @@ -1030,14 +1217,20 @@ class Obj(object): :param int offset: initial data's offset :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise - :param ctx: optional :ref:`context ` governing decoding process. + :param ctx: optional :ref:`context ` governing decoding process :param tag_only: decode only the tag, without length and contents (used only in Choice and Set structures, trying to determine if tag satisfies the scheme) + :param _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` + before using it? :returns: (Obj, remaining data) + + .. seealso:: :ref:`decoding` """ if ctx is None: ctx = {} + elif _ctx_immutable: + ctx = copy(ctx) tlv = memoryview(data) if self._expl is None: result = self._decode( @@ -1048,7 +1241,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: - return + return None obj, tail = result else: try: @@ -1086,7 +1279,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: # pragma: no cover - return + return None obj, tail = result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: @@ -1121,7 +1314,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: # pragma: no cover - return + return None obj, tail = result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( @@ -1132,48 +1325,96 @@ class Obj(object): ) return obj, (tail if leavemm else tail.tobytes()) + def decod(self, data, offset=0, decode_path=(), ctx=None): + """Decode the data, check that tail is empty + + :raises ExceedingData: if tail is not empty + + This is just a wrapper over :py:meth:`pyderasn.Obj.decode` + (decode without tail) that also checks that there is no + trailing data left. + """ + obj, tail = self.decode( + data, + offset=offset, + decode_path=decode_path, + ctx=ctx, + leavemm=True, + ) + if len(tail) > 0: + raise ExceedingData(len(tail)) + return obj + + def hexdecode(self, data, *args, **kwargs): + """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data + """ + return self.decode(hexdec(data), *args, **kwargs) + + def hexdecod(self, data, *args, **kwargs): + """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data + """ + return self.decod(hexdec(data), *args, **kwargs) + @property def expled(self): + """See :ref:`decoding` + """ return self._expl is not None @property def expl_tag(self): + """See :ref:`decoding` + """ return self._expl @property def expl_tlen(self): + """See :ref:`decoding` + """ return len(self._expl) @property def expl_llen(self): + """See :ref:`decoding` + """ if self.expl_lenindef: return 1 return len(len_encode(self.tlvlen)) @property def expl_offset(self): + """See :ref:`decoding` + """ return self.offset - self.expl_tlen - self.expl_llen @property def expl_vlen(self): + """See :ref:`decoding` + """ return self.tlvlen @property def expl_tlvlen(self): + """See :ref:`decoding` + """ return self.expl_tlen + self.expl_llen + self.expl_vlen @property def fulloffset(self): + """See :ref:`decoding` + """ return self.expl_offset if self.expled else self.offset @property def fulllen(self): + """See :ref:`decoding` + """ return self.expl_tlvlen if self.expled else self.tlvlen def pps_lenindef(self, decode_path): if self.lenindef and not ( - getattr(self, "defined", None) is not None and - self.defined[1].lenindef + getattr(self, "defined", None) is not None and + self.defined[1].lenindef ): yield _pp( asn1_type_name="EOC", @@ -1231,6 +1472,7 @@ class DecodePathDefBy(object): ######################################################################## PP = namedtuple("PP", ( + "obj", "asn1_type_name", "obj_name", "decode_path", @@ -1252,10 +1494,11 @@ PP = namedtuple("PP", ( "lenindef", "ber_encoded", "bered", -)) +), **NAMEDTUPLE_KWARGS) def _pp( + obj=None, asn1_type_name="unknown", obj_name="unknown", decode_path=(), @@ -1279,6 +1522,7 @@ def _pp( bered=False, ): return PP( + obj, asn1_type_name, obj_name, decode_path, @@ -1307,9 +1551,15 @@ def _colourize(what, colour, with_colours, attrs=("bold",)): return colored(what, colour, attrs=attrs) if with_colours else what +def colonize_hex(hexed): + """Separate hexadecimal string with colons + """ + return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2)) + + def pp_console_row( pp, - oids=None, + oid_maps=(), with_offsets=False, with_blob=True, with_colours=False, @@ -1344,14 +1594,18 @@ def pp_console_row( if isinstance(ent, DecodePathDefBy): cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",))) value = str(ent.defined_by) + oid_name = None if ( - oids is not None and + len(oid_maps) > 0 and ent.defined_by.asn1_type_name == - ObjectIdentifier.asn1_type_name and - value in oids + ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("%s:" % oids[value], "green", with_colours)) - else: + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("%s:" % oid_name, "green", with_colours)) + break + if oid_name is None: cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",))) else: cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",))) @@ -1372,15 +1626,27 @@ def pp_console_row( value = pp.value cols.append(_colourize(value, "white", with_colours, ("reverse",))) if ( - oids is not None and - pp.asn1_type_name == ObjectIdentifier.asn1_type_name and - value in oids + len(oid_maps) > 0 and + pp.asn1_type_name == ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("(%s)" % oids[value], "green", with_colours)) + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("(%s)" % oid_name, "green", with_colours)) + break + if pp.asn1_type_name == Integer.asn1_type_name: + hex_repr = hex(int(pp.obj._value))[2:].upper() + if len(hex_repr) % 2 != 0: + hex_repr = "0" + hex_repr + cols.append(_colourize( + "(%s)" % colonize_hex(hex_repr), + "green", + with_colours, + )) if with_blob: - if isinstance(pp.blob, binary_type): + if pp.blob.__class__ == binary_type: cols.append(hexenc(pp.blob)) - elif isinstance(pp.blob, tuple): + elif pp.blob.__class__ == tuple: cols.append(", ".join(pp.blob)) if pp.optional: cols.append(_colourize("OPTIONAL", "red", with_colours)) @@ -1400,20 +1666,18 @@ def pp_console_blob(pp, decode_path_len_decrease=0): decode_path_len = len(pp.decode_path) - decode_path_len_decrease if decode_path_len > 0: cols.append(" ." * (decode_path_len + 1)) - if isinstance(pp.blob, binary_type): + if pp.blob.__class__ == binary_type: blob = hexenc(pp.blob).upper() - for i in range(0, len(blob), 32): + for i in six_xrange(0, len(blob), 32): chunk = blob[i:i + 32] - yield " ".join(cols + [":".join( - chunk[j:j + 2] for j in range(0, len(chunk), 2) - )]) - elif isinstance(pp.blob, tuple): + yield " ".join(cols + [colonize_hex(chunk)]) + elif pp.blob.__class__ == tuple: yield " ".join(cols + [", ".join(pp.blob)]) def pprint( obj, - oids=None, + oid_maps=(), big_blobs=False, with_colours=False, with_decode_path=False, @@ -1422,8 +1686,8 @@ def pprint( """Pretty print object :param Obj obj: object you want to pretty print - :param oids: ``OID <-> humand readable string`` dictionary. When OID - from it is met, then its humand readable form is printed + :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary. + Its human readable form is printed when OID is met :param big_blobs: if large binary objects are met (like OctetString values), do we need to print them too, on separate lines @@ -1436,16 +1700,16 @@ def pprint( for pp in pps: if hasattr(pp, "_fields"): if ( - decode_path_only != () and - tuple( - str(p) for p in pp.decode_path[:len(decode_path_only)] - ) != decode_path_only + decode_path_only != () and + tuple( + str(p) for p in pp.decode_path[:len(decode_path_only)] + ) != decode_path_only ): continue if big_blobs: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -1453,14 +1717,14 @@ def pprint( decode_path_len_decrease=len(decode_path_only), ) for row in pp_console_blob( - pp, - decode_path_len_decrease=len(decode_path_only), + pp, + decode_path_len_decrease=len(decode_path_only), ): yield row else: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=True, with_colours=with_colours, @@ -1477,6 +1741,22 @@ def pprint( # ASN.1 primitive types ######################################################################## +BooleanState = namedtuple("BooleanState", ( + "version", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Boolean(Obj): """``BOOLEAN`` boolean type @@ -1521,27 +1801,45 @@ class Boolean(Obj): self._value = default def _value_sanitize(self, value): + if value.__class__ == bool: + return value if issubclass(value.__class__, Boolean): return value._value - if isinstance(value, bool): - return value raise InvalidValueType((self.__class__, bool)) @property def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return BooleanState( + __version__, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Boolean, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __nonzero__(self): self._assert_ready() @@ -1552,7 +1850,7 @@ class Boolean(Obj): return self._value def __eq__(self, their): - if isinstance(their, bool): + if their.__class__ == bool: return self._value == their if not issubclass(their.__class__, Boolean): return False @@ -1603,7 +1901,7 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1659,6 +1957,7 @@ class Boolean(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -1683,6 +1982,25 @@ class Boolean(Obj): yield pp +IntegerState = namedtuple("IntegerState", ( + "version", + "specs", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Integer(Obj): """``INTEGER`` integer type @@ -1746,7 +2064,7 @@ class Integer(Obj): super(Integer, self).__init__(impl, expl, default, optional, _decoded) self._value = value specs = getattr(self, "schema", {}) if _specs is None else _specs - self.specs = specs if isinstance(specs, dict) else dict(specs) + self.specs = specs if specs.__class__ == dict else dict(specs) self._bound_min, self._bound_max = getattr( self, "bounds", @@ -1766,11 +2084,11 @@ class Integer(Obj): self._value = default def _value_sanitize(self, value): - if issubclass(value.__class__, Integer): - value = value._value - elif isinstance(value, integer_types): + if isinstance(value, integer_types): pass - elif isinstance(value, str): + elif issubclass(value.__class__, Integer): + value = value._value + elif value.__class__ == str: value = self.specs.get(value) if value is None: raise ObjUnknown("integer value: %s" % value) @@ -1784,19 +2102,41 @@ class Integer(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__(_specs=self.specs) - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return IntegerState( + __version__, + self.specs, + self._value, + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Integer, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __int__(self): self._assert_ready() @@ -1826,9 +2166,10 @@ class Integer(Obj): @property def named(self): - for name, value in self.specs.items(): + for name, value in iteritems(self.specs): if value == self._value: return name + return None def __call__( self, @@ -1908,7 +2249,7 @@ class Integer(Obj): offset=offset, ) if tag_only: - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -1986,6 +2327,7 @@ class Integer(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2009,6 +2351,25 @@ class Integer(Obj): yield pp +BitStringState = namedtuple("BitStringState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "tag_constructed", + "defined", +), **NAMEDTUPLE_KWARGS) + + class BitString(Obj): """``BIT STRING`` bit string type @@ -2084,7 +2445,7 @@ class BitString(Obj): """ super(BitString, self).__init__(impl, expl, default, optional, _decoded) specs = getattr(self, "schema", {}) if _specs is None else _specs - self.specs = specs if isinstance(specs, dict) else dict(specs) + self.specs = specs if specs.__class__ == dict else dict(specs) self._value = None if value is None else self._value_sanitize(value) if default is not None: default = self._value_sanitize(default) @@ -2114,8 +2475,6 @@ class BitString(Obj): return bit_len, bytes(octets) def _value_sanitize(self, value): - if issubclass(value.__class__, BitString): - return value._value if isinstance(value, (string_types, binary_type)): if ( isinstance(value, string_types) and @@ -2123,24 +2482,23 @@ class BitString(Obj): ): if value.endswith("'B"): value = value[1:-2] - if not set(value) <= set(("0", "1")): + if not frozenset(value) <= SET01: raise ValueError("B's coding contains unacceptable chars") return self._bits2octets(value) - elif value.endswith("'H"): + if value.endswith("'H"): value = value[1:-2] return ( len(value) * 4, hexdec(value + ("" if len(value) % 2 == 0 else "0")), ) - if isinstance(value, binary_type): + if value.__class__ == binary_type: return (len(value) * 8, value) - else: - raise InvalidValueType((self.__class__, string_types, binary_type)) - if isinstance(value, tuple): + raise InvalidValueType((self.__class__, string_types, binary_type)) + if value.__class__ == tuple: if ( len(value) == 2 and isinstance(value[0], integer_types) and - isinstance(value[1], binary_type) + value[1].__class__ == binary_type ): return value bits = [] @@ -2151,31 +2509,54 @@ class BitString(Obj): bits.append(bit) if len(bits) == 0: return self._bits2octets("") - bits = set(bits) + bits = frozenset(bits) return self._bits2octets("".join( ("1" if bit in bits else "0") for bit in six_xrange(max(bits) + 1) )) + if issubclass(value.__class__, BitString): + return value._value raise InvalidValueType((self.__class__, binary_type, string_types)) @property def ready(self): return self._value is not None - def copy(self): - obj = self.__class__(_specs=self.specs) - value = self._value - if value is not None: - value = (value[0], value[1]) - obj._value = value - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return BitStringState( + __version__, + self.specs, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.tag_constructed, + self.defined, + ) + + def __setstate__(self, state): + super(BitString, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.tag_constructed = state.tag_constructed + self.defined = state.defined def __iter__(self): self._assert_ready() @@ -2192,7 +2573,7 @@ class BitString(Obj): return self._value[1] def __eq__(self, their): - if isinstance(their, bytes): + if their.__class__ == bytes: return self._value[1] == their if not issubclass(their.__class__, BitString): return False @@ -2204,7 +2585,7 @@ class BitString(Obj): @property def named(self): - return [name for name, bit in self.specs.items() if self[bit]] + return [name for name, bit in iteritems(self.specs) if self[bit]] def __call__( self, @@ -2224,7 +2605,7 @@ class BitString(Obj): ) def __getitem__(self, key): - if isinstance(key, int): + if key.__class__ == int: bit_len, octets = self._value if key >= bit_len: return False @@ -2249,64 +2630,6 @@ class BitString(Obj): octets, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): - try: - l, llen, v = len_decode(lv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l == 0: - raise NotEnoughData( - "zero length", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - pad_size = byte2int(v) - if l == 1 and pad_size != 0: - raise DecodeError( - "invalid empty value", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if pad_size > 7: - raise DecodeError( - "too big pad", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: - raise DecodeError( - "invalid pad", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - v, tail = v[:l], v[l:] - obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), - impl=self.tag, - expl=self._expl, - default=self.default, - optional=self.optional, - _specs=self.specs, - _decoded=(offset, llen, l), - ) - return obj, tail - def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) @@ -2319,24 +2642,9 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return - return self._decode_chunk(lv, offset, decode_path, ctx) - if t == self.tag_constructed: - if not ctx.get("bered", False): - raise DecodeError( - "unallowed BER constructed encoding", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: # pragma: no cover - return - lenindef = False + return None try: l, llen, v = len_decode(lv) - except LenIndefForm: - llen, l, v = 1, 0, lv[1:] - lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -2351,95 +2659,166 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - if not lenindef and l == 0: + if l == 0: raise NotEnoughData( "zero length", klass=self.__class__, decode_path=decode_path, offset=offset, ) - chunks = [] - sub_offset = offset + tlen + llen - vlen = 0 - while True: - if lenindef: - if v[:EOC_LEN].tobytes() == EOC: - break - else: - if vlen == l: - break - if vlen > l: - raise DecodeError( - "chunk out of bounds", - klass=self.__class__, - decode_path=decode_path + (str(len(chunks) - 1),), - offset=chunks[-1].offset, - ) - sub_decode_path = decode_path + (str(len(chunks)),) - try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - ) - except TagMismatch: - raise DecodeError( - "expected BitString encoded chunk", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - chunks.append(chunk) - sub_offset += chunk.tlvlen - vlen += chunk.tlvlen - v = v_tail - if len(chunks) == 0: + pad_size = byte2int(v) + if l == 1 and pad_size != 0: raise DecodeError( - "no chunks", + "invalid empty value", klass=self.__class__, decode_path=decode_path, offset=offset, ) - values = [] - bit_len = 0 - for chunk_i, chunk in enumerate(chunks[:-1]): - if chunk.bit_len % 8 != 0: - raise DecodeError( - "BitString chunk is not multiple of 8 bits", - klass=self.__class__, - decode_path=decode_path + (str(chunk_i),), - offset=chunk.offset, - ) - values.append(bytes(chunk)) - bit_len += chunk.bit_len - chunk_last = chunks[-1] - values.append(bytes(chunk_last)) - bit_len += chunk_last.bit_len + if pad_size > 7: + raise DecodeError( + "too big pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: + raise DecodeError( + "invalid pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] obj = self.__class__( - value=(bit_len, b"".join(values)), + value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, _specs=self.specs, - _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + _decoded=(offset, llen, l), ) - obj.lenindef = lenindef - obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - - def __repr__(self): - return pp_console_row(next(self.pps())) - - def pps(self, decode_path=()): - value = None + return obj, tail + if t != self.tag_constructed: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not ctx.get("bered", False): + raise DecodeError( + "unallowed BER constructed encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if tag_only: # pragma: no cover + return None + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not lenindef and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + "chunk out of bounds", + klass=self.__class__, + decode_path=decode_path + (str(len(chunks) - 1),), + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = BitString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ) + except TagMismatch: + raise DecodeError( + "expected BitString encoded chunk", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + "no chunks", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + values = [] + bit_len = 0 + for chunk_i, chunk in enumerate(chunks[:-1]): + if chunk.bit_len % 8 != 0: + raise DecodeError( + "BitString chunk is not multiple of 8 bits", + klass=self.__class__, + decode_path=decode_path + (str(chunk_i),), + offset=chunk.offset, + ) + values.append(bytes(chunk)) + bit_len += chunk.bit_len + chunk_last = chunks[-1] + values.append(bytes(chunk_last)) + bit_len += chunk_last.bit_len + obj = self.__class__( + value=(bit_len, b"".join(values)), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ) + obj.lenindef = lenindef + obj.ber_encoded = True + return obj, (v[EOC_LEN:] if lenindef else v) + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + value = None blob = None if self.ready: bit_len, blob = self._value @@ -2447,6 +2826,7 @@ class BitString(Obj): if len(self.specs) > 0: blob = tuple(self.named) yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2478,6 +2858,26 @@ class BitString(Obj): yield pp +OctetStringState = namedtuple("OctetStringState", ( + "version", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "tag_constructed", + "defined", +), **NAMEDTUPLE_KWARGS) + + class OctetString(Obj): """``OCTET STRING`` binary string type @@ -2515,6 +2915,7 @@ class OctetString(Obj): default=None, optional=False, _decoded=(0, 0, 0), + ctx=None, ): """ :param value: set the value. Either binary type, or @@ -2526,13 +2927,7 @@ class OctetString(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(OctetString, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(OctetString, self).__init__(impl, expl, default, optional, _decoded) self._value = value self._bound_min, self._bound_max = getattr( self, @@ -2559,10 +2954,10 @@ class OctetString(Obj): ) def _value_sanitize(self, value): - if issubclass(value.__class__, OctetString): - value = value._value - elif isinstance(value, binary_type): + if value.__class__ == binary_type: pass + elif issubclass(value.__class__, OctetString): + value = value._value else: raise InvalidValueType((self.__class__, bytes)) if not self._bound_min <= len(value) <= self._bound_max: @@ -2573,26 +2968,50 @@ class OctetString(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return OctetStringState( + __version__, + self._value, + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.tag_constructed, + self.defined, + ) + + def __setstate__(self, state): + super(OctetString, self).__setstate__(state) + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.tag_constructed = state.tag_constructed + self.defined = state.defined def __bytes__(self): self._assert_ready() return self._value def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their if not issubclass(their.__class__, OctetString): return False @@ -2634,50 +3053,6 @@ class OctetString(Obj): self._value, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): - try: - l, llen, v = len_decode(lv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - v, tail = v[:l], v[l:] - try: - obj = self.__class__( - value=v.tobytes(), - bounds=(self._bound_min, self._bound_max), - impl=self.tag, - expl=self._expl, - default=self.default, - optional=self.optional, - _decoded=(offset, llen, l), - ) - except DecodeError as err: - raise DecodeError( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - except BoundsError as err: - raise DecodeError( - msg=str(err), - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - return obj, tail - def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) @@ -2690,24 +3065,9 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return - return self._decode_chunk(lv, offset, decode_path, ctx) - if t == self.tag_constructed: - if not ctx.get("bered", False): - raise DecodeError( - "unallowed BER constructed encoding", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return - lenindef = False + return None try: l, llen, v = len_decode(lv) - except LenIndefForm: - llen, l, v = 1, 0, lv[1:] - lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -2722,52 +3082,17 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - chunks = [] - sub_offset = offset + tlen + llen - vlen = 0 - while True: - if lenindef: - if v[:EOC_LEN].tobytes() == EOC: - break - else: - if vlen == l: - break - if vlen > l: - raise DecodeError( - "chunk out of bounds", - klass=self.__class__, - decode_path=decode_path + (str(len(chunks) - 1),), - offset=chunks[-1].offset, - ) - sub_decode_path = decode_path + (str(len(chunks)),) - try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - ) - except TagMismatch: - raise DecodeError( - "expected OctetString encoded chunk", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - chunks.append(chunk) - sub_offset += chunk.tlvlen - vlen += chunk.tlvlen - v = v_tail + v, tail = v[:l], v[l:] try: obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), + value=v.tobytes(), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, - _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + _decoded=(offset, llen, l), + ctx=ctx, ) except DecodeError as err: raise DecodeError( @@ -2783,20 +3108,115 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - obj.lenindef = lenindef - obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + return obj, tail + if t != self.tag_constructed: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not ctx.get("bered", False): + raise DecodeError( + "unallowed BER constructed encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return None + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + "chunk out of bounds", + klass=self.__class__, + decode_path=decode_path + (str(len(chunks) - 1),), + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = OctetString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ) + except TagMismatch: + raise DecodeError( + "expected OctetString encoded chunk", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + try: + obj = self.__class__( + value=b"".join(bytes(chunk) for chunk in chunks), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ctx=ctx, + ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj.lenindef = lenindef + obj.ber_encoded = True + return obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2828,6 +3248,21 @@ class OctetString(Obj): yield pp +NullState = namedtuple("NullState", ( + "version", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Null(Obj): """``NULL`` null object @@ -2860,16 +3295,33 @@ class Null(Obj): def ready(self): return True - def copy(self): - obj = self.__class__() - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return NullState( + __version__, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Null, self).__setstate__(state) + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if not issubclass(their.__class__, Null): @@ -2912,7 +3364,7 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -2942,6 +3394,7 @@ class Null(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -2963,6 +3416,23 @@ class Null(Obj): yield pp +ObjectIdentifierState = namedtuple("ObjectIdentifierState", ( + "version", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "defines", +), **NAMEDTUPLE_KWARGS) + + class ObjectIdentifier(Obj): """``OBJECT IDENTIFIER`` OID type @@ -3013,13 +3483,7 @@ class ObjectIdentifier(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(ObjectIdentifier, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded) self._value = value if value is not None: self._value = self._value_sanitize(value) @@ -3035,10 +3499,10 @@ class ObjectIdentifier(Obj): self.defines = defines def __add__(self, their): + if their.__class__ == tuple: + return self.__class__(self._value + their) if isinstance(their, self.__class__): return self.__class__(self._value + their._value) - if isinstance(their, tuple): - return self.__class__(self._value + their) raise InvalidValueType((self.__class__, tuple)) def _value_sanitize(self, value): @@ -3046,10 +3510,10 @@ class ObjectIdentifier(Obj): return value._value if isinstance(value, string_types): try: - value = tuple(int(arc) for arc in value.split(".")) + value = tuple(pureint(arc) for arc in value.split(".")) except ValueError: raise InvalidOID("unacceptable arcs values") - if isinstance(value, tuple): + if value.__class__ == tuple: if len(value) < 2: raise InvalidOID("less than 2 arcs") first_arc = value[0] @@ -3060,6 +3524,8 @@ class ObjectIdentifier(Obj): pass else: raise InvalidOID("unacceptable first arc value") + if not all(arc >= 0 for arc in value): + raise InvalidOID("negative arc value") return value raise InvalidValueType((self.__class__, str, tuple)) @@ -3067,18 +3533,37 @@ class ObjectIdentifier(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.defines = self.defines - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return ObjectIdentifierState( + __version__, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.defines, + ) + + def __setstate__(self, state): + super(ObjectIdentifier, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.defines = state.defines def __iter__(self): self._assert_ready() @@ -3096,7 +3581,7 @@ class ObjectIdentifier(Obj): ) def __eq__(self, their): - if isinstance(their, tuple): + if their.__class__ == tuple: return self._value == their if not issubclass(their.__class__, ObjectIdentifier): return False @@ -3163,7 +3648,7 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3189,11 +3674,17 @@ class ObjectIdentifier(Obj): ) v, tail = v[:l], v[l:] arcs = [] + ber_encoded = False while len(v) > 0: i = 0 arc = 0 while True: octet = indexbytes(v, i) + if i == 0 and octet == 0x80: + if ctx.get("bered", False): + ber_encoded = True + else: + raise DecodeError("non normalized arc encoding") arc = (arc << 7) | (octet & 0x7F) if octet & 0x80 == 0: arcs.append(arc) @@ -3225,6 +3716,8 @@ class ObjectIdentifier(Obj): optional=self.optional, _decoded=(offset, llen, l), ) + if ber_encoded: + obj.ber_encoded = True return obj, tail def __repr__(self): @@ -3232,6 +3725,7 @@ class ObjectIdentifier(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -3249,6 +3743,7 @@ class ObjectIdentifier(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, expl_lenindef=self.expl_lenindef, + ber_encoded=self.ber_encoded, bered=self.bered, ) for pp in self.pps_lenindef(decode_path): @@ -3277,13 +3772,7 @@ class Enumerated(Integer): bounds=None, # dummy argument, workability for Integer.decode ): super(Enumerated, self).__init__( - value=value, - impl=impl, - expl=expl, - default=default, - optional=optional, - _specs=_specs, - _decoded=_decoded, + value, bounds, impl, expl, default, optional, _specs, _decoded, ) if len(self.specs) == 0: raise ValueError("schema must be specified") @@ -3292,7 +3781,10 @@ class Enumerated(Integer): if isinstance(value, self.__class__): value = value._value elif isinstance(value, integer_types): - if value not in list(self.specs.values()): + for _value in itervalues(self.specs): + if _value == value: + break + else: raise DecodeError( "unknown integer value: %s" % value, klass=self.__class__, @@ -3305,20 +3797,6 @@ class Enumerated(Integer): raise InvalidValueType((self.__class__, int, str)) return value - def copy(self): - obj = self.__class__(_specs=self.specs) - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj - def __call__( self, value=None, @@ -3338,6 +3816,12 @@ class Enumerated(Integer): ) +def escape_control_unicode(c): + if unicat(c)[0] == "C": + c = repr(c).lstrip("u").strip("'") + return c + + class CommonString(OctetString): """Common class for all strings @@ -3400,16 +3884,16 @@ class CommonString(OctetString): * - :py:class:`pyderasn.BMPString` - utf-16-be """ - __slots__ = ("encoding",) + __slots__ = () def _value_sanitize(self, value): value_raw = None value_decoded = None if isinstance(value, self.__class__): value_raw = value._value - elif isinstance(value, text_type): + elif value.__class__ == text_type: value_decoded = value - elif isinstance(value, binary_type): + elif value.__class__ == binary_type: value_raw = value else: raise InvalidValueType((self.__class__, text_type, binary_type)) @@ -3433,9 +3917,9 @@ class CommonString(OctetString): return value_raw def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their - if isinstance(their, text_type): + if their.__class__ == text_type: return self._value == their.encode(self.encoding) if not isinstance(their, self.__class__): return False @@ -3456,8 +3940,12 @@ class CommonString(OctetString): def pps(self, decode_path=(), no_unicode=False): value = None if self.ready: - value = hexenc(bytes(self)) if no_unicode else self.__unicode__() + value = ( + hexenc(bytes(self)) if no_unicode else + "".join(escape_control_unicode(c) for c in self.__unicode__()) + ) yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -3489,36 +3977,140 @@ class UTF8String(CommonString): asn1_type_name = "UTF8String" -class NumericString(CommonString): - """Numeric string +class AllowableCharsMixin(object): + @property + def allowable_chars(self): + if PY2: + return self._allowable_chars + return frozenset(six_unichr(c) for c in self._allowable_chars) + + +class NumericString(AllowableCharsMixin, CommonString): + """Numeric string + + Its value is properly sanitized: only ASCII digits with spaces can + be stored. + + >>> NumericString().allowable_chars + frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ']) + """ + __slots__ = () + tag_default = tag_encode(18) + encoding = "ascii" + asn1_type_name = "NumericString" + _allowable_chars = frozenset(digits.encode("ascii") + b" ") + + def _value_sanitize(self, value): + value = super(NumericString, self)._value_sanitize(value) + if not frozenset(value) <= self._allowable_chars: + raise DecodeError("non-numeric value") + return value + + +PrintableStringState = namedtuple( + "PrintableStringState", + OctetStringState._fields + ("allowable_chars",), + **NAMEDTUPLE_KWARGS +) + + +class PrintableString(AllowableCharsMixin, CommonString): + """Printable string + + Its value is properly sanitized: see X.680 41.4 table 10. + + >>> PrintableString().allowable_chars + frozenset([' ', "'", ..., 'z']) + >>> obj = PrintableString("foo*bar", allow_asterisk=True) + PrintableString PrintableString foo*bar + >>> obj.allow_asterisk, obj.allow_ampersand + (True, False) + """ + __slots__ = () + tag_default = tag_encode(19) + encoding = "ascii" + asn1_type_name = "PrintableString" + _allowable_chars = frozenset( + (ascii_letters + digits + " '()+,-./:=?").encode("ascii") + ) + _asterisk = frozenset("*".encode("ascii")) + _ampersand = frozenset("&".encode("ascii")) + + def __init__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ctx=None, + allow_asterisk=False, + allow_ampersand=False, + ): + """ + :param allow_asterisk: allow asterisk character + :param allow_ampersand: allow ampersand character + """ + if allow_asterisk: + self._allowable_chars |= self._asterisk + if allow_ampersand: + self._allowable_chars |= self._ampersand + super(PrintableString, self).__init__( + value, bounds, impl, expl, default, optional, _decoded, ctx, + ) + + @property + def allow_asterisk(self): + """Is asterisk character allowed? + """ + return self._asterisk <= self._allowable_chars - Its value is properly sanitized: only ASCII digits can be stored. - """ - __slots__ = () - tag_default = tag_encode(18) - encoding = "ascii" - asn1_type_name = "NumericString" - allowable_chars = set(digits.encode("ascii") + b" ") + @property + def allow_ampersand(self): + """Is ampersand character allowed? + """ + return self._ampersand <= self._allowable_chars def _value_sanitize(self, value): - value = super(NumericString, self)._value_sanitize(value) - if not set(value) <= self.allowable_chars: - raise DecodeError("non-numeric value") + value = super(PrintableString, self)._value_sanitize(value) + if not frozenset(value) <= self._allowable_chars: + raise DecodeError("non-printable value") return value + def __getstate__(self): + return PrintableStringState( + *super(PrintableString, self).__getstate__(), + **{"allowable_chars": self._allowable_chars} + ) -class PrintableString(CommonString): - __slots__ = () - tag_default = tag_encode(19) - encoding = "ascii" - asn1_type_name = "PrintableString" - allowable_chars = set((ascii_letters + digits + " '()+,-./:=?").encode("ascii")) + def __setstate__(self, state): + super(PrintableString, self).__setstate__(state) + self._allowable_chars = state.allowable_chars - def _value_sanitize(self, value): - value = super(PrintableString, self)._value_sanitize(value) - if not set(value) <= self.allowable_chars: - raise DecodeError("non-printable value") - return value + def __call__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + bounds=( + (self._bound_min, self._bound_max) + if bounds is None else bounds + ), + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + allow_asterisk=self.allow_asterisk, + allow_ampersand=self.allow_ampersand, + ) class TeletexString(CommonString): @@ -3552,7 +4144,31 @@ LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ") LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ") -class UTCTime(CommonString): +class VisibleString(CommonString): + __slots__ = () + tag_default = tag_encode(26) + encoding = "ascii" + asn1_type_name = "VisibleString" + + +UTCTimeState = namedtuple( + "UTCTimeState", + OctetStringState._fields + ("ber_raw",), + **NAMEDTUPLE_KWARGS +) + + +def str_to_time_fractions(value): + v = pureint(value) + year, v = (v // 10**10), (v % 10**10) + month, v = (v // 10**8), (v % 10**8) + day, v = (v // 10**6), (v % 10**6) + hour, v = (v // 10**4), (v % 10**4) + minute, second = (v // 100), (v % 100) + return year, month, day, hour, minute, second + + +class UTCTime(VisibleString): """``UTCTime`` datetime type >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123)) @@ -3565,14 +4181,28 @@ class UTCTime(CommonString): datetime.datetime(2017, 9, 30, 22, 7, 50) >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime() datetime.datetime(1957, 9, 30, 22, 7, 50) + + If BER encoded value was met, then ``ber_raw`` attribute will hold + its raw representation. + + .. warning:: + + Pay attention that UTCTime can not hold full year, so all years + having < 50 years are treated as 20xx, 19xx otherwise, according + to X.509 recommendation. + + .. warning:: + + No strict validation of UTC offsets are made, but very crude: + + * minutes are not exceeding 60 + * offset value is not exceeding 14 hours """ - __slots__ = () + __slots__ = ("ber_raw",) tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" - fmt = "%y%m%d%H%M%SZ" - def __init__( self, value=None, @@ -3582,6 +4212,7 @@ class UTCTime(CommonString): optional=False, _decoded=(0, 0, 0), bounds=None, # dummy argument, workability for OctetString.decode + ctx=None, ): """ :param value: set the value. Either datetime type, or @@ -3592,17 +4223,15 @@ class UTCTime(CommonString): :param bool optional: is object ``OPTIONAL`` in sequence """ super(UTCTime, self).__init__( - impl=impl, - expl=expl, - default=default, - optional=optional, - _decoded=_decoded, + None, None, impl, expl, None, optional, _decoded, ctx, ) self._value = value + self.ber_raw = None if value is not None: - self._value = self._value_sanitize(value) + self._value, self.ber_raw = self._value_sanitize(value, ctx) + self.ber_encoded = self.ber_raw is not None if default is not None: - default = self._value_sanitize(default) + default, _ = self._value_sanitize(default) self.default = self.__class__( value=default, impl=self.tag, @@ -3610,31 +4239,122 @@ class UTCTime(CommonString): ) if self._value is None: self._value = default + optional = True + self.optional = optional - def _value_sanitize(self, value): - if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - return value.strftime(self.fmt).encode("ascii") - if isinstance(value, binary_type): + def _strptime_bered(self, value): + year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00") + value = value[10:] + if len(value) == 0: + raise ValueError("no timezone") + year += 2000 if year < 50 else 1900 + decoded = datetime(year, month, day, hour, minute) + offset = 0 + if value[-1] == "Z": + value = value[:-1] + else: + if len(value) < 5: + raise ValueError("invalid UTC offset") + if value[-5] == "-": + sign = -1 + elif value[-5] == "+": + sign = 1 + else: + raise ValueError("invalid UTC offset") + v = pureint(value[-4:]) + offset, v = (60 * (v % 100)), v // 100 + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + offset += 3600 * v + if offset > 14 * 3600: + raise ValueError("too big UTC offset") + offset *= sign + value = value[:-5] + if len(value) == 0: + return offset, decoded + if len(value) != 2: + raise ValueError("invalid UTC offset seconds") + seconds = pureint(value) + if seconds >= 60: + raise ValueError("invalid seconds value") + return offset, decoded + timedelta(seconds=seconds) + + def _strptime(self, value): + # datetime.strptime's format: %y%m%d%H%M%SZ + if len(value) != LEN_YYMMDDHHMMSSZ: + raise ValueError("invalid UTCTime length") + if value[-1] != "Z": + raise ValueError("non UTC timezone") + year, month, day, hour, minute, second = str_to_time_fractions(value[:-1]) + year += 2000 if year < 50 else 1900 + return datetime(year, month, day, hour, minute, second) + + def _dt_sanitize(self, value): + if value.year < 1950 or value.year > 2049: + raise ValueError("UTCTime can hold only 1950-2049 years") + return value.replace(microsecond=0) + + def _value_sanitize(self, value, ctx=None): + if value.__class__ == binary_type: try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid UTCTime encoding") - if len(value_decoded) == LEN_YYMMDDHHMMSSZ: - try: - datetime.strptime(value_decoded, self.fmt) - except (TypeError, ValueError): - raise DecodeError("invalid UTCTime format") - return value - else: - raise DecodeError("invalid UTCTime length") + raise DecodeError("invalid UTCTime encoding: %r" % err) + err = None + try: + return self._strptime(value_decoded), None + except (TypeError, ValueError) as _err: + err = _err + if (ctx is not None) and ctx.get("bered", False): + try: + offset, _value = self._strptime_bered(value_decoded) + _value = _value - timedelta(seconds=offset) + return self._dt_sanitize(_value), value + except (TypeError, ValueError, OverflowError) as _err: + err = _err + raise DecodeError( + "invalid %s format: %r" % (self.asn1_type_name, err), + klass=self.__class__, + ) + if isinstance(value, self.__class__): + return value._value, None + if value.__class__ == datetime: + return self._dt_sanitize(value), None raise InvalidValueType((self.__class__, datetime)) + def _pp_value(self): + if self.ready: + value = self._value.isoformat() + if self.ber_encoded: + value += " (%s)" % self.ber_raw + return value + + def __unicode__(self): + if self.ready: + value = self._value.isoformat() + if self.ber_encoded: + value += " (%s)" % self.ber_raw + return value + return text_type(self._pp_value()) + + def __getstate__(self): + return UTCTimeState( + *super(UTCTime, self).__getstate__(), + **{"ber_raw": self.ber_raw} + ) + + def __setstate__(self, state): + super(UTCTime, self).__setstate__(state) + self.ber_raw = state.ber_raw + + def __bytes__(self): + self._assert_ready() + return self._encode_time() + def __eq__(self, their): - if isinstance(their, binary_type): - return self._value == their - if isinstance(their, datetime): + if their.__class__ == binary_type: + return self._encode_time() == their + if their.__class__ == datetime: return self.todatetime() == their if not isinstance(their, self.__class__): return False @@ -3644,35 +4364,27 @@ class UTCTime(CommonString): self._expl == their._expl ) - def todatetime(self): - """Convert to datetime + def _encode_time(self): + return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii") - :returns: datetime + def _encode(self): + self._assert_ready() + value = self._encode_time() + return b"".join((self.tag, len_encode(len(value)), value)) - Pay attention that UTCTime can not hold full year, so all years - having < 50 years are treated as 20xx, 19xx otherwise, according - to X.509 recomendation. - """ - value = datetime.strptime(self._value.decode("ascii"), self.fmt) - year = value.year % 100 - return datetime( - year=(2000 + year) if year < 50 else (1900 + year), - month=value.month, - day=value.day, - hour=value.hour, - minute=value.minute, - second=value.second, - ) + def todatetime(self): + return self._value def __repr__(self): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - value=self.todatetime().isoformat() if self.ready else None, + value=self._pp_value(), optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -3704,54 +4416,122 @@ class GeneralizedTime(UTCTime): '20170930220750.000123Z' >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50)) GeneralizedTime GeneralizedTime 2057-09-30T22:07:50 + + .. warning:: + + Only microsecond fractions are supported in DER encoding. + :py:exc:`pyderasn.DecodeError` will be raised during decoding of + higher precision values. + + .. warning:: + + BER encoded data can loss information (accuracy) during decoding + because of float transformations. + + .. warning:: + + Local times (without explicit timezone specification) are treated + as UTC one, no transformations are made. + + .. warning:: + + Zero year is unsupported. """ __slots__ = () tag_default = tag_encode(24) asn1_type_name = "GeneralizedTime" - fmt = "%Y%m%d%H%M%SZ" - fmt_ms = "%Y%m%d%H%M%S.%fZ" + def _dt_sanitize(self, value): + return value - def _value_sanitize(self, value): - if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - return value.strftime( - self.fmt_ms if value.microsecond > 0 else self.fmt - ).encode("ascii") - if isinstance(value, binary_type): - try: - value_decoded = value.decode("ascii") - except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid GeneralizedTime encoding") - if len(value_decoded) == LEN_YYYYMMDDHHMMSSZ: - try: - datetime.strptime(value_decoded, self.fmt) - except (TypeError, ValueError): - raise DecodeError( - "invalid GeneralizedTime (without ms) format", - ) - return value - elif len(value_decoded) >= LEN_YYYYMMDDHHMMSSDMZ: - try: - datetime.strptime(value_decoded, self.fmt_ms) - except (TypeError, ValueError): - raise DecodeError( - "invalid GeneralizedTime (with ms) format", - ) - return value - else: - raise DecodeError( - "invalid GeneralizedTime length", - klass=self.__class__, - ) - raise InvalidValueType((self.__class__, datetime)) + def _strptime_bered(self, value): + if len(value) < 4 + 3 * 2: + raise ValueError("invalid GeneralizedTime") + year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000") + decoded = datetime(year, month, day, hour) + offset, value = 0, value[10:] + if len(value) == 0: + return offset, decoded + if value[-1] == "Z": + value = value[:-1] + else: + for char, sign in (("-", -1), ("+", 1)): + idx = value.rfind(char) + if idx == -1: + continue + offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx] + v = pureint(offset_raw) + if len(offset_raw) == 4: + offset, v = (60 * (v % 100)), v // 100 + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + elif len(offset_raw) == 2: + pass + else: + raise ValueError("invalid UTC offset") + offset += 3600 * v + if offset > 14 * 3600: + raise ValueError("too big UTC offset") + offset *= sign + break + if len(value) == 0: + return offset, decoded + if value[0] in DECIMAL_SIGNS: + return offset, ( + decoded + timedelta(seconds=3600 * fractions2float(value[1:])) + ) + if len(value) < 2: + raise ValueError("stripped minutes") + decoded += timedelta(seconds=60 * pureint(value[:2])) + value = value[2:] + if len(value) == 0: + return offset, decoded + if value[0] in DECIMAL_SIGNS: + return offset, ( + decoded + timedelta(seconds=60 * fractions2float(value[1:])) + ) + if len(value) < 2: + raise ValueError("stripped seconds") + decoded += timedelta(seconds=pureint(value[:2])) + value = value[2:] + if len(value) == 0: + return offset, decoded + if value[0] not in DECIMAL_SIGNS: + raise ValueError("invalid format after seconds") + return offset, ( + decoded + timedelta(microseconds=10**6 * fractions2float(value[1:])) + ) - def todatetime(self): - value = self._value.decode("ascii") - if len(value) == LEN_YYYYMMDDHHMMSSZ: - return datetime.strptime(value, self.fmt) - return datetime.strptime(value, self.fmt_ms) + def _strptime(self, value): + l = len(value) + if l == LEN_YYYYMMDDHHMMSSZ: + # datetime.strptime's format: %Y%m%d%H%M%SZ + if value[-1] != "Z": + raise ValueError("non UTC timezone") + return datetime(*str_to_time_fractions(value[:-1])) + if l >= LEN_YYYYMMDDHHMMSSDMZ: + # datetime.strptime's format: %Y%m%d%H%M%S.%fZ + if value[-1] != "Z": + raise ValueError("non UTC timezone") + if value[14] != ".": + raise ValueError("no fractions separator") + us = value[15:-1] + if us[-1] == "0": + raise ValueError("trailing zero") + us_len = len(us) + if us_len > 6: + raise ValueError("only microsecond fractions are supported") + us = pureint(us + ("0" * (6 - us_len))) + year, month, day, hour, minute, second = str_to_time_fractions(value[:14]) + return datetime(year, month, day, hour, minute, second, us) + raise ValueError("invalid GeneralizedTime length") + + def _encode_time(self): + value = self._value + encoded = value.strftime("%Y%m%d%H%M%S") + if value.microsecond > 0: + encoded += (".%06d" % value.microsecond).rstrip("0") + return (encoded + "Z").encode("ascii") class GraphicString(CommonString): @@ -3761,13 +4541,6 @@ class GraphicString(CommonString): asn1_type_name = "GraphicString" -class VisibleString(CommonString): - __slots__ = () - tag_default = tag_encode(26) - encoding = "ascii" - asn1_type_name = "VisibleString" - - class ISO646String(VisibleString): __slots__ = () asn1_type_name = "ISO646String" @@ -3794,6 +4567,23 @@ class BMPString(CommonString): asn1_type_name = "BMPString" +ChoiceState = namedtuple("ChoiceState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Choice(Obj): """``CHOICE`` special type @@ -3855,7 +4645,7 @@ class Choice(Obj): if len(schema) == 0: raise ValueError("schema must be specified") self.specs = ( - schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + schema if schema.__class__ == OrderedDict else OrderedDict(schema) ) self._value = None if value is not None: @@ -3867,12 +4657,10 @@ class Choice(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) def _value_sanitize(self, value): - if isinstance(value, self.__class__): - return value._value - if isinstance(value, tuple) and len(value) == 2: + if (value.__class__ == tuple) and len(value) == 2: choice, obj = value spec = self.specs.get(choice) if spec is None: @@ -3880,6 +4668,8 @@ class Choice(Obj): if not isinstance(obj, spec.__class__): raise InvalidValueType((spec,)) return (choice, spec(obj)) + if isinstance(value, self.__class__): + return value._value raise InvalidValueType((self.__class__, tuple)) @property @@ -3893,21 +4683,39 @@ class Choice(Obj): self._value[1].bered ) - def copy(self): - obj = self.__class__(schema=self.specs) - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - value = self._value - if value is not None: - obj._value = (value[0], value[1].copy()) - return obj + def __getstate__(self): + return ChoiceState( + __version__, + self.specs, + copy(self._value), + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Choice, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): - if isinstance(their, tuple) and len(their) == 2: + if (their.__class__ == tuple) and len(their) == 2: return self._value == their if not isinstance(their, self.__class__): return False @@ -3972,7 +4780,7 @@ class Choice(Obj): return self._value[1].encode() def _decode(self, tlv, offset, decode_path, ctx, tag_only): - for choice, spec in self.specs.items(): + for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: spec.decode( @@ -3982,6 +4790,7 @@ class Choice(Obj): decode_path=sub_decode_path, ctx=ctx, tag_only=True, + _ctx_immutable=False, ) except TagMismatch: continue @@ -3993,13 +4802,14 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None value, tail = spec.decode( tlv, offset=offset, leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) obj = self.__class__( schema=self.specs, @@ -4019,6 +4829,7 @@ class Choice(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -4045,9 +4856,9 @@ class PrimitiveTypes(Choice): It could be useful for general decoding of some unspecified values: - >>> PrimitiveTypes().decode(hexdec("0403666f6f"))[0].value + >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value OCTET STRING 3 bytes 666f6f - >>> PrimitiveTypes().decode(hexdec("0203123456"))[0].value + >>> PrimitiveTypes().decod(hexdec("0203123456")).value INTEGER 1193046 """ __slots__ = () @@ -4075,6 +4886,22 @@ class PrimitiveTypes(Choice): )) +AnyState = namedtuple("AnyState", ( + "version", + "value", + "tag", + "expl", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "defined", +), **NAMEDTUPLE_KWARGS) + + class Any(Obj): """``ANY`` special type @@ -4109,12 +4936,12 @@ class Any(Obj): self.defined = None def _value_sanitize(self, value): + if value.__class__ == binary_type: + return value if isinstance(value, self.__class__): return value._value if isinstance(value, Obj): return value.encode() - if isinstance(value, binary_type): - return value raise InvalidValueType((self.__class__, Obj, binary_type)) @property @@ -4129,19 +4956,38 @@ class Any(Obj): return False return self.defined[1].bered - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.tag = self.tag - obj._expl = self._expl - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - return obj + def __getstate__(self): + return AnyState( + __version__, + self._value, + self.tag, + self._expl, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.defined, + ) + + def __setstate__(self, state): + super(Any, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.defined = state.defined def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their if issubclass(their.__class__, Any): return self._value == their._value @@ -4201,6 +5047,7 @@ class Any(Obj): decode_path=decode_path + (str(chunk_i),), leavemm=True, ctx=ctx, + _ctx_immutable=False, ) vlen += chunk.tlvlen sub_offset += chunk.tlvlen @@ -4213,7 +5060,7 @@ class Any(Obj): _decoded=(offset, 0, tlvlen), ) obj.lenindef = True - obj.tag = t + obj.tag = t.tobytes() return obj, v[EOC_LEN:] except DecodeError as err: raise err.__class__( @@ -4237,7 +5084,7 @@ class Any(Obj): optional=self.optional, _decoded=(offset, 0, tlvlen), ) - obj.tag = t + obj.tag = t.tobytes() return obj, tail def __repr__(self): @@ -4245,6 +5092,7 @@ class Any(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -4294,8 +5142,7 @@ def get_def_by_path(defines_by_path, sub_decode_path): def abs_decode_path(decode_path, rel_path): """Create an absolute decode path from current and relative ones - :param decode_path: current decode path, starting point. - Tuple of strings + :param decode_path: current decode path, starting point. Tuple of strings :param rel_path: relative path to ``decode_path``. Tuple of strings. If first tuple's element is "/", then treat it as an absolute path, ignoring ``decode_path`` as @@ -4317,6 +5164,23 @@ def abs_decode_path(decode_path, rel_path): return decode_path + rel_path +SequenceState = namedtuple("SequenceState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Sequence(Obj): """``SEQUENCE`` structure type @@ -4426,7 +5290,7 @@ class Sequence(Obj): if schema is None: schema = getattr(self, "schema", ()) self.specs = ( - schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + schema if schema.__class__ == OrderedDict else OrderedDict(schema) ) self._value = {} if value is not None: @@ -4446,38 +5310,57 @@ class Sequence(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) @property def ready(self): - for name, spec in self.specs.items(): + for name, spec in iteritems(self.specs): value = self._value.get(name) if value is None: if spec.optional: continue return False - else: - if not value.ready: - return False + if not value.ready: + return False return True @property def bered(self): if self.expl_lenindef or self.lenindef or self.ber_encoded: return True - return any(value.bered for value in self._value.values()) - - def copy(self): - obj = self.__class__(schema=self.specs) - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj._value = {k: v.copy() for k, v in self._value.items()} - return obj + return any(value.bered for value in itervalues(self._value)) + + def __getstate__(self): + return SequenceState( + __version__, + self.specs, + {k: copy(v) for k, v in iteritems(self._value)}, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Sequence, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if not isinstance(their, self.__class__): @@ -4537,7 +5420,7 @@ class Sequence(Obj): def _encoded_values(self): raws = [] - for name, spec in self.specs.items(): + for name, spec in iteritems(self.specs): value = self._value.get(name) if value is None: if spec.optional: @@ -4567,7 +5450,7 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -4603,7 +5486,7 @@ class Sequence(Obj): values = {} ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) - for name, spec in self.specs.items(): + for name, spec in iteritems(self.specs): if spec.optional and ( (lenindef and v[:EOC_LEN].tobytes() == EOC) or len(v) == 0 @@ -4617,9 +5500,10 @@ class Sequence(Obj): leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) - except TagMismatch: - if spec.optional: + except TagMismatch as err: + if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise @@ -4641,6 +5525,7 @@ class Sequence(Obj): leavemm=True, decode_path=sub_sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) if len(defined_tail) > 0: raise DecodeError( @@ -4660,6 +5545,7 @@ class Sequence(Obj): leavemm=True, decode_path=sub_decode_path + (DecodePathDefBy(defined_by),), ctx=ctx, + _ctx_immutable=False, ) if len(defined_tail) > 0: raise DecodeError( @@ -4741,6 +5627,7 @@ class Sequence(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -4793,6 +5680,9 @@ class Set(Sequence): v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) + def _specs_items(self): + return iteritems(self.specs) + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) @@ -4810,7 +5700,7 @@ class Set(Sequence): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -4847,11 +5737,11 @@ class Set(Sequence): ctx_allow_default_values = ctx.get("allow_default_values", False) ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) - specs_items = self.specs.items + while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in specs_items(): + for name, spec in self._specs_items(): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -4861,6 +5751,7 @@ class Set(Sequence): decode_path=sub_decode_path, ctx=ctx, tag_only=True, + _ctx_immutable=False, ) except TagMismatch: continue @@ -4877,6 +5768,7 @@ class Set(Sequence): leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) value_len = value.fulllen if value_prev.tobytes() > v[:value_len].tobytes(): @@ -4924,17 +5816,37 @@ class Set(Sequence): tail = v[EOC_LEN:] obj.lenindef = True obj._value = values - if not obj.ready: - raise DecodeError( - "not all values are ready", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + for name, spec in iteritems(self.specs): + if name not in values and not spec.optional: + raise DecodeError( + "%s value is not ready" % name, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) obj.ber_encoded = ber_encoded return obj, tail +SequenceOfState = namedtuple("SequenceOfState", ( + "version", + "spec", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class SequenceOf(Obj): """``SEQUENCE OF`` sequence type @@ -4980,13 +5892,7 @@ class SequenceOf(Obj): optional=False, _decoded=(0, 0, 0), ): - super(SequenceOf, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded) if schema is None: schema = getattr(self, "schema", None) if schema is None: @@ -5010,7 +5916,7 @@ class SequenceOf(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) def _value_sanitize(self, value): if issubclass(value.__class__, SequenceOf): @@ -5036,19 +5942,41 @@ class SequenceOf(Obj): return True return any(v.bered for v in self._value) - def copy(self): - obj = self.__class__(schema=self.spec) - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj._value = [v.copy() for v in self._value] - return obj + def __getstate__(self): + return SequenceOfState( + __version__, + self.spec, + [copy(v) for v in self._value], + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(SequenceOf, self).__setstate__(state) + self.spec = state.spec + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, self.__class__): @@ -5138,7 +6066,7 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5186,6 +6114,7 @@ class SequenceOf(Obj): leavemm=True, decode_path=sub_decode_path, ctx=ctx, + _ctx_immutable=False, ) value_len = value.fulllen if ordering_check: @@ -5243,6 +6172,7 @@ class SequenceOf(Obj): def pps(self, decode_path=()): yield _pp( + obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, @@ -5318,7 +6248,7 @@ def generic_decoder(): # pragma: no cover choice = PrimitiveTypes() choice.specs["SequenceOf"] = SequenceOf(schema=choice) choice.specs["SetOf"] = SetOf(schema=choice) - for i in range(31): + for i in six_xrange(31): choice.specs["SequenceOf%d" % i] = SequenceOf( schema=choice, expl=tag_ctxc(i), @@ -5332,7 +6262,7 @@ def generic_decoder(): # pragma: no cover def pprint_any( obj, - oids=None, + oid_maps=(), with_colours=False, with_decode_path=False, decode_path_only=(), @@ -5341,8 +6271,8 @@ def generic_decoder(): # pragma: no cover for pp in pps: if hasattr(pp, "_fields"): if ( - decode_path_only != () and - pp.decode_path[:len(decode_path_only)] != decode_path_only + decode_path_only != () and + pp.decode_path[:len(decode_path_only)] != decode_path_only ): continue if pp.asn1_type_name == Choice.asn1_type_name: @@ -5352,7 +6282,7 @@ def generic_decoder(): # pragma: no cover pp = _pp(**pp_kwargs) yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -5360,8 +6290,8 @@ def generic_decoder(): # pragma: no cover decode_path_len_decrease=len(decode_path_only), ) for row in pp_console_blob( - pp, - decode_path_len_decrease=len(decode_path_only), + pp, + decode_path_len_decrease=len(decode_path_only), ): yield row else: @@ -5382,7 +6312,7 @@ def main(): # pragma: no cover ) parser.add_argument( "--oids", - help="Python path to dictionary with OIDs", + help="Python paths to dictionary with OIDs, comma separated", ) parser.add_argument( "--schema", @@ -5420,7 +6350,10 @@ def main(): # pragma: no cover args.DERFile.seek(args.skip) der = memoryview(args.DERFile.read()) args.DERFile.close() - oids = obj_by_path(args.oids) if args.oids else {} + oid_maps = ( + [obj_by_path(_path) for _path in (args.oids or "").split(",")] + if args.oids else () + ) if args.schema: schema = obj_by_path(args.schema) from functools import partial @@ -5436,8 +6369,8 @@ def main(): # pragma: no cover obj, tail = schema().decode(der, ctx=ctx) print(pprinter( obj, - oids=oids, - with_colours=True if environ.get("NO_COLOR") is None else False, + oid_maps=oid_maps, + with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, decode_path_only=( () if args.decode_path_only is None else