From: Sergey Matveev Date: Sun, 29 Oct 2017 16:13:42 +0000 (+0300) Subject: DEFINED BY support X-Git-Tag: 1.4~2 X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=commitdiff_plain;h=7aed684a10179b2e57e81369e956ac6df4fb135e DEFINED BY support --- diff --git a/README b/README index 607fedc..8d6d2a0 100644 --- a/README +++ b/README @@ -10,10 +10,11 @@ PyDERASN -- ASN.1 DER library for Python * Working with sequences as high level data objects with ability to (un)marshall them * Python 2.7/3.5 compatibility -* __slots__ friendliness +* Automatic decoding of DEFINED BY fields * Ability to know exact decoded objects offset and lengths in the binary * Pretty printer and command-line decoder, that could conveniently replace utilities like either dumpasn1 or openssl asn1parse +* __slots__ friendliness pyderasn is free software: see the file COPYING.LESSER for copying conditions. diff --git a/VERSION b/VERSION index 7e32cd5..c068b24 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.3 +1.4 diff --git a/doc/examples.rst b/doc/examples.rst index 108048e..a9e57b6 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -415,3 +415,54 @@ Let's create some simple self-signed X.509 certificate from the ground:: crt.encode() And we will get the same certificate used in Go's library tests. + +DEFINED BY fields +----------------- + +Here is only very simple example how you can define Any/OctetString +fields automatic decoding:: + + class AttributeTypeAndValue(Sequence): + schema = ( + ("type", AttributeType(defines=("value", { + id_at_countryName: PrintableString(), + id_at_stateOrProvinceName: PrintableString(), + id_at_localityName: PrintableString(), + id_at_organizationName: PrintableString(), + id_at_commonName: PrintableString(), + }))), + ("value", AttributeValue()), + ) + +And when you will try to decode X.509 certificate with it, your pretty +printer will show:: + + 34 [0,0, 149] . . issuer: Name CHOICE rdnSequence + 34 [1,2, 146] . . . rdnSequence: RDNSequence SEQUENCE OF + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-countryName (2.5.4.6) + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:02:58:58 + 46 [1,1, 2] . . . . . . . DEFINED BY (2.5.4.6): PrintableString PrintableString XX + 50 [1,1, 19] . . . . 1: RelativeDistinguishedName SET OF + 52 [1,1, 17] . . . . . 0: AttributeTypeAndValue SEQUENCE + 54 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8) + 59 [0,0, 12] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:0A:53:6F:6D:65:2D:53:74:61:74:65 + 59 [1,1, 10] . . . . . . . DEFINED BY (2.5.4.8): PrintableString PrintableString Some-State + 71 [1,1, 13] . . . . 2: RelativeDistinguishedName SET OF + 73 [1,1, 11] . . . . . 0: AttributeTypeAndValue SEQUENCE + 75 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-localityName (2.5.4.7) + 80 [0,0, 6] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:04:43:69:74:79 + 80 [1,1, 4] . . . . . . . DEFINED BY (2.5.4.7): PrintableString PrintableString City + 86 [1,1, 33] . . . . 3: RelativeDistinguishedName SET OF + 88 [1,1, 31] . . . . . 0: AttributeTypeAndValue SEQUENCE + 90 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-organizationName (2.5.4.10) + 95 [0,0, 26] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:18:49:6E:74:65:72:6E:65:74:20:57:69:64:67:69 + . . . . . . . 74:73:20:50:74:79:20:4C:74:64 + 95 [1,1, 24] . . . . . . . DEFINED BY (2.5.4.10): PrintableString PrintableString Internet Widgits Pty Ltd + +:ref:`Read more ` about that feature. diff --git a/doc/features.rst b/doc/features.rst index e6afdd3..d2d1734 100644 --- a/doc/features.rst +++ b/doc/features.rst @@ -4,7 +4,7 @@ Features * Basic ASN.1 data types (X.208): BOOLEAN, INTEGER, BIT STRING, OCTET STRING, NULL, OBJECT IDENTIFIER, ENUMERATED, all strings, UTCTime, GeneralizedTime, CHOICE, ANY, SEQUENCE (OF), SET (OF) -* Size constraints checking +* Size :ref:`constraints ` checking * Working with sequences as high level data objects with ability to (un)marshall them * Python 2.7/3.5 compatibility @@ -15,14 +15,17 @@ practice it should be relatively easy to convert ``pyasn1``'s code to ``pyderasn``'s one. But additionally it offers: * Small, simple and trying to be reviewable code. Just a single file -* ``__slots__`` friendliness -* Ability to know exact decoded objects offsets and lengths in the binary -* Pretty printer and command-line decoder, that could conveniently - replace utilities like either ``dumpasn1`` or ``openssl asn1parse`` +* Automatic decoding of :ref:`DEFINED BY ` fields +* Ability to know :ref:`exact decoded ` objects offsets and + lengths inside the binary +* :ref:`Pretty printer ` and command-line decoder, that could + conveniently replace utilities like either ``dumpasn1`` or + ``openssl asn1parse`` * Some kind of strong typing: SEQUENCEs require the exact **type** of settable values, even when they are inherited * However they do not require tags matching: IMPLICIT/EXPLICIT tags will be set automatically in the given sequence +* ``__slots__`` friendliness * Could be significantly faster. For example parsing of CACert.org's CRL under Python 3.5.2: diff --git a/doc/news.rst b/doc/news.rst index c80cf9b..bd71b82 100644 --- a/doc/news.rst +++ b/doc/news.rst @@ -1,6 +1,13 @@ News ==== +.. _release1.4: + +1.4 +--- +Ability to automatically decode :ref:`DEFINED BY ` fields +inside SEQUENCEs. + .. _release1.3: 1.3 diff --git a/pyderasn.py b/pyderasn.py index 32ce73c..56eec8d 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -135,6 +135,8 @@ example ``TBSCertificate`` sequence holds defaulted, explicitly tagged When default argument is used and value is not specified, then it equals to default one. +.. _bounds: + Size constraints ________________ @@ -164,8 +166,10 @@ then :py:exc:`pyderasn.ObjNotReady` exception will be raised. All objects have ``copy()`` method, returning its copy, that can be safely mutated. +.. _decoding: + Decoding -________ +-------- Decoding is performed using ``decode()`` method. ``offset`` optional argument could be used to set initial object's offset in the binary @@ -191,8 +195,10 @@ lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen`` When error occurs, then :py:exc:`pyderasn.DecodeError` is raised. +.. _pprinting: + Pretty printing -_______________ +--------------- All objects have ``pps()`` method, that is a generator of :py:class:`pyderasn.PP` namedtuple, holding various raw information @@ -209,6 +215,116 @@ all object ``repr``. But it is easy to write custom formatters. >>> print(pprint(obj)) 0 [1,1, 2] INTEGER -12345 +.. _definedby: + +DEFINED BY +---------- + +ASN.1 structures often have ANY and OCTET STRING fields, that are +DEFINED BY some previously met ObjectIdentifier. This library provides +ability to specify mapping between some OID and field that must be +decoded with specific specification. + +defines kwarg +_____________ + +:py:class:`pyderasn.ObjectIdentifier` field inside +:py:class:`pyderasn.Sequence` can hold mapping between OIDs and +necessary for decoding structrures. For example, CMS (:rfc:`5652`) +container:: + + class ContentInfo(Sequence): + schema = ( + ("contentType", ContentType(defines=("content", { + id_digestedData: DigestedData(), + id_signedData: SignedData(), + }))), + ("content", Any(expl=tag_ctxc(0))), + ) + +``contentType`` field tells that it defines that ``content`` must be +decoded with ``SignedData`` specification, if ``contentType`` equals to +``id-signedData``. The same applies to ``DigestedData``. If +``contentType`` contains unknown OID, then no automatic decoding is +done. + +Following types can be automatically decoded (DEFINED BY): + +* :py:class:`pyderasn.Any` +* :py:class:`pyderasn.OctetString` +* :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf` + ``Any``/``OctetString``-s + +When any of those fields is automatically decoded, then ``.defined`` +attribute contains ``(OID, value)`` tuple. OID tell by which OID it was +defined, ``value`` contains corresponding decoded value. For example +above, ``content_info["content"].defined == (id_signedData, +signed_data)``. + +defines_by_path kwarg +_____________________ + +Sometimes you either can not or do not want to explicitly set *defines* +in the scheme. You can dynamically apply those definitions when calling +``.decode()`` method. + +Decode method takes optional ``defines_by_path`` keyword argument that +must be sequence of following tuples:: + + (decode_path, defines) + +where ``decode_path`` is a tuple holding so-called decode path to the +exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply +``defines``, holding exactly the same value as accepted in its keyword +argument. + +For example, again for CMS, you want to automatically decode +``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse`` +structures it may hold. Also, automatically decode ``controlSequence`` +of ``PKIResponse``:: + + content_info, tail = ContentInfo().decode(data, defines_by_path=( + ( + ("contentType",), + ("content", {id_signedData: SignedData()}), + ), + ( + ( + "content", + decode_path_defby(id_signedData), + "encapContentInfo", + "eContentType", + ), + ("eContent", { + id_cct_PKIData: PKIData(), + id_cct_PKIResponse: PKIResponse(), + }), + ), + ( + ( + "content", + decode_path_defby(id_signedData), + "encapContentInfo", + "eContent", + decode_path_defby(id_cct_PKIResponse), + "controlSequence", + any, + "attrType", + ), + ("attrValues", { + id_cmc_recipientNonce: RecipientNonce(), + id_cmc_senderNonce: SenderNonce(), + id_cmc_statusInfoV2: CMCStatusInfoV2(), + id_cmc_transactionId: TransactionId(), + }), + ), + )) + +Pay attention for :py:func:`pyderasn.decode_path_defby` and ``any``. +First function is useful for path construction when some automatic +decoding is already done. ``any`` is used for human readability and +means literally any value it meet -- useful for sequence and set of-s. + Primitive types --------------- @@ -338,6 +454,7 @@ __all__ = ( "Boolean", "BoundsError", "Choice", + "decode_path_defby", "DecodeError", "Enumerated", "GeneralizedTime", @@ -747,7 +864,7 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset=0, decode_path=()): # pragma: no cover + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): # pragma: no cover raise NotImplementedError() def encode(self): @@ -756,13 +873,14 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) - def decode(self, data, offset=0, leavemm=False, decode_path=()): + def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None): """Decode the data :param data: either binary or memoryview :param int offset: initial data's offset :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise + :param defines_by_path: :ref:`Read about DEFINED BY ` :returns: (Obj, remaining data) """ tlv = memoryview(data) @@ -771,6 +889,7 @@ class Obj(object): tlv, offset, decode_path=decode_path, + defines_by_path=defines_by_path, ) else: try: @@ -808,6 +927,7 @@ class Obj(object): v, offset=offset + tlen + llen, decode_path=decode_path, + defines_by_path=defines_by_path, ) return obj, (tail if leavemm else tail.tobytes()) @@ -840,6 +960,12 @@ class Obj(object): return self.expl_tlen + self.expl_llen + self.expl_vlen +def decode_path_defby(defined_by): + """DEFINED BY representation inside decode path + """ + return "DEFINED BY (%s)" % defined_by + + ######################################################################## # Pretty printing ######################################################################## @@ -1109,7 +1235,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1402,7 +1528,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1735,7 +1861,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1856,7 +1982,7 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c """ - __slots__ = ("_bound_min", "_bound_max") + __slots__ = ("_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" @@ -1904,6 +2030,7 @@ class OctetString(Obj): ) if self._value is None: self._value = default + self.defined = None def _value_sanitize(self, value): if issubclass(value.__class__, OctetString): @@ -1981,7 +2108,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2056,6 +2183,11 @@ class OctetString(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, ) + defined_by, defined = self.defined or (None, None) + if defined_by is not None: + yield defined.pps( + decode_path=decode_path + (decode_path_defby(defined_by),) + ) class Null(Obj): @@ -2125,7 +2257,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2205,13 +2337,14 @@ class ObjectIdentifier(Obj): Traceback (most recent call last): pyderasn.InvalidOID: unacceptable first arc value """ - __slots__ = () + __slots__ = ("defines",) tag_default = tag_encode(6) asn1_type_name = "OBJECT IDENTIFIER" def __init__( self, value=None, + defines=None, impl=None, expl=None, default=None, @@ -2222,6 +2355,13 @@ class ObjectIdentifier(Obj): :param value: set the value. Either tuples of integers, string of "."-concatenated integers, or :py:class:`pyderasn.ObjectIdentifier` object + :param defines: tuple of two elements. First one is a name of + field inside :py:class:`pyderasn.Sequence`, + defining with that OID. Second element is a + ``{OID: pyderasn.Obj()}`` dictionary, mapping + between current OID value and structure applied + to defined field. + :ref:`Read about DEFINED BY ` :param bytes impl: override default tag with ``IMPLICIT`` one :param bytes expl: override default tag with ``EXPLICIT`` one :param default: set default value. Type same as in ``value`` @@ -2246,6 +2386,7 @@ class ObjectIdentifier(Obj): ) if self._value is None: self._value = default + self.defines = defines def __add__(self, their): if isinstance(their, self.__class__): @@ -2283,6 +2424,7 @@ class ObjectIdentifier(Obj): def copy(self): obj = self.__class__() obj._value = self._value + obj.defines = self.defines obj.tag = self.tag obj._expl = self._expl obj.default = self.default @@ -2324,6 +2466,7 @@ class ObjectIdentifier(Obj): def __call__( self, value=None, + defines=None, impl=None, expl=None, default=None, @@ -2331,6 +2474,7 @@ class ObjectIdentifier(Obj): ): return self.__class__( value=value, + defines=self.defines if defines is None else defines, impl=self.tag if impl is None else impl, expl=self._expl if expl is None else expl, default=self.default if default is None else default, @@ -2356,7 +2500,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3123,7 +3267,7 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): for choice, spec in self.specs.items(): try: value, tail = spec.decode( @@ -3131,6 +3275,7 @@ class Choice(Obj): offset=offset, leavemm=True, decode_path=decode_path + (choice,), + defines_by_path=defines_by_path, ) except TagMismatch: continue @@ -3219,7 +3364,7 @@ class Any(Obj): >>> hexenc(bytes(a)) b'0x040x0bhello world' """ - __slots__ = () + __slots__ = ("defined",) tag_default = tag_encode(0) asn1_type_name = "ANY" @@ -3240,6 +3385,7 @@ class Any(Obj): """ super(Any, self).__init__(None, expl, None, optional, _decoded) self._value = None if value is None else self._value_sanitize(value) + self.defined = None def _value_sanitize(self, value): if isinstance(value, self.__class__): @@ -3296,7 +3442,7 @@ class Any(Obj): self._assert_ready() return self._value - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, tlen, lv = tag_strip(tlv) l, llen, v = len_decode(lv) @@ -3347,12 +3493,30 @@ class Any(Obj): expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, ) + defined_by, defined = self.defined or (None, None) + if defined_by is not None: + yield defined.pps( + decode_path=decode_path + (decode_path_defby(defined_by),) + ) ######################################################################## # ASN.1 constructed types ######################################################################## +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (p1 != any) and (p1 != p2): + break + else: + return define + + class Sequence(Obj): """``SEQUENCE`` structure type @@ -3566,7 +3730,7 @@ class Sequence(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3601,20 +3765,65 @@ class Sequence(Obj): v, tail = v[:l], v[l:] sub_offset = offset + tlen + llen values = {} + defines = {} for name, spec in self.specs.items(): if len(v) == 0 and spec.optional: continue + sub_decode_path = decode_path + (name,) try: value, v_tail = spec.decode( v, sub_offset, leavemm=True, - decode_path=decode_path + (name,), + decode_path=sub_decode_path, + defines_by_path=defines_by_path, ) except TagMismatch: if spec.optional: continue raise + + defined = defines.pop(name, None) + if defined is not None: + defined_by, defined_spec = defined + if issubclass(value.__class__, SequenceOf): + for i, _value in enumerate(value): + sub_sub_decode_path = sub_decode_path + ( + str(i), + decode_path_defby(defined_by), + ) + defined_value, defined_tail = defined_spec.decode( + memoryview(bytes(_value)), + sub_offset + value.tlen + value.llen, + leavemm=True, + decode_path=sub_sub_decode_path, + defines_by_path=defines_by_path, + ) + if len(defined_tail) > 0: + raise DecodeError( + "remaining data", + klass=self.__class__, + decode_path=sub_sub_decode_path, + offset=offset, + ) + _value.defined = (defined_by, defined_value) + else: + defined_value, defined_tail = defined_spec.decode( + memoryview(bytes(value)), + sub_offset + value.tlen + value.llen, + leavemm=True, + decode_path=sub_decode_path + (decode_path_defby(defined_by),), + defines_by_path=defines_by_path, + ) + if len(defined_tail) > 0: + raise DecodeError( + "remaining data", + klass=self.__class__, + decode_path=sub_decode_path + (decode_path_defby(defined_by),), + offset=offset, + ) + value.defined = (defined_by, defined_value) + sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) v = v_tail if spec.default is not None and value == spec.default: @@ -3622,6 +3831,15 @@ class Sequence(Obj): # but we allow that anyway continue values[name] = value + + spec_defines = getattr(spec, "defines", None) + if defines_by_path is not None and spec_defines is None: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None: + what, schema = spec_defines + defined = schema.get(value, None) + if defined is not None: + defines[what] = (value, defined) if len(v) > 0: raise DecodeError( "remaining data", @@ -3690,7 +3908,7 @@ class Set(Sequence): v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3733,6 +3951,7 @@ class Set(Sequence): sub_offset, leavemm=True, decode_path=decode_path + (name,), + defines_by_path=defines_by_path, ) except TagMismatch: continue @@ -3942,7 +4161,7 @@ class SequenceOf(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=()): + def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3984,6 +4203,7 @@ class SequenceOf(Obj): sub_offset, leavemm=True, decode_path=decode_path + (str(len(_value)),), + defines_by_path=defines_by_path, ) sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) v = v_tail diff --git a/tests/test_crts.py b/tests/test_crts.py index 5a839eb..a096412 100644 --- a/tests/test_crts.py +++ b/tests/test_crts.py @@ -37,6 +37,7 @@ from pyderasn import SequenceOf from pyderasn import SetOf from pyderasn import tag_ctxc from pyderasn import tag_ctxp +from pyderasn import TeletexString from pyderasn import UTCTime @@ -88,9 +89,22 @@ class AttributeValue(Any): pass +class OrganizationName(Choice): + schema = ( + ('printableString', PrintableString()), + ('teletexString', TeletexString()), + ) + + class AttributeTypeAndValue(Sequence): schema = ( - ("type", AttributeType()), + ("type", AttributeType(defines=("value", { + ObjectIdentifier("2.5.4.6"): PrintableString(), + ObjectIdentifier("2.5.4.8"): PrintableString(), + ObjectIdentifier("2.5.4.7"): PrintableString(), + ObjectIdentifier("2.5.4.10"): OrganizationName(), + ObjectIdentifier("2.5.4.3"): PrintableString(), + }))), ("value", AttributeValue()), ) diff --git a/tests/test_pyderasn.py b/tests/test_pyderasn.py index fdc03b5..2312726 100644 --- a/tests/test_pyderasn.py +++ b/tests/test_pyderasn.py @@ -56,6 +56,7 @@ from pyderasn import BMPString from pyderasn import Boolean from pyderasn import BoundsError from pyderasn import Choice +from pyderasn import decode_path_defby from pyderasn import DecodeError from pyderasn import Enumerated from pyderasn import GeneralizedTime @@ -85,6 +86,7 @@ from pyderasn import SequenceOf from pyderasn import Set from pyderasn import SetOf from pyderasn import tag_ctxc +from pyderasn import tag_ctxp from pyderasn import tag_decode from pyderasn import tag_encode from pyderasn import tag_strip @@ -1953,12 +1955,12 @@ class TestObjectIdentifier(CommonMixin, TestCase): _decoded_initial, ) = d.draw(oid_values_strategy()) obj_initial = klass( - value_initial, - impl_initial, - expl_initial, - default_initial, - optional_initial or False, - _decoded_initial, + value=value_initial, + impl=impl_initial, + expl=expl_initial, + default=default_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, ) ( value, @@ -1968,7 +1970,13 @@ class TestObjectIdentifier(CommonMixin, TestCase): optional, _decoded, ) = d.draw(oid_values_strategy(do_expl=impl_initial is None)) - obj = obj_initial(value, impl, expl, default, optional) + obj = obj_initial( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + ) if obj.ready: value_expected = default if value is None else value value_expected = ( @@ -1992,7 +2000,22 @@ class TestObjectIdentifier(CommonMixin, TestCase): @given(oid_values_strategy()) def test_copy(self, values): for klass in (ObjectIdentifier, ObjectIdentifierInherited): - obj = klass(*values) + ( + value, + impl, + expl, + default, + optional, + _decoded, + ) = values + obj = klass( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + _decoded=_decoded, + ) obj_copied = obj.copy() self.assert_copied_basic_fields(obj, obj_copied) self.assertEqual(obj._value, obj_copied._value) @@ -4861,3 +4884,163 @@ class TestAutoAddSlots(TestCase): with self.assertRaises(AttributeError): inher = Inher() inher.unexistent = "whatever" + + +class TestOIDDefines(TestCase): + @given(data_strategy()) + def runTest(self, d): + value_names = list(d.draw(sets(text_letters(), min_size=1, max_size=10))) + value_name_chosen = d.draw(sampled_from(value_names)) + oids = [ + ObjectIdentifier(oid) + for oid in d.draw(sets(oid_strategy(), min_size=2, max_size=10)) + ] + oid_chosen = d.draw(sampled_from(oids)) + values = d.draw(lists( + integers(), + min_size=len(value_names), + max_size=len(value_names), + )) + _schema = [ + ("type", ObjectIdentifier(defines=(value_name_chosen, { + oid: Integer() for oid in oids[:-1] + }))), + ] + for i, value_name in enumerate(value_names): + _schema.append((value_name, Any(expl=tag_ctxp(i)))) + + class Seq(Sequence): + schema = _schema + seq = Seq() + for value_name, value in zip(value_names, values): + seq[value_name] = Any(Integer(value).encode()) + seq["type"] = oid_chosen + seq, _ = Seq().decode(seq.encode()) + for value_name in value_names: + if value_name == value_name_chosen: + continue + self.assertIsNone(seq[value_name].defined) + if value_name_chosen in oids[:-1]: + self.assertIsNotNone(seq[value_name_chosen].defined) + self.assertEqual(seq[value_name_chosen].defined[0], oid_chosen) + self.assertIsInstance(seq[value_name_chosen].defined[1], Integer) + + +class TestDefinesByPath(TestCase): + def runTest(self): + class Seq(Sequence): + schema = ( + ("type", ObjectIdentifier()), + ("value", OctetString(expl=tag_ctxc(123))), + ) + + class SeqInner(Sequence): + schema = ( + ("typeInner", ObjectIdentifier()), + ("valueInner", Any()), + ) + + class PairValue(SetOf): + schema = Any() + + class Pair(Sequence): + schema = ( + ("type", ObjectIdentifier()), + ("value", PairValue()), + ) + + class Pairs(SequenceOf): + schema = Pair() + + ( + type_integered, + type_sequenced, + type_innered, + type_octet_stringed, + ) = [ + ObjectIdentifier(oid) + for oid in sets(oid_strategy(), min_size=4, max_size=4).example() + ] + seq_integered = Seq() + seq_integered["type"] = type_integered + seq_integered["value"] = OctetString(Integer(123).encode()) + seq_integered_raw = seq_integered.encode() + + pairs = Pairs() + pairs_input = ( + (type_octet_stringed, OctetString(b"whatever")), + (type_integered, Integer(123)), + (type_octet_stringed, OctetString(b"whenever")), + (type_integered, Integer(234)), + ) + for t, v in pairs_input: + pair = Pair() + pair["type"] = t + pair["value"] = PairValue((Any(v),)) + pairs.append(pair) + seq_inner = SeqInner() + seq_inner["typeInner"] = type_innered + seq_inner["valueInner"] = Any(pairs) + seq_sequenced = Seq() + seq_sequenced["type"] = type_sequenced + seq_sequenced["value"] = OctetString(seq_inner.encode()) + seq_sequenced_raw = seq_sequenced.encode() + + defines_by_path = [] + seq_integered, _ = Seq().decode(seq_integered_raw) + self.assertIsNone(seq_integered["value"].defined) + defines_by_path.append( + (("type",), ("value", { + type_integered: Integer(), + type_sequenced: SeqInner(), + })) + ) + seq_integered, _ = Seq().decode(seq_integered_raw, defines_by_path=defines_by_path) + self.assertIsNotNone(seq_integered["value"].defined) + self.assertEqual(seq_integered["value"].defined[0], type_integered) + self.assertEqual(seq_integered["value"].defined[1], Integer(123)) + + seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path) + self.assertIsNotNone(seq_sequenced["value"].defined) + self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced) + seq_inner = seq_sequenced["value"].defined[1] + self.assertIsNone(seq_inner["valueInner"].defined) + + defines_by_path.append(( + ("value", decode_path_defby(type_sequenced), "typeInner"), + ("valueInner", {type_innered: Pairs()}), + )) + seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path) + self.assertIsNotNone(seq_sequenced["value"].defined) + self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced) + seq_inner = seq_sequenced["value"].defined[1] + self.assertIsNotNone(seq_inner["valueInner"].defined) + self.assertEqual(seq_inner["valueInner"].defined[0], type_innered) + pairs = seq_inner["valueInner"].defined[1] + for pair in pairs: + self.assertIsNone(pair["value"][0].defined) + + defines_by_path.append(( + ( + "value", + decode_path_defby(type_sequenced), + "valueInner", + decode_path_defby(type_innered), + any, + "type", + ), + ("value", { + type_integered: Integer(), + type_octet_stringed: OctetString(), + }), + )) + seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path) + self.assertIsNotNone(seq_sequenced["value"].defined) + self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced) + seq_inner = seq_sequenced["value"].defined[1] + self.assertIsNotNone(seq_inner["valueInner"].defined) + self.assertEqual(seq_inner["valueInner"].defined[0], type_innered) + pairs_got = seq_inner["valueInner"].defined[1] + for pair_input, pair_got in zip(pairs_input, pairs_got): + self.assertEqual(pair_got["value"][0].defined[0], pair_input[0]) + self.assertEqual(pair_got["value"][0].defined[1], pair_input[1])