]> Cypherpunks.ru repositories - pyderasn.git/commitdiff
DEFINED BY support
authorSergey Matveev <stargrave@stargrave.org>
Sun, 29 Oct 2017 16:13:42 +0000 (19:13 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sun, 29 Oct 2017 16:19:22 +0000 (19:19 +0300)
README
VERSION
doc/examples.rst
doc/features.rst
doc/news.rst
pyderasn.py
tests/test_crts.py
tests/test_pyderasn.py

diff --git a/README b/README
index 607fedc0df44b5fd4af1d2dcb3de9169bd0270bb..8d6d2a070746ffe001ecf003e08fd1220ce6a42c 100644 (file)
--- a/README
+++ b/README
@@ -10,10 +10,11 @@ PyDERASN -- ASN.1 DER library for Python
 * Working with sequences as high level data objects with ability to
   (un)marshall them
 * Python 2.7/3.5 compatibility
 * Working with sequences as high level data objects with ability to
   (un)marshall them
 * Python 2.7/3.5 compatibility
-* __slots__ friendliness
+* Automatic decoding of DEFINED BY fields
 * Ability to know exact decoded objects offset and lengths in the binary
 * Pretty printer and command-line decoder, that could conveniently
   replace utilities like either dumpasn1 or openssl asn1parse
 * Ability to know exact decoded objects offset and lengths in the binary
 * Pretty printer and command-line decoder, that could conveniently
   replace utilities like either dumpasn1 or openssl asn1parse
+* __slots__ friendliness
 
 pyderasn is free software: see the file COPYING.LESSER for copying conditions.
 
 
 pyderasn is free software: see the file COPYING.LESSER for copying conditions.
 
diff --git a/VERSION b/VERSION
index 7e32cd56983e65ffbfcfeb39146e7ee67e986e10..c068b2447cc22327bcac3f4884de3b32025c5bb2 100644 (file)
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.3
+1.4
index 108048e33036fe54d70f56d0bdf87ddbe315fc88..a9e57b66e2e88d8623e5b971660de824fa1a8f01 100644 (file)
@@ -415,3 +415,54 @@ Let's create some simple self-signed X.509 certificate from the ground::
     crt.encode()
 
 And we will get the same certificate used in Go's library tests.
     crt.encode()
 
 And we will get the same certificate used in Go's library tests.
+
+DEFINED BY fields
+-----------------
+
+Here is only very simple example how you can define Any/OctetString
+fields automatic decoding::
+
+    class AttributeTypeAndValue(Sequence):
+        schema = (
+            ("type", AttributeType(defines=("value", {
+                id_at_countryName: PrintableString(),
+                id_at_stateOrProvinceName: PrintableString(),
+                id_at_localityName: PrintableString(),
+                id_at_organizationName: PrintableString(),
+                id_at_commonName: PrintableString(),
+            }))),
+            ("value", AttributeValue()),
+        )
+
+And when you will try to decode X.509 certificate with it, your pretty
+printer will show::
+
+       34   [0,0, 149]  . . issuer: Name CHOICE rdnSequence
+       34   [1,2, 146]  . . . rdnSequence: RDNSequence SEQUENCE OF
+       37   [1,1,  11]  . . . . 0: RelativeDistinguishedName SET OF
+       39   [1,1,   9]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       41   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
+       46   [0,0,   4]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:02:58:58
+       46   [1,1,   2]  . . . . . . . DEFINED BY (2.5.4.6): PrintableString PrintableString XX
+       50   [1,1,  19]  . . . . 1: RelativeDistinguishedName SET OF
+       52   [1,1,  17]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       54   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
+       59   [0,0,  12]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:0A:53:6F:6D:65:2D:53:74:61:74:65
+       59   [1,1,  10]  . . . . . . . DEFINED BY (2.5.4.8): PrintableString PrintableString Some-State
+       71   [1,1,  13]  . . . . 2: RelativeDistinguishedName SET OF
+       73   [1,1,  11]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       75   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
+       80   [0,0,   6]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:04:43:69:74:79
+       80   [1,1,   4]  . . . . . . . DEFINED BY (2.5.4.7): PrintableString PrintableString City
+       86   [1,1,  33]  . . . . 3: RelativeDistinguishedName SET OF
+       88   [1,1,  31]  . . . . . 0: AttributeTypeAndValue SEQUENCE
+       90   [1,1,   3]  . . . . . . type: AttributeType OBJECT IDENTIFIER id-at-organizationName (2.5.4.10)
+       95   [0,0,  26]  . . . . . . value: [UNIV 19] AttributeValue ANY
+                        . . . . . . . 13:18:49:6E:74:65:72:6E:65:74:20:57:69:64:67:69
+                        . . . . . . . 74:73:20:50:74:79:20:4C:74:64
+       95   [1,1,  24]  . . . . . . . DEFINED BY (2.5.4.10): PrintableString PrintableString Internet Widgits Pty Ltd
+
+:ref:`Read more <definedby>` about that feature.
index e6afdd3a96687d2d36a207474fd5d74a43be60e6..d2d1734eb779b6545132b698bf65344d281f66d9 100644 (file)
@@ -4,7 +4,7 @@ Features
 * Basic ASN.1 data types (X.208): BOOLEAN, INTEGER, BIT STRING, OCTET
   STRING, NULL, OBJECT IDENTIFIER, ENUMERATED, all strings, UTCTime,
   GeneralizedTime, CHOICE, ANY, SEQUENCE (OF), SET (OF)
 * Basic ASN.1 data types (X.208): BOOLEAN, INTEGER, BIT STRING, OCTET
   STRING, NULL, OBJECT IDENTIFIER, ENUMERATED, all strings, UTCTime,
   GeneralizedTime, CHOICE, ANY, SEQUENCE (OF), SET (OF)
-* Size constraints checking
+* Size :ref:`constraints <bounds>` checking
 * Working with sequences as high level data objects with ability to
   (un)marshall them
 * Python 2.7/3.5 compatibility
 * Working with sequences as high level data objects with ability to
   (un)marshall them
 * Python 2.7/3.5 compatibility
@@ -15,14 +15,17 @@ practice it should be relatively easy to convert ``pyasn1``'s code to
 ``pyderasn``'s one. But additionally it offers:
 
 * Small, simple and trying to be reviewable code. Just a single file
 ``pyderasn``'s one. But additionally it offers:
 
 * Small, simple and trying to be reviewable code. Just a single file
-* ``__slots__`` friendliness
-* Ability to know exact decoded objects offsets and lengths in the binary
-* Pretty printer and command-line decoder, that could conveniently
-  replace utilities like either ``dumpasn1`` or ``openssl asn1parse``
+* Automatic decoding of :ref:`DEFINED BY <definedby>` fields
+* Ability to know :ref:`exact decoded <decoding>` objects offsets and
+  lengths inside the binary
+* :ref:`Pretty printer <pprinting>` and command-line decoder, that could
+  conveniently replace utilities like either ``dumpasn1`` or
+  ``openssl asn1parse``
 * Some kind of strong typing: SEQUENCEs require the exact **type** of
   settable values, even when they are inherited
 * However they do not require tags matching: IMPLICIT/EXPLICIT tags will
   be set automatically in the given sequence
 * Some kind of strong typing: SEQUENCEs require the exact **type** of
   settable values, even when they are inherited
 * However they do not require tags matching: IMPLICIT/EXPLICIT tags will
   be set automatically in the given sequence
+* ``__slots__`` friendliness
 * Could be significantly faster. For example parsing of CACert.org's CRL
   under Python 3.5.2:
 
 * Could be significantly faster. For example parsing of CACert.org's CRL
   under Python 3.5.2:
 
index c80cf9b8f88b100473d4aee2f7d4d9d5be4adcf7..bd71b82769d5339baeaafca6f249ba5dd9a8d6c9 100644 (file)
@@ -1,6 +1,13 @@
 News
 ====
 
 News
 ====
 
+.. _release1.4:
+
+1.4
+---
+Ability to automatically decode :ref:`DEFINED BY <definedby>` fields
+inside SEQUENCEs.
+
 .. _release1.3:
 
 1.3
 .. _release1.3:
 
 1.3
index 32ce73c664a72155d64844ac42ef434a7ce1a709..56eec8d7953e62d6b63295b80795d1dd148d8aed 100755 (executable)
@@ -135,6 +135,8 @@ example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
 When default argument is used and value is not specified, then it equals
 to default one.
 
 When default argument is used and value is not specified, then it equals
 to default one.
 
+.. _bounds:
+
 Size constraints
 ________________
 
 Size constraints
 ________________
 
@@ -164,8 +166,10 @@ then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
 All objects have ``copy()`` method, returning its copy, that can be safely
 mutated.
 
 All objects have ``copy()`` method, returning its copy, that can be safely
 mutated.
 
+.. _decoding:
+
 Decoding
 Decoding
-________
+--------
 
 Decoding is performed using ``decode()`` method. ``offset`` optional
 argument could be used to set initial object's offset in the binary
 
 Decoding is performed using ``decode()`` method. ``offset`` optional
 argument could be used to set initial object's offset in the binary
@@ -191,8 +195,10 @@ lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen``
 
 When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
 
 
 When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
 
+.. _pprinting:
+
 Pretty printing
 Pretty printing
-_______________
+---------------
 
 All objects have ``pps()`` method, that is a generator of
 :py:class:`pyderasn.PP` namedtuple, holding various raw information
 
 All objects have ``pps()`` method, that is a generator of
 :py:class:`pyderasn.PP` namedtuple, holding various raw information
@@ -209,6 +215,116 @@ all object ``repr``. But it is easy to write custom formatters.
     >>> print(pprint(obj))
         0   [1,1,   2] INTEGER -12345
 
     >>> print(pprint(obj))
         0   [1,1,   2] INTEGER -12345
 
+.. _definedby:
+
+DEFINED BY
+----------
+
+ASN.1 structures often have ANY and OCTET STRING fields, that are
+DEFINED BY some previously met ObjectIdentifier. This library provides
+ability to specify mapping between some OID and field that must be
+decoded with specific specification.
+
+defines kwarg
+_____________
+
+:py:class:`pyderasn.ObjectIdentifier` field inside
+:py:class:`pyderasn.Sequence` can hold mapping between OIDs and
+necessary for decoding structrures. For example, CMS (:rfc:`5652`)
+container::
+
+    class ContentInfo(Sequence):
+        schema = (
+            ("contentType", ContentType(defines=("content", {
+                id_digestedData: DigestedData(),
+                id_signedData: SignedData(),
+            }))),
+            ("content", Any(expl=tag_ctxc(0))),
+        )
+
+``contentType`` field tells that it defines that ``content`` must be
+decoded with ``SignedData`` specification, if ``contentType`` equals to
+``id-signedData``. The same applies to ``DigestedData``. If
+``contentType`` contains unknown OID, then no automatic decoding is
+done.
+
+Following types can be automatically decoded (DEFINED BY):
+
+* :py:class:`pyderasn.Any`
+* :py:class:`pyderasn.OctetString`
+* :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
+  ``Any``/``OctetString``-s
+
+When any of those fields is automatically decoded, then ``.defined``
+attribute contains ``(OID, value)`` tuple. OID tell by which OID it was
+defined, ``value`` contains corresponding decoded value. For example
+above, ``content_info["content"].defined == (id_signedData,
+signed_data)``.
+
+defines_by_path kwarg
+_____________________
+
+Sometimes you either can not or do not want to explicitly set *defines*
+in the scheme. You can dynamically apply those definitions when calling
+``.decode()`` method.
+
+Decode method takes optional ``defines_by_path`` keyword argument that
+must be sequence of following tuples::
+
+    (decode_path, defines)
+
+where ``decode_path`` is a tuple holding so-called decode path to the
+exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
+``defines``, holding exactly the same value as accepted in its keyword
+argument.
+
+For example, again for CMS, you want to automatically decode
+``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
+structures it may hold. Also, automatically decode ``controlSequence``
+of ``PKIResponse``::
+
+    content_info, tail = ContentInfo().decode(data, defines_by_path=(
+        (
+            ("contentType",),
+            ("content", {id_signedData: SignedData()}),
+        ),
+        (
+            (
+                "content",
+                decode_path_defby(id_signedData),
+                "encapContentInfo",
+                "eContentType",
+            ),
+            ("eContent", {
+                id_cct_PKIData: PKIData(),
+                id_cct_PKIResponse: PKIResponse(),
+            }),
+        ),
+        (
+            (
+                "content",
+                decode_path_defby(id_signedData),
+                "encapContentInfo",
+                "eContent",
+                decode_path_defby(id_cct_PKIResponse),
+                "controlSequence",
+                any,
+                "attrType",
+            ),
+            ("attrValues", {
+                id_cmc_recipientNonce: RecipientNonce(),
+                id_cmc_senderNonce: SenderNonce(),
+                id_cmc_statusInfoV2: CMCStatusInfoV2(),
+                id_cmc_transactionId: TransactionId(),
+            }),
+        ),
+    ))
+
+Pay attention for :py:func:`pyderasn.decode_path_defby` and ``any``.
+First function is useful for path construction when some automatic
+decoding is already done. ``any`` is used for human readability and
+means literally any value it meet -- useful for sequence and set of-s.
+
 Primitive types
 ---------------
 
 Primitive types
 ---------------
 
@@ -338,6 +454,7 @@ __all__ = (
     "Boolean",
     "BoundsError",
     "Choice",
     "Boolean",
     "BoundsError",
     "Choice",
+    "decode_path_defby",
     "DecodeError",
     "Enumerated",
     "GeneralizedTime",
     "DecodeError",
     "Enumerated",
     "GeneralizedTime",
@@ -747,7 +864,7 @@ class Obj(object):
     def _encode(self):  # pragma: no cover
         raise NotImplementedError()
 
     def _encode(self):  # pragma: no cover
         raise NotImplementedError()
 
-    def _decode(self, tlv, offset=0, decode_path=()):  # pragma: no cover
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):  # pragma: no cover
         raise NotImplementedError()
 
     def encode(self):
         raise NotImplementedError()
 
     def encode(self):
@@ -756,13 +873,14 @@ class Obj(object):
             return raw
         return b"".join((self._expl, len_encode(len(raw)), raw))
 
             return raw
         return b"".join((self._expl, len_encode(len(raw)), raw))
 
-    def decode(self, data, offset=0, leavemm=False, decode_path=()):
+    def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None):
         """Decode the data
 
         :param data: either binary or memoryview
         :param int offset: initial data's offset
         :param bool leavemm: do we need to leave memoryview of remaining
                     data as is, or convert it to bytes otherwise
         """Decode the data
 
         :param data: either binary or memoryview
         :param int offset: initial data's offset
         :param bool leavemm: do we need to leave memoryview of remaining
                     data as is, or convert it to bytes otherwise
+        :param defines_by_path: :ref:`Read about DEFINED BY <definedby>`
         :returns: (Obj, remaining data)
         """
         tlv = memoryview(data)
         :returns: (Obj, remaining data)
         """
         tlv = memoryview(data)
@@ -771,6 +889,7 @@ class Obj(object):
                 tlv,
                 offset,
                 decode_path=decode_path,
                 tlv,
                 offset,
                 decode_path=decode_path,
+                defines_by_path=defines_by_path,
             )
         else:
             try:
             )
         else:
             try:
@@ -808,6 +927,7 @@ class Obj(object):
                 v,
                 offset=offset + tlen + llen,
                 decode_path=decode_path,
                 v,
                 offset=offset + tlen + llen,
                 decode_path=decode_path,
+                defines_by_path=defines_by_path,
             )
         return obj, (tail if leavemm else tail.tobytes())
 
             )
         return obj, (tail if leavemm else tail.tobytes())
 
@@ -840,6 +960,12 @@ class Obj(object):
         return self.expl_tlen + self.expl_llen + self.expl_vlen
 
 
         return self.expl_tlen + self.expl_llen + self.expl_vlen
 
 
+def decode_path_defby(defined_by):
+    """DEFINED BY representation inside decode path
+    """
+    return "DEFINED BY (%s)" % defined_by
+
+
 ########################################################################
 # Pretty printing
 ########################################################################
 ########################################################################
 # Pretty printing
 ########################################################################
@@ -1109,7 +1235,7 @@ class Boolean(Obj):
             (b"\xFF" if self._value else b"\x00"),
         ))
 
             (b"\xFF" if self._value else b"\x00"),
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1402,7 +1528,7 @@ class Integer(Obj):
                     break
         return b"".join((self.tag, len_encode(len(octets)), octets))
 
                     break
         return b"".join((self.tag, len_encode(len(octets)), octets))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1735,7 +1861,7 @@ class BitString(Obj):
             octets,
         ))
 
             octets,
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1856,7 +1982,7 @@ class OctetString(Obj):
     >>> OctetString(b"hell", bounds=(4, 4))
     OCTET STRING 4 bytes 68656c6c
     """
     >>> OctetString(b"hell", bounds=(4, 4))
     OCTET STRING 4 bytes 68656c6c
     """
-    __slots__ = ("_bound_min", "_bound_max")
+    __slots__ = ("_bound_min", "_bound_max", "defined")
     tag_default = tag_encode(4)
     asn1_type_name = "OCTET STRING"
 
     tag_default = tag_encode(4)
     asn1_type_name = "OCTET STRING"
 
@@ -1904,6 +2030,7 @@ class OctetString(Obj):
             )
             if self._value is None:
                 self._value = default
             )
             if self._value is None:
                 self._value = default
+        self.defined = None
 
     def _value_sanitize(self, value):
         if issubclass(value.__class__, OctetString):
 
     def _value_sanitize(self, value):
         if issubclass(value.__class__, OctetString):
@@ -1981,7 +2108,7 @@ class OctetString(Obj):
             self._value,
         ))
 
             self._value,
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2056,6 +2183,11 @@ class OctetString(Obj):
             expl_llen=self.expl_llen if self.expled else None,
             expl_vlen=self.expl_vlen if self.expled else None,
         )
             expl_llen=self.expl_llen if self.expled else None,
             expl_vlen=self.expl_vlen if self.expled else None,
         )
+        defined_by, defined = self.defined or (None, None)
+        if defined_by is not None:
+            yield defined.pps(
+                decode_path=decode_path + (decode_path_defby(defined_by),)
+            )
 
 
 class Null(Obj):
 
 
 class Null(Obj):
@@ -2125,7 +2257,7 @@ class Null(Obj):
     def _encode(self):
         return self.tag + len_encode(0)
 
     def _encode(self):
         return self.tag + len_encode(0)
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2205,13 +2337,14 @@ class ObjectIdentifier(Obj):
     Traceback (most recent call last):
     pyderasn.InvalidOID: unacceptable first arc value
     """
     Traceback (most recent call last):
     pyderasn.InvalidOID: unacceptable first arc value
     """
-    __slots__ = ()
+    __slots__ = ("defines",)
     tag_default = tag_encode(6)
     asn1_type_name = "OBJECT IDENTIFIER"
 
     def __init__(
             self,
             value=None,
     tag_default = tag_encode(6)
     asn1_type_name = "OBJECT IDENTIFIER"
 
     def __init__(
             self,
             value=None,
+            defines=None,
             impl=None,
             expl=None,
             default=None,
             impl=None,
             expl=None,
             default=None,
@@ -2222,6 +2355,13 @@ class ObjectIdentifier(Obj):
         :param value: set the value. Either tuples of integers,
                       string of "."-concatenated integers, or
                       :py:class:`pyderasn.ObjectIdentifier` object
         :param value: set the value. Either tuples of integers,
                       string of "."-concatenated integers, or
                       :py:class:`pyderasn.ObjectIdentifier` object
+        :param defines: tuple of two elements. First one is a name of
+                        field inside :py:class:`pyderasn.Sequence`,
+                        defining with that OID. Second element is a
+                        ``{OID: pyderasn.Obj()}`` dictionary, mapping
+                        between current OID value and structure applied
+                        to defined field.
+                        :ref:`Read about DEFINED BY <definedby>`
         :param bytes impl: override default tag with ``IMPLICIT`` one
         :param bytes expl: override default tag with ``EXPLICIT`` one
         :param default: set default value. Type same as in ``value``
         :param bytes impl: override default tag with ``IMPLICIT`` one
         :param bytes expl: override default tag with ``EXPLICIT`` one
         :param default: set default value. Type same as in ``value``
@@ -2246,6 +2386,7 @@ class ObjectIdentifier(Obj):
             )
             if self._value is None:
                 self._value = default
             )
             if self._value is None:
                 self._value = default
+        self.defines = defines
 
     def __add__(self, their):
         if isinstance(their, self.__class__):
 
     def __add__(self, their):
         if isinstance(their, self.__class__):
@@ -2283,6 +2424,7 @@ class ObjectIdentifier(Obj):
     def copy(self):
         obj = self.__class__()
         obj._value = self._value
     def copy(self):
         obj = self.__class__()
         obj._value = self._value
+        obj.defines = self.defines
         obj.tag = self.tag
         obj._expl = self._expl
         obj.default = self.default
         obj.tag = self.tag
         obj._expl = self._expl
         obj.default = self.default
@@ -2324,6 +2466,7 @@ class ObjectIdentifier(Obj):
     def __call__(
             self,
             value=None,
     def __call__(
             self,
             value=None,
+            defines=None,
             impl=None,
             expl=None,
             default=None,
             impl=None,
             expl=None,
             default=None,
@@ -2331,6 +2474,7 @@ class ObjectIdentifier(Obj):
     ):
         return self.__class__(
             value=value,
     ):
         return self.__class__(
             value=value,
+            defines=self.defines if defines is None else defines,
             impl=self.tag if impl is None else impl,
             expl=self._expl if expl is None else expl,
             default=self.default if default is None else default,
             impl=self.tag if impl is None else impl,
             expl=self._expl if expl is None else expl,
             default=self.default if default is None else default,
@@ -2356,7 +2500,7 @@ class ObjectIdentifier(Obj):
         v = b"".join(octets)
         return b"".join((self.tag, len_encode(len(v)), v))
 
         v = b"".join(octets)
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3123,7 +3267,7 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].encode()
 
         self._assert_ready()
         return self._value[1].encode()
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         for choice, spec in self.specs.items():
             try:
                 value, tail = spec.decode(
         for choice, spec in self.specs.items():
             try:
                 value, tail = spec.decode(
@@ -3131,6 +3275,7 @@ class Choice(Obj):
                     offset=offset,
                     leavemm=True,
                     decode_path=decode_path + (choice,),
                     offset=offset,
                     leavemm=True,
                     decode_path=decode_path + (choice,),
+                    defines_by_path=defines_by_path,
                 )
             except TagMismatch:
                 continue
                 )
             except TagMismatch:
                 continue
@@ -3219,7 +3364,7 @@ class Any(Obj):
     >>> hexenc(bytes(a))
     b'0x040x0bhello world'
     """
     >>> hexenc(bytes(a))
     b'0x040x0bhello world'
     """
-    __slots__ = ()
+    __slots__ = ("defined",)
     tag_default = tag_encode(0)
     asn1_type_name = "ANY"
 
     tag_default = tag_encode(0)
     asn1_type_name = "ANY"
 
@@ -3240,6 +3385,7 @@ class Any(Obj):
         """
         super(Any, self).__init__(None, expl, None, optional, _decoded)
         self._value = None if value is None else self._value_sanitize(value)
         """
         super(Any, self).__init__(None, expl, None, optional, _decoded)
         self._value = None if value is None else self._value_sanitize(value)
+        self.defined = None
 
     def _value_sanitize(self, value):
         if isinstance(value, self.__class__):
 
     def _value_sanitize(self, value):
         if isinstance(value, self.__class__):
@@ -3296,7 +3442,7 @@ class Any(Obj):
         self._assert_ready()
         return self._value
 
         self._assert_ready()
         return self._value
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
             l, llen, v = len_decode(lv)
         try:
             t, tlen, lv = tag_strip(tlv)
             l, llen, v = len_decode(lv)
@@ -3347,12 +3493,30 @@ class Any(Obj):
             expl_llen=self.expl_llen if self.expled else None,
             expl_vlen=self.expl_vlen if self.expled else None,
         )
             expl_llen=self.expl_llen if self.expled else None,
             expl_vlen=self.expl_vlen if self.expled else None,
         )
+        defined_by, defined = self.defined or (None, None)
+        if defined_by is not None:
+            yield defined.pps(
+                decode_path=decode_path + (decode_path_defby(defined_by),)
+            )
 
 
 ########################################################################
 # ASN.1 constructed types
 ########################################################################
 
 
 
 ########################################################################
 # ASN.1 constructed types
 ########################################################################
 
+def get_def_by_path(defines_by_path, sub_decode_path):
+    """Get define by decode path
+    """
+    for path, define in defines_by_path:
+        if len(path) != len(sub_decode_path):
+            continue
+        for p1, p2 in zip(path, sub_decode_path):
+            if (p1 != any) and (p1 != p2):
+                break
+        else:
+            return define
+
+
 class Sequence(Obj):
     """``SEQUENCE`` structure type
 
 class Sequence(Obj):
     """``SEQUENCE`` structure type
 
@@ -3566,7 +3730,7 @@ class Sequence(Obj):
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3601,20 +3765,65 @@ class Sequence(Obj):
         v, tail = v[:l], v[l:]
         sub_offset = offset + tlen + llen
         values = {}
         v, tail = v[:l], v[l:]
         sub_offset = offset + tlen + llen
         values = {}
+        defines = {}
         for name, spec in self.specs.items():
             if len(v) == 0 and spec.optional:
                 continue
         for name, spec in self.specs.items():
             if len(v) == 0 and spec.optional:
                 continue
+            sub_decode_path = decode_path + (name,)
             try:
                 value, v_tail = spec.decode(
                     v,
                     sub_offset,
                     leavemm=True,
             try:
                 value, v_tail = spec.decode(
                     v,
                     sub_offset,
                     leavemm=True,
-                    decode_path=decode_path + (name,),
+                    decode_path=sub_decode_path,
+                    defines_by_path=defines_by_path,
                 )
             except TagMismatch:
                 if spec.optional:
                     continue
                 raise
                 )
             except TagMismatch:
                 if spec.optional:
                     continue
                 raise
+
+            defined = defines.pop(name, None)
+            if defined is not None:
+                defined_by, defined_spec = defined
+                if issubclass(value.__class__, SequenceOf):
+                    for i, _value in enumerate(value):
+                        sub_sub_decode_path = sub_decode_path + (
+                            str(i),
+                            decode_path_defby(defined_by),
+                        )
+                        defined_value, defined_tail = defined_spec.decode(
+                            memoryview(bytes(_value)),
+                            sub_offset + value.tlen + value.llen,
+                            leavemm=True,
+                            decode_path=sub_sub_decode_path,
+                            defines_by_path=defines_by_path,
+                        )
+                        if len(defined_tail) > 0:
+                            raise DecodeError(
+                                "remaining data",
+                                klass=self.__class__,
+                                decode_path=sub_sub_decode_path,
+                                offset=offset,
+                            )
+                        _value.defined = (defined_by, defined_value)
+                else:
+                    defined_value, defined_tail = defined_spec.decode(
+                        memoryview(bytes(value)),
+                        sub_offset + value.tlen + value.llen,
+                        leavemm=True,
+                        decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+                        defines_by_path=defines_by_path,
+                    )
+                    if len(defined_tail) > 0:
+                        raise DecodeError(
+                            "remaining data",
+                            klass=self.__class__,
+                            decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+                            offset=offset,
+                        )
+                    value.defined = (defined_by, defined_value)
+
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
             if spec.default is not None and value == spec.default:
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
             if spec.default is not None and value == spec.default:
@@ -3622,6 +3831,15 @@ class Sequence(Obj):
                 # but we allow that anyway
                 continue
             values[name] = value
                 # but we allow that anyway
                 continue
             values[name] = value
+
+            spec_defines = getattr(spec, "defines", None)
+            if defines_by_path is not None and spec_defines is None:
+                spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+            if spec_defines is not None:
+                what, schema = spec_defines
+                defined = schema.get(value, None)
+                if defined is not None:
+                    defines[what] = (value, defined)
         if len(v) > 0:
             raise DecodeError(
                 "remaining data",
         if len(v) > 0:
             raise DecodeError(
                 "remaining data",
@@ -3690,7 +3908,7 @@ class Set(Sequence):
         v = b"".join(raws)
         return b"".join((self.tag, len_encode(len(v)), v))
 
         v = b"".join(raws)
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3733,6 +3951,7 @@ class Set(Sequence):
                         sub_offset,
                         leavemm=True,
                         decode_path=decode_path + (name,),
                         sub_offset,
                         leavemm=True,
                         decode_path=decode_path + (name,),
+                        defines_by_path=defines_by_path,
                     )
                 except TagMismatch:
                     continue
                     )
                 except TagMismatch:
                     continue
@@ -3942,7 +4161,7 @@ class SequenceOf(Obj):
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=()):
+    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3984,6 +4203,7 @@ class SequenceOf(Obj):
                 sub_offset,
                 leavemm=True,
                 decode_path=decode_path + (str(len(_value)),),
                 sub_offset,
                 leavemm=True,
                 decode_path=decode_path + (str(len(_value)),),
+                defines_by_path=defines_by_path,
             )
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
             )
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
index 5a839eb4d0059a1edf7d22056354226a9cd576ea..a096412333df3248740d3a467ba2eb43d47aa3d7 100644 (file)
@@ -37,6 +37,7 @@ from pyderasn import SequenceOf
 from pyderasn import SetOf
 from pyderasn import tag_ctxc
 from pyderasn import tag_ctxp
 from pyderasn import SetOf
 from pyderasn import tag_ctxc
 from pyderasn import tag_ctxp
+from pyderasn import TeletexString
 from pyderasn import UTCTime
 
 
 from pyderasn import UTCTime
 
 
@@ -88,9 +89,22 @@ class AttributeValue(Any):
     pass
 
 
     pass
 
 
+class OrganizationName(Choice):
+    schema = (
+        ('printableString', PrintableString()),
+        ('teletexString', TeletexString()),
+    )
+
+
 class AttributeTypeAndValue(Sequence):
     schema = (
 class AttributeTypeAndValue(Sequence):
     schema = (
-        ("type", AttributeType()),
+        ("type", AttributeType(defines=("value", {
+            ObjectIdentifier("2.5.4.6"): PrintableString(),
+            ObjectIdentifier("2.5.4.8"): PrintableString(),
+            ObjectIdentifier("2.5.4.7"): PrintableString(),
+            ObjectIdentifier("2.5.4.10"): OrganizationName(),
+            ObjectIdentifier("2.5.4.3"): PrintableString(),
+        }))),
         ("value", AttributeValue()),
     )
 
         ("value", AttributeValue()),
     )
 
index fdc03b58369cb8bcd7607f5a9a5c4d7b56661ea8..23127261feb9175309eea0f8b6e41356f0e7cf56 100644 (file)
@@ -56,6 +56,7 @@ from pyderasn import BMPString
 from pyderasn import Boolean
 from pyderasn import BoundsError
 from pyderasn import Choice
 from pyderasn import Boolean
 from pyderasn import BoundsError
 from pyderasn import Choice
+from pyderasn import decode_path_defby
 from pyderasn import DecodeError
 from pyderasn import Enumerated
 from pyderasn import GeneralizedTime
 from pyderasn import DecodeError
 from pyderasn import Enumerated
 from pyderasn import GeneralizedTime
@@ -85,6 +86,7 @@ from pyderasn import SequenceOf
 from pyderasn import Set
 from pyderasn import SetOf
 from pyderasn import tag_ctxc
 from pyderasn import Set
 from pyderasn import SetOf
 from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
 from pyderasn import tag_decode
 from pyderasn import tag_encode
 from pyderasn import tag_strip
 from pyderasn import tag_decode
 from pyderasn import tag_encode
 from pyderasn import tag_strip
@@ -1953,12 +1955,12 @@ class TestObjectIdentifier(CommonMixin, TestCase):
                 _decoded_initial,
             ) = d.draw(oid_values_strategy())
             obj_initial = klass(
                 _decoded_initial,
             ) = d.draw(oid_values_strategy())
             obj_initial = klass(
-                value_initial,
-                impl_initial,
-                expl_initial,
-                default_initial,
-                optional_initial or False,
-                _decoded_initial,
+                value=value_initial,
+                impl=impl_initial,
+                expl=expl_initial,
+                default=default_initial,
+                optional=optional_initial or False,
+                _decoded=_decoded_initial,
             )
             (
                 value,
             )
             (
                 value,
@@ -1968,7 +1970,13 @@ class TestObjectIdentifier(CommonMixin, TestCase):
                 optional,
                 _decoded,
             ) = d.draw(oid_values_strategy(do_expl=impl_initial is None))
                 optional,
                 _decoded,
             ) = d.draw(oid_values_strategy(do_expl=impl_initial is None))
-            obj = obj_initial(value, impl, expl, default, optional)
+            obj = obj_initial(
+                value=value,
+                impl=impl,
+                expl=expl,
+                default=default,
+                optional=optional,
+            )
             if obj.ready:
                 value_expected = default if value is None else value
                 value_expected = (
             if obj.ready:
                 value_expected = default if value is None else value
                 value_expected = (
@@ -1992,7 +2000,22 @@ class TestObjectIdentifier(CommonMixin, TestCase):
     @given(oid_values_strategy())
     def test_copy(self, values):
         for klass in (ObjectIdentifier, ObjectIdentifierInherited):
     @given(oid_values_strategy())
     def test_copy(self, values):
         for klass in (ObjectIdentifier, ObjectIdentifierInherited):
-            obj = klass(*values)
+            (
+                value,
+                impl,
+                expl,
+                default,
+                optional,
+                _decoded,
+            ) = values
+            obj = klass(
+                value=value,
+                impl=impl,
+                expl=expl,
+                default=default,
+                optional=optional,
+                _decoded=_decoded,
+            )
             obj_copied = obj.copy()
             self.assert_copied_basic_fields(obj, obj_copied)
             self.assertEqual(obj._value, obj_copied._value)
             obj_copied = obj.copy()
             self.assert_copied_basic_fields(obj, obj_copied)
             self.assertEqual(obj._value, obj_copied._value)
@@ -4861,3 +4884,163 @@ class TestAutoAddSlots(TestCase):
         with self.assertRaises(AttributeError):
             inher = Inher()
             inher.unexistent = "whatever"
         with self.assertRaises(AttributeError):
             inher = Inher()
             inher.unexistent = "whatever"
+
+
+class TestOIDDefines(TestCase):
+    @given(data_strategy())
+    def runTest(self, d):
+        value_names = list(d.draw(sets(text_letters(), min_size=1, max_size=10)))
+        value_name_chosen = d.draw(sampled_from(value_names))
+        oids = [
+            ObjectIdentifier(oid)
+            for oid in d.draw(sets(oid_strategy(), min_size=2, max_size=10))
+        ]
+        oid_chosen = d.draw(sampled_from(oids))
+        values = d.draw(lists(
+            integers(),
+            min_size=len(value_names),
+            max_size=len(value_names),
+        ))
+        _schema = [
+            ("type", ObjectIdentifier(defines=(value_name_chosen, {
+                oid: Integer() for oid in oids[:-1]
+            }))),
+        ]
+        for i, value_name in enumerate(value_names):
+            _schema.append((value_name, Any(expl=tag_ctxp(i))))
+
+        class Seq(Sequence):
+            schema = _schema
+        seq = Seq()
+        for value_name, value in zip(value_names, values):
+            seq[value_name] = Any(Integer(value).encode())
+        seq["type"] = oid_chosen
+        seq, _ = Seq().decode(seq.encode())
+        for value_name in value_names:
+            if value_name == value_name_chosen:
+                continue
+            self.assertIsNone(seq[value_name].defined)
+        if value_name_chosen in oids[:-1]:
+            self.assertIsNotNone(seq[value_name_chosen].defined)
+            self.assertEqual(seq[value_name_chosen].defined[0], oid_chosen)
+            self.assertIsInstance(seq[value_name_chosen].defined[1], Integer)
+
+
+class TestDefinesByPath(TestCase):
+    def runTest(self):
+        class Seq(Sequence):
+            schema = (
+                ("type", ObjectIdentifier()),
+                ("value", OctetString(expl=tag_ctxc(123))),
+            )
+
+        class SeqInner(Sequence):
+            schema = (
+                ("typeInner", ObjectIdentifier()),
+                ("valueInner", Any()),
+            )
+
+        class PairValue(SetOf):
+            schema = Any()
+
+        class Pair(Sequence):
+            schema = (
+                ("type", ObjectIdentifier()),
+                ("value", PairValue()),
+            )
+
+        class Pairs(SequenceOf):
+            schema = Pair()
+
+        (
+            type_integered,
+            type_sequenced,
+            type_innered,
+            type_octet_stringed,
+        ) = [
+            ObjectIdentifier(oid)
+            for oid in sets(oid_strategy(), min_size=4, max_size=4).example()
+        ]
+        seq_integered = Seq()
+        seq_integered["type"] = type_integered
+        seq_integered["value"] = OctetString(Integer(123).encode())
+        seq_integered_raw = seq_integered.encode()
+
+        pairs = Pairs()
+        pairs_input = (
+            (type_octet_stringed, OctetString(b"whatever")),
+            (type_integered, Integer(123)),
+            (type_octet_stringed, OctetString(b"whenever")),
+            (type_integered, Integer(234)),
+        )
+        for t, v in pairs_input:
+            pair = Pair()
+            pair["type"] = t
+            pair["value"] = PairValue((Any(v),))
+            pairs.append(pair)
+        seq_inner = SeqInner()
+        seq_inner["typeInner"] = type_innered
+        seq_inner["valueInner"] = Any(pairs)
+        seq_sequenced = Seq()
+        seq_sequenced["type"] = type_sequenced
+        seq_sequenced["value"] = OctetString(seq_inner.encode())
+        seq_sequenced_raw = seq_sequenced.encode()
+
+        defines_by_path = []
+        seq_integered, _ = Seq().decode(seq_integered_raw)
+        self.assertIsNone(seq_integered["value"].defined)
+        defines_by_path.append(
+            (("type",), ("value", {
+                type_integered: Integer(),
+                type_sequenced: SeqInner(),
+            }))
+        )
+        seq_integered, _ = Seq().decode(seq_integered_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_integered["value"].defined)
+        self.assertEqual(seq_integered["value"].defined[0], type_integered)
+        self.assertEqual(seq_integered["value"].defined[1], Integer(123))
+
+        seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_sequenced["value"].defined)
+        self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+        seq_inner = seq_sequenced["value"].defined[1]
+        self.assertIsNone(seq_inner["valueInner"].defined)
+
+        defines_by_path.append((
+            ("value", decode_path_defby(type_sequenced), "typeInner"),
+            ("valueInner", {type_innered: Pairs()}),
+        ))
+        seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_sequenced["value"].defined)
+        self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+        seq_inner = seq_sequenced["value"].defined[1]
+        self.assertIsNotNone(seq_inner["valueInner"].defined)
+        self.assertEqual(seq_inner["valueInner"].defined[0], type_innered)
+        pairs = seq_inner["valueInner"].defined[1]
+        for pair in pairs:
+            self.assertIsNone(pair["value"][0].defined)
+
+        defines_by_path.append((
+            (
+                "value",
+                decode_path_defby(type_sequenced),
+                "valueInner",
+                decode_path_defby(type_innered),
+                any,
+                "type",
+            ),
+            ("value", {
+                type_integered: Integer(),
+                type_octet_stringed: OctetString(),
+            }),
+        ))
+        seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+        self.assertIsNotNone(seq_sequenced["value"].defined)
+        self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+        seq_inner = seq_sequenced["value"].defined[1]
+        self.assertIsNotNone(seq_inner["valueInner"].defined)
+        self.assertEqual(seq_inner["valueInner"].defined[0], type_innered)
+        pairs_got = seq_inner["valueInner"].defined[1]
+        for pair_input, pair_got in zip(pairs_input, pairs_got):
+            self.assertEqual(pair_got["value"][0].defined[0], pair_input[0])
+            self.assertEqual(pair_got["value"][0].defined[1], pair_input[1])