X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=3cbe67ddd1fbc7386f1df1556f700a54a360c242;hb=d8f05bb5f06096c6e061c82d40aeb9f43bbb1a21;hp=04f8ed5a21cbbaa69123786ca9d595564836f7a9;hpb=277095bb49de4020e8fa47c216dcf1abe7a910d7;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index 04f8ed5..3cbe67d 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -561,6 +561,8 @@ TagClassReprs = { TagClassPrivate: "PRIVATE ", TagClassUniversal: "UNIV ", } +EOC = b"\x00\x00" +EOC_LEN = len(EOC) ######################################################################## @@ -606,6 +608,10 @@ class NotEnoughData(DecodeError): pass +class LenIndefiniteForm(DecodeError): + pass + + class TagMismatch(DecodeError): pass @@ -806,7 +812,7 @@ def len_decode(data): if octets_num + 1 > len(data): raise NotEnoughData("encoded length is longer than data") if octets_num == 0: - raise DecodeError("long form instead of short one") + raise LenIndefiniteForm() if byte2int(data[1:]) == 0: raise DecodeError("leading zeros") l = 0 @@ -843,6 +849,8 @@ class Obj(object): "offset", "llen", "vlen", + "bered", + "expl_bered", ) def __init__( @@ -864,6 +872,8 @@ class Obj(object): self.optional = optional self.offset, self.llen, self.vlen = _decoded self.default = None + self.bered = False + self.expl_bered = False @property def ready(self): # pragma: no cover @@ -974,6 +984,35 @@ class Obj(object): ) try: l, llen, v = len_decode(lv) + except LenIndefiniteForm as err: + if not ctx.get("bered", False): + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + llen, v = 1, lv[1:] + offset += tlen + llen + result = self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + ) + if tag_only: + return + obj, tail = result + eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] + if eoc_expected.tobytes() != EOC: + raise DecodeError( + msg="no EOC", + decode_path=decode_path, + offset=offset, + ) + obj.vlen += EOC_LEN + obj.expl_bered = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -981,23 +1020,24 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, + else: + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + result = self._decode( + v, + offset=offset + tlen + llen, decode_path=decode_path, - offset=offset, + ctx=ctx, + tag_only=tag_only, ) - result = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - tag_only=tag_only, - ) - if tag_only: - return - obj, tail = result + if tag_only: + return + obj, tail = result return obj, (tail if leavemm else tail.tobytes()) @property @@ -1014,6 +1054,8 @@ class Obj(object): @property def expl_llen(self): + if self.expl_bered: + return 1 return len(len_encode(self.tlvlen)) @property @@ -1398,10 +1440,14 @@ class Boolean(Obj): offset=offset, ) first_octet = byte2int(v) + bered = False if first_octet == 0: value = False elif first_octet == 0xFF: value = True + elif ctx.get("bered", False): + value = True + bered = True else: raise DecodeError( "unacceptable Boolean value", @@ -1417,6 +1463,7 @@ class Boolean(Obj): optional=self.optional, _decoded=(offset, 1, 1), ) + obj.bered = bered return obj, v[1:] def __repr__(self): @@ -1804,7 +1851,7 @@ class BitString(Obj): >>> b.specs {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2} """ - __slots__ = ("specs", "defined") + __slots__ = ("tag_constructed", "specs", "defined") tag_default = tag_encode(3) asn1_type_name = "BIT STRING" @@ -1842,6 +1889,12 @@ class BitString(Obj): if value is None: self._value = default self.defined = None + tag_klass, _, tag_num = tag_decode(self.tag) + self.tag_constructed = tag_encode( + klass=tag_klass, + form=TagFormConstructed, + num=tag_num, + ) def _bits2octets(self, bits): if len(self.specs) > 0: @@ -1987,24 +2040,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): - try: - t, _, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t != self.tag: - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return + def _decode_chunk(self, lv, offset, decode_path, ctx): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2062,6 +2098,128 @@ class BitString(Obj): ) return obj, tail + def _decode(self, tlv, offset, decode_path, ctx, tag_only): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + return + return self._decode_chunk(lv, offset, decode_path, ctx) + if t == self.tag_constructed: + if not ctx.get("bered", False): + raise DecodeError( + msg="unallowed BER constructed encoding", + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return + eoc_expected = False + try: + l, llen, v = len_decode(lv) + except LenIndefiniteForm: + llen, l, v = 1, 0, lv[1:] + eoc_expected = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > 0 and l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not eoc_expected and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if eoc_expected: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + msg="chunk out of bounds", + decode_path=len(chunks) - 1, + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = BitString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + ) + except TagMismatch: + raise DecodeError( + msg="expected BitString encoded chunk", + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + msg="no chunks", + decode_path=decode_path, + offset=offset, + ) + values = [] + bit_len = 0 + for chunk_i, chunk in enumerate(chunks[:-1]): + if chunk.bit_len % 8 != 0: + raise DecodeError( + msg="BitString chunk is not multiple of 8 bit", + decode_path=decode_path + (str(chunk_i),), + offset=chunk.offset, + ) + values.append(bytes(chunk)) + bit_len += chunk.bit_len + chunk_last = chunks[-1] + values.append(bytes(chunk_last)) + bit_len += chunk_last.bit_len + obj = self.__class__( + value=(bit_len, b"".join(values)), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, vlen + (EOC_LEN if eoc_expected else 0)), + ) + obj.bered = True + return obj, v[EOC_LEN if eoc_expected else 0:] + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + def __repr__(self): return pp_console_row(next(self.pps())) @@ -2115,7 +2273,7 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c """ - __slots__ = ("_bound_min", "_bound_max", "defined") + __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" @@ -2164,6 +2322,12 @@ class OctetString(Obj): if self._value is None: self._value = default self.defined = None + tag_klass, _, tag_num = tag_decode(self.tag) + self.tag_constructed = tag_encode( + klass=tag_klass, + form=TagFormConstructed, + num=tag_num, + ) def _value_sanitize(self, value): if issubclass(value.__class__, OctetString): @@ -2241,24 +2405,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx, tag_only): - try: - t, _, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t != self.tag: - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if tag_only: - return + def _decode_chunk(self, lv, offset, decode_path, ctx): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2302,6 +2449,129 @@ class OctetString(Obj): ) return obj, tail + def _decode(self, tlv, offset, decode_path, ctx, tag_only): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + return + return self._decode_chunk(lv, offset, decode_path, ctx) + if t == self.tag_constructed: + if not ctx.get("bered", False): + raise DecodeError( + msg="unallowed BER constructed encoding", + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return + eoc_expected = False + try: + l, llen, v = len_decode(lv) + except LenIndefiniteForm: + llen, l, v = 1, 0, lv[1:] + eoc_expected = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > 0 and l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not eoc_expected and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if eoc_expected: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + msg="chunk out of bounds", + decode_path=len(chunks) - 1, + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = OctetString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + ) + except TagMismatch: + raise DecodeError( + msg="expected OctetString encoded chunk", + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + msg="no chunks", + decode_path=decode_path, + offset=offset, + ) + try: + obj = self.__class__( + value=b"".join(bytes(chunk) for chunk in chunks), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, vlen + (EOC_LEN if eoc_expected else 0)), + ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj.bered = True + return obj, v[EOC_LEN if eoc_expected else 0:] + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + def __repr__(self): return pp_console_row(next(self.pps())) @@ -3740,7 +4010,7 @@ class Sequence(Obj): pyderasn.InvalidValueType: invalid value type, expected: >>> ext["extnID"] = ObjectIdentifier("1.2.3") - You can know if sequence is ready to be encoded: + You can determine if sequence is ready to be encoded: >>> ext.ready False @@ -3766,7 +4036,15 @@ class Sequence(Obj): Assign ``None`` to remove value from sequence. - You can know if value exists/set in the sequence and take its value: + You can set values in Sequence during its initialization: + + >>> AlgorithmIdentifier(( + ("algorithm", ObjectIdentifier("1.2.3")), + ("parameters", Any(Null())) + )) + AlgorithmIdentifier SEQUENCE[OBJECT IDENTIFIER 1.2.3, ANY 0500 OPTIONAL] + + You can determine if value exists/set in the sequence and take its value: >>> "extnID" in ext, "extnValue" in ext, "critical" in ext (True, True, False) @@ -3823,9 +4101,17 @@ class Sequence(Obj): ) self._value = {} if value is not None: - self._value = self._value_sanitize(value) + if issubclass(value.__class__, Sequence): + self._value = value._value + elif hasattr(value, "__iter__"): + for seq_key, seq_value in value: + self[seq_key] = seq_value + else: + raise InvalidValueType((Sequence,)) if default is not None: - default_value = self._value_sanitize(default) + if not issubclass(default.__class__, Sequence): + raise InvalidValueType((Sequence,)) + default_value = default._value default_obj = self.__class__(impl=self.tag, expl=self._expl) default_obj.specs = self.specs default_obj._value = default_value @@ -3833,11 +4119,6 @@ class Sequence(Obj): if value is None: self._value = default_obj.copy()._value - def _value_sanitize(self, value): - if not issubclass(value.__class__, Sequence): - raise InvalidValueType((Sequence,)) - return value._value - @property def ready(self): for name, spec in self.specs.items():