X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=pyderasn.py;h=3eb10715ed1400a0bd3c55dccd71419a46e7c499;hp=62776b4bbb7570aabdf37ace1ce6d0e52be14551;hb=HEAD;hpb=3370a2b2433533aaeaf17f0507bb142199cbafcf diff --git a/pyderasn.py b/pyderasn.py index 62776b4..54242a3 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,8 +1,10 @@ #!/usr/bin/env python # coding: utf-8 # cython: language_level=3 -# PyDERASN -- Python ASN.1 DER/BER codec with abstract structures -# Copyright (C) 2017-2020 Sergey Matveev +# pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines +# pylint: disable=too-many-return-statements,too-many-branches,too-many-statements +# PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures +# Copyright (C) 2017-2024 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as @@ -15,10 +17,10 @@ # # You should have received a copy of the GNU Lesser General Public # License along with this program. If not, see . -"""Python ASN.1 DER/BER codec with abstract structures +"""Python ASN.1 DER/CER/BER codec with abstract structures -This library allows you to marshal various structures in ASN.1 DER -format, unmarshal them in BER/CER/DER ones. +This library allows you to marshal various structures in ASN.1 DER/CER +format, unmarshal BER/CER/DER ones. >>> i = Integer(123) >>> raw = i.encode() @@ -232,6 +234,8 @@ Currently available context options: * :ref:`allow_unordered_set ` * :ref:`bered ` * :ref:`defines_by_path ` +* :ref:`evgen_mode_upto ` +* :ref:`keep_memoryview ` .. _pprinting: @@ -352,6 +356,11 @@ Let's parse that output, human:: (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime`` could be BERed. +Also it could be helpful to add quick ASN.1 pprinting command in your +pdb's configuration file:: + + alias pp1 import pyderasn ;; print(pyderasn.pprint(%1, oid_maps=(locals().get("OID_STR_TO_NAME", {}),))) + .. _definedby: DEFINED BY @@ -428,7 +437,7 @@ ______________________________ Sometimes you either can not or do not want to explicitly set *defines* in the schema. You can dynamically apply those definitions when calling -``.decode()`` method. +:py:meth:`pyderasn.Obj.decode` method. Specify ``defines_by_path`` key in the :ref:`decode context `. Its value must be sequence of following tuples:: @@ -492,9 +501,9 @@ useful for SEQUENCE/SET OF-s. BER encoding ------------ -By default PyDERASN accepts only DER encoded data. It always encodes to -DER. But you can optionally enable BER decoding with setting ``bered`` -:ref:`context ` argument to True. Indefinite lengths and +By default PyDERASN accepts only DER encoded data. By default it encodes +to DER. But you can optionally enable BER decoding with setting +``bered`` :ref:`context ` argument to True. Indefinite lengths and constructed primitive types should be parsed successfully. * If object is encoded in BER form (not the DER one), then ``ber_encoded`` @@ -533,6 +542,379 @@ lengths will be invalid in that case. This option should be used only for skipping some decode errors, just to see the decoded structure somehow. +.. _streaming: + +Streaming and dealing with huge structures +------------------------------------------ + +.. _evgen_mode: + +evgen mode +__________ + +ASN.1 structures can be huge, they can hold millions of objects inside +(for example Certificate Revocation Lists (CRL), holding revocation +state for every previously issued X.509 certificate). CACert.org's 8 MiB +CRL file takes more than half a gigabyte of memory to hold the decoded +structure. + +If you just simply want to check the signature over the ``tbsCertList``, +you can create specialized schema with that field represented as +OctetString for example:: + + class TBSCertListFast(Sequence): + schema = ( + [...] + ("revokedCertificates", OctetString( + impl=SequenceOf.tag_default, + optional=True, + )), + [...] + ) + +This allows you to quickly decode a few fields and check the signature +over the ``tbsCertList`` bytes. + +But how can you get all certificate's serial number from it, after you +trust that CRL after signature validation? You can use so called +``evgen`` (event generation) mode, to catch the events/facts of some +successful object decoding. Let's use command line capabilities:: + + $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl + 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL + 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13 + 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL + 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE + 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10 + 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY + 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE + 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF + [...] + 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11) + 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08 + 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime + 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08 + 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE + 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14) + 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01 + 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime + 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01 + 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE + [...] + 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime + 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06 + 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE + 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL + 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE + 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13 + 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL + 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE + 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits + 0 [1,4,9145534] CertificateList SEQUENCE + +Here we see how decoder works: it decodes SEQUENCE's tag, length, then +decodes underlying values. It can not tell if SEQUENCE is decoded, so +the event of the upper level SEQUENCE is the last one we see. +``version`` field is just a single INTEGER -- it is decoded and event is +fired immediately. Then we see that ``algorithm`` and ``parameters`` +fields are decoded and only after them the ``signature`` SEQUENCE is +fired as a successfully decoded. There are 4 events for each revoked +certificate entry in that CRL: ``userCertificate`` serial number, +``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself +as a one of entity in ``revokedCertificates`` SEQUENCE OF. + +We can do that in our ordinary Python code and understand where we are +by looking at deterministically generated decode paths (do not forget +about useful ``--print-decode-path`` CLI option). We must use +:py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary +:py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path, +obj, tail)`` tuples:: + + for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw): + if ( + len(decode_path) == 4 and + decode_path[:2] == ("tbsCertList", "revokedCertificates"), + decode_path[3] == "userCertificate" + ): + print("serial number:", int(obj)) + +Virtually it does not take any memory except at least needed for single +object storage. You can easily use that mode to determine required +object ``.offset`` and ``.*len`` to be able to decode it separately, or +maybe verify signature upon it just by taking bytes by ``.offset`` and +``.tlvlen``. + +.. _evgen_mode_upto_ctx: + +evgen_mode_upto +_______________ + +There is full ability to get any kind of data from the CRL in the +example above. However it is not too convenient to get the whole +``RevokedCertificate`` structure, that is pretty lightweight and one may +do not want to disassemble it. You can use ``evgen_mode_upto`` +:ref:`ctx ` option that semantically equals to +:ref:`defines_by_path ` -- list of decode paths +mapped to any non-None value. If specified decode path is met, then any +subsequent objects won't be decoded in evgen mode. That allows us to +parse the CRL above with fully assembled ``RevokedCertificate``:: + + for decode_path, obj, _ in CertificateList().decode_evgen( + crl_raw, + ctx={"evgen_mode_upto": ( + (("tbsCertList", "revokedCertificates", any), True), + )}, + ): + if ( + len(decode_path) == 3 and + decode_path[:2] == ("tbsCertList", "revokedCertificates"), + ): + print("serial number:", int(obj["userCertificate"])) + +.. note:: + + SEQUENCE/SET values with DEFAULT specified are automatically decoded + without evgen mode. + +.. _mmap: + +mmap-ed file +____________ + +POSIX compliant systems have ``mmap`` syscall, giving ability to work +the memory mapped file. You can deal with the file like it was an +ordinary binary string, allowing you not to load it to the memory first. +Also you can use them as an input for OCTET STRING, taking no Python +memory for their storage. + +There is convenient :py:func:`pyderasn.file_mmaped` function that +creates read-only memoryview on the file contents:: + + with open("huge", "rb") as fd: + raw = file_mmaped(fd) + obj = Something.decode(raw) + +.. warning:: + + mmap maps the **whole** file. So it plays no role if you seek-ed it + before. Take the slice of the resulting memoryview with required + offset instead. + +.. note:: + + If you use ZFS as underlying storage, then pay attention that + currently most platforms does not deal good with ZFS ARC and ordinary + page cache used for mmaps. It can take twice the necessary size in + the memory: both in page cache and ZFS ARC. + +.. _keep_memoryview_ctx: + +That read-only memoryview could be safe to be used as a value inside +decoded :py:class:`pyderasn.OctetString` and :py:class:`pyderasn.Any` +objects. You can enable that by setting `"keep_memoryview": True` in +:ref:`decode context `. No OCTET STRING and ANY values will be +copied to memory. Of course that works only in DER encoding, where the +value is continuously encoded. + +CER encoding +____________ + +We can parse any kind of data now, but how can we produce files +streamingly, without storing their encoded representation in memory? +SEQUENCE by default encodes in memory all its values, joins them in huge +binary string, just to know the exact size of SEQUENCE's value for +encoding it in TLV. DER requires you to know all exact sizes of the +objects. + +You can use CER encoding mode, that slightly differs from the DER, but +does not require exact sizes knowledge, allowing streaming encoding +directly to some writer/buffer. Just use +:py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where +encoded data will flow:: + + with open("result", "wb") as fd: + obj.encode_cer(fd.write) + +:: + + buf = io.BytesIO() + obj.encode_cer(buf.write) + +If you do not want to create in-memory buffer every time, then you can +use :py:func:`pyderasn.encode_cer` function:: + + data = encode_cer(obj) + +Remember that CER is **not valid** DER in most cases, so you **have to** +use :ref:`bered ` :ref:`ctx ` option during its +decoding. Also currently there is **no** validation that provided CER is +valid one -- you are sure that it has only valid BER encoding. + +.. warning:: + + SET OF values can not be streamingly encoded, because they are + required to be sorted byte-by-byte. Big SET OF values still will take + much memory. Use neither SET nor SET OF values, as modern ASN.1 + also recommends too. + +Do not forget about using :ref:`mmap-ed ` memoryviews for your +OCTET STRINGs! They will be streamingly copied from underlying file to +the buffer using 1 KB chunks. + +Some structures require that some of the elements have to be forcefully +DER encoded. For example ``SignedData`` CMS requires you to encode +``SignedAttributes`` and X.509 certificates in DER form, allowing you to +encode everything else in BER. You can tell any of the structures to be +forcefully encoded in DER during CER encoding, by specifying +``der_forced=True`` attribute:: + + class Certificate(Sequence): + schema = (...) + der_forced = True + + class SignedAttributes(SetOf): + schema = Attribute() + bounds = (1, float("+inf")) + der_forced = True + +.. _agg_octet_string: + +agg_octet_string +________________ + +In most cases, huge quantity of binary data is stored as OCTET STRING. +CER encoding splits it on 1 KB chunks. BER allows splitting on various +levels of chunks inclusion:: + + SOME STRING[CONSTRUCTED] + OCTET STRING[CONSTRUCTED] + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[CONSTRUCTED] + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[PRIMITIVE] + DATA CHUNK + OCTET STRING[CONSTRUCTED] + OCTET STRING[CONSTRUCTED] + OCTET STRING[PRIMITIVE] + DATA CHUNK + +You can not just take the offset and some ``.vlen`` of the STRING and +treat it as the payload. If you decode it without +:ref:`evgen mode `, then it will be automatically aggregated +and ``bytes()`` will give the whole payload contents. + +You are forced to use :ref:`evgen mode ` for decoding for +small memory footprint. There is convenient +:py:func:`pyderasn.agg_octet_string` helper for reconstructing the +payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with +huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the +SHA512 digest of its ``eContent``:: + + fd = open("data.p7m", "rb") + raw = file_mmaped(fd) + ctx = {"bered": True} + for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx): + if decode_path == ("content",): + content = obj + break + else: + raise ValueError("no content found") + hasher_state = sha512() + def hasher(data): + hasher_state.update(data) + return len(data) + evgens = SignedData().decode_evgen( + raw[content.offset:], + offset=content.offset, + ctx=ctx, + ) + agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher) + fd.close() + digest = hasher_state.digest() + +Simply replace ``hasher`` with some writeable file's ``fd.write`` to +copy the payload (without BER/CER encoding interleaved overhead) in it. +Virtually it won't take memory more than for keeping small structures +and 1 KB binary chunks. + +.. _seqof-iterators: + +SEQUENCE OF iterators +_____________________ + +You can use iterators as a value in :py:class:`pyderasn.SequenceOf` +classes. The only difference with providing the full list of objects, is +that type and bounds checking is done during encoding process. Also +sequence's value will be emptied after encoding, forcing you to set its +value again. + +This is very useful when you have to create some huge objects, like +CRLs, with thousands and millions of entities inside. You can write the +generator taking necessary data from the database and giving the +``RevokedCertificate`` objects. Only binary representation of that +objects will take memory during DER encoding. + +2-pass DER encoding +------------------- + +There is ability to do 2-pass encoding to DER, writing results directly +to specified writer (buffer, file, whatever). It could be 1.5+ times +slower than ordinary encoding, but it takes little memory for 1st pass +state storing. For example, 1st pass state for CACert.org's CRL with +~416K of certificate entries takes nearly 3.5 MB of memory. +``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes +nearly 0.5 KB of memory. + +If you use :ref:`mmap-ed ` memoryviews, :ref:`SEQUENCE OF +iterators ` and write directly to opened file, then +there is very small memory footprint. + +1st pass traverses through all the objects of the structure and returns +the size of DER encoded structure, together with 1st pass state object. +That state contains precalculated lengths for various objects inside the +structure. + +:: + + fulllen, state = obj.encode1st() + +2nd pass takes the writer and 1st pass state. It traverses through all +the objects again, but writes their encoded representation to the writer. + +:: + + with open("result", "wb") as fd: + obj.encode2nd(fd.write, iter(state)) + +.. warning:: + + You **MUST NOT** use 1st pass state if anything is changed in the + objects. It is intended to be used immediately after 1st pass is + done! + +If you use :ref:`SEQUENCE OF iterators `, then you +have to reinitialize the values after the 1st pass. And you **have to** +be sure that the iterator gives exactly the same values as previously. +Yes, you have to run your iterator twice -- because this is two pass +encoding mode. + +If you want to encode to the memory, then you can use convenient +:py:func:`pyderasn.encode2pass` helper. + +.. _browser: + +ASN.1 browser +------------- +.. autofunction:: pyderasn.browse + Base Obj -------- .. autoclass:: pyderasn.Obj @@ -549,7 +931,7 @@ _______ Integer _______ .. autoclass:: pyderasn.Integer - :members: __init__, named + :members: __init__, named, tohex BitString _________ @@ -588,15 +970,23 @@ _______________ .. autoclass:: pyderasn.PrintableString :members: __init__, allow_asterisk, allow_ampersand +IA5String +_________ +.. autoclass:: pyderasn.IA5String + +VisibleString +_____________ +.. autoclass:: pyderasn.VisibleString + UTCTime _______ .. autoclass:: pyderasn.UTCTime - :members: __init__, todatetime + :members: __init__, todatetime, totzdatetime GeneralizedTime _______________ .. autoclass:: pyderasn.GeneralizedTime - :members: __init__, todatetime + :members: __init__, todatetime, totzdatetime Special types ------------- @@ -642,11 +1032,15 @@ Various ------- .. autofunction:: pyderasn.abs_decode_path +.. autofunction:: pyderasn.agg_octet_string +.. autofunction:: pyderasn.ascii_visualize .. autofunction:: pyderasn.colonize_hex +.. autofunction:: pyderasn.encode2pass .. autofunction:: pyderasn.encode_cer .. autofunction:: pyderasn.file_mmaped .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec +.. autofunction:: pyderasn.hexdump .. autofunction:: pyderasn.tag_encode .. autofunction:: pyderasn.tag_decode .. autofunction:: pyderasn.tag_ctxp @@ -774,8 +1168,7 @@ Now you can print only the specified tree, for example signature algorithm:: . . 05:00 """ -from codecs import getdecoder -from codecs import getencoder +from array import array from collections import namedtuple from collections import OrderedDict from copy import copy @@ -783,37 +1176,27 @@ from datetime import datetime from datetime import timedelta from io import BytesIO from math import ceil -from mmap import mmap -from mmap import PROT_READ from operator import attrgetter from string import ascii_letters from string import digits +from struct import Struct as struct_Struct +from sys import maxsize as sys_maxsize from sys import version_info from unicodedata import category as unicat -from six import add_metaclass -from six import binary_type -from six import byte2int -from six import indexbytes -from six import int2byte -from six import integer_types -from six import iterbytes -from six import iteritems -from six import itervalues -from six import PY2 -from six import string_types -from six import text_type -from six import unichr as six_unichr -from six.moves import xrange as six_xrange - - try: from termcolor import colored except ImportError: # pragma: no cover def colored(what, *args, **kwargs): return what -__version__ = "7.0" +try: + from dateutil.tz import UTC as tzUTC +except ImportError: # pragma: no cover + tzUTC = "missing" + + +__version__ = "9.3" __all__ = ( "agg_octet_string", @@ -823,8 +1206,10 @@ __all__ = ( "Boolean", "BoundsError", "Choice", + "colonize_hex", "DecodeError", "DecodePathDefBy", + "encode2pass", "encode_cer", "Enumerated", "ExceedingData", @@ -889,12 +1274,12 @@ TagClassReprs = { EOC = b"\x00\x00" EOC_LEN = len(EOC) LENINDEF = b"\x80" # length indefinite mark -LENINDEF_PP_CHAR = "I" if PY2 else "∞" +LENINDEF_PP_CHAR = "∞" NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__} SET01 = frozenset("01") DECIMALS = frozenset(digits) DECIMAL_SIGNS = ".," -NEXT_ATTR_NAME = "next" if PY2 else "__next__" +NEXT_ATTR_NAME = "__next__" def file_mmaped(fd): @@ -902,14 +1287,21 @@ def file_mmaped(fd): :param fd: file object :returns: memoryview over read-only mmap-ing of the whole file + + .. warning:: + + It does not work under Windows. """ - return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ)) + import mmap + return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ)) + def pureint(value): if not set(value) <= DECIMALS: raise ValueError("non-pure integer") return int(value) + def fractions2float(fractions_raw): pureint(fractions_raw) return float("0." + fractions_raw) @@ -922,7 +1314,7 @@ def get_def_by_path(defines_by_path, sub_decode_path): if len(path) != len(sub_decode_path): continue for p1, p2 in zip(path, sub_decode_path): - if (not p1 is any) and (p1 != p2): + if (p1 is not any) and (p1 != p2): break else: return define @@ -948,7 +1340,7 @@ class DecodeError(ASN1Error): decoding process has passed :param int offset: binary offset where failure happened """ - super(DecodeError, self).__init__() + super().__init__() self.msg = msg self.klass = klass self.decode_path = decode_path @@ -977,7 +1369,7 @@ class NotEnoughData(DecodeError): class ExceedingData(ASN1Error): def __init__(self, nbytes): - super(ExceedingData, self).__init__() + super().__init__() self.nbytes = nbytes def __str__(self): @@ -1005,7 +1397,7 @@ class InvalidOID(DecodeError): class ObjUnknown(ASN1Error): def __init__(self, name): - super(ObjUnknown, self).__init__() + super().__init__() self.name = name def __str__(self): @@ -1017,7 +1409,7 @@ class ObjUnknown(ASN1Error): class ObjNotReady(ASN1Error): def __init__(self, name): - super(ObjNotReady, self).__init__() + super().__init__() self.name = name def __str__(self): @@ -1029,7 +1421,7 @@ class ObjNotReady(ASN1Error): class InvalidValueType(ASN1Error): def __init__(self, expected_types): - super(InvalidValueType, self).__init__() + super().__init__() self.expected_types = expected_types def __str__(self): @@ -1043,7 +1435,7 @@ class InvalidValueType(ASN1Error): class BoundsError(ASN1Error): def __init__(self, bound_min, value, bound_max): - super(BoundsError, self).__init__() + super().__init__() self.bound_min = bound_min self.value = value self.bound_max = bound_max @@ -1063,20 +1455,16 @@ class BoundsError(ASN1Error): # Basic coders ######################################################################## -_hexdecoder = getdecoder("hex") -_hexencoder = getencoder("hex") - - def hexdec(data): """Binary data to hexadecimal string convert """ - return _hexdecoder(data)[0] + return bytes.fromhex(data) def hexenc(data): """Hexadecimal string to binary data convert """ - return _hexencoder(data)[0].decode("ascii") + return data.hex() def int_bytes_len(num, byte_len=8): @@ -1098,6 +1486,9 @@ def zero_ended_encode(num): return bytes(octets) +int2byte = struct_Struct(">B").pack + + def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive): """Encode tag to binary form @@ -1126,13 +1517,13 @@ def tag_decode(tag): It returns tuple with three integers, as :py:func:`pyderasn.tag_encode` accepts. """ - first_octet = byte2int(tag) + first_octet = tag[0] klass = first_octet & 0xC0 form = first_octet & 0x20 if first_octet & 0x1F < 0x1F: return (klass, form, first_octet & 0x1F) num = 0 - for octet in iterbytes(tag[1:]): + for octet in tag[1:]: num <<= 7 num |= octet & 0x7F return (klass, form, num) @@ -1157,15 +1548,19 @@ def tag_strip(data): """ if len(data) == 0: raise NotEnoughData("no data at all") - if byte2int(data) & 0x1F < 31: + if data[0] & 0x1F < 31: return data[:1], 1, data[1:] i = 0 while True: i += 1 if i == len(data): raise DecodeError("unfinished tag") - if indexbytes(data, i) & 0x80 == 0: + if data[i] & 0x80 == 0: break + if i == 1 and data[1] < 0x1F: + raise DecodeError("unexpected long form") + if i > 1 and data[1] & 0x7F == 0: + raise DecodeError("leading zero byte in tag value") i += 1 return data[:i], i, data[i:] @@ -1175,7 +1570,7 @@ def len_encode(l): return int2byte(l) octets = bytearray(int_bytes_len(l) + 1) octets[0] = 0x80 | (len(octets) - 1) - for i in six_xrange(len(octets) - 1, 0, -1): + for i in range(len(octets) - 1, 0, -1): octets[i] = l & 0xFF l >>= 8 return bytes(octets) @@ -1189,7 +1584,7 @@ def len_decode(data): """ if len(data) == 0: raise NotEnoughData("no data at all") - first_octet = byte2int(data) + first_octet = data[0] if first_octet & 0x80 == 0: return first_octet, 1, data[1:] octets_num = first_octet & 0x7F @@ -1197,23 +1592,51 @@ def len_decode(data): raise NotEnoughData("encoded length is longer than data") if octets_num == 0: raise LenIndefForm() - if byte2int(data[1:]) == 0: + if data[1] == 0: raise DecodeError("leading zeros") l = 0 - for v in iterbytes(data[1:1 + octets_num]): + for v in data[1:1 + octets_num]: l = (l << 8) | v if l <= 127: raise DecodeError("long form instead of short one") return l, 1 + octets_num, data[1 + octets_num:] +LEN0 = len_encode(0) +LEN1 = len_encode(1) LEN1K = len_encode(1000) +def len_size(l): + """How many bytes length field will take + """ + if l < 128: + return 1 + if l < 256: # 1 << 8 + return 2 + if l < 65536: # 1 << 16 + return 3 + if l < 16777216: # 1 << 24 + return 4 + if l < 4294967296: # 1 << 32 + return 5 + if l < 1099511627776: # 1 << 40 + return 6 + if l < 281474976710656: # 1 << 48 + return 7 + if l < 72057594037927936: # 1 << 56 + return 8 + raise OverflowError("too big length") + + def write_full(writer, data): """Fully write provided data - BytesIO does not guarantee that the whole data will be written at once. + :param writer: must comply with ``io.RawIOBase.write`` behaviour + + BytesIO does not guarantee that the whole data will be written at + once. That function write everything provided, raising an error if + ``writer`` returns None. """ data = memoryview(data) written = 0 @@ -1224,6 +1647,17 @@ def write_full(writer, data): written += n +# If it is 64-bit system, then use compact 64-bit array of unsigned +# longs. Use an ordinary list with universal integers otherwise, that +# is slower. +if sys_maxsize > 2 ** 32: + def state_2pass_new(): + return array("L") +else: + def state_2pass_new(): + return [] + + ######################################################################## # Base class ######################################################################## @@ -1231,7 +1665,7 @@ def write_full(writer, data): class AutoAddSlots(type): def __new__(cls, name, bases, _dict): _dict["__slots__"] = _dict.get("__slots__", ()) - return type.__new__(cls, name, bases, _dict) + return super().__new__(cls, name, bases, _dict) BasicState = namedtuple("BasicState", ( @@ -1250,8 +1684,7 @@ BasicState = namedtuple("BasicState", ( ), **NAMEDTUPLE_KWARGS) -@add_metaclass(AutoAddSlots) -class Obj(object): +class Obj(metaclass=AutoAddSlots): """Common ASN.1 object class All ASN.1 types are inherited from it. It has metaclass that @@ -1354,18 +1787,18 @@ class Obj(object): @property def tlen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return len(self.tag) @property def tlvlen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self.tlen + self.llen + self.vlen def __str__(self): # pragma: no cover - return self.__bytes__() if PY2 else self.__unicode__() + return self.__unicode__() def __ne__(self, their): return not(self == their) @@ -1382,11 +1815,20 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover yield NotImplemented + def _encode1st(self, state): + raise NotImplementedError() + + def _encode2nd(self, writer, state_iter): + raise NotImplementedError() + def encode(self): - """Encode the structure + """DER encode the structure :returns: DER representation """ @@ -1395,7 +1837,44 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def encode1st(self, state=None): + """Do the 1st pass of 2-pass encoding + + :rtype: (int, array("L")) + :returns: full length of encoded data and precalculated various + objects lengths + """ + if state is None: + state = state_2pass_new() + if self._expl is None: + return self._encode1st(state) + state.append(0) + idx = len(state) - 1 + vlen, _ = self._encode1st(state) + state[idx] = vlen + fulllen = len(self._expl) + len_size(vlen) + vlen + return fulllen, state + + def encode2nd(self, writer, state_iter): + """Do the 2nd pass of 2-pass encoding + + :param writer: must comply with ``io.RawIOBase.write`` behaviour + :param state_iter: iterator over the 1st pass state (``iter(state)``) + """ + if self._expl is None: + self._encode2nd(writer, state_iter) + else: + write_full(writer, self._expl + len_encode(next(state_iter))) + self._encode2nd(writer, state_iter) + def encode_cer(self, writer): + """CER encode the structure to specified writer + + :param writer: must comply with ``io.RawIOBase.write`` + behaviour. It takes slice to be written and + returns number of bytes processed. If it returns + None, then exception will be raised + """ if self._expl is not None: write_full(writer, self._expl + LENINDEF) if getattr(self, "der_forced", False): @@ -1405,9 +1884,6 @@ class Obj(object): if self._expl is not None: write_full(writer, EOC) - def _encode_cer(self, writer): - write_full(writer, self._encode()) - def hexencode(self): """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` """ @@ -1423,6 +1899,26 @@ class Obj(object): tag_only=False, _ctx_immutable=True, ): + """Decode the data + + :param data: either binary or memoryview + :param int offset: initial data's offset + :param bool leavemm: do we need to leave memoryview of remaining + data as is, or convert it to bytes otherwise + :param decode_path: current decode path (tuples of strings, + possibly with DecodePathDefBy) with will be + the root for all underlying objects + :param ctx: optional :ref:`context ` governing decoding process + :param bool tag_only: decode only the tag, without length and + contents (used only in Choice and Set + structures, trying to determine if tag satisfies + the schema) + :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` + before using it? + :returns: (Obj, remaining data) + + .. seealso:: :ref:`decoding` + """ result = next(self.decode_evgen( data, offset, @@ -1449,21 +1945,12 @@ class Obj(object): _ctx_immutable=True, _evgen_mode=True, ): - """Decode the data - - :param data: either binary or memoryview - :param int offset: initial data's offset - :param bool leavemm: do we need to leave memoryview of remaining - data as is, or convert it to bytes otherwise - :param ctx: optional :ref:`context ` governing decoding process - :param tag_only: decode only the tag, without length and contents - (used only in Choice and Set structures, trying to - determine if tag satisfies the schema) - :param _ctx_immutable: do we need to ``copy.copy()`` ``ctx`` - before using it? - :returns: (Obj, remaining data) + """Decode with evgen mode on - .. seealso:: :ref:`decoding` + That method is identical to :py:meth:`pyderasn.Obj.decode`, but + it returns the generator producing ``(decode_path, obj, tail)`` + values. + .. seealso:: :ref:`evgen mode `. """ if ctx is None: ctx = {} @@ -1488,7 +1975,7 @@ class Obj(object): yield None return _decode_path, obj, tail = result - if not _decode_path is decode_path: + if _decode_path is not decode_path: yield result else: try: @@ -1530,7 +2017,7 @@ class Obj(object): yield None return _decode_path, obj, tail = result - if not _decode_path is decode_path: + if _decode_path is not decode_path: yield result eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] if eoc_expected.tobytes() != EOC: @@ -1569,7 +2056,7 @@ class Obj(object): yield None return _decode_path, obj, tail = result - if not _decode_path is decode_path: + if _decode_path is not decode_path: yield result if obj.tlvlen < l and not ctx.get("allow_expl_oob", False): raise DecodeError( @@ -1612,25 +2099,25 @@ class Obj(object): @property def expled(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self._expl is not None @property def expl_tag(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self._expl @property def expl_tlen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return len(self._expl) @property def expl_llen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ if self.expl_lenindef: return 1 @@ -1638,31 +2125,31 @@ class Obj(object): @property def expl_offset(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self.offset - self.expl_tlen - self.expl_llen @property def expl_vlen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self.tlvlen @property def expl_tlvlen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self.expl_tlen + self.expl_llen + self.expl_vlen @property def fulloffset(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self.expl_offset if self.expled else self.offset @property def fulllen(self): - """See :ref:`decoding` + """.. seealso:: :ref:`decoding` """ return self.expl_tlvlen if self.expled else self.tlvlen @@ -1700,14 +2187,27 @@ class Obj(object): def encode_cer(obj): - """Encode to CER in memory + """Encode to CER in memory buffer + + :returns bytes: memory buffer contents """ buf = BytesIO() obj.encode_cer(buf.write) return buf.getvalue() -class DecodePathDefBy(object): +def encode2pass(obj): + """Encode (2-pass mode) to DER in memory buffer + + :returns bytes: memory buffer contents + """ + buf = BytesIO() + _, state = obj.encode1st() + obj.encode2nd(buf.write, iter(state)) + return buf.getvalue() + + +class DecodePathDefBy: """DEFINED BY representation inside decode path """ __slots__ = ("defined_by",) @@ -1817,7 +2317,16 @@ def _colourize(what, colour, with_colours, attrs=("bold",)): def colonize_hex(hexed): """Separate hexadecimal string with colons """ - return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2)) + return ":".join(hexed[i:i + 2] for i in range(0, len(hexed), 2)) + + +def find_oid_name(asn1_type_name, oid_maps, value): + if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name: + for oid_map in oid_maps: + oid_name = oid_map.get(value) + if oid_name is not None: + return oid_name + return None def pp_console_row( @@ -1843,9 +2352,7 @@ def pp_console_row( col += _colourize("B", "red", with_colours) if pp.bered else " " cols.append(col) col = "[%d,%d,%4d]%s" % ( - pp.tlen, - pp.llen, - pp.vlen, + pp.tlen, pp.llen, pp.vlen, LENINDEF_PP_CHAR if pp.lenindef else " " ) col = _colourize(col, "green", with_colours, ()) @@ -1857,19 +2364,11 @@ def pp_console_row( if isinstance(ent, DecodePathDefBy): cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",))) value = str(ent.defined_by) - oid_name = None - if ( - len(oid_maps) > 0 and - ent.defined_by.asn1_type_name == - ObjectIdentifier.asn1_type_name - ): - for oid_map in oid_maps: - oid_name = oid_map.get(value) - if oid_name is not None: - cols.append(_colourize("%s:" % oid_name, "green", with_colours)) - break + oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value) if oid_name is None: cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",))) + else: + cols.append(_colourize("%s:" % oid_name, "green", with_colours)) else: cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",))) if pp.expl is not None: @@ -1888,26 +2387,15 @@ def pp_console_row( if pp.value is not None: value = pp.value cols.append(_colourize(value, "white", with_colours, ("reverse",))) - if ( - len(oid_maps) > 0 and - pp.asn1_type_name == ObjectIdentifier.asn1_type_name - ): - for oid_map in oid_maps: - oid_name = oid_map.get(value) - if oid_name is not None: - cols.append(_colourize("(%s)" % oid_name, "green", with_colours)) - break + oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value) + if oid_name is not None: + cols.append(_colourize("(%s)" % oid_name, "green", with_colours)) if pp.asn1_type_name == Integer.asn1_type_name: - hex_repr = hex(int(pp.obj._value))[2:].upper() - if len(hex_repr) % 2 != 0: - hex_repr = "0" + hex_repr cols.append(_colourize( - "(%s)" % colonize_hex(hex_repr), - "green", - with_colours, + "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours, )) if with_blob: - if pp.blob.__class__ == binary_type: + if pp.blob.__class__ == bytes: cols.append(hexenc(pp.blob)) elif pp.blob.__class__ == tuple: cols.append(", ".join(pp.blob)) @@ -1929,9 +2417,9 @@ def pp_console_blob(pp, decode_path_len_decrease=0): decode_path_len = len(pp.decode_path) - decode_path_len_decrease if decode_path_len > 0: cols.append(" ." * (decode_path_len + 1)) - if pp.blob.__class__ == binary_type: + if pp.blob.__class__ == bytes: blob = hexenc(pp.blob).upper() - for i in six_xrange(0, len(blob), 32): + for i in range(0, len(blob), 32): chunk = blob[i:i + 32] yield " ".join(cols + [colonize_hex(chunk)]) elif pp.blob.__class__ == tuple: @@ -1950,7 +2438,7 @@ def pprint( """Pretty print object :param Obj obj: object you want to pretty print - :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary. + :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries. Its human readable form is printed when OID is met :param big_blobs: if large binary objects are met (like OctetString values), do we need to print them too, on separate @@ -2043,7 +2531,7 @@ class Boolean(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(Boolean, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) self._value = None if value is None else self._value_sanitize(value) if default is not None: default = self._value_sanitize(default) @@ -2084,7 +2572,7 @@ class Boolean(Obj): ) def __setstate__(self, state): - super(Boolean, self).__setstate__(state) + super().__setstate__(state) self._value = state.value def __nonzero__(self): @@ -2124,11 +2612,14 @@ class Boolean(Obj): def _encode(self): self._assert_ready() - return b"".join(( - self.tag, - len_encode(1), - (b"\xFF" if self._value else b"\x00"), - )) + return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00"))) + + def _encode1st(self, state): + return len(self.tag) + 2, state + + def _encode2nd(self, writer, state_iter): + self._assert_ready() + write_full(writer, self._encode()) def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: @@ -2172,7 +2663,7 @@ class Boolean(Obj): decode_path=decode_path, offset=offset, ) - first_octet = byte2int(v) + first_octet = v[0] ber_encoded = False if first_octet == 0: value = False @@ -2296,7 +2787,7 @@ class Integer(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(Integer, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) self._value = value specs = getattr(self, "schema", {}) if _specs is None else _specs self.specs = specs if specs.__class__ == dict else dict(specs) @@ -2319,7 +2810,7 @@ class Integer(Obj): self._value = default def _value_sanitize(self, value): - if isinstance(value, integer_types): + if isinstance(value, int): pass elif issubclass(value.__class__, Integer): value = value._value @@ -2358,7 +2849,7 @@ class Integer(Obj): ) def __setstate__(self, state): - super(Integer, self).__setstate__(state) + super().__setstate__(state) self.specs = state.specs self._value = state.value self._bound_min = state.bound_min @@ -2368,16 +2859,26 @@ class Integer(Obj): self._assert_ready() return int(self._value) + def tohex(self): + """Hexadecimal representation + + Use :py:func:`pyderasn.colonize_hex` for colonizing it. + """ + hex_repr = hex(int(self))[2:].upper() + if len(hex_repr) % 2 != 0: + hex_repr = "0" + hex_repr + return hex_repr + def __hash__(self): self._assert_ready() - return hash( - self.tag + - bytes(self._expl or b"") + + return hash(b"".join(( + self.tag, + bytes(self._expl or b""), str(self._value).encode("ascii"), - ) + ))) def __eq__(self, their): - if isinstance(their, integer_types): + if isinstance(their, int): return self._value == their if not issubclass(their.__class__, Integer): return False @@ -2394,7 +2895,7 @@ class Integer(Obj): def named(self): """Return named representation (if exists) of the value """ - for name, value in iteritems(self.specs): + for name, value in self.specs.items(): if value == self._value: return name return None @@ -2421,45 +2922,30 @@ class Integer(Obj): _specs=self.specs, ) - def _encode(self): + def _encode_payload(self): self._assert_ready() value = self._value - if PY2: - if value == 0: - octets = bytearray([0]) - elif value < 0: - value = -value - value -= 1 - octets = bytearray() - while value > 0: - octets.append((value & 0xFF) ^ 0xFF) - value >>= 8 - if len(octets) == 0 or octets[-1] & 0x80 == 0: - octets.append(0xFF) + bytes_len = ceil(value.bit_length() / 8) or 1 + while True: + try: + octets = value.to_bytes(bytes_len, byteorder="big", signed=True) + except OverflowError: + bytes_len += 1 else: - octets = bytearray() - while value > 0: - octets.append(value & 0xFF) - value >>= 8 - if octets[-1] & 0x80 > 0: - octets.append(0x00) - octets.reverse() - octets = bytes(octets) - else: - bytes_len = ceil(value.bit_length() / 8) or 1 - while True: - try: - octets = value.to_bytes( - bytes_len, - byteorder="big", - signed=True, - ) - except OverflowError: - bytes_len += 1 - else: - break + break + return octets + + def _encode(self): + octets = self._encode_payload() return b"".join((self.tag, len_encode(len(octets)), octets)) + def _encode1st(self, state): + l = len(self._encode_payload()) + return len(self.tag) + len_size(l) + l, state + + def _encode2nd(self, writer, state_iter): + write_full(writer, self._encode()) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) @@ -2503,9 +2989,9 @@ class Integer(Obj): offset=offset, ) v, tail = v[:l], v[l:] - first_octet = byte2int(v) + first_octet = v[0] if l > 1: - second_octet = byte2int(v[1:]) + second_octet = v[1] if ( ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or ((first_octet == 0xFF) and (second_octet & 0x80 != 0)) @@ -2516,21 +3002,7 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) - if PY2: - value = 0 - if first_octet & 0x80 > 0: - octets = bytearray() - for octet in bytearray(v): - octets.append(octet ^ 0xFF) - for octet in octets: - value = (value << 8) | octet - value += 1 - value = -value - else: - for octet in bytearray(v): - value = (value << 8) | octet - else: - value = int.from_bytes(v, byteorder="big", signed=True) + value = int.from_bytes(v, byteorder="big", signed=True) try: obj = self.__class__( value=value, @@ -2660,7 +3132,7 @@ class BitString(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(BitString, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) specs = getattr(self, "schema", {}) if _specs is None else _specs self.specs = specs if specs.__class__ == dict else dict(specs) self._value = None if value is None else self._value_sanitize(value) @@ -2687,14 +3159,14 @@ class BitString(Obj): bit_len = len(bits) bits += "0" * ((8 - (bit_len % 8)) % 8) octets = bytearray(len(bits) // 8) - for i in six_xrange(len(octets)): + for i in range(len(octets)): octets[i] = int(bits[i * 8:(i * 8) + 8], 2) return bit_len, bytes(octets) def _value_sanitize(self, value): - if isinstance(value, (string_types, binary_type)): + if isinstance(value, (str, bytes)): if ( - isinstance(value, string_types) and + isinstance(value, str) and value.startswith("'") ): if value.endswith("'B"): @@ -2708,14 +3180,14 @@ class BitString(Obj): len(value) * 4, hexdec(value + ("" if len(value) % 2 == 0 else "0")), ) - if value.__class__ == binary_type: + if value.__class__ == bytes: return (len(value) * 8, value) - raise InvalidValueType((self.__class__, string_types, binary_type)) + raise InvalidValueType((self.__class__, str, bytes)) if value.__class__ == tuple: if ( len(value) == 2 and - isinstance(value[0], integer_types) and - value[1].__class__ == binary_type + isinstance(value[0], int) and + value[1].__class__ == bytes ): return value bits = [] @@ -2729,11 +3201,11 @@ class BitString(Obj): bits = frozenset(bits) return self._bits2octets("".join( ("1" if bit in bits else "0") - for bit in six_xrange(max(bits) + 1) + for bit in range(max(bits) + 1) )) if issubclass(value.__class__, BitString): return value._value - raise InvalidValueType((self.__class__, binary_type, string_types)) + raise InvalidValueType((self.__class__, bytes, str)) @property def ready(self): @@ -2760,7 +3232,7 @@ class BitString(Obj): ) def __setstate__(self, state): - super(BitString, self).__setstate__(state) + super().__setstate__(state) self.specs = state.specs self._value = state.value self.tag_constructed = state.tag_constructed @@ -2768,7 +3240,7 @@ class BitString(Obj): def __iter__(self): self._assert_ready() - for i in six_xrange(self._value[0]): + for i in range(self._value[0]): yield self[i] @property @@ -2799,7 +3271,7 @@ class BitString(Obj): :returns: [str(name), ...] """ - return [name for name, bit in iteritems(self.specs) if self[bit]] + return [name for name, bit in self.specs.items() if self[bit]] def __call__( self, @@ -2823,11 +3295,8 @@ class BitString(Obj): bit_len, octets = self._value if key >= bit_len: return False - return ( - byte2int(memoryview(octets)[key // 8:]) >> - (7 - (key % 8)) - ) & 1 == 1 - if isinstance(key, string_types): + return memoryview(octets)[key // 8] >> (7 - (key % 8)) & 1 == 1 + if isinstance(key, str): value = self.specs.get(key) if value is None: raise ObjUnknown("BitString value: %s" % key) @@ -2844,6 +3313,21 @@ class BitString(Obj): octets, )) + def _encode1st(self, state): + self._assert_ready() + _, octets = self._value + l = len(octets) + 1 + return len(self.tag) + len_size(l) + l, state + + def _encode2nd(self, writer, state_iter): + bit_len, octets = self._value + write_full(writer, b"".join(( + self.tag, + len_encode(len(octets) + 1), + int2byte((8 - bit_len % 8) % 8), + ))) + write_full(writer, octets) + def _encode_cer(self, writer): bit_len, octets = self._value if len(octets) + 1 <= 1000: @@ -2851,14 +3335,14 @@ class BitString(Obj): return write_full(writer, self.tag_constructed) write_full(writer, LENINDEF) - for offset in six_xrange(0, (len(octets) // 999) * 999, 999): + for offset in range(0, (len(octets) // 999) * 999, 999): write_full(writer, b"".join(( BitString.tag_default, LEN1K, - int2byte(0), + b"\x00", octets[offset:offset + 999], ))) - tail = octets[offset+999:] + tail = octets[offset + 999:] if len(tail) > 0: tail = int2byte((8 - bit_len % 8) % 8) + tail write_full(writer, b"".join(( @@ -2905,7 +3389,7 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - pad_size = byte2int(v) + pad_size = v[0] if l == 1 and pad_size != 0: raise DecodeError( "invalid empty value", @@ -2920,7 +3404,7 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: + if v[l - 1] & ((1 << pad_size) - 1) != 0: raise DecodeError( "invalid pad", klass=self.__class__, @@ -3156,6 +3640,7 @@ class OctetString(Obj): tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" evgen_mode_skip_value = True + memoryview_safe = True def __init__( self, @@ -3178,7 +3663,7 @@ class OctetString(Obj): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(OctetString, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) self._value = value self._bound_min, self._bound_max = getattr( self, @@ -3205,7 +3690,7 @@ class OctetString(Obj): ) def _value_sanitize(self, value): - if value.__class__ == binary_type or value.__class__ == memoryview: + if value.__class__ == bytes or value.__class__ == memoryview: pass elif issubclass(value.__class__, OctetString): value = value._value @@ -3241,7 +3726,7 @@ class OctetString(Obj): ) def __setstate__(self, state): - super(OctetString, self).__setstate__(state) + super().__setstate__(state) self._value = state.value self._bound_min = state.bound_min self._bound_max = state.bound_max @@ -3252,8 +3737,12 @@ class OctetString(Obj): self._assert_ready() return bytes(self._value) + def memoryview(self): + self._assert_ready() + return memoryview(self._value) + def __eq__(self, their): - if their.__class__ == binary_type: + if their.__class__ == bytes: return self._value == their if not issubclass(their.__class__, OctetString): return False @@ -3295,6 +3784,16 @@ class OctetString(Obj): self._value, )) + def _encode1st(self, state): + self._assert_ready() + l = len(self._value) + return len(self.tag) + len_size(l) + l, state + + def _encode2nd(self, writer, state_iter): + value = self._value + write_full(writer, self.tag + len_encode(len(value))) + write_full(writer, value) + def _encode_cer(self, writer): octets = self._value if len(octets) <= 1000: @@ -3302,13 +3801,13 @@ class OctetString(Obj): return write_full(writer, self.tag_constructed) write_full(writer, LENINDEF) - for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000): + for offset in range(0, (len(octets) // 1000) * 1000, 1000): write_full(writer, b"".join(( OctetString.tag_default, LEN1K, octets[offset:offset + 1000], ))) - tail = octets[offset+1000:] + tail = octets[offset + 1000:] if len(tail) > 0: write_full(writer, b"".join(( OctetString.tag_default, @@ -3355,12 +3854,15 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) + if evgen_mode and self.evgen_mode_skip_value: + value = None + elif self.memoryview_safe and ctx.get("keep_memoryview", False): + value = v + else: + value = v.tobytes() try: obj = self.__class__( - value=( - None if (evgen_mode and self.evgen_mode_skip_value) - else v.tobytes() - ), + value=value, bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -3559,8 +4061,12 @@ def agg_octet_string(evgens, decode_path, raw, writer): :param evgens: iterator of generated events :param decode_path: points to the string we want to decode :param raw: slicebable (memoryview, bytearray, etc) with - the data evgens are generated one + the data evgens are generated on :param writer: buffer.write where string is going to be saved + :param writer: where string is going to be saved. Must comply + with ``io.RawIOBase.write`` behaviour + + .. seealso:: :ref:`agg_octet_string` """ decode_path_len = len(decode_path) for dp, obj, _ in evgens: @@ -3604,7 +4110,7 @@ class Null(Obj): :param bytes expl: override default tag with ``EXPLICIT`` one :param bool optional: is object ``OPTIONAL`` in sequence """ - super(Null, self).__init__(impl, expl, None, optional, _decoded) + super().__init__(impl, expl, None, optional, _decoded) self.default = None @property @@ -3649,7 +4155,13 @@ class Null(Obj): ) def _encode(self): - return self.tag + len_encode(0) + return self.tag + LEN0 + + def _encode1st(self, state): + return len(self.tag) + 1, state + + def _encode2nd(self, writer, state_iter): + write_full(writer, self.tag + LEN0) def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: @@ -3772,13 +4284,15 @@ class ObjectIdentifier(Obj): tuple element is ``{OID: pyderasn.Obj()}`` dictionary, mapping between current OID value and structure applied to defined field. - :ref:`Read about DEFINED BY ` + + .. seealso:: :ref:`definedby` + :param bytes impl: override default tag with ``IMPLICIT`` one :param bytes expl: override default tag with ``EXPLICIT`` one :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) self._value = value if value is not None: self._value = self._value_sanitize(value) @@ -3795,7 +4309,7 @@ class ObjectIdentifier(Obj): def __add__(self, their): if their.__class__ == tuple: - return self.__class__(self._value + their) + return self.__class__(self._value + array("L", their)) if isinstance(their, self.__class__): return self.__class__(self._value + their._value) raise InvalidValueType((self.__class__, tuple)) @@ -3803,12 +4317,17 @@ class ObjectIdentifier(Obj): def _value_sanitize(self, value): if issubclass(value.__class__, ObjectIdentifier): return value._value - if isinstance(value, string_types): + if isinstance(value, str): try: - value = tuple(pureint(arc) for arc in value.split(".")) + value = array("L", (pureint(arc) for arc in value.split("."))) except ValueError: raise InvalidOID("unacceptable arcs values") if value.__class__ == tuple: + try: + value = array("L", value) + except OverflowError as err: + raise InvalidOID(repr(err)) + if value.__class__ is array: if len(value) < 2: raise InvalidOID("less than 2 arcs") first_arc = value[0] @@ -3847,7 +4366,7 @@ class ObjectIdentifier(Obj): ) def __setstate__(self, state): - super(ObjectIdentifier, self).__setstate__(state) + super().__setstate__(state) self._value = state.value self.defines = state.defines @@ -3860,15 +4379,15 @@ class ObjectIdentifier(Obj): def __hash__(self): self._assert_ready() - return hash( - self.tag + - bytes(self._expl or b"") + + return hash(b"".join(( + self.tag, + bytes(self._expl or b""), str(self._value).encode("ascii"), - ) + ))) def __eq__(self, their): if their.__class__ == tuple: - return self._value == their + return self._value == array("L", their) if not issubclass(their.__class__, ObjectIdentifier): return False return ( @@ -3898,7 +4417,7 @@ class ObjectIdentifier(Obj): optional=self.optional if optional is None else optional, ) - def _encode(self): + def _encode_octets(self): self._assert_ready() value = self._value first_value = value[1] @@ -3914,9 +4433,19 @@ class ObjectIdentifier(Obj): octets = [zero_ended_encode(first_value)] for arc in value[2:]: octets.append(zero_ended_encode(arc)) - v = b"".join(octets) + return b"".join(octets) + + def _encode(self): + v = self._encode_octets() return b"".join((self.tag, len_encode(len(v)), v)) + def _encode1st(self, state): + l = len(self._encode_octets()) + return len(self.tag) + len_size(l) + l, state + + def _encode2nd(self, writer, state_iter): + write_full(writer, self._encode()) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, _, lv = tag_strip(tlv) @@ -3960,21 +4489,34 @@ class ObjectIdentifier(Obj): offset=offset, ) v, tail = v[:l], v[l:] - arcs = [] + arcs = array("L") ber_encoded = False while len(v) > 0: i = 0 arc = 0 while True: - octet = indexbytes(v, i) + octet = v[i] if i == 0 and octet == 0x80: if ctx.get("bered", False): ber_encoded = True else: - raise DecodeError("non normalized arc encoding") + raise DecodeError( + "non normalized arc encoding", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) arc = (arc << 7) | (octet & 0x7F) if octet & 0x80 == 0: - arcs.append(arc) + try: + arcs.append(arc) + except OverflowError: + raise DecodeError( + "too huge value for local unsigned long", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) v = v[i + 1:] break i += 1 @@ -3996,7 +4538,7 @@ class ObjectIdentifier(Obj): first_arc = 2 second_arc -= 80 obj = self.__class__( - value=tuple([first_arc, second_arc] + arcs[1:]), + value=array("L", (first_arc, second_arc)) + arcs[1:], impl=self.tag, expl=self._expl, default=self.default, @@ -4058,7 +4600,7 @@ class Enumerated(Integer): _decoded=(0, 0, 0), bounds=None, # dummy argument, workability for Integer.decode ): - super(Enumerated, self).__init__( + super().__init__( value, bounds, impl, expl, default, optional, _specs, _decoded, ) if len(self.specs) == 0: @@ -4067,8 +4609,8 @@ class Enumerated(Integer): def _value_sanitize(self, value): if isinstance(value, self.__class__): value = value._value - elif isinstance(value, integer_types): - for _value in itervalues(self.specs): + elif isinstance(value, int): + for _value in self.specs.values(): if _value == value: break else: @@ -4076,7 +4618,7 @@ class Enumerated(Integer): "unknown integer value: %s" % value, klass=self.__class__, ) - elif isinstance(value, string_types): + elif isinstance(value, str): value = self.specs.get(value) if value is None: raise ObjUnknown("integer value: %s" % value) @@ -4143,27 +4685,25 @@ class CommonString(OctetString): :header-rows: 1 * - Class - - Text Encoding + - Text Encoding, validation * - :py:class:`pyderasn.UTF8String` - utf-8 * - :py:class:`pyderasn.NumericString` - - ascii + - proper alphabet validation * - :py:class:`pyderasn.PrintableString` - - ascii + - proper alphabet validation * - :py:class:`pyderasn.TeletexString` - - ascii + - iso-8859-1 * - :py:class:`pyderasn.T61String` - - ascii + - iso-8859-1 * - :py:class:`pyderasn.VideotexString` - iso-8859-1 * - :py:class:`pyderasn.IA5String` - - ascii + - proper alphabet validation * - :py:class:`pyderasn.GraphicString` - iso-8859-1 - * - :py:class:`pyderasn.VisibleString` - - ascii - * - :py:class:`pyderasn.ISO646String` - - ascii + * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String` + - proper alphabet validation * - :py:class:`pyderasn.GeneralString` - iso-8859-1 * - :py:class:`pyderasn.UniversalString` @@ -4172,18 +4712,19 @@ class CommonString(OctetString): - utf-16-be """ __slots__ = () + memoryview_safe = False def _value_sanitize(self, value): value_raw = None value_decoded = None if isinstance(value, self.__class__): value_raw = value._value - elif value.__class__ == text_type: + elif value.__class__ == str: value_decoded = value - elif value.__class__ == binary_type: + elif value.__class__ == bytes: value_raw = value else: - raise InvalidValueType((self.__class__, text_type, binary_type)) + raise InvalidValueType((self.__class__, str, bytes)) try: value_raw = ( value_decoded.encode(self.encoding) @@ -4204,9 +4745,9 @@ class CommonString(OctetString): return value_raw def __eq__(self, their): - if their.__class__ == binary_type: + if their.__class__ == bytes: return self._value == their - if their.__class__ == text_type: + if their.__class__ == str: return self._value == their.encode(self.encoding) if not isinstance(their, self.__class__): return False @@ -4219,18 +4760,18 @@ class CommonString(OctetString): def __unicode__(self): if self.ready: return self._value.decode(self.encoding) - return text_type(self._value) + return str(self._value) + + def memoryview(self): + raise ValueError("CommonString does not support .memoryview()") def __repr__(self): - return pp_console_row(next(self.pps(no_unicode=PY2))) + return pp_console_row(next(self.pps())) - def pps(self, decode_path=(), no_unicode=False): + def pps(self, decode_path=()): value = None if self.ready: - value = ( - hexenc(bytes(self)) if no_unicode else - "".join(escape_control_unicode(c) for c in self.__unicode__()) - ) + value = "".join(escape_control_unicode(c) for c in self.__unicode__()) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, @@ -4264,12 +4805,21 @@ class UTF8String(CommonString): asn1_type_name = "UTF8String" -class AllowableCharsMixin(object): +class AllowableCharsMixin: + __slots__ = () + @property def allowable_chars(self): - if PY2: - return self._allowable_chars - return frozenset(six_unichr(c) for c in self._allowable_chars) + return frozenset(chr(c) for c in self._allowable_chars) + + def _value_sanitize(self, value): + value = super()._value_sanitize(value) + if not frozenset(value) <= self._allowable_chars: + raise DecodeError("non satisfying alphabet value") + return value + + +NUMERIC_ALLOWABLE_CHARS = frozenset(digits.encode("ascii") + b" ") class NumericString(AllowableCharsMixin, CommonString): @@ -4281,17 +4831,14 @@ class NumericString(AllowableCharsMixin, CommonString): >>> NumericString().allowable_chars frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' ']) """ - __slots__ = () + __slots__ = ("_allowable_chars",) tag_default = tag_encode(18) encoding = "ascii" asn1_type_name = "NumericString" - _allowable_chars = frozenset(digits.encode("ascii") + b" ") - def _value_sanitize(self, value): - value = super(NumericString, self)._value_sanitize(value) - if not frozenset(value) <= self._allowable_chars: - raise DecodeError("non-numeric value") - return value + def __init__(self, *args, **kwargs): + self._allowable_chars = NUMERIC_ALLOWABLE_CHARS + super().__init__(*args, **kwargs) PrintableStringState = namedtuple( @@ -4301,6 +4848,11 @@ PrintableStringState = namedtuple( ) +PRINTABLE_ALLOWABLE_CHARS = frozenset( + (ascii_letters + digits + " '()+,-./:=?").encode("ascii") +) + + class PrintableString(AllowableCharsMixin, CommonString): """Printable string @@ -4313,13 +4865,10 @@ class PrintableString(AllowableCharsMixin, CommonString): >>> obj.allow_asterisk, obj.allow_ampersand (True, False) """ - __slots__ = () + __slots__ = ("_allowable_chars",) tag_default = tag_encode(19) encoding = "ascii" asn1_type_name = "PrintableString" - _allowable_chars = frozenset( - (ascii_letters + digits + " '()+,-./:=?").encode("ascii") - ) _asterisk = frozenset("*".encode("ascii")) _ampersand = frozenset("&".encode("ascii")) @@ -4340,11 +4889,13 @@ class PrintableString(AllowableCharsMixin, CommonString): :param allow_asterisk: allow asterisk character :param allow_ampersand: allow ampersand character """ + allowable_chars = PRINTABLE_ALLOWABLE_CHARS if allow_asterisk: - self._allowable_chars |= self._asterisk + allowable_chars |= self._asterisk if allow_ampersand: - self._allowable_chars |= self._ampersand - super(PrintableString, self).__init__( + allowable_chars |= self._ampersand + self._allowable_chars = allowable_chars + super().__init__( value, bounds, impl, expl, default, optional, _decoded, ctx, ) @@ -4360,20 +4911,14 @@ class PrintableString(AllowableCharsMixin, CommonString): """ return self._ampersand <= self._allowable_chars - def _value_sanitize(self, value): - value = super(PrintableString, self)._value_sanitize(value) - if not frozenset(value) <= self._allowable_chars: - raise DecodeError("non-printable value") - return value - def __getstate__(self): return PrintableStringState( - *super(PrintableString, self).__getstate__(), + *super().__getstate__(), **{"allowable_chars": self._allowable_chars} ) def __setstate__(self, state): - super(PrintableString, self).__setstate__(state) + super().__setstate__(state) self._allowable_chars = state.allowable_chars def __call__( @@ -4403,7 +4948,7 @@ class PrintableString(AllowableCharsMixin, CommonString): class TeletexString(CommonString): __slots__ = () tag_default = tag_encode(20) - encoding = "ascii" + encoding = "iso-8859-1" asn1_type_name = "TeletexString" @@ -4419,24 +4964,71 @@ class VideotexString(CommonString): asn1_type_name = "VideotexString" -class IA5String(CommonString): - __slots__ = () +IA5_ALLOWABLE_CHARS = frozenset(b"".join( + chr(c).encode("ascii") for c in range(128) +)) + + +class IA5String(AllowableCharsMixin, CommonString): + """IA5 string + + Its value is properly sanitized: it is a mix of + + * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G) + * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0) + * DEL character (0x7F) + + It is just 7-bit ASCII. + + >>> IA5String().allowable_chars + frozenset(["NUL", ... "DEL"]) + """ + __slots__ = ("_allowable_chars",) tag_default = tag_encode(22) encoding = "ascii" asn1_type_name = "IA5" + def __init__(self, *args, **kwargs): + self._allowable_chars = IA5_ALLOWABLE_CHARS + super().__init__(*args, **kwargs) + LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ") +LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ) +LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ") LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ") +LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ) -class VisibleString(CommonString): - __slots__ = () +VISIBLE_ALLOWABLE_CHARS = frozenset(b"".join( + chr(c).encode("ascii") for c in range(ord(" "), ord("~") + 1) +)) + + +class VisibleString(AllowableCharsMixin, CommonString): + """Visible string + + Its value is properly sanitized. ASCII subset from space to tilde is + allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf + + >>> VisibleString().allowable_chars + frozenset([" ", ... "~"]) + """ + __slots__ = ("_allowable_chars",) tag_default = tag_encode(26) encoding = "ascii" asn1_type_name = "VisibleString" + def __init__(self, *args, **kwargs): + self._allowable_chars = VISIBLE_ALLOWABLE_CHARS + super().__init__(*args, **kwargs) + + +class ISO646String(VisibleString): + __slots__ = () + asn1_type_name = "ISO646String" + UTCTimeState = namedtuple( "UTCTimeState", @@ -4468,19 +5060,28 @@ class UTCTime(VisibleString): datetime.datetime(2017, 9, 30, 22, 7, 50) >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime() datetime.datetime(1957, 9, 30, 22, 7, 50) + >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).totzdatetime() + datetime.datetime(1957, 9, 30, 22, 7, 50, tzinfo=tzutc()) If BER encoded value was met, then ``ber_raw`` attribute will hold its raw representation. .. warning:: - Pay attention that UTCTime can not hold full year, so all years - having < 50 years are treated as 20xx, 19xx otherwise, according - to X.509 recommendation. + Only **naive** ``datetime`` objects are supported. + Library assumes that all work is done in UTC. .. warning:: - No strict validation of UTC offsets are made, but very crude: + Pay attention that ``UTCTime`` can not hold full year, so all years + having < 50 years are treated as 20xx, 19xx otherwise, according to + X.509 recommendation. Use ``GeneralizedTime`` instead for + removing ambiguity. + + .. warning:: + + No strict validation of UTC offsets are made (only applicable to + **BER**), but very crude: * minutes are not exceeding 60 * offset value is not exceeding 14 hours @@ -4510,9 +5111,7 @@ class UTCTime(VisibleString): :param default: set default value. Type same as in ``value`` :param bool optional: is object ``OPTIONAL`` in sequence """ - super(UTCTime, self).__init__( - None, None, impl, expl, None, optional, _decoded, ctx, - ) + super().__init__(None, None, impl, expl, None, optional, _decoded, ctx) self._value = value self.ber_raw = None if value is not None: @@ -4583,7 +5182,7 @@ class UTCTime(VisibleString): return value.replace(microsecond=0) def _value_sanitize(self, value, ctx=None): - if value.__class__ == binary_type: + if value.__class__ == bytes: try: value_decoded = value.decode("ascii") except (UnicodeEncodeError, UnicodeDecodeError) as err: @@ -4607,6 +5206,8 @@ class UTCTime(VisibleString): if isinstance(value, self.__class__): return value._value, None if value.__class__ == datetime: + if value.tzinfo is not None: + raise ValueError("only naive datetime supported") return self._dt_sanitize(value), None raise InvalidValueType((self.__class__, datetime)) @@ -4616,6 +5217,7 @@ class UTCTime(VisibleString): if self.ber_encoded: value += " (%s)" % self.ber_raw return value + return None def __unicode__(self): if self.ready: @@ -4623,16 +5225,13 @@ class UTCTime(VisibleString): if self.ber_encoded: value += " (%s)" % self.ber_raw return value - return text_type(self._pp_value()) + return str(self._pp_value()) def __getstate__(self): - return UTCTimeState( - *super(UTCTime, self).__getstate__(), - **{"ber_raw": self.ber_raw} - ) + return UTCTimeState(*super().__getstate__(), **{"ber_raw": self.ber_raw}) def __setstate__(self, state): - super(UTCTime, self).__setstate__(state) + super().__setstate__(state) self.ber_raw = state.ber_raw def __bytes__(self): @@ -4640,7 +5239,7 @@ class UTCTime(VisibleString): return self._encode_time() def __eq__(self, their): - if their.__class__ == binary_type: + if their.__class__ == bytes: return self._encode_time() == their if their.__class__ == datetime: return self.todatetime() == their @@ -4657,8 +5256,14 @@ class UTCTime(VisibleString): def _encode(self): self._assert_ready() - value = self._encode_time() - return b"".join((self.tag, len_encode(len(value)), value)) + return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time())) + + def _encode1st(self, state): + return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state + + def _encode2nd(self, writer, state_iter): + self._assert_ready() + write_full(writer, self._encode()) def _encode_cer(self, writer): write_full(writer, self._encode()) @@ -4666,6 +5271,12 @@ class UTCTime(VisibleString): def todatetime(self): return self._value + def totzdatetime(self): + try: + return self._value.replace(tzinfo=tzUTC) + except TypeError as err: + raise NotImplementedError("Missing dateutil.tz") from err + def __repr__(self): return pp_console_row(next(self.pps())) @@ -4710,23 +5321,23 @@ class GeneralizedTime(UTCTime): .. warning:: - Only microsecond fractions are supported in DER encoding. - :py:exc:`pyderasn.DecodeError` will be raised during decoding of - higher precision values. + Only **naive** datetime objects are supported. + Library assumes that all work is done in UTC. .. warning:: - BER encoded data can loss information (accuracy) during decoding - because of float transformations. + Only **microsecond** fractions are supported in DER encoding. + :py:exc:`pyderasn.DecodeError` will be raised during decoding of + higher precision values. .. warning:: - Local times (without explicit timezone specification) are treated - as UTC one, no transformations are made. + **BER** encoded data can loss information (accuracy) during + decoding because of float transformations. .. warning:: - Zero year is unsupported. + **Zero** year is unsupported. """ __slots__ = () tag_default = tag_encode(24) @@ -4824,6 +5435,22 @@ class GeneralizedTime(UTCTime): encoded += (".%06d" % value.microsecond).rstrip("0") return (encoded + "Z").encode("ascii") + def _encode(self): + self._assert_ready() + value = self._value + if value.microsecond > 0: + encoded = self._encode_time() + return b"".join((self.tag, len_encode(len(encoded)), encoded)) + return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time())) + + def _encode1st(self, state): + self._assert_ready() + vlen = len(self._encode_time()) + return len(self.tag) + len_size(vlen) + vlen, state + + def _encode2nd(self, writer, state_iter): + write_full(writer, self._encode()) + class GraphicString(CommonString): __slots__ = () @@ -4832,11 +5459,6 @@ class GraphicString(CommonString): asn1_type_name = "GraphicString" -class ISO646String(VisibleString): - __slots__ = () - asn1_type_name = "ISO646String" - - class GeneralString(CommonString): __slots__ = () tag_default = tag_encode(27) @@ -4920,7 +5542,7 @@ class Choice(Obj): """ if impl is not None: raise ValueError("no implicit tag allowed for CHOICE") - super(Choice, self).__init__(None, expl, default, optional, _decoded) + super().__init__(None, expl, default, optional, _decoded) if schema is None: schema = getattr(self, "schema", ()) if len(schema) == 0: @@ -4986,7 +5608,7 @@ class Choice(Obj): ) def __setstate__(self, state): - super(Choice, self).__setstate__(state) + super().__setstate__(state) self.specs = state.specs self._value = state.value @@ -5036,7 +5658,7 @@ class Choice(Obj): @property def tag_order_cer(self): - return min(v.tag_order_cer for v in itervalues(self.specs)) + return min(v.tag_order_cer for v in self.specs.values()) def __getitem__(self, key): if key not in self.specs: @@ -5068,12 +5690,19 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() + def _encode1st(self, state): + self._assert_ready() + return self._value[1].encode1st(state) + + def _encode2nd(self, writer, state_iter): + self._value[1].encode2nd(writer, state_iter) + def _encode_cer(self, writer): self._assert_ready() self._value[1].encode_cer(writer) def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): - for choice, spec in iteritems(self.specs): + for choice, spec in self.specs.items(): sub_decode_path = decode_path + (choice,) try: spec.decode( @@ -5228,14 +5857,14 @@ class Any(Obj): :param bytes expl: override default tag with ``EXPLICIT`` one :param bool optional: is object ``OPTIONAL`` in sequence """ - super(Any, self).__init__(None, expl, None, optional, _decoded) + super().__init__(None, expl, None, optional, _decoded) if value is None: self._value = None else: value = self._value_sanitize(value) self._value = value if self._expl is None: - if value.__class__ == binary_type: + if value.__class__ == bytes or value.__class__ == memoryview: tag_class, _, tag_num = tag_decode(tag_strip(value)[0]) else: tag_class, tag_num = value.tag_order @@ -5245,14 +5874,14 @@ class Any(Obj): self.defined = None def _value_sanitize(self, value): - if value.__class__ == binary_type: + if value.__class__ == bytes or value.__class__ == memoryview: if len(value) == 0: - raise ValueError("Any value can not be empty") + raise ValueError("%s value can not be empty" % self.__class__.__name__) return value if isinstance(value, self.__class__): return value._value if not isinstance(value, Obj): - raise InvalidValueType((self.__class__, Obj, binary_type)) + raise InvalidValueType((self.__class__, Obj, bytes)) return value @property @@ -5291,18 +5920,18 @@ class Any(Obj): ) def __setstate__(self, state): - super(Any, self).__setstate__(state) + super().__setstate__(state) self._value = state.value self.defined = state.defined def __eq__(self, their): - if their.__class__ == binary_type: - if self._value.__class__ == binary_type: + if their.__class__ == bytes or their.__class__ == memoryview: + if self._value.__class__ == bytes or their.__class__ == memoryview: return self._value == their return self._value.encode() == their if issubclass(their.__class__, Any): if self.ready and their.ready: - return bytes(self) == bytes(their) + return self.memoryview() == their.memoryview() return self.ready == their.ready return False @@ -5321,10 +5950,19 @@ class Any(Obj): def __bytes__(self): self._assert_ready() value = self._value - if value.__class__ == binary_type: + if value.__class__ == bytes: return value + if value.__class__ == memoryview: + return bytes(value) return self._value.encode() + def memoryview(self): + self._assert_ready() + value = self._value + if value.__class__ == memoryview: + return memoryview(value) + return memoryview(bytes(self)) + @property def tlen(self): return 0 @@ -5332,14 +5970,28 @@ class Any(Obj): def _encode(self): self._assert_ready() value = self._value - if value.__class__ == binary_type: - return value + if value.__class__ == bytes or value.__class__ == memoryview: + return bytes(self) return value.encode() + def _encode1st(self, state): + self._assert_ready() + value = self._value + if value.__class__ == bytes or value.__class__ == memoryview: + return len(value), state + return value.encode1st(state) + + def _encode2nd(self, writer, state_iter): + value = self._value + if value.__class__ == bytes or value.__class__ == memoryview: + write_full(writer, value) + else: + value.encode2nd(writer, state_iter) + def _encode_cer(self, writer): self._assert_ready() value = self._value - if value.__class__ == binary_type: + if value.__class__ == bytes or value.__class__ == memoryview: write_full(writer, value) else: value.encode_cer(writer) @@ -5406,8 +6058,14 @@ class Any(Obj): ) tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] + if evgen_mode: + value = None + elif ctx.get("keep_memoryview", False): + value = v + else: + value = v.tobytes() obj = self.__class__( - value=None if evgen_mode else v.tobytes(), + value=value, expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), @@ -5422,7 +6080,7 @@ class Any(Obj): value = self._value if value is None: pass - elif value.__class__ == binary_type: + elif value.__class__ == bytes or value.__class__ == memoryview: value = None else: value = repr(value) @@ -5432,7 +6090,10 @@ class Any(Obj): obj_name=self.__class__.__name__, decode_path=decode_path, value=value, - blob=self._value if self._value.__class__ == binary_type else None, + blob=self._value if ( + self._value.__class__ == bytes or + value.__class__ == memoryview + ) else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -5462,19 +6123,6 @@ class Any(Obj): # ASN.1 constructed types ######################################################################## -def get_def_by_path(defines_by_path, sub_decode_path): - """Get define by decode path - """ - for path, define in defines_by_path: - if len(path) != len(sub_decode_path): - continue - for p1, p2 in zip(path, sub_decode_path): - if (not p1 is any) and (p1 != p2): - break - else: - return define - - def abs_decode_path(decode_path, rel_path): """Create an absolute decode path from current and relative ones @@ -5507,7 +6155,21 @@ SequenceState = namedtuple( ) -class Sequence(Obj): +class SequenceEncode1stMixin: + __slots__ = () + + def _encode1st(self, state): + state.append(0) + idx = len(state) - 1 + vlen = 0 + for v in self._values_for_encoding(): + l, _ = v.encode1st(state) + vlen += l + state[idx] = vlen + return len(self.tag) + len_size(vlen) + vlen, state + + +class Sequence(SequenceEncode1stMixin, Obj): """``SEQUENCE`` structure type You have to make specification of sequence:: @@ -5595,11 +6257,10 @@ class Sequence(Obj): defaulted values existence validation by setting ``"allow_default_values": True`` :ref:`context ` option. - .. warning:: - - Check for default value existence is not performed in - ``evgen_mode``, because previously decoded values are not stored - in memory, to be able to compare them. + All values with DEFAULT specified are decoded atomically in + :ref:`evgen mode `. If DEFAULT value is some kind of + SEQUENCE, then it will be yielded as a single element, not + disassembled. That is required for DEFAULT existence check. Two sequences are equal if they have equal specification (schema), implicit/explicit tagging and the same values. @@ -5618,7 +6279,7 @@ class Sequence(Obj): optional=False, _decoded=(0, 0, 0), ): - super(Sequence, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) if schema is None: schema = getattr(self, "schema", ()) self.specs = ( @@ -5646,7 +6307,7 @@ class Sequence(Obj): @property def ready(self): - for name, spec in iteritems(self.specs): + for name, spec in self.specs.items(): value = self._value.get(name) if value is None: if spec.optional: @@ -5660,7 +6321,7 @@ class Sequence(Obj): def bered(self): if self.expl_lenindef or self.lenindef or self.ber_encoded: return True - return any(value.bered for value in itervalues(self._value)) + return any(value.bered for value in self._value.values()) def __getstate__(self): return SequenceState( @@ -5677,11 +6338,11 @@ class Sequence(Obj): self.lenindef, self.ber_encoded, self.specs, - {k: copy(v) for k, v in iteritems(self._value)}, + {k: copy(v) for k, v in self._value.items()}, ) def __setstate__(self, state): - super(Sequence, self).__setstate__(state) + super().__setstate__(state) self.specs = state.specs self._value = state.value @@ -5742,7 +6403,7 @@ class Sequence(Obj): return None def _values_for_encoding(self): - for name, spec in iteritems(self.specs): + for name, spec in self.specs.items(): value = self._value.get(name) if value is None: if spec.optional: @@ -5754,6 +6415,11 @@ class Sequence(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) + def _encode2nd(self, writer, state_iter): + write_full(writer, self.tag + len_encode(next(state_iter))) + for v in self._values_for_encoding(): + v.encode2nd(writer, state_iter) + def _encode_cer(self, writer): write_full(writer, self.tag + LENINDEF) for v in self._values_for_encoding(): @@ -5814,15 +6480,16 @@ class Sequence(Obj): values = {} ber_encoded = False ctx_allow_default_values = ctx.get("allow_default_values", False) - for name, spec in iteritems(self.specs): + for name, spec in self.specs.items(): if spec.optional and ( (lenindef and v[:EOC_LEN].tobytes() == EOC) or len(v) == 0 ): continue + spec_defaulted = spec.default is not None sub_decode_path = decode_path + (name,) try: - if evgen_mode: + if evgen_mode and not spec_defaulted: for _decode_path, value, v_tail in spec.decode_evgen( v, sub_offset, @@ -5900,9 +6567,10 @@ class Sequence(Obj): vlen += value_len sub_offset += value_len v = v_tail - if not evgen_mode: - if spec.default is not None and value == spec.default: - # This will not work in evgen_mode + if spec_defaulted: + if evgen_mode: + yield sub_decode_path, value, v_tail + if value == spec.default: if ctx_bered or ctx_allow_default_values: ber_encoded = True else: @@ -5912,6 +6580,7 @@ class Sequence(Obj): decode_path=sub_decode_path, offset=sub_offset, ) + if not evgen_mode: values[name] = value spec_defines = getattr(spec, "defines", ()) if len(spec_defines) == 0: @@ -5998,7 +6667,7 @@ class Sequence(Obj): yield pp -class Set(Sequence): +class Set(Sequence, SequenceEncode1stMixin): """``SET`` structure type Its usage is identical to :py:class:`pyderasn.Sequence`. @@ -6015,17 +6684,13 @@ class Set(Sequence): tag_default = tag_encode(form=TagFormConstructed, num=17) asn1_type_name = "SET" - def _encode(self): - v = b"".join(value.encode() for value in sorted( - self._values_for_encoding(), - key=attrgetter("tag_order"), - )) - return b"".join((self.tag, len_encode(len(v)), v)) + def _values_for_encoding(self): + return sorted(super()._values_for_encoding(), key=attrgetter("tag_order")) def _encode_cer(self, writer): write_full(writer, self.tag + LENINDEF) for v in sorted( - self._values_for_encoding(), + super()._values_for_encoding(), key=attrgetter("tag_order_cer"), ): v.encode_cer(writer) @@ -6091,7 +6756,7 @@ class Set(Sequence): while len(v) > 0: if lenindef and v[:EOC_LEN].tobytes() == EOC: break - for name, spec in iteritems(_specs_items): + for name, spec in _specs_items.items(): sub_decode_path = decode_path + (name,) try: spec.decode( @@ -6112,7 +6777,8 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) - if evgen_mode: + spec_defaulted = spec.default is not None + if evgen_mode and not spec_defaulted: for _decode_path, value, v_tail in spec.decode_evgen( v, sub_offset, @@ -6144,17 +6810,20 @@ class Set(Sequence): decode_path=sub_decode_path, offset=sub_offset, ) - if spec.default is None or value != spec.default: - pass - elif ctx_bered or ctx_allow_default_values: - ber_encoded = True - else: - raise DecodeError( - "DEFAULT value met", - klass=self.__class__, - decode_path=sub_decode_path, - offset=sub_offset, - ) + if spec_defaulted: + if evgen_mode: + yield sub_decode_path, value, v_tail + if value != spec.default: + pass + elif ctx_bered or ctx_allow_default_values: + ber_encoded = True + else: + raise DecodeError( + "DEFAULT value met", + klass=self.__class__, + decode_path=sub_decode_path, + offset=sub_offset, + ) values[name] = value del _specs_items[name] tag_order_prev = value_tag_order @@ -6180,7 +6849,7 @@ class Set(Sequence): ) tail = v[EOC_LEN:] obj.lenindef = True - for name, spec in iteritems(self.specs): + for name, spec in self.specs.items(): if name not in values and not spec.optional: raise DecodeError( "%s value is not ready" % name, @@ -6201,7 +6870,7 @@ SequenceOfState = namedtuple( ) -class SequenceOf(Obj): +class SequenceOf(SequenceEncode1stMixin, Obj): """``SEQUENCE OF`` sequence type For that kind of type you must specify the object it will carry on @@ -6258,7 +6927,7 @@ class SequenceOf(Obj): optional=False, _decoded=(0, 0, 0), ): - super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded) + super().__init__(impl, expl, default, optional, _decoded) if schema is None: schema = getattr(self, "schema", None) if schema is None: @@ -6290,7 +6959,6 @@ class SequenceOf(Obj): value = value._value elif hasattr(value, NEXT_ATTR_NAME): iterator = True - value = value elif hasattr(value, "__iter__"): value = list(value) else: @@ -6341,7 +7009,7 @@ class SequenceOf(Obj): ) def __setstate__(self, state): - super(SequenceOf, self).__setstate__(state) + super().__setstate__(state) self.spec = state.spec self._value = state.value self._bound_min = state.bound_min @@ -6431,6 +7099,31 @@ class SequenceOf(Obj): value = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(value)), value)) + def _encode1st(self, state): + state = super()._encode1st(state) + if hasattr(self._value, NEXT_ATTR_NAME): + self._value = [] + return state + + def _encode2nd(self, writer, state_iter): + write_full(writer, self.tag + len_encode(next(state_iter))) + iterator = hasattr(self._value, NEXT_ATTR_NAME) + if iterator: + values_count = 0 + class_expected = self.spec.__class__ + values_for_encoding = self._values_for_encoding() + self._value = [] + for v in values_for_encoding: + if not isinstance(v, class_expected): + raise InvalidValueType((class_expected,)) + v.encode2nd(writer, state_iter) + values_count += 1 + if not self._bound_min <= values_count <= self._bound_max: + raise BoundsError(self._bound_min, values_count, self._bound_max) + else: + for v in self._values_for_encoding(): + v.encode2nd(writer, state_iter) + def _encode_cer(self, writer): write_full(writer, self.tag + LENINDEF) iterator = hasattr(self._value, NEXT_ATTR_NAME) @@ -6643,7 +7336,7 @@ class SetOf(SequenceOf): asn1_type_name = "SET OF" def _value_sanitize(self, value): - value = super(SetOf, self)._value_sanitize(value) + value = super()._value_sanitize(value) if hasattr(value, NEXT_ATTR_NAME): raise ValueError( "SetOf does not support iterator values, as no sense in them" @@ -6654,6 +7347,17 @@ class SetOf(SequenceOf): v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) + def _encode2nd(self, writer, state_iter): + write_full(writer, self.tag + len_encode(next(state_iter))) + values = [] + for v in self._values_for_encoding(): + buf = BytesIO() + v.encode2nd(buf.write, state_iter) + values.append(buf.getvalue()) + values.sort() + for v in values: + write_full(writer, v) + def _encode_cer(self, writer): write_full(writer, self.tag + LENINDEF) for v in sorted(encode_cer(v) for v in self._values_for_encoding()): @@ -6661,7 +7365,7 @@ class SetOf(SequenceOf): write_full(writer, EOC) def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): - return super(SetOf, self)._decode( + return super()._decode( tlv, offset, decode_path, @@ -6695,7 +7399,7 @@ def generic_decoder(): # pragma: no cover choice = PrimitiveTypes() choice.specs["SequenceOf"] = SequenceOf(schema=choice) choice.specs["SetOf"] = SetOf(schema=choice) - for i in six_xrange(31): + for i in range(31): choice.specs["SequenceOf%d" % i] = SequenceOf( schema=choice, expl=tag_ctxc(i), @@ -6713,6 +7417,7 @@ def generic_decoder(): # pragma: no cover with_colours=False, with_decode_path=False, decode_path_only=(), + decode_path=(), ): def _pprint_pps(pps): for pp in pps: @@ -6744,13 +7449,412 @@ def generic_decoder(): # pragma: no cover else: for row in _pprint_pps(pp): yield row - return "\n".join(_pprint_pps(obj.pps())) + return "\n".join(_pprint_pps(obj.pps(decode_path))) return SEQUENCEOF(), pprint_any +def ascii_visualize(ba): + """Output only ASCII printable characters, like in hexdump -C + + Example output for given binary string (right part):: + + 92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.| + ^^^^^^^^^^^^^^^^ + """ + return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba) + + +def hexdump(raw): + """Generate ``hexdump -C`` like output + + Rendered example:: + + 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..| + 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j| + + Result of that function is a generator of lines, where each line is + a list of columns:: + + [ + [...], + ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ", + " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ", + " |i..?.....G...k.j|"] + [...], + ] + """ + hexed = hexenc(raw).upper() + addr, cols = 0, ["%08x " % 0] + for i in range(0, len(hexed), 2): + if i != 0 and i // 2 % 8 == 0: + cols[-1] += " " + if i != 0 and i // 2 % 16 == 0: + cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:addr + 16]))) + yield cols + addr += 16 + cols = ["%08x " % addr] + cols.append(" " + hexed[i:i + 2]) + if len(cols) > 0: + cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:]))) + yield cols + + +def browse(raw, obj, oid_maps=()): + """Interactive browser + + :param bytes raw: binary data you decoded + :param obj: decoded :py:class:`pyderasn.Obj` + :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries. + Its human readable form is printed when OID is met + + .. note:: `urwid `__ dependency required + + This browser is an interactive terminal application for browsing + structures of your decoded ASN.1 objects. You can quit it with **q** + key. It consists of three windows: + + :tree: + View of ASN.1 elements hierarchy. You can navigate it using **Up**, + **Down**, **PageUp**, **PageDown**, **Home**, **End** keys. + **Left** key goes to constructed element above. **Plus**/**Minus** + keys collapse/uncollapse constructed elements. **Space** toggles it + :info: + window with various information about element. You can scroll it + with **h**/**l** (down, up) (**H**/**L** for triple speed) keys + :hexdump: + window with raw data hexdump and highlighted current element's + contents. It automatically focuses on element's data. You can + scroll it with **j**/**k** (down, up) (**J**/**K** for triple + speed) keys. If element has explicit tag, then it also will be + highlighted with different colour + + Window's header contains current decode path and progress bars with + position in *info* and *hexdump* windows. + + If you press **d**, then current element will be saved in the + current directory under its decode path name (adding ".0", ".1", etc + suffix if such file already exists). **D** will save it with explicit tag. + + You can also invoke it with ``--browse`` command line argument. + """ + from copy import deepcopy + from os.path import exists as path_exists + import urwid + + class TW(urwid.TreeWidget): + def __init__(self, state, *args, **kwargs): + self.state = state + self.scrolled = {"info": False, "hexdump": False} + super().__init__(*args, **kwargs) + + def _get_pp(self): + pp = self.get_node().get_value() + constructed = len(pp) > 1 + return (pp if hasattr(pp, "_fields") else pp[0]), constructed + + def _state_update(self): + pp, _ = self._get_pp() + self.state["decode_path"].set_text( + ":".join(str(p) for p in pp.decode_path) + ) + lines = deepcopy(self.state["hexed"]) + + def attr_set(i, attr): + line = lines[i // 16] + idx = 1 + (i - 16 * (i // 16)) + line[idx] = (attr, line[idx]) + + if pp.expl_offset is not None: + for i in range( + pp.expl_offset, + pp.expl_offset + pp.expl_tlen + pp.expl_llen, + ): + attr_set(i, "select-expl") + for i in range(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen): + attr_set(i, "select-value") + self.state["hexdump"]._set_body([urwid.Text(line) for line in lines]) + self.state["hexdump"].set_focus(pp.offset // 16) + self.state["hexdump"].set_focus_valign("middle") + self.state["hexdump_bar"].set_completion( + (100 * pp.offset // 16) // + len(self.state["hexdump"]._body.positions()) + ) + + lines = [ + [("header", "Name: "), pp.obj_name], + [("header", "Type: "), pp.asn1_type_name], + [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)], + [("header", "[TLV]len: "), "%d/%d/%d" % ( + pp.tlen, pp.llen, pp.vlen, + )], + [("header", "TLVlen: "), "%d" % sum(( + pp.tlen, pp.llen, pp.vlen, + ))], + [("header", "Slice: "), "[%d:%d]" % ( + pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen, + )], + ] + if pp.lenindef: + lines.append([("warning", "LENINDEF")]) + if pp.ber_encoded: + lines.append([("warning", "BER encoded")]) + if pp.bered: + lines.append([("warning", "BERed")]) + if pp.expl is not None: + lines.append([("header", "EXPLICIT")]) + klass, _, num = pp.expl + lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)]) + if pp.expl_offset is not None: + lines.append([" Offset: %d" % pp.expl_offset]) + lines.append([" [TLV]len: %d/%d/%d" % ( + pp.expl_tlen, pp.expl_llen, pp.expl_vlen, + )]) + lines.append([" TLVlen: %d" % sum(( + pp.expl_tlen, pp.expl_llen, pp.expl_vlen, + ))]) + lines.append([" Slice: [%d:%d]" % ( + pp.expl_offset, + pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen, + )]) + if pp.impl is not None: + klass, _, num = pp.impl + lines.append([ + ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num), + ]) + if pp.optional: + lines.append(["OPTIONAL"]) + if pp.default: + lines.append(["DEFAULT"]) + if len(pp.decode_path) > 0: + ent = pp.decode_path[-1] + if isinstance(ent, DecodePathDefBy): + lines.append([""]) + value = str(ent.defined_by) + oid_name = find_oid_name( + ent.defined_by.asn1_type_name, oid_maps, value, + ) + lines.append([("header", "DEFINED BY: "), "%s" % ( + value if oid_name is None + else "%s (%s)" % (oid_name, value) + )]) + lines.append([""]) + if pp.value is not None: + lines.append([("header", "Value: "), pp.value]) + if ( + len(oid_maps) > 0 and + pp.asn1_type_name == ObjectIdentifier.asn1_type_name + ): + for oid_map in oid_maps: + oid_name = oid_map.get(pp.value) + if oid_name is not None: + lines.append([("header", "Human: "), oid_name]) + break + if pp.asn1_type_name == Integer.asn1_type_name: + lines.append([ + ("header", "Decimal: "), "%d" % int(pp.obj), + ]) + lines.append([ + ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()), + ]) + if pp.blob.__class__ == bytes: + blob = hexenc(pp.blob).upper() + for i in range(0, len(blob), 32): + lines.append([colonize_hex(blob[i:i + 32])]) + elif pp.blob.__class__ == tuple: + lines.append([", ".join(pp.blob)]) + self.state["info"]._set_body([urwid.Text(line) for line in lines]) + self.state["info_bar"].set_completion(0) + + def selectable(self): + if self.state["widget_current"] != self: + self.state["widget_current"] = self + self.scrolled["info"] = False + self.scrolled["hexdump"] = False + self._state_update() + return super().selectable() + + def _get_display_text_without_offset(self): + pp, constructed = self._get_pp() + style = "constructed" if constructed else "" + if len(pp.decode_path) == 0: + return (style, pp.obj_name) + if pp.asn1_type_name == "EOC": + return ("eoc", "EOC") + ent = pp.decode_path[-1] + if isinstance(ent, DecodePathDefBy): + value = str(ent.defined_by) + oid_name = find_oid_name( + ent.defined_by.asn1_type_name, oid_maps, value, + ) + return ("defby", "DEFBY:" + ( + value if oid_name is None else oid_name + )) + return (style, ent) + + def get_display_text(self): + pp, _ = self._get_pp() + style, ent = self._get_display_text_without_offset() + return [(style, ent), " [%d]" % pp.offset] + + def _scroll(self, what, step): + self.state[what]._invalidate() + pos = self.state[what].focus_position + if not self.scrolled[what]: + self.scrolled[what] = True + pos -= 2 + pos = max(0, pos + step) + pos = min(pos, len(self.state[what]._body.positions()) - 1) + self.state[what].set_focus(pos) + self.state[what].set_focus_valign("top") + self.state[what + "_bar"].set_completion( + (100 * pos) // len(self.state[what]._body.positions()) + ) + + def keypress(self, size, key): + if key == "q": + raise urwid.ExitMainLoop() + + if key == " ": + self.expanded = not self.expanded + self.update_expanded_icon() + return None + + hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5} + if key in hexdump_steps: + self._scroll("hexdump", hexdump_steps[key]) + return None + + info_steps = {"h": 1, "l": -1, "H": 5, "L": -5} + if key in info_steps: + self._scroll("info", info_steps[key]) + return None + + if key in ("d", "D"): + pp, _ = self._get_pp() + dp = ":".join(str(p) for p in pp.decode_path) + dp = dp.replace(" ", "_") + if dp == "": + dp = "root" + if key == "d" or pp.expl_offset is None: + data = self.state["raw"][pp.offset:( + pp.offset + pp.tlen + pp.llen + pp.vlen + )] + else: + data = self.state["raw"][pp.expl_offset:( + pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen + )] + ctr = 0 + + def duplicate_path(dp, ctr): + if ctr == 0: + return dp + return "%s.%d" % (dp, ctr) + + while True: + if not path_exists(duplicate_path(dp, ctr)): + break + ctr += 1 + dp = duplicate_path(dp, ctr) + with open(dp, "wb") as fd: + fd.write(data) + self.state["decode_path"].set_text( + ("warning", "Saved to: " + dp) + ) + return None + return super().keypress(size, key) + + class PN(urwid.ParentNode): + def __init__(self, state, value, *args, **kwargs): + self.state = state + if not hasattr(value, "_fields"): + value = list(value) + super().__init__(value, *args, **kwargs) + + def load_widget(self): + return TW(self.state, self) + + def load_child_keys(self): + value = self.get_value() + if hasattr(value, "_fields"): + return [] + return range(len(value[1:])) + + def load_child_node(self, key): + return PN( + self.state, + self.get_value()[key + 1], + parent=self, + key=key, + depth=self.get_depth() + 1, + ) + + class LabeledPG(urwid.ProgressBar): + def __init__(self, label, *args, **kwargs): + self.label = label + super().__init__(*args, **kwargs) + + def get_text(self): + return "%s: %s" % (self.label, super().get_text()) + + WinHexdump = urwid.ListBox([urwid.Text("")]) + WinInfo = urwid.ListBox([urwid.Text("")]) + WinDecodePath = urwid.Text("", "center") + WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete") + WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete") + WinTree = urwid.TreeListBox(urwid.TreeWalker(PN( + { + "raw": raw, + "hexed": list(hexdump(raw)), + "widget_current": None, + "info": WinInfo, + "info_bar": WinInfoBar, + "hexdump": WinHexdump, + "hexdump_bar": WinHexdumpBar, + "decode_path": WinDecodePath, + }, + list(obj.pps()), + ))) + help_text = " ".join(( + "q:quit", + "space:(un)collapse", + "(pg)up/down/home/end:nav", + "jkJK:hexdump hlHL:info", + "dD:dump", + )) + urwid.MainLoop( + urwid.Frame( + urwid.Columns([ + ("weight", 1, WinTree), + ("weight", 2, urwid.Pile([ + urwid.LineBox(WinInfo), + urwid.LineBox(WinHexdump), + ])), + ]), + header=urwid.Columns([ + ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")), + ("weight", 1, WinInfoBar), + ("weight", 1, WinHexdumpBar), + ]), + footer=urwid.AttrWrap(urwid.Text(help_text), "help") + ), + [ + ("header", "bold", ""), + ("constructed", "bold", ""), + ("help", "light magenta", ""), + ("warning", "light red", ""), + ("defby", "light red", ""), + ("eoc", "dark red", ""), + ("select-value", "light green", ""), + ("select-expl", "light red", ""), + ("pg-normal", "", "light blue"), + ("pg-complete", "black", "yellow"), + ], + ).run() + + def main(): # pragma: no cover import argparse - parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/DER decoder") + parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder") parser.add_argument( "--skip", type=int, @@ -6793,25 +7897,30 @@ def main(): # pragma: no cover action="store_true", help="Turn on event generation mode", ) + parser.add_argument( + "--browse", + action="store_true", + help="Start ASN.1 browser", + ) parser.add_argument( "RAWFile", type=argparse.FileType("rb"), help="Path to BER/CER/DER file you want to decode", ) args = parser.parse_args() - if PY2: + try: + raw = file_mmaped(args.RAWFile)[args.skip:] + except: args.RAWFile.seek(args.skip) raw = memoryview(args.RAWFile.read()) args.RAWFile.close() - else: - raw = file_mmaped(args.RAWFile)[args.skip:] oid_maps = ( [obj_by_path(_path) for _path in (args.oids or "").split(",")] if args.oids else () ) + from functools import partial if args.schema: schema = obj_by_path(args.schema) - from functools import partial pprinter = partial(pprint, big_blobs=True) else: schema, pprinter = generic_decoder() @@ -6821,6 +7930,11 @@ def main(): # pragma: no cover } if args.defines_by_path is not None: ctx["defines_by_path"] = obj_by_path(args.defines_by_path) + if args.browse: + obj, _ = schema().decode(raw, ctx=ctx) + browse(raw, obj, oid_maps) + from sys import exit as sys_exit + sys_exit(0) from os import environ pprinter = partial( pprinter, @@ -6843,4 +7957,5 @@ def main(): # pragma: no cover if __name__ == "__main__": + from pyderasn import * main()