X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=pyderasn.py;h=5adc5b068d17915f0c543144b66557b9fc355f30;hp=1c5b53f1f617810c1e8c3cb906710a282cf23126;hb=3b5f3e7f4219b79410bd7b3e1a3d41be51f78f0a;hpb=abaaaec8b7a936d9a1f263d4ecc8be38f454cd73 diff --git a/pyderasn.py b/pyderasn.py index 1c5b53f..5adc5b0 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,12 +1,11 @@ #!/usr/bin/env python # coding: utf-8 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures -# Copyright (C) 2017-2019 Sergey Matveev +# Copyright (C) 2017-2020 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. +# published by the Free Software Foundation, version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -14,8 +13,7 @@ # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public -# License along with this program. If not, see -# . +# License along with this program. If not, see . """Python ASN.1 DER/BER codec with abstract structures This library allows you to marshal various structures in ASN.1 DER @@ -23,7 +21,7 @@ format, unmarshal them in BER/CER/DER ones. >>> i = Integer(123) >>> raw = i.encode() - >>> Integer().decode(raw) == i + >>> Integer().decod(raw) == i True There are primitive types, holding single values @@ -67,10 +65,11 @@ ____ Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is the default tag used during coding process. You can override it with -either ``IMPLICIT`` (using ``impl`` keyword argument), or -``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments take -raw binary string, containing that tag. You can **not** set implicit and -explicit tags simultaneously. +either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl`` +class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword +argument or ``expl`` class attribute). Both arguments take raw binary +string, containing that tag. You can **not** set implicit and explicit +tags simultaneously. There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc` functions, allowing you to easily create ``CONTEXT`` @@ -163,21 +162,27 @@ All objects have ``ready`` boolean property, that tells if object is ready to be encoded. If that kind of action is performed on unready object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised. -All objects have ``copy()`` method, that returns their copy, that can be +All objects are friendly to ``copy.copy()`` and copied objects can be safely mutated. +Also all objects can be safely ``pickle``-d, but pay attention that +pickling among different PyDERASN versions is prohibited. + .. _decoding: Decoding -------- -Decoding is performed using ``decode()`` method. ``offset`` optional -argument could be used to set initial object's offset in the binary -data, for convenience. It returns decoded object and remaining -unmarshalled data (tail). Internally all work is done on +Decoding is performed using :py:meth:`pyderasn.Obj.decode` method. +``offset`` optional argument could be used to set initial object's +offset in the binary data, for convenience. It returns decoded object +and remaining unmarshalled data (tail). Internally all work is done on ``memoryview(data)``, and you can leave returning tail as a memoryview, by specifying ``leavemm=True`` argument. +Also note convenient :py:meth:`pyderasn.Obj.decod` method, that +immediately checks and raises if there is non-empty tail. + When object is decoded, ``decoded`` property is true and you can safely use following properties: @@ -207,9 +212,9 @@ When error occurs, :py:exc:`pyderasn.DecodeError` is raised. Context _______ -You can specify so called context keyword argument during ``decode()`` -invocation. It is dictionary containing various options governing -decoding process. +You can specify so called context keyword argument during +:py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing +various options governing decoding process. Currently available context options: @@ -348,6 +353,8 @@ DEFINED BY some previously met ObjectIdentifier. This library provides ability to specify mapping between some OID and field that must be decoded with specific specification. +.. _defines: + defines kwarg _____________ @@ -421,15 +428,15 @@ value must be sequence of following tuples:: where ``decode_path`` is a tuple holding so-called decode path to the exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply -``defines``, holding exactly the same value as accepted in its keyword -argument. +``defines``, holding exactly the same value as accepted in its +:ref:`keyword argument `. For example, again for CMS, you want to automatically decode ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse`` structures it may hold. Also, automatically decode ``controlSequence`` of ``PKIResponse``:: - content_info, tail = ContentInfo().decode(data, defines_by_path=( + content_info = ContentInfo().decod(data, ctx={"defines_by_path": ( ( ("contentType",), ((("content",), {id_signedData: SignedData()}),), @@ -464,7 +471,7 @@ of ``PKIResponse``:: id_cmc_transactionId: TransactionId(), })), ), - )) + )}) Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``. First function is useful for path construction when some automatic @@ -517,6 +524,11 @@ lengths will be invalid in that case. This option should be used only for skipping some decode errors, just to see the decoded structure somehow. +Base Obj +-------- +.. autoclass:: pyderasn.Obj + :members: + Primitive types --------------- @@ -565,6 +577,7 @@ _____________ PrintableString _______________ .. autoclass:: pyderasn.PrintableString + :members: __init__ UTCTime _______ @@ -626,10 +639,10 @@ Various .. autofunction:: pyderasn.tag_decode .. autofunction:: pyderasn.tag_ctxp .. autofunction:: pyderasn.tag_ctxc -.. autoclass:: pyderasn.Obj .. autoclass:: pyderasn.DecodeError :members: __init__ .. autoclass:: pyderasn.NotEnoughData +.. autoclass:: pyderasn.ExceedingData .. autoclass:: pyderasn.LenIndefForm .. autoclass:: pyderasn.TagMismatch .. autoclass:: pyderasn.InvalidLength @@ -650,6 +663,7 @@ from math import ceil from os import environ from string import ascii_letters from string import digits +from unicodedata import category as unicat from six import add_metaclass from six import binary_type @@ -670,9 +684,10 @@ from six.moves import xrange as six_xrange try: from termcolor import colored except ImportError: # pragma: no cover - def colored(what, *args): + def colored(what, *args, **kwargs): return what +__version__ = "6.0" __all__ = ( "Any", @@ -684,6 +699,7 @@ __all__ = ( "DecodeError", "DecodePathDefBy", "Enumerated", + "ExceedingData", "GeneralizedTime", "GeneralString", "GraphicString", @@ -794,6 +810,18 @@ class NotEnoughData(DecodeError): pass +class ExceedingData(ASN1Error): + def __init__(self, nbytes): + super(ExceedingData, self).__init__() + self.nbytes = nbytes + + def __str__(self): + return "%d trailing bytes" % self.nbytes + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + class LenIndefForm(DecodeError): pass @@ -1019,9 +1047,9 @@ def len_decode(data): ######################################################################## class AutoAddSlots(type): - def __new__(mcs, name, bases, _dict): + def __new__(cls, name, bases, _dict): _dict["__slots__"] = _dict.get("__slots__", ()) - return type.__new__(mcs, name, bases, _dict) + return type.__new__(cls, name, bases, _dict) @add_metaclass(AutoAddSlots) @@ -1088,17 +1116,36 @@ class Obj(object): """ return (self.llen + self.vlen) > 0 - def copy(self): # pragma: no cover - """Make a copy of object, safe to be mutated + def __getstate__(self): # pragma: no cover + """Used for making safe to be mutable pickleable copies """ raise NotImplementedError() + def __setstate__(self, state): + if state.version != __version__: + raise ValueError("data is pickled by different PyDERASN version") + self.tag = self.tag_default + self._value = None + self._expl = None + self.default = None + self.optional = False + self.offset = 0 + self.llen = 0 + self.vlen = 0 + self.expl_lenindef = False + self.lenindef = False + self.ber_encoded = False + @property def tlen(self): + """See :ref:`decoding` + """ return len(self.tag) @property def tlvlen(self): + """See :ref:`decoding` + """ return self.tlen + self.llen + self.vlen def __str__(self): # pragma: no cover @@ -1123,11 +1170,20 @@ class Obj(object): raise NotImplementedError() def encode(self): + """Encode the structure + + :returns: DER representation + """ raw = self._encode() if self._expl is None: return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def hexencode(self): + """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` + """ + return hexenc(self.encode()) + def decode( self, data, @@ -1148,8 +1204,11 @@ class Obj(object): :param tag_only: decode only the tag, without length and contents (used only in Choice and Set structures, trying to determine if tag satisfies the scheme) - :param _ctx_immutable: do we need to copy ``ctx`` before using it + :param _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` + before using it? :returns: (Obj, remaining data) + + .. seealso:: :ref:`decoding` """ if ctx is None: ctx = {} @@ -1165,7 +1224,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: - return + return None obj, tail = result else: try: @@ -1203,7 +1262,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: # pragma: no cover - return + return None obj, tail = result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: @@ -1238,7 +1297,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: # pragma: no cover - return + return None obj, tail = result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( @@ -1249,48 +1308,96 @@ class Obj(object): ) return obj, (tail if leavemm else tail.tobytes()) + def decod(self, data, offset=0, decode_path=(), ctx=None): + """Decode the data, check that tail is empty + + :raises ExceedingData: if tail is not empty + + This is just a wrapper over :py:meth:`pyderasn.Obj.decode` + (decode without tail) that also checks that there is no + trailing data left. + """ + obj, tail = self.decode( + data, + offset=offset, + decode_path=decode_path, + ctx=ctx, + leavemm=True, + ) + if len(tail) > 0: + raise ExceedingData(len(tail)) + return obj + + def hexdecode(self, data, *args, **kwargs): + """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data + """ + return self.decode(hexdec(data), *args, **kwargs) + + def hexdecod(self, data, *args, **kwargs): + """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data + """ + return self.decod(hexdec(data), *args, **kwargs) + @property def expled(self): + """See :ref:`decoding` + """ return self._expl is not None @property def expl_tag(self): + """See :ref:`decoding` + """ return self._expl @property def expl_tlen(self): + """See :ref:`decoding` + """ return len(self._expl) @property def expl_llen(self): + """See :ref:`decoding` + """ if self.expl_lenindef: return 1 return len(len_encode(self.tlvlen)) @property def expl_offset(self): + """See :ref:`decoding` + """ return self.offset - self.expl_tlen - self.expl_llen @property def expl_vlen(self): + """See :ref:`decoding` + """ return self.tlvlen @property def expl_tlvlen(self): + """See :ref:`decoding` + """ return self.expl_tlen + self.expl_llen + self.expl_vlen @property def fulloffset(self): + """See :ref:`decoding` + """ return self.expl_offset if self.expled else self.offset @property def fulllen(self): + """See :ref:`decoding` + """ return self.expl_tlvlen if self.expled else self.tlvlen def pps_lenindef(self, decode_path): if self.lenindef and not ( - getattr(self, "defined", None) is not None and - self.defined[1].lenindef + getattr(self, "defined", None) is not None and + self.defined[1].lenindef ): yield _pp( asn1_type_name="EOC", @@ -1435,7 +1542,7 @@ def colonize_hex(hexed): def pp_console_row( pp, - oids=None, + oid_maps=(), with_offsets=False, with_blob=True, with_colours=False, @@ -1470,14 +1577,18 @@ def pp_console_row( if isinstance(ent, DecodePathDefBy): cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",))) value = str(ent.defined_by) + oid_name = None if ( - oids is not None and + len(oid_maps) > 0 and ent.defined_by.asn1_type_name == - ObjectIdentifier.asn1_type_name and - value in oids + ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("%s:" % oids[value], "green", with_colours)) - else: + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("%s:" % oid_name, "green", with_colours)) + break + if oid_name is None: cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",))) else: cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",))) @@ -1498,11 +1609,14 @@ def pp_console_row( value = pp.value cols.append(_colourize(value, "white", with_colours, ("reverse",))) if ( - oids is not None and - pp.asn1_type_name == ObjectIdentifier.asn1_type_name and - value in oids + len(oid_maps) > 0 and + pp.asn1_type_name == ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("(%s)" % oids[value], "green", with_colours)) + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("(%s)" % oid_name, "green", with_colours)) + break if pp.asn1_type_name == Integer.asn1_type_name: hex_repr = hex(int(pp.obj._value))[2:].upper() if len(hex_repr) % 2 != 0: @@ -1546,7 +1660,7 @@ def pp_console_blob(pp, decode_path_len_decrease=0): def pprint( obj, - oids=None, + oid_maps=(), big_blobs=False, with_colours=False, with_decode_path=False, @@ -1555,8 +1669,8 @@ def pprint( """Pretty print object :param Obj obj: object you want to pretty print - :param oids: ``OID <-> humand readable string`` dictionary. When OID - from it is met, then its humand readable form is printed + :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary. + Its human readable form is printed when OID is met :param big_blobs: if large binary objects are met (like OctetString values), do we need to print them too, on separate lines @@ -1569,16 +1683,16 @@ def pprint( for pp in pps: if hasattr(pp, "_fields"): if ( - decode_path_only != () and - tuple( - str(p) for p in pp.decode_path[:len(decode_path_only)] - ) != decode_path_only + decode_path_only != () and + tuple( + str(p) for p in pp.decode_path[:len(decode_path_only)] + ) != decode_path_only ): continue if big_blobs: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -1586,14 +1700,14 @@ def pprint( decode_path_len_decrease=len(decode_path_only), ) for row in pp_console_blob( - pp, - decode_path_len_decrease=len(decode_path_only), + pp, + decode_path_len_decrease=len(decode_path_only), ): yield row else: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=True, with_colours=with_colours, @@ -1610,6 +1724,22 @@ def pprint( # ASN.1 primitive types ######################################################################## +BooleanState = namedtuple("BooleanState", ( + "version", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +)) + + class Boolean(Obj): """``BOOLEAN`` boolean type @@ -1664,20 +1794,35 @@ class Boolean(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return BooleanState( + __version__, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Boolean, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __nonzero__(self): self._assert_ready() @@ -1739,7 +1884,7 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1820,6 +1965,25 @@ class Boolean(Obj): yield pp +IntegerState = namedtuple("IntegerState", ( + "version", + "specs", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +)) + + class Integer(Obj): """``INTEGER`` integer type @@ -1921,22 +2085,41 @@ class Integer(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__(_specs=self.specs) - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return IntegerState( + __version__, + self.specs, + self._value, + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Integer, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __int__(self): self._assert_ready() @@ -1969,6 +2152,7 @@ class Integer(Obj): for name, value in iteritems(self.specs): if value == self._value: return name + return None def __call__( self, @@ -2048,7 +2232,7 @@ class Integer(Obj): offset=offset, ) if tag_only: - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2151,6 +2335,23 @@ class Integer(Obj): SET01 = frozenset(("0", "1")) +BitStringState = namedtuple("BitStringState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "tag_constructed", + "defined", +)) class BitString(Obj): @@ -2268,7 +2469,7 @@ class BitString(Obj): if not frozenset(value) <= SET01: raise ValueError("B's coding contains unacceptable chars") return self._bits2octets(value) - elif value.endswith("'H"): + if value.endswith("'H"): value = value[1:-2] return ( len(value) * 4, @@ -2276,8 +2477,7 @@ class BitString(Obj): ) if isinstance(value, binary_type): return (len(value) * 8, value) - else: - raise InvalidValueType((self.__class__, string_types, binary_type)) + raise InvalidValueType((self.__class__, string_types, binary_type)) if isinstance(value, tuple): if ( len(value) == 2 and @@ -2306,23 +2506,41 @@ class BitString(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__(_specs=self.specs) - value = self._value - if value is not None: - value = (value[0], value[1]) - obj._value = value - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return BitStringState( + __version__, + self.specs, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.tag_constructed, + self.defined, + ) + + def __setstate__(self, state): + super(BitString, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.tag_constructed = state.tag_constructed + self.defined = state.defined def __iter__(self): self._assert_ready() @@ -2396,7 +2614,7 @@ class BitString(Obj): octets, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): + def _decode_chunk(self, lv, offset, decode_path): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2466,8 +2684,8 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return - return self._decode_chunk(lv, offset, decode_path, ctx) + return None + return self._decode_chunk(lv, offset, decode_path) if t == self.tag_constructed: if not ctx.get("bered", False): raise DecodeError( @@ -2477,7 +2695,7 @@ class BitString(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None lenindef = False try: l, llen, v = len_decode(lv) @@ -2627,6 +2845,26 @@ class BitString(Obj): yield pp +OctetStringState = namedtuple("OctetStringState", ( + "version", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "tag_constructed", + "defined", +)) + + class OctetString(Obj): """``OCTET STRING`` binary string type @@ -2675,13 +2913,7 @@ class OctetString(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(OctetString, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(OctetString, self).__init__(impl, expl, default, optional, _decoded) self._value = value self._bound_min, self._bound_max = getattr( self, @@ -2722,22 +2954,43 @@ class OctetString(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return OctetStringState( + __version__, + self._value, + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.tag_constructed, + self.defined, + ) + + def __setstate__(self, state): + super(OctetString, self).__setstate__(state) + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.tag_constructed = state.tag_constructed + self.defined = state.defined def __bytes__(self): self._assert_ready() @@ -2786,7 +3039,7 @@ class OctetString(Obj): self._value, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): + def _decode_chunk(self, lv, offset, decode_path): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2842,8 +3095,8 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return - return self._decode_chunk(lv, offset, decode_path, ctx) + return None + return self._decode_chunk(lv, offset, decode_path) if t == self.tag_constructed: if not ctx.get("bered", False): raise DecodeError( @@ -2853,7 +3106,7 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False try: l, llen, v = len_decode(lv) @@ -2982,6 +3235,21 @@ class OctetString(Obj): yield pp +NullState = namedtuple("NullState", ( + "version", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +)) + + class Null(Obj): """``NULL`` null object @@ -3014,19 +3282,33 @@ class Null(Obj): def ready(self): return True - def copy(self): - obj = self.__class__() - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return NullState( + __version__, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Null, self).__setstate__(state) + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if not issubclass(their.__class__, Null): @@ -3069,7 +3351,7 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3121,6 +3403,23 @@ class Null(Obj): yield pp +ObjectIdentifierState = namedtuple("ObjectIdentifierState", ( + "version", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "defines", +)) + + class ObjectIdentifier(Obj): """``OBJECT IDENTIFIER`` OID type @@ -3171,13 +3470,7 @@ class ObjectIdentifier(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(ObjectIdentifier, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded) self._value = value if value is not None: self._value = self._value_sanitize(value) @@ -3225,21 +3518,37 @@ class ObjectIdentifier(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.defines = self.defines - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return ObjectIdentifierState( + __version__, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.defines, + ) + + def __setstate__(self, state): + super(ObjectIdentifier, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.defines = state.defines def __iter__(self): self._assert_ready() @@ -3324,7 +3633,7 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3448,13 +3757,7 @@ class Enumerated(Integer): bounds=None, # dummy argument, workability for Integer.decode ): super(Enumerated, self).__init__( - value=value, - impl=impl, - expl=expl, - default=default, - optional=optional, - _specs=_specs, - _decoded=_decoded, + value, bounds, impl, expl, default, optional, _specs, _decoded, ) if len(self.specs) == 0: raise ValueError("schema must be specified") @@ -3479,23 +3782,6 @@ class Enumerated(Integer): raise InvalidValueType((self.__class__, int, str)) return value - def copy(self): - obj = self.__class__(_specs=self.specs) - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj - def __call__( self, value=None, @@ -3515,6 +3801,12 @@ class Enumerated(Integer): ) +def escape_control_unicode(c): + if unicat(c).startswith("C"): + c = repr(c).lstrip("u").strip("'") + return c + + class CommonString(OctetString): """Common class for all strings @@ -3577,7 +3869,7 @@ class CommonString(OctetString): * - :py:class:`pyderasn.BMPString` - utf-16-be """ - __slots__ = ("encoding",) + __slots__ = () def _value_sanitize(self, value): value_raw = None @@ -3633,7 +3925,10 @@ class CommonString(OctetString): def pps(self, decode_path=(), no_unicode=False): value = None if self.ready: - value = hexenc(bytes(self)) if no_unicode else self.__unicode__() + value = ( + hexenc(bytes(self)) if no_unicode else + "".join(escape_control_unicode(c) for c in self.__unicode__()) + ) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, @@ -3697,6 +3992,12 @@ class NumericString(AllowableCharsMixin, CommonString): return value +PrintableStringState = namedtuple( + "PrintableStringState", + OctetStringState._fields + ("allowable_chars",), +) + + class PrintableString(AllowableCharsMixin, CommonString): """Printable string @@ -3704,6 +4005,10 @@ class PrintableString(AllowableCharsMixin, CommonString): >>> PrintableString().allowable_chars frozenset([' ', "'", ..., 'z']) + >>> obj = PrintableString("foo*bar", allow_asterisk=True) + PrintableString PrintableString foo*bar + >>> obj.allow_asterisk, obj.allow_ampersand + (True, False) """ __slots__ = () tag_default = tag_encode(19) @@ -3712,6 +4017,44 @@ class PrintableString(AllowableCharsMixin, CommonString): _allowable_chars = frozenset( (ascii_letters + digits + " '()+,-./:=?").encode("ascii") ) + _asterisk = frozenset("*".encode("ascii")) + _ampersand = frozenset("&".encode("ascii")) + + def __init__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + allow_asterisk=False, + allow_ampersand=False, + ): + """ + :param allow_asterisk: allow asterisk character + :param allow_ampersand: allow ampersand character + """ + if allow_asterisk: + self._allowable_chars |= self._asterisk + if allow_ampersand: + self._allowable_chars |= self._ampersand + super(PrintableString, self).__init__( + value, bounds, impl, expl, default, optional, _decoded, + ) + + @property + def allow_asterisk(self): + """Is asterisk character allowed? + """ + return self._asterisk <= self._allowable_chars + + @property + def allow_ampersand(self): + """Is ampersand character allowed? + """ + return self._ampersand <= self._allowable_chars def _value_sanitize(self, value): value = super(PrintableString, self)._value_sanitize(value) @@ -3719,6 +4062,39 @@ class PrintableString(AllowableCharsMixin, CommonString): raise DecodeError("non-printable value") return value + def __getstate__(self): + return PrintableStringState( + *super(PrintableString, self).__getstate__(), + **{"allowable_chars": self._allowable_chars} + ) + + def __setstate__(self, state): + super(PrintableString, self).__setstate__(state) + self._allowable_chars = state.allowable_chars + + def __call__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + bounds=( + (self._bound_min, self._bound_max) + if bounds is None else bounds + ), + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + allow_asterisk=self.allow_asterisk, + allow_ampersand=self.allow_ampersand, + ) + class TeletexString(CommonString): __slots__ = () @@ -3793,11 +4169,7 @@ class UTCTime(CommonString): :param bool optional: is object ``OPTIONAL`` in sequence """ super(UTCTime, self).__init__( - impl=impl, - expl=expl, - default=default, - optional=optional, - _decoded=_decoded, + None, None, impl, expl, default, optional, _decoded, ) self._value = value if value is not None: @@ -3832,7 +4204,7 @@ class UTCTime(CommonString): try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid UTCTime encoding") + raise DecodeError("invalid UTCTime encoding: %r" % err) try: self._strptime(value_decoded) except (TypeError, ValueError) as err: @@ -3928,6 +4300,10 @@ class GeneralizedTime(UTCTime): Only microsecond fractions are supported. :py:exc:`pyderasn.DecodeError` will be raised during decoding of higher precision values. + + .. warning:: + + Zero year is unsupported. """ __slots__ = () tag_default = tag_encode(24) @@ -3936,7 +4312,7 @@ class GeneralizedTime(UTCTime): def _strptime(self, value): l = len(value) if l == LEN_YYYYMMDDHHMMSSZ: - # datetime.strptime's format: %y%m%d%H%M%SZ + # datetime.strptime's format: %Y%m%d%H%M%SZ if value[-1] != "Z": raise ValueError("non UTC timezone") return datetime( @@ -3977,7 +4353,7 @@ class GeneralizedTime(UTCTime): try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid GeneralizedTime encoding") + raise DecodeError("invalid GeneralizedTime encoding: %r" % err) try: self._strptime(value_decoded) except (TypeError, ValueError) as err: @@ -4039,6 +4415,23 @@ class BMPString(CommonString): asn1_type_name = "BMPString" +ChoiceState = namedtuple("ChoiceState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +)) + + class Choice(Obj): """``CHOICE`` special type @@ -4112,7 +4505,7 @@ class Choice(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) def _value_sanitize(self, value): if isinstance(value, tuple) and len(value) == 2: @@ -4138,21 +4531,36 @@ class Choice(Obj): self._value[1].bered ) - def copy(self): - obj = self.__class__(schema=self.specs) - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - value = self._value - if value is not None: - obj._value = (value[0], value[1].copy()) - return obj + def __getstate__(self): + return ChoiceState( + __version__, + self.specs, + copy(self._value), + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Choice, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, tuple) and len(their) == 2: @@ -4242,7 +4650,7 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None value, tail = spec.decode( tlv, offset=offset, @@ -4296,9 +4704,9 @@ class PrimitiveTypes(Choice): It could be useful for general decoding of some unspecified values: - >>> PrimitiveTypes().decode(hexdec("0403666f6f"))[0].value + >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value OCTET STRING 3 bytes 666f6f - >>> PrimitiveTypes().decode(hexdec("0203123456"))[0].value + >>> PrimitiveTypes().decod(hexdec("0203123456")).value INTEGER 1193046 """ __slots__ = () @@ -4326,6 +4734,22 @@ class PrimitiveTypes(Choice): )) +AnyState = namedtuple("AnyState", ( + "version", + "value", + "tag", + "expl", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "defined", +)) + + class Any(Obj): """``ANY`` special type @@ -4380,19 +4804,35 @@ class Any(Obj): return False return self.defined[1].bered - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.tag = self.tag - obj._expl = self._expl - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return AnyState( + __version__, + self._value, + self.tag, + self._expl, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.defined, + ) + + def __setstate__(self, state): + super(Any, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.defined = state.defined def __eq__(self, their): if isinstance(their, binary_type): @@ -4468,7 +4908,7 @@ class Any(Obj): _decoded=(offset, 0, tlvlen), ) obj.lenindef = True - obj.tag = t + obj.tag = t.tobytes() return obj, v[EOC_LEN:] except DecodeError as err: raise err.__class__( @@ -4492,7 +4932,7 @@ class Any(Obj): optional=self.optional, _decoded=(offset, 0, tlvlen), ) - obj.tag = t + obj.tag = t.tobytes() return obj, tail def __repr__(self): @@ -4572,6 +5012,23 @@ def abs_decode_path(decode_path, rel_path): return decode_path + rel_path +SequenceState = namedtuple("SequenceState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +)) + + class Sequence(Obj): """``SEQUENCE`` structure type @@ -4701,7 +5158,7 @@ class Sequence(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) @property def ready(self): @@ -4711,9 +5168,8 @@ class Sequence(Obj): if spec.optional: continue return False - else: - if not value.ready: - return False + if not value.ready: + return False return True @property @@ -4722,20 +5178,37 @@ class Sequence(Obj): return True return any(value.bered for value in itervalues(self._value)) - def copy(self): - obj = self.__class__(schema=self.specs) - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - obj._value = {k: v.copy() for k, v in iteritems(self._value)} - return obj + def __getstate__(self): + return SequenceState( + __version__, + self.specs, + {k: copy(v) for k, v in iteritems(self._value)}, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Sequence, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if not isinstance(their, self.__class__): @@ -4825,7 +5298,7 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -4877,8 +5350,8 @@ class Sequence(Obj): ctx=ctx, _ctx_immutable=False, ) - except TagMismatch: - if spec.optional: + except TagMismatch as err: + if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise @@ -5075,7 +5548,7 @@ class Set(Sequence): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5202,6 +5675,25 @@ class Set(Sequence): return obj, tail +SequenceOfState = namedtuple("SequenceOfState", ( + "version", + "spec", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +)) + + class SequenceOf(Obj): """``SEQUENCE OF`` sequence type @@ -5247,13 +5739,7 @@ class SequenceOf(Obj): optional=False, _decoded=(0, 0, 0), ): - super(SequenceOf, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded) if schema is None: schema = getattr(self, "schema", None) if schema is None: @@ -5277,7 +5763,7 @@ class SequenceOf(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) def _value_sanitize(self, value): if issubclass(value.__class__, SequenceOf): @@ -5303,22 +5789,41 @@ class SequenceOf(Obj): return True return any(v.bered for v in self._value) - def copy(self): - obj = self.__class__(schema=self.spec) - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - obj._value = [v.copy() for v in self._value] - return obj + def __getstate__(self): + return SequenceOfState( + __version__, + self.spec, + [copy(v) for v in self._value], + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(SequenceOf, self).__setstate__(state) + self.spec = state.spec + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, self.__class__): @@ -5408,7 +5913,7 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5604,7 +6109,7 @@ def generic_decoder(): # pragma: no cover def pprint_any( obj, - oids=None, + oid_maps=(), with_colours=False, with_decode_path=False, decode_path_only=(), @@ -5613,8 +6118,8 @@ def generic_decoder(): # pragma: no cover for pp in pps: if hasattr(pp, "_fields"): if ( - decode_path_only != () and - pp.decode_path[:len(decode_path_only)] != decode_path_only + decode_path_only != () and + pp.decode_path[:len(decode_path_only)] != decode_path_only ): continue if pp.asn1_type_name == Choice.asn1_type_name: @@ -5624,7 +6129,7 @@ def generic_decoder(): # pragma: no cover pp = _pp(**pp_kwargs) yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -5632,8 +6137,8 @@ def generic_decoder(): # pragma: no cover decode_path_len_decrease=len(decode_path_only), ) for row in pp_console_blob( - pp, - decode_path_len_decrease=len(decode_path_only), + pp, + decode_path_len_decrease=len(decode_path_only), ): yield row else: @@ -5654,7 +6159,7 @@ def main(): # pragma: no cover ) parser.add_argument( "--oids", - help="Python path to dictionary with OIDs", + help="Python paths to dictionary with OIDs, comma separated", ) parser.add_argument( "--schema", @@ -5692,7 +6197,10 @@ def main(): # pragma: no cover args.DERFile.seek(args.skip) der = memoryview(args.DERFile.read()) args.DERFile.close() - oids = obj_by_path(args.oids) if args.oids else {} + oid_maps = ( + [obj_by_path(_path) for _path in (args.oids or "").split(",")] + if args.oids else () + ) if args.schema: schema = obj_by_path(args.schema) from functools import partial @@ -5708,8 +6216,8 @@ def main(): # pragma: no cover obj, tail = schema().decode(der, ctx=ctx) print(pprinter( obj, - oids=oids, - with_colours=True if environ.get("NO_COLOR") is None else False, + oid_maps=oid_maps, + with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, decode_path_only=( () if args.decode_path_only is None else