X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=fe1d4ab800624dfcb03307fe06b649f9ae259b3b;hb=e9ab3a6d6679d825d0e48523cec13ff710f0220d;hp=ced46ddeb0f4fde4ca6f9be75120e9d1d421e1c2;hpb=287f77ca2cf0b62db54e199ca1b5c16c93c3624e;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index ced46dd..fe1d4ab 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -352,7 +352,6 @@ Let's parse that output, human:: (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` could be BERed. - .. _definedby: DEFINED BY @@ -550,12 +549,12 @@ _______ Integer _______ .. autoclass:: pyderasn.Integer - :members: __init__ + :members: __init__, named BitString _________ .. autoclass:: pyderasn.BitString - :members: __init__ + :members: __init__, bit_len, named OctetString ___________ @@ -587,7 +586,7 @@ _____________ PrintableString _______________ .. autoclass:: pyderasn.PrintableString - :members: __init__ + :members: __init__, allow_asterisk, allow_ampersand UTCTime _______ @@ -597,6 +596,7 @@ _______ GeneralizedTime _______________ .. autoclass:: pyderasn.GeneralizedTime + :members: __init__, todatetime Special types ------------- @@ -604,7 +604,7 @@ Special types Choice ______ .. autoclass:: pyderasn.Choice - :members: __init__ + :members: __init__, choice, value PrimitiveTypes ______________ @@ -643,6 +643,8 @@ Various .. autofunction:: pyderasn.abs_decode_path .. autofunction:: pyderasn.colonize_hex +.. autofunction:: pyderasn.encode_cer +.. autofunction:: pyderasn.file_mmaped .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -661,6 +663,115 @@ Various .. autoclass:: pyderasn.ObjNotReady .. autoclass:: pyderasn.InvalidValueType .. autoclass:: pyderasn.BoundsError + +.. _cmdline: + +Command-line usage +------------------ + +You can decode DER/BER files using command line abilities:: + + $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file + +If there is no schema for your file, then you can try parsing it without, +but of course IMPLICIT tags will often make it impossible. But result is +good enough for the certificate above:: + + $ python -m pyderasn path/to/file + 0 [1,3,1604] . >: SEQUENCE OF + 4 [1,3,1453] . . >: SEQUENCE OF + 8 [0,0, 5] . . . . >: [0] ANY + . . . . . A0:03:02:01:02 + 13 [1,1, 3] . . . . >: INTEGER 61595 + 18 [1,1, 13] . . . . >: SEQUENCE OF + 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [1,1, 0] . . . . . . >: NULL + 33 [1,3, 274] . . . . >: SEQUENCE OF + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6 + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + [...] + 1409 [1,1, 50] . . . . . . >: SEQUENCE OF + 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1 + 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes + . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16 + . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63 + . . . . . . . . . 61:2E:63:6F:6D:2F + 1461 [1,1, 13] . . >: SEQUENCE OF + 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [1,1, 0] . . . . >: NULL + 1476 [1,2, 129] . . >: BIT STRING 1024 bits + . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + +Human readable OIDs +___________________ + +If you have got dictionaries with ObjectIdentifiers, like example one +from ``tests/test_crts.py``:: + + stroid2name = { + "1.2.840.113549.1.1.1": "id-rsaEncryption", + "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption", + [...] + "2.5.4.10": "id-at-organizationName", + "2.5.4.11": "id-at-organizationalUnitName", + } + +then you can pass it to pretty printer to see human readable OIDs:: + + $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file + [...] + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6) + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + 50 [1,1, 18] . . . . . . >: SET OF + 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8) + 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + 70 [1,1, 18] . . . . . . >: SET OF + 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7) + 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + [...] + +Decode paths +____________ + +Each decoded element has so-called decode path: sequence of structure +names it is passing during the decode process. Each element has its own +unique path inside the whole ASN.1 tree. You can print it out with +``--print-decode-path`` option:: + + $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file + 0 [1,3,1604] Certificate SEQUENCE [] + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate] + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version] + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber] + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature] + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm] + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters] + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer] + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence] + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0] + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0] + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type] + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value] + . . . . . . . 13:02:45:53 + 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6] + [...] + +Now you can print only the specified tree, for example signature algorithm:: + + $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file + 18 [1,1, 13] AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL + . . 05:00 """ from codecs import getdecoder @@ -670,7 +781,11 @@ from collections import OrderedDict from copy import copy from datetime import datetime from datetime import timedelta +from io import BytesIO from math import ceil +from mmap import mmap +from mmap import PROT_READ +from operator import attrgetter from string import ascii_letters from string import digits from sys import version_info @@ -698,7 +813,7 @@ except ImportError: # pragma: no cover def colored(what, *args, **kwargs): return what -__version__ = "6.3" +__version__ = "7.0" __all__ = ( "Any", @@ -709,8 +824,10 @@ __all__ = ( "Choice", "DecodeError", "DecodePathDefBy", + "encode_cer", "Enumerated", "ExceedingData", + "file_mmaped", "GeneralizedTime", "GeneralString", "GraphicString", @@ -778,6 +895,14 @@ DECIMALS = frozenset(digits) DECIMAL_SIGNS = ".," +def file_mmaped(fd): + """Make mmap-ed memoryview for reading from file + + :param fd: file object + :returns: memoryview over read-only mmap-ing of the whole file + """ + return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ)) + def pureint(value): if not set(value) <= DECIMALS: raise ValueError("non-pure integer") @@ -788,6 +913,19 @@ def fractions2float(fractions_raw): return float("0." + fractions_raw) +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (not p1 is any) and (p1 != p2): + break + else: + return define + + ######################################################################## # Errors ######################################################################## @@ -1067,6 +1205,23 @@ def len_decode(data): return l, 1 + octets_num, data[1 + octets_num:] +LEN1K = len_encode(1000) + + +def write_full(writer, data): + """Fully write provided data + + BytesIO does not guarantee that the whole data will be written at once. + """ + data = memoryview(data) + written = 0 + while written != len(data): + n = writer(data[written:]) + if n is None: + raise ValueError("can not write to buf") + written += n + + ######################################################################## # Base class ######################################################################## @@ -1077,6 +1232,22 @@ class AutoAddSlots(type): return type.__new__(cls, name, bases, _dict) +BasicState = namedtuple("BasicState", ( + "version", + "tag", + "tag_order", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + @add_metaclass(AutoAddSlots) class Obj(object): """Common ASN.1 object class @@ -1086,6 +1257,7 @@ class Obj(object): """ __slots__ = ( "tag", + "_tag_order", "_value", "_expl", "default", @@ -1110,6 +1282,13 @@ class Obj(object): self._expl = getattr(self, "expl", None) if expl is None else expl if self.tag != self.tag_default and self._expl is not None: raise ValueError("implicit and explicit tags can not be set simultaneously") + if self.tag is None: + self._tag_order = None + else: + tag_class, _, tag_num = tag_decode( + self.tag if self._expl is None else self._expl + ) + self._tag_order = (tag_class, tag_num) if default is not None: optional = True self.optional = optional @@ -1149,17 +1328,27 @@ class Obj(object): def __setstate__(self, state): if state.version != __version__: raise ValueError("data is pickled by different PyDERASN version") - self.tag = self.tag_default - self._value = None - self._expl = None - self.default = None - self.optional = False - self.offset = 0 - self.llen = 0 - self.vlen = 0 - self.expl_lenindef = False - self.lenindef = False - self.ber_encoded = False + self.tag = state.tag + self._tag_order = state.tag_order + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + + @property + def tag_order(self): + """Tag's (class, number) used for DER/CER sorting + """ + return self._tag_order + + @property + def tag_order_cer(self): + return self.tag_order @property def tlen(self): @@ -1191,8 +1380,8 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover - raise NotImplementedError() + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover + yield NotImplemented def encode(self): """Encode the structure @@ -1204,6 +1393,19 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def encode_cer(self, writer): + if self._expl is not None: + write_full(writer, self._expl + LENINDEF) + if getattr(self, "der_forced", False): + write_full(writer, self._encode()) + else: + self._encode_cer(writer) + if self._expl is not None: + write_full(writer, EOC) + + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def hexencode(self): """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` """ @@ -1218,6 +1420,32 @@ class Obj(object): ctx=None, tag_only=False, _ctx_immutable=True, + ): + result = next(self.decode_evgen( + data, + offset, + leavemm, + decode_path, + ctx, + tag_only, + _ctx_immutable, + _evgen_mode=False, + )) + if result is None: + return None + _, obj, tail = result + return obj, tail + + def decode_evgen( + self, + data, + offset=0, + leavemm=False, + decode_path=(), + ctx=None, + tag_only=False, + _ctx_immutable=True, + _evgen_mode=True, ): """Decode the data @@ -1240,17 +1468,26 @@ class Obj(object): elif _ctx_immutable: ctx = copy(ctx) tlv = memoryview(data) + if ( + _evgen_mode and + get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None + ): + _evgen_mode = False if self._expl is None: - result = self._decode( - tlv, - offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: - return None - obj, tail = result + for result in self._decode( + tlv, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result else: try: t, tlen, lv = tag_strip(tlv) @@ -1279,16 +1516,20 @@ class Obj(object): ) llen, v = 1, lv[1:] offset += tlen + llen - result = self._decode( - v, - offset=offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: raise DecodeError( @@ -1314,16 +1555,20 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - result = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset + tlen + llen, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( "explicit tag out-of-bound, longer than data", @@ -1331,7 +1576,7 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - return obj, (tail if leavemm else tail.tobytes()) + yield decode_path, obj, (tail if leavemm else tail.tobytes()) def decod(self, data, offset=0, decode_path=(), ctx=None): """Decode the data, check that tail is empty @@ -1452,6 +1697,14 @@ class Obj(object): ) +def encode_cer(obj): + """Encode to CER in memory + """ + buf = BytesIO() + obj.encode_cer(buf.write) + return buf.getvalue() + + class DecodePathDefBy(object): """DEFINED BY representation inside decode path """ @@ -1690,6 +1943,7 @@ def pprint( with_colours=False, with_decode_path=False, decode_path_only=(), + decode_path=(), ): """Pretty print object @@ -1742,27 +1996,18 @@ def pprint( else: for row in _pprint_pps(pp): yield row - return "\n".join(_pprint_pps(obj.pps())) + return "\n".join(_pprint_pps(obj.pps(decode_path))) ######################################################################## # ASN.1 primitive types ######################################################################## -BooleanState = namedtuple("BooleanState", ( - "version", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -), **NAMEDTUPLE_KWARGS) +BooleanState = namedtuple( + "BooleanState", + BasicState._fields + ("value",), + **NAMEDTUPLE_KWARGS +) class Boolean(Obj): @@ -1822,8 +2067,8 @@ class Boolean(Obj): def __getstate__(self): return BooleanState( __version__, - self._value, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -1833,21 +2078,12 @@ class Boolean(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, ) def __setstate__(self, state): super(Boolean, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __nonzero__(self): self._assert_ready() @@ -1892,7 +2128,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1909,7 +2145,8 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1958,7 +2195,7 @@ class Boolean(Obj): _decoded=(offset, 1, 1), ) obj.ber_encoded = ber_encoded - return obj, v[1:] + yield decode_path, obj, v[1:] def __repr__(self): return pp_console_row(next(self.pps())) @@ -1990,23 +2227,11 @@ class Boolean(Obj): yield pp -IntegerState = namedtuple("IntegerState", ( - "version", - "specs", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -), **NAMEDTUPLE_KWARGS) +IntegerState = namedtuple( + "IntegerState", + BasicState._fields + ("specs", "value", "bound_min", "bound_max"), + **NAMEDTUPLE_KWARGS +) class Integer(Obj): @@ -2113,11 +2338,8 @@ class Integer(Obj): def __getstate__(self): return IntegerState( __version__, - self.specs, - self._value, - self._bound_min, - self._bound_max, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2127,6 +2349,10 @@ class Integer(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + self._value, + self._bound_min, + self._bound_max, ) def __setstate__(self, state): @@ -2135,16 +2361,6 @@ class Integer(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __int__(self): self._assert_ready() @@ -2174,6 +2390,8 @@ class Integer(Obj): @property def named(self): + """Return named representation (if exists) of the value + """ for name, value in iteritems(self.specs): if value == self._value: return name @@ -2240,7 +2458,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2257,7 +2475,8 @@ class Integer(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2328,7 +2547,7 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -2359,23 +2578,11 @@ class Integer(Obj): yield pp -BitStringState = namedtuple("BitStringState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "tag_constructed", - "defined", -), **NAMEDTUPLE_KWARGS) +BitStringState = namedtuple( + "BitStringState", + BasicState._fields + ("specs", "value", "tag_constructed", "defined"), + **NAMEDTUPLE_KWARGS +) class BitString(Obj): @@ -2533,9 +2740,8 @@ class BitString(Obj): def __getstate__(self): return BitStringState( __version__, - self.specs, - self._value, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2545,6 +2751,8 @@ class BitString(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + self._value, self.tag_constructed, self.defined, ) @@ -2553,16 +2761,6 @@ class BitString(Obj): super(BitString, self).__setstate__(state) self.specs = state.specs self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.tag_constructed = state.tag_constructed self.defined = state.defined @@ -2573,6 +2771,8 @@ class BitString(Obj): @property def bit_len(self): + """Returns number of bits in the string + """ self._assert_ready() return self._value[0] @@ -2593,6 +2793,10 @@ class BitString(Obj): @property def named(self): + """Named representation (if exists) of the bits + + :returns: [str(name), ...] + """ return [name for name, bit in iteritems(self.specs) if self[bit]] def __call__( @@ -2638,7 +2842,31 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + bit_len, octets = self._value + if len(octets) + 1 <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 999) * 999, 999): + write_full(writer, b"".join(( + BitString.tag_default, + LEN1K, + int2byte(0), + octets[offset:offset + 999], + ))) + tail = octets[offset+999:] + if len(tail) > 0: + tail = int2byte((8 - bit_len % 8) % 8) + tail + write_full(writer, b"".join(( + BitString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -2650,7 +2878,8 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2697,8 +2926,9 @@ class BitString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + bit_len = (len(v) - 1) * 8 - pad_size obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), + value=None if evgen_mode else (bit_len, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, @@ -2706,7 +2936,10 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, l), ) - return obj, tail + if evgen_mode: + obj._value = (bit_len, None) + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -2721,7 +2954,8 @@ class BitString(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -2768,18 +3002,30 @@ class BitString(Obj): ) sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) - except TagMismatch: - raise DecodeError( - "expected BitString encoded chunk", - klass=self.__class__, + if evgen_mode: + for _decode_path, chunk, v_tail in BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + else: + _, chunk, v_tail = next(BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + except TagMismatch: + raise DecodeError( + "expected BitString encoded chunk", + klass=self.__class__, decode_path=sub_decode_path, offset=sub_offset, ) @@ -2804,13 +3050,15 @@ class BitString(Obj): decode_path=decode_path + (str(chunk_i),), offset=chunk.offset, ) - values.append(bytes(chunk)) + if not evgen_mode: + values.append(bytes(chunk)) bit_len += chunk.bit_len chunk_last = chunks[-1] - values.append(bytes(chunk_last)) + if not evgen_mode: + values.append(bytes(chunk_last)) bit_len += chunk_last.bit_len obj = self.__class__( - value=(bit_len, b"".join(values)), + value=None if evgen_mode else (bit_len, b"".join(values)), impl=self.tag, expl=self._expl, default=self.default, @@ -2818,9 +3066,11 @@ class BitString(Obj): _specs=self.specs, _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) + if evgen_mode: + obj._value = (bit_len, None) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -2831,7 +3081,7 @@ class BitString(Obj): if self.ready: bit_len, blob = self._value value = "%d bits" % bit_len - if len(self.specs) > 0: + if len(self.specs) > 0 and blob is not None: blob = tuple(self.named) yield _pp( obj=self, @@ -2866,24 +3116,17 @@ class BitString(Obj): yield pp -OctetStringState = namedtuple("OctetStringState", ( - "version", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "tag_constructed", - "defined", -), **NAMEDTUPLE_KWARGS) +OctetStringState = namedtuple( + "OctetStringState", + BasicState._fields + ( + "value", + "bound_min", + "bound_max", + "tag_constructed", + "defined", + ), + **NAMEDTUPLE_KWARGS +) class OctetString(Obj): @@ -2913,6 +3156,7 @@ class OctetString(Obj): __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" + evgen_mode_skip_value = True def __init__( self, @@ -2979,10 +3223,8 @@ class OctetString(Obj): def __getstate__(self): return OctetStringState( __version__, - self._value, - self._bound_min, - self._bound_max, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2992,6 +3234,9 @@ class OctetString(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, + self._bound_min, + self._bound_max, self.tag_constructed, self.defined, ) @@ -3001,16 +3246,6 @@ class OctetString(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.tag_constructed = state.tag_constructed self.defined = state.defined @@ -3061,7 +3296,29 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + octets = self._value + if len(octets) <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000): + write_full(writer, b"".join(( + OctetString.tag_default, + LEN1K, + octets[offset:offset + 1000], + ))) + tail = octets[offset+1000:] + if len(tail) > 0: + write_full(writer, b"".join(( + OctetString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3073,7 +3330,8 @@ class OctetString(Obj): ) if t == self.tag: if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3091,9 +3349,19 @@ class OctetString(Obj): offset=offset, ) v, tail = v[:l], v[l:] + if evgen_mode and not self._bound_min <= len(v) <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, len(v), self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=v.tobytes(), + value=( + None if (evgen_mode and self.evgen_mode_skip_value) + else v.tobytes() + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3116,7 +3384,8 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail + return if t != self.tag_constructed: raise TagMismatch( klass=self.__class__, @@ -3131,7 +3400,8 @@ class OctetString(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False try: l, llen, v = len_decode(lv) @@ -3153,8 +3423,10 @@ class OctetString(Obj): offset=offset, ) chunks = [] + chunks_count = 0 sub_offset = offset + tlen + llen vlen = 0 + payload_len = 0 while True: if lenindef: if v[:EOC_LEN].tobytes() == EOC: @@ -3169,16 +3441,33 @@ class OctetString(Obj): decode_path=decode_path + (str(len(chunks) - 1),), offset=chunks[-1].offset, ) - sub_decode_path = decode_path + (str(len(chunks)),) try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + sub_decode_path = decode_path + (str(chunks_count),) + for _decode_path, chunk, v_tail in OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + if not chunk.ber_encoded: + payload_len += chunk.vlen + chunks_count += 1 + else: + sub_decode_path = decode_path + (str(len(chunks)),) + _, chunk, v_tail = next(OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + chunks.append(chunk) except TagMismatch: raise DecodeError( "expected OctetString encoded chunk", @@ -3186,13 +3475,22 @@ class OctetString(Obj): decode_path=sub_decode_path, offset=sub_offset, ) - chunks.append(chunk) sub_offset += chunk.tlvlen vlen += chunk.tlvlen v = v_tail + if evgen_mode and not self._bound_min <= payload_len <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), + value=( + None if evgen_mode else + b"".join(bytes(chunk) for chunk in chunks) + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3217,7 +3515,7 @@ class OctetString(Obj): ) obj.lenindef = lenindef obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -3256,19 +3554,7 @@ class OctetString(Obj): yield pp -NullState = namedtuple("NullState", ( - "version", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -), **NAMEDTUPLE_KWARGS) +NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS) class Null(Obj): @@ -3307,6 +3593,7 @@ class Null(Obj): return NullState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3318,19 +3605,6 @@ class Null(Obj): self.ber_encoded, ) - def __setstate__(self, state): - super(Null, self).__setstate__(state) - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded - def __eq__(self, their): if not issubclass(their.__class__, Null): return False @@ -3355,7 +3629,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3372,7 +3646,8 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3395,7 +3670,7 @@ class Null(Obj): optional=self.optional, _decoded=(offset, 1, 0), ) - return obj, v + yield decode_path, obj, v def __repr__(self): return pp_console_row(next(self.pps())) @@ -3424,21 +3699,11 @@ class Null(Obj): yield pp -ObjectIdentifierState = namedtuple("ObjectIdentifierState", ( - "version", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "defines", -), **NAMEDTUPLE_KWARGS) +ObjectIdentifierState = namedtuple( + "ObjectIdentifierState", + BasicState._fields + ("value", "defines"), + **NAMEDTUPLE_KWARGS +) class ObjectIdentifier(Obj): @@ -3544,8 +3809,8 @@ class ObjectIdentifier(Obj): def __getstate__(self): return ObjectIdentifierState( __version__, - self._value, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3555,22 +3820,13 @@ class ObjectIdentifier(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, self.defines, ) def __setstate__(self, state): super(ObjectIdentifier, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.defines = state.defines def __iter__(self): @@ -3639,7 +3895,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3656,7 +3912,8 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3726,7 +3983,7 @@ class ObjectIdentifier(Obj): ) if ber_encoded: obj.ber_encoded = True - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -4210,6 +4467,7 @@ class UTCTime(VisibleString): tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" + evgen_mode_skip_value = False def __init__( self, @@ -4380,6 +4638,9 @@ class UTCTime(VisibleString): value = self._encode_time() return b"".join((self.tag, len_encode(len(value)), value)) + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def todatetime(self): return self._value @@ -4575,21 +4836,11 @@ class BMPString(CommonString): asn1_type_name = "BMPString" -ChoiceState = namedtuple("ChoiceState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -), **NAMEDTUPLE_KWARGS) +ChoiceState = namedtuple( + "ChoiceState", + BasicState._fields + ("specs", "value",), + **NAMEDTUPLE_KWARGS +) class Choice(Obj): @@ -4666,6 +4917,9 @@ class Choice(Obj): self.default = default_obj if value is None: self._value = copy(default_obj._value) + if self._expl is not None: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) def _value_sanitize(self, value): if (value.__class__ == tuple) and len(value) == 2: @@ -4694,9 +4948,8 @@ class Choice(Obj): def __getstate__(self): return ChoiceState( __version__, - self.specs, - copy(self._value), self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -4706,21 +4959,14 @@ class Choice(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + copy(self._value), ) def __setstate__(self, state): super(Choice, self).__setstate__(state) self.specs = state.specs self._value = state.value - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if (their.__class__ == tuple) and len(their) == 2: @@ -4749,14 +4995,27 @@ class Choice(Obj): @property def choice(self): + """Name of the choice + """ self._assert_ready() return self._value[0] @property def value(self): + """Value of underlying choice + """ self._assert_ready() return self._value[1] + @property + def tag_order(self): + self._assert_ready() + return self._value[1].tag_order if self._tag_order is None else self._tag_order + + @property + def tag_order_cer(self): + return min(v.tag_order_cer for v in itervalues(self.specs)) + def __getitem__(self, key): if key not in self.specs: raise ObjUnknown(key) @@ -4787,7 +5046,11 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + self._value[1].encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: @@ -4810,15 +5073,28 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None - value, tail = spec.decode( - tlv, - offset=offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + yield None + return + if evgen_mode: + for _decode_path, value, tail in spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, tail + else: + _, value, tail = next(spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) obj = self.__class__( schema=self.specs, expl=self._expl, @@ -4827,7 +5103,7 @@ class Choice(Obj): _decoded=(offset, 0, value.fulllen), ) obj._value = (choice, value) - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -4894,27 +5170,18 @@ class PrimitiveTypes(Choice): )) -AnyState = namedtuple("AnyState", ( - "version", - "value", - "tag", - "expl", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "defined", -), **NAMEDTUPLE_KWARGS) +AnyState = namedtuple( + "AnyState", + BasicState._fields + ("value", "defined"), + **NAMEDTUPLE_KWARGS +) class Any(Obj): """``ANY`` special type >>> Any(Integer(-123)) - ANY 020185 + ANY INTEGER -123 (0X:7B) >>> a = Any(OctetString(b"hello world").encode()) ANY 040b68656c6c6f20776f726c64 >>> hexenc(bytes(a)) @@ -4934,28 +5201,47 @@ class Any(Obj): """ :param value: set the value. Either any kind of pyderasn's **ready** object, or bytes. Pay attention that - **no** validation is performed is raw binary value - is valid TLV + **no** validation is performed if raw binary value + is valid TLV, except just tag decoding :param bytes expl: override default tag with ``EXPLICIT`` one :param bool optional: is object ``OPTIONAL`` in sequence """ super(Any, self).__init__(None, expl, None, optional, _decoded) - self._value = None if value is None else self._value_sanitize(value) + if value is None: + self._value = None + else: + value = self._value_sanitize(value) + self._value = value + if self._expl is None: + if value.__class__ == binary_type: + tag_class, _, tag_num = tag_decode(tag_strip(value)[0]) + else: + tag_class, tag_num = value.tag_order + else: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) self.defined = None def _value_sanitize(self, value): if value.__class__ == binary_type: + if len(value) == 0: + raise ValueError("Any value can not be empty") return value if isinstance(value, self.__class__): return value._value - if isinstance(value, Obj): - return value.encode() - raise InvalidValueType((self.__class__, Obj, binary_type)) + if not isinstance(value, Obj): + raise InvalidValueType((self.__class__, Obj, binary_type)) + return value @property def ready(self): return self._value is not None + @property + def tag_order(self): + self._assert_ready() + return self._tag_order + @property def bered(self): if self.expl_lenindef or self.lenindef: @@ -4967,9 +5253,10 @@ class Any(Obj): def __getstate__(self): return AnyState( __version__, - self._value, self.tag, + self._tag_order, self._expl, + None, self.optional, self.offset, self.llen, @@ -4977,28 +5264,24 @@ class Any(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, self.defined, ) def __setstate__(self, state): super(Any, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.defined = state.defined def __eq__(self, their): if their.__class__ == binary_type: - return self._value == their + if self._value.__class__ == binary_type: + return self._value == their + return self._value.encode() == their if issubclass(their.__class__, Any): - return self._value == their._value + if self.ready and their.ready: + return bytes(self) == bytes(their) + return self.ready == their.ready return False def __call__( @@ -5015,7 +5298,10 @@ class Any(Obj): def __bytes__(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return self._value.encode() @property def tlen(self): @@ -5023,9 +5309,20 @@ class Any(Obj): def _encode(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return value.encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + value = self._value + if value.__class__ == binary_type: + write_full(writer, value) + else: + value.encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5062,14 +5359,15 @@ class Any(Obj): chunk_i += 1 tlvlen = tlen + llen + vlen + EOC_LEN obj = self.__class__( - value=tlv[:tlvlen].tobytes(), + value=None if evgen_mode else tlv[:tlvlen].tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.lenindef = True obj.tag = t.tobytes() - return obj, v[EOC_LEN:] + yield decode_path, obj, v[EOC_LEN:] + return except DecodeError as err: raise err.__class__( msg=err.msg, @@ -5087,24 +5385,32 @@ class Any(Obj): tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] obj = self.__class__( - value=v.tobytes(), + value=None if evgen_mode else v.tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.tag = t.tobytes() - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): + value = self._value + if value is None: + pass + elif value.__class__ == binary_type: + value = None + else: + value = repr(value) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - blob=self._value if self.ready else None, + value=value, + blob=self._value if self._value.__class__ == binary_type else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -5172,21 +5478,11 @@ def abs_decode_path(decode_path, rel_path): return decode_path + rel_path -SequenceState = namedtuple("SequenceState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -), **NAMEDTUPLE_KWARGS) +SequenceState = namedtuple( + "SequenceState", + BasicState._fields + ("specs", "value",), + **NAMEDTUPLE_KWARGS +) class Sequence(Obj): @@ -5277,6 +5573,12 @@ class Sequence(Obj): defaulted values existence validation by setting ``"allow_default_values": True`` :ref:`context ` option. + .. warning:: + + Check for default value existence is not performed in + ``evgen_mode``, because previously decoded values are not stored + in memory, to be able to compare them. + Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. """ @@ -5341,9 +5643,8 @@ class Sequence(Obj): def __getstate__(self): return SequenceState( __version__, - self.specs, - {k: copy(v) for k, v in iteritems(self._value)}, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5353,22 +5654,14 @@ class Sequence(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + {k: copy(v) for k, v in iteritems(self._value)}, ) def __setstate__(self, state): super(Sequence, self).__setstate__(state) self.specs = state.specs self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if not isinstance(their, self.__class__): @@ -5439,7 +5732,13 @@ class Sequence(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5456,7 +5755,8 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5500,21 +5800,33 @@ class Sequence(Obj): continue sub_decode_path = decode_path + (name,) try: - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch as err: if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path) - if defined is not None: + if not evgen_mode and defined is not None: defined_by, defined_spec = defined if issubclass(value.__class__, SequenceOf): for i, _value in enumerate(value): @@ -5566,31 +5878,32 @@ class Sequence(Obj): vlen += value_len sub_offset += value_len v = v_tail - if spec.default is not None and value == spec.default: - if ctx_bered or ctx_allow_default_values: - ber_encoded = True - else: - raise DecodeError( - "DEFAULT value met", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - values[name] = value - - spec_defines = getattr(spec, "defines", ()) - if len(spec_defines) == 0: - defines_by_path = ctx.get("defines_by_path", ()) - if len(defines_by_path) > 0: - spec_defines = get_def_by_path(defines_by_path, sub_decode_path) - if spec_defines is not None and len(spec_defines) > 0: - for rel_path, schema in spec_defines: - defined = schema.get(value, None) - if defined is not None: - ctx.setdefault("_defines", []).append(( - abs_decode_path(sub_decode_path[:-1], rel_path), - (value, defined), - )) + if not evgen_mode: + if spec.default is not None and value == spec.default: + # This will not work in evgen_mode + if ctx_bered or ctx_allow_default_values: + ber_encoded = True + else: + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + values[name] = value + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("_defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if lenindef: if v[:EOC_LEN].tobytes() != EOC: raise DecodeError( @@ -5619,7 +5932,7 @@ class Sequence(Obj): obj._value = values obj.lenindef = lenindef obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -5681,15 +5994,22 @@ class Set(Sequence): asn1_type_name = "SET" def _encode(self): - raws = [v.encode() for v in self._values_for_encoding()] - raws.sort() - v = b"".join(raws) + v = b"".join(value.encode() for value in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order"), + )) return b"".join((self.tag, len_encode(len(v)), v)) - def _specs_items(self): - return iteritems(self.specs) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order_cer"), + ): + v.encode_cer(writer) + write_full(writer, EOC) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5706,7 +6026,8 @@ class Set(Sequence): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5742,12 +6063,13 @@ class Set(Sequence): ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) - value_prev = memoryview(v[:0]) + tag_order_prev = (0, 0) + _specs_items = copy(self.specs) while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in self._specs_items(): + for name, spec in iteritems(_specs_items): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -5768,16 +6090,29 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + value_tag_order = value.tag_order value_len = value.fulllen - if value_prev.tobytes() > v[:value_len].tobytes(): + if tag_order_prev >= value_tag_order: if ctx_bered or ctx_allow_unordered_set: ber_encoded = True else: @@ -5799,10 +6134,12 @@ class Set(Sequence): offset=sub_offset, ) values[name] = value - value_prev = v[:value_len] + del _specs_items[name] + tag_order_prev = value_tag_order sub_offset += value_len vlen += value_len v = v_tail + obj = self.__class__( schema=self.specs, impl=self.tag, @@ -5821,7 +6158,6 @@ class Set(Sequence): ) tail = v[EOC_LEN:] obj.lenindef = True - obj._value = values for name, spec in iteritems(self.specs): if name not in values and not spec.optional: raise DecodeError( @@ -5830,27 +6166,17 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) + if not evgen_mode: + obj._value = values obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail -SequenceOfState = namedtuple("SequenceOfState", ( - "version", - "spec", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -), **NAMEDTUPLE_KWARGS) +SequenceOfState = namedtuple( + "SequenceOfState", + BasicState._fields + ("spec", "value", "bound_min", "bound_max"), + **NAMEDTUPLE_KWARGS +) class SequenceOf(Obj): @@ -5951,11 +6277,8 @@ class SequenceOf(Obj): def __getstate__(self): return SequenceOfState( __version__, - self.spec, - [copy(v) for v in self._value], - self._bound_min, - self._bound_max, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5965,6 +6288,10 @@ class SequenceOf(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.spec, + [copy(v) for v in self._value], + self._bound_min, + self._bound_max, ) def __setstate__(self, state): @@ -5973,16 +6300,6 @@ class SequenceOf(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, self.__class__): @@ -6055,7 +6372,22 @@ class SequenceOf(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode( + self, + tlv, + offset, + decode_path, + ctx, + tag_only, + evgen_mode, + ordering_check=False, + ): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -6072,7 +6404,8 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -6106,6 +6439,7 @@ class SequenceOf(Obj): vlen = 0 sub_offset = offset + tlen + llen _value = [] + _value_count = 0 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) ber_encoded = False @@ -6113,15 +6447,27 @@ class SequenceOf(Obj): while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - sub_decode_path = decode_path + (str(len(_value)),) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + sub_decode_path = decode_path + (str(_value_count),) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) value_len = value.fulllen if ordering_check: if value_prev.tobytes() > v[:value_len].tobytes(): @@ -6135,13 +6481,22 @@ class SequenceOf(Obj): offset=sub_offset, ) value_prev = v[:value_len] - _value.append(value) + _value_count += 1 + if not evgen_mode: + _value.append(value) sub_offset += value_len vlen += value_len v = v_tail + if evgen_mode and not self._bound_min <= _value_count <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=_value, + value=None if evgen_mode else _value, schema=spec, bounds=(self._bound_min, self._bound_max), impl=self.tag, @@ -6168,7 +6523,7 @@ class SequenceOf(Obj): obj.lenindef = True tail = v[EOC_LEN:] obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): return "%s[%s]" % ( @@ -6215,18 +6570,23 @@ class SetOf(SequenceOf): asn1_type_name = "SET OF" def _encode(self): - raws = [v.encode() for v in self._values_for_encoding()] - raws.sort() - v = b"".join(raws) + v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted(encode_cer(v) for v in self._values_for_encoding()): + write_full(writer, v) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): return super(SetOf, self)._decode( tlv, offset, decode_path, ctx, tag_only, + evgen_mode, ordering_check=True, ) @@ -6348,14 +6708,22 @@ def main(): # pragma: no cover help="Allow explicit tag out-of-bound", ) parser.add_argument( - "DERFile", + "--evgen", + action="store_true", + help="Turn on event generation mode", + ) + parser.add_argument( + "RAWFile", type=argparse.FileType("rb"), - help="Path to DER file you want to decode", + help="Path to BER/CER/DER file you want to decode", ) args = parser.parse_args() - args.DERFile.seek(args.skip) - der = memoryview(args.DERFile.read()) - args.DERFile.close() + if PY2: + args.RAWFile.seek(args.skip) + raw = memoryview(args.RAWFile.read()) + args.RAWFile.close() + else: + raw = file_mmaped(args.RAWFile)[args.skip:] oid_maps = ( [obj_by_path(_path) for _path in (args.oids or "").split(",")] if args.oids else () @@ -6372,10 +6740,9 @@ def main(): # pragma: no cover } if args.defines_by_path is not None: ctx["defines_by_path"] = obj_by_path(args.defines_by_path) - obj, tail = schema().decode(der, ctx=ctx) from os import environ - print(pprinter( - obj, + pprinter = partial( + pprinter, oid_maps=oid_maps, with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, @@ -6383,7 +6750,13 @@ def main(): # pragma: no cover () if args.decode_path_only is None else tuple(args.decode_path_only.split(":")) ), - )) + ) + if args.evgen: + for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx): + print(pprinter(obj, decode_path=decode_path)) + else: + obj, tail = schema().decode(raw, ctx=ctx) + print(pprinter(obj)) if tail != b"": print("\nTrailing data: %s" % hexenc(tail))