):
print("serial number:", int(obj["userCertificate"]))
+.. note::
+
+ SEQUENCE/SET values with DEFAULT specified are automatically decoded
+ without evgen mode.
+
.. _mmap:
mmap-ed file
bounds = (1, 32)
der_forced = True
+.. _agg_octet_string:
+
agg_octet_string
________________
Virtually it won't take memory more than for keeping small structures
and 1 KB binary chunks.
+.. _seqof-iterators:
+
SEQUENCE OF iterators
_____________________
``RevokedCertificate`` objects. Only binary representation of that
objects will take memory during DER encoding.
+2-pass DER encoding
+-------------------
+
+There is ability to do 2-pass encoding to DER, writing results directly
+to specified writer (buffer, file, whatever). It could be 1.5+ times
+slower than ordinary encoding, but it takes little memory for 1st pass
+state storing. For example, 1st pass state for CACert.org's CRL with
+~416K of certificate entries takes nearly 3.5 MB of memory.
+``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
+nearly 0.5 KB of memory.
+
+If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
+iterators <seqof-iterators>` and write directly to opened file, then
+there is very small memory footprint.
+
+1st pass traverses through all the objects of the structure and returns
+the size of DER encoded structure, together with 1st pass state object.
+That state contains precalculated lengths for various objects inside the
+structure.
+
+::
+
+ fulllen, state = obj.encode1st()
+
+2nd pass takes the writer and 1st pass state. It traverses through all
+the objects again, but writes their encoded representation to the writer.
+
+::
+
+ opener = io.open if PY2 else open
+ with opener("result", "wb") as fd:
+ obj.encode2nd(fd.write, iter(state))
+
+.. warning::
+
+ You **MUST NOT** use 1st pass state if anything is changed in the
+ objects. It is intended to be used immediately after 1st pass is
+ done!
+
+If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
+have to reinitialize the values after the 1st pass. And you **have to**
+be sure that the iterator gives exactly the same values as previously.
+Yes, you have to run your iterator twice -- because this is two pass
+encoding mode.
+
+If you want to encode to the memory, then you can use convenient
+:py:func:`pyderasn.encode2pass` helper.
+
+.. _browser:
+
+ASN.1 browser
+-------------
+.. autofunction:: pyderasn.browse
+
Base Obj
--------
.. autoclass:: pyderasn.Obj
Integer
_______
.. autoclass:: pyderasn.Integer
- :members: __init__, named
+ :members: __init__, named, tohex
BitString
_________
.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.agg_octet_string
+.. autofunction:: pyderasn.ascii_visualize
.. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode2pass
.. autofunction:: pyderasn.encode_cer
.. autofunction:: pyderasn.file_mmaped
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
+.. autofunction:: pyderasn.hexdump
.. autofunction:: pyderasn.tag_encode
.. autofunction:: pyderasn.tag_decode
.. autofunction:: pyderasn.tag_ctxp
from operator import attrgetter
from string import ascii_letters
from string import digits
+from sys import maxsize as sys_maxsize
from sys import version_info
from unicodedata import category as unicat
def colored(what, *args, **kwargs):
return what
-__version__ = "7.2"
+__version__ = "7.4"
__all__ = (
"agg_octet_string",
"Boolean",
"BoundsError",
"Choice",
+ "colonize_hex",
"DecodeError",
"DecodePathDefBy",
+ "encode2pass",
"encode_cer",
"Enumerated",
"ExceedingData",
LEN1K = len_encode(1000)
+def len_size(l):
+ """How many bytes length field will take
+ """
+ if l < 128:
+ return 1
+ if l < 256: # 1 << 8
+ return 2
+ if l < 65536: # 1 << 16
+ return 3
+ if l < 16777216: # 1 << 24
+ return 4
+ if l < 4294967296: # 1 << 32
+ return 5
+ if l < 1099511627776: # 1 << 40
+ return 6
+ if l < 281474976710656: # 1 << 48
+ return 7
+ if l < 72057594037927936: # 1 << 56
+ return 8
+ raise OverflowError("too big length")
+
+
def write_full(writer, data):
"""Fully write provided data
written += n
+# If it is 64-bit system, then use compact 64-bit array of unsigned
+# longs. Use an ordinary list with universal integers otherwise, that
+# is slower.
+if sys_maxsize > 2 ** 32:
+ def state_2pass_new():
+ return array("L")
+else:
+ def state_2pass_new():
+ return []
+
+
########################################################################
# Base class
########################################################################
@property
def tlen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return len(self.tag)
@property
def tlvlen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self.tlen + self.llen + self.vlen
def _encode(self): # pragma: no cover
raise NotImplementedError()
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
yield NotImplemented
+ def _encode1st(self, state):
+ raise NotImplementedError()
+
+ def _encode2nd(self, writer, state_iter):
+ raise NotImplementedError()
+
def encode(self):
"""DER encode the structure
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
+ def encode1st(self, state=None):
+ """Do the 1st pass of 2-pass encoding
+
+ :rtype: (int, array("L"))
+ :returns: full length of encoded data and precalculated various
+ objects lengths
+ """
+ if state is None:
+ state = state_2pass_new()
+ if self._expl is None:
+ return self._encode1st(state)
+ state.append(0)
+ idx = len(state) - 1
+ vlen, _ = self._encode1st(state)
+ state[idx] = vlen
+ fulllen = len(self._expl) + len_size(vlen) + vlen
+ return fulllen, state
+
+ def encode2nd(self, writer, state_iter):
+ """Do the 2nd pass of 2-pass encoding
+
+ :param writer: must comply with ``io.RawIOBase.write`` behaviour
+ :param state_iter: iterator over the 1st pass state (``iter(state)``)
+ """
+ if self._expl is None:
+ self._encode2nd(writer, state_iter)
+ else:
+ write_full(writer, self._expl + len_encode(next(state_iter)))
+ self._encode2nd(writer, state_iter)
+
def encode_cer(self, writer):
"""CER encode the structure to specified writer
if self._expl is not None:
write_full(writer, EOC)
- def _encode_cer(self, writer):
- write_full(writer, self._encode())
-
def hexencode(self):
"""Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
"""
That method is identical to :py:meth:`pyderasn.Obj.decode`, but
it returns the generator producing ``(decode_path, obj, tail)``
- values. See :ref:`evgen mode <evgen_mode>`.
+ values.
+ .. seealso:: :ref:`evgen mode <evgen_mode>`.
"""
if ctx is None:
ctx = {}
@property
def expled(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self._expl is not None
@property
def expl_tag(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self._expl
@property
def expl_tlen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return len(self._expl)
@property
def expl_llen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
if self.expl_lenindef:
return 1
@property
def expl_offset(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self.offset - self.expl_tlen - self.expl_llen
@property
def expl_vlen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self.tlvlen
@property
def expl_tlvlen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self.expl_tlen + self.expl_llen + self.expl_vlen
@property
def fulloffset(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self.expl_offset if self.expled else self.offset
@property
def fulllen(self):
- """See :ref:`decoding`
+ """.. seealso:: :ref:`decoding`
"""
return self.expl_tlvlen if self.expled else self.tlvlen
return buf.getvalue()
+def encode2pass(obj):
+ """Encode (2-pass mode) to DER in memory buffer
+
+ :returns bytes: memory buffer contents
+ """
+ buf = BytesIO()
+ _, state = obj.encode1st()
+ obj.encode2nd(buf.write, iter(state))
+ return buf.getvalue()
+
+
class DecodePathDefBy(object):
"""DEFINED BY representation inside decode path
"""
return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
+def find_oid_name(asn1_type_name, oid_maps, value):
+ if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
+ for oid_map in oid_maps:
+ oid_name = oid_map.get(value)
+ if oid_name is not None:
+ return oid_name
+ return None
+
+
def pp_console_row(
pp,
oid_maps=(),
col += _colourize("B", "red", with_colours) if pp.bered else " "
cols.append(col)
col = "[%d,%d,%4d]%s" % (
- pp.tlen,
- pp.llen,
- pp.vlen,
+ pp.tlen, pp.llen, pp.vlen,
LENINDEF_PP_CHAR if pp.lenindef else " "
)
col = _colourize(col, "green", with_colours, ())
if isinstance(ent, DecodePathDefBy):
cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
value = str(ent.defined_by)
- oid_name = None
- if (
- len(oid_maps) > 0 and
- ent.defined_by.asn1_type_name ==
- ObjectIdentifier.asn1_type_name
- ):
- for oid_map in oid_maps:
- oid_name = oid_map.get(value)
- if oid_name is not None:
- cols.append(_colourize("%s:" % oid_name, "green", with_colours))
- break
+ oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
if oid_name is None:
cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
+ else:
+ cols.append(_colourize("%s:" % oid_name, "green", with_colours))
else:
cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
if pp.expl is not None:
if pp.value is not None:
value = pp.value
cols.append(_colourize(value, "white", with_colours, ("reverse",)))
- if (
- len(oid_maps) > 0 and
- pp.asn1_type_name == ObjectIdentifier.asn1_type_name
- ):
- for oid_map in oid_maps:
- oid_name = oid_map.get(value)
- if oid_name is not None:
- cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
- break
+ oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
+ if oid_name is not None:
+ cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
if pp.asn1_type_name == Integer.asn1_type_name:
- hex_repr = hex(int(pp.obj._value))[2:].upper()
- if len(hex_repr) % 2 != 0:
- hex_repr = "0" + hex_repr
cols.append(_colourize(
- "(%s)" % colonize_hex(hex_repr),
- "green",
- with_colours,
+ "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
))
if with_blob:
if pp.blob.__class__ == binary_type:
"""Pretty print object
:param Obj obj: object you want to pretty print
- :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
+ :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
Its human readable form is printed when OID is met
:param big_blobs: if large binary objects are met (like OctetString
values), do we need to print them too, on separate
self._assert_ready()
return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
+ def _encode1st(self, state):
+ return len(self.tag) + 2, state
+
+ def _encode2nd(self, writer, state_iter):
+ self._assert_ready()
+ write_full(writer, self._encode())
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
self._assert_ready()
return int(self._value)
+ def tohex(self):
+ """Hexadecimal representation
+
+ Use :py:func:`pyderasn.colonize_hex` for colonizing it.
+ """
+ hex_repr = hex(int(self))[2:].upper()
+ if len(hex_repr) % 2 != 0:
+ hex_repr = "0" + hex_repr
+ return hex_repr
+
def __hash__(self):
self._assert_ready()
return hash(b"".join((
_specs=self.specs,
)
- def _encode(self):
+ def _encode_payload(self):
self._assert_ready()
value = self._value
if PY2:
bytes_len += 1
else:
break
+ return octets
+
+ def _encode(self):
+ octets = self._encode_payload()
return b"".join((self.tag, len_encode(len(octets)), octets))
+ def _encode1st(self, state):
+ l = len(self._encode_payload())
+ return len(self.tag) + len_size(l) + l, state
+
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self._encode())
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
octets,
))
+ def _encode1st(self, state):
+ self._assert_ready()
+ _, octets = self._value
+ l = len(octets) + 1
+ return len(self.tag) + len_size(l) + l, state
+
+ def _encode2nd(self, writer, state_iter):
+ bit_len, octets = self._value
+ write_full(writer, b"".join((
+ self.tag,
+ len_encode(len(octets) + 1),
+ int2byte((8 - bit_len % 8) % 8),
+ )))
+ write_full(writer, octets)
+
def _encode_cer(self, writer):
bit_len, octets = self._value
if len(octets) + 1 <= 1000:
self._value,
))
+ def _encode1st(self, state):
+ self._assert_ready()
+ l = len(self._value)
+ return len(self.tag) + len_size(l) + l, state
+
+ def _encode2nd(self, writer, state_iter):
+ value = self._value
+ write_full(writer, self.tag + len_encode(len(value)))
+ write_full(writer, value)
+
def _encode_cer(self, writer):
octets = self._value
if len(octets) <= 1000:
:param writer: buffer.write where string is going to be saved
:param writer: where string is going to be saved. Must comply
with ``io.RawIOBase.write`` behaviour
+
+ .. seealso:: :ref:`agg_octet_string`
"""
decode_path_len = len(decode_path)
for dp, obj, _ in evgens:
def _encode(self):
return self.tag + LEN0
+ def _encode1st(self, state):
+ return len(self.tag) + 1, state
+
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self.tag + LEN0)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
tuple element is ``{OID: pyderasn.Obj()}``
dictionary, mapping between current OID value
and structure applied to defined field.
- :ref:`Read about DEFINED BY <definedby>`
+
+ .. seealso:: :ref:`definedby`
+
:param bytes impl: override default tag with ``IMPLICIT`` one
:param bytes expl: override default tag with ``EXPLICIT`` one
:param default: set default value. Type same as in ``value``
optional=self.optional if optional is None else optional,
)
- def _encode(self):
+ def _encode_octets(self):
self._assert_ready()
value = self._value
first_value = value[1]
octets = [zero_ended_encode(first_value)]
for arc in value[2:]:
octets.append(zero_ended_encode(arc))
- v = b"".join(octets)
+ return b"".join(octets)
+
+ def _encode(self):
+ v = self._encode_octets()
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode1st(self, state):
+ l = len(self._encode_octets())
+ return len(self.tag) + len_size(l) + l, state
+
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self._encode())
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
self._assert_ready()
return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
+ def _encode1st(self, state):
+ return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
+
+ def _encode2nd(self, writer, state_iter):
+ self._assert_ready()
+ write_full(writer, self._encode())
+
def _encode_cer(self, writer):
write_full(writer, self._encode())
return b"".join((self.tag, len_encode(len(encoded)), encoded))
return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
+ def _encode1st(self, state):
+ self._assert_ready()
+ vlen = len(self._encode_time())
+ return len(self.tag) + len_size(vlen) + vlen, state
+
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self._encode())
+
class GraphicString(CommonString):
__slots__ = ()
self._assert_ready()
return self._value[1].encode()
+ def _encode1st(self, state):
+ self._assert_ready()
+ return self._value[1].encode1st(state)
+
+ def _encode2nd(self, writer, state_iter):
+ self._value[1].encode2nd(writer, state_iter)
+
def _encode_cer(self, writer):
self._assert_ready()
self._value[1].encode_cer(writer)
return value
return value.encode()
+ def _encode1st(self, state):
+ self._assert_ready()
+ value = self._value
+ if value.__class__ == binary_type:
+ return len(value), state
+ return value.encode1st(state)
+
+ def _encode2nd(self, writer, state_iter):
+ value = self._value
+ if value.__class__ == binary_type:
+ write_full(writer, value)
+ else:
+ value.encode2nd(writer, state_iter)
+
def _encode_cer(self, writer):
self._assert_ready()
value = self._value
)
-class Sequence(Obj):
+class SequenceEncode1stMixing(object):
+ def _encode1st(self, state):
+ state.append(0)
+ idx = len(state) - 1
+ vlen = 0
+ for v in self._values_for_encoding():
+ l, _ = v.encode1st(state)
+ vlen += l
+ state[idx] = vlen
+ return len(self.tag) + len_size(vlen) + vlen, state
+
+
+class Sequence(SequenceEncode1stMixing, Obj):
"""``SEQUENCE`` structure type
You have to make specification of sequence::
defaulted values existence validation by setting
``"allow_default_values": True`` :ref:`context <ctx>` option.
- .. warning::
-
- Check for default value existence is not performed in
- ``evgen_mode``, because previously decoded values are not stored
- in memory, to be able to compare them.
+ All values with DEFAULT specified are decoded atomically in
+ :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
+ SEQUENCE, then it will be yielded as a single element, not
+ disassembled. That is required for DEFAULT existence check.
Two sequences are equal if they have equal specification (schema),
implicit/explicit tagging and the same values.
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self.tag + len_encode(next(state_iter)))
+ for v in self._values_for_encoding():
+ v.encode2nd(writer, state_iter)
+
def _encode_cer(self, writer):
write_full(writer, self.tag + LENINDEF)
for v in self._values_for_encoding():
len(v) == 0
):
continue
+ spec_defaulted = spec.default is not None
sub_decode_path = decode_path + (name,)
try:
- if evgen_mode:
+ if evgen_mode and not spec_defaulted:
for _decode_path, value, v_tail in spec.decode_evgen(
v,
sub_offset,
vlen += value_len
sub_offset += value_len
v = v_tail
- if not evgen_mode:
- if spec.default is not None and value == spec.default:
- # This will not work in evgen_mode
+ if spec_defaulted:
+ if evgen_mode:
+ yield sub_decode_path, value, v_tail
+ if value == spec.default:
if ctx_bered or ctx_allow_default_values:
ber_encoded = True
else:
decode_path=sub_decode_path,
offset=sub_offset,
)
+ if not evgen_mode:
values[name] = value
spec_defines = getattr(spec, "defines", ())
if len(spec_defines) == 0:
yield pp
-class Set(Sequence):
+class Set(Sequence, SequenceEncode1stMixing):
"""``SET`` structure type
Its usage is identical to :py:class:`pyderasn.Sequence`.
tag_default = tag_encode(form=TagFormConstructed, num=17)
asn1_type_name = "SET"
- def _encode(self):
- v = b"".join(value.encode() for value in sorted(
- self._values_for_encoding(),
+ def _values_for_encoding(self):
+ return sorted(
+ super(Set, self)._values_for_encoding(),
key=attrgetter("tag_order"),
- ))
- return b"".join((self.tag, len_encode(len(v)), v))
+ )
def _encode_cer(self, writer):
write_full(writer, self.tag + LENINDEF)
for v in sorted(
- self._values_for_encoding(),
+ super(Set, self)._values_for_encoding(),
key=attrgetter("tag_order_cer"),
):
v.encode_cer(writer)
decode_path=decode_path,
offset=offset,
)
- if evgen_mode:
+ spec_defaulted = spec.default is not None
+ if evgen_mode and not spec_defaulted:
for _decode_path, value, v_tail in spec.decode_evgen(
v,
sub_offset,
decode_path=sub_decode_path,
offset=sub_offset,
)
- if spec.default is None or value != spec.default:
- pass
- elif ctx_bered or ctx_allow_default_values:
- ber_encoded = True
- else:
- raise DecodeError(
- "DEFAULT value met",
- klass=self.__class__,
- decode_path=sub_decode_path,
- offset=sub_offset,
- )
+ if spec_defaulted:
+ if evgen_mode:
+ yield sub_decode_path, value, v_tail
+ if value != spec.default:
+ pass
+ elif ctx_bered or ctx_allow_default_values:
+ ber_encoded = True
+ else:
+ raise DecodeError(
+ "DEFAULT value met",
+ klass=self.__class__,
+ decode_path=sub_decode_path,
+ offset=sub_offset,
+ )
values[name] = value
del _specs_items[name]
tag_order_prev = value_tag_order
)
-class SequenceOf(Obj):
+class SequenceOf(SequenceEncode1stMixing, Obj):
"""``SEQUENCE OF`` sequence type
For that kind of type you must specify the object it will carry on
value = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(value)), value))
+ def _encode1st(self, state):
+ state = super(SequenceOf, self)._encode1st(state)
+ if hasattr(self._value, NEXT_ATTR_NAME):
+ self._value = []
+ return state
+
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self.tag + len_encode(next(state_iter)))
+ iterator = hasattr(self._value, NEXT_ATTR_NAME)
+ if iterator:
+ values_count = 0
+ class_expected = self.spec.__class__
+ values_for_encoding = self._values_for_encoding()
+ self._value = []
+ for v in values_for_encoding:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
+ v.encode2nd(writer, state_iter)
+ values_count += 1
+ if not self._bound_min <= values_count <= self._bound_max:
+ raise BoundsError(self._bound_min, values_count, self._bound_max)
+ else:
+ for v in self._values_for_encoding():
+ v.encode2nd(writer, state_iter)
+
def _encode_cer(self, writer):
write_full(writer, self.tag + LENINDEF)
iterator = hasattr(self._value, NEXT_ATTR_NAME)
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode2nd(self, writer, state_iter):
+ write_full(writer, self.tag + len_encode(next(state_iter)))
+ values = []
+ for v in self._values_for_encoding():
+ buf = BytesIO()
+ v.encode2nd(buf.write, state_iter)
+ values.append(buf.getvalue())
+ values.sort()
+ for v in values:
+ write_full(writer, v)
+
def _encode_cer(self, writer):
write_full(writer, self.tag + LENINDEF)
for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
return SEQUENCEOF(), pprint_any
+def ascii_visualize(ba):
+ """Output only ASCII printable characters, like in hexdump -C
+
+ Example output for given binary string (right part)::
+
+ 92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|
+ ^^^^^^^^^^^^^^^^
+ """
+ return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
+
+
+def hexdump(raw):
+ """Generate ``hexdump -C`` like output
+
+ Rendered example::
+
+ 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
+ 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
+
+ Result of that function is a generator of lines, where each line is
+ a list of columns::
+
+ [
+ [...],
+ ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
+ " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
+ " |i..?.....G...k.j|"]
+ [...],
+ ]
+ """
+ hexed = hexenc(raw).upper()
+ addr, cols = 0, ["%08x " % 0]
+ for i in six_xrange(0, len(hexed), 2):
+ if i != 0 and i // 2 % 8 == 0:
+ cols[-1] += " "
+ if i != 0 and i // 2 % 16 == 0:
+ cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
+ yield cols
+ addr += 16
+ cols = ["%08x " % addr]
+ cols.append(" " + hexed[i:i + 2])
+ if len(cols) > 0:
+ cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
+ yield cols
+
+
+def browse(raw, obj, oid_maps=()):
+ """Interactive browser
+
+ :param bytes raw: binary data you decoded
+ :param obj: decoded :py:class:`pyderasn.Obj`
+ :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
+ Its human readable form is printed when OID is met
+
+ .. note:: `urwid <http://urwid.org/>`__ dependency required
+
+ This browser is an interactive terminal application for browsing
+ structures of your decoded ASN.1 objects. You can quit it with **q**
+ key. It consists of three windows:
+
+ :tree:
+ View of ASN.1 elements hierarchy. You can navigate it using **Up**,
+ **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
+ **Left** key goes to constructed element above. **Plus**/**Minus**
+ keys collapse/uncollapse constructed elements. **Space** toggles it
+ :info:
+ window with various information about element. You can scroll it
+ with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
+ :hexdump:
+ window with raw data hexdump and highlighted current element's
+ contents. It automatically focuses on element's data. You can
+ scroll it with **j**/**k** (down, up) (**J**/**K** for triple
+ speed) keys. If element has explicit tag, then it also will be
+ highlighted with different colour
+
+ Window's header contains current decode path and progress bars with
+ position in *info* and *hexdump* windows.
+
+ If you press **d**, then current element will be saved in the
+ current directory under its decode path name (adding ".0", ".1", etc
+ suffix if such file already exists). **D** will save it with explicit tag.
+ """
+ from copy import deepcopy
+ from os.path import exists as path_exists
+ import urwid
+
+ class TW(urwid.TreeWidget):
+ def __init__(self, state, *args, **kwargs):
+ self.state = state
+ self.scrolled = {"info": False, "hexdump": False}
+ super(TW, self).__init__(*args, **kwargs)
+
+ def _get_pp(self):
+ pp = self.get_node().get_value()
+ constructed = len(pp) > 1
+ return (pp if hasattr(pp, "_fields") else pp[0]), constructed
+
+ def _state_update(self):
+ pp, _ = self._get_pp()
+ self.state["decode_path"].set_text(
+ ":".join(str(p) for p in pp.decode_path)
+ )
+ lines = deepcopy(self.state["hexed"])
+
+ def attr_set(i, attr):
+ line = lines[i // 16]
+ idx = 1 + (i - 16 * (i // 16))
+ line[idx] = (attr, line[idx])
+
+ if pp.expl_offset is not None:
+ for i in six_xrange(
+ pp.expl_offset,
+ pp.expl_offset + pp.expl_tlen + pp.expl_llen,
+ ):
+ attr_set(i, "select-expl")
+ for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
+ attr_set(i, "select-value")
+ self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
+ self.state["hexdump"].set_focus(pp.offset // 16)
+ self.state["hexdump"].set_focus_valign("middle")
+ self.state["hexdump_bar"].set_completion(
+ (100 * pp.offset // 16) //
+ len(self.state["hexdump"]._body.positions())
+ )
+
+ lines = [
+ [("header", "Name: "), pp.obj_name],
+ [("header", "Type: "), pp.asn1_type_name],
+ [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
+ [("header", "[TLV]len: "), "%d/%d/%d" % (
+ pp.tlen, pp.llen, pp.vlen,
+ )],
+ [("header", "Slice: "), "[%d:%d]" % (
+ pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
+ )],
+ ]
+ if pp.lenindef:
+ lines.append([("warning", "LENINDEF")])
+ if pp.ber_encoded:
+ lines.append([("warning", "BER encoded")])
+ if pp.bered:
+ lines.append([("warning", "BERed")])
+ if pp.expl is not None:
+ lines.append([("header", "EXPLICIT")])
+ klass, _, num = pp.expl
+ lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
+ if pp.expl_offset is not None:
+ lines.append([" Offset: %d" % pp.expl_offset])
+ lines.append([" [TLV]len: %d/%d/%d" % (
+ pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
+ )])
+ lines.append([" Slice: [%d:%d]" % (
+ pp.expl_offset,
+ pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
+ )])
+ if pp.impl is not None:
+ klass, _, num = pp.impl
+ lines.append([
+ ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
+ ])
+ if pp.optional:
+ lines.append(["OPTIONAL"])
+ if pp.default:
+ lines.append(["DEFAULT"])
+ if len(pp.decode_path) > 0:
+ ent = pp.decode_path[-1]
+ if isinstance(ent, DecodePathDefBy):
+ lines.append([""])
+ value = str(ent.defined_by)
+ oid_name = find_oid_name(
+ ent.defined_by.asn1_type_name, oid_maps, value,
+ )
+ lines.append([("header", "DEFINED BY: "), "%s" % (
+ value if oid_name is None
+ else "%s (%s)" % (oid_name, value)
+ )])
+ lines.append([""])
+ if pp.value is not None:
+ lines.append([("header", "Value: "), pp.value])
+ if (
+ len(oid_maps) > 0 and
+ pp.asn1_type_name == ObjectIdentifier.asn1_type_name
+ ):
+ for oid_map in oid_maps:
+ oid_name = oid_map.get(pp.value)
+ if oid_name is not None:
+ lines.append([("header", "Human: "), oid_name])
+ break
+ if pp.asn1_type_name == Integer.asn1_type_name:
+ lines.append([
+ ("header", "Decimal: "), "%d" % int(pp.obj),
+ ])
+ lines.append([
+ ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
+ ])
+ if pp.blob.__class__ == binary_type:
+ blob = hexenc(pp.blob).upper()
+ for i in six_xrange(0, len(blob), 32):
+ lines.append([colonize_hex(blob[i:i + 32])])
+ elif pp.blob.__class__ == tuple:
+ lines.append([", ".join(pp.blob)])
+ self.state["info"]._set_body([urwid.Text(line) for line in lines])
+ self.state["info_bar"].set_completion(0)
+
+ def selectable(self):
+ if self.state["widget_current"] != self:
+ self.state["widget_current"] = self
+ self.scrolled["info"] = False
+ self.scrolled["hexdump"] = False
+ self._state_update()
+ return super(TW, self).selectable()
+
+ def get_display_text(self):
+ pp, constructed = self._get_pp()
+ style = "constructed" if constructed else ""
+ if len(pp.decode_path) == 0:
+ return (style, pp.obj_name)
+ if pp.asn1_type_name == "EOC":
+ return ("eoc", "EOC")
+ ent = pp.decode_path[-1]
+ if isinstance(ent, DecodePathDefBy):
+ value = str(ent.defined_by)
+ oid_name = find_oid_name(
+ ent.defined_by.asn1_type_name, oid_maps, value,
+ )
+ return ("defby", "DEFBY:" + (
+ value if oid_name is None else oid_name
+ ))
+ return (style, ent)
+
+ def _scroll(self, what, step):
+ self.state[what]._invalidate()
+ pos = self.state[what].focus_position
+ if not self.scrolled[what]:
+ self.scrolled[what] = True
+ pos -= 2
+ pos = max(0, pos + step)
+ pos = min(pos, len(self.state[what]._body.positions()) - 1)
+ self.state[what].set_focus(pos)
+ self.state[what].set_focus_valign("top")
+ self.state[what + "_bar"].set_completion(
+ (100 * pos) // len(self.state[what]._body.positions())
+ )
+
+ def keypress(self, size, key):
+ if key == "q":
+ raise urwid.ExitMainLoop()
+
+ if key == " ":
+ self.expanded = not self.expanded
+ self.update_expanded_icon()
+ return None
+
+ hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
+ if key in hexdump_steps:
+ self._scroll("hexdump", hexdump_steps[key])
+ return None
+
+ info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
+ if key in info_steps:
+ self._scroll("info", info_steps[key])
+ return None
+
+ if key in ("d", "D"):
+ pp, _ = self._get_pp()
+ dp = ":".join(str(p) for p in pp.decode_path)
+ dp = dp.replace(" ", "_")
+ if dp == "":
+ dp = "root"
+ if key == "d" or pp.expl_offset is None:
+ data = self.state["raw"][pp.offset:(
+ pp.offset + pp.tlen + pp.llen + pp.vlen
+ )]
+ else:
+ data = self.state["raw"][pp.expl_offset:(
+ pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
+ )]
+ ctr = 0
+
+ def duplicate_path(dp, ctr):
+ if ctr == 0:
+ return dp
+ return "%s.%d" % (dp, ctr)
+
+ while True:
+ if not path_exists(duplicate_path(dp, ctr)):
+ break
+ ctr += 1
+ dp = duplicate_path(dp, ctr)
+ with open(dp, "wb") as fd:
+ fd.write(data)
+ self.state["decode_path"].set_text(
+ ("warning", "Saved to: " + dp)
+ )
+ return None
+ return super(TW, self).keypress(size, key)
+
+ class PN(urwid.ParentNode):
+ def __init__(self, state, value, *args, **kwargs):
+ self.state = state
+ if not hasattr(value, "_fields"):
+ value = list(value)
+ super(PN, self).__init__(value, *args, **kwargs)
+
+ def load_widget(self):
+ return TW(self.state, self)
+
+ def load_child_keys(self):
+ value = self.get_value()
+ if hasattr(value, "_fields"):
+ return []
+ return range(len(value[1:]))
+
+ def load_child_node(self, key):
+ return PN(
+ self.state,
+ self.get_value()[key + 1],
+ parent=self,
+ key=key,
+ depth=self.get_depth() + 1,
+ )
+
+ class LabeledPG(urwid.ProgressBar):
+ def __init__(self, label, *args, **kwargs):
+ self.label = label
+ super(LabeledPG, self).__init__(*args, **kwargs)
+
+ def get_text(self):
+ return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
+
+ WinHexdump = urwid.ListBox([urwid.Text("")])
+ WinInfo = urwid.ListBox([urwid.Text("")])
+ WinDecodePath = urwid.Text("", "center")
+ WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
+ WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
+ WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
+ {
+ "raw": raw,
+ "hexed": list(hexdump(raw)),
+ "widget_current": None,
+ "info": WinInfo,
+ "info_bar": WinInfoBar,
+ "hexdump": WinHexdump,
+ "hexdump_bar": WinHexdumpBar,
+ "decode_path": WinDecodePath,
+ },
+ list(obj.pps()),
+ )))
+ help_text = " ".join((
+ "q:quit",
+ "space:(un)collapse",
+ "(pg)up/down/home/end:nav",
+ "jkJK:hexdump hlHL:info",
+ "dD:dump",
+ ))
+ urwid.MainLoop(
+ urwid.Frame(
+ urwid.Columns([
+ ("weight", 1, WinTree),
+ ("weight", 2, urwid.Pile([
+ urwid.LineBox(WinInfo),
+ urwid.LineBox(WinHexdump),
+ ])),
+ ]),
+ header=urwid.Columns([
+ ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
+ ("weight", 1, WinInfoBar),
+ ("weight", 1, WinHexdumpBar),
+ ]),
+ footer=urwid.AttrWrap(urwid.Text(help_text), "help")
+ ),
+ [
+ ("header", "bold", ""),
+ ("constructed", "bold", ""),
+ ("help", "light magenta", ""),
+ ("warning", "light red", ""),
+ ("defby", "light red", ""),
+ ("eoc", "dark red", ""),
+ ("select-value", "light green", ""),
+ ("select-expl", "light red", ""),
+ ("pg-normal", "", "light blue"),
+ ("pg-complete", "black", "yellow"),
+ ],
+ ).run()
+
+
def main(): # pragma: no cover
import argparse
parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
action="store_true",
help="Turn on event generation mode",
)
+ parser.add_argument(
+ "--browse",
+ action="store_true",
+ help="Start ASN.1 browser",
+ )
parser.add_argument(
"RAWFile",
type=argparse.FileType("rb"),
}
if args.defines_by_path is not None:
ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
+ if args.browse:
+ obj, _ = schema().decode(raw, ctx=ctx)
+ browse(raw, obj, oid_maps)
+ from sys import exit as sys_exit
+ sys_exit(0)
from os import environ
pprinter = partial(
pprinter,
if __name__ == "__main__":
+ from pyderasn import *
main()