From 78833daa9b9637827bfb570135e699b5871aefd3 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Mon, 1 Jan 2018 20:20:34 +0300 Subject: [PATCH 1/1] Decode context and defines feature extending --- VERSION | 2 +- doc/examples.rst | 4 +- doc/news.rst | 12 ++- pyderasn.py | 179 ++++++++++++++++++++++++++++------------- pyderasn.pyi | 2 +- tests/test_crts.py | 4 +- tests/test_pyderasn.py | 92 ++++++++++++++++++--- 7 files changed, 221 insertions(+), 74 deletions(-) diff --git a/VERSION b/VERSION index 810ee4e..cd5ac03 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.6 +2.0 diff --git a/doc/examples.rst b/doc/examples.rst index a9e57b6..512c8f8 100644 --- a/doc/examples.rst +++ b/doc/examples.rst @@ -424,13 +424,13 @@ fields automatic decoding:: class AttributeTypeAndValue(Sequence): schema = ( - ("type", AttributeType(defines=("value", { + ((("type",), AttributeType(defines=("value", { id_at_countryName: PrintableString(), id_at_stateOrProvinceName: PrintableString(), id_at_localityName: PrintableString(), id_at_organizationName: PrintableString(), id_at_commonName: PrintableString(), - }))), + }))),), ("value", AttributeValue()), ) diff --git a/doc/news.rst b/doc/news.rst index a8757b8..914a9f8 100644 --- a/doc/news.rst +++ b/doc/news.rst @@ -1,6 +1,16 @@ News ==== +.. _release2.0: + +2.0 +--- +* BIT STRINGs can also be :ref:`DEFINED BY ` +* Decoding process can be governed with optional :ref:`ctx ` + keyword argument to ``decode()`` method +* :ref:`defines_by_path ` option is now + :ref:`decode context ` option, not a keyword argument + .. _release1.6: 1.6 @@ -14,7 +24,7 @@ utility. --- * Generic decoder's scheme and pretty printer (:py:func:`pyderasn.generic_decoder`) can be used in libraries. -* Ability to specify :ref:`defines_by_path ` +* Ability to specify :ref:`defines_by_path ` during command line invocation. .. _release1.4: diff --git a/pyderasn.py b/pyderasn.py index 73eec5e..b4a10d9 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -195,6 +195,19 @@ lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen`` When error occurs, then :py:exc:`pyderasn.DecodeError` is raised. +.. _ctx: + +Context +_______ + +You can specify so called context keyword argument during ``decode()`` +invocation. It is dictionary containing various options governing +decoding process. + +Currently available context options: + +* :ref:`defines_by_path ` + .. _pprinting: Pretty printing @@ -230,15 +243,15 @@ _____________ :py:class:`pyderasn.ObjectIdentifier` field inside :py:class:`pyderasn.Sequence` can hold mapping between OIDs and -necessary for decoding structrures. For example, CMS (:rfc:`5652`) +necessary for decoding structures. For example, CMS (:rfc:`5652`) container:: class ContentInfo(Sequence): schema = ( - ("contentType", ContentType(defines=("content", { + ("contentType", ContentType(defines=((("content",), { id_digestedData: DigestedData(), id_signedData: SignedData(), - }))), + }),))), ("content", Any(expl=tag_ctxc(0))), ) @@ -248,6 +261,27 @@ decoded with ``SignedData`` specification, if ``contentType`` equals to ``contentType`` contains unknown OID, then no automatic decoding is done. +You can specify multiple fields, that will be autodecoded -- that is why +``defines`` kwarg is a sequence. You can specify defined field +relatively or absolutely to current decode path. For example ``defines`` +for AlgorithmIdentifier of X.509's +``tbsCertificate.subjectPublicKeyInfo.algorithm.algorithm``:: + + ( + (('parameters',), { + id_ecPublicKey: ECParameters(), + id_GostR3410_2001: GostR34102001PublicKeyParameters(), + }), + (('..', 'subjectPublicKey'), { + id_rsaEncryption: RSAPublicKey(), + id_GostR3410_2001: OctetString(), + }), + ), + +tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then +autodecode its parameters inside SPKI's algorithm and its public key +itself. + Following types can be automatically decoded (DEFINED BY): * :py:class:`pyderasn.Any` @@ -262,17 +296,17 @@ was defined, ``value`` contains corresponding decoded value. For example above, ``content_info["content"].defined == (id_signedData, signed_data)``. -.. _defines_by_path_kwarg: +.. _defines_by_path_ctx: -defines_by_path kwarg -_____________________ +defines_by_path context option +______________________________ Sometimes you either can not or do not want to explicitly set *defines* in the scheme. You can dynamically apply those definitions when calling ``.decode()`` method. -Decode method takes optional ``defines_by_path`` keyword argument that -must be sequence of following tuples:: +Specify ``defines_by_path`` key in the :ref:`decode context `. Its +value must be sequence of following tuples:: (decode_path, defines) @@ -289,7 +323,7 @@ of ``PKIResponse``:: content_info, tail = ContentInfo().decode(data, defines_by_path=( ( ("contentType",), - ("content", {id_signedData: SignedData()}), + ((("content",), {id_signedData: SignedData()}),), ), ( ( @@ -298,10 +332,10 @@ of ``PKIResponse``:: "encapContentInfo", "eContentType", ), - ("eContent", { + ((("eContent",), { id_cct_PKIData: PKIData(), id_cct_PKIResponse: PKIResponse(), - }), + })), ), ( ( @@ -314,12 +348,12 @@ of ``PKIResponse``:: any, "attrType", ), - ("attrValues", { + ((("attrValues",), { id_cmc_recipientNonce: RecipientNonce(), id_cmc_senderNonce: SenderNonce(), id_cmc_statusInfoV2: CMCStatusInfoV2(), id_cmc_transactionId: TransactionId(), - }), + })), ), )) @@ -421,6 +455,7 @@ _____ Various ------- +.. autofunction:: pyderasn.abs_decode_path .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -867,7 +902,7 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): # pragma: no cover + def _decode(self, tlv, offset, decode_path, ctx): # pragma: no cover raise NotImplementedError() def encode(self): @@ -876,23 +911,25 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) - def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None): + def decode(self, data, offset=0, leavemm=False, decode_path=(), ctx=None): """Decode the data :param data: either binary or memoryview :param int offset: initial data's offset :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise - :param defines_by_path: :ref:`Read about DEFINED BY ` + :param ctx: optional :ref:`context ` governing decoding process. :returns: (Obj, remaining data) """ + if ctx is None: + ctx = {} tlv = memoryview(data) if self._expl is None: obj, tail = self._decode( tlv, offset, decode_path=decode_path, - defines_by_path=defines_by_path, + ctx=ctx, ) else: try: @@ -930,7 +967,7 @@ class Obj(object): v, offset=offset + tlen + llen, decode_path=decode_path, - defines_by_path=defines_by_path, + ctx=ctx, ) return obj, (tail if leavemm else tail.tobytes()) @@ -1238,7 +1275,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1531,7 +1568,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1865,7 +1902,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2117,7 +2154,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2266,7 +2303,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2353,7 +2390,7 @@ class ObjectIdentifier(Obj): def __init__( self, value=None, - defines=None, + defines=(), impl=None, expl=None, default=None, @@ -2364,12 +2401,14 @@ class ObjectIdentifier(Obj): :param value: set the value. Either tuples of integers, string of "."-concatenated integers, or :py:class:`pyderasn.ObjectIdentifier` object - :param defines: tuple of two elements. First one is a name of - field inside :py:class:`pyderasn.Sequence`, - defining with that OID. Second element is a - ``{OID: pyderasn.Obj()}`` dictionary, mapping - between current OID value and structure applied - to defined field. + :param defines: sequence of tuples. Each tuple has two elements. + First one is relative to current one decode + path, aiming to the field defined by that OID. + Read about relative path in + :py:func:`pyderasn.abs_decode_path`. Second + tuple element is ``{OID: pyderasn.Obj()}`` + dictionary, mapping between current OID value + and structure applied to defined field. :ref:`Read about DEFINED BY ` :param bytes impl: override default tag with ``IMPLICIT`` one :param bytes expl: override default tag with ``EXPLICIT`` one @@ -2509,7 +2548,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3276,7 +3315,7 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): for choice, spec in self.specs.items(): try: value, tail = spec.decode( @@ -3284,7 +3323,7 @@ class Choice(Obj): offset=offset, leavemm=True, decode_path=decode_path + (choice,), - defines_by_path=defines_by_path, + ctx=ctx, ) except TagMismatch: continue @@ -3451,7 +3490,7 @@ class Any(Obj): self._assert_ready() return self._value - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) l, llen, v = len_decode(lv) @@ -3526,6 +3565,32 @@ def get_def_by_path(defines_by_path, sub_decode_path): return define +def abs_decode_path(decode_path, rel_path): + """Create an absolute decode path from current and relative ones + + :param decode_path: current decode path, starting point. + Tuple of strings + :param rel_path: relative path to ``decode_path``. Tuple of strings. + If first tuple's element is "/", then treat it as + an absolute path, ignoring ``decode_path`` as + starting point. Also this tuple can contain ".." + elements, stripping the leading element from + ``decode_path`` + + >>> abs_decode_path(("foo", "bar"), ("baz", "whatever")) + ("foo", "bar", "baz", "whatever") + >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever")) + ("foo", "whatever") + >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever")) + ("baz", "whatever") + """ + if rel_path[0] == "/": + return rel_path[1:] + if rel_path[0] == "..": + return abs_decode_path(decode_path[:-1], rel_path[1:]) + return decode_path + rel_path + + class Sequence(Obj): """``SEQUENCE`` structure type @@ -3739,7 +3804,7 @@ class Sequence(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3774,7 +3839,6 @@ class Sequence(Obj): v, tail = v[:l], v[l:] sub_offset = offset + tlen + llen values = {} - defines = {} for name, spec in self.specs.items(): if len(v) == 0 and spec.optional: continue @@ -3785,14 +3849,14 @@ class Sequence(Obj): sub_offset, leavemm=True, decode_path=sub_decode_path, - defines_by_path=defines_by_path, + ctx=ctx, ) except TagMismatch: if spec.optional: continue raise - defined = defines.pop(name, None) + defined = get_def_by_path(ctx.get("defines", ()), sub_decode_path) if defined is not None: defined_by, defined_spec = defined if issubclass(value.__class__, SequenceOf): @@ -3806,7 +3870,7 @@ class Sequence(Obj): sub_offset + value.tlen + value.llen, leavemm=True, decode_path=sub_sub_decode_path, - defines_by_path=defines_by_path, + ctx=ctx, ) if len(defined_tail) > 0: raise DecodeError( @@ -3822,7 +3886,7 @@ class Sequence(Obj): sub_offset + value.tlen + value.llen, leavemm=True, decode_path=sub_decode_path + (decode_path_defby(defined_by),), - defines_by_path=defines_by_path, + ctx=ctx, ) if len(defined_tail) > 0: raise DecodeError( @@ -3841,14 +3905,19 @@ class Sequence(Obj): continue values[name] = value - spec_defines = getattr(spec, "defines", None) - if defines_by_path is not None and spec_defines is None: - spec_defines = get_def_by_path(defines_by_path, sub_decode_path) - if spec_defines is not None: - what, schema = spec_defines - defined = schema.get(value, None) - if defined is not None: - defines[what] = (value, defined) + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if len(v) > 0: raise DecodeError( "remaining data", @@ -3917,7 +3986,7 @@ class Set(Sequence): v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3960,7 +4029,7 @@ class Set(Sequence): sub_offset, leavemm=True, decode_path=decode_path + (name,), - defines_by_path=defines_by_path, + ctx=ctx, ) except TagMismatch: continue @@ -4170,7 +4239,7 @@ class SequenceOf(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): + def _decode(self, tlv, offset, decode_path, ctx): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -4212,7 +4281,7 @@ class SequenceOf(Obj): sub_offset, leavemm=True, decode_path=decode_path + (str(len(_value)),), - defines_by_path=defines_by_path, + ctx=ctx, ) sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) v = v_tail @@ -4371,9 +4440,9 @@ def main(): # pragma: no cover schema, pprinter = generic_decoder() obj, tail = schema().decode( der, - defines_by_path=( - None if args.defines_by_path is None - else obj_by_path(args.defines_by_path) + ctx=( + None if args.defines_by_path is None else + {"defines_by_path": obj_by_path(args.defines_by_path)} ), ) print(pprinter(obj, oids=oids)) diff --git a/pyderasn.pyi b/pyderasn.pyi index 8d91745..1c784fb 100644 --- a/pyderasn.pyi +++ b/pyderasn.pyi @@ -118,7 +118,7 @@ class Obj: offset: int=..., leavemm: bool=..., decode_path: Tuple[str, ...]=..., - defines_by_path: Optional[Sequence[Tuple[Tuple[str, ...], Tuple[str, Dict[ObjectIdentifier, Obj]]]]], + ctx: Optional[Dict[str, TAny]]=..., ) -> Tuple[Obj, bytes]: ... @property diff --git a/tests/test_crts.py b/tests/test_crts.py index 8f8f1ec..9c6b76c 100644 --- a/tests/test_crts.py +++ b/tests/test_crts.py @@ -98,13 +98,13 @@ class OrganizationName(Choice): class AttributeTypeAndValue(Sequence): schema = ( - ("type", AttributeType(defines=("value", { + ("type", AttributeType(defines=(((".", "value"), { ObjectIdentifier("2.5.4.6"): PrintableString(), ObjectIdentifier("2.5.4.8"): PrintableString(), ObjectIdentifier("2.5.4.7"): PrintableString(), ObjectIdentifier("2.5.4.10"): OrganizationName(), ObjectIdentifier("2.5.4.3"): PrintableString(), - }))), + }),))), ("value", AttributeValue()), ) diff --git a/tests/test_pyderasn.py b/tests/test_pyderasn.py index 8297415..498b160 100644 --- a/tests/test_pyderasn.py +++ b/tests/test_pyderasn.py @@ -50,6 +50,7 @@ from six import PY2 from six import text_type from pyderasn import _pp +from pyderasn import abs_decode_path from pyderasn import Any from pyderasn import BitString from pyderasn import BMPString @@ -4902,9 +4903,9 @@ class TestOIDDefines(TestCase): max_size=len(value_names), )) _schema = [ - ("type", ObjectIdentifier(defines=(value_name_chosen, { + ("type", ObjectIdentifier(defines=(((value_name_chosen,), { oid: Integer() for oid in oids[:-1] - }))), + }),))), ] for i, value_name in enumerate(value_names): _schema.append((value_name, Any(expl=tag_ctxp(i)))) @@ -4927,7 +4928,7 @@ class TestOIDDefines(TestCase): class TestDefinesByPath(TestCase): - def runTest(self): + def test_generated(self): class Seq(Sequence): schema = ( ("type", ObjectIdentifier()), @@ -4990,17 +4991,23 @@ class TestDefinesByPath(TestCase): seq_integered, _ = Seq().decode(seq_integered_raw) self.assertIsNone(seq_integered["value"].defined) defines_by_path.append( - (("type",), ("value", { + (("type",), ((("value",), { type_integered: Integer(), type_sequenced: SeqInner(), - })) + }),)) + ) + seq_integered, _ = Seq().decode( + seq_integered_raw, + ctx={"defines_by_path": defines_by_path}, ) - seq_integered, _ = Seq().decode(seq_integered_raw, defines_by_path=defines_by_path) self.assertIsNotNone(seq_integered["value"].defined) self.assertEqual(seq_integered["value"].defined[0], type_integered) self.assertEqual(seq_integered["value"].defined[1], Integer(123)) - seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path) + seq_sequenced, _ = Seq().decode( + seq_sequenced_raw, + ctx={"defines_by_path": defines_by_path}, + ) self.assertIsNotNone(seq_sequenced["value"].defined) self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced) seq_inner = seq_sequenced["value"].defined[1] @@ -5008,9 +5015,12 @@ class TestDefinesByPath(TestCase): defines_by_path.append(( ("value", decode_path_defby(type_sequenced), "typeInner"), - ("valueInner", {type_innered: Pairs()}), + ((("valueInner",), {type_innered: Pairs()}),), )) - seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path) + seq_sequenced, _ = Seq().decode( + seq_sequenced_raw, + ctx={"defines_by_path": defines_by_path}, + ) self.assertIsNotNone(seq_sequenced["value"].defined) self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced) seq_inner = seq_sequenced["value"].defined[1] @@ -5029,12 +5039,15 @@ class TestDefinesByPath(TestCase): any, "type", ), - ("value", { + ((("value",), { type_integered: Integer(), type_octet_stringed: OctetString(), - }), + }),), )) - seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path) + seq_sequenced, _ = Seq().decode( + seq_sequenced_raw, + ctx={"defines_by_path": defines_by_path}, + ) self.assertIsNotNone(seq_sequenced["value"].defined) self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced) seq_inner = seq_sequenced["value"].defined[1] @@ -5044,3 +5057,58 @@ class TestDefinesByPath(TestCase): for pair_input, pair_got in zip(pairs_input, pairs_got): self.assertEqual(pair_got["value"][0].defined[0], pair_input[0]) self.assertEqual(pair_got["value"][0].defined[1], pair_input[1]) + + @given(oid_strategy(), integers()) + def test_simple(self, oid, tgt): + class Inner(Sequence): + schema = ( + ("oid", ObjectIdentifier(defines=((("..", "tgt"), { + ObjectIdentifier(oid): Integer(), + }),))), + ) + + class Outer(Sequence): + schema = ( + ("inner", Inner()), + ("tgt", OctetString()), + ) + + inner = Inner() + inner["oid"] = ObjectIdentifier(oid) + outer = Outer() + outer["inner"] = inner + outer["tgt"] = OctetString(Integer(tgt).encode()) + decoded, _ = Outer().decode(outer.encode()) + self.assertEqual(decoded["tgt"].defined[1], Integer(tgt)) + +class TestAbsDecodePath(TestCase): + @given( + lists(text(alphabet=ascii_letters, min_size=1)).map(tuple), + lists(text(alphabet=ascii_letters, min_size=1), min_size=1).map(tuple), + ) + def test_concat(self, decode_path, rel_path): + self.assertSequenceEqual( + abs_decode_path(decode_path, rel_path), + decode_path + rel_path, + ) + + @given( + lists(text(alphabet=ascii_letters, min_size=1)).map(tuple), + lists(text(alphabet=ascii_letters, min_size=1), min_size=1).map(tuple), + ) + def test_abs(self, decode_path, rel_path): + self.assertSequenceEqual( + abs_decode_path(decode_path, ("/",) + rel_path), + rel_path, + ) + + @given( + lists(text(alphabet=ascii_letters, min_size=1), min_size=5).map(tuple), + integers(min_value=1, max_value=3), + lists(text(alphabet=ascii_letters, min_size=1), min_size=1).map(tuple), + ) + def test_dots(self, decode_path, number_of_dots, rel_path): + self.assertSequenceEqual( + abs_decode_path(decode_path, tuple([".."] * number_of_dots) + rel_path), + decode_path[:-number_of_dots] + rel_path, + ) -- 2.44.0