From e83a585f513b694f5f953fab15e5419e837dda7b Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 15 Feb 2020 14:51:09 +0300 Subject: [PATCH] agg_octet_string --- doc/news.rst | 3 ++ pyderasn.py | 24 ++++++++++++ tests/test_cms.py | 98 ++++++++++++++++++++++++++++++++++------------- 3 files changed, 99 insertions(+), 26 deletions(-) diff --git a/doc/news.rst b/doc/news.rst index 5665a42..ec2cf02 100644 --- a/doc/news.rst +++ b/doc/news.rst @@ -17,6 +17,9 @@ News where no in-memory objects storing happens, giving ability to process ASN.1 data without fully parsing it first. ``python -m pyderasn`` has ``--evgen`` mode switcher +* Useful ``agg_octet_string`` that is able to streamingly decode string + from events of ``evgen_mode``, allowing strings retrieving without + copying them to memory first * Initial experimental CER encoding mode, allowing streaming encoding of the data directly to some writeable object * Ability to use mmap-ed memoryviews to skip files loading to memory diff --git a/pyderasn.py b/pyderasn.py index fe1d4ab..8d88a3b 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -816,6 +816,7 @@ except ImportError: # pragma: no cover __version__ = "7.0" __all__ = ( + "agg_octet_string", "Any", "BitString", "BMPString", @@ -3554,6 +3555,29 @@ class OctetString(Obj): yield pp +def agg_octet_string(evgens, decode_path, raw, writer): + """Aggregate constructed string (OctetString and its derivatives) + + :param evgens: iterator of generated events + :param decode_path: points to the string we want to decode + :param raw: slicebable (memoryview, bytearray, etc) with + the data evgens are generated one + :param writer: buffer.write where string is going to be saved + """ + decode_path_len = len(decode_path) + for dp, obj, _ in evgens: + if dp[:decode_path_len] != decode_path: + continue + if not obj.ber_encoded: + write_full(writer, raw[ + obj.offset + obj.tlen + obj.llen: + obj.offset + obj.tlen + obj.llen + obj.vlen - + (EOC_LEN if obj.expl_lenindef else 0) + ]) + if len(dp) == decode_path_len: + break + + NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS) diff --git a/tests/test_cms.py b/tests/test_cms.py index 86df541..359b6bb 100644 --- a/tests/test_cms.py +++ b/tests/test_cms.py @@ -15,16 +15,26 @@ # License along with this program. If not, see # . -from datetime import datetime from hashlib import sha256 +from io import BytesIO +from io import open as io_open +from os import remove from os import urandom -from random import randint from subprocess import call +from tempfile import NamedTemporaryFile +from unittest import skipIf from unittest import TestCase +from hypothesis import given +from hypothesis import settings +from hypothesis.strategies import integers +from six import PY2 + +from pyderasn import agg_octet_string from pyderasn import Any from pyderasn import Choice from pyderasn import encode_cer +from pyderasn import file_mmaped from pyderasn import Integer from pyderasn import ObjectIdentifier from pyderasn import OctetString @@ -32,11 +42,9 @@ from pyderasn import Sequence from pyderasn import SetOf from pyderasn import tag_ctxc from pyderasn import tag_ctxp -from pyderasn import UTCTime from tests.test_crts import AlgorithmIdentifier from tests.test_crts import Certificate from tests.test_crts import SubjectKeyIdentifier -from tests.test_crts import Time class CMSVersion(Integer): @@ -158,13 +166,39 @@ id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3") id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4") id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14") - -class TestSignedDataCER(TestCase): - def runTest(self): - # openssl ecparam -name prime256v1 -genkey -out key.pem - # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem - # -days 365 -nodes -subj "/CN=doesnotmatter" - with open("cert.cer", "rb") as fd: +openssl_cms_exists = call("openssl cms -help 2>/dev/null", shell=True) == 0 + +@skipIf(not openssl_cms_exists, "openssl cms command not found") +class TestSignedDataCERWithOpenSSL(TestCase): + @settings(deadline=None) + @given(integers(min_value=1000, max_value=5000)) + def runTest(self, data_len): + def tmpfile(): + tmp = NamedTemporaryFile(delete=False) + tmp.close() + tmp = tmp.name + self.addCleanup(lambda: remove(tmp)) + return tmp + key_path = tmpfile() + self.assertEqual(0, call( + "openssl ecparam -name prime256v1 -genkey -out " + key_path, + shell=True, + )) + cert_path = tmpfile() + self.assertEqual(0, call(" ".join(( + "openssl req -x509 -new", + ("-key " + key_path), + ("-outform PEM -out " + cert_path), + "-nodes -subj /CN=pyderasntest", + )), shell=True)) + cert_der_path = tmpfile() + self.assertEqual(0, call(" ".join(( + "openssl x509", + "-inform PEM -in " + cert_path, + "-outform DER -out " + cert_der_path, + )), shell=True)) + self.assertEqual(0, call("cat %s >> %s" % (key_path, cert_path), shell=True)) + with open(cert_der_path, "rb") as fd: cert = Certificate().decod(fd.read()) for ext in cert["tbsCertificate"]["extensions"]: if ext["extnID"] == id_ce_subjectKeyIdentifier: @@ -172,7 +206,7 @@ class TestSignedDataCER(TestCase): ai_sha256 = AlgorithmIdentifier(( ("algorithm", id_sha256), )) - data = urandom(randint(1000, 3000)) + data = urandom(data_len) eci = EncapsulatedContentInfo(( ("eContentType", ContentType(id_data)), ("eContent", OctetString(data)), @@ -193,15 +227,18 @@ class TestSignedDataCER(TestCase): ])), )), ]) - with open("/tmp/in", "wb") as fd: + input_path = tmpfile() + with open(input_path, "wb") as fd: fd.write(encode_cer(signed_attrs)) + signature_path = tmpfile() self.assertEqual(0, call(" ".join(( "openssl dgst -sha256", - "-sign key.pem", - "-binary", - "/tmp/in", - "> /tmp/signature", + ("-sign " + key_path), + "-binary", input_path, + ("> " + signature_path), )), shell=True)) + with open(signature_path, "rb") as fd: + signature = fd.read() ci = ContentInfo(( ("contentType", ContentType(id_signedData)), ("content", Any((SignedData(( @@ -213,23 +250,32 @@ class TestSignedDataCER(TestCase): ])), ("signerInfos", SignerInfos([SignerInfo(( ("version", CMSVersion("v3")), - ("sid", SignerIdentifier( - ("subjectKeyIdentifier", skid) - )), + ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))), ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)), ("signedAttrs", signed_attrs), ("signatureAlgorithm", SignatureAlgorithmIdentifier(( ("algorithm", id_ecdsa_with_SHA256), ))), - ("signature", SignatureValue(open("/tmp/signature", "rb").read())), + ("signature", SignatureValue(signature)), ))])), ))))), )) - with open("/tmp/out.p7m", "wb") as fd: - fd.write(encode_cer(ci)) + output_path = tmpfile() + with io_open(output_path, "wb") as fd: + ci.encode_cer(writer=fd.write) self.assertEqual(0, call(" ".join(( "openssl cms -verify", - "-inform DER -in /tmp/out.p7m", - "-signer cert.pem -CAfile cert.pem", - "-out /dev/null", + ("-inform DER -in " + output_path), + "-signer %s -CAfile %s" % (cert_path, cert_path), + "-out /dev/null 2>/dev/null", )), shell=True)) + fd = open(output_path, "rb") + raw = memoryview(fd.read()) if PY2 else file_mmaped(fd) + ctx = {"bered": True} + for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx): + if decode_path == ("content",): + break + evgens = SignedData().decode_evgen(raw[obj.offset:], offset=obj.offset, ctx=ctx) + buf = BytesIO() + agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write) + self.assertSequenceEqual(buf.getvalue(), data) -- 2.44.0