]> Cypherpunks.ru repositories - pyderasn.git/blobdiff - pyderasn.py
Streaming of huge data support
[pyderasn.git] / pyderasn.py
index 3533c6b9f18f46f28d9812b1dce8824b29ecfa44..38f9fafdec12617fb68311b182dec1a5bca2eafd 100755 (executable)
@@ -644,6 +644,7 @@ Various
 .. autofunction:: pyderasn.abs_decode_path
 .. autofunction:: pyderasn.colonize_hex
 .. autofunction:: pyderasn.encode_cer
+.. autofunction:: pyderasn.file_mmaped
 .. autofunction:: pyderasn.hexenc
 .. autofunction:: pyderasn.hexdec
 .. autofunction:: pyderasn.tag_encode
@@ -782,6 +783,8 @@ from datetime import datetime
 from datetime import timedelta
 from io import BytesIO
 from math import ceil
+from mmap import mmap
+from mmap import PROT_READ
 from operator import attrgetter
 from string import ascii_letters
 from string import digits
@@ -813,6 +816,7 @@ except ImportError:  # pragma: no cover
 __version__ = "7.0"
 
 __all__ = (
+    "agg_octet_string",
     "Any",
     "BitString",
     "BMPString",
@@ -824,6 +828,7 @@ __all__ = (
     "encode_cer",
     "Enumerated",
     "ExceedingData",
+    "file_mmaped",
     "GeneralizedTime",
     "GeneralString",
     "GraphicString",
@@ -891,6 +896,14 @@ DECIMALS = frozenset(digits)
 DECIMAL_SIGNS = ".,"
 
 
+def file_mmaped(fd):
+    """Make mmap-ed memoryview for reading from file
+
+    :param fd: file object
+    :returns: memoryview over read-only mmap-ing of the whole file
+    """
+    return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
+
 def pureint(value):
     if not set(value) <= DECIMALS:
         raise ValueError("non-pure integer")
@@ -1931,6 +1944,7 @@ def pprint(
         with_colours=False,
         with_decode_path=False,
         decode_path_only=(),
+        decode_path=(),
 ):
     """Pretty print object
 
@@ -1983,7 +1997,7 @@ def pprint(
             else:
                 for row in _pprint_pps(pp):
                     yield row
-    return "\n".join(_pprint_pps(obj.pps()))
+    return "\n".join(_pprint_pps(obj.pps(decode_path)))
 
 
 ########################################################################
@@ -3132,13 +3146,10 @@ class OctetString(Obj):
     >>> OctetString(b"hell", bounds=(4, 4))
     OCTET STRING 4 bytes 68656c6c
 
-    .. note::
-
-       Pay attention that OCTET STRING can be encoded both in primitive
-       and constructed forms. Decoder always checks constructed form tag
-       additionally to specified primitive one. If BER decoding is
-       :ref:`not enabled <bered_ctx>`, then decoder will fail, because
-       of DER restrictions.
+    Memoryviews can be used as a values. If memoryview is made on
+    mmap-ed file, then it does not take storage inside OctetString
+    itself. In CER encoding mode it will be streamed to the specified
+    writer, copying 1 KB chunks.
     """
     __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
     tag_default = tag_encode(4)
@@ -3193,12 +3204,12 @@ class OctetString(Obj):
         )
 
     def _value_sanitize(self, value):
-        if value.__class__ == binary_type:
+        if value.__class__ == binary_type or value.__class__ == memoryview:
             pass
         elif issubclass(value.__class__, OctetString):
             value = value._value
         else:
-            raise InvalidValueType((self.__class__, bytes))
+            raise InvalidValueType((self.__class__, bytes, memoryview))
         if not self._bound_min <= len(value) <= self._bound_max:
             raise BoundsError(self._bound_min, len(value), self._bound_max)
         return value
@@ -3238,7 +3249,7 @@ class OctetString(Obj):
 
     def __bytes__(self):
         self._assert_ready()
-        return self._value
+        return bytes(self._value)
 
     def __eq__(self, their):
         if their.__class__ == binary_type:
@@ -3541,6 +3552,29 @@ class OctetString(Obj):
             yield pp
 
 
+def agg_octet_string(evgens, decode_path, raw, writer):
+    """Aggregate constructed string (OctetString and its derivatives)
+
+    :param evgens: iterator of generated events
+    :param decode_path: points to the string we want to decode
+    :param raw: slicebable (memoryview, bytearray, etc) with
+                the data evgens are generated one
+    :param writer: buffer.write where string is going to be saved
+    """
+    decode_path_len = len(decode_path)
+    for dp, obj, _ in evgens:
+        if dp[:decode_path_len] != decode_path:
+            continue
+        if not obj.ber_encoded:
+            write_full(writer, raw[
+                obj.offset + obj.tlen + obj.llen:
+                obj.offset + obj.tlen + obj.llen + obj.vlen -
+                (EOC_LEN if obj.expl_lenindef else 0)
+            ])
+        if len(dp) == decode_path_len:
+            break
+
+
 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
 
 
@@ -6695,14 +6729,22 @@ def main():  # pragma: no cover
         help="Allow explicit tag out-of-bound",
     )
     parser.add_argument(
-        "DERFile",
+        "--evgen",
+        action="store_true",
+        help="Turn on event generation mode",
+    )
+    parser.add_argument(
+        "RAWFile",
         type=argparse.FileType("rb"),
-        help="Path to DER file you want to decode",
+        help="Path to BER/CER/DER file you want to decode",
     )
     args = parser.parse_args()
-    args.DERFile.seek(args.skip)
-    der = memoryview(args.DERFile.read())
-    args.DERFile.close()
+    if PY2:
+        args.RAWFile.seek(args.skip)
+        raw = memoryview(args.RAWFile.read())
+        args.RAWFile.close()
+    else:
+        raw = file_mmaped(args.RAWFile)[args.skip:]
     oid_maps = (
         [obj_by_path(_path) for _path in (args.oids or "").split(",")]
         if args.oids else ()
@@ -6719,10 +6761,9 @@ def main():  # pragma: no cover
     }
     if args.defines_by_path is not None:
         ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
-    obj, tail = schema().decode(der, ctx=ctx)
     from os import environ
-    print(pprinter(
-        obj,
+    pprinter = partial(
+        pprinter,
         oid_maps=oid_maps,
         with_colours=environ.get("NO_COLOR") is None,
         with_decode_path=args.print_decode_path,
@@ -6730,7 +6771,13 @@ def main():  # pragma: no cover
             () if args.decode_path_only is None else
             tuple(args.decode_path_only.split(":"))
         ),
-    ))
+    )
+    if args.evgen:
+        for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
+            print(pprinter(obj, decode_path=decode_path))
+    else:
+        obj, tail = schema().decode(raw, ctx=ctx)
+        print(pprinter(obj))
     if tail != b"":
         print("\nTrailing data: %s" % hexenc(tail))