]> Cypherpunks.ru repositories - pyderasn.git/blobdiff - tests/test_cms.py
Raise copyright years
[pyderasn.git] / tests / test_cms.py
index ac9726fa6127a17952583a67741e9dc96d7ab018..fb455803925ec43e01e8674b2ecde49862e42bd4 100644 (file)
@@ -1,6 +1,6 @@
 # coding: utf-8
 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
-# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+# Copyright (C) 2017-2024 Sergey Matveev <stargrave@stargrave.org>
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as
@@ -22,6 +22,7 @@ from os import environ
 from os import remove
 from os import urandom
 from subprocess import call
+from sys import getsizeof
 from tempfile import NamedTemporaryFile
 from time import time
 from unittest import skipIf
@@ -30,8 +31,6 @@ from unittest import TestCase
 from hypothesis import given
 from hypothesis import settings
 from hypothesis.strategies import integers
-from six import PY2
-from six.moves import xrange as six_xrange
 
 from pyderasn import agg_octet_string
 from pyderasn import Any
@@ -82,7 +81,7 @@ class SignatureAlgorithmIdentifier(AlgorithmIdentifier):
 
 class SignedAttributes(SetOf):
     schema = Attribute()
-    bounds = (1, 32)
+    bounds = (1, float("+inf"))
     der_forced = True
 
 
@@ -276,11 +275,15 @@ class TestSignedDataCERWithOpenSSL(TestCase):
             ))))),
         ))
         cms_path = self.tmpfile()
+        _, state = ci.encode1st()
         with io_open(cms_path, "wb") as fd:
-            ci.encode_cer(writer=fd.write)
+            ci.encode2nd(fd.write, iter(state))
+        self.verify(cert_path, cms_path)
+        with io_open(cms_path, "wb") as fd:
+            ci.encode_cer(fd.write)
         self.verify(cert_path, cms_path)
         fd = open(cms_path, "rb")
-        raw = memoryview(fd.read()) if PY2 else file_mmaped(fd)
+        raw = file_mmaped(fd)
         ctx = {"bered": True}
         for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
             if decode_path == ("content",):
@@ -289,29 +292,30 @@ class TestSignedDataCERWithOpenSSL(TestCase):
         buf = BytesIO()
         agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write)
         self.assertSequenceEqual(buf.getvalue(), data)
+        fd.close()
 
-    @skipIf(PY2, "no mmaped memoryview support in PY2")
-    @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
-    def test_huge(self):
-        """Huge CMS test
-
-        Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs
-        data to sign. Pay attention that openssl cms is unable to do
-        stream verification and eats huge amounts (several times more,
-        that CMS itself) of memory.
-        """
-        key_path, cert_path, cert, skid = self.keypair()
+    def create_huge_file(self):
         rnd = urandom(1<<20)
         data_path = self.tmpfile()
         start = time()
         with open(data_path, "wb") as fd:
-            for _ in six_xrange(int(environ.get("PYDERASN_TEST_CMS_HUGE"))):
+            for _ in range(int(environ.get("PYDERASN_TEST_CMS_HUGE"))):
                 # dgst.update(rnd)
                 fd.write(rnd)
         print("data file written", time() - start)
-        data_fd = open(data_path, "rb")
-        data_raw = file_mmaped(data_fd)
+        return file_mmaped(open(data_path, "rb"))
+
+    @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
+    def test_huge_cer(self):
+        """Huge CMS test
 
+        Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs
+        data to sign. Pay attention that openssl cms is unable to do
+        stream verification and eats huge amounts (several times more,
+        than CMS itself) of memory.
+        """
+        data_raw = self.create_huge_file()
+        key_path, cert_path, cert, skid = self.keypair()
         from sys import getallocatedblocks
         mem_start = getallocatedblocks()
         start = time()
@@ -321,7 +325,7 @@ class TestSignedDataCERWithOpenSSL(TestCase):
         ))
         eci_path = self.tmpfile()
         with open(eci_path, "wb") as fd:
-            OctetString(eci["eContent"]).encode_cer(writer=fd.write)
+            OctetString(eci["eContent"]).encode_cer(fd.write)
         print("ECI file written", time() - start)
         eci_fd = open(eci_path, "rb")
         eci_raw = file_mmaped(eci_fd)
@@ -373,6 +377,64 @@ class TestSignedDataCERWithOpenSSL(TestCase):
         ))
         cms_path = self.tmpfile()
         with io_open(cms_path, "wb") as fd:
-            ci.encode_cer(writer=fd.write)
+            ci.encode_cer(fd.write)
+        print("CMS written", time() - start)
+        self.verify(cert_path, cms_path)
+        eci_fd.close()
+
+    @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
+    def test_huge_der_2pass(self):
+        """Same test as above, but 2pass DER encoder and just signature verification
+        """
+        data_raw = self.create_huge_file()
+        key_path, cert_path, cert, skid = self.keypair()
+        from sys import getallocatedblocks
+        mem_start = getallocatedblocks()
+        dgst = sha512(data_raw).digest()
+        start = time()
+        eci = EncapsulatedContentInfo((
+            ("eContentType", ContentType(id_data)),
+            ("eContent", OctetString(data_raw)),
+        ))
+        signed_attrs = SignedAttributes([
+            Attribute((
+                ("attrType", id_pkcs9_at_contentType),
+                ("attrValues", AttributeValues([AttributeValue(id_data)])),
+            )),
+            Attribute((
+                ("attrType", id_pkcs9_at_messageDigest),
+                ("attrValues", AttributeValues([AttributeValue(OctetString(dgst))])),
+            )),
+        ])
+        signature = self.sign(signed_attrs, key_path)
+        self.assertLess(getallocatedblocks(), mem_start * 2)
+        start = time()
+        ci = ContentInfo((
+            ("contentType", ContentType(id_signedData)),
+            ("content", Any((SignedData((
+                ("version", CMSVersion("v3")),
+                ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])),
+                ("encapContentInfo", eci),
+                ("certificates", CertificateSet([
+                    CertificateChoices(("certificate", cert)),
+                ])),
+                ("signerInfos", SignerInfos([SignerInfo((
+                    ("version", CMSVersion("v3")),
+                    ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))),
+                    ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)),
+                    ("signedAttrs", signed_attrs),
+                    ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+                        ("algorithm", id_ecdsa_with_SHA512),
+                    ))),
+                    ("signature", SignatureValue(signature)),
+                ))])),
+            ))))),
+        ))
+        _, state = ci.encode1st()
+        print("2pass state size", getsizeof(state))
+        cms_path = self.tmpfile()
+        with io_open(cms_path, "wb") as fd:
+            ci.encode2nd(fd.write, iter(state))
         print("CMS written", time() - start)
+        self.assertLess(getallocatedblocks(), mem_start * 2)
         self.verify(cert_path, cms_path)