3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to a human readable string. That very function is used for
251 every object's ``repr``, but it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows whether the object contains any kind of BER encoded data (possibly
348 a Sequence holding a BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 The first one is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meets --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificates' serial numbers from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as one of the entities in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it takes no memory beyond what is needed for the storage of a
636 single object. You can easily use that mode to determine the required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 with the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 do not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms do not deal well with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 a valid one -- you can only be sure that it has valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it into 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of those
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
908 .. autofunction:: pyderasn.browse
912 .. autoclass:: pyderasn.Obj
920 .. autoclass:: pyderasn.Boolean
925 .. autoclass:: pyderasn.Integer
926 :members: __init__, named, tohex
930 .. autoclass:: pyderasn.BitString
931 :members: __init__, bit_len, named
935 .. autoclass:: pyderasn.OctetString
940 .. autoclass:: pyderasn.Null
945 .. autoclass:: pyderasn.ObjectIdentifier
950 .. autoclass:: pyderasn.Enumerated
954 .. autoclass:: pyderasn.CommonString
958 .. autoclass:: pyderasn.NumericString
962 .. autoclass:: pyderasn.PrintableString
963 :members: __init__, allow_asterisk, allow_ampersand
967 .. autoclass:: pyderasn.IA5String
971 .. autoclass:: pyderasn.VisibleString
975 .. autoclass:: pyderasn.UTCTime
976 :members: __init__, todatetime
980 .. autoclass:: pyderasn.GeneralizedTime
981 :members: __init__, todatetime
988 .. autoclass:: pyderasn.Choice
989 :members: __init__, choice, value
993 .. autoclass:: PrimitiveTypes
997 .. autoclass:: pyderasn.Any
1005 .. autoclass:: pyderasn.Sequence
1010 .. autoclass:: pyderasn.Set
1015 .. autoclass:: pyderasn.SequenceOf
1020 .. autoclass:: pyderasn.SetOf
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1058 You can decode DER/BER files from the command line::
1060 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1062 If there is no schema for your file, then you can try parsing it without
1063 one, but of course IMPLICIT tags will often make that impossible. Still,
1064 the result is good enough for the certificate above::
1066 $ python -m pyderasn path/to/file
1067 0 [1,3,1604] . >: SEQUENCE OF
1068 4 [1,3,1453] . . >: SEQUENCE OF
1069 8 [0,0, 5] . . . . >: [0] ANY
1070 . . . . . A0:03:02:01:02
1071 13 [1,1, 3] . . . . >: INTEGER 61595
1072 18 [1,1, 13] . . . . >: SEQUENCE OF
1073 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074 31 [1,1, 0] . . . . . . >: NULL
1075 33 [1,3, 274] . . . . >: SEQUENCE OF
1076 37 [1,1, 11] . . . . . . >: SET OF
1077 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1078 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1081 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1082 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1084 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086 . . . . . . . . . 61:2E:63:6F:6D:2F
1087 1461 [1,1, 13] . . >: SEQUENCE OF
1088 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089 1474 [1,1, 0] . . . . >: NULL
1090 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1091 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1098 If you have dictionaries with ObjectIdentifiers, like the example one
1099 from ``tests/test_crts.py``::
1102 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1105 "2.5.4.10": "id-at-organizationName",
1106 "2.5.4.11": "id-at-organizationalUnitName",
1109 then you can pass it to the pretty printer to see human-readable OIDs::
1111 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1113 37 [1,1, 11] . . . . . . >: SET OF
1114 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1115 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1117 50 [1,1, 18] . . . . . . >: SET OF
1118 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1119 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121 70 [1,1, 18] . . . . . . >: SET OF
1122 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1123 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1130 Each decoded element has a so-called decode path: the sequence of
1131 structure names it passes through during the decode process. Each
1132 element has its own unique path inside the whole ASN.1 tree. You can
1133 print it out with the ``--print-decode-path`` option::
1135 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136 0 [1,3,1604] Certificate SEQUENCE []
1137 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1144 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150 . . . . . . . 13:02:45:53
1151 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1154 Now you can print only the specified tree, for example the signature algorithm::
1156 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1158 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1199 from termcolor import colored
1200 except ImportError: # pragma: no cover
1201 def colored(what, *args, **kwargs):
1252 "TagClassApplication",
1255 "TagClassUniversal",
1256 "TagFormConstructed",
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1274 TagClassContext: "",
1275 TagClassApplication: "APPLICATION ",
1276 TagClassPrivate: "PRIVATE ",
1277 TagClassUniversal: "UNIV ",
1281 LENINDEF = b"\x80" # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1290 def file_mmaped(fd):
1291 """Make mmap-ed memoryview for reading from file
1293 :param fd: file object
1294 :returns: memoryview over read-only mmap-ing of the whole file
1296 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1300 if not set(value) <= DECIMALS:
1301 raise ValueError("non-pure integer")
1305 def fractions2float(fractions_raw):
1306 pureint(fractions_raw)
1307 return float("0." + fractions_raw)
1310 def get_def_by_path(defines_by_path, sub_decode_path):
1311 """Get define by decode path
1313 for path, define in defines_by_path:
1314 if len(path) != len(sub_decode_path):
1316 for p1, p2 in zip(path, sub_decode_path):
1317 if (p1 is not any) and (p1 != p2):
1323 ########################################################################
1325 ########################################################################
1327 class ASN1Error(ValueError):
1331 class DecodeError(ASN1Error):
1332 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1334 :param str msg: reason for the decode failure
1335 :param klass: optional exact DecodeError inherited class (like
1336 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337 :py:exc:`InvalidLength`)
1338 :param decode_path: tuple of strings. It contains human
1339 readable names of the fields through which
1340 decoding process has passed
1341 :param int offset: binary offset where failure happened
1343 super(DecodeError, self).__init__()
1346 self.decode_path = decode_path
1347 self.offset = offset
1352 "" if self.klass is None else self.klass.__name__,
1354 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355 if len(self.decode_path) > 0 else ""
1357 ("(at %d)" % self.offset) if self.offset > 0 else "",
1363 return "%s(%s)" % (self.__class__.__name__, self)
1366 class NotEnoughData(DecodeError):
1370 class ExceedingData(ASN1Error):
1371 def __init__(self, nbytes):
1372 super(ExceedingData, self).__init__()
1373 self.nbytes = nbytes
1376 return "%d trailing bytes" % self.nbytes
1379 return "%s(%s)" % (self.__class__.__name__, self)
1382 class LenIndefForm(DecodeError):
1386 class TagMismatch(DecodeError):
1390 class InvalidLength(DecodeError):
1394 class InvalidOID(DecodeError):
1398 class ObjUnknown(ASN1Error):
1399 def __init__(self, name):
1400 super(ObjUnknown, self).__init__()
1404 return "object is unknown: %s" % self.name
1407 return "%s(%s)" % (self.__class__.__name__, self)
1410 class ObjNotReady(ASN1Error):
1411 def __init__(self, name):
1412 super(ObjNotReady, self).__init__()
1416 return "object is not ready: %s" % self.name
1419 return "%s(%s)" % (self.__class__.__name__, self)
1422 class InvalidValueType(ASN1Error):
1423 def __init__(self, expected_types):
1424 super(InvalidValueType, self).__init__()
1425 self.expected_types = expected_types
1428 return "invalid value type, expected: %s" % ", ".join(
1429 [repr(t) for t in self.expected_types]
1433 return "%s(%s)" % (self.__class__.__name__, self)
1436 class BoundsError(ASN1Error):
1437 def __init__(self, bound_min, value, bound_max):
1438 super(BoundsError, self).__init__()
1439 self.bound_min = bound_min
1441 self.bound_max = bound_max
1444 return "unsatisfied bounds: %s <= %s <= %s" % (
1451 return "%s(%s)" % (self.__class__.__name__, self)
1454 ########################################################################
1456 ########################################################################
1458 _hexdecoder = getdecoder("hex")
1459 _hexencoder = getencoder("hex")
1463 """Hexadecimal string to binary data convert
1465 return _hexdecoder(data)[0]
1469 """Binary data to hexadecimal string convert
1471 return _hexencoder(data)[0].decode("ascii")
1474 def int_bytes_len(num, byte_len=8):
1477 return int(ceil(float(num.bit_length()) / byte_len))
1480 def zero_ended_encode(num):
1481 octets = bytearray(int_bytes_len(num, 7))
1483 octets[i] = num & 0x7F
1487 octets[i] = 0x80 | (num & 0x7F)
1490 return bytes(octets)
1493 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1494 """Encode tag to binary form
1496 :param int num: tag's number
1497 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1498 :py:data:`pyderasn.TagClassContext`,
1499 :py:data:`pyderasn.TagClassApplication`,
1500 :py:data:`pyderasn.TagClassPrivate`)
1501 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1502 :py:data:`pyderasn.TagFormConstructed`)
1506 return int2byte(klass | form | num)
1507 # [XX|X|11111][1.......][1.......] ... [0.......]
1508 return int2byte(klass | form | 31) + zero_ended_encode(num)
1511 def tag_decode(tag):
1512 """Decode tag from binary form
1516 No validation is performed, assuming that it has already passed.
1518 It returns tuple with three integers, as
1519 :py:func:`pyderasn.tag_encode` accepts.
1521 first_octet = byte2int(tag)
1522 klass = first_octet & 0xC0
1523 form = first_octet & 0x20
1524 if first_octet & 0x1F < 0x1F:
1525 return (klass, form, first_octet & 0x1F)
1527 for octet in iterbytes(tag[1:]):
1530 return (klass, form, num)
1534 """Create CONTEXT PRIMITIVE tag
1536 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1540 """Create CONTEXT CONSTRUCTED tag
1542 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1545 def tag_strip(data):
1546 """Take off tag from the data
1548 :returns: (encoded tag, tag length, remaining data)
1551 raise NotEnoughData("no data at all")
1552 if byte2int(data) & 0x1F < 31:
1553 return data[:1], 1, data[1:]
1558 raise DecodeError("unfinished tag")
1559 if indexbytes(data, i) & 0x80 == 0:
1562 return data[:i], i, data[i:]
1568 octets = bytearray(int_bytes_len(l) + 1)
1569 octets[0] = 0x80 | (len(octets) - 1)
1570 for i in six_xrange(len(octets) - 1, 0, -1):
1571 octets[i] = l & 0xFF
1573 return bytes(octets)
1576 def len_decode(data):
1579 :returns: (decoded length, length's length, remaining data)
1580 :raises LenIndefForm: if indefinite form encoding is met
1583 raise NotEnoughData("no data at all")
1584 first_octet = byte2int(data)
1585 if first_octet & 0x80 == 0:
1586 return first_octet, 1, data[1:]
1587 octets_num = first_octet & 0x7F
1588 if octets_num + 1 > len(data):
1589 raise NotEnoughData("encoded length is longer than data")
1591 raise LenIndefForm()
1592 if byte2int(data[1:]) == 0:
1593 raise DecodeError("leading zeros")
1595 for v in iterbytes(data[1:1 + octets_num]):
1598 raise DecodeError("long form instead of short one")
1599 return l, 1 + octets_num, data[1 + octets_num:]
1602 LEN0 = len_encode(0)
1603 LEN1 = len_encode(1)
1604 LEN1K = len_encode(1000)
1608 """How many bytes the length field will take
1612 if l < 256: # 1 << 8
1614 if l < 65536: # 1 << 16
1616 if l < 16777216: # 1 << 24
1618 if l < 4294967296: # 1 << 32
1620 if l < 1099511627776: # 1 << 40
1622 if l < 281474976710656: # 1 << 48
1624 if l < 72057594037927936: # 1 << 56
1626 raise OverflowError("too big length")
1629 def write_full(writer, data):
1630 """Fully write provided data
1632 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1634 BytesIO does not guarantee that the whole data will be written at
1635 once. This function writes everything provided, raising an error if
1636 ``writer`` returns None.
1638 data = memoryview(data)
1640 while written != len(data):
1641 n = writer(data[written:])
1643 raise ValueError("can not write to buf")
1647 # If it is a 64-bit system, then use a compact 64-bit array of unsigned
1648 # longs. Use an ordinary list with universal integers otherwise, that
1650 if sys_maxsize > 2 ** 32:
1651 def state_2pass_new():
1654 def state_2pass_new():
1658 ########################################################################
1660 ########################################################################
1662 class AutoAddSlots(type):
1663 def __new__(cls, name, bases, _dict):
1664 _dict["__slots__"] = _dict.get("__slots__", ())
1665 return type.__new__(cls, name, bases, _dict)
1668 BasicState = namedtuple("BasicState", (
1681 ), **NAMEDTUPLE_KWARGS)
1684 @add_metaclass(AutoAddSlots)
1686 """Common ASN.1 object class
1688 All ASN.1 types are inherited from it. It has a metaclass that
1689 automatically adds ``__slots__`` to all inherited classes.
1714 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1715 self._expl = getattr(self, "expl", None) if expl is None else expl
1716 if self.tag != self.tag_default and self._expl is not None:
1717 raise ValueError("implicit and explicit tags can not be set simultaneously")
1718 if self.tag is None:
1719 self._tag_order = None
1721 tag_class, _, tag_num = tag_decode(
1722 self.tag if self._expl is None else self._expl
1724 self._tag_order = (tag_class, tag_num)
1725 if default is not None:
1727 self.optional = optional
1728 self.offset, self.llen, self.vlen = _decoded
1730 self.expl_lenindef = False
1731 self.lenindef = False
1732 self.ber_encoded = False
1735 def ready(self): # pragma: no cover
1736 """Is object ready to be encoded?
1738 raise NotImplementedError()
1740 def _assert_ready(self):
1742 raise ObjNotReady(self.__class__.__name__)
1746 """Is either the object or any element inside it BER encoded?
1748 return self.expl_lenindef or self.lenindef or self.ber_encoded
1752 """Is object decoded?
1754 return (self.llen + self.vlen) > 0
1756 def __getstate__(self): # pragma: no cover
1757 """Used for making pickleable copies that are safe to mutate
1759 raise NotImplementedError()
1761 def __setstate__(self, state):
1762 if state.version != __version__:
1763 raise ValueError("data is pickled by different PyDERASN version")
1764 self.tag = state.tag
1765 self._tag_order = state.tag_order
1766 self._expl = state.expl
1767 self.default = state.default
1768 self.optional = state.optional
1769 self.offset = state.offset
1770 self.llen = state.llen
1771 self.vlen = state.vlen
1772 self.expl_lenindef = state.expl_lenindef
1773 self.lenindef = state.lenindef
1774 self.ber_encoded = state.ber_encoded
1777 def tag_order(self):
1778 """Tag's (class, number) used for DER/CER sorting
1780 return self._tag_order
1783 def tag_order_cer(self):
1784 return self.tag_order
1788 """.. seealso:: :ref:`decoding`
1790 return len(self.tag)
1794 """.. seealso:: :ref:`decoding`
1796 return self.tlen + self.llen + self.vlen
1798 def __str__(self): # pragma: no cover
1799 return self.__bytes__() if PY2 else self.__unicode__()
1801 def __ne__(self, their):
1802 return not(self == their)
1804 def __gt__(self, their): # pragma: no cover
1805 return not(self < their)
1807 def __le__(self, their): # pragma: no cover
1808 return (self == their) or (self < their)
1810 def __ge__(self, their): # pragma: no cover
1811 return (self == their) or (self > their)
1813 def _encode(self): # pragma: no cover
1814 raise NotImplementedError()
1816 def _encode_cer(self, writer):
1817 write_full(writer, self._encode())
1819 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1820 yield NotImplemented
1822 def _encode1st(self, state):
1823 raise NotImplementedError()
1825 def _encode2nd(self, writer, state_iter):
1826 raise NotImplementedError()
1829 """DER encode the structure
1831 :returns: DER representation
1833 raw = self._encode()
1834 if self._expl is None:
1836 return b"".join((self._expl, len_encode(len(raw)), raw))
1838 def encode1st(self, state=None):
1839 """Do the 1st pass of 2-pass encoding
1841 :rtype: (int, array("L"))
1842 :returns: full length of encoded data and precalculated various
1846 state = state_2pass_new()
1847 if self._expl is None:
1848 return self._encode1st(state)
1850 idx = len(state) - 1
1851 vlen, _ = self._encode1st(state)
1853 fulllen = len(self._expl) + len_size(vlen) + vlen
1854 return fulllen, state
1856 def encode2nd(self, writer, state_iter):
1857 """Do the 2nd pass of 2-pass encoding
1859 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1860 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1862 if self._expl is None:
1863 self._encode2nd(writer, state_iter)
1865 write_full(writer, self._expl + len_encode(next(state_iter)))
1866 self._encode2nd(writer, state_iter)
1868 def encode_cer(self, writer):
1869 """CER encode the structure to specified writer
1871 :param writer: must comply with ``io.RawIOBase.write``
1872 behaviour. It takes slice to be written and
1873 returns number of bytes processed. If it returns
1874 None, then exception will be raised
1876 if self._expl is not None:
1877 write_full(writer, self._expl + LENINDEF)
1878 if getattr(self, "der_forced", False):
1879 write_full(writer, self._encode())
1881 self._encode_cer(writer)
1882 if self._expl is not None:
1883 write_full(writer, EOC)
1885 def hexencode(self):
1886 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1888 return hexenc(self.encode())
1898 _ctx_immutable=True,
1902 :param data: either binary or memoryview
1903 :param int offset: initial data's offset
1904 :param bool leavemm: do we need to leave memoryview of remaining
1905 data as is, or convert it to bytes otherwise
1906 :param decode_path: current decode path (tuples of strings,
1907 possibly with DecodePathDefBy) which will be
1908 the root for all underlying objects
1909 :param ctx: optional :ref:`context <ctx>` governing decoding process
1910 :param bool tag_only: decode only the tag, without length and
1911 contents (used only in Choice and Set
1912 structures, trying to determine if tag satisfies
1914 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1916 :returns: (Obj, remaining data)
1918 .. seealso:: :ref:`decoding`
1920 result = next(self.decode_evgen(
1932 _, obj, tail = result
1943 _ctx_immutable=True,
1946 """Decode with evgen mode on
1948 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1949 it returns the generator producing ``(decode_path, obj, tail)``
1951 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1955 elif _ctx_immutable:
1957 tlv = memoryview(data)
1960 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1963 if self._expl is None:
1964 for result in self._decode(
1967 decode_path=decode_path,
1970 evgen_mode=_evgen_mode,
1975 _decode_path, obj, tail = result
1976 if _decode_path is not decode_path:
1980 t, tlen, lv = tag_strip(tlv)
1981 except DecodeError as err:
1982 raise err.__class__(
1984 klass=self.__class__,
1985 decode_path=decode_path,
1990 klass=self.__class__,
1991 decode_path=decode_path,
1995 l, llen, v = len_decode(lv)
1996 except LenIndefForm as err:
1997 if not ctx.get("bered", False):
1998 raise err.__class__(
2000 klass=self.__class__,
2001 decode_path=decode_path,
2005 offset += tlen + llen
2006 for result in self._decode(
2009 decode_path=decode_path,
2012 evgen_mode=_evgen_mode,
2014 if tag_only: # pragma: no cover
2017 _decode_path, obj, tail = result
2018 if _decode_path is not decode_path:
2020 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2021 if eoc_expected.tobytes() != EOC:
2024 klass=self.__class__,
2025 decode_path=decode_path,
2029 obj.expl_lenindef = True
2030 except DecodeError as err:
2031 raise err.__class__(
2033 klass=self.__class__,
2034 decode_path=decode_path,
2039 raise NotEnoughData(
2040 "encoded length is longer than data",
2041 klass=self.__class__,
2042 decode_path=decode_path,
2045 for result in self._decode(
2047 offset=offset + tlen + llen,
2048 decode_path=decode_path,
2051 evgen_mode=_evgen_mode,
2053 if tag_only: # pragma: no cover
2056 _decode_path, obj, tail = result
2057 if _decode_path is not decode_path:
2059 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2061 "explicit tag out-of-bound, longer than data",
2062 klass=self.__class__,
2063 decode_path=decode_path,
2066 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2068 def decod(self, data, offset=0, decode_path=(), ctx=None):
2069 """Decode the data, check that tail is empty
2071 :raises ExceedingData: if tail is not empty
2073 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2074 (decode without tail) that also checks that there is no
2077 obj, tail = self.decode(
2080 decode_path=decode_path,
2085 raise ExceedingData(len(tail))
2088 def hexdecode(self, data, *args, **kwargs):
2089 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2091 return self.decode(hexdec(data), *args, **kwargs)
2093 def hexdecod(self, data, *args, **kwargs):
2094 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2096 return self.decod(hexdec(data), *args, **kwargs)
2100 """.. seealso:: :ref:`decoding`
2102 return self._expl is not None
2106 """.. seealso:: :ref:`decoding`
2111 def expl_tlen(self):
2112 """.. seealso:: :ref:`decoding`
2114 return len(self._expl)
2117 def expl_llen(self):
2118 """.. seealso:: :ref:`decoding`
2120 if self.expl_lenindef:
2122 return len(len_encode(self.tlvlen))
2125 def expl_offset(self):
2126 """.. seealso:: :ref:`decoding`
2128 return self.offset - self.expl_tlen - self.expl_llen
2131 def expl_vlen(self):
2132 """.. seealso:: :ref:`decoding`
2137 def expl_tlvlen(self):
2138 """.. seealso:: :ref:`decoding`
2140 return self.expl_tlen + self.expl_llen + self.expl_vlen
2143 def fulloffset(self):
2144 """.. seealso:: :ref:`decoding`
2146 return self.expl_offset if self.expled else self.offset
2150 """.. seealso:: :ref:`decoding`
2152 return self.expl_tlvlen if self.expled else self.tlvlen
2154 def pps_lenindef(self, decode_path):
2155 if self.lenindef and not (
2156 getattr(self, "defined", None) is not None and
2157 self.defined[1].lenindef
2160 asn1_type_name="EOC",
2162 decode_path=decode_path,
2164 self.offset + self.tlvlen -
2165 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2173 if self.expl_lenindef:
2175 asn1_type_name="EOC",
2176 obj_name="EXPLICIT",
2177 decode_path=decode_path,
2178 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2187 def encode_cer(obj):
2188 """Encode to CER in memory buffer
2190 :returns bytes: memory buffer contents
2193 obj.encode_cer(buf.write)
2194 return buf.getvalue()
2197 def encode2pass(obj):
2198 """Encode (2-pass mode) to DER in memory buffer
2200 :returns bytes: memory buffer contents
2203 _, state = obj.encode1st()
2204 obj.encode2nd(buf.write, iter(state))
2205 return buf.getvalue()
2208 class DecodePathDefBy(object):
2209 """DEFINED BY representation inside decode path
2211 __slots__ = ("defined_by",)
2213 def __init__(self, defined_by):
2214 self.defined_by = defined_by
2216 def __ne__(self, their):
2217 return not(self == their)
2219 def __eq__(self, their):
2220 if not isinstance(their, self.__class__):
2222 return self.defined_by == their.defined_by
2225 return "DEFINED BY " + str(self.defined_by)
2228 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2231 ########################################################################
2233 ########################################################################
2235 PP = namedtuple("PP", (
2258 ), **NAMEDTUPLE_KWARGS)
2263 asn1_type_name="unknown",
2280 expl_lenindef=False,
2311 def _colourize(what, colour, with_colours, attrs=("bold",)):
2312 return colored(what, colour, attrs=attrs) if with_colours else what
2315 def colonize_hex(hexed):
2316 """Separate hexadecimal string with colons
2318 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2321 def find_oid_name(asn1_type_name, oid_maps, value):
2322 if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2323 for oid_map in oid_maps:
2324 oid_name = oid_map.get(value)
2325 if oid_name is not None:
2336 with_decode_path=False,
2337 decode_path_len_decrease=0,
2344 " " if pp.expl_offset is None else
2345 ("-%d" % (pp.offset - pp.expl_offset))
2347 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2349 col = _colourize(col, "red", with_colours, ())
2350 col += _colourize("B", "red", with_colours) if pp.bered else " "
2352 col = "[%d,%d,%4d]%s" % (
2353 pp.tlen, pp.llen, pp.vlen,
2354 LENINDEF_PP_CHAR if pp.lenindef else " "
2356 col = _colourize(col, "green", with_colours, ())
2358 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2359 if decode_path_len > 0:
2360 cols.append(" ." * decode_path_len)
2361 ent = pp.decode_path[-1]
2362 if isinstance(ent, DecodePathDefBy):
2363 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2364 value = str(ent.defined_by)
2365 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2366 if oid_name is None:
2367 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2369 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2371 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2372 if pp.expl is not None:
2373 klass, _, num = pp.expl
2374 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2375 cols.append(_colourize(col, "blue", with_colours))
2376 if pp.impl is not None:
2377 klass, _, num = pp.impl
2378 col = "[%s%d]" % (TagClassReprs[klass], num)
2379 cols.append(_colourize(col, "blue", with_colours))
2380 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2381 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2383 cols.append(_colourize("BER", "red", with_colours))
2384 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2385 if pp.value is not None:
2387 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2388 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2389 if oid_name is not None:
2390 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2391 if pp.asn1_type_name == Integer.asn1_type_name:
2392 cols.append(_colourize(
2393 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2396 if pp.blob.__class__ == binary_type:
2397 cols.append(hexenc(pp.blob))
2398 elif pp.blob.__class__ == tuple:
2399 cols.append(", ".join(pp.blob))
2401 cols.append(_colourize("OPTIONAL", "red", with_colours))
2403 cols.append(_colourize("DEFAULT", "red", with_colours))
2404 if with_decode_path:
2405 cols.append(_colourize(
2406 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2410 return " ".join(cols)
2413 def pp_console_blob(pp, decode_path_len_decrease=0):
2414 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2415 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2416 if decode_path_len > 0:
2417 cols.append(" ." * (decode_path_len + 1))
2418 if pp.blob.__class__ == binary_type:
2419 blob = hexenc(pp.blob).upper()
2420 for i in six_xrange(0, len(blob), 32):
2421 chunk = blob[i:i + 32]
2422 yield " ".join(cols + [colonize_hex(chunk)])
2423 elif pp.blob.__class__ == tuple:
2424 yield " ".join(cols + [", ".join(pp.blob)])
2432 with_decode_path=False,
2433 decode_path_only=(),
2436 """Pretty print object
2438 :param Obj obj: object you want to pretty print
2439 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2440 Their human readable form is printed when an OID is met
2441 :param big_blobs: if large binary objects are met (like OctetString
2442 values), do we need to print them too, on separate
2444 :param with_colours: colourize output, if ``termcolor`` library
2446 :param with_decode_path: print decode path
2447 :param decode_path_only: print only that specified decode path
2449 def _pprint_pps(pps):
2451 if hasattr(pp, "_fields"):
2453 decode_path_only != () and
2455 str(p) for p in pp.decode_path[:len(decode_path_only)]
2456 ) != decode_path_only
2460 yield pp_console_row(
2465 with_colours=with_colours,
2466 with_decode_path=with_decode_path,
2467 decode_path_len_decrease=len(decode_path_only),
2469 for row in pp_console_blob(
2471 decode_path_len_decrease=len(decode_path_only),
2475 yield pp_console_row(
2480 with_colours=with_colours,
2481 with_decode_path=with_decode_path,
2482 decode_path_len_decrease=len(decode_path_only),
2485 for row in _pprint_pps(pp):
2487 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2490 ########################################################################
2491 # ASN.1 primitive types
2492 ########################################################################
2494 BooleanState = namedtuple(
2496 BasicState._fields + ("value",),
2502 """``BOOLEAN`` boolean type
2504 >>> b = Boolean(True)
2506 >>> b == Boolean(True)
2512 tag_default = tag_encode(1)
2513 asn1_type_name = "BOOLEAN"
2525 :param value: set the value. Either boolean type, or
2526 :py:class:`pyderasn.Boolean` object
2527 :param bytes impl: override default tag with ``IMPLICIT`` one
2528 :param bytes expl: override default tag with ``EXPLICIT`` one
2529 :param default: set default value. Type same as in ``value``
2530 :param bool optional: is object ``OPTIONAL`` in sequence
2532 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2533 self._value = None if value is None else self._value_sanitize(value)
2534 if default is not None:
2535 default = self._value_sanitize(default)
2536 self.default = self.__class__(
2542 self._value = default
2544 def _value_sanitize(self, value):
2545 if value.__class__ == bool:
2547 if issubclass(value.__class__, Boolean):
2549 raise InvalidValueType((self.__class__, bool))
2553 return self._value is not None
2555 def __getstate__(self):
2556 return BooleanState(
def __setstate__(self, state):
    """Restore the object from a state tuple

    The state is handed to the parent class first, then the
    ``value`` field is taken from it.

    :param state: state object exposing ``value`` attribute
    """
    super(Boolean, self).__setstate__(state)
    restored = state.value
    self._value = restored
2576 def __nonzero__(self):
2577 self._assert_ready()
2581 self._assert_ready()
2584 def __eq__(self, their):
2585 if their.__class__ == bool:
2586 return self._value == their
2587 if not issubclass(their.__class__, Boolean):
2590 self._value == their._value and
2591 self.tag == their.tag and
2592 self._expl == their._expl
2603 return self.__class__(
2605 impl=self.tag if impl is None else impl,
2606 expl=self._expl if expl is None else expl,
2607 default=self.default if default is None else default,
2608 optional=self.optional if optional is None else optional,
2612 self._assert_ready()
2613 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2615 def _encode1st(self, state):
2616 return len(self.tag) + 2, state
def _encode2nd(self, writer, state_iter):
    """Write the complete encoding to ``writer``

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used here
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
2622 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2624 t, _, lv = tag_strip(tlv)
2625 except DecodeError as err:
2626 raise err.__class__(
2628 klass=self.__class__,
2629 decode_path=decode_path,
2634 klass=self.__class__,
2635 decode_path=decode_path,
2642 l, _, v = len_decode(lv)
2643 except DecodeError as err:
2644 raise err.__class__(
2646 klass=self.__class__,
2647 decode_path=decode_path,
2651 raise InvalidLength(
2652 "Boolean's length must be equal to 1",
2653 klass=self.__class__,
2654 decode_path=decode_path,
2658 raise NotEnoughData(
2659 "encoded length is longer than data",
2660 klass=self.__class__,
2661 decode_path=decode_path,
2664 first_octet = byte2int(v)
2666 if first_octet == 0:
2668 elif first_octet == 0xFF:
2670 elif ctx.get("bered", False):
2675 "unacceptable Boolean value",
2676 klass=self.__class__,
2677 decode_path=decode_path,
2680 obj = self.__class__(
2684 default=self.default,
2685 optional=self.optional,
2686 _decoded=(offset, 1, 1),
2688 obj.ber_encoded = ber_encoded
2689 yield decode_path, obj, v[1:]
2692 return pp_console_row(next(self.pps()))
2694 def pps(self, decode_path=()):
2697 asn1_type_name=self.asn1_type_name,
2698 obj_name=self.__class__.__name__,
2699 decode_path=decode_path,
2700 value=str(self._value) if self.ready else None,
2701 optional=self.optional,
2702 default=self == self.default,
2703 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2704 expl=None if self._expl is None else tag_decode(self._expl),
2709 expl_offset=self.expl_offset if self.expled else None,
2710 expl_tlen=self.expl_tlen if self.expled else None,
2711 expl_llen=self.expl_llen if self.expled else None,
2712 expl_vlen=self.expl_vlen if self.expled else None,
2713 expl_lenindef=self.expl_lenindef,
2714 ber_encoded=self.ber_encoded,
2717 for pp in self.pps_lenindef(decode_path):
2721 IntegerState = namedtuple(
2723 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2729 """``INTEGER`` integer type
2731 >>> b = Integer(-123)
2733 >>> b == Integer(-123)
2738 >>> Integer(2, bounds=(1, 3))
2740 >>> Integer(5, bounds=(1, 3))
2741 Traceback (most recent call last):
2742 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2746 class Version(Integer):
2753 >>> v = Version("v1")
2760 {'v3': 2, 'v1': 0, 'v2': 1}
2762 __slots__ = ("specs", "_bound_min", "_bound_max")
2763 tag_default = tag_encode(2)
2764 asn1_type_name = "INTEGER"
2778 :param value: set the value. Either integer type, named value
2779 (if ``schema`` is specified in the class), or
2780 :py:class:`pyderasn.Integer` object
2781 :param bounds: set ``(MIN, MAX)`` value constraint.
2782 (-inf, +inf) by default
2783 :param bytes impl: override default tag with ``IMPLICIT`` one
2784 :param bytes expl: override default tag with ``EXPLICIT`` one
2785 :param default: set default value. Type same as in ``value``
2786 :param bool optional: is object ``OPTIONAL`` in sequence
2788 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2790 specs = getattr(self, "schema", {}) if _specs is None else _specs
2791 self.specs = specs if specs.__class__ == dict else dict(specs)
2792 self._bound_min, self._bound_max = getattr(
2795 (float("-inf"), float("+inf")),
2796 ) if bounds is None else bounds
2797 if value is not None:
2798 self._value = self._value_sanitize(value)
2799 if default is not None:
2800 default = self._value_sanitize(default)
2801 self.default = self.__class__(
2807 if self._value is None:
2808 self._value = default
2810 def _value_sanitize(self, value):
2811 if isinstance(value, integer_types):
2813 elif issubclass(value.__class__, Integer):
2814 value = value._value
2815 elif value.__class__ == str:
2816 value = self.specs.get(value)
2818 raise ObjUnknown("integer value: %s" % value)
2820 raise InvalidValueType((self.__class__, int, str))
2821 if not self._bound_min <= value <= self._bound_max:
2822 raise BoundsError(self._bound_min, value, self._bound_max)
2827 return self._value is not None
2829 def __getstate__(self):
2830 return IntegerState(
def __setstate__(self, state):
    """Restore the object from a state tuple

    The state is handed to the parent class first, then ``specs``,
    ``value``, ``bound_min`` and ``bound_max`` fields are copied from
    it, in that order.

    :param state: state object exposing the fields listed above
    """
    super(Integer, self).__setstate__(state)
    for attr_name, field_name in (
            ("specs", "specs"),
            ("_value", "value"),
            ("_bound_min", "bound_min"),
            ("_bound_max", "bound_max"),
    ):
        setattr(self, attr_name, getattr(state, field_name))
2857 self._assert_ready()
2858 return int(self._value)
2861 """Hexadecimal representation
2863 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2865 hex_repr = hex(int(self))[2:].upper()
2866 if len(hex_repr) % 2 != 0:
2867 hex_repr = "0" + hex_repr
2871 self._assert_ready()
2872 return hash(b"".join((
2874 bytes(self._expl or b""),
2875 str(self._value).encode("ascii"),
2878 def __eq__(self, their):
2879 if isinstance(their, integer_types):
2880 return self._value == their
2881 if not issubclass(their.__class__, Integer):
2884 self._value == their._value and
2885 self.tag == their.tag and
2886 self._expl == their._expl
2889 def __lt__(self, their):
2890 return self._value < their._value
2894 """Return named representation (if exists) of the value
2896 for name, value in iteritems(self.specs):
2897 if value == self._value:
2910 return self.__class__(
2913 (self._bound_min, self._bound_max)
2914 if bounds is None else bounds
2916 impl=self.tag if impl is None else impl,
2917 expl=self._expl if expl is None else expl,
2918 default=self.default if default is None else default,
2919 optional=self.optional if optional is None else optional,
2923 def _encode_payload(self):
2924 self._assert_ready()
2928 octets = bytearray([0])
2932 octets = bytearray()
2934 octets.append((value & 0xFF) ^ 0xFF)
2936 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2939 octets = bytearray()
2941 octets.append(value & 0xFF)
2943 if octets[-1] & 0x80 > 0:
2946 octets = bytes(octets)
2948 bytes_len = ceil(value.bit_length() / 8) or 1
2951 octets = value.to_bytes(
2956 except OverflowError:
2963 octets = self._encode_payload()
2964 return b"".join((self.tag, len_encode(len(octets)), octets))
def _encode1st(self, state):
    """Report the encoded length: tag, length octets and payload

    :param state: opaque state, passed through untouched
    :returns: ``(length, state)`` tuple
    """
    payload_len = len(self._encode_payload())
    total_len = len(self.tag) + len_size(payload_len) + payload_len
    return total_len, state
def _encode2nd(self, writer, state_iter):
    """Write the complete encoding to ``writer``

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used here
    """
    encoded = self._encode()
    write_full(writer, encoded)
2973 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2975 t, _, lv = tag_strip(tlv)
2976 except DecodeError as err:
2977 raise err.__class__(
2979 klass=self.__class__,
2980 decode_path=decode_path,
2985 klass=self.__class__,
2986 decode_path=decode_path,
2993 l, llen, v = len_decode(lv)
2994 except DecodeError as err:
2995 raise err.__class__(
2997 klass=self.__class__,
2998 decode_path=decode_path,
3002 raise NotEnoughData(
3003 "encoded length is longer than data",
3004 klass=self.__class__,
3005 decode_path=decode_path,
3009 raise NotEnoughData(
3011 klass=self.__class__,
3012 decode_path=decode_path,
3015 v, tail = v[:l], v[l:]
3016 first_octet = byte2int(v)
3018 second_octet = byte2int(v[1:])
3020 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3021 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3024 "non normalized integer",
3025 klass=self.__class__,
3026 decode_path=decode_path,
3031 if first_octet & 0x80 > 0:
3032 octets = bytearray()
3033 for octet in bytearray(v):
3034 octets.append(octet ^ 0xFF)
3035 for octet in octets:
3036 value = (value << 8) | octet
3040 for octet in bytearray(v):
3041 value = (value << 8) | octet
3043 value = int.from_bytes(v, byteorder="big", signed=True)
3045 obj = self.__class__(
3047 bounds=(self._bound_min, self._bound_max),
3050 default=self.default,
3051 optional=self.optional,
3053 _decoded=(offset, llen, l),
3055 except BoundsError as err:
3058 klass=self.__class__,
3059 decode_path=decode_path,
3062 yield decode_path, obj, tail
3065 return pp_console_row(next(self.pps()))
3067 def pps(self, decode_path=()):
3070 asn1_type_name=self.asn1_type_name,
3071 obj_name=self.__class__.__name__,
3072 decode_path=decode_path,
3073 value=(self.named or str(self._value)) if self.ready else None,
3074 optional=self.optional,
3075 default=self == self.default,
3076 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3077 expl=None if self._expl is None else tag_decode(self._expl),
3082 expl_offset=self.expl_offset if self.expled else None,
3083 expl_tlen=self.expl_tlen if self.expled else None,
3084 expl_llen=self.expl_llen if self.expled else None,
3085 expl_vlen=self.expl_vlen if self.expled else None,
3086 expl_lenindef=self.expl_lenindef,
3089 for pp in self.pps_lenindef(decode_path):
3093 BitStringState = namedtuple(
3095 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3100 class BitString(Obj):
3101 """``BIT STRING`` bit string type
3103 >>> BitString(b"hello world")
3104 BIT STRING 88 bits 68656c6c6f20776f726c64
3107 >>> b == b"hello world"
3112 >>> BitString("'0A3B5F291CD'H")
3113 BIT STRING 44 bits 0a3b5f291cd0
3114 >>> b = BitString("'010110000000'B")
3115 BIT STRING 12 bits 5800
3118 >>> b[0], b[1], b[2], b[3]
3119 (False, True, False, True)
3123 [False, True, False, True, True, False, False, False, False, False, False, False]
3127 class KeyUsage(BitString):
3129 ("digitalSignature", 0),
3130 ("nonRepudiation", 1),
3131 ("keyEncipherment", 2),
3134 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3135 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3137 ['nonRepudiation', 'keyEncipherment']
3139 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3143 Note that BIT STRING can be encoded in both primitive and
3144 constructed forms. The decoder always checks the constructed form tag
3145 in addition to the specified primitive one. If BER decoding is
3146 :ref:`not enabled <bered_ctx>`, then the decoder will fail on the
3147 constructed form, because of DER restrictions.
3149 __slots__ = ("tag_constructed", "specs", "defined")
3150 tag_default = tag_encode(3)
3151 asn1_type_name = "BIT STRING"
3164 :param value: set the value. Either binary type, tuple of named
3165 values (if ``schema`` is specified in the class),
3166 string in ``'XXX...'B`` form, or
3167 :py:class:`pyderasn.BitString` object
3168 :param bytes impl: override default tag with ``IMPLICIT`` one
3169 :param bytes expl: override default tag with ``EXPLICIT`` one
3170 :param default: set default value. Type same as in ``value``
3171 :param bool optional: is object ``OPTIONAL`` in sequence
3173 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3174 specs = getattr(self, "schema", {}) if _specs is None else _specs
3175 self.specs = specs if specs.__class__ == dict else dict(specs)
3176 self._value = None if value is None else self._value_sanitize(value)
3177 if default is not None:
3178 default = self._value_sanitize(default)
3179 self.default = self.__class__(
3185 self._value = default
3187 tag_klass, _, tag_num = tag_decode(self.tag)
3188 self.tag_constructed = tag_encode(
3190 form=TagFormConstructed,
3194 def _bits2octets(self, bits):
3195 if len(self.specs) > 0:
3196 bits = bits.rstrip("0")
3198 bits += "0" * ((8 - (bit_len % 8)) % 8)
3199 octets = bytearray(len(bits) // 8)
3200 for i in six_xrange(len(octets)):
3201 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3202 return bit_len, bytes(octets)
3204 def _value_sanitize(self, value):
3205 if isinstance(value, (string_types, binary_type)):
3207 isinstance(value, string_types) and
3208 value.startswith("'")
3210 if value.endswith("'B"):
3212 if not frozenset(value) <= SET01:
3213 raise ValueError("B's coding contains unacceptable chars")
3214 return self._bits2octets(value)
3215 if value.endswith("'H"):
3219 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3221 if value.__class__ == binary_type:
3222 return (len(value) * 8, value)
3223 raise InvalidValueType((self.__class__, string_types, binary_type))
3224 if value.__class__ == tuple:
3227 isinstance(value[0], integer_types) and
3228 value[1].__class__ == binary_type
3233 bit = self.specs.get(name)
3235 raise ObjUnknown("BitString value: %s" % name)
3238 return self._bits2octets("")
3239 bits = frozenset(bits)
3240 return self._bits2octets("".join(
3241 ("1" if bit in bits else "0")
3242 for bit in six_xrange(max(bits) + 1)
3244 if issubclass(value.__class__, BitString):
3246 raise InvalidValueType((self.__class__, binary_type, string_types))
3250 return self._value is not None
3252 def __getstate__(self):
3253 return BitStringState(
3268 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a state tuple

    The state is handed to the parent class first, then ``specs``,
    ``value``, ``tag_constructed`` and ``defined`` fields are copied
    from it, in that order.

    :param state: state object exposing the fields listed above
    """
    super(BitString, self).__setstate__(state)
    for attr_name, field_name in (
            ("specs", "specs"),
            ("_value", "value"),
            ("tag_constructed", "tag_constructed"),
            ("defined", "defined"),
    ):
        setattr(self, attr_name, getattr(state, field_name))
3280 self._assert_ready()
3281 for i in six_xrange(self._value[0]):
3286 """Returns number of bits in the string
3288 self._assert_ready()
3289 return self._value[0]
3291 def __bytes__(self):
3292 self._assert_ready()
3293 return self._value[1]
3295 def __eq__(self, their):
3296 if their.__class__ == bytes:
3297 return self._value[1] == their
3298 if not issubclass(their.__class__, BitString):
3301 self._value == their._value and
3302 self.tag == their.tag and
3303 self._expl == their._expl
3308 """Named representation (if exists) of the bits
3310 :returns: [str(name), ...]
3312 return [name for name, bit in iteritems(self.specs) if self[bit]]
3322 return self.__class__(
3324 impl=self.tag if impl is None else impl,
3325 expl=self._expl if expl is None else expl,
3326 default=self.default if default is None else default,
3327 optional=self.optional if optional is None else optional,
3331 def __getitem__(self, key):
3332 if key.__class__ == int:
3333 bit_len, octets = self._value
3337 byte2int(memoryview(octets)[key // 8:]) >>
3340 if isinstance(key, string_types):
3341 value = self.specs.get(key)
3343 raise ObjUnknown("BitString value: %s" % key)
3345 raise InvalidValueType((int, str))
3348 self._assert_ready()
3349 bit_len, octets = self._value
3352 len_encode(len(octets) + 1),
3353 int2byte((8 - bit_len % 8) % 8),
3357 def _encode1st(self, state):
3358 self._assert_ready()
3359 _, octets = self._value
3361 return len(self.tag) + len_size(l) + l, state
3363 def _encode2nd(self, writer, state_iter):
3364 bit_len, octets = self._value
3365 write_full(writer, b"".join((
3367 len_encode(len(octets) + 1),
3368 int2byte((8 - bit_len % 8) % 8),
3370 write_full(writer, octets)
3372 def _encode_cer(self, writer):
3373 bit_len, octets = self._value
3374 if len(octets) + 1 <= 1000:
3375 write_full(writer, self._encode())
3377 write_full(writer, self.tag_constructed)
3378 write_full(writer, LENINDEF)
3379 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3380 write_full(writer, b"".join((
3381 BitString.tag_default,
3384 octets[offset:offset + 999],
3386 tail = octets[offset + 999:]
3388 tail = int2byte((8 - bit_len % 8) % 8) + tail
3389 write_full(writer, b"".join((
3390 BitString.tag_default,
3391 len_encode(len(tail)),
3394 write_full(writer, EOC)
3396 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3398 t, tlen, lv = tag_strip(tlv)
3399 except DecodeError as err:
3400 raise err.__class__(
3402 klass=self.__class__,
3403 decode_path=decode_path,
3407 if tag_only: # pragma: no cover
3411 l, llen, v = len_decode(lv)
3412 except DecodeError as err:
3413 raise err.__class__(
3415 klass=self.__class__,
3416 decode_path=decode_path,
3420 raise NotEnoughData(
3421 "encoded length is longer than data",
3422 klass=self.__class__,
3423 decode_path=decode_path,
3427 raise NotEnoughData(
3429 klass=self.__class__,
3430 decode_path=decode_path,
3433 pad_size = byte2int(v)
3434 if l == 1 and pad_size != 0:
3436 "invalid empty value",
3437 klass=self.__class__,
3438 decode_path=decode_path,
3444 klass=self.__class__,
3445 decode_path=decode_path,
3448 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3451 klass=self.__class__,
3452 decode_path=decode_path,
3455 v, tail = v[:l], v[l:]
3456 bit_len = (len(v) - 1) * 8 - pad_size
3457 obj = self.__class__(
3458 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3461 default=self.default,
3462 optional=self.optional,
3464 _decoded=(offset, llen, l),
3467 obj._value = (bit_len, None)
3468 yield decode_path, obj, tail
3470 if t != self.tag_constructed:
3472 klass=self.__class__,
3473 decode_path=decode_path,
3476 if not ctx.get("bered", False):
3478 "unallowed BER constructed encoding",
3479 klass=self.__class__,
3480 decode_path=decode_path,
3483 if tag_only: # pragma: no cover
3488 l, llen, v = len_decode(lv)
3489 except LenIndefForm:
3490 llen, l, v = 1, 0, lv[1:]
3492 except DecodeError as err:
3493 raise err.__class__(
3495 klass=self.__class__,
3496 decode_path=decode_path,
3500 raise NotEnoughData(
3501 "encoded length is longer than data",
3502 klass=self.__class__,
3503 decode_path=decode_path,
3506 if not lenindef and l == 0:
3507 raise NotEnoughData(
3509 klass=self.__class__,
3510 decode_path=decode_path,
3514 sub_offset = offset + tlen + llen
3518 if v[:EOC_LEN].tobytes() == EOC:
3525 "chunk out of bounds",
3526 klass=self.__class__,
3527 decode_path=decode_path + (str(len(chunks) - 1),),
3528 offset=chunks[-1].offset,
3530 sub_decode_path = decode_path + (str(len(chunks)),)
3533 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3536 decode_path=sub_decode_path,
3539 _ctx_immutable=False,
3541 yield _decode_path, chunk, v_tail
3543 _, chunk, v_tail = next(BitString().decode_evgen(
3546 decode_path=sub_decode_path,
3549 _ctx_immutable=False,
3554 "expected BitString encoded chunk",
3555 klass=self.__class__,
3556 decode_path=sub_decode_path,
3559 chunks.append(chunk)
3560 sub_offset += chunk.tlvlen
3561 vlen += chunk.tlvlen
3563 if len(chunks) == 0:
3566 klass=self.__class__,
3567 decode_path=decode_path,
3572 for chunk_i, chunk in enumerate(chunks[:-1]):
3573 if chunk.bit_len % 8 != 0:
3575 "BitString chunk is not multiple of 8 bits",
3576 klass=self.__class__,
3577 decode_path=decode_path + (str(chunk_i),),
3578 offset=chunk.offset,
3581 values.append(bytes(chunk))
3582 bit_len += chunk.bit_len
3583 chunk_last = chunks[-1]
3585 values.append(bytes(chunk_last))
3586 bit_len += chunk_last.bit_len
3587 obj = self.__class__(
3588 value=None if evgen_mode else (bit_len, b"".join(values)),
3591 default=self.default,
3592 optional=self.optional,
3594 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3597 obj._value = (bit_len, None)
3598 obj.lenindef = lenindef
3599 obj.ber_encoded = True
3600 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3603 return pp_console_row(next(self.pps()))
3605 def pps(self, decode_path=()):
3609 bit_len, blob = self._value
3610 value = "%d bits" % bit_len
3611 if len(self.specs) > 0 and blob is not None:
3612 blob = tuple(self.named)
3615 asn1_type_name=self.asn1_type_name,
3616 obj_name=self.__class__.__name__,
3617 decode_path=decode_path,
3620 optional=self.optional,
3621 default=self == self.default,
3622 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3623 expl=None if self._expl is None else tag_decode(self._expl),
3628 expl_offset=self.expl_offset if self.expled else None,
3629 expl_tlen=self.expl_tlen if self.expled else None,
3630 expl_llen=self.expl_llen if self.expled else None,
3631 expl_vlen=self.expl_vlen if self.expled else None,
3632 expl_lenindef=self.expl_lenindef,
3633 lenindef=self.lenindef,
3634 ber_encoded=self.ber_encoded,
3637 defined_by, defined = self.defined or (None, None)
3638 if defined_by is not None:
3640 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3642 for pp in self.pps_lenindef(decode_path):
3646 OctetStringState = namedtuple(
3648 BasicState._fields + (
3659 class OctetString(Obj):
3660 """``OCTET STRING`` binary string type
3662 >>> s = OctetString(b"hello world")
3663 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3664 >>> s == OctetString(b"hello world")
3669 >>> OctetString(b"hello", bounds=(4, 4))
3670 Traceback (most recent call last):
3671 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3672 >>> OctetString(b"hell", bounds=(4, 4))
3673 OCTET STRING 4 bytes 68656c6c
3675 Memoryviews can be used as values. If a memoryview is made on an
3676 mmap-ed file, then it does not take storage inside OctetString
3677 itself. In CER encoding mode it will be streamed to the specified
3678 writer, copying 1 KB chunks.
3680 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3681 tag_default = tag_encode(4)
3682 asn1_type_name = "OCTET STRING"
3683 evgen_mode_skip_value = True
3697 :param value: set the value. Either binary type, or
3698 :py:class:`pyderasn.OctetString` object
3699 :param bounds: set ``(MIN, MAX)`` value size constraint.
3700 (-inf, +inf) by default
3701 :param bytes impl: override default tag with ``IMPLICIT`` one
3702 :param bytes expl: override default tag with ``EXPLICIT`` one
3703 :param default: set default value. Type same as in ``value``
3704 :param bool optional: is object ``OPTIONAL`` in sequence
3706 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3708 self._bound_min, self._bound_max = getattr(
3712 ) if bounds is None else bounds
3713 if value is not None:
3714 self._value = self._value_sanitize(value)
3715 if default is not None:
3716 default = self._value_sanitize(default)
3717 self.default = self.__class__(
3722 if self._value is None:
3723 self._value = default
3725 tag_klass, _, tag_num = tag_decode(self.tag)
3726 self.tag_constructed = tag_encode(
3728 form=TagFormConstructed,
3732 def _value_sanitize(self, value):
3733 if value.__class__ == binary_type or value.__class__ == memoryview:
3735 elif issubclass(value.__class__, OctetString):
3736 value = value._value
3738 raise InvalidValueType((self.__class__, bytes, memoryview))
3739 if not self._bound_min <= len(value) <= self._bound_max:
3740 raise BoundsError(self._bound_min, len(value), self._bound_max)
3745 return self._value is not None
3747 def __getstate__(self):
3748 return OctetStringState(
3764 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a state tuple

    The state is handed to the parent class first, then ``value``,
    ``bound_min``, ``bound_max``, ``tag_constructed`` and ``defined``
    fields are copied from it, in that order.

    :param state: state object exposing the fields listed above
    """
    super(OctetString, self).__setstate__(state)
    for attr_name, field_name in (
            ("_value", "value"),
            ("_bound_min", "bound_min"),
            ("_bound_max", "bound_max"),
            ("tag_constructed", "tag_constructed"),
            ("defined", "defined"),
    ):
        setattr(self, attr_name, getattr(state, field_name))
3776 def __bytes__(self):
3777 self._assert_ready()
3778 return bytes(self._value)
3780 def __eq__(self, their):
3781 if their.__class__ == binary_type:
3782 return self._value == their
3783 if not issubclass(their.__class__, OctetString):
3786 self._value == their._value and
3787 self.tag == their.tag and
3788 self._expl == their._expl
3791 def __lt__(self, their):
3792 return self._value < their._value
3803 return self.__class__(
3806 (self._bound_min, self._bound_max)
3807 if bounds is None else bounds
3809 impl=self.tag if impl is None else impl,
3810 expl=self._expl if expl is None else expl,
3811 default=self.default if default is None else default,
3812 optional=self.optional if optional is None else optional,
3816 self._assert_ready()
3819 len_encode(len(self._value)),
def _encode1st(self, state):
    """Report the encoded length: tag, length octets and value

    The readiness of the object is asserted first.

    :param state: opaque state, passed through untouched
    :returns: ``(length, state)`` tuple
    """
    self._assert_ready()
    value_len = len(self._value)
    total_len = len(self.tag) + len_size(value_len) + value_len
    return total_len, state
3828 def _encode2nd(self, writer, state_iter):
3830 write_full(writer, self.tag + len_encode(len(value)))
3831 write_full(writer, value)
3833 def _encode_cer(self, writer):
3834 octets = self._value
3835 if len(octets) <= 1000:
3836 write_full(writer, self._encode())
3838 write_full(writer, self.tag_constructed)
3839 write_full(writer, LENINDEF)
3840 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3841 write_full(writer, b"".join((
3842 OctetString.tag_default,
3844 octets[offset:offset + 1000],
3846 tail = octets[offset + 1000:]
3848 write_full(writer, b"".join((
3849 OctetString.tag_default,
3850 len_encode(len(tail)),
3853 write_full(writer, EOC)
3855 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3857 t, tlen, lv = tag_strip(tlv)
3858 except DecodeError as err:
3859 raise err.__class__(
3861 klass=self.__class__,
3862 decode_path=decode_path,
3870 l, llen, v = len_decode(lv)
3871 except DecodeError as err:
3872 raise err.__class__(
3874 klass=self.__class__,
3875 decode_path=decode_path,
3879 raise NotEnoughData(
3880 "encoded length is longer than data",
3881 klass=self.__class__,
3882 decode_path=decode_path,
3885 v, tail = v[:l], v[l:]
3886 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3888 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3889 klass=self.__class__,
3890 decode_path=decode_path,
3894 obj = self.__class__(
3896 None if (evgen_mode and self.evgen_mode_skip_value)
3899 bounds=(self._bound_min, self._bound_max),
3902 default=self.default,
3903 optional=self.optional,
3904 _decoded=(offset, llen, l),
3907 except DecodeError as err:
3910 klass=self.__class__,
3911 decode_path=decode_path,
3914 except BoundsError as err:
3917 klass=self.__class__,
3918 decode_path=decode_path,
3921 yield decode_path, obj, tail
3923 if t != self.tag_constructed:
3925 klass=self.__class__,
3926 decode_path=decode_path,
3929 if not ctx.get("bered", False):
3931 "unallowed BER constructed encoding",
3932 klass=self.__class__,
3933 decode_path=decode_path,
3941 l, llen, v = len_decode(lv)
3942 except LenIndefForm:
3943 llen, l, v = 1, 0, lv[1:]
3945 except DecodeError as err:
3946 raise err.__class__(
3948 klass=self.__class__,
3949 decode_path=decode_path,
3953 raise NotEnoughData(
3954 "encoded length is longer than data",
3955 klass=self.__class__,
3956 decode_path=decode_path,
3961 sub_offset = offset + tlen + llen
3966 if v[:EOC_LEN].tobytes() == EOC:
3973 "chunk out of bounds",
3974 klass=self.__class__,
3975 decode_path=decode_path + (str(len(chunks) - 1),),
3976 offset=chunks[-1].offset,
3980 sub_decode_path = decode_path + (str(chunks_count),)
3981 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3984 decode_path=sub_decode_path,
3987 _ctx_immutable=False,
3989 yield _decode_path, chunk, v_tail
3990 if not chunk.ber_encoded:
3991 payload_len += chunk.vlen
3994 sub_decode_path = decode_path + (str(len(chunks)),)
3995 _, chunk, v_tail = next(OctetString().decode_evgen(
3998 decode_path=sub_decode_path,
4001 _ctx_immutable=False,
4004 chunks.append(chunk)
4007 "expected OctetString encoded chunk",
4008 klass=self.__class__,
4009 decode_path=sub_decode_path,
4012 sub_offset += chunk.tlvlen
4013 vlen += chunk.tlvlen
4015 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4017 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4018 klass=self.__class__,
4019 decode_path=decode_path,
4023 obj = self.__class__(
4025 None if evgen_mode else
4026 b"".join(bytes(chunk) for chunk in chunks)
4028 bounds=(self._bound_min, self._bound_max),
4031 default=self.default,
4032 optional=self.optional,
4033 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4036 except DecodeError as err:
4039 klass=self.__class__,
4040 decode_path=decode_path,
4043 except BoundsError as err:
4046 klass=self.__class__,
4047 decode_path=decode_path,
4050 obj.lenindef = lenindef
4051 obj.ber_encoded = True
4052 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4055 return pp_console_row(next(self.pps()))
4057 def pps(self, decode_path=()):
4060 asn1_type_name=self.asn1_type_name,
4061 obj_name=self.__class__.__name__,
4062 decode_path=decode_path,
4063 value=("%d bytes" % len(self._value)) if self.ready else None,
4064 blob=self._value if self.ready else None,
4065 optional=self.optional,
4066 default=self == self.default,
4067 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4068 expl=None if self._expl is None else tag_decode(self._expl),
4073 expl_offset=self.expl_offset if self.expled else None,
4074 expl_tlen=self.expl_tlen if self.expled else None,
4075 expl_llen=self.expl_llen if self.expled else None,
4076 expl_vlen=self.expl_vlen if self.expled else None,
4077 expl_lenindef=self.expl_lenindef,
4078 lenindef=self.lenindef,
4079 ber_encoded=self.ber_encoded,
4082 defined_by, defined = self.defined or (None, None)
4083 if defined_by is not None:
4085 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4087 for pp in self.pps_lenindef(decode_path):
4091 def agg_octet_string(evgens, decode_path, raw, writer):
4092 """Aggregate constructed string (OctetString and its derivatives)
4094 :param evgens: iterator of generated events
4095 :param decode_path: points to the string we want to decode
4096 :param raw: sliceable (memoryview, bytearray, etc) with
4097 the data evgens are generated on
4098 :param writer: ``write``-like callable (for example
4099 ``buffer.write``) where the string is going to be saved. Must
4100 comply with ``io.RawIOBase.write`` behaviour
4102 .. seealso:: :ref:`agg_octet_string`
4104 decode_path_len = len(decode_path)
4105 for dp, obj, _ in evgens:
4106 if dp[:decode_path_len] != decode_path:
4108 if not obj.ber_encoded:
4109 write_full(writer, raw[
4110 obj.offset + obj.tlen + obj.llen:
4111 obj.offset + obj.tlen + obj.llen + obj.vlen -
4112 (EOC_LEN if obj.expl_lenindef else 0)
4114 if len(dp) == decode_path_len:
4118 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4122 """``NULL`` null object
4130 tag_default = tag_encode(5)
4131 asn1_type_name = "NULL"
4135 value=None, # unused, but Sequence passes it
4142 :param bytes impl: override default tag with ``IMPLICIT`` one
4143 :param bytes expl: override default tag with ``EXPLICIT`` one
4144 :param bool optional: is object ``OPTIONAL`` in sequence
4146 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4153 def __getstate__(self):
4169 def __eq__(self, their):
4170 if not issubclass(their.__class__, Null):
4173 self.tag == their.tag and
4174 self._expl == their._expl
4184 return self.__class__(
4185 impl=self.tag if impl is None else impl,
4186 expl=self._expl if expl is None else expl,
4187 optional=self.optional if optional is None else optional,
4191 return self.tag + LEN0
4193 def _encode1st(self, state):
4194 return len(self.tag) + 1, state
def _encode2nd(self, writer, state_iter):
    """Write the tag followed by ``LEN0`` to ``writer``

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used here
    """
    encoded = self.tag + LEN0
    write_full(writer, encoded)
4199 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4201 t, _, lv = tag_strip(tlv)
4202 except DecodeError as err:
4203 raise err.__class__(
4205 klass=self.__class__,
4206 decode_path=decode_path,
4211 klass=self.__class__,
4212 decode_path=decode_path,
4215 if tag_only: # pragma: no cover
4219 l, _, v = len_decode(lv)
4220 except DecodeError as err:
4221 raise err.__class__(
4223 klass=self.__class__,
4224 decode_path=decode_path,
4228 raise InvalidLength(
4229 "Null must have zero length",
4230 klass=self.__class__,
4231 decode_path=decode_path,
4234 obj = self.__class__(
4237 optional=self.optional,
4238 _decoded=(offset, 1, 0),
4240 yield decode_path, obj, v
4243 return pp_console_row(next(self.pps()))
4245 def pps(self, decode_path=()):
4248 asn1_type_name=self.asn1_type_name,
4249 obj_name=self.__class__.__name__,
4250 decode_path=decode_path,
4251 optional=self.optional,
4252 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4253 expl=None if self._expl is None else tag_decode(self._expl),
4258 expl_offset=self.expl_offset if self.expled else None,
4259 expl_tlen=self.expl_tlen if self.expled else None,
4260 expl_llen=self.expl_llen if self.expled else None,
4261 expl_vlen=self.expl_vlen if self.expled else None,
4262 expl_lenindef=self.expl_lenindef,
4265 for pp in self.pps_lenindef(decode_path):
4269 ObjectIdentifierState = namedtuple(
4270 "ObjectIdentifierState",
4271 BasicState._fields + ("value", "defines"),
4276 class ObjectIdentifier(Obj):
4277 """``OBJECT IDENTIFIER`` OID type
4279 >>> oid = ObjectIdentifier((1, 2, 3))
4280 OBJECT IDENTIFIER 1.2.3
4281 >>> oid == ObjectIdentifier("1.2.3")
4287 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4288 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4290 >>> str(ObjectIdentifier((3, 1)))
4291 Traceback (most recent call last):
4292 pyderasn.InvalidOID: unacceptable first arc value
4294 __slots__ = ("defines",)
4295 tag_default = tag_encode(6)
4296 asn1_type_name = "OBJECT IDENTIFIER"
4309 :param value: set the value. Either tuples of integers,
4310 string of "."-concatenated integers, or
4311 :py:class:`pyderasn.ObjectIdentifier` object
4312 :param defines: sequence of tuples. Each tuple has two elements.
4313 First one is relative to current one decode
4314 path, aiming to the field defined by that OID.
4315 Read about relative path in
4316 :py:func:`pyderasn.abs_decode_path`. Second
4317 tuple element is ``{OID: pyderasn.Obj()}``
4318 dictionary, mapping between current OID value
4319 and structure applied to defined field.
4321 .. seealso:: :ref:`definedby`
4323 :param bytes impl: override default tag with ``IMPLICIT`` one
4324 :param bytes expl: override default tag with ``EXPLICIT`` one
4325 :param default: set default value. Type same as in ``value``
4326 :param bool optional: is object ``OPTIONAL`` in sequence
4328 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4330 if value is not None:
4331 self._value = self._value_sanitize(value)
4332 if default is not None:
4333 default = self._value_sanitize(default)
4334 self.default = self.__class__(
4339 if self._value is None:
4340 self._value = default
4341 self.defines = defines
def __add__(self, their):
    """Append arcs and return a new object of the same class.

    :param their: either a tuple of arcs, or an object of this class
    :raises InvalidValueType: if ``their`` is of any other type
    """
    if their.__class__ == tuple:
        suffix = array("L", their)
    elif isinstance(their, self.__class__):
        suffix = their._value
    else:
        raise InvalidValueType((self.__class__, tuple))
    return self.__class__(self._value + suffix)
4350 def _value_sanitize(self, value):
4351 if issubclass(value.__class__, ObjectIdentifier):
4353 if isinstance(value, string_types):
4355 value = array("L", (pureint(arc) for arc in value.split(".")))
4357 raise InvalidOID("unacceptable arcs values")
4358 if value.__class__ == tuple:
4360 value = array("L", value)
4361 except OverflowError as err:
4362 raise InvalidOID(repr(err))
4363 if value.__class__ is array:
4365 raise InvalidOID("less than 2 arcs")
4366 first_arc = value[0]
4367 if first_arc in (0, 1):
4368 if not (0 <= value[1] <= 39):
4369 raise InvalidOID("second arc is too wide")
4370 elif first_arc == 2:
4373 raise InvalidOID("unacceptable first arc value")
4374 if not all(arc >= 0 for arc in value):
4375 raise InvalidOID("negative arc value")
4377 raise InvalidValueType((self.__class__, str, tuple))
4381 return self._value is not None
4383 def __getstate__(self):
4384 return ObjectIdentifierState(
def __setstate__(self, state):
    """Restore the object from a previously captured state.

    The parent class restores its own fields first; the ``value`` and
    ``defines`` fields of the state are then applied here.

    :param state: state tuple carrying ``value`` and ``defines`` fields
    """
    super(ObjectIdentifier, self).__setstate__(state)
    restored_value = state.value
    restored_defines = state.defines
    self._value = restored_value
    self.defines = restored_defines
4407 self._assert_ready()
4408 return iter(self._value)
4411 return ".".join(str(arc) for arc in self._value or ())
4414 self._assert_ready()
4415 return hash(b"".join((
4417 bytes(self._expl or b""),
4418 str(self._value).encode("ascii"),
4421 def __eq__(self, their):
4422 if their.__class__ == tuple:
4423 return self._value == array("L", their)
4424 if not issubclass(their.__class__, ObjectIdentifier):
4427 self.tag == their.tag and
4428 self._expl == their._expl and
4429 self._value == their._value
def __lt__(self, their):
    """Order two objects by comparing their stored arc values.

    :param their: object exposing a ``_value`` to compare against
    """
    mine, other = self._value, their._value
    return mine < other
4444 return self.__class__(
4446 defines=self.defines if defines is None else defines,
4447 impl=self.tag if impl is None else impl,
4448 expl=self._expl if expl is None else expl,
4449 default=self.default if default is None else default,
4450 optional=self.optional if optional is None else optional,
4453 def _encode_octets(self):
4454 self._assert_ready()
4456 first_value = value[1]
4457 first_arc = value[0]
4460 elif first_arc == 1:
4462 elif first_arc == 2:
4464 else: # pragma: no cover
4465 raise RuntimeError("invalid arc is stored")
4466 octets = [zero_ended_encode(first_value)]
4467 for arc in value[2:]:
4468 octets.append(zero_ended_encode(arc))
4469 return b"".join(octets)
4472 v = self._encode_octets()
4473 return b"".join((self.tag, len_encode(len(v)), v))
def _encode1st(self, state):
    """First pass of two-pass encoding: report the encoded size.

    :param state: two-pass encoder state, returned untouched
    :returns: ``(tag + length octets + payload size, state)`` tuple
    """
    payload_len = len(self._encode_octets())
    total = len(self.tag) + len_size(payload_len) + payload_len
    return total, state
def _encode2nd(self, writer, state_iter):
    """Second pass of two-pass encoding: write the full encoding.

    :param writer: destination handed over to ``write_full``
    :param state_iter: two-pass state iterator; not consumed here
    """
    encoded = self._encode()
    write_full(writer, encoded)
4482 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4484 t, _, lv = tag_strip(tlv)
4485 except DecodeError as err:
4486 raise err.__class__(
4488 klass=self.__class__,
4489 decode_path=decode_path,
4494 klass=self.__class__,
4495 decode_path=decode_path,
4498 if tag_only: # pragma: no cover
4502 l, llen, v = len_decode(lv)
4503 except DecodeError as err:
4504 raise err.__class__(
4506 klass=self.__class__,
4507 decode_path=decode_path,
4511 raise NotEnoughData(
4512 "encoded length is longer than data",
4513 klass=self.__class__,
4514 decode_path=decode_path,
4518 raise NotEnoughData(
4520 klass=self.__class__,
4521 decode_path=decode_path,
4524 v, tail = v[:l], v[l:]
4531 octet = indexbytes(v, i)
4532 if i == 0 and octet == 0x80:
4533 if ctx.get("bered", False):
4537 "non normalized arc encoding",
4538 klass=self.__class__,
4539 decode_path=decode_path,
4542 arc = (arc << 7) | (octet & 0x7F)
4543 if octet & 0x80 == 0:
4546 except OverflowError:
4548 "too huge value for local unsigned long",
4549 klass=self.__class__,
4550 decode_path=decode_path,
4559 klass=self.__class__,
4560 decode_path=decode_path,
4564 second_arc = arcs[0]
4565 if 0 <= second_arc <= 39:
4567 elif 40 <= second_arc <= 79:
4573 obj = self.__class__(
4574 value=array("L", (first_arc, second_arc)) + arcs[1:],
4577 default=self.default,
4578 optional=self.optional,
4579 _decoded=(offset, llen, l),
4582 obj.ber_encoded = True
4583 yield decode_path, obj, tail
4586 return pp_console_row(next(self.pps()))
4588 def pps(self, decode_path=()):
4591 asn1_type_name=self.asn1_type_name,
4592 obj_name=self.__class__.__name__,
4593 decode_path=decode_path,
4594 value=str(self) if self.ready else None,
4595 optional=self.optional,
4596 default=self == self.default,
4597 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4598 expl=None if self._expl is None else tag_decode(self._expl),
4603 expl_offset=self.expl_offset if self.expled else None,
4604 expl_tlen=self.expl_tlen if self.expled else None,
4605 expl_llen=self.expl_llen if self.expled else None,
4606 expl_vlen=self.expl_vlen if self.expled else None,
4607 expl_lenindef=self.expl_lenindef,
4608 ber_encoded=self.ber_encoded,
4611 for pp in self.pps_lenindef(decode_path):
4615 class Enumerated(Integer):
4616 """``ENUMERATED`` integer type
4618 This type is identical to :py:class:`pyderasn.Integer`, but requires
4619 schema to be specified and does not accept values missing from it.
4622 tag_default = tag_encode(10)
4623 asn1_type_name = "ENUMERATED"
4634 bounds=None, # dummy argument, workability for Integer.decode
4636 super(Enumerated, self).__init__(
4637 value, bounds, impl, expl, default, optional, _specs, _decoded,
4639 if len(self.specs) == 0:
4640 raise ValueError("schema must be specified")
4642 def _value_sanitize(self, value):
4643 if isinstance(value, self.__class__):
4644 value = value._value
4645 elif isinstance(value, integer_types):
4646 for _value in itervalues(self.specs):
4651 "unknown integer value: %s" % value,
4652 klass=self.__class__,
4654 elif isinstance(value, string_types):
4655 value = self.specs.get(value)
4657 raise ObjUnknown("integer value: %s" % value)
4659 raise InvalidValueType((self.__class__, int, str))
4671 return self.__class__(
4673 impl=self.tag if impl is None else impl,
4674 expl=self._expl if expl is None else expl,
4675 default=self.default if default is None else default,
4676 optional=self.optional if optional is None else optional,
4681 def escape_control_unicode(c):
4682 if unicat(c)[0] == "C":
4683 c = repr(c).lstrip("u").strip("'")
4687 class CommonString(OctetString):
4688 """Common class for all strings
4690 Everything resembles :py:class:`pyderasn.OctetString`, except
4691 ability to deal with unicode text strings.
4693 >>> hexenc("привет мир".encode("utf-8"))
4694 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4695 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4697 >>> s = UTF8String("привет мир")
4698 UTF8String UTF8String привет мир
4700 'привет мир'
4701 >>> hexenc(bytes(s))
4702 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4704 >>> PrintableString("привет мир")
4705 Traceback (most recent call last):
4706 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4708 >>> BMPString("ада", bounds=(2, 2))
4709 Traceback (most recent call last):
4710 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4711 >>> s = BMPString("ад", bounds=(2, 2))
4714 >>> hexenc(bytes(s))
4721 - Text Encoding, validation
4722 * - :py:class:`pyderasn.UTF8String`
4724 * - :py:class:`pyderasn.NumericString`
4725 - proper alphabet validation
4726 * - :py:class:`pyderasn.PrintableString`
4727 - proper alphabet validation
4728 * - :py:class:`pyderasn.TeletexString`
4730 * - :py:class:`pyderasn.T61String`
4732 * - :py:class:`pyderasn.VideotexString`
4734 * - :py:class:`pyderasn.IA5String`
4735 - proper alphabet validation
4736 * - :py:class:`pyderasn.GraphicString`
4738 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4739 - proper alphabet validation
4740 * - :py:class:`pyderasn.GeneralString`
4742 * - :py:class:`pyderasn.UniversalString`
4744 * - :py:class:`pyderasn.BMPString`
4749 def _value_sanitize(self, value):
4751 value_decoded = None
4752 if isinstance(value, self.__class__):
4753 value_raw = value._value
4754 elif value.__class__ == text_type:
4755 value_decoded = value
4756 elif value.__class__ == binary_type:
4759 raise InvalidValueType((self.__class__, text_type, binary_type))
4762 value_decoded.encode(self.encoding)
4763 if value_raw is None else value_raw
4766 value_raw.decode(self.encoding)
4767 if value_decoded is None else value_decoded
4769 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4770 raise DecodeError(str(err))
4771 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4779 def __eq__(self, their):
4780 if their.__class__ == binary_type:
4781 return self._value == their
4782 if their.__class__ == text_type:
4783 return self._value == their.encode(self.encoding)
4784 if not isinstance(their, self.__class__):
4787 self._value == their._value and
4788 self.tag == their.tag and
4789 self._expl == their._expl
4792 def __unicode__(self):
4794 return self._value.decode(self.encoding)
4795 return text_type(self._value)
4798 return pp_console_row(next(self.pps(no_unicode=PY2)))
4800 def pps(self, decode_path=(), no_unicode=False):
4804 hexenc(bytes(self)) if no_unicode else
4805 "".join(escape_control_unicode(c) for c in self.__unicode__())
4809 asn1_type_name=self.asn1_type_name,
4810 obj_name=self.__class__.__name__,
4811 decode_path=decode_path,
4813 optional=self.optional,
4814 default=self == self.default,
4815 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4816 expl=None if self._expl is None else tag_decode(self._expl),
4821 expl_offset=self.expl_offset if self.expled else None,
4822 expl_tlen=self.expl_tlen if self.expled else None,
4823 expl_llen=self.expl_llen if self.expled else None,
4824 expl_vlen=self.expl_vlen if self.expled else None,
4825 expl_lenindef=self.expl_lenindef,
4826 ber_encoded=self.ber_encoded,
4829 for pp in self.pps_lenindef(decode_path):
4833 class UTF8String(CommonString):
4835 tag_default = tag_encode(12)
4837 asn1_type_name = "UTF8String"
4840 class AllowableCharsMixin(object):
4842 def allowable_chars(self):
4844 return self._allowable_chars
4845 return frozenset(six_unichr(c) for c in self._allowable_chars)
4847 def _value_sanitize(self, value):
4848 value = super(AllowableCharsMixin, self)._value_sanitize(value)
4849 if not frozenset(value) <= self._allowable_chars:
4850 raise DecodeError("non satisfying alphabet value")
4854 class NumericString(AllowableCharsMixin, CommonString):
4857 Its value is properly sanitized: only ASCII digits with spaces can
4860 >>> NumericString().allowable_chars
4861 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4864 tag_default = tag_encode(18)
4866 asn1_type_name = "NumericString"
4867 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4870 PrintableStringState = namedtuple(
4871 "PrintableStringState",
4872 OctetStringState._fields + ("allowable_chars",),
4877 class PrintableString(AllowableCharsMixin, CommonString):
4880 Its value is properly sanitized: see X.680 41.4 table 10.
4882 >>> PrintableString().allowable_chars
4883 frozenset([' ', "'", ..., 'z'])
4884 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4885 PrintableString PrintableString foo*bar
4886 >>> obj.allow_asterisk, obj.allow_ampersand
4890 tag_default = tag_encode(19)
4892 asn1_type_name = "PrintableString"
4893 _allowable_chars = frozenset(
4894 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4896 _asterisk = frozenset("*".encode("ascii"))
4897 _ampersand = frozenset("&".encode("ascii"))
4909 allow_asterisk=False,
4910 allow_ampersand=False,
4913 :param allow_asterisk: allow asterisk character
4914 :param allow_ampersand: allow ampersand character
4917 self._allowable_chars |= self._asterisk
4919 self._allowable_chars |= self._ampersand
4920 super(PrintableString, self).__init__(
4921 value, bounds, impl, expl, default, optional, _decoded, ctx,
4925 def allow_asterisk(self):
4926 """Is asterisk character allowed?
4928 return self._asterisk <= self._allowable_chars
4931 def allow_ampersand(self):
4932 """Is ampersand character allowed?
4934 return self._ampersand <= self._allowable_chars
4936 def __getstate__(self):
4937 return PrintableStringState(
4938 *super(PrintableString, self).__getstate__(),
4939 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore the object, including its per-instance alphabet.

    :param state: state tuple carrying an ``allowable_chars`` field
    """
    super(PrintableString, self).__setstate__(state)
    alphabet = state.allowable_chars
    self._allowable_chars = alphabet
4955 return self.__class__(
4958 (self._bound_min, self._bound_max)
4959 if bounds is None else bounds
4961 impl=self.tag if impl is None else impl,
4962 expl=self._expl if expl is None else expl,
4963 default=self.default if default is None else default,
4964 optional=self.optional if optional is None else optional,
4965 allow_asterisk=self.allow_asterisk,
4966 allow_ampersand=self.allow_ampersand,
4970 class TeletexString(CommonString):
4972 tag_default = tag_encode(20)
4973 encoding = "iso-8859-1"
4974 asn1_type_name = "TeletexString"
4977 class T61String(TeletexString):
4979 asn1_type_name = "T61String"
4982 class VideotexString(CommonString):
4984 tag_default = tag_encode(21)
4985 encoding = "iso-8859-1"
4986 asn1_type_name = "VideotexString"
4989 class IA5String(AllowableCharsMixin, CommonString):
4992 Its value is properly sanitized: it is a mix of
4994 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4995 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4996 * DEL character (0x7F)
4998 It is just 7-bit ASCII.
5000 >>> IA5String().allowable_chars
5001 frozenset(["NUL", ... "DEL"])
5004 tag_default = tag_encode(22)
5006 asn1_type_name = "IA5"
5007 _allowable_chars = frozenset(b"".join(
5008 six_unichr(c).encode("ascii") for c in six_xrange(128)
# Value lengths of the fixed-layout time strings.
LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
# NOTE(review): "DM" presumably stands for the fractions separator plus
# at least one fraction digit -- confirm against GeneralizedTime._strptime.
LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")

# Pre-encoded length octets for the two fraction-less layouts.
LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)

# Size of length octets plus value for UTCTime's fixed-length encoding.
LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5020 class VisibleString(AllowableCharsMixin, CommonString):
5023 Its value is properly sanitized. ASCII subset from space to tilde is
5024 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5026 >>> VisibleString().allowable_chars
5027 frozenset([" ", ... "~"])
5030 tag_default = tag_encode(26)
5032 asn1_type_name = "VisibleString"
5033 _allowable_chars = frozenset(b"".join(
5034 six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5038 class ISO646String(VisibleString):
5040 asn1_type_name = "ISO646String"
5043 UTCTimeState = namedtuple(
5045 OctetStringState._fields + ("ber_raw",),
5050 def str_to_time_fractions(value):
5052 year, v = (v // 10**10), (v % 10**10)
5053 month, v = (v // 10**8), (v % 10**8)
5054 day, v = (v // 10**6), (v % 10**6)
5055 hour, v = (v // 10**4), (v % 10**4)
5056 minute, second = (v // 100), (v % 100)
5057 return year, month, day, hour, minute, second
5060 class UTCTime(VisibleString):
5061 """``UTCTime`` datetime type
5063 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5064 UTCTime UTCTime 2017-09-30T22:07:50
5070 datetime.datetime(2017, 9, 30, 22, 7, 50)
5071 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5072 datetime.datetime(1957, 9, 30, 22, 7, 50)
5074 If BER encoded value was met, then ``ber_raw`` attribute will hold
5075 its raw representation.
5079 Pay attention that UTCTime can not hold the full year: two-digit
5080 years below 50 are treated as 20xx and all others as 19xx, according
5081 to X.509 recommendation.
5085 No strict validation of UTC offsets is made, only a very crude one:
5087 * minutes are not exceeding 60
5088 * offset value is not exceeding 14 hours
5090 __slots__ = ("ber_raw",)
5091 tag_default = tag_encode(23)
5093 asn1_type_name = "UTCTime"
5094 evgen_mode_skip_value = False
5104 bounds=None, # dummy argument, workability for OctetString.decode
5108 :param value: set the value. Either datetime type, or
5109 :py:class:`pyderasn.UTCTime` object
5110 :param bytes impl: override default tag with ``IMPLICIT`` one
5111 :param bytes expl: override default tag with ``EXPLICIT`` one
5112 :param default: set default value. Type same as in ``value``
5113 :param bool optional: is object ``OPTIONAL`` in sequence
5115 super(UTCTime, self).__init__(
5116 None, None, impl, expl, None, optional, _decoded, ctx,
5120 if value is not None:
5121 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5122 self.ber_encoded = self.ber_raw is not None
5123 if default is not None:
5124 default, _ = self._value_sanitize(default)
5125 self.default = self.__class__(
5130 if self._value is None:
5131 self._value = default
5133 self.optional = optional
5135 def _strptime_bered(self, value):
5136 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5139 raise ValueError("no timezone")
5140 year += 2000 if year < 50 else 1900
5141 decoded = datetime(year, month, day, hour, minute)
5143 if value[-1] == "Z":
5147 raise ValueError("invalid UTC offset")
5148 if value[-5] == "-":
5150 elif value[-5] == "+":
5153 raise ValueError("invalid UTC offset")
5154 v = pureint(value[-4:])
5155 offset, v = (60 * (v % 100)), v // 100
5157 raise ValueError("invalid UTC offset minutes")
5159 if offset > 14 * 3600:
5160 raise ValueError("too big UTC offset")
5164 return offset, decoded
5166 raise ValueError("invalid UTC offset seconds")
5167 seconds = pureint(value)
5169 raise ValueError("invalid seconds value")
5170 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse a strict ``YYMMDDHHMMSSZ`` string into a datetime.

    Two-digit years below 50 are mapped to 20xx, all others to 19xx.

    :param str value: textual time representation
    :raises ValueError: on wrong length or a missing trailing ``Z``
    """
    # Hand-written equivalent of datetime.strptime's format: %y%m%d%H%M%SZ
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    if value[-1] != "Z":
        raise ValueError("non UTC timezone")
    stamp = value[:-1]
    yy, month, day, hour, minute, second = str_to_time_fractions(stamp)
    if yy < 50:
        century = 2000
    else:
        century = 1900
    return datetime(yy + century, month, day, hour, minute, second)
def _dt_sanitize(self, value):
    """Check the year range and drop microseconds.

    :param value: datetime to sanitize
    :returns: copy of ``value`` with ``microsecond`` set to zero
    :raises ValueError: if the year is outside of 1950..2049
    """
    if not 1950 <= value.year <= 2049:
        raise ValueError("UTCTime can hold only 1950-2049 years")
    return value.replace(microsecond=0)
5187 def _value_sanitize(self, value, ctx=None):
5188 if value.__class__ == binary_type:
5190 value_decoded = value.decode("ascii")
5191 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5192 raise DecodeError("invalid UTCTime encoding: %r" % err)
5195 return self._strptime(value_decoded), None
5196 except (TypeError, ValueError) as _err:
5198 if (ctx is not None) and ctx.get("bered", False):
5200 offset, _value = self._strptime_bered(value_decoded)
5201 _value = _value - timedelta(seconds=offset)
5202 return self._dt_sanitize(_value), value
5203 except (TypeError, ValueError, OverflowError) as _err:
5206 "invalid %s format: %r" % (self.asn1_type_name, err),
5207 klass=self.__class__,
5209 if isinstance(value, self.__class__):
5210 return value._value, None
5211 if value.__class__ == datetime:
5212 return self._dt_sanitize(value), None
5213 raise InvalidValueType((self.__class__, datetime))
5215 def _pp_value(self):
5217 value = self._value.isoformat()
5218 if self.ber_encoded:
5219 value += " (%s)" % self.ber_raw
5223 def __unicode__(self):
5225 value = self._value.isoformat()
5226 if self.ber_encoded:
5227 value += " (%s)" % self.ber_raw
5229 return text_type(self._pp_value())
5231 def __getstate__(self):
5232 return UTCTimeState(
5233 *super(UTCTime, self).__getstate__(),
5234 **{"ber_raw": self.ber_raw}
def __setstate__(self, state):
    """Restore the object, including the raw BER representation.

    :param state: state tuple carrying a ``ber_raw`` field
    """
    super(UTCTime, self).__setstate__(state)
    raw = state.ber_raw
    self.ber_raw = raw
def __bytes__(self):
    """Return the time payload produced by ``_encode_time``.

    Readiness is asserted first via ``_assert_ready``.
    """
    self._assert_ready()
    payload = self._encode_time()
    return payload
5245 def __eq__(self, their):
5246 if their.__class__ == binary_type:
5247 return self._encode_time() == their
5248 if their.__class__ == datetime:
5249 return self.todatetime() == their
5250 if not isinstance(their, self.__class__):
5253 self._value == their._value and
5254 self.tag == their.tag and
5255 self._expl == their._expl
def _encode_time(self):
    """Format the stored datetime as ASCII ``YYMMDDHHMMSSZ`` bytes."""
    text = self._value.strftime("%y%m%d%H%M%SZ")
    return text.encode("ascii")
5262 self._assert_ready()
5263 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """First pass of two-pass encoding: report the encoded size.

    The size is constant: tag length plus the fixed
    ``LEN_YYMMDDHHMMSSZ_WITH_LEN`` (length octets and value).

    Readiness is asserted up front so that an object without a value
    fails already in the first pass, as the first-pass methods of
    GeneralizedTime and Choice do, instead of reporting a size for
    something that can not be encoded.

    :param state: two-pass encoder state, returned untouched
    :returns: ``(encoded length, state)`` tuple
    """
    self._assert_ready()
    return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
def _encode2nd(self, writer, state_iter):
    """Second pass of two-pass encoding: write the full encoding.

    :param writer: destination handed over to ``write_full``
    :param state_iter: two-pass state iterator; not consumed here
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
def _encode_cer(self, writer):
    """Write the CER encoding, which is the output of ``_encode``.

    :param writer: destination handed over to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
5275 def todatetime(self):
5279 return pp_console_row(next(self.pps()))
5281 def pps(self, decode_path=()):
5284 asn1_type_name=self.asn1_type_name,
5285 obj_name=self.__class__.__name__,
5286 decode_path=decode_path,
5287 value=self._pp_value(),
5288 optional=self.optional,
5289 default=self == self.default,
5290 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5291 expl=None if self._expl is None else tag_decode(self._expl),
5296 expl_offset=self.expl_offset if self.expled else None,
5297 expl_tlen=self.expl_tlen if self.expled else None,
5298 expl_llen=self.expl_llen if self.expled else None,
5299 expl_vlen=self.expl_vlen if self.expled else None,
5300 expl_lenindef=self.expl_lenindef,
5301 ber_encoded=self.ber_encoded,
5304 for pp in self.pps_lenindef(decode_path):
5308 class GeneralizedTime(UTCTime):
5309 """``GeneralizedTime`` datetime type
5311 This type is similar to :py:class:`pyderasn.UTCTime`.
5313 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5314 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5316 '20170930220750.000123Z'
5317 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5318 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5322 Only microsecond fractions are supported in DER encoding.
5323 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5324 higher precision values.
5328 BER encoded data can lose information (accuracy) during decoding
5329 because of float transformations.
5333 Local times (without explicit timezone specification) are treated
5334 as UTC one, no transformations are made.
5338 Zero year is unsupported.
5341 tag_default = tag_encode(24)
5342 asn1_type_name = "GeneralizedTime"
5344 def _dt_sanitize(self, value):
5347 def _strptime_bered(self, value):
5348 if len(value) < 4 + 3 * 2:
5349 raise ValueError("invalid GeneralizedTime")
5350 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5351 decoded = datetime(year, month, day, hour)
5352 offset, value = 0, value[10:]
5354 return offset, decoded
5355 if value[-1] == "Z":
5358 for char, sign in (("-", -1), ("+", 1)):
5359 idx = value.rfind(char)
5362 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5363 v = pureint(offset_raw)
5364 if len(offset_raw) == 4:
5365 offset, v = (60 * (v % 100)), v // 100
5367 raise ValueError("invalid UTC offset minutes")
5368 elif len(offset_raw) == 2:
5371 raise ValueError("invalid UTC offset")
5373 if offset > 14 * 3600:
5374 raise ValueError("too big UTC offset")
5378 return offset, decoded
5379 if value[0] in DECIMAL_SIGNS:
5381 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5384 raise ValueError("stripped minutes")
5385 decoded += timedelta(seconds=60 * pureint(value[:2]))
5388 return offset, decoded
5389 if value[0] in DECIMAL_SIGNS:
5391 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5394 raise ValueError("stripped seconds")
5395 decoded += timedelta(seconds=pureint(value[:2]))
5398 return offset, decoded
5399 if value[0] not in DECIMAL_SIGNS:
5400 raise ValueError("invalid format after seconds")
5402 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5405 def _strptime(self, value):
5407 if l == LEN_YYYYMMDDHHMMSSZ:
5408 # datetime.strptime's format: %Y%m%d%H%M%SZ
5409 if value[-1] != "Z":
5410 raise ValueError("non UTC timezone")
5411 return datetime(*str_to_time_fractions(value[:-1]))
5412 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5413 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5414 if value[-1] != "Z":
5415 raise ValueError("non UTC timezone")
5416 if value[14] != ".":
5417 raise ValueError("no fractions separator")
5420 raise ValueError("trailing zero")
5423 raise ValueError("only microsecond fractions are supported")
5424 us = pureint(us + ("0" * (6 - us_len)))
5425 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5426 return datetime(year, month, day, hour, minute, second, us)
5427 raise ValueError("invalid GeneralizedTime length")
5429 def _encode_time(self):
5431 encoded = value.strftime("%Y%m%d%H%M%S")
5432 if value.microsecond > 0:
5433 encoded += (".%06d" % value.microsecond).rstrip("0")
5434 return (encoded + "Z").encode("ascii")
5437 self._assert_ready()
5439 if value.microsecond > 0:
5440 encoded = self._encode_time()
5441 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5442 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """First pass of two-pass encoding: report the encoded size.

    The payload length varies (fractions may be present), so it is
    measured on the actual ``_encode_time`` output.

    :param state: two-pass encoder state, returned untouched
    :returns: ``(encoded length, state)`` tuple
    """
    self._assert_ready()
    payload_len = len(self._encode_time())
    total = len(self.tag) + len_size(payload_len) + payload_len
    return total, state
def _encode2nd(self, writer, state_iter):
    """Second pass of two-pass encoding: write the full encoding.

    :param writer: destination handed over to ``write_full``
    :param state_iter: two-pass state iterator; not consumed here
    """
    encoded = self._encode()
    write_full(writer, encoded)
5453 class GraphicString(CommonString):
5455 tag_default = tag_encode(25)
5456 encoding = "iso-8859-1"
5457 asn1_type_name = "GraphicString"
5460 class GeneralString(CommonString):
5462 tag_default = tag_encode(27)
5463 encoding = "iso-8859-1"
5464 asn1_type_name = "GeneralString"
5467 class UniversalString(CommonString):
5469 tag_default = tag_encode(28)
5470 encoding = "utf-32-be"
5471 asn1_type_name = "UniversalString"
5474 class BMPString(CommonString):
5476 tag_default = tag_encode(30)
5477 encoding = "utf-16-be"
5478 asn1_type_name = "BMPString"
5481 ChoiceState = namedtuple(
5483 BasicState._fields + ("specs", "value",),
5489 """``CHOICE`` special type
5493 class GeneralName(Choice):
5495 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5496 ("dNSName", IA5String(impl=tag_ctxp(2))),
5499 >>> gn = GeneralName()
5501 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5502 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5503 >>> gn["dNSName"] = IA5String("bar.baz")
5504 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5505 >>> gn["rfc822Name"]
5508 [2] IA5String IA5 bar.baz
5511 >>> gn.value == gn["dNSName"]
5514 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5516 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5517 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5519 __slots__ = ("specs",)
5521 asn1_type_name = "CHOICE"
5534 :param value: set the value. Either ``(choice, value)`` tuple, or
5535 :py:class:`pyderasn.Choice` object
5536 :param bytes impl: can not be set, do **not** use it
5537 :param bytes expl: override default tag with ``EXPLICIT`` one
5538 :param default: set default value. Type same as in ``value``
5539 :param bool optional: is object ``OPTIONAL`` in sequence
5541 if impl is not None:
5542 raise ValueError("no implicit tag allowed for CHOICE")
5543 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5545 schema = getattr(self, "schema", ())
5546 if len(schema) == 0:
5547 raise ValueError("schema must be specified")
5549 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5552 if value is not None:
5553 self._value = self._value_sanitize(value)
5554 if default is not None:
5555 default_value = self._value_sanitize(default)
5556 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5557 default_obj.specs = self.specs
5558 default_obj._value = default_value
5559 self.default = default_obj
5561 self._value = copy(default_obj._value)
5562 if self._expl is not None:
5563 tag_class, _, tag_num = tag_decode(self._expl)
5564 self._tag_order = (tag_class, tag_num)
5566 def _value_sanitize(self, value):
5567 if (value.__class__ == tuple) and len(value) == 2:
5569 spec = self.specs.get(choice)
5571 raise ObjUnknown(choice)
5572 if not isinstance(obj, spec.__class__):
5573 raise InvalidValueType((spec,))
5574 return (choice, spec(obj))
5575 if isinstance(value, self.__class__):
5577 raise InvalidValueType((self.__class__, tuple))
5581 return self._value is not None and self._value[1].ready
5585 return self.expl_lenindef or (
5586 (self._value is not None) and
5587 self._value[1].bered
5590 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object, including its specs and chosen value.

    :param state: state tuple carrying ``specs`` and ``value`` fields
    """
    super(Choice, self).__setstate__(state)
    restored_specs = state.specs
    restored_value = state.value
    self.specs = restored_specs
    self._value = restored_value
5613 def __eq__(self, their):
5614 if (their.__class__ == tuple) and len(their) == 2:
5615 return self._value == their
5616 if not isinstance(their, self.__class__):
5619 self.specs == their.specs and
5620 self._value == their._value
5630 return self.__class__(
5633 expl=self._expl if expl is None else expl,
5634 default=self.default if default is None else default,
5635 optional=self.optional if optional is None else optional,
5640 """Name of the choice
5642 self._assert_ready()
5643 return self._value[0]
5647 """Value of underlying choice
5649 self._assert_ready()
5650 return self._value[1]
def tag_order(self):
    """Tag ordering key of the ready object

    The own ``_tag_order`` is used when it is set, otherwise the
    ``tag_order`` of the currently chosen value is returned.
    """
    self._assert_ready()
    if self._tag_order is None:
        return self._value[1].tag_order
    return self._tag_order
def tag_order_cer(self):
    """Smallest ``tag_order_cer`` among all specifications of the schema"""
    orders = [spec.tag_order_cer for spec in itervalues(self.specs)]
    return min(orders)
5661 def __getitem__(self, key):
5662 if key not in self.specs:
5663 raise ObjUnknown(key)
5664 if self._value is None:
5666 choice, value = self._value
5671 def __setitem__(self, key, value):
5672 spec = self.specs.get(key)
5674 raise ObjUnknown(key)
5675 if not isinstance(value, spec.__class__):
5676 raise InvalidValueType((spec.__class__,))
5677 self._value = (key, spec(value))
5685 return self._value[1].decoded if self.ready else False
5688 self._assert_ready()
5689 return self._value[1].encode()
5691 def _encode1st(self, state):
5692 self._assert_ready()
5693 return self._value[1].encode1st(state)
5695 def _encode2nd(self, writer, state_iter):
5696 self._value[1].encode2nd(writer, state_iter)
5698 def _encode_cer(self, writer):
5699 self._assert_ready()
5700 self._value[1].encode_cer(writer)
5702 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5703 for choice, spec in iteritems(self.specs):
5704 sub_decode_path = decode_path + (choice,)
5710 decode_path=sub_decode_path,
5713 _ctx_immutable=False,
5720 klass=self.__class__,
5721 decode_path=decode_path,
5724 if tag_only: # pragma: no cover
5728 for _decode_path, value, tail in spec.decode_evgen(
5732 decode_path=sub_decode_path,
5734 _ctx_immutable=False,
5736 yield _decode_path, value, tail
5738 _, value, tail = next(spec.decode_evgen(
5742 decode_path=sub_decode_path,
5744 _ctx_immutable=False,
5747 obj = self.__class__(
5750 default=self.default,
5751 optional=self.optional,
5752 _decoded=(offset, 0, value.fulllen),
5754 obj._value = (choice, value)
5755 yield decode_path, obj, tail
5758 value = pp_console_row(next(self.pps()))
5760 value = "%s[%r]" % (value, self.value)
5763 def pps(self, decode_path=()):
5766 asn1_type_name=self.asn1_type_name,
5767 obj_name=self.__class__.__name__,
5768 decode_path=decode_path,
5769 value=self.choice if self.ready else None,
5770 optional=self.optional,
5771 default=self == self.default,
5772 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5773 expl=None if self._expl is None else tag_decode(self._expl),
5778 expl_lenindef=self.expl_lenindef,
5782 yield self.value.pps(decode_path=decode_path + (self.choice,))
5783 for pp in self.pps_lenindef(decode_path):
5787 class PrimitiveTypes(Choice):
5788 """Predefined ``CHOICE`` for all generic primitive types
5790 It could be useful for general decoding of some unspecified values:
5792 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5793 OCTET STRING 3 bytes 666f6f
5794 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5798 schema = tuple((klass.__name__, klass()) for klass in (
5822 AnyState = namedtuple(
5824 BasicState._fields + ("value", "defined"),
5830 """``ANY`` special type
5832 >>> Any(Integer(-123))
5833 ANY INTEGER -123 (0X:7B)
5834 >>> a = Any(OctetString(b"hello world").encode())
5835 ANY 040b68656c6c6f20776f726c64
5836 >>> hexenc(bytes(a))
    '040b68656c6c6f20776f726c64'
5839 __slots__ = ("defined",)
5840 tag_default = tag_encode(0)
5841 asn1_type_name = "ANY"
5851 :param value: set the value. Either any kind of pyderasn's
5852 **ready** object, or bytes. Pay attention that
5853 **no** validation is performed if raw binary value
5854 is valid TLV, except just tag decoding
5855 :param bytes expl: override default tag with ``EXPLICIT`` one
5856 :param bool optional: is object ``OPTIONAL`` in sequence
5858 super(Any, self).__init__(None, expl, None, optional, _decoded)
5862 value = self._value_sanitize(value)
5864 if self._expl is None:
5865 if value.__class__ == binary_type:
5866 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5868 tag_class, tag_num = value.tag_order
5870 tag_class, _, tag_num = tag_decode(self._expl)
5871 self._tag_order = (tag_class, tag_num)
def _value_sanitize(self, value):
    """Validate a value given to ``Any``

    :param value: non-empty binary string, another instance of our
                  class (its held value is taken), or any ``Obj``
    :returns: the value to be held
    :raises ValueError: binary string is empty
    :raises InvalidValueType: value is of an unsupported type
    """
    if value.__class__ == binary_type:
        if len(value) == 0:
            raise ValueError("Any value can not be empty")
        return value
    if isinstance(value, self.__class__):
        return value._value
    if isinstance(value, Obj):
        return value
    raise InvalidValueType((self.__class__, Obj, binary_type))
5887 return self._value is not None
def tag_order(self):
    """Stored ``_tag_order`` of the ready object"""
    self._assert_ready()
    order = self._tag_order
    return order
5896 if self.expl_lenindef or self.lenindef:
5898 if self.defined is None:
5900 return self.defined[1].bered
5902 def __getstate__(self):
def __setstate__(self, state):
    """Restore attributes from a state object

    The parent class restores its own part first, then the held value
    and ``defined`` are taken from ``state.value`` and ``state.defined``.
    """
    super(Any, self).__setstate__(state)
    self._value, self.defined = state.value, state.defined
def __eq__(self, their):
    """Compare with a binary string or with another ``Any``

    :param their: binary string or an ``Any`` (sub)class instance
    :returns: for a binary string -- whether our held (or encoded)
              value equals to it; for ``Any`` -- whether both are
              ready and give the same bytes, or both are not ready;
              False for everything else
    """
    if their.__class__ == binary_type:
        if self._value is None:
            # Not ready object holds nothing to compare with, and
            # there is nothing to call .encode() on
            return False
        if self._value.__class__ == binary_type:
            return self._value == their
        return self._value.encode() == their
    if issubclass(their.__class__, Any):
        if self.ready and their.ready:
            return bytes(self) == bytes(their)
        return self.ready == their.ready
    return False
5942 return self.__class__(
5944 expl=self._expl if expl is None else expl,
5945 optional=self.optional if optional is None else optional,
def __bytes__(self):
    """Binary representation of the ready object

    A held binary string is returned as is, a held object is encoded.
    """
    self._assert_ready()
    held = self._value
    if held.__class__ != binary_type:
        return held.encode()
    return held
5960 self._assert_ready()
5962 if value.__class__ == binary_type:
5964 return value.encode()
def _encode1st(self, state):
    """First encoding pass

    :returns: ``(length, state)`` for a held binary string, otherwise
              whatever the held object's ``encode1st`` returns
    """
    self._assert_ready()
    held = self._value
    if held.__class__ != binary_type:
        return held.encode1st(state)
    return len(held), state
def _encode2nd(self, writer, state_iter):
    """Second encoding pass

    A held binary string is written as is, a held object continues
    with its own ``encode2nd``.
    """
    held = self._value
    if held.__class__ != binary_type:
        held.encode2nd(writer, state_iter)
        return
    write_full(writer, held)
def _encode_cer(self, writer):
    """CER encoding

    A held binary string is written as is, a held object continues
    with its own ``encode_cer``.
    """
    self._assert_ready()
    held = self._value
    if held.__class__ != binary_type:
        held.encode_cer(writer)
        return
    write_full(writer, held)
5988 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5990 t, tlen, lv = tag_strip(tlv)
5991 except DecodeError as err:
5992 raise err.__class__(
5994 klass=self.__class__,
5995 decode_path=decode_path,
5999 l, llen, v = len_decode(lv)
6000 except LenIndefForm as err:
6001 if not ctx.get("bered", False):
6002 raise err.__class__(
6004 klass=self.__class__,
6005 decode_path=decode_path,
6008 llen, vlen, v = 1, 0, lv[1:]
6009 sub_offset = offset + tlen + llen
6011 while v[:EOC_LEN].tobytes() != EOC:
6012 chunk, v = Any().decode(
6015 decode_path=decode_path + (str(chunk_i),),
6018 _ctx_immutable=False,
6020 vlen += chunk.tlvlen
6021 sub_offset += chunk.tlvlen
6023 tlvlen = tlen + llen + vlen + EOC_LEN
6024 obj = self.__class__(
6025 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6027 optional=self.optional,
6028 _decoded=(offset, 0, tlvlen),
6031 obj.tag = t.tobytes()
6032 yield decode_path, obj, v[EOC_LEN:]
6034 except DecodeError as err:
6035 raise err.__class__(
6037 klass=self.__class__,
6038 decode_path=decode_path,
6042 raise NotEnoughData(
6043 "encoded length is longer than data",
6044 klass=self.__class__,
6045 decode_path=decode_path,
6048 tlvlen = tlen + llen + l
6049 v, tail = tlv[:tlvlen], v[l:]
6050 obj = self.__class__(
6051 value=None if evgen_mode else v.tobytes(),
6053 optional=self.optional,
6054 _decoded=(offset, 0, tlvlen),
6056 obj.tag = t.tobytes()
6057 yield decode_path, obj, tail
6060 return pp_console_row(next(self.pps()))
6062 def pps(self, decode_path=()):
6066 elif value.__class__ == binary_type:
6072 asn1_type_name=self.asn1_type_name,
6073 obj_name=self.__class__.__name__,
6074 decode_path=decode_path,
6076 blob=self._value if self._value.__class__ == binary_type else None,
6077 optional=self.optional,
6078 default=self == self.default,
6079 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6080 expl=None if self._expl is None else tag_decode(self._expl),
6085 expl_offset=self.expl_offset if self.expled else None,
6086 expl_tlen=self.expl_tlen if self.expled else None,
6087 expl_llen=self.expl_llen if self.expled else None,
6088 expl_vlen=self.expl_vlen if self.expled else None,
6089 expl_lenindef=self.expl_lenindef,
6090 lenindef=self.lenindef,
6093 defined_by, defined = self.defined or (None, None)
6094 if defined_by is not None:
6096 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6098 for pp in self.pps_lenindef(decode_path):
6102 ########################################################################
6103 # ASN.1 constructed types
6104 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can contain ".."
                     elements at its beginning, each stripping the
                     last element from ``decode_path``. Empty tuple
                     points to ``decode_path`` itself
    :returns: tuple with the resulting absolute path

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ('foo', 'bar', 'baz', 'whatever')
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ('foo', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ('baz', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ('foo',)
    """
    # Nothing left to apply: also terminates the recursion when the
    # relative path consisted only of ".." elements
    if len(rel_path) == 0:
        return decode_path
    if rel_path[0] == "/":
        return rel_path[1:]
    if rel_path[0] == "..":
        return abs_decode_path(decode_path[:-1], rel_path[1:])
    return decode_path + rel_path
6131 SequenceState = namedtuple(
6133 BasicState._fields + ("specs", "value",),
class SequenceEncode1stMixing(object):
    """Shared ``_encode1st`` for types built from several values

    The host class has to provide ``tag`` and ``_values_for_encoding()``.
    """

    def _encode1st(self, state):
        """First encoding pass

        Reserves a slot in ``state``, runs ``encode1st`` of every value
        and then stores the summed length in the reserved slot.

        :returns: ``(full length, state)``, where full length counts
                  the tag, the length octets and the summed length
        """
        slot = len(state)
        state.append(0)
        total = 0
        for item in self._values_for_encoding():
            total += item.encode1st(state)[0]
        state[slot] = total
        return len(self.tag) + len_size(total) + total, state
6150 class Sequence(SequenceEncode1stMixing, Obj):
6151 """``SEQUENCE`` structure type
6153 You have to make specification of sequence::
6155 class Extension(Sequence):
6157 ("extnID", ObjectIdentifier()),
6158 ("critical", Boolean(default=False)),
6159 ("extnValue", OctetString()),
6162 Then, you can work with it as with dictionary.
6164 >>> ext = Extension()
6165 >>> Extension().specs
6167 ('extnID', OBJECT IDENTIFIER),
6168 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6169 ('extnValue', OCTET STRING),
6171 >>> ext["extnID"] = "1.2.3"
6172 Traceback (most recent call last):
6173 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6174 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6176 You can determine if sequence is ready to be encoded:
6181 Traceback (most recent call last):
6182 pyderasn.ObjNotReady: object is not ready: extnValue
6183 >>> ext["extnValue"] = OctetString(b"foobar")
6187 Value you want to assign, must have the same **type** as in
6188 corresponding specification, but it can have different tags,
6189 optional/default attributes -- they will be taken from specification
6192 class TBSCertificate(Sequence):
6194 ("version", Version(expl=tag_ctxc(0), default="v1")),
6197 >>> tbs = TBSCertificate()
6198 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6200 Assign ``None`` to remove value from sequence.
6202 You can set values in Sequence during its initialization:
6204 >>> AlgorithmIdentifier((
6205 ("algorithm", ObjectIdentifier("1.2.3")),
6206 ("parameters", Any(Null()))
6208 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6210 You can determine if value exists/set in the sequence and take its value:
6212 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6215 OBJECT IDENTIFIER 1.2.3
    But pay attention that a value equal to its default is not stored (it
    is ``not in`` the sequence, because ``DEFAULT`` must not be encoded in
    DER), but you can still read its value:
6221 >>> "critical" in ext, ext["critical"]
6222 (False, BOOLEAN False)
6223 >>> ext["critical"] = Boolean(True)
6224 >>> "critical" in ext, ext["critical"]
6225 (True, BOOLEAN True)
6227 All defaulted values are always optional.
6229 .. _allow_default_values_ctx:
6231 DER prohibits default value encoding and will raise an error if
6232 default value is unexpectedly met during decode.
6233 If :ref:`bered <bered_ctx>` context option is set, then no error
6234 will be raised, but ``bered`` attribute set. You can disable strict
6235 defaulted values existence validation by setting
6236 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6238 All values with DEFAULT specified are decoded atomically in
6239 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6240 SEQUENCE, then it will be yielded as a single element, not
6241 disassembled. That is required for DEFAULT existence check.
6243 Two sequences are equal if they have equal specification (schema),
6244 implicit/explicit tagging and the same values.
6246 __slots__ = ("specs",)
6247 tag_default = tag_encode(form=TagFormConstructed, num=16)
6248 asn1_type_name = "SEQUENCE"
6260 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6262 schema = getattr(self, "schema", ())
6264 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6267 if value is not None:
6268 if issubclass(value.__class__, Sequence):
6269 self._value = value._value
6270 elif hasattr(value, "__iter__"):
6271 for seq_key, seq_value in value:
6272 self[seq_key] = seq_value
6274 raise InvalidValueType((Sequence,))
6275 if default is not None:
6276 if not issubclass(default.__class__, Sequence):
6277 raise InvalidValueType((Sequence,))
6278 default_value = default._value
6279 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6280 default_obj.specs = self.specs
6281 default_obj._value = default_value
6282 self.default = default_obj
6284 self._value = copy(default_obj._value)
6288 for name, spec in iteritems(self.specs):
6289 value = self._value.get(name)
6300 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6302 return any(value.bered for value in itervalues(self._value))
6304 def __getstate__(self):
6305 return SequenceState(
6319 {k: copy(v) for k, v in iteritems(self._value)},
def __setstate__(self, state):
    """Restore attributes from a state object

    The parent class restores its own part first, then ``specs`` and
    the held values are taken from ``state.specs`` and ``state.value``.
    """
    super(Sequence, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
6327 def __eq__(self, their):
6328 if not isinstance(their, self.__class__):
6331 self.specs == their.specs and
6332 self.tag == their.tag and
6333 self._expl == their._expl and
6334 self._value == their._value
6345 return self.__class__(
6348 impl=self.tag if impl is None else impl,
6349 expl=self._expl if expl is None else expl,
6350 default=self.default if default is None else default,
6351 optional=self.optional if optional is None else optional,
6354 def __contains__(self, key):
6355 return key in self._value
6357 def __setitem__(self, key, value):
6358 spec = self.specs.get(key)
6360 raise ObjUnknown(key)
6362 self._value.pop(key, None)
6364 if not isinstance(value, spec.__class__):
6365 raise InvalidValueType((spec.__class__,))
6366 value = spec(value=value)
6367 if spec.default is not None and value == spec.default:
6368 self._value.pop(key, None)
6370 self._value[key] = value
6372 def __getitem__(self, key):
6373 value = self._value.get(key)
6374 if value is not None:
6376 spec = self.specs.get(key)
6378 raise ObjUnknown(key)
6379 if spec.default is not None:
def _values_for_encoding(self):
    """Yield stored values in the order of the schema

    Fields without a stored value are skipped if they are optional.

    :raises ObjNotReady: non-optional field has no stored value
    """
    for name, spec in iteritems(self.specs):
        value = self._value.get(name)
        if value is not None:
            yield value
        elif not spec.optional:
            raise ObjNotReady(name)
6393 v = b"".join(v.encode() for v in self._values_for_encoding())
6394 return b"".join((self.tag, len_encode(len(v)), v))
def _encode2nd(self, writer, state_iter):
    """Second encoding pass

    Writes the tag with the length taken from ``state_iter``, then
    lets every value continue with its own ``encode2nd``.
    """
    header = self.tag + len_encode(next(state_iter))
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding

    Writes the tag with ``LENINDEF``, CER encoding of every value and
    the closing ``EOC``.
    """
    header = self.tag + LENINDEF
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode_cer(writer)
    write_full(writer, EOC)
6407 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6409 t, tlen, lv = tag_strip(tlv)
6410 except DecodeError as err:
6411 raise err.__class__(
6413 klass=self.__class__,
6414 decode_path=decode_path,
6419 klass=self.__class__,
6420 decode_path=decode_path,
6423 if tag_only: # pragma: no cover
6427 ctx_bered = ctx.get("bered", False)
6429 l, llen, v = len_decode(lv)
6430 except LenIndefForm as err:
6432 raise err.__class__(
6434 klass=self.__class__,
6435 decode_path=decode_path,
6438 l, llen, v = 0, 1, lv[1:]
6440 except DecodeError as err:
6441 raise err.__class__(
6443 klass=self.__class__,
6444 decode_path=decode_path,
6448 raise NotEnoughData(
6449 "encoded length is longer than data",
6450 klass=self.__class__,
6451 decode_path=decode_path,
6455 v, tail = v[:l], v[l:]
6457 sub_offset = offset + tlen + llen
6460 ctx_allow_default_values = ctx.get("allow_default_values", False)
6461 for name, spec in iteritems(self.specs):
6462 if spec.optional and (
6463 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6467 spec_defaulted = spec.default is not None
6468 sub_decode_path = decode_path + (name,)
6470 if evgen_mode and not spec_defaulted:
6471 for _decode_path, value, v_tail in spec.decode_evgen(
6475 decode_path=sub_decode_path,
6477 _ctx_immutable=False,
6479 yield _decode_path, value, v_tail
6481 _, value, v_tail = next(spec.decode_evgen(
6485 decode_path=sub_decode_path,
6487 _ctx_immutable=False,
6490 except TagMismatch as err:
6491 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6495 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6496 if not evgen_mode and defined is not None:
6497 defined_by, defined_spec = defined
6498 if issubclass(value.__class__, SequenceOf):
6499 for i, _value in enumerate(value):
6500 sub_sub_decode_path = sub_decode_path + (
6502 DecodePathDefBy(defined_by),
6504 defined_value, defined_tail = defined_spec.decode(
6505 memoryview(bytes(_value)),
6507 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6508 if value.expled else (value.tlen + value.llen)
6511 decode_path=sub_sub_decode_path,
6513 _ctx_immutable=False,
6515 if len(defined_tail) > 0:
6518 klass=self.__class__,
6519 decode_path=sub_sub_decode_path,
6522 _value.defined = (defined_by, defined_value)
6524 defined_value, defined_tail = defined_spec.decode(
6525 memoryview(bytes(value)),
6527 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6528 if value.expled else (value.tlen + value.llen)
6531 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6533 _ctx_immutable=False,
6535 if len(defined_tail) > 0:
6538 klass=self.__class__,
6539 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6542 value.defined = (defined_by, defined_value)
6544 value_len = value.fulllen
6546 sub_offset += value_len
6550 yield sub_decode_path, value, v_tail
6551 if value == spec.default:
6552 if ctx_bered or ctx_allow_default_values:
6556 "DEFAULT value met",
6557 klass=self.__class__,
6558 decode_path=sub_decode_path,
6562 values[name] = value
6563 spec_defines = getattr(spec, "defines", ())
6564 if len(spec_defines) == 0:
6565 defines_by_path = ctx.get("defines_by_path", ())
6566 if len(defines_by_path) > 0:
6567 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6568 if spec_defines is not None and len(spec_defines) > 0:
6569 for rel_path, schema in spec_defines:
6570 defined = schema.get(value, None)
6571 if defined is not None:
6572 ctx.setdefault("_defines", []).append((
6573 abs_decode_path(sub_decode_path[:-1], rel_path),
6577 if v[:EOC_LEN].tobytes() != EOC:
6580 klass=self.__class__,
6581 decode_path=decode_path,
6589 klass=self.__class__,
6590 decode_path=decode_path,
6593 obj = self.__class__(
6597 default=self.default,
6598 optional=self.optional,
6599 _decoded=(offset, llen, vlen),
6602 obj.lenindef = lenindef
6603 obj.ber_encoded = ber_encoded
6604 yield decode_path, obj, tail
6607 value = pp_console_row(next(self.pps()))
6609 for name in self.specs:
6610 _value = self._value.get(name)
6613 cols.append("%s: %s" % (name, repr(_value)))
6614 return "%s[%s]" % (value, "; ".join(cols))
6616 def pps(self, decode_path=()):
6619 asn1_type_name=self.asn1_type_name,
6620 obj_name=self.__class__.__name__,
6621 decode_path=decode_path,
6622 optional=self.optional,
6623 default=self == self.default,
6624 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6625 expl=None if self._expl is None else tag_decode(self._expl),
6630 expl_offset=self.expl_offset if self.expled else None,
6631 expl_tlen=self.expl_tlen if self.expled else None,
6632 expl_llen=self.expl_llen if self.expled else None,
6633 expl_vlen=self.expl_vlen if self.expled else None,
6634 expl_lenindef=self.expl_lenindef,
6635 lenindef=self.lenindef,
6636 ber_encoded=self.ber_encoded,
6639 for name in self.specs:
6640 value = self._value.get(name)
6643 yield value.pps(decode_path=decode_path + (name,))
6644 for pp in self.pps_lenindef(decode_path):
6648 class Set(Sequence, SequenceEncode1stMixing):
6649 """``SET`` structure type
6651 Its usage is identical to :py:class:`pyderasn.Sequence`.
6653 .. _allow_unordered_set_ctx:
6655 DER prohibits unordered values encoding and will raise an error
6656 during decode. If :ref:`bered <bered_ctx>` context option is set,
6657 then no error will occur. Also you can disable strict values
6658 ordering check by setting ``"allow_unordered_set": True``
6659 :ref:`context <ctx>` option.
6662 tag_default = tag_encode(form=TagFormConstructed, num=17)
6663 asn1_type_name = "SET"
def _values_for_encoding(self):
    """Values of the parent class, sorted by their ``tag_order``

    :returns: list of values
    """
    values = list(super(Set, self)._values_for_encoding())
    values.sort(key=attrgetter("tag_order"))
    return values
def _encode_cer(self, writer):
    """CER encoding

    Writes the tag with ``LENINDEF``, CER encoding of every value
    (sorted by ``tag_order_cer``) and the closing ``EOC``.
    """
    write_full(writer, self.tag + LENINDEF)
    values = list(super(Set, self)._values_for_encoding())
    values.sort(key=attrgetter("tag_order_cer"))
    for item in values:
        item.encode_cer(writer)
    write_full(writer, EOC)
6680 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6682 t, tlen, lv = tag_strip(tlv)
6683 except DecodeError as err:
6684 raise err.__class__(
6686 klass=self.__class__,
6687 decode_path=decode_path,
6692 klass=self.__class__,
6693 decode_path=decode_path,
6700 ctx_bered = ctx.get("bered", False)
6702 l, llen, v = len_decode(lv)
6703 except LenIndefForm as err:
6705 raise err.__class__(
6707 klass=self.__class__,
6708 decode_path=decode_path,
6711 l, llen, v = 0, 1, lv[1:]
6713 except DecodeError as err:
6714 raise err.__class__(
6716 klass=self.__class__,
6717 decode_path=decode_path,
6721 raise NotEnoughData(
6722 "encoded length is longer than data",
6723 klass=self.__class__,
6727 v, tail = v[:l], v[l:]
6729 sub_offset = offset + tlen + llen
6732 ctx_allow_default_values = ctx.get("allow_default_values", False)
6733 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6734 tag_order_prev = (0, 0)
6735 _specs_items = copy(self.specs)
6738 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6740 for name, spec in iteritems(_specs_items):
6741 sub_decode_path = decode_path + (name,)
6747 decode_path=sub_decode_path,
6750 _ctx_immutable=False,
6757 klass=self.__class__,
6758 decode_path=decode_path,
6761 spec_defaulted = spec.default is not None
6762 if evgen_mode and not spec_defaulted:
6763 for _decode_path, value, v_tail in spec.decode_evgen(
6767 decode_path=sub_decode_path,
6769 _ctx_immutable=False,
6771 yield _decode_path, value, v_tail
6773 _, value, v_tail = next(spec.decode_evgen(
6777 decode_path=sub_decode_path,
6779 _ctx_immutable=False,
6782 value_tag_order = value.tag_order
6783 value_len = value.fulllen
6784 if tag_order_prev >= value_tag_order:
6785 if ctx_bered or ctx_allow_unordered_set:
6789 "unordered " + self.asn1_type_name,
6790 klass=self.__class__,
6791 decode_path=sub_decode_path,
6796 yield sub_decode_path, value, v_tail
6797 if value != spec.default:
6799 elif ctx_bered or ctx_allow_default_values:
6803 "DEFAULT value met",
6804 klass=self.__class__,
6805 decode_path=sub_decode_path,
6808 values[name] = value
6809 del _specs_items[name]
6810 tag_order_prev = value_tag_order
6811 sub_offset += value_len
6815 obj = self.__class__(
6819 default=self.default,
6820 optional=self.optional,
6821 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6824 if v[:EOC_LEN].tobytes() != EOC:
6827 klass=self.__class__,
6828 decode_path=decode_path,
6833 for name, spec in iteritems(self.specs):
6834 if name not in values and not spec.optional:
6836 "%s value is not ready" % name,
6837 klass=self.__class__,
6838 decode_path=decode_path,
6843 obj.ber_encoded = ber_encoded
6844 yield decode_path, obj, tail
6847 SequenceOfState = namedtuple(
6849 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6854 class SequenceOf(SequenceEncode1stMixing, Obj):
6855 """``SEQUENCE OF`` sequence type
6857 For that kind of type you must specify the object it will carry on
6858 (bounds are for example here, not required)::
6860 class Ints(SequenceOf):
6865 >>> ints.append(Integer(123))
6866 >>> ints.append(Integer(234))
6868 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6869 >>> [int(i) for i in ints]
6871 >>> ints.append(Integer(345))
6872 Traceback (most recent call last):
6873 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6876 >>> ints[1] = Integer(345)
6878 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6880 You can initialize sequence with preinitialized values:
6882 >>> ints = Ints([Integer(123), Integer(234)])
6884 Also you can use iterator as a value:
6886 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6888 And it won't be iterated until encoding process. Pay attention that
6889 bounds and required schema checks are done only during the encoding
6890 process in that case! After encode was called, then value is zeroed
6891 back to empty list and you have to set it again. That mode is useful
6892 mainly with CER encoding mode, where all objects from the iterable
6893 will be streamed to the buffer, without copying all of them to
6896 __slots__ = ("spec", "_bound_min", "_bound_max")
6897 tag_default = tag_encode(form=TagFormConstructed, num=16)
6898 asn1_type_name = "SEQUENCE OF"
6911 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6913 schema = getattr(self, "schema", None)
6915 raise ValueError("schema must be specified")
6917 self._bound_min, self._bound_max = getattr(
6921 ) if bounds is None else bounds
6923 if value is not None:
6924 self._value = self._value_sanitize(value)
6925 if default is not None:
6926 default_value = self._value_sanitize(default)
6927 default_obj = self.__class__(
6932 default_obj._value = default_value
6933 self.default = default_obj
6935 self._value = copy(default_obj._value)
def _value_sanitize(self, value):
    """Validate a value given to ``SequenceOf``

    :param value: another ``SequenceOf`` (its held value is taken), an
                  iterator (returned untouched), or any other
                  iterable (converted to list)
    :returns: the value to be held
    :raises InvalidValueType: value is not iterable, or one of its
                              elements is not an instance of the
                              specification's class
    :raises BoundsError: number of elements is out of bounds
    """
    if issubclass(value.__class__, SequenceOf):
        value = value._value
    elif hasattr(value, NEXT_ATTR_NAME):
        # Iterators are not consumed here, so nothing can be checked
        return value
    elif hasattr(value, "__iter__"):
        value = list(value)
    else:
        raise InvalidValueType((self.__class__, iter, "iterator"))
    if not self._bound_min <= len(value) <= self._bound_max:
        raise BoundsError(self._bound_min, len(value), self._bound_max)
    class_expected = self.spec.__class__
    for item in value:
        if not isinstance(item, class_expected):
            raise InvalidValueType((class_expected,))
    return value
6958 if hasattr(self._value, NEXT_ATTR_NAME):
6960 if self._bound_min > 0 and len(self._value) == 0:
6962 return all(v.ready for v in self._value)
6966 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6968 return any(v.bered for v in self._value)
6970 def __getstate__(self):
6971 if hasattr(self._value, NEXT_ATTR_NAME):
6972 raise ValueError("can not pickle SequenceOf with iterator")
6973 return SequenceOfState(
6987 [copy(v) for v in self._value],
def __setstate__(self, state):
    """Restore attributes from a state object

    The parent class restores its own part first, then specification,
    held values and both bounds are taken from ``state``.
    """
    super(SequenceOf, self).__setstate__(state)
    self.spec, self._value = state.spec, state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
6999 def __eq__(self, their):
7000 if isinstance(their, self.__class__):
7002 self.spec == their.spec and
7003 self.tag == their.tag and
7004 self._expl == their._expl and
7005 self._value == their._value
7007 if hasattr(their, "__iter__"):
7008 return self._value == list(their)
7020 return self.__class__(
7024 (self._bound_min, self._bound_max)
7025 if bounds is None else bounds
7027 impl=self.tag if impl is None else impl,
7028 expl=self._expl if expl is None else expl,
7029 default=self.default if default is None else default,
7030 optional=self.optional if optional is None else optional,
7033 def __contains__(self, key):
7034 return key in self._value
def append(self, value):
    """Append a value to the held ones

    :raises InvalidValueType: ``value`` is not an instance of the
                              specification's class
    :raises BoundsError: one more value would exceed the upper bound
    """
    expected = self.spec.__class__
    if not isinstance(value, expected):
        raise InvalidValueType((expected,))
    new_len = len(self._value) + 1
    if new_len > self._bound_max:
        raise BoundsError(self._bound_min, new_len, self._bound_max)
    self._value.append(value)
7048 return iter(self._value)
7051 return len(self._value)
7053 def __setitem__(self, key, value):
7054 if not isinstance(value, self.spec.__class__):
7055 raise InvalidValueType((self.spec.__class__,))
7056 self._value[key] = self.spec(value=value)
7058 def __getitem__(self, key):
7059 return self._value[key]
7061 def _values_for_encoding(self):
7062 return iter(self._value)
7065 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7068 values_append = values.append
7069 class_expected = self.spec.__class__
7070 values_for_encoding = self._values_for_encoding()
7072 for v in values_for_encoding:
7073 if not isinstance(v, class_expected):
7074 raise InvalidValueType((class_expected,))
7075 values_append(v.encode())
7076 if not self._bound_min <= len(values) <= self._bound_max:
7077 raise BoundsError(self._bound_min, len(values), self._bound_max)
7078 value = b"".join(values)
7080 value = b"".join(v.encode() for v in self._values_for_encoding())
7081 return b"".join((self.tag, len_encode(len(value)), value))
def _encode1st(self, state):
    """First encoding pass

    Delegates to the parent implementation. If an iterator was held,
    then it is replaced with an empty list afterwards.

    :returns: whatever the parent's ``_encode1st`` returned
    """
    result = super(SequenceOf, self)._encode1st(state)
    if hasattr(self._value, NEXT_ATTR_NAME):
        self._value = []
    return result
def _encode2nd(self, writer, state_iter):
    """Second encoding pass

    Writes the tag with the length taken from ``state_iter``, then
    lets every value continue with its own ``encode2nd``. If an
    iterator is held, then it is replaced with an empty list, the type
    of each element is checked while encoding and bounds are checked
    after the last element.

    :raises InvalidValueType: iterator gave an element that is not an
                              instance of the specification's class
    :raises BoundsError: iterator gave out of bounds number of elements
    """
    write_full(writer, self.tag + len_encode(next(state_iter)))
    if not hasattr(self._value, NEXT_ATTR_NAME):
        for item in self._values_for_encoding():
            item.encode2nd(writer, state_iter)
        return
    class_expected = self.spec.__class__
    pending = self._values_for_encoding()
    self._value = []
    values_count = 0
    for values_count, item in enumerate(pending, 1):
        if not isinstance(item, class_expected):
            raise InvalidValueType((class_expected,))
        item.encode2nd(writer, state_iter)
    if not self._bound_min <= values_count <= self._bound_max:
        raise BoundsError(self._bound_min, values_count, self._bound_max)
def _encode_cer(self, writer):
    """CER encoding

    Writes the tag with ``LENINDEF``, CER encoding of every value and
    the closing ``EOC``. If an iterator is held, then it is replaced
    with an empty list, the type of each element is checked while
    encoding and bounds are checked after the last element.

    :raises InvalidValueType: iterator gave an element that is not an
                              instance of the specification's class
    :raises BoundsError: iterator gave out of bounds number of elements
    """
    write_full(writer, self.tag + LENINDEF)
    if not hasattr(self._value, NEXT_ATTR_NAME):
        for item in self._values_for_encoding():
            item.encode_cer(writer)
    else:
        class_expected = self.spec.__class__
        pending = self._values_for_encoding()
        self._value = []
        values_count = 0
        for values_count, item in enumerate(pending, 1):
            if not isinstance(item, class_expected):
                raise InvalidValueType((class_expected,))
            item.encode_cer(writer)
        if not self._bound_min <= values_count <= self._bound_max:
            raise BoundsError(self._bound_min, values_count, self._bound_max)
    write_full(writer, EOC)
7136 ordering_check=False,
7139 t, tlen, lv = tag_strip(tlv)
7140 except DecodeError as err:
7141 raise err.__class__(
7143 klass=self.__class__,
7144 decode_path=decode_path,
7149 klass=self.__class__,
7150 decode_path=decode_path,
7157 ctx_bered = ctx.get("bered", False)
7159 l, llen, v = len_decode(lv)
7160 except LenIndefForm as err:
7162 raise err.__class__(
7164 klass=self.__class__,
7165 decode_path=decode_path,
7168 l, llen, v = 0, 1, lv[1:]
7170 except DecodeError as err:
7171 raise err.__class__(
7173 klass=self.__class__,
7174 decode_path=decode_path,
7178 raise NotEnoughData(
7179 "encoded length is longer than data",
7180 klass=self.__class__,
7181 decode_path=decode_path,
7185 v, tail = v[:l], v[l:]
7187 sub_offset = offset + tlen + llen
7190 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7191 value_prev = memoryview(v[:0])
7195 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7197 sub_decode_path = decode_path + (str(_value_count),)
7199 for _decode_path, value, v_tail in spec.decode_evgen(
7203 decode_path=sub_decode_path,
7205 _ctx_immutable=False,
7207 yield _decode_path, value, v_tail
7209 _, value, v_tail = next(spec.decode_evgen(
7213 decode_path=sub_decode_path,
7215 _ctx_immutable=False,
7218 value_len = value.fulllen
7220 if value_prev.tobytes() > v[:value_len].tobytes():
7221 if ctx_bered or ctx_allow_unordered_set:
7225 "unordered " + self.asn1_type_name,
7226 klass=self.__class__,
7227 decode_path=sub_decode_path,
7230 value_prev = v[:value_len]
7233 _value.append(value)
7234 sub_offset += value_len
7237 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7239 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7240 klass=self.__class__,
7241 decode_path=decode_path,
7245 obj = self.__class__(
7246 value=None if evgen_mode else _value,
7248 bounds=(self._bound_min, self._bound_max),
7251 default=self.default,
7252 optional=self.optional,
7253 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7255 except BoundsError as err:
7258 klass=self.__class__,
7259 decode_path=decode_path,
7263 if v[:EOC_LEN].tobytes() != EOC:
7266 klass=self.__class__,
7267 decode_path=decode_path,
7272 obj.ber_encoded = ber_encoded
7273 yield decode_path, obj, tail
7277 pp_console_row(next(self.pps())),
7278 ", ".join(repr(v) for v in self._value),
7281 def pps(self, decode_path=()):
7284 asn1_type_name=self.asn1_type_name,
7285 obj_name=self.__class__.__name__,
7286 decode_path=decode_path,
7287 optional=self.optional,
7288 default=self == self.default,
7289 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7290 expl=None if self._expl is None else tag_decode(self._expl),
7295 expl_offset=self.expl_offset if self.expled else None,
7296 expl_tlen=self.expl_tlen if self.expled else None,
7297 expl_llen=self.expl_llen if self.expled else None,
7298 expl_vlen=self.expl_vlen if self.expled else None,
7299 expl_lenindef=self.expl_lenindef,
7300 lenindef=self.lenindef,
7301 ber_encoded=self.ber_encoded,
7304 for i, value in enumerate(self._value):
7305 yield value.pps(decode_path=decode_path + (str(i),))
7306 for pp in self.pps_lenindef(decode_path):
7310 class SetOf(SequenceOf):
7311 """``SET OF`` sequence type
7313 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7316 tag_default = tag_encode(form=TagFormConstructed, num=17)
7317 asn1_type_name = "SET OF"
7319 def _value_sanitize(self, value):
7320 value = super(SetOf, self)._value_sanitize(value)
7321 if hasattr(value, NEXT_ATTR_NAME):
7323 "SetOf does not support iterator values, as no sense in them"
7328 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7329 return b"".join((self.tag, len_encode(len(v)), v))
7331 def _encode2nd(self, writer, state_iter):
7332 write_full(writer, self.tag + len_encode(next(state_iter)))
7334 for v in self._values_for_encoding():
7336 v.encode2nd(buf.write, state_iter)
7337 values.append(buf.getvalue())
7340 write_full(writer, v)
def _encode_cer(self, writer):
    """Write this SET OF through ``writer`` with sorted elements

    The output is ``self.tag + LENINDEF``, then the ``encode_cer()``
    result of every value in ascending byte order, then ``EOC``.

    :param writer: callable handed to ``write_full`` for each chunk
    """
    write_full(writer, self.tag + LENINDEF)
    # Every element is encoded before anything else is written, because
    # the order of output depends on the encoded bytes themselves
    encoded_elements = [encode_cer(item) for item in self._values_for_encoding()]
    encoded_elements.sort()
    for chunk in encoded_elements:
        write_full(writer, chunk)
    write_full(writer, EOC)
7348 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7349 return super(SetOf, self)._decode(
7356 ordering_check=True,
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    Modules must be separated from classes/functions with ``:``.

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>

    :param str pypath: ``module.path:name[.name...]`` string
    :returns: object reached by importing the module and following
              the dotted attribute chain after the ``:``
    :raises ValueError: if there is no ``:`` separator in ``pypath``
    """
    # Split on the last colon only: everything before it is the module path
    mod, objs = pypath.rsplit(":", 1)
    from importlib import import_module
    obj = import_module(mod)
    for obj_name in objs.split("."):
        obj = getattr(obj, obj_name)
    # Hand the resolved object back to the caller; without this the
    # function would always evaluate to None
    return obj
7378 def generic_decoder(): # pragma: no cover
7379 # All of this below is a big hack with self references
7380 choice = PrimitiveTypes()
7381 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7382 choice.specs["SetOf"] = SetOf(schema=choice)
7383 for i in six_xrange(31):
7384 choice.specs["SequenceOf%d" % i] = SequenceOf(
7388 choice.specs["Any"] = Any()
7390 # Class name equals to type name, to omit it from output
7391 class SEQUENCEOF(SequenceOf):
7399 with_decode_path=False,
7400 decode_path_only=(),
7403 def _pprint_pps(pps):
7405 if hasattr(pp, "_fields"):
7407 decode_path_only != () and
7408 pp.decode_path[:len(decode_path_only)] != decode_path_only
7411 if pp.asn1_type_name == Choice.asn1_type_name:
7413 pp_kwargs = pp._asdict()
7414 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7415 pp = _pp(**pp_kwargs)
7416 yield pp_console_row(
7421 with_colours=with_colours,
7422 with_decode_path=with_decode_path,
7423 decode_path_len_decrease=len(decode_path_only),
7425 for row in pp_console_blob(
7427 decode_path_len_decrease=len(decode_path_only),
7431 for row in _pprint_pps(pp):
7433 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7434 return SEQUENCEOF(), pprint_any
def ascii_visualize(ba):
    """Output only ASCII printable characters, like in hexdump -C

    Each value in the 0x20..0x7E range (inclusive) is converted with
    ``six_unichr``, everything else is replaced with a dot.

    Example output for given binary string (right part)::

        92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|

    :param ba: iterable of integer byte values
    :returns: string holding one character per input value
    """
    chars = []
    for byte in ba:
        printable = 0x20 <= byte <= 0x7E
        chars.append(six_unichr(byte) if printable else ".")
    return "".join(chars)
7449 """Generate ``hexdump -C`` like output
7453 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7454 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7456 Result of that function is a generator of lines, where each line is
7461 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7462 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7463 " |i..?.....G...k.j|"]
7467 hexed = hexenc(raw).upper()
7468 addr, cols = 0, ["%08x " % 0]
7469 for i in six_xrange(0, len(hexed), 2):
7470 if i != 0 and i // 2 % 8 == 0:
7472 if i != 0 and i // 2 % 16 == 0:
7473 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7476 cols = ["%08x " % addr]
7477 cols.append(" " + hexed[i:i + 2])
7479 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7483 def browse(raw, obj, oid_maps=()):
7484 """Interactive browser
7486 :param bytes raw: binary data you decoded
7487 :param obj: decoded :py:class:`pyderasn.Obj`
7488 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7489 Its human readable form is printed when OID is met
7491 .. note:: `urwid <http://urwid.org/>`__ dependency required
7493 This browser is an interactive terminal application for browsing
7494 structures of your decoded ASN.1 objects. You can quit it with **q**
7495 key. It consists of three windows:
7498 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7499 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7500 **Left** key goes to constructed element above. **Plus**/**Minus**
7501 keys collapse/uncollapse constructed elements. **Space** toggles it
7503 window with various information about element. You can scroll it
7504 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7506 window with raw data hexdump and highlighted current element's
7507 contents. It automatically focuses on element's data. You can
7508 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7509 speed) keys. If element has explicit tag, then it also will be
7510 highlighted with different colour
7512 Window's header contains current decode path and progress bars with
7513 position in *info* and *hexdump* windows.
7515 If you press **d**, then current element will be saved in the
7516 current directory under its decode path name (adding ".0", ".1", etc
7517 suffix if such file already exists). **D** will save it with explicit tag.
7519 You can also invoke it with ``--browse`` command line argument.
7521 from copy import deepcopy
7522 from os.path import exists as path_exists
7525 class TW(urwid.TreeWidget):
7526 def __init__(self, state, *args, **kwargs):
7528 self.scrolled = {"info": False, "hexdump": False}
7529 super(TW, self).__init__(*args, **kwargs)
7532 pp = self.get_node().get_value()
7533 constructed = len(pp) > 1
7534 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7536 def _state_update(self):
7537 pp, _ = self._get_pp()
7538 self.state["decode_path"].set_text(
7539 ":".join(str(p) for p in pp.decode_path)
7541 lines = deepcopy(self.state["hexed"])
7543 def attr_set(i, attr):
7544 line = lines[i // 16]
7545 idx = 1 + (i - 16 * (i // 16))
7546 line[idx] = (attr, line[idx])
7548 if pp.expl_offset is not None:
7549 for i in six_xrange(
7551 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7553 attr_set(i, "select-expl")
7554 for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7555 attr_set(i, "select-value")
7556 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7557 self.state["hexdump"].set_focus(pp.offset // 16)
7558 self.state["hexdump"].set_focus_valign("middle")
7559 self.state["hexdump_bar"].set_completion(
7560 (100 * pp.offset // 16) //
7561 len(self.state["hexdump"]._body.positions())
7565 [("header", "Name: "), pp.obj_name],
7566 [("header", "Type: "), pp.asn1_type_name],
7567 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7568 [("header", "[TLV]len: "), "%d/%d/%d" % (
7569 pp.tlen, pp.llen, pp.vlen,
7571 [("header", "TLVlen: "), "%d" % sum((
7572 pp.tlen, pp.llen, pp.vlen,
7574 [("header", "Slice: "), "[%d:%d]" % (
7575 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7579 lines.append([("warning", "LENINDEF")])
7581 lines.append([("warning", "BER encoded")])
7583 lines.append([("warning", "BERed")])
7584 if pp.expl is not None:
7585 lines.append([("header", "EXPLICIT")])
7586 klass, _, num = pp.expl
7587 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7588 if pp.expl_offset is not None:
7589 lines.append([" Offset: %d" % pp.expl_offset])
7590 lines.append([" [TLV]len: %d/%d/%d" % (
7591 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7593 lines.append([" TLVlen: %d" % sum((
7594 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7596 lines.append([" Slice: [%d:%d]" % (
7598 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7600 if pp.impl is not None:
7601 klass, _, num = pp.impl
7603 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7606 lines.append(["OPTIONAL"])
7608 lines.append(["DEFAULT"])
7609 if len(pp.decode_path) > 0:
7610 ent = pp.decode_path[-1]
7611 if isinstance(ent, DecodePathDefBy):
7613 value = str(ent.defined_by)
7614 oid_name = find_oid_name(
7615 ent.defined_by.asn1_type_name, oid_maps, value,
7617 lines.append([("header", "DEFINED BY: "), "%s" % (
7618 value if oid_name is None
7619 else "%s (%s)" % (oid_name, value)
7622 if pp.value is not None:
7623 lines.append([("header", "Value: "), pp.value])
7625 len(oid_maps) > 0 and
7626 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7628 for oid_map in oid_maps:
7629 oid_name = oid_map.get(pp.value)
7630 if oid_name is not None:
7631 lines.append([("header", "Human: "), oid_name])
7633 if pp.asn1_type_name == Integer.asn1_type_name:
7635 ("header", "Decimal: "), "%d" % int(pp.obj),
7638 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7640 if pp.blob.__class__ == binary_type:
7641 blob = hexenc(pp.blob).upper()
7642 for i in six_xrange(0, len(blob), 32):
7643 lines.append([colonize_hex(blob[i:i + 32])])
7644 elif pp.blob.__class__ == tuple:
7645 lines.append([", ".join(pp.blob)])
7646 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7647 self.state["info_bar"].set_completion(0)
def selectable(self):
    """Track the current widget, then defer to the parent ``selectable()``

    If ``state["widget_current"]`` is not this widget, it is replaced
    with this widget, both ``scrolled`` flags are cleared and
    ``_state_update()`` is called.

    :returns: whatever the parent class ``selectable()`` returns
    """
    state = self.state
    if state["widget_current"] != self:
        state["widget_current"] = self
        # Neither window has been scrolled for the newly current widget
        self.scrolled.update(info=False, hexdump=False)
        self._state_update()
    return super(TW, self).selectable()
7657 def get_display_text(self):
7658 pp, constructed = self._get_pp()
7659 style = "constructed" if constructed else ""
7660 if len(pp.decode_path) == 0:
7661 return (style, pp.obj_name)
7662 if pp.asn1_type_name == "EOC":
7663 return ("eoc", "EOC")
7664 ent = pp.decode_path[-1]
7665 if isinstance(ent, DecodePathDefBy):
7666 value = str(ent.defined_by)
7667 oid_name = find_oid_name(
7668 ent.defined_by.asn1_type_name, oid_maps, value,
7670 return ("defby", "DEFBY:" + (
7671 value if oid_name is None else oid_name
7675 def _scroll(self, what, step):
7676 self.state[what]._invalidate()
7677 pos = self.state[what].focus_position
7678 if not self.scrolled[what]:
7679 self.scrolled[what] = True
7681 pos = max(0, pos + step)
7682 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7683 self.state[what].set_focus(pos)
7684 self.state[what].set_focus_valign("top")
7685 self.state[what + "_bar"].set_completion(
7686 (100 * pos) // len(self.state[what]._body.positions())
7689 def keypress(self, size, key):
7691 raise urwid.ExitMainLoop()
7694 self.expanded = not self.expanded
7695 self.update_expanded_icon()
7698 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7699 if key in hexdump_steps:
7700 self._scroll("hexdump", hexdump_steps[key])
7703 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7704 if key in info_steps:
7705 self._scroll("info", info_steps[key])
7708 if key in ("d", "D"):
7709 pp, _ = self._get_pp()
7710 dp = ":".join(str(p) for p in pp.decode_path)
7711 dp = dp.replace(" ", "_")
7714 if key == "d" or pp.expl_offset is None:
7715 data = self.state["raw"][pp.offset:(
7716 pp.offset + pp.tlen + pp.llen + pp.vlen
7719 data = self.state["raw"][pp.expl_offset:(
7720 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7724 def duplicate_path(dp, ctr):
7727 return "%s.%d" % (dp, ctr)
7730 if not path_exists(duplicate_path(dp, ctr)):
7733 dp = duplicate_path(dp, ctr)
7734 with open(dp, "wb") as fd:
7736 self.state["decode_path"].set_text(
7737 ("warning", "Saved to: " + dp)
7740 return super(TW, self).keypress(size, key)
7742 class PN(urwid.ParentNode):
7743 def __init__(self, state, value, *args, **kwargs):
7745 if not hasattr(value, "_fields"):
7747 super(PN, self).__init__(value, *args, **kwargs)
def load_widget(self):
    """Create the ``TW`` widget for this node

    :returns: ``TW`` built from this node's ``state`` and the node itself
    """
    node_state = self.state
    return TW(node_state, self)
7752 def load_child_keys(self):
7753 value = self.get_value()
7754 if hasattr(value, "_fields"):
7756 return range(len(value[1:]))
7758 def load_child_node(self, key):
7761 self.get_value()[key + 1],
7764 depth=self.get_depth() + 1,
7767 class LabeledPG(urwid.ProgressBar):
7768 def __init__(self, label, *args, **kwargs):
7770 super(LabeledPG, self).__init__(*args, **kwargs)
7773 return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7775 WinHexdump = urwid.ListBox([urwid.Text("")])
7776 WinInfo = urwid.ListBox([urwid.Text("")])
7777 WinDecodePath = urwid.Text("", "center")
7778 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7779 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7780 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7783 "hexed": list(hexdump(raw)),
7784 "widget_current": None,
7786 "info_bar": WinInfoBar,
7787 "hexdump": WinHexdump,
7788 "hexdump_bar": WinHexdumpBar,
7789 "decode_path": WinDecodePath,
7793 help_text = " ".join((
7795 "space:(un)collapse",
7796 "(pg)up/down/home/end:nav",
7797 "jkJK:hexdump hlHL:info",
7803 ("weight", 1, WinTree),
7804 ("weight", 2, urwid.Pile([
7805 urwid.LineBox(WinInfo),
7806 urwid.LineBox(WinHexdump),
7809 header=urwid.Columns([
7810 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7811 ("weight", 1, WinInfoBar),
7812 ("weight", 1, WinHexdumpBar),
7814 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7817 ("header", "bold", ""),
7818 ("constructed", "bold", ""),
7819 ("help", "light magenta", ""),
7820 ("warning", "light red", ""),
7821 ("defby", "light red", ""),
7822 ("eoc", "dark red", ""),
7823 ("select-value", "light green", ""),
7824 ("select-expl", "light red", ""),
7825 ("pg-normal", "", "light blue"),
7826 ("pg-complete", "black", "yellow"),
7831 def main(): # pragma: no cover
7833 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7834 parser.add_argument(
7838 help="Skip that number of bytes from the beginning",
7840 parser.add_argument(
7842 help="Python paths to dictionary with OIDs, comma separated",
7844 parser.add_argument(
7846 help="Python path to schema definition to use",
7848 parser.add_argument(
7849 "--defines-by-path",
7850 help="Python path to decoder's defines_by_path",
7852 parser.add_argument(
7854 action="store_true",
7855 help="Disallow BER encoding",
7857 parser.add_argument(
7858 "--print-decode-path",
7859 action="store_true",
7860 help="Print decode paths",
7862 parser.add_argument(
7863 "--decode-path-only",
7864 help="Print only specified decode path",
7866 parser.add_argument(
7868 action="store_true",
7869 help="Allow explicit tag out-of-bound",
7871 parser.add_argument(
7873 action="store_true",
7874 help="Turn on event generation mode",
7876 parser.add_argument(
7878 action="store_true",
7879 help="Start ASN.1 browser",
7881 parser.add_argument(
7883 type=argparse.FileType("rb"),
7884 help="Path to BER/CER/DER file you want to decode",
7886 args = parser.parse_args()
7888 args.RAWFile.seek(args.skip)
7889 raw = memoryview(args.RAWFile.read())
7890 args.RAWFile.close()
7892 raw = file_mmaped(args.RAWFile)[args.skip:]
7894 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7895 if args.oids else ()
7897 from functools import partial
7899 schema = obj_by_path(args.schema)
7900 pprinter = partial(pprint, big_blobs=True)
7902 schema, pprinter = generic_decoder()
7904 "bered": not args.nobered,
7905 "allow_expl_oob": args.allow_expl_oob,
7907 if args.defines_by_path is not None:
7908 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7910 obj, _ = schema().decode(raw, ctx=ctx)
7911 browse(raw, obj, oid_maps)
7912 from sys import exit as sys_exit
7914 from os import environ
7918 with_colours=environ.get("NO_COLOR") is None,
7919 with_decode_path=args.print_decode_path,
7921 () if args.decode_path_only is None else
7922 tuple(args.decode_path_only.split(":"))
7926 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7927 print(pprinter(obj, decode_path=decode_path))
7929 obj, tail = schema().decode(raw, ctx=ctx)
7930 print(pprinter(obj))
7932 print("\nTrailing data: %s" % hexenc(tail))
7935 if __name__ == "__main__":
7936 from pyderasn import *