Announcements also go to this mailing list. + +Development Git source code repository currently is located here: + diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..d3827e7 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.0 diff --git a/doc/.gitignore b/doc/.gitignore new file mode 100644 index 0000000..e35d885 --- /dev/null +++ b/doc/.gitignore @@ -0,0 +1 @@ +_build diff --git a/doc/Makefile b/doc/Makefile new file mode 100644 index 0000000..2edea2b --- /dev/null +++ b/doc/Makefile @@ -0,0 +1,2 @@ +html: + python -msphinx . _build/html diff --git a/doc/ b/doc/ new file mode 100644 index 0000000..fc5892e --- /dev/null +++ b/doc/ @@ -0,0 +1,16 @@ +extensions = ["sphinx.ext.autodoc"] +templates_path = ["_templates"] +source_suffix = ".rst" +master_doc = "index" +project = "pyderasn" +copyright = "2017, Sergey Matveev" +author = "Sergey Matveev" +version = "1.0" +release = "1.0" +language = None +exclude_patterns = ["_build"] +pygments_style = "sphinx" +todo_include_todos = False +html_theme = "classic" +html_static_path = ["_static"] +html_sidebars = {} diff --git a/doc/download.rst b/doc/download.rst new file mode 100644 index 0000000..d4e91de --- /dev/null +++ b/doc/download.rst @@ -0,0 +1,41 @@ +.. _download: + +Download +======== + +.. list-table:: + :widths: 10 10 20 60 + :header-rows: 1 + + * - Package + - Size + - Tarball + - SHA256 checksum + * - ``attrs`` 17.2.0 + - 59 KiB + - `link `__ + `sign `__ + - ``612F3F53 90F2D0C7 FCA6A32A B5B1E750 5BC56C00 1D68B28F 56B7446D 6970DC0A`` + * - ``coverage`` 4.4.1 + - 287 KiB + - `link `__ + `sign `__ + - ``DF312773 C59A0CB5 1EB793F5 BA14A1D5 54D467D6 C46375F1 8E066DAA B8A86271`` + * - ``enum34`` 1.1.6 + - 31 KiB + - `link `__ + `sign `__ + - ``CC26B270 E58910E6 B54ACEE9 EC36C388 4C9BE18B 7A55FA46 305D4BA9 18D00177`` + * - ``hypothesis`` 3.30.4 + - 102 KiB + - `link `__ + `sign `__ + - ``A6281672 88FDCC15 EA806C45 9EBEF827 8D2A8BAD 01DB7C61 BD45D14A 905F53D6`` + * - ``six`` 1.11.0 + - 25 KiB + - `link `__ + `sign `__ + - ``890AC076 5EF9AEFA 5079CEBA ADE9C680 DBFB0E84 E7CFA1F9 9B9B43A8 5FA80126`` + +Development Git source code repository is located here: + diff --git a/doc/examples.rst b/doc/examples.rst new file mode 100644 index 0000000..16a1d54 --- /dev/null +++ b/doc/examples.rst @@ -0,0 +1,417 @@ +Examples +======== + +.. contents:: + +Schema definition +----------------- + +Let's try to parse X.509 certificate. We have to define our structures +based on ASN.1 schema descriptions. + +.. list-table:: + :header-rows: 1 + + * - ASN.1 specification + - pyderasn's code + * - :: + + Certificate ::= SEQUENCE { + tbsCertificate TBSCertificate, + signatureAlgorithm AlgorithmIdentifier, + signatureValue BIT STRING } + - :: + + class Certificate(Sequence): + schema = ( + ("tbsCertificate", TBSCertificate()), + ("signatureAlgorithm", AlgorithmIdentifier()), + ("signatureValue", BitString()), + ) + * - :: + + AlgorithmIdentifier ::= SEQUENCE { + algorithm OBJECT IDENTIFIER, + parameters ANY DEFINED BY algorithm OPTIONAL } + - :: + + class AlgorithmIdentifier(Sequence): + schema = ( + ("algorithm", ObjectIdentifier()), + ("parameters", Any(optional=True)), + ) + * - :: + + TBSCertificate ::= SEQUENCE { + version [0] EXPLICIT Version DEFAULT v1, + serialNumber CertificateSerialNumber, + signature AlgorithmIdentifier, + issuer Name, + validity Validity, + subject Name, + subjectPublicKeyInfo SubjectPublicKeyInfo, + issuerUniqueID [1] IMPLICIT UniqueIdentifier OPTIONAL, + subjectUniqueID [2] IMPLICIT UniqueIdentifier OPTIONAL, + extensions [3] EXPLICIT Extensions OPTIONAL } + - :: + + class TBSCertificate(Sequence): + schema = ( + ("version", Version(expl=tag_ctxc(0), default="v1")), + ("serialNumber", CertificateSerialNumber()), + ("signature", AlgorithmIdentifier()), + ("issuer", Name()), + ("validity", Validity()), + ("subject", Name()), + ("subjectPublicKeyInfo", SubjectPublicKeyInfo()), + ("issuerUniqueID", UniqueIdentifier(impl=tag_ctxp(1), optional=True)), + ("subjectUniqueID", UniqueIdentifier(impl=tag_ctxp(2), optional=True)), + ("extensions", Extensions(expl=tag_ctxc(3), optional=True)), + ) + * - :: + + Version ::= INTEGER { v1(0), v2(1), v3(2) } + - :: + + class Version(Integer): + schema = (("v1", 0), ("v2", 1), ("v3", 2)) + * - :: + + CertificateSerialNumber ::= INTEGER + - :: + + class CertificateSerialNumber(Integer): + pass + * - :: + + Validity ::= SEQUENCE { + notBefore Time, + notAfter Time } + Time ::= CHOICE { + utcTime UTCTime, + generalTime GeneralizedTime } + - :: + + class Validity(Sequence): + schema = ( + ("notBefore", Time()), + ("notAfter", Time()), + ) + class Time(Choice): + schema = ( + ("utcTime", UTCTime()), + ("generalTime", GeneralizedTime()), + ) + * - :: + + SubjectPublicKeyInfo ::= SEQUENCE { + algorithm AlgorithmIdentifier, + subjectPublicKey BIT STRING } + - :: + + class SubjectPublicKeyInfo(Sequence): + schema = ( + ("algorithm", AlgorithmIdentifier()), + ("subjectPublicKey", BitString()), + ) + * - :: + + UniqueIdentifier ::= BIT STRING + - :: + + class UniqueIdentifier(BitString): + pass + * - :: + + Name ::= CHOICE { rdnSequence RDNSequence } + + RDNSequence ::= SEQUENCE OF RelativeDistinguishedName + + RelativeDistinguishedName ::= SET SIZE (1..MAX) OF AttributeTypeAndValue + + AttributeTypeAndValue ::= SEQUENCE { type AttributeType, value AttributeValue } + + AttributeType ::= OBJECT IDENTIFIER + + AttributeValue ::= ANY -- DEFINED BY AttributeType + - :: + + class Name(Choice): + schema = (("rdnSequence", RDNSequence()),) + class RDNSequence(SequenceOf): + schema = RelativeDistinguishedName() + class RelativeDistinguishedName(SetOf): + schema = AttributeTypeAndValue() + bounds = (1, float("+inf")) + class AttributeTypeAndValue(Sequence): + schema = ( + ("type", AttributeType()), + ("value", AttributeValue()), + ) + class AttributeType(ObjectIdentifier): + pass + class AttributeValue(Any): + pass + * - :: + + Extensions ::= SEQUENCE SIZE (1..MAX) OF Extension + + Extension ::= SEQUENCE { + extnID OBJECT IDENTIFIER, + critical BOOLEAN DEFAULT FALSE, + extnValue OCTET STRING + } + - :: + + class Extensions(SequenceOf): + schema = Extension() + bounds = (1, float("+inf")) + class Extension(Sequence): + schema = ( + ("extnID", ObjectIdentifier()), + ("critical", Boolean(default=False)), + ("extnValue", OctetString()), + ) + +We are ready to decode PayPal's certificate from Go `encoding/asn1 +`__ test suite (assuming that +it's DER encoded representation is already in ``raw`` variable):: + + >>> crt, tail = Certificate().decode(raw) + >>> crt + Certificate SEQUENCE[TBSCertificate SEQUENCE[[0] EXPLICIT Version + INTEGER v3 OPTIONAL, CertificateSerialNumber INTEGER 61595, + AlgorithmIdentifier SEQUENCE[OBJECT IDENTIFIER 1.2.840.113549.1.1.5... + +Pretty printing +--------------- + +There is huge output. Let's pretty print it:: + + >>> print(pprint(crt)) + 0 [1,3,1604] Certificate SEQUENCE + 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 + 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL + . . . . 05:00 + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF + 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF + 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE + 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER + 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY + . . . . . . . 13:02:45:53 + [...] + 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE + 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL + . . . 05:00 + 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits + . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + + Trailing data: 0a + +Let's parse that output, human:: + + 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL + ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 1 2 3 4 5 6 7 8 9 10 11 + +:: + + 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 9 10 + +:: + + 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence + ^ ^ ^ ^ ^ ^ ^ ^ ^ + 0 2 3 4 5 6 8 9 10 + +:0: + Offset of the object, where its DER encoding begins. + Pay attention that it does **not** include explicit tag. +:1: + If explicit tag exists, then this is its length (tag + encoded length). +:2: + Length of object's tag. For example CHOICE does not have its own tag, + so it is zero. +:3: + Length of encoded length. +:4: + Length of encoded value. +:5: + Visual indentation to show the depth of object in the hierarchy. +:6: + Object's name inside SEQUENCE/CHOICE. +:7: + If either IMPLICIT or EXPLICIT tag is set, then it will be shown + here. "IMPLICIT" is omitted. +:8: + Object's class name, if set. Omitted if it is just an ordinary simple + value (like with ``algorithm`` in example above). +:9: + Object's ASN.1 type. +:10: + Object's value, if set. Can consist of multiple words (like OCTET/BIT + STRINGs above). We see ``v3`` value in Version, because it is named. + ``rdnSequence`` is the choice of CHOICE type. +:11: + Possible other flags like OPTIONAL and DEFAULT, if value equals to the + default one, specified in the schema. + +As command line utility +----------------------- + +You can decode DER files using command line abilities and get the same +picture as above by executing:: + + % --schema tests.test_crts:Certificate path/to/file + +If there is no schema for you file, then you can try parsing it without, +but of course IMPLICIT tags will often make it impossible. But result is +good enough for the certificate above:: + + % path/to/file + 0 [1,3,1604] . >: SEQUENCE OF + 4 [1,3,1453] . . >: SEQUENCE OF + 8 [0,0, 5] . . . . >: [0] ANY + . . . . . A0:03:02:01:02 + 13 [1,1, 3] . . . . >: INTEGER 61595 + 18 [1,1, 13] . . . . >: SEQUENCE OF + 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 31 [1,1, 0] . . . . . . >: NULL + 33 [1,3, 274] . . . . >: SEQUENCE OF + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + [...] + 1409 [1,1, 50] . . . . . . >: SEQUENCE OF + 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER + 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes + . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16 + . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63 + . . . . . . . . . 61:2E:63:6F:6D:2F + 1461 [1,1, 13] . . >: SEQUENCE OF + 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 + 1474 [1,1, 0] . . . . >: NULL + 1476 [1,2, 129] . . >: BIT STRING 1024 bits + . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD + . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C + [...] + +If you have got dictionaries with ObjectIdentifiers, like example one +from ``tests/``:: + + some_oids = { + "1.2.840.113549.1.1.1": "id-rsaEncryption", + "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption", + [...] + "": "id-at-organizationName", + "": "id-at-organizationalUnitName", + } + +then you can pass it to pretty printer to see human readable OIDs:: + + % --oids tests.test_crts:some_oids path/to/file + [...] + 37 [1,1, 11] . . . . . . >: SET OF + 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF + 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName ( + 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES + 50 [1,1, 18] . . . . . . >: SET OF + 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName ( + 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + 70 [1,1, 18] . . . . . . >: SET OF + 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF + 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName ( + 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona + [...] + +Descriptive errors +------------------ + +If you have bad DER, then errors will show you where error occurred:: + + % --schema tests.test_crts:Certificate path/to/bad/file + Traceback (most recent call last): + [...] + pyderasn.DecodeError: UTCTime (tbsCertificate.validity.notAfter.utcTime) (at 328) invalid UTCTime format + +:: + + % path/to/bad/file + [...] + pyderasn.DecodeError: UTCTime (0.SequenceOf.4.SequenceOf.1.UTCTime) (at 328) invalid UTCTime format + +You can see, so called, decode path inside the structures: +``tbsCertificate`` -> ``validity`` -> ``notAfter`` -> ``utcTime`` and +that object at byte 328 is invalid. + +X.509 certificate creation +-------------------------- + +Let's create some simple self-signed X.509 certificate from the ground:: + + tbs = TBSCertificate() + tbs["serialNumber"] = CertificateSerialNumber(10143011886257155224) + + sign_algo_id = AlgorithmIdentifier() + sign_algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.5") + sign_algo_id["parameters"] = Any(Null()) + tbs["signature"] = sign_algo_id + + rdnSeq = RDNSequence() + for oid, klass, text in ( + ("", PrintableString, "XX"), + ("", PrintableString, "Some-State"), + ("", PrintableString, "City"), + ("", PrintableString, "Internet Widgits Pty Ltd"), + ("", PrintableString, ""), + ("1.2.840.113549.1.9.1", IA5String, ""), + ): + attr = AttributeTypeAndValue() + attr["type"] = AttributeType(oid) + attr["value"] = AttributeValue(klass(text)) + rdn = RelativeDistinguishedName() + rdn.append(attr) + rdnSeq.append(rdn) + issuer = Name() + issuer["rdnSequence"] = rdnSeq + tbs["issuer"] = issuer + tbs["subject"] = issuer + + validity = Validity() + validity["notBefore"] = Time(("utcTime", UTCTime(datetime(2009, 10, 8, 0, 25, 53)))) + validity["notAfter"] = Time(("utcTime", UTCTime(datetime(2010, 10, 8, 0, 25, 53)))) + tbs["validity"] = validity + + spki = SubjectPublicKeyInfo() + spki_algo_id = sign_algo_id.copy() + spki_algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.1") + spki["algorithm"] = spki_algo_id + spki["subjectPublicKey"] = BitString(hexdec("".join(( + "3048024100cdb7639c3278f006aa277f6eaf42902b592d8cbcbe38a1c92ba4695", + "a331b1deadeadd8e9a5c27e8c4c2fd0a8889657722a4f2af7589cf2c77045dc8f", + "deec357d0203010001", + )))) + tbs["subjectPublicKeyInfo"] = spki + + crt = Certificate() + crt["tbsCertificate"] = tbs + crt["signatureAlgorithm"] = sign_algo_id + crt["signatureValue"] = BitString(hexdec("".join(( + "a67b06ec5ece92772ca413cba3ca12568fdc6c7b4511cd40a7f659980402df2b", + "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435", + )))) + crt.encode() + +And we will get the same certificate used in Go's library tests. diff --git a/doc/features.rst b/doc/features.rst new file mode 100644 index 0000000..4d94324 --- /dev/null +++ b/doc/features.rst @@ -0,0 +1,50 @@ +Features +======== + +* Basic ASN.1 data types (X.208): BOOLEAN, INTEGER, BIT STRING, OCTET + STRING, NULL, OBJECT IDENTIFIER, ENUMERATED, all strings, UTCTime, + GeneralizedTime, CHOICE, ANY, SEQUENCE (OF), SET (OF) +* Size constraints checking +* Working with sequences as high level data objects with ability to + (un)marshall them +* Python 2.7/3.5 compatibility + +Why yet another library? `pyasn1 `__ +had all of this a long time ago. PyDERASN resembles it in many ways. In +practice it should be relatively easy to convert ``pyasn1``'s code to +``pyderasn``'s one. But additionally it offers: + +* Small, simple and trying to be reviewable code. Just a single file +* ``__slots__`` friendliness +* Ability to know exact decoded objects offsets and lengths in the binary +* Pretty printer and command-line decoder, that could conveniently + replace utilities like either ``dumpasn1`` or ``openssl asn1parse`` +* Some kind of strong typing: SEQUENCEs require the exact **type** of + settable values, even when they are inherited +* However they do not require tags matching: IMPLICIT/EXPLICIT tags will + be set automatically in the given sequence +* Could be significantly faster. For example parsing of's CRL + under Python 3.5.2: + + :`` revoke.crl``: + ~2 min + :`` --schema revoke.crl``: + ~38 sec + :``pyasn1.decode(asn1Spec=pyasn1.CertificateList())``: + ~22 min (``pyasn1 == 0.2.3``) + +There are drawbacks: + +* No old Python versions support +* No BER/CER support +* PyDERASN does **not** have object recreation capable ``repr``-s:: + + pyderasn>>> repr(algo_id) + AlgorithmIdentifier SEQUENCE[OBJECT IDENTIFIER, [UNIV 5] ANY 0500 OPTIONAL] + + pyasn1>>> repr(algo_id) + AlgorithmIdentifier().setComponents(ObjectIdentifier(''), Any(hexValue='0500')) + +* Strings are not validated in any way, except just trying to be decoded + in ``ascii``, ``iso-8859-1``, ``utf-8/16/32`` correspondingly +* No REAL, RELATIVE OID, EXTERNAL, INSTANCE OF, EMBEDDED PDV, CHARACTER STRING diff --git a/doc/feedback.rst b/doc/feedback.rst new file mode 100644 index 0000000..d83f51b --- /dev/null +++ b/doc/feedback.rst @@ -0,0 +1,10 @@ +Feedback +======== + +Please send questions regarding the use of PyDERASN, bug reports and +patches to `pyderasn-devel `__ +mailing list. Announcements also go to this mailing list. + +Official website is +Development Git source code repository is located here: + diff --git a/doc/index.rst b/doc/index.rst new file mode 100644 index 0000000..edccc23 --- /dev/null +++ b/doc/index.rst @@ -0,0 +1,30 @@ +======================================== +PyDERASN -- ASN.1 DER library for Python +======================================== + +.. + + I'm going to build my own ASN.1 library with slots and blobs! + (C) PyDERASN's author + +`ASN.1 `__ (Abstract Syntax +Notation One) is a standard for abstract data serialization. +`DER `__ +(Distinguished Encoding Rules) is a subset of encoding rules suitable +and widely used in cryptography-related stuff. PyDERASN is yet another +library for dealing with the data encoded that way. Although ASN.1 is +written more than 30 years ago by wise Ancients (taken from ``pyasn1``'s +README), it is still often can be seen anywhere in our life. + +PyDERASN is `free software `__, +licenced under `GNU LGPLv3+ `__. + +.. toctree:: + :maxdepth: 1 + + features + examples + reference + install + download + feedback diff --git a/doc/install.rst b/doc/install.rst new file mode 100644 index 0000000..08285de --- /dev/null +++ b/doc/install.rst @@ -0,0 +1,42 @@ +Install +======= + +Preferable way is to :ref:`download ` tarball with the +signature from `official website `__:: + + % wget + % wget + % gpg --verify pyderasn-1.0.tar.xz.sig pyderasn-1.0.tar.xz + % xz -d < pyderasn-1.0.tar.xz | tar xf - + % cd pyderasn-1.0 + % python install + +PyDERASN depends on `six `__ package +for keeping compatibility with Py27/Py35. If it is not installed on your +system, then `` install`` will try to download it from PyPI. You +can also find it mirrored on :ref:`download ` page. + +You could use PIP (**no** authentication is performed!):: + + % pip install pyderasn + +You have to verify downloaded tarballs integrity and authenticity to be +sure that you retrieved trusted and untampered software. `GNU Privacy +Guard `__ is used for that purpose. + +For the very first time it is necessary to get signing public key and +import it. It is provided below, but you should check alternative +resources. + +:: + + pub rsa2048/0x04A933D1BA20327A 2017-09-20 + 2ED6 C846 3051 02DF 5B4E 0383 04A9 33D1 BA20 327A + uid PyDERASN releases + + % gpg --keyserver hkp:// --recv-keys 0x04A933D1BA20327A + % gpg --auto-key-locate dane --locate-keys pyderasn at cypherpunks dot ru + % gpg --auto-key-locate wkd --locate-keys pyderasn at cypherpunks dot ru + % gpg --auto-key-locate pka --locate-keys pyderasn at cypherpunks dot ru + +.. literalinclude:: ../PUBKEY.asc diff --git a/doc/pip-requirements.txt b/doc/pip-requirements.txt new file mode 100644 index 0000000..bc0bbd0 --- /dev/null +++ b/doc/pip-requirements.txt @@ -0,0 +1,16 @@ +alabaster == 0.7.10 +babel == 2.5.1 +certifi == 2017.7.27.1 +chardet == 3.0.4 +docutils == 0.14 +idna == 2.6 +imagesize == 0.7.1 +Jinja2 == 2.9.6 +MarkupSafe == 1.0 +Pygments == 2.2.0 +pytz == 2017.2 +requests == 2.18.4 +snowballstemmer == 1.2.1 +sphinx == 1.6.3 +sphinxcontrib-websupport == 1.0.1 +urllib3 == 1.22 diff --git a/doc/reference.rst b/doc/reference.rst new file mode 100644 index 0000000..e1d5a51 --- /dev/null +++ b/doc/reference.rst @@ -0,0 +1,6 @@ +Library reference +================= + +.. contents:: + +.. automodule:: pyderasn diff --git a/nose.cfg b/nose.cfg new file mode 100644 index 0000000..90a426a --- /dev/null +++ b/nose.cfg @@ -0,0 +1,10 @@ +[nosetests] +tests=tests +verbosity=2 +with-runnable-test-names=1 +nocapture=1 +with-coverage=1 +cover-erase=1 +cover-package=pyderasn +processes=-1 +process-timeout=1800 diff --git a/pip-requirements-tests.txt b/pip-requirements-tests.txt new file mode 100644 index 0000000..625aed7 --- /dev/null +++ b/pip-requirements-tests.txt @@ -0,0 +1,4 @@ +attrs==17.2.0 +coverage==4.4.1 +enum34==1.1.6 ; python_version == '2.7' +hypothesis==3.30.4 diff --git a/pip-requirements.txt b/pip-requirements.txt new file mode 100644 index 0000000..756cde0 --- /dev/null +++ b/pip-requirements.txt @@ -0,0 +1 @@ +six==1.11.0 diff --git a/ b/ new file mode 100755 index 0000000..b1f029c --- /dev/null +++ b/ @@ -0,0 +1,3944 @@ +#!/usr/bin/env python +# coding: utf-8 +# PyDERASN -- Python ASN.1 DER codec with abstract structures +# Copyright (C) 2017 Sergey Matveev +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program. If not, see +# . +"""Python ASN.1 DER codec with abstract structures + +This library allows you to marshal and unmarshal various structures in +ASN.1 DER format, like this: + + >>> i = Integer(123) + >>> raw = i.encode() + >>> Integer().decode(raw) == i + True + +There are primitive types, holding single values +(:py:class:`pyderasn.BitString`, +:py:class:`pyderasn.Boolean`, +:py:class:`pyderasn.Enumerated`, +:py:class:`pyderasn.GeneralizedTime`, +:py:class:`pyderasn.Integer`, +:py:class:`pyderasn.Null`, +:py:class:`pyderasn.ObjectIdentifier`, +:py:class:`pyderasn.OctetString`, +:py:class:`pyderasn.UTCTime`, +:py:class:`various strings ` +(:py:class:`pyderasn.BMPString`, +:py:class:`pyderasn.GeneralString`, +:py:class:`pyderasn.GraphicString`, +:py:class:`pyderasn.IA5String`, +:py:class:`pyderasn.ISO646String`, +:py:class:`pyderasn.NumericString`, +:py:class:`pyderasn.PrintableString`, +:py:class:`pyderasn.T61String`, +:py:class:`pyderasn.TeletexString`, +:py:class:`pyderasn.UniversalString`, +:py:class:`pyderasn.UTF8String`, +:py:class:`pyderasn.VideotexString`, +:py:class:`pyderasn.VisibleString`)), +constructed types, holding multiple primitive types +(:py:class:`pyderasn.Sequence`, +:py:class:`pyderasn.SequenceOf`, +:py:class:`pyderasn.Set`, +:py:class:`pyderasn.SetOf`), +and special types like +:py:class:`pyderasn.Any` and +:py:class:`pyderasn.Choice`. + +Common for most types +--------------------- + +Tags +____ + +Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is +the default tag used during coding process. You can override it with +either ``IMPLICIT`` (using ``impl`` keyword argument), or +``EXPLICIT`` one (using ``expl`` keyword argument). Both arguments takes +raw binary string, containing that tag. You can **not** set implicit and +explicit tags simultaneously. + +There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc` +functions, allowing you to easily create ``CONTEXT`` +``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag +number. Pay attention that explicit tags always have *constructed* tag +(``tag_ctxc``), but implicit tags for primitive types are primitive +(``tag_ctxp``). + +:: + + >>> Integer(impl=tag_ctxp(1)) + [1] INTEGER + >>> Integer(expl=tag_ctxc(2)) + [2] EXPLICIT INTEGER + +Implicit tag is not explicitly shown. + +Two object of the same type, but with different implicit/explicit tags +are **not** equal. + +You can get objects effective tag (either default or implicited) through +``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode` +function:: + + >>> tag_decode(tag_ctxc(123)) + (128, 32, 123) + >>> klass, form, num = tag_decode(tag_ctxc(123)) + >>> klass == TagClassContext + True + >>> form == TagFormConstructed + True + +To determine if object has explicit tag, use ``expled`` boolean property +and ``expl_tag`` property, returning explicit tag's value. + +Default/optional +________________ + +Many objects in sequences could be ``OPTIONAL`` and could have +``DEFAULT`` value. You can specify that object's property using +corresponding keyword arguments. + + >>> Integer(optional=True, default=123) + INTEGER 123 OPTIONAL DEFAULT + +Those specifications do not play any role in primitive value encoding, +but are taken into account when dealing with sequences holding them. For +example ``TBSCertificate`` sequence holds defaulted, explicitly tagged +``version`` field:: + + class Version(Integer): + schema = ( + ("v1", 0), + ("v2", 1), + ("v3", 2), + ) + class TBSCertificate(Sequence): + schema = ( + ("version", Version(expl=tag_ctxc(0), default="v1")), + [...] + +When default argument is used and value is not specified, then it equals +to default one. + +Size constraints +________________ + +Some objects give ability to set value size constraints. This is either +possible integer value, or allowed length of various strings and +sequences. Constraints are set in the following way:: + + class X(...): + bounds = (MIN, MAX) + +And values satisfaction is checked as: ``MIN <= X <= MAX``. + +For simplicity you can also set bounds the following way:: + + bounded_x = X(bounds=(MIN, MAX)) + +If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is +raised. + +Common methods +______________ + +All objects have ``ready`` boolean property, that tells if it is ready +to be encoded. If that kind of action is performed on unready object, +then :py:exc:`pyderasn.ObjNotReady` exception will be raised. + +All objects have ``copy()`` method, returning its copy, that can be safely +mutated. + +Decoding +________ + +Decoding is performed using ``decode()`` method. ``offset`` optional +argument could be used to set initial object's offset in the binary +data, for convenience. It returns decoded object and remaining +unmarshalled data (tail). Internally all work is done on +``memoryview(data)``, and you can leave returning tail as a memoryview, +by specifying ``leavemm=True`` argument. + +When object is decoded, ``decoded`` property is true and you can safely +use following properties: + +* ``offset`` -- position from initial offset where object's tag is started +* ``tlen`` -- length of object's tag +* ``llen`` -- length of object's length value +* ``vlen`` -- length of object's value +* ``tlvlen`` -- length of the whole object + +Pay attention that those values do **not** include anything related to +explicit tag. If you want to know information about it, then use: +``expled`` (to know if explicit tag is set), ``expl_offset`` (it is +lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen`` +(that actually equals to ordinary ``tlvlen``). + +When error occurs, then :py:exc:`pyderasn.DecodeError` is raised. + +Pretty printing +_______________ + +All objects have ``pps()`` method, that is a generator of +:py:class:`pyderasn.PP` namedtuple, holding various raw information +about the object. If ``pps`` is called on sequences, then all underlying +``PP`` will be yielded. + +You can use :py:func:`pyderasn.pp_console_row` function, converting +those ``PP`` to human readable string. Actually exactly it is used for +all object ``repr``. But it is easy to write custom formatters. + + >>> from pyderasn import pprint + >>> encoded = Integer(-12345).encode() + >>> obj, tail = Integer().decode(encoded) + >>> print(pprint(obj)) + 0 [1,1, 2] INTEGER -12345 + +Primitive types +--------------- + +Boolean +_______ +.. autoclass:: pyderasn.Boolean + :members: __init__ + +Integer +_______ +.. autoclass:: pyderasn.Integer + :members: __init__ + +BitString +_________ +.. autoclass:: pyderasn.BitString + :members: __init__ + +OctetString +___________ +.. autoclass:: pyderasn.OctetString + :members: __init__ + +Null +____ +.. autoclass:: pyderasn.Null + :members: __init__ + +ObjectIdentifier +________________ +.. autoclass:: pyderasn.ObjectIdentifier + :members: __init__ + +Enumerated +__________ +.. autoclass:: pyderasn.Enumerated + +CommonString +____________ +.. autoclass:: pyderasn.CommonString + +UTCTime +_______ +.. autoclass:: pyderasn.UTCTime + :members: __init__, todatetime + +GeneralizedTime +_______________ +.. autoclass:: pyderasn.GeneralizedTime + +Special types +------------- + +Choice +______ +.. autoclass:: pyderasn.Choice + :members: __init__ + +PrimitiveTypes +______________ +.. autoclass:: PrimitiveTypes + +Any +___ +.. autoclass:: pyderasn.Any + :members: __init__ + +Constructed types +----------------- + +Sequence +________ +.. autoclass:: pyderasn.Sequence + :members: __init__ + +Set +___ +.. autoclass:: pyderasn.Set + :members: __init__ + +SequenceOf +__________ +.. autoclass:: pyderasn.SequenceOf + :members: __init__ + +SetOf +_____ +.. autoclass:: pyderasn.SetOf + :members: __init__ +""" + +from codecs import getdecoder +from codecs import getencoder +from collections import namedtuple +from collections import OrderedDict +from datetime import datetime +from math import ceil + +from six import binary_type +from six import byte2int +from six import indexbytes +from six import int2byte +from six import integer_types +from six import iterbytes +from six import PY2 +from six import string_types +from six import text_type +from six.moves import xrange as six_xrange + + +__all__ = ( + "Any", + "BitString", + "BMPString", + "Boolean", + "BoundsError", + "Choice", + "DecodeError", + "Enumerated", + "GeneralizedTime", + "GeneralString", + "GraphicString", + "hexdec", + "hexenc", + "IA5String", + "Integer", + "InvalidLength", + "InvalidOID", + "InvalidValueType", + "ISO646String", + "NotEnoughData", + "Null", + "NumericString", + "obj_by_path", + "ObjectIdentifier", + "ObjNotReady", + "ObjUnknown", + "OctetString", + "PrimitiveTypes", + "PrintableString", + "Sequence", + "SequenceOf", + "Set", + "SetOf", + "T61String", + "tag_ctxc", + "tag_ctxp", + "tag_decode", + "TagClassApplication", + "TagClassContext", + "TagClassPrivate", + "TagClassUniversal", + "TagFormConstructed", + "TagFormPrimitive", + "TagMismatch", + "TeletexString", + "UniversalString", + "UTCTime", + "UTF8String", + "VideotexString", + "VisibleString", +) + +TagClassUniversal = 0 +TagClassApplication = 1 << 6 +TagClassContext = 1 << 7 +TagClassPrivate = 1 << 6 | 1 << 7 +TagFormPrimitive = 0 +TagFormConstructed = 1 << 5 +TagClassReprs = { + TagClassContext: "", + TagClassApplication: "APPLICATION ", + TagClassPrivate: "PRIVATE ", + TagClassUniversal: "UNIV ", +} + + +######################################################################## +# Errors +######################################################################## + +class DecodeError(Exception): + def __init__(self, msg="", klass=None, decode_path=(), offset=0): + super(DecodeError, self).__init__() + self.msg = msg + self.klass = klass + self.decode_path = decode_path + self.offset = offset + + def __str__(self): + return " ".join( + c for c in ( + "" if self.klass is None else self.klass.__name__, + ( + ("(%s)" % ".".join(self.decode_path)) + if len(self.decode_path) > 0 else "" + ), + ("(at %d)" % self.offset) if self.offset > 0 else "", + self.msg, + ) if c != "" + ) + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + +class NotEnoughData(DecodeError): + pass + + +class TagMismatch(DecodeError): + pass + + +class InvalidLength(DecodeError): + pass + + +class InvalidOID(DecodeError): + pass + + +class ObjUnknown(ValueError): + def __init__(self, name): + super(ObjUnknown, self).__init__() + = name + + def __str__(self): + return "object is unknown: %s" % + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + +class ObjNotReady(ValueError): + def __init__(self, name): + super(ObjNotReady, self).__init__() + = name + + def __str__(self): + return "object is not ready: %s" % + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + +class InvalidValueType(ValueError): + def __init__(self, expected_types): + super(InvalidValueType, self).__init__() + self.expected_types = expected_types + + def __str__(self): + return "invalid value type, expected: %s" % ", ".join( + [repr(t) for t in self.expected_types] + ) + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + +class BoundsError(ValueError): + def __init__(self, bound_min, value, bound_max): + super(BoundsError, self).__init__() + self.bound_min = bound_min + self.value = value + self.bound_max = bound_max + + def __str__(self): + return "unsatisfied bounds: %s <= %s <= %s" % ( + self.bound_min, + self.value, + self.bound_max, + ) + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, self) + + +######################################################################## +# Basic coders +######################################################################## + +_hexdecoder = getdecoder("hex") +_hexencoder = getencoder("hex") + + +def hexdec(data): + return _hexdecoder(data)[0] + + +def hexenc(data): + return _hexencoder(data)[0].decode("ascii") + + +def int_bytes_len(num, byte_len=8): + if num == 0: + return 1 + return int(ceil(float(num.bit_length()) / byte_len)) + + +def zero_ended_encode(num): + octets = bytearray(int_bytes_len(num, 7)) + i = len(octets) - 1 + octets[i] = num & 0x7F + num >>= 7 + i -= 1 + while num > 0: + octets[i] = 0x80 | (num & 0x7F) + num >>= 7 + i -= 1 + return bytes(octets) + + +def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive): + if num < 31: + # [XX|X|.....] + return int2byte(klass | form | num) + # [XX|X|11111][1.......][1.......] ... [0.......] + return int2byte(klass | form | 31) + zero_ended_encode(num) + + +def tag_decode(tag): + """ + assume that data is validated + """ + first_octet = byte2int(tag) + klass = first_octet & 0xC0 + form = first_octet & 0x20 + if first_octet & 0x1F < 0x1F: + return (klass, form, first_octet & 0x1F) + num = 0 + for octet in iterbytes(tag[1:]): + num <<= 7 + num |= octet & 0x7F + return (klass, form, num) + + +def tag_ctxp(num): + return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive) + + +def tag_ctxc(num): + return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed) + + +def tag_strip(data): + """Take off tag from the data + + :returns: (encoded tag, tag length, remaining data) + """ + if len(data) == 0: + raise NotEnoughData("no data at all") + if byte2int(data) & 0x1F < 31: + return data[:1], 1, data[1:] + i = 0 + while True: + i += 1 + if i == len(data): + raise DecodeError("unfinished tag") + if indexbytes(data, i) & 0x80 == 0: + break + i += 1 + return data[:i], i, data[i:] + + +def len_encode(l): + if l < 0x80: + return int2byte(l) + octets = bytearray(int_bytes_len(l) + 1) + octets[0] = 0x80 | (len(octets) - 1) + for i in six_xrange(len(octets) - 1, 0, -1): + octets[i] = l & 0xFF + l >>= 8 + return bytes(octets) + + +def len_decode(data): + if len(data) == 0: + raise NotEnoughData("no data at all") + first_octet = byte2int(data) + if first_octet & 0x80 == 0: + return first_octet, 1, data[1:] + octets_num = first_octet & 0x7F + if octets_num + 1 > len(data): + raise NotEnoughData("encoded length is longer than data") + if octets_num == 0: + raise DecodeError("long form instead of short one") + if byte2int(data[1:]) == 0: + raise DecodeError("leading zeros") + l = 0 + for v in iterbytes(data[1:1 + octets_num]): + l = (l << 8) | v + if l <= 127: + raise DecodeError("long form instead of short one") + return l, 1 + octets_num, data[1 + octets_num:] + + +######################################################################## +# Base class +######################################################################## + +class Obj(object): + """Common ASN.1 object class + """ + __slots__ = ( + "tag", + "_value", + "_expl", + "default", + "optional", + "offset", + "llen", + "vlen", + ) + + def __init__( + self, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + if impl is None: + self.tag = getattr(self, "impl", self.tag_default) + else: + self.tag = impl + self._expl = getattr(self, "expl", None) if expl is None else expl + if self.tag != self.tag_default and self._expl is not None: + raise ValueError( + "implicit and explicit tags can not be set simultaneously" + ) + if default is not None: + optional = True + self.optional = optional + self.offset, self.llen, self.vlen = _decoded + self.default = None + + @property + def ready(self): # pragma: no cover + raise NotImplementedError() + + def _assert_ready(self): + if not self.ready: + raise ObjNotReady(self.__class__.__name__) + + @property + def decoded(self): + return self.llen > 0 + + def copy(self): # pragma: no cover + raise NotImplementedError() + + @property + def tlen(self): + return len(self.tag) + + @property + def tlvlen(self): + return self.tlen + self.llen + self.vlen + + def __str__(self): # pragma: no cover + return self.__bytes__() if PY2 else self.__unicode__() + + def _encode(self): # pragma: no cover + raise NotImplementedError() + + def _decode(self, tlv, offset=0, decode_path=()): # pragma: no cover + raise NotImplementedError() + + def encode(self): + raw = self._encode() + if self._expl is None: + return raw + return b"".join((self._expl, len_encode(len(raw)), raw)) + + def decode(self, data, offset=0, leavemm=False, decode_path=()): + tlv = memoryview(data) + if self._expl is None: + obj, tail = self._decode( + tlv, + offset, + decode_path=decode_path, + ) + else: + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self._expl: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj, tail = self._decode( + v, + offset=offset + tlen + llen, + decode_path=(), + ) + return obj, (tail if leavemm else tail.tobytes()) + + @property + def expled(self): + return self._expl is not None + + @property + def expl_tag(self): + return self._expl + + @property + def expl_tlen(self): + return len(self._expl) + + @property + def expl_llen(self): + return len(len_encode(self.tlvlen)) + + @property + def expl_offset(self): + return self.offset - self.expl_tlen - self.expl_llen + + @property + def expl_vlen(self): + return self.tlvlen + + @property + def expl_tlvlen(self): + return self.expl_tlen + self.expl_llen + self.expl_vlen + + +######################################################################## +# Pretty printing +######################################################################## + +PP = namedtuple("PP", ( + "asn1_type_name", + "obj_name", + "decode_path", + "value", + "blob", + "optional", + "default", + "impl", + "expl", + "offset", + "tlen", + "llen", + "vlen", + "expl_offset", + "expl_tlen", + "expl_llen", + "expl_vlen", +)) + + +def _pp( + asn1_type_name="unknown", + obj_name="unknown", + decode_path=(), + value=None, + blob=None, + optional=False, + default=False, + impl=None, + expl=None, + offset=0, + tlen=0, + llen=0, + vlen=0, + expl_offset=None, + expl_tlen=None, + expl_llen=None, + expl_vlen=None, +): + return PP( + asn1_type_name, + obj_name, + decode_path, + value, + blob, + optional, + default, + impl, + expl, + offset, + tlen, + llen, + vlen, + expl_offset, + expl_tlen, + expl_llen, + expl_vlen, + ) + + +def pp_console_row(pp, oids=None, with_offsets=False, with_blob=True): + cols = [] + if with_offsets: + cols.append("%5d%s [%d,%d,%4d]" % ( + pp.offset, + ( + " " if pp.expl_offset is None else + ("-%d" % (pp.offset - pp.expl_offset)) + ), + pp.tlen, + pp.llen, + pp.vlen, + )) + if len(pp.decode_path) > 0: + cols.append(" ." * (len(pp.decode_path))) + cols.append("%s:" % pp.decode_path[-1]) + if pp.expl is not None: + klass, _, num = pp.expl + cols.append("[%s%d] EXPLICIT" % (TagClassReprs[klass], num)) + if pp.impl is not None: + klass, _, num = pp.impl + cols.append("[%s%d]" % (TagClassReprs[klass], num)) + if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper(): + cols.append(pp.obj_name) + cols.append(pp.asn1_type_name) + if pp.value is not None: + value = pp.value + if ( + oids is not None and + pp.asn1_type_name == ObjectIdentifier.asn1_type_name and + value in oids + ): + value = "%s (%s)" % (oids[value], pp.value) + cols.append(value) + if with_blob: + if isinstance(pp.blob, binary_type): + cols.append(hexenc(pp.blob)) + elif isinstance(pp.blob, tuple): + cols.append(", ".join(pp.blob)) + if pp.optional: + cols.append("OPTIONAL") + if pp.default: + cols.append("DEFAULT") + return " ".join(cols) + + +def pp_console_blob(pp): + cols = [" " * len("XXXXXYY [X,X,XXXX]")] + if len(pp.decode_path) > 0: + cols.append(" ." * (len(pp.decode_path) + 1)) + if isinstance(pp.blob, binary_type): + blob = hexenc(pp.blob).upper() + for i in range(0, len(blob), 32): + chunk = blob[i:i + 32] + yield " ".join(cols + [":".join( + chunk[j:j + 2] for j in range(0, len(chunk), 2) + )]) + elif isinstance(pp.blob, tuple): + yield " ".join(cols + [", ".join(pp.blob)]) + + +def pprint(obj, oids=None, big_blobs=False): + def _pprint_pps(pps): + for pp in pps: + if hasattr(pp, "_fields"): + if big_blobs: + yield pp_console_row( + pp, + oids=oids, + with_offsets=True, + with_blob=False, + ) + for row in pp_console_blob(pp): + yield row + else: + yield pp_console_row(pp, oids=oids, with_offsets=True) + else: + for row in _pprint_pps(pp): + yield row + return "\n".join(_pprint_pps(obj.pps())) + + +######################################################################## +# ASN.1 primitive types +######################################################################## + +class Boolean(Obj): + """``BOOLEAN`` boolean type + + >>> b = Boolean(True) + BOOLEAN True + >>> b == Boolean(True) + True + >>> bool(b) + True + >>> Boolean(optional=True) + BOOLEAN OPTIONAL + >>> Boolean(impl=tag_ctxp(1), default=False) + [1] BOOLEAN False OPTIONAL DEFAULT + """ + __slots__ = () + tag_default = tag_encode(1) + asn1_type_name = "BOOLEAN" + + def __init__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either boolean type, or + :py:class:`pyderasn.Boolean` object + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(Boolean, self).__init__(impl, expl, default, optional, _decoded) + self._value = None if value is None else self._value_sanitize(value) + if default is not None: + default = self._value_sanitize(default) + self.default = self.__class__( + value=default, + impl=self.tag, + expl=self._expl, + ) + if value is None: + self._value = default + + def _value_sanitize(self, value): + if issubclass(value.__class__, Boolean): + return value._value + if isinstance(value, bool): + return value + raise InvalidValueType((self.__class__, bool)) + + @property + def ready(self): + return self._value is not None + + def copy(self): + obj = self.__class__() + obj._value = self._value + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __nonzero__(self): + self._assert_ready() + return self._value + + def __bool__(self): + self._assert_ready() + return self._value + + def __eq__(self, their): + if isinstance(their, bool): + return self._value == their + if not issubclass(their.__class__, Boolean): + return False + return ( + self._value == their._value and + self.tag == their.tag and + self._expl == their._expl + ) + + def __call__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + ) + + def _encode(self): + self._assert_ready() + return b"".join(( + self.tag, + len_encode(1), + (b"\xFF" if self._value else b"\x00"), + )) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, _, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, _, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l != 1: + raise InvalidLength( + "Boolean's length must be equal to 1", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + first_octet = byte2int(v) + if first_octet == 0: + value = False + elif first_octet == 0xFF: + value = True + else: + raise DecodeError( + "unacceptable Boolean value", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj = self.__class__( + value=value, + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, 1, 1), + ) + return obj, v[1:] + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=str(self._value) if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +class Integer(Obj): + """``INTEGER`` integer type + + >>> b = Integer(-123) + INTEGER -123 + >>> b == Integer(-123) + True + >>> int(b) + -123 + >>> Integer(optional=True) + INTEGER OPTIONAL + >>> Integer(impl=tag_ctxp(1), default=123) + [1] INTEGER 123 OPTIONAL DEFAULT + + >>> Integer(2, bounds=(1, 3)) + INTEGER 2 + >>> Integer(5, bounds=(1, 3)) + Traceback (most recent call last): + pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3 + + :: + + class Version(Integer): + schema = ( + ("v1", 0), + ("v2", 1), + ("v3", 2), + ) + + >>> v = Version("v1") + Version INTEGER v1 + >>> int(v) + 0 + >>> v.named + 'v1' + >>> v.specs + {'v3': 2, 'v1': 0, 'v2': 1} + """ + __slots__ = ("specs", "_bound_min", "_bound_max") + tag_default = tag_encode(2) + asn1_type_name = "INTEGER" + + def __init__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=False, + _specs=None, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either integer type, named value + (if ``schema`` is specified in the class), or + :py:class:`pyderasn.Integer` object + :param bounds: set ``(MIN, MAX)`` value constraint. + (-inf, +inf) by default + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(Integer, self).__init__(impl, expl, default, optional, _decoded) + self._value = value + specs = getattr(self, "schema", {}) if _specs is None else _specs + self.specs = specs if isinstance(specs, dict) else dict(specs) + if bounds is None: + self._bound_min, self._bound_max = getattr( + self, + "bounds", + (float("-inf"), float("+inf")), + ) + else: + self._bound_min, self._bound_max = bounds + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default = self._value_sanitize(default) + self.default = self.__class__( + value=default, + impl=self.tag, + expl=self._expl, + _specs=self.specs, + ) + if self._value is None: + self._value = default + + def _value_sanitize(self, value): + if issubclass(value.__class__, Integer): + value = value._value + elif isinstance(value, integer_types): + pass + elif isinstance(value, str): + value = self.specs.get(value) + if value is None: + raise ObjUnknown("integer value: %s" % value) + else: + raise InvalidValueType((self.__class__, int, str)) + if not self._bound_min <= value <= self._bound_max: + raise BoundsError(self._bound_min, value, self._bound_max) + return value + + @property + def ready(self): + return self._value is not None + + def copy(self): + obj = self.__class__(_specs=self.specs) + obj._value = self._value + obj._bound_min = self._bound_min + obj._bound_max = self._bound_max + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __int__(self): + self._assert_ready() + return int(self._value) + + def __hash__(self): + self._assert_ready() + return hash( + self.tag + + bytes(self._expl or b"") + + str(self._value).encode("ascii"), + ) + + def __eq__(self, their): + if isinstance(their, integer_types): + return self._value == their + if not issubclass(their.__class__, Integer): + return False + return ( + self._value == their._value and + self.tag == their.tag and + self._expl == their._expl + ) + + def __lt__(self, their): + return self._value < their + + def __gt__(self, their): + return self._value > their + + @property + def named(self): + for name, value in self.specs.items(): + if value == self._value: + return name + + def __call__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + bounds=( + (self._bound_min, self._bound_max) + if bounds is None else bounds + ), + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + _specs=self.specs, + ) + + def _encode(self): + self._assert_ready() + value = self._value + if PY2: + if value == 0: + octets = bytearray([0]) + elif value < 0: + value = -value + value -= 1 + octets = bytearray() + while value > 0: + octets.append((value & 0xFF) ^ 0xFF) + value >>= 8 + if len(octets) == 0 or octets[-1] & 0x80 == 0: + octets.append(0xFF) + else: + octets = bytearray() + while value > 0: + octets.append(value & 0xFF) + value >>= 8 + if octets[-1] & 0x80 > 0: + octets.append(0x00) + octets.reverse() + octets = bytes(octets) + else: + bytes_len = ceil(value.bit_length() / 8) or 1 + while True: + try: + octets = value.to_bytes( + bytes_len, + byteorder="big", + signed=True, + ) + except OverflowError: + bytes_len += 1 + else: + break + return b"".join((self.tag, len_encode(len(octets)), octets)) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, _, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + first_octet = byte2int(v) + if l > 1: + second_octet = byte2int(v[1:]) + if ( + ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or + ((first_octet == 0xFF) and (second_octet & 0x80 != 0)) + ): + raise DecodeError( + "non normalized integer", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if PY2: + value = 0 + if first_octet & 0x80 > 0: + octets = bytearray() + for octet in bytearray(v): + octets.append(octet ^ 0xFF) + for octet in octets: + value = (value << 8) | octet + value += 1 + value = -value + else: + for octet in bytearray(v): + value = (value << 8) | octet + else: + value = int.from_bytes(v, byteorder="big", signed=True) + try: + obj = self.__class__( + value=value, + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, l), + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + return obj, tail + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=(self.named or str(self._value)) if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +class BitString(Obj): + """``BIT STRING`` bit string type + + >>> BitString(b"hello world") + BIT STRING 88 bits 68656c6c6f20776f726c64 + >>> bytes(b) + b'hello world' + >>> b == b"hello world" + True + >>> b.bit_len + 88 + + >>> b = BitString("'010110000000'B") + BIT STRING 12 bits 5800 + >>> b.bit_len + 12 + >>> b[0], b[1], b[2], b[3] + (False, True, False, True) + >>> b[1000] + False + >>> [v for v in b] + [False, True, False, True, True, False, False, False, False, False, False, False] + + :: + + class KeyUsage(BitString): + schema = ( + ('digitalSignature', 0), + ('nonRepudiation', 1), + ('keyEncipherment', 2), + ) + + >>> b = KeyUsage(('keyEncipherment', 'nonRepudiation')) + KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment + >>> b.named + ['nonRepudiation', 'keyEncipherment'] + >>> b.specs + {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2} + """ + __slots__ = ("specs",) + tag_default = tag_encode(3) + asn1_type_name = "BIT STRING" + + def __init__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=False, + _specs=None, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either binary type, tuple of named + values (if ``schema`` is specified in the class), + string in ``'XXX...'B`` form, or + :py:class:`pyderasn.BitString` object + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(BitString, self).__init__(impl, expl, default, optional, _decoded) + specs = getattr(self, "schema", {}) if _specs is None else _specs + self.specs = specs if isinstance(specs, dict) else dict(specs) + self._value = None if value is None else self._value_sanitize(value) + if default is not None: + default = self._value_sanitize(default) + self.default = self.__class__( + value=default, + impl=self.tag, + expl=self._expl, + ) + if value is None: + self._value = default + + def _bits2octets(self, bits): + if len(self.specs) > 0: + bits = bits.rstrip("0") + bit_len = len(bits) + bits += "0" * ((8 - (bit_len % 8)) % 8) + octets = bytearray(len(bits) // 8) + for i in six_xrange(len(octets)): + octets[i] = int(bits[i * 8:(i * 8) + 8], 2) + return bit_len, bytes(octets) + + def _value_sanitize(self, value): + if issubclass(value.__class__, BitString): + return value._value + if isinstance(value, (string_types, binary_type)): + if ( + isinstance(value, string_types) and + value.startswith("'") and + value.endswith("'B") + ): + value = value[1:-2] + if not set(value) <= set(("0", "1")): + raise ValueError("B's coding contains unacceptable chars") + return self._bits2octets(value) + elif isinstance(value, binary_type): + return (len(value) * 8, value) + else: + raise InvalidValueType(( + self.__class__, + string_types, + binary_type, + )) + if isinstance(value, tuple): + if ( + len(value) == 2 and + isinstance(value[0], integer_types) and + isinstance(value[1], binary_type) + ): + return value + bits = [] + for name in value: + bit = self.specs.get(name) + if bit is None: + raise ObjUnknown("BitString value: %s" % name) + bits.append(bit) + if len(bits) == 0: + return self._bits2octets("") + bits = set(bits) + return self._bits2octets("".join( + ("1" if bit in bits else "0") + for bit in six_xrange(max(bits) + 1) + )) + raise InvalidValueType((self.__class__, binary_type, string_types)) + + @property + def ready(self): + return self._value is not None + + def copy(self): + obj = self.__class__(_specs=self.specs) + obj._value = self._value + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __iter__(self): + self._assert_ready() + for i in six_xrange(self._value[0]): + yield self[i] + + @property + def bit_len(self): + self._assert_ready() + return self._value[0] + + def __bytes__(self): + self._assert_ready() + return self._value[1] + + def __eq__(self, their): + if isinstance(their, bytes): + return self._value[1] == their + if not issubclass(their.__class__, BitString): + return False + return ( + self._value == their._value and + self.tag == their.tag and + self._expl == their._expl + ) + + @property + def named(self): + return [name for name, bit in self.specs.items() if self[bit]] + + def __call__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + _specs=self.specs, + ) + + def __getitem__(self, key): + if isinstance(key, int): + bit_len, octets = self._value + if key >= bit_len: + return False + return ( + byte2int(memoryview(octets)[key // 8:]) >> + (7 - (key % 8)) + ) & 1 == 1 + if isinstance(key, string_types): + value = self.specs.get(key) + if value is None: + raise ObjUnknown("BitString value: %s" % key) + return self[value] + raise InvalidValueType((int, str)) + + def _encode(self): + self._assert_ready() + bit_len, octets = self._value + return b"".join(( + self.tag, + len_encode(len(octets) + 1), + int2byte((8 - bit_len % 8) % 8), + octets, + )) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, _, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + pad_size = byte2int(v) + if l == 1 and pad_size != 0: + raise DecodeError( + "invalid empty value", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if pad_size > 7: + raise DecodeError( + "too big pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if byte2int(v[-1:]) & ((1 << pad_size) - 1) != 0: + raise DecodeError( + "invalid pad", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + obj = self.__class__( + value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, l), + ) + return obj, tail + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + value = None + blob = None + if self.ready: + bit_len, blob = self._value + value = "%d bits" % bit_len + if len(self.specs) > 0: + blob = tuple(self.named) + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=value, + blob=blob, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +class OctetString(Obj): + """``OCTET STRING`` binary string type + + >>> s = OctetString(b"hello world") + OCTET STRING 11 bytes 68656c6c6f20776f726c64 + >>> s == OctetString(b"hello world") + True + >>> bytes(s) + b'hello world' + + >>> OctetString(b"hello", bounds=(4, 4)) + Traceback (most recent call last): + pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4 + >>> OctetString(b"hell", bounds=(4, 4)) + OCTET STRING 4 bytes 68656c6c + """ + __slots__ = ("_bound_min", "_bound_max") + tag_default = tag_encode(4) + asn1_type_name = "OCTET STRING" + + def __init__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either binary type, or + :py:class:`pyderasn.OctetString` object + :param bounds: set ``(MIN, MAX)`` value size constraint. + (-inf, +inf) by default + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(OctetString, self).__init__( + impl, + expl, + default, + optional, + _decoded, + ) + self._value = value + if bounds is None: + self._bound_min, self._bound_max = getattr( + self, + "bounds", + (0, float("+inf")), + ) + else: + self._bound_min, self._bound_max = bounds + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default = self._value_sanitize(default) + self.default = self.__class__( + value=default, + impl=self.tag, + expl=self._expl, + ) + if self._value is None: + self._value = default + + def _value_sanitize(self, value): + if issubclass(value.__class__, OctetString): + value = value._value + elif isinstance(value, binary_type): + pass + else: + raise InvalidValueType((self.__class__, bytes)) + if not self._bound_min <= len(value) <= self._bound_max: + raise BoundsError(self._bound_min, len(value), self._bound_max) + return value + + @property + def ready(self): + return self._value is not None + + def copy(self): + obj = self.__class__() + obj._value = self._value + obj._bound_min = self._bound_min + obj._bound_max = self._bound_max + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __bytes__(self): + self._assert_ready() + return self._value + + def __eq__(self, their): + if isinstance(their, binary_type): + return self._value == their + if not issubclass(their.__class__, OctetString): + return False + return ( + self._value == their._value and + self.tag == their.tag and + self._expl == their._expl + ) + + def __call__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + bounds=( + (self._bound_min, self._bound_max) + if bounds is None else bounds + ), + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + ) + + def _encode(self): + self._assert_ready() + return b"".join(( + self.tag, + len_encode(len(self._value)), + self._value, + )) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, _, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + try: + obj = self.__class__( + value=v.tobytes(), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, l), + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + return obj, tail + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=("%d bytes" % len(self._value)) if self.ready else None, + blob=self._value if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +class Null(Obj): + """``NULL`` null object + + >>> n = Null() + NULL + >>> n.ready + True + """ + __slots__ = () + tag_default = tag_encode(5) + asn1_type_name = "NULL" + + def __init__( + self, + value=None, # unused, but Sequence passes it + impl=None, + expl=None, + optional=False, + _decoded=(0, 0, 0), + ): + """ + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(Null, self).__init__(impl, expl, None, optional, _decoded) + self.default = None + + @property + def ready(self): + return True + + def copy(self): + obj = self.__class__() + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __eq__(self, their): + if not issubclass(their.__class__, Null): + return False + return ( + self.tag == their.tag and + self._expl == their._expl + ) + + def __call__( + self, + value=None, + impl=None, + expl=None, + optional=None, + ): + return self.__class__( + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + optional=self.optional if optional is None else optional, + ) + + def _encode(self): + return self.tag + len_encode(0) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, _, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, _, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l != 0: + raise InvalidLength( + "Null must have zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj = self.__class__( + impl=self.tag, + expl=self._expl, + optional=self.optional, + _decoded=(offset, 1, 0), + ) + return obj, v + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + optional=self.optional, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +class ObjectIdentifier(Obj): + """``OBJECT IDENTIFIER`` OID type + + >>> oid = ObjectIdentifier((1, 2, 3)) + OBJECT IDENTIFIER 1.2.3 + >>> oid == ObjectIdentifier("1.2.3") + True + >>> tuple(oid) + (1, 2, 3) + >>> str(oid) + '1.2.3' + >>> oid + (4, 5) + ObjectIdentifier("1.7") + OBJECT IDENTIFIER + + >>> str(ObjectIdentifier((3, 1))) + Traceback (most recent call last): + pyderasn.InvalidOID: unacceptable first arc value + """ + __slots__ = () + tag_default = tag_encode(6) + asn1_type_name = "OBJECT IDENTIFIER" + + def __init__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either tuples of integers, + string of "."-concatenated integers, or + :py:class:`pyderasn.ObjectIdentifier` object + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(ObjectIdentifier, self).__init__( + impl, + expl, + default, + optional, + _decoded, + ) + self._value = value + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default = self._value_sanitize(default) + self.default = self.__class__( + value=default, + impl=self.tag, + expl=self._expl, + ) + if self._value is None: + self._value = default + + def __add__(self, their): + if isinstance(their, self.__class__): + return self.__class__(self._value + their._value) + if isinstance(their, tuple): + return self.__class__(self._value + their) + raise InvalidValueType((self.__class__, tuple)) + + def _value_sanitize(self, value): + if issubclass(value.__class__, ObjectIdentifier): + return value._value + if isinstance(value, string_types): + try: + value = tuple(int(arc) for arc in value.split(".")) + except ValueError: + raise InvalidOID("unacceptable arcs values") + if isinstance(value, tuple): + if len(value) < 2: + raise InvalidOID("less than 2 arcs") + first_arc = value[0] + if first_arc in (0, 1): + if not (0 <= value[1] <= 39): + raise InvalidOID("second arc is too wide") + elif first_arc == 2: + pass + else: + raise InvalidOID("unacceptable first arc value") + return value + raise InvalidValueType((self.__class__, str, tuple)) + + @property + def ready(self): + return self._value is not None + + def copy(self): + obj = self.__class__() + obj._value = self._value + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __iter__(self): + self._assert_ready() + return iter(self._value) + + def __str__(self): + return ".".join(str(arc) for arc in self._value or ()) + + def __hash__(self): + self._assert_ready() + return hash( + self.tag + + bytes(self._expl or b"") + + str(self._value).encode("ascii"), + ) + + def __eq__(self, their): + if isinstance(their, tuple): + return self._value == their + if not issubclass(their.__class__, ObjectIdentifier): + return False + return ( + self.tag == their.tag and + self._expl == their._expl and + self._value == their._value + ) + + def __lt__(self, their): + return self._value < their + + def __gt__(self, their): + return self._value > their + + def __call__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + ) + + def _encode(self): + self._assert_ready() + value = self._value + first_value = value[1] + first_arc = value[0] + if first_arc == 0: + pass + elif first_arc == 1: + first_value += 40 + elif first_arc == 2: + first_value += 80 + else: # pragma: no cover + raise RuntimeError("invalid arc is stored") + octets = [zero_ended_encode(first_value)] + for arc in value[2:]: + octets.append(zero_ended_encode(arc)) + v = b"".join(octets) + return b"".join((self.tag, len_encode(len(v)), v)) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, _, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + arcs = [] + while len(v) > 0: + i = 0 + arc = 0 + while True: + octet = indexbytes(v, i) + arc = (arc << 7) | (octet & 0x7F) + if octet & 0x80 == 0: + arcs.append(arc) + v = v[i + 1:] + break + i += 1 + if i == len(v): + raise DecodeError( + "unfinished OID", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + first_arc = 0 + second_arc = arcs[0] + if 0 <= second_arc <= 39: + first_arc = 0 + elif 40 <= second_arc <= 79: + first_arc = 1 + second_arc -= 40 + else: + first_arc = 2 + second_arc -= 80 + obj = self.__class__( + value=tuple([first_arc, second_arc] + arcs[1:]), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, l), + ) + return obj, tail + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=str(self) if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +class Enumerated(Integer): + """``ENUMERATED`` integer type + + This type is identical to :py:class:`pyderasn.Integer`, but requires + schema to be specified and does not accept values missing from it. + """ + __slots__ = () + tag_default = tag_encode(10) + asn1_type_name = "ENUMERATED" + + def __init__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=False, + _specs=None, + _decoded=(0, 0, 0), + bounds=None, # dummy argument, workability for Integer.decode + ): + super(Enumerated, self).__init__( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + _specs=_specs, + _decoded=_decoded, + ) + if len(self.specs) == 0: + raise ValueError("schema must be specified") + + def _value_sanitize(self, value): + if isinstance(value, self.__class__): + value = value._value + elif isinstance(value, integer_types): + if value not in list(self.specs.values()): + raise DecodeError( + "unknown integer value: %s" % value, + klass=self.__class__, + ) + elif isinstance(value, string_types): + value = self.specs.get(value) + if value is None: + raise ObjUnknown("integer value: %s" % value) + else: + raise InvalidValueType((self.__class__, int, str)) + return value + + def copy(self): + obj = self.__class__(_specs=self.specs) + obj._value = self._value + obj._bound_min = self._bound_min + obj._bound_max = self._bound_max + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __call__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=None, + _specs=None, + ): + return self.__class__( + value=value, + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + _specs=self.specs, + ) + + +class CommonString(OctetString): + """Common class for all strings + + Everything resembles :py:class:`pyderasn.OctetString`, except + ability to deal with unicode text strings. + + >>> hexenc("привет мир".encode("utf-8")) + 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180' + >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80")) + True + >>> s = UTF8String("привет мир") + UTF8String UTF8String привет мир + >>> str(s) + 'привет мир' + >>> hexenc(bytes(s)) + 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180' + + >>> PrintableString("привет мир") + Traceback (most recent call last): + UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128) + + >>> BMPString("ада", bounds=(2, 2)) + Traceback (most recent call last): + pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2 + >>> s = BMPString("ад", bounds=(2, 2)) + >>> s.encoding + 'utf-16-be' + >>> hexenc(bytes(s)) + '04300434' + + .. list-table:: + :header-rows: 1 + + * - Class + - Text Encoding + * - :py:class:`pyderasn.UTF8String` + - utf-8 + * - :py:class:`pyderasn.NumericString` + - ascii + * - :py:class:`pyderasn.PrintableString` + - ascii + * - :py:class:`pyderasn.TeletexString` + - ascii + * - :py:class:`pyderasn.T61String` + - ascii + * - :py:class:`pyderasn.VideotexString` + - iso-8859-1 + * - :py:class:`pyderasn.IA5String` + - ascii + * - :py:class:`pyderasn.GraphicString` + - iso-8859-1 + * - :py:class:`pyderasn.VisibleString` + - ascii + * - :py:class:`pyderasn.ISO646String` + - ascii + * - :py:class:`pyderasn.GeneralString` + - iso-8859-1 + * - :py:class:`pyderasn.UniversalString` + - utf-32-be + * - :py:class:`pyderasn.BMPString` + - utf-16-be + """ + __slots__ = ("encoding",) + + def _value_sanitize(self, value): + value_raw = None + value_decoded = None + if isinstance(value, self.__class__): + value_raw = value._value + elif isinstance(value, text_type): + value_decoded = value + elif isinstance(value, binary_type): + value_raw = value + else: + raise InvalidValueType((self.__class__, text_type, binary_type)) + value_raw = ( + value_decoded.encode(self.encoding) + if value_raw is None else value_raw + ) + value_decoded = ( + value_raw.decode(self.encoding) + if value_decoded is None else value_decoded + ) + if not self._bound_min <= len(value_decoded) <= self._bound_max: + raise BoundsError( + self._bound_min, + len(value_decoded), + self._bound_max, + ) + return value_raw + + def __eq__(self, their): + if isinstance(their, binary_type): + return self._value == their + if isinstance(their, text_type): + return self._value == their.encode(self.encoding) + if not isinstance(their, self.__class__): + return False + return ( + self._value == their._value and + self.tag == their.tag and + self._expl == their._expl + ) + + def __unicode__(self): + if self.ready: + return self._value.decode(self.encoding) + return text_type(self._value) + + def __repr__(self): + return pp_console_row(next(self.pps(no_unicode=PY2))) + + def pps(self, decode_path=(), no_unicode=False): + value = None + if self.ready: + value = hexenc(bytes(self)) if no_unicode else self.__unicode__() + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=value, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + ) + + +class UTF8String(CommonString): + __slots__ = () + tag_default = tag_encode(12) + encoding = "utf-8" + asn1_type_name = "UTF8String" + + +class NumericString(CommonString): + __slots__ = () + tag_default = tag_encode(18) + encoding = "ascii" + asn1_type_name = "NumericString" + + +class PrintableString(CommonString): + __slots__ = () + tag_default = tag_encode(19) + encoding = "ascii" + asn1_type_name = "PrintableString" + + +class TeletexString(CommonString): + __slots__ = () + tag_default = tag_encode(20) + encoding = "ascii" + asn1_type_name = "TeletexString" + + +class T61String(TeletexString): + __slots__ = () + asn1_type_name = "T61String" + + +class VideotexString(CommonString): + __slots__ = () + tag_default = tag_encode(21) + encoding = "iso-8859-1" + asn1_type_name = "VideotexString" + + +class IA5String(CommonString): + __slots__ = () + tag_default = tag_encode(22) + encoding = "ascii" + asn1_type_name = "IA5" + + +class UTCTime(CommonString): + """``UTCTime`` datetime type + + >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123)) + UTCTime UTCTime 2017-09-30T22:07:50 + >>> str(t) + '170930220750Z' + >>> bytes(t) + b'170930220750Z' + >>> t.todatetime() + datetime.datetime(2017, 9, 30, 22, 7, 50) + >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime() + datetime.datetime(1957, 9, 30, 22, 7, 50) + """ + __slots__ = () + tag_default = tag_encode(23) + encoding = "ascii" + asn1_type_name = "UTCTime" + + fmt = "%y%m%d%H%M%SZ" + + def __init__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + bounds=None, # dummy argument, workability for OctetString.decode + ): + """ + :param value: set the value. Either datetime type, or + :py:class:`pyderasn.UTCTime` object + :param bytes impl: override default tag with ``IMPLICIT`` one + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(UTCTime, self).__init__( + impl=impl, + expl=expl, + default=default, + optional=optional, + _decoded=_decoded, + ) + self._value = value + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default = self._value_sanitize(default) + self.default = self.__class__( + value=default, + impl=self.tag, + expl=self._expl, + ) + if self._value is None: + self._value = default + + def _value_sanitize(self, value): + if isinstance(value, self.__class__): + return value._value + if isinstance(value, datetime): + return value.strftime(self.fmt).encode("ascii") + if isinstance(value, binary_type): + value_decoded = value.decode("ascii") + if len(value_decoded) == 2 + 2 + 2 + 2 + 2 + 2 + 1: + try: + datetime.strptime(value_decoded, self.fmt) + except ValueError: + raise DecodeError("invalid UTCTime format") + return value + else: + raise DecodeError("invalid UTCTime length") + raise InvalidValueType((self.__class__, datetime)) + + def __eq__(self, their): + if isinstance(their, binary_type): + return self._value == their + if isinstance(their, datetime): + return self.todatetime() == their + if not isinstance(their, self.__class__): + return False + return ( + self._value == their._value and + self.tag == their.tag and + self._expl == their._expl + ) + + def todatetime(self): + """Convert to datetime + + :returns: datetime + + Pay attention that UTCTime can not hold full year, so all years + having < 50 years are treated as 20xx, 19xx otherwise, according + to X.509 recomendation. + """ + value = datetime.strptime(self._value.decode("ascii"), self.fmt) + year = value.year % 100 + return datetime( + year=(2000 + year) if year < 50 else (1900 + year), + month=value.month, +, + hour=value.hour, + minute=value.minute, + second=value.second, + ) + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=self.todatetime().isoformat() if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + ) + + +class GeneralizedTime(UTCTime): + """``GeneralizedTime`` datetime type + + This type is similar to :py:class:`pyderasn.UTCTime`. + + >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123)) + GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123 + >>> str(t) + '20170930220750.000123Z' + >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50)) + GeneralizedTime GeneralizedTime 2057-09-30T22:07:50 + """ + __slots__ = () + tag_default = tag_encode(24) + asn1_type_name = "GeneralizedTime" + + fmt = "%Y%m%d%H%M%SZ" + fmt_ms = "%Y%m%d%H%M%S.%fZ" + + def _value_sanitize(self, value): + if isinstance(value, self.__class__): + return value._value + if isinstance(value, datetime): + return value.strftime( + self.fmt_ms if value.microsecond > 0 else self.fmt + ).encode("ascii") + if isinstance(value, binary_type): + value_decoded = value.decode("ascii") + if len(value_decoded) == 4 + 2 + 2 + 2 + 2 + 2 + 1: + try: + datetime.strptime(value_decoded, self.fmt) + except ValueError: + raise DecodeError( + "invalid GeneralizedTime (without ms) format", + ) + return value + elif len(value_decoded) >= 4 + 2 + 2 + 2 + 2 + 2 + 1 + 1 + 1: + try: + datetime.strptime(value_decoded, self.fmt_ms) + except ValueError: + raise DecodeError( + "invalid GeneralizedTime (with ms) format", + ) + return value + else: + raise DecodeError( + "invalid GeneralizedTime length", + klass=self.__class__, + ) + raise InvalidValueType((self.__class__, datetime)) + + def todatetime(self): + value = self._value.decode("ascii") + if len(value) == 4 + 2 + 2 + 2 + 2 + 2 + 1: + return datetime.strptime(value, self.fmt) + return datetime.strptime(value, self.fmt_ms) + + +class GraphicString(CommonString): + __slots__ = () + tag_default = tag_encode(25) + encoding = "iso-8859-1" + asn1_type_name = "GraphicString" + + +class VisibleString(CommonString): + __slots__ = () + tag_default = tag_encode(26) + encoding = "ascii" + asn1_type_name = "VisibleString" + + +class ISO646String(VisibleString): + __slots__ = () + asn1_type_name = "ISO646String" + + +class GeneralString(CommonString): + __slots__ = () + tag_default = tag_encode(27) + encoding = "iso-8859-1" + asn1_type_name = "GeneralString" + + +class UniversalString(CommonString): + __slots__ = () + tag_default = tag_encode(28) + encoding = "utf-32-be" + asn1_type_name = "UniversalString" + + +class BMPString(CommonString): + __slots__ = () + tag_default = tag_encode(30) + encoding = "utf-16-be" + asn1_type_name = "BMPString" + + +class Choice(Obj): + """``CHOICE`` special type + + :: + + class GeneralName(Choice): + schema = ( + ('rfc822Name', IA5String(impl=tag_ctxp(1))), + ('dNSName', IA5String(impl=tag_ctxp(2))), + ) + + >>> gn = GeneralName() + GeneralName CHOICE + >>> gn["rfc822Name"] = IA5String("foo@bar.baz") + GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz] + >>> gn["dNSName"] = IA5String("bar.baz") + GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz] + >>> gn["rfc822Name"] + None + >>> gn["dNSName"] + [2] IA5String IA5 bar.baz + >>> gn.choice + 'dNSName' + >>> gn.value == gn["dNSName"] + True + >>> gn.specs + OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)]) + + >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz"))) + GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz] + """ + __slots__ = ("specs",) + tag_default = None + asn1_type_name = "CHOICE" + + def __init__( + self, + value=None, + schema=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either ``(choice, value)`` tuple, or + :py:class:`pyderasn.Choice` object + :param bytes impl: can not be set, do **not** use it + :param bytes expl: override default tag with ``EXPLICIT`` one + :param default: set default value. Type same as in ``value`` + :param bool optional: is object ``OPTIONAL`` in sequence + """ + if impl is not None: + raise ValueError("no implicit tag allowed for CHOICE") + super(Choice, self).__init__(None, expl, default, optional, _decoded) + if schema is None: + schema = getattr(self, "schema", ()) + if len(schema) == 0: + raise ValueError("schema must be specified") + self.specs = ( + schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + ) + self._value = None + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default_value = self._value_sanitize(default) + default_obj = self.__class__(impl=self.tag, expl=self._expl) + default_obj.specs = self.specs + default_obj._value = default_value + self.default = default_obj + if value is None: + self._value = default_obj.copy()._value + + def _value_sanitize(self, value): + if isinstance(value, self.__class__): + return value._value + if isinstance(value, tuple) and len(value) == 2: + choice, obj = value + spec = self.specs.get(choice) + if spec is None: + raise ObjUnknown(choice) + if not isinstance(obj, spec.__class__): + raise InvalidValueType((spec,)) + return (choice, spec(obj)) + raise InvalidValueType((self.__class__, tuple)) + + @property + def ready(self): + return self._value is not None and self._value[1].ready + + def copy(self): + obj = self.__class__(schema=self.specs) + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + value = self._value + if value is not None: + obj._value = (value[0], value[1].copy()) + return obj + + def __eq__(self, their): + if isinstance(their, tuple) and len(their) == 2: + return self._value == their + if not isinstance(their, self.__class__): + return False + return ( + self.specs == their.specs and + self._value == their._value + ) + + def __call__( + self, + value=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + schema=self.specs, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + ) + + @property + def choice(self): + self._assert_ready() + return self._value[0] + + @property + def value(self): + self._assert_ready() + return self._value[1] + + def __getitem__(self, key): + if key not in self.specs: + raise ObjUnknown(key) + if self._value is None: + return None + choice, value = self._value + if choice != key: + return None + return value + + def __setitem__(self, key, value): + spec = self.specs.get(key) + if spec is None: + raise ObjUnknown(key) + if not isinstance(value, spec.__class__): + raise InvalidValueType((spec.__class__,)) + self._value = (key, spec(value)) + + @property + def tlen(self): + return 0 + + @property + def decoded(self): + return self._value[1].decoded if self.ready else False + + def _encode(self): + self._assert_ready() + return self._value[1].encode() + + def _decode(self, tlv, offset=0, decode_path=()): + for choice, spec in self.specs.items(): + try: + value, tail = spec.decode( + tlv, + offset=offset, + leavemm=True, + decode_path=decode_path + (choice,), + ) + except TagMismatch: + continue + obj = self.__class__( + schema=self.specs, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, 0, value.tlvlen), + ) + obj._value = (choice, value) + return obj, tail + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + + def __repr__(self): + value = pp_console_row(next(self.pps())) + if self.ready: + value = "%s[%r]" % (value, self.value) + return value + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + value=self.choice if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + ) + if self.ready: + yield self.value.pps(decode_path=decode_path + (self.choice,)) + + +class PrimitiveTypes(Choice): + """Predefined ``CHOICE`` for all generic primitive types + + It could be useful for general decoding of some unspecified values: + + >>> PrimitiveTypes().decode(hexdec("0403666f6f"))[0].value + OCTET STRING 3 bytes 666f6f + >>> PrimitiveTypes().decode(hexdec("0203123456"))[0].value + INTEGER 1193046 + """ + __slots__ = () + schema = tuple((klass.__name__, klass()) for klass in ( + Boolean, + Integer, + BitString, + OctetString, + Null, + ObjectIdentifier, + UTF8String, + NumericString, + PrintableString, + TeletexString, + VideotexString, + IA5String, + UTCTime, + GeneralizedTime, + GraphicString, + VisibleString, + ISO646String, + GeneralString, + UniversalString, + BMPString, + )) + + +class Any(Obj): + """``ANY`` special type + + >>> Any(Integer(-123)) + ANY 020185 + >>> a = Any(OctetString(b"hello world").encode()) + ANY 040b68656c6c6f20776f726c64 + >>> hexenc(bytes(a)) + b'0x040x0bhello world' + """ + __slots__ = () + tag_default = tag_encode(0) + asn1_type_name = "ANY" + + def __init__( + self, + value=None, + expl=None, + optional=False, + _decoded=(0, 0, 0), + ): + """ + :param value: set the value. Either any kind of pyderasn's + **ready** object, or bytes. Pay attention that + **no** validation is performed is raw binary value + is valid TLV + :param bytes expl: override default tag with ``EXPLICIT`` one + :param bool optional: is object ``OPTIONAL`` in sequence + """ + super(Any, self).__init__(None, expl, None, optional, _decoded) + self._value = None if value is None else self._value_sanitize(value) + + def _value_sanitize(self, value): + if isinstance(value, self.__class__): + return value._value + if isinstance(value, Obj): + return value.encode() + if isinstance(value, binary_type): + return value + raise InvalidValueType((self.__class__, Obj, binary_type)) + + @property + def ready(self): + return self._value is not None + + def copy(self): + obj = self.__class__() + obj._value = self._value + obj.tag = self.tag + obj._expl = self._expl + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + return obj + + def __eq__(self, their): + if isinstance(their, binary_type): + return self._value == their + if issubclass(their.__class__, Any): + return self._value == their._value + return False + + def __call__( + self, + value=None, + expl=None, + optional=None, + ): + return self.__class__( + value=value, + expl=self._expl if expl is None else expl, + optional=self.optional if optional is None else optional, + ) + + def __bytes__(self): + self._assert_ready() + return self._value + + @property + def tlen(self): + return 0 + + def _encode(self): + self._assert_ready() + return self._value + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, tlen, lv = tag_strip(tlv) + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + tlvlen = tlen + llen + l + v, tail = tlv[:tlvlen], v[l:] + obj = self.__class__( + value=v.tobytes(), + expl=self._expl, + optional=self.optional, + _decoded=(offset, 0, tlvlen), + ) + obj.tag = t + return obj, tail + + def __repr__(self): + return pp_console_row(next(self.pps())) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + blob=self._value if self.ready else None, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + + +######################################################################## +# ASN.1 constructed types +######################################################################## + +class Sequence(Obj): + __slots__ = ("specs",) + tag_default = tag_encode(form=TagFormConstructed, num=16) + asn1_type_name = "SEQUENCE" + + def __init__( + self, + value=None, + schema=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + super(Sequence, self).__init__(impl, expl, default, optional, _decoded) + if schema is None: + schema = getattr(self, "schema", ()) + self.specs = ( + schema if isinstance(schema, OrderedDict) else OrderedDict(schema) + ) + self._value = {} + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default_value = self._value_sanitize(default) + default_obj = self.__class__(impl=self.tag, expl=self._expl) + default_obj.specs = self.specs + default_obj._value = default_value + self.default = default_obj + if value is None: + self._value = default_obj.copy()._value + + def _value_sanitize(self, value): + if not issubclass(value.__class__, Sequence): + raise InvalidValueType((Sequence,)) + return value._value + + @property + def ready(self): + for name, spec in self.specs.items(): + value = self._value.get(name) + if value is None: + if spec.optional: + continue + return False + else: + if not value.ready: + return False + return True + + def copy(self): + obj = self.__class__(schema=self.specs) + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + obj._value = {k: v.copy() for k, v in self._value.items()} + return obj + + def __eq__(self, their): + if not isinstance(their, self.__class__): + return False + return ( + self.specs == their.specs and + self.tag == their.tag and + self._expl == their._expl and + self._value == their._value + ) + + def __call__( + self, + value=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + schema=self.specs, + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + ) + + def __contains__(self, key): + return key in self._value + + def __setitem__(self, key, value): + spec = self.specs.get(key) + if spec is None: + raise ObjUnknown(key) + if value is None: + self._value.pop(key, None) + return + if not isinstance(value, spec.__class__): + raise InvalidValueType((spec.__class__,)) + value = spec(value=value) + if spec.default is not None and value == spec.default: + self._value.pop(key, None) + return + self._value[key] = value + + def __getitem__(self, key): + value = self._value.get(key) + if value is not None: + return value + spec = self.specs.get(key) + if spec is None: + raise ObjUnknown(key) + if spec.default is not None: + return spec.default + return None + + def _encoded_values(self): + raws = [] + for name, spec in self.specs.items(): + value = self._value.get(name) + if value is None: + if spec.optional: + continue + raise ObjNotReady(name) + raws.append(value.encode()) + return raws + + def _encode(self): + v = b"".join(self._encoded_values()) + return b"".join((self.tag, len_encode(len(v)), v)) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + sub_offset = offset + tlen + llen + values = {} + for name, spec in self.specs.items(): + if len(v) == 0 and spec.optional: + continue + try: + value, v_tail = spec.decode( + v, + sub_offset, + leavemm=True, + decode_path=decode_path + (name,), + ) + except TagMismatch: + if spec.optional: + continue + raise + sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) + v = v_tail + if spec.default is not None and value == spec.default: + # Encoded default values are not valid in DER, + # but we still allow that + continue + values[name] = value + if len(v) > 0: + raise DecodeError( + "remaining data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj = self.__class__( + schema=self.specs, + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, l), + ) + obj._value = values + return obj, tail + + def __repr__(self): + value = pp_console_row(next(self.pps())) + cols = [] + for name in self.specs: + _value = self._value.get(name) + if _value is None: + continue + cols.append(repr(_value)) + return "%s[%s]" % (value, ", ".join(cols)) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + for name in self.specs: + value = self._value.get(name) + if value is None: + continue + yield value.pps(decode_path=decode_path + (name,)) + + +class Set(Sequence): + __slots__ = () + tag_default = tag_encode(form=TagFormConstructed, num=17) + asn1_type_name = "SET" + + def _encode(self): + raws = self._encoded_values() + raws.sort() + v = b"".join(raws) + return b"".join((self.tag, len_encode(len(v)), v)) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + offset=offset, + ) + v, tail = v[:l], v[l:] + sub_offset = offset + tlen + llen + values = {} + specs_items = self.specs.items + while len(v) > 0: + for name, spec in specs_items(): + try: + value, v_tail = spec.decode( + v, + sub_offset, + leavemm=True, + decode_path=decode_path + (name,), + ) + except TagMismatch: + continue + sub_offset += ( + value.expl_tlvlen if value.expled else value.tlvlen + ) + v = v_tail + if spec.default is None or value != spec.default: # pragma: no cover + # SeqMixing.test_encoded_default_accepted covers that place + values[name] = value + break + else: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj = self.__class__( + schema=self.specs, + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, l), + ) + obj._value = values + return obj, tail + + +class SequenceOf(Obj): + __slots__ = ("spec", "_bound_min", "_bound_max") + tag_default = tag_encode(form=TagFormConstructed, num=16) + asn1_type_name = "SEQUENCE OF" + + def __init__( + self, + value=None, + schema=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=False, + _decoded=(0, 0, 0), + ): + super(SequenceOf, self).__init__( + impl, + expl, + default, + optional, + _decoded, + ) + if schema is None: + schema = getattr(self, "schema", None) + if schema is None: + raise ValueError("schema must be specified") + self.spec = schema + if bounds is None: + self._bound_min, self._bound_max = getattr( + self, + "bounds", + (0, float("+inf")), + ) + else: + self._bound_min, self._bound_max = bounds + self._value = [] + if value is not None: + self._value = self._value_sanitize(value) + if default is not None: + default_value = self._value_sanitize(default) + default_obj = self.__class__( + schema=schema, + impl=self.tag, + expl=self._expl, + ) + default_obj._value = default_value + self.default = default_obj + if value is None: + self._value = default_obj.copy()._value + + def _value_sanitize(self, value): + if issubclass(value.__class__, SequenceOf): + value = value._value + elif hasattr(value, "__iter__"): + value = list(value) + else: + raise InvalidValueType((self.__class__, iter)) + if not self._bound_min <= len(value) <= self._bound_max: + raise BoundsError(self._bound_min, len(value), self._bound_max) + for v in value: + if not isinstance(v, self.spec.__class__): + raise InvalidValueType((self.spec.__class__,)) + return value + + @property + def ready(self): + return all(v.ready for v in self._value) + + def copy(self): + obj = self.__class__(schema=self.spec) + obj._bound_min = self._bound_min + obj._bound_max = self._bound_max + obj.tag = self.tag + obj._expl = self._expl + obj.default = self.default + obj.optional = self.optional + obj.offset = self.offset + obj.llen = self.llen + obj.vlen = self.vlen + obj._value = [v.copy() for v in self._value] + return obj + + def __eq__(self, their): + if isinstance(their, self.__class__): + return ( + self.spec == their.spec and + self.tag == their.tag and + self._expl == their._expl and + self._value == their._value + ) + if hasattr(their, "__iter__"): + return self._value == list(their) + return False + + def __call__( + self, + value=None, + bounds=None, + impl=None, + expl=None, + default=None, + optional=None, + ): + return self.__class__( + value=value, + schema=self.spec, + bounds=( + (self._bound_min, self._bound_max) + if bounds is None else bounds + ), + impl=self.tag if impl is None else impl, + expl=self._expl if expl is None else expl, + default=self.default if default is None else default, + optional=self.optional if optional is None else optional, + ) + + def __contains__(self, key): + return key in self._value + + def append(self, value): + if not isinstance(value, self.spec.__class__): + raise InvalidValueType((self.spec.__class__,)) + if len(self._value) + 1 > self._bound_max: + raise BoundsError( + self._bound_min, + len(self._value) + 1, + self._bound_max, + ) + self._value.append(value) + + def __iter__(self): + self._assert_ready() + return iter(self._value) + + def __len__(self): + self._assert_ready() + return len(self._value) + + def __setitem__(self, key, value): + if not isinstance(value, self.spec.__class__): + raise InvalidValueType((self.spec.__class__,)) + self._value[key] = self.spec(value=value) + + def __getitem__(self, key): + return self._value[key] + + def _encoded_values(self): + return [v.encode() for v in self._value] + + def _encode(self): + v = b"".join(self._encoded_values()) + return b"".join((self.tag, len_encode(len(v)), v)) + + def _decode(self, tlv, offset=0, decode_path=()): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t != self.tag: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: + l, llen, v = len_decode(lv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + v, tail = v[:l], v[l:] + sub_offset = offset + tlen + llen + _value = [] + spec = self.spec + while len(v) > 0: + value, v_tail = spec.decode( + v, + sub_offset, + leavemm=True, + decode_path=decode_path + (str(len(_value)),), + ) + sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) + v = v_tail + _value.append(value) + obj = self.__class__( + value=_value, + schema=spec, + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, l), + ) + return obj, tail + + def __repr__(self): + return "%s[%s]" % ( + pp_console_row(next(self.pps())), + ", ".join(repr(v) for v in self._value), + ) + + def pps(self, decode_path=()): + yield _pp( + asn1_type_name=self.asn1_type_name, + obj_name=self.__class__.__name__, + decode_path=decode_path, + optional=self.optional, + default=self == self.default, + impl=None if self.tag == self.tag_default else tag_decode(self.tag), + expl=None if self._expl is None else tag_decode(self._expl), + offset=self.offset, + tlen=self.tlen, + llen=self.llen, + vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + ) + for i, value in enumerate(self._value): + yield value.pps(decode_path=decode_path + (str(i),)) + + +class SetOf(SequenceOf): + __slots__ = () + tag_default = tag_encode(form=TagFormConstructed, num=17) + asn1_type_name = "SET OF" + + def _encode(self): + raws = self._encoded_values() + raws.sort() + v = b"".join(raws) + return b"".join((self.tag, len_encode(len(v)), v)) + + +def obj_by_path(pypath): # pragma: no cover + """Import object specified as string Python path + + Modules must be separated from classes/functions with ``:``. + + >>> obj_by_path("") + + >>> obj_by_path("") + + """ + mod, objs = pypath.rsplit(":", 1) + from importlib import import_module + obj = import_module(mod) + for obj_name in objs.split("."): + obj = getattr(obj, obj_name) + return obj + + +def main(): # pragma: no cover + import argparse + parser = argparse.ArgumentParser(description="PyDERASN ASN.1 DER decoder") + parser.add_argument( + "--oids", + help="Python path to dictionary with OIDs", + ) + parser.add_argument( + "--schema", + help="Python path to schema definition to use", + ) + parser.add_argument( + "DERFile", + type=argparse.FileType("rb"), + help="Python path to schema definition to use", + ) + args = parser.parse_args() + der = memoryview( + args.DERFile.close() + oids = obj_by_path(args.oids) if args.oids else {} + if args.schema: + schema = obj_by_path(args.schema) + from functools import partial + pprinter = partial(pprint, big_blobs=True) + else: + # All of this below is a big hack with self references + choice = PrimitiveTypes() + choice.specs["SequenceOf"] = SequenceOf(schema=choice) + choice.specs["SetOf"] = SetOf(schema=choice) + for i in range(31): + choice.specs["SequenceOf%d" % i] = SequenceOf( + schema=choice, + expl=tag_ctxc(i), + ) + choice.specs["Any"] = Any() + + # Class name equals to type name, to omit it from output + class SEQUENCEOF(SequenceOf): + __slots__ = () + schema = choice + schema = SEQUENCEOF() + + def pprint_any(obj, oids=None): + def _pprint_pps(pps): + for pp in pps: + if hasattr(pp, "_fields"): + if pp.asn1_type_name == Choice.asn1_type_name: + continue + pp_kwargs = pp._asdict() + pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",) + pp = _pp(**pp_kwargs) + yield pp_console_row( + pp, + oids=oids, + with_offsets=True, + with_blob=False, + ) + for row in pp_console_blob(pp): + yield row + else: + for row in _pprint_pps(pp): + yield row + return "\n".join(_pprint_pps(obj.pps())) + pprinter = pprint_any + obj, tail = schema().decode(der) + print(pprinter(obj, oids=oids)) + if tail != b"": + print("\nTrailing data: %s" % hexenc(tail)) + + +if __name__ == "__main__": + main() diff --git a/pyderasn.pyi b/pyderasn.pyi new file mode 100644 index 0000000..83f0032 --- /dev/null +++ b/pyderasn.pyi @@ -0,0 +1,822 @@ +from datetime import datetime +from typing import Any as TAny +from typing import Dict +from typing import NamedTuple +from typing import Optional +from typing import Sequence as TSequence +from typing import Tuple +from typing import Type +from typing import Union + + +TagClassUniversal = ... # type: int +TagClassApplication = ... # type: int +TagClassContext = ... # type: int +TagClassPrivate = ... # type: int +TagFormPrimitive = ... # type: int +TagFormConstructed = ... # type: int +TagClassReprs = ... # type: Dict[int, str] + + +class DecodeError(Exception): + msg = ... # type: str + klass = ... # type: Type + decode_path = ... # type: Tuple[str, ...] + offset = ... # type: int + + def __init__( + self, + msg: str=..., + klass: Optional[TAny]=..., + decode_path: TAny=..., + offset: int=..., + ) -> None: ... + +class NotEnoughData(DecodeError): ... + +class TagMismatch(DecodeError): ... + +class InvalidLength(DecodeError): ... + +class InvalidOID(DecodeError): ... + +class ObjUnknown(ValueError): + name = ... # type: str + + def __init__(self, name: str) -> None: ... + +class ObjNotReady(ValueError): + name = ... # type: str + + def __init__(self, str) -> None: ... + +class InvalidValueType(ValueError): + expected_types = ... # type: Tuple[Type, ...] + + def __init__(self, expected_types: Tuple[Type, ...]) -> None: ... + +class BoundsError(ValueError): + bound_min = ... # type: int + value = ... # type: int + bound_max = ... # type: int + + def __init__(self, bound_min: int, value: int, bound_max: int) -> None: ... + +def hexdec(data: str) -> bytes: ... + +def hexenc(data: bytes) -> str: ... + +def int_bytes_len(num: int, byte_len: int=...) -> int: ... + +def zero_ended_encode(num: int) -> bytes: ... + +def tag_encode(num: int, klass: int=..., form: int=...) -> bytes: ... + +def tag_decode(tag: bytes) -> Tuple[int, int, int]: ... + +def tag_ctxp(num: int) -> bytes: ... + +def tag_ctxc(num: int) -> bytes: ... + +def tag_strip(data: memoryview) -> Tuple[memoryview, int, memoryview]: ... + +def len_encode(l: int) -> bytes: ... + +def len_decode(data: memoryview) -> Tuple[int, int, memoryview]: ... + +class Obj: + tag = ... # type: bytes + optional = ... # type: bool + + def __init__( + self, + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[TAny]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + @property + def decoded(self) -> bool: ... + + def copy(self) -> "Obj": ... + + @property + def tlen(self) -> int: ... + + @property + def tlvlen(self) -> int: ... + + def encode(self) -> bytes: ... + + def decode( + self, + data: bytes, + offset: int=..., + leavemm: bool=..., + decode_path: Tuple[str, ...]=..., + ) -> Tuple[Obj, bytes]: ... + + @property + def expled(self) -> bool: ... + + @property + def expl_tag(self) -> bytes: ... + + @property + def expl_tlen(self) -> int: ... + + @property + def expl_llen(self) -> int: ... + + @property + def expl_offset(self) -> int: ... + + @property + def expl_vlen(self) -> int: ... + + @property + def expl_tlvlen(self) -> int: ... + + +PP = NamedTuple("PP", ( + ("asn1_type_name", str), + ("obj_name", str), + ("decode_path", Tuple[str, ...]), + ("value", Optional[str]), + ("blob", Optional[Union[bytes, Tuple[str, ...]]]), + ("optional", bool), + ("default", bool), + ("impl", Optional[Tuple[int, int, int]]), + ("expl", Optional[Tuple[int, int, int]]), + ("offset", int), + ("tlen", int), + ("llen", int), + ("vlen", int), + ("expl_offset", int), + ("expl_tlen", int), + ("expl_llen", int), + ("expl_vlen", int), +)) + + +def pp_console_row( + pp: PP, + oids: Optional[Dict[str, str]]=..., + with_offsets: bool=..., + with_blob: bool=..., +): ... + +def pp_console_blob(pp: PP) -> TSequence[str]: ... + +def pprint( + obj: Obj, + oids: Optional[Dict[str, str]]=..., + big_blobs: bool=..., +): ... + + +class Boolean(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "Boolean" + + def __init__( + self, + value: Optional[Union["Boolean", bool]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Boolean", bool]]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "Boolean": ... + + def __call__( + self, + value: Optional[Union["Boolean", bool]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Boolean", bool]]=..., + optional: Optional[bool]=..., + ) -> "Boolean": ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class Integer(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + specs = ... # type: Dict[str, int] + default = ... # type: "Integer" + + def __init__( + self, + value: Optional[Union["Integer", int, str]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Integer", int, str]]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "Integer": ... + + @property + def named(self) -> Optional[str]: ... + + def __call__( + self, + value: Optional[Union["Integer", int, str]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Integer", int, str]]=..., + optional: Optional[bool]=..., + ) -> "Integer": ... + + def __int__(self) -> int: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class BitString(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + specs = ... # type: Dict[str, int] + default = ... # type: "BitString" + + def __init__( + self, + value: Optional[Union["BitString", bytes, Tuple[str, ...]]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["BitString", bytes, Tuple[str, ...]]]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "BitString": ... + + @property + def bit_len(self) -> int: ... + + @property + def named(self) -> TSequence[str]: ... + + def __call__( + self, + value: Optional[Union["BitString", bytes, Tuple[str, ...]]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["BitString", bytes, Tuple[str, ...]]]=..., + optional: Optional[bool]=..., + ) -> "BitString": ... + + def __bytes__(self) -> bytes: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class OctetString(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "OctetString" + + def __init__( + self, + value: Optional[Union["OctetString", bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["OctetString", bytes]]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "OctetString": ... + + def __call__( + self, + value: Optional[Union["OctetString", bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["OctetString", bytes]]=..., + optional: Optional[bool]=..., + ) -> "OctetString": ... + + def __bytes__(self) -> bytes: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class Null(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "Null" + + def __init__( + self, + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "Null": ... + + def __call__( + self, + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + optional: Optional[bool]=..., + ) -> "Null": ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class ObjectIdentifier(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "ObjectIdentifier" + + def __init__( + self, + value: Optional[Union["ObjectIdentifier", str, Tuple[int, ...]]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["ObjectIdentifier", str, Tuple[int, ...]]]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "ObjectIdentifier": ... + + def __call__( + self, + value: Optional[Union["ObjectIdentifier", str, Tuple[int, ...]]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["ObjectIdentifier", str, Tuple[int, ...]]]=..., + optional: Optional[bool]=..., + ) -> "ObjectIdentifier": ... + + def __add__( + self, + their: Union["ObjectIdentifier", Tuple[int, ...]], + ) -> "ObjectIdentifier": ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class Enumerated(Integer): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "Enumerated" + + def __init__( + self, + value: Optional[Union["Enumerated", str, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Enumerated", str, int]]=..., + optional: bool=..., + ) -> None: ... + + def copy(self) -> "Enumerated": ... + + def __call__( # type: ignore + self, + value: Optional[Union["Enumerated", str, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Enumerated", str, int]]=..., + optional: Optional[bool]=..., + ) -> "Enumerated": ... + + +class CommonString(OctetString): + def pps( + self, + decode_path: Tuple[str, ...]=..., + no_unicode: bool=..., + ) -> TSequence[PP]: ... + + +class UTF8String(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "UTF8String" + + def __init__( + self, + value: Optional[Union["UTF8String", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["UTF8String", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + def __str__(self) -> str: ... + + +class NumericString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "NumericString" + + def __init__( + self, + value: Optional[Union["NumericString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["NumericString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class PrintableString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "PrintableString" + + def __init__( + self, + value: Optional[Union["PrintableString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["PrintableString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class TeletexString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "TeletexString" + + def __init__( + self, + value: Optional[Union["TeletexString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["TeletexString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class T61String(TeletexString): + asn1_type_name = ... # type: str + default = ... # type: "T61String" + + +class VideotexString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "VideotexString" + + def __init__( + self, + value: Optional[Union["VideotexString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["VideotexString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class IA5String(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "IA5String" + + def __init__( + self, + value: Optional[Union["IA5String", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["IA5String", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class UTCTime(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "UTCTime" + + def __init__( + self, + value: Optional[Union["UTCTime", datetime]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["UTCTime", datetime]]=..., + optional: bool=..., + ) -> None: ... + + def todatetime(self) -> datetime: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... # type: ignore + + +class GeneralizedTime(UTCTime): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "GeneralizedTime" + + def todatetime(self) -> datetime: ... + + +class GraphicString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "GraphicString" + + def __init__( + self, + value: Optional[Union["GraphicString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["GraphicString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class VisibleString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "VisibleString" + + def __init__( + self, + value: Optional[Union["VisibleString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["VisibleString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class ISO646String(VisibleString): + asn1_type_name = ... # type: str + default = ... # type: "ISO646String" + + +class GeneralString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "GeneralString" + + def __init__( + self, + value: Optional[Union["GeneralString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["GeneralString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class UniversalString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "UniversalString" + + def __init__( + self, + value: Optional[Union["UniversalString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["UniversalString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class BMPString(CommonString): + tag_default = ... # type: bytes + encoding = ... # type: str + asn1_type_name = ... # type: str + default = ... # type: "BMPString" + + def __init__( + self, + value: Optional[Union["BMPString", str, bytes]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["BMPString", str, bytes]]=..., + optional: bool=..., + ) -> None: ... + + +class Choice(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + specs = ... # type: Dict[str, Obj] + default = ... # type: "Choice" + + def __init__( + self, + value: Optional[Union["Choice", Tuple[str, Obj]]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Choice", Tuple[str, Obj]]]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "Choice": ... + + def __call__( + self, + value: Optional[Union["Choice", Tuple[str, Obj]]]=..., + expl: Optional[bytes]=..., + default: Optional[Union["Choice", Tuple[str, Obj]]]=..., + optional: Optional[bool]=..., + ) -> "Choice": ... + + def __getitem__(self, key: str) -> Optional[Obj]: ... + + def __setitem__(self, key: str, value: Obj) -> None: ... + + @property + def choice(self) -> str: ... + + @property + def value(self) -> Obj: ... + + @property + def tlen(self) -> int: ... + + @property + def decoded(self) -> bool: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class PrimitiveTypes(Choice): + schema = ... # type: Dict[str, Obj] + + +class Any(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "Any" + + def __init__( + self, + value: Optional[Union[Obj, bytes]]=..., + expl: Optional[bytes]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "Any": ... + + def __call__( + self, + value: Optional[Union[Obj, bytes]]=..., + expl: Optional[bytes]=..., + optional: Optional[bool]=..., + ) -> "Any": ... + + def __bytes__(self) -> bytes: ... + + @property + def tlen(self) -> int: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class Sequence(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + specs = ... # type: Dict[str, Obj] + default = ... # type: "Sequence" + + def __init__( + self, + value: Optional["Sequence"]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional["Sequence"]=..., + optional: bool=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "Sequence": ... + + def __call__( + self, + value: Optional["Sequence"]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional["Sequence"]=..., + optional: Optional[bool]=..., + ) -> "Sequence": ... + + def __getitem__(self, key: str) -> Optional[Obj]: ... + + def __setitem__(self, key: str, value: Obj) -> None: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class Set(Sequence): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "Set" + + +class SequenceOf(Obj): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + spec = ... # type: Obj + default = ... # type: "SequenceOf" + + def __init__( + self, + value: Optional[Union["SequenceOf", TSequence[Obj]]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["SequenceOf", TSequence[Obj]]]=..., + optional: Optional[bool]=..., + ) -> None: ... + + @property + def ready(self) -> bool: ... + + def copy(self) -> "SequenceOf": ... + + def __call__( + self, + value: Optional[Union["SequenceOf", TSequence[Obj]]]=..., + bounds: Optional[Tuple[int, int]]=..., + impl: Optional[bytes]=..., + expl: Optional[bytes]=..., + default: Optional[Union["SequenceOf", TSequence[Obj]]]=..., + optional: Optional[bool]=..., + ) -> "SequenceOf": ... + + def __getitem__(self, key: int) -> Obj: ... + + def __iter__(self) -> TSequence[Obj]: ... + + def append(self, value: Obj) -> None: ... + + def pps(self, decode_path: Tuple[str, ...]=...) -> TSequence[PP]: ... + + +class SetOf(SequenceOf): + tag_default = ... # type: bytes + asn1_type_name = ... # type: str + default = ... # type: "SetOf" + + +def obj_by_path(pypath: str) -> TAny: ... diff --git a/ b/ new file mode 100644 index 0000000..762ee5b --- /dev/null +++ b/ @@ -0,0 +1,33 @@ +# coding: utf-8 + +from setuptools import setup + + +version = open("VERSION", "rb").read().strip().decode("ascii") + +setup( + name="pyderasn", + version=version, + description="Python ASN.1 DER codec with abstract structures", + long_description=open("README", "rb").read().decode("utf-8"), + author="Sergey Matveev", + author_email="", + url="", + license="LGPLv3+", + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Environment :: Console", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "Intended Audience :: Telecommunications Industry", + "License :: OSI Approved :: GNU Lesser # coding: utf-8
# PyDERASN -- Python ASN.1 DER codec with abstract structures
# Copyright (C) 2017 Sergey Matveev
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, "id-ce-cRLDistributionPoints", + "": "id-ce-authorityKeyIdentifier", + "": "id-ce-extKeyUsage", + "": "id-at-commonName", + "": "id-at-countryName", + "": "id-at-localityName", + "": "id-at-stateOrProvinceName", + "": "id-at-organizationName", + "": "id-at-organizationalUnitName", +} + + +class Version(Integer): + __slots__ = () + schema = ( + ("v1", 0), + ("v2", 1), + ("v3", 2), + ) + + +class CertificateSerialNumber(Integer): + __slots__ = () + pass + + +class AlgorithmIdentifier(Sequence): + __slots__ = () + schema = ( + ("algorithm", ObjectIdentifier()), + ("parameters", Any(optional=True)), + ) + + +class AttributeType(ObjectIdentifier): + __slots__ = () + pass + + +class AttributeValue(Any): + __slots__ = () + pass + + +class AttributeTypeAndValue(Sequence): + __slots__ = () + schema = ( + ("type", AttributeType()), + ("value", AttributeValue()), + ) + + +class RelativeDistinguishedName(SetOf): + __slots__ = () + schema = AttributeTypeAndValue() + bounds = (1, float("+inf")) + + +class RDNSequence(SequenceOf): + __slots__ = () + schema = RelativeDistinguishedName() + + +class Name(Choice): + __slots__ = () + schema = ( + ("rdnSequence", RDNSequence()), + ) + + +class Time(Choice): + __slots__ = () + schema = ( + ("utcTime", UTCTime()), + ("generalTime", GeneralizedTime()), + ) + + +class Validity(Sequence): + __slots__ = () + schema = ( + ("notBefore", Time()), + ("notAfter", Time()), + ) + + +class SubjectPublicKeyInfo(Sequence): + __slots__ = () + schema = ( + ("algorithm", AlgorithmIdentifier()), + ("subjectPublicKey", BitString()), + ) + + +class UniqueIdentifier(BitString): + __slots__ = () + pass + + +class Extension(Sequence): + __slots__ = () + schema = ( + ("extnID", ObjectIdentifier()), + ("critical", Boolean(default=False)), + ("extnValue", OctetString()), + ) + + +class Extensions(SequenceOf): + __slots__ = () + schema = Extension() + bounds = (1, float("+inf")) + + +class TBSCertificate(Sequence): + __slots__ = () + schema = ( + ("version", Version(expl=tag_ctxc(0), default="v1")), + ("serialNumber", CertificateSerialNumber()), + ("signature", AlgorithmIdentifier()), + ("issuer", Name()), + ("validity", Validity()), + ("subject", Name()), + ("subjectPublicKeyInfo", SubjectPublicKeyInfo()), + ("issuerUniqueID", UniqueIdentifier(impl=tag_ctxp(1), optional=True)), + ("subjectUniqueID", UniqueIdentifier(impl=tag_ctxp(2), optional=True)), + ("extensions", Extensions(expl=tag_ctxc(3), optional=True)), + ) + + +class Certificate(Sequence): + __slots__ = () + schema = ( + ("tbsCertificate", TBSCertificate()), + ("signatureAlgorithm", AlgorithmIdentifier()), + ("signatureValue", BitString()), + ) + + +class TestGoSelfSignedVector(TestCase): + def runTest(self): + raw = hexdec("".join(( + "30820218308201c20209008cc3379210ec2c98300d06092a864886f70d0101050", + "500308192310b3009060355040613025858311330110603550408130a536f6d65", + "2d5374617465310d300b06035504071304436974793121301f060355040a13184", + "96e7465726e6574205769646769747320507479204c7464311a30180603550403", + "131166616c73652e6578616d706c652e636f6d3120301e06092a864886f70d010", + "901161166616c7365406578616d706c652e636f6d301e170d3039313030383030", + "323535335a170d3130313030383030323535335a308192310b300906035504061", + "3025858311330110603550408130a536f6d652d5374617465310d300b06035504", + "071304436974793121301f060355040a1318496e7465726e65742057696467697", + "47320507479204c7464311a30180603550403131166616c73652e6578616d706c", + "652e636f6d3120301e06092a864886f70d010901161166616c7365406578616d7", + "06c652e636f6d305c300d06092a864886f70d0101010500034b003048024100cd", + "b7639c3278f006aa277f6eaf42902b592d8cbcbe38a1c92ba4695a331b1deadea", + "dd8e9a5c27e8c4c2fd0a8889657722a4f2af7589cf2c77045dc8fdeec357d0203", + "010001300d06092a864886f70d0101050500034100a67b06ec5ece92772ca413c", + "ba3ca12568fdc6c7b4511cd40a7f659980402df2b998bb9a4a8cbeb34c0f0a78c", + "f8d91ede14a5ed76bf116fe360aafa8821490435", + ))) + crt, tail = Certificate().decode(raw) + self.assertSequenceEqual(tail, b"") + tbs = crt["tbsCertificate"] + self.assertEqual(tbs["version"], 0) + self.assertFalse(tbs["version"].decoded) + self.assertNotIn("version", tbs) + self.assertEqual(tbs["serialNumber"], 10143011886257155224) + + def assert_raw_equals(obj, expect): + self.assertTrue(obj.decoded) + self.assertSequenceEqual( + raw[obj.offset:obj.offset + obj.tlvlen], + expect.encode(), + ) + assert_raw_equals(tbs["serialNumber"], Integer(10143011886257155224)) + algo_id = AlgorithmIdentifier() + algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.5") + algo_id["parameters"] = Any(Null()) + self.assertEqual(tbs["signature"], algo_id) + assert_raw_equals(tbs["signature"], algo_id) + issuer = Name() + rdnSeq = RDNSequence() + for oid, klass, text in ( + ("", PrintableString, "XX"), + ("", PrintableString, "Some-State"), + ("", PrintableString, "City"), + ("", PrintableString, "Internet Widgits Pty Ltd"), + ("", PrintableString, ""), + ("1.2.840.113549.1.9.1", IA5String, ""), + ): + attr = AttributeTypeAndValue() + attr["type"] = AttributeType(oid) + attr["value"] = AttributeValue(klass(text)) + rdn = RelativeDistinguishedName() + rdn.append(attr) + rdnSeq.append(rdn) + issuer["rdnSequence"] = rdnSeq + self.assertEqual(tbs["issuer"], issuer) + assert_raw_equals(tbs["issuer"], issuer) + validity = Validity() + validity["notBefore"] = Time( + ("utcTime", UTCTime(datetime(2009, 10, 8, 0, 25, 53))) + ) + validity["notAfter"] = Time( + ("utcTime", UTCTime(datetime(2010, 10, 8, 0, 25, 53))) + ) + self.assertEqual(tbs["validity"], validity) + assert_raw_equals(tbs["validity"], validity) + self.assertEqual(tbs["subject"], issuer) + assert_raw_equals(tbs["subject"], issuer) + spki = SubjectPublicKeyInfo() + algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.1") + spki["algorithm"] = algo_id + spki["subjectPublicKey"] = BitString(hexdec("".join(( + "3048024100cdb7639c3278f006aa277f6eaf42902b592d8cbcbe38a1c92ba4695", + "a331b1deadeadd8e9a5c27e8c4c2fd0a8889657722a4f2af7589cf2c77045dc8f", + "deec357d0203010001", + )))) + self.assertEqual(tbs["subjectPublicKeyInfo"], spki) + assert_raw_equals(tbs["subjectPublicKeyInfo"], spki) + self.assertNotIn("issuerUniqueID", tbs) + self.assertNotIn("subjectUniqueID", tbs) + self.assertNotIn("extensions", tbs) + algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.5") + self.assertEqual(crt["signatureAlgorithm"], algo_id) + self.assertEqual(crt["signatureValue"], BitString(hexdec("".join(( + "a67b06ec5ece92772ca413cba3ca12568fdc6c7b4511cd40a7f659980402df2b", + "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435", + ))))) + self.assertSequenceEqual(crt.encode(), raw) + pprint(crt) + repr(crt) + + tbs = TBSCertificate() + tbs["serialNumber"] = CertificateSerialNumber(10143011886257155224) + + sign_algo_id = AlgorithmIdentifier() + sign_algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.5") + sign_algo_id["parameters"] = Any(Null()) + tbs["signature"] = sign_algo_id + + rdnSeq = RDNSequence() + for oid, klass, text in ( + ("", PrintableString, "XX"), + ("", PrintableString, "Some-State"), + ("", PrintableString, "City"), + ("", PrintableString, "Internet Widgits Pty Ltd"), + ("", PrintableString, ""), + ("1.2.840.113549.1.9.1", IA5String, ""), + ): + attr = AttributeTypeAndValue() + attr["type"] = AttributeType(oid) + attr["value"] = AttributeValue(klass(text)) + rdn = RelativeDistinguishedName() + rdn.append(attr) + rdnSeq.append(rdn) + issuer = Name() + issuer["rdnSequence"] = rdnSeq + tbs["issuer"] = issuer + tbs["subject"] = issuer + + validity = Validity() + validity["notBefore"] = Time(("utcTime", UTCTime(datetime(2009, 10, 8, 0, 25, 53)))) + validity["notAfter"] = Time(("utcTime", UTCTime(datetime(2010, 10, 8, 0, 25, 53)))) + tbs["validity"] = validity + + spki = SubjectPublicKeyInfo() + spki_algo_id = sign_algo_id.copy() + spki_algo_id["algorithm"] = ObjectIdentifier("1.2.840.113549.1.1.1") + spki["algorithm"] = spki_algo_id + spki["subjectPublicKey"] = BitString(hexdec("".join(( + "3048024100cdb7639c3278f006aa277f6eaf42902b592d8cbcbe38a1c92ba4695", + "a331b1deadeadd8e9a5c27e8c4c2fd0a8889657722a4f2af7589cf2c77045dc8f", + "deec357d0203010001", + )))) + tbs["subjectPublicKeyInfo"] = spki + + crt = Certificate() + crt["tbsCertificate"] = tbs + crt["signatureAlgorithm"] = sign_algo_id + crt["signatureValue"] = BitString(hexdec("".join(( + "a67b06ec5ece92772ca413cba3ca12568fdc6c7b4511cd40a7f659980402df2b", + "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435", + )))) + self.assertSequenceEqual(crt.encode(), raw) + + +class TestGoPayPalVector(TestCase): + def runTest(self): + raw = hexdec("".join(( + "30820644308205ada003020102020300f09b300d06092a864886f70d010105050", + "030820112310b3009060355040613024553311230100603550408130942617263", + "656c6f6e61311230100603550407130942617263656c6f6e61312930270603550", + "40a13204950532043657274696669636174696f6e20417574686f726974792073", + "2e6c2e312e302c060355040a142567656e6572616c4069707363612e636f6d204", + "32e492e462e2020422d423632323130363935312e302c060355040b1325697073", + "434120434c41534541312043657274696669636174696f6e20417574686f72697", + "479312e302c06035504031325697073434120434c415345413120436572746966", + "69636174696f6e20417574686f726974793120301e06092a864886f70d0109011", + "61167656e6572616c4069707363612e636f6d301e170d30393032323432333034", + "31375a170d3131303232343233303431375a308194310b3009060355040613025", + "553311330110603550408130a43616c69666f726e696131163014060355040713", + "0d53616e204672616e636973636f3111300f060355040a1308536563757269747", + "931143012060355040b130b53656375726520556e6974312f302d060355040313", + "267777772e70617970616c2e636f6d0073736c2e736563757265636f6e6e65637", + "4696f6e2e636330819f300d06092a864886f70d010101050003818d0030818902", + "818100d269fa6f3a00b4211bc8b102d73f19b2c46db454f88b8accdb72c29e3c6", + "0b9c6913d82b77d99ffd12984c173539c82ddfc248c77d541f3e81e42a1ad2d9e", + "ff5b1026ce9d571773162338c8d6f1baa3965b16674a4f73973a4d14a4f4e23f8", + "b058342d1d0dc2f7ae5b610b211c0dc212a90ffae97715a4981ac40f33bb859b2", + "4f0203010001a38203213082031d30090603551d1304023000301106096086480", + "186f8420101040403020640300b0603551d0f0404030203f830130603551d2504", + "0c300a06082b06010505070301301d0603551d0e04160414618f61344355147f2", + "709ce4c8bea9b7b1925bc6e301f0603551d230418301680140e0760d439c91b5b", + "5d907b23c8d2349d4a9a463930090603551d1104023000301c0603551d1204153", + "013811167656e6572616c4069707363612e636f6d307206096086480186f84201", + "0d046516634f7267616e697a6174696f6e20496e666f726d6174696f6e204e4f5", + "42056414c4944415445442e20434c415345413120536572766572204365727469", + "666963617465206973737565642062792068747470733a2f2f7777772e6970736", + "3612e636f6d2f302f06096086480186f84201020422162068747470733a2f2f77", + "77772e69707363612e636f6d2f6970736361323030322f304306096086480186f", + "84201040436163468747470733a2f2f7777772e69707363612e636f6d2f697073", + "6361323030322f697073636132303032434c41534541312e63726c30460609608", + "6480186f84201030439163768747470733a2f2f7777772e69707363612e636f6d", + "2f6970736361323030322f7265766f636174696f6e434c41534541312e68746d6", + "c3f304306096086480186f84201070436163468747470733a2f2f7777772e6970", + "7363612e636f6d2f6970736361323030322f72656e6577616c434c41534541312", + "e68746d6c3f304106096086480186f84201080434163268747470733a2f2f7777", + "772e69707363612e636f6d2f6970736361323030322f706f6c696379434c41534", + "541312e68746d6c3081830603551d1f047c307a3039a037a0358633687474703a", + "2f2f7777772e69707363612e636f6d2f6970736361323030322f6970736361323", + "03032434c41534541312e63726c303da03ba0398637687474703a2f2f77777762", + "61636b2e69707363612e636f6d2f6970736361323030322f69707363613230303", + "2434c41534541312e63726c303206082b0601050507010104263024302206082b", + "060105050730018616687474703a2f2f6f6373702e69707363612e636f6d2f300", + from pyderasn import VisibleString


class TestGoSelfSignedVector(TestCase): assertRaisesRegex +from six import byte2int +from six import indexbytes +from six import int2byte +from six import iterbytes +from six import PY2 +from six import text_type + +from pyderasn import _pp +from pyderasn import Any +from pyderasn import BitString +from pyderasn import BMPString +from pyderasn import Boolean +from pyderasn import BoundsError +from pyderasn import Choice +from pyderasn import DecodeError +from pyderasn import Enumerated +from pyderasn import GeneralizedTime +from pyderasn import GeneralString +from pyderasn import GraphicString +from pyderasn import hexdec +from pyderasn import hexenc +from pyderasn import IA5String +from pyderasn import Integer +from pyderasn import InvalidLength +from pyderasn import InvalidOID +from pyderasn import InvalidValueType +from pyderasn import len_decode +from pyderasn import len_encode +from pyderasn import NotEnoughData +from pyderasn import Null +from pyderasn import NumericString +from pyderasn import ObjectIdentifier +from pyderasn import ObjNotReady +from pyderasn import ObjUnknown +from pyderasn import OctetString +from pyderasn import pp_console_row +from pyderasn import pprint +from pyderasn import PrintableString +from pyderasn import Sequence +from pyderasn import SequenceOf +from pyderasn import Set +from pyderasn import SetOf +from pyderasn import tag_ctxc +from pyderasn import tag_decode +from pyderasn import tag_encode +from pyderasn import tag_strip +from pyderasn import TagClassApplication +from pyderasn import TagClassContext +from pyderasn import TagClassPrivate +from pyderasn import TagClassUniversal +from pyderasn import TagFormConstructed +from pyderasn import TagFormPrimitive +from pyderasn import TagMismatch +from pyderasn import TeletexString +from pyderasn import UniversalString +from pyderasn import UTCTime +from pyderasn import UTF8String +from pyderasn import VideotexString +from pyderasn import VisibleString + + +settings.register_profile('local', settings( + deadline=5000, + perform_health_check=False, +)) +settings.load_profile('local') +LONG_TEST_MAX_EXAMPLES = settings().max_examples * 4 + +tag_classes = sampled_from(( + TagClassApplication, + TagClassContext, + TagClassPrivate, + TagClassUniversal, +)) +tag_forms = sampled_from((TagFormConstructed, TagFormPrimitive)) + + +class TestHex(TestCase): + @given(binary()) + def test_symmetric(self, data): + self.assertEqual(hexdec(hexenc(data)), data) + + +class TestTagCoder(TestCase): + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + tag_classes, + tag_forms, + integers(min_value=0, max_value=30), + binary(max_size=5), + ) + def test_short(self, klass, form, num, junk): + raw = tag_encode(klass=klass, form=form, num=num) + self.assertEqual(tag_decode(raw), (klass, form, num)) + self.assertEqual(len(raw), 1) + self.assertEqual( + byte2int(tag_encode(klass=klass, form=form, num=0)), + byte2int(raw) & (1 << 7 | 1 << 6 | 1 << 5), + ) + stripped, tlen, tail = tag_strip(memoryview(raw + junk)) + self.assertSequenceEqual(stripped.tobytes(), raw) + self.assertEqual(tlen, len(raw)) + self.assertSequenceEqual(tail, junk) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + tag_classes, + tag_forms, + integers(min_value=31), + binary(max_size=5), + ) + def test_long(self, klass, form, num, junk): + raw = tag_encode(klass=klass, form=form, num=num) + self.assertEqual(tag_decode(raw), (klass, form, num)) + self.assertGreater(len(raw), 1) + self.assertEqual( + byte2int(tag_encode(klass=klass, form=form, num=0)) | 31, + byte2int(raw[:1]), + ) + self.assertEqual(byte2int(raw[-1:]) & 0x80, 0) + self.assertTrue(all(b & 0x80 > 0 for b in iterbytes(raw[1:-1]))) + stripped, tlen, tail = tag_strip(memoryview(raw + junk)) + self.assertSequenceEqual(stripped.tobytes(), raw) + self.assertEqual(tlen, len(raw)) + self.assertSequenceEqual(tail, junk) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(integers(min_value=31)) + def test_unfinished_tag(self, num): + raw = bytearray(tag_encode(num=num)) + for i in range(1, len(raw)): + raw[i] |= 0x80 + with assertRaisesRegex(self, DecodeError, "unfinished tag"): + tag_strip(bytes(raw)) + + def test_go_vectors_valid(self): + for data, (eklass, etag, elen, eform) in ( + (b"\x80\x01", (TagClassContext, 0, 1, TagFormPrimitive)), + (b"\xa0\x01", (TagClassContext, 0, 1, TagFormConstructed)), + (b"\x02\x00", (TagClassUniversal, 2, 0, TagFormPrimitive)), + (b"\xfe\x00", (TagClassPrivate, 30, 0, TagFormConstructed)), + (b"\x1f\x1f\x00", (TagClassUniversal, 31, 0, TagFormPrimitive)), + (b"\x1f\x81\x00\x00", (TagClassUniversal, 128, 0, TagFormPrimitive)), + (b"\x1f\x81\x80\x01\x00", (TagClassUniversal, 0x4001, 0, TagFormPrimitive)), + (b"\x00\x81\x80", (TagClassUniversal, 0, 128, TagFormPrimitive)), + (b"\x00\x82\x01\x00", (TagClassUniversal, 0, 256, TagFormPrimitive)), + (b"\xa0\x84\x7f\xff\xff\xff", (TagClassContext, 0, 0x7fffffff, TagFormConstructed)), + ): + tag, _, len_encoded = tag_strip(memoryview(data)) + klass, form, num = tag_decode(tag) + _len, _, tail = len_decode(len_encoded) + self.assertSequenceEqual(tail, b"") + self.assertEqual(klass, eklass) + self.assertEqual(num, etag) + self.assertEqual(_len, elen) + self.assertEqual(form, eform) + + def test_go_vectors_invalid(self): + for data in ( + b"\x00\x83\x01\x00", + b"\x1f\x85", + b"\x30\x80", + b"\xa0\x82\x00\xff", + b"\xa0\x81\x7f", + ): + with self.assertRaises(DecodeError): + _, _, len_encoded = tag_strip(memoryview(data)) + len_decode(len_encoded) + + @given( + integers(min_value=0, max_value=127), + integers(min_value=0, max_value=2), + ) + def test_long_instead_of_short(self, l, dummy_num): + octets = (b"\x00" * dummy_num) + int2byte(l) + octets = int2byte((dummy_num + 1) | 0x80) + octets + with self.assertRaises(DecodeError): + len_decode(octets) + + +class TestLenCoder(TestCase): + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + integers(min_value=0, max_value=127), + binary(max_size=5), + ) + def test_short(self, l, junk): + raw = len_encode(l) + junk + decoded, llen, tail = len_decode(memoryview(raw)) + self.assertEqual(decoded, l) + self.assertEqual(llen, 1) + self.assertEqual(len(raw), 1 + len(junk)) + self.assertEqual(tail.tobytes(), junk) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + integers(min_value=128), + binary(max_size=5), + ) + def test_long(self, l, junk): + raw = len_encode(l) + junk + decoded, llen, tail = len_decode(memoryview(raw)) + self.assertEqual(decoded, l) + self.assertEqual((llen - 1) | 0x80, byte2int(raw)) + self.assertEqual(llen, len(raw) - len(junk)) + self.assertNotEqual(indexbytes(raw, 1), 0) + self.assertSequenceEqual(tail.tobytes(), junk) + + def test_empty(self): + with self.assertRaises(NotEnoughData): + len_decode(b"") + + @given(integers(min_value=128)) + def test_stripped(self, _len): + with self.assertRaises(NotEnoughData): + len_decode(len_encode(_len)[:-1]) + + +text_printable = text(alphabet=printable, min_size=1) + + +@composite +def text_letters(draw): + result = draw(text(alphabet=ascii_letters, min_size=1)) + if PY2: + result = result.encode("ascii") + return result + + +class CommonMixin(object): + def test_tag_default(self): + obj = self.base_klass() + self.assertEqual(obj.tag, obj.tag_default) + + def test_simultaneous_impl_expl(self): + with self.assertRaises(ValueError): + self.base_klass(impl=b"whatever", expl=b"whenever") + + @given(binary(), integers(), integers(), integers()) + def test_decoded(self, impl, offset, llen, vlen): + obj = self.base_klass(impl=impl, _decoded=(offset, llen, vlen)) + self.assertEqual(obj.offset, offset) + self.assertEqual(obj.llen, llen) + self.assertEqual(obj.vlen, vlen) + self.assertEqual(obj.tlen, len(impl)) + self.assertEqual(obj.tlvlen, obj.tlen + obj.llen + obj.vlen) + + @given(binary()) + def test_impl_inherited(self, impl_tag): + class Inherited(self.base_klass): + __slots__ = () + impl = impl_tag + obj = Inherited() + self.assertSequenceEqual(obj.impl, impl_tag) + self.assertFalse(obj.expled) + + @given(binary()) + def test_expl_inherited(self, expl_tag): + class Inherited(self.base_klass): + __slots__ = () + expl = expl_tag + obj = Inherited() + self.assertSequenceEqual(obj.expl, expl_tag) + self.assertTrue(obj.expled) + + def assert_copied_basic_fields(self, obj, obj_copied): + self.assertEqual(obj, obj_copied) + self.assertSequenceEqual(obj.tag, obj_copied.tag) + self.assertEqual(obj.expl_tag, obj_copied.expl_tag) + self.assertEqual(obj.default, obj_copied.default) + self.assertEqual(obj.optional, obj_copied.optional) + self.assertEqual(obj.offset, obj_copied.offset) + self.assertEqual(obj.llen, obj_copied.llen) + self.assertEqual(obj.vlen, obj_copied.vlen) + + +@composite +def boolean_values_strat(draw, do_expl=False): + value = draw(one_of(none(), booleans())) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + default = draw(one_of(none(), booleans())) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, impl, expl, default, optional, _decoded) + + +class BooleanInherited(Boolean): + __slots__ = () + + +class TestBoolean(CommonMixin, TestCase): + base_klass = Boolean + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + Boolean(123) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = Boolean(default=Boolean(False), optional=optional) + self.assertTrue(obj.optional) + + @given(booleans()) + def test_ready(self, value): + obj = Boolean() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = Boolean(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(booleans(), booleans(), binary(), binary()) + def test_comparison(self, value1, value2, tag1, tag2): + for klass in (Boolean, BooleanInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == bool(obj2), value1 == value2) + obj1 = klass(value1, impl=tag1) + obj2 = klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(data_strategy()) + def test_call(self, d): + for klass in (Boolean, BooleanInherited): + ( + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(boolean_values_strat()) + obj_initial = klass( + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial or False, + _decoded_initial, + ) + ( + value, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(boolean_values_strat(do_expl=impl_initial is None)) + obj = obj_initial(value, impl, expl, default, optional) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + + @given(boolean_values_strat()) + def test_copy(self, values): + for klass in (Boolean, BooleanInherited): + obj = klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + + @given( + booleans(), + integers(min_value=1).map(tag_encode), + ) + def test_stripped(self, value, tag_impl): + obj = Boolean(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + booleans(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = Boolean(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Boolean().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_expl_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Boolean(expl=Boolean.tag_default).decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Boolean().decode( + Boolean.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_expl_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Boolean(expl=Boolean.tag_default).decode( + Boolean.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + boolean_values_strat(), + booleans(), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, value, tag_expl, offset): + for klass in (Boolean, BooleanInherited): + _, _, _, default, optional, _decoded = values + obj = klass( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(bool(obj_decoded), bool(obj_expled)) + self.assertEqual(bool(obj_decoded), bool(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + @given(integers(min_value=2)) + def test_invalid_len(self, l): + with self.assertRaises(InvalidLength): + Boolean().decode(b"".join(( + Boolean.tag_default, + len_encode(l), + b"\x00" * l, + ))) + + @given(integers(min_value=0 + 1, max_value=255 - 1)) + def test_invalid_value(self, value): + with assertRaisesRegex(self, DecodeError, "unacceptable Boolean value"): + Boolean().decode(b"".join(( + Boolean.tag_default, + len_encode(1), + int2byte(value), + ))) + + +@composite +def integer_values_strat(draw, do_expl=False): + bound_min, value, default, bound_max = sorted(draw(sets( + integers(), + min_size=4, + max_size=4, + ))) + if draw(booleans()): + value = None + _specs = None + if draw(booleans()): + _specs = draw(sets(text_letters())) + values = draw(sets( + integers(), + min_size=len(_specs), + max_size=len(_specs), + )) + _specs = list(zip(_specs, values)) + bounds = None + if draw(booleans()): + bounds = (bound_min, bound_max) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + if draw(booleans()): + default = None + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, bounds, impl, expl, default, optional, _specs, _decoded) + + +class IntegerInherited(Integer): + __slots__ = () + + +class TestInteger(CommonMixin, TestCase): + base_klass = Integer + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + Integer(12.3) + repr(err.exception) + + @given(sets(text_letters(), min_size=2)) + def test_unknown_name(self, names_input): + missing = names_input.pop() + + class Int(Integer): + __slots__ = () + schema = [(n, 123) for n in names_input] + with self.assertRaises(ObjUnknown) as err: + Int(missing) + repr(err.exception) + + @given(sets(text_letters(), min_size=2)) + def test_known_name(self, names_input): + class Int(Integer): + __slots__ = () + schema = [(n, 123) for n in names_input] + Int(names_input.pop()) + + @given(booleans()) + def test_optional(self, optional): + obj = Integer(default=Integer(0), optional=optional) + self.assertTrue(obj.optional) + + @given(integers()) + def test_ready(self, value): + obj = Integer() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = Integer(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + hash(obj) + + @given(integers(), integers(), binary(), binary()) + def test_comparison(self, value1, value2, tag1, tag2): + for klass in (Integer, IntegerInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == int(obj2), value1 == value2) + obj1 = klass(value1, impl=tag1) + obj2 = klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(lists(integers())) + def test_sorted_works(self, values): + self.assertSequenceEqual( + [int(v) for v in sorted(Integer(v) for v in values)], + sorted(values), + ) + + @given(data_strategy()) + def test_named(self, d): + names_input = list(d.draw(sets(text_letters(), min_size=1))) + values_input = list(d.draw(sets( + integers(), + min_size=len(names_input), + max_size=len(names_input), + ))) + chosen_name = d.draw(sampled_from(names_input)) + names_input = dict(zip(names_input, values_input)) + + class Int(Integer): + __slots__ = () + schema = names_input + _int = Int(chosen_name) + self.assertEqual(_int.named, chosen_name) + self.assertEqual(int(_int), names_input[chosen_name]) + + @given(integers(), integers(min_value=0), integers(min_value=0)) + def test_bounds_satisfied(self, bound_min, bound_delta, value_delta): + value = bound_min + value_delta + bound_max = value + bound_delta + Integer(value=value, bounds=(bound_min, bound_max)) + + @given(sets(integers(), min_size=3, max_size=3)) + def test_bounds_unsatisfied(self, values): + values = sorted(values) + with self.assertRaises(BoundsError) as err: + Integer(value=values[0], bounds=(values[1], values[2])) + repr(err.exception) + with self.assertRaises(BoundsError) as err: + Integer(value=values[2], bounds=(values[0], values[1])) + repr(err.exception) + + @given(data_strategy()) + def test_call(self, d): + for klass in (Integer, IntegerInherited): + ( + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _specs_initial, + _decoded_initial, + ) = d.draw(integer_values_strat()) + obj_initial = klass( + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial or False, + _specs_initial, + _decoded_initial, + ) + ( + value, + bounds, + impl, + expl, + default, + optional, + _, + _decoded, + ) = d.draw(integer_values_strat(do_expl=impl_initial is None)) + if (default is None) and (obj_initial.default is not None): + bounds = None + if ( + (bounds is None) and + (value is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= value <= bounds_initial[1]) + ): + value = None + if ( + (bounds is None) and + (default is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= default <= bounds_initial[1]) + ): + default = None + obj = obj_initial(value, bounds, impl, expl, default, optional) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual( + (obj._bound_min, obj._bound_max), + bounds or bounds_initial or (float("-inf"), float("+inf")), + ) + self.assertEqual( + obj.specs, + {} if _specs_initial is None else dict(_specs_initial), + ) + + @given(integer_values_strat()) + def test_copy(self, values): + for klass in (Integer, IntegerInherited): + obj = klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj.specs, obj_copied.specs) + self.assertEqual(obj._bound_min, obj_copied._bound_min) + self.assertEqual(obj._bound_max, obj_copied._bound_max) + self.assertEqual(obj._value, obj_copied._value) + + @given( + integers(), + integers(min_value=1).map(tag_encode), + ) + def test_stripped(self, value, tag_impl): + obj = Integer(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = Integer(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + def test_zero_len(self): + with self.assertRaises(NotEnoughData): + Integer().decode(b"".join(( + Integer.tag_default, + len_encode(0), + ))) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Integer().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Integer().decode( + Integer.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + sets(integers(), min_size=2, max_size=2), + integers(min_value=0), + lists(integers()), + ) + def test_invalid_bounds_while_decoding(self, ints, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + value, bound_min = list(sorted(ints)) + + class Int(Integer): + bounds = (bound_min, bound_min) + with self.assertRaises(DecodeError) as err: + Int().decode( + Integer(value).encode(), + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + integer_values_strat(), + integers(), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, value, tag_expl, offset): + for klass in (Integer, IntegerInherited): + _, _, _, _, default, optional, _, _decoded = values + obj = klass( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(int(obj_decoded), int(obj_expled)) + self.assertEqual(int(obj_decoded), int(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + def test_go_vectors_valid(self): + for data, expect in (( + (b"\x00", 0), + (b"\x7f", 127), + (b"\x80", -128), + (b"\xff\x7f", -129), + (b"\xff", -1), + (b"\x01", 1), + (b"\x00\xff", 255), + (b"\xff\x00", -256), + (b"\x01\x00", 256), + (b"\x00\x80", 128), + (b"\x01\x00", 256), + (b"\x80\x00\x00\x00\x00\x00\x00\x00", -9223372036854775808), + (b"\x80\x00\x00\x00", -2147483648), + )): + self.assertEqual( + Integer().decode(b"".join(( + Integer.tag_default, + len_encode(len(data)), + data, + )))[0], + expect, + ) + + def test_go_vectors_invalid(self): + for data in (( + b"\x00\x7f", + b"\xff\xf0", + )): + with self.assertRaises(DecodeError): + Integer().decode(b"".join(( + Integer.tag_default, + len_encode(len(data)), + data, + ))) + + +@composite +def bit_string_values_strat(draw, schema=None, value_required=False, do_expl=False): + if schema is None: + schema = () + if draw(booleans()): + schema = draw(sets(text_letters(), min_size=1, max_size=256)) + bits = draw(sets( + integers(min_value=0, max_value=255), + min_size=len(schema), + max_size=len(schema), + )) + schema = list(zip(schema, bits)) + + def _value(value_required): + if not value_required and draw(booleans()): + return + generation_choice = 0 + if value_required: + generation_choice = draw(sampled_from((1, 2, 3))) + if generation_choice == 1 or draw(booleans()): + return "'%s'B" % "".join(draw(lists( + sampled_from(("0", "1")), + max_size=len(schema), + ))) + elif generation_choice == 2 or draw(booleans()): + return draw(binary(max_size=len(schema) // 8)) + elif generation_choice == 3 or draw(booleans()): + return tuple(draw(lists(sampled_from([name for name, _ in schema])))) + return None + value = _value(value_required) + default = _value(value_required=False) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (schema, value, impl, expl, default, optional, _decoded) + + +class BitStringInherited(BitString): + __slots__ = () + + +class TestBitString(CommonMixin, TestCase): + base_klass = BitString + + @given(lists(booleans())) + def test_b_encoding(self, bits): + obj = BitString("'%s'B" % "".join("1" if bit else "0" for bit in bits)) + self.assertEqual(obj.bit_len, len(bits)) + self.assertSequenceEqual(list(obj), bits) + for i, bit in enumerate(bits): + self.assertEqual(obj[i], bit) + + @given(lists(booleans())) + def test_out_of_bounds_bits(self, bits): + obj = BitString("'%s'B" % "".join("1" if bit else "0" for bit in bits)) + for i in range(len(bits), len(bits) * 2): + self.assertFalse(obj[i]) + + def test_bad_b_encoding(self): + with self.assertRaises(ValueError): + BitString("'010120101'B") + + @given( + integers(min_value=1, max_value=255), + integers(min_value=1, max_value=255), + ) + def test_named_are_stripped(self, leading_zeros, trailing_zeros): + obj = BitString("'%s1%s'B" % (("0" * leading_zeros), ("0" * trailing_zeros))) + self.assertEqual(obj.bit_len, leading_zeros + 1 + trailing_zeros) + self.assertGreater(len(obj.encode()), (leading_zeros + 1 + trailing_zeros) // 8) + + class BS(BitString): + __slots__ = () + schema = (("whatever", 0),) + obj = BS("'%s1%s'B" % (("0" * leading_zeros), ("0" * trailing_zeros))) + self.assertEqual(obj.bit_len, leading_zeros + 1) + self.assertGreater(len(obj.encode()), (leading_zeros + 1) // 8) + + def test_zero_len(self): + with self.assertRaises(NotEnoughData): + BitString().decode(b"".join(( + BitString.tag_default, + len_encode(0), + ))) + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + BitString(123) + repr(err.exception) + with self.assertRaises(InvalidValueType) as err: + BitString(u"123") + repr(err.exception) + + def test_obj_unknown(self): + with self.assertRaises(ObjUnknown) as err: + BitString(b"whatever")["whenever"] + repr(err.exception) + + def test_get_invalid_typ(self): + with self.assertRaises(InvalidValueType) as err: + BitString(b"whatever")[(1, 2, 3)] + repr(err.exception) + + @given(data_strategy()) + def test_unknown_name(self, d): + _schema = d.draw(sets(text_letters(), min_size=2, max_size=5)) + missing = _schema.pop() + + class BS(BitString): + __slots__ = () + schema = [(n, i) for i, n in enumerate(_schema)] + with self.assertRaises(ObjUnknown) as err: + BS((missing,)) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = BitString(default=BitString(b""), optional=optional) + self.assertTrue(obj.optional) + + @given(binary()) + def test_ready(self, value): + obj = BitString() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = BitString(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given( + tuples(integers(min_value=0), binary()), + tuples(integers(min_value=0), binary()), + binary(), + binary(), + ) + def test_comparison(self, value1, value2, tag1, tag2): + for klass in (BitString, BitStringInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == bytes(obj2), value1[1] == value2[1]) + obj1 = klass(value1, impl=tag1) + obj2 = klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(data_strategy()) + def test_call(self, d): + for klass in (BitString, BitStringInherited): + ( + schema_initial, + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(bit_string_values_strat()) + + class BS(klass): + __slots__ = () + schema = schema_initial + obj_initial = BS( + value=value_initial, + impl=impl_initial, + expl=expl_initial, + default=default_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, + ) + ( + _, + value, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(bit_string_values_strat( + schema=schema_initial, + do_expl=impl_initial is None, + )) + obj = obj_initial( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + ) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual(obj.specs, obj_initial.specs) + + @given(bit_string_values_strat()) + def test_copy(self, values): + for klass in (BitString, BitStringInherited): + _schema, value, impl, expl, default, optional, _decoded = values + + class BS(klass): + __slots__ = () + schema = _schema + obj = BS( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional or False, + _decoded=_decoded, + ) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj.specs, obj_copied.specs) + self.assertEqual(obj._value, obj_copied._value) + + @given( + binary(), + integers(min_value=1).map(tag_encode), + ) + def test_stripped(self, value, tag_impl): + obj = BitString(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + binary(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = BitString(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + BitString().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + BitString().decode( + BitString.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_symmetric(self, d): + ( + _schema, + value, + _, + _, + default, + optional, + _decoded, + ) = d.draw(bit_string_values_strat(value_required=True)) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + offset = d.draw(integers(min_value=0)) + for klass in (BitString, BitStringInherited): + class BS(klass): + __slots__ = () + schema = _schema + obj = BS( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(bytes(obj_decoded), bytes(obj_expled)) + self.assertEqual(bytes(obj_decoded), bytes(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + if isinstance(value, tuple): + self.assertSetEqual(set(value), set(obj_decoded.named)) + for name in value: + obj_decoded[name] + + @given(integers(min_value=1, max_value=255)) + def test_bad_zero_value(self, pad_size): + with self.assertRaises(DecodeError): + BitString().decode(b"".join(( + BitString.tag_default, + len_encode(1), + int2byte(pad_size), + ))) + + def test_go_vectors_invalid(self): + for data in (( + b"\x07\x01", + b"\x07\x40", + b"\x08\x00", + )): + with self.assertRaises(DecodeError): + BitString().decode(b"".join(( + BitString.tag_default, + len_encode(2), + data, + ))) + + def test_go_vectors_valid(self): + obj, _ = BitString().decode(b"".join(( + BitString.tag_default, + len_encode(1), + b"\x00", + ))) + self.assertEqual(bytes(obj), b"") + self.assertEqual(obj.bit_len, 0) + + obj, _ = BitString().decode(b"".join(( + BitString.tag_default, + len_encode(2), + b"\x07\x00", + ))) + self.assertEqual(bytes(obj), b"\x00") + self.assertEqual(obj.bit_len, 1) + + obj = BitString((16, b"\x82\x40")) + self.assertTrue(obj[0]) + self.assertFalse(obj[1]) + self.assertTrue(obj[6]) + self.assertTrue(obj[9]) + self.assertFalse(obj[17]) + + +@composite +def octet_string_values_strat(draw, do_expl=False): + bound_min, bound_max = sorted(draw(sets( + integers(min_value=0, max_value=1 << 7), + min_size=2, + max_size=2, + ))) + value = draw(one_of( + none(), + binary(min_size=bound_min, max_size=bound_max), + )) + default = draw(one_of( + none(), + binary(min_size=bound_min, max_size=bound_max), + )) + bounds = None + if draw(booleans()): + bounds = (bound_min, bound_max) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, bounds, impl, expl, default, optional, _decoded) + + +class OctetStringInherited(OctetString): + __slots__ = () + + +class TestOctetString(CommonMixin, TestCase): + base_klass = OctetString + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + OctetString(text_type(123)) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = OctetString(default=OctetString(b""), optional=optional) + self.assertTrue(obj.optional) + + @given(binary()) + def test_ready(self, value): + obj = OctetString() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = OctetString(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(binary(), binary(), binary(), binary()) + def test_comparison(self, value1, value2, tag1, tag2): + for klass in (OctetString, OctetStringInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == bytes(obj2), value1 == value2) + obj1 = klass(value1, impl=tag1) + obj2 = klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(data_strategy()) + def test_bounds_satisfied(self, d): + bound_min = d.draw(integers(min_value=0, max_value=1 << 7)) + bound_max = d.draw(integers(min_value=bound_min, max_value=1 << 7)) + value = d.draw(binary(min_size=bound_min, max_size=bound_max)) + OctetString(value=value, bounds=(bound_min, bound_max)) + + @given(data_strategy()) + def test_bounds_unsatisfied(self, d): + bound_min = d.draw(integers(min_value=1, max_value=1 << 7)) + bound_max = d.draw(integers(min_value=bound_min, max_value=1 << 7)) + value = d.draw(binary(max_size=bound_min - 1)) + with self.assertRaises(BoundsError) as err: + OctetString(value=value, bounds=(bound_min, bound_max)) + repr(err.exception) + value = d.draw(binary(min_size=bound_max + 1)) + with self.assertRaises(BoundsError) as err: + OctetString(value=value, bounds=(bound_min, bound_max)) + repr(err.exception) + + @given(data_strategy()) + def test_call(self, d): + for klass in (OctetString, OctetStringInherited): + ( + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(octet_string_values_strat()) + obj_initial = klass( + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial or False, + _decoded_initial, + ) + ( + value, + bounds, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(octet_string_values_strat(do_expl=impl_initial is None)) + if (default is None) and (obj_initial.default is not None): + bounds = None + if ( + (bounds is None) and + (value is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= len(value) <= bounds_initial[1]) + ): + value = None + if ( + (bounds is None) and + (default is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= len(default) <= bounds_initial[1]) + ): + default = None + obj = obj_initial(value, bounds, impl, expl, default, optional) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual( + (obj._bound_min, obj._bound_max), + bounds or bounds_initial or (0, float("+inf")), + ) + + @given(octet_string_values_strat()) + def test_copy(self, values): + for klass in (OctetString, OctetStringInherited): + obj = klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._bound_min, obj_copied._bound_min) + self.assertEqual(obj._bound_max, obj_copied._bound_max) + self.assertEqual(obj._value, obj_copied._value) + + @given( + binary(), + integers(min_value=1).map(tag_encode), + ) + def test_stripped(self, value, tag_impl): + obj = OctetString(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + binary(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = OctetString(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + OctetString().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + OctetString().decode( + OctetString.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + sets(integers(min_value=0, max_value=10), min_size=2, max_size=2), + integers(min_value=0), + lists(integers()), + ) + def test_invalid_bounds_while_decoding(self, ints, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + value, bound_min = list(sorted(ints)) + + class String(OctetString): + bounds = (bound_min, bound_min) + with self.assertRaises(DecodeError) as err: + String().decode( + OctetString(b"\x00" * value).encode(), + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + octet_string_values_strat(), + binary(), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, value, tag_expl, offset): + for klass in (OctetString, OctetStringInherited): + _, _, _, _, default, optional, _decoded = values + obj = klass( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(bytes(obj_decoded), bytes(obj_expled)) + self.assertEqual(bytes(obj_decoded), bytes(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + +@composite +def null_values_strat(draw, do_expl=False): + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (impl, expl, optional, _decoded) + + +class NullInherited(Null): + __slots__ = () + + +class TestNull(CommonMixin, TestCase): + base_klass = Null + + def test_ready(self): + obj = Null() + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(binary(), binary()) + def test_comparison(self, tag1, tag2): + for klass in (Null, NullInherited): + obj1 = klass(impl=tag1) + obj2 = klass(impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + self.assertNotEqual(obj1, tag2) + + @given(data_strategy()) + def test_call(self, d): + for klass in (Null, NullInherited): + ( + impl_initial, + expl_initial, + optional_initial, + _decoded_initial, + ) = d.draw(null_values_strat()) + obj_initial = klass( + impl=impl_initial, + expl=expl_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, + ) + ( + impl, + expl, + optional, + _decoded, + ) = d.draw(null_values_strat(do_expl=impl_initial is None)) + obj = obj_initial(impl=impl, expl=expl, optional=optional) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + self.assertEqual(obj.optional, optional) + + @given(null_values_strat()) + def test_copy(self, values): + for klass in (Null, NullInherited): + impl, expl, optional, _decoded = values + obj = klass( + impl=impl, + expl=expl, + optional=optional or False, + _decoded=_decoded, + ) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + + @given(integers(min_value=1).map(tag_encode)) + def test_stripped(self, tag_impl): + obj = Null(impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given(integers(min_value=1).map(tag_ctxc)) + def test_stripped_expl(self, tag_expl): + obj = Null(expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Null().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Null().decode( + Null.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given(binary(min_size=1)) + def test_tag_mismatch(self, impl): + assume(impl != Null.tag_default) + with self.assertRaises(TagMismatch): + Null(impl=impl).decode(Null().encode()) + + @given( + null_values_strat(), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, tag_expl, offset): + for klass in (Null, NullInherited): + _, _, optional, _decoded = values + obj = klass(optional=optional, _decoded=_decoded) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + @given(integers(min_value=1)) + def test_invalid_len(self, l): + with self.assertRaises(InvalidLength): + Null().decode(b"".join(( + Null.tag_default, + len_encode(l), + ))) + + +@composite +def oid_strategy(draw): + first_arc = draw(integers(min_value=0, max_value=2)) + second_arc = 0 + if first_arc in (0, 1): + second_arc = draw(integers(min_value=0, max_value=39)) + else: + second_arc = draw(integers(min_value=0)) + other_arcs = draw(lists(integers(min_value=0))) + return tuple([first_arc, second_arc] + other_arcs) + + +@composite +def oid_values_strat(draw, do_expl=False): + value = draw(one_of(none(), oid_strategy())) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + default = draw(one_of(none(), oid_strategy())) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, impl, expl, default, optional, _decoded) + + +class ObjectIdentifierInherited(ObjectIdentifier): + __slots__ = () + + +class TestObjectIdentifier(CommonMixin, TestCase): + base_klass = ObjectIdentifier + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + ObjectIdentifier(123) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = ObjectIdentifier(default=ObjectIdentifier("1.2.3"), optional=optional) + self.assertTrue(obj.optional) + + @given(oid_strategy()) + def test_ready(self, value): + obj = ObjectIdentifier() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = ObjectIdentifier(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + hash(obj) + + @given(oid_strategy(), oid_strategy(), binary(), binary()) + def test_comparison(self, value1, value2, tag1, tag2): + for klass in (ObjectIdentifier, ObjectIdentifierInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == tuple(obj2), value1 == value2) + self.assertEqual(str(obj1) == str(obj2), value1 == value2) + obj1 = klass(value1, impl=tag1) + obj2 = klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(lists(oid_strategy())) + def test_sorted_works(self, values): + self.assertSequenceEqual( + [tuple(v) for v in sorted(ObjectIdentifier(v) for v in values)], + sorted(values), + ) + + @given(data_strategy()) + def test_call(self, d): + for klass in (ObjectIdentifier, ObjectIdentifierInherited): + ( + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(oid_values_strat()) + obj_initial = klass( + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial or False, + _decoded_initial, + ) + ( + value, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(oid_values_strat(do_expl=impl_initial is None)) + obj = obj_initial(value, impl, expl, default, optional) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + + @given(oid_values_strat()) + def test_copy(self, values): + for klass in (ObjectIdentifier, ObjectIdentifierInherited): + obj = klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._value, obj_copied._value) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + oid_strategy(), + integers(min_value=1).map(tag_encode), + ) + def test_stripped(self, value, tag_impl): + obj = ObjectIdentifier(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + oid_strategy(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = ObjectIdentifier(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + ObjectIdentifier().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + ObjectIdentifier().decode( + ObjectIdentifier.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + def test_zero_oid(self): + with self.assertRaises(NotEnoughData): + ObjectIdentifier().decode( + b"".join((ObjectIdentifier.tag_default, len_encode(0))) + ) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(oid_strategy()) + def test_unfinished_oid(self, value): + assume(list(value)[-1] > 255) + obj_encoded = ObjectIdentifier(value).encode() + obj, _ = ObjectIdentifier().decode(obj_encoded) + data = obj_encoded[obj.tlen + obj.llen:-1] + data = b"".join(( + ObjectIdentifier.tag_default, + len_encode(len(data)), + data, + )) + with assertRaisesRegex(self, DecodeError, "unfinished OID"): + obj.decode(data) + + @given(integers(min_value=0)) + def test_invalid_short(self, value): + with self.assertRaises(InvalidOID): + ObjectIdentifier((value,)) + with self.assertRaises(InvalidOID): + ObjectIdentifier("%d" % value) + + @given(integers(min_value=3), integers(min_value=0)) + def test_invalid_first_arc(self, first_arc, second_arc): + with self.assertRaises(InvalidOID): + ObjectIdentifier((first_arc, second_arc)) + with self.assertRaises(InvalidOID): + ObjectIdentifier("%d.%d" % (first_arc, second_arc)) + + @given(integers(min_value=0, max_value=1), integers(min_value=40)) + def test_invalid_second_arc(self, first_arc, second_arc): + with self.assertRaises(InvalidOID): + ObjectIdentifier((first_arc, second_arc)) + with self.assertRaises(InvalidOID): + ObjectIdentifier("%d.%d" % (first_arc, second_arc)) + + @given(text(alphabet=ascii_letters + ".", min_size=1)) + def test_junk(self, oid): + with self.assertRaises(InvalidOID): + ObjectIdentifier(oid) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(oid_strategy()) + def test_validness(self, oid): + obj = ObjectIdentifier(oid) + self.assertEqual(obj, ObjectIdentifier(".".join(str(arc) for arc in oid))) + str(obj) + repr(obj) + pprint(obj) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + oid_values_strat(), + oid_strategy(), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, value, tag_expl, offset): + for klass in (ObjectIdentifier, ObjectIdentifierInherited): + _, _, _, default, optional, _decoded = values + obj = klass( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(tuple(obj_decoded), tuple(obj_expled)) + self.assertEqual(tuple(obj_decoded), tuple(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + @given( + oid_strategy().map(ObjectIdentifier), + oid_strategy().map(ObjectIdentifier), + ) + def test_add(self, oid1, oid2): + oid_expect = ObjectIdentifier(str(oid1) + "." + str(oid2)) + for oid_to_add in (oid2, tuple(oid2)): + self.assertEqual(oid1 + oid_to_add, oid_expect) + with self.assertRaises(InvalidValueType): + oid1 + str(oid2) + + def test_go_vectors_valid(self): + for data, expect in ( + (b"\x55", (2, 5)), + (b"\x55\x02", (2, 5, 2)), + (b"\x55\x02\xc0\x00", (2, 5, 2, 8192)), + (b"\x81\x34\x03", (2, 100, 3)), + ): + self.assertEqual( + ObjectIdentifier().decode(b"".join(( + ObjectIdentifier.tag_default, + len_encode(len(data)), + data, + )))[0], + expect, + ) + + def test_go_vectors_invalid(self): + data = b"\x55\x02\xc0\x80\x80\x80\x80" + with self.assertRaises(DecodeError): + ObjectIdentifier().decode(b"".join(( + Integer.tag_default, + len_encode(len(data)), + data, + ))) + + +@composite +def enumerated_values_strat(draw, schema=None, do_expl=False): + if schema is None: + schema = list(draw(sets(text_printable, min_size=1, max_size=3))) + values = list(draw(sets( + integers(), + min_size=len(schema), + max_size=len(schema), + ))) + schema = list(zip(schema, values)) + value = draw(one_of(none(), sampled_from([k for k, v in schema]))) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + default = draw(one_of(none(), sampled_from([v for k, v in schema]))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (schema, value, impl, expl, default, optional, _decoded) + + +class TestEnumerated(CommonMixin, TestCase): + class EWhatever(Enumerated): + __slots__ = () + schema = (("whatever", 0),) + + base_klass = EWhatever + + def test_schema_required(self): + with assertRaisesRegex(self, ValueError, "schema must be specified"): + Enumerated() + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + self.base_klass((1, 2)) + repr(err.exception) + + @given(sets(text_letters(), min_size=2)) + def test_unknown_name(self, schema_input): + missing = schema_input.pop() + + class E(Enumerated): + __slots__ = () + schema = [(n, 123) for n in schema_input] + with self.assertRaises(ObjUnknown) as err: + E(missing) + repr(err.exception) + + @given( + sets(text_letters(), min_size=2), + sets(integers(), min_size=2), + ) + def test_unknown_value(self, schema_input, values_input): + schema_input.pop() + missing_value = values_input.pop() + _input = list(zip(schema_input, values_input)) + + class E(Enumerated): + __slots__ = () + schema = _input + with self.assertRaises(DecodeError) as err: + E(missing_value) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = self.base_klass(default="whatever", optional=optional) + self.assertTrue(obj.optional) + + def test_ready(self): + obj = self.base_klass() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = self.base_klass("whatever") + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(integers(), integers(), binary(), binary()) + def test_comparison(self, value1, value2, tag1, tag2): + class E(Enumerated): + __slots__ = () + schema = ( + ("whatever0", value1), + ("whatever1", value2), + ) + + class EInherited(E): + __slots__ = () + for klass in (E, EInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == int(obj2), value1 == value2) + obj1 = klass(value1, impl=tag1) + obj2 = klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(data_strategy()) + def test_call(self, d): + ( + schema_initial, + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(enumerated_values_strat()) + + class E(Enumerated): + __slots__ = () + schema = schema_initial + obj_initial = E( + value=value_initial, + impl=impl_initial, + expl=expl_initial, + default=default_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, + ) + ( + _, + value, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(enumerated_values_strat( + schema=schema_initial, + do_expl=impl_initial is None, + )) + obj = obj_initial( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + ) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual( + int(obj), + dict(schema_initial).get(value_expected, value_expected), + ) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual(obj.specs, dict(schema_initial)) + + @given(enumerated_values_strat()) + def test_copy(self, values): + schema_input, value, impl, expl, default, optional, _decoded = values + + class E(Enumerated): + __slots__ = () + schema = schema_input + obj = E( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + _decoded=_decoded, + ) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj.specs, obj_copied.specs) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_symmetric(self, d): + schema_input, _, _, _, default, optional, _decoded = d.draw( + enumerated_values_strat(), + ) + tag_expl = d.draw(integers(min_value=1).map(tag_ctxc)) + offset = d.draw(integers(min_value=0)) + value = d.draw(sampled_from(sorted([v for _, v in schema_input]))) + + class E(Enumerated): + __slots__ = () + schema = schema_input + obj = E( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(int(obj_decoded), int(obj_expled)) + self.assertEqual(int(obj_decoded), int(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + +@composite +def string_values_strat(draw, alphabet, do_expl=False): + bound_min, bound_max = sorted(draw(sets( + integers(min_value=0, max_value=1 << 7), + min_size=2, + max_size=2, + ))) + value = draw(one_of( + none(), + text(alphabet=alphabet, min_size=bound_min, max_size=bound_max), + )) + default = draw(one_of( + none(), + text(alphabet=alphabet, min_size=bound_min, max_size=bound_max), + )) + bounds = None + if draw(booleans()): + bounds = (bound_min, bound_max) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, bounds, impl, expl, default, optional, _decoded) + + +class StringMixin(object): + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + self.base_klass((1, 2)) + repr(err.exception) + + def text_alphabet(self): + if self.base_klass.encoding in ("ascii", "iso-8859-1"): + return printable + whitespace + return None + + @given(booleans()) + def test_optional(self, optional): + obj = self.base_klass(default=self.base_klass(""), optional=optional) + self.assertTrue(obj.optional) + + @given(data_strategy()) + def test_ready(self, d): + obj = self.base_klass() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + text_type(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + value = d.draw(text(alphabet=self.text_alphabet())) + obj = self.base_klass(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + text_type(obj) + + @given(data_strategy()) + def test_comparison(self, d): + value1 = d.draw(text(alphabet=self.text_alphabet())) + value2 = d.draw(text(alphabet=self.text_alphabet())) + tag1 = d.draw(binary()) + tag2 = d.draw(binary()) + obj1 = self.base_klass(value1) + obj2 = self.base_klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == bytes(obj2), value1 == value2) + self.assertEqual(obj1 == text_type(obj2), value1 == value2) + obj1 = self.base_klass(value1, impl=tag1) + obj2 = self.base_klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(data_strategy()) + def test_bounds_satisfied(self, d): + bound_min = d.draw(integers(min_value=0, max_value=1 << 7)) + bound_max = d.draw(integers(min_value=bound_min, max_value=1 << 7)) + value = d.draw(text( + alphabet=self.text_alphabet(), + min_size=bound_min, + max_size=bound_max, + )) + self.base_klass(value=value, bounds=(bound_min, bound_max)) + + @given(data_strategy()) + def test_bounds_unsatisfied(self, d): + bound_min = d.draw(integers(min_value=1, max_value=1 << 7)) + bound_max = d.draw(integers(min_value=bound_min, max_value=1 << 7)) + value = d.draw(text(alphabet=self.text_alphabet(), max_size=bound_min - 1)) + with self.assertRaises(BoundsError) as err: + self.base_klass(value=value, bounds=(bound_min, bound_max)) + repr(err.exception) + value = d.draw(text(alphabet=self.text_alphabet(), min_size=bound_max + 1)) + with self.assertRaises(BoundsError) as err: + self.base_klass(value=value, bounds=(bound_min, bound_max)) + repr(err.exception) + + @given(data_strategy()) + def test_call(self, d): + ( + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(string_values_strat(self.text_alphabet())) + obj_initial = self.base_klass( + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial or False, + _decoded_initial, + ) + ( + value, + bounds, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(string_values_strat( + self.text_alphabet(), + do_expl=impl_initial is None, + )) + if (default is None) and (obj_initial.default is not None): + bounds = None + if ( + (bounds is None) and + (value is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= len(value) <= bounds_initial[1]) + ): + value = None + if ( + (bounds is None) and + (default is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= len(default) <= bounds_initial[1]) + ): + default = None + obj = obj_initial(value, bounds, impl, expl, default, optional) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual( + (obj._bound_min, obj._bound_max), + bounds or bounds_initial or (0, float("+inf")), + ) + + @given(data_strategy()) + def test_copy(self, d): + values = d.draw(string_values_strat(self.text_alphabet())) + obj = self.base_klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._bound_min, obj_copied._bound_min) + self.assertEqual(obj._bound_max, obj_copied._bound_max) + self.assertEqual(obj._value, obj_copied._value) + + @given(data_strategy()) + def test_stripped(self, d): + value = d.draw(text(alphabet=self.text_alphabet())) + tag_impl = tag_encode(d.draw(integers(min_value=1))) + obj = self.base_klass(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given(data_strategy()) + def test_stripped_expl(self, d): + value = d.draw(text(alphabet=self.text_alphabet())) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + obj = self.base_klass(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + self.base_klass().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + self.base_klass().decode( + self.base_klass.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + sets(integers(min_value=0, max_value=10), min_size=2, max_size=2), + integers(min_value=0), + lists(integers()), + ) + def test_invalid_bounds_while_decoding(self, ints, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + value, bound_min = list(sorted(ints)) + + class String(self.base_klass): + # Multiply this value by four, to satisfy UTF-32 bounds + # (4 bytes per character) validation + bounds = (bound_min * 4, bound_min * 4) + with self.assertRaises(DecodeError) as err: + String().decode( + self.base_klass(b"\x00\x00\x00\x00" * value).encode(), + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given(data_strategy()) + def test_symmetric(self, d): + values = d.draw(string_values_strat(self.text_alphabet())) + value = d.draw(text(alphabet=self.text_alphabet())) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + offset = d.draw(integers(min_value=0)) + _, _, _, _, default, optional, _decoded = values + obj = self.base_klass( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertNotEqual(obj_decoded, obj) + self.assertEqual(bytes(obj_decoded), bytes(obj_expled)) + self.assertEqual(bytes(obj_decoded), bytes(obj)) + self.assertEqual(text_type(obj_decoded), text_type(obj_expled)) + self.assertEqual(text_type(obj_decoded), text_type(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + +class TestUTF8String(StringMixin, CommonMixin, TestCase): + base_klass = UTF8String + + +class TestNumericString(StringMixin, CommonMixin, TestCase): + base_klass = NumericString + + +class TestPrintableString(StringMixin, CommonMixin, TestCase): + base_klass = PrintableString + + +class TestTeletexString(StringMixin, CommonMixin, TestCase): + base_klass = TeletexString + + +class TestVideotexString(StringMixin, CommonMixin, TestCase): + base_klass = VideotexString + + +class TestIA5String(StringMixin, CommonMixin, TestCase): + base_klass = IA5String + + +class TestGraphicString(StringMixin, CommonMixin, TestCase): + base_klass = GraphicString + + +class TestVisibleString(StringMixin, CommonMixin, TestCase): + base_klass = VisibleString + + +class TestGeneralString(StringMixin, CommonMixin, TestCase): + base_klass = GeneralString + + +class TestUniversalString(StringMixin, CommonMixin, TestCase): + base_klass = UniversalString + + +class TestBMPString(StringMixin, CommonMixin, TestCase): + base_klass = BMPString + + +@composite +def generalized_time_values_strat( + draw, + min_datetime, + max_datetime, + omit_ms=False, + do_expl=False, +): + value = None + if draw(booleans()): + value = draw(datetimes(min_value=min_datetime, max_value=max_datetime)) + if omit_ms: + value = value.replace(microsecond=0) + default = None + if draw(booleans()): + default = draw(datetimes(min_value=min_datetime, max_value=max_datetime)) + if omit_ms: + default = default.replace(microsecond=0) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, impl, expl, default, optional, _decoded) + + +class TimeMixin(object): + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + self.base_klass( + repr(err.exception) + + @given(data_strategy()) + def test_optional(self, d): + default = d.draw(datetimes( + min_value=self.min_datetime, + max_value=self.max_datetime, + )) + optional = d.draw(booleans()) + obj = self.base_klass(default=default, optional=optional) + self.assertTrue(obj.optional) + + @given(data_strategy()) + def test_ready(self, d): + obj = self.base_klass() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + value = d.draw(datetimes(min_value=self.min_datetime)) + obj = self.base_klass(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(data_strategy()) + def test_comparison(self, d): + value1 = d.draw(datetimes( + min_value=self.min_datetime, + max_value=self.max_datetime, + )) + value2 = d.draw(datetimes( + min_value=self.min_datetime, + max_value=self.max_datetime, + )) + tag1 = d.draw(binary()) + tag2 = d.draw(binary()) + if self.omit_ms: + value1 = value1.replace(microsecond=0) + value2 = value2.replace(microsecond=0) + obj1 = self.base_klass(value1) + obj2 = self.base_klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == obj2.todatetime(), value1 == value2) + self.assertEqual(obj1 == bytes(obj2), value1 == value2) + obj1 = self.base_klass(value1, impl=tag1) + obj2 = self.base_klass(value1, impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(data_strategy()) + def test_call(self, d): + ( + value_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(generalized_time_values_strat( + min_datetime=self.min_datetime, + max_datetime=self.max_datetime, + omit_ms=self.omit_ms, + )) + obj_initial = self.base_klass( + value=value_initial, + impl=impl_initial, + expl=expl_initial, + default=default_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, + ) + ( + value, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(generalized_time_values_strat( + min_datetime=self.min_datetime, + max_datetime=self.max_datetime, + omit_ms=self.omit_ms, + do_expl=impl_initial is None, + )) + obj = obj_initial( + value=value, + impl=impl, + expl=expl, + default=default, + optional=optional, + ) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + + @given(data_strategy()) + def test_copy(self, d): + values = d.draw(generalized_time_values_strat( + min_datetime=self.min_datetime, + max_datetime=self.max_datetime, + )) + obj = self.base_klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._value, obj_copied._value) + + @given(data_strategy()) + def test_stripped(self, d): + value = d.draw(datetimes( + min_value=self.min_datetime, + max_value=self.max_datetime, + )) + tag_impl = tag_encode(d.draw(integers(min_value=1))) + obj = self.base_klass(value, impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given(data_strategy()) + def test_stripped_expl(self, d): + value = d.draw(datetimes( + min_value=self.min_datetime, + max_value=self.max_datetime, + )) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + obj = self.base_klass(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_symmetric(self, d): + values = d.draw(generalized_time_values_strat( + min_datetime=self.min_datetime, + max_datetime=self.max_datetime, + )) + value = d.draw(datetimes( + min_value=self.min_datetime, + max_value=self.max_datetime, + )) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + offset = d.draw(integers(min_value=0)) + _, _, _, default, optional, _decoded = values + obj = self.base_klass( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertEqual(obj_decoded.todatetime(), obj_expled.todatetime()) + self.assertEqual(obj_decoded.todatetime(), obj.todatetime()) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + + +class TestGeneralizedTime(TimeMixin, CommonMixin, TestCase): + base_klass = GeneralizedTime + omit_ms = False + min_datetime = datetime(1900, 1, 1) + max_datetime = datetime(9999, 12, 31) + + def test_go_vectors_invalid(self): + for data in (( + b"20100102030405", + b"00000100000000Z", + b"20101302030405Z", + b"20100002030405Z", + b"20100100030405Z", + b"20100132030405Z", + b"20100231030405Z", + b"20100102240405Z", + b"20100102036005Z", + b"20100102030460Z", + b"-20100102030410Z", + b"2010-0102030410Z", + b"2010-0002030410Z", + b"201001-02030410Z", + b"20100102-030410Z", + b"2010010203-0410Z", + b"201001020304-10Z", + # These ones are INVALID in *DER*, but accepted + # by Go's encoding/asn1 + b"20100102030405+0607", + b"20100102030405-0607", + )): + with self.assertRaises(DecodeError) as err: + GeneralizedTime(data) + repr(err.exception) + + def test_go_vectors_valid(self): + self.assertEqual( + GeneralizedTime(b"20100102030405Z").todatetime(), + datetime(2010, 1, 2, 3, 4, 5, 0), + ) + + +class TestUTCTime(TimeMixin, CommonMixin, TestCase): + base_klass = UTCTime + omit_ms = True + min_datetime = datetime(2000, 1, 1) + max_datetime = datetime(2049, 12, 31) + + def test_go_vectors_invalid(self): + for data in (( + b"a10506234540Z", + b"91a506234540Z", + b"9105a6234540Z", + b"910506a34540Z", + b"910506334a40Z", + b"91050633444aZ", + b"910506334461Z", + b"910506334400Za", + b"000100000000Z", + b"101302030405Z", + b"100002030405Z", + b"100100030405Z", + b"100132030405Z", + b"100231030405Z", + b"100102240405Z", + b"100102036005Z", + b"100102030460Z", + b"-100102030410Z", + b"10-0102030410Z", + b"10-0002030410Z", + b"1001-02030410Z", + b"100102-030410Z", + b"10010203-0410Z", + b"1001020304-10Z", + # These ones are INVALID in *DER*, but accepted + # by Go's encoding/asn1 + b"910506164540-0700", + b"910506164540+0730", + b"9105062345Z", + b"5105062345Z", + )): + with self.assertRaises(DecodeError) as err: + UTCTime(data) + repr(err.exception) + + def test_go_vectors_valid(self): + self.assertEqual( + UTCTime(b"910506234540Z").todatetime(), + datetime(1991, 5, 6, 23, 45, 40, 0), + ) + + @given(integers(min_value=0, max_value=49)) + def test_pre50(self, year): + self.assertEqual( + UTCTime(("%02d1231235959Z" % year).encode("ascii")).todatetime().year, + 2000 + year, + ) + + @given(integers(min_value=50, max_value=99)) + def test_post50(self, year): + self.assertEqual( + UTCTime(("%02d1231235959Z" % year).encode("ascii")).todatetime().year, + 1900 + year, + ) + + +@composite +def any_values_strat(draw, do_expl=False): + value = draw(one_of(none(), binary())) + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, expl, optional, _decoded) + + +class AnyInherited(Any): + __slots__ = () + + +class TestAny(CommonMixin, TestCase): + base_klass = Any + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + Any(123) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = Any(optional=optional) + self.assertEqual(obj.optional, optional) + + @given(binary()) + def test_ready(self, value): + obj = Any() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj = Any(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(integers()) + def test_basic(self, value): + integer_encoded = Integer(value).encode() + for obj in ( + Any(integer_encoded), + Any(Integer(value)), + Any(Any(Integer(value))), + ): + self.assertSequenceEqual(bytes(obj), integer_encoded) + self.assertEqual( + obj.decode(obj.encode())[0].vlen, + len(integer_encoded), + ) + repr(obj) + pprint(obj) + self.assertSequenceEqual(obj.encode(), integer_encoded) + + @given(binary(), binary()) + def test_comparison(self, value1, value2): + for klass in (Any, AnyInherited): + obj1 = klass(value1) + obj2 = klass(value2) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == bytes(obj2), value1 == value2) + + @given(data_strategy()) + def test_call(self, d): + for klass in (Any, AnyInherited): + ( + value_initial, + expl_initial, + optional_initial, + _decoded_initial, + ) = d.draw(any_values_strat()) + obj_initial = klass( + value_initial, + expl_initial, + optional_initial or False, + _decoded_initial, + ) + ( + value, + expl, + optional, + _decoded, + ) = d.draw(any_values_strat(do_expl=True)) + obj = obj_initial(value, expl, optional) + if obj.ready: + value_expected = None if value is None else value + self.assertEqual(obj, value_expected) + self.assertEqual(obj.expl_tag, expl or expl_initial) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + self.assertEqual(obj.optional, optional) + + def test_simultaneous_impl_expl(self): + # override it, as Any does not have implicit tag + pass + + def test_decoded(self): + # override it, as Any does not have implicit tag + pass + + @given(any_values_strat()) + def test_copy(self, values): + for klass in (Any, AnyInherited): + obj = klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._value, obj_copied._value) + + @given(binary().map(OctetString)) + def test_stripped(self, value): + obj = Any(value) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + binary(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = Any(value, expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Any().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + Any().decode( + Any.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + any_values_strat(), + integers().map(lambda x: Integer(x).encode()), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, value, tag_expl, offset): + for klass in (Any, AnyInherited): + _, _, optional, _decoded = values + obj = klass(value=value, optional=optional, _decoded=_decoded) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertEqual(bytes(obj_decoded), bytes(obj_expled)) + self.assertEqual(bytes(obj_decoded), bytes(obj)) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + self.assertEqual(obj_decoded.tlen, 0) + self.assertEqual(obj_decoded.llen, 0) + self.assertEqual(obj_decoded.vlen, len(value)) + + +@composite +def choice_values_strat(draw, value_required=False, schema=None, do_expl=False): + if schema is None: + names = list(draw(sets(text_letters(), min_size=1, max_size=5))) + tags = [tag_encode(tag) for tag in draw(sets( + integers(min_value=0), + min_size=len(names), + max_size=len(names), + ))] + schema = [(name, Integer(impl=tag)) for name, tag in zip(names, tags)] + value = None + if value_required or draw(booleans()): + value = draw(tuples( + sampled_from([name for name, _ in schema]), + integers().map(Integer), + )) + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + default = draw(one_of( + none(), + tuples(sampled_from([name for name, _ in schema]), integers().map(Integer)), + )) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (schema, value, expl, default, optional, _decoded) + + +class ChoiceInherited(Choice): + __slots__ = () + + +class TestChoice(CommonMixin, TestCase): + class Wahl(Choice): + schema = (("whatever", Boolean()),) + base_klass = Wahl + + def test_schema_required(self): + with assertRaisesRegex(self, ValueError, "schema must be specified"): + Choice() + + def test_impl_forbidden(self): + with assertRaisesRegex(self, ValueError, "no implicit tag allowed"): + Choice(impl=b"whatever") + + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + self.base_klass(123) + repr(err.exception) + with self.assertRaises(ObjUnknown) as err: + self.base_klass(("whenever", Boolean(False))) + repr(err.exception) + with self.assertRaises(InvalidValueType) as err: + self.base_klass(("whatever", Integer(123))) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = self.base_klass( + default=self.base_klass(("whatever", Boolean(False))), + optional=optional, + ) + self.assertTrue(obj.optional) + + @given(booleans()) + def test_ready(self, value): + obj = self.base_klass() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + self.assertIsNone(obj["whatever"]) + with self.assertRaises(ObjNotReady) as err: + obj.encode() + repr(err.exception) + obj["whatever"] = Boolean() + self.assertFalse(obj.ready) + repr(obj) + pprint(obj) + obj["whatever"] = Boolean(value) + self.assertTrue(obj.ready) + repr(obj) + pprint(obj) + + @given(booleans(), booleans()) + def test_comparison(self, value1, value2): + class WahlInherited(self.base_klass): + __slots__ = () + for klass in (self.base_klass, WahlInherited): + obj1 = klass(("whatever", Boolean(value1))) + obj2 = klass(("whatever", Boolean(value2))) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == obj2._value, value1 == value2) + self.assertFalse(obj1 == obj2._value[1]) + + @given(data_strategy()) + def test_call(self, d): + for klass in (Choice, ChoiceInherited): + ( + schema_initial, + value_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(choice_values_strat()) + + class Wahl(klass): + __slots__ = () + schema = schema_initial + obj_initial = Wahl( + value=value_initial, + expl=expl_initial, + default=default_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, + ) + ( + _, + value, + expl, + default, + optional, + _decoded, + ) = d.draw(choice_values_strat(schema=schema_initial, do_expl=True)) + obj = obj_initial(value, expl, default, optional) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj.choice, value_expected[0]) + self.assertEqual(obj.value, int(value_expected[1])) + self.assertEqual(obj.expl_tag, expl or expl_initial) + default_expect = default_initial if default is None else default + if default_expect is not None: + self.assertEqual(obj.default.choice, default_expect[0]) + self.assertEqual(obj.default.value, int(default_expect[1])) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual(obj.specs, obj_initial.specs) + + def test_simultaneous_impl_expl(self): + # override it, as Any does not have implicit tag + pass + + def test_decoded(self): + # override it, as Any does not have implicit tag + pass + + @given(choice_values_strat()) + def test_copy(self, values): + _schema, value, expl, default, optional, _decoded = values + + class Wahl(self.base_klass): + __slots__ = () + schema = _schema + obj = Wahl( + value=value, + expl=expl, + default=default, + optional=optional or False, + _decoded=_decoded, + ) + obj_copied = obj.copy() + self.assertIsNone(obj.tag) + self.assertIsNone(obj_copied.tag) + # hack for assert_copied_basic_fields + obj.tag = "whatever" + obj_copied.tag = "whatever" + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._value, obj_copied._value) + self.assertEqual(obj.specs, obj_copied.specs) + + @given(booleans()) + def test_stripped(self, value): + obj = self.base_klass(("whatever", Boolean(value))) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + booleans(), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, value, tag_expl): + obj = self.base_klass(("whatever", Boolean(value)), expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_symmetric(self, d): + _schema, value, _, default, optional, _decoded = d.draw( + choice_values_strat(value_required=True) + ) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + offset = d.draw(integers(min_value=0)) + + class Wahl(self.base_klass): + __slots__ = () + schema = _schema + obj = Wahl( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self.assertEqual(obj_decoded, obj_expled) + self.assertEqual(obj_decoded.choice, obj_expled.choice) + self.assertEqual(obj_decoded.value, obj_expled.value) + self.assertEqual(obj_decoded.choice, obj.choice) + self.assertEqual(obj_decoded.value, obj.value) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + self.assertSequenceEqual( + obj_expled_encoded[ + obj_decoded.value.offset - offset: + obj_decoded.value.offset + obj_decoded.value.tlvlen - offset + ], + obj_encoded, + ) + + @given(integers()) + def test_set_get(self, value): + class Wahl(Choice): + schema = ( + ("erste", Boolean()), + ("zweite", Integer()), + ) + obj = Wahl() + with self.assertRaises(ObjUnknown) as err: + obj["whatever"] = "whenever" + with self.assertRaises(InvalidValueType) as err: + obj["zweite"] = Boolean(False) + obj["zweite"] = Integer(value) + repr(err.exception) + with self.assertRaises(ObjUnknown) as err: + obj["whatever"] + repr(err.exception) + self.assertIsNone(obj["erste"]) + self.assertEqual(obj["zweite"], Integer(value)) + + def test_tag_mismatch(self): + class Wahl(Choice): + schema = ( + ("erste", Boolean()), + ) + int_encoded = Integer(123).encode() + bool_encoded = Boolean(False).encode() + obj = Wahl() + obj.decode(bool_encoded) + with self.assertRaises(TagMismatch): + obj.decode(int_encoded) + + +@composite +def seq_values_strat(draw, seq_klass, do_expl=False): + value = None + if draw(booleans()): + value = seq_klass() + value._value = { + k: v for k, v in draw(dictionaries( + integers(), + one_of( + booleans().map(Boolean), + integers().map(Integer), + ), + )).items() + } + schema = None + if draw(booleans()): + schema = list(draw(dictionaries( + integers(), + one_of( + booleans().map(Boolean), + integers().map(Integer), + ), + )).items()) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + default = None + if draw(booleans()): + default = seq_klass() + default._value = { + k: v for k, v in draw(dictionaries( + integers(), + one_of( + booleans().map(Boolean), + integers().map(Integer), + ), + )).items() + } + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return (value, schema, impl, expl, default, optional, _decoded) + + +@composite +def sequence_strat(draw, seq_klass): + inputs = draw(lists( + one_of( + tuples(just(Boolean), booleans(), one_of(none(), booleans())), + tuples(just(Integer), integers(), one_of(none(), integers())), + ), + max_size=6, + )) + tags = draw(sets( + integers(min_value=1), + min_size=len(inputs), + max_size=len(inputs), + )) + inits = [ + ({"expl": tag_ctxc(tag)} if expled else {"impl": tag_encode(tag)}) + for tag, expled in zip(tags, draw(lists( + booleans(), + min_size=len(inputs), + max_size=len(inputs), + ))) + ] + empties = [] + for i, optional in enumerate(draw(lists( + sampled_from(("required", "optional", "empty")), + min_size=len(inputs), + max_size=len(inputs), + ))): + if optional in ("optional", "empty"): + inits[i]["optional"] = True + if optional == "empty": + empties.append(i) + empties = set(empties) + names = list(draw(sets( + text_printable, + min_size=len(inputs), + max_size=len(inputs), + ))) + schema = [] + for i, (klass, value, default) in enumerate(inputs): + schema.append((names[i], klass(default=default, **inits[i]))) + seq_name = draw(text_letters()) + Seq = type(seq_name, (seq_klass,), {"__slots__": (), "schema": tuple(schema)}) + seq = Seq() + expects = [] + for i, (klass, value, default) in enumerate(inputs): + name = names[i] + _, spec = schema[i] + expect = { + "name": name, + "optional": False, + "presented": False, + "default_value": None if spec.default is None else default, + "value": None, + } + if i in empties: + expect["optional"] = True + else: + expect["presented"] = True + expect["value"] = value + if spec.optional: + expect["optional"] = True + if default is not None and default == value: + expect["presented"] = False + seq[name] = klass(value) + expects.append(expect) + return seq, expects + + +@composite +def sequences_strat(draw, seq_klass): + tags = draw(sets(integers(min_value=1), min_size=0, max_size=5)) + inits = [ + ({"expl": tag_ctxc(tag)} if expled else {"impl": tag_encode(tag)}) + for tag, expled in zip(tags, draw(lists( + booleans(), + min_size=len(tags), + max_size=len(tags), + ))) + ] + defaulted = set( + i for i, is_default in enumerate(draw(lists( + booleans(), + min_size=len(tags), + max_size=len(tags), + ))) if is_default + ) + names = list(draw(sets( + text_printable, + min_size=len(tags), + max_size=len(tags), + ))) + seq_expectses = draw(lists( + sequence_strat(seq_klass=seq_klass), + min_size=len(tags), + max_size=len(tags), + )) + seqs = [seq for seq, _ in seq_expectses] + schema = [] + for i, (name, seq) in enumerate(zip(names, seqs)): + schema.append(( + name, + seq(default=(seq if i in defaulted else None), **inits[i]), + )) + seq_name = draw(text_letters()) + Seq = type(seq_name, (seq_klass,), {"__slots__": (), "schema": tuple(schema)}) + seq_outer = Seq() + expect_outers = [] + for name, (seq_inner, expects_inner) in zip(names, seq_expectses): + expect = { + "name": name, + "expects": expects_inner, + "presented": False, + } + seq_outer[name] = seq_inner + if seq_outer.specs[name].default is None: + expect["presented"] = True + expect_outers.append(expect) + return seq_outer, expect_outers + + +class SeqMixing(object): + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + self.base_klass((1, 2, 3)) + repr(err.exception) + + def test_invalid_value_type_set(self): + class Seq(self.base_klass): + __slots__ = () + schema = (("whatever", Boolean()),) + seq = Seq() + with self.assertRaises(InvalidValueType) as err: + seq["whatever"] = Integer(123) + repr(err.exception) + + @given(booleans()) + def test_optional(self, optional): + obj = self.base_klass(default=self.base_klass(), optional=optional) + self.assertTrue(obj.optional) + + @given(data_strategy()) + def test_ready(self, d): + ready = { + str(i): v for i, v in enumerate(d.draw(lists( + booleans(), + min_size=1, + max_size=3, + ))) + } + non_ready = { + str(i + len(ready)): v for i, v in enumerate(d.draw(lists( + booleans(), + min_size=1, + max_size=3, + ))) + } + schema_input = [] + for name in d.draw(permutations( + list(ready.keys()) + list(non_ready.keys()), + )): + schema_input.append((name, Boolean())) + + class Seq(self.base_klass): + __slots__ = () + schema = tuple(schema_input) + seq = Seq() + for name in ready.keys(): + seq[name] + seq[name] = Boolean() + self.assertFalse(seq.ready) + repr(seq) + pprint(seq) + for name, value in ready.items(): + seq[name] = Boolean(value) + self.assertFalse(seq.ready) + repr(seq) + pprint(seq) + with self.assertRaises(ObjNotReady) as err: + seq.encode() + repr(err.exception) + for name, value in non_ready.items(): + seq[name] = Boolean(value) + self.assertTrue(seq.ready) + repr(seq) + pprint(seq) + + @given(data_strategy()) + def test_call(self, d): + class SeqInherited(self.base_klass): + __slots__ = () + for klass in (self.base_klass, SeqInherited): + ( + value_initial, + schema_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(seq_values_strat(seq_klass=klass)) + obj_initial = klass( + value_initial, + schema_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial or False, + _decoded_initial, + ) + ( + value, + _, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(seq_values_strat( + seq_klass=klass, + do_expl=impl_initial is None, + )) + obj = obj_initial(value, impl, expl, default, optional) + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + self.assertEqual(obj._value, getattr(value_expected, "_value", {})) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + {} if obj.default is None else obj.default._value, + getattr(default_initial if default is None else default, "_value", {}), + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(list(obj.specs.items()), schema_initial or []) + self.assertEqual(obj.optional, optional) + + @given(data_strategy()) + def test_copy(self, d): + class SeqInherited(self.base_klass): + __slots__ = () + for klass in (self.base_klass, SeqInherited): + values = d.draw(seq_values_strat(seq_klass=klass)) + obj = klass(*values) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj.specs, obj_copied.specs) + self.assertEqual(obj._value, obj_copied._value) + + @given(data_strategy()) + def test_stripped(self, d): + value = d.draw(integers()) + tag_impl = tag_encode(d.draw(integers(min_value=1))) + + class Seq(self.base_klass): + __slots__ = () + impl = tag_impl + schema = (("whatever", Integer()),) + seq = Seq() + seq["whatever"] = Integer(value) + with self.assertRaises(NotEnoughData): + seq.decode(seq.encode()[:-1]) + + @given(data_strategy()) + def test_stripped_expl(self, d): + value = d.draw(integers()) + tag_expl = tag_ctxc(d.draw(integers(min_value=1))) + + class Seq(self.base_klass): + __slots__ = () + expl = tag_expl + schema = (("whatever", Integer()),) + seq = Seq() + seq["whatever"] = Integer(value) + with self.assertRaises(NotEnoughData): + seq.decode(seq.encode()[:-1]) + + @given(binary(min_size=2)) + def test_non_tag_mismatch_raised(self, junk): + try: + _, _, len_encoded = tag_strip(memoryview(junk)) + len_decode(len_encoded) + except Exception: + assume(True) + else: + assume(False) + + class Seq(self.base_klass): + __slots__ = () + schema = ( + ("whatever", Integer()), + ("junk", Any()), + ("whenever", Integer()), + ) + seq = Seq() + seq["whatever"] = Integer(123) + seq["junk"] = Any(junk) + seq["whenever"] = Integer(123) + with self.assertRaises(DecodeError): + seq.decode(seq.encode()) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + self.base_klass().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + self.base_klass().decode( + self.base_klass.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + def _assert_expects(self, seq, expects): + for expect in expects: + self.assertEqual( + seq.specs[expect["name"]].optional, + expect["optional"], + ) + if expect["default_value"] is not None: + self.assertEqual( + seq.specs[expect["name"]].default, + expect["default_value"], + ) + if expect["presented"]: + self.assertIn(expect["name"], seq) + self.assertEqual(seq[expect["name"]], expect["value"]) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_symmetric(self, d): + seq, expects = d.draw(sequence_strat(seq_klass=self.base_klass)) + self.assertTrue(seq.ready) + self.assertFalse(seq.decoded) + self._assert_expects(seq, expects) + repr(seq) + pprint(seq) + seq_encoded = seq.encode() + seq_decoded, tail = seq.decode(seq_encoded) + self.assertEqual(tail, b"") + self.assertTrue(seq.ready) + self._assert_expects(seq_decoded, expects) + self.assertEqual(seq, seq_decoded) + self.assertEqual(seq_decoded.encode(), seq_encoded) + for expect in expects: + if not expect["presented"]: + self.assertNotIn(expect["name"], seq_decoded) + continue + self.assertIn(expect["name"], seq_decoded) + obj = seq_decoded[expect["name"]] + self.assertTrue(obj.decoded) + offset = obj.expl_offset if obj.expled else obj.offset + tlvlen = obj.expl_tlvlen if obj.expled else obj.tlvlen + self.assertSequenceEqual( + seq_encoded[offset:offset + tlvlen], + obj.encode(), + ) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_symmetric_with_seq(self, d): + seq, expect_outers = d.draw(sequences_strat(seq_klass=self.base_klass)) + self.assertTrue(seq.ready) + seq_encoded = seq.encode() + seq_decoded, tail = seq.decode(seq_encoded) + self.assertEqual(tail, b"") + self.assertTrue(seq.ready) + self.assertEqual(seq, seq_decoded) + self.assertEqual(seq_decoded.encode(), seq_encoded) + for expect_outer in expect_outers: + if not expect_outer["presented"]: + self.assertNotIn(expect_outer["name"], seq_decoded) + continue + self.assertIn(expect_outer["name"], seq_decoded) + obj = seq_decoded[expect_outer["name"]] + self.assertTrue(obj.decoded) + offset = obj.expl_offset if obj.expled else obj.offset + tlvlen = obj.expl_tlvlen if obj.expled else obj.tlvlen + self.assertSequenceEqual( + seq_encoded[offset:offset + tlvlen], + obj.encode(), + ) + self._assert_expects(obj, expect_outer["expects"]) + + @given(data_strategy()) + def test_default_disappears(self, d): + _schema = list(d.draw(dictionaries( + text_letters(), + sets(integers(), min_size=2, max_size=2), + min_size=1, + )).items()) + + class Seq(self.base_klass): + __slots__ = () + schema = [ + (n, Integer(default=d)) + for n, (_, d) in _schema + ] + seq = Seq() + for name, (value, _) in _schema: + seq[name] = Integer(value) + self.assertEqual(len(seq._value), len(_schema)) + empty_seq = b"".join((self.base_klass.tag_default, len_encode(0))) + self.assertGreater(len(seq.encode()), len(empty_seq)) + for name, (_, default) in _schema: + seq[name] = Integer(default) + self.assertEqual(len(seq._value), 0) + self.assertSequenceEqual(seq.encode(), empty_seq) + + @given(data_strategy()) + def test_encoded_default_accepted(self, d): + _schema = list(d.draw(dictionaries( + text_letters(), + integers(), + min_size=1, + )).items()) + tags = [tag_encode(tag) for tag in d.draw(sets( + integers(min_value=0), + min_size=len(_schema), + max_size=len(_schema), + ))] + + class SeqWithoutDefault(self.base_klass): + __slots__ = () + schema = [ + (n, Integer(impl=t)) + for (n, _), t in zip(_schema, tags) + ] + seq_without_default = SeqWithoutDefault() + for name, value in _schema: + seq_without_default[name] = Integer(value) + seq_encoded = seq_without_default.encode() + + class SeqWithDefault(self.base_klass): + __slots__ = () + schema = [ + (n, Integer(default=v, impl=t)) + for (n, v), t in zip(_schema, tags) + ] + seq_with_default = SeqWithDefault() + seq_decoded, _ = seq_with_default.decode(seq_encoded) + for name, value in _schema: + self.assertEqual(seq_decoded[name], seq_with_default[name]) + self.assertEqual(seq_decoded[name], value) + + @given(data_strategy()) + def test_missing_from_spec(self, d): + names = list(d.draw(sets(text_letters(), min_size=2))) + tags = [tag_encode(tag) for tag in d.draw(sets( + integers(min_value=0), + min_size=len(names), + max_size=len(names), + ))] + names_tags = [(name, tag) for tag, name in sorted(zip(tags, names))] + + class SeqFull(self.base_klass): + __slots__ = () + schema = [(n, Integer(impl=t)) for n, t in names_tags] + seq_full = SeqFull() + for i, name in enumerate(names): + seq_full[name] = Integer(i) + seq_encoded = seq_full.encode() + altered = names_tags[:-2] + names_tags[-1:] + + class SeqMissing(self.base_klass): + __slots__ = () + schema = [(n, Integer(impl=t)) for n, t in altered] + seq_missing = SeqMissing() + with self.assertRaises(TagMismatch): + seq_missing.decode(seq_encoded) + + +class TestSequence(SeqMixing, CommonMixin, TestCase): + base_klass = Sequence + + @given( + integers(), + binary(min_size=1), + ) + def test_remaining(self, value, junk): + class Seq(Sequence): + __slots__ = () + schema = ( + ("whatever", Integer()), + ) + int_encoded = Integer(value).encode() + junked = b"".join(( + Sequence.tag_default, + len_encode(len(int_encoded + junk)), + int_encoded + junk, + )) + with assertRaisesRegex(self, DecodeError, "remaining"): + Seq().decode(junked) + + @given(sets(text_letters(), min_size=2)) + def test_obj_unknown(self, names): + missing = names.pop() + + class Seq(Sequence): + __slots__ = () + schema = [(n, Boolean()) for n in names] + seq = Seq() + with self.assertRaises(ObjUnknown) as err: + seq[missing] + repr(err.exception) + with self.assertRaises(ObjUnknown) as err: + seq[missing] = Boolean() + repr(err.exception) + + +class TestSet(SeqMixing, CommonMixin, TestCase): + base_klass = Set + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_sorted(self, d): + tags = [ + tag_encode(tag) for tag in + d.draw(sets(integers(min_value=1), min_size=1, max_size=10)) + ] + + class Seq(Set): + __slots__ = () + schema = [(str(i), OctetString(impl=t)) for i, t in enumerate(tags)] + seq = Seq() + for name, _ in Seq.schema: + seq[name] = OctetString(b"") + seq_encoded = seq.encode() + seq_decoded, _ = seq.decode(seq_encoded) + self.assertSequenceEqual( + seq_encoded[seq_decoded.tlen + seq_decoded.llen:], + b"".join(sorted([seq[name].encode() for name, _ in Seq.schema])), + ) + + +@composite +def seqof_values_strat(draw, schema=None, do_expl=False): + if schema is None: + schema = draw(sampled_from((Boolean(), Integer()))) + bound_min, bound_max = sorted(draw(sets( + integers(min_value=0, max_value=10), + min_size=2, + max_size=2, + ))) + if isinstance(schema, Boolean): + values_generator = booleans().map(Boolean) + elif isinstance(schema, Integer): + values_generator = integers().map(Integer) + values_generator = lists( + values_generator, + min_size=bound_min, + max_size=bound_max, + ) + values = draw(one_of(none(), values_generator)) + impl = None + expl = None + if do_expl: + expl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + else: + impl = draw(one_of(none(), integers(min_value=1).map(tag_encode))) + default = draw(one_of(none(), values_generator)) + optional = draw(one_of(none(), booleans())) + _decoded = ( + draw(integers(min_value=0)), + draw(integers(min_value=0)), + draw(integers(min_value=0)), + ) + return ( + schema, + values, + (bound_min, bound_max), + impl, + expl, + default, + optional, + _decoded, + ) + + +class SeqOfMixing(object): + def test_invalid_value_type(self): + with self.assertRaises(InvalidValueType) as err: + self.base_klass(123) + repr(err.exception) + + def test_invalid_values_type(self): + class SeqOf(self.base_klass): + __slots__ = () + schema = Integer() + with self.assertRaises(InvalidValueType) as err: + SeqOf([Integer(123), Boolean(False), Integer(234)]) + repr(err.exception) + + def test_schema_required(self): + with assertRaisesRegex(self, ValueError, "schema must be specified"): + self.base_klass.__mro__[1]() + + @given(booleans(), booleans(), binary(), binary()) + def test_comparison(self, value1, value2, tag1, tag2): + class SeqOf(self.base_klass): + __slots__ = () + schema = Boolean() + obj1 = SeqOf([Boolean(value1)]) + obj2 = SeqOf([Boolean(value2)]) + self.assertEqual(obj1 == obj2, value1 == value2) + self.assertEqual(obj1 == list(obj2), value1 == value2) + self.assertEqual(obj1 == tuple(obj2), value1 == value2) + obj1 = SeqOf([Boolean(value1)], impl=tag1) + obj2 = SeqOf([Boolean(value1)], impl=tag2) + self.assertEqual(obj1 == obj2, tag1 == tag2) + + @given(lists(booleans())) + def test_iter(self, values): + class SeqOf(self.base_klass): + __slots__ = () + schema = Boolean() + obj = SeqOf([Boolean(value) for value in values]) + self.assertEqual(len(obj), len(values)) + for i, value in enumerate(obj): + self.assertEqual(value, values[i]) + + @given(data_strategy()) + def test_ready(self, d): + ready = [Integer(v) for v in d.draw(lists( + integers(), + min_size=1, + max_size=3, + ))] + non_ready = [ + Integer() for _ in + range(d.draw(integers(min_value=1, max_value=5))) + ] + + class SeqOf(self.base_klass): + __slots__ = () + schema = Integer() + values = d.draw(permutations(ready + non_ready)) + seqof = SeqOf() + for value in values: + seqof.append(value) + self.assertFalse(seqof.ready) + repr(seqof) + pprint(seqof) + with self.assertRaises(ObjNotReady) as err: + seqof.encode() + repr(err.exception) + for i, value in enumerate(values): + self.assertEqual(seqof[i], value) + if not seqof[i].ready: + seqof[i] = Integer(i) + self.assertTrue(seqof.ready) + repr(seqof) + pprint(seqof) + + def test_spec_mismatch(self): + class SeqOf(self.base_klass): + __slots__ = () + schema = Integer() + seqof = SeqOf() + seqof.append(Integer(123)) + with self.assertRaises(ValueError): + seqof.append(Boolean(False)) + with self.assertRaises(ValueError): + seqof[0] = Boolean(False) + + @given(data_strategy()) + def test_bounds_satisfied(self, d): + class SeqOf(self.base_klass): + __slots__ = () + schema = Boolean() + bound_min = d.draw(integers(min_value=0, max_value=1 << 7)) + bound_max = d.draw(integers(min_value=bound_min, max_value=1 << 7)) + value = [Boolean()] * d.draw(integers(min_value=bound_min, max_value=bound_max)) + SeqOf(value=value, bounds=(bound_min, bound_max)) + + @given(data_strategy()) + def test_bounds_unsatisfied(self, d): + class SeqOf(self.base_klass): + __slots__ = () + schema = Boolean() + bound_min = d.draw(integers(min_value=1, max_value=1 << 7)) + bound_max = d.draw(integers(min_value=bound_min, max_value=1 << 7)) + value = [Boolean()] * d.draw(integers(max_value=bound_min - 1)) + with self.assertRaises(BoundsError) as err: + SeqOf(value=value, bounds=(bound_min, bound_max)) + repr(err.exception) + value = [Boolean()] * d.draw(integers( + min_value=bound_max + 1, + max_value=bound_max + 10, + )) + with self.assertRaises(BoundsError) as err: + SeqOf(value=value, bounds=(bound_min, bound_max)) + repr(err.exception) + + @given(integers(min_value=1, max_value=10)) + def test_out_of_bounds(self, bound_max): + class SeqOf(self.base_klass): + __slots__ = () + schema = Integer() + bounds = (0, bound_max) + seqof = SeqOf() + for _ in range(bound_max): + seqof.append(Integer(123)) + with self.assertRaises(BoundsError): + seqof.append(Integer(123)) + + @given(data_strategy()) + def test_call(self, d): + ( + schema_initial, + value_initial, + bounds_initial, + impl_initial, + expl_initial, + default_initial, + optional_initial, + _decoded_initial, + ) = d.draw(seqof_values_strat()) + + class SeqOf(self.base_klass): + __slots__ = () + schema = schema_initial + obj_initial = SeqOf( + value=value_initial, + bounds=bounds_initial, + impl=impl_initial, + expl=expl_initial, + default=default_initial, + optional=optional_initial or False, + _decoded=_decoded_initial, + ) + ( + _, + value, + bounds, + impl, + expl, + default, + optional, + _decoded, + ) = d.draw(seqof_values_strat( + schema=schema_initial, + do_expl=impl_initial is None, + )) + if (default is None) and (obj_initial.default is not None): + bounds = None + if ( + (bounds is None) and + (value is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= len(value) <= bounds_initial[1]) + ): + value = None + if ( + (bounds is None) and + (default is not None) and + (bounds_initial is not None) and + not (bounds_initial[0] <= len(default) <= bounds_initial[1]) + ): + default = None + obj = obj_initial( + value=value, + bounds=bounds, + impl=impl, + expl=expl, + default=default, + optional=optional, + ) + if obj.ready: + value_expected = default if value is None else value + value_expected = ( + default_initial if value_expected is None + else value_expected + ) + value_expected = () if value_expected is None else value_expected + self.assertEqual(obj, value_expected) + self.assertEqual(obj.tag, impl or impl_initial or obj.tag_default) + self.assertEqual(obj.expl_tag, expl or expl_initial) + self.assertEqual( + obj.default, + default_initial if default is None else default, + ) + if obj.default is None: + optional = optional_initial if optional is None else optional + optional = False if optional is None else optional + else: + optional = True + self.assertEqual(obj.optional, optional) + self.assertEqual( + (obj._bound_min, obj._bound_max), + bounds or bounds_initial or (0, float("+inf")), + ) + + @given(seqof_values_strat()) + def test_copy(self, values): + _schema, value, bounds, impl, expl, default, optional, _decoded = values + + class SeqOf(self.base_klass): + __slots__ = () + schema = _schema + obj = SeqOf( + value=value, + bounds=bounds, + impl=impl, + expl=expl, + default=default, + optional=optional or False, + _decoded=_decoded, + ) + obj_copied = obj.copy() + self.assert_copied_basic_fields(obj, obj_copied) + self.assertEqual(obj._bound_min, obj_copied._bound_min) + self.assertEqual(obj._bound_max, obj_copied._bound_max) + self.assertEqual(obj._value, obj_copied._value) + + @given( + lists(binary()), + integers(min_value=1).map(tag_encode), + ) + def test_stripped(self, values, tag_impl): + class SeqOf(self.base_klass): + __slots__ = () + schema = OctetString() + obj = SeqOf([OctetString(v) for v in values], impl=tag_impl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + lists(binary()), + integers(min_value=1).map(tag_ctxc), + ) + def test_stripped_expl(self, values, tag_expl): + class SeqOf(self.base_klass): + __slots__ = () + schema = OctetString() + obj = SeqOf([OctetString(v) for v in values], expl=tag_expl) + with self.assertRaises(NotEnoughData): + obj.decode(obj.encode()[:-1]) + + @given( + integers(min_value=31), + integers(min_value=0), + lists(integers()), + ) + def test_bad_tag(self, tag, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + self.base_klass().decode( + tag_encode(tag)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given( + integers(min_value=128), + integers(min_value=0), + lists(integers()), + ) + def test_bad_len(self, l, offset, decode_path): + decode_path = tuple(str(i) for i in decode_path) + with self.assertRaises(DecodeError) as err: + self.base_klass().decode( + self.base_klass.tag_default + len_encode(l)[:-1], + offset=offset, + decode_path=decode_path, + ) + repr(err.exception) + self.assertEqual(err.exception.offset, offset) + self.assertEqual(err.exception.decode_path, decode_path) + + @given(binary(min_size=1)) + def test_tag_mismatch(self, impl): + assume(impl != self.base_klass.tag_default) + with self.assertRaises(TagMismatch): + self.base_klass(impl=impl).decode(self.base_klass().encode()) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given( + seqof_values_strat(schema=Integer()), + lists(integers().map(Integer)), + integers(min_value=1).map(tag_ctxc), + integers(min_value=0), + ) + def test_symmetric(self, values, value, tag_expl, offset): + _, _, _, _, _, default, optional, _decoded = values + + class SeqOf(self.base_klass): + __slots__ = () + schema = Integer() + obj = SeqOf( + value=value, + default=default, + optional=optional, + _decoded=_decoded, + ) + repr(obj) + pprint(obj) + self.assertFalse(obj.expled) + obj_encoded = obj.encode() + obj_expled = obj(value, expl=tag_expl) + self.assertTrue(obj_expled.expled) + repr(obj_expled) + pprint(obj_expled) + obj_expled_encoded = obj_expled.encode() + obj_decoded, tail = obj_expled.decode(obj_expled_encoded, offset=offset) + repr(obj_decoded) + pprint(obj_decoded) + self.assertEqual(tail, b"") + self._test_symmetric_compare_objs(obj_decoded, obj_expled) + self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) + self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) + self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) + self.assertEqual( + obj_decoded.expl_llen, + len(len_encode(len(obj_encoded))), + ) + self.assertEqual(obj_decoded.tlvlen, len(obj_encoded)) + self.assertEqual(obj_decoded.expl_vlen, len(obj_encoded)) + self.assertEqual( + obj_decoded.offset, + offset + obj_decoded.expl_tlen + obj_decoded.expl_llen, + ) + self.assertEqual(obj_decoded.expl_offset, offset) + for obj_inner in obj_decoded: + self.assertIn(obj_inner, obj_decoded) + self.assertSequenceEqual( + obj_inner.encode(), + obj_expled_encoded[ + obj_inner.offset - offset: + obj_inner.offset + obj_inner.tlvlen - offset + ], + ) + + +class TestSequenceOf(SeqOfMixing, CommonMixin, TestCase): + class SeqOf(SequenceOf): + __slots__ = () + schema = "whatever" + base_klass = SeqOf + + def _test_symmetric_compare_objs(self, obj1, obj2): + self.assertEqual(obj1, obj2) + self.assertSequenceEqual(list(obj1), list(obj2)) + + +class TestSetOf(SeqOfMixing, CommonMixin, TestCase): + class SeqOf(SetOf): + __slots__ = () + schema = "whatever" + base_klass = SeqOf + + def _test_symmetric_compare_objs(self, obj1, obj2): + self.assertSetEqual( + set(int(v) for v in obj1), + set(int(v) for v in obj2), + ) + + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(data_strategy()) + def test_sorted(self, d): + values = [OctetString(v) for v in d.draw(lists(binary()))] + + class Seq(SetOf): + __slots__ = () + schema = OctetString() + seq = Seq(values) + seq_encoded = seq.encode() + seq_decoded, _ = seq.decode(seq_encoded) + self.assertSequenceEqual( + seq_encoded[seq_decoded.tlen + seq_decoded.llen:], + b"".join(sorted([v.encode() for v in values])), + ) + + +class TestGoMarshalVectors(TestCase): + def runTest(self): + self.assertSequenceEqual(Integer(10).encode(), hexdec("02010a")) + self.assertSequenceEqual(Integer(127).encode(), hexdec("02017f")) + self.assertSequenceEqual(Integer(128).encode(), hexdec("02020080")) + self.assertSequenceEqual(Integer(-128).encode(), hexdec("020180")) + self.assertSequenceEqual(Integer(-129).encode(), hexdec("0202ff7f")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", Integer()), + ("zweite", Integer(optional=True)) + ) + seq = Seq() + seq["erste"] = Integer(64) + self.assertSequenceEqual(seq.encode(), hexdec("3003020140")) + seq["erste"] = Integer(0x123456) + self.assertSequenceEqual(seq.encode(), hexdec("30050203123456")) + seq["erste"] = Integer(64) + seq["zweite"] = Integer(65) + self.assertSequenceEqual(seq.encode(), hexdec("3006020140020141")) + + class NestedSeq(Sequence): + __slots__ = () + schema = ( + ("nest", Seq()), + ) + seq["erste"] = Integer(127) + seq["zweite"] = None + nested = NestedSeq() + nested["nest"] = seq + self.assertSequenceEqual(nested.encode(), hexdec("3005300302017f")) + + self.assertSequenceEqual( + OctetString(b"\x01\x02\x03").encode(), + hexdec("0403010203"), + ) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", Integer(impl=tag_encode(5, klass=TagClassContext))), + ) + seq = Seq() + seq["erste"] = Integer(64) + self.assertSequenceEqual(seq.encode(), hexdec("3003850140")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", Integer(expl=tag_ctxc(5))), + ) + seq = Seq() + seq["erste"] = Integer(64) + self.assertSequenceEqual(seq.encode(), hexdec("3005a503020140")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", Null( + impl=tag_encode(0, klass=TagClassContext), + optional=True, + )), + ) + seq = Seq() + seq["erste"] = Null() + self.assertSequenceEqual(seq.encode(), hexdec("30028000")) + seq["erste"] = None + self.assertSequenceEqual(seq.encode(), hexdec("3000")) + + self.assertSequenceEqual( + UTCTime(datetime(1970, 1, 1, 0, 0)).encode(), + hexdec("170d3730303130313030303030305a"), + ) + self.assertSequenceEqual( + UTCTime(datetime(2009, 11, 15, 22, 56, 16)).encode(), + hexdec("170d3039313131353232353631365a"), + ) + self.assertSequenceEqual( + GeneralizedTime(datetime(2100, 4, 5, 12, 1, 1)).encode(), + hexdec("180f32313030303430353132303130315a"), + ) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", GeneralizedTime()), + ) + seq = Seq() + seq["erste"] = GeneralizedTime(datetime(2009, 11, 15, 22, 56, 16)) + self.assertSequenceEqual( + seq.encode(), + hexdec("3011180f32303039313131353232353631365a"), + ) + + self.assertSequenceEqual( + BitString((1, b"\x80")).encode(), + hexdec("03020780"), + ) + self.assertSequenceEqual( + BitString((12, b"\x81\xF0")).encode(), + hexdec("03030481f0"), + ) + + self.assertSequenceEqual( + ObjectIdentifier("").encode(), + hexdec("06032a0304"), + ) + self.assertSequenceEqual( + ObjectIdentifier("1.2.840.133549.1.1.5").encode(), + hexdec("06092a864888932d010105"), + ) + self.assertSequenceEqual( + ObjectIdentifier("2.100.3").encode(), + hexdec("0603813403"), + ) + + self.assertSequenceEqual( + PrintableString("test").encode(), + hexdec("130474657374"), + ) + self.assertSequenceEqual( + PrintableString("x" * 127).encode(), + hexdec("137F" + "78" * 127), + ) + self.assertSequenceEqual( + PrintableString("x" * 128).encode(), + hexdec("138180" + "78" * 128), + ) + self.assertSequenceEqual(UTF8String("Σ").encode(), hexdec("0c02cea3")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", IA5String()), + ) + seq = Seq() + seq["erste"] = IA5String("test") + self.assertSequenceEqual(seq.encode(), hexdec("3006160474657374")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", PrintableString()), + ) + seq = Seq() + seq["erste"] = PrintableString("test") + self.assertSequenceEqual(seq.encode(), hexdec("3006130474657374")) + seq["erste"] = PrintableString("test*") + self.assertSequenceEqual(seq.encode(), hexdec("30071305746573742a")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", Any(optional=True)), + ("zweite", Integer()), + ) + seq = Seq() + seq["zweite"] = Integer(64) + self.assertSequenceEqual(seq.encode(), hexdec("3003020140")) + + class Seq(SetOf): + __slots__ = () + schema = Integer() + seq = Seq() + seq.append(Integer(10)) + self.assertSequenceEqual(seq.encode(), hexdec("310302010a")) + + class _SeqOf(SequenceOf): + __slots__ = () + schema = PrintableString() + + class SeqOf(SequenceOf): + __slots__ = () + schema = _SeqOf() + _seqof = _SeqOf() + _seqof.append(PrintableString("1")) + seqof = SeqOf() + seqof.append(_seqof) + self.assertSequenceEqual(seqof.encode(), hexdec("30053003130131")) + + class Seq(Sequence): + __slots__ = () + schema = ( + ("erste", Integer(default=1)), + ) + seq = Seq() + seq["erste"] = Integer(0) + self.assertSequenceEqual(seq.encode(), hexdec("3003020100")) + seq["erste"] = Integer(1) + self.assertSequenceEqual(seq.encode(), hexdec("3000")) + seq["erste"] = Integer(2) + self.assertSequenceEqual(seq.encode(), hexdec("3003020102")) + + +class TestPP(TestCase): + @given(data_strategy()) + def test_oid_printing(self, d): + oids = { + str(ObjectIdentifier(k)): v * 2 + for k, v in d.draw(dictionaries(oid_strategy(), text_letters())).items() + } + chosen = d.draw(sampled_from(sorted(oids))) + chosen_id = oids[chosen] + pp = _pp(asn1_type_name=ObjectIdentifier.asn1_type_name, value=chosen) + self.assertNotIn(chosen_id, pp_console_row(pp)) + self.assertIn(chosen_id, pp_console_row(pp, oids=oids)) -- 2.44.0