X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=tests%2Ftest_cms.py;fp=tests%2Ftest_cms.py;h=51bdaf1cbf26adb3d2843d9481d8c36766032eb3;hp=faa52ab4aac0a0d11695c04e3c9d3bf4e0e883ee;hb=659eb0e090ad8c9209c4e5b030806125509844f9;hpb=d16d490552c084fb018ba8eb077c94d8b2d9326d diff --git a/tests/test_cms.py b/tests/test_cms.py index faa52ab..51bdaf1 100644 --- a/tests/test_cms.py +++ b/tests/test_cms.py @@ -22,6 +22,7 @@ from os import environ from os import remove from os import urandom from subprocess import call +from sys import getsizeof from tempfile import NamedTemporaryFile from time import time from unittest import skipIf @@ -276,6 +277,10 @@ class TestSignedDataCERWithOpenSSL(TestCase): ))))), )) cms_path = self.tmpfile() + _, state = ci.encode1st() + with io_open(cms_path, "wb") as fd: + ci.encode2nd(fd.write, iter(state)) + self.verify(cert_path, cms_path) with io_open(cms_path, "wb") as fd: ci.encode_cer(fd.write) self.verify(cert_path, cms_path) @@ -290,17 +295,7 @@ class TestSignedDataCERWithOpenSSL(TestCase): agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write) self.assertSequenceEqual(buf.getvalue(), data) - @skipIf(PY2, "no mmaped memoryview support in PY2") - @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") - def test_huge(self): - """Huge CMS test - - Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs - data to sign. Pay attention that openssl cms is unable to do - stream verification and eats huge amounts (several times more, - that CMS itself) of memory. - """ - key_path, cert_path, cert, skid = self.keypair() + def create_huge_file(self): rnd = urandom(1<<20) data_path = self.tmpfile() start = time() @@ -309,10 +304,21 @@ class TestSignedDataCERWithOpenSSL(TestCase): # dgst.update(rnd) fd.write(rnd) print("data file written", time() - start) - data_fd = open(data_path, "rb") - data_raw = file_mmaped(data_fd) + return file_mmaped(open(data_path, "rb")) - from sys import getallocatedblocks + @skipIf(PY2, "no mmaped memoryview support in PY2") + @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") + def test_huge_cer(self): + """Huge CMS test + + Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs + data to sign. Pay attention that openssl cms is unable to do + stream verification and eats huge amounts (several times more, + than CMS itself) of memory. + """ + data_raw = self.create_huge_file() + key_path, cert_path, cert, skid = self.keypair() + from sys import getallocatedblocks # PY2 does not have it mem_start = getallocatedblocks() start = time() eci = EncapsulatedContentInfo(( @@ -376,3 +382,61 @@ class TestSignedDataCERWithOpenSSL(TestCase): ci.encode_cer(fd.write) print("CMS written", time() - start) self.verify(cert_path, cms_path) + + @skipIf(PY2, "no mmaped memoryview support in PY2") + @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") + def test_huge_der_2pass(self): + """Same test as above, but 2pass DER encoder and just signature verification + """ + data_raw = self.create_huge_file() + key_path, cert_path, cert, skid = self.keypair() + from sys import getallocatedblocks + mem_start = getallocatedblocks() + dgst = sha512(data_raw).digest() + start = time() + eci = EncapsulatedContentInfo(( + ("eContentType", ContentType(id_data)), + ("eContent", OctetString(data_raw)), + )) + signed_attrs = SignedAttributes([ + Attribute(( + ("attrType", id_pkcs9_at_contentType), + ("attrValues", AttributeValues([AttributeValue(id_data)])), + )), + Attribute(( + ("attrType", id_pkcs9_at_messageDigest), + ("attrValues", AttributeValues([AttributeValue(OctetString(dgst))])), + )), + ]) + signature = self.sign(signed_attrs, key_path) + self.assertLess(getallocatedblocks(), mem_start * 2) + start = time() + ci = ContentInfo(( + ("contentType", ContentType(id_signedData)), + ("content", Any((SignedData(( + ("version", CMSVersion("v3")), + ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])), + ("encapContentInfo", eci), + ("certificates", CertificateSet([ + CertificateChoices(("certificate", cert)), + ])), + ("signerInfos", SignerInfos([SignerInfo(( + ("version", CMSVersion("v3")), + ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))), + ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)), + ("signedAttrs", signed_attrs), + ("signatureAlgorithm", SignatureAlgorithmIdentifier(( + ("algorithm", id_ecdsa_with_SHA512), + ))), + ("signature", SignatureValue(signature)), + ))])), + ))))), + )) + _, state = ci.encode1st() + print("2pass state size", getsizeof(state)) + cms_path = self.tmpfile() + with io_open(cms_path, "wb") as fd: + ci.encode2nd(fd.write, iter(state)) + print("CMS written", time() - start) + self.assertLess(getallocatedblocks(), mem_start * 2) + self.verify(cert_path, cms_path)