]> Cypherpunks.ru repositories - pygost.git/commitdiff
Use DEFINES PyDERASN feature for less .decode() invocations
authorSergey Matveev <stargrave@stargrave.org>
Tue, 1 Jan 2019 19:30:41 +0000 (22:30 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Tue, 1 Jan 2019 19:30:41 +0000 (22:30 +0300)
pygost/asn1schemas/cms.py
pygost/asn1schemas/oids.py [new file with mode: 0644]
pygost/asn1schemas/pfx.py
pygost/test_cms.py
pygost/test_pfx.py
pygost/test_x509.py

index 187850bbb03c7309f211e0bd5b153f60c4319d65..88288340905ef17449351420dd060129ed2bc7cd 100644 (file)
@@ -29,6 +29,12 @@ from pyderasn import SetOf
 from pyderasn import tag_ctxc
 from pyderasn import tag_ctxp
 
+from pygost.asn1schemas.oids import id_digestedData
+from pygost.asn1schemas.oids import id_envelopedData
+from pygost.asn1schemas.oids import id_Gost28147_89
+from pygost.asn1schemas.oids import id_signedData
+from pygost.asn1schemas.oids import id_tc26_gost3410_2012_256
+from pygost.asn1schemas.oids import id_tc26_gost3410_2012_512
 from pygost.asn1schemas.x509 import AlgorithmIdentifier
 from pygost.asn1schemas.x509 import SubjectPublicKeyInfo
 
@@ -48,8 +54,57 @@ class RecipientIdentifier(Choice):
     )
 
 
+class Gost2814789Key(OctetString):
+    bounds = (32, 32)
+
+
+class Gost2814789MAC(OctetString):
+    bounds = (4, 4)
+
+
+class Gost2814789EncryptedKey(Sequence):
+    schema = (
+        ("encryptedKey", Gost2814789Key()),
+        ("maskKey", Gost2814789Key(impl=tag_ctxp(0), optional=True)),
+        ("macKey", Gost2814789MAC()),
+    )
+
+
+class GostR34102001TransportParameters(Sequence):
+    schema = (
+        ("encryptionParamSet", ObjectIdentifier()),
+        ("ephemeralPublicKey", SubjectPublicKeyInfo(
+            impl=tag_ctxc(0),
+            optional=True,
+        )),
+        ("ukm", OctetString()),
+    )
+
+
+class GostR3410KeyTransport(Sequence):
+    schema = (
+        ("sessionEncryptedKey", Gost2814789EncryptedKey()),
+        ("transportParameters", GostR34102001TransportParameters(
+            impl=tag_ctxc(0),
+            optional=True,
+        )),
+    )
+
+
 class KeyEncryptionAlgorithmIdentifier(AlgorithmIdentifier):
-    pass
+    schema = (
+        ("algorithm", ObjectIdentifier(defines=(
+            (("..", "encryptedKey"), {
+                id_tc26_gost3410_2012_256: GostR3410KeyTransport(),
+                id_tc26_gost3410_2012_512: GostR3410KeyTransport(),
+            }),
+            (("..", "recipientEncryptedKeys", any, "encryptedKey"), {
+                id_tc26_gost3410_2012_256: Gost2814789EncryptedKey(),
+                id_tc26_gost3410_2012_512: Gost2814789EncryptedKey(),
+            }),
+        ))),
+        ("parameters", Any(optional=True)),
+    )
 
 
 class EncryptedKey(OctetString):
@@ -127,8 +182,24 @@ class RecipientInfos(SetOf):
     bounds = (1, float("+inf"))
 
 
+class Gost2814789IV(OctetString):
+    bounds = (8, 8)
+
+
+class Gost2814789Parameters(Sequence):
+    schema = (
+        ("iv", Gost2814789IV()),
+        ("encryptionParamSet", ObjectIdentifier()),
+    )
+
+
 class ContentEncryptionAlgorithmIdentifier(AlgorithmIdentifier):
-    pass
+    schema = (
+        ("algorithm", ObjectIdentifier(defines=(
+            (("parameters",), {id_Gost28147_89: Gost2814789Parameters()}),
+        ))),
+        ("parameters", Any(optional=True)),
+    )
 
 
 class EncryptedContent(OctetString):
@@ -153,61 +224,6 @@ class EnvelopedData(Sequence):
     )
 
 
-class ContentInfo(Sequence):
-    schema = (
-        ("contentType", ContentType()),
-        ("content", Any(expl=tag_ctxc(0))),
-    )
-
-
-class Gost2814789IV(OctetString):
-    bounds = (8, 8)
-
-
-class Gost2814789Parameters(Sequence):
-    schema = (
-        ("iv", Gost2814789IV()),
-        ("encryptionParamSet", ObjectIdentifier()),
-    )
-
-
-class Gost2814789Key(OctetString):
-    bounds = (32, 32)
-
-
-class Gost2814789MAC(OctetString):
-    bounds = (4, 4)
-
-
-class Gost2814789EncryptedKey(Sequence):
-    schema = (
-        ("encryptedKey", Gost2814789Key()),
-        ("maskKey", Gost2814789Key(impl=tag_ctxp(0), optional=True)),
-        ("macKey", Gost2814789MAC()),
-    )
-
-
-class GostR34102001TransportParameters(Sequence):
-    schema = (
-        ("encryptionParamSet", ObjectIdentifier()),
-        ("ephemeralPublicKey", SubjectPublicKeyInfo(
-            impl=tag_ctxc(0),
-            optional=True,
-        )),
-        ("ukm", OctetString()),
-    )
-
-
-class GostR3410KeyTransport(Sequence):
-    schema = (
-        ("sessionEncryptedKey", Gost2814789EncryptedKey()),
-        ("transportParameters", GostR34102001TransportParameters(
-            impl=tag_ctxc(0),
-            optional=True,
-        )),
-    )
-
-
 class EncapsulatedContentInfo(Sequence):
     schema = (
         ("eContentType", ContentType()),
@@ -276,3 +292,16 @@ class DigestedData(Sequence):
         ("encapContentInfo", EncapsulatedContentInfo()),
         ("digest", Digest()),
     )
+
+
+class ContentInfo(Sequence):
+    schema = (
+        ("contentType", ContentType(defines=(
+            (("content",), {
+                id_digestedData: DigestedData(),
+                id_envelopedData: EnvelopedData(),
+                id_signedData: SignedData(),
+            }),
+        ))),
+        ("content", Any(expl=tag_ctxc(0))),
+    )
diff --git a/pygost/asn1schemas/oids.py b/pygost/asn1schemas/oids.py
new file mode 100644 (file)
index 0000000..1e8e4f6
--- /dev/null
@@ -0,0 +1,16 @@
+from pyderasn import ObjectIdentifier
+
+
+id_pkcs7 = ObjectIdentifier("1.2.840.113549.1.7")
+id_signedData = id_pkcs7 + (2,)
+id_envelopedData = id_pkcs7 + (3,)
+id_digestedData = id_pkcs7 + (5,)
+id_encryptedData = id_pkcs7 + (6,)
+
+id_data = ObjectIdentifier("1.2.840.113549.1.7.1")
+id_tc26_gost3410_2012_256 = ObjectIdentifier("1.2.643.7.1.1.1.1")
+id_tc26_gost3410_2012_512 = ObjectIdentifier("1.2.643.7.1.1.1.2")
+id_Gost28147_89 = ObjectIdentifier("1.2.643.2.2.21")
+
+id_pbes2 = ObjectIdentifier("1.2.840.113549.1.5.13")
+id_pbkdf2 = ObjectIdentifier("1.2.840.113549.1.5.12")
index 790cc181f5d70af3366b217b69c9887d37bb0b41..73dcd3c89d268737fd0e96082745dba73494d095 100644 (file)
@@ -30,30 +30,63 @@ from pyderasn import tag_ctxp
 
 from pygost.asn1schemas.cms import CMSVersion
 from pygost.asn1schemas.cms import ContentType
+from pygost.asn1schemas.cms import Gost2814789Parameters
+from pygost.asn1schemas.oids import id_data
+from pygost.asn1schemas.oids import id_encryptedData
+from pygost.asn1schemas.oids import id_Gost28147_89
+from pygost.asn1schemas.oids import id_pbes2
+from pygost.asn1schemas.oids import id_pbkdf2
 from pygost.asn1schemas.x509 import AlgorithmIdentifier
 
 
-class EncryptionAlgorithmIdentifier(AlgorithmIdentifier):
+class PBKDF2Salt(Choice):
+    schema = (
+        ("specified", OctetString()),
+        # ("otherSource", PBKDF2SaltSources()),
+    )
+
+
+id_hmacWithSHA1 = ObjectIdentifier("1.2.840.113549.2.7")
+
+
+class PBKDF2PRFs(AlgorithmIdentifier):
     schema = (
-        ("algorithm", ObjectIdentifier()),
+        ("algorithm", ObjectIdentifier(default=id_hmacWithSHA1)),
         ("parameters", Any(optional=True)),
     )
 
 
-class ContentEncryptionAlgorithmIdentifier(EncryptionAlgorithmIdentifier):
-    pass
+class IterationCount(Integer):
+    bounds = (1, float("+inf"))
+
+
+class KeyLength(Integer):
+    bounds = (1, float("+inf"))
+
+
+class PBKDF2Params(Sequence):
+    schema = (
+        ("salt", PBKDF2Salt()),
+        ("iterationCount", IterationCount(optional=True)),
+        ("keyLength", KeyLength(optional=True)),
+        ("prf", PBKDF2PRFs()),
+    )
 
 
 class PBES2KDFs(AlgorithmIdentifier):
     schema = (
-        ("algorithm", ObjectIdentifier()),
+        ("algorithm", ObjectIdentifier(defines=(
+            (("parameters",), {id_pbkdf2: PBKDF2Params()}),
+        ))),
         ("parameters", Any(optional=True)),
     )
 
 
 class PBES2Encs(AlgorithmIdentifier):
     schema = (
-        ("algorithm", ObjectIdentifier()),
+        ("algorithm", ObjectIdentifier(defines=(
+            (("parameters",), {id_Gost28147_89: Gost2814789Parameters()}),
+        ))),
         ("parameters", Any(optional=True)),
     )
 
@@ -65,6 +98,24 @@ class PBES2Params(Sequence):
     )
 
 
+class EncryptionAlgorithmIdentifier(AlgorithmIdentifier):
+    schema = (
+        ("algorithm", ObjectIdentifier(defines=(
+            (("parameters",), {id_pbes2: PBES2Params()}),
+        ))),
+        ("parameters", Any(optional=True)),
+    )
+
+
+class ContentEncryptionAlgorithmIdentifier(EncryptionAlgorithmIdentifier):
+    schema = (
+        ("algorithm", ObjectIdentifier(defines=(
+            (("parameters",), {id_pbes2: PBES2Params()}),
+        ))),
+        ("parameters", Any(optional=True)),
+    )
+
+
 class EncryptedContent(OctetString):
     pass
 
@@ -106,7 +157,9 @@ class PKCS12Attributes(SetOf):
 
 class SafeBag(Sequence):
     schema = (
-        ("bagId", ObjectIdentifier()),
+        ("bagId", ObjectIdentifier(defines=(
+            (("bagValue",), {id_encryptedData: EncryptedData()}),
+        ))),
         ("bagValue", PKCS12BagSet(expl=tag_ctxc(0))),
         ("bagAttributes", PKCS12Attributes(optional=True)),
     )
@@ -123,7 +176,9 @@ class OctetStringSafeContents(Sequence):
 
 class AuthSafe(Sequence):
     schema = (
-        ("contentType", ContentType()),
+        ("contentType", ContentType(defines=(
+            (("content",), {id_data: OctetStringSafeContents()}),
+        ))),
         ("content", Any(expl=tag_ctxc(0))),
     )
 
@@ -160,37 +215,3 @@ class EncryptedPrivateKeyInfo(Sequence):
 
 class PKCS8ShroudedKeyBag(EncryptedPrivateKeyInfo):
     pass
-
-
-class PBKDF2Salt(Choice):
-    schema = (
-        ("specified", OctetString()),
-        # ("otherSource", PBKDF2SaltSources()),
-    )
-
-
-id_hmacWithSHA1 = ObjectIdentifier("1.2.840.113549.2.7")
-
-
-class PBKDF2PRFs(AlgorithmIdentifier):
-    schema = (
-        ("algorithm", ObjectIdentifier(default=id_hmacWithSHA1)),
-        ("parameters", Any(optional=True)),
-    )
-
-
-class IterationCount(Integer):
-    bounds = (1, float("+inf"))
-
-
-class KeyLength(Integer):
-    bounds = (1, float("+inf"))
-
-
-class PBKDF2Params(Sequence):
-    schema = (
-        ("salt", PBKDF2Salt()),
-        ("iterationCount", IterationCount(optional=True)),
-        ("keyLength", KeyLength(optional=True)),
-        ("prf", PBKDF2PRFs()),
-    )
index 8eeea91f3edabe6a93eacdfd9d1e0e41c6e31d0e..da6c8596bd95005f305b9fc102740059004d6027 100644 (file)
@@ -35,15 +35,13 @@ from pygost.wrap import unwrap_cryptopro
 from pygost.wrap import unwrap_gost
 
 try:
+    from pyderasn import DecodePathDefBy
     from pyderasn import OctetString
 
     from pygost.asn1schemas.cms import ContentInfo
-    from pygost.asn1schemas.cms import DigestedData
-    from pygost.asn1schemas.cms import EnvelopedData
-    from pygost.asn1schemas.cms import Gost2814789EncryptedKey
-    from pygost.asn1schemas.cms import Gost2814789Parameters
-    from pygost.asn1schemas.cms import GostR3410KeyTransport
-    from pygost.asn1schemas.cms import SignedData
+    from pygost.asn1schemas.oids import id_envelopedData
+    from pygost.asn1schemas.oids import id_tc26_gost3410_2012_256
+    from pygost.asn1schemas.oids import id_tc26_gost3410_2012_512
 except ImportError:
     pyderasn_exists = False
 else:
@@ -67,8 +65,8 @@ class TestSigned(TestCase):
     ):
         content_info, tail = ContentInfo().decode(content_info_raw)
         self.assertSequenceEqual(tail, b"")
-        signed_data, tail = SignedData().decode(bytes(content_info["content"]))
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(content_info["content"].defined)
+        _, signed_data = content_info["content"].defined
         self.assertEqual(len(signed_data["signerInfos"]), 1)
         curve = GOST3410Curve(*CURVE_PARAMS[curve_name])
         self.assertTrue(verify(
@@ -127,8 +125,8 @@ class TestDigested(TestCase):
     def process_cms(self, content_info_raw, hasher):
         content_info, tail = ContentInfo().decode(content_info_raw)
         self.assertSequenceEqual(tail, b"")
-        digested_data, tail = DigestedData().decode(bytes(content_info["content"]))
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(content_info["content"].defined)
+        _, digested_data = content_info["content"].defined
         self.assertSequenceEqual(
             hasher(bytes(digested_data["encapContentInfo"]["eContent"])).digest(),
             bytes(digested_data["digest"]),
@@ -169,30 +167,56 @@ class TestEnvelopedKTRI(TestCase):
             plaintext_expected,
     ):
         sbox = "Gost28147_tc26_ParamZ"
-        content_info, tail = ContentInfo().decode(content_info_raw)
-        self.assertSequenceEqual(tail, b"")
-        enveloped_data, tail = EnvelopedData().decode(bytes(content_info["content"]))
+        content_info, tail = ContentInfo().decode(content_info_raw, ctx={
+            "defines_by_path": [
+                (
+                    (
+                        "content",
+                        DecodePathDefBy(id_envelopedData),
+                        "recipientInfos",
+                        any,
+                        "ktri",
+                        "encryptedKey",
+                        DecodePathDefBy(spki_algorithm),
+                        "transportParameters",
+                        "ephemeralPublicKey",
+                        "algorithm",
+                        "algorithm",
+                    ),
+                    (
+                        (
+                            ("..", "subjectPublicKey"),
+                            {
+                                id_tc26_gost3410_2012_256: OctetString(),
+                                id_tc26_gost3410_2012_512: OctetString(),
+                            },
+                        ),
+                    ),
+                ) for spki_algorithm in (
+                    id_tc26_gost3410_2012_256,
+                    id_tc26_gost3410_2012_512,
+                )
+            ],
+        })
         self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(content_info["content"].defined)
+        _, enveloped_data = content_info["content"].defined
         eci = enveloped_data["encryptedContentInfo"]
         ri = enveloped_data["recipientInfos"][0]
-        encrypted_key, tail = GostR3410KeyTransport().decode(
-            bytes(ri["ktri"]["encryptedKey"])
-        )
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(ri["ktri"]["encryptedKey"].defined)
+        _, encrypted_key = ri["ktri"]["encryptedKey"].defined
         ukm = bytes(encrypted_key["transportParameters"]["ukm"])
-        spk = bytes(encrypted_key["transportParameters"]["ephemeralPublicKey"]["subjectPublicKey"])
-        pub_key_their, tail = OctetString().decode(spk)
-        self.assertSequenceEqual(tail, b"")
+        spk = encrypted_key["transportParameters"]["ephemeralPublicKey"]["subjectPublicKey"]
+        self.assertIsNotNone(spk.defined)
+        _, pub_key_their = spk.defined
         curve = GOST3410Curve(*CURVE_PARAMS[curve_name])
         kek = keker(curve, prv_key_our, bytes(pub_key_their), ukm)
         key_wrapped = bytes(encrypted_key["sessionEncryptedKey"]["encryptedKey"])
         mac = bytes(encrypted_key["sessionEncryptedKey"]["macKey"])
         cek = unwrap_cryptopro(kek, ukm + key_wrapped + mac, sbox=sbox)
         ciphertext = bytes(eci["encryptedContent"])
-        encryption_params, tail = Gost2814789Parameters().decode(
-            bytes(eci["contentEncryptionAlgorithm"]["parameters"])
-        )
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(eci["contentEncryptionAlgorithm"]["parameters"].defined)
+        _, encryption_params = eci["contentEncryptionAlgorithm"]["parameters"].defined
         iv = bytes(encryption_params["iv"])
         self.assertSequenceEqual(
             cfb_decrypt(cek, ciphertext, iv, sbox=sbox, mesh=True),
@@ -323,32 +347,54 @@ class TestEnvelopedKARI(TestCase):
             plaintext_expected,
     ):
         sbox = "Gost28147_tc26_ParamZ"
-        content_info, tail = ContentInfo().decode(content_info_raw)
-        self.assertSequenceEqual(tail, b"")
-        enveloped_data, tail = EnvelopedData().decode(bytes(content_info["content"]))
+        content_info, tail = ContentInfo().decode(content_info_raw, ctx={
+            "defines_by_path": [
+                (
+                    (
+                        "content",
+                        DecodePathDefBy(id_envelopedData),
+                        "recipientInfos",
+                        any,
+                        "kari",
+                        "originator",
+                        "originatorKey",
+                        "algorithm",
+                        "algorithm",
+                    ),
+                    (
+                        (
+                            ("..", "publicKey"),
+                            {
+                                id_tc26_gost3410_2012_256: OctetString(),
+                                id_tc26_gost3410_2012_512: OctetString(),
+                            },
+                        ),
+                    ),
+                ) for spki_algorithm in (
+                    id_tc26_gost3410_2012_256,
+                    id_tc26_gost3410_2012_512,
+                )
+            ],
+        })
         self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(content_info["content"].defined)
+        _, enveloped_data = content_info["content"].defined
         eci = enveloped_data["encryptedContentInfo"]
         kari = enveloped_data["recipientInfos"][0]["kari"]
-        pub_key_their, tail = OctetString().decode(
-            bytes(kari["originator"]["originatorKey"]["publicKey"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(kari["originator"]["originatorKey"]["publicKey"].defined)
+        _, pub_key_their = kari["originator"]["originatorKey"]["publicKey"].defined
         ukm = bytes(kari["ukm"])
         rek = kari["recipientEncryptedKeys"][0]
         curve = GOST3410Curve(*CURVE_PARAMS[curve_name])
         kek = keker(curve, prv_key_our, bytes(pub_key_their), ukm)
-        encrypted_key, tail = Gost2814789EncryptedKey().decode(
-            bytes(rek["encryptedKey"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(rek["encryptedKey"].defined)
+        _, encrypted_key = rek["encryptedKey"].defined
         key_wrapped = bytes(encrypted_key["encryptedKey"])
         mac = bytes(encrypted_key["macKey"])
         cek = unwrap_gost(kek, ukm + key_wrapped + mac, sbox=sbox)
         ciphertext = bytes(eci["encryptedContent"])
-        encryption_params, tail = Gost2814789Parameters().decode(
-            bytes(eci["contentEncryptionAlgorithm"]["parameters"])
-        )
-        self.assertSequenceEqual(tail, b"")
+        self.assertIsNotNone(eci["contentEncryptionAlgorithm"]["parameters"].defined)
+        _, encryption_params = eci["contentEncryptionAlgorithm"]["parameters"].defined
         iv = bytes(encryption_params["iv"])
         self.assertSequenceEqual(
             cfb_decrypt(cek, ciphertext, iv, sbox=sbox, mesh=True),
index 39b477f50945e601700ccd5abf9e44da6937b9cd..8dac161614b5da8537fda7d8709e9833fa633c9b 100644 (file)
@@ -23,14 +23,11 @@ from unittest import TestCase
 from pygost.gost28147 import cfb_decrypt
 from pygost.gost34112012512 import GOST34112012512
 from pygost.gost34112012512 import pbkdf2 as gost34112012_pbkdf2
+from pygost.utils import hexdec
 
 
 try:
-    from pygost.asn1schemas.cms import Gost2814789Parameters
-    from pygost.asn1schemas.pfx import EncryptedData
     from pygost.asn1schemas.pfx import OctetStringSafeContents
-    from pygost.asn1schemas.pfx import PBES2Params
-    from pygost.asn1schemas.pfx import PBKDF2Params
     from pygost.asn1schemas.pfx import PFX
     from pygost.asn1schemas.pfx import PKCS8ShroudedKeyBag
 except ImportError:
@@ -86,12 +83,8 @@ G2ki9enTqos4KpUU0j9IDpl1UXiaA1YDIwUjlAp+81GkLmyt8Fw6Gt/X5JZySAY=
 
         pfx, tail = PFX().decode(self.pfx_raw)
         self.assertSequenceEqual(tail, b"")
-        octet_string_safe_contents, tail = OctetStringSafeContents().decode(
-            bytes(pfx["authSafe"]["content"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        _, octet_string_safe_contents = pfx["authSafe"]["content"].defined
         outer_safe_contents = octet_string_safe_contents["safeContents"]
-
         octet_string_safe_contents, tail = OctetStringSafeContents().decode(
             bytes(outer_safe_contents[0]["bagValue"]),
         )
@@ -101,18 +94,9 @@ G2ki9enTqos4KpUU0j9IDpl1UXiaA1YDIwUjlAp+81GkLmyt8Fw6Gt/X5JZySAY=
             bytes(safe_bag["bagValue"]),
         )
         self.assertSequenceEqual(tail, b"")
-        pbes2_params, tail = PBES2Params().decode(
-            bytes(shrouded_key_bag["encryptionAlgorithm"]["parameters"]),
-        )
-        self.assertSequenceEqual(tail, b"")
-        pbkdf2_params, tail = PBKDF2Params().decode(
-            bytes(pbes2_params["keyDerivationFunc"]["parameters"]),
-        )
-        self.assertSequenceEqual(tail, b"")
-        enc_scheme_params, tail = Gost2814789Parameters().decode(
-            bytes(pbes2_params["encryptionScheme"]["parameters"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        _, pbes2_params = shrouded_key_bag["encryptionAlgorithm"]["parameters"].defined
+        _, pbkdf2_params = pbes2_params["keyDerivationFunc"]["parameters"].defined
+        _, enc_scheme_params = pbes2_params["encryptionScheme"]["parameters"].defined
 
         key = gost34112012_pbkdf2(
             password=self.password.encode("utf-8"),
@@ -152,28 +136,12 @@ ATAMBggqhQMHAQEDAgUAA0EA9oq0Vvk8kkgIwkp0x0J5eKtia4MNTiwKAm7jgnCZIx3O98BThaTX
 
         pfx, tail = PFX().decode(self.pfx_raw)
         self.assertSequenceEqual(tail, b"")
-        octet_string_safe_contents, tail = OctetStringSafeContents().decode(
-            bytes(pfx["authSafe"]["content"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        _, octet_string_safe_contents = pfx["authSafe"]["content"].defined
         outer_safe_contents = octet_string_safe_contents["safeContents"]
-
-        encrypted_data, tail = EncryptedData().decode(
-            bytes(outer_safe_contents[1]["bagValue"]),
-        )
-        self.assertSequenceEqual(tail, b"")
-        pbes2_params, _ = PBES2Params().decode(
-            bytes(encrypted_data["encryptedContentInfo"]["contentEncryptionAlgorithm"]["parameters"]),
-        )
-        self.assertSequenceEqual(tail, b"")
-        pbkdf2_params, tail = PBKDF2Params().decode(
-            bytes(pbes2_params["keyDerivationFunc"]["parameters"]),
-        )
-        self.assertSequenceEqual(tail, b"")
-        enc_scheme_params, tail = Gost2814789Parameters().decode(
-            bytes(pbes2_params["encryptionScheme"]["parameters"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        _, encrypted_data = outer_safe_contents[1]["bagValue"].defined
+        _, pbes2_params = encrypted_data["encryptedContentInfo"]["contentEncryptionAlgorithm"]["parameters"].defined
+        _, pbkdf2_params = pbes2_params["keyDerivationFunc"]["parameters"].defined
+        _, enc_scheme_params = pbes2_params["encryptionScheme"]["parameters"].defined
         key = gost34112012_pbkdf2(
             password=self.password.encode("utf-8"),
             salt=bytes(pbkdf2_params["salt"]["specified"]),
@@ -194,12 +162,8 @@ ATAMBggqhQMHAQEDAgUAA0EA9oq0Vvk8kkgIwkp0x0J5eKtia4MNTiwKAm7jgnCZIx3O98BThaTX
     def test_mac(self):
         pfx, tail = PFX().decode(self.pfx_raw)
         self.assertSequenceEqual(tail, b"")
-        octet_string_safe_contents, tail = OctetStringSafeContents().decode(
-            bytes(pfx["authSafe"]["content"]),
-        )
-        self.assertSequenceEqual(tail, b"")
+        _, octet_string_safe_contents = pfx["authSafe"]["content"].defined
         outer_safe_contents = octet_string_safe_contents["safeContents"]
-
         mac_data = pfx["macData"]
         mac_key = gost34112012_pbkdf2(
             password=self.password.encode('utf-8'),
index cddee1c37ae166af20eca6ab2daf1c40faf480f5..df82bdb3ebc61fc9b3f9ae58f254f4aee4869cb1 100644 (file)
@@ -32,6 +32,8 @@ from pygost.utils import hexdec
 try:
     from pyderasn import OctetString
 
+    from pygost.asn1schemas.oids import id_tc26_gost3410_2012_256
+    from pygost.asn1schemas.oids import id_tc26_gost3410_2012_512
     from pygost.asn1schemas.x509 import Certificate
 except ImportError:
     pyderasn_exists = False
@@ -48,15 +50,34 @@ class TestCertificate(TestCase):
     """
 
     def process_cert(self, curve_name, mode, hasher, prv_key_raw, cert_raw):
-        cert, tail = Certificate().decode(cert_raw)
+        cert, tail = Certificate().decode(cert_raw, ctx={
+            "defines_by_path": (
+                (
+                    (
+                        "tbsCertificate",
+                        "subjectPublicKeyInfo",
+                        "algorithm",
+                        "algorithm",
+                    ),
+                    (
+                        (
+                            ("..", "subjectPublicKey"),
+                            {
+                                id_tc26_gost3410_2012_256: OctetString(),
+                                id_tc26_gost3410_2012_512: OctetString(),
+                            },
+                        ),
+                    ),
+                ),
+            ),
+        })
         self.assertSequenceEqual(tail, b"")
         curve = GOST3410Curve(*CURVE_PARAMS[curve_name])
         prv_key = prv_unmarshal(prv_key_raw)
-        pub_key_raw, tail = OctetString().decode(
-            bytes(cert["tbsCertificate"]["subjectPublicKeyInfo"]["subjectPublicKey"])
-        )
+        spk = cert["tbsCertificate"]["subjectPublicKeyInfo"]["subjectPublicKey"]
+        self.assertIsNotNone(spk.defined)
+        _, pub_key_raw = spk.defined
         pub_key = pub_unmarshal(bytes(pub_key_raw), mode=mode)
-        self.assertSequenceEqual(tail, b"")
         self.assertSequenceEqual(pub_key, public_key(curve, prv_key))
         self.assertTrue(verify(
             curve,