.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode_cer
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
from copy import copy
from datetime import datetime
from datetime import timedelta
+from io import BytesIO
from math import ceil
from operator import attrgetter
from string import ascii_letters
"Choice",
"DecodeError",
"DecodePathDefBy",
+ "encode_cer",
"Enumerated",
"ExceedingData",
"GeneralizedTime",
return l, 1 + octets_num, data[1 + octets_num:]
+LEN1K = len_encode(1000)
+
+
+def write_full(writer, data):
+ """Fully write provided data
+
+ BytesIO does not guarantee that the whole data will be written at once.
+ """
+ data = memoryview(data)
+ written = 0
+ while written != len(data):
+ n = writer(data[written:])
+ if n is None:
+ raise ValueError("can not write to buf")
+ written += n
+
+
########################################################################
# Base class
########################################################################
"""
return self._tag_order
+ @property
+ def tag_order_cer(self):
+ return self.tag_order
+
@property
def tlen(self):
"""See :ref:`decoding`
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
+ def encode_cer(self, writer):
+ if self._expl is not None:
+ write_full(writer, self._expl + LENINDEF)
+ if getattr(self, "der_forced", False):
+ write_full(writer, self._encode())
+ else:
+ self._encode_cer(writer)
+ if self._expl is not None:
+ write_full(writer, EOC)
+
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def hexencode(self):
"""Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
"""
)
+def encode_cer(obj):
+ """Encode to CER in memory
+ """
+ buf = BytesIO()
+ obj.encode_cer(buf.write)
+ return buf.getvalue()
+
+
class DecodePathDefBy(object):
"""DEFINED BY representation inside decode path
"""
octets,
))
+ def _encode_cer(self, writer):
+ bit_len, octets = self._value
+ if len(octets) + 1 <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ LEN1K,
+ int2byte(0),
+ octets[offset:offset + 999],
+ )))
+ tail = octets[offset+999:]
+ if len(tail) > 0:
+ tail = int2byte((8 - bit_len % 8) % 8) + tail
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
self._value,
))
+ def _encode_cer(self, writer):
+ octets = self._value
+ if len(octets) <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ LEN1K,
+ octets[offset:offset + 1000],
+ )))
+ tail = octets[offset+1000:]
+ if len(tail) > 0:
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
value = self._encode_time()
return b"".join((self.tag, len_encode(len(value)), value))
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def todatetime(self):
return self._value
self._assert_ready()
return self._value[1].tag_order if self._tag_order is None else self._tag_order
+ @property
+ def tag_order_cer(self):
+ return min(v.tag_order_cer for v in itervalues(self.specs))
+
def __getitem__(self, key):
if key not in self.specs:
raise ObjUnknown(key)
self._assert_ready()
return self._value[1].encode()
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ self._value[1].encode_cer(writer)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
"""``ANY`` special type
>>> Any(Integer(-123))
- ANY 020185
+ ANY INTEGER -123 (0X:7B)
>>> a = Any(OctetString(b"hello world").encode())
ANY 040b68656c6c6f20776f726c64
>>> hexenc(bytes(a))
return value
if isinstance(value, self.__class__):
return value._value
- if isinstance(value, Obj):
- return value.encode()
- raise InvalidValueType((self.__class__, Obj, binary_type))
+ if not isinstance(value, Obj):
+ raise InvalidValueType((self.__class__, Obj, binary_type))
+ return value
@property
def ready(self):
def __eq__(self, their):
if their.__class__ == binary_type:
- return self._value == their
+ if self._value.__class__ == binary_type:
+ return self._value == their
+ return self._value.encode() == their
if issubclass(their.__class__, Any):
- return self._value == their._value
+ if self.ready and their.ready:
+ return bytes(self) == bytes(their)
+ return self.ready == their.ready
return False
def __call__(
def __bytes__(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return self._value.encode()
@property
def tlen(self):
def _encode(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return value.encode()
+
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ value = self._value
+ if value.__class__ == binary_type:
+ write_full(writer, value)
+ else:
+ value.encode_cer(writer)
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
return pp_console_row(next(self.pps()))
def pps(self, decode_path=()):
+ value = self._value
+ if value is None:
+ pass
+ elif value.__class__ == binary_type:
+ value = None
+ else:
+ value = repr(value)
yield _pp(
obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
- blob=self._value if self.ready else None,
+ value=value,
+ blob=self._value if self._value.__class__ == binary_type else None,
optional=self.optional,
default=self == self.default,
impl=None if self.tag == self.tag_default else tag_decode(self.tag),
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(
+ self._values_for_encoding(),
+ key=attrgetter("tag_order_cer"),
+ ):
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(
self,
tlv,
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
+ write_full(writer, v)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
return super(SetOf, self)._decode(
tlv,