X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=62776b4bbb7570aabdf37ace1ce6d0e52be14551;hb=3370a2b2433533aaeaf17f0507bb142199cbafcf;hp=43475f19fbdb486580b6c198a1a629698be8b90b;hpb=e0842b448ce1d78c479eeec663900b9826b63bf6;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index 43475f1..62776b4 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -352,7 +352,6 @@ Let's parse that output, human:: (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` could be BERed. - .. _definedby: DEFINED BY @@ -644,6 +643,8 @@ Various .. autofunction:: pyderasn.abs_decode_path .. autofunction:: pyderasn.colonize_hex +.. autofunction:: pyderasn.encode_cer +.. autofunction:: pyderasn.file_mmaped .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -780,7 +781,11 @@ from collections import OrderedDict from copy import copy from datetime import datetime from datetime import timedelta +from io import BytesIO from math import ceil +from mmap import mmap +from mmap import PROT_READ +from operator import attrgetter from string import ascii_letters from string import digits from sys import version_info @@ -808,9 +813,10 @@ except ImportError: # pragma: no cover def colored(what, *args, **kwargs): return what -__version__ = "6.3" +__version__ = "7.0" __all__ = ( + "agg_octet_string", "Any", "BitString", "BMPString", @@ -819,8 +825,10 @@ __all__ = ( "Choice", "DecodeError", "DecodePathDefBy", + "encode_cer", "Enumerated", "ExceedingData", + "file_mmaped", "GeneralizedTime", "GeneralString", "GraphicString", @@ -886,7 +894,16 @@ NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__} SET01 = frozenset("01") DECIMALS = frozenset(digits) DECIMAL_SIGNS = ".," +NEXT_ATTR_NAME = "next" if PY2 else "__next__" + + +def file_mmaped(fd): + """Make mmap-ed memoryview for reading from file + :param fd: file object + :returns: memoryview over read-only mmap-ing of the whole file + """ + return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ)) def pureint(value): if not set(value) <= DECIMALS: @@ -898,6 +915,19 @@ def fractions2float(fractions_raw): return float("0." + fractions_raw) +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (not p1 is any) and (p1 != p2): + break + else: + return define + + ######################################################################## # Errors ######################################################################## @@ -1177,6 +1207,23 @@ def len_decode(data): return l, 1 + octets_num, data[1 + octets_num:] +LEN1K = len_encode(1000) + + +def write_full(writer, data): + """Fully write provided data + + BytesIO does not guarantee that the whole data will be written at once. + """ + data = memoryview(data) + written = 0 + while written != len(data): + n = writer(data[written:]) + if n is None: + raise ValueError("can not write to buf") + written += n + + ######################################################################## # Base class ######################################################################## @@ -1190,6 +1237,7 @@ class AutoAddSlots(type): BasicState = namedtuple("BasicState", ( "version", "tag", + "tag_order", "expl", "default", "optional", @@ -1211,6 +1259,7 @@ class Obj(object): """ __slots__ = ( "tag", + "_tag_order", "_value", "_expl", "default", @@ -1235,6 +1284,13 @@ class Obj(object): self._expl = getattr(self, "expl", None) if expl is None else expl if self.tag != self.tag_default and self._expl is not None: raise ValueError("implicit and explicit tags can not be set simultaneously") + if self.tag is None: + self._tag_order = None + else: + tag_class, _, tag_num = tag_decode( + self.tag if self._expl is None else self._expl + ) + self._tag_order = (tag_class, tag_num) if default is not None: optional = True self.optional = optional @@ -1275,6 +1331,7 @@ class Obj(object): if state.version != __version__: raise ValueError("data is pickled by different PyDERASN version") self.tag = state.tag + self._tag_order = state.tag_order self._expl = state.expl self.default = state.default self.optional = state.optional @@ -1285,6 +1342,16 @@ class Obj(object): self.lenindef = state.lenindef self.ber_encoded = state.ber_encoded + @property + def tag_order(self): + """Tag's (class, number) used for DER/CER sorting + """ + return self._tag_order + + @property + def tag_order_cer(self): + return self.tag_order + @property def tlen(self): """See :ref:`decoding` @@ -1315,8 +1382,8 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover - raise NotImplementedError() + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover + yield NotImplemented def encode(self): """Encode the structure @@ -1328,6 +1395,19 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def encode_cer(self, writer): + if self._expl is not None: + write_full(writer, self._expl + LENINDEF) + if getattr(self, "der_forced", False): + write_full(writer, self._encode()) + else: + self._encode_cer(writer) + if self._expl is not None: + write_full(writer, EOC) + + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def hexencode(self): """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` """ @@ -1342,6 +1422,32 @@ class Obj(object): ctx=None, tag_only=False, _ctx_immutable=True, + ): + result = next(self.decode_evgen( + data, + offset, + leavemm, + decode_path, + ctx, + tag_only, + _ctx_immutable, + _evgen_mode=False, + )) + if result is None: + return None + _, obj, tail = result + return obj, tail + + def decode_evgen( + self, + data, + offset=0, + leavemm=False, + decode_path=(), + ctx=None, + tag_only=False, + _ctx_immutable=True, + _evgen_mode=True, ): """Decode the data @@ -1364,17 +1470,26 @@ class Obj(object): elif _ctx_immutable: ctx = copy(ctx) tlv = memoryview(data) + if ( + _evgen_mode and + get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None + ): + _evgen_mode = False if self._expl is None: - result = self._decode( - tlv, - offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: - return None - obj, tail = result + for result in self._decode( + tlv, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result else: try: t, tlen, lv = tag_strip(tlv) @@ -1403,16 +1518,20 @@ class Obj(object): ) llen, v = 1, lv[1:] offset += tlen + llen - result = self._decode( - v, - offset=offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: raise DecodeError( @@ -1438,16 +1557,20 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - result = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset + tlen + llen, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( "explicit tag out-of-bound, longer than data", @@ -1455,7 +1578,7 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - return obj, (tail if leavemm else tail.tobytes()) + yield decode_path, obj, (tail if leavemm else tail.tobytes()) def decod(self, data, offset=0, decode_path=(), ctx=None): """Decode the data, check that tail is empty @@ -1576,6 +1699,14 @@ class Obj(object): ) +def encode_cer(obj): + """Encode to CER in memory + """ + buf = BytesIO() + obj.encode_cer(buf.write) + return buf.getvalue() + + class DecodePathDefBy(object): """DEFINED BY representation inside decode path """ @@ -1814,6 +1945,7 @@ def pprint( with_colours=False, with_decode_path=False, decode_path_only=(), + decode_path=(), ): """Pretty print object @@ -1866,7 +1998,7 @@ def pprint( else: for row in _pprint_pps(pp): yield row - return "\n".join(_pprint_pps(obj.pps())) + return "\n".join(_pprint_pps(obj.pps(decode_path))) ######################################################################## @@ -1938,6 +2070,7 @@ class Boolean(Obj): return BooleanState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -1997,7 +2130,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2014,7 +2147,8 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -2063,7 +2197,7 @@ class Boolean(Obj): _decoded=(offset, 1, 1), ) obj.ber_encoded = ber_encoded - return obj, v[1:] + yield decode_path, obj, v[1:] def __repr__(self): return pp_console_row(next(self.pps())) @@ -2207,6 +2341,7 @@ class Integer(Obj): return IntegerState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2325,7 +2460,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2342,7 +2477,8 @@ class Integer(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2413,7 +2549,7 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -2607,6 +2743,7 @@ class BitString(Obj): return BitStringState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2707,7 +2844,31 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + bit_len, octets = self._value + if len(octets) + 1 <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 999) * 999, 999): + write_full(writer, b"".join(( + BitString.tag_default, + LEN1K, + int2byte(0), + octets[offset:offset + 999], + ))) + tail = octets[offset+999:] + if len(tail) > 0: + tail = int2byte((8 - bit_len % 8) % 8) + tail + write_full(writer, b"".join(( + BitString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -2719,7 +2880,8 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2766,8 +2928,9 @@ class BitString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + bit_len = (len(v) - 1) * 8 - pad_size obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), + value=None if evgen_mode else (bit_len, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, @@ -2775,7 +2938,10 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, l), ) - return obj, tail + if evgen_mode: + obj._value = (bit_len, None) + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -2790,7 +2956,8 @@ class BitString(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -2837,14 +3004,26 @@ class BitString(Obj): ) sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, chunk, v_tail in BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + else: + _, chunk, v_tail = next(BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch: raise DecodeError( "expected BitString encoded chunk", @@ -2873,13 +3052,15 @@ class BitString(Obj): decode_path=decode_path + (str(chunk_i),), offset=chunk.offset, ) - values.append(bytes(chunk)) + if not evgen_mode: + values.append(bytes(chunk)) bit_len += chunk.bit_len chunk_last = chunks[-1] - values.append(bytes(chunk_last)) + if not evgen_mode: + values.append(bytes(chunk_last)) bit_len += chunk_last.bit_len obj = self.__class__( - value=(bit_len, b"".join(values)), + value=None if evgen_mode else (bit_len, b"".join(values)), impl=self.tag, expl=self._expl, default=self.default, @@ -2887,9 +3068,11 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) + if evgen_mode: + obj._value = (bit_len, None) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -2900,7 +3083,7 @@ class BitString(Obj): if self.ready: bit_len, blob = self._value value = "%d bits" % bit_len - if len(self.specs) > 0: + if len(self.specs) > 0 and blob is not None: blob = tuple(self.named) yield _pp( obj=self, @@ -2964,17 +3147,15 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c - .. note:: - - Pay attention that OCTET STRING can be encoded both in primitive - and constructed forms. Decoder always checks constructed form tag - additionally to specified primitive one. If BER decoding is - :ref:`not enabled `, then decoder will fail, because - of DER restrictions. + Memoryviews can be used as a values. If memoryview is made on + mmap-ed file, then it does not take storage inside OctetString + itself. In CER encoding mode it will be streamed to the specified + writer, copying 1 KB chunks. """ __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" + evgen_mode_skip_value = True def __init__( self, @@ -3024,12 +3205,12 @@ class OctetString(Obj): ) def _value_sanitize(self, value): - if value.__class__ == binary_type: + if value.__class__ == binary_type or value.__class__ == memoryview: pass elif issubclass(value.__class__, OctetString): value = value._value else: - raise InvalidValueType((self.__class__, bytes)) + raise InvalidValueType((self.__class__, bytes, memoryview)) if not self._bound_min <= len(value) <= self._bound_max: raise BoundsError(self._bound_min, len(value), self._bound_max) return value @@ -3042,6 +3223,7 @@ class OctetString(Obj): return OctetStringState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3068,7 +3250,7 @@ class OctetString(Obj): def __bytes__(self): self._assert_ready() - return self._value + return bytes(self._value) def __eq__(self, their): if their.__class__ == binary_type: @@ -3113,7 +3295,29 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + octets = self._value + if len(octets) <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000): + write_full(writer, b"".join(( + OctetString.tag_default, + LEN1K, + octets[offset:offset + 1000], + ))) + tail = octets[offset+1000:] + if len(tail) > 0: + write_full(writer, b"".join(( + OctetString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3125,7 +3329,8 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3143,9 +3348,19 @@ class OctetString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + if evgen_mode and not self._bound_min <= len(v) <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, len(v), self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=v.tobytes(), + value=( + None if (evgen_mode and self.evgen_mode_skip_value) + else v.tobytes() + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3168,7 +3383,8 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -3183,7 +3399,8 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -3205,8 +3422,10 @@ class OctetString(Obj): offset=offset, ) chunks = [] + chunks_count = 0 sub_offset = offset + tlen + llen vlen = 0 + payload_len = 0 while True: if lenindef: if v[:EOC_LEN].tobytes() == EOC: @@ -3221,16 +3440,33 @@ class OctetString(Obj): decode_path=decode_path + (str(len(chunks) - 1),), offset=chunks[-1].offset, ) - sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + sub_decode_path = decode_path + (str(chunks_count),) + for _decode_path, chunk, v_tail in OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + if not chunk.ber_encoded: + payload_len += chunk.vlen + chunks_count += 1 + else: + sub_decode_path = decode_path + (str(len(chunks)),) + _, chunk, v_tail = next(OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + chunks.append(chunk) except TagMismatch: raise DecodeError( "expected OctetString encoded chunk", @@ -3238,13 +3474,22 @@ class OctetString(Obj): decode_path=sub_decode_path, offset=sub_offset, ) - chunks.append(chunk) sub_offset += chunk.tlvlen vlen += chunk.tlvlen v = v_tail + if evgen_mode and not self._bound_min <= payload_len <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), + value=( + None if evgen_mode else + b"".join(bytes(chunk) for chunk in chunks) + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3269,7 +3514,7 @@ class OctetString(Obj): ) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -3308,6 +3553,29 @@ class OctetString(Obj): yield pp +def agg_octet_string(evgens, decode_path, raw, writer): + """Aggregate constructed string (OctetString and its derivatives) + + :param evgens: iterator of generated events + :param decode_path: points to the string we want to decode + :param raw: slicebable (memoryview, bytearray, etc) with + the data evgens are generated one + :param writer: buffer.write where string is going to be saved + """ + decode_path_len = len(decode_path) + for dp, obj, _ in evgens: + if dp[:decode_path_len] != decode_path: + continue + if not obj.ber_encoded: + write_full(writer, raw[ + obj.offset + obj.tlen + obj.llen: + obj.offset + obj.tlen + obj.llen + obj.vlen - + (EOC_LEN if obj.expl_lenindef else 0) + ]) + if len(dp) == decode_path_len: + break + + NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS) @@ -3347,6 +3615,7 @@ class Null(Obj): return NullState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3382,7 +3651,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3399,7 +3668,8 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3422,7 +3692,7 @@ class Null(Obj): optional=self.optional, _decoded=(offset, 1, 0), ) - return obj, v + yield decode_path, obj, v def __repr__(self): return pp_console_row(next(self.pps())) @@ -3562,6 +3832,7 @@ class ObjectIdentifier(Obj): return ObjectIdentifierState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3646,7 +3917,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3663,7 +3934,8 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3733,7 +4005,7 @@ class ObjectIdentifier(Obj): ) if ber_encoded: obj.ber_encoded = True - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -4217,6 +4489,7 @@ class UTCTime(VisibleString): tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" + evgen_mode_skip_value = False def __init__( self, @@ -4387,6 +4660,9 @@ class UTCTime(VisibleString): value = self._encode_time() return b"".join((self.tag, len_encode(len(value)), value)) + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def todatetime(self): return self._value @@ -4663,6 +4939,9 @@ class Choice(Obj): self.default = default_obj if value is None: self._value = copy(default_obj._value) + if self._expl is not None: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) def _value_sanitize(self, value): if (value.__class__ == tuple) and len(value) == 2: @@ -4692,6 +4971,7 @@ class Choice(Obj): return ChoiceState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -4749,6 +5029,15 @@ class Choice(Obj): self._assert_ready() return self._value[1] + @property + def tag_order(self): + self._assert_ready() + return self._value[1].tag_order if self._tag_order is None else self._tag_order + + @property + def tag_order_cer(self): + return min(v.tag_order_cer for v in itervalues(self.specs)) + def __getitem__(self, key): if key not in self.specs: raise ObjUnknown(key) @@ -4779,7 +5068,11 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + self._value[1].encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: @@ -4802,15 +5095,28 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None - value, tail = spec.decode( - tlv, - offset=offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + yield None + return + if evgen_mode: + for _decode_path, value, tail in spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, tail + else: + _, value, tail = next(spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) obj = self.__class__( schema=self.specs, expl=self._expl, @@ -4819,7 +5125,7 @@ class Choice(Obj): _decoded=(offset, 0, value.fulllen), ) obj._value = (choice, value) - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -4897,7 +5203,7 @@ class Any(Obj): """``ANY`` special type >>> Any(Integer(-123)) - ANY 020185 + ANY INTEGER -123 (0X:7B) >>> a = Any(OctetString(b"hello world").encode()) ANY 040b68656c6c6f20776f726c64 >>> hexenc(bytes(a)) @@ -4917,28 +5223,47 @@ class Any(Obj): """ :param value: set the value. Either any kind of pyderasn's **ready** object, or bytes. Pay attention that - **no** validation is performed is raw binary value - is valid TLV + **no** validation is performed if raw binary value + is valid TLV, except just tag decoding :param bytes expl: override default tag with ``EXPLICIT`` one :param bool optional: is object ``OPTIONAL`` in sequence """ super(Any, self).__init__(None, expl, None, optional, _decoded) - self._value = None if value is None else self._value_sanitize(value) + if value is None: + self._value = None + else: + value = self._value_sanitize(value) + self._value = value + if self._expl is None: + if value.__class__ == binary_type: + tag_class, _, tag_num = tag_decode(tag_strip(value)[0]) + else: + tag_class, tag_num = value.tag_order + else: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) self.defined = None def _value_sanitize(self, value): if value.__class__ == binary_type: + if len(value) == 0: + raise ValueError("Any value can not be empty") return value if isinstance(value, self.__class__): return value._value - if isinstance(value, Obj): - return value.encode() - raise InvalidValueType((self.__class__, Obj, binary_type)) + if not isinstance(value, Obj): + raise InvalidValueType((self.__class__, Obj, binary_type)) + return value @property def ready(self): return self._value is not None + @property + def tag_order(self): + self._assert_ready() + return self._tag_order + @property def bered(self): if self.expl_lenindef or self.lenindef: @@ -4951,6 +5276,7 @@ class Any(Obj): return AnyState( __version__, self.tag, + self._tag_order, self._expl, None, self.optional, @@ -4971,9 +5297,13 @@ class Any(Obj): def __eq__(self, their): if their.__class__ == binary_type: - return self._value == their + if self._value.__class__ == binary_type: + return self._value == their + return self._value.encode() == their if issubclass(their.__class__, Any): - return self._value == their._value + if self.ready and their.ready: + return bytes(self) == bytes(their) + return self.ready == their.ready return False def __call__( @@ -4990,7 +5320,10 @@ class Any(Obj): def __bytes__(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return self._value.encode() @property def tlen(self): @@ -4998,9 +5331,20 @@ class Any(Obj): def _encode(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return value.encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + value = self._value + if value.__class__ == binary_type: + write_full(writer, value) + else: + value.encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5037,14 +5381,15 @@ class Any(Obj): chunk_i += 1 tlvlen = tlen + llen + vlen + EOC_LEN obj = self.__class__( - value=tlv[:tlvlen].tobytes(), + value=None if evgen_mode else tlv[:tlvlen].tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.lenindef = True obj.tag = t.tobytes() - return obj, v[EOC_LEN:] + yield decode_path, obj, v[EOC_LEN:] + return except DecodeError as err: raise err.__class__( msg=err.msg, @@ -5062,24 +5407,32 @@ class Any(Obj): tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] obj = self.__class__( - value=v.tobytes(), + value=None if evgen_mode else v.tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.tag = t.tobytes() - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): + value = self._value + if value is None: + pass + elif value.__class__ == binary_type: + value = None + else: + value = repr(value) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - blob=self._value if self.ready else None, + value=value, + blob=self._value if self._value.__class__ == binary_type else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -5242,6 +5595,12 @@ class Sequence(Obj): defaulted values existence validation by setting ``"allow_default_values": True`` :ref:`context ` option. + .. warning:: + + Check for default value existence is not performed in + ``evgen_mode``, because previously decoded values are not stored + in memory, to be able to compare them. + Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. """ @@ -5307,6 +5666,7 @@ class Sequence(Obj): return SequenceState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5394,7 +5754,13 @@ class Sequence(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5411,7 +5777,8 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5455,21 +5822,33 @@ class Sequence(Obj): continue sub_decode_path = decode_path + (name,) try: - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch as err: if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path) - if defined is not None: + if not evgen_mode and defined is not None: defined_by, defined_spec = defined if issubclass(value.__class__, SequenceOf): for i, _value in enumerate(value): @@ -5521,31 +5900,32 @@ class Sequence(Obj): vlen += value_len sub_offset += value_len v = v_tail - if spec.default is not None and value == spec.default: - if ctx_bered or ctx_allow_default_values: - ber_encoded = True - else: - raise DecodeError( - "DEFAULT value met", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - values[name] = value - - spec_defines = getattr(spec, "defines", ()) - if len(spec_defines) == 0: - defines_by_path = ctx.get("defines_by_path", ()) - if len(defines_by_path) > 0: - spec_defines = get_def_by_path(defines_by_path, sub_decode_path) - if spec_defines is not None and len(spec_defines) > 0: - for rel_path, schema in spec_defines: - defined = schema.get(value, None) - if defined is not None: - ctx.setdefault("_defines", []).append(( - abs_decode_path(sub_decode_path[:-1], rel_path), - (value, defined), - )) + if not evgen_mode: + if spec.default is not None and value == spec.default: + # This will not work in evgen_mode + if ctx_bered or ctx_allow_default_values: + ber_encoded = True + else: + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + values[name] = value + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("_defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if lenindef: if v[:EOC_LEN].tobytes() != EOC: raise DecodeError( @@ -5574,7 +5954,7 @@ class Sequence(Obj): obj._value = values obj.lenindef = lenindef obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -5636,15 +6016,22 @@ class Set(Sequence): asn1_type_name = "SET" def _encode(self): - raws = [v.encode() for v in self._values_for_encoding()] - raws.sort() - v = b"".join(raws) + v = b"".join(value.encode() for value in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order"), + )) return b"".join((self.tag, len_encode(len(v)), v)) - def _specs_items(self): - return iteritems(self.specs) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order_cer"), + ): + v.encode_cer(writer) + write_full(writer, EOC) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5661,7 +6048,8 @@ class Set(Sequence): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5697,12 +6085,13 @@ class Set(Sequence): ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) - value_prev = memoryview(v[:0]) + tag_order_prev = (0, 0) + _specs_items = copy(self.specs) while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in self._specs_items(): + for name, spec in iteritems(_specs_items): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -5723,16 +6112,29 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + value_tag_order = value.tag_order value_len = value.fulllen - if value_prev.tobytes() > v[:value_len].tobytes(): + if tag_order_prev >= value_tag_order: if ctx_bered or ctx_allow_unordered_set: ber_encoded = True else: @@ -5754,10 +6156,12 @@ class Set(Sequence): offset=sub_offset, ) values[name] = value - value_prev = v[:value_len] + del _specs_items[name] + tag_order_prev = value_tag_order sub_offset += value_len vlen += value_len v = v_tail + obj = self.__class__( schema=self.specs, impl=self.tag, @@ -5776,7 +6180,6 @@ class Set(Sequence): ) tail = v[EOC_LEN:] obj.lenindef = True - obj._value = values for name, spec in iteritems(self.specs): if name not in values and not spec.optional: raise DecodeError( @@ -5785,8 +6188,10 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) + if not evgen_mode: + obj._value = values obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail SequenceOfState = namedtuple( @@ -5822,9 +6227,21 @@ class SequenceOf(Obj): >>> ints Ints SEQUENCE OF[INTEGER 123, INTEGER 345] - Also you can initialize sequence with preinitialized values: + You can initialize sequence with preinitialized values: >>> ints = Ints([Integer(123), Integer(234)]) + + Also you can use iterator as a value: + + >>> ints = Ints(iter(Integer(i) for i in range(1000000))) + + And it won't be iterated until encoding process. Pay attention that + bounds and required schema checks are done only during the encoding + process in that case! After encode was called, then value is zeroed + back to empty list and you have to set it again. That mode is useful + mainly with CER encoding mode, where all objects from the iterable + will be streamed to the buffer, without copying all of them to + memory first. """ __slots__ = ("spec", "_bound_min", "_bound_max") tag_default = tag_encode(form=TagFormConstructed, num=16) @@ -5868,21 +6285,31 @@ class SequenceOf(Obj): self._value = copy(default_obj._value) def _value_sanitize(self, value): + iterator = False if issubclass(value.__class__, SequenceOf): value = value._value + elif hasattr(value, NEXT_ATTR_NAME): + iterator = True + value = value elif hasattr(value, "__iter__"): value = list(value) else: - raise InvalidValueType((self.__class__, iter)) - if not self._bound_min <= len(value) <= self._bound_max: - raise BoundsError(self._bound_min, len(value), self._bound_max) - for v in value: - if not isinstance(v, self.spec.__class__): - raise InvalidValueType((self.spec.__class__,)) + raise InvalidValueType((self.__class__, iter, "iterator")) + if not iterator: + if not self._bound_min <= len(value) <= self._bound_max: + raise BoundsError(self._bound_min, len(value), self._bound_max) + class_expected = self.spec.__class__ + for v in value: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) return value @property def ready(self): + if hasattr(self._value, NEXT_ATTR_NAME): + return True + if self._bound_min > 0 and len(self._value) == 0: + return False return all(v.ready for v in self._value) @property @@ -5892,9 +6319,12 @@ class SequenceOf(Obj): return any(v.bered for v in self._value) def __getstate__(self): + if hasattr(self._value, NEXT_ATTR_NAME): + raise ValueError("can not pickle SequenceOf with iterator") return SequenceOfState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5966,11 +6396,9 @@ class SequenceOf(Obj): self._value.append(value) def __iter__(self): - self._assert_ready() return iter(self._value) def __len__(self): - self._assert_ready() return len(self._value) def __setitem__(self, key, value): @@ -5985,10 +6413,54 @@ class SequenceOf(Obj): return iter(self._value) def _encode(self): - v = b"".join(v.encode() for v in self._values_for_encoding()) - return b"".join((self.tag, len_encode(len(v)), v)) + iterator = hasattr(self._value, NEXT_ATTR_NAME) + if iterator: + values = [] + values_append = values.append + class_expected = self.spec.__class__ + values_for_encoding = self._values_for_encoding() + self._value = [] + for v in values_for_encoding: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) + values_append(v.encode()) + if not self._bound_min <= len(values) <= self._bound_max: + raise BoundsError(self._bound_min, len(values), self._bound_max) + value = b"".join(values) + else: + value = b"".join(v.encode() for v in self._values_for_encoding()) + return b"".join((self.tag, len_encode(len(value)), value)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + iterator = hasattr(self._value, NEXT_ATTR_NAME) + if iterator: + class_expected = self.spec.__class__ + values_count = 0 + values_for_encoding = self._values_for_encoding() + self._value = [] + for v in values_for_encoding: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) + v.encode_cer(writer) + values_count += 1 + if not self._bound_min <= values_count <= self._bound_max: + raise BoundsError(self._bound_min, values_count, self._bound_max) + else: + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode( + self, + tlv, + offset, + decode_path, + ctx, + tag_only, + evgen_mode, + ordering_check=False, + ): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -6005,7 +6477,8 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -6039,6 +6512,7 @@ class SequenceOf(Obj): vlen = 0 sub_offset = offset + tlen + llen _value = [] + _value_count = 0 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) ber_encoded = False @@ -6046,15 +6520,27 @@ class SequenceOf(Obj): while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - sub_decode_path = decode_path + (str(len(_value)),) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + sub_decode_path = decode_path + (str(_value_count),) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) value_len = value.fulllen if ordering_check: if value_prev.tobytes() > v[:value_len].tobytes(): @@ -6068,13 +6554,22 @@ class SequenceOf(Obj): offset=sub_offset, ) value_prev = v[:value_len] - _value.append(value) + _value_count += 1 + if not evgen_mode: + _value.append(value) sub_offset += value_len vlen += value_len v = v_tail + if evgen_mode and not self._bound_min <= _value_count <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=_value, + value=None if evgen_mode else _value, schema=spec, bounds=(self._bound_min, self._bound_max), impl=self.tag, @@ -6101,7 +6596,7 @@ class SequenceOf(Obj): obj.lenindef = True tail = v[EOC_LEN:] obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): return "%s[%s]" % ( @@ -6147,19 +6642,32 @@ class SetOf(SequenceOf): tag_default = tag_encode(form=TagFormConstructed, num=17) asn1_type_name = "SET OF" + def _value_sanitize(self, value): + value = super(SetOf, self)._value_sanitize(value) + if hasattr(value, NEXT_ATTR_NAME): + raise ValueError( + "SetOf does not support iterator values, as no sense in them" + ) + return value + def _encode(self): - raws = [v.encode() for v in self._values_for_encoding()] - raws.sort() - v = b"".join(raws) + v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted(encode_cer(v) for v in self._values_for_encoding()): + write_full(writer, v) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): return super(SetOf, self)._decode( tlv, offset, decode_path, ctx, tag_only, + evgen_mode, ordering_check=True, ) @@ -6281,14 +6789,22 @@ def main(): # pragma: no cover help="Allow explicit tag out-of-bound", ) parser.add_argument( - "DERFile", + "--evgen", + action="store_true", + help="Turn on event generation mode", + ) + parser.add_argument( + "RAWFile", type=argparse.FileType("rb"), - help="Path to DER file you want to decode", + help="Path to BER/CER/DER file you want to decode", ) args = parser.parse_args() - args.DERFile.seek(args.skip) - der = memoryview(args.DERFile.read()) - args.DERFile.close() + if PY2: + args.RAWFile.seek(args.skip) + raw = memoryview(args.RAWFile.read()) + args.RAWFile.close() + else: + raw = file_mmaped(args.RAWFile)[args.skip:] oid_maps = ( [obj_by_path(_path) for _path in (args.oids or "").split(",")] if args.oids else () @@ -6305,10 +6821,9 @@ def main(): # pragma: no cover } if args.defines_by_path is not None: ctx["defines_by_path"] = obj_by_path(args.defines_by_path) - obj, tail = schema().decode(der, ctx=ctx) from os import environ - print(pprinter( - obj, + pprinter = partial( + pprinter, oid_maps=oid_maps, with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, @@ -6316,7 +6831,13 @@ def main(): # pragma: no cover () if args.decode_path_only is None else tuple(args.decode_path_only.split(":")) ), - )) + ) + if args.evgen: + for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx): + print(pprinter(obj, decode_path=decode_path)) + else: + obj, tail = schema().decode(raw, ctx=ctx) + print(pprinter(obj)) if tail != b"": print("\nTrailing data: %s" % hexenc(tail))