return float("0." + fractions_raw)
+def get_def_by_path(defines_by_path, sub_decode_path):
+ """Get define by decode path
+ """
+ for path, define in defines_by_path:
+ if len(path) != len(sub_decode_path):
+ continue
+ for p1, p2 in zip(path, sub_decode_path):
+ if (not p1 is any) and (p1 != p2):
+ break
+ else:
+ return define
+
+
########################################################################
# Errors
########################################################################
def _encode(self): # pragma: no cover
raise NotImplementedError()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover
- raise NotImplementedError()
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
+ yield NotImplemented
def encode(self):
"""Encode the structure
ctx=None,
tag_only=False,
_ctx_immutable=True,
+ ):
+ result = next(self.decode_evgen(
+ data,
+ offset,
+ leavemm,
+ decode_path,
+ ctx,
+ tag_only,
+ _ctx_immutable,
+ _evgen_mode=False,
+ ))
+ if result is None:
+ return None
+ _, obj, tail = result
+ return obj, tail
+
+ def decode_evgen(
+ self,
+ data,
+ offset=0,
+ leavemm=False,
+ decode_path=(),
+ ctx=None,
+ tag_only=False,
+ _ctx_immutable=True,
+ _evgen_mode=True,
):
"""Decode the data
elif _ctx_immutable:
ctx = copy(ctx)
tlv = memoryview(data)
+ if (
+ _evgen_mode and
+ get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
+ ):
+ _evgen_mode = False
if self._expl is None:
- result = self._decode(
- tlv,
- offset,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only:
- return None
- obj, tail = result
+ for result in self._decode(
+ tlv,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only:
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
else:
try:
t, tlen, lv = tag_strip(tlv)
)
llen, v = 1, lv[1:]
offset += tlen + llen
- result = self._decode(
- v,
- offset=offset,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only: # pragma: no cover
- return None
- obj, tail = result
+ for result in self._decode(
+ v,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only: # pragma: no cover
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
if eoc_expected.tobytes() != EOC:
raise DecodeError(
decode_path=decode_path,
offset=offset,
)
- result = self._decode(
- v,
- offset=offset + tlen + llen,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only: # pragma: no cover
- return None
- obj, tail = result
+ for result in self._decode(
+ v,
+ offset=offset + tlen + llen,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only: # pragma: no cover
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
raise DecodeError(
"explicit tag out-of-bound, longer than data",
decode_path=decode_path,
offset=offset,
)
- return obj, (tail if leavemm else tail.tobytes())
+ yield decode_path, obj, (tail if leavemm else tail.tobytes())
def decod(self, data, offset=0, decode_path=(), ctx=None):
"""Decode the data, check that tail is empty
(b"\xFF" if self._value else b"\x00"),
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
try:
l, _, v = len_decode(lv)
except DecodeError as err:
_decoded=(offset, 1, 1),
)
obj.ber_encoded = ber_encoded
- return obj, v[1:]
+ yield decode_path, obj, v[1:]
def __repr__(self):
return pp_console_row(next(self.pps()))
break
return b"".join((self.tag, len_encode(len(octets)), octets))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
decode_path=decode_path,
offset=offset,
)
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
octets,
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
)
if t == self.tag:
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
offset=offset,
)
v, tail = v[:l], v[l:]
+ bit_len = (len(v) - 1) * 8 - pad_size
obj = self.__class__(
- value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()),
+ value=None if evgen_mode else (bit_len, v[1:].tobytes()),
impl=self.tag,
expl=self._expl,
default=self.default,
_specs=self.specs,
_decoded=(offset, llen, l),
)
- return obj, tail
+ if evgen_mode:
+ obj._value = (bit_len, None)
+ yield decode_path, obj, tail
+ return
if t != self.tag_constructed:
raise TagMismatch(
klass=self.__class__,
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
lenindef = False
try:
l, llen, v = len_decode(lv)
)
sub_decode_path = decode_path + (str(len(chunks)),)
try:
- chunk, v_tail = BitString().decode(
- v,
- offset=sub_offset,
- decode_path=sub_decode_path,
- leavemm=True,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, chunk, v_tail in BitString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, chunk, v_tail
+ else:
+ _, chunk, v_tail = next(BitString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
except TagMismatch:
raise DecodeError(
"expected BitString encoded chunk",
decode_path=decode_path + (str(chunk_i),),
offset=chunk.offset,
)
- values.append(bytes(chunk))
+ if not evgen_mode:
+ values.append(bytes(chunk))
bit_len += chunk.bit_len
chunk_last = chunks[-1]
- values.append(bytes(chunk_last))
+ if not evgen_mode:
+ values.append(bytes(chunk_last))
bit_len += chunk_last.bit_len
obj = self.__class__(
- value=(bit_len, b"".join(values)),
+ value=None if evgen_mode else (bit_len, b"".join(values)),
impl=self.tag,
expl=self._expl,
default=self.default,
_specs=self.specs,
_decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
)
+ if evgen_mode:
+ obj._value = (bit_len, None)
obj.lenindef = lenindef
obj.ber_encoded = True
- return obj, (v[EOC_LEN:] if lenindef else v)
+ yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
def __repr__(self):
return pp_console_row(next(self.pps()))
if self.ready:
bit_len, blob = self._value
value = "%d bits" % bit_len
- if len(self.specs) > 0:
+ if len(self.specs) > 0 and blob is not None:
blob = tuple(self.named)
yield _pp(
obj=self,
__slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
tag_default = tag_encode(4)
asn1_type_name = "OCTET STRING"
+ evgen_mode_skip_value = True
def __init__(
self,
self._value,
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
)
if t == self.tag:
if tag_only:
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
offset=offset,
)
v, tail = v[:l], v[l:]
+ if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=v.tobytes(),
+ value=(
+ None if (evgen_mode and self.evgen_mode_skip_value)
+ else v.tobytes()
+ ),
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
expl=self._expl,
decode_path=decode_path,
offset=offset,
)
- return obj, tail
+ yield decode_path, obj, tail
+ return
if t != self.tag_constructed:
raise TagMismatch(
klass=self.__class__,
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
try:
l, llen, v = len_decode(lv)
offset=offset,
)
chunks = []
+ chunks_count = 0
sub_offset = offset + tlen + llen
vlen = 0
+ payload_len = 0
while True:
if lenindef:
if v[:EOC_LEN].tobytes() == EOC:
decode_path=decode_path + (str(len(chunks) - 1),),
offset=chunks[-1].offset,
)
- sub_decode_path = decode_path + (str(len(chunks)),)
try:
- chunk, v_tail = OctetString().decode(
- v,
- offset=sub_offset,
- decode_path=sub_decode_path,
- leavemm=True,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ sub_decode_path = decode_path + (str(chunks_count),)
+ for _decode_path, chunk, v_tail in OctetString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, chunk, v_tail
+ if not chunk.ber_encoded:
+ payload_len += chunk.vlen
+ chunks_count += 1
+ else:
+ sub_decode_path = decode_path + (str(len(chunks)),)
+ _, chunk, v_tail = next(OctetString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
+ chunks.append(chunk)
except TagMismatch:
raise DecodeError(
"expected OctetString encoded chunk",
decode_path=sub_decode_path,
offset=sub_offset,
)
- chunks.append(chunk)
sub_offset += chunk.tlvlen
vlen += chunk.tlvlen
v = v_tail
+ if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=b"".join(bytes(chunk) for chunk in chunks),
+ value=(
+ None if evgen_mode else
+ b"".join(bytes(chunk) for chunk in chunks)
+ ),
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
expl=self._expl,
)
obj.lenindef = lenindef
obj.ber_encoded = True
- return obj, (v[EOC_LEN:] if lenindef else v)
+ yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
def __repr__(self):
return pp_console_row(next(self.pps()))
def _encode(self):
return self.tag + len_encode(0)
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, _, v = len_decode(lv)
except DecodeError as err:
optional=self.optional,
_decoded=(offset, 1, 0),
)
- return obj, v
+ yield decode_path, obj, v
def __repr__(self):
return pp_console_row(next(self.pps()))
v = b"".join(octets)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
)
if ber_encoded:
obj.ber_encoded = True
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
tag_default = tag_encode(23)
encoding = "ascii"
asn1_type_name = "UTCTime"
+ evgen_mode_skip_value = False
def __init__(
self,
self._assert_ready()
return self._value[1].encode()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
try:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
- value, tail = spec.decode(
- tlv,
- offset=offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ yield None
+ return
+ if evgen_mode:
+ for _decode_path, value, tail in spec.decode_evgen(
+ tlv,
+ offset=offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, tail
+ else:
+ _, value, tail = next(spec.decode_evgen(
+ tlv,
+ offset=offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
obj = self.__class__(
schema=self.specs,
expl=self._expl,
_decoded=(offset, 0, value.fulllen),
)
obj._value = (choice, value)
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
value = pp_console_row(next(self.pps()))
self._assert_ready()
return self._value
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
chunk_i += 1
tlvlen = tlen + llen + vlen + EOC_LEN
obj = self.__class__(
- value=tlv[:tlvlen].tobytes(),
+ value=None if evgen_mode else tlv[:tlvlen].tobytes(),
expl=self._expl,
optional=self.optional,
_decoded=(offset, 0, tlvlen),
)
obj.lenindef = True
obj.tag = t.tobytes()
- return obj, v[EOC_LEN:]
+ yield decode_path, obj, v[EOC_LEN:]
+ return
except DecodeError as err:
raise err.__class__(
msg=err.msg,
tlvlen = tlen + llen + l
v, tail = tlv[:tlvlen], v[l:]
obj = self.__class__(
- value=v.tobytes(),
+ value=None if evgen_mode else v.tobytes(),
expl=self._expl,
optional=self.optional,
_decoded=(offset, 0, tlvlen),
)
obj.tag = t.tobytes()
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
defaulted values existence validation by setting
``"allow_default_values": True`` :ref:`context <ctx>` option.
+ .. warning::
+
+ Check for default value existence is not performed in
+ ``evgen_mode``, because previously decoded values are not stored
+ in memory, to be able to compare them.
+
Two sequences are equal if they have equal specification (schema),
implicit/explicit tagging and the same values.
"""
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
continue
sub_decode_path = decode_path + (name,)
try:
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
except TagMismatch as err:
if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
continue
raise
defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
- if defined is not None:
+ if not evgen_mode and defined is not None:
defined_by, defined_spec = defined
if issubclass(value.__class__, SequenceOf):
for i, _value in enumerate(value):
vlen += value_len
sub_offset += value_len
v = v_tail
- if spec.default is not None and value == spec.default:
- if ctx_bered or ctx_allow_default_values:
- ber_encoded = True
- else:
- raise DecodeError(
- "DEFAULT value met",
- klass=self.__class__,
- decode_path=sub_decode_path,
- offset=sub_offset,
- )
- values[name] = value
-
- spec_defines = getattr(spec, "defines", ())
- if len(spec_defines) == 0:
- defines_by_path = ctx.get("defines_by_path", ())
- if len(defines_by_path) > 0:
- spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
- if spec_defines is not None and len(spec_defines) > 0:
- for rel_path, schema in spec_defines:
- defined = schema.get(value, None)
- if defined is not None:
- ctx.setdefault("_defines", []).append((
- abs_decode_path(sub_decode_path[:-1], rel_path),
- (value, defined),
- ))
+ if not evgen_mode:
+ if spec.default is not None and value == spec.default:
+ # This will not work in evgen_mode
+ if ctx_bered or ctx_allow_default_values:
+ ber_encoded = True
+ else:
+ raise DecodeError(
+ "DEFAULT value met",
+ klass=self.__class__,
+ decode_path=sub_decode_path,
+ offset=sub_offset,
+ )
+ values[name] = value
+ spec_defines = getattr(spec, "defines", ())
+ if len(spec_defines) == 0:
+ defines_by_path = ctx.get("defines_by_path", ())
+ if len(defines_by_path) > 0:
+ spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+ if spec_defines is not None and len(spec_defines) > 0:
+ for rel_path, schema in spec_defines:
+ defined = schema.get(value, None)
+ if defined is not None:
+ ctx.setdefault("_defines", []).append((
+ abs_decode_path(sub_decode_path[:-1], rel_path),
+ (value, defined),
+ ))
if lenindef:
if v[:EOC_LEN].tobytes() != EOC:
raise DecodeError(
obj._value = values
obj.lenindef = lenindef
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
value = pp_console_row(next(self.pps()))
))
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
decode_path=decode_path,
offset=offset,
)
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
value_tag_order = value.tag_order
value_len = value.fulllen
if tag_order_prev >= value_tag_order:
)
tail = v[EOC_LEN:]
obj.lenindef = True
- obj._value = values
for name, spec in iteritems(self.specs):
if name not in values and not spec.optional:
raise DecodeError(
decode_path=decode_path,
offset=offset,
)
+ if not evgen_mode:
+ obj._value = values
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
SequenceOfState = namedtuple(
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False):
+ def _decode(
+ self,
+ tlv,
+ offset,
+ decode_path,
+ ctx,
+ tag_only,
+ evgen_mode,
+ ordering_check=False,
+ ):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
vlen = 0
sub_offset = offset + tlen + llen
_value = []
+ _value_count = 0
ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
value_prev = memoryview(v[:0])
ber_encoded = False
while len(v) > 0:
if lenindef and v[:EOC_LEN].tobytes() == EOC:
break
- sub_decode_path = decode_path + (str(len(_value)),)
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ sub_decode_path = decode_path + (str(_value_count),)
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
value_len = value.fulllen
if ordering_check:
if value_prev.tobytes() > v[:value_len].tobytes():
offset=sub_offset,
)
value_prev = v[:value_len]
- _value.append(value)
+ _value_count += 1
+ if not evgen_mode:
+ _value.append(value)
sub_offset += value_len
vlen += value_len
v = v_tail
+ if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=_value,
+ value=None if evgen_mode else _value,
schema=spec,
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
obj.lenindef = True
tail = v[EOC_LEN:]
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return "%s[%s]" % (
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
return super(SetOf, self)._decode(
tlv,
offset,
decode_path,
ctx,
tag_only,
+ evgen_mode,
ordering_check=True,
)
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, value, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
for klass in (Boolean, BooleanInherited):
_, _, _, default, optional, _decoded = values
obj = klass(
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ hexdec(obj_expled_hex_encoded) + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj, obj_decoded)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
@given(integers(min_value=2))
def test_invalid_len(self, l):
with self.assertRaises(InvalidLength):
len_encode(1),
int2byte(value),
)))
- obj, _ = Boolean().decode(
- b"".join((
- Boolean.tag_default,
- len_encode(1),
- int2byte(value),
- )),
- ctx={"bered": True},
- )
+ encoded = b"".join((
+ Boolean.tag_default,
+ len_encode(1),
+ int2byte(value),
+ ))
+ obj, _ = Boolean().decode(encoded, ctx={"bered": True})
+ list(Boolean().decode_evgen(encoded, ctx={"bered": True}))
self.assertTrue(bool(obj))
self.assertTrue(obj.ber_encoded)
self.assertFalse(obj.lenindef)
with self.assertRaises(LenIndefForm):
SeqOf().decode(encoded)
seqof, tail = SeqOf().decode(encoded, ctx={"bered": True})
+ list(SeqOf().decode_evgen(encoded, ctx={"bered": True}))
self.assertSequenceEqual(tail, b"")
self.assertSequenceEqual([bool(v) for v in seqof], values)
self.assertSetEqual(
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, value, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
for klass in (Integer, IntegerInherited):
_, _, _, _, default, optional, _, _decoded = values
obj = klass(
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj, obj_decoded)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
def test_go_vectors_valid(self):
for data, expect in ((
(b"\x00", 0),
tail_junk = d.draw(binary(max_size=5))
tag_expl = tag_ctxc(d.draw(integers(min_value=1)))
offset = d.draw(integers(min_value=0))
+ decode_path = d.draw(decode_path_strat)
for klass in (BitString, BitStringInherited):
class BS(klass):
schema = _schema
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
@given(integers(min_value=1, max_value=255))
def test_bad_zero_value(self, pad_size):
with self.assertRaises(DecodeError):
self.assertTrue(obj[9])
self.assertFalse(obj[17])
+ @settings(max_examples=LONG_TEST_MAX_EXAMPLES)
@given(
integers(min_value=1, max_value=30),
lists(
),
lists(booleans(), min_size=1),
binary(),
+ decode_path_strat,
)
- def test_constructed(self, impl, chunk_inputs, chunk_last_bits, junk):
+ def test_constructed(self, impl, chunk_inputs, chunk_last_bits, junk, decode_path):
def chunk_constructed(contents):
return (
tag_encode(form=TagFormConstructed, num=3) +
EOC
)
chunks = []
+ chunks_len_expected = []
payload_expected = b""
bit_len_expected = 0
for chunk_input in chunk_inputs:
chunks.append(BitString(chunk_input).encode())
payload_expected += chunk_input
bit_len_expected += len(chunk_input) * 8
+ chunks_len_expected.append(len(chunk_input) + 1)
else:
chunks.append(chunk_constructed(chunk_input))
payload = b"".join(chunk_input)
payload_expected += payload
bit_len_expected += len(payload) * 8
+ for c in chunk_input:
+ chunks_len_expected.append(len(c) + 1)
+ chunks_len_expected.append(len(chunks[-1]) - 1 - 1)
chunk_last = BitString("'%s'B" % "".join(
"1" if bit else "0" for bit in chunk_last_bits
))
+ chunks_len_expected.append(BitString().decod(chunk_last.encode()).vlen)
payload_expected += bytes(chunk_last)
bit_len_expected += chunk_last.bit_len
encoded_indefinite = (
list(obj.pps())
pprint(obj, big_blobs=True, with_decode_path=True)
+ evgens = list(BitString(impl=tag_encode(impl)).decode_evgen(
+ encoded,
+ decode_path=decode_path,
+ ctx={"bered": True},
+ ))
+ self.assertEqual(len(evgens), len(chunks_len_expected) + 1)
+ for chunk_len_expected, (dp, obj, _) in zip(chunks_len_expected, evgens):
+ self.assertGreater(len(dp), len(decode_path))
+ self.assertEqual(obj.vlen, chunk_len_expected)
+
@given(
integers(min_value=0),
decode_path_strat,
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, value, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
for klass in (OctetString, OctetStringInherited):
_, _, _, _, default, optional, _decoded = values
obj = klass(
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
+ @settings(max_examples=LONG_TEST_MAX_EXAMPLES)
@given(
integers(min_value=1, max_value=30),
lists(
max_size=3,
),
binary(),
+ decode_path_strat,
)
- def test_constructed(self, impl, chunk_inputs, junk):
+ def test_constructed(self, impl, chunk_inputs, junk, decode_path):
def chunk_constructed(contents):
return (
tag_encode(form=TagFormConstructed, num=4) +
EOC
)
chunks = []
+ chunks_len_expected = []
payload_expected = b""
for chunk_input in chunk_inputs:
if isinstance(chunk_input, binary_type):
chunks.append(OctetString(chunk_input).encode())
payload_expected += chunk_input
+ chunks_len_expected.append(len(chunk_input))
else:
chunks.append(chunk_constructed(chunk_input))
payload = b"".join(chunk_input)
payload_expected += payload
+ for c in chunk_input:
+ chunks_len_expected.append(len(c))
+ chunks_len_expected.append(len(chunks[-1]) - 1 - 1)
encoded_indefinite = (
tag_encode(form=TagFormConstructed, num=impl) +
LENINDEF +
list(obj.pps())
pprint(obj, big_blobs=True, with_decode_path=True)
+ evgens = list(OctetString(impl=tag_encode(impl)).decode_evgen(
+ encoded,
+ decode_path=decode_path,
+ ctx={"bered": True},
+ ))
+ self.assertEqual(len(evgens), len(chunks_len_expected) + 1)
+ for chunk_len_expected, (dp, obj, _) in zip(chunks_len_expected, evgens):
+ self.assertGreater(len(dp), len(decode_path))
+ self.assertEqual(obj.vlen, chunk_len_expected)
+
@given(
integers(min_value=0),
decode_path_strat,
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, tag_expl, offset, tail_junk, decode_path):
for klass in (Null, NullInherited):
_, _, optional, _decoded = values
obj = klass(optional=optional, _decoded=_decoded)
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj, obj_decoded)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
+
@given(integers(min_value=1))
def test_invalid_len(self, l):
with self.assertRaises(InvalidLength):
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, value, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
for klass in (ObjectIdentifier, ObjectIdentifierInherited):
_, _, _, default, optional, _decoded = values
obj = klass(
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj, obj_decoded)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
@given(
oid_strategy().map(ObjectIdentifier),
oid_strategy().map(ObjectIdentifier),
offset = d.draw(integers(min_value=0))
value = d.draw(sampled_from(sorted([v for _, v in schema_input])))
tail_junk = d.draw(binary(max_size=5))
+ decode_path = d.draw(decode_path_strat)
class E(Enumerated):
schema = schema_input
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj, obj_decoded)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
@composite
def string_values_strategy(draw, alphabet, do_expl=False):
tag_expl = tag_ctxc(d.draw(integers(min_value=1)))
offset = d.draw(integers(min_value=0))
tail_junk = d.draw(binary(max_size=5))
+ decode_path = d.draw(decode_path_strat)
_, _, _, _, default, optional, _decoded = values
obj = self.base_klass(
value=value,
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ if not getattr(self, "evgen_mode_skip_value", True):
+ self.assertEqual(obj, obj_decoded)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
class TestUTF8String(StringMixin, CommonMixin, TestCase):
base_klass = UTF8String
omit_ms = False
min_datetime = datetime(1900, 1, 1)
max_datetime = datetime(9999, 12, 31)
+ evgen_mode_skip_value = False
def additional_symmetric_check(self, value, obj_encoded):
if value.microsecond > 0:
omit_ms = True
min_datetime = datetime(2000, 1, 1)
max_datetime = datetime(2049, 12, 31)
+ evgen_mode_skip_value = False
def additional_symmetric_check(self, value, obj_encoded):
pass
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, value, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
for klass in (Any, AnyInherited):
_, _, optional, _decoded = values
obj = klass(value=value, optional=optional, _decoded=_decoded)
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 1)
+ _decode_path, obj, tail = evgens[0]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
@given(
integers(min_value=1).map(tag_ctxc),
integers(min_value=0, max_value=3),
tag_expl = tag_ctxc(d.draw(integers(min_value=1)))
offset = d.draw(integers(min_value=0))
tail_junk = d.draw(binary(max_size=5))
+ decode_path = d.draw(decode_path_strat)
class Wahl(self.base_klass):
schema = _schema
tail_junk,
)
+ evgens = list(obj_expled.decode_evgen(
+ obj_expled_encoded + tail_junk,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx_copied,
+ ))
+ self.assertEqual(len(evgens), 2)
+ _decode_path, obj, tail = evgens[0]
+ self.assertEqual(_decode_path, decode_path + (obj_decoded.choice,))
+ _decode_path, obj, tail = evgens[1]
+ self.assertSequenceEqual(tail, tail_junk)
+ self.assertEqual(_decode_path, decode_path)
+ self.assertEqual(obj.expl_offset, offset)
+ repr(obj)
+ list(obj.pps())
+
@given(integers())
def test_set_get(self, value):
class Wahl(Choice):
def test_symmetric(self, d):
seq, expects = d.draw(sequence_strategy(seq_klass=self.base_klass))
tail_junk = d.draw(binary(max_size=5))
+ decode_path = d.draw(decode_path_strat)
self.assertTrue(seq.ready)
self.assertFalse(seq.decoded)
self._assert_expects(seq, expects)
obj.encode(),
)
+ evgens = list(seq.decode_evgen(
+ encoded + decoded_tail,
+ decode_path=decode_path,
+ ctx={"bered": True},
+ ))
+ self.assertEqual(len(evgens), len(list(decoded._values_for_encoding())) + 1)
+ for _decode_path, obj, _ in evgens[:-1]:
+ self.assertEqual(_decode_path[:-1], decode_path)
+ repr(obj)
+ list(obj.pps())
+ _decode_path, obj, tail = evgens[-1]
+ self.assertEqual(_decode_path, decode_path)
+ repr(obj)
+ list(obj.pps())
+
assert_exceeding_data(
self,
lambda: seq.decod(seq_encoded_lenindef + tail_junk, ctx={"bered": True}),
integers(min_value=1).map(tag_ctxc),
integers(min_value=0),
binary(max_size=5),
+ decode_path_strat,
)
- def test_symmetric(self, values, value, tag_expl, offset, tail_junk):
+ def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
_, _, _, _, _, default, optional, _decoded = values
class SeqOf(self.base_klass):
with self.assertRaises(DecodeError):
obj.decode(obj_encoded_lenindef[:-2], ctx={"bered": True})
+ evgens = list(obj.decode_evgen(
+ obj_encoded_lenindef + tail_junk,
+ decode_path=decode_path,
+ ctx={"bered": True},
+ ))
+ self.assertEqual(len(evgens), len(obj_decoded_lenindef) + 1)
+ for i, (_decode_path, obj, _) in enumerate(evgens[:-1]):
+ self.assertEqual(_decode_path, decode_path + (str(i),))
+ repr(obj)
+ list(obj.pps())
+ _decode_path, obj, tail = evgens[-1]
+ self.assertEqual(_decode_path, decode_path)
+ repr(obj)
+ list(obj.pps())
+
assert_exceeding_data(
self,
lambda: obj_expled.decod(obj_expled_encoded + tail_junk),