#!/usr/bin/env python
# coding: utf-8
# PyDERASN -- Python ASN.1 DER codec with abstract structures
-# Copyright (C) 2017 Sergey Matveev <stargrave@stargrave.org>
+# Copyright (C) 2017-2018 Sergey Matveev <stargrave@stargrave.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
the default tag used during coding process. You can override it with
either ``IMPLICIT`` (using ``impl`` keyword argument), or
-``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments takes
+``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments take
raw binary string, containing that tag. You can **not** set implicit and
explicit tags simultaneously.
Implicit tag is not explicitly shown.
-Two object of the same type, but with different implicit/explicit tags
+Two objects of the same type, but with different implicit/explicit tags
are **not** equal.
-You can get objects effective tag (either default or implicited) through
+You can get object's effective tag (either default or implicited) through
``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
function::
When default argument is used and value is not specified, then it equals
to default one.
+.. _bounds:
+
Size constraints
________________
Common methods
______________
-All objects have ``ready`` boolean property, that tells if it is ready
-to be encoded. If that kind of action is performed on unready object,
-then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
+All objects have ``ready`` boolean property, that tells if object is
+ready to be encoded. If that kind of action is performed on unready
+object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
+
+All objects have ``copy()`` method, that returns their copy, that can be
+safely mutated.
-All objects have ``copy()`` method, returning its copy, that can be safely
-mutated.
+.. _decoding:
Decoding
-________
+--------
Decoding is performed using ``decode()`` method. ``offset`` optional
argument could be used to set initial object's offset in the binary
When error occurs, then :py:exc:`pyderasn.DecodeError` is raised.
+.. _ctx:
+
+Context
+_______
+
+You can specify so called context keyword argument during ``decode()``
+invocation. It is dictionary containing various options governing
+decoding process.
+
+Currently available context options:
+
+* :ref:`defines_by_path <defines_by_path_ctx>`
+* :ref:`strict_default_existence <strict_default_existence_ctx>`
+
+.. _pprinting:
+
Pretty printing
-_______________
+---------------
All objects have ``pps()`` method, that is a generator of
:py:class:`pyderasn.PP` namedtuple, holding various raw information
>>> print(pprint(obj))
0 [1,1, 2] INTEGER -12345
+.. _definedby:
+
+DEFINED BY
+----------
+
+ASN.1 structures often have ANY and OCTET STRING fields, that are
+DEFINED BY some previously met ObjectIdentifier. This library provides
+ability to specify mapping between some OID and field that must be
+decoded with specific specification.
+
+defines kwarg
+_____________
+
+:py:class:`pyderasn.ObjectIdentifier` field inside
+:py:class:`pyderasn.Sequence` can hold mapping between OIDs and
+necessary for decoding structures. For example, CMS (:rfc:`5652`)
+container::
+
+ class ContentInfo(Sequence):
+ schema = (
+ ("contentType", ContentType(defines=((("content",), {
+ id_digestedData: DigestedData(),
+ id_signedData: SignedData(),
+ }),))),
+ ("content", Any(expl=tag_ctxc(0))),
+ )
+
+``contentType`` field tells that it defines that ``content`` must be
+decoded with ``SignedData`` specification, if ``contentType`` equals to
+``id-signedData``. The same applies to ``DigestedData``. If
+``contentType`` contains unknown OID, then no automatic decoding is
+done.
+
+You can specify multiple fields, that will be autodecoded -- that is why
+``defines`` kwarg is a sequence. You can specify defined field
+relatively or absolutely to current decode path. For example ``defines``
+for AlgorithmIdentifier of X.509's
+``tbsCertificate.subjectPublicKeyInfo.algorithm.algorithm``::
+
+ (
+ (('parameters',), {
+ id_ecPublicKey: ECParameters(),
+ id_GostR3410_2001: GostR34102001PublicKeyParameters(),
+ }),
+ (('..', 'subjectPublicKey'), {
+ id_rsaEncryption: RSAPublicKey(),
+ id_GostR3410_2001: OctetString(),
+ }),
+ ),
+
+tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
+autodecode its parameters inside SPKI's algorithm and its public key
+itself.
+
+Following types can be automatically decoded (DEFINED BY):
+
+* :py:class:`pyderasn.Any`
+* :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
+* :py:class:`pyderasn.OctetString`
+* :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
+ ``Any``/``OctetString``-s
+
+When any of those fields is automatically decoded, then ``.defined``
+attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
+was defined, ``value`` contains corresponding decoded value. For example
+above, ``content_info["content"].defined == (id_signedData,
+signed_data)``.
+
+.. _defines_by_path_ctx:
+
+defines_by_path context option
+______________________________
+
+Sometimes you either can not or do not want to explicitly set *defines*
+in the scheme. You can dynamically apply those definitions when calling
+``.decode()`` method.
+
+Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
+value must be sequence of following tuples::
+
+ (decode_path, defines)
+
+where ``decode_path`` is a tuple holding so-called decode path to the
+exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
+``defines``, holding exactly the same value as accepted in its keyword
+argument.
+
+For example, again for CMS, you want to automatically decode
+``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
+structures it may hold. Also, automatically decode ``controlSequence``
+of ``PKIResponse``::
+
+ content_info, tail = ContentInfo().decode(data, defines_by_path=(
+ (
+ ("contentType",),
+ ((("content",), {id_signedData: SignedData()}),),
+ ),
+ (
+ (
+ "content",
+ decode_path_defby(id_signedData),
+ "encapContentInfo",
+ "eContentType",
+ ),
+ ((("eContent",), {
+ id_cct_PKIData: PKIData(),
+ id_cct_PKIResponse: PKIResponse(),
+ })),
+ ),
+ (
+ (
+ "content",
+ decode_path_defby(id_signedData),
+ "encapContentInfo",
+ "eContent",
+ decode_path_defby(id_cct_PKIResponse),
+ "controlSequence",
+ any,
+ "attrType",
+ ),
+ ((("attrValues",), {
+ id_cmc_recipientNonce: RecipientNonce(),
+ id_cmc_senderNonce: SenderNonce(),
+ id_cmc_statusInfoV2: CMCStatusInfoV2(),
+ id_cmc_transactionId: TransactionId(),
+ })),
+ ),
+ ))
+
+Pay attention for :py:func:`pyderasn.decode_path_defby` and ``any``.
+First function is useful for path construction when some automatic
+decoding is already done. ``any`` means literally any value it meet --
+useful for SEQUENCE/SET OF-s.
+
Primitive types
---------------
Various
-------
+.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
"Boolean",
"BoundsError",
"Choice",
+ "decode_path_defby",
"DecodeError",
"Enumerated",
"GeneralizedTime",
########################################################################
class AutoAddSlots(type):
- def __new__(cls, name, bases, _dict):
+ def __new__(mcs, name, bases, _dict):
_dict["__slots__"] = _dict.get("__slots__", ())
- return type.__new__(cls, name, bases, _dict)
+ return type.__new__(mcs, name, bases, _dict)
@add_metaclass(AutoAddSlots)
optional=False,
_decoded=(0, 0, 0),
):
- if impl is None:
- self.tag = getattr(self, "impl", self.tag_default)
- else:
- self.tag = impl
+ self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
self._expl = getattr(self, "expl", None) if expl is None else expl
if self.tag != self.tag_default and self._expl is not None:
raise ValueError(
def _encode(self): # pragma: no cover
raise NotImplementedError()
- def _decode(self, tlv, offset=0, decode_path=()): # pragma: no cover
+ def _decode(self, tlv, offset, decode_path, ctx): # pragma: no cover
raise NotImplementedError()
def encode(self):
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
- def decode(self, data, offset=0, leavemm=False, decode_path=()):
+ def decode(self, data, offset=0, leavemm=False, decode_path=(), ctx=None):
"""Decode the data
:param data: either binary or memoryview
:param int offset: initial data's offset
:param bool leavemm: do we need to leave memoryview of remaining
data as is, or convert it to bytes otherwise
+ :param ctx: optional :ref:`context <ctx>` governing decoding process.
:returns: (Obj, remaining data)
"""
+ if ctx is None:
+ ctx = {}
tlv = memoryview(data)
if self._expl is None:
obj, tail = self._decode(
tlv,
offset,
decode_path=decode_path,
+ ctx=ctx,
)
else:
try:
obj, tail = self._decode(
v,
offset=offset + tlen + llen,
- decode_path=(),
+ decode_path=decode_path,
+ ctx=ctx,
)
return obj, (tail if leavemm else tail.tobytes())
return self.expl_tlen + self.expl_llen + self.expl_vlen
+def decode_path_defby(defined_by):
+ """DEFINED BY representation inside decode path
+ """
+ return "DEFINED BY (%s)" % defined_by
+
+
########################################################################
# Pretty printing
########################################################################
(b"\xFF" if self._value else b"\x00"),
))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
self._value = value
specs = getattr(self, "schema", {}) if _specs is None else _specs
self.specs = specs if isinstance(specs, dict) else dict(specs)
- if bounds is None:
- self._bound_min, self._bound_max = getattr(
- self,
- "bounds",
- (float("-inf"), float("+inf")),
- )
- else:
- self._bound_min, self._bound_max = bounds
+ self._bound_min, self._bound_max = getattr(
+ self,
+ "bounds",
+ (float("-inf"), float("+inf")),
+ ) if bounds is None else bounds
if value is not None:
self._value = self._value_sanitize(value)
if default is not None:
break
return b"".join((self.tag, len_encode(len(octets)), octets))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
>>> b.specs
{'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
"""
- __slots__ = ("specs",)
+ __slots__ = ("specs", "defined")
tag_default = tag_encode(3)
asn1_type_name = "BIT STRING"
)
if value is None:
self._value = default
+ self.defined = None
def _bits2octets(self, bits):
if len(self.specs) > 0:
def copy(self):
obj = self.__class__(_specs=self.specs)
- obj._value = self._value
+ value = self._value
+ if value is not None:
+ value = (value[0], value[1])
+ obj._value = value
obj.tag = self.tag
obj._expl = self._expl
obj.default = self.default
octets,
))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
expl_llen=self.expl_llen if self.expled else None,
expl_vlen=self.expl_vlen if self.expled else None,
)
+ defined_by, defined = self.defined or (None, None)
+ if defined_by is not None:
+ yield defined.pps(
+ decode_path=decode_path + (decode_path_defby(defined_by),)
+ )
class OctetString(Obj):
>>> OctetString(b"hell", bounds=(4, 4))
OCTET STRING 4 bytes 68656c6c
"""
- __slots__ = ("_bound_min", "_bound_max")
+ __slots__ = ("_bound_min", "_bound_max", "defined")
tag_default = tag_encode(4)
asn1_type_name = "OCTET STRING"
_decoded,
)
self._value = value
- if bounds is None:
- self._bound_min, self._bound_max = getattr(
- self,
- "bounds",
- (0, float("+inf")),
- )
- else:
- self._bound_min, self._bound_max = bounds
+ self._bound_min, self._bound_max = getattr(
+ self,
+ "bounds",
+ (0, float("+inf")),
+ ) if bounds is None else bounds
if value is not None:
self._value = self._value_sanitize(value)
if default is not None:
)
if self._value is None:
self._value = default
+ self.defined = None
def _value_sanitize(self, value):
if issubclass(value.__class__, OctetString):
self._value,
))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
expl_llen=self.expl_llen if self.expled else None,
expl_vlen=self.expl_vlen if self.expled else None,
)
+ defined_by, defined = self.defined or (None, None)
+ if defined_by is not None:
+ yield defined.pps(
+ decode_path=decode_path + (decode_path_defby(defined_by),)
+ )
class Null(Obj):
def _encode(self):
return self.tag + len_encode(0)
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
Traceback (most recent call last):
pyderasn.InvalidOID: unacceptable first arc value
"""
- __slots__ = ()
+ __slots__ = ("defines",)
tag_default = tag_encode(6)
asn1_type_name = "OBJECT IDENTIFIER"
def __init__(
self,
value=None,
+ defines=(),
impl=None,
expl=None,
default=None,
:param value: set the value. Either tuples of integers,
string of "."-concatenated integers, or
:py:class:`pyderasn.ObjectIdentifier` object
+ :param defines: sequence of tuples. Each tuple has two elements.
+ First one is relative to current one decode
+ path, aiming to the field defined by that OID.
+ Read about relative path in
+ :py:func:`pyderasn.abs_decode_path`. Second
+ tuple element is ``{OID: pyderasn.Obj()}``
+ dictionary, mapping between current OID value
+ and structure applied to defined field.
+ :ref:`Read about DEFINED BY <definedby>`
:param bytes impl: override default tag with ``IMPLICIT`` one
:param bytes expl: override default tag with ``EXPLICIT`` one
:param default: set default value. Type same as in ``value``
)
if self._value is None:
self._value = default
+ self.defines = defines
def __add__(self, their):
if isinstance(their, self.__class__):
def copy(self):
obj = self.__class__()
obj._value = self._value
+ obj.defines = self.defines
obj.tag = self.tag
obj._expl = self._expl
obj.default = self.default
def __call__(
self,
value=None,
+ defines=None,
impl=None,
expl=None,
default=None,
):
return self.__class__(
value=value,
+ defines=self.defines if defines is None else defines,
impl=self.tag if impl is None else impl,
expl=self._expl if expl is None else expl,
default=self.default if default is None else default,
v = b"".join(octets)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
asn1_type_name = "IA5"
+LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
+LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
+LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
+
+
class UTCTime(CommonString):
"""``UTCTime`` datetime type
return value.strftime(self.fmt).encode("ascii")
if isinstance(value, binary_type):
value_decoded = value.decode("ascii")
- if len(value_decoded) == 2 + 2 + 2 + 2 + 2 + 2 + 1:
+ if len(value_decoded) == LEN_YYMMDDHHMMSSZ:
try:
datetime.strptime(value_decoded, self.fmt)
except ValueError:
).encode("ascii")
if isinstance(value, binary_type):
value_decoded = value.decode("ascii")
- if len(value_decoded) == 4 + 2 + 2 + 2 + 2 + 2 + 1:
+ if len(value_decoded) == LEN_YYYYMMDDHHMMSSZ:
try:
datetime.strptime(value_decoded, self.fmt)
except ValueError:
"invalid GeneralizedTime (without ms) format",
)
return value
- elif len(value_decoded) >= 4 + 2 + 2 + 2 + 2 + 2 + 1 + 1 + 1:
+ elif len(value_decoded) >= LEN_YYYYMMDDHHMMSSDMZ:
try:
datetime.strptime(value_decoded, self.fmt_ms)
except ValueError:
def todatetime(self):
value = self._value.decode("ascii")
- if len(value) == 4 + 2 + 2 + 2 + 2 + 2 + 1:
+ if len(value) == LEN_YYYYMMDDHHMMSSZ:
return datetime.strptime(value, self.fmt)
return datetime.strptime(value, self.fmt_ms)
self._assert_ready()
return self._value[1].encode()
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
for choice, spec in self.specs.items():
try:
value, tail = spec.decode(
offset=offset,
leavemm=True,
decode_path=decode_path + (choice,),
+ ctx=ctx,
)
except TagMismatch:
continue
>>> hexenc(bytes(a))
b'0x040x0bhello world'
"""
- __slots__ = ()
+ __slots__ = ("defined",)
tag_default = tag_encode(0)
asn1_type_name = "ANY"
"""
super(Any, self).__init__(None, expl, None, optional, _decoded)
self._value = None if value is None else self._value_sanitize(value)
+ self.defined = None
def _value_sanitize(self, value):
if isinstance(value, self.__class__):
self._assert_ready()
return self._value
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
l, llen, v = len_decode(lv)
expl_llen=self.expl_llen if self.expled else None,
expl_vlen=self.expl_vlen if self.expled else None,
)
+ defined_by, defined = self.defined or (None, None)
+ if defined_by is not None:
+ yield defined.pps(
+ decode_path=decode_path + (decode_path_defby(defined_by),)
+ )
########################################################################
# ASN.1 constructed types
########################################################################
+def get_def_by_path(defines_by_path, sub_decode_path):
+ """Get define by decode path
+ """
+ for path, define in defines_by_path:
+ if len(path) != len(sub_decode_path):
+ continue
+ for p1, p2 in zip(path, sub_decode_path):
+ if (p1 != any) and (p1 != p2):
+ break
+ else:
+ return define
+
+
+def abs_decode_path(decode_path, rel_path):
+ """Create an absolute decode path from current and relative ones
+
+ :param decode_path: current decode path, starting point.
+ Tuple of strings
+ :param rel_path: relative path to ``decode_path``. Tuple of strings.
+ If first tuple's element is "/", then treat it as
+ an absolute path, ignoring ``decode_path`` as
+ starting point. Also this tuple can contain ".."
+ elements, stripping the leading element from
+ ``decode_path``
+
+ >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
+ ("foo", "bar", "baz", "whatever")
+ >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
+ ("foo", "whatever")
+ >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
+ ("baz", "whatever")
+ """
+ if rel_path[0] == "/":
+ return rel_path[1:]
+ if rel_path[0] == "..":
+ return abs_decode_path(decode_path[:-1], rel_path[1:])
+ return decode_path + rel_path
+
+
class Sequence(Obj):
"""``SEQUENCE`` structure type
You have to make specification of sequence::
class Extension(Sequence):
- __slots__ = ()
schema = (
("extnID", ObjectIdentifier()),
("critical", Boolean(default=False)),
>>> tbs = TBSCertificate()
>>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
+ Assign ``None`` to remove value from sequence.
+
You can know if value exists/set in the sequence and take its value:
>>> "extnID" in ext, "extnValue" in ext, "critical" in ext
All defaulted values are always optional.
+ .. _strict_default_existence_ctx:
+
.. warning::
When decoded DER contains defaulted value inside, then
- technically this is not valid DER encoding. But we allow
- and pass it. Of course reencoding of that kind of DER will
+ technically this is not valid DER encoding. But we allow and pass
+ it **by default**. Of course reencoding of that kind of DER will
result in different binary representation (validly without
- defaulted value inside).
+ defaulted value inside). You can enable strict defaulted values
+ existence validation by setting ``"strict_default_existence":
+ True`` :ref:`context <ctx>` option -- decoding process will raise
+ an exception if defaulted value is met.
Two sequences are equal if they have equal specification (schema),
implicit/explicit tagging and the same values.
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
for name, spec in self.specs.items():
if len(v) == 0 and spec.optional:
continue
+ sub_decode_path = decode_path + (name,)
try:
value, v_tail = spec.decode(
v,
sub_offset,
leavemm=True,
- decode_path=decode_path + (name,),
+ decode_path=sub_decode_path,
+ ctx=ctx,
)
except TagMismatch:
if spec.optional:
continue
raise
+
+ defined = get_def_by_path(ctx.get("defines", ()), sub_decode_path)
+ if defined is not None:
+ defined_by, defined_spec = defined
+ if issubclass(value.__class__, SequenceOf):
+ for i, _value in enumerate(value):
+ sub_sub_decode_path = sub_decode_path + (
+ str(i),
+ decode_path_defby(defined_by),
+ )
+ defined_value, defined_tail = defined_spec.decode(
+ memoryview(bytes(_value)),
+ sub_offset + (
+ (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
+ if value.expled else (value.tlen + value.llen)
+ ),
+ leavemm=True,
+ decode_path=sub_sub_decode_path,
+ ctx=ctx,
+ )
+ if len(defined_tail) > 0:
+ raise DecodeError(
+ "remaining data",
+ klass=self.__class__,
+ decode_path=sub_sub_decode_path,
+ offset=offset,
+ )
+ _value.defined = (defined_by, defined_value)
+ else:
+ defined_value, defined_tail = defined_spec.decode(
+ memoryview(bytes(value)),
+ sub_offset + (
+ (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
+ if value.expled else (value.tlen + value.llen)
+ ),
+ leavemm=True,
+ decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+ ctx=ctx,
+ )
+ if len(defined_tail) > 0:
+ raise DecodeError(
+ "remaining data",
+ klass=self.__class__,
+ decode_path=sub_decode_path + (decode_path_defby(defined_by),),
+ offset=offset,
+ )
+ value.defined = (defined_by, defined_value)
+
sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
v = v_tail
if spec.default is not None and value == spec.default:
- # Encoded default values are not valid in DER,
- # but we still allow that
- continue
+ if ctx.get("strict_default_existence", False):
+ raise DecodeError(
+ "DEFAULT value met",
+ klass=self.__class__,
+ decode_path=sub_decode_path,
+ offset=sub_offset,
+ )
+ else:
+ continue
values[name] = value
+
+ spec_defines = getattr(spec, "defines", ())
+ if len(spec_defines) == 0:
+ defines_by_path = ctx.get("defines_by_path", ())
+ if len(defines_by_path) > 0:
+ spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+ if spec_defines is not None and len(spec_defines) > 0:
+ for rel_path, schema in spec_defines:
+ defined = schema.get(value, None)
+ if defined is not None:
+ ctx.setdefault("defines", []).append((
+ abs_decode_path(sub_decode_path[:-1], rel_path),
+ (value, defined),
+ ))
if len(v) > 0:
raise DecodeError(
"remaining data",
v = b"".join(raws)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
sub_offset,
leavemm=True,
decode_path=decode_path + (name,),
+ ctx=ctx,
)
except TagMismatch:
continue
if schema is None:
raise ValueError("schema must be specified")
self.spec = schema
- if bounds is None:
- self._bound_min, self._bound_max = getattr(
- self,
- "bounds",
- (0, float("+inf")),
- )
- else:
- self._bound_min, self._bound_max = bounds
+ self._bound_min, self._bound_max = getattr(
+ self,
+ "bounds",
+ (0, float("+inf")),
+ ) if bounds is None else bounds
self._value = []
if value is not None:
self._value = self._value_sanitize(value)
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset=0, decode_path=()):
+ def _decode(self, tlv, offset, decode_path, ctx):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
sub_offset,
leavemm=True,
decode_path=decode_path + (str(len(_value)),),
+ ctx=ctx,
)
sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen)
v = v_tail
return obj
+def generic_decoder(): # pragma: no cover
+ # All of this below is a big hack with self references
+ choice = PrimitiveTypes()
+ choice.specs["SequenceOf"] = SequenceOf(schema=choice)
+ choice.specs["SetOf"] = SetOf(schema=choice)
+ for i in range(31):
+ choice.specs["SequenceOf%d" % i] = SequenceOf(
+ schema=choice,
+ expl=tag_ctxc(i),
+ )
+ choice.specs["Any"] = Any()
+
+ # Class name equals to type name, to omit it from output
+ class SEQUENCEOF(SequenceOf):
+ __slots__ = ()
+ schema = choice
+
+ def pprint_any(obj, oids=None):
+ def _pprint_pps(pps):
+ for pp in pps:
+ if hasattr(pp, "_fields"):
+ if pp.asn1_type_name == Choice.asn1_type_name:
+ continue
+ pp_kwargs = pp._asdict()
+ pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
+ pp = _pp(**pp_kwargs)
+ yield pp_console_row(
+ pp,
+ oids=oids,
+ with_offsets=True,
+ with_blob=False,
+ )
+ for row in pp_console_blob(pp):
+ yield row
+ else:
+ for row in _pprint_pps(pp):
+ yield row
+ return "\n".join(_pprint_pps(obj.pps()))
+ return SEQUENCEOF(), pprint_any
+
+
def main(): # pragma: no cover
import argparse
parser = argparse.ArgumentParser(description="PyDERASN ASN.1 DER decoder")
+ parser.add_argument(
+ "--skip",
+ type=int,
+ default=0,
+ help="Skip that number of bytes from the beginning",
+ )
parser.add_argument(
"--oids",
help="Python path to dictionary with OIDs",
"--schema",
help="Python path to schema definition to use",
)
+ parser.add_argument(
+ "--defines-by-path",
+ help="Python path to decoder's defines_by_path",
+ )
parser.add_argument(
"DERFile",
type=argparse.FileType("rb"),
help="Path to DER file you want to decode",
)
args = parser.parse_args()
+ args.DERFile.seek(args.skip)
der = memoryview(args.DERFile.read())
args.DERFile.close()
oids = obj_by_path(args.oids) if args.oids else {}
from functools import partial
pprinter = partial(pprint, big_blobs=True)
else:
- # All of this below is a big hack with self references
- choice = PrimitiveTypes()
- choice.specs["SequenceOf"] = SequenceOf(schema=choice)
- choice.specs["SetOf"] = SetOf(schema=choice)
- for i in range(31):
- choice.specs["SequenceOf%d" % i] = SequenceOf(
- schema=choice,
- expl=tag_ctxc(i),
- )
- choice.specs["Any"] = Any()
-
- # Class name equals to type name, to omit it from output
- class SEQUENCEOF(SequenceOf):
- __slots__ = ()
- schema = choice
- schema = SEQUENCEOF()
-
- def pprint_any(obj, oids=None):
- def _pprint_pps(pps):
- for pp in pps:
- if hasattr(pp, "_fields"):
- if pp.asn1_type_name == Choice.asn1_type_name:
- continue
- pp_kwargs = pp._asdict()
- pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
- pp = _pp(**pp_kwargs)
- yield pp_console_row(
- pp,
- oids=oids,
- with_offsets=True,
- with_blob=False,
- )
- for row in pp_console_blob(pp):
- yield row
- else:
- for row in _pprint_pps(pp):
- yield row
- return "\n".join(_pprint_pps(obj.pps()))
- pprinter = pprint_any
- obj, tail = schema().decode(der)
+ schema, pprinter = generic_decoder()
+ obj, tail = schema().decode(
+ der,
+ ctx=(
+ None if args.defines_by_path is None else
+ {"defines_by_path": obj_by_path(args.defines_by_path)}
+ ),
+ )
print(pprinter(obj, oids=oids))
if tail != b"":
print("\nTrailing data: %s" % hexenc(tail))