From e1249a0c754920c57e6d7682458f50fd65b70026 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Mon, 10 Feb 2020 19:41:57 +0300 Subject: [PATCH] Initial support for event generated mode --- doc/news.rst | 3 + pyderasn.py | 526 ++++++++++++++++++++++++++++------------- tests/test_pyderasn.py | 263 +++++++++++++++++++-- 3 files changed, 615 insertions(+), 177 deletions(-) diff --git a/doc/news.rst b/doc/news.rst index 6f36001..e9749c2 100644 --- a/doc/news.rst +++ b/doc/news.rst @@ -11,6 +11,9 @@ News by tag, but by encoded representation * Any does not allow empty data value now. Now it checks if it has valid ASN.1 tag +* Initial support for so called ``evgen_mode``: event generation mode, + where no in-memory objects storing happens, giving ability to process + ASN.1 data without fully parsing it first .. _release6.3: diff --git a/pyderasn.py b/pyderasn.py index f1be647..24204ce 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -898,6 +898,19 @@ def fractions2float(fractions_raw): return float("0." + fractions_raw) +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (not p1 is any) and (p1 != p2): + break + else: + return define + + ######################################################################## # Errors ######################################################################## @@ -1331,8 +1344,8 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover - raise NotImplementedError() + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover + yield NotImplemented def encode(self): """Encode the structure @@ -1358,6 +1371,32 @@ class Obj(object): ctx=None, tag_only=False, _ctx_immutable=True, + ): + result = next(self.decode_evgen( + data, + offset, + leavemm, + decode_path, + ctx, + tag_only, + _ctx_immutable, + _evgen_mode=False, + )) + if result is None: + return None + _, obj, tail = result + return obj, tail + + def decode_evgen( + self, + data, + offset=0, + leavemm=False, + decode_path=(), + ctx=None, + tag_only=False, + _ctx_immutable=True, + _evgen_mode=True, ): """Decode the data @@ -1380,17 +1419,26 @@ class Obj(object): elif _ctx_immutable: ctx = copy(ctx) tlv = memoryview(data) + if ( + _evgen_mode and + get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None + ): + _evgen_mode = False if self._expl is None: - result = self._decode( - tlv, - offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: - return None - obj, tail = result + for result in self._decode( + tlv, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result else: try: t, tlen, lv = tag_strip(tlv) @@ -1419,16 +1467,20 @@ class Obj(object): ) llen, v = 1, lv[1:] offset += tlen + llen - result = self._decode( - v, - offset=offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: raise DecodeError( @@ -1454,16 +1506,20 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - result = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset + tlen + llen, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( "explicit tag out-of-bound, longer than data", @@ -1471,7 +1527,7 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - return obj, (tail if leavemm else tail.tobytes()) + yield decode_path, obj, (tail if leavemm else tail.tobytes()) def decod(self, data, offset=0, decode_path=(), ctx=None): """Decode the data, check that tail is empty @@ -2014,7 +2070,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2031,7 +2087,8 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -2080,7 +2137,7 @@ class Boolean(Obj): _decoded=(offset, 1, 1), ) obj.ber_encoded = ber_encoded - return obj, v[1:] + yield decode_path, obj, v[1:] def __repr__(self): return pp_console_row(next(self.pps())) @@ -2343,7 +2400,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2360,7 +2417,8 @@ class Integer(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2431,7 +2489,7 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -2726,7 +2784,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -2738,7 +2796,8 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2785,8 +2844,9 @@ class BitString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + bit_len = (len(v) - 1) * 8 - pad_size obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), + value=None if evgen_mode else (bit_len, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, @@ -2794,7 +2854,10 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, l), ) - return obj, tail + if evgen_mode: + obj._value = (bit_len, None) + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -2809,7 +2872,8 @@ class BitString(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -2856,14 +2920,26 @@ class BitString(Obj): ) sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, chunk, v_tail in BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + else: + _, chunk, v_tail = next(BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch: raise DecodeError( "expected BitString encoded chunk", @@ -2892,13 +2968,15 @@ class BitString(Obj): decode_path=decode_path + (str(chunk_i),), offset=chunk.offset, ) - values.append(bytes(chunk)) + if not evgen_mode: + values.append(bytes(chunk)) bit_len += chunk.bit_len chunk_last = chunks[-1] - values.append(bytes(chunk_last)) + if not evgen_mode: + values.append(bytes(chunk_last)) bit_len += chunk_last.bit_len obj = self.__class__( - value=(bit_len, b"".join(values)), + value=None if evgen_mode else (bit_len, b"".join(values)), impl=self.tag, expl=self._expl, default=self.default, @@ -2906,9 +2984,11 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) + if evgen_mode: + obj._value = (bit_len, None) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -2919,7 +2999,7 @@ class BitString(Obj): if self.ready: bit_len, blob = self._value value = "%d bits" % bit_len - if len(self.specs) > 0: + if len(self.specs) > 0 and blob is not None: blob = tuple(self.named) yield _pp( obj=self, @@ -2994,6 +3074,7 @@ class OctetString(Obj): __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" + evgen_mode_skip_value = True def __init__( self, @@ -3133,7 +3214,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3145,7 +3226,8 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3163,9 +3245,19 @@ class OctetString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + if evgen_mode and not self._bound_min <= len(v) <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, len(v), self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=v.tobytes(), + value=( + None if (evgen_mode and self.evgen_mode_skip_value) + else v.tobytes() + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3188,7 +3280,8 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -3203,7 +3296,8 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -3225,8 +3319,10 @@ class OctetString(Obj): offset=offset, ) chunks = [] + chunks_count = 0 sub_offset = offset + tlen + llen vlen = 0 + payload_len = 0 while True: if lenindef: if v[:EOC_LEN].tobytes() == EOC: @@ -3241,16 +3337,33 @@ class OctetString(Obj): decode_path=decode_path + (str(len(chunks) - 1),), offset=chunks[-1].offset, ) - sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + sub_decode_path = decode_path + (str(chunks_count),) + for _decode_path, chunk, v_tail in OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + if not chunk.ber_encoded: + payload_len += chunk.vlen + chunks_count += 1 + else: + sub_decode_path = decode_path + (str(len(chunks)),) + _, chunk, v_tail = next(OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + chunks.append(chunk) except TagMismatch: raise DecodeError( "expected OctetString encoded chunk", @@ -3258,13 +3371,22 @@ class OctetString(Obj): decode_path=sub_decode_path, offset=sub_offset, ) - chunks.append(chunk) sub_offset += chunk.tlvlen vlen += chunk.tlvlen v = v_tail + if evgen_mode and not self._bound_min <= payload_len <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), + value=( + None if evgen_mode else + b"".join(bytes(chunk) for chunk in chunks) + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3289,7 +3411,7 @@ class OctetString(Obj): ) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -3403,7 +3525,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3420,7 +3542,8 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3443,7 +3566,7 @@ class Null(Obj): optional=self.optional, _decoded=(offset, 1, 0), ) - return obj, v + yield decode_path, obj, v def __repr__(self): return pp_console_row(next(self.pps())) @@ -3668,7 +3791,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3685,7 +3808,8 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3755,7 +3879,7 @@ class ObjectIdentifier(Obj): ) if ber_encoded: obj.ber_encoded = True - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -4239,6 +4363,7 @@ class UTCTime(VisibleString): tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" + evgen_mode_skip_value = False def __init__( self, @@ -4810,7 +4935,7 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: @@ -4833,15 +4958,28 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None - value, tail = spec.decode( - tlv, - offset=offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + yield None + return + if evgen_mode: + for _decode_path, value, tail in spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, tail + else: + _, value, tail = next(spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) obj = self.__class__( schema=self.specs, expl=self._expl, @@ -4850,7 +4988,7 @@ class Choice(Obj): _decoded=(offset, 0, value.fulllen), ) obj._value = (choice, value) - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -5051,7 +5189,7 @@ class Any(Obj): self._assert_ready() return self._value - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5088,14 +5226,15 @@ class Any(Obj): chunk_i += 1 tlvlen = tlen + llen + vlen + EOC_LEN obj = self.__class__( - value=tlv[:tlvlen].tobytes(), + value=None if evgen_mode else tlv[:tlvlen].tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.lenindef = True obj.tag = t.tobytes() - return obj, v[EOC_LEN:] + yield decode_path, obj, v[EOC_LEN:] + return except DecodeError as err: raise err.__class__( msg=err.msg, @@ -5113,13 +5252,13 @@ class Any(Obj): tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] obj = self.__class__( - value=v.tobytes(), + value=None if evgen_mode else v.tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.tag = t.tobytes() - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -5293,6 +5432,12 @@ class Sequence(Obj): defaulted values existence validation by setting ``"allow_default_values": True`` :ref:`context ` option. + .. warning:: + + Check for default value existence is not performed in + ``evgen_mode``, because previously decoded values are not stored + in memory, to be able to compare them. + Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. """ @@ -5446,7 +5591,7 @@ class Sequence(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5463,7 +5608,8 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5507,21 +5653,33 @@ class Sequence(Obj): continue sub_decode_path = decode_path + (name,) try: - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch as err: if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path) - if defined is not None: + if not evgen_mode and defined is not None: defined_by, defined_spec = defined if issubclass(value.__class__, SequenceOf): for i, _value in enumerate(value): @@ -5573,31 +5731,32 @@ class Sequence(Obj): vlen += value_len sub_offset += value_len v = v_tail - if spec.default is not None and value == spec.default: - if ctx_bered or ctx_allow_default_values: - ber_encoded = True - else: - raise DecodeError( - "DEFAULT value met", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - values[name] = value - - spec_defines = getattr(spec, "defines", ()) - if len(spec_defines) == 0: - defines_by_path = ctx.get("defines_by_path", ()) - if len(defines_by_path) > 0: - spec_defines = get_def_by_path(defines_by_path, sub_decode_path) - if spec_defines is not None and len(spec_defines) > 0: - for rel_path, schema in spec_defines: - defined = schema.get(value, None) - if defined is not None: - ctx.setdefault("_defines", []).append(( - abs_decode_path(sub_decode_path[:-1], rel_path), - (value, defined), - )) + if not evgen_mode: + if spec.default is not None and value == spec.default: + # This will not work in evgen_mode + if ctx_bered or ctx_allow_default_values: + ber_encoded = True + else: + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + values[name] = value + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("_defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if lenindef: if v[:EOC_LEN].tobytes() != EOC: raise DecodeError( @@ -5626,7 +5785,7 @@ class Sequence(Obj): obj._value = values obj.lenindef = lenindef obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -5694,7 +5853,7 @@ class Set(Sequence): )) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5711,7 +5870,8 @@ class Set(Sequence): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5774,14 +5934,26 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) value_tag_order = value.tag_order value_len = value.fulllen if tag_order_prev >= value_tag_order: @@ -5830,7 +6002,6 @@ class Set(Sequence): ) tail = v[EOC_LEN:] obj.lenindef = True - obj._value = values for name, spec in iteritems(self.specs): if name not in values and not spec.optional: raise DecodeError( @@ -5839,8 +6010,10 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) + if not evgen_mode: + obj._value = values obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail SequenceOfState = namedtuple( @@ -6043,7 +6216,16 @@ class SequenceOf(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False): + def _decode( + self, + tlv, + offset, + decode_path, + ctx, + tag_only, + evgen_mode, + ordering_check=False, + ): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -6060,7 +6242,8 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -6094,6 +6277,7 @@ class SequenceOf(Obj): vlen = 0 sub_offset = offset + tlen + llen _value = [] + _value_count = 0 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) ber_encoded = False @@ -6101,15 +6285,27 @@ class SequenceOf(Obj): while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - sub_decode_path = decode_path + (str(len(_value)),) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + sub_decode_path = decode_path + (str(_value_count),) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) value_len = value.fulllen if ordering_check: if value_prev.tobytes() > v[:value_len].tobytes(): @@ -6123,13 +6319,22 @@ class SequenceOf(Obj): offset=sub_offset, ) value_prev = v[:value_len] - _value.append(value) + _value_count += 1 + if not evgen_mode: + _value.append(value) sub_offset += value_len vlen += value_len v = v_tail + if evgen_mode and not self._bound_min <= _value_count <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=_value, + value=None if evgen_mode else _value, schema=spec, bounds=(self._bound_min, self._bound_max), impl=self.tag, @@ -6156,7 +6361,7 @@ class SequenceOf(Obj): obj.lenindef = True tail = v[EOC_LEN:] obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): return "%s[%s]" % ( @@ -6206,13 +6411,14 @@ class SetOf(SequenceOf): v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): return super(SetOf, self)._decode( tlv, offset, decode_path, ctx, tag_only, + evgen_mode, ordering_check=True, ) diff --git a/tests/test_pyderasn.py b/tests/test_pyderasn.py index 309bc22..f160ba8 100644 --- a/tests/test_pyderasn.py +++ b/tests/test_pyderasn.py @@ -583,8 +583,9 @@ class TestBoolean(CommonMixin, TestCase): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk): + def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): for klass in (Boolean, BooleanInherited): _, _, _, default, optional, _decoded = values obj = klass( @@ -639,6 +640,21 @@ class TestBoolean(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + hexdec(obj_expled_hex_encoded) + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj, obj_decoded) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + @given(integers(min_value=2)) def test_invalid_len(self, l): with self.assertRaises(InvalidLength): @@ -656,14 +672,13 @@ class TestBoolean(CommonMixin, TestCase): len_encode(1), int2byte(value), ))) - obj, _ = Boolean().decode( - b"".join(( - Boolean.tag_default, - len_encode(1), - int2byte(value), - )), - ctx={"bered": True}, - ) + encoded = b"".join(( + Boolean.tag_default, + len_encode(1), + int2byte(value), + )) + obj, _ = Boolean().decode(encoded, ctx={"bered": True}) + list(Boolean().decode_evgen(encoded, ctx={"bered": True})) self.assertTrue(bool(obj)) self.assertTrue(obj.ber_encoded) self.assertFalse(obj.lenindef) @@ -725,6 +740,7 @@ class TestBoolean(CommonMixin, TestCase): with self.assertRaises(LenIndefForm): SeqOf().decode(encoded) seqof, tail = SeqOf().decode(encoded, ctx={"bered": True}) + list(SeqOf().decode_evgen(encoded, ctx={"bered": True})) self.assertSequenceEqual(tail, b"") self.assertSequenceEqual([bool(v) for v in seqof], values) self.assertSetEqual( @@ -1081,8 +1097,9 @@ class TestInteger(CommonMixin, TestCase): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk): + def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): for klass in (Integer, IntegerInherited): _, _, _, _, default, optional, _, _decoded = values obj = klass( @@ -1137,6 +1154,21 @@ class TestInteger(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj, obj_decoded) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + def test_go_vectors_valid(self): for data, expect in (( (b"\x00", 0), @@ -1475,6 +1507,7 @@ class TestBitString(CommonMixin, TestCase): tail_junk = d.draw(binary(max_size=5)) tag_expl = tag_ctxc(d.draw(integers(min_value=1))) offset = d.draw(integers(min_value=0)) + decode_path = d.draw(decode_path_strat) for klass in (BitString, BitStringInherited): class BS(klass): schema = _schema @@ -1534,6 +1567,20 @@ class TestBitString(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + @given(integers(min_value=1, max_value=255)) def test_bad_zero_value(self, pad_size): with self.assertRaises(DecodeError): @@ -1580,6 +1627,7 @@ class TestBitString(CommonMixin, TestCase): self.assertTrue(obj[9]) self.assertFalse(obj[17]) + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) @given( integers(min_value=1, max_value=30), lists( @@ -1596,8 +1644,9 @@ class TestBitString(CommonMixin, TestCase): ), lists(booleans(), min_size=1), binary(), + decode_path_strat, ) - def test_constructed(self, impl, chunk_inputs, chunk_last_bits, junk): + def test_constructed(self, impl, chunk_inputs, chunk_last_bits, junk, decode_path): def chunk_constructed(contents): return ( tag_encode(form=TagFormConstructed, num=3) + @@ -1606,6 +1655,7 @@ class TestBitString(CommonMixin, TestCase): EOC ) chunks = [] + chunks_len_expected = [] payload_expected = b"" bit_len_expected = 0 for chunk_input in chunk_inputs: @@ -1613,14 +1663,19 @@ class TestBitString(CommonMixin, TestCase): chunks.append(BitString(chunk_input).encode()) payload_expected += chunk_input bit_len_expected += len(chunk_input) * 8 + chunks_len_expected.append(len(chunk_input) + 1) else: chunks.append(chunk_constructed(chunk_input)) payload = b"".join(chunk_input) payload_expected += payload bit_len_expected += len(payload) * 8 + for c in chunk_input: + chunks_len_expected.append(len(c) + 1) + chunks_len_expected.append(len(chunks[-1]) - 1 - 1) chunk_last = BitString("'%s'B" % "".join( "1" if bit else "0" for bit in chunk_last_bits )) + chunks_len_expected.append(BitString().decod(chunk_last.encode()).vlen) payload_expected += bytes(chunk_last) bit_len_expected += chunk_last.bit_len encoded_indefinite = ( @@ -1661,6 +1716,16 @@ class TestBitString(CommonMixin, TestCase): list(obj.pps()) pprint(obj, big_blobs=True, with_decode_path=True) + evgens = list(BitString(impl=tag_encode(impl)).decode_evgen( + encoded, + decode_path=decode_path, + ctx={"bered": True}, + )) + self.assertEqual(len(evgens), len(chunks_len_expected) + 1) + for chunk_len_expected, (dp, obj, _) in zip(chunks_len_expected, evgens): + self.assertGreater(len(dp), len(decode_path)) + self.assertEqual(obj.vlen, chunk_len_expected) + @given( integers(min_value=0), decode_path_strat, @@ -2072,8 +2137,9 @@ class TestOctetString(CommonMixin, TestCase): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk): + def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): for klass in (OctetString, OctetStringInherited): _, _, _, _, default, optional, _decoded = values obj = klass( @@ -2128,6 +2194,21 @@ class TestOctetString(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) @given( integers(min_value=1, max_value=30), lists( @@ -2143,8 +2224,9 @@ class TestOctetString(CommonMixin, TestCase): max_size=3, ), binary(), + decode_path_strat, ) - def test_constructed(self, impl, chunk_inputs, junk): + def test_constructed(self, impl, chunk_inputs, junk, decode_path): def chunk_constructed(contents): return ( tag_encode(form=TagFormConstructed, num=4) + @@ -2153,15 +2235,20 @@ class TestOctetString(CommonMixin, TestCase): EOC ) chunks = [] + chunks_len_expected = [] payload_expected = b"" for chunk_input in chunk_inputs: if isinstance(chunk_input, binary_type): chunks.append(OctetString(chunk_input).encode()) payload_expected += chunk_input + chunks_len_expected.append(len(chunk_input)) else: chunks.append(chunk_constructed(chunk_input)) payload = b"".join(chunk_input) payload_expected += payload + for c in chunk_input: + chunks_len_expected.append(len(c)) + chunks_len_expected.append(len(chunks[-1]) - 1 - 1) encoded_indefinite = ( tag_encode(form=TagFormConstructed, num=impl) + LENINDEF + @@ -2197,6 +2284,16 @@ class TestOctetString(CommonMixin, TestCase): list(obj.pps()) pprint(obj, big_blobs=True, with_decode_path=True) + evgens = list(OctetString(impl=tag_encode(impl)).decode_evgen( + encoded, + decode_path=decode_path, + ctx={"bered": True}, + )) + self.assertEqual(len(evgens), len(chunks_len_expected) + 1) + for chunk_len_expected, (dp, obj, _) in zip(chunks_len_expected, evgens): + self.assertGreater(len(dp), len(decode_path)) + self.assertEqual(obj.vlen, chunk_len_expected) + @given( integers(min_value=0), decode_path_strat, @@ -2390,8 +2487,9 @@ class TestNull(CommonMixin, TestCase): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, tag_expl, offset, tail_junk): + def test_symmetric(self, values, tag_expl, offset, tail_junk, decode_path): for klass in (Null, NullInherited): _, _, optional, _decoded = values obj = klass(optional=optional, _decoded=_decoded) @@ -2439,6 +2537,22 @@ class TestNull(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj, obj_decoded) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + + @given(integers(min_value=1)) def test_invalid_len(self, l): with self.assertRaises(InvalidLength): @@ -2727,8 +2841,9 @@ class TestObjectIdentifier(CommonMixin, TestCase): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk): + def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): for klass in (ObjectIdentifier, ObjectIdentifierInherited): _, _, _, default, optional, _decoded = values obj = klass( @@ -2783,6 +2898,21 @@ class TestObjectIdentifier(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj, obj_decoded) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + @given( oid_strategy().map(ObjectIdentifier), oid_strategy().map(ObjectIdentifier), @@ -3100,6 +3230,7 @@ class TestEnumerated(CommonMixin, TestCase): offset = d.draw(integers(min_value=0)) value = d.draw(sampled_from(sorted([v for _, v in schema_input]))) tail_junk = d.draw(binary(max_size=5)) + decode_path = d.draw(decode_path_strat) class E(Enumerated): schema = schema_input @@ -3155,6 +3286,21 @@ class TestEnumerated(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj, obj_decoded) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + @composite def string_values_strategy(draw, alphabet, do_expl=False): @@ -3436,6 +3582,7 @@ class StringMixin(object): tag_expl = tag_ctxc(d.draw(integers(min_value=1))) offset = d.draw(integers(min_value=0)) tail_junk = d.draw(binary(max_size=5)) + decode_path = d.draw(decode_path_strat) _, _, _, _, default, optional, _decoded = values obj = self.base_klass( value=value, @@ -3491,6 +3638,22 @@ class StringMixin(object): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + if not getattr(self, "evgen_mode_skip_value", True): + self.assertEqual(obj, obj_decoded) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + class TestUTF8String(StringMixin, CommonMixin, TestCase): base_klass = UTF8String @@ -3964,6 +4127,7 @@ class TestGeneralizedTime(TimeMixin, CommonMixin, TestCase): omit_ms = False min_datetime = datetime(1900, 1, 1) max_datetime = datetime(9999, 12, 31) + evgen_mode_skip_value = False def additional_symmetric_check(self, value, obj_encoded): if value.microsecond > 0: @@ -4348,6 +4512,7 @@ class TestUTCTime(TimeMixin, CommonMixin, TestCase): omit_ms = True min_datetime = datetime(2000, 1, 1) max_datetime = datetime(2049, 12, 31) + evgen_mode_skip_value = False def additional_symmetric_check(self, value, obj_encoded): pass @@ -4859,8 +5024,9 @@ class TestAny(CommonMixin, TestCase): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk): + def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): for klass in (Any, AnyInherited): _, _, optional, _decoded = values obj = klass(value=value, optional=optional, _decoded=_decoded) @@ -4916,6 +5082,20 @@ class TestAny(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 1) + _decode_path, obj, tail = evgens[0] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + @given( integers(min_value=1).map(tag_ctxc), integers(min_value=0, max_value=3), @@ -5225,6 +5405,7 @@ class TestChoice(CommonMixin, TestCase): tag_expl = tag_ctxc(d.draw(integers(min_value=1))) offset = d.draw(integers(min_value=0)) tail_junk = d.draw(binary(max_size=5)) + decode_path = d.draw(decode_path_strat) class Wahl(self.base_klass): schema = _schema @@ -5291,6 +5472,22 @@ class TestChoice(CommonMixin, TestCase): tail_junk, ) + evgens = list(obj_expled.decode_evgen( + obj_expled_encoded + tail_junk, + offset=offset, + decode_path=decode_path, + ctx=ctx_copied, + )) + self.assertEqual(len(evgens), 2) + _decode_path, obj, tail = evgens[0] + self.assertEqual(_decode_path, decode_path + (obj_decoded.choice,)) + _decode_path, obj, tail = evgens[1] + self.assertSequenceEqual(tail, tail_junk) + self.assertEqual(_decode_path, decode_path) + self.assertEqual(obj.expl_offset, offset) + repr(obj) + list(obj.pps()) + @given(integers()) def test_set_get(self, value): class Wahl(Choice): @@ -5750,6 +5947,7 @@ class SeqMixing(object): def test_symmetric(self, d): seq, expects = d.draw(sequence_strategy(seq_klass=self.base_klass)) tail_junk = d.draw(binary(max_size=5)) + decode_path = d.draw(decode_path_strat) self.assertTrue(seq.ready) self.assertFalse(seq.decoded) self._assert_expects(seq, expects) @@ -5813,6 +6011,21 @@ class SeqMixing(object): obj.encode(), ) + evgens = list(seq.decode_evgen( + encoded + decoded_tail, + decode_path=decode_path, + ctx={"bered": True}, + )) + self.assertEqual(len(evgens), len(list(decoded._values_for_encoding())) + 1) + for _decode_path, obj, _ in evgens[:-1]: + self.assertEqual(_decode_path[:-1], decode_path) + repr(obj) + list(obj.pps()) + _decode_path, obj, tail = evgens[-1] + self.assertEqual(_decode_path, decode_path) + repr(obj) + list(obj.pps()) + assert_exceeding_data( self, lambda: seq.decod(seq_encoded_lenindef + tail_junk, ctx={"bered": True}), @@ -6440,8 +6653,9 @@ class SeqOfMixing(object): integers(min_value=1).map(tag_ctxc), integers(min_value=0), binary(max_size=5), + decode_path_strat, ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk): + def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): _, _, _, _, _, default, optional, _decoded = values class SeqOf(self.base_klass): @@ -6523,6 +6737,21 @@ class SeqOfMixing(object): with self.assertRaises(DecodeError): obj.decode(obj_encoded_lenindef[:-2], ctx={"bered": True}) + evgens = list(obj.decode_evgen( + obj_encoded_lenindef + tail_junk, + decode_path=decode_path, + ctx={"bered": True}, + )) + self.assertEqual(len(evgens), len(obj_decoded_lenindef) + 1) + for i, (_decode_path, obj, _) in enumerate(evgens[:-1]): + self.assertEqual(_decode_path, decode_path + (str(i),)) + repr(obj) + list(obj.pps()) + _decode_path, obj, tail = evgens[-1] + self.assertEqual(_decode_path, decode_path) + repr(obj) + list(obj.pps()) + assert_exceeding_data( self, lambda: obj_expled.decod(obj_expled_encoded + tail_junk), -- 2.44.0