# coding: utf-8
-# PyDERASN -- Python ASN.1 DER/BER codec with abstract structures
+# PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
# Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
#
# This program is free software: you can redistribute it and/or modify
from datetime import datetime
from datetime import timedelta
from importlib import import_module
+from io import BytesIO
from operator import attrgetter
from os import environ
from os import urandom
from pyderasn import Choice
from pyderasn import DecodeError
from pyderasn import DecodePathDefBy
+from pyderasn import encode2pass
from pyderasn import encode_cer
from pyderasn import Enumerated
from pyderasn import EOC
pprint(obj, big_blobs=True, with_decode_path=True)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
repr(err.exception)
obj = Boolean(value)
self.assertTrue(obj.ready)
obj = Boolean(value, impl=tag_impl)
with self.assertRaises(NotEnoughData):
obj.decode(obj.encode()[:-1])
+ with self.assertRaises(NotEnoughData):
+ obj.decode(encode2pass(obj)[:-1])
@given(
booleans(),
obj = Boolean(value, expl=tag_expl)
with self.assertRaises(NotEnoughData):
obj.decode(obj.encode()[:-1])
+ with self.assertRaises(NotEnoughData):
+ obj.decode(encode2pass(obj)[:-1])
@given(
integers(min_value=31),
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
pprint(obj, big_blobs=True, with_decode_path=True)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
repr(err.exception)
obj = Integer(value)
self.assertTrue(obj.ready)
Integer(values[0]).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ Integer(bounds=(values[1], values[2])).decode(
+ encode2pass(Integer(values[0]))
+ )
with self.assertRaises(BoundsError) as err:
Integer(value=values[2], bounds=(values[0], values[1]))
repr(err.exception)
Integer(values[2]).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ Integer(bounds=(values[0], values[1])).decode(
+ encode2pass(Integer(values[2]))
+ )
@given(data_strategy())
def test_call(self, d):
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
obj = BitString(value)
self.assertTrue(obj.ready)
repr(obj)
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
obj = OctetString(value)
self.assertTrue(obj.ready)
repr(obj)
OctetString(value).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ OctetString(bounds=(bound_min, bound_max)).decode(
+ encode2pass(OctetString(value))
+ )
value = d.draw(binary(min_size=bound_max + 1))
with self.assertRaises(BoundsError) as err:
OctetString(value=value, bounds=(bound_min, bound_max))
OctetString(value).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ OctetString(bounds=(bound_min, bound_max)).decode(
+ encode2pass(OctetString(value))
+ )
@given(data_strategy())
def test_call(self, d):
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(expl=tag_expl)
self.assertTrue(obj_expled.expled)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
obj = ObjectIdentifier(value)
self.assertTrue(obj.ready)
self.assertFalse(obj.ber_encoded)
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.assertSequenceEqual(encode_cer(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
value = d.draw(text(alphabet=self.text_alphabet()))
obj = self.base_klass(value)
self.assertTrue(obj.ready)
self.base_klass(value).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ self.base_klass(bounds=(bound_min, bound_max)).decode(
+ encode2pass(self.base_klass(value))
+ )
value = d.draw(text(alphabet=self.text_alphabet(), min_size=bound_max + 1))
with self.assertRaises(BoundsError) as err:
self.base_klass(value=value, bounds=(bound_min, bound_max))
self.base_klass(value).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ self.base_klass(bounds=(bound_min, bound_max)).decode(
+ encode2pass(self.base_klass(value))
+ )
@given(data_strategy())
def test_call(self, d):
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
repr(obj_expled)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
value = d.draw(datetimes(
min_value=self.min_datetime,
max_value=self.max_datetime,
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
self.additional_symmetric_check(value, obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
obj = Any(value)
self.assertTrue(obj.ready)
repr(obj)
tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
self.assertEqual(obj.tag_order, (tag_class, tag_num))
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
tag_class, _, tag_num = tag_decode(tag_expl)
with self.assertRaises(ObjNotReady) as err:
obj.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(obj)
obj["whatever"] = Boolean()
self.assertFalse(obj.ready)
repr(obj)
self.assertFalse(obj.expled)
self.assertEqual(obj.tag_order, obj.value.tag_order)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
obj_expled = obj(value, expl=tag_expl)
self.assertTrue(obj_expled.expled)
tag_class, _, tag_num = tag_decode(tag_expl)
with self.assertRaises(ObjNotReady) as err:
seq.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(seq)
for name, value in non_ready.items():
seq[name] = Boolean(value)
self.assertTrue(seq.ready)
pprint(seq, big_blobs=True, with_decode_path=True)
self.assertTrue(seq.ready)
seq_encoded = seq.encode()
+ self.assertEqual(encode2pass(seq), seq_encoded)
seq_encoded_cer = encode_cer(seq)
self.assertNotEqual(seq_encoded_cer, seq_encoded)
self.assertSequenceEqual(
seq, expect_outers = d.draw(sequences_strategy(seq_klass=self.base_klass))
self.assertTrue(seq.ready)
seq_encoded = seq.encode()
+ self.assertEqual(encode2pass(seq), seq_encoded)
seq_decoded, tail = seq.decode(seq_encoded)
self.assertEqual(tail, b"")
self.assertTrue(seq.ready)
min_size=len(_schema),
max_size=len(_schema),
))]
+ class Wahl(Choice):
+ schema = (("int", Integer()),)
class SeqWithoutDefault(self.base_klass):
schema = [
- (n, Integer(impl=t))
+ (n, Wahl(expl=t))
for (n, _), t in zip(_schema, tags)
]
seq_without_default = SeqWithoutDefault()
for name, value in _schema:
- seq_without_default[name] = Integer(value)
+ seq_without_default[name] = Wahl(("int", Integer(value)))
seq_encoded = seq_without_default.encode()
+ seq_without_default.decode(seq_encoded)
+ self.assertEqual(
+ len(list(seq_without_default.decode_evgen(seq_encoded))),
+ len(_schema) * 2 + 1,
+ )
class SeqWithDefault(self.base_klass):
schema = [
- (n, Integer(default=v, impl=t))
+ (n, Wahl(default=Wahl(("int", Integer(v))), expl=t))
for (n, v), t in zip(_schema, tags)
]
seq_with_default = SeqWithDefault()
with assertRaisesRegex(self, DecodeError, "DEFAULT value met"):
seq_with_default.decode(seq_encoded)
+ with assertRaisesRegex(self, DecodeError, "DEFAULT value met"):
+ list(seq_with_default.decode_evgen(seq_encoded))
for ctx in ({"bered": True}, {"allow_default_values": True}):
seq_decoded, _ = seq_with_default.decode(seq_encoded, ctx=ctx)
self.assertTrue(seq_decoded.ber_encoded)
self.assertTrue(seq_decoded.bered)
for name, value in _schema:
self.assertEqual(seq_decoded[name], seq_with_default[name])
- self.assertEqual(seq_decoded[name], value)
+ self.assertEqual(seq_decoded[name].value, value)
+ self.assertEqual(
+ len(list(seq_with_default.decode_evgen(seq_encoded, ctx=ctx))),
+ len(_schema) + 1,
+ )
+
+ seq_without_default = SeqWithoutDefault()
+ for name, value in _schema:
+ seq_without_default[name] = Wahl(("int", Integer(value + 1)))
+ seq_encoded = seq_without_default.encode()
+ seq_with_default.decode(seq_encoded)
+ self.assertEqual(
+ len(list(seq_with_default.decode_evgen(seq_encoded))),
+ len(_schema) + 1,
+ )
@given(data_strategy())
def test_missing_from_spec(self, d):
seq_missing = SeqMissing()
with self.assertRaises(TagMismatch):
seq_missing.decode(seq_encoded)
+ with self.assertRaises(TagMismatch):
+ list(seq_missing.decode_evgen(seq_encoded))
def test_bered(self):
class Seq(self.base_klass):
encoded = Seq.tag_default + len_encode(len(encoded)) + encoded
with self.assertRaises(DecodeError):
Seq().decode(encoded)
+ with self.assertRaises(DecodeError):
+ list(Seq().decode_evgen(encoded))
+ list(Seq().decode_evgen(encoded, ctx={"bered": True}))
decoded, _ = Seq().decode(encoded, ctx={"bered": True})
self.assertFalse(decoded.ber_encoded)
self.assertFalse(decoded.lenindef)
with self.assertRaises(ObjNotReady) as err:
seqof.encode()
repr(err.exception)
+ with self.assertRaises(ObjNotReady) as err:
+ encode2pass(seqof)
for i, value in enumerate(values):
self.assertEqual(seqof[i], value)
if not seqof[i].ready:
SeqOf(value).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ SeqOf(bounds=(bound_min, bound_max)).decode(
+ encode2pass(SeqOf(value))
+ )
value = [Boolean(True)] * d.draw(integers(
min_value=bound_max + 1,
max_value=bound_max + 10,
SeqOf(value).encode()
)
repr(err.exception)
+ with assertRaisesRegex(self, DecodeError, "bounds") as err:
+ SeqOf(bounds=(bound_min, bound_max)).decode(
+ encode2pass(SeqOf(value))
+ )
@given(integers(min_value=1, max_value=10))
def test_out_of_bounds(self, bound_max):
pprint(obj, big_blobs=True, with_decode_path=True)
self.assertFalse(obj.expled)
obj_encoded = obj.encode()
+ self.assertEqual(encode2pass(obj), obj_encoded)
obj_encoded_cer = encode_cer(obj)
self.assertNotEqual(obj_encoded_cer, obj_encoded)
self.assertSequenceEqual(
register_class(SeqOf)
pickle_dumps(seqof)
+ def test_iterator_2pass(self):
+ class SeqOf(SequenceOf):
+ schema = Integer()
+ bounds = (1, float("+inf"))
+ def gen():
+ for i in six_xrange(10):
+ yield Integer(i)
+ seqof = SeqOf(gen())
+ self.assertTrue(seqof.ready)
+ _, state = seqof.encode1st()
+ self.assertFalse(seqof.ready)
+ seqof = seqof(gen())
+ self.assertTrue(seqof.ready)
+ buf = BytesIO()
+ seqof.encode2nd(buf.write, iter(state))
+ self.assertSequenceEqual(
+ [int(i) for i in seqof.decod(buf.getvalue())],
+ list(gen()),
+ )
+
def test_non_ready_bound_min(self):
class SeqOf(SequenceOf):
schema = Integer()