]> Cypherpunks.ru repositories - pyderasn.git/blobdiff - pyderasn.py
Decode context and defines feature extending
[pyderasn.git] / pyderasn.py
index 41ed042a8424574a6fc4c1da0556fb197c8d773b..b4a10d9fc04ba14fcf91a86ce21f2490cf5c41fb 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 # coding: utf-8
 # PyDERASN -- Python ASN.1 DER codec with abstract structures
-# Copyright (C) 2017 Sergey Matveev <stargrave@stargrave.org>
+# Copyright (C) 2017-2018 Sergey Matveev <stargrave@stargrave.org>
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as
@@ -195,6 +195,19 @@ lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen``
 
 When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
 
+.. _ctx:
+
+Context
+_______
+
+You can pass the so-called context keyword argument (``ctx``) to
+``decode()``. It is a dictionary containing various options governing
+the decoding process.
+
+Currently available context options:
+
+* :ref:`defines_by_path <defines_by_path_ctx>`
+
 .. _pprinting:
 
 Pretty printing
@@ -230,15 +243,15 @@ _____________
 
 :py:class:`pyderasn.ObjectIdentifier` field inside
 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
-necessary for decoding structrures. For example, CMS (:rfc:`5652`)
+necessary for decoding structures. For example, CMS (:rfc:`5652`)
 container::
 
     class ContentInfo(Sequence):
         schema = (
-            ("contentType", ContentType(defines=("content", {
+            ("contentType", ContentType(defines=((("content",), {
                 id_digestedData: DigestedData(),
                 id_signedData: SignedData(),
-            }))),
+            }),))),
             ("content", Any(expl=tag_ctxc(0))),
         )
 
@@ -248,6 +261,27 @@ decoded with ``SignedData`` specification, if ``contentType`` equals to
 ``contentType`` contains unknown OID, then no automatic decoding is
 done.
 
+You can specify multiple fields, that will be autodecoded -- that is why
+``defines`` kwarg is a sequence. You can specify defined field
+relatively or absolutely to current decode path. For example ``defines``
+for AlgorithmIdentifier of X.509's
+``tbsCertificate.subjectPublicKeyInfo.algorithm.algorithm``::
+
+        (
+            (('parameters',), {
+                id_ecPublicKey: ECParameters(),
+                id_GostR3410_2001: GostR34102001PublicKeyParameters(),
+            }),
+            (('..', 'subjectPublicKey'), {
+                id_rsaEncryption: RSAPublicKey(),
+                id_GostR3410_2001: OctetString(),
+            }),
+        ),
+
+tells that if the certificate's SPKI algorithm is GOST R 34.10-2001,
+then both its parameters inside SPKI's algorithm and the public key
+itself are autodecoded.
+
 Following types can be automatically decoded (DEFINED BY):
 
 * :py:class:`pyderasn.Any`
@@ -262,17 +296,17 @@ was defined, ``value`` contains corresponding decoded value. For example
 above, ``content_info["content"].defined == (id_signedData,
 signed_data)``.
 
-.. _defines_by_path_kwarg:
+.. _defines_by_path_ctx:
 
-defines_by_path kwarg
-_____________________
+defines_by_path context option
+______________________________
 
 Sometimes you either can not or do not want to explicitly set *defines*
 in the scheme. You can dynamically apply those definitions when calling
 ``.decode()`` method.
 
-Decode method takes optional ``defines_by_path`` keyword argument that
-must be sequence of following tuples::
+Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
+value must be a sequence of the following tuples::
 
     (decode_path, defines)
 
@@ -289,7 +323,7 @@ of ``PKIResponse``::
     content_info, tail = ContentInfo().decode(data, defines_by_path=(
         (
             ("contentType",),
-            ("content", {id_signedData: SignedData()}),
+            ((("content",), {id_signedData: SignedData()}),),
         ),
         (
             (
@@ -298,10 +332,10 @@ of ``PKIResponse``::
                 "encapContentInfo",
                 "eContentType",
             ),
-            ("eContent", {
+            ((("eContent",), {
                 id_cct_PKIData: PKIData(),
                 id_cct_PKIResponse: PKIResponse(),
-            }),
+            }),),
         ),
         (
             (
@@ -314,12 +348,12 @@ of ``PKIResponse``::
                 any,
                 "attrType",
             ),
-            ("attrValues", {
+            ((("attrValues",), {
                 id_cmc_recipientNonce: RecipientNonce(),
                 id_cmc_senderNonce: SenderNonce(),
                 id_cmc_statusInfoV2: CMCStatusInfoV2(),
                 id_cmc_transactionId: TransactionId(),
-            }),
+            }),),
         ),
     ))
 
@@ -421,6 +455,7 @@ _____
 Various
 -------
 
+.. autofunction:: pyderasn.abs_decode_path
 .. autofunction:: pyderasn.hexenc
 .. autofunction:: pyderasn.hexdec
 .. autofunction:: pyderasn.tag_encode
@@ -867,7 +902,7 @@ class Obj(object):
     def _encode(self):  # pragma: no cover
         raise NotImplementedError()
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):  # pragma: no cover
+    def _decode(self, tlv, offset, decode_path, ctx):  # pragma: no cover
         raise NotImplementedError()
 
     def encode(self):
@@ -876,23 +911,25 @@ class Obj(object):
             return raw
         return b"".join((self._expl, len_encode(len(raw)), raw))
 
-    def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None):
+    def decode(self, data, offset=0, leavemm=False, decode_path=(), ctx=None):
         """Decode the data
 
         :param data: either binary or memoryview
         :param int offset: initial data's offset
         :param bool leavemm: do we need to leave memoryview of remaining
                     data as is, or convert it to bytes otherwise
-        :param defines_by_path: :ref:`Read about DEFINED BY <definedby>`
+        :param ctx: optional :ref:`context <ctx>` governing decoding process.
         :returns: (Obj, remaining data)
         """
+        if ctx is None:
+            ctx = {}
         tlv = memoryview(data)
         if self._expl is None:
             obj, tail = self._decode(
                 tlv,
                 offset,
                 decode_path=decode_path,
-                defines_by_path=defines_by_path,
+                ctx=ctx,
             )
         else:
             try:
@@ -930,7 +967,7 @@ class Obj(object):
                 v,
                 offset=offset + tlen + llen,
                 decode_path=decode_path,
-                defines_by_path=defines_by_path,
+                ctx=ctx,
             )
         return obj, (tail if leavemm else tail.tobytes())
 
@@ -1238,7 +1275,7 @@ class Boolean(Obj):
             (b"\xFF" if self._value else b"\x00"),
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1531,7 +1568,7 @@ class Integer(Obj):
                     break
         return b"".join((self.tag, len_encode(len(octets)), octets))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -1865,7 +1902,7 @@ class BitString(Obj):
             octets,
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2117,7 +2154,7 @@ class OctetString(Obj):
             self._value,
         ))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2266,7 +2303,7 @@ class Null(Obj):
     def _encode(self):
         return self.tag + len_encode(0)
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -2353,7 +2390,7 @@ class ObjectIdentifier(Obj):
     def __init__(
             self,
             value=None,
-            defines=None,
+            defines=(),
             impl=None,
             expl=None,
             default=None,
@@ -2364,12 +2401,14 @@ class ObjectIdentifier(Obj):
         :param value: set the value. Either tuples of integers,
                       string of "."-concatenated integers, or
                       :py:class:`pyderasn.ObjectIdentifier` object
-        :param defines: tuple of two elements. First one is a name of
-                        field inside :py:class:`pyderasn.Sequence`,
-                        defining with that OID. Second element is a
-                        ``{OID: pyderasn.Obj()}`` dictionary, mapping
-                        between current OID value and structure applied
-                        to defined field.
+        :param defines: sequence of tuples. Each tuple has two elements.
+                        First one is a decode path, relative to the
+                        current one, aiming to the field defined by
+                        that OID. Read about relative path in
+                        :py:func:`pyderasn.abs_decode_path`. Second
+                        tuple element is ``{OID: pyderasn.Obj()}``
+                        dictionary, mapping between current OID value
+                        and structure applied to defined field.
                         :ref:`Read about DEFINED BY <definedby>`
         :param bytes impl: override default tag with ``IMPLICIT`` one
         :param bytes expl: override default tag with ``EXPLICIT`` one
@@ -2509,7 +2548,7 @@ class ObjectIdentifier(Obj):
         v = b"".join(octets)
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, _, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3276,7 +3315,7 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].encode()
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         for choice, spec in self.specs.items():
             try:
                 value, tail = spec.decode(
@@ -3284,7 +3323,7 @@ class Choice(Obj):
                     offset=offset,
                     leavemm=True,
                     decode_path=decode_path + (choice,),
-                    defines_by_path=defines_by_path,
+                    ctx=ctx,
                 )
             except TagMismatch:
                 continue
@@ -3451,7 +3490,7 @@ class Any(Obj):
         self._assert_ready()
         return self._value
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, tlen, lv = tag_strip(tlv)
             l, llen, v = len_decode(lv)
@@ -3526,6 +3565,32 @@ def get_def_by_path(defines_by_path, sub_decode_path):
             return define
 
 
+def abs_decode_path(decode_path, rel_path):
+    """Create an absolute decode path from current and relative ones
+
+    :param decode_path: current decode path, starting point.
+                        Tuple of strings
+    :param rel_path: relative path to ``decode_path``. Tuple of strings,
+                     possibly empty. If it starts with "/", then it is
+                     an absolute path and ``decode_path`` is ignored.
+                     Each leading ".." element strips the last element
+                     from ``decode_path``
+    :returns: absolute decode path, tuple of strings
+
+    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
+    ('foo', 'bar', 'baz', 'whatever')
+    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
+    ('foo', 'whatever')
+    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
+    ('baz', 'whatever')
+    """
+    while len(rel_path) > 0 and rel_path[0] == "..":  # may run empty
+        decode_path, rel_path = decode_path[:-1], rel_path[1:]
+    if len(rel_path) > 0 and rel_path[0] == "/":
+        return rel_path[1:]
+    return decode_path + rel_path
+
+
 class Sequence(Obj):
     """``SEQUENCE`` structure type
 
@@ -3739,7 +3804,7 @@ class Sequence(Obj):
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3774,7 +3839,6 @@ class Sequence(Obj):
         v, tail = v[:l], v[l:]
         sub_offset = offset + tlen + llen
         values = {}
-        defines = {}
         for name, spec in self.specs.items():
             if len(v) == 0 and spec.optional:
                 continue
@@ -3785,14 +3849,14 @@ class Sequence(Obj):
                     sub_offset,
                     leavemm=True,
                     decode_path=sub_decode_path,
-                    defines_by_path=defines_by_path,
+                    ctx=ctx,
                 )
             except TagMismatch:
                 if spec.optional:
                     continue
                 raise
 
-            defined = defines.pop(name, None)
+            defined = get_def_by_path(ctx.get("defines", ()), sub_decode_path)
             if defined is not None:
                 defined_by, defined_spec = defined
                 if issubclass(value.__class__, SequenceOf):
@@ -3806,7 +3870,7 @@ class Sequence(Obj):
                             sub_offset + value.tlen + value.llen,
                             leavemm=True,
                             decode_path=sub_sub_decode_path,
-                            defines_by_path=defines_by_path,
+                            ctx=ctx,
                         )
                         if len(defined_tail) > 0:
                             raise DecodeError(
@@ -3822,7 +3886,7 @@ class Sequence(Obj):
                         sub_offset + value.tlen + value.llen,
                         leavemm=True,
                         decode_path=sub_decode_path + (decode_path_defby(defined_by),),
-                        defines_by_path=defines_by_path,
+                        ctx=ctx,
                     )
                     if len(defined_tail) > 0:
                         raise DecodeError(
@@ -3841,14 +3905,19 @@ class Sequence(Obj):
                 continue
             values[name] = value
 
-            spec_defines = getattr(spec, "defines", None)
-            if defines_by_path is not None and spec_defines is None:
-                spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
-            if spec_defines is not None:
-                what, schema = spec_defines
-                defined = schema.get(value, None)
-                if defined is not None:
-                    defines[what] = (value, defined)
+            spec_defines = getattr(spec, "defines", ())
+            if len(spec_defines) == 0:
+                defines_by_path = ctx.get("defines_by_path", ())
+                if len(defines_by_path) > 0:
+                    spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+            if spec_defines is not None and len(spec_defines) > 0:
+                for rel_path, schema in spec_defines:
+                    defined = schema.get(value, None)
+                    if defined is not None:
+                        ctx.setdefault("defines", []).append((
+                            abs_decode_path(sub_decode_path[:-1], rel_path),
+                            (value, defined),
+                        ))
         if len(v) > 0:
             raise DecodeError(
                 "remaining data",
@@ -3917,7 +3986,7 @@ class Set(Sequence):
         v = b"".join(raws)
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -3960,7 +4029,7 @@ class Set(Sequence):
                         sub_offset,
                         leavemm=True,
                         decode_path=decode_path + (name,),
-                        defines_by_path=defines_by_path,
+                        ctx=ctx,
                     )
                 except TagMismatch:
                     continue
@@ -4170,7 +4239,7 @@ class SequenceOf(Obj):
         v = b"".join(self._encoded_values())
         return b"".join((self.tag, len_encode(len(v)), v))
 
-    def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+    def _decode(self, tlv, offset, decode_path, ctx):
         try:
             t, tlen, lv = tag_strip(tlv)
         except DecodeError as err:
@@ -4212,7 +4281,7 @@ class SequenceOf(Obj):
                 sub_offset,
                 leavemm=True,
                 decode_path=decode_path + (str(len(_value)),),
-                defines_by_path=defines_by_path,
+                ctx=ctx,
             )
             sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
             v = v_tail
@@ -4371,9 +4440,9 @@ def main():  # pragma: no cover
         schema, pprinter = generic_decoder()
     obj, tail = schema().decode(
         der,
-        defines_by_path=(
-            None if args.defines_by_path is None
-            else obj_by_path(args.defines_by_path)
+        ctx=(
+            None if args.defines_by_path is None else
+            {"defines_by_path": obj_by_path(args.defines_by_path)}
         ),
     )
     print(pprinter(obj, oids=oids))