]> Cypherpunks.ru repositories - pyderasn.git/blobdiff - tests/test_cms.py
Initial support for CER encoding
[pyderasn.git] / tests / test_cms.py
diff --git a/tests/test_cms.py b/tests/test_cms.py
new file mode 100644 (file)
index 0000000..86df541
--- /dev/null
@@ -0,0 +1,235 @@
+# coding: utf-8
+# PyDERASN -- Python ASN.1 DER codec with abstract structures
+# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, version 3 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program.  If not, see
+# <http://www.gnu.org/licenses/>.
+
+from datetime import datetime
+from hashlib import sha256
+from os import urandom
+from random import randint
+from subprocess import call
+from unittest import TestCase
+
+from pyderasn import Any
+from pyderasn import Choice
+from pyderasn import encode_cer
+from pyderasn import Integer
+from pyderasn import ObjectIdentifier
+from pyderasn import OctetString
+from pyderasn import Sequence
+from pyderasn import SetOf
+from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
+from pyderasn import UTCTime
+from tests.test_crts import AlgorithmIdentifier
+from tests.test_crts import Certificate
+from tests.test_crts import SubjectKeyIdentifier
+from tests.test_crts import Time
+
+
+class CMSVersion(Integer):
+    schema = (
+        ("v0", 0),
+        ("v1", 1),
+        ("v2", 2),
+        ("v3", 3),
+        ("v4", 4),
+        ("v5", 5),
+    )
+
+
+class AttributeValue(Any):
+    pass
+
+
+class AttributeValues(SetOf):
+    schema = AttributeValue()
+
+
+class Attribute(Sequence):
+    schema = (
+        ("attrType", ObjectIdentifier()),
+        ("attrValues", AttributeValues()),
+    )
+
+
+class SignatureAlgorithmIdentifier(AlgorithmIdentifier):
+    pass
+
+
+class SignedAttributes(SetOf):
+    schema = Attribute()
+    bounds = (1, 32)
+    der_forced = True
+
+
+class SignerIdentifier(Choice):
+    schema = (
+        # ("issuerAndSerialNumber", IssuerAndSerialNumber()),
+        ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))),
+    )
+
+
+class DigestAlgorithmIdentifiers(SetOf):
+    schema = AlgorithmIdentifier()
+
+
+class DigestAlgorithmIdentifier(AlgorithmIdentifier):
+    pass
+
+
+class SignatureValue(OctetString):
+    pass
+
+
+class SignerInfo(Sequence):
+    schema = (
+        ("version", CMSVersion()),
+        ("sid", SignerIdentifier()),
+        ("digestAlgorithm", DigestAlgorithmIdentifier()),
+        ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)),
+        ("signatureAlgorithm", SignatureAlgorithmIdentifier()),
+        ("signature", SignatureValue()),
+        # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)),
+    )
+
+
+class SignerInfos(SetOf):
+    schema = SignerInfo()
+
+
+class ContentType(ObjectIdentifier):
+    pass
+
+
+class EncapsulatedContentInfo(Sequence):
+    schema = (
+        ("eContentType", ContentType()),
+        ("eContent", OctetString(expl=tag_ctxc(0), optional=True)),
+    )
+
+
+class CertificateChoices(Choice):
+    schema = (
+        ('certificate', Certificate()),
+        # ...
+    )
+
+
+class CertificateSet(SetOf):
+    schema = CertificateChoices()
+
+
+class SignedData(Sequence):
+    schema = (
+        ("version", CMSVersion()),
+        ("digestAlgorithms", DigestAlgorithmIdentifiers()),
+        ("encapContentInfo", EncapsulatedContentInfo()),
+        ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)),
+        # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)),
+        ("signerInfos", SignerInfos()),
+    )
+
+
+class ContentInfo(Sequence):
+    schema = (
+        ("contentType", ContentType()),
+        ("content", Any(expl=tag_ctxc(0))),
+    )
+
+
+id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2")
+id_sha256 = ObjectIdentifier("2.16.840.1.101.3.4.2.1")
+id_data = ObjectIdentifier("1.2.840.113549.1.7.1")
+id_ecdsa_with_SHA256 = ObjectIdentifier("1.2.840.10045.4.3.2")
+id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3")
+id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4")
+id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14")
+
+
+class TestSignedDataCER(TestCase):
+    def runTest(self):
+        # openssl ecparam -name prime256v1 -genkey -out key.pem
+        # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem
+        #    -days 365 -nodes -subj "/CN=doesnotmatter"
+        with open("cert.cer", "rb") as fd:
+            cert = Certificate().decod(fd.read())
+        for ext in cert["tbsCertificate"]["extensions"]:
+            if ext["extnID"] == id_ce_subjectKeyIdentifier:
+                skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"]))
+        ai_sha256 = AlgorithmIdentifier((
+            ("algorithm", id_sha256),
+        ))
+        data = urandom(randint(1000, 3000))
+        eci = EncapsulatedContentInfo((
+            ("eContentType", ContentType(id_data)),
+            ("eContent", OctetString(data)),
+        ))
+        signed_attrs = SignedAttributes([
+            Attribute((
+                ("attrType", id_pkcs9_at_contentType),
+                ("attrValues", AttributeValues([
+                    AttributeValue(id_data.encode())
+                ])),
+            )),
+            Attribute((
+                ("attrType", id_pkcs9_at_messageDigest),
+                ("attrValues", AttributeValues([
+                    AttributeValue(OctetString(
+                        sha256(bytes(eci["eContent"])).digest()
+                    ).encode()),
+                ])),
+            )),
+        ])
+        with open("/tmp/in", "wb") as fd:
+            fd.write(encode_cer(signed_attrs))
+        self.assertEqual(0, call(" ".join((
+            "openssl dgst -sha256",
+            "-sign key.pem",
+            "-binary",
+            "/tmp/in",
+            "> /tmp/signature",
+        )), shell=True))
+        ci = ContentInfo((
+            ("contentType", ContentType(id_signedData)),
+            ("content", Any((SignedData((
+                ("version", CMSVersion("v3")),
+                ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha256])),
+                ("encapContentInfo", eci),
+                ("certificates", CertificateSet([
+                    CertificateChoices(("certificate", cert)),
+                ])),
+                ("signerInfos", SignerInfos([SignerInfo((
+                    ("version", CMSVersion("v3")),
+                    ("sid", SignerIdentifier(
+                        ("subjectKeyIdentifier", skid)
+                    )),
+                    ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)),
+                    ("signedAttrs", signed_attrs),
+                    ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+                        ("algorithm", id_ecdsa_with_SHA256),
+                    ))),
+                    ("signature", SignatureValue(open("/tmp/signature", "rb").read())),
+                ))])),
+            ))))),
+        ))
+        with open("/tmp/out.p7m", "wb") as fd:
+            fd.write(encode_cer(ci))
+        self.assertEqual(0, call(" ".join((
+            "openssl cms -verify",
+            "-inform DER -in /tmp/out.p7m",
+            "-signer cert.pem -CAfile cert.pem",
+            "-out /dev/null",
+        )), shell=True))