From 038b5d9eab3e5dc2a203f064f22caa854f5b68ae Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 19 May 2018 17:47:04 +0300 Subject: [PATCH] BER BitString/OctetString support --- pyderasn.py | 307 +++++++++++++++++++++++++++++++++++------ tests/test_pyderasn.py | 137 ++++++++++++++++++ 2 files changed, 405 insertions(+), 39 deletions(-) diff --git a/pyderasn.py b/pyderasn.py index a560eda..6e772e0 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -561,6 +561,8 @@ TagClassReprs = { TagClassPrivate: "PRIVATE ", TagClassUniversal: "UNIV ", } +EOC = b"\x00\x00" +EOC_LEN = len(EOC) ######################################################################## @@ -606,6 +608,10 @@ class NotEnoughData(DecodeError): pass +class LenIndefiniteForm(DecodeError): + pass + + class TagMismatch(DecodeError): pass @@ -806,7 +812,7 @@ def len_decode(data): if octets_num + 1 > len(data): raise NotEnoughData("encoded length is longer than data") if octets_num == 0: - raise DecodeError("long form instead of short one") + raise LenIndefiniteForm() if byte2int(data[1:]) == 0: raise DecodeError("leading zeros") l = 0 @@ -1811,7 +1817,7 @@ class BitString(Obj): >>> b.specs {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2} """ - __slots__ = ("specs", "defined") + __slots__ = ("tag_constructed", "specs", "defined") tag_default = tag_encode(3) asn1_type_name = "BIT STRING" @@ -1849,6 +1855,12 @@ class BitString(Obj): if value is None: self._value = default self.defined = None + tag_klass, _, tag_num = tag_decode(self.tag) + self.tag_constructed = tag_encode( + klass=tag_klass, + form=TagFormConstructed, + num=tag_num, + ) def _bits2octets(self, bits): if len(self.specs) > 0: @@ -1994,24 +2006,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): - try: - t, _, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t != self.tag: - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return + def _decode_chunk(self, lv, offset, decode_path, ctx): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2069,6 +2064,128 @@ class BitString(Obj): ) return obj, tail + def _decode(self, tlv, offset, decode_path, ctx, tag_only): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + return + return self._decode_chunk(lv, offset, decode_path, ctx) + if t == self.tag_constructed: + if not ctx.get("bered", False): + raise DecodeError( + msg="unallowed BER constructed encoding", + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return + eoc_expected = False + try: + l, llen, v = len_decode(lv) + except LenIndefiniteForm: + llen, l, v = 1, 0, lv[1:] + eoc_expected = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > 0 and l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not eoc_expected and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if eoc_expected: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + msg="chunk out of bounds", + decode_path=len(chunks) - 1, + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = BitString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + ) + except TagMismatch: + raise DecodeError( + msg="expected BitString encoded chunk", + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + msg="no chunks", + decode_path=decode_path, + offset=offset, + ) + values = [] + bit_len = 0 + for chunk_i, chunk in enumerate(chunks[:-1]): + if chunk.bit_len % 8 != 0: + raise DecodeError( + msg="BitString chunk is not multiple of 8 bit", + decode_path=decode_path + (str(chunk_i),), + offset=chunk.offset, + ) + values.append(bytes(chunk)) + bit_len += chunk.bit_len + chunk_last = chunks[-1] + values.append(bytes(chunk_last)) + bit_len += chunk_last.bit_len + obj = self.__class__( + value=(bit_len, b"".join(values)), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, vlen + (EOC_LEN if eoc_expected else 0)), + ) + obj.bered = True + return obj, v[EOC_LEN if eoc_expected else 0:] + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + def __repr__(self): return pp_console_row(next(self.pps())) @@ -2122,7 +2239,7 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c """ - __slots__ = ("_bound_min", "_bound_max", "defined") + __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" @@ -2171,6 +2288,12 @@ class OctetString(Obj): if self._value is None: self._value = default self.defined = None + tag_klass, _, tag_num = tag_decode(self.tag) + self.tag_constructed = tag_encode( + klass=tag_klass, + form=TagFormConstructed, + num=tag_num, + ) def _value_sanitize(self, value): if issubclass(value.__class__, OctetString): @@ -2248,24 +2371,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): - try: - t, _, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t != self.tag: - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return + def _decode_chunk(self, lv, offset, decode_path, ctx): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2309,6 +2415,129 @@ class OctetString(Obj): ) return obj, tail + def _decode(self, tlv, offset, decode_path, ctx, tag_only): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + return + return self._decode_chunk(lv, offset, decode_path, ctx) + if t == self.tag_constructed: + if not ctx.get("bered", False): + raise DecodeError( + msg="unallowed BER constructed encoding", + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return + eoc_expected = False + try: + l, llen, v = len_decode(lv) + except LenIndefiniteForm: + llen, l, v = 1, 0, lv[1:] + eoc_expected = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > 0 and l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not eoc_expected and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if eoc_expected: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + msg="chunk out of bounds", + decode_path=len(chunks) - 1, + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = OctetString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + ) + except TagMismatch: + raise DecodeError( + msg="expected OctetString encoded chunk", + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + msg="no chunks", + decode_path=decode_path, + offset=offset, + ) + try: + obj = self.__class__( + value=b"".join(bytes(chunk) for chunk in chunks), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, vlen + (EOC_LEN if eoc_expected else 0)), + ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj.bered = True + return obj, v[EOC_LEN if eoc_expected else 0:] + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + def __repr__(self): return pp_console_row(next(self.pps())) diff --git a/tests/test_pyderasn.py b/tests/test_pyderasn.py index a449068..15f54f9 100644 --- a/tests/test_pyderasn.py +++ b/tests/test_pyderasn.py @@ -43,6 +43,7 @@ from hypothesis.strategies import sets from hypothesis.strategies import text from hypothesis.strategies import tuples from six import assertRaisesRegex +from six import binary_type from six import byte2int from six import indexbytes from six import int2byte @@ -62,6 +63,7 @@ from pyderasn import Choice from pyderasn import DecodeError from pyderasn import DecodePathDefBy from pyderasn import Enumerated +from pyderasn import EOC from pyderasn import GeneralizedTime from pyderasn import GeneralString from pyderasn import GraphicString @@ -1396,6 +1398,86 @@ class TestBitString(CommonMixin, TestCase): self.assertTrue(obj[9]) self.assertFalse(obj[17]) + @given( + integers(min_value=1, max_value=30), + lists( + one_of( + binary(min_size=1, max_size=5), + lists( + binary(min_size=1, max_size=5), + min_size=1, + max_size=3, + ), + ), + min_size=0, + max_size=3, + ), + lists(booleans(), min_size=1), + ) + def test_constructed(self, impl, chunk_inputs, chunk_last_bits): + def chunk_constructed(contents): + return ( + tag_encode(form=TagFormConstructed, num=3) + + b"\x80" + + b"".join(BitString(content).encode() for content in contents) + + EOC + ) + chunks = [] + payload_expected = b"" + bit_len_expected = 0 + for chunk_input in chunk_inputs: + if isinstance(chunk_input, binary_type): + chunks.append(BitString(chunk_input).encode()) + payload_expected += chunk_input + bit_len_expected += len(chunk_input) * 8 + else: + chunks.append(chunk_constructed(chunk_input)) + payload = b"".join(chunk_input) + payload_expected += payload + bit_len_expected += len(payload) * 8 + chunk_last = BitString("'%s'B" % "".join( + "1" if bit else "0" for bit in chunk_last_bits + )) + payload_expected += bytes(chunk_last) + bit_len_expected += chunk_last.bit_len + encoded_indefinite = ( + tag_encode(form=TagFormConstructed, num=impl) + + b"\x80" + + b"".join(chunks) + + chunk_last.encode() + + EOC + ) + encoded_definite = ( + tag_encode(form=TagFormConstructed, num=impl) + + len_encode(len(b"".join(chunks) + chunk_last.encode())) + + b"".join(chunks) + + chunk_last.encode() + ) + with assertRaisesRegex(self, DecodeError, "unallowed BER"): + BitString(impl=tag_encode(impl)).decode(encoded_indefinite) + for encoded in (encoded_indefinite, encoded_definite): + obj, tail = BitString(impl=tag_encode(impl)).decode( + encoded, ctx={"bered": True} + ) + self.assertSequenceEqual(tail, b"") + self.assertEqual(obj.bit_len, bit_len_expected) + self.assertSequenceEqual(bytes(obj), payload_expected) + self.assertTrue(obj.bered) + self.assertEqual(len(encoded), obj.tlvlen) + + def test_x690_vector(self): + vector_payload = hexdec("0A3B5F291CD0") + vector = BitString((len(vector_payload) * 8 - 4, vector_payload)) + obj, tail = BitString().decode(hexdec("0307040A3B5F291CD0")) + self.assertSequenceEqual(tail, b"") + self.assertEqual(obj, vector) + obj, tail = BitString().decode( + hexdec("23800303000A3B0305045F291CD00000"), + ctx={"bered": True}, + ) + self.assertSequenceEqual(tail, b"") + self.assertEqual(obj, vector) + @composite def octet_string_values_strategy(draw, do_expl=False): @@ -1707,6 +1789,61 @@ class TestOctetString(CommonMixin, TestCase): ) self.assertEqual(obj_decoded.expl_offset, offset) + @given( + integers(min_value=1, max_value=30), + lists( + one_of( + binary(min_size=1, max_size=5), + lists( + binary(min_size=1, max_size=5), + min_size=1, + max_size=3, + ), + ), + min_size=1, + max_size=3, + ), + ) + def test_constructed(self, impl, chunk_inputs): + def chunk_constructed(contents): + return ( + tag_encode(form=TagFormConstructed, num=4) + + b"\x80" + + b"".join(OctetString(content).encode() for content in contents) + + EOC + ) + chunks = [] + payload_expected = b"" + for chunk_input in chunk_inputs: + if isinstance(chunk_input, binary_type): + chunks.append(OctetString(chunk_input).encode()) + payload_expected += chunk_input + else: + chunks.append(chunk_constructed(chunk_input)) + payload = b"".join(chunk_input) + payload_expected += payload + encoded_indefinite = ( + tag_encode(form=TagFormConstructed, num=impl) + + b"\x80" + + b"".join(chunks) + + EOC + ) + encoded_definite = ( + tag_encode(form=TagFormConstructed, num=impl) + + len_encode(len(b"".join(chunks))) + + b"".join(chunks) + ) + with assertRaisesRegex(self, DecodeError, "unallowed BER"): + OctetString(impl=tag_encode(impl)).decode(encoded_indefinite) + for encoded in (encoded_indefinite, encoded_definite): + obj, tail = OctetString(impl=tag_encode(impl)).decode( + encoded, ctx={"bered": True} + ) + self.assertSequenceEqual(tail, b"") + self.assertSequenceEqual(bytes(obj), payload_expected) + self.assertTrue(obj.bered) + self.assertEqual(len(encoded), obj.tlvlen) + @composite def null_values_strategy(draw, do_expl=False): -- 2.44.0