X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=tests%2Ftest_cms.py;fp=tests%2Ftest_cms.py;h=86df541d2169b43f8b656dd13d215b1bbb1dbb12;hb=011b627453f5bfe82bdd160edbb249a73f21082a;hp=0000000000000000000000000000000000000000;hpb=e1249a0c754920c57e6d7682458f50fd65b70026;p=pyderasn.git diff --git a/tests/test_cms.py b/tests/test_cms.py new file mode 100644 index 0000000..86df541 --- /dev/null +++ b/tests/test_cms.py @@ -0,0 +1,235 @@ +# coding: utf-8 +# PyDERASN -- Python ASN.1 DER codec with abstract structures +# Copyright (C) 2017-2020 Sergey Matveev +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program. If not, see +# . + +from datetime import datetime +from hashlib import sha256 +from os import urandom +from random import randint +from subprocess import call +from unittest import TestCase + +from pyderasn import Any +from pyderasn import Choice +from pyderasn import encode_cer +from pyderasn import Integer +from pyderasn import ObjectIdentifier +from pyderasn import OctetString +from pyderasn import Sequence +from pyderasn import SetOf +from pyderasn import tag_ctxc +from pyderasn import tag_ctxp +from pyderasn import UTCTime +from tests.test_crts import AlgorithmIdentifier +from tests.test_crts import Certificate +from tests.test_crts import SubjectKeyIdentifier +from tests.test_crts import Time + + +class CMSVersion(Integer): + schema = ( + ("v0", 0), + ("v1", 1), + ("v2", 2), + ("v3", 3), + ("v4", 4), + ("v5", 5), + ) + + +class AttributeValue(Any): + pass + + +class AttributeValues(SetOf): + schema = AttributeValue() + + +class Attribute(Sequence): + schema = ( + ("attrType", ObjectIdentifier()), + ("attrValues", AttributeValues()), + ) + + +class SignatureAlgorithmIdentifier(AlgorithmIdentifier): + pass + + +class SignedAttributes(SetOf): + schema = Attribute() + bounds = (1, 32) + der_forced = True + + +class SignerIdentifier(Choice): + schema = ( + # ("issuerAndSerialNumber", IssuerAndSerialNumber()), + ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))), + ) + + +class DigestAlgorithmIdentifiers(SetOf): + schema = AlgorithmIdentifier() + + +class DigestAlgorithmIdentifier(AlgorithmIdentifier): + pass + + +class SignatureValue(OctetString): + pass + + +class SignerInfo(Sequence): + schema = ( + ("version", CMSVersion()), + ("sid", SignerIdentifier()), + ("digestAlgorithm", DigestAlgorithmIdentifier()), + ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)), + ("signatureAlgorithm", SignatureAlgorithmIdentifier()), + ("signature", SignatureValue()), + # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)), + ) + + +class SignerInfos(SetOf): + schema = SignerInfo() + + +class ContentType(ObjectIdentifier): + pass + + +class EncapsulatedContentInfo(Sequence): + schema = ( + ("eContentType", ContentType()), + ("eContent", OctetString(expl=tag_ctxc(0), optional=True)), + ) + + +class CertificateChoices(Choice): + schema = ( + ('certificate', Certificate()), + # ... + ) + + +class CertificateSet(SetOf): + schema = CertificateChoices() + + +class SignedData(Sequence): + schema = ( + ("version", CMSVersion()), + ("digestAlgorithms", DigestAlgorithmIdentifiers()), + ("encapContentInfo", EncapsulatedContentInfo()), + ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)), + # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)), + ("signerInfos", SignerInfos()), + ) + + +class ContentInfo(Sequence): + schema = ( + ("contentType", ContentType()), + ("content", Any(expl=tag_ctxc(0))), + ) + + +id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2") +id_sha256 = ObjectIdentifier("2.16.840.1.101.3.4.2.1") +id_data = ObjectIdentifier("1.2.840.113549.1.7.1") +id_ecdsa_with_SHA256 = ObjectIdentifier("1.2.840.10045.4.3.2") +id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3") +id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4") +id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14") + + +class TestSignedDataCER(TestCase): + def runTest(self): + # openssl ecparam -name prime256v1 -genkey -out key.pem + # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem + # -days 365 -nodes -subj "/CN=doesnotmatter" + with open("cert.cer", "rb") as fd: + cert = Certificate().decod(fd.read()) + for ext in cert["tbsCertificate"]["extensions"]: + if ext["extnID"] == id_ce_subjectKeyIdentifier: + skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"])) + ai_sha256 = AlgorithmIdentifier(( + ("algorithm", id_sha256), + )) + data = urandom(randint(1000, 3000)) + eci = EncapsulatedContentInfo(( + ("eContentType", ContentType(id_data)), + ("eContent", OctetString(data)), + )) + signed_attrs = SignedAttributes([ + Attribute(( + ("attrType", id_pkcs9_at_contentType), + ("attrValues", AttributeValues([ + AttributeValue(id_data.encode()) + ])), + )), + Attribute(( + ("attrType", id_pkcs9_at_messageDigest), + ("attrValues", AttributeValues([ + AttributeValue(OctetString( + sha256(bytes(eci["eContent"])).digest() + ).encode()), + ])), + )), + ]) + with open("/tmp/in", "wb") as fd: + fd.write(encode_cer(signed_attrs)) + self.assertEqual(0, call(" ".join(( + "openssl dgst -sha256", + "-sign key.pem", + "-binary", + "/tmp/in", + "> /tmp/signature", + )), shell=True)) + ci = ContentInfo(( + ("contentType", ContentType(id_signedData)), + ("content", Any((SignedData(( + ("version", CMSVersion("v3")), + ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha256])), + ("encapContentInfo", eci), + ("certificates", CertificateSet([ + CertificateChoices(("certificate", cert)), + ])), + ("signerInfos", SignerInfos([SignerInfo(( + ("version", CMSVersion("v3")), + ("sid", SignerIdentifier( + ("subjectKeyIdentifier", skid) + )), + ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)), + ("signedAttrs", signed_attrs), + ("signatureAlgorithm", SignatureAlgorithmIdentifier(( + ("algorithm", id_ecdsa_with_SHA256), + ))), + ("signature", SignatureValue(open("/tmp/signature", "rb").read())), + ))])), + ))))), + )) + with open("/tmp/out.p7m", "wb") as fd: + fd.write(encode_cer(ci)) + self.assertEqual(0, call(" ".join(( + "openssl cms -verify", + "-inform DER -in /tmp/out.p7m", + "-signer cert.pem -CAfile cert.pem", + "-out /dev/null", + )), shell=True))