# coding: utf-8 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures # Copyright (C) 2017-2020 Sergey Matveev # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation, version 3 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program. If not, see # . from hashlib import sha512 from io import BytesIO from io import open as io_open from os import environ from os import remove from os import urandom from subprocess import call from tempfile import NamedTemporaryFile from time import time from unittest import skipIf from unittest import TestCase from hypothesis import given from hypothesis import settings from hypothesis.strategies import integers from six import PY2 from six.moves import xrange as six_xrange from pyderasn import agg_octet_string from pyderasn import Any from pyderasn import Choice from pyderasn import encode_cer from pyderasn import file_mmaped from pyderasn import Integer from pyderasn import ObjectIdentifier from pyderasn import OctetString from pyderasn import Sequence from pyderasn import SetOf from pyderasn import tag_ctxc from pyderasn import tag_ctxp from tests.test_crts import AlgorithmIdentifier from tests.test_crts import Certificate from tests.test_crts import SubjectKeyIdentifier class CMSVersion(Integer): schema = ( ("v0", 0), ("v1", 1), ("v2", 2), ("v3", 3), ("v4", 4), ("v5", 5), ) class AttributeValue(Any): pass class AttributeValues(SetOf): schema = AttributeValue() class Attribute(Sequence): schema = ( ("attrType", ObjectIdentifier()), ("attrValues", AttributeValues()), ) class SignatureAlgorithmIdentifier(AlgorithmIdentifier): pass class SignedAttributes(SetOf): schema = Attribute() bounds = (1, 32) der_forced = True class SignerIdentifier(Choice): schema = ( # ("issuerAndSerialNumber", IssuerAndSerialNumber()), ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))), ) class DigestAlgorithmIdentifiers(SetOf): schema = AlgorithmIdentifier() class DigestAlgorithmIdentifier(AlgorithmIdentifier): pass class SignatureValue(OctetString): pass class SignerInfo(Sequence): schema = ( ("version", CMSVersion()), ("sid", SignerIdentifier()), ("digestAlgorithm", DigestAlgorithmIdentifier()), ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)), ("signatureAlgorithm", SignatureAlgorithmIdentifier()), ("signature", SignatureValue()), # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)), ) class SignerInfos(SetOf): schema = SignerInfo() class ContentType(ObjectIdentifier): pass class EncapsulatedContentInfo(Sequence): schema = ( ("eContentType", ContentType()), ("eContent", OctetString(expl=tag_ctxc(0), optional=True)), ) class CertificateChoices(Choice): schema = ( ('certificate', Certificate()), # ... ) class CertificateSet(SetOf): schema = CertificateChoices() class SignedData(Sequence): schema = ( ("version", CMSVersion()), ("digestAlgorithms", DigestAlgorithmIdentifiers()), ("encapContentInfo", EncapsulatedContentInfo()), ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)), # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)), ("signerInfos", SignerInfos()), ) class ContentInfo(Sequence): schema = ( ("contentType", ContentType()), ("content", Any(expl=tag_ctxc(0))), ) id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2") id_sha512 = ObjectIdentifier("2.16.840.") id_data = ObjectIdentifier("1.2.840.113549.1.7.1") id_ecdsa_with_SHA512 = ObjectIdentifier("1.2.840.10045.4.3.4") id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3") id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4") id_ce_subjectKeyIdentifier = ObjectIdentifier("") ai_sha512 = AlgorithmIdentifier((("algorithm", id_sha512),)) openssl_cms_exists = call("openssl cms -help 2>/dev/null", shell=True) == 0 @skipIf(not openssl_cms_exists, "openssl cms command not found") class TestSignedDataCERWithOpenSSL(TestCase): def tmpfile(self): tmp = NamedTemporaryFile(delete=False) tmp.close() self.addCleanup(lambda: remove(tmp.name)) return tmp.name def keypair(self): key_path = self.tmpfile() self.assertEqual(0, call( "openssl ecparam -name secp521r1 -genkey -out " + key_path, shell=True, )) cert_path = self.tmpfile() self.assertEqual(0, call(" ".join(( "openssl req -x509 -new", ("-key " + key_path), ("-outform PEM -out " + cert_path), "-nodes -subj /CN=pyderasntest", )), shell=True)) cert_der_path = self.tmpfile() self.assertEqual(0, call(" ".join(( "openssl x509", "-inform PEM -in " + cert_path, "-outform DER -out " + cert_der_path, )), shell=True)) self.assertEqual(0, call("cat %s >> %s" % (key_path, cert_path), shell=True)) with open(cert_der_path, "rb") as fd: cert = Certificate().decod(fd.read()) for ext in cert["tbsCertificate"]["extensions"]: if ext["extnID"] == id_ce_subjectKeyIdentifier: skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"])) return key_path, cert_path, cert, skid def sign(self, signed_attrs, key_path): input_path = self.tmpfile() with open(input_path, "wb") as fd: fd.write(encode_cer(signed_attrs)) signature_path = self.tmpfile() self.assertEqual(0, call(" ".join(( "openssl dgst -sha512", ("-sign " + key_path), "-binary", input_path, ("> " + signature_path), )), shell=True)) with open(signature_path, "rb") as fd: signature = fd.read() return signature def verify(self, cert_path, cms_path): self.assertEqual(0, call(" ".join(( "openssl cms -verify", ("-inform DER -in " + cms_path), "-signer %s -CAfile %s" % (cert_path, cert_path), "-out /dev/null 2>/dev/null", )), shell=True)) @settings(deadline=None) @given(integers(min_value=1000, max_value=5000)) def test_simple(self, data_len): key_path, cert_path, cert, skid = self.keypair() data = urandom(data_len) eci = EncapsulatedContentInfo(( ("eContentType", ContentType(id_data)), ("eContent", OctetString(data)), )) signed_attrs = SignedAttributes([ Attribute(( ("attrType", id_pkcs9_at_contentType), ("attrValues", AttributeValues([AttributeValue(id_data)])), )), Attribute(( ("attrType", id_pkcs9_at_messageDigest), ("attrValues", AttributeValues([ AttributeValue(OctetString( sha512(bytes(eci["eContent"])).digest() )), ])), )), ]) signature = self.sign(signed_attrs, key_path) ci = ContentInfo(( ("contentType", ContentType(id_signedData)), ("content", Any((SignedData(( ("version", CMSVersion("v3")), ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])), ("encapContentInfo", eci), ("certificates", CertificateSet([ CertificateChoices(("certificate", cert)), ])), ("signerInfos", SignerInfos([SignerInfo(( ("version", CMSVersion("v3")), ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))), ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)), ("signedAttrs", signed_attrs), ("signatureAlgorithm", SignatureAlgorithmIdentifier(( ("algorithm", id_ecdsa_with_SHA512), ))), ("signature", SignatureValue(signature)), ))])), ))))), )) cms_path = self.tmpfile() with io_open(cms_path, "wb") as fd: ci.encode_cer(fd.write) self.verify(cert_path, cms_path) fd = open(cms_path, "rb") raw = memoryview(fd.read()) if PY2 else file_mmaped(fd) ctx = {"bered": True} for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx): if decode_path == ("content",): break evgens = SignedData().decode_evgen(raw[obj.offset:], offset=obj.offset, ctx=ctx) buf = BytesIO() agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write) self.assertSequenceEqual(buf.getvalue(), data) @skipIf(PY2, "no mmaped memoryview support in PY2") @skipIf("PYDERASN_TEST_CMS_HUGE" not in environ, "PYDERASN_TEST_CMS_HUGE is not set") def test_huge(self): """Huge CMS test Environment variable PYDERASN_TEST_CMS_HUGE tells how many MiBs data to sign. Pay attention that openssl cms is unable to do stream verification and eats huge amounts (several times more, that CMS itself) of memory. """ key_path, cert_path, cert, skid = self.keypair() rnd = urandom(1<<20) data_path = self.tmpfile() start = time() with open(data_path, "wb") as fd: for _ in six_xrange(int(environ.get("PYDERASN_TEST_CMS_HUGE"))): # dgst.update(rnd) fd.write(rnd) print("data file written", time() - start) data_fd = open(data_path, "rb") data_raw = file_mmaped(data_fd) from sys import getallocatedblocks mem_start = getallocatedblocks() start = time() eci = EncapsulatedContentInfo(( ("eContentType", ContentType(id_data)), ("eContent", OctetString(data_raw)), )) eci_path = self.tmpfile() with open(eci_path, "wb") as fd: OctetString(eci["eContent"]).encode_cer(fd.write) print("ECI file written", time() - start) eci_fd = open(eci_path, "rb") eci_raw = file_mmaped(eci_fd) start = time() dgst = sha512() def hasher(data): dgst.update(data) return len(data) evgens = OctetString().decode_evgen(eci_raw, ctx={"bered": True}) agg_octet_string(evgens, (), eci_raw, hasher) dgst = dgst.digest() print("digest calculated", time() - start) signed_attrs = SignedAttributes([ Attribute(( ("attrType", id_pkcs9_at_contentType), ("attrValues", AttributeValues([AttributeValue(id_data)])), )), Attribute(( ("attrType", id_pkcs9_at_messageDigest), ("attrValues", AttributeValues([AttributeValue(OctetString(dgst))])), )), ]) signature = self.sign(signed_attrs, key_path) self.assertLess(getallocatedblocks(), mem_start * 2) start = time() ci = ContentInfo(( ("contentType", ContentType(id_signedData)), ("content", Any((SignedData(( ("version", CMSVersion("v3")), ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha512])), ("encapContentInfo", eci), ("certificates", CertificateSet([ CertificateChoices(("certificate", cert)), ])), ("signerInfos", SignerInfos([SignerInfo(( ("version", CMSVersion("v3")), ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))), ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha512)), ("signedAttrs", signed_attrs), ("signatureAlgorithm", SignatureAlgorithmIdentifier(( ("algorithm", id_ecdsa_with_SHA512), ))), ("signature", SignatureValue(signature)), ))])), ))))), )) cms_path = self.tmpfile() with io_open(cms_path, "wb") as fd: ci.encode_cer(fd.write) print("CMS written", time() - start) self.verify(cert_path, cms_path)