]> Cypherpunks.ru repositories - pyderasn.git/commitdiff
keep_memoryview context option 9.2
authorSergey Matveev <stargrave@stargrave.org>
Wed, 23 Mar 2022 15:23:55 +0000 (18:23 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Wed, 23 Mar 2022 20:02:49 +0000 (23:02 +0300)
VERSION
doc/install.rst
doc/news.rst
pyderasn.py
tests/test_crts.py
tests/test_pyderasn.py

diff --git a/VERSION b/VERSION
index 28a2186428bce031c24134633c4ab80a86f11668..1a2c3557ba4eb1138289c1a848eec2acccf85f9f 100644 (file)
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-9.1
+9.2
index c05ee24b6ec035d032644cb333211b57c1dfc191..41b4cb0b26a5a845e73b67dae819c40a0bf91191 100644 (file)
@@ -4,11 +4,11 @@ Install
 Preferable way is to :ref:`download <download>` tarball with the
 signature from `official website <http://www.pyderasn.cypherpunks.ru/>`__::
 
-    $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.1.tar.zst
-    $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.1.tar.zst.sig
-    $ gpg --verify pyderasn-9.1.tar.zst.sig pyderasn-9.1.tar.zst
-    $ zstd -d < pyderasn-9.1.tar.zst | tar xf -
-    $ cd pyderasn-9.1
+    $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.2.tar.zst
+    $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.2.tar.zst.sig
+    $ gpg --verify pyderasn-9.2.tar.zst.sig pyderasn-9.2.tar.zst
+    $ zstd -d < pyderasn-9.2.tar.zst | tar xf -
+    $ cd pyderasn-9.2
     $ python setup.py install
     # or copy pyderasn.py (possibly termcolor.py) to your PYTHONPATH
 
@@ -18,7 +18,7 @@ signature from `official website <http://www.pyderasn.cypherpunks.ru/>`__::
 
 You could use pip (**no** OpenPGP authentication is performed!) with PyPI::
 
-    $ echo pyderasn==9.1 --hash=sha256:TO-BE-FILLED > requirements.txt
+    $ echo pyderasn==9.2 --hash=sha256:TO-BE-FILLED > requirements.txt
     $ pip install --requirement requirements.txt
 
 You have to verify downloaded tarballs integrity and authenticity to be
index 68d0a6e4cc12d60401f815bfd5078a73934db35e..ae6db1717b232da5790399692d9158d7ab4ee441 100644 (file)
@@ -1,6 +1,15 @@
 News
 ====
 
+.. _release9.2:
+
+9.2
+---
+* ``keep_memoryview`` context option appeared, respected by OctetString
+  and Any objects during DER decoding. If set, then their internal
+  values will keep memoryview reference instead of full bytes copy
+* Correspondingly OctetString and Any have ``.memoryview()`` method
+
 .. _release9.1:
 
 9.1
index 116ef6e9e8d899fb2dfa95e431b6915045993c4c..3c84666a583ac4a85e5fde9119a891bf7680ba87 100755 (executable)
@@ -235,6 +235,7 @@ Currently available context options:
 * :ref:`bered <bered_ctx>`
 * :ref:`defines_by_path <defines_by_path_ctx>`
 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
+* :ref:`keep_memoryview <keep_memoryview_ctx>`
 
 .. _pprinting:
 
@@ -706,6 +707,15 @@ creates read-only memoryview on the file contents::
    page cache used for mmaps. It can take twice the necessary size in
    the memory: both in page cache and ZFS ARC.
 
+.. _keep_memoryview_ctx:
+
+That read-only memoryview could be safe to be used as a value inside
+decoded :py:class:`pyderasn.OctetString` and :py:class:`pyderasn.Any`
+objects. You can enable that by setting `"keep_memoryview": True` in
+:ref:`decode context <ctx>`. No OCTET STRING and ANY values will be
+copied to memory. Of course that works only in DER encoding, where the
+value is continuously encoded.
+
 CER encoding
 ____________
 
@@ -1186,7 +1196,7 @@ except ImportError:  # pragma: no cover
     tzUTC = "missing"
 
 
-__version__ = "9.1"
+__version__ = "9.2"
 
 __all__ = (
     "agg_octet_string",
@@ -3630,6 +3640,7 @@ class OctetString(Obj):
     tag_default = tag_encode(4)
     asn1_type_name = "OCTET STRING"
     evgen_mode_skip_value = True
+    memoryview_safe = True
 
     def __init__(
             self,
@@ -3726,6 +3737,10 @@ class OctetString(Obj):
         self._assert_ready()
         return bytes(self._value)
 
+    def memoryview(self):
+        self._assert_ready()
+        return memoryview(self._value)
+
     def __eq__(self, their):
         if their.__class__ == bytes:
             return self._value == their
@@ -3839,12 +3854,15 @@ class OctetString(Obj):
                     decode_path=decode_path,
                     offset=offset,
                 )
+            if evgen_mode and self.evgen_mode_skip_value:
+                value = None
+            elif self.memoryview_safe and ctx.get("keep_memoryview", False):
+                value = v
+            else:
+                value = v.tobytes()
             try:
                 obj = self.__class__(
-                    value=(
-                        None if (evgen_mode and self.evgen_mode_skip_value)
-                        else v.tobytes()
-                    ),
+                    value=value,
                     bounds=(self._bound_min, self._bound_max),
                     impl=self.tag,
                     expl=self._expl,
@@ -4694,6 +4712,7 @@ class CommonString(OctetString):
          - utf-16-be
     """
     __slots__ = ()
+    memoryview_safe = False
 
     def _value_sanitize(self, value):
         value_raw = None
@@ -4743,6 +4762,9 @@ class CommonString(OctetString):
             return self._value.decode(self.encoding)
         return str(self._value)
 
+    def memoryview(self):
+        raise NotImplementedError()
+
     def __repr__(self):
         return pp_console_row(next(self.pps()))
 
@@ -5842,7 +5864,7 @@ class Any(Obj):
             value = self._value_sanitize(value)
             self._value = value
             if self._expl is None:
-                if value.__class__ == bytes:
+                if value.__class__ == bytes or value.__class__ == memoryview:
                     tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
                 else:
                     tag_class, tag_num = value.tag_order
@@ -5852,7 +5874,7 @@ class Any(Obj):
         self.defined = None
 
     def _value_sanitize(self, value):
-        if value.__class__ == bytes:
+        if value.__class__ == bytes or value.__class__ == memoryview:
             if len(value) == 0:
                 raise ValueError("%s value can not be empty" % self.__class__.__name__)
             return value
@@ -5903,13 +5925,13 @@ class Any(Obj):
         self.defined = state.defined
 
     def __eq__(self, their):
-        if their.__class__ == bytes:
-            if self._value.__class__ == bytes:
+        if their.__class__ == bytes or their.__class__ == memoryview:
+            if self._value.__class__ == bytes or their.__class__ == memoryview:
                 return self._value == their
             return self._value.encode() == their
         if issubclass(their.__class__, Any):
             if self.ready and their.ready:
-                return bytes(self) == bytes(their)
+                return self.memoryview() == their.memoryview()
             return self.ready == their.ready
         return False
 
@@ -5930,8 +5952,17 @@ class Any(Obj):
         value = self._value
         if value.__class__ == bytes:
             return value
+        if value.__class__ == memoryview:
+            return bytes(value)
         return self._value.encode()
 
+    def memoryview(self):
+        self._assert_ready()
+        value = self._value
+        if value.__class__ == memoryview:
+            return memoryview(value)
+        return memoryview(bytes(self))
+
     @property
     def tlen(self):
         return 0
@@ -5939,20 +5970,20 @@ class Any(Obj):
     def _encode(self):
         self._assert_ready()
         value = self._value
-        if value.__class__ == bytes:
-            return value
+        if value.__class__ == bytes or value.__class__ == memoryview:
+            return bytes(self)
         return value.encode()
 
     def _encode1st(self, state):
         self._assert_ready()
         value = self._value
-        if value.__class__ == bytes:
+        if value.__class__ == bytes or value.__class__ == memoryview:
             return len(value), state
         return value.encode1st(state)
 
     def _encode2nd(self, writer, state_iter):
         value = self._value
-        if value.__class__ == bytes:
+        if value.__class__ == bytes or value.__class__ == memoryview:
             write_full(writer, value)
         else:
             value.encode2nd(writer, state_iter)
@@ -5960,7 +5991,7 @@ class Any(Obj):
     def _encode_cer(self, writer):
         self._assert_ready()
         value = self._value
-        if value.__class__ == bytes:
+        if value.__class__ == bytes or value.__class__ == memoryview:
             write_full(writer, value)
         else:
             value.encode_cer(writer)
@@ -6027,8 +6058,14 @@ class Any(Obj):
             )
         tlvlen = tlen + llen + l
         v, tail = tlv[:tlvlen], v[l:]
+        if evgen_mode:
+            value = None
+        elif ctx.get("keep_memoryview", False):
+            value = v
+        else:
+            value = v.tobytes()
         obj = self.__class__(
-            value=None if evgen_mode else v.tobytes(),
+            value=value,
             expl=self._expl,
             optional=self.optional,
             _decoded=(offset, 0, tlvlen),
@@ -6043,7 +6080,7 @@ class Any(Obj):
         value = self._value
         if value is None:
             pass
-        elif value.__class__ == bytes:
+        elif value.__class__ == bytes or value.__class__ == memoryview:
             value = None
         else:
             value = repr(value)
@@ -6053,7 +6090,10 @@ class Any(Obj):
             obj_name=self.__class__.__name__,
             decode_path=decode_path,
             value=value,
-            blob=self._value if self._value.__class__ == bytes else None,
+            blob=self._value if (
+                self._value.__class__ == bytes or
+                value.__class__ == memoryview
+            ) else None,
             optional=self.optional,
             default=self == self.default,
             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
index 92b802d310feae0e757ad240bb7ad6c3b3b2f858..8ceeb086f9a1eca60710eb889618520185104753 100644 (file)
@@ -229,7 +229,7 @@ class TestGoSelfSignedVector(TestCase):
             "ba3ca12568fdc6c7b4511cd40a7f659980402df2b998bb9a4a8cbeb34c0f0a78c",
             "f8d91ede14a5ed76bf116fe360aafa8821490435",
         )))
-        crt = Certificate().decod(raw)
+        crt = Certificate().decod(raw, ctx={"keep_memoryview": True})
         tbs = crt["tbsCertificate"]
         self.assertEqual(tbs["version"], 0)
         self.assertFalse(tbs["version"].decoded)
@@ -301,6 +301,7 @@ class TestGoSelfSignedVector(TestCase):
             "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435",
         )))))
         self.assertSequenceEqual(crt.encode(), raw)
+        crt = Certificate().decod(raw)
         pprint(crt)
         repr(crt)
         pickle_loads(pickle_dumps(crt, pickle_proto))
index 412f012657bb51ed8819f0a5112314148eb3767c..ae7b4ef675c751ec698390efcf837668dffeb252 100644 (file)
@@ -2216,8 +2216,18 @@ class TestOctetString(CommonMixin, TestCase):
         integers(min_value=0),
         binary(max_size=5),
         decode_path_strat,
+        booleans(),
     )
-    def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
+    def test_symmetric(
+            self,
+            values,
+            value,
+            tag_expl,
+            offset,
+            tail_junk,
+            decode_path,
+            keep_memoryview,
+    ):
         for klass in (OctetString, OctetStringInherited):
             _, _, _, _, default, optional, _decoded = values
             obj = klass(
@@ -2245,6 +2255,7 @@ class TestOctetString(CommonMixin, TestCase):
                 obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
                 obj_expled_encoded,
             )
+            ctx_dummy["keep_memoryview"] = keep_memoryview
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -2260,6 +2271,10 @@ class TestOctetString(CommonMixin, TestCase):
             self.assertNotEqual(obj_decoded, obj)
             self.assertEqual(bytes(obj_decoded), bytes(obj_expled))
             self.assertEqual(bytes(obj_decoded), bytes(obj))
+            self.assertIsInstance(
+                obj_decoded._value,
+                memoryview if keep_memoryview else bytes,
+            )
             self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded)
             self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl)
             self.assertEqual(obj_decoded.expl_tlen, len(tag_expl))
@@ -5267,8 +5282,18 @@ class TestAny(CommonMixin, TestCase):
         integers(min_value=0),
         binary(max_size=5),
         decode_path_strat,
+        booleans(),
     )
-    def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path):
+    def test_symmetric(
+            self,
+            values,
+            value,
+            tag_expl,
+            offset,
+            tail_junk,
+            decode_path,
+            keep_memoryview,
+    ):
         for klass in (Any, AnyInherited):
             _, _, optional, _decoded = values
             obj = klass(value=value, optional=optional, _decoded=_decoded)
@@ -5288,6 +5313,7 @@ class TestAny(CommonMixin, TestCase):
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
             obj_expled_encoded = obj_expled.encode()
+            ctx_dummy["keep_memoryview"] = keep_memoryview
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -5302,6 +5328,10 @@ class TestAny(CommonMixin, TestCase):
             self.assertEqual(obj_decoded, obj_expled)
             self.assertEqual(bytes(obj_decoded), bytes(obj_expled))
             self.assertEqual(bytes(obj_decoded), bytes(obj))
+            self.assertIsInstance(
+                obj_decoded._value,
+                memoryview if keep_memoryview else bytes,
+            )
             self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded)
             self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl)
             self.assertEqual(obj_decoded.expl_tlen, len(tag_expl))