]> Cypherpunks.ru repositories - pyderasn.git/commitdiff
Initial support for CER encoding
authorSergey Matveev <stargrave@stargrave.org>
Wed, 12 Feb 2020 07:42:30 +0000 (10:42 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sun, 16 Feb 2020 18:25:48 +0000 (21:25 +0300)
MANIFEST.in
doc/news.rst
pyderasn.py
tests/test_cms.py [new file with mode: 0644]
tests/test_crl.py
tests/test_crts.py
tests/test_pyderasn.py

index 704144148d58df8815abc0319d923e144a8abf23..1163e7983d8866b90a04c6296f309de8a27603b7 100644 (file)
@@ -28,6 +28,7 @@ include tests/compli_test_suite/LICENSE
 include tests/compli_test_suite/pdf/free_asn1_testsuite.pdf
 include tests/compli_test_suite/README.md
 include tests/compli_test_suite/suite/tc*.ber
+include tests/test_cms.py
 include tests/test_compli.py
 include tests/test_crl.py
 include tests/test_crts.py
index e9749c2236e515dc64e9ec952e598b41616eb1e7..2419f8ef02287ad7ba8c80f330346e67a6debfde 100644 (file)
@@ -9,11 +9,15 @@ News
   same tag to be successfully decoded
 * Fixed possibly invalid SET DER encoding where objects were not sorted
   by tag, but by encoded representation
-* Any does not allow empty data value now. Now it checks if it has valid
-  ASN.1 tag
+* ``Any`` does not allow empty data value now. Now it checks if it has
+  valid ASN.1 tag
+* ``Any`` allows an ordinary ``Obj`` storing, without its forceful
+  encoded representation storage
 * Initial support for so called ``evgen_mode``: event generation mode,
   where no in-memory objects storing happens, giving ability to process
   ASN.1 data without fully parsing it first
+* Initial experimental CER encoding mode, allowing streaming encoding of
+  the data directly to some writeable object
 
 .. _release6.3:
 
index 24204ce8780ca8813723e4d09d715186fe1d1201..3533c6b9f18f46f28d9812b1dce8824b29ecfa44 100755 (executable)
@@ -643,6 +643,7 @@ Various
 
 .. autofunction:: pyderasn.abs_decode_path
 .. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode_cer
 .. autofunction:: pyderasn.hexenc
 .. autofunction:: pyderasn.hexdec
 .. autofunction:: pyderasn.tag_encode
@@ -779,6 +780,7 @@ from collections import OrderedDict
 from copy import copy
 from datetime import datetime
 from datetime import timedelta
+from io import BytesIO
 from math import ceil
 from operator import attrgetter
 from string import ascii_letters
@@ -819,6 +821,7 @@ __all__ = (
     "Choice",
     "DecodeError",
     "DecodePathDefBy",
+    "encode_cer",
     "Enumerated",
     "ExceedingData",
     "GeneralizedTime",
@@ -1190,6 +1193,23 @@ def len_decode(data):
     return l, 1 + octets_num, data[1 + octets_num:]
 
 
+LEN1K = len_encode(1000)
+
+
+def write_full(writer, data):
+    """Fully write provided data
+
+    BytesIO does not guarantee that the whole data will be written at once.
+    """
+    data = memoryview(data)
+    written = 0
+    while written != len(data):
+        n = writer(data[written:])
+        if n is None:
+            raise ValueError("can not write to buf")
+        written += n
+
+
 ########################################################################
 # Base class
 ########################################################################
@@ -1314,6 +1334,10 @@ class Obj(object):
         """
         return self._tag_order
 
+    @property
+    def tag_order_cer(self):
+        return self.tag_order
+
     @property
     def tlen(self):
         """See :ref:`decoding`
@@ -1357,6 +1381,19 @@ class Obj(object):
             return raw
         return b"".join((self._expl, len_encode(len(raw)), raw))
 
+    def encode_cer(self, writer):
+        if self._expl is not None:
+            write_full(writer, self._expl + LENINDEF)
+        if getattr(self, "der_forced", False):
+            write_full(writer, self._encode())
+        else:
+            self._encode_cer(writer)
+        if self._expl is not None:
+            write_full(writer, EOC)
+
+    def _encode_cer(self, writer):
+        write_full(writer, self._encode())
+
     def hexencode(self):
         """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
         """
@@ -1648,6 +1685,14 @@ class Obj(object):
             )
 
 
+def encode_cer(obj):
+    """Encode to CER in memory
+    """
+    buf = BytesIO()
+    obj.encode_cer(buf.write)
+    return buf.getvalue()
+
+
 class DecodePathDefBy(object):
     """DEFINED BY representation inside decode path
     """
@@ -2784,6 +2829,30 @@ class BitString(Obj):
             octets,
         ))
 
+    def _encode_cer(self, writer):
+        bit_len, octets = self._value
+        if len(octets) + 1 <= 1000:
+            write_full(writer, self._encode())
+            return
+        write_full(writer, self.tag_constructed)
+        write_full(writer, LENINDEF)
+        for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
+            write_full(writer, b"".join((
+                BitString.tag_default,
+                LEN1K,
+                int2byte(0),
+                octets[offset:offset + 999],
+            )))
+        tail = octets[offset+999:]
+        if len(tail) > 0:
+            tail = int2byte((8 - bit_len % 8) % 8) + tail
+            write_full(writer, b"".join((
+                BitString.tag_default,
+                len_encode(len(tail)),
+                tail,
+            )))
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -3214,6 +3283,28 @@ class OctetString(Obj):
             self._value,
         ))
 
+    def _encode_cer(self, writer):
+        octets = self._value
+        if len(octets) <= 1000:
+            write_full(writer, self._encode())
+            return
+        write_full(writer, self.tag_constructed)
+        write_full(writer, LENINDEF)
+        for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
+            write_full(writer, b"".join((
+                OctetString.tag_default,
+                LEN1K,
+                octets[offset:offset + 1000],
+            )))
+        tail = octets[offset+1000:]
+        if len(tail) > 0:
+            write_full(writer, b"".join((
+                OctetString.tag_default,
+                len_encode(len(tail)),
+                tail,
+            )))
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -4534,6 +4625,9 @@ class UTCTime(VisibleString):
         value = self._encode_time()
         return b"".join((self.tag, len_encode(len(value)), value))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self._encode())
+
     def todatetime(self):
         return self._value
 
@@ -4905,6 +4999,10 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].tag_order if self._tag_order is None else self._tag_order
 
+    @property
+    def tag_order_cer(self):
+        return min(v.tag_order_cer for v in itervalues(self.specs))
+
     def __getitem__(self, key):
         if key not in self.specs:
             raise ObjUnknown(key)
@@ -4935,6 +5033,10 @@ class Choice(Obj):
         self._assert_ready()
         return self._value[1].encode()
 
+    def _encode_cer(self, writer):
+        self._assert_ready()
+        self._value[1].encode_cer(writer)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         for choice, spec in iteritems(self.specs):
             sub_decode_path = decode_path + (choice,)
@@ -5066,7 +5168,7 @@ class Any(Obj):
     """``ANY`` special type
 
     >>> Any(Integer(-123))
-    ANY 020185
+    ANY INTEGER -123 (0X:7B)
     >>> a = Any(OctetString(b"hello world").encode())
     ANY 040b68656c6c6f20776f726c64
     >>> hexenc(bytes(a))
@@ -5114,9 +5216,9 @@ class Any(Obj):
             return value
         if isinstance(value, self.__class__):
             return value._value
-        if isinstance(value, Obj):
-            return value.encode()
-        raise InvalidValueType((self.__class__, Obj, binary_type))
+        if not isinstance(value, Obj):
+            raise InvalidValueType((self.__class__, Obj, binary_type))
+        return value
 
     @property
     def ready(self):
@@ -5160,9 +5262,13 @@ class Any(Obj):
 
     def __eq__(self, their):
         if their.__class__ == binary_type:
-            return self._value == their
+            if self._value.__class__ == binary_type:
+                return self._value == their
+            return self._value.encode() == their
         if issubclass(their.__class__, Any):
-            return self._value == their._value
+            if self.ready and their.ready:
+                return bytes(self) == bytes(their)
+            return self.ready == their.ready
         return False
 
     def __call__(
@@ -5179,7 +5285,10 @@ class Any(Obj):
 
     def __bytes__(self):
         self._assert_ready()
-        return self._value
+        value = self._value
+        if value.__class__ == binary_type:
+            return value
+        return self._value.encode()
 
     @property
     def tlen(self):
@@ -5187,7 +5296,18 @@ class Any(Obj):
 
     def _encode(self):
         self._assert_ready()
-        return self._value
+        value = self._value
+        if value.__class__ == binary_type:
+            return value
+        return value.encode()
+
+    def _encode_cer(self, writer):
+        self._assert_ready()
+        value = self._value
+        if value.__class__ == binary_type:
+            write_full(writer, value)
+        else:
+            value.encode_cer(writer)
 
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
@@ -5264,12 +5384,20 @@ class Any(Obj):
         return pp_console_row(next(self.pps()))
 
     def pps(self, decode_path=()):
+        value = self._value
+        if value is None:
+            pass
+        elif value.__class__ == binary_type:
+            value = None
+        else:
+            value = repr(value)
         yield _pp(
             obj=self,
             asn1_type_name=self.asn1_type_name,
             obj_name=self.__class__.__name__,
             decode_path=decode_path,
-            blob=self._value if self.ready else None,
+            value=value,
+            blob=self._value if self._value.__class__ == binary_type else None,
             optional=self.optional,
             default=self == self.default,
             impl=None if self.tag == self.tag_default else tag_decode(self.tag),
@@ -5591,6 +5719,12 @@ class Sequence(Obj):
         v = b"".join(v.encode() for v in self._values_for_encoding())
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in self._values_for_encoding():
+            v.encode_cer(writer)
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -5853,6 +5987,15 @@ class Set(Sequence):
         ))
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in sorted(
+                self._values_for_encoding(),
+                key=attrgetter("tag_order_cer"),
+        ):
+            v.encode_cer(writer)
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         try:
             t, tlen, lv = tag_strip(tlv)
@@ -6216,6 +6359,12 @@ class SequenceOf(Obj):
         v = b"".join(v.encode() for v in self._values_for_encoding())
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in self._values_for_encoding():
+            v.encode_cer(writer)
+        write_full(writer, EOC)
+
     def _decode(
             self,
             tlv,
@@ -6411,6 +6560,12 @@ class SetOf(SequenceOf):
         v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
         return b"".join((self.tag, len_encode(len(v)), v))
 
+    def _encode_cer(self, writer):
+        write_full(writer, self.tag + LENINDEF)
+        for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
+            write_full(writer, v)
+        write_full(writer, EOC)
+
     def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
         return super(SetOf, self)._decode(
             tlv,
diff --git a/tests/test_cms.py b/tests/test_cms.py
new file mode 100644 (file)
index 0000000..86df541
--- /dev/null
@@ -0,0 +1,235 @@
+# coding: utf-8
+# PyDERASN -- Python ASN.1 DER codec with abstract structures
+# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, version 3 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program.  If not, see
+# <http://www.gnu.org/licenses/>.
+
+from datetime import datetime
+from hashlib import sha256
+from os import urandom
+from random import randint
+from subprocess import call
+from unittest import TestCase
+
+from pyderasn import Any
+from pyderasn import Choice
+from pyderasn import encode_cer
+from pyderasn import Integer
+from pyderasn import ObjectIdentifier
+from pyderasn import OctetString
+from pyderasn import Sequence
+from pyderasn import SetOf
+from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
+from pyderasn import UTCTime
+from tests.test_crts import AlgorithmIdentifier
+from tests.test_crts import Certificate
+from tests.test_crts import SubjectKeyIdentifier
+from tests.test_crts import Time
+
+
+class CMSVersion(Integer):
+    schema = (
+        ("v0", 0),
+        ("v1", 1),
+        ("v2", 2),
+        ("v3", 3),
+        ("v4", 4),
+        ("v5", 5),
+    )
+
+
+class AttributeValue(Any):
+    pass
+
+
+class AttributeValues(SetOf):
+    schema = AttributeValue()
+
+
+class Attribute(Sequence):
+    schema = (
+        ("attrType", ObjectIdentifier()),
+        ("attrValues", AttributeValues()),
+    )
+
+
+class SignatureAlgorithmIdentifier(AlgorithmIdentifier):
+    pass
+
+
+class SignedAttributes(SetOf):
+    schema = Attribute()
+    bounds = (1, 32)
+    der_forced = True
+
+
+class SignerIdentifier(Choice):
+    schema = (
+        # ("issuerAndSerialNumber", IssuerAndSerialNumber()),
+        ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))),
+    )
+
+
+class DigestAlgorithmIdentifiers(SetOf):
+    schema = AlgorithmIdentifier()
+
+
+class DigestAlgorithmIdentifier(AlgorithmIdentifier):
+    pass
+
+
+class SignatureValue(OctetString):
+    pass
+
+
+class SignerInfo(Sequence):
+    schema = (
+        ("version", CMSVersion()),
+        ("sid", SignerIdentifier()),
+        ("digestAlgorithm", DigestAlgorithmIdentifier()),
+        ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)),
+        ("signatureAlgorithm", SignatureAlgorithmIdentifier()),
+        ("signature", SignatureValue()),
+        # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)),
+    )
+
+
+class SignerInfos(SetOf):
+    schema = SignerInfo()
+
+
+class ContentType(ObjectIdentifier):
+    pass
+
+
+class EncapsulatedContentInfo(Sequence):
+    schema = (
+        ("eContentType", ContentType()),
+        ("eContent", OctetString(expl=tag_ctxc(0), optional=True)),
+    )
+
+
+class CertificateChoices(Choice):
+    schema = (
+        ('certificate', Certificate()),
+        # ...
+    )
+
+
+class CertificateSet(SetOf):
+    schema = CertificateChoices()
+
+
+class SignedData(Sequence):
+    schema = (
+        ("version", CMSVersion()),
+        ("digestAlgorithms", DigestAlgorithmIdentifiers()),
+        ("encapContentInfo", EncapsulatedContentInfo()),
+        ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)),
+        # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)),
+        ("signerInfos", SignerInfos()),
+    )
+
+
+class ContentInfo(Sequence):
+    schema = (
+        ("contentType", ContentType()),
+        ("content", Any(expl=tag_ctxc(0))),
+    )
+
+
+id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2")
+id_sha256 = ObjectIdentifier("2.16.840.1.101.3.4.2.1")
+id_data = ObjectIdentifier("1.2.840.113549.1.7.1")
+id_ecdsa_with_SHA256 = ObjectIdentifier("1.2.840.10045.4.3.2")
+id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3")
+id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4")
+id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14")
+
+
+class TestSignedDataCER(TestCase):
+    def runTest(self):
+        # openssl ecparam -name prime256v1 -genkey -out key.pem
+        # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem
+        #    -days 365 -nodes -subj "/CN=doesnotmatter"
+        with open("cert.cer", "rb") as fd:
+            cert = Certificate().decod(fd.read())
+        for ext in cert["tbsCertificate"]["extensions"]:
+            if ext["extnID"] == id_ce_subjectKeyIdentifier:
+                skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"]))
+        ai_sha256 = AlgorithmIdentifier((
+            ("algorithm", id_sha256),
+        ))
+        data = urandom(randint(1000, 3000))
+        eci = EncapsulatedContentInfo((
+            ("eContentType", ContentType(id_data)),
+            ("eContent", OctetString(data)),
+        ))
+        signed_attrs = SignedAttributes([
+            Attribute((
+                ("attrType", id_pkcs9_at_contentType),
+                ("attrValues", AttributeValues([
+                    AttributeValue(id_data.encode())
+                ])),
+            )),
+            Attribute((
+                ("attrType", id_pkcs9_at_messageDigest),
+                ("attrValues", AttributeValues([
+                    AttributeValue(OctetString(
+                        sha256(bytes(eci["eContent"])).digest()
+                    ).encode()),
+                ])),
+            )),
+        ])
+        with open("/tmp/in", "wb") as fd:
+            fd.write(encode_cer(signed_attrs))
+        self.assertEqual(0, call(" ".join((
+            "openssl dgst -sha256",
+            "-sign key.pem",
+            "-binary",
+            "/tmp/in",
+            "> /tmp/signature",
+        )), shell=True))
+        ci = ContentInfo((
+            ("contentType", ContentType(id_signedData)),
+            ("content", Any((SignedData((
+                ("version", CMSVersion("v3")),
+                ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha256])),
+                ("encapContentInfo", eci),
+                ("certificates", CertificateSet([
+                    CertificateChoices(("certificate", cert)),
+                ])),
+                ("signerInfos", SignerInfos([SignerInfo((
+                    ("version", CMSVersion("v3")),
+                    ("sid", SignerIdentifier(
+                        ("subjectKeyIdentifier", skid)
+                    )),
+                    ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)),
+                    ("signedAttrs", signed_attrs),
+                    ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+                        ("algorithm", id_ecdsa_with_SHA256),
+                    ))),
+                    ("signature", SignatureValue(open("/tmp/signature", "rb").read())),
+                ))])),
+            ))))),
+        ))
+        with open("/tmp/out.p7m", "wb") as fd:
+            fd.write(encode_cer(ci))
+        self.assertEqual(0, call(" ".join((
+            "openssl cms -verify",
+            "-inform DER -in /tmp/out.p7m",
+            "-signer cert.pem -CAfile cert.pem",
+            "-out /dev/null",
+        )), shell=True))
index 7368d7fc707550bfe57643c87ae39ba2cf88602d..317355213365e389c5d17f5bc3a31857df8e3dd2 100644 (file)
 # You should have received a copy of the GNU Lesser General Public
 # License along with this program.  If not, see
 # <http://www.gnu.org/licenses/>.
-"""CRL related schemes, just to test the performance with them
+"""CRL related schemas, just to test the performance with them
 """
 
+from os.path import exists
+from time import time
+from unittest import skipIf
+from unittest import TestCase
+
 from pyderasn import BitString
+from pyderasn import encode_cer
 from pyderasn import Sequence
 from pyderasn import SequenceOf
 from pyderasn import tag_ctxc
@@ -60,3 +66,24 @@ class CertificateList(Sequence):
         ("signatureAlgorithm", AlgorithmIdentifier()),
         ("signatureValue", BitString()),
     )
+
+@skipIf(not exists("revoke.crl"), "CACert's revoke.crl not found")
+class TestCACert(TestCase):
+    def test_cer(self):
+        with open("revoke.crl", "rb") as fd:
+            raw = fd.read()
+        print("DER read")
+        start = time()
+        crl1 = CertificateList().decod(raw)
+        print("DER decoded", time() - start)
+        start = time()
+        cer_raw = encode_cer(crl1)
+        print("CER encoded", time() - start)
+        start = time()
+        crl2 = CertificateList().decod(cer_raw, ctx={"bered": True})
+        print("CER decoded", time() - start)
+        self.assertEqual(crl2, crl1)
+        start = time()
+        der_raw = crl2.encode()
+        print("DER encoded", time() - start)
+        self.assertSequenceEqual(der_raw, raw)
index aaf359edf2b15b093f4cbc5540c29a761d5112fe..3f753e059013eabadeaa1c77fd67aed25d3a7133 100644 (file)
@@ -27,6 +27,7 @@ from pyderasn import Any
 from pyderasn import BitString
 from pyderasn import Boolean
 from pyderasn import Choice
+from pyderasn import encode_cer
 from pyderasn import GeneralizedTime
 from pyderasn import hexdec
 from pyderasn import IA5String
@@ -154,6 +155,15 @@ class UniqueIdentifier(BitString):
     pass
 
 
+class KeyIdentifier(OctetString):
+    pass
+
+
+class SubjectKeyIdentifier(KeyIdentifier):
+    pass
+
+
+
 class Extension(Sequence):
     schema = (
         ("extnID", ObjectIdentifier()),
@@ -188,6 +198,7 @@ class Certificate(Sequence):
         ("signatureAlgorithm", AlgorithmIdentifier()),
         ("signatureValue", BitString()),
     )
+    der_forced = True
 
 
 class TestGoSelfSignedVector(TestCase):
@@ -347,6 +358,10 @@ class TestGoSelfSignedVector(TestCase):
             "998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435",
         ))))
         self.assertSequenceEqual(crt.encode(), raw)
+        self.assertEqual(
+            Certificate().decod(encode_cer(crt), ctx={"bered": True}),
+            crt,
+        )
 
 
 class TestGoPayPalVector(TestCase):
@@ -408,3 +423,7 @@ class TestGoPayPalVector(TestCase):
         pprint(crt)
         repr(crt)
         pickle_loads(pickle_dumps(crt, pickle_proto))
+        self.assertEqual(
+            Certificate().decod(encode_cer(crt), ctx={"bered": True}),
+            crt,
+        )
index f160ba833b1933ace9c49871984ab2585982731b..cd165b857a40b5d628e5839082895b3fdb33a18b 100644 (file)
@@ -20,7 +20,9 @@ from copy import deepcopy
 from datetime import datetime
 from datetime import timedelta
 from importlib import import_module
+from operator import attrgetter
 from os import environ
+from os import urandom
 from random import random
 from string import ascii_letters
 from string import digits
@@ -72,6 +74,7 @@ from pyderasn import BoundsError
 from pyderasn import Choice
 from pyderasn import DecodeError
 from pyderasn import DecodePathDefBy
+from pyderasn import encode_cer
 from pyderasn import Enumerated
 from pyderasn import EOC
 from pyderasn import EOC_LEN
@@ -599,11 +602,18 @@ class TestBoolean(CommonMixin, TestCase):
             pprint(obj, big_blobs=True, with_decode_path=True)
             self.assertFalse(obj.expled)
             obj_encoded = obj.encode()
+            self.assertSequenceEqual(encode_cer(obj), obj_encoded)
             obj_expled = obj(value, expl=tag_expl)
             self.assertTrue(obj_expled.expled)
             repr(obj_expled)
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
+            obj_expled_cer = encode_cer(obj_expled)
+            self.assertNotEqual(obj_expled_cer, obj_encoded)
+            self.assertSequenceEqual(
+                obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+                obj_expled.encode(),
+            )
             obj_expled_hex_encoded = obj_expled.hexencode()
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.hexdecode(
@@ -1113,12 +1123,19 @@ class TestInteger(CommonMixin, TestCase):
             pprint(obj, big_blobs=True, with_decode_path=True)
             self.assertFalse(obj.expled)
             obj_encoded = obj.encode()
+            self.assertSequenceEqual(encode_cer(obj), obj_encoded)
             obj_expled = obj(value, expl=tag_expl)
             self.assertTrue(obj_expled.expled)
             repr(obj_expled)
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
             obj_expled_encoded = obj_expled.encode()
+            obj_expled_cer = encode_cer(obj_expled)
+            self.assertNotEqual(obj_expled_cer, obj_encoded)
+            self.assertSequenceEqual(
+                obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+                obj_expled_encoded,
+            )
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -1522,12 +1539,19 @@ class TestBitString(CommonMixin, TestCase):
             pprint(obj, big_blobs=True, with_decode_path=True)
             self.assertFalse(obj.expled)
             obj_encoded = obj.encode()
+            self.assertSequenceEqual(encode_cer(obj), obj_encoded)
             obj_expled = obj(value, expl=tag_expl)
             self.assertTrue(obj_expled.expled)
             repr(obj_expled)
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
             obj_expled_encoded = obj_expled.encode()
+            obj_expled_cer = encode_cer(obj_expled)
+            self.assertNotEqual(obj_expled_cer, obj_encoded)
+            self.assertSequenceEqual(
+                obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+                obj_expled_encoded,
+            )
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -1861,6 +1885,24 @@ class TestBitString(CommonMixin, TestCase):
         self.assertTrue(obj.lenindef)
         self.assertTrue(obj.bered)
 
+    @settings(max_examples=LONG_TEST_MAX_EXAMPLES)
+    @given(integers(min_value=1000, max_value=3000))
+    def test_cer(self, data_len):
+        data = urandom(data_len)
+        encoded = encode_cer(BitString(data))
+        ctx = {"bered": True}
+        self.assertSequenceEqual(bytes(BitString().decod(encoded, ctx=ctx)), data)
+        evgens = list(BitString().decode_evgen(encoded, ctx=ctx))
+        evgens_expected = data_len // 999
+        if evgens_expected * 999 != data_len:
+            evgens_expected += 1
+        evgens_expected += 1
+        self.assertEqual(len(evgens), evgens_expected)
+        for (_, obj, _) in evgens[:-2]:
+            self.assertEqual(obj.vlen, 1000)
+        _, obj, _ = evgens[-2]
+        self.assertEqual(obj.vlen, 1 + data_len - len(evgens[:-2]) * 999)
+
 
 @composite
 def octet_string_values_strategy(draw, do_expl=False):
@@ -2153,12 +2195,19 @@ class TestOctetString(CommonMixin, TestCase):
             pprint(obj, big_blobs=True, with_decode_path=True)
             self.assertFalse(obj.expled)
             obj_encoded = obj.encode()
+            self.assertSequenceEqual(encode_cer(obj), obj_encoded)
             obj_expled = obj(value, expl=tag_expl)
             self.assertTrue(obj_expled.expled)
             repr(obj_expled)
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
             obj_expled_encoded = obj_expled.encode()
+            obj_expled_cer = encode_cer(obj_expled)
+            self.assertNotEqual(obj_expled_cer, obj_encoded)
+            self.assertSequenceEqual(
+                obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+                obj_expled_encoded,
+            )
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -2349,6 +2398,24 @@ class TestOctetString(CommonMixin, TestCase):
         self.assertEqual(err.exception.decode_path, decode_path + (str(chunks),))
         self.assertEqual(err.exception.offset, offset + 1 + 1 + chunks * len(bs))
 
+    @settings(max_examples=LONG_TEST_MAX_EXAMPLES)
+    @given(integers(min_value=1001, max_value=3000))
+    def test_cer(self, data_len):
+        data = urandom(data_len)
+        encoded = encode_cer(OctetString(data))
+        ctx = {"bered": True}
+        self.assertSequenceEqual(bytes(OctetString().decod(encoded, ctx=ctx)), data)
+        evgens = list(OctetString().decode_evgen(encoded, ctx=ctx))
+        evgens_expected = data_len // 1000
+        if evgens_expected * 1000 != data_len:
+            evgens_expected += 1
+        evgens_expected += 1
+        self.assertEqual(len(evgens), evgens_expected)
+        for (_, obj, _) in evgens[:-2]:
+            self.assertEqual(obj.vlen, 1000)
+        _, obj, _ = evgens[-2]
+        self.assertEqual(obj.vlen, data_len - len(evgens[:-2]) * 1000)
+
 
 @composite
 def null_values_strategy(draw, do_expl=False):
@@ -2498,12 +2565,19 @@ class TestNull(CommonMixin, TestCase):
             pprint(obj, big_blobs=True, with_decode_path=True)
             self.assertFalse(obj.expled)
             obj_encoded = obj.encode()
+            self.assertSequenceEqual(encode_cer(obj), obj_encoded)
             obj_expled = obj(expl=tag_expl)
             self.assertTrue(obj_expled.expled)
             repr(obj_expled)
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
             obj_expled_encoded = obj_expled.encode()
+            obj_expled_cer = encode_cer(obj_expled)
+            self.assertNotEqual(obj_expled_cer, obj_encoded)
+            self.assertSequenceEqual(
+                obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+                obj_expled_encoded,
+            )
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -2857,12 +2931,19 @@ class TestObjectIdentifier(CommonMixin, TestCase):
             pprint(obj, big_blobs=True, with_decode_path=True)
             self.assertFalse(obj.expled)
             obj_encoded = obj.encode()
+            self.assertSequenceEqual(encode_cer(obj), obj_encoded)
             obj_expled = obj(value, expl=tag_expl)
             self.assertTrue(obj_expled.expled)
             repr(obj_expled)
             list(obj_expled.pps())
             pprint(obj_expled, big_blobs=True, with_decode_path=True)
             obj_expled_encoded = obj_expled.encode()
+            obj_expled_cer = encode_cer(obj_expled)
+            self.assertNotEqual(obj_expled_cer, obj_encoded)
+            self.assertSequenceEqual(
+                obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+                obj_expled_encoded,
+            )
             ctx_copied = deepcopy(ctx_dummy)
             obj_decoded, tail = obj_expled.decode(
                 obj_expled_encoded + tail_junk,
@@ -5956,6 +6037,12 @@ class SeqMixing(object):
         pprint(seq, big_blobs=True, with_decode_path=True)
         self.assertTrue(seq.ready)
         seq_encoded = seq.encode()
+        seq_encoded_cer = encode_cer(seq)
+        self.assertNotEqual(seq_encoded_cer, seq_encoded)
+        self.assertSequenceEqual(
+            seq.decod(seq_encoded_cer, ctx={"bered": True}).encode(),
+            seq_encoded,
+        )
         seq_decoded, tail = seq.decode(seq_encoded + tail_junk)
         self.assertFalse(seq_decoded.lenindef)
         self.assertFalse(seq_decoded.ber_encoded)
@@ -6671,6 +6758,12 @@ class SeqOfMixing(object):
         pprint(obj, big_blobs=True, with_decode_path=True)
         self.assertFalse(obj.expled)
         obj_encoded = obj.encode()
+        obj_encoded_cer = encode_cer(obj)
+        self.assertNotEqual(obj_encoded_cer, obj_encoded)
+        self.assertSequenceEqual(
+            obj.decod(obj_encoded_cer, ctx={"bered": True}).encode(),
+            obj_encoded,
+        )
         obj_expled = obj(value, expl=tag_expl)
         self.assertTrue(obj_expled.expled)
         repr(obj_expled)
@@ -7478,3 +7571,49 @@ class TestPickleDifferentVersion(TestCase):
             pickle_loads(pickled)
         pyderasn.__version__ = version_orig
         pickle_loads(pickled)
+
+
+class TestCERSetOrdering(TestCase):
+    def test_vectors(self):
+        """Taken from X.690-201508
+        """
+        class B(Choice):
+            schema = (
+                ("c", Integer(impl=tag_ctxp(2))),
+                ("d", Integer(impl=tag_ctxp(4))),
+            )
+
+        class F(Choice):
+            schema = (
+                ("g", Integer(impl=tag_ctxp(5))),
+                ("h", Integer(impl=tag_ctxp(6))),
+            )
+
+        class I(Choice):
+            schema = (
+                ("j", Integer(impl=tag_ctxp(0))),
+            )
+
+        class E(Choice):
+            schema = (
+                ("f", F()),
+                ("i", I()),
+            )
+
+        class A(Set):
+            schema = (
+                ("a", Integer(impl=tag_ctxp(3))),
+                ("b", B(expl=tag_ctxc(1))),
+                ("e", E()),
+            )
+
+        a = A((
+            ("a", Integer(123)),
+            ("b", B(("d", Integer(234)))),
+            ("e", E(("f", F(("g", Integer(345)))))),
+        ))
+        order = sorted(a._values_for_encoding(), key=attrgetter("tag_order_cer"))
+        self.assertSequenceEqual(
+            [i.__class__.__name__ for i in order],
+            ("E", "B", "Integer"),
+        )