X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=pyderasn.py;h=f7e6c4c53f9cb5d5316f555ca42658b0301d4308;hb=2d45a224943c79e95cbd4913b44420788bc6c17d;hp=3ed23da12c2fb486359224fc1275413b9da44980;hpb=dc78d31d6f27b6cbcfafe8973257a3acb0ce692c;p=pyderasn.git diff --git a/pyderasn.py b/pyderasn.py index 3ed23da..f7e6c4c 100755 --- a/pyderasn.py +++ b/pyderasn.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # coding: utf-8 -# PyDERASN -- Python ASN.1 DER codec with abstract structures +# PyDERASN -- Python ASN.1 DER/BER codec with abstract structures # Copyright (C) 2017-2018 Sergey Matveev # # This program is free software: you can redistribute it and/or modify @@ -16,10 +16,10 @@ # You should have received a copy of the GNU Lesser General Public # License along with this program. If not, see # . -"""Python ASN.1 DER codec with abstract structures +"""Python ASN.1 DER/BER codec with abstract structures -This library allows you to marshal and unmarshal various structures in -ASN.1 DER format, like this: +This library allows you to marshal various structures in ASN.1 DER +format, unmarshal them in BER/CER/DER ones. >>> i = Integer(123) >>> raw = i.encode() @@ -193,7 +193,7 @@ explicit tag. If you want to know information about it, then use: lesser than ``offset``), ``expl_tlen``, ``expl_llen``, ``expl_vlen`` (that actually equals to ordinary ``tlvlen``). -When error occurs, then :py:exc:`pyderasn.DecodeError` is raised. +When error occurs, :py:exc:`pyderasn.DecodeError` is raised. .. _ctx: @@ -206,6 +206,7 @@ decoding process. Currently available context options: +* :ref:`bered ` * :ref:`defines_by_path ` * :ref:`strict_default_existence ` @@ -269,11 +270,11 @@ for AlgorithmIdentifier of X.509's ``tbsCertificate.subjectPublicKeyInfo.algorithm.algorithm``:: ( - (('parameters',), { + (("parameters",), { id_ecPublicKey: ECParameters(), id_GostR3410_2001: GostR34102001PublicKeyParameters(), }), - (('..', 'subjectPublicKey'), { + (("..", "subjectPublicKey"), { id_rsaEncryption: RSAPublicKey(), id_GostR3410_2001: OctetString(), }), @@ -289,7 +290,7 @@ Following types can be automatically decoded (DEFINED BY): * :py:class:`pyderasn.BitString` (that is multiple of 8 bits) * :py:class:`pyderasn.OctetString` * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf` - ``Any``/``OctetString``-s + ``Any``/``BitString``/``OctetString``-s When any of those fields is automatically decoded, then ``.defined`` attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it @@ -363,6 +364,33 @@ First function is useful for path construction when some automatic decoding is already done. ``any`` means literally any value it meet -- useful for SEQUENCE/SET OF-s. +.. _bered_ctx: + +BER encoding +------------ + +.. warning:: + + Currently BER support is not extensively tested. + +By default PyDERASN accepts only DER encoded data. It always encodes to +DER. But you can optionally enable BER decoding with setting ``bered`` +:ref:`context ` argument to True. Indefinite lengths and +constructed primitive types should be parsed successfully. + +* If object is encoded in BER form (not the DER one), then ``bered`` + attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET + STRING`` can contain it. +* If object has an indefinite length encoding, then its ``lenindef`` + attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``, + ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can + contain it. +* If object has an indefinite length encoded explicit tag, then + ``expl_lenindef`` is set to True. + +EOC (end-of-contents) token's length is taken in advance in object's +value length. + Primitive types --------------- @@ -472,6 +500,8 @@ from collections import namedtuple from collections import OrderedDict from datetime import datetime from math import ceil +from os import environ +from string import digits from six import add_metaclass from six import binary_type @@ -559,6 +589,8 @@ TagClassReprs = { TagClassPrivate: "PRIVATE ", TagClassUniversal: "UNIV ", } +EOC = b"\x00\x00" +EOC_LEN = len(EOC) ######################################################################## @@ -604,6 +636,10 @@ class NotEnoughData(DecodeError): pass +class LenIndefForm(DecodeError): + pass + + class TagMismatch(DecodeError): pass @@ -795,6 +831,11 @@ def len_encode(l): def len_decode(data): + """Decode length + + :returns: (decoded length, length's length, remaining data) + :raises LenIndefForm: if indefinite form encoding is met + """ if len(data) == 0: raise NotEnoughData("no data at all") first_octet = byte2int(data) @@ -804,7 +845,7 @@ def len_decode(data): if octets_num + 1 > len(data): raise NotEnoughData("encoded length is longer than data") if octets_num == 0: - raise DecodeError("long form instead of short one") + raise LenIndefForm() if byte2int(data[1:]) == 0: raise DecodeError("leading zeros") l = 0 @@ -841,6 +882,9 @@ class Obj(object): "offset", "llen", "vlen", + "expl_lenindef", + "lenindef", + "bered", ) def __init__( @@ -862,6 +906,9 @@ class Obj(object): self.optional = optional self.offset, self.llen, self.vlen = _decoded self.default = None + self.expl_lenindef = False + self.lenindef = False + self.bered = False @property def ready(self): # pragma: no cover @@ -910,7 +957,7 @@ class Obj(object): def _encode(self): # pragma: no cover raise NotImplementedError() - def _decode(self, tlv, offset, decode_path, ctx): # pragma: no cover + def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover raise NotImplementedError() def encode(self): @@ -919,7 +966,15 @@ class Obj(object): return raw return b"".join((self._expl, len_encode(len(raw)), raw)) - def decode(self, data, offset=0, leavemm=False, decode_path=(), ctx=None): + def decode( + self, + data, + offset=0, + leavemm=False, + decode_path=(), + ctx=None, + tag_only=False, + ): """Decode the data :param data: either binary or memoryview @@ -927,18 +982,25 @@ class Obj(object): :param bool leavemm: do we need to leave memoryview of remaining data as is, or convert it to bytes otherwise :param ctx: optional :ref:`context ` governing decoding process. + :param tag_only: decode only the tag, without length and contents + (used only in Choice and Set structures, trying to + determine if tag satisfies the scheme) :returns: (Obj, remaining data) """ if ctx is None: ctx = {} tlv = memoryview(data) if self._expl is None: - obj, tail = self._decode( + result = self._decode( tlv, offset, decode_path=decode_path, ctx=ctx, + tag_only=tag_only, ) + if tag_only: + return + obj, tail = result else: try: t, tlen, lv = tag_strip(tlv) @@ -957,6 +1019,35 @@ class Obj(object): ) try: l, llen, v = len_decode(lv) + except LenIndefForm as err: + if not ctx.get("bered", False): + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + llen, v = 1, lv[1:] + offset += tlen + llen + result = self._decode( + v, + offset=offset, + decode_path=decode_path, + ctx=ctx, + tag_only=tag_only, + ) + if tag_only: + return + obj, tail = result + eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:] + if eoc_expected.tobytes() != EOC: + raise DecodeError( + msg="no EOC", + decode_path=decode_path, + offset=offset, + ) + obj.vlen += EOC_LEN + obj.expl_lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -964,19 +1055,24 @@ class Obj(object): decode_path=decode_path, offset=offset, ) - if l > len(v): - raise NotEnoughData( - "encoded length is longer than data", - klass=self.__class__, + else: + if l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + result = self._decode( + v, + offset=offset + tlen + llen, decode_path=decode_path, - offset=offset, + ctx=ctx, + tag_only=tag_only, ) - obj, tail = self._decode( - v, - offset=offset + tlen + llen, - decode_path=decode_path, - ctx=ctx, - ) + if tag_only: + return + obj, tail = result return obj, (tail if leavemm else tail.tobytes()) @property @@ -993,6 +1089,8 @@ class Obj(object): @property def expl_llen(self): + if self.expl_lenindef: + return 1 return len(len_encode(self.tlvlen)) @property @@ -1011,11 +1109,14 @@ class Obj(object): class DecodePathDefBy(object): """DEFINED BY representation inside decode path """ - __slots__ = ('defined_by',) + __slots__ = ("defined_by",) def __init__(self, defined_by): self.defined_by = defined_by + def __ne__(self, their): + return not(self == their) + def __eq__(self, their): if not isinstance(their, self.__class__): return False @@ -1050,6 +1151,9 @@ PP = namedtuple("PP", ( "expl_tlen", "expl_llen", "expl_vlen", + "expl_lenindef", + "lenindef", + "bered", )) @@ -1071,6 +1175,9 @@ def _pp( expl_tlen=None, expl_llen=None, expl_vlen=None, + expl_lenindef=False, + lenindef=False, + bered=False, ): return PP( asn1_type_name, @@ -1090,6 +1197,9 @@ def _pp( expl_tlen, expl_llen, expl_vlen, + expl_lenindef, + lenindef, + bered, ) @@ -1115,7 +1225,17 @@ def pp_console_row( ) cols.append(_colorize(col, "red", with_colours, ())) col = "[%d,%d,%4d]" % (pp.tlen, pp.llen, pp.vlen) - cols.append(_colorize(col, "green", with_colours, ())) + col = _colorize(col, "green", with_colours, ()) + ber_deoffset = 0 + if pp.expl_lenindef: + ber_deoffset += 2 + if pp.lenindef: + ber_deoffset += 2 + col += ( + " " if ber_deoffset == 0 else + _colorize(("-%d" % ber_deoffset), "red", with_colours) + ) + cols.append(col) if len(pp.decode_path) > 0: cols.append(" ." * (len(pp.decode_path))) ent = pp.decode_path[-1] @@ -1130,9 +1250,9 @@ def pp_console_row( ): cols.append(_colorize("%s:" % oids[value], "green", with_colours)) else: - cols.append(_colorize("%s:" % value, "white", with_colours)) + cols.append(_colorize("%s:" % value, "white", with_colours, ("reverse",))) else: - cols.append(_colorize("%s:" % ent, "yellow", with_colours)) + cols.append(_colorize("%s:" % ent, "yellow", with_colours, ("reverse",))) if pp.expl is not None: klass, _, num = pp.expl col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num) @@ -1143,10 +1263,12 @@ def pp_console_row( cols.append(_colorize(col, "blue", with_colours)) if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper(): cols.append(_colorize(pp.obj_name, "magenta", with_colours)) + if pp.bered: + cols.append(_colorize("BER", "red", with_colours)) cols.append(_colorize(pp.asn1_type_name, "cyan", with_colours)) if pp.value is not None: value = pp.value - cols.append(_colorize(value, "white", with_colours)) + cols.append(_colorize(value, "white", with_colours, ("reverse",))) if ( oids is not None and pp.asn1_type_name == ObjectIdentifier.asn1_type_name and @@ -1166,7 +1288,7 @@ def pp_console_row( def pp_console_blob(pp): - cols = [" " * len("XXXXXYY [X,X,XXXX]")] + cols = [" " * len("XXXXXYY [X,X,XXXX]YY")] if len(pp.decode_path) > 0: cols.append(" ." * (len(pp.decode_path) + 1)) if isinstance(pp.blob, binary_type): @@ -1332,7 +1454,7 @@ class Boolean(Obj): (b"\xFF" if self._value else b"\x00"), )) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1348,6 +1470,8 @@ class Boolean(Obj): decode_path=decode_path, offset=offset, ) + if tag_only: + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -1372,10 +1496,14 @@ class Boolean(Obj): offset=offset, ) first_octet = byte2int(v) + bered = False if first_octet == 0: value = False elif first_octet == 0xFF: value = True + elif ctx.get("bered", False): + value = True + bered = True else: raise DecodeError( "unacceptable Boolean value", @@ -1391,6 +1519,7 @@ class Boolean(Obj): optional=self.optional, _decoded=(offset, 1, 1), ) + obj.bered = bered return obj, v[1:] def __repr__(self): @@ -1414,6 +1543,8 @@ class Boolean(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, + bered=self.bered, ) @@ -1625,7 +1756,7 @@ class Integer(Obj): break return b"".join((self.tag, len_encode(len(octets)), octets)) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -1641,6 +1772,8 @@ class Integer(Obj): decode_path=decode_path, offset=offset, ) + if tag_only: + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -1734,6 +1867,7 @@ class Integer(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, ) @@ -1749,6 +1883,8 @@ class BitString(Obj): >>> b.bit_len 88 + >>> BitString("'0A3B5F291CD'H") + BIT STRING 44 bits 0a3b5f291cd0 >>> b = BitString("'010110000000'B") BIT STRING 12 bits 5800 >>> b.bit_len @@ -1764,19 +1900,19 @@ class BitString(Obj): class KeyUsage(BitString): schema = ( - ('digitalSignature', 0), - ('nonRepudiation', 1), - ('keyEncipherment', 2), + ("digitalSignature", 0), + ("nonRepudiation", 1), + ("keyEncipherment", 2), ) - >>> b = KeyUsage(('keyEncipherment', 'nonRepudiation')) + >>> b = KeyUsage(("keyEncipherment", "nonRepudiation")) KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment >>> b.named ['nonRepudiation', 'keyEncipherment'] >>> b.specs {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2} """ - __slots__ = ("specs", "defined") + __slots__ = ("tag_constructed", "specs", "defined") tag_default = tag_encode(3) asn1_type_name = "BIT STRING" @@ -1814,6 +1950,12 @@ class BitString(Obj): if value is None: self._value = default self.defined = None + tag_klass, _, tag_num = tag_decode(self.tag) + self.tag_constructed = tag_encode( + klass=tag_klass, + form=TagFormConstructed, + num=tag_num, + ) def _bits2octets(self, bits): if len(self.specs) > 0: @@ -1831,21 +1973,25 @@ class BitString(Obj): if isinstance(value, (string_types, binary_type)): if ( isinstance(value, string_types) and - value.startswith("'") and - value.endswith("'B") + value.startswith("'") ): - value = value[1:-2] - if not set(value) <= set(("0", "1")): - raise ValueError("B's coding contains unacceptable chars") - return self._bits2octets(value) + if value.endswith("'B"): + value = value[1:-2] + if not set(value) <= set(("0", "1")): + raise ValueError("B's coding contains unacceptable chars") + return self._bits2octets(value) + elif value.endswith("'H"): + value = value[1:-2] + return ( + len(value) * 4, + hexdec(value + ("" if len(value) % 2 == 0 else "0")), + ) + else: + raise InvalidValueType((self.__class__, string_types, binary_type)) elif isinstance(value, binary_type): return (len(value) * 8, value) else: - raise InvalidValueType(( - self.__class__, - string_types, - binary_type, - )) + raise InvalidValueType((self.__class__, string_types, binary_type)) if isinstance(value, tuple): if ( len(value) == 2 and @@ -1959,22 +2105,7 @@ class BitString(Obj): octets, )) - def _decode(self, tlv, offset, decode_path, ctx): - try: - t, _, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t != self.tag: - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + def _decode_chunk(self, lv, offset, decode_path, ctx): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2013,7 +2144,7 @@ class BitString(Obj): decode_path=decode_path, offset=offset, ) - if byte2int(v[-1:]) & ((1 << pad_size) - 1) != 0: + if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0: raise DecodeError( "invalid pad", klass=self.__class__, @@ -2032,6 +2163,129 @@ class BitString(Obj): ) return obj, tail + def _decode(self, tlv, offset, decode_path, ctx, tag_only): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + return + return self._decode_chunk(lv, offset, decode_path, ctx) + if t == self.tag_constructed: + if not ctx.get("bered", False): + raise DecodeError( + msg="unallowed BER constructed encoding", + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > 0 and l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not lenindef and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + msg="chunk out of bounds", + decode_path=len(chunks) - 1, + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = BitString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + ) + except TagMismatch: + raise DecodeError( + msg="expected BitString encoded chunk", + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + msg="no chunks", + decode_path=decode_path, + offset=offset, + ) + values = [] + bit_len = 0 + for chunk_i, chunk in enumerate(chunks[:-1]): + if chunk.bit_len % 8 != 0: + raise DecodeError( + msg="BitString chunk is not multiple of 8 bit", + decode_path=decode_path + (str(chunk_i),), + offset=chunk.offset, + ) + values.append(bytes(chunk)) + bit_len += chunk.bit_len + chunk_last = chunks[-1] + values.append(bytes(chunk_last)) + bit_len += chunk_last.bit_len + obj = self.__class__( + value=(bit_len, b"".join(values)), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _specs=self.specs, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ) + obj.lenindef = lenindef + obj.bered = True + return obj, (v[EOC_LEN:] if lenindef else v) + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + def __repr__(self): return pp_console_row(next(self.pps())) @@ -2061,6 +2315,9 @@ class BitString(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, + lenindef=self.lenindef, + bered=self.bered, ) defined_by, defined = self.defined or (None, None) if defined_by is not None: @@ -2085,7 +2342,7 @@ class OctetString(Obj): >>> OctetString(b"hell", bounds=(4, 4)) OCTET STRING 4 bytes 68656c6c """ - __slots__ = ("_bound_min", "_bound_max", "defined") + __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined") tag_default = tag_encode(4) asn1_type_name = "OCTET STRING" @@ -2134,6 +2391,12 @@ class OctetString(Obj): if self._value is None: self._value = default self.defined = None + tag_klass, _, tag_num = tag_decode(self.tag) + self.tag_constructed = tag_encode( + klass=tag_klass, + form=TagFormConstructed, + num=tag_num, + ) def _value_sanitize(self, value): if issubclass(value.__class__, OctetString): @@ -2211,22 +2474,7 @@ class OctetString(Obj): self._value, )) - def _decode(self, tlv, offset, decode_path, ctx): - try: - t, _, lv = tag_strip(tlv) - except DecodeError as err: - raise err.__class__( - msg=err.msg, - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) - if t != self.tag: - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, - offset=offset, - ) + def _decode_chunk(self, lv, offset, decode_path, ctx): try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2254,6 +2502,13 @@ class OctetString(Obj): optional=self.optional, _decoded=(offset, llen, l), ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) except BoundsError as err: raise DecodeError( msg=str(err), @@ -2263,6 +2518,130 @@ class OctetString(Obj): ) return obj, tail + def _decode(self, tlv, offset, decode_path, ctx, tag_only): + try: + t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if t == self.tag: + if tag_only: + return + return self._decode_chunk(lv, offset, decode_path, ctx) + if t == self.tag_constructed: + if not ctx.get("bered", False): + raise DecodeError( + msg="unallowed BER constructed encoding", + decode_path=decode_path, + offset=offset, + ) + if tag_only: + return + lenindef = False + try: + l, llen, v = len_decode(lv) + except LenIndefForm: + llen, l, v = 1, 0, lv[1:] + lenindef = True + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if l > 0 and l > len(v): + raise NotEnoughData( + "encoded length is longer than data", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + if not lenindef and l == 0: + raise NotEnoughData( + "zero length", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + chunks = [] + sub_offset = offset + tlen + llen + vlen = 0 + while True: + if lenindef: + if v[:EOC_LEN].tobytes() == EOC: + break + else: + if vlen == l: + break + if vlen > l: + raise DecodeError( + msg="chunk out of bounds", + decode_path=len(chunks) - 1, + offset=chunks[-1].offset, + ) + sub_decode_path = decode_path + (str(len(chunks)),) + try: + chunk, v_tail = OctetString().decode( + v, + offset=sub_offset, + decode_path=sub_decode_path, + leavemm=True, + ctx=ctx, + ) + except TagMismatch: + raise DecodeError( + msg="expected OctetString encoded chunk", + decode_path=sub_decode_path, + offset=sub_offset, + ) + chunks.append(chunk) + sub_offset += chunk.tlvlen + vlen += chunk.tlvlen + v = v_tail + if len(chunks) == 0: + raise DecodeError( + msg="no chunks", + decode_path=decode_path, + offset=offset, + ) + try: + obj = self.__class__( + value=b"".join(bytes(chunk) for chunk in chunks), + bounds=(self._bound_min, self._bound_max), + impl=self.tag, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), + ) + except DecodeError as err: + raise DecodeError( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + except BoundsError as err: + raise DecodeError( + msg=str(err), + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj.lenindef = lenindef + obj.bered = True + return obj, (v[EOC_LEN:] if lenindef else v) + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + def __repr__(self): return pp_console_row(next(self.pps())) @@ -2285,6 +2664,9 @@ class OctetString(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, + lenindef=self.lenindef, + bered=self.bered, ) defined_by, defined = self.defined or (None, None) if defined_by is not None: @@ -2360,7 +2742,7 @@ class Null(Obj): def _encode(self): return self.tag + len_encode(0) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2376,6 +2758,8 @@ class Null(Obj): decode_path=decode_path, offset=offset, ) + if tag_only: + return try: l, _, v = len_decode(lv) except DecodeError as err: @@ -2419,6 +2803,7 @@ class Null(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, ) @@ -2605,7 +2990,7 @@ class ObjectIdentifier(Obj): v = b"".join(octets) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, _, lv = tag_strip(tlv) except DecodeError as err: @@ -2621,6 +3006,8 @@ class ObjectIdentifier(Obj): decode_path=decode_path, offset=offset, ) + if tag_only: + return try: l, llen, v = len_decode(lv) except DecodeError as err: @@ -2705,6 +3092,7 @@ class ObjectIdentifier(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, ) @@ -2810,7 +3198,7 @@ class CommonString(OctetString): >>> PrintableString("привет мир") Traceback (most recent call last): - UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128) + pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128) >>> BMPString("ада", bounds=(2, 2)) Traceback (most recent call last): @@ -2866,14 +3254,17 @@ class CommonString(OctetString): value_raw = value else: raise InvalidValueType((self.__class__, text_type, binary_type)) - value_raw = ( - value_decoded.encode(self.encoding) - if value_raw is None else value_raw - ) - value_decoded = ( - value_raw.decode(self.encoding) - if value_decoded is None else value_decoded - ) + try: + value_raw = ( + value_decoded.encode(self.encoding) + if value_raw is None else value_raw + ) + value_decoded = ( + value_raw.decode(self.encoding) + if value_decoded is None else value_decoded + ) + except (UnicodeEncodeError, UnicodeDecodeError) as err: + raise DecodeError(str(err)) if not self._bound_min <= len(value_decoded) <= self._bound_max: raise BoundsError( self._bound_min, @@ -2920,6 +3311,11 @@ class CommonString(OctetString): tlen=self.tlen, llen=self.llen, vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, ) @@ -2935,6 +3331,13 @@ class NumericString(CommonString): tag_default = tag_encode(18) encoding = "ascii" asn1_type_name = "NumericString" + allowable_chars = set(digits.encode("ascii")) + + def _value_sanitize(self, value): + value = super(NumericString, self)._value_sanitize(value) + if not set(value) <= self.allowable_chars: + raise DecodeError("non-numeric value") + return value class PrintableString(CommonString): @@ -3101,6 +3504,11 @@ class UTCTime(CommonString): tlen=self.tlen, llen=self.llen, vlen=self.vlen, + expl_offset=self.expl_offset if self.expled else None, + expl_tlen=self.expl_tlen if self.expled else None, + expl_llen=self.expl_llen if self.expled else None, + expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, ) @@ -3209,8 +3617,8 @@ class Choice(Obj): class GeneralName(Choice): schema = ( - ('rfc822Name', IA5String(impl=tag_ctxp(1))), - ('dNSName', IA5String(impl=tag_ctxp(2))), + ("rfc822Name", IA5String(impl=tag_ctxp(1))), + ("dNSName", IA5String(impl=tag_ctxp(2))), ) >>> gn = GeneralName() @@ -3372,32 +3780,45 @@ class Choice(Obj): self._assert_ready() return self._value[1].encode() - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): for choice, spec in self.specs.items(): + sub_decode_path = decode_path + (choice,) try: - value, tail = spec.decode( + spec.decode( tlv, offset=offset, leavemm=True, - decode_path=decode_path + (choice,), + decode_path=sub_decode_path, ctx=ctx, + tag_only=True, ) except TagMismatch: continue - obj = self.__class__( - schema=self.specs, - expl=self._expl, - default=self.default, - optional=self.optional, - _decoded=(offset, 0, value.tlvlen), + break + else: + raise TagMismatch( + klass=self.__class__, + decode_path=decode_path, + offset=offset, ) - obj._value = (choice, value) - return obj, tail - raise TagMismatch( - klass=self.__class__, - decode_path=decode_path, + if tag_only: + return + value, tail = spec.decode( + tlv, offset=offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, ) + obj = self.__class__( + schema=self.specs, + expl=self._expl, + default=self.default, + optional=self.optional, + _decoded=(offset, 0, value.tlvlen), + ) + obj._value = (choice, value) + return obj, tail def __repr__(self): value = pp_console_row(next(self.pps())) @@ -3419,6 +3840,7 @@ class Choice(Obj): tlen=self.tlen, llen=self.llen, vlen=self.vlen, + expl_lenindef=self.expl_lenindef, ) if self.ready: yield self.value.pps(decode_path=decode_path + (self.choice,)) @@ -3547,10 +3969,52 @@ class Any(Obj): self._assert_ready() return self._value - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) + except DecodeError as err: + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + try: l, llen, v = len_decode(lv) + except LenIndefForm as err: + if not ctx.get("bered", False): + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + llen, vlen, v = 1, 0, lv[1:] + sub_offset = offset + tlen + llen + chunk_i = 0 + while True: + if v[:EOC_LEN].tobytes() == EOC: + tlvlen = tlen + llen + vlen + EOC_LEN + obj = self.__class__( + value=tlv[:tlvlen].tobytes(), + expl=self._expl, + optional=self.optional, + _decoded=(offset, 0, tlvlen), + ) + obj.lenindef = True + obj.tag = t + return obj, v[EOC_LEN:] + else: + chunk, v = Any().decode( + v, + offset=sub_offset, + decode_path=decode_path + (str(chunk_i),), + leavemm=True, + ctx=ctx, + ) + vlen += chunk.tlvlen + sub_offset += chunk.tlvlen + chunk_i += 1 except DecodeError as err: raise err.__class__( msg=err.msg, @@ -3597,6 +4061,8 @@ class Any(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, + lenindef=self.lenindef, ) defined_by, defined = self.defined or (None, None) if defined_by is not None: @@ -3674,7 +4140,7 @@ class Sequence(Obj): pyderasn.InvalidValueType: invalid value type, expected: >>> ext["extnID"] = ObjectIdentifier("1.2.3") - You can know if sequence is ready to be encoded: + You can determine if sequence is ready to be encoded: >>> ext.ready False @@ -3700,7 +4166,15 @@ class Sequence(Obj): Assign ``None`` to remove value from sequence. - You can know if value exists/set in the sequence and take its value: + You can set values in Sequence during its initialization: + + >>> AlgorithmIdentifier(( + ("algorithm", ObjectIdentifier("1.2.3")), + ("parameters", Any(Null())) + )) + AlgorithmIdentifier SEQUENCE[OBJECT IDENTIFIER 1.2.3, ANY 0500 OPTIONAL] + + You can determine if value exists/set in the sequence and take its value: >>> "extnID" in ext, "extnValue" in ext, "critical" in ext (True, True, False) @@ -3757,9 +4231,17 @@ class Sequence(Obj): ) self._value = {} if value is not None: - self._value = self._value_sanitize(value) + if issubclass(value.__class__, Sequence): + self._value = value._value + elif hasattr(value, "__iter__"): + for seq_key, seq_value in value: + self[seq_key] = seq_value + else: + raise InvalidValueType((Sequence,)) if default is not None: - default_value = self._value_sanitize(default) + if not issubclass(default.__class__, Sequence): + raise InvalidValueType((Sequence,)) + default_value = default._value default_obj = self.__class__(impl=self.tag, expl=self._expl) default_obj.specs = self.specs default_obj._value = default_value @@ -3767,11 +4249,6 @@ class Sequence(Obj): if value is None: self._value = default_obj.copy()._value - def _value_sanitize(self, value): - if not issubclass(value.__class__, Sequence): - raise InvalidValueType((Sequence,)) - return value._value - @property def ready(self): for name, spec in self.specs.items(): @@ -3868,7 +4345,7 @@ class Sequence(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -3884,8 +4361,21 @@ class Sequence(Obj): decode_path=decode_path, offset=offset, ) + if tag_only: + return + lenindef = False try: l, llen, v = len_decode(lv) + except LenIndefForm as err: + if not ctx.get("bered", False): + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + l, llen, v = 0, 1, lv[1:] + lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -3900,11 +4390,16 @@ class Sequence(Obj): decode_path=decode_path, offset=offset, ) - v, tail = v[:l], v[l:] + if not lenindef: + v, tail = v[:l], v[l:] + vlen = 0 sub_offset = offset + tlen + llen values = {} for name, spec in self.specs.items(): - if len(v) == 0 and spec.optional: + if spec.optional and ( + (lenindef and v[:EOC_LEN].tobytes() == EOC) or + len(v) == 0 + ): continue sub_decode_path = decode_path + (name,) try: @@ -3967,7 +4462,9 @@ class Sequence(Obj): ) value.defined = (defined_by, defined_value) - sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) + value_len = value.expl_tlvlen if value.expled else value.tlvlen + vlen += value_len + sub_offset += value_len v = v_tail if spec.default is not None and value == spec.default: if ctx.get("strict_default_existence", False): @@ -3994,7 +4491,17 @@ class Sequence(Obj): abs_decode_path(sub_decode_path[:-1], rel_path), (value, defined), )) - if len(v) > 0: + if lenindef: + if v[:EOC_LEN].tobytes() != EOC: + raise DecodeError( + "no EOC", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + tail = v[EOC_LEN:] + vlen += EOC_LEN + elif len(v) > 0: raise DecodeError( "remaining data", klass=self.__class__, @@ -4007,9 +4514,10 @@ class Sequence(Obj): expl=self._expl, default=self.default, optional=self.optional, - _decoded=(offset, llen, l), + _decoded=(offset, llen, vlen), ) obj._value = values + obj.lenindef = lenindef return obj, tail def __repr__(self): @@ -4039,6 +4547,8 @@ class Sequence(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, + lenindef=self.lenindef, ) for name in self.specs: value = self._value.get(name) @@ -4062,7 +4572,7 @@ class Set(Sequence): v = b"".join(raws) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -4078,8 +4588,21 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) + if tag_only: + return + lenindef = False try: l, llen, v = len_decode(lv) + except LenIndefForm as err: + if not ctx.get("bered", False): + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + l, llen, v = 0, 1, lv[1:] + lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -4093,29 +4616,28 @@ class Set(Sequence): klass=self.__class__, offset=offset, ) - v, tail = v[:l], v[l:] + if not lenindef: + v, tail = v[:l], v[l:] + vlen = 0 sub_offset = offset + tlen + llen values = {} specs_items = self.specs.items while len(v) > 0: + if lenindef and v[:EOC_LEN].tobytes() == EOC: + break for name, spec in specs_items(): + sub_decode_path = decode_path + (name,) try: - value, v_tail = spec.decode( + spec.decode( v, sub_offset, leavemm=True, - decode_path=decode_path + (name,), + decode_path=sub_decode_path, ctx=ctx, + tag_only=True, ) except TagMismatch: continue - sub_offset += ( - value.expl_tlvlen if value.expled else value.tlvlen - ) - v = v_tail - if spec.default is None or value != spec.default: # pragma: no cover - # SeqMixing.test_encoded_default_accepted covers that place - values[name] = value break else: raise TagMismatch( @@ -4123,16 +4645,38 @@ class Set(Sequence): decode_path=decode_path, offset=offset, ) + value, v_tail = spec.decode( + v, + sub_offset, + leavemm=True, + decode_path=sub_decode_path, + ctx=ctx, + ) + value_len = value.expl_tlvlen if value.expled else value.tlvlen + sub_offset += value_len + vlen += value_len + v = v_tail + if spec.default is None or value != spec.default: # pragma: no cover + # SeqMixing.test_encoded_default_accepted covers that place + values[name] = value obj = self.__class__( schema=self.specs, impl=self.tag, expl=self._expl, default=self.default, optional=self.optional, - _decoded=(offset, llen, l), + _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)), ) obj._value = values - return obj, tail + if not obj.ready: + raise DecodeError( + msg="not all values are ready", + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + obj.lenindef = lenindef + return obj, (v[EOC_LEN:] if lenindef else tail) class SequenceOf(Obj): @@ -4315,7 +4859,7 @@ class SequenceOf(Obj): v = b"".join(self._encoded_values()) return b"".join((self.tag, len_encode(len(v)), v)) - def _decode(self, tlv, offset, decode_path, ctx): + def _decode(self, tlv, offset, decode_path, ctx, tag_only): try: t, tlen, lv = tag_strip(tlv) except DecodeError as err: @@ -4331,8 +4875,21 @@ class SequenceOf(Obj): decode_path=decode_path, offset=offset, ) + if tag_only: + return + lenindef = False try: l, llen, v = len_decode(lv) + except LenIndefForm as err: + if not ctx.get("bered", False): + raise err.__class__( + msg=err.msg, + klass=self.__class__, + decode_path=decode_path, + offset=offset, + ) + l, llen, v = 0, 1, lv[1:] + lenindef = True except DecodeError as err: raise err.__class__( msg=err.msg, @@ -4347,11 +4904,15 @@ class SequenceOf(Obj): decode_path=decode_path, offset=offset, ) - v, tail = v[:l], v[l:] + if not lenindef: + v, tail = v[:l], v[l:] + vlen = 0 sub_offset = offset + tlen + llen _value = [] spec = self.spec while len(v) > 0: + if lenindef and v[:EOC_LEN].tobytes() == EOC: + break value, v_tail = spec.decode( v, sub_offset, @@ -4359,7 +4920,9 @@ class SequenceOf(Obj): decode_path=decode_path + (str(len(_value)),), ctx=ctx, ) - sub_offset += (value.expl_tlvlen if value.expled else value.tlvlen) + value_len = value.expl_tlvlen if value.expled else value.tlvlen + sub_offset += value_len + vlen += value_len v = v_tail _value.append(value) obj = self.__class__( @@ -4370,9 +4933,10 @@ class SequenceOf(Obj): expl=self._expl, default=self.default, optional=self.optional, - _decoded=(offset, llen, l), + _decoded=(offset, llen, vlen), ) - return obj, tail + obj.lenindef = lenindef + return obj, (v[EOC_LEN:] if lenindef else tail) def __repr__(self): return "%s[%s]" % ( @@ -4397,6 +4961,8 @@ class SequenceOf(Obj): expl_tlen=self.expl_tlen if self.expled else None, expl_llen=self.expl_llen if self.expled else None, expl_vlen=self.expl_vlen if self.expled else None, + expl_lenindef=self.expl_lenindef, + lenindef=self.lenindef, ) for i, value in enumerate(self._value): yield value.pps(decode_path=decode_path + (str(i),)) @@ -4480,7 +5046,7 @@ def generic_decoder(): # pragma: no cover def main(): # pragma: no cover import argparse - parser = argparse.ArgumentParser(description="PyDERASN ASN.1 DER decoder") + parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/DER decoder") parser.add_argument( "--skip", type=int, @@ -4500,9 +5066,9 @@ def main(): # pragma: no cover help="Python path to decoder's defines_by_path", ) parser.add_argument( - "--with-colours", + "--nobered", action='store_true', - help="Enable coloured output", + help="Disallow BER encoding", ) parser.add_argument( "DERFile", @@ -4520,17 +5086,14 @@ def main(): # pragma: no cover pprinter = partial(pprint, big_blobs=True) else: schema, pprinter = generic_decoder() - obj, tail = schema().decode( - der, - ctx=( - None if args.defines_by_path is None else - {"defines_by_path": obj_by_path(args.defines_by_path)} - ), - ) + ctx = {"bered": not args.nobered} + if args.defines_by_path is not None: + ctx["defines_by_path"] = obj_by_path(args.defines_by_path) + obj, tail = schema().decode(der, ctx=ctx) print(pprinter( obj, oids=oids, - with_colours=True if args.with_colours else False, + with_colours=True if environ.get("NO_COLOR") is None else False, )) if tail != b"": print("\nTrailing data: %s" % hexenc(tail))