+
+ @skipIf(PY2, "no mmaped memoryview support in PY2")
+ @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
+ def test_huge(self):
+ """Huge CMS test
+
+ Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs
+ data to sign. Pay attention that openssl cms is unable to do
+ stream verification and eats huge amounts (several times more,
+ that CMS itself) of memory.
+ """
+ key_path, cert_path, cert, skid = self.keypair()
+ rnd = urandom(1<<20)
+ data_path = self.tmpfile()
+ start = time()
+ with open(data_path, "wb") as fd:
+ for _ in six_xrange(int(environ.get("PYDERASN_TEST_CMS_HUGE"))):
+ # dgst.update(rnd)
+ fd.write(rnd)
+ print("data file written", time() - start)
+ data_fd = open(data_path, "rb")
+ data_raw = file_mmaped(data_fd)
+
+ from sys import getallocatedblocks
+ mem_start = getallocatedblocks()
+ start = time()
+ eci = EncapsulatedContentInfo((
+ ("eContentType", ContentType(id_data)),
+ ("eContent", OctetString(data_raw)),
+ ))
+ eci_path = self.tmpfile()
+ with open(eci_path, "wb") as fd:
+ OctetString(eci["eContent"]).encode_cer(writer=fd.write)
+ print("ECI file written", time() - start)
+ eci_fd = open(eci_path, "rb")
+ eci_raw = file_mmaped(eci_fd)
+
+ start = time()
+ dgst = sha512()
+ def hasher(data):
+ dgst.update(data)
+ return len(data)
+ evgens = OctetString().decode_evgen(eci_raw, ctx={"bered": True})
+ agg_octet_string(evgens, (), eci_raw, hasher)
+ dgst = dgst.digest()
+ print("digest calculated", time() - start)
+
+ signed_attrs = SignedAttributes([
+ Attribute((
+ ("attrType", id_pkcs9_at_contentType),
+ ("attrValues", AttributeValues([AttributeValue(id_data)])),
+ )),
+ Attribute((
+ ("attrType", id_pkcs9_at_messageDigest),
+ ("attrValues", AttributeValues([AttributeValue(OctetString(dgst))])),
+ )),
+ ])
+ signature = self.sign(signed_attrs, key_path)
+
+ self.assertLess(getallocatedblocks(), mem_start * 2)
+ start = time()
+ ci = ContentInfo((
+ ("contentType", ContentType(id_signedData)),
+ ("content", Any((SignedData((
+ ("version", CMSVersion("v3")),
+ ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])),
+ ("encapContentInfo", eci),
+ ("certificates", CertificateSet([
+ CertificateChoices(("certificate", cert)),
+ ])),
+ ("signerInfos", SignerInfos([SignerInfo((
+ ("version", CMSVersion("v3")),
+ ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))),
+ ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)),
+ ("signedAttrs", signed_attrs),
+ ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+ ("algorithm", id_ecdsa_with_SHA512),
+ ))),
+ ("signature", SignatureValue(signature)),
+ ))])),
+ ))))),
+ ))
+ cms_path = self.tmpfile()
+ with io_open(cms_path, "wb") as fd:
+ ci.encode_cer(writer=fd.write)
+ print("CMS written", time() - start)
+ self.verify(cert_path, cms_path)