.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode_cer
+.. autofunction:: pyderasn.file_mmaped
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
from copy import copy
from datetime import datetime
from datetime import timedelta
+from io import BytesIO
from math import ceil
+from mmap import mmap
+from mmap import PROT_READ
from operator import attrgetter
from string import ascii_letters
from string import digits
__version__ = "7.0"
__all__ = (
+ "agg_octet_string",
"Any",
"BitString",
"BMPString",
"Choice",
"DecodeError",
"DecodePathDefBy",
+ "encode_cer",
"Enumerated",
"ExceedingData",
+ "file_mmaped",
"GeneralizedTime",
"GeneralString",
"GraphicString",
DECIMAL_SIGNS = ".,"
+def file_mmaped(fd):
+ """Make mmap-ed memoryview for reading from file
+
+ :param fd: file object
+ :returns: memoryview over read-only mmap-ing of the whole file
+ """
+ return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
+
def pureint(value):
if not set(value) <= DECIMALS:
raise ValueError("non-pure integer")
return l, 1 + octets_num, data[1 + octets_num:]
+LEN1K = len_encode(1000)
+
+
+def write_full(writer, data):
+ """Fully write provided data
+
+ BytesIO does not guarantee that the whole data will be written at once.
+ """
+ data = memoryview(data)
+ written = 0
+ while written != len(data):
+ n = writer(data[written:])
+ if n is None:
+ raise ValueError("can not write to buf")
+ written += n
+
+
########################################################################
# Base class
########################################################################
"""
return self._tag_order
+ @property
+ def tag_order_cer(self):
+ return self.tag_order
+
@property
def tlen(self):
"""See :ref:`decoding`
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
+ def encode_cer(self, writer):
+ if self._expl is not None:
+ write_full(writer, self._expl + LENINDEF)
+ if getattr(self, "der_forced", False):
+ write_full(writer, self._encode())
+ else:
+ self._encode_cer(writer)
+ if self._expl is not None:
+ write_full(writer, EOC)
+
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def hexencode(self):
"""Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
"""
)
+def encode_cer(obj):
+ """Encode to CER in memory
+ """
+ buf = BytesIO()
+ obj.encode_cer(buf.write)
+ return buf.getvalue()
+
+
class DecodePathDefBy(object):
"""DEFINED BY representation inside decode path
"""
with_colours=False,
with_decode_path=False,
decode_path_only=(),
+ decode_path=(),
):
"""Pretty print object
else:
for row in _pprint_pps(pp):
yield row
- return "\n".join(_pprint_pps(obj.pps()))
+ return "\n".join(_pprint_pps(obj.pps(decode_path)))
########################################################################
octets,
))
+ def _encode_cer(self, writer):
+ bit_len, octets = self._value
+ if len(octets) + 1 <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ LEN1K,
+ int2byte(0),
+ octets[offset:offset + 999],
+ )))
+ tail = octets[offset+999:]
+ if len(tail) > 0:
+ tail = int2byte((8 - bit_len % 8) % 8) + tail
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
self._value,
))
+ def _encode_cer(self, writer):
+ octets = self._value
+ if len(octets) <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ LEN1K,
+ octets[offset:offset + 1000],
+ )))
+ tail = octets[offset+1000:]
+ if len(tail) > 0:
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
yield pp
+def agg_octet_string(evgens, decode_path, raw, writer):
+ """Aggregate constructed string (OctetString and its derivatives)
+
+ :param evgens: iterator of generated events
+ :param decode_path: points to the string we want to decode
+ :param raw: slicebable (memoryview, bytearray, etc) with
+ the data evgens are generated one
+ :param writer: buffer.write where string is going to be saved
+ """
+ decode_path_len = len(decode_path)
+ for dp, obj, _ in evgens:
+ if dp[:decode_path_len] != decode_path:
+ continue
+ if not obj.ber_encoded:
+ write_full(writer, raw[
+ obj.offset + obj.tlen + obj.llen:
+ obj.offset + obj.tlen + obj.llen + obj.vlen -
+ (EOC_LEN if obj.expl_lenindef else 0)
+ ])
+ if len(dp) == decode_path_len:
+ break
+
+
NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
value = self._encode_time()
return b"".join((self.tag, len_encode(len(value)), value))
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def todatetime(self):
return self._value
self._assert_ready()
return self._value[1].tag_order if self._tag_order is None else self._tag_order
+ @property
+ def tag_order_cer(self):
+ return min(v.tag_order_cer for v in itervalues(self.specs))
+
def __getitem__(self, key):
if key not in self.specs:
raise ObjUnknown(key)
self._assert_ready()
return self._value[1].encode()
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ self._value[1].encode_cer(writer)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
"""``ANY`` special type
>>> Any(Integer(-123))
- ANY 020185
+ ANY INTEGER -123 (0X:7B)
>>> a = Any(OctetString(b"hello world").encode())
ANY 040b68656c6c6f20776f726c64
>>> hexenc(bytes(a))
return value
if isinstance(value, self.__class__):
return value._value
- if isinstance(value, Obj):
- return value.encode()
- raise InvalidValueType((self.__class__, Obj, binary_type))
+ if not isinstance(value, Obj):
+ raise InvalidValueType((self.__class__, Obj, binary_type))
+ return value
@property
def ready(self):
def __eq__(self, their):
if their.__class__ == binary_type:
- return self._value == their
+ if self._value.__class__ == binary_type:
+ return self._value == their
+ return self._value.encode() == their
if issubclass(their.__class__, Any):
- return self._value == their._value
+ if self.ready and their.ready:
+ return bytes(self) == bytes(their)
+ return self.ready == their.ready
return False
def __call__(
def __bytes__(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return self._value.encode()
@property
def tlen(self):
def _encode(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return value.encode()
+
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ value = self._value
+ if value.__class__ == binary_type:
+ write_full(writer, value)
+ else:
+ value.encode_cer(writer)
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
return pp_console_row(next(self.pps()))
def pps(self, decode_path=()):
+ value = self._value
+ if value is None:
+ pass
+ elif value.__class__ == binary_type:
+ value = None
+ else:
+ value = repr(value)
yield _pp(
obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
- blob=self._value if self.ready else None,
+ value=value,
+ blob=self._value if self._value.__class__ == binary_type else None,
optional=self.optional,
default=self == self.default,
impl=None if self.tag == self.tag_default else tag_decode(self.tag),
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(
+ self._values_for_encoding(),
+ key=attrgetter("tag_order_cer"),
+ ):
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(
self,
tlv,
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
+ write_full(writer, v)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
return super(SetOf, self)._decode(
tlv,
help="Allow explicit tag out-of-bound",
)
parser.add_argument(
- "DERFile",
+ "--evgen",
+ action="store_true",
+ help="Turn on event generation mode",
+ )
+ parser.add_argument(
+ "RAWFile",
type=argparse.FileType("rb"),
- help="Path to DER file you want to decode",
+ help="Path to BER/CER/DER file you want to decode",
)
args = parser.parse_args()
- args.DERFile.seek(args.skip)
- der = memoryview(args.DERFile.read())
- args.DERFile.close()
+ if PY2:
+ args.RAWFile.seek(args.skip)
+ raw = memoryview(args.RAWFile.read())
+ args.RAWFile.close()
+ else:
+ raw = file_mmaped(args.RAWFile)[args.skip:]
oid_maps = (
[obj_by_path(_path) for _path in (args.oids or "").split(",")]
if args.oids else ()
}
if args.defines_by_path is not None:
ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
- obj, tail = schema().decode(der, ctx=ctx)
from os import environ
- print(pprinter(
- obj,
+ pprinter = partial(
+ pprinter,
oid_maps=oid_maps,
with_colours=environ.get("NO_COLOR") is None,
with_decode_path=args.print_decode_path,
() if args.decode_path_only is None else
tuple(args.decode_path_only.split(":"))
),
- ))
+ )
+ if args.evgen:
+ for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
+ print(pprinter(obj, decode_path=decode_path))
+ else:
+ obj, tail = schema().decode(raw, ctx=ctx)
+ print(pprinter(obj))
if tail != b"":
print("\nTrailing data: %s" % hexenc(tail))