From: Sergey Matveev Date: Wed, 23 Mar 2022 15:23:55 +0000 (+0300) Subject: keep_memoryview context option X-Git-Tag: 9.2^0 X-Git-Url: http://www.git.cypherpunks.ru/?p=pyderasn.git;a=commitdiff_plain;h=e99d654183cf4fc7506893db16e11adaa56c75ce keep_memoryview context option --- diff --git a/VERSION b/VERSION index 28a2186..1a2c355 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -9.1 +9.2 diff --git a/doc/install.rst b/doc/install.rst index c05ee24..41b4cb0 100644 --- a/doc/install.rst +++ b/doc/install.rst @@ -4,11 +4,11 @@ Install Preferable way is to :ref:`download ` tarball with the signature from `official website `__:: - $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.1.tar.zst - $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.1.tar.zst.sig - $ gpg --verify pyderasn-9.1.tar.zst.sig pyderasn-9.1.tar.zst - $ zstd -d < pyderasn-9.1.tar.zst | tar xf - - $ cd pyderasn-9.1 + $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.2.tar.zst + $ [fetch|wget] http://www.pyderasn.cypherpunks.ru/download/pyderasn-9.2.tar.zst.sig + $ gpg --verify pyderasn-9.2.tar.zst.sig pyderasn-9.2.tar.zst + $ zstd -d < pyderasn-9.2.tar.zst | tar xf - + $ cd pyderasn-9.2 $ python setup.py install # or copy pyderasn.py (possibly termcolor.py) to your PYTHONPATH @@ -18,7 +18,7 @@ signature from `official website `__:: You could use pip (**no** OpenPGP authentication is performed!) with PyPI:: - $ echo pyderasn==9.1 --hash=sha256:TO-BE-FILLED > requirements.txt + $ echo pyderasn==9.2 --hash=sha256:TO-BE-FILLED > requirements.txt $ pip install --requirement requirements.txt You have to verify downloaded tarballs integrity and authenticity to be diff --git a/doc/news.rst b/doc/news.rst index 68d0a6e..ae6db17 100644 --- a/doc/news.rst +++ b/doc/news.rst @@ -1,6 +1,15 @@ News ==== +.. _release9.2: + +9.2 +--- +* ``keep_memoryview`` context option appeared, respected by OctetString + and Any objects during DER decoding. If set, then their internal + values will keep memoryview reference instead of full bytes copy +* Correspondingly OctetString and Any have ``.memoryview()`` method + .. _release9.1: 9.1 diff --git a/pyderasn.py b/pyderasn.py index 116ef6e..3c84666 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -235,6 +235,7 @@ Currently available context options: * :ref:`bered ` * :ref:`defines_by_path ` * :ref:`evgen_mode_upto ` +* :ref:`keep_memoryview ` .. _pprinting: @@ -706,6 +707,15 @@ creates read-only memoryview on the file contents:: page cache used for mmaps. It can take twice the necessary size in the memory: both in page cache and ZFS ARC. +.. _keep_memoryview_ctx: + +That read-only memoryview could be safe to be used as a value inside +decoded :py:class:`pyderasn.OctetString` and :py:class:`pyderasn.Any` +objects. You can enable that by setting `"keep_memoryview": True` in +:ref:`decode context `. No OCTET STRING and ANY values will be +copied to memory. Of course that works only in DER encoding, where the +value is continuously encoded. + CER encoding ____________ @@ -1186,7 +1196,7 @@ except ImportError: # pragma: no cover tzUTC = "missing" -__version__ = "9.1" +__version__ = "9.2" __all__ = ( "agg_octet_string", @@ -3630,6 +3640,7 @@ class OctetString(Obj): tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" evgen_mode_skip_value = True + memoryview_safe = True def __init__( self, @@ -3726,6 +3737,10 @@ class OctetString(Obj): self._assert_ready() return bytes(self._value) + def memoryview(self): + self._assert_ready() + return memoryview(self._value) + def __eq__(self, their): if their.__class__ == bytes: return self._value == their @@ -3839,12 +3854,15 @@ class OctetString(Obj): decode_path=decode_path, offset=offset, ) + if evgen_mode and self.evgen_mode_skip_value: + value = None + elif self.memoryview_safe and ctx.get("keep_memoryview", False): + value = v + else: + value = v.tobytes() try: obj = self.__class__( - value=( - None if (evgen_mode and self.evgen_mode_skip_value) - else v.tobytes() - ), + value=value, bounds=(self._bound_min, self._bound_max), impl=self.tag, expl=self._expl, @@ -4694,6 +4712,7 @@ class CommonString(OctetString): - utf-16-be """ __slots__ = () + memoryview_safe = False def _value_sanitize(self, value): value_raw = None @@ -4743,6 +4762,9 @@ class CommonString(OctetString): return self._value.decode(self.encoding) return str(self._value) + def memoryview(self): + raise NotImplementedError() + def __repr__(self): return pp_console_row(next(self.pps())) @@ -5842,7 +5864,7 @@ class Any(Obj): value = self._value_sanitize(value) self._value = value if self._expl is None: - if value.__class__ == bytes: + if value.__class__ == bytes or value.__class__ == memoryview: tag_class, _, tag_num = tag_decode(tag_strip(value)[0]) else: tag_class, tag_num = value.tag_order @@ -5852,7 +5874,7 @@ class Any(Obj): self.defined = None def _value_sanitize(self, value): - if value.__class__ == bytes: + if value.__class__ == bytes or value.__class__ == memoryview: if len(value) == 0: raise ValueError("%s value can not be empty" % self.__class__.__name__) return value @@ -5903,13 +5925,13 @@ class Any(Obj): self.defined = state.defined def __eq__(self, their): - if their.__class__ == bytes: - if self._value.__class__ == bytes: + if their.__class__ == bytes or their.__class__ == memoryview: + if self._value.__class__ == bytes or their.__class__ == memoryview: return self._value == their return self._value.encode() == their if issubclass(their.__class__, Any): if self.ready and their.ready: - return bytes(self) == bytes(their) + return self.memoryview() == their.memoryview() return self.ready == their.ready return False @@ -5930,8 +5952,17 @@ class Any(Obj): value = self._value if value.__class__ == bytes: return value + if value.__class__ == memoryview: + return bytes(value) return self._value.encode() + def memoryview(self): + self._assert_ready() + value = self._value + if value.__class__ == memoryview: + return memoryview(value) + return memoryview(bytes(self)) + @property def tlen(self): return 0 @@ -5939,20 +5970,20 @@ class Any(Obj): def _encode(self): self._assert_ready() value = self._value - if value.__class__ == bytes: - return value + if value.__class__ == bytes or value.__class__ == memoryview: + return bytes(self) return value.encode() def _encode1st(self, state): self._assert_ready() value = self._value - if value.__class__ == bytes: + if value.__class__ == bytes or value.__class__ == memoryview: return len(value), state return value.encode1st(state) def _encode2nd(self, writer, state_iter): value = self._value - if value.__class__ == bytes: + if value.__class__ == bytes or value.__class__ == memoryview: write_full(writer, value) else: value.encode2nd(writer, state_iter) @@ -5960,7 +5991,7 @@ class Any(Obj): def _encode_cer(self, writer): self._assert_ready() value = self._value - if value.__class__ == bytes: + if value.__class__ == bytes or value.__class__ == memoryview: write_full(writer, value) else: value.encode_cer(writer) @@ -6027,8 +6058,14 @@ class Any(Obj): ) tlvlen = tlen + llen + l v, tail = tlv[:tlvlen], v[l:] + if evgen_mode: + value = None + elif ctx.get("keep_memoryview", False): + value = v + else: + value = v.tobytes() obj = self.__class__( - value=None if evgen_mode else v.tobytes(), + value=value, expl=self._expl, optional=self.optional, _decoded=(offset, 0, tlvlen), @@ -6043,7 +6080,7 @@ class Any(Obj): value = self._value if value is None: pass - elif value.__class__ == bytes: + elif value.__class__ == bytes or value.__class__ == memoryview: value = None else: value = repr(value) @@ -6053,7 +6090,10 @@ class Any(Obj): obj_name=self.__class__.__name__, decode_path=decode_path, value=value, - blob=self._value if self._value.__class__ == bytes else None, + blob=self._value if ( + self._value.__class__ == bytes or + value.__class__ == memoryview + ) else None, optional=self.optional, default=self == self.default, impl=None if self.tag == self.tag_default else tag_decode(self.tag), diff --git a/tests/test_crts.py b/tests/test_crts.py index 92b802d..8ceeb08 100644 --- a/tests/test_crts.py +++ b/tests/test_crts.py @@ -229,7 +229,7 @@ class TestGoSelfSignedVector(TestCase): "ba3ca12568fdc6c7b4511cd40a7f659980402df2b998bb9a4a8cbeb34c0f0a78c", "f8d91ede14a5ed76bf116fe360aafa8821490435", ))) - crt = Certificate().decod(raw) + crt = Certificate().decod(raw, ctx={"keep_memoryview": True}) tbs = crt["tbsCertificate"] self.assertEqual(tbs["version"], 0) self.assertFalse(tbs["version"].decoded) @@ -301,6 +301,7 @@ class TestGoSelfSignedVector(TestCase): "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435", ))))) self.assertSequenceEqual(crt.encode(), raw) + crt = Certificate().decod(raw) pprint(crt) repr(crt) pickle_loads(pickle_dumps(crt, pickle_proto)) diff --git a/tests/test_pyderasn.py b/tests/test_pyderasn.py index 412f012..ae7b4ef 100644 --- a/tests/test_pyderasn.py +++ b/tests/test_pyderasn.py @@ -2216,8 +2216,18 @@ class TestOctetString(CommonMixin, TestCase): integers(min_value=0), binary(max_size=5), decode_path_strat, + booleans(), ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): + def test_symmetric( + self, + values, + value, + tag_expl, + offset, + tail_junk, + decode_path, + keep_memoryview, + ): for klass in (OctetString, OctetStringInherited): _, _, _, _, default, optional, _decoded = values obj = klass( @@ -2245,6 +2255,7 @@ class TestOctetString(CommonMixin, TestCase): obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(), obj_expled_encoded, ) + ctx_dummy["keep_memoryview"] = keep_memoryview ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -2260,6 +2271,10 @@ class TestOctetString(CommonMixin, TestCase): self.assertNotEqual(obj_decoded, obj) self.assertEqual(bytes(obj_decoded), bytes(obj_expled)) self.assertEqual(bytes(obj_decoded), bytes(obj)) + self.assertIsInstance( + obj_decoded._value, + memoryview if keep_memoryview else bytes, + ) self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) self.assertEqual(obj_decoded.expl_tlen, len(tag_expl)) @@ -5267,8 +5282,18 @@ class TestAny(CommonMixin, TestCase): integers(min_value=0), binary(max_size=5), decode_path_strat, + booleans(), ) - def test_symmetric(self, values, value, tag_expl, offset, tail_junk, decode_path): + def test_symmetric( + self, + values, + value, + tag_expl, + offset, + tail_junk, + decode_path, + keep_memoryview, + ): for klass in (Any, AnyInherited): _, _, optional, _decoded = values obj = klass(value=value, optional=optional, _decoded=_decoded) @@ -5288,6 +5313,7 @@ class TestAny(CommonMixin, TestCase): list(obj_expled.pps()) pprint(obj_expled, big_blobs=True, with_decode_path=True) obj_expled_encoded = obj_expled.encode() + ctx_dummy["keep_memoryview"] = keep_memoryview ctx_copied = deepcopy(ctx_dummy) obj_decoded, tail = obj_expled.decode( obj_expled_encoded + tail_junk, @@ -5302,6 +5328,10 @@ class TestAny(CommonMixin, TestCase): self.assertEqual(obj_decoded, obj_expled) self.assertEqual(bytes(obj_decoded), bytes(obj_expled)) self.assertEqual(bytes(obj_decoded), bytes(obj)) + self.assertIsInstance( + obj_decoded._value, + memoryview if keep_memoryview else bytes, + ) self.assertSequenceEqual(obj_decoded.encode(), obj_expled_encoded) self.assertSequenceEqual(obj_decoded.expl_tag, tag_expl) self.assertEqual(obj_decoded.expl_tlen, len(tag_expl))