3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
673 POSIX compliant systems have ``mmap`` syscall, giving ability to work
674 the memory mapped file. You can deal with the file like it was an
675 ordinary binary string, allowing you not to load it to the memory first.
676 Also you can use them as an input for OCTET STRING, taking no Python
677 memory for their storage.
679 There is convenient :py:func:`pyderasn.file_mmaped` function that
680 creates read-only memoryview on the file contents::
682 with open("huge", "rb") as fd:
683 raw = file_mmaped(fd)
684 obj = Something.decode(raw)
688 mmap-ed files in Python2.7 does not implement buffer protocol, so
689 memoryview won't work on them.
693 mmap maps the **whole** file. So it plays no role if you seek-ed it
694 before. Take the slice of the resulting memoryview with required
699 If you use ZFS as underlying storage, then pay attention that
700 currently most platforms does not deal good with ZFS ARC and ordinary
701 page cache used for mmaps. It can take twice the necessary size in
702 the memory: both in page cache and ZFS ARC.
707 We can parse any kind of data now, but how can we produce files
708 streamingly, without storing their encoded representation in memory?
709 SEQUENCE by default encodes in memory all its values, joins them in huge
710 binary string, just to know the exact size of SEQUENCE's value for
711 encoding it in TLV. DER requires you to know all exact sizes of the
714 You can use CER encoding mode, that slightly differs from the DER, but
715 does not require exact sizes knowledge, allowing streaming encoding
716 directly to some writer/buffer. Just use
717 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
718 encoded data will flow::
720 opener = io.open if PY2 else open
721 with opener("result", "wb") as fd:
722 obj.encode_cer(fd.write)
727 obj.encode_cer(buf.write)
729 If you do not want to create in-memory buffer every time, then you can
730 use :py:func:`pyderasn.encode_cer` function::
732 data = encode_cer(obj)
734 Remember that CER is **not valid** DER in most cases, so you **have to**
735 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
736 decoding. Also currently there is **no** validation that provided CER is
737 valid one -- you are sure that it has only valid BER encoding.
741 SET OF values can not be streamingly encoded, because they are
742 required to be sorted byte-by-byte. Big SET OF values still will take
743 much memory. Use neither SET nor SET OF values, as modern ASN.1
746 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
747 OCTET STRINGs! They will be streamingly copied from underlying file to
748 the buffer using 1 KB chunks.
750 Some structures require that some of the elements have to be forcefully
751 DER encoded. For example ``SignedData`` CMS requires you to encode
752 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
753 encode everything else in BER. You can tell any of the structures to be
754 forcefully encoded in DER during CER encoding, by specifying
755 ``der_forced=True`` attribute::
757 class Certificate(Sequence):
761 class SignedAttributes(SetOf):
769 In most cases, huge quantity of binary data is stored as OCTET STRING.
770 CER encoding splits it on 1 KB chunks. BER allows splitting on various
771 levels of chunks inclusion::
773 SOME STRING[CONSTRUCTED]
774 OCTET STRING[CONSTRUCTED]
775 OCTET STRING[PRIMITIVE]
777 OCTET STRING[PRIMITIVE]
779 OCTET STRING[PRIMITIVE]
781 OCTET STRING[PRIMITIVE]
783 OCTET STRING[CONSTRUCTED]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[CONSTRUCTED]
789 OCTET STRING[CONSTRUCTED]
790 OCTET STRING[PRIMITIVE]
793 You can not just take the offset and some ``.vlen`` of the STRING and
794 treat it as the payload. If you decode it without
795 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
796 and ``bytes()`` will give the whole payload contents.
798 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
799 small memory footprint. There is convenient
800 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
801 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
802 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
803 SHA512 digest of its ``eContent``::
805 fd = open("data.p7m", "rb")
806 raw = file_mmaped(fd)
807 ctx = {"bered": True}
808 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
809 if decode_path == ("content",):
813 raise ValueError("no content found")
814 hasher_state = sha512()
816 hasher_state.update(data)
818 evgens = SignedData().decode_evgen(
819 raw[content.offset:],
820 offset=content.offset,
823 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
825 digest = hasher_state.digest()
827 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
828 copy the payload (without BER/CER encoding interleaved overhead) in it.
829 Virtually it won't take memory more than for keeping small structures
830 and 1 KB binary chunks.
834 SEQUENCE OF iterators
835 _____________________
837 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
838 classes. The only difference with providing the full list of objects, is
839 that type and bounds checking is done during encoding process. Also
840 sequence's value will be emptied after encoding, forcing you to set its
843 This is very useful when you have to create some huge objects, like
844 CRLs, with thousands and millions of entities inside. You can write the
845 generator taking necessary data from the database and giving the
846 ``RevokedCertificate`` objects. Only binary representation of that
847 objects will take memory during DER encoding.
852 There is ability to do 2-pass encoding to DER, writing results directly
853 to specified writer (buffer, file, whatever). It could be 1.5+ times
854 slower than ordinary encoding, but it takes little memory for 1st pass
855 state storing. For example, 1st pass state for CACert.org's CRL with
856 ~416K of certificate entries takes nearly 3.5 MB of memory.
857 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
858 nearly 0.5 KB of memory.
860 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
861 iterators <seqof-iterators>` and write directly to opened file, then
862 there is very small memory footprint.
864 1st pass traverses through all the objects of the structure and returns
865 the size of DER encoded structure, together with 1st pass state object.
866 That state contains precalculated lengths for various objects inside the
871 fulllen, state = obj.encode1st()
873 2nd pass takes the writer and 1st pass state. It traverses through all
874 the objects again, but writes their encoded representation to the writer.
878 opener = io.open if PY2 else open
879 with opener("result", "wb") as fd:
880 obj.encode2nd(fd.write, iter(state))
884 You **MUST NOT** use 1st pass state if anything is changed in the
885 objects. It is intended to be used immediately after 1st pass is
888 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
889 have to reinitialize the values after the 1st pass. And you **have to**
890 be sure that the iterator gives exactly the same values as previously.
891 Yes, you have to run your iterator twice -- because this is two pass
894 If you want to encode to the memory, then you can use convenient
895 :py:func:`pyderasn.encode2pass` helper.
899 .. autoclass:: pyderasn.Obj
907 .. autoclass:: pyderasn.Boolean
912 .. autoclass:: pyderasn.Integer
913 :members: __init__, named
917 .. autoclass:: pyderasn.BitString
918 :members: __init__, bit_len, named
922 .. autoclass:: pyderasn.OctetString
927 .. autoclass:: pyderasn.Null
932 .. autoclass:: pyderasn.ObjectIdentifier
937 .. autoclass:: pyderasn.Enumerated
941 .. autoclass:: pyderasn.CommonString
945 .. autoclass:: pyderasn.NumericString
949 .. autoclass:: pyderasn.PrintableString
950 :members: __init__, allow_asterisk, allow_ampersand
954 .. autoclass:: pyderasn.UTCTime
955 :members: __init__, todatetime
959 .. autoclass:: pyderasn.GeneralizedTime
960 :members: __init__, todatetime
967 .. autoclass:: pyderasn.Choice
968 :members: __init__, choice, value
972 .. autoclass:: PrimitiveTypes
976 .. autoclass:: pyderasn.Any
984 .. autoclass:: pyderasn.Sequence
989 .. autoclass:: pyderasn.Set
994 .. autoclass:: pyderasn.SequenceOf
999 .. autoclass:: pyderasn.SetOf
1005 .. autofunction:: pyderasn.abs_decode_path
1006 .. autofunction:: pyderasn.agg_octet_string
1007 .. autofunction:: pyderasn.colonize_hex
1008 .. autofunction:: pyderasn.encode2pass
1009 .. autofunction:: pyderasn.encode_cer
1010 .. autofunction:: pyderasn.file_mmaped
1011 .. autofunction:: pyderasn.hexenc
1012 .. autofunction:: pyderasn.hexdec
1013 .. autofunction:: pyderasn.tag_encode
1014 .. autofunction:: pyderasn.tag_decode
1015 .. autofunction:: pyderasn.tag_ctxp
1016 .. autofunction:: pyderasn.tag_ctxc
1017 .. autoclass:: pyderasn.DecodeError
1019 .. autoclass:: pyderasn.NotEnoughData
1020 .. autoclass:: pyderasn.ExceedingData
1021 .. autoclass:: pyderasn.LenIndefForm
1022 .. autoclass:: pyderasn.TagMismatch
1023 .. autoclass:: pyderasn.InvalidLength
1024 .. autoclass:: pyderasn.InvalidOID
1025 .. autoclass:: pyderasn.ObjUnknown
1026 .. autoclass:: pyderasn.ObjNotReady
1027 .. autoclass:: pyderasn.InvalidValueType
1028 .. autoclass:: pyderasn.BoundsError
1035 You can decode DER/BER files using command line abilities::
1037 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1039 If there is no schema for your file, then you can try parsing it without,
1040 but of course IMPLICIT tags will often make it impossible. But result is
1041 good enough for the certificate above::
1043 $ python -m pyderasn path/to/file
1044 0 [1,3,1604] . >: SEQUENCE OF
1045 4 [1,3,1453] . . >: SEQUENCE OF
1046 8 [0,0, 5] . . . . >: [0] ANY
1047 . . . . . A0:03:02:01:02
1048 13 [1,1, 3] . . . . >: INTEGER 61595
1049 18 [1,1, 13] . . . . >: SEQUENCE OF
1050 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1051 31 [1,1, 0] . . . . . . >: NULL
1052 33 [1,3, 274] . . . . >: SEQUENCE OF
1053 37 [1,1, 11] . . . . . . >: SET OF
1054 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1055 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1056 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1058 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1059 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1060 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1061 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1062 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1063 . . . . . . . . . 61:2E:63:6F:6D:2F
1064 1461 [1,1, 13] . . >: SEQUENCE OF
1065 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1066 1474 [1,1, 0] . . . . >: NULL
1067 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1068 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1069 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1075 If you have got dictionaries with ObjectIdentifiers, like example one
1076 from ``tests/test_crts.py``::
1079 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1080 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1082 "2.5.4.10": "id-at-organizationName",
1083 "2.5.4.11": "id-at-organizationalUnitName",
1086 then you can pass it to pretty printer to see human readable OIDs::
1088 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1090 37 [1,1, 11] . . . . . . >: SET OF
1091 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1092 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1093 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1094 50 [1,1, 18] . . . . . . >: SET OF
1095 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1096 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1097 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1098 70 [1,1, 18] . . . . . . >: SET OF
1099 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1100 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1101 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1107 Each decoded element has so-called decode path: sequence of structure
1108 names it is passing during the decode process. Each element has its own
1109 unique path inside the whole ASN.1 tree. You can print it out with
1110 ``--print-decode-path`` option::
1112 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1113 0 [1,3,1604] Certificate SEQUENCE []
1114 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1115 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1116 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1117 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1118 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1119 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1121 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1122 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1123 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1124 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1125 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1126 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1127 . . . . . . . 13:02:45:53
1128 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1131 Now you can print only the specified tree, for example signature algorithm::
1133 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1134 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1135 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1136 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1140 from array import array
1141 from codecs import getdecoder
1142 from codecs import getencoder
1143 from collections import namedtuple
1144 from collections import OrderedDict
1145 from copy import copy
1146 from datetime import datetime
1147 from datetime import timedelta
1148 from io import BytesIO
1149 from math import ceil
1150 from mmap import mmap
1151 from mmap import PROT_READ
1152 from operator import attrgetter
1153 from string import ascii_letters
1154 from string import digits
1155 from sys import maxsize as sys_maxsize
1156 from sys import version_info
1157 from unicodedata import category as unicat
1159 from six import add_metaclass
1160 from six import binary_type
1161 from six import byte2int
1162 from six import indexbytes
1163 from six import int2byte
1164 from six import integer_types
1165 from six import iterbytes
1166 from six import iteritems
1167 from six import itervalues
1169 from six import string_types
1170 from six import text_type
1171 from six import unichr as six_unichr
1172 from six.moves import xrange as six_xrange
1176 from termcolor import colored
1177 except ImportError: # pragma: no cover
1178 def colored(what, *args, **kwargs):
1228 "TagClassApplication",
1231 "TagClassUniversal",
1232 "TagFormConstructed",
1243 TagClassUniversal = 0
1244 TagClassApplication = 1 << 6
1245 TagClassContext = 1 << 7
1246 TagClassPrivate = 1 << 6 | 1 << 7
1247 TagFormPrimitive = 0
1248 TagFormConstructed = 1 << 5
1250 TagClassContext: "",
1251 TagClassApplication: "APPLICATION ",
1252 TagClassPrivate: "PRIVATE ",
1253 TagClassUniversal: "UNIV ",
1257 LENINDEF = b"\x80" # length indefinite mark
1258 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1259 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1260 SET01 = frozenset("01")
1261 DECIMALS = frozenset(digits)
1262 DECIMAL_SIGNS = ".,"
1263 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1266 def file_mmaped(fd):
1267 """Make mmap-ed memoryview for reading from file
1269 :param fd: file object
1270 :returns: memoryview over read-only mmap-ing of the whole file
1272 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1275 if not set(value) <= DECIMALS:
1276 raise ValueError("non-pure integer")
1279 def fractions2float(fractions_raw):
1280 pureint(fractions_raw)
1281 return float("0." + fractions_raw)
1284 def get_def_by_path(defines_by_path, sub_decode_path):
1285 """Get define by decode path
1287 for path, define in defines_by_path:
1288 if len(path) != len(sub_decode_path):
1290 for p1, p2 in zip(path, sub_decode_path):
1291 if (not p1 is any) and (p1 != p2):
1297 ########################################################################
1299 ########################################################################
1301 class ASN1Error(ValueError):
1305 class DecodeError(ASN1Error):
1306 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1308 :param str msg: reason of decode failing
1309 :param klass: optional exact DecodeError inherited class (like
1310 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1311 :py:exc:`InvalidLength`)
1312 :param decode_path: tuple of strings. It contains human
1313 readable names of the fields through which
1314 decoding process has passed
1315 :param int offset: binary offset where failure happened
1317 super(DecodeError, self).__init__()
1320 self.decode_path = decode_path
1321 self.offset = offset
1326 "" if self.klass is None else self.klass.__name__,
1328 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1329 if len(self.decode_path) > 0 else ""
1331 ("(at %d)" % self.offset) if self.offset > 0 else "",
1337 return "%s(%s)" % (self.__class__.__name__, self)
1340 class NotEnoughData(DecodeError):
1344 class ExceedingData(ASN1Error):
1345 def __init__(self, nbytes):
1346 super(ExceedingData, self).__init__()
1347 self.nbytes = nbytes
1350 return "%d trailing bytes" % self.nbytes
1353 return "%s(%s)" % (self.__class__.__name__, self)
1356 class LenIndefForm(DecodeError):
1360 class TagMismatch(DecodeError):
1364 class InvalidLength(DecodeError):
1368 class InvalidOID(DecodeError):
1372 class ObjUnknown(ASN1Error):
1373 def __init__(self, name):
1374 super(ObjUnknown, self).__init__()
1378 return "object is unknown: %s" % self.name
1381 return "%s(%s)" % (self.__class__.__name__, self)
1384 class ObjNotReady(ASN1Error):
1385 def __init__(self, name):
1386 super(ObjNotReady, self).__init__()
1390 return "object is not ready: %s" % self.name
1393 return "%s(%s)" % (self.__class__.__name__, self)
1396 class InvalidValueType(ASN1Error):
1397 def __init__(self, expected_types):
1398 super(InvalidValueType, self).__init__()
1399 self.expected_types = expected_types
1402 return "invalid value type, expected: %s" % ", ".join(
1403 [repr(t) for t in self.expected_types]
1407 return "%s(%s)" % (self.__class__.__name__, self)
1410 class BoundsError(ASN1Error):
1411 def __init__(self, bound_min, value, bound_max):
1412 super(BoundsError, self).__init__()
1413 self.bound_min = bound_min
1415 self.bound_max = bound_max
1418 return "unsatisfied bounds: %s <= %s <= %s" % (
1425 return "%s(%s)" % (self.__class__.__name__, self)
1428 ########################################################################
1430 ########################################################################
1432 _hexdecoder = getdecoder("hex")
1433 _hexencoder = getencoder("hex")
1437 """Binary data to hexadecimal string convert
1439 return _hexdecoder(data)[0]
1443 """Hexadecimal string to binary data convert
1445 return _hexencoder(data)[0].decode("ascii")
1448 def int_bytes_len(num, byte_len=8):
1451 return int(ceil(float(num.bit_length()) / byte_len))
1454 def zero_ended_encode(num):
1455 octets = bytearray(int_bytes_len(num, 7))
1457 octets[i] = num & 0x7F
1461 octets[i] = 0x80 | (num & 0x7F)
1464 return bytes(octets)
1467 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1468 """Encode tag to binary form
1470 :param int num: tag's number
1471 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1472 :py:data:`pyderasn.TagClassContext`,
1473 :py:data:`pyderasn.TagClassApplication`,
1474 :py:data:`pyderasn.TagClassPrivate`)
1475 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1476 :py:data:`pyderasn.TagFormConstructed`)
1480 return int2byte(klass | form | num)
1481 # [XX|X|11111][1.......][1.......] ... [0.......]
1482 return int2byte(klass | form | 31) + zero_ended_encode(num)
1485 def tag_decode(tag):
1486 """Decode tag from binary form
1490 No validation is performed, assuming that it has already passed.
1492 It returns tuple with three integers, as
1493 :py:func:`pyderasn.tag_encode` accepts.
1495 first_octet = byte2int(tag)
1496 klass = first_octet & 0xC0
1497 form = first_octet & 0x20
1498 if first_octet & 0x1F < 0x1F:
1499 return (klass, form, first_octet & 0x1F)
1501 for octet in iterbytes(tag[1:]):
1504 return (klass, form, num)
1508 """Create CONTEXT PRIMITIVE tag
1510 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1514 """Create CONTEXT CONSTRUCTED tag
1516 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1519 def tag_strip(data):
1520 """Take off tag from the data
1522 :returns: (encoded tag, tag length, remaining data)
1525 raise NotEnoughData("no data at all")
1526 if byte2int(data) & 0x1F < 31:
1527 return data[:1], 1, data[1:]
1532 raise DecodeError("unfinished tag")
1533 if indexbytes(data, i) & 0x80 == 0:
1536 return data[:i], i, data[i:]
1542 octets = bytearray(int_bytes_len(l) + 1)
1543 octets[0] = 0x80 | (len(octets) - 1)
1544 for i in six_xrange(len(octets) - 1, 0, -1):
1545 octets[i] = l & 0xFF
1547 return bytes(octets)
1550 def len_decode(data):
1553 :returns: (decoded length, length's length, remaining data)
1554 :raises LenIndefForm: if indefinite form encoding is met
1557 raise NotEnoughData("no data at all")
1558 first_octet = byte2int(data)
1559 if first_octet & 0x80 == 0:
1560 return first_octet, 1, data[1:]
1561 octets_num = first_octet & 0x7F
1562 if octets_num + 1 > len(data):
1563 raise NotEnoughData("encoded length is longer than data")
1565 raise LenIndefForm()
1566 if byte2int(data[1:]) == 0:
1567 raise DecodeError("leading zeros")
1569 for v in iterbytes(data[1:1 + octets_num]):
1572 raise DecodeError("long form instead of short one")
1573 return l, 1 + octets_num, data[1 + octets_num:]
1576 LEN0 = len_encode(0)
1577 LEN1 = len_encode(1)
1578 LEN1K = len_encode(1000)
1582 """How many bytes length field will take
1586 if l < 256: # 1 << 8
1588 if l < 65536: # 1 << 16
1590 if l < 16777216: # 1 << 24
1592 if l < 4294967296: # 1 << 32
1594 if l < 1099511627776: # 1 << 40
1596 if l < 281474976710656: # 1 << 48
1598 if l < 72057594037927936: # 1 << 56
1600 raise OverflowError("too big length")
1603 def write_full(writer, data):
1604 """Fully write provided data
1606 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1608 BytesIO does not guarantee that the whole data will be written at
1609 once. That function write everything provided, raising an error if
1610 ``writer`` returns None.
1612 data = memoryview(data)
1614 while written != len(data):
1615 n = writer(data[written:])
1617 raise ValueError("can not write to buf")
1621 # If it is 64-bit system, then use compact 64-bit array of unsigned
1622 # longs. Use an ordinary list with universal integers otherwise, that
1624 if sys_maxsize > 2 ** 32:
1625 def state_2pass_new():
1628 def state_2pass_new():
1632 ########################################################################
1634 ########################################################################
1636 class AutoAddSlots(type):
1637 def __new__(cls, name, bases, _dict):
1638 _dict["__slots__"] = _dict.get("__slots__", ())
1639 return type.__new__(cls, name, bases, _dict)
1642 BasicState = namedtuple("BasicState", (
1655 ), **NAMEDTUPLE_KWARGS)
1658 @add_metaclass(AutoAddSlots)
1660 """Common ASN.1 object class
1662 All ASN.1 types are inherited from it. It has metaclass that
1663 automatically adds ``__slots__`` to all inherited classes.
1688 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1689 self._expl = getattr(self, "expl", None) if expl is None else expl
1690 if self.tag != self.tag_default and self._expl is not None:
1691 raise ValueError("implicit and explicit tags can not be set simultaneously")
1692 if self.tag is None:
1693 self._tag_order = None
1695 tag_class, _, tag_num = tag_decode(
1696 self.tag if self._expl is None else self._expl
1698 self._tag_order = (tag_class, tag_num)
1699 if default is not None:
1701 self.optional = optional
1702 self.offset, self.llen, self.vlen = _decoded
1704 self.expl_lenindef = False
1705 self.lenindef = False
1706 self.ber_encoded = False
1709 def ready(self): # pragma: no cover
1710 """Is object ready to be encoded?
1712 raise NotImplementedError()
1714 def _assert_ready(self):
1716 raise ObjNotReady(self.__class__.__name__)
1720 """Is either object or any elements inside is BER encoded?
1722 return self.expl_lenindef or self.lenindef or self.ber_encoded
1726 """Is object decoded?
1728 return (self.llen + self.vlen) > 0
1730 def __getstate__(self): # pragma: no cover
1731 """Used for making safe to be mutable pickleable copies
1733 raise NotImplementedError()
1735 def __setstate__(self, state):
1736 if state.version != __version__:
1737 raise ValueError("data is pickled by different PyDERASN version")
1738 self.tag = state.tag
1739 self._tag_order = state.tag_order
1740 self._expl = state.expl
1741 self.default = state.default
1742 self.optional = state.optional
1743 self.offset = state.offset
1744 self.llen = state.llen
1745 self.vlen = state.vlen
1746 self.expl_lenindef = state.expl_lenindef
1747 self.lenindef = state.lenindef
1748 self.ber_encoded = state.ber_encoded
1751 def tag_order(self):
1752 """Tag's (class, number) used for DER/CER sorting
1754 return self._tag_order
1757 def tag_order_cer(self):
1758 return self.tag_order
1762 """See :ref:`decoding`
1764 return len(self.tag)
1768 """See :ref:`decoding`
1770 return self.tlen + self.llen + self.vlen
1772 def __str__(self): # pragma: no cover
1773 return self.__bytes__() if PY2 else self.__unicode__()
1775 def __ne__(self, their):
1776 return not(self == their)
1778 def __gt__(self, their): # pragma: no cover
1779 return not(self < their)
1781 def __le__(self, their): # pragma: no cover
1782 return (self == their) or (self < their)
1784 def __ge__(self, their): # pragma: no cover
1785 return (self == their) or (self > their)
1787 def _encode(self): # pragma: no cover
1788 raise NotImplementedError()
1790 def _encode_cer(self, writer):
1791 write_full(writer, self._encode())
1793 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1794 yield NotImplemented
1796 def _encode1st(self, state):
1797 raise NotImplementedError()
1799 def _encode2nd(self, writer, state_iter):
1800 raise NotImplementedError()
1803 """DER encode the structure
1805 :returns: DER representation
1807 raw = self._encode()
1808 if self._expl is None:
1810 return b"".join((self._expl, len_encode(len(raw)), raw))
1812 def encode1st(self, state=None):
1813 """Do the 1st pass of 2-pass encoding
1815 :rtype: (int, array("L"))
1816 :returns: full length of encoded data and precalculated various
1820 state = state_2pass_new()
1821 if self._expl is None:
1822 return self._encode1st(state)
1824 idx = len(state) - 1
1825 vlen, _ = self._encode1st(state)
1827 fulllen = len(self._expl) + len_size(vlen) + vlen
1828 return fulllen, state
1830 def encode2nd(self, writer, state_iter):
1831 """Do the 2nd pass of 2-pass encoding
1833 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1834 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1836 if self._expl is None:
1837 self._encode2nd(writer, state_iter)
1839 write_full(writer, self._expl + len_encode(next(state_iter)))
1840 self._encode2nd(writer, state_iter)
1842 def encode_cer(self, writer):
1843 """CER encode the structure to specified writer
1845 :param writer: must comply with ``io.RawIOBase.write``
1846 behaviour. It takes slice to be written and
1847 returns number of bytes processed. If it returns
1848 None, then exception will be raised
1850 if self._expl is not None:
1851 write_full(writer, self._expl + LENINDEF)
1852 if getattr(self, "der_forced", False):
1853 write_full(writer, self._encode())
1855 self._encode_cer(writer)
1856 if self._expl is not None:
1857 write_full(writer, EOC)
1859 def hexencode(self):
1860 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1862 return hexenc(self.encode())
1872 _ctx_immutable=True,
1876 :param data: either binary or memoryview
1877 :param int offset: initial data's offset
1878 :param bool leavemm: do we need to leave memoryview of remaining
1879 data as is, or convert it to bytes otherwise
1880 :param decode_path: current decode path (tuples of strings,
1881 possibly with DecodePathDefBy) with will be
1882 the root for all underlying objects
1883 :param ctx: optional :ref:`context <ctx>` governing decoding process
1884 :param bool tag_only: decode only the tag, without length and
1885 contents (used only in Choice and Set
1886 structures, trying to determine if tag satisfies
1888 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1890 :returns: (Obj, remaining data)
1892 .. seealso:: :ref:`decoding`
1894 result = next(self.decode_evgen(
1906 _, obj, tail = result
1917 _ctx_immutable=True,
1920 """Decode with evgen mode on
1922 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1923 it returns the generator producing ``(decode_path, obj, tail)``
1924 values. See :ref:`evgen mode <evgen_mode>`.
1928 elif _ctx_immutable:
1930 tlv = memoryview(data)
1933 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1936 if self._expl is None:
1937 for result in self._decode(
1940 decode_path=decode_path,
1943 evgen_mode=_evgen_mode,
1948 _decode_path, obj, tail = result
1949 if not _decode_path is decode_path:
1953 t, tlen, lv = tag_strip(tlv)
1954 except DecodeError as err:
1955 raise err.__class__(
1957 klass=self.__class__,
1958 decode_path=decode_path,
1963 klass=self.__class__,
1964 decode_path=decode_path,
1968 l, llen, v = len_decode(lv)
1969 except LenIndefForm as err:
1970 if not ctx.get("bered", False):
1971 raise err.__class__(
1973 klass=self.__class__,
1974 decode_path=decode_path,
1978 offset += tlen + llen
1979 for result in self._decode(
1982 decode_path=decode_path,
1985 evgen_mode=_evgen_mode,
1987 if tag_only: # pragma: no cover
1990 _decode_path, obj, tail = result
1991 if not _decode_path is decode_path:
1993 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1994 if eoc_expected.tobytes() != EOC:
1997 klass=self.__class__,
1998 decode_path=decode_path,
2002 obj.expl_lenindef = True
2003 except DecodeError as err:
2004 raise err.__class__(
2006 klass=self.__class__,
2007 decode_path=decode_path,
2012 raise NotEnoughData(
2013 "encoded length is longer than data",
2014 klass=self.__class__,
2015 decode_path=decode_path,
2018 for result in self._decode(
2020 offset=offset + tlen + llen,
2021 decode_path=decode_path,
2024 evgen_mode=_evgen_mode,
2026 if tag_only: # pragma: no cover
2029 _decode_path, obj, tail = result
2030 if not _decode_path is decode_path:
2032 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2034 "explicit tag out-of-bound, longer than data",
2035 klass=self.__class__,
2036 decode_path=decode_path,
2039 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2041 def decod(self, data, offset=0, decode_path=(), ctx=None):
2042 """Decode the data, check that tail is empty
2044 :raises ExceedingData: if tail is not empty
2046 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2047 (decode without tail) that also checks that there is no
2050 obj, tail = self.decode(
2053 decode_path=decode_path,
2058 raise ExceedingData(len(tail))
2061 def hexdecode(self, data, *args, **kwargs):
2062 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2064 return self.decode(hexdec(data), *args, **kwargs)
2066 def hexdecod(self, data, *args, **kwargs):
2067 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2069 return self.decod(hexdec(data), *args, **kwargs)
2073 """See :ref:`decoding`
2075 return self._expl is not None
2079 """See :ref:`decoding`
2084 def expl_tlen(self):
2085 """See :ref:`decoding`
2087 return len(self._expl)
2090 def expl_llen(self):
2091 """See :ref:`decoding`
2093 if self.expl_lenindef:
2095 return len(len_encode(self.tlvlen))
2098 def expl_offset(self):
2099 """See :ref:`decoding`
2101 return self.offset - self.expl_tlen - self.expl_llen
2104 def expl_vlen(self):
2105 """See :ref:`decoding`
2110 def expl_tlvlen(self):
2111 """See :ref:`decoding`
2113 return self.expl_tlen + self.expl_llen + self.expl_vlen
2116 def fulloffset(self):
2117 """See :ref:`decoding`
2119 return self.expl_offset if self.expled else self.offset
2123 """See :ref:`decoding`
2125 return self.expl_tlvlen if self.expled else self.tlvlen
2127 def pps_lenindef(self, decode_path):
2128 if self.lenindef and not (
2129 getattr(self, "defined", None) is not None and
2130 self.defined[1].lenindef
2133 asn1_type_name="EOC",
2135 decode_path=decode_path,
2137 self.offset + self.tlvlen -
2138 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2146 if self.expl_lenindef:
2148 asn1_type_name="EOC",
2149 obj_name="EXPLICIT",
2150 decode_path=decode_path,
2151 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2160 def encode_cer(obj):
2161 """Encode to CER in memory buffer
2163 :returns bytes: memory buffer contents
2166 obj.encode_cer(buf.write)
2167 return buf.getvalue()
2170 def encode2pass(obj):
2171 """Encode (2-pass mode) to DER in memory buffer
2173 :returns bytes: memory buffer contents
2176 _, state = obj.encode1st()
2177 obj.encode2nd(buf.write, iter(state))
2178 return buf.getvalue()
2181 class DecodePathDefBy(object):
2182 """DEFINED BY representation inside decode path
2184 __slots__ = ("defined_by",)
2186 def __init__(self, defined_by):
2187 self.defined_by = defined_by
2189 def __ne__(self, their):
2190 return not(self == their)
2192 def __eq__(self, their):
2193 if not isinstance(their, self.__class__):
2195 return self.defined_by == their.defined_by
2198 return "DEFINED BY " + str(self.defined_by)
2201 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2204 ########################################################################
2206 ########################################################################
2208 PP = namedtuple("PP", (
2231 ), **NAMEDTUPLE_KWARGS)
2236 asn1_type_name="unknown",
2253 expl_lenindef=False,
2284 def _colourize(what, colour, with_colours, attrs=("bold",)):
2285 return colored(what, colour, attrs=attrs) if with_colours else what
2288 def colonize_hex(hexed):
2289 """Separate hexadecimal string with colons
2291 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2300 with_decode_path=False,
2301 decode_path_len_decrease=0,
2308 " " if pp.expl_offset is None else
2309 ("-%d" % (pp.offset - pp.expl_offset))
2311 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2313 col = _colourize(col, "red", with_colours, ())
2314 col += _colourize("B", "red", with_colours) if pp.bered else " "
2316 col = "[%d,%d,%4d]%s" % (
2320 LENINDEF_PP_CHAR if pp.lenindef else " "
2322 col = _colourize(col, "green", with_colours, ())
2324 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2325 if decode_path_len > 0:
2326 cols.append(" ." * decode_path_len)
2327 ent = pp.decode_path[-1]
2328 if isinstance(ent, DecodePathDefBy):
2329 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2330 value = str(ent.defined_by)
2333 len(oid_maps) > 0 and
2334 ent.defined_by.asn1_type_name ==
2335 ObjectIdentifier.asn1_type_name
2337 for oid_map in oid_maps:
2338 oid_name = oid_map.get(value)
2339 if oid_name is not None:
2340 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2342 if oid_name is None:
2343 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2345 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2346 if pp.expl is not None:
2347 klass, _, num = pp.expl
2348 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2349 cols.append(_colourize(col, "blue", with_colours))
2350 if pp.impl is not None:
2351 klass, _, num = pp.impl
2352 col = "[%s%d]" % (TagClassReprs[klass], num)
2353 cols.append(_colourize(col, "blue", with_colours))
2354 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2355 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2357 cols.append(_colourize("BER", "red", with_colours))
2358 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2359 if pp.value is not None:
2361 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2363 len(oid_maps) > 0 and
2364 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2366 for oid_map in oid_maps:
2367 oid_name = oid_map.get(value)
2368 if oid_name is not None:
2369 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2371 if pp.asn1_type_name == Integer.asn1_type_name:
2372 hex_repr = hex(int(pp.obj._value))[2:].upper()
2373 if len(hex_repr) % 2 != 0:
2374 hex_repr = "0" + hex_repr
2375 cols.append(_colourize(
2376 "(%s)" % colonize_hex(hex_repr),
2381 if pp.blob.__class__ == binary_type:
2382 cols.append(hexenc(pp.blob))
2383 elif pp.blob.__class__ == tuple:
2384 cols.append(", ".join(pp.blob))
2386 cols.append(_colourize("OPTIONAL", "red", with_colours))
2388 cols.append(_colourize("DEFAULT", "red", with_colours))
2389 if with_decode_path:
2390 cols.append(_colourize(
2391 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2395 return " ".join(cols)
2398 def pp_console_blob(pp, decode_path_len_decrease=0):
2399 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2400 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2401 if decode_path_len > 0:
2402 cols.append(" ." * (decode_path_len + 1))
2403 if pp.blob.__class__ == binary_type:
2404 blob = hexenc(pp.blob).upper()
2405 for i in six_xrange(0, len(blob), 32):
2406 chunk = blob[i:i + 32]
2407 yield " ".join(cols + [colonize_hex(chunk)])
2408 elif pp.blob.__class__ == tuple:
2409 yield " ".join(cols + [", ".join(pp.blob)])
2417 with_decode_path=False,
2418 decode_path_only=(),
2421 """Pretty print object
2423 :param Obj obj: object you want to pretty print
2424 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2425 Its human readable form is printed when OID is met
2426 :param big_blobs: if large binary objects are met (like OctetString
2427 values), do we need to print them too, on separate
2429 :param with_colours: colourize output, if ``termcolor`` library
2431 :param with_decode_path: print decode path
2432 :param decode_path_only: print only that specified decode path
2434 def _pprint_pps(pps):
2436 if hasattr(pp, "_fields"):
2438 decode_path_only != () and
2440 str(p) for p in pp.decode_path[:len(decode_path_only)]
2441 ) != decode_path_only
2445 yield pp_console_row(
2450 with_colours=with_colours,
2451 with_decode_path=with_decode_path,
2452 decode_path_len_decrease=len(decode_path_only),
2454 for row in pp_console_blob(
2456 decode_path_len_decrease=len(decode_path_only),
2460 yield pp_console_row(
2465 with_colours=with_colours,
2466 with_decode_path=with_decode_path,
2467 decode_path_len_decrease=len(decode_path_only),
2470 for row in _pprint_pps(pp):
2472 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2475 ########################################################################
2476 # ASN.1 primitive types
2477 ########################################################################
2479 BooleanState = namedtuple(
2481 BasicState._fields + ("value",),
2487 """``BOOLEAN`` boolean type
2489 >>> b = Boolean(True)
2491 >>> b == Boolean(True)
2497 tag_default = tag_encode(1)
2498 asn1_type_name = "BOOLEAN"
2510 :param value: set the value. Either boolean type, or
2511 :py:class:`pyderasn.Boolean` object
2512 :param bytes impl: override default tag with ``IMPLICIT`` one
2513 :param bytes expl: override default tag with ``EXPLICIT`` one
2514 :param default: set default value. Type same as in ``value``
2515 :param bool optional: is object ``OPTIONAL`` in sequence
2517 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2518 self._value = None if value is None else self._value_sanitize(value)
2519 if default is not None:
2520 default = self._value_sanitize(default)
2521 self.default = self.__class__(
2527 self._value = default
2529 def _value_sanitize(self, value):
2530 if value.__class__ == bool:
2532 if issubclass(value.__class__, Boolean):
2534 raise InvalidValueType((self.__class__, bool))
2538 return self._value is not None
2540 def __getstate__(self):
2541 return BooleanState(
2557 def __setstate__(self, state):
2558 super(Boolean, self).__setstate__(state)
2559 self._value = state.value
2561 def __nonzero__(self):
2562 self._assert_ready()
2566 self._assert_ready()
2569 def __eq__(self, their):
2570 if their.__class__ == bool:
2571 return self._value == their
2572 if not issubclass(their.__class__, Boolean):
2575 self._value == their._value and
2576 self.tag == their.tag and
2577 self._expl == their._expl
2588 return self.__class__(
2590 impl=self.tag if impl is None else impl,
2591 expl=self._expl if expl is None else expl,
2592 default=self.default if default is None else default,
2593 optional=self.optional if optional is None else optional,
2597 self._assert_ready()
2598 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2600 def _encode1st(self, state):
2601 return len(self.tag) + 2, state
2603 def _encode2nd(self, writer, state_iter):
2604 self._assert_ready()
2605 write_full(writer, self._encode())
2607 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2609 t, _, lv = tag_strip(tlv)
2610 except DecodeError as err:
2611 raise err.__class__(
2613 klass=self.__class__,
2614 decode_path=decode_path,
2619 klass=self.__class__,
2620 decode_path=decode_path,
2627 l, _, v = len_decode(lv)
2628 except DecodeError as err:
2629 raise err.__class__(
2631 klass=self.__class__,
2632 decode_path=decode_path,
2636 raise InvalidLength(
2637 "Boolean's length must be equal to 1",
2638 klass=self.__class__,
2639 decode_path=decode_path,
2643 raise NotEnoughData(
2644 "encoded length is longer than data",
2645 klass=self.__class__,
2646 decode_path=decode_path,
2649 first_octet = byte2int(v)
2651 if first_octet == 0:
2653 elif first_octet == 0xFF:
2655 elif ctx.get("bered", False):
2660 "unacceptable Boolean value",
2661 klass=self.__class__,
2662 decode_path=decode_path,
2665 obj = self.__class__(
2669 default=self.default,
2670 optional=self.optional,
2671 _decoded=(offset, 1, 1),
2673 obj.ber_encoded = ber_encoded
2674 yield decode_path, obj, v[1:]
2677 return pp_console_row(next(self.pps()))
2679 def pps(self, decode_path=()):
2682 asn1_type_name=self.asn1_type_name,
2683 obj_name=self.__class__.__name__,
2684 decode_path=decode_path,
2685 value=str(self._value) if self.ready else None,
2686 optional=self.optional,
2687 default=self == self.default,
2688 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2689 expl=None if self._expl is None else tag_decode(self._expl),
2694 expl_offset=self.expl_offset if self.expled else None,
2695 expl_tlen=self.expl_tlen if self.expled else None,
2696 expl_llen=self.expl_llen if self.expled else None,
2697 expl_vlen=self.expl_vlen if self.expled else None,
2698 expl_lenindef=self.expl_lenindef,
2699 ber_encoded=self.ber_encoded,
2702 for pp in self.pps_lenindef(decode_path):
2706 IntegerState = namedtuple(
2708 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2714 """``INTEGER`` integer type
2716 >>> b = Integer(-123)
2718 >>> b == Integer(-123)
2723 >>> Integer(2, bounds=(1, 3))
2725 >>> Integer(5, bounds=(1, 3))
2726 Traceback (most recent call last):
2727 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2731 class Version(Integer):
2738 >>> v = Version("v1")
2745 {'v3': 2, 'v1': 0, 'v2': 1}
2747 __slots__ = ("specs", "_bound_min", "_bound_max")
2748 tag_default = tag_encode(2)
2749 asn1_type_name = "INTEGER"
2763 :param value: set the value. Either integer type, named value
2764 (if ``schema`` is specified in the class), or
2765 :py:class:`pyderasn.Integer` object
2766 :param bounds: set ``(MIN, MAX)`` value constraint.
2767 (-inf, +inf) by default
2768 :param bytes impl: override default tag with ``IMPLICIT`` one
2769 :param bytes expl: override default tag with ``EXPLICIT`` one
2770 :param default: set default value. Type same as in ``value``
2771 :param bool optional: is object ``OPTIONAL`` in sequence
2773 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2775 specs = getattr(self, "schema", {}) if _specs is None else _specs
2776 self.specs = specs if specs.__class__ == dict else dict(specs)
2777 self._bound_min, self._bound_max = getattr(
2780 (float("-inf"), float("+inf")),
2781 ) if bounds is None else bounds
2782 if value is not None:
2783 self._value = self._value_sanitize(value)
2784 if default is not None:
2785 default = self._value_sanitize(default)
2786 self.default = self.__class__(
2792 if self._value is None:
2793 self._value = default
2795 def _value_sanitize(self, value):
2796 if isinstance(value, integer_types):
2798 elif issubclass(value.__class__, Integer):
2799 value = value._value
2800 elif value.__class__ == str:
2801 value = self.specs.get(value)
2803 raise ObjUnknown("integer value: %s" % value)
2805 raise InvalidValueType((self.__class__, int, str))
2806 if not self._bound_min <= value <= self._bound_max:
2807 raise BoundsError(self._bound_min, value, self._bound_max)
2812 return self._value is not None
2814 def __getstate__(self):
2815 return IntegerState(
2834 def __setstate__(self, state):
2835 super(Integer, self).__setstate__(state)
2836 self.specs = state.specs
2837 self._value = state.value
2838 self._bound_min = state.bound_min
2839 self._bound_max = state.bound_max
2842 self._assert_ready()
2843 return int(self._value)
2846 self._assert_ready()
2847 return hash(b"".join((
2849 bytes(self._expl or b""),
2850 str(self._value).encode("ascii"),
2853 def __eq__(self, their):
2854 if isinstance(their, integer_types):
2855 return self._value == their
2856 if not issubclass(their.__class__, Integer):
2859 self._value == their._value and
2860 self.tag == their.tag and
2861 self._expl == their._expl
2864 def __lt__(self, their):
2865 return self._value < their._value
2869 """Return named representation (if exists) of the value
2871 for name, value in iteritems(self.specs):
2872 if value == self._value:
2885 return self.__class__(
2888 (self._bound_min, self._bound_max)
2889 if bounds is None else bounds
2891 impl=self.tag if impl is None else impl,
2892 expl=self._expl if expl is None else expl,
2893 default=self.default if default is None else default,
2894 optional=self.optional if optional is None else optional,
2898 def _encode_payload(self):
2899 self._assert_ready()
2903 octets = bytearray([0])
2907 octets = bytearray()
2909 octets.append((value & 0xFF) ^ 0xFF)
2911 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2914 octets = bytearray()
2916 octets.append(value & 0xFF)
2918 if octets[-1] & 0x80 > 0:
2921 octets = bytes(octets)
2923 bytes_len = ceil(value.bit_length() / 8) or 1
2926 octets = value.to_bytes(
2931 except OverflowError:
2936 return b"".join((self.tag, len_encode(len(octets)), octets))
2939 octets = self._encode_payload()
2940 return b"".join((self.tag, len_encode(len(octets)), octets))
2942 def _encode1st(self, state):
2943 l = len(self._encode_payload())
2944 return len(self.tag) + len_size(l) + l, state
2946 def _encode2nd(self, writer, state_iter):
2947 write_full(writer, self._encode())
2949 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2951 t, _, lv = tag_strip(tlv)
2952 except DecodeError as err:
2953 raise err.__class__(
2955 klass=self.__class__,
2956 decode_path=decode_path,
2961 klass=self.__class__,
2962 decode_path=decode_path,
2969 l, llen, v = len_decode(lv)
2970 except DecodeError as err:
2971 raise err.__class__(
2973 klass=self.__class__,
2974 decode_path=decode_path,
2978 raise NotEnoughData(
2979 "encoded length is longer than data",
2980 klass=self.__class__,
2981 decode_path=decode_path,
2985 raise NotEnoughData(
2987 klass=self.__class__,
2988 decode_path=decode_path,
2991 v, tail = v[:l], v[l:]
2992 first_octet = byte2int(v)
2994 second_octet = byte2int(v[1:])
2996 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2997 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3000 "non normalized integer",
3001 klass=self.__class__,
3002 decode_path=decode_path,
3007 if first_octet & 0x80 > 0:
3008 octets = bytearray()
3009 for octet in bytearray(v):
3010 octets.append(octet ^ 0xFF)
3011 for octet in octets:
3012 value = (value << 8) | octet
3016 for octet in bytearray(v):
3017 value = (value << 8) | octet
3019 value = int.from_bytes(v, byteorder="big", signed=True)
3021 obj = self.__class__(
3023 bounds=(self._bound_min, self._bound_max),
3026 default=self.default,
3027 optional=self.optional,
3029 _decoded=(offset, llen, l),
3031 except BoundsError as err:
3034 klass=self.__class__,
3035 decode_path=decode_path,
3038 yield decode_path, obj, tail
3041 return pp_console_row(next(self.pps()))
3043 def pps(self, decode_path=()):
3046 asn1_type_name=self.asn1_type_name,
3047 obj_name=self.__class__.__name__,
3048 decode_path=decode_path,
3049 value=(self.named or str(self._value)) if self.ready else None,
3050 optional=self.optional,
3051 default=self == self.default,
3052 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3053 expl=None if self._expl is None else tag_decode(self._expl),
3058 expl_offset=self.expl_offset if self.expled else None,
3059 expl_tlen=self.expl_tlen if self.expled else None,
3060 expl_llen=self.expl_llen if self.expled else None,
3061 expl_vlen=self.expl_vlen if self.expled else None,
3062 expl_lenindef=self.expl_lenindef,
3065 for pp in self.pps_lenindef(decode_path):
3069 BitStringState = namedtuple(
3071 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3076 class BitString(Obj):
3077 """``BIT STRING`` bit string type
3079 >>> BitString(b"hello world")
3080 BIT STRING 88 bits 68656c6c6f20776f726c64
3083 >>> b == b"hello world"
3088 >>> BitString("'0A3B5F291CD'H")
3089 BIT STRING 44 bits 0a3b5f291cd0
3090 >>> b = BitString("'010110000000'B")
3091 BIT STRING 12 bits 5800
3094 >>> b[0], b[1], b[2], b[3]
3095 (False, True, False, True)
3099 [False, True, False, True, True, False, False, False, False, False, False, False]
3103 class KeyUsage(BitString):
3105 ("digitalSignature", 0),
3106 ("nonRepudiation", 1),
3107 ("keyEncipherment", 2),
3110 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3111 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3113 ['nonRepudiation', 'keyEncipherment']
3115 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3119 Pay attention that BIT STRING can be encoded both in primitive
3120 and constructed forms. Decoder always checks constructed form tag
3121 additionally to specified primitive one. If BER decoding is
3122 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3123 of DER restrictions.
3125 __slots__ = ("tag_constructed", "specs", "defined")
3126 tag_default = tag_encode(3)
3127 asn1_type_name = "BIT STRING"
3140 :param value: set the value. Either binary type, tuple of named
3141 values (if ``schema`` is specified in the class),
3142 string in ``'XXX...'B`` form, or
3143 :py:class:`pyderasn.BitString` object
3144 :param bytes impl: override default tag with ``IMPLICIT`` one
3145 :param bytes expl: override default tag with ``EXPLICIT`` one
3146 :param default: set default value. Type same as in ``value``
3147 :param bool optional: is object ``OPTIONAL`` in sequence
3149 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3150 specs = getattr(self, "schema", {}) if _specs is None else _specs
3151 self.specs = specs if specs.__class__ == dict else dict(specs)
3152 self._value = None if value is None else self._value_sanitize(value)
3153 if default is not None:
3154 default = self._value_sanitize(default)
3155 self.default = self.__class__(
3161 self._value = default
3163 tag_klass, _, tag_num = tag_decode(self.tag)
3164 self.tag_constructed = tag_encode(
3166 form=TagFormConstructed,
3170 def _bits2octets(self, bits):
3171 if len(self.specs) > 0:
3172 bits = bits.rstrip("0")
3174 bits += "0" * ((8 - (bit_len % 8)) % 8)
3175 octets = bytearray(len(bits) // 8)
3176 for i in six_xrange(len(octets)):
3177 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3178 return bit_len, bytes(octets)
3180 def _value_sanitize(self, value):
3181 if isinstance(value, (string_types, binary_type)):
3183 isinstance(value, string_types) and
3184 value.startswith("'")
3186 if value.endswith("'B"):
3188 if not frozenset(value) <= SET01:
3189 raise ValueError("B's coding contains unacceptable chars")
3190 return self._bits2octets(value)
3191 if value.endswith("'H"):
3195 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3197 if value.__class__ == binary_type:
3198 return (len(value) * 8, value)
3199 raise InvalidValueType((self.__class__, string_types, binary_type))
3200 if value.__class__ == tuple:
3203 isinstance(value[0], integer_types) and
3204 value[1].__class__ == binary_type
3209 bit = self.specs.get(name)
3211 raise ObjUnknown("BitString value: %s" % name)
3214 return self._bits2octets("")
3215 bits = frozenset(bits)
3216 return self._bits2octets("".join(
3217 ("1" if bit in bits else "0")
3218 for bit in six_xrange(max(bits) + 1)
3220 if issubclass(value.__class__, BitString):
3222 raise InvalidValueType((self.__class__, binary_type, string_types))
3226 return self._value is not None
3228 def __getstate__(self):
3229 return BitStringState(
3244 self.tag_constructed,
3248 def __setstate__(self, state):
3249 super(BitString, self).__setstate__(state)
3250 self.specs = state.specs
3251 self._value = state.value
3252 self.tag_constructed = state.tag_constructed
3253 self.defined = state.defined
3256 self._assert_ready()
3257 for i in six_xrange(self._value[0]):
3262 """Returns number of bits in the string
3264 self._assert_ready()
3265 return self._value[0]
3267 def __bytes__(self):
3268 self._assert_ready()
3269 return self._value[1]
3271 def __eq__(self, their):
3272 if their.__class__ == bytes:
3273 return self._value[1] == their
3274 if not issubclass(their.__class__, BitString):
3277 self._value == their._value and
3278 self.tag == their.tag and
3279 self._expl == their._expl
3284 """Named representation (if exists) of the bits
3286 :returns: [str(name), ...]
3288 return [name for name, bit in iteritems(self.specs) if self[bit]]
3298 return self.__class__(
3300 impl=self.tag if impl is None else impl,
3301 expl=self._expl if expl is None else expl,
3302 default=self.default if default is None else default,
3303 optional=self.optional if optional is None else optional,
3307 def __getitem__(self, key):
3308 if key.__class__ == int:
3309 bit_len, octets = self._value
3313 byte2int(memoryview(octets)[key // 8:]) >>
3316 if isinstance(key, string_types):
3317 value = self.specs.get(key)
3319 raise ObjUnknown("BitString value: %s" % key)
3321 raise InvalidValueType((int, str))
3324 self._assert_ready()
3325 bit_len, octets = self._value
3328 len_encode(len(octets) + 1),
3329 int2byte((8 - bit_len % 8) % 8),
3333 def _encode1st(self, state):
3334 self._assert_ready()
3335 _, octets = self._value
3337 return len(self.tag) + len_size(l) + l, state
3339 def _encode2nd(self, writer, state_iter):
3340 bit_len, octets = self._value
3341 write_full(writer, b"".join((
3343 len_encode(len(octets) + 1),
3344 int2byte((8 - bit_len % 8) % 8),
3346 write_full(writer, octets)
3348 def _encode_cer(self, writer):
3349 bit_len, octets = self._value
3350 if len(octets) + 1 <= 1000:
3351 write_full(writer, self._encode())
3353 write_full(writer, self.tag_constructed)
3354 write_full(writer, LENINDEF)
3355 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3356 write_full(writer, b"".join((
3357 BitString.tag_default,
3360 octets[offset:offset + 999],
3362 tail = octets[offset+999:]
3364 tail = int2byte((8 - bit_len % 8) % 8) + tail
3365 write_full(writer, b"".join((
3366 BitString.tag_default,
3367 len_encode(len(tail)),
3370 write_full(writer, EOC)
3372 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3374 t, tlen, lv = tag_strip(tlv)
3375 except DecodeError as err:
3376 raise err.__class__(
3378 klass=self.__class__,
3379 decode_path=decode_path,
3383 if tag_only: # pragma: no cover
3387 l, llen, v = len_decode(lv)
3388 except DecodeError as err:
3389 raise err.__class__(
3391 klass=self.__class__,
3392 decode_path=decode_path,
3396 raise NotEnoughData(
3397 "encoded length is longer than data",
3398 klass=self.__class__,
3399 decode_path=decode_path,
3403 raise NotEnoughData(
3405 klass=self.__class__,
3406 decode_path=decode_path,
3409 pad_size = byte2int(v)
3410 if l == 1 and pad_size != 0:
3412 "invalid empty value",
3413 klass=self.__class__,
3414 decode_path=decode_path,
3420 klass=self.__class__,
3421 decode_path=decode_path,
3424 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3427 klass=self.__class__,
3428 decode_path=decode_path,
3431 v, tail = v[:l], v[l:]
3432 bit_len = (len(v) - 1) * 8 - pad_size
3433 obj = self.__class__(
3434 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3437 default=self.default,
3438 optional=self.optional,
3440 _decoded=(offset, llen, l),
3443 obj._value = (bit_len, None)
3444 yield decode_path, obj, tail
3446 if t != self.tag_constructed:
3448 klass=self.__class__,
3449 decode_path=decode_path,
3452 if not ctx.get("bered", False):
3454 "unallowed BER constructed encoding",
3455 klass=self.__class__,
3456 decode_path=decode_path,
3459 if tag_only: # pragma: no cover
3464 l, llen, v = len_decode(lv)
3465 except LenIndefForm:
3466 llen, l, v = 1, 0, lv[1:]
3468 except DecodeError as err:
3469 raise err.__class__(
3471 klass=self.__class__,
3472 decode_path=decode_path,
3476 raise NotEnoughData(
3477 "encoded length is longer than data",
3478 klass=self.__class__,
3479 decode_path=decode_path,
3482 if not lenindef and l == 0:
3483 raise NotEnoughData(
3485 klass=self.__class__,
3486 decode_path=decode_path,
3490 sub_offset = offset + tlen + llen
3494 if v[:EOC_LEN].tobytes() == EOC:
3501 "chunk out of bounds",
3502 klass=self.__class__,
3503 decode_path=decode_path + (str(len(chunks) - 1),),
3504 offset=chunks[-1].offset,
3506 sub_decode_path = decode_path + (str(len(chunks)),)
3509 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3512 decode_path=sub_decode_path,
3515 _ctx_immutable=False,
3517 yield _decode_path, chunk, v_tail
3519 _, chunk, v_tail = next(BitString().decode_evgen(
3522 decode_path=sub_decode_path,
3525 _ctx_immutable=False,
3530 "expected BitString encoded chunk",
3531 klass=self.__class__,
3532 decode_path=sub_decode_path,
3535 chunks.append(chunk)
3536 sub_offset += chunk.tlvlen
3537 vlen += chunk.tlvlen
3539 if len(chunks) == 0:
3542 klass=self.__class__,
3543 decode_path=decode_path,
3548 for chunk_i, chunk in enumerate(chunks[:-1]):
3549 if chunk.bit_len % 8 != 0:
3551 "BitString chunk is not multiple of 8 bits",
3552 klass=self.__class__,
3553 decode_path=decode_path + (str(chunk_i),),
3554 offset=chunk.offset,
3557 values.append(bytes(chunk))
3558 bit_len += chunk.bit_len
3559 chunk_last = chunks[-1]
3561 values.append(bytes(chunk_last))
3562 bit_len += chunk_last.bit_len
3563 obj = self.__class__(
3564 value=None if evgen_mode else (bit_len, b"".join(values)),
3567 default=self.default,
3568 optional=self.optional,
3570 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3573 obj._value = (bit_len, None)
3574 obj.lenindef = lenindef
3575 obj.ber_encoded = True
3576 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3579 return pp_console_row(next(self.pps()))
3581 def pps(self, decode_path=()):
3585 bit_len, blob = self._value
3586 value = "%d bits" % bit_len
3587 if len(self.specs) > 0 and blob is not None:
3588 blob = tuple(self.named)
3591 asn1_type_name=self.asn1_type_name,
3592 obj_name=self.__class__.__name__,
3593 decode_path=decode_path,
3596 optional=self.optional,
3597 default=self == self.default,
3598 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3599 expl=None if self._expl is None else tag_decode(self._expl),
3604 expl_offset=self.expl_offset if self.expled else None,
3605 expl_tlen=self.expl_tlen if self.expled else None,
3606 expl_llen=self.expl_llen if self.expled else None,
3607 expl_vlen=self.expl_vlen if self.expled else None,
3608 expl_lenindef=self.expl_lenindef,
3609 lenindef=self.lenindef,
3610 ber_encoded=self.ber_encoded,
3613 defined_by, defined = self.defined or (None, None)
3614 if defined_by is not None:
3616 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3618 for pp in self.pps_lenindef(decode_path):
3622 OctetStringState = namedtuple(
3624 BasicState._fields + (
3635 class OctetString(Obj):
3636 """``OCTET STRING`` binary string type
3638 >>> s = OctetString(b"hello world")
3639 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3640 >>> s == OctetString(b"hello world")
3645 >>> OctetString(b"hello", bounds=(4, 4))
3646 Traceback (most recent call last):
3647 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3648 >>> OctetString(b"hell", bounds=(4, 4))
3649 OCTET STRING 4 bytes 68656c6c
3651 Memoryviews can be used as a values. If memoryview is made on
3652 mmap-ed file, then it does not take storage inside OctetString
3653 itself. In CER encoding mode it will be streamed to the specified
3654 writer, copying 1 KB chunks.
3656 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3657 tag_default = tag_encode(4)
3658 asn1_type_name = "OCTET STRING"
3659 evgen_mode_skip_value = True
3673 :param value: set the value. Either binary type, or
3674 :py:class:`pyderasn.OctetString` object
3675 :param bounds: set ``(MIN, MAX)`` value size constraint.
3676 (-inf, +inf) by default
3677 :param bytes impl: override default tag with ``IMPLICIT`` one
3678 :param bytes expl: override default tag with ``EXPLICIT`` one
3679 :param default: set default value. Type same as in ``value``
3680 :param bool optional: is object ``OPTIONAL`` in sequence
3682 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3684 self._bound_min, self._bound_max = getattr(
3688 ) if bounds is None else bounds
3689 if value is not None:
3690 self._value = self._value_sanitize(value)
3691 if default is not None:
3692 default = self._value_sanitize(default)
3693 self.default = self.__class__(
3698 if self._value is None:
3699 self._value = default
3701 tag_klass, _, tag_num = tag_decode(self.tag)
3702 self.tag_constructed = tag_encode(
3704 form=TagFormConstructed,
3708 def _value_sanitize(self, value):
3709 if value.__class__ == binary_type or value.__class__ == memoryview:
3711 elif issubclass(value.__class__, OctetString):
3712 value = value._value
3714 raise InvalidValueType((self.__class__, bytes, memoryview))
3715 if not self._bound_min <= len(value) <= self._bound_max:
3716 raise BoundsError(self._bound_min, len(value), self._bound_max)
3721 return self._value is not None
3723 def __getstate__(self):
3724 return OctetStringState(
3740 self.tag_constructed,
3744 def __setstate__(self, state):
3745 super(OctetString, self).__setstate__(state)
3746 self._value = state.value
3747 self._bound_min = state.bound_min
3748 self._bound_max = state.bound_max
3749 self.tag_constructed = state.tag_constructed
3750 self.defined = state.defined
3752 def __bytes__(self):
3753 self._assert_ready()
3754 return bytes(self._value)
3756 def __eq__(self, their):
3757 if their.__class__ == binary_type:
3758 return self._value == their
3759 if not issubclass(their.__class__, OctetString):
3762 self._value == their._value and
3763 self.tag == their.tag and
3764 self._expl == their._expl
3767 def __lt__(self, their):
3768 return self._value < their._value
3779 return self.__class__(
3782 (self._bound_min, self._bound_max)
3783 if bounds is None else bounds
3785 impl=self.tag if impl is None else impl,
3786 expl=self._expl if expl is None else expl,
3787 default=self.default if default is None else default,
3788 optional=self.optional if optional is None else optional,
3792 self._assert_ready()
3795 len_encode(len(self._value)),
3799 def _encode1st(self, state):
3800 self._assert_ready()
3801 l = len(self._value)
3802 return len(self.tag) + len_size(l) + l, state
3804 def _encode2nd(self, writer, state_iter):
3806 write_full(writer, self.tag + len_encode(len(value)))
3807 write_full(writer, value)
3809 def _encode_cer(self, writer):
3810 octets = self._value
3811 if len(octets) <= 1000:
3812 write_full(writer, self._encode())
3814 write_full(writer, self.tag_constructed)
3815 write_full(writer, LENINDEF)
3816 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3817 write_full(writer, b"".join((
3818 OctetString.tag_default,
3820 octets[offset:offset + 1000],
3822 tail = octets[offset+1000:]
3824 write_full(writer, b"".join((
3825 OctetString.tag_default,
3826 len_encode(len(tail)),
3829 write_full(writer, EOC)
3831 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3833 t, tlen, lv = tag_strip(tlv)
3834 except DecodeError as err:
3835 raise err.__class__(
3837 klass=self.__class__,
3838 decode_path=decode_path,
3846 l, llen, v = len_decode(lv)
3847 except DecodeError as err:
3848 raise err.__class__(
3850 klass=self.__class__,
3851 decode_path=decode_path,
3855 raise NotEnoughData(
3856 "encoded length is longer than data",
3857 klass=self.__class__,
3858 decode_path=decode_path,
3861 v, tail = v[:l], v[l:]
3862 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3864 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3865 klass=self.__class__,
3866 decode_path=decode_path,
3870 obj = self.__class__(
3872 None if (evgen_mode and self.evgen_mode_skip_value)
3875 bounds=(self._bound_min, self._bound_max),
3878 default=self.default,
3879 optional=self.optional,
3880 _decoded=(offset, llen, l),
3883 except DecodeError as err:
3886 klass=self.__class__,
3887 decode_path=decode_path,
3890 except BoundsError as err:
3893 klass=self.__class__,
3894 decode_path=decode_path,
3897 yield decode_path, obj, tail
3899 if t != self.tag_constructed:
3901 klass=self.__class__,
3902 decode_path=decode_path,
3905 if not ctx.get("bered", False):
3907 "unallowed BER constructed encoding",
3908 klass=self.__class__,
3909 decode_path=decode_path,
3917 l, llen, v = len_decode(lv)
3918 except LenIndefForm:
3919 llen, l, v = 1, 0, lv[1:]
3921 except DecodeError as err:
3922 raise err.__class__(
3924 klass=self.__class__,
3925 decode_path=decode_path,
3929 raise NotEnoughData(
3930 "encoded length is longer than data",
3931 klass=self.__class__,
3932 decode_path=decode_path,
3937 sub_offset = offset + tlen + llen
3942 if v[:EOC_LEN].tobytes() == EOC:
3949 "chunk out of bounds",
3950 klass=self.__class__,
3951 decode_path=decode_path + (str(len(chunks) - 1),),
3952 offset=chunks[-1].offset,
3956 sub_decode_path = decode_path + (str(chunks_count),)
3957 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3960 decode_path=sub_decode_path,
3963 _ctx_immutable=False,
3965 yield _decode_path, chunk, v_tail
3966 if not chunk.ber_encoded:
3967 payload_len += chunk.vlen
3970 sub_decode_path = decode_path + (str(len(chunks)),)
3971 _, chunk, v_tail = next(OctetString().decode_evgen(
3974 decode_path=sub_decode_path,
3977 _ctx_immutable=False,
3980 chunks.append(chunk)
3983 "expected OctetString encoded chunk",
3984 klass=self.__class__,
3985 decode_path=sub_decode_path,
3988 sub_offset += chunk.tlvlen
3989 vlen += chunk.tlvlen
3991 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3993 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3994 klass=self.__class__,
3995 decode_path=decode_path,
3999 obj = self.__class__(
4001 None if evgen_mode else
4002 b"".join(bytes(chunk) for chunk in chunks)
4004 bounds=(self._bound_min, self._bound_max),
4007 default=self.default,
4008 optional=self.optional,
4009 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4012 except DecodeError as err:
4015 klass=self.__class__,
4016 decode_path=decode_path,
4019 except BoundsError as err:
4022 klass=self.__class__,
4023 decode_path=decode_path,
4026 obj.lenindef = lenindef
4027 obj.ber_encoded = True
4028 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4031 return pp_console_row(next(self.pps()))
4033 def pps(self, decode_path=()):
4036 asn1_type_name=self.asn1_type_name,
4037 obj_name=self.__class__.__name__,
4038 decode_path=decode_path,
4039 value=("%d bytes" % len(self._value)) if self.ready else None,
4040 blob=self._value if self.ready else None,
4041 optional=self.optional,
4042 default=self == self.default,
4043 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4044 expl=None if self._expl is None else tag_decode(self._expl),
4049 expl_offset=self.expl_offset if self.expled else None,
4050 expl_tlen=self.expl_tlen if self.expled else None,
4051 expl_llen=self.expl_llen if self.expled else None,
4052 expl_vlen=self.expl_vlen if self.expled else None,
4053 expl_lenindef=self.expl_lenindef,
4054 lenindef=self.lenindef,
4055 ber_encoded=self.ber_encoded,
4058 defined_by, defined = self.defined or (None, None)
4059 if defined_by is not None:
4061 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4063 for pp in self.pps_lenindef(decode_path):
4067 def agg_octet_string(evgens, decode_path, raw, writer):
4068 """Aggregate constructed string (OctetString and its derivatives)
4070 :param evgens: iterator of generated events
4071 :param decode_path: points to the string we want to decode
4072 :param raw: slicebable (memoryview, bytearray, etc) with
4073 the data evgens are generated on
4074 :param writer: buffer.write where string is going to be saved
4075 :param writer: where string is going to be saved. Must comply
4076 with ``io.RawIOBase.write`` behaviour
4078 decode_path_len = len(decode_path)
4079 for dp, obj, _ in evgens:
4080 if dp[:decode_path_len] != decode_path:
4082 if not obj.ber_encoded:
4083 write_full(writer, raw[
4084 obj.offset + obj.tlen + obj.llen:
4085 obj.offset + obj.tlen + obj.llen + obj.vlen -
4086 (EOC_LEN if obj.expl_lenindef else 0)
4088 if len(dp) == decode_path_len:
4092 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4096 """``NULL`` null object
4104 tag_default = tag_encode(5)
4105 asn1_type_name = "NULL"
4109 value=None, # unused, but Sequence passes it
4116 :param bytes impl: override default tag with ``IMPLICIT`` one
4117 :param bytes expl: override default tag with ``EXPLICIT`` one
4118 :param bool optional: is object ``OPTIONAL`` in sequence
4120 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4127 def __getstate__(self):
4143 def __eq__(self, their):
4144 if not issubclass(their.__class__, Null):
4147 self.tag == their.tag and
4148 self._expl == their._expl
4158 return self.__class__(
4159 impl=self.tag if impl is None else impl,
4160 expl=self._expl if expl is None else expl,
4161 optional=self.optional if optional is None else optional,
4165 return self.tag + LEN0
4167 def _encode1st(self, state):
4168 return len(self.tag) + 1, state
4170 def _encode2nd(self, writer, state_iter):
4171 write_full(writer, self.tag + LEN0)
4173 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4175 t, _, lv = tag_strip(tlv)
4176 except DecodeError as err:
4177 raise err.__class__(
4179 klass=self.__class__,
4180 decode_path=decode_path,
4185 klass=self.__class__,
4186 decode_path=decode_path,
4189 if tag_only: # pragma: no cover
4193 l, _, v = len_decode(lv)
4194 except DecodeError as err:
4195 raise err.__class__(
4197 klass=self.__class__,
4198 decode_path=decode_path,
4202 raise InvalidLength(
4203 "Null must have zero length",
4204 klass=self.__class__,
4205 decode_path=decode_path,
4208 obj = self.__class__(
4211 optional=self.optional,
4212 _decoded=(offset, 1, 0),
4214 yield decode_path, obj, v
4217 return pp_console_row(next(self.pps()))
4219 def pps(self, decode_path=()):
4222 asn1_type_name=self.asn1_type_name,
4223 obj_name=self.__class__.__name__,
4224 decode_path=decode_path,
4225 optional=self.optional,
4226 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4227 expl=None if self._expl is None else tag_decode(self._expl),
4232 expl_offset=self.expl_offset if self.expled else None,
4233 expl_tlen=self.expl_tlen if self.expled else None,
4234 expl_llen=self.expl_llen if self.expled else None,
4235 expl_vlen=self.expl_vlen if self.expled else None,
4236 expl_lenindef=self.expl_lenindef,
4239 for pp in self.pps_lenindef(decode_path):
4243 ObjectIdentifierState = namedtuple(
4244 "ObjectIdentifierState",
4245 BasicState._fields + ("value", "defines"),
4250 class ObjectIdentifier(Obj):
4251 """``OBJECT IDENTIFIER`` OID type
4253 >>> oid = ObjectIdentifier((1, 2, 3))
4254 OBJECT IDENTIFIER 1.2.3
4255 >>> oid == ObjectIdentifier("1.2.3")
4261 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4262 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4264 >>> str(ObjectIdentifier((3, 1)))
4265 Traceback (most recent call last):
4266 pyderasn.InvalidOID: unacceptable first arc value
4268 __slots__ = ("defines",)
4269 tag_default = tag_encode(6)
4270 asn1_type_name = "OBJECT IDENTIFIER"
4283 :param value: set the value. Either tuples of integers,
4284 string of "."-concatenated integers, or
4285 :py:class:`pyderasn.ObjectIdentifier` object
4286 :param defines: sequence of tuples. Each tuple has two elements.
4287 First one is relative to current one decode
4288 path, aiming to the field defined by that OID.
4289 Read about relative path in
4290 :py:func:`pyderasn.abs_decode_path`. Second
4291 tuple element is ``{OID: pyderasn.Obj()}``
4292 dictionary, mapping between current OID value
4293 and structure applied to defined field.
4294 :ref:`Read about DEFINED BY <definedby>`
4295 :param bytes impl: override default tag with ``IMPLICIT`` one
4296 :param bytes expl: override default tag with ``EXPLICIT`` one
4297 :param default: set default value. Type same as in ``value``
4298 :param bool optional: is object ``OPTIONAL`` in sequence
4300 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4302 if value is not None:
4303 self._value = self._value_sanitize(value)
4304 if default is not None:
4305 default = self._value_sanitize(default)
4306 self.default = self.__class__(
4311 if self._value is None:
4312 self._value = default
4313 self.defines = defines
4315 def __add__(self, their):
4316 if their.__class__ == tuple:
4317 return self.__class__(self._value + array("L", their))
4318 if isinstance(their, self.__class__):
4319 return self.__class__(self._value + their._value)
4320 raise InvalidValueType((self.__class__, tuple))
4322 def _value_sanitize(self, value):
4323 if issubclass(value.__class__, ObjectIdentifier):
4325 if isinstance(value, string_types):
4327 value = array("L", (pureint(arc) for arc in value.split(".")))
4329 raise InvalidOID("unacceptable arcs values")
4330 if value.__class__ == tuple:
4332 value = array("L", value)
4333 except OverflowError as err:
4334 raise InvalidOID(repr(err))
4335 if value.__class__ is array:
4337 raise InvalidOID("less than 2 arcs")
4338 first_arc = value[0]
4339 if first_arc in (0, 1):
4340 if not (0 <= value[1] <= 39):
4341 raise InvalidOID("second arc is too wide")
4342 elif first_arc == 2:
4345 raise InvalidOID("unacceptable first arc value")
4346 if not all(arc >= 0 for arc in value):
4347 raise InvalidOID("negative arc value")
4349 raise InvalidValueType((self.__class__, str, tuple))
4353 return self._value is not None
4355 def __getstate__(self):
4356 return ObjectIdentifierState(
4373 def __setstate__(self, state):
4374 super(ObjectIdentifier, self).__setstate__(state)
4375 self._value = state.value
4376 self.defines = state.defines
4379 self._assert_ready()
4380 return iter(self._value)
4383 return ".".join(str(arc) for arc in self._value or ())
4386 self._assert_ready()
4387 return hash(b"".join((
4389 bytes(self._expl or b""),
4390 str(self._value).encode("ascii"),
4393 def __eq__(self, their):
4394 if their.__class__ == tuple:
4395 return self._value == array("L", their)
4396 if not issubclass(their.__class__, ObjectIdentifier):
4399 self.tag == their.tag and
4400 self._expl == their._expl and
4401 self._value == their._value
4404 def __lt__(self, their):
4405 return self._value < their._value
4416 return self.__class__(
4418 defines=self.defines if defines is None else defines,
4419 impl=self.tag if impl is None else impl,
4420 expl=self._expl if expl is None else expl,
4421 default=self.default if default is None else default,
4422 optional=self.optional if optional is None else optional,
4425 def _encode_octets(self):
4426 self._assert_ready()
4428 first_value = value[1]
4429 first_arc = value[0]
4432 elif first_arc == 1:
4434 elif first_arc == 2:
4436 else: # pragma: no cover
4437 raise RuntimeError("invalid arc is stored")
4438 octets = [zero_ended_encode(first_value)]
4439 for arc in value[2:]:
4440 octets.append(zero_ended_encode(arc))
4441 return b"".join(octets)
4444 v = self._encode_octets()
4445 return b"".join((self.tag, len_encode(len(v)), v))
4447 def _encode1st(self, state):
4448 l = len(self._encode_octets())
4449 return len(self.tag) + len_size(l) + l, state
4451 def _encode2nd(self, writer, state_iter):
4452 write_full(writer, self._encode())
4454 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4456 t, _, lv = tag_strip(tlv)
4457 except DecodeError as err:
4458 raise err.__class__(
4460 klass=self.__class__,
4461 decode_path=decode_path,
4466 klass=self.__class__,
4467 decode_path=decode_path,
4470 if tag_only: # pragma: no cover
4474 l, llen, v = len_decode(lv)
4475 except DecodeError as err:
4476 raise err.__class__(
4478 klass=self.__class__,
4479 decode_path=decode_path,
4483 raise NotEnoughData(
4484 "encoded length is longer than data",
4485 klass=self.__class__,
4486 decode_path=decode_path,
4490 raise NotEnoughData(
4492 klass=self.__class__,
4493 decode_path=decode_path,
4496 v, tail = v[:l], v[l:]
4503 octet = indexbytes(v, i)
4504 if i == 0 and octet == 0x80:
4505 if ctx.get("bered", False):
4509 "non normalized arc encoding",
4510 klass=self.__class__,
4511 decode_path=decode_path,
4514 arc = (arc << 7) | (octet & 0x7F)
4515 if octet & 0x80 == 0:
4518 except OverflowError:
4520 "too huge value for local unsigned long",
4521 klass=self.__class__,
4522 decode_path=decode_path,
4531 klass=self.__class__,
4532 decode_path=decode_path,
4536 second_arc = arcs[0]
4537 if 0 <= second_arc <= 39:
4539 elif 40 <= second_arc <= 79:
4545 obj = self.__class__(
4546 value=array("L", (first_arc, second_arc)) + arcs[1:],
4549 default=self.default,
4550 optional=self.optional,
4551 _decoded=(offset, llen, l),
4554 obj.ber_encoded = True
4555 yield decode_path, obj, tail
4558 return pp_console_row(next(self.pps()))
4560 def pps(self, decode_path=()):
4563 asn1_type_name=self.asn1_type_name,
4564 obj_name=self.__class__.__name__,
4565 decode_path=decode_path,
4566 value=str(self) if self.ready else None,
4567 optional=self.optional,
4568 default=self == self.default,
4569 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4570 expl=None if self._expl is None else tag_decode(self._expl),
4575 expl_offset=self.expl_offset if self.expled else None,
4576 expl_tlen=self.expl_tlen if self.expled else None,
4577 expl_llen=self.expl_llen if self.expled else None,
4578 expl_vlen=self.expl_vlen if self.expled else None,
4579 expl_lenindef=self.expl_lenindef,
4580 ber_encoded=self.ber_encoded,
4583 for pp in self.pps_lenindef(decode_path):
4587 class Enumerated(Integer):
4588 """``ENUMERATED`` integer type
4590 This type is identical to :py:class:`pyderasn.Integer`, but requires
4591 schema to be specified and does not accept values missing from it.
4594 tag_default = tag_encode(10)
4595 asn1_type_name = "ENUMERATED"
4606 bounds=None, # dummy argument, workability for Integer.decode
4608 super(Enumerated, self).__init__(
4609 value, bounds, impl, expl, default, optional, _specs, _decoded,
4611 if len(self.specs) == 0:
4612 raise ValueError("schema must be specified")
4614 def _value_sanitize(self, value):
4615 if isinstance(value, self.__class__):
4616 value = value._value
4617 elif isinstance(value, integer_types):
4618 for _value in itervalues(self.specs):
4623 "unknown integer value: %s" % value,
4624 klass=self.__class__,
4626 elif isinstance(value, string_types):
4627 value = self.specs.get(value)
4629 raise ObjUnknown("integer value: %s" % value)
4631 raise InvalidValueType((self.__class__, int, str))
4643 return self.__class__(
4645 impl=self.tag if impl is None else impl,
4646 expl=self._expl if expl is None else expl,
4647 default=self.default if default is None else default,
4648 optional=self.optional if optional is None else optional,
4653 def escape_control_unicode(c):
4654 if unicat(c)[0] == "C":
4655 c = repr(c).lstrip("u").strip("'")
4659 class CommonString(OctetString):
4660 """Common class for all strings
4662 Everything resembles :py:class:`pyderasn.OctetString`, except
4663 ability to deal with unicode text strings.
4665 >>> hexenc("привет мир".encode("utf-8"))
4666 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4667 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4669 >>> s = UTF8String("привет мир")
4670 UTF8String UTF8String привет мир
4672 'привет мир'
4673 >>> hexenc(bytes(s))
4674 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4676 >>> PrintableString("привет мир")
4677 Traceback (most recent call last):
4678 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4680 >>> BMPString("ада", bounds=(2, 2))
4681 Traceback (most recent call last):
4682 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4683 >>> s = BMPString("ад", bounds=(2, 2))
4686 >>> hexenc(bytes(s))
4694 * - :py:class:`pyderasn.UTF8String`
4696 * - :py:class:`pyderasn.NumericString`
4698 * - :py:class:`pyderasn.PrintableString`
4700 * - :py:class:`pyderasn.TeletexString`
4702 * - :py:class:`pyderasn.T61String`
4704 * - :py:class:`pyderasn.VideotexString`
4706 * - :py:class:`pyderasn.IA5String`
4708 * - :py:class:`pyderasn.GraphicString`
4710 * - :py:class:`pyderasn.VisibleString`
4712 * - :py:class:`pyderasn.ISO646String`
4714 * - :py:class:`pyderasn.GeneralString`
4716 * - :py:class:`pyderasn.UniversalString`
4718 * - :py:class:`pyderasn.BMPString`
4723 def _value_sanitize(self, value):
4725 value_decoded = None
4726 if isinstance(value, self.__class__):
4727 value_raw = value._value
4728 elif value.__class__ == text_type:
4729 value_decoded = value
4730 elif value.__class__ == binary_type:
4733 raise InvalidValueType((self.__class__, text_type, binary_type))
4736 value_decoded.encode(self.encoding)
4737 if value_raw is None else value_raw
4740 value_raw.decode(self.encoding)
4741 if value_decoded is None else value_decoded
4743 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4744 raise DecodeError(str(err))
4745 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4753 def __eq__(self, their):
4754 if their.__class__ == binary_type:
4755 return self._value == their
4756 if their.__class__ == text_type:
4757 return self._value == their.encode(self.encoding)
4758 if not isinstance(their, self.__class__):
4761 self._value == their._value and
4762 self.tag == their.tag and
4763 self._expl == their._expl
4766 def __unicode__(self):
4768 return self._value.decode(self.encoding)
4769 return text_type(self._value)
4772 return pp_console_row(next(self.pps(no_unicode=PY2)))
4774 def pps(self, decode_path=(), no_unicode=False):
4778 hexenc(bytes(self)) if no_unicode else
4779 "".join(escape_control_unicode(c) for c in self.__unicode__())
4783 asn1_type_name=self.asn1_type_name,
4784 obj_name=self.__class__.__name__,
4785 decode_path=decode_path,
4787 optional=self.optional,
4788 default=self == self.default,
4789 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4790 expl=None if self._expl is None else tag_decode(self._expl),
4795 expl_offset=self.expl_offset if self.expled else None,
4796 expl_tlen=self.expl_tlen if self.expled else None,
4797 expl_llen=self.expl_llen if self.expled else None,
4798 expl_vlen=self.expl_vlen if self.expled else None,
4799 expl_lenindef=self.expl_lenindef,
4800 ber_encoded=self.ber_encoded,
4803 for pp in self.pps_lenindef(decode_path):
4807 class UTF8String(CommonString):
4809 tag_default = tag_encode(12)
4811 asn1_type_name = "UTF8String"
4814 class AllowableCharsMixin(object):
4816 def allowable_chars(self):
4818 return self._allowable_chars
4819 return frozenset(six_unichr(c) for c in self._allowable_chars)
4822 class NumericString(AllowableCharsMixin, CommonString):
4825 Its value is properly sanitized: only ASCII digits with spaces can
4828 >>> NumericString().allowable_chars
4829 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4832 tag_default = tag_encode(18)
4834 asn1_type_name = "NumericString"
4835 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4837 def _value_sanitize(self, value):
4838 value = super(NumericString, self)._value_sanitize(value)
4839 if not frozenset(value) <= self._allowable_chars:
4840 raise DecodeError("non-numeric value")
4844 PrintableStringState = namedtuple(
4845 "PrintableStringState",
4846 OctetStringState._fields + ("allowable_chars",),
4851 class PrintableString(AllowableCharsMixin, CommonString):
4854 Its value is properly sanitized: see X.680 41.4 table 10.
4856 >>> PrintableString().allowable_chars
4857 frozenset([' ', "'", ..., 'z'])
4858 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4859 PrintableString PrintableString foo*bar
4860 >>> obj.allow_asterisk, obj.allow_ampersand
4864 tag_default = tag_encode(19)
4866 asn1_type_name = "PrintableString"
4867 _allowable_chars = frozenset(
4868 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4870 _asterisk = frozenset("*".encode("ascii"))
4871 _ampersand = frozenset("&".encode("ascii"))
4883 allow_asterisk=False,
4884 allow_ampersand=False,
4887 :param allow_asterisk: allow asterisk character
4888 :param allow_ampersand: allow ampersand character
4891 self._allowable_chars |= self._asterisk
4893 self._allowable_chars |= self._ampersand
4894 super(PrintableString, self).__init__(
4895 value, bounds, impl, expl, default, optional, _decoded, ctx,
4899 def allow_asterisk(self):
4900 """Is asterisk character allowed?
4902 return self._asterisk <= self._allowable_chars
4905 def allow_ampersand(self):
4906 """Is ampersand character allowed?
4908 return self._ampersand <= self._allowable_chars
4910 def _value_sanitize(self, value):
4911 value = super(PrintableString, self)._value_sanitize(value)
4912 if not frozenset(value) <= self._allowable_chars:
4913 raise DecodeError("non-printable value")
4916 def __getstate__(self):
4917 return PrintableStringState(
4918 *super(PrintableString, self).__getstate__(),
4919 **{"allowable_chars": self._allowable_chars}
4922 def __setstate__(self, state):
4923 super(PrintableString, self).__setstate__(state)
4924 self._allowable_chars = state.allowable_chars
4935 return self.__class__(
4938 (self._bound_min, self._bound_max)
4939 if bounds is None else bounds
4941 impl=self.tag if impl is None else impl,
4942 expl=self._expl if expl is None else expl,
4943 default=self.default if default is None else default,
4944 optional=self.optional if optional is None else optional,
4945 allow_asterisk=self.allow_asterisk,
4946 allow_ampersand=self.allow_ampersand,
4950 class TeletexString(CommonString):
4952 tag_default = tag_encode(20)
4954 asn1_type_name = "TeletexString"
4957 class T61String(TeletexString):
4959 asn1_type_name = "T61String"
4962 class VideotexString(CommonString):
4964 tag_default = tag_encode(21)
4965 encoding = "iso-8859-1"
4966 asn1_type_name = "VideotexString"
4969 class IA5String(CommonString):
4971 tag_default = tag_encode(22)
4973 asn1_type_name = "IA5"
4976 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4977 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4978 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4979 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4980 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4981 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4984 class VisibleString(CommonString):
4986 tag_default = tag_encode(26)
4988 asn1_type_name = "VisibleString"
4991 UTCTimeState = namedtuple(
4993 OctetStringState._fields + ("ber_raw",),
4998 def str_to_time_fractions(value):
5000 year, v = (v // 10**10), (v % 10**10)
5001 month, v = (v // 10**8), (v % 10**8)
5002 day, v = (v // 10**6), (v % 10**6)
5003 hour, v = (v // 10**4), (v % 10**4)
5004 minute, second = (v // 100), (v % 100)
5005 return year, month, day, hour, minute, second
5008 class UTCTime(VisibleString):
5009 """``UTCTime`` datetime type
5011 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5012 UTCTime UTCTime 2017-09-30T22:07:50
5018 datetime.datetime(2017, 9, 30, 22, 7, 50)
5019 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5020 datetime.datetime(1957, 9, 30, 22, 7, 50)
5022 If BER encoded value was met, then ``ber_raw`` attribute will hold
5023 its raw representation.
5027 Pay attention that UTCTime can not hold full year, so all years
5028 having < 50 years are treated as 20xx, 19xx otherwise, according
5029 to X.509 recommendation.
5033 No strict validation of UTC offsets are made, but very crude:
5035 * minutes are not exceeding 60
5036 * offset value is not exceeding 14 hours
5038 __slots__ = ("ber_raw",)
5039 tag_default = tag_encode(23)
5041 asn1_type_name = "UTCTime"
5042 evgen_mode_skip_value = False
5052 bounds=None, # dummy argument, workability for OctetString.decode
5056 :param value: set the value. Either datetime type, or
5057 :py:class:`pyderasn.UTCTime` object
5058 :param bytes impl: override default tag with ``IMPLICIT`` one
5059 :param bytes expl: override default tag with ``EXPLICIT`` one
5060 :param default: set default value. Type same as in ``value``
5061 :param bool optional: is object ``OPTIONAL`` in sequence
5063 super(UTCTime, self).__init__(
5064 None, None, impl, expl, None, optional, _decoded, ctx,
5068 if value is not None:
5069 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5070 self.ber_encoded = self.ber_raw is not None
5071 if default is not None:
5072 default, _ = self._value_sanitize(default)
5073 self.default = self.__class__(
5078 if self._value is None:
5079 self._value = default
5081 self.optional = optional
5083 def _strptime_bered(self, value):
5084 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5087 raise ValueError("no timezone")
5088 year += 2000 if year < 50 else 1900
5089 decoded = datetime(year, month, day, hour, minute)
5091 if value[-1] == "Z":
5095 raise ValueError("invalid UTC offset")
5096 if value[-5] == "-":
5098 elif value[-5] == "+":
5101 raise ValueError("invalid UTC offset")
5102 v = pureint(value[-4:])
5103 offset, v = (60 * (v % 100)), v // 100
5105 raise ValueError("invalid UTC offset minutes")
5107 if offset > 14 * 3600:
5108 raise ValueError("too big UTC offset")
5112 return offset, decoded
5114 raise ValueError("invalid UTC offset seconds")
5115 seconds = pureint(value)
5117 raise ValueError("invalid seconds value")
5118 return offset, decoded + timedelta(seconds=seconds)
5120 def _strptime(self, value):
5121 # datetime.strptime's format: %y%m%d%H%M%SZ
5122 if len(value) != LEN_YYMMDDHHMMSSZ:
5123 raise ValueError("invalid UTCTime length")
5124 if value[-1] != "Z":
5125 raise ValueError("non UTC timezone")
5126 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5127 year += 2000 if year < 50 else 1900
5128 return datetime(year, month, day, hour, minute, second)
5130 def _dt_sanitize(self, value):
5131 if value.year < 1950 or value.year > 2049:
5132 raise ValueError("UTCTime can hold only 1950-2049 years")
5133 return value.replace(microsecond=0)
5135 def _value_sanitize(self, value, ctx=None):
5136 if value.__class__ == binary_type:
5138 value_decoded = value.decode("ascii")
5139 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5140 raise DecodeError("invalid UTCTime encoding: %r" % err)
5143 return self._strptime(value_decoded), None
5144 except (TypeError, ValueError) as _err:
5146 if (ctx is not None) and ctx.get("bered", False):
5148 offset, _value = self._strptime_bered(value_decoded)
5149 _value = _value - timedelta(seconds=offset)
5150 return self._dt_sanitize(_value), value
5151 except (TypeError, ValueError, OverflowError) as _err:
5154 "invalid %s format: %r" % (self.asn1_type_name, err),
5155 klass=self.__class__,
5157 if isinstance(value, self.__class__):
5158 return value._value, None
5159 if value.__class__ == datetime:
5160 return self._dt_sanitize(value), None
5161 raise InvalidValueType((self.__class__, datetime))
5163 def _pp_value(self):
5165 value = self._value.isoformat()
5166 if self.ber_encoded:
5167 value += " (%s)" % self.ber_raw
5171 def __unicode__(self):
5173 value = self._value.isoformat()
5174 if self.ber_encoded:
5175 value += " (%s)" % self.ber_raw
5177 return text_type(self._pp_value())
5179 def __getstate__(self):
5180 return UTCTimeState(
5181 *super(UTCTime, self).__getstate__(),
5182 **{"ber_raw": self.ber_raw}
5185 def __setstate__(self, state):
5186 super(UTCTime, self).__setstate__(state)
5187 self.ber_raw = state.ber_raw
5189 def __bytes__(self):
5190 self._assert_ready()
5191 return self._encode_time()
5193 def __eq__(self, their):
5194 if their.__class__ == binary_type:
5195 return self._encode_time() == their
5196 if their.__class__ == datetime:
5197 return self.todatetime() == their
5198 if not isinstance(their, self.__class__):
5201 self._value == their._value and
5202 self.tag == their.tag and
5203 self._expl == their._expl
5206 def _encode_time(self):
5207 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5210 self._assert_ready()
5211 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5213 def _encode1st(self, state):
5214 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5216 def _encode2nd(self, writer, state_iter):
5217 self._assert_ready()
5218 write_full(writer, self._encode())
5220 def _encode_cer(self, writer):
5221 write_full(writer, self._encode())
5223 def todatetime(self):
5227 return pp_console_row(next(self.pps()))
5229 def pps(self, decode_path=()):
5232 asn1_type_name=self.asn1_type_name,
5233 obj_name=self.__class__.__name__,
5234 decode_path=decode_path,
5235 value=self._pp_value(),
5236 optional=self.optional,
5237 default=self == self.default,
5238 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5239 expl=None if self._expl is None else tag_decode(self._expl),
5244 expl_offset=self.expl_offset if self.expled else None,
5245 expl_tlen=self.expl_tlen if self.expled else None,
5246 expl_llen=self.expl_llen if self.expled else None,
5247 expl_vlen=self.expl_vlen if self.expled else None,
5248 expl_lenindef=self.expl_lenindef,
5249 ber_encoded=self.ber_encoded,
5252 for pp in self.pps_lenindef(decode_path):
5256 class GeneralizedTime(UTCTime):
5257 """``GeneralizedTime`` datetime type
5259 This type is similar to :py:class:`pyderasn.UTCTime`.
5261 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5262 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5264 '20170930220750.000123Z'
5265 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5266 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5270 Only microsecond fractions are supported in DER encoding.
5271 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5272 higher precision values.
5276 BER encoded data can loss information (accuracy) during decoding
5277 because of float transformations.
5281 Local times (without explicit timezone specification) are treated
5282 as UTC one, no transformations are made.
5286 Zero year is unsupported.
5289 tag_default = tag_encode(24)
5290 asn1_type_name = "GeneralizedTime"
5292 def _dt_sanitize(self, value):
5295 def _strptime_bered(self, value):
5296 if len(value) < 4 + 3 * 2:
5297 raise ValueError("invalid GeneralizedTime")
5298 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5299 decoded = datetime(year, month, day, hour)
5300 offset, value = 0, value[10:]
5302 return offset, decoded
5303 if value[-1] == "Z":
5306 for char, sign in (("-", -1), ("+", 1)):
5307 idx = value.rfind(char)
5310 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5311 v = pureint(offset_raw)
5312 if len(offset_raw) == 4:
5313 offset, v = (60 * (v % 100)), v // 100
5315 raise ValueError("invalid UTC offset minutes")
5316 elif len(offset_raw) == 2:
5319 raise ValueError("invalid UTC offset")
5321 if offset > 14 * 3600:
5322 raise ValueError("too big UTC offset")
5326 return offset, decoded
5327 if value[0] in DECIMAL_SIGNS:
5329 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5332 raise ValueError("stripped minutes")
5333 decoded += timedelta(seconds=60 * pureint(value[:2]))
5336 return offset, decoded
5337 if value[0] in DECIMAL_SIGNS:
5339 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5342 raise ValueError("stripped seconds")
5343 decoded += timedelta(seconds=pureint(value[:2]))
5346 return offset, decoded
5347 if value[0] not in DECIMAL_SIGNS:
5348 raise ValueError("invalid format after seconds")
5350 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5353 def _strptime(self, value):
5355 if l == LEN_YYYYMMDDHHMMSSZ:
5356 # datetime.strptime's format: %Y%m%d%H%M%SZ
5357 if value[-1] != "Z":
5358 raise ValueError("non UTC timezone")
5359 return datetime(*str_to_time_fractions(value[:-1]))
5360 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5361 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5362 if value[-1] != "Z":
5363 raise ValueError("non UTC timezone")
5364 if value[14] != ".":
5365 raise ValueError("no fractions separator")
5368 raise ValueError("trailing zero")
5371 raise ValueError("only microsecond fractions are supported")
5372 us = pureint(us + ("0" * (6 - us_len)))
5373 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5374 return datetime(year, month, day, hour, minute, second, us)
5375 raise ValueError("invalid GeneralizedTime length")
5377 def _encode_time(self):
5379 encoded = value.strftime("%Y%m%d%H%M%S")
5380 if value.microsecond > 0:
5381 encoded += (".%06d" % value.microsecond).rstrip("0")
5382 return (encoded + "Z").encode("ascii")
5385 self._assert_ready()
5387 if value.microsecond > 0:
5388 encoded = self._encode_time()
5389 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5390 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5392 def _encode1st(self, state):
5393 self._assert_ready()
5394 vlen = len(self._encode_time())
5395 return len(self.tag) + len_size(vlen) + vlen, state
5397 def _encode2nd(self, writer, state_iter):
5398 write_full(writer, self._encode())
5401 class GraphicString(CommonString):
5403 tag_default = tag_encode(25)
5404 encoding = "iso-8859-1"
5405 asn1_type_name = "GraphicString"
5408 class ISO646String(VisibleString):
5410 asn1_type_name = "ISO646String"
5413 class GeneralString(CommonString):
5415 tag_default = tag_encode(27)
5416 encoding = "iso-8859-1"
5417 asn1_type_name = "GeneralString"
5420 class UniversalString(CommonString):
5422 tag_default = tag_encode(28)
5423 encoding = "utf-32-be"
5424 asn1_type_name = "UniversalString"
5427 class BMPString(CommonString):
5429 tag_default = tag_encode(30)
5430 encoding = "utf-16-be"
5431 asn1_type_name = "BMPString"
5434 ChoiceState = namedtuple(
5436 BasicState._fields + ("specs", "value",),
5442 """``CHOICE`` special type
5446 class GeneralName(Choice):
5448 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5449 ("dNSName", IA5String(impl=tag_ctxp(2))),
5452 >>> gn = GeneralName()
5454 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5455 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5456 >>> gn["dNSName"] = IA5String("bar.baz")
5457 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5458 >>> gn["rfc822Name"]
5461 [2] IA5String IA5 bar.baz
5464 >>> gn.value == gn["dNSName"]
5467 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5469 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5470 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5472 __slots__ = ("specs",)
5474 asn1_type_name = "CHOICE"
5487 :param value: set the value. Either ``(choice, value)`` tuple, or
5488 :py:class:`pyderasn.Choice` object
5489 :param bytes impl: can not be set, do **not** use it
5490 :param bytes expl: override default tag with ``EXPLICIT`` one
5491 :param default: set default value. Type same as in ``value``
5492 :param bool optional: is object ``OPTIONAL`` in sequence
5494 if impl is not None:
5495 raise ValueError("no implicit tag allowed for CHOICE")
5496 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5498 schema = getattr(self, "schema", ())
5499 if len(schema) == 0:
5500 raise ValueError("schema must be specified")
5502 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5505 if value is not None:
5506 self._value = self._value_sanitize(value)
5507 if default is not None:
5508 default_value = self._value_sanitize(default)
5509 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5510 default_obj.specs = self.specs
5511 default_obj._value = default_value
5512 self.default = default_obj
5514 self._value = copy(default_obj._value)
5515 if self._expl is not None:
5516 tag_class, _, tag_num = tag_decode(self._expl)
5517 self._tag_order = (tag_class, tag_num)
5519 def _value_sanitize(self, value):
5520 if (value.__class__ == tuple) and len(value) == 2:
5522 spec = self.specs.get(choice)
5524 raise ObjUnknown(choice)
5525 if not isinstance(obj, spec.__class__):
5526 raise InvalidValueType((spec,))
5527 return (choice, spec(obj))
5528 if isinstance(value, self.__class__):
5530 raise InvalidValueType((self.__class__, tuple))
5534 return self._value is not None and self._value[1].ready
5538 return self.expl_lenindef or (
5539 (self._value is not None) and
5540 self._value[1].bered
5543 def __getstate__(self):
5561 def __setstate__(self, state):
5562 super(Choice, self).__setstate__(state)
5563 self.specs = state.specs
5564 self._value = state.value
5566 def __eq__(self, their):
5567 if (their.__class__ == tuple) and len(their) == 2:
5568 return self._value == their
5569 if not isinstance(their, self.__class__):
5572 self.specs == their.specs and
5573 self._value == their._value
5583 return self.__class__(
5586 expl=self._expl if expl is None else expl,
5587 default=self.default if default is None else default,
5588 optional=self.optional if optional is None else optional,
5593 """Name of the choice
5595 self._assert_ready()
5596 return self._value[0]
5600 """Value of underlying choice
5602 self._assert_ready()
5603 return self._value[1]
5606 def tag_order(self):
5607 self._assert_ready()
5608 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5611 def tag_order_cer(self):
5612 return min(v.tag_order_cer for v in itervalues(self.specs))
5614 def __getitem__(self, key):
5615 if key not in self.specs:
5616 raise ObjUnknown(key)
5617 if self._value is None:
5619 choice, value = self._value
5624 def __setitem__(self, key, value):
5625 spec = self.specs.get(key)
5627 raise ObjUnknown(key)
5628 if not isinstance(value, spec.__class__):
5629 raise InvalidValueType((spec.__class__,))
5630 self._value = (key, spec(value))
5638 return self._value[1].decoded if self.ready else False
5641 self._assert_ready()
5642 return self._value[1].encode()
5644 def _encode1st(self, state):
5645 self._assert_ready()
5646 return self._value[1].encode1st(state)
5648 def _encode2nd(self, writer, state_iter):
5649 self._value[1].encode2nd(writer, state_iter)
5651 def _encode_cer(self, writer):
5652 self._assert_ready()
5653 self._value[1].encode_cer(writer)
5655 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5656 for choice, spec in iteritems(self.specs):
5657 sub_decode_path = decode_path + (choice,)
5663 decode_path=sub_decode_path,
5666 _ctx_immutable=False,
5673 klass=self.__class__,
5674 decode_path=decode_path,
5677 if tag_only: # pragma: no cover
5681 for _decode_path, value, tail in spec.decode_evgen(
5685 decode_path=sub_decode_path,
5687 _ctx_immutable=False,
5689 yield _decode_path, value, tail
5691 _, value, tail = next(spec.decode_evgen(
5695 decode_path=sub_decode_path,
5697 _ctx_immutable=False,
5700 obj = self.__class__(
5703 default=self.default,
5704 optional=self.optional,
5705 _decoded=(offset, 0, value.fulllen),
5707 obj._value = (choice, value)
5708 yield decode_path, obj, tail
5711 value = pp_console_row(next(self.pps()))
5713 value = "%s[%r]" % (value, self.value)
5716 def pps(self, decode_path=()):
5719 asn1_type_name=self.asn1_type_name,
5720 obj_name=self.__class__.__name__,
5721 decode_path=decode_path,
5722 value=self.choice if self.ready else None,
5723 optional=self.optional,
5724 default=self == self.default,
5725 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5726 expl=None if self._expl is None else tag_decode(self._expl),
5731 expl_lenindef=self.expl_lenindef,
5735 yield self.value.pps(decode_path=decode_path + (self.choice,))
5736 for pp in self.pps_lenindef(decode_path):
5740 class PrimitiveTypes(Choice):
5741 """Predefined ``CHOICE`` for all generic primitive types
5743 It could be useful for general decoding of some unspecified values:
5745 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5746 OCTET STRING 3 bytes 666f6f
5747 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5751 schema = tuple((klass.__name__, klass()) for klass in (
5775 AnyState = namedtuple(
5777 BasicState._fields + ("value", "defined"),
5783 """``ANY`` special type
5785 >>> Any(Integer(-123))
5786 ANY INTEGER -123 (0X:7B)
5787 >>> a = Any(OctetString(b"hello world").encode())
5788 ANY 040b68656c6c6f20776f726c64
5789 >>> hexenc(bytes(a))
5790 b'0x040x0bhello world'
5792 __slots__ = ("defined",)
5793 tag_default = tag_encode(0)
5794 asn1_type_name = "ANY"
5804 :param value: set the value. Either any kind of pyderasn's
5805 **ready** object, or bytes. Pay attention that
5806 **no** validation is performed if raw binary value
5807 is valid TLV, except just tag decoding
5808 :param bytes expl: override default tag with ``EXPLICIT`` one
5809 :param bool optional: is object ``OPTIONAL`` in sequence
5811 super(Any, self).__init__(None, expl, None, optional, _decoded)
5815 value = self._value_sanitize(value)
5817 if self._expl is None:
5818 if value.__class__ == binary_type:
5819 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5821 tag_class, tag_num = value.tag_order
5823 tag_class, _, tag_num = tag_decode(self._expl)
5824 self._tag_order = (tag_class, tag_num)
5827 def _value_sanitize(self, value):
5828 if value.__class__ == binary_type:
5830 raise ValueError("Any value can not be empty")
5832 if isinstance(value, self.__class__):
5834 if not isinstance(value, Obj):
5835 raise InvalidValueType((self.__class__, Obj, binary_type))
5840 return self._value is not None
5843 def tag_order(self):
5844 self._assert_ready()
5845 return self._tag_order
5849 if self.expl_lenindef or self.lenindef:
5851 if self.defined is None:
5853 return self.defined[1].bered
5855 def __getstate__(self):
5873 def __setstate__(self, state):
5874 super(Any, self).__setstate__(state)
5875 self._value = state.value
5876 self.defined = state.defined
5878 def __eq__(self, their):
5879 if their.__class__ == binary_type:
5880 if self._value.__class__ == binary_type:
5881 return self._value == their
5882 return self._value.encode() == their
5883 if issubclass(their.__class__, Any):
5884 if self.ready and their.ready:
5885 return bytes(self) == bytes(their)
5886 return self.ready == their.ready
5895 return self.__class__(
5897 expl=self._expl if expl is None else expl,
5898 optional=self.optional if optional is None else optional,
5901 def __bytes__(self):
5902 self._assert_ready()
5904 if value.__class__ == binary_type:
5906 return self._value.encode()
5913 self._assert_ready()
5915 if value.__class__ == binary_type:
5917 return value.encode()
5919 def _encode1st(self, state):
5920 self._assert_ready()
5922 if value.__class__ == binary_type:
5923 return len(value), state
5924 return value.encode1st(state)
5926 def _encode2nd(self, writer, state_iter):
5928 if value.__class__ == binary_type:
5929 write_full(writer, value)
5931 value.encode2nd(writer, state_iter)
5933 def _encode_cer(self, writer):
5934 self._assert_ready()
5936 if value.__class__ == binary_type:
5937 write_full(writer, value)
5939 value.encode_cer(writer)
5941 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5943 t, tlen, lv = tag_strip(tlv)
5944 except DecodeError as err:
5945 raise err.__class__(
5947 klass=self.__class__,
5948 decode_path=decode_path,
5952 l, llen, v = len_decode(lv)
5953 except LenIndefForm as err:
5954 if not ctx.get("bered", False):
5955 raise err.__class__(
5957 klass=self.__class__,
5958 decode_path=decode_path,
5961 llen, vlen, v = 1, 0, lv[1:]
5962 sub_offset = offset + tlen + llen
5964 while v[:EOC_LEN].tobytes() != EOC:
5965 chunk, v = Any().decode(
5968 decode_path=decode_path + (str(chunk_i),),
5971 _ctx_immutable=False,
5973 vlen += chunk.tlvlen
5974 sub_offset += chunk.tlvlen
5976 tlvlen = tlen + llen + vlen + EOC_LEN
5977 obj = self.__class__(
5978 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5980 optional=self.optional,
5981 _decoded=(offset, 0, tlvlen),
5984 obj.tag = t.tobytes()
5985 yield decode_path, obj, v[EOC_LEN:]
5987 except DecodeError as err:
5988 raise err.__class__(
5990 klass=self.__class__,
5991 decode_path=decode_path,
5995 raise NotEnoughData(
5996 "encoded length is longer than data",
5997 klass=self.__class__,
5998 decode_path=decode_path,
6001 tlvlen = tlen + llen + l
6002 v, tail = tlv[:tlvlen], v[l:]
6003 obj = self.__class__(
6004 value=None if evgen_mode else v.tobytes(),
6006 optional=self.optional,
6007 _decoded=(offset, 0, tlvlen),
6009 obj.tag = t.tobytes()
6010 yield decode_path, obj, tail
6013 return pp_console_row(next(self.pps()))
6015 def pps(self, decode_path=()):
6019 elif value.__class__ == binary_type:
6025 asn1_type_name=self.asn1_type_name,
6026 obj_name=self.__class__.__name__,
6027 decode_path=decode_path,
6029 blob=self._value if self._value.__class__ == binary_type else None,
6030 optional=self.optional,
6031 default=self == self.default,
6032 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6033 expl=None if self._expl is None else tag_decode(self._expl),
6038 expl_offset=self.expl_offset if self.expled else None,
6039 expl_tlen=self.expl_tlen if self.expled else None,
6040 expl_llen=self.expl_llen if self.expled else None,
6041 expl_vlen=self.expl_vlen if self.expled else None,
6042 expl_lenindef=self.expl_lenindef,
6043 lenindef=self.lenindef,
6046 defined_by, defined = self.defined or (None, None)
6047 if defined_by is not None:
6049 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6051 for pp in self.pps_lenindef(decode_path):
6055 ########################################################################
6056 # ASN.1 constructed types
6057 ########################################################################
6059 def abs_decode_path(decode_path, rel_path):
6060 """Create an absolute decode path from current and relative ones
6062 :param decode_path: current decode path, starting point. Tuple of strings
6063 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6064 If first tuple's element is "/", then treat it as
6065 an absolute path, ignoring ``decode_path`` as
6066 starting point. Also this tuple can contain ".."
6067 elements, stripping the leading element from
6070 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6071 ("foo", "bar", "baz", "whatever")
6072 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6074 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6077 if rel_path[0] == "/":
6079 if rel_path[0] == "..":
6080 return abs_decode_path(decode_path[:-1], rel_path[1:])
6081 return decode_path + rel_path
6084 SequenceState = namedtuple(
6086 BasicState._fields + ("specs", "value",),
6091 class SequenceEncode1stMixing(object):
6092 def _encode1st(self, state):
6094 idx = len(state) - 1
6096 for v in self._values_for_encoding():
6097 l, _ = v.encode1st(state)
6100 return len(self.tag) + len_size(vlen) + vlen, state
6103 class Sequence(SequenceEncode1stMixing, Obj):
6104 """``SEQUENCE`` structure type
6106 You have to make specification of sequence::
6108 class Extension(Sequence):
6110 ("extnID", ObjectIdentifier()),
6111 ("critical", Boolean(default=False)),
6112 ("extnValue", OctetString()),
6115 Then, you can work with it as with dictionary.
6117 >>> ext = Extension()
6118 >>> Extension().specs
6120 ('extnID', OBJECT IDENTIFIER),
6121 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6122 ('extnValue', OCTET STRING),
6124 >>> ext["extnID"] = "1.2.3"
6125 Traceback (most recent call last):
6126 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6127 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6129 You can determine if sequence is ready to be encoded:
6134 Traceback (most recent call last):
6135 pyderasn.ObjNotReady: object is not ready: extnValue
6136 >>> ext["extnValue"] = OctetString(b"foobar")
6140 Value you want to assign, must have the same **type** as in
6141 corresponding specification, but it can have different tags,
6142 optional/default attributes -- they will be taken from specification
6145 class TBSCertificate(Sequence):
6147 ("version", Version(expl=tag_ctxc(0), default="v1")),
6150 >>> tbs = TBSCertificate()
6151 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6153 Assign ``None`` to remove value from sequence.
6155 You can set values in Sequence during its initialization:
6157 >>> AlgorithmIdentifier((
6158 ("algorithm", ObjectIdentifier("1.2.3")),
6159 ("parameters", Any(Null()))
6161 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6163 You can determine if value exists/set in the sequence and take its value:
6165 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6168 OBJECT IDENTIFIER 1.2.3
6170 But pay attention that if value has default, then it won't be (not
6171 in) in the sequence (because ``DEFAULT`` must not be encoded in
6172 DER), but you can read its value:
6174 >>> "critical" in ext, ext["critical"]
6175 (False, BOOLEAN False)
6176 >>> ext["critical"] = Boolean(True)
6177 >>> "critical" in ext, ext["critical"]
6178 (True, BOOLEAN True)
6180 All defaulted values are always optional.
6182 .. _allow_default_values_ctx:
6184 DER prohibits default value encoding and will raise an error if
6185 default value is unexpectedly met during decode.
6186 If :ref:`bered <bered_ctx>` context option is set, then no error
6187 will be raised, but ``bered`` attribute set. You can disable strict
6188 defaulted values existence validation by setting
6189 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6193 Check for default value existence is not performed in
6194 ``evgen_mode``, because previously decoded values are not stored
6195 in memory, to be able to compare them.
6197 Two sequences are equal if they have equal specification (schema),
6198 implicit/explicit tagging and the same values.
6200 __slots__ = ("specs",)
6201 tag_default = tag_encode(form=TagFormConstructed, num=16)
6202 asn1_type_name = "SEQUENCE"
6214 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6216 schema = getattr(self, "schema", ())
6218 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6221 if value is not None:
6222 if issubclass(value.__class__, Sequence):
6223 self._value = value._value
6224 elif hasattr(value, "__iter__"):
6225 for seq_key, seq_value in value:
6226 self[seq_key] = seq_value
6228 raise InvalidValueType((Sequence,))
6229 if default is not None:
6230 if not issubclass(default.__class__, Sequence):
6231 raise InvalidValueType((Sequence,))
6232 default_value = default._value
6233 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6234 default_obj.specs = self.specs
6235 default_obj._value = default_value
6236 self.default = default_obj
6238 self._value = copy(default_obj._value)
6242 for name, spec in iteritems(self.specs):
6243 value = self._value.get(name)
6254 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6256 return any(value.bered for value in itervalues(self._value))
6258 def __getstate__(self):
6259 return SequenceState(
6273 {k: copy(v) for k, v in iteritems(self._value)},
6276 def __setstate__(self, state):
6277 super(Sequence, self).__setstate__(state)
6278 self.specs = state.specs
6279 self._value = state.value
6281 def __eq__(self, their):
6282 if not isinstance(their, self.__class__):
6285 self.specs == their.specs and
6286 self.tag == their.tag and
6287 self._expl == their._expl and
6288 self._value == their._value
6299 return self.__class__(
6302 impl=self.tag if impl is None else impl,
6303 expl=self._expl if expl is None else expl,
6304 default=self.default if default is None else default,
6305 optional=self.optional if optional is None else optional,
6308 def __contains__(self, key):
6309 return key in self._value
6311 def __setitem__(self, key, value):
6312 spec = self.specs.get(key)
6314 raise ObjUnknown(key)
6316 self._value.pop(key, None)
6318 if not isinstance(value, spec.__class__):
6319 raise InvalidValueType((spec.__class__,))
6320 value = spec(value=value)
6321 if spec.default is not None and value == spec.default:
6322 self._value.pop(key, None)
6324 self._value[key] = value
6326 def __getitem__(self, key):
6327 value = self._value.get(key)
6328 if value is not None:
6330 spec = self.specs.get(key)
6332 raise ObjUnknown(key)
6333 if spec.default is not None:
6337 def _values_for_encoding(self):
6338 for name, spec in iteritems(self.specs):
6339 value = self._value.get(name)
6343 raise ObjNotReady(name)
6347 v = b"".join(v.encode() for v in self._values_for_encoding())
6348 return b"".join((self.tag, len_encode(len(v)), v))
6350 def _encode2nd(self, writer, state_iter):
6351 write_full(writer, self.tag + len_encode(next(state_iter)))
6352 for v in self._values_for_encoding():
6353 v.encode2nd(writer, state_iter)
6355 def _encode_cer(self, writer):
6356 write_full(writer, self.tag + LENINDEF)
6357 for v in self._values_for_encoding():
6358 v.encode_cer(writer)
6359 write_full(writer, EOC)
6361 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6363 t, tlen, lv = tag_strip(tlv)
6364 except DecodeError as err:
6365 raise err.__class__(
6367 klass=self.__class__,
6368 decode_path=decode_path,
6373 klass=self.__class__,
6374 decode_path=decode_path,
6377 if tag_only: # pragma: no cover
6381 ctx_bered = ctx.get("bered", False)
6383 l, llen, v = len_decode(lv)
6384 except LenIndefForm as err:
6386 raise err.__class__(
6388 klass=self.__class__,
6389 decode_path=decode_path,
6392 l, llen, v = 0, 1, lv[1:]
6394 except DecodeError as err:
6395 raise err.__class__(
6397 klass=self.__class__,
6398 decode_path=decode_path,
6402 raise NotEnoughData(
6403 "encoded length is longer than data",
6404 klass=self.__class__,
6405 decode_path=decode_path,
6409 v, tail = v[:l], v[l:]
6411 sub_offset = offset + tlen + llen
6414 ctx_allow_default_values = ctx.get("allow_default_values", False)
6415 for name, spec in iteritems(self.specs):
6416 if spec.optional and (
6417 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6421 sub_decode_path = decode_path + (name,)
6424 for _decode_path, value, v_tail in spec.decode_evgen(
6428 decode_path=sub_decode_path,
6430 _ctx_immutable=False,
6432 yield _decode_path, value, v_tail
6434 _, value, v_tail = next(spec.decode_evgen(
6438 decode_path=sub_decode_path,
6440 _ctx_immutable=False,
6443 except TagMismatch as err:
6444 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6448 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6449 if not evgen_mode and defined is not None:
6450 defined_by, defined_spec = defined
6451 if issubclass(value.__class__, SequenceOf):
6452 for i, _value in enumerate(value):
6453 sub_sub_decode_path = sub_decode_path + (
6455 DecodePathDefBy(defined_by),
6457 defined_value, defined_tail = defined_spec.decode(
6458 memoryview(bytes(_value)),
6460 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6461 if value.expled else (value.tlen + value.llen)
6464 decode_path=sub_sub_decode_path,
6466 _ctx_immutable=False,
6468 if len(defined_tail) > 0:
6471 klass=self.__class__,
6472 decode_path=sub_sub_decode_path,
6475 _value.defined = (defined_by, defined_value)
6477 defined_value, defined_tail = defined_spec.decode(
6478 memoryview(bytes(value)),
6480 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6481 if value.expled else (value.tlen + value.llen)
6484 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6486 _ctx_immutable=False,
6488 if len(defined_tail) > 0:
6491 klass=self.__class__,
6492 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6495 value.defined = (defined_by, defined_value)
6497 value_len = value.fulllen
6499 sub_offset += value_len
6502 if spec.default is not None and value == spec.default:
6503 # This will not work in evgen_mode
6504 if ctx_bered or ctx_allow_default_values:
6508 "DEFAULT value met",
6509 klass=self.__class__,
6510 decode_path=sub_decode_path,
6513 values[name] = value
6514 spec_defines = getattr(spec, "defines", ())
6515 if len(spec_defines) == 0:
6516 defines_by_path = ctx.get("defines_by_path", ())
6517 if len(defines_by_path) > 0:
6518 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6519 if spec_defines is not None and len(spec_defines) > 0:
6520 for rel_path, schema in spec_defines:
6521 defined = schema.get(value, None)
6522 if defined is not None:
6523 ctx.setdefault("_defines", []).append((
6524 abs_decode_path(sub_decode_path[:-1], rel_path),
6528 if v[:EOC_LEN].tobytes() != EOC:
6531 klass=self.__class__,
6532 decode_path=decode_path,
6540 klass=self.__class__,
6541 decode_path=decode_path,
6544 obj = self.__class__(
6548 default=self.default,
6549 optional=self.optional,
6550 _decoded=(offset, llen, vlen),
6553 obj.lenindef = lenindef
6554 obj.ber_encoded = ber_encoded
6555 yield decode_path, obj, tail
6558 value = pp_console_row(next(self.pps()))
6560 for name in self.specs:
6561 _value = self._value.get(name)
6564 cols.append("%s: %s" % (name, repr(_value)))
6565 return "%s[%s]" % (value, "; ".join(cols))
6567 def pps(self, decode_path=()):
6570 asn1_type_name=self.asn1_type_name,
6571 obj_name=self.__class__.__name__,
6572 decode_path=decode_path,
6573 optional=self.optional,
6574 default=self == self.default,
6575 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6576 expl=None if self._expl is None else tag_decode(self._expl),
6581 expl_offset=self.expl_offset if self.expled else None,
6582 expl_tlen=self.expl_tlen if self.expled else None,
6583 expl_llen=self.expl_llen if self.expled else None,
6584 expl_vlen=self.expl_vlen if self.expled else None,
6585 expl_lenindef=self.expl_lenindef,
6586 lenindef=self.lenindef,
6587 ber_encoded=self.ber_encoded,
6590 for name in self.specs:
6591 value = self._value.get(name)
6594 yield value.pps(decode_path=decode_path + (name,))
6595 for pp in self.pps_lenindef(decode_path):
6599 class Set(Sequence, SequenceEncode1stMixing):
6600 """``SET`` structure type
6602 Its usage is identical to :py:class:`pyderasn.Sequence`.
6604 .. _allow_unordered_set_ctx:
6606 DER prohibits unordered values encoding and will raise an error
6607 during decode. If :ref:`bered <bered_ctx>` context option is set,
6608 then no error will occur. Also you can disable strict values
6609 ordering check by setting ``"allow_unordered_set": True``
6610 :ref:`context <ctx>` option.
6613 tag_default = tag_encode(form=TagFormConstructed, num=17)
6614 asn1_type_name = "SET"
6616 def _values_for_encoding(self):
6618 super(Set, self)._values_for_encoding(),
6619 key=attrgetter("tag_order"),
6622 def _encode_cer(self, writer):
6623 write_full(writer, self.tag + LENINDEF)
6625 super(Set, self)._values_for_encoding(),
6626 key=attrgetter("tag_order_cer"),
6628 v.encode_cer(writer)
6629 write_full(writer, EOC)
6631 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6633 t, tlen, lv = tag_strip(tlv)
6634 except DecodeError as err:
6635 raise err.__class__(
6637 klass=self.__class__,
6638 decode_path=decode_path,
6643 klass=self.__class__,
6644 decode_path=decode_path,
6651 ctx_bered = ctx.get("bered", False)
6653 l, llen, v = len_decode(lv)
6654 except LenIndefForm as err:
6656 raise err.__class__(
6658 klass=self.__class__,
6659 decode_path=decode_path,
6662 l, llen, v = 0, 1, lv[1:]
6664 except DecodeError as err:
6665 raise err.__class__(
6667 klass=self.__class__,
6668 decode_path=decode_path,
6672 raise NotEnoughData(
6673 "encoded length is longer than data",
6674 klass=self.__class__,
6678 v, tail = v[:l], v[l:]
6680 sub_offset = offset + tlen + llen
6683 ctx_allow_default_values = ctx.get("allow_default_values", False)
6684 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6685 tag_order_prev = (0, 0)
6686 _specs_items = copy(self.specs)
6689 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6691 for name, spec in iteritems(_specs_items):
6692 sub_decode_path = decode_path + (name,)
6698 decode_path=sub_decode_path,
6701 _ctx_immutable=False,
6708 klass=self.__class__,
6709 decode_path=decode_path,
6713 for _decode_path, value, v_tail in spec.decode_evgen(
6717 decode_path=sub_decode_path,
6719 _ctx_immutable=False,
6721 yield _decode_path, value, v_tail
6723 _, value, v_tail = next(spec.decode_evgen(
6727 decode_path=sub_decode_path,
6729 _ctx_immutable=False,
6732 value_tag_order = value.tag_order
6733 value_len = value.fulllen
6734 if tag_order_prev >= value_tag_order:
6735 if ctx_bered or ctx_allow_unordered_set:
6739 "unordered " + self.asn1_type_name,
6740 klass=self.__class__,
6741 decode_path=sub_decode_path,
6744 if spec.default is None or value != spec.default:
6746 elif ctx_bered or ctx_allow_default_values:
6750 "DEFAULT value met",
6751 klass=self.__class__,
6752 decode_path=sub_decode_path,
6755 values[name] = value
6756 del _specs_items[name]
6757 tag_order_prev = value_tag_order
6758 sub_offset += value_len
6762 obj = self.__class__(
6766 default=self.default,
6767 optional=self.optional,
6768 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6771 if v[:EOC_LEN].tobytes() != EOC:
6774 klass=self.__class__,
6775 decode_path=decode_path,
6780 for name, spec in iteritems(self.specs):
6781 if name not in values and not spec.optional:
6783 "%s value is not ready" % name,
6784 klass=self.__class__,
6785 decode_path=decode_path,
6790 obj.ber_encoded = ber_encoded
6791 yield decode_path, obj, tail
6794 SequenceOfState = namedtuple(
6796 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6801 class SequenceOf(SequenceEncode1stMixing, Obj):
6802 """``SEQUENCE OF`` sequence type
6804 For that kind of type you must specify the object it will carry on
6805 (bounds are for example here, not required)::
6807 class Ints(SequenceOf):
6812 >>> ints.append(Integer(123))
6813 >>> ints.append(Integer(234))
6815 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6816 >>> [int(i) for i in ints]
6818 >>> ints.append(Integer(345))
6819 Traceback (most recent call last):
6820 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6823 >>> ints[1] = Integer(345)
6825 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6827 You can initialize sequence with preinitialized values:
6829 >>> ints = Ints([Integer(123), Integer(234)])
6831 Also you can use iterator as a value:
6833 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6835 And it won't be iterated until encoding process. Pay attention that
6836 bounds and required schema checks are done only during the encoding
6837 process in that case! After encode was called, then value is zeroed
6838 back to empty list and you have to set it again. That mode is useful
6839 mainly with CER encoding mode, where all objects from the iterable
6840 will be streamed to the buffer, without copying all of them to
6843 __slots__ = ("spec", "_bound_min", "_bound_max")
6844 tag_default = tag_encode(form=TagFormConstructed, num=16)
6845 asn1_type_name = "SEQUENCE OF"
6858 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6860 schema = getattr(self, "schema", None)
6862 raise ValueError("schema must be specified")
6864 self._bound_min, self._bound_max = getattr(
6868 ) if bounds is None else bounds
6870 if value is not None:
6871 self._value = self._value_sanitize(value)
6872 if default is not None:
6873 default_value = self._value_sanitize(default)
6874 default_obj = self.__class__(
6879 default_obj._value = default_value
6880 self.default = default_obj
6882 self._value = copy(default_obj._value)
6884 def _value_sanitize(self, value):
6886 if issubclass(value.__class__, SequenceOf):
6887 value = value._value
6888 elif hasattr(value, NEXT_ATTR_NAME):
6890 elif hasattr(value, "__iter__"):
6893 raise InvalidValueType((self.__class__, iter, "iterator"))
6895 if not self._bound_min <= len(value) <= self._bound_max:
6896 raise BoundsError(self._bound_min, len(value), self._bound_max)
6897 class_expected = self.spec.__class__
6899 if not isinstance(v, class_expected):
6900 raise InvalidValueType((class_expected,))
6905 if hasattr(self._value, NEXT_ATTR_NAME):
6907 if self._bound_min > 0 and len(self._value) == 0:
6909 return all(v.ready for v in self._value)
6913 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6915 return any(v.bered for v in self._value)
6917 def __getstate__(self):
6918 if hasattr(self._value, NEXT_ATTR_NAME):
6919 raise ValueError("can not pickle SequenceOf with iterator")
6920 return SequenceOfState(
6934 [copy(v) for v in self._value],
6939 def __setstate__(self, state):
6940 super(SequenceOf, self).__setstate__(state)
6941 self.spec = state.spec
6942 self._value = state.value
6943 self._bound_min = state.bound_min
6944 self._bound_max = state.bound_max
6946 def __eq__(self, their):
6947 if isinstance(their, self.__class__):
6949 self.spec == their.spec and
6950 self.tag == their.tag and
6951 self._expl == their._expl and
6952 self._value == their._value
6954 if hasattr(their, "__iter__"):
6955 return self._value == list(their)
6967 return self.__class__(
6971 (self._bound_min, self._bound_max)
6972 if bounds is None else bounds
6974 impl=self.tag if impl is None else impl,
6975 expl=self._expl if expl is None else expl,
6976 default=self.default if default is None else default,
6977 optional=self.optional if optional is None else optional,
6980 def __contains__(self, key):
6981 return key in self._value
6983 def append(self, value):
6984 if not isinstance(value, self.spec.__class__):
6985 raise InvalidValueType((self.spec.__class__,))
6986 if len(self._value) + 1 > self._bound_max:
6989 len(self._value) + 1,
6992 self._value.append(value)
6995 return iter(self._value)
6998 return len(self._value)
7000 def __setitem__(self, key, value):
7001 if not isinstance(value, self.spec.__class__):
7002 raise InvalidValueType((self.spec.__class__,))
7003 self._value[key] = self.spec(value=value)
7005 def __getitem__(self, key):
7006 return self._value[key]
7008 def _values_for_encoding(self):
7009 return iter(self._value)
7012 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7015 values_append = values.append
7016 class_expected = self.spec.__class__
7017 values_for_encoding = self._values_for_encoding()
7019 for v in values_for_encoding:
7020 if not isinstance(v, class_expected):
7021 raise InvalidValueType((class_expected,))
7022 values_append(v.encode())
7023 if not self._bound_min <= len(values) <= self._bound_max:
7024 raise BoundsError(self._bound_min, len(values), self._bound_max)
7025 value = b"".join(values)
7027 value = b"".join(v.encode() for v in self._values_for_encoding())
7028 return b"".join((self.tag, len_encode(len(value)), value))
7030 def _encode1st(self, state):
7031 state = super(SequenceOf, self)._encode1st(state)
7032 if hasattr(self._value, NEXT_ATTR_NAME):
7036 def _encode2nd(self, writer, state_iter):
7037 write_full(writer, self.tag + len_encode(next(state_iter)))
7038 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7041 class_expected = self.spec.__class__
7042 values_for_encoding = self._values_for_encoding()
7044 for v in values_for_encoding:
7045 if not isinstance(v, class_expected):
7046 raise InvalidValueType((class_expected,))
7047 v.encode2nd(writer, state_iter)
7049 if not self._bound_min <= values_count <= self._bound_max:
7050 raise BoundsError(self._bound_min, values_count, self._bound_max)
7052 for v in self._values_for_encoding():
7053 v.encode2nd(writer, state_iter)
7055 def _encode_cer(self, writer):
7056 write_full(writer, self.tag + LENINDEF)
7057 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7059 class_expected = self.spec.__class__
7061 values_for_encoding = self._values_for_encoding()
7063 for v in values_for_encoding:
7064 if not isinstance(v, class_expected):
7065 raise InvalidValueType((class_expected,))
7066 v.encode_cer(writer)
7068 if not self._bound_min <= values_count <= self._bound_max:
7069 raise BoundsError(self._bound_min, values_count, self._bound_max)
7071 for v in self._values_for_encoding():
7072 v.encode_cer(writer)
7073 write_full(writer, EOC)
7083 ordering_check=False,
7086 t, tlen, lv = tag_strip(tlv)
7087 except DecodeError as err:
7088 raise err.__class__(
7090 klass=self.__class__,
7091 decode_path=decode_path,
7096 klass=self.__class__,
7097 decode_path=decode_path,
7104 ctx_bered = ctx.get("bered", False)
7106 l, llen, v = len_decode(lv)
7107 except LenIndefForm as err:
7109 raise err.__class__(
7111 klass=self.__class__,
7112 decode_path=decode_path,
7115 l, llen, v = 0, 1, lv[1:]
7117 except DecodeError as err:
7118 raise err.__class__(
7120 klass=self.__class__,
7121 decode_path=decode_path,
7125 raise NotEnoughData(
7126 "encoded length is longer than data",
7127 klass=self.__class__,
7128 decode_path=decode_path,
7132 v, tail = v[:l], v[l:]
7134 sub_offset = offset + tlen + llen
7137 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7138 value_prev = memoryview(v[:0])
7142 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7144 sub_decode_path = decode_path + (str(_value_count),)
7146 for _decode_path, value, v_tail in spec.decode_evgen(
7150 decode_path=sub_decode_path,
7152 _ctx_immutable=False,
7154 yield _decode_path, value, v_tail
7156 _, value, v_tail = next(spec.decode_evgen(
7160 decode_path=sub_decode_path,
7162 _ctx_immutable=False,
7165 value_len = value.fulllen
7167 if value_prev.tobytes() > v[:value_len].tobytes():
7168 if ctx_bered or ctx_allow_unordered_set:
7172 "unordered " + self.asn1_type_name,
7173 klass=self.__class__,
7174 decode_path=sub_decode_path,
7177 value_prev = v[:value_len]
7180 _value.append(value)
7181 sub_offset += value_len
7184 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7186 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7187 klass=self.__class__,
7188 decode_path=decode_path,
7192 obj = self.__class__(
7193 value=None if evgen_mode else _value,
7195 bounds=(self._bound_min, self._bound_max),
7198 default=self.default,
7199 optional=self.optional,
7200 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7202 except BoundsError as err:
7205 klass=self.__class__,
7206 decode_path=decode_path,
7210 if v[:EOC_LEN].tobytes() != EOC:
7213 klass=self.__class__,
7214 decode_path=decode_path,
7219 obj.ber_encoded = ber_encoded
7220 yield decode_path, obj, tail
7224 pp_console_row(next(self.pps())),
7225 ", ".join(repr(v) for v in self._value),
7228 def pps(self, decode_path=()):
7231 asn1_type_name=self.asn1_type_name,
7232 obj_name=self.__class__.__name__,
7233 decode_path=decode_path,
7234 optional=self.optional,
7235 default=self == self.default,
7236 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7237 expl=None if self._expl is None else tag_decode(self._expl),
7242 expl_offset=self.expl_offset if self.expled else None,
7243 expl_tlen=self.expl_tlen if self.expled else None,
7244 expl_llen=self.expl_llen if self.expled else None,
7245 expl_vlen=self.expl_vlen if self.expled else None,
7246 expl_lenindef=self.expl_lenindef,
7247 lenindef=self.lenindef,
7248 ber_encoded=self.ber_encoded,
7251 for i, value in enumerate(self._value):
7252 yield value.pps(decode_path=decode_path + (str(i),))
7253 for pp in self.pps_lenindef(decode_path):
7257 class SetOf(SequenceOf):
7258 """``SET OF`` sequence type
7260 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7263 tag_default = tag_encode(form=TagFormConstructed, num=17)
7264 asn1_type_name = "SET OF"
7266 def _value_sanitize(self, value):
7267 value = super(SetOf, self)._value_sanitize(value)
7268 if hasattr(value, NEXT_ATTR_NAME):
7270 "SetOf does not support iterator values, as no sense in them"
7275 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7276 return b"".join((self.tag, len_encode(len(v)), v))
7278 def _encode2nd(self, writer, state_iter):
7279 write_full(writer, self.tag + len_encode(next(state_iter)))
7281 for v in self._values_for_encoding():
7283 v.encode2nd(buf.write, state_iter)
7284 values.append(buf.getvalue())
7287 write_full(writer, v)
7289 def _encode_cer(self, writer):
7290 write_full(writer, self.tag + LENINDEF)
7291 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7292 write_full(writer, v)
7293 write_full(writer, EOC)
7295 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7296 return super(SetOf, self)._decode(
7303 ordering_check=True,
7307 def obj_by_path(pypath): # pragma: no cover
7308 """Import object specified as string Python path
7310 Modules must be separated from classes/functions with ``:``.
7312 >>> obj_by_path("foo.bar:Baz")
7313 <class 'foo.bar.Baz'>
7314 >>> obj_by_path("foo.bar:Baz.boo")
7315 <classmethod 'foo.bar.Baz.boo'>
7317 mod, objs = pypath.rsplit(":", 1)
7318 from importlib import import_module
7319 obj = import_module(mod)
7320 for obj_name in objs.split("."):
7321 obj = getattr(obj, obj_name)
7325 def generic_decoder(): # pragma: no cover
7326 # All of this below is a big hack with self references
7327 choice = PrimitiveTypes()
7328 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7329 choice.specs["SetOf"] = SetOf(schema=choice)
7330 for i in six_xrange(31):
7331 choice.specs["SequenceOf%d" % i] = SequenceOf(
7335 choice.specs["Any"] = Any()
7337 # Class name equals to type name, to omit it from output
7338 class SEQUENCEOF(SequenceOf):
7346 with_decode_path=False,
7347 decode_path_only=(),
7350 def _pprint_pps(pps):
7352 if hasattr(pp, "_fields"):
7354 decode_path_only != () and
7355 pp.decode_path[:len(decode_path_only)] != decode_path_only
7358 if pp.asn1_type_name == Choice.asn1_type_name:
7360 pp_kwargs = pp._asdict()
7361 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7362 pp = _pp(**pp_kwargs)
7363 yield pp_console_row(
7368 with_colours=with_colours,
7369 with_decode_path=with_decode_path,
7370 decode_path_len_decrease=len(decode_path_only),
7372 for row in pp_console_blob(
7374 decode_path_len_decrease=len(decode_path_only),
7378 for row in _pprint_pps(pp):
7380 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7381 return SEQUENCEOF(), pprint_any
7384 def main(): # pragma: no cover
7386 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7387 parser.add_argument(
7391 help="Skip that number of bytes from the beginning",
7393 parser.add_argument(
7395 help="Python paths to dictionary with OIDs, comma separated",
7397 parser.add_argument(
7399 help="Python path to schema definition to use",
7401 parser.add_argument(
7402 "--defines-by-path",
7403 help="Python path to decoder's defines_by_path",
7405 parser.add_argument(
7407 action="store_true",
7408 help="Disallow BER encoding",
7410 parser.add_argument(
7411 "--print-decode-path",
7412 action="store_true",
7413 help="Print decode paths",
7415 parser.add_argument(
7416 "--decode-path-only",
7417 help="Print only specified decode path",
7419 parser.add_argument(
7421 action="store_true",
7422 help="Allow explicit tag out-of-bound",
7424 parser.add_argument(
7426 action="store_true",
7427 help="Turn on event generation mode",
7429 parser.add_argument(
7431 type=argparse.FileType("rb"),
7432 help="Path to BER/CER/DER file you want to decode",
7434 args = parser.parse_args()
7436 args.RAWFile.seek(args.skip)
7437 raw = memoryview(args.RAWFile.read())
7438 args.RAWFile.close()
7440 raw = file_mmaped(args.RAWFile)[args.skip:]
7442 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7443 if args.oids else ()
7445 from functools import partial
7447 schema = obj_by_path(args.schema)
7448 pprinter = partial(pprint, big_blobs=True)
7450 schema, pprinter = generic_decoder()
7452 "bered": not args.nobered,
7453 "allow_expl_oob": args.allow_expl_oob,
7455 if args.defines_by_path is not None:
7456 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7457 from os import environ
7461 with_colours=environ.get("NO_COLOR") is None,
7462 with_decode_path=args.print_decode_path,
7464 () if args.decode_path_only is None else
7465 tuple(args.decode_path_only.split(":"))
7469 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7470 print(pprinter(obj, decode_path=decode_path))
7472 obj, tail = schema().decode(raw, ctx=ctx)
7473 print(pprinter(obj))
7475 print("\nTrailing data: %s" % hexenc(tail))
7478 if __name__ == "__main__":