#!/usr/bin/env python
# coding: utf-8
# PyDERASN -- Python ASN.1 DER/BER codec with abstract structures
-# Copyright (C) 2017-2018 Sergey Matveev <stargrave@stargrave.org>
+# Copyright (C) 2017-2019 Sergey Matveev <stargrave@stargrave.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
* If object is encoded in BER form (not the DER one), then ``ber_encoded``
attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
- STRING``, ``SEQUENCE``, ``SET``, ``SET OF`` can contain it.
+ STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``
+ can contain it.
* If object has an indefinite length encoding, then its ``lenindef``
attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
-------
.. autofunction:: pyderasn.abs_decode_path
+.. autofunction:: pyderasn.colonize_hex
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
from codecs import getencoder
from collections import namedtuple
from collections import OrderedDict
+from copy import copy
from datetime import datetime
from math import ceil
from os import environ
+from string import ascii_letters
from string import digits
from six import add_metaclass
from six import int2byte
from six import integer_types
from six import iterbytes
+from six import iteritems
+from six import itervalues
from six import PY2
from six import string_types
from six import text_type
+from six import unichr as six_unichr
from six.moves import xrange as six_xrange
try:
from termcolor import colored
-except ImportError:
+except ImportError: # pragma: no cover
def colored(what, *args):
return what
# Errors
########################################################################
-class DecodeError(Exception):
+class ASN1Error(ValueError):
+ pass
+
+
+class DecodeError(ASN1Error):
def __init__(self, msg="", klass=None, decode_path=(), offset=0):
"""
:param str msg: reason of decode failing
pass
-class ObjUnknown(ValueError):
+class ObjUnknown(ASN1Error):
def __init__(self, name):
super(ObjUnknown, self).__init__()
self.name = name
return "%s(%s)" % (self.__class__.__name__, self)
-class ObjNotReady(ValueError):
+class ObjNotReady(ASN1Error):
def __init__(self, name):
super(ObjNotReady, self).__init__()
self.name = name
return "%s(%s)" % (self.__class__.__name__, self)
-class InvalidValueType(ValueError):
+class InvalidValueType(ASN1Error):
def __init__(self, expected_types):
super(InvalidValueType, self).__init__()
self.expected_types = expected_types
return "%s(%s)" % (self.__class__.__name__, self)
-class BoundsError(ValueError):
+class BoundsError(ASN1Error):
def __init__(self, bound_min, value, bound_max):
super(BoundsError, self).__init__()
self.bound_min = bound_min
decode_path=(),
ctx=None,
tag_only=False,
+ _ctx_immutable=True,
):
"""Decode the data
:param int offset: initial data's offset
:param bool leavemm: do we need to leave memoryview of remaining
data as is, or convert it to bytes otherwise
- :param ctx: optional :ref:`context <ctx>` governing decoding process.
+ :param ctx: optional :ref:`context <ctx>` governing decoding process
:param tag_only: decode only the tag, without length and contents
(used only in Choice and Set structures, trying to
determine if tag satisfies the scheme)
+ :param _ctx_immutable: do we need to copy ``ctx`` before using it
:returns: (Obj, remaining data)
"""
if ctx is None:
ctx = {}
+ elif _ctx_immutable:
+ ctx = copy(ctx)
tlv = memoryview(data)
if self._expl is None:
result = self._decode(
ctx=ctx,
tag_only=tag_only,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
obj, tail = result
eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
ctx=ctx,
tag_only=tag_only,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
obj, tail = result
if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
########################################################################
PP = namedtuple("PP", (
+ "obj",
"asn1_type_name",
"obj_name",
"decode_path",
def _pp(
+ obj=None,
asn1_type_name="unknown",
obj_name="unknown",
decode_path=(),
bered=False,
):
return PP(
+ obj,
asn1_type_name,
obj_name,
decode_path,
return colored(what, colour, attrs=attrs) if with_colours else what
+def colonize_hex(hexed):
+ """Separate hexadecimal string with colons
+ """
+ return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
+
+
def pp_console_row(
pp,
oids=None,
value in oids
):
cols.append(_colourize("(%s)" % oids[value], "green", with_colours))
+ if pp.asn1_type_name == Integer.asn1_type_name:
+ hex_repr = hex(int(pp.obj._value))[2:].upper()
+ if len(hex_repr) % 2 != 0:
+ hex_repr = "0" + hex_repr
+ cols.append(_colourize(
+ "(%s)" % colonize_hex(hex_repr),
+ "green",
+ with_colours,
+ ))
if with_blob:
if isinstance(pp.blob, binary_type):
cols.append(hexenc(pp.blob))
cols.append(" ." * (decode_path_len + 1))
if isinstance(pp.blob, binary_type):
blob = hexenc(pp.blob).upper()
- for i in range(0, len(blob), 32):
+ for i in six_xrange(0, len(blob), 32):
chunk = blob[i:i + 32]
- yield " ".join(cols + [":".join(
- chunk[j:j + 2] for j in range(0, len(chunk), 2)
- )])
+ yield " ".join(cols + [colonize_hex(chunk)])
elif isinstance(pp.blob, tuple):
yield " ".join(cols + [", ".join(pp.blob)])
self._value = default
def _value_sanitize(self, value):
- if issubclass(value.__class__, Boolean):
- return value._value
if isinstance(value, bool):
return value
+ if issubclass(value.__class__, Boolean):
+ return value._value
raise InvalidValueType((self.__class__, bool))
@property
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __nonzero__(self):
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
self._value = default
def _value_sanitize(self, value):
- if issubclass(value.__class__, Integer):
- value = value._value
- elif isinstance(value, integer_types):
+ if isinstance(value, integer_types):
pass
+ elif issubclass(value.__class__, Integer):
+ value = value._value
elif isinstance(value, str):
value = self.specs.get(value)
if value is None:
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __int__(self):
@property
def named(self):
- for name, value in self.specs.items():
+ for name, value in iteritems(self.specs):
if value == self._value:
return name
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
yield pp
+SET01 = frozenset(("0", "1"))
+
+
class BitString(Obj):
"""``BIT STRING`` bit string type
return bit_len, bytes(octets)
def _value_sanitize(self, value):
- if issubclass(value.__class__, BitString):
- return value._value
if isinstance(value, (string_types, binary_type)):
if (
isinstance(value, string_types) and
):
if value.endswith("'B"):
value = value[1:-2]
- if not set(value) <= set(("0", "1")):
+ if not frozenset(value) <= SET01:
raise ValueError("B's coding contains unacceptable chars")
return self._bits2octets(value)
elif value.endswith("'H"):
bits.append(bit)
if len(bits) == 0:
return self._bits2octets("")
- bits = set(bits)
+ bits = frozenset(bits)
return self._bits2octets("".join(
("1" if bit in bits else "0")
for bit in six_xrange(max(bits) + 1)
))
+ if issubclass(value.__class__, BitString):
+ return value._value
raise InvalidValueType((self.__class__, binary_type, string_types))
@property
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __iter__(self):
@property
def named(self):
- return [name for name, bit in self.specs.items() if self[bit]]
+ return [name for name, bit in iteritems(self.specs) if self[bit]]
def __call__(
self,
offset=offset,
)
if t == self.tag:
- if tag_only:
+ if tag_only: # pragma: no cover
return
return self._decode_chunk(lv, offset, decode_path, ctx)
if t == self.tag_constructed:
decode_path=decode_path,
offset=offset,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
lenindef = False
try:
decode_path=sub_decode_path,
leavemm=True,
ctx=ctx,
+ _ctx_immutable=False,
)
except TagMismatch:
raise DecodeError(
if len(self.specs) > 0:
blob = tuple(self.named)
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
)
def _value_sanitize(self, value):
- if issubclass(value.__class__, OctetString):
- value = value._value
- elif isinstance(value, binary_type):
+ if isinstance(value, binary_type):
pass
+ elif issubclass(value.__class__, OctetString):
+ value = value._value
else:
raise InvalidValueType((self.__class__, bytes))
if not self._bound_min <= len(value) <= self._bound_max:
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __bytes__(self):
decode_path=sub_decode_path,
leavemm=True,
ctx=ctx,
+ _ctx_immutable=False,
)
except TagMismatch:
raise DecodeError(
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __eq__(self, their):
decode_path=decode_path,
offset=offset,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
try:
l, _, v = len_decode(lv)
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __iter__(self):
decode_path=decode_path,
offset=offset,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
try:
l, llen, v = len_decode(lv)
)
v, tail = v[:l], v[l:]
arcs = []
+ ber_encoded = False
while len(v) > 0:
i = 0
arc = 0
while True:
octet = indexbytes(v, i)
+ if i == 0 and octet == 0x80:
+ if ctx.get("bered", False):
+ ber_encoded = True
+ else:
+ raise DecodeError("non normalized arc encoding")
arc = (arc << 7) | (octet & 0x7F)
if octet & 0x80 == 0:
arcs.append(arc)
optional=self.optional,
_decoded=(offset, llen, l),
)
+ if ber_encoded:
+ obj.ber_encoded = True
return obj, tail
def __repr__(self):
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
expl_llen=self.expl_llen if self.expled else None,
expl_vlen=self.expl_vlen if self.expled else None,
expl_lenindef=self.expl_lenindef,
+ ber_encoded=self.ber_encoded,
bered=self.bered,
)
for pp in self.pps_lenindef(decode_path):
if isinstance(value, self.__class__):
value = value._value
elif isinstance(value, integer_types):
- if value not in list(self.specs.values()):
+ for _value in itervalues(self.specs):
+ if _value == value:
+ break
+ else:
raise DecodeError(
"unknown integer value: %s" % value,
klass=self.__class__,
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __call__(
if self.ready:
value = hexenc(bytes(self)) if no_unicode else self.__unicode__()
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
asn1_type_name = "UTF8String"
-class NumericString(CommonString):
+class AllowableCharsMixin(object):
+ @property
+ def allowable_chars(self):
+ if PY2:
+ return self._allowable_chars
+ return frozenset(six_unichr(c) for c in self._allowable_chars)
+
+
+class NumericString(AllowableCharsMixin, CommonString):
"""Numeric string
- Its value is properly sanitized: only ASCII digits can be stored.
+ Its value is properly sanitized: only ASCII digits with spaces can
+ be stored.
+
+ >>> NumericString().allowable_chars
+ set(['3', '4', '7', '5', '1', '0', '8', '9', ' ', '6', '2'])
"""
__slots__ = ()
tag_default = tag_encode(18)
encoding = "ascii"
asn1_type_name = "NumericString"
- allowable_chars = set(digits.encode("ascii"))
+ _allowable_chars = frozenset(digits.encode("ascii") + b" ")
def _value_sanitize(self, value):
value = super(NumericString, self)._value_sanitize(value)
- if not set(value) <= self.allowable_chars:
+ if not frozenset(value) <= self._allowable_chars:
raise DecodeError("non-numeric value")
return value
-class PrintableString(CommonString):
+class PrintableString(AllowableCharsMixin, CommonString):
+ """Printable string
+
+ Its value is properly sanitized: see X.680 41.4 table 10.
+
+ >>> PrintableString().allowable_chars
+ >>> set([' ', "'", ..., 'z'])
+ """
__slots__ = ()
tag_default = tag_encode(19)
encoding = "ascii"
asn1_type_name = "PrintableString"
+ _allowable_chars = frozenset(
+ (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
+ )
+
+ def _value_sanitize(self, value):
+ value = super(PrintableString, self)._value_sanitize(value)
+ if not frozenset(value) <= self._allowable_chars:
+ raise DecodeError("non-printable value")
+ return value
class TeletexString(CommonString):
self._value = default
def _value_sanitize(self, value):
- if isinstance(value, self.__class__):
- return value._value
- if isinstance(value, datetime):
- return value.strftime(self.fmt).encode("ascii")
if isinstance(value, binary_type):
try:
value_decoded = value.decode("ascii")
return value
else:
raise DecodeError("invalid UTCTime length")
+ if isinstance(value, self.__class__):
+ return value._value
+ if isinstance(value, datetime):
+ return value.strftime(self.fmt).encode("ascii")
raise InvalidValueType((self.__class__, datetime))
def __eq__(self, their):
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
fmt_ms = "%Y%m%d%H%M%S.%fZ"
def _value_sanitize(self, value):
- if isinstance(value, self.__class__):
- return value._value
- if isinstance(value, datetime):
- return value.strftime(
- self.fmt_ms if value.microsecond > 0 else self.fmt
- ).encode("ascii")
if isinstance(value, binary_type):
try:
value_decoded = value.decode("ascii")
"invalid GeneralizedTime length",
klass=self.__class__,
)
+ if isinstance(value, self.__class__):
+ return value._value
+ if isinstance(value, datetime):
+ return value.strftime(
+ self.fmt_ms if value.microsecond > 0 else self.fmt
+ ).encode("ascii")
raise InvalidValueType((self.__class__, datetime))
def todatetime(self):
self._value = default_obj.copy()._value
def _value_sanitize(self, value):
- if isinstance(value, self.__class__):
- return value._value
if isinstance(value, tuple) and len(value) == 2:
choice, obj = value
spec = self.specs.get(choice)
if not isinstance(obj, spec.__class__):
raise InvalidValueType((spec,))
return (choice, spec(obj))
+ if isinstance(value, self.__class__):
+ return value._value
raise InvalidValueType((self.__class__, tuple))
@property
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
value = self._value
if value is not None:
obj._value = (value[0], value[1].copy())
return self._value[1].encode()
def _decode(self, tlv, offset, decode_path, ctx, tag_only):
- for choice, spec in self.specs.items():
+ for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
try:
spec.decode(
decode_path=sub_decode_path,
ctx=ctx,
tag_only=True,
+ _ctx_immutable=False,
)
except TagMismatch:
continue
decode_path=decode_path,
offset=offset,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
value, tail = spec.decode(
tlv,
leavemm=True,
decode_path=sub_decode_path,
ctx=ctx,
+ _ctx_immutable=False,
)
obj = self.__class__(
schema=self.specs,
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
self.defined = None
def _value_sanitize(self, value):
+ if isinstance(value, binary_type):
+ return value
if isinstance(value, self.__class__):
return value._value
if isinstance(value, Obj):
return value.encode()
- if isinstance(value, binary_type):
- return value
raise InvalidValueType((self.__class__, Obj, binary_type))
@property
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
return obj
def __eq__(self, their):
decode_path=decode_path + (str(chunk_i),),
leavemm=True,
ctx=ctx,
+ _ctx_immutable=False,
)
vlen += chunk.tlvlen
sub_offset += chunk.tlvlen
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
def abs_decode_path(decode_path, rel_path):
"""Create an absolute decode path from current and relative ones
- :param decode_path: current decode path, starting point.
- Tuple of strings
+ :param decode_path: current decode path, starting point. Tuple of strings
:param rel_path: relative path to ``decode_path``. Tuple of strings.
If first tuple's element is "/", then treat it as
an absolute path, ignoring ``decode_path`` as
@property
def ready(self):
- for name, spec in self.specs.items():
+ for name, spec in iteritems(self.specs):
value = self._value.get(name)
if value is None:
if spec.optional:
def bered(self):
if self.expl_lenindef or self.lenindef or self.ber_encoded:
return True
- return any(value.bered for value in self._value.values())
+ return any(value.bered for value in itervalues(self._value))
def copy(self):
obj = self.__class__(schema=self.specs)
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
- obj._value = {k: v.copy() for k, v in self._value.items()}
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
+ obj._value = {k: v.copy() for k, v in iteritems(self._value)}
return obj
def __eq__(self, their):
def _encoded_values(self):
raws = []
- for name, spec in self.specs.items():
+ for name, spec in iteritems(self.specs):
value = self._value.get(name)
if value is None:
if spec.optional:
decode_path=decode_path,
offset=offset,
)
- if tag_only:
+ if tag_only: # pragma: no cover
return
lenindef = False
ctx_bered = ctx.get("bered", False)
values = {}
ber_encoded = False
ctx_allow_default_values = ctx.get("allow_default_values", False)
- for name, spec in self.specs.items():
+ for name, spec in iteritems(self.specs):
if spec.optional and (
(lenindef and v[:EOC_LEN].tobytes() == EOC) or
len(v) == 0
leavemm=True,
decode_path=sub_decode_path,
ctx=ctx,
+ _ctx_immutable=False,
)
except TagMismatch:
if spec.optional:
leavemm=True,
decode_path=sub_sub_decode_path,
ctx=ctx,
+ _ctx_immutable=False,
)
if len(defined_tail) > 0:
raise DecodeError(
leavemm=True,
decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
ctx=ctx,
+ _ctx_immutable=False,
)
if len(defined_tail) > 0:
raise DecodeError(
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
v = b"".join(raws)
return b"".join((self.tag, len_encode(len(v)), v))
+ def _specs_items(self):
+ return iteritems(self.specs)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only):
try:
t, tlen, lv = tag_strip(tlv)
ctx_allow_default_values = ctx.get("allow_default_values", False)
ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
value_prev = memoryview(v[:0])
- specs_items = self.specs.items
+
while len(v) > 0:
if lenindef and v[:EOC_LEN].tobytes() == EOC:
break
- for name, spec in specs_items():
+ for name, spec in self._specs_items():
sub_decode_path = decode_path + (name,)
try:
spec.decode(
decode_path=sub_decode_path,
ctx=ctx,
tag_only=True,
+ _ctx_immutable=False,
)
except TagMismatch:
continue
leavemm=True,
decode_path=sub_decode_path,
ctx=ctx,
+ _ctx_immutable=False,
)
value_len = value.fulllen
if value_prev.tobytes() > v[:value_len].tobytes():
obj.offset = self.offset
obj.llen = self.llen
obj.vlen = self.vlen
+ obj.expl_lenindef = self.expl_lenindef
+ obj.lenindef = self.lenindef
+ obj.ber_encoded = self.ber_encoded
obj._value = [v.copy() for v in self._value]
return obj
leavemm=True,
decode_path=sub_decode_path,
ctx=ctx,
+ _ctx_immutable=False,
)
value_len = value.fulllen
if ordering_check:
def pps(self, decode_path=()):
yield _pp(
+ obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
choice = PrimitiveTypes()
choice.specs["SequenceOf"] = SequenceOf(schema=choice)
choice.specs["SetOf"] = SetOf(schema=choice)
- for i in range(31):
+ for i in six_xrange(31):
choice.specs["SequenceOf%d" % i] = SequenceOf(
schema=choice,
expl=tag_ctxc(i),