+# coding: utf-8
+# PyDERASN -- Python ASN.1 DER codec with abstract structures
+# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, version 3 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+
+from datetime import datetime
+from hashlib import sha256
+from os import urandom
+from random import randint
+from subprocess import call
+from unittest import TestCase
+
+from pyderasn import Any
+from pyderasn import Choice
+from pyderasn import encode_cer
+from pyderasn import Integer
+from pyderasn import ObjectIdentifier
+from pyderasn import OctetString
+from pyderasn import Sequence
+from pyderasn import SetOf
+from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
+from pyderasn import UTCTime
+from tests.test_crts import AlgorithmIdentifier
+from tests.test_crts import Certificate
+from tests.test_crts import SubjectKeyIdentifier
+from tests.test_crts import Time
+
+
+class CMSVersion(Integer):
+ schema = (
+ ("v0", 0),
+ ("v1", 1),
+ ("v2", 2),
+ ("v3", 3),
+ ("v4", 4),
+ ("v5", 5),
+ )
+
+
+class AttributeValue(Any):
+ pass
+
+
+class AttributeValues(SetOf):
+ schema = AttributeValue()
+
+
+class Attribute(Sequence):
+ schema = (
+ ("attrType", ObjectIdentifier()),
+ ("attrValues", AttributeValues()),
+ )
+
+
+class SignatureAlgorithmIdentifier(AlgorithmIdentifier):
+ pass
+
+
+class SignedAttributes(SetOf):
+ schema = Attribute()
+ bounds = (1, 32)
+ der_forced = True
+
+
+class SignerIdentifier(Choice):
+ schema = (
+ # ("issuerAndSerialNumber", IssuerAndSerialNumber()),
+ ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))),
+ )
+
+
+class DigestAlgorithmIdentifiers(SetOf):
+ schema = AlgorithmIdentifier()
+
+
+class DigestAlgorithmIdentifier(AlgorithmIdentifier):
+ pass
+
+
+class SignatureValue(OctetString):
+ pass
+
+
+class SignerInfo(Sequence):
+ schema = (
+ ("version", CMSVersion()),
+ ("sid", SignerIdentifier()),
+ ("digestAlgorithm", DigestAlgorithmIdentifier()),
+ ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)),
+ ("signatureAlgorithm", SignatureAlgorithmIdentifier()),
+ ("signature", SignatureValue()),
+ # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)),
+ )
+
+
+class SignerInfos(SetOf):
+ schema = SignerInfo()
+
+
+class ContentType(ObjectIdentifier):
+ pass
+
+
+class EncapsulatedContentInfo(Sequence):
+ schema = (
+ ("eContentType", ContentType()),
+ ("eContent", OctetString(expl=tag_ctxc(0), optional=True)),
+ )
+
+
+class CertificateChoices(Choice):
+ schema = (
+ ('certificate', Certificate()),
+ # ...
+ )
+
+
+class CertificateSet(SetOf):
+ schema = CertificateChoices()
+
+
+class SignedData(Sequence):
+ schema = (
+ ("version", CMSVersion()),
+ ("digestAlgorithms", DigestAlgorithmIdentifiers()),
+ ("encapContentInfo", EncapsulatedContentInfo()),
+ ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)),
+ # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)),
+ ("signerInfos", SignerInfos()),
+ )
+
+
+class ContentInfo(Sequence):
+ schema = (
+ ("contentType", ContentType()),
+ ("content", Any(expl=tag_ctxc(0))),
+ )
+
+
+id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2")
+id_sha256 = ObjectIdentifier("2.16.840.1.101.3.4.2.1")
+id_data = ObjectIdentifier("1.2.840.113549.1.7.1")
+id_ecdsa_with_SHA256 = ObjectIdentifier("1.2.840.10045.4.3.2")
+id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3")
+id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4")
+id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14")
+
+
+class TestSignedDataCER(TestCase):
+ def runTest(self):
+ # openssl ecparam -name prime256v1 -genkey -out key.pem
+ # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem
+ # -days 365 -nodes -subj "/CN=doesnotmatter"
+ with open("cert.cer", "rb") as fd:
+ cert = Certificate().decod(fd.read())
+ for ext in cert["tbsCertificate"]["extensions"]:
+ if ext["extnID"] == id_ce_subjectKeyIdentifier:
+ skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"]))
+ ai_sha256 = AlgorithmIdentifier((
+ ("algorithm", id_sha256),
+ ))
+ data = urandom(randint(1000, 3000))
+ eci = EncapsulatedContentInfo((
+ ("eContentType", ContentType(id_data)),
+ ("eContent", OctetString(data)),
+ ))
+ signed_attrs = SignedAttributes([
+ Attribute((
+ ("attrType", id_pkcs9_at_contentType),
+ ("attrValues", AttributeValues([
+ AttributeValue(id_data.encode())
+ ])),
+ )),
+ Attribute((
+ ("attrType", id_pkcs9_at_messageDigest),
+ ("attrValues", AttributeValues([
+ AttributeValue(OctetString(
+ sha256(bytes(eci["eContent"])).digest()
+ ).encode()),
+ ])),
+ )),
+ ])
+ with open("/tmp/in", "wb") as fd:
+ fd.write(encode_cer(signed_attrs))
+ self.assertEqual(0, call(" ".join((
+ "openssl dgst -sha256",
+ "-sign key.pem",
+ "-binary",
+ "/tmp/in",
+ "> /tmp/signature",
+ )), shell=True))
+ ci = ContentInfo((
+ ("contentType", ContentType(id_signedData)),
+ ("content", Any((SignedData((
+ ("version", CMSVersion("v3")),
+ ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha256])),
+ ("encapContentInfo", eci),
+ ("certificates", CertificateSet([
+ CertificateChoices(("certificate", cert)),
+ ])),
+ ("signerInfos", SignerInfos([SignerInfo((
+ ("version", CMSVersion("v3")),
+ ("sid", SignerIdentifier(
+ ("subjectKeyIdentifier", skid)
+ )),
+ ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)),
+ ("signedAttrs", signed_attrs),
+ ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+ ("algorithm", id_ecdsa_with_SHA256),
+ ))),
+ ("signature", SignatureValue(open("/tmp/signature", "rb").read())),
+ ))])),
+ ))))),
+ ))
+ with open("/tmp/out.p7m", "wb") as fd:
+ fd.write(encode_cer(ci))
+ self.assertEqual(0, call(" ".join((
+ "openssl cms -verify",
+ "-inform DER -in /tmp/out.p7m",
+ "-signer cert.pem -CAfile cert.pem",
+ "-out /dev/null",
+ )), shell=True))