When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
+.. _ctx:
+
+Context
+_______
+
+You can specify so called context keyword argument during ``decode()``
+invocation. It is dictionary containing various options governing
+decoding process.
+
+Currently available context options:
+
+* :ref:`defines_by_path <defines_by_path_ctx>`
+
.. _pprinting:
Pretty printing
:py:class:`pyderasn.ObjectIdentifier` field inside
:py:class:`pyderasn.Sequence` can hold mapping between OIDs and
-necessary for decoding structrures. For example, CMS (:rfc:`5652`)
+necessary for decoding structures. For example, CMS (:rfc:`5652`)
container::
class ContentInfo(Sequence):
schema = (
- ("contentType", ContentType(defines=("content", {
+ ("contentType", ContentType(defines=((("content",), {
id_digestedData: DigestedData(),
id_signedData: SignedData(),
- }))),
+ }),))),
("content", Any(expl=tag_ctxc(0))),
)
``contentType`` contains unknown OID, then no automatic decoding is
done.
+You can specify multiple fields, that will be autodecoded -- that is why
+``defines`` kwarg is a sequence. You can specify defined field
+relatively or absolutely to current decode path. For example ``defines``
+for AlgorithmIdentifier of X.509's
+``tbsCertificate.subjectPublicKeyInfo.algorithm.algorithm``::
+
+ (
+ (('parameters',), {
+ id_ecPublicKey: ECParameters(),
+ id_GostR3410_2001: GostR34102001PublicKeyParameters(),
+ }),
+ (('..', 'subjectPublicKey'), {
+ id_rsaEncryption: RSAPublicKey(),
+ id_GostR3410_2001: OctetString(),
+ }),
+ ),
+
+tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
+autodecode its parameters inside SPKI's algorithm and its public key
+itself.
+
Following types can be automatically decoded (DEFINED BY):
* :py:class:`pyderasn.Any`
above, ``content_info["content"].defined == (id_signedData,
signed_data)``.
-.. _defines_by_path_kwarg:
+.. _defines_by_path_ctx:
-defines_by_path kwarg
-_____________________
+defines_by_path context option
+______________________________
Sometimes you either can not or do not want to explicitly set *defines*
in the scheme. You can dynamically apply those definitions when calling
``.decode()`` method.
-Decode method takes optional ``defines_by_path`` keyword argument that
-must be sequence of following tuples::
+Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
+value must be sequence of following tuples::
(decode_path, defines)
content_info, tail = ContentInfo().decode(data, defines_by_path=(
(
("contentType",),
- ("content", {id_signedData: SignedData()}),
+ ((("content",), {id_signedData: SignedData()}),),
),
(
(
"encapContentInfo",
"eContentType",
),
- ("eContent", {
+ ((("eContent",), {
id_cct_PKIData: PKIData(),
id_cct_PKIResponse: PKIResponse(),
- }),
+ })),
),
(
(
any,
"attrType",
),
- ("attrValues", {
+ ((("attrValues",), {
id_cmc_recipientNonce: RecipientNonce(),
id_cmc_senderNonce: SenderNonce(),
id_cmc_statusInfoV2: CMCStatusInfoV2(),
id_cmc_transactionId: TransactionId(),
- }),
+ })),
),
))
Various
-------
+.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
def _encode(self): # pragma: no cover
raise NotImplementedError()
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): # pragma: no cover
+ def _decode(self, tlv, offset, decode_path, ctx): # pragma: no cover
raise NotImplementedError()
def encode(self):
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
- def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None):
+ def decode(self, data, offset=0, leavemm=False, decode_path=(), ctx=None):
"""Decode the data
:param data: either binary or memoryview
:param int offset: initial data's offset
:param bool leavemm: do we need to leave memoryview of remaining
data as is, or convert it to bytes otherwise
- :param defines_by_path: :ref:`Read about DEFINED BY <definedby>`
+ :param ctx: optional :ref:`context <ctx>` governing decoding process.
:returns: (Obj, remaining data)
"""
+ if ctx is None:
+ ctx = {}
tlv = memoryview(data)
if self._expl is None:
obj, tail = self._decode(
tlv,
offset,
decode_path=decode_path,
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
else:
try:
v,
offset=offset + tlen + llen,
decode_path=decode_path,
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
return obj, (tail if leavemm else tail.tobytes())
(b"\xFF" if self._value else b"\x00"),
))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
break
return b"".join((self.tag, len_encode(len(octets)), octets))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
octets,
))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
self._value,
))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
def _encode(self):
return self.tag + len_encode(0)
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
def __init__(
self,
value=None,
- defines=None,
+ defines=(),
impl=None,
expl=None,
default=None,
:param value: set the value. Either tuples of integers,
string of "."-concatenated integers, or
:py:class:`pyderasn.ObjectIdentifier` object
- :param defines: tuple of two elements. First one is a name of
- field inside :py:class:`pyderasn.Sequence`,
- defining with that OID. Second element is a
- ``{OID: pyderasn.Obj()}`` dictionary, mapping
- between current OID value and structure applied
- to defined field.
+ :param defines: sequence of tuples. Each tuple has two elements.
+ First one is relative to current one decode
+ path, aiming to the field defined by that OID.
+ Read about relative path in
+ :py:func:`pyderasn.abs_decode_path`. Second
+ tuple element is ``{OID: pyderasn.Obj()}``
+ dictionary, mapping between current OID value
+ and structure applied to defined field.
:ref:`Read about DEFINED BY <definedby>`
:param bytes impl: override default tag with ``IMPLICIT`` one
:param bytes expl: override default tag with ``EXPLICIT`` one
v = b"".join(octets)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
self._assert_ready()
return self._value[1].encode()
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
for choice, spec in self.specs.items():
try:
value, tail = spec.decode(
offset=offset,
leavemm=True,
decode_path=decode_path + (choice,),
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
except TagMismatch:
continue
self._assert_ready()
return self._value
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
l, llen, v = len_decode(lv)
return define
+def abs_decode_path(decode_path, rel_path):
+ """Create an absolute decode path from current and relative ones
+
+ :param decode_path: current decode path, starting point.
+ Tuple of strings
+ :param rel_path: relative path to ``decode_path``. Tuple of strings.
+ If first tuple's element is "/", then treat it as
+ an absolute path, ignoring ``decode_path`` as
+ starting point. Also this tuple can contain ".."
+ elements, stripping the leading element from
+ ``decode_path``
+
+ >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
+ ("foo", "bar", "baz", "whatever")
+ >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
+ ("foo", "whatever")
+ >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
+ ("baz", "whatever")
+ """
+ if rel_path[0] == "/":
+ return rel_path[1:]
+ if rel_path[0] == "..":
+ return abs_decode_path(decode_path[:-1], rel_path[1:])
+ return decode_path + rel_path
+
+
class Sequence(Obj):
"""``SEQUENCE`` structure type
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
v, tail = v[:l], v[l:]
sub_offset = offset + tlen + llen
values = {}
- defines = {}
for name, spec in self.specs.items():
if len(v) == 0 and spec.optional:
continue
sub_offset,
leavemm=True,
decode_path=sub_decode_path,
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
except TagMismatch:
if spec.optional:
continue
raise
- defined = defines.pop(name, None)
+ defined = get_def_by_path(ctx.get("defines", ()), sub_decode_path)
if defined is not None:
defined_by, defined_spec = defined
if issubclass(value.__class__, SequenceOf):
sub_offset + value.tlen + value.llen,
leavemm=True,
decode_path=sub_sub_decode_path,
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
if len(defined_tail) > 0:
raise DecodeError(
sub_offset + value.tlen + value.llen,
leavemm=True,
decode_path=sub_decode_path + (decode_path_defby(defined_by),),
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
if len(defined_tail) > 0:
raise DecodeError(
continue
values[name] = value
- spec_defines = getattr(spec, "defines", None)
- if defines_by_path is not None and spec_defines is None:
- spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
- if spec_defines is not None:
- what, schema = spec_defines
- defined = schema.get(value, None)
- if defined is not None:
- defines[what] = (value, defined)
+ spec_defines = getattr(spec, "defines", ())
+ if len(spec_defines) == 0:
+ defines_by_path = ctx.get("defines_by_path", ())
+ if len(defines_by_path) > 0:
+ spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+ if spec_defines is not None and len(spec_defines) > 0:
+ for rel_path, schema in spec_defines:
+ defined = schema.get(value, None)
+ if defined is not None:
+ ctx.setdefault("defines", []).append((
+ abs_decode_path(sub_decode_path[:-1], rel_path),
+ (value, defined),
+ ))
if len(v) > 0:
raise DecodeError(
"remaining data",
v = b"".join(raws)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
sub_offset,
leavemm=True,
decode_path=decode_path + (name,),
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
except TagMismatch:
continue
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
sub_offset,
leavemm=True,
decode_path=decode_path + (str(len(_value)),),
- defines_by_path=defines_by_path,
+ ctx=ctx,
)
sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
v = v_tail
schema, pprinter = generic_decoder()
obj, tail = schema().decode(
der,
- defines_by_path=(
- None if args.defines_by_path is None
- else obj_by_path(args.defines_by_path)
+ ctx=(
+ None if args.defines_by_path is None else
+ {"defines_by_path": obj_by_path(args.defines_by_path)}
),
)
print(pprinter(obj, oids=oids))
from six import text_type
from pyderasn import _pp
+from pyderasn import abs_decode_path
from pyderasn import Any
from pyderasn import BitString
from pyderasn import BMPString
max_size=len(value_names),
))
_schema = [
- ("type", ObjectIdentifier(defines=(value_name_chosen, {
+ ("type", ObjectIdentifier(defines=(((value_name_chosen,), {
oid: Integer() for oid in oids[:-1]
- }))),
+ }),))),
]
for i, value_name in enumerate(value_names):
_schema.append((value_name, Any(expl=tag_ctxp(i))))
class TestDefinesByPath(TestCase):
- def runTest(self):
+ def test_generated(self):
class Seq(Sequence):
schema = (
("type", ObjectIdentifier()),
seq_integered, _ = Seq().decode(seq_integered_raw)
self.assertIsNone(seq_integered["value"].defined)
defines_by_path.append(
- (("type",), ("value", {
+ (("type",), ((("value",), {
type_integered: Integer(),
type_sequenced: SeqInner(),
- }))
+ }),))
+ )
+ seq_integered, _ = Seq().decode(
+ seq_integered_raw,
+ ctx={"defines_by_path": defines_by_path},
)
- seq_integered, _ = Seq().decode(seq_integered_raw, defines_by_path=defines_by_path)
self.assertIsNotNone(seq_integered["value"].defined)
self.assertEqual(seq_integered["value"].defined[0], type_integered)
self.assertEqual(seq_integered["value"].defined[1], Integer(123))
- seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+ seq_sequenced, _ = Seq().decode(
+ seq_sequenced_raw,
+ ctx={"defines_by_path": defines_by_path},
+ )
self.assertIsNotNone(seq_sequenced["value"].defined)
self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
seq_inner = seq_sequenced["value"].defined[1]
defines_by_path.append((
("value", decode_path_defby(type_sequenced), "typeInner"),
- ("valueInner", {type_innered: Pairs()}),
+ ((("valueInner",), {type_innered: Pairs()}),),
))
- seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+ seq_sequenced, _ = Seq().decode(
+ seq_sequenced_raw,
+ ctx={"defines_by_path": defines_by_path},
+ )
self.assertIsNotNone(seq_sequenced["value"].defined)
self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
seq_inner = seq_sequenced["value"].defined[1]
any,
"type",
),
- ("value", {
+ ((("value",), {
type_integered: Integer(),
type_octet_stringed: OctetString(),
- }),
+ }),),
))
- seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+ seq_sequenced, _ = Seq().decode(
+ seq_sequenced_raw,
+ ctx={"defines_by_path": defines_by_path},
+ )
self.assertIsNotNone(seq_sequenced["value"].defined)
self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
seq_inner = seq_sequenced["value"].defined[1]
for pair_input, pair_got in zip(pairs_input, pairs_got):
self.assertEqual(pair_got["value"][0].defined[0], pair_input[0])
self.assertEqual(pair_got["value"][0].defined[1], pair_input[1])
+
+ @given(oid_strategy(), integers())
+ def test_simple(self, oid, tgt):
+ class Inner(Sequence):
+ schema = (
+ ("oid", ObjectIdentifier(defines=((("..", "tgt"), {
+ ObjectIdentifier(oid): Integer(),
+ }),))),
+ )
+
+ class Outer(Sequence):
+ schema = (
+ ("inner", Inner()),
+ ("tgt", OctetString()),
+ )
+
+ inner = Inner()
+ inner["oid"] = ObjectIdentifier(oid)
+ outer = Outer()
+ outer["inner"] = inner
+ outer["tgt"] = OctetString(Integer(tgt).encode())
+ decoded, _ = Outer().decode(outer.encode())
+ self.assertEqual(decoded["tgt"].defined[1], Integer(tgt))
+
+class TestAbsDecodePath(TestCase):
+ @given(
+ lists(text(alphabet=ascii_letters, min_size=1)).map(tuple),
+ lists(text(alphabet=ascii_letters, min_size=1), min_size=1).map(tuple),
+ )
+ def test_concat(self, decode_path, rel_path):
+ self.assertSequenceEqual(
+ abs_decode_path(decode_path, rel_path),
+ decode_path + rel_path,
+ )
+
+ @given(
+ lists(text(alphabet=ascii_letters, min_size=1)).map(tuple),
+ lists(text(alphabet=ascii_letters, min_size=1), min_size=1).map(tuple),
+ )
+ def test_abs(self, decode_path, rel_path):
+ self.assertSequenceEqual(
+ abs_decode_path(decode_path, ("/",) + rel_path),
+ rel_path,
+ )
+
+ @given(
+ lists(text(alphabet=ascii_letters, min_size=1), min_size=5).map(tuple),
+ integers(min_value=1, max_value=3),
+ lists(text(alphabet=ascii_letters, min_size=1), min_size=1).map(tuple),
+ )
+ def test_dots(self, decode_path, number_of_dots, rel_path):
+ self.assertSequenceEqual(
+ abs_decode_path(decode_path, tuple([".."] * number_of_dots) + rel_path),
+ decode_path[:-number_of_dots] + rel_path,
+ )