3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 does not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms does not deal good with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
908 .. autofunction:: pyderasn.browse
912 .. autoclass:: pyderasn.Obj
920 .. autoclass:: pyderasn.Boolean
925 .. autoclass:: pyderasn.Integer
926 :members: __init__, named, tohex
930 .. autoclass:: pyderasn.BitString
931 :members: __init__, bit_len, named
935 .. autoclass:: pyderasn.OctetString
940 .. autoclass:: pyderasn.Null
945 .. autoclass:: pyderasn.ObjectIdentifier
950 .. autoclass:: pyderasn.Enumerated
954 .. autoclass:: pyderasn.CommonString
958 .. autoclass:: pyderasn.NumericString
962 .. autoclass:: pyderasn.PrintableString
963 :members: __init__, allow_asterisk, allow_ampersand
967 .. autoclass:: pyderasn.IA5String
971 .. autoclass:: pyderasn.VisibleString
975 .. autoclass:: pyderasn.UTCTime
976 :members: __init__, todatetime
980 .. autoclass:: pyderasn.GeneralizedTime
981 :members: __init__, todatetime
988 .. autoclass:: pyderasn.Choice
989 :members: __init__, choice, value
993 .. autoclass:: PrimitiveTypes
997 .. autoclass:: pyderasn.Any
1005 .. autoclass:: pyderasn.Sequence
1010 .. autoclass:: pyderasn.Set
1015 .. autoclass:: pyderasn.SequenceOf
1020 .. autoclass:: pyderasn.SetOf
1026 .. autofunction:: pyderasn.abs_decode_path
1027 .. autofunction:: pyderasn.agg_octet_string
1028 .. autofunction:: pyderasn.ascii_visualize
1029 .. autofunction:: pyderasn.colonize_hex
1030 .. autofunction:: pyderasn.encode2pass
1031 .. autofunction:: pyderasn.encode_cer
1032 .. autofunction:: pyderasn.file_mmaped
1033 .. autofunction:: pyderasn.hexenc
1034 .. autofunction:: pyderasn.hexdec
1035 .. autofunction:: pyderasn.hexdump
1036 .. autofunction:: pyderasn.tag_encode
1037 .. autofunction:: pyderasn.tag_decode
1038 .. autofunction:: pyderasn.tag_ctxp
1039 .. autofunction:: pyderasn.tag_ctxc
1040 .. autoclass:: pyderasn.DecodeError
1042 .. autoclass:: pyderasn.NotEnoughData
1043 .. autoclass:: pyderasn.ExceedingData
1044 .. autoclass:: pyderasn.LenIndefForm
1045 .. autoclass:: pyderasn.TagMismatch
1046 .. autoclass:: pyderasn.InvalidLength
1047 .. autoclass:: pyderasn.InvalidOID
1048 .. autoclass:: pyderasn.ObjUnknown
1049 .. autoclass:: pyderasn.ObjNotReady
1050 .. autoclass:: pyderasn.InvalidValueType
1051 .. autoclass:: pyderasn.BoundsError
1058 You can decode DER/BER files using command line abilities::
1060 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1062 If there is no schema for your file, then you can try parsing it without,
1063 but of course IMPLICIT tags will often make it impossible. But result is
1064 good enough for the certificate above::
1066 $ python -m pyderasn path/to/file
1067 0 [1,3,1604] . >: SEQUENCE OF
1068 4 [1,3,1453] . . >: SEQUENCE OF
1069 8 [0,0, 5] . . . . >: [0] ANY
1070 . . . . . A0:03:02:01:02
1071 13 [1,1, 3] . . . . >: INTEGER 61595
1072 18 [1,1, 13] . . . . >: SEQUENCE OF
1073 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1074 31 [1,1, 0] . . . . . . >: NULL
1075 33 [1,3, 274] . . . . >: SEQUENCE OF
1076 37 [1,1, 11] . . . . . . >: SET OF
1077 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1078 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1079 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1081 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1082 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1083 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1084 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1085 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1086 . . . . . . . . . 61:2E:63:6F:6D:2F
1087 1461 [1,1, 13] . . >: SEQUENCE OF
1088 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1089 1474 [1,1, 0] . . . . >: NULL
1090 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1091 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1092 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1098 If you have got dictionaries with ObjectIdentifiers, like example one
1099 from ``tests/test_crts.py``::
1102 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1103 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1105 "2.5.4.10": "id-at-organizationName",
1106 "2.5.4.11": "id-at-organizationalUnitName",
1109 then you can pass it to pretty printer to see human readable OIDs::
1111 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1113 37 [1,1, 11] . . . . . . >: SET OF
1114 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1115 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1116 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1117 50 [1,1, 18] . . . . . . >: SET OF
1118 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1119 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1120 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1121 70 [1,1, 18] . . . . . . >: SET OF
1122 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1123 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1124 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1130 Each decoded element has so-called decode path: sequence of structure
1131 names it is passing during the decode process. Each element has its own
1132 unique path inside the whole ASN.1 tree. You can print it out with
1133 ``--print-decode-path`` option::
1135 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1136 0 [1,3,1604] Certificate SEQUENCE []
1137 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1138 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1139 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1140 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1141 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1142 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1144 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1145 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1146 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1147 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1148 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1149 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1150 . . . . . . . 13:02:45:53
1151 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1154 Now you can print only the specified tree, for example signature algorithm::
1156 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1157 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1158 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1159 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1163 from array import array
1164 from codecs import getdecoder
1165 from codecs import getencoder
1166 from collections import namedtuple
1167 from collections import OrderedDict
1168 from copy import copy
1169 from datetime import datetime
1170 from datetime import timedelta
1171 from io import BytesIO
1172 from math import ceil
1173 from operator import attrgetter
1174 from string import ascii_letters
1175 from string import digits
1176 from sys import maxsize as sys_maxsize
1177 from sys import version_info
1178 from unicodedata import category as unicat
1180 from six import add_metaclass
1181 from six import binary_type
1182 from six import byte2int
1183 from six import indexbytes
1184 from six import int2byte
1185 from six import integer_types
1186 from six import iterbytes
1187 from six import iteritems
1188 from six import itervalues
1190 from six import string_types
1191 from six import text_type
1192 from six import unichr as six_unichr
1193 from six.moves import xrange as six_xrange
1197 from termcolor import colored
1198 except ImportError: # pragma: no cover
1199 def colored(what, *args, **kwargs):
1250 "TagClassApplication",
1253 "TagClassUniversal",
1254 "TagFormConstructed",
1265 TagClassUniversal = 0
1266 TagClassApplication = 1 << 6
1267 TagClassContext = 1 << 7
1268 TagClassPrivate = 1 << 6 | 1 << 7
1269 TagFormPrimitive = 0
1270 TagFormConstructed = 1 << 5
1272 TagClassContext: "",
1273 TagClassApplication: "APPLICATION ",
1274 TagClassPrivate: "PRIVATE ",
1275 TagClassUniversal: "UNIV ",
1279 LENINDEF = b"\x80" # length indefinite mark
1280 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1281 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1282 SET01 = frozenset("01")
1283 DECIMALS = frozenset(digits)
1284 DECIMAL_SIGNS = ".,"
1285 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1288 def file_mmaped(fd):
1289 """Make mmap-ed memoryview for reading from file
1291 :param fd: file object
1292 :returns: memoryview over read-only mmap-ing of the whole file
1296 It is known to work under neither Python 2.x nor Windows.
1299 return memoryview(mmap.mmap(fd.fileno(), length=0, prot=mmap.PROT_READ))
1303 if not set(value) <= DECIMALS:
1304 raise ValueError("non-pure integer")
1308 def fractions2float(fractions_raw):
1309 pureint(fractions_raw)
1310 return float("0." + fractions_raw)
1313 def get_def_by_path(defines_by_path, sub_decode_path):
1314 """Get define by decode path
1316 for path, define in defines_by_path:
1317 if len(path) != len(sub_decode_path):
1319 for p1, p2 in zip(path, sub_decode_path):
1320 if (p1 is not any) and (p1 != p2):
1326 ########################################################################
1328 ########################################################################
1330 class ASN1Error(ValueError):
1334 class DecodeError(ASN1Error):
1335 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1337 :param str msg: reason of decode failing
1338 :param klass: optional exact DecodeError inherited class (like
1339 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1340 :py:exc:`InvalidLength`)
1341 :param decode_path: tuple of strings. It contains human
1342 readable names of the fields through which
1343 decoding process has passed
1344 :param int offset: binary offset where failure happened
1346 super(DecodeError, self).__init__()
1349 self.decode_path = decode_path
1350 self.offset = offset
1355 "" if self.klass is None else self.klass.__name__,
1357 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1358 if len(self.decode_path) > 0 else ""
1360 ("(at %d)" % self.offset) if self.offset > 0 else "",
1366 return "%s(%s)" % (self.__class__.__name__, self)
1369 class NotEnoughData(DecodeError):
1373 class ExceedingData(ASN1Error):
1374 def __init__(self, nbytes):
1375 super(ExceedingData, self).__init__()
1376 self.nbytes = nbytes
1379 return "%d trailing bytes" % self.nbytes
1382 return "%s(%s)" % (self.__class__.__name__, self)
1385 class LenIndefForm(DecodeError):
1389 class TagMismatch(DecodeError):
1393 class InvalidLength(DecodeError):
1397 class InvalidOID(DecodeError):
1401 class ObjUnknown(ASN1Error):
1402 def __init__(self, name):
1403 super(ObjUnknown, self).__init__()
1407 return "object is unknown: %s" % self.name
1410 return "%s(%s)" % (self.__class__.__name__, self)
1413 class ObjNotReady(ASN1Error):
1414 def __init__(self, name):
1415 super(ObjNotReady, self).__init__()
1419 return "object is not ready: %s" % self.name
1422 return "%s(%s)" % (self.__class__.__name__, self)
1425 class InvalidValueType(ASN1Error):
1426 def __init__(self, expected_types):
1427 super(InvalidValueType, self).__init__()
1428 self.expected_types = expected_types
1431 return "invalid value type, expected: %s" % ", ".join(
1432 [repr(t) for t in self.expected_types]
1436 return "%s(%s)" % (self.__class__.__name__, self)
1439 class BoundsError(ASN1Error):
1440 def __init__(self, bound_min, value, bound_max):
1441 super(BoundsError, self).__init__()
1442 self.bound_min = bound_min
1444 self.bound_max = bound_max
1447 return "unsatisfied bounds: %s <= %s <= %s" % (
1454 return "%s(%s)" % (self.__class__.__name__, self)
1457 ########################################################################
1459 ########################################################################
1461 _hexdecoder = getdecoder("hex")
1462 _hexencoder = getencoder("hex")
1466 """Binary data to hexadecimal string convert
1468 return _hexdecoder(data)[0]
1472 """Hexadecimal string to binary data convert
1474 return _hexencoder(data)[0].decode("ascii")
1477 def int_bytes_len(num, byte_len=8):
1480 return int(ceil(float(num.bit_length()) / byte_len))
1483 def zero_ended_encode(num):
1484 octets = bytearray(int_bytes_len(num, 7))
1486 octets[i] = num & 0x7F
1490 octets[i] = 0x80 | (num & 0x7F)
1493 return bytes(octets)
1496 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1497 """Encode tag to binary form
1499 :param int num: tag's number
1500 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1501 :py:data:`pyderasn.TagClassContext`,
1502 :py:data:`pyderasn.TagClassApplication`,
1503 :py:data:`pyderasn.TagClassPrivate`)
1504 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1505 :py:data:`pyderasn.TagFormConstructed`)
1509 return int2byte(klass | form | num)
1510 # [XX|X|11111][1.......][1.......] ... [0.......]
1511 return int2byte(klass | form | 31) + zero_ended_encode(num)
1514 def tag_decode(tag):
1515 """Decode tag from binary form
1519 No validation is performed, assuming that it has already passed.
1521 It returns tuple with three integers, as
1522 :py:func:`pyderasn.tag_encode` accepts.
1524 first_octet = byte2int(tag)
1525 klass = first_octet & 0xC0
1526 form = first_octet & 0x20
1527 if first_octet & 0x1F < 0x1F:
1528 return (klass, form, first_octet & 0x1F)
1530 for octet in iterbytes(tag[1:]):
1533 return (klass, form, num)
1537 """Create CONTEXT PRIMITIVE tag
1539 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1543 """Create CONTEXT CONSTRUCTED tag
1545 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1548 def tag_strip(data):
1549 """Take off tag from the data
1551 :returns: (encoded tag, tag length, remaining data)
1554 raise NotEnoughData("no data at all")
1555 if byte2int(data) & 0x1F < 31:
1556 return data[:1], 1, data[1:]
1561 raise DecodeError("unfinished tag")
1562 if indexbytes(data, i) & 0x80 == 0:
1564 if i == 1 and indexbytes(data, 1) < 0x1F:
1565 raise DecodeError("unexpected long form")
1566 if i > 1 and indexbytes(data, 1) & 0x7F == 0:
1567 raise DecodeError("leading zero byte in tag value")
1569 return data[:i], i, data[i:]
1575 octets = bytearray(int_bytes_len(l) + 1)
1576 octets[0] = 0x80 | (len(octets) - 1)
1577 for i in six_xrange(len(octets) - 1, 0, -1):
1578 octets[i] = l & 0xFF
1580 return bytes(octets)
1583 def len_decode(data):
1586 :returns: (decoded length, length's length, remaining data)
1587 :raises LenIndefForm: if indefinite form encoding is met
1590 raise NotEnoughData("no data at all")
1591 first_octet = byte2int(data)
1592 if first_octet & 0x80 == 0:
1593 return first_octet, 1, data[1:]
1594 octets_num = first_octet & 0x7F
1595 if octets_num + 1 > len(data):
1596 raise NotEnoughData("encoded length is longer than data")
1598 raise LenIndefForm()
1599 if byte2int(data[1:]) == 0:
1600 raise DecodeError("leading zeros")
1602 for v in iterbytes(data[1:1 + octets_num]):
1605 raise DecodeError("long form instead of short one")
1606 return l, 1 + octets_num, data[1 + octets_num:]
1609 LEN0 = len_encode(0)
1610 LEN1 = len_encode(1)
1611 LEN1K = len_encode(1000)
1615 """How many bytes length field will take
1619 if l < 256: # 1 << 8
1621 if l < 65536: # 1 << 16
1623 if l < 16777216: # 1 << 24
1625 if l < 4294967296: # 1 << 32
1627 if l < 1099511627776: # 1 << 40
1629 if l < 281474976710656: # 1 << 48
1631 if l < 72057594037927936: # 1 << 56
1633 raise OverflowError("too big length")
1636 def write_full(writer, data):
1637 """Fully write provided data
1639 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1641 BytesIO does not guarantee that the whole data will be written at
1642 once. That function write everything provided, raising an error if
1643 ``writer`` returns None.
1645 data = memoryview(data)
1647 while written != len(data):
1648 n = writer(data[written:])
1650 raise ValueError("can not write to buf")
1654 # If it is 64-bit system, then use compact 64-bit array of unsigned
1655 # longs. Use an ordinary list with universal integers otherwise, that
1657 if sys_maxsize > 2 ** 32:
1658 def state_2pass_new():
1661 def state_2pass_new():
1665 ########################################################################
1667 ########################################################################
1669 class AutoAddSlots(type):
1670 def __new__(cls, name, bases, _dict):
1671 _dict["__slots__"] = _dict.get("__slots__", ())
1672 return type.__new__(cls, name, bases, _dict)
1675 BasicState = namedtuple("BasicState", (
1688 ), **NAMEDTUPLE_KWARGS)
1691 @add_metaclass(AutoAddSlots)
1693 """Common ASN.1 object class
1695 All ASN.1 types are inherited from it. It has metaclass that
1696 automatically adds ``__slots__`` to all inherited classes.
1721 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1722 self._expl = getattr(self, "expl", None) if expl is None else expl
1723 if self.tag != self.tag_default and self._expl is not None:
1724 raise ValueError("implicit and explicit tags can not be set simultaneously")
1725 if self.tag is None:
1726 self._tag_order = None
1728 tag_class, _, tag_num = tag_decode(
1729 self.tag if self._expl is None else self._expl
1731 self._tag_order = (tag_class, tag_num)
1732 if default is not None:
1734 self.optional = optional
1735 self.offset, self.llen, self.vlen = _decoded
1737 self.expl_lenindef = False
1738 self.lenindef = False
1739 self.ber_encoded = False
1742 def ready(self): # pragma: no cover
1743 """Is object ready to be encoded?
1745 raise NotImplementedError()
1747 def _assert_ready(self):
1749 raise ObjNotReady(self.__class__.__name__)
1753 """Is either object or any elements inside is BER encoded?
1755 return self.expl_lenindef or self.lenindef or self.ber_encoded
1759 """Is object decoded?
1761 return (self.llen + self.vlen) > 0
1763 def __getstate__(self): # pragma: no cover
1764 """Used for making safe to be mutable pickleable copies
1766 raise NotImplementedError()
1768 def __setstate__(self, state):
1769 if state.version != __version__:
1770 raise ValueError("data is pickled by different PyDERASN version")
1771 self.tag = state.tag
1772 self._tag_order = state.tag_order
1773 self._expl = state.expl
1774 self.default = state.default
1775 self.optional = state.optional
1776 self.offset = state.offset
1777 self.llen = state.llen
1778 self.vlen = state.vlen
1779 self.expl_lenindef = state.expl_lenindef
1780 self.lenindef = state.lenindef
1781 self.ber_encoded = state.ber_encoded
1784 def tag_order(self):
1785 """Tag's (class, number) used for DER/CER sorting
1787 return self._tag_order
1790 def tag_order_cer(self):
1791 return self.tag_order
1795 """.. seealso:: :ref:`decoding`
1797 return len(self.tag)
1801 """.. seealso:: :ref:`decoding`
1803 return self.tlen + self.llen + self.vlen
1805 def __str__(self): # pragma: no cover
1806 return self.__bytes__() if PY2 else self.__unicode__()
1808 def __ne__(self, their):
1809 return not(self == their)
1811 def __gt__(self, their): # pragma: no cover
1812 return not(self < their)
1814 def __le__(self, their): # pragma: no cover
1815 return (self == their) or (self < their)
1817 def __ge__(self, their): # pragma: no cover
1818 return (self == their) or (self > their)
1820 def _encode(self): # pragma: no cover
1821 raise NotImplementedError()
1823 def _encode_cer(self, writer):
1824 write_full(writer, self._encode())
1826 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1827 yield NotImplemented
1829 def _encode1st(self, state):
1830 raise NotImplementedError()
1832 def _encode2nd(self, writer, state_iter):
1833 raise NotImplementedError()
1836 """DER encode the structure
1838 :returns: DER representation
1840 raw = self._encode()
1841 if self._expl is None:
1843 return b"".join((self._expl, len_encode(len(raw)), raw))
1845 def encode1st(self, state=None):
1846 """Do the 1st pass of 2-pass encoding
1848 :rtype: (int, array("L"))
1849 :returns: full length of encoded data and precalculated various
1853 state = state_2pass_new()
1854 if self._expl is None:
1855 return self._encode1st(state)
1857 idx = len(state) - 1
1858 vlen, _ = self._encode1st(state)
1860 fulllen = len(self._expl) + len_size(vlen) + vlen
1861 return fulllen, state
1863 def encode2nd(self, writer, state_iter):
1864 """Do the 2nd pass of 2-pass encoding
1866 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1867 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1869 if self._expl is None:
1870 self._encode2nd(writer, state_iter)
1872 write_full(writer, self._expl + len_encode(next(state_iter)))
1873 self._encode2nd(writer, state_iter)
1875 def encode_cer(self, writer):
1876 """CER encode the structure to specified writer
1878 :param writer: must comply with ``io.RawIOBase.write``
1879 behaviour. It takes slice to be written and
1880 returns number of bytes processed. If it returns
1881 None, then exception will be raised
1883 if self._expl is not None:
1884 write_full(writer, self._expl + LENINDEF)
1885 if getattr(self, "der_forced", False):
1886 write_full(writer, self._encode())
1888 self._encode_cer(writer)
1889 if self._expl is not None:
1890 write_full(writer, EOC)
1892 def hexencode(self):
1893 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1895 return hexenc(self.encode())
1905 _ctx_immutable=True,
1909 :param data: either binary or memoryview
1910 :param int offset: initial data's offset
1911 :param bool leavemm: do we need to leave memoryview of remaining
1912 data as is, or convert it to bytes otherwise
1913 :param decode_path: current decode path (tuples of strings,
1914 possibly with DecodePathDefBy) with will be
1915 the root for all underlying objects
1916 :param ctx: optional :ref:`context <ctx>` governing decoding process
1917 :param bool tag_only: decode only the tag, without length and
1918 contents (used only in Choice and Set
1919 structures, trying to determine if tag satisfies
1921 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1923 :returns: (Obj, remaining data)
1925 .. seealso:: :ref:`decoding`
1927 result = next(self.decode_evgen(
1939 _, obj, tail = result
1950 _ctx_immutable=True,
1953 """Decode with evgen mode on
1955 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1956 it returns the generator producing ``(decode_path, obj, tail)``
1958 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1962 elif _ctx_immutable:
1964 tlv = memoryview(data)
1967 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1970 if self._expl is None:
1971 for result in self._decode(
1974 decode_path=decode_path,
1977 evgen_mode=_evgen_mode,
1982 _decode_path, obj, tail = result
1983 if _decode_path is not decode_path:
1987 t, tlen, lv = tag_strip(tlv)
1988 except DecodeError as err:
1989 raise err.__class__(
1991 klass=self.__class__,
1992 decode_path=decode_path,
1997 klass=self.__class__,
1998 decode_path=decode_path,
2002 l, llen, v = len_decode(lv)
2003 except LenIndefForm as err:
2004 if not ctx.get("bered", False):
2005 raise err.__class__(
2007 klass=self.__class__,
2008 decode_path=decode_path,
2012 offset += tlen + llen
2013 for result in self._decode(
2016 decode_path=decode_path,
2019 evgen_mode=_evgen_mode,
2021 if tag_only: # pragma: no cover
2024 _decode_path, obj, tail = result
2025 if _decode_path is not decode_path:
2027 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2028 if eoc_expected.tobytes() != EOC:
2031 klass=self.__class__,
2032 decode_path=decode_path,
2036 obj.expl_lenindef = True
2037 except DecodeError as err:
2038 raise err.__class__(
2040 klass=self.__class__,
2041 decode_path=decode_path,
2046 raise NotEnoughData(
2047 "encoded length is longer than data",
2048 klass=self.__class__,
2049 decode_path=decode_path,
2052 for result in self._decode(
2054 offset=offset + tlen + llen,
2055 decode_path=decode_path,
2058 evgen_mode=_evgen_mode,
2060 if tag_only: # pragma: no cover
2063 _decode_path, obj, tail = result
2064 if _decode_path is not decode_path:
2066 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2068 "explicit tag out-of-bound, longer than data",
2069 klass=self.__class__,
2070 decode_path=decode_path,
2073 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2075 def decod(self, data, offset=0, decode_path=(), ctx=None):
2076 """Decode the data, check that tail is empty
2078 :raises ExceedingData: if tail is not empty
2080 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2081 (decode without tail) that also checks that there is no
2084 obj, tail = self.decode(
2087 decode_path=decode_path,
2092 raise ExceedingData(len(tail))
2095 def hexdecode(self, data, *args, **kwargs):
2096 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2098 return self.decode(hexdec(data), *args, **kwargs)
2100 def hexdecod(self, data, *args, **kwargs):
2101 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2103 return self.decod(hexdec(data), *args, **kwargs)
2107 """.. seealso:: :ref:`decoding`
2109 return self._expl is not None
2113 """.. seealso:: :ref:`decoding`
2118 def expl_tlen(self):
2119 """.. seealso:: :ref:`decoding`
2121 return len(self._expl)
2124 def expl_llen(self):
2125 """.. seealso:: :ref:`decoding`
2127 if self.expl_lenindef:
2129 return len(len_encode(self.tlvlen))
2132 def expl_offset(self):
2133 """.. seealso:: :ref:`decoding`
2135 return self.offset - self.expl_tlen - self.expl_llen
2138 def expl_vlen(self):
2139 """.. seealso:: :ref:`decoding`
2144 def expl_tlvlen(self):
2145 """.. seealso:: :ref:`decoding`
2147 return self.expl_tlen + self.expl_llen + self.expl_vlen
2150 def fulloffset(self):
2151 """.. seealso:: :ref:`decoding`
2153 return self.expl_offset if self.expled else self.offset
2157 """.. seealso:: :ref:`decoding`
2159 return self.expl_tlvlen if self.expled else self.tlvlen
2161 def pps_lenindef(self, decode_path):
2162 if self.lenindef and not (
2163 getattr(self, "defined", None) is not None and
2164 self.defined[1].lenindef
2167 asn1_type_name="EOC",
2169 decode_path=decode_path,
2171 self.offset + self.tlvlen -
2172 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2180 if self.expl_lenindef:
2182 asn1_type_name="EOC",
2183 obj_name="EXPLICIT",
2184 decode_path=decode_path,
2185 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2194 def encode_cer(obj):
2195 """Encode to CER in memory buffer
2197 :returns bytes: memory buffer contents
2200 obj.encode_cer(buf.write)
2201 return buf.getvalue()
2204 def encode2pass(obj):
2205 """Encode (2-pass mode) to DER in memory buffer
2207 :returns bytes: memory buffer contents
2210 _, state = obj.encode1st()
2211 obj.encode2nd(buf.write, iter(state))
2212 return buf.getvalue()
2215 class DecodePathDefBy(object):
2216 """DEFINED BY representation inside decode path
2218 __slots__ = ("defined_by",)
2220 def __init__(self, defined_by):
2221 self.defined_by = defined_by
2223 def __ne__(self, their):
2224 return not(self == their)
2226 def __eq__(self, their):
2227 if not isinstance(their, self.__class__):
2229 return self.defined_by == their.defined_by
2232 return "DEFINED BY " + str(self.defined_by)
2235 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2238 ########################################################################
2240 ########################################################################
2242 PP = namedtuple("PP", (
2265 ), **NAMEDTUPLE_KWARGS)
2270 asn1_type_name="unknown",
2287 expl_lenindef=False,
2318 def _colourize(what, colour, with_colours, attrs=("bold",)):
2319 return colored(what, colour, attrs=attrs) if with_colours else what
2322 def colonize_hex(hexed):
2323 """Separate hexadecimal string with colons
2325 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2328 def find_oid_name(asn1_type_name, oid_maps, value):
2329 if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2330 for oid_map in oid_maps:
2331 oid_name = oid_map.get(value)
2332 if oid_name is not None:
2343 with_decode_path=False,
2344 decode_path_len_decrease=0,
2351 " " if pp.expl_offset is None else
2352 ("-%d" % (pp.offset - pp.expl_offset))
2354 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2356 col = _colourize(col, "red", with_colours, ())
2357 col += _colourize("B", "red", with_colours) if pp.bered else " "
2359 col = "[%d,%d,%4d]%s" % (
2360 pp.tlen, pp.llen, pp.vlen,
2361 LENINDEF_PP_CHAR if pp.lenindef else " "
2363 col = _colourize(col, "green", with_colours, ())
2365 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2366 if decode_path_len > 0:
2367 cols.append(" ." * decode_path_len)
2368 ent = pp.decode_path[-1]
2369 if isinstance(ent, DecodePathDefBy):
2370 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2371 value = str(ent.defined_by)
2372 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2373 if oid_name is None:
2374 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2376 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2378 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2379 if pp.expl is not None:
2380 klass, _, num = pp.expl
2381 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2382 cols.append(_colourize(col, "blue", with_colours))
2383 if pp.impl is not None:
2384 klass, _, num = pp.impl
2385 col = "[%s%d]" % (TagClassReprs[klass], num)
2386 cols.append(_colourize(col, "blue", with_colours))
2387 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2388 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2390 cols.append(_colourize("BER", "red", with_colours))
2391 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2392 if pp.value is not None:
2394 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2395 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2396 if oid_name is not None:
2397 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2398 if pp.asn1_type_name == Integer.asn1_type_name:
2399 cols.append(_colourize(
2400 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2403 if pp.blob.__class__ == binary_type:
2404 cols.append(hexenc(pp.blob))
2405 elif pp.blob.__class__ == tuple:
2406 cols.append(", ".join(pp.blob))
2408 cols.append(_colourize("OPTIONAL", "red", with_colours))
2410 cols.append(_colourize("DEFAULT", "red", with_colours))
2411 if with_decode_path:
2412 cols.append(_colourize(
2413 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2417 return " ".join(cols)
2420 def pp_console_blob(pp, decode_path_len_decrease=0):
2421 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2422 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2423 if decode_path_len > 0:
2424 cols.append(" ." * (decode_path_len + 1))
2425 if pp.blob.__class__ == binary_type:
2426 blob = hexenc(pp.blob).upper()
2427 for i in six_xrange(0, len(blob), 32):
2428 chunk = blob[i:i + 32]
2429 yield " ".join(cols + [colonize_hex(chunk)])
2430 elif pp.blob.__class__ == tuple:
2431 yield " ".join(cols + [", ".join(pp.blob)])
2439 with_decode_path=False,
2440 decode_path_only=(),
2443 """Pretty print object
2445 :param Obj obj: object you want to pretty print
2446 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2447 Its human readable form is printed when OID is met
2448 :param big_blobs: if large binary objects are met (like OctetString
2449 values), do we need to print them too, on separate
2451 :param with_colours: colourize output, if ``termcolor`` library
2453 :param with_decode_path: print decode path
2454 :param decode_path_only: print only that specified decode path
2456 def _pprint_pps(pps):
2458 if hasattr(pp, "_fields"):
2460 decode_path_only != () and
2462 str(p) for p in pp.decode_path[:len(decode_path_only)]
2463 ) != decode_path_only
2467 yield pp_console_row(
2472 with_colours=with_colours,
2473 with_decode_path=with_decode_path,
2474 decode_path_len_decrease=len(decode_path_only),
2476 for row in pp_console_blob(
2478 decode_path_len_decrease=len(decode_path_only),
2482 yield pp_console_row(
2487 with_colours=with_colours,
2488 with_decode_path=with_decode_path,
2489 decode_path_len_decrease=len(decode_path_only),
2492 for row in _pprint_pps(pp):
2494 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2497 ########################################################################
2498 # ASN.1 primitive types
2499 ########################################################################
2501 BooleanState = namedtuple(
2503 BasicState._fields + ("value",),
2509 """``BOOLEAN`` boolean type
2511 >>> b = Boolean(True)
2513 >>> b == Boolean(True)
2519 tag_default = tag_encode(1)
2520 asn1_type_name = "BOOLEAN"
2532 :param value: set the value. Either boolean type, or
2533 :py:class:`pyderasn.Boolean` object
2534 :param bytes impl: override default tag with ``IMPLICIT`` one
2535 :param bytes expl: override default tag with ``EXPLICIT`` one
2536 :param default: set default value. Type same as in ``value``
2537 :param bool optional: is object ``OPTIONAL`` in sequence
2539 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2540 self._value = None if value is None else self._value_sanitize(value)
2541 if default is not None:
2542 default = self._value_sanitize(default)
2543 self.default = self.__class__(
2549 self._value = default
2551 def _value_sanitize(self, value):
2552 if value.__class__ == bool:
2554 if issubclass(value.__class__, Boolean):
2556 raise InvalidValueType((self.__class__, bool))
2560 return self._value is not None
2562 def __getstate__(self):
2563 return BooleanState(
2579 def __setstate__(self, state):
2580 super(Boolean, self).__setstate__(state)
2581 self._value = state.value
2583 def __nonzero__(self):
2584 self._assert_ready()
2588 self._assert_ready()
2591 def __eq__(self, their):
2592 if their.__class__ == bool:
2593 return self._value == their
2594 if not issubclass(their.__class__, Boolean):
2597 self._value == their._value and
2598 self.tag == their.tag and
2599 self._expl == their._expl
2610 return self.__class__(
2612 impl=self.tag if impl is None else impl,
2613 expl=self._expl if expl is None else expl,
2614 default=self.default if default is None else default,
2615 optional=self.optional if optional is None else optional,
2619 self._assert_ready()
2620 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2622 def _encode1st(self, state):
2623 return len(self.tag) + 2, state
2625 def _encode2nd(self, writer, state_iter):
2626 self._assert_ready()
2627 write_full(writer, self._encode())
2629 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2631 t, _, lv = tag_strip(tlv)
2632 except DecodeError as err:
2633 raise err.__class__(
2635 klass=self.__class__,
2636 decode_path=decode_path,
2641 klass=self.__class__,
2642 decode_path=decode_path,
2649 l, _, v = len_decode(lv)
2650 except DecodeError as err:
2651 raise err.__class__(
2653 klass=self.__class__,
2654 decode_path=decode_path,
2658 raise InvalidLength(
2659 "Boolean's length must be equal to 1",
2660 klass=self.__class__,
2661 decode_path=decode_path,
2665 raise NotEnoughData(
2666 "encoded length is longer than data",
2667 klass=self.__class__,
2668 decode_path=decode_path,
2671 first_octet = byte2int(v)
2673 if first_octet == 0:
2675 elif first_octet == 0xFF:
2677 elif ctx.get("bered", False):
2682 "unacceptable Boolean value",
2683 klass=self.__class__,
2684 decode_path=decode_path,
2687 obj = self.__class__(
2691 default=self.default,
2692 optional=self.optional,
2693 _decoded=(offset, 1, 1),
2695 obj.ber_encoded = ber_encoded
2696 yield decode_path, obj, v[1:]
2699 return pp_console_row(next(self.pps()))
2701 def pps(self, decode_path=()):
2704 asn1_type_name=self.asn1_type_name,
2705 obj_name=self.__class__.__name__,
2706 decode_path=decode_path,
2707 value=str(self._value) if self.ready else None,
2708 optional=self.optional,
2709 default=self == self.default,
2710 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2711 expl=None if self._expl is None else tag_decode(self._expl),
2716 expl_offset=self.expl_offset if self.expled else None,
2717 expl_tlen=self.expl_tlen if self.expled else None,
2718 expl_llen=self.expl_llen if self.expled else None,
2719 expl_vlen=self.expl_vlen if self.expled else None,
2720 expl_lenindef=self.expl_lenindef,
2721 ber_encoded=self.ber_encoded,
2724 for pp in self.pps_lenindef(decode_path):
2728 IntegerState = namedtuple(
2730 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2736 """``INTEGER`` integer type
2738 >>> b = Integer(-123)
2740 >>> b == Integer(-123)
2745 >>> Integer(2, bounds=(1, 3))
2747 >>> Integer(5, bounds=(1, 3))
2748 Traceback (most recent call last):
2749 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2753 class Version(Integer):
2760 >>> v = Version("v1")
2767 {'v3': 2, 'v1': 0, 'v2': 1}
2769 __slots__ = ("specs", "_bound_min", "_bound_max")
2770 tag_default = tag_encode(2)
2771 asn1_type_name = "INTEGER"
2785 :param value: set the value. Either integer type, named value
2786 (if ``schema`` is specified in the class), or
2787 :py:class:`pyderasn.Integer` object
2788 :param bounds: set ``(MIN, MAX)`` value constraint.
2789 (-inf, +inf) by default
2790 :param bytes impl: override default tag with ``IMPLICIT`` one
2791 :param bytes expl: override default tag with ``EXPLICIT`` one
2792 :param default: set default value. Type same as in ``value``
2793 :param bool optional: is object ``OPTIONAL`` in sequence
2795 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2797 specs = getattr(self, "schema", {}) if _specs is None else _specs
2798 self.specs = specs if specs.__class__ == dict else dict(specs)
2799 self._bound_min, self._bound_max = getattr(
2802 (float("-inf"), float("+inf")),
2803 ) if bounds is None else bounds
2804 if value is not None:
2805 self._value = self._value_sanitize(value)
2806 if default is not None:
2807 default = self._value_sanitize(default)
2808 self.default = self.__class__(
2814 if self._value is None:
2815 self._value = default
2817 def _value_sanitize(self, value):
2818 if isinstance(value, integer_types):
2820 elif issubclass(value.__class__, Integer):
2821 value = value._value
2822 elif value.__class__ == str:
2823 value = self.specs.get(value)
2825 raise ObjUnknown("integer value: %s" % value)
2827 raise InvalidValueType((self.__class__, int, str))
2828 if not self._bound_min <= value <= self._bound_max:
2829 raise BoundsError(self._bound_min, value, self._bound_max)
2834 return self._value is not None
2836 def __getstate__(self):
2837 return IntegerState(
2856 def __setstate__(self, state):
2857 super(Integer, self).__setstate__(state)
2858 self.specs = state.specs
2859 self._value = state.value
2860 self._bound_min = state.bound_min
2861 self._bound_max = state.bound_max
2864 self._assert_ready()
2865 return int(self._value)
2868 """Hexadecimal representation
2870 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2872 hex_repr = hex(int(self))[2:].upper()
2873 if len(hex_repr) % 2 != 0:
2874 hex_repr = "0" + hex_repr
2878 self._assert_ready()
2879 return hash(b"".join((
2881 bytes(self._expl or b""),
2882 str(self._value).encode("ascii"),
2885 def __eq__(self, their):
2886 if isinstance(their, integer_types):
2887 return self._value == their
2888 if not issubclass(their.__class__, Integer):
2891 self._value == their._value and
2892 self.tag == their.tag and
2893 self._expl == their._expl
2896 def __lt__(self, their):
2897 return self._value < their._value
2901 """Return named representation (if exists) of the value
2903 for name, value in iteritems(self.specs):
2904 if value == self._value:
2917 return self.__class__(
2920 (self._bound_min, self._bound_max)
2921 if bounds is None else bounds
2923 impl=self.tag if impl is None else impl,
2924 expl=self._expl if expl is None else expl,
2925 default=self.default if default is None else default,
2926 optional=self.optional if optional is None else optional,
2930 def _encode_payload(self):
2931 self._assert_ready()
2935 octets = bytearray([0])
2939 octets = bytearray()
2941 octets.append((value & 0xFF) ^ 0xFF)
2943 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2946 octets = bytearray()
2948 octets.append(value & 0xFF)
2950 if octets[-1] & 0x80 > 0:
2953 octets = bytes(octets)
2955 bytes_len = ceil(value.bit_length() / 8) or 1
2958 octets = value.to_bytes(
2963 except OverflowError:
2970 octets = self._encode_payload()
2971 return b"".join((self.tag, len_encode(len(octets)), octets))
2973 def _encode1st(self, state):
2974 l = len(self._encode_payload())
2975 return len(self.tag) + len_size(l) + l, state
2977 def _encode2nd(self, writer, state_iter):
2978 write_full(writer, self._encode())
2980 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2982 t, _, lv = tag_strip(tlv)
2983 except DecodeError as err:
2984 raise err.__class__(
2986 klass=self.__class__,
2987 decode_path=decode_path,
2992 klass=self.__class__,
2993 decode_path=decode_path,
3000 l, llen, v = len_decode(lv)
3001 except DecodeError as err:
3002 raise err.__class__(
3004 klass=self.__class__,
3005 decode_path=decode_path,
3009 raise NotEnoughData(
3010 "encoded length is longer than data",
3011 klass=self.__class__,
3012 decode_path=decode_path,
3016 raise NotEnoughData(
3018 klass=self.__class__,
3019 decode_path=decode_path,
3022 v, tail = v[:l], v[l:]
3023 first_octet = byte2int(v)
3025 second_octet = byte2int(v[1:])
3027 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3028 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3031 "non normalized integer",
3032 klass=self.__class__,
3033 decode_path=decode_path,
3038 if first_octet & 0x80 > 0:
3039 octets = bytearray()
3040 for octet in bytearray(v):
3041 octets.append(octet ^ 0xFF)
3042 for octet in octets:
3043 value = (value << 8) | octet
3047 for octet in bytearray(v):
3048 value = (value << 8) | octet
3050 value = int.from_bytes(v, byteorder="big", signed=True)
3052 obj = self.__class__(
3054 bounds=(self._bound_min, self._bound_max),
3057 default=self.default,
3058 optional=self.optional,
3060 _decoded=(offset, llen, l),
3062 except BoundsError as err:
3065 klass=self.__class__,
3066 decode_path=decode_path,
3069 yield decode_path, obj, tail
3072 return pp_console_row(next(self.pps()))
3074 def pps(self, decode_path=()):
3077 asn1_type_name=self.asn1_type_name,
3078 obj_name=self.__class__.__name__,
3079 decode_path=decode_path,
3080 value=(self.named or str(self._value)) if self.ready else None,
3081 optional=self.optional,
3082 default=self == self.default,
3083 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3084 expl=None if self._expl is None else tag_decode(self._expl),
3089 expl_offset=self.expl_offset if self.expled else None,
3090 expl_tlen=self.expl_tlen if self.expled else None,
3091 expl_llen=self.expl_llen if self.expled else None,
3092 expl_vlen=self.expl_vlen if self.expled else None,
3093 expl_lenindef=self.expl_lenindef,
3096 for pp in self.pps_lenindef(decode_path):
3100 BitStringState = namedtuple(
3102 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3107 class BitString(Obj):
3108 """``BIT STRING`` bit string type
3110 >>> BitString(b"hello world")
3111 BIT STRING 88 bits 68656c6c6f20776f726c64
3114 >>> b == b"hello world"
3119 >>> BitString("'0A3B5F291CD'H")
3120 BIT STRING 44 bits 0a3b5f291cd0
3121 >>> b = BitString("'010110000000'B")
3122 BIT STRING 12 bits 5800
3125 >>> b[0], b[1], b[2], b[3]
3126 (False, True, False, True)
3130 [False, True, False, True, True, False, False, False, False, False, False, False]
3134 class KeyUsage(BitString):
3136 ("digitalSignature", 0),
3137 ("nonRepudiation", 1),
3138 ("keyEncipherment", 2),
3141 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3142 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3144 ['nonRepudiation', 'keyEncipherment']
3146 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3150 Pay attention that BIT STRING can be encoded both in primitive
3151 and constructed forms. Decoder always checks constructed form tag
3152 additionally to specified primitive one. If BER decoding is
3153 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3154 of DER restrictions.
3156 __slots__ = ("tag_constructed", "specs", "defined")
3157 tag_default = tag_encode(3)
3158 asn1_type_name = "BIT STRING"
3171 :param value: set the value. Either binary type, tuple of named
3172 values (if ``schema`` is specified in the class),
3173 string in ``'XXX...'B`` form, or
3174 :py:class:`pyderasn.BitString` object
3175 :param bytes impl: override default tag with ``IMPLICIT`` one
3176 :param bytes expl: override default tag with ``EXPLICIT`` one
3177 :param default: set default value. Type same as in ``value``
3178 :param bool optional: is object ``OPTIONAL`` in sequence
3180 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3181 specs = getattr(self, "schema", {}) if _specs is None else _specs
3182 self.specs = specs if specs.__class__ == dict else dict(specs)
3183 self._value = None if value is None else self._value_sanitize(value)
3184 if default is not None:
3185 default = self._value_sanitize(default)
3186 self.default = self.__class__(
3192 self._value = default
3194 tag_klass, _, tag_num = tag_decode(self.tag)
3195 self.tag_constructed = tag_encode(
3197 form=TagFormConstructed,
3201 def _bits2octets(self, bits):
3202 if len(self.specs) > 0:
3203 bits = bits.rstrip("0")
3205 bits += "0" * ((8 - (bit_len % 8)) % 8)
3206 octets = bytearray(len(bits) // 8)
3207 for i in six_xrange(len(octets)):
3208 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3209 return bit_len, bytes(octets)
3211 def _value_sanitize(self, value):
3212 if isinstance(value, (string_types, binary_type)):
3214 isinstance(value, string_types) and
3215 value.startswith("'")
3217 if value.endswith("'B"):
3219 if not frozenset(value) <= SET01:
3220 raise ValueError("B's coding contains unacceptable chars")
3221 return self._bits2octets(value)
3222 if value.endswith("'H"):
3226 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3228 if value.__class__ == binary_type:
3229 return (len(value) * 8, value)
3230 raise InvalidValueType((self.__class__, string_types, binary_type))
3231 if value.__class__ == tuple:
3234 isinstance(value[0], integer_types) and
3235 value[1].__class__ == binary_type
3240 bit = self.specs.get(name)
3242 raise ObjUnknown("BitString value: %s" % name)
3245 return self._bits2octets("")
3246 bits = frozenset(bits)
3247 return self._bits2octets("".join(
3248 ("1" if bit in bits else "0")
3249 for bit in six_xrange(max(bits) + 1)
3251 if issubclass(value.__class__, BitString):
3253 raise InvalidValueType((self.__class__, binary_type, string_types))
3257 return self._value is not None
3259 def __getstate__(self):
3260 return BitStringState(
3275 self.tag_constructed,
3279 def __setstate__(self, state):
3280 super(BitString, self).__setstate__(state)
3281 self.specs = state.specs
3282 self._value = state.value
3283 self.tag_constructed = state.tag_constructed
3284 self.defined = state.defined
3287 self._assert_ready()
3288 for i in six_xrange(self._value[0]):
3293 """Returns number of bits in the string
3295 self._assert_ready()
3296 return self._value[0]
3298 def __bytes__(self):
3299 self._assert_ready()
3300 return self._value[1]
3302 def __eq__(self, their):
3303 if their.__class__ == bytes:
3304 return self._value[1] == their
3305 if not issubclass(their.__class__, BitString):
3308 self._value == their._value and
3309 self.tag == their.tag and
3310 self._expl == their._expl
3315 """Named representation (if exists) of the bits
3317 :returns: [str(name), ...]
3319 return [name for name, bit in iteritems(self.specs) if self[bit]]
3329 return self.__class__(
3331 impl=self.tag if impl is None else impl,
3332 expl=self._expl if expl is None else expl,
3333 default=self.default if default is None else default,
3334 optional=self.optional if optional is None else optional,
3338 def __getitem__(self, key):
3339 if key.__class__ == int:
3340 bit_len, octets = self._value
3344 byte2int(memoryview(octets)[key // 8:]) >>
3347 if isinstance(key, string_types):
3348 value = self.specs.get(key)
3350 raise ObjUnknown("BitString value: %s" % key)
3352 raise InvalidValueType((int, str))
3355 self._assert_ready()
3356 bit_len, octets = self._value
3359 len_encode(len(octets) + 1),
3360 int2byte((8 - bit_len % 8) % 8),
3364 def _encode1st(self, state):
3365 self._assert_ready()
3366 _, octets = self._value
3368 return len(self.tag) + len_size(l) + l, state
3370 def _encode2nd(self, writer, state_iter):
3371 bit_len, octets = self._value
3372 write_full(writer, b"".join((
3374 len_encode(len(octets) + 1),
3375 int2byte((8 - bit_len % 8) % 8),
3377 write_full(writer, octets)
3379 def _encode_cer(self, writer):
3380 bit_len, octets = self._value
3381 if len(octets) + 1 <= 1000:
3382 write_full(writer, self._encode())
3384 write_full(writer, self.tag_constructed)
3385 write_full(writer, LENINDEF)
3386 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3387 write_full(writer, b"".join((
3388 BitString.tag_default,
3391 octets[offset:offset + 999],
3393 tail = octets[offset + 999:]
3395 tail = int2byte((8 - bit_len % 8) % 8) + tail
3396 write_full(writer, b"".join((
3397 BitString.tag_default,
3398 len_encode(len(tail)),
3401 write_full(writer, EOC)
3403 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3405 t, tlen, lv = tag_strip(tlv)
3406 except DecodeError as err:
3407 raise err.__class__(
3409 klass=self.__class__,
3410 decode_path=decode_path,
3414 if tag_only: # pragma: no cover
3418 l, llen, v = len_decode(lv)
3419 except DecodeError as err:
3420 raise err.__class__(
3422 klass=self.__class__,
3423 decode_path=decode_path,
3427 raise NotEnoughData(
3428 "encoded length is longer than data",
3429 klass=self.__class__,
3430 decode_path=decode_path,
3434 raise NotEnoughData(
3436 klass=self.__class__,
3437 decode_path=decode_path,
3440 pad_size = byte2int(v)
3441 if l == 1 and pad_size != 0:
3443 "invalid empty value",
3444 klass=self.__class__,
3445 decode_path=decode_path,
3451 klass=self.__class__,
3452 decode_path=decode_path,
3455 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3458 klass=self.__class__,
3459 decode_path=decode_path,
3462 v, tail = v[:l], v[l:]
3463 bit_len = (len(v) - 1) * 8 - pad_size
3464 obj = self.__class__(
3465 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3468 default=self.default,
3469 optional=self.optional,
3471 _decoded=(offset, llen, l),
3474 obj._value = (bit_len, None)
3475 yield decode_path, obj, tail
3477 if t != self.tag_constructed:
3479 klass=self.__class__,
3480 decode_path=decode_path,
3483 if not ctx.get("bered", False):
3485 "unallowed BER constructed encoding",
3486 klass=self.__class__,
3487 decode_path=decode_path,
3490 if tag_only: # pragma: no cover
3495 l, llen, v = len_decode(lv)
3496 except LenIndefForm:
3497 llen, l, v = 1, 0, lv[1:]
3499 except DecodeError as err:
3500 raise err.__class__(
3502 klass=self.__class__,
3503 decode_path=decode_path,
3507 raise NotEnoughData(
3508 "encoded length is longer than data",
3509 klass=self.__class__,
3510 decode_path=decode_path,
3513 if not lenindef and l == 0:
3514 raise NotEnoughData(
3516 klass=self.__class__,
3517 decode_path=decode_path,
3521 sub_offset = offset + tlen + llen
3525 if v[:EOC_LEN].tobytes() == EOC:
3532 "chunk out of bounds",
3533 klass=self.__class__,
3534 decode_path=decode_path + (str(len(chunks) - 1),),
3535 offset=chunks[-1].offset,
3537 sub_decode_path = decode_path + (str(len(chunks)),)
3540 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3543 decode_path=sub_decode_path,
3546 _ctx_immutable=False,
3548 yield _decode_path, chunk, v_tail
3550 _, chunk, v_tail = next(BitString().decode_evgen(
3553 decode_path=sub_decode_path,
3556 _ctx_immutable=False,
3561 "expected BitString encoded chunk",
3562 klass=self.__class__,
3563 decode_path=sub_decode_path,
3566 chunks.append(chunk)
3567 sub_offset += chunk.tlvlen
3568 vlen += chunk.tlvlen
3570 if len(chunks) == 0:
3573 klass=self.__class__,
3574 decode_path=decode_path,
3579 for chunk_i, chunk in enumerate(chunks[:-1]):
3580 if chunk.bit_len % 8 != 0:
3582 "BitString chunk is not multiple of 8 bits",
3583 klass=self.__class__,
3584 decode_path=decode_path + (str(chunk_i),),
3585 offset=chunk.offset,
3588 values.append(bytes(chunk))
3589 bit_len += chunk.bit_len
3590 chunk_last = chunks[-1]
3592 values.append(bytes(chunk_last))
3593 bit_len += chunk_last.bit_len
3594 obj = self.__class__(
3595 value=None if evgen_mode else (bit_len, b"".join(values)),
3598 default=self.default,
3599 optional=self.optional,
3601 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3604 obj._value = (bit_len, None)
3605 obj.lenindef = lenindef
3606 obj.ber_encoded = True
3607 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3610 return pp_console_row(next(self.pps()))
3612 def pps(self, decode_path=()):
3616 bit_len, blob = self._value
3617 value = "%d bits" % bit_len
3618 if len(self.specs) > 0 and blob is not None:
3619 blob = tuple(self.named)
3622 asn1_type_name=self.asn1_type_name,
3623 obj_name=self.__class__.__name__,
3624 decode_path=decode_path,
3627 optional=self.optional,
3628 default=self == self.default,
3629 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3630 expl=None if self._expl is None else tag_decode(self._expl),
3635 expl_offset=self.expl_offset if self.expled else None,
3636 expl_tlen=self.expl_tlen if self.expled else None,
3637 expl_llen=self.expl_llen if self.expled else None,
3638 expl_vlen=self.expl_vlen if self.expled else None,
3639 expl_lenindef=self.expl_lenindef,
3640 lenindef=self.lenindef,
3641 ber_encoded=self.ber_encoded,
3644 defined_by, defined = self.defined or (None, None)
3645 if defined_by is not None:
3647 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3649 for pp in self.pps_lenindef(decode_path):
3653 OctetStringState = namedtuple(
3655 BasicState._fields + (
3666 class OctetString(Obj):
3667 """``OCTET STRING`` binary string type
3669 >>> s = OctetString(b"hello world")
3670 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3671 >>> s == OctetString(b"hello world")
3676 >>> OctetString(b"hello", bounds=(4, 4))
3677 Traceback (most recent call last):
3678 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3679 >>> OctetString(b"hell", bounds=(4, 4))
3680 OCTET STRING 4 bytes 68656c6c
3682 Memoryviews can be used as a values. If memoryview is made on
3683 mmap-ed file, then it does not take storage inside OctetString
3684 itself. In CER encoding mode it will be streamed to the specified
3685 writer, copying 1 KB chunks.
3687 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3688 tag_default = tag_encode(4)
3689 asn1_type_name = "OCTET STRING"
3690 evgen_mode_skip_value = True
3704 :param value: set the value. Either binary type, or
3705 :py:class:`pyderasn.OctetString` object
3706 :param bounds: set ``(MIN, MAX)`` value size constraint.
3707 (-inf, +inf) by default
3708 :param bytes impl: override default tag with ``IMPLICIT`` one
3709 :param bytes expl: override default tag with ``EXPLICIT`` one
3710 :param default: set default value. Type same as in ``value``
3711 :param bool optional: is object ``OPTIONAL`` in sequence
3713 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3715 self._bound_min, self._bound_max = getattr(
3719 ) if bounds is None else bounds
3720 if value is not None:
3721 self._value = self._value_sanitize(value)
3722 if default is not None:
3723 default = self._value_sanitize(default)
3724 self.default = self.__class__(
3729 if self._value is None:
3730 self._value = default
3732 tag_klass, _, tag_num = tag_decode(self.tag)
3733 self.tag_constructed = tag_encode(
3735 form=TagFormConstructed,
3739 def _value_sanitize(self, value):
3740 if value.__class__ == binary_type or value.__class__ == memoryview:
3742 elif issubclass(value.__class__, OctetString):
3743 value = value._value
3745 raise InvalidValueType((self.__class__, bytes, memoryview))
3746 if not self._bound_min <= len(value) <= self._bound_max:
3747 raise BoundsError(self._bound_min, len(value), self._bound_max)
3752 return self._value is not None
3754 def __getstate__(self):
3755 return OctetStringState(
3771 self.tag_constructed,
3775 def __setstate__(self, state):
3776 super(OctetString, self).__setstate__(state)
3777 self._value = state.value
3778 self._bound_min = state.bound_min
3779 self._bound_max = state.bound_max
3780 self.tag_constructed = state.tag_constructed
3781 self.defined = state.defined
3783 def __bytes__(self):
3784 self._assert_ready()
3785 return bytes(self._value)
3787 def __eq__(self, their):
3788 if their.__class__ == binary_type:
3789 return self._value == their
3790 if not issubclass(their.__class__, OctetString):
3793 self._value == their._value and
3794 self.tag == their.tag and
3795 self._expl == their._expl
3798 def __lt__(self, their):
3799 return self._value < their._value
3810 return self.__class__(
3813 (self._bound_min, self._bound_max)
3814 if bounds is None else bounds
3816 impl=self.tag if impl is None else impl,
3817 expl=self._expl if expl is None else expl,
3818 default=self.default if default is None else default,
3819 optional=self.optional if optional is None else optional,
3823 self._assert_ready()
3826 len_encode(len(self._value)),
3830 def _encode1st(self, state):
3831 self._assert_ready()
3832 l = len(self._value)
3833 return len(self.tag) + len_size(l) + l, state
3835 def _encode2nd(self, writer, state_iter):
3837 write_full(writer, self.tag + len_encode(len(value)))
3838 write_full(writer, value)
3840 def _encode_cer(self, writer):
3841 octets = self._value
3842 if len(octets) <= 1000:
3843 write_full(writer, self._encode())
3845 write_full(writer, self.tag_constructed)
3846 write_full(writer, LENINDEF)
3847 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3848 write_full(writer, b"".join((
3849 OctetString.tag_default,
3851 octets[offset:offset + 1000],
3853 tail = octets[offset + 1000:]
3855 write_full(writer, b"".join((
3856 OctetString.tag_default,
3857 len_encode(len(tail)),
3860 write_full(writer, EOC)
3862 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3864 t, tlen, lv = tag_strip(tlv)
3865 except DecodeError as err:
3866 raise err.__class__(
3868 klass=self.__class__,
3869 decode_path=decode_path,
3877 l, llen, v = len_decode(lv)
3878 except DecodeError as err:
3879 raise err.__class__(
3881 klass=self.__class__,
3882 decode_path=decode_path,
3886 raise NotEnoughData(
3887 "encoded length is longer than data",
3888 klass=self.__class__,
3889 decode_path=decode_path,
3892 v, tail = v[:l], v[l:]
3893 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3895 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3896 klass=self.__class__,
3897 decode_path=decode_path,
3901 obj = self.__class__(
3903 None if (evgen_mode and self.evgen_mode_skip_value)
3906 bounds=(self._bound_min, self._bound_max),
3909 default=self.default,
3910 optional=self.optional,
3911 _decoded=(offset, llen, l),
3914 except DecodeError as err:
3917 klass=self.__class__,
3918 decode_path=decode_path,
3921 except BoundsError as err:
3924 klass=self.__class__,
3925 decode_path=decode_path,
3928 yield decode_path, obj, tail
3930 if t != self.tag_constructed:
3932 klass=self.__class__,
3933 decode_path=decode_path,
3936 if not ctx.get("bered", False):
3938 "unallowed BER constructed encoding",
3939 klass=self.__class__,
3940 decode_path=decode_path,
3948 l, llen, v = len_decode(lv)
3949 except LenIndefForm:
3950 llen, l, v = 1, 0, lv[1:]
3952 except DecodeError as err:
3953 raise err.__class__(
3955 klass=self.__class__,
3956 decode_path=decode_path,
3960 raise NotEnoughData(
3961 "encoded length is longer than data",
3962 klass=self.__class__,
3963 decode_path=decode_path,
3968 sub_offset = offset + tlen + llen
3973 if v[:EOC_LEN].tobytes() == EOC:
3980 "chunk out of bounds",
3981 klass=self.__class__,
3982 decode_path=decode_path + (str(len(chunks) - 1),),
3983 offset=chunks[-1].offset,
3987 sub_decode_path = decode_path + (str(chunks_count),)
3988 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3991 decode_path=sub_decode_path,
3994 _ctx_immutable=False,
3996 yield _decode_path, chunk, v_tail
3997 if not chunk.ber_encoded:
3998 payload_len += chunk.vlen
4001 sub_decode_path = decode_path + (str(len(chunks)),)
4002 _, chunk, v_tail = next(OctetString().decode_evgen(
4005 decode_path=sub_decode_path,
4008 _ctx_immutable=False,
4011 chunks.append(chunk)
4014 "expected OctetString encoded chunk",
4015 klass=self.__class__,
4016 decode_path=sub_decode_path,
4019 sub_offset += chunk.tlvlen
4020 vlen += chunk.tlvlen
4022 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4024 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4025 klass=self.__class__,
4026 decode_path=decode_path,
4030 obj = self.__class__(
4032 None if evgen_mode else
4033 b"".join(bytes(chunk) for chunk in chunks)
4035 bounds=(self._bound_min, self._bound_max),
4038 default=self.default,
4039 optional=self.optional,
4040 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4043 except DecodeError as err:
4046 klass=self.__class__,
4047 decode_path=decode_path,
4050 except BoundsError as err:
4053 klass=self.__class__,
4054 decode_path=decode_path,
4057 obj.lenindef = lenindef
4058 obj.ber_encoded = True
4059 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4062 return pp_console_row(next(self.pps()))
4064 def pps(self, decode_path=()):
4067 asn1_type_name=self.asn1_type_name,
4068 obj_name=self.__class__.__name__,
4069 decode_path=decode_path,
4070 value=("%d bytes" % len(self._value)) if self.ready else None,
4071 blob=self._value if self.ready else None,
4072 optional=self.optional,
4073 default=self == self.default,
4074 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4075 expl=None if self._expl is None else tag_decode(self._expl),
4080 expl_offset=self.expl_offset if self.expled else None,
4081 expl_tlen=self.expl_tlen if self.expled else None,
4082 expl_llen=self.expl_llen if self.expled else None,
4083 expl_vlen=self.expl_vlen if self.expled else None,
4084 expl_lenindef=self.expl_lenindef,
4085 lenindef=self.lenindef,
4086 ber_encoded=self.ber_encoded,
4089 defined_by, defined = self.defined or (None, None)
4090 if defined_by is not None:
4092 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4094 for pp in self.pps_lenindef(decode_path):
4098 def agg_octet_string(evgens, decode_path, raw, writer):
4099 """Aggregate constructed string (OctetString and its derivatives)
4101 :param evgens: iterator of generated events
4102 :param decode_path: points to the string we want to decode
4103 :param raw: slicebable (memoryview, bytearray, etc) with
4104 the data evgens are generated on
4105 :param writer: buffer.write where string is going to be saved
4106 :param writer: where string is going to be saved. Must comply
4107 with ``io.RawIOBase.write`` behaviour
4109 .. seealso:: :ref:`agg_octet_string`
4111 decode_path_len = len(decode_path)
4112 for dp, obj, _ in evgens:
4113 if dp[:decode_path_len] != decode_path:
4115 if not obj.ber_encoded:
4116 write_full(writer, raw[
4117 obj.offset + obj.tlen + obj.llen:
4118 obj.offset + obj.tlen + obj.llen + obj.vlen -
4119 (EOC_LEN if obj.expl_lenindef else 0)
4121 if len(dp) == decode_path_len:
4125 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4129 """``NULL`` null object
4137 tag_default = tag_encode(5)
4138 asn1_type_name = "NULL"
4142 value=None, # unused, but Sequence passes it
4149 :param bytes impl: override default tag with ``IMPLICIT`` one
4150 :param bytes expl: override default tag with ``EXPLICIT`` one
4151 :param bool optional: is object ``OPTIONAL`` in sequence
4153 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4160 def __getstate__(self):
4176 def __eq__(self, their):
4177 if not issubclass(their.__class__, Null):
4180 self.tag == their.tag and
4181 self._expl == their._expl
4191 return self.__class__(
4192 impl=self.tag if impl is None else impl,
4193 expl=self._expl if expl is None else expl,
4194 optional=self.optional if optional is None else optional,
4198 return self.tag + LEN0
4200 def _encode1st(self, state):
4201 return len(self.tag) + 1, state
4203 def _encode2nd(self, writer, state_iter):
4204 write_full(writer, self.tag + LEN0)
4206 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4208 t, _, lv = tag_strip(tlv)
4209 except DecodeError as err:
4210 raise err.__class__(
4212 klass=self.__class__,
4213 decode_path=decode_path,
4218 klass=self.__class__,
4219 decode_path=decode_path,
4222 if tag_only: # pragma: no cover
4226 l, _, v = len_decode(lv)
4227 except DecodeError as err:
4228 raise err.__class__(
4230 klass=self.__class__,
4231 decode_path=decode_path,
4235 raise InvalidLength(
4236 "Null must have zero length",
4237 klass=self.__class__,
4238 decode_path=decode_path,
4241 obj = self.__class__(
4244 optional=self.optional,
4245 _decoded=(offset, 1, 0),
4247 yield decode_path, obj, v
4250 return pp_console_row(next(self.pps()))
4252 def pps(self, decode_path=()):
4255 asn1_type_name=self.asn1_type_name,
4256 obj_name=self.__class__.__name__,
4257 decode_path=decode_path,
4258 optional=self.optional,
4259 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4260 expl=None if self._expl is None else tag_decode(self._expl),
4265 expl_offset=self.expl_offset if self.expled else None,
4266 expl_tlen=self.expl_tlen if self.expled else None,
4267 expl_llen=self.expl_llen if self.expled else None,
4268 expl_vlen=self.expl_vlen if self.expled else None,
4269 expl_lenindef=self.expl_lenindef,
4272 for pp in self.pps_lenindef(decode_path):
4276 ObjectIdentifierState = namedtuple(
4277 "ObjectIdentifierState",
4278 BasicState._fields + ("value", "defines"),
4283 class ObjectIdentifier(Obj):
4284 """``OBJECT IDENTIFIER`` OID type
4286 >>> oid = ObjectIdentifier((1, 2, 3))
4287 OBJECT IDENTIFIER 1.2.3
4288 >>> oid == ObjectIdentifier("1.2.3")
4294 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4295 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4297 >>> str(ObjectIdentifier((3, 1)))
4298 Traceback (most recent call last):
4299 pyderasn.InvalidOID: unacceptable first arc value
4301 __slots__ = ("defines",)
4302 tag_default = tag_encode(6)
4303 asn1_type_name = "OBJECT IDENTIFIER"
4316 :param value: set the value. Either tuples of integers,
4317 string of "."-concatenated integers, or
4318 :py:class:`pyderasn.ObjectIdentifier` object
4319 :param defines: sequence of tuples. Each tuple has two elements.
4320 First one is relative to current one decode
4321 path, aiming to the field defined by that OID.
4322 Read about relative path in
4323 :py:func:`pyderasn.abs_decode_path`. Second
4324 tuple element is ``{OID: pyderasn.Obj()}``
4325 dictionary, mapping between current OID value
4326 and structure applied to defined field.
4328 .. seealso:: :ref:`definedby`
4330 :param bytes impl: override default tag with ``IMPLICIT`` one
4331 :param bytes expl: override default tag with ``EXPLICIT`` one
4332 :param default: set default value. Type same as in ``value``
4333 :param bool optional: is object ``OPTIONAL`` in sequence
4335 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4337 if value is not None:
4338 self._value = self._value_sanitize(value)
4339 if default is not None:
4340 default = self._value_sanitize(default)
4341 self.default = self.__class__(
4346 if self._value is None:
4347 self._value = default
4348 self.defines = defines
4350 def __add__(self, their):
4351 if their.__class__ == tuple:
4352 return self.__class__(self._value + array("L", their))
4353 if isinstance(their, self.__class__):
4354 return self.__class__(self._value + their._value)
4355 raise InvalidValueType((self.__class__, tuple))
4357 def _value_sanitize(self, value):
4358 if issubclass(value.__class__, ObjectIdentifier):
4360 if isinstance(value, string_types):
4362 value = array("L", (pureint(arc) for arc in value.split(".")))
4364 raise InvalidOID("unacceptable arcs values")
4365 if value.__class__ == tuple:
4367 value = array("L", value)
4368 except OverflowError as err:
4369 raise InvalidOID(repr(err))
4370 if value.__class__ is array:
4372 raise InvalidOID("less than 2 arcs")
4373 first_arc = value[0]
4374 if first_arc in (0, 1):
4375 if not (0 <= value[1] <= 39):
4376 raise InvalidOID("second arc is too wide")
4377 elif first_arc == 2:
4380 raise InvalidOID("unacceptable first arc value")
4381 if not all(arc >= 0 for arc in value):
4382 raise InvalidOID("negative arc value")
4384 raise InvalidValueType((self.__class__, str, tuple))
4388 return self._value is not None
4390 def __getstate__(self):
4391 return ObjectIdentifierState(
4408 def __setstate__(self, state):
4409 super(ObjectIdentifier, self).__setstate__(state)
4410 self._value = state.value
4411 self.defines = state.defines
4414 self._assert_ready()
4415 return iter(self._value)
4418 return ".".join(str(arc) for arc in self._value or ())
4421 self._assert_ready()
4422 return hash(b"".join((
4424 bytes(self._expl or b""),
4425 str(self._value).encode("ascii"),
4428 def __eq__(self, their):
4429 if their.__class__ == tuple:
4430 return self._value == array("L", their)
4431 if not issubclass(their.__class__, ObjectIdentifier):
4434 self.tag == their.tag and
4435 self._expl == their._expl and
4436 self._value == their._value
4439 def __lt__(self, their):
4440 return self._value < their._value
4451 return self.__class__(
4453 defines=self.defines if defines is None else defines,
4454 impl=self.tag if impl is None else impl,
4455 expl=self._expl if expl is None else expl,
4456 default=self.default if default is None else default,
4457 optional=self.optional if optional is None else optional,
4460 def _encode_octets(self):
4461 self._assert_ready()
4463 first_value = value[1]
4464 first_arc = value[0]
4467 elif first_arc == 1:
4469 elif first_arc == 2:
4471 else: # pragma: no cover
4472 raise RuntimeError("invalid arc is stored")
4473 octets = [zero_ended_encode(first_value)]
4474 for arc in value[2:]:
4475 octets.append(zero_ended_encode(arc))
4476 return b"".join(octets)
4479 v = self._encode_octets()
4480 return b"".join((self.tag, len_encode(len(v)), v))
4482 def _encode1st(self, state):
4483 l = len(self._encode_octets())
4484 return len(self.tag) + len_size(l) + l, state
4486 def _encode2nd(self, writer, state_iter):
4487 write_full(writer, self._encode())
4489 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4491 t, _, lv = tag_strip(tlv)
4492 except DecodeError as err:
4493 raise err.__class__(
4495 klass=self.__class__,
4496 decode_path=decode_path,
4501 klass=self.__class__,
4502 decode_path=decode_path,
4505 if tag_only: # pragma: no cover
4509 l, llen, v = len_decode(lv)
4510 except DecodeError as err:
4511 raise err.__class__(
4513 klass=self.__class__,
4514 decode_path=decode_path,
4518 raise NotEnoughData(
4519 "encoded length is longer than data",
4520 klass=self.__class__,
4521 decode_path=decode_path,
4525 raise NotEnoughData(
4527 klass=self.__class__,
4528 decode_path=decode_path,
4531 v, tail = v[:l], v[l:]
4538 octet = indexbytes(v, i)
4539 if i == 0 and octet == 0x80:
4540 if ctx.get("bered", False):
4544 "non normalized arc encoding",
4545 klass=self.__class__,
4546 decode_path=decode_path,
4549 arc = (arc << 7) | (octet & 0x7F)
4550 if octet & 0x80 == 0:
4553 except OverflowError:
4555 "too huge value for local unsigned long",
4556 klass=self.__class__,
4557 decode_path=decode_path,
4566 klass=self.__class__,
4567 decode_path=decode_path,
4571 second_arc = arcs[0]
4572 if 0 <= second_arc <= 39:
4574 elif 40 <= second_arc <= 79:
4580 obj = self.__class__(
4581 value=array("L", (first_arc, second_arc)) + arcs[1:],
4584 default=self.default,
4585 optional=self.optional,
4586 _decoded=(offset, llen, l),
4589 obj.ber_encoded = True
4590 yield decode_path, obj, tail
4593 return pp_console_row(next(self.pps()))
4595 def pps(self, decode_path=()):
4598 asn1_type_name=self.asn1_type_name,
4599 obj_name=self.__class__.__name__,
4600 decode_path=decode_path,
4601 value=str(self) if self.ready else None,
4602 optional=self.optional,
4603 default=self == self.default,
4604 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4605 expl=None if self._expl is None else tag_decode(self._expl),
4610 expl_offset=self.expl_offset if self.expled else None,
4611 expl_tlen=self.expl_tlen if self.expled else None,
4612 expl_llen=self.expl_llen if self.expled else None,
4613 expl_vlen=self.expl_vlen if self.expled else None,
4614 expl_lenindef=self.expl_lenindef,
4615 ber_encoded=self.ber_encoded,
4618 for pp in self.pps_lenindef(decode_path):
4622 class Enumerated(Integer):
4623 """``ENUMERATED`` integer type
4625 This type is identical to :py:class:`pyderasn.Integer`, but requires
4626 schema to be specified and does not accept values missing from it.
4629 tag_default = tag_encode(10)
4630 asn1_type_name = "ENUMERATED"
4641 bounds=None, # dummy argument, workability for Integer.decode
4643 super(Enumerated, self).__init__(
4644 value, bounds, impl, expl, default, optional, _specs, _decoded,
4646 if len(self.specs) == 0:
4647 raise ValueError("schema must be specified")
4649 def _value_sanitize(self, value):
4650 if isinstance(value, self.__class__):
4651 value = value._value
4652 elif isinstance(value, integer_types):
4653 for _value in itervalues(self.specs):
4658 "unknown integer value: %s" % value,
4659 klass=self.__class__,
4661 elif isinstance(value, string_types):
4662 value = self.specs.get(value)
4664 raise ObjUnknown("integer value: %s" % value)
4666 raise InvalidValueType((self.__class__, int, str))
4678 return self.__class__(
4680 impl=self.tag if impl is None else impl,
4681 expl=self._expl if expl is None else expl,
4682 default=self.default if default is None else default,
4683 optional=self.optional if optional is None else optional,
4688 def escape_control_unicode(c):
4689 if unicat(c)[0] == "C":
4690 c = repr(c).lstrip("u").strip("'")
4694 class CommonString(OctetString):
4695 """Common class for all strings
4697 Everything resembles :py:class:`pyderasn.OctetString`, except
4698 ability to deal with unicode text strings.
4700 >>> hexenc("привет мир".encode("utf-8"))
4701 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4702 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4704 >>> s = UTF8String("привет мир")
4705 UTF8String UTF8String привет мир
4707 'привет мир'
4708 >>> hexenc(bytes(s))
4709 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4711 >>> PrintableString("привет мир")
4712 Traceback (most recent call last):
4713 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4715 >>> BMPString("ада", bounds=(2, 2))
4716 Traceback (most recent call last):
4717 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4718 >>> s = BMPString("ад", bounds=(2, 2))
4721 >>> hexenc(bytes(s))
4728 - Text Encoding, validation
4729 * - :py:class:`pyderasn.UTF8String`
4731 * - :py:class:`pyderasn.NumericString`
4732 - proper alphabet validation
4733 * - :py:class:`pyderasn.PrintableString`
4734 - proper alphabet validation
4735 * - :py:class:`pyderasn.TeletexString`
4737 * - :py:class:`pyderasn.T61String`
4739 * - :py:class:`pyderasn.VideotexString`
4741 * - :py:class:`pyderasn.IA5String`
4742 - proper alphabet validation
4743 * - :py:class:`pyderasn.GraphicString`
4745 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4746 - proper alphabet validation
4747 * - :py:class:`pyderasn.GeneralString`
4749 * - :py:class:`pyderasn.UniversalString`
4751 * - :py:class:`pyderasn.BMPString`
4756 def _value_sanitize(self, value):
4758 value_decoded = None
4759 if isinstance(value, self.__class__):
4760 value_raw = value._value
4761 elif value.__class__ == text_type:
4762 value_decoded = value
4763 elif value.__class__ == binary_type:
4766 raise InvalidValueType((self.__class__, text_type, binary_type))
4769 value_decoded.encode(self.encoding)
4770 if value_raw is None else value_raw
4773 value_raw.decode(self.encoding)
4774 if value_decoded is None else value_decoded
4776 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4777 raise DecodeError(str(err))
4778 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4786 def __eq__(self, their):
4787 if their.__class__ == binary_type:
4788 return self._value == their
4789 if their.__class__ == text_type:
4790 return self._value == their.encode(self.encoding)
4791 if not isinstance(their, self.__class__):
4794 self._value == their._value and
4795 self.tag == their.tag and
4796 self._expl == their._expl
4799 def __unicode__(self):
4801 return self._value.decode(self.encoding)
4802 return text_type(self._value)
4805 return pp_console_row(next(self.pps(no_unicode=PY2)))
4807 def pps(self, decode_path=(), no_unicode=False):
4811 hexenc(bytes(self)) if no_unicode else
4812 "".join(escape_control_unicode(c) for c in self.__unicode__())
4816 asn1_type_name=self.asn1_type_name,
4817 obj_name=self.__class__.__name__,
4818 decode_path=decode_path,
4820 optional=self.optional,
4821 default=self == self.default,
4822 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4823 expl=None if self._expl is None else tag_decode(self._expl),
4828 expl_offset=self.expl_offset if self.expled else None,
4829 expl_tlen=self.expl_tlen if self.expled else None,
4830 expl_llen=self.expl_llen if self.expled else None,
4831 expl_vlen=self.expl_vlen if self.expled else None,
4832 expl_lenindef=self.expl_lenindef,
4833 ber_encoded=self.ber_encoded,
4836 for pp in self.pps_lenindef(decode_path):
4840 class UTF8String(CommonString):
4842 tag_default = tag_encode(12)
4844 asn1_type_name = "UTF8String"
4847 class AllowableCharsMixin(object):
4849 def allowable_chars(self):
4851 return self._allowable_chars
4852 return frozenset(six_unichr(c) for c in self._allowable_chars)
4854 def _value_sanitize(self, value):
4855 value = super(AllowableCharsMixin, self)._value_sanitize(value)
4856 if not frozenset(value) <= self._allowable_chars:
4857 raise DecodeError("non satisfying alphabet value")
4861 class NumericString(AllowableCharsMixin, CommonString):
4864 Its value is properly sanitized: only ASCII digits with spaces can
4867 >>> NumericString().allowable_chars
4868 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4871 tag_default = tag_encode(18)
4873 asn1_type_name = "NumericString"
4874 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4877 PrintableStringState = namedtuple(
4878 "PrintableStringState",
4879 OctetStringState._fields + ("allowable_chars",),
4884 class PrintableString(AllowableCharsMixin, CommonString):
4887 Its value is properly sanitized: see X.680 41.4 table 10.
4889 >>> PrintableString().allowable_chars
4890 frozenset([' ', "'", ..., 'z'])
4891 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4892 PrintableString PrintableString foo*bar
4893 >>> obj.allow_asterisk, obj.allow_ampersand
4897 tag_default = tag_encode(19)
4899 asn1_type_name = "PrintableString"
4900 _allowable_chars = frozenset(
4901 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4903 _asterisk = frozenset("*".encode("ascii"))
4904 _ampersand = frozenset("&".encode("ascii"))
4916 allow_asterisk=False,
4917 allow_ampersand=False,
4920 :param allow_asterisk: allow asterisk character
4921 :param allow_ampersand: allow ampersand character
4924 self._allowable_chars |= self._asterisk
4926 self._allowable_chars |= self._ampersand
4927 super(PrintableString, self).__init__(
4928 value, bounds, impl, expl, default, optional, _decoded, ctx,
4932 def allow_asterisk(self):
4933 """Is asterisk character allowed?
4935 return self._asterisk <= self._allowable_chars
4938 def allow_ampersand(self):
4939 """Is ampersand character allowed?
4941 return self._ampersand <= self._allowable_chars
4943 def __getstate__(self):
4944 return PrintableStringState(
4945 *super(PrintableString, self).__getstate__(),
4946 **{"allowable_chars": self._allowable_chars}
4949 def __setstate__(self, state):
4950 super(PrintableString, self).__setstate__(state)
4951 self._allowable_chars = state.allowable_chars
4962 return self.__class__(
4965 (self._bound_min, self._bound_max)
4966 if bounds is None else bounds
4968 impl=self.tag if impl is None else impl,
4969 expl=self._expl if expl is None else expl,
4970 default=self.default if default is None else default,
4971 optional=self.optional if optional is None else optional,
4972 allow_asterisk=self.allow_asterisk,
4973 allow_ampersand=self.allow_ampersand,
4977 class TeletexString(CommonString):
4979 tag_default = tag_encode(20)
4980 encoding = "iso-8859-1"
4981 asn1_type_name = "TeletexString"
4984 class T61String(TeletexString):
4986 asn1_type_name = "T61String"
4989 class VideotexString(CommonString):
4991 tag_default = tag_encode(21)
4992 encoding = "iso-8859-1"
4993 asn1_type_name = "VideotexString"
4996 class IA5String(AllowableCharsMixin, CommonString):
4999 Its value is properly sanitized: it is a mix of
5001 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
5002 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
5003 * DEL character (0x7F)
5005 It is just 7-bit ASCII.
5007 >>> IA5String().allowable_chars
5008 frozenset(["NUL", ... "DEL"])
5011 tag_default = tag_encode(22)
5013 asn1_type_name = "IA5"
5014 _allowable_chars = frozenset(b"".join(
5015 six_unichr(c).encode("ascii") for c in six_xrange(128)
5019 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
5020 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
5021 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5022 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5023 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5024 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5027 class VisibleString(AllowableCharsMixin, CommonString):
5030 Its value is properly sanitized. ASCII subset from space to tilde is
5031 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5033 >>> VisibleString().allowable_chars
5034 frozenset([" ", ... "~"])
5037 tag_default = tag_encode(26)
5039 asn1_type_name = "VisibleString"
5040 _allowable_chars = frozenset(b"".join(
5041 six_unichr(c).encode("ascii") for c in six_xrange(ord(" "), ord("~") + 1)
5045 class ISO646String(VisibleString):
5047 asn1_type_name = "ISO646String"
5050 UTCTimeState = namedtuple(
5052 OctetStringState._fields + ("ber_raw",),
5057 def str_to_time_fractions(value):
5059 year, v = (v // 10**10), (v % 10**10)
5060 month, v = (v // 10**8), (v % 10**8)
5061 day, v = (v // 10**6), (v % 10**6)
5062 hour, v = (v // 10**4), (v % 10**4)
5063 minute, second = (v // 100), (v % 100)
5064 return year, month, day, hour, minute, second
5067 class UTCTime(VisibleString):
5068 """``UTCTime`` datetime type
5070 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5071 UTCTime UTCTime 2017-09-30T22:07:50
5077 datetime.datetime(2017, 9, 30, 22, 7, 50)
5078 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5079 datetime.datetime(1957, 9, 30, 22, 7, 50)
5081 If BER encoded value was met, then ``ber_raw`` attribute will hold
5082 its raw representation.
5086 Only **naive** ``datetime`` objects are supported.
5087 Library assumes that all work is done in UTC.
5091 Pay attention that ``UTCTime`` can not hold full year, so all years
5092 having < 50 years are treated as 20xx, 19xx otherwise, according to
5093 X.509 recommendation. Use ``GeneralizedTime`` instead for
5098 No strict validation of UTC offsets are made (only applicable to
5099 **BER**), but very crude:
5101 * minutes are not exceeding 60
5102 * offset value is not exceeding 14 hours
5104 __slots__ = ("ber_raw",)
5105 tag_default = tag_encode(23)
5107 asn1_type_name = "UTCTime"
5108 evgen_mode_skip_value = False
5118 bounds=None, # dummy argument, workability for OctetString.decode
5122 :param value: set the value. Either datetime type, or
5123 :py:class:`pyderasn.UTCTime` object
5124 :param bytes impl: override default tag with ``IMPLICIT`` one
5125 :param bytes expl: override default tag with ``EXPLICIT`` one
5126 :param default: set default value. Type same as in ``value``
5127 :param bool optional: is object ``OPTIONAL`` in sequence
5129 super(UTCTime, self).__init__(
5130 None, None, impl, expl, None, optional, _decoded, ctx,
5134 if value is not None:
5135 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5136 self.ber_encoded = self.ber_raw is not None
5137 if default is not None:
5138 default, _ = self._value_sanitize(default)
5139 self.default = self.__class__(
5144 if self._value is None:
5145 self._value = default
5147 self.optional = optional
5149 def _strptime_bered(self, value):
5150 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5153 raise ValueError("no timezone")
5154 year += 2000 if year < 50 else 1900
5155 decoded = datetime(year, month, day, hour, minute)
5157 if value[-1] == "Z":
5161 raise ValueError("invalid UTC offset")
5162 if value[-5] == "-":
5164 elif value[-5] == "+":
5167 raise ValueError("invalid UTC offset")
5168 v = pureint(value[-4:])
5169 offset, v = (60 * (v % 100)), v // 100
5171 raise ValueError("invalid UTC offset minutes")
5173 if offset > 14 * 3600:
5174 raise ValueError("too big UTC offset")
5178 return offset, decoded
5180 raise ValueError("invalid UTC offset seconds")
5181 seconds = pureint(value)
5183 raise ValueError("invalid seconds value")
5184 return offset, decoded + timedelta(seconds=seconds)
5186 def _strptime(self, value):
5187 # datetime.strptime's format: %y%m%d%H%M%SZ
5188 if len(value) != LEN_YYMMDDHHMMSSZ:
5189 raise ValueError("invalid UTCTime length")
5190 if value[-1] != "Z":
5191 raise ValueError("non UTC timezone")
5192 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5193 year += 2000 if year < 50 else 1900
5194 return datetime(year, month, day, hour, minute, second)
5196 def _dt_sanitize(self, value):
5197 if value.year < 1950 or value.year > 2049:
5198 raise ValueError("UTCTime can hold only 1950-2049 years")
5199 return value.replace(microsecond=0)
5201 def _value_sanitize(self, value, ctx=None):
5202 if value.__class__ == binary_type:
5204 value_decoded = value.decode("ascii")
5205 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5206 raise DecodeError("invalid UTCTime encoding: %r" % err)
5209 return self._strptime(value_decoded), None
5210 except (TypeError, ValueError) as _err:
5212 if (ctx is not None) and ctx.get("bered", False):
5214 offset, _value = self._strptime_bered(value_decoded)
5215 _value = _value - timedelta(seconds=offset)
5216 return self._dt_sanitize(_value), value
5217 except (TypeError, ValueError, OverflowError) as _err:
5220 "invalid %s format: %r" % (self.asn1_type_name, err),
5221 klass=self.__class__,
5223 if isinstance(value, self.__class__):
5224 return value._value, None
5225 if value.__class__ == datetime:
5226 if value.tzinfo is not None:
5227 raise ValueError("only naive datetime supported")
5228 return self._dt_sanitize(value), None
5229 raise InvalidValueType((self.__class__, datetime))
5231 def _pp_value(self):
5233 value = self._value.isoformat()
5234 if self.ber_encoded:
5235 value += " (%s)" % self.ber_raw
5239 def __unicode__(self):
5241 value = self._value.isoformat()
5242 if self.ber_encoded:
5243 value += " (%s)" % self.ber_raw
5245 return text_type(self._pp_value())
5247 def __getstate__(self):
5248 return UTCTimeState(
5249 *super(UTCTime, self).__getstate__(),
5250 **{"ber_raw": self.ber_raw}
5253 def __setstate__(self, state):
5254 super(UTCTime, self).__setstate__(state)
5255 self.ber_raw = state.ber_raw
5257 def __bytes__(self):
5258 self._assert_ready()
5259 return self._encode_time()
5261 def __eq__(self, their):
5262 if their.__class__ == binary_type:
5263 return self._encode_time() == their
5264 if their.__class__ == datetime:
5265 return self.todatetime() == their
5266 if not isinstance(their, self.__class__):
5269 self._value == their._value and
5270 self.tag == their.tag and
5271 self._expl == their._expl
5274 def _encode_time(self):
5275 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5278 self._assert_ready()
5279 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5281 def _encode1st(self, state):
5282 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5284 def _encode2nd(self, writer, state_iter):
5285 self._assert_ready()
5286 write_full(writer, self._encode())
5288 def _encode_cer(self, writer):
5289 write_full(writer, self._encode())
5291 def todatetime(self):
5295 return pp_console_row(next(self.pps()))
5297 def pps(self, decode_path=()):
5300 asn1_type_name=self.asn1_type_name,
5301 obj_name=self.__class__.__name__,
5302 decode_path=decode_path,
5303 value=self._pp_value(),
5304 optional=self.optional,
5305 default=self == self.default,
5306 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5307 expl=None if self._expl is None else tag_decode(self._expl),
5312 expl_offset=self.expl_offset if self.expled else None,
5313 expl_tlen=self.expl_tlen if self.expled else None,
5314 expl_llen=self.expl_llen if self.expled else None,
5315 expl_vlen=self.expl_vlen if self.expled else None,
5316 expl_lenindef=self.expl_lenindef,
5317 ber_encoded=self.ber_encoded,
5320 for pp in self.pps_lenindef(decode_path):
5324 class GeneralizedTime(UTCTime):
5325 """``GeneralizedTime`` datetime type
5327 This type is similar to :py:class:`pyderasn.UTCTime`.
5329 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5330 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5332 '20170930220750.000123Z'
5333 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5334 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5338 Only **naive** datetime objects are supported.
5339 Library assumes that all work is done in UTC.
5343 Only **microsecond** fractions are supported in DER encoding.
5344 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5345 higher precision values.
5349 **BER** encoded data can loss information (accuracy) during
5350 decoding because of float transformations.
5354 **Zero** year is unsupported.
5357 tag_default = tag_encode(24)
5358 asn1_type_name = "GeneralizedTime"
5360 def _dt_sanitize(self, value):
5363 def _strptime_bered(self, value):
5364 if len(value) < 4 + 3 * 2:
5365 raise ValueError("invalid GeneralizedTime")
5366 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5367 decoded = datetime(year, month, day, hour)
5368 offset, value = 0, value[10:]
5370 return offset, decoded
5371 if value[-1] == "Z":
5374 for char, sign in (("-", -1), ("+", 1)):
5375 idx = value.rfind(char)
5378 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5379 v = pureint(offset_raw)
5380 if len(offset_raw) == 4:
5381 offset, v = (60 * (v % 100)), v // 100
5383 raise ValueError("invalid UTC offset minutes")
5384 elif len(offset_raw) == 2:
5387 raise ValueError("invalid UTC offset")
5389 if offset > 14 * 3600:
5390 raise ValueError("too big UTC offset")
5394 return offset, decoded
5395 if value[0] in DECIMAL_SIGNS:
5397 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5400 raise ValueError("stripped minutes")
5401 decoded += timedelta(seconds=60 * pureint(value[:2]))
5404 return offset, decoded
5405 if value[0] in DECIMAL_SIGNS:
5407 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5410 raise ValueError("stripped seconds")
5411 decoded += timedelta(seconds=pureint(value[:2]))
5414 return offset, decoded
5415 if value[0] not in DECIMAL_SIGNS:
5416 raise ValueError("invalid format after seconds")
5418 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5421 def _strptime(self, value):
5423 if l == LEN_YYYYMMDDHHMMSSZ:
5424 # datetime.strptime's format: %Y%m%d%H%M%SZ
5425 if value[-1] != "Z":
5426 raise ValueError("non UTC timezone")
5427 return datetime(*str_to_time_fractions(value[:-1]))
5428 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5429 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5430 if value[-1] != "Z":
5431 raise ValueError("non UTC timezone")
5432 if value[14] != ".":
5433 raise ValueError("no fractions separator")
5436 raise ValueError("trailing zero")
5439 raise ValueError("only microsecond fractions are supported")
5440 us = pureint(us + ("0" * (6 - us_len)))
5441 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5442 return datetime(year, month, day, hour, minute, second, us)
5443 raise ValueError("invalid GeneralizedTime length")
5445 def _encode_time(self):
5447 encoded = value.strftime("%Y%m%d%H%M%S")
5448 if value.microsecond > 0:
5449 encoded += (".%06d" % value.microsecond).rstrip("0")
5450 return (encoded + "Z").encode("ascii")
5453 self._assert_ready()
5455 if value.microsecond > 0:
5456 encoded = self._encode_time()
5457 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5458 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5460 def _encode1st(self, state):
5461 self._assert_ready()
5462 vlen = len(self._encode_time())
5463 return len(self.tag) + len_size(vlen) + vlen, state
5465 def _encode2nd(self, writer, state_iter):
5466 write_full(writer, self._encode())
5469 class GraphicString(CommonString):
5471 tag_default = tag_encode(25)
5472 encoding = "iso-8859-1"
5473 asn1_type_name = "GraphicString"
5476 class GeneralString(CommonString):
5478 tag_default = tag_encode(27)
5479 encoding = "iso-8859-1"
5480 asn1_type_name = "GeneralString"
5483 class UniversalString(CommonString):
5485 tag_default = tag_encode(28)
5486 encoding = "utf-32-be"
5487 asn1_type_name = "UniversalString"
5490 class BMPString(CommonString):
5492 tag_default = tag_encode(30)
5493 encoding = "utf-16-be"
5494 asn1_type_name = "BMPString"
5497 ChoiceState = namedtuple(
5499 BasicState._fields + ("specs", "value",),
5505 """``CHOICE`` special type
5509 class GeneralName(Choice):
5511 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5512 ("dNSName", IA5String(impl=tag_ctxp(2))),
5515 >>> gn = GeneralName()
5517 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5518 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5519 >>> gn["dNSName"] = IA5String("bar.baz")
5520 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5521 >>> gn["rfc822Name"]
5524 [2] IA5String IA5 bar.baz
5527 >>> gn.value == gn["dNSName"]
5530 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5532 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5533 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5535 __slots__ = ("specs",)
5537 asn1_type_name = "CHOICE"
5550 :param value: set the value. Either ``(choice, value)`` tuple, or
5551 :py:class:`pyderasn.Choice` object
5552 :param bytes impl: can not be set, do **not** use it
5553 :param bytes expl: override default tag with ``EXPLICIT`` one
5554 :param default: set default value. Type same as in ``value``
5555 :param bool optional: is object ``OPTIONAL`` in sequence
5557 if impl is not None:
5558 raise ValueError("no implicit tag allowed for CHOICE")
5559 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5561 schema = getattr(self, "schema", ())
5562 if len(schema) == 0:
5563 raise ValueError("schema must be specified")
5565 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5568 if value is not None:
5569 self._value = self._value_sanitize(value)
5570 if default is not None:
5571 default_value = self._value_sanitize(default)
5572 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5573 default_obj.specs = self.specs
5574 default_obj._value = default_value
5575 self.default = default_obj
5577 self._value = copy(default_obj._value)
5578 if self._expl is not None:
5579 tag_class, _, tag_num = tag_decode(self._expl)
5580 self._tag_order = (tag_class, tag_num)
5582 def _value_sanitize(self, value):
5583 if (value.__class__ == tuple) and len(value) == 2:
5585 spec = self.specs.get(choice)
5587 raise ObjUnknown(choice)
5588 if not isinstance(obj, spec.__class__):
5589 raise InvalidValueType((spec,))
5590 return (choice, spec(obj))
5591 if isinstance(value, self.__class__):
5593 raise InvalidValueType((self.__class__, tuple))
5597 return self._value is not None and self._value[1].ready
5601 return self.expl_lenindef or (
5602 (self._value is not None) and
5603 self._value[1].bered
5606 def __getstate__(self):
5624 def __setstate__(self, state):
5625 super(Choice, self).__setstate__(state)
5626 self.specs = state.specs
5627 self._value = state.value
5629 def __eq__(self, their):
5630 if (their.__class__ == tuple) and len(their) == 2:
5631 return self._value == their
5632 if not isinstance(their, self.__class__):
5635 self.specs == their.specs and
5636 self._value == their._value
5646 return self.__class__(
5649 expl=self._expl if expl is None else expl,
5650 default=self.default if default is None else default,
5651 optional=self.optional if optional is None else optional,
5656 """Name of the choice
5658 self._assert_ready()
5659 return self._value[0]
5663 """Value of underlying choice
5665 self._assert_ready()
5666 return self._value[1]
5669 def tag_order(self):
5670 self._assert_ready()
5671 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5674 def tag_order_cer(self):
5675 return min(v.tag_order_cer for v in itervalues(self.specs))
5677 def __getitem__(self, key):
5678 if key not in self.specs:
5679 raise ObjUnknown(key)
5680 if self._value is None:
5682 choice, value = self._value
5687 def __setitem__(self, key, value):
5688 spec = self.specs.get(key)
5690 raise ObjUnknown(key)
5691 if not isinstance(value, spec.__class__):
5692 raise InvalidValueType((spec.__class__,))
5693 self._value = (key, spec(value))
5701 return self._value[1].decoded if self.ready else False
5704 self._assert_ready()
5705 return self._value[1].encode()
5707 def _encode1st(self, state):
5708 self._assert_ready()
5709 return self._value[1].encode1st(state)
5711 def _encode2nd(self, writer, state_iter):
5712 self._value[1].encode2nd(writer, state_iter)
5714 def _encode_cer(self, writer):
5715 self._assert_ready()
5716 self._value[1].encode_cer(writer)
5718 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5719 for choice, spec in iteritems(self.specs):
5720 sub_decode_path = decode_path + (choice,)
5726 decode_path=sub_decode_path,
5729 _ctx_immutable=False,
5736 klass=self.__class__,
5737 decode_path=decode_path,
5740 if tag_only: # pragma: no cover
5744 for _decode_path, value, tail in spec.decode_evgen(
5748 decode_path=sub_decode_path,
5750 _ctx_immutable=False,
5752 yield _decode_path, value, tail
5754 _, value, tail = next(spec.decode_evgen(
5758 decode_path=sub_decode_path,
5760 _ctx_immutable=False,
5763 obj = self.__class__(
5766 default=self.default,
5767 optional=self.optional,
5768 _decoded=(offset, 0, value.fulllen),
5770 obj._value = (choice, value)
5771 yield decode_path, obj, tail
5774 value = pp_console_row(next(self.pps()))
5776 value = "%s[%r]" % (value, self.value)
5779 def pps(self, decode_path=()):
5782 asn1_type_name=self.asn1_type_name,
5783 obj_name=self.__class__.__name__,
5784 decode_path=decode_path,
5785 value=self.choice if self.ready else None,
5786 optional=self.optional,
5787 default=self == self.default,
5788 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5789 expl=None if self._expl is None else tag_decode(self._expl),
5794 expl_lenindef=self.expl_lenindef,
5798 yield self.value.pps(decode_path=decode_path + (self.choice,))
5799 for pp in self.pps_lenindef(decode_path):
5803 class PrimitiveTypes(Choice):
5804 """Predefined ``CHOICE`` for all generic primitive types
5806 It could be useful for general decoding of some unspecified values:
5808 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5809 OCTET STRING 3 bytes 666f6f
5810 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5814 schema = tuple((klass.__name__, klass()) for klass in (
5838 AnyState = namedtuple(
5840 BasicState._fields + ("value", "defined"),
5846 """``ANY`` special type
5848 >>> Any(Integer(-123))
5849 ANY INTEGER -123 (0X:7B)
5850 >>> a = Any(OctetString(b"hello world").encode())
5851 ANY 040b68656c6c6f20776f726c64
5852 >>> hexenc(bytes(a))
5853 b'0x040x0bhello world'
5855 __slots__ = ("defined",)
5856 tag_default = tag_encode(0)
5857 asn1_type_name = "ANY"
5867 :param value: set the value. Either any kind of pyderasn's
5868 **ready** object, or bytes. Pay attention that
5869 **no** validation is performed if raw binary value
5870 is valid TLV, except just tag decoding
5871 :param bytes expl: override default tag with ``EXPLICIT`` one
5872 :param bool optional: is object ``OPTIONAL`` in sequence
5874 super(Any, self).__init__(None, expl, None, optional, _decoded)
5878 value = self._value_sanitize(value)
5880 if self._expl is None:
5881 if value.__class__ == binary_type:
5882 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5884 tag_class, tag_num = value.tag_order
5886 tag_class, _, tag_num = tag_decode(self._expl)
5887 self._tag_order = (tag_class, tag_num)
5890 def _value_sanitize(self, value):
5891 if value.__class__ == binary_type:
5893 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5895 if isinstance(value, self.__class__):
5897 if not isinstance(value, Obj):
5898 raise InvalidValueType((self.__class__, Obj, binary_type))
5903 return self._value is not None
5906 def tag_order(self):
5907 self._assert_ready()
5908 return self._tag_order
5912 if self.expl_lenindef or self.lenindef:
5914 if self.defined is None:
5916 return self.defined[1].bered
5918 def __getstate__(self):
5936 def __setstate__(self, state):
5937 super(Any, self).__setstate__(state)
5938 self._value = state.value
5939 self.defined = state.defined
5941 def __eq__(self, their):
5942 if their.__class__ == binary_type:
5943 if self._value.__class__ == binary_type:
5944 return self._value == their
5945 return self._value.encode() == their
5946 if issubclass(their.__class__, Any):
5947 if self.ready and their.ready:
5948 return bytes(self) == bytes(their)
5949 return self.ready == their.ready
5958 return self.__class__(
5960 expl=self._expl if expl is None else expl,
5961 optional=self.optional if optional is None else optional,
5964 def __bytes__(self):
5965 self._assert_ready()
5967 if value.__class__ == binary_type:
5969 return self._value.encode()
5976 self._assert_ready()
5978 if value.__class__ == binary_type:
5980 return value.encode()
5982 def _encode1st(self, state):
5983 self._assert_ready()
5985 if value.__class__ == binary_type:
5986 return len(value), state
5987 return value.encode1st(state)
5989 def _encode2nd(self, writer, state_iter):
5991 if value.__class__ == binary_type:
5992 write_full(writer, value)
5994 value.encode2nd(writer, state_iter)
5996 def _encode_cer(self, writer):
5997 self._assert_ready()
5999 if value.__class__ == binary_type:
6000 write_full(writer, value)
6002 value.encode_cer(writer)
6004 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6006 t, tlen, lv = tag_strip(tlv)
6007 except DecodeError as err:
6008 raise err.__class__(
6010 klass=self.__class__,
6011 decode_path=decode_path,
6015 l, llen, v = len_decode(lv)
6016 except LenIndefForm as err:
6017 if not ctx.get("bered", False):
6018 raise err.__class__(
6020 klass=self.__class__,
6021 decode_path=decode_path,
6024 llen, vlen, v = 1, 0, lv[1:]
6025 sub_offset = offset + tlen + llen
6027 while v[:EOC_LEN].tobytes() != EOC:
6028 chunk, v = Any().decode(
6031 decode_path=decode_path + (str(chunk_i),),
6034 _ctx_immutable=False,
6036 vlen += chunk.tlvlen
6037 sub_offset += chunk.tlvlen
6039 tlvlen = tlen + llen + vlen + EOC_LEN
6040 obj = self.__class__(
6041 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6043 optional=self.optional,
6044 _decoded=(offset, 0, tlvlen),
6047 obj.tag = t.tobytes()
6048 yield decode_path, obj, v[EOC_LEN:]
6050 except DecodeError as err:
6051 raise err.__class__(
6053 klass=self.__class__,
6054 decode_path=decode_path,
6058 raise NotEnoughData(
6059 "encoded length is longer than data",
6060 klass=self.__class__,
6061 decode_path=decode_path,
6064 tlvlen = tlen + llen + l
6065 v, tail = tlv[:tlvlen], v[l:]
6066 obj = self.__class__(
6067 value=None if evgen_mode else v.tobytes(),
6069 optional=self.optional,
6070 _decoded=(offset, 0, tlvlen),
6072 obj.tag = t.tobytes()
6073 yield decode_path, obj, tail
6076 return pp_console_row(next(self.pps()))
6078 def pps(self, decode_path=()):
6082 elif value.__class__ == binary_type:
6088 asn1_type_name=self.asn1_type_name,
6089 obj_name=self.__class__.__name__,
6090 decode_path=decode_path,
6092 blob=self._value if self._value.__class__ == binary_type else None,
6093 optional=self.optional,
6094 default=self == self.default,
6095 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6096 expl=None if self._expl is None else tag_decode(self._expl),
6101 expl_offset=self.expl_offset if self.expled else None,
6102 expl_tlen=self.expl_tlen if self.expled else None,
6103 expl_llen=self.expl_llen if self.expled else None,
6104 expl_vlen=self.expl_vlen if self.expled else None,
6105 expl_lenindef=self.expl_lenindef,
6106 lenindef=self.lenindef,
6109 defined_by, defined = self.defined or (None, None)
6110 if defined_by is not None:
6112 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6114 for pp in self.pps_lenindef(decode_path):
6118 ########################################################################
6119 # ASN.1 constructed types
6120 ########################################################################
6122 def abs_decode_path(decode_path, rel_path):
6123 """Create an absolute decode path from current and relative ones
6125 :param decode_path: current decode path, starting point. Tuple of strings
6126 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6127 If first tuple's element is "/", then treat it as
6128 an absolute path, ignoring ``decode_path`` as
6129 starting point. Also this tuple can contain ".."
6130 elements, stripping the leading element from
6133 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6134 ("foo", "bar", "baz", "whatever")
6135 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6137 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6140 if rel_path[0] == "/":
6142 if rel_path[0] == "..":
6143 return abs_decode_path(decode_path[:-1], rel_path[1:])
6144 return decode_path + rel_path
6147 SequenceState = namedtuple(
6149 BasicState._fields + ("specs", "value",),
6154 class SequenceEncode1stMixing(object):
6155 def _encode1st(self, state):
6157 idx = len(state) - 1
6159 for v in self._values_for_encoding():
6160 l, _ = v.encode1st(state)
6163 return len(self.tag) + len_size(vlen) + vlen, state
6166 class Sequence(SequenceEncode1stMixing, Obj):
6167 """``SEQUENCE`` structure type
6169 You have to make specification of sequence::
6171 class Extension(Sequence):
6173 ("extnID", ObjectIdentifier()),
6174 ("critical", Boolean(default=False)),
6175 ("extnValue", OctetString()),
6178 Then, you can work with it as with dictionary.
6180 >>> ext = Extension()
6181 >>> Extension().specs
6183 ('extnID', OBJECT IDENTIFIER),
6184 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6185 ('extnValue', OCTET STRING),
6187 >>> ext["extnID"] = "1.2.3"
6188 Traceback (most recent call last):
6189 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6190 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6192 You can determine if sequence is ready to be encoded:
6197 Traceback (most recent call last):
6198 pyderasn.ObjNotReady: object is not ready: extnValue
6199 >>> ext["extnValue"] = OctetString(b"foobar")
6203 Value you want to assign, must have the same **type** as in
6204 corresponding specification, but it can have different tags,
6205 optional/default attributes -- they will be taken from specification
6208 class TBSCertificate(Sequence):
6210 ("version", Version(expl=tag_ctxc(0), default="v1")),
6213 >>> tbs = TBSCertificate()
6214 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6216 Assign ``None`` to remove value from sequence.
6218 You can set values in Sequence during its initialization:
6220 >>> AlgorithmIdentifier((
6221 ("algorithm", ObjectIdentifier("1.2.3")),
6222 ("parameters", Any(Null()))
6224 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6226 You can determine if value exists/set in the sequence and take its value:
6228 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6231 OBJECT IDENTIFIER 1.2.3
6233 But pay attention that if value has default, then it won't be (not
6234 in) in the sequence (because ``DEFAULT`` must not be encoded in
6235 DER), but you can read its value:
6237 >>> "critical" in ext, ext["critical"]
6238 (False, BOOLEAN False)
6239 >>> ext["critical"] = Boolean(True)
6240 >>> "critical" in ext, ext["critical"]
6241 (True, BOOLEAN True)
6243 All defaulted values are always optional.
6245 .. _allow_default_values_ctx:
6247 DER prohibits default value encoding and will raise an error if
6248 default value is unexpectedly met during decode.
6249 If :ref:`bered <bered_ctx>` context option is set, then no error
6250 will be raised, but ``bered`` attribute set. You can disable strict
6251 defaulted values existence validation by setting
6252 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6254 All values with DEFAULT specified are decoded atomically in
6255 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6256 SEQUENCE, then it will be yielded as a single element, not
6257 disassembled. That is required for DEFAULT existence check.
6259 Two sequences are equal if they have equal specification (schema),
6260 implicit/explicit tagging and the same values.
6262 __slots__ = ("specs",)
6263 tag_default = tag_encode(form=TagFormConstructed, num=16)
6264 asn1_type_name = "SEQUENCE"
6276 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6278 schema = getattr(self, "schema", ())
6280 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6283 if value is not None:
6284 if issubclass(value.__class__, Sequence):
6285 self._value = value._value
6286 elif hasattr(value, "__iter__"):
6287 for seq_key, seq_value in value:
6288 self[seq_key] = seq_value
6290 raise InvalidValueType((Sequence,))
6291 if default is not None:
6292 if not issubclass(default.__class__, Sequence):
6293 raise InvalidValueType((Sequence,))
6294 default_value = default._value
6295 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6296 default_obj.specs = self.specs
6297 default_obj._value = default_value
6298 self.default = default_obj
6300 self._value = copy(default_obj._value)
6304 for name, spec in iteritems(self.specs):
6305 value = self._value.get(name)
6316 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6318 return any(value.bered for value in itervalues(self._value))
6320 def __getstate__(self):
6321 return SequenceState(
6335 {k: copy(v) for k, v in iteritems(self._value)},
6338 def __setstate__(self, state):
6339 super(Sequence, self).__setstate__(state)
6340 self.specs = state.specs
6341 self._value = state.value
6343 def __eq__(self, their):
6344 if not isinstance(their, self.__class__):
6347 self.specs == their.specs and
6348 self.tag == their.tag and
6349 self._expl == their._expl and
6350 self._value == their._value
6361 return self.__class__(
6364 impl=self.tag if impl is None else impl,
6365 expl=self._expl if expl is None else expl,
6366 default=self.default if default is None else default,
6367 optional=self.optional if optional is None else optional,
6370 def __contains__(self, key):
6371 return key in self._value
6373 def __setitem__(self, key, value):
6374 spec = self.specs.get(key)
6376 raise ObjUnknown(key)
6378 self._value.pop(key, None)
6380 if not isinstance(value, spec.__class__):
6381 raise InvalidValueType((spec.__class__,))
6382 value = spec(value=value)
6383 if spec.default is not None and value == spec.default:
6384 self._value.pop(key, None)
6386 self._value[key] = value
6388 def __getitem__(self, key):
6389 value = self._value.get(key)
6390 if value is not None:
6392 spec = self.specs.get(key)
6394 raise ObjUnknown(key)
6395 if spec.default is not None:
6399 def _values_for_encoding(self):
6400 for name, spec in iteritems(self.specs):
6401 value = self._value.get(name)
6405 raise ObjNotReady(name)
6409 v = b"".join(v.encode() for v in self._values_for_encoding())
6410 return b"".join((self.tag, len_encode(len(v)), v))
6412 def _encode2nd(self, writer, state_iter):
6413 write_full(writer, self.tag + len_encode(next(state_iter)))
6414 for v in self._values_for_encoding():
6415 v.encode2nd(writer, state_iter)
6417 def _encode_cer(self, writer):
6418 write_full(writer, self.tag + LENINDEF)
6419 for v in self._values_for_encoding():
6420 v.encode_cer(writer)
6421 write_full(writer, EOC)
6423 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6425 t, tlen, lv = tag_strip(tlv)
6426 except DecodeError as err:
6427 raise err.__class__(
6429 klass=self.__class__,
6430 decode_path=decode_path,
6435 klass=self.__class__,
6436 decode_path=decode_path,
6439 if tag_only: # pragma: no cover
6443 ctx_bered = ctx.get("bered", False)
6445 l, llen, v = len_decode(lv)
6446 except LenIndefForm as err:
6448 raise err.__class__(
6450 klass=self.__class__,
6451 decode_path=decode_path,
6454 l, llen, v = 0, 1, lv[1:]
6456 except DecodeError as err:
6457 raise err.__class__(
6459 klass=self.__class__,
6460 decode_path=decode_path,
6464 raise NotEnoughData(
6465 "encoded length is longer than data",
6466 klass=self.__class__,
6467 decode_path=decode_path,
6471 v, tail = v[:l], v[l:]
6473 sub_offset = offset + tlen + llen
6476 ctx_allow_default_values = ctx.get("allow_default_values", False)
6477 for name, spec in iteritems(self.specs):
6478 if spec.optional and (
6479 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6483 spec_defaulted = spec.default is not None
6484 sub_decode_path = decode_path + (name,)
6486 if evgen_mode and not spec_defaulted:
6487 for _decode_path, value, v_tail in spec.decode_evgen(
6491 decode_path=sub_decode_path,
6493 _ctx_immutable=False,
6495 yield _decode_path, value, v_tail
6497 _, value, v_tail = next(spec.decode_evgen(
6501 decode_path=sub_decode_path,
6503 _ctx_immutable=False,
6506 except TagMismatch as err:
6507 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6511 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6512 if not evgen_mode and defined is not None:
6513 defined_by, defined_spec = defined
6514 if issubclass(value.__class__, SequenceOf):
6515 for i, _value in enumerate(value):
6516 sub_sub_decode_path = sub_decode_path + (
6518 DecodePathDefBy(defined_by),
6520 defined_value, defined_tail = defined_spec.decode(
6521 memoryview(bytes(_value)),
6523 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6524 if value.expled else (value.tlen + value.llen)
6527 decode_path=sub_sub_decode_path,
6529 _ctx_immutable=False,
6531 if len(defined_tail) > 0:
6534 klass=self.__class__,
6535 decode_path=sub_sub_decode_path,
6538 _value.defined = (defined_by, defined_value)
6540 defined_value, defined_tail = defined_spec.decode(
6541 memoryview(bytes(value)),
6543 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6544 if value.expled else (value.tlen + value.llen)
6547 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6549 _ctx_immutable=False,
6551 if len(defined_tail) > 0:
6554 klass=self.__class__,
6555 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6558 value.defined = (defined_by, defined_value)
6560 value_len = value.fulllen
6562 sub_offset += value_len
6566 yield sub_decode_path, value, v_tail
6567 if value == spec.default:
6568 if ctx_bered or ctx_allow_default_values:
6572 "DEFAULT value met",
6573 klass=self.__class__,
6574 decode_path=sub_decode_path,
6578 values[name] = value
6579 spec_defines = getattr(spec, "defines", ())
6580 if len(spec_defines) == 0:
6581 defines_by_path = ctx.get("defines_by_path", ())
6582 if len(defines_by_path) > 0:
6583 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6584 if spec_defines is not None and len(spec_defines) > 0:
6585 for rel_path, schema in spec_defines:
6586 defined = schema.get(value, None)
6587 if defined is not None:
6588 ctx.setdefault("_defines", []).append((
6589 abs_decode_path(sub_decode_path[:-1], rel_path),
6593 if v[:EOC_LEN].tobytes() != EOC:
6596 klass=self.__class__,
6597 decode_path=decode_path,
6605 klass=self.__class__,
6606 decode_path=decode_path,
6609 obj = self.__class__(
6613 default=self.default,
6614 optional=self.optional,
6615 _decoded=(offset, llen, vlen),
6618 obj.lenindef = lenindef
6619 obj.ber_encoded = ber_encoded
6620 yield decode_path, obj, tail
6623 value = pp_console_row(next(self.pps()))
6625 for name in self.specs:
6626 _value = self._value.get(name)
6629 cols.append("%s: %s" % (name, repr(_value)))
6630 return "%s[%s]" % (value, "; ".join(cols))
6632 def pps(self, decode_path=()):
6635 asn1_type_name=self.asn1_type_name,
6636 obj_name=self.__class__.__name__,
6637 decode_path=decode_path,
6638 optional=self.optional,
6639 default=self == self.default,
6640 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6641 expl=None if self._expl is None else tag_decode(self._expl),
6646 expl_offset=self.expl_offset if self.expled else None,
6647 expl_tlen=self.expl_tlen if self.expled else None,
6648 expl_llen=self.expl_llen if self.expled else None,
6649 expl_vlen=self.expl_vlen if self.expled else None,
6650 expl_lenindef=self.expl_lenindef,
6651 lenindef=self.lenindef,
6652 ber_encoded=self.ber_encoded,
6655 for name in self.specs:
6656 value = self._value.get(name)
6659 yield value.pps(decode_path=decode_path + (name,))
6660 for pp in self.pps_lenindef(decode_path):
6664 class Set(Sequence, SequenceEncode1stMixing):
6665 """``SET`` structure type
6667 Its usage is identical to :py:class:`pyderasn.Sequence`.
6669 .. _allow_unordered_set_ctx:
6671 DER prohibits unordered values encoding and will raise an error
6672 during decode. If :ref:`bered <bered_ctx>` context option is set,
6673 then no error will occur. Also you can disable strict values
6674 ordering check by setting ``"allow_unordered_set": True``
6675 :ref:`context <ctx>` option.
6678 tag_default = tag_encode(form=TagFormConstructed, num=17)
6679 asn1_type_name = "SET"
6681 def _values_for_encoding(self):
6683 super(Set, self)._values_for_encoding(),
6684 key=attrgetter("tag_order"),
6687 def _encode_cer(self, writer):
6688 write_full(writer, self.tag + LENINDEF)
6690 super(Set, self)._values_for_encoding(),
6691 key=attrgetter("tag_order_cer"),
6693 v.encode_cer(writer)
6694 write_full(writer, EOC)
6696 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6698 t, tlen, lv = tag_strip(tlv)
6699 except DecodeError as err:
6700 raise err.__class__(
6702 klass=self.__class__,
6703 decode_path=decode_path,
6708 klass=self.__class__,
6709 decode_path=decode_path,
6716 ctx_bered = ctx.get("bered", False)
6718 l, llen, v = len_decode(lv)
6719 except LenIndefForm as err:
6721 raise err.__class__(
6723 klass=self.__class__,
6724 decode_path=decode_path,
6727 l, llen, v = 0, 1, lv[1:]
6729 except DecodeError as err:
6730 raise err.__class__(
6732 klass=self.__class__,
6733 decode_path=decode_path,
6737 raise NotEnoughData(
6738 "encoded length is longer than data",
6739 klass=self.__class__,
6743 v, tail = v[:l], v[l:]
6745 sub_offset = offset + tlen + llen
6748 ctx_allow_default_values = ctx.get("allow_default_values", False)
6749 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6750 tag_order_prev = (0, 0)
6751 _specs_items = copy(self.specs)
6754 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6756 for name, spec in iteritems(_specs_items):
6757 sub_decode_path = decode_path + (name,)
6763 decode_path=sub_decode_path,
6766 _ctx_immutable=False,
6773 klass=self.__class__,
6774 decode_path=decode_path,
6777 spec_defaulted = spec.default is not None
6778 if evgen_mode and not spec_defaulted:
6779 for _decode_path, value, v_tail in spec.decode_evgen(
6783 decode_path=sub_decode_path,
6785 _ctx_immutable=False,
6787 yield _decode_path, value, v_tail
6789 _, value, v_tail = next(spec.decode_evgen(
6793 decode_path=sub_decode_path,
6795 _ctx_immutable=False,
6798 value_tag_order = value.tag_order
6799 value_len = value.fulllen
6800 if tag_order_prev >= value_tag_order:
6801 if ctx_bered or ctx_allow_unordered_set:
6805 "unordered " + self.asn1_type_name,
6806 klass=self.__class__,
6807 decode_path=sub_decode_path,
6812 yield sub_decode_path, value, v_tail
6813 if value != spec.default:
6815 elif ctx_bered or ctx_allow_default_values:
6819 "DEFAULT value met",
6820 klass=self.__class__,
6821 decode_path=sub_decode_path,
6824 values[name] = value
6825 del _specs_items[name]
6826 tag_order_prev = value_tag_order
6827 sub_offset += value_len
6831 obj = self.__class__(
6835 default=self.default,
6836 optional=self.optional,
6837 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6840 if v[:EOC_LEN].tobytes() != EOC:
6843 klass=self.__class__,
6844 decode_path=decode_path,
6849 for name, spec in iteritems(self.specs):
6850 if name not in values and not spec.optional:
6852 "%s value is not ready" % name,
6853 klass=self.__class__,
6854 decode_path=decode_path,
6859 obj.ber_encoded = ber_encoded
6860 yield decode_path, obj, tail
6863 SequenceOfState = namedtuple(
6865 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6870 class SequenceOf(SequenceEncode1stMixing, Obj):
6871 """``SEQUENCE OF`` sequence type
6873 For that kind of type you must specify the object it will carry on
6874 (bounds are for example here, not required)::
6876 class Ints(SequenceOf):
6881 >>> ints.append(Integer(123))
6882 >>> ints.append(Integer(234))
6884 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6885 >>> [int(i) for i in ints]
6887 >>> ints.append(Integer(345))
6888 Traceback (most recent call last):
6889 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6892 >>> ints[1] = Integer(345)
6894 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6896 You can initialize sequence with preinitialized values:
6898 >>> ints = Ints([Integer(123), Integer(234)])
6900 Also you can use iterator as a value:
6902 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6904 And it won't be iterated until encoding process. Pay attention that
6905 bounds and required schema checks are done only during the encoding
6906 process in that case! After encode was called, then value is zeroed
6907 back to empty list and you have to set it again. That mode is useful
6908 mainly with CER encoding mode, where all objects from the iterable
6909 will be streamed to the buffer, without copying all of them to
6912 __slots__ = ("spec", "_bound_min", "_bound_max")
6913 tag_default = tag_encode(form=TagFormConstructed, num=16)
6914 asn1_type_name = "SEQUENCE OF"
6927 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6929 schema = getattr(self, "schema", None)
6931 raise ValueError("schema must be specified")
6933 self._bound_min, self._bound_max = getattr(
6937 ) if bounds is None else bounds
6939 if value is not None:
6940 self._value = self._value_sanitize(value)
6941 if default is not None:
6942 default_value = self._value_sanitize(default)
6943 default_obj = self.__class__(
6948 default_obj._value = default_value
6949 self.default = default_obj
6951 self._value = copy(default_obj._value)
6953 def _value_sanitize(self, value):
6955 if issubclass(value.__class__, SequenceOf):
6956 value = value._value
6957 elif hasattr(value, NEXT_ATTR_NAME):
6959 elif hasattr(value, "__iter__"):
6962 raise InvalidValueType((self.__class__, iter, "iterator"))
6964 if not self._bound_min <= len(value) <= self._bound_max:
6965 raise BoundsError(self._bound_min, len(value), self._bound_max)
6966 class_expected = self.spec.__class__
6968 if not isinstance(v, class_expected):
6969 raise InvalidValueType((class_expected,))
6974 if hasattr(self._value, NEXT_ATTR_NAME):
6976 if self._bound_min > 0 and len(self._value) == 0:
6978 return all(v.ready for v in self._value)
6982 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6984 return any(v.bered for v in self._value)
6986 def __getstate__(self):
6987 if hasattr(self._value, NEXT_ATTR_NAME):
6988 raise ValueError("can not pickle SequenceOf with iterator")
6989 return SequenceOfState(
7003 [copy(v) for v in self._value],
7008 def __setstate__(self, state):
7009 super(SequenceOf, self).__setstate__(state)
7010 self.spec = state.spec
7011 self._value = state.value
7012 self._bound_min = state.bound_min
7013 self._bound_max = state.bound_max
7015 def __eq__(self, their):
7016 if isinstance(their, self.__class__):
7018 self.spec == their.spec and
7019 self.tag == their.tag and
7020 self._expl == their._expl and
7021 self._value == their._value
7023 if hasattr(their, "__iter__"):
7024 return self._value == list(their)
7036 return self.__class__(
7040 (self._bound_min, self._bound_max)
7041 if bounds is None else bounds
7043 impl=self.tag if impl is None else impl,
7044 expl=self._expl if expl is None else expl,
7045 default=self.default if default is None else default,
7046 optional=self.optional if optional is None else optional,
7049 def __contains__(self, key):
7050 return key in self._value
7052 def append(self, value):
7053 if not isinstance(value, self.spec.__class__):
7054 raise InvalidValueType((self.spec.__class__,))
7055 if len(self._value) + 1 > self._bound_max:
7058 len(self._value) + 1,
7061 self._value.append(value)
7064 return iter(self._value)
7067 return len(self._value)
7069 def __setitem__(self, key, value):
7070 if not isinstance(value, self.spec.__class__):
7071 raise InvalidValueType((self.spec.__class__,))
7072 self._value[key] = self.spec(value=value)
7074 def __getitem__(self, key):
7075 return self._value[key]
7077 def _values_for_encoding(self):
7078 return iter(self._value)
7081 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7084 values_append = values.append
7085 class_expected = self.spec.__class__
7086 values_for_encoding = self._values_for_encoding()
7088 for v in values_for_encoding:
7089 if not isinstance(v, class_expected):
7090 raise InvalidValueType((class_expected,))
7091 values_append(v.encode())
7092 if not self._bound_min <= len(values) <= self._bound_max:
7093 raise BoundsError(self._bound_min, len(values), self._bound_max)
7094 value = b"".join(values)
7096 value = b"".join(v.encode() for v in self._values_for_encoding())
7097 return b"".join((self.tag, len_encode(len(value)), value))
7099 def _encode1st(self, state):
7100 state = super(SequenceOf, self)._encode1st(state)
7101 if hasattr(self._value, NEXT_ATTR_NAME):
7105 def _encode2nd(self, writer, state_iter):
7106 write_full(writer, self.tag + len_encode(next(state_iter)))
7107 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7110 class_expected = self.spec.__class__
7111 values_for_encoding = self._values_for_encoding()
7113 for v in values_for_encoding:
7114 if not isinstance(v, class_expected):
7115 raise InvalidValueType((class_expected,))
7116 v.encode2nd(writer, state_iter)
7118 if not self._bound_min <= values_count <= self._bound_max:
7119 raise BoundsError(self._bound_min, values_count, self._bound_max)
7121 for v in self._values_for_encoding():
7122 v.encode2nd(writer, state_iter)
7124 def _encode_cer(self, writer):
7125 write_full(writer, self.tag + LENINDEF)
7126 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7128 class_expected = self.spec.__class__
7130 values_for_encoding = self._values_for_encoding()
7132 for v in values_for_encoding:
7133 if not isinstance(v, class_expected):
7134 raise InvalidValueType((class_expected,))
7135 v.encode_cer(writer)
7137 if not self._bound_min <= values_count <= self._bound_max:
7138 raise BoundsError(self._bound_min, values_count, self._bound_max)
7140 for v in self._values_for_encoding():
7141 v.encode_cer(writer)
7142 write_full(writer, EOC)
7152 ordering_check=False,
7155 t, tlen, lv = tag_strip(tlv)
7156 except DecodeError as err:
7157 raise err.__class__(
7159 klass=self.__class__,
7160 decode_path=decode_path,
7165 klass=self.__class__,
7166 decode_path=decode_path,
7173 ctx_bered = ctx.get("bered", False)
7175 l, llen, v = len_decode(lv)
7176 except LenIndefForm as err:
7178 raise err.__class__(
7180 klass=self.__class__,
7181 decode_path=decode_path,
7184 l, llen, v = 0, 1, lv[1:]
7186 except DecodeError as err:
7187 raise err.__class__(
7189 klass=self.__class__,
7190 decode_path=decode_path,
7194 raise NotEnoughData(
7195 "encoded length is longer than data",
7196 klass=self.__class__,
7197 decode_path=decode_path,
7201 v, tail = v[:l], v[l:]
7203 sub_offset = offset + tlen + llen
7206 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7207 value_prev = memoryview(v[:0])
7211 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7213 sub_decode_path = decode_path + (str(_value_count),)
7215 for _decode_path, value, v_tail in spec.decode_evgen(
7219 decode_path=sub_decode_path,
7221 _ctx_immutable=False,
7223 yield _decode_path, value, v_tail
7225 _, value, v_tail = next(spec.decode_evgen(
7229 decode_path=sub_decode_path,
7231 _ctx_immutable=False,
7234 value_len = value.fulllen
7236 if value_prev.tobytes() > v[:value_len].tobytes():
7237 if ctx_bered or ctx_allow_unordered_set:
7241 "unordered " + self.asn1_type_name,
7242 klass=self.__class__,
7243 decode_path=sub_decode_path,
7246 value_prev = v[:value_len]
7249 _value.append(value)
7250 sub_offset += value_len
7253 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7255 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7256 klass=self.__class__,
7257 decode_path=decode_path,
7261 obj = self.__class__(
7262 value=None if evgen_mode else _value,
7264 bounds=(self._bound_min, self._bound_max),
7267 default=self.default,
7268 optional=self.optional,
7269 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7271 except BoundsError as err:
7274 klass=self.__class__,
7275 decode_path=decode_path,
7279 if v[:EOC_LEN].tobytes() != EOC:
7282 klass=self.__class__,
7283 decode_path=decode_path,
7288 obj.ber_encoded = ber_encoded
7289 yield decode_path, obj, tail
7293 pp_console_row(next(self.pps())),
7294 ", ".join(repr(v) for v in self._value),
7297 def pps(self, decode_path=()):
7300 asn1_type_name=self.asn1_type_name,
7301 obj_name=self.__class__.__name__,
7302 decode_path=decode_path,
7303 optional=self.optional,
7304 default=self == self.default,
7305 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7306 expl=None if self._expl is None else tag_decode(self._expl),
7311 expl_offset=self.expl_offset if self.expled else None,
7312 expl_tlen=self.expl_tlen if self.expled else None,
7313 expl_llen=self.expl_llen if self.expled else None,
7314 expl_vlen=self.expl_vlen if self.expled else None,
7315 expl_lenindef=self.expl_lenindef,
7316 lenindef=self.lenindef,
7317 ber_encoded=self.ber_encoded,
7320 for i, value in enumerate(self._value):
7321 yield value.pps(decode_path=decode_path + (str(i),))
7322 for pp in self.pps_lenindef(decode_path):
7326 class SetOf(SequenceOf):
7327 """``SET OF`` sequence type
7329 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7332 tag_default = tag_encode(form=TagFormConstructed, num=17)
7333 asn1_type_name = "SET OF"
7335 def _value_sanitize(self, value):
7336 value = super(SetOf, self)._value_sanitize(value)
7337 if hasattr(value, NEXT_ATTR_NAME):
7339 "SetOf does not support iterator values, as no sense in them"
7344 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7345 return b"".join((self.tag, len_encode(len(v)), v))
7347 def _encode2nd(self, writer, state_iter):
7348 write_full(writer, self.tag + len_encode(next(state_iter)))
7350 for v in self._values_for_encoding():
7352 v.encode2nd(buf.write, state_iter)
7353 values.append(buf.getvalue())
7356 write_full(writer, v)
7358 def _encode_cer(self, writer):
7359 write_full(writer, self.tag + LENINDEF)
7360 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7361 write_full(writer, v)
7362 write_full(writer, EOC)
7364 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7365 return super(SetOf, self)._decode(
7372 ordering_check=True,
7376 def obj_by_path(pypath): # pragma: no cover
7377 """Import object specified as string Python path
7379 Modules must be separated from classes/functions with ``:``.
7381 >>> obj_by_path("foo.bar:Baz")
7382 <class 'foo.bar.Baz'>
7383 >>> obj_by_path("foo.bar:Baz.boo")
7384 <classmethod 'foo.bar.Baz.boo'>
7386 mod, objs = pypath.rsplit(":", 1)
7387 from importlib import import_module
7388 obj = import_module(mod)
7389 for obj_name in objs.split("."):
7390 obj = getattr(obj, obj_name)
7394 def generic_decoder(): # pragma: no cover
7395 # All of this below is a big hack with self references
7396 choice = PrimitiveTypes()
7397 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7398 choice.specs["SetOf"] = SetOf(schema=choice)
7399 for i in six_xrange(31):
7400 choice.specs["SequenceOf%d" % i] = SequenceOf(
7404 choice.specs["Any"] = Any()
7406 # Class name equals to type name, to omit it from output
7407 class SEQUENCEOF(SequenceOf):
7415 with_decode_path=False,
7416 decode_path_only=(),
7419 def _pprint_pps(pps):
7421 if hasattr(pp, "_fields"):
7423 decode_path_only != () and
7424 pp.decode_path[:len(decode_path_only)] != decode_path_only
7427 if pp.asn1_type_name == Choice.asn1_type_name:
7429 pp_kwargs = pp._asdict()
7430 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7431 pp = _pp(**pp_kwargs)
7432 yield pp_console_row(
7437 with_colours=with_colours,
7438 with_decode_path=with_decode_path,
7439 decode_path_len_decrease=len(decode_path_only),
7441 for row in pp_console_blob(
7443 decode_path_len_decrease=len(decode_path_only),
7447 for row in _pprint_pps(pp):
7449 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7450 return SEQUENCEOF(), pprint_any
7453 def ascii_visualize(ba):
7454 """Output only ASCII printable characters, like in hexdump -C
7456 Example output for given binary string (right part)::
7458 92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|
7461 return "".join((six_unichr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7465 """Generate ``hexdump -C`` like output
7469 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7470 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7472 Result of that function is a generator of lines, where each line is
7477 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7478 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7479 " |i..?.....G...k.j|"]
7483 hexed = hexenc(raw).upper()
7484 addr, cols = 0, ["%08x " % 0]
7485 for i in six_xrange(0, len(hexed), 2):
7486 if i != 0 and i // 2 % 8 == 0:
7488 if i != 0 and i // 2 % 16 == 0:
7489 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7492 cols = ["%08x " % addr]
7493 cols.append(" " + hexed[i:i + 2])
7495 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7499 def browse(raw, obj, oid_maps=()):
7500 """Interactive browser
7502 :param bytes raw: binary data you decoded
7503 :param obj: decoded :py:class:`pyderasn.Obj`
7504 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7505 Its human readable form is printed when OID is met
7507 .. note:: `urwid <http://urwid.org/>`__ dependency required
7509 This browser is an interactive terminal application for browsing
7510 structures of your decoded ASN.1 objects. You can quit it with **q**
7511 key. It consists of three windows:
7514 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7515 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7516 **Left** key goes to constructed element above. **Plus**/**Minus**
7517 keys collapse/uncollapse constructed elements. **Space** toggles it
7519 window with various information about element. You can scroll it
7520 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7522 window with raw data hexdump and highlighted current element's
7523 contents. It automatically focuses on element's data. You can
7524 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7525 speed) keys. If element has explicit tag, then it also will be
7526 highlighted with different colour
7528 Window's header contains current decode path and progress bars with
7529 position in *info* and *hexdump* windows.
7531 If you press **d**, then current element will be saved in the
7532 current directory under its decode path name (adding ".0", ".1", etc
7533 suffix if such file already exists). **D** will save it with explicit tag.
7535 You can also invoke it with ``--browse`` command line argument.
7537 from copy import deepcopy
7538 from os.path import exists as path_exists
7541 class TW(urwid.TreeWidget):
7542 def __init__(self, state, *args, **kwargs):
7544 self.scrolled = {"info": False, "hexdump": False}
7545 super(TW, self).__init__(*args, **kwargs)
7548 pp = self.get_node().get_value()
7549 constructed = len(pp) > 1
7550 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7552 def _state_update(self):
7553 pp, _ = self._get_pp()
7554 self.state["decode_path"].set_text(
7555 ":".join(str(p) for p in pp.decode_path)
7557 lines = deepcopy(self.state["hexed"])
7559 def attr_set(i, attr):
7560 line = lines[i // 16]
7561 idx = 1 + (i - 16 * (i // 16))
7562 line[idx] = (attr, line[idx])
7564 if pp.expl_offset is not None:
7565 for i in six_xrange(
7567 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7569 attr_set(i, "select-expl")
7570 for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7571 attr_set(i, "select-value")
7572 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7573 self.state["hexdump"].set_focus(pp.offset // 16)
7574 self.state["hexdump"].set_focus_valign("middle")
7575 self.state["hexdump_bar"].set_completion(
7576 (100 * pp.offset // 16) //
7577 len(self.state["hexdump"]._body.positions())
7581 [("header", "Name: "), pp.obj_name],
7582 [("header", "Type: "), pp.asn1_type_name],
7583 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7584 [("header", "[TLV]len: "), "%d/%d/%d" % (
7585 pp.tlen, pp.llen, pp.vlen,
7587 [("header", "TLVlen: "), "%d" % sum((
7588 pp.tlen, pp.llen, pp.vlen,
7590 [("header", "Slice: "), "[%d:%d]" % (
7591 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7595 lines.append([("warning", "LENINDEF")])
7597 lines.append([("warning", "BER encoded")])
7599 lines.append([("warning", "BERed")])
7600 if pp.expl is not None:
7601 lines.append([("header", "EXPLICIT")])
7602 klass, _, num = pp.expl
7603 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7604 if pp.expl_offset is not None:
7605 lines.append([" Offset: %d" % pp.expl_offset])
7606 lines.append([" [TLV]len: %d/%d/%d" % (
7607 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7609 lines.append([" TLVlen: %d" % sum((
7610 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7612 lines.append([" Slice: [%d:%d]" % (
7614 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7616 if pp.impl is not None:
7617 klass, _, num = pp.impl
7619 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7622 lines.append(["OPTIONAL"])
7624 lines.append(["DEFAULT"])
7625 if len(pp.decode_path) > 0:
7626 ent = pp.decode_path[-1]
7627 if isinstance(ent, DecodePathDefBy):
7629 value = str(ent.defined_by)
7630 oid_name = find_oid_name(
7631 ent.defined_by.asn1_type_name, oid_maps, value,
7633 lines.append([("header", "DEFINED BY: "), "%s" % (
7634 value if oid_name is None
7635 else "%s (%s)" % (oid_name, value)
7638 if pp.value is not None:
7639 lines.append([("header", "Value: "), pp.value])
7641 len(oid_maps) > 0 and
7642 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7644 for oid_map in oid_maps:
7645 oid_name = oid_map.get(pp.value)
7646 if oid_name is not None:
7647 lines.append([("header", "Human: "), oid_name])
7649 if pp.asn1_type_name == Integer.asn1_type_name:
7651 ("header", "Decimal: "), "%d" % int(pp.obj),
7654 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7656 if pp.blob.__class__ == binary_type:
7657 blob = hexenc(pp.blob).upper()
7658 for i in six_xrange(0, len(blob), 32):
7659 lines.append([colonize_hex(blob[i:i + 32])])
7660 elif pp.blob.__class__ == tuple:
7661 lines.append([", ".join(pp.blob)])
7662 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7663 self.state["info_bar"].set_completion(0)
7665 def selectable(self):
7666 if self.state["widget_current"] != self:
7667 self.state["widget_current"] = self
7668 self.scrolled["info"] = False
7669 self.scrolled["hexdump"] = False
7670 self._state_update()
7671 return super(TW, self).selectable()
7673 def get_display_text(self):
7674 pp, constructed = self._get_pp()
7675 style = "constructed" if constructed else ""
7676 if len(pp.decode_path) == 0:
7677 return (style, pp.obj_name)
7678 if pp.asn1_type_name == "EOC":
7679 return ("eoc", "EOC")
7680 ent = pp.decode_path[-1]
7681 if isinstance(ent, DecodePathDefBy):
7682 value = str(ent.defined_by)
7683 oid_name = find_oid_name(
7684 ent.defined_by.asn1_type_name, oid_maps, value,
7686 return ("defby", "DEFBY:" + (
7687 value if oid_name is None else oid_name
7691 def _scroll(self, what, step):
7692 self.state[what]._invalidate()
7693 pos = self.state[what].focus_position
7694 if not self.scrolled[what]:
7695 self.scrolled[what] = True
7697 pos = max(0, pos + step)
7698 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7699 self.state[what].set_focus(pos)
7700 self.state[what].set_focus_valign("top")
7701 self.state[what + "_bar"].set_completion(
7702 (100 * pos) // len(self.state[what]._body.positions())
7705 def keypress(self, size, key):
7707 raise urwid.ExitMainLoop()
7710 self.expanded = not self.expanded
7711 self.update_expanded_icon()
7714 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7715 if key in hexdump_steps:
7716 self._scroll("hexdump", hexdump_steps[key])
7719 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7720 if key in info_steps:
7721 self._scroll("info", info_steps[key])
7724 if key in ("d", "D"):
7725 pp, _ = self._get_pp()
7726 dp = ":".join(str(p) for p in pp.decode_path)
7727 dp = dp.replace(" ", "_")
7730 if key == "d" or pp.expl_offset is None:
7731 data = self.state["raw"][pp.offset:(
7732 pp.offset + pp.tlen + pp.llen + pp.vlen
7735 data = self.state["raw"][pp.expl_offset:(
7736 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7740 def duplicate_path(dp, ctr):
7743 return "%s.%d" % (dp, ctr)
7746 if not path_exists(duplicate_path(dp, ctr)):
7749 dp = duplicate_path(dp, ctr)
7750 with open(dp, "wb") as fd:
7752 self.state["decode_path"].set_text(
7753 ("warning", "Saved to: " + dp)
7756 return super(TW, self).keypress(size, key)
7758 class PN(urwid.ParentNode):
7759 def __init__(self, state, value, *args, **kwargs):
7761 if not hasattr(value, "_fields"):
7763 super(PN, self).__init__(value, *args, **kwargs)
7765 def load_widget(self):
7766 return TW(self.state, self)
7768 def load_child_keys(self):
7769 value = self.get_value()
7770 if hasattr(value, "_fields"):
7772 return range(len(value[1:]))
7774 def load_child_node(self, key):
7777 self.get_value()[key + 1],
7780 depth=self.get_depth() + 1,
7783 class LabeledPG(urwid.ProgressBar):
7784 def __init__(self, label, *args, **kwargs):
7786 super(LabeledPG, self).__init__(*args, **kwargs)
7789 return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7791 WinHexdump = urwid.ListBox([urwid.Text("")])
7792 WinInfo = urwid.ListBox([urwid.Text("")])
7793 WinDecodePath = urwid.Text("", "center")
7794 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7795 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7796 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7799 "hexed": list(hexdump(raw)),
7800 "widget_current": None,
7802 "info_bar": WinInfoBar,
7803 "hexdump": WinHexdump,
7804 "hexdump_bar": WinHexdumpBar,
7805 "decode_path": WinDecodePath,
7809 help_text = " ".join((
7811 "space:(un)collapse",
7812 "(pg)up/down/home/end:nav",
7813 "jkJK:hexdump hlHL:info",
7819 ("weight", 1, WinTree),
7820 ("weight", 2, urwid.Pile([
7821 urwid.LineBox(WinInfo),
7822 urwid.LineBox(WinHexdump),
7825 header=urwid.Columns([
7826 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7827 ("weight", 1, WinInfoBar),
7828 ("weight", 1, WinHexdumpBar),
7830 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7833 ("header", "bold", ""),
7834 ("constructed", "bold", ""),
7835 ("help", "light magenta", ""),
7836 ("warning", "light red", ""),
7837 ("defby", "light red", ""),
7838 ("eoc", "dark red", ""),
7839 ("select-value", "light green", ""),
7840 ("select-expl", "light red", ""),
7841 ("pg-normal", "", "light blue"),
7842 ("pg-complete", "black", "yellow"),
7847 def main(): # pragma: no cover
7849 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7850 parser.add_argument(
7854 help="Skip that number of bytes from the beginning",
7856 parser.add_argument(
7858 help="Python paths to dictionary with OIDs, comma separated",
7860 parser.add_argument(
7862 help="Python path to schema definition to use",
7864 parser.add_argument(
7865 "--defines-by-path",
7866 help="Python path to decoder's defines_by_path",
7868 parser.add_argument(
7870 action="store_true",
7871 help="Disallow BER encoding",
7873 parser.add_argument(
7874 "--print-decode-path",
7875 action="store_true",
7876 help="Print decode paths",
7878 parser.add_argument(
7879 "--decode-path-only",
7880 help="Print only specified decode path",
7882 parser.add_argument(
7884 action="store_true",
7885 help="Allow explicit tag out-of-bound",
7887 parser.add_argument(
7889 action="store_true",
7890 help="Turn on event generation mode",
7892 parser.add_argument(
7894 action="store_true",
7895 help="Start ASN.1 browser",
7897 parser.add_argument(
7899 type=argparse.FileType("rb"),
7900 help="Path to BER/CER/DER file you want to decode",
7902 args = parser.parse_args()
7904 raw = file_mmaped(args.RAWFile)[args.skip:]
7906 args.RAWFile.seek(args.skip)
7907 raw = memoryview(args.RAWFile.read())
7908 args.RAWFile.close()
7910 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7911 if args.oids else ()
7913 from functools import partial
7915 schema = obj_by_path(args.schema)
7916 pprinter = partial(pprint, big_blobs=True)
7918 schema, pprinter = generic_decoder()
7920 "bered": not args.nobered,
7921 "allow_expl_oob": args.allow_expl_oob,
7923 if args.defines_by_path is not None:
7924 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7926 obj, _ = schema().decode(raw, ctx=ctx)
7927 browse(raw, obj, oid_maps)
7928 from sys import exit as sys_exit
7930 from os import environ
7934 with_colours=environ.get("NO_COLOR") is None,
7935 with_decode_path=args.print_decode_path,
7937 () if args.decode_path_only is None else
7938 tuple(args.decode_path_only.split(":"))
7942 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7943 print(pprinter(obj, decode_path=decode_path))
7945 obj, tail = schema().decode(raw, ctx=ctx)
7946 print(pprinter(obj))
7948 print("\nTrailing data: %s" % hexenc(tail))
7951 if __name__ == "__main__":
7952 from pyderasn import *