3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2021 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/CER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER/CER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
358 Also it could be helpful to add quick ASN.1 pprinting command in your
359 pdb's configuration file::
361 alias pp1 import pyderasn ;; print(pyderasn.pprint(%1, oid_maps=(locals().get("OID_STR_TO_NAME", {}),)))
368 ASN.1 structures often have ANY and OCTET STRING fields, that are
369 DEFINED BY some previously met ObjectIdentifier. This library provides
370 ability to specify mapping between some OID and field that must be
371 decoded with specific specification.
378 :py:class:`pyderasn.ObjectIdentifier` field inside
379 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
380 necessary for decoding structures. For example, CMS (:rfc:`5652`)
383 class ContentInfo(Sequence):
385 ("contentType", ContentType(defines=((("content",), {
386 id_digestedData: DigestedData(),
387 id_signedData: SignedData(),
389 ("content", Any(expl=tag_ctxc(0))),
392 ``contentType`` field tells that it defines that ``content`` must be
393 decoded with ``SignedData`` specification, if ``contentType`` equals to
394 ``id-signedData``. The same applies to ``DigestedData``. If
395 ``contentType`` contains unknown OID, then no automatic decoding is
398 You can specify multiple fields, that will be autodecoded -- that is why
399 ``defines`` kwarg is a sequence. You can specify defined field
400 relatively or absolutely to current decode path. For example ``defines``
401 for AlgorithmIdentifier of X.509's
402 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
406 id_ecPublicKey: ECParameters(),
407 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
409 (("..", "subjectPublicKey"), {
410 id_rsaEncryption: RSAPublicKey(),
411 id_GostR3410_2001: OctetString(),
415 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
416 autodecode its parameters inside SPKI's algorithm and its public key
419 Following types can be automatically decoded (DEFINED BY):
421 * :py:class:`pyderasn.Any`
422 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
423 * :py:class:`pyderasn.OctetString`
424 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
425 ``Any``/``BitString``/``OctetString``-s
427 When any of those fields is automatically decoded, then ``.defined``
428 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
429 was defined, ``value`` contains corresponding decoded value. For example
430 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
432 .. _defines_by_path_ctx:
434 defines_by_path context option
435 ______________________________
437 Sometimes you either can not or do not want to explicitly set *defines*
438 in the schema. You can dynamically apply those definitions when calling
439 :py:meth:`pyderasn.Obj.decode` method.
441 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
442 value must be sequence of following tuples::
444 (decode_path, defines)
446 where ``decode_path`` is a tuple holding so-called decode path to the
447 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
448 ``defines``, holding exactly the same value as accepted in its
449 :ref:`keyword argument <defines>`.
451 For example, again for CMS, you want to automatically decode
452 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
453 structures it may hold. Also, automatically decode ``controlSequence``
456 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
459 ((("content",), {id_signedData: SignedData()}),),
464 DecodePathDefBy(id_signedData),
469 id_cct_PKIData: PKIData(),
470 id_cct_PKIResponse: PKIResponse(),
476 DecodePathDefBy(id_signedData),
479 DecodePathDefBy(id_cct_PKIResponse),
485 id_cmc_recipientNonce: RecipientNonce(),
486 id_cmc_senderNonce: SenderNonce(),
487 id_cmc_statusInfoV2: CMCStatusInfoV2(),
488 id_cmc_transactionId: TransactionId(),
493 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
494 First function is useful for path construction when some automatic
495 decoding is already done. ``any`` means literally any value it meet --
496 useful for SEQUENCE/SET OF-s.
503 By default PyDERASN accepts only DER encoded data. By default it encodes
504 to DER. But you can optionally enable BER decoding with setting
505 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
506 constructed primitive types should be parsed successfully.
508 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
509 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
510 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
511 ``UTCTime``, ``GeneralizedTime`` can contain it.
512 * If object has an indefinite length encoding, then its ``lenindef``
513 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
514 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
516 * If object has an indefinite length encoded explicit tag, then
517 ``expl_lenindef`` is set to True.
518 * If object has either any of BER-related encoding (explicit tag
519 indefinite length, object's indefinite length, BER-encoding) or any
520 underlying component has that kind of encoding, then ``bered``
521 attribute is set to True. For example SignedData CMS can have
522 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
523 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
525 EOC (end-of-contents) token's length is taken in advance in object's
528 .. _allow_expl_oob_ctx:
530 Allow explicit tag out-of-bound
531 -------------------------------
533 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
534 one value, more than one object. If you set ``allow_expl_oob`` context
535 option to True, then no error will be raised and that invalid encoding
536 will be silently further processed. But pay attention that offsets and
537 lengths will be invalid in that case.
541 This option should be used only for skipping some decode errors, just
542 to see the decoded structure somehow.
546 Streaming and dealing with huge structures
547 ------------------------------------------
554 ASN.1 structures can be huge, they can hold millions of objects inside
555 (for example Certificate Revocation Lists (CRL), holding revocation
556 state for every previously issued X.509 certificate). CACert.org's 8 MiB
557 CRL file takes more than half a gigabyte of memory to hold the decoded
560 If you just simply want to check the signature over the ``tbsCertList``,
561 you can create specialized schema with that field represented as
562 OctetString for example::
564 class TBSCertListFast(Sequence):
567 ("revokedCertificates", OctetString(
568 impl=SequenceOf.tag_default,
574 This allows you to quickly decode a few fields and check the signature
575 over the ``tbsCertList`` bytes.
577 But how can you get all certificate's serial number from it, after you
578 trust that CRL after signature validation? You can use so called
579 ``evgen`` (event generation) mode, to catch the events/facts of some
580 successful object decoding. Let's use command line capabilities::
582 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
583 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
584 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
585 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
586 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
587 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
588 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
589 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
590 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
592 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
593 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
594 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
596 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
597 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
598 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
599 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
600 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
601 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
603 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
604 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
605 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
606 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
607 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
608 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
609 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
610 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
611 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
612 0 [1,4,9145534] CertificateList SEQUENCE
614 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
615 decodes underlying values. It can not tell if SEQUENCE is decoded, so
616 the event of the upper level SEQUENCE is the last one we see.
617 ``version`` field is just a single INTEGER -- it is decoded and event is
618 fired immediately. Then we see that ``algorithm`` and ``parameters``
619 fields are decoded and only after them the ``signature`` SEQUENCE is
620 fired as a successfully decoded. There are 4 events for each revoked
621 certificate entry in that CRL: ``userCertificate`` serial number,
622 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
623 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
625 We can do that in our ordinary Python code and understand where we are
626 by looking at deterministically generated decode paths (do not forget
627 about useful ``--print-decode-path`` CLI option). We must use
628 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
629 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
630 obj, tail)`` tuples::
632 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
634 len(decode_path) == 4 and
635 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
636 decode_path[3] == "userCertificate"
638 print("serial number:", int(obj))
640 Virtually it does not take any memory except at least needed for single
641 object storage. You can easily use that mode to determine required
642 object ``.offset`` and ``.*len`` to be able to decode it separately, or
643 maybe verify signature upon it just by taking bytes by ``.offset`` and
646 .. _evgen_mode_upto_ctx:
651 There is full ability to get any kind of data from the CRL in the
652 example above. However it is not too convenient to get the whole
653 ``RevokedCertificate`` structure, that is pretty lightweight and one may
654 do not want to disassemble it. You can use ``evgen_mode_upto``
655 :ref:`ctx <ctx>` option that semantically equals to
656 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
657 mapped to any non-None value. If specified decode path is met, then any
658 subsequent objects won't be decoded in evgen mode. That allows us to
659 parse the CRL above with fully assembled ``RevokedCertificate``::
661 for decode_path, obj, _ in CertificateList().decode_evgen(
663 ctx={"evgen_mode_upto": (
664 (("tbsCertList", "revokedCertificates", any), True),
668 len(decode_path) == 3 and
669 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
671 print("serial number:", int(obj["userCertificate"]))
675 SEQUENCE/SET values with DEFAULT specified are automatically decoded
683 POSIX compliant systems have ``mmap`` syscall, giving ability to work
684 the memory mapped file. You can deal with the file like it was an
685 ordinary binary string, allowing you not to load it to the memory first.
686 Also you can use them as an input for OCTET STRING, taking no Python
687 memory for their storage.
689 There is convenient :py:func:`pyderasn.file_mmaped` function that
690 creates read-only memoryview on the file contents::
692 with open("huge", "rb") as fd:
693 raw = file_mmaped(fd)
694 obj = Something.decode(raw)
698 mmap-ed files in Python2.7 does not implement buffer protocol, so
699 memoryview won't work on them.
703 mmap maps the **whole** file. So it plays no role if you seek-ed it
704 before. Take the slice of the resulting memoryview with required
709 If you use ZFS as underlying storage, then pay attention that
710 currently most platforms does not deal good with ZFS ARC and ordinary
711 page cache used for mmaps. It can take twice the necessary size in
712 the memory: both in page cache and ZFS ARC.
717 We can parse any kind of data now, but how can we produce files
718 streamingly, without storing their encoded representation in memory?
719 SEQUENCE by default encodes in memory all its values, joins them in huge
720 binary string, just to know the exact size of SEQUENCE's value for
721 encoding it in TLV. DER requires you to know all exact sizes of the
724 You can use CER encoding mode, that slightly differs from the DER, but
725 does not require exact sizes knowledge, allowing streaming encoding
726 directly to some writer/buffer. Just use
727 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
728 encoded data will flow::
730 opener = io.open if PY2 else open
731 with opener("result", "wb") as fd:
732 obj.encode_cer(fd.write)
737 obj.encode_cer(buf.write)
739 If you do not want to create in-memory buffer every time, then you can
740 use :py:func:`pyderasn.encode_cer` function::
742 data = encode_cer(obj)
744 Remember that CER is **not valid** DER in most cases, so you **have to**
745 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
746 decoding. Also currently there is **no** validation that provided CER is
747 valid one -- you are sure that it has only valid BER encoding.
751 SET OF values can not be streamingly encoded, because they are
752 required to be sorted byte-by-byte. Big SET OF values still will take
753 much memory. Use neither SET nor SET OF values, as modern ASN.1
756 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
757 OCTET STRINGs! They will be streamingly copied from underlying file to
758 the buffer using 1 KB chunks.
760 Some structures require that some of the elements have to be forcefully
761 DER encoded. For example ``SignedData`` CMS requires you to encode
762 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
763 encode everything else in BER. You can tell any of the structures to be
764 forcefully encoded in DER during CER encoding, by specifying
765 ``der_forced=True`` attribute::
767 class Certificate(Sequence):
771 class SignedAttributes(SetOf):
773 bounds = (1, float("+inf"))
776 .. _agg_octet_string:
781 In most cases, huge quantity of binary data is stored as OCTET STRING.
782 CER encoding splits it on 1 KB chunks. BER allows splitting on various
783 levels of chunks inclusion::
785 SOME STRING[CONSTRUCTED]
786 OCTET STRING[CONSTRUCTED]
787 OCTET STRING[PRIMITIVE]
789 OCTET STRING[PRIMITIVE]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[PRIMITIVE]
798 OCTET STRING[PRIMITIVE]
800 OCTET STRING[CONSTRUCTED]
801 OCTET STRING[CONSTRUCTED]
802 OCTET STRING[PRIMITIVE]
805 You can not just take the offset and some ``.vlen`` of the STRING and
806 treat it as the payload. If you decode it without
807 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
808 and ``bytes()`` will give the whole payload contents.
810 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
811 small memory footprint. There is convenient
812 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
813 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
814 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
815 SHA512 digest of its ``eContent``::
817 fd = open("data.p7m", "rb")
818 raw = file_mmaped(fd)
819 ctx = {"bered": True}
820 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
821 if decode_path == ("content",):
825 raise ValueError("no content found")
826 hasher_state = sha512()
828 hasher_state.update(data)
830 evgens = SignedData().decode_evgen(
831 raw[content.offset:],
832 offset=content.offset,
835 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
837 digest = hasher_state.digest()
839 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
840 copy the payload (without BER/CER encoding interleaved overhead) in it.
841 Virtually it won't take memory more than for keeping small structures
842 and 1 KB binary chunks.
846 SEQUENCE OF iterators
847 _____________________
849 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
850 classes. The only difference with providing the full list of objects, is
851 that type and bounds checking is done during encoding process. Also
852 sequence's value will be emptied after encoding, forcing you to set its
855 This is very useful when you have to create some huge objects, like
856 CRLs, with thousands and millions of entities inside. You can write the
857 generator taking necessary data from the database and giving the
858 ``RevokedCertificate`` objects. Only binary representation of that
859 objects will take memory during DER encoding.
864 There is ability to do 2-pass encoding to DER, writing results directly
865 to specified writer (buffer, file, whatever). It could be 1.5+ times
866 slower than ordinary encoding, but it takes little memory for 1st pass
867 state storing. For example, 1st pass state for CACert.org's CRL with
868 ~416K of certificate entries takes nearly 3.5 MB of memory.
869 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
870 nearly 0.5 KB of memory.
872 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
873 iterators <seqof-iterators>` and write directly to opened file, then
874 there is very small memory footprint.
876 1st pass traverses through all the objects of the structure and returns
877 the size of DER encoded structure, together with 1st pass state object.
878 That state contains precalculated lengths for various objects inside the
883 fulllen, state = obj.encode1st()
885 2nd pass takes the writer and 1st pass state. It traverses through all
886 the objects again, but writes their encoded representation to the writer.
890 opener = io.open if PY2 else open
891 with opener("result", "wb") as fd:
892 obj.encode2nd(fd.write, iter(state))
896 You **MUST NOT** use 1st pass state if anything is changed in the
897 objects. It is intended to be used immediately after 1st pass is
900 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
901 have to reinitialize the values after the 1st pass. And you **have to**
902 be sure that the iterator gives exactly the same values as previously.
903 Yes, you have to run your iterator twice -- because this is two pass
906 If you want to encode to the memory, then you can use convenient
907 :py:func:`pyderasn.encode2pass` helper.
913 .. autofunction:: pyderasn.browse
917 .. autoclass:: pyderasn.Obj
925 .. autoclass:: pyderasn.Boolean
930 .. autoclass:: pyderasn.Integer
931 :members: __init__, named, tohex
935 .. autoclass:: pyderasn.BitString
936 :members: __init__, bit_len, named
940 .. autoclass:: pyderasn.OctetString
945 .. autoclass:: pyderasn.Null
950 .. autoclass:: pyderasn.ObjectIdentifier
955 .. autoclass:: pyderasn.Enumerated
959 .. autoclass:: pyderasn.CommonString
963 .. autoclass:: pyderasn.NumericString
967 .. autoclass:: pyderasn.PrintableString
968 :members: __init__, allow_asterisk, allow_ampersand
972 .. autoclass:: pyderasn.IA5String
976 .. autoclass:: pyderasn.VisibleString
980 .. autoclass:: pyderasn.UTCTime
981 :members: __init__, todatetime
985 .. autoclass:: pyderasn.GeneralizedTime
986 :members: __init__, todatetime
993 .. autoclass:: pyderasn.Choice
994 :members: __init__, choice, value
998 .. autoclass:: PrimitiveTypes
1002 .. autoclass:: pyderasn.Any
1010 .. autoclass:: pyderasn.Sequence
1015 .. autoclass:: pyderasn.Set
1020 .. autoclass:: pyderasn.SequenceOf
1025 .. autoclass:: pyderasn.SetOf
1031 .. autofunction:: pyderasn.abs_decode_path
1032 .. autofunction:: pyderasn.agg_octet_string
1033 .. autofunction:: pyderasn.ascii_visualize
1034 .. autofunction:: pyderasn.colonize_hex
1035 .. autofunction:: pyderasn.encode2pass
1036 .. autofunction:: pyderasn.encode_cer
1037 .. autofunction:: pyderasn.file_mmaped
1038 .. autofunction:: pyderasn.hexenc
1039 .. autofunction:: pyderasn.hexdec
1040 .. autofunction:: pyderasn.hexdump
1041 .. autofunction:: pyderasn.tag_encode
1042 .. autofunction:: pyderasn.tag_decode
1043 .. autofunction:: pyderasn.tag_ctxp
1044 .. autofunction:: pyderasn.tag_ctxc
1045 .. autoclass:: pyderasn.DecodeError
1047 .. autoclass:: pyderasn.NotEnoughData
1048 .. autoclass:: pyderasn.ExceedingData
1049 .. autoclass:: pyderasn.LenIndefForm
1050 .. autoclass:: pyderasn.TagMismatch
1051 .. autoclass:: pyderasn.InvalidLength
1052 .. autoclass:: pyderasn.InvalidOID
1053 .. autoclass:: pyderasn.ObjUnknown
1054 .. autoclass:: pyderasn.ObjNotReady
1055 .. autoclass:: pyderasn.InvalidValueType
1056 .. autoclass:: pyderasn.BoundsError
1063 You can decode DER/BER files using command line abilities::
1065 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1067 If there is no schema for your file, then you can try parsing it without,
1068 but of course IMPLICIT tags will often make it impossible. But result is
1069 good enough for the certificate above::
1071 $ python -m pyderasn path/to/file
1072 0 [1,3,1604] . >: SEQUENCE OF
1073 4 [1,3,1453] . . >: SEQUENCE OF
1074 8 [0,0, 5] . . . . >: [0] ANY
1075 . . . . . A0:03:02:01:02
1076 13 [1,1, 3] . . . . >: INTEGER 61595
1077 18 [1,1, 13] . . . . >: SEQUENCE OF
1078 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1079 31 [1,1, 0] . . . . . . >: NULL
1080 33 [1,3, 274] . . . . >: SEQUENCE OF
1081 37 [1,1, 11] . . . . . . >: SET OF
1082 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1083 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1084 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1086 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1087 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1088 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1089 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1090 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1091 . . . . . . . . . 61:2E:63:6F:6D:2F
1092 1461 [1,1, 13] . . >: SEQUENCE OF
1093 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1094 1474 [1,1, 0] . . . . >: NULL
1095 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1096 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1097 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1103 If you have got dictionaries with ObjectIdentifiers, like example one
1104 from ``tests/test_crts.py``::
1107 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1108 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1110 "2.5.4.10": "id-at-organizationName",
1111 "2.5.4.11": "id-at-organizationalUnitName",
1114 then you can pass it to pretty printer to see human readable OIDs::
1116 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1118 37 [1,1, 11] . . . . . . >: SET OF
1119 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1120 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1121 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1122 50 [1,1, 18] . . . . . . >: SET OF
1123 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1124 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1125 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1126 70 [1,1, 18] . . . . . . >: SET OF
1127 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1128 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1129 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1135 Each decoded element has so-called decode path: sequence of structure
1136 names it is passing during the decode process. Each element has its own
1137 unique path inside the whole ASN.1 tree. You can print it out with
1138 ``--print-decode-path`` option::
1140 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1141 0 [1,3,1604] Certificate SEQUENCE []
1142 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1143 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1144 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1145 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1146 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1147 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1149 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1150 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1151 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1152 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1153 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1154 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1155 . . . . . . . 13:02:45:53
1156 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1159 Now you can print only the specified tree, for example signature algorithm::
1161 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1162 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1163 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1164 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1168 from array import array
1169 from codecs import getdecoder
1170 from codecs import getencoder
1171 from collections import namedtuple
1172 from collections import OrderedDict
1173 from copy import copy
1174 from datetime import datetime
1175 from datetime import timedelta
1176 from io import BytesIO
1177 from math import ceil
1178 from operator import attrgetter
1179 from string import ascii_letters
1180 from string import digits
1181 from sys import maxsize as sys_maxsize
1182 from sys import version_info
1183 from unicodedata import category as unicat
1185 from six import add_metaclass
1186 from six import binary_type
1187 from six import byte2int
1188 from six import indexbytes
1189 from six import int2byte
1190 from six import integer_types
1191 from six import iterbytes
1192 from six import iteritems
1193 from six import itervalues
1195 from six import string_types
1196 from six import text_type
1197 from six import unichr as six_unichr
1198 from six.moves import xrange as six_xrange
1202 from termcolor import colored
1203 except ImportError: # pragma: no cover
1204 def colored(what, *args, **kwargs):
1255 "TagClassApplication",
1258 "TagClassUniversal",
1259 "TagFormConstructed",
1270 TagClassUniversal = 0
1271 TagClassApplication = 1 << 6
1272 TagClassContext = 1 << 7
1273 TagClassPrivate = 1 << 6 | 1 << 7
1274 TagFormPrimitive = 0
1275 TagFormConstructed = 1 << 5
1277 TagClassContext: "",
1278 TagClassApplication: "APPLICATION ",
1279 TagClassPrivate: "PRIVATE ",
1280 TagClassUniversal: "UNIV ",
1284 LENINDEF = b"\x80" # length indefinite mark
1285 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1286 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1287 SET01 = frozenset("01")
1288 DECIMALS = frozenset(digits)
1289 DECIMAL_SIGNS = ".,"
1290 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1293 def file_mmaped(fd):
1294 """Make mmap-ed memoryview for reading from file
1296 :param fd: file object
1297 :returns: memoryview over read-only mmap-ing of the whole file
1301 It is known to work under neither Python 2.x nor Windows.
1304 return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ))
1308 if not set(value) <= DECIMALS:
1309 raise ValueError("non-pure integer")
1313 def fractions2float(fractions_raw):
1314 pureint(fractions_raw)
1315 return float("0." + fractions_raw)
1318 def get_def_by_path(defines_by_path, sub_decode_path):
1319 """Get define by decode path
1321 for path, define in defines_by_path:
1322 if len(path) != len(sub_decode_path):
1324 for p1, p2 in zip(path, sub_decode_path):
1325 if (p1 is not any) and (p1 != p2):
1331 ########################################################################
1333 ########################################################################
1335 class ASN1Error(ValueError):
1339 class DecodeError(ASN1Error):
1340 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1342 :param str msg: reason of decode failing
1343 :param klass: optional exact DecodeError inherited class (like
1344 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1345 :py:exc:`InvalidLength`)
1346 :param decode_path: tuple of strings. It contains human
1347 readable names of the fields through which
1348 decoding process has passed
1349 :param int offset: binary offset where failure happened
1351 super(DecodeError, self).__init__()
1354 self.decode_path = decode_path
1355 self.offset = offset
1360 "" if self.klass is None else self.klass.__name__,
1362 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1363 if len(self.decode_path) > 0 else ""
1365 ("(at %d)" % self.offset) if self.offset > 0 else "",
1371 return "%s(%s)" % (self.__class__.__name__, self)
1374 class NotEnoughData(DecodeError):
1378 class ExceedingData(ASN1Error):
1379 def __init__(self, nbytes):
1380 super(ExceedingData, self).__init__()
1381 self.nbytes = nbytes
1384 return "%d trailing bytes" % self.nbytes
1387 return "%s(%s)" % (self.__class__.__name__, self)
1390 class LenIndefForm(DecodeError):
1394 class TagMismatch(DecodeError):
1398 class InvalidLength(DecodeError):
1402 class InvalidOID(DecodeError):
1406 class ObjUnknown(ASN1Error):
1407 def __init__(self, name):
1408 super(ObjUnknown, self).__init__()
1412 return "object is unknown: %s" % self.name
1415 return "%s(%s)" % (self.__class__.__name__, self)
1418 class ObjNotReady(ASN1Error):
1419 def __init__(self, name):
1420 super(ObjNotReady, self).__init__()
1424 return "object is not ready: %s" % self.name
1427 return "%s(%s)" % (self.__class__.__name__, self)
1430 class InvalidValueType(ASN1Error):
1431 def __init__(self, expected_types):
1432 super(InvalidValueType, self).__init__()
1433 self.expected_types = expected_types
1436 return "invalid value type, expected: %s" % ", ".join(
1437 [repr(t) for t in self.expected_types]
1441 return "%s(%s)" % (self.__class__.__name__, self)
1444 class BoundsError(ASN1Error):
1445 def __init__(self, bound_min, value, bound_max):
1446 super(BoundsError, self).__init__()
1447 self.bound_min = bound_min
1449 self.bound_max = bound_max
1452 return "unsatisfied bounds: %s <= %s <= %s" % (
1459 return "%s(%s)" % (self.__class__.__name__, self)
1462 ########################################################################
1464 ########################################################################
1466 _hexdecoder = getdecoder("hex")
1467 _hexencoder = getencoder("hex")
1471 """Binary data to hexadecimal string convert
1473 return _hexdecoder(data)[0]
1477 """Hexadecimal string to binary data convert
1479 return _hexencoder(data)[0].decode("ascii")
1482 def int_bytes_len(num, byte_len=8):
1485 return int(ceil(float(num.bit_length()) / byte_len))
1488 def zero_ended_encode(num):
1489 octets = bytearray(int_bytes_len(num, 7))
1491 octets[i] = num & 0x7F
1495 octets[i] = 0x80 | (num & 0x7F)
1498 return bytes(octets)
1501 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1502 """Encode tag to binary form
1504 :param int num: tag's number
1505 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1506 :py:data:`pyderasn.TagClassContext`,
1507 :py:data:`pyderasn.TagClassApplication`,
1508 :py:data:`pyderasn.TagClassPrivate`)
1509 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1510 :py:data:`pyderasn.TagFormConstructed`)
1514 return int2byte(klass | form | num)
1515 # [XX|X|11111][1.......][1.......] ... [0.......]
1516 return int2byte(klass | form | 31) + zero_ended_encode(num)
1519 def tag_decode(tag):
1520 """Decode tag from binary form
1524 No validation is performed, assuming that it has already passed.
1526 It returns tuple with three integers, as
1527 :py:func:`pyderasn.tag_encode` accepts.
1529 first_octet = byte2int(tag)
1530 klass = first_octet & 0xC0
1531 form = first_octet & 0x20
1532 if first_octet & 0x1F < 0x1F:
1533 return (klass, form, first_octet & 0x1F)
1535 for octet in iterbytes(tag[1:]):
1538 return (klass, form, num)
1542 """Create CONTEXT PRIMITIVE tag
1544 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1548 """Create CONTEXT CONSTRUCTED tag
1550 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1553 def tag_strip(data):
1554 """Take off tag from the data
1556 :returns: (encoded tag, tag length, remaining data)
1559 raise NotEnoughData("no data at all")
1560 if byte2int(data) & 0x1F < 31:
1561 return data[:1], 1, data[1:]
1566 raise DecodeError("unfinished tag")
1567 if indexbytes(data, i) & 0x80 == 0:
1569 if i == 1 and indexbytes(data, 1) < 0x1F:
1570 raise DecodeError("unexpected long form")
1571 if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1572 raise DecodeError("leading zero byte in tag value")
1574 return data[:i], i, data[i:]
1580 octets = bytearray(int_bytes_len(l) + 1)
1581 octets[0] = 0x80 | (len(octets) - 1)
1582 for i in six_xrange(len(octets) - 1, 0, -1):
1583 octets[i] = l & 0xFF
1585 return bytes(octets)
1588 def len_decode(data):
1591 :returns: (decoded length, length's length, remaining data)
1592 :raises LenIndefForm: if indefinite form encoding is met
1595 raise NotEnoughData("no data at all")
1596 first_octet = byte2int(data)
1597 if first_octet & 0x80 == 0:
1598 return first_octet, 1, data[1:]
1599 octets_num = first_octet & 0x7F
1600 if octets_num + 1 > len(data):
1601 raise NotEnoughData("encoded length is longer than data")
1603 raise LenIndefForm()
1604 if byte2int(data[1:]) == 0:
1605 raise DecodeError("leading zeros")
1607 for v in iterbytes(data[1:1 + octets_num]):
1610 raise DecodeError("long form instead of short one")
1611 return l, 1 + octets_num, data[1 + octets_num:]
1614 LEN0 = len_encode(0)
1615 LEN1 = len_encode(1)
1616 LEN1K = len_encode(1000)
1620 """How many bytes length field will take
1624 if l < 256: # 1 << 8
1626 if l < 65536: # 1 << 16
1628 if l < 16777216: # 1 << 24
1630 if l < 4294967296: # 1 << 32
1632 if l < 1099511627776: # 1 << 40
1634 if l < 281474976710656: # 1 << 48
1636 if l < 72057594037927936: # 1 << 56
1638 raise OverflowError("too big length")
1641 def write_full(writer, data):
1642 """Fully write provided data
1644 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1646 BytesIO does not guarantee that the whole data will be written at
1647 once. That function write everything provided, raising an error if
1648 ``writer`` returns None.
1650 data = memoryview(data)
1652 while written != len(data):
1653 n = writer(data[written:])
1655 raise ValueError("can not write to buf")
1659 # If it is 64-bit system, then use compact 64-bit array of unsigned
1660 # longs. Use an ordinary list with universal integers otherwise, that
1662 if sys_maxsize > 2 ** 32:
1663 def state_2pass_new():
1666 def state_2pass_new():
1670 ########################################################################
1672 ########################################################################
1674 class AutoAddSlots(type):
1675 def __new__(cls, name, bases, _dict):
1676 _dict["__slots__"] = _dict.get("__slots__", ())
1677 return type.__new__(cls, name, bases, _dict)
1680 BasicState = namedtuple("BasicState", (
1693 ), **NAMEDTUPLE_KWARGS)
1696 @add_metaclass(AutoAddSlots)
1698 """Common ASN.1 object class
1700 All ASN.1 types are inherited from it. It has metaclass that
1701 automatically adds ``__slots__`` to all inherited classes.
1726 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1727 self._expl = getattr(self, "expl", None) if expl is None else expl
1728 if self.tag != self.tag_default and self._expl is not None:
1729 raise ValueError("implicit and explicit tags can not be set simultaneously")
1730 if self.tag is None:
1731 self._tag_order = None
1733 tag_class, _, tag_num = tag_decode(
1734 self.tag if self._expl is None else self._expl
1736 self._tag_order = (tag_class, tag_num)
1737 if default is not None:
1739 self.optional = optional
1740 self.offset, self.llen, self.vlen = _decoded
1742 self.expl_lenindef = False
1743 self.lenindef = False
1744 self.ber_encoded = False
1747 def ready(self): # pragma: no cover
1748 """Is object ready to be encoded?
1750 raise NotImplementedError()
1752 def _assert_ready(self):
1754 raise ObjNotReady(self.__class__.__name__)
1758 """Is either object or any elements inside is BER encoded?
1760 return self.expl_lenindef or self.lenindef or self.ber_encoded
1764 """Is object decoded?
1766 return (self.llen + self.vlen) > 0
1768 def __getstate__(self): # pragma: no cover
1769 """Used for making safe to be mutable pickleable copies
1771 raise NotImplementedError()
1773 def __setstate__(self, state):
1774 if state.version != __version__:
1775 raise ValueError("data is pickled by different PyDERASN version")
1776 self.tag = state.tag
1777 self._tag_order = state.tag_order
1778 self._expl = state.expl
1779 self.default = state.default
1780 self.optional = state.optional
1781 self.offset = state.offset
1782 self.llen = state.llen
1783 self.vlen = state.vlen
1784 self.expl_lenindef = state.expl_lenindef
1785 self.lenindef = state.lenindef
1786 self.ber_encoded = state.ber_encoded
1789 def tag_order(self):
1790 """Tag's (class, number) used for DER/CER sorting
1792 return self._tag_order
1795 def tag_order_cer(self):
1796 return self.tag_order
1800 """.. seealso:: :ref:`decoding`
1802 return len(self.tag)
1806 """.. seealso:: :ref:`decoding`
1808 return self.tlen + self.llen + self.vlen
1810 def __str__(self): # pragma: no cover
1811 return self.__bytes__() if PY2 else self.__unicode__()
1813 def __ne__(self, their):
1814 return not(self == their)
1816 def __gt__(self, their): # pragma: no cover
1817 return not(self < their)
1819 def __le__(self, their): # pragma: no cover
1820 return (self == their) or (self < their)
1822 def __ge__(self, their): # pragma: no cover
1823 return (self == their) or (self > their)
1825 def _encode(self): # pragma: no cover
1826 raise NotImplementedError()
1828 def _encode_cer(self, writer):
1829 write_full(writer, self._encode())
1831 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1832 yield NotImplemented
1834 def _encode1st(self, state):
1835 raise NotImplementedError()
1837 def _encode2nd(self, writer, state_iter):
1838 raise NotImplementedError()
1841 """DER encode the structure
1843 :returns: DER representation
1845 raw = self._encode()
1846 if self._expl is None:
1848 return b"".join((self._expl, len_encode(len(raw)), raw))
1850 def encode1st(self, state=None):
1851 """Do the 1st pass of 2-pass encoding
1853 :rtype: (int, array("L"))
1854 :returns: full length of encoded data and precalculated various
1858 state = state_2pass_new()
1859 if self._expl is None:
1860 return self._encode1st(state)
1862 idx = len(state) - 1
1863 vlen, _ = self._encode1st(state)
1865 fulllen = len(self._expl) + len_size(vlen) + vlen
1866 return fulllen, state
1868 def encode2nd(self, writer, state_iter):
1869 """Do the 2nd pass of 2-pass encoding
1871 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1872 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1874 if self._expl is None:
1875 self._encode2nd(writer, state_iter)
1877 write_full(writer, self._expl + len_encode(next(state_iter)))
1878 self._encode2nd(writer, state_iter)
1880 def encode_cer(self, writer):
1881 """CER encode the structure to specified writer
1883 :param writer: must comply with ``io.RawIOBase.write``
1884 behaviour. It takes slice to be written and
1885 returns number of bytes processed. If it returns
1886 None, then exception will be raised
1888 if self._expl is not None:
1889 write_full(writer, self._expl + LENINDEF)
1890 if getattr(self, "der_forced", False):
1891 write_full(writer, self._encode())
1893 self._encode_cer(writer)
1894 if self._expl is not None:
1895 write_full(writer, EOC)
1897 def hexencode(self):
1898 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1900 return hexenc(self.encode())
1910 _ctx_immutable=True,
1914 :param data: either binary or memoryview
1915 :param int offset: initial data's offset
1916 :param bool leavemm: do we need to leave memoryview of remaining
1917 data as is, or convert it to bytes otherwise
1918 :param decode_path: current decode path (tuples of strings,
1919 possibly with DecodePathDefBy) with will be
1920 the root for all underlying objects
1921 :param ctx: optional :ref:`context <ctx>` governing decoding process
1922 :param bool tag_only: decode only the tag, without length and
1923 contents (used only in Choice and Set
1924 structures, trying to determine if tag satisfies
1926 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1928 :returns: (Obj, remaining data)
1930 .. seealso:: :ref:`decoding`
1932 result = next(self.decode_evgen(
1944 _, obj, tail = result
1955 _ctx_immutable=True,
1958 """Decode with evgen mode on
1960 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1961 it returns the generator producing ``(decode_path, obj, tail)``
1963 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1967 elif _ctx_immutable:
1969 tlv = memoryview(data)
1972 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1975 if self._expl is None:
1976 for result in self._decode(
1979 decode_path=decode_path,
1982 evgen_mode=_evgen_mode,
1987 _decode_path, obj, tail = result
1988 if _decode_path is not decode_path:
1992 t, tlen, lv = tag_strip(tlv)
1993 except DecodeError as err:
1994 raise err.__class__(
1996 klass=self.__class__,
1997 decode_path=decode_path,
2002 klass=self.__class__,
2003 decode_path=decode_path,
2007 l, llen, v = len_decode(lv)
2008 except LenIndefForm as err:
2009 if not ctx.get("bered", False):
2010 raise err.__class__(
2012 klass=self.__class__,
2013 decode_path=decode_path,
2017 offset += tlen + llen
2018 for result in self._decode(
2021 decode_path=decode_path,
2024 evgen_mode=_evgen_mode,
2026 if tag_only: # pragma: no cover
2029 _decode_path, obj, tail = result
2030 if _decode_path is not decode_path:
2032 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2033 if eoc_expected.tobytes() != EOC:
2036 klass=self.__class__,
2037 decode_path=decode_path,
2041 obj.expl_lenindef = True
2042 except DecodeError as err:
2043 raise err.__class__(
2045 klass=self.__class__,
2046 decode_path=decode_path,
2051 raise NotEnoughData(
2052 "encoded length is longer than data",
2053 klass=self.__class__,
2054 decode_path=decode_path,
2057 for result in self._decode(
2059 offset=offset + tlen + llen,
2060 decode_path=decode_path,
2063 evgen_mode=_evgen_mode,
2065 if tag_only: # pragma: no cover
2068 _decode_path, obj, tail = result
2069 if _decode_path is not decode_path:
2071 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2073 "explicit tag out-of-bound, longer than data",
2074 klass=self.__class__,
2075 decode_path=decode_path,
2078 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2080 def decod(self, data, offset=0, decode_path=(), ctx=None):
2081 """Decode the data, check that tail is empty
2083 :raises ExceedingData: if tail is not empty
2085 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2086 (decode without tail) that also checks that there is no
2089 obj, tail = self.decode(
2092 decode_path=decode_path,
2097 raise ExceedingData(len(tail))
2100 def hexdecode(self, data, *args, **kwargs):
2101 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2103 return self.decode(hexdec(data), *args, **kwargs)
2105 def hexdecod(self, data, *args, **kwargs):
2106 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2108 return self.decod(hexdec(data), *args, **kwargs)
2112 """.. seealso:: :ref:`decoding`
2114 return self._expl is not None
2118 """.. seealso:: :ref:`decoding`
2123 def expl_tlen(self):
2124 """.. seealso:: :ref:`decoding`
2126 return len(self._expl)
2129 def expl_llen(self):
2130 """.. seealso:: :ref:`decoding`
2132 if self.expl_lenindef:
2134 return len(len_encode(self.tlvlen))
2137 def expl_offset(self):
2138 """.. seealso:: :ref:`decoding`
2140 return self.offset - self.expl_tlen - self.expl_llen
2143 def expl_vlen(self):
2144 """.. seealso:: :ref:`decoding`
2149 def expl_tlvlen(self):
2150 """.. seealso:: :ref:`decoding`
2152 return self.expl_tlen + self.expl_llen + self.expl_vlen
2155 def fulloffset(self):
2156 """.. seealso:: :ref:`decoding`
2158 return self.expl_offset if self.expled else self.offset
2162 """.. seealso:: :ref:`decoding`
2164 return self.expl_tlvlen if self.expled else self.tlvlen
2166 def pps_lenindef(self, decode_path):
2167 if self.lenindef and not (
2168 getattr(self, "defined", None) is not None and
2169 self.defined[1].lenindef
2172 asn1_type_name="EOC",
2174 decode_path=decode_path,
2176 self.offset + self.tlvlen -
2177 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2185 if self.expl_lenindef:
2187 asn1_type_name="EOC",
2188 obj_name="EXPLICIT",
2189 decode_path=decode_path,
2190 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2199 def encode_cer(obj):
2200 """Encode to CER in memory buffer
2202 :returns bytes: memory buffer contents
2205 obj.encode_cer(buf.write)
2206 return buf.getvalue()
2209 def encode2pass(obj):
2210 """Encode (2-pass mode) to DER in memory buffer
2212 :returns bytes: memory buffer contents
2215 _, state = obj.encode1st()
2216 obj.encode2nd(buf.write, iter(state))
2217 return buf.getvalue()
2220 class DecodePathDefBy(object):
2221 """DEFINED BY representation inside decode path
2223 __slots__ = ("defined_by",)
2225 def __init__(self, defined_by):
2226 self.defined_by = defined_by
2228 def __ne__(self, their):
2229 return not(self == their)
2231 def __eq__(self, their):
2232 if not isinstance(their, self.__class__):
2234 return self.defined_by == their.defined_by
2237 return "DEFINED BY " + str(self.defined_by)
2240 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2243 ########################################################################
2245 ########################################################################
2247 PP = namedtuple("PP", (
2270 ), **NAMEDTUPLE_KWARGS)
2275 asn1_type_name="unknown",
2292 expl_lenindef=False,
2323 def _colourize(what, colour, with_colours, attrs=("bold",)):
2324 return colored(what, colour, attrs=attrs) if with_colours else what
2327 def colonize_hex(hexed):
2328 """Separate hexadecimal string with colons
2330 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2333 def find_oid_name(asn1_type_name, oid_maps, value):
2334 if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2335 for oid_map in oid_maps:
2336 oid_name = oid_map.get(value)
2337 if oid_name is not None:
2348 with_decode_path=False,
2349 decode_path_len_decrease=0,
2356 " " if pp.expl_offset is None else
2357 ("-%d" % (pp.offset - pp.expl_offset))
2359 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2361 col = _colourize(col, "red", with_colours, ())
2362 col += _colourize("B", "red", with_colours) if pp.bered else " "
2364 col = "[%d,%d,%4d]%s" % (
2365 pp.tlen, pp.llen, pp.vlen,
2366 LENINDEF_PP_CHAR if pp.lenindef else " "
2368 col = _colourize(col, "green", with_colours, ())
2370 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2371 if decode_path_len > 0:
2372 cols.append(" ." * decode_path_len)
2373 ent = pp.decode_path[-1]
2374 if isinstance(ent, DecodePathDefBy):
2375 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2376 value = str(ent.defined_by)
2377 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2378 if oid_name is None:
2379 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2381 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2383 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2384 if pp.expl is not None:
2385 klass, _, num = pp.expl
2386 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2387 cols.append(_colourize(col, "blue", with_colours))
2388 if pp.impl is not None:
2389 klass, _, num = pp.impl
2390 col = "[%s%d]" % (TagClassReprs[klass], num)
2391 cols.append(_colourize(col, "blue", with_colours))
2392 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2393 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2395 cols.append(_colourize("BER", "red", with_colours))
2396 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2397 if pp.value is not None:
2399 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2400 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2401 if oid_name is not None:
2402 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2403 if pp.asn1_type_name == Integer.asn1_type_name:
2404 cols.append(_colourize(
2405 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2408 if pp.blob.__class__ == binary_type:
2409 cols.append(hexenc(pp.blob))
2410 elif pp.blob.__class__ == tuple:
2411 cols.append(", ".join(pp.blob))
2413 cols.append(_colourize("OPTIONAL", "red", with_colours))
2415 cols.append(_colourize("DEFAULT", "red", with_colours))
2416 if with_decode_path:
2417 cols.append(_colourize(
2418 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2422 return " ".join(cols)
2425 def pp_console_blob(pp, decode_path_len_decrease=0):
2426 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2427 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2428 if decode_path_len > 0:
2429 cols.append(" ." * (decode_path_len + 1))
2430 if pp.blob.__class__ == binary_type:
2431 blob = hexenc(pp.blob).upper()
2432 for i in six_xrange(0, len(blob), 32):
2433 chunk = blob[i:i + 32]
2434 yield " ".join(cols + [colonize_hex(chunk)])
2435 elif pp.blob.__class__ == tuple:
2436 yield " ".join(cols + [", ".join(pp.blob)])
2444 with_decode_path=False,
2445 decode_path_only=(),
2448 """Pretty print object
2450 :param Obj obj: object you want to pretty print
2451 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2452 Its human readable form is printed when OID is met
2453 :param big_blobs: if large binary objects are met (like OctetString
2454 values), do we need to print them too, on separate
2456 :param with_colours: colourize output, if ``termcolor`` library
2458 :param with_decode_path: print decode path
2459 :param decode_path_only: print only that specified decode path
2461 def _pprint_pps(pps):
2463 if hasattr(pp, "_fields"):
2465 decode_path_only != () and
2467 str(p) for p in pp.decode_path[:len(decode_path_only)]
2468 ) != decode_path_only
2472 yield pp_console_row(
2477 with_colours=with_colours,
2478 with_decode_path=with_decode_path,
2479 decode_path_len_decrease=len(decode_path_only),
2481 for row in pp_console_blob(
2483 decode_path_len_decrease=len(decode_path_only),
2487 yield pp_console_row(
2492 with_colours=with_colours,
2493 with_decode_path=with_decode_path,
2494 decode_path_len_decrease=len(decode_path_only),
2497 for row in _pprint_pps(pp):
2499 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2502 ########################################################################
2503 # ASN.1 primitive types
2504 ########################################################################
2506 BooleanState = namedtuple(
2508 BasicState._fields + ("value",),
2514 """``BOOLEAN`` boolean type
2516 >>> b = Boolean(True)
2518 >>> b == Boolean(True)
2524 tag_default = tag_encode(1)
2525 asn1_type_name = "BOOLEAN"
2537 :param value: set the value. Either boolean type, or
2538 :py:class:`pyderasn.Boolean` object
2539 :param bytes impl: override default tag with ``IMPLICIT`` one
2540 :param bytes expl: override default tag with ``EXPLICIT`` one
2541 :param default: set default value. Type same as in ``value``
2542 :param bool optional: is object ``OPTIONAL`` in sequence
2544 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2545 self._value = None if value is None else self._value_sanitize(value)
2546 if default is not None:
2547 default = self._value_sanitize(default)
2548 self.default = self.__class__(
2554 self._value = default
2556 def _value_sanitize(self, value):
2557 if value.__class__ == bool:
2559 if issubclass(value.__class__, Boolean):
2561 raise InvalidValueType((self.__class__, bool))
2565 return self._value is not None
2567 def __getstate__(self):
2568 return BooleanState(
2584 def __setstate__(self, state):
2585 super(Boolean, self).__setstate__(state)
2586 self._value = state.value
2588 def __nonzero__(self):
2589 self._assert_ready()
2593 self._assert_ready()
2596 def __eq__(self, their):
2597 if their.__class__ == bool:
2598 return self._value == their
2599 if not issubclass(their.__class__, Boolean):
2602 self._value == their._value and
2603 self.tag == their.tag and
2604 self._expl == their._expl
2615 return self.__class__(
2617 impl=self.tag if impl is None else impl,
2618 expl=self._expl if expl is None else expl,
2619 default=self.default if default is None else default,
2620 optional=self.optional if optional is None else optional,
2624 self._assert_ready()
2625 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2627 def _encode1st(self, state):
2628 return len(self.tag) + 2, state
2630 def _encode2nd(self, writer, state_iter):
2631 self._assert_ready()
2632 write_full(writer, self._encode())
2634 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2636 t, _, lv = tag_strip(tlv)
2637 except DecodeError as err:
2638 raise err.__class__(
2640 klass=self.__class__,
2641 decode_path=decode_path,
2646 klass=self.__class__,
2647 decode_path=decode_path,
2654 l, _, v = len_decode(lv)
2655 except DecodeError as err:
2656 raise err.__class__(
2658 klass=self.__class__,
2659 decode_path=decode_path,
2663 raise InvalidLength(
2664 "Boolean's length must be equal to 1",
2665 klass=self.__class__,
2666 decode_path=decode_path,
2670 raise NotEnoughData(
2671 "encoded length is longer than data",
2672 klass=self.__class__,
2673 decode_path=decode_path,
2676 first_octet = byte2int(v)
2678 if first_octet == 0:
2680 elif first_octet == 0xFF:
2682 elif ctx.get("bered", False):
2687 "unacceptable Boolean value",
2688 klass=self.__class__,
2689 decode_path=decode_path,
2692 obj = self.__class__(
2696 default=self.default,
2697 optional=self.optional,
2698 _decoded=(offset, 1, 1),
2700 obj.ber_encoded = ber_encoded
2701 yield decode_path, obj, v[1:]
2704 return pp_console_row(next(self.pps()))
2706 def pps(self, decode_path=()):
2709 asn1_type_name=self.asn1_type_name,
2710 obj_name=self.__class__.__name__,
2711 decode_path=decode_path,
2712 value=str(self._value) if self.ready else None,
2713 optional=self.optional,
2714 default=self == self.default,
2715 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2716 expl=None if self._expl is None else tag_decode(self._expl),
2721 expl_offset=self.expl_offset if self.expled else None,
2722 expl_tlen=self.expl_tlen if self.expled else None,
2723 expl_llen=self.expl_llen if self.expled else None,
2724 expl_vlen=self.expl_vlen if self.expled else None,
2725 expl_lenindef=self.expl_lenindef,
2726 ber_encoded=self.ber_encoded,
2729 for pp in self.pps_lenindef(decode_path):
2733 IntegerState = namedtuple(
2735 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2741 """``INTEGER`` integer type
2743 >>> b = Integer(-123)
2745 >>> b == Integer(-123)
2750 >>> Integer(2, bounds=(1, 3))
2752 >>> Integer(5, bounds=(1, 3))
2753 Traceback (most recent call last):
2754 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2758 class Version(Integer):
2765 >>> v = Version("v1")
2772 {'v3': 2, 'v1': 0, 'v2': 1}
2774 __slots__ = ("specs", "_bound_min", "_bound_max")
2775 tag_default = tag_encode(2)
2776 asn1_type_name = "INTEGER"
2790 :param value: set the value. Either integer type, named value
2791 (if ``schema`` is specified in the class), or
2792 :py:class:`pyderasn.Integer` object
2793 :param bounds: set ``(MIN, MAX)`` value constraint.
2794 (-inf, +inf) by default
2795 :param bytes impl: override default tag with ``IMPLICIT`` one
2796 :param bytes expl: override default tag with ``EXPLICIT`` one
2797 :param default: set default value. Type same as in ``value``
2798 :param bool optional: is object ``OPTIONAL`` in sequence
2800 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2802 specs = getattr(self, "schema", {}) if _specs is None else _specs
2803 self.specs = specs if specs.__class__ == dict else dict(specs)
2804 self._bound_min, self._bound_max = getattr(
2807 (float("-inf"), float("+inf")),
2808 ) if bounds is None else bounds
2809 if value is not None:
2810 self._value = self._value_sanitize(value)
2811 if default is not None:
2812 default = self._value_sanitize(default)
2813 self.default = self.__class__(
2819 if self._value is None:
2820 self._value = default
2822 def _value_sanitize(self, value):
2823 if isinstance(value, integer_types):
2825 elif issubclass(value.__class__, Integer):
2826 value = value._value
2827 elif value.__class__ == str:
2828 value = self.specs.get(value)
2830 raise ObjUnknown("integer value: %s" % value)
2832 raise InvalidValueType((self.__class__, int, str))
2833 if not self._bound_min <= value <= self._bound_max:
2834 raise BoundsError(self._bound_min, value, self._bound_max)
2839 return self._value is not None
2841 def __getstate__(self):
2842 return IntegerState(
2861 def __setstate__(self, state):
2862 super(Integer, self).__setstate__(state)
2863 self.specs = state.specs
2864 self._value = state.value
2865 self._bound_min = state.bound_min
2866 self._bound_max = state.bound_max
2869 self._assert_ready()
2870 return int(self._value)
2873 """Hexadecimal representation
2875 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2877 hex_repr = hex(int(self))[2:].upper()
2878 if len(hex_repr) % 2 != 0:
2879 hex_repr = "0" + hex_repr
2883 self._assert_ready()
2884 return hash(b"".join((
2886 bytes(self._expl or b""),
2887 str(self._value).encode("ascii"),
2890 def __eq__(self, their):
2891 if isinstance(their, integer_types):
2892 return self._value == their
2893 if not issubclass(their.__class__, Integer):
2896 self._value == their._value and
2897 self.tag == their.tag and
2898 self._expl == their._expl
2901 def __lt__(self, their):
2902 return self._value < their._value
2906 """Return named representation (if exists) of the value
2908 for name, value in iteritems(self.specs):
2909 if value == self._value:
2922 return self.__class__(
2925 (self._bound_min, self._bound_max)
2926 if bounds is None else bounds
2928 impl=self.tag if impl is None else impl,
2929 expl=self._expl if expl is None else expl,
2930 default=self.default if default is None else default,
2931 optional=self.optional if optional is None else optional,
2935 def _encode_payload(self):
2936 self._assert_ready()
2940 octets = bytearray([0])
2944 octets = bytearray()
2946 octets.append((value & 0xFF) ^ 0xFF)
2948 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2951 octets = bytearray()
2953 octets.append(value & 0xFF)
2955 if octets[-1] & 0x80 > 0:
2958 octets = bytes(octets)
2960 bytes_len = ceil(value.bit_length() / 8) or 1
2963 octets = value.to_bytes(
2968 except OverflowError:
2975 octets = self._encode_payload()
2976 return b"".join((self.tag, len_encode(len(octets)), octets))
2978 def _encode1st(self, state):
2979 l = len(self._encode_payload())
2980 return len(self.tag) + len_size(l) + l, state
2982 def _encode2nd(self, writer, state_iter):
2983 write_full(writer, self._encode())
2985 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2987 t, _, lv = tag_strip(tlv)
2988 except DecodeError as err:
2989 raise err.__class__(
2991 klass=self.__class__,
2992 decode_path=decode_path,
2997 klass=self.__class__,
2998 decode_path=decode_path,
3005 l, llen, v = len_decode(lv)
3006 except DecodeError as err:
3007 raise err.__class__(
3009 klass=self.__class__,
3010 decode_path=decode_path,
3014 raise NotEnoughData(
3015 "encoded length is longer than data",
3016 klass=self.__class__,
3017 decode_path=decode_path,
3021 raise NotEnoughData(
3023 klass=self.__class__,
3024 decode_path=decode_path,
3027 v, tail = v[:l], v[l:]
3028 first_octet = byte2int(v)
3030 second_octet = byte2int(v[1:])
3032 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3033 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3036 "non normalized integer",
3037 klass=self.__class__,
3038 decode_path=decode_path,
3043 if first_octet & 0x80 > 0:
3044 octets = bytearray()
3045 for octet in bytearray(v):
3046 octets.append(octet ^ 0xFF)
3047 for octet in octets:
3048 value = (value << 8) | octet
3052 for octet in bytearray(v):
3053 value = (value << 8) | octet
3055 value = int.from_bytes(v, byteorder="big", signed=True)
3057 obj = self.__class__(
3059 bounds=(self._bound_min, self._bound_max),
3062 default=self.default,
3063 optional=self.optional,
3065 _decoded=(offset, llen, l),
3067 except BoundsError as err:
3070 klass=self.__class__,
3071 decode_path=decode_path,
3074 yield decode_path, obj, tail
3077 return pp_console_row(next(self.pps()))
3079 def pps(self, decode_path=()):
3082 asn1_type_name=self.asn1_type_name,
3083 obj_name=self.__class__.__name__,
3084 decode_path=decode_path,
3085 value=(self.named or str(self._value)) if self.ready else None,
3086 optional=self.optional,
3087 default=self == self.default,
3088 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3089 expl=None if self._expl is None else tag_decode(self._expl),
3094 expl_offset=self.expl_offset if self.expled else None,
3095 expl_tlen=self.expl_tlen if self.expled else None,
3096 expl_llen=self.expl_llen if self.expled else None,
3097 expl_vlen=self.expl_vlen if self.expled else None,
3098 expl_lenindef=self.expl_lenindef,
3101 for pp in self.pps_lenindef(decode_path):
3105 BitStringState = namedtuple(
3107 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3112 class BitString(Obj):
3113 """``BIT STRING`` bit string type
3115 >>> BitString(b"hello world")
3116 BIT STRING 88 bits 68656c6c6f20776f726c64
3119 >>> b == b"hello world"
3124 >>> BitString("'0A3B5F291CD'H")
3125 BIT STRING 44 bits 0a3b5f291cd0
3126 >>> b = BitString("'010110000000'B")
3127 BIT STRING 12 bits 5800
3130 >>> b[0], b[1], b[2], b[3]
3131 (False, True, False, True)
3135 [False, True, False, True, True, False, False, False, False, False, False, False]
3139 class KeyUsage(BitString):
3141 ("digitalSignature", 0),
3142 ("nonRepudiation", 1),
3143 ("keyEncipherment", 2),
3146 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3147 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3149 ['nonRepudiation', 'keyEncipherment']
3151 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3155 Pay attention that BIT STRING can be encoded both in primitive
3156 and constructed forms. Decoder always checks constructed form tag
3157 additionally to specified primitive one. If BER decoding is
3158 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3159 of DER restrictions.
3161 __slots__ = ("tag_constructed", "specs", "defined")
3162 tag_default = tag_encode(3)
3163 asn1_type_name = "BIT STRING"
3176 :param value: set the value. Either binary type, tuple of named
3177 values (if ``schema`` is specified in the class),
3178 string in ``'XXX...'B`` form, or
3179 :py:class:`pyderasn.BitString` object
3180 :param bytes impl: override default tag with ``IMPLICIT`` one
3181 :param bytes expl: override default tag with ``EXPLICIT`` one
3182 :param default: set default value. Type same as in ``value``
3183 :param bool optional: is object ``OPTIONAL`` in sequence
3185 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3186 specs = getattr(self, "schema", {}) if _specs is None else _specs
3187 self.specs = specs if specs.__class__ == dict else dict(specs)
3188 self._value = None if value is None else self._value_sanitize(value)
3189 if default is not None:
3190 default = self._value_sanitize(default)
3191 self.default = self.__class__(
3197 self._value = default
3199 tag_klass, _, tag_num = tag_decode(self.tag)
3200 self.tag_constructed = tag_encode(
3202 form=TagFormConstructed,
3206 def _bits2octets(self, bits):
3207 if len(self.specs) > 0:
3208 bits = bits.rstrip("0")
3210 bits += "0" * ((8 - (bit_len % 8)) % 8)
3211 octets = bytearray(len(bits) // 8)
3212 for i in six_xrange(len(octets)):
3213 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3214 return bit_len, bytes(octets)
3216 def _value_sanitize(self, value):
3217 if isinstance(value, (string_types, binary_type)):
3219 isinstance(value, string_types) and
3220 value.startswith("'")
3222 if value.endswith("'B"):
3224 if not frozenset(value) <= SET01:
3225 raise ValueError("B's coding contains unacceptable chars")
3226 return self._bits2octets(value)
3227 if value.endswith("'H"):
3231 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3233 if value.__class__ == binary_type:
3234 return (len(value) * 8, value)
3235 raise InvalidValueType((self.__class__, string_types, binary_type))
3236 if value.__class__ == tuple:
3239 isinstance(value[0], integer_types) and
3240 value[1].__class__ == binary_type
3245 bit = self.specs.get(name)
3247 raise ObjUnknown("BitString value: %s" % name)
3250 return self._bits2octets("")
3251 bits = frozenset(bits)
3252 return self._bits2octets("".join(
3253 ("1" if bit in bits else "0")
3254 for bit in six_xrange(max(bits) + 1)
3256 if issubclass(value.__class__, BitString):
3258 raise InvalidValueType((self.__class__, binary_type, string_types))
3262 return self._value is not None
3264 def __getstate__(self):
3265 return BitStringState(
3280 self.tag_constructed,
3284 def __setstate__(self, state):
3285 super(BitString, self).__setstate__(state)
3286 self.specs = state.specs
3287 self._value = state.value
3288 self.tag_constructed = state.tag_constructed
3289 self.defined = state.defined
3292 self._assert_ready()
3293 for i in six_xrange(self._value[0]):
3298 """Returns number of bits in the string
3300 self._assert_ready()
3301 return self._value[0]
3303 def __bytes__(self):
3304 self._assert_ready()
3305 return self._value[1]
3307 def __eq__(self, their):
3308 if their.__class__ == bytes:
3309 return self._value[1] == their
3310 if not issubclass(their.__class__, BitString):
3313 self._value == their._value and
3314 self.tag == their.tag and
3315 self._expl == their._expl
3320 """Named representation (if exists) of the bits
3322 :returns: [str(name), ...]
3324 return [name for name, bit in iteritems(self.specs) if self[bit]]
3334 return self.__class__(
3336 impl=self.tag if impl is None else impl,
3337 expl=self._expl if expl is None else expl,
3338 default=self.default if default is None else default,
3339 optional=self.optional if optional is None else optional,
3343 def __getitem__(self, key):
3344 if key.__class__ == int:
3345 bit_len, octets = self._value
3349 byte2int(memoryview(octets)[key // 8:]) >>
3352 if isinstance(key, string_types):
3353 value = self.specs.get(key)
3355 raise ObjUnknown("BitString value: %s" % key)
3357 raise InvalidValueType((int, str))
3360 self._assert_ready()
3361 bit_len, octets = self._value
3364 len_encode(len(octets) + 1),
3365 int2byte((8 - bit_len % 8) % 8),
3369 def _encode1st(self, state):
3370 self._assert_ready()
3371 _, octets = self._value
3373 return len(self.tag) + len_size(l) + l, state
3375 def _encode2nd(self, writer, state_iter):
3376 bit_len, octets = self._value
3377 write_full(writer, b"".join((
3379 len_encode(len(octets) + 1),
3380 int2byte((8 - bit_len % 8) % 8),
3382 write_full(writer, octets)
3384 def _encode_cer(self, writer):
3385 bit_len, octets = self._value
3386 if len(octets) + 1 <= 1000:
3387 write_full(writer, self._encode())
3389 write_full(writer, self.tag_constructed)
3390 write_full(writer, LENINDEF)
3391 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3392 write_full(writer, b"".join((
3393 BitString.tag_default,
3396 octets[offset:offset + 999],
3398 tail = octets[offset + 999:]
3400 tail = int2byte((8 - bit_len % 8) % 8) + tail
3401 write_full(writer, b"".join((
3402 BitString.tag_default,
3403 len_encode(len(tail)),
3406 write_full(writer, EOC)
3408 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3410 t, tlen, lv = tag_strip(tlv)
3411 except DecodeError as err:
3412 raise err.__class__(
3414 klass=self.__class__,
3415 decode_path=decode_path,
3419 if tag_only: # pragma: no cover
3423 l, llen, v = len_decode(lv)
3424 except DecodeError as err:
3425 raise err.__class__(
3427 klass=self.__class__,
3428 decode_path=decode_path,
3432 raise NotEnoughData(
3433 "encoded length is longer than data",
3434 klass=self.__class__,
3435 decode_path=decode_path,
3439 raise NotEnoughData(
3441 klass=self.__class__,
3442 decode_path=decode_path,
3445 pad_size = byte2int(v)
3446 if l == 1 and pad_size != 0:
3448 "invalid empty value",
3449 klass=self.__class__,
3450 decode_path=decode_path,
3456 klass=self.__class__,
3457 decode_path=decode_path,
3460 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3463 klass=self.__class__,
3464 decode_path=decode_path,
3467 v, tail = v[:l], v[l:]
3468 bit_len = (len(v) - 1) * 8 - pad_size
3469 obj = self.__class__(
3470 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3473 default=self.default,
3474 optional=self.optional,
3476 _decoded=(offset, llen, l),
3479 obj._value = (bit_len, None)
3480 yield decode_path, obj, tail
3482 if t != self.tag_constructed:
3484 klass=self.__class__,
3485 decode_path=decode_path,
3488 if not ctx.get("bered", False):
3490 "unallowed BER constructed encoding",
3491 klass=self.__class__,
3492 decode_path=decode_path,
3495 if tag_only: # pragma: no cover
3500 l, llen, v = len_decode(lv)
3501 except LenIndefForm:
3502 llen, l, v = 1, 0, lv[1:]
3504 except DecodeError as err:
3505 raise err.__class__(
3507 klass=self.__class__,
3508 decode_path=decode_path,
3512 raise NotEnoughData(
3513 "encoded length is longer than data",
3514 klass=self.__class__,
3515 decode_path=decode_path,
3518 if not lenindef and l == 0:
3519 raise NotEnoughData(
3521 klass=self.__class__,
3522 decode_path=decode_path,
3526 sub_offset = offset + tlen + llen
3530 if v[:EOC_LEN].tobytes() == EOC:
3537 "chunk out of bounds",
3538 klass=self.__class__,
3539 decode_path=decode_path + (str(len(chunks) - 1),),
3540 offset=chunks[-1].offset,
3542 sub_decode_path = decode_path + (str(len(chunks)),)
3545 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3548 decode_path=sub_decode_path,
3551 _ctx_immutable=False,
3553 yield _decode_path, chunk, v_tail
3555 _, chunk, v_tail = next(BitString().decode_evgen(
3558 decode_path=sub_decode_path,
3561 _ctx_immutable=False,
3566 "expected BitString encoded chunk",
3567 klass=self.__class__,
3568 decode_path=sub_decode_path,
3571 chunks.append(chunk)
3572 sub_offset += chunk.tlvlen
3573 vlen += chunk.tlvlen
3575 if len(chunks) == 0:
3578 klass=self.__class__,
3579 decode_path=decode_path,
3584 for chunk_i, chunk in enumerate(chunks[:-1]):
3585 if chunk.bit_len % 8 != 0:
3587 "BitString chunk is not multiple of 8 bits",
3588 klass=self.__class__,
3589 decode_path=decode_path + (str(chunk_i),),
3590 offset=chunk.offset,
3593 values.append(bytes(chunk))
3594 bit_len += chunk.bit_len
3595 chunk_last = chunks[-1]
3597 values.append(bytes(chunk_last))
3598 bit_len += chunk_last.bit_len
3599 obj = self.__class__(
3600 value=None if evgen_mode else (bit_len, b"".join(values)),
3603 default=self.default,
3604 optional=self.optional,
3606 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3609 obj._value = (bit_len, None)
3610 obj.lenindef = lenindef
3611 obj.ber_encoded = True
3612 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3615 return pp_console_row(next(self.pps()))
3617 def pps(self, decode_path=()):
3621 bit_len, blob = self._value
3622 value = "%d bits" % bit_len
3623 if len(self.specs) > 0 and blob is not None:
3624 blob = tuple(self.named)
3627 asn1_type_name=self.asn1_type_name,
3628 obj_name=self.__class__.__name__,
3629 decode_path=decode_path,
3632 optional=self.optional,
3633 default=self == self.default,
3634 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3635 expl=None if self._expl is None else tag_decode(self._expl),
3640 expl_offset=self.expl_offset if self.expled else None,
3641 expl_tlen=self.expl_tlen if self.expled else None,
3642 expl_llen=self.expl_llen if self.expled else None,
3643 expl_vlen=self.expl_vlen if self.expled else None,
3644 expl_lenindef=self.expl_lenindef,
3645 lenindef=self.lenindef,
3646 ber_encoded=self.ber_encoded,
3649 defined_by, defined = self.defined or (None, None)
3650 if defined_by is not None:
3652 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3654 for pp in self.pps_lenindef(decode_path):
3658 OctetStringState = namedtuple(
3660 BasicState._fields + (
3671 class OctetString(Obj):
3672 """``OCTET STRING`` binary string type
3674 >>> s = OctetString(b"hello world")
3675 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3676 >>> s == OctetString(b"hello world")
3681 >>> OctetString(b"hello", bounds=(4, 4))
3682 Traceback (most recent call last):
3683 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3684 >>> OctetString(b"hell", bounds=(4, 4))
3685 OCTET STRING 4 bytes 68656c6c
3687 Memoryviews can be used as a values. If memoryview is made on
3688 mmap-ed file, then it does not take storage inside OctetString
3689 itself. In CER encoding mode it will be streamed to the specified
3690 writer, copying 1 KB chunks.
3692 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3693 tag_default = tag_encode(4)
3694 asn1_type_name = "OCTET STRING"
3695 evgen_mode_skip_value = True
3709 :param value: set the value. Either binary type, or
3710 :py:class:`pyderasn.OctetString` object
3711 :param bounds: set ``(MIN, MAX)`` value size constraint.
3712 (-inf, +inf) by default
3713 :param bytes impl: override default tag with ``IMPLICIT`` one
3714 :param bytes expl: override default tag with ``EXPLICIT`` one
3715 :param default: set default value. Type same as in ``value``
3716 :param bool optional: is object ``OPTIONAL`` in sequence
3718 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3720 self._bound_min, self._bound_max = getattr(
3724 ) if bounds is None else bounds
3725 if value is not None:
3726 self._value = self._value_sanitize(value)
3727 if default is not None:
3728 default = self._value_sanitize(default)
3729 self.default = self.__class__(
3734 if self._value is None:
3735 self._value = default
3737 tag_klass, _, tag_num = tag_decode(self.tag)
3738 self.tag_constructed = tag_encode(
3740 form=TagFormConstructed,
3744 def _value_sanitize(self, value):
3745 if value.__class__ == binary_type or value.__class__ == memoryview:
3747 elif issubclass(value.__class__, OctetString):
3748 value = value._value
3750 raise InvalidValueType((self.__class__, bytes, memoryview))
3751 if not self._bound_min <= len(value) <= self._bound_max:
3752 raise BoundsError(self._bound_min, len(value), self._bound_max)
3757 return self._value is not None
3759 def __getstate__(self):
3760 return OctetStringState(
3776 self.tag_constructed,
3780 def __setstate__(self, state):
3781 super(OctetString, self).__setstate__(state)
3782 self._value = state.value
3783 self._bound_min = state.bound_min
3784 self._bound_max = state.bound_max
3785 self.tag_constructed = state.tag_constructed
3786 self.defined = state.defined
3788 def __bytes__(self):
3789 self._assert_ready()
3790 return bytes(self._value)
3792 def __eq__(self, their):
3793 if their.__class__ == binary_type:
3794 return self._value == their
3795 if not issubclass(their.__class__, OctetString):
3798 self._value == their._value and
3799 self.tag == their.tag and
3800 self._expl == their._expl
3803 def __lt__(self, their):
3804 return self._value < their._value
3815 return self.__class__(
3818 (self._bound_min, self._bound_max)
3819 if bounds is None else bounds
3821 impl=self.tag if impl is None else impl,
3822 expl=self._expl if expl is None else expl,
3823 default=self.default if default is None else default,
3824 optional=self.optional if optional is None else optional,
3828 self._assert_ready()
3831 len_encode(len(self._value)),
3835 def _encode1st(self, state):
3836 self._assert_ready()
3837 l = len(self._value)
3838 return len(self.tag) + len_size(l) + l, state
3840 def _encode2nd(self, writer, state_iter):
3842 write_full(writer, self.tag + len_encode(len(value)))
3843 write_full(writer, value)
3845 def _encode_cer(self, writer):
3846 octets = self._value
3847 if len(octets) <= 1000:
3848 write_full(writer, self._encode())
3850 write_full(writer, self.tag_constructed)
3851 write_full(writer, LENINDEF)
3852 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3853 write_full(writer, b"".join((
3854 OctetString.tag_default,
3856 octets[offset:offset + 1000],
3858 tail = octets[offset + 1000:]
3860 write_full(writer, b"".join((
3861 OctetString.tag_default,
3862 len_encode(len(tail)),
3865 write_full(writer, EOC)
3867 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3869 t, tlen, lv = tag_strip(tlv)
3870 except DecodeError as err:
3871 raise err.__class__(
3873 klass=self.__class__,
3874 decode_path=decode_path,
3882 l, llen, v = len_decode(lv)
3883 except DecodeError as err:
3884 raise err.__class__(
3886 klass=self.__class__,
3887 decode_path=decode_path,
3891 raise NotEnoughData(
3892 "encoded length is longer than data",
3893 klass=self.__class__,
3894 decode_path=decode_path,
3897 v, tail = v[:l], v[l:]
3898 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3900 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3901 klass=self.__class__,
3902 decode_path=decode_path,
3906 obj = self.__class__(
3908 None if (evgen_mode and self.evgen_mode_skip_value)
3911 bounds=(self._bound_min, self._bound_max),
3914 default=self.default,
3915 optional=self.optional,
3916 _decoded=(offset, llen, l),
3919 except DecodeError as err:
3922 klass=self.__class__,
3923 decode_path=decode_path,
3926 except BoundsError as err:
3929 klass=self.__class__,
3930 decode_path=decode_path,
3933 yield decode_path, obj, tail
3935 if t != self.tag_constructed:
3937 klass=self.__class__,
3938 decode_path=decode_path,
3941 if not ctx.get("bered", False):
3943 "unallowed BER constructed encoding",
3944 klass=self.__class__,
3945 decode_path=decode_path,
3953 l, llen, v = len_decode(lv)
3954 except LenIndefForm:
3955 llen, l, v = 1, 0, lv[1:]
3957 except DecodeError as err:
3958 raise err.__class__(
3960 klass=self.__class__,
3961 decode_path=decode_path,
3965 raise NotEnoughData(
3966 "encoded length is longer than data",
3967 klass=self.__class__,
3968 decode_path=decode_path,
3973 sub_offset = offset + tlen + llen
3978 if v[:EOC_LEN].tobytes() == EOC:
3985 "chunk out of bounds",
3986 klass=self.__class__,
3987 decode_path=decode_path + (str(len(chunks) - 1),),
3988 offset=chunks[-1].offset,
3992 sub_decode_path = decode_path + (str(chunks_count),)
3993 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3996 decode_path=sub_decode_path,
3999 _ctx_immutable=False,
4001 yield _decode_path, chunk, v_tail
4002 if not chunk.ber_encoded:
4003 payload_len += chunk.vlen
4006 sub_decode_path = decode_path + (str(len(chunks)),)
4007 _, chunk, v_tail = next(OctetString().decode_evgen(
4010 decode_path=sub_decode_path,
4013 _ctx_immutable=False,
4016 chunks.append(chunk)
4019 "expected OctetString encoded chunk",
4020 klass=self.__class__,
4021 decode_path=sub_decode_path,
4024 sub_offset += chunk.tlvlen
4025 vlen += chunk.tlvlen
4027 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4029 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4030 klass=self.__class__,
4031 decode_path=decode_path,
4035 obj = self.__class__(
4037 None if evgen_mode else
4038 b"".join(bytes(chunk) for chunk in chunks)
4040 bounds=(self._bound_min, self._bound_max),
4043 default=self.default,
4044 optional=self.optional,
4045 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4048 except DecodeError as err:
4051 klass=self.__class__,
4052 decode_path=decode_path,
4055 except BoundsError as err:
4058 klass=self.__class__,
4059 decode_path=decode_path,
4062 obj.lenindef = lenindef
4063 obj.ber_encoded = True
4064 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4067 return pp_console_row(next(self.pps()))
4069 def pps(self, decode_path=()):
4072 asn1_type_name=self.asn1_type_name,
4073 obj_name=self.__class__.__name__,
4074 decode_path=decode_path,
4075 value=("%d bytes" % len(self._value)) if self.ready else None,
4076 blob=self._value if self.ready else None,
4077 optional=self.optional,
4078 default=self == self.default,
4079 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4080 expl=None if self._expl is None else tag_decode(self._expl),
4085 expl_offset=self.expl_offset if self.expled else None,
4086 expl_tlen=self.expl_tlen if self.expled else None,
4087 expl_llen=self.expl_llen if self.expled else None,
4088 expl_vlen=self.expl_vlen if self.expled else None,
4089 expl_lenindef=self.expl_lenindef,
4090 lenindef=self.lenindef,
4091 ber_encoded=self.ber_encoded,
4094 defined_by, defined = self.defined or (None, None)
4095 if defined_by is not None:
4097 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4099 for pp in self.pps_lenindef(decode_path):
4103 def agg_octet_string(evgens, decode_path, raw, writer):
4104 """Aggregate constructed string (OctetString and its derivatives)
4106 :param evgens: iterator of generated events
4107 :param decode_path: points to the string we want to decode
4108 :param raw: slicebable (memoryview, bytearray, etc) with
4109 the data evgens are generated on
4110 :param writer: buffer.write where string is going to be saved
4111 :param writer: where string is going to be saved. Must comply
4112 with ``io.RawIOBase.write`` behaviour
4114 .. seealso:: :ref:`agg_octet_string`
4116 decode_path_len = len(decode_path)
4117 for dp, obj, _ in evgens:
4118 if dp[:decode_path_len] != decode_path:
4120 if not obj.ber_encoded:
4121 write_full(writer, raw[
4122 obj.offset + obj.tlen + obj.llen:
4123 obj.offset + obj.tlen + obj.llen + obj.vlen -
4124 (EOC_LEN if obj.expl_lenindef else 0)
4126 if len(dp) == decode_path_len:
4130 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4134 """``NULL`` null object
4142 tag_default = tag_encode(5)
4143 asn1_type_name = "NULL"
4147 value=None, # unused, but Sequence passes it
4154 :param bytes impl: override default tag with ``IMPLICIT`` one
4155 :param bytes expl: override default tag with ``EXPLICIT`` one
4156 :param bool optional: is object ``OPTIONAL`` in sequence
4158 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4165 def __getstate__(self):
4181 def __eq__(self, their):
4182 if not issubclass(their.__class__, Null):
4185 self.tag == their.tag and
4186 self._expl == their._expl
4196 return self.__class__(
4197 impl=self.tag if impl is None else impl,
4198 expl=self._expl if expl is None else expl,
4199 optional=self.optional if optional is None else optional,
4203 return self.tag + LEN0
4205 def _encode1st(self, state):
4206 return len(self.tag) + 1, state
4208 def _encode2nd(self, writer, state_iter):
4209 write_full(writer, self.tag + LEN0)
4211 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4213 t, _, lv = tag_strip(tlv)
4214 except DecodeError as err:
4215 raise err.__class__(
4217 klass=self.__class__,
4218 decode_path=decode_path,
4223 klass=self.__class__,
4224 decode_path=decode_path,
4227 if tag_only: # pragma: no cover
4231 l, _, v = len_decode(lv)
4232 except DecodeError as err:
4233 raise err.__class__(
4235 klass=self.__class__,
4236 decode_path=decode_path,
4240 raise InvalidLength(
4241 "Null must have zero length",
4242 klass=self.__class__,
4243 decode_path=decode_path,
4246 obj = self.__class__(
4249 optional=self.optional,
4250 _decoded=(offset, 1, 0),
4252 yield decode_path, obj, v
4255 return pp_console_row(next(self.pps()))
4257 def pps(self, decode_path=()):
4260 asn1_type_name=self.asn1_type_name,
4261 obj_name=self.__class__.__name__,
4262 decode_path=decode_path,
4263 optional=self.optional,
4264 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4265 expl=None if self._expl is None else tag_decode(self._expl),
4270 expl_offset=self.expl_offset if self.expled else None,
4271 expl_tlen=self.expl_tlen if self.expled else None,
4272 expl_llen=self.expl_llen if self.expled else None,
4273 expl_vlen=self.expl_vlen if self.expled else None,
4274 expl_lenindef=self.expl_lenindef,
4277 for pp in self.pps_lenindef(decode_path):
4281 ObjectIdentifierState = namedtuple(
4282 "ObjectIdentifierState",
4283 BasicState._fields + ("value", "defines"),
4288 class ObjectIdentifier(Obj):
4289 """``OBJECT IDENTIFIER`` OID type
4291 >>> oid = ObjectIdentifier((1, 2, 3))
4292 OBJECT IDENTIFIER 1.2.3
4293 >>> oid == ObjectIdentifier("1.2.3")
4299 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4300 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4302 >>> str(ObjectIdentifier((3, 1)))
4303 Traceback (most recent call last):
4304 pyderasn.InvalidOID: unacceptable first arc value
4306 __slots__ = ("defines",)
4307 tag_default = tag_encode(6)
4308 asn1_type_name = "OBJECT IDENTIFIER"
4321 :param value: set the value. Either tuples of integers,
4322 string of "."-concatenated integers, or
4323 :py:class:`pyderasn.ObjectIdentifier` object
4324 :param defines: sequence of tuples. Each tuple has two elements.
4325 First one is relative to current one decode
4326 path, aiming to the field defined by that OID.
4327 Read about relative path in
4328 :py:func:`pyderasn.abs_decode_path`. Second
4329 tuple element is ``{OID: pyderasn.Obj()}``
4330 dictionary, mapping between current OID value
4331 and structure applied to defined field.
4333 .. seealso:: :ref:`definedby`
4335 :param bytes impl: override default tag with ``IMPLICIT`` one
4336 :param bytes expl: override default tag with ``EXPLICIT`` one
4337 :param default: set default value. Type same as in ``value``
4338 :param bool optional: is object ``OPTIONAL`` in sequence
4340 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4342 if value is not None:
4343 self._value = self._value_sanitize(value)
4344 if default is not None:
4345 default = self._value_sanitize(default)
4346 self.default = self.__class__(
4351 if self._value is None:
4352 self._value = default
4353 self.defines = defines
4355 def __add__(self, their):
4356 if their.__class__ == tuple:
4357 return self.__class__(self._value + array("L", their))
4358 if isinstance(their, self.__class__):
4359 return self.__class__(self._value + their._value)
4360 raise InvalidValueType((self.__class__, tuple))
4362 def _value_sanitize(self, value):
4363 if issubclass(value.__class__, ObjectIdentifier):
4365 if isinstance(value, string_types):
4367 value = array("L", (pureint(arc) for arc in value.split(".")))
4369 raise InvalidOID("unacceptable arcs values")
4370 if value.__class__ == tuple:
4372 value = array("L", value)
4373 except OverflowError as err:
4374 raise InvalidOID(repr(err))
4375 if value.__class__ is array:
4377 raise InvalidOID("less than 2 arcs")
4378 first_arc = value[0]
4379 if first_arc in (0, 1):
4380 if not (0 <= value[1] <= 39):
4381 raise InvalidOID("second arc is too wide")
4382 elif first_arc == 2:
4385 raise InvalidOID("unacceptable first arc value")
4386 if not all(arc >= 0 for arc in value):
4387 raise InvalidOID("negative arc value")
4389 raise InvalidValueType((self.__class__, str, tuple))
4393 return self._value is not None
4395 def __getstate__(self):
4396 return ObjectIdentifierState(
4413 def __setstate__(self, state):
4414 super(ObjectIdentifier, self).__setstate__(state)
4415 self._value = state.value
4416 self.defines = state.defines
4419 self._assert_ready()
4420 return iter(self._value)
4423 return ".".join(str(arc) for arc in self._value or ())
4426 self._assert_ready()
4427 return hash(b"".join((
4429 bytes(self._expl or b""),
4430 str(self._value).encode("ascii"),
4433 def __eq__(self, their):
4434 if their.__class__ == tuple:
4435 return self._value == array("L", their)
4436 if not issubclass(their.__class__, ObjectIdentifier):
4439 self.tag == their.tag and
4440 self._expl == their._expl and
4441 self._value == their._value
4444 def __lt__(self, their):
4445 return self._value < their._value
4456 return self.__class__(
4458 defines=self.defines if defines is None else defines,
4459 impl=self.tag if impl is None else impl,
4460 expl=self._expl if expl is None else expl,
4461 default=self.default if default is None else default,
4462 optional=self.optional if optional is None else optional,
4465 def _encode_octets(self):
4466 self._assert_ready()
4468 first_value = value[1]
4469 first_arc = value[0]
4472 elif first_arc == 1:
4474 elif first_arc == 2:
4476 else: # pragma: no cover
4477 raise RuntimeError("invalid arc is stored")
4478 octets = [zero_ended_encode(first_value)]
4479 for arc in value[2:]:
4480 octets.append(zero_ended_encode(arc))
4481 return b"".join(octets)
4484 v = self._encode_octets()
4485 return b"".join((self.tag, len_encode(len(v)), v))
4487 def _encode1st(self, state):
4488 l = len(self._encode_octets())
4489 return len(self.tag) + len_size(l) + l, state
4491 def _encode2nd(self, writer, state_iter):
4492 write_full(writer, self._encode())
4494 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4496 t, _, lv = tag_strip(tlv)
4497 except DecodeError as err:
4498 raise err.__class__(
4500 klass=self.__class__,
4501 decode_path=decode_path,
4506 klass=self.__class__,
4507 decode_path=decode_path,
4510 if tag_only: # pragma: no cover
4514 l, llen, v = len_decode(lv)
4515 except DecodeError as err:
4516 raise err.__class__(
4518 klass=self.__class__,
4519 decode_path=decode_path,
4523 raise NotEnoughData(
4524 "encoded length is longer than data",
4525 klass=self.__class__,
4526 decode_path=decode_path,
4530 raise NotEnoughData(
4532 klass=self.__class__,
4533 decode_path=decode_path,
4536 v, tail = v[:l], v[l:]
4543 octet = indexbytes(v, i)
4544 if i == 0 and octet == 0x80:
4545 if ctx.get("bered", False):
4549 "non normalized arc encoding",
4550 klass=self.__class__,
4551 decode_path=decode_path,
4554 arc = (arc << 7) | (octet & 0x7F)
4555 if octet & 0x80 == 0:
4558 except OverflowError:
4560 "too huge value for local unsigned long",
4561 klass=self.__class__,
4562 decode_path=decode_path,
4571 klass=self.__class__,
4572 decode_path=decode_path,
4576 second_arc = arcs[0]
4577 if 0 <= second_arc <= 39:
4579 elif 40 <= second_arc <= 79:
4585 obj = self.__class__(
4586 value=array("L", (first_arc, second_arc)) + arcs[1:],
4589 default=self.default,
4590 optional=self.optional,
4591 _decoded=(offset, llen, l),
4594 obj.ber_encoded = True
4595 yield decode_path, obj, tail
4598 return pp_console_row(next(self.pps()))
4600 def pps(self, decode_path=()):
4603 asn1_type_name=self.asn1_type_name,
4604 obj_name=self.__class__.__name__,
4605 decode_path=decode_path,
4606 value=str(self) if self.ready else None,
4607 optional=self.optional,
4608 default=self == self.default,
4609 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4610 expl=None if self._expl is None else tag_decode(self._expl),
4615 expl_offset=self.expl_offset if self.expled else None,
4616 expl_tlen=self.expl_tlen if self.expled else None,
4617 expl_llen=self.expl_llen if self.expled else None,
4618 expl_vlen=self.expl_vlen if self.expled else None,
4619 expl_lenindef=self.expl_lenindef,
4620 ber_encoded=self.ber_encoded,
4623 for pp in self.pps_lenindef(decode_path):
4627 class Enumerated(Integer):
4628 """``ENUMERATED`` integer type
4630 This type is identical to :py:class:`pyderasn.Integer`, but requires
4631 schema to be specified and does not accept values missing from it.
4634 tag_default = tag_encode(10)
4635 asn1_type_name = "ENUMERATED"
4646 bounds=None, # dummy argument, workability for Integer.decode
4648 super(Enumerated, self).__init__(
4649 value, bounds, impl, expl, default, optional, _specs, _decoded,
4651 if len(self.specs) == 0:
4652 raise ValueError("schema must be specified")
4654 def _value_sanitize(self, value):
4655 if isinstance(value, self.__class__):
4656 value = value._value
4657 elif isinstance(value, integer_types):
4658 for _value in itervalues(self.specs):
4663 "unknown integer value: %s" % value,
4664 klass=self.__class__,
4666 elif isinstance(value, string_types):
4667 value = self.specs.get(value)
4669 raise ObjUnknown("integer value: %s" % value)
4671 raise InvalidValueType((self.__class__, int, str))
4683 return self.__class__(
4685 impl=self.tag if impl is None else impl,
4686 expl=self._expl if expl is None else expl,
4687 default=self.default if default is None else default,
4688 optional=self.optional if optional is None else optional,
4693 def escape_control_unicode(c):
4694 if unicat(c)[0] == "C":
4695 c = repr(c).lstrip("u").strip("'")
4699 class CommonString(OctetString):
4700 """Common class for all strings
4702 Everything resembles :py:class:`pyderasn.OctetString`, except
4703 ability to deal with unicode text strings.
4705 >>> hexenc("привет мир".encode("utf-8"))
4706 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4707 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4709 >>> s = UTF8String("привет мир")
4710 UTF8String UTF8String привет мир
4712 'привет мир'
4713 >>> hexenc(bytes(s))
4714 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4716 >>> PrintableString("привет мир")
4717 Traceback (most recent call last):
4718 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4720 >>> BMPString("ада", bounds=(2, 2))
4721 Traceback (most recent call last):
4722 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4723 >>> s = BMPString("ад", bounds=(2, 2))
4726 >>> hexenc(bytes(s))
4733 - Text Encoding, validation
4734 * - :py:class:`pyderasn.UTF8String`
4736 * - :py:class:`pyderasn.NumericString`
4737 - proper alphabet validation
4738 * - :py:class:`pyderasn.PrintableString`
4739 - proper alphabet validation
4740 * - :py:class:`pyderasn.TeletexString`
4742 * - :py:class:`pyderasn.T61String`
4744 * - :py:class:`pyderasn.VideotexString`
4746 * - :py:class:`pyderasn.IA5String`
4747 - proper alphabet validation
4748 * - :py:class:`pyderasn.GraphicString`
4750 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4751 - proper alphabet validation
4752 * - :py:class:`pyderasn.GeneralString`
4754 * - :py:class:`pyderasn.UniversalString`
4756 * - :py:class:`pyderasn.BMPString`
4761 def _value_sanitize(self, value):
4763 value_decoded = None
4764 if isinstance(value, self.__class__):
4765 value_raw = value._value
4766 elif value.__class__ == text_type:
4767 value_decoded = value
4768 elif value.__class__ == binary_type:
4771 raise InvalidValueType((self.__class__, text_type, binary_type))
4774 value_decoded.encode(self.encoding)
4775 if value_raw is None else value_raw
4778 value_raw.decode(self.encoding)
4779 if value_decoded is None else value_decoded
4781 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4782 raise DecodeError(str(err))
4783 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4791 def __eq__(self, their):
4792 if their.__class__ == binary_type:
4793 return self._value == their
4794 if their.__class__ == text_type:
4795 return self._value == their.encode(self.encoding)
4796 if not isinstance(their, self.__class__):
4799 self._value == their._value and
4800 self.tag == their.tag and
4801 self._expl == their._expl
4804 def __unicode__(self):
4806 return self._value.decode(self.encoding)
4807 return text_type(self._value)
4810 return pp_console_row(next(self.pps(no_unicode=PY2)))
4812 def pps(self, decode_path=(), no_unicode=False):
4816 hexenc(bytes(self)) if no_unicode else
4817 "".join(escape_control_unicode(c) for c in self.__unicode__())
4821 asn1_type_name=self.asn1_type_name,
4822 obj_name=self.__class__.__name__,
4823 decode_path=decode_path,
4825 optional=self.optional,
4826 default=self == self.default,
4827 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4828 expl=None if self._expl is None else tag_decode(self._expl),
4833 expl_offset=self.expl_offset if self.expled else None,
4834 expl_tlen=self.expl_tlen if self.expled else None,
4835 expl_llen=self.expl_llen if self.expled else None,
4836 expl_vlen=self.expl_vlen if self.expled else None,
4837 expl_lenindef=self.expl_lenindef,
4838 ber_encoded=self.ber_encoded,
4841 for pp in self.pps_lenindef(decode_path):
4845 class UTF8String(CommonString):
4847 tag_default = tag_encode(12)
4849 asn1_type_name = "UTF8String"
4852 class AllowableCharsMixin(object):
4854 def allowable_chars(self):
4856 return self._allowable_chars
4857 return frozenset(six_unichr(c) for c in self._allowable_chars)
4859 def _value_sanitize(self, value):
4860 value = super(AllowableCharsMixin, self)._value_sanitize(value)
4861 if not frozenset(value) <= self._allowable_chars:
4862 raise DecodeError("non satisfying alphabet value")
4866 class NumericString(AllowableCharsMixin, CommonString):
4869 Its value is properly sanitized: only ASCII digits with spaces can
4872 >>> NumericString().allowable_chars
4873 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4876 tag_default = tag_encode(18)
4878 asn1_type_name = "NumericString"
4879 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4882 PrintableStringState = namedtuple(
4883 "PrintableStringState",
4884 OctetStringState._fields + ("allowable_chars",),
4889 class PrintableString(AllowableCharsMixin, CommonString):
4892 Its value is properly sanitized: see X.680 41.4 table 10.
4894 >>> PrintableString().allowable_chars
4895 frozenset([' ', "'", ..., 'z'])
4896 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4897 PrintableString PrintableString foo*bar
4898 >>> obj.allow_asterisk, obj.allow_ampersand
4902 tag_default = tag_encode(19)
4904 asn1_type_name = "PrintableString"
4905 _allowable_chars = frozenset(
4906 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4908 _asterisk = frozenset("*".encode("ascii"))
4909 _ampersand = frozenset("&".encode("ascii"))
4921 allow_asterisk=False,
4922 allow_ampersand=False,
4925 :param allow_asterisk: allow asterisk character
4926 :param allow_ampersand: allow ampersand character
4929 self._allowable_chars |= self._asterisk
4931 self._allowable_chars |= self._ampersand
4932 super(PrintableString, self).__init__(
4933 value, bounds, impl, expl, default, optional, _decoded, ctx,
4937 def allow_asterisk(self):
4938 """Is asterisk character allowed?
4940 return self._asterisk <= self._allowable_chars
4943 def allow_ampersand(self):
4944 """Is ampersand character allowed?
4946 return self._ampersand <= self._allowable_chars
4948 def __getstate__(self):
4949 return PrintableStringState(
4950 *super(PrintableString, self).__getstate__(),
4951 **{"allowable_chars": self._allowable_chars}
4954 def __setstate__(self, state):
4955 super(PrintableString, self).__setstate__(state)
4956 self._allowable_chars = state.allowable_chars
4967 return self.__class__(
4970 (self._bound_min, self._bound_max)
4971 if bounds is None else bounds
4973 impl=self.tag if impl is None else impl,
4974 expl=self._expl if expl is None else expl,
4975 default=self.default if default is None else default,
4976 optional=self.optional if optional is None else optional,
4977 allow_asterisk=self.allow_asterisk,
4978 allow_ampersand=self.allow_ampersand,
4982 class TeletexString(CommonString):
4984 tag_default = tag_encode(20)
4985 encoding = "iso-8859-1"
4986 asn1_type_name = "TeletexString"
4989 class T61String(TeletexString):
4991 asn1_type_name = "T61String"
4994 class VideotexString(CommonString):
4996 tag_default = tag_encode(21)
4997 encoding = "iso-8859-1"
4998 asn1_type_name = "VideotexString"
5001 class IA5String(AllowableCharsMixin, CommonString):
5004 Its value is properly sanitized: it is a mix of
5006 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
5007 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5008 * DEL character (0x7F)
5010 It is just 7-bit ASCII.
5012 >>> IA5String().allowable_chars
5013 frozenset(["NUL", ... "DEL"])
5016 tag_default = tag_encode(22)
5018 asn1_type_name = "IA5"
5019 _allowable_chars = frozenset(b"".join(
5020 six_unichr(c).encode("ascii") for c in six_xrange(128)
5024 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5025 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5026 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5027 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5028 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5029 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5032 class VisibleString(AllowableCharsMixin, CommonString):
5035 Its value is properly sanitized. ASCII subset from space to tilde is
5036 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5038 >>> VisibleString().allowable_chars
5039 frozenset([" ", ... "~"])
5042 tag_default = tag_encode(26)
5044 asn1_type_name = "VisibleString"
5045 _allowable_chars = frozenset(b"".join(
5046 six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5050 class ISO646String(VisibleString):
5052 asn1_type_name = "ISO646String"
5055 UTCTimeState = namedtuple(
5057 OctetStringState._fields + ("ber_raw",),
5062 def str_to_time_fractions(value):
5064 year, v = (v // 10**10), (v % 10**10)
5065 month, v = (v // 10**8), (v % 10**8)
5066 day, v = (v // 10**6), (v % 10**6)
5067 hour, v = (v // 10**4), (v % 10**4)
5068 minute, second = (v // 100), (v % 100)
5069 return year, month, day, hour, minute, second
5072 class UTCTime(VisibleString):
5073 """``UTCTime`` datetime type
5075 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5076 UTCTime UTCTime 2017-09-30T22:07:50
5082 datetime.datetime(2017, 9, 30, 22, 7, 50)
5083 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5084 datetime.datetime(1957, 9, 30, 22, 7, 50)
5086 If BER encoded value was met, then ``ber_raw`` attribute will hold
5087 its raw representation.
5091 Only **naive** ``datetime`` objects are supported.
5092 Library assumes that all work is done in UTC.
5096 Pay attention that ``UTCTime`` can not hold full year, so all years
5097 having < 50 years are treated as 20xx, 19xx otherwise, according to
5098 X.509 recommendation. Use ``GeneralizedTime`` instead for
5103 No strict validation of UTC offsets are made (only applicable to
5104 **BER**), but very crude:
5106 * minutes are not exceeding 60
5107 * offset value is not exceeding 14 hours
5109 __slots__ = ("ber_raw",)
5110 tag_default = tag_encode(23)
5112 asn1_type_name = "UTCTime"
5113 evgen_mode_skip_value = False
5123 bounds=None, # dummy argument, workability for OctetString.decode
5127 :param value: set the value. Either datetime type, or
5128 :py:class:`pyderasn.UTCTime` object
5129 :param bytes impl: override default tag with ``IMPLICIT`` one
5130 :param bytes expl: override default tag with ``EXPLICIT`` one
5131 :param default: set default value. Type same as in ``value``
5132 :param bool optional: is object ``OPTIONAL`` in sequence
5134 super(UTCTime, self).__init__(
5135 None, None, impl, expl, None, optional, _decoded, ctx,
5139 if value is not None:
5140 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5141 self.ber_encoded = self.ber_raw is not None
5142 if default is not None:
5143 default, _ = self._value_sanitize(default)
5144 self.default = self.__class__(
5149 if self._value is None:
5150 self._value = default
5152 self.optional = optional
5154 def _strptime_bered(self, value):
5155 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5158 raise ValueError("no timezone")
5159 year += 2000 if year < 50 else 1900
5160 decoded = datetime(year, month, day, hour, minute)
5162 if value[-1] == "Z":
5166 raise ValueError("invalid UTC offset")
5167 if value[-5] == "-":
5169 elif value[-5] == "+":
5172 raise ValueError("invalid UTC offset")
5173 v = pureint(value[-4:])
5174 offset, v = (60 * (v % 100)), v // 100
5176 raise ValueError("invalid UTC offset minutes")
5178 if offset > 14 * 3600:
5179 raise ValueError("too big UTC offset")
5183 return offset, decoded
5185 raise ValueError("invalid UTC offset seconds")
5186 seconds = pureint(value)
5188 raise ValueError("invalid seconds value")
5189 return offset, decoded + timedelta(seconds=seconds)
5191 def _strptime(self, value):
5192 # datetime.strptime's format: %y%m%d%H%M%SZ
5193 if len(value) != LEN_YYMMDDHHMMSSZ:
5194 raise ValueError("invalid UTCTime length")
5195 if value[-1] != "Z":
5196 raise ValueError("non UTC timezone")
5197 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5198 year += 2000 if year < 50 else 1900
5199 return datetime(year, month, day, hour, minute, second)
5201 def _dt_sanitize(self, value):
5202 if value.year < 1950 or value.year > 2049:
5203 raise ValueError("UTCTime can hold only 1950-2049 years")
5204 return value.replace(microsecond=0)
5206 def _value_sanitize(self, value, ctx=None):
5207 if value.__class__ == binary_type:
5209 value_decoded = value.decode("ascii")
5210 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5211 raise DecodeError("invalid UTCTime encoding: %r" % err)
5214 return self._strptime(value_decoded), None
5215 except (TypeError, ValueError) as _err:
5217 if (ctx is not None) and ctx.get("bered", False):
5219 offset, _value = self._strptime_bered(value_decoded)
5220 _value = _value - timedelta(seconds=offset)
5221 return self._dt_sanitize(_value), value
5222 except (TypeError, ValueError, OverflowError) as _err:
5225 "invalid %s format: %r" % (self.asn1_type_name, err),
5226 klass=self.__class__,
5228 if isinstance(value, self.__class__):
5229 return value._value, None
5230 if value.__class__ == datetime:
5231 if value.tzinfo is not None:
5232 raise ValueError("only naive datetime supported")
5233 return self._dt_sanitize(value), None
5234 raise InvalidValueType((self.__class__, datetime))
5236 def _pp_value(self):
5238 value = self._value.isoformat()
5239 if self.ber_encoded:
5240 value += " (%s)" % self.ber_raw
5244 def __unicode__(self):
5246 value = self._value.isoformat()
5247 if self.ber_encoded:
5248 value += " (%s)" % self.ber_raw
5250 return text_type(self._pp_value())
5252 def __getstate__(self):
5253 return UTCTimeState(
5254 *super(UTCTime, self).__getstate__(),
5255 **{"ber_raw": self.ber_raw}
5258 def __setstate__(self, state):
5259 super(UTCTime, self).__setstate__(state)
5260 self.ber_raw = state.ber_raw
5262 def __bytes__(self):
5263 self._assert_ready()
5264 return self._encode_time()
5266 def __eq__(self, their):
5267 if their.__class__ == binary_type:
5268 return self._encode_time() == their
5269 if their.__class__ == datetime:
5270 return self.todatetime() == their
5271 if not isinstance(their, self.__class__):
5274 self._value == their._value and
5275 self.tag == their.tag and
5276 self._expl == their._expl
5279 def _encode_time(self):
5280 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5283 self._assert_ready()
5284 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5286 def _encode1st(self, state):
5287 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5289 def _encode2nd(self, writer, state_iter):
5290 self._assert_ready()
5291 write_full(writer, self._encode())
5293 def _encode_cer(self, writer):
5294 write_full(writer, self._encode())
5296 def todatetime(self):
5300 return pp_console_row(next(self.pps()))
5302 def pps(self, decode_path=()):
5305 asn1_type_name=self.asn1_type_name,
5306 obj_name=self.__class__.__name__,
5307 decode_path=decode_path,
5308 value=self._pp_value(),
5309 optional=self.optional,
5310 default=self == self.default,
5311 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5312 expl=None if self._expl is None else tag_decode(self._expl),
5317 expl_offset=self.expl_offset if self.expled else None,
5318 expl_tlen=self.expl_tlen if self.expled else None,
5319 expl_llen=self.expl_llen if self.expled else None,
5320 expl_vlen=self.expl_vlen if self.expled else None,
5321 expl_lenindef=self.expl_lenindef,
5322 ber_encoded=self.ber_encoded,
5325 for pp in self.pps_lenindef(decode_path):
5329 class GeneralizedTime(UTCTime):
5330 """``GeneralizedTime`` datetime type
5332 This type is similar to :py:class:`pyderasn.UTCTime`.
5334 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5335 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5337 '20170930220750.000123Z'
5338 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5339 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5343 Only **naive** datetime objects are supported.
5344 Library assumes that all work is done in UTC.
5348 Only **microsecond** fractions are supported in DER encoding.
5349 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5350 higher precision values.
5354 **BER** encoded data can loss information (accuracy) during
5355 decoding because of float transformations.
5359 **Zero** year is unsupported.
5362 tag_default = tag_encode(24)
5363 asn1_type_name = "GeneralizedTime"
5365 def _dt_sanitize(self, value):
5368 def _strptime_bered(self, value):
5369 if len(value) < 4 + 3 * 2:
5370 raise ValueError("invalid GeneralizedTime")
5371 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5372 decoded = datetime(year, month, day, hour)
5373 offset, value = 0, value[10:]
5375 return offset, decoded
5376 if value[-1] == "Z":
5379 for char, sign in (("-", -1), ("+", 1)):
5380 idx = value.rfind(char)
5383 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5384 v = pureint(offset_raw)
5385 if len(offset_raw) == 4:
5386 offset, v = (60 * (v % 100)), v // 100
5388 raise ValueError("invalid UTC offset minutes")
5389 elif len(offset_raw) == 2:
5392 raise ValueError("invalid UTC offset")
5394 if offset > 14 * 3600:
5395 raise ValueError("too big UTC offset")
5399 return offset, decoded
5400 if value[0] in DECIMAL_SIGNS:
5402 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5405 raise ValueError("stripped minutes")
5406 decoded += timedelta(seconds=60 * pureint(value[:2]))
5409 return offset, decoded
5410 if value[0] in DECIMAL_SIGNS:
5412 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5415 raise ValueError("stripped seconds")
5416 decoded += timedelta(seconds=pureint(value[:2]))
5419 return offset, decoded
5420 if value[0] not in DECIMAL_SIGNS:
5421 raise ValueError("invalid format after seconds")
5423 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5426 def _strptime(self, value):
5428 if l == LEN_YYYYMMDDHHMMSSZ:
5429 # datetime.strptime's format: %Y%m%d%H%M%SZ
5430 if value[-1] != "Z":
5431 raise ValueError("non UTC timezone")
5432 return datetime(*str_to_time_fractions(value[:-1]))
5433 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5434 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5435 if value[-1] != "Z":
5436 raise ValueError("non UTC timezone")
5437 if value[14] != ".":
5438 raise ValueError("no fractions separator")
5441 raise ValueError("trailing zero")
5444 raise ValueError("only microsecond fractions are supported")
5445 us = pureint(us + ("0" * (6 - us_len)))
5446 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5447 return datetime(year, month, day, hour, minute, second, us)
5448 raise ValueError("invalid GeneralizedTime length")
5450 def _encode_time(self):
5452 encoded = value.strftime("%Y%m%d%H%M%S")
5453 if value.microsecond > 0:
5454 encoded += (".%06d" % value.microsecond).rstrip("0")
5455 return (encoded + "Z").encode("ascii")
5458 self._assert_ready()
5460 if value.microsecond > 0:
5461 encoded = self._encode_time()
5462 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5463 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5465 def _encode1st(self, state):
5466 self._assert_ready()
5467 vlen = len(self._encode_time())
5468 return len(self.tag) + len_size(vlen) + vlen, state
5470 def _encode2nd(self, writer, state_iter):
5471 write_full(writer, self._encode())
5474 class GraphicString(CommonString):
5476 tag_default = tag_encode(25)
5477 encoding = "iso-8859-1"
5478 asn1_type_name = "GraphicString"
5481 class GeneralString(CommonString):
5483 tag_default = tag_encode(27)
5484 encoding = "iso-8859-1"
5485 asn1_type_name = "GeneralString"
5488 class UniversalString(CommonString):
5490 tag_default = tag_encode(28)
5491 encoding = "utf-32-be"
5492 asn1_type_name = "UniversalString"
5495 class BMPString(CommonString):
5497 tag_default = tag_encode(30)
5498 encoding = "utf-16-be"
5499 asn1_type_name = "BMPString"
5502 ChoiceState = namedtuple(
5504 BasicState._fields + ("specs", "value",),
5510 """``CHOICE`` special type
5514 class GeneralName(Choice):
5516 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5517 ("dNSName", IA5String(impl=tag_ctxp(2))),
5520 >>> gn = GeneralName()
5522 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5523 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5524 >>> gn["dNSName"] = IA5String("bar.baz")
5525 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5526 >>> gn["rfc822Name"]
5529 [2] IA5String IA5 bar.baz
5532 >>> gn.value == gn["dNSName"]
5535 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5537 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5538 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5540 __slots__ = ("specs",)
5542 asn1_type_name = "CHOICE"
5555 :param value: set the value. Either ``(choice, value)`` tuple, or
5556 :py:class:`pyderasn.Choice` object
5557 :param bytes impl: can not be set, do **not** use it
5558 :param bytes expl: override default tag with ``EXPLICIT`` one
5559 :param default: set default value. Type same as in ``value``
5560 :param bool optional: is object ``OPTIONAL`` in sequence
5562 if impl is not None:
5563 raise ValueError("no implicit tag allowed for CHOICE")
5564 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5566 schema = getattr(self, "schema", ())
5567 if len(schema) == 0:
5568 raise ValueError("schema must be specified")
5570 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5573 if value is not None:
5574 self._value = self._value_sanitize(value)
5575 if default is not None:
5576 default_value = self._value_sanitize(default)
5577 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5578 default_obj.specs = self.specs
5579 default_obj._value = default_value
5580 self.default = default_obj
5582 self._value = copy(default_obj._value)
5583 if self._expl is not None:
5584 tag_class, _, tag_num = tag_decode(self._expl)
5585 self._tag_order = (tag_class, tag_num)
5587 def _value_sanitize(self, value):
5588 if (value.__class__ == tuple) and len(value) == 2:
5590 spec = self.specs.get(choice)
5592 raise ObjUnknown(choice)
5593 if not isinstance(obj, spec.__class__):
5594 raise InvalidValueType((spec,))
5595 return (choice, spec(obj))
5596 if isinstance(value, self.__class__):
5598 raise InvalidValueType((self.__class__, tuple))
5602 return self._value is not None and self._value[1].ready
5606 return self.expl_lenindef or (
5607 (self._value is not None) and
5608 self._value[1].bered
5611 def __getstate__(self):
5629 def __setstate__(self, state):
5630 super(Choice, self).__setstate__(state)
5631 self.specs = state.specs
5632 self._value = state.value
5634 def __eq__(self, their):
5635 if (their.__class__ == tuple) and len(their) == 2:
5636 return self._value == their
5637 if not isinstance(their, self.__class__):
5640 self.specs == their.specs and
5641 self._value == their._value
5651 return self.__class__(
5654 expl=self._expl if expl is None else expl,
5655 default=self.default if default is None else default,
5656 optional=self.optional if optional is None else optional,
5661 """Name of the choice
5663 self._assert_ready()
5664 return self._value[0]
5668 """Value of underlying choice
5670 self._assert_ready()
5671 return self._value[1]
5674 def tag_order(self):
5675 self._assert_ready()
5676 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5679 def tag_order_cer(self):
5680 return min(v.tag_order_cer for v in itervalues(self.specs))
5682 def __getitem__(self, key):
5683 if key not in self.specs:
5684 raise ObjUnknown(key)
5685 if self._value is None:
5687 choice, value = self._value
5692 def __setitem__(self, key, value):
5693 spec = self.specs.get(key)
5695 raise ObjUnknown(key)
5696 if not isinstance(value, spec.__class__):
5697 raise InvalidValueType((spec.__class__,))
5698 self._value = (key, spec(value))
5706 return self._value[1].decoded if self.ready else False
5709 self._assert_ready()
5710 return self._value[1].encode()
5712 def _encode1st(self, state):
5713 self._assert_ready()
5714 return self._value[1].encode1st(state)
5716 def _encode2nd(self, writer, state_iter):
5717 self._value[1].encode2nd(writer, state_iter)
5719 def _encode_cer(self, writer):
5720 self._assert_ready()
5721 self._value[1].encode_cer(writer)
5723 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5724 for choice, spec in iteritems(self.specs):
5725 sub_decode_path = decode_path + (choice,)
5731 decode_path=sub_decode_path,
5734 _ctx_immutable=False,
5741 klass=self.__class__,
5742 decode_path=decode_path,
5745 if tag_only: # pragma: no cover
5749 for _decode_path, value, tail in spec.decode_evgen(
5753 decode_path=sub_decode_path,
5755 _ctx_immutable=False,
5757 yield _decode_path, value, tail
5759 _, value, tail = next(spec.decode_evgen(
5763 decode_path=sub_decode_path,
5765 _ctx_immutable=False,
5768 obj = self.__class__(
5771 default=self.default,
5772 optional=self.optional,
5773 _decoded=(offset, 0, value.fulllen),
5775 obj._value = (choice, value)
5776 yield decode_path, obj, tail
5779 value = pp_console_row(next(self.pps()))
5781 value = "%s[%r]" % (value, self.value)
5784 def pps(self, decode_path=()):
5787 asn1_type_name=self.asn1_type_name,
5788 obj_name=self.__class__.__name__,
5789 decode_path=decode_path,
5790 value=self.choice if self.ready else None,
5791 optional=self.optional,
5792 default=self == self.default,
5793 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5794 expl=None if self._expl is None else tag_decode(self._expl),
5799 expl_lenindef=self.expl_lenindef,
5803 yield self.value.pps(decode_path=decode_path + (self.choice,))
5804 for pp in self.pps_lenindef(decode_path):
5808 class PrimitiveTypes(Choice):
5809 """Predefined ``CHOICE`` for all generic primitive types
5811 It could be useful for general decoding of some unspecified values:
5813 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5814 OCTET STRING 3 bytes 666f6f
5815 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5819 schema = tuple((klass.__name__, klass()) for klass in (
5843 AnyState = namedtuple(
5845 BasicState._fields + ("value", "defined"),
5851 """``ANY`` special type
5853 >>> Any(Integer(-123))
5854 ANY INTEGER -123 (0X:7B)
5855 >>> a = Any(OctetString(b"hello world").encode())
5856 ANY 040b68656c6c6f20776f726c64
5857 >>> hexenc(bytes(a))
5858 b'0x040x0bhello world'
5860 __slots__ = ("defined",)
5861 tag_default = tag_encode(0)
5862 asn1_type_name = "ANY"
5872 :param value: set the value. Either any kind of pyderasn's
5873 **ready** object, or bytes. Pay attention that
5874 **no** validation is performed if raw binary value
5875 is valid TLV, except just tag decoding
5876 :param bytes expl: override default tag with ``EXPLICIT`` one
5877 :param bool optional: is object ``OPTIONAL`` in sequence
5879 super(Any, self).__init__(None, expl, None, optional, _decoded)
5883 value = self._value_sanitize(value)
5885 if self._expl is None:
5886 if value.__class__ == binary_type:
5887 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5889 tag_class, tag_num = value.tag_order
5891 tag_class, _, tag_num = tag_decode(self._expl)
5892 self._tag_order = (tag_class, tag_num)
5895 def _value_sanitize(self, value):
5896 if value.__class__ == binary_type:
5898 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5900 if isinstance(value, self.__class__):
5902 if not isinstance(value, Obj):
5903 raise InvalidValueType((self.__class__, Obj, binary_type))
5908 return self._value is not None
5911 def tag_order(self):
5912 self._assert_ready()
5913 return self._tag_order
5917 if self.expl_lenindef or self.lenindef:
5919 if self.defined is None:
5921 return self.defined[1].bered
5923 def __getstate__(self):
5941 def __setstate__(self, state):
5942 super(Any, self).__setstate__(state)
5943 self._value = state.value
5944 self.defined = state.defined
5946 def __eq__(self, their):
5947 if their.__class__ == binary_type:
5948 if self._value.__class__ == binary_type:
5949 return self._value == their
5950 return self._value.encode() == their
5951 if issubclass(their.__class__, Any):
5952 if self.ready and their.ready:
5953 return bytes(self) == bytes(their)
5954 return self.ready == their.ready
5963 return self.__class__(
5965 expl=self._expl if expl is None else expl,
5966 optional=self.optional if optional is None else optional,
5969 def __bytes__(self):
5970 self._assert_ready()
5972 if value.__class__ == binary_type:
5974 return self._value.encode()
5981 self._assert_ready()
5983 if value.__class__ == binary_type:
5985 return value.encode()
5987 def _encode1st(self, state):
5988 self._assert_ready()
5990 if value.__class__ == binary_type:
5991 return len(value), state
5992 return value.encode1st(state)
5994 def _encode2nd(self, writer, state_iter):
5996 if value.__class__ == binary_type:
5997 write_full(writer, value)
5999 value.encode2nd(writer, state_iter)
6001 def _encode_cer(self, writer):
6002 self._assert_ready()
6004 if value.__class__ == binary_type:
6005 write_full(writer, value)
6007 value.encode_cer(writer)
6009 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6011 t, tlen, lv = tag_strip(tlv)
6012 except DecodeError as err:
6013 raise err.__class__(
6015 klass=self.__class__,
6016 decode_path=decode_path,
6020 l, llen, v = len_decode(lv)
6021 except LenIndefForm as err:
6022 if not ctx.get("bered", False):
6023 raise err.__class__(
6025 klass=self.__class__,
6026 decode_path=decode_path,
6029 llen, vlen, v = 1, 0, lv[1:]
6030 sub_offset = offset + tlen + llen
6032 while v[:EOC_LEN].tobytes() != EOC:
6033 chunk, v = Any().decode(
6036 decode_path=decode_path + (str(chunk_i),),
6039 _ctx_immutable=False,
6041 vlen += chunk.tlvlen
6042 sub_offset += chunk.tlvlen
6044 tlvlen = tlen + llen + vlen + EOC_LEN
6045 obj = self.__class__(
6046 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6048 optional=self.optional,
6049 _decoded=(offset, 0, tlvlen),
6052 obj.tag = t.tobytes()
6053 yield decode_path, obj, v[EOC_LEN:]
6055 except DecodeError as err:
6056 raise err.__class__(
6058 klass=self.__class__,
6059 decode_path=decode_path,
6063 raise NotEnoughData(
6064 "encoded length is longer than data",
6065 klass=self.__class__,
6066 decode_path=decode_path,
6069 tlvlen = tlen + llen + l
6070 v, tail = tlv[:tlvlen], v[l:]
6071 obj = self.__class__(
6072 value=None if evgen_mode else v.tobytes(),
6074 optional=self.optional,
6075 _decoded=(offset, 0, tlvlen),
6077 obj.tag = t.tobytes()
6078 yield decode_path, obj, tail
6081 return pp_console_row(next(self.pps()))
6083 def pps(self, decode_path=()):
6087 elif value.__class__ == binary_type:
6093 asn1_type_name=self.asn1_type_name,
6094 obj_name=self.__class__.__name__,
6095 decode_path=decode_path,
6097 blob=self._value if self._value.__class__ == binary_type else None,
6098 optional=self.optional,
6099 default=self == self.default,
6100 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6101 expl=None if self._expl is None else tag_decode(self._expl),
6106 expl_offset=self.expl_offset if self.expled else None,
6107 expl_tlen=self.expl_tlen if self.expled else None,
6108 expl_llen=self.expl_llen if self.expled else None,
6109 expl_vlen=self.expl_vlen if self.expled else None,
6110 expl_lenindef=self.expl_lenindef,
6111 lenindef=self.lenindef,
6114 defined_by, defined = self.defined or (None, None)
6115 if defined_by is not None:
6117 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6119 for pp in self.pps_lenindef(decode_path):
6123 ########################################################################
6124 # ASN.1 constructed types
6125 ########################################################################
6127 def abs_decode_path(decode_path, rel_path):
6128 """Create an absolute decode path from current and relative ones
6130 :param decode_path: current decode path, starting point. Tuple of strings
6131 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6132 If first tuple's element is "/", then treat it as
6133 an absolute path, ignoring ``decode_path`` as
6134 starting point. Also this tuple can contain ".."
6135 elements, stripping the leading element from
6138 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6139 ("foo", "bar", "baz", "whatever")
6140 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6142 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6145 if rel_path[0] == "/":
6147 if rel_path[0] == "..":
6148 return abs_decode_path(decode_path[:-1], rel_path[1:])
6149 return decode_path + rel_path
6152 SequenceState = namedtuple(
6154 BasicState._fields + ("specs", "value",),
6159 class SequenceEncode1stMixing(object):
6160 def _encode1st(self, state):
6162 idx = len(state) - 1
6164 for v in self._values_for_encoding():
6165 l, _ = v.encode1st(state)
6168 return len(self.tag) + len_size(vlen) + vlen, state
6171 class Sequence(SequenceEncode1stMixing, Obj):
6172 """``SEQUENCE`` structure type
6174 You have to make specification of sequence::
6176 class Extension(Sequence):
6178 ("extnID", ObjectIdentifier()),
6179 ("critical", Boolean(default=False)),
6180 ("extnValue", OctetString()),
6183 Then, you can work with it as with dictionary.
6185 >>> ext = Extension()
6186 >>> Extension().specs
6188 ('extnID', OBJECT IDENTIFIER),
6189 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6190 ('extnValue', OCTET STRING),
6192 >>> ext["extnID"] = "1.2.3"
6193 Traceback (most recent call last):
6194 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6195 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6197 You can determine if sequence is ready to be encoded:
6202 Traceback (most recent call last):
6203 pyderasn.ObjNotReady: object is not ready: extnValue
6204 >>> ext["extnValue"] = OctetString(b"foobar")
6208 Value you want to assign, must have the same **type** as in
6209 corresponding specification, but it can have different tags,
6210 optional/default attributes -- they will be taken from specification
6213 class TBSCertificate(Sequence):
6215 ("version", Version(expl=tag_ctxc(0), default="v1")),
6218 >>> tbs = TBSCertificate()
6219 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6221 Assign ``None`` to remove value from sequence.
6223 You can set values in Sequence during its initialization:
6225 >>> AlgorithmIdentifier((
6226 ("algorithm", ObjectIdentifier("1.2.3")),
6227 ("parameters", Any(Null()))
6229 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6231 You can determine if value exists/set in the sequence and take its value:
6233 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6236 OBJECT IDENTIFIER 1.2.3
6238 But pay attention that if value has default, then it won't be (not
6239 in) in the sequence (because ``DEFAULT`` must not be encoded in
6240 DER), but you can read its value:
6242 >>> "critical" in ext, ext["critical"]
6243 (False, BOOLEAN False)
6244 >>> ext["critical"] = Boolean(True)
6245 >>> "critical" in ext, ext["critical"]
6246 (True, BOOLEAN True)
6248 All defaulted values are always optional.
6250 .. _allow_default_values_ctx:
6252 DER prohibits default value encoding and will raise an error if
6253 default value is unexpectedly met during decode.
6254 If :ref:`bered <bered_ctx>` context option is set, then no error
6255 will be raised, but ``bered`` attribute set. You can disable strict
6256 defaulted values existence validation by setting
6257 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6259 All values with DEFAULT specified are decoded atomically in
6260 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6261 SEQUENCE, then it will be yielded as a single element, not
6262 disassembled. That is required for DEFAULT existence check.
6264 Two sequences are equal if they have equal specification (schema),
6265 implicit/explicit tagging and the same values.
6267 __slots__ = ("specs",)
6268 tag_default = tag_encode(form=TagFormConstructed, num=16)
6269 asn1_type_name = "SEQUENCE"
6281 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6283 schema = getattr(self, "schema", ())
6285 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6288 if value is not None:
6289 if issubclass(value.__class__, Sequence):
6290 self._value = value._value
6291 elif hasattr(value, "__iter__"):
6292 for seq_key, seq_value in value:
6293 self[seq_key] = seq_value
6295 raise InvalidValueType((Sequence,))
6296 if default is not None:
6297 if not issubclass(default.__class__, Sequence):
6298 raise InvalidValueType((Sequence,))
6299 default_value = default._value
6300 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6301 default_obj.specs = self.specs
6302 default_obj._value = default_value
6303 self.default = default_obj
6305 self._value = copy(default_obj._value)
6309 for name, spec in iteritems(self.specs):
6310 value = self._value.get(name)
6321 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6323 return any(value.bered for value in itervalues(self._value))
6325 def __getstate__(self):
6326 return SequenceState(
6340 {k: copy(v) for k, v in iteritems(self._value)},
6343 def __setstate__(self, state):
6344 super(Sequence, self).__setstate__(state)
6345 self.specs = state.specs
6346 self._value = state.value
6348 def __eq__(self, their):
6349 if not isinstance(their, self.__class__):
6352 self.specs == their.specs and
6353 self.tag == their.tag and
6354 self._expl == their._expl and
6355 self._value == their._value
6366 return self.__class__(
6369 impl=self.tag if impl is None else impl,
6370 expl=self._expl if expl is None else expl,
6371 default=self.default if default is None else default,
6372 optional=self.optional if optional is None else optional,
6375 def __contains__(self, key):
6376 return key in self._value
6378 def __setitem__(self, key, value):
6379 spec = self.specs.get(key)
6381 raise ObjUnknown(key)
6383 self._value.pop(key, None)
6385 if not isinstance(value, spec.__class__):
6386 raise InvalidValueType((spec.__class__,))
6387 value = spec(value=value)
6388 if spec.default is not None and value == spec.default:
6389 self._value.pop(key, None)
6391 self._value[key] = value
6393 def __getitem__(self, key):
6394 value = self._value.get(key)
6395 if value is not None:
6397 spec = self.specs.get(key)
6399 raise ObjUnknown(key)
6400 if spec.default is not None:
6404 def _values_for_encoding(self):
6405 for name, spec in iteritems(self.specs):
6406 value = self._value.get(name)
6410 raise ObjNotReady(name)
6414 v = b"".join(v.encode() for v in self._values_for_encoding())
6415 return b"".join((self.tag, len_encode(len(v)), v))
6417 def _encode2nd(self, writer, state_iter):
6418 write_full(writer, self.tag + len_encode(next(state_iter)))
6419 for v in self._values_for_encoding():
6420 v.encode2nd(writer, state_iter)
6422 def _encode_cer(self, writer):
6423 write_full(writer, self.tag + LENINDEF)
6424 for v in self._values_for_encoding():
6425 v.encode_cer(writer)
6426 write_full(writer, EOC)
6428 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6430 t, tlen, lv = tag_strip(tlv)
6431 except DecodeError as err:
6432 raise err.__class__(
6434 klass=self.__class__,
6435 decode_path=decode_path,
6440 klass=self.__class__,
6441 decode_path=decode_path,
6444 if tag_only: # pragma: no cover
6448 ctx_bered = ctx.get("bered", False)
6450 l, llen, v = len_decode(lv)
6451 except LenIndefForm as err:
6453 raise err.__class__(
6455 klass=self.__class__,
6456 decode_path=decode_path,
6459 l, llen, v = 0, 1, lv[1:]
6461 except DecodeError as err:
6462 raise err.__class__(
6464 klass=self.__class__,
6465 decode_path=decode_path,
6469 raise NotEnoughData(
6470 "encoded length is longer than data",
6471 klass=self.__class__,
6472 decode_path=decode_path,
6476 v, tail = v[:l], v[l:]
6478 sub_offset = offset + tlen + llen
6481 ctx_allow_default_values = ctx.get("allow_default_values", False)
6482 for name, spec in iteritems(self.specs):
6483 if spec.optional and (
6484 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6488 spec_defaulted = spec.default is not None
6489 sub_decode_path = decode_path + (name,)
6491 if evgen_mode and not spec_defaulted:
6492 for _decode_path, value, v_tail in spec.decode_evgen(
6496 decode_path=sub_decode_path,
6498 _ctx_immutable=False,
6500 yield _decode_path, value, v_tail
6502 _, value, v_tail = next(spec.decode_evgen(
6506 decode_path=sub_decode_path,
6508 _ctx_immutable=False,
6511 except TagMismatch as err:
6512 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6516 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6517 if not evgen_mode and defined is not None:
6518 defined_by, defined_spec = defined
6519 if issubclass(value.__class__, SequenceOf):
6520 for i, _value in enumerate(value):
6521 sub_sub_decode_path = sub_decode_path + (
6523 DecodePathDefBy(defined_by),
6525 defined_value, defined_tail = defined_spec.decode(
6526 memoryview(bytes(_value)),
6528 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6529 if value.expled else (value.tlen + value.llen)
6532 decode_path=sub_sub_decode_path,
6534 _ctx_immutable=False,
6536 if len(defined_tail) > 0:
6539 klass=self.__class__,
6540 decode_path=sub_sub_decode_path,
6543 _value.defined = (defined_by, defined_value)
6545 defined_value, defined_tail = defined_spec.decode(
6546 memoryview(bytes(value)),
6548 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6549 if value.expled else (value.tlen + value.llen)
6552 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6554 _ctx_immutable=False,
6556 if len(defined_tail) > 0:
6559 klass=self.__class__,
6560 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6563 value.defined = (defined_by, defined_value)
6565 value_len = value.fulllen
6567 sub_offset += value_len
6571 yield sub_decode_path, value, v_tail
6572 if value == spec.default:
6573 if ctx_bered or ctx_allow_default_values:
6577 "DEFAULT value met",
6578 klass=self.__class__,
6579 decode_path=sub_decode_path,
6583 values[name] = value
6584 spec_defines = getattr(spec, "defines", ())
6585 if len(spec_defines) == 0:
6586 defines_by_path = ctx.get("defines_by_path", ())
6587 if len(defines_by_path) > 0:
6588 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6589 if spec_defines is not None and len(spec_defines) > 0:
6590 for rel_path, schema in spec_defines:
6591 defined = schema.get(value, None)
6592 if defined is not None:
6593 ctx.setdefault("_defines", []).append((
6594 abs_decode_path(sub_decode_path[:-1], rel_path),
6598 if v[:EOC_LEN].tobytes() != EOC:
6601 klass=self.__class__,
6602 decode_path=decode_path,
6610 klass=self.__class__,
6611 decode_path=decode_path,
6614 obj = self.__class__(
6618 default=self.default,
6619 optional=self.optional,
6620 _decoded=(offset, llen, vlen),
6623 obj.lenindef = lenindef
6624 obj.ber_encoded = ber_encoded
6625 yield decode_path, obj, tail
6628 value = pp_console_row(next(self.pps()))
6630 for name in self.specs:
6631 _value = self._value.get(name)
6634 cols.append("%s: %s" % (name, repr(_value)))
6635 return "%s[%s]" % (value, "; ".join(cols))
6637 def pps(self, decode_path=()):
6640 asn1_type_name=self.asn1_type_name,
6641 obj_name=self.__class__.__name__,
6642 decode_path=decode_path,
6643 optional=self.optional,
6644 default=self == self.default,
6645 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6646 expl=None if self._expl is None else tag_decode(self._expl),
6651 expl_offset=self.expl_offset if self.expled else None,
6652 expl_tlen=self.expl_tlen if self.expled else None,
6653 expl_llen=self.expl_llen if self.expled else None,
6654 expl_vlen=self.expl_vlen if self.expled else None,
6655 expl_lenindef=self.expl_lenindef,
6656 lenindef=self.lenindef,
6657 ber_encoded=self.ber_encoded,
6660 for name in self.specs:
6661 value = self._value.get(name)
6664 yield value.pps(decode_path=decode_path + (name,))
6665 for pp in self.pps_lenindef(decode_path):
6669 class Set(Sequence, SequenceEncode1stMixing):
6670 """``SET`` structure type
6672 Its usage is identical to :py:class:`pyderasn.Sequence`.
6674 .. _allow_unordered_set_ctx:
6676 DER prohibits unordered values encoding and will raise an error
6677 during decode. If :ref:`bered <bered_ctx>` context option is set,
6678 then no error will occur. Also you can disable strict values
6679 ordering check by setting ``"allow_unordered_set": True``
6680 :ref:`context <ctx>` option.
6683 tag_default = tag_encode(form=TagFormConstructed, num=17)
6684 asn1_type_name = "SET"
6686 def _values_for_encoding(self):
6688 super(Set, self)._values_for_encoding(),
6689 key=attrgetter("tag_order"),
6692 def _encode_cer(self, writer):
6693 write_full(writer, self.tag + LENINDEF)
6695 super(Set, self)._values_for_encoding(),
6696 key=attrgetter("tag_order_cer"),
6698 v.encode_cer(writer)
6699 write_full(writer, EOC)
6701 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6703 t, tlen, lv = tag_strip(tlv)
6704 except DecodeError as err:
6705 raise err.__class__(
6707 klass=self.__class__,
6708 decode_path=decode_path,
6713 klass=self.__class__,
6714 decode_path=decode_path,
6721 ctx_bered = ctx.get("bered", False)
6723 l, llen, v = len_decode(lv)
6724 except LenIndefForm as err:
6726 raise err.__class__(
6728 klass=self.__class__,
6729 decode_path=decode_path,
6732 l, llen, v = 0, 1, lv[1:]
6734 except DecodeError as err:
6735 raise err.__class__(
6737 klass=self.__class__,
6738 decode_path=decode_path,
6742 raise NotEnoughData(
6743 "encoded length is longer than data",
6744 klass=self.__class__,
6748 v, tail = v[:l], v[l:]
6750 sub_offset = offset + tlen + llen
6753 ctx_allow_default_values = ctx.get("allow_default_values", False)
6754 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6755 tag_order_prev = (0, 0)
6756 _specs_items = copy(self.specs)
6759 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6761 for name, spec in iteritems(_specs_items):
6762 sub_decode_path = decode_path + (name,)
6768 decode_path=sub_decode_path,
6771 _ctx_immutable=False,
6778 klass=self.__class__,
6779 decode_path=decode_path,
6782 spec_defaulted = spec.default is not None
6783 if evgen_mode and not spec_defaulted:
6784 for _decode_path, value, v_tail in spec.decode_evgen(
6788 decode_path=sub_decode_path,
6790 _ctx_immutable=False,
6792 yield _decode_path, value, v_tail
6794 _, value, v_tail = next(spec.decode_evgen(
6798 decode_path=sub_decode_path,
6800 _ctx_immutable=False,
6803 value_tag_order = value.tag_order
6804 value_len = value.fulllen
6805 if tag_order_prev >= value_tag_order:
6806 if ctx_bered or ctx_allow_unordered_set:
6810 "unordered " + self.asn1_type_name,
6811 klass=self.__class__,
6812 decode_path=sub_decode_path,
6817 yield sub_decode_path, value, v_tail
6818 if value != spec.default:
6820 elif ctx_bered or ctx_allow_default_values:
6824 "DEFAULT value met",
6825 klass=self.__class__,
6826 decode_path=sub_decode_path,
6829 values[name] = value
6830 del _specs_items[name]
6831 tag_order_prev = value_tag_order
6832 sub_offset += value_len
6836 obj = self.__class__(
6840 default=self.default,
6841 optional=self.optional,
6842 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6845 if v[:EOC_LEN].tobytes() != EOC:
6848 klass=self.__class__,
6849 decode_path=decode_path,
6854 for name, spec in iteritems(self.specs):
6855 if name not in values and not spec.optional:
6857 "%s value is not ready" % name,
6858 klass=self.__class__,
6859 decode_path=decode_path,
6864 obj.ber_encoded = ber_encoded
6865 yield decode_path, obj, tail
6868 SequenceOfState = namedtuple(
6870 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6875 class SequenceOf(SequenceEncode1stMixing, Obj):
6876 """``SEQUENCE OF`` sequence type
6878 For that kind of type you must specify the object it will carry on
6879 (bounds are for example here, not required)::
6881 class Ints(SequenceOf):
6886 >>> ints.append(Integer(123))
6887 >>> ints.append(Integer(234))
6889 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6890 >>> [int(i) for i in ints]
6892 >>> ints.append(Integer(345))
6893 Traceback (most recent call last):
6894 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6897 >>> ints[1] = Integer(345)
6899 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6901 You can initialize sequence with preinitialized values:
6903 >>> ints = Ints([Integer(123), Integer(234)])
6905 Also you can use iterator as a value:
6907 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6909 And it won't be iterated until encoding process. Pay attention that
6910 bounds and required schema checks are done only during the encoding
6911 process in that case! After encode was called, then value is zeroed
6912 back to empty list and you have to set it again. That mode is useful
6913 mainly with CER encoding mode, where all objects from the iterable
6914 will be streamed to the buffer, without copying all of them to
6917 __slots__ = ("spec", "_bound_min", "_bound_max")
6918 tag_default = tag_encode(form=TagFormConstructed, num=16)
6919 asn1_type_name = "SEQUENCE OF"
6932 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6934 schema = getattr(self, "schema", None)
6936 raise ValueError("schema must be specified")
6938 self._bound_min, self._bound_max = getattr(
6942 ) if bounds is None else bounds
6944 if value is not None:
6945 self._value = self._value_sanitize(value)
6946 if default is not None:
6947 default_value = self._value_sanitize(default)
6948 default_obj = self.__class__(
6953 default_obj._value = default_value
6954 self.default = default_obj
6956 self._value = copy(default_obj._value)
6958 def _value_sanitize(self, value):
6960 if issubclass(value.__class__, SequenceOf):
6961 value = value._value
6962 elif hasattr(value, NEXT_ATTR_NAME):
6964 elif hasattr(value, "__iter__"):
6967 raise InvalidValueType((self.__class__, iter, "iterator"))
6969 if not self._bound_min <= len(value) <= self._bound_max:
6970 raise BoundsError(self._bound_min, len(value), self._bound_max)
6971 class_expected = self.spec.__class__
6973 if not isinstance(v, class_expected):
6974 raise InvalidValueType((class_expected,))
6979 if hasattr(self._value, NEXT_ATTR_NAME):
6981 if self._bound_min > 0 and len(self._value) == 0:
6983 return all(v.ready for v in self._value)
6987 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6989 return any(v.bered for v in self._value)
6991 def __getstate__(self):
6992 if hasattr(self._value, NEXT_ATTR_NAME):
6993 raise ValueError("can not pickle SequenceOf with iterator")
6994 return SequenceOfState(
7008 [copy(v) for v in self._value],
7013 def __setstate__(self, state):
7014 super(SequenceOf, self).__setstate__(state)
7015 self.spec = state.spec
7016 self._value = state.value
7017 self._bound_min = state.bound_min
7018 self._bound_max = state.bound_max
7020 def __eq__(self, their):
7021 if isinstance(their, self.__class__):
7023 self.spec == their.spec and
7024 self.tag == their.tag and
7025 self._expl == their._expl and
7026 self._value == their._value
7028 if hasattr(their, "__iter__"):
7029 return self._value == list(their)
7041 return self.__class__(
7045 (self._bound_min, self._bound_max)
7046 if bounds is None else bounds
7048 impl=self.tag if impl is None else impl,
7049 expl=self._expl if expl is None else expl,
7050 default=self.default if default is None else default,
7051 optional=self.optional if optional is None else optional,
7054 def __contains__(self, key):
7055 return key in self._value
7057 def append(self, value):
7058 if not isinstance(value, self.spec.__class__):
7059 raise InvalidValueType((self.spec.__class__,))
7060 if len(self._value) + 1 > self._bound_max:
7063 len(self._value) + 1,
7066 self._value.append(value)
7069 return iter(self._value)
7072 return len(self._value)
7074 def __setitem__(self, key, value):
7075 if not isinstance(value, self.spec.__class__):
7076 raise InvalidValueType((self.spec.__class__,))
7077 self._value[key] = self.spec(value=value)
7079 def __getitem__(self, key):
7080 return self._value[key]
7082 def _values_for_encoding(self):
7083 return iter(self._value)
7086 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7089 values_append = values.append
7090 class_expected = self.spec.__class__
7091 values_for_encoding = self._values_for_encoding()
7093 for v in values_for_encoding:
7094 if not isinstance(v, class_expected):
7095 raise InvalidValueType((class_expected,))
7096 values_append(v.encode())
7097 if not self._bound_min <= len(values) <= self._bound_max:
7098 raise BoundsError(self._bound_min, len(values), self._bound_max)
7099 value = b"".join(values)
7101 value = b"".join(v.encode() for v in self._values_for_encoding())
7102 return b"".join((self.tag, len_encode(len(value)), value))
7104 def _encode1st(self, state):
7105 state = super(SequenceOf, self)._encode1st(state)
7106 if hasattr(self._value, NEXT_ATTR_NAME):
7110 def _encode2nd(self, writer, state_iter):
7111 write_full(writer, self.tag + len_encode(next(state_iter)))
7112 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7115 class_expected = self.spec.__class__
7116 values_for_encoding = self._values_for_encoding()
7118 for v in values_for_encoding:
7119 if not isinstance(v, class_expected):
7120 raise InvalidValueType((class_expected,))
7121 v.encode2nd(writer, state_iter)
7123 if not self._bound_min <= values_count <= self._bound_max:
7124 raise BoundsError(self._bound_min, values_count, self._bound_max)
7126 for v in self._values_for_encoding():
7127 v.encode2nd(writer, state_iter)
7129 def _encode_cer(self, writer):
7130 write_full(writer, self.tag + LENINDEF)
7131 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7133 class_expected = self.spec.__class__
7135 values_for_encoding = self._values_for_encoding()
7137 for v in values_for_encoding:
7138 if not isinstance(v, class_expected):
7139 raise InvalidValueType((class_expected,))
7140 v.encode_cer(writer)
7142 if not self._bound_min <= values_count <= self._bound_max:
7143 raise BoundsError(self._bound_min, values_count, self._bound_max)
7145 for v in self._values_for_encoding():
7146 v.encode_cer(writer)
7147 write_full(writer, EOC)
7157 ordering_check=False,
7160 t, tlen, lv = tag_strip(tlv)
7161 except DecodeError as err:
7162 raise err.__class__(
7164 klass=self.__class__,
7165 decode_path=decode_path,
7170 klass=self.__class__,
7171 decode_path=decode_path,
7178 ctx_bered = ctx.get("bered", False)
7180 l, llen, v = len_decode(lv)
7181 except LenIndefForm as err:
7183 raise err.__class__(
7185 klass=self.__class__,
7186 decode_path=decode_path,
7189 l, llen, v = 0, 1, lv[1:]
7191 except DecodeError as err:
7192 raise err.__class__(
7194 klass=self.__class__,
7195 decode_path=decode_path,
7199 raise NotEnoughData(
7200 "encoded length is longer than data",
7201 klass=self.__class__,
7202 decode_path=decode_path,
7206 v, tail = v[:l], v[l:]
7208 sub_offset = offset + tlen + llen
7211 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7212 value_prev = memoryview(v[:0])
7216 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7218 sub_decode_path = decode_path + (str(_value_count),)
7220 for _decode_path, value, v_tail in spec.decode_evgen(
7224 decode_path=sub_decode_path,
7226 _ctx_immutable=False,
7228 yield _decode_path, value, v_tail
7230 _, value, v_tail = next(spec.decode_evgen(
7234 decode_path=sub_decode_path,
7236 _ctx_immutable=False,
7239 value_len = value.fulllen
7241 if value_prev.tobytes() > v[:value_len].tobytes():
7242 if ctx_bered or ctx_allow_unordered_set:
7246 "unordered " + self.asn1_type_name,
7247 klass=self.__class__,
7248 decode_path=sub_decode_path,
7251 value_prev = v[:value_len]
7254 _value.append(value)
7255 sub_offset += value_len
7258 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7260 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7261 klass=self.__class__,
7262 decode_path=decode_path,
7266 obj = self.__class__(
7267 value=None if evgen_mode else _value,
7269 bounds=(self._bound_min, self._bound_max),
7272 default=self.default,
7273 optional=self.optional,
7274 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7276 except BoundsError as err:
7279 klass=self.__class__,
7280 decode_path=decode_path,
7284 if v[:EOC_LEN].tobytes() != EOC:
7287 klass=self.__class__,
7288 decode_path=decode_path,
7293 obj.ber_encoded = ber_encoded
7294 yield decode_path, obj, tail
7298 pp_console_row(next(self.pps())),
7299 ", ".join(repr(v) for v in self._value),
7302 def pps(self, decode_path=()):
7305 asn1_type_name=self.asn1_type_name,
7306 obj_name=self.__class__.__name__,
7307 decode_path=decode_path,
7308 optional=self.optional,
7309 default=self == self.default,
7310 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7311 expl=None if self._expl is None else tag_decode(self._expl),
7316 expl_offset=self.expl_offset if self.expled else None,
7317 expl_tlen=self.expl_tlen if self.expled else None,
7318 expl_llen=self.expl_llen if self.expled else None,
7319 expl_vlen=self.expl_vlen if self.expled else None,
7320 expl_lenindef=self.expl_lenindef,
7321 lenindef=self.lenindef,
7322 ber_encoded=self.ber_encoded,
7325 for i, value in enumerate(self._value):
7326 yield value.pps(decode_path=decode_path + (str(i),))
7327 for pp in self.pps_lenindef(decode_path):
7331 class SetOf(SequenceOf):
7332 """``SET OF`` sequence type
7334 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7337 tag_default = tag_encode(form=TagFormConstructed, num=17)
7338 asn1_type_name = "SET OF"
7340 def _value_sanitize(self, value):
7341 value = super(SetOf, self)._value_sanitize(value)
7342 if hasattr(value, NEXT_ATTR_NAME):
7344 "SetOf does not support iterator values, as no sense in them"
7349 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7350 return b"".join((self.tag, len_encode(len(v)), v))
7352 def _encode2nd(self, writer, state_iter):
7353 write_full(writer, self.tag + len_encode(next(state_iter)))
7355 for v in self._values_for_encoding():
7357 v.encode2nd(buf.write, state_iter)
7358 values.append(buf.getvalue())
7361 write_full(writer, v)
7363 def _encode_cer(self, writer):
7364 write_full(writer, self.tag + LENINDEF)
7365 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7366 write_full(writer, v)
7367 write_full(writer, EOC)
7369 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7370 return super(SetOf, self)._decode(
7377 ordering_check=True,
7381 def obj_by_path(pypath): # pragma: no cover
7382 """Import object specified as string Python path
7384 Modules must be separated from classes/functions with ``:``.
7386 >>> obj_by_path("foo.bar:Baz")
7387 <class 'foo.bar.Baz'>
7388 >>> obj_by_path("foo.bar:Baz.boo")
7389 <classmethod 'foo.bar.Baz.boo'>
7391 mod, objs = pypath.rsplit(":", 1)
7392 from importlib import import_module
7393 obj = import_module(mod)
7394 for obj_name in objs.split("."):
7395 obj = getattr(obj, obj_name)
7399 def generic_decoder(): # pragma: no cover
7400 # All of this below is a big hack with self references
7401 choice = PrimitiveTypes()
7402 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7403 choice.specs["SetOf"] = SetOf(schema=choice)
7404 for i in six_xrange(31):
7405 choice.specs["SequenceOf%d" % i] = SequenceOf(
7409 choice.specs["Any"] = Any()
7411 # Class name equals to type name, to omit it from output
7412 class SEQUENCEOF(SequenceOf):
7420 with_decode_path=False,
7421 decode_path_only=(),
7424 def _pprint_pps(pps):
7426 if hasattr(pp, "_fields"):
7428 decode_path_only != () and
7429 pp.decode_path[:len(decode_path_only)] != decode_path_only
7432 if pp.asn1_type_name == Choice.asn1_type_name:
7434 pp_kwargs = pp._asdict()
7435 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7436 pp = _pp(**pp_kwargs)
7437 yield pp_console_row(
7442 with_colours=with_colours,
7443 with_decode_path=with_decode_path,
7444 decode_path_len_decrease=len(decode_path_only),
7446 for row in pp_console_blob(
7448 decode_path_len_decrease=len(decode_path_only),
7452 for row in _pprint_pps(pp):
7454 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7455 return SEQUENCEOF(), pprint_any
7458 def ascii_visualize(ba):
7459 """Output only ASCII printable characters, like in hexdump -C
7461 Example output for given binary string (right part)::
7463 92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|
7466 return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7470 """Generate ``hexdump -C`` like output
7474 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7475 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7477 Result of that function is a generator of lines, where each line is
7482 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7483 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7484 " |i..?.....G...k.j|"]
7488 hexed = hexenc(raw).upper()
7489 addr, cols = 0, ["%08x " % 0]
7490 for i in six_xrange(0, len(hexed), 2):
7491 if i != 0 and i // 2 % 8 == 0:
7493 if i != 0 and i // 2 % 16 == 0:
7494 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7497 cols = ["%08x " % addr]
7498 cols.append(" " + hexed[i:i + 2])
7500 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7504 def browse(raw, obj, oid_maps=()):
7505 """Interactive browser
7507 :param bytes raw: binary data you decoded
7508 :param obj: decoded :py:class:`pyderasn.Obj`
7509 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7510 Its human readable form is printed when OID is met
7512 .. note:: `urwid <http://urwid.org/>`__ dependency required
7514 This browser is an interactive terminal application for browsing
7515 structures of your decoded ASN.1 objects. You can quit it with **q**
7516 key. It consists of three windows:
7519 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7520 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7521 **Left** key goes to constructed element above. **Plus**/**Minus**
7522 keys collapse/uncollapse constructed elements. **Space** toggles it
7524 window with various information about element. You can scroll it
7525 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7527 window with raw data hexdump and highlighted current element's
7528 contents. It automatically focuses on element's data. You can
7529 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7530 speed) keys. If element has explicit tag, then it also will be
7531 highlighted with different colour
7533 Window's header contains current decode path and progress bars with
7534 position in *info* and *hexdump* windows.
7536 If you press **d**, then current element will be saved in the
7537 current directory under its decode path name (adding ".0", ".1", etc
7538 suffix if such file already exists). **D** will save it with explicit tag.
7540 You can also invoke it with ``--browse`` command line argument.
7542 from copy import deepcopy
7543 from os.path import exists as path_exists
7546 class TW(urwid.TreeWidget):
7547 def __init__(self, state, *args, **kwargs):
7549 self.scrolled = {"info": False, "hexdump": False}
7550 super(TW, self).__init__(*args, **kwargs)
7553 pp = self.get_node().get_value()
7554 constructed = len(pp) > 1
7555 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7557 def _state_update(self):
7558 pp, _ = self._get_pp()
7559 self.state["decode_path"].set_text(
7560 ":".join(str(p) for p in pp.decode_path)
7562 lines = deepcopy(self.state["hexed"])
7564 def attr_set(i, attr):
7565 line = lines[i // 16]
7566 idx = 1 + (i - 16 * (i // 16))
7567 line[idx] = (attr, line[idx])
7569 if pp.expl_offset is not None:
7570 for i in six_xrange(
7572 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7574 attr_set(i, "select-expl")
7575 for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7576 attr_set(i, "select-value")
7577 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7578 self.state["hexdump"].set_focus(pp.offset // 16)
7579 self.state["hexdump"].set_focus_valign("middle")
7580 self.state["hexdump_bar"].set_completion(
7581 (100 * pp.offset // 16) //
7582 len(self.state["hexdump"]._body.positions())
7586 [("header", "Name: "), pp.obj_name],
7587 [("header", "Type: "), pp.asn1_type_name],
7588 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7589 [("header", "[TLV]len: "), "%d/%d/%d" % (
7590 pp.tlen, pp.llen, pp.vlen,
7592 [("header", "TLVlen: "), "%d" % sum((
7593 pp.tlen, pp.llen, pp.vlen,
7595 [("header", "Slice: "), "[%d:%d]" % (
7596 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7600 lines.append([("warning", "LENINDEF")])
7602 lines.append([("warning", "BER encoded")])
7604 lines.append([("warning", "BERed")])
7605 if pp.expl is not None:
7606 lines.append([("header", "EXPLICIT")])
7607 klass, _, num = pp.expl
7608 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7609 if pp.expl_offset is not None:
7610 lines.append([" Offset: %d" % pp.expl_offset])
7611 lines.append([" [TLV]len: %d/%d/%d" % (
7612 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7614 lines.append([" TLVlen: %d" % sum((
7615 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7617 lines.append([" Slice: [%d:%d]" % (
7619 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7621 if pp.impl is not None:
7622 klass, _, num = pp.impl
7624 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7627 lines.append(["OPTIONAL"])
7629 lines.append(["DEFAULT"])
7630 if len(pp.decode_path) > 0:
7631 ent = pp.decode_path[-1]
7632 if isinstance(ent, DecodePathDefBy):
7634 value = str(ent.defined_by)
7635 oid_name = find_oid_name(
7636 ent.defined_by.asn1_type_name, oid_maps, value,
7638 lines.append([("header", "DEFINED BY: "), "%s" % (
7639 value if oid_name is None
7640 else "%s (%s)" % (oid_name, value)
7643 if pp.value is not None:
7644 lines.append([("header", "Value: "), pp.value])
7646 len(oid_maps) > 0 and
7647 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7649 for oid_map in oid_maps:
7650 oid_name = oid_map.get(pp.value)
7651 if oid_name is not None:
7652 lines.append([("header", "Human: "), oid_name])
7654 if pp.asn1_type_name == Integer.asn1_type_name:
7656 ("header", "Decimal: "), "%d" % int(pp.obj),
7659 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7661 if pp.blob.__class__ == binary_type:
7662 blob = hexenc(pp.blob).upper()
7663 for i in six_xrange(0, len(blob), 32):
7664 lines.append([colonize_hex(blob[i:i + 32])])
7665 elif pp.blob.__class__ == tuple:
7666 lines.append([", ".join(pp.blob)])
7667 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7668 self.state["info_bar"].set_completion(0)
7670 def selectable(self):
7671 if self.state["widget_current"] != self:
7672 self.state["widget_current"] = self
7673 self.scrolled["info"] = False
7674 self.scrolled["hexdump"] = False
7675 self._state_update()
7676 return super(TW, self).selectable()
7678 def _get_display_text_without_offset(self):
7679 pp, constructed = self._get_pp()
7680 style = "constructed" if constructed else ""
7681 if len(pp.decode_path) == 0:
7682 return (style, pp.obj_name)
7683 if pp.asn1_type_name == "EOC":
7684 return ("eoc", "EOC")
7685 ent = pp.decode_path[-1]
7686 if isinstance(ent, DecodePathDefBy):
7687 value = str(ent.defined_by)
7688 oid_name = find_oid_name(
7689 ent.defined_by.asn1_type_name, oid_maps, value,
7691 return ("defby", "DEFBY:" + (
7692 value if oid_name is None else oid_name
7696 def get_display_text(self):
7697 pp, _ = self._get_pp()
7698 return "%s: [%d]" % (self._get_display_text_without_offset(), pp.offset)
7700 def _scroll(self, what, step):
7701 self.state[what]._invalidate()
7702 pos = self.state[what].focus_position
7703 if not self.scrolled[what]:
7704 self.scrolled[what] = True
7706 pos = max(0, pos + step)
7707 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7708 self.state[what].set_focus(pos)
7709 self.state[what].set_focus_valign("top")
7710 self.state[what + "_bar"].set_completion(
7711 (100 * pos) // len(self.state[what]._body.positions())
7714 def keypress(self, size, key):
7716 raise urwid.ExitMainLoop()
7719 self.expanded = not self.expanded
7720 self.update_expanded_icon()
7723 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7724 if key in hexdump_steps:
7725 self._scroll("hexdump", hexdump_steps[key])
7728 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7729 if key in info_steps:
7730 self._scroll("info", info_steps[key])
7733 if key in ("d", "D"):
7734 pp, _ = self._get_pp()
7735 dp = ":".join(str(p) for p in pp.decode_path)
7736 dp = dp.replace(" ", "_")
7739 if key == "d" or pp.expl_offset is None:
7740 data = self.state["raw"][pp.offset:(
7741 pp.offset + pp.tlen + pp.llen + pp.vlen
7744 data = self.state["raw"][pp.expl_offset:(
7745 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7749 def duplicate_path(dp, ctr):
7752 return "%s.%d" % (dp, ctr)
7755 if not path_exists(duplicate_path(dp, ctr)):
7758 dp = duplicate_path(dp, ctr)
7759 with open(dp, "wb") as fd:
7761 self.state["decode_path"].set_text(
7762 ("warning", "Saved to: " + dp)
7765 return super(TW, self).keypress(size, key)
7767 class PN(urwid.ParentNode):
7768 def __init__(self, state, value, *args, **kwargs):
7770 if not hasattr(value, "_fields"):
7772 super(PN, self).__init__(value, *args, **kwargs)
7774 def load_widget(self):
7775 return TW(self.state, self)
7777 def load_child_keys(self):
7778 value = self.get_value()
7779 if hasattr(value, "_fields"):
7781 return range(len(value[1:]))
7783 def load_child_node(self, key):
7786 self.get_value()[key + 1],
7789 depth=self.get_depth() + 1,
7792 class LabeledPG(urwid.ProgressBar):
7793 def __init__(self, label, *args, **kwargs):
7795 super(LabeledPG, self).__init__(*args, **kwargs)
7798 return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7800 WinHexdump = urwid.ListBox([urwid.Text("")])
7801 WinInfo = urwid.ListBox([urwid.Text("")])
7802 WinDecodePath = urwid.Text("", "center")
7803 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7804 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7805 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7808 "hexed": list(hexdump(raw)),
7809 "widget_current": None,
7811 "info_bar": WinInfoBar,
7812 "hexdump": WinHexdump,
7813 "hexdump_bar": WinHexdumpBar,
7814 "decode_path": WinDecodePath,
7818 help_text = " ".join((
7820 "space:(un)collapse",
7821 "(pg)up/down/home/end:nav",
7822 "jkJK:hexdump hlHL:info",
7828 ("weight", 1, WinTree),
7829 ("weight", 2, urwid.Pile([
7830 urwid.LineBox(WinInfo),
7831 urwid.LineBox(WinHexdump),
7834 header=urwid.Columns([
7835 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7836 ("weight", 1, WinInfoBar),
7837 ("weight", 1, WinHexdumpBar),
7839 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7842 ("header", "bold", ""),
7843 ("constructed", "bold", ""),
7844 ("help", "light magenta", ""),
7845 ("warning", "light red", ""),
7846 ("defby", "light red", ""),
7847 ("eoc", "dark red", ""),
7848 ("select-value", "light green", ""),
7849 ("select-expl", "light red", ""),
7850 ("pg-normal", "", "light blue"),
7851 ("pg-complete", "black", "yellow"),
7856 def main(): # pragma: no cover
7858 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7859 parser.add_argument(
7863 help="Skip that number of bytes from the beginning",
7865 parser.add_argument(
7867 help="Python paths to dictionary with OIDs, comma separated",
7869 parser.add_argument(
7871 help="Python path to schema definition to use",
7873 parser.add_argument(
7874 "--defines-by-path",
7875 help="Python path to decoder's defines_by_path",
7877 parser.add_argument(
7879 action="store_true",
7880 help="Disallow BER encoding",
7882 parser.add_argument(
7883 "--print-decode-path",
7884 action="store_true",
7885 help="Print decode paths",
7887 parser.add_argument(
7888 "--decode-path-only",
7889 help="Print only specified decode path",
7891 parser.add_argument(
7893 action="store_true",
7894 help="Allow explicit tag out-of-bound",
7896 parser.add_argument(
7898 action="store_true",
7899 help="Turn on event generation mode",
7901 parser.add_argument(
7903 action="store_true",
7904 help="Start ASN.1 browser",
7906 parser.add_argument(
7908 type=argparse.FileType("rb"),
7909 help="Path to BER/CER/DER file you want to decode",
7911 args = parser.parse_args()
7913 raw = file_mmaped(args.RAWFile)[args.skip:]
7915 args.RAWFile.seek(args.skip)
7916 raw = memoryview(args.RAWFile.read())
7917 args.RAWFile.close()
7919 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7920 if args.oids else ()
7922 from functools import partial
7924 schema = obj_by_path(args.schema)
7925 pprinter = partial(pprint, big_blobs=True)
7927 schema, pprinter = generic_decoder()
7929 "bered": not args.nobered,
7930 "allow_expl_oob": args.allow_expl_oob,
7932 if args.defines_by_path is not None:
7933 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7935 obj, _ = schema().decode(raw, ctx=ctx)
7936 browse(raw, obj, oid_maps)
7937 from sys import exit as sys_exit
7939 from os import environ
7943 with_colours=environ.get("NO_COLOR") is None,
7944 with_decode_path=args.print_decode_path,
7946 () if args.decode_path_only is None else
7947 tuple(args.decode_path_only.split(":"))
7951 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7952 print(pprinter(obj, decode_path=decode_path))
7954 obj, tail = schema().decode(raw, ctx=ctx)
7955 print(pprinter(obj))
7957 print("\nTrailing data: %s" % hexenc(tail))
7960 if __name__ == "__main__":
7961 from pyderasn import *