X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=39f2d885062a49f4e59b630e1305ffa21fede80b;hb=26557d24cc3e80445c4ce6792fb00e75e8697ef9;hp=c83b4393037e180c943c6e72decb9509b188a097;hpb=e7395b3353a1e14fb62ded263c102a707172e6cd;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index c83b439..39f2d88 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # coding: utf-8 +# cython: language_level=3 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures -# Copyright (C) 2017-2019 Sergey Matveev +# Copyright (C) 2017-2020 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. +# published by the Free Software Foundation, version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -14,8 +14,7 @@ # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public -# License along with this program. If not, see -# . +# License along with this program. If not, see . """Python ASN.1 DER/BER codec with abstract structures This library allows you to marshal various structures in ASN.1 DER @@ -23,7 +22,7 @@ format, unmarshal them in BER/CER/DER ones. >>> i = Integer(123) >>> raw = i.encode() - >>> Integer().decode(raw) == i + >>> Integer().decod(raw) == i True There are primitive types, holding single values @@ -67,10 +66,11 @@ ____ Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is the default tag used during coding process. You can override it with -either ``IMPLICIT`` (using ``impl`` keyword argument), or -``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments take -raw binary string, containing that tag. You can **not** set implicit and -explicit tags simultaneously. +either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl`` +class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword +argument or ``expl`` class attribute). Both arguments take raw binary +string, containing that tag. You can **not** set implicit and explicit +tags simultaneously. There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc` functions, allowing you to easily create ``CONTEXT`` @@ -163,21 +163,27 @@ All objects have ``ready`` boolean property, that tells if object is ready to be encoded. If that kind of action is performed on unready object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised. -All objects have ``copy()`` method, that returns their copy, that can be +All objects are friendly to ``copy.copy()`` and copied objects can be safely mutated. +Also all objects can be safely ``pickle``-d, but pay attention that +pickling among different PyDERASN versions is prohibited. + .. _decoding: Decoding -------- -Decoding is performed using ``decode()`` method. ``offset`` optional -argument could be used to set initial object's offset in the binary -data, for convenience. It returns decoded object and remaining -unmarshalled data (tail). Internally all work is done on +Decoding is performed using :py:meth:`pyderasn.Obj.decode` method. +``offset`` optional argument could be used to set initial object's +offset in the binary data, for convenience. It returns decoded object +and remaining unmarshalled data (tail). Internally all work is done on ``memoryview(data)``, and you can leave returning tail as a memoryview, by specifying ``leavemm=True`` argument. +Also note convenient :py:meth:`pyderasn.Obj.decod` method, that +immediately checks and raises if there is non-empty tail. + When object is decoded, ``decoded`` property is true and you can safely use following properties: @@ -207,9 +213,9 @@ When error occurs, :py:exc:`pyderasn.DecodeError` is raised. Context _______ -You can specify so called context keyword argument during ``decode()`` -invocation. It is dictionary containing various options governing -decoding process. +You can specify so called context keyword argument during +:py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing +various options governing decoding process. Currently available context options: @@ -239,6 +245,106 @@ all object ``repr``. But it is easy to write custom formatters. >>> print(pprint(obj)) 0 [1,1, 2] INTEGER -12345 +.. _pprint_example: + +Example certificate:: + + >>> print(pprint(crt)) + 0 [1,3,1604] Certificate SEQUENCE + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:02:45:53 + [...] + 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE + 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL + . . . 05:00 + 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits + . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + + Trailing data: 0a + +Let's parse that output, human:: + + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 1 2 3 4 5 6 7 8 9 10 11 + +:: + + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 9 10 + +:: + + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 8 9 10 + +:: + + 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes + ^ ^ ^ ^ ^ + 12 13 14 9 10 + +:0: + Offset of the object, where its DER/BER encoding begins. + Pay attention that it does **not** include explicit tag. +:1: + If explicit tag exists, then this is its length (tag + encoded length). +:2: + Length of object's tag. For example CHOICE does not have its own tag, + so it is zero. +:3: + Length of encoded length. +:4: + Length of encoded value. +:5: + Visual indentation to show the depth of object in the hierarchy. +:6: + Object's name inside SEQUENCE/CHOICE. +:7: + If either IMPLICIT or EXPLICIT tag is set, then it will be shown + here. "IMPLICIT" is omitted. +:8: + Object's class name, if set. Omitted if it is just an ordinary simple + value (like with ``algorithm`` in example above). +:9: + Object's ASN.1 type. +:10: + Object's value, if set. Can consist of multiple words (like OCTET/BIT + STRINGs above). We see ``v3`` value in Version, because it is named. + ``rdnSequence`` is the choice of CHOICE type. +:11: + Possible other flags like OPTIONAL and DEFAULT, if value equals to the + default one, specified in the schema. +:12: + Shows does object contains any kind of BER encoded data (possibly + Sequence holding BER-encoded underlying value). +:13: + Only applicable to BER encoded data. Indefinite length encoding mark. +:14: + Only applicable to BER encoded data. If object has BER-specific + encoding, then ``BER`` will be shown. It does not depend on indefinite + length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING`` + (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` + could be BERed. + + .. _definedby: DEFINED BY @@ -249,6 +355,8 @@ DEFINED BY some previously met ObjectIdentifier. This library provides ability to specify mapping between some OID and field that must be decoded with specific specification. +.. _defines: + defines kwarg _____________ @@ -322,15 +430,15 @@ value must be sequence of following tuples:: where ``decode_path`` is a tuple holding so-called decode path to the exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply -``defines``, holding exactly the same value as accepted in its keyword -argument. +``defines``, holding exactly the same value as accepted in its +:ref:`keyword argument `. For example, again for CMS, you want to automatically decode ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse`` structures it may hold. Also, automatically decode ``controlSequence`` of ``PKIResponse``:: - content_info, tail = ContentInfo().decode(data, defines_by_path=( + content_info = ContentInfo().decod(data, ctx={"defines_by_path": ( ( ("contentType",), ((("content",), {id_signedData: SignedData()}),), @@ -365,7 +473,7 @@ of ``PKIResponse``:: id_cmc_transactionId: TransactionId(), })), ), - )) + )}) Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``. First function is useful for path construction when some automatic @@ -384,8 +492,8 @@ constructed primitive types should be parsed successfully. * If object is encoded in BER form (not the DER one), then ``ber_encoded`` attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET - STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF`` - can contain it. + STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``, + ``UTCTime``, ``GeneralizedTime`` can contain it. * If object has an indefinite length encoding, then its ``lenindef`` attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``, ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can @@ -418,6 +526,11 @@ lengths will be invalid in that case. This option should be used only for skipping some decode errors, just to see the decoded structure somehow. +Base Obj +-------- +.. autoclass:: pyderasn.Obj + :members: + Primitive types --------------- @@ -463,6 +576,11 @@ NumericString _____________ .. autoclass:: pyderasn.NumericString +PrintableString +_______________ +.. autoclass:: pyderasn.PrintableString + :members: __init__ + UTCTime _______ .. autoclass:: pyderasn.UTCTime @@ -523,10 +641,10 @@ Various .. autofunction:: pyderasn.tag_decode .. autofunction:: pyderasn.tag_ctxp .. autofunction:: pyderasn.tag_ctxc -.. autoclass:: pyderasn.Obj .. autoclass:: pyderasn.DecodeError :members: __init__ .. autoclass:: pyderasn.NotEnoughData +.. autoclass:: pyderasn.ExceedingData .. autoclass:: pyderasn.LenIndefForm .. autoclass:: pyderasn.TagMismatch .. autoclass:: pyderasn.InvalidLength @@ -543,10 +661,12 @@ from collections import namedtuple from collections import OrderedDict from copy import copy from datetime import datetime +from datetime import timedelta from math import ceil from os import environ from string import ascii_letters from string import digits +from unicodedata import category as unicat from six import add_metaclass from six import binary_type @@ -567,9 +687,10 @@ from six.moves import xrange as six_xrange try: from termcolor import colored except ImportError: # pragma: no cover - def colored(what, *args): + def colored(what, *args, **kwargs): return what +__version__ = "6.1" __all__ = ( "Any", @@ -581,6 +702,7 @@ __all__ = ( "DecodeError", "DecodePathDefBy", "Enumerated", + "ExceedingData", "GeneralizedTime", "GeneralString", "GraphicString", @@ -642,6 +764,7 @@ EOC = b"\x00\x00" EOC_LEN = len(EOC) LENINDEF = b"\x80" # length indefinite mark LENINDEF_PP_CHAR = "I" if PY2 else "∞" +NAMEDTUPLE_KWARGS = {} if PY2 else {"module": __name__} ######################################################################## @@ -691,6 +814,18 @@ class NotEnoughData(DecodeError): pass +class ExceedingData(ASN1Error): + def __init__(self, nbytes): + super(ExceedingData, self).__init__() + self.nbytes = nbytes + + def __str__(self): + return "%d trailing bytes" % self.nbytes + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + class LenIndefForm(DecodeError): pass @@ -916,9 +1051,9 @@ def len_decode(data): ######################################################################## class AutoAddSlots(type): - def __new__(mcs, name, bases, _dict): + def __new__(cls, name, bases, _dict): _dict["__slots__"] = _dict.get("__slots__", ()) - return type.__new__(mcs, name, bases, _dict) + return type.__new__(cls, name, bases, _dict) @add_metaclass(AutoAddSlots) @@ -985,17 +1120,36 @@ class Obj(object): """ return (self.llen + self.vlen) > 0 - def copy(self): # pragma: no cover - """Make a copy of object, safe to be mutated + def __getstate__(self): # pragma: no cover + """Used for making safe to be mutable pickleable copies """ raise NotImplementedError() + def __setstate__(self, state): + if state.version != __version__: + raise ValueError("data is pickled by different PyDERASN version") + self.tag = self.tag_default + self._value = None + self._expl = None + self.default = None + self.optional = False + self.offset = 0 + self.llen = 0 + self.vlen = 0 + self.expl_lenindef = False + self.lenindef = False + self.ber_encoded = False + @property def tlen(self): + """See :ref:`decoding` + """ return len(self.tag) @property def tlvlen(self): + """See :ref:`decoding` + """ return self.tlen + self.llen + self.vlen def __str__(self): # pragma: no cover @@ -1020,11 +1174,20 @@ class Obj(object): raise NotImplementedError() def encode(self): + """Encode the structure + + :returns: DER representation + """ raw = self._encode() if self._expl is None: return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def hexencode(self): + """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` + """ + return hexenc(self.encode()) + def decode( self, data, @@ -1045,8 +1208,11 @@ class Obj(object): :param tag_only: decode only the tag, without length and contents (used only in Choice and Set structures, trying to determine if tag satisfies the scheme) - :param _ctx_immutable: do we need to copy ``ctx`` before using it + :param _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` + before using it? :returns: (Obj, remaining data) + + .. seealso:: :ref:`decoding` """ if ctx is None: ctx = {} @@ -1062,7 +1228,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: - return + return None obj, tail = result else: try: @@ -1100,7 +1266,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: # pragma: no cover - return + return None obj, tail = result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: @@ -1135,7 +1301,7 @@ class Obj(object): tag_only=tag_only, ) if tag_only: # pragma: no cover - return + return None obj, tail = result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( @@ -1146,48 +1312,96 @@ class Obj(object): ) return obj, (tail if leavemm else tail.tobytes()) + def decod(self, data, offset=0, decode_path=(), ctx=None): + """Decode the data, check that tail is empty + + :raises ExceedingData: if tail is not empty + + This is just a wrapper over :py:meth:`pyderasn.Obj.decode` + (decode without tail) that also checks that there is no + trailing data left. + """ + obj, tail = self.decode( + data, + offset=offset, + decode_path=decode_path, + ctx=ctx, + leavemm=True, + ) + if len(tail) > 0: + raise ExceedingData(len(tail)) + return obj + + def hexdecode(self, data, *args, **kwargs): + """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data + """ + return self.decode(hexdec(data), *args, **kwargs) + + def hexdecod(self, data, *args, **kwargs): + """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data + """ + return self.decod(hexdec(data), *args, **kwargs) + @property def expled(self): + """See :ref:`decoding` + """ return self._expl is not None @property def expl_tag(self): + """See :ref:`decoding` + """ return self._expl @property def expl_tlen(self): + """See :ref:`decoding` + """ return len(self._expl) @property def expl_llen(self): + """See :ref:`decoding` + """ if self.expl_lenindef: return 1 return len(len_encode(self.tlvlen)) @property def expl_offset(self): + """See :ref:`decoding` + """ return self.offset - self.expl_tlen - self.expl_llen @property def expl_vlen(self): + """See :ref:`decoding` + """ return self.tlvlen @property def expl_tlvlen(self): + """See :ref:`decoding` + """ return self.expl_tlen + self.expl_llen + self.expl_vlen @property def fulloffset(self): + """See :ref:`decoding` + """ return self.expl_offset if self.expled else self.offset @property def fulllen(self): + """See :ref:`decoding` + """ return self.expl_tlvlen if self.expled else self.tlvlen def pps_lenindef(self, decode_path): if self.lenindef and not ( - getattr(self, "defined", None) is not None and - self.defined[1].lenindef + getattr(self, "defined", None) is not None and + self.defined[1].lenindef ): yield _pp( asn1_type_name="EOC", @@ -1267,7 +1481,7 @@ PP = namedtuple("PP", ( "lenindef", "ber_encoded", "bered", -)) +), **NAMEDTUPLE_KWARGS) def _pp( @@ -1332,7 +1546,7 @@ def colonize_hex(hexed): def pp_console_row( pp, - oids=None, + oid_maps=(), with_offsets=False, with_blob=True, with_colours=False, @@ -1367,14 +1581,18 @@ def pp_console_row( if isinstance(ent, DecodePathDefBy): cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",))) value = str(ent.defined_by) + oid_name = None if ( - oids is not None and + len(oid_maps) > 0 and ent.defined_by.asn1_type_name == - ObjectIdentifier.asn1_type_name and - value in oids + ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("%s:" % oids[value], "green", with_colours)) - else: + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("%s:" % oid_name, "green", with_colours)) + break + if oid_name is None: cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",))) else: cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",))) @@ -1395,11 +1613,14 @@ def pp_console_row( value = pp.value cols.append(_colourize(value, "white", with_colours, ("reverse",))) if ( - oids is not None and - pp.asn1_type_name == ObjectIdentifier.asn1_type_name and - value in oids + len(oid_maps) > 0 and + pp.asn1_type_name == ObjectIdentifier.asn1_type_name ): - cols.append(_colourize("(%s)" % oids[value], "green", with_colours)) + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + cols.append(_colourize("(%s)" % oid_name, "green", with_colours)) + break if pp.asn1_type_name == Integer.asn1_type_name: hex_repr = hex(int(pp.obj._value))[2:].upper() if len(hex_repr) % 2 != 0: @@ -1443,7 +1664,7 @@ def pp_console_blob(pp, decode_path_len_decrease=0): def pprint( obj, - oids=None, + oid_maps=(), big_blobs=False, with_colours=False, with_decode_path=False, @@ -1452,8 +1673,8 @@ def pprint( """Pretty print object :param Obj obj: object you want to pretty print - :param oids: ``OID <-> humand readable string`` dictionary. When OID - from it is met, then its humand readable form is printed + :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary. + Its human readable form is printed when OID is met :param big_blobs: if large binary objects are met (like OctetString values), do we need to print them too, on separate lines @@ -1466,16 +1687,16 @@ def pprint( for pp in pps: if hasattr(pp, "_fields"): if ( - decode_path_only != () and - tuple( - str(p) for p in pp.decode_path[:len(decode_path_only)] - ) != decode_path_only + decode_path_only != () and + tuple( + str(p) for p in pp.decode_path[:len(decode_path_only)] + ) != decode_path_only ): continue if big_blobs: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -1483,14 +1704,14 @@ def pprint( decode_path_len_decrease=len(decode_path_only), ) for row in pp_console_blob( - pp, - decode_path_len_decrease=len(decode_path_only), + pp, + decode_path_len_decrease=len(decode_path_only), ): yield row else: yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=True, with_colours=with_colours, @@ -1507,6 +1728,22 @@ def pprint( # ASN.1 primitive types ######################################################################## +BooleanState = namedtuple("BooleanState", ( + "version", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Boolean(Obj): """``BOOLEAN`` boolean type @@ -1561,20 +1798,35 @@ class Boolean(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return BooleanState( + __version__, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Boolean, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __nonzero__(self): self._assert_ready() @@ -1636,7 +1888,7 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1717,6 +1969,25 @@ class Boolean(Obj): yield pp +IntegerState = namedtuple("IntegerState", ( + "version", + "specs", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Integer(Obj): """``INTEGER`` integer type @@ -1818,22 +2089,41 @@ class Integer(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__(_specs=self.specs) - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return IntegerState( + __version__, + self.specs, + self._value, + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Integer, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __int__(self): self._assert_ready() @@ -1866,6 +2156,7 @@ class Integer(Obj): for name, value in iteritems(self.specs): if value == self._value: return name + return None def __call__( self, @@ -1945,7 +2236,7 @@ class Integer(Obj): offset=offset, ) if tag_only: - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2048,6 +2339,23 @@ class Integer(Obj): SET01 = frozenset(("0", "1")) +BitStringState = namedtuple("BitStringState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "tag_constructed", + "defined", +), **NAMEDTUPLE_KWARGS) class BitString(Obj): @@ -2165,7 +2473,7 @@ class BitString(Obj): if not frozenset(value) <= SET01: raise ValueError("B's coding contains unacceptable chars") return self._bits2octets(value) - elif value.endswith("'H"): + if value.endswith("'H"): value = value[1:-2] return ( len(value) * 4, @@ -2173,8 +2481,7 @@ class BitString(Obj): ) if isinstance(value, binary_type): return (len(value) * 8, value) - else: - raise InvalidValueType((self.__class__, string_types, binary_type)) + raise InvalidValueType((self.__class__, string_types, binary_type)) if isinstance(value, tuple): if ( len(value) == 2 and @@ -2203,23 +2510,41 @@ class BitString(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__(_specs=self.specs) - value = self._value - if value is not None: - value = (value[0], value[1]) - obj._value = value - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return BitStringState( + __version__, + self.specs, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.tag_constructed, + self.defined, + ) + + def __setstate__(self, state): + super(BitString, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.tag_constructed = state.tag_constructed + self.defined = state.defined def __iter__(self): self._assert_ready() @@ -2293,7 +2618,7 @@ class BitString(Obj): octets, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): + def _decode_chunk(self, lv, offset, decode_path): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2363,8 +2688,8 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return - return self._decode_chunk(lv, offset, decode_path, ctx) + return None + return self._decode_chunk(lv, offset, decode_path) if t == self.tag_constructed: if not ctx.get("bered", False): raise DecodeError( @@ -2374,7 +2699,7 @@ class BitString(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None lenindef = False try: l, llen, v = len_decode(lv) @@ -2524,6 +2849,26 @@ class BitString(Obj): yield pp +OctetStringState = namedtuple("OctetStringState", ( + "version", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "tag_constructed", + "defined", +), **NAMEDTUPLE_KWARGS) + + class OctetString(Obj): """``OCTET STRING`` binary string type @@ -2561,6 +2906,7 @@ class OctetString(Obj): default=None, optional=False, _decoded=(0, 0, 0), + ctx=None, ): """ :param value: set the value. Either binary type, or @@ -2572,13 +2918,7 @@ class OctetString(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(OctetString, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(OctetString, self).__init__(impl, expl, default, optional, _decoded) self._value = value self._bound_min, self._bound_max = getattr( self, @@ -2619,22 +2959,43 @@ class OctetString(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return OctetStringState( + __version__, + self._value, + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.tag_constructed, + self.defined, + ) + + def __setstate__(self, state): + super(OctetString, self).__setstate__(state) + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.tag_constructed = state.tag_constructed + self.defined = state.defined def __bytes__(self): self._assert_ready() @@ -2710,6 +3071,7 @@ class OctetString(Obj): default=self.default, optional=self.optional, _decoded=(offset, llen, l), + ctx=ctx, ) except DecodeError as err: raise DecodeError( @@ -2739,7 +3101,7 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return + return None return self._decode_chunk(lv, offset, decode_path, ctx) if t == self.tag_constructed: if not ctx.get("bered", False): @@ -2750,7 +3112,7 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False try: l, llen, v = len_decode(lv) @@ -2818,6 +3180,7 @@ class OctetString(Obj): default=self.default, optional=self.optional, _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ctx=ctx, ) except DecodeError as err: raise DecodeError( @@ -2879,6 +3242,21 @@ class OctetString(Obj): yield pp +NullState = namedtuple("NullState", ( + "version", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Null(Obj): """``NULL`` null object @@ -2911,19 +3289,33 @@ class Null(Obj): def ready(self): return True - def copy(self): - obj = self.__class__() - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return NullState( + __version__, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Null, self).__setstate__(state) + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if not issubclass(their.__class__, Null): @@ -2966,7 +3358,7 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3018,6 +3410,30 @@ class Null(Obj): yield pp +ObjectIdentifierState = namedtuple("ObjectIdentifierState", ( + "version", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "defines", +), **NAMEDTUPLE_KWARGS) + + +def pureint(value): + i = int(value) + if (value[0] in "+- ") or (value[-1] == " "): + raise ValueError("non-pure integer") + return i + + class ObjectIdentifier(Obj): """``OBJECT IDENTIFIER`` OID type @@ -3068,13 +3484,7 @@ class ObjectIdentifier(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(ObjectIdentifier, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded) self._value = value if value is not None: self._value = self._value_sanitize(value) @@ -3101,7 +3511,7 @@ class ObjectIdentifier(Obj): return value._value if isinstance(value, string_types): try: - value = tuple(int(arc) for arc in value.split(".")) + value = tuple(pureint(arc) for arc in value.split(".")) except ValueError: raise InvalidOID("unacceptable arcs values") if isinstance(value, tuple): @@ -3115,6 +3525,8 @@ class ObjectIdentifier(Obj): pass else: raise InvalidOID("unacceptable first arc value") + if not all(arc >= 0 for arc in value): + raise InvalidOID("negative arc value") return value raise InvalidValueType((self.__class__, str, tuple)) @@ -3122,21 +3534,37 @@ class ObjectIdentifier(Obj): def ready(self): return self._value is not None - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.defines = self.defines - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return ObjectIdentifierState( + __version__, + self._value, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.defines, + ) + + def __setstate__(self, state): + super(ObjectIdentifier, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.defines = state.defines def __iter__(self): self._assert_ready() @@ -3221,7 +3649,7 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3345,13 +3773,7 @@ class Enumerated(Integer): bounds=None, # dummy argument, workability for Integer.decode ): super(Enumerated, self).__init__( - value=value, - impl=impl, - expl=expl, - default=default, - optional=optional, - _specs=_specs, - _decoded=_decoded, + value, bounds, impl, expl, default, optional, _specs, _decoded, ) if len(self.specs) == 0: raise ValueError("schema must be specified") @@ -3376,23 +3798,6 @@ class Enumerated(Integer): raise InvalidValueType((self.__class__, int, str)) return value - def copy(self): - obj = self.__class__(_specs=self.specs) - obj._value = self._value - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj - def __call__( self, value=None, @@ -3412,6 +3817,12 @@ class Enumerated(Integer): ) +def escape_control_unicode(c): + if unicat(c)[0] == "C": + c = repr(c).lstrip("u").strip("'") + return c + + class CommonString(OctetString): """Common class for all strings @@ -3474,7 +3885,7 @@ class CommonString(OctetString): * - :py:class:`pyderasn.BMPString` - utf-16-be """ - __slots__ = ("encoding",) + __slots__ = () def _value_sanitize(self, value): value_raw = None @@ -3530,7 +3941,10 @@ class CommonString(OctetString): def pps(self, decode_path=(), no_unicode=False): value = None if self.ready: - value = hexenc(bytes(self)) if no_unicode else self.__unicode__() + value = ( + hexenc(bytes(self)) if no_unicode else + "".join(escape_control_unicode(c) for c in self.__unicode__()) + ) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, @@ -3579,7 +3993,7 @@ class NumericString(AllowableCharsMixin, CommonString): be stored. >>> NumericString().allowable_chars - set(['3', '4', '7', '5', '1', '0', '8', '9', ' ', '6', '2']) + frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ']) """ __slots__ = () tag_default = tag_encode(18) @@ -3594,13 +4008,24 @@ class NumericString(AllowableCharsMixin, CommonString): return value +PrintableStringState = namedtuple( + "PrintableStringState", + OctetStringState._fields + ("allowable_chars",), + **NAMEDTUPLE_KWARGS +) + + class PrintableString(AllowableCharsMixin, CommonString): """Printable string Its value is properly sanitized: see X.680 41.4 table 10. >>> PrintableString().allowable_chars - >>> set([' ', "'", ..., 'z']) + frozenset([' ', "'", ..., 'z']) + >>> obj = PrintableString("foo*bar", allow_asterisk=True) + PrintableString PrintableString foo*bar + >>> obj.allow_asterisk, obj.allow_ampersand + (True, False) """ __slots__ = () tag_default = tag_encode(19) @@ -3609,6 +4034,45 @@ class PrintableString(AllowableCharsMixin, CommonString): _allowable_chars = frozenset( (ascii_letters + digits + " '()+,-./:=?").encode("ascii") ) + _asterisk = frozenset("*".encode("ascii")) + _ampersand = frozenset("&".encode("ascii")) + + def __init__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ctx=None, + allow_asterisk=False, + allow_ampersand=False, + ): + """ + :param allow_asterisk: allow asterisk character + :param allow_ampersand: allow ampersand character + """ + if allow_asterisk: + self._allowable_chars |= self._asterisk + if allow_ampersand: + self._allowable_chars |= self._ampersand + super(PrintableString, self).__init__( + value, bounds, impl, expl, default, optional, _decoded, ctx, + ) + + @property + def allow_asterisk(self): + """Is asterisk character allowed? + """ + return self._asterisk <= self._allowable_chars + + @property + def allow_ampersand(self): + """Is ampersand character allowed? + """ + return self._ampersand <= self._allowable_chars def _value_sanitize(self, value): value = super(PrintableString, self)._value_sanitize(value) @@ -3616,6 +4080,39 @@ class PrintableString(AllowableCharsMixin, CommonString): raise DecodeError("non-printable value") return value + def __getstate__(self): + return PrintableStringState( + *super(PrintableString, self).__getstate__(), + **{"allowable_chars": self._allowable_chars} + ) + + def __setstate__(self, state): + super(PrintableString, self).__setstate__(state) + self._allowable_chars = state.allowable_chars + + def __call__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + bounds=( + (self._bound_min, self._bound_max) + if bounds is None else bounds + ), + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + allow_asterisk=self.allow_asterisk, + allow_ampersand=self.allow_ampersand, + ) + class TeletexString(CommonString): __slots__ = () @@ -3648,7 +4145,26 @@ LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ") LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ") -class UTCTime(CommonString): +def fractions2float(fractions_raw): + pureint(fractions_raw) + return float("0." + fractions_raw) + + +class VisibleString(CommonString): + __slots__ = () + tag_default = tag_encode(26) + encoding = "ascii" + asn1_type_name = "VisibleString" + + +UTCTimeState = namedtuple( + "UTCTimeState", + OctetStringState._fields + ("ber_raw",), + **NAMEDTUPLE_KWARGS +) + + +class UTCTime(VisibleString): """``UTCTime`` datetime type >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123)) @@ -3664,9 +4180,18 @@ class UTCTime(CommonString): .. warning:: - No BER encodings are supported. Only DER. + Pay attention that UTCTime can not hold full year, so all years + having < 50 years are treated as 20xx, 19xx otherwise, according + to X.509 recommendation. + + .. warning:: + + No strict validation of UTC offsets are made, but very crude: + + * minutes are not exceeding 60 + * offset value is not exceeding 14 hours """ - __slots__ = () + __slots__ = ("_ber_raw",) tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" @@ -3680,6 +4205,7 @@ class UTCTime(CommonString): optional=False, _decoded=(0, 0, 0), bounds=None, # dummy argument, workability for OctetString.decode + ctx=None, ): """ :param value: set the value. Either datetime type, or @@ -3690,17 +4216,15 @@ class UTCTime(CommonString): :param bool optional: is object ``OPTIONAL`` in sequence """ super(UTCTime, self).__init__( - impl=impl, - expl=expl, - default=default, - optional=optional, - _decoded=_decoded, + None, None, impl, expl, None, optional, _decoded, ctx, ) self._value = value + self._ber_raw = None if value is not None: - self._value = self._value_sanitize(value) + self._value, self._ber_raw = self._value_sanitize(value, ctx) + self.ber_encoded = self._ber_raw is not None if default is not None: - default = self._value_sanitize(default) + default, _ = self._value_sanitize(default) self.default = self.__class__( value=default, impl=self.tag, @@ -3708,6 +4232,51 @@ class UTCTime(CommonString): ) if self._value is None: self._value = default + optional = True + self.optional = optional + + def _strptime_bered(self, value): + year = pureint(value[:2]) + year += 2000 if year < 50 else 1900 + decoded = datetime( + year, # %Y + pureint(value[2:4]), # %m + pureint(value[4:6]), # %d + pureint(value[6:8]), # %H + pureint(value[8:10]), # %M + ) + value = value[10:] + if len(value) == 0: + raise ValueError("no timezone") + offset = 0 + if value[-1] == "Z": + value = value[:-1] + else: + if len(value) < 5: + raise ValueError("invalid UTC offset") + if value[-5] == "-": + sign = -1 + elif value[-5] == "+": + sign = 1 + else: + raise ValueError("invalid UTC offset") + offset = 60 * pureint(value[-2:]) + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + offset += 3600 * pureint(value[-4:-2]) + if offset > 14 * 3600: + raise ValueError("too big UTC offset") + offset *= sign + value = value[:-5] + if len(value) == 0: + return offset, decoded + if len(value) != 2: + raise ValueError("invalid UTC offset seconds") + seconds = pureint(value) + if seconds >= 60: + raise ValueError("invalid seconds value") + decoded += timedelta(seconds=seconds) + return offset, decoded def _strptime(self, value): # datetime.strptime's format: %y%m%d%H%M%SZ @@ -3715,35 +4284,82 @@ class UTCTime(CommonString): raise ValueError("invalid UTCTime length") if value[-1] != "Z": raise ValueError("non UTC timezone") + year = pureint(value[:2]) + year += 2000 if year < 50 else 1900 return datetime( - 2000 + int(value[:2]), # %y - int(value[2:4]), # %m - int(value[4:6]), # %d - int(value[6:8]), # %H - int(value[8:10]), # %M - int(value[10:12]), # %S + year, # %y + pureint(value[2:4]), # %m + pureint(value[4:6]), # %d + pureint(value[6:8]), # %H + pureint(value[8:10]), # %M + pureint(value[10:12]), # %S ) - def _value_sanitize(self, value): + def _dt_sanitize(self, value): + if value.year < 1950 or value.year > 2049: + raise ValueError("UTCTime can hold only 1950-2049 years") + return value.replace(microsecond=0) + + def _value_sanitize(self, value, ctx=None): if isinstance(value, binary_type): try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid UTCTime encoding") + raise DecodeError("invalid UTCTime encoding: %r" % err) + err = None try: - self._strptime(value_decoded) - except (TypeError, ValueError) as err: - raise DecodeError("invalid UTCTime format: %r" % err) - return value + return self._strptime(value_decoded), None + except (TypeError, ValueError) as _err: + err = _err + if (ctx is not None) and ctx.get("bered", False): + try: + offset, _value = self._strptime_bered(value_decoded) + _value = _value - timedelta(seconds=offset) + return self._dt_sanitize(_value), value_decoded + except (TypeError, ValueError, OverflowError) as _err: + err = _err + raise DecodeError( + "invalid %s format: %r" % (self.asn1_type_name, err), + klass=self.__class__, + ) if isinstance(value, self.__class__): - return value._value + return value._value, None if isinstance(value, datetime): - return value.strftime("%y%m%d%H%M%SZ").encode("ascii") + return self._dt_sanitize(value), None raise InvalidValueType((self.__class__, datetime)) + def _pp_value(self): + if self.ready: + value = self._value.isoformat() + if self.ber_encoded: + value += " (%s)" % self._ber_raw + return value + + def __unicode__(self): + if self.ready: + value = self._value.isoformat() + if self.ber_encoded: + value += " (%s)" % self._ber_raw + return value + return text_type(self._pp_value()) + + def __getstate__(self): + return UTCTimeState( + *super(UTCTime, self).__getstate__(), + **{"ber_raw": self._ber_raw} + ) + + def __setstate__(self, state): + super(UTCTime, self).__setstate__(state) + self._ber_raw = state.ber_raw + + def __bytes__(self): + self._assert_ready() + return self._encode_time() + def __eq__(self, their): if isinstance(their, binary_type): - return self._value == their + return self._encode_time() == their if isinstance(their, datetime): return self.todatetime() == their if not isinstance(their, self.__class__): @@ -3754,25 +4370,16 @@ class UTCTime(CommonString): self._expl == their._expl ) - def todatetime(self): - """Convert to datetime + def _encode_time(self): + return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii") - :returns: datetime + def _encode(self): + self._assert_ready() + value = self._encode_time() + return b"".join((self.tag, len_encode(len(value)), value)) - Pay attention that UTCTime can not hold full year, so all years - having < 50 years are treated as 20xx, 19xx otherwise, according - to X.509 recomendation. - """ - value = self._strptime(self._value.decode("ascii")) - year = value.year % 100 - return datetime( - year=(2000 + year) if year < 50 else (1900 + year), - month=value.month, - day=value.day, - hour=value.hour, - minute=value.minute, - second=value.second, - ) + def todatetime(self): + return self._value def __repr__(self): return pp_console_row(next(self.pps())) @@ -3783,7 +4390,7 @@ class UTCTime(CommonString): asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - value=self.todatetime().isoformat() if self.ready else None, + value=self._pp_value(), optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -3818,31 +4425,105 @@ class GeneralizedTime(UTCTime): .. warning:: - No BER encodings are supported. Only DER. + Only microsecond fractions are supported in DER encoding. + :py:exc:`pyderasn.DecodeError` will be raised during decoding of + higher precision values. .. warning:: - Only microsecond fractions are supported. - :py:exc:`pyderasn.DecodeError` will be raised during decoding of - higher precision values. + BER encoded data can loss information (accuracy) during decoding + because of float transformations. + + .. warning:: + + Local times (without explicit timezone specification) are treated + as UTC one, no transformations are made. + + .. warning:: + + Zero year is unsupported. """ __slots__ = () tag_default = tag_encode(24) asn1_type_name = "GeneralizedTime" + def _dt_sanitize(self, value): + return value + + def _strptime_bered(self, value): + if len(value) < 4 + 3 * 2: + raise ValueError("invalid GeneralizedTime") + decoded = datetime( + pureint(value[:4]), # %Y + pureint(value[4:6]), # %m + pureint(value[6:8]), # %d + pureint(value[8:10]), # %H + ) + value = value[10:] + offset = 0 + if len(value) == 0: + return offset, decoded + if value[-1] == "Z": + value = value[:-1] + else: + for char, sign in (("-", -1), ("+", 1)): + idx = value.rfind(char) + if idx == -1: + continue + offset_raw = value[idx + 1:].replace(":", "") + if len(offset_raw) not in (2, 4): + raise ValueError("invalid UTC offset") + value = value[:idx] + offset = 60 * pureint(offset_raw[2:] or "0") + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + offset += 3600 * pureint(offset_raw[:2]) + if offset > 14 * 3600: + raise ValueError("too big UTC offset") + offset *= sign + break + if len(value) == 0: + return offset, decoded + decimal_signs = ".," + if value[0] in decimal_signs: + return offset, ( + decoded + timedelta(seconds=3600 * fractions2float(value[1:])) + ) + if len(value) < 2: + raise ValueError("stripped minutes") + decoded += timedelta(seconds=60 * pureint(value[:2])) + value = value[2:] + if len(value) == 0: + return offset, decoded + if value[0] in decimal_signs: + return offset, ( + decoded + timedelta(seconds=60 * fractions2float(value[1:])) + ) + if len(value) < 2: + raise ValueError("stripped seconds") + decoded += timedelta(seconds=pureint(value[:2])) + value = value[2:] + if len(value) == 0: + return offset, decoded + if value[0] not in decimal_signs: + raise ValueError("invalid format after seconds") + return offset, ( + decoded + timedelta(microseconds=10**6 * fractions2float(value[1:])) + ) + def _strptime(self, value): l = len(value) if l == LEN_YYYYMMDDHHMMSSZ: - # datetime.strptime's format: %y%m%d%H%M%SZ + # datetime.strptime's format: %Y%m%d%H%M%SZ if value[-1] != "Z": raise ValueError("non UTC timezone") return datetime( - int(value[:4]), # %Y - int(value[4:6]), # %m - int(value[6:8]), # %d - int(value[8:10]), # %H - int(value[10:12]), # %M - int(value[12:14]), # %S + pureint(value[:4]), # %Y + pureint(value[4:6]), # %m + pureint(value[6:8]), # %d + pureint(value[8:10]), # %H + pureint(value[10:12]), # %M + pureint(value[12:14]), # %S ) if l >= LEN_YYYYMMDDHHMMSSDMZ: # datetime.strptime's format: %Y%m%d%H%M%S.%fZ @@ -3856,44 +4537,25 @@ class GeneralizedTime(UTCTime): us_len = len(us) if us_len > 6: raise ValueError("only microsecond fractions are supported") - us = int(us + ("0" * (6 - us_len))) + us = pureint(us + ("0" * (6 - us_len))) decoded = datetime( - int(value[:4]), # %Y - int(value[4:6]), # %m - int(value[6:8]), # %d - int(value[8:10]), # %H - int(value[10:12]), # %M - int(value[12:14]), # %S + pureint(value[:4]), # %Y + pureint(value[4:6]), # %m + pureint(value[6:8]), # %d + pureint(value[8:10]), # %H + pureint(value[10:12]), # %M + pureint(value[12:14]), # %S us, # %f ) return decoded raise ValueError("invalid GeneralizedTime length") - def _value_sanitize(self, value): - if isinstance(value, binary_type): - try: - value_decoded = value.decode("ascii") - except (UnicodeEncodeError, UnicodeDecodeError) as err: - raise DecodeError("invalid GeneralizedTime encoding") - try: - self._strptime(value_decoded) - except (TypeError, ValueError) as err: - raise DecodeError( - "invalid GeneralizedTime format: %r" % err, - klass=self.__class__, - ) - return value - if isinstance(value, self.__class__): - return value._value - if isinstance(value, datetime): - encoded = value.strftime("%Y%m%d%H%M%S") - if value.microsecond > 0: - encoded = encoded + (".%06d" % value.microsecond).rstrip("0") - return (encoded + "Z").encode("ascii") - raise InvalidValueType((self.__class__, datetime)) - - def todatetime(self): - return self._strptime(self._value.decode("ascii")) + def _encode_time(self): + value = self._value + encoded = value.strftime("%Y%m%d%H%M%S") + if value.microsecond > 0: + encoded += (".%06d" % value.microsecond).rstrip("0") + return (encoded + "Z").encode("ascii") class GraphicString(CommonString): @@ -3903,13 +4565,6 @@ class GraphicString(CommonString): asn1_type_name = "GraphicString" -class VisibleString(CommonString): - __slots__ = () - tag_default = tag_encode(26) - encoding = "ascii" - asn1_type_name = "VisibleString" - - class ISO646String(VisibleString): __slots__ = () asn1_type_name = "ISO646String" @@ -3936,6 +4591,23 @@ class BMPString(CommonString): asn1_type_name = "BMPString" +ChoiceState = namedtuple("ChoiceState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Choice(Obj): """``CHOICE`` special type @@ -4009,7 +4681,7 @@ class Choice(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) def _value_sanitize(self, value): if isinstance(value, tuple) and len(value) == 2: @@ -4035,21 +4707,36 @@ class Choice(Obj): self._value[1].bered ) - def copy(self): - obj = self.__class__(schema=self.specs) - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - value = self._value - if value is not None: - obj._value = (value[0], value[1].copy()) - return obj + def __getstate__(self): + return ChoiceState( + __version__, + self.specs, + copy(self._value), + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Choice, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, tuple) and len(their) == 2: @@ -4139,7 +4826,7 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None value, tail = spec.decode( tlv, offset=offset, @@ -4193,9 +4880,9 @@ class PrimitiveTypes(Choice): It could be useful for general decoding of some unspecified values: - >>> PrimitiveTypes().decode(hexdec("0403666f6f"))[0].value + >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value OCTET STRING 3 bytes 666f6f - >>> PrimitiveTypes().decode(hexdec("0203123456"))[0].value + >>> PrimitiveTypes().decod(hexdec("0203123456")).value INTEGER 1193046 """ __slots__ = () @@ -4223,6 +4910,22 @@ class PrimitiveTypes(Choice): )) +AnyState = namedtuple("AnyState", ( + "version", + "value", + "tag", + "expl", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", + "defined", +), **NAMEDTUPLE_KWARGS) + + class Any(Obj): """``ANY`` special type @@ -4277,19 +4980,35 @@ class Any(Obj): return False return self.defined[1].bered - def copy(self): - obj = self.__class__() - obj._value = self._value - obj.tag = self.tag - obj._expl = self._expl - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - return obj + def __getstate__(self): + return AnyState( + __version__, + self._value, + self.tag, + self._expl, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + self.defined, + ) + + def __setstate__(self, state): + super(Any, self).__setstate__(state) + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + self.defined = state.defined def __eq__(self, their): if isinstance(their, binary_type): @@ -4365,7 +5084,7 @@ class Any(Obj): _decoded=(offset, 0, tlvlen), ) obj.lenindef = True - obj.tag = t + obj.tag = t.tobytes() return obj, v[EOC_LEN:] except DecodeError as err: raise err.__class__( @@ -4389,7 +5108,7 @@ class Any(Obj): optional=self.optional, _decoded=(offset, 0, tlvlen), ) - obj.tag = t + obj.tag = t.tobytes() return obj, tail def __repr__(self): @@ -4469,6 +5188,23 @@ def abs_decode_path(decode_path, rel_path): return decode_path + rel_path +SequenceState = namedtuple("SequenceState", ( + "version", + "specs", + "value", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class Sequence(Obj): """``SEQUENCE`` structure type @@ -4598,7 +5334,7 @@ class Sequence(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) @property def ready(self): @@ -4608,9 +5344,8 @@ class Sequence(Obj): if spec.optional: continue return False - else: - if not value.ready: - return False + if not value.ready: + return False return True @property @@ -4619,20 +5354,37 @@ class Sequence(Obj): return True return any(value.bered for value in itervalues(self._value)) - def copy(self): - obj = self.__class__(schema=self.specs) - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - obj._value = {k: v.copy() for k, v in iteritems(self._value)} - return obj + def __getstate__(self): + return SequenceState( + __version__, + self.specs, + {k: copy(v) for k, v in iteritems(self._value)}, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(Sequence, self).__setstate__(state) + self.specs = state.specs + self._value = state.value + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if not isinstance(their, self.__class__): @@ -4722,7 +5474,7 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -4774,8 +5526,8 @@ class Sequence(Obj): ctx=ctx, _ctx_immutable=False, ) - except TagMismatch: - if spec.optional: + except TagMismatch as err: + if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise @@ -4972,7 +5724,7 @@ class Set(Sequence): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5099,6 +5851,25 @@ class Set(Sequence): return obj, tail +SequenceOfState = namedtuple("SequenceOfState", ( + "version", + "spec", + "value", + "bound_min", + "bound_max", + "tag", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + class SequenceOf(Obj): """``SEQUENCE OF`` sequence type @@ -5144,13 +5915,7 @@ class SequenceOf(Obj): optional=False, _decoded=(0, 0, 0), ): - super(SequenceOf, self).__init__( - impl, - expl, - default, - optional, - _decoded, - ) + super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded) if schema is None: schema = getattr(self, "schema", None) if schema is None: @@ -5174,7 +5939,7 @@ class SequenceOf(Obj): default_obj._value = default_value self.default = default_obj if value is None: - self._value = default_obj.copy()._value + self._value = copy(default_obj._value) def _value_sanitize(self, value): if issubclass(value.__class__, SequenceOf): @@ -5200,22 +5965,41 @@ class SequenceOf(Obj): return True return any(v.bered for v in self._value) - def copy(self): - obj = self.__class__(schema=self.spec) - obj._bound_min = self._bound_min - obj._bound_max = self._bound_max - obj.tag = self.tag - obj._expl = self._expl - obj.default = self.default - obj.optional = self.optional - obj.offset = self.offset - obj.llen = self.llen - obj.vlen = self.vlen - obj.expl_lenindef = self.expl_lenindef - obj.lenindef = self.lenindef - obj.ber_encoded = self.ber_encoded - obj._value = [v.copy() for v in self._value] - return obj + def __getstate__(self): + return SequenceOfState( + __version__, + self.spec, + [copy(v) for v in self._value], + self._bound_min, + self._bound_max, + self.tag, + self._expl, + self.default, + self.optional, + self.offset, + self.llen, + self.vlen, + self.expl_lenindef, + self.lenindef, + self.ber_encoded, + ) + + def __setstate__(self, state): + super(SequenceOf, self).__setstate__(state) + self.spec = state.spec + self._value = state.value + self._bound_min = state.bound_min + self._bound_max = state.bound_max + self.tag = state.tag + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, self.__class__): @@ -5305,7 +6089,7 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return + return None lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5501,7 +6285,7 @@ def generic_decoder(): # pragma: no cover def pprint_any( obj, - oids=None, + oid_maps=(), with_colours=False, with_decode_path=False, decode_path_only=(), @@ -5510,8 +6294,8 @@ def generic_decoder(): # pragma: no cover for pp in pps: if hasattr(pp, "_fields"): if ( - decode_path_only != () and - pp.decode_path[:len(decode_path_only)] != decode_path_only + decode_path_only != () and + pp.decode_path[:len(decode_path_only)] != decode_path_only ): continue if pp.asn1_type_name == Choice.asn1_type_name: @@ -5521,7 +6305,7 @@ def generic_decoder(): # pragma: no cover pp = _pp(**pp_kwargs) yield pp_console_row( pp, - oids=oids, + oid_maps=oid_maps, with_offsets=True, with_blob=False, with_colours=with_colours, @@ -5529,8 +6313,8 @@ def generic_decoder(): # pragma: no cover decode_path_len_decrease=len(decode_path_only), ) for row in pp_console_blob( - pp, - decode_path_len_decrease=len(decode_path_only), + pp, + decode_path_len_decrease=len(decode_path_only), ): yield row else: @@ -5551,7 +6335,7 @@ def main(): # pragma: no cover ) parser.add_argument( "--oids", - help="Python path to dictionary with OIDs", + help="Python paths to dictionary with OIDs, comma separated", ) parser.add_argument( "--schema", @@ -5589,7 +6373,10 @@ def main(): # pragma: no cover args.DERFile.seek(args.skip) der = memoryview(args.DERFile.read()) args.DERFile.close() - oids = obj_by_path(args.oids) if args.oids else {} + oid_maps = ( + [obj_by_path(_path) for _path in (args.oids or "").split(",")] + if args.oids else () + ) if args.schema: schema = obj_by_path(args.schema) from functools import partial @@ -5605,8 +6392,8 @@ def main(): # pragma: no cover obj, tail = schema().decode(der, ctx=ctx) print(pprinter( obj, - oids=oids, - with_colours=True if environ.get("NO_COLOR") is None else False, + oid_maps=oid_maps, + with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, decode_path_only=( () if args.decode_path_only is None else