X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=fe1d4ab800624dfcb03307fe06b649f9ae259b3b;hb=e9ab3a6d6679d825d0e48523cec13ff710f0220d;hp=58583b18d0083bb56df58116e8f588533e9c3186;hpb=370a9698a2b53a681a0f94d371b3f889df55225f;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index 58583b1..fe1d4ab 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -352,7 +352,6 @@ Let's parse that output, human:: (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` could be BERed. - .. _definedby: DEFINED BY @@ -550,12 +549,12 @@ _______ Integer _______ .. autoclass:: pyderasn.Integer - :members: __init__ + :members: __init__, named BitString _________ .. autoclass:: pyderasn.BitString - :members: __init__ + :members: __init__, bit_len, named OctetString ___________ @@ -587,7 +586,7 @@ _____________ PrintableString _______________ .. autoclass:: pyderasn.PrintableString - :members: __init__ + :members: __init__, allow_asterisk, allow_ampersand UTCTime _______ @@ -597,6 +596,7 @@ _______ GeneralizedTime _______________ .. autoclass:: pyderasn.GeneralizedTime + :members: __init__, todatetime Special types ------------- @@ -604,7 +604,7 @@ Special types Choice ______ .. autoclass:: pyderasn.Choice - :members: __init__ + :members: __init__, choice, value PrimitiveTypes ______________ @@ -643,6 +643,8 @@ Various .. autofunction:: pyderasn.abs_decode_path .. autofunction:: pyderasn.colonize_hex +.. autofunction:: pyderasn.encode_cer +.. autofunction:: pyderasn.file_mmaped .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -779,7 +781,11 @@ from collections import OrderedDict from copy import copy from datetime import datetime from datetime import timedelta +from io import BytesIO from math import ceil +from mmap import mmap +from mmap import PROT_READ +from operator import attrgetter from string import ascii_letters from string import digits from sys import version_info @@ -807,7 +813,7 @@ except ImportError: # pragma: no cover def colored(what, *args, **kwargs): return what -__version__ = "6.3" +__version__ = "7.0" __all__ = ( "Any", @@ -818,8 +824,10 @@ __all__ = ( "Choice", "DecodeError", "DecodePathDefBy", + "encode_cer", "Enumerated", "ExceedingData", + "file_mmaped", "GeneralizedTime", "GeneralString", "GraphicString", @@ -887,6 +895,14 @@ DECIMALS = frozenset(digits) DECIMAL_SIGNS = ".," +def file_mmaped(fd): + """Make mmap-ed memoryview for reading from file + + :param fd: file object + :returns: memoryview over read-only mmap-ing of the whole file + """ + return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ)) + def pureint(value): if not set(value) <= DECIMALS: raise ValueError("non-pure integer") @@ -897,6 +913,19 @@ def fractions2float(fractions_raw): return float("0." + fractions_raw) +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (not p1 is any) and (p1 != p2): + break + else: + return define + + ######################################################################## # Errors ######################################################################## @@ -1176,6 +1205,23 @@ def len_decode(data): return l, 1 + octets_num, data[1 + octets_num:] +LEN1K = len_encode(1000) + + +def write_full(writer, data): + """Fully write provided data + + BytesIO does not guarantee that the whole data will be written at once. + """ + data = memoryview(data) + written = 0 + while written != len(data): + n = writer(data[written:]) + if n is None: + raise ValueError("can not write to buf") + written += n + + ######################################################################## # Base class ######################################################################## @@ -1189,6 +1235,7 @@ class AutoAddSlots(type): BasicState = namedtuple("BasicState", ( "version", "tag", + "tag_order", "expl", "default", "optional", @@ -1210,6 +1257,7 @@ class Obj(object): """ __slots__ = ( "tag", + "_tag_order", "_value", "_expl", "default", @@ -1234,6 +1282,13 @@ class Obj(object): self._expl = getattr(self, "expl", None) if expl is None else expl if self.tag != self.tag_default and self._expl is not None: raise ValueError("implicit and explicit tags can not be set simultaneously") + if self.tag is None: + self._tag_order = None + else: + tag_class, _, tag_num = tag_decode( + self.tag if self._expl is None else self._expl + ) + self._tag_order = (tag_class, tag_num) if default is not None: optional = True self.optional = optional @@ -1274,6 +1329,7 @@ class Obj(object): if state.version != __version__: raise ValueError("data is pickled by different PyDERASN version") self.tag = state.tag + self._tag_order = state.tag_order self._expl = state.expl self.default = state.default self.optional = state.optional @@ -1284,6 +1340,16 @@ class Obj(object): self.lenindef = state.lenindef self.ber_encoded = state.ber_encoded + @property + def tag_order(self): + """Tag's (class, number) used for DER/CER sorting + """ + return self._tag_order + + @property + def tag_order_cer(self): + return self.tag_order + @property def tlen(self): """See :ref:`decoding` @@ -1314,8 +1380,8 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover - raise NotImplementedError() + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover + yield NotImplemented def encode(self): """Encode the structure @@ -1327,6 +1393,19 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def encode_cer(self, writer): + if self._expl is not None: + write_full(writer, self._expl + LENINDEF) + if getattr(self, "der_forced", False): + write_full(writer, self._encode()) + else: + self._encode_cer(writer) + if self._expl is not None: + write_full(writer, EOC) + + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def hexencode(self): """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` """ @@ -1341,6 +1420,32 @@ class Obj(object): ctx=None, tag_only=False, _ctx_immutable=True, + ): + result = next(self.decode_evgen( + data, + offset, + leavemm, + decode_path, + ctx, + tag_only, + _ctx_immutable, + _evgen_mode=False, + )) + if result is None: + return None + _, obj, tail = result + return obj, tail + + def decode_evgen( + self, + data, + offset=0, + leavemm=False, + decode_path=(), + ctx=None, + tag_only=False, + _ctx_immutable=True, + _evgen_mode=True, ): """Decode the data @@ -1363,17 +1468,26 @@ class Obj(object): elif _ctx_immutable: ctx = copy(ctx) tlv = memoryview(data) + if ( + _evgen_mode and + get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None + ): + _evgen_mode = False if self._expl is None: - result = self._decode( - tlv, - offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: - return None - obj, tail = result + for result in self._decode( + tlv, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result else: try: t, tlen, lv = tag_strip(tlv) @@ -1402,16 +1516,20 @@ class Obj(object): ) llen, v = 1, lv[1:] offset += tlen + llen - result = self._decode( - v, - offset=offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: raise DecodeError( @@ -1437,16 +1555,20 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - result = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset + tlen + llen, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( "explicit tag out-of-bound, longer than data", @@ -1454,7 +1576,7 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - return obj, (tail if leavemm else tail.tobytes()) + yield decode_path, obj, (tail if leavemm else tail.tobytes()) def decod(self, data, offset=0, decode_path=(), ctx=None): """Decode the data, check that tail is empty @@ -1575,6 +1697,14 @@ class Obj(object): ) +def encode_cer(obj): + """Encode to CER in memory + """ + buf = BytesIO() + obj.encode_cer(buf.write) + return buf.getvalue() + + class DecodePathDefBy(object): """DEFINED BY representation inside decode path """ @@ -1813,6 +1943,7 @@ def pprint( with_colours=False, with_decode_path=False, decode_path_only=(), + decode_path=(), ): """Pretty print object @@ -1865,7 +1996,7 @@ def pprint( else: for row in _pprint_pps(pp): yield row - return "\n".join(_pprint_pps(obj.pps())) + return "\n".join(_pprint_pps(obj.pps(decode_path))) ######################################################################## @@ -1937,6 +2068,7 @@ class Boolean(Obj): return BooleanState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -1996,7 +2128,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2013,7 +2145,8 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -2062,7 +2195,7 @@ class Boolean(Obj): _decoded=(offset, 1, 1), ) obj.ber_encoded = ber_encoded - return obj, v[1:] + yield decode_path, obj, v[1:] def __repr__(self): return pp_console_row(next(self.pps())) @@ -2206,6 +2339,7 @@ class Integer(Obj): return IntegerState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2256,6 +2390,8 @@ class Integer(Obj): @property def named(self): + """Return named representation (if exists) of the value + """ for name, value in iteritems(self.specs): if value == self._value: return name @@ -2322,7 +2458,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2339,7 +2475,8 @@ class Integer(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2410,7 +2547,7 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -2604,6 +2741,7 @@ class BitString(Obj): return BitStringState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2633,6 +2771,8 @@ class BitString(Obj): @property def bit_len(self): + """Returns number of bits in the string + """ self._assert_ready() return self._value[0] @@ -2653,6 +2793,10 @@ class BitString(Obj): @property def named(self): + """Named representation (if exists) of the bits + + :returns: [str(name), ...] + """ return [name for name, bit in iteritems(self.specs) if self[bit]] def __call__( @@ -2698,7 +2842,31 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + bit_len, octets = self._value + if len(octets) + 1 <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 999) * 999, 999): + write_full(writer, b"".join(( + BitString.tag_default, + LEN1K, + int2byte(0), + octets[offset:offset + 999], + ))) + tail = octets[offset+999:] + if len(tail) > 0: + tail = int2byte((8 - bit_len % 8) % 8) + tail + write_full(writer, b"".join(( + BitString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -2710,7 +2878,8 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2757,8 +2926,9 @@ class BitString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + bit_len = (len(v) - 1) * 8 - pad_size obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), + value=None if evgen_mode else (bit_len, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, @@ -2766,7 +2936,10 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, l), ) - return obj, tail + if evgen_mode: + obj._value = (bit_len, None) + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -2781,7 +2954,8 @@ class BitString(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -2828,14 +3002,26 @@ class BitString(Obj): ) sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, chunk, v_tail in BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + else: + _, chunk, v_tail = next(BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch: raise DecodeError( "expected BitString encoded chunk", @@ -2864,13 +3050,15 @@ class BitString(Obj): decode_path=decode_path + (str(chunk_i),), offset=chunk.offset, ) - values.append(bytes(chunk)) + if not evgen_mode: + values.append(bytes(chunk)) bit_len += chunk.bit_len chunk_last = chunks[-1] - values.append(bytes(chunk_last)) + if not evgen_mode: + values.append(bytes(chunk_last)) bit_len += chunk_last.bit_len obj = self.__class__( - value=(bit_len, b"".join(values)), + value=None if evgen_mode else (bit_len, b"".join(values)), impl=self.tag, expl=self._expl, default=self.default, @@ -2878,9 +3066,11 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) + if evgen_mode: + obj._value = (bit_len, None) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -2891,7 +3081,7 @@ class BitString(Obj): if self.ready: bit_len, blob = self._value value = "%d bits" % bit_len - if len(self.specs) > 0: + if len(self.specs) > 0 and blob is not None: blob = tuple(self.named) yield _pp( obj=self, @@ -2966,6 +3156,7 @@ class OctetString(Obj): __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" + evgen_mode_skip_value = True def __init__( self, @@ -3033,6 +3224,7 @@ class OctetString(Obj): return OctetStringState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3104,7 +3296,29 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + octets = self._value + if len(octets) <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000): + write_full(writer, b"".join(( + OctetString.tag_default, + LEN1K, + octets[offset:offset + 1000], + ))) + tail = octets[offset+1000:] + if len(tail) > 0: + write_full(writer, b"".join(( + OctetString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3116,7 +3330,8 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3134,9 +3349,19 @@ class OctetString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + if evgen_mode and not self._bound_min <= len(v) <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, len(v), self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=v.tobytes(), + value=( + None if (evgen_mode and self.evgen_mode_skip_value) + else v.tobytes() + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3159,7 +3384,8 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -3174,7 +3400,8 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -3196,8 +3423,10 @@ class OctetString(Obj): offset=offset, ) chunks = [] + chunks_count = 0 sub_offset = offset + tlen + llen vlen = 0 + payload_len = 0 while True: if lenindef: if v[:EOC_LEN].tobytes() == EOC: @@ -3212,16 +3441,33 @@ class OctetString(Obj): decode_path=decode_path + (str(len(chunks) - 1),), offset=chunks[-1].offset, ) - sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + sub_decode_path = decode_path + (str(chunks_count),) + for _decode_path, chunk, v_tail in OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + if not chunk.ber_encoded: + payload_len += chunk.vlen + chunks_count += 1 + else: + sub_decode_path = decode_path + (str(len(chunks)),) + _, chunk, v_tail = next(OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + chunks.append(chunk) except TagMismatch: raise DecodeError( "expected OctetString encoded chunk", @@ -3229,13 +3475,22 @@ class OctetString(Obj): decode_path=sub_decode_path, offset=sub_offset, ) - chunks.append(chunk) sub_offset += chunk.tlvlen vlen += chunk.tlvlen v = v_tail + if evgen_mode and not self._bound_min <= payload_len <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), + value=( + None if evgen_mode else + b"".join(bytes(chunk) for chunk in chunks) + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3260,7 +3515,7 @@ class OctetString(Obj): ) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -3338,6 +3593,7 @@ class Null(Obj): return NullState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3373,7 +3629,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3390,7 +3646,8 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3413,7 +3670,7 @@ class Null(Obj): optional=self.optional, _decoded=(offset, 1, 0), ) - return obj, v + yield decode_path, obj, v def __repr__(self): return pp_console_row(next(self.pps())) @@ -3553,6 +3810,7 @@ class ObjectIdentifier(Obj): return ObjectIdentifierState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3637,7 +3895,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3654,7 +3912,8 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3724,7 +3983,7 @@ class ObjectIdentifier(Obj): ) if ber_encoded: obj.ber_encoded = True - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -4208,6 +4467,7 @@ class UTCTime(VisibleString): tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" + evgen_mode_skip_value = False def __init__( self, @@ -4378,6 +4638,9 @@ class UTCTime(VisibleString): value = self._encode_time() return b"".join((self.tag, len_encode(len(value)), value)) + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def todatetime(self): return self._value @@ -4654,6 +4917,9 @@ class Choice(Obj): self.default = default_obj if value is None: self._value = copy(default_obj._value) + if self._expl is not None: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) def _value_sanitize(self, value): if (value.__class__ == tuple) and len(value) == 2: @@ -4683,6 +4949,7 @@ class Choice(Obj): return ChoiceState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -4728,14 +4995,27 @@ class Choice(Obj): @property def choice(self): + """Name of the choice + """ self._assert_ready() return self._value[0] @property def value(self): + """Value of underlying choice + """ self._assert_ready() return self._value[1] + @property + def tag_order(self): + self._assert_ready() + return self._value[1].tag_order if self._tag_order is None else self._tag_order + + @property + def tag_order_cer(self): + return min(v.tag_order_cer for v in itervalues(self.specs)) + def __getitem__(self, key): if key not in self.specs: raise ObjUnknown(key) @@ -4766,7 +5046,11 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + self._value[1].encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: @@ -4789,15 +5073,28 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None - value, tail = spec.decode( - tlv, - offset=offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + yield None + return + if evgen_mode: + for _decode_path, value, tail in spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, tail + else: + _, value, tail = next(spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) obj = self.__class__( schema=self.specs, expl=self._expl, @@ -4806,7 +5103,7 @@ class Choice(Obj): _decoded=(offset, 0, value.fulllen), ) obj._value = (choice, value) - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -4884,7 +5181,7 @@ class Any(Obj): """``ANY`` special type >>> Any(Integer(-123)) - ANY 020185 + ANY INTEGER -123 (0X:7B) >>> a = Any(OctetString(b"hello world").encode()) ANY 040b68656c6c6f20776f726c64 >>> hexenc(bytes(a)) @@ -4904,28 +5201,47 @@ class Any(Obj): """ :param value: set the value. Either any kind of pyderasn's **ready** object, or bytes. Pay attention that - **no** validation is performed is raw binary value - is valid TLV + **no** validation is performed if raw binary value + is valid TLV, except just tag decoding :param bytes expl: override default tag with ``EXPLICIT`` one :param bool optional: is object ``OPTIONAL`` in sequence """ super(Any, self).__init__(None, expl, None, optional, _decoded) - self._value = None if value is None else self._value_sanitize(value) + if value is None: + self._value = None + else: + value = self._value_sanitize(value) + self._value = value + if self._expl is None: + if value.__class__ == binary_type: + tag_class, _, tag_num = tag_decode(tag_strip(value)[0]) + else: + tag_class, tag_num = value.tag_order + else: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) self.defined = None def _value_sanitize(self, value): if value.__class__ == binary_type: + if len(value) == 0: + raise ValueError("Any value can not be empty") return value if isinstance(value, self.__class__): return value._value - if isinstance(value, Obj): - return value.encode() - raise InvalidValueType((self.__class__, Obj, binary_type)) + if not isinstance(value, Obj): + raise InvalidValueType((self.__class__, Obj, binary_type)) + return value @property def ready(self): return self._value is not None + @property + def tag_order(self): + self._assert_ready() + return self._tag_order + @property def bered(self): if self.expl_lenindef or self.lenindef: @@ -4938,6 +5254,7 @@ class Any(Obj): return AnyState( __version__, self.tag, + self._tag_order, self._expl, None, self.optional, @@ -4958,9 +5275,13 @@ class Any(Obj): def __eq__(self, their): if their.__class__ == binary_type: - return self._value == their + if self._value.__class__ == binary_type: + return self._value == their + return self._value.encode() == their if issubclass(their.__class__, Any): - return self._value == their._value + if self.ready and their.ready: + return bytes(self) == bytes(their) + return self.ready == their.ready return False def __call__( @@ -4977,7 +5298,10 @@ class Any(Obj): def __bytes__(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return self._value.encode() @property def tlen(self): @@ -4985,9 +5309,20 @@ class Any(Obj): def _encode(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return value.encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + value = self._value + if value.__class__ == binary_type: + write_full(writer, value) + else: + value.encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5024,14 +5359,15 @@ class Any(Obj): chunk_i += 1 tlvlen = tlen + llen + vlen + EOC_LEN obj = self.__class__( - value=tlv[:tlvlen].tobytes(), + value=None if evgen_mode else tlv[:tlvlen].tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.lenindef = True obj.tag = t.tobytes() - return obj, v[EOC_LEN:] + yield decode_path, obj, v[EOC_LEN:] + return except DecodeError as err: raise err.__class__( msg=err.msg, @@ -5049,24 +5385,32 @@ class Any(Obj): tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] obj = self.__class__( - value=v.tobytes(), + value=None if evgen_mode else v.tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.tag = t.tobytes() - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): + value = self._value + if value is None: + pass + elif value.__class__ == binary_type: + value = None + else: + value = repr(value) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - blob=self._value if self.ready else None, + value=value, + blob=self._value if self._value.__class__ == binary_type else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -5229,6 +5573,12 @@ class Sequence(Obj): defaulted values existence validation by setting ``"allow_default_values": True`` :ref:`context ` option. + .. warning:: + + Check for default value existence is not performed in + ``evgen_mode``, because previously decoded values are not stored + in memory, to be able to compare them. + Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. """ @@ -5294,6 +5644,7 @@ class Sequence(Obj): return SequenceState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5381,7 +5732,13 @@ class Sequence(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5398,7 +5755,8 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5442,21 +5800,33 @@ class Sequence(Obj): continue sub_decode_path = decode_path + (name,) try: - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch as err: if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path) - if defined is not None: + if not evgen_mode and defined is not None: defined_by, defined_spec = defined if issubclass(value.__class__, SequenceOf): for i, _value in enumerate(value): @@ -5508,31 +5878,32 @@ class Sequence(Obj): vlen += value_len sub_offset += value_len v = v_tail - if spec.default is not None and value == spec.default: - if ctx_bered or ctx_allow_default_values: - ber_encoded = True - else: - raise DecodeError( - "DEFAULT value met", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - values[name] = value - - spec_defines = getattr(spec, "defines", ()) - if len(spec_defines) == 0: - defines_by_path = ctx.get("defines_by_path", ()) - if len(defines_by_path) > 0: - spec_defines = get_def_by_path(defines_by_path, sub_decode_path) - if spec_defines is not None and len(spec_defines) > 0: - for rel_path, schema in spec_defines: - defined = schema.get(value, None) - if defined is not None: - ctx.setdefault("_defines", []).append(( - abs_decode_path(sub_decode_path[:-1], rel_path), - (value, defined), - )) + if not evgen_mode: + if spec.default is not None and value == spec.default: + # This will not work in evgen_mode + if ctx_bered or ctx_allow_default_values: + ber_encoded = True + else: + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + values[name] = value + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("_defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if lenindef: if v[:EOC_LEN].tobytes() != EOC: raise DecodeError( @@ -5561,7 +5932,7 @@ class Sequence(Obj): obj._value = values obj.lenindef = lenindef obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -5623,15 +5994,22 @@ class Set(Sequence): asn1_type_name = "SET" def _encode(self): - raws = [v.encode() for v in self._values_for_encoding()] - raws.sort() - v = b"".join(raws) + v = b"".join(value.encode() for value in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order"), + )) return b"".join((self.tag, len_encode(len(v)), v)) - def _specs_items(self): - return iteritems(self.specs) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order_cer"), + ): + v.encode_cer(writer) + write_full(writer, EOC) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5648,7 +6026,8 @@ class Set(Sequence): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5684,12 +6063,13 @@ class Set(Sequence): ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) - value_prev = memoryview(v[:0]) + tag_order_prev = (0, 0) + _specs_items = copy(self.specs) while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in self._specs_items(): + for name, spec in iteritems(_specs_items): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -5710,16 +6090,29 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + value_tag_order = value.tag_order value_len = value.fulllen - if value_prev.tobytes() > v[:value_len].tobytes(): + if tag_order_prev >= value_tag_order: if ctx_bered or ctx_allow_unordered_set: ber_encoded = True else: @@ -5741,10 +6134,12 @@ class Set(Sequence): offset=sub_offset, ) values[name] = value - value_prev = v[:value_len] + del _specs_items[name] + tag_order_prev = value_tag_order sub_offset += value_len vlen += value_len v = v_tail + obj = self.__class__( schema=self.specs, impl=self.tag, @@ -5763,7 +6158,6 @@ class Set(Sequence): ) tail = v[EOC_LEN:] obj.lenindef = True - obj._value = values for name, spec in iteritems(self.specs): if name not in values and not spec.optional: raise DecodeError( @@ -5772,8 +6166,10 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) + if not evgen_mode: + obj._value = values obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail SequenceOfState = namedtuple( @@ -5882,6 +6278,7 @@ class SequenceOf(Obj): return SequenceOfState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5975,7 +6372,22 @@ class SequenceOf(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode( + self, + tlv, + offset, + decode_path, + ctx, + tag_only, + evgen_mode, + ordering_check=False, + ): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5992,7 +6404,8 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -6026,6 +6439,7 @@ class SequenceOf(Obj): vlen = 0 sub_offset = offset + tlen + llen _value = [] + _value_count = 0 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) ber_encoded = False @@ -6033,15 +6447,27 @@ class SequenceOf(Obj): while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - sub_decode_path = decode_path + (str(len(_value)),) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + sub_decode_path = decode_path + (str(_value_count),) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) value_len = value.fulllen if ordering_check: if value_prev.tobytes() > v[:value_len].tobytes(): @@ -6055,13 +6481,22 @@ class SequenceOf(Obj): offset=sub_offset, ) value_prev = v[:value_len] - _value.append(value) + _value_count += 1 + if not evgen_mode: + _value.append(value) sub_offset += value_len vlen += value_len v = v_tail + if evgen_mode and not self._bound_min <= _value_count <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=_value, + value=None if evgen_mode else _value, schema=spec, bounds=(self._bound_min, self._bound_max), impl=self.tag, @@ -6088,7 +6523,7 @@ class SequenceOf(Obj): obj.lenindef = True tail = v[EOC_LEN:] obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): return "%s[%s]" % ( @@ -6135,18 +6570,23 @@ class SetOf(SequenceOf): asn1_type_name = "SET OF" def _encode(self): - raws = [v.encode() for v in self._values_for_encoding()] - raws.sort() - v = b"".join(raws) + v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted(encode_cer(v) for v in self._values_for_encoding()): + write_full(writer, v) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): return super(SetOf, self)._decode( tlv, offset, decode_path, ctx, tag_only, + evgen_mode, ordering_check=True, ) @@ -6268,14 +6708,22 @@ def main(): # pragma: no cover help="Allow explicit tag out-of-bound", ) parser.add_argument( - "DERFile", + "--evgen", + action="store_true", + help="Turn on event generation mode", + ) + parser.add_argument( + "RAWFile", type=argparse.FileType("rb"), - help="Path to DER file you want to decode", + help="Path to BER/CER/DER file you want to decode", ) args = parser.parse_args() - args.DERFile.seek(args.skip) - der = memoryview(args.DERFile.read()) - args.DERFile.close() + if PY2: + args.RAWFile.seek(args.skip) + raw = memoryview(args.RAWFile.read()) + args.RAWFile.close() + else: + raw = file_mmaped(args.RAWFile)[args.skip:] oid_maps = ( [obj_by_path(_path) for _path in (args.oids or "").split(",")] if args.oids else () @@ -6292,10 +6740,9 @@ def main(): # pragma: no cover } if args.defines_by_path is not None: ctx["defines_by_path"] = obj_by_path(args.defines_by_path) - obj, tail = schema().decode(der, ctx=ctx) from os import environ - print(pprinter( - obj, + pprinter = partial( + pprinter, oid_maps=oid_maps, with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, @@ -6303,7 +6750,13 @@ def main(): # pragma: no cover () if args.decode_path_only is None else tuple(args.decode_path_only.split(":")) ), - )) + ) + if args.evgen: + for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx): + print(pprinter(obj, decode_path=decode_path)) + else: + obj, tail = schema().decode(raw, ctx=ctx) + print(pprinter(obj)) if tail != b"": print("\nTrailing data: %s" % hexenc(tail))