X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=pyderasn.py;h=e6c7fefab199266370c3f420d5469cf17a28a4a7;hp=0c2c2ee44d1fff5f4b664427124f8f0d41caaa81;hb=0602282ab60f6d203ca37f65006098a285154814;hpb=a3d3be6767ec310b97f6ae55921c76ba8894d14b diff --git a/pyderasn.py b/pyderasn.py index 0c2c2ee..e6c7fef 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # coding: utf-8 # PyDERASN -- Python ASN.1 DER codec with abstract structures -# Copyright (C) 2017 Sergey Matveev +# Copyright (C) 2017-2018 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as @@ -68,7 +68,7 @@ ____ Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is the default tag used during coding process. You can override it with either ``IMPLICIT`` (using ``impl`` keyword argument), or -``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments takes +``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments take raw binary string, containing that tag. You can **not** set implicit and explicit tags simultaneously. @@ -88,10 +88,10 @@ number. Pay attention that explicit tags always have *constructed* tag Implicit tag is not explicitly shown. -Two object of the same type, but with different implicit/explicit tags +Two objects of the same type, but with different implicit/explicit tags are **not** equal. -You can get objects effective tag (either default or implicited) through +You can get object's effective tag (either default or implicited) through ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode` function:: @@ -135,6 +135,8 @@ example ``TBSCertificate`` sequence holds defaulted, explicitly tagged When default argument is used and value is not specified, then it equals to default one. +.. _bounds: + Size constraints ________________ @@ -157,15 +159,17 @@ raised. Common methods ______________ -All objects have ``ready`` boolean property, that tells if it is ready -to be encoded. If that kind of action is performed on unready object, -then :py:exc:`pyderasn.ObjNotReady` exception will be raised. +All objects have ``ready`` boolean property, that tells if object is +ready to be encoded. If that kind of action is performed on unready +object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised. + +All objects have ``copy()`` method, that returns their copy, that can be +safely mutated. -All objects have ``copy()`` method, returning its copy, that can be safely -mutated. +.. _decoding: Decoding -________ +-------- Decoding is performed using ``decode()`` method. ``offset`` optional argument could be used to set initial object's offset in the binary @@ -177,7 +181,7 @@ by specifying ``leavemm=True`` argument. When object is decoded, ``decoded`` property is true and you can safely use following properties: -* ``offset`` -- position from initial offset where object's tag is started +* ``offset`` -- position including initial offset where object's tag starts * ``tlen`` -- length of object's tag * ``llen`` -- length of object's length value * ``vlen`` -- length of object's value @@ -191,8 +195,24 @@ lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen`` When error occurs, then :py:exc:`pyderasn.DecodeError` is raised. +.. _ctx: + +Context +_______ + +You can specify so called context keyword argument during ``decode()`` +invocation. It is dictionary containing various options governing +decoding process. + +Currently available context options: + +* :ref:`defines_by_path ` +* :ref:`strict_default_existence ` + +.. _pprinting: + Pretty printing -_______________ +--------------- All objects have ``pps()`` method, that is a generator of :py:class:`pyderasn.PP` namedtuple, holding various raw information @@ -209,6 +229,140 @@ all object ``repr``. But it is easy to write custom formatters. >>> print(pprint(obj)) 0 [1,1, 2] INTEGER -12345 +.. _definedby: + +DEFINED BY +---------- + +ASN.1 structures often have ANY and OCTET STRING fields, that are +DEFINED BY some previously met ObjectIdentifier. This library provides +ability to specify mapping between some OID and field that must be +decoded with specific specification. + +defines kwarg +_____________ + +:py:class:`pyderasn.ObjectIdentifier` field inside +:py:class:`pyderasn.Sequence` can hold mapping between OIDs and +necessary for decoding structures. For example, CMS (:rfc:`5652`) +container:: + + class ContentInfo(Sequence): + schema = ( + ("contentType", ContentType(defines=((("content",), { + id_digestedData: DigestedData(), + id_signedData: SignedData(), + }),))), + ("content", Any(expl=tag_ctxc(0))), + ) + +``contentType`` field tells that it defines that ``content`` must be +decoded with ``SignedData`` specification, if ``contentType`` equals to +``id-signedData``. The same applies to ``DigestedData``. If +``contentType`` contains unknown OID, then no automatic decoding is +done. + +You can specify multiple fields, that will be autodecoded -- that is why +``defines`` kwarg is a sequence. You can specify defined field +relatively or absolutely to current decode path. For example ``defines`` +for AlgorithmIdentifier of X.509's +``tbsCertificate.subjectPublicKeyInfo.algorithm.algorithm``:: + + ( + (('parameters',), { + id_ecPublicKey: ECParameters(), + id_GostR3410_2001: GostR34102001PublicKeyParameters(), + }), + (('..', 'subjectPublicKey'), { + id_rsaEncryption: RSAPublicKey(), + id_GostR3410_2001: OctetString(), + }), + ), + +tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then +autodecode its parameters inside SPKI's algorithm and its public key +itself. + +Following types can be automatically decoded (DEFINED BY): + +* :py:class:`pyderasn.Any` +* :py:class:`pyderasn.BitString` (that is multiple of 8 bits) +* :py:class:`pyderasn.OctetString` +* :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf` + ``Any``/``OctetString``-s + +When any of those fields is automatically decoded, then ``.defined`` +attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it +was defined, ``value`` contains corresponding decoded value. For example +above, ``content_info["content"].defined == (id_signedData, +signed_data)``. + +.. _defines_by_path_ctx: + +defines_by_path context option +______________________________ + +Sometimes you either can not or do not want to explicitly set *defines* +in the scheme. You can dynamically apply those definitions when calling +``.decode()`` method. + +Specify ``defines_by_path`` key in the :ref:`decode context `. Its +value must be sequence of following tuples:: + + (decode_path, defines) + +where ``decode_path`` is a tuple holding so-called decode path to the +exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply +``defines``, holding exactly the same value as accepted in its keyword +argument. + +For example, again for CMS, you want to automatically decode +``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse`` +structures it may hold. Also, automatically decode ``controlSequence`` +of ``PKIResponse``:: + + content_info, tail = ContentInfo().decode(data, defines_by_path=( + ( + ("contentType",), + ((("content",), {id_signedData: SignedData()}),), + ), + ( + ( + "content", + decode_path_defby(id_signedData), + "encapContentInfo", + "eContentType", + ), + ((("eContent",), { + id_cct_PKIData: PKIData(), + id_cct_PKIResponse: PKIResponse(), + })), + ), + ( + ( + "content", + decode_path_defby(id_signedData), + "encapContentInfo", + "eContent", + decode_path_defby(id_cct_PKIResponse), + "controlSequence", + any, + "attrType", + ), + ((("attrValues",), { + id_cmc_recipientNonce: RecipientNonce(), + id_cmc_senderNonce: SenderNonce(), + id_cmc_statusInfoV2: CMCStatusInfoV2(), + id_cmc_transactionId: TransactionId(), + })), + ), + )) + +Pay attention for :py:func:`pyderasn.decode_path_defby` and ``any``. +First function is useful for path construction when some automatic +decoding is already done. ``any`` means literally any value it meet -- +useful for SEQUENCE/SET OF-s. + Primitive types --------------- @@ -302,6 +456,7 @@ _____ Various ------- +.. autofunction:: pyderasn.abs_decode_path .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -338,6 +493,7 @@ __all__ = ( "Boolean", "BoundsError", "Choice", + "decode_path_defby", "DecodeError", "Enumerated", "GeneralizedTime", @@ -657,9 +813,9 @@ def len_decode(data): ######################################################################## class AutoAddSlots(type): - def __new__(cls, name, bases, _dict): + def __new__(mcs, name, bases, _dict): _dict["__slots__"] = _dict.get("__slots__", ()) - return type.__new__(cls, name, bases, _dict) + return type.__new__(mcs, name, bases, _dict) @add_metaclass(AutoAddSlots) @@ -688,10 +844,7 @@ class Obj(object): optional=False, _decoded=(0, 0, 0), ): - if impl is None: - self.tag = getattr(self, "impl", self.tag_default) - else: - self.tag = impl + self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl self._expl = getattr(self, "expl", None) if expl is None else expl if self.tag != self.tag_default and self._expl is not None: raise ValueError( @@ -735,15 +888,9 @@ class Obj(object): def __str__(self): # pragma: no cover return self.__bytes__() if PY2 else self.__unicode__() - def __eq__(self, their): # pragma: no cover - raise NotImplementedError() - def __ne__(self, their): return not(self == their) - def __lt__(self, their): # pragma: no cover - raise NotImplementedError() - def __gt__(self, their): # pragma: no cover return not(self < their) @@ -756,7 +903,7 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset=0, decode_path=()): # pragma: no cover + def _decode(self, tlv, offset, decode_path, ctx): # pragma: no cover raise NotImplementedError() def encode(self): @@ -765,21 +912,25 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) - def decode(self, data, offset=0, leavemm=False, decode_path=()): + def decode(self, data, offset=0, leavemm=False, decode_path=(), ctx=None): """Decode the data :param data: either binary or memoryview :param int offset: initial data's offset :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise + :param ctx: optional :ref:`context ` governing decoding process. :returns: (Obj, remaining data) """ + if ctx is None: + ctx = {} tlv = memoryview(data) if self._expl is None: obj, tail = self._decode( tlv, offset, decode_path=decode_path, + ctx=ctx, ) else: try: @@ -816,7 +967,8 @@ class Obj(object): obj, tail = self._decode( v, offset=offset + tlen + llen, - decode_path=(), + decode_path=decode_path, + ctx=ctx, ) return obj, (tail if leavemm else tail.tobytes()) @@ -849,6 +1001,12 @@ class Obj(object): return self.expl_tlen + self.expl_llen + self.expl_vlen +def decode_path_defby(defined_by): + """DEFINED BY representation inside decode path + """ + return "DEFINED BY (%s)" % defined_by + + ######################################################################## # Pretty printing ######################################################################## @@ -1118,7 +1276,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1267,14 +1425,11 @@ class Integer(Obj): self._value = value specs = getattr(self, "schema", {}) if _specs is None else _specs self.specs = specs if isinstance(specs, dict) else dict(specs) - if bounds is None: - self._bound_min, self._bound_max = getattr( - self, - "bounds", - (float("-inf"), float("+inf")), - ) - else: - self._bound_min, self._bound_max = bounds + self._bound_min, self._bound_max = getattr( + self, + "bounds", + (float("-inf"), float("+inf")), + ) if bounds is None else bounds if value is not None: self._value = self._value_sanitize(value) if default is not None: @@ -1414,7 +1569,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1565,7 +1720,7 @@ class BitString(Obj): >>> b.specs {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2} """ - __slots__ = ("specs",) + __slots__ = ("specs", "defined") tag_default = tag_encode(3) asn1_type_name = "BIT STRING" @@ -1602,6 +1757,7 @@ class BitString(Obj): ) if value is None: self._value = default + self.defined = None def _bits2octets(self, bits): if len(self.specs) > 0: @@ -1662,7 +1818,10 @@ class BitString(Obj): def copy(self): obj = self.__class__(_specs=self.specs) - obj._value = self._value + value = self._value + if value is not None: + value = (value[0], value[1]) + obj._value = value obj.tag = self.tag obj._expl = self._expl obj.default = self.default @@ -1744,7 +1903,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1847,6 +2006,11 @@ class BitString(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, ) + defined_by, defined = self.defined or (None, None) + if defined_by is not None: + yield defined.pps( + decode_path=decode_path + (decode_path_defby(defined_by),) + ) class OctetString(Obj): @@ -1865,7 +2029,7 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c """ - __slots__ = ("_bound_min", "_bound_max") + __slots__ = ("_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" @@ -1897,14 +2061,11 @@ class OctetString(Obj): _decoded, ) self._value = value - if bounds is None: - self._bound_min, self._bound_max = getattr( - self, - "bounds", - (0, float("+inf")), - ) - else: - self._bound_min, self._bound_max = bounds + self._bound_min, self._bound_max = getattr( + self, + "bounds", + (0, float("+inf")), + ) if bounds is None else bounds if value is not None: self._value = self._value_sanitize(value) if default is not None: @@ -1916,6 +2077,7 @@ class OctetString(Obj): ) if self._value is None: self._value = default + self.defined = None def _value_sanitize(self, value): if issubclass(value.__class__, OctetString): @@ -1993,7 +2155,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2068,6 +2230,11 @@ class OctetString(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, ) + defined_by, defined = self.defined or (None, None) + if defined_by is not None: + yield defined.pps( + decode_path=decode_path + (decode_path_defby(defined_by),) + ) class Null(Obj): @@ -2137,7 +2304,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2217,13 +2384,14 @@ class ObjectIdentifier(Obj): Traceback (most recent call last): pyderasn.InvalidOID: unacceptable first arc value """ - __slots__ = () + __slots__ = ("defines",) tag_default = tag_encode(6) asn1_type_name = "OBJECT IDENTIFIER" def __init__( self, value=None, + defines=(), impl=None, expl=None, default=None, @@ -2234,6 +2402,15 @@ class ObjectIdentifier(Obj): :param value: set the value. Either tuples of integers, string of "."-concatenated integers, or :py:class:`pyderasn.ObjectIdentifier` object + :param defines: sequence of tuples. Each tuple has two elements. + First one is relative to current one decode + path, aiming to the field defined by that OID. + Read about relative path in + :py:func:`pyderasn.abs_decode_path`. Second + tuple element is ``{OID: pyderasn.Obj()}`` + dictionary, mapping between current OID value + and structure applied to defined field. + :ref:`Read about DEFINED BY ` :param bytes impl: override default tag with ``IMPLICIT`` one :param bytes expl: override default tag with ``EXPLICIT`` one :param default: set default value. Type same as in ``value`` @@ -2258,6 +2435,7 @@ class ObjectIdentifier(Obj): ) if self._value is None: self._value = default + self.defines = defines def __add__(self, their): if isinstance(their, self.__class__): @@ -2295,6 +2473,7 @@ class ObjectIdentifier(Obj): def copy(self): obj = self.__class__() obj._value = self._value + obj.defines = self.defines obj.tag = self.tag obj._expl = self._expl obj.default = self.default @@ -2336,6 +2515,7 @@ class ObjectIdentifier(Obj): def __call__( self, value=None, + defines=None, impl=None, expl=None, default=None, @@ -2343,6 +2523,7 @@ class ObjectIdentifier(Obj): ): return self.__class__( value=value, + defines=self.defines if defines is None else defines, impl=self.tag if impl is None else impl, expl=self._expl if expl is None else expl, default=self.default if default is None else default, @@ -2368,7 +2549,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2733,6 +2914,11 @@ class IA5String(CommonString): asn1_type_name = "IA5" +LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ") +LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ") +LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ") + + class UTCTime(CommonString): """``UTCTime`` datetime type @@ -2799,7 +2985,7 @@ class UTCTime(CommonString): return value.strftime(self.fmt).encode("ascii") if isinstance(value, binary_type): value_decoded = value.decode("ascii") - if len(value_decoded) == 2 + 2 + 2 + 2 + 2 + 2 + 1: + if len(value_decoded) == LEN_YYMMDDHHMMSSZ: try: datetime.strptime(value_decoded, self.fmt) except ValueError: @@ -2890,7 +3076,7 @@ class GeneralizedTime(UTCTime): ).encode("ascii") if isinstance(value, binary_type): value_decoded = value.decode("ascii") - if len(value_decoded) == 4 + 2 + 2 + 2 + 2 + 2 + 1: + if len(value_decoded) == LEN_YYYYMMDDHHMMSSZ: try: datetime.strptime(value_decoded, self.fmt) except ValueError: @@ -2898,7 +3084,7 @@ class GeneralizedTime(UTCTime): "invalid GeneralizedTime (without ms) format", ) return value - elif len(value_decoded) >= 4 + 2 + 2 + 2 + 2 + 2 + 1 + 1 + 1: + elif len(value_decoded) >= LEN_YYYYMMDDHHMMSSDMZ: try: datetime.strptime(value_decoded, self.fmt_ms) except ValueError: @@ -2915,7 +3101,7 @@ class GeneralizedTime(UTCTime): def todatetime(self): value = self._value.decode("ascii") - if len(value) == 4 + 2 + 2 + 2 + 2 + 2 + 1: + if len(value) == LEN_YYYYMMDDHHMMSSZ: return datetime.strptime(value, self.fmt) return datetime.strptime(value, self.fmt_ms) @@ -3130,7 +3316,7 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): for choice, spec in self.specs.items(): try: value, tail = spec.decode( @@ -3138,6 +3324,7 @@ class Choice(Obj): offset=offset, leavemm=True, decode_path=decode_path + (choice,), + ctx=ctx, ) except TagMismatch: continue @@ -3226,7 +3413,7 @@ class Any(Obj): >>> hexenc(bytes(a)) b'0x040x0bhello world' """ - __slots__ = () + __slots__ = ("defined",) tag_default = tag_encode(0) asn1_type_name = "ANY" @@ -3247,6 +3434,7 @@ class Any(Obj): """ super(Any, self).__init__(None, expl, None, optional, _decoded) self._value = None if value is None else self._value_sanitize(value) + self.defined = None def _value_sanitize(self, value): if isinstance(value, self.__class__): @@ -3303,7 +3491,7 @@ class Any(Obj): self._assert_ready() return self._value - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) l, llen, v = len_decode(lv) @@ -3354,19 +3542,62 @@ class Any(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, ) + defined_by, defined = self.defined or (None, None) + if defined_by is not None: + yield defined.pps( + decode_path=decode_path + (decode_path_defby(defined_by),) + ) ######################################################################## # ASN.1 constructed types ######################################################################## +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (p1 != any) and (p1 != p2): + break + else: + return define + + +def abs_decode_path(decode_path, rel_path): + """Create an absolute decode path from current and relative ones + + :param decode_path: current decode path, starting point. + Tuple of strings + :param rel_path: relative path to ``decode_path``. Tuple of strings. + If first tuple's element is "/", then treat it as + an absolute path, ignoring ``decode_path`` as + starting point. Also this tuple can contain ".." + elements, stripping the leading element from + ``decode_path`` + + >>> abs_decode_path(("foo", "bar"), ("baz", "whatever")) + ("foo", "bar", "baz", "whatever") + >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever")) + ("foo", "whatever") + >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever")) + ("baz", "whatever") + """ + if rel_path[0] == "/": + return rel_path[1:] + if rel_path[0] == "..": + return abs_decode_path(decode_path[:-1], rel_path[1:]) + return decode_path + rel_path + + class Sequence(Obj): """``SEQUENCE`` structure type You have to make specification of sequence:: class Extension(Sequence): - __slots__ = () schema = ( ("extnID", ObjectIdentifier()), ("critical", Boolean(default=False)), @@ -3411,6 +3642,8 @@ class Sequence(Obj): >>> tbs = TBSCertificate() >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl`` + Assign ``None`` to remove value from sequence. + You can know if value exists/set in the sequence and take its value: >>> "extnID" in ext, "extnValue" in ext, "critical" in ext @@ -3430,13 +3663,18 @@ class Sequence(Obj): All defaulted values are always optional. + .. _strict_default_existence_ctx: + .. warning:: When decoded DER contains defaulted value inside, then - technically this is not valid DER encoding. But we allow - and pass it. Of course reencoding of that kind of DER will + technically this is not valid DER encoding. But we allow and pass + it **by default**. Of course reencoding of that kind of DER will result in different binary representation (validly without - defaulted value inside). + defaulted value inside). You can enable strict defaulted values + existence validation by setting ``"strict_default_existence": + True`` :ref:`context ` option -- decoding process will raise + an exception if defaulted value is met. Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. @@ -3574,7 +3812,7 @@ class Sequence(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3612,24 +3850,88 @@ class Sequence(Obj): for name, spec in self.specs.items(): if len(v) == 0 and spec.optional: continue + sub_decode_path = decode_path + (name,) try: value, v_tail = spec.decode( v, sub_offset, leavemm=True, - decode_path=decode_path + (name,), + decode_path=sub_decode_path, + ctx=ctx, ) except TagMismatch: if spec.optional: continue raise + + defined = get_def_by_path(ctx.get("defines", ()), sub_decode_path) + if defined is not None: + defined_by, defined_spec = defined + if issubclass(value.__class__, SequenceOf): + for i, _value in enumerate(value): + sub_sub_decode_path = sub_decode_path + ( + str(i), + decode_path_defby(defined_by), + ) + defined_value, defined_tail = defined_spec.decode( + memoryview(bytes(_value)), + sub_offset + value.tlen + value.llen, + leavemm=True, + decode_path=sub_sub_decode_path, + ctx=ctx, + ) + if len(defined_tail) > 0: + raise DecodeError( + "remaining data", + klass=self.__class__, + decode_path=sub_sub_decode_path, + offset=offset, + ) + _value.defined = (defined_by, defined_value) + else: + defined_value, defined_tail = defined_spec.decode( + memoryview(bytes(value)), + sub_offset + value.tlen + value.llen, + leavemm=True, + decode_path=sub_decode_path + (decode_path_defby(defined_by),), + ctx=ctx, + ) + if len(defined_tail) > 0: + raise DecodeError( + "remaining data", + klass=self.__class__, + decode_path=sub_decode_path + (decode_path_defby(defined_by),), + offset=offset, + ) + value.defined = (defined_by, defined_value) + sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) v = v_tail if spec.default is not None and value == spec.default: - # Encoded default values are not valid in DER, - # but we still allow that - continue + if ctx.get("strict_default_existence", False): + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + else: + continue values[name] = value + + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if len(v) > 0: raise DecodeError( "remaining data", @@ -3698,7 +4000,7 @@ class Set(Sequence): v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3741,6 +4043,7 @@ class Set(Sequence): sub_offset, leavemm=True, decode_path=decode_path + (name,), + ctx=ctx, ) except TagMismatch: continue @@ -3827,14 +4130,11 @@ class SequenceOf(Obj): if schema is None: raise ValueError("schema must be specified") self.spec = schema - if bounds is None: - self._bound_min, self._bound_max = getattr( - self, - "bounds", - (0, float("+inf")), - ) - else: - self._bound_min, self._bound_max = bounds + self._bound_min, self._bound_max = getattr( + self, + "bounds", + (0, float("+inf")), + ) if bounds is None else bounds self._value = [] if value is not None: self._value = self._value_sanitize(value) @@ -3953,7 +4253,7 @@ class SequenceOf(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3995,6 +4295,7 @@ class SequenceOf(Obj): sub_offset, leavemm=True, decode_path=decode_path + (str(len(_value)),), + ctx=ctx, ) sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) v = v_tail @@ -4073,9 +4374,56 @@ def obj_by_path(pypath): # pragma: no cover return obj +def generic_decoder(): # pragma: no cover + # All of this below is a big hack with self references + choice = PrimitiveTypes() + choice.specs["SequenceOf"] = SequenceOf(schema=choice) + choice.specs["SetOf"] = SetOf(schema=choice) + for i in range(31): + choice.specs["SequenceOf%d" % i] = SequenceOf( + schema=choice, + expl=tag_ctxc(i), + ) + choice.specs["Any"] = Any() + + # Class name equals to type name, to omit it from output + class SEQUENCEOF(SequenceOf): + __slots__ = () + schema = choice + + def pprint_any(obj, oids=None): + def _pprint_pps(pps): + for pp in pps: + if hasattr(pp, "_fields"): + if pp.asn1_type_name == Choice.asn1_type_name: + continue + pp_kwargs = pp._asdict() + pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",) + pp = _pp(**pp_kwargs) + yield pp_console_row( + pp, + oids=oids, + with_offsets=True, + with_blob=False, + ) + for row in pp_console_blob(pp): + yield row + else: + for row in _pprint_pps(pp): + yield row + return "\n".join(_pprint_pps(obj.pps())) + return SEQUENCEOF(), pprint_any + + def main(): # pragma: no cover import argparse parser = argparse.ArgumentParser(description="PyDERASN ASN.1 DER decoder") + parser.add_argument( + "--skip", + type=int, + default=0, + help="Skip that number of bytes from the beginning", + ) parser.add_argument( "--oids", help="Python path to dictionary with OIDs", @@ -4084,12 +4432,17 @@ def main(): # pragma: no cover "--schema", help="Python path to schema definition to use", ) + parser.add_argument( + "--defines-by-path", + help="Python path to decoder's defines_by_path", + ) parser.add_argument( "DERFile", type=argparse.FileType("rb"), help="Path to DER file you want to decode", ) args = parser.parse_args() + args.DERFile.seek(args.skip) der = memoryview(args.DERFile.read()) args.DERFile.close() oids = obj_by_path(args.oids) if args.oids else {} @@ -4098,46 +4451,14 @@ def main(): # pragma: no cover from functools import partial pprinter = partial(pprint, big_blobs=True) else: - # All of this below is a big hack with self references - choice = PrimitiveTypes() - choice.specs["SequenceOf"] = SequenceOf(schema=choice) - choice.specs["SetOf"] = SetOf(schema=choice) - for i in range(31): - choice.specs["SequenceOf%d" % i] = SequenceOf( - schema=choice, - expl=tag_ctxc(i), - ) - choice.specs["Any"] = Any() - - # Class name equals to type name, to omit it from output - class SEQUENCEOF(SequenceOf): - __slots__ = () - schema = choice - schema = SEQUENCEOF() - - def pprint_any(obj, oids=None): - def _pprint_pps(pps): - for pp in pps: - if hasattr(pp, "_fields"): - if pp.asn1_type_name == Choice.asn1_type_name: - continue - pp_kwargs = pp._asdict() - pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",) - pp = _pp(**pp_kwargs) - yield pp_console_row( - pp, - oids=oids, - with_offsets=True, - with_blob=False, - ) - for row in pp_console_blob(pp): - yield row - else: - for row in _pprint_pps(pp): - yield row - return "\n".join(_pprint_pps(obj.pps())) - pprinter = pprint_any - obj, tail = schema().decode(der) + schema, pprinter = generic_decoder() + obj, tail = schema().decode( + der, + ctx=( + None if args.defines_by_path is None else + {"defines_by_path": obj_by_path(args.defines_by_path)} + ), + ) print(pprinter(obj, oids=oids)) if tail != b"": print("\nTrailing data: %s" % hexenc(tail))