include tests/compli_test_suite/pdf/free_asn1_testsuite.pdf
include tests/compli_test_suite/README.md
include tests/compli_test_suite/suite/tc*.ber
+include tests/test_cms.py
include tests/test_compli.py
include tests/test_crl.py
include tests/test_crts.py
same tag to be successfully decoded
* Fixed possibly invalid SET DER encoding where objects were not sorted
by tag, but by encoded representation
-* Any does not allow empty data value now. Now it checks if it has valid
- ASN.1 tag
+* ``Any`` does not allow empty data value now. Now it checks if it has
+ valid ASN.1 tag
+* ``Any`` allows an ordinary ``Obj`` storing, without its forceful
+ encoded representation storage
* Initial support for so called ``evgen_mode``: event generation mode,
where no in-memory objects storing happens, giving ability to process
ASN.1 data without fully parsing it first
+* Initial experimental CER encoding mode, allowing streaming encoding of
+ the data directly to some writeable object
.. _release6.3:
.. autofunction:: pyderasn.abs_decode_path
.. autofunction:: pyderasn.colonize_hex
+.. autofunction:: pyderasn.encode_cer
.. autofunction:: pyderasn.hexenc
.. autofunction:: pyderasn.hexdec
.. autofunction:: pyderasn.tag_encode
from copy import copy
from datetime import datetime
from datetime import timedelta
+from io import BytesIO
from math import ceil
from operator import attrgetter
from string import ascii_letters
"Choice",
"DecodeError",
"DecodePathDefBy",
+ "encode_cer",
"Enumerated",
"ExceedingData",
"GeneralizedTime",
return l, 1 + octets_num, data[1 + octets_num:]
+LEN1K = len_encode(1000)
+
+
+def write_full(writer, data):
+ """Fully write provided data
+
+ BytesIO does not guarantee that the whole data will be written at once.
+ """
+ data = memoryview(data)
+ written = 0
+ while written != len(data):
+ n = writer(data[written:])
+ if n is None:
+ raise ValueError("can not write to buf")
+ written += n
+
+
########################################################################
# Base class
########################################################################
"""
return self._tag_order
+ @property
+ def tag_order_cer(self):
+ return self.tag_order
+
@property
def tlen(self):
"""See :ref:`decoding`
return raw
return b"".join((self._expl, len_encode(len(raw)), raw))
+ def encode_cer(self, writer):
+ if self._expl is not None:
+ write_full(writer, self._expl + LENINDEF)
+ if getattr(self, "der_forced", False):
+ write_full(writer, self._encode())
+ else:
+ self._encode_cer(writer)
+ if self._expl is not None:
+ write_full(writer, EOC)
+
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def hexencode(self):
"""Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
"""
)
+def encode_cer(obj):
+ """Encode to CER in memory
+ """
+ buf = BytesIO()
+ obj.encode_cer(buf.write)
+ return buf.getvalue()
+
+
class DecodePathDefBy(object):
"""DEFINED BY representation inside decode path
"""
octets,
))
+ def _encode_cer(self, writer):
+ bit_len, octets = self._value
+ if len(octets) + 1 <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ LEN1K,
+ int2byte(0),
+ octets[offset:offset + 999],
+ )))
+ tail = octets[offset+999:]
+ if len(tail) > 0:
+ tail = int2byte((8 - bit_len % 8) % 8) + tail
+ write_full(writer, b"".join((
+ BitString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
self._value,
))
+ def _encode_cer(self, writer):
+ octets = self._value
+ if len(octets) <= 1000:
+ write_full(writer, self._encode())
+ return
+ write_full(writer, self.tag_constructed)
+ write_full(writer, LENINDEF)
+ for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ LEN1K,
+ octets[offset:offset + 1000],
+ )))
+ tail = octets[offset+1000:]
+ if len(tail) > 0:
+ write_full(writer, b"".join((
+ OctetString.tag_default,
+ len_encode(len(tail)),
+ tail,
+ )))
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
value = self._encode_time()
return b"".join((self.tag, len_encode(len(value)), value))
+ def _encode_cer(self, writer):
+ write_full(writer, self._encode())
+
def todatetime(self):
return self._value
self._assert_ready()
return self._value[1].tag_order if self._tag_order is None else self._tag_order
+ @property
+ def tag_order_cer(self):
+ return min(v.tag_order_cer for v in itervalues(self.specs))
+
def __getitem__(self, key):
if key not in self.specs:
raise ObjUnknown(key)
self._assert_ready()
return self._value[1].encode()
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ self._value[1].encode_cer(writer)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
"""``ANY`` special type
>>> Any(Integer(-123))
- ANY 020185
+ ANY INTEGER -123 (0X:7B)
>>> a = Any(OctetString(b"hello world").encode())
ANY 040b68656c6c6f20776f726c64
>>> hexenc(bytes(a))
return value
if isinstance(value, self.__class__):
return value._value
- if isinstance(value, Obj):
- return value.encode()
- raise InvalidValueType((self.__class__, Obj, binary_type))
+ if not isinstance(value, Obj):
+ raise InvalidValueType((self.__class__, Obj, binary_type))
+ return value
@property
def ready(self):
def __eq__(self, their):
if their.__class__ == binary_type:
- return self._value == their
+ if self._value.__class__ == binary_type:
+ return self._value == their
+ return self._value.encode() == their
if issubclass(their.__class__, Any):
- return self._value == their._value
+ if self.ready and their.ready:
+ return bytes(self) == bytes(their)
+ return self.ready == their.ready
return False
def __call__(
def __bytes__(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return self._value.encode()
@property
def tlen(self):
def _encode(self):
self._assert_ready()
- return self._value
+ value = self._value
+ if value.__class__ == binary_type:
+ return value
+ return value.encode()
+
+ def _encode_cer(self, writer):
+ self._assert_ready()
+ value = self._value
+ if value.__class__ == binary_type:
+ write_full(writer, value)
+ else:
+ value.encode_cer(writer)
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
return pp_console_row(next(self.pps()))
def pps(self, decode_path=()):
+ value = self._value
+ if value is None:
+ pass
+ elif value.__class__ == binary_type:
+ value = None
+ else:
+ value = repr(value)
yield _pp(
obj=self,
asn1_type_name=self.asn1_type_name,
obj_name=self.__class__.__name__,
decode_path=decode_path,
- blob=self._value if self.ready else None,
+ value=value,
+ blob=self._value if self._value.__class__ == binary_type else None,
optional=self.optional,
default=self == self.default,
impl=None if self.tag == self.tag_default else tag_decode(self.tag),
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(
+ self._values_for_encoding(),
+ key=attrgetter("tag_order_cer"),
+ ):
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
v = b"".join(v.encode() for v in self._values_for_encoding())
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in self._values_for_encoding():
+ v.encode_cer(writer)
+ write_full(writer, EOC)
+
def _decode(
self,
tlv,
v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
return b"".join((self.tag, len_encode(len(v)), v))
+ def _encode_cer(self, writer):
+ write_full(writer, self.tag + LENINDEF)
+ for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
+ write_full(writer, v)
+ write_full(writer, EOC)
+
def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
return super(SetOf, self)._decode(
tlv,
--- /dev/null
+# coding: utf-8
+# PyDERASN -- Python ASN.1 DER codec with abstract structures
+# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as
+# published by the Free Software Foundation, version 3 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+
+from datetime import datetime
+from hashlib import sha256
+from os import urandom
+from random import randint
+from subprocess import call
+from unittest import TestCase
+
+from pyderasn import Any
+from pyderasn import Choice
+from pyderasn import encode_cer
+from pyderasn import Integer
+from pyderasn import ObjectIdentifier
+from pyderasn import OctetString
+from pyderasn import Sequence
+from pyderasn import SetOf
+from pyderasn import tag_ctxc
+from pyderasn import tag_ctxp
+from pyderasn import UTCTime
+from tests.test_crts import AlgorithmIdentifier
+from tests.test_crts import Certificate
+from tests.test_crts import SubjectKeyIdentifier
+from tests.test_crts import Time
+
+
+class CMSVersion(Integer):
+ schema = (
+ ("v0", 0),
+ ("v1", 1),
+ ("v2", 2),
+ ("v3", 3),
+ ("v4", 4),
+ ("v5", 5),
+ )
+
+
+class AttributeValue(Any):
+ pass
+
+
+class AttributeValues(SetOf):
+ schema = AttributeValue()
+
+
+class Attribute(Sequence):
+ schema = (
+ ("attrType", ObjectIdentifier()),
+ ("attrValues", AttributeValues()),
+ )
+
+
+class SignatureAlgorithmIdentifier(AlgorithmIdentifier):
+ pass
+
+
+class SignedAttributes(SetOf):
+ schema = Attribute()
+ bounds = (1, 32)
+ der_forced = True
+
+
+class SignerIdentifier(Choice):
+ schema = (
+ # ("issuerAndSerialNumber", IssuerAndSerialNumber()),
+ ("subjectKeyIdentifier", SubjectKeyIdentifier(impl=tag_ctxp(0))),
+ )
+
+
+class DigestAlgorithmIdentifiers(SetOf):
+ schema = AlgorithmIdentifier()
+
+
+class DigestAlgorithmIdentifier(AlgorithmIdentifier):
+ pass
+
+
+class SignatureValue(OctetString):
+ pass
+
+
+class SignerInfo(Sequence):
+ schema = (
+ ("version", CMSVersion()),
+ ("sid", SignerIdentifier()),
+ ("digestAlgorithm", DigestAlgorithmIdentifier()),
+ ("signedAttrs", SignedAttributes(impl=tag_ctxc(0), optional=True)),
+ ("signatureAlgorithm", SignatureAlgorithmIdentifier()),
+ ("signature", SignatureValue()),
+ # ("unsignedAttrs", UnsignedAttributes(impl=tag_ctxc(1), optional=True)),
+ )
+
+
+class SignerInfos(SetOf):
+ schema = SignerInfo()
+
+
+class ContentType(ObjectIdentifier):
+ pass
+
+
+class EncapsulatedContentInfo(Sequence):
+ schema = (
+ ("eContentType", ContentType()),
+ ("eContent", OctetString(expl=tag_ctxc(0), optional=True)),
+ )
+
+
+class CertificateChoices(Choice):
+ schema = (
+ ('certificate', Certificate()),
+ # ...
+ )
+
+
+class CertificateSet(SetOf):
+ schema = CertificateChoices()
+
+
+class SignedData(Sequence):
+ schema = (
+ ("version", CMSVersion()),
+ ("digestAlgorithms", DigestAlgorithmIdentifiers()),
+ ("encapContentInfo", EncapsulatedContentInfo()),
+ ("certificates", CertificateSet(impl=tag_ctxc(0), optional=True)),
+ # ("crls", RevocationInfoChoices(impl=tag_ctxc(1), optional=True)),
+ ("signerInfos", SignerInfos()),
+ )
+
+
+class ContentInfo(Sequence):
+ schema = (
+ ("contentType", ContentType()),
+ ("content", Any(expl=tag_ctxc(0))),
+ )
+
+
+id_signedData = ObjectIdentifier("1.2.840.113549.1.7.2")
+id_sha256 = ObjectIdentifier("2.16.840.1.101.3.4.2.1")
+id_data = ObjectIdentifier("1.2.840.113549.1.7.1")
+id_ecdsa_with_SHA256 = ObjectIdentifier("1.2.840.10045.4.3.2")
+id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3")
+id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4")
+id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14")
+
+
+class TestSignedDataCER(TestCase):
+ def runTest(self):
+ # openssl ecparam -name prime256v1 -genkey -out key.pem
+ # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem
+ # -days 365 -nodes -subj "/CN=doesnotmatter"
+ with open("cert.cer", "rb") as fd:
+ cert = Certificate().decod(fd.read())
+ for ext in cert["tbsCertificate"]["extensions"]:
+ if ext["extnID"] == id_ce_subjectKeyIdentifier:
+ skid = SubjectKeyIdentifier().decod(bytes(ext["extnValue"]))
+ ai_sha256 = AlgorithmIdentifier((
+ ("algorithm", id_sha256),
+ ))
+ data = urandom(randint(1000, 3000))
+ eci = EncapsulatedContentInfo((
+ ("eContentType", ContentType(id_data)),
+ ("eContent", OctetString(data)),
+ ))
+ signed_attrs = SignedAttributes([
+ Attribute((
+ ("attrType", id_pkcs9_at_contentType),
+ ("attrValues", AttributeValues([
+ AttributeValue(id_data.encode())
+ ])),
+ )),
+ Attribute((
+ ("attrType", id_pkcs9_at_messageDigest),
+ ("attrValues", AttributeValues([
+ AttributeValue(OctetString(
+ sha256(bytes(eci["eContent"])).digest()
+ ).encode()),
+ ])),
+ )),
+ ])
+ with open("/tmp/in", "wb") as fd:
+ fd.write(encode_cer(signed_attrs))
+ self.assertEqual(0, call(" ".join((
+ "openssl dgst -sha256",
+ "-sign key.pem",
+ "-binary",
+ "/tmp/in",
+ "> /tmp/signature",
+ )), shell=True))
+ ci = ContentInfo((
+ ("contentType", ContentType(id_signedData)),
+ ("content", Any((SignedData((
+ ("version", CMSVersion("v3")),
+ ("digestAlgorithms", DigestAlgorithmIdentifiers([ai_sha256])),
+ ("encapContentInfo", eci),
+ ("certificates", CertificateSet([
+ CertificateChoices(("certificate", cert)),
+ ])),
+ ("signerInfos", SignerInfos([SignerInfo((
+ ("version", CMSVersion("v3")),
+ ("sid", SignerIdentifier(
+ ("subjectKeyIdentifier", skid)
+ )),
+ ("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)),
+ ("signedAttrs", signed_attrs),
+ ("signatureAlgorithm", SignatureAlgorithmIdentifier((
+ ("algorithm", id_ecdsa_with_SHA256),
+ ))),
+ ("signature", SignatureValue(open("/tmp/signature", "rb").read())),
+ ))])),
+ ))))),
+ ))
+ with open("/tmp/out.p7m", "wb") as fd:
+ fd.write(encode_cer(ci))
+ self.assertEqual(0, call(" ".join((
+ "openssl cms -verify",
+ "-inform DER -in /tmp/out.p7m",
+ "-signer cert.pem -CAfile cert.pem",
+ "-out /dev/null",
+ )), shell=True))
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
-"""CRL related schemes, just to test the performance with them
+"""CRL related schemas, just to test the performance with them
"""
+from os.path import exists
+from time import time
+from unittest import skipIf
+from unittest import TestCase
+
from pyderasn import BitString
+from pyderasn import encode_cer
from pyderasn import Sequence
from pyderasn import SequenceOf
from pyderasn import tag_ctxc
("signatureAlgorithm", AlgorithmIdentifier()),
("signatureValue", BitString()),
)
+
+@skipIf(not exists("revoke.crl"), "CACert's revoke.crl not found")
+class TestCACert(TestCase):
+ def test_cer(self):
+ with open("revoke.crl", "rb") as fd:
+ raw = fd.read()
+ print("DER read")
+ start = time()
+ crl1 = CertificateList().decod(raw)
+ print("DER decoded", time() - start)
+ start = time()
+ cer_raw = encode_cer(crl1)
+ print("CER encoded", time() - start)
+ start = time()
+ crl2 = CertificateList().decod(cer_raw, ctx={"bered": True})
+ print("CER decoded", time() - start)
+ self.assertEqual(crl2, crl1)
+ start = time()
+ der_raw = crl2.encode()
+ print("DER encoded", time() - start)
+ self.assertSequenceEqual(der_raw, raw)
from pyderasn import BitString
from pyderasn import Boolean
from pyderasn import Choice
+from pyderasn import encode_cer
from pyderasn import GeneralizedTime
from pyderasn import hexdec
from pyderasn import IA5String
pass
+class KeyIdentifier(OctetString):
+ pass
+
+
+class SubjectKeyIdentifier(KeyIdentifier):
+ pass
+
+
+
class Extension(Sequence):
schema = (
("extnID", ObjectIdentifier()),
("signatureAlgorithm", AlgorithmIdentifier()),
("signatureValue", BitString()),
)
+ der_forced = True
class TestGoSelfSignedVector(TestCase):
"998bb9a4a8cbeb34c0f0a78cf8d91ede14a5ed76bf116fe360aafa8821490435",
))))
self.assertSequenceEqual(crt.encode(), raw)
+ self.assertEqual(
+ Certificate().decod(encode_cer(crt), ctx={"bered": True}),
+ crt,
+ )
class TestGoPayPalVector(TestCase):
pprint(crt)
repr(crt)
pickle_loads(pickle_dumps(crt, pickle_proto))
+ self.assertEqual(
+ Certificate().decod(encode_cer(crt), ctx={"bered": True}),
+ crt,
+ )
from datetime import datetime
from datetime import timedelta
from importlib import import_module
+from operator import attrgetter
from os import environ
+from os import urandom
from random import random
from string import ascii_letters
from string import digits
from pyderasn import Choice
from pyderasn import DecodeError
from pyderasn import DecodePathDefBy
+from pyderasn import encode_cer
from pyderasn import Enumerated
from pyderasn import EOC
from pyderasn import EOC_LEN
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
list(obj_expled.pps())
pprint(obj_expled, big_blobs=True, with_decode_path=True)
+ obj_expled_cer = encode_cer(obj_expled)
+ self.assertNotEqual(obj_expled_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+ obj_expled.encode(),
+ )
obj_expled_hex_encoded = obj_expled.hexencode()
ctx_copied = deepcopy(ctx_dummy)
obj_decoded, tail = obj_expled.hexdecode(
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
list(obj_expled.pps())
pprint(obj_expled, big_blobs=True, with_decode_path=True)
obj_expled_encoded = obj_expled.encode()
+ obj_expled_cer = encode_cer(obj_expled)
+ self.assertNotEqual(obj_expled_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+ obj_expled_encoded,
+ )
ctx_copied = deepcopy(ctx_dummy)
obj_decoded, tail = obj_expled.decode(
obj_expled_encoded + tail_junk,
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
list(obj_expled.pps())
pprint(obj_expled, big_blobs=True, with_decode_path=True)
obj_expled_encoded = obj_expled.encode()
+ obj_expled_cer = encode_cer(obj_expled)
+ self.assertNotEqual(obj_expled_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+ obj_expled_encoded,
+ )
ctx_copied = deepcopy(ctx_dummy)
obj_decoded, tail = obj_expled.decode(
obj_expled_encoded + tail_junk,
self.assertTrue(obj.lenindef)
self.assertTrue(obj.bered)
+ @settings(max_examples=LONG_TEST_MAX_EXAMPLES)
+ @given(integers(min_value=1000, max_value=3000))
+ def test_cer(self, data_len):
+ data = urandom(data_len)
+ encoded = encode_cer(BitString(data))
+ ctx = {"bered": True}
+ self.assertSequenceEqual(bytes(BitString().decod(encoded, ctx=ctx)), data)
+ evgens = list(BitString().decode_evgen(encoded, ctx=ctx))
+ evgens_expected = data_len // 999
+ if evgens_expected * 999 != data_len:
+ evgens_expected += 1
+ evgens_expected += 1
+ self.assertEqual(len(evgens), evgens_expected)
+ for (_, obj, _) in evgens[:-2]:
+ self.assertEqual(obj.vlen, 1000)
+ _, obj, _ = evgens[-2]
+ self.assertEqual(obj.vlen, 1 + data_len - len(evgens[:-2]) * 999)
+
@composite
def octet_string_values_strategy(draw, do_expl=False):
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
list(obj_expled.pps())
pprint(obj_expled, big_blobs=True, with_decode_path=True)
obj_expled_encoded = obj_expled.encode()
+ obj_expled_cer = encode_cer(obj_expled)
+ self.assertNotEqual(obj_expled_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+ obj_expled_encoded,
+ )
ctx_copied = deepcopy(ctx_dummy)
obj_decoded, tail = obj_expled.decode(
obj_expled_encoded + tail_junk,
self.assertEqual(err.exception.decode_path, decode_path + (str(chunks),))
self.assertEqual(err.exception.offset, offset + 1 + 1 + chunks * len(bs))
+ @settings(max_examples=LONG_TEST_MAX_EXAMPLES)
+ @given(integers(min_value=1001, max_value=3000))
+ def test_cer(self, data_len):
+ data = urandom(data_len)
+ encoded = encode_cer(OctetString(data))
+ ctx = {"bered": True}
+ self.assertSequenceEqual(bytes(OctetString().decod(encoded, ctx=ctx)), data)
+ evgens = list(OctetString().decode_evgen(encoded, ctx=ctx))
+ evgens_expected = data_len // 1000
+ if evgens_expected * 1000 != data_len:
+ evgens_expected += 1
+ evgens_expected += 1
+ self.assertEqual(len(evgens), evgens_expected)
+ for (_, obj, _) in evgens[:-2]:
+ self.assertEqual(obj.vlen, 1000)
+ _, obj, _ = evgens[-2]
+ self.assertEqual(obj.vlen, data_len - len(evgens[:-2]) * 1000)
+
@composite
def null_values_strategy(draw, do_expl=False):
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
list(obj_expled.pps())
pprint(obj_expled, big_blobs=True, with_decode_path=True)
obj_expled_encoded = obj_expled.encode()
+ obj_expled_cer = encode_cer(obj_expled)
+ self.assertNotEqual(obj_expled_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+ obj_expled_encoded,
+ )
ctx_copied = deepcopy(ctx_dummy)
obj_decoded, tail = obj_expled.decode(
obj_expled_encoded + tail_junk,
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
list(obj_expled.pps())
pprint(obj_expled, big_blobs=True, with_decode_path=True)
obj_expled_encoded = obj_expled.encode()
+ obj_expled_cer = encode_cer(obj_expled)
+ self.assertNotEqual(obj_expled_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj_expled.decod(obj_expled_cer, ctx={"bered": True}).encode(),
+ obj_expled_encoded,
+ )
ctx_copied = deepcopy(ctx_dummy)
obj_decoded, tail = obj_expled.decode(
obj_expled_encoded + tail_junk,
pprint(seq, big_blobs=True, with_decode_path=True)
self.assertTrue(seq.ready)
seq_encoded = seq.encode()
+ seq_encoded_cer = encode_cer(seq)
+ self.assertNotEqual(seq_encoded_cer, seq_encoded)
+ self.assertSequenceEqual(
+ seq.decod(seq_encoded_cer, ctx={"bered": True}).encode(),
+ seq_encoded,
+ )
seq_decoded, tail = seq.decode(seq_encoded + tail_junk)
self.assertFalse(seq_decoded.lenindef)
self.assertFalse(seq_decoded.ber_encoded)
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ obj_encoded_cer = encode_cer(obj)
+ self.assertNotEqual(obj_encoded_cer, obj_encoded)
+ self.assertSequenceEqual(
+ obj.decod(obj_encoded_cer, ctx={"bered": True}).encode(),
+ obj_encoded,
+ )
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
pickle_loads(pickled)
pyderasn.__version__ = version_orig
pickle_loads(pickled)
+
+
+class TestCERSetOrdering(TestCase):
+ def test_vectors(self):
+ """Taken from X.690-201508
+ """
+ class B(Choice):
+ schema = (
+ ("c", Integer(impl=tag_ctxp(2))),
+ ("d", Integer(impl=tag_ctxp(4))),
+ )
+
+ class F(Choice):
+ schema = (
+ ("g", Integer(impl=tag_ctxp(5))),
+ ("h", Integer(impl=tag_ctxp(6))),
+ )
+
+ class I(Choice):
+ schema = (
+ ("j", Integer(impl=tag_ctxp(0))),
+ )
+
+ class E(Choice):
+ schema = (
+ ("f", F()),
+ ("i", I()),
+ )
+
+ class A(Set):
+ schema = (
+ ("a", Integer(impl=tag_ctxp(3))),
+ ("b", B(expl=tag_ctxc(1))),
+ ("e", E()),
+ )
+
+ a = A((
+ ("a", Integer(123)),
+ ("b", B(("d", Integer(234)))),
+ ("e", E(("f", F(("g", Integer(345)))))),
+ ))
+ order = sorted(a._values_for_encoding(), key=attrgetter("tag_order_cer"))
+ self.assertSequenceEqual(
+ [i.__class__.__name__ for i in order],
+ ("E", "B", "Integer"),
+ )