.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.colonize_hex
.. autofunction:: pyderasn.encode_cer
+.. autofunction:: pyderasn.file_mmaped
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
from datetime import timedelta
from io import BytesIO
from math import ceil
+from mmap import mmap
+from mmap import PROT_READ
from operator import attrgetter
from string import ascii_letters
from string import digits
__version__ = "7.0"
__all__ = (
+ "agg_octet_string",
"Any",
"BitString",
"BMPString",
"encode_cer",
"Enumerated",
"ExceedingData",
+ "file_mmaped",
"GeneralizedTime",
"GeneralString",
"GraphicString",
SET01 = frozenset("01")
DECIMALS = frozenset(digits)
DECIMAL_SIGNS = ".,"
+NEXT_ATTR_NAME = "next" if PY2 else "__next__"
+def file_mmaped(fd):
+ """Make mmap-ed memoryview for reading from file
+
+ :param fd: file object
+ :returns: memoryview over read-only mmap-ing of the whole file
+ """
+ return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
+
def pureint(value):
if not set(value) <= DECIMALS:
raise ValueError("non-pure integer")
with_colours=False,
with_decode_path=False,
decode_path_only=(),
+ decode_path=(),
):
"""Pretty print object
else:
for row in _pprint_pps(pp):
yield row
- return "\n".join(_pprint_pps(obj.pps()))
+ return "\n".join(_pprint_pps(obj.pps(decode_path)))
########################################################################
>>> OctetString(b"hell", bounds=(4, 4))
OCTET STRING 4 bytes 68656c6c
- .. note::
-
- Pay attention that OCTET STRING can be encoded both in primitive
- and constructed forms. Decoder always checks constructed form tag
- additionally to specified primitive one. If BER decoding is
- :ref:`not enabled <bered_ctx>`, then decoder will fail, because
- of DER restrictions.
+ Memoryviews can be used as a values. If memoryview is made on
+ mmap-ed file, then it does not take storage inside OctetString
+ itself. In CER encoding mode it will be streamed to the specified
+ writer, copying 1 KB chunks.
"""
__slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
tag_default = tag_encode(4)
)
def _value_sanitize(self, value):
- if value.__class__ == binary_type:
+ if value.__class__ == binary_type or value.__class__ == memoryview:
pass
elif issubclass(value.__class__, OctetString):
value = value._value
else:
- raise InvalidValueType((self.__class__, bytes))
+ raise InvalidValueType((self.__class__, bytes, memoryview))
if not self._bound_min <= len(value) <= self._bound_max:
raise BoundsError(self._bound_min, len(value), self._bound_max)
return value
def __bytes__(self):
self._assert_ready()
- return self._value
+ return bytes(self._value)
def __eq__(self, their):
if their.__class__ == binary_type:
yield pp
+def agg_octet_string(evgens, decode_path, raw, writer):
+ """Aggregate constructed string (OctetString and its derivatives)
+
+ :param evgens: iterator of generated events
+ :param decode_path: points to the string we want to decode
+ :param raw: slicebable (memoryview, bytearray, etc) with
+ the data evgens are generated one
+ :param writer: buffer.write where string is going to be saved
+ """
+ decode_path_len = len(decode_path)
+ for dp, obj, _ in evgens:
+ if dp[:decode_path_len] != decode_path:
+ continue
+ if not obj.ber_encoded:
+ write_full(writer, raw[
+ obj.offset + obj.tlen + obj.llen:
+ obj.offset + obj.tlen + obj.llen + obj.vlen -
+ (EOC_LEN if obj.expl_lenindef else 0)
+ ])
+ if len(dp) == decode_path_len:
+ break
+
+
NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
>>> ints
Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
- Also you can initialize sequence with preinitialized values:
+ You can initialize sequence with preinitialized values:
>>> ints = Ints([Integer(123), Integer(234)])
+
+ Also you can use iterator as a value:
+
+ >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
+
+ And it won't be iterated until encoding process. Pay attention that
+ bounds and required schema checks are done only during the encoding
+ process in that case! After encode was called, then value is zeroed
+ back to empty list and you have to set it again. That mode is useful
+ mainly with CER encoding mode, where all objects from the iterable
+ will be streamed to the buffer, without copying all of them to
+ memory first.
"""
__slots__ = ("spec", "_bound_min", "_bound_max")
tag_default = tag_encode(form=TagFormConstructed, num=16)
self._value = copy(default_obj._value)
def _value_sanitize(self, value):
+ iterator = False
if issubclass(value.__class__, SequenceOf):
value = value._value
+ elif hasattr(value, NEXT_ATTR_NAME):
+ iterator = True
+ value = value
elif hasattr(value, "__iter__"):
value = list(value)
else:
- raise InvalidValueType((self.__class__, iter))
- if not self._bound_min <= len(value) <= self._bound_max:
- raise BoundsError(self._bound_min, len(value), self._bound_max)
- for v in value:
- if not isinstance(v, self.spec.__class__):
- raise InvalidValueType((self.spec.__class__,))
+ raise InvalidValueType((self.__class__, iter, "iterator"))
+ if not iterator:
+ if not self._bound_min <= len(value) <= self._bound_max:
+ raise BoundsError(self._bound_min, len(value), self._bound_max)
+ class_expected = self.spec.__class__
+ for v in value:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
return value
@property
def ready(self):
+ if hasattr(self._value, NEXT_ATTR_NAME):
+ return True
+ if self._bound_min > 0 and len(self._value) == 0:
+ return False
return all(v.ready for v in self._value)
@property
return any(v.bered for v in self._value)
def __getstate__(self):
+ if hasattr(self._value, NEXT_ATTR_NAME):
+ raise ValueError("can not pickle SequenceOf with iterator")
return SequenceOfState(
__version__,
self.tag,
self._value.append(value)
def __iter__(self):
- self._assert_ready()
return iter(self._value)
def __len__(self):
- self._assert_ready()
return len(self._value)
def __setitem__(self, key, value):
return iter(self._value)
def _encode(self):
- v = b"".join(v.encode() for v in self._values_for_encoding())
- return b"".join((self.tag, len_encode(len(v)), v))
+ iterator = hasattr(self._value, NEXT_ATTR_NAME)
+ if iterator:
+ values = []
+ values_append = values.append
+ class_expected = self.spec.__class__
+ values_for_encoding = self._values_for_encoding()
+ self._value = []
+ for v in values_for_encoding:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
+ values_append(v.encode())
+ if not self._bound_min <= len(values) <= self._bound_max:
+ raise BoundsError(self._bound_min, len(values), self._bound_max)
+ value = b"".join(values)
+ else:
+ value = b"".join(v.encode() for v in self._values_for_encoding())
+ return b"".join((self.tag, len_encode(len(value)), value))
def _encode_cer(self, writer):
write_full(writer, self.tag + LENINDEF)
- for v in self._values_for_encoding():
- v.encode_cer(writer)
+ iterator = hasattr(self._value, NEXT_ATTR_NAME)
+ if iterator:
+ class_expected = self.spec.__class__
+ values_count = 0
+ values_for_encoding = self._values_for_encoding()
+ self._value = []
+ for v in values_for_encoding:
+ if not isinstance(v, class_expected):
+ raise InvalidValueType((class_expected,))
+ v.encode_cer(writer)
+ values_count += 1
+ if not self._bound_min <= values_count <= self._bound_max:
+ raise BoundsError(self._bound_min, values_count, self._bound_max)
+ else:
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
write_full(writer, EOC)
def _decode(
tag_default = tag_encode(form=TagFormConstructed, num=17)
asn1_type_name = "SET OF"
+ def _value_sanitize(self, value):
+ value = super(SetOf, self)._value_sanitize(value)
+ if hasattr(value, NEXT_ATTR_NAME):
+ raise ValueError(
+ "SetOf does not support iterator values, as no sense in them"
+ )
+ return value
+
def _encode(self):
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
help="Allow explicit tag out-of-bound",
)
parser.add_argument(
- "DERFile",
+ "--evgen",
+ action="store_true",
+ help="Turn on event generation mode",
+ )
+ parser.add_argument(
+ "RAWFile",
type=argparse.FileType("rb"),
- help="Path to DER file you want to decode",
+ help="Path to BER/CER/DER file you want to decode",
)
args = parser.parse_args()
- args.DERFile.seek(args.skip)
- der = memoryview(args.DERFile.read())
- args.DERFile.close()
+ if PY2:
+ args.RAWFile.seek(args.skip)
+ raw = memoryview(args.RAWFile.read())
+ args.RAWFile.close()
+ else:
+ raw = file_mmaped(args.RAWFile)[args.skip:]
oid_maps = (
[obj_by_path(_path) for _path in (args.oids or "").split(",")]
if args.oids else ()
}
if args.defines_by_path is not None:
ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
- obj, tail = schema().decode(der, ctx=ctx)
from os import environ
- print(pprinter(
- obj,
+ pprinter = partial(
+ pprinter,
oid_maps=oid_maps,
with_colours=environ.get("NO_COLOR") is None,
with_decode_path=args.print_decode_path,
() if args.decode_path_only is None else
tuple(args.decode_path_only.split(":"))
),
- ))
+ )
+ if args.evgen:
+ for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
+ print(pprinter(obj, decode_path=decode_path))
+ else:
+ obj, tail = schema().decode(raw, ctx=ctx)
+ print(pprinter(obj))
if tail != b"":
print("\nTrailing data: %s" % hexenc(tail))