From: Sergey Matveev Date: Wed, 12 Feb 2020 07:42:30 +0000 (+0300) Subject: Initial support for CER encoding X-Git-Tag: 7.0~10 X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=commitdiff_plain;h=011b627453f5bfe82bdd160edbb249a73f21082a Initial support for CER encoding --- diff --git a/MANIFEST.in b/MANIFEST.in index 7041441..1163e79 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -28,6 +28,7 @@ include tests/compli_test_suite/LICENSE include tests/compli_test_suite/pdf/free_asn1_testsuite.pdf include tests/compli_test_suite/README.md include tests/compli_test_suite/suite/tc*.ber +include tests/test_cms.py include tests/test_compli.py include tests/test_crl.py include tests/test_crts.py diff --git a/doc/news.rst b/doc/news.rst index e9749c2..2419f8e 100644 --- a/doc/news.rst +++ b/doc/news.rst @@ -9,11 +9,15 @@ News same tag to be successfully decoded * Fixed possibly invalid SET DER encoding where objects were not sorted by tag, but by encoded representation -* Any does not allow empty data value now. Now it checks if it has valid - ASN.1 tag +* ``Any`` does not allow empty data value now. Now it checks if it has + valid ASN.1 tag +* ``Any`` allows an ordinary ``Obj`` storing, without its forceful + encoded representation storage * Initial support for so called ``evgen_mode``: event generation mode, where no in-memory objects storing happens, giving ability to process ASN.1 data without fully parsing it first +* Initial experimental CER encoding mode, allowing streaming encoding of + the data directly to some writeable object .. _release6.3: diff --git a/pyderasn.py b/pyderasn.py index 24204ce..3533c6b 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -643,6 +643,7 @@ Various .. autofunction:: pyderasn.abs_decode_path .. autofunction:: pyderasn.colonize_hex +.. autofunction:: pyderasn.encode_cer .. autofunction:: pyderasn.hexenc .. autofunction:: pyderasn.hexdec .. autofunction:: pyderasn.tag_encode @@ -779,6 +780,7 @@ from collections import OrderedDict from copy import copy from datetime import datetime from datetime import timedelta +from io import BytesIO from math import ceil from operator import attrgetter from string import ascii_letters @@ -819,6 +821,7 @@ __all__ = ( "Choice", "DecodeError", "DecodePathDefBy", + "encode_cer", "Enumerated", "ExceedingData", "GeneralizedTime", @@ -1190,6 +1193,23 @@ def len_decode(data): return l, 1 + octets_num, data[1 + octets_num:] +LEN1K = len_encode(1000) + + +def write_full(writer, data): + """Fully write provided data + + BytesIO does not guarantee that the whole data will be written at once. + """ + data = memoryview(data) + written = 0 + while written != len(data): + n = writer(data[written:]) + if n is None: + raise ValueError("can not write to buf") + written += n + + ######################################################################## # Base class ######################################################################## @@ -1314,6 +1334,10 @@ class Obj(object): """ return self._tag_order + @property + def tag_order_cer(self): + return self.tag_order + @property def tlen(self): """See :ref:`decoding` @@ -1357,6 +1381,19 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) + def encode_cer(self, writer): + if self._expl is not None: + write_full(writer, self._expl + LENINDEF) + if getattr(self, "der_forced", False): + write_full(writer, self._encode()) + else: + self._encode_cer(writer) + if self._expl is not None: + write_full(writer, EOC) + + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def hexencode(self): """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode` """ @@ -1648,6 +1685,14 @@ class Obj(object): ) +def encode_cer(obj): + """Encode to CER in memory + """ + buf = BytesIO() + obj.encode_cer(buf.write) + return buf.getvalue() + + class DecodePathDefBy(object): """DEFINED BY representation inside decode path """ @@ -2784,6 +2829,30 @@ class BitString(Obj): octets, )) + def _encode_cer(self, writer): + bit_len, octets = self._value + if len(octets) + 1 <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 999) * 999, 999): + write_full(writer, b"".join(( + BitString.tag_default, + LEN1K, + int2byte(0), + octets[offset:offset + 999], + ))) + tail = octets[offset+999:] + if len(tail) > 0: + tail = int2byte((8 - bit_len % 8) % 8) + tail + write_full(writer, b"".join(( + BitString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) @@ -3214,6 +3283,28 @@ class OctetString(Obj): self._value, )) + def _encode_cer(self, writer): + octets = self._value + if len(octets) <= 1000: + write_full(writer, self._encode()) + return + write_full(writer, self.tag_constructed) + write_full(writer, LENINDEF) + for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000): + write_full(writer, b"".join(( + OctetString.tag_default, + LEN1K, + octets[offset:offset + 1000], + ))) + tail = octets[offset+1000:] + if len(tail) > 0: + write_full(writer, b"".join(( + OctetString.tag_default, + len_encode(len(tail)), + tail, + ))) + write_full(writer, EOC) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) @@ -4534,6 +4625,9 @@ class UTCTime(VisibleString): value = self._encode_time() return b"".join((self.tag, len_encode(len(value)), value)) + def _encode_cer(self, writer): + write_full(writer, self._encode()) + def todatetime(self): return self._value @@ -4905,6 +4999,10 @@ class Choice(Obj): self._assert_ready() return self._value[1].tag_order if self._tag_order is None else self._tag_order + @property + def tag_order_cer(self): + return min(v.tag_order_cer for v in itervalues(self.specs)) + def __getitem__(self, key): if key not in self.specs: raise ObjUnknown(key) @@ -4935,6 +5033,10 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() + def _encode_cer(self, writer): + self._assert_ready() + self._value[1].encode_cer(writer) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): for choice, spec in iteritems(self.specs): sub_decode_path = decode_path + (choice,) @@ -5066,7 +5168,7 @@ class Any(Obj): """``ANY`` special type >>> Any(Integer(-123)) - ANY 020185 + ANY INTEGER -123 (0X:7B) >>> a = Any(OctetString(b"hello world").encode()) ANY 040b68656c6c6f20776f726c64 >>> hexenc(bytes(a)) @@ -5114,9 +5216,9 @@ class Any(Obj): return value if isinstance(value, self.__class__): return value._value - if isinstance(value, Obj): - return value.encode() - raise InvalidValueType((self.__class__, Obj, binary_type)) + if not isinstance(value, Obj): + raise InvalidValueType((self.__class__, Obj, binary_type)) + return value @property def ready(self): @@ -5160,9 +5262,13 @@ class Any(Obj): def __eq__(self, their): if their.__class__ == binary_type: - return self._value == their + if self._value.__class__ == binary_type: + return self._value == their + return self._value.encode() == their if issubclass(their.__class__, Any): - return self._value == their._value + if self.ready and their.ready: + return bytes(self) == bytes(their) + return self.ready == their.ready return False def __call__( @@ -5179,7 +5285,10 @@ class Any(Obj): def __bytes__(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return self._value.encode() @property def tlen(self): @@ -5187,7 +5296,18 @@ class Any(Obj): def _encode(self): self._assert_ready() - return self._value + value = self._value + if value.__class__ == binary_type: + return value + return value.encode() + + def _encode_cer(self, writer): + self._assert_ready() + value = self._value + if value.__class__ == binary_type: + write_full(writer, value) + else: + value.encode_cer(writer) def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: @@ -5264,12 +5384,20 @@ class Any(Obj): return pp_console_row(next(self.pps())) def pps(self, decode_path=()): + value = self._value + if value is None: + pass + elif value.__class__ == binary_type: + value = None + else: + value = repr(value) yield _pp( obj=self, asn1_type_name=self.asn1_type_name, obj_name=self.__class__.__name__, decode_path=decode_path, - blob=self._value if self.ready else None, + value=value, + blob=self._value if self._value.__class__ == binary_type else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), @@ -5591,6 +5719,12 @@ class Sequence(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) @@ -5853,6 +5987,15 @@ class Set(Sequence): )) return b"".join((self.tag, len_encode(len(v)), v)) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted( + self._values_for_encoding(), + key=attrgetter("tag_order_cer"), + ): + v.encode_cer(writer) + write_full(writer, EOC) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): try: t, tlen, lv = tag_strip(tlv) @@ -6216,6 +6359,12 @@ class SequenceOf(Obj): v = b"".join(v.encode() for v in self._values_for_encoding()) return b"".join((self.tag, len_encode(len(v)), v)) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in self._values_for_encoding(): + v.encode_cer(writer) + write_full(writer, EOC) + def _decode( self, tlv, @@ -6411,6 +6560,12 @@ class SetOf(SequenceOf): v = b"".join(sorted(v.encode() for v in self._values_for_encoding())) return b"".join((self.tag, len_encode(len(v)), v)) + def _encode_cer(self, writer): + write_full(writer, self.tag + LENINDEF) + for v in sorted(encode_cer(v) for v in self._values_for_encoding()): + write_full(writer, v) + write_full(writer, EOC) + def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): return super(SetOf, self)._decode( tlv, diff --git a/tests/test_cms.py b/tests/test_cms.py new file mode 100644 index 0000000..86df541 --- /dev/null +++ b/tests/test_cms.py @@ -0,0 +1,235 @@ +# coding: utf-8 +# PyDERASN -- Python ASN.1 DER codec with abstract structures +# Copyright (C) 2017-2020 Sergey Matveev +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program. If not, see +# . + +from datetime import datetime +from hashlib import sha256 +from os import urandom +from random import randint +from subprocess import call +from unittest import TestCase + +from pyderasn import Any +from pyderasn import Choice +from pyderasn import encode_cer +from pyderasn import Integer +from pyderasn import ObjectIdentifier +from pyderasn import OctetString +from pyderasn import Sequence +from pyderasn import SetOf +from pyderasn import tag_ctxc +from pyderasn import tag_ctxp +from pyderasn import UTCTime +from tests.test_crts import AlgorithmIdentifier +from tests.test_crts import Certificate +from tests.test_crts import SubjectKeyIdentifier +from tests.test_crts import Time + + +class CMSVersion(Integer): + schema = ( + ("v0", 0), + ("v1", 1), + ("v2", 2), + ("v3", 3), + ("v4", 4), + ("v5", 5), + ) + + +class AttributeValue(Any): + pass + + +class AttributeValues(SetOf): + schema = AttributeValue() + + +class Attribute(Sequence): + schema = ( + ("attrType", ObjectIdentifier()), + ("attrValues", AttributeValues()), + ) + + +class SignatureAlgorithmIdentifier(AlgorithmIdentifier): + pass + + +class SignedAttributes(SetOf): + schema = Attribute() + bounds = (1, 32) + der_forced = True + + +class SignerIdentifier(Choice): + schema = ( + # ("issuerAndSerialNumber", IssuerAndSerialNumber()), + ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))), + ) + + +class DigestAlgorithmIdentifiers(SetOf): + schema = AlgorithmIdentifier() + + +class DigestAlgorithmIdentifier(AlgorithmIdentifier): + pass + + +class SignatureValue(OctetString): + pass + + +class SignerInfo(Sequence): + schema = ( + ("version", CMSVersion()), + ("sid", SignerIdentifier()), + ("digestAlgorithm", DigestAlgorithmIdentifier()), + ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)), + ("signatureAlgorithm", SignatureAlgorithmIdentifier()), + ("signature", SignatureValue()), + # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)), + ) + + +class SignerInfos(SetOf): + schema = SignerInfo() + + +class ContentType(ObjectIdentifier): + pass + + +class EncapsulatedContentInfo(Sequence): + schema = ( + ("eContentType", ContentType()), + ("eContent", OctetString(expl=tag_ctxc(0), optional=True)), + ) + + +class CertificateChoices(Choice): + schema = ( + ('certificate', Certificate()), + # ... + ) + + +class CertificateSet(SetOf): + schema = CertificateChoices() + + +class SignedData(Sequence): + schema = ( + ("version", CMSVersion()), + ("digestAlgorithms", DigestAlgorithmIdentifiers()), + ("encapContentInfo", EncapsulatedContentInfo()), + ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)), + # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)), + ("signerInfos", SignerInfos()), + ) + + +class ContentInfo(Sequence): + schema = ( + ("contentType", ContentType()), + ("content", Any(expl=tag_ctxc(0))), + ) + + +id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2") +id_sha256 = ObjectIdentifier("2.16.840.1.101.3.4.2.1") +id_data = ObjectIdentifier("1.2.840.113549.1.7.1") +id_ecdsa_with_SHA256 = ObjectIdentifier("1.2.840.10045.4.3.2") +id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3") +id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4") +id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14") + + +class TestSignedDataCER(TestCase): + def runTest(self): + # openssl ecparam -name prime256v1 -genkey -out key.pem + # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem + # -days 365 -nodes -subj "/CN=doesnotmatter" + with open("cert.cer", "rb") as fd: + cert = Certificate().decod(fd.read()) + for ext in cert["tbsCertificate"]["extensions"]: + if ext["extnID"] == id_ce_subjectKeyIdentifier: + skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"])) + ai_sha256 = AlgorithmIdentifier(( + ("algorithm", id_sha256), + )) + data = urandom(randint(1000, 3000)) + eci = EncapsulatedContentInfo(( + ("eContentType", ContentType(id_data)), + ("eContent", OctetString(data)), + )) + signed_attrs = SignedAttributes([ + Attribute(( + ("attrType", id_pkcs9_at_contentType), + ("attrValues", AttributeValues([ + AttributeValue(id_data.encode()) + ])), + )), + Attribute(( + ("attrType", id_pkcs9_at_messageDigest), + ("attrValues", AttributeValues([ + AttributeValue(OctetString( + sha256(bytes(eci["eContent"])).digest() + ).encode()), + ])), + )), + ]) + with open("/tmp/in", "wb") as fd: + fd.write(encode_cer(signed_attrs)) + self.assertEqual(0, call(" ".join(( + "openssl dgst -sha256", + "-sign key.pem", + "-binary", + "/tmp/in", + "> /tmp/signature", + )), shell=True)) + ci = ContentInfo(( + ("contentType", ContentType(id_signedData)), + ("content", Any((SignedData(( + ("version", CMSVersion("v3")), + ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha256])), + ("encapContentInfo", eci), + ("certificates", CertificateSet([ + CertificateChoices(("certificate", cert)), + ])), + ("signerInfos", SignerInfos([SignerInfo(( + ("version", CMSVersion("v3")), + ("sid", SignerIdentifier( + ("subjectKeyIdentifier", skid) + )), + ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)), + ("signedAttrs", signed_attrs), + ("signatureAlgorithm", SignatureAlgorithmIdentifier(( + ("algorithm", id_ecdsa_with_SHA256), + ))), + ("signature", SignatureValue(open("/tmp/signature", "rb").read())), + ))])), + ))))), + )) + with open("/tmp/out.p7m", "wb") as fd: + fd.write(encode_cer(ci)) + self.assertEqual(0, call(" ".join(( + "openssl cms -verify", + "-inform DER -in /tmp/out.p7m", + "-signer cert.pem -CAfile cert.pem", + "-out /dev/null", + )), shell=True)) diff --git a/tests/test_crl.py b/tests/test_crl.py index 7368d7f..3173552 100644 --- a/tests/test_crl.py +++ b/tests/test_crl.py @@ -14,10 +14,16 @@ # You should have received a copy of the GNU Lesser General Public # License along with this program. If not, see # . -"""CRL related schemes, just to test the performance with them +"""CRL related schemas, just to test the performance with them """ +from os.path import exists +from time import time +from unittest import skipIf +from unittest import TestCase + from pyderasn import BitString +from pyderasn import encode_cer from pyderasn import Sequence from pyderasn import SequenceOf from pyderasn import tag_ctxc @@ -60,3 +66,24 @@ class CertificateList(Sequence): ("signatureAlgorithm", AlgorithmIdentifier()), ("signatureValue", BitString()), ) + +@skipIf(not exists("revoke.crl"), "CACert's revoke.crl not found") +class TestCACert(TestCase): + def test_cer(self): + with open("revoke.crl", "rb") as fd: + raw = fd.read() + print("DER read") + start = time() + crl1 = CertificateList().decod(raw) + print("DER decoded", time() - start) + start = time() + cer_raw = encode_cer(crl1) + print("CER encoded", time() - start) + start = time() + crl2 = CertificateList().decod(cer_raw, ctx={"bered": True}) + print("CER decoded", time() - start) + self.assertEqual(crl2, crl1) + start = time() + der_raw = crl2.encode() + print("DER encoded", time() - start) + self.assertSequenceEqual(der_raw, raw) diff --git a/tests/test_crts.py b/tests/test_crts.py index aaf359e..3f753e0 100644 --- a/tests/test_crts.py +++ b/tests/test_crts.py @@ -27,6 +27,7 @@ from pyderasn import Any from pyderasn import BitString from pyderasn import Boolean from pyderasn import Choice +from pyderasn import encode_cer from pyderasn import GeneralizedTime from pyderasn import hexdec from pyderasn import IA5String @@ -154,6 +155,15 @@ class UniqueIdentifier(BitString): pass +class KeyIdentifier(OctetString): + pass + + +class SubjectKeyIdentifier(KeyIdentifier): + pass + + + class Extension(Sequence): schema = ( ("extnID", ObjectIdentifier()), @@ -188,6 +198,7 @@ class Certificate(Sequence): ("signatureAlgorithm", AlgorithmIdentifier()), ("signatureValue", BitString()), ) + der_forced = True class TestGoSelfSignedVector(TestCase): @@ -347,6 +358,10 @@ class TestGoSelfSignedVector(TestCase): "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435", )))) self.assertSequenceEqual(crt.encode(), raw) + self.assertEqual( + Certificate().decod(encode_cer(crt), ctx={"bered": True}), + crt, + ) class TestGoPayPalVector(TestCase): @@ -408,3 +423,7 @@ class TestGoPayPalVector(TestCase): pprint(crt) repr(crt) pickle_loads(pickle_dumps(crt, pickle_proto)) + self.assertEqual( + Certificate().decod(encode_cer(crt), ctx={"bered": True}), + crt, + ) diff --git a/tests/test_pyderasn.py b/tests/test_pyderasn.py index f160ba8..cd165b8 100644 --- a/tests/test_pyderasn.py +++ b/tests/test_pyderasn.py @@ -20,7 +20,9 @@ from copy import deepcopy from datetime import datetime from datetime import timedelta from importlib import import_module +from operator import attrgetter from os import environ +from os import urandom from random import random from string import ascii_letters from string import digits @@ -72,6 +74,7 @@ from pyderasn import BoundsError from pyderasn import Choice from pyderasn import DecodeError from pyderasn import DecodePathDefBy +from pyderasn import encode_cer from pyderasn import Enumerated from pyderasn import EOC from pyderasn import EOC_LEN @@ -599,11 +602,18 @@ class TestBoolean(CommonMixin, TestCase): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + self.assertSequenceEqual(encode_cer(obj), obj_encoded) obj_expled = obj(value, expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) + obj_expled_cer = encode_cer(obj_expled) + self.assertNotEqual(obj_expled_cer, obj_encoded) + self.assertSequenceEqual( + obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), + obj_expled.encode(), + ) obj_expled_hex_encoded = obj_expled.hexencode() ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.hexdecode( @@ -1113,12 +1123,19 @@ class TestInteger(CommonMixin, TestCase): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + self.assertSequenceEqual(encode_cer(obj), obj_encoded) obj_expled = obj(value, expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) obj_expled_encoded = obj_expled.encode() + obj_expled_cer = encode_cer(obj_expled) + self.assertNotEqual(obj_expled_cer, obj_encoded) + self.assertSequenceEqual( + obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), + obj_expled_encoded, + ) ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -1522,12 +1539,19 @@ class TestBitString(CommonMixin, TestCase): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + self.assertSequenceEqual(encode_cer(obj), obj_encoded) obj_expled = obj(value, expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) obj_expled_encoded = obj_expled.encode() + obj_expled_cer = encode_cer(obj_expled) + self.assertNotEqual(obj_expled_cer, obj_encoded) + self.assertSequenceEqual( + obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), + obj_expled_encoded, + ) ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -1861,6 +1885,24 @@ class TestBitString(CommonMixin, TestCase): self.assertTrue(obj.lenindef) self.assertTrue(obj.bered) + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(integers(min_value=1000, max_value=3000)) + def test_cer(self, data_len): + data = urandom(data_len) + encoded = encode_cer(BitString(data)) + ctx = {"bered": True} + self.assertSequenceEqual(bytes(BitString().decod(encoded, ctx=ctx)), data) + evgens = list(BitString().decode_evgen(encoded, ctx=ctx)) + evgens_expected = data_len // 999 + if evgens_expected * 999 != data_len: + evgens_expected += 1 + evgens_expected += 1 + self.assertEqual(len(evgens), evgens_expected) + for (_, obj, _) in evgens[:-2]: + self.assertEqual(obj.vlen, 1000) + _, obj, _ = evgens[-2] + self.assertEqual(obj.vlen, 1 + data_len - len(evgens[:-2]) * 999) + @composite def octet_string_values_strategy(draw, do_expl=False): @@ -2153,12 +2195,19 @@ class TestOctetString(CommonMixin, TestCase): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + self.assertSequenceEqual(encode_cer(obj), obj_encoded) obj_expled = obj(value, expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) obj_expled_encoded = obj_expled.encode() + obj_expled_cer = encode_cer(obj_expled) + self.assertNotEqual(obj_expled_cer, obj_encoded) + self.assertSequenceEqual( + obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), + obj_expled_encoded, + ) ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -2349,6 +2398,24 @@ class TestOctetString(CommonMixin, TestCase): self.assertEqual(err.exception.decode_path, decode_path + (str(chunks),)) self.assertEqual(err.exception.offset, offset + 1 + 1 + chunks * len(bs)) + @settings(max_examples=LONG_TEST_MAX_EXAMPLES) + @given(integers(min_value=1001, max_value=3000)) + def test_cer(self, data_len): + data = urandom(data_len) + encoded = encode_cer(OctetString(data)) + ctx = {"bered": True} + self.assertSequenceEqual(bytes(OctetString().decod(encoded, ctx=ctx)), data) + evgens = list(OctetString().decode_evgen(encoded, ctx=ctx)) + evgens_expected = data_len // 1000 + if evgens_expected * 1000 != data_len: + evgens_expected += 1 + evgens_expected += 1 + self.assertEqual(len(evgens), evgens_expected) + for (_, obj, _) in evgens[:-2]: + self.assertEqual(obj.vlen, 1000) + _, obj, _ = evgens[-2] + self.assertEqual(obj.vlen, data_len - len(evgens[:-2]) * 1000) + @composite def null_values_strategy(draw, do_expl=False): @@ -2498,12 +2565,19 @@ class TestNull(CommonMixin, TestCase): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + self.assertSequenceEqual(encode_cer(obj), obj_encoded) obj_expled = obj(expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) obj_expled_encoded = obj_expled.encode() + obj_expled_cer = encode_cer(obj_expled) + self.assertNotEqual(obj_expled_cer, obj_encoded) + self.assertSequenceEqual( + obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), + obj_expled_encoded, + ) ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -2857,12 +2931,19 @@ class TestObjectIdentifier(CommonMixin, TestCase): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + self.assertSequenceEqual(encode_cer(obj), obj_encoded) obj_expled = obj(value, expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) obj_expled_encoded = obj_expled.encode() + obj_expled_cer = encode_cer(obj_expled) + self.assertNotEqual(obj_expled_cer, obj_encoded) + self.assertSequenceEqual( + obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), + obj_expled_encoded, + ) ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -5956,6 +6037,12 @@ class SeqMixing(object): pprint(seq, big_blobs=True, with_decode_path=True) self.assertTrue(seq.ready) seq_encoded = seq.encode() + seq_encoded_cer = encode_cer(seq) + self.assertNotEqual(seq_encoded_cer, seq_encoded) + self.assertSequenceEqual( + seq.decod(seq_encoded_cer, ctx={"bered": True}).encode(), + seq_encoded, + ) seq_decoded, tail = seq.decode(seq_encoded + tail_junk) self.assertFalse(seq_decoded.lenindef) self.assertFalse(seq_decoded.ber_encoded) @@ -6671,6 +6758,12 @@ class SeqOfMixing(object): pprint(obj, big_blobs=True, with_decode_path=True) self.assertFalse(obj.expled) obj_encoded = obj.encode() + obj_encoded_cer = encode_cer(obj) + self.assertNotEqual(obj_encoded_cer, obj_encoded) + self.assertSequenceEqual( + obj.decod(obj_encoded_cer, ctx={"bered": True}).encode(), + obj_encoded, + ) obj_expled = obj(value, expl=tag_expl) self.assertTrue(obj_expled.expled) repr(obj_expled) @@ -7478,3 +7571,49 @@ class TestPickleDifferentVersion(TestCase): pickle_loads(pickled) pyderasn.__version__ = version_orig pickle_loads(pickled) + + +class TestCERSetOrdering(TestCase): + def test_vectors(self): + """Taken from X.690-201508 + """ + class B(Choice): + schema = ( + ("c", Integer(impl=tag_ctxp(2))), + ("d", Integer(impl=tag_ctxp(4))), + ) + + class F(Choice): + schema = ( + ("g", Integer(impl=tag_ctxp(5))), + ("h", Integer(impl=tag_ctxp(6))), + ) + + class I(Choice): + schema = ( + ("j", Integer(impl=tag_ctxp(0))), + ) + + class E(Choice): + schema = ( + ("f", F()), + ("i", I()), + ) + + class A(Set): + schema = ( + ("a", Integer(impl=tag_ctxp(3))), + ("b", B(expl=tag_ctxc(1))), + ("e", E()), + ) + + a = A(( + ("a", Integer(123)), + ("b", B(("d", Integer(234)))), + ("e", E(("f", F(("g", Integer(345)))))), + )) + order = sorted(a._values_for_encoding(), key=attrgetter("tag_order_cer")) + self.assertSequenceEqual( + [i.__class__.__name__ for i in order], + ("E", "B", "Integer"), + )