]> Cypherpunks.ru repositories - pyderasn.git/commitdiff
DEFINED BY support
authorSergey Matveev <stargrave@stargrave.org>
Sun, 29 Oct 2017 16:13:42 +0000 (19:13 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sun, 29 Oct 2017 16:19:22 +0000 (19:19 +0300)
README
VERSION
doc/examples.rst
doc/features.rst
doc/news.rst
pyderasn.py
tests/test_crts.py
tests/test_pyderasn.py

diff --git a/README b/README
index 607fedc0df44b5fd4af1d2dcb3de9169bd0270bb..8d6d2a070746ffe001ecf003e08fd1220ce6a42c 100644 (file)
--- a/README
+++ b/README
@@ -10,10 +10,11 @@ PyDERASN -- ASN.1 DER library for Python
 * Working with sequences as high level data objects with ability to
   (un)marshall them
 * Python 2.7/3.5 compatibility
-* __slots__ friendliness
+* Automatic decoding of DEFINED BY fields
 * Ability to know exact decoded objects offset and lengths in the binary
 * Pretty printer and command-line decoder, that could conveniently
   replace utilities like either dumpasn1 or openssl asn1parse
+* __slots__ friendliness
 
 pyderasn is free software: see the file COPYING.LESSER for copying conditions.
 
diff --git a/VERSION b/VERSION
index 7e32cd56983e65ffbfcfeb39146e7ee67e986e10..c068b2447cc22327bcac3f4884de3b32025c5bb2 100644 (file)
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.3
+1.4
index 108048e33036fe54d70f56d0bdf87ddbe315fc88..a9e57b66e2e88d8623e5b971660de824fa1a8f01 100644 (file)
@@ -415,3 +415,54 @@ Let's create some simple self-signed X.509 certificate from the ground::
     crt.encode()
 
 And we will get the same certificate used in Go's library tests.
+
+DEFINED BY fields
+-----------------
+
+Here is only very simple example how you can define Any/OctetString
+fields automatic decoding::
+
+    class AttributeTypeAndValue(Sequence):
+        schema = (
+            ("type", AttributeType(defines=("value", {
+                id_at_countryName: PrintableString(),
+                id_at_stateOrProvinceName: PrintableString(),
+                id_at_localityName: PrintableString(),
+                id_at_organizationName: PrintableString(),
+                id_at_commonName: PrintableString(),
+            }))),
+            ("value", AttributeValue()),
+        )
+
+And when you will try to decode X.509 certificate with it, your pretty
+printer will show::
+
+       34   [0,0, 149]  . . issuer: Name CHOICE rdnSequence
+       34   [1,2, 146]  . . . rdnSequence: RDNSequence SEQUENCE OF
+       37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
+       39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
+       46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:02:58:58
+       46   [1,1,   2]  . . . . . . . DEFINED BY (2.5.4.6): PrintableString PrintableString XX
+       50   [1,1,  19]  . . . . 1: RelativeDistinguishedName SET OF
+       52   [1,1,  17]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       54   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
+       59   [0,0,  12]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:0A:53:6F:6D:65:2D:53:74:61:74:65
+       59   [1,1,  10]  . . . . . . . DEFINED BY (2.5.4.8): PrintableString PrintableString Some-State
+       71   [1,1,  13]  . . . . 2: RelativeDistinguishedName SET OF
+       73   [1,1,  11]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       75   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
+       80   [0,0,   6]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:04:43:69:74:79
+       80   [1,1,   4]  . . . . . . . DEFINED BY (2.5.4.7): PrintableString PrintableString City
+       86   [1,1,  33]  . . . . 3: RelativeDistinguishedName SET OF
+       88   [1,1,  31]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       90   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-organizationName (2.5.4.10)
+       95   [0,0,  26]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:18:49:6E:74:65:72:6E:65:74:20:57:69:64:67:69
+                        . . . . . . . 74:73:20:50:74:79:20:4C:74:64
+       95   [1,1,  24]  . . . . . . . DEFINED BY (2.5.4.10): PrintableString PrintableString Internet Widgits Pty Ltd
+
+:ref:`Read more <definedby>` about that feature.
index e6afdd3a96687d2d36a207474fd5d74a43be60e6..d2d1734eb779b6545132b698bf65344d281f66d9 100644 (file)
@@ -4,7 +4,7 @@ Features
 * Basic ASN.1 data types (X.208): BOOLEAN, INTEGER, BIT STRING, OCTET
   STRING, NULL, OBJECT IDENTIFIER, ENUMERATED, all strings, UTCTime,
   GeneralizedTime, CHOICE, ANY, SEQUENCE (OF), SET (OF)
-* Size constraints checking
+* Size :ref:`constraints <bounds>` checking
 * Working with sequences as high level data objects with ability to
   (un)marshall them
 * Python 2.7/3.5 compatibility
@@ -15,14 +15,17 @@ practice it should be relatively easy to convert ``pyasn1``'s code to
 ``pyderasn``'s one. But additionally it offers:
 
 * Small, simple and trying to be reviewable code. Just a single file
-* ``__slots__`` friendliness
-* Ability to know exact decoded objects offsets and lengths in the binary
-* Pretty printer and command-line decoder, that could conveniently
-  replace utilities like either ``dumpasn1`` or ``openssl asn1parse``
+* Automatic decoding of :ref:`DEFINED BY <definedby>` fields
+* Ability to know :ref:`exact decoded <decoding>` objects offsets and
+  lengths inside the binary
+* :ref:`Pretty printer <pprinting>` and command-line decoder, that could
+  conveniently replace utilities like either ``dumpasn1`` or
+  ``openssl asn1parse``
 * Some kind of strong typing: SEQUENCEs require the exact **type** of
   settable values, even when they are inherited
 * However they do not require tags matching: IMPLICIT/EXPLICIT tags will
   be set automatically in the given sequence
+* ``__slots__`` friendliness
 * Could be significantly faster. For example parsing of CACert.org's CRL
   under Python 3.5.2:
 
index c80cf9b8f88b100473d4aee2f7d4d9d5be4adcf7..bd71b82769d5339baeaafca6f249ba5dd9a8d6c9 100644 (file)
@@ -1,6 +1,13 @@
 News
 ====
 
+.. _release1.4:
+
+1.4
+---
+Ability to automatically decode :ref:`DEFINED BY <definedby>` fields
+inside SEQUENCEs.
+
 .. _release1.3:
 
 1.3
index 32ce73c664a72155d64844ac42ef434a7ce1a709..56eec8d7953e62d6b63295b80795d1dd148d8aed 100755 (executable)
@@ -135,6 +135,8 @@ example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
 When default argument is used and value is not specified, then it equals
 to default one.
 
+.. _bounds:
+
 Size constraints
 ________________
 
@@ -164,8 +166,10 @@ then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
 All objects have ``copy()`` method, returning its copy, that can be safely
 mutated.
 
+.. _decoding:
+
 Decoding
-________
+--------
 
 Decoding is performed using ``decode()`` method. ``offset`` optional
 argument could be used to set initial object's offset in the binary
@@ -191,8 +195,10 @@ lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen``
 
 When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
 
+.. _pprinting:
+
 Pretty printing
-_______________
+---------------
 
 All objects have ``pps()`` method, that is a generator of
 :py:class:`pyderasn.PP` namedtuple, holding various raw information
@@ -209,6 +215,116 @@ all object ``repr``. But it is easy to write custom formatters.
     >>> print(pprint(obj))
         0   [1,1,   2] INTEGER -12345
 
+.. _definedby:
+
+DEFINED BY
+----------
+
+ASN.1 structures often have ANY and OCTET STRING fields, that are
+DEFINED BY some previously met ObjectIdentifier. This library provides
+ability to specify mapping between some OID and field that must be
+decoded with specific specification.
+
+defines kwarg
+_____________
+
+:py:class:`pyderasn.ObjectIdentifier` field inside
+:py:class:`pyderasn.Sequence` can hold mapping between OIDs and
+necessary for decoding structrures. For example, CMS (:rfc:`5652`)
+container::
+
+    class ContentInfo(Sequence):
+        schema = (
+            ("contentType", ContentType(defines=("content", {
+                id_digestedData: DigestedData(),
+                id_signedData: SignedData(),
+            }))),
+            ("content", Any(expl=tag_ctxc(0))),
+        )
+
+``contentType`` field tells that it defines that ``content`` must be
+decoded with ``SignedData`` specification, if ``contentType`` equals to
+``id-signedData``. The same applies to ``DigestedData``. If
+``contentType`` contains unknown OID, then no automatic decoding is
+done.
+
+Following types can be automatically decoded (DEFINED BY):
+
+* :py:class:`pyderasn.Any`
+* :py:class:`pyderasn.OctetString`
+* :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
+  ``Any``/``OctetString``-s
+
+When any of those fields is automatically decoded, then ``.defined``
+attribute contains ``(OID, value)`` tuple. OID tell by which OID it was
+defined, ``value`` contains corresponding decoded value. For example
+above, ``content_info["content"].defined == (id_signedData,
+signed_data)``.
+
+defines_by_path kwarg
+_____________________
+
+Sometimes you either can not or do not want to explicitly set *defines*
+in the scheme. You can dynamically apply those definitions when calling
+``.decode()`` method.
+
+Decode method takes optional ``defines_by_path`` keyword argument that
+must be sequence of following tuples::
+
+    (decode_path, defines)
+
+where ``decode_path`` is a tuple holding so-called decode path to the
+exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
+``defines``, holding exactly the same value as accepted in its keyword
+argument.
+
+For example, again for CMS, you want to automatically decode
+``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
+structures it may hold. Also, automatically decode ``controlSequence``
+of ``PKIResponse``::
+
+    content_info, tail = ContentInfo().decode(data, defines_by_path=(
+        (
+            ("contentType",),
+            ("content", {id_signedData: SignedData()}),
+        ),
+        (
+            (
+                "content",
+                decode_path_defby(id_signedData),
+                "encapContentInfo",
+                "eContentType",
+            ),
+            ("eContent", {
+                id_cct_PKIData: PKIData(),
+                id_cct_PKIResponse: PKIResponse(),
+            }),
+        ),
+        (
+            (
+                "content",
+                decode_path_defby(id_signedData),
+                "encapContentInfo",
+                "eContent",
+                decode_path_defby(id_cct_PKIResponse),
+                "controlSequence",
+                any,
+                "attrType",
+            ),
+            ("attrValues", {
+                id_cmc_recipientNonce: RecipientNonce(),
+                id_cmc_senderNonce: SenderNonce(),
+                id_cmc_statusInfoV2: CMCStatusInfoV2(),
+                id_cmc_transactionId: TransactionId(),
+            }),
+        ),
+    ))
+
+Pay attention for :py:func:`pyderasn.decode_path_defby` and ``any``.
+First function is useful for path construction when some automatic
+decoding is already done. ``any`` is used for human readability and
+means literally any value it meet -- useful for sequence and set of-s.
+
 Primitive types
 ---------------
 
@@ -338,6 +454,7 @@ __all__ = (
     "Boolean",
     "BoundsError",
     "Choice",
+    "decode_path_defby",
     "DecodeError",
     "Enumerated",
     "GeneralizedTime",
@@ -747,7 +864,7 @@ class Obj(object):
     def _encode(self):  # pragma: no cover
         raise NotImplementedError()
 
-    def _decode(self, tlv, offset=0, decode_path=()):  # pragma: no cover
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):  # pragma: no cover
         raise NotImplementedError()
 
     def encode(self):
@@ -756,13 +873,14 @@ class Obj(object):
             return raw
         return b"".join((self._expl, len_encode(len(raw)), raw))
 
-    def decode(self, data, offset=0, leavemm=False, decode_path=()):
+    def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None):
         """Decode the data
 
         :param data: either binary or memoryview
         :param int offset: initial data's offset
         :param bool leavemm: do we need to leave memoryview of remaining
                     data as is, or convert it to bytes otherwise
+        :param defines_by_path: :ref:`Read about DEFINED BY <definedby>`
         :returns: (Obj, remaining data)
         """
         tlv = memoryview(data)
@@ -771,6 +889,7 @@ class Obj(object):
                 tlv,
                 offset,
                 decode_path=decode_path,
+                defines_by_path=defines_by_path,
             )
         else:
             try:
@@ -808,6 +927,7 @@ class Obj(object):
                 v,
                 offset=offset + tlen + llen,
                 decode_path=decode_path,
+                defines_by_path=defines_by_path,
             )
         return obj, (tail if leavemm else tail.tobytes())
 
@@ -840,6 +960,12 @@ class Obj(object):
         return self.expl_tlen + self.expl_llen + self.expl_vlen
 
 
+def decode_path_defby(defined_by):
+    """DEFINED BY representation inside decode path
+    """
+    return "DEFINED BY (%s)" % defined_by
+
+
 ########################################################################
 # Pretty printing
 ########################################################################
@@ -1109,7 +1235,7 @@ class Boolean(Obj):
             (b"\xFF" if self._value else b"\x00"),
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1402,7 +1528,7 @@ class Integer(Obj):
                     break
         return b"".join((self.tag, len_encode(len(octets)), octets))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1735,7 +1861,7 @@ class BitString(Obj):
             octets,
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1856,7 +1982,7 @@ class OctetString(Obj):
     >>> OctetString(b"hell", bounds=(4, 4))
     OCTET STRING 4 bytes 68656c6c
     """
-    __slots__ = ("_bound_min", "_bound_max")
+    __slots__ = ("_bound_min", "_bound_max", "defined")
     tag_default = tag_encode(4)
     asn1_type_name = "OCTET STRING"
 
@@ -1904,6 +2030,7 @@ class OctetString(Obj):
             )
             if self._value is None:
                 self._value = default
+        self.defined = None
 
     def _value_sanitize(self, value):
         if issubclass(value.__class__, OctetString):
@@ -1981,7 +2108,7 @@ class OctetString(Obj):
             self._value,
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2056,6 +2183,11 @@ class OctetString(Obj):
             expl_llen=self.expl_llen if self.expled else None,
             expl_vlen=self.expl_vlen if self.expled else None,
         )
+        defined_by, defined = self.defined or (None, None)
+        if defined_by is not None:
+            yield defined.pps(
+                decode_path=decode_path + (decode_path_defby(defined_by),)
+            )
 
 
 class Null(Obj):
@@ -2125,7 +2257,7 @@ class Null(Obj):
     def _encode(self):
         return self.tag + len_encode(0)
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2205,13 +2337,14 @@ class ObjectIdentifier(Obj):
     Traceback (most recent call last):
     pyderasn.InvalidOID: unacceptable first arc value
     """
-    __slots__ = ()
+    __slots__ = ("defines",)
     tag_default = tag_encode(6)
     asn1_type_name = "OBJECT IDENTIFIER"
 
     def __init__(
             self,
             value=None,
+            defines=None,
             impl=None,
             expl=None,
             default=None,
@@ -2222,6 +2355,13 @@ class ObjectIdentifier(Obj):
         :param value: set the value. Either tuples of integers,
                       string of "."-concatenated integers, or
                       :py:class:`pyderasn.ObjectIdentifier` object
+        :param defines: tuple of two elements. First one is a name of
+                        field inside :py:class:`pyderasn.Sequence`,
+                        defining with that OID. Second element is a
+                        ``{OID: pyderasn.Obj()}`` dictionary, mapping
+                        between current OID value and structure applied
+                        to defined field.
+                        :ref:`Read about DEFINED BY <definedby>`
         :param bytes impl: override default tag with ``IMPLICIT`` one
         :param bytes expl: override default tag with ``EXPLICIT`` one
         :param default: set default value. Type same as in ``value``
@@ -2246,6 +2386,7 @@ class ObjectIdentifier(Obj):
             )
             if self._value is None:
                 self._value = default
+        self.defines = defines
 
     def __add__(self, their):
         if isinstance(their, self.__class__):
@@ -2283,6 +2424,7 @@ class ObjectIdentifier(Obj):
     def copy(self):
         obj = self.__class__()
         obj._value = self._value
+        obj.defines = self.defines
         obj.tag = self.tag
         obj._expl = self._expl
         obj.default = self.default
@@ -2324,6 +2466,7 @@ class ObjectIdentifier(Obj):
     def __call__(
             self,
             value=None,
+            defines=None,
             impl=None,
             expl=None,
             default=None,
@@ -2331,6 +2474,7 @@ class ObjectIdentifier(Obj):
     ):
         return self.__class__(
             value=value,
+            defines=self.defines if defines is None else defines,
             impl=self.tag if impl is None else impl,
             expl=self._expl if expl is None else expl,
             default=self.default if default is None else default,
@@ -2356,7 +2500,7 @@ class ObjectIdentifier(Obj):
         v = b"".join(octets)
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3123,7 +3267,7 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].encode()
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         for choice, spec in self.specs.items():
             try:
                 value, tail = spec.decode(
@@ -3131,6 +3275,7 @@ class Choice(Obj):
                     offset=offset,
                     leavemm=True,
                     decode_path=decode_path + (choice,),
+                    defines_by_path=defines_by_path,
                 )
             except TagMismatch:
                 continue
@@ -3219,7 +3364,7 @@ class Any(Obj):
     >>> hexenc(bytes(a))
     b'0x040x0bhello world'
     """
-    __slots__ = ()
+    __slots__ = ("defined",)
     tag_default = tag_encode(0)
     asn1_type_name = "ANY"
 
@@ -3240,6 +3385,7 @@ class Any(Obj):
         """
         super(Any, self).__init__(None, expl, None, optional, _decoded)
         self._value = None if value is None else self._value_sanitize(value)
+        self.defined = None
 
     def _value_sanitize(self, value):
         if isinstance(value, self.__class__):
@@ -3296,7 +3442,7 @@ class Any(Obj):
         self._assert_ready()
         return self._value
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
             l, llen, v = len_decode(lv)
@@ -3347,12 +3493,30 @@ class Any(Obj):
             expl_llen=self.expl_llen if self.expled else None,
             expl_vlen=self.expl_vlen if self.expled else None,
         )
+        defined_by, defined = self.defined or (None, None)
+        if defined_by is not None:
+            yield defined.pps(
+                decode_path=decode_path + (decode_path_defby(defined_by),)
+            )
 
 
 ########################################################################
 # ASN.1 constructed types
 ########################################################################
 
+def get_def_by_path(defines_by_path, sub_decode_path):
+    """Get define by decode path
+    """
+    for path, define in defines_by_path:
+        if len(path) != len(sub_decode_path):
+            continue
+        for p1, p2 in zip(path, sub_decode_path):
+            if (p1 != any) and (p1 != p2):
+                break
+        else:
+            return define
+
+
 class Sequence(Obj):
     """``SEQUENCE`` structure type
 
@@ -3566,7 +3730,7 @@ class Sequence(Obj):
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3601,20 +3765,65 @@ class Sequence(Obj):
         v, tail = v[:l], v[l:]
         sub_offset = offset + tlen + llen
         values = {}
+        defines = {}
         for name, spec in self.specs.items():
             if len(v) == 0 and spec.optional:
                 continue
+            sub_decode_path = decode_path + (name,)
             try:
                 value, v_tail = spec.decode(
                     v,
                     sub_offset,
                     leavemm=True,
-                    decode_path=decode_path + (name,),
+                    decode_path=sub_decode_path,
+                    defines_by_path=defines_by_path,
                 )
             except TagMismatch:
                 if spec.optional:
                     continue
                 raise
+
+            defined = defines.pop(name, None)
+            if defined is not None:
+                defined_by, defined_spec = defined
+                if issubclass(value.__class__, SequenceOf):
+                    for i, _value in enumerate(value):
+                        sub_sub_decode_path = sub_decode_path + (
+                            str(i),
+                            decode_path_defby(defined_by),
+                        )
+                        defined_value, defined_tail = defined_spec.decode(
+                            memoryview(bytes(_value)),
+                            sub_offset + value.tlen + value.llen,
+                            leavemm=True,
+                            decode_path=sub_sub_decode_path,
+                            defines_by_path=defines_by_path,
+                        )
+                        if len(defined_tail) > 0:
+                            raise DecodeError(
+                                "remaining data",
+                                klass=self.__class__,
+                                decode_path=sub_sub_decode_path,
+                                offset=offset,
+                            )
+                        _value.defined = (defined_by, defined_value)
+                else:
+                    defined_value, defined_tail = defined_spec.decode(
+                        memoryview(bytes(value)),
+                        sub_offset + value.tlen + value.llen,
+                        leavemm=True,
+                        decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+                        defines_by_path=defines_by_path,
+                    )
+                    if len(defined_tail) > 0:
+                        raise DecodeError(
+                            "remaining data",
+                            klass=self.__class__,
+                            decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+                            offset=offset,
+                        )
+                    value.defined = (defined_by, defined_value)
+
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
             if spec.default is not None and value == spec.default:
@@ -3622,6 +3831,15 @@ class Sequence(Obj):
                 # but we allow that anyway
                 continue
             values[name] = value
+
+            spec_defines = getattr(spec, "defines", None)
+            if defines_by_path is not None and spec_defines is None:
+                spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+            if spec_defines is not None:
+                what, schema = spec_defines
+                defined = schema.get(value, None)
+                if defined is not None:
+                    defines[what] = (value, defined)
         if len(v) > 0:
             raise DecodeError(
                 "remaining data",
@@ -3690,7 +3908,7 @@ class Set(Sequence):
         v = b"".join(raws)
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3733,6 +3951,7 @@ class Set(Sequence):
                         sub_offset,
                         leavemm=True,
                         decode_path=decode_path + (name,),
+                        defines_by_path=defines_by_path,
                     )
                 except TagMismatch:
                     continue
@@ -3942,7 +4161,7 @@ class SequenceOf(Obj):
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3984,6 +4203,7 @@ class SequenceOf(Obj):
                 sub_offset,
                 leavemm=True,
                 decode_path=decode_path + (str(len(_value)),),
+                defines_by_path=defines_by_path,
             )
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
index 5a839eb4d0059a1edf7d22056354226a9cd576ea..a096412333df3248740d3a467ba2eb43d47aa3d7 100644 (file)
@@ -37,6 +37,7 @@ from pyderasn import SequenceOf
 from pyderasn import SetOf
 from pyderasn import tag_ctxc
 from pyderasn import tag_ctxp
+from pyderasn import TeletexString
 from pyderasn import UTCTime
 
 
@@ -88,9 +89,22 @@ class AttributeValue(Any):
     pass
 
 
+class OrganizationName(Choice):
+    schema = (
+        ('printableString', PrintableString()),
+        ('teletexString', TeletexString()),
+    )
+
+
 class AttributeTypeAndValue(Sequence):
     schema = (
-        ("type", AttributeType()),
+        ("type", AttributeType(defines=("value", {
+            ObjectIdentifier("2.5.4.6"): PrintableString(),
+            ObjectIdentifier("2.5.4.8"): PrintableString(),
+            ObjectIdentifier("2.5.4.7"): PrintableString(),
+            ObjectIdentifier("2.5.4.10"): OrganizationName(),
+            ObjectIdentifier("2.5.4.3"): PrintableString(),
+        }))),
         ("value", AttributeValue()),
     )
 
index fdc03b58369cb8bcd7607f5a9a5c4d7b56661ea8..23127261feb9175309eea0f8b6e41356f0e7cf56 100644 (file)
@@ -56,6 +56,7 @@ from pyderasn import BMPString
 from pyderasn import Boolean
 from pyderasn import BoundsError
 from pyderasn import Choice
+from pyderasn import decode_path_defby
 from pyderasn import DecodeError
 from pyderasn import Enumerated
 from pyderasn import GeneralizedTime
@@ -85,6 +86,7 @@ from pyderasn import SequenceOf
 from pyderasn import Set
 from pyderasn import SetOf
 from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
 from pyderasn import tag_decode
 from pyderasn import tag_encode
 from pyderasn import tag_strip
@@ -1953,12 +1955,12 @@ class TestObjectIdentifier(CommonMixin, TestCase):
                 _decoded_initial,
             ) = d.draw(oid_values_strategy())
             obj_initial = klass(
-                value_initial,
-                impl_initial,
-                expl_initial,
-                default_initial,
-                optional_initial or False,
-                _decoded_initial,
+                value=value_initial,
+                impl=impl_initial,
+                expl=expl_initial,
+                default=default_initial,
+                optional=optional_initial or False,
+                _decoded=_decoded_initial,
             )
             (
                 value,
@@ -1968,7 +1970,13 @@ class TestObjectIdentifier(CommonMixin, TestCase):
                 optional,
                 _decoded,
             ) = d.draw(oid_values_strategy(do_expl=impl_initial is None))
-            obj = obj_initial(value, impl, expl, default, optional)
+            obj = obj_initial(
+                value=value,
+                impl=impl,
+                expl=expl,
+                default=default,
+                optional=optional,
+            )
             if obj.ready:
                 value_expected = default if value is None else value
                 value_expected = (
@@ -1992,7 +2000,22 @@ class TestObjectIdentifier(CommonMixin, TestCase):
     @given(oid_values_strategy())
     def test_copy(self, values):
         for klass in (ObjectIdentifier, ObjectIdentifierInherited):
-            obj = klass(*values)
+            (
+                value,
+                impl,
+                expl,
+                default,
+                optional,
+                _decoded,
+            ) = values
+            obj = klass(
+                value=value,
+                impl=impl,
+                expl=expl,
+                default=default,
+                optional=optional,
+                _decoded=_decoded,
+            )
             obj_copied = obj.copy()
             self.assert_copied_basic_fields(obj, obj_copied)
             self.assertEqual(obj._value, obj_copied._value)
@@ -4861,3 +4884,163 @@ class TestAutoAddSlots(TestCase):
         with self.assertRaises(AttributeError):
             inher = Inher()
             inher.unexistent = "whatever"
+
+
+class TestOIDDefines(TestCase):
+    @given(data_strategy())
+    def runTest(self, d):
+        value_names = list(d.draw(sets(text_letters(), min_size=1, max_size=10)))
+        value_name_chosen = d.draw(sampled_from(value_names))
+        oids = [
+            ObjectIdentifier(oid)
+            for oid in d.draw(sets(oid_strategy(), min_size=2, max_size=10))
+        ]
+        oid_chosen = d.draw(sampled_from(oids))
+        values = d.draw(lists(
+            integers(),
+            min_size=len(value_names),
+            max_size=len(value_names),
+        ))
+        _schema = [
+            ("type", ObjectIdentifier(defines=(value_name_chosen, {
+                oid: Integer() for oid in oids[:-1]
+            }))),
+        ]
+        for i, value_name in enumerate(value_names):
+            _schema.append((value_name, Any(expl=tag_ctxp(i))))
+
+        class Seq(Sequence):
+            schema = _schema
+        seq = Seq()
+        for value_name, value in zip(value_names, values):
+            seq[value_name] = Any(Integer(value).encode())
+        seq["type"] = oid_chosen
+        seq, _ = Seq().decode(seq.encode())
+        for value_name in value_names:
+            if value_name == value_name_chosen:
+                continue
+            self.assertIsNone(seq[value_name].defined)
+        if value_name_chosen in oids[:-1]:
+            self.assertIsNotNone(seq[value_name_chosen].defined)
+            self.assertEqual(seq[value_name_chosen].defined[0], oid_chosen)
+            self.assertIsInstance(seq[value_name_chosen].defined[1], Integer)
+
+
+class TestDefinesByPath(TestCase):
+    def runTest(self):
+        class Seq(Sequence):
+            schema = (
+                ("type", ObjectIdentifier()),
+                ("value", OctetString(expl=tag_ctxc(123))),
+            )
+
+        class SeqInner(Sequence):
+            schema = (
+                ("typeInner", ObjectIdentifier()),
+                ("valueInner", Any()),
+            )
+
+        class PairValue(SetOf):
+            schema = Any()
+
+        class Pair(Sequence):
+            schema = (
+                ("type", ObjectIdentifier()),
+                ("value", PairValue()),
+            )
+
+        class Pairs(SequenceOf):
+            schema = Pair()
+
+        (
+            type_integered,
+            type_sequenced,
+            type_innered,
+            type_octet_stringed,
+        ) = [
+            ObjectIdentifier(oid)
+            for oid in sets(oid_strategy(), min_size=4, max_size=4).example()
+        ]
+        seq_integered = Seq()
+        seq_integered["type"] = type_integered
+        seq_integered["value"] = OctetString(Integer(123).encode())
+        seq_integered_raw = seq_integered.encode()
+
+        pairs = Pairs()
+        pairs_input = (
+            (type_octet_stringed, OctetString(b"whatever")),
+            (type_integered, Integer(123)),
+            (type_octet_stringed, OctetString(b"whenever")),
+            (type_integered, Integer(234)),
+        )
+        for t, v in pairs_input:
+            pair = Pair()
+            pair["type"] = t
+            pair["value"] = PairValue((Any(v),))
+            pairs.append(pair)
+        seq_inner = SeqInner()
+        seq_inner["typeInner"] = type_innered
+        seq_inner["valueInner"] = Any(pairs)
+        seq_sequenced = Seq()
+        seq_sequenced["type"] = type_sequenced
+        seq_sequenced["value"] = OctetString(seq_inner.encode())
+        seq_sequenced_raw = seq_sequenced.encode()
+
+        defines_by_path = []
+        seq_integered, _ = Seq().decode(seq_integered_raw)
+        self.assertIsNone(seq_integered["value"].defined)
+        defines_by_path.append(
+            (("type",), ("value", {
+                type_integered: Integer(),
+                type_sequenced: SeqInner(),
+            }))
+        )
+        seq_integered, _ = Seq().decode(seq_integered_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_integered["value"].defined)
+        self.assertEqual(seq_integered["value"].defined[0], type_integered)
+        self.assertEqual(seq_integered["value"].defined[1], Integer(123))
+
+        seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_sequenced["value"].defined)
+        self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+        seq_inner = seq_sequenced["value"].defined[1]
+        self.assertIsNone(seq_inner["valueInner"].defined)
+
+        defines_by_path.append((
+            ("value", decode_path_defby(type_sequenced), "typeInner"),
+            ("valueInner", {type_innered: Pairs()}),
+        ))
+        seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_sequenced["value"].defined)
+        self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+        seq_inner = seq_sequenced["value"].defined[1]
+        self.assertIsNotNone(seq_inner["valueInner"].defined)
+        self.assertEqual(seq_inner["valueInner"].defined[0], type_innered)
+        pairs = seq_inner["valueInner"].defined[1]
+        for pair in pairs:
+            self.assertIsNone(pair["value"][0].defined)
+
+        defines_by_path.append((
+            (
+                "value",
+                decode_path_defby(type_sequenced),
+                "valueInner",
+                decode_path_defby(type_innered),
+                any,
+                "type",
+            ),
+            ("value", {
+                type_integered: Integer(),
+                type_octet_stringed: OctetString(),
+            }),
+        ))
+        seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_sequenced["value"].defined)
+        self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+        seq_inner = seq_sequenced["value"].defined[1]
+        self.assertIsNotNone(seq_inner["valueInner"].defined)
+        self.assertEqual(seq_inner["valueInner"].defined[0], type_innered)
+        pairs_got = seq_inner["valueInner"].defined[1]
+        for pair_input, pair_got in zip(pairs_input, pairs_got):
+            self.assertEqual(pair_got["value"][0].defined[0], pair_input[0])
+            self.assertEqual(pair_got["value"][0].defined[1], pair_input[1])