]> Cypherpunks.ru repositories - pyderasn.git/blobdiff - pyderasn.py
Initial support for CER encoding
[pyderasn.git] / pyderasn.py
index 24204ce8780ca8813723e4d09d715186fe1d1201..3533c6b9f18f46f28d9812b1dce8824b29ecfa44 100755 (executable)
@@ -643,6 +643,7 @@ Various
 
 .. autofunction:: pyderasn.abs_decode_path
 .. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode_cer
 .. autofunction:: pyderasn.hexenc
 .. autofunction:: pyderasn.hexdec
 .. autofunction:: pyderasn.tag_encode
@@ -779,6 +780,7 @@ from collections import OrderedDict
 from copy import copy
 from datetime import datetime
 from datetime import timedelta
+from io import BytesIO
 from math import ceil
 from operator import attrgetter
 from string import ascii_letters
@@ -819,6 +821,7 @@ __all__ = (
     "Choice",
     "DecodeError",
     "DecodePathDefBy",
+    "encode_cer",
     "Enumerated",
     "ExceedingData",
     "GeneralizedTime",
@@ -1190,6 +1193,23 @@ def len_decode(data):
     return l, 1 + octets_num, data[1 + octets_num:]
 
 
+LEN1K = len_encode(1000)
+
+
+def write_full(writer, data):
+    """Fully write provided data
+
+    BytesIO does not guarantee that the whole data will be written at once.
+    """
+    data = memoryview(data)
+    written = 0
+    while written != len(data):
+        n = writer(data[written:])
+        if n is None:
+            raise ValueError("can not write to buf")
+        written += n
+
+
 ########################################################################
 # Base class
 ########################################################################
@@ -1314,6 +1334,10 @@ class Obj(object):
         """
         return self._tag_order
 
+    @property
+    def tag_order_cer(self):
+        return self.tag_order
+
     @property
     def tlen(self):
         """See :ref:`decoding`
@@ -1357,6 +1381,19 @@ class Obj(object):
             return raw
         return b"".join((self._expl, len_encode(len(raw)), raw))
 
+    def encode_cer(self, writer):
+        if self._expl is not None:
+            write_full(writer, self._expl + LENINDEF)
+        if getattr(self, "der_forced", False):
+            write_full(writer, self._encode())
+        else:
+            self._encode_cer(writer)
+        if self._expl is not None:
+            write_full(writer, EOC)
+
+    def _encode_cer(self, writer):
+        write_full(writer, self._encode())
+
     def hexencode(self):
         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
         """
@@ -1648,6 +1685,14 @@ class Obj(object):
             )
 
 
+def encode_cer(obj):
+    """Encode to CER in memory
+    """
+    buf = BytesIO()
+    obj.encode_cer(buf.write)
+    return buf.getvalue()
+
+
 class DecodePathDefBy(object):
     """DEFINED BY representation inside decode path
     """
@@ -2784,6 +2829,30 @@ class BitString(Obj):
             octets,
         ))
 
+    def _encode_cer(self, writer):
+        bit_len, octets = self._value
+        if len(octets) + 1 <= 1000:
+            write_full(writer, self._encode())
+            return
+        write_full(writer, self.tag_constructed)
+        write_full(writer, LENINDEF)
+        for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
+            write_full(writer, b"".join((
+                BitString.tag_default,
+                LEN1K,
+                int2byte(0),
+                octets[offset:offset + 999],
+            )))
+        tail = octets[offset+999:]
+        if len(tail) > 0:
+            tail = int2byte((8 - bit_len % 8) % 8) + tail
+            write_full(writer, b"".join((
+                BitString.tag_default,
+                len_encode(len(tail)),
+                tail,
+            )))
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -3214,6 +3283,28 @@ class OctetString(Obj):
             self._value,
         ))
 
+    def _encode_cer(self, writer):
+        octets = self._value
+        if len(octets) <= 1000:
+            write_full(writer, self._encode())
+            return
+        write_full(writer, self.tag_constructed)
+        write_full(writer, LENINDEF)
+        for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
+            write_full(writer, b"".join((
+                OctetString.tag_default,
+                LEN1K,
+                octets[offset:offset + 1000],
+            )))
+        tail = octets[offset+1000:]
+        if len(tail) > 0:
+            write_full(writer, b"".join((
+                OctetString.tag_default,
+                len_encode(len(tail)),
+                tail,
+            )))
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -4534,6 +4625,9 @@ class UTCTime(VisibleString):
         value = self._encode_time()
         return b"".join((self.tag, len_encode(len(value)), value))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self._encode())
+
     def todatetime(self):
         return self._value
 
@@ -4905,6 +4999,10 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].tag_order if self._tag_order is None else self._tag_order
 
+    @property
+    def tag_order_cer(self):
+        return min(v.tag_order_cer for v in itervalues(self.specs))
+
     def __getitem__(self, key):
         if key not in self.specs:
             raise ObjUnknown(key)
@@ -4935,6 +5033,10 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].encode()
 
+    def _encode_cer(self, writer):
+        self._assert_ready()
+        self._value[1].encode_cer(writer)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         for choice, spec in iteritems(self.specs):
             sub_decode_path = decode_path + (choice,)
@@ -5066,7 +5168,7 @@ class Any(Obj):
     """``ANY`` special type
 
     >>> Any(Integer(-123))
-    ANY 020185
+    ANY INTEGER -123 (0X:7B)
     >>> a = Any(OctetString(b"hello world").encode())
     ANY 040b68656c6c6f20776f726c64
     >>> hexenc(bytes(a))
@@ -5114,9 +5216,9 @@ class Any(Obj):
             return value
         if isinstance(value, self.__class__):
             return value._value
-        if isinstance(value, Obj):
-            return value.encode()
-        raise InvalidValueType((self.__class__, Obj, binary_type))
+        if not isinstance(value, Obj):
+            raise InvalidValueType((self.__class__, Obj, binary_type))
+        return value
 
     @property
     def ready(self):
@@ -5160,9 +5262,13 @@ class Any(Obj):
 
     def __eq__(self, their):
         if their.__class__ == binary_type:
-            return self._value == their
+            if self._value.__class__ == binary_type:
+                return self._value == their
+            return self._value.encode() == their
         if issubclass(their.__class__, Any):
-            return self._value == their._value
+            if self.ready and their.ready:
+                return bytes(self) == bytes(their)
+            return self.ready == their.ready
         return False
 
     def __call__(
@@ -5179,7 +5285,10 @@ class Any(Obj):
 
     def __bytes__(self):
         self._assert_ready()
-        return self._value
+        value = self._value
+        if value.__class__ == binary_type:
+            return value
+        return self._value.encode()
 
     @property
     def tlen(self):
@@ -5187,7 +5296,18 @@ class Any(Obj):
 
     def _encode(self):
         self._assert_ready()
-        return self._value
+        value = self._value
+        if value.__class__ == binary_type:
+            return value
+        return value.encode()
+
+    def _encode_cer(self, writer):
+        self._assert_ready()
+        value = self._value
+        if value.__class__ == binary_type:
+            write_full(writer, value)
+        else:
+            value.encode_cer(writer)
 
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
@@ -5264,12 +5384,20 @@ class Any(Obj):
         return pp_console_row(next(self.pps()))
 
     def pps(self, decode_path=()):
+        value = self._value
+        if value is None:
+            pass
+        elif value.__class__ == binary_type:
+            value = None
+        else:
+            value = repr(value)
         yield _pp(
             obj=self,
             asn1_type_name=self.asn1_type_name,
             obj_name=self.__class__.__name__,
             decode_path=decode_path,
-            blob=self._value if self.ready else None,
+            value=value,
+            blob=self._value if self._value.__class__ == binary_type else None,
             optional=self.optional,
             default=self == self.default,
             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
@@ -5591,6 +5719,12 @@ class Sequence(Obj):
         v = b"".join(v.encode() for v in self._values_for_encoding())
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in self._values_for_encoding():
+            v.encode_cer(writer)
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -5853,6 +5987,15 @@ class Set(Sequence):
         ))
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in sorted(
+                self._values_for_encoding(),
+                key=attrgetter("tag_order_cer"),
+        ):
+            v.encode_cer(writer)
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -6216,6 +6359,12 @@ class SequenceOf(Obj):
         v = b"".join(v.encode() for v in self._values_for_encoding())
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in self._values_for_encoding():
+            v.encode_cer(writer)
+        write_full(writer, EOC)
+
     def _decode(
             self,
             tlv,
@@ -6411,6 +6560,12 @@ class SetOf(SequenceOf):
         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
+            write_full(writer, v)
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         return super(SetOf, self)._decode(
             tlv,