X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=pyderasn.py;h=7bb1799863c454bb22bdbdc70c75411c47635f04;hp=0d3a6637840115c15fdcf90bc898f095f0c640f7;hb=749705c0df79a03dfda53dc0f78044efba8590a6;hpb=517f9ab0bb42c9d89fade2aa69ec5da9501efc0a diff --git a/pyderasn.py b/pyderasn.py index 0d3a663..7bb1799 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # coding: utf-8 +# cython: language_level=3 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures # Copyright (C) 2017-2020 Sergey Matveev # @@ -74,9 +75,17 @@ tags simultaneously. There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc` functions, allowing you to easily create ``CONTEXT`` ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag -number. Pay attention that explicit tags always have *constructed* tag -(``tag_ctxc``), but implicit tags for primitive types are primitive -(``tag_ctxp``). +number. + +.. note:: + + EXPLICIT tags always have **constructed** tag. PyDERASN does not + explicitly check correctness of schema input here. + +.. note:: + + Implicit tags have **primitive** (``tag_ctxp``) encoding for + primitive values. :: @@ -223,6 +232,7 @@ Currently available context options: * :ref:`allow_unordered_set ` * :ref:`bered ` * :ref:`defines_by_path ` +* :ref:`evgen_mode_upto ` .. _pprinting: @@ -343,7 +353,6 @@ Let's parse that output, human:: (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` could be BERed. - .. _definedby: DEFINED BY @@ -419,8 +428,8 @@ defines_by_path context option ______________________________ Sometimes you either can not or do not want to explicitly set *defines* -in the scheme. You can dynamically apply those definitions when calling -``.decode()`` method. +in the schema. You can dynamically apply those definitions when calling +:py:meth:`pyderasn.Obj.decode` method. Specify ``defines_by_path`` key in the :ref:`decode context `. Its value must be sequence of following tuples:: @@ -484,9 +493,9 @@ useful for SEQUENCE/SET OF-s. BER encoding ------------ -By default PyDERASN accepts only DER encoded data. It always encodes to -DER. But you can optionally enable BER decoding with setting ``bered`` -:ref:`context ` argument to True. Indefinite lengths and +By default PyDERASN accepts only DER encoded data. By default it encodes +to DER. But you can optionally enable BER decoding with setting +``bered`` :ref:`context ` argument to True. Indefinite lengths and constructed primitive types should be parsed successfully. * If object is encoded in BER form (not the DER one), then ``ber_encoded`` @@ -525,6 +534,314 @@ lengths will be invalid in that case. This option should be used only for skipping some decode errors, just to see the decoded structure somehow. +.. _streaming: + +Streaming and dealing with huge structures +------------------------------------------ + +.. _evgen_mode: + +evgen mode +__________ + +ASN.1 structures can be huge, they can hold millions of objects inside +(for example Certificate Revocation Lists (CRL), holding revocation +state for every previously issued X.509 certificate). CACert.org's 8 MiB +CRL file takes more than half a gigabyte of memory to hold the decoded +structure. + +If you just simply want to check the signature over the ``tbsCertList``, +you can create specialized schema with that field represented as +OctetString for example:: + + class TBSCertListFast(Sequence): + schema = ( + [...] + ("revokedCertificates", OctetString( + impl=SequenceOf.tag_default, + optional=True, + )), + [...] + ) + +This allows you to quickly decode a few fields and check the signature +over the ``tbsCertList`` bytes. + +But how can you get all certificate's serial number from it, after you +trust that CRL after signature validation? You can use so called +``evgen`` (event generation) mode, to catch the events/facts of some +successful object decoding. Let's use command line capabilities:: + + $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl + 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL + 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13 + 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL + 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE + 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10 + 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY + 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE + 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF + [...] + 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11) + 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08 + 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime + 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08 + 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE + 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14) + 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01 + 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime + 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01 + 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE + [...] + 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime + 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06 + 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE + 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL + 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE + 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13 + 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL + 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE + 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits + 0 [1,4,9145534] CertificateList SEQUENCE + +Here we see how decoder works: it decodes SEQUENCE's tag, length, then +decodes underlying values. It can not tell if SEQUENCE is decoded, so +the event of the upper level SEQUENCE is the last one we see. +``version`` field is just a single INTEGER -- it is decoded and event is +fired immediately. Then we see that ``algorithm`` and ``parameters`` +fields are decoded and only after them the ``signature`` SEQUENCE is +fired as a successfully decoded. There are 4 events for each revoked +certificate entry in that CRL: ``userCertificate`` serial number, +``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself +as a one of entity in ``revokedCertificates`` SEQUENCE OF. + +We can do that in our ordinary Python code and understand where we are +by looking at deterministically generated decode paths (do not forget +about useful ``--print-decode-path`` CLI option). We must use +:py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary +:py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path, +obj, tail)`` tuples:: + + for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw): + if ( + len(decode_path) == 4 and + decode_path[:2] == ("tbsCertList", "revokedCertificates"), + decode_path[3] == "userCertificate" + ): + print("serial number:", int(obj)) + +Virtually it does not take any memory except at least needed for single +object storage. You can easily use that mode to determine required +object ``.offset`` and ``.*len`` to be able to decode it separately, or +maybe verify signature upon it just by taking bytes by ``.offset`` and +``.tlvlen``. + +.. _evgen_mode_upto_ctx: + +evgen_mode_upto +_______________ + +There is full ability to get any kind of data from the CRL in the +example above. However it is not too convenient to get the whole +``RevokedCertificate`` structure, that is pretty lightweight and one may +do not want to disassemble it. You can use ``evgen_mode_upto`` +:ref:`ctx ` option that semantically equals to +:ref:`defines_by_path ` -- list of decode paths +mapped to any non-None value. If specified decode path is met, then any +subsequent objects won't be decoded in evgen mode. That allows us to +parse the CRL above with fully assembled ``RevokedCertificate``:: + + for decode_path, obj, _ in CertificateList().decode_evgen( + crl_raw, + ctx={"evgen_mode_upto": ( + (("tbsCertList", "revokedCertificates", any), True), + )}, + ): + if ( + len(decode_path) == 3 and + decode_path[:2] == ("tbsCertList", "revokedCertificates"), + ): + print("serial number:", int(obj["userCertificate"])) + +.. _mmap: + +mmap-ed file +____________ + +POSIX compliant systems have ``mmap`` syscall, giving ability to work +the memory mapped file. You can deal with the file like it was an +ordinary binary string, allowing you not to load it to the memory first. +Also you can use them as an input for OCTET STRING, taking no Python +memory for their storage. + +There is convenient :py:func:`pyderasn.file_mmaped` function that +creates read-only memoryview on the file contents:: + + with open("huge", "rb") as fd: + raw = file_mmaped(fd) + obj = Something.decode(raw) + +.. warning:: + + mmap-ed files in Python2.7 does not implement buffer protocol, so + memoryview won't work on them. + +.. warning:: + + mmap maps the **whole** file. So it plays no role if you seek-ed it + before. Take the slice of the resulting memoryview with required + offset instead. + +.. note:: + + If you use ZFS as underlying storage, then pay attention that + currently most platforms does not deal good with ZFS ARC and ordinary + page cache used for mmaps. It can take twice the necessary size in + the memory: both in page cache and ZFS ARC. + +CER encoding +____________ + +We can parse any kind of data now, but how can we produce files +streamingly, without storing their encoded representation in memory? +SEQUENCE by default encodes in memory all its values, joins them in huge +binary string, just to know the exact size of SEQUENCE's value for +encoding it in TLV. DER requires you to know all exact sizes of the +objects. + +You can use CER encoding mode, that slightly differs from the DER, but +does not require exact sizes knowledge, allowing streaming encoding +directly to some writer/buffer. Just use +:py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where +encoded data will flow:: + + opener = io.open if PY2 else open + with opener("result", "wb") as fd: + obj.encode_cer(fd.write) + +:: + + buf = io.BytesIO() + obj.encode_cer(buf.write) + +If you do not want to create in-memory buffer every time, then you can +use :py:func:`pyderasn.encode_cer` function:: + + data = encode_cer(obj) + +Remember that CER is **not valid** DER in most cases, so you **have to** +use :ref:`bered ` :ref:`ctx ` option during its +decoding. Also currently there is **no** validation that provided CER is +valid one -- you are sure that it has only valid BER encoding. + +.. warning:: + + SET OF values can not be streamingly encoded, because they are + required to be sorted byte-by-byte. Big SET OF values still will take + much memory. Use neither SET nor SET OF values, as modern ASN.1 + also recommends too. + +Do not forget about using :ref:`mmap-ed ` memoryviews for your +OCTET STRINGs! They will be streamingly copied from underlying file to +the buffer using 1 KB chunks. + +Some structures require that some of the elements have to be forcefully +DER encoded. For example ``SignedData`` CMS requires you to encode +``SignedAttributes`` and X.509 certificates in DER form, allowing you to +encode everything else in BER. You can tell any of the structures to be +forcefully encoded in DER during CER encoding, by specifying +``der_forced=True`` attribute:: + + class Certificate(Sequence): + schema = (...) + der_forced = True + + class SignedAttributes(SetOf): + schema = Attribute() + bounds = (1, 32) + der_forced = True + +agg_octet_string +________________ + +In most cases, huge quantity of binary data is stored as OCTET STRING. +CER encoding splits it on 1 KB chunks. BER allows splitting on various +levels of chunks inclusion:: + + SOME STRING[CONSTRUCTED] + OCTET STRING[CONSTRUCTED] + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[CONSTRUCTED] + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[CONSTRUCTED] + OCTET STRING[CONSTRUCTED] + OCTET STRING[PRIMITIVE] + DATA CHUNK + +You can not just take the offset and some ``.vlen`` of the STRING and +treat it as the payload. If you decode it without +:ref:`evgen mode `, then it will be automatically aggregated +and ``bytes()`` will give the whole payload contents. + +You are forced to use :ref:`evgen mode ` for decoding for +small memory footprint. There is convenient +:py:func:`pyderasn.agg_octet_string` helper for reconstructing the +payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with +huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the +SHA512 digest of its ``eContent``:: + + fd = open("data.p7m", "rb") + raw = file_mmaped(fd) + ctx = {"bered": True} + for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx): + if decode_path == ("content",): + content = obj + break + else: + raise ValueError("no content found") + hasher_state = sha512() + def hasher(data): + hasher_state.update(data) + return len(data) + evgens = SignedData().decode_evgen( + raw[content.offset:], + offset=content.offset, + ctx=ctx, + ) + agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher) + fd.close() + digest = hasher_state.digest() + +Simply replace ``hasher`` with some writeable file's ``fd.write`` to +copy the payload (without BER/CER encoding interleaved overhead) in it. +Virtually it won't take memory more than for keeping small structures +and 1 KB binary chunks. + +SEQUENCE OF iterators +_____________________ + +You can use iterators as a value in :py:class:`pyderasn.SequenceOf` +classes. The only difference with providing the full list of objects, is +that type and bounds checking is done during encoding process. Also +sequence's value will be emptied after encoding, forcing you to set its +value again. + +This is very useful when you have to create some huge objects, like +CRLs, with thousands and millions of entities inside. You can write the +generator taking necessary data from the database and giving the +``RevokedCertificate`` objects. Only binary representation of that +objects will take memory during DER encoding. + Base Obj -------- .. autoclass:: pyderasn.Obj @@ -541,12 +858,12 @@ _______ Integer _______ .. autoclass:: pyderasn.Integer - :members: __init__ + :members: __init__, named BitString _________ .. autoclass:: pyderasn.BitString - :members: __init__ + :members: __init__, bit_len, named OctetString ___________ @@ -578,7 +895,7 @@ _____________ PrintableString _______________ .. autoclass:: pyderasn.PrintableString - :members: __init__ + :members: __init__, allow_asterisk, allow_ampersand UTCTime _______ @@ -588,6 +905,7 @@ _______ GeneralizedTime _______________ .. autoclass:: pyderasn.GeneralizedTime + :members: __init__, todatetime Special types ------------- @@ -595,7 +913,7 @@ Special types Choice ______ .. autoclass:: pyderasn.Choice - :members: __init__ + :members: __init__, choice, value PrimitiveTypes ______________ @@ -633,7 +951,10 @@ Various ------- .. autofunction:: pyderasn.abs_decode_path +.. autofunction:: pyderasn.agg_octet_string .. autofunction:: pyderasn.colonize_hex +.. autofunction:: pyderasn.encode_cer +.. autofunction:: pyderasn.file_mmaped .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -652,8 +973,118 @@ Various .. autoclass:: pyderasn.ObjNotReady .. autoclass:: pyderasn.InvalidValueType .. autoclass:: pyderasn.BoundsError + +.. _cmdline: + +Command-line usage +------------------ + +You can decode DER/BER files using command line abilities:: + + $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file + +If there is no schema for your file, then you can try parsing it without, +but of course IMPLICIT tags will often make it impossible. But result is +good enough for the certificate above:: + + $ python -m pyderasn path/to/file + 0 [1,3,1604] . >: SEQUENCE OF + 4 [1,3,1453] . . >: SEQUENCE OF + 8 [0,0, 5] . . . . >: [0] ANY + . . . . . A0:03:02:01:02 + 13 [1,1, 3] . . . . >: INTEGER 61595 + 18 [1,1, 13] . . . . >: SEQUENCE OF + 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [1,1, 0] . . . . . . >: NULL + 33 [1,3, 274] . . . . >: SEQUENCE OF + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6 + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + [...] + 1409 [1,1, 50] . . . . . . >: SEQUENCE OF + 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1 + 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes + . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16 + . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63 + . . . . . . . . . 61:2E:63:6F:6D:2F + 1461 [1,1, 13] . . >: SEQUENCE OF + 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [1,1, 0] . . . . >: NULL + 1476 [1,2, 129] . . >: BIT STRING 1024 bits + . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + +Human readable OIDs +___________________ + +If you have got dictionaries with ObjectIdentifiers, like example one +from ``tests/test_crts.py``:: + + stroid2name = { + "1.2.840.113549.1.1.1": "id-rsaEncryption", + "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption", + [...] + "2.5.4.10": "id-at-organizationName", + "2.5.4.11": "id-at-organizationalUnitName", + } + +then you can pass it to pretty printer to see human readable OIDs:: + + $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file + [...] + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6) + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + 50 [1,1, 18] . . . . . . >: SET OF + 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8) + 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + 70 [1,1, 18] . . . . . . >: SET OF + 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7) + 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + [...] + +Decode paths +____________ + +Each decoded element has so-called decode path: sequence of structure +names it is passing during the decode process. Each element has its own +unique path inside the whole ASN.1 tree. You can print it out with +``--print-decode-path`` option:: + + $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file + 0 [1,3,1604] Certificate SEQUENCE [] + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate] + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version] + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber] + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature] + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm] + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters] + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer] + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence] + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0] + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0] + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type] + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value] + . . . . . . . 13:02:45:53 + 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6] + [...] + +Now you can print only the specified tree, for example signature algorithm:: + + $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file + 18 [1,1, 13] AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL + . . 05:00 """ +from array import array from codecs import getdecoder from codecs import getencoder from collections import namedtuple @@ -661,10 +1092,14 @@ from collections import OrderedDict from copy import copy from datetime import datetime from datetime import timedelta +from io import BytesIO from math import ceil -from os import environ +from mmap import mmap +from mmap import PROT_READ +from operator import attrgetter from string import ascii_letters from string import digits +from sys import version_info from unicodedata import category as unicat from six import add_metaclass @@ -689,9 +1124,10 @@ except ImportError: # pragma: no cover def colored(what, *args, **kwargs): return what -__version__ = "6.1" +__version__ = "7.0" __all__ = ( + "agg_octet_string", "Any", "BitString", "BMPString", @@ -700,8 +1136,10 @@ __all__ = ( "Choice", "DecodeError", "DecodePathDefBy", + "encode_cer", "Enumerated", "ExceedingData", + "file_mmaped", "GeneralizedTime", "GeneralString", "GraphicString", @@ -763,6 +1201,42 @@ EOC = b"\x00\x00" EOC_LEN = len(EOC) LENINDEF = b"\x80" # length indefinite mark LENINDEF_PP_CHAR = "I" if PY2 else "∞" +NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__} +SET01 = frozenset("01") +DECIMALS = frozenset(digits) +DECIMAL_SIGNS = ".," +NEXT_ATTR_NAME = "next" if PY2 else "__next__" + + +def file_mmaped(fd): + """Make mmap-ed memoryview for reading from file + + :param fd: file object + :returns: memoryview over read-only mmap-ing of the whole file + """ + return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ)) + +def pureint(value): + if not set(value) <= DECIMALS: + raise ValueError("non-pure integer") + return int(value) + +def fractions2float(fractions_raw): + pureint(fractions_raw) + return float("0." + fractions_raw) + + +def get_def_by_path(defines_by_path, sub_decode_path): + """Get define by decode path + """ + for path, define in defines_by_path: + if len(path) != len(sub_decode_path): + continue + for p1, p2 in zip(path, sub_decode_path): + if (not p1 is any) and (p1 != p2): + break + else: + return define ######################################################################## @@ -1044,6 +1518,27 @@ def len_decode(data): return l, 1 + octets_num, data[1 + octets_num:] +LEN1K = len_encode(1000) + + +def write_full(writer, data): + """Fully write provided data + + :param writer: must comply with ``io.RawIOBase.write`` behaviour + + BytesIO does not guarantee that the whole data will be written at + once. That function write everything provided, raising an error if + ``writer`` returns None. + """ + data = memoryview(data) + written = 0 + while written != len(data): + n = writer(data[written:]) + if n is None: + raise ValueError("can not write to buf") + written += n + + ######################################################################## # Base class ######################################################################## @@ -1054,6 +1549,22 @@ class AutoAddSlots(type): return type.__new__(cls, name, bases, _dict) +BasicState = namedtuple("BasicState", ( + "version", + "tag", + "tag_order", + "expl", + "default", + "optional", + "offset", + "llen", + "vlen", + "expl_lenindef", + "lenindef", + "ber_encoded", +), **NAMEDTUPLE_KWARGS) + + @add_metaclass(AutoAddSlots) class Obj(object): """Common ASN.1 object class @@ -1063,6 +1574,7 @@ class Obj(object): """ __slots__ = ( "tag", + "_tag_order", "_value", "_expl", "default", @@ -1087,6 +1599,13 @@ class Obj(object): self._expl = getattr(self, "expl", None) if expl is None else expl if self.tag != self.tag_default and self._expl is not None: raise ValueError("implicit and explicit tags can not be set simultaneously") + if self.tag is None: + self._tag_order = None + else: + tag_class, _, tag_num = tag_decode( + self.tag if self._expl is None else self._expl + ) + self._tag_order = (tag_class, tag_num) if default is not None: optional = True self.optional = optional @@ -1126,17 +1645,27 @@ class Obj(object): def __setstate__(self, state): if state.version != __version__: raise ValueError("data is pickled by different PyDERASN version") - self.tag = self.tag_default - self._value = None - self._expl = None - self.default = None - self.optional = False - self.offset = 0 - self.llen = 0 - self.vlen = 0 - self.expl_lenindef = False - self.lenindef = False - self.ber_encoded = False + self.tag = state.tag + self._tag_order = state.tag_order + self._expl = state.expl + self.default = state.default + self.optional = state.optional + self.offset = state.offset + self.llen = state.llen + self.vlen = state.vlen + self.expl_lenindef = state.expl_lenindef + self.lenindef = state.lenindef + self.ber_encoded = state.ber_encoded + + @property + def tag_order(self): + """Tag's (class, number) used for DER/CER sorting + """ + return self._tag_order + + @property + def tag_order_cer(self): + return self.tag_order @property def tlen(self): @@ -1168,11 +1697,11 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover - raise NotImplementedError() + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover + yield NotImplemented def encode(self): - """Encode the structure + """DER encode the structure :returns: DER representation """ @@ -1181,6 +1710,26 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def encode_cer(self, writer): + """CER encode the structure to specified writer + + :param writer: must comply with ``io.RawIOBase.write`` + behaviour. It takes slice to be written and + returns number of bytes processed. If it returns + None, then exception will be raised + """ + if self._expl is not None: + write_full(writer, self._expl + LENINDEF) + if getattr(self, "der_forced", False): + write_full(writer, self._encode()) + else: + self._encode_cer(writer) + if self._expl is not None: + write_full(writer, EOC) + + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def hexencode(self): """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` """ @@ -1202,32 +1751,77 @@ class Obj(object): :param int offset: initial data's offset :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise + :param decode_path: current decode path (tuples of strings, + possibly with DecodePathDefBy) with will be + the root for all underlying objects :param ctx: optional :ref:`context ` governing decoding process - :param tag_only: decode only the tag, without length and contents - (used only in Choice and Set structures, trying to - determine if tag satisfies the scheme) - :param _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` - before using it? + :param bool tag_only: decode only the tag, without length and + contents (used only in Choice and Set + structures, trying to determine if tag satisfies + the schema) + :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` + before using it? :returns: (Obj, remaining data) .. seealso:: :ref:`decoding` """ + result = next(self.decode_evgen( + data, + offset, + leavemm, + decode_path, + ctx, + tag_only, + _ctx_immutable, + _evgen_mode=False, + )) + if result is None: + return None + _, obj, tail = result + return obj, tail + + def decode_evgen( + self, + data, + offset=0, + leavemm=False, + decode_path=(), + ctx=None, + tag_only=False, + _ctx_immutable=True, + _evgen_mode=True, + ): + """Decode with evgen mode on + + That method is identical to :py:meth:`pyderasn.Obj.decode`, but + it returns the generator producing ``(decode_path, obj, tail)`` + values. See :ref:`evgen mode `. + """ if ctx is None: ctx = {} elif _ctx_immutable: ctx = copy(ctx) tlv = memoryview(data) + if ( + _evgen_mode and + get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None + ): + _evgen_mode = False if self._expl is None: - result = self._decode( - tlv, - offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: - return None - obj, tail = result + for result in self._decode( + tlv, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result else: try: t, tlen, lv = tag_strip(tlv) @@ -1256,16 +1850,20 @@ class Obj(object): ) llen, v = 1, lv[1:] offset += tlen + llen - result = self._decode( - v, - offset=offset, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: raise DecodeError( @@ -1291,16 +1889,20 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - result = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: # pragma: no cover - return None - obj, tail = result + for result in self._decode( + v, + offset=offset + tlen + llen, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + evgen_mode=_evgen_mode, + ): + if tag_only: # pragma: no cover + yield None + return + _decode_path, obj, tail = result + if not _decode_path is decode_path: + yield result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( "explicit tag out-of-bound, longer than data", @@ -1308,7 +1910,7 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - return obj, (tail if leavemm else tail.tobytes()) + yield decode_path, obj, (tail if leavemm else tail.tobytes()) def decod(self, data, offset=0, decode_path=(), ctx=None): """Decode the data, check that tail is empty @@ -1429,6 +2031,16 @@ class Obj(object): ) +def encode_cer(obj): + """Encode to CER in memory buffer + + :returns bytes: memory buffer contents + """ + buf = BytesIO() + obj.encode_cer(buf.write) + return buf.getvalue() + + class DecodePathDefBy(object): """DEFINED BY representation inside decode path """ @@ -1479,7 +2091,7 @@ PP = namedtuple("PP", ( "lenindef", "ber_encoded", "bered", -)) +), **NAMEDTUPLE_KWARGS) def _pp( @@ -1629,9 +2241,9 @@ def pp_console_row( with_colours, )) if with_blob: - if isinstance(pp.blob, binary_type): + if pp.blob.__class__ == binary_type: cols.append(hexenc(pp.blob)) - elif isinstance(pp.blob, tuple): + elif pp.blob.__class__ == tuple: cols.append(", ".join(pp.blob)) if pp.optional: cols.append(_colourize("OPTIONAL", "red", with_colours)) @@ -1651,12 +2263,12 @@ def pp_console_blob(pp, decode_path_len_decrease=0): decode_path_len = len(pp.decode_path) - decode_path_len_decrease if decode_path_len > 0: cols.append(" ." * (decode_path_len + 1)) - if isinstance(pp.blob, binary_type): + if pp.blob.__class__ == binary_type: blob = hexenc(pp.blob).upper() for i in six_xrange(0, len(blob), 32): chunk = blob[i:i + 32] yield " ".join(cols + [colonize_hex(chunk)]) - elif isinstance(pp.blob, tuple): + elif pp.blob.__class__ == tuple: yield " ".join(cols + [", ".join(pp.blob)]) @@ -1667,6 +2279,7 @@ def pprint( with_colours=False, with_decode_path=False, decode_path_only=(), + decode_path=(), ): """Pretty print object @@ -1719,27 +2332,18 @@ def pprint( else: for row in _pprint_pps(pp): yield row - return "\n".join(_pprint_pps(obj.pps())) + return "\n".join(_pprint_pps(obj.pps(decode_path))) ######################################################################## # ASN.1 primitive types ######################################################################## -BooleanState = namedtuple("BooleanState", ( - "version", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +BooleanState = namedtuple( + "BooleanState", + BasicState._fields + ("value",), + **NAMEDTUPLE_KWARGS +) class Boolean(Obj): @@ -1786,7 +2390,7 @@ class Boolean(Obj): self._value = default def _value_sanitize(self, value): - if isinstance(value, bool): + if value.__class__ == bool: return value if issubclass(value.__class__, Boolean): return value._value @@ -1799,8 +2403,8 @@ class Boolean(Obj): def __getstate__(self): return BooleanState( __version__, - self._value, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -1810,21 +2414,12 @@ class Boolean(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, ) def __setstate__(self, state): super(Boolean, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __nonzero__(self): self._assert_ready() @@ -1835,7 +2430,7 @@ class Boolean(Obj): return self._value def __eq__(self, their): - if isinstance(their, bool): + if their.__class__ == bool: return self._value == their if not issubclass(their.__class__, Boolean): return False @@ -1869,7 +2464,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1886,7 +2481,8 @@ class Boolean(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1935,7 +2531,7 @@ class Boolean(Obj): _decoded=(offset, 1, 1), ) obj.ber_encoded = ber_encoded - return obj, v[1:] + yield decode_path, obj, v[1:] def __repr__(self): return pp_console_row(next(self.pps())) @@ -1967,23 +2563,11 @@ class Boolean(Obj): yield pp -IntegerState = namedtuple("IntegerState", ( - "version", - "specs", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +IntegerState = namedtuple( + "IntegerState", + BasicState._fields + ("specs", "value", "bound_min", "bound_max"), + **NAMEDTUPLE_KWARGS +) class Integer(Obj): @@ -2049,7 +2633,7 @@ class Integer(Obj): super(Integer, self).__init__(impl, expl, default, optional, _decoded) self._value = value specs = getattr(self, "schema", {}) if _specs is None else _specs - self.specs = specs if isinstance(specs, dict) else dict(specs) + self.specs = specs if specs.__class__ == dict else dict(specs) self._bound_min, self._bound_max = getattr( self, "bounds", @@ -2073,7 +2657,7 @@ class Integer(Obj): pass elif issubclass(value.__class__, Integer): value = value._value - elif isinstance(value, str): + elif value.__class__ == str: value = self.specs.get(value) if value is None: raise ObjUnknown("integer value: %s" % value) @@ -2090,11 +2674,8 @@ class Integer(Obj): def __getstate__(self): return IntegerState( __version__, - self.specs, - self._value, - self._bound_min, - self._bound_max, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2104,6 +2685,10 @@ class Integer(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + self._value, + self._bound_min, + self._bound_max, ) def __setstate__(self, state): @@ -2112,16 +2697,6 @@ class Integer(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __int__(self): self._assert_ready() @@ -2151,6 +2726,8 @@ class Integer(Obj): @property def named(self): + """Return named representation (if exists) of the value + """ for name, value in iteritems(self.specs): if value == self._value: return name @@ -2217,7 +2794,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2234,7 +2811,8 @@ class Integer(Obj): offset=offset, ) if tag_only: - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2305,7 +2883,7 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -2336,24 +2914,11 @@ class Integer(Obj): yield pp -SET01 = frozenset(("0", "1")) -BitStringState = namedtuple("BitStringState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "tag_constructed", - "defined", -)) +BitStringState = namedtuple( + "BitStringState", + BasicState._fields + ("specs", "value", "tag_constructed", "defined"), + **NAMEDTUPLE_KWARGS +) class BitString(Obj): @@ -2431,7 +2996,7 @@ class BitString(Obj): """ super(BitString, self).__init__(impl, expl, default, optional, _decoded) specs = getattr(self, "schema", {}) if _specs is None else _specs - self.specs = specs if isinstance(specs, dict) else dict(specs) + self.specs = specs if specs.__class__ == dict else dict(specs) self._value = None if value is None else self._value_sanitize(value) if default is not None: default = self._value_sanitize(default) @@ -2477,14 +3042,14 @@ class BitString(Obj): len(value) * 4, hexdec(value + ("" if len(value) % 2 == 0 else "0")), ) - if isinstance(value, binary_type): + if value.__class__ == binary_type: return (len(value) * 8, value) raise InvalidValueType((self.__class__, string_types, binary_type)) - if isinstance(value, tuple): + if value.__class__ == tuple: if ( len(value) == 2 and isinstance(value[0], integer_types) and - isinstance(value[1], binary_type) + value[1].__class__ == binary_type ): return value bits = [] @@ -2511,9 +3076,8 @@ class BitString(Obj): def __getstate__(self): return BitStringState( __version__, - self.specs, - self._value, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2523,6 +3087,8 @@ class BitString(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + self._value, self.tag_constructed, self.defined, ) @@ -2531,16 +3097,6 @@ class BitString(Obj): super(BitString, self).__setstate__(state) self.specs = state.specs self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.tag_constructed = state.tag_constructed self.defined = state.defined @@ -2551,6 +3107,8 @@ class BitString(Obj): @property def bit_len(self): + """Returns number of bits in the string + """ self._assert_ready() return self._value[0] @@ -2559,7 +3117,7 @@ class BitString(Obj): return self._value[1] def __eq__(self, their): - if isinstance(their, bytes): + if their.__class__ == bytes: return self._value[1] == their if not issubclass(their.__class__, BitString): return False @@ -2571,6 +3129,10 @@ class BitString(Obj): @property def named(self): + """Named representation (if exists) of the bits + + :returns: [str(name), ...] + """ return [name for name, bit in iteritems(self.specs) if self[bit]] def __call__( @@ -2591,7 +3153,7 @@ class BitString(Obj): ) def __getitem__(self, key): - if isinstance(key, int): + if key.__class__ == int: bit_len, octets = self._value if key >= bit_len: return False @@ -2616,65 +3178,31 @@ class BitString(Obj): octets, )) - def _decode_chunk(self, lv, offset, decode_path): - try: - l, llen, v = len_decode(lv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l == 0: - raise NotEnoughData( - "zero length", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - pad_size = byte2int(v) - if l == 1 and pad_size != 0: - raise DecodeError( - "invalid empty value", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if pad_size > 7: - raise DecodeError( - "too big pad", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: - raise DecodeError( - "invalid pad", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - v, tail = v[:l], v[l:] - obj = self.__class__( - value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), - impl=self.tag, - expl=self._expl, - default=self.default, - optional=self.optional, - _specs=self.specs, - _decoded=(offset, llen, l), - ) - return obj, tail - - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + bit_len, octets = self._value + if len(octets) + 1 <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 999) * 999, 999): + write_full(writer, b"".join(( + BitString.tag_default, + LEN1K, + int2byte(0), + octets[offset:offset + 999], + ))) + tail = octets[offset+999:] + if len(tail) > 0: + tail = int2byte((8 - bit_len % 8) % 8) + tail + write_full(writer, b"".join(( + BitString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -2686,24 +3214,10 @@ class BitString(Obj): ) if t == self.tag: if tag_only: # pragma: no cover - return None - return self._decode_chunk(lv, offset, decode_path) - if t == self.tag_constructed: - if not ctx.get("bered", False): - raise DecodeError( - "unallowed BER constructed encoding", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: # pragma: no cover - return None - lenindef = False + yield None + return try: l, llen, v = len_decode(lv) - except LenIndefForm: - llen, l, v = 1, 0, lv[1:] - lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -2718,90 +3232,181 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - if not lenindef and l == 0: + if l == 0: raise NotEnoughData( "zero length", klass=self.__class__, decode_path=decode_path, offset=offset, ) - chunks = [] - sub_offset = offset + tlen + llen - vlen = 0 - while True: - if lenindef: - if v[:EOC_LEN].tobytes() == EOC: - break - else: - if vlen == l: - break - if vlen > l: - raise DecodeError( - "chunk out of bounds", - klass=self.__class__, - decode_path=decode_path + (str(len(chunks) - 1),), - offset=chunks[-1].offset, - ) - sub_decode_path = decode_path + (str(len(chunks)),) - try: - chunk, v_tail = BitString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) - except TagMismatch: - raise DecodeError( - "expected BitString encoded chunk", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - chunks.append(chunk) - sub_offset += chunk.tlvlen - vlen += chunk.tlvlen - v = v_tail - if len(chunks) == 0: + pad_size = byte2int(v) + if l == 1 and pad_size != 0: raise DecodeError( - "no chunks", + "invalid empty value", klass=self.__class__, decode_path=decode_path, offset=offset, ) - values = [] - bit_len = 0 - for chunk_i, chunk in enumerate(chunks[:-1]): - if chunk.bit_len % 8 != 0: - raise DecodeError( - "BitString chunk is not multiple of 8 bits", - klass=self.__class__, - decode_path=decode_path + (str(chunk_i),), - offset=chunk.offset, - ) - values.append(bytes(chunk)) - bit_len += chunk.bit_len - chunk_last = chunks[-1] - values.append(bytes(chunk_last)) - bit_len += chunk_last.bit_len + if pad_size > 7: + raise DecodeError( + "too big pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: + raise DecodeError( + "invalid pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + bit_len = (len(v) - 1) * 8 - pad_size obj = self.__class__( - value=(bit_len, b"".join(values)), + value=None if evgen_mode else (bit_len, v[1:].tobytes()), impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, _specs=self.specs, - _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + _decoded=(offset, llen, l), ) - obj.lenindef = lenindef - obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, + if evgen_mode: + obj._value = (bit_len, None) + yield decode_path, obj, tail + return + if t != self.tag_constructed: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not ctx.get("bered", False): + raise DecodeError( + "unallowed BER constructed encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if tag_only: # pragma: no cover + yield None + return + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not lenindef and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + "chunk out of bounds", + klass=self.__class__, + decode_path=decode_path + (str(len(chunks) - 1),), + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + if evgen_mode: + for _decode_path, chunk, v_tail in BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + else: + _, chunk, v_tail = next(BitString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + except TagMismatch: + raise DecodeError( + "expected BitString encoded chunk", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + "no chunks", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + values = [] + bit_len = 0 + for chunk_i, chunk in enumerate(chunks[:-1]): + if chunk.bit_len % 8 != 0: + raise DecodeError( + "BitString chunk is not multiple of 8 bits", + klass=self.__class__, + decode_path=decode_path + (str(chunk_i),), + offset=chunk.offset, + ) + if not evgen_mode: + values.append(bytes(chunk)) + bit_len += chunk.bit_len + chunk_last = chunks[-1] + if not evgen_mode: + values.append(bytes(chunk_last)) + bit_len += chunk_last.bit_len + obj = self.__class__( + value=None if evgen_mode else (bit_len, b"".join(values)), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) + if evgen_mode: + obj._value = (bit_len, None) + obj.lenindef = lenindef + obj.ber_encoded = True + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -2812,7 +3417,7 @@ class BitString(Obj): if self.ready: bit_len, blob = self._value value = "%d bits" % bit_len - if len(self.specs) > 0: + if len(self.specs) > 0 and blob is not None: blob = tuple(self.named) yield _pp( obj=self, @@ -2847,24 +3452,17 @@ class BitString(Obj): yield pp -OctetStringState = namedtuple("OctetStringState", ( - "version", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "tag_constructed", - "defined", -)) +OctetStringState = namedtuple( + "OctetStringState", + BasicState._fields + ( + "value", + "bound_min", + "bound_max", + "tag_constructed", + "defined", + ), + **NAMEDTUPLE_KWARGS +) class OctetString(Obj): @@ -2883,17 +3481,15 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c - .. note:: - - Pay attention that OCTET STRING can be encoded both in primitive - and constructed forms. Decoder always checks constructed form tag - additionally to specified primitive one. If BER decoding is - :ref:`not enabled `, then decoder will fail, because - of DER restrictions. + Memoryviews can be used as a values. If memoryview is made on + mmap-ed file, then it does not take storage inside OctetString + itself. In CER encoding mode it will be streamed to the specified + writer, copying 1 KB chunks. """ __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" + evgen_mode_skip_value = True def __init__( self, @@ -2943,12 +3539,12 @@ class OctetString(Obj): ) def _value_sanitize(self, value): - if isinstance(value, binary_type): + if value.__class__ == binary_type or value.__class__ == memoryview: pass elif issubclass(value.__class__, OctetString): value = value._value else: - raise InvalidValueType((self.__class__, bytes)) + raise InvalidValueType((self.__class__, bytes, memoryview)) if not self._bound_min <= len(value) <= self._bound_max: raise BoundsError(self._bound_min, len(value), self._bound_max) return value @@ -2960,10 +3556,8 @@ class OctetString(Obj): def __getstate__(self): return OctetStringState( __version__, - self._value, - self._bound_min, - self._bound_max, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -2973,6 +3567,9 @@ class OctetString(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, + self._bound_min, + self._bound_max, self.tag_constructed, self.defined, ) @@ -2982,25 +3579,15 @@ class OctetString(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.tag_constructed = state.tag_constructed self.defined = state.defined def __bytes__(self): self._assert_ready() - return self._value + return bytes(self._value) def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their if not issubclass(their.__class__, OctetString): return False @@ -3042,9 +3629,118 @@ class OctetString(Obj): self._value, )) - def _decode_chunk(self, lv, offset, decode_path, ctx): + def _encode_cer(self, writer): + octets = self._value + if len(octets) <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000): + write_full(writer, b"".join(( + OctetString.tag_default, + LEN1K, + octets[offset:offset + 1000], + ))) + tail = octets[offset+1000:] + if len(tail) > 0: + write_full(writer, b"".join(( + OctetString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + yield None + return + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + if evgen_mode and not self._bound_min <= len(v) <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, len(v), self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + obj = self.__class__( + value=( + None if (evgen_mode and self.evgen_mode_skip_value) + else v.tobytes() + ), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, l), + ctx=ctx, + ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + yield decode_path, obj, tail + return + if t != self.tag_constructed: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not ctx.get("bered", False): + raise DecodeError( + "unallowed BER constructed encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if tag_only: + yield None + return + lenindef = False try: l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -3059,16 +3755,81 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - v, tail = v[:l], v[l:] + chunks = [] + chunks_count = 0 + sub_offset = offset + tlen + llen + vlen = 0 + payload_len = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + "chunk out of bounds", + klass=self.__class__, + decode_path=decode_path + (str(len(chunks) - 1),), + offset=chunks[-1].offset, + ) + try: + if evgen_mode: + sub_decode_path = decode_path + (str(chunks_count),) + for _decode_path, chunk, v_tail in OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, chunk, v_tail + if not chunk.ber_encoded: + payload_len += chunk.vlen + chunks_count += 1 + else: + sub_decode_path = decode_path + (str(len(chunks)),) + _, chunk, v_tail = next(OctetString().decode_evgen( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + chunks.append(chunk) + except TagMismatch: + raise DecodeError( + "expected OctetString encoded chunk", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if evgen_mode and not self._bound_min <= payload_len <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=v.tobytes(), + value=( + None if evgen_mode else + b"".join(bytes(chunk) for chunk in chunks) + ), bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, - _decoded=(offset, llen, l), + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ctx=ctx, ) except DecodeError as err: @@ -3085,123 +3846,9 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) - return obj, tail - - def _decode(self, tlv, offset, decode_path, ctx, tag_only): - try: - t, tlen, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t == self.tag: - if tag_only: - return None - return self._decode_chunk(lv, offset, decode_path, ctx) - if t == self.tag_constructed: - if not ctx.get("bered", False): - raise DecodeError( - "unallowed BER constructed encoding", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return None - lenindef = False - try: - l, llen, v = len_decode(lv) - except LenIndefForm: - llen, l, v = 1, 0, lv[1:] - lenindef = True - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - chunks = [] - sub_offset = offset + tlen + llen - vlen = 0 - while True: - if lenindef: - if v[:EOC_LEN].tobytes() == EOC: - break - else: - if vlen == l: - break - if vlen > l: - raise DecodeError( - "chunk out of bounds", - klass=self.__class__, - decode_path=decode_path + (str(len(chunks) - 1),), - offset=chunks[-1].offset, - ) - sub_decode_path = decode_path + (str(len(chunks)),) - try: - chunk, v_tail = OctetString().decode( - v, - offset=sub_offset, - decode_path=sub_decode_path, - leavemm=True, - ctx=ctx, - _ctx_immutable=False, - ) - except TagMismatch: - raise DecodeError( - "expected OctetString encoded chunk", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - chunks.append(chunk) - sub_offset += chunk.tlvlen - vlen += chunk.tlvlen - v = v_tail - try: - obj = self.__class__( - value=b"".join(bytes(chunk) for chunk in chunks), - bounds=(self._bound_min, self._bound_max), - impl=self.tag, - expl=self._expl, - default=self.default, - optional=self.optional, - _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), - ctx=ctx, - ) - except DecodeError as err: - raise DecodeError( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - except BoundsError as err: - raise DecodeError( - msg=str(err), - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - obj.lenindef = lenindef - obj.ber_encoded = True - return obj, (v[EOC_LEN:] if lenindef else v) - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + obj.lenindef = lenindef + obj.ber_encoded = True + yield decode_path, obj, (v[EOC_LEN:] if lenindef else v) def __repr__(self): return pp_console_row(next(self.pps())) @@ -3240,19 +3887,32 @@ class OctetString(Obj): yield pp -NullState = namedtuple("NullState", ( - "version", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +def agg_octet_string(evgens, decode_path, raw, writer): + """Aggregate constructed string (OctetString and its derivatives) + + :param evgens: iterator of generated events + :param decode_path: points to the string we want to decode + :param raw: slicebable (memoryview, bytearray, etc) with + the data evgens are generated on + :param writer: buffer.write where string is going to be saved + :param writer: where string is going to be saved. Must comply + with ``io.RawIOBase.write`` behaviour + """ + decode_path_len = len(decode_path) + for dp, obj, _ in evgens: + if dp[:decode_path_len] != decode_path: + continue + if not obj.ber_encoded: + write_full(writer, raw[ + obj.offset + obj.tlen + obj.llen: + obj.offset + obj.tlen + obj.llen + obj.vlen - + (EOC_LEN if obj.expl_lenindef else 0) + ]) + if len(dp) == decode_path_len: + break + + +NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS) class Null(Obj): @@ -3291,6 +3951,7 @@ class Null(Obj): return NullState( __version__, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3302,19 +3963,6 @@ class Null(Obj): self.ber_encoded, ) - def __setstate__(self, state): - super(Null, self).__setstate__(state) - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded - def __eq__(self, their): if not issubclass(their.__class__, Null): return False @@ -3339,7 +3987,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3356,7 +4004,8 @@ class Null(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -3379,7 +4028,7 @@ class Null(Obj): optional=self.optional, _decoded=(offset, 1, 0), ) - return obj, v + yield decode_path, obj, v def __repr__(self): return pp_console_row(next(self.pps())) @@ -3408,28 +4057,11 @@ class Null(Obj): yield pp -ObjectIdentifierState = namedtuple("ObjectIdentifierState", ( - "version", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "defines", -)) - - -def pureint(value): - i = int(value) - if (value[0] in "+- ") or (value[-1] == " "): - raise ValueError("non-pure integer") - return i +ObjectIdentifierState = namedtuple( + "ObjectIdentifierState", + BasicState._fields + ("value", "defines"), + **NAMEDTUPLE_KWARGS +) class ObjectIdentifier(Obj): @@ -3498,10 +4130,10 @@ class ObjectIdentifier(Obj): self.defines = defines def __add__(self, their): + if their.__class__ == tuple: + return self.__class__(self._value + array("L", their)) if isinstance(their, self.__class__): return self.__class__(self._value + their._value) - if isinstance(their, tuple): - return self.__class__(self._value + their) raise InvalidValueType((self.__class__, tuple)) def _value_sanitize(self, value): @@ -3509,10 +4141,15 @@ class ObjectIdentifier(Obj): return value._value if isinstance(value, string_types): try: - value = tuple(pureint(arc) for arc in value.split(".")) + value = array("L", (pureint(arc) for arc in value.split("."))) except ValueError: raise InvalidOID("unacceptable arcs values") - if isinstance(value, tuple): + if value.__class__ == tuple: + try: + value = array("L", value) + except OverflowError as err: + raise InvalidOID(repr(err)) + if value.__class__ is array: if len(value) < 2: raise InvalidOID("less than 2 arcs") first_arc = value[0] @@ -3535,8 +4172,8 @@ class ObjectIdentifier(Obj): def __getstate__(self): return ObjectIdentifierState( __version__, - self._value, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -3546,22 +4183,13 @@ class ObjectIdentifier(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, self.defines, ) def __setstate__(self, state): super(ObjectIdentifier, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.defines = state.defines def __iter__(self): @@ -3580,8 +4208,8 @@ class ObjectIdentifier(Obj): ) def __eq__(self, their): - if isinstance(their, tuple): - return self._value == their + if their.__class__ == tuple: + return self._value == array("L", their) if not issubclass(their.__class__, ObjectIdentifier): return False return ( @@ -3630,7 +4258,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -3647,7 +4275,8 @@ class ObjectIdentifier(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -3672,7 +4301,7 @@ class ObjectIdentifier(Obj): offset=offset, ) v, tail = v[:l], v[l:] - arcs = [] + arcs = array("L") ber_encoded = False while len(v) > 0: i = 0 @@ -3683,10 +4312,23 @@ class ObjectIdentifier(Obj): if ctx.get("bered", False): ber_encoded = True else: - raise DecodeError("non normalized arc encoding") + raise DecodeError( + "non normalized arc encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) arc = (arc << 7) | (octet & 0x7F) if octet & 0x80 == 0: - arcs.append(arc) + try: + arcs.append(arc) + except OverflowError: + raise DecodeError( + "too huge value for local unsigned long", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) v = v[i + 1:] break i += 1 @@ -3708,7 +4350,7 @@ class ObjectIdentifier(Obj): first_arc = 2 second_arc -= 80 obj = self.__class__( - value=tuple([first_arc, second_arc] + arcs[1:]), + value=array("L", (first_arc, second_arc)) + arcs[1:], impl=self.tag, expl=self._expl, default=self.default, @@ -3717,7 +4359,7 @@ class ObjectIdentifier(Obj): ) if ber_encoded: obj.ber_encoded = True - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) @@ -3816,7 +4458,7 @@ class Enumerated(Integer): def escape_control_unicode(c): - if unicat(c).startswith("C"): + if unicat(c)[0] == "C": c = repr(c).lstrip("u").strip("'") return c @@ -3890,9 +4532,9 @@ class CommonString(OctetString): value_decoded = None if isinstance(value, self.__class__): value_raw = value._value - elif isinstance(value, text_type): + elif value.__class__ == text_type: value_decoded = value - elif isinstance(value, binary_type): + elif value.__class__ == binary_type: value_raw = value else: raise InvalidValueType((self.__class__, text_type, binary_type)) @@ -3916,9 +4558,9 @@ class CommonString(OctetString): return value_raw def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._value == their - if isinstance(their, text_type): + if their.__class__ == text_type: return self._value == their.encode(self.encoding) if not isinstance(their, self.__class__): return False @@ -4009,6 +4651,7 @@ class NumericString(AllowableCharsMixin, CommonString): PrintableStringState = namedtuple( "PrintableStringState", OctetStringState._fields + ("allowable_chars",), + **NAMEDTUPLE_KWARGS ) @@ -4142,11 +4785,6 @@ LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ") LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ") -def fractions2float(fractions_raw): - pureint(fractions_raw) - return float("0." + fractions_raw) - - class VisibleString(CommonString): __slots__ = () tag_default = tag_encode(26) @@ -4154,7 +4792,21 @@ class VisibleString(CommonString): asn1_type_name = "VisibleString" -UTCTimeState = namedtuple("UTCTimeState", OctetStringState._fields + ("ber_raw",)) +UTCTimeState = namedtuple( + "UTCTimeState", + OctetStringState._fields + ("ber_raw",), + **NAMEDTUPLE_KWARGS +) + + +def str_to_time_fractions(value): + v = pureint(value) + year, v = (v // 10**10), (v % 10**10) + month, v = (v // 10**8), (v % 10**8) + day, v = (v // 10**6), (v % 10**6) + hour, v = (v // 10**4), (v % 10**4) + minute, second = (v // 100), (v % 100) + return year, month, day, hour, minute, second class UTCTime(VisibleString): @@ -4171,6 +4823,9 @@ class UTCTime(VisibleString): >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime() datetime.datetime(1957, 9, 30, 22, 7, 50) + If BER encoded value was met, then ``ber_raw`` attribute will hold + its raw representation. + .. warning:: Pay attention that UTCTime can not hold full year, so all years @@ -4184,10 +4839,11 @@ class UTCTime(VisibleString): * minutes are not exceeding 60 * offset value is not exceeding 14 hours """ - __slots__ = ("_ber_raw",) + __slots__ = ("ber_raw",) tag_default = tag_encode(23) encoding = "ascii" asn1_type_name = "UTCTime" + evgen_mode_skip_value = False def __init__( self, @@ -4212,10 +4868,10 @@ class UTCTime(VisibleString): None, None, impl, expl, None, optional, _decoded, ctx, ) self._value = value - self._ber_raw = None + self.ber_raw = None if value is not None: - self._value, self._ber_raw = self._value_sanitize(value, ctx) - self.ber_encoded = self._ber_raw is not None + self._value, self.ber_raw = self._value_sanitize(value, ctx) + self.ber_encoded = self.ber_raw is not None if default is not None: default, _ = self._value_sanitize(default) self.default = self.__class__( @@ -4229,18 +4885,12 @@ class UTCTime(VisibleString): self.optional = optional def _strptime_bered(self, value): - year = pureint(value[:2]) - year += 2000 if year < 50 else 1900 - decoded = datetime( - year, # %Y - pureint(value[2:4]), # %m - pureint(value[4:6]), # %d - pureint(value[6:8]), # %H - pureint(value[8:10]), # %M - ) + year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00") value = value[10:] if len(value) == 0: raise ValueError("no timezone") + year += 2000 if year < 50 else 1900 + decoded = datetime(year, month, day, hour, minute) offset = 0 if value[-1] == "Z": value = value[:-1] @@ -4253,10 +4903,11 @@ class UTCTime(VisibleString): sign = 1 else: raise ValueError("invalid UTC offset") - offset = 60 * pureint(value[-2:]) + v = pureint(value[-4:]) + offset, v = (60 * (v % 100)), v // 100 if offset >= 3600: raise ValueError("invalid UTC offset minutes") - offset += 3600 * pureint(value[-4:-2]) + offset += 3600 * v if offset > 14 * 3600: raise ValueError("too big UTC offset") offset *= sign @@ -4268,8 +4919,7 @@ class UTCTime(VisibleString): seconds = pureint(value) if seconds >= 60: raise ValueError("invalid seconds value") - decoded += timedelta(seconds=seconds) - return offset, decoded + return offset, decoded + timedelta(seconds=seconds) def _strptime(self, value): # datetime.strptime's format: %y%m%d%H%M%SZ @@ -4277,16 +4927,9 @@ class UTCTime(VisibleString): raise ValueError("invalid UTCTime length") if value[-1] != "Z": raise ValueError("non UTC timezone") - year = pureint(value[:2]) + year, month, day, hour, minute, second = str_to_time_fractions(value[:-1]) year += 2000 if year < 50 else 1900 - return datetime( - year, # %y - pureint(value[2:4]), # %m - pureint(value[4:6]), # %d - pureint(value[6:8]), # %H - pureint(value[8:10]), # %M - pureint(value[10:12]), # %S - ) + return datetime(year, month, day, hour, minute, second) def _dt_sanitize(self, value): if value.year < 1950 or value.year > 2049: @@ -4294,7 +4937,7 @@ class UTCTime(VisibleString): return value.replace(microsecond=0) def _value_sanitize(self, value, ctx=None): - if isinstance(value, binary_type): + if value.__class__ == binary_type: try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: @@ -4308,7 +4951,7 @@ class UTCTime(VisibleString): try: offset, _value = self._strptime_bered(value_decoded) _value = _value - timedelta(seconds=offset) - return self._dt_sanitize(_value), value_decoded + return self._dt_sanitize(_value), value except (TypeError, ValueError, OverflowError) as _err: err = _err raise DecodeError( @@ -4317,7 +4960,7 @@ class UTCTime(VisibleString): ) if isinstance(value, self.__class__): return value._value, None - if isinstance(value, datetime): + if value.__class__ == datetime: return self._dt_sanitize(value), None raise InvalidValueType((self.__class__, datetime)) @@ -4325,35 +4968,35 @@ class UTCTime(VisibleString): if self.ready: value = self._value.isoformat() if self.ber_encoded: - value += " (%s)" % self._ber_raw + value += " (%s)" % self.ber_raw return value def __unicode__(self): if self.ready: value = self._value.isoformat() if self.ber_encoded: - value += " (%s)" % self._ber_raw + value += " (%s)" % self.ber_raw return value return text_type(self._pp_value()) def __getstate__(self): return UTCTimeState( *super(UTCTime, self).__getstate__(), - **{"ber_raw": self._ber_raw} + **{"ber_raw": self.ber_raw} ) def __setstate__(self, state): super(UTCTime, self).__setstate__(state) - self._ber_raw = state.ber_raw + self.ber_raw = state.ber_raw def __bytes__(self): self._assert_ready() return self._encode_time() def __eq__(self, their): - if isinstance(their, binary_type): + if their.__class__ == binary_type: return self._encode_time() == their - if isinstance(their, datetime): + if their.__class__ == datetime: return self.todatetime() == their if not isinstance(their, self.__class__): return False @@ -4371,6 +5014,9 @@ class UTCTime(VisibleString): value = self._encode_time() return b"".join((self.tag, len_encode(len(value)), value)) + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def todatetime(self): return self._value @@ -4446,14 +5092,9 @@ class GeneralizedTime(UTCTime): def _strptime_bered(self, value): if len(value) < 4 + 3 * 2: raise ValueError("invalid GeneralizedTime") - decoded = datetime( - pureint(value[:4]), # %Y - pureint(value[4:6]), # %m - pureint(value[6:8]), # %d - pureint(value[8:10]), # %H - ) - value = value[10:] - offset = 0 + year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000") + decoded = datetime(year, month, day, hour) + offset, value = 0, value[10:] if len(value) == 0: return offset, decoded if value[-1] == "Z": @@ -4463,22 +5104,24 @@ class GeneralizedTime(UTCTime): idx = value.rfind(char) if idx == -1: continue - offset_raw = value[idx + 1:].replace(":", "") - if len(offset_raw) not in (2, 4): + offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx] + v = pureint(offset_raw) + if len(offset_raw) == 4: + offset, v = (60 * (v % 100)), v // 100 + if offset >= 3600: + raise ValueError("invalid UTC offset minutes") + elif len(offset_raw) == 2: + pass + else: raise ValueError("invalid UTC offset") - value = value[:idx] - offset = 60 * pureint(offset_raw[2:] or "0") - if offset >= 3600: - raise ValueError("invalid UTC offset minutes") - offset += 3600 * pureint(offset_raw[:2]) + offset += 3600 * v if offset > 14 * 3600: raise ValueError("too big UTC offset") offset *= sign break if len(value) == 0: return offset, decoded - decimal_signs = ".," - if value[0] in decimal_signs: + if value[0] in DECIMAL_SIGNS: return offset, ( decoded + timedelta(seconds=3600 * fractions2float(value[1:])) ) @@ -4488,7 +5131,7 @@ class GeneralizedTime(UTCTime): value = value[2:] if len(value) == 0: return offset, decoded - if value[0] in decimal_signs: + if value[0] in DECIMAL_SIGNS: return offset, ( decoded + timedelta(seconds=60 * fractions2float(value[1:])) ) @@ -4498,7 +5141,7 @@ class GeneralizedTime(UTCTime): value = value[2:] if len(value) == 0: return offset, decoded - if value[0] not in decimal_signs: + if value[0] not in DECIMAL_SIGNS: raise ValueError("invalid format after seconds") return offset, ( decoded + timedelta(microseconds=10**6 * fractions2float(value[1:])) @@ -4510,14 +5153,7 @@ class GeneralizedTime(UTCTime): # datetime.strptime's format: %Y%m%d%H%M%SZ if value[-1] != "Z": raise ValueError("non UTC timezone") - return datetime( - pureint(value[:4]), # %Y - pureint(value[4:6]), # %m - pureint(value[6:8]), # %d - pureint(value[8:10]), # %H - pureint(value[10:12]), # %M - pureint(value[12:14]), # %S - ) + return datetime(*str_to_time_fractions(value[:-1])) if l >= LEN_YYYYMMDDHHMMSSDMZ: # datetime.strptime's format: %Y%m%d%H%M%S.%fZ if value[-1] != "Z": @@ -4531,16 +5167,8 @@ class GeneralizedTime(UTCTime): if us_len > 6: raise ValueError("only microsecond fractions are supported") us = pureint(us + ("0" * (6 - us_len))) - decoded = datetime( - pureint(value[:4]), # %Y - pureint(value[4:6]), # %m - pureint(value[6:8]), # %d - pureint(value[8:10]), # %H - pureint(value[10:12]), # %M - pureint(value[12:14]), # %S - us, # %f - ) - return decoded + year, month, day, hour, minute, second = str_to_time_fractions(value[:14]) + return datetime(year, month, day, hour, minute, second, us) raise ValueError("invalid GeneralizedTime length") def _encode_time(self): @@ -4584,21 +5212,11 @@ class BMPString(CommonString): asn1_type_name = "BMPString" -ChoiceState = namedtuple("ChoiceState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +ChoiceState = namedtuple( + "ChoiceState", + BasicState._fields + ("specs", "value",), + **NAMEDTUPLE_KWARGS +) class Choice(Obj): @@ -4662,7 +5280,7 @@ class Choice(Obj): if len(schema) == 0: raise ValueError("schema must be specified") self.specs = ( - schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + schema if schema.__class__ == OrderedDict else OrderedDict(schema) ) self._value = None if value is not None: @@ -4675,9 +5293,12 @@ class Choice(Obj): self.default = default_obj if value is None: self._value = copy(default_obj._value) + if self._expl is not None: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) def _value_sanitize(self, value): - if isinstance(value, tuple) and len(value) == 2: + if (value.__class__ == tuple) and len(value) == 2: choice, obj = value spec = self.specs.get(choice) if spec is None: @@ -4703,9 +5324,8 @@ class Choice(Obj): def __getstate__(self): return ChoiceState( __version__, - self.specs, - copy(self._value), self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -4715,24 +5335,17 @@ class Choice(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + copy(self._value), ) def __setstate__(self, state): super(Choice, self).__setstate__(state) self.specs = state.specs self._value = state.value - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): - if isinstance(their, tuple) and len(their) == 2: + if (their.__class__ == tuple) and len(their) == 2: return self._value == their if not isinstance(their, self.__class__): return False @@ -4758,14 +5371,27 @@ class Choice(Obj): @property def choice(self): + """Name of the choice + """ self._assert_ready() return self._value[0] @property def value(self): + """Value of underlying choice + """ self._assert_ready() return self._value[1] + @property + def tag_order(self): + self._assert_ready() + return self._value[1].tag_order if self._tag_order is None else self._tag_order + + @property + def tag_order_cer(self): + return min(v.tag_order_cer for v in itervalues(self.specs)) + def __getitem__(self, key): if key not in self.specs: raise ObjUnknown(key) @@ -4796,7 +5422,11 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + self._assert_ready() + self._value[1].encode_cer(writer) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) try: @@ -4819,15 +5449,28 @@ class Choice(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None - value, tail = spec.decode( - tlv, - offset=offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + yield None + return + if evgen_mode: + for _decode_path, value, tail in spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, tail + else: + _, value, tail = next(spec.decode_evgen( + tlv, + offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) obj = self.__class__( schema=self.specs, expl=self._expl, @@ -4836,7 +5479,7 @@ class Choice(Obj): _decoded=(offset, 0, value.fulllen), ) obj._value = (choice, value) - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -4903,27 +5546,18 @@ class PrimitiveTypes(Choice): )) -AnyState = namedtuple("AnyState", ( - "version", - "value", - "tag", - "expl", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", - "defined", -)) +AnyState = namedtuple( + "AnyState", + BasicState._fields + ("value", "defined"), + **NAMEDTUPLE_KWARGS +) class Any(Obj): """``ANY`` special type >>> Any(Integer(-123)) - ANY 020185 + ANY INTEGER -123 (0X:7B) >>> a = Any(OctetString(b"hello world").encode()) ANY 040b68656c6c6f20776f726c64 >>> hexenc(bytes(a)) @@ -4943,28 +5577,47 @@ class Any(Obj): """ :param value: set the value. Either any kind of pyderasn's **ready** object, or bytes. Pay attention that - **no** validation is performed is raw binary value - is valid TLV + **no** validation is performed if raw binary value + is valid TLV, except just tag decoding :param bytes expl: override default tag with ``EXPLICIT`` one :param bool optional: is object ``OPTIONAL`` in sequence """ super(Any, self).__init__(None, expl, None, optional, _decoded) - self._value = None if value is None else self._value_sanitize(value) + if value is None: + self._value = None + else: + value = self._value_sanitize(value) + self._value = value + if self._expl is None: + if value.__class__ == binary_type: + tag_class, _, tag_num = tag_decode(tag_strip(value)[0]) + else: + tag_class, tag_num = value.tag_order + else: + tag_class, _, tag_num = tag_decode(self._expl) + self._tag_order = (tag_class, tag_num) self.defined = None def _value_sanitize(self, value): - if isinstance(value, binary_type): + if value.__class__ == binary_type: + if len(value) == 0: + raise ValueError("Any value can not be empty") return value if isinstance(value, self.__class__): return value._value - if isinstance(value, Obj): - return value.encode() - raise InvalidValueType((self.__class__, Obj, binary_type)) + if not isinstance(value, Obj): + raise InvalidValueType((self.__class__, Obj, binary_type)) + return value @property def ready(self): return self._value is not None + @property + def tag_order(self): + self._assert_ready() + return self._tag_order + @property def bered(self): if self.expl_lenindef or self.lenindef: @@ -4976,9 +5629,10 @@ class Any(Obj): def __getstate__(self): return AnyState( __version__, - self._value, self.tag, + self._tag_order, self._expl, + None, self.optional, self.offset, self.llen, @@ -4986,28 +5640,24 @@ class Any(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self._value, self.defined, ) def __setstate__(self, state): super(Any, self).__setstate__(state) self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded self.defined = state.defined def __eq__(self, their): - if isinstance(their, binary_type): - return self._value == their + if their.__class__ == binary_type: + if self._value.__class__ == binary_type: + return self._value == their + return self._value.encode() == their if issubclass(their.__class__, Any): - return self._value == their._value + if self.ready and their.ready: + return bytes(self) == bytes(their) + return self.ready == their.ready return False def __call__( @@ -5024,7 +5674,10 @@ class Any(Obj): def __bytes__(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return self._value.encode() @property def tlen(self): @@ -5032,9 +5685,20 @@ class Any(Obj): def _encode(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return value.encode() + + def _encode_cer(self, writer): + self._assert_ready() + value = self._value + if value.__class__ == binary_type: + write_full(writer, value) + else: + value.encode_cer(writer) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5071,14 +5735,15 @@ class Any(Obj): chunk_i += 1 tlvlen = tlen + llen + vlen + EOC_LEN obj = self.__class__( - value=tlv[:tlvlen].tobytes(), + value=None if evgen_mode else tlv[:tlvlen].tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.lenindef = True obj.tag = t.tobytes() - return obj, v[EOC_LEN:] + yield decode_path, obj, v[EOC_LEN:] + return except DecodeError as err: raise err.__class__( msg=err.msg, @@ -5096,24 +5761,32 @@ class Any(Obj): tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] obj = self.__class__( - value=v.tobytes(), + value=None if evgen_mode else v.tobytes(), expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), ) obj.tag = t.tobytes() - return obj, tail + yield decode_path, obj, tail def __repr__(self): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): + value = self._value + if value is None: + pass + elif value.__class__ == binary_type: + value = None + else: + value = repr(value) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - blob=self._value if self.ready else None, + value=value, + blob=self._value if self._value.__class__ == binary_type else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -5150,7 +5823,7 @@ def get_def_by_path(defines_by_path, sub_decode_path): if len(path) != len(sub_decode_path): continue for p1, p2 in zip(path, sub_decode_path): - if (p1 != any) and (p1 != p2): + if (not p1 is any) and (p1 != p2): break else: return define @@ -5181,21 +5854,11 @@ def abs_decode_path(decode_path, rel_path): return decode_path + rel_path -SequenceState = namedtuple("SequenceState", ( - "version", - "specs", - "value", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +SequenceState = namedtuple( + "SequenceState", + BasicState._fields + ("specs", "value",), + **NAMEDTUPLE_KWARGS +) class Sequence(Obj): @@ -5286,6 +5949,12 @@ class Sequence(Obj): defaulted values existence validation by setting ``"allow_default_values": True`` :ref:`context ` option. + .. warning:: + + Check for default value existence is not performed in + ``evgen_mode``, because previously decoded values are not stored + in memory, to be able to compare them. + Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. """ @@ -5307,7 +5976,7 @@ class Sequence(Obj): if schema is None: schema = getattr(self, "schema", ()) self.specs = ( - schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + schema if schema.__class__ == OrderedDict else OrderedDict(schema) ) self._value = {} if value is not None: @@ -5350,9 +6019,8 @@ class Sequence(Obj): def __getstate__(self): return SequenceState( __version__, - self.specs, - {k: copy(v) for k, v in iteritems(self._value)}, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5362,22 +6030,14 @@ class Sequence(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.specs, + {k: copy(v) for k, v in iteritems(self._value)}, ) def __setstate__(self, state): super(Sequence, self).__setstate__(state) self.specs = state.specs self._value = state.value - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if not isinstance(their, self.__class__): @@ -5435,22 +6095,26 @@ class Sequence(Obj): return spec.default return None - def _encoded_values(self): - raws = [] + def _values_for_encoding(self): for name, spec in iteritems(self.specs): value = self._value.get(name) if value is None: if spec.optional: continue raise ObjNotReady(name) - raws.append(value.encode()) - return raws + yield value def _encode(self): - v = b"".join(self._encoded_values()) + v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5467,7 +6131,8 @@ class Sequence(Obj): offset=offset, ) if tag_only: # pragma: no cover - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5511,21 +6176,33 @@ class Sequence(Obj): continue sub_decode_path = decode_path + (name,) try: - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) except TagMismatch as err: if (len(err.decode_path) == len(decode_path) + 1) and spec.optional: continue raise defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path) - if defined is not None: + if not evgen_mode and defined is not None: defined_by, defined_spec = defined if issubclass(value.__class__, SequenceOf): for i, _value in enumerate(value): @@ -5577,31 +6254,32 @@ class Sequence(Obj): vlen += value_len sub_offset += value_len v = v_tail - if spec.default is not None and value == spec.default: - if ctx_bered or ctx_allow_default_values: - ber_encoded = True - else: - raise DecodeError( - "DEFAULT value met", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) - values[name] = value - - spec_defines = getattr(spec, "defines", ()) - if len(spec_defines) == 0: - defines_by_path = ctx.get("defines_by_path", ()) - if len(defines_by_path) > 0: - spec_defines = get_def_by_path(defines_by_path, sub_decode_path) - if spec_defines is not None and len(spec_defines) > 0: - for rel_path, schema in spec_defines: - defined = schema.get(value, None) - if defined is not None: - ctx.setdefault("_defines", []).append(( - abs_decode_path(sub_decode_path[:-1], rel_path), - (value, defined), - )) + if not evgen_mode: + if spec.default is not None and value == spec.default: + # This will not work in evgen_mode + if ctx_bered or ctx_allow_default_values: + ber_encoded = True + else: + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) + values[name] = value + spec_defines = getattr(spec, "defines", ()) + if len(spec_defines) == 0: + defines_by_path = ctx.get("defines_by_path", ()) + if len(defines_by_path) > 0: + spec_defines = get_def_by_path(defines_by_path, sub_decode_path) + if spec_defines is not None and len(spec_defines) > 0: + for rel_path, schema in spec_defines: + defined = schema.get(value, None) + if defined is not None: + ctx.setdefault("_defines", []).append(( + abs_decode_path(sub_decode_path[:-1], rel_path), + (value, defined), + )) if lenindef: if v[:EOC_LEN].tobytes() != EOC: raise DecodeError( @@ -5630,7 +6308,7 @@ class Sequence(Obj): obj._value = values obj.lenindef = lenindef obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -5682,8 +6360,8 @@ class Set(Sequence): .. _allow_unordered_set_ctx: DER prohibits unordered values encoding and will raise an error - during decode. If If :ref:`bered ` context option is set, - then no error will occure. Also you can disable strict values + during decode. If :ref:`bered ` context option is set, + then no error will occur. Also you can disable strict values ordering check by setting ``"allow_unordered_set": True`` :ref:`context ` option. """ @@ -5692,15 +6370,22 @@ class Set(Sequence): asn1_type_name = "SET" def _encode(self): - raws = self._encoded_values() - raws.sort() - v = b"".join(raws) + v = b"".join(value.encode() for value in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order"), + )) return b"".join((self.tag, len_encode(len(v)), v)) - def _specs_items(self): - return iteritems(self.specs) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order_cer"), + ): + v.encode_cer(writer) + write_full(writer, EOC) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -5717,7 +6402,8 @@ class Set(Sequence): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -5753,12 +6439,13 @@ class Set(Sequence): ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) - value_prev = memoryview(v[:0]) + tag_order_prev = (0, 0) + _specs_items = copy(self.specs) while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in self._specs_items(): + for name, spec in iteritems(_specs_items): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -5779,16 +6466,29 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) + value_tag_order = value.tag_order value_len = value.fulllen - if value_prev.tobytes() > v[:value_len].tobytes(): + if tag_order_prev >= value_tag_order: if ctx_bered or ctx_allow_unordered_set: ber_encoded = True else: @@ -5810,10 +6510,12 @@ class Set(Sequence): offset=sub_offset, ) values[name] = value - value_prev = v[:value_len] + del _specs_items[name] + tag_order_prev = value_tag_order sub_offset += value_len vlen += value_len v = v_tail + obj = self.__class__( schema=self.specs, impl=self.tag, @@ -5832,35 +6534,25 @@ class Set(Sequence): ) tail = v[EOC_LEN:] obj.lenindef = True - obj._value = values - if not obj.ready: - raise DecodeError( - "not all values are ready", - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + for name, spec in iteritems(self.specs): + if name not in values and not spec.optional: + raise DecodeError( + "%s value is not ready" % name, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not evgen_mode: + obj._value = values obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail -SequenceOfState = namedtuple("SequenceOfState", ( - "version", - "spec", - "value", - "bound_min", - "bound_max", - "tag", - "expl", - "default", - "optional", - "offset", - "llen", - "vlen", - "expl_lenindef", - "lenindef", - "ber_encoded", -)) +SequenceOfState = namedtuple( + "SequenceOfState", + BasicState._fields + ("spec", "value", "bound_min", "bound_max"), + **NAMEDTUPLE_KWARGS +) class SequenceOf(Obj): @@ -5889,9 +6581,21 @@ class SequenceOf(Obj): >>> ints Ints SEQUENCE OF[INTEGER 123, INTEGER 345] - Also you can initialize sequence with preinitialized values: + You can initialize sequence with preinitialized values: >>> ints = Ints([Integer(123), Integer(234)]) + + Also you can use iterator as a value: + + >>> ints = Ints(iter(Integer(i) for i in range(1000000))) + + And it won't be iterated until encoding process. Pay attention that + bounds and required schema checks are done only during the encoding + process in that case! After encode was called, then value is zeroed + back to empty list and you have to set it again. That mode is useful + mainly with CER encoding mode, where all objects from the iterable + will be streamed to the buffer, without copying all of them to + memory first. """ __slots__ = ("spec", "_bound_min", "_bound_max") tag_default = tag_encode(form=TagFormConstructed, num=16) @@ -5935,21 +6639,31 @@ class SequenceOf(Obj): self._value = copy(default_obj._value) def _value_sanitize(self, value): + iterator = False if issubclass(value.__class__, SequenceOf): value = value._value + elif hasattr(value, NEXT_ATTR_NAME): + iterator = True + value = value elif hasattr(value, "__iter__"): value = list(value) else: - raise InvalidValueType((self.__class__, iter)) - if not self._bound_min <= len(value) <= self._bound_max: - raise BoundsError(self._bound_min, len(value), self._bound_max) - for v in value: - if not isinstance(v, self.spec.__class__): - raise InvalidValueType((self.spec.__class__,)) + raise InvalidValueType((self.__class__, iter, "iterator")) + if not iterator: + if not self._bound_min <= len(value) <= self._bound_max: + raise BoundsError(self._bound_min, len(value), self._bound_max) + class_expected = self.spec.__class__ + for v in value: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) return value @property def ready(self): + if hasattr(self._value, NEXT_ATTR_NAME): + return True + if self._bound_min > 0 and len(self._value) == 0: + return False return all(v.ready for v in self._value) @property @@ -5959,13 +6673,12 @@ class SequenceOf(Obj): return any(v.bered for v in self._value) def __getstate__(self): + if hasattr(self._value, NEXT_ATTR_NAME): + raise ValueError("can not pickle SequenceOf with iterator") return SequenceOfState( __version__, - self.spec, - [copy(v) for v in self._value], - self._bound_min, - self._bound_max, self.tag, + self._tag_order, self._expl, self.default, self.optional, @@ -5975,6 +6688,10 @@ class SequenceOf(Obj): self.expl_lenindef, self.lenindef, self.ber_encoded, + self.spec, + [copy(v) for v in self._value], + self._bound_min, + self._bound_max, ) def __setstate__(self, state): @@ -5983,16 +6700,6 @@ class SequenceOf(Obj): self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max - self.tag = state.tag - self._expl = state.expl - self.default = state.default - self.optional = state.optional - self.offset = state.offset - self.llen = state.llen - self.vlen = state.vlen - self.expl_lenindef = state.expl_lenindef - self.lenindef = state.lenindef - self.ber_encoded = state.ber_encoded def __eq__(self, their): if isinstance(their, self.__class__): @@ -6043,11 +6750,9 @@ class SequenceOf(Obj): self._value.append(value) def __iter__(self): - self._assert_ready() return iter(self._value) def __len__(self): - self._assert_ready() return len(self._value) def __setitem__(self, key, value): @@ -6058,14 +6763,58 @@ class SequenceOf(Obj): def __getitem__(self, key): return self._value[key] - def _encoded_values(self): - return [v.encode() for v in self._value] + def _values_for_encoding(self): + return iter(self._value) def _encode(self): - v = b"".join(self._encoded_values()) - return b"".join((self.tag, len_encode(len(v)), v)) + iterator = hasattr(self._value, NEXT_ATTR_NAME) + if iterator: + values = [] + values_append = values.append + class_expected = self.spec.__class__ + values_for_encoding = self._values_for_encoding() + self._value = [] + for v in values_for_encoding: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) + values_append(v.encode()) + if not self._bound_min <= len(values) <= self._bound_max: + raise BoundsError(self._bound_min, len(values), self._bound_max) + value = b"".join(values) + else: + value = b"".join(v.encode() for v in self._values_for_encoding()) + return b"".join((self.tag, len_encode(len(value)), value)) + + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + iterator = hasattr(self._value, NEXT_ATTR_NAME) + if iterator: + class_expected = self.spec.__class__ + values_count = 0 + values_for_encoding = self._values_for_encoding() + self._value = [] + for v in values_for_encoding: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) + v.encode_cer(writer) + values_count += 1 + if not self._bound_min <= values_count <= self._bound_max: + raise BoundsError(self._bound_min, values_count, self._bound_max) + else: + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) - def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False): + def _decode( + self, + tlv, + offset, + decode_path, + ctx, + tag_only, + evgen_mode, + ordering_check=False, + ): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -6082,7 +6831,8 @@ class SequenceOf(Obj): offset=offset, ) if tag_only: - return None + yield None + return lenindef = False ctx_bered = ctx.get("bered", False) try: @@ -6116,6 +6866,7 @@ class SequenceOf(Obj): vlen = 0 sub_offset = offset + tlen + llen _value = [] + _value_count = 0 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False) value_prev = memoryview(v[:0]) ber_encoded = False @@ -6123,15 +6874,27 @@ class SequenceOf(Obj): while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - sub_decode_path = decode_path + (str(len(_value)),) - value, v_tail = spec.decode( - v, - sub_offset, - leavemm=True, - decode_path=sub_decode_path, - ctx=ctx, - _ctx_immutable=False, - ) + sub_decode_path = decode_path + (str(_value_count),) + if evgen_mode: + for _decode_path, value, v_tail in spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + ): + yield _decode_path, value, v_tail + else: + _, value, v_tail = next(spec.decode_evgen( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + _ctx_immutable=False, + _evgen_mode=False, + )) value_len = value.fulllen if ordering_check: if value_prev.tobytes() > v[:value_len].tobytes(): @@ -6145,13 +6908,22 @@ class SequenceOf(Obj): offset=sub_offset, ) value_prev = v[:value_len] - _value.append(value) + _value_count += 1 + if not evgen_mode: + _value.append(value) sub_offset += value_len vlen += value_len v = v_tail + if evgen_mode and not self._bound_min <= _value_count <= self._bound_max: + raise DecodeError( + msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) try: obj = self.__class__( - value=_value, + value=None if evgen_mode else _value, schema=spec, bounds=(self._bound_min, self._bound_max), impl=self.tag, @@ -6178,7 +6950,7 @@ class SequenceOf(Obj): obj.lenindef = True tail = v[EOC_LEN:] obj.ber_encoded = ber_encoded - return obj, tail + yield decode_path, obj, tail def __repr__(self): return "%s[%s]" % ( @@ -6224,19 +6996,32 @@ class SetOf(SequenceOf): tag_default = tag_encode(form=TagFormConstructed, num=17) asn1_type_name = "SET OF" + def _value_sanitize(self, value): + value = super(SetOf, self)._value_sanitize(value) + if hasattr(value, NEXT_ATTR_NAME): + raise ValueError( + "SetOf does not support iterator values, as no sense in them" + ) + return value + def _encode(self): - raws = self._encoded_values() - raws.sort() - v = b"".join(raws) + v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted(encode_cer(v) for v in self._values_for_encoding()): + write_full(writer, v) + write_full(writer, EOC) + + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): return super(SetOf, self)._decode( tlv, offset, decode_path, ctx, tag_only, + evgen_mode, ordering_check=True, ) @@ -6358,14 +7143,22 @@ def main(): # pragma: no cover help="Allow explicit tag out-of-bound", ) parser.add_argument( - "DERFile", + "--evgen", + action="store_true", + help="Turn on event generation mode", + ) + parser.add_argument( + "RAWFile", type=argparse.FileType("rb"), - help="Path to DER file you want to decode", + help="Path to BER/CER/DER file you want to decode", ) args = parser.parse_args() - args.DERFile.seek(args.skip) - der = memoryview(args.DERFile.read()) - args.DERFile.close() + if PY2: + args.RAWFile.seek(args.skip) + raw = memoryview(args.RAWFile.read()) + args.RAWFile.close() + else: + raw = file_mmaped(args.RAWFile)[args.skip:] oid_maps = ( [obj_by_path(_path) for _path in (args.oids or "").split(",")] if args.oids else () @@ -6382,9 +7175,9 @@ def main(): # pragma: no cover } if args.defines_by_path is not None: ctx["defines_by_path"] = obj_by_path(args.defines_by_path) - obj, tail = schema().decode(der, ctx=ctx) - print(pprinter( - obj, + from os import environ + pprinter = partial( + pprinter, oid_maps=oid_maps, with_colours=environ.get("NO_COLOR") is None, with_decode_path=args.print_decode_path, @@ -6392,7 +7185,13 @@ def main(): # pragma: no cover () if args.decode_path_only is None else tuple(args.decode_path_only.split(":")) ), - )) + ) + if args.evgen: + for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx): + print(pprinter(obj, decode_path=decode_path)) + else: + obj, tail = schema().decode(raw, ctx=ctx) + print(pprinter(obj)) if tail != b"": print("\nTrailing data: %s" % hexenc(tail))