When default argument is used and value is not specified, then it equals
to default one.
+.. _bounds:
+
Size constraints
________________
All objects have ``copy()`` method, returning its copy, that can be safely
mutated.
+.. _decoding:
+
Decoding
-________
+--------
Decoding is performed using ``decode()`` method. ``offset`` optional
argument could be used to set initial object's offset in the binary
When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
+.. _pprinting:
+
Pretty printing
-_______________
+---------------
All objects have ``pps()`` method, that is a generator of
:py:class:`pyderasn.PP` namedtuple, holding various raw information
>>> print(pprint(obj))
0 [1,1, 2] INTEGER -12345
+.. _definedby:
+
+DEFINED BY
+----------
+
+ASN.1 structures often have ANY and OCTET STRING fields, that are
+DEFINED BY some previously met ObjectIdentifier. This library provides
+ability to specify mapping between some OID and field that must be
+decoded with specific specification.
+
+defines kwarg
+_____________
+
+:py:class:`pyderasn.ObjectIdentifier` field inside
+:py:class:`pyderasn.Sequence` can hold mapping between OIDs and
+necessary for decoding structrures. For example, CMS (:rfc:`5652`)
+container::
+
+ class ContentInfo(Sequence):
+ schema = (
+ ("contentType", ContentType(defines=("content", {
+ id_digestedData: DigestedData(),
+ id_signedData: SignedData(),
+ }))),
+ ("content", Any(expl=tag_ctxc(0))),
+ )
+
+``contentType`` field tells that it defines that ``content`` must be
+decoded with ``SignedData`` specification, if ``contentType`` equals to
+``id-signedData``. The same applies to ``DigestedData``. If
+``contentType`` contains unknown OID, then no automatic decoding is
+done.
+
+Following types can be automatically decoded (DEFINED BY):
+
+* :py:class:`pyderasn.Any`
+* :py:class:`pyderasn.OctetString`
+* :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
+ ``Any``/``OctetString``-s
+
+When any of those fields is automatically decoded, then ``.defined``
+attribute contains ``(OID, value)`` tuple. OID tell by which OID it was
+defined, ``value`` contains corresponding decoded value. For example
+above, ``content_info["content"].defined == (id_signedData,
+signed_data)``.
+
+defines_by_path kwarg
+_____________________
+
+Sometimes you either can not or do not want to explicitly set *defines*
+in the scheme. You can dynamically apply those definitions when calling
+``.decode()`` method.
+
+Decode method takes optional ``defines_by_path`` keyword argument that
+must be sequence of following tuples::
+
+ (decode_path, defines)
+
+where ``decode_path`` is a tuple holding so-called decode path to the
+exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
+``defines``, holding exactly the same value as accepted in its keyword
+argument.
+
+For example, again for CMS, you want to automatically decode
+``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
+structures it may hold. Also, automatically decode ``controlSequence``
+of ``PKIResponse``::
+
+ content_info, tail = ContentInfo().decode(data, defines_by_path=(
+ (
+ ("contentType",),
+ ("content", {id_signedData: SignedData()}),
+ ),
+ (
+ (
+ "content",
+ decode_path_defby(id_signedData),
+ "encapContentInfo",
+ "eContentType",
+ ),
+ ("eContent", {
+ id_cct_PKIData: PKIData(),
+ id_cct_PKIResponse: PKIResponse(),
+ }),
+ ),
+ (
+ (
+ "content",
+ decode_path_defby(id_signedData),
+ "encapContentInfo",
+ "eContent",
+ decode_path_defby(id_cct_PKIResponse),
+ "controlSequence",
+ any,
+ "attrType",
+ ),
+ ("attrValues", {
+ id_cmc_recipientNonce: RecipientNonce(),
+ id_cmc_senderNonce: SenderNonce(),
+ id_cmc_statusInfoV2: CMCStatusInfoV2(),
+ id_cmc_transactionId: TransactionId(),
+ }),
+ ),
+ ))
+
+Pay attention for :py:func:`pyderasn.decode_path_defby` and ``any``.
+First function is useful for path construction when some automatic
+decoding is already done. ``any`` is used for human readability and
+means literally any value it meet -- useful for sequence and set of-s.
+
Primitive types
---------------
"Boolean",
"BoundsError",
"Choice",
+ "decode_path_defby",
"DecodeError",
"Enumerated",
"GeneralizedTime",
def _encode(self): # pragma: no cover
raise NotImplementedError()
- def _decode(self, tlv, offset=0, decode_path=()): # pragma: no cover
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None): # pragma: no cover
raise NotImplementedError()
def encode(self):
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
- def decode(self, data, offset=0, leavemm=False, decode_path=()):
+ def decode(self, data, offset=0, leavemm=False, decode_path=(), defines_by_path=None):
"""Decode the data
:param data: either binary or memoryview
:param int offset: initial data's offset
:param bool leavemm: do we need to leave memoryview of remaining
data as is, or convert it to bytes otherwise
+ :param defines_by_path: :ref:`Read about DEFINED BY <definedby>`
:returns: (Obj, remaining data)
"""
tlv = memoryview(data)
tlv,
offset,
decode_path=decode_path,
+ defines_by_path=defines_by_path,
)
else:
try:
v,
offset=offset + tlen + llen,
decode_path=decode_path,
+ defines_by_path=defines_by_path,
)
return obj, (tail if leavemm else tail.tobytes())
return self.expl_tlen + self.expl_llen + self.expl_vlen
+def decode_path_defby(defined_by):
+ """DEFINED BY representation inside decode path
+ """
+ return "DEFINED BY (%s)" % defined_by
+
+
########################################################################
# Pretty printing
########################################################################
(b"\xFF" if self._value else b"\x00"),
))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
break
return b"".join((self.tag, len_encode(len(octets)), octets))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
octets,
))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
>>> OctetString(b"hell", bounds=(4, 4))
OCTET STRING 4 bytes 68656c6c
"""
- __slots__ = ("_bound_min", "_bound_max")
+ __slots__ = ("_bound_min", "_bound_max", "defined")
tag_default = tag_encode(4)
asn1_type_name = "OCTET STRING"
)
if self._value is None:
self._value = default
+ self.defined = None
def _value_sanitize(self, value):
if issubclass(value.__class__, OctetString):
self._value,
))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
expl_llen=self.expl_llen if self.expled else None,
expl_vlen=self.expl_vlen if self.expled else None,
)
+ defined_by, defined = self.defined or (None, None)
+ if defined_by is not None:
+ yield defined.pps(
+ decode_path=decode_path + (decode_path_defby(defined_by),)
+ )
class Null(Obj):
def _encode(self):
return self.tag + len_encode(0)
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
Traceback (most recent call last):
pyderasn.InvalidOID: unacceptable first arc value
"""
- __slots__ = ()
+ __slots__ = ("defines",)
tag_default = tag_encode(6)
asn1_type_name = "OBJECT IDENTIFIER"
def __init__(
self,
value=None,
+ defines=None,
impl=None,
expl=None,
default=None,
:param value: set the value. Either tuples of integers,
string of "."-concatenated integers, or
:py:class:`pyderasn.ObjectIdentifier` object
+ :param defines: tuple of two elements. First one is a name of
+ field inside :py:class:`pyderasn.Sequence`,
+ defining with that OID. Second element is a
+ ``{OID: pyderasn.Obj()}`` dictionary, mapping
+ between current OID value and structure applied
+ to defined field.
+ :ref:`Read about DEFINED BY <definedby>`
:param bytes impl: override default tag with ``IMPLICIT`` one
:param bytes expl: override default tag with ``EXPLICIT`` one
:param default: set default value. Type same as in ``value``
)
if self._value is None:
self._value = default
+ self.defines = defines
def __add__(self, their):
if isinstance(their, self.__class__):
def copy(self):
obj = self.__class__()
obj._value = self._value
+ obj.defines = self.defines
obj.tag = self.tag
obj._expl = self._expl
obj.default = self.default
def __call__(
self,
value=None,
+ defines=None,
impl=None,
expl=None,
default=None,
):
return self.__class__(
value=value,
+ defines=self.defines if defines is None else defines,
impl=self.tag if impl is None else impl,
expl=self._expl if expl is None else expl,
default=self.default if default is None else default,
v = b"".join(octets)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
self._assert_ready()
return self._value[1].encode()
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
for choice, spec in self.specs.items():
try:
value, tail = spec.decode(
offset=offset,
leavemm=True,
decode_path=decode_path + (choice,),
+ defines_by_path=defines_by_path,
)
except TagMismatch:
continue
>>> hexenc(bytes(a))
b'0x040x0bhello world'
"""
- __slots__ = ()
+ __slots__ = ("defined",)
tag_default = tag_encode(0)
asn1_type_name = "ANY"
"""
super(Any, self).__init__(None, expl, None, optional, _decoded)
self._value = None if value is None else self._value_sanitize(value)
+ self.defined = None
def _value_sanitize(self, value):
if isinstance(value, self.__class__):
self._assert_ready()
return self._value
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, tlen, lv = tag_strip(tlv)
l, llen, v = len_decode(lv)
expl_llen=self.expl_llen if self.expled else None,
expl_vlen=self.expl_vlen if self.expled else None,
)
+ defined_by, defined = self.defined or (None, None)
+ if defined_by is not None:
+ yield defined.pps(
+ decode_path=decode_path + (decode_path_defby(defined_by),)
+ )
########################################################################
# ASN.1 constructed types
########################################################################
+def get_def_by_path(defines_by_path, sub_decode_path):
+ """Get define by decode path
+ """
+ for path, define in defines_by_path:
+ if len(path) != len(sub_decode_path):
+ continue
+ for p1, p2 in zip(path, sub_decode_path):
+ if (p1 != any) and (p1 != p2):
+ break
+ else:
+ return define
+
+
class Sequence(Obj):
"""``SEQUENCE`` structure type
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
v, tail = v[:l], v[l:]
sub_offset = offset + tlen + llen
values = {}
+ defines = {}
for name, spec in self.specs.items():
if len(v) == 0 and spec.optional:
continue
+ sub_decode_path = decode_path + (name,)
try:
value, v_tail = spec.decode(
v,
sub_offset,
leavemm=True,
- decode_path=decode_path + (name,),
+ decode_path=sub_decode_path,
+ defines_by_path=defines_by_path,
)
except TagMismatch:
if spec.optional:
continue
raise
+
+ defined = defines.pop(name, None)
+ if defined is not None:
+ defined_by, defined_spec = defined
+ if issubclass(value.__class__, SequenceOf):
+ for i, _value in enumerate(value):
+ sub_sub_decode_path = sub_decode_path + (
+ str(i),
+ decode_path_defby(defined_by),
+ )
+ defined_value, defined_tail = defined_spec.decode(
+ memoryview(bytes(_value)),
+ sub_offset + value.tlen + value.llen,
+ leavemm=True,
+ decode_path=sub_sub_decode_path,
+ defines_by_path=defines_by_path,
+ )
+ if len(defined_tail) > 0:
+ raise DecodeError(
+ "remaining data",
+ klass=self.__class__,
+ decode_path=sub_sub_decode_path,
+ offset=offset,
+ )
+ _value.defined = (defined_by, defined_value)
+ else:
+ defined_value, defined_tail = defined_spec.decode(
+ memoryview(bytes(value)),
+ sub_offset + value.tlen + value.llen,
+ leavemm=True,
+ decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+ defines_by_path=defines_by_path,
+ )
+ if len(defined_tail) > 0:
+ raise DecodeError(
+ "remaining data",
+ klass=self.__class__,
+ decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+ offset=offset,
+ )
+ value.defined = (defined_by, defined_value)
+
sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
v = v_tail
if spec.default is not None and value == spec.default:
# but we allow that anyway
continue
values[name] = value
+
+ spec_defines = getattr(spec, "defines", None)
+ if defines_by_path is not None and spec_defines is None:
+ spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+ if spec_defines is not None:
+ what, schema = spec_defines
+ defined = schema.get(value, None)
+ if defined is not None:
+ defines[what] = (value, defined)
if len(v) > 0:
raise DecodeError(
"remaining data",
v = b"".join(raws)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
sub_offset,
leavemm=True,
decode_path=decode_path + (name,),
+ defines_by_path=defines_by_path,
)
except TagMismatch:
continue
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset=0, decode_path=(), defines_by_path=None):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
sub_offset,
leavemm=True,
decode_path=decode_path + (str(len(_value)),),
+ defines_by_path=defines_by_path,
)
sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
v = v_tail
from pyderasn import Boolean
from pyderasn import BoundsError
from pyderasn import Choice
+from pyderasn import decode_path_defby
from pyderasn import DecodeError
from pyderasn import Enumerated
from pyderasn import GeneralizedTime
from pyderasn import Set
from pyderasn import SetOf
from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
from pyderasn import tag_decode
from pyderasn import tag_encode
from pyderasn import tag_strip
_decoded_initial,
) = d.draw(oid_values_strategy())
obj_initial = klass(
- value_initial,
- impl_initial,
- expl_initial,
- default_initial,
- optional_initial or False,
- _decoded_initial,
+ value=value_initial,
+ impl=impl_initial,
+ expl=expl_initial,
+ default=default_initial,
+ optional=optional_initial or False,
+ _decoded=_decoded_initial,
)
(
value,
optional,
_decoded,
) = d.draw(oid_values_strategy(do_expl=impl_initial is None))
- obj = obj_initial(value, impl, expl, default, optional)
+ obj = obj_initial(
+ value=value,
+ impl=impl,
+ expl=expl,
+ default=default,
+ optional=optional,
+ )
if obj.ready:
value_expected = default if value is None else value
value_expected = (
@given(oid_values_strategy())
def test_copy(self, values):
for klass in (ObjectIdentifier, ObjectIdentifierInherited):
- obj = klass(*values)
+ (
+ value,
+ impl,
+ expl,
+ default,
+ optional,
+ _decoded,
+ ) = values
+ obj = klass(
+ value=value,
+ impl=impl,
+ expl=expl,
+ default=default,
+ optional=optional,
+ _decoded=_decoded,
+ )
obj_copied = obj.copy()
self.assert_copied_basic_fields(obj, obj_copied)
self.assertEqual(obj._value, obj_copied._value)
with self.assertRaises(AttributeError):
inher = Inher()
inher.unexistent = "whatever"
+
+
+class TestOIDDefines(TestCase):
+ @given(data_strategy())
+ def runTest(self, d):
+ value_names = list(d.draw(sets(text_letters(), min_size=1, max_size=10)))
+ value_name_chosen = d.draw(sampled_from(value_names))
+ oids = [
+ ObjectIdentifier(oid)
+ for oid in d.draw(sets(oid_strategy(), min_size=2, max_size=10))
+ ]
+ oid_chosen = d.draw(sampled_from(oids))
+ values = d.draw(lists(
+ integers(),
+ min_size=len(value_names),
+ max_size=len(value_names),
+ ))
+ _schema = [
+ ("type", ObjectIdentifier(defines=(value_name_chosen, {
+ oid: Integer() for oid in oids[:-1]
+ }))),
+ ]
+ for i, value_name in enumerate(value_names):
+ _schema.append((value_name, Any(expl=tag_ctxp(i))))
+
+ class Seq(Sequence):
+ schema = _schema
+ seq = Seq()
+ for value_name, value in zip(value_names, values):
+ seq[value_name] = Any(Integer(value).encode())
+ seq["type"] = oid_chosen
+ seq, _ = Seq().decode(seq.encode())
+ for value_name in value_names:
+ if value_name == value_name_chosen:
+ continue
+ self.assertIsNone(seq[value_name].defined)
+ if value_name_chosen in oids[:-1]:
+ self.assertIsNotNone(seq[value_name_chosen].defined)
+ self.assertEqual(seq[value_name_chosen].defined[0], oid_chosen)
+ self.assertIsInstance(seq[value_name_chosen].defined[1], Integer)
+
+
+class TestDefinesByPath(TestCase):
+ def runTest(self):
+ class Seq(Sequence):
+ schema = (
+ ("type", ObjectIdentifier()),
+ ("value", OctetString(expl=tag_ctxc(123))),
+ )
+
+ class SeqInner(Sequence):
+ schema = (
+ ("typeInner", ObjectIdentifier()),
+ ("valueInner", Any()),
+ )
+
+ class PairValue(SetOf):
+ schema = Any()
+
+ class Pair(Sequence):
+ schema = (
+ ("type", ObjectIdentifier()),
+ ("value", PairValue()),
+ )
+
+ class Pairs(SequenceOf):
+ schema = Pair()
+
+ (
+ type_integered,
+ type_sequenced,
+ type_innered,
+ type_octet_stringed,
+ ) = [
+ ObjectIdentifier(oid)
+ for oid in sets(oid_strategy(), min_size=4, max_size=4).example()
+ ]
+ seq_integered = Seq()
+ seq_integered["type"] = type_integered
+ seq_integered["value"] = OctetString(Integer(123).encode())
+ seq_integered_raw = seq_integered.encode()
+
+ pairs = Pairs()
+ pairs_input = (
+ (type_octet_stringed, OctetString(b"whatever")),
+ (type_integered, Integer(123)),
+ (type_octet_stringed, OctetString(b"whenever")),
+ (type_integered, Integer(234)),
+ )
+ for t, v in pairs_input:
+ pair = Pair()
+ pair["type"] = t
+ pair["value"] = PairValue((Any(v),))
+ pairs.append(pair)
+ seq_inner = SeqInner()
+ seq_inner["typeInner"] = type_innered
+ seq_inner["valueInner"] = Any(pairs)
+ seq_sequenced = Seq()
+ seq_sequenced["type"] = type_sequenced
+ seq_sequenced["value"] = OctetString(seq_inner.encode())
+ seq_sequenced_raw = seq_sequenced.encode()
+
+ defines_by_path = []
+ seq_integered, _ = Seq().decode(seq_integered_raw)
+ self.assertIsNone(seq_integered["value"].defined)
+ defines_by_path.append(
+ (("type",), ("value", {
+ type_integered: Integer(),
+ type_sequenced: SeqInner(),
+ }))
+ )
+ seq_integered, _ = Seq().decode(seq_integered_raw, defines_by_path=defines_by_path)
+ self.assertIsNotNone(seq_integered["value"].defined)
+ self.assertEqual(seq_integered["value"].defined[0], type_integered)
+ self.assertEqual(seq_integered["value"].defined[1], Integer(123))
+
+ seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+ self.assertIsNotNone(seq_sequenced["value"].defined)
+ self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+ seq_inner = seq_sequenced["value"].defined[1]
+ self.assertIsNone(seq_inner["valueInner"].defined)
+
+ defines_by_path.append((
+ ("value", decode_path_defby(type_sequenced), "typeInner"),
+ ("valueInner", {type_innered: Pairs()}),
+ ))
+ seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+ self.assertIsNotNone(seq_sequenced["value"].defined)
+ self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+ seq_inner = seq_sequenced["value"].defined[1]
+ self.assertIsNotNone(seq_inner["valueInner"].defined)
+ self.assertEqual(seq_inner["valueInner"].defined[0], type_innered)
+ pairs = seq_inner["valueInner"].defined[1]
+ for pair in pairs:
+ self.assertIsNone(pair["value"][0].defined)
+
+ defines_by_path.append((
+ (
+ "value",
+ decode_path_defby(type_sequenced),
+ "valueInner",
+ decode_path_defby(type_innered),
+ any,
+ "type",
+ ),
+ ("value", {
+ type_integered: Integer(),
+ type_octet_stringed: OctetString(),
+ }),
+ ))
+ seq_sequenced, _ = Seq().decode(seq_sequenced_raw, defines_by_path=defines_by_path)
+ self.assertIsNotNone(seq_sequenced["value"].defined)
+ self.assertEqual(seq_sequenced["value"].defined[0], type_sequenced)
+ seq_inner = seq_sequenced["value"].defined[1]
+ self.assertIsNotNone(seq_inner["valueInner"].defined)
+ self.assertEqual(seq_inner["valueInner"].defined[0], type_innered)
+ pairs_got = seq_inner["valueInner"].defined[1]
+ for pair_input, pair_got in zip(pairs_input, pairs_got):
+ self.assertEqual(pair_got["value"][0].defined[0], pair_input[0])
+ self.assertEqual(pair_got["value"][0].defined[1], pair_input[1])