3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 does not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms does not deal good with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
908 .. autofunction:: pyderasn.browse
912 .. autoclass:: pyderasn.Obj
920 .. autoclass:: pyderasn.Boolean
925 .. autoclass:: pyderasn.Integer
926 :members: __init__, named, tohex
930 .. autoclass:: pyderasn.BitString
931 :members: __init__, bit_len, named
935 .. autoclass:: pyderasn.OctetString
940 .. autoclass:: pyderasn.Null
945 .. autoclass:: pyderasn.ObjectIdentifier
950 .. autoclass:: pyderasn.Enumerated
954 .. autoclass:: pyderasn.CommonString
958 .. autoclass:: pyderasn.NumericString
962 .. autoclass:: pyderasn.PrintableString
963 :members: __init__, allow_asterisk, allow_ampersand
967 .. autoclass:: pyderasn.UTCTime
968 :members: __init__, todatetime
972 .. autoclass:: pyderasn.GeneralizedTime
973 :members: __init__, todatetime
980 .. autoclass:: pyderasn.Choice
981 :members: __init__, choice, value
985 .. autoclass:: PrimitiveTypes
989 .. autoclass:: pyderasn.Any
997 .. autoclass:: pyderasn.Sequence
1002 .. autoclass:: pyderasn.Set
1007 .. autoclass:: pyderasn.SequenceOf
1012 .. autoclass:: pyderasn.SetOf
1018 .. autofunction:: pyderasn.abs_decode_path
1019 .. autofunction:: pyderasn.agg_octet_string
1020 .. autofunction:: pyderasn.ascii_visualize
1021 .. autofunction:: pyderasn.colonize_hex
1022 .. autofunction:: pyderasn.encode2pass
1023 .. autofunction:: pyderasn.encode_cer
1024 .. autofunction:: pyderasn.file_mmaped
1025 .. autofunction:: pyderasn.hexenc
1026 .. autofunction:: pyderasn.hexdec
1027 .. autofunction:: pyderasn.hexdump
1028 .. autofunction:: pyderasn.tag_encode
1029 .. autofunction:: pyderasn.tag_decode
1030 .. autofunction:: pyderasn.tag_ctxp
1031 .. autofunction:: pyderasn.tag_ctxc
1032 .. autoclass:: pyderasn.DecodeError
1034 .. autoclass:: pyderasn.NotEnoughData
1035 .. autoclass:: pyderasn.ExceedingData
1036 .. autoclass:: pyderasn.LenIndefForm
1037 .. autoclass:: pyderasn.TagMismatch
1038 .. autoclass:: pyderasn.InvalidLength
1039 .. autoclass:: pyderasn.InvalidOID
1040 .. autoclass:: pyderasn.ObjUnknown
1041 .. autoclass:: pyderasn.ObjNotReady
1042 .. autoclass:: pyderasn.InvalidValueType
1043 .. autoclass:: pyderasn.BoundsError
1050 You can decode DER/BER files using command line abilities::
1052 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1054 If there is no schema for your file, then you can try parsing it without,
1055 but of course IMPLICIT tags will often make it impossible. But result is
1056 good enough for the certificate above::
1058 $ python -m pyderasn path/to/file
1059 0 [1,3,1604] . >: SEQUENCE OF
1060 4 [1,3,1453] . . >: SEQUENCE OF
1061 8 [0,0, 5] . . . . >: [0] ANY
1062 . . . . . A0:03:02:01:02
1063 13 [1,1, 3] . . . . >: INTEGER 61595
1064 18 [1,1, 13] . . . . >: SEQUENCE OF
1065 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1066 31 [1,1, 0] . . . . . . >: NULL
1067 33 [1,3, 274] . . . . >: SEQUENCE OF
1068 37 [1,1, 11] . . . . . . >: SET OF
1069 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1070 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1071 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1073 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1074 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1075 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1076 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1077 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1078 . . . . . . . . . 61:2E:63:6F:6D:2F
1079 1461 [1,1, 13] . . >: SEQUENCE OF
1080 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1081 1474 [1,1, 0] . . . . >: NULL
1082 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1083 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1084 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1090 If you have got dictionaries with ObjectIdentifiers, like example one
1091 from ``tests/test_crts.py``::
1094 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1095 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1097 "2.5.4.10": "id-at-organizationName",
1098 "2.5.4.11": "id-at-organizationalUnitName",
1101 then you can pass it to pretty printer to see human readable OIDs::
1103 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1105 37 [1,1, 11] . . . . . . >: SET OF
1106 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1107 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1108 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1109 50 [1,1, 18] . . . . . . >: SET OF
1110 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1111 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1112 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1113 70 [1,1, 18] . . . . . . >: SET OF
1114 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1115 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1116 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1122 Each decoded element has so-called decode path: sequence of structure
1123 names it is passing during the decode process. Each element has its own
1124 unique path inside the whole ASN.1 tree. You can print it out with
1125 ``--print-decode-path`` option::
1127 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1128 0 [1,3,1604] Certificate SEQUENCE []
1129 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1130 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1131 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1132 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1133 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1134 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1136 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1137 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1138 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1139 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1140 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1141 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1142 . . . . . . . 13:02:45:53
1143 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1146 Now you can print only the specified tree, for example signature algorithm::
1148 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1149 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1150 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1151 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1155 from array import array
1156 from codecs import getdecoder
1157 from codecs import getencoder
1158 from collections import namedtuple
1159 from collections import OrderedDict
1160 from copy import copy
1161 from datetime import datetime
1162 from datetime import timedelta
1163 from io import BytesIO
1164 from math import ceil
1165 from mmap import mmap
1166 from mmap import PROT_READ
1167 from operator import attrgetter
1168 from string import ascii_letters
1169 from string import digits
1170 from sys import maxsize as sys_maxsize
1171 from sys import version_info
1172 from unicodedata import category as unicat
1174 from six import add_metaclass
1175 from six import binary_type
1176 from six import byte2int
1177 from six import indexbytes
1178 from six import int2byte
1179 from six import integer_types
1180 from six import iterbytes
1181 from six import iteritems
1182 from six import itervalues
1184 from six import string_types
1185 from six import text_type
1186 from six import unichr as six_unichr
1187 from six.moves import xrange as six_xrange
1191 from termcolor import colored
1192 except ImportError: # pragma: no cover
1193 def colored(what, *args, **kwargs):
1244 "TagClassApplication",
1247 "TagClassUniversal",
1248 "TagFormConstructed",
1259 TagClassUniversal = 0
1260 TagClassApplication = 1 << 6
1261 TagClassContext = 1 << 7
1262 TagClassPrivate = 1 << 6 | 1 << 7
1263 TagFormPrimitive = 0
1264 TagFormConstructed = 1 << 5
1266 TagClassContext: "",
1267 TagClassApplication: "APPLICATION ",
1268 TagClassPrivate: "PRIVATE ",
1269 TagClassUniversal: "UNIV ",
1273 LENINDEF = b"\x80" # length indefinite mark
1274 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1275 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1276 SET01 = frozenset("01")
1277 DECIMALS = frozenset(digits)
1278 DECIMAL_SIGNS = ".,"
1279 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1282 def file_mmaped(fd):
1283 """Make mmap-ed memoryview for reading from file
1285 :param fd: file object
1286 :returns: memoryview over read-only mmap-ing of the whole file
1288 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1292 if not set(value) <= DECIMALS:
1293 raise ValueError("non-pure integer")
1297 def fractions2float(fractions_raw):
1298 pureint(fractions_raw)
1299 return float("0." + fractions_raw)
1302 def get_def_by_path(defines_by_path, sub_decode_path):
1303 """Get define by decode path
1305 for path, define in defines_by_path:
1306 if len(path) != len(sub_decode_path):
1308 for p1, p2 in zip(path, sub_decode_path):
1309 if (p1 is not any) and (p1 != p2):
1315 ########################################################################
1317 ########################################################################
1319 class ASN1Error(ValueError):
1323 class DecodeError(ASN1Error):
1324 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1326 :param str msg: reason of decode failing
1327 :param klass: optional exact DecodeError inherited class (like
1328 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1329 :py:exc:`InvalidLength`)
1330 :param decode_path: tuple of strings. It contains human
1331 readable names of the fields through which
1332 decoding process has passed
1333 :param int offset: binary offset where failure happened
1335 super(DecodeError, self).__init__()
1338 self.decode_path = decode_path
1339 self.offset = offset
1344 "" if self.klass is None else self.klass.__name__,
1346 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1347 if len(self.decode_path) > 0 else ""
1349 ("(at %d)" % self.offset) if self.offset > 0 else "",
1355 return "%s(%s)" % (self.__class__.__name__, self)
1358 class NotEnoughData(DecodeError):
1362 class ExceedingData(ASN1Error):
1363 def __init__(self, nbytes):
1364 super(ExceedingData, self).__init__()
1365 self.nbytes = nbytes
1368 return "%d trailing bytes" % self.nbytes
1371 return "%s(%s)" % (self.__class__.__name__, self)
1374 class LenIndefForm(DecodeError):
1378 class TagMismatch(DecodeError):
1382 class InvalidLength(DecodeError):
1386 class InvalidOID(DecodeError):
1390 class ObjUnknown(ASN1Error):
1391 def __init__(self, name):
1392 super(ObjUnknown, self).__init__()
1396 return "object is unknown: %s" % self.name
1399 return "%s(%s)" % (self.__class__.__name__, self)
1402 class ObjNotReady(ASN1Error):
1403 def __init__(self, name):
1404 super(ObjNotReady, self).__init__()
1408 return "object is not ready: %s" % self.name
1411 return "%s(%s)" % (self.__class__.__name__, self)
1414 class InvalidValueType(ASN1Error):
1415 def __init__(self, expected_types):
1416 super(InvalidValueType, self).__init__()
1417 self.expected_types = expected_types
1420 return "invalid value type, expected: %s" % ", ".join(
1421 [repr(t) for t in self.expected_types]
1425 return "%s(%s)" % (self.__class__.__name__, self)
1428 class BoundsError(ASN1Error):
1429 def __init__(self, bound_min, value, bound_max):
1430 super(BoundsError, self).__init__()
1431 self.bound_min = bound_min
1433 self.bound_max = bound_max
1436 return "unsatisfied bounds: %s <= %s <= %s" % (
1443 return "%s(%s)" % (self.__class__.__name__, self)
1446 ########################################################################
1448 ########################################################################
1450 _hexdecoder = getdecoder("hex")
1451 _hexencoder = getencoder("hex")
1455 """Binary data to hexadecimal string convert
1457 return _hexdecoder(data)[0]
1461 """Hexadecimal string to binary data convert
1463 return _hexencoder(data)[0].decode("ascii")
1466 def int_bytes_len(num, byte_len=8):
1469 return int(ceil(float(num.bit_length()) / byte_len))
1472 def zero_ended_encode(num):
1473 octets = bytearray(int_bytes_len(num, 7))
1475 octets[i] = num & 0x7F
1479 octets[i] = 0x80 | (num & 0x7F)
1482 return bytes(octets)
1485 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1486 """Encode tag to binary form
1488 :param int num: tag's number
1489 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1490 :py:data:`pyderasn.TagClassContext`,
1491 :py:data:`pyderasn.TagClassApplication`,
1492 :py:data:`pyderasn.TagClassPrivate`)
1493 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1494 :py:data:`pyderasn.TagFormConstructed`)
1498 return int2byte(klass | form | num)
1499 # [XX|X|11111][1.......][1.......] ... [0.......]
1500 return int2byte(klass | form | 31) + zero_ended_encode(num)
1503 def tag_decode(tag):
1504 """Decode tag from binary form
1508 No validation is performed, assuming that it has already passed.
1510 It returns tuple with three integers, as
1511 :py:func:`pyderasn.tag_encode` accepts.
1513 first_octet = byte2int(tag)
1514 klass = first_octet & 0xC0
1515 form = first_octet & 0x20
1516 if first_octet & 0x1F < 0x1F:
1517 return (klass, form, first_octet & 0x1F)
1519 for octet in iterbytes(tag[1:]):
1522 return (klass, form, num)
1526 """Create CONTEXT PRIMITIVE tag
1528 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1532 """Create CONTEXT CONSTRUCTED tag
1534 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1537 def tag_strip(data):
1538 """Take off tag from the data
1540 :returns: (encoded tag, tag length, remaining data)
1543 raise NotEnoughData("no data at all")
1544 if byte2int(data) & 0x1F < 31:
1545 return data[:1], 1, data[1:]
1550 raise DecodeError("unfinished tag")
1551 if indexbytes(data, i) & 0x80 == 0:
1554 return data[:i], i, data[i:]
1560 octets = bytearray(int_bytes_len(l) + 1)
1561 octets[0] = 0x80 | (len(octets) - 1)
1562 for i in six_xrange(len(octets) - 1, 0, -1):
1563 octets[i] = l & 0xFF
1565 return bytes(octets)
1568 def len_decode(data):
1571 :returns: (decoded length, length's length, remaining data)
1572 :raises LenIndefForm: if indefinite form encoding is met
1575 raise NotEnoughData("no data at all")
1576 first_octet = byte2int(data)
1577 if first_octet & 0x80 == 0:
1578 return first_octet, 1, data[1:]
1579 octets_num = first_octet & 0x7F
1580 if octets_num + 1 > len(data):
1581 raise NotEnoughData("encoded length is longer than data")
1583 raise LenIndefForm()
1584 if byte2int(data[1:]) == 0:
1585 raise DecodeError("leading zeros")
1587 for v in iterbytes(data[1:1 + octets_num]):
1590 raise DecodeError("long form instead of short one")
1591 return l, 1 + octets_num, data[1 + octets_num:]
1594 LEN0 = len_encode(0)
1595 LEN1 = len_encode(1)
1596 LEN1K = len_encode(1000)
1600 """How many bytes length field will take
1604 if l < 256: # 1 << 8
1606 if l < 65536: # 1 << 16
1608 if l < 16777216: # 1 << 24
1610 if l < 4294967296: # 1 << 32
1612 if l < 1099511627776: # 1 << 40
1614 if l < 281474976710656: # 1 << 48
1616 if l < 72057594037927936: # 1 << 56
1618 raise OverflowError("too big length")
1621 def write_full(writer, data):
1622 """Fully write provided data
1624 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1626 BytesIO does not guarantee that the whole data will be written at
1627 once. That function write everything provided, raising an error if
1628 ``writer`` returns None.
1630 data = memoryview(data)
1632 while written != len(data):
1633 n = writer(data[written:])
1635 raise ValueError("can not write to buf")
1639 # If it is 64-bit system, then use compact 64-bit array of unsigned
1640 # longs. Use an ordinary list with universal integers otherwise, that
1642 if sys_maxsize > 2 ** 32:
1643 def state_2pass_new():
1646 def state_2pass_new():
1650 ########################################################################
1652 ########################################################################
1654 class AutoAddSlots(type):
1655 def __new__(cls, name, bases, _dict):
1656 _dict["__slots__"] = _dict.get("__slots__", ())
1657 return type.__new__(cls, name, bases, _dict)
1660 BasicState = namedtuple("BasicState", (
1673 ), **NAMEDTUPLE_KWARGS)
1676 @add_metaclass(AutoAddSlots)
1678 """Common ASN.1 object class
1680 All ASN.1 types are inherited from it. It has metaclass that
1681 automatically adds ``__slots__`` to all inherited classes.
1706 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1707 self._expl = getattr(self, "expl", None) if expl is None else expl
1708 if self.tag != self.tag_default and self._expl is not None:
1709 raise ValueError("implicit and explicit tags can not be set simultaneously")
1710 if self.tag is None:
1711 self._tag_order = None
1713 tag_class, _, tag_num = tag_decode(
1714 self.tag if self._expl is None else self._expl
1716 self._tag_order = (tag_class, tag_num)
1717 if default is not None:
1719 self.optional = optional
1720 self.offset, self.llen, self.vlen = _decoded
1722 self.expl_lenindef = False
1723 self.lenindef = False
1724 self.ber_encoded = False
1727 def ready(self): # pragma: no cover
1728 """Is object ready to be encoded?
1730 raise NotImplementedError()
1732 def _assert_ready(self):
1734 raise ObjNotReady(self.__class__.__name__)
1738 """Is either object or any elements inside is BER encoded?
1740 return self.expl_lenindef or self.lenindef or self.ber_encoded
1744 """Is object decoded?
1746 return (self.llen + self.vlen) > 0
1748 def __getstate__(self): # pragma: no cover
1749 """Used for making safe to be mutable pickleable copies
1751 raise NotImplementedError()
1753 def __setstate__(self, state):
1754 if state.version != __version__:
1755 raise ValueError("data is pickled by different PyDERASN version")
1756 self.tag = state.tag
1757 self._tag_order = state.tag_order
1758 self._expl = state.expl
1759 self.default = state.default
1760 self.optional = state.optional
1761 self.offset = state.offset
1762 self.llen = state.llen
1763 self.vlen = state.vlen
1764 self.expl_lenindef = state.expl_lenindef
1765 self.lenindef = state.lenindef
1766 self.ber_encoded = state.ber_encoded
1769 def tag_order(self):
1770 """Tag's (class, number) used for DER/CER sorting
1772 return self._tag_order
1775 def tag_order_cer(self):
1776 return self.tag_order
1780 """.. seealso:: :ref:`decoding`
1782 return len(self.tag)
1786 """.. seealso:: :ref:`decoding`
1788 return self.tlen + self.llen + self.vlen
1790 def __str__(self): # pragma: no cover
1791 return self.__bytes__() if PY2 else self.__unicode__()
1793 def __ne__(self, their):
1794 return not(self == their)
1796 def __gt__(self, their): # pragma: no cover
1797 return not(self < their)
1799 def __le__(self, their): # pragma: no cover
1800 return (self == their) or (self < their)
1802 def __ge__(self, their): # pragma: no cover
1803 return (self == their) or (self > their)
1805 def _encode(self): # pragma: no cover
1806 raise NotImplementedError()
1808 def _encode_cer(self, writer):
1809 write_full(writer, self._encode())
1811 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1812 yield NotImplemented
1814 def _encode1st(self, state):
1815 raise NotImplementedError()
1817 def _encode2nd(self, writer, state_iter):
1818 raise NotImplementedError()
1821 """DER encode the structure
1823 :returns: DER representation
1825 raw = self._encode()
1826 if self._expl is None:
1828 return b"".join((self._expl, len_encode(len(raw)), raw))
1830 def encode1st(self, state=None):
1831 """Do the 1st pass of 2-pass encoding
1833 :rtype: (int, array("L"))
1834 :returns: full length of encoded data and precalculated various
1838 state = state_2pass_new()
1839 if self._expl is None:
1840 return self._encode1st(state)
1842 idx = len(state) - 1
1843 vlen, _ = self._encode1st(state)
1845 fulllen = len(self._expl) + len_size(vlen) + vlen
1846 return fulllen, state
1848 def encode2nd(self, writer, state_iter):
1849 """Do the 2nd pass of 2-pass encoding
1851 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1852 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1854 if self._expl is None:
1855 self._encode2nd(writer, state_iter)
1857 write_full(writer, self._expl + len_encode(next(state_iter)))
1858 self._encode2nd(writer, state_iter)
1860 def encode_cer(self, writer):
1861 """CER encode the structure to specified writer
1863 :param writer: must comply with ``io.RawIOBase.write``
1864 behaviour. It takes slice to be written and
1865 returns number of bytes processed. If it returns
1866 None, then exception will be raised
1868 if self._expl is not None:
1869 write_full(writer, self._expl + LENINDEF)
1870 if getattr(self, "der_forced", False):
1871 write_full(writer, self._encode())
1873 self._encode_cer(writer)
1874 if self._expl is not None:
1875 write_full(writer, EOC)
1877 def hexencode(self):
1878 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1880 return hexenc(self.encode())
1890 _ctx_immutable=True,
1894 :param data: either binary or memoryview
1895 :param int offset: initial data's offset
1896 :param bool leavemm: do we need to leave memoryview of remaining
1897 data as is, or convert it to bytes otherwise
1898 :param decode_path: current decode path (tuples of strings,
1899 possibly with DecodePathDefBy) with will be
1900 the root for all underlying objects
1901 :param ctx: optional :ref:`context <ctx>` governing decoding process
1902 :param bool tag_only: decode only the tag, without length and
1903 contents (used only in Choice and Set
1904 structures, trying to determine if tag satisfies
1906 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1908 :returns: (Obj, remaining data)
1910 .. seealso:: :ref:`decoding`
1912 result = next(self.decode_evgen(
1924 _, obj, tail = result
1935 _ctx_immutable=True,
1938 """Decode with evgen mode on
1940 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1941 it returns the generator producing ``(decode_path, obj, tail)``
1943 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1947 elif _ctx_immutable:
1949 tlv = memoryview(data)
1952 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1955 if self._expl is None:
1956 for result in self._decode(
1959 decode_path=decode_path,
1962 evgen_mode=_evgen_mode,
1967 _decode_path, obj, tail = result
1968 if _decode_path is not decode_path:
1972 t, tlen, lv = tag_strip(tlv)
1973 except DecodeError as err:
1974 raise err.__class__(
1976 klass=self.__class__,
1977 decode_path=decode_path,
1982 klass=self.__class__,
1983 decode_path=decode_path,
1987 l, llen, v = len_decode(lv)
1988 except LenIndefForm as err:
1989 if not ctx.get("bered", False):
1990 raise err.__class__(
1992 klass=self.__class__,
1993 decode_path=decode_path,
1997 offset += tlen + llen
1998 for result in self._decode(
2001 decode_path=decode_path,
2004 evgen_mode=_evgen_mode,
2006 if tag_only: # pragma: no cover
2009 _decode_path, obj, tail = result
2010 if _decode_path is not decode_path:
2012 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2013 if eoc_expected.tobytes() != EOC:
2016 klass=self.__class__,
2017 decode_path=decode_path,
2021 obj.expl_lenindef = True
2022 except DecodeError as err:
2023 raise err.__class__(
2025 klass=self.__class__,
2026 decode_path=decode_path,
2031 raise NotEnoughData(
2032 "encoded length is longer than data",
2033 klass=self.__class__,
2034 decode_path=decode_path,
2037 for result in self._decode(
2039 offset=offset + tlen + llen,
2040 decode_path=decode_path,
2043 evgen_mode=_evgen_mode,
2045 if tag_only: # pragma: no cover
2048 _decode_path, obj, tail = result
2049 if _decode_path is not decode_path:
2051 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2053 "explicit tag out-of-bound, longer than data",
2054 klass=self.__class__,
2055 decode_path=decode_path,
2058 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2060 def decod(self, data, offset=0, decode_path=(), ctx=None):
2061 """Decode the data, check that tail is empty
2063 :raises ExceedingData: if tail is not empty
2065 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2066 (decode without tail) that also checks that there is no
2069 obj, tail = self.decode(
2072 decode_path=decode_path,
2077 raise ExceedingData(len(tail))
2080 def hexdecode(self, data, *args, **kwargs):
2081 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2083 return self.decode(hexdec(data), *args, **kwargs)
2085 def hexdecod(self, data, *args, **kwargs):
2086 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2088 return self.decod(hexdec(data), *args, **kwargs)
2092 """.. seealso:: :ref:`decoding`
2094 return self._expl is not None
2098 """.. seealso:: :ref:`decoding`
2103 def expl_tlen(self):
2104 """.. seealso:: :ref:`decoding`
2106 return len(self._expl)
2109 def expl_llen(self):
2110 """.. seealso:: :ref:`decoding`
2112 if self.expl_lenindef:
2114 return len(len_encode(self.tlvlen))
2117 def expl_offset(self):
2118 """.. seealso:: :ref:`decoding`
2120 return self.offset - self.expl_tlen - self.expl_llen
2123 def expl_vlen(self):
2124 """.. seealso:: :ref:`decoding`
2129 def expl_tlvlen(self):
2130 """.. seealso:: :ref:`decoding`
2132 return self.expl_tlen + self.expl_llen + self.expl_vlen
2135 def fulloffset(self):
2136 """.. seealso:: :ref:`decoding`
2138 return self.expl_offset if self.expled else self.offset
2142 """.. seealso:: :ref:`decoding`
2144 return self.expl_tlvlen if self.expled else self.tlvlen
2146 def pps_lenindef(self, decode_path):
2147 if self.lenindef and not (
2148 getattr(self, "defined", None) is not None and
2149 self.defined[1].lenindef
2152 asn1_type_name="EOC",
2154 decode_path=decode_path,
2156 self.offset + self.tlvlen -
2157 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2165 if self.expl_lenindef:
2167 asn1_type_name="EOC",
2168 obj_name="EXPLICIT",
2169 decode_path=decode_path,
2170 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2179 def encode_cer(obj):
2180 """Encode to CER in memory buffer
2182 :returns bytes: memory buffer contents
2185 obj.encode_cer(buf.write)
2186 return buf.getvalue()
2189 def encode2pass(obj):
2190 """Encode (2-pass mode) to DER in memory buffer
2192 :returns bytes: memory buffer contents
2195 _, state = obj.encode1st()
2196 obj.encode2nd(buf.write, iter(state))
2197 return buf.getvalue()
2200 class DecodePathDefBy(object):
2201 """DEFINED BY representation inside decode path
2203 __slots__ = ("defined_by",)
2205 def __init__(self, defined_by):
2206 self.defined_by = defined_by
2208 def __ne__(self, their):
2209 return not(self == their)
2211 def __eq__(self, their):
2212 if not isinstance(their, self.__class__):
2214 return self.defined_by == their.defined_by
2217 return "DEFINED BY " + str(self.defined_by)
2220 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2223 ########################################################################
2225 ########################################################################
2227 PP = namedtuple("PP", (
2250 ), **NAMEDTUPLE_KWARGS)
2255 asn1_type_name="unknown",
2272 expl_lenindef=False,
2303 def _colourize(what, colour, with_colours, attrs=("bold",)):
2304 return colored(what, colour, attrs=attrs) if with_colours else what
2307 def colonize_hex(hexed):
2308 """Separate hexadecimal string with colons
2310 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2313 def find_oid_name(asn1_type_name, oid_maps, value):
2314 if len(oid_maps) > 0 and asn1_type_name == ObjectIdentifier.asn1_type_name:
2315 for oid_map in oid_maps:
2316 oid_name = oid_map.get(value)
2317 if oid_name is not None:
2328 with_decode_path=False,
2329 decode_path_len_decrease=0,
2336 " " if pp.expl_offset is None else
2337 ("-%d" % (pp.offset - pp.expl_offset))
2339 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2341 col = _colourize(col, "red", with_colours, ())
2342 col += _colourize("B", "red", with_colours) if pp.bered else " "
2344 col = "[%d,%d,%4d]%s" % (
2345 pp.tlen, pp.llen, pp.vlen,
2346 LENINDEF_PP_CHAR if pp.lenindef else " "
2348 col = _colourize(col, "green", with_colours, ())
2350 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2351 if decode_path_len > 0:
2352 cols.append(" ." * decode_path_len)
2353 ent = pp.decode_path[-1]
2354 if isinstance(ent, DecodePathDefBy):
2355 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2356 value = str(ent.defined_by)
2357 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2358 if oid_name is None:
2359 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2361 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2363 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2364 if pp.expl is not None:
2365 klass, _, num = pp.expl
2366 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2367 cols.append(_colourize(col, "blue", with_colours))
2368 if pp.impl is not None:
2369 klass, _, num = pp.impl
2370 col = "[%s%d]" % (TagClassReprs[klass], num)
2371 cols.append(_colourize(col, "blue", with_colours))
2372 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2373 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2375 cols.append(_colourize("BER", "red", with_colours))
2376 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2377 if pp.value is not None:
2379 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2380 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2381 if oid_name is not None:
2382 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2383 if pp.asn1_type_name == Integer.asn1_type_name:
2384 cols.append(_colourize(
2385 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2388 if pp.blob.__class__ == binary_type:
2389 cols.append(hexenc(pp.blob))
2390 elif pp.blob.__class__ == tuple:
2391 cols.append(", ".join(pp.blob))
2393 cols.append(_colourize("OPTIONAL", "red", with_colours))
2395 cols.append(_colourize("DEFAULT", "red", with_colours))
2396 if with_decode_path:
2397 cols.append(_colourize(
2398 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2402 return " ".join(cols)
2405 def pp_console_blob(pp, decode_path_len_decrease=0):
2406 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2407 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2408 if decode_path_len > 0:
2409 cols.append(" ." * (decode_path_len + 1))
2410 if pp.blob.__class__ == binary_type:
2411 blob = hexenc(pp.blob).upper()
2412 for i in six_xrange(0, len(blob), 32):
2413 chunk = blob[i:i + 32]
2414 yield " ".join(cols + [colonize_hex(chunk)])
2415 elif pp.blob.__class__ == tuple:
2416 yield " ".join(cols + [", ".join(pp.blob)])
2424 with_decode_path=False,
2425 decode_path_only=(),
2428 """Pretty print object
2430 :param Obj obj: object you want to pretty print
2431 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2432 Its human readable form is printed when OID is met
2433 :param big_blobs: if large binary objects are met (like OctetString
2434 values), do we need to print them too, on separate
2436 :param with_colours: colourize output, if ``termcolor`` library
2438 :param with_decode_path: print decode path
2439 :param decode_path_only: print only that specified decode path
2441 def _pprint_pps(pps):
2443 if hasattr(pp, "_fields"):
2445 decode_path_only != () and
2447 str(p) for p in pp.decode_path[:len(decode_path_only)]
2448 ) != decode_path_only
2452 yield pp_console_row(
2457 with_colours=with_colours,
2458 with_decode_path=with_decode_path,
2459 decode_path_len_decrease=len(decode_path_only),
2461 for row in pp_console_blob(
2463 decode_path_len_decrease=len(decode_path_only),
2467 yield pp_console_row(
2472 with_colours=with_colours,
2473 with_decode_path=with_decode_path,
2474 decode_path_len_decrease=len(decode_path_only),
2477 for row in _pprint_pps(pp):
2479 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2482 ########################################################################
2483 # ASN.1 primitive types
2484 ########################################################################
2486 BooleanState = namedtuple(
2488 BasicState._fields + ("value",),
2494 """``BOOLEAN`` boolean type
2496 >>> b = Boolean(True)
2498 >>> b == Boolean(True)
2504 tag_default = tag_encode(1)
2505 asn1_type_name = "BOOLEAN"
2517 :param value: set the value. Either boolean type, or
2518 :py:class:`pyderasn.Boolean` object
2519 :param bytes impl: override default tag with ``IMPLICIT`` one
2520 :param bytes expl: override default tag with ``EXPLICIT`` one
2521 :param default: set default value. Type same as in ``value``
2522 :param bool optional: is object ``OPTIONAL`` in sequence
2524 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2525 self._value = None if value is None else self._value_sanitize(value)
2526 if default is not None:
2527 default = self._value_sanitize(default)
2528 self.default = self.__class__(
2534 self._value = default
2536 def _value_sanitize(self, value):
2537 if value.__class__ == bool:
2539 if issubclass(value.__class__, Boolean):
2541 raise InvalidValueType((self.__class__, bool))
2545 return self._value is not None
2547 def __getstate__(self):
2548 return BooleanState(
2564 def __setstate__(self, state):
2565 super(Boolean, self).__setstate__(state)
2566 self._value = state.value
2568 def __nonzero__(self):
2569 self._assert_ready()
2573 self._assert_ready()
2576 def __eq__(self, their):
2577 if their.__class__ == bool:
2578 return self._value == their
2579 if not issubclass(their.__class__, Boolean):
2582 self._value == their._value and
2583 self.tag == their.tag and
2584 self._expl == their._expl
2595 return self.__class__(
2597 impl=self.tag if impl is None else impl,
2598 expl=self._expl if expl is None else expl,
2599 default=self.default if default is None else default,
2600 optional=self.optional if optional is None else optional,
2604 self._assert_ready()
2605 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2607 def _encode1st(self, state):
2608 return len(self.tag) + 2, state
2610 def _encode2nd(self, writer, state_iter):
2611 self._assert_ready()
2612 write_full(writer, self._encode())
2614 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2616 t, _, lv = tag_strip(tlv)
2617 except DecodeError as err:
2618 raise err.__class__(
2620 klass=self.__class__,
2621 decode_path=decode_path,
2626 klass=self.__class__,
2627 decode_path=decode_path,
2634 l, _, v = len_decode(lv)
2635 except DecodeError as err:
2636 raise err.__class__(
2638 klass=self.__class__,
2639 decode_path=decode_path,
2643 raise InvalidLength(
2644 "Boolean's length must be equal to 1",
2645 klass=self.__class__,
2646 decode_path=decode_path,
2650 raise NotEnoughData(
2651 "encoded length is longer than data",
2652 klass=self.__class__,
2653 decode_path=decode_path,
2656 first_octet = byte2int(v)
2658 if first_octet == 0:
2660 elif first_octet == 0xFF:
2662 elif ctx.get("bered", False):
2667 "unacceptable Boolean value",
2668 klass=self.__class__,
2669 decode_path=decode_path,
2672 obj = self.__class__(
2676 default=self.default,
2677 optional=self.optional,
2678 _decoded=(offset, 1, 1),
2680 obj.ber_encoded = ber_encoded
2681 yield decode_path, obj, v[1:]
2684 return pp_console_row(next(self.pps()))
2686 def pps(self, decode_path=()):
2689 asn1_type_name=self.asn1_type_name,
2690 obj_name=self.__class__.__name__,
2691 decode_path=decode_path,
2692 value=str(self._value) if self.ready else None,
2693 optional=self.optional,
2694 default=self == self.default,
2695 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2696 expl=None if self._expl is None else tag_decode(self._expl),
2701 expl_offset=self.expl_offset if self.expled else None,
2702 expl_tlen=self.expl_tlen if self.expled else None,
2703 expl_llen=self.expl_llen if self.expled else None,
2704 expl_vlen=self.expl_vlen if self.expled else None,
2705 expl_lenindef=self.expl_lenindef,
2706 ber_encoded=self.ber_encoded,
2709 for pp in self.pps_lenindef(decode_path):
2713 IntegerState = namedtuple(
2715 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2721 """``INTEGER`` integer type
2723 >>> b = Integer(-123)
2725 >>> b == Integer(-123)
2730 >>> Integer(2, bounds=(1, 3))
2732 >>> Integer(5, bounds=(1, 3))
2733 Traceback (most recent call last):
2734 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2738 class Version(Integer):
2745 >>> v = Version("v1")
2752 {'v3': 2, 'v1': 0, 'v2': 1}
2754 __slots__ = ("specs", "_bound_min", "_bound_max")
2755 tag_default = tag_encode(2)
2756 asn1_type_name = "INTEGER"
2770 :param value: set the value. Either integer type, named value
2771 (if ``schema`` is specified in the class), or
2772 :py:class:`pyderasn.Integer` object
2773 :param bounds: set ``(MIN, MAX)`` value constraint.
2774 (-inf, +inf) by default
2775 :param bytes impl: override default tag with ``IMPLICIT`` one
2776 :param bytes expl: override default tag with ``EXPLICIT`` one
2777 :param default: set default value. Type same as in ``value``
2778 :param bool optional: is object ``OPTIONAL`` in sequence
2780 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2782 specs = getattr(self, "schema", {}) if _specs is None else _specs
2783 self.specs = specs if specs.__class__ == dict else dict(specs)
2784 self._bound_min, self._bound_max = getattr(
2787 (float("-inf"), float("+inf")),
2788 ) if bounds is None else bounds
2789 if value is not None:
2790 self._value = self._value_sanitize(value)
2791 if default is not None:
2792 default = self._value_sanitize(default)
2793 self.default = self.__class__(
2799 if self._value is None:
2800 self._value = default
2802 def _value_sanitize(self, value):
2803 if isinstance(value, integer_types):
2805 elif issubclass(value.__class__, Integer):
2806 value = value._value
2807 elif value.__class__ == str:
2808 value = self.specs.get(value)
2810 raise ObjUnknown("integer value: %s" % value)
2812 raise InvalidValueType((self.__class__, int, str))
2813 if not self._bound_min <= value <= self._bound_max:
2814 raise BoundsError(self._bound_min, value, self._bound_max)
2819 return self._value is not None
2821 def __getstate__(self):
2822 return IntegerState(
2841 def __setstate__(self, state):
2842 super(Integer, self).__setstate__(state)
2843 self.specs = state.specs
2844 self._value = state.value
2845 self._bound_min = state.bound_min
2846 self._bound_max = state.bound_max
2849 self._assert_ready()
2850 return int(self._value)
2853 """Hexadecimal representation
2855 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2857 hex_repr = hex(int(self))[2:].upper()
2858 if len(hex_repr) % 2 != 0:
2859 hex_repr = "0" + hex_repr
2863 self._assert_ready()
2864 return hash(b"".join((
2866 bytes(self._expl or b""),
2867 str(self._value).encode("ascii"),
2870 def __eq__(self, their):
2871 if isinstance(their, integer_types):
2872 return self._value == their
2873 if not issubclass(their.__class__, Integer):
2876 self._value == their._value and
2877 self.tag == their.tag and
2878 self._expl == their._expl
2881 def __lt__(self, their):
2882 return self._value < their._value
2886 """Return named representation (if exists) of the value
2888 for name, value in iteritems(self.specs):
2889 if value == self._value:
2902 return self.__class__(
2905 (self._bound_min, self._bound_max)
2906 if bounds is None else bounds
2908 impl=self.tag if impl is None else impl,
2909 expl=self._expl if expl is None else expl,
2910 default=self.default if default is None else default,
2911 optional=self.optional if optional is None else optional,
2915 def _encode_payload(self):
2916 self._assert_ready()
2920 octets = bytearray([0])
2924 octets = bytearray()
2926 octets.append((value & 0xFF) ^ 0xFF)
2928 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2931 octets = bytearray()
2933 octets.append(value & 0xFF)
2935 if octets[-1] & 0x80 > 0:
2938 octets = bytes(octets)
2940 bytes_len = ceil(value.bit_length() / 8) or 1
2943 octets = value.to_bytes(
2948 except OverflowError:
2955 octets = self._encode_payload()
2956 return b"".join((self.tag, len_encode(len(octets)), octets))
2958 def _encode1st(self, state):
2959 l = len(self._encode_payload())
2960 return len(self.tag) + len_size(l) + l, state
2962 def _encode2nd(self, writer, state_iter):
2963 write_full(writer, self._encode())
2965 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2967 t, _, lv = tag_strip(tlv)
2968 except DecodeError as err:
2969 raise err.__class__(
2971 klass=self.__class__,
2972 decode_path=decode_path,
2977 klass=self.__class__,
2978 decode_path=decode_path,
2985 l, llen, v = len_decode(lv)
2986 except DecodeError as err:
2987 raise err.__class__(
2989 klass=self.__class__,
2990 decode_path=decode_path,
2994 raise NotEnoughData(
2995 "encoded length is longer than data",
2996 klass=self.__class__,
2997 decode_path=decode_path,
3001 raise NotEnoughData(
3003 klass=self.__class__,
3004 decode_path=decode_path,
3007 v, tail = v[:l], v[l:]
3008 first_octet = byte2int(v)
3010 second_octet = byte2int(v[1:])
3012 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3013 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3016 "non normalized integer",
3017 klass=self.__class__,
3018 decode_path=decode_path,
3023 if first_octet & 0x80 > 0:
3024 octets = bytearray()
3025 for octet in bytearray(v):
3026 octets.append(octet ^ 0xFF)
3027 for octet in octets:
3028 value = (value << 8) | octet
3032 for octet in bytearray(v):
3033 value = (value << 8) | octet
3035 value = int.from_bytes(v, byteorder="big", signed=True)
3037 obj = self.__class__(
3039 bounds=(self._bound_min, self._bound_max),
3042 default=self.default,
3043 optional=self.optional,
3045 _decoded=(offset, llen, l),
3047 except BoundsError as err:
3050 klass=self.__class__,
3051 decode_path=decode_path,
3054 yield decode_path, obj, tail
3057 return pp_console_row(next(self.pps()))
3059 def pps(self, decode_path=()):
3062 asn1_type_name=self.asn1_type_name,
3063 obj_name=self.__class__.__name__,
3064 decode_path=decode_path,
3065 value=(self.named or str(self._value)) if self.ready else None,
3066 optional=self.optional,
3067 default=self == self.default,
3068 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3069 expl=None if self._expl is None else tag_decode(self._expl),
3074 expl_offset=self.expl_offset if self.expled else None,
3075 expl_tlen=self.expl_tlen if self.expled else None,
3076 expl_llen=self.expl_llen if self.expled else None,
3077 expl_vlen=self.expl_vlen if self.expled else None,
3078 expl_lenindef=self.expl_lenindef,
3081 for pp in self.pps_lenindef(decode_path):
3085 BitStringState = namedtuple(
3087 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3092 class BitString(Obj):
3093 """``BIT STRING`` bit string type
3095 >>> BitString(b"hello world")
3096 BIT STRING 88 bits 68656c6c6f20776f726c64
3099 >>> b == b"hello world"
3104 >>> BitString("'0A3B5F291CD'H")
3105 BIT STRING 44 bits 0a3b5f291cd0
3106 >>> b = BitString("'010110000000'B")
3107 BIT STRING 12 bits 5800
3110 >>> b[0], b[1], b[2], b[3]
3111 (False, True, False, True)
3115 [False, True, False, True, True, False, False, False, False, False, False, False]
3119 class KeyUsage(BitString):
3121 ("digitalSignature", 0),
3122 ("nonRepudiation", 1),
3123 ("keyEncipherment", 2),
3126 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3127 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3129 ['nonRepudiation', 'keyEncipherment']
3131 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3135 Pay attention that BIT STRING can be encoded both in primitive
3136 and constructed forms. Decoder always checks constructed form tag
3137 additionally to specified primitive one. If BER decoding is
3138 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3139 of DER restrictions.
3141 __slots__ = ("tag_constructed", "specs", "defined")
3142 tag_default = tag_encode(3)
3143 asn1_type_name = "BIT STRING"
3156 :param value: set the value. Either binary type, tuple of named
3157 values (if ``schema`` is specified in the class),
3158 string in ``'XXX...'B`` form, or
3159 :py:class:`pyderasn.BitString` object
3160 :param bytes impl: override default tag with ``IMPLICIT`` one
3161 :param bytes expl: override default tag with ``EXPLICIT`` one
3162 :param default: set default value. Type same as in ``value``
3163 :param bool optional: is object ``OPTIONAL`` in sequence
3165 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3166 specs = getattr(self, "schema", {}) if _specs is None else _specs
3167 self.specs = specs if specs.__class__ == dict else dict(specs)
3168 self._value = None if value is None else self._value_sanitize(value)
3169 if default is not None:
3170 default = self._value_sanitize(default)
3171 self.default = self.__class__(
3177 self._value = default
3179 tag_klass, _, tag_num = tag_decode(self.tag)
3180 self.tag_constructed = tag_encode(
3182 form=TagFormConstructed,
3186 def _bits2octets(self, bits):
3187 if len(self.specs) > 0:
3188 bits = bits.rstrip("0")
3190 bits += "0" * ((8 - (bit_len % 8)) % 8)
3191 octets = bytearray(len(bits) // 8)
3192 for i in six_xrange(len(octets)):
3193 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3194 return bit_len, bytes(octets)
3196 def _value_sanitize(self, value):
3197 if isinstance(value, (string_types, binary_type)):
3199 isinstance(value, string_types) and
3200 value.startswith("'")
3202 if value.endswith("'B"):
3204 if not frozenset(value) <= SET01:
3205 raise ValueError("B's coding contains unacceptable chars")
3206 return self._bits2octets(value)
3207 if value.endswith("'H"):
3211 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3213 if value.__class__ == binary_type:
3214 return (len(value) * 8, value)
3215 raise InvalidValueType((self.__class__, string_types, binary_type))
3216 if value.__class__ == tuple:
3219 isinstance(value[0], integer_types) and
3220 value[1].__class__ == binary_type
3225 bit = self.specs.get(name)
3227 raise ObjUnknown("BitString value: %s" % name)
3230 return self._bits2octets("")
3231 bits = frozenset(bits)
3232 return self._bits2octets("".join(
3233 ("1" if bit in bits else "0")
3234 for bit in six_xrange(max(bits) + 1)
3236 if issubclass(value.__class__, BitString):
3238 raise InvalidValueType((self.__class__, binary_type, string_types))
3242 return self._value is not None
3244 def __getstate__(self):
3245 return BitStringState(
3260 self.tag_constructed,
3264 def __setstate__(self, state):
3265 super(BitString, self).__setstate__(state)
3266 self.specs = state.specs
3267 self._value = state.value
3268 self.tag_constructed = state.tag_constructed
3269 self.defined = state.defined
3272 self._assert_ready()
3273 for i in six_xrange(self._value[0]):
3278 """Returns number of bits in the string
3280 self._assert_ready()
3281 return self._value[0]
3283 def __bytes__(self):
3284 self._assert_ready()
3285 return self._value[1]
3287 def __eq__(self, their):
3288 if their.__class__ == bytes:
3289 return self._value[1] == their
3290 if not issubclass(their.__class__, BitString):
3293 self._value == their._value and
3294 self.tag == their.tag and
3295 self._expl == their._expl
3300 """Named representation (if exists) of the bits
3302 :returns: [str(name), ...]
3304 return [name for name, bit in iteritems(self.specs) if self[bit]]
3314 return self.__class__(
3316 impl=self.tag if impl is None else impl,
3317 expl=self._expl if expl is None else expl,
3318 default=self.default if default is None else default,
3319 optional=self.optional if optional is None else optional,
3323 def __getitem__(self, key):
3324 if key.__class__ == int:
3325 bit_len, octets = self._value
3329 byte2int(memoryview(octets)[key // 8:]) >>
3332 if isinstance(key, string_types):
3333 value = self.specs.get(key)
3335 raise ObjUnknown("BitString value: %s" % key)
3337 raise InvalidValueType((int, str))
3340 self._assert_ready()
3341 bit_len, octets = self._value
3344 len_encode(len(octets) + 1),
3345 int2byte((8 - bit_len % 8) % 8),
3349 def _encode1st(self, state):
3350 self._assert_ready()
3351 _, octets = self._value
3353 return len(self.tag) + len_size(l) + l, state
3355 def _encode2nd(self, writer, state_iter):
3356 bit_len, octets = self._value
3357 write_full(writer, b"".join((
3359 len_encode(len(octets) + 1),
3360 int2byte((8 - bit_len % 8) % 8),
3362 write_full(writer, octets)
3364 def _encode_cer(self, writer):
3365 bit_len, octets = self._value
3366 if len(octets) + 1 <= 1000:
3367 write_full(writer, self._encode())
3369 write_full(writer, self.tag_constructed)
3370 write_full(writer, LENINDEF)
3371 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3372 write_full(writer, b"".join((
3373 BitString.tag_default,
3376 octets[offset:offset + 999],
3378 tail = octets[offset + 999:]
3380 tail = int2byte((8 - bit_len % 8) % 8) + tail
3381 write_full(writer, b"".join((
3382 BitString.tag_default,
3383 len_encode(len(tail)),
3386 write_full(writer, EOC)
3388 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3390 t, tlen, lv = tag_strip(tlv)
3391 except DecodeError as err:
3392 raise err.__class__(
3394 klass=self.__class__,
3395 decode_path=decode_path,
3399 if tag_only: # pragma: no cover
3403 l, llen, v = len_decode(lv)
3404 except DecodeError as err:
3405 raise err.__class__(
3407 klass=self.__class__,
3408 decode_path=decode_path,
3412 raise NotEnoughData(
3413 "encoded length is longer than data",
3414 klass=self.__class__,
3415 decode_path=decode_path,
3419 raise NotEnoughData(
3421 klass=self.__class__,
3422 decode_path=decode_path,
3425 pad_size = byte2int(v)
3426 if l == 1 and pad_size != 0:
3428 "invalid empty value",
3429 klass=self.__class__,
3430 decode_path=decode_path,
3436 klass=self.__class__,
3437 decode_path=decode_path,
3440 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3443 klass=self.__class__,
3444 decode_path=decode_path,
3447 v, tail = v[:l], v[l:]
3448 bit_len = (len(v) - 1) * 8 - pad_size
3449 obj = self.__class__(
3450 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3453 default=self.default,
3454 optional=self.optional,
3456 _decoded=(offset, llen, l),
3459 obj._value = (bit_len, None)
3460 yield decode_path, obj, tail
3462 if t != self.tag_constructed:
3464 klass=self.__class__,
3465 decode_path=decode_path,
3468 if not ctx.get("bered", False):
3470 "unallowed BER constructed encoding",
3471 klass=self.__class__,
3472 decode_path=decode_path,
3475 if tag_only: # pragma: no cover
3480 l, llen, v = len_decode(lv)
3481 except LenIndefForm:
3482 llen, l, v = 1, 0, lv[1:]
3484 except DecodeError as err:
3485 raise err.__class__(
3487 klass=self.__class__,
3488 decode_path=decode_path,
3492 raise NotEnoughData(
3493 "encoded length is longer than data",
3494 klass=self.__class__,
3495 decode_path=decode_path,
3498 if not lenindef and l == 0:
3499 raise NotEnoughData(
3501 klass=self.__class__,
3502 decode_path=decode_path,
3506 sub_offset = offset + tlen + llen
3510 if v[:EOC_LEN].tobytes() == EOC:
3517 "chunk out of bounds",
3518 klass=self.__class__,
3519 decode_path=decode_path + (str(len(chunks) - 1),),
3520 offset=chunks[-1].offset,
3522 sub_decode_path = decode_path + (str(len(chunks)),)
3525 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3528 decode_path=sub_decode_path,
3531 _ctx_immutable=False,
3533 yield _decode_path, chunk, v_tail
3535 _, chunk, v_tail = next(BitString().decode_evgen(
3538 decode_path=sub_decode_path,
3541 _ctx_immutable=False,
3546 "expected BitString encoded chunk",
3547 klass=self.__class__,
3548 decode_path=sub_decode_path,
3551 chunks.append(chunk)
3552 sub_offset += chunk.tlvlen
3553 vlen += chunk.tlvlen
3555 if len(chunks) == 0:
3558 klass=self.__class__,
3559 decode_path=decode_path,
3564 for chunk_i, chunk in enumerate(chunks[:-1]):
3565 if chunk.bit_len % 8 != 0:
3567 "BitString chunk is not multiple of 8 bits",
3568 klass=self.__class__,
3569 decode_path=decode_path + (str(chunk_i),),
3570 offset=chunk.offset,
3573 values.append(bytes(chunk))
3574 bit_len += chunk.bit_len
3575 chunk_last = chunks[-1]
3577 values.append(bytes(chunk_last))
3578 bit_len += chunk_last.bit_len
3579 obj = self.__class__(
3580 value=None if evgen_mode else (bit_len, b"".join(values)),
3583 default=self.default,
3584 optional=self.optional,
3586 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3589 obj._value = (bit_len, None)
3590 obj.lenindef = lenindef
3591 obj.ber_encoded = True
3592 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3595 return pp_console_row(next(self.pps()))
3597 def pps(self, decode_path=()):
3601 bit_len, blob = self._value
3602 value = "%d bits" % bit_len
3603 if len(self.specs) > 0 and blob is not None:
3604 blob = tuple(self.named)
3607 asn1_type_name=self.asn1_type_name,
3608 obj_name=self.__class__.__name__,
3609 decode_path=decode_path,
3612 optional=self.optional,
3613 default=self == self.default,
3614 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3615 expl=None if self._expl is None else tag_decode(self._expl),
3620 expl_offset=self.expl_offset if self.expled else None,
3621 expl_tlen=self.expl_tlen if self.expled else None,
3622 expl_llen=self.expl_llen if self.expled else None,
3623 expl_vlen=self.expl_vlen if self.expled else None,
3624 expl_lenindef=self.expl_lenindef,
3625 lenindef=self.lenindef,
3626 ber_encoded=self.ber_encoded,
3629 defined_by, defined = self.defined or (None, None)
3630 if defined_by is not None:
3632 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3634 for pp in self.pps_lenindef(decode_path):
3638 OctetStringState = namedtuple(
3640 BasicState._fields + (
3651 class OctetString(Obj):
3652 """``OCTET STRING`` binary string type
3654 >>> s = OctetString(b"hello world")
3655 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3656 >>> s == OctetString(b"hello world")
3661 >>> OctetString(b"hello", bounds=(4, 4))
3662 Traceback (most recent call last):
3663 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3664 >>> OctetString(b"hell", bounds=(4, 4))
3665 OCTET STRING 4 bytes 68656c6c
3667 Memoryviews can be used as a values. If memoryview is made on
3668 mmap-ed file, then it does not take storage inside OctetString
3669 itself. In CER encoding mode it will be streamed to the specified
3670 writer, copying 1 KB chunks.
3672 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3673 tag_default = tag_encode(4)
3674 asn1_type_name = "OCTET STRING"
3675 evgen_mode_skip_value = True
3689 :param value: set the value. Either binary type, or
3690 :py:class:`pyderasn.OctetString` object
3691 :param bounds: set ``(MIN, MAX)`` value size constraint.
3692 (-inf, +inf) by default
3693 :param bytes impl: override default tag with ``IMPLICIT`` one
3694 :param bytes expl: override default tag with ``EXPLICIT`` one
3695 :param default: set default value. Type same as in ``value``
3696 :param bool optional: is object ``OPTIONAL`` in sequence
3698 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3700 self._bound_min, self._bound_max = getattr(
3704 ) if bounds is None else bounds
3705 if value is not None:
3706 self._value = self._value_sanitize(value)
3707 if default is not None:
3708 default = self._value_sanitize(default)
3709 self.default = self.__class__(
3714 if self._value is None:
3715 self._value = default
3717 tag_klass, _, tag_num = tag_decode(self.tag)
3718 self.tag_constructed = tag_encode(
3720 form=TagFormConstructed,
3724 def _value_sanitize(self, value):
3725 if value.__class__ == binary_type or value.__class__ == memoryview:
3727 elif issubclass(value.__class__, OctetString):
3728 value = value._value
3730 raise InvalidValueType((self.__class__, bytes, memoryview))
3731 if not self._bound_min <= len(value) <= self._bound_max:
3732 raise BoundsError(self._bound_min, len(value), self._bound_max)
3737 return self._value is not None
3739 def __getstate__(self):
3740 return OctetStringState(
3756 self.tag_constructed,
3760 def __setstate__(self, state):
3761 super(OctetString, self).__setstate__(state)
3762 self._value = state.value
3763 self._bound_min = state.bound_min
3764 self._bound_max = state.bound_max
3765 self.tag_constructed = state.tag_constructed
3766 self.defined = state.defined
3768 def __bytes__(self):
3769 self._assert_ready()
3770 return bytes(self._value)
3772 def __eq__(self, their):
3773 if their.__class__ == binary_type:
3774 return self._value == their
3775 if not issubclass(their.__class__, OctetString):
3778 self._value == their._value and
3779 self.tag == their.tag and
3780 self._expl == their._expl
3783 def __lt__(self, their):
3784 return self._value < their._value
3795 return self.__class__(
3798 (self._bound_min, self._bound_max)
3799 if bounds is None else bounds
3801 impl=self.tag if impl is None else impl,
3802 expl=self._expl if expl is None else expl,
3803 default=self.default if default is None else default,
3804 optional=self.optional if optional is None else optional,
3808 self._assert_ready()
3811 len_encode(len(self._value)),
3815 def _encode1st(self, state):
3816 self._assert_ready()
3817 l = len(self._value)
3818 return len(self.tag) + len_size(l) + l, state
3820 def _encode2nd(self, writer, state_iter):
3822 write_full(writer, self.tag + len_encode(len(value)))
3823 write_full(writer, value)
3825 def _encode_cer(self, writer):
3826 octets = self._value
3827 if len(octets) <= 1000:
3828 write_full(writer, self._encode())
3830 write_full(writer, self.tag_constructed)
3831 write_full(writer, LENINDEF)
3832 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3833 write_full(writer, b"".join((
3834 OctetString.tag_default,
3836 octets[offset:offset + 1000],
3838 tail = octets[offset + 1000:]
3840 write_full(writer, b"".join((
3841 OctetString.tag_default,
3842 len_encode(len(tail)),
3845 write_full(writer, EOC)
3847 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3849 t, tlen, lv = tag_strip(tlv)
3850 except DecodeError as err:
3851 raise err.__class__(
3853 klass=self.__class__,
3854 decode_path=decode_path,
3862 l, llen, v = len_decode(lv)
3863 except DecodeError as err:
3864 raise err.__class__(
3866 klass=self.__class__,
3867 decode_path=decode_path,
3871 raise NotEnoughData(
3872 "encoded length is longer than data",
3873 klass=self.__class__,
3874 decode_path=decode_path,
3877 v, tail = v[:l], v[l:]
3878 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3880 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3881 klass=self.__class__,
3882 decode_path=decode_path,
3886 obj = self.__class__(
3888 None if (evgen_mode and self.evgen_mode_skip_value)
3891 bounds=(self._bound_min, self._bound_max),
3894 default=self.default,
3895 optional=self.optional,
3896 _decoded=(offset, llen, l),
3899 except DecodeError as err:
3902 klass=self.__class__,
3903 decode_path=decode_path,
3906 except BoundsError as err:
3909 klass=self.__class__,
3910 decode_path=decode_path,
3913 yield decode_path, obj, tail
3915 if t != self.tag_constructed:
3917 klass=self.__class__,
3918 decode_path=decode_path,
3921 if not ctx.get("bered", False):
3923 "unallowed BER constructed encoding",
3924 klass=self.__class__,
3925 decode_path=decode_path,
3933 l, llen, v = len_decode(lv)
3934 except LenIndefForm:
3935 llen, l, v = 1, 0, lv[1:]
3937 except DecodeError as err:
3938 raise err.__class__(
3940 klass=self.__class__,
3941 decode_path=decode_path,
3945 raise NotEnoughData(
3946 "encoded length is longer than data",
3947 klass=self.__class__,
3948 decode_path=decode_path,
3953 sub_offset = offset + tlen + llen
3958 if v[:EOC_LEN].tobytes() == EOC:
3965 "chunk out of bounds",
3966 klass=self.__class__,
3967 decode_path=decode_path + (str(len(chunks) - 1),),
3968 offset=chunks[-1].offset,
3972 sub_decode_path = decode_path + (str(chunks_count),)
3973 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3976 decode_path=sub_decode_path,
3979 _ctx_immutable=False,
3981 yield _decode_path, chunk, v_tail
3982 if not chunk.ber_encoded:
3983 payload_len += chunk.vlen
3986 sub_decode_path = decode_path + (str(len(chunks)),)
3987 _, chunk, v_tail = next(OctetString().decode_evgen(
3990 decode_path=sub_decode_path,
3993 _ctx_immutable=False,
3996 chunks.append(chunk)
3999 "expected OctetString encoded chunk",
4000 klass=self.__class__,
4001 decode_path=sub_decode_path,
4004 sub_offset += chunk.tlvlen
4005 vlen += chunk.tlvlen
4007 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4009 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4010 klass=self.__class__,
4011 decode_path=decode_path,
4015 obj = self.__class__(
4017 None if evgen_mode else
4018 b"".join(bytes(chunk) for chunk in chunks)
4020 bounds=(self._bound_min, self._bound_max),
4023 default=self.default,
4024 optional=self.optional,
4025 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4028 except DecodeError as err:
4031 klass=self.__class__,
4032 decode_path=decode_path,
4035 except BoundsError as err:
4038 klass=self.__class__,
4039 decode_path=decode_path,
4042 obj.lenindef = lenindef
4043 obj.ber_encoded = True
4044 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4047 return pp_console_row(next(self.pps()))
4049 def pps(self, decode_path=()):
4052 asn1_type_name=self.asn1_type_name,
4053 obj_name=self.__class__.__name__,
4054 decode_path=decode_path,
4055 value=("%d bytes" % len(self._value)) if self.ready else None,
4056 blob=self._value if self.ready else None,
4057 optional=self.optional,
4058 default=self == self.default,
4059 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4060 expl=None if self._expl is None else tag_decode(self._expl),
4065 expl_offset=self.expl_offset if self.expled else None,
4066 expl_tlen=self.expl_tlen if self.expled else None,
4067 expl_llen=self.expl_llen if self.expled else None,
4068 expl_vlen=self.expl_vlen if self.expled else None,
4069 expl_lenindef=self.expl_lenindef,
4070 lenindef=self.lenindef,
4071 ber_encoded=self.ber_encoded,
4074 defined_by, defined = self.defined or (None, None)
4075 if defined_by is not None:
4077 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4079 for pp in self.pps_lenindef(decode_path):
4083 def agg_octet_string(evgens, decode_path, raw, writer):
4084 """Aggregate constructed string (OctetString and its derivatives)
4086 :param evgens: iterator of generated events
4087 :param decode_path: points to the string we want to decode
4088 :param raw: slicebable (memoryview, bytearray, etc) with
4089 the data evgens are generated on
4090 :param writer: buffer.write where string is going to be saved
4091 :param writer: where string is going to be saved. Must comply
4092 with ``io.RawIOBase.write`` behaviour
4094 .. seealso:: :ref:`agg_octet_string`
4096 decode_path_len = len(decode_path)
4097 for dp, obj, _ in evgens:
4098 if dp[:decode_path_len] != decode_path:
4100 if not obj.ber_encoded:
4101 write_full(writer, raw[
4102 obj.offset + obj.tlen + obj.llen:
4103 obj.offset + obj.tlen + obj.llen + obj.vlen -
4104 (EOC_LEN if obj.expl_lenindef else 0)
4106 if len(dp) == decode_path_len:
4110 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4114 """``NULL`` null object
4122 tag_default = tag_encode(5)
4123 asn1_type_name = "NULL"
4127 value=None, # unused, but Sequence passes it
4134 :param bytes impl: override default tag with ``IMPLICIT`` one
4135 :param bytes expl: override default tag with ``EXPLICIT`` one
4136 :param bool optional: is object ``OPTIONAL`` in sequence
4138 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4145 def __getstate__(self):
4161 def __eq__(self, their):
4162 if not issubclass(their.__class__, Null):
4165 self.tag == their.tag and
4166 self._expl == their._expl
4176 return self.__class__(
4177 impl=self.tag if impl is None else impl,
4178 expl=self._expl if expl is None else expl,
4179 optional=self.optional if optional is None else optional,
4183 return self.tag + LEN0
4185 def _encode1st(self, state):
4186 return len(self.tag) + 1, state
4188 def _encode2nd(self, writer, state_iter):
4189 write_full(writer, self.tag + LEN0)
4191 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4193 t, _, lv = tag_strip(tlv)
4194 except DecodeError as err:
4195 raise err.__class__(
4197 klass=self.__class__,
4198 decode_path=decode_path,
4203 klass=self.__class__,
4204 decode_path=decode_path,
4207 if tag_only: # pragma: no cover
4211 l, _, v = len_decode(lv)
4212 except DecodeError as err:
4213 raise err.__class__(
4215 klass=self.__class__,
4216 decode_path=decode_path,
4220 raise InvalidLength(
4221 "Null must have zero length",
4222 klass=self.__class__,
4223 decode_path=decode_path,
4226 obj = self.__class__(
4229 optional=self.optional,
4230 _decoded=(offset, 1, 0),
4232 yield decode_path, obj, v
4235 return pp_console_row(next(self.pps()))
4237 def pps(self, decode_path=()):
4240 asn1_type_name=self.asn1_type_name,
4241 obj_name=self.__class__.__name__,
4242 decode_path=decode_path,
4243 optional=self.optional,
4244 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4245 expl=None if self._expl is None else tag_decode(self._expl),
4250 expl_offset=self.expl_offset if self.expled else None,
4251 expl_tlen=self.expl_tlen if self.expled else None,
4252 expl_llen=self.expl_llen if self.expled else None,
4253 expl_vlen=self.expl_vlen if self.expled else None,
4254 expl_lenindef=self.expl_lenindef,
4257 for pp in self.pps_lenindef(decode_path):
4261 ObjectIdentifierState = namedtuple(
4262 "ObjectIdentifierState",
4263 BasicState._fields + ("value", "defines"),
4268 class ObjectIdentifier(Obj):
4269 """``OBJECT IDENTIFIER`` OID type
4271 >>> oid = ObjectIdentifier((1, 2, 3))
4272 OBJECT IDENTIFIER 1.2.3
4273 >>> oid == ObjectIdentifier("1.2.3")
4279 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4280 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4282 >>> str(ObjectIdentifier((3, 1)))
4283 Traceback (most recent call last):
4284 pyderasn.InvalidOID: unacceptable first arc value
4286 __slots__ = ("defines",)
4287 tag_default = tag_encode(6)
4288 asn1_type_name = "OBJECT IDENTIFIER"
4301 :param value: set the value. Either tuples of integers,
4302 string of "."-concatenated integers, or
4303 :py:class:`pyderasn.ObjectIdentifier` object
4304 :param defines: sequence of tuples. Each tuple has two elements.
4305 First one is relative to current one decode
4306 path, aiming to the field defined by that OID.
4307 Read about relative path in
4308 :py:func:`pyderasn.abs_decode_path`. Second
4309 tuple element is ``{OID: pyderasn.Obj()}``
4310 dictionary, mapping between current OID value
4311 and structure applied to defined field.
4313 .. seealso:: :ref:`definedby`
4315 :param bytes impl: override default tag with ``IMPLICIT`` one
4316 :param bytes expl: override default tag with ``EXPLICIT`` one
4317 :param default: set default value. Type same as in ``value``
4318 :param bool optional: is object ``OPTIONAL`` in sequence
4320 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4322 if value is not None:
4323 self._value = self._value_sanitize(value)
4324 if default is not None:
4325 default = self._value_sanitize(default)
4326 self.default = self.__class__(
4331 if self._value is None:
4332 self._value = default
4333 self.defines = defines
4335 def __add__(self, their):
4336 if their.__class__ == tuple:
4337 return self.__class__(self._value + array("L", their))
4338 if isinstance(their, self.__class__):
4339 return self.__class__(self._value + their._value)
4340 raise InvalidValueType((self.__class__, tuple))
4342 def _value_sanitize(self, value):
4343 if issubclass(value.__class__, ObjectIdentifier):
4345 if isinstance(value, string_types):
4347 value = array("L", (pureint(arc) for arc in value.split(".")))
4349 raise InvalidOID("unacceptable arcs values")
4350 if value.__class__ == tuple:
4352 value = array("L", value)
4353 except OverflowError as err:
4354 raise InvalidOID(repr(err))
4355 if value.__class__ is array:
4357 raise InvalidOID("less than 2 arcs")
4358 first_arc = value[0]
4359 if first_arc in (0, 1):
4360 if not (0 <= value[1] <= 39):
4361 raise InvalidOID("second arc is too wide")
4362 elif first_arc == 2:
4365 raise InvalidOID("unacceptable first arc value")
4366 if not all(arc >= 0 for arc in value):
4367 raise InvalidOID("negative arc value")
4369 raise InvalidValueType((self.__class__, str, tuple))
4373 return self._value is not None
4375 def __getstate__(self):
4376 return ObjectIdentifierState(
4393 def __setstate__(self, state):
4394 super(ObjectIdentifier, self).__setstate__(state)
4395 self._value = state.value
4396 self.defines = state.defines
4399 self._assert_ready()
4400 return iter(self._value)
4403 return ".".join(str(arc) for arc in self._value or ())
4406 self._assert_ready()
4407 return hash(b"".join((
4409 bytes(self._expl or b""),
4410 str(self._value).encode("ascii"),
4413 def __eq__(self, their):
4414 if their.__class__ == tuple:
4415 return self._value == array("L", their)
4416 if not issubclass(their.__class__, ObjectIdentifier):
4419 self.tag == their.tag and
4420 self._expl == their._expl and
4421 self._value == their._value
4424 def __lt__(self, their):
4425 return self._value < their._value
4436 return self.__class__(
4438 defines=self.defines if defines is None else defines,
4439 impl=self.tag if impl is None else impl,
4440 expl=self._expl if expl is None else expl,
4441 default=self.default if default is None else default,
4442 optional=self.optional if optional is None else optional,
4445 def _encode_octets(self):
4446 self._assert_ready()
4448 first_value = value[1]
4449 first_arc = value[0]
4452 elif first_arc == 1:
4454 elif first_arc == 2:
4456 else: # pragma: no cover
4457 raise RuntimeError("invalid arc is stored")
4458 octets = [zero_ended_encode(first_value)]
4459 for arc in value[2:]:
4460 octets.append(zero_ended_encode(arc))
4461 return b"".join(octets)
4464 v = self._encode_octets()
4465 return b"".join((self.tag, len_encode(len(v)), v))
4467 def _encode1st(self, state):
4468 l = len(self._encode_octets())
4469 return len(self.tag) + len_size(l) + l, state
4471 def _encode2nd(self, writer, state_iter):
4472 write_full(writer, self._encode())
4474 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4476 t, _, lv = tag_strip(tlv)
4477 except DecodeError as err:
4478 raise err.__class__(
4480 klass=self.__class__,
4481 decode_path=decode_path,
4486 klass=self.__class__,
4487 decode_path=decode_path,
4490 if tag_only: # pragma: no cover
4494 l, llen, v = len_decode(lv)
4495 except DecodeError as err:
4496 raise err.__class__(
4498 klass=self.__class__,
4499 decode_path=decode_path,
4503 raise NotEnoughData(
4504 "encoded length is longer than data",
4505 klass=self.__class__,
4506 decode_path=decode_path,
4510 raise NotEnoughData(
4512 klass=self.__class__,
4513 decode_path=decode_path,
4516 v, tail = v[:l], v[l:]
4523 octet = indexbytes(v, i)
4524 if i == 0 and octet == 0x80:
4525 if ctx.get("bered", False):
4529 "non normalized arc encoding",
4530 klass=self.__class__,
4531 decode_path=decode_path,
4534 arc = (arc << 7) | (octet & 0x7F)
4535 if octet & 0x80 == 0:
4538 except OverflowError:
4540 "too huge value for local unsigned long",
4541 klass=self.__class__,
4542 decode_path=decode_path,
4551 klass=self.__class__,
4552 decode_path=decode_path,
4556 second_arc = arcs[0]
4557 if 0 <= second_arc <= 39:
4559 elif 40 <= second_arc <= 79:
4565 obj = self.__class__(
4566 value=array("L", (first_arc, second_arc)) + arcs[1:],
4569 default=self.default,
4570 optional=self.optional,
4571 _decoded=(offset, llen, l),
4574 obj.ber_encoded = True
4575 yield decode_path, obj, tail
4578 return pp_console_row(next(self.pps()))
4580 def pps(self, decode_path=()):
4583 asn1_type_name=self.asn1_type_name,
4584 obj_name=self.__class__.__name__,
4585 decode_path=decode_path,
4586 value=str(self) if self.ready else None,
4587 optional=self.optional,
4588 default=self == self.default,
4589 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4590 expl=None if self._expl is None else tag_decode(self._expl),
4595 expl_offset=self.expl_offset if self.expled else None,
4596 expl_tlen=self.expl_tlen if self.expled else None,
4597 expl_llen=self.expl_llen if self.expled else None,
4598 expl_vlen=self.expl_vlen if self.expled else None,
4599 expl_lenindef=self.expl_lenindef,
4600 ber_encoded=self.ber_encoded,
4603 for pp in self.pps_lenindef(decode_path):
4607 class Enumerated(Integer):
4608 """``ENUMERATED`` integer type
4610 This type is identical to :py:class:`pyderasn.Integer`, but requires
4611 schema to be specified and does not accept values missing from it.
4614 tag_default = tag_encode(10)
4615 asn1_type_name = "ENUMERATED"
4626 bounds=None, # dummy argument, workability for Integer.decode
4628 super(Enumerated, self).__init__(
4629 value, bounds, impl, expl, default, optional, _specs, _decoded,
4631 if len(self.specs) == 0:
4632 raise ValueError("schema must be specified")
4634 def _value_sanitize(self, value):
4635 if isinstance(value, self.__class__):
4636 value = value._value
4637 elif isinstance(value, integer_types):
4638 for _value in itervalues(self.specs):
4643 "unknown integer value: %s" % value,
4644 klass=self.__class__,
4646 elif isinstance(value, string_types):
4647 value = self.specs.get(value)
4649 raise ObjUnknown("integer value: %s" % value)
4651 raise InvalidValueType((self.__class__, int, str))
4663 return self.__class__(
4665 impl=self.tag if impl is None else impl,
4666 expl=self._expl if expl is None else expl,
4667 default=self.default if default is None else default,
4668 optional=self.optional if optional is None else optional,
4673 def escape_control_unicode(c):
4674 if unicat(c)[0] == "C":
4675 c = repr(c).lstrip("u").strip("'")
4679 class CommonString(OctetString):
4680 """Common class for all strings
4682 Everything resembles :py:class:`pyderasn.OctetString`, except
4683 ability to deal with unicode text strings.
4685 >>> hexenc("привет мир".encode("utf-8"))
4686 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4687 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4689 >>> s = UTF8String("привет мир")
4690 UTF8String UTF8String привет мир
4692 'привет мир'
4693 >>> hexenc(bytes(s))
4694 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4696 >>> PrintableString("привет мир")
4697 Traceback (most recent call last):
4698 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4700 >>> BMPString("ада", bounds=(2, 2))
4701 Traceback (most recent call last):
4702 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4703 >>> s = BMPString("ад", bounds=(2, 2))
4706 >>> hexenc(bytes(s))
4714 * - :py:class:`pyderasn.UTF8String`
4716 * - :py:class:`pyderasn.NumericString`
4718 * - :py:class:`pyderasn.PrintableString`
4720 * - :py:class:`pyderasn.TeletexString`
4722 * - :py:class:`pyderasn.T61String`
4724 * - :py:class:`pyderasn.VideotexString`
4726 * - :py:class:`pyderasn.IA5String`
4728 * - :py:class:`pyderasn.GraphicString`
4730 * - :py:class:`pyderasn.VisibleString`
4732 * - :py:class:`pyderasn.ISO646String`
4734 * - :py:class:`pyderasn.GeneralString`
4736 * - :py:class:`pyderasn.UniversalString`
4738 * - :py:class:`pyderasn.BMPString`
4743 def _value_sanitize(self, value):
4745 value_decoded = None
4746 if isinstance(value, self.__class__):
4747 value_raw = value._value
4748 elif value.__class__ == text_type:
4749 value_decoded = value
4750 elif value.__class__ == binary_type:
4753 raise InvalidValueType((self.__class__, text_type, binary_type))
4756 value_decoded.encode(self.encoding)
4757 if value_raw is None else value_raw
4760 value_raw.decode(self.encoding)
4761 if value_decoded is None else value_decoded
4763 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4764 raise DecodeError(str(err))
4765 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4773 def __eq__(self, their):
4774 if their.__class__ == binary_type:
4775 return self._value == their
4776 if their.__class__ == text_type:
4777 return self._value == their.encode(self.encoding)
4778 if not isinstance(their, self.__class__):
4781 self._value == their._value and
4782 self.tag == their.tag and
4783 self._expl == their._expl
4786 def __unicode__(self):
4788 return self._value.decode(self.encoding)
4789 return text_type(self._value)
4792 return pp_console_row(next(self.pps(no_unicode=PY2)))
4794 def pps(self, decode_path=(), no_unicode=False):
4798 hexenc(bytes(self)) if no_unicode else
4799 "".join(escape_control_unicode(c) for c in self.__unicode__())
4803 asn1_type_name=self.asn1_type_name,
4804 obj_name=self.__class__.__name__,
4805 decode_path=decode_path,
4807 optional=self.optional,
4808 default=self == self.default,
4809 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4810 expl=None if self._expl is None else tag_decode(self._expl),
4815 expl_offset=self.expl_offset if self.expled else None,
4816 expl_tlen=self.expl_tlen if self.expled else None,
4817 expl_llen=self.expl_llen if self.expled else None,
4818 expl_vlen=self.expl_vlen if self.expled else None,
4819 expl_lenindef=self.expl_lenindef,
4820 ber_encoded=self.ber_encoded,
4823 for pp in self.pps_lenindef(decode_path):
4827 class UTF8String(CommonString):
4829 tag_default = tag_encode(12)
4831 asn1_type_name = "UTF8String"
4834 class AllowableCharsMixin(object):
4836 def allowable_chars(self):
4838 return self._allowable_chars
4839 return frozenset(six_unichr(c) for c in self._allowable_chars)
4842 class NumericString(AllowableCharsMixin, CommonString):
4845 Its value is properly sanitized: only ASCII digits with spaces can
4848 >>> NumericString().allowable_chars
4849 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4852 tag_default = tag_encode(18)
4854 asn1_type_name = "NumericString"
4855 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4857 def _value_sanitize(self, value):
4858 value = super(NumericString, self)._value_sanitize(value)
4859 if not frozenset(value) <= self._allowable_chars:
4860 raise DecodeError("non-numeric value")
4864 PrintableStringState = namedtuple(
4865 "PrintableStringState",
4866 OctetStringState._fields + ("allowable_chars",),
4871 class PrintableString(AllowableCharsMixin, CommonString):
4874 Its value is properly sanitized: see X.680 41.4 table 10.
4876 >>> PrintableString().allowable_chars
4877 frozenset([' ', "'", ..., 'z'])
4878 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4879 PrintableString PrintableString foo*bar
4880 >>> obj.allow_asterisk, obj.allow_ampersand
4884 tag_default = tag_encode(19)
4886 asn1_type_name = "PrintableString"
4887 _allowable_chars = frozenset(
4888 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4890 _asterisk = frozenset("*".encode("ascii"))
4891 _ampersand = frozenset("&".encode("ascii"))
4903 allow_asterisk=False,
4904 allow_ampersand=False,
4907 :param allow_asterisk: allow asterisk character
4908 :param allow_ampersand: allow ampersand character
4911 self._allowable_chars |= self._asterisk
4913 self._allowable_chars |= self._ampersand
4914 super(PrintableString, self).__init__(
4915 value, bounds, impl, expl, default, optional, _decoded, ctx,
4919 def allow_asterisk(self):
4920 """Is asterisk character allowed?
4922 return self._asterisk <= self._allowable_chars
4925 def allow_ampersand(self):
4926 """Is ampersand character allowed?
4928 return self._ampersand <= self._allowable_chars
4930 def _value_sanitize(self, value):
4931 value = super(PrintableString, self)._value_sanitize(value)
4932 if not frozenset(value) <= self._allowable_chars:
4933 raise DecodeError("non-printable value")
4936 def __getstate__(self):
4937 return PrintableStringState(
4938 *super(PrintableString, self).__getstate__(),
4939 **{"allowable_chars": self._allowable_chars}
4942 def __setstate__(self, state):
4943 super(PrintableString, self).__setstate__(state)
4944 self._allowable_chars = state.allowable_chars
4955 return self.__class__(
4958 (self._bound_min, self._bound_max)
4959 if bounds is None else bounds
4961 impl=self.tag if impl is None else impl,
4962 expl=self._expl if expl is None else expl,
4963 default=self.default if default is None else default,
4964 optional=self.optional if optional is None else optional,
4965 allow_asterisk=self.allow_asterisk,
4966 allow_ampersand=self.allow_ampersand,
4970 class TeletexString(CommonString):
4972 tag_default = tag_encode(20)
4974 asn1_type_name = "TeletexString"
4977 class T61String(TeletexString):
4979 asn1_type_name = "T61String"
4982 class VideotexString(CommonString):
4984 tag_default = tag_encode(21)
4985 encoding = "iso-8859-1"
4986 asn1_type_name = "VideotexString"
4989 class IA5String(CommonString):
4991 tag_default = tag_encode(22)
4993 asn1_type_name = "IA5"
4996 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4997 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4998 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4999 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5000 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
5001 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
5004 class VisibleString(CommonString):
5006 tag_default = tag_encode(26)
5008 asn1_type_name = "VisibleString"
5011 UTCTimeState = namedtuple(
5013 OctetStringState._fields + ("ber_raw",),
5018 def str_to_time_fractions(value):
5020 year, v = (v // 10**10), (v % 10**10)
5021 month, v = (v // 10**8), (v % 10**8)
5022 day, v = (v // 10**6), (v % 10**6)
5023 hour, v = (v // 10**4), (v % 10**4)
5024 minute, second = (v // 100), (v % 100)
5025 return year, month, day, hour, minute, second
5028 class UTCTime(VisibleString):
5029 """``UTCTime`` datetime type
5031 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5032 UTCTime UTCTime 2017-09-30T22:07:50
5038 datetime.datetime(2017, 9, 30, 22, 7, 50)
5039 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5040 datetime.datetime(1957, 9, 30, 22, 7, 50)
5042 If BER encoded value was met, then ``ber_raw`` attribute will hold
5043 its raw representation.
5047 Pay attention that UTCTime can not hold full year, so all years
5048 having < 50 years are treated as 20xx, 19xx otherwise, according
5049 to X.509 recommendation.
5053 No strict validation of UTC offsets are made, but very crude:
5055 * minutes are not exceeding 60
5056 * offset value is not exceeding 14 hours
5058 __slots__ = ("ber_raw",)
5059 tag_default = tag_encode(23)
5061 asn1_type_name = "UTCTime"
5062 evgen_mode_skip_value = False
5072 bounds=None, # dummy argument, workability for OctetString.decode
5076 :param value: set the value. Either datetime type, or
5077 :py:class:`pyderasn.UTCTime` object
5078 :param bytes impl: override default tag with ``IMPLICIT`` one
5079 :param bytes expl: override default tag with ``EXPLICIT`` one
5080 :param default: set default value. Type same as in ``value``
5081 :param bool optional: is object ``OPTIONAL`` in sequence
5083 super(UTCTime, self).__init__(
5084 None, None, impl, expl, None, optional, _decoded, ctx,
5088 if value is not None:
5089 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5090 self.ber_encoded = self.ber_raw is not None
5091 if default is not None:
5092 default, _ = self._value_sanitize(default)
5093 self.default = self.__class__(
5098 if self._value is None:
5099 self._value = default
5101 self.optional = optional
5103 def _strptime_bered(self, value):
5104 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5107 raise ValueError("no timezone")
5108 year += 2000 if year < 50 else 1900
5109 decoded = datetime(year, month, day, hour, minute)
5111 if value[-1] == "Z":
5115 raise ValueError("invalid UTC offset")
5116 if value[-5] == "-":
5118 elif value[-5] == "+":
5121 raise ValueError("invalid UTC offset")
5122 v = pureint(value[-4:])
5123 offset, v = (60 * (v % 100)), v // 100
5125 raise ValueError("invalid UTC offset minutes")
5127 if offset > 14 * 3600:
5128 raise ValueError("too big UTC offset")
5132 return offset, decoded
5134 raise ValueError("invalid UTC offset seconds")
5135 seconds = pureint(value)
5137 raise ValueError("invalid seconds value")
5138 return offset, decoded + timedelta(seconds=seconds)
5140 def _strptime(self, value):
5141 # datetime.strptime's format: %y%m%d%H%M%SZ
5142 if len(value) != LEN_YYMMDDHHMMSSZ:
5143 raise ValueError("invalid UTCTime length")
5144 if value[-1] != "Z":
5145 raise ValueError("non UTC timezone")
5146 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5147 year += 2000 if year < 50 else 1900
5148 return datetime(year, month, day, hour, minute, second)
5150 def _dt_sanitize(self, value):
5151 if value.year < 1950 or value.year > 2049:
5152 raise ValueError("UTCTime can hold only 1950-2049 years")
5153 return value.replace(microsecond=0)
5155 def _value_sanitize(self, value, ctx=None):
5156 if value.__class__ == binary_type:
5158 value_decoded = value.decode("ascii")
5159 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5160 raise DecodeError("invalid UTCTime encoding: %r" % err)
5163 return self._strptime(value_decoded), None
5164 except (TypeError, ValueError) as _err:
5166 if (ctx is not None) and ctx.get("bered", False):
5168 offset, _value = self._strptime_bered(value_decoded)
5169 _value = _value - timedelta(seconds=offset)
5170 return self._dt_sanitize(_value), value
5171 except (TypeError, ValueError, OverflowError) as _err:
5174 "invalid %s format: %r" % (self.asn1_type_name, err),
5175 klass=self.__class__,
5177 if isinstance(value, self.__class__):
5178 return value._value, None
5179 if value.__class__ == datetime:
5180 return self._dt_sanitize(value), None
5181 raise InvalidValueType((self.__class__, datetime))
5183 def _pp_value(self):
5185 value = self._value.isoformat()
5186 if self.ber_encoded:
5187 value += " (%s)" % self.ber_raw
5191 def __unicode__(self):
5193 value = self._value.isoformat()
5194 if self.ber_encoded:
5195 value += " (%s)" % self.ber_raw
5197 return text_type(self._pp_value())
5199 def __getstate__(self):
5200 return UTCTimeState(
5201 *super(UTCTime, self).__getstate__(),
5202 **{"ber_raw": self.ber_raw}
5205 def __setstate__(self, state):
5206 super(UTCTime, self).__setstate__(state)
5207 self.ber_raw = state.ber_raw
5209 def __bytes__(self):
5210 self._assert_ready()
5211 return self._encode_time()
5213 def __eq__(self, their):
5214 if their.__class__ == binary_type:
5215 return self._encode_time() == their
5216 if their.__class__ == datetime:
5217 return self.todatetime() == their
5218 if not isinstance(their, self.__class__):
5221 self._value == their._value and
5222 self.tag == their.tag and
5223 self._expl == their._expl
5226 def _encode_time(self):
5227 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5230 self._assert_ready()
5231 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5233 def _encode1st(self, state):
5234 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5236 def _encode2nd(self, writer, state_iter):
5237 self._assert_ready()
5238 write_full(writer, self._encode())
5240 def _encode_cer(self, writer):
5241 write_full(writer, self._encode())
5243 def todatetime(self):
5247 return pp_console_row(next(self.pps()))
5249 def pps(self, decode_path=()):
5252 asn1_type_name=self.asn1_type_name,
5253 obj_name=self.__class__.__name__,
5254 decode_path=decode_path,
5255 value=self._pp_value(),
5256 optional=self.optional,
5257 default=self == self.default,
5258 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5259 expl=None if self._expl is None else tag_decode(self._expl),
5264 expl_offset=self.expl_offset if self.expled else None,
5265 expl_tlen=self.expl_tlen if self.expled else None,
5266 expl_llen=self.expl_llen if self.expled else None,
5267 expl_vlen=self.expl_vlen if self.expled else None,
5268 expl_lenindef=self.expl_lenindef,
5269 ber_encoded=self.ber_encoded,
5272 for pp in self.pps_lenindef(decode_path):
5276 class GeneralizedTime(UTCTime):
5277 """``GeneralizedTime`` datetime type
5279 This type is similar to :py:class:`pyderasn.UTCTime`.
5281 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5282 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5284 '20170930220750.000123Z'
5285 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5286 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5290 Only microsecond fractions are supported in DER encoding.
5291 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5292 higher precision values.
5296 BER encoded data can loss information (accuracy) during decoding
5297 because of float transformations.
5301 Local times (without explicit timezone specification) are treated
5302 as UTC one, no transformations are made.
5306 Zero year is unsupported.
5309 tag_default = tag_encode(24)
5310 asn1_type_name = "GeneralizedTime"
5312 def _dt_sanitize(self, value):
5315 def _strptime_bered(self, value):
5316 if len(value) < 4 + 3 * 2:
5317 raise ValueError("invalid GeneralizedTime")
5318 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5319 decoded = datetime(year, month, day, hour)
5320 offset, value = 0, value[10:]
5322 return offset, decoded
5323 if value[-1] == "Z":
5326 for char, sign in (("-", -1), ("+", 1)):
5327 idx = value.rfind(char)
5330 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5331 v = pureint(offset_raw)
5332 if len(offset_raw) == 4:
5333 offset, v = (60 * (v % 100)), v // 100
5335 raise ValueError("invalid UTC offset minutes")
5336 elif len(offset_raw) == 2:
5339 raise ValueError("invalid UTC offset")
5341 if offset > 14 * 3600:
5342 raise ValueError("too big UTC offset")
5346 return offset, decoded
5347 if value[0] in DECIMAL_SIGNS:
5349 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5352 raise ValueError("stripped minutes")
5353 decoded += timedelta(seconds=60 * pureint(value[:2]))
5356 return offset, decoded
5357 if value[0] in DECIMAL_SIGNS:
5359 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5362 raise ValueError("stripped seconds")
5363 decoded += timedelta(seconds=pureint(value[:2]))
5366 return offset, decoded
5367 if value[0] not in DECIMAL_SIGNS:
5368 raise ValueError("invalid format after seconds")
5370 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5373 def _strptime(self, value):
5375 if l == LEN_YYYYMMDDHHMMSSZ:
5376 # datetime.strptime's format: %Y%m%d%H%M%SZ
5377 if value[-1] != "Z":
5378 raise ValueError("non UTC timezone")
5379 return datetime(*str_to_time_fractions(value[:-1]))
5380 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5381 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5382 if value[-1] != "Z":
5383 raise ValueError("non UTC timezone")
5384 if value[14] != ".":
5385 raise ValueError("no fractions separator")
5388 raise ValueError("trailing zero")
5391 raise ValueError("only microsecond fractions are supported")
5392 us = pureint(us + ("0" * (6 - us_len)))
5393 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5394 return datetime(year, month, day, hour, minute, second, us)
5395 raise ValueError("invalid GeneralizedTime length")
5397 def _encode_time(self):
5399 encoded = value.strftime("%Y%m%d%H%M%S")
5400 if value.microsecond > 0:
5401 encoded += (".%06d" % value.microsecond).rstrip("0")
5402 return (encoded + "Z").encode("ascii")
5405 self._assert_ready()
5407 if value.microsecond > 0:
5408 encoded = self._encode_time()
5409 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5410 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5412 def _encode1st(self, state):
5413 self._assert_ready()
5414 vlen = len(self._encode_time())
5415 return len(self.tag) + len_size(vlen) + vlen, state
5417 def _encode2nd(self, writer, state_iter):
5418 write_full(writer, self._encode())
5421 class GraphicString(CommonString):
5423 tag_default = tag_encode(25)
5424 encoding = "iso-8859-1"
5425 asn1_type_name = "GraphicString"
5428 class ISO646String(VisibleString):
5430 asn1_type_name = "ISO646String"
5433 class GeneralString(CommonString):
5435 tag_default = tag_encode(27)
5436 encoding = "iso-8859-1"
5437 asn1_type_name = "GeneralString"
5440 class UniversalString(CommonString):
5442 tag_default = tag_encode(28)
5443 encoding = "utf-32-be"
5444 asn1_type_name = "UniversalString"
5447 class BMPString(CommonString):
5449 tag_default = tag_encode(30)
5450 encoding = "utf-16-be"
5451 asn1_type_name = "BMPString"
5454 ChoiceState = namedtuple(
5456 BasicState._fields + ("specs", "value",),
5462 """``CHOICE`` special type
5466 class GeneralName(Choice):
5468 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5469 ("dNSName", IA5String(impl=tag_ctxp(2))),
5472 >>> gn = GeneralName()
5474 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5475 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5476 >>> gn["dNSName"] = IA5String("bar.baz")
5477 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5478 >>> gn["rfc822Name"]
5481 [2] IA5String IA5 bar.baz
5484 >>> gn.value == gn["dNSName"]
5487 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5489 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5490 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5492 __slots__ = ("specs",)
5494 asn1_type_name = "CHOICE"
5507 :param value: set the value. Either ``(choice, value)`` tuple, or
5508 :py:class:`pyderasn.Choice` object
5509 :param bytes impl: can not be set, do **not** use it
5510 :param bytes expl: override default tag with ``EXPLICIT`` one
5511 :param default: set default value. Type same as in ``value``
5512 :param bool optional: is object ``OPTIONAL`` in sequence
5514 if impl is not None:
5515 raise ValueError("no implicit tag allowed for CHOICE")
5516 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5518 schema = getattr(self, "schema", ())
5519 if len(schema) == 0:
5520 raise ValueError("schema must be specified")
5522 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5525 if value is not None:
5526 self._value = self._value_sanitize(value)
5527 if default is not None:
5528 default_value = self._value_sanitize(default)
5529 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5530 default_obj.specs = self.specs
5531 default_obj._value = default_value
5532 self.default = default_obj
5534 self._value = copy(default_obj._value)
5535 if self._expl is not None:
5536 tag_class, _, tag_num = tag_decode(self._expl)
5537 self._tag_order = (tag_class, tag_num)
5539 def _value_sanitize(self, value):
5540 if (value.__class__ == tuple) and len(value) == 2:
5542 spec = self.specs.get(choice)
5544 raise ObjUnknown(choice)
5545 if not isinstance(obj, spec.__class__):
5546 raise InvalidValueType((spec,))
5547 return (choice, spec(obj))
5548 if isinstance(value, self.__class__):
5550 raise InvalidValueType((self.__class__, tuple))
5554 return self._value is not None and self._value[1].ready
5558 return self.expl_lenindef or (
5559 (self._value is not None) and
5560 self._value[1].bered
5563 def __getstate__(self):
5581 def __setstate__(self, state):
5582 super(Choice, self).__setstate__(state)
5583 self.specs = state.specs
5584 self._value = state.value
5586 def __eq__(self, their):
5587 if (their.__class__ == tuple) and len(their) == 2:
5588 return self._value == their
5589 if not isinstance(their, self.__class__):
5592 self.specs == their.specs and
5593 self._value == their._value
5603 return self.__class__(
5606 expl=self._expl if expl is None else expl,
5607 default=self.default if default is None else default,
5608 optional=self.optional if optional is None else optional,
5613 """Name of the choice
5615 self._assert_ready()
5616 return self._value[0]
5620 """Value of underlying choice
5622 self._assert_ready()
5623 return self._value[1]
5626 def tag_order(self):
5627 self._assert_ready()
5628 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5631 def tag_order_cer(self):
5632 return min(v.tag_order_cer for v in itervalues(self.specs))
5634 def __getitem__(self, key):
5635 if key not in self.specs:
5636 raise ObjUnknown(key)
5637 if self._value is None:
5639 choice, value = self._value
5644 def __setitem__(self, key, value):
5645 spec = self.specs.get(key)
5647 raise ObjUnknown(key)
5648 if not isinstance(value, spec.__class__):
5649 raise InvalidValueType((spec.__class__,))
5650 self._value = (key, spec(value))
5658 return self._value[1].decoded if self.ready else False
5661 self._assert_ready()
5662 return self._value[1].encode()
5664 def _encode1st(self, state):
5665 self._assert_ready()
5666 return self._value[1].encode1st(state)
5668 def _encode2nd(self, writer, state_iter):
5669 self._value[1].encode2nd(writer, state_iter)
5671 def _encode_cer(self, writer):
5672 self._assert_ready()
5673 self._value[1].encode_cer(writer)
5675 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5676 for choice, spec in iteritems(self.specs):
5677 sub_decode_path = decode_path + (choice,)
5683 decode_path=sub_decode_path,
5686 _ctx_immutable=False,
5693 klass=self.__class__,
5694 decode_path=decode_path,
5697 if tag_only: # pragma: no cover
5701 for _decode_path, value, tail in spec.decode_evgen(
5705 decode_path=sub_decode_path,
5707 _ctx_immutable=False,
5709 yield _decode_path, value, tail
5711 _, value, tail = next(spec.decode_evgen(
5715 decode_path=sub_decode_path,
5717 _ctx_immutable=False,
5720 obj = self.__class__(
5723 default=self.default,
5724 optional=self.optional,
5725 _decoded=(offset, 0, value.fulllen),
5727 obj._value = (choice, value)
5728 yield decode_path, obj, tail
5731 value = pp_console_row(next(self.pps()))
5733 value = "%s[%r]" % (value, self.value)
5736 def pps(self, decode_path=()):
5739 asn1_type_name=self.asn1_type_name,
5740 obj_name=self.__class__.__name__,
5741 decode_path=decode_path,
5742 value=self.choice if self.ready else None,
5743 optional=self.optional,
5744 default=self == self.default,
5745 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5746 expl=None if self._expl is None else tag_decode(self._expl),
5751 expl_lenindef=self.expl_lenindef,
5755 yield self.value.pps(decode_path=decode_path + (self.choice,))
5756 for pp in self.pps_lenindef(decode_path):
5760 class PrimitiveTypes(Choice):
5761 """Predefined ``CHOICE`` for all generic primitive types
5763 It could be useful for general decoding of some unspecified values:
5765 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5766 OCTET STRING 3 bytes 666f6f
5767 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5771 schema = tuple((klass.__name__, klass()) for klass in (
5795 AnyState = namedtuple(
5797 BasicState._fields + ("value", "defined"),
5803 """``ANY`` special type
5805 >>> Any(Integer(-123))
5806 ANY INTEGER -123 (0X:7B)
5807 >>> a = Any(OctetString(b"hello world").encode())
5808 ANY 040b68656c6c6f20776f726c64
5809 >>> hexenc(bytes(a))
5810 b'0x040x0bhello world'
5812 __slots__ = ("defined",)
5813 tag_default = tag_encode(0)
5814 asn1_type_name = "ANY"
5824 :param value: set the value. Either any kind of pyderasn's
5825 **ready** object, or bytes. Pay attention that
5826 **no** validation is performed if raw binary value
5827 is valid TLV, except just tag decoding
5828 :param bytes expl: override default tag with ``EXPLICIT`` one
5829 :param bool optional: is object ``OPTIONAL`` in sequence
5831 super(Any, self).__init__(None, expl, None, optional, _decoded)
5835 value = self._value_sanitize(value)
5837 if self._expl is None:
5838 if value.__class__ == binary_type:
5839 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5841 tag_class, tag_num = value.tag_order
5843 tag_class, _, tag_num = tag_decode(self._expl)
5844 self._tag_order = (tag_class, tag_num)
5847 def _value_sanitize(self, value):
5848 if value.__class__ == binary_type:
5850 raise ValueError("Any value can not be empty")
5852 if isinstance(value, self.__class__):
5854 if not isinstance(value, Obj):
5855 raise InvalidValueType((self.__class__, Obj, binary_type))
5860 return self._value is not None
5863 def tag_order(self):
5864 self._assert_ready()
5865 return self._tag_order
5869 if self.expl_lenindef or self.lenindef:
5871 if self.defined is None:
5873 return self.defined[1].bered
5875 def __getstate__(self):
5893 def __setstate__(self, state):
5894 super(Any, self).__setstate__(state)
5895 self._value = state.value
5896 self.defined = state.defined
5898 def __eq__(self, their):
5899 if their.__class__ == binary_type:
5900 if self._value.__class__ == binary_type:
5901 return self._value == their
5902 return self._value.encode() == their
5903 if issubclass(their.__class__, Any):
5904 if self.ready and their.ready:
5905 return bytes(self) == bytes(their)
5906 return self.ready == their.ready
5915 return self.__class__(
5917 expl=self._expl if expl is None else expl,
5918 optional=self.optional if optional is None else optional,
5921 def __bytes__(self):
5922 self._assert_ready()
5924 if value.__class__ == binary_type:
5926 return self._value.encode()
5933 self._assert_ready()
5935 if value.__class__ == binary_type:
5937 return value.encode()
5939 def _encode1st(self, state):
5940 self._assert_ready()
5942 if value.__class__ == binary_type:
5943 return len(value), state
5944 return value.encode1st(state)
5946 def _encode2nd(self, writer, state_iter):
5948 if value.__class__ == binary_type:
5949 write_full(writer, value)
5951 value.encode2nd(writer, state_iter)
5953 def _encode_cer(self, writer):
5954 self._assert_ready()
5956 if value.__class__ == binary_type:
5957 write_full(writer, value)
5959 value.encode_cer(writer)
5961 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5963 t, tlen, lv = tag_strip(tlv)
5964 except DecodeError as err:
5965 raise err.__class__(
5967 klass=self.__class__,
5968 decode_path=decode_path,
5972 l, llen, v = len_decode(lv)
5973 except LenIndefForm as err:
5974 if not ctx.get("bered", False):
5975 raise err.__class__(
5977 klass=self.__class__,
5978 decode_path=decode_path,
5981 llen, vlen, v = 1, 0, lv[1:]
5982 sub_offset = offset + tlen + llen
5984 while v[:EOC_LEN].tobytes() != EOC:
5985 chunk, v = Any().decode(
5988 decode_path=decode_path + (str(chunk_i),),
5991 _ctx_immutable=False,
5993 vlen += chunk.tlvlen
5994 sub_offset += chunk.tlvlen
5996 tlvlen = tlen + llen + vlen + EOC_LEN
5997 obj = self.__class__(
5998 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6000 optional=self.optional,
6001 _decoded=(offset, 0, tlvlen),
6004 obj.tag = t.tobytes()
6005 yield decode_path, obj, v[EOC_LEN:]
6007 except DecodeError as err:
6008 raise err.__class__(
6010 klass=self.__class__,
6011 decode_path=decode_path,
6015 raise NotEnoughData(
6016 "encoded length is longer than data",
6017 klass=self.__class__,
6018 decode_path=decode_path,
6021 tlvlen = tlen + llen + l
6022 v, tail = tlv[:tlvlen], v[l:]
6023 obj = self.__class__(
6024 value=None if evgen_mode else v.tobytes(),
6026 optional=self.optional,
6027 _decoded=(offset, 0, tlvlen),
6029 obj.tag = t.tobytes()
6030 yield decode_path, obj, tail
6033 return pp_console_row(next(self.pps()))
6035 def pps(self, decode_path=()):
6039 elif value.__class__ == binary_type:
6045 asn1_type_name=self.asn1_type_name,
6046 obj_name=self.__class__.__name__,
6047 decode_path=decode_path,
6049 blob=self._value if self._value.__class__ == binary_type else None,
6050 optional=self.optional,
6051 default=self == self.default,
6052 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6053 expl=None if self._expl is None else tag_decode(self._expl),
6058 expl_offset=self.expl_offset if self.expled else None,
6059 expl_tlen=self.expl_tlen if self.expled else None,
6060 expl_llen=self.expl_llen if self.expled else None,
6061 expl_vlen=self.expl_vlen if self.expled else None,
6062 expl_lenindef=self.expl_lenindef,
6063 lenindef=self.lenindef,
6066 defined_by, defined = self.defined or (None, None)
6067 if defined_by is not None:
6069 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6071 for pp in self.pps_lenindef(decode_path):
6075 ########################################################################
6076 # ASN.1 constructed types
6077 ########################################################################
6079 def abs_decode_path(decode_path, rel_path):
6080 """Create an absolute decode path from current and relative ones
6082 :param decode_path: current decode path, starting point. Tuple of strings
6083 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6084 If first tuple's element is "/", then treat it as
6085 an absolute path, ignoring ``decode_path`` as
6086 starting point. Also this tuple can contain ".."
6087 elements, stripping the leading element from
6090 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6091 ("foo", "bar", "baz", "whatever")
6092 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6094 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6097 if rel_path[0] == "/":
6099 if rel_path[0] == "..":
6100 return abs_decode_path(decode_path[:-1], rel_path[1:])
6101 return decode_path + rel_path
6104 SequenceState = namedtuple(
6106 BasicState._fields + ("specs", "value",),
6111 class SequenceEncode1stMixing(object):
6112 def _encode1st(self, state):
6114 idx = len(state) - 1
6116 for v in self._values_for_encoding():
6117 l, _ = v.encode1st(state)
6120 return len(self.tag) + len_size(vlen) + vlen, state
6123 class Sequence(SequenceEncode1stMixing, Obj):
6124 """``SEQUENCE`` structure type
6126 You have to make specification of sequence::
6128 class Extension(Sequence):
6130 ("extnID", ObjectIdentifier()),
6131 ("critical", Boolean(default=False)),
6132 ("extnValue", OctetString()),
6135 Then, you can work with it as with dictionary.
6137 >>> ext = Extension()
6138 >>> Extension().specs
6140 ('extnID', OBJECT IDENTIFIER),
6141 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6142 ('extnValue', OCTET STRING),
6144 >>> ext["extnID"] = "1.2.3"
6145 Traceback (most recent call last):
6146 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6147 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6149 You can determine if sequence is ready to be encoded:
6154 Traceback (most recent call last):
6155 pyderasn.ObjNotReady: object is not ready: extnValue
6156 >>> ext["extnValue"] = OctetString(b"foobar")
6160 Value you want to assign, must have the same **type** as in
6161 corresponding specification, but it can have different tags,
6162 optional/default attributes -- they will be taken from specification
6165 class TBSCertificate(Sequence):
6167 ("version", Version(expl=tag_ctxc(0), default="v1")),
6170 >>> tbs = TBSCertificate()
6171 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6173 Assign ``None`` to remove value from sequence.
6175 You can set values in Sequence during its initialization:
6177 >>> AlgorithmIdentifier((
6178 ("algorithm", ObjectIdentifier("1.2.3")),
6179 ("parameters", Any(Null()))
6181 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6183 You can determine if value exists/set in the sequence and take its value:
6185 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6188 OBJECT IDENTIFIER 1.2.3
6190 But pay attention that if value has default, then it won't be (not
6191 in) in the sequence (because ``DEFAULT`` must not be encoded in
6192 DER), but you can read its value:
6194 >>> "critical" in ext, ext["critical"]
6195 (False, BOOLEAN False)
6196 >>> ext["critical"] = Boolean(True)
6197 >>> "critical" in ext, ext["critical"]
6198 (True, BOOLEAN True)
6200 All defaulted values are always optional.
6202 .. _allow_default_values_ctx:
6204 DER prohibits default value encoding and will raise an error if
6205 default value is unexpectedly met during decode.
6206 If :ref:`bered <bered_ctx>` context option is set, then no error
6207 will be raised, but ``bered`` attribute set. You can disable strict
6208 defaulted values existence validation by setting
6209 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6211 All values with DEFAULT specified are decoded atomically in
6212 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6213 SEQUENCE, then it will be yielded as a single element, not
6214 disassembled. That is required for DEFAULT existence check.
6216 Two sequences are equal if they have equal specification (schema),
6217 implicit/explicit tagging and the same values.
6219 __slots__ = ("specs",)
6220 tag_default = tag_encode(form=TagFormConstructed, num=16)
6221 asn1_type_name = "SEQUENCE"
6233 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6235 schema = getattr(self, "schema", ())
6237 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6240 if value is not None:
6241 if issubclass(value.__class__, Sequence):
6242 self._value = value._value
6243 elif hasattr(value, "__iter__"):
6244 for seq_key, seq_value in value:
6245 self[seq_key] = seq_value
6247 raise InvalidValueType((Sequence,))
6248 if default is not None:
6249 if not issubclass(default.__class__, Sequence):
6250 raise InvalidValueType((Sequence,))
6251 default_value = default._value
6252 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6253 default_obj.specs = self.specs
6254 default_obj._value = default_value
6255 self.default = default_obj
6257 self._value = copy(default_obj._value)
6261 for name, spec in iteritems(self.specs):
6262 value = self._value.get(name)
6273 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6275 return any(value.bered for value in itervalues(self._value))
6277 def __getstate__(self):
6278 return SequenceState(
6292 {k: copy(v) for k, v in iteritems(self._value)},
6295 def __setstate__(self, state):
6296 super(Sequence, self).__setstate__(state)
6297 self.specs = state.specs
6298 self._value = state.value
6300 def __eq__(self, their):
6301 if not isinstance(their, self.__class__):
6304 self.specs == their.specs and
6305 self.tag == their.tag and
6306 self._expl == their._expl and
6307 self._value == their._value
6318 return self.__class__(
6321 impl=self.tag if impl is None else impl,
6322 expl=self._expl if expl is None else expl,
6323 default=self.default if default is None else default,
6324 optional=self.optional if optional is None else optional,
6327 def __contains__(self, key):
6328 return key in self._value
6330 def __setitem__(self, key, value):
6331 spec = self.specs.get(key)
6333 raise ObjUnknown(key)
6335 self._value.pop(key, None)
6337 if not isinstance(value, spec.__class__):
6338 raise InvalidValueType((spec.__class__,))
6339 value = spec(value=value)
6340 if spec.default is not None and value == spec.default:
6341 self._value.pop(key, None)
6343 self._value[key] = value
6345 def __getitem__(self, key):
6346 value = self._value.get(key)
6347 if value is not None:
6349 spec = self.specs.get(key)
6351 raise ObjUnknown(key)
6352 if spec.default is not None:
6356 def _values_for_encoding(self):
6357 for name, spec in iteritems(self.specs):
6358 value = self._value.get(name)
6362 raise ObjNotReady(name)
6366 v = b"".join(v.encode() for v in self._values_for_encoding())
6367 return b"".join((self.tag, len_encode(len(v)), v))
6369 def _encode2nd(self, writer, state_iter):
6370 write_full(writer, self.tag + len_encode(next(state_iter)))
6371 for v in self._values_for_encoding():
6372 v.encode2nd(writer, state_iter)
6374 def _encode_cer(self, writer):
6375 write_full(writer, self.tag + LENINDEF)
6376 for v in self._values_for_encoding():
6377 v.encode_cer(writer)
6378 write_full(writer, EOC)
6380 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6382 t, tlen, lv = tag_strip(tlv)
6383 except DecodeError as err:
6384 raise err.__class__(
6386 klass=self.__class__,
6387 decode_path=decode_path,
6392 klass=self.__class__,
6393 decode_path=decode_path,
6396 if tag_only: # pragma: no cover
6400 ctx_bered = ctx.get("bered", False)
6402 l, llen, v = len_decode(lv)
6403 except LenIndefForm as err:
6405 raise err.__class__(
6407 klass=self.__class__,
6408 decode_path=decode_path,
6411 l, llen, v = 0, 1, lv[1:]
6413 except DecodeError as err:
6414 raise err.__class__(
6416 klass=self.__class__,
6417 decode_path=decode_path,
6421 raise NotEnoughData(
6422 "encoded length is longer than data",
6423 klass=self.__class__,
6424 decode_path=decode_path,
6428 v, tail = v[:l], v[l:]
6430 sub_offset = offset + tlen + llen
6433 ctx_allow_default_values = ctx.get("allow_default_values", False)
6434 for name, spec in iteritems(self.specs):
6435 if spec.optional and (
6436 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6440 spec_defaulted = spec.default is not None
6441 sub_decode_path = decode_path + (name,)
6443 if evgen_mode and not spec_defaulted:
6444 for _decode_path, value, v_tail in spec.decode_evgen(
6448 decode_path=sub_decode_path,
6450 _ctx_immutable=False,
6452 yield _decode_path, value, v_tail
6454 _, value, v_tail = next(spec.decode_evgen(
6458 decode_path=sub_decode_path,
6460 _ctx_immutable=False,
6463 except TagMismatch as err:
6464 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6468 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6469 if not evgen_mode and defined is not None:
6470 defined_by, defined_spec = defined
6471 if issubclass(value.__class__, SequenceOf):
6472 for i, _value in enumerate(value):
6473 sub_sub_decode_path = sub_decode_path + (
6475 DecodePathDefBy(defined_by),
6477 defined_value, defined_tail = defined_spec.decode(
6478 memoryview(bytes(_value)),
6480 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6481 if value.expled else (value.tlen + value.llen)
6484 decode_path=sub_sub_decode_path,
6486 _ctx_immutable=False,
6488 if len(defined_tail) > 0:
6491 klass=self.__class__,
6492 decode_path=sub_sub_decode_path,
6495 _value.defined = (defined_by, defined_value)
6497 defined_value, defined_tail = defined_spec.decode(
6498 memoryview(bytes(value)),
6500 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6501 if value.expled else (value.tlen + value.llen)
6504 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6506 _ctx_immutable=False,
6508 if len(defined_tail) > 0:
6511 klass=self.__class__,
6512 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6515 value.defined = (defined_by, defined_value)
6517 value_len = value.fulllen
6519 sub_offset += value_len
6523 yield sub_decode_path, value, v_tail
6524 if value == spec.default:
6525 if ctx_bered or ctx_allow_default_values:
6529 "DEFAULT value met",
6530 klass=self.__class__,
6531 decode_path=sub_decode_path,
6535 values[name] = value
6536 spec_defines = getattr(spec, "defines", ())
6537 if len(spec_defines) == 0:
6538 defines_by_path = ctx.get("defines_by_path", ())
6539 if len(defines_by_path) > 0:
6540 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6541 if spec_defines is not None and len(spec_defines) > 0:
6542 for rel_path, schema in spec_defines:
6543 defined = schema.get(value, None)
6544 if defined is not None:
6545 ctx.setdefault("_defines", []).append((
6546 abs_decode_path(sub_decode_path[:-1], rel_path),
6550 if v[:EOC_LEN].tobytes() != EOC:
6553 klass=self.__class__,
6554 decode_path=decode_path,
6562 klass=self.__class__,
6563 decode_path=decode_path,
6566 obj = self.__class__(
6570 default=self.default,
6571 optional=self.optional,
6572 _decoded=(offset, llen, vlen),
6575 obj.lenindef = lenindef
6576 obj.ber_encoded = ber_encoded
6577 yield decode_path, obj, tail
6580 value = pp_console_row(next(self.pps()))
6582 for name in self.specs:
6583 _value = self._value.get(name)
6586 cols.append("%s: %s" % (name, repr(_value)))
6587 return "%s[%s]" % (value, "; ".join(cols))
6589 def pps(self, decode_path=()):
6592 asn1_type_name=self.asn1_type_name,
6593 obj_name=self.__class__.__name__,
6594 decode_path=decode_path,
6595 optional=self.optional,
6596 default=self == self.default,
6597 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6598 expl=None if self._expl is None else tag_decode(self._expl),
6603 expl_offset=self.expl_offset if self.expled else None,
6604 expl_tlen=self.expl_tlen if self.expled else None,
6605 expl_llen=self.expl_llen if self.expled else None,
6606 expl_vlen=self.expl_vlen if self.expled else None,
6607 expl_lenindef=self.expl_lenindef,
6608 lenindef=self.lenindef,
6609 ber_encoded=self.ber_encoded,
6612 for name in self.specs:
6613 value = self._value.get(name)
6616 yield value.pps(decode_path=decode_path + (name,))
6617 for pp in self.pps_lenindef(decode_path):
6621 class Set(Sequence, SequenceEncode1stMixing):
6622 """``SET`` structure type
6624 Its usage is identical to :py:class:`pyderasn.Sequence`.
6626 .. _allow_unordered_set_ctx:
6628 DER prohibits unordered values encoding and will raise an error
6629 during decode. If :ref:`bered <bered_ctx>` context option is set,
6630 then no error will occur. Also you can disable strict values
6631 ordering check by setting ``"allow_unordered_set": True``
6632 :ref:`context <ctx>` option.
6635 tag_default = tag_encode(form=TagFormConstructed, num=17)
6636 asn1_type_name = "SET"
6638 def _values_for_encoding(self):
6640 super(Set, self)._values_for_encoding(),
6641 key=attrgetter("tag_order"),
6644 def _encode_cer(self, writer):
6645 write_full(writer, self.tag + LENINDEF)
6647 super(Set, self)._values_for_encoding(),
6648 key=attrgetter("tag_order_cer"),
6650 v.encode_cer(writer)
6651 write_full(writer, EOC)
6653 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6655 t, tlen, lv = tag_strip(tlv)
6656 except DecodeError as err:
6657 raise err.__class__(
6659 klass=self.__class__,
6660 decode_path=decode_path,
6665 klass=self.__class__,
6666 decode_path=decode_path,
6673 ctx_bered = ctx.get("bered", False)
6675 l, llen, v = len_decode(lv)
6676 except LenIndefForm as err:
6678 raise err.__class__(
6680 klass=self.__class__,
6681 decode_path=decode_path,
6684 l, llen, v = 0, 1, lv[1:]
6686 except DecodeError as err:
6687 raise err.__class__(
6689 klass=self.__class__,
6690 decode_path=decode_path,
6694 raise NotEnoughData(
6695 "encoded length is longer than data",
6696 klass=self.__class__,
6700 v, tail = v[:l], v[l:]
6702 sub_offset = offset + tlen + llen
6705 ctx_allow_default_values = ctx.get("allow_default_values", False)
6706 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6707 tag_order_prev = (0, 0)
6708 _specs_items = copy(self.specs)
6711 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6713 for name, spec in iteritems(_specs_items):
6714 sub_decode_path = decode_path + (name,)
6720 decode_path=sub_decode_path,
6723 _ctx_immutable=False,
6730 klass=self.__class__,
6731 decode_path=decode_path,
6734 spec_defaulted = spec.default is not None
6735 if evgen_mode and not spec_defaulted:
6736 for _decode_path, value, v_tail in spec.decode_evgen(
6740 decode_path=sub_decode_path,
6742 _ctx_immutable=False,
6744 yield _decode_path, value, v_tail
6746 _, value, v_tail = next(spec.decode_evgen(
6750 decode_path=sub_decode_path,
6752 _ctx_immutable=False,
6755 value_tag_order = value.tag_order
6756 value_len = value.fulllen
6757 if tag_order_prev >= value_tag_order:
6758 if ctx_bered or ctx_allow_unordered_set:
6762 "unordered " + self.asn1_type_name,
6763 klass=self.__class__,
6764 decode_path=sub_decode_path,
6769 yield sub_decode_path, value, v_tail
6770 if value != spec.default:
6772 elif ctx_bered or ctx_allow_default_values:
6776 "DEFAULT value met",
6777 klass=self.__class__,
6778 decode_path=sub_decode_path,
6781 values[name] = value
6782 del _specs_items[name]
6783 tag_order_prev = value_tag_order
6784 sub_offset += value_len
6788 obj = self.__class__(
6792 default=self.default,
6793 optional=self.optional,
6794 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6797 if v[:EOC_LEN].tobytes() != EOC:
6800 klass=self.__class__,
6801 decode_path=decode_path,
6806 for name, spec in iteritems(self.specs):
6807 if name not in values and not spec.optional:
6809 "%s value is not ready" % name,
6810 klass=self.__class__,
6811 decode_path=decode_path,
6816 obj.ber_encoded = ber_encoded
6817 yield decode_path, obj, tail
6820 SequenceOfState = namedtuple(
6822 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6827 class SequenceOf(SequenceEncode1stMixing, Obj):
6828 """``SEQUENCE OF`` sequence type
6830 For that kind of type you must specify the object it will carry on
6831 (bounds are for example here, not required)::
6833 class Ints(SequenceOf):
6838 >>> ints.append(Integer(123))
6839 >>> ints.append(Integer(234))
6841 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6842 >>> [int(i) for i in ints]
6844 >>> ints.append(Integer(345))
6845 Traceback (most recent call last):
6846 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6849 >>> ints[1] = Integer(345)
6851 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6853 You can initialize sequence with preinitialized values:
6855 >>> ints = Ints([Integer(123), Integer(234)])
6857 Also you can use iterator as a value:
6859 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6861 And it won't be iterated until encoding process. Pay attention that
6862 bounds and required schema checks are done only during the encoding
6863 process in that case! After encode was called, then value is zeroed
6864 back to empty list and you have to set it again. That mode is useful
6865 mainly with CER encoding mode, where all objects from the iterable
6866 will be streamed to the buffer, without copying all of them to
6869 __slots__ = ("spec", "_bound_min", "_bound_max")
6870 tag_default = tag_encode(form=TagFormConstructed, num=16)
6871 asn1_type_name = "SEQUENCE OF"
6884 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6886 schema = getattr(self, "schema", None)
6888 raise ValueError("schema must be specified")
6890 self._bound_min, self._bound_max = getattr(
6894 ) if bounds is None else bounds
6896 if value is not None:
6897 self._value = self._value_sanitize(value)
6898 if default is not None:
6899 default_value = self._value_sanitize(default)
6900 default_obj = self.__class__(
6905 default_obj._value = default_value
6906 self.default = default_obj
6908 self._value = copy(default_obj._value)
6910 def _value_sanitize(self, value):
6912 if issubclass(value.__class__, SequenceOf):
6913 value = value._value
6914 elif hasattr(value, NEXT_ATTR_NAME):
6916 elif hasattr(value, "__iter__"):
6919 raise InvalidValueType((self.__class__, iter, "iterator"))
6921 if not self._bound_min <= len(value) <= self._bound_max:
6922 raise BoundsError(self._bound_min, len(value), self._bound_max)
6923 class_expected = self.spec.__class__
6925 if not isinstance(v, class_expected):
6926 raise InvalidValueType((class_expected,))
6931 if hasattr(self._value, NEXT_ATTR_NAME):
6933 if self._bound_min > 0 and len(self._value) == 0:
6935 return all(v.ready for v in self._value)
6939 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6941 return any(v.bered for v in self._value)
6943 def __getstate__(self):
6944 if hasattr(self._value, NEXT_ATTR_NAME):
6945 raise ValueError("can not pickle SequenceOf with iterator")
6946 return SequenceOfState(
6960 [copy(v) for v in self._value],
6965 def __setstate__(self, state):
6966 super(SequenceOf, self).__setstate__(state)
6967 self.spec = state.spec
6968 self._value = state.value
6969 self._bound_min = state.bound_min
6970 self._bound_max = state.bound_max
6972 def __eq__(self, their):
6973 if isinstance(their, self.__class__):
6975 self.spec == their.spec and
6976 self.tag == their.tag and
6977 self._expl == their._expl and
6978 self._value == their._value
6980 if hasattr(their, "__iter__"):
6981 return self._value == list(their)
6993 return self.__class__(
6997 (self._bound_min, self._bound_max)
6998 if bounds is None else bounds
7000 impl=self.tag if impl is None else impl,
7001 expl=self._expl if expl is None else expl,
7002 default=self.default if default is None else default,
7003 optional=self.optional if optional is None else optional,
7006 def __contains__(self, key):
7007 return key in self._value
7009 def append(self, value):
7010 if not isinstance(value, self.spec.__class__):
7011 raise InvalidValueType((self.spec.__class__,))
7012 if len(self._value) + 1 > self._bound_max:
7015 len(self._value) + 1,
7018 self._value.append(value)
7021 return iter(self._value)
7024 return len(self._value)
7026 def __setitem__(self, key, value):
7027 if not isinstance(value, self.spec.__class__):
7028 raise InvalidValueType((self.spec.__class__,))
7029 self._value[key] = self.spec(value=value)
7031 def __getitem__(self, key):
7032 return self._value[key]
7034 def _values_for_encoding(self):
7035 return iter(self._value)
7038 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7041 values_append = values.append
7042 class_expected = self.spec.__class__
7043 values_for_encoding = self._values_for_encoding()
7045 for v in values_for_encoding:
7046 if not isinstance(v, class_expected):
7047 raise InvalidValueType((class_expected,))
7048 values_append(v.encode())
7049 if not self._bound_min <= len(values) <= self._bound_max:
7050 raise BoundsError(self._bound_min, len(values), self._bound_max)
7051 value = b"".join(values)
7053 value = b"".join(v.encode() for v in self._values_for_encoding())
7054 return b"".join((self.tag, len_encode(len(value)), value))
7056 def _encode1st(self, state):
7057 state = super(SequenceOf, self)._encode1st(state)
7058 if hasattr(self._value, NEXT_ATTR_NAME):
7062 def _encode2nd(self, writer, state_iter):
7063 write_full(writer, self.tag + len_encode(next(state_iter)))
7064 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7067 class_expected = self.spec.__class__
7068 values_for_encoding = self._values_for_encoding()
7070 for v in values_for_encoding:
7071 if not isinstance(v, class_expected):
7072 raise InvalidValueType((class_expected,))
7073 v.encode2nd(writer, state_iter)
7075 if not self._bound_min <= values_count <= self._bound_max:
7076 raise BoundsError(self._bound_min, values_count, self._bound_max)
7078 for v in self._values_for_encoding():
7079 v.encode2nd(writer, state_iter)
7081 def _encode_cer(self, writer):
7082 write_full(writer, self.tag + LENINDEF)
7083 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7085 class_expected = self.spec.__class__
7087 values_for_encoding = self._values_for_encoding()
7089 for v in values_for_encoding:
7090 if not isinstance(v, class_expected):
7091 raise InvalidValueType((class_expected,))
7092 v.encode_cer(writer)
7094 if not self._bound_min <= values_count <= self._bound_max:
7095 raise BoundsError(self._bound_min, values_count, self._bound_max)
7097 for v in self._values_for_encoding():
7098 v.encode_cer(writer)
7099 write_full(writer, EOC)
7109 ordering_check=False,
7112 t, tlen, lv = tag_strip(tlv)
7113 except DecodeError as err:
7114 raise err.__class__(
7116 klass=self.__class__,
7117 decode_path=decode_path,
7122 klass=self.__class__,
7123 decode_path=decode_path,
7130 ctx_bered = ctx.get("bered", False)
7132 l, llen, v = len_decode(lv)
7133 except LenIndefForm as err:
7135 raise err.__class__(
7137 klass=self.__class__,
7138 decode_path=decode_path,
7141 l, llen, v = 0, 1, lv[1:]
7143 except DecodeError as err:
7144 raise err.__class__(
7146 klass=self.__class__,
7147 decode_path=decode_path,
7151 raise NotEnoughData(
7152 "encoded length is longer than data",
7153 klass=self.__class__,
7154 decode_path=decode_path,
7158 v, tail = v[:l], v[l:]
7160 sub_offset = offset + tlen + llen
7163 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7164 value_prev = memoryview(v[:0])
7168 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7170 sub_decode_path = decode_path + (str(_value_count),)
7172 for _decode_path, value, v_tail in spec.decode_evgen(
7176 decode_path=sub_decode_path,
7178 _ctx_immutable=False,
7180 yield _decode_path, value, v_tail
7182 _, value, v_tail = next(spec.decode_evgen(
7186 decode_path=sub_decode_path,
7188 _ctx_immutable=False,
7191 value_len = value.fulllen
7193 if value_prev.tobytes() > v[:value_len].tobytes():
7194 if ctx_bered or ctx_allow_unordered_set:
7198 "unordered " + self.asn1_type_name,
7199 klass=self.__class__,
7200 decode_path=sub_decode_path,
7203 value_prev = v[:value_len]
7206 _value.append(value)
7207 sub_offset += value_len
7210 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7212 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7213 klass=self.__class__,
7214 decode_path=decode_path,
7218 obj = self.__class__(
7219 value=None if evgen_mode else _value,
7221 bounds=(self._bound_min, self._bound_max),
7224 default=self.default,
7225 optional=self.optional,
7226 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7228 except BoundsError as err:
7231 klass=self.__class__,
7232 decode_path=decode_path,
7236 if v[:EOC_LEN].tobytes() != EOC:
7239 klass=self.__class__,
7240 decode_path=decode_path,
7245 obj.ber_encoded = ber_encoded
7246 yield decode_path, obj, tail
7250 pp_console_row(next(self.pps())),
7251 ", ".join(repr(v) for v in self._value),
7254 def pps(self, decode_path=()):
7257 asn1_type_name=self.asn1_type_name,
7258 obj_name=self.__class__.__name__,
7259 decode_path=decode_path,
7260 optional=self.optional,
7261 default=self == self.default,
7262 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7263 expl=None if self._expl is None else tag_decode(self._expl),
7268 expl_offset=self.expl_offset if self.expled else None,
7269 expl_tlen=self.expl_tlen if self.expled else None,
7270 expl_llen=self.expl_llen if self.expled else None,
7271 expl_vlen=self.expl_vlen if self.expled else None,
7272 expl_lenindef=self.expl_lenindef,
7273 lenindef=self.lenindef,
7274 ber_encoded=self.ber_encoded,
7277 for i, value in enumerate(self._value):
7278 yield value.pps(decode_path=decode_path + (str(i),))
7279 for pp in self.pps_lenindef(decode_path):
7283 class SetOf(SequenceOf):
7284 """``SET OF`` sequence type
7286 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7289 tag_default = tag_encode(form=TagFormConstructed, num=17)
7290 asn1_type_name = "SET OF"
7292 def _value_sanitize(self, value):
7293 value = super(SetOf, self)._value_sanitize(value)
7294 if hasattr(value, NEXT_ATTR_NAME):
7296 "SetOf does not support iterator values, as no sense in them"
7301 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7302 return b"".join((self.tag, len_encode(len(v)), v))
7304 def _encode2nd(self, writer, state_iter):
7305 write_full(writer, self.tag + len_encode(next(state_iter)))
7307 for v in self._values_for_encoding():
7309 v.encode2nd(buf.write, state_iter)
7310 values.append(buf.getvalue())
7313 write_full(writer, v)
7315 def _encode_cer(self, writer):
7316 write_full(writer, self.tag + LENINDEF)
7317 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7318 write_full(writer, v)
7319 write_full(writer, EOC)
7321 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7322 return super(SetOf, self)._decode(
7329 ordering_check=True,
7333 def obj_by_path(pypath): # pragma: no cover
7334 """Import object specified as string Python path
7336 Modules must be separated from classes/functions with ``:``.
7338 >>> obj_by_path("foo.bar:Baz")
7339 <class 'foo.bar.Baz'>
7340 >>> obj_by_path("foo.bar:Baz.boo")
7341 <classmethod 'foo.bar.Baz.boo'>
7343 mod, objs = pypath.rsplit(":", 1)
7344 from importlib import import_module
7345 obj = import_module(mod)
7346 for obj_name in objs.split("."):
7347 obj = getattr(obj, obj_name)
7351 def generic_decoder(): # pragma: no cover
7352 # All of this below is a big hack with self references
7353 choice = PrimitiveTypes()
7354 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7355 choice.specs["SetOf"] = SetOf(schema=choice)
7356 for i in six_xrange(31):
7357 choice.specs["SequenceOf%d" % i] = SequenceOf(
7361 choice.specs["Any"] = Any()
7363 # Class name equals to type name, to omit it from output
7364 class SEQUENCEOF(SequenceOf):
7372 with_decode_path=False,
7373 decode_path_only=(),
7376 def _pprint_pps(pps):
7378 if hasattr(pp, "_fields"):
7380 decode_path_only != () and
7381 pp.decode_path[:len(decode_path_only)] != decode_path_only
7384 if pp.asn1_type_name == Choice.asn1_type_name:
7386 pp_kwargs = pp._asdict()
7387 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7388 pp = _pp(**pp_kwargs)
7389 yield pp_console_row(
7394 with_colours=with_colours,
7395 with_decode_path=with_decode_path,
7396 decode_path_len_decrease=len(decode_path_only),
7398 for row in pp_console_blob(
7400 decode_path_len_decrease=len(decode_path_only),
7404 for row in _pprint_pps(pp):
7406 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7407 return SEQUENCEOF(), pprint_any
7410 def ascii_visualize(ba):
7411 """Output only ASCII printable characters, like in hexdump -C
7413 Example output for given binary string (right part)::
7415 92 2b 39 20 65 91 e6 8e 95 93 1a 58 df 02 78 ea |.+9 e......X..x.|
7418 return "".join((chr(b) if 0x20 <= b <= 0x7E else ".") for b in ba)
7422 """Generate ``hexdump -C`` like output
7426 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7427 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7429 Result of that function is a generator of lines, where each line is
7434 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7435 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7436 " |i..?.....G...k.j|"]
7440 hexed = hexenc(raw).upper()
7441 addr, cols = 0, ["%08x " % 0]
7442 for i in six_xrange(0, len(hexed), 2):
7443 if i != 0 and i // 2 % 8 == 0:
7445 if i != 0 and i // 2 % 16 == 0:
7446 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:addr + 16])))
7449 cols = ["%08x " % addr]
7450 cols.append(" " + hexed[i:i + 2])
7452 cols.append(" |%s|" % ascii_visualize(bytearray(raw[addr:])))
7456 def browse(raw, obj, oid_maps=()):
7457 """Interactive browser
7459 :param bytes raw: binary data you decoded
7460 :param obj: decoded :py:class:`pyderasn.Obj`
7461 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7462 Its human readable form is printed when OID is met
7464 .. note:: `urwid <http://urwid.org/>`__ dependency required
7466 This browser is an interactive terminal application for browsing
7467 structures of your decoded ASN.1 objects. You can quit it with **q**
7468 key. It consists of three windows:
7471 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7472 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7473 **Left** key goes to constructed element above. **Plus**/**Minus**
7474 keys collapse/uncollapse constructed elements. **Space** toggles it
7476 window with various information about element. You can scroll it
7477 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7479 window with raw data hexdump and highlighted current element's
7480 contents. It automatically focuses on element's data. You can
7481 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7482 speed) keys. If element has explicit tag, then it also will be
7483 highlighted with different colour
7485 Window's header contains current decode path and progress bars with
7486 position in *info* and *hexdump* windows.
7488 If you press **d**, then current element will be saved in the
7489 current directory under its decode path name (adding ".0", ".1", etc
7490 suffix if such file already exists). **D** will save it with explicit tag.
7492 You can also invoke it with ``--browse`` command line argument.
7494 from copy import deepcopy
7495 from os.path import exists as path_exists
7498 class TW(urwid.TreeWidget):
7499 def __init__(self, state, *args, **kwargs):
7501 self.scrolled = {"info": False, "hexdump": False}
7502 super(TW, self).__init__(*args, **kwargs)
7505 pp = self.get_node().get_value()
7506 constructed = len(pp) > 1
7507 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7509 def _state_update(self):
7510 pp, _ = self._get_pp()
7511 self.state["decode_path"].set_text(
7512 ":".join(str(p) for p in pp.decode_path)
7514 lines = deepcopy(self.state["hexed"])
7516 def attr_set(i, attr):
7517 line = lines[i // 16]
7518 idx = 1 + (i - 16 * (i // 16))
7519 line[idx] = (attr, line[idx])
7521 if pp.expl_offset is not None:
7522 for i in six_xrange(
7524 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7526 attr_set(i, "select-expl")
7527 for i in six_xrange(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7528 attr_set(i, "select-value")
7529 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7530 self.state["hexdump"].set_focus(pp.offset // 16)
7531 self.state["hexdump"].set_focus_valign("middle")
7532 self.state["hexdump_bar"].set_completion(
7533 (100 * pp.offset // 16) //
7534 len(self.state["hexdump"]._body.positions())
7538 [("header", "Name: "), pp.obj_name],
7539 [("header", "Type: "), pp.asn1_type_name],
7540 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7541 [("header", "[TLV]len: "), "%d/%d/%d" % (
7542 pp.tlen, pp.llen, pp.vlen,
7544 [("header", "TLVlen: "), "%d" % sum((
7545 pp.tlen, pp.llen, pp.vlen,
7547 [("header", "Slice: "), "[%d:%d]" % (
7548 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7552 lines.append([("warning", "LENINDEF")])
7554 lines.append([("warning", "BER encoded")])
7556 lines.append([("warning", "BERed")])
7557 if pp.expl is not None:
7558 lines.append([("header", "EXPLICIT")])
7559 klass, _, num = pp.expl
7560 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7561 if pp.expl_offset is not None:
7562 lines.append([" Offset: %d" % pp.expl_offset])
7563 lines.append([" [TLV]len: %d/%d/%d" % (
7564 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7566 lines.append([" TLVlen: %d" % sum((
7567 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7569 lines.append([" Slice: [%d:%d]" % (
7571 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7573 if pp.impl is not None:
7574 klass, _, num = pp.impl
7576 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7579 lines.append(["OPTIONAL"])
7581 lines.append(["DEFAULT"])
7582 if len(pp.decode_path) > 0:
7583 ent = pp.decode_path[-1]
7584 if isinstance(ent, DecodePathDefBy):
7586 value = str(ent.defined_by)
7587 oid_name = find_oid_name(
7588 ent.defined_by.asn1_type_name, oid_maps, value,
7590 lines.append([("header", "DEFINED BY: "), "%s" % (
7591 value if oid_name is None
7592 else "%s (%s)" % (oid_name, value)
7595 if pp.value is not None:
7596 lines.append([("header", "Value: "), pp.value])
7598 len(oid_maps) > 0 and
7599 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7601 for oid_map in oid_maps:
7602 oid_name = oid_map.get(pp.value)
7603 if oid_name is not None:
7604 lines.append([("header", "Human: "), oid_name])
7606 if pp.asn1_type_name == Integer.asn1_type_name:
7608 ("header", "Decimal: "), "%d" % int(pp.obj),
7611 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7613 if pp.blob.__class__ == binary_type:
7614 blob = hexenc(pp.blob).upper()
7615 for i in six_xrange(0, len(blob), 32):
7616 lines.append([colonize_hex(blob[i:i + 32])])
7617 elif pp.blob.__class__ == tuple:
7618 lines.append([", ".join(pp.blob)])
7619 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7620 self.state["info_bar"].set_completion(0)
7622 def selectable(self):
7623 if self.state["widget_current"] != self:
7624 self.state["widget_current"] = self
7625 self.scrolled["info"] = False
7626 self.scrolled["hexdump"] = False
7627 self._state_update()
7628 return super(TW, self).selectable()
7630 def get_display_text(self):
7631 pp, constructed = self._get_pp()
7632 style = "constructed" if constructed else ""
7633 if len(pp.decode_path) == 0:
7634 return (style, pp.obj_name)
7635 if pp.asn1_type_name == "EOC":
7636 return ("eoc", "EOC")
7637 ent = pp.decode_path[-1]
7638 if isinstance(ent, DecodePathDefBy):
7639 value = str(ent.defined_by)
7640 oid_name = find_oid_name(
7641 ent.defined_by.asn1_type_name, oid_maps, value,
7643 return ("defby", "DEFBY:" + (
7644 value if oid_name is None else oid_name
7648 def _scroll(self, what, step):
7649 self.state[what]._invalidate()
7650 pos = self.state[what].focus_position
7651 if not self.scrolled[what]:
7652 self.scrolled[what] = True
7654 pos = max(0, pos + step)
7655 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7656 self.state[what].set_focus(pos)
7657 self.state[what].set_focus_valign("top")
7658 self.state[what + "_bar"].set_completion(
7659 (100 * pos) // len(self.state[what]._body.positions())
7662 def keypress(self, size, key):
7664 raise urwid.ExitMainLoop()
7667 self.expanded = not self.expanded
7668 self.update_expanded_icon()
7671 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7672 if key in hexdump_steps:
7673 self._scroll("hexdump", hexdump_steps[key])
7676 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7677 if key in info_steps:
7678 self._scroll("info", info_steps[key])
7681 if key in ("d", "D"):
7682 pp, _ = self._get_pp()
7683 dp = ":".join(str(p) for p in pp.decode_path)
7684 dp = dp.replace(" ", "_")
7687 if key == "d" or pp.expl_offset is None:
7688 data = self.state["raw"][pp.offset:(
7689 pp.offset + pp.tlen + pp.llen + pp.vlen
7692 data = self.state["raw"][pp.expl_offset:(
7693 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7697 def duplicate_path(dp, ctr):
7700 return "%s.%d" % (dp, ctr)
7703 if not path_exists(duplicate_path(dp, ctr)):
7706 dp = duplicate_path(dp, ctr)
7707 with open(dp, "wb") as fd:
7709 self.state["decode_path"].set_text(
7710 ("warning", "Saved to: " + dp)
7713 return super(TW, self).keypress(size, key)
7715 class PN(urwid.ParentNode):
7716 def __init__(self, state, value, *args, **kwargs):
7718 if not hasattr(value, "_fields"):
7720 super(PN, self).__init__(value, *args, **kwargs)
7722 def load_widget(self):
7723 return TW(self.state, self)
7725 def load_child_keys(self):
7726 value = self.get_value()
7727 if hasattr(value, "_fields"):
7729 return range(len(value[1:]))
7731 def load_child_node(self, key):
7734 self.get_value()[key + 1],
7737 depth=self.get_depth() + 1,
7740 class LabeledPG(urwid.ProgressBar):
7741 def __init__(self, label, *args, **kwargs):
7743 super(LabeledPG, self).__init__(*args, **kwargs)
7746 return "%s: %s" % (self.label, super(LabeledPG, self).get_text())
7748 WinHexdump = urwid.ListBox([urwid.Text("")])
7749 WinInfo = urwid.ListBox([urwid.Text("")])
7750 WinDecodePath = urwid.Text("", "center")
7751 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7752 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7753 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7756 "hexed": list(hexdump(raw)),
7757 "widget_current": None,
7759 "info_bar": WinInfoBar,
7760 "hexdump": WinHexdump,
7761 "hexdump_bar": WinHexdumpBar,
7762 "decode_path": WinDecodePath,
7766 help_text = " ".join((
7768 "space:(un)collapse",
7769 "(pg)up/down/home/end:nav",
7770 "jkJK:hexdump hlHL:info",
7776 ("weight", 1, WinTree),
7777 ("weight", 2, urwid.Pile([
7778 urwid.LineBox(WinInfo),
7779 urwid.LineBox(WinHexdump),
7782 header=urwid.Columns([
7783 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7784 ("weight", 1, WinInfoBar),
7785 ("weight", 1, WinHexdumpBar),
7787 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7790 ("header", "bold", ""),
7791 ("constructed", "bold", ""),
7792 ("help", "light magenta", ""),
7793 ("warning", "light red", ""),
7794 ("defby", "light red", ""),
7795 ("eoc", "dark red", ""),
7796 ("select-value", "light green", ""),
7797 ("select-expl", "light red", ""),
7798 ("pg-normal", "", "light blue"),
7799 ("pg-complete", "black", "yellow"),
7804 def main(): # pragma: no cover
7806 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7807 parser.add_argument(
7811 help="Skip that number of bytes from the beginning",
7813 parser.add_argument(
7815 help="Python paths to dictionary with OIDs, comma separated",
7817 parser.add_argument(
7819 help="Python path to schema definition to use",
7821 parser.add_argument(
7822 "--defines-by-path",
7823 help="Python path to decoder's defines_by_path",
7825 parser.add_argument(
7827 action="store_true",
7828 help="Disallow BER encoding",
7830 parser.add_argument(
7831 "--print-decode-path",
7832 action="store_true",
7833 help="Print decode paths",
7835 parser.add_argument(
7836 "--decode-path-only",
7837 help="Print only specified decode path",
7839 parser.add_argument(
7841 action="store_true",
7842 help="Allow explicit tag out-of-bound",
7844 parser.add_argument(
7846 action="store_true",
7847 help="Turn on event generation mode",
7849 parser.add_argument(
7851 action="store_true",
7852 help="Start ASN.1 browser",
7854 parser.add_argument(
7856 type=argparse.FileType("rb"),
7857 help="Path to BER/CER/DER file you want to decode",
7859 args = parser.parse_args()
7861 args.RAWFile.seek(args.skip)
7862 raw = memoryview(args.RAWFile.read())
7863 args.RAWFile.close()
7865 raw = file_mmaped(args.RAWFile)[args.skip:]
7867 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7868 if args.oids else ()
7870 from functools import partial
7872 schema = obj_by_path(args.schema)
7873 pprinter = partial(pprint, big_blobs=True)
7875 schema, pprinter = generic_decoder()
7877 "bered": not args.nobered,
7878 "allow_expl_oob": args.allow_expl_oob,
7880 if args.defines_by_path is not None:
7881 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7883 obj, _ = schema().decode(raw, ctx=ctx)
7884 browse(raw, obj, oid_maps)
7885 from sys import exit as sys_exit
7887 from os import environ
7891 with_colours=environ.get("NO_COLOR") is None,
7892 with_decode_path=args.print_decode_path,
7894 () if args.decode_path_only is None else
7895 tuple(args.decode_path_only.split(":"))
7899 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7900 print(pprinter(obj, decode_path=decode_path))
7902 obj, tail = schema().decode(raw, ctx=ctx)
7903 print(pprinter(obj))
7905 print("\nTrailing data: %s" % hexenc(tail))
7908 if __name__ == "__main__":
7909 from pyderasn import *