3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 does not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms does not deal good with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
908 .. autofunction:: pyderasn.browse
912 .. autoclass:: pyderasn.Obj
920 .. autoclass:: pyderasn.Boolean
925 .. autoclass:: pyderasn.Integer
926 :members: __init__, named, tohex
930 .. autoclass:: pyderasn.BitString
931 :members: __init__, bit_len, named
935 .. autoclass:: pyderasn.OctetString
940 .. autoclass:: pyderasn.Null
945 .. autoclass:: pyderasn.ObjectIdentifier
950 .. autoclass:: pyderasn.Enumerated
954 .. autoclass:: pyderasn.CommonString
958 .. autoclass:: pyderasn.NumericString
962 .. autoclass:: pyderasn.PrintableString
963 :members: __init__, allow_asterisk, allow_ampersand
967 .. autoclass:: pyderasn.IA5String
971 .. autoclass:: pyderasn.VisibleString
975 .. autoclass:: pyderasn.UTCTime
976 :members: __init__, todatetime
980 .. autoclass:: pyderasn.GeneralizedTime
981 :members: __init__, todatetime
988 .. autoclass:: pyderasn.Choice
989 :members: __init__, choice, value
993 .. autoclass:: PrimitiveTypes
997 .. autoclass:: pyderasn.Any
1005 .. autoclass:: pyderasn.Sequence
1010 .. autoclass:: pyderasn.Set
1015 .. autoclass:: pyderasn.SequenceOf
1020 .. autoclass:: pyderasn.SetOf
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1058 You can decode DER/BER files using command line abilities::
1060 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1062 If there is no schema for your file, then you can try parsing it without,
1063 but of course IMPLICIT tags will often make it impossible. But result is
1064 good enough for the certificate above::
1066 $ python -m pyderasn path/to/file
1067 0 [1,3,1604] . >: SEQUENCE OF
1068 4 [1,3,1453] . . >: SEQUENCE OF
1069 8 [0,0, 5] . . . . >: [0] ANY
1070 . . . . . A0:03:02:01:02
1071 13 [1,1, 3] . . . . >: INTEGER 61595
1072 18 [1,1, 13] . . . . >: SEQUENCE OF
1073 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074 31 [1,1, 0] . . . . . . >: NULL
1075 33 [1,3, 274] . . . . >: SEQUENCE OF
1076 37 [1,1, 11] . . . . . . >: SET OF
1077 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1078 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1081 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1082 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1084 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086 . . . . . . . . . 61:2E:63:6F:6D:2F
1087 1461 [1,1, 13] . . >: SEQUENCE OF
1088 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089 1474 [1,1, 0] . . . . >: NULL
1090 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1091 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1098 If you have got dictionaries with ObjectIdentifiers, like example one
1099 from ``tests/test_crts.py``::
1102 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1105 "2.5.4.10": "id-at-organizationName",
1106 "2.5.4.11": "id-at-organizationalUnitName",
1109 then you can pass it to pretty printer to see human readable OIDs::
1111 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1113 37 [1,1, 11] . . . . . . >: SET OF
1114 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1115 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1117 50 [1,1, 18] . . . . . . >: SET OF
1118 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1119 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121 70 [1,1, 18] . . . . . . >: SET OF
1122 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1123 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1130 Each decoded element has so-called decode path: sequence of structure
1131 names it is passing during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with
1133 ``--print-decode-path`` option::
1135 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136 0 [1,3,1604] Certificate SEQUENCE []
1137 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1144 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150 . . . . . . . 13:02:45:53
1151 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1154 Now you can print only the specified tree, for example signature algorithm::
1156 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1158 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1199 from termcolor import colored
1200 except ImportError: # pragma: no cover
1201 def colored(what, *args, **kwargs):
1252 "TagClassApplication",
1255 "TagClassUniversal",
1256 "TagFormConstructed",
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1274 TagClassContext: "",
1275 TagClassApplication: "APPLICATION ",
1276 TagClassPrivate: "PRIVATE ",
1277 TagClassUniversal: "UNIV ",
1281 LENINDEF = b"\x80" # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1290 def file_mmaped(fd):
1291 """Make mmap-ed memoryview for reading from file
1293 :param fd: file object
1294 :returns: memoryview over read-only mmap-ing of the whole file
1296 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1300 if not set(value) <= DECIMALS:
1301 raise ValueError("non-pure integer")
1305 def fractions2float(fractions_raw):
1306 pureint(fractions_raw)
1307 return float("0." + fractions_raw)
1310 def get_def_by_path(defines_by_path, sub_decode_path):
1311 """Get define by decode path
1313 for path, define in defines_by_path:
1314 if len(path) != len(sub_decode_path):
1316 for p1, p2 in zip(path, sub_decode_path):
1317 if (p1 is not any) and (p1 != p2):
1323 ########################################################################
1325 ########################################################################
1327 class ASN1Error(ValueError):
1331 class DecodeError(ASN1Error):
1332 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1334 :param str msg: reason of decode failing
1335 :param klass: optional exact DecodeError inherited class (like
1336 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337 :py:exc:`InvalidLength`)
1338 :param decode_path: tuple of strings. It contains human
1339 readable names of the fields through which
1340 decoding process has passed
1341 :param int offset: binary offset where failure happened
1343 super(DecodeError, self).__init__()
1346 self.decode_path = decode_path
1347 self.offset = offset
1352 "" if self.klass is None else self.klass.__name__,
1354 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355 if len(self.decode_path) > 0 else ""
1357 ("(at %d)" % self.offset) if self.offset > 0 else "",
1363 return "%s(%s)" % (self.__class__.__name__, self)
1366 class NotEnoughData(DecodeError):
1370 class ExceedingData(ASN1Error):
1371 def __init__(self, nbytes):
1372 super(ExceedingData, self).__init__()
1373 self.nbytes = nbytes
1376 return "%d trailing bytes" % self.nbytes
1379 return "%s(%s)" % (self.__class__.__name__, self)
1382 class LenIndefForm(DecodeError):
1386 class TagMismatch(DecodeError):
1390 class InvalidLength(DecodeError):
1394 class InvalidOID(DecodeError):
1398 class ObjUnknown(ASN1Error):
1399 def __init__(self, name):
1400 super(ObjUnknown, self).__init__()
1404 return "object is unknown: %s" % self.name
1407 return "%s(%s)" % (self.__class__.__name__, self)
1410 class ObjNotReady(ASN1Error):
1411 def __init__(self, name):
1412 super(ObjNotReady, self).__init__()
1416 return "object is not ready: %s" % self.name
1419 return "%s(%s)" % (self.__class__.__name__, self)
1422 class InvalidValueType(ASN1Error):
1423 def __init__(self, expected_types):
1424 super(InvalidValueType, self).__init__()
1425 self.expected_types = expected_types
1428 return "invalid value type, expected: %s" % ", ".join(
1429 [repr(t) for t in self.expected_types]
1433 return "%s(%s)" % (self.__class__.__name__, self)
1436 class BoundsError(ASN1Error):
1437 def __init__(self, bound_min, value, bound_max):
1438 super(BoundsError, self).__init__()
1439 self.bound_min = bound_min
1441 self.bound_max = bound_max
1444 return "unsatisfied bounds: %s <= %s <= %s" % (
1451 return "%s(%s)" % (self.__class__.__name__, self)
1454 ########################################################################
1456 ########################################################################
1458 _hexdecoder = getdecoder("hex")
1459 _hexencoder = getencoder("hex")
1463 """Binary data to hexadecimal string convert
1465 return _hexdecoder(data)[0]
1469 """Hexadecimal string to binary data convert
1471 return _hexencoder(data)[0].decode("ascii")
1474 def int_bytes_len(num, byte_len=8):
1477 return int(ceil(float(num.bit_length()) / byte_len))
1480 def zero_ended_encode(num):
1481 octets = bytearray(int_bytes_len(num, 7))
1483 octets[i] = num & 0x7F
1487 octets[i] = 0x80 | (num & 0x7F)
1490 return bytes(octets)
1493 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1494 """Encode tag to binary form
1496 :param int num: tag's number
1497 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1498 :py:data:`pyderasn.TagClassContext`,
1499 :py:data:`pyderasn.TagClassApplication`,
1500 :py:data:`pyderasn.TagClassPrivate`)
1501 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1502 :py:data:`pyderasn.TagFormConstructed`)
1506 return int2byte(klass | form | num)
1507 # [XX|X|11111][1.......][1.......] ... [0.......]
1508 return int2byte(klass | form | 31) + zero_ended_encode(num)
1511 def tag_decode(tag):
1512 """Decode tag from binary form
1516 No validation is performed, assuming that it has already passed.
1518 It returns tuple with three integers, as
1519 :py:func:`pyderasn.tag_encode` accepts.
1521 first_octet = byte2int(tag)
1522 klass = first_octet & 0xC0
1523 form = first_octet & 0x20
1524 if first_octet & 0x1F < 0x1F:
1525 return (klass, form, first_octet & 0x1F)
1527 for octet in iterbytes(tag[1:]):
1530 return (klass, form, num)
1534 """Create CONTEXT PRIMITIVE tag
1536 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1540 """Create CONTEXT CONSTRUCTED tag
1542 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1545 def tag_strip(data):
1546 """Take off tag from the data
1548 :returns: (encoded tag, tag length, remaining data)
1551 raise NotEnoughData("no data at all")
1552 if byte2int(data) & 0x1F < 31:
1553 return data[:1], 1, data[1:]
1558 raise DecodeError("unfinished tag")
1559 if indexbytes(data, i) & 0x80 == 0:
1561 if i == 1 and indexbytes(data, 1) < 0x1F:
1562 raise DecodeError("unexpected long form")
1563 if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1564 raise DecodeError("leading zero byte in tag value")
1566 return data[:i], i, data[i:]
1572 octets = bytearray(int_bytes_len(l) + 1)
1573 octets[0] = 0x80 | (len(octets) - 1)
1574 for i in six_xrange(len(octets) - 1, 0, -1):
1575 octets[i] = l & 0xFF
1577 return bytes(octets)
1580 def len_decode(data):
1583 :returns: (decoded length, length's length, remaining data)
1584 :raises LenIndefForm: if indefinite form encoding is met
1587 raise NotEnoughData("no data at all")
1588 first_octet = byte2int(data)
1589 if first_octet & 0x80 == 0:
1590 return first_octet, 1, data[1:]
1591 octets_num = first_octet & 0x7F
1592 if octets_num + 1 > len(data):
1593 raise NotEnoughData("encoded length is longer than data")
1595 raise LenIndefForm()
1596 if byte2int(data[1:]) == 0:
1597 raise DecodeError("leading zeros")
1599 for v in iterbytes(data[1:1 + octets_num]):
1602 raise DecodeError("long form instead of short one")
1603 return l, 1 + octets_num, data[1 + octets_num:]
1606 LEN0 = len_encode(0)
1607 LEN1 = len_encode(1)
1608 LEN1K = len_encode(1000)
1612 """How many bytes length field will take
1616 if l < 256: # 1 << 8
1618 if l < 65536: # 1 << 16
1620 if l < 16777216: # 1 << 24
1622 if l < 4294967296: # 1 << 32
1624 if l < 1099511627776: # 1 << 40
1626 if l < 281474976710656: # 1 << 48
1628 if l < 72057594037927936: # 1 << 56
1630 raise OverflowError("too big length")
1633 def write_full(writer, data):
1634 """Fully write provided data
1636 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1638 BytesIO does not guarantee that the whole data will be written at
1639 once. That function write everything provided, raising an error if
1640 ``writer`` returns None.
1642 data = memoryview(data)
1644 while written != len(data):
1645 n = writer(data[written:])
1647 raise ValueError("can not write to buf")
1651 # If it is 64-bit system, then use compact 64-bit array of unsigned
1652 # longs. Use an ordinary list with universal integers otherwise, that
1654 if sys_maxsize > 2 ** 32:
1655 def state_2pass_new():
1658 def state_2pass_new():
1662 ########################################################################
1664 ########################################################################
1666 class AutoAddSlots(type):
1667 def __new__(cls, name, bases, _dict):
1668 _dict["__slots__"] = _dict.get("__slots__", ())
1669 return type.__new__(cls, name, bases, _dict)
1672 BasicState = namedtuple("BasicState", (
1685 ), **NAMEDTUPLE_KWARGS)
1688 @add_metaclass(AutoAddSlots)
1690 """Common ASN.1 object class
1692 All ASN.1 types are inherited from it. It has metaclass that
1693 automatically adds ``__slots__`` to all inherited classes.
1718 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1719 self._expl = getattr(self, "expl", None) if expl is None else expl
1720 if self.tag != self.tag_default and self._expl is not None:
1721 raise ValueError("implicit and explicit tags can not be set simultaneously")
1722 if self.tag is None:
1723 self._tag_order = None
1725 tag_class, _, tag_num = tag_decode(
1726 self.tag if self._expl is None else self._expl
1728 self._tag_order = (tag_class, tag_num)
1729 if default is not None:
1731 self.optional = optional
1732 self.offset, self.llen, self.vlen = _decoded
1734 self.expl_lenindef = False
1735 self.lenindef = False
1736 self.ber_encoded = False
1739 def ready(self): # pragma: no cover
1740 """Is object ready to be encoded?
1742 raise NotImplementedError()
1744 def _assert_ready(self):
1746 raise ObjNotReady(self.__class__.__name__)
1750 """Is either object or any elements inside is BER encoded?
1752 return self.expl_lenindef or self.lenindef or self.ber_encoded
1756 """Is object decoded?
1758 return (self.llen + self.vlen) > 0
1760 def __getstate__(self): # pragma: no cover
1761 """Used for making safe to be mutable pickleable copies
1763 raise NotImplementedError()
1765 def __setstate__(self, state):
1766 if state.version != __version__:
1767 raise ValueError("data is pickled by different PyDERASN version")
1768 self.tag = state.tag
1769 self._tag_order = state.tag_order
1770 self._expl = state.expl
1771 self.default = state.default
1772 self.optional = state.optional
1773 self.offset = state.offset
1774 self.llen = state.llen
1775 self.vlen = state.vlen
1776 self.expl_lenindef = state.expl_lenindef
1777 self.lenindef = state.lenindef
1778 self.ber_encoded = state.ber_encoded
1781 def tag_order(self):
1782 """Tag's (class, number) used for DER/CER sorting
1784 return self._tag_order
1787 def tag_order_cer(self):
1788 return self.tag_order
1792 """.. seealso:: :ref:`decoding`
1794 return len(self.tag)
1798 """.. seealso:: :ref:`decoding`
1800 return self.tlen + self.llen + self.vlen
1802 def __str__(self): # pragma: no cover
1803 return self.__bytes__() if PY2 else self.__unicode__()
1805 def __ne__(self, their):
1806 return not(self == their)
1808 def __gt__(self, their): # pragma: no cover
1809 return not(self < their)
1811 def __le__(self, their): # pragma: no cover
1812 return (self == their) or (self < their)
1814 def __ge__(self, their): # pragma: no cover
1815 return (self == their) or (self > their)
1817 def _encode(self): # pragma: no cover
1818 raise NotImplementedError()
1820 def _encode_cer(self, writer):
1821 write_full(writer, self._encode())
1823 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1824 yield NotImplemented
1826 def _encode1st(self, state):
1827 raise NotImplementedError()
1829 def _encode2nd(self, writer, state_iter):
1830 raise NotImplementedError()
1833 """DER encode the structure
1835 :returns: DER representation
1837 raw = self._encode()
1838 if self._expl is None:
1840 return b"".join((self._expl, len_encode(len(raw)), raw))
1842 def encode1st(self, state=None):
1843 """Do the 1st pass of 2-pass encoding
1845 :rtype: (int, array("L"))
1846 :returns: full length of encoded data and precalculated various
1850 state = state_2pass_new()
1851 if self._expl is None:
1852 return self._encode1st(state)
1854 idx = len(state) - 1
1855 vlen, _ = self._encode1st(state)
1857 fulllen = len(self._expl) + len_size(vlen) + vlen
1858 return fulllen, state
1860 def encode2nd(self, writer, state_iter):
1861 """Do the 2nd pass of 2-pass encoding
1863 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1864 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1866 if self._expl is None:
1867 self._encode2nd(writer, state_iter)
1869 write_full(writer, self._expl + len_encode(next(state_iter)))
1870 self._encode2nd(writer, state_iter)
1872 def encode_cer(self, writer):
1873 """CER encode the structure to specified writer
1875 :param writer: must comply with ``io.RawIOBase.write``
1876 behaviour. It takes slice to be written and
1877 returns number of bytes processed. If it returns
1878 None, then exception will be raised
1880 if self._expl is not None:
1881 write_full(writer, self._expl + LENINDEF)
1882 if getattr(self, "der_forced", False):
1883 write_full(writer, self._encode())
1885 self._encode_cer(writer)
1886 if self._expl is not None:
1887 write_full(writer, EOC)
1889 def hexencode(self):
1890 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1892 return hexenc(self.encode())
1902 _ctx_immutable=True,
1906 :param data: either binary or memoryview
1907 :param int offset: initial data's offset
1908 :param bool leavemm: do we need to leave memoryview of remaining
1909 data as is, or convert it to bytes otherwise
1910 :param decode_path: current decode path (tuples of strings,
1911 possibly with DecodePathDefBy) with will be
1912 the root for all underlying objects
1913 :param ctx: optional :ref:`context <ctx>` governing decoding process
1914 :param bool tag_only: decode only the tag, without length and
1915 contents (used only in Choice and Set
1916 structures, trying to determine if tag satisfies
1918 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1920 :returns: (Obj, remaining data)
1922 .. seealso:: :ref:`decoding`
1924 result = next(self.decode_evgen(
1936 _, obj, tail = result
1947 _ctx_immutable=True,
1950 """Decode with evgen mode on
1952 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1953 it returns the generator producing ``(decode_path, obj, tail)``
1955 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1959 elif _ctx_immutable:
1961 tlv = memoryview(data)
1964 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1967 if self._expl is None:
1968 for result in self._decode(
1971 decode_path=decode_path,
1974 evgen_mode=_evgen_mode,
1979 _decode_path, obj, tail = result
1980 if _decode_path is not decode_path:
1984 t, tlen, lv = tag_strip(tlv)
1985 except DecodeError as err:
1986 raise err.__class__(
1988 klass=self.__class__,
1989 decode_path=decode_path,
1994 klass=self.__class__,
1995 decode_path=decode_path,
1999 l, llen, v = len_decode(lv)
2000 except LenIndefForm as err:
2001 if not ctx.get("bered", False):
2002 raise err.__class__(
2004 klass=self.__class__,
2005 decode_path=decode_path,
2009 offset += tlen + llen
2010 for result in self._decode(
2013 decode_path=decode_path,
2016 evgen_mode=_evgen_mode,
2018 if tag_only: # pragma: no cover
2021 _decode_path, obj, tail = result
2022 if _decode_path is not decode_path:
2024 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2025 if eoc_expected.tobytes() != EOC:
2028 klass=self.__class__,
2029 decode_path=decode_path,
2033 obj.expl_lenindef = True
2034 except DecodeError as err:
2035 raise err.__class__(
2037 klass=self.__class__,
2038 decode_path=decode_path,
2043 raise NotEnoughData(
2044 "encoded length is longer than data",
2045 klass=self.__class__,
2046 decode_path=decode_path,
2049 for result in self._decode(
2051 offset=offset + tlen + llen,
2052 decode_path=decode_path,
2055 evgen_mode=_evgen_mode,
2057 if tag_only: # pragma: no cover
2060 _decode_path, obj, tail = result
2061 if _decode_path is not decode_path:
2063 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2065 "explicit tag out-of-bound, longer than data",
2066 klass=self.__class__,
2067 decode_path=decode_path,
2070 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2072 def decod(self, data, offset=0, decode_path=(), ctx=None):
2073 """Decode the data, check that tail is empty
2075 :raises ExceedingData: if tail is not empty
2077 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2078 (decode without tail) that also checks that there is no
2081 obj, tail = self.decode(
2084 decode_path=decode_path,
2089 raise ExceedingData(len(tail))
2092 def hexdecode(self, data, *args, **kwargs):
2093 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2095 return self.decode(hexdec(data), *args, **kwargs)
2097 def hexdecod(self, data, *args, **kwargs):
2098 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2100 return self.decod(hexdec(data), *args, **kwargs)
2104 """.. seealso:: :ref:`decoding`
2106 return self._expl is not None
2110 """.. seealso:: :ref:`decoding`
2115 def expl_tlen(self):
2116 """.. seealso:: :ref:`decoding`
2118 return len(self._expl)
2121 def expl_llen(self):
2122 """.. seealso:: :ref:`decoding`
2124 if self.expl_lenindef:
2126 return len(len_encode(self.tlvlen))
2129 def expl_offset(self):
2130 """.. seealso:: :ref:`decoding`
2132 return self.offset - self.expl_tlen - self.expl_llen
2135 def expl_vlen(self):
2136 """.. seealso:: :ref:`decoding`
2141 def expl_tlvlen(self):
2142 """.. seealso:: :ref:`decoding`
2144 return self.expl_tlen + self.expl_llen + self.expl_vlen
2147 def fulloffset(self):
2148 """.. seealso:: :ref:`decoding`
2150 return self.expl_offset if self.expled else self.offset
2154 """.. seealso:: :ref:`decoding`
2156 return self.expl_tlvlen if self.expled else self.tlvlen
2158 def pps_lenindef(self, decode_path):
2159 if self.lenindef and not (
2160 getattr(self, "defined", None) is not None and
2161 self.defined[1].lenindef
2164 asn1_type_name="EOC",
2166 decode_path=decode_path,
2168 self.offset + self.tlvlen -
2169 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2177 if self.expl_lenindef:
2179 asn1_type_name="EOC",
2180 obj_name="EXPLICIT",
2181 decode_path=decode_path,
2182 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2191 def encode_cer(obj):
2192 """Encode to CER in memory buffer
2194 :returns bytes: memory buffer contents
2197 obj.encode_cer(buf.write)
2198 return buf.getvalue()
2201 def encode2pass(obj):
2202 """Encode (2-pass mode) to DER in memory buffer
2204 :returns bytes: memory buffer contents
2207 _, state = obj.encode1st()
2208 obj.encode2nd(buf.write, iter(state))
2209 return buf.getvalue()
2212 class DecodePathDefBy(object):
2213 """DEFINED BY representation inside decode path
2215 __slots__ = ("defined_by",)
2217 def __init__(self, defined_by):
2218 self.defined_by = defined_by
2220 def __ne__(self, their):
2221 return not(self == their)
2223 def __eq__(self, their):
2224 if not isinstance(their, self.__class__):
2226 return self.defined_by == their.defined_by
2229 return "DEFINED BY " + str(self.defined_by)
2232 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2235 ########################################################################
2237 ########################################################################
2239 PP = namedtuple("PP", (
2262 ), **NAMEDTUPLE_KWARGS)
2267 asn1_type_name="unknown",
2284 expl_lenindef=False,
2315 def _colourize(what, colour, with_colours, attrs=("bold",)):
2316 return colored(what, colour, attrs=attrs) if with_colours else what
2319 def colonize_hex(hexed):
2320 """Separate hexadecimal string with colons
2322 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2325 def find_oid_name(asn1_type_name, oid_maps, value):
2326 if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2327 for oid_map in oid_maps:
2328 oid_name = oid_map.get(value)
2329 if oid_name is not None:
2340 with_decode_path=False,
2341 decode_path_len_decrease=0,
2348 " " if pp.expl_offset is None else
2349 ("-%d" % (pp.offset - pp.expl_offset))
2351 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2353 col = _colourize(col, "red", with_colours, ())
2354 col += _colourize("B", "red", with_colours) if pp.bered else " "
2356 col = "[%d,%d,%4d]%s" % (
2357 pp.tlen, pp.llen, pp.vlen,
2358 LENINDEF_PP_CHAR if pp.lenindef else " "
2360 col = _colourize(col, "green", with_colours, ())
2362 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2363 if decode_path_len > 0:
2364 cols.append(" ." * decode_path_len)
2365 ent = pp.decode_path[-1]
2366 if isinstance(ent, DecodePathDefBy):
2367 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2368 value = str(ent.defined_by)
2369 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2370 if oid_name is None:
2371 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2373 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2375 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2376 if pp.expl is not None:
2377 klass, _, num = pp.expl
2378 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2379 cols.append(_colourize(col, "blue", with_colours))
2380 if pp.impl is not None:
2381 klass, _, num = pp.impl
2382 col = "[%s%d]" % (TagClassReprs[klass], num)
2383 cols.append(_colourize(col, "blue", with_colours))
2384 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2385 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2387 cols.append(_colourize("BER", "red", with_colours))
2388 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2389 if pp.value is not None:
2391 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2392 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2393 if oid_name is not None:
2394 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2395 if pp.asn1_type_name == Integer.asn1_type_name:
2396 cols.append(_colourize(
2397 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2400 if pp.blob.__class__ == binary_type:
2401 cols.append(hexenc(pp.blob))
2402 elif pp.blob.__class__ == tuple:
2403 cols.append(", ".join(pp.blob))
2405 cols.append(_colourize("OPTIONAL", "red", with_colours))
2407 cols.append(_colourize("DEFAULT", "red", with_colours))
2408 if with_decode_path:
2409 cols.append(_colourize(
2410 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2414 return " ".join(cols)
2417 def pp_console_blob(pp, decode_path_len_decrease=0):
2418 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2419 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2420 if decode_path_len > 0:
2421 cols.append(" ." * (decode_path_len + 1))
2422 if pp.blob.__class__ == binary_type:
2423 blob = hexenc(pp.blob).upper()
2424 for i in six_xrange(0, len(blob), 32):
2425 chunk = blob[i:i + 32]
2426 yield " ".join(cols + [colonize_hex(chunk)])
2427 elif pp.blob.__class__ == tuple:
2428 yield " ".join(cols + [", ".join(pp.blob)])
2436 with_decode_path=False,
2437 decode_path_only=(),
2440 """Pretty print object
2442 :param Obj obj: object you want to pretty print
2443 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2444 Its human readable form is printed when OID is met
2445 :param big_blobs: if large binary objects are met (like OctetString
2446 values), do we need to print them too, on separate
2448 :param with_colours: colourize output, if ``termcolor`` library
2450 :param with_decode_path: print decode path
2451 :param decode_path_only: print only that specified decode path
2453 def _pprint_pps(pps):
2455 if hasattr(pp, "_fields"):
2457 decode_path_only != () and
2459 str(p) for p in pp.decode_path[:len(decode_path_only)]
2460 ) != decode_path_only
2464 yield pp_console_row(
2469 with_colours=with_colours,
2470 with_decode_path=with_decode_path,
2471 decode_path_len_decrease=len(decode_path_only),
2473 for row in pp_console_blob(
2475 decode_path_len_decrease=len(decode_path_only),
2479 yield pp_console_row(
2484 with_colours=with_colours,
2485 with_decode_path=with_decode_path,
2486 decode_path_len_decrease=len(decode_path_only),
2489 for row in _pprint_pps(pp):
2491 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2494 ########################################################################
2495 # ASN.1 primitive types
2496 ########################################################################
2498 BooleanState = namedtuple(
2500 BasicState._fields + ("value",),
2506 """``BOOLEAN`` boolean type
2508 >>> b = Boolean(True)
2510 >>> b == Boolean(True)
2516 tag_default = tag_encode(1)
2517 asn1_type_name = "BOOLEAN"
2529 :param value: set the value. Either boolean type, or
2530 :py:class:`pyderasn.Boolean` object
2531 :param bytes impl: override default tag with ``IMPLICIT`` one
2532 :param bytes expl: override default tag with ``EXPLICIT`` one
2533 :param default: set default value. Type same as in ``value``
2534 :param bool optional: is object ``OPTIONAL`` in sequence
2536 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2537 self._value = None if value is None else self._value_sanitize(value)
2538 if default is not None:
2539 default = self._value_sanitize(default)
2540 self.default = self.__class__(
2546 self._value = default
2548 def _value_sanitize(self, value):
2549 if value.__class__ == bool:
2551 if issubclass(value.__class__, Boolean):
2553 raise InvalidValueType((self.__class__, bool))
2557 return self._value is not None
2559 def __getstate__(self):
2560 return BooleanState(
2576 def __setstate__(self, state):
2577 super(Boolean, self).__setstate__(state)
2578 self._value = state.value
2580 def __nonzero__(self):
2581 self._assert_ready()
2585 self._assert_ready()
2588 def __eq__(self, their):
2589 if their.__class__ == bool:
2590 return self._value == their
2591 if not issubclass(their.__class__, Boolean):
2594 self._value == their._value and
2595 self.tag == their.tag and
2596 self._expl == their._expl
2607 return self.__class__(
2609 impl=self.tag if impl is None else impl,
2610 expl=self._expl if expl is None else expl,
2611 default=self.default if default is None else default,
2612 optional=self.optional if optional is None else optional,
2616 self._assert_ready()
2617 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2619 def _encode1st(self, state):
2620 return len(self.tag) + 2, state
2622 def _encode2nd(self, writer, state_iter):
2623 self._assert_ready()
2624 write_full(writer, self._encode())
2626 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2628 t, _, lv = tag_strip(tlv)
2629 except DecodeError as err:
2630 raise err.__class__(
2632 klass=self.__class__,
2633 decode_path=decode_path,
2638 klass=self.__class__,
2639 decode_path=decode_path,
2646 l, _, v = len_decode(lv)
2647 except DecodeError as err:
2648 raise err.__class__(
2650 klass=self.__class__,
2651 decode_path=decode_path,
2655 raise InvalidLength(
2656 "Boolean's length must be equal to 1",
2657 klass=self.__class__,
2658 decode_path=decode_path,
2662 raise NotEnoughData(
2663 "encoded length is longer than data",
2664 klass=self.__class__,
2665 decode_path=decode_path,
2668 first_octet = byte2int(v)
2670 if first_octet == 0:
2672 elif first_octet == 0xFF:
2674 elif ctx.get("bered", False):
2679 "unacceptable Boolean value",
2680 klass=self.__class__,
2681 decode_path=decode_path,
2684 obj = self.__class__(
2688 default=self.default,
2689 optional=self.optional,
2690 _decoded=(offset, 1, 1),
2692 obj.ber_encoded = ber_encoded
2693 yield decode_path, obj, v[1:]
2696 return pp_console_row(next(self.pps()))
2698 def pps(self, decode_path=()):
2701 asn1_type_name=self.asn1_type_name,
2702 obj_name=self.__class__.__name__,
2703 decode_path=decode_path,
2704 value=str(self._value) if self.ready else None,
2705 optional=self.optional,
2706 default=self == self.default,
2707 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2708 expl=None if self._expl is None else tag_decode(self._expl),
2713 expl_offset=self.expl_offset if self.expled else None,
2714 expl_tlen=self.expl_tlen if self.expled else None,
2715 expl_llen=self.expl_llen if self.expled else None,
2716 expl_vlen=self.expl_vlen if self.expled else None,
2717 expl_lenindef=self.expl_lenindef,
2718 ber_encoded=self.ber_encoded,
2721 for pp in self.pps_lenindef(decode_path):
2725 IntegerState = namedtuple(
2727 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2733 """``INTEGER`` integer type
2735 >>> b = Integer(-123)
2737 >>> b == Integer(-123)
2742 >>> Integer(2, bounds=(1, 3))
2744 >>> Integer(5, bounds=(1, 3))
2745 Traceback (most recent call last):
2746 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2750 class Version(Integer):
2757 >>> v = Version("v1")
2764 {'v3': 2, 'v1': 0, 'v2': 1}
2766 __slots__ = ("specs", "_bound_min", "_bound_max")
2767 tag_default = tag_encode(2)
2768 asn1_type_name = "INTEGER"
2782 :param value: set the value. Either integer type, named value
2783 (if ``schema`` is specified in the class), or
2784 :py:class:`pyderasn.Integer` object
2785 :param bounds: set ``(MIN, MAX)`` value constraint.
2786 (-inf, +inf) by default
2787 :param bytes impl: override default tag with ``IMPLICIT`` one
2788 :param bytes expl: override default tag with ``EXPLICIT`` one
2789 :param default: set default value. Type same as in ``value``
2790 :param bool optional: is object ``OPTIONAL`` in sequence
2792 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2794 specs = getattr(self, "schema", {}) if _specs is None else _specs
2795 self.specs = specs if specs.__class__ == dict else dict(specs)
2796 self._bound_min, self._bound_max = getattr(
2799 (float("-inf"), float("+inf")),
2800 ) if bounds is None else bounds
2801 if value is not None:
2802 self._value = self._value_sanitize(value)
2803 if default is not None:
2804 default = self._value_sanitize(default)
2805 self.default = self.__class__(
2811 if self._value is None:
2812 self._value = default
2814 def _value_sanitize(self, value):
2815 if isinstance(value, integer_types):
2817 elif issubclass(value.__class__, Integer):
2818 value = value._value
2819 elif value.__class__ == str:
2820 value = self.specs.get(value)
2822 raise ObjUnknown("integer value: %s" % value)
2824 raise InvalidValueType((self.__class__, int, str))
2825 if not self._bound_min <= value <= self._bound_max:
2826 raise BoundsError(self._bound_min, value, self._bound_max)
2831 return self._value is not None
2833 def __getstate__(self):
2834 return IntegerState(
2853 def __setstate__(self, state):
2854 super(Integer, self).__setstate__(state)
2855 self.specs = state.specs
2856 self._value = state.value
2857 self._bound_min = state.bound_min
2858 self._bound_max = state.bound_max
2861 self._assert_ready()
2862 return int(self._value)
2865 """Hexadecimal representation
2867 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2869 hex_repr = hex(int(self))[2:].upper()
2870 if len(hex_repr) % 2 != 0:
2871 hex_repr = "0" + hex_repr
2875 self._assert_ready()
2876 return hash(b"".join((
2878 bytes(self._expl or b""),
2879 str(self._value).encode("ascii"),
2882 def __eq__(self, their):
2883 if isinstance(their, integer_types):
2884 return self._value == their
2885 if not issubclass(their.__class__, Integer):
2888 self._value == their._value and
2889 self.tag == their.tag and
2890 self._expl == their._expl
2893 def __lt__(self, their):
2894 return self._value < their._value
2898 """Return named representation (if exists) of the value
2900 for name, value in iteritems(self.specs):
2901 if value == self._value:
2914 return self.__class__(
2917 (self._bound_min, self._bound_max)
2918 if bounds is None else bounds
2920 impl=self.tag if impl is None else impl,
2921 expl=self._expl if expl is None else expl,
2922 default=self.default if default is None else default,
2923 optional=self.optional if optional is None else optional,
2927 def _encode_payload(self):
2928 self._assert_ready()
2932 octets = bytearray([0])
2936 octets = bytearray()
2938 octets.append((value & 0xFF) ^ 0xFF)
2940 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2943 octets = bytearray()
2945 octets.append(value & 0xFF)
2947 if octets[-1] & 0x80 > 0:
2950 octets = bytes(octets)
2952 bytes_len = ceil(value.bit_length() / 8) or 1
2955 octets = value.to_bytes(
2960 except OverflowError:
2967 octets = self._encode_payload()
2968 return b"".join((self.tag, len_encode(len(octets)), octets))
2970 def _encode1st(self, state):
2971 l = len(self._encode_payload())
2972 return len(self.tag) + len_size(l) + l, state
2974 def _encode2nd(self, writer, state_iter):
2975 write_full(writer, self._encode())
2977 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2979 t, _, lv = tag_strip(tlv)
2980 except DecodeError as err:
2981 raise err.__class__(
2983 klass=self.__class__,
2984 decode_path=decode_path,
2989 klass=self.__class__,
2990 decode_path=decode_path,
2997 l, llen, v = len_decode(lv)
2998 except DecodeError as err:
2999 raise err.__class__(
3001 klass=self.__class__,
3002 decode_path=decode_path,
3006 raise NotEnoughData(
3007 "encoded length is longer than data",
3008 klass=self.__class__,
3009 decode_path=decode_path,
3013 raise NotEnoughData(
3015 klass=self.__class__,
3016 decode_path=decode_path,
3019 v, tail = v[:l], v[l:]
3020 first_octet = byte2int(v)
3022 second_octet = byte2int(v[1:])
3024 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3025 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3028 "non normalized integer",
3029 klass=self.__class__,
3030 decode_path=decode_path,
3035 if first_octet & 0x80 > 0:
3036 octets = bytearray()
3037 for octet in bytearray(v):
3038 octets.append(octet ^ 0xFF)
3039 for octet in octets:
3040 value = (value << 8) | octet
3044 for octet in bytearray(v):
3045 value = (value << 8) | octet
3047 value = int.from_bytes(v, byteorder="big", signed=True)
3049 obj = self.__class__(
3051 bounds=(self._bound_min, self._bound_max),
3054 default=self.default,
3055 optional=self.optional,
3057 _decoded=(offset, llen, l),
3059 except BoundsError as err:
3062 klass=self.__class__,
3063 decode_path=decode_path,
3066 yield decode_path, obj, tail
3069 return pp_console_row(next(self.pps()))
3071 def pps(self, decode_path=()):
3074 asn1_type_name=self.asn1_type_name,
3075 obj_name=self.__class__.__name__,
3076 decode_path=decode_path,
3077 value=(self.named or str(self._value)) if self.ready else None,
3078 optional=self.optional,
3079 default=self == self.default,
3080 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3081 expl=None if self._expl is None else tag_decode(self._expl),
3086 expl_offset=self.expl_offset if self.expled else None,
3087 expl_tlen=self.expl_tlen if self.expled else None,
3088 expl_llen=self.expl_llen if self.expled else None,
3089 expl_vlen=self.expl_vlen if self.expled else None,
3090 expl_lenindef=self.expl_lenindef,
3093 for pp in self.pps_lenindef(decode_path):
3097 BitStringState = namedtuple(
3099 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3104 class BitString(Obj):
3105 """``BIT STRING`` bit string type
3107 >>> BitString(b"hello world")
3108 BIT STRING 88 bits 68656c6c6f20776f726c64
3111 >>> b == b"hello world"
3116 >>> BitString("'0A3B5F291CD'H")
3117 BIT STRING 44 bits 0a3b5f291cd0
3118 >>> b = BitString("'010110000000'B")
3119 BIT STRING 12 bits 5800
3122 >>> b[0], b[1], b[2], b[3]
3123 (False, True, False, True)
3127 [False, True, False, True, True, False, False, False, False, False, False, False]
3131 class KeyUsage(BitString):
3133 ("digitalSignature", 0),
3134 ("nonRepudiation", 1),
3135 ("keyEncipherment", 2),
3138 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3139 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3141 ['nonRepudiation', 'keyEncipherment']
3143 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3147 Pay attention that BIT STRING can be encoded both in primitive
3148 and constructed forms. Decoder always checks constructed form tag
3149 additionally to specified primitive one. If BER decoding is
3150 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3151 of DER restrictions.
3153 __slots__ = ("tag_constructed", "specs", "defined")
3154 tag_default = tag_encode(3)
3155 asn1_type_name = "BIT STRING"
3168 :param value: set the value. Either binary type, tuple of named
3169 values (if ``schema`` is specified in the class),
3170 string in ``'XXX...'B`` form, or
3171 :py:class:`pyderasn.BitString` object
3172 :param bytes impl: override default tag with ``IMPLICIT`` one
3173 :param bytes expl: override default tag with ``EXPLICIT`` one
3174 :param default: set default value. Type same as in ``value``
3175 :param bool optional: is object ``OPTIONAL`` in sequence
3177 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3178 specs = getattr(self, "schema", {}) if _specs is None else _specs
3179 self.specs = specs if specs.__class__ == dict else dict(specs)
3180 self._value = None if value is None else self._value_sanitize(value)
3181 if default is not None:
3182 default = self._value_sanitize(default)
3183 self.default = self.__class__(
3189 self._value = default
3191 tag_klass, _, tag_num = tag_decode(self.tag)
3192 self.tag_constructed = tag_encode(
3194 form=TagFormConstructed,
3198 def _bits2octets(self, bits):
3199 if len(self.specs) > 0:
3200 bits = bits.rstrip("0")
3202 bits += "0" * ((8 - (bit_len % 8)) % 8)
3203 octets = bytearray(len(bits) // 8)
3204 for i in six_xrange(len(octets)):
3205 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3206 return bit_len, bytes(octets)
3208 def _value_sanitize(self, value):
3209 if isinstance(value, (string_types, binary_type)):
3211 isinstance(value, string_types) and
3212 value.startswith("'")
3214 if value.endswith("'B"):
3216 if not frozenset(value) <= SET01:
3217 raise ValueError("B's coding contains unacceptable chars")
3218 return self._bits2octets(value)
3219 if value.endswith("'H"):
3223 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3225 if value.__class__ == binary_type:
3226 return (len(value) * 8, value)
3227 raise InvalidValueType((self.__class__, string_types, binary_type))
3228 if value.__class__ == tuple:
3231 isinstance(value[0], integer_types) and
3232 value[1].__class__ == binary_type
3237 bit = self.specs.get(name)
3239 raise ObjUnknown("BitString value: %s" % name)
3242 return self._bits2octets("")
3243 bits = frozenset(bits)
3244 return self._bits2octets("".join(
3245 ("1" if bit in bits else "0")
3246 for bit in six_xrange(max(bits) + 1)
3248 if issubclass(value.__class__, BitString):
3250 raise InvalidValueType((self.__class__, binary_type, string_types))
3254 return self._value is not None
3256 def __getstate__(self):
3257 return BitStringState(
3272 self.tag_constructed,
3276 def __setstate__(self, state):
3277 super(BitString, self).__setstate__(state)
3278 self.specs = state.specs
3279 self._value = state.value
3280 self.tag_constructed = state.tag_constructed
3281 self.defined = state.defined
3284 self._assert_ready()
3285 for i in six_xrange(self._value[0]):
3290 """Returns number of bits in the string
3292 self._assert_ready()
3293 return self._value[0]
3295 def __bytes__(self):
3296 self._assert_ready()
3297 return self._value[1]
3299 def __eq__(self, their):
3300 if their.__class__ == bytes:
3301 return self._value[1] == their
3302 if not issubclass(their.__class__, BitString):
3305 self._value == their._value and
3306 self.tag == their.tag and
3307 self._expl == their._expl
3312 """Named representation (if exists) of the bits
3314 :returns: [str(name), ...]
3316 return [name for name, bit in iteritems(self.specs) if self[bit]]
3326 return self.__class__(
3328 impl=self.tag if impl is None else impl,
3329 expl=self._expl if expl is None else expl,
3330 default=self.default if default is None else default,
3331 optional=self.optional if optional is None else optional,
3335 def __getitem__(self, key):
3336 if key.__class__ == int:
3337 bit_len, octets = self._value
3341 byte2int(memoryview(octets)[key // 8:]) >>
3344 if isinstance(key, string_types):
3345 value = self.specs.get(key)
3347 raise ObjUnknown("BitString value: %s" % key)
3349 raise InvalidValueType((int, str))
3352 self._assert_ready()
3353 bit_len, octets = self._value
3356 len_encode(len(octets) + 1),
3357 int2byte((8 - bit_len % 8) % 8),
3361 def _encode1st(self, state):
3362 self._assert_ready()
3363 _, octets = self._value
3365 return len(self.tag) + len_size(l) + l, state
3367 def _encode2nd(self, writer, state_iter):
3368 bit_len, octets = self._value
3369 write_full(writer, b"".join((
3371 len_encode(len(octets) + 1),
3372 int2byte((8 - bit_len % 8) % 8),
3374 write_full(writer, octets)
3376 def _encode_cer(self, writer):
3377 bit_len, octets = self._value
3378 if len(octets) + 1 <= 1000:
3379 write_full(writer, self._encode())
3381 write_full(writer, self.tag_constructed)
3382 write_full(writer, LENINDEF)
3383 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3384 write_full(writer, b"".join((
3385 BitString.tag_default,
3388 octets[offset:offset + 999],
3390 tail = octets[offset + 999:]
3392 tail = int2byte((8 - bit_len % 8) % 8) + tail
3393 write_full(writer, b"".join((
3394 BitString.tag_default,
3395 len_encode(len(tail)),
3398 write_full(writer, EOC)
3400 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3402 t, tlen, lv = tag_strip(tlv)
3403 except DecodeError as err:
3404 raise err.__class__(
3406 klass=self.__class__,
3407 decode_path=decode_path,
3411 if tag_only: # pragma: no cover
3415 l, llen, v = len_decode(lv)
3416 except DecodeError as err:
3417 raise err.__class__(
3419 klass=self.__class__,
3420 decode_path=decode_path,
3424 raise NotEnoughData(
3425 "encoded length is longer than data",
3426 klass=self.__class__,
3427 decode_path=decode_path,
3431 raise NotEnoughData(
3433 klass=self.__class__,
3434 decode_path=decode_path,
3437 pad_size = byte2int(v)
3438 if l == 1 and pad_size != 0:
3440 "invalid empty value",
3441 klass=self.__class__,
3442 decode_path=decode_path,
3448 klass=self.__class__,
3449 decode_path=decode_path,
3452 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3455 klass=self.__class__,
3456 decode_path=decode_path,
3459 v, tail = v[:l], v[l:]
3460 bit_len = (len(v) - 1) * 8 - pad_size
3461 obj = self.__class__(
3462 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3465 default=self.default,
3466 optional=self.optional,
3468 _decoded=(offset, llen, l),
3471 obj._value = (bit_len, None)
3472 yield decode_path, obj, tail
3474 if t != self.tag_constructed:
3476 klass=self.__class__,
3477 decode_path=decode_path,
3480 if not ctx.get("bered", False):
3482 "unallowed BER constructed encoding",
3483 klass=self.__class__,
3484 decode_path=decode_path,
3487 if tag_only: # pragma: no cover
3492 l, llen, v = len_decode(lv)
3493 except LenIndefForm:
3494 llen, l, v = 1, 0, lv[1:]
3496 except DecodeError as err:
3497 raise err.__class__(
3499 klass=self.__class__,
3500 decode_path=decode_path,
3504 raise NotEnoughData(
3505 "encoded length is longer than data",
3506 klass=self.__class__,
3507 decode_path=decode_path,
3510 if not lenindef and l == 0:
3511 raise NotEnoughData(
3513 klass=self.__class__,
3514 decode_path=decode_path,
3518 sub_offset = offset + tlen + llen
3522 if v[:EOC_LEN].tobytes() == EOC:
3529 "chunk out of bounds",
3530 klass=self.__class__,
3531 decode_path=decode_path + (str(len(chunks) - 1),),
3532 offset=chunks[-1].offset,
3534 sub_decode_path = decode_path + (str(len(chunks)),)
3537 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3540 decode_path=sub_decode_path,
3543 _ctx_immutable=False,
3545 yield _decode_path, chunk, v_tail
3547 _, chunk, v_tail = next(BitString().decode_evgen(
3550 decode_path=sub_decode_path,
3553 _ctx_immutable=False,
3558 "expected BitString encoded chunk",
3559 klass=self.__class__,
3560 decode_path=sub_decode_path,
3563 chunks.append(chunk)
3564 sub_offset += chunk.tlvlen
3565 vlen += chunk.tlvlen
3567 if len(chunks) == 0:
3570 klass=self.__class__,
3571 decode_path=decode_path,
3576 for chunk_i, chunk in enumerate(chunks[:-1]):
3577 if chunk.bit_len % 8 != 0:
3579 "BitString chunk is not multiple of 8 bits",
3580 klass=self.__class__,
3581 decode_path=decode_path + (str(chunk_i),),
3582 offset=chunk.offset,
3585 values.append(bytes(chunk))
3586 bit_len += chunk.bit_len
3587 chunk_last = chunks[-1]
3589 values.append(bytes(chunk_last))
3590 bit_len += chunk_last.bit_len
3591 obj = self.__class__(
3592 value=None if evgen_mode else (bit_len, b"".join(values)),
3595 default=self.default,
3596 optional=self.optional,
3598 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3601 obj._value = (bit_len, None)
3602 obj.lenindef = lenindef
3603 obj.ber_encoded = True
3604 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3607 return pp_console_row(next(self.pps()))
3609 def pps(self, decode_path=()):
3613 bit_len, blob = self._value
3614 value = "%d bits" % bit_len
3615 if len(self.specs) > 0 and blob is not None:
3616 blob = tuple(self.named)
3619 asn1_type_name=self.asn1_type_name,
3620 obj_name=self.__class__.__name__,
3621 decode_path=decode_path,
3624 optional=self.optional,
3625 default=self == self.default,
3626 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3627 expl=None if self._expl is None else tag_decode(self._expl),
3632 expl_offset=self.expl_offset if self.expled else None,
3633 expl_tlen=self.expl_tlen if self.expled else None,
3634 expl_llen=self.expl_llen if self.expled else None,
3635 expl_vlen=self.expl_vlen if self.expled else None,
3636 expl_lenindef=self.expl_lenindef,
3637 lenindef=self.lenindef,
3638 ber_encoded=self.ber_encoded,
3641 defined_by, defined = self.defined or (None, None)
3642 if defined_by is not None:
3644 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3646 for pp in self.pps_lenindef(decode_path):
3650 OctetStringState = namedtuple(
3652 BasicState._fields + (
3663 class OctetString(Obj):
3664 """``OCTET STRING`` binary string type
3666 >>> s = OctetString(b"hello world")
3667 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3668 >>> s == OctetString(b"hello world")
3673 >>> OctetString(b"hello", bounds=(4, 4))
3674 Traceback (most recent call last):
3675 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3676 >>> OctetString(b"hell", bounds=(4, 4))
3677 OCTET STRING 4 bytes 68656c6c
3679 Memoryviews can be used as a values. If memoryview is made on
3680 mmap-ed file, then it does not take storage inside OctetString
3681 itself. In CER encoding mode it will be streamed to the specified
3682 writer, copying 1 KB chunks.
3684 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3685 tag_default = tag_encode(4)
3686 asn1_type_name = "OCTET STRING"
3687 evgen_mode_skip_value = True
3701 :param value: set the value. Either binary type, or
3702 :py:class:`pyderasn.OctetString` object
3703 :param bounds: set ``(MIN, MAX)`` value size constraint.
3704 (-inf, +inf) by default
3705 :param bytes impl: override default tag with ``IMPLICIT`` one
3706 :param bytes expl: override default tag with ``EXPLICIT`` one
3707 :param default: set default value. Type same as in ``value``
3708 :param bool optional: is object ``OPTIONAL`` in sequence
3710 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3712 self._bound_min, self._bound_max = getattr(
3716 ) if bounds is None else bounds
3717 if value is not None:
3718 self._value = self._value_sanitize(value)
3719 if default is not None:
3720 default = self._value_sanitize(default)
3721 self.default = self.__class__(
3726 if self._value is None:
3727 self._value = default
3729 tag_klass, _, tag_num = tag_decode(self.tag)
3730 self.tag_constructed = tag_encode(
3732 form=TagFormConstructed,
3736 def _value_sanitize(self, value):
3737 if value.__class__ == binary_type or value.__class__ == memoryview:
3739 elif issubclass(value.__class__, OctetString):
3740 value = value._value
3742 raise InvalidValueType((self.__class__, bytes, memoryview))
3743 if not self._bound_min <= len(value) <= self._bound_max:
3744 raise BoundsError(self._bound_min, len(value), self._bound_max)
3749 return self._value is not None
3751 def __getstate__(self):
3752 return OctetStringState(
3768 self.tag_constructed,
3772 def __setstate__(self, state):
3773 super(OctetString, self).__setstate__(state)
3774 self._value = state.value
3775 self._bound_min = state.bound_min
3776 self._bound_max = state.bound_max
3777 self.tag_constructed = state.tag_constructed
3778 self.defined = state.defined
3780 def __bytes__(self):
3781 self._assert_ready()
3782 return bytes(self._value)
3784 def __eq__(self, their):
3785 if their.__class__ == binary_type:
3786 return self._value == their
3787 if not issubclass(their.__class__, OctetString):
3790 self._value == their._value and
3791 self.tag == their.tag and
3792 self._expl == their._expl
3795 def __lt__(self, their):
3796 return self._value < their._value
3807 return self.__class__(
3810 (self._bound_min, self._bound_max)
3811 if bounds is None else bounds
3813 impl=self.tag if impl is None else impl,
3814 expl=self._expl if expl is None else expl,
3815 default=self.default if default is None else default,
3816 optional=self.optional if optional is None else optional,
3820 self._assert_ready()
3823 len_encode(len(self._value)),
3827 def _encode1st(self, state):
3828 self._assert_ready()
3829 l = len(self._value)
3830 return len(self.tag) + len_size(l) + l, state
3832 def _encode2nd(self, writer, state_iter):
3834 write_full(writer, self.tag + len_encode(len(value)))
3835 write_full(writer, value)
3837 def _encode_cer(self, writer):
3838 octets = self._value
3839 if len(octets) <= 1000:
3840 write_full(writer, self._encode())
3842 write_full(writer, self.tag_constructed)
3843 write_full(writer, LENINDEF)
3844 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3845 write_full(writer, b"".join((
3846 OctetString.tag_default,
3848 octets[offset:offset + 1000],
3850 tail = octets[offset + 1000:]
3852 write_full(writer, b"".join((
3853 OctetString.tag_default,
3854 len_encode(len(tail)),
3857 write_full(writer, EOC)
3859 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3861 t, tlen, lv = tag_strip(tlv)
3862 except DecodeError as err:
3863 raise err.__class__(
3865 klass=self.__class__,
3866 decode_path=decode_path,
3874 l, llen, v = len_decode(lv)
3875 except DecodeError as err:
3876 raise err.__class__(
3878 klass=self.__class__,
3879 decode_path=decode_path,
3883 raise NotEnoughData(
3884 "encoded length is longer than data",
3885 klass=self.__class__,
3886 decode_path=decode_path,
3889 v, tail = v[:l], v[l:]
3890 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3892 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3893 klass=self.__class__,
3894 decode_path=decode_path,
3898 obj = self.__class__(
3900 None if (evgen_mode and self.evgen_mode_skip_value)
3903 bounds=(self._bound_min, self._bound_max),
3906 default=self.default,
3907 optional=self.optional,
3908 _decoded=(offset, llen, l),
3911 except DecodeError as err:
3914 klass=self.__class__,
3915 decode_path=decode_path,
3918 except BoundsError as err:
3921 klass=self.__class__,
3922 decode_path=decode_path,
3925 yield decode_path, obj, tail
3927 if t != self.tag_constructed:
3929 klass=self.__class__,
3930 decode_path=decode_path,
3933 if not ctx.get("bered", False):
3935 "unallowed BER constructed encoding",
3936 klass=self.__class__,
3937 decode_path=decode_path,
3945 l, llen, v = len_decode(lv)
3946 except LenIndefForm:
3947 llen, l, v = 1, 0, lv[1:]
3949 except DecodeError as err:
3950 raise err.__class__(
3952 klass=self.__class__,
3953 decode_path=decode_path,
3957 raise NotEnoughData(
3958 "encoded length is longer than data",
3959 klass=self.__class__,
3960 decode_path=decode_path,
3965 sub_offset = offset + tlen + llen
3970 if v[:EOC_LEN].tobytes() == EOC:
3977 "chunk out of bounds",
3978 klass=self.__class__,
3979 decode_path=decode_path + (str(len(chunks) - 1),),
3980 offset=chunks[-1].offset,
3984 sub_decode_path = decode_path + (str(chunks_count),)
3985 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3988 decode_path=sub_decode_path,
3991 _ctx_immutable=False,
3993 yield _decode_path, chunk, v_tail
3994 if not chunk.ber_encoded:
3995 payload_len += chunk.vlen
3998 sub_decode_path = decode_path + (str(len(chunks)),)
3999 _, chunk, v_tail = next(OctetString().decode_evgen(
4002 decode_path=sub_decode_path,
4005 _ctx_immutable=False,
4008 chunks.append(chunk)
4011 "expected OctetString encoded chunk",
4012 klass=self.__class__,
4013 decode_path=sub_decode_path,
4016 sub_offset += chunk.tlvlen
4017 vlen += chunk.tlvlen
4019 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4021 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4022 klass=self.__class__,
4023 decode_path=decode_path,
4027 obj = self.__class__(
4029 None if evgen_mode else
4030 b"".join(bytes(chunk) for chunk in chunks)
4032 bounds=(self._bound_min, self._bound_max),
4035 default=self.default,
4036 optional=self.optional,
4037 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4040 except DecodeError as err:
4043 klass=self.__class__,
4044 decode_path=decode_path,
4047 except BoundsError as err:
4050 klass=self.__class__,
4051 decode_path=decode_path,
4054 obj.lenindef = lenindef
4055 obj.ber_encoded = True
4056 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4059 return pp_console_row(next(self.pps()))
4061 def pps(self, decode_path=()):
4064 asn1_type_name=self.asn1_type_name,
4065 obj_name=self.__class__.__name__,
4066 decode_path=decode_path,
4067 value=("%d bytes" % len(self._value)) if self.ready else None,
4068 blob=self._value if self.ready else None,
4069 optional=self.optional,
4070 default=self == self.default,
4071 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4072 expl=None if self._expl is None else tag_decode(self._expl),
4077 expl_offset=self.expl_offset if self.expled else None,
4078 expl_tlen=self.expl_tlen if self.expled else None,
4079 expl_llen=self.expl_llen if self.expled else None,
4080 expl_vlen=self.expl_vlen if self.expled else None,
4081 expl_lenindef=self.expl_lenindef,
4082 lenindef=self.lenindef,
4083 ber_encoded=self.ber_encoded,
4086 defined_by, defined = self.defined or (None, None)
4087 if defined_by is not None:
4089 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4091 for pp in self.pps_lenindef(decode_path):
4095 def agg_octet_string(evgens, decode_path, raw, writer):
4096 """Aggregate constructed string (OctetString and its derivatives)
4098 :param evgens: iterator of generated events
4099 :param decode_path: points to the string we want to decode
4100 :param raw: slicebable (memoryview, bytearray, etc) with
4101 the data evgens are generated on
4102 :param writer: buffer.write where string is going to be saved
4103 :param writer: where string is going to be saved. Must comply
4104 with ``io.RawIOBase.write`` behaviour
4106 .. seealso:: :ref:`agg_octet_string`
4108 decode_path_len = len(decode_path)
4109 for dp, obj, _ in evgens:
4110 if dp[:decode_path_len] != decode_path:
4112 if not obj.ber_encoded:
4113 write_full(writer, raw[
4114 obj.offset + obj.tlen + obj.llen:
4115 obj.offset + obj.tlen + obj.llen + obj.vlen -
4116 (EOC_LEN if obj.expl_lenindef else 0)
4118 if len(dp) == decode_path_len:
4122 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4126 """``NULL`` null object
4134 tag_default = tag_encode(5)
4135 asn1_type_name = "NULL"
4139 value=None, # unused, but Sequence passes it
4146 :param bytes impl: override default tag with ``IMPLICIT`` one
4147 :param bytes expl: override default tag with ``EXPLICIT`` one
4148 :param bool optional: is object ``OPTIONAL`` in sequence
4150 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4157 def __getstate__(self):
4173 def __eq__(self, their):
4174 if not issubclass(their.__class__, Null):
4177 self.tag == their.tag and
4178 self._expl == their._expl
4188 return self.__class__(
4189 impl=self.tag if impl is None else impl,
4190 expl=self._expl if expl is None else expl,
4191 optional=self.optional if optional is None else optional,
4195 return self.tag + LEN0
4197 def _encode1st(self, state):
4198 return len(self.tag) + 1, state
4200 def _encode2nd(self, writer, state_iter):
4201 write_full(writer, self.tag + LEN0)
4203 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4205 t, _, lv = tag_strip(tlv)
4206 except DecodeError as err:
4207 raise err.__class__(
4209 klass=self.__class__,
4210 decode_path=decode_path,
4215 klass=self.__class__,
4216 decode_path=decode_path,
4219 if tag_only: # pragma: no cover
4223 l, _, v = len_decode(lv)
4224 except DecodeError as err:
4225 raise err.__class__(
4227 klass=self.__class__,
4228 decode_path=decode_path,
4232 raise InvalidLength(
4233 "Null must have zero length",
4234 klass=self.__class__,
4235 decode_path=decode_path,
4238 obj = self.__class__(
4241 optional=self.optional,
4242 _decoded=(offset, 1, 0),
4244 yield decode_path, obj, v
4247 return pp_console_row(next(self.pps()))
4249 def pps(self, decode_path=()):
4252 asn1_type_name=self.asn1_type_name,
4253 obj_name=self.__class__.__name__,
4254 decode_path=decode_path,
4255 optional=self.optional,
4256 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4257 expl=None if self._expl is None else tag_decode(self._expl),
4262 expl_offset=self.expl_offset if self.expled else None,
4263 expl_tlen=self.expl_tlen if self.expled else None,
4264 expl_llen=self.expl_llen if self.expled else None,
4265 expl_vlen=self.expl_vlen if self.expled else None,
4266 expl_lenindef=self.expl_lenindef,
4269 for pp in self.pps_lenindef(decode_path):
4273 ObjectIdentifierState = namedtuple(
4274 "ObjectIdentifierState",
4275 BasicState._fields + ("value", "defines"),
4280 class ObjectIdentifier(Obj):
4281 """``OBJECT IDENTIFIER`` OID type
4283 >>> oid = ObjectIdentifier((1, 2, 3))
4284 OBJECT IDENTIFIER 1.2.3
4285 >>> oid == ObjectIdentifier("1.2.3")
4291 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4292 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4294 >>> str(ObjectIdentifier((3, 1)))
4295 Traceback (most recent call last):
4296 pyderasn.InvalidOID: unacceptable first arc value
4298 __slots__ = ("defines",)
4299 tag_default = tag_encode(6)
4300 asn1_type_name = "OBJECT IDENTIFIER"
4313 :param value: set the value. Either tuples of integers,
4314 string of "."-concatenated integers, or
4315 :py:class:`pyderasn.ObjectIdentifier` object
4316 :param defines: sequence of tuples. Each tuple has two elements.
4317 First one is relative to current one decode
4318 path, aiming to the field defined by that OID.
4319 Read about relative path in
4320 :py:func:`pyderasn.abs_decode_path`. Second
4321 tuple element is ``{OID: pyderasn.Obj()}``
4322 dictionary, mapping between current OID value
4323 and structure applied to defined field.
4325 .. seealso:: :ref:`definedby`
4327 :param bytes impl: override default tag with ``IMPLICIT`` one
4328 :param bytes expl: override default tag with ``EXPLICIT`` one
4329 :param default: set default value. Type same as in ``value``
4330 :param bool optional: is object ``OPTIONAL`` in sequence
4332 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4334 if value is not None:
4335 self._value = self._value_sanitize(value)
4336 if default is not None:
4337 default = self._value_sanitize(default)
4338 self.default = self.__class__(
4343 if self._value is None:
4344 self._value = default
4345 self.defines = defines
4347 def __add__(self, their):
4348 if their.__class__ == tuple:
4349 return self.__class__(self._value + array("L", their))
4350 if isinstance(their, self.__class__):
4351 return self.__class__(self._value + their._value)
4352 raise InvalidValueType((self.__class__, tuple))
4354 def _value_sanitize(self, value):
4355 if issubclass(value.__class__, ObjectIdentifier):
4357 if isinstance(value, string_types):
4359 value = array("L", (pureint(arc) for arc in value.split(".")))
4361 raise InvalidOID("unacceptable arcs values")
4362 if value.__class__ == tuple:
4364 value = array("L", value)
4365 except OverflowError as err:
4366 raise InvalidOID(repr(err))
4367 if value.__class__ is array:
4369 raise InvalidOID("less than 2 arcs")
4370 first_arc = value[0]
4371 if first_arc in (0, 1):
4372 if not (0 <= value[1] <= 39):
4373 raise InvalidOID("second arc is too wide")
4374 elif first_arc == 2:
4377 raise InvalidOID("unacceptable first arc value")
4378 if not all(arc >= 0 for arc in value):
4379 raise InvalidOID("negative arc value")
4381 raise InvalidValueType((self.__class__, str, tuple))
4385 return self._value is not None
4387 def __getstate__(self):
4388 return ObjectIdentifierState(
4405 def __setstate__(self, state):
4406 super(ObjectIdentifier, self).__setstate__(state)
4407 self._value = state.value
4408 self.defines = state.defines
4411 self._assert_ready()
4412 return iter(self._value)
4415 return ".".join(str(arc) for arc in self._value or ())
4418 self._assert_ready()
4419 return hash(b"".join((
4421 bytes(self._expl or b""),
4422 str(self._value).encode("ascii"),
4425 def __eq__(self, their):
4426 if their.__class__ == tuple:
4427 return self._value == array("L", their)
4428 if not issubclass(their.__class__, ObjectIdentifier):
4431 self.tag == their.tag and
4432 self._expl == their._expl and
4433 self._value == their._value
4436 def __lt__(self, their):
4437 return self._value < their._value
4448 return self.__class__(
4450 defines=self.defines if defines is None else defines,
4451 impl=self.tag if impl is None else impl,
4452 expl=self._expl if expl is None else expl,
4453 default=self.default if default is None else default,
4454 optional=self.optional if optional is None else optional,
4457 def _encode_octets(self):
4458 self._assert_ready()
4460 first_value = value[1]
4461 first_arc = value[0]
4464 elif first_arc == 1:
4466 elif first_arc == 2:
4468 else: # pragma: no cover
4469 raise RuntimeError("invalid arc is stored")
4470 octets = [zero_ended_encode(first_value)]
4471 for arc in value[2:]:
4472 octets.append(zero_ended_encode(arc))
4473 return b"".join(octets)
4476 v = self._encode_octets()
4477 return b"".join((self.tag, len_encode(len(v)), v))
4479 def _encode1st(self, state):
4480 l = len(self._encode_octets())
4481 return len(self.tag) + len_size(l) + l, state
4483 def _encode2nd(self, writer, state_iter):
4484 write_full(writer, self._encode())
4486 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4488 t, _, lv = tag_strip(tlv)
4489 except DecodeError as err:
4490 raise err.__class__(
4492 klass=self.__class__,
4493 decode_path=decode_path,
4498 klass=self.__class__,
4499 decode_path=decode_path,
4502 if tag_only: # pragma: no cover
4506 l, llen, v = len_decode(lv)
4507 except DecodeError as err:
4508 raise err.__class__(
4510 klass=self.__class__,
4511 decode_path=decode_path,
4515 raise NotEnoughData(
4516 "encoded length is longer than data",
4517 klass=self.__class__,
4518 decode_path=decode_path,
4522 raise NotEnoughData(
4524 klass=self.__class__,
4525 decode_path=decode_path,
4528 v, tail = v[:l], v[l:]
4535 octet = indexbytes(v, i)
4536 if i == 0 and octet == 0x80:
4537 if ctx.get("bered", False):
4541 "non normalized arc encoding",
4542 klass=self.__class__,
4543 decode_path=decode_path,
4546 arc = (arc << 7) | (octet & 0x7F)
4547 if octet & 0x80 == 0:
4550 except OverflowError:
4552 "too huge value for local unsigned long",
4553 klass=self.__class__,
4554 decode_path=decode_path,
4563 klass=self.__class__,
4564 decode_path=decode_path,
4568 second_arc = arcs[0]
4569 if 0 <= second_arc <= 39:
4571 elif 40 <= second_arc <= 79:
4577 obj = self.__class__(
4578 value=array("L", (first_arc, second_arc)) + arcs[1:],
4581 default=self.default,
4582 optional=self.optional,
4583 _decoded=(offset, llen, l),
4586 obj.ber_encoded = True
4587 yield decode_path, obj, tail
4590 return pp_console_row(next(self.pps()))
4592 def pps(self, decode_path=()):
4595 asn1_type_name=self.asn1_type_name,
4596 obj_name=self.__class__.__name__,
4597 decode_path=decode_path,
4598 value=str(self) if self.ready else None,
4599 optional=self.optional,
4600 default=self == self.default,
4601 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4602 expl=None if self._expl is None else tag_decode(self._expl),
4607 expl_offset=self.expl_offset if self.expled else None,
4608 expl_tlen=self.expl_tlen if self.expled else None,
4609 expl_llen=self.expl_llen if self.expled else None,
4610 expl_vlen=self.expl_vlen if self.expled else None,
4611 expl_lenindef=self.expl_lenindef,
4612 ber_encoded=self.ber_encoded,
4615 for pp in self.pps_lenindef(decode_path):
4619 class Enumerated(Integer):
4620 """``ENUMERATED`` integer type
4622 This type is identical to :py:class:`pyderasn.Integer`, but requires
4623 schema to be specified and does not accept values missing from it.
4626 tag_default = tag_encode(10)
4627 asn1_type_name = "ENUMERATED"
4638 bounds=None, # dummy argument, workability for Integer.decode
4640 super(Enumerated, self).__init__(
4641 value, bounds, impl, expl, default, optional, _specs, _decoded,
4643 if len(self.specs) == 0:
4644 raise ValueError("schema must be specified")
4646 def _value_sanitize(self, value):
4647 if isinstance(value, self.__class__):
4648 value = value._value
4649 elif isinstance(value, integer_types):
4650 for _value in itervalues(self.specs):
4655 "unknown integer value: %s" % value,
4656 klass=self.__class__,
4658 elif isinstance(value, string_types):
4659 value = self.specs.get(value)
4661 raise ObjUnknown("integer value: %s" % value)
4663 raise InvalidValueType((self.__class__, int, str))
4675 return self.__class__(
4677 impl=self.tag if impl is None else impl,
4678 expl=self._expl if expl is None else expl,
4679 default=self.default if default is None else default,
4680 optional=self.optional if optional is None else optional,
4685 def escape_control_unicode(c):
4686 if unicat(c)[0] == "C":
4687 c = repr(c).lstrip("u").strip("'")
4691 class CommonString(OctetString):
4692 """Common class for all strings
4694 Everything resembles :py:class:`pyderasn.OctetString`, except
4695 ability to deal with unicode text strings.
4697 >>> hexenc("привет мир".encode("utf-8"))
4698 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4699 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4701 >>> s = UTF8String("привет мир")
4702 UTF8String UTF8String привет мир
4704 'привет мир'
4705 >>> hexenc(bytes(s))
4706 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4708 >>> PrintableString("привет мир")
4709 Traceback (most recent call last):
4710 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4712 >>> BMPString("ада", bounds=(2, 2))
4713 Traceback (most recent call last):
4714 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4715 >>> s = BMPString("ад", bounds=(2, 2))
4718 >>> hexenc(bytes(s))
4725 - Text Encoding, validation
4726 * - :py:class:`pyderasn.UTF8String`
4728 * - :py:class:`pyderasn.NumericString`
4729 - proper alphabet validation
4730 * - :py:class:`pyderasn.PrintableString`
4731 - proper alphabet validation
4732 * - :py:class:`pyderasn.TeletexString`
4734 * - :py:class:`pyderasn.T61String`
4736 * - :py:class:`pyderasn.VideotexString`
4738 * - :py:class:`pyderasn.IA5String`
4739 - proper alphabet validation
4740 * - :py:class:`pyderasn.GraphicString`
4742 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4743 - proper alphabet validation
4744 * - :py:class:`pyderasn.GeneralString`
4746 * - :py:class:`pyderasn.UniversalString`
4748 * - :py:class:`pyderasn.BMPString`
4753 def _value_sanitize(self, value):
4755 value_decoded = None
4756 if isinstance(value, self.__class__):
4757 value_raw = value._value
4758 elif value.__class__ == text_type:
4759 value_decoded = value
4760 elif value.__class__ == binary_type:
4763 raise InvalidValueType((self.__class__, text_type, binary_type))
4766 value_decoded.encode(self.encoding)
4767 if value_raw is None else value_raw
4770 value_raw.decode(self.encoding)
4771 if value_decoded is None else value_decoded
4773 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4774 raise DecodeError(str(err))
4775 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4783 def __eq__(self, their):
4784 if their.__class__ == binary_type:
4785 return self._value == their
4786 if their.__class__ == text_type:
4787 return self._value == their.encode(self.encoding)
4788 if not isinstance(their, self.__class__):
4791 self._value == their._value and
4792 self.tag == their.tag and
4793 self._expl == their._expl
4796 def __unicode__(self):
4798 return self._value.decode(self.encoding)
4799 return text_type(self._value)
4802 return pp_console_row(next(self.pps(no_unicode=PY2)))
4804 def pps(self, decode_path=(), no_unicode=False):
4808 hexenc(bytes(self)) if no_unicode else
4809 "".join(escape_control_unicode(c) for c in self.__unicode__())
4813 asn1_type_name=self.asn1_type_name,
4814 obj_name=self.__class__.__name__,
4815 decode_path=decode_path,
4817 optional=self.optional,
4818 default=self == self.default,
4819 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4820 expl=None if self._expl is None else tag_decode(self._expl),
4825 expl_offset=self.expl_offset if self.expled else None,
4826 expl_tlen=self.expl_tlen if self.expled else None,
4827 expl_llen=self.expl_llen if self.expled else None,
4828 expl_vlen=self.expl_vlen if self.expled else None,
4829 expl_lenindef=self.expl_lenindef,
4830 ber_encoded=self.ber_encoded,
4833 for pp in self.pps_lenindef(decode_path):
4837 class UTF8String(CommonString):
4839 tag_default = tag_encode(12)
4841 asn1_type_name = "UTF8String"
4844 class AllowableCharsMixin(object):
4846 def allowable_chars(self):
4848 return self._allowable_chars
4849 return frozenset(six_unichr(c) for c in self._allowable_chars)
4851 def _value_sanitize(self, value):
4852 value = super(AllowableCharsMixin, self)._value_sanitize(value)
4853 if not frozenset(value) <= self._allowable_chars:
4854 raise DecodeError("non satisfying alphabet value")
4858 class NumericString(AllowableCharsMixin, CommonString):
4861 Its value is properly sanitized: only ASCII digits with spaces can
4864 >>> NumericString().allowable_chars
4865 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4868 tag_default = tag_encode(18)
4870 asn1_type_name = "NumericString"
4871 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4874 PrintableStringState = namedtuple(
4875 "PrintableStringState",
4876 OctetStringState._fields + ("allowable_chars",),
4881 class PrintableString(AllowableCharsMixin, CommonString):
4884 Its value is properly sanitized: see X.680 41.4 table 10.
4886 >>> PrintableString().allowable_chars
4887 frozenset([' ', "'", ..., 'z'])
4888 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4889 PrintableString PrintableString foo*bar
4890 >>> obj.allow_asterisk, obj.allow_ampersand
4894 tag_default = tag_encode(19)
4896 asn1_type_name = "PrintableString"
4897 _allowable_chars = frozenset(
4898 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4900 _asterisk = frozenset("*".encode("ascii"))
4901 _ampersand = frozenset("&".encode("ascii"))
4913 allow_asterisk=False,
4914 allow_ampersand=False,
4917 :param allow_asterisk: allow asterisk character
4918 :param allow_ampersand: allow ampersand character
4921 self._allowable_chars |= self._asterisk
4923 self._allowable_chars |= self._ampersand
4924 super(PrintableString, self).__init__(
4925 value, bounds, impl, expl, default, optional, _decoded, ctx,
4929 def allow_asterisk(self):
4930 """Is asterisk character allowed?
4932 return self._asterisk <= self._allowable_chars
4935 def allow_ampersand(self):
4936 """Is ampersand character allowed?
4938 return self._ampersand <= self._allowable_chars
4940 def __getstate__(self):
4941 return PrintableStringState(
4942 *super(PrintableString, self).__getstate__(),
4943 **{"allowable_chars": self._allowable_chars}
4946 def __setstate__(self, state):
4947 super(PrintableString, self).__setstate__(state)
4948 self._allowable_chars = state.allowable_chars
4959 return self.__class__(
4962 (self._bound_min, self._bound_max)
4963 if bounds is None else bounds
4965 impl=self.tag if impl is None else impl,
4966 expl=self._expl if expl is None else expl,
4967 default=self.default if default is None else default,
4968 optional=self.optional if optional is None else optional,
4969 allow_asterisk=self.allow_asterisk,
4970 allow_ampersand=self.allow_ampersand,
4974 class TeletexString(CommonString):
4976 tag_default = tag_encode(20)
4977 encoding = "iso-8859-1"
4978 asn1_type_name = "TeletexString"
4981 class T61String(TeletexString):
4983 asn1_type_name = "T61String"
4986 class VideotexString(CommonString):
4988 tag_default = tag_encode(21)
4989 encoding = "iso-8859-1"
4990 asn1_type_name = "VideotexString"
4993 class IA5String(AllowableCharsMixin, CommonString):
4996 Its value is properly sanitized: it is a mix of
4998 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4999 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5000 * DEL character (0x7F)
5002 It is just 7-bit ASCII.
5004 >>> IA5String().allowable_chars
5005 frozenset(["NUL", ... "DEL"])
5008 tag_default = tag_encode(22)
5010 asn1_type_name = "IA5"
5011 _allowable_chars = frozenset(b"".join(
5012 six_unichr(c).encode("ascii") for c in six_xrange(128)
5016 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5017 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5018 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5019 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5020 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5021 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5024 class VisibleString(AllowableCharsMixin, CommonString):
5027 Its value is properly sanitized. ASCII subset from space to tilde is
5028 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5030 >>> VisibleString().allowable_chars
5031 frozenset([" ", ... "~"])
5034 tag_default = tag_encode(26)
5036 asn1_type_name = "VisibleString"
5037 _allowable_chars = frozenset(b"".join(
5038 six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5042 class ISO646String(VisibleString):
5044 asn1_type_name = "ISO646String"
5047 UTCTimeState = namedtuple(
5049 OctetStringState._fields + ("ber_raw",),
5054 def str_to_time_fractions(value):
5056 year, v = (v // 10**10), (v % 10**10)
5057 month, v = (v // 10**8), (v % 10**8)
5058 day, v = (v // 10**6), (v % 10**6)
5059 hour, v = (v // 10**4), (v % 10**4)
5060 minute, second = (v // 100), (v % 100)
5061 return year, month, day, hour, minute, second
5064 class UTCTime(VisibleString):
5065 """``UTCTime`` datetime type
5067 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5068 UTCTime UTCTime 2017-09-30T22:07:50
5074 datetime.datetime(2017, 9, 30, 22, 7, 50)
5075 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5076 datetime.datetime(1957, 9, 30, 22, 7, 50)
5078 If BER encoded value was met, then ``ber_raw`` attribute will hold
5079 its raw representation.
5083 Pay attention that UTCTime can not hold full year, so all years
5084 having < 50 years are treated as 20xx, 19xx otherwise, according
5085 to X.509 recommendation.
5089 No strict validation of UTC offsets are made, but very crude:
5091 * minutes are not exceeding 60
5092 * offset value is not exceeding 14 hours
5094 __slots__ = ("ber_raw",)
5095 tag_default = tag_encode(23)
5097 asn1_type_name = "UTCTime"
5098 evgen_mode_skip_value = False
5108 bounds=None, # dummy argument, workability for OctetString.decode
5112 :param value: set the value. Either datetime type, or
5113 :py:class:`pyderasn.UTCTime` object
5114 :param bytes impl: override default tag with ``IMPLICIT`` one
5115 :param bytes expl: override default tag with ``EXPLICIT`` one
5116 :param default: set default value. Type same as in ``value``
5117 :param bool optional: is object ``OPTIONAL`` in sequence
5119 super(UTCTime, self).__init__(
5120 None, None, impl, expl, None, optional, _decoded, ctx,
5124 if value is not None:
5125 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5126 self.ber_encoded = self.ber_raw is not None
5127 if default is not None:
5128 default, _ = self._value_sanitize(default)
5129 self.default = self.__class__(
5134 if self._value is None:
5135 self._value = default
5137 self.optional = optional
5139 def _strptime_bered(self, value):
5140 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5143 raise ValueError("no timezone")
5144 year += 2000 if year < 50 else 1900
5145 decoded = datetime(year, month, day, hour, minute)
5147 if value[-1] == "Z":
5151 raise ValueError("invalid UTC offset")
5152 if value[-5] == "-":
5154 elif value[-5] == "+":
5157 raise ValueError("invalid UTC offset")
5158 v = pureint(value[-4:])
5159 offset, v = (60 * (v % 100)), v // 100
5161 raise ValueError("invalid UTC offset minutes")
5163 if offset > 14 * 3600:
5164 raise ValueError("too big UTC offset")
5168 return offset, decoded
5170 raise ValueError("invalid UTC offset seconds")
5171 seconds = pureint(value)
5173 raise ValueError("invalid seconds value")
5174 return offset, decoded + timedelta(seconds=seconds)
5176 def _strptime(self, value):
5177 # datetime.strptime's format: %y%m%d%H%M%SZ
5178 if len(value) != LEN_YYMMDDHHMMSSZ:
5179 raise ValueError("invalid UTCTime length")
5180 if value[-1] != "Z":
5181 raise ValueError("non UTC timezone")
5182 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5183 year += 2000 if year < 50 else 1900
5184 return datetime(year, month, day, hour, minute, second)
5186 def _dt_sanitize(self, value):
5187 if value.year < 1950 or value.year > 2049:
5188 raise ValueError("UTCTime can hold only 1950-2049 years")
5189 return value.replace(microsecond=0)
5191 def _value_sanitize(self, value, ctx=None):
5192 if value.__class__ == binary_type:
5194 value_decoded = value.decode("ascii")
5195 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5196 raise DecodeError("invalid UTCTime encoding: %r" % err)
5199 return self._strptime(value_decoded), None
5200 except (TypeError, ValueError) as _err:
5202 if (ctx is not None) and ctx.get("bered", False):
5204 offset, _value = self._strptime_bered(value_decoded)
5205 _value = _value - timedelta(seconds=offset)
5206 return self._dt_sanitize(_value), value
5207 except (TypeError, ValueError, OverflowError) as _err:
5210 "invalid %s format: %r" % (self.asn1_type_name, err),
5211 klass=self.__class__,
5213 if isinstance(value, self.__class__):
5214 return value._value, None
5215 if value.__class__ == datetime:
5216 return self._dt_sanitize(value), None
5217 raise InvalidValueType((self.__class__, datetime))
5219 def _pp_value(self):
5221 value = self._value.isoformat()
5222 if self.ber_encoded:
5223 value += " (%s)" % self.ber_raw
5227 def __unicode__(self):
5229 value = self._value.isoformat()
5230 if self.ber_encoded:
5231 value += " (%s)" % self.ber_raw
5233 return text_type(self._pp_value())
5235 def __getstate__(self):
5236 return UTCTimeState(
5237 *super(UTCTime, self).__getstate__(),
5238 **{"ber_raw": self.ber_raw}
5241 def __setstate__(self, state):
5242 super(UTCTime, self).__setstate__(state)
5243 self.ber_raw = state.ber_raw
5245 def __bytes__(self):
5246 self._assert_ready()
5247 return self._encode_time()
5249 def __eq__(self, their):
5250 if their.__class__ == binary_type:
5251 return self._encode_time() == their
5252 if their.__class__ == datetime:
5253 return self.todatetime() == their
5254 if not isinstance(their, self.__class__):
5257 self._value == their._value and
5258 self.tag == their.tag and
5259 self._expl == their._expl
5262 def _encode_time(self):
5263 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5266 self._assert_ready()
5267 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5269 def _encode1st(self, state):
5270 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5272 def _encode2nd(self, writer, state_iter):
5273 self._assert_ready()
5274 write_full(writer, self._encode())
5276 def _encode_cer(self, writer):
5277 write_full(writer, self._encode())
5279 def todatetime(self):
5283 return pp_console_row(next(self.pps()))
5285 def pps(self, decode_path=()):
5288 asn1_type_name=self.asn1_type_name,
5289 obj_name=self.__class__.__name__,
5290 decode_path=decode_path,
5291 value=self._pp_value(),
5292 optional=self.optional,
5293 default=self == self.default,
5294 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5295 expl=None if self._expl is None else tag_decode(self._expl),
5300 expl_offset=self.expl_offset if self.expled else None,
5301 expl_tlen=self.expl_tlen if self.expled else None,
5302 expl_llen=self.expl_llen if self.expled else None,
5303 expl_vlen=self.expl_vlen if self.expled else None,
5304 expl_lenindef=self.expl_lenindef,
5305 ber_encoded=self.ber_encoded,
5308 for pp in self.pps_lenindef(decode_path):
5312 class GeneralizedTime(UTCTime):
5313 """``GeneralizedTime`` datetime type
5315 This type is similar to :py:class:`pyderasn.UTCTime`.
5317 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5318 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5320 '20170930220750.000123Z'
5321 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5322 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5326 Only microsecond fractions are supported in DER encoding.
5327 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5328 higher precision values.
5332 BER encoded data can loss information (accuracy) during decoding
5333 because of float transformations.
5337 Local times (without explicit timezone specification) are treated
5338 as UTC one, no transformations are made.
5342 Zero year is unsupported.
5345 tag_default = tag_encode(24)
5346 asn1_type_name = "GeneralizedTime"
5348 def _dt_sanitize(self, value):
5351 def _strptime_bered(self, value):
5352 if len(value) < 4 + 3 * 2:
5353 raise ValueError("invalid GeneralizedTime")
5354 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5355 decoded = datetime(year, month, day, hour)
5356 offset, value = 0, value[10:]
5358 return offset, decoded
5359 if value[-1] == "Z":
5362 for char, sign in (("-", -1), ("+", 1)):
5363 idx = value.rfind(char)
5366 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5367 v = pureint(offset_raw)
5368 if len(offset_raw) == 4:
5369 offset, v = (60 * (v % 100)), v // 100
5371 raise ValueError("invalid UTC offset minutes")
5372 elif len(offset_raw) == 2:
5375 raise ValueError("invalid UTC offset")
5377 if offset > 14 * 3600:
5378 raise ValueError("too big UTC offset")
5382 return offset, decoded
5383 if value[0] in DECIMAL_SIGNS:
5385 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5388 raise ValueError("stripped minutes")
5389 decoded += timedelta(seconds=60 * pureint(value[:2]))
5392 return offset, decoded
5393 if value[0] in DECIMAL_SIGNS:
5395 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5398 raise ValueError("stripped seconds")
5399 decoded += timedelta(seconds=pureint(value[:2]))
5402 return offset, decoded
5403 if value[0] not in DECIMAL_SIGNS:
5404 raise ValueError("invalid format after seconds")
5406 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5409 def _strptime(self, value):
5411 if l == LEN_YYYYMMDDHHMMSSZ:
5412 # datetime.strptime's format: %Y%m%d%H%M%SZ
5413 if value[-1] != "Z":
5414 raise ValueError("non UTC timezone")
5415 return datetime(*str_to_time_fractions(value[:-1]))
5416 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5417 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5418 if value[-1] != "Z":
5419 raise ValueError("non UTC timezone")
5420 if value[14] != ".":
5421 raise ValueError("no fractions separator")
5424 raise ValueError("trailing zero")
5427 raise ValueError("only microsecond fractions are supported")
5428 us = pureint(us + ("0" * (6 - us_len)))
5429 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5430 return datetime(year, month, day, hour, minute, second, us)
5431 raise ValueError("invalid GeneralizedTime length")
5433 def _encode_time(self):
5435 encoded = value.strftime("%Y%m%d%H%M%S")
5436 if value.microsecond > 0:
5437 encoded += (".%06d" % value.microsecond).rstrip("0")
5438 return (encoded + "Z").encode("ascii")
5441 self._assert_ready()
5443 if value.microsecond > 0:
5444 encoded = self._encode_time()
5445 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5446 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5448 def _encode1st(self, state):
5449 self._assert_ready()
5450 vlen = len(self._encode_time())
5451 return len(self.tag) + len_size(vlen) + vlen, state
5453 def _encode2nd(self, writer, state_iter):
5454 write_full(writer, self._encode())
5457 class GraphicString(CommonString):
5459 tag_default = tag_encode(25)
5460 encoding = "iso-8859-1"
5461 asn1_type_name = "GraphicString"
5464 class GeneralString(CommonString):
5466 tag_default = tag_encode(27)
5467 encoding = "iso-8859-1"
5468 asn1_type_name = "GeneralString"
5471 class UniversalString(CommonString):
5473 tag_default = tag_encode(28)
5474 encoding = "utf-32-be"
5475 asn1_type_name = "UniversalString"
5478 class BMPString(CommonString):
5480 tag_default = tag_encode(30)
5481 encoding = "utf-16-be"
5482 asn1_type_name = "BMPString"
5485 ChoiceState = namedtuple(
5487 BasicState._fields + ("specs", "value",),
5493 """``CHOICE`` special type
5497 class GeneralName(Choice):
5499 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5500 ("dNSName", IA5String(impl=tag_ctxp(2))),
5503 >>> gn = GeneralName()
5505 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5506 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5507 >>> gn["dNSName"] = IA5String("bar.baz")
5508 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5509 >>> gn["rfc822Name"]
5512 [2] IA5String IA5 bar.baz
5515 >>> gn.value == gn["dNSName"]
5518 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5520 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5521 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5523 __slots__ = ("specs",)
5525 asn1_type_name = "CHOICE"
5538 :param value: set the value. Either ``(choice, value)`` tuple, or
5539 :py:class:`pyderasn.Choice` object
5540 :param bytes impl: can not be set, do **not** use it
5541 :param bytes expl: override default tag with ``EXPLICIT`` one
5542 :param default: set default value. Type same as in ``value``
5543 :param bool optional: is object ``OPTIONAL`` in sequence
5545 if impl is not None:
5546 raise ValueError("no implicit tag allowed for CHOICE")
5547 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5549 schema = getattr(self, "schema", ())
5550 if len(schema) == 0:
5551 raise ValueError("schema must be specified")
5553 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5556 if value is not None:
5557 self._value = self._value_sanitize(value)
5558 if default is not None:
5559 default_value = self._value_sanitize(default)
5560 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5561 default_obj.specs = self.specs
5562 default_obj._value = default_value
5563 self.default = default_obj
5565 self._value = copy(default_obj._value)
5566 if self._expl is not None:
5567 tag_class, _, tag_num = tag_decode(self._expl)
5568 self._tag_order = (tag_class, tag_num)
5570 def _value_sanitize(self, value):
5571 if (value.__class__ == tuple) and len(value) == 2:
5573 spec = self.specs.get(choice)
5575 raise ObjUnknown(choice)
5576 if not isinstance(obj, spec.__class__):
5577 raise InvalidValueType((spec,))
5578 return (choice, spec(obj))
5579 if isinstance(value, self.__class__):
5581 raise InvalidValueType((self.__class__, tuple))
5585 return self._value is not None and self._value[1].ready
5589 return self.expl_lenindef or (
5590 (self._value is not None) and
5591 self._value[1].bered
5594 def __getstate__(self):
5612 def __setstate__(self, state):
5613 super(Choice, self).__setstate__(state)
5614 self.specs = state.specs
5615 self._value = state.value
5617 def __eq__(self, their):
5618 if (their.__class__ == tuple) and len(their) == 2:
5619 return self._value == their
5620 if not isinstance(their, self.__class__):
5623 self.specs == their.specs and
5624 self._value == their._value
5634 return self.__class__(
5637 expl=self._expl if expl is None else expl,
5638 default=self.default if default is None else default,
5639 optional=self.optional if optional is None else optional,
5644 """Name of the choice
5646 self._assert_ready()
5647 return self._value[0]
5651 """Value of underlying choice
5653 self._assert_ready()
5654 return self._value[1]
5657 def tag_order(self):
5658 self._assert_ready()
5659 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5662 def tag_order_cer(self):
5663 return min(v.tag_order_cer for v in itervalues(self.specs))
5665 def __getitem__(self, key):
5666 if key not in self.specs:
5667 raise ObjUnknown(key)
5668 if self._value is None:
5670 choice, value = self._value
5675 def __setitem__(self, key, value):
5676 spec = self.specs.get(key)
5678 raise ObjUnknown(key)
5679 if not isinstance(value, spec.__class__):
5680 raise InvalidValueType((spec.__class__,))
5681 self._value = (key, spec(value))
5689 return self._value[1].decoded if self.ready else False
5692 self._assert_ready()
5693 return self._value[1].encode()
5695 def _encode1st(self, state):
5696 self._assert_ready()
5697 return self._value[1].encode1st(state)
5699 def _encode2nd(self, writer, state_iter):
5700 self._value[1].encode2nd(writer, state_iter)
5702 def _encode_cer(self, writer):
5703 self._assert_ready()
5704 self._value[1].encode_cer(writer)
5706 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5707 for choice, spec in iteritems(self.specs):
5708 sub_decode_path = decode_path + (choice,)
5714 decode_path=sub_decode_path,
5717 _ctx_immutable=False,
5724 klass=self.__class__,
5725 decode_path=decode_path,
5728 if tag_only: # pragma: no cover
5732 for _decode_path, value, tail in spec.decode_evgen(
5736 decode_path=sub_decode_path,
5738 _ctx_immutable=False,
5740 yield _decode_path, value, tail
5742 _, value, tail = next(spec.decode_evgen(
5746 decode_path=sub_decode_path,
5748 _ctx_immutable=False,
5751 obj = self.__class__(
5754 default=self.default,
5755 optional=self.optional,
5756 _decoded=(offset, 0, value.fulllen),
5758 obj._value = (choice, value)
5759 yield decode_path, obj, tail
5762 value = pp_console_row(next(self.pps()))
5764 value = "%s[%r]" % (value, self.value)
5767 def pps(self, decode_path=()):
5770 asn1_type_name=self.asn1_type_name,
5771 obj_name=self.__class__.__name__,
5772 decode_path=decode_path,
5773 value=self.choice if self.ready else None,
5774 optional=self.optional,
5775 default=self == self.default,
5776 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5777 expl=None if self._expl is None else tag_decode(self._expl),
5782 expl_lenindef=self.expl_lenindef,
5786 yield self.value.pps(decode_path=decode_path + (self.choice,))
5787 for pp in self.pps_lenindef(decode_path):
5791 class PrimitiveTypes(Choice):
5792 """Predefined ``CHOICE`` for all generic primitive types
5794 It could be useful for general decoding of some unspecified values:
5796 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5797 OCTET STRING 3 bytes 666f6f
5798 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5802 schema = tuple((klass.__name__, klass()) for klass in (
5826 AnyState = namedtuple(
5828 BasicState._fields + ("value", "defined"),
5834 """``ANY`` special type
5836 >>> Any(Integer(-123))
5837 ANY INTEGER -123 (0X:7B)
5838 >>> a = Any(OctetString(b"hello world").encode())
5839 ANY 040b68656c6c6f20776f726c64
5840 >>> hexenc(bytes(a))
5841 b'0x040x0bhello world'
5843 __slots__ = ("defined",)
5844 tag_default = tag_encode(0)
5845 asn1_type_name = "ANY"
5855 :param value: set the value. Either any kind of pyderasn's
5856 **ready** object, or bytes. Pay attention that
5857 **no** validation is performed if raw binary value
5858 is valid TLV, except just tag decoding
5859 :param bytes expl: override default tag with ``EXPLICIT`` one
5860 :param bool optional: is object ``OPTIONAL`` in sequence
5862 super(Any, self).__init__(None, expl, None, optional, _decoded)
5866 value = self._value_sanitize(value)
5868 if self._expl is None:
5869 if value.__class__ == binary_type:
5870 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5872 tag_class, tag_num = value.tag_order
5874 tag_class, _, tag_num = tag_decode(self._expl)
5875 self._tag_order = (tag_class, tag_num)
5878 def _value_sanitize(self, value):
5879 if value.__class__ == binary_type:
5881 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5883 if isinstance(value, self.__class__):
5885 if not isinstance(value, Obj):
5886 raise InvalidValueType((self.__class__, Obj, binary_type))
5891 return self._value is not None
5894 def tag_order(self):
5895 self._assert_ready()
5896 return self._tag_order
5900 if self.expl_lenindef or self.lenindef:
5902 if self.defined is None:
5904 return self.defined[1].bered
5906 def __getstate__(self):
5924 def __setstate__(self, state):
5925 super(Any, self).__setstate__(state)
5926 self._value = state.value
5927 self.defined = state.defined
5929 def __eq__(self, their):
5930 if their.__class__ == binary_type:
5931 if self._value.__class__ == binary_type:
5932 return self._value == their
5933 return self._value.encode() == their
5934 if issubclass(their.__class__, Any):
5935 if self.ready and their.ready:
5936 return bytes(self) == bytes(their)
5937 return self.ready == their.ready
5946 return self.__class__(
5948 expl=self._expl if expl is None else expl,
5949 optional=self.optional if optional is None else optional,
5952 def __bytes__(self):
5953 self._assert_ready()
5955 if value.__class__ == binary_type:
5957 return self._value.encode()
5964 self._assert_ready()
5966 if value.__class__ == binary_type:
5968 return value.encode()
5970 def _encode1st(self, state):
5971 self._assert_ready()
5973 if value.__class__ == binary_type:
5974 return len(value), state
5975 return value.encode1st(state)
5977 def _encode2nd(self, writer, state_iter):
5979 if value.__class__ == binary_type:
5980 write_full(writer, value)
5982 value.encode2nd(writer, state_iter)
5984 def _encode_cer(self, writer):
5985 self._assert_ready()
5987 if value.__class__ == binary_type:
5988 write_full(writer, value)
5990 value.encode_cer(writer)
5992 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5994 t, tlen, lv = tag_strip(tlv)
5995 except DecodeError as err:
5996 raise err.__class__(
5998 klass=self.__class__,
5999 decode_path=decode_path,
6003 l, llen, v = len_decode(lv)
6004 except LenIndefForm as err:
6005 if not ctx.get("bered", False):
6006 raise err.__class__(
6008 klass=self.__class__,
6009 decode_path=decode_path,
6012 llen, vlen, v = 1, 0, lv[1:]
6013 sub_offset = offset + tlen + llen
6015 while v[:EOC_LEN].tobytes() != EOC:
6016 chunk, v = Any().decode(
6019 decode_path=decode_path + (str(chunk_i),),
6022 _ctx_immutable=False,
6024 vlen += chunk.tlvlen
6025 sub_offset += chunk.tlvlen
6027 tlvlen = tlen + llen + vlen + EOC_LEN
6028 obj = self.__class__(
6029 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6031 optional=self.optional,
6032 _decoded=(offset, 0, tlvlen),
6035 obj.tag = t.tobytes()
6036 yield decode_path, obj, v[EOC_LEN:]
6038 except DecodeError as err:
6039 raise err.__class__(
6041 klass=self.__class__,
6042 decode_path=decode_path,
6046 raise NotEnoughData(
6047 "encoded length is longer than data",
6048 klass=self.__class__,
6049 decode_path=decode_path,
6052 tlvlen = tlen + llen + l
6053 v, tail = tlv[:tlvlen], v[l:]
6054 obj = self.__class__(
6055 value=None if evgen_mode else v.tobytes(),
6057 optional=self.optional,
6058 _decoded=(offset, 0, tlvlen),
6060 obj.tag = t.tobytes()
6061 yield decode_path, obj, tail
6064 return pp_console_row(next(self.pps()))
6066 def pps(self, decode_path=()):
6070 elif value.__class__ == binary_type:
6076 asn1_type_name=self.asn1_type_name,
6077 obj_name=self.__class__.__name__,
6078 decode_path=decode_path,
6080 blob=self._value if self._value.__class__ == binary_type else None,
6081 optional=self.optional,
6082 default=self == self.default,
6083 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6084 expl=None if self._expl is None else tag_decode(self._expl),
6089 expl_offset=self.expl_offset if self.expled else None,
6090 expl_tlen=self.expl_tlen if self.expled else None,
6091 expl_llen=self.expl_llen if self.expled else None,
6092 expl_vlen=self.expl_vlen if self.expled else None,
6093 expl_lenindef=self.expl_lenindef,
6094 lenindef=self.lenindef,
6097 defined_by, defined = self.defined or (None, None)
6098 if defined_by is not None:
6100 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6102 for pp in self.pps_lenindef(decode_path):
6106 ########################################################################
6107 # ASN.1 constructed types
6108 ########################################################################
6110 def abs_decode_path(decode_path, rel_path):
6111 """Create an absolute decode path from current and relative ones
6113 :param decode_path: current decode path, starting point. Tuple of strings
6114 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6115 If first tuple's element is "/", then treat it as
6116 an absolute path, ignoring ``decode_path`` as
6117 starting point. Also this tuple can contain ".."
6118 elements, stripping the leading element from
6121 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6122 ("foo", "bar", "baz", "whatever")
6123 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6125 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6128 if rel_path[0] == "/":
6130 if rel_path[0] == "..":
6131 return abs_decode_path(decode_path[:-1], rel_path[1:])
6132 return decode_path + rel_path
6135 SequenceState = namedtuple(
6137 BasicState._fields + ("specs", "value",),
6142 class SequenceEncode1stMixing(object):
6143 def _encode1st(self, state):
6145 idx = len(state) - 1
6147 for v in self._values_for_encoding():
6148 l, _ = v.encode1st(state)
6151 return len(self.tag) + len_size(vlen) + vlen, state
6154 class Sequence(SequenceEncode1stMixing, Obj):
6155 """``SEQUENCE`` structure type
6157 You have to make specification of sequence::
6159 class Extension(Sequence):
6161 ("extnID", ObjectIdentifier()),
6162 ("critical", Boolean(default=False)),
6163 ("extnValue", OctetString()),
6166 Then, you can work with it as with dictionary.
6168 >>> ext = Extension()
6169 >>> Extension().specs
6171 ('extnID', OBJECT IDENTIFIER),
6172 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6173 ('extnValue', OCTET STRING),
6175 >>> ext["extnID"] = "1.2.3"
6176 Traceback (most recent call last):
6177 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6178 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6180 You can determine if sequence is ready to be encoded:
6185 Traceback (most recent call last):
6186 pyderasn.ObjNotReady: object is not ready: extnValue
6187 >>> ext["extnValue"] = OctetString(b"foobar")
6191 Value you want to assign, must have the same **type** as in
6192 corresponding specification, but it can have different tags,
6193 optional/default attributes -- they will be taken from specification
6196 class TBSCertificate(Sequence):
6198 ("version", Version(expl=tag_ctxc(0), default="v1")),
6201 >>> tbs = TBSCertificate()
6202 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6204 Assign ``None`` to remove value from sequence.
6206 You can set values in Sequence during its initialization:
6208 >>> AlgorithmIdentifier((
6209 ("algorithm", ObjectIdentifier("1.2.3")),
6210 ("parameters", Any(Null()))
6212 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6214 You can determine if value exists/set in the sequence and take its value:
6216 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6219 OBJECT IDENTIFIER 1.2.3
6221 But pay attention that if value has default, then it won't be (not
6222 in) in the sequence (because ``DEFAULT`` must not be encoded in
6223 DER), but you can read its value:
6225 >>> "critical" in ext, ext["critical"]
6226 (False, BOOLEAN False)
6227 >>> ext["critical"] = Boolean(True)
6228 >>> "critical" in ext, ext["critical"]
6229 (True, BOOLEAN True)
6231 All defaulted values are always optional.
6233 .. _allow_default_values_ctx:
6235 DER prohibits default value encoding and will raise an error if
6236 default value is unexpectedly met during decode.
6237 If :ref:`bered <bered_ctx>` context option is set, then no error
6238 will be raised, but ``bered`` attribute set. You can disable strict
6239 defaulted values existence validation by setting
6240 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6242 All values with DEFAULT specified are decoded atomically in
6243 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6244 SEQUENCE, then it will be yielded as a single element, not
6245 disassembled. That is required for DEFAULT existence check.
6247 Two sequences are equal if they have equal specification (schema),
6248 implicit/explicit tagging and the same values.
6250 __slots__ = ("specs",)
6251 tag_default = tag_encode(form=TagFormConstructed, num=16)
6252 asn1_type_name = "SEQUENCE"
6264 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6266 schema = getattr(self, "schema", ())
6268 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6271 if value is not None:
6272 if issubclass(value.__class__, Sequence):
6273 self._value = value._value
6274 elif hasattr(value, "__iter__"):
6275 for seq_key, seq_value in value:
6276 self[seq_key] = seq_value
6278 raise InvalidValueType((Sequence,))
6279 if default is not None:
6280 if not issubclass(default.__class__, Sequence):
6281 raise InvalidValueType((Sequence,))
6282 default_value = default._value
6283 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6284 default_obj.specs = self.specs
6285 default_obj._value = default_value
6286 self.default = default_obj
6288 self._value = copy(default_obj._value)
6292 for name, spec in iteritems(self.specs):
6293 value = self._value.get(name)
6304 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6306 return any(value.bered for value in itervalues(self._value))
6308 def __getstate__(self):
6309 return SequenceState(
6323 {k: copy(v) for k, v in iteritems(self._value)},
6326 def __setstate__(self, state):
6327 super(Sequence, self).__setstate__(state)
6328 self.specs = state.specs
6329 self._value = state.value
6331 def __eq__(self, their):
6332 if not isinstance(their, self.__class__):
6335 self.specs == their.specs and
6336 self.tag == their.tag and
6337 self._expl == their._expl and
6338 self._value == their._value
6349 return self.__class__(
6352 impl=self.tag if impl is None else impl,
6353 expl=self._expl if expl is None else expl,
6354 default=self.default if default is None else default,
6355 optional=self.optional if optional is None else optional,
6358 def __contains__(self, key):
6359 return key in self._value
6361 def __setitem__(self, key, value):
6362 spec = self.specs.get(key)
6364 raise ObjUnknown(key)
6366 self._value.pop(key, None)
6368 if not isinstance(value, spec.__class__):
6369 raise InvalidValueType((spec.__class__,))
6370 value = spec(value=value)
6371 if spec.default is not None and value == spec.default:
6372 self._value.pop(key, None)
6374 self._value[key] = value
6376 def __getitem__(self, key):
6377 value = self._value.get(key)
6378 if value is not None:
6380 spec = self.specs.get(key)
6382 raise ObjUnknown(key)
6383 if spec.default is not None:
6387 def _values_for_encoding(self):
6388 for name, spec in iteritems(self.specs):
6389 value = self._value.get(name)
6393 raise ObjNotReady(name)
6397 v = b"".join(v.encode() for v in self._values_for_encoding())
6398 return b"".join((self.tag, len_encode(len(v)), v))
6400 def _encode2nd(self, writer, state_iter):
6401 write_full(writer, self.tag + len_encode(next(state_iter)))
6402 for v in self._values_for_encoding():
6403 v.encode2nd(writer, state_iter)
6405 def _encode_cer(self, writer):
6406 write_full(writer, self.tag + LENINDEF)
6407 for v in self._values_for_encoding():
6408 v.encode_cer(writer)
6409 write_full(writer, EOC)
6411 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6413 t, tlen, lv = tag_strip(tlv)
6414 except DecodeError as err:
6415 raise err.__class__(
6417 klass=self.__class__,
6418 decode_path=decode_path,
6423 klass=self.__class__,
6424 decode_path=decode_path,
6427 if tag_only: # pragma: no cover
6431 ctx_bered = ctx.get("bered", False)
6433 l, llen, v = len_decode(lv)
6434 except LenIndefForm as err:
6436 raise err.__class__(
6438 klass=self.__class__,
6439 decode_path=decode_path,
6442 l, llen, v = 0, 1, lv[1:]
6444 except DecodeError as err:
6445 raise err.__class__(
6447 klass=self.__class__,
6448 decode_path=decode_path,
6452 raise NotEnoughData(
6453 "encoded length is longer than data",
6454 klass=self.__class__,
6455 decode_path=decode_path,
6459 v, tail = v[:l], v[l:]
6461 sub_offset = offset + tlen + llen
6464 ctx_allow_default_values = ctx.get("allow_default_values", False)
6465 for name, spec in iteritems(self.specs):
6466 if spec.optional and (
6467 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6471 spec_defaulted = spec.default is not None
6472 sub_decode_path = decode_path + (name,)
6474 if evgen_mode and not spec_defaulted:
6475 for _decode_path, value, v_tail in spec.decode_evgen(
6479 decode_path=sub_decode_path,
6481 _ctx_immutable=False,
6483 yield _decode_path, value, v_tail
6485 _, value, v_tail = next(spec.decode_evgen(
6489 decode_path=sub_decode_path,
6491 _ctx_immutable=False,
6494 except TagMismatch as err:
6495 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6499 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6500 if not evgen_mode and defined is not None:
6501 defined_by, defined_spec = defined
6502 if issubclass(value.__class__, SequenceOf):
6503 for i, _value in enumerate(value):
6504 sub_sub_decode_path = sub_decode_path + (
6506 DecodePathDefBy(defined_by),
6508 defined_value, defined_tail = defined_spec.decode(
6509 memoryview(bytes(_value)),
6511 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6512 if value.expled else (value.tlen + value.llen)
6515 decode_path=sub_sub_decode_path,
6517 _ctx_immutable=False,
6519 if len(defined_tail) > 0:
6522 klass=self.__class__,
6523 decode_path=sub_sub_decode_path,
6526 _value.defined = (defined_by, defined_value)
6528 defined_value, defined_tail = defined_spec.decode(
6529 memoryview(bytes(value)),
6531 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6532 if value.expled else (value.tlen + value.llen)
6535 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6537 _ctx_immutable=False,
6539 if len(defined_tail) > 0:
6542 klass=self.__class__,
6543 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6546 value.defined = (defined_by, defined_value)
6548 value_len = value.fulllen
6550 sub_offset += value_len
6554 yield sub_decode_path, value, v_tail
6555 if value == spec.default:
6556 if ctx_bered or ctx_allow_default_values:
6560 "DEFAULT value met",
6561 klass=self.__class__,
6562 decode_path=sub_decode_path,
6566 values[name] = value
6567 spec_defines = getattr(spec, "defines", ())
6568 if len(spec_defines) == 0:
6569 defines_by_path = ctx.get("defines_by_path", ())
6570 if len(defines_by_path) > 0:
6571 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6572 if spec_defines is not None and len(spec_defines) > 0:
6573 for rel_path, schema in spec_defines:
6574 defined = schema.get(value, None)
6575 if defined is not None:
6576 ctx.setdefault("_defines", []).append((
6577 abs_decode_path(sub_decode_path[:-1], rel_path),
6581 if v[:EOC_LEN].tobytes() != EOC:
6584 klass=self.__class__,
6585 decode_path=decode_path,
6593 klass=self.__class__,
6594 decode_path=decode_path,
6597 obj = self.__class__(
6601 default=self.default,
6602 optional=self.optional,
6603 _decoded=(offset, llen, vlen),
6606 obj.lenindef = lenindef
6607 obj.ber_encoded = ber_encoded
6608 yield decode_path, obj, tail
6611 value = pp_console_row(next(self.pps()))
6613 for name in self.specs:
6614 _value = self._value.get(name)
6617 cols.append("%s: %s" % (name, repr(_value)))
6618 return "%s[%s]" % (value, "; ".join(cols))
6620 def pps(self, decode_path=()):
6623 asn1_type_name=self.asn1_type_name,
6624 obj_name=self.__class__.__name__,
6625 decode_path=decode_path,
6626 optional=self.optional,
6627 default=self == self.default,
6628 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6629 expl=None if self._expl is None else tag_decode(self._expl),
6634 expl_offset=self.expl_offset if self.expled else None,
6635 expl_tlen=self.expl_tlen if self.expled else None,
6636 expl_llen=self.expl_llen if self.expled else None,
6637 expl_vlen=self.expl_vlen if self.expled else None,
6638 expl_lenindef=self.expl_lenindef,
6639 lenindef=self.lenindef,
6640 ber_encoded=self.ber_encoded,
6643 for name in self.specs:
6644 value = self._value.get(name)
6647 yield value.pps(decode_path=decode_path + (name,))
6648 for pp in self.pps_lenindef(decode_path):
6652 class Set(Sequence, SequenceEncode1stMixing):
6653 """``SET`` structure type
6655 Its usage is identical to :py:class:`pyderasn.Sequence`.
6657 .. _allow_unordered_set_ctx:
6659 DER prohibits unordered values encoding and will raise an error
6660 during decode. If :ref:`bered <bered_ctx>` context option is set,
6661 then no error will occur. Also you can disable strict values
6662 ordering check by setting ``"allow_unordered_set": True``
6663 :ref:`context <ctx>` option.
6666 tag_default = tag_encode(form=TagFormConstructed, num=17)
6667 asn1_type_name = "SET"
6669 def _values_for_encoding(self):
6671 super(Set, self)._values_for_encoding(),
6672 key=attrgetter("tag_order"),
6675 def _encode_cer(self, writer):
6676 write_full(writer, self.tag + LENINDEF)
6678 super(Set, self)._values_for_encoding(),
6679 key=attrgetter("tag_order_cer"),
6681 v.encode_cer(writer)
6682 write_full(writer, EOC)
6684 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6686 t, tlen, lv = tag_strip(tlv)
6687 except DecodeError as err:
6688 raise err.__class__(
6690 klass=self.__class__,
6691 decode_path=decode_path,
6696 klass=self.__class__,
6697 decode_path=decode_path,
6704 ctx_bered = ctx.get("bered", False)
6706 l, llen, v = len_decode(lv)
6707 except LenIndefForm as err:
6709 raise err.__class__(
6711 klass=self.__class__,
6712 decode_path=decode_path,
6715 l, llen, v = 0, 1, lv[1:]
6717 except DecodeError as err:
6718 raise err.__class__(
6720 klass=self.__class__,
6721 decode_path=decode_path,
6725 raise NotEnoughData(
6726 "encoded length is longer than data",
6727 klass=self.__class__,
6731 v, tail = v[:l], v[l:]
6733 sub_offset = offset + tlen + llen
6736 ctx_allow_default_values = ctx.get("allow_default_values", False)
6737 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6738 tag_order_prev = (0, 0)
6739 _specs_items = copy(self.specs)
6742 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6744 for name, spec in iteritems(_specs_items):
6745 sub_decode_path = decode_path + (name,)
6751 decode_path=sub_decode_path,
6754 _ctx_immutable=False,
6761 klass=self.__class__,
6762 decode_path=decode_path,
6765 spec_defaulted = spec.default is not None
6766 if evgen_mode and not spec_defaulted:
6767 for _decode_path, value, v_tail in spec.decode_evgen(
6771 decode_path=sub_decode_path,
6773 _ctx_immutable=False,
6775 yield _decode_path, value, v_tail
6777 _, value, v_tail = next(spec.decode_evgen(
6781 decode_path=sub_decode_path,
6783 _ctx_immutable=False,
6786 value_tag_order = value.tag_order
6787 value_len = value.fulllen
6788 if tag_order_prev >= value_tag_order:
6789 if ctx_bered or ctx_allow_unordered_set:
6793 "unordered " + self.asn1_type_name,
6794 klass=self.__class__,
6795 decode_path=sub_decode_path,
6800 yield sub_decode_path, value, v_tail
6801 if value != spec.default:
6803 elif ctx_bered or ctx_allow_default_values:
6807 "DEFAULT value met",
6808 klass=self.__class__,
6809 decode_path=sub_decode_path,
6812 values[name] = value
6813 del _specs_items[name]
6814 tag_order_prev = value_tag_order
6815 sub_offset += value_len
6819 obj = self.__class__(
6823 default=self.default,
6824 optional=self.optional,
6825 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6828 if v[:EOC_LEN].tobytes() != EOC:
6831 klass=self.__class__,
6832 decode_path=decode_path,
6837 for name, spec in iteritems(self.specs):
6838 if name not in values and not spec.optional:
6840 "%s value is not ready" % name,
6841 klass=self.__class__,
6842 decode_path=decode_path,
6847 obj.ber_encoded = ber_encoded
6848 yield decode_path, obj, tail
6851 SequenceOfState = namedtuple(
6853 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6858 class SequenceOf(SequenceEncode1stMixing, Obj):
6859 """``SEQUENCE OF`` sequence type
6861 For that kind of type you must specify the object it will carry on
6862 (bounds are for example here, not required)::
6864 class Ints(SequenceOf):
6869 >>> ints.append(Integer(123))
6870 >>> ints.append(Integer(234))
6872 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6873 >>> [int(i) for i in ints]
6875 >>> ints.append(Integer(345))
6876 Traceback (most recent call last):
6877 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6880 >>> ints[1] = Integer(345)
6882 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6884 You can initialize sequence with preinitialized values:
6886 >>> ints = Ints([Integer(123), Integer(234)])
6888 Also you can use iterator as a value:
6890 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6892 And it won't be iterated until encoding process. Pay attention that
6893 bounds and required schema checks are done only during the encoding
6894 process in that case! After encode was called, then value is zeroed
6895 back to empty list and you have to set it again. That mode is useful
6896 mainly with CER encoding mode, where all objects from the iterable
6897 will be streamed to the buffer, without copying all of them to
6900 __slots__ = ("spec", "_bound_min", "_bound_max")
6901 tag_default = tag_encode(form=TagFormConstructed, num=16)
6902 asn1_type_name = "SEQUENCE OF"
6915 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6917 schema = getattr(self, "schema", None)
6919 raise ValueError("schema must be specified")
6921 self._bound_min, self._bound_max = getattr(
6925 ) if bounds is None else bounds
6927 if value is not None:
6928 self._value = self._value_sanitize(value)
6929 if default is not None:
6930 default_value = self._value_sanitize(default)
6931 default_obj = self.__class__(
6936 default_obj._value = default_value
6937 self.default = default_obj
6939 self._value = copy(default_obj._value)
6941 def _value_sanitize(self, value):
6943 if issubclass(value.__class__, SequenceOf):
6944 value = value._value
6945 elif hasattr(value, NEXT_ATTR_NAME):
6947 elif hasattr(value, "__iter__"):
6950 raise InvalidValueType((self.__class__, iter, "iterator"))
6952 if not self._bound_min <= len(value) <= self._bound_max:
6953 raise BoundsError(self._bound_min, len(value), self._bound_max)
6954 class_expected = self.spec.__class__
6956 if not isinstance(v, class_expected):
6957 raise InvalidValueType((class_expected,))
6962 if hasattr(self._value, NEXT_ATTR_NAME):
6964 if self._bound_min > 0 and len(self._value) == 0:
6966 return all(v.ready for v in self._value)
6970 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6972 return any(v.bered for v in self._value)
6974 def __getstate__(self):
6975 if hasattr(self._value, NEXT_ATTR_NAME):
6976 raise ValueError("can not pickle SequenceOf with iterator")
6977 return SequenceOfState(
6991 [copy(v) for v in self._value],
6996 def __setstate__(self, state):
6997 super(SequenceOf, self).__setstate__(state)
6998 self.spec = state.spec
6999 self._value = state.value
7000 self._bound_min = state.bound_min
7001 self._bound_max = state.bound_max
7003 def __eq__(self, their):
7004 if isinstance(their, self.__class__):
7006 self.spec == their.spec and
7007 self.tag == their.tag and
7008 self._expl == their._expl and
7009 self._value == their._value
7011 if hasattr(their, "__iter__"):
7012 return self._value == list(their)
7024 return self.__class__(
7028 (self._bound_min, self._bound_max)
7029 if bounds is None else bounds
7031 impl=self.tag if impl is None else impl,
7032 expl=self._expl if expl is None else expl,
7033 default=self.default if default is None else default,
7034 optional=self.optional if optional is None else optional,
7037 def __contains__(self, key):
7038 return key in self._value
7040 def append(self, value):
7041 if not isinstance(value, self.spec.__class__):
7042 raise InvalidValueType((self.spec.__class__,))
7043 if len(self._value) + 1 > self._bound_max:
7046 len(self._value) + 1,
7049 self._value.append(value)
7052 return iter(self._value)
7055 return len(self._value)
7057 def __setitem__(self, key, value):
7058 if not isinstance(value, self.spec.__class__):
7059 raise InvalidValueType((self.spec.__class__,))
7060 self._value[key] = self.spec(value=value)
7062 def __getitem__(self, key):
7063 return self._value[key]
7065 def _values_for_encoding(self):
7066 return iter(self._value)
7069 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7072 values_append = values.append
7073 class_expected = self.spec.__class__
7074 values_for_encoding = self._values_for_encoding()
7076 for v in values_for_encoding:
7077 if not isinstance(v, class_expected):
7078 raise InvalidValueType((class_expected,))
7079 values_append(v.encode())
7080 if not self._bound_min <= len(values) <= self._bound_max:
7081 raise BoundsError(self._bound_min, len(values), self._bound_max)
7082 value = b"".join(values)
7084 value = b"".join(v.encode() for v in self._values_for_encoding())
7085 return b"".join((self.tag, len_encode(len(value)), value))
7087 def _encode1st(self, state):
7088 state = super(SequenceOf, self)._encode1st(state)
7089 if hasattr(self._value, NEXT_ATTR_NAME):
7093 def _encode2nd(self, writer, state_iter):
7094 write_full(writer, self.tag + len_encode(next(state_iter)))
7095 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7098 class_expected = self.spec.__class__
7099 values_for_encoding = self._values_for_encoding()
7101 for v in values_for_encoding:
7102 if not isinstance(v, class_expected):
7103 raise InvalidValueType((class_expected,))
7104 v.encode2nd(writer, state_iter)
7106 if not self._bound_min <= values_count <= self._bound_max:
7107 raise BoundsError(self._bound_min, values_count, self._bound_max)
7109 for v in self._values_for_encoding():
7110 v.encode2nd(writer, state_iter)
7112 def _encode_cer(self, writer):
7113 write_full(writer, self.tag + LENINDEF)
7114 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7116 class_expected = self.spec.__class__
7118 values_for_encoding = self._values_for_encoding()
7120 for v in values_for_encoding:
7121 if not isinstance(v, class_expected):
7122 raise InvalidValueType((class_expected,))
7123 v.encode_cer(writer)
7125 if not self._bound_min <= values_count <= self._bound_max:
7126 raise BoundsError(self._bound_min, values_count, self._bound_max)
7128 for v in self._values_for_encoding():
7129 v.encode_cer(writer)
7130 write_full(writer, EOC)
7140 ordering_check=False,
7143 t, tlen, lv = tag_strip(tlv)
7144 except DecodeError as err:
7145 raise err.__class__(
7147 klass=self.__class__,
7148 decode_path=decode_path,
7153 klass=self.__class__,
7154 decode_path=decode_path,
7161 ctx_bered = ctx.get("bered", False)
7163 l, llen, v = len_decode(lv)
7164 except LenIndefForm as err:
7166 raise err.__class__(
7168 klass=self.__class__,
7169 decode_path=decode_path,
7172 l, llen, v = 0, 1, lv[1:]
7174 except DecodeError as err:
7175 raise err.__class__(
7177 klass=self.__class__,
7178 decode_path=decode_path,
7182 raise NotEnoughData(
7183 "encoded length is longer than data",
7184 klass=self.__class__,
7185 decode_path=decode_path,
7189 v, tail = v[:l], v[l:]
7191 sub_offset = offset + tlen + llen
7194 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7195 value_prev = memoryview(v[:0])
7199 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7201 sub_decode_path = decode_path + (str(_value_count),)
7203 for _decode_path, value, v_tail in spec.decode_evgen(
7207 decode_path=sub_decode_path,
7209 _ctx_immutable=False,
7211 yield _decode_path, value, v_tail
7213 _, value, v_tail = next(spec.decode_evgen(
7217 decode_path=sub_decode_path,
7219 _ctx_immutable=False,
7222 value_len = value.fulllen
7224 if value_prev.tobytes() > v[:value_len].tobytes():
7225 if ctx_bered or ctx_allow_unordered_set:
7229 "unordered " + self.asn1_type_name,
7230 klass=self.__class__,
7231 decode_path=sub_decode_path,
7234 value_prev = v[:value_len]
7237 _value.append(value)
7238 sub_offset += value_len
7241 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7243 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7244 klass=self.__class__,
7245 decode_path=decode_path,
7249 obj = self.__class__(
7250 value=None if evgen_mode else _value,
7252 bounds=(self._bound_min, self._bound_max),
7255 default=self.default,
7256 optional=self.optional,
7257 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7259 except BoundsError as err:
7262 klass=self.__class__,
7263 decode_path=decode_path,
7267 if v[:EOC_LEN].tobytes() != EOC:
7270 klass=self.__class__,
7271 decode_path=decode_path,
7276 obj.ber_encoded = ber_encoded
7277 yield decode_path, obj, tail
7281 pp_console_row(next(self.pps())),
7282 ", ".join(repr(v) for v in self._value),
7285 def pps(self, decode_path=()):
7288 asn1_type_name=self.asn1_type_name,
7289 obj_name=self.__class__.__name__,
7290 decode_path=decode_path,
7291 optional=self.optional,
7292 default=self == self.default,
7293 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7294 expl=None if self._expl is None else tag_decode(self._expl),
7299 expl_offset=self.expl_offset if self.expled else None,
7300 expl_tlen=self.expl_tlen if self.expled else None,
7301 expl_llen=self.expl_llen if self.expled else None,
7302 expl_vlen=self.expl_vlen if self.expled else None,
7303 expl_lenindef=self.expl_lenindef,
7304 lenindef=self.lenindef,
7305 ber_encoded=self.ber_encoded,
7308 for i, value in enumerate(self._value):
7309 yield value.pps(decode_path=decode_path + (str(i),))
7310 for pp in self.pps_lenindef(decode_path):
7314 class SetOf(SequenceOf):
7315 """``SET OF`` sequence type
7317 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7320 tag_default = tag_encode(form=TagFormConstructed, num=17)
7321 asn1_type_name = "SET OF"
7323 def _value_sanitize(self, value):
7324 value = super(SetOf, self)._value_sanitize(value)
7325 if hasattr(value, NEXT_ATTR_NAME):
7327 "SetOf does not support iterator values, as no sense in them"
7332 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7333 return b"".join((self.tag, len_encode(len(v)), v))
7335 def _encode2nd(self, writer, state_iter):
7336 write_full(writer, self.tag + len_encode(next(state_iter)))
7338 for v in self._values_for_encoding():
7340 v.encode2nd(buf.write, state_iter)
7341 values.append(buf.getvalue())
7344 write_full(writer, v)
7346 def _encode_cer(self, writer):
7347 write_full(writer, self.tag + LENINDEF)
7348 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7349 write_full(writer, v)
7350 write_full(writer, EOC)
7352 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7353 return super(SetOf, self)._decode(
7360 ordering_check=True,
7364 def obj_by_path(pypath): # pragma: no cover
7365 """Import object specified as string Python path
7367 Modules must be separated from classes/functions with ``:``.
7369 >>> obj_by_path("foo.bar:Baz")
7370 <class 'foo.bar.Baz'>
7371 >>> obj_by_path("foo.bar:Baz.boo")
7372 <classmethod 'foo.bar.Baz.boo'>
7374 mod, objs = pypath.rsplit(":", 1)
7375 from importlib import import_module
7376 obj = import_module(mod)
7377 for obj_name in objs.split("."):
7378 obj = getattr(obj, obj_name)
7382 def generic_decoder(): # pragma: no cover
7383 # All of this below is a big hack with self references
7384 choice = PrimitiveTypes()
7385 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7386 choice.specs["SetOf"] = SetOf(schema=choice)
7387 for i in six_xrange(31):
7388 choice.specs["SequenceOf%d" % i] = SequenceOf(
7392 choice.specs["Any"] = Any()
7394 # Class name equals to type name, to omit it from output
7395 class SEQUENCEOF(SequenceOf):
7403 with_decode_path=False,
7404 decode_path_only=(),
7407 def _pprint_pps(pps):
7409 if hasattr(pp, "_fields"):
7411 decode_path_only != () and
7412 pp.decode_path[:len(decode_path_only)] != decode_path_only
7415 if pp.asn1_type_name == Choice.asn1_type_name:
7417 pp_kwargs = pp._asdict()
7418 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7419 pp = _pp(**pp_kwargs)
7420 yield pp_console_row(
7425 with_colours=with_colours,
7426 with_decode_path=with_decode_path,
7427 decode_path_len_decrease=len(decode_path_only),
7429 for row in pp_console_blob(
7431 decode_path_len_decrease=len(decode_path_only),
7435 for row in _pprint_pps(pp):
7437 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7438 return SEQUENCEOF(), pprint_any
7441 def ascii_visualize(ba):
7442 """Output only ASCII printable characters, like in hexdump -C
7444 Example output for given binary string (right part)::
7446 92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|
7449 return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7453 """Generate ``hexdump -C`` like output
7457 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7458 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7460 Result of that function is a generator of lines, where each line is
7465 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7466 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7467 " |i..?.....G...k.j|"]
7471 hexed = hexenc(raw).upper()
7472 addr, cols = 0, ["%08x " % 0]
7473 for i in six_xrange(0, len(hexed), 2):
7474 if i != 0 and i // 2 % 8 == 0:
7476 if i != 0 and i // 2 % 16 == 0:
7477 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7480 cols = ["%08x " % addr]
7481 cols.append(" " + hexed[i:i + 2])
7483 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7487 def browse(raw, obj, oid_maps=()):
7488 """Interactive browser
7490 :param bytes raw: binary data you decoded
7491 :param obj: decoded :py:class:`pyderasn.Obj`
7492 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7493 Its human readable form is printed when OID is met
7495 .. note:: `urwid <http://urwid.org/>`__ dependency required
7497 This browser is an interactive terminal application for browsing
7498 structures of your decoded ASN.1 objects. You can quit it with **q**
7499 key. It consists of three windows:
7502 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7503 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7504 **Left** key goes to constructed element above. **Plus**/**Minus**
7505 keys collapse/uncollapse constructed elements. **Space** toggles it
7507 window with various information about element. You can scroll it
7508 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7510 window with raw data hexdump and highlighted current element's
7511 contents. It automatically focuses on element's data. You can
7512 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7513 speed) keys. If element has explicit tag, then it also will be
7514 highlighted with different colour
7516 Window's header contains current decode path and progress bars with
7517 position in *info* and *hexdump* windows.
7519 If you press **d**, then current element will be saved in the
7520 current directory under its decode path name (adding ".0", ".1", etc
7521 suffix if such file already exists). **D** will save it with explicit tag.
7523 You can also invoke it with ``--browse`` command line argument.
7525 from copy import deepcopy
7526 from os.path import exists as path_exists
7529 class TW(urwid.TreeWidget):
7530 def __init__(self, state, *args, **kwargs):
7532 self.scrolled = {"info": False, "hexdump": False}
7533 super(TW, self).__init__(*args, **kwargs)
7536 pp = self.get_node().get_value()
7537 constructed = len(pp) > 1
7538 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7540 def _state_update(self):
7541 pp, _ = self._get_pp()
7542 self.state["decode_path"].set_text(
7543 ":".join(str(p) for p in pp.decode_path)
7545 lines = deepcopy(self.state["hexed"])
7547 def attr_set(i, attr):
7548 line = lines[i // 16]
7549 idx = 1 + (i - 16 * (i // 16))
7550 line[idx] = (attr, line[idx])
7552 if pp.expl_offset is not None:
7553 for i in six_xrange(
7555 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7557 attr_set(i, "select-expl")
7558 for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7559 attr_set(i, "select-value")
7560 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7561 self.state["hexdump"].set_focus(pp.offset // 16)
7562 self.state["hexdump"].set_focus_valign("middle")
7563 self.state["hexdump_bar"].set_completion(
7564 (100 * pp.offset // 16) //
7565 len(self.state["hexdump"]._body.positions())
7569 [("header", "Name: "), pp.obj_name],
7570 [("header", "Type: "), pp.asn1_type_name],
7571 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7572 [("header", "[TLV]len: "), "%d/%d/%d" % (
7573 pp.tlen, pp.llen, pp.vlen,
7575 [("header", "TLVlen: "), "%d" % sum((
7576 pp.tlen, pp.llen, pp.vlen,
7578 [("header", "Slice: "), "[%d:%d]" % (
7579 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7583 lines.append([("warning", "LENINDEF")])
7585 lines.append([("warning", "BER encoded")])
7587 lines.append([("warning", "BERed")])
7588 if pp.expl is not None:
7589 lines.append([("header", "EXPLICIT")])
7590 klass, _, num = pp.expl
7591 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7592 if pp.expl_offset is not None:
7593 lines.append([" Offset: %d" % pp.expl_offset])
7594 lines.append([" [TLV]len: %d/%d/%d" % (
7595 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7597 lines.append([" TLVlen: %d" % sum((
7598 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7600 lines.append([" Slice: [%d:%d]" % (
7602 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7604 if pp.impl is not None:
7605 klass, _, num = pp.impl
7607 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7610 lines.append(["OPTIONAL"])
7612 lines.append(["DEFAULT"])
7613 if len(pp.decode_path) > 0:
7614 ent = pp.decode_path[-1]
7615 if isinstance(ent, DecodePathDefBy):
7617 value = str(ent.defined_by)
7618 oid_name = find_oid_name(
7619 ent.defined_by.asn1_type_name, oid_maps, value,
7621 lines.append([("header", "DEFINED BY: "), "%s" % (
7622 value if oid_name is None
7623 else "%s (%s)" % (oid_name, value)
7626 if pp.value is not None:
7627 lines.append([("header", "Value: "), pp.value])
7629 len(oid_maps) > 0 and
7630 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7632 for oid_map in oid_maps:
7633 oid_name = oid_map.get(pp.value)
7634 if oid_name is not None:
7635 lines.append([("header", "Human: "), oid_name])
7637 if pp.asn1_type_name == Integer.asn1_type_name:
7639 ("header", "Decimal: "), "%d" % int(pp.obj),
7642 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7644 if pp.blob.__class__ == binary_type:
7645 blob = hexenc(pp.blob).upper()
7646 for i in six_xrange(0, len(blob), 32):
7647 lines.append([colonize_hex(blob[i:i + 32])])
7648 elif pp.blob.__class__ == tuple:
7649 lines.append([", ".join(pp.blob)])
7650 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7651 self.state["info_bar"].set_completion(0)
7653 def selectable(self):
7654 if self.state["widget_current"] != self:
7655 self.state["widget_current"] = self
7656 self.scrolled["info"] = False
7657 self.scrolled["hexdump"] = False
7658 self._state_update()
7659 return super(TW, self).selectable()
7661 def get_display_text(self):
7662 pp, constructed = self._get_pp()
7663 style = "constructed" if constructed else ""
7664 if len(pp.decode_path) == 0:
7665 return (style, pp.obj_name)
7666 if pp.asn1_type_name == "EOC":
7667 return ("eoc", "EOC")
7668 ent = pp.decode_path[-1]
7669 if isinstance(ent, DecodePathDefBy):
7670 value = str(ent.defined_by)
7671 oid_name = find_oid_name(
7672 ent.defined_by.asn1_type_name, oid_maps, value,
7674 return ("defby", "DEFBY:" + (
7675 value if oid_name is None else oid_name
7679 def _scroll(self, what, step):
7680 self.state[what]._invalidate()
7681 pos = self.state[what].focus_position
7682 if not self.scrolled[what]:
7683 self.scrolled[what] = True
7685 pos = max(0, pos + step)
7686 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7687 self.state[what].set_focus(pos)
7688 self.state[what].set_focus_valign("top")
7689 self.state[what + "_bar"].set_completion(
7690 (100 * pos) // len(self.state[what]._body.positions())
7693 def keypress(self, size, key):
7695 raise urwid.ExitMainLoop()
7698 self.expanded = not self.expanded
7699 self.update_expanded_icon()
7702 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7703 if key in hexdump_steps:
7704 self._scroll("hexdump", hexdump_steps[key])
7707 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7708 if key in info_steps:
7709 self._scroll("info", info_steps[key])
7712 if key in ("d", "D"):
7713 pp, _ = self._get_pp()
7714 dp = ":".join(str(p) for p in pp.decode_path)
7715 dp = dp.replace(" ", "_")
7718 if key == "d" or pp.expl_offset is None:
7719 data = self.state["raw"][pp.offset:(
7720 pp.offset + pp.tlen + pp.llen + pp.vlen
7723 data = self.state["raw"][pp.expl_offset:(
7724 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7728 def duplicate_path(dp, ctr):
7731 return "%s.%d" % (dp, ctr)
7734 if not path_exists(duplicate_path(dp, ctr)):
7737 dp = duplicate_path(dp, ctr)
7738 with open(dp, "wb") as fd:
7740 self.state["decode_path"].set_text(
7741 ("warning", "Saved to: " + dp)
7744 return super(TW, self).keypress(size, key)
7746 class PN(urwid.ParentNode):
7747 def __init__(self, state, value, *args, **kwargs):
7749 if not hasattr(value, "_fields"):
7751 super(PN, self).__init__(value, *args, **kwargs)
7753 def load_widget(self):
7754 return TW(self.state, self)
7756 def load_child_keys(self):
7757 value = self.get_value()
7758 if hasattr(value, "_fields"):
7760 return range(len(value[1:]))
7762 def load_child_node(self, key):
7765 self.get_value()[key + 1],
7768 depth=self.get_depth() + 1,
7771 class LabeledPG(urwid.ProgressBar):
7772 def __init__(self, label, *args, **kwargs):
7774 super(LabeledPG, self).__init__(*args, **kwargs)
7777 return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7779 WinHexdump = urwid.ListBox([urwid.Text("")])
7780 WinInfo = urwid.ListBox([urwid.Text("")])
7781 WinDecodePath = urwid.Text("", "center")
7782 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7783 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7784 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7787 "hexed": list(hexdump(raw)),
7788 "widget_current": None,
7790 "info_bar": WinInfoBar,
7791 "hexdump": WinHexdump,
7792 "hexdump_bar": WinHexdumpBar,
7793 "decode_path": WinDecodePath,
7797 help_text = " ".join((
7799 "space:(un)collapse",
7800 "(pg)up/down/home/end:nav",
7801 "jkJK:hexdump hlHL:info",
7807 ("weight", 1, WinTree),
7808 ("weight", 2, urwid.Pile([
7809 urwid.LineBox(WinInfo),
7810 urwid.LineBox(WinHexdump),
7813 header=urwid.Columns([
7814 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7815 ("weight", 1, WinInfoBar),
7816 ("weight", 1, WinHexdumpBar),
7818 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7821 ("header", "bold", ""),
7822 ("constructed", "bold", ""),
7823 ("help", "light magenta", ""),
7824 ("warning", "light red", ""),
7825 ("defby", "light red", ""),
7826 ("eoc", "dark red", ""),
7827 ("select-value", "light green", ""),
7828 ("select-expl", "light red", ""),
7829 ("pg-normal", "", "light blue"),
7830 ("pg-complete", "black", "yellow"),
7835 def main(): # pragma: no cover
7837 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7838 parser.add_argument(
7842 help="Skip that number of bytes from the beginning",
7844 parser.add_argument(
7846 help="Python paths to dictionary with OIDs, comma separated",
7848 parser.add_argument(
7850 help="Python path to schema definition to use",
7852 parser.add_argument(
7853 "--defines-by-path",
7854 help="Python path to decoder's defines_by_path",
7856 parser.add_argument(
7858 action="store_true",
7859 help="Disallow BER encoding",
7861 parser.add_argument(
7862 "--print-decode-path",
7863 action="store_true",
7864 help="Print decode paths",
7866 parser.add_argument(
7867 "--decode-path-only",
7868 help="Print only specified decode path",
7870 parser.add_argument(
7872 action="store_true",
7873 help="Allow explicit tag out-of-bound",
7875 parser.add_argument(
7877 action="store_true",
7878 help="Turn on event generation mode",
7880 parser.add_argument(
7882 action="store_true",
7883 help="Start ASN.1 browser",
7885 parser.add_argument(
7887 type=argparse.FileType("rb"),
7888 help="Path to BER/CER/DER file you want to decode",
7890 args = parser.parse_args()
7892 args.RAWFile.seek(args.skip)
7893 raw = memoryview(args.RAWFile.read())
7894 args.RAWFile.close()
7896 raw = file_mmaped(args.RAWFile)[args.skip:]
7898 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7899 if args.oids else ()
7901 from functools import partial
7903 schema = obj_by_path(args.schema)
7904 pprinter = partial(pprint, big_blobs=True)
7906 schema, pprinter = generic_decoder()
7908 "bered": not args.nobered,
7909 "allow_expl_oob": args.allow_expl_oob,
7911 if args.defines_by_path is not None:
7912 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7914 obj, _ = schema().decode(raw, ctx=ctx)
7915 browse(raw, obj, oid_maps)
7916 from sys import exit as sys_exit
7918 from os import environ
7922 with_colours=environ.get("NO_COLOR") is None,
7923 with_decode_path=args.print_decode_path,
7925 () if args.decode_path_only is None else
7926 tuple(args.decode_path_only.split(":"))
7930 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7931 print(pprinter(obj, decode_path=decode_path))
7933 obj, tail = schema().decode(raw, ctx=ctx)
7934 print(pprinter(obj))
7936 print("\nTrailing data: %s" % hexenc(tail))
7939 if __name__ == "__main__":
7940 from pyderasn import *