]> Cypherpunks.ru repositories - pyderasn.git/blobdiff - tests/test_crl.py
Raise copyright years
[pyderasn.git] / tests / test_crl.py
index 70692303726d28af6bb5b38292c49f146dabf75b..9174e2fb994965d2400162227e7d953c245403be 100644 (file)
@@ -1,6 +1,6 @@
 # coding: utf-8
 # coding: utf-8
-# PyDERASN -- Python ASN.1 DER codec with abstract structures
-# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+# PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
+# Copyright (C) 2017-2024 Sergey Matveev <stargrave@stargrave.org>
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as
 """CRL related schemas, just to test the performance with them
 """
 
 """CRL related schemas, just to test the performance with them
 """
 
+from io import BytesIO
 from os.path import exists
 from os.path import exists
+from sys import getsizeof
 from time import time
 from unittest import skipIf
 from unittest import TestCase
 
 from time import time
 from unittest import skipIf
 from unittest import TestCase
 
-from six import PY2
-
 from pyderasn import BitString
 from pyderasn import encode_cer
 from pyderasn import file_mmaped
 from pyderasn import BitString
 from pyderasn import encode_cer
 from pyderasn import file_mmaped
@@ -70,30 +70,69 @@ class CertificateList(Sequence):
         ("signatureValue", BitString()),
     )
 
         ("signatureValue", BitString()),
     )
 
-@skipIf(not exists("revoke.crl"), "CACert's revoke.crl not found")
+
+CRL_PATH = "revoke.crl"
+
+
+@skipIf(not exists(CRL_PATH), "CACert's revoke.crl not found")
 class TestCACert(TestCase):
 class TestCACert(TestCase):
-    def test_cer(self):
-        with open("revoke.crl", "rb") as fd:
+    def test_cer_and_2pass(self):
+        with open(CRL_PATH, "rb") as fd:
             raw = fd.read()
         print("DER read")
         start = time()
         crl1 = CertificateList().decod(raw)
         print("DER decoded", time() - start)
         start = time()
             raw = fd.read()
         print("DER read")
         start = time()
         crl1 = CertificateList().decod(raw)
         print("DER decoded", time() - start)
         start = time()
+        der_raw = crl1.encode()
+        print("DER encoded", time() - start)
+        self.assertSequenceEqual(der_raw, raw)
+        buf = BytesIO()
+        start = time()
+        _, state = crl1.encode1st()
+        print("1st pass state size", getsizeof(state))
+        crl1.encode2nd(buf.write, iter(state))
+        print("DER 2pass encoded", time() - start)
+        self.assertSequenceEqual(buf.getvalue(), raw)
+        start = time()
         cer_raw = encode_cer(crl1)
         print("CER encoded", time() - start)
         start = time()
         crl2 = CertificateList().decod(cer_raw, ctx={"bered": True})
         print("CER decoded", time() - start)
         self.assertEqual(crl2, crl1)
         cer_raw = encode_cer(crl1)
         print("CER encoded", time() - start)
         start = time()
         crl2 = CertificateList().decod(cer_raw, ctx={"bered": True})
         print("CER decoded", time() - start)
         self.assertEqual(crl2, crl1)
-        start = time()
-        der_raw = crl2.encode()
-        print("DER encoded", time() - start)
-        self.assertSequenceEqual(der_raw, raw)
 
 
-    @skipIf(PY2, "Py27 mmap does not implement buffer protocol")
     def test_mmaped(self):
     def test_mmaped(self):
-        fd = open("revoke.crl", "rb")
+        fd = open(CRL_PATH, "rb")
         start = time()
         CertificateList().decod(file_mmaped(fd))
         print("DER decoded", time() - start)
         start = time()
         CertificateList().decod(file_mmaped(fd))
         print("DER decoded", time() - start)
+
+    def test_evgens(self):
+        fd = open(CRL_PATH, "rb")
+        raw = file_mmaped(fd)
+        print("CRL opened")
+        evgens_count = 0
+        revoked_certs_count = 0
+        start = time()
+        for decode_path, _, _ in CertificateList().decode_evgen(raw):
+            evgens_count += 1
+            if (
+                    len(decode_path) == 3 and
+                    decode_path[:2] == ("tbsCertList", "revokedCertificates")
+            ):
+                revoked_certs_count += 1
+        print("CRL parsed", time() - start)
+        evgens_upto_count = 0
+        start = time()
+        for decode_path, _, _ in CertificateList().decode_evgen(raw, ctx={
+                "evgen_mode_upto": (
+                    (("tbsCertList", "revokedCertificates", any), True),
+                ),
+        }):
+            evgens_upto_count += 1
+        print("CRL upto parsed", time() - start)
+        self.assertEqual(
+            float(evgens_count - evgens_upto_count) / revoked_certs_count,
+            3,
+        )