3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, and to unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to a human readable string. In fact, exactly that function
251 is used for every object's ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows whether the object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 The former is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meets --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as one of the entities in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 It takes virtually no memory beyond what is needed to store a single
636 object. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 with the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 do not implement the buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms do not deal well with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that the provided CER
742 is valid -- you can only be sure that it is valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it into 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of those
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
908 .. autofunction:: pyderasn.browse
912 .. autoclass:: pyderasn.Obj
920 .. autoclass:: pyderasn.Boolean
925 .. autoclass:: pyderasn.Integer
926 :members: __init__, named, tohex
930 .. autoclass:: pyderasn.BitString
931 :members: __init__, bit_len, named
935 .. autoclass:: pyderasn.OctetString
940 .. autoclass:: pyderasn.Null
945 .. autoclass:: pyderasn.ObjectIdentifier
950 .. autoclass:: pyderasn.Enumerated
954 .. autoclass:: pyderasn.CommonString
958 .. autoclass:: pyderasn.NumericString
962 .. autoclass:: pyderasn.PrintableString
963 :members: __init__, allow_asterisk, allow_ampersand
967 .. autoclass:: pyderasn.IA5String
971 .. autoclass:: pyderasn.VisibleString
975 .. autoclass:: pyderasn.UTCTime
976 :members: __init__, todatetime
980 .. autoclass:: pyderasn.GeneralizedTime
981 :members: __init__, todatetime
988 .. autoclass:: pyderasn.Choice
989 :members: __init__, choice, value
993 .. autoclass:: PrimitiveTypes
997 .. autoclass:: pyderasn.Any
1005 .. autoclass:: pyderasn.Sequence
1010 .. autoclass:: pyderasn.Set
1015 .. autoclass:: pyderasn.SequenceOf
1020 .. autoclass:: pyderasn.SetOf
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1058 You can decode DER/BER files from the command line::
1060 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1062 If there is no schema for your file, you can try parsing it without one,
1063 but of course IMPLICIT tags will often make that impossible. Still, the
1064 result is good enough for the certificate above::
1066 $ python -m pyderasn path/to/file
1067 0 [1,3,1604] . >: SEQUENCE OF
1068 4 [1,3,1453] . . >: SEQUENCE OF
1069 8 [0,0, 5] . . . . >: [0] ANY
1070 . . . . . A0:03:02:01:02
1071 13 [1,1, 3] . . . . >: INTEGER 61595
1072 18 [1,1, 13] . . . . >: SEQUENCE OF
1073 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074 31 [1,1, 0] . . . . . . >: NULL
1075 33 [1,3, 274] . . . . >: SEQUENCE OF
1076 37 [1,1, 11] . . . . . . >: SET OF
1077 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1078 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1081 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1082 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1084 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086 . . . . . . . . . 61:2E:63:6F:6D:2F
1087 1461 [1,1, 13] . . >: SEQUENCE OF
1088 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089 1474 [1,1, 0] . . . . >: NULL
1090 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1091 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1098 If you have dictionaries mapping ObjectIdentifiers to names, like this
1099 example from ``tests/test_crts.py``::
1102 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1105 "2.5.4.10": "id-at-organizationName",
1106 "2.5.4.11": "id-at-organizationalUnitName",
1109 then you can pass it to the pretty printer to see human-readable OIDs::
1111 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1113 37 [1,1, 11] . . . . . . >: SET OF
1114 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1115 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1117 50 [1,1, 18] . . . . . . >: SET OF
1118 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1119 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121 70 [1,1, 18] . . . . . . >: SET OF
1122 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1123 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1130 Each decoded element has a so-called decode path: the sequence of
1131 structure names it passes through during the decode process. Each element
1132 has its own unique path inside the whole ASN.1 tree. You can print it out
1133 with the ``--print-decode-path`` option::
1135 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136 0 [1,3,1604] Certificate SEQUENCE []
1137 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1144 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150 . . . . . . . 13:02:45:53
1151 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1154 Now you can print only the specified tree, for example signature algorithm::
1156 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1158 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from mmap import mmap
1174 from mmap import PROT_READ
1175 from operator import attrgetter
1176 from string import ascii_letters
1177 from string import digits
1178 from sys import maxsize as sys_maxsize
1179 from sys import version_info
1180 from unicodedata import category as unicat
1182 from six import add_metaclass
1183 from six import binary_type
1184 from six import byte2int
1185 from six import indexbytes
1186 from six import int2byte
1187 from six import integer_types
1188 from six import iterbytes
1189 from six import iteritems
1190 from six import itervalues
1192 from six import string_types
1193 from six import text_type
1194 from six import unichr as six_unichr
1195 from six.moves import xrange as six_xrange
1199 from termcolor import colored
1200 except ImportError: # pragma: no cover
1201 def colored(what, *args, **kwargs):
1252 "TagClassApplication",
1255 "TagClassUniversal",
1256 "TagFormConstructed",
1267 TagClassUniversal = 0
1268 TagClassApplication = 1 << 6
1269 TagClassContext = 1 << 7
1270 TagClassPrivate = 1 << 6 | 1 << 7
1271 TagFormPrimitive = 0
1272 TagFormConstructed = 1 << 5
1274 TagClassContext: "",
1275 TagClassApplication: "APPLICATION ",
1276 TagClassPrivate: "PRIVATE ",
1277 TagClassUniversal: "UNIV ",
1281 LENINDEF = b"\x80" # length indefinite mark
1282 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1283 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1284 SET01 = frozenset("01")
1285 DECIMALS = frozenset(digits)
1286 DECIMAL_SIGNS = ".,"
1287 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
def file_mmaped(fd):
    """Make mmap-ed memoryview for reading from file

    :param fd: file object; only its ``fileno()`` is used
    :returns: memoryview over read-only mmap-ing of the whole file
    """
    # Zero length asks mmap to cover the file up to its current size
    mapping = mmap(fd.fileno(), 0, prot=PROT_READ)
    return memoryview(mapping)
def pureint(value):
    """Convert a string made only of decimal digits to an integer

    :param str value: string to convert
    :returns: integer value
    :raises ValueError: if any character is outside of ``DECIMALS``
    """
    # int() on its own would also accept signs and surrounding whitespace,
    # so the character set is checked explicitly first
    if not set(value) <= DECIMALS:
        raise ValueError("non-pure integer")
    return int(value)
def fractions2float(fractions_raw):
    """Turn the digits following a decimal mark into a float

    :param str fractions_raw: fractional digits, without the mark itself
    :returns: float value of ``0.<fractions_raw>``
    :raises ValueError: if ``fractions_raw`` is rejected by ``pureint``
    """
    # Called only for its validation; the integer result is not needed
    pureint(fractions_raw)
    as_decimal = "0." + fractions_raw
    return float(as_decimal)
def get_def_by_path(defines_by_path, sub_decode_path):
    """Get define by decode path

    :param defines_by_path: iterable of ``(path, define)`` pairs
    :param sub_decode_path: decode path to look up
    :returns: ``define`` of the first pair whose ``path`` has the same
              length as ``sub_decode_path`` and matches it element by
              element; None if nothing matches

    The builtin ``any`` placed inside ``path`` acts as a wildcard: it
    matches whatever stands at that position.
    """
    for path, define in defines_by_path:
        if len(path) != len(sub_decode_path):
            continue
        for p1, p2 in zip(path, sub_decode_path):
            if (p1 is not any) and (p1 != p2):
                break
        else:
            # Inner loop finished without break: every element matched
            return define
    return None
1323 ########################################################################
1325 ########################################################################
class ASN1Error(ValueError):
    """Common base class of the exceptions defined in this module"""
class DecodeError(ASN1Error):
    """Decoding failure, optionally annotated with class, path and offset"""

    def __init__(self, msg="", klass=None, decode_path=(), offset=0):
        """
        :param str msg: reason of decode failing
        :param klass: optional exact DecodeError inherited class (like
                      :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
                      :py:exc:`InvalidLength`)
        :param decode_path: tuple of strings. It contains human
                            readable names of the fields through which
                            decoding process has passed
        :param int offset: binary offset where failure happened
        """
        super(DecodeError, self).__init__()
        self.msg = msg
        self.klass = klass
        self.decode_path = decode_path
        self.offset = offset

    def __str__(self):
        # Candidate parts in display order; empty ones are left out so
        # that no doubled spaces appear in the result
        parts = (
            "" if self.klass is None else self.klass.__name__,
            (
                ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
                if len(self.decode_path) > 0 else ""
            ),
            ("(at %d)" % self.offset) if self.offset > 0 else "",
            self.msg,
        )
        return " ".join(c for c in parts if c != "")

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class NotEnoughData(DecodeError):
    """Input ended before the encoded element was complete"""
class ExceedingData(ASN1Error):
    """Unexpected data is left after the decoded object"""

    def __init__(self, nbytes):
        """
        :param int nbytes: number of trailing bytes
        """
        super(ExceedingData, self).__init__()
        self.nbytes = nbytes

    def __str__(self):
        return "%d trailing bytes" % self.nbytes

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class LenIndefForm(DecodeError):
    """Indefinite form of length encoding is met"""
class TagMismatch(DecodeError):
    # Adds no state of its own: only the type tells it apart from
    # a plain DecodeError
    pass
class InvalidLength(DecodeError):
    """Encoded length is not acceptable for the type being decoded"""
class InvalidOID(DecodeError):
    # Adds no state of its own: only the type tells it apart from
    # a plain DecodeError
    pass
class ObjUnknown(ASN1Error):
    """Referenced object is unknown"""

    def __init__(self, name):
        """
        :param name: name of the unknown object, used in the message
        """
        super(ObjUnknown, self).__init__()
        self.name = name

    def __str__(self):
        return "object is unknown: %s" % self.name

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class ObjNotReady(ASN1Error):
    """Object is not ready"""

    def __init__(self, name):
        """
        :param name: name of the object that is not ready, used in the
                     message
        """
        super(ObjNotReady, self).__init__()
        self.name = name

    def __str__(self):
        return "object is not ready: %s" % self.name

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class InvalidValueType(ASN1Error):
    """Value of unexpected type is supplied"""

    def __init__(self, expected_types):
        """
        :param expected_types: iterable of the acceptable types; their
                               ``repr()`` is listed in the message
        """
        super(InvalidValueType, self).__init__()
        self.expected_types = expected_types

    def __str__(self):
        return "invalid value type, expected: %s" % ", ".join(
            [repr(t) for t in self.expected_types]
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class BoundsError(ASN1Error):
    """Value does not satisfy its bounds"""

    def __init__(self, bound_min, value, bound_max):
        """
        :param bound_min: lower bound
        :param value: offending value
        :param bound_max: upper bound
        """
        super(BoundsError, self).__init__()
        self.bound_min = bound_min
        self.value = value
        self.bound_max = bound_max

    def __str__(self):
        return "unsatisfied bounds: %s <= %s <= %s" % (
            self.bound_min,
            self.value,
            self.bound_max,
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
1454 ########################################################################
1456 ########################################################################
# Codec functions are looked up once, at import time
_hexdecoder = getdecoder("hex")
_hexencoder = getencoder("hex")


def hexdec(data):
    """Hexadecimal string to binary data convert

    :param data: hexadecimal representation
    :returns: decoded binary data
    """
    # Codec functions return (result, consumed length) pairs
    return _hexdecoder(data)[0]


def hexenc(data):
    """Binary data to hexadecimal string convert

    :param data: binary data
    :returns: its hexadecimal representation, as a text string
    """
    # Codec gives bytes; they are pure ASCII hexadecimal digits
    return _hexencoder(data)[0].decode("ascii")
def int_bytes_len(num, byte_len=8):
    """How many ``byte_len``-bit groups are needed to hold ``num``

    :param int num: integer to measure
    :param int byte_len: number of payload bits in each group
    :returns: number of groups, at least one
    """
    if num == 0:
        # bit_length() of zero is 0, but zero still occupies one group
        return 1
    # Ceiling division done in integers, no float round-trip needed
    return (num.bit_length() + byte_len - 1) // byte_len
def zero_ended_encode(num):
    """Encode integer as big-endian 7-bit groups

    Every octet except the last one has its high bit set, so the
    first octet with a cleared high bit ends the value.

    :param int num: integer to encode
    :returns: encoded binary data, at least one octet long
    """
    # Groups are collected starting from the least significant one,
    # which is the only one with cleared high bit
    septets = [num & 0x7F]
    num >>= 7
    while num > 0:
        septets.append(0x80 | (num & 0x7F))
        num >>= 7
    septets.reverse()
    return bytes(bytearray(septets))
def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
    """Encode tag to binary form

    :param int num: tag's number
    :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
                      :py:data:`pyderasn.TagClassContext`,
                      :py:data:`pyderasn.TagClassApplication`,
                      :py:data:`pyderasn.TagClassPrivate`)
    :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
                     :py:data:`pyderasn.TagFormConstructed`)
    :returns: encoded tag
    """
    leading = klass | form
    if num < 31:
        # Short form, number fits in the first octet: [XX|X|.....]
        return int2byte(leading | num)
    # Long form, first octet has all five number bits set:
    # [XX|X|11111][1.......][1.......] ... [0.......]
    return int2byte(leading | 31) + zero_ended_encode(num)
def tag_decode(tag):
    """Decode tag from binary form

    .. warning::

       No validation is performed, assuming that it has already passed.

    It returns tuple with three integers, as
    :py:func:`pyderasn.tag_encode` accepts.
    """
    first_octet = byte2int(tag)
    klass = first_octet & 0xC0
    form = first_octet & 0x20
    short_num = first_octet & 0x1F
    if short_num < 0x1F:
        # Short form: number is held by the first octet itself
        return (klass, form, short_num)
    # Long form: number is spread over 7-bit groups of following octets
    num = 0
    for octet in iterbytes(tag[1:]):
        num <<= 7
        num |= octet & 0x7F
    return (klass, form, num)
def tag_ctxp(num):
    """Create CONTEXT PRIMITIVE tag

    :param int num: tag's number
    :returns: encoded tag
    """
    return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
def tag_ctxc(num):
    """Create CONTEXT CONSTRUCTED tag

    :param int num: tag's number
    :returns: encoded tag
    """
    return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
def tag_strip(data):
    """Take off tag from the data

    :returns: (encoded tag, tag length, remaining data)
    :raises NotEnoughData: if ``data`` is empty
    :raises DecodeError: if long form tag is not finished, or its
                         number starts with an all-zero 7-bit group
    """
    if len(data) == 0:
        raise NotEnoughData("no data at all")
    if byte2int(data) & 0x1F < 31:
        # Short form: the whole tag is a single octet
        return data[:1], 1, data[1:]
    # Long form: look for the first following octet with cleared high bit
    i = 0
    while True:
        i += 1
        if i == len(data):
            raise DecodeError("unfinished tag")
        if indexbytes(data, i) & 0x80 == 0:
            break
    if i > 1 and indexbytes(data, 1) & 0x7F == 0:
        raise DecodeError("leading zero byte in tag value")
    # Turn index of the last tag's octet into the tag's length
    i += 1
    return data[:i], i, data[i:]
def len_encode(l):
    """Encode length to binary form

    :param int l: length to encode
    :returns: encoded length
    """
    if l < 0x80:
        # Short form: single octet with cleared high bit
        return int2byte(l)
    # Long form: first octet holds the number of following octets,
    # which carry the big-endian length itself
    octets = bytearray(int_bytes_len(l) + 1)
    octets[0] = 0x80 | (len(octets) - 1)
    for i in six_xrange(len(octets) - 1, 0, -1):
        octets[i] = l & 0xFF
        l >>= 8
    return bytes(octets)
def len_decode(data):
    """Decode length

    :returns: (decoded length, length's length, remaining data)
    :raises LenIndefForm: if indefinite form encoding is met
    :raises NotEnoughData: if ``data`` is empty, or shorter than the
                           length field claims to be
    :raises DecodeError: if long form has leading zero octet, or it is
                         used for the value that fits the short form
    """
    if len(data) == 0:
        raise NotEnoughData("no data at all")
    first_octet = byte2int(data)
    if first_octet & 0x80 == 0:
        # Short form: the octet is the length itself
        return first_octet, 1, data[1:]
    octets_num = first_octet & 0x7F
    if octets_num + 1 > len(data):
        raise NotEnoughData("encoded length is longer than data")
    if octets_num == 0:
        raise LenIndefForm()
    if byte2int(data[1:]) == 0:
        raise DecodeError("leading zeros")
    l = 0
    for v in iterbytes(data[1:1 + octets_num]):
        l = (l << 8) | v
    if l <= 127:
        raise DecodeError("long form instead of short one")
    return l, 1 + octets_num, data[1 + octets_num:]
1604 LEN0 = len_encode(0)
1605 LEN1 = len_encode(1)
1606 LEN1K = len_encode(1000)
def len_size(l):
    """How many bytes length field will take

    :param int l: length to be encoded
    :returns: size of the encoded length field: 1 for lengths below
              128, otherwise one leading octet plus the octets of the
              length itself
    :raises OverflowError: if length does not fit in seven octets
    """
    if l < 128:
        return 1
    # Explicitly unrolled comparisons, no loop is involved
    if l < 1 << 8:
        return 2
    if l < 1 << 16:
        return 3
    if l < 1 << 24:
        return 4
    if l < 1 << 32:
        return 5
    if l < 1 << 40:
        return 6
    if l < 1 << 48:
        return 7
    if l < 1 << 56:
        return 8
    raise OverflowError("too big length")
def write_full(writer, data):
    """Fully write provided data

    :param writer: must comply with ``io.RawIOBase.write`` behaviour
    :param data: binary data to write
    :raises ValueError: if ``writer`` returns None

    BytesIO does not guarantee that the whole data will be written at
    once. This function writes everything provided, calling ``writer``
    again with the remaining part after each partial write.
    """
    # memoryview makes slicing of the remaining part copy-free
    view = memoryview(data)
    total = len(view)
    written = 0
    while written != total:
        n = writer(view[written:])
        if n is None:
            raise ValueError("can not write to buf")
        written += n
# If it is 64-bit system, then use compact 64-bit array of unsigned
# longs. Use an ordinary list with universal integers otherwise, that
# is slower
if sys_maxsize > 2 ** 32:
    def state_2pass_new():
        """Create empty state container for the 2-pass encoding"""
        return array("L")
else:
    def state_2pass_new():
        """Create empty state container for the 2-pass encoding"""
        return []
1660 ########################################################################
1662 ########################################################################
class AutoAddSlots(type):
    """Metaclass ensuring that created class has ``__slots__``

    Class that does not declare ``__slots__`` by itself gets an empty
    one; explicitly declared ``__slots__`` is left as is.
    """
    def __new__(cls, name, bases, _dict):
        _dict.setdefault("__slots__", ())
        return type.__new__(cls, name, bases, _dict)
1670 BasicState = namedtuple("BasicState", (
1683 ), **NAMEDTUPLE_KWARGS)
1686 @add_metaclass(AutoAddSlots)
1688 """Common ASN.1 object class
1690 All ASN.1 types are inherited from it. It has metaclass that
1691 automatically adds ``__slots__`` to all inherited classes.
1716 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1717 self._expl = getattr(self, "expl", None) if expl is None else expl
1718 if self.tag != self.tag_default and self._expl is not None:
1719 raise ValueError("implicit and explicit tags can not be set simultaneously")
1720 if self.tag is None:
1721 self._tag_order = None
1723 tag_class, _, tag_num = tag_decode(
1724 self.tag if self._expl is None else self._expl
1726 self._tag_order = (tag_class, tag_num)
1727 if default is not None:
1729 self.optional = optional
1730 self.offset, self.llen, self.vlen = _decoded
1732 self.expl_lenindef = False
1733 self.lenindef = False
1734 self.ber_encoded = False
1737 def ready(self): # pragma: no cover
1738 """Is object ready to be encoded?
1740 raise NotImplementedError()
1742 def _assert_ready(self):
1744 raise ObjNotReady(self.__class__.__name__)
1748 """Is either object or any elements inside is BER encoded?
1750 return self.expl_lenindef or self.lenindef or self.ber_encoded
1754 """Is object decoded?
1756 return (self.llen + self.vlen) > 0
1758 def __getstate__(self): # pragma: no cover
1759 """Used for making safe to be mutable pickleable copies
1761 raise NotImplementedError()
1763 def __setstate__(self, state):
1764 if state.version != __version__:
1765 raise ValueError("data is pickled by different PyDERASN version")
1766 self.tag = state.tag
1767 self._tag_order = state.tag_order
1768 self._expl = state.expl
1769 self.default = state.default
1770 self.optional = state.optional
1771 self.offset = state.offset
1772 self.llen = state.llen
1773 self.vlen = state.vlen
1774 self.expl_lenindef = state.expl_lenindef
1775 self.lenindef = state.lenindef
1776 self.ber_encoded = state.ber_encoded
1779 def tag_order(self):
1780 """Tag's (class, number) used for DER/CER sorting
1782 return self._tag_order
1785 def tag_order_cer(self):
1786 return self.tag_order
1790 """.. seealso:: :ref:`decoding`
1792 return len(self.tag)
1796 """.. seealso:: :ref:`decoding`
1798 return self.tlen + self.llen + self.vlen
1800 def __str__(self): # pragma: no cover
1801 return self.__bytes__() if PY2 else self.__unicode__()
1803 def __ne__(self, their):
1804 return not(self == their)
1806 def __gt__(self, their): # pragma: no cover
1807 return not(self < their)
1809 def __le__(self, their): # pragma: no cover
1810 return (self == their) or (self < their)
1812 def __ge__(self, their): # pragma: no cover
1813 return (self == their) or (self > their)
1815 def _encode(self): # pragma: no cover
1816 raise NotImplementedError()
1818 def _encode_cer(self, writer):
1819 write_full(writer, self._encode())
1821 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1822 yield NotImplemented
1824 def _encode1st(self, state):
1825 raise NotImplementedError()
1827 def _encode2nd(self, writer, state_iter):
1828 raise NotImplementedError()
1831 """DER encode the structure
1833 :returns: DER representation
1835 raw = self._encode()
1836 if self._expl is None:
1838 return b"".join((self._expl, len_encode(len(raw)), raw))
1840 def encode1st(self, state=None):
1841 """Do the 1st pass of 2-pass encoding
1843 :rtype: (int, array("L"))
1844 :returns: full length of encoded data and precalculated various
1848 state = state_2pass_new()
1849 if self._expl is None:
1850 return self._encode1st(state)
1852 idx = len(state) - 1
1853 vlen, _ = self._encode1st(state)
1855 fulllen = len(self._expl) + len_size(vlen) + vlen
1856 return fulllen, state
1858 def encode2nd(self, writer, state_iter):
1859 """Do the 2nd pass of 2-pass encoding
1861 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1862 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1864 if self._expl is None:
1865 self._encode2nd(writer, state_iter)
1867 write_full(writer, self._expl + len_encode(next(state_iter)))
1868 self._encode2nd(writer, state_iter)
1870 def encode_cer(self, writer):
1871 """CER encode the structure to specified writer
1873 :param writer: must comply with ``io.RawIOBase.write``
1874 behaviour. It takes slice to be written and
1875 returns number of bytes processed. If it returns
1876 None, then exception will be raised
1878 if self._expl is not None:
1879 write_full(writer, self._expl + LENINDEF)
1880 if getattr(self, "der_forced", False):
1881 write_full(writer, self._encode())
1883 self._encode_cer(writer)
1884 if self._expl is not None:
1885 write_full(writer, EOC)
1887 def hexencode(self):
1888 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1890 return hexenc(self.encode())
1900 _ctx_immutable=True,
1904 :param data: either binary or memoryview
1905 :param int offset: initial data's offset
1906 :param bool leavemm: do we need to leave memoryview of remaining
1907 data as is, or convert it to bytes otherwise
1908 :param decode_path: current decode path (tuples of strings,
1909 possibly with DecodePathDefBy) with will be
1910 the root for all underlying objects
1911 :param ctx: optional :ref:`context <ctx>` governing decoding process
1912 :param bool tag_only: decode only the tag, without length and
1913 contents (used only in Choice and Set
1914 structures, trying to determine if tag satisfies
1916 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1918 :returns: (Obj, remaining data)
1920 .. seealso:: :ref:`decoding`
1922 result = next(self.decode_evgen(
1934 _, obj, tail = result
1945 _ctx_immutable=True,
1948 """Decode with evgen mode on
1950 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1951 it returns the generator producing ``(decode_path, obj, tail)``
1953 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1957 elif _ctx_immutable:
1959 tlv = memoryview(data)
1962 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1965 if self._expl is None:
1966 for result in self._decode(
1969 decode_path=decode_path,
1972 evgen_mode=_evgen_mode,
1977 _decode_path, obj, tail = result
1978 if _decode_path is not decode_path:
1982 t, tlen, lv = tag_strip(tlv)
1983 except DecodeError as err:
1984 raise err.__class__(
1986 klass=self.__class__,
1987 decode_path=decode_path,
1992 klass=self.__class__,
1993 decode_path=decode_path,
1997 l, llen, v = len_decode(lv)
1998 except LenIndefForm as err:
1999 if not ctx.get("bered", False):
2000 raise err.__class__(
2002 klass=self.__class__,
2003 decode_path=decode_path,
2007 offset += tlen + llen
2008 for result in self._decode(
2011 decode_path=decode_path,
2014 evgen_mode=_evgen_mode,
2016 if tag_only: # pragma: no cover
2019 _decode_path, obj, tail = result
2020 if _decode_path is not decode_path:
2022 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2023 if eoc_expected.tobytes() != EOC:
2026 klass=self.__class__,
2027 decode_path=decode_path,
2031 obj.expl_lenindef = True
2032 except DecodeError as err:
2033 raise err.__class__(
2035 klass=self.__class__,
2036 decode_path=decode_path,
2041 raise NotEnoughData(
2042 "encoded length is longer than data",
2043 klass=self.__class__,
2044 decode_path=decode_path,
2047 for result in self._decode(
2049 offset=offset + tlen + llen,
2050 decode_path=decode_path,
2053 evgen_mode=_evgen_mode,
2055 if tag_only: # pragma: no cover
2058 _decode_path, obj, tail = result
2059 if _decode_path is not decode_path:
2061 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2063 "explicit tag out-of-bound, longer than data",
2064 klass=self.__class__,
2065 decode_path=decode_path,
2068 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2070 def decod(self, data, offset=0, decode_path=(), ctx=None):
2071 """Decode the data, check that tail is empty
2073 :raises ExceedingData: if tail is not empty
2075 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2076 (decode without tail) that also checks that there is no
2079 obj, tail = self.decode(
2082 decode_path=decode_path,
2087 raise ExceedingData(len(tail))
2090 def hexdecode(self, data, *args, **kwargs):
2091 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2093 return self.decode(hexdec(data), *args, **kwargs)
2095 def hexdecod(self, data, *args, **kwargs):
2096 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2098 return self.decod(hexdec(data), *args, **kwargs)
2102 """.. seealso:: :ref:`decoding`
2104 return self._expl is not None
2108 """.. seealso:: :ref:`decoding`
2113 def expl_tlen(self):
2114 """.. seealso:: :ref:`decoding`
2116 return len(self._expl)
2119 def expl_llen(self):
2120 """.. seealso:: :ref:`decoding`
2122 if self.expl_lenindef:
2124 return len(len_encode(self.tlvlen))
2127 def expl_offset(self):
2128 """.. seealso:: :ref:`decoding`
2130 return self.offset - self.expl_tlen - self.expl_llen
2133 def expl_vlen(self):
2134 """.. seealso:: :ref:`decoding`
2139 def expl_tlvlen(self):
2140 """.. seealso:: :ref:`decoding`
2142 return self.expl_tlen + self.expl_llen + self.expl_vlen
2145 def fulloffset(self):
2146 """.. seealso:: :ref:`decoding`
2148 return self.expl_offset if self.expled else self.offset
2152 """.. seealso:: :ref:`decoding`
2154 return self.expl_tlvlen if self.expled else self.tlvlen
2156 def pps_lenindef(self, decode_path):
2157 if self.lenindef and not (
2158 getattr(self, "defined", None) is not None and
2159 self.defined[1].lenindef
2162 asn1_type_name="EOC",
2164 decode_path=decode_path,
2166 self.offset + self.tlvlen -
2167 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2175 if self.expl_lenindef:
2177 asn1_type_name="EOC",
2178 obj_name="EXPLICIT",
2179 decode_path=decode_path,
2180 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
def encode_cer(obj):
    """Encode to CER in memory buffer

    :param obj: object having ``encode_cer(writer)`` method
    :returns bytes: memory buffer contents
    """
    buf = BytesIO()
    obj.encode_cer(buf.write)
    return buf.getvalue()
def encode2pass(obj):
    """Encode (2-pass mode) to DER in memory buffer

    :param obj: object having ``encode1st()`` and
                ``encode2nd(writer, state_iter)`` methods
    :returns bytes: memory buffer contents
    """
    buf = BytesIO()
    # 1st pass gives the state, that 2nd pass consumes as an iterator
    _, state = obj.encode1st()
    obj.encode2nd(buf.write, iter(state))
    return buf.getvalue()
class DecodePathDefBy(object):
    """DEFINED BY representation inside decode path

    Two instances are equal when their ``defined_by`` values are equal;
    instance is never equal to an object of some other class.
    """
    __slots__ = ("defined_by",)

    def __init__(self, defined_by):
        self.defined_by = defined_by

    def __ne__(self, their):
        return not(self == their)

    def __eq__(self, their):
        if not isinstance(their, self.__class__):
            return False
        return self.defined_by == their.defined_by

    def __str__(self):
        return "DEFINED BY " + str(self.defined_by)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2233 ########################################################################
2235 ########################################################################
2237 PP = namedtuple("PP", (
2260 ), **NAMEDTUPLE_KWARGS)
2265 asn1_type_name="unknown",
2282 expl_lenindef=False,
def _colourize(what, colour, with_colours, attrs=("bold",)):
    """Colourize the string, if colours are enabled

    :param what: string to colourize
    :param colour: colour, passed to ``colored`` as is
    :param with_colours: if it is false, then ``what`` is returned
                         untouched
    :param attrs: attributes, passed to ``colored`` as is
    """
    if with_colours:
        return colored(what, colour, attrs=attrs)
    return what
def colonize_hex(hexed):
    """Separate hexadecimal string with colons

    :param hexed: hexadecimal string
    :returns: its successive two-character pieces, joined with colons
    """
    return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
def find_oid_name(asn1_type_name, oid_maps, value):
    """Look for human readable name of the OID

    :param asn1_type_name: ASN.1 type name of the value; lookup is done
                           only if it is ObjectIdentifier's one
    :param oid_maps: sequence of dictionaries to search through
    :param value: key to look for in each dictionary
    :returns: value found in the first dictionary holding a non-None
              entry for the key; None if there is no such dictionary
    """
    if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
        for oid_map in oid_maps:
            oid_name = oid_map.get(value)
            if oid_name is not None:
                return oid_name
    return None
2338 with_decode_path=False,
2339 decode_path_len_decrease=0,
2346 " " if pp.expl_offset is None else
2347 ("-%d" % (pp.offset - pp.expl_offset))
2349 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2351 col = _colourize(col, "red", with_colours, ())
2352 col += _colourize("B", "red", with_colours) if pp.bered else " "
2354 col = "[%d,%d,%4d]%s" % (
2355 pp.tlen, pp.llen, pp.vlen,
2356 LENINDEF_PP_CHAR if pp.lenindef else " "
2358 col = _colourize(col, "green", with_colours, ())
2360 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2361 if decode_path_len > 0:
2362 cols.append(" ." * decode_path_len)
2363 ent = pp.decode_path[-1]
2364 if isinstance(ent, DecodePathDefBy):
2365 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2366 value = str(ent.defined_by)
2367 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2368 if oid_name is None:
2369 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2371 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2373 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2374 if pp.expl is not None:
2375 klass, _, num = pp.expl
2376 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2377 cols.append(_colourize(col, "blue", with_colours))
2378 if pp.impl is not None:
2379 klass, _, num = pp.impl
2380 col = "[%s%d]" % (TagClassReprs[klass], num)
2381 cols.append(_colourize(col, "blue", with_colours))
2382 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2383 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2385 cols.append(_colourize("BER", "red", with_colours))
2386 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2387 if pp.value is not None:
2389 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2390 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2391 if oid_name is not None:
2392 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2393 if pp.asn1_type_name == Integer.asn1_type_name:
2394 cols.append(_colourize(
2395 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2398 if pp.blob.__class__ == binary_type:
2399 cols.append(hexenc(pp.blob))
2400 elif pp.blob.__class__ == tuple:
2401 cols.append(", ".join(pp.blob))
2403 cols.append(_colourize("OPTIONAL", "red", with_colours))
2405 cols.append(_colourize("DEFAULT", "red", with_colours))
2406 if with_decode_path:
2407 cols.append(_colourize(
2408 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2412 return " ".join(cols)
def pp_console_blob(pp, decode_path_len_decrease=0):
    """Generate console rows holding the blob of pretty printed entry

    :param pp: entry having ``decode_path`` and ``blob`` attributes
    :param int decode_path_len_decrease: how many leading elements of
                                         decode path are not taken into
                                         account for the indentation

    Binary blob is yielded as uppercase colon separated hexadecimal
    rows, 32 hexadecimal digits in each; tuple blob is yielded as
    a single comma separated row. Nothing is yielded for the blob of
    any other class.
    """
    # Blank column as wide as the offset/lengths one of ordinary row
    prefix = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
    depth = len(pp.decode_path) - decode_path_len_decrease
    if depth > 0:
        prefix.append(" ." * (depth + 1))
    if pp.blob.__class__ == binary_type:
        hexed = hexenc(pp.blob).upper()
        for start in six_xrange(0, len(hexed), 32):
            piece = hexed[start:start + 32]
            yield " ".join(prefix + [colonize_hex(piece)])
    elif pp.blob.__class__ == tuple:
        joined = ", ".join(pp.blob)
        yield " ".join(prefix + [joined])
2434 with_decode_path=False,
2435 decode_path_only=(),
2438 """Pretty print object
2440 :param Obj obj: object you want to pretty print
2441 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2442 Its human readable form is printed when OID is met
2443 :param big_blobs: if large binary objects are met (like OctetString
2444 values), do we need to print them too, on separate
2446 :param with_colours: colourize output, if ``termcolor`` library
2448 :param with_decode_path: print decode path
2449 :param decode_path_only: print only that specified decode path
2451 def _pprint_pps(pps):
2453 if hasattr(pp, "_fields"):
2455 decode_path_only != () and
2457 str(p) for p in pp.decode_path[:len(decode_path_only)]
2458 ) != decode_path_only
2462 yield pp_console_row(
2467 with_colours=with_colours,
2468 with_decode_path=with_decode_path,
2469 decode_path_len_decrease=len(decode_path_only),
2471 for row in pp_console_blob(
2473 decode_path_len_decrease=len(decode_path_only),
2477 yield pp_console_row(
2482 with_colours=with_colours,
2483 with_decode_path=with_decode_path,
2484 decode_path_len_decrease=len(decode_path_only),
2487 for row in _pprint_pps(pp):
2489 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2492 ########################################################################
2493 # ASN.1 primitive types
2494 ########################################################################
2496 BooleanState = namedtuple(
2498 BasicState._fields + ("value",),
2504 """``BOOLEAN`` boolean type
2506 >>> b = Boolean(True)
2508 >>> b == Boolean(True)
2514 tag_default = tag_encode(1)
2515 asn1_type_name = "BOOLEAN"
2527 :param value: set the value. Either boolean type, or
2528 :py:class:`pyderasn.Boolean` object
2529 :param bytes impl: override default tag with ``IMPLICIT`` one
2530 :param bytes expl: override default tag with ``EXPLICIT`` one
2531 :param default: set default value. Type same as in ``value``
2532 :param bool optional: is object ``OPTIONAL`` in sequence
2534 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2535 self._value = None if value is None else self._value_sanitize(value)
2536 if default is not None:
2537 default = self._value_sanitize(default)
2538 self.default = self.__class__(
2544 self._value = default
2546 def _value_sanitize(self, value):
2547 if value.__class__ == bool:
2549 if issubclass(value.__class__, Boolean):
2551 raise InvalidValueType((self.__class__, bool))
2555 return self._value is not None
2557 def __getstate__(self):
2558 return BooleanState(
def __setstate__(self, state):
    """Restore the object from a state tuple

    The base class restores its own fields first, then the BOOLEAN
    specific ``value`` field is taken from the state.

    :param state: state object having ``value`` attribute
    """
    super(Boolean, self).__setstate__(state)
    restored = state.value
    self._value = restored
2578 def __nonzero__(self):
2579 self._assert_ready()
2583 self._assert_ready()
2586 def __eq__(self, their):
2587 if their.__class__ == bool:
2588 return self._value == their
2589 if not issubclass(their.__class__, Boolean):
2592 self._value == their._value and
2593 self.tag == their.tag and
2594 self._expl == their._expl
2605 return self.__class__(
2607 impl=self.tag if impl is None else impl,
2608 expl=self._expl if expl is None else expl,
2609 default=self.default if default is None else default,
2610 optional=self.optional if optional is None else optional,
2614 self._assert_ready()
2615 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2617 def _encode1st(self, state):
2618 return len(self.tag) + 2, state
def _encode2nd(self, writer, state_iter):
    """Write the encoded BOOLEAN to the writer

    :param writer: passed to ``write_full`` together with the encoding
    :param state_iter: not used by this type
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
2624 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2626 t, _, lv = tag_strip(tlv)
2627 except DecodeError as err:
2628 raise err.__class__(
2630 klass=self.__class__,
2631 decode_path=decode_path,
2636 klass=self.__class__,
2637 decode_path=decode_path,
2644 l, _, v = len_decode(lv)
2645 except DecodeError as err:
2646 raise err.__class__(
2648 klass=self.__class__,
2649 decode_path=decode_path,
2653 raise InvalidLength(
2654 "Boolean's length must be equal to 1",
2655 klass=self.__class__,
2656 decode_path=decode_path,
2660 raise NotEnoughData(
2661 "encoded length is longer than data",
2662 klass=self.__class__,
2663 decode_path=decode_path,
2666 first_octet = byte2int(v)
2668 if first_octet == 0:
2670 elif first_octet == 0xFF:
2672 elif ctx.get("bered", False):
2677 "unacceptable Boolean value",
2678 klass=self.__class__,
2679 decode_path=decode_path,
2682 obj = self.__class__(
2686 default=self.default,
2687 optional=self.optional,
2688 _decoded=(offset, 1, 1),
2690 obj.ber_encoded = ber_encoded
2691 yield decode_path, obj, v[1:]
2694 return pp_console_row(next(self.pps()))
2696 def pps(self, decode_path=()):
2699 asn1_type_name=self.asn1_type_name,
2700 obj_name=self.__class__.__name__,
2701 decode_path=decode_path,
2702 value=str(self._value) if self.ready else None,
2703 optional=self.optional,
2704 default=self == self.default,
2705 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2706 expl=None if self._expl is None else tag_decode(self._expl),
2711 expl_offset=self.expl_offset if self.expled else None,
2712 expl_tlen=self.expl_tlen if self.expled else None,
2713 expl_llen=self.expl_llen if self.expled else None,
2714 expl_vlen=self.expl_vlen if self.expled else None,
2715 expl_lenindef=self.expl_lenindef,
2716 ber_encoded=self.ber_encoded,
2719 for pp in self.pps_lenindef(decode_path):
2723 IntegerState = namedtuple(
2725 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2731 """``INTEGER`` integer type
2733 >>> b = Integer(-123)
2735 >>> b == Integer(-123)
2740 >>> Integer(2, bounds=(1, 3))
2742 >>> Integer(5, bounds=(1, 3))
2743 Traceback (most recent call last):
2744 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2748 class Version(Integer):
2755 >>> v = Version("v1")
2762 {'v3': 2, 'v1': 0, 'v2': 1}
2764 __slots__ = ("specs", "_bound_min", "_bound_max")
2765 tag_default = tag_encode(2)
2766 asn1_type_name = "INTEGER"
2780 :param value: set the value. Either integer type, named value
2781 (if ``schema`` is specified in the class), or
2782 :py:class:`pyderasn.Integer` object
2783 :param bounds: set ``(MIN, MAX)`` value constraint.
2784 (-inf, +inf) by default
2785 :param bytes impl: override default tag with ``IMPLICIT`` one
2786 :param bytes expl: override default tag with ``EXPLICIT`` one
2787 :param default: set default value. Type same as in ``value``
2788 :param bool optional: is object ``OPTIONAL`` in sequence
2790 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2792 specs = getattr(self, "schema", {}) if _specs is None else _specs
2793 self.specs = specs if specs.__class__ == dict else dict(specs)
2794 self._bound_min, self._bound_max = getattr(
2797 (float("-inf"), float("+inf")),
2798 ) if bounds is None else bounds
2799 if value is not None:
2800 self._value = self._value_sanitize(value)
2801 if default is not None:
2802 default = self._value_sanitize(default)
2803 self.default = self.__class__(
2809 if self._value is None:
2810 self._value = default
2812 def _value_sanitize(self, value):
2813 if isinstance(value, integer_types):
2815 elif issubclass(value.__class__, Integer):
2816 value = value._value
2817 elif value.__class__ == str:
2818 value = self.specs.get(value)
2820 raise ObjUnknown("integer value: %s" % value)
2822 raise InvalidValueType((self.__class__, int, str))
2823 if not self._bound_min <= value <= self._bound_max:
2824 raise BoundsError(self._bound_min, value, self._bound_max)
2829 return self._value is not None
2831 def __getstate__(self):
2832 return IntegerState(
def __setstate__(self, state):
    """Restore the object from a state tuple

    The base class restores its own fields first, then the INTEGER
    specific ones (named values, value itself and both bounds) are
    taken from the state.

    :param state: state object having ``specs``, ``value``,
                  ``bound_min`` and ``bound_max`` attributes
    """
    super(Integer, self).__setstate__(state)
    self.specs = state.specs
    self._value = state.value
    # Bounds are restored as a pair
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
2859 self._assert_ready()
2860 return int(self._value)
2863 """Hexadecimal representation
2865 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2867 hex_repr = hex(int(self))[2:].upper()
2868 if len(hex_repr) % 2 != 0:
2869 hex_repr = "0" + hex_repr
2873 self._assert_ready()
2874 return hash(b"".join((
2876 bytes(self._expl or b""),
2877 str(self._value).encode("ascii"),
2880 def __eq__(self, their):
2881 if isinstance(their, integer_types):
2882 return self._value == their
2883 if not issubclass(their.__class__, Integer):
2886 self._value == their._value and
2887 self.tag == their.tag and
2888 self._expl == their._expl
2891 def __lt__(self, their):
2892 return self._value < their._value
2896 """Return named representation (if exists) of the value
2898 for name, value in iteritems(self.specs):
2899 if value == self._value:
2912 return self.__class__(
2915 (self._bound_min, self._bound_max)
2916 if bounds is None else bounds
2918 impl=self.tag if impl is None else impl,
2919 expl=self._expl if expl is None else expl,
2920 default=self.default if default is None else default,
2921 optional=self.optional if optional is None else optional,
2925 def _encode_payload(self):
2926 self._assert_ready()
2930 octets = bytearray([0])
2934 octets = bytearray()
2936 octets.append((value & 0xFF) ^ 0xFF)
2938 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2941 octets = bytearray()
2943 octets.append(value & 0xFF)
2945 if octets[-1] & 0x80 > 0:
2948 octets = bytes(octets)
2950 bytes_len = ceil(value.bit_length() / 8) or 1
2953 octets = value.to_bytes(
2958 except OverflowError:
2965 octets = self._encode_payload()
2966 return b"".join((self.tag, len_encode(len(octets)), octets))
def _encode1st(self, state):
    """Return the size of the encoded INTEGER and the untouched state

    The size is the tag length, plus the size of the length field,
    plus the payload length.

    :param state: returned as is
    :returns: ``(size, state)`` tuple
    """
    payload_len = len(self._encode_payload())
    header_len = len(self.tag) + len_size(payload_len)
    return header_len + payload_len, state
def _encode2nd(self, writer, state_iter):
    """Write the encoded INTEGER to the writer

    :param writer: passed to ``write_full`` together with the encoding
    :param state_iter: not used by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
2975 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2977 t, _, lv = tag_strip(tlv)
2978 except DecodeError as err:
2979 raise err.__class__(
2981 klass=self.__class__,
2982 decode_path=decode_path,
2987 klass=self.__class__,
2988 decode_path=decode_path,
2995 l, llen, v = len_decode(lv)
2996 except DecodeError as err:
2997 raise err.__class__(
2999 klass=self.__class__,
3000 decode_path=decode_path,
3004 raise NotEnoughData(
3005 "encoded length is longer than data",
3006 klass=self.__class__,
3007 decode_path=decode_path,
3011 raise NotEnoughData(
3013 klass=self.__class__,
3014 decode_path=decode_path,
3017 v, tail = v[:l], v[l:]
3018 first_octet = byte2int(v)
3020 second_octet = byte2int(v[1:])
3022 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3023 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3026 "non normalized integer",
3027 klass=self.__class__,
3028 decode_path=decode_path,
3033 if first_octet & 0x80 > 0:
3034 octets = bytearray()
3035 for octet in bytearray(v):
3036 octets.append(octet ^ 0xFF)
3037 for octet in octets:
3038 value = (value << 8) | octet
3042 for octet in bytearray(v):
3043 value = (value << 8) | octet
3045 value = int.from_bytes(v, byteorder="big", signed=True)
3047 obj = self.__class__(
3049 bounds=(self._bound_min, self._bound_max),
3052 default=self.default,
3053 optional=self.optional,
3055 _decoded=(offset, llen, l),
3057 except BoundsError as err:
3060 klass=self.__class__,
3061 decode_path=decode_path,
3064 yield decode_path, obj, tail
3067 return pp_console_row(next(self.pps()))
3069 def pps(self, decode_path=()):
3072 asn1_type_name=self.asn1_type_name,
3073 obj_name=self.__class__.__name__,
3074 decode_path=decode_path,
3075 value=(self.named or str(self._value)) if self.ready else None,
3076 optional=self.optional,
3077 default=self == self.default,
3078 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3079 expl=None if self._expl is None else tag_decode(self._expl),
3084 expl_offset=self.expl_offset if self.expled else None,
3085 expl_tlen=self.expl_tlen if self.expled else None,
3086 expl_llen=self.expl_llen if self.expled else None,
3087 expl_vlen=self.expl_vlen if self.expled else None,
3088 expl_lenindef=self.expl_lenindef,
3091 for pp in self.pps_lenindef(decode_path):
3095 BitStringState = namedtuple(
3097 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3102 class BitString(Obj):
3103 """``BIT STRING`` bit string type
3105 >>> BitString(b"hello world")
3106 BIT STRING 88 bits 68656c6c6f20776f726c64
3109 >>> b == b"hello world"
3114 >>> BitString("'0A3B5F291CD'H")
3115 BIT STRING 44 bits 0a3b5f291cd0
3116 >>> b = BitString("'010110000000'B")
3117 BIT STRING 12 bits 5800
3120 >>> b[0], b[1], b[2], b[3]
3121 (False, True, False, True)
3125 [False, True, False, True, True, False, False, False, False, False, False, False]
3129 class KeyUsage(BitString):
3131 ("digitalSignature", 0),
3132 ("nonRepudiation", 1),
3133 ("keyEncipherment", 2),
3136 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3137 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3139 ['nonRepudiation', 'keyEncipherment']
3141 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3145 Pay attention that BIT STRING can be encoded both in primitive
3146 and constructed forms. Decoder always checks constructed form tag
3147 additionally to specified primitive one. If BER decoding is
3148 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3149 of DER restrictions.
3151 __slots__ = ("tag_constructed", "specs", "defined")
3152 tag_default = tag_encode(3)
3153 asn1_type_name = "BIT STRING"
3166 :param value: set the value. Either binary type, tuple of named
3167 values (if ``schema`` is specified in the class),
3168 string in ``'XXX...'B`` form, or
3169 :py:class:`pyderasn.BitString` object
3170 :param bytes impl: override default tag with ``IMPLICIT`` one
3171 :param bytes expl: override default tag with ``EXPLICIT`` one
3172 :param default: set default value. Type same as in ``value``
3173 :param bool optional: is object ``OPTIONAL`` in sequence
3175 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3176 specs = getattr(self, "schema", {}) if _specs is None else _specs
3177 self.specs = specs if specs.__class__ == dict else dict(specs)
3178 self._value = None if value is None else self._value_sanitize(value)
3179 if default is not None:
3180 default = self._value_sanitize(default)
3181 self.default = self.__class__(
3187 self._value = default
3189 tag_klass, _, tag_num = tag_decode(self.tag)
3190 self.tag_constructed = tag_encode(
3192 form=TagFormConstructed,
3196 def _bits2octets(self, bits):
3197 if len(self.specs) > 0:
3198 bits = bits.rstrip("0")
3200 bits += "0" * ((8 - (bit_len % 8)) % 8)
3201 octets = bytearray(len(bits) // 8)
3202 for i in six_xrange(len(octets)):
3203 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3204 return bit_len, bytes(octets)
3206 def _value_sanitize(self, value):
3207 if isinstance(value, (string_types, binary_type)):
3209 isinstance(value, string_types) and
3210 value.startswith("'")
3212 if value.endswith("'B"):
3214 if not frozenset(value) <= SET01:
3215 raise ValueError("B's coding contains unacceptable chars")
3216 return self._bits2octets(value)
3217 if value.endswith("'H"):
3221 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3223 if value.__class__ == binary_type:
3224 return (len(value) * 8, value)
3225 raise InvalidValueType((self.__class__, string_types, binary_type))
3226 if value.__class__ == tuple:
3229 isinstance(value[0], integer_types) and
3230 value[1].__class__ == binary_type
3235 bit = self.specs.get(name)
3237 raise ObjUnknown("BitString value: %s" % name)
3240 return self._bits2octets("")
3241 bits = frozenset(bits)
3242 return self._bits2octets("".join(
3243 ("1" if bit in bits else "0")
3244 for bit in six_xrange(max(bits) + 1)
3246 if issubclass(value.__class__, BitString):
3248 raise InvalidValueType((self.__class__, binary_type, string_types))
3252 return self._value is not None
3254 def __getstate__(self):
3255 return BitStringState(
3270 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a state tuple

    The base class restores its own fields first, then the BIT STRING
    specific ones are taken from the state.

    :param state: state object having ``specs``, ``value``,
                  ``tag_constructed`` and ``defined`` attributes
    """
    super(BitString, self).__setstate__(state)
    self.specs = state.specs
    self._value = state.value
    # Constructed form tag and the "defined by" payload come as is
    self.tag_constructed, self.defined = state.tag_constructed, state.defined
3282 self._assert_ready()
3283 for i in six_xrange(self._value[0]):
3288 """Returns number of bits in the string
3290 self._assert_ready()
3291 return self._value[0]
3293 def __bytes__(self):
3294 self._assert_ready()
3295 return self._value[1]
3297 def __eq__(self, their):
3298 if their.__class__ == bytes:
3299 return self._value[1] == their
3300 if not issubclass(their.__class__, BitString):
3303 self._value == their._value and
3304 self.tag == their.tag and
3305 self._expl == their._expl
3310 """Named representation (if exists) of the bits
3312 :returns: [str(name), ...]
3314 return [name for name, bit in iteritems(self.specs) if self[bit]]
3324 return self.__class__(
3326 impl=self.tag if impl is None else impl,
3327 expl=self._expl if expl is None else expl,
3328 default=self.default if default is None else default,
3329 optional=self.optional if optional is None else optional,
3333 def __getitem__(self, key):
3334 if key.__class__ == int:
3335 bit_len, octets = self._value
3339 byte2int(memoryview(octets)[key // 8:]) >>
3342 if isinstance(key, string_types):
3343 value = self.specs.get(key)
3345 raise ObjUnknown("BitString value: %s" % key)
3347 raise InvalidValueType((int, str))
3350 self._assert_ready()
3351 bit_len, octets = self._value
3354 len_encode(len(octets) + 1),
3355 int2byte((8 - bit_len % 8) % 8),
3359 def _encode1st(self, state):
3360 self._assert_ready()
3361 _, octets = self._value
3363 return len(self.tag) + len_size(l) + l, state
3365 def _encode2nd(self, writer, state_iter):
3366 bit_len, octets = self._value
3367 write_full(writer, b"".join((
3369 len_encode(len(octets) + 1),
3370 int2byte((8 - bit_len % 8) % 8),
3372 write_full(writer, octets)
3374 def _encode_cer(self, writer):
3375 bit_len, octets = self._value
3376 if len(octets) + 1 <= 1000:
3377 write_full(writer, self._encode())
3379 write_full(writer, self.tag_constructed)
3380 write_full(writer, LENINDEF)
3381 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3382 write_full(writer, b"".join((
3383 BitString.tag_default,
3386 octets[offset:offset + 999],
3388 tail = octets[offset + 999:]
3390 tail = int2byte((8 - bit_len % 8) % 8) + tail
3391 write_full(writer, b"".join((
3392 BitString.tag_default,
3393 len_encode(len(tail)),
3396 write_full(writer, EOC)
3398 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3400 t, tlen, lv = tag_strip(tlv)
3401 except DecodeError as err:
3402 raise err.__class__(
3404 klass=self.__class__,
3405 decode_path=decode_path,
3409 if tag_only: # pragma: no cover
3413 l, llen, v = len_decode(lv)
3414 except DecodeError as err:
3415 raise err.__class__(
3417 klass=self.__class__,
3418 decode_path=decode_path,
3422 raise NotEnoughData(
3423 "encoded length is longer than data",
3424 klass=self.__class__,
3425 decode_path=decode_path,
3429 raise NotEnoughData(
3431 klass=self.__class__,
3432 decode_path=decode_path,
3435 pad_size = byte2int(v)
3436 if l == 1 and pad_size != 0:
3438 "invalid empty value",
3439 klass=self.__class__,
3440 decode_path=decode_path,
3446 klass=self.__class__,
3447 decode_path=decode_path,
3450 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3453 klass=self.__class__,
3454 decode_path=decode_path,
3457 v, tail = v[:l], v[l:]
3458 bit_len = (len(v) - 1) * 8 - pad_size
3459 obj = self.__class__(
3460 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3463 default=self.default,
3464 optional=self.optional,
3466 _decoded=(offset, llen, l),
3469 obj._value = (bit_len, None)
3470 yield decode_path, obj, tail
3472 if t != self.tag_constructed:
3474 klass=self.__class__,
3475 decode_path=decode_path,
3478 if not ctx.get("bered", False):
3480 "unallowed BER constructed encoding",
3481 klass=self.__class__,
3482 decode_path=decode_path,
3485 if tag_only: # pragma: no cover
3490 l, llen, v = len_decode(lv)
3491 except LenIndefForm:
3492 llen, l, v = 1, 0, lv[1:]
3494 except DecodeError as err:
3495 raise err.__class__(
3497 klass=self.__class__,
3498 decode_path=decode_path,
3502 raise NotEnoughData(
3503 "encoded length is longer than data",
3504 klass=self.__class__,
3505 decode_path=decode_path,
3508 if not lenindef and l == 0:
3509 raise NotEnoughData(
3511 klass=self.__class__,
3512 decode_path=decode_path,
3516 sub_offset = offset + tlen + llen
3520 if v[:EOC_LEN].tobytes() == EOC:
3527 "chunk out of bounds",
3528 klass=self.__class__,
3529 decode_path=decode_path + (str(len(chunks) - 1),),
3530 offset=chunks[-1].offset,
3532 sub_decode_path = decode_path + (str(len(chunks)),)
3535 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3538 decode_path=sub_decode_path,
3541 _ctx_immutable=False,
3543 yield _decode_path, chunk, v_tail
3545 _, chunk, v_tail = next(BitString().decode_evgen(
3548 decode_path=sub_decode_path,
3551 _ctx_immutable=False,
3556 "expected BitString encoded chunk",
3557 klass=self.__class__,
3558 decode_path=sub_decode_path,
3561 chunks.append(chunk)
3562 sub_offset += chunk.tlvlen
3563 vlen += chunk.tlvlen
3565 if len(chunks) == 0:
3568 klass=self.__class__,
3569 decode_path=decode_path,
3574 for chunk_i, chunk in enumerate(chunks[:-1]):
3575 if chunk.bit_len % 8 != 0:
3577 "BitString chunk is not multiple of 8 bits",
3578 klass=self.__class__,
3579 decode_path=decode_path + (str(chunk_i),),
3580 offset=chunk.offset,
3583 values.append(bytes(chunk))
3584 bit_len += chunk.bit_len
3585 chunk_last = chunks[-1]
3587 values.append(bytes(chunk_last))
3588 bit_len += chunk_last.bit_len
3589 obj = self.__class__(
3590 value=None if evgen_mode else (bit_len, b"".join(values)),
3593 default=self.default,
3594 optional=self.optional,
3596 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3599 obj._value = (bit_len, None)
3600 obj.lenindef = lenindef
3601 obj.ber_encoded = True
3602 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3605 return pp_console_row(next(self.pps()))
3607 def pps(self, decode_path=()):
3611 bit_len, blob = self._value
3612 value = "%d bits" % bit_len
3613 if len(self.specs) > 0 and blob is not None:
3614 blob = tuple(self.named)
3617 asn1_type_name=self.asn1_type_name,
3618 obj_name=self.__class__.__name__,
3619 decode_path=decode_path,
3622 optional=self.optional,
3623 default=self == self.default,
3624 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3625 expl=None if self._expl is None else tag_decode(self._expl),
3630 expl_offset=self.expl_offset if self.expled else None,
3631 expl_tlen=self.expl_tlen if self.expled else None,
3632 expl_llen=self.expl_llen if self.expled else None,
3633 expl_vlen=self.expl_vlen if self.expled else None,
3634 expl_lenindef=self.expl_lenindef,
3635 lenindef=self.lenindef,
3636 ber_encoded=self.ber_encoded,
3639 defined_by, defined = self.defined or (None, None)
3640 if defined_by is not None:
3642 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3644 for pp in self.pps_lenindef(decode_path):
3648 OctetStringState = namedtuple(
3650 BasicState._fields + (
3661 class OctetString(Obj):
3662 """``OCTET STRING`` binary string type
3664 >>> s = OctetString(b"hello world")
3665 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3666 >>> s == OctetString(b"hello world")
3671 >>> OctetString(b"hello", bounds=(4, 4))
3672 Traceback (most recent call last):
3673 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3674 >>> OctetString(b"hell", bounds=(4, 4))
3675 OCTET STRING 4 bytes 68656c6c
3677 Memoryviews can be used as values. If a memoryview is made on
3678 mmap-ed file, then it does not take storage inside OctetString
3679 itself. In CER encoding mode it will be streamed to the specified
3680 writer, copying 1 KB chunks.
3682 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3683 tag_default = tag_encode(4)
3684 asn1_type_name = "OCTET STRING"
3685 evgen_mode_skip_value = True
3699 :param value: set the value. Either binary type, or
3700 :py:class:`pyderasn.OctetString` object
3701 :param bounds: set ``(MIN, MAX)`` value size constraint.
3702 (-inf, +inf) by default
3703 :param bytes impl: override default tag with ``IMPLICIT`` one
3704 :param bytes expl: override default tag with ``EXPLICIT`` one
3705 :param default: set default value. Type same as in ``value``
3706 :param bool optional: is object ``OPTIONAL`` in sequence
3708 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3710 self._bound_min, self._bound_max = getattr(
3714 ) if bounds is None else bounds
3715 if value is not None:
3716 self._value = self._value_sanitize(value)
3717 if default is not None:
3718 default = self._value_sanitize(default)
3719 self.default = self.__class__(
3724 if self._value is None:
3725 self._value = default
3727 tag_klass, _, tag_num = tag_decode(self.tag)
3728 self.tag_constructed = tag_encode(
3730 form=TagFormConstructed,
3734 def _value_sanitize(self, value):
3735 if value.__class__ == binary_type or value.__class__ == memoryview:
3737 elif issubclass(value.__class__, OctetString):
3738 value = value._value
3740 raise InvalidValueType((self.__class__, bytes, memoryview))
3741 if not self._bound_min <= len(value) <= self._bound_max:
3742 raise BoundsError(self._bound_min, len(value), self._bound_max)
3747 return self._value is not None
3749 def __getstate__(self):
3750 return OctetStringState(
3766 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a state tuple

    The base class restores its own fields first, then the OCTET
    STRING specific ones are taken from the state.

    :param state: state object having ``value``, ``bound_min``,
                  ``bound_max``, ``tag_constructed`` and ``defined``
                  attributes
    """
    super(OctetString, self).__setstate__(state)
    self._value = state.value
    # Size constraint is restored as a pair
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
    self.tag_constructed = state.tag_constructed
    self.defined = state.defined
3778 def __bytes__(self):
3779 self._assert_ready()
3780 return bytes(self._value)
3782 def __eq__(self, their):
3783 if their.__class__ == binary_type:
3784 return self._value == their
3785 if not issubclass(their.__class__, OctetString):
3788 self._value == their._value and
3789 self.tag == their.tag and
3790 self._expl == their._expl
3793 def __lt__(self, their):
3794 return self._value < their._value
3805 return self.__class__(
3808 (self._bound_min, self._bound_max)
3809 if bounds is None else bounds
3811 impl=self.tag if impl is None else impl,
3812 expl=self._expl if expl is None else expl,
3813 default=self.default if default is None else default,
3814 optional=self.optional if optional is None else optional,
3818 self._assert_ready()
3821 len_encode(len(self._value)),
def _encode1st(self, state):
    """Return the size of the encoded OCTET STRING and the untouched state

    The size is the tag length, plus the size of the length field,
    plus the value length. Readiness of the object is asserted first.

    :param state: returned as is
    :returns: ``(size, state)`` tuple
    """
    self._assert_ready()
    vlen = len(self._value)
    header_len = len(self.tag) + len_size(vlen)
    return header_len + vlen, state
3830 def _encode2nd(self, writer, state_iter):
3832 write_full(writer, self.tag + len_encode(len(value)))
3833 write_full(writer, value)
3835 def _encode_cer(self, writer):
3836 octets = self._value
3837 if len(octets) <= 1000:
3838 write_full(writer, self._encode())
3840 write_full(writer, self.tag_constructed)
3841 write_full(writer, LENINDEF)
3842 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3843 write_full(writer, b"".join((
3844 OctetString.tag_default,
3846 octets[offset:offset + 1000],
3848 tail = octets[offset + 1000:]
3850 write_full(writer, b"".join((
3851 OctetString.tag_default,
3852 len_encode(len(tail)),
3855 write_full(writer, EOC)
3857 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3859 t, tlen, lv = tag_strip(tlv)
3860 except DecodeError as err:
3861 raise err.__class__(
3863 klass=self.__class__,
3864 decode_path=decode_path,
3872 l, llen, v = len_decode(lv)
3873 except DecodeError as err:
3874 raise err.__class__(
3876 klass=self.__class__,
3877 decode_path=decode_path,
3881 raise NotEnoughData(
3882 "encoded length is longer than data",
3883 klass=self.__class__,
3884 decode_path=decode_path,
3887 v, tail = v[:l], v[l:]
3888 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3890 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3891 klass=self.__class__,
3892 decode_path=decode_path,
3896 obj = self.__class__(
3898 None if (evgen_mode and self.evgen_mode_skip_value)
3901 bounds=(self._bound_min, self._bound_max),
3904 default=self.default,
3905 optional=self.optional,
3906 _decoded=(offset, llen, l),
3909 except DecodeError as err:
3912 klass=self.__class__,
3913 decode_path=decode_path,
3916 except BoundsError as err:
3919 klass=self.__class__,
3920 decode_path=decode_path,
3923 yield decode_path, obj, tail
3925 if t != self.tag_constructed:
3927 klass=self.__class__,
3928 decode_path=decode_path,
3931 if not ctx.get("bered", False):
3933 "unallowed BER constructed encoding",
3934 klass=self.__class__,
3935 decode_path=decode_path,
3943 l, llen, v = len_decode(lv)
3944 except LenIndefForm:
3945 llen, l, v = 1, 0, lv[1:]
3947 except DecodeError as err:
3948 raise err.__class__(
3950 klass=self.__class__,
3951 decode_path=decode_path,
3955 raise NotEnoughData(
3956 "encoded length is longer than data",
3957 klass=self.__class__,
3958 decode_path=decode_path,
3963 sub_offset = offset + tlen + llen
3968 if v[:EOC_LEN].tobytes() == EOC:
3975 "chunk out of bounds",
3976 klass=self.__class__,
3977 decode_path=decode_path + (str(len(chunks) - 1),),
3978 offset=chunks[-1].offset,
3982 sub_decode_path = decode_path + (str(chunks_count),)
3983 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3986 decode_path=sub_decode_path,
3989 _ctx_immutable=False,
3991 yield _decode_path, chunk, v_tail
3992 if not chunk.ber_encoded:
3993 payload_len += chunk.vlen
3996 sub_decode_path = decode_path + (str(len(chunks)),)
3997 _, chunk, v_tail = next(OctetString().decode_evgen(
4000 decode_path=sub_decode_path,
4003 _ctx_immutable=False,
4006 chunks.append(chunk)
4009 "expected OctetString encoded chunk",
4010 klass=self.__class__,
4011 decode_path=sub_decode_path,
4014 sub_offset += chunk.tlvlen
4015 vlen += chunk.tlvlen
4017 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4019 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4020 klass=self.__class__,
4021 decode_path=decode_path,
4025 obj = self.__class__(
4027 None if evgen_mode else
4028 b"".join(bytes(chunk) for chunk in chunks)
4030 bounds=(self._bound_min, self._bound_max),
4033 default=self.default,
4034 optional=self.optional,
4035 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4038 except DecodeError as err:
4041 klass=self.__class__,
4042 decode_path=decode_path,
4045 except BoundsError as err:
4048 klass=self.__class__,
4049 decode_path=decode_path,
4052 obj.lenindef = lenindef
4053 obj.ber_encoded = True
4054 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4057 return pp_console_row(next(self.pps()))
4059 def pps(self, decode_path=()):
4062 asn1_type_name=self.asn1_type_name,
4063 obj_name=self.__class__.__name__,
4064 decode_path=decode_path,
4065 value=("%d bytes" % len(self._value)) if self.ready else None,
4066 blob=self._value if self.ready else None,
4067 optional=self.optional,
4068 default=self == self.default,
4069 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4070 expl=None if self._expl is None else tag_decode(self._expl),
4075 expl_offset=self.expl_offset if self.expled else None,
4076 expl_tlen=self.expl_tlen if self.expled else None,
4077 expl_llen=self.expl_llen if self.expled else None,
4078 expl_vlen=self.expl_vlen if self.expled else None,
4079 expl_lenindef=self.expl_lenindef,
4080 lenindef=self.lenindef,
4081 ber_encoded=self.ber_encoded,
4084 defined_by, defined = self.defined or (None, None)
4085 if defined_by is not None:
4087 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4089 for pp in self.pps_lenindef(decode_path):
4093 def agg_octet_string(evgens, decode_path, raw, writer):
4094 """Aggregate constructed string (OctetString and its derivatives)
4096 :param evgens: iterator of generated events
4097 :param decode_path: points to the string we want to decode
4098 :param raw: sliceable (memoryview, bytearray, etc) with
4099 the data evgens are generated on
4100 :param writer: buffer.write where string is going to be
4101 saved. Must comply with ``io.RawIOBase.write``
4102 behaviour
4104 .. seealso:: :ref:`agg_octet_string`
4106 decode_path_len = len(decode_path)
4107 for dp, obj, _ in evgens:
4108 if dp[:decode_path_len] != decode_path:
4110 if not obj.ber_encoded:
4111 write_full(writer, raw[
4112 obj.offset + obj.tlen + obj.llen:
4113 obj.offset + obj.tlen + obj.llen + obj.vlen -
4114 (EOC_LEN if obj.expl_lenindef else 0)
4116 if len(dp) == decode_path_len:
4120 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4124 """``NULL`` null object
4132 tag_default = tag_encode(5)
4133 asn1_type_name = "NULL"
4137 value=None, # unused, but Sequence passes it
4144 :param bytes impl: override default tag with ``IMPLICIT`` one
4145 :param bytes expl: override default tag with ``EXPLICIT`` one
4146 :param bool optional: is object ``OPTIONAL`` in sequence
4148 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4155 def __getstate__(self):
4171 def __eq__(self, their):
4172 if not issubclass(their.__class__, Null):
4175 self.tag == their.tag and
4176 self._expl == their._expl
4186 return self.__class__(
4187 impl=self.tag if impl is None else impl,
4188 expl=self._expl if expl is None else expl,
4189 optional=self.optional if optional is None else optional,
4193 return self.tag + LEN0
4195 def _encode1st(self, state):
4196 return len(self.tag) + 1, state
def _encode2nd(self, writer, state_iter):
    """Write the tag followed by ``LEN0`` to the writer

    :param writer: passed to ``write_full`` together with the encoding
    :param state_iter: not used by this type
    """
    encoded = self.tag + LEN0
    write_full(writer, encoded)
4201 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4203 t, _, lv = tag_strip(tlv)
4204 except DecodeError as err:
4205 raise err.__class__(
4207 klass=self.__class__,
4208 decode_path=decode_path,
4213 klass=self.__class__,
4214 decode_path=decode_path,
4217 if tag_only: # pragma: no cover
4221 l, _, v = len_decode(lv)
4222 except DecodeError as err:
4223 raise err.__class__(
4225 klass=self.__class__,
4226 decode_path=decode_path,
4230 raise InvalidLength(
4231 "Null must have zero length",
4232 klass=self.__class__,
4233 decode_path=decode_path,
4236 obj = self.__class__(
4239 optional=self.optional,
4240 _decoded=(offset, 1, 0),
4242 yield decode_path, obj, v
4245 return pp_console_row(next(self.pps()))
4247 def pps(self, decode_path=()):
4250 asn1_type_name=self.asn1_type_name,
4251 obj_name=self.__class__.__name__,
4252 decode_path=decode_path,
4253 optional=self.optional,
4254 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4255 expl=None if self._expl is None else tag_decode(self._expl),
4260 expl_offset=self.expl_offset if self.expled else None,
4261 expl_tlen=self.expl_tlen if self.expled else None,
4262 expl_llen=self.expl_llen if self.expled else None,
4263 expl_vlen=self.expl_vlen if self.expled else None,
4264 expl_lenindef=self.expl_lenindef,
4267 for pp in self.pps_lenindef(decode_path):
4271 ObjectIdentifierState = namedtuple(
4272 "ObjectIdentifierState",
4273 BasicState._fields + ("value", "defines"),
4278 class ObjectIdentifier(Obj):
4279 """``OBJECT IDENTIFIER`` OID type
4281 >>> oid = ObjectIdentifier((1, 2, 3))
4282 OBJECT IDENTIFIER 1.2.3
4283 >>> oid == ObjectIdentifier("1.2.3")
4289 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4290 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4292 >>> str(ObjectIdentifier((3, 1)))
4293 Traceback (most recent call last):
4294 pyderasn.InvalidOID: unacceptable first arc value
4296 __slots__ = ("defines",)
4297 tag_default = tag_encode(6)
4298 asn1_type_name = "OBJECT IDENTIFIER"
4311 :param value: set the value. Either tuples of integers,
4312 string of "."-concatenated integers, or
4313 :py:class:`pyderasn.ObjectIdentifier` object
4314 :param defines: sequence of tuples. Each tuple has two elements.
4315 First one is relative to current one decode
4316 path, aiming to the field defined by that OID.
4317 Read about relative path in
4318 :py:func:`pyderasn.abs_decode_path`. Second
4319 tuple element is ``{OID: pyderasn.Obj()}``
4320 dictionary, mapping between current OID value
4321 and structure applied to defined field.
4323 .. seealso:: :ref:`definedby`
4325 :param bytes impl: override default tag with ``IMPLICIT`` one
4326 :param bytes expl: override default tag with ``EXPLICIT`` one
4327 :param default: set default value. Type same as in ``value``
4328 :param bool optional: is object ``OPTIONAL`` in sequence
4330 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4332 if value is not None:
4333 self._value = self._value_sanitize(value)
4334 if default is not None:
4335 default = self._value_sanitize(default)
4336 self.default = self.__class__(
4341 if self._value is None:
4342 self._value = default
4343 self.defines = defines
4345 def __add__(self, their):
4346 if their.__class__ == tuple:
4347 return self.__class__(self._value + array("L", their))
4348 if isinstance(their, self.__class__):
4349 return self.__class__(self._value + their._value)
4350 raise InvalidValueType((self.__class__, tuple))
4352 def _value_sanitize(self, value):
4353 if issubclass(value.__class__, ObjectIdentifier):
4355 if isinstance(value, string_types):
4357 value = array("L", (pureint(arc) for arc in value.split(".")))
4359 raise InvalidOID("unacceptable arcs values")
4360 if value.__class__ == tuple:
4362 value = array("L", value)
4363 except OverflowError as err:
4364 raise InvalidOID(repr(err))
4365 if value.__class__ is array:
4367 raise InvalidOID("less than 2 arcs")
4368 first_arc = value[0]
4369 if first_arc in (0, 1):
4370 if not (0 <= value[1] <= 39):
4371 raise InvalidOID("second arc is too wide")
4372 elif first_arc == 2:
4375 raise InvalidOID("unacceptable first arc value")
4376 if not all(arc >= 0 for arc in value):
4377 raise InvalidOID("negative arc value")
4379 raise InvalidValueType((self.__class__, str, tuple))
4383 return self._value is not None
4385 def __getstate__(self):
4386 return ObjectIdentifierState(
def __setstate__(self, state):
    """Restore the object from an ``ObjectIdentifierState``

    Common fields are restored by the parent class, after that the
    OID specific ``value`` and ``defines`` are taken from the state.
    """
    super(ObjectIdentifier, self).__setstate__(state)
    self._value, self.defines = state.value, state.defines
4409 self._assert_ready()
4410 return iter(self._value)
4413 return ".".join(str(arc) for arc in self._value or ())
4416 self._assert_ready()
4417 return hash(b"".join((
4419 bytes(self._expl or b""),
4420 str(self._value).encode("ascii"),
4423 def __eq__(self, their):
4424 if their.__class__ == tuple:
4425 return self._value == array("L", their)
4426 if not issubclass(their.__class__, ObjectIdentifier):
4429 self.tag == their.tag and
4430 self._expl == their._expl and
4431 self._value == their._value
4434 def __lt__(self, their):
4435 return self._value < their._value
4446 return self.__class__(
4448 defines=self.defines if defines is None else defines,
4449 impl=self.tag if impl is None else impl,
4450 expl=self._expl if expl is None else expl,
4451 default=self.default if default is None else default,
4452 optional=self.optional if optional is None else optional,
4455 def _encode_octets(self):
4456 self._assert_ready()
4458 first_value = value[1]
4459 first_arc = value[0]
4462 elif first_arc == 1:
4464 elif first_arc == 2:
4466 else: # pragma: no cover
4467 raise RuntimeError("invalid arc is stored")
4468 octets = [zero_ended_encode(first_value)]
4469 for arc in value[2:]:
4470 octets.append(zero_ended_encode(arc))
4471 return b"".join(octets)
4474 v = self._encode_octets()
4475 return b"".join((self.tag, len_encode(len(v)), v))
def _encode1st(self, state):
    """Compute the full encoded length: tag, length octets and arcs

    :param state: opaque encoder state, handed back unchanged
    :returns: ``(length, state)`` tuple
    """
    vlen = len(self._encode_octets())
    total_len = len(self.tag) + len_size(vlen) + vlen
    return total_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``_encode`` into ``writer``

    :param writer: destination passed to ``write_full``
    :param state_iter: not consumed by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
4484 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4486 t, _, lv = tag_strip(tlv)
4487 except DecodeError as err:
4488 raise err.__class__(
4490 klass=self.__class__,
4491 decode_path=decode_path,
4496 klass=self.__class__,
4497 decode_path=decode_path,
4500 if tag_only: # pragma: no cover
4504 l, llen, v = len_decode(lv)
4505 except DecodeError as err:
4506 raise err.__class__(
4508 klass=self.__class__,
4509 decode_path=decode_path,
4513 raise NotEnoughData(
4514 "encoded length is longer than data",
4515 klass=self.__class__,
4516 decode_path=decode_path,
4520 raise NotEnoughData(
4522 klass=self.__class__,
4523 decode_path=decode_path,
4526 v, tail = v[:l], v[l:]
4533 octet = indexbytes(v, i)
4534 if i == 0 and octet == 0x80:
4535 if ctx.get("bered", False):
4539 "non normalized arc encoding",
4540 klass=self.__class__,
4541 decode_path=decode_path,
4544 arc = (arc << 7) | (octet & 0x7F)
4545 if octet & 0x80 == 0:
4548 except OverflowError:
4550 "too huge value for local unsigned long",
4551 klass=self.__class__,
4552 decode_path=decode_path,
4561 klass=self.__class__,
4562 decode_path=decode_path,
4566 second_arc = arcs[0]
4567 if 0 <= second_arc <= 39:
4569 elif 40 <= second_arc <= 79:
4575 obj = self.__class__(
4576 value=array("L", (first_arc, second_arc)) + arcs[1:],
4579 default=self.default,
4580 optional=self.optional,
4581 _decoded=(offset, llen, l),
4584 obj.ber_encoded = True
4585 yield decode_path, obj, tail
4588 return pp_console_row(next(self.pps()))
4590 def pps(self, decode_path=()):
4593 asn1_type_name=self.asn1_type_name,
4594 obj_name=self.__class__.__name__,
4595 decode_path=decode_path,
4596 value=str(self) if self.ready else None,
4597 optional=self.optional,
4598 default=self == self.default,
4599 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4600 expl=None if self._expl is None else tag_decode(self._expl),
4605 expl_offset=self.expl_offset if self.expled else None,
4606 expl_tlen=self.expl_tlen if self.expled else None,
4607 expl_llen=self.expl_llen if self.expled else None,
4608 expl_vlen=self.expl_vlen if self.expled else None,
4609 expl_lenindef=self.expl_lenindef,
4610 ber_encoded=self.ber_encoded,
4613 for pp in self.pps_lenindef(decode_path):
4617 class Enumerated(Integer):
4618 """``ENUMERATED`` integer type
4620 This type is identical to :py:class:`pyderasn.Integer`, but requires
4621 schema to be specified and does not accept values missing from it.
4624 tag_default = tag_encode(10)
4625 asn1_type_name = "ENUMERATED"
4636 bounds=None, # dummy argument, workability for Integer.decode
4638 super(Enumerated, self).__init__(
4639 value, bounds, impl, expl, default, optional, _specs, _decoded,
4641 if len(self.specs) == 0:
4642 raise ValueError("schema must be specified")
4644 def _value_sanitize(self, value):
4645 if isinstance(value, self.__class__):
4646 value = value._value
4647 elif isinstance(value, integer_types):
4648 for _value in itervalues(self.specs):
4653 "unknown integer value: %s" % value,
4654 klass=self.__class__,
4656 elif isinstance(value, string_types):
4657 value = self.specs.get(value)
4659 raise ObjUnknown("integer value: %s" % value)
4661 raise InvalidValueType((self.__class__, int, str))
4673 return self.__class__(
4675 impl=self.tag if impl is None else impl,
4676 expl=self._expl if expl is None else expl,
4677 default=self.default if default is None else default,
4678 optional=self.optional if optional is None else optional,
4683 def escape_control_unicode(c):
4684 if unicat(c)[0] == "C":
4685 c = repr(c).lstrip("u").strip("'")
4689 class CommonString(OctetString):
4690 """Common class for all strings
4692 Everything resembles :py:class:`pyderasn.OctetString`, except
4693 ability to deal with unicode text strings.
4695 >>> hexenc("привет мир".encode("utf-8"))
4696 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4697 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4699 >>> s = UTF8String("привет мир")
4700 UTF8String UTF8String привет мир
4702 'привет мир'
4703 >>> hexenc(bytes(s))
4704 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4706 >>> PrintableString("привет мир")
4707 Traceback (most recent call last):
4708 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4710 >>> BMPString("ада", bounds=(2, 2))
4711 Traceback (most recent call last):
4712 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4713 >>> s = BMPString("ад", bounds=(2, 2))
4716 >>> hexenc(bytes(s))
4723 - Text Encoding, validation
4724 * - :py:class:`pyderasn.UTF8String`
4726 * - :py:class:`pyderasn.NumericString`
4727 - proper alphabet validation
4728 * - :py:class:`pyderasn.PrintableString`
4729 - proper alphabet validation
4730 * - :py:class:`pyderasn.TeletexString`
4732 * - :py:class:`pyderasn.T61String`
4734 * - :py:class:`pyderasn.VideotexString`
4736 * - :py:class:`pyderasn.IA5String`
4737 - proper alphabet validation
4738 * - :py:class:`pyderasn.GraphicString`
4740 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4741 - proper alphabet validation
4742 * - :py:class:`pyderasn.GeneralString`
4744 * - :py:class:`pyderasn.UniversalString`
4746 * - :py:class:`pyderasn.BMPString`
4751 def _value_sanitize(self, value):
4753 value_decoded = None
4754 if isinstance(value, self.__class__):
4755 value_raw = value._value
4756 elif value.__class__ == text_type:
4757 value_decoded = value
4758 elif value.__class__ == binary_type:
4761 raise InvalidValueType((self.__class__, text_type, binary_type))
4764 value_decoded.encode(self.encoding)
4765 if value_raw is None else value_raw
4768 value_raw.decode(self.encoding)
4769 if value_decoded is None else value_decoded
4771 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4772 raise DecodeError(str(err))
4773 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4781 def __eq__(self, their):
4782 if their.__class__ == binary_type:
4783 return self._value == their
4784 if their.__class__ == text_type:
4785 return self._value == their.encode(self.encoding)
4786 if not isinstance(their, self.__class__):
4789 self._value == their._value and
4790 self.tag == their.tag and
4791 self._expl == their._expl
4794 def __unicode__(self):
4796 return self._value.decode(self.encoding)
4797 return text_type(self._value)
4800 return pp_console_row(next(self.pps(no_unicode=PY2)))
4802 def pps(self, decode_path=(), no_unicode=False):
4806 hexenc(bytes(self)) if no_unicode else
4807 "".join(escape_control_unicode(c) for c in self.__unicode__())
4811 asn1_type_name=self.asn1_type_name,
4812 obj_name=self.__class__.__name__,
4813 decode_path=decode_path,
4815 optional=self.optional,
4816 default=self == self.default,
4817 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4818 expl=None if self._expl is None else tag_decode(self._expl),
4823 expl_offset=self.expl_offset if self.expled else None,
4824 expl_tlen=self.expl_tlen if self.expled else None,
4825 expl_llen=self.expl_llen if self.expled else None,
4826 expl_vlen=self.expl_vlen if self.expled else None,
4827 expl_lenindef=self.expl_lenindef,
4828 ber_encoded=self.ber_encoded,
4831 for pp in self.pps_lenindef(decode_path):
4835 class UTF8String(CommonString):
4837 tag_default = tag_encode(12)
4839 asn1_type_name = "UTF8String"
4842 class AllowableCharsMixin(object):
4844 def allowable_chars(self):
4846 return self._allowable_chars
4847 return frozenset(six_unichr(c) for c in self._allowable_chars)
4849 def _value_sanitize(self, value):
4850 value = super(AllowableCharsMixin, self)._value_sanitize(value)
4851 if not frozenset(value) <= self._allowable_chars:
4852 raise DecodeError("non satisfying alphabet value")
4856 class NumericString(AllowableCharsMixin, CommonString):
4859 Its value is properly sanitized: only ASCII digits with spaces can
4862 >>> NumericString().allowable_chars
4863 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4866 tag_default = tag_encode(18)
4868 asn1_type_name = "NumericString"
4869 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4872 PrintableStringState = namedtuple(
4873 "PrintableStringState",
4874 OctetStringState._fields + ("allowable_chars",),
4879 class PrintableString(AllowableCharsMixin, CommonString):
4882 Its value is properly sanitized: see X.680 41.4 table 10.
4884 >>> PrintableString().allowable_chars
4885 frozenset([' ', "'", ..., 'z'])
4886 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4887 PrintableString PrintableString foo*bar
4888 >>> obj.allow_asterisk, obj.allow_ampersand
4892 tag_default = tag_encode(19)
4894 asn1_type_name = "PrintableString"
4895 _allowable_chars = frozenset(
4896 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4898 _asterisk = frozenset("*".encode("ascii"))
4899 _ampersand = frozenset("&".encode("ascii"))
4911 allow_asterisk=False,
4912 allow_ampersand=False,
4915 :param allow_asterisk: allow asterisk character
4916 :param allow_ampersand: allow ampersand character
4919 self._allowable_chars |= self._asterisk
4921 self._allowable_chars |= self._ampersand
4922 super(PrintableString, self).__init__(
4923 value, bounds, impl, expl, default, optional, _decoded, ctx,
4927 def allow_asterisk(self):
4928 """Is asterisk character allowed?
4930 return self._asterisk <= self._allowable_chars
4933 def allow_ampersand(self):
4934 """Is ampersand character allowed?
4936 return self._ampersand <= self._allowable_chars
4938 def __getstate__(self):
4939 return PrintableStringState(
4940 *super(PrintableString, self).__getstate__(),
4941 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore the object from a ``PrintableStringState``

    Parent class restores everything it knows about, then the
    per-object alphabet is taken from ``state.allowable_chars``.
    """
    super(PrintableString, self).__setstate__(state)
    restored_alphabet = state.allowable_chars
    self._allowable_chars = restored_alphabet
4957 return self.__class__(
4960 (self._bound_min, self._bound_max)
4961 if bounds is None else bounds
4963 impl=self.tag if impl is None else impl,
4964 expl=self._expl if expl is None else expl,
4965 default=self.default if default is None else default,
4966 optional=self.optional if optional is None else optional,
4967 allow_asterisk=self.allow_asterisk,
4968 allow_ampersand=self.allow_ampersand,
4972 class TeletexString(CommonString):
4974 tag_default = tag_encode(20)
4975 encoding = "iso-8859-1"
4976 asn1_type_name = "TeletexString"
4979 class T61String(TeletexString):
4981 asn1_type_name = "T61String"
4984 class VideotexString(CommonString):
4986 tag_default = tag_encode(21)
4987 encoding = "iso-8859-1"
4988 asn1_type_name = "VideotexString"
4991 class IA5String(AllowableCharsMixin, CommonString):
4994 Its value is properly sanitized: it is a mix of
4996 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4997 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4998 * DEL character (0x7F)
5000 It is just 7-bit ASCII.
5002 >>> IA5String().allowable_chars
5003 frozenset(["NUL", ... "DEL"])
5006 tag_default = tag_encode(22)
5008 asn1_type_name = "IA5"
5009 _allowable_chars = frozenset(b"".join(
5010 six_unichr(c).encode("ascii") for c in six_xrange(128)
# Sizes of the time encodings, computed once at import time

# UTCTime: value size, its encoded length octets and both together
LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
LEN_YYMMDDHHMMSSZ_WITH_LEN = LEN_YYMMDDHHMMSSZ + len(LEN_LEN_YYMMDDHHMMSSZ)

# GeneralizedTime without fractions: value size and its length octets
LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
# Lower bound used by GeneralizedTime._strptime for the form with fractions
LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5022 class VisibleString(AllowableCharsMixin, CommonString):
5025 Its value is properly sanitized. ASCII subset from space to tilde is
5026 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5028 >>> VisibleString().allowable_chars
5029 frozenset([" ", ... "~"])
5032 tag_default = tag_encode(26)
5034 asn1_type_name = "VisibleString"
5035 _allowable_chars = frozenset(b"".join(
5036 six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5040 class ISO646String(VisibleString):
5042 asn1_type_name = "ISO646String"
5045 UTCTimeState = namedtuple(
5047 OctetStringState._fields + ("ber_raw",),
5052 def str_to_time_fractions(value):
5054 year, v = (v // 10**10), (v % 10**10)
5055 month, v = (v // 10**8), (v % 10**8)
5056 day, v = (v // 10**6), (v % 10**6)
5057 hour, v = (v // 10**4), (v % 10**4)
5058 minute, second = (v // 100), (v % 100)
5059 return year, month, day, hour, minute, second
5062 class UTCTime(VisibleString):
5063 """``UTCTime`` datetime type
5065 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5066 UTCTime UTCTime 2017-09-30T22:07:50
5072 datetime.datetime(2017, 9, 30, 22, 7, 50)
5073 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5074 datetime.datetime(1957, 9, 30, 22, 7, 50)
5076 If BER encoded value was met, then ``ber_raw`` attribute will hold
5077 its raw representation.
5081 Pay attention that UTCTime can not hold full year, so all years
5082 having < 50 years are treated as 20xx, 19xx otherwise, according
5083 to X.509 recommendation.
No strict validation of UTC offsets is made, only a very crude one:
5089 * minutes are not exceeding 60
5090 * offset value is not exceeding 14 hours
5092 __slots__ = ("ber_raw",)
5093 tag_default = tag_encode(23)
5095 asn1_type_name = "UTCTime"
5096 evgen_mode_skip_value = False
5106 bounds=None, # dummy argument, workability for OctetString.decode
5110 :param value: set the value. Either datetime type, or
5111 :py:class:`pyderasn.UTCTime` object
5112 :param bytes impl: override default tag with ``IMPLICIT`` one
5113 :param bytes expl: override default tag with ``EXPLICIT`` one
5114 :param default: set default value. Type same as in ``value``
5115 :param bool optional: is object ``OPTIONAL`` in sequence
5117 super(UTCTime, self).__init__(
5118 None, None, impl, expl, None, optional, _decoded, ctx,
5122 if value is not None:
5123 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5124 self.ber_encoded = self.ber_raw is not None
5125 if default is not None:
5126 default, _ = self._value_sanitize(default)
5127 self.default = self.__class__(
5132 if self._value is None:
5133 self._value = default
5135 self.optional = optional
5137 def _strptime_bered(self, value):
5138 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5141 raise ValueError("no timezone")
5142 year += 2000 if year < 50 else 1900
5143 decoded = datetime(year, month, day, hour, minute)
5145 if value[-1] == "Z":
5149 raise ValueError("invalid UTC offset")
5150 if value[-5] == "-":
5152 elif value[-5] == "+":
5155 raise ValueError("invalid UTC offset")
5156 v = pureint(value[-4:])
5157 offset, v = (60 * (v % 100)), v // 100
5159 raise ValueError("invalid UTC offset minutes")
5161 if offset > 14 * 3600:
5162 raise ValueError("too big UTC offset")
5166 return offset, decoded
5168 raise ValueError("invalid UTC offset seconds")
5169 seconds = pureint(value)
5171 raise ValueError("invalid seconds value")
5172 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse a strict ``YYMMDDHHMMSSZ`` string into a datetime

    Two-digit years below 50 are placed in 20xx, all the others
    in 19xx.

    :param str value: textual representation of the time
    :raises ValueError: on wrong length or missing "Z" at the end
    """
    # Corresponds to datetime.strptime's format: %y%m%d%H%M%SZ
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    if value[-1] != "Z":
        raise ValueError("non UTC timezone")
    fractions = str_to_time_fractions(value[:-1])
    short_year = fractions[0]
    century = 2000 if short_year < 50 else 1900
    return datetime(short_year + century, *fractions[1:])
5184 def _dt_sanitize(self, value):
5185 if value.year < 1950 or value.year > 2049:
5186 raise ValueError("UTCTime can hold only 1950-2049 years")
5187 return value.replace(microsecond=0)
5189 def _value_sanitize(self, value, ctx=None):
5190 if value.__class__ == binary_type:
5192 value_decoded = value.decode("ascii")
5193 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5194 raise DecodeError("invalid UTCTime encoding: %r" % err)
5197 return self._strptime(value_decoded), None
5198 except (TypeError, ValueError) as _err:
5200 if (ctx is not None) and ctx.get("bered", False):
5202 offset, _value = self._strptime_bered(value_decoded)
5203 _value = _value - timedelta(seconds=offset)
5204 return self._dt_sanitize(_value), value
5205 except (TypeError, ValueError, OverflowError) as _err:
5208 "invalid %s format: %r" % (self.asn1_type_name, err),
5209 klass=self.__class__,
5211 if isinstance(value, self.__class__):
5212 return value._value, None
5213 if value.__class__ == datetime:
5214 return self._dt_sanitize(value), None
5215 raise InvalidValueType((self.__class__, datetime))
5217 def _pp_value(self):
5219 value = self._value.isoformat()
5220 if self.ber_encoded:
5221 value += " (%s)" % self.ber_raw
5225 def __unicode__(self):
5227 value = self._value.isoformat()
5228 if self.ber_encoded:
5229 value += " (%s)" % self.ber_raw
5231 return text_type(self._pp_value())
5233 def __getstate__(self):
5234 return UTCTimeState(
5235 *super(UTCTime, self).__getstate__(),
5236 **{"ber_raw": self.ber_raw}
def __setstate__(self, state):
    """Restore the object from a ``UTCTimeState``

    Parent class restores the common part, then the raw BER
    representation is taken from ``state.ber_raw``.
    """
    super(UTCTime, self).__setstate__(state)
    restored_raw = state.ber_raw
    self.ber_raw = restored_raw
5243 def __bytes__(self):
5244 self._assert_ready()
5245 return self._encode_time()
5247 def __eq__(self, their):
5248 if their.__class__ == binary_type:
5249 return self._encode_time() == their
5250 if their.__class__ == datetime:
5251 return self.todatetime() == their
5252 if not isinstance(their, self.__class__):
5255 self._value == their._value and
5256 self.tag == their.tag and
5257 self._expl == their._expl
5260 def _encode_time(self):
5261 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5264 self._assert_ready()
5265 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """Report the encoded size: tag plus the fixed length and value part

    :param state: opaque encoder state, handed back unchanged
    :returns: ``(length, state)`` tuple
    """
    tlen = len(self.tag)
    return tlen + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``_encode`` into ``writer``

    Readiness of the object is asserted before anything is written.

    :param writer: destination passed to ``write_full``
    :param state_iter: not consumed by this type
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
def _encode_cer(self, writer):
    """CER output: exactly what ``_encode`` returns, sent to ``writer``"""
    encoded = self._encode()
    write_full(writer, encoded)
5277 def todatetime(self):
5281 return pp_console_row(next(self.pps()))
5283 def pps(self, decode_path=()):
5286 asn1_type_name=self.asn1_type_name,
5287 obj_name=self.__class__.__name__,
5288 decode_path=decode_path,
5289 value=self._pp_value(),
5290 optional=self.optional,
5291 default=self == self.default,
5292 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5293 expl=None if self._expl is None else tag_decode(self._expl),
5298 expl_offset=self.expl_offset if self.expled else None,
5299 expl_tlen=self.expl_tlen if self.expled else None,
5300 expl_llen=self.expl_llen if self.expled else None,
5301 expl_vlen=self.expl_vlen if self.expled else None,
5302 expl_lenindef=self.expl_lenindef,
5303 ber_encoded=self.ber_encoded,
5306 for pp in self.pps_lenindef(decode_path):
5310 class GeneralizedTime(UTCTime):
5311 """``GeneralizedTime`` datetime type
5313 This type is similar to :py:class:`pyderasn.UTCTime`.
5315 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5316 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5318 '20170930220750.000123Z'
5319 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5320 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5324 Only microsecond fractions are supported in DER encoding.
5325 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5326 higher precision values.
BER encoded data can lose information (accuracy) during decoding
because of float transformations.
5335 Local times (without explicit timezone specification) are treated
5336 as UTC one, no transformations are made.
5340 Zero year is unsupported.
5343 tag_default = tag_encode(24)
5344 asn1_type_name = "GeneralizedTime"
5346 def _dt_sanitize(self, value):
5349 def _strptime_bered(self, value):
5350 if len(value) < 4 + 3 * 2:
5351 raise ValueError("invalid GeneralizedTime")
5352 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5353 decoded = datetime(year, month, day, hour)
5354 offset, value = 0, value[10:]
5356 return offset, decoded
5357 if value[-1] == "Z":
5360 for char, sign in (("-", -1), ("+", 1)):
5361 idx = value.rfind(char)
5364 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5365 v = pureint(offset_raw)
5366 if len(offset_raw) == 4:
5367 offset, v = (60 * (v % 100)), v // 100
5369 raise ValueError("invalid UTC offset minutes")
5370 elif len(offset_raw) == 2:
5373 raise ValueError("invalid UTC offset")
5375 if offset > 14 * 3600:
5376 raise ValueError("too big UTC offset")
5380 return offset, decoded
5381 if value[0] in DECIMAL_SIGNS:
5383 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5386 raise ValueError("stripped minutes")
5387 decoded += timedelta(seconds=60 * pureint(value[:2]))
5390 return offset, decoded
5391 if value[0] in DECIMAL_SIGNS:
5393 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5396 raise ValueError("stripped seconds")
5397 decoded += timedelta(seconds=pureint(value[:2]))
5400 return offset, decoded
5401 if value[0] not in DECIMAL_SIGNS:
5402 raise ValueError("invalid format after seconds")
5404 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5407 def _strptime(self, value):
5409 if l == LEN_YYYYMMDDHHMMSSZ:
5410 # datetime.strptime's format: %Y%m%d%H%M%SZ
5411 if value[-1] != "Z":
5412 raise ValueError("non UTC timezone")
5413 return datetime(*str_to_time_fractions(value[:-1]))
5414 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5415 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5416 if value[-1] != "Z":
5417 raise ValueError("non UTC timezone")
5418 if value[14] != ".":
5419 raise ValueError("no fractions separator")
5422 raise ValueError("trailing zero")
5425 raise ValueError("only microsecond fractions are supported")
5426 us = pureint(us + ("0" * (6 - us_len)))
5427 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5428 return datetime(year, month, day, hour, minute, second, us)
5429 raise ValueError("invalid GeneralizedTime length")
5431 def _encode_time(self):
5433 encoded = value.strftime("%Y%m%d%H%M%S")
5434 if value.microsecond > 0:
5435 encoded += (".%06d" % value.microsecond).rstrip("0")
5436 return (encoded + "Z").encode("ascii")
5439 self._assert_ready()
5441 if value.microsecond > 0:
5442 encoded = self._encode_time()
5443 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5444 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """Compute the full encoded length: tag, length octets and time value

    The time value has a variable size here, so it is actually
    encoded to find out its length.

    :param state: opaque encoder state, handed back unchanged
    :returns: ``(length, state)`` tuple
    """
    self._assert_ready()
    encoded_time = self._encode_time()
    vlen = len(encoded_time)
    total_len = len(self.tag) + len_size(vlen) + vlen
    return total_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``_encode`` into ``writer``

    :param writer: destination passed to ``write_full``
    :param state_iter: not consumed by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
5455 class GraphicString(CommonString):
5457 tag_default = tag_encode(25)
5458 encoding = "iso-8859-1"
5459 asn1_type_name = "GraphicString"
5462 class GeneralString(CommonString):
5464 tag_default = tag_encode(27)
5465 encoding = "iso-8859-1"
5466 asn1_type_name = "GeneralString"
5469 class UniversalString(CommonString):
5471 tag_default = tag_encode(28)
5472 encoding = "utf-32-be"
5473 asn1_type_name = "UniversalString"
5476 class BMPString(CommonString):
5478 tag_default = tag_encode(30)
5479 encoding = "utf-16-be"
5480 asn1_type_name = "BMPString"
5483 ChoiceState = namedtuple(
5485 BasicState._fields + ("specs", "value",),
5491 """``CHOICE`` special type
5495 class GeneralName(Choice):
5497 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5498 ("dNSName", IA5String(impl=tag_ctxp(2))),
5501 >>> gn = GeneralName()
5503 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5504 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5505 >>> gn["dNSName"] = IA5String("bar.baz")
5506 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5507 >>> gn["rfc822Name"]
5510 [2] IA5String IA5 bar.baz
5513 >>> gn.value == gn["dNSName"]
5516 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5518 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5519 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5521 __slots__ = ("specs",)
5523 asn1_type_name = "CHOICE"
5536 :param value: set the value. Either ``(choice, value)`` tuple, or
5537 :py:class:`pyderasn.Choice` object
5538 :param bytes impl: can not be set, do **not** use it
5539 :param bytes expl: override default tag with ``EXPLICIT`` one
5540 :param default: set default value. Type same as in ``value``
5541 :param bool optional: is object ``OPTIONAL`` in sequence
5543 if impl is not None:
5544 raise ValueError("no implicit tag allowed for CHOICE")
5545 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5547 schema = getattr(self, "schema", ())
5548 if len(schema) == 0:
5549 raise ValueError("schema must be specified")
5551 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5554 if value is not None:
5555 self._value = self._value_sanitize(value)
5556 if default is not None:
5557 default_value = self._value_sanitize(default)
5558 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5559 default_obj.specs = self.specs
5560 default_obj._value = default_value
5561 self.default = default_obj
5563 self._value = copy(default_obj._value)
5564 if self._expl is not None:
5565 tag_class, _, tag_num = tag_decode(self._expl)
5566 self._tag_order = (tag_class, tag_num)
5568 def _value_sanitize(self, value):
5569 if (value.__class__ == tuple) and len(value) == 2:
5571 spec = self.specs.get(choice)
5573 raise ObjUnknown(choice)
5574 if not isinstance(obj, spec.__class__):
5575 raise InvalidValueType((spec,))
5576 return (choice, spec(obj))
5577 if isinstance(value, self.__class__):
5579 raise InvalidValueType((self.__class__, tuple))
5583 return self._value is not None and self._value[1].ready
5587 return self.expl_lenindef or (
5588 (self._value is not None) and
5589 self._value[1].bered
5592 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from a ``ChoiceState``

    Parent class restores the common part, then the schema
    (``specs``) and the chosen value are taken from the state.
    """
    super(Choice, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
5615 def __eq__(self, their):
5616 if (their.__class__ == tuple) and len(their) == 2:
5617 return self._value == their
5618 if not isinstance(their, self.__class__):
5621 self.specs == their.specs and
5622 self._value == their._value
5632 return self.__class__(
5635 expl=self._expl if expl is None else expl,
5636 default=self.default if default is None else default,
5637 optional=self.optional if optional is None else optional,
5642 """Name of the choice
5644 self._assert_ready()
5645 return self._value[0]
5649 """Value of underlying choice
5651 self._assert_ready()
5652 return self._value[1]
def tag_order(self):
    """Tag order of the object

    The object must be ready. When own ``_tag_order`` is set, it is
    returned, otherwise the ``tag_order`` of the chosen value is used.
    """
    self._assert_ready()
    own_order = self._tag_order
    if own_order is not None:
        return own_order
    return self._value[1].tag_order
def tag_order_cer(self):
    """Smallest ``tag_order_cer`` among all alternatives in ``specs``"""
    candidate_orders = [spec.tag_order_cer for spec in itervalues(self.specs)]
    return min(candidate_orders)
5663 def __getitem__(self, key):
5664 if key not in self.specs:
5665 raise ObjUnknown(key)
5666 if self._value is None:
5668 choice, value = self._value
5673 def __setitem__(self, key, value):
5674 spec = self.specs.get(key)
5676 raise ObjUnknown(key)
5677 if not isinstance(value, spec.__class__):
5678 raise InvalidValueType((spec.__class__,))
5679 self._value = (key, spec(value))
5687 return self._value[1].decoded if self.ready else False
5690 self._assert_ready()
5691 return self._value[1].encode()
def _encode1st(self, state):
    """First encoding pass: delegate to the chosen value

    The object must be ready. Result of the chosen value's
    ``encode1st(state)`` is returned unchanged.
    """
    self._assert_ready()
    chosen = self._value[1]
    return chosen.encode1st(state)
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: delegate to the chosen value"""
    chosen = self._value[1]
    chosen.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding: delegate to the chosen value

    The object must be ready.
    """
    self._assert_ready()
    chosen = self._value[1]
    chosen.encode_cer(writer)
5704 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5705 for choice, spec in iteritems(self.specs):
5706 sub_decode_path = decode_path + (choice,)
5712 decode_path=sub_decode_path,
5715 _ctx_immutable=False,
5722 klass=self.__class__,
5723 decode_path=decode_path,
5726 if tag_only: # pragma: no cover
5730 for _decode_path, value, tail in spec.decode_evgen(
5734 decode_path=sub_decode_path,
5736 _ctx_immutable=False,
5738 yield _decode_path, value, tail
5740 _, value, tail = next(spec.decode_evgen(
5744 decode_path=sub_decode_path,
5746 _ctx_immutable=False,
5749 obj = self.__class__(
5752 default=self.default,
5753 optional=self.optional,
5754 _decoded=(offset, 0, value.fulllen),
5756 obj._value = (choice, value)
5757 yield decode_path, obj, tail
5760 value = pp_console_row(next(self.pps()))
5762 value = "%s[%r]" % (value, self.value)
5765 def pps(self, decode_path=()):
5768 asn1_type_name=self.asn1_type_name,
5769 obj_name=self.__class__.__name__,
5770 decode_path=decode_path,
5771 value=self.choice if self.ready else None,
5772 optional=self.optional,
5773 default=self == self.default,
5774 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5775 expl=None if self._expl is None else tag_decode(self._expl),
5780 expl_lenindef=self.expl_lenindef,
5784 yield self.value.pps(decode_path=decode_path + (self.choice,))
5785 for pp in self.pps_lenindef(decode_path):
5789 class PrimitiveTypes(Choice):
5790 """Predefined ``CHOICE`` for all generic primitive types
5792 It could be useful for general decoding of some unspecified values:
5794 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5795 OCTET STRING 3 bytes 666f6f
5796 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5800 schema = tuple((klass.__name__, klass()) for klass in (
5824 AnyState = namedtuple(
5826 BasicState._fields + ("value", "defined"),
5832 """``ANY`` special type
5834 >>> Any(Integer(-123))
5835 ANY INTEGER -123 (0X:7B)
5836 >>> a = Any(OctetString(b"hello world").encode())
5837 ANY 040b68656c6c6f20776f726c64
5838 >>> hexenc(bytes(a))
5839 b'0x040x0bhello world'
5841 __slots__ = ("defined",)
5842 tag_default = tag_encode(0)
5843 asn1_type_name = "ANY"
5853 :param value: set the value. Either any kind of pyderasn's
5854 **ready** object, or bytes. Pay attention that
5855 **no** validation is performed if raw binary value
5856 is valid TLV, except just tag decoding
5857 :param bytes expl: override default tag with ``EXPLICIT`` one
5858 :param bool optional: is object ``OPTIONAL`` in sequence
5860 super(Any, self).__init__(None, expl, None, optional, _decoded)
5864 value = self._value_sanitize(value)
5866 if self._expl is None:
5867 if value.__class__ == binary_type:
5868 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5870 tag_class, tag_num = value.tag_order
5872 tag_class, _, tag_num = tag_decode(self._expl)
5873 self._tag_order = (tag_class, tag_num)
5876 def _value_sanitize(self, value):
5877 if value.__class__ == binary_type:
5879 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5881 if isinstance(value, self.__class__):
5883 if not isinstance(value, Obj):
5884 raise InvalidValueType((self.__class__, Obj, binary_type))
5889 return self._value is not None
def tag_order(self):
    """Tag order of the object

    The object must be ready. Returns the stored ``_tag_order``.
    """
    self._assert_ready()
    order = self._tag_order
    return order
5898 if self.expl_lenindef or self.lenindef:
5900 if self.defined is None:
5902 return self.defined[1].bered
5904 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The parent class restores its own part of ``state`` first, then
    the ``value`` and ``defined`` fields are copied onto this instance.
    """
    super(Any, self).__setstate__(state)
    self._value, self.defined = state.value, state.defined
5927 def __eq__(self, their):
5928 if their.__class__ == binary_type:
5929 if self._value.__class__ == binary_type:
5930 return self._value == their
5931 return self._value.encode() == their
5932 if issubclass(their.__class__, Any):
5933 if self.ready and their.ready:
5934 return bytes(self) == bytes(their)
5935 return self.ready == their.ready
5944 return self.__class__(
5946 expl=self._expl if expl is None else expl,
5947 optional=self.optional if optional is None else optional,
5950 def __bytes__(self):
5951 self._assert_ready()
5953 if value.__class__ == binary_type:
5955 return self._value.encode()
5962 self._assert_ready()
5964 if value.__class__ == binary_type:
5966 return value.encode()
5968 def _encode1st(self, state):
5969 self._assert_ready()
5971 if value.__class__ == binary_type:
5972 return len(value), state
5973 return value.encode1st(state)
5975 def _encode2nd(self, writer, state_iter):
5977 if value.__class__ == binary_type:
5978 write_full(writer, value)
5980 value.encode2nd(writer, state_iter)
5982 def _encode_cer(self, writer):
5983 self._assert_ready()
5985 if value.__class__ == binary_type:
5986 write_full(writer, value)
5988 value.encode_cer(writer)
5990 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5992 t, tlen, lv = tag_strip(tlv)
5993 except DecodeError as err:
5994 raise err.__class__(
5996 klass=self.__class__,
5997 decode_path=decode_path,
6001 l, llen, v = len_decode(lv)
6002 except LenIndefForm as err:
6003 if not ctx.get("bered", False):
6004 raise err.__class__(
6006 klass=self.__class__,
6007 decode_path=decode_path,
6010 llen, vlen, v = 1, 0, lv[1:]
6011 sub_offset = offset + tlen + llen
6013 while v[:EOC_LEN].tobytes() != EOC:
6014 chunk, v = Any().decode(
6017 decode_path=decode_path + (str(chunk_i),),
6020 _ctx_immutable=False,
6022 vlen += chunk.tlvlen
6023 sub_offset += chunk.tlvlen
6025 tlvlen = tlen + llen + vlen + EOC_LEN
6026 obj = self.__class__(
6027 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6029 optional=self.optional,
6030 _decoded=(offset, 0, tlvlen),
6033 obj.tag = t.tobytes()
6034 yield decode_path, obj, v[EOC_LEN:]
6036 except DecodeError as err:
6037 raise err.__class__(
6039 klass=self.__class__,
6040 decode_path=decode_path,
6044 raise NotEnoughData(
6045 "encoded length is longer than data",
6046 klass=self.__class__,
6047 decode_path=decode_path,
6050 tlvlen = tlen + llen + l
6051 v, tail = tlv[:tlvlen], v[l:]
6052 obj = self.__class__(
6053 value=None if evgen_mode else v.tobytes(),
6055 optional=self.optional,
6056 _decoded=(offset, 0, tlvlen),
6058 obj.tag = t.tobytes()
6059 yield decode_path, obj, tail
6062 return pp_console_row(next(self.pps()))
6064 def pps(self, decode_path=()):
6068 elif value.__class__ == binary_type:
6074 asn1_type_name=self.asn1_type_name,
6075 obj_name=self.__class__.__name__,
6076 decode_path=decode_path,
6078 blob=self._value if self._value.__class__ == binary_type else None,
6079 optional=self.optional,
6080 default=self == self.default,
6081 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6082 expl=None if self._expl is None else tag_decode(self._expl),
6087 expl_offset=self.expl_offset if self.expled else None,
6088 expl_tlen=self.expl_tlen if self.expled else None,
6089 expl_llen=self.expl_llen if self.expled else None,
6090 expl_vlen=self.expl_vlen if self.expled else None,
6091 expl_lenindef=self.expl_lenindef,
6092 lenindef=self.lenindef,
6095 defined_by, defined = self.defined or (None, None)
6096 if defined_by is not None:
6098 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6100 for pp in self.pps_lenindef(decode_path):
6104 ########################################################################
6105 # ASN.1 constructed types
6106 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can start with ".."
                     elements, each of them stripping the last element
                     from ``decode_path``
    :returns: absolute decode path as a tuple. Empty ``rel_path`` (or
              the one consisting only of ".." elements) is allowed and
              gives the (stripped) ``decode_path`` itself

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ("foo", "bar", "baz", "whatever")
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ("foo", "whatever")
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ("baz", "whatever")
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ("foo",)
    """
    # Consume leading control elements one by one. The loop stops as soon
    # as rel_path is exhausted, so no indexing of an empty tuple happens
    while len(rel_path) > 0:
        head = rel_path[0]
        if head == "/":
            # Absolute path: everything collected so far is ignored
            return rel_path[1:]
        if head != "..":
            break
        decode_path = decode_path[:-1]
        rel_path = rel_path[1:]
    return decode_path + rel_path
6133 SequenceState = namedtuple(
6135 BasicState._fields + ("specs", "value",),
6140 class SequenceEncode1stMixing(object):
6141 def _encode1st(self, state):
6143 idx = len(state) - 1
6145 for v in self._values_for_encoding():
6146 l, _ = v.encode1st(state)
6149 return len(self.tag) + len_size(vlen) + vlen, state
6152 class Sequence(SequenceEncode1stMixing, Obj):
6153 """``SEQUENCE`` structure type
6155 You have to make specification of sequence::
6157 class Extension(Sequence):
6159 ("extnID", ObjectIdentifier()),
6160 ("critical", Boolean(default=False)),
6161 ("extnValue", OctetString()),
6164 Then, you can work with it as with dictionary.
6166 >>> ext = Extension()
6167 >>> Extension().specs
6169 ('extnID', OBJECT IDENTIFIER),
6170 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6171 ('extnValue', OCTET STRING),
6173 >>> ext["extnID"] = "1.2.3"
6174 Traceback (most recent call last):
6175 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6176 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6178 You can determine if sequence is ready to be encoded:
6183 Traceback (most recent call last):
6184 pyderasn.ObjNotReady: object is not ready: extnValue
6185 >>> ext["extnValue"] = OctetString(b"foobar")
6189 Value you want to assign, must have the same **type** as in
6190 corresponding specification, but it can have different tags,
6191 optional/default attributes -- they will be taken from specification
6194 class TBSCertificate(Sequence):
6196 ("version", Version(expl=tag_ctxc(0), default="v1")),
6199 >>> tbs = TBSCertificate()
6200 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6202 Assign ``None`` to remove value from sequence.
6204 You can set values in Sequence during its initialization:
6206 >>> AlgorithmIdentifier((
6207 ("algorithm", ObjectIdentifier("1.2.3")),
6208 ("parameters", Any(Null()))
6210 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6212 You can determine if value exists/set in the sequence and take its value:
6214 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6217 OBJECT IDENTIFIER 1.2.3
6219 But pay attention that if value has default, then it won't be (not
6220 in) in the sequence (because ``DEFAULT`` must not be encoded in
6221 DER), but you can read its value:
6223 >>> "critical" in ext, ext["critical"]
6224 (False, BOOLEAN False)
6225 >>> ext["critical"] = Boolean(True)
6226 >>> "critical" in ext, ext["critical"]
6227 (True, BOOLEAN True)
6229 All defaulted values are always optional.
6231 .. _allow_default_values_ctx:
6233 DER prohibits default value encoding and will raise an error if
6234 default value is unexpectedly met during decode.
6235 If :ref:`bered <bered_ctx>` context option is set, then no error
6236 will be raised, but ``bered`` attribute set. You can disable strict
6237 defaulted values existence validation by setting
6238 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6240 All values with DEFAULT specified are decoded atomically in
6241 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6242 SEQUENCE, then it will be yielded as a single element, not
6243 disassembled. That is required for DEFAULT existence check.
6245 Two sequences are equal if they have equal specification (schema),
6246 implicit/explicit tagging and the same values.
6248 __slots__ = ("specs",)
6249 tag_default = tag_encode(form=TagFormConstructed, num=16)
6250 asn1_type_name = "SEQUENCE"
6262 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6264 schema = getattr(self, "schema", ())
6266 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6269 if value is not None:
6270 if issubclass(value.__class__, Sequence):
6271 self._value = value._value
6272 elif hasattr(value, "__iter__"):
6273 for seq_key, seq_value in value:
6274 self[seq_key] = seq_value
6276 raise InvalidValueType((Sequence,))
6277 if default is not None:
6278 if not issubclass(default.__class__, Sequence):
6279 raise InvalidValueType((Sequence,))
6280 default_value = default._value
6281 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6282 default_obj.specs = self.specs
6283 default_obj._value = default_value
6284 self.default = default_obj
6286 self._value = copy(default_obj._value)
6290 for name, spec in iteritems(self.specs):
6291 value = self._value.get(name)
6302 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6304 return any(value.bered for value in itervalues(self._value))
6306 def __getstate__(self):
6307 return SequenceState(
6321 {k: copy(v) for k, v in iteritems(self._value)},
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The parent class restores its own part of ``state`` first, then
    the ``specs`` and ``value`` fields are copied onto this instance.
    """
    super(Sequence, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
6329 def __eq__(self, their):
6330 if not isinstance(their, self.__class__):
6333 self.specs == their.specs and
6334 self.tag == their.tag and
6335 self._expl == their._expl and
6336 self._value == their._value
6347 return self.__class__(
6350 impl=self.tag if impl is None else impl,
6351 expl=self._expl if expl is None else expl,
6352 default=self.default if default is None else default,
6353 optional=self.optional if optional is None else optional,
def __contains__(self, key):
    """Tell if ``key`` is present among the stored values"""
    stored = self._value
    return key in stored
6359 def __setitem__(self, key, value):
6360 spec = self.specs.get(key)
6362 raise ObjUnknown(key)
6364 self._value.pop(key, None)
6366 if not isinstance(value, spec.__class__):
6367 raise InvalidValueType((spec.__class__,))
6368 value = spec(value=value)
6369 if spec.default is not None and value == spec.default:
6370 self._value.pop(key, None)
6372 self._value[key] = value
6374 def __getitem__(self, key):
6375 value = self._value.get(key)
6376 if value is not None:
6378 spec = self.specs.get(key)
6380 raise ObjUnknown(key)
6381 if spec.default is not None:
6385 def _values_for_encoding(self):
6386 for name, spec in iteritems(self.specs):
6387 value = self._value.get(name)
6391 raise ObjNotReady(name)
6395 v = b"".join(v.encode() for v in self._values_for_encoding())
6396 return b"".join((self.tag, len_encode(len(v)), v))
def _encode2nd(self, writer, state_iter):
    """Second encoding pass

    Writes the tag followed by the length taken from ``state_iter``,
    then lets every value from ``_values_for_encoding()`` write itself.
    """
    header = self.tag + len_encode(next(state_iter))
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding

    Writes the tag with ``LENINDEF``, then every value from
    ``_values_for_encoding()``, and finishes with ``EOC``.
    """
    header = self.tag + LENINDEF
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode_cer(writer)
    write_full(writer, EOC)
6409 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6411 t, tlen, lv = tag_strip(tlv)
6412 except DecodeError as err:
6413 raise err.__class__(
6415 klass=self.__class__,
6416 decode_path=decode_path,
6421 klass=self.__class__,
6422 decode_path=decode_path,
6425 if tag_only: # pragma: no cover
6429 ctx_bered = ctx.get("bered", False)
6431 l, llen, v = len_decode(lv)
6432 except LenIndefForm as err:
6434 raise err.__class__(
6436 klass=self.__class__,
6437 decode_path=decode_path,
6440 l, llen, v = 0, 1, lv[1:]
6442 except DecodeError as err:
6443 raise err.__class__(
6445 klass=self.__class__,
6446 decode_path=decode_path,
6450 raise NotEnoughData(
6451 "encoded length is longer than data",
6452 klass=self.__class__,
6453 decode_path=decode_path,
6457 v, tail = v[:l], v[l:]
6459 sub_offset = offset + tlen + llen
6462 ctx_allow_default_values = ctx.get("allow_default_values", False)
6463 for name, spec in iteritems(self.specs):
6464 if spec.optional and (
6465 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6469 spec_defaulted = spec.default is not None
6470 sub_decode_path = decode_path + (name,)
6472 if evgen_mode and not spec_defaulted:
6473 for _decode_path, value, v_tail in spec.decode_evgen(
6477 decode_path=sub_decode_path,
6479 _ctx_immutable=False,
6481 yield _decode_path, value, v_tail
6483 _, value, v_tail = next(spec.decode_evgen(
6487 decode_path=sub_decode_path,
6489 _ctx_immutable=False,
6492 except TagMismatch as err:
6493 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6497 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6498 if not evgen_mode and defined is not None:
6499 defined_by, defined_spec = defined
6500 if issubclass(value.__class__, SequenceOf):
6501 for i, _value in enumerate(value):
6502 sub_sub_decode_path = sub_decode_path + (
6504 DecodePathDefBy(defined_by),
6506 defined_value, defined_tail = defined_spec.decode(
6507 memoryview(bytes(_value)),
6509 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6510 if value.expled else (value.tlen + value.llen)
6513 decode_path=sub_sub_decode_path,
6515 _ctx_immutable=False,
6517 if len(defined_tail) > 0:
6520 klass=self.__class__,
6521 decode_path=sub_sub_decode_path,
6524 _value.defined = (defined_by, defined_value)
6526 defined_value, defined_tail = defined_spec.decode(
6527 memoryview(bytes(value)),
6529 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6530 if value.expled else (value.tlen + value.llen)
6533 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6535 _ctx_immutable=False,
6537 if len(defined_tail) > 0:
6540 klass=self.__class__,
6541 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6544 value.defined = (defined_by, defined_value)
6546 value_len = value.fulllen
6548 sub_offset += value_len
6552 yield sub_decode_path, value, v_tail
6553 if value == spec.default:
6554 if ctx_bered or ctx_allow_default_values:
6558 "DEFAULT value met",
6559 klass=self.__class__,
6560 decode_path=sub_decode_path,
6564 values[name] = value
6565 spec_defines = getattr(spec, "defines", ())
6566 if len(spec_defines) == 0:
6567 defines_by_path = ctx.get("defines_by_path", ())
6568 if len(defines_by_path) > 0:
6569 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6570 if spec_defines is not None and len(spec_defines) > 0:
6571 for rel_path, schema in spec_defines:
6572 defined = schema.get(value, None)
6573 if defined is not None:
6574 ctx.setdefault("_defines", []).append((
6575 abs_decode_path(sub_decode_path[:-1], rel_path),
6579 if v[:EOC_LEN].tobytes() != EOC:
6582 klass=self.__class__,
6583 decode_path=decode_path,
6591 klass=self.__class__,
6592 decode_path=decode_path,
6595 obj = self.__class__(
6599 default=self.default,
6600 optional=self.optional,
6601 _decoded=(offset, llen, vlen),
6604 obj.lenindef = lenindef
6605 obj.ber_encoded = ber_encoded
6606 yield decode_path, obj, tail
6609 value = pp_console_row(next(self.pps()))
6611 for name in self.specs:
6612 _value = self._value.get(name)
6615 cols.append("%s: %s" % (name, repr(_value)))
6616 return "%s[%s]" % (value, "; ".join(cols))
6618 def pps(self, decode_path=()):
6621 asn1_type_name=self.asn1_type_name,
6622 obj_name=self.__class__.__name__,
6623 decode_path=decode_path,
6624 optional=self.optional,
6625 default=self == self.default,
6626 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6627 expl=None if self._expl is None else tag_decode(self._expl),
6632 expl_offset=self.expl_offset if self.expled else None,
6633 expl_tlen=self.expl_tlen if self.expled else None,
6634 expl_llen=self.expl_llen if self.expled else None,
6635 expl_vlen=self.expl_vlen if self.expled else None,
6636 expl_lenindef=self.expl_lenindef,
6637 lenindef=self.lenindef,
6638 ber_encoded=self.ber_encoded,
6641 for name in self.specs:
6642 value = self._value.get(name)
6645 yield value.pps(decode_path=decode_path + (name,))
6646 for pp in self.pps_lenindef(decode_path):
6650 class Set(Sequence, SequenceEncode1stMixing):
6651 """``SET`` structure type
6653 Its usage is identical to :py:class:`pyderasn.Sequence`.
6655 .. _allow_unordered_set_ctx:
6657 DER prohibits unordered values encoding and will raise an error
6658 during decode. If :ref:`bered <bered_ctx>` context option is set,
6659 then no error will occur. Also you can disable strict values
6660 ordering check by setting ``"allow_unordered_set": True``
6661 :ref:`context <ctx>` option.
6664 tag_default = tag_encode(form=TagFormConstructed, num=17)
6665 asn1_type_name = "SET"
6667 def _values_for_encoding(self):
6669 super(Set, self)._values_for_encoding(),
6670 key=attrgetter("tag_order"),
6673 def _encode_cer(self, writer):
6674 write_full(writer, self.tag + LENINDEF)
6676 super(Set, self)._values_for_encoding(),
6677 key=attrgetter("tag_order_cer"),
6679 v.encode_cer(writer)
6680 write_full(writer, EOC)
6682 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6684 t, tlen, lv = tag_strip(tlv)
6685 except DecodeError as err:
6686 raise err.__class__(
6688 klass=self.__class__,
6689 decode_path=decode_path,
6694 klass=self.__class__,
6695 decode_path=decode_path,
6702 ctx_bered = ctx.get("bered", False)
6704 l, llen, v = len_decode(lv)
6705 except LenIndefForm as err:
6707 raise err.__class__(
6709 klass=self.__class__,
6710 decode_path=decode_path,
6713 l, llen, v = 0, 1, lv[1:]
6715 except DecodeError as err:
6716 raise err.__class__(
6718 klass=self.__class__,
6719 decode_path=decode_path,
6723 raise NotEnoughData(
6724 "encoded length is longer than data",
6725 klass=self.__class__,
6729 v, tail = v[:l], v[l:]
6731 sub_offset = offset + tlen + llen
6734 ctx_allow_default_values = ctx.get("allow_default_values", False)
6735 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6736 tag_order_prev = (0, 0)
6737 _specs_items = copy(self.specs)
6740 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6742 for name, spec in iteritems(_specs_items):
6743 sub_decode_path = decode_path + (name,)
6749 decode_path=sub_decode_path,
6752 _ctx_immutable=False,
6759 klass=self.__class__,
6760 decode_path=decode_path,
6763 spec_defaulted = spec.default is not None
6764 if evgen_mode and not spec_defaulted:
6765 for _decode_path, value, v_tail in spec.decode_evgen(
6769 decode_path=sub_decode_path,
6771 _ctx_immutable=False,
6773 yield _decode_path, value, v_tail
6775 _, value, v_tail = next(spec.decode_evgen(
6779 decode_path=sub_decode_path,
6781 _ctx_immutable=False,
6784 value_tag_order = value.tag_order
6785 value_len = value.fulllen
6786 if tag_order_prev >= value_tag_order:
6787 if ctx_bered or ctx_allow_unordered_set:
6791 "unordered " + self.asn1_type_name,
6792 klass=self.__class__,
6793 decode_path=sub_decode_path,
6798 yield sub_decode_path, value, v_tail
6799 if value != spec.default:
6801 elif ctx_bered or ctx_allow_default_values:
6805 "DEFAULT value met",
6806 klass=self.__class__,
6807 decode_path=sub_decode_path,
6810 values[name] = value
6811 del _specs_items[name]
6812 tag_order_prev = value_tag_order
6813 sub_offset += value_len
6817 obj = self.__class__(
6821 default=self.default,
6822 optional=self.optional,
6823 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6826 if v[:EOC_LEN].tobytes() != EOC:
6829 klass=self.__class__,
6830 decode_path=decode_path,
6835 for name, spec in iteritems(self.specs):
6836 if name not in values and not spec.optional:
6838 "%s value is not ready" % name,
6839 klass=self.__class__,
6840 decode_path=decode_path,
6845 obj.ber_encoded = ber_encoded
6846 yield decode_path, obj, tail
6849 SequenceOfState = namedtuple(
6851 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6856 class SequenceOf(SequenceEncode1stMixing, Obj):
6857 """``SEQUENCE OF`` sequence type
6859 For that kind of type you must specify the object it will carry on
6860 (bounds are for example here, not required)::
6862 class Ints(SequenceOf):
6867 >>> ints.append(Integer(123))
6868 >>> ints.append(Integer(234))
6870 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6871 >>> [int(i) for i in ints]
6873 >>> ints.append(Integer(345))
6874 Traceback (most recent call last):
6875 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6878 >>> ints[1] = Integer(345)
6880 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6882 You can initialize sequence with preinitialized values:
6884 >>> ints = Ints([Integer(123), Integer(234)])
6886 Also you can use iterator as a value:
6888 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6890 And it won't be iterated until encoding process. Pay attention that
6891 bounds and required schema checks are done only during the encoding
6892 process in that case! After encode was called, then value is zeroed
6893 back to empty list and you have to set it again. That mode is useful
6894 mainly with CER encoding mode, where all objects from the iterable
6895 will be streamed to the buffer, without copying all of them to
6898 __slots__ = ("spec", "_bound_min", "_bound_max")
6899 tag_default = tag_encode(form=TagFormConstructed, num=16)
6900 asn1_type_name = "SEQUENCE OF"
6913 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6915 schema = getattr(self, "schema", None)
6917 raise ValueError("schema must be specified")
6919 self._bound_min, self._bound_max = getattr(
6923 ) if bounds is None else bounds
6925 if value is not None:
6926 self._value = self._value_sanitize(value)
6927 if default is not None:
6928 default_value = self._value_sanitize(default)
6929 default_obj = self.__class__(
6934 default_obj._value = default_value
6935 self.default = default_obj
6937 self._value = copy(default_obj._value)
6939 def _value_sanitize(self, value):
6941 if issubclass(value.__class__, SequenceOf):
6942 value = value._value
6943 elif hasattr(value, NEXT_ATTR_NAME):
6945 elif hasattr(value, "__iter__"):
6948 raise InvalidValueType((self.__class__, iter, "iterator"))
6950 if not self._bound_min <= len(value) <= self._bound_max:
6951 raise BoundsError(self._bound_min, len(value), self._bound_max)
6952 class_expected = self.spec.__class__
6954 if not isinstance(v, class_expected):
6955 raise InvalidValueType((class_expected,))
6960 if hasattr(self._value, NEXT_ATTR_NAME):
6962 if self._bound_min > 0 and len(self._value) == 0:
6964 return all(v.ready for v in self._value)
6968 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6970 return any(v.bered for v in self._value)
6972 def __getstate__(self):
6973 if hasattr(self._value, NEXT_ATTR_NAME):
6974 raise ValueError("can not pickle SequenceOf with iterator")
6975 return SequenceOfState(
6989 [copy(v) for v in self._value],
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The parent class restores its own part of ``state`` first, then
    the ``spec``, ``value`` and both bounds are copied onto this
    instance.
    """
    super(SequenceOf, self).__setstate__(state)
    self.spec, self._value = state.spec, state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
7001 def __eq__(self, their):
7002 if isinstance(their, self.__class__):
7004 self.spec == their.spec and
7005 self.tag == their.tag and
7006 self._expl == their._expl and
7007 self._value == their._value
7009 if hasattr(their, "__iter__"):
7010 return self._value == list(their)
7022 return self.__class__(
7026 (self._bound_min, self._bound_max)
7027 if bounds is None else bounds
7029 impl=self.tag if impl is None else impl,
7030 expl=self._expl if expl is None else expl,
7031 default=self.default if default is None else default,
7032 optional=self.optional if optional is None else optional,
def __contains__(self, key):
    """Tell if ``key`` is present among the stored values"""
    stored = self._value
    return key in stored
7038 def append(self, value):
7039 if not isinstance(value, self.spec.__class__):
7040 raise InvalidValueType((self.spec.__class__,))
7041 if len(self._value) + 1 > self._bound_max:
7044 len(self._value) + 1,
7047 self._value.append(value)
7050 return iter(self._value)
7053 return len(self._value)
def __setitem__(self, key, value):
    """Store ``value`` at position ``key``

    :raises InvalidValueType: if ``value`` is not an instance of the
                              ``spec``'s class

    The stored object is the result of ``spec(value=value)``, not the
    passed ``value`` itself.
    """
    spec = self.spec
    expected = spec.__class__
    if isinstance(value, expected):
        self._value[key] = spec(value=value)
        return
    raise InvalidValueType((expected,))
def __getitem__(self, key):
    """Return the stored value at position ``key``"""
    stored = self._value
    return stored[key]
def _values_for_encoding(self):
    """Return an iterator over the stored values"""
    stored = self._value
    return iter(stored)
7067 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7070 values_append = values.append
7071 class_expected = self.spec.__class__
7072 values_for_encoding = self._values_for_encoding()
7074 for v in values_for_encoding:
7075 if not isinstance(v, class_expected):
7076 raise InvalidValueType((class_expected,))
7077 values_append(v.encode())
7078 if not self._bound_min <= len(values) <= self._bound_max:
7079 raise BoundsError(self._bound_min, len(values), self._bound_max)
7080 value = b"".join(values)
7082 value = b"".join(v.encode() for v in self._values_for_encoding())
7083 return b"".join((self.tag, len_encode(len(value)), value))
7085 def _encode1st(self, state):
7086 state = super(SequenceOf, self)._encode1st(state)
7087 if hasattr(self._value, NEXT_ATTR_NAME):
7091 def _encode2nd(self, writer, state_iter):
7092 write_full(writer, self.tag + len_encode(next(state_iter)))
7093 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7096 class_expected = self.spec.__class__
7097 values_for_encoding = self._values_for_encoding()
7099 for v in values_for_encoding:
7100 if not isinstance(v, class_expected):
7101 raise InvalidValueType((class_expected,))
7102 v.encode2nd(writer, state_iter)
7104 if not self._bound_min <= values_count <= self._bound_max:
7105 raise BoundsError(self._bound_min, values_count, self._bound_max)
7107 for v in self._values_for_encoding():
7108 v.encode2nd(writer, state_iter)
7110 def _encode_cer(self, writer):
7111 write_full(writer, self.tag + LENINDEF)
7112 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7114 class_expected = self.spec.__class__
7116 values_for_encoding = self._values_for_encoding()
7118 for v in values_for_encoding:
7119 if not isinstance(v, class_expected):
7120 raise InvalidValueType((class_expected,))
7121 v.encode_cer(writer)
7123 if not self._bound_min <= values_count <= self._bound_max:
7124 raise BoundsError(self._bound_min, values_count, self._bound_max)
7126 for v in self._values_for_encoding():
7127 v.encode_cer(writer)
7128 write_full(writer, EOC)
def _decode(
        self,
        tlv,
        offset,
        decode_path,
        ctx,
        tag_only,
        evgen_mode,
        ordering_check=False,
):
    """Decode the element from ``tlv``, acting as a generator

    When ``tag_only`` is set, a single ``None`` is yielded right after
    the tag was checked. Otherwise the final item yielded is
    ``(decode_path, obj, tail)``; in ``evgen_mode`` every event produced
    by the contained values is yielded before it and the resulting
    object carries no value.

    :param tlv: data to decode, starting with the tag
    :param offset: offset of ``tlv``, used for error reporting and for
                   the decoded object's position
    :param decode_path: tuple identifying this element's position
    :param ctx: decoding context; ``bered`` and ``allow_unordered_set``
                keys are consulted here
    :param tag_only: stop after the tag check
    :param evgen_mode: yield contained values' events instead of
                       collecting the values
    :param ordering_check: require each contained value's encoding to be
                           not less than the previous one
    :raises DecodeError: (or its subclasses) on any malformed input
    """
    # Location details attached to every error raised for this element
    err_ctx = {
        "klass": self.__class__,
        "decode_path": decode_path,
        "offset": offset,
    }
    try:
        t, tlen, lv = tag_strip(tlv)
    except DecodeError as err:
        raise err.__class__(msg=err.msg, **err_ctx)
    if t != self.tag:
        raise TagMismatch(**err_ctx)
    if tag_only:
        yield None
        return

    ctx_bered = ctx.get("bered", False)
    lenindef = False
    try:
        l, llen, v = len_decode(lv)
    except LenIndefForm as err:
        # Indefinite length is tolerated only when BER is allowed
        if not ctx_bered:
            raise err.__class__(msg=err.msg, **err_ctx)
        l, llen, v = 0, 1, lv[1:]
        lenindef = True
    except DecodeError as err:
        raise err.__class__(msg=err.msg, **err_ctx)
    if l > len(v):
        raise NotEnoughData("encoded length is longer than data", **err_ctx)
    if not lenindef:
        v, tail = v[:l], v[l:]

    spec = self.spec
    items = []
    items_count = 0
    vlen = 0
    sub_offset = offset + tlen + llen
    ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
    prev_raw = memoryview(v[:0])
    ber_encoded = False
    while len(v) > 0:
        if lenindef and v[:EOC_LEN].tobytes() == EOC:
            break
        sub_decode_path = decode_path + (str(items_count),)
        sub_kwargs = {
            "leavemm": True,
            "decode_path": sub_decode_path,
            "ctx": ctx,
            "_ctx_immutable": False,
        }
        if evgen_mode:
            # Pass every event through; the last one describes the
            # contained value itself and is used below
            for evt_path, value, v_tail in spec.decode_evgen(
                    v, sub_offset, **sub_kwargs
            ):
                yield evt_path, value, v_tail
        else:
            _, value, v_tail = next(spec.decode_evgen(
                v, sub_offset, _evgen_mode=False, **sub_kwargs
            ))
        value_len = value.fulllen
        if ordering_check:
            cur_raw = v[:value_len]
            if prev_raw.tobytes() > cur_raw.tobytes():
                if not (ctx_bered or ctx_allow_unordered_set):
                    raise DecodeError(
                        "unordered " + self.asn1_type_name,
                        klass=self.__class__,
                        decode_path=sub_decode_path,
                        offset=sub_offset,
                    )
                # Tolerated, but the result is marked as BER encoded
                ber_encoded = True
            prev_raw = cur_raw
        items_count += 1
        if not evgen_mode:
            items.append(value)
        sub_offset += value_len
        vlen += value_len
        v = v_tail

    # With evgen_mode no values are given to the constructor, so the
    # bounds have to be checked here explicitly
    if evgen_mode and not self._bound_min <= items_count <= self._bound_max:
        raise DecodeError(
            msg=str(BoundsError(self._bound_min, items_count, self._bound_max)),
            **err_ctx
        )
    try:
        obj = self.__class__(
            value=None if evgen_mode else items,
            schema=spec,
            bounds=(self._bound_min, self._bound_max),
            impl=self.tag,
            expl=self._expl,
            default=self.default,
            optional=self.optional,
            _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
        )
    except BoundsError as err:
        raise DecodeError(msg=str(err), **err_ctx)
    if lenindef:
        if v[:EOC_LEN].tobytes() != EOC:
            raise DecodeError("no EOC", **err_ctx)
        obj.lenindef = True
        tail = v[EOC_LEN:]
    obj.ber_encoded = ber_encoded
    yield decode_path, obj, tail
def __repr__(self):
    """Console row of the element itself followed by ``[item, item, ...]``"""
    header = pp_console_row(next(self.pps()))
    items = ", ".join(repr(item) for item in self._value)
    return "%s[%s]" % (header, items)
def pps(self, decode_path=()):
    """Generate pretty-printing data for the element and its contents

    First yields the ``_pp`` record of the element itself, then the
    ``pps()`` generator of each contained value (its decode path extended
    with the value's index), then whatever ``pps_lenindef`` produces.

    :param decode_path: tuple identifying this element's position
    """
    # Tags are reported only when they are actually overridden/present
    impl = None if self.tag == self.tag_default else tag_decode(self.tag)
    expl = None if self._expl is None else tag_decode(self._expl)
    yield _pp(
        obj=self,
        asn1_type_name=self.asn1_type_name,
        obj_name=self.__class__.__name__,
        decode_path=decode_path,
        optional=self.optional,
        default=self == self.default,
        impl=impl,
        expl=expl,
        offset=self.offset,
        tlen=self.tlen,
        llen=self.llen,
        vlen=self.vlen,
        expl_offset=self.expl_offset if self.expled else None,
        expl_tlen=self.expl_tlen if self.expled else None,
        expl_llen=self.expl_llen if self.expled else None,
        expl_vlen=self.expl_vlen if self.expled else None,
        expl_lenindef=self.expl_lenindef,
        lenindef=self.lenindef,
        ber_encoded=self.ber_encoded,
        bered=self.bered,
    )
    for idx, item in enumerate(self._value):
        yield item.pps(decode_path=decode_path + (str(idx),))
    for extra in self.pps_lenindef(decode_path):
        yield extra
class SetOf(SequenceOf):
    """``SET OF`` sequence type

    It is used the same way as :py:class:`pyderasn.SequenceOf`. The
    differences visible here: iterator values are rejected, encoded
    values are written out sorted, and decoding runs with the ordering
    check turned on.
    """
    __slots__ = ()
    tag_default = tag_encode(form=TagFormConstructed, num=17)
    asn1_type_name = "SET OF"

    def _value_sanitize(self, value):
        """Sanitize as the parent does, but refuse iterator values

        :raises ValueError: sanitized value is an iterator
        """
        sanitized = super(SetOf, self)._value_sanitize(value)
        if not hasattr(sanitized, NEXT_ATTR_NAME):
            return sanitized
        raise ValueError(
            "SetOf does not support iterator values, as no sense in them"
        )

    def _encode(self):
        """Return tag, length and the sorted encodings of all values"""
        encoded = [item.encode() for item in self._values_for_encoding()]
        encoded.sort()
        payload = b"".join(encoded)
        return b"".join((self.tag, len_encode(len(payload)), payload))

    def _encode2nd(self, writer, state_iter):
        """Second encoding pass: header first, then values in sorted order

        Each value is encoded into its own buffer, because sorting needs
        all the encodings before anything can be written.
        """
        write_full(writer, self.tag + len_encode(next(state_iter)))
        chunks = []
        for item in self._values_for_encoding():
            sink = BytesIO()
            item.encode2nd(sink.write, state_iter)
            chunks.append(sink.getvalue())
        for chunk in sorted(chunks):
            write_full(writer, chunk)

    def _encode_cer(self, writer):
        """Write indefinite-length form with sorted values, ended by EOC"""
        write_full(writer, self.tag + LENINDEF)
        encoded = [encode_cer(item) for item in self._values_for_encoding()]
        encoded.sort()
        for chunk in encoded:
            write_full(writer, chunk)
        write_full(writer, EOC)

    def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
        """Decode exactly like the parent, with the ordering check enabled"""
        return super(SetOf, self)._decode(
            tlv,
            offset,
            decode_path,
            ctx,
            tag_only,
            evgen_mode,
            ordering_check=True,
        )
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    The module part and the object part are separated with the last
    ``:`` of the path; the object part may be a dotted chain of
    attributes, resolved one after another.

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>

    :param str pypath: ``module.path:attr[.attr...]``
    :returns: the object found at the end of the attribute chain
    """
    module_name, attr_chain = pypath.rsplit(":", 1)
    from importlib import import_module
    target = import_module(module_name)
    for attr in attr_chain.split("."):
        target = getattr(target, attr)
    return target
def generic_decoder():  # pragma: no cover
    """Build a schema-less decoder and its pretty printer

    :returns: pair of a ``SEQUENCE OF`` instance whose schema is a choice
              over primitive types, ``SequenceOf``/``SetOf`` (also
              explicitly tagged with context tags 0..30) and ``Any``,
              and the ``pprint_any`` function for printing what was
              decoded with it
    """
    # All of this below is a big hack with self references
    choice = PrimitiveTypes()
    choice.specs["SequenceOf"] = SequenceOf(schema=choice)
    choice.specs["SetOf"] = SetOf(schema=choice)
    for tag_num in six_xrange(31):
        choice.specs["SequenceOf%d" % tag_num] = SequenceOf(
            schema=choice,
            expl=tag_ctxc(tag_num),
        )
    choice.specs["Any"] = Any()

    # Class name equals to type name, to omit it from output
    class SEQUENCEOF(SequenceOf):
        __slots__ = ()
        schema = choice

    def pprint_any(
            obj,
            oid_maps=(),
            with_colours=False,
            with_decode_path=False,
            decode_path_only=(),
            decode_path=(),
    ):
        prefix_len = len(decode_path_only)

        def _pprint_pps(pps):
            for pp in pps:
                if not hasattr(pp, "_fields"):
                    # Not a record, but a nested sequence of them
                    for row in _pprint_pps(pp):
                        yield row
                    continue
                if (
                        decode_path_only != () and
                        pp.decode_path[:prefix_len] != decode_path_only
                ):
                    continue
                # The choice wrapper itself is omitted from the output
                if pp.asn1_type_name == Choice.asn1_type_name:
                    continue
                pp_kwargs = pp._asdict()
                pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
                pp = _pp(**pp_kwargs)
                yield pp_console_row(
                    pp,
                    oid_maps=oid_maps,
                    with_offsets=True,
                    with_blob=False,
                    with_colours=with_colours,
                    with_decode_path=with_decode_path,
                    decode_path_len_decrease=prefix_len,
                )
                for row in pp_console_blob(
                        pp,
                        decode_path_len_decrease=prefix_len,
                ):
                    yield row
        return "\n".join(_pprint_pps(obj.pps(decode_path)))
    return SEQUENCEOF(), pprint_any
def ascii_visualize(ba):
    """Output only ASCII printable characters, like in hexdump -C

    Every item in the 0x20..0x7E range becomes its character, anything
    else becomes a dot. Example output for given binary string (right
    part)::

        92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|

    :param ba: iterable of integer byte values (e.g. ``bytearray``)
    :returns: text string, one character per input item
    """
    chars = []
    for code in ba:
        if 0x20 <= code <= 0x7E:
            chars.append(six_unichr(code))
        else:
            chars.append(".")
    return "".join(chars)
def hexdump(raw):
    """Generate ``hexdump -C`` like output

    Rendered example::

        00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
        00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|

    Result of that function is a generator of lines, where each line is
    a list of columns: the address, one column per byte and the ASCII
    representation::

        ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
         " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
         " |i..?.....G...k.j|"]

    :param raw: binary data to dump
    """
    hexed = hexenc(raw).upper()
    line_addr = 0
    row = ["%08x " % 0]
    for pos in six_xrange(len(hexed) // 2):
        if pos != 0:
            if pos % 8 == 0:
                # Widen the gap after every eighth byte, but only when
                # another byte follows it
                row[-1] += " "
            if pos % 16 == 0:
                # Current line is full: finish and emit it, start the next
                row.append(" |%s|" % ascii_visualize(
                    bytearray(raw[line_addr:line_addr + 16])
                ))
                yield row
                line_addr += 16
                row = ["%08x " % line_addr]
        row.append(" " + hexed[2 * pos:2 * pos + 2])
    if len(row) > 0:
        row.append(" |%s|" % ascii_visualize(bytearray(raw[line_addr:])))
        yield row
def browse(raw, obj, oid_maps=()):
    """Interactive browser

    :param bytes raw: binary data you decoded
    :param obj: decoded :py:class:`pyderasn.Obj`
    :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
                     Its human readable form is printed when OID is met

    .. note:: `urwid <http://urwid.org/>`__ dependency required

    This browser is an interactive terminal application for browsing
    structures of your decoded ASN.1 objects. You can quit it with **q**
    key. It consists of three windows:

    :tree:
     View of ASN.1 elements hierarchy. You can navigate it using **Up**,
     **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
     **Left** key goes to constructed element above. **Plus**/**Minus**
     keys expand/collapse constructed elements. **Space** toggles it
    :info:
     window with various information about element. You can scroll it
     with **h**/**l** (down, up) (**H**/**L** for five lines at once) keys
    :hexdump:
     window with raw data hexdump and highlighted current element's
     contents. It automatically focuses on element's data. You can
     scroll it with **j**/**k** (down, up) (**J**/**K** for five lines
     at once) keys. If element has explicit tag, then it also will be
     highlighted with different colour

    Window's header contains current decode path and progress bars with
    position in *info* and *hexdump* windows.

    If you press **d**, then current element will be saved in the
    current directory under its decode path name (adding ".0", ".1", etc
    suffix if such file already exists). **D** will save it with explicit tag.

    You can also invoke it with ``--browse`` command line argument.
    """
    from copy import deepcopy
    from os.path import exists as path_exists
    import urwid

    class TW(urwid.TreeWidget):
        # Tree row widget; also refreshes the info/hexdump windows when
        # it becomes the current one and handles the browser's keys

        def __init__(self, state, *args, **kwargs):
            self.state = state
            self.scrolled = {"info": False, "hexdump": False}
            super(TW, self).__init__(*args, **kwargs)

        def _get_pp(self):
            # Node value is either a single record (has "_fields"), or a
            # list whose first item is the record of a constructed element
            pp = self.get_node().get_value()
            constructed = len(pp) > 1
            return (pp if hasattr(pp, "_fields") else pp[0]), constructed

        def _state_update(self):
            pp, _ = self._get_pp()
            self.state["decode_path"].set_text(
                ":".join(str(p) for p in pp.decode_path)
            )
            # Pristine hexdump is kept in the state, highlight a copy
            lines = deepcopy(self.state["hexed"])

            def attr_set(i, attr):
                line = lines[i // 16]
                # Column 0 is the address, bytes start from column 1
                idx = 1 + (i - 16 * (i // 16))
                line[idx] = (attr, line[idx])

            if pp.expl_offset is not None:
                for i in six_xrange(
                        pp.expl_offset,
                        pp.expl_offset + pp.expl_tlen + pp.expl_llen,
                ):
                    attr_set(i, "select-expl")
            for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
                attr_set(i, "select-value")
            self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
            self.state["hexdump"].set_focus(pp.offset // 16)
            self.state["hexdump"].set_focus_valign("middle")
            self.state["hexdump_bar"].set_completion(
                (100 * pp.offset // 16) //
                len(self.state["hexdump"]._body.positions())
            )

            lines = [
                [("header", "Name: "), pp.obj_name],
                [("header", "Type: "), pp.asn1_type_name],
                [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
                [("header", "[TLV]len: "), "%d/%d/%d" % (
                    pp.tlen, pp.llen, pp.vlen,
                )],
                [("header", "TLVlen: "), "%d" % sum((
                    pp.tlen, pp.llen, pp.vlen,
                ))],
                [("header", "Slice: "), "[%d:%d]" % (
                    pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
                )],
            ]
            if pp.lenindef:
                lines.append([("warning", "LENINDEF")])
            if pp.ber_encoded:
                lines.append([("warning", "BER encoded")])
            if pp.bered:
                lines.append([("warning", "BERed")])
            if pp.expl is not None:
                lines.append([("header", "EXPLICIT")])
                klass, _, num = pp.expl
                lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
                if pp.expl_offset is not None:
                    lines.append([" Offset: %d" % pp.expl_offset])
                    lines.append([" [TLV]len: %d/%d/%d" % (
                        pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
                    )])
                    lines.append([" TLVlen: %d" % sum((
                        pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
                    ))])
                    lines.append([" Slice: [%d:%d]" % (
                        pp.expl_offset,
                        pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
                    )])
            if pp.impl is not None:
                klass, _, num = pp.impl
                lines.append([
                    ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
                ])
            if pp.optional:
                lines.append(["OPTIONAL"])
            if pp.default:
                lines.append(["DEFAULT"])
            if len(pp.decode_path) > 0:
                ent = pp.decode_path[-1]
                if isinstance(ent, DecodePathDefBy):
                    lines.append([""])
                    value = str(ent.defined_by)
                    oid_name = find_oid_name(
                        ent.defined_by.asn1_type_name, oid_maps, value,
                    )
                    lines.append([("header", "DEFINED BY: "), "%s" % (
                        value if oid_name is None
                        else "%s (%s)" % (oid_name, value)
                    )])
            lines.append([""])
            if pp.value is not None:
                lines.append([("header", "Value: "), pp.value])
                if (
                        len(oid_maps) > 0 and
                        pp.asn1_type_name == ObjectIdentifier.asn1_type_name
                ):
                    # First map knowing that OID wins
                    for oid_map in oid_maps:
                        oid_name = oid_map.get(pp.value)
                        if oid_name is not None:
                            lines.append([("header", "Human: "), oid_name])
                            break
                if pp.asn1_type_name == Integer.asn1_type_name:
                    lines.append([
                        ("header", "Decimal: "), "%d" % int(pp.obj),
                    ])
                    lines.append([
                        ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
                    ])
            if pp.blob.__class__ == binary_type:
                blob = hexenc(pp.blob).upper()
                for i in six_xrange(0, len(blob), 32):
                    lines.append([colonize_hex(blob[i:i + 32])])
            elif pp.blob.__class__ == tuple:
                lines.append([", ".join(pp.blob)])
            self.state["info"]._set_body([urwid.Text(line) for line in lines])
            self.state["info_bar"].set_completion(0)

        def selectable(self):
            # Used as the hook telling that this row became the current
            # one: both side windows are refreshed exactly once per change
            if self.state["widget_current"] != self:
                self.state["widget_current"] = self
                self.scrolled["info"] = False
                self.scrolled["hexdump"] = False
                self._state_update()
            return super(TW, self).selectable()

        def get_display_text(self):
            pp, constructed = self._get_pp()
            style = "constructed" if constructed else ""
            if len(pp.decode_path) == 0:
                return (style, pp.obj_name)
            if pp.asn1_type_name == "EOC":
                return ("eoc", "EOC")
            ent = pp.decode_path[-1]
            if isinstance(ent, DecodePathDefBy):
                value = str(ent.defined_by)
                oid_name = find_oid_name(
                    ent.defined_by.asn1_type_name, oid_maps, value,
                )
                return ("defby", "DEFBY:" + (
                    value if oid_name is None else oid_name
                ))
            return (style, ent)

        def _scroll(self, what, step):
            # what is either "info" or "hexdump"; step is signed line count
            self.state[what]._invalidate()
            pos = self.state[what].focus_position
            if not self.scrolled[what]:
                self.scrolled[what] = True
                pos -= 2
            pos = max(0, pos + step)
            pos = min(pos, len(self.state[what]._body.positions()) - 1)
            self.state[what].set_focus(pos)
            self.state[what].set_focus_valign("top")
            self.state[what + "_bar"].set_completion(
                (100 * pos) // len(self.state[what]._body.positions())
            )

        def keypress(self, size, key):
            if key == "q":
                raise urwid.ExitMainLoop()

            if key == " ":
                self.expanded = not self.expanded
                self.update_expanded_icon()
                return None

            hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
            if key in hexdump_steps:
                self._scroll("hexdump", hexdump_steps[key])
                return None

            info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
            if key in info_steps:
                self._scroll("info", info_steps[key])
                return None

            if key in ("d", "D"):
                pp, _ = self._get_pp()
                dp = ":".join(str(p) for p in pp.decode_path)
                dp = dp.replace(" ", "_")
                if dp == "":
                    dp = "root"
                # "D" includes the explicit tag's envelope, if there is one
                if key == "d" or pp.expl_offset is None:
                    data = self.state["raw"][pp.offset:(
                        pp.offset + pp.tlen + pp.llen + pp.vlen
                    )]
                else:
                    data = self.state["raw"][pp.expl_offset:(
                        pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
                    )]
                ctr = 0

                def duplicate_path(dp, ctr):
                    if ctr == 0:
                        return dp
                    return "%s.%d" % (dp, ctr)

                # Find the first file name that is not taken yet
                while path_exists(duplicate_path(dp, ctr)):
                    ctr += 1
                dp = duplicate_path(dp, ctr)
                with open(dp, "wb") as fd:
                    fd.write(data)
                self.state["decode_path"].set_text(
                    ("warning", "Saved to: " + dp)
                )
                return None
            return super(TW, self).keypress(size, key)

    class PN(urwid.ParentNode):
        # Tree node over the (possibly nested) output of obj.pps()

        def __init__(self, state, value, *args, **kwargs):
            self.state = state
            if not hasattr(value, "_fields"):
                # Materialize nested generators, they are indexed below
                value = list(value)
            super(PN, self).__init__(value, *args, **kwargs)

        def load_widget(self):
            return TW(self.state, self)

        def load_child_keys(self):
            value = self.get_value()
            if hasattr(value, "_fields"):
                return []
            return range(len(value[1:]))

        def load_child_node(self, key):
            return PN(
                self.state,
                # Item 0 is the element's own record, children follow it
                self.get_value()[key + 1],
                parent=self,
                key=key,
                depth=self.get_depth() + 1,
            )

    class LabeledPG(urwid.ProgressBar):
        # Progress bar with a label in front of its text

        def __init__(self, label, *args, **kwargs):
            self.label = label
            super(LabeledPG, self).__init__(*args, **kwargs)

        def get_text(self):
            return "%s: %s" % (self.label, super(LabeledPG, self).get_text())

    WinHexdump = urwid.ListBox([urwid.Text("")])
    WinInfo = urwid.ListBox([urwid.Text("")])
    WinDecodePath = urwid.Text("", "center")
    WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
    WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
    WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
        {
            "raw": raw,
            "hexed": list(hexdump(raw)),
            "widget_current": None,
            "info": WinInfo,
            "info_bar": WinInfoBar,
            "hexdump": WinHexdump,
            "hexdump_bar": WinHexdumpBar,
            "decode_path": WinDecodePath,
        },
        list(obj.pps()),
    )))
    help_text = " ".join((
        "q:quit",
        "space:(un)collapse",
        "(pg)up/down/home/end:nav",
        "jkJK:hexdump hlHL:info",
        "dD:dump",
    ))
    urwid.MainLoop(
        urwid.Frame(
            urwid.Columns([
                ("weight", 1, WinTree),
                ("weight", 2, urwid.Pile([
                    urwid.LineBox(WinInfo),
                    urwid.LineBox(WinHexdump),
                ])),
            ]),
            header=urwid.Columns([
                ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
                ("weight", 1, WinInfoBar),
                ("weight", 1, WinHexdumpBar),
            ]),
            footer=urwid.AttrWrap(urwid.Text(help_text), "help")
        ),
        [
            ("header", "bold", ""),
            ("constructed", "bold", ""),
            ("help", "light magenta", ""),
            ("warning", "light red", ""),
            ("defby", "light red", ""),
            ("eoc", "dark red", ""),
            ("select-value", "light green", ""),
            ("select-expl", "light red", ""),
            ("pg-normal", "", "light blue"),
            ("pg-complete", "black", "yellow"),
        ]
    ).run()
def main():  # pragma: no cover
    """Command line entry point: decode a file and print or browse it

    Without ``--schema`` the generic decoder is used. ``--browse`` starts
    the interactive browser and exits afterwards; otherwise the decoded
    structure is pretty printed (event by event with ``--evgen``),
    followed by the trailing data if there is any.
    """
    import argparse
    parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
    # Registration order is kept, as it defines the order in --help
    arguments = (
        ("--skip", {
            "type": int,
            "default": 0,
            "help": "Skip that number of bytes from the beginning",
        }),
        ("--oids", {
            "help": "Python paths to dictionary with OIDs, comma separated",
        }),
        ("--schema", {
            "help": "Python path to schema definition to use",
        }),
        ("--defines-by-path", {
            "help": "Python path to decoder's defines_by_path",
        }),
        ("--nobered", {
            "action": "store_true",
            "help": "Disallow BER encoding",
        }),
        ("--print-decode-path", {
            "action": "store_true",
            "help": "Print decode paths",
        }),
        ("--decode-path-only", {
            "help": "Print only specified decode path",
        }),
        ("--allow-expl-oob", {
            "action": "store_true",
            "help": "Allow explicit tag out-of-bound",
        }),
        ("--evgen", {
            "action": "store_true",
            "help": "Turn on event generation mode",
        }),
        ("--browse", {
            "action": "store_true",
            "help": "Start ASN.1 browser",
        }),
        ("RAWFile", {
            "type": argparse.FileType("rb"),
            "help": "Path to BER/CER/DER file you want to decode",
        }),
    )
    for arg_name, arg_options in arguments:
        parser.add_argument(arg_name, **arg_options)
    args = parser.parse_args()

    if PY2:
        args.RAWFile.seek(args.skip)
        raw = memoryview(args.RAWFile.read())
        args.RAWFile.close()
    else:
        raw = file_mmaped(args.RAWFile)[args.skip:]

    if args.oids:
        oid_maps = [obj_by_path(_path) for _path in args.oids.split(",")]
    else:
        oid_maps = ()

    from functools import partial
    if args.schema:
        schema = obj_by_path(args.schema)
        pprinter = partial(pprint, big_blobs=True)
    else:
        schema, pprinter = generic_decoder()

    ctx = {
        "bered": not args.nobered,
        "allow_expl_oob": args.allow_expl_oob,
    }
    if args.defines_by_path is not None:
        ctx["defines_by_path"] = obj_by_path(args.defines_by_path)

    if args.browse:
        obj, _ = schema().decode(raw, ctx=ctx)
        browse(raw, obj, oid_maps)
        from sys import exit as sys_exit
        sys_exit(0)

    from os import environ
    if args.decode_path_only is None:
        decode_path_only = ()
    else:
        decode_path_only = tuple(args.decode_path_only.split(":"))
    pprinter = partial(
        pprinter,
        oid_maps=oid_maps,
        # Colours are on, unless NO_COLOR environment variable is present
        with_colours=environ.get("NO_COLOR") is None,
        with_decode_path=args.print_decode_path,
        decode_path_only=decode_path_only,
    )
    if args.evgen:
        for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
            print(pprinter(obj, decode_path=decode_path))
    else:
        obj, tail = schema().decode(raw, ctx=ctx)
        print(pprinter(obj))
    if tail != b"":
        print("\nTrailing data: %s" % hexenc(tail))
7937 if __name__ == "__main__":
7938 from pyderasn import *