3 # cython: language_level=3
4 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
5 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, version 3 of the License.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Lesser General Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Python ASN.1 DER/BER codec with abstract structures
20 This library allows you to marshal various structures in ASN.1 DER
21 format, unmarshal them in BER/CER/DER ones.
25 >>> Integer().decod(raw) == i
28 There are primitive types, holding single values
29 (:py:class:`pyderasn.BitString`,
30 :py:class:`pyderasn.Boolean`,
31 :py:class:`pyderasn.Enumerated`,
32 :py:class:`pyderasn.GeneralizedTime`,
33 :py:class:`pyderasn.Integer`,
34 :py:class:`pyderasn.Null`,
35 :py:class:`pyderasn.ObjectIdentifier`,
36 :py:class:`pyderasn.OctetString`,
37 :py:class:`pyderasn.UTCTime`,
38 :py:class:`various strings <pyderasn.CommonString>`
39 (:py:class:`pyderasn.BMPString`,
40 :py:class:`pyderasn.GeneralString`,
41 :py:class:`pyderasn.GraphicString`,
42 :py:class:`pyderasn.IA5String`,
43 :py:class:`pyderasn.ISO646String`,
44 :py:class:`pyderasn.NumericString`,
45 :py:class:`pyderasn.PrintableString`,
46 :py:class:`pyderasn.T61String`,
47 :py:class:`pyderasn.TeletexString`,
48 :py:class:`pyderasn.UniversalString`,
49 :py:class:`pyderasn.UTF8String`,
50 :py:class:`pyderasn.VideotexString`,
51 :py:class:`pyderasn.VisibleString`)),
52 constructed types, holding multiple primitive types
53 (:py:class:`pyderasn.Sequence`,
54 :py:class:`pyderasn.SequenceOf`,
55 :py:class:`pyderasn.Set`,
56 :py:class:`pyderasn.SetOf`),
57 and special types like
58 :py:class:`pyderasn.Any` and
59 :py:class:`pyderasn.Choice`.
67 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
68 the default tag used during coding process. You can override it with
69 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
70 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
71 argument or ``expl`` class attribute). Both arguments take raw binary
72 string, containing that tag. You can **not** set implicit and explicit
75 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
76 functions, allowing you to easily create ``CONTEXT``
77 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
82 EXPLICIT tags always have **constructed** tag. PyDERASN does not
83 explicitly check correctness of schema input here.
87 Implicit tags have **primitive** (``tag_ctxp``) encoding for
92 >>> Integer(impl=tag_ctxp(1))
94 >>> Integer(expl=tag_ctxc(2))
97 Implicit tag is not explicitly shown.
99 Two objects of the same type, but with different implicit/explicit tags
102 You can get object's effective tag (either default or implicited) through
103 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 >>> tag_decode(tag_ctxc(123))
108 >>> klass, form, num = tag_decode(tag_ctxc(123))
109 >>> klass == TagClassContext
111 >>> form == TagFormConstructed
114 To determine if object has explicit tag, use ``expled`` boolean property
115 and ``expl_tag`` property, returning explicit tag's value.
120 Many objects in sequences could be ``OPTIONAL`` and could have
121 ``DEFAULT`` value. You can specify that object's property using
122 corresponding keyword arguments.
124 >>> Integer(optional=True, default=123)
125 INTEGER 123 OPTIONAL DEFAULT
127 Those specifications do not play any role in primitive value encoding,
128 but are taken into account when dealing with sequences holding them. For
129 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 class Version(Integer):
138 class TBSCertificate(Sequence):
140 ("version", Version(expl=tag_ctxc(0), default="v1")),
143 When default argument is used and value is not specified, then it equals
151 Some objects give ability to set value size constraints. This is either
152 possible integer value, or allowed length of various strings and
153 sequences. Constraints are set in the following way::
158 And values satisfaction is checked as: ``MIN <= X <= MAX``.
160 For simplicity you can also set bounds the following way::
162 bounded_x = X(bounds=(MIN, MAX))
164 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
170 All objects have ``ready`` boolean property, that tells if object is
171 ready to be encoded. If that kind of action is performed on unready
172 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
174 All objects are friendly to ``copy.copy()`` and copied objects can be
177 Also all objects can be safely ``pickle``-d, but pay attention that
178 pickling among different PyDERASN versions is prohibited.
185 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
186 ``offset`` optional argument could be used to set initial object's
187 offset in the binary data, for convenience. It returns decoded object
188 and remaining unmarshalled data (tail). Internally all work is done on
189 ``memoryview(data)``, and you can leave returning tail as a memoryview,
190 by specifying ``leavemm=True`` argument.
192 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
193 immediately checks and raises if there is non-empty tail.
195 When object is decoded, ``decoded`` property is true and you can safely
196 use following properties:
198 * ``offset`` -- position including initial offset where object's tag starts
199 * ``tlen`` -- length of object's tag
200 * ``llen`` -- length of object's length value
201 * ``vlen`` -- length of object's value
202 * ``tlvlen`` -- length of the whole object
204 Pay attention that those values do **not** include anything related to
205 explicit tag. If you want to know information about it, then use:
207 * ``expled`` -- to know if explicit tag is set
208 * ``expl_offset`` (it is lesser than ``offset``)
211 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
212 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
214 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
224 You can specify so called context keyword argument during
225 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
226 various options governing decoding process.
228 Currently available context options:
230 * :ref:`allow_default_values <allow_default_values_ctx>`
231 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
232 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
233 * :ref:`bered <bered_ctx>`
234 * :ref:`defines_by_path <defines_by_path_ctx>`
235 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
242 All objects have ``pps()`` method, that is a generator of
243 :py:class:`pyderasn.PP` namedtuple, holding various raw information
244 about the object. If ``pps`` is called on sequences, then all underlying
245 ``PP`` will be yielded.
247 You can use :py:func:`pyderasn.pp_console_row` function, converting
248 those ``PP`` to human readable string. Actually exactly it is used for
249 all object ``repr``. But it is easy to write custom formatters.
251 >>> from pyderasn import pprint
252 >>> encoded = Integer(-12345).encode()
253 >>> obj, tail = Integer().decode(encoded)
254 >>> print(pprint(obj))
255 0 [1,1, 2] INTEGER -12345
259 Example certificate::
261 >>> print(pprint(crt))
262 0 [1,3,1604] Certificate SEQUENCE
263 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
264 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
265 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
266 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
267 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
268 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
270 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
271 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
272 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
273 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
274 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
275 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
276 . . . . . . . 13:02:45:53
278 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
279 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
280 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
282 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
283 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
284 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
289 Let's parse that output, human::
291 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
292 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
293 0 1 2 3 4 5 6 7 8 9 10 11
297 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
303 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
309 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
314 Offset of the object, where its DER/BER encoding begins.
315 Pay attention that it does **not** include explicit tag.
317 If explicit tag exists, then this is its length (tag + encoded length).
319 Length of object's tag. For example CHOICE does not have its own tag,
322 Length of encoded length.
324 Length of encoded value.
326 Visual indentation to show the depth of object in the hierarchy.
328 Object's name inside SEQUENCE/CHOICE.
330 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
331 here. "IMPLICIT" is omitted.
333 Object's class name, if set. Omitted if it is just an ordinary simple
334 value (like with ``algorithm`` in example above).
338 Object's value, if set. Can consist of multiple words (like OCTET/BIT
339 STRINGs above). We see ``v3`` value in Version, because it is named.
340 ``rdnSequence`` is the choice of CHOICE type.
342 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
343 default one, specified in the schema.
345 Shows does object contains any kind of BER encoded data (possibly
346 Sequence holding BER-encoded underlying value).
348 Only applicable to BER encoded data. Indefinite length encoding mark.
350 Only applicable to BER encoded data. If object has BER-specific
351 encoding, then ``BER`` will be shown. It does not depend on indefinite
352 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
353 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
361 ASN.1 structures often have ANY and OCTET STRING fields, that are
362 DEFINED BY some previously met ObjectIdentifier. This library provides
363 ability to specify mapping between some OID and field that must be
364 decoded with specific specification.
371 :py:class:`pyderasn.ObjectIdentifier` field inside
372 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
373 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 class ContentInfo(Sequence):
378 ("contentType", ContentType(defines=((("content",), {
379 id_digestedData: DigestedData(),
380 id_signedData: SignedData(),
382 ("content", Any(expl=tag_ctxc(0))),
385 ``contentType`` field tells that it defines that ``content`` must be
386 decoded with ``SignedData`` specification, if ``contentType`` equals to
387 ``id-signedData``. The same applies to ``DigestedData``. If
388 ``contentType`` contains unknown OID, then no automatic decoding is
391 You can specify multiple fields, that will be autodecoded -- that is why
392 ``defines`` kwarg is a sequence. You can specify defined field
393 relatively or absolutely to current decode path. For example ``defines``
394 for AlgorithmIdentifier of X.509's
395 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
399 id_ecPublicKey: ECParameters(),
400 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
402 (("..", "subjectPublicKey"), {
403 id_rsaEncryption: RSAPublicKey(),
404 id_GostR3410_2001: OctetString(),
408 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
409 autodecode its parameters inside SPKI's algorithm and its public key
412 Following types can be automatically decoded (DEFINED BY):
414 * :py:class:`pyderasn.Any`
415 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
416 * :py:class:`pyderasn.OctetString`
417 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
418 ``Any``/``BitString``/``OctetString``-s
420 When any of those fields is automatically decoded, then ``.defined``
421 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
422 was defined, ``value`` contains corresponding decoded value. For example
423 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
425 .. _defines_by_path_ctx:
427 defines_by_path context option
428 ______________________________
430 Sometimes you either can not or do not want to explicitly set *defines*
431 in the schema. You can dynamically apply those definitions when calling
432 :py:meth:`pyderasn.Obj.decode` method.
434 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
435 value must be sequence of following tuples::
437 (decode_path, defines)
439 where ``decode_path`` is a tuple holding so-called decode path to the
440 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
441 ``defines``, holding exactly the same value as accepted in its
442 :ref:`keyword argument <defines>`.
444 For example, again for CMS, you want to automatically decode
445 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
446 structures it may hold. Also, automatically decode ``controlSequence``
449 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452 ((("content",), {id_signedData: SignedData()}),),
457 DecodePathDefBy(id_signedData),
462 id_cct_PKIData: PKIData(),
463 id_cct_PKIResponse: PKIResponse(),
469 DecodePathDefBy(id_signedData),
472 DecodePathDefBy(id_cct_PKIResponse),
478 id_cmc_recipientNonce: RecipientNonce(),
479 id_cmc_senderNonce: SenderNonce(),
480 id_cmc_statusInfoV2: CMCStatusInfoV2(),
481 id_cmc_transactionId: TransactionId(),
486 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
487 First function is useful for path construction when some automatic
488 decoding is already done. ``any`` means literally any value it meet --
489 useful for SEQUENCE/SET OF-s.
496 By default PyDERASN accepts only DER encoded data. By default it encodes
497 to DER. But you can optionally enable BER decoding with setting
498 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
499 constructed primitive types should be parsed successfully.
501 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
502 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
503 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
504 ``UTCTime``, ``GeneralizedTime`` can contain it.
505 * If object has an indefinite length encoding, then its ``lenindef``
506 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
507 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
509 * If object has an indefinite length encoded explicit tag, then
510 ``expl_lenindef`` is set to True.
511 * If object has either any of BER-related encoding (explicit tag
512 indefinite length, object's indefinite length, BER-encoding) or any
513 underlying component has that kind of encoding, then ``bered``
514 attribute is set to True. For example SignedData CMS can have
515 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
516 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
518 EOC (end-of-contents) token's length is taken in advance in object's
521 .. _allow_expl_oob_ctx:
523 Allow explicit tag out-of-bound
524 -------------------------------
526 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
527 one value, more than one object. If you set ``allow_expl_oob`` context
528 option to True, then no error will be raised and that invalid encoding
529 will be silently further processed. But pay attention that offsets and
530 lengths will be invalid in that case.
534 This option should be used only for skipping some decode errors, just
535 to see the decoded structure somehow.
539 Streaming and dealing with huge structures
540 ------------------------------------------
547 ASN.1 structures can be huge, they can hold millions of objects inside
548 (for example Certificate Revocation Lists (CRL), holding revocation
549 state for every previously issued X.509 certificate). CACert.org's 8 MiB
550 CRL file takes more than half a gigabyte of memory to hold the decoded
553 If you just simply want to check the signature over the ``tbsCertList``,
554 you can create specialized schema with that field represented as
555 OctetString for example::
557 class TBSCertListFast(Sequence):
560 ("revokedCertificates", OctetString(
561 impl=SequenceOf.tag_default,
567 This allows you to quickly decode a few fields and check the signature
568 over the ``tbsCertList`` bytes.
570 But how can you get all certificate's serial number from it, after you
571 trust that CRL after signature validation? You can use so called
572 ``evgen`` (event generation) mode, to catch the events/facts of some
573 successful object decoding. Let's use command line capabilities::
575 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
576 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
577 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
578 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
579 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
580 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
581 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
582 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
583 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
585 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
586 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
587 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
590 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
591 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
592 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
596 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
597 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
598 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
599 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
600 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
601 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
602 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
603 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
604 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
605 0 [1,4,9145534] CertificateList SEQUENCE
607 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
608 decodes underlying values. It can not tell if SEQUENCE is decoded, so
609 the event of the upper level SEQUENCE is the last one we see.
610 ``version`` field is just a single INTEGER -- it is decoded and event is
611 fired immediately. Then we see that ``algorithm`` and ``parameters``
612 fields are decoded and only after them the ``signature`` SEQUENCE is
613 fired as a successfully decoded. There are 4 events for each revoked
614 certificate entry in that CRL: ``userCertificate`` serial number,
615 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
616 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
618 We can do that in our ordinary Python code and understand where we are
619 by looking at deterministically generated decode paths (do not forget
620 about useful ``--print-decode-path`` CLI option). We must use
621 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
622 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
623 obj, tail)`` tuples::
625 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
627 len(decode_path) == 4 and
628 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
629 decode_path[3] == "userCertificate"
631 print("serial number:", int(obj))
633 Virtually it does not take any memory except at least needed for single
634 object storage. You can easily use that mode to determine required
635 object ``.offset`` and ``.*len`` to be able to decode it separately, or
636 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 .. _evgen_mode_upto_ctx:
644 There is full ability to get any kind of data from the CRL in the
645 example above. However it is not too convenient to get the whole
646 ``RevokedCertificate`` structure, that is pretty lightweight and one may
647 do not want to disassemble it. You can use ``evgen_mode_upto``
648 :ref:`ctx <ctx>` option that semantically equals to
649 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
650 mapped to any non-None value. If specified decode path is met, then any
651 subsequent objects won't be decoded in evgen mode. That allows us to
652 parse the CRL above with fully assembled ``RevokedCertificate``::
654 for decode_path, obj, _ in CertificateList().decode_evgen(
656 ctx={"evgen_mode_upto": (
657 (("tbsCertList", "revokedCertificates", any), True),
661 len(decode_path) == 3 and
662 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
664 print("serial number:", int(obj["userCertificate"]))
671 POSIX compliant systems have ``mmap`` syscall, giving ability to work
672 the memory mapped file. You can deal with the file like it was an
673 ordinary binary string, allowing you not to load it to the memory first.
674 Also you can use them as an input for OCTET STRING, taking no Python
675 memory for their storage.
677 There is convenient :py:func:`pyderasn.file_mmaped` function that
678 creates read-only memoryview on the file contents::
680 with open("huge", "rb") as fd:
681 raw = file_mmaped(fd)
682 obj = Something.decode(raw)
686 mmap-ed files in Python2.7 does not implement buffer protocol, so
687 memoryview won't work on them.
691 mmap maps the **whole** file. So it plays no role if you seek-ed it
692 before. Take the slice of the resulting memoryview with required
697 If you use ZFS as underlying storage, then pay attention that
698 currently most platforms does not deal good with ZFS ARC and ordinary
699 page cache used for mmaps. It can take twice the necessary size in
700 the memory: both in page cache and ZFS ARC.
705 We can parse any kind of data now, but how can we produce files
706 streamingly, without storing their encoded representation in memory?
707 SEQUENCE by default encodes in memory all its values, joins them in huge
708 binary string, just to know the exact size of SEQUENCE's value for
709 encoding it in TLV. DER requires you to know all exact sizes of the
712 You can use CER encoding mode, that slightly differs from the DER, but
713 does not require exact sizes knowledge, allowing streaming encoding
714 directly to some writer/buffer. Just use
715 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
716 encoded data will flow::
718 opener = io.open if PY2 else open
719 with opener("result", "wb") as fd:
720 obj.encode_cer(fd.write)
725 obj.encode_cer(buf.write)
727 If you do not want to create in-memory buffer every time, then you can
728 use :py:func:`pyderasn.encode_cer` function::
730 data = encode_cer(obj)
732 Remember that CER is **not valid** DER in most cases, so you **have to**
733 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
734 decoding. Also currently there is **no** validation that provided CER is
735 valid one -- you are sure that it has only valid BER encoding.
739 SET OF values can not be streamingly encoded, because they are
740 required to be sorted byte-by-byte. Big SET OF values still will take
741 much memory. Use neither SET nor SET OF values, as modern ASN.1
744 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
745 OCTET STRINGs! They will be streamingly copied from underlying file to
746 the buffer using 1 KB chunks.
748 Some structures require that some of the elements have to be forcefully
749 DER encoded. For example ``SignedData`` CMS requires you to encode
750 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
751 encode everything else in BER. You can tell any of the structures to be
752 forcefully encoded in DER during CER encoding, by specifying
753 ``der_forced=True`` attribute::
755 class Certificate(Sequence):
759 class SignedAttributes(SetOf):
767 In most cases, huge quantity of binary data is stored as OCTET STRING.
768 CER encoding splits it on 1 KB chunks. BER allows splitting on various
769 levels of chunks inclusion::
771 SOME STRING[CONSTRUCTED]
772 OCTET STRING[CONSTRUCTED]
773 OCTET STRING[PRIMITIVE]
775 OCTET STRING[PRIMITIVE]
777 OCTET STRING[PRIMITIVE]
779 OCTET STRING[PRIMITIVE]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[CONSTRUCTED]
787 OCTET STRING[CONSTRUCTED]
788 OCTET STRING[PRIMITIVE]
791 You can not just take the offset and some ``.vlen`` of the STRING and
792 treat it as the payload. If you decode it without
793 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
794 and ``bytes()`` will give the whole payload contents.
796 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
797 small memory footprint. There is convenient
798 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
799 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
800 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
801 SHA512 digest of its ``eContent``::
803 fd = open("data.p7m", "rb")
804 raw = file_mmaped(fd)
805 ctx = {"bered": True}
806 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
807 if decode_path == ("content",):
811 raise ValueError("no content found")
812 hasher_state = sha512()
814 hasher_state.update(data)
816 evgens = SignedData().decode_evgen(
817 raw[content.offset:],
818 offset=content.offset,
821 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
823 digest = hasher_state.digest()
825 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
826 copy the payload (without BER/CER encoding interleaved overhead) in it.
827 Virtually it won't take memory more than for keeping small structures
828 and 1 KB binary chunks.
830 SEQUENCE OF iterators
831 _____________________
833 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
834 classes. The only difference with providing the full list of objects, is
835 that type and bounds checking is done during encoding process. Also
836 sequence's value will be emptied after encoding, forcing you to set its
839 This is very useful when you have to create some huge objects, like
840 CRLs, with thousands and millions of entities inside. You can write the
841 generator taking necessary data from the database and giving the
842 ``RevokedCertificate`` objects. Only binary representation of that
843 objects will take memory during DER encoding.
847 .. autoclass:: pyderasn.Obj
855 .. autoclass:: pyderasn.Boolean
860 .. autoclass:: pyderasn.Integer
861 :members: __init__, named
865 .. autoclass:: pyderasn.BitString
866 :members: __init__, bit_len, named
870 .. autoclass:: pyderasn.OctetString
875 .. autoclass:: pyderasn.Null
880 .. autoclass:: pyderasn.ObjectIdentifier
885 .. autoclass:: pyderasn.Enumerated
889 .. autoclass:: pyderasn.CommonString
893 .. autoclass:: pyderasn.NumericString
897 .. autoclass:: pyderasn.PrintableString
898 :members: __init__, allow_asterisk, allow_ampersand
902 .. autoclass:: pyderasn.UTCTime
903 :members: __init__, todatetime
907 .. autoclass:: pyderasn.GeneralizedTime
908 :members: __init__, todatetime
915 .. autoclass:: pyderasn.Choice
916 :members: __init__, choice, value
920 .. autoclass:: PrimitiveTypes
924 .. autoclass:: pyderasn.Any
932 .. autoclass:: pyderasn.Sequence
937 .. autoclass:: pyderasn.Set
942 .. autoclass:: pyderasn.SequenceOf
947 .. autoclass:: pyderasn.SetOf
953 .. autofunction:: pyderasn.abs_decode_path
954 .. autofunction:: pyderasn.agg_octet_string
955 .. autofunction:: pyderasn.colonize_hex
956 .. autofunction:: pyderasn.encode_cer
957 .. autofunction:: pyderasn.file_mmaped
958 .. autofunction:: pyderasn.hexenc
959 .. autofunction:: pyderasn.hexdec
960 .. autofunction:: pyderasn.tag_encode
961 .. autofunction:: pyderasn.tag_decode
962 .. autofunction:: pyderasn.tag_ctxp
963 .. autofunction:: pyderasn.tag_ctxc
964 .. autoclass:: pyderasn.DecodeError
966 .. autoclass:: pyderasn.NotEnoughData
967 .. autoclass:: pyderasn.ExceedingData
968 .. autoclass:: pyderasn.LenIndefForm
969 .. autoclass:: pyderasn.TagMismatch
970 .. autoclass:: pyderasn.InvalidLength
971 .. autoclass:: pyderasn.InvalidOID
972 .. autoclass:: pyderasn.ObjUnknown
973 .. autoclass:: pyderasn.ObjNotReady
974 .. autoclass:: pyderasn.InvalidValueType
975 .. autoclass:: pyderasn.BoundsError
982 You can decode DER/BER files using command line abilities::
984 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
986 If there is no schema for your file, then you can try parsing it without,
987 but of course IMPLICIT tags will often make it impossible. But result is
988 good enough for the certificate above::
990 $ python -m pyderasn path/to/file
991 0 [1,3,1604] . >: SEQUENCE OF
992 4 [1,3,1453] . . >: SEQUENCE OF
993 8 [0,0, 5] . . . . >: [0] ANY
994 . . . . . A0:03:02:01:02
995 13 [1,1, 3] . . . . >: INTEGER 61595
996 18 [1,1, 13] . . . . >: SEQUENCE OF
997 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
998 31 [1,1, 0] . . . . . . >: NULL
999 33 [1,3, 274] . . . . >: SEQUENCE OF
1000 37 [1,1, 11] . . . . . . >: SET OF
1001 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1002 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1003 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1005 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1006 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1007 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1008 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1009 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1010 . . . . . . . . . 61:2E:63:6F:6D:2F
1011 1461 [1,1, 13] . . >: SEQUENCE OF
1012 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1013 1474 [1,1, 0] . . . . >: NULL
1014 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1015 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1016 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1022 If you have got dictionaries with ObjectIdentifiers, like example one
1023 from ``tests/test_crts.py``::
1026 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1027 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1029 "2.5.4.10": "id-at-organizationName",
1030 "2.5.4.11": "id-at-organizationalUnitName",
1033 then you can pass it to pretty printer to see human readable OIDs::
1035 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1037 37 [1,1, 11] . . . . . . >: SET OF
1038 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1039 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1040 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1041 50 [1,1, 18] . . . . . . >: SET OF
1042 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1043 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1044 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1045 70 [1,1, 18] . . . . . . >: SET OF
1046 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1047 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1048 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1054 Each decoded element has so-called decode path: sequence of structure
1055 names it is passing during the decode process. Each element has its own
1056 unique path inside the whole ASN.1 tree. You can print it out with
1057 ``--print-decode-path`` option::
1059 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1060 0 [1,3,1604] Certificate SEQUENCE []
1061 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1062 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1063 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1064 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1065 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1066 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1068 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1069 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1070 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1071 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1072 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1073 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1074 . . . . . . . 13:02:45:53
1075 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1078 Now you can print only the specified tree, for example signature algorithm::
1080 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1081 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1082 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1083 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1087 from array import array
1088 from codecs import getdecoder
1089 from codecs import getencoder
1090 from collections import namedtuple
1091 from collections import OrderedDict
1092 from copy import copy
1093 from datetime import datetime
1094 from datetime import timedelta
1095 from io import BytesIO
1096 from math import ceil
1097 from mmap import mmap
1098 from mmap import PROT_READ
1099 from operator import attrgetter
1100 from string import ascii_letters
1101 from string import digits
1102 from sys import version_info
1103 from unicodedata import category as unicat
1105 from six import add_metaclass
1106 from six import binary_type
1107 from six import byte2int
1108 from six import indexbytes
1109 from six import int2byte
1110 from six import integer_types
1111 from six import iterbytes
1112 from six import iteritems
1113 from six import itervalues
1115 from six import string_types
1116 from six import text_type
1117 from six import unichr as six_unichr
1118 from six.moves import xrange as six_xrange
1122 from termcolor import colored
1123 except ImportError: # pragma: no cover
1124 def colored(what, *args, **kwargs):
1173 "TagClassApplication",
1176 "TagClassUniversal",
1177 "TagFormConstructed",
1188 TagClassUniversal = 0
1189 TagClassApplication = 1 << 6
1190 TagClassContext = 1 << 7
1191 TagClassPrivate = 1 << 6 | 1 << 7
1192 TagFormPrimitive = 0
1193 TagFormConstructed = 1 << 5
1195 TagClassContext: "",
1196 TagClassApplication: "APPLICATION ",
1197 TagClassPrivate: "PRIVATE ",
1198 TagClassUniversal: "UNIV ",
1202 LENINDEF = b"\x80" # length indefinite mark
1203 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1204 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1205 SET01 = frozenset("01")
1206 DECIMALS = frozenset(digits)
1207 DECIMAL_SIGNS = ".,"
1208 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1211 def file_mmaped(fd):
1212 """Make mmap-ed memoryview for reading from file
1214 :param fd: file object
1215 :returns: memoryview over read-only mmap-ing of the whole file
1217 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1220 if not set(value) <= DECIMALS:
1221 raise ValueError("non-pure integer")
1224 def fractions2float(fractions_raw):
1225 pureint(fractions_raw)
1226 return float("0." + fractions_raw)
1229 def get_def_by_path(defines_by_path, sub_decode_path):
1230 """Get define by decode path
1232 for path, define in defines_by_path:
1233 if len(path) != len(sub_decode_path):
1235 for p1, p2 in zip(path, sub_decode_path):
1236 if (not p1 is any) and (p1 != p2):
1242 ########################################################################
1244 ########################################################################
1246 class ASN1Error(ValueError):
1250 class DecodeError(ASN1Error):
1251 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1253 :param str msg: reason of decode failing
1254 :param klass: optional exact DecodeError inherited class (like
1255 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1256 :py:exc:`InvalidLength`)
1257 :param decode_path: tuple of strings. It contains human
1258 readable names of the fields through which
1259 decoding process has passed
1260 :param int offset: binary offset where failure happened
1262 super(DecodeError, self).__init__()
1265 self.decode_path = decode_path
1266 self.offset = offset
1271 "" if self.klass is None else self.klass.__name__,
1273 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1274 if len(self.decode_path) > 0 else ""
1276 ("(at %d)" % self.offset) if self.offset > 0 else "",
1282 return "%s(%s)" % (self.__class__.__name__, self)
1285 class NotEnoughData(DecodeError):
1289 class ExceedingData(ASN1Error):
1290 def __init__(self, nbytes):
1291 super(ExceedingData, self).__init__()
1292 self.nbytes = nbytes
1295 return "%d trailing bytes" % self.nbytes
1298 return "%s(%s)" % (self.__class__.__name__, self)
1301 class LenIndefForm(DecodeError):
1305 class TagMismatch(DecodeError):
1309 class InvalidLength(DecodeError):
1313 class InvalidOID(DecodeError):
1317 class ObjUnknown(ASN1Error):
1318 def __init__(self, name):
1319 super(ObjUnknown, self).__init__()
1323 return "object is unknown: %s" % self.name
1326 return "%s(%s)" % (self.__class__.__name__, self)
1329 class ObjNotReady(ASN1Error):
1330 def __init__(self, name):
1331 super(ObjNotReady, self).__init__()
1335 return "object is not ready: %s" % self.name
1338 return "%s(%s)" % (self.__class__.__name__, self)
1341 class InvalidValueType(ASN1Error):
1342 def __init__(self, expected_types):
1343 super(InvalidValueType, self).__init__()
1344 self.expected_types = expected_types
1347 return "invalid value type, expected: %s" % ", ".join(
1348 [repr(t) for t in self.expected_types]
1352 return "%s(%s)" % (self.__class__.__name__, self)
1355 class BoundsError(ASN1Error):
1356 def __init__(self, bound_min, value, bound_max):
1357 super(BoundsError, self).__init__()
1358 self.bound_min = bound_min
1360 self.bound_max = bound_max
1363 return "unsatisfied bounds: %s <= %s <= %s" % (
1370 return "%s(%s)" % (self.__class__.__name__, self)
1373 ########################################################################
1375 ########################################################################
1377 _hexdecoder = getdecoder("hex")
1378 _hexencoder = getencoder("hex")
1382 """Binary data to hexadecimal string convert
1384 return _hexdecoder(data)[0]
1388 """Hexadecimal string to binary data convert
1390 return _hexencoder(data)[0].decode("ascii")
1393 def int_bytes_len(num, byte_len=8):
1396 return int(ceil(float(num.bit_length()) / byte_len))
1399 def zero_ended_encode(num):
1400 octets = bytearray(int_bytes_len(num, 7))
1402 octets[i] = num & 0x7F
1406 octets[i] = 0x80 | (num & 0x7F)
1409 return bytes(octets)
1412 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1413 """Encode tag to binary form
1415 :param int num: tag's number
1416 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1417 :py:data:`pyderasn.TagClassContext`,
1418 :py:data:`pyderasn.TagClassApplication`,
1419 :py:data:`pyderasn.TagClassPrivate`)
1420 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1421 :py:data:`pyderasn.TagFormConstructed`)
1425 return int2byte(klass | form | num)
1426 # [XX|X|11111][1.......][1.......] ... [0.......]
1427 return int2byte(klass | form | 31) + zero_ended_encode(num)
1430 def tag_decode(tag):
1431 """Decode tag from binary form
1435 No validation is performed, assuming that it has already passed.
1437 It returns tuple with three integers, as
1438 :py:func:`pyderasn.tag_encode` accepts.
1440 first_octet = byte2int(tag)
1441 klass = first_octet & 0xC0
1442 form = first_octet & 0x20
1443 if first_octet & 0x1F < 0x1F:
1444 return (klass, form, first_octet & 0x1F)
1446 for octet in iterbytes(tag[1:]):
1449 return (klass, form, num)
1453 """Create CONTEXT PRIMITIVE tag
1455 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1459 """Create CONTEXT CONSTRUCTED tag
1461 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1464 def tag_strip(data):
1465 """Take off tag from the data
1467 :returns: (encoded tag, tag length, remaining data)
1470 raise NotEnoughData("no data at all")
1471 if byte2int(data) & 0x1F < 31:
1472 return data[:1], 1, data[1:]
1477 raise DecodeError("unfinished tag")
1478 if indexbytes(data, i) & 0x80 == 0:
1481 return data[:i], i, data[i:]
1487 octets = bytearray(int_bytes_len(l) + 1)
1488 octets[0] = 0x80 | (len(octets) - 1)
1489 for i in six_xrange(len(octets) - 1, 0, -1):
1490 octets[i] = l & 0xFF
1492 return bytes(octets)
1495 def len_decode(data):
1498 :returns: (decoded length, length's length, remaining data)
1499 :raises LenIndefForm: if indefinite form encoding is met
1502 raise NotEnoughData("no data at all")
1503 first_octet = byte2int(data)
1504 if first_octet & 0x80 == 0:
1505 return first_octet, 1, data[1:]
1506 octets_num = first_octet & 0x7F
1507 if octets_num + 1 > len(data):
1508 raise NotEnoughData("encoded length is longer than data")
1510 raise LenIndefForm()
1511 if byte2int(data[1:]) == 0:
1512 raise DecodeError("leading zeros")
1514 for v in iterbytes(data[1:1 + octets_num]):
1517 raise DecodeError("long form instead of short one")
1518 return l, 1 + octets_num, data[1 + octets_num:]
1521 LEN1K = len_encode(1000)
1524 def write_full(writer, data):
1525 """Fully write provided data
1527 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1529 BytesIO does not guarantee that the whole data will be written at
1530 once. That function write everything provided, raising an error if
1531 ``writer`` returns None.
1533 data = memoryview(data)
1535 while written != len(data):
1536 n = writer(data[written:])
1538 raise ValueError("can not write to buf")
1542 ########################################################################
1544 ########################################################################
1546 class AutoAddSlots(type):
1547 def __new__(cls, name, bases, _dict):
1548 _dict["__slots__"] = _dict.get("__slots__", ())
1549 return type.__new__(cls, name, bases, _dict)
1552 BasicState = namedtuple("BasicState", (
1565 ), **NAMEDTUPLE_KWARGS)
1568 @add_metaclass(AutoAddSlots)
1570 """Common ASN.1 object class
1572 All ASN.1 types are inherited from it. It has metaclass that
1573 automatically adds ``__slots__`` to all inherited classes.
1598 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1599 self._expl = getattr(self, "expl", None) if expl is None else expl
1600 if self.tag != self.tag_default and self._expl is not None:
1601 raise ValueError("implicit and explicit tags can not be set simultaneously")
1602 if self.tag is None:
1603 self._tag_order = None
1605 tag_class, _, tag_num = tag_decode(
1606 self.tag if self._expl is None else self._expl
1608 self._tag_order = (tag_class, tag_num)
1609 if default is not None:
1611 self.optional = optional
1612 self.offset, self.llen, self.vlen = _decoded
1614 self.expl_lenindef = False
1615 self.lenindef = False
1616 self.ber_encoded = False
1619 def ready(self): # pragma: no cover
1620 """Is object ready to be encoded?
1622 raise NotImplementedError()
1624 def _assert_ready(self):
1626 raise ObjNotReady(self.__class__.__name__)
1630 """Is either object or any elements inside is BER encoded?
1632 return self.expl_lenindef or self.lenindef or self.ber_encoded
1636 """Is object decoded?
1638 return (self.llen + self.vlen) > 0
1640 def __getstate__(self): # pragma: no cover
1641 """Used for making safe to be mutable pickleable copies
1643 raise NotImplementedError()
1645 def __setstate__(self, state):
1646 if state.version != __version__:
1647 raise ValueError("data is pickled by different PyDERASN version")
1648 self.tag = state.tag
1649 self._tag_order = state.tag_order
1650 self._expl = state.expl
1651 self.default = state.default
1652 self.optional = state.optional
1653 self.offset = state.offset
1654 self.llen = state.llen
1655 self.vlen = state.vlen
1656 self.expl_lenindef = state.expl_lenindef
1657 self.lenindef = state.lenindef
1658 self.ber_encoded = state.ber_encoded
1661 def tag_order(self):
1662 """Tag's (class, number) used for DER/CER sorting
1664 return self._tag_order
1667 def tag_order_cer(self):
1668 return self.tag_order
1672 """See :ref:`decoding`
1674 return len(self.tag)
1678 """See :ref:`decoding`
1680 return self.tlen + self.llen + self.vlen
1682 def __str__(self): # pragma: no cover
1683 return self.__bytes__() if PY2 else self.__unicode__()
1685 def __ne__(self, their):
1686 return not(self == their)
1688 def __gt__(self, their): # pragma: no cover
1689 return not(self < their)
1691 def __le__(self, their): # pragma: no cover
1692 return (self == their) or (self < their)
1694 def __ge__(self, their): # pragma: no cover
1695 return (self == their) or (self > their)
1697 def _encode(self): # pragma: no cover
1698 raise NotImplementedError()
1700 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1701 yield NotImplemented
1704 """DER encode the structure
1706 :returns: DER representation
1708 raw = self._encode()
1709 if self._expl is None:
1711 return b"".join((self._expl, len_encode(len(raw)), raw))
1713 def encode_cer(self, writer):
1714 """CER encode the structure to specified writer
1716 :param writer: must comply with ``io.RawIOBase.write``
1717 behaviour. It takes slice to be written and
1718 returns number of bytes processed. If it returns
1719 None, then exception will be raised
1721 if self._expl is not None:
1722 write_full(writer, self._expl + LENINDEF)
1723 if getattr(self, "der_forced", False):
1724 write_full(writer, self._encode())
1726 self._encode_cer(writer)
1727 if self._expl is not None:
1728 write_full(writer, EOC)
1730 def _encode_cer(self, writer):
1731 write_full(writer, self._encode())
1733 def hexencode(self):
1734 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1736 return hexenc(self.encode())
1746 _ctx_immutable=True,
1750 :param data: either binary or memoryview
1751 :param int offset: initial data's offset
1752 :param bool leavemm: do we need to leave memoryview of remaining
1753 data as is, or convert it to bytes otherwise
1754 :param decode_path: current decode path (tuples of strings,
1755 possibly with DecodePathDefBy) with will be
1756 the root for all underlying objects
1757 :param ctx: optional :ref:`context <ctx>` governing decoding process
1758 :param bool tag_only: decode only the tag, without length and
1759 contents (used only in Choice and Set
1760 structures, trying to determine if tag satisfies
1762 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1764 :returns: (Obj, remaining data)
1766 .. seealso:: :ref:`decoding`
1768 result = next(self.decode_evgen(
1780 _, obj, tail = result
1791 _ctx_immutable=True,
1794 """Decode with evgen mode on
1796 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1797 it returns the generator producing ``(decode_path, obj, tail)``
1798 values. See :ref:`evgen mode <evgen_mode>`.
1802 elif _ctx_immutable:
1804 tlv = memoryview(data)
1807 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1810 if self._expl is None:
1811 for result in self._decode(
1814 decode_path=decode_path,
1817 evgen_mode=_evgen_mode,
1822 _decode_path, obj, tail = result
1823 if not _decode_path is decode_path:
1827 t, tlen, lv = tag_strip(tlv)
1828 except DecodeError as err:
1829 raise err.__class__(
1831 klass=self.__class__,
1832 decode_path=decode_path,
1837 klass=self.__class__,
1838 decode_path=decode_path,
1842 l, llen, v = len_decode(lv)
1843 except LenIndefForm as err:
1844 if not ctx.get("bered", False):
1845 raise err.__class__(
1847 klass=self.__class__,
1848 decode_path=decode_path,
1852 offset += tlen + llen
1853 for result in self._decode(
1856 decode_path=decode_path,
1859 evgen_mode=_evgen_mode,
1861 if tag_only: # pragma: no cover
1864 _decode_path, obj, tail = result
1865 if not _decode_path is decode_path:
1867 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1868 if eoc_expected.tobytes() != EOC:
1871 klass=self.__class__,
1872 decode_path=decode_path,
1876 obj.expl_lenindef = True
1877 except DecodeError as err:
1878 raise err.__class__(
1880 klass=self.__class__,
1881 decode_path=decode_path,
1886 raise NotEnoughData(
1887 "encoded length is longer than data",
1888 klass=self.__class__,
1889 decode_path=decode_path,
1892 for result in self._decode(
1894 offset=offset + tlen + llen,
1895 decode_path=decode_path,
1898 evgen_mode=_evgen_mode,
1900 if tag_only: # pragma: no cover
1903 _decode_path, obj, tail = result
1904 if not _decode_path is decode_path:
1906 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
1908 "explicit tag out-of-bound, longer than data",
1909 klass=self.__class__,
1910 decode_path=decode_path,
1913 yield decode_path, obj, (tail if leavemm else tail.tobytes())
1915 def decod(self, data, offset=0, decode_path=(), ctx=None):
1916 """Decode the data, check that tail is empty
1918 :raises ExceedingData: if tail is not empty
1920 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
1921 (decode without tail) that also checks that there is no
1924 obj, tail = self.decode(
1927 decode_path=decode_path,
1932 raise ExceedingData(len(tail))
1935 def hexdecode(self, data, *args, **kwargs):
1936 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
1938 return self.decode(hexdec(data), *args, **kwargs)
1940 def hexdecod(self, data, *args, **kwargs):
1941 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
1943 return self.decod(hexdec(data), *args, **kwargs)
1947 """See :ref:`decoding`
1949 return self._expl is not None
1953 """See :ref:`decoding`
1958 def expl_tlen(self):
1959 """See :ref:`decoding`
1961 return len(self._expl)
1964 def expl_llen(self):
1965 """See :ref:`decoding`
1967 if self.expl_lenindef:
1969 return len(len_encode(self.tlvlen))
1972 def expl_offset(self):
1973 """See :ref:`decoding`
1975 return self.offset - self.expl_tlen - self.expl_llen
1978 def expl_vlen(self):
1979 """See :ref:`decoding`
1984 def expl_tlvlen(self):
1985 """See :ref:`decoding`
1987 return self.expl_tlen + self.expl_llen + self.expl_vlen
1990 def fulloffset(self):
1991 """See :ref:`decoding`
1993 return self.expl_offset if self.expled else self.offset
1997 """See :ref:`decoding`
1999 return self.expl_tlvlen if self.expled else self.tlvlen
2001 def pps_lenindef(self, decode_path):
2002 if self.lenindef and not (
2003 getattr(self, "defined", None) is not None and
2004 self.defined[1].lenindef
2007 asn1_type_name="EOC",
2009 decode_path=decode_path,
2011 self.offset + self.tlvlen -
2012 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2020 if self.expl_lenindef:
2022 asn1_type_name="EOC",
2023 obj_name="EXPLICIT",
2024 decode_path=decode_path,
2025 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2034 def encode_cer(obj):
2035 """Encode to CER in memory buffer
2037 :returns bytes: memory buffer contents
2040 obj.encode_cer(buf.write)
2041 return buf.getvalue()
2044 class DecodePathDefBy(object):
2045 """DEFINED BY representation inside decode path
2047 __slots__ = ("defined_by",)
2049 def __init__(self, defined_by):
2050 self.defined_by = defined_by
2052 def __ne__(self, their):
2053 return not(self == their)
2055 def __eq__(self, their):
2056 if not isinstance(their, self.__class__):
2058 return self.defined_by == their.defined_by
2061 return "DEFINED BY " + str(self.defined_by)
2064 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2067 ########################################################################
2069 ########################################################################
2071 PP = namedtuple("PP", (
2094 ), **NAMEDTUPLE_KWARGS)
2099 asn1_type_name="unknown",
2116 expl_lenindef=False,
2147 def _colourize(what, colour, with_colours, attrs=("bold",)):
2148 return colored(what, colour, attrs=attrs) if with_colours else what
2151 def colonize_hex(hexed):
2152 """Separate hexadecimal string with colons
2154 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2163 with_decode_path=False,
2164 decode_path_len_decrease=0,
2171 " " if pp.expl_offset is None else
2172 ("-%d" % (pp.offset - pp.expl_offset))
2174 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2176 col = _colourize(col, "red", with_colours, ())
2177 col += _colourize("B", "red", with_colours) if pp.bered else " "
2179 col = "[%d,%d,%4d]%s" % (
2183 LENINDEF_PP_CHAR if pp.lenindef else " "
2185 col = _colourize(col, "green", with_colours, ())
2187 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2188 if decode_path_len > 0:
2189 cols.append(" ." * decode_path_len)
2190 ent = pp.decode_path[-1]
2191 if isinstance(ent, DecodePathDefBy):
2192 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2193 value = str(ent.defined_by)
2196 len(oid_maps) > 0 and
2197 ent.defined_by.asn1_type_name ==
2198 ObjectIdentifier.asn1_type_name
2200 for oid_map in oid_maps:
2201 oid_name = oid_map.get(value)
2202 if oid_name is not None:
2203 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2205 if oid_name is None:
2206 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2208 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2209 if pp.expl is not None:
2210 klass, _, num = pp.expl
2211 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2212 cols.append(_colourize(col, "blue", with_colours))
2213 if pp.impl is not None:
2214 klass, _, num = pp.impl
2215 col = "[%s%d]" % (TagClassReprs[klass], num)
2216 cols.append(_colourize(col, "blue", with_colours))
2217 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2218 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2220 cols.append(_colourize("BER", "red", with_colours))
2221 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2222 if pp.value is not None:
2224 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2226 len(oid_maps) > 0 and
2227 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2229 for oid_map in oid_maps:
2230 oid_name = oid_map.get(value)
2231 if oid_name is not None:
2232 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2234 if pp.asn1_type_name == Integer.asn1_type_name:
2235 hex_repr = hex(int(pp.obj._value))[2:].upper()
2236 if len(hex_repr) % 2 != 0:
2237 hex_repr = "0" + hex_repr
2238 cols.append(_colourize(
2239 "(%s)" % colonize_hex(hex_repr),
2244 if pp.blob.__class__ == binary_type:
2245 cols.append(hexenc(pp.blob))
2246 elif pp.blob.__class__ == tuple:
2247 cols.append(", ".join(pp.blob))
2249 cols.append(_colourize("OPTIONAL", "red", with_colours))
2251 cols.append(_colourize("DEFAULT", "red", with_colours))
2252 if with_decode_path:
2253 cols.append(_colourize(
2254 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2258 return " ".join(cols)
2261 def pp_console_blob(pp, decode_path_len_decrease=0):
2262 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2263 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2264 if decode_path_len > 0:
2265 cols.append(" ." * (decode_path_len + 1))
2266 if pp.blob.__class__ == binary_type:
2267 blob = hexenc(pp.blob).upper()
2268 for i in six_xrange(0, len(blob), 32):
2269 chunk = blob[i:i + 32]
2270 yield " ".join(cols + [colonize_hex(chunk)])
2271 elif pp.blob.__class__ == tuple:
2272 yield " ".join(cols + [", ".join(pp.blob)])
2280 with_decode_path=False,
2281 decode_path_only=(),
2284 """Pretty print object
2286 :param Obj obj: object you want to pretty print
2287 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2288 Its human readable form is printed when OID is met
2289 :param big_blobs: if large binary objects are met (like OctetString
2290 values), do we need to print them too, on separate
2292 :param with_colours: colourize output, if ``termcolor`` library
2294 :param with_decode_path: print decode path
2295 :param decode_path_only: print only that specified decode path
2297 def _pprint_pps(pps):
2299 if hasattr(pp, "_fields"):
2301 decode_path_only != () and
2303 str(p) for p in pp.decode_path[:len(decode_path_only)]
2304 ) != decode_path_only
2308 yield pp_console_row(
2313 with_colours=with_colours,
2314 with_decode_path=with_decode_path,
2315 decode_path_len_decrease=len(decode_path_only),
2317 for row in pp_console_blob(
2319 decode_path_len_decrease=len(decode_path_only),
2323 yield pp_console_row(
2328 with_colours=with_colours,
2329 with_decode_path=with_decode_path,
2330 decode_path_len_decrease=len(decode_path_only),
2333 for row in _pprint_pps(pp):
2335 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2338 ########################################################################
2339 # ASN.1 primitive types
2340 ########################################################################
2342 BooleanState = namedtuple(
2344 BasicState._fields + ("value",),
2350 """``BOOLEAN`` boolean type
2352 >>> b = Boolean(True)
2354 >>> b == Boolean(True)
2360 tag_default = tag_encode(1)
2361 asn1_type_name = "BOOLEAN"
2373 :param value: set the value. Either boolean type, or
2374 :py:class:`pyderasn.Boolean` object
2375 :param bytes impl: override default tag with ``IMPLICIT`` one
2376 :param bytes expl: override default tag with ``EXPLICIT`` one
2377 :param default: set default value. Type same as in ``value``
2378 :param bool optional: is object ``OPTIONAL`` in sequence
2380 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2381 self._value = None if value is None else self._value_sanitize(value)
2382 if default is not None:
2383 default = self._value_sanitize(default)
2384 self.default = self.__class__(
2390 self._value = default
2392 def _value_sanitize(self, value):
2393 if value.__class__ == bool:
2395 if issubclass(value.__class__, Boolean):
2397 raise InvalidValueType((self.__class__, bool))
2401 return self._value is not None
2403 def __getstate__(self):
2404 return BooleanState(
2420 def __setstate__(self, state):
2421 super(Boolean, self).__setstate__(state)
2422 self._value = state.value
2424 def __nonzero__(self):
2425 self._assert_ready()
2429 self._assert_ready()
2432 def __eq__(self, their):
2433 if their.__class__ == bool:
2434 return self._value == their
2435 if not issubclass(their.__class__, Boolean):
2438 self._value == their._value and
2439 self.tag == their.tag and
2440 self._expl == their._expl
2451 return self.__class__(
2453 impl=self.tag if impl is None else impl,
2454 expl=self._expl if expl is None else expl,
2455 default=self.default if default is None else default,
2456 optional=self.optional if optional is None else optional,
2460 self._assert_ready()
2464 (b"\xFF" if self._value else b"\x00"),
2467 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2469 t, _, lv = tag_strip(tlv)
2470 except DecodeError as err:
2471 raise err.__class__(
2473 klass=self.__class__,
2474 decode_path=decode_path,
2479 klass=self.__class__,
2480 decode_path=decode_path,
2487 l, _, v = len_decode(lv)
2488 except DecodeError as err:
2489 raise err.__class__(
2491 klass=self.__class__,
2492 decode_path=decode_path,
2496 raise InvalidLength(
2497 "Boolean's length must be equal to 1",
2498 klass=self.__class__,
2499 decode_path=decode_path,
2503 raise NotEnoughData(
2504 "encoded length is longer than data",
2505 klass=self.__class__,
2506 decode_path=decode_path,
2509 first_octet = byte2int(v)
2511 if first_octet == 0:
2513 elif first_octet == 0xFF:
2515 elif ctx.get("bered", False):
2520 "unacceptable Boolean value",
2521 klass=self.__class__,
2522 decode_path=decode_path,
2525 obj = self.__class__(
2529 default=self.default,
2530 optional=self.optional,
2531 _decoded=(offset, 1, 1),
2533 obj.ber_encoded = ber_encoded
2534 yield decode_path, obj, v[1:]
2537 return pp_console_row(next(self.pps()))
2539 def pps(self, decode_path=()):
2542 asn1_type_name=self.asn1_type_name,
2543 obj_name=self.__class__.__name__,
2544 decode_path=decode_path,
2545 value=str(self._value) if self.ready else None,
2546 optional=self.optional,
2547 default=self == self.default,
2548 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2549 expl=None if self._expl is None else tag_decode(self._expl),
2554 expl_offset=self.expl_offset if self.expled else None,
2555 expl_tlen=self.expl_tlen if self.expled else None,
2556 expl_llen=self.expl_llen if self.expled else None,
2557 expl_vlen=self.expl_vlen if self.expled else None,
2558 expl_lenindef=self.expl_lenindef,
2559 ber_encoded=self.ber_encoded,
2562 for pp in self.pps_lenindef(decode_path):
2566 IntegerState = namedtuple(
2568 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2574 """``INTEGER`` integer type
2576 >>> b = Integer(-123)
2578 >>> b == Integer(-123)
2583 >>> Integer(2, bounds=(1, 3))
2585 >>> Integer(5, bounds=(1, 3))
2586 Traceback (most recent call last):
2587 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2591 class Version(Integer):
2598 >>> v = Version("v1")
2605 {'v3': 2, 'v1': 0, 'v2': 1}
2607 __slots__ = ("specs", "_bound_min", "_bound_max")
2608 tag_default = tag_encode(2)
2609 asn1_type_name = "INTEGER"
2623 :param value: set the value. Either integer type, named value
2624 (if ``schema`` is specified in the class), or
2625 :py:class:`pyderasn.Integer` object
2626 :param bounds: set ``(MIN, MAX)`` value constraint.
2627 (-inf, +inf) by default
2628 :param bytes impl: override default tag with ``IMPLICIT`` one
2629 :param bytes expl: override default tag with ``EXPLICIT`` one
2630 :param default: set default value. Type same as in ``value``
2631 :param bool optional: is object ``OPTIONAL`` in sequence
2633 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2635 specs = getattr(self, "schema", {}) if _specs is None else _specs
2636 self.specs = specs if specs.__class__ == dict else dict(specs)
2637 self._bound_min, self._bound_max = getattr(
2640 (float("-inf"), float("+inf")),
2641 ) if bounds is None else bounds
2642 if value is not None:
2643 self._value = self._value_sanitize(value)
2644 if default is not None:
2645 default = self._value_sanitize(default)
2646 self.default = self.__class__(
2652 if self._value is None:
2653 self._value = default
2655 def _value_sanitize(self, value):
2656 if isinstance(value, integer_types):
2658 elif issubclass(value.__class__, Integer):
2659 value = value._value
2660 elif value.__class__ == str:
2661 value = self.specs.get(value)
2663 raise ObjUnknown("integer value: %s" % value)
2665 raise InvalidValueType((self.__class__, int, str))
2666 if not self._bound_min <= value <= self._bound_max:
2667 raise BoundsError(self._bound_min, value, self._bound_max)
2672 return self._value is not None
2674 def __getstate__(self):
2675 return IntegerState(
2694 def __setstate__(self, state):
2695 super(Integer, self).__setstate__(state)
2696 self.specs = state.specs
2697 self._value = state.value
2698 self._bound_min = state.bound_min
2699 self._bound_max = state.bound_max
2702 self._assert_ready()
2703 return int(self._value)
2706 self._assert_ready()
2709 bytes(self._expl or b"") +
2710 str(self._value).encode("ascii"),
2713 def __eq__(self, their):
2714 if isinstance(their, integer_types):
2715 return self._value == their
2716 if not issubclass(their.__class__, Integer):
2719 self._value == their._value and
2720 self.tag == their.tag and
2721 self._expl == their._expl
2724 def __lt__(self, their):
2725 return self._value < their._value
2729 """Return named representation (if exists) of the value
2731 for name, value in iteritems(self.specs):
2732 if value == self._value:
2745 return self.__class__(
2748 (self._bound_min, self._bound_max)
2749 if bounds is None else bounds
2751 impl=self.tag if impl is None else impl,
2752 expl=self._expl if expl is None else expl,
2753 default=self.default if default is None else default,
2754 optional=self.optional if optional is None else optional,
2759 self._assert_ready()
2763 octets = bytearray([0])
2767 octets = bytearray()
2769 octets.append((value & 0xFF) ^ 0xFF)
2771 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2774 octets = bytearray()
2776 octets.append(value & 0xFF)
2778 if octets[-1] & 0x80 > 0:
2781 octets = bytes(octets)
2783 bytes_len = ceil(value.bit_length() / 8) or 1
2786 octets = value.to_bytes(
2791 except OverflowError:
2795 return b"".join((self.tag, len_encode(len(octets)), octets))
2797 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2799 t, _, lv = tag_strip(tlv)
2800 except DecodeError as err:
2801 raise err.__class__(
2803 klass=self.__class__,
2804 decode_path=decode_path,
2809 klass=self.__class__,
2810 decode_path=decode_path,
2817 l, llen, v = len_decode(lv)
2818 except DecodeError as err:
2819 raise err.__class__(
2821 klass=self.__class__,
2822 decode_path=decode_path,
2826 raise NotEnoughData(
2827 "encoded length is longer than data",
2828 klass=self.__class__,
2829 decode_path=decode_path,
2833 raise NotEnoughData(
2835 klass=self.__class__,
2836 decode_path=decode_path,
2839 v, tail = v[:l], v[l:]
2840 first_octet = byte2int(v)
2842 second_octet = byte2int(v[1:])
2844 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2845 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2848 "non normalized integer",
2849 klass=self.__class__,
2850 decode_path=decode_path,
2855 if first_octet & 0x80 > 0:
2856 octets = bytearray()
2857 for octet in bytearray(v):
2858 octets.append(octet ^ 0xFF)
2859 for octet in octets:
2860 value = (value << 8) | octet
2864 for octet in bytearray(v):
2865 value = (value << 8) | octet
2867 value = int.from_bytes(v, byteorder="big", signed=True)
2869 obj = self.__class__(
2871 bounds=(self._bound_min, self._bound_max),
2874 default=self.default,
2875 optional=self.optional,
2877 _decoded=(offset, llen, l),
2879 except BoundsError as err:
2882 klass=self.__class__,
2883 decode_path=decode_path,
2886 yield decode_path, obj, tail
2889 return pp_console_row(next(self.pps()))
2891 def pps(self, decode_path=()):
2894 asn1_type_name=self.asn1_type_name,
2895 obj_name=self.__class__.__name__,
2896 decode_path=decode_path,
2897 value=(self.named or str(self._value)) if self.ready else None,
2898 optional=self.optional,
2899 default=self == self.default,
2900 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2901 expl=None if self._expl is None else tag_decode(self._expl),
2906 expl_offset=self.expl_offset if self.expled else None,
2907 expl_tlen=self.expl_tlen if self.expled else None,
2908 expl_llen=self.expl_llen if self.expled else None,
2909 expl_vlen=self.expl_vlen if self.expled else None,
2910 expl_lenindef=self.expl_lenindef,
2913 for pp in self.pps_lenindef(decode_path):
2917 BitStringState = namedtuple(
2919 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
2924 class BitString(Obj):
2925 """``BIT STRING`` bit string type
2927 >>> BitString(b"hello world")
2928 BIT STRING 88 bits 68656c6c6f20776f726c64
2931 >>> b == b"hello world"
2936 >>> BitString("'0A3B5F291CD'H")
2937 BIT STRING 44 bits 0a3b5f291cd0
2938 >>> b = BitString("'010110000000'B")
2939 BIT STRING 12 bits 5800
2942 >>> b[0], b[1], b[2], b[3]
2943 (False, True, False, True)
2947 [False, True, False, True, True, False, False, False, False, False, False, False]
2951 class KeyUsage(BitString):
2953 ("digitalSignature", 0),
2954 ("nonRepudiation", 1),
2955 ("keyEncipherment", 2),
2958 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
2959 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
2961 ['nonRepudiation', 'keyEncipherment']
2963 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
2967 Pay attention that BIT STRING can be encoded both in primitive
2968 and constructed forms. Decoder always checks constructed form tag
2969 additionally to specified primitive one. If BER decoding is
2970 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
2971 of DER restrictions.
2973 __slots__ = ("tag_constructed", "specs", "defined")
2974 tag_default = tag_encode(3)
2975 asn1_type_name = "BIT STRING"
2988 :param value: set the value. Either binary type, tuple of named
2989 values (if ``schema`` is specified in the class),
2990 string in ``'XXX...'B`` form, or
2991 :py:class:`pyderasn.BitString` object
2992 :param bytes impl: override default tag with ``IMPLICIT`` one
2993 :param bytes expl: override default tag with ``EXPLICIT`` one
2994 :param default: set default value. Type same as in ``value``
2995 :param bool optional: is object ``OPTIONAL`` in sequence
2997 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
2998 specs = getattr(self, "schema", {}) if _specs is None else _specs
2999 self.specs = specs if specs.__class__ == dict else dict(specs)
3000 self._value = None if value is None else self._value_sanitize(value)
3001 if default is not None:
3002 default = self._value_sanitize(default)
3003 self.default = self.__class__(
3009 self._value = default
3011 tag_klass, _, tag_num = tag_decode(self.tag)
3012 self.tag_constructed = tag_encode(
3014 form=TagFormConstructed,
3018 def _bits2octets(self, bits):
3019 if len(self.specs) > 0:
3020 bits = bits.rstrip("0")
3022 bits += "0" * ((8 - (bit_len % 8)) % 8)
3023 octets = bytearray(len(bits) // 8)
3024 for i in six_xrange(len(octets)):
3025 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3026 return bit_len, bytes(octets)
3028 def _value_sanitize(self, value):
3029 if isinstance(value, (string_types, binary_type)):
3031 isinstance(value, string_types) and
3032 value.startswith("'")
3034 if value.endswith("'B"):
3036 if not frozenset(value) <= SET01:
3037 raise ValueError("B's coding contains unacceptable chars")
3038 return self._bits2octets(value)
3039 if value.endswith("'H"):
3043 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3045 if value.__class__ == binary_type:
3046 return (len(value) * 8, value)
3047 raise InvalidValueType((self.__class__, string_types, binary_type))
3048 if value.__class__ == tuple:
3051 isinstance(value[0], integer_types) and
3052 value[1].__class__ == binary_type
3057 bit = self.specs.get(name)
3059 raise ObjUnknown("BitString value: %s" % name)
3062 return self._bits2octets("")
3063 bits = frozenset(bits)
3064 return self._bits2octets("".join(
3065 ("1" if bit in bits else "0")
3066 for bit in six_xrange(max(bits) + 1)
3068 if issubclass(value.__class__, BitString):
3070 raise InvalidValueType((self.__class__, binary_type, string_types))
3074 return self._value is not None
3076 def __getstate__(self):
3077 return BitStringState(
3092 self.tag_constructed,
3096 def __setstate__(self, state):
3097 super(BitString, self).__setstate__(state)
3098 self.specs = state.specs
3099 self._value = state.value
3100 self.tag_constructed = state.tag_constructed
3101 self.defined = state.defined
3104 self._assert_ready()
3105 for i in six_xrange(self._value[0]):
3110 """Returns number of bits in the string
3112 self._assert_ready()
3113 return self._value[0]
3115 def __bytes__(self):
3116 self._assert_ready()
3117 return self._value[1]
3119 def __eq__(self, their):
3120 if their.__class__ == bytes:
3121 return self._value[1] == their
3122 if not issubclass(their.__class__, BitString):
3125 self._value == their._value and
3126 self.tag == their.tag and
3127 self._expl == their._expl
3132 """Named representation (if exists) of the bits
3134 :returns: [str(name), ...]
3136 return [name for name, bit in iteritems(self.specs) if self[bit]]
3146 return self.__class__(
3148 impl=self.tag if impl is None else impl,
3149 expl=self._expl if expl is None else expl,
3150 default=self.default if default is None else default,
3151 optional=self.optional if optional is None else optional,
3155 def __getitem__(self, key):
3156 if key.__class__ == int:
3157 bit_len, octets = self._value
3161 byte2int(memoryview(octets)[key // 8:]) >>
3164 if isinstance(key, string_types):
3165 value = self.specs.get(key)
3167 raise ObjUnknown("BitString value: %s" % key)
3169 raise InvalidValueType((int, str))
3172 self._assert_ready()
3173 bit_len, octets = self._value
3176 len_encode(len(octets) + 1),
3177 int2byte((8 - bit_len % 8) % 8),
3181 def _encode_cer(self, writer):
3182 bit_len, octets = self._value
3183 if len(octets) + 1 <= 1000:
3184 write_full(writer, self._encode())
3186 write_full(writer, self.tag_constructed)
3187 write_full(writer, LENINDEF)
3188 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3189 write_full(writer, b"".join((
3190 BitString.tag_default,
3193 octets[offset:offset + 999],
3195 tail = octets[offset+999:]
3197 tail = int2byte((8 - bit_len % 8) % 8) + tail
3198 write_full(writer, b"".join((
3199 BitString.tag_default,
3200 len_encode(len(tail)),
3203 write_full(writer, EOC)
3205 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3207 t, tlen, lv = tag_strip(tlv)
3208 except DecodeError as err:
3209 raise err.__class__(
3211 klass=self.__class__,
3212 decode_path=decode_path,
3216 if tag_only: # pragma: no cover
3220 l, llen, v = len_decode(lv)
3221 except DecodeError as err:
3222 raise err.__class__(
3224 klass=self.__class__,
3225 decode_path=decode_path,
3229 raise NotEnoughData(
3230 "encoded length is longer than data",
3231 klass=self.__class__,
3232 decode_path=decode_path,
3236 raise NotEnoughData(
3238 klass=self.__class__,
3239 decode_path=decode_path,
3242 pad_size = byte2int(v)
3243 if l == 1 and pad_size != 0:
3245 "invalid empty value",
3246 klass=self.__class__,
3247 decode_path=decode_path,
3253 klass=self.__class__,
3254 decode_path=decode_path,
3257 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3260 klass=self.__class__,
3261 decode_path=decode_path,
3264 v, tail = v[:l], v[l:]
3265 bit_len = (len(v) - 1) * 8 - pad_size
3266 obj = self.__class__(
3267 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3270 default=self.default,
3271 optional=self.optional,
3273 _decoded=(offset, llen, l),
3276 obj._value = (bit_len, None)
3277 yield decode_path, obj, tail
3279 if t != self.tag_constructed:
3281 klass=self.__class__,
3282 decode_path=decode_path,
3285 if not ctx.get("bered", False):
3287 "unallowed BER constructed encoding",
3288 klass=self.__class__,
3289 decode_path=decode_path,
3292 if tag_only: # pragma: no cover
3297 l, llen, v = len_decode(lv)
3298 except LenIndefForm:
3299 llen, l, v = 1, 0, lv[1:]
3301 except DecodeError as err:
3302 raise err.__class__(
3304 klass=self.__class__,
3305 decode_path=decode_path,
3309 raise NotEnoughData(
3310 "encoded length is longer than data",
3311 klass=self.__class__,
3312 decode_path=decode_path,
3315 if not lenindef and l == 0:
3316 raise NotEnoughData(
3318 klass=self.__class__,
3319 decode_path=decode_path,
3323 sub_offset = offset + tlen + llen
3327 if v[:EOC_LEN].tobytes() == EOC:
3334 "chunk out of bounds",
3335 klass=self.__class__,
3336 decode_path=decode_path + (str(len(chunks) - 1),),
3337 offset=chunks[-1].offset,
3339 sub_decode_path = decode_path + (str(len(chunks)),)
3342 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3345 decode_path=sub_decode_path,
3348 _ctx_immutable=False,
3350 yield _decode_path, chunk, v_tail
3352 _, chunk, v_tail = next(BitString().decode_evgen(
3355 decode_path=sub_decode_path,
3358 _ctx_immutable=False,
3363 "expected BitString encoded chunk",
3364 klass=self.__class__,
3365 decode_path=sub_decode_path,
3368 chunks.append(chunk)
3369 sub_offset += chunk.tlvlen
3370 vlen += chunk.tlvlen
3372 if len(chunks) == 0:
3375 klass=self.__class__,
3376 decode_path=decode_path,
3381 for chunk_i, chunk in enumerate(chunks[:-1]):
3382 if chunk.bit_len % 8 != 0:
3384 "BitString chunk is not multiple of 8 bits",
3385 klass=self.__class__,
3386 decode_path=decode_path + (str(chunk_i),),
3387 offset=chunk.offset,
3390 values.append(bytes(chunk))
3391 bit_len += chunk.bit_len
3392 chunk_last = chunks[-1]
3394 values.append(bytes(chunk_last))
3395 bit_len += chunk_last.bit_len
3396 obj = self.__class__(
3397 value=None if evgen_mode else (bit_len, b"".join(values)),
3400 default=self.default,
3401 optional=self.optional,
3403 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3406 obj._value = (bit_len, None)
3407 obj.lenindef = lenindef
3408 obj.ber_encoded = True
3409 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3412 return pp_console_row(next(self.pps()))
3414 def pps(self, decode_path=()):
3418 bit_len, blob = self._value
3419 value = "%d bits" % bit_len
3420 if len(self.specs) > 0 and blob is not None:
3421 blob = tuple(self.named)
3424 asn1_type_name=self.asn1_type_name,
3425 obj_name=self.__class__.__name__,
3426 decode_path=decode_path,
3429 optional=self.optional,
3430 default=self == self.default,
3431 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3432 expl=None if self._expl is None else tag_decode(self._expl),
3437 expl_offset=self.expl_offset if self.expled else None,
3438 expl_tlen=self.expl_tlen if self.expled else None,
3439 expl_llen=self.expl_llen if self.expled else None,
3440 expl_vlen=self.expl_vlen if self.expled else None,
3441 expl_lenindef=self.expl_lenindef,
3442 lenindef=self.lenindef,
3443 ber_encoded=self.ber_encoded,
3446 defined_by, defined = self.defined or (None, None)
3447 if defined_by is not None:
3449 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3451 for pp in self.pps_lenindef(decode_path):
3455 OctetStringState = namedtuple(
3457 BasicState._fields + (
3468 class OctetString(Obj):
3469 """``OCTET STRING`` binary string type
3471 >>> s = OctetString(b"hello world")
3472 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3473 >>> s == OctetString(b"hello world")
3478 >>> OctetString(b"hello", bounds=(4, 4))
3479 Traceback (most recent call last):
3480 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3481 >>> OctetString(b"hell", bounds=(4, 4))
3482 OCTET STRING 4 bytes 68656c6c
3484 Memoryviews can be used as a values. If memoryview is made on
3485 mmap-ed file, then it does not take storage inside OctetString
3486 itself. In CER encoding mode it will be streamed to the specified
3487 writer, copying 1 KB chunks.
3489 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3490 tag_default = tag_encode(4)
3491 asn1_type_name = "OCTET STRING"
3492 evgen_mode_skip_value = True
3506 :param value: set the value. Either binary type, or
3507 :py:class:`pyderasn.OctetString` object
3508 :param bounds: set ``(MIN, MAX)`` value size constraint.
3509 (-inf, +inf) by default
3510 :param bytes impl: override default tag with ``IMPLICIT`` one
3511 :param bytes expl: override default tag with ``EXPLICIT`` one
3512 :param default: set default value. Type same as in ``value``
3513 :param bool optional: is object ``OPTIONAL`` in sequence
3515 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3517 self._bound_min, self._bound_max = getattr(
3521 ) if bounds is None else bounds
3522 if value is not None:
3523 self._value = self._value_sanitize(value)
3524 if default is not None:
3525 default = self._value_sanitize(default)
3526 self.default = self.__class__(
3531 if self._value is None:
3532 self._value = default
3534 tag_klass, _, tag_num = tag_decode(self.tag)
3535 self.tag_constructed = tag_encode(
3537 form=TagFormConstructed,
3541 def _value_sanitize(self, value):
3542 if value.__class__ == binary_type or value.__class__ == memoryview:
3544 elif issubclass(value.__class__, OctetString):
3545 value = value._value
3547 raise InvalidValueType((self.__class__, bytes, memoryview))
3548 if not self._bound_min <= len(value) <= self._bound_max:
3549 raise BoundsError(self._bound_min, len(value), self._bound_max)
3554 return self._value is not None
3556 def __getstate__(self):
3557 return OctetStringState(
3573 self.tag_constructed,
3577 def __setstate__(self, state):
3578 super(OctetString, self).__setstate__(state)
3579 self._value = state.value
3580 self._bound_min = state.bound_min
3581 self._bound_max = state.bound_max
3582 self.tag_constructed = state.tag_constructed
3583 self.defined = state.defined
3585 def __bytes__(self):
3586 self._assert_ready()
3587 return bytes(self._value)
3589 def __eq__(self, their):
3590 if their.__class__ == binary_type:
3591 return self._value == their
3592 if not issubclass(their.__class__, OctetString):
3595 self._value == their._value and
3596 self.tag == their.tag and
3597 self._expl == their._expl
3600 def __lt__(self, their):
3601 return self._value < their._value
3612 return self.__class__(
3615 (self._bound_min, self._bound_max)
3616 if bounds is None else bounds
3618 impl=self.tag if impl is None else impl,
3619 expl=self._expl if expl is None else expl,
3620 default=self.default if default is None else default,
3621 optional=self.optional if optional is None else optional,
3625 self._assert_ready()
3628 len_encode(len(self._value)),
3632 def _encode_cer(self, writer):
3633 octets = self._value
3634 if len(octets) <= 1000:
3635 write_full(writer, self._encode())
3637 write_full(writer, self.tag_constructed)
3638 write_full(writer, LENINDEF)
3639 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3640 write_full(writer, b"".join((
3641 OctetString.tag_default,
3643 octets[offset:offset + 1000],
3645 tail = octets[offset+1000:]
3647 write_full(writer, b"".join((
3648 OctetString.tag_default,
3649 len_encode(len(tail)),
3652 write_full(writer, EOC)
3654 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3656 t, tlen, lv = tag_strip(tlv)
3657 except DecodeError as err:
3658 raise err.__class__(
3660 klass=self.__class__,
3661 decode_path=decode_path,
3669 l, llen, v = len_decode(lv)
3670 except DecodeError as err:
3671 raise err.__class__(
3673 klass=self.__class__,
3674 decode_path=decode_path,
3678 raise NotEnoughData(
3679 "encoded length is longer than data",
3680 klass=self.__class__,
3681 decode_path=decode_path,
3684 v, tail = v[:l], v[l:]
3685 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3687 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3688 klass=self.__class__,
3689 decode_path=decode_path,
3693 obj = self.__class__(
3695 None if (evgen_mode and self.evgen_mode_skip_value)
3698 bounds=(self._bound_min, self._bound_max),
3701 default=self.default,
3702 optional=self.optional,
3703 _decoded=(offset, llen, l),
3706 except DecodeError as err:
3709 klass=self.__class__,
3710 decode_path=decode_path,
3713 except BoundsError as err:
3716 klass=self.__class__,
3717 decode_path=decode_path,
3720 yield decode_path, obj, tail
3722 if t != self.tag_constructed:
3724 klass=self.__class__,
3725 decode_path=decode_path,
3728 if not ctx.get("bered", False):
3730 "unallowed BER constructed encoding",
3731 klass=self.__class__,
3732 decode_path=decode_path,
3740 l, llen, v = len_decode(lv)
3741 except LenIndefForm:
3742 llen, l, v = 1, 0, lv[1:]
3744 except DecodeError as err:
3745 raise err.__class__(
3747 klass=self.__class__,
3748 decode_path=decode_path,
3752 raise NotEnoughData(
3753 "encoded length is longer than data",
3754 klass=self.__class__,
3755 decode_path=decode_path,
3760 sub_offset = offset + tlen + llen
3765 if v[:EOC_LEN].tobytes() == EOC:
3772 "chunk out of bounds",
3773 klass=self.__class__,
3774 decode_path=decode_path + (str(len(chunks) - 1),),
3775 offset=chunks[-1].offset,
3779 sub_decode_path = decode_path + (str(chunks_count),)
3780 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3783 decode_path=sub_decode_path,
3786 _ctx_immutable=False,
3788 yield _decode_path, chunk, v_tail
3789 if not chunk.ber_encoded:
3790 payload_len += chunk.vlen
3793 sub_decode_path = decode_path + (str(len(chunks)),)
3794 _, chunk, v_tail = next(OctetString().decode_evgen(
3797 decode_path=sub_decode_path,
3800 _ctx_immutable=False,
3803 chunks.append(chunk)
3806 "expected OctetString encoded chunk",
3807 klass=self.__class__,
3808 decode_path=sub_decode_path,
3811 sub_offset += chunk.tlvlen
3812 vlen += chunk.tlvlen
3814 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3816 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3817 klass=self.__class__,
3818 decode_path=decode_path,
3822 obj = self.__class__(
3824 None if evgen_mode else
3825 b"".join(bytes(chunk) for chunk in chunks)
3827 bounds=(self._bound_min, self._bound_max),
3830 default=self.default,
3831 optional=self.optional,
3832 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3835 except DecodeError as err:
3838 klass=self.__class__,
3839 decode_path=decode_path,
3842 except BoundsError as err:
3845 klass=self.__class__,
3846 decode_path=decode_path,
3849 obj.lenindef = lenindef
3850 obj.ber_encoded = True
3851 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3854 return pp_console_row(next(self.pps()))
3856 def pps(self, decode_path=()):
3859 asn1_type_name=self.asn1_type_name,
3860 obj_name=self.__class__.__name__,
3861 decode_path=decode_path,
3862 value=("%d bytes" % len(self._value)) if self.ready else None,
3863 blob=self._value if self.ready else None,
3864 optional=self.optional,
3865 default=self == self.default,
3866 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3867 expl=None if self._expl is None else tag_decode(self._expl),
3872 expl_offset=self.expl_offset if self.expled else None,
3873 expl_tlen=self.expl_tlen if self.expled else None,
3874 expl_llen=self.expl_llen if self.expled else None,
3875 expl_vlen=self.expl_vlen if self.expled else None,
3876 expl_lenindef=self.expl_lenindef,
3877 lenindef=self.lenindef,
3878 ber_encoded=self.ber_encoded,
3881 defined_by, defined = self.defined or (None, None)
3882 if defined_by is not None:
3884 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3886 for pp in self.pps_lenindef(decode_path):
3890 def agg_octet_string(evgens, decode_path, raw, writer):
3891 """Aggregate constructed string (OctetString and its derivatives)
3893 :param evgens: iterator of generated events
3894 :param decode_path: points to the string we want to decode
3895 :param raw: slicebable (memoryview, bytearray, etc) with
3896 the data evgens are generated on
3897 :param writer: buffer.write where string is going to be saved
3898 :param writer: where string is going to be saved. Must comply
3899 with ``io.RawIOBase.write`` behaviour
3901 decode_path_len = len(decode_path)
3902 for dp, obj, _ in evgens:
3903 if dp[:decode_path_len] != decode_path:
3905 if not obj.ber_encoded:
3906 write_full(writer, raw[
3907 obj.offset + obj.tlen + obj.llen:
3908 obj.offset + obj.tlen + obj.llen + obj.vlen -
3909 (EOC_LEN if obj.expl_lenindef else 0)
3911 if len(dp) == decode_path_len:
3915 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
3919 """``NULL`` null object
3927 tag_default = tag_encode(5)
3928 asn1_type_name = "NULL"
3932 value=None, # unused, but Sequence passes it
3939 :param bytes impl: override default tag with ``IMPLICIT`` one
3940 :param bytes expl: override default tag with ``EXPLICIT`` one
3941 :param bool optional: is object ``OPTIONAL`` in sequence
3943 super(Null, self).__init__(impl, expl, None, optional, _decoded)
3950 def __getstate__(self):
3966 def __eq__(self, their):
3967 if not issubclass(their.__class__, Null):
3970 self.tag == their.tag and
3971 self._expl == their._expl
3981 return self.__class__(
3982 impl=self.tag if impl is None else impl,
3983 expl=self._expl if expl is None else expl,
3984 optional=self.optional if optional is None else optional,
3988 return self.tag + len_encode(0)
3990 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3992 t, _, lv = tag_strip(tlv)
3993 except DecodeError as err:
3994 raise err.__class__(
3996 klass=self.__class__,
3997 decode_path=decode_path,
4002 klass=self.__class__,
4003 decode_path=decode_path,
4006 if tag_only: # pragma: no cover
4010 l, _, v = len_decode(lv)
4011 except DecodeError as err:
4012 raise err.__class__(
4014 klass=self.__class__,
4015 decode_path=decode_path,
4019 raise InvalidLength(
4020 "Null must have zero length",
4021 klass=self.__class__,
4022 decode_path=decode_path,
4025 obj = self.__class__(
4028 optional=self.optional,
4029 _decoded=(offset, 1, 0),
4031 yield decode_path, obj, v
4034 return pp_console_row(next(self.pps()))
4036 def pps(self, decode_path=()):
4039 asn1_type_name=self.asn1_type_name,
4040 obj_name=self.__class__.__name__,
4041 decode_path=decode_path,
4042 optional=self.optional,
4043 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4044 expl=None if self._expl is None else tag_decode(self._expl),
4049 expl_offset=self.expl_offset if self.expled else None,
4050 expl_tlen=self.expl_tlen if self.expled else None,
4051 expl_llen=self.expl_llen if self.expled else None,
4052 expl_vlen=self.expl_vlen if self.expled else None,
4053 expl_lenindef=self.expl_lenindef,
4056 for pp in self.pps_lenindef(decode_path):
4060 ObjectIdentifierState = namedtuple(
4061 "ObjectIdentifierState",
4062 BasicState._fields + ("value", "defines"),
4067 class ObjectIdentifier(Obj):
4068 """``OBJECT IDENTIFIER`` OID type
4070 >>> oid = ObjectIdentifier((1, 2, 3))
4071 OBJECT IDENTIFIER 1.2.3
4072 >>> oid == ObjectIdentifier("1.2.3")
4078 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4079 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4081 >>> str(ObjectIdentifier((3, 1)))
4082 Traceback (most recent call last):
4083 pyderasn.InvalidOID: unacceptable first arc value
4085 __slots__ = ("defines",)
4086 tag_default = tag_encode(6)
4087 asn1_type_name = "OBJECT IDENTIFIER"
4100 :param value: set the value. Either tuples of integers,
4101 string of "."-concatenated integers, or
4102 :py:class:`pyderasn.ObjectIdentifier` object
4103 :param defines: sequence of tuples. Each tuple has two elements.
4104 First one is relative to current one decode
4105 path, aiming to the field defined by that OID.
4106 Read about relative path in
4107 :py:func:`pyderasn.abs_decode_path`. Second
4108 tuple element is ``{OID: pyderasn.Obj()}``
4109 dictionary, mapping between current OID value
4110 and structure applied to defined field.
4111 :ref:`Read about DEFINED BY <definedby>`
4112 :param bytes impl: override default tag with ``IMPLICIT`` one
4113 :param bytes expl: override default tag with ``EXPLICIT`` one
4114 :param default: set default value. Type same as in ``value``
4115 :param bool optional: is object ``OPTIONAL`` in sequence
4117 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4119 if value is not None:
4120 self._value = self._value_sanitize(value)
4121 if default is not None:
4122 default = self._value_sanitize(default)
4123 self.default = self.__class__(
4128 if self._value is None:
4129 self._value = default
4130 self.defines = defines
4132 def __add__(self, their):
4133 if their.__class__ == tuple:
4134 return self.__class__(self._value + array("L", their))
4135 if isinstance(their, self.__class__):
4136 return self.__class__(self._value + their._value)
4137 raise InvalidValueType((self.__class__, tuple))
4139 def _value_sanitize(self, value):
4140 if issubclass(value.__class__, ObjectIdentifier):
4142 if isinstance(value, string_types):
4144 value = array("L", (pureint(arc) for arc in value.split(".")))
4146 raise InvalidOID("unacceptable arcs values")
4147 if value.__class__ == tuple:
4149 value = array("L", value)
4150 except OverflowError as err:
4151 raise InvalidOID(repr(err))
4152 if value.__class__ is array:
4154 raise InvalidOID("less than 2 arcs")
4155 first_arc = value[0]
4156 if first_arc in (0, 1):
4157 if not (0 <= value[1] <= 39):
4158 raise InvalidOID("second arc is too wide")
4159 elif first_arc == 2:
4162 raise InvalidOID("unacceptable first arc value")
4163 if not all(arc >= 0 for arc in value):
4164 raise InvalidOID("negative arc value")
4166 raise InvalidValueType((self.__class__, str, tuple))
4170 return self._value is not None
4172 def __getstate__(self):
4173 return ObjectIdentifierState(
4190 def __setstate__(self, state):
4191 super(ObjectIdentifier, self).__setstate__(state)
4192 self._value = state.value
4193 self.defines = state.defines
4196 self._assert_ready()
4197 return iter(self._value)
4200 return ".".join(str(arc) for arc in self._value or ())
4203 self._assert_ready()
4206 bytes(self._expl or b"") +
4207 str(self._value).encode("ascii"),
4210 def __eq__(self, their):
4211 if their.__class__ == tuple:
4212 return self._value == array("L", their)
4213 if not issubclass(their.__class__, ObjectIdentifier):
4216 self.tag == their.tag and
4217 self._expl == their._expl and
4218 self._value == their._value
4221 def __lt__(self, their):
4222 return self._value < their._value
4233 return self.__class__(
4235 defines=self.defines if defines is None else defines,
4236 impl=self.tag if impl is None else impl,
4237 expl=self._expl if expl is None else expl,
4238 default=self.default if default is None else default,
4239 optional=self.optional if optional is None else optional,
4243 self._assert_ready()
4245 first_value = value[1]
4246 first_arc = value[0]
4249 elif first_arc == 1:
4251 elif first_arc == 2:
4253 else: # pragma: no cover
4254 raise RuntimeError("invalid arc is stored")
4255 octets = [zero_ended_encode(first_value)]
4256 for arc in value[2:]:
4257 octets.append(zero_ended_encode(arc))
4258 v = b"".join(octets)
4259 return b"".join((self.tag, len_encode(len(v)), v))
4261 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4263 t, _, lv = tag_strip(tlv)
4264 except DecodeError as err:
4265 raise err.__class__(
4267 klass=self.__class__,
4268 decode_path=decode_path,
4273 klass=self.__class__,
4274 decode_path=decode_path,
4277 if tag_only: # pragma: no cover
4281 l, llen, v = len_decode(lv)
4282 except DecodeError as err:
4283 raise err.__class__(
4285 klass=self.__class__,
4286 decode_path=decode_path,
4290 raise NotEnoughData(
4291 "encoded length is longer than data",
4292 klass=self.__class__,
4293 decode_path=decode_path,
4297 raise NotEnoughData(
4299 klass=self.__class__,
4300 decode_path=decode_path,
4303 v, tail = v[:l], v[l:]
4310 octet = indexbytes(v, i)
4311 if i == 0 and octet == 0x80:
4312 if ctx.get("bered", False):
4316 "non normalized arc encoding",
4317 klass=self.__class__,
4318 decode_path=decode_path,
4321 arc = (arc << 7) | (octet & 0x7F)
4322 if octet & 0x80 == 0:
4325 except OverflowError:
4327 "too huge value for local unsigned long",
4328 klass=self.__class__,
4329 decode_path=decode_path,
4338 klass=self.__class__,
4339 decode_path=decode_path,
4343 second_arc = arcs[0]
4344 if 0 <= second_arc <= 39:
4346 elif 40 <= second_arc <= 79:
4352 obj = self.__class__(
4353 value=array("L", (first_arc, second_arc)) + arcs[1:],
4356 default=self.default,
4357 optional=self.optional,
4358 _decoded=(offset, llen, l),
4361 obj.ber_encoded = True
4362 yield decode_path, obj, tail
4365 return pp_console_row(next(self.pps()))
4367 def pps(self, decode_path=()):
4370 asn1_type_name=self.asn1_type_name,
4371 obj_name=self.__class__.__name__,
4372 decode_path=decode_path,
4373 value=str(self) if self.ready else None,
4374 optional=self.optional,
4375 default=self == self.default,
4376 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4377 expl=None if self._expl is None else tag_decode(self._expl),
4382 expl_offset=self.expl_offset if self.expled else None,
4383 expl_tlen=self.expl_tlen if self.expled else None,
4384 expl_llen=self.expl_llen if self.expled else None,
4385 expl_vlen=self.expl_vlen if self.expled else None,
4386 expl_lenindef=self.expl_lenindef,
4387 ber_encoded=self.ber_encoded,
4390 for pp in self.pps_lenindef(decode_path):
4394 class Enumerated(Integer):
4395 """``ENUMERATED`` integer type
4397 This type is identical to :py:class:`pyderasn.Integer`, but requires
4398 schema to be specified and does not accept values missing from it.
4401 tag_default = tag_encode(10)
4402 asn1_type_name = "ENUMERATED"
4413 bounds=None, # dummy argument, workability for Integer.decode
4415 super(Enumerated, self).__init__(
4416 value, bounds, impl, expl, default, optional, _specs, _decoded,
4418 if len(self.specs) == 0:
4419 raise ValueError("schema must be specified")
4421 def _value_sanitize(self, value):
4422 if isinstance(value, self.__class__):
4423 value = value._value
4424 elif isinstance(value, integer_types):
4425 for _value in itervalues(self.specs):
4430 "unknown integer value: %s" % value,
4431 klass=self.__class__,
4433 elif isinstance(value, string_types):
4434 value = self.specs.get(value)
4436 raise ObjUnknown("integer value: %s" % value)
4438 raise InvalidValueType((self.__class__, int, str))
4450 return self.__class__(
4452 impl=self.tag if impl is None else impl,
4453 expl=self._expl if expl is None else expl,
4454 default=self.default if default is None else default,
4455 optional=self.optional if optional is None else optional,
4460 def escape_control_unicode(c):
4461 if unicat(c)[0] == "C":
4462 c = repr(c).lstrip("u").strip("'")
4466 class CommonString(OctetString):
4467 """Common class for all strings
4469 Everything resembles :py:class:`pyderasn.OctetString`, except
4470 ability to deal with unicode text strings.
4472 >>> hexenc("привет мир".encode("utf-8"))
4473 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4474 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4476 >>> s = UTF8String("привет мир")
4477 UTF8String UTF8String привет мир
4479 'привет мир'
4480 >>> hexenc(bytes(s))
4481 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4483 >>> PrintableString("привет мир")
4484 Traceback (most recent call last):
4485 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4487 >>> BMPString("ада", bounds=(2, 2))
4488 Traceback (most recent call last):
4489 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4490 >>> s = BMPString("ад", bounds=(2, 2))
4493 >>> hexenc(bytes(s))
4501 * - :py:class:`pyderasn.UTF8String`
4503 * - :py:class:`pyderasn.NumericString`
4505 * - :py:class:`pyderasn.PrintableString`
4507 * - :py:class:`pyderasn.TeletexString`
4509 * - :py:class:`pyderasn.T61String`
4511 * - :py:class:`pyderasn.VideotexString`
4513 * - :py:class:`pyderasn.IA5String`
4515 * - :py:class:`pyderasn.GraphicString`
4517 * - :py:class:`pyderasn.VisibleString`
4519 * - :py:class:`pyderasn.ISO646String`
4521 * - :py:class:`pyderasn.GeneralString`
4523 * - :py:class:`pyderasn.UniversalString`
4525 * - :py:class:`pyderasn.BMPString`
4530 def _value_sanitize(self, value):
4532 value_decoded = None
4533 if isinstance(value, self.__class__):
4534 value_raw = value._value
4535 elif value.__class__ == text_type:
4536 value_decoded = value
4537 elif value.__class__ == binary_type:
4540 raise InvalidValueType((self.__class__, text_type, binary_type))
4543 value_decoded.encode(self.encoding)
4544 if value_raw is None else value_raw
4547 value_raw.decode(self.encoding)
4548 if value_decoded is None else value_decoded
4550 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4551 raise DecodeError(str(err))
4552 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4560 def __eq__(self, their):
4561 if their.__class__ == binary_type:
4562 return self._value == their
4563 if their.__class__ == text_type:
4564 return self._value == their.encode(self.encoding)
4565 if not isinstance(their, self.__class__):
4568 self._value == their._value and
4569 self.tag == their.tag and
4570 self._expl == their._expl
4573 def __unicode__(self):
4575 return self._value.decode(self.encoding)
4576 return text_type(self._value)
4579 return pp_console_row(next(self.pps(no_unicode=PY2)))
4581 def pps(self, decode_path=(), no_unicode=False):
4585 hexenc(bytes(self)) if no_unicode else
4586 "".join(escape_control_unicode(c) for c in self.__unicode__())
4590 asn1_type_name=self.asn1_type_name,
4591 obj_name=self.__class__.__name__,
4592 decode_path=decode_path,
4594 optional=self.optional,
4595 default=self == self.default,
4596 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4597 expl=None if self._expl is None else tag_decode(self._expl),
4602 expl_offset=self.expl_offset if self.expled else None,
4603 expl_tlen=self.expl_tlen if self.expled else None,
4604 expl_llen=self.expl_llen if self.expled else None,
4605 expl_vlen=self.expl_vlen if self.expled else None,
4606 expl_lenindef=self.expl_lenindef,
4607 ber_encoded=self.ber_encoded,
4610 for pp in self.pps_lenindef(decode_path):
4614 class UTF8String(CommonString):
4616 tag_default = tag_encode(12)
4618 asn1_type_name = "UTF8String"
4621 class AllowableCharsMixin(object):
4623 def allowable_chars(self):
4625 return self._allowable_chars
4626 return frozenset(six_unichr(c) for c in self._allowable_chars)
4629 class NumericString(AllowableCharsMixin, CommonString):
4632 Its value is properly sanitized: only ASCII digits with spaces can
4635 >>> NumericString().allowable_chars
4636 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4639 tag_default = tag_encode(18)
4641 asn1_type_name = "NumericString"
4642 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4644 def _value_sanitize(self, value):
4645 value = super(NumericString, self)._value_sanitize(value)
4646 if not frozenset(value) <= self._allowable_chars:
4647 raise DecodeError("non-numeric value")
4651 PrintableStringState = namedtuple(
4652 "PrintableStringState",
4653 OctetStringState._fields + ("allowable_chars",),
4658 class PrintableString(AllowableCharsMixin, CommonString):
4661 Its value is properly sanitized: see X.680 41.4 table 10.
4663 >>> PrintableString().allowable_chars
4664 frozenset([' ', "'", ..., 'z'])
4665 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4666 PrintableString PrintableString foo*bar
4667 >>> obj.allow_asterisk, obj.allow_ampersand
4671 tag_default = tag_encode(19)
4673 asn1_type_name = "PrintableString"
4674 _allowable_chars = frozenset(
4675 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4677 _asterisk = frozenset("*".encode("ascii"))
4678 _ampersand = frozenset("&".encode("ascii"))
4690 allow_asterisk=False,
4691 allow_ampersand=False,
4694 :param allow_asterisk: allow asterisk character
4695 :param allow_ampersand: allow ampersand character
4698 self._allowable_chars |= self._asterisk
4700 self._allowable_chars |= self._ampersand
4701 super(PrintableString, self).__init__(
4702 value, bounds, impl, expl, default, optional, _decoded, ctx,
4706 def allow_asterisk(self):
4707 """Is asterisk character allowed?
4709 return self._asterisk <= self._allowable_chars
4712 def allow_ampersand(self):
4713 """Is ampersand character allowed?
4715 return self._ampersand <= self._allowable_chars
4717 def _value_sanitize(self, value):
4718 value = super(PrintableString, self)._value_sanitize(value)
4719 if not frozenset(value) <= self._allowable_chars:
4720 raise DecodeError("non-printable value")
4723 def __getstate__(self):
4724 return PrintableStringState(
4725 *super(PrintableString, self).__getstate__(),
4726 **{"allowable_chars": self._allowable_chars}
4729 def __setstate__(self, state):
4730 super(PrintableString, self).__setstate__(state)
4731 self._allowable_chars = state.allowable_chars
4742 return self.__class__(
4745 (self._bound_min, self._bound_max)
4746 if bounds is None else bounds
4748 impl=self.tag if impl is None else impl,
4749 expl=self._expl if expl is None else expl,
4750 default=self.default if default is None else default,
4751 optional=self.optional if optional is None else optional,
4752 allow_asterisk=self.allow_asterisk,
4753 allow_ampersand=self.allow_ampersand,
4757 class TeletexString(CommonString):
4759 tag_default = tag_encode(20)
4761 asn1_type_name = "TeletexString"
4764 class T61String(TeletexString):
4766 asn1_type_name = "T61String"
4769 class VideotexString(CommonString):
4771 tag_default = tag_encode(21)
4772 encoding = "iso-8859-1"
4773 asn1_type_name = "VideotexString"
4776 class IA5String(CommonString):
4778 tag_default = tag_encode(22)
4780 asn1_type_name = "IA5"
4783 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4784 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4785 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4788 class VisibleString(CommonString):
4790 tag_default = tag_encode(26)
4792 asn1_type_name = "VisibleString"
4795 UTCTimeState = namedtuple(
4797 OctetStringState._fields + ("ber_raw",),
4802 def str_to_time_fractions(value):
4804 year, v = (v // 10**10), (v % 10**10)
4805 month, v = (v // 10**8), (v % 10**8)
4806 day, v = (v // 10**6), (v % 10**6)
4807 hour, v = (v // 10**4), (v % 10**4)
4808 minute, second = (v // 100), (v % 100)
4809 return year, month, day, hour, minute, second
4812 class UTCTime(VisibleString):
4813 """``UTCTime`` datetime type
4815 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
4816 UTCTime UTCTime 2017-09-30T22:07:50
4822 datetime.datetime(2017, 9, 30, 22, 7, 50)
4823 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
4824 datetime.datetime(1957, 9, 30, 22, 7, 50)
4826 If BER encoded value was met, then ``ber_raw`` attribute will hold
4827 its raw representation.
4831 Pay attention that UTCTime can not hold full year, so all years
4832 having < 50 years are treated as 20xx, 19xx otherwise, according
4833 to X.509 recommendation.
4837 No strict validation of UTC offsets are made, but very crude:
4839 * minutes are not exceeding 60
4840 * offset value is not exceeding 14 hours
4842 __slots__ = ("ber_raw",)
4843 tag_default = tag_encode(23)
4845 asn1_type_name = "UTCTime"
4846 evgen_mode_skip_value = False
4856 bounds=None, # dummy argument, workability for OctetString.decode
4860 :param value: set the value. Either datetime type, or
4861 :py:class:`pyderasn.UTCTime` object
4862 :param bytes impl: override default tag with ``IMPLICIT`` one
4863 :param bytes expl: override default tag with ``EXPLICIT`` one
4864 :param default: set default value. Type same as in ``value``
4865 :param bool optional: is object ``OPTIONAL`` in sequence
4867 super(UTCTime, self).__init__(
4868 None, None, impl, expl, None, optional, _decoded, ctx,
4872 if value is not None:
4873 self._value, self.ber_raw = self._value_sanitize(value, ctx)
4874 self.ber_encoded = self.ber_raw is not None
4875 if default is not None:
4876 default, _ = self._value_sanitize(default)
4877 self.default = self.__class__(
4882 if self._value is None:
4883 self._value = default
4885 self.optional = optional
4887 def _strptime_bered(self, value):
4888 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
4891 raise ValueError("no timezone")
4892 year += 2000 if year < 50 else 1900
4893 decoded = datetime(year, month, day, hour, minute)
4895 if value[-1] == "Z":
4899 raise ValueError("invalid UTC offset")
4900 if value[-5] == "-":
4902 elif value[-5] == "+":
4905 raise ValueError("invalid UTC offset")
4906 v = pureint(value[-4:])
4907 offset, v = (60 * (v % 100)), v // 100
4909 raise ValueError("invalid UTC offset minutes")
4911 if offset > 14 * 3600:
4912 raise ValueError("too big UTC offset")
4916 return offset, decoded
4918 raise ValueError("invalid UTC offset seconds")
4919 seconds = pureint(value)
4921 raise ValueError("invalid seconds value")
4922 return offset, decoded + timedelta(seconds=seconds)
4924 def _strptime(self, value):
4925 # datetime.strptime's format: %y%m%d%H%M%SZ
4926 if len(value) != LEN_YYMMDDHHMMSSZ:
4927 raise ValueError("invalid UTCTime length")
4928 if value[-1] != "Z":
4929 raise ValueError("non UTC timezone")
4930 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
4931 year += 2000 if year < 50 else 1900
4932 return datetime(year, month, day, hour, minute, second)
4934 def _dt_sanitize(self, value):
4935 if value.year < 1950 or value.year > 2049:
4936 raise ValueError("UTCTime can hold only 1950-2049 years")
4937 return value.replace(microsecond=0)
4939 def _value_sanitize(self, value, ctx=None):
4940 if value.__class__ == binary_type:
4942 value_decoded = value.decode("ascii")
4943 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4944 raise DecodeError("invalid UTCTime encoding: %r" % err)
4947 return self._strptime(value_decoded), None
4948 except (TypeError, ValueError) as _err:
4950 if (ctx is not None) and ctx.get("bered", False):
4952 offset, _value = self._strptime_bered(value_decoded)
4953 _value = _value - timedelta(seconds=offset)
4954 return self._dt_sanitize(_value), value
4955 except (TypeError, ValueError, OverflowError) as _err:
4958 "invalid %s format: %r" % (self.asn1_type_name, err),
4959 klass=self.__class__,
4961 if isinstance(value, self.__class__):
4962 return value._value, None
4963 if value.__class__ == datetime:
4964 return self._dt_sanitize(value), None
4965 raise InvalidValueType((self.__class__, datetime))
4967 def _pp_value(self):
4969 value = self._value.isoformat()
4970 if self.ber_encoded:
4971 value += " (%s)" % self.ber_raw
4974 def __unicode__(self):
4976 value = self._value.isoformat()
4977 if self.ber_encoded:
4978 value += " (%s)" % self.ber_raw
4980 return text_type(self._pp_value())
4982 def __getstate__(self):
4983 return UTCTimeState(
4984 *super(UTCTime, self).__getstate__(),
4985 **{"ber_raw": self.ber_raw}
4988 def __setstate__(self, state):
4989 super(UTCTime, self).__setstate__(state)
4990 self.ber_raw = state.ber_raw
4992 def __bytes__(self):
4993 self._assert_ready()
4994 return self._encode_time()
4996 def __eq__(self, their):
4997 if their.__class__ == binary_type:
4998 return self._encode_time() == their
4999 if their.__class__ == datetime:
5000 return self.todatetime() == their
5001 if not isinstance(their, self.__class__):
5004 self._value == their._value and
5005 self.tag == their.tag and
5006 self._expl == their._expl
5009 def _encode_time(self):
5010 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5013 self._assert_ready()
5014 value = self._encode_time()
5015 return b"".join((self.tag, len_encode(len(value)), value))
5017 def _encode_cer(self, writer):
5018 write_full(writer, self._encode())
5020 def todatetime(self):
5024 return pp_console_row(next(self.pps()))
5026 def pps(self, decode_path=()):
5029 asn1_type_name=self.asn1_type_name,
5030 obj_name=self.__class__.__name__,
5031 decode_path=decode_path,
5032 value=self._pp_value(),
5033 optional=self.optional,
5034 default=self == self.default,
5035 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5036 expl=None if self._expl is None else tag_decode(self._expl),
5041 expl_offset=self.expl_offset if self.expled else None,
5042 expl_tlen=self.expl_tlen if self.expled else None,
5043 expl_llen=self.expl_llen if self.expled else None,
5044 expl_vlen=self.expl_vlen if self.expled else None,
5045 expl_lenindef=self.expl_lenindef,
5046 ber_encoded=self.ber_encoded,
5049 for pp in self.pps_lenindef(decode_path):
5053 class GeneralizedTime(UTCTime):
5054 """``GeneralizedTime`` datetime type
5056 This type is similar to :py:class:`pyderasn.UTCTime`.
5058 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5059 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5061 '20170930220750.000123Z'
5062 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5063 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5067 Only microsecond fractions are supported in DER encoding.
5068 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5069 higher precision values.
5073 BER encoded data can loss information (accuracy) during decoding
5074 because of float transformations.
5078 Local times (without explicit timezone specification) are treated
5079 as UTC one, no transformations are made.
5083 Zero year is unsupported.
5086 tag_default = tag_encode(24)
5087 asn1_type_name = "GeneralizedTime"
5089 def _dt_sanitize(self, value):
5092 def _strptime_bered(self, value):
5093 if len(value) < 4 + 3 * 2:
5094 raise ValueError("invalid GeneralizedTime")
5095 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5096 decoded = datetime(year, month, day, hour)
5097 offset, value = 0, value[10:]
5099 return offset, decoded
5100 if value[-1] == "Z":
5103 for char, sign in (("-", -1), ("+", 1)):
5104 idx = value.rfind(char)
5107 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5108 v = pureint(offset_raw)
5109 if len(offset_raw) == 4:
5110 offset, v = (60 * (v % 100)), v // 100
5112 raise ValueError("invalid UTC offset minutes")
5113 elif len(offset_raw) == 2:
5116 raise ValueError("invalid UTC offset")
5118 if offset > 14 * 3600:
5119 raise ValueError("too big UTC offset")
5123 return offset, decoded
5124 if value[0] in DECIMAL_SIGNS:
5126 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5129 raise ValueError("stripped minutes")
5130 decoded += timedelta(seconds=60 * pureint(value[:2]))
5133 return offset, decoded
5134 if value[0] in DECIMAL_SIGNS:
5136 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5139 raise ValueError("stripped seconds")
5140 decoded += timedelta(seconds=pureint(value[:2]))
5143 return offset, decoded
5144 if value[0] not in DECIMAL_SIGNS:
5145 raise ValueError("invalid format after seconds")
5147 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5150 def _strptime(self, value):
5152 if l == LEN_YYYYMMDDHHMMSSZ:
5153 # datetime.strptime's format: %Y%m%d%H%M%SZ
5154 if value[-1] != "Z":
5155 raise ValueError("non UTC timezone")
5156 return datetime(*str_to_time_fractions(value[:-1]))
5157 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5158 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5159 if value[-1] != "Z":
5160 raise ValueError("non UTC timezone")
5161 if value[14] != ".":
5162 raise ValueError("no fractions separator")
5165 raise ValueError("trailing zero")
5168 raise ValueError("only microsecond fractions are supported")
5169 us = pureint(us + ("0" * (6 - us_len)))
5170 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5171 return datetime(year, month, day, hour, minute, second, us)
5172 raise ValueError("invalid GeneralizedTime length")
5174 def _encode_time(self):
5176 encoded = value.strftime("%Y%m%d%H%M%S")
5177 if value.microsecond > 0:
5178 encoded += (".%06d" % value.microsecond).rstrip("0")
5179 return (encoded + "Z").encode("ascii")
5182 class GraphicString(CommonString):
5184 tag_default = tag_encode(25)
5185 encoding = "iso-8859-1"
5186 asn1_type_name = "GraphicString"
5189 class ISO646String(VisibleString):
5191 asn1_type_name = "ISO646String"
5194 class GeneralString(CommonString):
5196 tag_default = tag_encode(27)
5197 encoding = "iso-8859-1"
5198 asn1_type_name = "GeneralString"
5201 class UniversalString(CommonString):
5203 tag_default = tag_encode(28)
5204 encoding = "utf-32-be"
5205 asn1_type_name = "UniversalString"
5208 class BMPString(CommonString):
5210 tag_default = tag_encode(30)
5211 encoding = "utf-16-be"
5212 asn1_type_name = "BMPString"
5215 ChoiceState = namedtuple(
5217 BasicState._fields + ("specs", "value",),
5223 """``CHOICE`` special type
5227 class GeneralName(Choice):
5229 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5230 ("dNSName", IA5String(impl=tag_ctxp(2))),
5233 >>> gn = GeneralName()
5235 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5236 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5237 >>> gn["dNSName"] = IA5String("bar.baz")
5238 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5239 >>> gn["rfc822Name"]
5242 [2] IA5String IA5 bar.baz
5245 >>> gn.value == gn["dNSName"]
5248 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5250 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5251 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5253 __slots__ = ("specs",)
5255 asn1_type_name = "CHOICE"
5268 :param value: set the value. Either ``(choice, value)`` tuple, or
5269 :py:class:`pyderasn.Choice` object
5270 :param bytes impl: can not be set, do **not** use it
5271 :param bytes expl: override default tag with ``EXPLICIT`` one
5272 :param default: set default value. Type same as in ``value``
5273 :param bool optional: is object ``OPTIONAL`` in sequence
5275 if impl is not None:
5276 raise ValueError("no implicit tag allowed for CHOICE")
5277 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5279 schema = getattr(self, "schema", ())
5280 if len(schema) == 0:
5281 raise ValueError("schema must be specified")
5283 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5286 if value is not None:
5287 self._value = self._value_sanitize(value)
5288 if default is not None:
5289 default_value = self._value_sanitize(default)
5290 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5291 default_obj.specs = self.specs
5292 default_obj._value = default_value
5293 self.default = default_obj
5295 self._value = copy(default_obj._value)
5296 if self._expl is not None:
5297 tag_class, _, tag_num = tag_decode(self._expl)
5298 self._tag_order = (tag_class, tag_num)
5300 def _value_sanitize(self, value):
5301 if (value.__class__ == tuple) and len(value) == 2:
5303 spec = self.specs.get(choice)
5305 raise ObjUnknown(choice)
5306 if not isinstance(obj, spec.__class__):
5307 raise InvalidValueType((spec,))
5308 return (choice, spec(obj))
5309 if isinstance(value, self.__class__):
5311 raise InvalidValueType((self.__class__, tuple))
5315 return self._value is not None and self._value[1].ready
5319 return self.expl_lenindef or (
5320 (self._value is not None) and
5321 self._value[1].bered
5324 def __getstate__(self):
5342 def __setstate__(self, state):
5343 super(Choice, self).__setstate__(state)
5344 self.specs = state.specs
5345 self._value = state.value
5347 def __eq__(self, their):
5348 if (their.__class__ == tuple) and len(their) == 2:
5349 return self._value == their
5350 if not isinstance(their, self.__class__):
5353 self.specs == their.specs and
5354 self._value == their._value
5364 return self.__class__(
5367 expl=self._expl if expl is None else expl,
5368 default=self.default if default is None else default,
5369 optional=self.optional if optional is None else optional,
5374 """Name of the choice
5376 self._assert_ready()
5377 return self._value[0]
5381 """Value of underlying choice
5383 self._assert_ready()
5384 return self._value[1]
5387 def tag_order(self):
5388 self._assert_ready()
5389 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5392 def tag_order_cer(self):
5393 return min(v.tag_order_cer for v in itervalues(self.specs))
5395 def __getitem__(self, key):
5396 if key not in self.specs:
5397 raise ObjUnknown(key)
5398 if self._value is None:
5400 choice, value = self._value
5405 def __setitem__(self, key, value):
5406 spec = self.specs.get(key)
5408 raise ObjUnknown(key)
5409 if not isinstance(value, spec.__class__):
5410 raise InvalidValueType((spec.__class__,))
5411 self._value = (key, spec(value))
5419 return self._value[1].decoded if self.ready else False
5422 self._assert_ready()
5423 return self._value[1].encode()
5425 def _encode_cer(self, writer):
5426 self._assert_ready()
5427 self._value[1].encode_cer(writer)
5429 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5430 for choice, spec in iteritems(self.specs):
5431 sub_decode_path = decode_path + (choice,)
5437 decode_path=sub_decode_path,
5440 _ctx_immutable=False,
5447 klass=self.__class__,
5448 decode_path=decode_path,
5451 if tag_only: # pragma: no cover
5455 for _decode_path, value, tail in spec.decode_evgen(
5459 decode_path=sub_decode_path,
5461 _ctx_immutable=False,
5463 yield _decode_path, value, tail
5465 _, value, tail = next(spec.decode_evgen(
5469 decode_path=sub_decode_path,
5471 _ctx_immutable=False,
5474 obj = self.__class__(
5477 default=self.default,
5478 optional=self.optional,
5479 _decoded=(offset, 0, value.fulllen),
5481 obj._value = (choice, value)
5482 yield decode_path, obj, tail
5485 value = pp_console_row(next(self.pps()))
5487 value = "%s[%r]" % (value, self.value)
5490 def pps(self, decode_path=()):
5493 asn1_type_name=self.asn1_type_name,
5494 obj_name=self.__class__.__name__,
5495 decode_path=decode_path,
5496 value=self.choice if self.ready else None,
5497 optional=self.optional,
5498 default=self == self.default,
5499 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5500 expl=None if self._expl is None else tag_decode(self._expl),
5505 expl_lenindef=self.expl_lenindef,
5509 yield self.value.pps(decode_path=decode_path + (self.choice,))
5510 for pp in self.pps_lenindef(decode_path):
5514 class PrimitiveTypes(Choice):
5515 """Predefined ``CHOICE`` for all generic primitive types
5517 It could be useful for general decoding of some unspecified values:
5519 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5520 OCTET STRING 3 bytes 666f6f
5521 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5525 schema = tuple((klass.__name__, klass()) for klass in (
5549 AnyState = namedtuple(
5551 BasicState._fields + ("value", "defined"),
5557 """``ANY`` special type
5559 >>> Any(Integer(-123))
5560 ANY INTEGER -123 (0X:7B)
5561 >>> a = Any(OctetString(b"hello world").encode())
5562 ANY 040b68656c6c6f20776f726c64
5563 >>> hexenc(bytes(a))
5564 b'0x040x0bhello world'
5566 __slots__ = ("defined",)
5567 tag_default = tag_encode(0)
5568 asn1_type_name = "ANY"
5578 :param value: set the value. Either any kind of pyderasn's
5579 **ready** object, or bytes. Pay attention that
5580 **no** validation is performed if raw binary value
5581 is valid TLV, except just tag decoding
5582 :param bytes expl: override default tag with ``EXPLICIT`` one
5583 :param bool optional: is object ``OPTIONAL`` in sequence
5585 super(Any, self).__init__(None, expl, None, optional, _decoded)
5589 value = self._value_sanitize(value)
5591 if self._expl is None:
5592 if value.__class__ == binary_type:
5593 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5595 tag_class, tag_num = value.tag_order
5597 tag_class, _, tag_num = tag_decode(self._expl)
5598 self._tag_order = (tag_class, tag_num)
5601 def _value_sanitize(self, value):
5602 if value.__class__ == binary_type:
5604 raise ValueError("Any value can not be empty")
5606 if isinstance(value, self.__class__):
5608 if not isinstance(value, Obj):
5609 raise InvalidValueType((self.__class__, Obj, binary_type))
5614 return self._value is not None
5617 def tag_order(self):
5618 self._assert_ready()
5619 return self._tag_order
5623 if self.expl_lenindef or self.lenindef:
5625 if self.defined is None:
5627 return self.defined[1].bered
5629 def __getstate__(self):
5647 def __setstate__(self, state):
5648 super(Any, self).__setstate__(state)
5649 self._value = state.value
5650 self.defined = state.defined
5652 def __eq__(self, their):
5653 if their.__class__ == binary_type:
5654 if self._value.__class__ == binary_type:
5655 return self._value == their
5656 return self._value.encode() == their
5657 if issubclass(their.__class__, Any):
5658 if self.ready and their.ready:
5659 return bytes(self) == bytes(their)
5660 return self.ready == their.ready
5669 return self.__class__(
5671 expl=self._expl if expl is None else expl,
5672 optional=self.optional if optional is None else optional,
5675 def __bytes__(self):
5676 self._assert_ready()
5678 if value.__class__ == binary_type:
5680 return self._value.encode()
5687 self._assert_ready()
5689 if value.__class__ == binary_type:
5691 return value.encode()
5693 def _encode_cer(self, writer):
5694 self._assert_ready()
5696 if value.__class__ == binary_type:
5697 write_full(writer, value)
5699 value.encode_cer(writer)
5701 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5703 t, tlen, lv = tag_strip(tlv)
5704 except DecodeError as err:
5705 raise err.__class__(
5707 klass=self.__class__,
5708 decode_path=decode_path,
5712 l, llen, v = len_decode(lv)
5713 except LenIndefForm as err:
5714 if not ctx.get("bered", False):
5715 raise err.__class__(
5717 klass=self.__class__,
5718 decode_path=decode_path,
5721 llen, vlen, v = 1, 0, lv[1:]
5722 sub_offset = offset + tlen + llen
5724 while v[:EOC_LEN].tobytes() != EOC:
5725 chunk, v = Any().decode(
5728 decode_path=decode_path + (str(chunk_i),),
5731 _ctx_immutable=False,
5733 vlen += chunk.tlvlen
5734 sub_offset += chunk.tlvlen
5736 tlvlen = tlen + llen + vlen + EOC_LEN
5737 obj = self.__class__(
5738 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5740 optional=self.optional,
5741 _decoded=(offset, 0, tlvlen),
5744 obj.tag = t.tobytes()
5745 yield decode_path, obj, v[EOC_LEN:]
5747 except DecodeError as err:
5748 raise err.__class__(
5750 klass=self.__class__,
5751 decode_path=decode_path,
5755 raise NotEnoughData(
5756 "encoded length is longer than data",
5757 klass=self.__class__,
5758 decode_path=decode_path,
5761 tlvlen = tlen + llen + l
5762 v, tail = tlv[:tlvlen], v[l:]
5763 obj = self.__class__(
5764 value=None if evgen_mode else v.tobytes(),
5766 optional=self.optional,
5767 _decoded=(offset, 0, tlvlen),
5769 obj.tag = t.tobytes()
5770 yield decode_path, obj, tail
5773 return pp_console_row(next(self.pps()))
5775 def pps(self, decode_path=()):
5779 elif value.__class__ == binary_type:
5785 asn1_type_name=self.asn1_type_name,
5786 obj_name=self.__class__.__name__,
5787 decode_path=decode_path,
5789 blob=self._value if self._value.__class__ == binary_type else None,
5790 optional=self.optional,
5791 default=self == self.default,
5792 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5793 expl=None if self._expl is None else tag_decode(self._expl),
5798 expl_offset=self.expl_offset if self.expled else None,
5799 expl_tlen=self.expl_tlen if self.expled else None,
5800 expl_llen=self.expl_llen if self.expled else None,
5801 expl_vlen=self.expl_vlen if self.expled else None,
5802 expl_lenindef=self.expl_lenindef,
5803 lenindef=self.lenindef,
5806 defined_by, defined = self.defined or (None, None)
5807 if defined_by is not None:
5809 decode_path=decode_path + (DecodePathDefBy(defined_by),)
5811 for pp in self.pps_lenindef(decode_path):
5815 ########################################################################
5816 # ASN.1 constructed types
5817 ########################################################################
5819 def get_def_by_path(defines_by_path, sub_decode_path):
5820 """Get define by decode path
5822 for path, define in defines_by_path:
5823 if len(path) != len(sub_decode_path):
5825 for p1, p2 in zip(path, sub_decode_path):
5826 if (not p1 is any) and (p1 != p2):
5832 def abs_decode_path(decode_path, rel_path):
5833 """Create an absolute decode path from current and relative ones
5835 :param decode_path: current decode path, starting point. Tuple of strings
5836 :param rel_path: relative path to ``decode_path``. Tuple of strings.
5837 If first tuple's element is "/", then treat it as
5838 an absolute path, ignoring ``decode_path`` as
5839 starting point. Also this tuple can contain ".."
5840 elements, stripping the leading element from
5843 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
5844 ("foo", "bar", "baz", "whatever")
5845 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
5847 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
5850 if rel_path[0] == "/":
5852 if rel_path[0] == "..":
5853 return abs_decode_path(decode_path[:-1], rel_path[1:])
5854 return decode_path + rel_path
5857 SequenceState = namedtuple(
5859 BasicState._fields + ("specs", "value",),
5864 class Sequence(Obj):
5865 """``SEQUENCE`` structure type
5867 You have to make specification of sequence::
5869 class Extension(Sequence):
5871 ("extnID", ObjectIdentifier()),
5872 ("critical", Boolean(default=False)),
5873 ("extnValue", OctetString()),
5876 Then, you can work with it as with dictionary.
5878 >>> ext = Extension()
5879 >>> Extension().specs
5881 ('extnID', OBJECT IDENTIFIER),
5882 ('critical', BOOLEAN False OPTIONAL DEFAULT),
5883 ('extnValue', OCTET STRING),
5885 >>> ext["extnID"] = "1.2.3"
5886 Traceback (most recent call last):
5887 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
5888 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
5890 You can determine if sequence is ready to be encoded:
5895 Traceback (most recent call last):
5896 pyderasn.ObjNotReady: object is not ready: extnValue
5897 >>> ext["extnValue"] = OctetString(b"foobar")
5901 Value you want to assign, must have the same **type** as in
5902 corresponding specification, but it can have different tags,
5903 optional/default attributes -- they will be taken from specification
5906 class TBSCertificate(Sequence):
5908 ("version", Version(expl=tag_ctxc(0), default="v1")),
5911 >>> tbs = TBSCertificate()
5912 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
5914 Assign ``None`` to remove value from sequence.
5916 You can set values in Sequence during its initialization:
5918 >>> AlgorithmIdentifier((
5919 ("algorithm", ObjectIdentifier("1.2.3")),
5920 ("parameters", Any(Null()))
5922 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
5924 You can determine if value exists/set in the sequence and take its value:
5926 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
5929 OBJECT IDENTIFIER 1.2.3
5931 But pay attention that if value has default, then it won't be (not
5932 in) in the sequence (because ``DEFAULT`` must not be encoded in
5933 DER), but you can read its value:
5935 >>> "critical" in ext, ext["critical"]
5936 (False, BOOLEAN False)
5937 >>> ext["critical"] = Boolean(True)
5938 >>> "critical" in ext, ext["critical"]
5939 (True, BOOLEAN True)
5941 All defaulted values are always optional.
5943 .. _allow_default_values_ctx:
5945 DER prohibits default value encoding and will raise an error if
5946 default value is unexpectedly met during decode.
5947 If :ref:`bered <bered_ctx>` context option is set, then no error
5948 will be raised, but ``bered`` attribute set. You can disable strict
5949 defaulted values existence validation by setting
5950 ``"allow_default_values": True`` :ref:`context <ctx>` option.
5954 Check for default value existence is not performed in
5955 ``evgen_mode``, because previously decoded values are not stored
5956 in memory, to be able to compare them.
5958 Two sequences are equal if they have equal specification (schema),
5959 implicit/explicit tagging and the same values.
5961 __slots__ = ("specs",)
5962 tag_default = tag_encode(form=TagFormConstructed, num=16)
5963 asn1_type_name = "SEQUENCE"
5975 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
5977 schema = getattr(self, "schema", ())
5979 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5982 if value is not None:
5983 if issubclass(value.__class__, Sequence):
5984 self._value = value._value
5985 elif hasattr(value, "__iter__"):
5986 for seq_key, seq_value in value:
5987 self[seq_key] = seq_value
5989 raise InvalidValueType((Sequence,))
5990 if default is not None:
5991 if not issubclass(default.__class__, Sequence):
5992 raise InvalidValueType((Sequence,))
5993 default_value = default._value
5994 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5995 default_obj.specs = self.specs
5996 default_obj._value = default_value
5997 self.default = default_obj
5999 self._value = copy(default_obj._value)
6003 for name, spec in iteritems(self.specs):
6004 value = self._value.get(name)
6015 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6017 return any(value.bered for value in itervalues(self._value))
6019 def __getstate__(self):
6020 return SequenceState(
6034 {k: copy(v) for k, v in iteritems(self._value)},
6037 def __setstate__(self, state):
6038 super(Sequence, self).__setstate__(state)
6039 self.specs = state.specs
6040 self._value = state.value
6042 def __eq__(self, their):
6043 if not isinstance(their, self.__class__):
6046 self.specs == their.specs and
6047 self.tag == their.tag and
6048 self._expl == their._expl and
6049 self._value == their._value
6060 return self.__class__(
6063 impl=self.tag if impl is None else impl,
6064 expl=self._expl if expl is None else expl,
6065 default=self.default if default is None else default,
6066 optional=self.optional if optional is None else optional,
6069 def __contains__(self, key):
6070 return key in self._value
6072 def __setitem__(self, key, value):
6073 spec = self.specs.get(key)
6075 raise ObjUnknown(key)
6077 self._value.pop(key, None)
6079 if not isinstance(value, spec.__class__):
6080 raise InvalidValueType((spec.__class__,))
6081 value = spec(value=value)
6082 if spec.default is not None and value == spec.default:
6083 self._value.pop(key, None)
6085 self._value[key] = value
6087 def __getitem__(self, key):
6088 value = self._value.get(key)
6089 if value is not None:
6091 spec = self.specs.get(key)
6093 raise ObjUnknown(key)
6094 if spec.default is not None:
6098 def _values_for_encoding(self):
6099 for name, spec in iteritems(self.specs):
6100 value = self._value.get(name)
6104 raise ObjNotReady(name)
6108 v = b"".join(v.encode() for v in self._values_for_encoding())
6109 return b"".join((self.tag, len_encode(len(v)), v))
6111 def _encode_cer(self, writer):
6112 write_full(writer, self.tag + LENINDEF)
6113 for v in self._values_for_encoding():
6114 v.encode_cer(writer)
6115 write_full(writer, EOC)
6117 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6119 t, tlen, lv = tag_strip(tlv)
6120 except DecodeError as err:
6121 raise err.__class__(
6123 klass=self.__class__,
6124 decode_path=decode_path,
6129 klass=self.__class__,
6130 decode_path=decode_path,
6133 if tag_only: # pragma: no cover
6137 ctx_bered = ctx.get("bered", False)
6139 l, llen, v = len_decode(lv)
6140 except LenIndefForm as err:
6142 raise err.__class__(
6144 klass=self.__class__,
6145 decode_path=decode_path,
6148 l, llen, v = 0, 1, lv[1:]
6150 except DecodeError as err:
6151 raise err.__class__(
6153 klass=self.__class__,
6154 decode_path=decode_path,
6158 raise NotEnoughData(
6159 "encoded length is longer than data",
6160 klass=self.__class__,
6161 decode_path=decode_path,
6165 v, tail = v[:l], v[l:]
6167 sub_offset = offset + tlen + llen
6170 ctx_allow_default_values = ctx.get("allow_default_values", False)
6171 for name, spec in iteritems(self.specs):
6172 if spec.optional and (
6173 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6177 sub_decode_path = decode_path + (name,)
6180 for _decode_path, value, v_tail in spec.decode_evgen(
6184 decode_path=sub_decode_path,
6186 _ctx_immutable=False,
6188 yield _decode_path, value, v_tail
6190 _, value, v_tail = next(spec.decode_evgen(
6194 decode_path=sub_decode_path,
6196 _ctx_immutable=False,
6199 except TagMismatch as err:
6200 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6204 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6205 if not evgen_mode and defined is not None:
6206 defined_by, defined_spec = defined
6207 if issubclass(value.__class__, SequenceOf):
6208 for i, _value in enumerate(value):
6209 sub_sub_decode_path = sub_decode_path + (
6211 DecodePathDefBy(defined_by),
6213 defined_value, defined_tail = defined_spec.decode(
6214 memoryview(bytes(_value)),
6216 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6217 if value.expled else (value.tlen + value.llen)
6220 decode_path=sub_sub_decode_path,
6222 _ctx_immutable=False,
6224 if len(defined_tail) > 0:
6227 klass=self.__class__,
6228 decode_path=sub_sub_decode_path,
6231 _value.defined = (defined_by, defined_value)
6233 defined_value, defined_tail = defined_spec.decode(
6234 memoryview(bytes(value)),
6236 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6237 if value.expled else (value.tlen + value.llen)
6240 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6242 _ctx_immutable=False,
6244 if len(defined_tail) > 0:
6247 klass=self.__class__,
6248 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6251 value.defined = (defined_by, defined_value)
6253 value_len = value.fulllen
6255 sub_offset += value_len
6258 if spec.default is not None and value == spec.default:
6259 # This will not work in evgen_mode
6260 if ctx_bered or ctx_allow_default_values:
6264 "DEFAULT value met",
6265 klass=self.__class__,
6266 decode_path=sub_decode_path,
6269 values[name] = value
6270 spec_defines = getattr(spec, "defines", ())
6271 if len(spec_defines) == 0:
6272 defines_by_path = ctx.get("defines_by_path", ())
6273 if len(defines_by_path) > 0:
6274 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6275 if spec_defines is not None and len(spec_defines) > 0:
6276 for rel_path, schema in spec_defines:
6277 defined = schema.get(value, None)
6278 if defined is not None:
6279 ctx.setdefault("_defines", []).append((
6280 abs_decode_path(sub_decode_path[:-1], rel_path),
6284 if v[:EOC_LEN].tobytes() != EOC:
6287 klass=self.__class__,
6288 decode_path=decode_path,
6296 klass=self.__class__,
6297 decode_path=decode_path,
6300 obj = self.__class__(
6304 default=self.default,
6305 optional=self.optional,
6306 _decoded=(offset, llen, vlen),
6309 obj.lenindef = lenindef
6310 obj.ber_encoded = ber_encoded
6311 yield decode_path, obj, tail
6314 value = pp_console_row(next(self.pps()))
6316 for name in self.specs:
6317 _value = self._value.get(name)
6320 cols.append("%s: %s" % (name, repr(_value)))
6321 return "%s[%s]" % (value, "; ".join(cols))
6323 def pps(self, decode_path=()):
6326 asn1_type_name=self.asn1_type_name,
6327 obj_name=self.__class__.__name__,
6328 decode_path=decode_path,
6329 optional=self.optional,
6330 default=self == self.default,
6331 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6332 expl=None if self._expl is None else tag_decode(self._expl),
6337 expl_offset=self.expl_offset if self.expled else None,
6338 expl_tlen=self.expl_tlen if self.expled else None,
6339 expl_llen=self.expl_llen if self.expled else None,
6340 expl_vlen=self.expl_vlen if self.expled else None,
6341 expl_lenindef=self.expl_lenindef,
6342 lenindef=self.lenindef,
6343 ber_encoded=self.ber_encoded,
6346 for name in self.specs:
6347 value = self._value.get(name)
6350 yield value.pps(decode_path=decode_path + (name,))
6351 for pp in self.pps_lenindef(decode_path):
6355 class Set(Sequence):
6356 """``SET`` structure type
6358 Its usage is identical to :py:class:`pyderasn.Sequence`.
6360 .. _allow_unordered_set_ctx:
6362 DER prohibits unordered values encoding and will raise an error
6363 during decode. If :ref:`bered <bered_ctx>` context option is set,
6364 then no error will occur. Also you can disable strict values
6365 ordering check by setting ``"allow_unordered_set": True``
6366 :ref:`context <ctx>` option.
6369 tag_default = tag_encode(form=TagFormConstructed, num=17)
6370 asn1_type_name = "SET"
6373 v = b"".join(value.encode() for value in sorted(
6374 self._values_for_encoding(),
6375 key=attrgetter("tag_order"),
6377 return b"".join((self.tag, len_encode(len(v)), v))
6379 def _encode_cer(self, writer):
6380 write_full(writer, self.tag + LENINDEF)
6382 self._values_for_encoding(),
6383 key=attrgetter("tag_order_cer"),
6385 v.encode_cer(writer)
6386 write_full(writer, EOC)
6388 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6390 t, tlen, lv = tag_strip(tlv)
6391 except DecodeError as err:
6392 raise err.__class__(
6394 klass=self.__class__,
6395 decode_path=decode_path,
6400 klass=self.__class__,
6401 decode_path=decode_path,
6408 ctx_bered = ctx.get("bered", False)
6410 l, llen, v = len_decode(lv)
6411 except LenIndefForm as err:
6413 raise err.__class__(
6415 klass=self.__class__,
6416 decode_path=decode_path,
6419 l, llen, v = 0, 1, lv[1:]
6421 except DecodeError as err:
6422 raise err.__class__(
6424 klass=self.__class__,
6425 decode_path=decode_path,
6429 raise NotEnoughData(
6430 "encoded length is longer than data",
6431 klass=self.__class__,
6435 v, tail = v[:l], v[l:]
6437 sub_offset = offset + tlen + llen
6440 ctx_allow_default_values = ctx.get("allow_default_values", False)
6441 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6442 tag_order_prev = (0, 0)
6443 _specs_items = copy(self.specs)
6446 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6448 for name, spec in iteritems(_specs_items):
6449 sub_decode_path = decode_path + (name,)
6455 decode_path=sub_decode_path,
6458 _ctx_immutable=False,
6465 klass=self.__class__,
6466 decode_path=decode_path,
6470 for _decode_path, value, v_tail in spec.decode_evgen(
6474 decode_path=sub_decode_path,
6476 _ctx_immutable=False,
6478 yield _decode_path, value, v_tail
6480 _, value, v_tail = next(spec.decode_evgen(
6484 decode_path=sub_decode_path,
6486 _ctx_immutable=False,
6489 value_tag_order = value.tag_order
6490 value_len = value.fulllen
6491 if tag_order_prev >= value_tag_order:
6492 if ctx_bered or ctx_allow_unordered_set:
6496 "unordered " + self.asn1_type_name,
6497 klass=self.__class__,
6498 decode_path=sub_decode_path,
6501 if spec.default is None or value != spec.default:
6503 elif ctx_bered or ctx_allow_default_values:
6507 "DEFAULT value met",
6508 klass=self.__class__,
6509 decode_path=sub_decode_path,
6512 values[name] = value
6513 del _specs_items[name]
6514 tag_order_prev = value_tag_order
6515 sub_offset += value_len
6519 obj = self.__class__(
6523 default=self.default,
6524 optional=self.optional,
6525 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6528 if v[:EOC_LEN].tobytes() != EOC:
6531 klass=self.__class__,
6532 decode_path=decode_path,
6537 for name, spec in iteritems(self.specs):
6538 if name not in values and not spec.optional:
6540 "%s value is not ready" % name,
6541 klass=self.__class__,
6542 decode_path=decode_path,
6547 obj.ber_encoded = ber_encoded
6548 yield decode_path, obj, tail
6551 SequenceOfState = namedtuple(
6553 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6558 class SequenceOf(Obj):
6559 """``SEQUENCE OF`` sequence type
6561 For that kind of type you must specify the object it will carry on
6562 (bounds are for example here, not required)::
6564 class Ints(SequenceOf):
6569 >>> ints.append(Integer(123))
6570 >>> ints.append(Integer(234))
6572 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6573 >>> [int(i) for i in ints]
6575 >>> ints.append(Integer(345))
6576 Traceback (most recent call last):
6577 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6580 >>> ints[1] = Integer(345)
6582 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6584 You can initialize sequence with preinitialized values:
6586 >>> ints = Ints([Integer(123), Integer(234)])
6588 Also you can use iterator as a value:
6590 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6592 And it won't be iterated until encoding process. Pay attention that
6593 bounds and required schema checks are done only during the encoding
6594 process in that case! After encode was called, then value is zeroed
6595 back to empty list and you have to set it again. That mode is useful
6596 mainly with CER encoding mode, where all objects from the iterable
6597 will be streamed to the buffer, without copying all of them to
6600 __slots__ = ("spec", "_bound_min", "_bound_max")
6601 tag_default = tag_encode(form=TagFormConstructed, num=16)
6602 asn1_type_name = "SEQUENCE OF"
6615 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6617 schema = getattr(self, "schema", None)
6619 raise ValueError("schema must be specified")
6621 self._bound_min, self._bound_max = getattr(
6625 ) if bounds is None else bounds
6627 if value is not None:
6628 self._value = self._value_sanitize(value)
6629 if default is not None:
6630 default_value = self._value_sanitize(default)
6631 default_obj = self.__class__(
6636 default_obj._value = default_value
6637 self.default = default_obj
6639 self._value = copy(default_obj._value)
6641 def _value_sanitize(self, value):
6643 if issubclass(value.__class__, SequenceOf):
6644 value = value._value
6645 elif hasattr(value, NEXT_ATTR_NAME):
6648 elif hasattr(value, "__iter__"):
6651 raise InvalidValueType((self.__class__, iter, "iterator"))
6653 if not self._bound_min <= len(value) <= self._bound_max:
6654 raise BoundsError(self._bound_min, len(value), self._bound_max)
6655 class_expected = self.spec.__class__
6657 if not isinstance(v, class_expected):
6658 raise InvalidValueType((class_expected,))
6663 if hasattr(self._value, NEXT_ATTR_NAME):
6665 if self._bound_min > 0 and len(self._value) == 0:
6667 return all(v.ready for v in self._value)
6671 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6673 return any(v.bered for v in self._value)
6675 def __getstate__(self):
6676 if hasattr(self._value, NEXT_ATTR_NAME):
6677 raise ValueError("can not pickle SequenceOf with iterator")
6678 return SequenceOfState(
6692 [copy(v) for v in self._value],
6697 def __setstate__(self, state):
6698 super(SequenceOf, self).__setstate__(state)
6699 self.spec = state.spec
6700 self._value = state.value
6701 self._bound_min = state.bound_min
6702 self._bound_max = state.bound_max
6704 def __eq__(self, their):
6705 if isinstance(their, self.__class__):
6707 self.spec == their.spec and
6708 self.tag == their.tag and
6709 self._expl == their._expl and
6710 self._value == their._value
6712 if hasattr(their, "__iter__"):
6713 return self._value == list(their)
6725 return self.__class__(
6729 (self._bound_min, self._bound_max)
6730 if bounds is None else bounds
6732 impl=self.tag if impl is None else impl,
6733 expl=self._expl if expl is None else expl,
6734 default=self.default if default is None else default,
6735 optional=self.optional if optional is None else optional,
6738 def __contains__(self, key):
6739 return key in self._value
6741 def append(self, value):
6742 if not isinstance(value, self.spec.__class__):
6743 raise InvalidValueType((self.spec.__class__,))
6744 if len(self._value) + 1 > self._bound_max:
6747 len(self._value) + 1,
6750 self._value.append(value)
6753 return iter(self._value)
6756 return len(self._value)
6758 def __setitem__(self, key, value):
6759 if not isinstance(value, self.spec.__class__):
6760 raise InvalidValueType((self.spec.__class__,))
6761 self._value[key] = self.spec(value=value)
6763 def __getitem__(self, key):
6764 return self._value[key]
6766 def _values_for_encoding(self):
6767 return iter(self._value)
6770 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6773 values_append = values.append
6774 class_expected = self.spec.__class__
6775 values_for_encoding = self._values_for_encoding()
6777 for v in values_for_encoding:
6778 if not isinstance(v, class_expected):
6779 raise InvalidValueType((class_expected,))
6780 values_append(v.encode())
6781 if not self._bound_min <= len(values) <= self._bound_max:
6782 raise BoundsError(self._bound_min, len(values), self._bound_max)
6783 value = b"".join(values)
6785 value = b"".join(v.encode() for v in self._values_for_encoding())
6786 return b"".join((self.tag, len_encode(len(value)), value))
6788 def _encode_cer(self, writer):
6789 write_full(writer, self.tag + LENINDEF)
6790 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6792 class_expected = self.spec.__class__
6794 values_for_encoding = self._values_for_encoding()
6796 for v in values_for_encoding:
6797 if not isinstance(v, class_expected):
6798 raise InvalidValueType((class_expected,))
6799 v.encode_cer(writer)
6801 if not self._bound_min <= values_count <= self._bound_max:
6802 raise BoundsError(self._bound_min, values_count, self._bound_max)
6804 for v in self._values_for_encoding():
6805 v.encode_cer(writer)
6806 write_full(writer, EOC)
6816 ordering_check=False,
6819 t, tlen, lv = tag_strip(tlv)
6820 except DecodeError as err:
6821 raise err.__class__(
6823 klass=self.__class__,
6824 decode_path=decode_path,
6829 klass=self.__class__,
6830 decode_path=decode_path,
6837 ctx_bered = ctx.get("bered", False)
6839 l, llen, v = len_decode(lv)
6840 except LenIndefForm as err:
6842 raise err.__class__(
6844 klass=self.__class__,
6845 decode_path=decode_path,
6848 l, llen, v = 0, 1, lv[1:]
6850 except DecodeError as err:
6851 raise err.__class__(
6853 klass=self.__class__,
6854 decode_path=decode_path,
6858 raise NotEnoughData(
6859 "encoded length is longer than data",
6860 klass=self.__class__,
6861 decode_path=decode_path,
6865 v, tail = v[:l], v[l:]
6867 sub_offset = offset + tlen + llen
6870 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6871 value_prev = memoryview(v[:0])
6875 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6877 sub_decode_path = decode_path + (str(_value_count),)
6879 for _decode_path, value, v_tail in spec.decode_evgen(
6883 decode_path=sub_decode_path,
6885 _ctx_immutable=False,
6887 yield _decode_path, value, v_tail
6889 _, value, v_tail = next(spec.decode_evgen(
6893 decode_path=sub_decode_path,
6895 _ctx_immutable=False,
6898 value_len = value.fulllen
6900 if value_prev.tobytes() > v[:value_len].tobytes():
6901 if ctx_bered or ctx_allow_unordered_set:
6905 "unordered " + self.asn1_type_name,
6906 klass=self.__class__,
6907 decode_path=sub_decode_path,
6910 value_prev = v[:value_len]
6913 _value.append(value)
6914 sub_offset += value_len
6917 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
6919 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
6920 klass=self.__class__,
6921 decode_path=decode_path,
6925 obj = self.__class__(
6926 value=None if evgen_mode else _value,
6928 bounds=(self._bound_min, self._bound_max),
6931 default=self.default,
6932 optional=self.optional,
6933 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6935 except BoundsError as err:
6938 klass=self.__class__,
6939 decode_path=decode_path,
6943 if v[:EOC_LEN].tobytes() != EOC:
6946 klass=self.__class__,
6947 decode_path=decode_path,
6952 obj.ber_encoded = ber_encoded
6953 yield decode_path, obj, tail
6957 pp_console_row(next(self.pps())),
6958 ", ".join(repr(v) for v in self._value),
6961 def pps(self, decode_path=()):
6964 asn1_type_name=self.asn1_type_name,
6965 obj_name=self.__class__.__name__,
6966 decode_path=decode_path,
6967 optional=self.optional,
6968 default=self == self.default,
6969 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6970 expl=None if self._expl is None else tag_decode(self._expl),
6975 expl_offset=self.expl_offset if self.expled else None,
6976 expl_tlen=self.expl_tlen if self.expled else None,
6977 expl_llen=self.expl_llen if self.expled else None,
6978 expl_vlen=self.expl_vlen if self.expled else None,
6979 expl_lenindef=self.expl_lenindef,
6980 lenindef=self.lenindef,
6981 ber_encoded=self.ber_encoded,
6984 for i, value in enumerate(self._value):
6985 yield value.pps(decode_path=decode_path + (str(i),))
6986 for pp in self.pps_lenindef(decode_path):
6990 class SetOf(SequenceOf):
6991 """``SET OF`` sequence type
6993 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
6996 tag_default = tag_encode(form=TagFormConstructed, num=17)
6997 asn1_type_name = "SET OF"
6999 def _value_sanitize(self, value):
7000 value = super(SetOf, self)._value_sanitize(value)
7001 if hasattr(value, NEXT_ATTR_NAME):
7003 "SetOf does not support iterator values, as no sense in them"
7008 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7009 return b"".join((self.tag, len_encode(len(v)), v))
7011 def _encode_cer(self, writer):
7012 write_full(writer, self.tag + LENINDEF)
7013 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7014 write_full(writer, v)
7015 write_full(writer, EOC)
7017 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7018 return super(SetOf, self)._decode(
7025 ordering_check=True,
7029 def obj_by_path(pypath): # pragma: no cover
7030 """Import object specified as string Python path
7032 Modules must be separated from classes/functions with ``:``.
7034 >>> obj_by_path("foo.bar:Baz")
7035 <class 'foo.bar.Baz'>
7036 >>> obj_by_path("foo.bar:Baz.boo")
7037 <classmethod 'foo.bar.Baz.boo'>
7039 mod, objs = pypath.rsplit(":", 1)
7040 from importlib import import_module
7041 obj = import_module(mod)
7042 for obj_name in objs.split("."):
7043 obj = getattr(obj, obj_name)
7047 def generic_decoder(): # pragma: no cover
7048 # All of this below is a big hack with self references
7049 choice = PrimitiveTypes()
7050 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7051 choice.specs["SetOf"] = SetOf(schema=choice)
7052 for i in six_xrange(31):
7053 choice.specs["SequenceOf%d" % i] = SequenceOf(
7057 choice.specs["Any"] = Any()
7059 # Class name equals to type name, to omit it from output
7060 class SEQUENCEOF(SequenceOf):
7068 with_decode_path=False,
7069 decode_path_only=(),
7071 def _pprint_pps(pps):
7073 if hasattr(pp, "_fields"):
7075 decode_path_only != () and
7076 pp.decode_path[:len(decode_path_only)] != decode_path_only
7079 if pp.asn1_type_name == Choice.asn1_type_name:
7081 pp_kwargs = pp._asdict()
7082 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7083 pp = _pp(**pp_kwargs)
7084 yield pp_console_row(
7089 with_colours=with_colours,
7090 with_decode_path=with_decode_path,
7091 decode_path_len_decrease=len(decode_path_only),
7093 for row in pp_console_blob(
7095 decode_path_len_decrease=len(decode_path_only),
7099 for row in _pprint_pps(pp):
7101 return "\n".join(_pprint_pps(obj.pps()))
7102 return SEQUENCEOF(), pprint_any
7105 def main(): # pragma: no cover
7107 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7108 parser.add_argument(
7112 help="Skip that number of bytes from the beginning",
7114 parser.add_argument(
7116 help="Python paths to dictionary with OIDs, comma separated",
7118 parser.add_argument(
7120 help="Python path to schema definition to use",
7122 parser.add_argument(
7123 "--defines-by-path",
7124 help="Python path to decoder's defines_by_path",
7126 parser.add_argument(
7128 action="store_true",
7129 help="Disallow BER encoding",
7131 parser.add_argument(
7132 "--print-decode-path",
7133 action="store_true",
7134 help="Print decode paths",
7136 parser.add_argument(
7137 "--decode-path-only",
7138 help="Print only specified decode path",
7140 parser.add_argument(
7142 action="store_true",
7143 help="Allow explicit tag out-of-bound",
7145 parser.add_argument(
7147 action="store_true",
7148 help="Turn on event generation mode",
7150 parser.add_argument(
7152 type=argparse.FileType("rb"),
7153 help="Path to BER/CER/DER file you want to decode",
7155 args = parser.parse_args()
7157 args.RAWFile.seek(args.skip)
7158 raw = memoryview(args.RAWFile.read())
7159 args.RAWFile.close()
7161 raw = file_mmaped(args.RAWFile)[args.skip:]
7163 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7164 if args.oids else ()
7167 schema = obj_by_path(args.schema)
7168 from functools import partial
7169 pprinter = partial(pprint, big_blobs=True)
7171 schema, pprinter = generic_decoder()
7173 "bered": not args.nobered,
7174 "allow_expl_oob": args.allow_expl_oob,
7176 if args.defines_by_path is not None:
7177 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7178 from os import environ
7182 with_colours=environ.get("NO_COLOR") is None,
7183 with_decode_path=args.print_decode_path,
7185 () if args.decode_path_only is None else
7186 tuple(args.decode_path_only.split(":"))
7190 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7191 print(pprinter(obj, decode_path=decode_path))
7193 obj, tail = schema().decode(raw, ctx=ctx)
7194 print(pprinter(obj))
7196 print("\nTrailing data: %s" % hexenc(tail))
7199 if __name__ == "__main__":