3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicit one) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. In fact, exactly that function is
251 used for all objects' ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows whether the object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 The former is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meets --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificates' serial numbers from it, once you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime``, ``revocationDate`` CHOICE holding it, and ``RevokedCertificate``
618 itself as one of the entities in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except the minimum needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates")
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 with the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj, tail = Something().decode(raw)
693 mmap-ed files in Python2.7 do not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms do not deal well with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 a valid one -- you can only be sure that it has valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it into 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of those
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
908 .. autofunction:: pyderasn.browse
912 .. autoclass:: pyderasn.Obj
920 .. autoclass:: pyderasn.Boolean
925 .. autoclass:: pyderasn.Integer
926 :members: __init__, named, tohex
930 .. autoclass:: pyderasn.BitString
931 :members: __init__, bit_len, named
935 .. autoclass:: pyderasn.OctetString
940 .. autoclass:: pyderasn.Null
945 .. autoclass:: pyderasn.ObjectIdentifier
950 .. autoclass:: pyderasn.Enumerated
954 .. autoclass:: pyderasn.CommonString
958 .. autoclass:: pyderasn.NumericString
962 .. autoclass:: pyderasn.PrintableString
963 :members: __init__, allow_asterisk, allow_ampersand
967 .. autoclass:: pyderasn.IA5String
971 .. autoclass:: pyderasn.VisibleString
975 .. autoclass:: pyderasn.UTCTime
976 :members: __init__, todatetime
980 .. autoclass:: pyderasn.GeneralizedTime
981 :members: __init__, todatetime
988 .. autoclass:: pyderasn.Choice
989 :members: __init__, choice, value
993 .. autoclass:: PrimitiveTypes
997 .. autoclass:: pyderasn.Any
1005 .. autoclass:: pyderasn.Sequence
1010 .. autoclass:: pyderasn.Set
1015 .. autoclass:: pyderasn.SequenceOf
1020 .. autoclass:: pyderasn.SetOf
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1058 You can decode DER/BER files using the command line interface::
1060 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1062 If there is no schema for your file, then you can try parsing it without one,
1063 but of course IMPLICIT tags will often make that impossible. Still, the result is
1064 good enough for the certificate above::
1066 $ python -m pyderasn path/to/file
1067 0 [1,3,1604] . >: SEQUENCE OF
1068 4 [1,3,1453] . . >: SEQUENCE OF
1069 8 [0,0, 5] . . . . >: [0] ANY
1070 . . . . . A0:03:02:01:02
1071 13 [1,1, 3] . . . . >: INTEGER 61595
1072 18 [1,1, 13] . . . . >: SEQUENCE OF
1073 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074 31 [1,1, 0] . . . . . . >: NULL
1075 33 [1,3, 274] . . . . >: SEQUENCE OF
1076 37 [1,1, 11] . . . . . . >: SET OF
1077 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1078 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1081 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1082 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1084 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086 . . . . . . . . . 61:2E:63:6F:6D:2F
1087 1461 [1,1, 13] . . >: SEQUENCE OF
1088 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089 1474 [1,1, 0] . . . . >: NULL
1090 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1091 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1098 If you have dictionaries with ObjectIdentifiers, like the example one
1099 from ``tests/test_crts.py``::
1102 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1105 "2.5.4.10": "id-at-organizationName",
1106 "2.5.4.11": "id-at-organizationalUnitName",
1109 then you can pass it to the pretty printer to see human-readable OIDs::
1111 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1113 37 [1,1, 11] . . . . . . >: SET OF
1114 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1115 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1117 50 [1,1, 18] . . . . . . >: SET OF
1118 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1119 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121 70 [1,1, 18] . . . . . . >: SET OF
1122 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1123 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1130 Each decoded element has a so-called decode path: the sequence of structure
1131 names it passes through during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with the
1133 ``--print-decode-path`` option::
1135 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136 0 [1,3,1604] Certificate SEQUENCE []
1137 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1144 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150 . . . . . . . 13:02:45:53
1151 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1154 Now you can print only the specified tree, for example signature algorithm::
1156 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1158 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1199 from termcolor import colored
1200 except ImportError: # pragma: no cover
1201 def colored(what, *args, **kwargs):
1252 "TagClassApplication",
1255 "TagClassUniversal",
1256 "TagFormConstructed",
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1274 TagClassContext: "",
1275 TagClassApplication: "APPLICATION ",
1276 TagClassPrivate: "PRIVATE ",
1277 TagClassUniversal: "UNIV ",
1281 LENINDEF = b"\x80" # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
def file_mmaped(fd):
    """Wrap the whole file into a read-only, mmap-backed memoryview

    :param fd: file object (only its ``fileno()`` is used here)
    :returns: memoryview over read-only mmap-ing of the whole file
    """
    # Zero length asks mmap to cover the file entirely
    mapping = mmap(fd.fileno(), 0, prot=PROT_READ)
    return memoryview(mapping)
1300 if not set(value) <= DECIMALS:
1301 raise ValueError("non-pure integer")
def fractions2float(fractions_raw):
    """Turn the digits of a fractional part into a float

    :param str fractions_raw: digits following the decimal sign; they
                              are checked with ``pureint`` beforehand
    :returns: float built from ``"0."`` followed by those digits
    """
    # Called only for its validation effect, result is not needed
    pureint(fractions_raw)
    literal = "".join(("0.", fractions_raw))
    return float(literal)
def get_def_by_path(defines_by_path, sub_decode_path):
    """Get define by decode path

    :param defines_by_path: iterable of ``(path, define)`` pairs. The
                            builtin ``any`` placed inside a path acts
                            as a wildcard for that position
    :param sub_decode_path: decode path to look for
    :returns: define of the first pair whose path has the same length
              and matches element-wise, None if nothing suits
    """
    wanted_len = len(sub_decode_path)
    for path, define in defines_by_path:
        if len(path) != wanted_len:
            continue
        if all(
                (expected is any) or not (expected != got)
                for expected, got in zip(path, sub_decode_path)
        ):
            return define
    return None
1323 ########################################################################
1325 ########################################################################
1327 class ASN1Error(ValueError):
1331 class DecodeError(ASN1Error):
1332 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1334 :param str msg: reason of decode failing
1335 :param klass: optional exact DecodeError inherited class (like
1336 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337 :py:exc:`InvalidLength`)
1338 :param decode_path: tuple of strings. It contains human
1339 readable names of the fields through which
1340 decoding process has passed
1341 :param int offset: binary offset where failure happened
1343 super(DecodeError, self).__init__()
1346 self.decode_path = decode_path
1347 self.offset = offset
1352 "" if self.klass is None else self.klass.__name__,
1354 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355 if len(self.decode_path) > 0 else ""
1357 ("(at %d)" % self.offset) if self.offset > 0 else "",
1363 return "%s(%s)" % (self.__class__.__name__, self)
1366 class NotEnoughData(DecodeError):
1370 class ExceedingData(ASN1Error):
1371 def __init__(self, nbytes):
1372 super(ExceedingData, self).__init__()
1373 self.nbytes = nbytes
1376 return "%d trailing bytes" % self.nbytes
1379 return "%s(%s)" % (self.__class__.__name__, self)
1382 class LenIndefForm(DecodeError):
1386 class TagMismatch(DecodeError):
1390 class InvalidLength(DecodeError):
1394 class InvalidOID(DecodeError):
1398 class ObjUnknown(ASN1Error):
1399 def __init__(self, name):
1400 super(ObjUnknown, self).__init__()
1404 return "object is unknown: %s" % self.name
1407 return "%s(%s)" % (self.__class__.__name__, self)
1410 class ObjNotReady(ASN1Error):
1411 def __init__(self, name):
1412 super(ObjNotReady, self).__init__()
1416 return "object is not ready: %s" % self.name
1419 return "%s(%s)" % (self.__class__.__name__, self)
1422 class InvalidValueType(ASN1Error):
1423 def __init__(self, expected_types):
1424 super(InvalidValueType, self).__init__()
1425 self.expected_types = expected_types
1428 return "invalid value type, expected: %s" % ", ".join(
1429 [repr(t) for t in self.expected_types]
1433 return "%s(%s)" % (self.__class__.__name__, self)
1436 class BoundsError(ASN1Error):
1437 def __init__(self, bound_min, value, bound_max):
1438 super(BoundsError, self).__init__()
1439 self.bound_min = bound_min
1441 self.bound_max = bound_max
1444 return "unsatisfied bounds: %s <= %s <= %s" % (
1451 return "%s(%s)" % (self.__class__.__name__, self)
1454 ########################################################################
1456 ########################################################################
_hexdecoder = getdecoder("hex")
_hexencoder = getencoder("hex")


def hexdec(data):
    """Hexadecimal string to binary data convert

    :param data: hexadecimal representation to be decoded
    :returns: decoded binary data
    """
    # Codec functions return (output, consumed length) pair
    return _hexdecoder(data)[0]


def hexenc(data):
    """Binary data to hexadecimal string convert

    :param data: binary data to be encoded
    :returns: hexadecimal representation as a text string
    """
    # Codec gives bytes, so convert them to text for convenience
    return _hexencoder(data)[0].decode("ascii")
def int_bytes_len(num, byte_len=8):
    """Count the ``byte_len``-bit groups needed to hold ``num``

    :param int num: integer to be measured
    :param int byte_len: number of bits carried by a single group
    :returns: quantity of groups; zero still occupies one group
    """
    bits = num.bit_length()
    if bits == 0:
        return 1
    groups = ceil(float(bits) / byte_len)
    return int(groups)
1480 def zero_ended_encode(num):
1481 octets = bytearray(int_bytes_len(num, 7))
1483 octets[i] = num & 0x7F
1487 octets[i] = 0x80 | (num & 0x7F)
1490 return bytes(octets)
1493 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1494 """Encode tag to binary form
1496 :param int num: tag's number
1497 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1498 :py:data:`pyderasn.TagClassContext`,
1499 :py:data:`pyderasn.TagClassApplication`,
1500 :py:data:`pyderasn.TagClassPrivate`)
1501 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1502 :py:data:`pyderasn.TagFormConstructed`)
1506 return int2byte(klass | form | num)
1507 # [XX|X|11111][1.......][1.......] ... [0.......]
1508 return int2byte(klass | form | 31) + zero_ended_encode(num)
1511 def tag_decode(tag):
1512 """Decode tag from binary form
1516 No validation is performed, assuming that it has already passed.
1518 It returns tuple with three integers, as
1519 :py:func:`pyderasn.tag_encode` accepts.
1521 first_octet = byte2int(tag)
1522 klass = first_octet & 0xC0
1523 form = first_octet & 0x20
1524 if first_octet & 0x1F < 0x1F:
1525 return (klass, form, first_octet & 0x1F)
1527 for octet in iterbytes(tag[1:]):
1530 return (klass, form, num)
1534 """Create CONTEXT PRIMITIVE tag
1536 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1540 """Create CONTEXT CONSTRUCTED tag
1542 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1545 def tag_strip(data):
1546 """Take off tag from the data
1548 :returns: (encoded tag, tag length, remaining data)
1551 raise NotEnoughData("no data at all")
1552 if byte2int(data) & 0x1F < 31:
1553 return data[:1], 1, data[1:]
1558 raise DecodeError("unfinished tag")
1559 if indexbytes(data, i) & 0x80 == 0:
1561 if i == 1 and indexbytes(data, 1) < 0x1F:
1562 raise DecodeError("unexpected long form")
1563 if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1564 raise DecodeError("leading zero byte in tag value")
1566 return data[:i], i, data[i:]
1572 octets = bytearray(int_bytes_len(l) + 1)
1573 octets[0] = 0x80 | (len(octets) - 1)
1574 for i in six_xrange(len(octets) - 1, 0, -1):
1575 octets[i] = l & 0xFF
1577 return bytes(octets)
1580 def len_decode(data):
1583 :returns: (decoded length, length's length, remaining data)
1584 :raises LenIndefForm: if indefinite form encoding is met
1587 raise NotEnoughData("no data at all")
1588 first_octet = byte2int(data)
1589 if first_octet & 0x80 == 0:
1590 return first_octet, 1, data[1:]
1591 octets_num = first_octet & 0x7F
1592 if octets_num + 1 > len(data):
1593 raise NotEnoughData("encoded length is longer than data")
1595 raise LenIndefForm()
1596 if byte2int(data[1:]) == 0:
1597 raise DecodeError("leading zeros")
1599 for v in iterbytes(data[1:1 + octets_num]):
1602 raise DecodeError("long form instead of short one")
1603 return l, 1 + octets_num, data[1 + octets_num:]
1606 LEN0 = len_encode(0)
1607 LEN1 = len_encode(1)
1608 LEN1K = len_encode(1000)
def len_size(l):
    """How many bytes length field will take

    :param int l: length value that is going to be encoded
    :returns: number of octets the encoded length occupies
    :raises OverflowError: if the value needs more than seven octets
    """
    if l < 128:
        # Fits in a single octet
        return 1
    # Otherwise one leading octet plus the octets of the value itself
    for octets in (1, 2, 3, 4, 5, 6, 7):
        if l < 1 << (8 * octets):
            return 1 + octets
    raise OverflowError("too big length")
def write_full(writer, data):
    """Fully write provided data

    :param writer: must comply with ``io.RawIOBase.write`` behaviour
    :param data: bytes-like object to be written
    :raises ValueError: if ``writer`` returns None

    BytesIO does not guarantee that the whole data will be written at
    once. This function writes everything provided, calling ``writer``
    again with the remainder until nothing is left.
    """
    # memoryview makes slicing of the remainder copy-free
    view = memoryview(data)
    total = len(view)
    done = 0
    while done != total:
        step = writer(view[done:])
        if step is None:
            raise ValueError("can not write to buf")
        done += step
1651 # If it is 64-bit system, then use compact 64-bit array of unsigned
1652 # longs. Use an ordinary list with universal integers otherwise, that
1654 if sys_maxsize > 2 ** 32:
1655 def state_2pass_new():
1658 def state_2pass_new():
1662 ########################################################################
1664 ########################################################################
1666 class AutoAddSlots(type):
1667 def __new__(cls, name, bases, _dict):
1668 _dict["__slots__"] = _dict.get("__slots__", ())
1669 return type.__new__(cls, name, bases, _dict)
1672 BasicState = namedtuple("BasicState", (
1685 ), **NAMEDTUPLE_KWARGS)
1688 @add_metaclass(AutoAddSlots)
1690 """Common ASN.1 object class
1692 All ASN.1 types are inherited from it. It has metaclass that
1693 automatically adds ``__slots__`` to all inherited classes.
1718 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1719 self._expl = getattr(self, "expl", None) if expl is None else expl
1720 if self.tag != self.tag_default and self._expl is not None:
1721 raise ValueError("implicit and explicit tags can not be set simultaneously")
1722 if self.tag is None:
1723 self._tag_order = None
1725 tag_class, _, tag_num = tag_decode(
1726 self.tag if self._expl is None else self._expl
1728 self._tag_order = (tag_class, tag_num)
1729 if default is not None:
1731 self.optional = optional
1732 self.offset, self.llen, self.vlen = _decoded
1734 self.expl_lenindef = False
1735 self.lenindef = False
1736 self.ber_encoded = False
1739 def ready(self): # pragma: no cover
1740 """Is object ready to be encoded?
1742 raise NotImplementedError()
1744 def _assert_ready(self):
1746 raise ObjNotReady(self.__class__.__name__)
1750 """Is either object or any elements inside is BER encoded?
1752 return self.expl_lenindef or self.lenindef or self.ber_encoded
1756 """Is object decoded?
1758 return (self.llen + self.vlen) > 0
1760 def __getstate__(self): # pragma: no cover
1761 """Used for making safe to be mutable pickleable copies
1763 raise NotImplementedError()
1765 def __setstate__(self, state):
1766 if state.version != __version__:
1767 raise ValueError("data is pickled by different PyDERASN version")
1768 self.tag = state.tag
1769 self._tag_order = state.tag_order
1770 self._expl = state.expl
1771 self.default = state.default
1772 self.optional = state.optional
1773 self.offset = state.offset
1774 self.llen = state.llen
1775 self.vlen = state.vlen
1776 self.expl_lenindef = state.expl_lenindef
1777 self.lenindef = state.lenindef
1778 self.ber_encoded = state.ber_encoded
1781 def tag_order(self):
1782 """Tag's (class, number) used for DER/CER sorting
1784 return self._tag_order
1787 def tag_order_cer(self):
1788 return self.tag_order
1792 """.. seealso:: :ref:`decoding`
1794 return len(self.tag)
1798 """.. seealso:: :ref:`decoding`
1800 return self.tlen + self.llen + self.vlen
1802 def __str__(self): # pragma: no cover
1803 return self.__bytes__() if PY2 else self.__unicode__()
1805 def __ne__(self, their):
1806 return not(self == their)
1808 def __gt__(self, their): # pragma: no cover
1809 return not(self < their)
1811 def __le__(self, their): # pragma: no cover
1812 return (self == their) or (self < their)
1814 def __ge__(self, their): # pragma: no cover
1815 return (self == their) or (self > their)
1817 def _encode(self): # pragma: no cover
1818 raise NotImplementedError()
1820 def _encode_cer(self, writer):
1821 write_full(writer, self._encode())
1823 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1824 yield NotImplemented
1826 def _encode1st(self, state):
1827 raise NotImplementedError()
1829 def _encode2nd(self, writer, state_iter):
1830 raise NotImplementedError()
1833 """DER encode the structure
1835 :returns: DER representation
1837 raw = self._encode()
1838 if self._expl is None:
1840 return b"".join((self._expl, len_encode(len(raw)), raw))
1842 def encode1st(self, state=None):
1843 """Do the 1st pass of 2-pass encoding
1845 :rtype: (int, array("L"))
1846 :returns: full length of encoded data and precalculated various
1850 state = state_2pass_new()
1851 if self._expl is None:
1852 return self._encode1st(state)
1854 idx = len(state) - 1
1855 vlen, _ = self._encode1st(state)
1857 fulllen = len(self._expl) + len_size(vlen) + vlen
1858 return fulllen, state
1860 def encode2nd(self, writer, state_iter):
1861 """Do the 2nd pass of 2-pass encoding
1863 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1864 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1866 if self._expl is None:
1867 self._encode2nd(writer, state_iter)
1869 write_full(writer, self._expl + len_encode(next(state_iter)))
1870 self._encode2nd(writer, state_iter)
1872 def encode_cer(self, writer):
1873 """CER encode the structure to specified writer
1875 :param writer: must comply with ``io.RawIOBase.write``
1876 behaviour. It takes slice to be written and
1877 returns number of bytes processed. If it returns
1878 None, then exception will be raised
1880 if self._expl is not None:
1881 write_full(writer, self._expl + LENINDEF)
1882 if getattr(self, "der_forced", False):
1883 write_full(writer, self._encode())
1885 self._encode_cer(writer)
1886 if self._expl is not None:
1887 write_full(writer, EOC)
1889 def hexencode(self):
1890 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1892 return hexenc(self.encode())
1902 _ctx_immutable=True,
1906 :param data: either binary or memoryview
1907 :param int offset: initial data's offset
1908 :param bool leavemm: do we need to leave memoryview of remaining
1909 data as is, or convert it to bytes otherwise
1910 :param decode_path: current decode path (tuples of strings,
1911 possibly with DecodePathDefBy) which will be
1912 the root for all underlying objects
1913 :param ctx: optional :ref:`context <ctx>` governing decoding process
1914 :param bool tag_only: decode only the tag, without length and
1915 contents (used only in Choice and Set
1916 structures, trying to determine if tag satisfies
1918 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1920 :returns: (Obj, remaining data)
1922 .. seealso:: :ref:`decoding`
1924 result = next(self.decode_evgen(
1936 _, obj, tail = result
1947 _ctx_immutable=True,
1950 """Decode with evgen mode on
1952 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1953 it returns the generator producing ``(decode_path, obj, tail)``
1955 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1959 elif _ctx_immutable:
1961 tlv = memoryview(data)
1964 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1967 if self._expl is None:
1968 for result in self._decode(
1971 decode_path=decode_path,
1974 evgen_mode=_evgen_mode,
1979 _decode_path, obj, tail = result
1980 if _decode_path is not decode_path:
1984 t, tlen, lv = tag_strip(tlv)
1985 except DecodeError as err:
1986 raise err.__class__(
1988 klass=self.__class__,
1989 decode_path=decode_path,
1994 klass=self.__class__,
1995 decode_path=decode_path,
1999 l, llen, v = len_decode(lv)
2000 except LenIndefForm as err:
2001 if not ctx.get("bered", False):
2002 raise err.__class__(
2004 klass=self.__class__,
2005 decode_path=decode_path,
2009 offset += tlen + llen
2010 for result in self._decode(
2013 decode_path=decode_path,
2016 evgen_mode=_evgen_mode,
2018 if tag_only: # pragma: no cover
2021 _decode_path, obj, tail = result
2022 if _decode_path is not decode_path:
2024 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2025 if eoc_expected.tobytes() != EOC:
2028 klass=self.__class__,
2029 decode_path=decode_path,
2033 obj.expl_lenindef = True
2034 except DecodeError as err:
2035 raise err.__class__(
2037 klass=self.__class__,
2038 decode_path=decode_path,
2043 raise NotEnoughData(
2044 "encoded length is longer than data",
2045 klass=self.__class__,
2046 decode_path=decode_path,
2049 for result in self._decode(
2051 offset=offset + tlen + llen,
2052 decode_path=decode_path,
2055 evgen_mode=_evgen_mode,
2057 if tag_only: # pragma: no cover
2060 _decode_path, obj, tail = result
2061 if _decode_path is not decode_path:
2063 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2065 "explicit tag out-of-bound, longer than data",
2066 klass=self.__class__,
2067 decode_path=decode_path,
2070 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2072 def decod(self, data, offset=0, decode_path=(), ctx=None):
2073 """Decode the data, check that tail is empty
2075 :raises ExceedingData: if tail is not empty
2077 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2078 (decode without tail) that also checks that there is no
2081 obj, tail = self.decode(
2084 decode_path=decode_path,
2089 raise ExceedingData(len(tail))
2092 def hexdecode(self, data, *args, **kwargs):
2093 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2095 return self.decode(hexdec(data), *args, **kwargs)
2097 def hexdecod(self, data, *args, **kwargs):
2098 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2100 return self.decod(hexdec(data), *args, **kwargs)
2104 """.. seealso:: :ref:`decoding`
2106 return self._expl is not None
2110 """.. seealso:: :ref:`decoding`
2115 def expl_tlen(self):
2116 """.. seealso:: :ref:`decoding`
2118 return len(self._expl)
2121 def expl_llen(self):
2122 """.. seealso:: :ref:`decoding`
2124 if self.expl_lenindef:
2126 return len(len_encode(self.tlvlen))
2129 def expl_offset(self):
2130 """.. seealso:: :ref:`decoding`
2132 return self.offset - self.expl_tlen - self.expl_llen
2135 def expl_vlen(self):
2136 """.. seealso:: :ref:`decoding`
2141 def expl_tlvlen(self):
2142 """.. seealso:: :ref:`decoding`
2144 return self.expl_tlen + self.expl_llen + self.expl_vlen
2147 def fulloffset(self):
2148 """.. seealso:: :ref:`decoding`
2150 return self.expl_offset if self.expled else self.offset
2154 """.. seealso:: :ref:`decoding`
2156 return self.expl_tlvlen if self.expled else self.tlvlen
2158 def pps_lenindef(self, decode_path):
2159 if self.lenindef and not (
2160 getattr(self, "defined", None) is not None and
2161 self.defined[1].lenindef
2164 asn1_type_name="EOC",
2166 decode_path=decode_path,
2168 self.offset + self.tlvlen -
2169 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2177 if self.expl_lenindef:
2179 asn1_type_name="EOC",
2180 obj_name="EXPLICIT",
2181 decode_path=decode_path,
2182 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
def encode_cer(obj):
    """Encode to CER in memory buffer

    :param obj: object providing ``encode_cer(writer)`` method
    :returns bytes: memory buffer contents
    """
    sink = BytesIO()
    obj.encode_cer(sink.write)
    return sink.getvalue()
def encode2pass(obj):
    """Encode (2-pass mode) to DER in memory buffer

    :param obj: object providing ``encode1st()`` and
                ``encode2nd(writer, state_iter)`` methods
    :returns bytes: memory buffer contents
    """
    sink = BytesIO()
    # 1st pass gives (full length, state); only the state is fed to the 2nd
    _fulllen, state = obj.encode1st()
    state_iter = iter(state)
    obj.encode2nd(sink.write, state_iter)
    return sink.getvalue()
class DecodePathDefBy(object):
    """DEFINED BY representation inside decode path

    Wraps the ``defined_by`` value. Instances compare equal when the
    wrapped values are equal; comparison with any other type is false.
    """
    __slots__ = ("defined_by",)

    def __init__(self, defined_by):
        self.defined_by = defined_by

    def __eq__(self, their):
        if isinstance(their, self.__class__):
            return self.defined_by == their.defined_by
        return False

    def __ne__(self, their):
        # Explicitly derived from __eq__
        equal = self == their
        return not equal

    def __str__(self):
        return "".join(("DEFINED BY ", str(self.defined_by)))

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "<%s: %s>" % (klass_name, self.defined_by)
2235 ########################################################################
2237 ########################################################################
2239 PP = namedtuple("PP", (
2262 ), **NAMEDTUPLE_KWARGS)
2267 asn1_type_name="unknown",
2284 expl_lenindef=False,
2315 def _colourize(what, colour, with_colours, attrs=("bold",)):
2316 return colored(what, colour, attrs=attrs) if with_colours else what
def colonize_hex(hexed):
    """Separate hexadecimal string with colons

    :param hexed: hexadecimal string
    :returns: the same characters, grouped by two and joined with
              ``:``. A trailing odd character forms its own group
    """
    pairs = [hexed[pos:pos + 2] for pos in range(0, len(hexed), 2)]
    return ":".join(pairs)
def find_oid_name(asn1_type_name, oid_maps, value):
    """Look up the human readable name of an OID

    :param asn1_type_name: ASN.1 type name of the value; lookup is
                           done only when it equals
                           ``ObjectIdentifier.asn1_type_name``
    :param oid_maps: sequence of dictionaries that are queried, in
                     order, with ``value`` as a key
    :param value: key to look for
    :returns: first non-None name found, None otherwise
    """
    if len(oid_maps) == 0:
        return None
    if not (asn1_type_name == ObjectIdentifier.asn1_type_name):
        return None
    for oid_map in oid_maps:
        found = oid_map.get(value)
        if found is not None:
            return found
    return None
2340 with_decode_path=False,
2341 decode_path_len_decrease=0,
2348 " " if pp.expl_offset is None else
2349 ("-%d" % (pp.offset - pp.expl_offset))
2351 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2353 col = _colourize(col, "red", with_colours, ())
2354 col += _colourize("B", "red", with_colours) if pp.bered else " "
2356 col = "[%d,%d,%4d]%s" % (
2357 pp.tlen, pp.llen, pp.vlen,
2358 LENINDEF_PP_CHAR if pp.lenindef else " "
2360 col = _colourize(col, "green", with_colours, ())
2362 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2363 if decode_path_len > 0:
2364 cols.append(" ." * decode_path_len)
2365 ent = pp.decode_path[-1]
2366 if isinstance(ent, DecodePathDefBy):
2367 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2368 value = str(ent.defined_by)
2369 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2370 if oid_name is None:
2371 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2373 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2375 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2376 if pp.expl is not None:
2377 klass, _, num = pp.expl
2378 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2379 cols.append(_colourize(col, "blue", with_colours))
2380 if pp.impl is not None:
2381 klass, _, num = pp.impl
2382 col = "[%s%d]" % (TagClassReprs[klass], num)
2383 cols.append(_colourize(col, "blue", with_colours))
2384 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2385 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2387 cols.append(_colourize("BER", "red", with_colours))
2388 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2389 if pp.value is not None:
2391 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2392 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2393 if oid_name is not None:
2394 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2395 if pp.asn1_type_name == Integer.asn1_type_name:
2396 cols.append(_colourize(
2397 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2400 if pp.blob.__class__ == binary_type:
2401 cols.append(hexenc(pp.blob))
2402 elif pp.blob.__class__ == tuple:
2403 cols.append(", ".join(pp.blob))
2405 cols.append(_colourize("OPTIONAL", "red", with_colours))
2407 cols.append(_colourize("DEFAULT", "red", with_colours))
2408 if with_decode_path:
2409 cols.append(_colourize(
2410 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2414 return " ".join(cols)
2417 def pp_console_blob(pp, decode_path_len_decrease=0):
2418 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2419 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2420 if decode_path_len > 0:
2421 cols.append(" ." * (decode_path_len + 1))
2422 if pp.blob.__class__ == binary_type:
2423 blob = hexenc(pp.blob).upper()
2424 for i in six_xrange(0, len(blob), 32):
2425 chunk = blob[i:i + 32]
2426 yield " ".join(cols + [colonize_hex(chunk)])
2427 elif pp.blob.__class__ == tuple:
2428 yield " ".join(cols + [", ".join(pp.blob)])
2436 with_decode_path=False,
2437 decode_path_only=(),
2440 """Pretty print object
2442 :param Obj obj: object you want to pretty print
2443 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2444 Its human readable form is printed when OID is met
2445 :param big_blobs: if large binary objects are met (like OctetString
2446 values), do we need to print them too, on separate
2448 :param with_colours: colourize output, if ``termcolor`` library
2450 :param with_decode_path: print decode path
2451 :param decode_path_only: print only that specified decode path
2453 def _pprint_pps(pps):
2455 if hasattr(pp, "_fields"):
2457 decode_path_only != () and
2459 str(p) for p in pp.decode_path[:len(decode_path_only)]
2460 ) != decode_path_only
2464 yield pp_console_row(
2469 with_colours=with_colours,
2470 with_decode_path=with_decode_path,
2471 decode_path_len_decrease=len(decode_path_only),
2473 for row in pp_console_blob(
2475 decode_path_len_decrease=len(decode_path_only),
2479 yield pp_console_row(
2484 with_colours=with_colours,
2485 with_decode_path=with_decode_path,
2486 decode_path_len_decrease=len(decode_path_only),
2489 for row in _pprint_pps(pp):
2491 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2494 ########################################################################
2495 # ASN.1 primitive types
2496 ########################################################################
2498 BooleanState = namedtuple(
2500 BasicState._fields + ("value",),
2506 """``BOOLEAN`` boolean type
2508 >>> b = Boolean(True)
2510 >>> b == Boolean(True)
2516 tag_default = tag_encode(1)
2517 asn1_type_name = "BOOLEAN"
2529 :param value: set the value. Either boolean type, or
2530 :py:class:`pyderasn.Boolean` object
2531 :param bytes impl: override default tag with ``IMPLICIT`` one
2532 :param bytes expl: override default tag with ``EXPLICIT`` one
2533 :param default: set default value. Type same as in ``value``
2534 :param bool optional: is object ``OPTIONAL`` in sequence
2536 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2537 self._value = None if value is None else self._value_sanitize(value)
2538 if default is not None:
2539 default = self._value_sanitize(default)
2540 self.default = self.__class__(
2546 self._value = default
2548 def _value_sanitize(self, value):
2549 if value.__class__ == bool:
2551 if issubclass(value.__class__, Boolean):
2553 raise InvalidValueType((self.__class__, bool))
2557 return self._value is not None
2559 def __getstate__(self):
2560 return BooleanState(
2576 def __setstate__(self, state):
2577 super(Boolean, self).__setstate__(state)
2578 self._value = state.value
2580 def __nonzero__(self):
2581 self._assert_ready()
2585 self._assert_ready()
2588 def __eq__(self, their):
2589 if their.__class__ == bool:
2590 return self._value == their
2591 if not issubclass(their.__class__, Boolean):
2594 self._value == their._value and
2595 self.tag == their.tag and
2596 self._expl == their._expl
2607 return self.__class__(
2609 impl=self.tag if impl is None else impl,
2610 expl=self._expl if expl is None else expl,
2611 default=self.default if default is None else default,
2612 optional=self.optional if optional is None else optional,
2616 self._assert_ready()
2617 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2619 def _encode1st(self, state):
2620 return len(self.tag) + 2, state
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: write the whole encoding to the writer

    ``state_iter`` is not used here.
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
2626 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2628 t, _, lv = tag_strip(tlv)
2629 except DecodeError as err:
2630 raise err.__class__(
2632 klass=self.__class__,
2633 decode_path=decode_path,
2638 klass=self.__class__,
2639 decode_path=decode_path,
2646 l, _, v = len_decode(lv)
2647 except DecodeError as err:
2648 raise err.__class__(
2650 klass=self.__class__,
2651 decode_path=decode_path,
2655 raise InvalidLength(
2656 "Boolean's length must be equal to 1",
2657 klass=self.__class__,
2658 decode_path=decode_path,
2662 raise NotEnoughData(
2663 "encoded length is longer than data",
2664 klass=self.__class__,
2665 decode_path=decode_path,
2668 first_octet = byte2int(v)
2670 if first_octet == 0:
2672 elif first_octet == 0xFF:
2674 elif ctx.get("bered", False):
2679 "unacceptable Boolean value",
2680 klass=self.__class__,
2681 decode_path=decode_path,
2684 obj = self.__class__(
2688 default=self.default,
2689 optional=self.optional,
2690 _decoded=(offset, 1, 1),
2692 obj.ber_encoded = ber_encoded
2693 yield decode_path, obj, v[1:]
2696 return pp_console_row(next(self.pps()))
2698 def pps(self, decode_path=()):
2701 asn1_type_name=self.asn1_type_name,
2702 obj_name=self.__class__.__name__,
2703 decode_path=decode_path,
2704 value=str(self._value) if self.ready else None,
2705 optional=self.optional,
2706 default=self == self.default,
2707 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2708 expl=None if self._expl is None else tag_decode(self._expl),
2713 expl_offset=self.expl_offset if self.expled else None,
2714 expl_tlen=self.expl_tlen if self.expled else None,
2715 expl_llen=self.expl_llen if self.expled else None,
2716 expl_vlen=self.expl_vlen if self.expled else None,
2717 expl_lenindef=self.expl_lenindef,
2718 ber_encoded=self.ber_encoded,
2721 for pp in self.pps_lenindef(decode_path):
2725 IntegerState = namedtuple(
2727 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2733 """``INTEGER`` integer type
2735 >>> b = Integer(-123)
2737 >>> b == Integer(-123)
2742 >>> Integer(2, bounds=(1, 3))
2744 >>> Integer(5, bounds=(1, 3))
2745 Traceback (most recent call last):
2746 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2750 class Version(Integer):
2757 >>> v = Version("v1")
2764 {'v3': 2, 'v1': 0, 'v2': 1}
2766 __slots__ = ("specs", "_bound_min", "_bound_max")
2767 tag_default = tag_encode(2)
2768 asn1_type_name = "INTEGER"
2782 :param value: set the value. Either integer type, named value
2783 (if ``schema`` is specified in the class), or
2784 :py:class:`pyderasn.Integer` object
2785 :param bounds: set ``(MIN, MAX)`` value constraint.
2786 (-inf, +inf) by default
2787 :param bytes impl: override default tag with ``IMPLICIT`` one
2788 :param bytes expl: override default tag with ``EXPLICIT`` one
2789 :param default: set default value. Type same as in ``value``
2790 :param bool optional: is object ``OPTIONAL`` in sequence
2792 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2794 specs = getattr(self, "schema", {}) if _specs is None else _specs
2795 self.specs = specs if specs.__class__ == dict else dict(specs)
2796 self._bound_min, self._bound_max = getattr(
2799 (float("-inf"), float("+inf")),
2800 ) if bounds is None else bounds
2801 if value is not None:
2802 self._value = self._value_sanitize(value)
2803 if default is not None:
2804 default = self._value_sanitize(default)
2805 self.default = self.__class__(
2811 if self._value is None:
2812 self._value = default
def _value_sanitize(self, value):
    """Convert the given value to a plain integer and check the bounds

    :param value: either integer type, named value (a key of
                  ``self.specs``), or :py:class:`pyderasn.Integer`
                  object
    :returns: plain integer value
    :raises ObjUnknown: named value is not present in ``self.specs``
    :raises InvalidValueType: value is of any other type
    :raises BoundsError: value does not satisfy the bounds
    """
    if isinstance(value, integer_types):
        pass
    elif issubclass(value.__class__, Integer):
        value = value._value
    elif value.__class__ == str:
        name = value
        value = self.specs.get(name)
        if value is None:
            # Report the requested name: "value" is already None here
            raise ObjUnknown("integer value: %s" % name)
    else:
        raise InvalidValueType((self.__class__, int, str))
    if not self._bound_min <= value <= self._bound_max:
        raise BoundsError(self._bound_min, value, self._bound_max)
    return value
2831 return self._value is not None
2833 def __getstate__(self):
2834 return IntegerState(
def __setstate__(self, state):
    """Restore the object from the given ``IntegerState``

    Common fields are restored by the parent class, Integer's own
    ones (specs, value and both bounds) are restored here.
    """
    super(Integer, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
2861 self._assert_ready()
2862 return int(self._value)
2865 """Hexadecimal representation
2867 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2869 hex_repr = hex(int(self))[2:].upper()
2870 if len(hex_repr) % 2 != 0:
2871 hex_repr = "0" + hex_repr
2875 self._assert_ready()
2876 return hash(b"".join((
2878 bytes(self._expl or b""),
2879 str(self._value).encode("ascii"),
def __eq__(self, their):
    """Compare with either a plain integer or another Integer

    Plain integer is compared only with the held value. Integer
    objects are equal if value, tag and explicit tag all match.
    Any other type is never equal.
    """
    if isinstance(their, integer_types):
        return self._value == their
    if issubclass(their.__class__, Integer):
        return (
            (self._value, self.tag, self._expl) ==
            (their._value, their.tag, their._expl)
        )
    return False
2893 def __lt__(self, their):
2894 return self._value < their._value
2898 """Return named representation (if exists) of the value
2900 for name, value in iteritems(self.specs):
2901 if value == self._value:
2914 return self.__class__(
2917 (self._bound_min, self._bound_max)
2918 if bounds is None else bounds
2920 impl=self.tag if impl is None else impl,
2921 expl=self._expl if expl is None else expl,
2922 default=self.default if default is None else default,
2923 optional=self.optional if optional is None else optional,
2927 def _encode_payload(self):
2928 self._assert_ready()
2932 octets = bytearray([0])
2936 octets = bytearray()
2938 octets.append((value & 0xFF) ^ 0xFF)
2940 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2943 octets = bytearray()
2945 octets.append(value & 0xFF)
2947 if octets[-1] & 0x80 > 0:
2950 octets = bytes(octets)
2952 bytes_len = ceil(value.bit_length() / 8) or 1
2955 octets = value.to_bytes(
2960 except OverflowError:
2967 octets = self._encode_payload()
2968 return b"".join((self.tag, len_encode(len(octets)), octets))
def _encode1st(self, state):
    """First encoding pass: report the full encoded length

    :returns: ``(length, state)``, state is passed through untouched
    """
    payload_len = len(self._encode_payload())
    tlv_len = len(self.tag) + len_size(payload_len) + payload_len
    return tlv_len, state
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: write the whole encoding to the writer

    ``state_iter`` is not used here.
    """
    encoded = self._encode()
    write_full(writer, encoded)
2977 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2979 t, _, lv = tag_strip(tlv)
2980 except DecodeError as err:
2981 raise err.__class__(
2983 klass=self.__class__,
2984 decode_path=decode_path,
2989 klass=self.__class__,
2990 decode_path=decode_path,
2997 l, llen, v = len_decode(lv)
2998 except DecodeError as err:
2999 raise err.__class__(
3001 klass=self.__class__,
3002 decode_path=decode_path,
3006 raise NotEnoughData(
3007 "encoded length is longer than data",
3008 klass=self.__class__,
3009 decode_path=decode_path,
3013 raise NotEnoughData(
3015 klass=self.__class__,
3016 decode_path=decode_path,
3019 v, tail = v[:l], v[l:]
3020 first_octet = byte2int(v)
3022 second_octet = byte2int(v[1:])
3024 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3025 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3028 "non normalized integer",
3029 klass=self.__class__,
3030 decode_path=decode_path,
3035 if first_octet & 0x80 > 0:
3036 octets = bytearray()
3037 for octet in bytearray(v):
3038 octets.append(octet ^ 0xFF)
3039 for octet in octets:
3040 value = (value << 8) | octet
3044 for octet in bytearray(v):
3045 value = (value << 8) | octet
3047 value = int.from_bytes(v, byteorder="big", signed=True)
3049 obj = self.__class__(
3051 bounds=(self._bound_min, self._bound_max),
3054 default=self.default,
3055 optional=self.optional,
3057 _decoded=(offset, llen, l),
3059 except BoundsError as err:
3062 klass=self.__class__,
3063 decode_path=decode_path,
3066 yield decode_path, obj, tail
3069 return pp_console_row(next(self.pps()))
3071 def pps(self, decode_path=()):
3074 asn1_type_name=self.asn1_type_name,
3075 obj_name=self.__class__.__name__,
3076 decode_path=decode_path,
3077 value=(self.named or str(self._value)) if self.ready else None,
3078 optional=self.optional,
3079 default=self == self.default,
3080 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3081 expl=None if self._expl is None else tag_decode(self._expl),
3086 expl_offset=self.expl_offset if self.expled else None,
3087 expl_tlen=self.expl_tlen if self.expled else None,
3088 expl_llen=self.expl_llen if self.expled else None,
3089 expl_vlen=self.expl_vlen if self.expled else None,
3090 expl_lenindef=self.expl_lenindef,
3093 for pp in self.pps_lenindef(decode_path):
3097 BitStringState = namedtuple(
3099 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3104 class BitString(Obj):
3105 """``BIT STRING`` bit string type
3107 >>> BitString(b"hello world")
3108 BIT STRING 88 bits 68656c6c6f20776f726c64
3111 >>> b == b"hello world"
3116 >>> BitString("'0A3B5F291CD'H")
3117 BIT STRING 44 bits 0a3b5f291cd0
3118 >>> b = BitString("'010110000000'B")
3119 BIT STRING 12 bits 5800
3122 >>> b[0], b[1], b[2], b[3]
3123 (False, True, False, True)
3127 [False, True, False, True, True, False, False, False, False, False, False, False]
3131 class KeyUsage(BitString):
3133 ("digitalSignature", 0),
3134 ("nonRepudiation", 1),
3135 ("keyEncipherment", 2),
3138 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3139 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3141 ['nonRepudiation', 'keyEncipherment']
3143 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3147 Pay attention that BIT STRING can be encoded both in primitive
3148 and constructed forms. Decoder always checks constructed form tag
3149 additionally to specified primitive one. If BER decoding is
3150 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3151 of DER restrictions.
3153 __slots__ = ("tag_constructed", "specs", "defined")
3154 tag_default = tag_encode(3)
3155 asn1_type_name = "BIT STRING"
3168 :param value: set the value. Either binary type, tuple of named
3169 values (if ``schema`` is specified in the class),
3170 string in ``'XXX...'B`` form, or
3171 :py:class:`pyderasn.BitString` object
3172 :param bytes impl: override default tag with ``IMPLICIT`` one
3173 :param bytes expl: override default tag with ``EXPLICIT`` one
3174 :param default: set default value. Type same as in ``value``
3175 :param bool optional: is object ``OPTIONAL`` in sequence
3177 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3178 specs = getattr(self, "schema", {}) if _specs is None else _specs
3179 self.specs = specs if specs.__class__ == dict else dict(specs)
3180 self._value = None if value is None else self._value_sanitize(value)
3181 if default is not None:
3182 default = self._value_sanitize(default)
3183 self.default = self.__class__(
3189 self._value = default
3191 tag_klass, _, tag_num = tag_decode(self.tag)
3192 self.tag_constructed = tag_encode(
3194 form=TagFormConstructed,
def _bits2octets(self, bits):
    """Pack the string of ``"0"``/``"1"`` characters into octets

    If the object has named bits (non-empty ``specs``), then trailing
    zero bits are stripped first. The last octet is padded with zero
    bits up to the octet boundary.

    :param str bits: string consisting of "0" and "1" characters
    :returns: ``(bit_len, octets)``, where ``bit_len`` is the number
              of bits before padding
    """
    if self.specs:
        bits = bits.rstrip("0")
    bit_len = len(bits)
    padded = bits + "0" * (-bit_len % 8)
    octets = bytearray(
        int(padded[pos:pos + 8], 2)
        for pos in six_xrange(0, len(padded), 8)
    )
    return bit_len, bytes(octets)
3208 def _value_sanitize(self, value):
3209 if isinstance(value, (string_types, binary_type)):
3211 isinstance(value, string_types) and
3212 value.startswith("'")
3214 if value.endswith("'B"):
3216 if not frozenset(value) <= SET01:
3217 raise ValueError("B's coding contains unacceptable chars")
3218 return self._bits2octets(value)
3219 if value.endswith("'H"):
3223 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3225 if value.__class__ == binary_type:
3226 return (len(value) * 8, value)
3227 raise InvalidValueType((self.__class__, string_types, binary_type))
3228 if value.__class__ == tuple:
3231 isinstance(value[0], integer_types) and
3232 value[1].__class__ == binary_type
3237 bit = self.specs.get(name)
3239 raise ObjUnknown("BitString value: %s" % name)
3242 return self._bits2octets("")
3243 bits = frozenset(bits)
3244 return self._bits2octets("".join(
3245 ("1" if bit in bits else "0")
3246 for bit in six_xrange(max(bits) + 1)
3248 if issubclass(value.__class__, BitString):
3250 raise InvalidValueType((self.__class__, binary_type, string_types))
3254 return self._value is not None
3256 def __getstate__(self):
3257 return BitStringState(
3272 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from the given ``BitStringState``

    Common fields are restored by the parent class, BitString's own
    ones (specs, value, constructed tag and defined) are restored here.
    """
    super(BitString, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
    self.tag_constructed, self.defined = state.tag_constructed, state.defined
3284 self._assert_ready()
3285 for i in six_xrange(self._value[0]):
3290 """Returns number of bits in the string
3292 self._assert_ready()
3293 return self._value[0]
3295 def __bytes__(self):
3296 self._assert_ready()
3297 return self._value[1]
3299 def __eq__(self, their):
3300 if their.__class__ == bytes:
3301 return self._value[1] == their
3302 if not issubclass(their.__class__, BitString):
3305 self._value == their._value and
3306 self.tag == their.tag and
3307 self._expl == their._expl
3312 """Named representation (if exists) of the bits
3314 :returns: [str(name), ...]
3316 return [name for name, bit in iteritems(self.specs) if self[bit]]
3326 return self.__class__(
3328 impl=self.tag if impl is None else impl,
3329 expl=self._expl if expl is None else expl,
3330 default=self.default if default is None else default,
3331 optional=self.optional if optional is None else optional,
3335 def __getitem__(self, key):
3336 if key.__class__ == int:
3337 bit_len, octets = self._value
3341 byte2int(memoryview(octets)[key // 8:]) >>
3344 if isinstance(key, string_types):
3345 value = self.specs.get(key)
3347 raise ObjUnknown("BitString value: %s" % key)
3349 raise InvalidValueType((int, str))
3352 self._assert_ready()
3353 bit_len, octets = self._value
3356 len_encode(len(octets) + 1),
3357 int2byte((8 - bit_len % 8) % 8),
def _encode1st(self, state):
    """First encoding pass: report the full encoded length

    :returns: ``(length, state)``, state is passed through untouched
    """
    self._assert_ready()
    # One more octet precedes the data: the pad bits counter
    payload_len = len(self._value[1]) + 1
    tlv_len = len(self.tag) + len_size(payload_len) + payload_len
    return tlv_len, state
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: write header, then octets to the writer

    Header consists of the tag, the length and the pad bits counter.
    Octets are written separately. ``state_iter`` is not used here.
    """
    bit_len, octets = self._value
    header = (
        self.tag,
        len_encode(len(octets) + 1),
        int2byte(-bit_len % 8),
    )
    write_full(writer, b"".join(header))
    write_full(writer, octets)
3376 def _encode_cer(self, writer):
3377 bit_len, octets = self._value
3378 if len(octets) + 1 <= 1000:
3379 write_full(writer, self._encode())
3381 write_full(writer, self.tag_constructed)
3382 write_full(writer, LENINDEF)
3383 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3384 write_full(writer, b"".join((
3385 BitString.tag_default,
3388 octets[offset:offset + 999],
3390 tail = octets[offset + 999:]
3392 tail = int2byte((8 - bit_len % 8) % 8) + tail
3393 write_full(writer, b"".join((
3394 BitString.tag_default,
3395 len_encode(len(tail)),
3398 write_full(writer, EOC)
3400 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3402 t, tlen, lv = tag_strip(tlv)
3403 except DecodeError as err:
3404 raise err.__class__(
3406 klass=self.__class__,
3407 decode_path=decode_path,
3411 if tag_only: # pragma: no cover
3415 l, llen, v = len_decode(lv)
3416 except DecodeError as err:
3417 raise err.__class__(
3419 klass=self.__class__,
3420 decode_path=decode_path,
3424 raise NotEnoughData(
3425 "encoded length is longer than data",
3426 klass=self.__class__,
3427 decode_path=decode_path,
3431 raise NotEnoughData(
3433 klass=self.__class__,
3434 decode_path=decode_path,
3437 pad_size = byte2int(v)
3438 if l == 1 and pad_size != 0:
3440 "invalid empty value",
3441 klass=self.__class__,
3442 decode_path=decode_path,
3448 klass=self.__class__,
3449 decode_path=decode_path,
3452 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3455 klass=self.__class__,
3456 decode_path=decode_path,
3459 v, tail = v[:l], v[l:]
3460 bit_len = (len(v) - 1) * 8 - pad_size
3461 obj = self.__class__(
3462 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3465 default=self.default,
3466 optional=self.optional,
3468 _decoded=(offset, llen, l),
3471 obj._value = (bit_len, None)
3472 yield decode_path, obj, tail
3474 if t != self.tag_constructed:
3476 klass=self.__class__,
3477 decode_path=decode_path,
3480 if not ctx.get("bered", False):
3482 "unallowed BER constructed encoding",
3483 klass=self.__class__,
3484 decode_path=decode_path,
3487 if tag_only: # pragma: no cover
3492 l, llen, v = len_decode(lv)
3493 except LenIndefForm:
3494 llen, l, v = 1, 0, lv[1:]
3496 except DecodeError as err:
3497 raise err.__class__(
3499 klass=self.__class__,
3500 decode_path=decode_path,
3504 raise NotEnoughData(
3505 "encoded length is longer than data",
3506 klass=self.__class__,
3507 decode_path=decode_path,
3510 if not lenindef and l == 0:
3511 raise NotEnoughData(
3513 klass=self.__class__,
3514 decode_path=decode_path,
3518 sub_offset = offset + tlen + llen
3522 if v[:EOC_LEN].tobytes() == EOC:
3529 "chunk out of bounds",
3530 klass=self.__class__,
3531 decode_path=decode_path + (str(len(chunks) - 1),),
3532 offset=chunks[-1].offset,
3534 sub_decode_path = decode_path + (str(len(chunks)),)
3537 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3540 decode_path=sub_decode_path,
3543 _ctx_immutable=False,
3545 yield _decode_path, chunk, v_tail
3547 _, chunk, v_tail = next(BitString().decode_evgen(
3550 decode_path=sub_decode_path,
3553 _ctx_immutable=False,
3558 "expected BitString encoded chunk",
3559 klass=self.__class__,
3560 decode_path=sub_decode_path,
3563 chunks.append(chunk)
3564 sub_offset += chunk.tlvlen
3565 vlen += chunk.tlvlen
3567 if len(chunks) == 0:
3570 klass=self.__class__,
3571 decode_path=decode_path,
3576 for chunk_i, chunk in enumerate(chunks[:-1]):
3577 if chunk.bit_len % 8 != 0:
3579 "BitString chunk is not multiple of 8 bits",
3580 klass=self.__class__,
3581 decode_path=decode_path + (str(chunk_i),),
3582 offset=chunk.offset,
3585 values.append(bytes(chunk))
3586 bit_len += chunk.bit_len
3587 chunk_last = chunks[-1]
3589 values.append(bytes(chunk_last))
3590 bit_len += chunk_last.bit_len
3591 obj = self.__class__(
3592 value=None if evgen_mode else (bit_len, b"".join(values)),
3595 default=self.default,
3596 optional=self.optional,
3598 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3601 obj._value = (bit_len, None)
3602 obj.lenindef = lenindef
3603 obj.ber_encoded = True
3604 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3607 return pp_console_row(next(self.pps()))
3609 def pps(self, decode_path=()):
3613 bit_len, blob = self._value
3614 value = "%d bits" % bit_len
3615 if len(self.specs) > 0 and blob is not None:
3616 blob = tuple(self.named)
3619 asn1_type_name=self.asn1_type_name,
3620 obj_name=self.__class__.__name__,
3621 decode_path=decode_path,
3624 optional=self.optional,
3625 default=self == self.default,
3626 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3627 expl=None if self._expl is None else tag_decode(self._expl),
3632 expl_offset=self.expl_offset if self.expled else None,
3633 expl_tlen=self.expl_tlen if self.expled else None,
3634 expl_llen=self.expl_llen if self.expled else None,
3635 expl_vlen=self.expl_vlen if self.expled else None,
3636 expl_lenindef=self.expl_lenindef,
3637 lenindef=self.lenindef,
3638 ber_encoded=self.ber_encoded,
3641 defined_by, defined = self.defined or (None, None)
3642 if defined_by is not None:
3644 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3646 for pp in self.pps_lenindef(decode_path):
3650 OctetStringState = namedtuple(
3652 BasicState._fields + (
3663 class OctetString(Obj):
3664 """``OCTET STRING`` binary string type
3666 >>> s = OctetString(b"hello world")
3667 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3668 >>> s == OctetString(b"hello world")
3673 >>> OctetString(b"hello", bounds=(4, 4))
3674 Traceback (most recent call last):
3675 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3676 >>> OctetString(b"hell", bounds=(4, 4))
3677 OCTET STRING 4 bytes 68656c6c
3679 Memoryviews can be used as values. If memoryview is made on
3680 mmap-ed file, then it does not take storage inside OctetString
3681 itself. In CER encoding mode it will be streamed to the specified
3682 writer, copying 1 KB chunks.
3684 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3685 tag_default = tag_encode(4)
3686 asn1_type_name = "OCTET STRING"
3687 evgen_mode_skip_value = True
3701 :param value: set the value. Either binary type, or
3702 :py:class:`pyderasn.OctetString` object
3703 :param bounds: set ``(MIN, MAX)`` value size constraint.
3704 (-inf, +inf) by default
3705 :param bytes impl: override default tag with ``IMPLICIT`` one
3706 :param bytes expl: override default tag with ``EXPLICIT`` one
3707 :param default: set default value. Type same as in ``value``
3708 :param bool optional: is object ``OPTIONAL`` in sequence
3710 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3712 self._bound_min, self._bound_max = getattr(
3716 ) if bounds is None else bounds
3717 if value is not None:
3718 self._value = self._value_sanitize(value)
3719 if default is not None:
3720 default = self._value_sanitize(default)
3721 self.default = self.__class__(
3726 if self._value is None:
3727 self._value = default
3729 tag_klass, _, tag_num = tag_decode(self.tag)
3730 self.tag_constructed = tag_encode(
3732 form=TagFormConstructed,
3736 def _value_sanitize(self, value):
3737 if value.__class__ == binary_type or value.__class__ == memoryview:
3739 elif issubclass(value.__class__, OctetString):
3740 value = value._value
3742 raise InvalidValueType((self.__class__, bytes, memoryview))
3743 if not self._bound_min <= len(value) <= self._bound_max:
3744 raise BoundsError(self._bound_min, len(value), self._bound_max)
3749 return self._value is not None
3751 def __getstate__(self):
3752 return OctetStringState(
3768 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from the given ``OctetStringState``

    Common fields are restored by the parent class, OctetString's own
    ones (value, both bounds, constructed tag and defined) are
    restored here.
    """
    super(OctetString, self).__setstate__(state)
    self._value = state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
    self.tag_constructed, self.defined = state.tag_constructed, state.defined
3780 def __bytes__(self):
3781 self._assert_ready()
3782 return bytes(self._value)
def __eq__(self, their):
    """Compare with either binary type or another OctetString

    Binary type is compared only with the held value. OctetString
    objects are equal if value, tag and explicit tag all match.
    Any other type is never equal.
    """
    their_klass = their.__class__
    if their_klass == binary_type:
        return self._value == their
    if issubclass(their_klass, OctetString):
        same_value = self._value == their._value
        same_tags = self.tag == their.tag and self._expl == their._expl
        return same_value and same_tags
    return False
3795 def __lt__(self, their):
3796 return self._value < their._value
3807 return self.__class__(
3810 (self._bound_min, self._bound_max)
3811 if bounds is None else bounds
3813 impl=self.tag if impl is None else impl,
3814 expl=self._expl if expl is None else expl,
3815 default=self.default if default is None else default,
3816 optional=self.optional if optional is None else optional,
3820 self._assert_ready()
3823 len_encode(len(self._value)),
def _encode1st(self, state):
    """First encoding pass: report the full encoded length

    :returns: ``(length, state)``, state is passed through untouched
    """
    self._assert_ready()
    payload_len = len(self._value)
    tlv_len = len(self.tag) + len_size(payload_len) + payload_len
    return tlv_len, state
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: write header, then the value to the writer

    The value is written separately from the tag and length header.
    ``state_iter`` is not used here.
    """
    payload = self._value
    header = self.tag + len_encode(len(payload))
    write_full(writer, header)
    write_full(writer, payload)
3837 def _encode_cer(self, writer):
3838 octets = self._value
3839 if len(octets) <= 1000:
3840 write_full(writer, self._encode())
3842 write_full(writer, self.tag_constructed)
3843 write_full(writer, LENINDEF)
3844 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3845 write_full(writer, b"".join((
3846 OctetString.tag_default,
3848 octets[offset:offset + 1000],
3850 tail = octets[offset + 1000:]
3852 write_full(writer, b"".join((
3853 OctetString.tag_default,
3854 len_encode(len(tail)),
3857 write_full(writer, EOC)
3859 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3861 t, tlen, lv = tag_strip(tlv)
3862 except DecodeError as err:
3863 raise err.__class__(
3865 klass=self.__class__,
3866 decode_path=decode_path,
3874 l, llen, v = len_decode(lv)
3875 except DecodeError as err:
3876 raise err.__class__(
3878 klass=self.__class__,
3879 decode_path=decode_path,
3883 raise NotEnoughData(
3884 "encoded length is longer than data",
3885 klass=self.__class__,
3886 decode_path=decode_path,
3889 v, tail = v[:l], v[l:]
3890 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3892 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3893 klass=self.__class__,
3894 decode_path=decode_path,
3898 obj = self.__class__(
3900 None if (evgen_mode and self.evgen_mode_skip_value)
3903 bounds=(self._bound_min, self._bound_max),
3906 default=self.default,
3907 optional=self.optional,
3908 _decoded=(offset, llen, l),
3911 except DecodeError as err:
3914 klass=self.__class__,
3915 decode_path=decode_path,
3918 except BoundsError as err:
3921 klass=self.__class__,
3922 decode_path=decode_path,
3925 yield decode_path, obj, tail
3927 if t != self.tag_constructed:
3929 klass=self.__class__,
3930 decode_path=decode_path,
3933 if not ctx.get("bered", False):
3935 "unallowed BER constructed encoding",
3936 klass=self.__class__,
3937 decode_path=decode_path,
3945 l, llen, v = len_decode(lv)
3946 except LenIndefForm:
3947 llen, l, v = 1, 0, lv[1:]
3949 except DecodeError as err:
3950 raise err.__class__(
3952 klass=self.__class__,
3953 decode_path=decode_path,
3957 raise NotEnoughData(
3958 "encoded length is longer than data",
3959 klass=self.__class__,
3960 decode_path=decode_path,
3965 sub_offset = offset + tlen + llen
3970 if v[:EOC_LEN].tobytes() == EOC:
3977 "chunk out of bounds",
3978 klass=self.__class__,
3979 decode_path=decode_path + (str(len(chunks) - 1),),
3980 offset=chunks[-1].offset,
3984 sub_decode_path = decode_path + (str(chunks_count),)
3985 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3988 decode_path=sub_decode_path,
3991 _ctx_immutable=False,
3993 yield _decode_path, chunk, v_tail
3994 if not chunk.ber_encoded:
3995 payload_len += chunk.vlen
3998 sub_decode_path = decode_path + (str(len(chunks)),)
3999 _, chunk, v_tail = next(OctetString().decode_evgen(
4002 decode_path=sub_decode_path,
4005 _ctx_immutable=False,
4008 chunks.append(chunk)
4011 "expected OctetString encoded chunk",
4012 klass=self.__class__,
4013 decode_path=sub_decode_path,
4016 sub_offset += chunk.tlvlen
4017 vlen += chunk.tlvlen
4019 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4021 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4022 klass=self.__class__,
4023 decode_path=decode_path,
4027 obj = self.__class__(
4029 None if evgen_mode else
4030 b"".join(bytes(chunk) for chunk in chunks)
4032 bounds=(self._bound_min, self._bound_max),
4035 default=self.default,
4036 optional=self.optional,
4037 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4040 except DecodeError as err:
4043 klass=self.__class__,
4044 decode_path=decode_path,
4047 except BoundsError as err:
4050 klass=self.__class__,
4051 decode_path=decode_path,
4054 obj.lenindef = lenindef
4055 obj.ber_encoded = True
4056 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4059 return pp_console_row(next(self.pps()))
4061 def pps(self, decode_path=()):
4064 asn1_type_name=self.asn1_type_name,
4065 obj_name=self.__class__.__name__,
4066 decode_path=decode_path,
4067 value=("%d bytes" % len(self._value)) if self.ready else None,
4068 blob=self._value if self.ready else None,
4069 optional=self.optional,
4070 default=self == self.default,
4071 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4072 expl=None if self._expl is None else tag_decode(self._expl),
4077 expl_offset=self.expl_offset if self.expled else None,
4078 expl_tlen=self.expl_tlen if self.expled else None,
4079 expl_llen=self.expl_llen if self.expled else None,
4080 expl_vlen=self.expl_vlen if self.expled else None,
4081 expl_lenindef=self.expl_lenindef,
4082 lenindef=self.lenindef,
4083 ber_encoded=self.ber_encoded,
4086 defined_by, defined = self.defined or (None, None)
4087 if defined_by is not None:
4089 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4091 for pp in self.pps_lenindef(decode_path):
def agg_octet_string(evgens, decode_path, raw, writer):
    """Aggregate constructed string (OctetString and its derivatives)

    Events lying outside of ``decode_path`` are skipped. Value of
    every non BER-encoded object under that path is taken from ``raw``
    and written out. Processing stops after the event whose decode
    path is exactly ``decode_path``.

    :param evgens: iterator of generated events
    :param decode_path: points to the string we want to decode
    :param raw: sliceable (memoryview, bytearray, etc) with
                the data evgens are generated on
    :param writer: where string is going to be saved. Must comply
                   with ``io.RawIOBase.write`` behaviour

    .. seealso:: :ref:`agg_octet_string`
    """
    prefix_len = len(decode_path)
    for path, chunk, _ in evgens:
        if path[:prefix_len] == decode_path:
            if not chunk.ber_encoded:
                start = chunk.offset + chunk.tlen + chunk.llen
                stop = start + chunk.vlen
                if chunk.expl_lenindef:
                    stop -= EOC_LEN
                write_full(writer, raw[start:stop])
            if len(path) == prefix_len:
                break
4122 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4126 """``NULL`` null object
4134 tag_default = tag_encode(5)
4135 asn1_type_name = "NULL"
4139 value=None, # unused, but Sequence passes it
4146 :param bytes impl: override default tag with ``IMPLICIT`` one
4147 :param bytes expl: override default tag with ``EXPLICIT`` one
4148 :param bool optional: is object ``OPTIONAL`` in sequence
4150 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4157 def __getstate__(self):
def __eq__(self, their):
    """Compare with another Null

    Null objects are equal if both tag and explicit tag match.
    Any other type is never equal.
    """
    if issubclass(their.__class__, Null):
        return (self.tag, self._expl) == (their.tag, their._expl)
    return False
4188 return self.__class__(
4189 impl=self.tag if impl is None else impl,
4190 expl=self._expl if expl is None else expl,
4191 optional=self.optional if optional is None else optional,
4195 return self.tag + LEN0
4197 def _encode1st(self, state):
4198 return len(self.tag) + 1, state
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: write the tag and zero length to the writer

    ``state_iter`` is not used here.
    """
    encoded = self.tag + LEN0
    write_full(writer, encoded)
4203 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4205 t, _, lv = tag_strip(tlv)
4206 except DecodeError as err:
4207 raise err.__class__(
4209 klass=self.__class__,
4210 decode_path=decode_path,
4215 klass=self.__class__,
4216 decode_path=decode_path,
4219 if tag_only: # pragma: no cover
4223 l, _, v = len_decode(lv)
4224 except DecodeError as err:
4225 raise err.__class__(
4227 klass=self.__class__,
4228 decode_path=decode_path,
4232 raise InvalidLength(
4233 "Null must have zero length",
4234 klass=self.__class__,
4235 decode_path=decode_path,
4238 obj = self.__class__(
4241 optional=self.optional,
4242 _decoded=(offset, 1, 0),
4244 yield decode_path, obj, v
4247 return pp_console_row(next(self.pps()))
4249 def pps(self, decode_path=()):
4252 asn1_type_name=self.asn1_type_name,
4253 obj_name=self.__class__.__name__,
4254 decode_path=decode_path,
4255 optional=self.optional,
4256 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4257 expl=None if self._expl is None else tag_decode(self._expl),
4262 expl_offset=self.expl_offset if self.expled else None,
4263 expl_tlen=self.expl_tlen if self.expled else None,
4264 expl_llen=self.expl_llen if self.expled else None,
4265 expl_vlen=self.expl_vlen if self.expled else None,
4266 expl_lenindef=self.expl_lenindef,
4269 for pp in self.pps_lenindef(decode_path):
4273 ObjectIdentifierState = namedtuple(
4274 "ObjectIdentifierState",
4275 BasicState._fields + ("value", "defines"),
4280 class ObjectIdentifier(Obj):
4281 """``OBJECT IDENTIFIER`` OID type
4283 >>> oid = ObjectIdentifier((1, 2, 3))
4284 OBJECT IDENTIFIER 1.2.3
4285 >>> oid == ObjectIdentifier("1.2.3")
4291 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4292 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4294 >>> str(ObjectIdentifier((3, 1)))
4295 Traceback (most recent call last):
4296 pyderasn.InvalidOID: unacceptable first arc value
4298 __slots__ = ("defines",)
4299 tag_default = tag_encode(6)
4300 asn1_type_name = "OBJECT IDENTIFIER"
4313 :param value: set the value. Either tuples of integers,
4314 string of "."-concatenated integers, or
4315 :py:class:`pyderasn.ObjectIdentifier` object
4316 :param defines: sequence of tuples. Each tuple has two elements.
4317 First one is a decode path, relative to the current
4318 one, aiming to the field defined by that OID.
4319 Read about relative path in
4320 :py:func:`pyderasn.abs_decode_path`. Second
4321 tuple element is ``{OID: pyderasn.Obj()}``
4322 dictionary, mapping between current OID value
4323 and structure applied to defined field.
4325 .. seealso:: :ref:`definedby`
4327 :param bytes impl: override default tag with ``IMPLICIT`` one
4328 :param bytes expl: override default tag with ``EXPLICIT`` one
4329 :param default: set default value. Type same as in ``value``
4330 :param bool optional: is object ``OPTIONAL`` in sequence
4332 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4334 if value is not None:
4335 self._value = self._value_sanitize(value)
4336 if default is not None:
4337 default = self._value_sanitize(default)
4338 self.default = self.__class__(
4343 if self._value is None:
4344 self._value = default
4345 self.defines = defines
4347 def __add__(self, their):
4348 if their.__class__ == tuple:
4349 return self.__class__(self._value + array("L", their))
4350 if isinstance(their, self.__class__):
4351 return self.__class__(self._value + their._value)
4352 raise InvalidValueType((self.__class__, tuple))
4354 def _value_sanitize(self, value):
4355 if issubclass(value.__class__, ObjectIdentifier):
4357 if isinstance(value, string_types):
4359 value = array("L", (pureint(arc) for arc in value.split(".")))
4361 raise InvalidOID("unacceptable arcs values")
4362 if value.__class__ == tuple:
4364 value = array("L", value)
4365 except OverflowError as err:
4366 raise InvalidOID(repr(err))
4367 if value.__class__ is array:
4369 raise InvalidOID("less than 2 arcs")
4370 first_arc = value[0]
4371 if first_arc in (0, 1):
4372 if not (0 <= value[1] <= 39):
4373 raise InvalidOID("second arc is too wide")
4374 elif first_arc == 2:
4377 raise InvalidOID("unacceptable first arc value")
4378 if not all(arc >= 0 for arc in value):
4379 raise InvalidOID("negative arc value")
4381 raise InvalidValueType((self.__class__, str, tuple))
4385 return self._value is not None
4387 def __getstate__(self):
4388 return ObjectIdentifierState(
# Restore the object from a state tuple: the parent class handles the
# common fields, then the two fields that ObjectIdentifierState adds on
# top of BasicState ("value" and "defines") are restored here.
4405 def __setstate__(self, state):
4406 super(ObjectIdentifier, self).__setstate__(state)
4407 self._value = state.value
4408 self.defines = state.defines
4411 self._assert_ready()
4412 return iter(self._value)
4415 return ".".join(str(arc) for arc in self._value or ())
4418 self._assert_ready()
4419 return hash(b"".join((
4421 bytes(self._expl or b""),
4422 str(self._value).encode("ascii"),
4425 def __eq__(self, their):
4426 if their.__class__ == tuple:
4427 return self._value == array("L", their)
4428 if not issubclass(their.__class__, ObjectIdentifier):
4431 self.tag == their.tag and
4432 self._expl == their._expl and
4433 self._value == their._value
4436 def __lt__(self, their):
4437 return self._value < their._value
4448 return self.__class__(
4450 defines=self.defines if defines is None else defines,
4451 impl=self.tag if impl is None else impl,
4452 expl=self._expl if expl is None else expl,
4453 default=self.default if default is None else default,
4454 optional=self.optional if optional is None else optional,
4457 def _encode_octets(self):
4458 self._assert_ready()
4460 first_value = value[1]
4461 first_arc = value[0]
4464 elif first_arc == 1:
4466 elif first_arc == 2:
4468 else: # pragma: no cover
4469 raise RuntimeError("invalid arc is stored")
4470 octets = [zero_ended_encode(first_value)]
4471 for arc in value[2:]:
4472 octets.append(zero_ended_encode(arc))
4473 return b"".join(octets)
4476 v = self._encode_octets()
4477 return b"".join((self.tag, len_encode(len(v)), v))
def _encode1st(self, state):
    """Return the full encoded size together with the untouched state

    The size is the tag length, plus the size of the length field
    (``len_size``), plus the number of value octets.

    :param state: encoder state, handed back as is
    :returns: ``(encoded length, state)`` tuple
    """
    value_len = len(self._encode_octets())
    total_len = len(self.tag) + len_size(value_len) + value_len
    return total_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``_encode()`` to the writer

    :param writer: destination handed to ``write_full``
    :param state_iter: not used by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
4486 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4488 t, _, lv = tag_strip(tlv)
4489 except DecodeError as err:
4490 raise err.__class__(
4492 klass=self.__class__,
4493 decode_path=decode_path,
4498 klass=self.__class__,
4499 decode_path=decode_path,
4502 if tag_only: # pragma: no cover
4506 l, llen, v = len_decode(lv)
4507 except DecodeError as err:
4508 raise err.__class__(
4510 klass=self.__class__,
4511 decode_path=decode_path,
4515 raise NotEnoughData(
4516 "encoded length is longer than data",
4517 klass=self.__class__,
4518 decode_path=decode_path,
4522 raise NotEnoughData(
4524 klass=self.__class__,
4525 decode_path=decode_path,
4528 v, tail = v[:l], v[l:]
4535 octet = indexbytes(v, i)
4536 if i == 0 and octet == 0x80:
4537 if ctx.get("bered", False):
4541 "non normalized arc encoding",
4542 klass=self.__class__,
4543 decode_path=decode_path,
4546 arc = (arc << 7) | (octet & 0x7F)
4547 if octet & 0x80 == 0:
4550 except OverflowError:
4552 "too huge value for local unsigned long",
4553 klass=self.__class__,
4554 decode_path=decode_path,
4563 klass=self.__class__,
4564 decode_path=decode_path,
4568 second_arc = arcs[0]
4569 if 0 <= second_arc <= 39:
4571 elif 40 <= second_arc <= 79:
4577 obj = self.__class__(
4578 value=array("L", (first_arc, second_arc)) + arcs[1:],
4581 default=self.default,
4582 optional=self.optional,
4583 _decoded=(offset, llen, l),
4586 obj.ber_encoded = True
4587 yield decode_path, obj, tail
4590 return pp_console_row(next(self.pps()))
4592 def pps(self, decode_path=()):
4595 asn1_type_name=self.asn1_type_name,
4596 obj_name=self.__class__.__name__,
4597 decode_path=decode_path,
4598 value=str(self) if self.ready else None,
4599 optional=self.optional,
4600 default=self == self.default,
4601 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4602 expl=None if self._expl is None else tag_decode(self._expl),
4607 expl_offset=self.expl_offset if self.expled else None,
4608 expl_tlen=self.expl_tlen if self.expled else None,
4609 expl_llen=self.expl_llen if self.expled else None,
4610 expl_vlen=self.expl_vlen if self.expled else None,
4611 expl_lenindef=self.expl_lenindef,
4612 ber_encoded=self.ber_encoded,
4615 for pp in self.pps_lenindef(decode_path):
4619 class Enumerated(Integer):
4620 """``ENUMERATED`` integer type
4622 This type is identical to :py:class:`pyderasn.Integer`, but requires
4623 schema to be specified and does not accept values missing from it.
4626 tag_default = tag_encode(10)
4627 asn1_type_name = "ENUMERATED"
4638 bounds=None, # dummy argument, workability for Integer.decode
4640 super(Enumerated, self).__init__(
4641 value, bounds, impl, expl, default, optional, _specs, _decoded,
4643 if len(self.specs) == 0:
4644 raise ValueError("schema must be specified")
4646 def _value_sanitize(self, value):
4647 if isinstance(value, self.__class__):
4648 value = value._value
4649 elif isinstance(value, integer_types):
4650 for _value in itervalues(self.specs):
4655 "unknown integer value: %s" % value,
4656 klass=self.__class__,
4658 elif isinstance(value, string_types):
4659 value = self.specs.get(value)
4661 raise ObjUnknown("integer value: %s" % value)
4663 raise InvalidValueType((self.__class__, int, str))
4675 return self.__class__(
4677 impl=self.tag if impl is None else impl,
4678 expl=self._expl if expl is None else expl,
4679 default=self.default if default is None else default,
4680 optional=self.optional if optional is None else optional,
4685 def escape_control_unicode(c):
4686 if unicat(c)[0] == "C":
4687 c = repr(c).lstrip("u").strip("'")
4691 class CommonString(OctetString):
4692 """Common class for all strings
4694 Everything resembles :py:class:`pyderasn.OctetString`, except
4695 ability to deal with unicode text strings.
4697 >>> hexenc("привет мир".encode("utf-8"))
4698 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4699 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4701 >>> s = UTF8String("привет мир")
4702 UTF8String UTF8String привет мир
4704 'привет мир'
4705 >>> hexenc(bytes(s))
4706 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4708 >>> PrintableString("привет мир")
4709 Traceback (most recent call last):
4710 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4712 >>> BMPString("ада", bounds=(2, 2))
4713 Traceback (most recent call last):
4714 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4715 >>> s = BMPString("ад", bounds=(2, 2))
4718 >>> hexenc(bytes(s))
4725 - Text Encoding, validation
4726 * - :py:class:`pyderasn.UTF8String`
4728 * - :py:class:`pyderasn.NumericString`
4729 - proper alphabet validation
4730 * - :py:class:`pyderasn.PrintableString`
4731 - proper alphabet validation
4732 * - :py:class:`pyderasn.TeletexString`
4734 * - :py:class:`pyderasn.T61String`
4736 * - :py:class:`pyderasn.VideotexString`
4738 * - :py:class:`pyderasn.IA5String`
4739 - proper alphabet validation
4740 * - :py:class:`pyderasn.GraphicString`
4742 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4743 - proper alphabet validation
4744 * - :py:class:`pyderasn.GeneralString`
4746 * - :py:class:`pyderasn.UniversalString`
4748 * - :py:class:`pyderasn.BMPString`
4753 def _value_sanitize(self, value):
4755 value_decoded = None
4756 if isinstance(value, self.__class__):
4757 value_raw = value._value
4758 elif value.__class__ == text_type:
4759 value_decoded = value
4760 elif value.__class__ == binary_type:
4763 raise InvalidValueType((self.__class__, text_type, binary_type))
4766 value_decoded.encode(self.encoding)
4767 if value_raw is None else value_raw
4770 value_raw.decode(self.encoding)
4771 if value_decoded is None else value_decoded
4773 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4774 raise DecodeError(str(err))
4775 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4783 def __eq__(self, their):
4784 if their.__class__ == binary_type:
4785 return self._value == their
4786 if their.__class__ == text_type:
4787 return self._value == their.encode(self.encoding)
4788 if not isinstance(their, self.__class__):
4791 self._value == their._value and
4792 self.tag == their.tag and
4793 self._expl == their._expl
4796 def __unicode__(self):
4798 return self._value.decode(self.encoding)
4799 return text_type(self._value)
4802 return pp_console_row(next(self.pps(no_unicode=PY2)))
4804 def pps(self, decode_path=(), no_unicode=False):
4808 hexenc(bytes(self)) if no_unicode else
4809 "".join(escape_control_unicode(c) for c in self.__unicode__())
4813 asn1_type_name=self.asn1_type_name,
4814 obj_name=self.__class__.__name__,
4815 decode_path=decode_path,
4817 optional=self.optional,
4818 default=self == self.default,
4819 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4820 expl=None if self._expl is None else tag_decode(self._expl),
4825 expl_offset=self.expl_offset if self.expled else None,
4826 expl_tlen=self.expl_tlen if self.expled else None,
4827 expl_llen=self.expl_llen if self.expled else None,
4828 expl_vlen=self.expl_vlen if self.expled else None,
4829 expl_lenindef=self.expl_lenindef,
4830 ber_encoded=self.ber_encoded,
4833 for pp in self.pps_lenindef(decode_path):
4837 class UTF8String(CommonString):
4839 tag_default = tag_encode(12)
4841 asn1_type_name = "UTF8String"
4844 class AllowableCharsMixin(object):
4846 def allowable_chars(self):
4848 return self._allowable_chars
4849 return frozenset(six_unichr(c) for c in self._allowable_chars)
4851 def _value_sanitize(self, value):
4852 value = super(AllowableCharsMixin, self)._value_sanitize(value)
4853 if not frozenset(value) <= self._allowable_chars:
4854 raise DecodeError("non satisfying alphabet value")
4858 class NumericString(AllowableCharsMixin, CommonString):
4861 Its value is properly sanitized: only ASCII digits with spaces can
4864 >>> NumericString().allowable_chars
4865 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4868 tag_default = tag_encode(18)
4870 asn1_type_name = "NumericString"
4871 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4874 PrintableStringState = namedtuple(
4875 "PrintableStringState",
4876 OctetStringState._fields + ("allowable_chars",),
4881 class PrintableString(AllowableCharsMixin, CommonString):
4884 Its value is properly sanitized: see X.680 41.4 table 10.
4886 >>> PrintableString().allowable_chars
4887 frozenset([' ', "'", ..., 'z'])
4888 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4889 PrintableString PrintableString foo*bar
4890 >>> obj.allow_asterisk, obj.allow_ampersand
4894 tag_default = tag_encode(19)
4896 asn1_type_name = "PrintableString"
4897 _allowable_chars = frozenset(
4898 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4900 _asterisk = frozenset("*".encode("ascii"))
4901 _ampersand = frozenset("&".encode("ascii"))
4913 allow_asterisk=False,
4914 allow_ampersand=False,
4917 :param allow_asterisk: allow asterisk character
4918 :param allow_ampersand: allow ampersand character
4921 self._allowable_chars |= self._asterisk
4923 self._allowable_chars |= self._ampersand
4924 super(PrintableString, self).__init__(
4925 value, bounds, impl, expl, default, optional, _decoded, ctx,
4929 def allow_asterisk(self):
4930 """Is asterisk character allowed?
4932 return self._asterisk <= self._allowable_chars
4935 def allow_ampersand(self):
4936 """Is ampersand character allowed?
4938 return self._ampersand <= self._allowable_chars
4940 def __getstate__(self):
4941 return PrintableStringState(
4942 *super(PrintableString, self).__getstate__(),
4943 **{"allowable_chars": self._allowable_chars}
# Restore the object from a state tuple: the parent class handles the
# inherited fields, then the per-instance alphabet saved by __getstate__
# under "allowable_chars" is put back.
4946 def __setstate__(self, state):
4947 super(PrintableString, self).__setstate__(state)
4948 self._allowable_chars = state.allowable_chars
4959 return self.__class__(
4962 (self._bound_min, self._bound_max)
4963 if bounds is None else bounds
4965 impl=self.tag if impl is None else impl,
4966 expl=self._expl if expl is None else expl,
4967 default=self.default if default is None else default,
4968 optional=self.optional if optional is None else optional,
4969 allow_asterisk=self.allow_asterisk,
4970 allow_ampersand=self.allow_ampersand,
4974 class TeletexString(CommonString):
4976 tag_default = tag_encode(20)
4977 encoding = "iso-8859-1"
4978 asn1_type_name = "TeletexString"
4981 class T61String(TeletexString):
4983 asn1_type_name = "T61String"
4986 class VideotexString(CommonString):
4988 tag_default = tag_encode(21)
4989 encoding = "iso-8859-1"
4990 asn1_type_name = "VideotexString"
4993 class IA5String(AllowableCharsMixin, CommonString):
4996 Its value is properly sanitized: it is a mix of
4998 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4999 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5000 * DEL character (0x7F)
5002 It is just 7-bit ASCII.
5004 >>> IA5String().allowable_chars
5005 frozenset(["NUL", ... "DEL"])
5008 tag_default = tag_encode(22)
5010 asn1_type_name = "IA5"
5011 _allowable_chars = frozenset(b"".join(
5012 six_unichr(c).encode("ascii") for c in six_xrange(128)
# Precomputed sizes of the time string forms.
# Number of characters in the "YYMMDDHHMMSSZ" form (checked by UTCTime._strptime)
5016 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
# Result of len_encode() for that size, joined after the tag when encoding UTCTime
5017 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
# Size of the encoded length plus the value itself (used by UTCTime._encode1st)
5018 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
# Minimal size at which GeneralizedTime._strptime expects a fractions separator
5019 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
# Number of characters in the "YYYYMMDDHHMMSSZ" form (no fractions)
5020 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
# Result of len_encode() for that size, joined after the tag when encoding
5021 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5024 class VisibleString(AllowableCharsMixin, CommonString):
5027 Its value is properly sanitized. ASCII subset from space to tilde is
5028 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5030 >>> VisibleString().allowable_chars
5031 frozenset([" ", ... "~"])
5034 tag_default = tag_encode(26)
5036 asn1_type_name = "VisibleString"
5037 _allowable_chars = frozenset(b"".join(
5038 six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5042 class ISO646String(VisibleString):
5044 asn1_type_name = "ISO646String"
5047 UTCTimeState = namedtuple(
5049 OctetStringState._fields + ("ber_raw",),
5054 def str_to_time_fractions(value):
5056 year, v = (v // 10**10), (v % 10**10)
5057 month, v = (v // 10**8), (v % 10**8)
5058 day, v = (v // 10**6), (v % 10**6)
5059 hour, v = (v // 10**4), (v % 10**4)
5060 minute, second = (v // 100), (v % 100)
5061 return year, month, day, hour, minute, second
5064 class UTCTime(VisibleString):
5065 """``UTCTime`` datetime type
5067 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5068 UTCTime UTCTime 2017-09-30T22:07:50
5074 datetime.datetime(2017, 9, 30, 22, 7, 50)
5075 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5076 datetime.datetime(1957, 9, 30, 22, 7, 50)
5078 If BER encoded value was met, then ``ber_raw`` attribute will hold
5079 its raw representation.
5083 Only **naive** ``datetime`` objects are supported.
5084 Library assumes that all work is done in UTC.
5088 Pay attention that ``UTCTime`` can not hold full year, so two-digit
5089 years below 50 are treated as 20xx, and as 19xx otherwise, according to
5090 X.509 recommendation. Use ``GeneralizedTime`` instead for
5095 No strict validation of UTC offsets is made (only applicable to
5096 **BER**), just a very crude one:
5098 * minutes are not exceeding 60
5099 * offset value is not exceeding 14 hours
5101 __slots__ = ("ber_raw",)
5102 tag_default = tag_encode(23)
5104 asn1_type_name = "UTCTime"
5105 evgen_mode_skip_value = False
5115 bounds=None, # dummy argument, workability for OctetString.decode
5119 :param value: set the value. Either datetime type, or
5120 :py:class:`pyderasn.UTCTime` object
5121 :param bytes impl: override default tag with ``IMPLICIT`` one
5122 :param bytes expl: override default tag with ``EXPLICIT`` one
5123 :param default: set default value. Type same as in ``value``
5124 :param bool optional: is object ``OPTIONAL`` in sequence
5126 super(UTCTime, self).__init__(
5127 None, None, impl, expl, None, optional, _decoded, ctx,
5131 if value is not None:
5132 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5133 self.ber_encoded = self.ber_raw is not None
5134 if default is not None:
5135 default, _ = self._value_sanitize(default)
5136 self.default = self.__class__(
5141 if self._value is None:
5142 self._value = default
5144 self.optional = optional
5146 def _strptime_bered(self, value):
5147 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5150 raise ValueError("no timezone")
5151 year += 2000 if year < 50 else 1900
5152 decoded = datetime(year, month, day, hour, minute)
5154 if value[-1] == "Z":
5158 raise ValueError("invalid UTC offset")
5159 if value[-5] == "-":
5161 elif value[-5] == "+":
5164 raise ValueError("invalid UTC offset")
5165 v = pureint(value[-4:])
5166 offset, v = (60 * (v % 100)), v // 100
5168 raise ValueError("invalid UTC offset minutes")
5170 if offset > 14 * 3600:
5171 raise ValueError("too big UTC offset")
5175 return offset, decoded
5177 raise ValueError("invalid UTC offset seconds")
5178 seconds = pureint(value)
5180 raise ValueError("invalid seconds value")
5181 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse a ``YYMMDDHHMMSSZ`` string into a datetime

    Equivalent of datetime.strptime's format ``%y%m%d%H%M%SZ``.
    Two-digit years below 50 become 20xx, other ones become 19xx.

    :param value: text string of exactly ``LEN_YYMMDDHHMMSSZ`` characters
    :returns: ``datetime`` object
    :raises ValueError: on wrong length or missing trailing ``Z``
    """
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    if value[-1] != "Z":
        raise ValueError("non UTC timezone")
    fractions = str_to_time_fractions(value[:-1])
    short_year = fractions[0]
    century = 2000 if short_year < 50 else 1900
    return datetime(short_year + century, *fractions[1:])
5193 def _dt_sanitize(self, value):
5194 if value.year < 1950 or value.year > 2049:
5195 raise ValueError("UTCTime can hold only 1950-2049 years")
5196 return value.replace(microsecond=0)
5198 def _value_sanitize(self, value, ctx=None):
5199 if value.__class__ == binary_type:
5201 value_decoded = value.decode("ascii")
5202 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5203 raise DecodeError("invalid UTCTime encoding: %r" % err)
5206 return self._strptime(value_decoded), None
5207 except (TypeError, ValueError) as _err:
5209 if (ctx is not None) and ctx.get("bered", False):
5211 offset, _value = self._strptime_bered(value_decoded)
5212 _value = _value - timedelta(seconds=offset)
5213 return self._dt_sanitize(_value), value
5214 except (TypeError, ValueError, OverflowError) as _err:
5217 "invalid %s format: %r" % (self.asn1_type_name, err),
5218 klass=self.__class__,
5220 if isinstance(value, self.__class__):
5221 return value._value, None
5222 if value.__class__ == datetime:
5223 if value.tzinfo is not None:
5224 raise ValueError("only naive datetime supported")
5225 return self._dt_sanitize(value), None
5226 raise InvalidValueType((self.__class__, datetime))
5228 def _pp_value(self):
5230 value = self._value.isoformat()
5231 if self.ber_encoded:
5232 value += " (%s)" % self.ber_raw
5236 def __unicode__(self):
5238 value = self._value.isoformat()
5239 if self.ber_encoded:
5240 value += " (%s)" % self.ber_raw
5242 return text_type(self._pp_value())
5244 def __getstate__(self):
5245 return UTCTimeState(
5246 *super(UTCTime, self).__getstate__(),
5247 **{"ber_raw": self.ber_raw}
# Restore the object from a state tuple: the parent class handles the
# inherited fields, then the "ber_raw" field saved by __getstate__ is
# put back.
5250 def __setstate__(self, state):
5251 super(UTCTime, self).__setstate__(state)
5252 self.ber_raw = state.ber_raw
5254 def __bytes__(self):
5255 self._assert_ready()
5256 return self._encode_time()
5258 def __eq__(self, their):
5259 if their.__class__ == binary_type:
5260 return self._encode_time() == their
5261 if their.__class__ == datetime:
5262 return self.todatetime() == their
5263 if not isinstance(their, self.__class__):
5266 self._value == their._value and
5267 self.tag == their.tag and
5268 self._expl == their._expl
5271 def _encode_time(self):
5272 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5275 self._assert_ready()
5276 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """Return the encoded size together with the untouched state

    The size is the tag length plus the precomputed
    ``LEN_YYMMDDHHMMSSZ_WITH_LEN`` constant.

    :param state: encoder state, handed back as is
    :returns: ``(encoded length, state)`` tuple
    """
    total_len = len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN
    return total_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``_encode()`` to the writer

    :param writer: destination handed to ``write_full``
    :param state_iter: not used by this type
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
def _encode_cer(self, writer):
    """Write the result of ``_encode()`` to the writer

    :param writer: destination handed to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
5288 def todatetime(self):
5292 return pp_console_row(next(self.pps()))
5294 def pps(self, decode_path=()):
5297 asn1_type_name=self.asn1_type_name,
5298 obj_name=self.__class__.__name__,
5299 decode_path=decode_path,
5300 value=self._pp_value(),
5301 optional=self.optional,
5302 default=self == self.default,
5303 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5304 expl=None if self._expl is None else tag_decode(self._expl),
5309 expl_offset=self.expl_offset if self.expled else None,
5310 expl_tlen=self.expl_tlen if self.expled else None,
5311 expl_llen=self.expl_llen if self.expled else None,
5312 expl_vlen=self.expl_vlen if self.expled else None,
5313 expl_lenindef=self.expl_lenindef,
5314 ber_encoded=self.ber_encoded,
5317 for pp in self.pps_lenindef(decode_path):
5321 class GeneralizedTime(UTCTime):
5322 """``GeneralizedTime`` datetime type
5324 This type is similar to :py:class:`pyderasn.UTCTime`.
5326 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5327 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5329 '20170930220750.000123Z'
5330 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5331 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5335 Only **naive** datetime objects are supported.
5336 Library assumes that all work is done in UTC.
5340 Only **microsecond** fractions are supported in DER encoding.
5341 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5342 higher precision values.
5346 **BER** encoded data can lose information (accuracy) during
5347 decoding because of float transformations.
5351 **Zero** year is unsupported.
5354 tag_default = tag_encode(24)
5355 asn1_type_name = "GeneralizedTime"
5357 def _dt_sanitize(self, value):
5360 def _strptime_bered(self, value):
5361 if len(value) < 4 + 3 * 2:
5362 raise ValueError("invalid GeneralizedTime")
5363 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5364 decoded = datetime(year, month, day, hour)
5365 offset, value = 0, value[10:]
5367 return offset, decoded
5368 if value[-1] == "Z":
5371 for char, sign in (("-", -1), ("+", 1)):
5372 idx = value.rfind(char)
5375 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5376 v = pureint(offset_raw)
5377 if len(offset_raw) == 4:
5378 offset, v = (60 * (v % 100)), v // 100
5380 raise ValueError("invalid UTC offset minutes")
5381 elif len(offset_raw) == 2:
5384 raise ValueError("invalid UTC offset")
5386 if offset > 14 * 3600:
5387 raise ValueError("too big UTC offset")
5391 return offset, decoded
5392 if value[0] in DECIMAL_SIGNS:
5394 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5397 raise ValueError("stripped minutes")
5398 decoded += timedelta(seconds=60 * pureint(value[:2]))
5401 return offset, decoded
5402 if value[0] in DECIMAL_SIGNS:
5404 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5407 raise ValueError("stripped seconds")
5408 decoded += timedelta(seconds=pureint(value[:2]))
5411 return offset, decoded
5412 if value[0] not in DECIMAL_SIGNS:
5413 raise ValueError("invalid format after seconds")
5415 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5418 def _strptime(self, value):
5420 if l == LEN_YYYYMMDDHHMMSSZ:
5421 # datetime.strptime's format: %Y%m%d%H%M%SZ
5422 if value[-1] != "Z":
5423 raise ValueError("non UTC timezone")
5424 return datetime(*str_to_time_fractions(value[:-1]))
5425 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5426 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5427 if value[-1] != "Z":
5428 raise ValueError("non UTC timezone")
5429 if value[14] != ".":
5430 raise ValueError("no fractions separator")
5433 raise ValueError("trailing zero")
5436 raise ValueError("only microsecond fractions are supported")
5437 us = pureint(us + ("0" * (6 - us_len)))
5438 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5439 return datetime(year, month, day, hour, minute, second, us)
5440 raise ValueError("invalid GeneralizedTime length")
5442 def _encode_time(self):
5444 encoded = value.strftime("%Y%m%d%H%M%S")
5445 if value.microsecond > 0:
5446 encoded += (".%06d" % value.microsecond).rstrip("0")
5447 return (encoded + "Z").encode("ascii")
5450 self._assert_ready()
5452 if value.microsecond > 0:
5453 encoded = self._encode_time()
5454 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5455 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """Return the full encoded size together with the untouched state

    The value size is not fixed here, so it is measured from the
    output of ``_encode_time()``.

    :param state: encoder state, handed back as is
    :returns: ``(encoded length, state)`` tuple
    """
    self._assert_ready()
    value_len = len(self._encode_time())
    total_len = len(self.tag) + len_size(value_len) + value_len
    return total_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``_encode()`` to the writer

    :param writer: destination handed to ``write_full``
    :param state_iter: not used by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
5466 class GraphicString(CommonString):
5468 tag_default = tag_encode(25)
5469 encoding = "iso-8859-1"
5470 asn1_type_name = "GraphicString"
5473 class GeneralString(CommonString):
5475 tag_default = tag_encode(27)
5476 encoding = "iso-8859-1"
5477 asn1_type_name = "GeneralString"
5480 class UniversalString(CommonString):
5482 tag_default = tag_encode(28)
5483 encoding = "utf-32-be"
5484 asn1_type_name = "UniversalString"
5487 class BMPString(CommonString):
5489 tag_default = tag_encode(30)
5490 encoding = "utf-16-be"
5491 asn1_type_name = "BMPString"
5494 ChoiceState = namedtuple(
5496 BasicState._fields + ("specs", "value",),
5502 """``CHOICE`` special type
5506 class GeneralName(Choice):
5508 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5509 ("dNSName", IA5String(impl=tag_ctxp(2))),
5512 >>> gn = GeneralName()
5514 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5515 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5516 >>> gn["dNSName"] = IA5String("bar.baz")
5517 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5518 >>> gn["rfc822Name"]
5521 [2] IA5String IA5 bar.baz
5524 >>> gn.value == gn["dNSName"]
5527 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5529 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5530 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5532 __slots__ = ("specs",)
5534 asn1_type_name = "CHOICE"
5547 :param value: set the value. Either ``(choice, value)`` tuple, or
5548 :py:class:`pyderasn.Choice` object
5549 :param bytes impl: can not be set, do **not** use it
5550 :param bytes expl: override default tag with ``EXPLICIT`` one
5551 :param default: set default value. Type same as in ``value``
5552 :param bool optional: is object ``OPTIONAL`` in sequence
5554 if impl is not None:
5555 raise ValueError("no implicit tag allowed for CHOICE")
5556 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5558 schema = getattr(self, "schema", ())
5559 if len(schema) == 0:
5560 raise ValueError("schema must be specified")
5562 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5565 if value is not None:
5566 self._value = self._value_sanitize(value)
5567 if default is not None:
5568 default_value = self._value_sanitize(default)
5569 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5570 default_obj.specs = self.specs
5571 default_obj._value = default_value
5572 self.default = default_obj
5574 self._value = copy(default_obj._value)
5575 if self._expl is not None:
5576 tag_class, _, tag_num = tag_decode(self._expl)
5577 self._tag_order = (tag_class, tag_num)
5579 def _value_sanitize(self, value):
5580 if (value.__class__ == tuple) and len(value) == 2:
5582 spec = self.specs.get(choice)
5584 raise ObjUnknown(choice)
5585 if not isinstance(obj, spec.__class__):
5586 raise InvalidValueType((spec,))
5587 return (choice, spec(obj))
5588 if isinstance(value, self.__class__):
5590 raise InvalidValueType((self.__class__, tuple))
5594 return self._value is not None and self._value[1].ready
5598 return self.expl_lenindef or (
5599 (self._value is not None) and
5600 self._value[1].bered
5603 def __getstate__(self):
# Restore the object from a state tuple: the parent class handles the
# common fields, then the two fields that ChoiceState adds on top of
# BasicState ("specs" and "value") are restored here.
5621 def __setstate__(self, state):
5622 super(Choice, self).__setstate__(state)
5623 self.specs = state.specs
5624 self._value = state.value
def __eq__(self, their):
    """Compare with a ``(choice, obj)`` tuple or another instance

    A two-element tuple is compared directly against the stored pair.
    Anything else must be an instance of this class having equal
    ``specs`` and an equal stored pair.
    """
    if their.__class__ == tuple and len(their) == 2:
        return self._value == their
    if isinstance(their, self.__class__):
        return self.specs == their.specs and self._value == their._value
    return False
5643 return self.__class__(
5646 expl=self._expl if expl is None else expl,
5647 default=self.default if default is None else default,
5648 optional=self.optional if optional is None else optional,
5653 """Name of the choice
5655 self._assert_ready()
5656 return self._value[0]
5660 """Value of underlying choice
5662 self._assert_ready()
5663 return self._value[1]
5666 def tag_order(self):
5667 self._assert_ready()
5668 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5671 def tag_order_cer(self):
5672 return min(v.tag_order_cer for v in itervalues(self.specs))
5674 def __getitem__(self, key):
5675 if key not in self.specs:
5676 raise ObjUnknown(key)
5677 if self._value is None:
5679 choice, value = self._value
def __setitem__(self, key, value):
    """Select alternative ``key`` and store ``value`` under it

    :param key: name of the alternative, looked up in ``self.specs``
    :param value: object that must be an instance of the spec's class;
                  it is passed through the spec before being stored
    :raises ObjUnknown: no such alternative
    :raises InvalidValueType: ``value`` has an unexpected type
    """
    spec = self.specs.get(key)
    if spec is None:
        raise ObjUnknown(key)
    expected = spec.__class__
    if isinstance(value, expected):
        self._value = (key, spec(value))
        return
    raise InvalidValueType((expected,))
5698 return self._value[1].decoded if self.ready else False
5701 self._assert_ready()
5702 return self._value[1].encode()
def _encode1st(self, state):
    """Delegate ``encode1st`` to the chosen value

    Readiness is asserted before the stored object is touched.
    """
    self._assert_ready()
    chosen = self._value[1]
    return chosen.encode1st(state)
def _encode2nd(self, writer, state_iter):
    """Delegate ``encode2nd`` to the chosen value"""
    chosen = self._value[1]
    chosen.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """Delegate ``encode_cer`` to the chosen value

    Readiness is asserted before the stored object is touched.
    """
    self._assert_ready()
    chosen = self._value[1]
    chosen.encode_cer(writer)
5715 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5716 for choice, spec in iteritems(self.specs):
5717 sub_decode_path = decode_path + (choice,)
5723 decode_path=sub_decode_path,
5726 _ctx_immutable=False,
5733 klass=self.__class__,
5734 decode_path=decode_path,
5737 if tag_only: # pragma: no cover
5741 for _decode_path, value, tail in spec.decode_evgen(
5745 decode_path=sub_decode_path,
5747 _ctx_immutable=False,
5749 yield _decode_path, value, tail
5751 _, value, tail = next(spec.decode_evgen(
5755 decode_path=sub_decode_path,
5757 _ctx_immutable=False,
5760 obj = self.__class__(
5763 default=self.default,
5764 optional=self.optional,
5765 _decoded=(offset, 0, value.fulllen),
5767 obj._value = (choice, value)
5768 yield decode_path, obj, tail
5771 value = pp_console_row(next(self.pps()))
5773 value = "%s[%r]" % (value, self.value)
5776 def pps(self, decode_path=()):
5779 asn1_type_name=self.asn1_type_name,
5780 obj_name=self.__class__.__name__,
5781 decode_path=decode_path,
5782 value=self.choice if self.ready else None,
5783 optional=self.optional,
5784 default=self == self.default,
5785 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5786 expl=None if self._expl is None else tag_decode(self._expl),
5791 expl_lenindef=self.expl_lenindef,
5795 yield self.value.pps(decode_path=decode_path + (self.choice,))
5796 for pp in self.pps_lenindef(decode_path):
5800 class PrimitiveTypes(Choice):
5801 """Predefined ``CHOICE`` for all generic primitive types
5803 It could be useful for general decoding of some unspecified values:
5805 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5806 OCTET STRING 3 bytes 666f6f
5807 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5811 schema = tuple((klass.__name__, klass()) for klass in (
5835 AnyState = namedtuple(
5837 BasicState._fields + ("value", "defined"),
5843 """``ANY`` special type
5845 >>> Any(Integer(-123))
5846 ANY INTEGER -123 (0X:7B)
5847 >>> a = Any(OctetString(b"hello world").encode())
5848 ANY 040b68656c6c6f20776f726c64
5849 >>> hexenc(bytes(a))
5850 b'0x040x0bhello world'
5852 __slots__ = ("defined",)
5853 tag_default = tag_encode(0)
5854 asn1_type_name = "ANY"
5864 :param value: set the value. Either any kind of pyderasn's
5865 **ready** object, or bytes. Pay attention that
5866 **no** validation is performed if raw binary value
5867 is valid TLV, except just tag decoding
5868 :param bytes expl: override default tag with ``EXPLICIT`` one
5869 :param bool optional: is object ``OPTIONAL`` in sequence
5871 super(Any, self).__init__(None, expl, None, optional, _decoded)
5875 value = self._value_sanitize(value)
5877 if self._expl is None:
5878 if value.__class__ == binary_type:
5879 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5881 tag_class, tag_num = value.tag_order
5883 tag_class, _, tag_num = tag_decode(self._expl)
5884 self._tag_order = (tag_class, tag_num)
5887 def _value_sanitize(self, value):
5888 if value.__class__ == binary_type:
5890 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5892 if isinstance(value, self.__class__):
5894 if not isinstance(value, Obj):
5895 raise InvalidValueType((self.__class__, Obj, binary_type))
5900 return self._value is not None
5903 def tag_order(self):
5904 self._assert_ready()
5905 return self._tag_order
5909 if self.expl_lenindef or self.lenindef:
5911 if self.defined is None:
5913 return self.defined[1].bered
5915 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from ``state``

    The parent class restores its own part first, then the stored value
    and ``defined`` are taken from the ``value`` and ``defined`` fields.
    """
    super(Any, self).__setstate__(state)
    self._value, self.defined = state.value, state.defined
def __eq__(self, their):
    """Compare with raw bytes or with another ``Any``

    :param their: either a binary string, compared with this object's
                  raw value (or with its encoded representation, when
                  an object is held), or an ``Any`` instance
    :returns: for two ``Any`` objects that are both ready -- equality
              of their ``bytes()``; otherwise whether their readiness
              matches. Everything else is ``False``
    """
    if their.__class__ == binary_type:
        if self._value is None:
            # An unset value equals no byte string; without this guard
            # the fallback below would call .encode() on None
            return False
        if self._value.__class__ == binary_type:
            return self._value == their
        return self._value.encode() == their
    if issubclass(their.__class__, Any):
        if self.ready and their.ready:
            return bytes(self) == bytes(their)
        return self.ready == their.ready
    return False
5955 return self.__class__(
5957 expl=self._expl if expl is None else expl,
5958 optional=self.optional if optional is None else optional,
def __bytes__(self):
    """Return the raw value, or the encoding of the held object

    Readiness is asserted first. A binary value is returned as is,
    anything else is asked to ``encode()`` itself.
    """
    self._assert_ready()
    payload = self._value
    return payload if payload.__class__ == binary_type else payload.encode()
5973 self._assert_ready()
5975 if value.__class__ == binary_type:
5977 return value.encode()
def _encode1st(self, state):
    """First encoding pass

    A binary value contributes its length and leaves ``state``
    untouched; a held object is delegated to via ``encode1st``.
    """
    self._assert_ready()
    payload = self._value
    is_raw = payload.__class__ == binary_type
    return (len(payload), state) if is_raw else payload.encode1st(state)
def _encode2nd(self, writer, state_iter):
    """Second encoding pass

    A binary value is written out with ``write_full``; a held object
    is delegated to via ``encode2nd``.
    """
    payload = self._value
    if payload.__class__ == binary_type:
        write_full(writer, payload)
        return
    payload.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding

    Readiness is asserted first. A binary value is written out with
    ``write_full``; a held object is delegated to via ``encode_cer``.
    """
    self._assert_ready()
    payload = self._value
    if payload.__class__ == binary_type:
        write_full(writer, payload)
        return
    payload.encode_cer(writer)
6001 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6003 t, tlen, lv = tag_strip(tlv)
6004 except DecodeError as err:
6005 raise err.__class__(
6007 klass=self.__class__,
6008 decode_path=decode_path,
6012 l, llen, v = len_decode(lv)
6013 except LenIndefForm as err:
6014 if not ctx.get("bered", False):
6015 raise err.__class__(
6017 klass=self.__class__,
6018 decode_path=decode_path,
6021 llen, vlen, v = 1, 0, lv[1:]
6022 sub_offset = offset + tlen + llen
6024 while v[:EOC_LEN].tobytes() != EOC:
6025 chunk, v = Any().decode(
6028 decode_path=decode_path + (str(chunk_i),),
6031 _ctx_immutable=False,
6033 vlen += chunk.tlvlen
6034 sub_offset += chunk.tlvlen
6036 tlvlen = tlen + llen + vlen + EOC_LEN
6037 obj = self.__class__(
6038 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6040 optional=self.optional,
6041 _decoded=(offset, 0, tlvlen),
6044 obj.tag = t.tobytes()
6045 yield decode_path, obj, v[EOC_LEN:]
6047 except DecodeError as err:
6048 raise err.__class__(
6050 klass=self.__class__,
6051 decode_path=decode_path,
6055 raise NotEnoughData(
6056 "encoded length is longer than data",
6057 klass=self.__class__,
6058 decode_path=decode_path,
6061 tlvlen = tlen + llen + l
6062 v, tail = tlv[:tlvlen], v[l:]
6063 obj = self.__class__(
6064 value=None if evgen_mode else v.tobytes(),
6066 optional=self.optional,
6067 _decoded=(offset, 0, tlvlen),
6069 obj.tag = t.tobytes()
6070 yield decode_path, obj, tail
6073 return pp_console_row(next(self.pps()))
6075 def pps(self, decode_path=()):
6079 elif value.__class__ == binary_type:
6085 asn1_type_name=self.asn1_type_name,
6086 obj_name=self.__class__.__name__,
6087 decode_path=decode_path,
6089 blob=self._value if self._value.__class__ == binary_type else None,
6090 optional=self.optional,
6091 default=self == self.default,
6092 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6093 expl=None if self._expl is None else tag_decode(self._expl),
6098 expl_offset=self.expl_offset if self.expled else None,
6099 expl_tlen=self.expl_tlen if self.expled else None,
6100 expl_llen=self.expl_llen if self.expled else None,
6101 expl_vlen=self.expl_vlen if self.expled else None,
6102 expl_lenindef=self.expl_lenindef,
6103 lenindef=self.lenindef,
6106 defined_by, defined = self.defined or (None, None)
6107 if defined_by is not None:
6109 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6111 for pp in self.pps_lenindef(decode_path):
6115 ########################################################################
6116 # ASN.1 constructed types
6117 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can contain ".."
                     elements, stripping the leading element from
                     ``decode_path``. An empty tuple (also the one left
                     after all ".." were consumed) refers to
                     ``decode_path`` itself
    :returns: resulting absolute path, tuple of strings

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ('foo', 'bar', 'baz', 'whatever')
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ('foo', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ('baz', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ('foo',)
    """
    # Consume leading elements one by one; looping on ``rel_path`` itself
    # means an exhausted relative path ends the walk instead of being
    # indexed
    while rel_path:
        head = rel_path[0]
        if head == "/":
            return rel_path[1:]
        if head != "..":
            break
        decode_path, rel_path = decode_path[:-1], rel_path[1:]
    return decode_path + rel_path
6144 SequenceState = namedtuple(
6146 BasicState._fields + ("specs", "value",),
6151 class SequenceEncode1stMixing(object):
6152 def _encode1st(self, state):
6154 idx = len(state) - 1
6156 for v in self._values_for_encoding():
6157 l, _ = v.encode1st(state)
6160 return len(self.tag) + len_size(vlen) + vlen, state
6163 class Sequence(SequenceEncode1stMixing, Obj):
6164 """``SEQUENCE`` structure type
6166 You have to make specification of sequence::
6168 class Extension(Sequence):
6170 ("extnID", ObjectIdentifier()),
6171 ("critical", Boolean(default=False)),
6172 ("extnValue", OctetString()),
6175 Then, you can work with it as with dictionary.
6177 >>> ext = Extension()
6178 >>> Extension().specs
6180 ('extnID', OBJECT IDENTIFIER),
6181 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6182 ('extnValue', OCTET STRING),
6184 >>> ext["extnID"] = "1.2.3"
6185 Traceback (most recent call last):
6186 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6187 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6189 You can determine if sequence is ready to be encoded:
6194 Traceback (most recent call last):
6195 pyderasn.ObjNotReady: object is not ready: extnValue
6196 >>> ext["extnValue"] = OctetString(b"foobar")
6200 Value you want to assign, must have the same **type** as in
6201 corresponding specification, but it can have different tags,
6202 optional/default attributes -- they will be taken from specification
6205 class TBSCertificate(Sequence):
6207 ("version", Version(expl=tag_ctxc(0), default="v1")),
6210 >>> tbs = TBSCertificate()
6211 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6213 Assign ``None`` to remove value from sequence.
6215 You can set values in Sequence during its initialization:
6217 >>> AlgorithmIdentifier((
6218 ("algorithm", ObjectIdentifier("1.2.3")),
6219 ("parameters", Any(Null()))
6221 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6223 You can determine if value exists/set in the sequence and take its value:
6225 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6228 OBJECT IDENTIFIER 1.2.3
6230 But pay attention that if value has default, then it won't be (not
6231 in) in the sequence (because ``DEFAULT`` must not be encoded in
6232 DER), but you can read its value:
6234 >>> "critical" in ext, ext["critical"]
6235 (False, BOOLEAN False)
6236 >>> ext["critical"] = Boolean(True)
6237 >>> "critical" in ext, ext["critical"]
6238 (True, BOOLEAN True)
6240 All defaulted values are always optional.
6242 .. _allow_default_values_ctx:
6244 DER prohibits default value encoding and will raise an error if
6245 default value is unexpectedly met during decode.
6246 If :ref:`bered <bered_ctx>` context option is set, then no error
6247 will be raised, but ``bered`` attribute set. You can disable strict
6248 defaulted values existence validation by setting
6249 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6251 All values with DEFAULT specified are decoded atomically in
6252 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6253 SEQUENCE, then it will be yielded as a single element, not
6254 disassembled. That is required for DEFAULT existence check.
6256 Two sequences are equal if they have equal specification (schema),
6257 implicit/explicit tagging and the same values.
6259 __slots__ = ("specs",)
6260 tag_default = tag_encode(form=TagFormConstructed, num=16)
6261 asn1_type_name = "SEQUENCE"
6273 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6275 schema = getattr(self, "schema", ())
6277 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6280 if value is not None:
6281 if issubclass(value.__class__, Sequence):
6282 self._value = value._value
6283 elif hasattr(value, "__iter__"):
6284 for seq_key, seq_value in value:
6285 self[seq_key] = seq_value
6287 raise InvalidValueType((Sequence,))
6288 if default is not None:
6289 if not issubclass(default.__class__, Sequence):
6290 raise InvalidValueType((Sequence,))
6291 default_value = default._value
6292 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6293 default_obj.specs = self.specs
6294 default_obj._value = default_value
6295 self.default = default_obj
6297 self._value = copy(default_obj._value)
6301 for name, spec in iteritems(self.specs):
6302 value = self._value.get(name)
6313 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6315 return any(value.bered for value in itervalues(self._value))
6317 def __getstate__(self):
6318 return SequenceState(
6332 {k: copy(v) for k, v in iteritems(self._value)},
def __setstate__(self, state):
    """Restore the object from ``state``

    The parent class restores its own part first, then ``specs`` and the
    stored values are taken from the ``specs`` and ``value`` fields.
    """
    super(Sequence, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
def __eq__(self, their):
    """Compare with another instance of this class

    Equal objects have equal ``specs``, tag, explicit tag and stored
    values. Objects of any other class are never equal.
    """
    if isinstance(their, self.__class__):
        return (
            self.specs == their.specs and
            self.tag == their.tag and
            self._expl == their._expl and
            self._value == their._value
        )
    return False
6358 return self.__class__(
6361 impl=self.tag if impl is None else impl,
6362 expl=self._expl if expl is None else expl,
6363 default=self.default if default is None else default,
6364 optional=self.optional if optional is None else optional,
def __contains__(self, key):
    """Tell whether ``key`` is present among the stored values"""
    stored = self._value
    return key in stored
def __setitem__(self, key, value):
    """Store, replace or remove the value under ``key``

    :param key: field name, looked up in ``self.specs``
    :param value: ``None`` removes the stored value. Otherwise it must
                  be an instance of the spec's class; it is passed
                  through the spec, and a result equal to the spec's
                  default is removed instead of being stored
    :raises ObjUnknown: no such field
    :raises InvalidValueType: ``value`` has an unexpected type
    """
    spec = self.specs.get(key)
    if spec is None:
        raise ObjUnknown(key)
    if value is not None:
        expected = spec.__class__
        if not isinstance(value, expected):
            raise InvalidValueType((expected,))
        prepared = spec(value=value)
        if spec.default is None or not (prepared == spec.default):
            self._value[key] = prepared
            return
    # Either removal was requested, or the value equals the default
    self._value.pop(key, None)
def __getitem__(self, key):
    """Return the value stored under ``key``, or the spec's default

    :returns: stored value when there is one; otherwise the spec's
              ``default``, which is ``None`` when no default is set
    :raises ObjUnknown: nothing is stored and no such field exists
    """
    stored = self._value.get(key)
    if stored is None:
        spec = self.specs.get(key)
        if spec is None:
            raise ObjUnknown(key)
        return spec.default
    return stored
6396 def _values_for_encoding(self):
6397 for name, spec in iteritems(self.specs):
6398 value = self._value.get(name)
6402 raise ObjNotReady(name)
6406 v = b"".join(v.encode() for v in self._values_for_encoding())
6407 return b"".join((self.tag, len_encode(len(v)), v))
def _encode2nd(self, writer, state_iter):
    """Second encoding pass

    Writes the tag followed by the encoded length, which is taken from
    the next element of ``state_iter``, then lets every value to be
    encoded write itself.
    """
    header = self.tag + len_encode(next(state_iter))
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding

    Writes the tag followed by ``LENINDEF``, then every value to be
    encoded via its ``encode_cer``, and finishes with ``EOC``.
    """
    header = self.tag + LENINDEF
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode_cer(writer)
    write_full(writer, EOC)
6420 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6422 t, tlen, lv = tag_strip(tlv)
6423 except DecodeError as err:
6424 raise err.__class__(
6426 klass=self.__class__,
6427 decode_path=decode_path,
6432 klass=self.__class__,
6433 decode_path=decode_path,
6436 if tag_only: # pragma: no cover
6440 ctx_bered = ctx.get("bered", False)
6442 l, llen, v = len_decode(lv)
6443 except LenIndefForm as err:
6445 raise err.__class__(
6447 klass=self.__class__,
6448 decode_path=decode_path,
6451 l, llen, v = 0, 1, lv[1:]
6453 except DecodeError as err:
6454 raise err.__class__(
6456 klass=self.__class__,
6457 decode_path=decode_path,
6461 raise NotEnoughData(
6462 "encoded length is longer than data",
6463 klass=self.__class__,
6464 decode_path=decode_path,
6468 v, tail = v[:l], v[l:]
6470 sub_offset = offset + tlen + llen
6473 ctx_allow_default_values = ctx.get("allow_default_values", False)
6474 for name, spec in iteritems(self.specs):
6475 if spec.optional and (
6476 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6480 spec_defaulted = spec.default is not None
6481 sub_decode_path = decode_path + (name,)
6483 if evgen_mode and not spec_defaulted:
6484 for _decode_path, value, v_tail in spec.decode_evgen(
6488 decode_path=sub_decode_path,
6490 _ctx_immutable=False,
6492 yield _decode_path, value, v_tail
6494 _, value, v_tail = next(spec.decode_evgen(
6498 decode_path=sub_decode_path,
6500 _ctx_immutable=False,
6503 except TagMismatch as err:
6504 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6508 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6509 if not evgen_mode and defined is not None:
6510 defined_by, defined_spec = defined
6511 if issubclass(value.__class__, SequenceOf):
6512 for i, _value in enumerate(value):
6513 sub_sub_decode_path = sub_decode_path + (
6515 DecodePathDefBy(defined_by),
6517 defined_value, defined_tail = defined_spec.decode(
6518 memoryview(bytes(_value)),
6520 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6521 if value.expled else (value.tlen + value.llen)
6524 decode_path=sub_sub_decode_path,
6526 _ctx_immutable=False,
6528 if len(defined_tail) > 0:
6531 klass=self.__class__,
6532 decode_path=sub_sub_decode_path,
6535 _value.defined = (defined_by, defined_value)
6537 defined_value, defined_tail = defined_spec.decode(
6538 memoryview(bytes(value)),
6540 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6541 if value.expled else (value.tlen + value.llen)
6544 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6546 _ctx_immutable=False,
6548 if len(defined_tail) > 0:
6551 klass=self.__class__,
6552 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6555 value.defined = (defined_by, defined_value)
6557 value_len = value.fulllen
6559 sub_offset += value_len
6563 yield sub_decode_path, value, v_tail
6564 if value == spec.default:
6565 if ctx_bered or ctx_allow_default_values:
6569 "DEFAULT value met",
6570 klass=self.__class__,
6571 decode_path=sub_decode_path,
6575 values[name] = value
6576 spec_defines = getattr(spec, "defines", ())
6577 if len(spec_defines) == 0:
6578 defines_by_path = ctx.get("defines_by_path", ())
6579 if len(defines_by_path) > 0:
6580 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6581 if spec_defines is not None and len(spec_defines) > 0:
6582 for rel_path, schema in spec_defines:
6583 defined = schema.get(value, None)
6584 if defined is not None:
6585 ctx.setdefault("_defines", []).append((
6586 abs_decode_path(sub_decode_path[:-1], rel_path),
6590 if v[:EOC_LEN].tobytes() != EOC:
6593 klass=self.__class__,
6594 decode_path=decode_path,
6602 klass=self.__class__,
6603 decode_path=decode_path,
6606 obj = self.__class__(
6610 default=self.default,
6611 optional=self.optional,
6612 _decoded=(offset, llen, vlen),
6615 obj.lenindef = lenindef
6616 obj.ber_encoded = ber_encoded
6617 yield decode_path, obj, tail
6620 value = pp_console_row(next(self.pps()))
6622 for name in self.specs:
6623 _value = self._value.get(name)
6626 cols.append("%s: %s" % (name, repr(_value)))
6627 return "%s[%s]" % (value, "; ".join(cols))
6629 def pps(self, decode_path=()):
6632 asn1_type_name=self.asn1_type_name,
6633 obj_name=self.__class__.__name__,
6634 decode_path=decode_path,
6635 optional=self.optional,
6636 default=self == self.default,
6637 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6638 expl=None if self._expl is None else tag_decode(self._expl),
6643 expl_offset=self.expl_offset if self.expled else None,
6644 expl_tlen=self.expl_tlen if self.expled else None,
6645 expl_llen=self.expl_llen if self.expled else None,
6646 expl_vlen=self.expl_vlen if self.expled else None,
6647 expl_lenindef=self.expl_lenindef,
6648 lenindef=self.lenindef,
6649 ber_encoded=self.ber_encoded,
6652 for name in self.specs:
6653 value = self._value.get(name)
6656 yield value.pps(decode_path=decode_path + (name,))
6657 for pp in self.pps_lenindef(decode_path):
6661 class Set(Sequence, SequenceEncode1stMixing):
6662 """``SET`` structure type
6664 Its usage is identical to :py:class:`pyderasn.Sequence`.
6666 .. _allow_unordered_set_ctx:
6668 DER prohibits unordered values encoding and will raise an error
6669 during decode. If :ref:`bered <bered_ctx>` context option is set,
6670 then no error will occur. Also you can disable strict values
6671 ordering check by setting ``"allow_unordered_set": True``
6672 :ref:`context <ctx>` option.
6675 tag_default = tag_encode(form=TagFormConstructed, num=17)
6676 asn1_type_name = "SET"
6678 def _values_for_encoding(self):
6680 super(Set, self)._values_for_encoding(),
6681 key=attrgetter("tag_order"),
6684 def _encode_cer(self, writer):
6685 write_full(writer, self.tag + LENINDEF)
6687 super(Set, self)._values_for_encoding(),
6688 key=attrgetter("tag_order_cer"),
6690 v.encode_cer(writer)
6691 write_full(writer, EOC)
6693 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6695 t, tlen, lv = tag_strip(tlv)
6696 except DecodeError as err:
6697 raise err.__class__(
6699 klass=self.__class__,
6700 decode_path=decode_path,
6705 klass=self.__class__,
6706 decode_path=decode_path,
6713 ctx_bered = ctx.get("bered", False)
6715 l, llen, v = len_decode(lv)
6716 except LenIndefForm as err:
6718 raise err.__class__(
6720 klass=self.__class__,
6721 decode_path=decode_path,
6724 l, llen, v = 0, 1, lv[1:]
6726 except DecodeError as err:
6727 raise err.__class__(
6729 klass=self.__class__,
6730 decode_path=decode_path,
6734 raise NotEnoughData(
6735 "encoded length is longer than data",
6736 klass=self.__class__,
6740 v, tail = v[:l], v[l:]
6742 sub_offset = offset + tlen + llen
6745 ctx_allow_default_values = ctx.get("allow_default_values", False)
6746 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6747 tag_order_prev = (0, 0)
6748 _specs_items = copy(self.specs)
6751 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6753 for name, spec in iteritems(_specs_items):
6754 sub_decode_path = decode_path + (name,)
6760 decode_path=sub_decode_path,
6763 _ctx_immutable=False,
6770 klass=self.__class__,
6771 decode_path=decode_path,
6774 spec_defaulted = spec.default is not None
6775 if evgen_mode and not spec_defaulted:
6776 for _decode_path, value, v_tail in spec.decode_evgen(
6780 decode_path=sub_decode_path,
6782 _ctx_immutable=False,
6784 yield _decode_path, value, v_tail
6786 _, value, v_tail = next(spec.decode_evgen(
6790 decode_path=sub_decode_path,
6792 _ctx_immutable=False,
6795 value_tag_order = value.tag_order
6796 value_len = value.fulllen
6797 if tag_order_prev >= value_tag_order:
6798 if ctx_bered or ctx_allow_unordered_set:
6802 "unordered " + self.asn1_type_name,
6803 klass=self.__class__,
6804 decode_path=sub_decode_path,
6809 yield sub_decode_path, value, v_tail
6810 if value != spec.default:
6812 elif ctx_bered or ctx_allow_default_values:
6816 "DEFAULT value met",
6817 klass=self.__class__,
6818 decode_path=sub_decode_path,
6821 values[name] = value
6822 del _specs_items[name]
6823 tag_order_prev = value_tag_order
6824 sub_offset += value_len
6828 obj = self.__class__(
6832 default=self.default,
6833 optional=self.optional,
6834 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6837 if v[:EOC_LEN].tobytes() != EOC:
6840 klass=self.__class__,
6841 decode_path=decode_path,
6846 for name, spec in iteritems(self.specs):
6847 if name not in values and not spec.optional:
6849 "%s value is not ready" % name,
6850 klass=self.__class__,
6851 decode_path=decode_path,
6856 obj.ber_encoded = ber_encoded
6857 yield decode_path, obj, tail
6860 SequenceOfState = namedtuple(
6862 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6867 class SequenceOf(SequenceEncode1stMixing, Obj):
6868 """``SEQUENCE OF`` sequence type
6870 For that kind of type you must specify the object it will carry on
6871 (bounds are for example here, not required)::
6873 class Ints(SequenceOf):
6878 >>> ints.append(Integer(123))
6879 >>> ints.append(Integer(234))
6881 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6882 >>> [int(i) for i in ints]
6884 >>> ints.append(Integer(345))
6885 Traceback (most recent call last):
6886 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6889 >>> ints[1] = Integer(345)
6891 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6893 You can initialize sequence with preinitialized values:
6895 >>> ints = Ints([Integer(123), Integer(234)])
6897 Also you can use iterator as a value:
6899 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6901 And it won't be iterated until encoding process. Pay attention that
6902 bounds and required schema checks are done only during the encoding
6903 process in that case! After encode was called, then value is zeroed
6904 back to empty list and you have to set it again. That mode is useful
6905 mainly with CER encoding mode, where all objects from the iterable
6906 will be streamed to the buffer, without copying all of them to
6909 __slots__ = ("spec", "_bound_min", "_bound_max")
6910 tag_default = tag_encode(form=TagFormConstructed, num=16)
6911 asn1_type_name = "SEQUENCE OF"
6924 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6926 schema = getattr(self, "schema", None)
6928 raise ValueError("schema must be specified")
6930 self._bound_min, self._bound_max = getattr(
6934 ) if bounds is None else bounds
6936 if value is not None:
6937 self._value = self._value_sanitize(value)
6938 if default is not None:
6939 default_value = self._value_sanitize(default)
6940 default_obj = self.__class__(
6945 default_obj._value = default_value
6946 self.default = default_obj
6948 self._value = copy(default_obj._value)
6950 def _value_sanitize(self, value):
6952 if issubclass(value.__class__, SequenceOf):
6953 value = value._value
6954 elif hasattr(value, NEXT_ATTR_NAME):
6956 elif hasattr(value, "__iter__"):
6959 raise InvalidValueType((self.__class__, iter, "iterator"))
6961 if not self._bound_min <= len(value) <= self._bound_max:
6962 raise BoundsError(self._bound_min, len(value), self._bound_max)
6963 class_expected = self.spec.__class__
6965 if not isinstance(v, class_expected):
6966 raise InvalidValueType((class_expected,))
6971 if hasattr(self._value, NEXT_ATTR_NAME):
6973 if self._bound_min > 0 and len(self._value) == 0:
6975 return all(v.ready for v in self._value)
6979 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6981 return any(v.bered for v in self._value)
6983 def __getstate__(self):
6984 if hasattr(self._value, NEXT_ATTR_NAME):
6985 raise ValueError("can not pickle SequenceOf with iterator")
6986 return SequenceOfState(
7000 [copy(v) for v in self._value],
def __setstate__(self, state):
    """Restore the object from ``state``

    The parent class restores its own part first, then the spec, the
    stored values and both bounds are taken from the state's fields.
    """
    super(SequenceOf, self).__setstate__(state)
    self.spec, self._value = state.spec, state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
def __eq__(self, their):
    """Compare with another instance or with any iterable

    Instances of this class are equal when their spec, tag, explicit
    tag and stored values are equal. Any other object having
    ``__iter__`` is turned into a list and compared with the stored
    values. Everything else is not equal.
    """
    if not isinstance(their, self.__class__):
        if hasattr(their, "__iter__"):
            return self._value == list(their)
        return False
    return (
        self.spec == their.spec and
        self.tag == their.tag and
        self._expl == their._expl and
        self._value == their._value
    )
7033 return self.__class__(
7037 (self._bound_min, self._bound_max)
7038 if bounds is None else bounds
7040 impl=self.tag if impl is None else impl,
7041 expl=self._expl if expl is None else expl,
7042 default=self.default if default is None else default,
7043 optional=self.optional if optional is None else optional,
def __contains__(self, key):
    """Tell whether ``key`` is present among the stored values"""
    stored = self._value
    return key in stored
def append(self, value):
    """Append ``value`` to the stored values

    :param value: must be an instance of the spec's class
    :raises InvalidValueType: ``value`` has an unexpected type
    :raises BoundsError: one more element would exceed the upper bound
    """
    expected = self.spec.__class__
    if not isinstance(value, expected):
        raise InvalidValueType((expected,))
    grown = len(self._value) + 1
    if grown > self._bound_max:
        raise BoundsError(self._bound_min, grown, self._bound_max)
    self._value.append(value)
7061 return iter(self._value)
7064 return len(self._value)
def __setitem__(self, key, value):
    """Replace the stored element at ``key``

    :param value: must be an instance of the spec's class; it is
                  passed through the spec before being stored
    :raises InvalidValueType: ``value`` has an unexpected type
    """
    expected = self.spec.__class__
    if not isinstance(value, expected):
        raise InvalidValueType((expected,))
    prepared = self.spec(value=value)
    self._value[key] = prepared
def __getitem__(self, key):
    """Return the stored element at ``key``"""
    stored = self._value
    return stored[key]
def _values_for_encoding(self):
    """Return an iterator over the stored values"""
    stored = self._value
    return iter(stored)
7078 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7081 values_append = values.append
7082 class_expected = self.spec.__class__
7083 values_for_encoding = self._values_for_encoding()
7085 for v in values_for_encoding:
7086 if not isinstance(v, class_expected):
7087 raise InvalidValueType((class_expected,))
7088 values_append(v.encode())
7089 if not self._bound_min <= len(values) <= self._bound_max:
7090 raise BoundsError(self._bound_min, len(values), self._bound_max)
7091 value = b"".join(values)
7093 value = b"".join(v.encode() for v in self._values_for_encoding())
7094 return b"".join((self.tag, len_encode(len(value)), value))
7096 def _encode1st(self, state):
7097 state = super(SequenceOf, self)._encode1st(state)
7098 if hasattr(self._value, NEXT_ATTR_NAME):
7102 def _encode2nd(self, writer, state_iter):
7103 write_full(writer, self.tag + len_encode(next(state_iter)))
7104 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7107 class_expected = self.spec.__class__
7108 values_for_encoding = self._values_for_encoding()
7110 for v in values_for_encoding:
7111 if not isinstance(v, class_expected):
7112 raise InvalidValueType((class_expected,))
7113 v.encode2nd(writer, state_iter)
7115 if not self._bound_min <= values_count <= self._bound_max:
7116 raise BoundsError(self._bound_min, values_count, self._bound_max)
7118 for v in self._values_for_encoding():
7119 v.encode2nd(writer, state_iter)
7121 def _encode_cer(self, writer):
7122 write_full(writer, self.tag + LENINDEF)
7123 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7125 class_expected = self.spec.__class__
7127 values_for_encoding = self._values_for_encoding()
7129 for v in values_for_encoding:
7130 if not isinstance(v, class_expected):
7131 raise InvalidValueType((class_expected,))
7132 v.encode_cer(writer)
7134 if not self._bound_min <= values_count <= self._bound_max:
7135 raise BoundsError(self._bound_min, values_count, self._bound_max)
7137 for v in self._values_for_encoding():
7138 v.encode_cer(writer)
7139 write_full(writer, EOC)
# Tail of the signature and the body of the decoding generator that
# SetOf._decode() delegates to (SetOf passes ordering_check=True; the
# default here is False).
# NOTE(review): the ``def`` line and a number of statements are absent from
# this listing (the embedded original numbering has gaps), so the comments
# below describe only what the visible lines show.
7149 ordering_check=False,
# Strip the tag; a DecodeError is re-raised as the same exception class
# with klass/decode_path attached.
7152 t, tlen, lv = tag_strip(tlv)
7153 except DecodeError as err:
7154 raise err.__class__(
7156 klass=self.__class__,
7157 decode_path=decode_path,
7162 klass=self.__class__,
7163 decode_path=decode_path,
# "bered" context flag, False when absent
7170 ctx_bered = ctx.get("bered", False)
# Decode the length; LenIndefForm has its own handler, separate from the
# generic DecodeError handler further down.
7172 l, llen, v = len_decode(lv)
7173 except LenIndefForm as err:
7175 raise err.__class__(
7177 klass=self.__class__,
7178 decode_path=decode_path,
# Presumably still inside the LenIndefForm handler: zero length, one length
# octet consumed, value starts right after it.
7181 l, llen, v = 0, 1, lv[1:]
7183 except DecodeError as err:
7184 raise err.__class__(
7186 klass=self.__class__,
7187 decode_path=decode_path,
# Declared length exceeds the data actually available
7191 raise NotEnoughData(
7192 "encoded length is longer than data",
7193 klass=self.__class__,
7194 decode_path=decode_path,
# Split into this object's value bytes and the trailing remainder
7198 v, tail = v[:l], v[l:]
# Offset of the first element: past this object's tag and length octets
7200 sub_offset = offset + tlen + llen
# "allow_unordered_set" context flag, False when absent; value_prev starts
# as an empty view for the ordering comparison below.
7203 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7204 value_prev = memoryview(v[:0])
# With indefinite length, an EOC marker at the current position is tested
# for here (the consequence is not visible in this listing).
7208 if lenindef and v[:EOC_LEN].tobytes() == EOC:
# Element's decode path is the parent's path plus the element index as text
7210 sub_decode_path = decode_path + (str(_value_count),)
# One visible form re-yields every event produced by the element spec ...
7212 for _decode_path, value, v_tail in spec.decode_evgen(
7216 decode_path=sub_decode_path,
7218 _ctx_immutable=False,
7220 yield _decode_path, value, v_tail
# ... the other takes only the first item produced by decode_evgen().
# NOTE(review): the condition choosing between the two is not visible.
7222 _, value, v_tail = next(spec.decode_evgen(
7226 decode_path=sub_decode_path,
7228 _ctx_immutable=False,
7231 value_len = value.fulllen
# Ordering check: the previous element's bytes must not compare greater
# than the current element's bytes. ctx_bered / ctx_allow_unordered_set are
# consulted before the "unordered ..." error is built.
# NOTE(review): what each branch does is not visible in this listing.
7233 if value_prev.tobytes() > v[:value_len].tobytes():
7234 if ctx_bered or ctx_allow_unordered_set:
7238 "unordered " + self.asn1_type_name,
7239 klass=self.__class__,
7240 decode_path=sub_decode_path,
7243 value_prev = v[:value_len]
7246 _value.append(value)
7247 sub_offset += value_len
# In evgen mode the element count is checked against the bounds right here;
# the BoundsError text is passed on as ``msg``.
7250 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7252 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7253 klass=self.__class__,
7254 decode_path=decode_path,
# Result object; in evgen mode no element list is stored (value=None)
7258 obj = self.__class__(
7259 value=None if evgen_mode else _value,
7261 bounds=(self._bound_min, self._bound_max),
7264 default=self.default,
7265 optional=self.optional,
# Recorded value length includes the EOC marker for indefinite-length input
7266 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7268 except BoundsError as err:
7271 klass=self.__class__,
7272 decode_path=decode_path,
# Data at the current position is compared with the EOC marker; the
# statement executed on mismatch is not visible in this listing.
7276 if v[:EOC_LEN].tobytes() != EOC:
7279 klass=self.__class__,
7280 decode_path=decode_path,
7285 obj.ber_encoded = ber_encoded
# Final event: the decoded object together with the undecoded tail
7286 yield decode_path, obj, tail
# NOTE(review): the ``def`` line and the enclosing expression of this
# fragment are absent from the listing; presumably these are the two
# arguments of the string built by __repr__ -- confirm against the full file.
# First argument: console row rendered from the first record of pps().
7290 pp_console_row(next(self.pps())),
# Second argument: comma-separated repr() of every contained element.
7291 ", ".join(repr(v) for v in self._value),
# Generator used for pretty printing.
#
# :param decode_path: tuple path of this object; each element receives this
#                     path extended with its index as a string
#
# The keyword arguments below describe this object itself (type name, class
# name, tags, explicit-tag geometry, BER flags); after that the pps()
# generator of every element is yielded, and finally pps_lenindef() is
# iterated.
# NOTE(review): the call that receives the keywords (original lines
# 7295-7296) and the body of the last loop (7320) are absent from this
# listing -- confirm against the complete file.
7294 def pps(self, decode_path=()):
7297 asn1_type_name=self.asn1_type_name,
7298 obj_name=self.__class__.__name__,
7299 decode_path=decode_path,
7300 optional=self.optional,
# True when the current value equals the declared default
7301 default=self == self.default,
# Implicit tag is reported only when it differs from the class default
7302 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7303 expl=None if self._expl is None else tag_decode(self._expl),
# Explicit-tag geometry is reported only when self.expled is true
7308 expl_offset=self.expl_offset if self.expled else None,
7309 expl_tlen=self.expl_tlen if self.expled else None,
7310 expl_llen=self.expl_llen if self.expled else None,
7311 expl_vlen=self.expl_vlen if self.expled else None,
7312 expl_lenindef=self.expl_lenindef,
7313 lenindef=self.lenindef,
7314 ber_encoded=self.ber_encoded,
# Note: this yields each element's pps() generator object itself
7317 for i, value in enumerate(self._value):
7318 yield value.pps(decode_path=decode_path + (str(i),))
7319 for pp in self.pps_lenindef(decode_path):
# SequenceOf variant with its own tag and type name. It rejects values
# exposing NEXT_ATTR_NAME, emits element encodings in sorted order and asks
# the parent decoder to check ordering.
# NOTE(review): this listing omits several original lines (docstring
# terminator, the ``raise``/``return`` of _value_sanitize, the ``def`` line
# of the first encoder, buffer setup in _encode2nd, the positional arguments
# of the super()._decode() call) -- confirm against the complete file.
7323 class SetOf(SequenceOf):
7324 """``SET OF`` sequence type
7326 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
# Constructed-form tag with number 17
7329 tag_default = tag_encode(form=TagFormConstructed, num=17)
7330 asn1_type_name = "SET OF"
# Same sanitising as the parent class, then values exposing NEXT_ATTR_NAME
# are refused (see the message); the raise statement itself is not visible.
7332 def _value_sanitize(self, value):
7333 value = super(SetOf, self)._value_sanitize(value)
7334 if hasattr(value, NEXT_ATTR_NAME):
7336 "SetOf does not support iterator values, as no sense in them"
# (method header missing from listing) Elements are encoded one by one and
# concatenated in sorted order of their encodings; the result is
# tag + length + sorted contents.
7341 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7342 return b"".join((self.tag, len_encode(len(v)), v))
# Tag and length (taken from state_iter) are written first; each element is
# encoded through ``buf.write`` and its bytes collected in ``values`` before
# anything is written to ``writer``.
# NOTE(review): creation of ``values``/``buf`` and the loop header around the
# final write_full() are not visible; presumably the collected encodings
# are sorted first -- confirm.
7344 def _encode2nd(self, writer, state_iter):
7345 write_full(writer, self.tag + len_encode(next(state_iter)))
7347 for v in self._values_for_encoding():
7349 v.encode2nd(buf.write, state_iter)
7350 values.append(buf.getvalue())
7353 write_full(writer, v)
# Indefinite-length form: tag + LENINDEF, the elements' encode_cer() results
# in sorted order, then EOC.
7355 def _encode_cer(self, writer):
7356 write_full(writer, self.tag + LENINDEF)
7357 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7358 write_full(writer, v)
7359 write_full(writer, EOC)
# Delegates to the parent's _decode() with ordering_check switched on
7361 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7362 return super(SetOf, self)._decode(
7369 ordering_check=True,
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    Modules must be separated from classes/functions with ``:``.

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>

    :param str pypath: ``package.module:attr[.attr ...]`` string. The part
                       before the last ``:`` is imported as a module, the
                       part after it is a dot separated attribute chain
    :returns: object found at the end of the attribute chain
    :raises ValueError: if ``pypath`` contains no ``:`` separator
    :raises ImportError: if the module part can not be imported
    :raises AttributeError: if some attribute of the chain does not exist
    """
    # Split on the LAST colon only, so the unpacking below fails loudly
    # (ValueError) when the separator is missing altogether
    mod, objs = pypath.rsplit(":", 1)
    from importlib import import_module
    obj = import_module(mod)
    # Walk "Baz.boo" one attribute at a time, starting from the module
    for obj_name in objs.split("."):
        obj = getattr(obj, obj_name)
    return obj
# Builds a schema-less decoder and returns a pair: an instance of the local
# SEQUENCEOF class and the local pprint_any pretty printer (main() unpacks
# the pair as ``schema, pprinter``).
# NOTE(review): many original lines are absent from this listing (body of
# the SequenceOf%d loop, the SEQUENCEOF class body, the ``def pprint_any``
# header and parts of _pprint_pps) -- confirm against the complete file.
7391 def generic_decoder(): # pragma: no cover
7392 # All of this below is a big hack with self references
# The choice refers to itself: its SequenceOf/SetOf alternatives use the
# very same choice object as element schema.
7393 choice = PrimitiveTypes()
7394 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7395 choice.specs["SetOf"] = SetOf(schema=choice)
# 31 more SequenceOf alternatives named SequenceOf0 .. SequenceOf30; their
# constructor arguments are not visible in this listing.
7396 for i in six_xrange(31):
7397 choice.specs["SequenceOf%d" % i] = SequenceOf(
7401 choice.specs["Any"] = Any()
7403 # Class name equals to type name, to omit it from output
7404 class SEQUENCEOF(SequenceOf):
# Keyword defaults below presumably belong to pprint_any's signature
7412 with_decode_path=False,
7413 decode_path_only=(),
# Recursive row generator: items with ``_fields`` are single records;
# anything else is iterated and fed back into _pprint_pps().
7416 def _pprint_pps(pps):
7418 if hasattr(pp, "_fields"):
# Filter: record's path prefix differs from the requested decode_path_only
7420 decode_path_only != () and
7421 pp.decode_path[:len(decode_path_only)] != decode_path_only
# For Choice records a copy is made whose last path component is ">"
7424 if pp.asn1_type_name == Choice.asn1_type_name:
7426 pp_kwargs = pp._asdict()
7427 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7428 pp = _pp(**pp_kwargs)
7429 yield pp_console_row(
7434 with_colours=with_colours,
7435 with_decode_path=with_decode_path,
7436 decode_path_len_decrease=len(decode_path_only),
7438 for row in pp_console_blob(
7440 decode_path_len_decrease=len(decode_path_only),
7444 for row in _pprint_pps(pp):
# All rows are joined into one newline separated string
7446 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7447 return SEQUENCEOF(), pprint_any
def ascii_visualize(ba):
    """Output only ASCII printable characters, like in hexdump -C

    Example output for given binary string (right part)::

        92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|

    :param ba: iterable of integer byte values (callers pass ``bytearray``)
    :returns: string with one character per input byte: the character
              itself for values 0x20..0x7E inclusive, ``.`` for any other
    """
    return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
# NOTE(review): the ``def`` line of this function (called as hexdump(raw)
# from browse()) and several statements are absent from this listing: the
# docstring terminator, the body of the ``% 8`` test, the statements
# between 7486 and 7489 and around 7492. The comments below cover the
# visible lines only -- confirm against the complete file.
7462 """Generate ``hexdump -C`` like output
7466 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7467 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7469 Result of that function is a generator of lines, where each line is
7474 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7475 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7476 " |i..?.....G...k.j|"]
# Upper-case hex text of the whole input: two characters per byte
7480 hexed = hexenc(raw).upper()
# First column of every line is the 8 digit hexadecimal address
7481 addr, cols = 0, ["%08x " % 0]
# i walks the hex text two characters at a time, so i // 2 is the byte index
7482 for i in six_xrange(0, len(hexed), 2):
# Every 8th byte except the very first (action not visible in this listing)
7483 if i != 0 and i // 2 % 8 == 0:
# Every 16th byte except the very first: the ASCII column for the 16 bytes
# starting at ``addr`` is appended and a new line is started with its address
7485 if i != 0 and i // 2 % 16 == 0:
7486 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7489 cols = ["%08x " % addr]
# One column per byte: a space followed by its two hex digits
7490 cols.append(" " + hexed[i:i + 2])
# ASCII column for whatever remains from ``addr`` to the end of the data
7492 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
# NOTE(review): this listing omits a number of original lines of browse()
# (among them the docstring terminator, window headings inside the
# docstring, the import that provides ``urwid``, several ``if`` headers and
# closing brackets). Comments added below describe the visible lines only.
7496 def browse(raw, obj, oid_maps=()):
7497 """Interactive browser
7499 :param bytes raw: binary data you decoded
7500 :param obj: decoded :py:class:`pyderasn.Obj`
7501 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7502 Its human readable form is printed when OID is met
7504 .. note:: `urwid <http://urwid.org/>`__ dependency required
7506 This browser is an interactive terminal application for browsing
7507 structures of your decoded ASN.1 objects. You can quit it with **q**
7508 key. It consists of three windows:
7511 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7512 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7513 **Left** key goes to constructed element above. **Plus**/**Minus**
7514 keys collapse/uncollapse constructed elements. **Space** toggles it
7516 window with various information about element. You can scroll it
7517 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7519 window with raw data hexdump and highlighted current element's
7520 contents. It automatically focuses on element's data. You can
7521 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7522 speed) keys. If element has explicit tag, then it also will be
7523 highlighted with different colour
7525 Window's header contains current decode path and progress bars with
7526 position in *info* and *hexdump* windows.
7528 If you press **d**, then current element will be saved in the
7529 current directory under its decode path name (adding ".0", ".1", etc
7530 suffix if such file already exists). **D** will save it with explicit tag.
7532 You can also invoke it with ``--browse`` command line argument.
7534 from copy import deepcopy
7535 from os.path import exists as path_exists
# Tree widget of a single node. Besides drawing itself it refreshes the
# info and hexdump panes, reached through the shared ``state`` dictionary.
7538 class TW(urwid.TreeWidget):
7539 def __init__(self, state, *args, **kwargs):
# Per-widget flags: has the user already scrolled the given pane
7541 self.scrolled = {"info": False, "hexdump": False}
7542 super(TW, self).__init__(*args, **kwargs)
# (header of _get_pp missing from listing) Node value is either a single
# record (it has ``_fields``) or a sequence whose first item is the record;
# returns the pair (record, constructed flag).
7545 pp = self.get_node().get_value()
7546 constructed = len(pp) > 1
7547 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
# Redraws header, hexdump pane and info pane for this widget's record
7549 def _state_update(self):
7550 pp, _ = self._get_pp()
# Header shows the decode path joined with ":"
7551 self.state["decode_path"].set_text(
7552 ":".join(str(p) for p in pp.decode_path)
# Work on a deep copy, so highlighting never alters the shared hexdump lines
7554 lines = deepcopy(self.state["hexed"])
# Marks byte number ``i`` with display attribute ``attr``: 16 bytes per
# line, column 0 of every line is the address, hence the ``1 +``.
7556 def attr_set(i, attr):
7557 line = lines[i // 16]
7558 idx = 1 + (i - 16 * (i // 16))
7559 line[idx] = (attr, line[idx])
# Explicit tag header bytes (when present) get their own attribute
7561 if pp.expl_offset is not None:
7562 for i in six_xrange(
7564 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7566 attr_set(i, "select-expl")
# Whole TLV of the element itself
7567 for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7568 attr_set(i, "select-value")
7569 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
# Focus the hexdump line that contains the element's first byte
7570 self.state["hexdump"].set_focus(pp.offset // 16)
7571 self.state["hexdump"].set_focus_valign("middle")
# Progress bar: position of that line relative to the number of lines
7572 self.state["hexdump_bar"].set_completion(
7573 (100 * pp.offset // 16) //
7574 len(self.state["hexdump"]._body.positions())
# Rows of the info pane; ("header", text) pairs are styled labels
7578 [("header", "Name: "), pp.obj_name],
7579 [("header", "Type: "), pp.asn1_type_name],
7580 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7581 [("header", "[TLV]len: "), "%d/%d/%d" % (
7582 pp.tlen, pp.llen, pp.vlen,
7584 [("header", "TLVlen: "), "%d" % sum((
7585 pp.tlen, pp.llen, pp.vlen,
7587 [("header", "Slice: "), "[%d:%d]" % (
7588 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
# Warning rows; the conditions guarding each of them are not visible here
7592 lines.append([("warning", "LENINDEF")])
7594 lines.append([("warning", "BER encoded")])
7596 lines.append([("warning", "BERed")])
# Explicit tag details: class/number, and geometry when it was decoded
7597 if pp.expl is not None:
7598 lines.append([("header", "EXPLICIT")])
7599 klass, _, num = pp.expl
7600 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7601 if pp.expl_offset is not None:
7602 lines.append([" Offset: %d" % pp.expl_offset])
7603 lines.append([" [TLV]len: %d/%d/%d" % (
7604 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7606 lines.append([" TLVlen: %d" % sum((
7607 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7609 lines.append([" Slice: [%d:%d]" % (
7611 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7613 if pp.impl is not None:
7614 klass, _, num = pp.impl
7616 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7619 lines.append(["OPTIONAL"])
7621 lines.append(["DEFAULT"])
# When the last path component is a DecodePathDefBy, show the defining
# value, preferring a human readable name found through oid_maps
7622 if len(pp.decode_path) > 0:
7623 ent = pp.decode_path[-1]
7624 if isinstance(ent, DecodePathDefBy):
7626 value = str(ent.defined_by)
7627 oid_name = find_oid_name(
7628 ent.defined_by.asn1_type_name, oid_maps, value,
7630 lines.append([("header", "DEFINED BY: "), "%s" % (
7631 value if oid_name is None
7632 else "%s (%s)" % (oid_name, value)
7635 if pp.value is not None:
7636 lines.append([("header", "Value: "), pp.value])
# For object identifiers every map is consulted for a readable name
7638 len(oid_maps) > 0 and
7639 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7641 for oid_map in oid_maps:
7642 oid_name = oid_map.get(pp.value)
7643 if oid_name is not None:
7644 lines.append([("header", "Human: "), oid_name])
# Integers are additionally shown in decimal and colon separated hex
7646 if pp.asn1_type_name == Integer.asn1_type_name:
7648 ("header", "Decimal: "), "%d" % int(pp.obj),
7651 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
# Binary blobs: 32 hex characters (16 bytes) per row; tuple blobs are
# joined with ", " into a single row
7653 if pp.blob.__class__ == binary_type:
7654 blob = hexenc(pp.blob).upper()
7655 for i in six_xrange(0, len(blob), 32):
7656 lines.append([colonize_hex(blob[i:i + 32])])
7657 elif pp.blob.__class__ == tuple:
7658 lines.append([", ".join(pp.blob)])
7659 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7660 self.state["info_bar"].set_completion(0)
# Side effect in a query method: when this widget is not yet the current
# one it is recorded as current, its scroll flags are reset and the panes
# are refreshed.
7662 def selectable(self):
7663 if self.state["widget_current"] != self:
7664 self.state["widget_current"] = self
7665 self.scrolled["info"] = False
7666 self.scrolled["hexdump"] = False
7667 self._state_update()
7668 return super(TW, self).selectable()
# Label shown in the tree. Root (empty decode path) shows the class name,
# EOC records a fixed label, DecodePathDefBy entries a "DEFBY:" label.
# NOTE(review): the fall-through return is not visible in this listing.
7670 def get_display_text(self):
7671 pp, constructed = self._get_pp()
7672 style = "constructed" if constructed else ""
7673 if len(pp.decode_path) == 0:
7674 return (style, pp.obj_name)
7675 if pp.asn1_type_name == "EOC":
7676 return ("eoc", "EOC")
7677 ent = pp.decode_path[-1]
7678 if isinstance(ent, DecodePathDefBy):
7679 value = str(ent.defined_by)
7680 oid_name = find_oid_name(
7681 ent.defined_by.asn1_type_name, oid_maps, value,
7683 return ("defby", "DEFBY:" + (
7684 value if oid_name is None else oid_name
# Moves focus of pane ``what`` ("info" or "hexdump") by ``step`` lines,
# clamped to the valid positions, and updates that pane's progress bar.
7688 def _scroll(self, what, step):
7689 self.state[what]._invalidate()
7690 pos = self.state[what].focus_position
# First scroll since the widget became current is remembered
# (statement following at original line 7693 is not visible)
7691 if not self.scrolled[what]:
7692 self.scrolled[what] = True
7694 pos = max(0, pos + step)
7695 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7696 self.state[what].set_focus(pos)
7697 self.state[what].set_focus_valign("top")
7698 self.state[what + "_bar"].set_completion(
7699 (100 * pos) // len(self.state[what]._body.positions())
# Key dispatcher; unknown keys go to the parent class.
# NOTE(review): the ``if key ...`` headers for quitting and for toggling
# expansion are absent from this listing.
7702 def keypress(self, size, key):
7704 raise urwid.ExitMainLoop()
7707 self.expanded = not self.expanded
7708 self.update_expanded_icon()
# NOTE(review): the docstring of browse() speaks about "triple speed" for
# the upper-case keys, while the steps configured here are 5 lines.
7711 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7712 if key in hexdump_steps:
7713 self._scroll("hexdump", hexdump_steps[key])
7716 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7717 if key in info_steps:
7718 self._scroll("info", info_steps[key])
# Save current element to a file named after its decode path
7721 if key in ("d", "D"):
7722 pp, _ = self._get_pp()
7723 dp = ":".join(str(p) for p in pp.decode_path)
7724 dp = dp.replace(" ", "_")
# "d" saves the element's own TLV; "D" saves from the explicit tag on and
# falls back to the plain TLV when the element has no explicit tag
7727 if key == "d" or pp.expl_offset is None:
7728 data = self.state["raw"][pp.offset:(
7729 pp.offset + pp.tlen + pp.llen + pp.vlen
7732 data = self.state["raw"][pp.expl_offset:(
7733 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
# Candidate file name with numeric suffix, tried until a free one is found
# (loop header and the ctr == 0 case are not visible in this listing)
7737 def duplicate_path(dp, ctr):
7740 return "%s.%d" % (dp, ctr)
7743 if not path_exists(duplicate_path(dp, ctr)):
7746 dp = duplicate_path(dp, ctr)
7747 with open(dp, "wb") as fd:
# Header is reused to report where the data went
7749 self.state["decode_path"].set_text(
7750 ("warning", "Saved to: " + dp)
7753 return super(TW, self).keypress(size, key)
# Tree node. Its value is either a single record (has ``_fields``) or a
# sequence whose items after the first one are the children.
7755 class PN(urwid.ParentNode):
7756 def __init__(self, state, value, *args, **kwargs):
7758 if not hasattr(value, "_fields"):
7760 super(PN, self).__init__(value, *args, **kwargs)
7762 def load_widget(self):
7763 return TW(self.state, self)
# Child keys are 0 .. number of children - 1; single records take the
# branch whose body is not visible in this listing
7765 def load_child_keys(self):
7766 value = self.get_value()
7767 if hasattr(value, "_fields"):
7769 return range(len(value[1:]))
# Child number ``key`` lives at index key + 1, one level deeper
7771 def load_child_node(self, key):
7774 self.get_value()[key + 1],
7777 depth=self.get_depth() + 1,
# Progress bar whose text is prefixed with a label
7780 class LabeledPG(urwid.ProgressBar):
7781 def __init__(self, label, *args, **kwargs):
7783 super(LabeledPG, self).__init__(*args, **kwargs)
7786 return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
# Widgets of the screen; the panes start with a single empty text line
7788 WinHexdump = urwid.ListBox([urwid.Text("")])
7789 WinInfo = urwid.ListBox([urwid.Text("")])
7790 WinDecodePath = urwid.Text("", "center")
7791 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7792 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
# Tree pane; the dictionary below is the ``state`` shared by all widgets
7793 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
# Hexdump lines are computed once, here
7796 "hexed": list(hexdump(raw)),
7797 "widget_current": None,
7799 "info_bar": WinInfoBar,
7800 "hexdump": WinHexdump,
7801 "hexdump_bar": WinHexdumpBar,
7802 "decode_path": WinDecodePath,
# Footer text listing the key bindings
7806 help_text = " ".join((
7808 "space:(un)collapse",
7809 "(pg)up/down/home/end:nav",
7810 "jkJK:hexdump hlHL:info",
# Layout: tree with weight 1 next to a weight 2 pile of info and hexdump
7816 ("weight", 1, WinTree),
7817 ("weight", 2, urwid.Pile([
7818 urwid.LineBox(WinInfo),
7819 urwid.LineBox(WinHexdump),
7822 header=urwid.Columns([
7823 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7824 ("weight", 1, WinInfoBar),
7825 ("weight", 1, WinHexdumpBar),
7827 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
# Palette entries: (attribute name, foreground, background)
7830 ("header", "bold", ""),
7831 ("constructed", "bold", ""),
7832 ("help", "light magenta", ""),
7833 ("warning", "light red", ""),
7834 ("defby", "light red", ""),
7835 ("eoc", "dark red", ""),
7836 ("select-value", "light green", ""),
7837 ("select-expl", "light red", ""),
7838 ("pg-normal", "", "light blue"),
7839 ("pg-complete", "black", "yellow"),
# Command line entry point: parses arguments, reads the input file, picks a
# schema and a pretty printer, then either starts the browser, prints decode
# events one by one, or decodes everything and prints the result.
# NOTE(review): this listing omits many original lines (the argparse import,
# option names of most add_argument() calls, ``if``/``else`` headers and
# closing brackets); the comments below cover the visible lines only.
7844 def main(): # pragma: no cover
7846 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7847 parser.add_argument(
7851 help="Skip that number of bytes from the beginning",
7853 parser.add_argument(
7855 help="Python paths to dictionary with OIDs, comma separated",
7857 parser.add_argument(
7859 help="Python path to schema definition to use",
7861 parser.add_argument(
7862 "--defines-by-path",
7863 help="Python path to decoder's defines_by_path",
7865 parser.add_argument(
7867 action="store_true",
7868 help="Disallow BER encoding",
7870 parser.add_argument(
7871 "--print-decode-path",
7872 action="store_true",
7873 help="Print decode paths",
7875 parser.add_argument(
7876 "--decode-path-only",
7877 help="Print only specified decode path",
7879 parser.add_argument(
7881 action="store_true",
7882 help="Allow explicit tag out-of-bound",
7884 parser.add_argument(
7886 action="store_true",
7887 help="Turn on event generation mode",
7889 parser.add_argument(
7891 action="store_true",
7892 help="Start ASN.1 browser",
7894 parser.add_argument(
# Input file is opened by argparse in binary mode
7896 type=argparse.FileType("rb"),
7897 help="Path to BER/CER/DER file you want to decode",
7899 args = parser.parse_args()
# Two visible ways of obtaining the data: reading the whole file after
# seeking past the skipped bytes, or slicing a memory mapped file.
# NOTE(review): the condition choosing between them is not visible here.
7901 args.RAWFile.seek(args.skip)
7902 raw = memoryview(args.RAWFile.read())
7903 args.RAWFile.close()
7905 raw = file_mmaped(args.RAWFile)[args.skip:]
# One OID dictionary per comma separated Python path, or an empty tuple
7907 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7908 if args.oids else ()
7910 from functools import partial
# Explicit schema with the generic pprint, or the schema-less decoder pair
7912 schema = obj_by_path(args.schema)
7913 pprinter = partial(pprint, big_blobs=True)
7915 schema, pprinter = generic_decoder()
# Decoding context: BER is allowed unless it was disabled on the command line
7917 "bered": not args.nobered,
7918 "allow_expl_oob": args.allow_expl_oob,
7920 if args.defines_by_path is not None:
7921 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
# Browser mode: decode everything, hand it to browse(); the sys_exit import
# suggests the process exits afterwards (the call itself is not visible)
7923 obj, _ = schema().decode(raw, ctx=ctx)
7924 browse(raw, obj, oid_maps)
7925 from sys import exit as sys_exit
7927 from os import environ
# Colours are enabled unless the NO_COLOR environment variable is set
7931 with_colours=environ.get("NO_COLOR") is None,
7932 with_decode_path=args.print_decode_path,
# Decode path filter is given as a ":" separated string
7934 () if args.decode_path_only is None else
7935 tuple(args.decode_path_only.split(":"))
# Event generation mode prints every decoded object as soon as it is produced
7939 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7940 print(pprinter(obj, decode_path=decode_path))
# Otherwise the whole structure is decoded first and printed once
7942 obj, tail = schema().decode(raw, ctx=ctx)
7943 print(pprinter(obj))
# Trailing bytes are reported in hex (the guarding condition is not visible)
7945 print("\nTrailing data: %s" % hexenc(tail))
7948 if __name__ == "__main__":
7949 from pyderasn import *