X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=blobdiff_plain;f=tests%2Ftest_cms.py;h=fb455803925ec43e01e8674b2ecde49862e42bd4;hp=ac9726fa6127a17952583a67741e9dc96d7ab018;hb=HEAD;hpb=42198ee69940c96930d81533ecf9cec87d34b27b diff --git a/tests/test_cms.py b/tests/test_cms.py index ac9726f..fb45580 100644 --- a/tests/test_cms.py +++ b/tests/test_cms.py @@ -1,6 +1,6 @@ # coding: utf-8 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures -# Copyright (C) 2017-2020 Sergey Matveev +# Copyright (C) 2017-2024 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as @@ -22,6 +22,7 @@ from os import environ from os import remove from os import urandom from subprocess import call +from sys import getsizeof from tempfile import NamedTemporaryFile from time import time from unittest import skipIf @@ -30,8 +31,6 @@ from unittest import TestCase from hypothesis import given from hypothesis import settings from hypothesis.strategies import integers -from six import PY2 -from six.moves import xrange as six_xrange from pyderasn import agg_octet_string from pyderasn import Any @@ -82,7 +81,7 @@ class SignatureAlgorithmIdentifier(AlgorithmIdentifier): class SignedAttributes(SetOf): schema = Attribute() - bounds = (1, 32) + bounds = (1, float("+inf")) der_forced = True @@ -276,11 +275,15 @@ class TestSignedDataCERWithOpenSSL(TestCase): ))))), )) cms_path = self.tmpfile() + _, state = ci.encode1st() with io_open(cms_path, "wb") as fd: - ci.encode_cer(writer=fd.write) + ci.encode2nd(fd.write, iter(state)) + self.verify(cert_path, cms_path) + with io_open(cms_path, "wb") as fd: + ci.encode_cer(fd.write) self.verify(cert_path, cms_path) fd = open(cms_path, "rb") - raw = memoryview(fd.read()) if PY2 else file_mmaped(fd) + raw = file_mmaped(fd) ctx = {"bered": True} for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx): if decode_path == ("content",): @@ -289,29 +292,30 @@ class TestSignedDataCERWithOpenSSL(TestCase): buf = BytesIO() agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write) self.assertSequenceEqual(buf.getvalue(), data) + fd.close() - @skipIf(PY2, "no mmaped memoryview support in PY2") - @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") - def test_huge(self): - """Huge CMS test - - Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs - data to sign. Pay attention that openssl cms is unable to do - stream verification and eats huge amounts (several times more, - that CMS itself) of memory. - """ - key_path, cert_path, cert, skid = self.keypair() + def create_huge_file(self): rnd = urandom(1<<20) data_path = self.tmpfile() start = time() with open(data_path, "wb") as fd: - for _ in six_xrange(int(environ.get("PYDERASN_TEST_CMS_HUGE"))): + for _ in range(int(environ.get("PYDERASN_TEST_CMS_HUGE"))): # dgst.update(rnd) fd.write(rnd) print("data file written", time() - start) - data_fd = open(data_path, "rb") - data_raw = file_mmaped(data_fd) + return file_mmaped(open(data_path, "rb")) + + @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") + def test_huge_cer(self): + """Huge CMS test + Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs + data to sign. Pay attention that openssl cms is unable to do + stream verification and eats huge amounts (several times more, + than CMS itself) of memory. + """ + data_raw = self.create_huge_file() + key_path, cert_path, cert, skid = self.keypair() from sys import getallocatedblocks mem_start = getallocatedblocks() start = time() @@ -321,7 +325,7 @@ class TestSignedDataCERWithOpenSSL(TestCase): )) eci_path = self.tmpfile() with open(eci_path, "wb") as fd: - OctetString(eci["eContent"]).encode_cer(writer=fd.write) + OctetString(eci["eContent"]).encode_cer(fd.write) print("ECI file written", time() - start) eci_fd = open(eci_path, "rb") eci_raw = file_mmaped(eci_fd) @@ -373,6 +377,64 @@ class TestSignedDataCERWithOpenSSL(TestCase): )) cms_path = self.tmpfile() with io_open(cms_path, "wb") as fd: - ci.encode_cer(writer=fd.write) + ci.encode_cer(fd.write) + print("CMS written", time() - start) + self.verify(cert_path, cms_path) + eci_fd.close() + + @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") + def test_huge_der_2pass(self): + """Same test as above, but 2pass DER encoder and just signature verification + """ + data_raw = self.create_huge_file() + key_path, cert_path, cert, skid = self.keypair() + from sys import getallocatedblocks + mem_start = getallocatedblocks() + dgst = sha512(data_raw).digest() + start = time() + eci = EncapsulatedContentInfo(( + ("eContentType", ContentType(id_data)), + ("eContent", OctetString(data_raw)), + )) + signed_attrs = SignedAttributes([ + Attribute(( + ("attrType", id_pkcs9_at_contentType), + ("attrValues", AttributeValues([AttributeValue(id_data)])), + )), + Attribute(( + ("attrType", id_pkcs9_at_messageDigest), + ("attrValues", AttributeValues([AttributeValue(OctetString(dgst))])), + )), + ]) + signature = self.sign(signed_attrs, key_path) + self.assertLess(getallocatedblocks(), mem_start * 2) + start = time() + ci = ContentInfo(( + ("contentType", ContentType(id_signedData)), + ("content", Any((SignedData(( + ("version", CMSVersion("v3")), + ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])), + ("encapContentInfo", eci), + ("certificates", CertificateSet([ + CertificateChoices(("certificate", cert)), + ])), + ("signerInfos", SignerInfos([SignerInfo(( + ("version", CMSVersion("v3")), + ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))), + ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)), + ("signedAttrs", signed_attrs), + ("signatureAlgorithm", SignatureAlgorithmIdentifier(( + ("algorithm", id_ecdsa_with_SHA512), + ))), + ("signature", SignatureValue(signature)), + ))])), + ))))), + )) + _, state = ci.encode1st() + print("2pass state size", getsizeof(state)) + cms_path = self.tmpfile() + with io_open(cms_path, "wb") as fd: + ci.encode2nd(fd.write, iter(state)) print("CMS written", time() - start) + self.assertLess(getallocatedblocks(), mem_start * 2) self.verify(cert_path, cms_path)