# coding: utf-8
# PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
-# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+# Copyright (C) 2017-2023 Sergey Matveev <stargrave@stargrave.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
from os import remove
from os import urandom
from subprocess import call
+from sys import getsizeof
from tempfile import NamedTemporaryFile
from time import time
from unittest import skipIf
from hypothesis import given
from hypothesis import settings
from hypothesis.strategies import integers
-from six import PY2
-from six.moves import xrange as six_xrange
from pyderasn import agg_octet_string
from pyderasn import Any
class SignedAttributes(SetOf):
schema = Attribute()
- bounds = (1, 32)
+ bounds = (1, float("+inf"))
der_forced = True
))))),
))
cms_path = self.tmpfile()
+ _, state = ci.encode1st()
+ with io_open(cms_path, "wb") as fd:
+ ci.encode2nd(fd.write, iter(state))
+ self.verify(cert_path, cms_path)
with io_open(cms_path, "wb") as fd:
ci.encode_cer(fd.write)
self.verify(cert_path, cms_path)
fd = open(cms_path, "rb")
- raw = memoryview(fd.read()) if PY2 else file_mmaped(fd)
+ raw = file_mmaped(fd)
ctx = {"bered": True}
for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
if decode_path == ("content",):
buf = BytesIO()
agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write)
self.assertSequenceEqual(buf.getvalue(), data)
+ fd.close()
- @skipIf(PY2, "no mmaped memoryview support in PY2")
- @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
- def test_huge(self):
- """Huge CMS test
-
- Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs
- data to sign. Pay attention that openssl cms is unable to do
- stream verification and eats huge amounts (several times more,
- that CMS itself) of memory.
- """
- key_path, cert_path, cert, skid = self.keypair()
+ def create_huge_file(self):
rnd = urandom(1<<20)
data_path = self.tmpfile()
start = time()
with open(data_path, "wb") as fd:
- for _ in six_xrange(int(environ.get("PYDERASN_TEST_CMS_HUGE"))):
+ for _ in range(int(environ.get("PYDERASN_TEST_CMS_HUGE"))):
# dgst.update(rnd)
fd.write(rnd)
print("data file written", time() - start)
- data_fd = open(data_path, "rb")
- data_raw = file_mmaped(data_fd)
+ return file_mmaped(open(data_path, "rb"))
+
+ @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
+ def test_huge_cer(self):
+ """Huge CMS test
+ Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs
+ data to sign. Pay attention that openssl cms is unable to do
+ stream verification and eats huge amounts (several times more,
+ than CMS itself) of memory.
+ """
+ data_raw = self.create_huge_file()
+ key_path, cert_path, cert, skid = self.keypair()
from sys import getallocatedblocks
mem_start = getallocatedblocks()
start = time()
ci.encode_cer(fd.write)
print("CMS written", time() - start)
self.verify(cert_path, cms_path)
+ eci_fd.close()
+
+ @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set")
+ def test_huge_der_2pass(self):
+ """Same test as above, but 2pass DER encoder and just signature verification
+ """
+ data_raw = self.create_huge_file()
+ key_path, cert_path, cert, skid = self.keypair()
+ from sys import getallocatedblocks
+ mem_start = getallocatedblocks()
+ dgst = sha512(data_raw).digest()
+ start = time()
+ eci = EncapsulatedContentInfo((
+ ("eContentType", ContentType(id_data)),
+ ("eContent", OctetString(data_raw)),
+ ))
+ signed_attrs = SignedAttributes([
+ Attribute((
+ ("attrType", id_pkcs9_at_contentType),
+ ("attrValues", AttributeValues([AttributeValue(id_data)])),
+ )),
+ Attribute((
+ ("attrType", id_pkcs9_at_messageDigest),
+ ("attrValues", AttributeValues([AttributeValue(OctetString(dgst))])),
+ )),
+ ])
+ signature = self.sign(signed_attrs, key_path)
+ self.assertLess(getallocatedblocks(), mem_start * 2)
+ start = time()
+ ci = ContentInfo((
+ ("contentType", ContentType(id_signedData)),
+ ("content", Any((SignedData((
+ ("version", CMSVersion("v3")),
+ ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])),
+ ("encapContentInfo", eci),
+ ("certificates", CertificateSet([
+ CertificateChoices(("certificate", cert)),
+ ])),
+ ("signerInfos", SignerInfos([SignerInfo((
+ ("version", CMSVersion("v3")),
+ ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))),
+ ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)),
+ ("signedAttrs", signed_attrs),
+ ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+ ("algorithm", id_ecdsa_with_SHA512),
+ ))),
+ ("signature", SignatureValue(signature)),
+ ))])),
+ ))))),
+ ))
+ _, state = ci.encode1st()
+ print("2pass state size", getsizeof(state))
+ cms_path = self.tmpfile()
+ with io_open(cms_path, "wb") as fd:
+ ci.encode2nd(fd.write, iter(state))
+ print("CMS written", time() - start)
+ self.assertLess(getallocatedblocks(), mem_start * 2)
+ self.verify(cert_path, cms_path)