.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode_cer
+.. autofunction:: pyderasn.file_mmaped
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
from copy import copy
from datetime import datetime
from datetime import timedelta
+from io import BytesIO
from math import ceil
+from mmap import mmap
+from mmap import PROT_READ
from operator import attrgetter
from string import ascii_letters
from string import digits
__version__ = "7.0"
__all__ = (
+ "agg_octet_string",
"Any",
"BitString",
"BMPString",
"Choice",
"DecodeError",
"DecodePathDefBy",
+ "encode_cer",
"Enumerated",
"ExceedingData",
+ "file_mmaped",
"GeneralizedTime",
"GeneralString",
"GraphicString",
SET01 = frozenset("01")
DECIMALS = frozenset(digits)
DECIMAL_SIGNS = ".,"
+NEXT_ATTR_NAME = "next" if PY2 else "__next__"
+def file_mmaped(fd):
+ """Make mmap-ed memoryview for reading from file
+
+ :param fd: file object
+ :returns: memoryview over read-only mmap-ing of the whole file
+ """
+ return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
+
def pureint(value):
if not set(value) <= DECIMALS:
raise ValueError("non-pure integer")
return float("0." + fractions_raw)
+def get_def_by_path(defines_by_path, sub_decode_path):
+ """Get define by decode path
+ """
+ for path, define in defines_by_path:
+ if len(path) != len(sub_decode_path):
+ continue
+ for p1, p2 in zip(path, sub_decode_path):
+ if (not p1 is any) and (p1 != p2):
+ break
+ else:
+ return define
+
+
########################################################################
# Errors
########################################################################
return l, 1 + octets_num, data[1 + octets_num:]
+LEN1K = len_encode(1000)
+
+
+def write_full(writer, data):
+ """Fully write provided data
+
+ BytesIO does not guarantee that the whole data will be written at once.
+ """
+ data = memoryview(data)
+ written = 0
+ while written != len(data):
+ n = writer(data[written:])
+ if n is None:
+ raise ValueError("can not write to buf")
+ written += n
+
+
########################################################################
# Base class
########################################################################
"""
return self._tag_order
+ @property
+ def tag_order_cer(self):
+ return self.tag_order
+
@property
def tlen(self):
"""See :ref:`decoding`
def _encode(self): # pragma: no cover
raise NotImplementedError()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover
- raise NotImplementedError()
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
+ yield NotImplemented
def encode(self):
"""Encode the structure
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
+ def encode_cer(self, writer):
+ if self._expl is not None:
+ write_full(writer, self._expl + LENINDEF)
+ if getattr(self, "der_forced", False):
+ write_full(writer, self._encode())
+ else:
+ self._encode_cer(writer)
+ if self._expl is not None:
+ write_full(writer, EOC)
+
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def hexencode(self):
"""Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
"""
ctx=None,
tag_only=False,
_ctx_immutable=True,
+ ):
+ result = next(self.decode_evgen(
+ data,
+ offset,
+ leavemm,
+ decode_path,
+ ctx,
+ tag_only,
+ _ctx_immutable,
+ _evgen_mode=False,
+ ))
+ if result is None:
+ return None
+ _, obj, tail = result
+ return obj, tail
+
+ def decode_evgen(
+ self,
+ data,
+ offset=0,
+ leavemm=False,
+ decode_path=(),
+ ctx=None,
+ tag_only=False,
+ _ctx_immutable=True,
+ _evgen_mode=True,
):
"""Decode the data
elif _ctx_immutable:
ctx = copy(ctx)
tlv = memoryview(data)
+ if (
+ _evgen_mode and
+ get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
+ ):
+ _evgen_mode = False
if self._expl is None:
- result = self._decode(
- tlv,
- offset,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only:
- return None
- obj, tail = result
+ for result in self._decode(
+ tlv,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only:
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
else:
try:
t, tlen, lv = tag_strip(tlv)
)
llen, v = 1, lv[1:]
offset += tlen + llen
- result = self._decode(
- v,
- offset=offset,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only: # pragma: no cover
- return None
- obj, tail = result
+ for result in self._decode(
+ v,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only: # pragma: no cover
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
if eoc_expected.tobytes() != EOC:
raise DecodeError(
decode_path=decode_path,
offset=offset,
)
- result = self._decode(
- v,
- offset=offset + tlen + llen,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only: # pragma: no cover
- return None
- obj, tail = result
+ for result in self._decode(
+ v,
+ offset=offset + tlen + llen,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only: # pragma: no cover
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
raise DecodeError(
"explicit tag out-of-bound, longer than data",
decode_path=decode_path,
offset=offset,
)
- return obj, (tail if leavemm else tail.tobytes())
+ yield decode_path, obj, (tail if leavemm else tail.tobytes())
def decod(self, data, offset=0, decode_path=(), ctx=None):
"""Decode the data, check that tail is empty
)
+def encode_cer(obj):
+ """Encode to CER in memory
+ """
+ buf = BytesIO()
+ obj.encode_cer(buf.write)
+ return buf.getvalue()
+
+
class DecodePathDefBy(object):
"""DEFINED BY representation inside decode path
"""
with_colours=False,
with_decode_path=False,
decode_path_only=(),
+ decode_path=(),
):
"""Pretty print object
else:
for row in _pprint_pps(pp):
yield row
- return "\n".join(_pprint_pps(obj.pps()))
+ return "\n".join(_pprint_pps(obj.pps(decode_path)))
########################################################################
(b"\xFF" if self._value else b"\x00"),
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
try:
l, _, v = len_decode(lv)
except DecodeError as err:
_decoded=(offset, 1, 1),
)
obj.ber_encoded = ber_encoded
- return obj, v[1:]
+ yield decode_path, obj, v[1:]
def __repr__(self):
return pp_console_row(next(self.pps()))
break
return b"".join((self.tag, len_encode(len(octets)), octets))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
decode_path=decode_path,
offset=offset,
)
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
octets,
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ bit_len, octets = self._value
+ if len(octets) + 1 <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ LEN1K,
+ int2byte(0),
+ octets[offset:offset + 999],
+ )))
+ tail = octets[offset+999:]
+ if len(tail) > 0:
+ tail = int2byte((8 - bit_len % 8) % 8) + tail
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
)
if t == self.tag:
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
offset=offset,
)
v, tail = v[:l], v[l:]
+ bit_len = (len(v) - 1) * 8 - pad_size
obj = self.__class__(
- value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()),
+ value=None if evgen_mode else (bit_len, v[1:].tobytes()),
impl=self.tag,
expl=self._expl,
default=self.default,
_specs=self.specs,
_decoded=(offset, llen, l),
)
- return obj, tail
+ if evgen_mode:
+ obj._value = (bit_len, None)
+ yield decode_path, obj, tail
+ return
if t != self.tag_constructed:
raise TagMismatch(
klass=self.__class__,
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
lenindef = False
try:
l, llen, v = len_decode(lv)
)
sub_decode_path = decode_path + (str(len(chunks)),)
try:
- chunk, v_tail = BitString().decode(
- v,
- offset=sub_offset,
- decode_path=sub_decode_path,
- leavemm=True,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, chunk, v_tail in BitString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, chunk, v_tail
+ else:
+ _, chunk, v_tail = next(BitString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
except TagMismatch:
raise DecodeError(
"expected BitString encoded chunk",
decode_path=decode_path + (str(chunk_i),),
offset=chunk.offset,
)
- values.append(bytes(chunk))
+ if not evgen_mode:
+ values.append(bytes(chunk))
bit_len += chunk.bit_len
chunk_last = chunks[-1]
- values.append(bytes(chunk_last))
+ if not evgen_mode:
+ values.append(bytes(chunk_last))
bit_len += chunk_last.bit_len
obj = self.__class__(
- value=(bit_len, b"".join(values)),
+ value=None if evgen_mode else (bit_len, b"".join(values)),
impl=self.tag,
expl=self._expl,
default=self.default,
_specs=self.specs,
_decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
)
+ if evgen_mode:
+ obj._value = (bit_len, None)
obj.lenindef = lenindef
obj.ber_encoded = True
- return obj, (v[EOC_LEN:] if lenindef else v)
+ yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
def __repr__(self):
return pp_console_row(next(self.pps()))
if self.ready:
bit_len, blob = self._value
value = "%d bits" % bit_len
- if len(self.specs) > 0:
+ if len(self.specs) > 0 and blob is not None:
blob = tuple(self.named)
yield _pp(
obj=self,
>>> OctetString(b"hell", bounds=(4, 4))
OCTET STRING 4 bytes 68656c6c
- .. note::
-
- Pay attention that OCTET STRING can be encoded both in primitive
- and constructed forms. Decoder always checks constructed form tag
- additionally to specified primitive one. If BER decoding is
- :ref:`not enabled <bered_ctx>`, then decoder will fail, because
- of DER restrictions.
+ Memoryviews can be used as a values. If memoryview is made on
+ mmap-ed file, then it does not take storage inside OctetString
+ itself. In CER encoding mode it will be streamed to the specified
+ writer, copying 1 KB chunks.
"""
__slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
tag_default = tag_encode(4)
asn1_type_name = "OCTET STRING"
+ evgen_mode_skip_value = True
def __init__(
self,
)
def _value_sanitize(self, value):
- if value.__class__ == binary_type:
+ if value.__class__ == binary_type or value.__class__ == memoryview:
pass
elif issubclass(value.__class__, OctetString):
value = value._value
else:
- raise InvalidValueType((self.__class__, bytes))
+ raise InvalidValueType((self.__class__, bytes, memoryview))
if not self._bound_min <= len(value) <= self._bound_max:
raise BoundsError(self._bound_min, len(value), self._bound_max)
return value
def __bytes__(self):
self._assert_ready()
- return self._value
+ return bytes(self._value)
def __eq__(self, their):
if their.__class__ == binary_type:
self._value,
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ octets = self._value
+ if len(octets) <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ LEN1K,
+ octets[offset:offset + 1000],
+ )))
+ tail = octets[offset+1000:]
+ if len(tail) > 0:
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
)
if t == self.tag:
if tag_only:
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
offset=offset,
)
v, tail = v[:l], v[l:]
+ if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=v.tobytes(),
+ value=(
+ None if (evgen_mode and self.evgen_mode_skip_value)
+ else v.tobytes()
+ ),
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
expl=self._expl,
decode_path=decode_path,
offset=offset,
)
- return obj, tail
+ yield decode_path, obj, tail
+ return
if t != self.tag_constructed:
raise TagMismatch(
klass=self.__class__,
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
try:
l, llen, v = len_decode(lv)
offset=offset,
)
chunks = []
+ chunks_count = 0
sub_offset = offset + tlen + llen
vlen = 0
+ payload_len = 0
while True:
if lenindef:
if v[:EOC_LEN].tobytes() == EOC:
decode_path=decode_path + (str(len(chunks) - 1),),
offset=chunks[-1].offset,
)
- sub_decode_path = decode_path + (str(len(chunks)),)
try:
- chunk, v_tail = OctetString().decode(
- v,
- offset=sub_offset,
- decode_path=sub_decode_path,
- leavemm=True,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ sub_decode_path = decode_path + (str(chunks_count),)
+ for _decode_path, chunk, v_tail in OctetString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, chunk, v_tail
+ if not chunk.ber_encoded:
+ payload_len += chunk.vlen
+ chunks_count += 1
+ else:
+ sub_decode_path = decode_path + (str(len(chunks)),)
+ _, chunk, v_tail = next(OctetString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
+ chunks.append(chunk)
except TagMismatch:
raise DecodeError(
"expected OctetString encoded chunk",
decode_path=sub_decode_path,
offset=sub_offset,
)
- chunks.append(chunk)
sub_offset += chunk.tlvlen
vlen += chunk.tlvlen
v = v_tail
+ if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=b"".join(bytes(chunk) for chunk in chunks),
+ value=(
+ None if evgen_mode else
+ b"".join(bytes(chunk) for chunk in chunks)
+ ),
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
expl=self._expl,
)
obj.lenindef = lenindef
obj.ber_encoded = True
- return obj, (v[EOC_LEN:] if lenindef else v)
+ yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
def __repr__(self):
return pp_console_row(next(self.pps()))
yield pp
+def agg_octet_string(evgens, decode_path, raw, writer):
+ """Aggregate constructed string (OctetString and its derivatives)
+
+ :param evgens: iterator of generated events
+ :param decode_path: points to the string we want to decode
+ :param raw: slicebable (memoryview, bytearray, etc) with
+ the data evgens are generated one
+ :param writer: buffer.write where string is going to be saved
+ """
+ decode_path_len = len(decode_path)
+ for dp, obj, _ in evgens:
+ if dp[:decode_path_len] != decode_path:
+ continue
+ if not obj.ber_encoded:
+ write_full(writer, raw[
+ obj.offset + obj.tlen + obj.llen:
+ obj.offset + obj.tlen + obj.llen + obj.vlen -
+ (EOC_LEN if obj.expl_lenindef else 0)
+ ])
+ if len(dp) == decode_path_len:
+ break
+
+
NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
def _encode(self):
return self.tag + len_encode(0)
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, _, v = len_decode(lv)
except DecodeError as err:
optional=self.optional,
_decoded=(offset, 1, 0),
)
- return obj, v
+ yield decode_path, obj, v
def __repr__(self):
return pp_console_row(next(self.pps()))
v = b"".join(octets)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
)
if ber_encoded:
obj.ber_encoded = True
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
tag_default = tag_encode(23)
encoding = "ascii"
asn1_type_name = "UTCTime"
+ evgen_mode_skip_value = False
def __init__(
self,
value = self._encode_time()
return b"".join((self.tag, len_encode(len(value)), value))
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def todatetime(self):
return self._value
self._assert_ready()
return self._value[1].tag_order if self._tag_order is None else self._tag_order
+ @property
+ def tag_order_cer(self):
+ return min(v.tag_order_cer for v in itervalues(self.specs))
+
def __getitem__(self, key):
if key not in self.specs:
raise ObjUnknown(key)
self._assert_ready()
return self._value[1].encode()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ self._value[1].encode_cer(writer)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
try:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
- value, tail = spec.decode(
- tlv,
- offset=offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ yield None
+ return
+ if evgen_mode:
+ for _decode_path, value, tail in spec.decode_evgen(
+ tlv,
+ offset=offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, tail
+ else:
+ _, value, tail = next(spec.decode_evgen(
+ tlv,
+ offset=offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
obj = self.__class__(
schema=self.specs,
expl=self._expl,
_decoded=(offset, 0, value.fulllen),
)
obj._value = (choice, value)
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
value = pp_console_row(next(self.pps()))
"""``ANY`` special type
>>> Any(Integer(-123))
- ANY 020185
+ ANY INTEGER -123 (0X:7B)
>>> a = Any(OctetString(b"hello world").encode())
ANY 040b68656c6c6f20776f726c64
>>> hexenc(bytes(a))
return value
if isinstance(value, self.__class__):
return value._value
- if isinstance(value, Obj):
- return value.encode()
- raise InvalidValueType((self.__class__, Obj, binary_type))
+ if not isinstance(value, Obj):
+ raise InvalidValueType((self.__class__, Obj, binary_type))
+ return value
@property
def ready(self):
def __eq__(self, their):
if their.__class__ == binary_type:
- return self._value == their
+ if self._value.__class__ == binary_type:
+ return self._value == their
+ return self._value.encode() == their
if issubclass(their.__class__, Any):
- return self._value == their._value
+ if self.ready and their.ready:
+ return bytes(self) == bytes(their)
+ return self.ready == their.ready
return False
def __call__(
def __bytes__(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return self._value.encode()
@property
def tlen(self):
def _encode(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return value.encode()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ value = self._value
+ if value.__class__ == binary_type:
+ write_full(writer, value)
+ else:
+ value.encode_cer(writer)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
chunk_i += 1
tlvlen = tlen + llen + vlen + EOC_LEN
obj = self.__class__(
- value=tlv[:tlvlen].tobytes(),
+ value=None if evgen_mode else tlv[:tlvlen].tobytes(),
expl=self._expl,
optional=self.optional,
_decoded=(offset, 0, tlvlen),
)
obj.lenindef = True
obj.tag = t.tobytes()
- return obj, v[EOC_LEN:]
+ yield decode_path, obj, v[EOC_LEN:]
+ return
except DecodeError as err:
raise err.__class__(
msg=err.msg,
tlvlen = tlen + llen + l
v, tail = tlv[:tlvlen], v[l:]
obj = self.__class__(
- value=v.tobytes(),
+ value=None if evgen_mode else v.tobytes(),
expl=self._expl,
optional=self.optional,
_decoded=(offset, 0, tlvlen),
)
obj.tag = t.tobytes()
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
def pps(self, decode_path=()):
+ value = self._value
+ if value is None:
+ pass
+ elif value.__class__ == binary_type:
+ value = None
+ else:
+ value = repr(value)
yield _pp(
obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
- blob=self._value if self.ready else None,
+ value=value,
+ blob=self._value if self._value.__class__ == binary_type else None,
optional=self.optional,
default=self == self.default,
impl=None if self.tag == self.tag_default else tag_decode(self.tag),
defaulted values existence validation by setting
``"allow_default_values": True`` :ref:`context <ctx>` option.
+ .. warning::
+
+ Check for default value existence is not performed in
+ ``evgen_mode``, because previously decoded values are not stored
+ in memory, to be able to compare them.
+
Two sequences are equal if they have equal specification (schema),
implicit/explicit tagging and the same values.
"""
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
continue
sub_decode_path = decode_path + (name,)
try:
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
except TagMismatch as err:
if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
continue
raise
defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
- if defined is not None:
+ if not evgen_mode and defined is not None:
defined_by, defined_spec = defined
if issubclass(value.__class__, SequenceOf):
for i, _value in enumerate(value):
vlen += value_len
sub_offset += value_len
v = v_tail
- if spec.default is not None and value == spec.default:
- if ctx_bered or ctx_allow_default_values:
- ber_encoded = True
- else:
- raise DecodeError(
- "DEFAULT value met",
- klass=self.__class__,
- decode_path=sub_decode_path,
- offset=sub_offset,
- )
- values[name] = value
-
- spec_defines = getattr(spec, "defines", ())
- if len(spec_defines) == 0:
- defines_by_path = ctx.get("defines_by_path", ())
- if len(defines_by_path) > 0:
- spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
- if spec_defines is not None and len(spec_defines) > 0:
- for rel_path, schema in spec_defines:
- defined = schema.get(value, None)
- if defined is not None:
- ctx.setdefault("_defines", []).append((
- abs_decode_path(sub_decode_path[:-1], rel_path),
- (value, defined),
- ))
+ if not evgen_mode:
+ if spec.default is not None and value == spec.default:
+ # This will not work in evgen_mode
+ if ctx_bered or ctx_allow_default_values:
+ ber_encoded = True
+ else:
+ raise DecodeError(
+ "DEFAULT value met",
+ klass=self.__class__,
+ decode_path=sub_decode_path,
+ offset=sub_offset,
+ )
+ values[name] = value
+ spec_defines = getattr(spec, "defines", ())
+ if len(spec_defines) == 0:
+ defines_by_path = ctx.get("defines_by_path", ())
+ if len(defines_by_path) > 0:
+ spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+ if spec_defines is not None and len(spec_defines) > 0:
+ for rel_path, schema in spec_defines:
+ defined = schema.get(value, None)
+ if defined is not None:
+ ctx.setdefault("_defines", []).append((
+ abs_decode_path(sub_decode_path[:-1], rel_path),
+ (value, defined),
+ ))
if lenindef:
if v[:EOC_LEN].tobytes() != EOC:
raise DecodeError(
obj._value = values
obj.lenindef = lenindef
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
value = pp_console_row(next(self.pps()))
))
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(
+ self._values_for_encoding(),
+ key=attrgetter("tag_order_cer"),
+ ):
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
decode_path=decode_path,
offset=offset,
)
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
value_tag_order = value.tag_order
value_len = value.fulllen
if tag_order_prev >= value_tag_order:
)
tail = v[EOC_LEN:]
obj.lenindef = True
- obj._value = values
for name, spec in iteritems(self.specs):
if name not in values and not spec.optional:
raise DecodeError(
decode_path=decode_path,
offset=offset,
)
+ if not evgen_mode:
+ obj._value = values
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
SequenceOfState = namedtuple(
>>> ints
Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
- Also you can initialize sequence with preinitialized values:
+ You can initialize sequence with preinitialized values:
>>> ints = Ints([Integer(123), Integer(234)])
+
+ Also you can use iterator as a value:
+
+ >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
+
+ And it won't be iterated until encoding process. Pay attention that
+ bounds and required schema checks are done only during the encoding
+ process in that case! After encode was called, then value is zeroed
+ back to empty list and you have to set it again. That mode is useful
+ mainly with CER encoding mode, where all objects from the iterable
+ will be streamed to the buffer, without copying all of them to
+ memory first.
"""
__slots__ = ("spec", "_bound_min", "_bound_max")
tag_default = tag_encode(form=TagFormConstructed, num=16)
self._value = copy(default_obj._value)
def _value_sanitize(self, value):
+ iterator = False
if issubclass(value.__class__, SequenceOf):
value = value._value
+ elif hasattr(value, NEXT_ATTR_NAME):
+ iterator = True
+ value = value
elif hasattr(value, "__iter__"):
value = list(value)
else:
- raise InvalidValueType((self.__class__, iter))
- if not self._bound_min <= len(value) <= self._bound_max:
- raise BoundsError(self._bound_min, len(value), self._bound_max)
- for v in value:
- if not isinstance(v, self.spec.__class__):
- raise InvalidValueType((self.spec.__class__,))
+ raise InvalidValueType((self.__class__, iter, "iterator"))
+ if not iterator:
+ if not self._bound_min <= len(value) <= self._bound_max:
+ raise BoundsError(self._bound_min, len(value), self._bound_max)
+ class_expected = self.spec.__class__
+ for v in value:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
return value
@property
def ready(self):
+ if hasattr(self._value, NEXT_ATTR_NAME):
+ return True
+ if self._bound_min > 0 and len(self._value) == 0:
+ return False
return all(v.ready for v in self._value)
@property
return any(v.bered for v in self._value)
def __getstate__(self):
+ if hasattr(self._value, NEXT_ATTR_NAME):
+ raise ValueError("can not pickle SequenceOf with iterator")
return SequenceOfState(
__version__,
self.tag,
self._value.append(value)
def __iter__(self):
- self._assert_ready()
return iter(self._value)
def __len__(self):
- self._assert_ready()
return len(self._value)
def __setitem__(self, key, value):
return iter(self._value)
def _encode(self):
- v = b"".join(v.encode() for v in self._values_for_encoding())
- return b"".join((self.tag, len_encode(len(v)), v))
+ iterator = hasattr(self._value, NEXT_ATTR_NAME)
+ if iterator:
+ values = []
+ values_append = values.append
+ class_expected = self.spec.__class__
+ values_for_encoding = self._values_for_encoding()
+ self._value = []
+ for v in values_for_encoding:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
+ values_append(v.encode())
+ if not self._bound_min <= len(values) <= self._bound_max:
+ raise BoundsError(self._bound_min, len(values), self._bound_max)
+ value = b"".join(values)
+ else:
+ value = b"".join(v.encode() for v in self._values_for_encoding())
+ return b"".join((self.tag, len_encode(len(value)), value))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False):
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ iterator = hasattr(self._value, NEXT_ATTR_NAME)
+ if iterator:
+ class_expected = self.spec.__class__
+ values_count = 0
+ values_for_encoding = self._values_for_encoding()
+ self._value = []
+ for v in values_for_encoding:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
+ v.encode_cer(writer)
+ values_count += 1
+ if not self._bound_min <= values_count <= self._bound_max:
+ raise BoundsError(self._bound_min, values_count, self._bound_max)
+ else:
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
+ def _decode(
+ self,
+ tlv,
+ offset,
+ decode_path,
+ ctx,
+ tag_only,
+ evgen_mode,
+ ordering_check=False,
+ ):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
vlen = 0
sub_offset = offset + tlen + llen
_value = []
+ _value_count = 0
ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
value_prev = memoryview(v[:0])
ber_encoded = False
while len(v) > 0:
if lenindef and v[:EOC_LEN].tobytes() == EOC:
break
- sub_decode_path = decode_path + (str(len(_value)),)
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ sub_decode_path = decode_path + (str(_value_count),)
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
value_len = value.fulllen
if ordering_check:
if value_prev.tobytes() > v[:value_len].tobytes():
offset=sub_offset,
)
value_prev = v[:value_len]
- _value.append(value)
+ _value_count += 1
+ if not evgen_mode:
+ _value.append(value)
sub_offset += value_len
vlen += value_len
v = v_tail
+ if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=_value,
+ value=None if evgen_mode else _value,
schema=spec,
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
obj.lenindef = True
tail = v[EOC_LEN:]
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return "%s[%s]" % (
tag_default = tag_encode(form=TagFormConstructed, num=17)
asn1_type_name = "SET OF"
+ def _value_sanitize(self, value):
+ value = super(SetOf, self)._value_sanitize(value)
+ if hasattr(value, NEXT_ATTR_NAME):
+ raise ValueError(
+ "SetOf does not support iterator values, as no sense in them"
+ )
+ return value
+
def _encode(self):
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
+ write_full(writer, v)
+ write_full(writer, EOC)
+
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
return super(SetOf, self)._decode(
tlv,
offset,
decode_path,
ctx,
tag_only,
+ evgen_mode,
ordering_check=True,
)
help="Allow explicit tag out-of-bound",
)
parser.add_argument(
- "DERFile",
+ "--evgen",
+ action="store_true",
+ help="Turn on event generation mode",
+ )
+ parser.add_argument(
+ "RAWFile",
type=argparse.FileType("rb"),
- help="Path to DER file you want to decode",
+ help="Path to BER/CER/DER file you want to decode",
)
args = parser.parse_args()
- args.DERFile.seek(args.skip)
- der = memoryview(args.DERFile.read())
- args.DERFile.close()
+ if PY2:
+ args.RAWFile.seek(args.skip)
+ raw = memoryview(args.RAWFile.read())
+ args.RAWFile.close()
+ else:
+ raw = file_mmaped(args.RAWFile)[args.skip:]
oid_maps = (
[obj_by_path(_path) for _path in (args.oids or "").split(",")]
if args.oids else ()
}
if args.defines_by_path is not None:
ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
- obj, tail = schema().decode(der, ctx=ctx)
from os import environ
- print(pprinter(
- obj,
+ pprinter = partial(
+ pprinter,
oid_maps=oid_maps,
with_colours=environ.get("NO_COLOR") is None,
with_decode_path=args.print_decode_path,
() if args.decode_path_only is None else
tuple(args.decode_path_only.split(":"))
),
- ))
+ )
+ if args.evgen:
+ for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
+ print(pprinter(obj, decode_path=decode_path))
+ else:
+ obj, tail = schema().decode(raw, ctx=ctx)
+ print(pprinter(obj))
if tail != b"":
print("\nTrailing data: %s" % hexenc(tail))