3 # cython: language_level=3
4 # PyDERASN -- Python ASN.1 DER/BER codec with abstract structures
5 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Lesser General Public License as
9 # published by the Free Software Foundation, version 3 of the License.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Lesser General Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Python ASN.1 DER/BER codec with abstract structures
20 This library allows you to marshal various structures in ASN.1 DER
21 format, unmarshal them in BER/CER/DER ones.
25 >>> Integer().decod(raw) == i
28 There are primitive types, holding single values
29 (:py:class:`pyderasn.BitString`,
30 :py:class:`pyderasn.Boolean`,
31 :py:class:`pyderasn.Enumerated`,
32 :py:class:`pyderasn.GeneralizedTime`,
33 :py:class:`pyderasn.Integer`,
34 :py:class:`pyderasn.Null`,
35 :py:class:`pyderasn.ObjectIdentifier`,
36 :py:class:`pyderasn.OctetString`,
37 :py:class:`pyderasn.UTCTime`,
38 :py:class:`various strings <pyderasn.CommonString>`
39 (:py:class:`pyderasn.BMPString`,
40 :py:class:`pyderasn.GeneralString`,
41 :py:class:`pyderasn.GraphicString`,
42 :py:class:`pyderasn.IA5String`,
43 :py:class:`pyderasn.ISO646String`,
44 :py:class:`pyderasn.NumericString`,
45 :py:class:`pyderasn.PrintableString`,
46 :py:class:`pyderasn.T61String`,
47 :py:class:`pyderasn.TeletexString`,
48 :py:class:`pyderasn.UniversalString`,
49 :py:class:`pyderasn.UTF8String`,
50 :py:class:`pyderasn.VideotexString`,
51 :py:class:`pyderasn.VisibleString`)),
52 constructed types, holding multiple primitive types
53 (:py:class:`pyderasn.Sequence`,
54 :py:class:`pyderasn.SequenceOf`,
55 :py:class:`pyderasn.Set`,
56 :py:class:`pyderasn.SetOf`),
57 and special types like
58 :py:class:`pyderasn.Any` and
59 :py:class:`pyderasn.Choice`.
67 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
68 the default tag used during coding process. You can override it with
69 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
70 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
71 argument or ``expl`` class attribute). Both arguments take raw binary
72 string, containing that tag. You can **not** set implicit and explicit
75 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
76 functions, allowing you to easily create ``CONTEXT``
77 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
82 EXPLICIT tags always have **constructed** tag. PyDERASN does not
83 explicitly check correctness of schema input here.
87 Implicit tags have **primitive** (``tag_ctxp``) encoding for
92 >>> Integer(impl=tag_ctxp(1))
94 >>> Integer(expl=tag_ctxc(2))
97 Implicit tag is not explicitly shown.
99 Two objects of the same type, but with different implicit/explicit tags
102 You can get object's effective tag (either default or implicited) through
103 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
106 >>> tag_decode(tag_ctxc(123))
108 >>> klass, form, num = tag_decode(tag_ctxc(123))
109 >>> klass == TagClassContext
111 >>> form == TagFormConstructed
114 To determine if object has explicit tag, use ``expled`` boolean property
115 and ``expl_tag`` property, returning explicit tag's value.
120 Many objects in sequences could be ``OPTIONAL`` and could have
121 ``DEFAULT`` value. You can specify that object's property using
122 corresponding keyword arguments.
124 >>> Integer(optional=True, default=123)
125 INTEGER 123 OPTIONAL DEFAULT
127 Those specifications do not play any role in primitive value encoding,
128 but are taken into account when dealing with sequences holding them. For
129 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
132 class Version(Integer):
138 class TBSCertificate(Sequence):
140 ("version", Version(expl=tag_ctxc(0), default="v1")),
143 When default argument is used and value is not specified, then it equals
151 Some objects give ability to set value size constraints. This is either
152 possible integer value, or allowed length of various strings and
153 sequences. Constraints are set in the following way::
158 And values satisfaction is checked as: ``MIN <= X <= MAX``.
160 For simplicity you can also set bounds the following way::
162 bounded_x = X(bounds=(MIN, MAX))
164 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
170 All objects have ``ready`` boolean property, that tells if object is
171 ready to be encoded. If that kind of action is performed on unready
172 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
174 All objects are friendly to ``copy.copy()`` and copied objects can be
177 Also all objects can be safely ``pickle``-d, but pay attention that
178 pickling among different PyDERASN versions is prohibited.
185 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
186 ``offset`` optional argument could be used to set initial object's
187 offset in the binary data, for convenience. It returns decoded object
188 and remaining unmarshalled data (tail). Internally all work is done on
189 ``memoryview(data)``, and you can leave returning tail as a memoryview,
190 by specifying ``leavemm=True`` argument.
192 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
193 immediately checks and raises if there is non-empty tail.
195 When object is decoded, ``decoded`` property is true and you can safely
196 use following properties:
198 * ``offset`` -- position including initial offset where object's tag starts
199 * ``tlen`` -- length of object's tag
200 * ``llen`` -- length of object's length value
201 * ``vlen`` -- length of object's value
202 * ``tlvlen`` -- length of the whole object
204 Pay attention that those values do **not** include anything related to
205 explicit tag. If you want to know information about it, then use:
207 * ``expled`` -- to know if explicit tag is set
208 * ``expl_offset`` (it is less than ``offset``)
211 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
212 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
214 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
217 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
224 You can specify so called context keyword argument during
225 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
226 various options governing decoding process.
228 Currently available context options:
230 * :ref:`allow_default_values <allow_default_values_ctx>`
231 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
232 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
233 * :ref:`bered <bered_ctx>`
234 * :ref:`defines_by_path <defines_by_path_ctx>`
235 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
242 All objects have ``pps()`` method, that is a generator of
243 :py:class:`pyderasn.PP` namedtuple, holding various raw information
244 about the object. If ``pps`` is called on sequences, then all underlying
245 ``PP`` will be yielded.
247 You can use :py:func:`pyderasn.pp_console_row` function, converting
248 those ``PP`` to human readable string. It is exactly what is used for
249 all objects' ``repr``. But it is easy to write custom formatters.
251 >>> from pyderasn import pprint
252 >>> encoded = Integer(-12345).encode()
253 >>> obj, tail = Integer().decode(encoded)
254 >>> print(pprint(obj))
255 0 [1,1, 2] INTEGER -12345
259 Example certificate::
261 >>> print(pprint(crt))
262 0 [1,3,1604] Certificate SEQUENCE
263 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
264 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
265 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
266 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
267 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
268 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
270 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
271 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
272 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
273 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
274 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
275 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
276 . . . . . . . 13:02:45:53
278 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
279 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
280 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
282 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
283 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
284 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
289 Let's parse that output, human::
291 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
292 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
293 0 1 2 3 4 5 6 7 8 9 10 11
297 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
303 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
309 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
314 Offset of the object, where its DER/BER encoding begins.
315 Pay attention that it does **not** include explicit tag.
317 If explicit tag exists, then this is its length (tag + encoded length).
319 Length of object's tag. For example CHOICE does not have its own tag,
322 Length of encoded length.
324 Length of encoded value.
326 Visual indentation to show the depth of object in the hierarchy.
328 Object's name inside SEQUENCE/CHOICE.
330 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
331 here. "IMPLICIT" is omitted.
333 Object's class name, if set. Omitted if it is just an ordinary simple
334 value (like with ``algorithm`` in example above).
338 Object's value, if set. Can consist of multiple words (like OCTET/BIT
339 STRINGs above). We see ``v3`` value in Version, because it is named.
340 ``rdnSequence`` is the choice of CHOICE type.
342 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
343 default one, specified in the schema.
345 Shows whether the object contains any kind of BER encoded data (possibly
346 Sequence holding BER-encoded underlying value).
348 Only applicable to BER encoded data. Indefinite length encoding mark.
350 Only applicable to BER encoded data. If object has BER-specific
351 encoding, then ``BER`` will be shown. It does not depend on indefinite
352 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
353 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
361 ASN.1 structures often have ANY and OCTET STRING fields, that are
362 DEFINED BY some previously met ObjectIdentifier. This library provides
363 ability to specify mapping between some OID and field that must be
364 decoded with specific specification.
371 :py:class:`pyderasn.ObjectIdentifier` field inside
372 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
373 necessary for decoding structures. For example, CMS (:rfc:`5652`)
376 class ContentInfo(Sequence):
378 ("contentType", ContentType(defines=((("content",), {
379 id_digestedData: DigestedData(),
380 id_signedData: SignedData(),
382 ("content", Any(expl=tag_ctxc(0))),
385 ``contentType`` field tells that it defines that ``content`` must be
386 decoded with ``SignedData`` specification, if ``contentType`` equals to
387 ``id-signedData``. The same applies to ``DigestedData``. If
388 ``contentType`` contains unknown OID, then no automatic decoding is
391 You can specify multiple fields, that will be autodecoded -- that is why
392 ``defines`` kwarg is a sequence. You can specify defined field
393 relatively or absolutely to current decode path. For example ``defines``
394 for AlgorithmIdentifier of X.509's
395 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
399 id_ecPublicKey: ECParameters(),
400 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
402 (("..", "subjectPublicKey"), {
403 id_rsaEncryption: RSAPublicKey(),
404 id_GostR3410_2001: OctetString(),
408 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
409 autodecode its parameters inside SPKI's algorithm and its public key
412 Following types can be automatically decoded (DEFINED BY):
414 * :py:class:`pyderasn.Any`
415 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
416 * :py:class:`pyderasn.OctetString`
417 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
418 ``Any``/``BitString``/``OctetString``-s
420 When any of those fields is automatically decoded, then ``.defined``
421 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
422 was defined, ``value`` contains corresponding decoded value. For example
423 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
425 .. _defines_by_path_ctx:
427 defines_by_path context option
428 ______________________________
430 Sometimes you either can not or do not want to explicitly set *defines*
431 in the schema. You can dynamically apply those definitions when calling
432 :py:meth:`pyderasn.Obj.decode` method.
434 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
435 value must be sequence of following tuples::
437 (decode_path, defines)
439 where ``decode_path`` is a tuple holding so-called decode path to the
440 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
441 ``defines``, holding exactly the same value as accepted in its
442 :ref:`keyword argument <defines>`.
444 For example, again for CMS, you want to automatically decode
445 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
446 structures it may hold. Also, automatically decode ``controlSequence``
449 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
452 ((("content",), {id_signedData: SignedData()}),),
457 DecodePathDefBy(id_signedData),
462 id_cct_PKIData: PKIData(),
463 id_cct_PKIResponse: PKIResponse(),
469 DecodePathDefBy(id_signedData),
472 DecodePathDefBy(id_cct_PKIResponse),
478 id_cmc_recipientNonce: RecipientNonce(),
479 id_cmc_senderNonce: SenderNonce(),
480 id_cmc_statusInfoV2: CMCStatusInfoV2(),
481 id_cmc_transactionId: TransactionId(),
486 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
487 The former is useful for path construction when some automatic
488 decoding is already done. ``any`` means literally any value it meets --
489 useful for SEQUENCE/SET OF-s.
496 By default PyDERASN accepts only DER encoded data. By default it encodes
497 to DER. But you can optionally enable BER decoding with setting
498 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
499 constructed primitive types should be parsed successfully.
501 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
502 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
503 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
504 ``UTCTime``, ``GeneralizedTime`` can contain it.
505 * If object has an indefinite length encoding, then its ``lenindef``
506 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
507 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
509 * If object has an indefinite length encoded explicit tag, then
510 ``expl_lenindef`` is set to True.
511 * If object has either any of BER-related encoding (explicit tag
512 indefinite length, object's indefinite length, BER-encoding) or any
513 underlying component has that kind of encoding, then ``bered``
514 attribute is set to True. For example SignedData CMS can have
515 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
516 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
518 EOC (end-of-contents) token's length is taken in advance in object's
521 .. _allow_expl_oob_ctx:
523 Allow explicit tag out-of-bound
524 -------------------------------
526 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
527 one value, more than one object. If you set ``allow_expl_oob`` context
528 option to True, then no error will be raised and that invalid encoding
529 will be silently further processed. But pay attention that offsets and
530 lengths will be invalid in that case.
534 This option should be used only for skipping some decode errors, just
535 to see the decoded structure somehow.
539 Streaming and dealing with huge structures
540 ------------------------------------------
547 ASN.1 structures can be huge, they can hold millions of objects inside
548 (for example Certificate Revocation Lists (CRL), holding revocation
549 state for every previously issued X.509 certificate). CACert.org's 8 MiB
550 CRL file takes more than half a gigabyte of memory to hold the decoded
553 If you just simply want to check the signature over the ``tbsCertList``,
554 you can create specialized schema with that field represented as
555 OctetString for example::
557 class TBSCertListFast(Sequence):
560 ("revokedCertificates", OctetString(
561 impl=SequenceOf.tag_default,
567 This allows you to quickly decode a few fields and check the signature
568 over the ``tbsCertList`` bytes.
570 But how can you get all certificate's serial number from it, after you
571 trust that CRL after signature validation? You can use so called
572 ``evgen`` (event generation) mode, to catch the events/facts of some
573 successful object decoding. Let's use command line capabilities::
575 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
576 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
577 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
578 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
579 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
580 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
581 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
582 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
583 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
585 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
586 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
587 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
590 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
591 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
592 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
596 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
597 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
598 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
599 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
600 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
601 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
602 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
603 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
604 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
605 0 [1,4,9145534] CertificateList SEQUENCE
607 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
608 decodes underlying values. It can not tell if SEQUENCE is decoded, so
609 the event of the upper level SEQUENCE is the last one we see.
610 ``version`` field is just a single INTEGER -- it is decoded and event is
611 fired immediately. Then we see that ``algorithm`` and ``parameters``
612 fields are decoded and only after them the ``signature`` SEQUENCE is
613 fired as successfully decoded. There are 4 events for each revoked
614 certificate entry in that CRL: ``userCertificate`` serial number,
615 ``utcTime`` of ``revocationDate`` CHOICE, ``revocationDate`` itself, and
616 ``RevokedCertificate`` as an entity of ``revokedCertificates`` SEQUENCE OF.
618 We can do that in our ordinary Python code and understand where we are
619 by looking at deterministically generated decode paths (do not forget
620 about useful ``--print-decode-path`` CLI option). We must use
621 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
622 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
623 obj, tail)`` tuples::
625 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
627 len(decode_path) == 4 and
628 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
629 decode_path[3] == "userCertificate"
631 print("serial number:", int(obj))
633 Virtually it does not take any memory, except the minimum needed for a
634 single object's storage. You can easily use that mode to determine required
635 object ``.offset`` and ``.*len`` to be able to decode it separately, or
636 maybe verify signature upon it just by taking bytes by ``.offset`` and
639 .. _evgen_mode_upto_ctx:
644 There is full ability to get any kind of data from the CRL in the
645 example above. However it is not too convenient to get the whole
646 ``RevokedCertificate`` structure, that is pretty lightweight and one may
647 not want to disassemble it. You can use ``evgen_mode_upto``
648 :ref:`ctx <ctx>` option that semantically equals to
649 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
650 mapped to any non-None value. If specified decode path is met, then any
651 subsequent objects won't be decoded in evgen mode. That allows us to
652 parse the CRL above with fully assembled ``RevokedCertificate``::
654 for decode_path, obj, _ in CertificateList().decode_evgen(
656 ctx={"evgen_mode_upto": (
657 (("tbsCertList", "revokedCertificates", any), True),
661 len(decode_path) == 3 and
662 decode_path[:2] == ("tbsCertList", "revokedCertificates")
664 print("serial number:", int(obj["userCertificate"]))
671 POSIX compliant systems have ``mmap`` syscall, giving ability to work
672 with the memory mapped file. You can deal with the file like it was an
673 ordinary binary string, allowing you not to load it to the memory first.
674 Also you can use them as an input for OCTET STRING, taking no Python
675 memory for their storage.
677 There is convenient :py:func:`pyderasn.file_mmaped` function that
678 creates read-only memoryview on the file contents::
680 with open("huge", "rb") as fd:
681 raw = file_mmaped(fd)
682 obj = Something.decode(raw)
686 mmap-ed files in Python2.7 do not implement buffer protocol, so
687 memoryview won't work on them.
691 mmap maps the **whole** file. So it plays no role if you seek-ed it
692 before. Take the slice of the resulting memoryview with required
697 If you use ZFS as underlying storage, then pay attention that
698 currently most platforms do not deal well with ZFS ARC and ordinary
699 page cache used for mmaps. It can take twice the necessary size in
700 the memory: both in page cache and ZFS ARC.
705 We can parse any kind of data now, but how can we produce files
706 streamingly, without storing their encoded representation in memory?
707 SEQUENCE by default encodes in memory all its values, joins them in huge
708 binary string, just to know the exact size of SEQUENCE's value for
709 encoding it in TLV. DER requires you to know all exact sizes of the
712 You can use CER encoding mode, that slightly differs from the DER, but
713 does not require exact sizes knowledge, allowing streaming encoding
714 directly to some writer/buffer. Just use
715 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
716 encoded data will flow::
718 opener = io.open if PY2 else open
719 with opener("result", "wb") as fd:
720 obj.encode_cer(fd.write)
725 obj.encode_cer(buf.write)
727 If you do not want to create in-memory buffer every time, then you can
728 use :py:func:`pyderasn.encode_cer` function::
730 data = encode_cer(obj)
732 Remember that CER is **not valid** DER in most cases, so you **have to**
733 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
734 decoding. Also currently there is **no** validation that provided CER is
735 valid one -- you can be sure only that it has valid BER encoding.
739 SET OF values can not be streamingly encoded, because they are
740 required to be sorted byte-by-byte. Big SET OF values still will take
741 much memory. Use neither SET nor SET OF values, as modern ASN.1
744 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
745 OCTET STRINGs! They will be streamingly copied from underlying file to
746 the buffer using 1 KB chunks.
748 Some structures require that some of the elements have to be forcefully
749 DER encoded. For example ``SignedData`` CMS requires you to encode
750 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
751 encode everything else in BER. You can tell any of the structures to be
752 forcefully encoded in DER during CER encoding, by specifying
753 ``der_forced=True`` attribute::
755 class Certificate(Sequence):
759 class SignedAttributes(SetOf):
767 In most cases, huge quantity of binary data is stored as OCTET STRING.
768 CER encoding splits it into 1 KB chunks. BER allows splitting on various
769 levels of chunks inclusion::
771 SOME STRING[CONSTRUCTED]
772 OCTET STRING[CONSTRUCTED]
773 OCTET STRING[PRIMITIVE]
775 OCTET STRING[PRIMITIVE]
777 OCTET STRING[PRIMITIVE]
779 OCTET STRING[PRIMITIVE]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[CONSTRUCTED]
787 OCTET STRING[CONSTRUCTED]
788 OCTET STRING[PRIMITIVE]
791 You can not just take the offset and some ``.vlen`` of the STRING and
792 treat it as the payload. If you decode it without
793 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
794 and ``bytes()`` will give the whole payload contents.
796 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
797 small memory footprint. There is convenient
798 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
799 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
800 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
801 SHA512 digest of its ``eContent``::
803 fd = open("data.p7m", "rb")
804 raw = file_mmaped(fd)
805 ctx = {"bered": True}
806 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
807 if decode_path == ("content",):
811 raise ValueError("no content found")
812 hasher_state = sha512()
814 hasher_state.update(data)
816 evgens = SignedData().decode_evgen(
817 raw[content.offset:],
818 offset=content.offset,
821 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
823 digest = hasher_state.digest()
825 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
826 copy the payload (without BER/CER encoding interleaved overhead) in it.
827 Virtually it won't take memory more than for keeping small structures
828 and 1 KB binary chunks.
830 SEQUENCE OF iterators
831 _____________________
833 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
834 classes. The only difference with providing the full list of objects, is
835 that type and bounds checking is done during encoding process. Also
836 sequence's value will be emptied after encoding, forcing you to set its
839 This is very useful when you have to create some huge objects, like
840 CRLs, with thousands and millions of entities inside. You can write the
841 generator taking necessary data from the database and giving the
842 ``RevokedCertificate`` objects. Only binary representation of those
843 objects will take memory during DER encoding.
847 .. autoclass:: pyderasn.Obj
855 .. autoclass:: pyderasn.Boolean
860 .. autoclass:: pyderasn.Integer
861 :members: __init__, named
865 .. autoclass:: pyderasn.BitString
866 :members: __init__, bit_len, named
870 .. autoclass:: pyderasn.OctetString
875 .. autoclass:: pyderasn.Null
880 .. autoclass:: pyderasn.ObjectIdentifier
885 .. autoclass:: pyderasn.Enumerated
889 .. autoclass:: pyderasn.CommonString
893 .. autoclass:: pyderasn.NumericString
897 .. autoclass:: pyderasn.PrintableString
898 :members: __init__, allow_asterisk, allow_ampersand
902 .. autoclass:: pyderasn.UTCTime
903 :members: __init__, todatetime
907 .. autoclass:: pyderasn.GeneralizedTime
908 :members: __init__, todatetime
915 .. autoclass:: pyderasn.Choice
916 :members: __init__, choice, value
920 .. autoclass:: PrimitiveTypes
924 .. autoclass:: pyderasn.Any
932 .. autoclass:: pyderasn.Sequence
937 .. autoclass:: pyderasn.Set
942 .. autoclass:: pyderasn.SequenceOf
947 .. autoclass:: pyderasn.SetOf
953 .. autofunction:: pyderasn.abs_decode_path
954 .. autofunction:: pyderasn.agg_octet_string
955 .. autofunction:: pyderasn.colonize_hex
956 .. autofunction:: pyderasn.encode_cer
957 .. autofunction:: pyderasn.file_mmaped
958 .. autofunction:: pyderasn.hexenc
959 .. autofunction:: pyderasn.hexdec
960 .. autofunction:: pyderasn.tag_encode
961 .. autofunction:: pyderasn.tag_decode
962 .. autofunction:: pyderasn.tag_ctxp
963 .. autofunction:: pyderasn.tag_ctxc
964 .. autoclass:: pyderasn.DecodeError
966 .. autoclass:: pyderasn.NotEnoughData
967 .. autoclass:: pyderasn.ExceedingData
968 .. autoclass:: pyderasn.LenIndefForm
969 .. autoclass:: pyderasn.TagMismatch
970 .. autoclass:: pyderasn.InvalidLength
971 .. autoclass:: pyderasn.InvalidOID
972 .. autoclass:: pyderasn.ObjUnknown
973 .. autoclass:: pyderasn.ObjNotReady
974 .. autoclass:: pyderasn.InvalidValueType
975 .. autoclass:: pyderasn.BoundsError
982 You can decode DER/BER files using command line abilities::
984 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
986 If there is no schema for your file, then you can try parsing it without,
987 but of course IMPLICIT tags will often make it impossible. But result is
988 good enough for the certificate above::
990 $ python -m pyderasn path/to/file
991 0 [1,3,1604] . >: SEQUENCE OF
992 4 [1,3,1453] . . >: SEQUENCE OF
993 8 [0,0, 5] . . . . >: [0] ANY
994 . . . . . A0:03:02:01:02
995 13 [1,1, 3] . . . . >: INTEGER 61595
996 18 [1,1, 13] . . . . >: SEQUENCE OF
997 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
998 31 [1,1, 0] . . . . . . >: NULL
999 33 [1,3, 274] . . . . >: SEQUENCE OF
1000 37 [1,1, 11] . . . . . . >: SET OF
1001 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1002 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1003 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1005 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1006 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1007 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1008 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1009 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1010 . . . . . . . . . 61:2E:63:6F:6D:2F
1011 1461 [1,1, 13] . . >: SEQUENCE OF
1012 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1013 1474 [1,1, 0] . . . . >: NULL
1014 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1015 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1016 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1022 If you have got dictionaries with ObjectIdentifiers, like example one
1023 from ``tests/test_crts.py``::
1026 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1027 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1029 "2.5.4.10": "id-at-organizationName",
1030 "2.5.4.11": "id-at-organizationalUnitName",
1033 then you can pass it to pretty printer to see human readable OIDs::
1035 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1037 37 [1,1, 11] . . . . . . >: SET OF
1038 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1039 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1040 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1041 50 [1,1, 18] . . . . . . >: SET OF
1042 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1043 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1044 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1045 70 [1,1, 18] . . . . . . >: SET OF
1046 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1047 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1048 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
Each decoded element has a so-called decode path: the sequence of structure
names it passes through during the decode process. Each element has its own
unique path inside the whole ASN.1 tree. You can print it out with the
``--print-decode-path`` option::
1059 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1060 0 [1,3,1604] Certificate SEQUENCE []
1061 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1062 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1063 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1064 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1065 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1066 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1068 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1069 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1070 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1071 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1072 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1073 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1074 . . . . . . . 13:02:45:53
1075 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1078 Now you can print only the specified tree, for example signature algorithm::
1080 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1081 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1082 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1083 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1087 from codecs import getdecoder
1088 from codecs import getencoder
1089 from collections import namedtuple
1090 from collections import OrderedDict
1091 from copy import copy
1092 from datetime import datetime
1093 from datetime import timedelta
1094 from io import BytesIO
1095 from math import ceil
1096 from mmap import mmap
1097 from mmap import PROT_READ
1098 from operator import attrgetter
1099 from string import ascii_letters
1100 from string import digits
1101 from sys import version_info
1102 from unicodedata import category as unicat
1104 from six import add_metaclass
1105 from six import binary_type
1106 from six import byte2int
1107 from six import indexbytes
1108 from six import int2byte
1109 from six import integer_types
1110 from six import iterbytes
1111 from six import iteritems
1112 from six import itervalues
1114 from six import string_types
1115 from six import text_type
1116 from six import unichr as six_unichr
1117 from six.moves import xrange as six_xrange
1121 from termcolor import colored
1122 except ImportError: # pragma: no cover
1123 def colored(what, *args, **kwargs):
1172 "TagClassApplication",
1175 "TagClassUniversal",
1176 "TagFormConstructed",
1187 TagClassUniversal = 0
1188 TagClassApplication = 1 << 6
1189 TagClassContext = 1 << 7
1190 TagClassPrivate = 1 << 6 | 1 << 7
1191 TagFormPrimitive = 0
1192 TagFormConstructed = 1 << 5
1194 TagClassContext: "",
1195 TagClassApplication: "APPLICATION ",
1196 TagClassPrivate: "PRIVATE ",
1197 TagClassUniversal: "UNIV ",
1201 LENINDEF = b"\x80" # length indefinite mark
1202 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1203 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1204 SET01 = frozenset("01")
1205 DECIMALS = frozenset(digits)
1206 DECIMAL_SIGNS = ".,"
1207 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
def file_mmaped(fd):
    """Make mmap-ed memoryview for reading from file

    :param fd: file object
    :returns: memoryview over read-only mmap-ing of the whole file
    """
    # Zero length asks mmap to map the whole file
    return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
def pureint(value):
    """Convert string holding only decimal digits to integer

    :param str value: string to convert
    :returns: integer value
    :raises ValueError: if anything except characters from ``DECIMALS``
                        is met in the string
    """
    if not set(value) <= DECIMALS:
        raise ValueError("non-pure integer")
    return int(value)
def fractions2float(fractions_raw):
    """Convert digits of the fractional part to float

    :param str fractions_raw: digits following the decimal sign
    :returns: float value of ``0.<fractions_raw>``
    :raises ValueError: if :py:func:`pureint` rejects the string
    """
    # pureint() is called only for validation, its result is not needed
    pureint(fractions_raw)
    return float("0." + fractions_raw)
def get_def_by_path(defines_by_path, sub_decode_path):
    """Get define by decode path

    :param defines_by_path: iterable of ``(path, define)`` pairs
    :param sub_decode_path: decode path that is looked for
    :returns: define of the first pair which path matches
              ``sub_decode_path``, None if nothing matches
    """
    for path, define in defines_by_path:
        if len(path) != len(sub_decode_path):
            continue
        for p1, p2 in zip(path, sub_decode_path):
            # ``any`` builtin inside the path matches every entry
            if (p1 is not any) and (p1 != p2):
                break
        else:
            return define
    return None
1241 ########################################################################
1243 ########################################################################
1245 class ASN1Error(ValueError):
1249 class DecodeError(ASN1Error):
1250 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1252 :param str msg: reason of decode failing
1253 :param klass: optional exact DecodeError inherited class (like
1254 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1255 :py:exc:`InvalidLength`)
1256 :param decode_path: tuple of strings. It contains human
1257 readable names of the fields through which
1258 decoding process has passed
1259 :param int offset: binary offset where failure happened
1261 super(DecodeError, self).__init__()
1264 self.decode_path = decode_path
1265 self.offset = offset
1270 "" if self.klass is None else self.klass.__name__,
1272 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1273 if len(self.decode_path) > 0 else ""
1275 ("(at %d)" % self.offset) if self.offset > 0 else "",
1281 return "%s(%s)" % (self.__class__.__name__, self)
1284 class NotEnoughData(DecodeError):
class ExceedingData(ASN1Error):
    """Trailing data is left after the decoded object

    :param int nbytes: number of trailing bytes
    """
    def __init__(self, nbytes):
        super(ExceedingData, self).__init__()
        self.nbytes = nbytes

    def __str__(self):
        return "%d trailing bytes" % self.nbytes

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
1300 class LenIndefForm(DecodeError):
1304 class TagMismatch(DecodeError):
1308 class InvalidLength(DecodeError):
1312 class InvalidOID(DecodeError):
class ObjUnknown(ASN1Error):
    """Referenced object is unknown

    :param name: name of the unknown object
    """
    def __init__(self, name):
        super(ObjUnknown, self).__init__()
        self.name = name

    def __str__(self):
        return "object is unknown: %s" % self.name

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class ObjNotReady(ASN1Error):
    """Object is not ready for the requested operation

    :param name: name of the object that is not ready
    """
    def __init__(self, name):
        super(ObjNotReady, self).__init__()
        self.name = name

    def __str__(self):
        return "object is not ready: %s" % self.name

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class InvalidValueType(ASN1Error):
    """Value of unexpected type is given

    :param expected_types: iterable of the types that are acceptable
    """
    def __init__(self, expected_types):
        super(InvalidValueType, self).__init__()
        self.expected_types = expected_types

    def __str__(self):
        return "invalid value type, expected: %s" % ", ".join(
            [repr(t) for t in self.expected_types]
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
class BoundsError(ASN1Error):
    """Value does not satisfy ``bound_min <= value <= bound_max``

    :param bound_min: lower bound
    :param value: offending value
    :param bound_max: upper bound
    """
    def __init__(self, bound_min, value, bound_max):
        super(BoundsError, self).__init__()
        self.bound_min = bound_min
        self.value = value
        self.bound_max = bound_max

    def __str__(self):
        return "unsatisfied bounds: %s <= %s <= %s" % (
            self.bound_min,
            self.value,
            self.bound_max,
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
1372 ########################################################################
1374 ########################################################################
_hexdecoder = getdecoder("hex")
_hexencoder = getencoder("hex")


def hexdec(data):
    """Hexadecimal string to binary data convert

    :param data: hexadecimal representation
    :returns: decoded binary data
    """
    return _hexdecoder(data)[0]


def hexenc(data):
    """Binary data to hexadecimal string convert

    :param data: binary data
    :returns: lowercase hexadecimal string
    """
    # Codec returns bytes, but callers expect a text string
    return _hexencoder(data)[0].decode("ascii")
def int_bytes_len(num, byte_len=8):
    """Get number of ``byte_len``-bit groups required to hold the integer

    :param int num: integer to measure
    :param int byte_len: quantity of bits in single group
    :returns: number of groups, at least one
    """
    # Zero has zero bit length, but still occupies single group
    if num == 0:
        return 1
    return int(ceil(float(num.bit_length()) / byte_len))
def zero_ended_encode(num):
    """Encode integer in 7-bit groups, most significant group first

    All octets except the last one have the highest bit set.

    :param int num: non-negative integer to encode
    :returns: encoded bytes
    """
    octets = bytearray(int_bytes_len(num, 7))
    i = len(octets) - 1
    # The last octet has cleared high bit, finishing the sequence
    octets[i] = num & 0x7F
    num >>= 7
    i -= 1
    while num > 0:
        octets[i] = 0x80 | (num & 0x7F)
        num >>= 7
        i -= 1
    return bytes(octets)
def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
    """Encode tag to binary form

    :param int num: tag's number
    :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
                      :py:data:`pyderasn.TagClassContext`,
                      :py:data:`pyderasn.TagClassApplication`,
                      :py:data:`pyderasn.TagClassPrivate`)
    :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
                     :py:data:`pyderasn.TagFormConstructed`)
    :returns: binary representation of the tag
    """
    if num < 31:
        # [XX|X|.....]
        return int2byte(klass | form | num)
    # [XX|X|11111][1.......][1.......] ... [0.......]
    return int2byte(klass | form | 31) + zero_ended_encode(num)
def tag_decode(tag):
    """Decode tag from binary form

    .. warning::

       No validation is performed, assuming that it has already passed.

    :param tag: binary representation of the tag
    :returns: tuple of three integers ``(class, form, number)``, holding
              the same values :py:func:`pyderasn.tag_encode` accepts
    """
    first_octet = byte2int(tag)
    klass = first_octet & 0xC0
    form = first_octet & 0x20
    if first_octet & 0x1F < 0x1F:
        return (klass, form, first_octet & 0x1F)
    # Long form: number is spread over 7-bit groups of following octets
    num = 0
    for octet in iterbytes(tag[1:]):
        num <<= 7
        num |= octet & 0x7F
    return (klass, form, num)
1452 """Create CONTEXT PRIMITIVE tag
1454 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1458 """Create CONTEXT CONSTRUCTED tag
1460 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1463 def tag_strip(data):
1464 """Take off tag from the data
1466 :returns: (encoded tag, tag length, remaining data)
1469 raise NotEnoughData("no data at all")
1470 if byte2int(data) & 0x1F < 31:
1471 return data[:1], 1, data[1:]
1476 raise DecodeError("unfinished tag")
1477 if indexbytes(data, i) & 0x80 == 0:
1480 return data[:i], i, data[i:]
def len_encode(l):
    """Encode length to binary form

    :param int l: length to encode
    :returns: single octet for lengths below 128 (short form), otherwise
              octet with the quantity of following octets and high bit
              set, followed by big-endian length itself (long form)
    """
    if l < 0x80:
        return int2byte(l)
    octets = bytearray(int_bytes_len(l) + 1)
    octets[0] = 0x80 | (len(octets) - 1)
    # Fill the length octets from the least significant one
    for i in six_xrange(len(octets) - 1, 0, -1):
        octets[i] = l & 0xFF
        l >>= 8
    return bytes(octets)
def len_decode(data):
    """Decode length

    :returns: (decoded length, length's length, remaining data)
    :raises LenIndefForm: if indefinite form encoding is met
    :raises NotEnoughData: if there is no data, or less than the length
                           octets require
    :raises DecodeError: if long form has leading zeros, or is used
                         where short form is enough
    """
    if len(data) == 0:
        raise NotEnoughData("no data at all")
    first_octet = byte2int(data)
    if first_octet & 0x80 == 0:
        return first_octet, 1, data[1:]
    octets_num = first_octet & 0x7F
    if octets_num + 1 > len(data):
        raise NotEnoughData("encoded length is longer than data")
    if octets_num == 0:
        raise LenIndefForm()
    if byte2int(data[1:]) == 0:
        raise DecodeError("leading zeros")
    l = 0
    for v in iterbytes(data[1:1 + octets_num]):
        l = (l << 8) | v
    if l <= 127:
        raise DecodeError("long form instead of short one")
    return l, 1 + octets_num, data[1 + octets_num:]
1520 LEN1K = len_encode(1000)
def write_full(writer, data):
    """Fully write provided data

    :param writer: must comply with ``io.RawIOBase.write`` behaviour
    :param data: bytes-like object to write
    :raises ValueError: if ``writer`` returns None

    A raw writer does not guarantee that the whole data will be written
    at once. This function writes everything provided, calling
    ``writer`` again with the remaining data until nothing is left.
    """
    # memoryview makes slicing of the remaining data copy-free
    data = memoryview(data)
    written = 0
    while written != len(data):
        n = writer(data[written:])
        if n is None:
            raise ValueError("can not write to buf")
        written += n
1541 ########################################################################
1543 ########################################################################
class AutoAddSlots(type):
    """Metaclass adding empty ``__slots__`` to classes lacking them
    """
    def __new__(cls, name, bases, _dict):
        # Explicitly declared __slots__ are kept untouched
        _dict["__slots__"] = _dict.get("__slots__", ())
        return type.__new__(cls, name, bases, _dict)
1551 BasicState = namedtuple("BasicState", (
1564 ), **NAMEDTUPLE_KWARGS)
1567 @add_metaclass(AutoAddSlots)
1569 """Common ASN.1 object class
1571 All ASN.1 types are inherited from it. It has metaclass that
1572 automatically adds ``__slots__`` to all inherited classes.
1597 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1598 self._expl = getattr(self, "expl", None) if expl is None else expl
1599 if self.tag != self.tag_default and self._expl is not None:
1600 raise ValueError("implicit and explicit tags can not be set simultaneously")
1601 if self.tag is None:
1602 self._tag_order = None
1604 tag_class, _, tag_num = tag_decode(
1605 self.tag if self._expl is None else self._expl
1607 self._tag_order = (tag_class, tag_num)
1608 if default is not None:
1610 self.optional = optional
1611 self.offset, self.llen, self.vlen = _decoded
1613 self.expl_lenindef = False
1614 self.lenindef = False
1615 self.ber_encoded = False
1618 def ready(self): # pragma: no cover
1619 """Is object ready to be encoded?
1621 raise NotImplementedError()
1623 def _assert_ready(self):
1625 raise ObjNotReady(self.__class__.__name__)
1629 """Is either object or any elements inside is BER encoded?
1631 return self.expl_lenindef or self.lenindef or self.ber_encoded
1635 """Is object decoded?
1637 return (self.llen + self.vlen) > 0
1639 def __getstate__(self): # pragma: no cover
1640 """Used for making safe to be mutable pickleable copies
1642 raise NotImplementedError()
def __setstate__(self, state):
    """Restore common attributes from the state

    :param state: state object holding the attributes read below
    :raises ValueError: if state's version differs from ``__version__``
    """
    if state.version != __version__:
        raise ValueError("data is pickled by different PyDERASN version")
    self.tag = state.tag
    self._tag_order = state.tag_order
    self._expl = state.expl
    self.default = state.default
    self.optional = state.optional
    self.offset = state.offset
    self.llen = state.llen
    self.vlen = state.vlen
    self.expl_lenindef = state.expl_lenindef
    self.lenindef = state.lenindef
    self.ber_encoded = state.ber_encoded
1660 def tag_order(self):
1661 """Tag's (class, number) used for DER/CER sorting
1663 return self._tag_order
1666 def tag_order_cer(self):
1667 return self.tag_order
1671 """See :ref:`decoding`
1673 return len(self.tag)
1677 """See :ref:`decoding`
1679 return self.tlen + self.llen + self.vlen
1681 def __str__(self): # pragma: no cover
1682 return self.__bytes__() if PY2 else self.__unicode__()
1684 def __ne__(self, their):
1685 return not(self == their)
def __gt__(self, their):  # pragma: no cover
    """Is the object strictly greater than ``their``?"""
    # Plain ``not (self < their)`` would also be true for equal objects
    return not (self <= their)
1690 def __le__(self, their): # pragma: no cover
1691 return (self == their) or (self < their)
1693 def __ge__(self, their): # pragma: no cover
1694 return (self == their) or (self > their)
1696 def _encode(self): # pragma: no cover
1697 raise NotImplementedError()
1699 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1700 yield NotImplemented
def encode(self):
    """DER encode the structure

    :returns: DER representation
    """
    raw = self._encode()
    if self._expl is None:
        return raw
    # EXPLICIT tag wraps the whole encoded object
    return b"".join((self._expl, len_encode(len(raw)), raw))
def encode_cer(self, writer):
    """CER encode the structure to specified writer

    :param writer: must comply with ``io.RawIOBase.write``
                   behaviour. It takes slice to be written and
                   returns number of bytes processed. If it returns
                   None, then exception will be raised
    """
    if self._expl is not None:
        # EXPLICIT tag is written in indefinite length form
        write_full(writer, self._expl + LENINDEF)
    if getattr(self, "der_forced", False):
        write_full(writer, self._encode())
    else:
        self._encode_cer(writer)
    if self._expl is not None:
        write_full(writer, EOC)
1729 def _encode_cer(self, writer):
1730 write_full(writer, self._encode())
1732 def hexencode(self):
1733 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1735 return hexenc(self.encode())
1745 _ctx_immutable=True,
1749 :param data: either binary or memoryview
1750 :param int offset: initial data's offset
1751 :param bool leavemm: do we need to leave memoryview of remaining
1752 data as is, or convert it to bytes otherwise
1753 :param decode_path: current decode path (tuples of strings,
1754 possibly with DecodePathDefBy) with will be
1755 the root for all underlying objects
1756 :param ctx: optional :ref:`context <ctx>` governing decoding process
1757 :param bool tag_only: decode only the tag, without length and
1758 contents (used only in Choice and Set
1759 structures, trying to determine if tag satisfies
1761 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1763 :returns: (Obj, remaining data)
1765 .. seealso:: :ref:`decoding`
1767 result = next(self.decode_evgen(
1779 _, obj, tail = result
1790 _ctx_immutable=True,
1793 """Decode with evgen mode on
1795 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1796 it returns the generator producing ``(decode_path, obj, tail)``
1797 values. See :ref:`evgen mode <evgen_mode>`.
1801 elif _ctx_immutable:
1803 tlv = memoryview(data)
1806 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1809 if self._expl is None:
1810 for result in self._decode(
1813 decode_path=decode_path,
1816 evgen_mode=_evgen_mode,
1821 _decode_path, obj, tail = result
1822 if not _decode_path is decode_path:
1826 t, tlen, lv = tag_strip(tlv)
1827 except DecodeError as err:
1828 raise err.__class__(
1830 klass=self.__class__,
1831 decode_path=decode_path,
1836 klass=self.__class__,
1837 decode_path=decode_path,
1841 l, llen, v = len_decode(lv)
1842 except LenIndefForm as err:
1843 if not ctx.get("bered", False):
1844 raise err.__class__(
1846 klass=self.__class__,
1847 decode_path=decode_path,
1851 offset += tlen + llen
1852 for result in self._decode(
1855 decode_path=decode_path,
1858 evgen_mode=_evgen_mode,
1860 if tag_only: # pragma: no cover
1863 _decode_path, obj, tail = result
1864 if not _decode_path is decode_path:
1866 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1867 if eoc_expected.tobytes() != EOC:
1870 klass=self.__class__,
1871 decode_path=decode_path,
1875 obj.expl_lenindef = True
1876 except DecodeError as err:
1877 raise err.__class__(
1879 klass=self.__class__,
1880 decode_path=decode_path,
1885 raise NotEnoughData(
1886 "encoded length is longer than data",
1887 klass=self.__class__,
1888 decode_path=decode_path,
1891 for result in self._decode(
1893 offset=offset + tlen + llen,
1894 decode_path=decode_path,
1897 evgen_mode=_evgen_mode,
1899 if tag_only: # pragma: no cover
1902 _decode_path, obj, tail = result
1903 if not _decode_path is decode_path:
1905 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
1907 "explicit tag out-of-bound, longer than data",
1908 klass=self.__class__,
1909 decode_path=decode_path,
1912 yield decode_path, obj, (tail if leavemm else tail.tobytes())
1914 def decod(self, data, offset=0, decode_path=(), ctx=None):
1915 """Decode the data, check that tail is empty
1917 :raises ExceedingData: if tail is not empty
1919 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
1920 (decode without tail) that also checks that there is no
1923 obj, tail = self.decode(
1926 decode_path=decode_path,
1931 raise ExceedingData(len(tail))
1934 def hexdecode(self, data, *args, **kwargs):
1935 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
1937 return self.decode(hexdec(data), *args, **kwargs)
1939 def hexdecod(self, data, *args, **kwargs):
1940 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
1942 return self.decod(hexdec(data), *args, **kwargs)
1946 """See :ref:`decoding`
1948 return self._expl is not None
1952 """See :ref:`decoding`
1957 def expl_tlen(self):
1958 """See :ref:`decoding`
1960 return len(self._expl)
1963 def expl_llen(self):
1964 """See :ref:`decoding`
1966 if self.expl_lenindef:
1968 return len(len_encode(self.tlvlen))
1971 def expl_offset(self):
1972 """See :ref:`decoding`
1974 return self.offset - self.expl_tlen - self.expl_llen
1977 def expl_vlen(self):
1978 """See :ref:`decoding`
1983 def expl_tlvlen(self):
1984 """See :ref:`decoding`
1986 return self.expl_tlen + self.expl_llen + self.expl_vlen
1989 def fulloffset(self):
1990 """See :ref:`decoding`
1992 return self.expl_offset if self.expled else self.offset
1996 """See :ref:`decoding`
1998 return self.expl_tlvlen if self.expled else self.tlvlen
2000 def pps_lenindef(self, decode_path):
2001 if self.lenindef and not (
2002 getattr(self, "defined", None) is not None and
2003 self.defined[1].lenindef
2006 asn1_type_name="EOC",
2008 decode_path=decode_path,
2010 self.offset + self.tlvlen -
2011 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2019 if self.expl_lenindef:
2021 asn1_type_name="EOC",
2022 obj_name="EXPLICIT",
2023 decode_path=decode_path,
2024 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
def encode_cer(obj):
    """Encode to CER in memory buffer

    :param obj: object having ``encode_cer(writer)`` method
    :returns bytes: memory buffer contents
    """
    buf = BytesIO()
    obj.encode_cer(buf.write)
    return buf.getvalue()
class DecodePathDefBy(object):
    """DEFINED BY representation inside decode path
    """
    __slots__ = ("defined_by",)

    def __init__(self, defined_by):
        self.defined_by = defined_by

    def __ne__(self, their):
        return not(self == their)

    def __eq__(self, their):
        # Objects of foreign classes are never equal to that one
        if not isinstance(their, self.__class__):
            return False
        return self.defined_by == their.defined_by

    def __str__(self):
        return "DEFINED BY " + str(self.defined_by)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2066 ########################################################################
2068 ########################################################################
2070 PP = namedtuple("PP", (
2093 ), **NAMEDTUPLE_KWARGS)
2098 asn1_type_name="unknown",
2115 expl_lenindef=False,
def _colourize(what, colour, with_colours, attrs=("bold",)):
    """Colourize the text, if colours are enabled

    :param what: text to colourize
    :param colour: colour passed to ``colored``
    :param with_colours: return ``what`` untouched if false
    :param attrs: attributes passed to ``colored``
    """
    return colored(what, colour, attrs=attrs) if with_colours else what
def colonize_hex(hexed):
    """Separate hexadecimal string with colons

    :param str hexed: hexadecimal string
    :returns: the string with colon inserted after each two characters
    """
    return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2162 with_decode_path=False,
2163 decode_path_len_decrease=0,
2170 " " if pp.expl_offset is None else
2171 ("-%d" % (pp.offset - pp.expl_offset))
2173 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2175 col = _colourize(col, "red", with_colours, ())
2176 col += _colourize("B", "red", with_colours) if pp.bered else " "
2178 col = "[%d,%d,%4d]%s" % (
2182 LENINDEF_PP_CHAR if pp.lenindef else " "
2184 col = _colourize(col, "green", with_colours, ())
2186 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2187 if decode_path_len > 0:
2188 cols.append(" ." * decode_path_len)
2189 ent = pp.decode_path[-1]
2190 if isinstance(ent, DecodePathDefBy):
2191 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2192 value = str(ent.defined_by)
2195 len(oid_maps) > 0 and
2196 ent.defined_by.asn1_type_name ==
2197 ObjectIdentifier.asn1_type_name
2199 for oid_map in oid_maps:
2200 oid_name = oid_map.get(value)
2201 if oid_name is not None:
2202 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2204 if oid_name is None:
2205 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2207 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2208 if pp.expl is not None:
2209 klass, _, num = pp.expl
2210 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2211 cols.append(_colourize(col, "blue", with_colours))
2212 if pp.impl is not None:
2213 klass, _, num = pp.impl
2214 col = "[%s%d]" % (TagClassReprs[klass], num)
2215 cols.append(_colourize(col, "blue", with_colours))
2216 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2217 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2219 cols.append(_colourize("BER", "red", with_colours))
2220 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2221 if pp.value is not None:
2223 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2225 len(oid_maps) > 0 and
2226 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2228 for oid_map in oid_maps:
2229 oid_name = oid_map.get(value)
2230 if oid_name is not None:
2231 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2233 if pp.asn1_type_name == Integer.asn1_type_name:
2234 hex_repr = hex(int(pp.obj._value))[2:].upper()
2235 if len(hex_repr) % 2 != 0:
2236 hex_repr = "0" + hex_repr
2237 cols.append(_colourize(
2238 "(%s)" % colonize_hex(hex_repr),
2243 if pp.blob.__class__ == binary_type:
2244 cols.append(hexenc(pp.blob))
2245 elif pp.blob.__class__ == tuple:
2246 cols.append(", ".join(pp.blob))
2248 cols.append(_colourize("OPTIONAL", "red", with_colours))
2250 cols.append(_colourize("DEFAULT", "red", with_colours))
2251 if with_decode_path:
2252 cols.append(_colourize(
2253 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2257 return " ".join(cols)
def pp_console_blob(pp, decode_path_len_decrease=0):
    """Generate console rows holding the blob of pretty print object

    :param pp: pretty print object with ``decode_path`` and ``blob``
    :param int decode_path_len_decrease: how much the decode path length
                                         is decreased for indentation
    :returns: generator of strings. Binary blob is printed uppercase
              hexadecimal, colon separated, 32 hexadecimal characters
              per row. Tuple blob is printed as single comma separated
              row
    """
    # Placeholder as wide as the offset and [T,L,V] columns of usual row
    cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
    decode_path_len = len(pp.decode_path) - decode_path_len_decrease
    if decode_path_len > 0:
        cols.append(" ." * (decode_path_len + 1))
    if pp.blob.__class__ == binary_type:
        blob = hexenc(pp.blob).upper()
        for i in six_xrange(0, len(blob), 32):
            chunk = blob[i:i + 32]
            yield " ".join(cols + [colonize_hex(chunk)])
    elif pp.blob.__class__ == tuple:
        yield " ".join(cols + [", ".join(pp.blob)])
2279 with_decode_path=False,
2280 decode_path_only=(),
2283 """Pretty print object
2285 :param Obj obj: object you want to pretty print
2286 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2287 Its human readable form is printed when OID is met
2288 :param big_blobs: if large binary objects are met (like OctetString
2289 values), do we need to print them too, on separate
2291 :param with_colours: colourize output, if ``termcolor`` library
2293 :param with_decode_path: print decode path
2294 :param decode_path_only: print only that specified decode path
2296 def _pprint_pps(pps):
2298 if hasattr(pp, "_fields"):
2300 decode_path_only != () and
2302 str(p) for p in pp.decode_path[:len(decode_path_only)]
2303 ) != decode_path_only
2307 yield pp_console_row(
2312 with_colours=with_colours,
2313 with_decode_path=with_decode_path,
2314 decode_path_len_decrease=len(decode_path_only),
2316 for row in pp_console_blob(
2318 decode_path_len_decrease=len(decode_path_only),
2322 yield pp_console_row(
2327 with_colours=with_colours,
2328 with_decode_path=with_decode_path,
2329 decode_path_len_decrease=len(decode_path_only),
2332 for row in _pprint_pps(pp):
2334 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2337 ########################################################################
2338 # ASN.1 primitive types
2339 ########################################################################
2341 BooleanState = namedtuple(
2343 BasicState._fields + ("value",),
2349 """``BOOLEAN`` boolean type
2351 >>> b = Boolean(True)
2353 >>> b == Boolean(True)
2359 tag_default = tag_encode(1)
2360 asn1_type_name = "BOOLEAN"
2372 :param value: set the value. Either boolean type, or
2373 :py:class:`pyderasn.Boolean` object
2374 :param bytes impl: override default tag with ``IMPLICIT`` one
2375 :param bytes expl: override default tag with ``EXPLICIT`` one
2376 :param default: set default value. Type same as in ``value``
2377 :param bool optional: is object ``OPTIONAL`` in sequence
2379 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2380 self._value = None if value is None else self._value_sanitize(value)
2381 if default is not None:
2382 default = self._value_sanitize(default)
2383 self.default = self.__class__(
2389 self._value = default
2391 def _value_sanitize(self, value):
2392 if value.__class__ == bool:
2394 if issubclass(value.__class__, Boolean):
2396 raise InvalidValueType((self.__class__, bool))
2400 return self._value is not None
2402 def __getstate__(self):
2403 return BooleanState(
2419 def __setstate__(self, state):
2420 super(Boolean, self).__setstate__(state)
2421 self._value = state.value
2423 def __nonzero__(self):
2424 self._assert_ready()
2428 self._assert_ready()
2431 def __eq__(self, their):
2432 if their.__class__ == bool:
2433 return self._value == their
2434 if not issubclass(their.__class__, Boolean):
2437 self._value == their._value and
2438 self.tag == their.tag and
2439 self._expl == their._expl
2450 return self.__class__(
2452 impl=self.tag if impl is None else impl,
2453 expl=self._expl if expl is None else expl,
2454 default=self.default if default is None else default,
2455 optional=self.optional if optional is None else optional,
2459 self._assert_ready()
2463 (b"\xFF" if self._value else b"\x00"),
2466 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2468 t, _, lv = tag_strip(tlv)
2469 except DecodeError as err:
2470 raise err.__class__(
2472 klass=self.__class__,
2473 decode_path=decode_path,
2478 klass=self.__class__,
2479 decode_path=decode_path,
2486 l, _, v = len_decode(lv)
2487 except DecodeError as err:
2488 raise err.__class__(
2490 klass=self.__class__,
2491 decode_path=decode_path,
2495 raise InvalidLength(
2496 "Boolean's length must be equal to 1",
2497 klass=self.__class__,
2498 decode_path=decode_path,
2502 raise NotEnoughData(
2503 "encoded length is longer than data",
2504 klass=self.__class__,
2505 decode_path=decode_path,
2508 first_octet = byte2int(v)
2510 if first_octet == 0:
2512 elif first_octet == 0xFF:
2514 elif ctx.get("bered", False):
2519 "unacceptable Boolean value",
2520 klass=self.__class__,
2521 decode_path=decode_path,
2524 obj = self.__class__(
2528 default=self.default,
2529 optional=self.optional,
2530 _decoded=(offset, 1, 1),
2532 obj.ber_encoded = ber_encoded
2533 yield decode_path, obj, v[1:]
2536 return pp_console_row(next(self.pps()))
2538 def pps(self, decode_path=()):
2541 asn1_type_name=self.asn1_type_name,
2542 obj_name=self.__class__.__name__,
2543 decode_path=decode_path,
2544 value=str(self._value) if self.ready else None,
2545 optional=self.optional,
2546 default=self == self.default,
2547 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2548 expl=None if self._expl is None else tag_decode(self._expl),
2553 expl_offset=self.expl_offset if self.expled else None,
2554 expl_tlen=self.expl_tlen if self.expled else None,
2555 expl_llen=self.expl_llen if self.expled else None,
2556 expl_vlen=self.expl_vlen if self.expled else None,
2557 expl_lenindef=self.expl_lenindef,
2558 ber_encoded=self.ber_encoded,
2561 for pp in self.pps_lenindef(decode_path):
2565 IntegerState = namedtuple(
2567 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2573 """``INTEGER`` integer type
2575 >>> b = Integer(-123)
2577 >>> b == Integer(-123)
2582 >>> Integer(2, bounds=(1, 3))
2584 >>> Integer(5, bounds=(1, 3))
2585 Traceback (most recent call last):
2586 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2590 class Version(Integer):
2597 >>> v = Version("v1")
2604 {'v3': 2, 'v1': 0, 'v2': 1}
2606 __slots__ = ("specs", "_bound_min", "_bound_max")
2607 tag_default = tag_encode(2)
2608 asn1_type_name = "INTEGER"
2622 :param value: set the value. Either integer type, named value
2623 (if ``schema`` is specified in the class), or
2624 :py:class:`pyderasn.Integer` object
2625 :param bounds: set ``(MIN, MAX)`` value constraint.
2626 (-inf, +inf) by default
2627 :param bytes impl: override default tag with ``IMPLICIT`` one
2628 :param bytes expl: override default tag with ``EXPLICIT`` one
2629 :param default: set default value. Type same as in ``value``
2630 :param bool optional: is object ``OPTIONAL`` in sequence
2632 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2634 specs = getattr(self, "schema", {}) if _specs is None else _specs
2635 self.specs = specs if specs.__class__ == dict else dict(specs)
2636 self._bound_min, self._bound_max = getattr(
2639 (float("-inf"), float("+inf")),
2640 ) if bounds is None else bounds
2641 if value is not None:
2642 self._value = self._value_sanitize(value)
2643 if default is not None:
2644 default = self._value_sanitize(default)
2645 self.default = self.__class__(
2651 if self._value is None:
2652 self._value = default
2654 def _value_sanitize(self, value):
2655 if isinstance(value, integer_types):
2657 elif issubclass(value.__class__, Integer):
2658 value = value._value
2659 elif value.__class__ == str:
2660 value = self.specs.get(value)
2662 raise ObjUnknown("integer value: %s" % value)
2664 raise InvalidValueType((self.__class__, int, str))
2665 if not self._bound_min <= value <= self._bound_max:
2666 raise BoundsError(self._bound_min, value, self._bound_max)
2671 return self._value is not None
2673 def __getstate__(self):
2674 return IntegerState(
def __setstate__(self, state):
    """Restore this object from a previously captured state

    The parent class restores its own part first, after that the
    fields specific to this type are copied from ``state``.

    :param state: object exposing ``specs``, ``value``, ``bound_min``
                  and ``bound_max`` attributes
    """
    super(Integer, self).__setstate__(state)
    # (attribute on self, attribute on state), in assignment order
    field_map = (
        ("specs", "specs"),
        ("_value", "value"),
        ("_bound_min", "bound_min"),
        ("_bound_max", "bound_max"),
    )
    for own_name, state_name in field_map:
        setattr(self, own_name, getattr(state, state_name))
2701 self._assert_ready()
2702 return int(self._value)
2705 self._assert_ready()
2708 bytes(self._expl or b"") +
2709 str(self._value).encode("ascii"),
2712 def __eq__(self, their):
2713 if isinstance(their, integer_types):
2714 return self._value == their
2715 if not issubclass(their.__class__, Integer):
2718 self._value == their._value and
2719 self.tag == their.tag and
2720 self._expl == their._expl
def __lt__(self, their):
    """Compare stored values of two objects

    :param their: object exposing ``_value`` attribute
    :returns: result of ``self._value < their._value``
    """
    mine = self._value
    theirs = their._value
    return mine < theirs
2728 """Return named representation (if exists) of the value
2730 for name, value in iteritems(self.specs):
2731 if value == self._value:
2744 return self.__class__(
2747 (self._bound_min, self._bound_max)
2748 if bounds is None else bounds
2750 impl=self.tag if impl is None else impl,
2751 expl=self._expl if expl is None else expl,
2752 default=self.default if default is None else default,
2753 optional=self.optional if optional is None else optional,
2758 self._assert_ready()
2762 octets = bytearray([0])
2766 octets = bytearray()
2768 octets.append((value & 0xFF) ^ 0xFF)
2770 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2773 octets = bytearray()
2775 octets.append(value & 0xFF)
2777 if octets[-1] & 0x80 > 0:
2780 octets = bytes(octets)
2782 bytes_len = ceil(value.bit_length() / 8) or 1
2785 octets = value.to_bytes(
2790 except OverflowError:
2794 return b"".join((self.tag, len_encode(len(octets)), octets))
2796 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2798 t, _, lv = tag_strip(tlv)
2799 except DecodeError as err:
2800 raise err.__class__(
2802 klass=self.__class__,
2803 decode_path=decode_path,
2808 klass=self.__class__,
2809 decode_path=decode_path,
2816 l, llen, v = len_decode(lv)
2817 except DecodeError as err:
2818 raise err.__class__(
2820 klass=self.__class__,
2821 decode_path=decode_path,
2825 raise NotEnoughData(
2826 "encoded length is longer than data",
2827 klass=self.__class__,
2828 decode_path=decode_path,
2832 raise NotEnoughData(
2834 klass=self.__class__,
2835 decode_path=decode_path,
2838 v, tail = v[:l], v[l:]
2839 first_octet = byte2int(v)
2841 second_octet = byte2int(v[1:])
2843 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2844 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2847 "non normalized integer",
2848 klass=self.__class__,
2849 decode_path=decode_path,
2854 if first_octet & 0x80 > 0:
2855 octets = bytearray()
2856 for octet in bytearray(v):
2857 octets.append(octet ^ 0xFF)
2858 for octet in octets:
2859 value = (value << 8) | octet
2863 for octet in bytearray(v):
2864 value = (value << 8) | octet
2866 value = int.from_bytes(v, byteorder="big", signed=True)
2868 obj = self.__class__(
2870 bounds=(self._bound_min, self._bound_max),
2873 default=self.default,
2874 optional=self.optional,
2876 _decoded=(offset, llen, l),
2878 except BoundsError as err:
2881 klass=self.__class__,
2882 decode_path=decode_path,
2885 yield decode_path, obj, tail
2888 return pp_console_row(next(self.pps()))
2890 def pps(self, decode_path=()):
2893 asn1_type_name=self.asn1_type_name,
2894 obj_name=self.__class__.__name__,
2895 decode_path=decode_path,
2896 value=(self.named or str(self._value)) if self.ready else None,
2897 optional=self.optional,
2898 default=self == self.default,
2899 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2900 expl=None if self._expl is None else tag_decode(self._expl),
2905 expl_offset=self.expl_offset if self.expled else None,
2906 expl_tlen=self.expl_tlen if self.expled else None,
2907 expl_llen=self.expl_llen if self.expled else None,
2908 expl_vlen=self.expl_vlen if self.expled else None,
2909 expl_lenindef=self.expl_lenindef,
2912 for pp in self.pps_lenindef(decode_path):
2916 BitStringState = namedtuple(
2918 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
2923 class BitString(Obj):
2924 """``BIT STRING`` bit string type
2926 >>> BitString(b"hello world")
2927 BIT STRING 88 bits 68656c6c6f20776f726c64
2930 >>> b == b"hello world"
2935 >>> BitString("'0A3B5F291CD'H")
2936 BIT STRING 44 bits 0a3b5f291cd0
2937 >>> b = BitString("'010110000000'B")
2938 BIT STRING 12 bits 5800
2941 >>> b[0], b[1], b[2], b[3]
2942 (False, True, False, True)
2946 [False, True, False, True, True, False, False, False, False, False, False, False]
2950 class KeyUsage(BitString):
2952 ("digitalSignature", 0),
2953 ("nonRepudiation", 1),
2954 ("keyEncipherment", 2),
2957 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
2958 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
2960 ['nonRepudiation', 'keyEncipherment']
2962 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
2966 Pay attention that BIT STRING can be encoded both in primitive
2967 and constructed forms. Decoder always checks constructed form tag
2968 additionally to specified primitive one. If BER decoding is
2969 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
2970 of DER restrictions.
2972 __slots__ = ("tag_constructed", "specs", "defined")
2973 tag_default = tag_encode(3)
2974 asn1_type_name = "BIT STRING"
2987 :param value: set the value. Either binary type, tuple of named
2988 values (if ``schema`` is specified in the class),
2989 string in ``'XXX...'B`` form, or
2990 :py:class:`pyderasn.BitString` object
2991 :param bytes impl: override default tag with ``IMPLICIT`` one
2992 :param bytes expl: override default tag with ``EXPLICIT`` one
2993 :param default: set default value. Type same as in ``value``
2994 :param bool optional: is object ``OPTIONAL`` in sequence
2996 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
2997 specs = getattr(self, "schema", {}) if _specs is None else _specs
2998 self.specs = specs if specs.__class__ == dict else dict(specs)
2999 self._value = None if value is None else self._value_sanitize(value)
3000 if default is not None:
3001 default = self._value_sanitize(default)
3002 self.default = self.__class__(
3008 self._value = default
3010 tag_klass, _, tag_num = tag_decode(self.tag)
3011 self.tag_constructed = tag_encode(
3013 form=TagFormConstructed,
3017 def _bits2octets(self, bits):
3018 if len(self.specs) > 0:
3019 bits = bits.rstrip("0")
3021 bits += "0" * ((8 - (bit_len % 8)) % 8)
3022 octets = bytearray(len(bits) // 8)
3023 for i in six_xrange(len(octets)):
3024 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3025 return bit_len, bytes(octets)
3027 def _value_sanitize(self, value):
3028 if isinstance(value, (string_types, binary_type)):
3030 isinstance(value, string_types) and
3031 value.startswith("'")
3033 if value.endswith("'B"):
3035 if not frozenset(value) <= SET01:
3036 raise ValueError("B's coding contains unacceptable chars")
3037 return self._bits2octets(value)
3038 if value.endswith("'H"):
3042 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3044 if value.__class__ == binary_type:
3045 return (len(value) * 8, value)
3046 raise InvalidValueType((self.__class__, string_types, binary_type))
3047 if value.__class__ == tuple:
3050 isinstance(value[0], integer_types) and
3051 value[1].__class__ == binary_type
3056 bit = self.specs.get(name)
3058 raise ObjUnknown("BitString value: %s" % name)
3061 return self._bits2octets("")
3062 bits = frozenset(bits)
3063 return self._bits2octets("".join(
3064 ("1" if bit in bits else "0")
3065 for bit in six_xrange(max(bits) + 1)
3067 if issubclass(value.__class__, BitString):
3069 raise InvalidValueType((self.__class__, binary_type, string_types))
3073 return self._value is not None
3075 def __getstate__(self):
3076 return BitStringState(
3091 self.tag_constructed,
def __setstate__(self, state):
    """Restore this object from a previously captured state

    The parent class restores its own part first, after that the
    fields specific to this type are copied from ``state``.

    :param state: object exposing ``specs``, ``value``,
                  ``tag_constructed`` and ``defined`` attributes
    """
    super(BitString, self).__setstate__(state)
    # (attribute on self, attribute on state), in assignment order
    field_map = (
        ("specs", "specs"),
        ("_value", "value"),
        ("tag_constructed", "tag_constructed"),
        ("defined", "defined"),
    )
    for own_name, state_name in field_map:
        setattr(self, own_name, getattr(state, state_name))
3103 self._assert_ready()
3104 for i in six_xrange(self._value[0]):
3109 """Returns number of bits in the string
3111 self._assert_ready()
3112 return self._value[0]
def __bytes__(self):
    """Return the second element of the stored value

    ``_assert_ready()`` is called first, so its failure takes
    precedence over any access to the value itself.
    """
    self._assert_ready()
    payload = self._value[1]
    return payload
3118 def __eq__(self, their):
3119 if their.__class__ == bytes:
3120 return self._value[1] == their
3121 if not issubclass(their.__class__, BitString):
3124 self._value == their._value and
3125 self.tag == their.tag and
3126 self._expl == their._expl
3131 """Named representation (if exists) of the bits
3133 :returns: [str(name), ...]
3135 return [name for name, bit in iteritems(self.specs) if self[bit]]
3145 return self.__class__(
3147 impl=self.tag if impl is None else impl,
3148 expl=self._expl if expl is None else expl,
3149 default=self.default if default is None else default,
3150 optional=self.optional if optional is None else optional,
3154 def __getitem__(self, key):
3155 if key.__class__ == int:
3156 bit_len, octets = self._value
3160 byte2int(memoryview(octets)[key // 8:]) >>
3163 if isinstance(key, string_types):
3164 value = self.specs.get(key)
3166 raise ObjUnknown("BitString value: %s" % key)
3168 raise InvalidValueType((int, str))
3171 self._assert_ready()
3172 bit_len, octets = self._value
3175 len_encode(len(octets) + 1),
3176 int2byte((8 - bit_len % 8) % 8),
3180 def _encode_cer(self, writer):
3181 bit_len, octets = self._value
3182 if len(octets) + 1 <= 1000:
3183 write_full(writer, self._encode())
3185 write_full(writer, self.tag_constructed)
3186 write_full(writer, LENINDEF)
3187 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3188 write_full(writer, b"".join((
3189 BitString.tag_default,
3192 octets[offset:offset + 999],
3194 tail = octets[offset+999:]
3196 tail = int2byte((8 - bit_len % 8) % 8) + tail
3197 write_full(writer, b"".join((
3198 BitString.tag_default,
3199 len_encode(len(tail)),
3202 write_full(writer, EOC)
3204 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3206 t, tlen, lv = tag_strip(tlv)
3207 except DecodeError as err:
3208 raise err.__class__(
3210 klass=self.__class__,
3211 decode_path=decode_path,
3215 if tag_only: # pragma: no cover
3219 l, llen, v = len_decode(lv)
3220 except DecodeError as err:
3221 raise err.__class__(
3223 klass=self.__class__,
3224 decode_path=decode_path,
3228 raise NotEnoughData(
3229 "encoded length is longer than data",
3230 klass=self.__class__,
3231 decode_path=decode_path,
3235 raise NotEnoughData(
3237 klass=self.__class__,
3238 decode_path=decode_path,
3241 pad_size = byte2int(v)
3242 if l == 1 and pad_size != 0:
3244 "invalid empty value",
3245 klass=self.__class__,
3246 decode_path=decode_path,
3252 klass=self.__class__,
3253 decode_path=decode_path,
3256 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3259 klass=self.__class__,
3260 decode_path=decode_path,
3263 v, tail = v[:l], v[l:]
3264 bit_len = (len(v) - 1) * 8 - pad_size
3265 obj = self.__class__(
3266 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3269 default=self.default,
3270 optional=self.optional,
3272 _decoded=(offset, llen, l),
3275 obj._value = (bit_len, None)
3276 yield decode_path, obj, tail
3278 if t != self.tag_constructed:
3280 klass=self.__class__,
3281 decode_path=decode_path,
3284 if not ctx.get("bered", False):
3286 "unallowed BER constructed encoding",
3287 klass=self.__class__,
3288 decode_path=decode_path,
3291 if tag_only: # pragma: no cover
3296 l, llen, v = len_decode(lv)
3297 except LenIndefForm:
3298 llen, l, v = 1, 0, lv[1:]
3300 except DecodeError as err:
3301 raise err.__class__(
3303 klass=self.__class__,
3304 decode_path=decode_path,
3308 raise NotEnoughData(
3309 "encoded length is longer than data",
3310 klass=self.__class__,
3311 decode_path=decode_path,
3314 if not lenindef and l == 0:
3315 raise NotEnoughData(
3317 klass=self.__class__,
3318 decode_path=decode_path,
3322 sub_offset = offset + tlen + llen
3326 if v[:EOC_LEN].tobytes() == EOC:
3333 "chunk out of bounds",
3334 klass=self.__class__,
3335 decode_path=decode_path + (str(len(chunks) - 1),),
3336 offset=chunks[-1].offset,
3338 sub_decode_path = decode_path + (str(len(chunks)),)
3341 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3344 decode_path=sub_decode_path,
3347 _ctx_immutable=False,
3349 yield _decode_path, chunk, v_tail
3351 _, chunk, v_tail = next(BitString().decode_evgen(
3354 decode_path=sub_decode_path,
3357 _ctx_immutable=False,
3362 "expected BitString encoded chunk",
3363 klass=self.__class__,
3364 decode_path=sub_decode_path,
3367 chunks.append(chunk)
3368 sub_offset += chunk.tlvlen
3369 vlen += chunk.tlvlen
3371 if len(chunks) == 0:
3374 klass=self.__class__,
3375 decode_path=decode_path,
3380 for chunk_i, chunk in enumerate(chunks[:-1]):
3381 if chunk.bit_len % 8 != 0:
3383 "BitString chunk is not multiple of 8 bits",
3384 klass=self.__class__,
3385 decode_path=decode_path + (str(chunk_i),),
3386 offset=chunk.offset,
3389 values.append(bytes(chunk))
3390 bit_len += chunk.bit_len
3391 chunk_last = chunks[-1]
3393 values.append(bytes(chunk_last))
3394 bit_len += chunk_last.bit_len
3395 obj = self.__class__(
3396 value=None if evgen_mode else (bit_len, b"".join(values)),
3399 default=self.default,
3400 optional=self.optional,
3402 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3405 obj._value = (bit_len, None)
3406 obj.lenindef = lenindef
3407 obj.ber_encoded = True
3408 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3411 return pp_console_row(next(self.pps()))
3413 def pps(self, decode_path=()):
3417 bit_len, blob = self._value
3418 value = "%d bits" % bit_len
3419 if len(self.specs) > 0 and blob is not None:
3420 blob = tuple(self.named)
3423 asn1_type_name=self.asn1_type_name,
3424 obj_name=self.__class__.__name__,
3425 decode_path=decode_path,
3428 optional=self.optional,
3429 default=self == self.default,
3430 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3431 expl=None if self._expl is None else tag_decode(self._expl),
3436 expl_offset=self.expl_offset if self.expled else None,
3437 expl_tlen=self.expl_tlen if self.expled else None,
3438 expl_llen=self.expl_llen if self.expled else None,
3439 expl_vlen=self.expl_vlen if self.expled else None,
3440 expl_lenindef=self.expl_lenindef,
3441 lenindef=self.lenindef,
3442 ber_encoded=self.ber_encoded,
3445 defined_by, defined = self.defined or (None, None)
3446 if defined_by is not None:
3448 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3450 for pp in self.pps_lenindef(decode_path):
3454 OctetStringState = namedtuple(
3456 BasicState._fields + (
3467 class OctetString(Obj):
3468 """``OCTET STRING`` binary string type
3470 >>> s = OctetString(b"hello world")
3471 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3472 >>> s == OctetString(b"hello world")
3477 >>> OctetString(b"hello", bounds=(4, 4))
3478 Traceback (most recent call last):
3479 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3480 >>> OctetString(b"hell", bounds=(4, 4))
3481 OCTET STRING 4 bytes 68656c6c
Memoryviews can be used as values. If a memoryview is made on an
3484 mmap-ed file, then it does not take storage inside OctetString
3485 itself. In CER encoding mode it will be streamed to the specified
3486 writer, copying 1 KB chunks.
3488 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3489 tag_default = tag_encode(4)
3490 asn1_type_name = "OCTET STRING"
3491 evgen_mode_skip_value = True
3505 :param value: set the value. Either binary type, or
3506 :py:class:`pyderasn.OctetString` object
3507 :param bounds: set ``(MIN, MAX)`` value size constraint.
3508 (-inf, +inf) by default
3509 :param bytes impl: override default tag with ``IMPLICIT`` one
3510 :param bytes expl: override default tag with ``EXPLICIT`` one
3511 :param default: set default value. Type same as in ``value``
3512 :param bool optional: is object ``OPTIONAL`` in sequence
3514 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3516 self._bound_min, self._bound_max = getattr(
3520 ) if bounds is None else bounds
3521 if value is not None:
3522 self._value = self._value_sanitize(value)
3523 if default is not None:
3524 default = self._value_sanitize(default)
3525 self.default = self.__class__(
3530 if self._value is None:
3531 self._value = default
3533 tag_klass, _, tag_num = tag_decode(self.tag)
3534 self.tag_constructed = tag_encode(
3536 form=TagFormConstructed,
3540 def _value_sanitize(self, value):
3541 if value.__class__ == binary_type or value.__class__ == memoryview:
3543 elif issubclass(value.__class__, OctetString):
3544 value = value._value
3546 raise InvalidValueType((self.__class__, bytes, memoryview))
3547 if not self._bound_min <= len(value) <= self._bound_max:
3548 raise BoundsError(self._bound_min, len(value), self._bound_max)
3553 return self._value is not None
3555 def __getstate__(self):
3556 return OctetStringState(
3572 self.tag_constructed,
def __setstate__(self, state):
    """Restore this object from a previously captured state

    The parent class restores its own part first, after that the
    fields specific to this type are copied from ``state``.

    :param state: object exposing ``value``, ``bound_min``,
                  ``bound_max``, ``tag_constructed`` and ``defined``
                  attributes
    """
    super(OctetString, self).__setstate__(state)
    # (attribute on self, attribute on state), in assignment order
    field_map = (
        ("_value", "value"),
        ("_bound_min", "bound_min"),
        ("_bound_max", "bound_max"),
        ("tag_constructed", "tag_constructed"),
        ("defined", "defined"),
    )
    for own_name, state_name in field_map:
        setattr(self, own_name, getattr(state, state_name))
def __bytes__(self):
    """Return the stored value converted with ``bytes()``

    ``_assert_ready()`` is called first, so its failure takes
    precedence over any access to the value itself.
    """
    self._assert_ready()
    stored = self._value
    return bytes(stored)
3588 def __eq__(self, their):
3589 if their.__class__ == binary_type:
3590 return self._value == their
3591 if not issubclass(their.__class__, OctetString):
3594 self._value == their._value and
3595 self.tag == their.tag and
3596 self._expl == their._expl
def __lt__(self, their):
    """Compare stored values of two objects

    :param their: object exposing ``_value`` attribute
    :returns: result of ``self._value < their._value``
    """
    left = self._value
    right = their._value
    return left < right
3611 return self.__class__(
3614 (self._bound_min, self._bound_max)
3615 if bounds is None else bounds
3617 impl=self.tag if impl is None else impl,
3618 expl=self._expl if expl is None else expl,
3619 default=self.default if default is None else default,
3620 optional=self.optional if optional is None else optional,
3624 self._assert_ready()
3627 len_encode(len(self._value)),
3631 def _encode_cer(self, writer):
3632 octets = self._value
3633 if len(octets) <= 1000:
3634 write_full(writer, self._encode())
3636 write_full(writer, self.tag_constructed)
3637 write_full(writer, LENINDEF)
3638 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3639 write_full(writer, b"".join((
3640 OctetString.tag_default,
3642 octets[offset:offset + 1000],
3644 tail = octets[offset+1000:]
3646 write_full(writer, b"".join((
3647 OctetString.tag_default,
3648 len_encode(len(tail)),
3651 write_full(writer, EOC)
3653 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3655 t, tlen, lv = tag_strip(tlv)
3656 except DecodeError as err:
3657 raise err.__class__(
3659 klass=self.__class__,
3660 decode_path=decode_path,
3668 l, llen, v = len_decode(lv)
3669 except DecodeError as err:
3670 raise err.__class__(
3672 klass=self.__class__,
3673 decode_path=decode_path,
3677 raise NotEnoughData(
3678 "encoded length is longer than data",
3679 klass=self.__class__,
3680 decode_path=decode_path,
3683 v, tail = v[:l], v[l:]
3684 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3686 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3687 klass=self.__class__,
3688 decode_path=decode_path,
3692 obj = self.__class__(
3694 None if (evgen_mode and self.evgen_mode_skip_value)
3697 bounds=(self._bound_min, self._bound_max),
3700 default=self.default,
3701 optional=self.optional,
3702 _decoded=(offset, llen, l),
3705 except DecodeError as err:
3708 klass=self.__class__,
3709 decode_path=decode_path,
3712 except BoundsError as err:
3715 klass=self.__class__,
3716 decode_path=decode_path,
3719 yield decode_path, obj, tail
3721 if t != self.tag_constructed:
3723 klass=self.__class__,
3724 decode_path=decode_path,
3727 if not ctx.get("bered", False):
3729 "unallowed BER constructed encoding",
3730 klass=self.__class__,
3731 decode_path=decode_path,
3739 l, llen, v = len_decode(lv)
3740 except LenIndefForm:
3741 llen, l, v = 1, 0, lv[1:]
3743 except DecodeError as err:
3744 raise err.__class__(
3746 klass=self.__class__,
3747 decode_path=decode_path,
3751 raise NotEnoughData(
3752 "encoded length is longer than data",
3753 klass=self.__class__,
3754 decode_path=decode_path,
3759 sub_offset = offset + tlen + llen
3764 if v[:EOC_LEN].tobytes() == EOC:
3771 "chunk out of bounds",
3772 klass=self.__class__,
3773 decode_path=decode_path + (str(len(chunks) - 1),),
3774 offset=chunks[-1].offset,
3778 sub_decode_path = decode_path + (str(chunks_count),)
3779 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3782 decode_path=sub_decode_path,
3785 _ctx_immutable=False,
3787 yield _decode_path, chunk, v_tail
3788 if not chunk.ber_encoded:
3789 payload_len += chunk.vlen
3792 sub_decode_path = decode_path + (str(len(chunks)),)
3793 _, chunk, v_tail = next(OctetString().decode_evgen(
3796 decode_path=sub_decode_path,
3799 _ctx_immutable=False,
3802 chunks.append(chunk)
3805 "expected OctetString encoded chunk",
3806 klass=self.__class__,
3807 decode_path=sub_decode_path,
3810 sub_offset += chunk.tlvlen
3811 vlen += chunk.tlvlen
3813 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3815 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3816 klass=self.__class__,
3817 decode_path=decode_path,
3821 obj = self.__class__(
3823 None if evgen_mode else
3824 b"".join(bytes(chunk) for chunk in chunks)
3826 bounds=(self._bound_min, self._bound_max),
3829 default=self.default,
3830 optional=self.optional,
3831 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3834 except DecodeError as err:
3837 klass=self.__class__,
3838 decode_path=decode_path,
3841 except BoundsError as err:
3844 klass=self.__class__,
3845 decode_path=decode_path,
3848 obj.lenindef = lenindef
3849 obj.ber_encoded = True
3850 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3853 return pp_console_row(next(self.pps()))
3855 def pps(self, decode_path=()):
3858 asn1_type_name=self.asn1_type_name,
3859 obj_name=self.__class__.__name__,
3860 decode_path=decode_path,
3861 value=("%d bytes" % len(self._value)) if self.ready else None,
3862 blob=self._value if self.ready else None,
3863 optional=self.optional,
3864 default=self == self.default,
3865 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3866 expl=None if self._expl is None else tag_decode(self._expl),
3871 expl_offset=self.expl_offset if self.expled else None,
3872 expl_tlen=self.expl_tlen if self.expled else None,
3873 expl_llen=self.expl_llen if self.expled else None,
3874 expl_vlen=self.expl_vlen if self.expled else None,
3875 expl_lenindef=self.expl_lenindef,
3876 lenindef=self.lenindef,
3877 ber_encoded=self.ber_encoded,
3880 defined_by, defined = self.defined or (None, None)
3881 if defined_by is not None:
3883 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3885 for pp in self.pps_lenindef(decode_path):
3889 def agg_octet_string(evgens, decode_path, raw, writer):
3890 """Aggregate constructed string (OctetString and its derivatives)
3892 :param evgens: iterator of generated events
3893 :param decode_path: points to the string we want to decode
:param raw: sliceable (memoryview, bytearray, etc) with
3895 the data evgens are generated on
:param writer: buffer.write where string is going to be saved. Must
comply with ``io.RawIOBase.write`` behaviour
3900 decode_path_len = len(decode_path)
3901 for dp, obj, _ in evgens:
3902 if dp[:decode_path_len] != decode_path:
3904 if not obj.ber_encoded:
3905 write_full(writer, raw[
3906 obj.offset + obj.tlen + obj.llen:
3907 obj.offset + obj.tlen + obj.llen + obj.vlen -
3908 (EOC_LEN if obj.expl_lenindef else 0)
3910 if len(dp) == decode_path_len:
3914 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
3918 """``NULL`` null object
3926 tag_default = tag_encode(5)
3927 asn1_type_name = "NULL"
3931 value=None, # unused, but Sequence passes it
3938 :param bytes impl: override default tag with ``IMPLICIT`` one
3939 :param bytes expl: override default tag with ``EXPLICIT`` one
3940 :param bool optional: is object ``OPTIONAL`` in sequence
3942 super(Null, self).__init__(impl, expl, None, optional, _decoded)
3949 def __getstate__(self):
3965 def __eq__(self, their):
3966 if not issubclass(their.__class__, Null):
3969 self.tag == their.tag and
3970 self._expl == their._expl
3980 return self.__class__(
3981 impl=self.tag if impl is None else impl,
3982 expl=self._expl if expl is None else expl,
3983 optional=self.optional if optional is None else optional,
3987 return self.tag + len_encode(0)
3989 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3991 t, _, lv = tag_strip(tlv)
3992 except DecodeError as err:
3993 raise err.__class__(
3995 klass=self.__class__,
3996 decode_path=decode_path,
4001 klass=self.__class__,
4002 decode_path=decode_path,
4005 if tag_only: # pragma: no cover
4009 l, _, v = len_decode(lv)
4010 except DecodeError as err:
4011 raise err.__class__(
4013 klass=self.__class__,
4014 decode_path=decode_path,
4018 raise InvalidLength(
4019 "Null must have zero length",
4020 klass=self.__class__,
4021 decode_path=decode_path,
4024 obj = self.__class__(
4027 optional=self.optional,
4028 _decoded=(offset, 1, 0),
4030 yield decode_path, obj, v
4033 return pp_console_row(next(self.pps()))
4035 def pps(self, decode_path=()):
4038 asn1_type_name=self.asn1_type_name,
4039 obj_name=self.__class__.__name__,
4040 decode_path=decode_path,
4041 optional=self.optional,
4042 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4043 expl=None if self._expl is None else tag_decode(self._expl),
4048 expl_offset=self.expl_offset if self.expled else None,
4049 expl_tlen=self.expl_tlen if self.expled else None,
4050 expl_llen=self.expl_llen if self.expled else None,
4051 expl_vlen=self.expl_vlen if self.expled else None,
4052 expl_lenindef=self.expl_lenindef,
4055 for pp in self.pps_lenindef(decode_path):
4059 ObjectIdentifierState = namedtuple(
4060 "ObjectIdentifierState",
4061 BasicState._fields + ("value", "defines"),
4066 class ObjectIdentifier(Obj):
4067 """``OBJECT IDENTIFIER`` OID type
4069 >>> oid = ObjectIdentifier((1, 2, 3))
4070 OBJECT IDENTIFIER 1.2.3
4071 >>> oid == ObjectIdentifier("1.2.3")
4077 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4078 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4080 >>> str(ObjectIdentifier((3, 1)))
4081 Traceback (most recent call last):
4082 pyderasn.InvalidOID: unacceptable first arc value
4084 __slots__ = ("defines",)
4085 tag_default = tag_encode(6)
4086 asn1_type_name = "OBJECT IDENTIFIER"
4099 :param value: set the value. Either tuples of integers,
4100 string of "."-concatenated integers, or
4101 :py:class:`pyderasn.ObjectIdentifier` object
4102 :param defines: sequence of tuples. Each tuple has two elements.
4103 First one is relative to current one decode
4104 path, aiming to the field defined by that OID.
4105 Read about relative path in
4106 :py:func:`pyderasn.abs_decode_path`. Second
4107 tuple element is ``{OID: pyderasn.Obj()}``
4108 dictionary, mapping between current OID value
4109 and structure applied to defined field.
4110 :ref:`Read about DEFINED BY <definedby>`
4111 :param bytes impl: override default tag with ``IMPLICIT`` one
4112 :param bytes expl: override default tag with ``EXPLICIT`` one
4113 :param default: set default value. Type same as in ``value``
4114 :param bool optional: is object ``OPTIONAL`` in sequence
4116 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4118 if value is not None:
4119 self._value = self._value_sanitize(value)
4120 if default is not None:
4121 default = self._value_sanitize(default)
4122 self.default = self.__class__(
4127 if self._value is None:
4128 self._value = default
4129 self.defines = defines
def __add__(self, their):
    """Build a new object with additional arcs appended

    :param their: either a plain ``tuple`` (exact type, subclasses
                  are not accepted), or an instance of this
                  object's own class
    :returns: new instance of ``self.__class__`` holding
              ``self._value`` followed by the arcs of ``their``
    :raises InvalidValueType: if ``their`` is neither of the above
    """
    klass = self.__class__
    if their.__class__ == tuple:
        suffix = their
    elif isinstance(their, klass):
        suffix = their._value
    else:
        raise InvalidValueType((klass, tuple))
    return klass(self._value + suffix)
4138 def _value_sanitize(self, value):
4139 if issubclass(value.__class__, ObjectIdentifier):
4141 if isinstance(value, string_types):
4143 value = tuple(pureint(arc) for arc in value.split("."))
4145 raise InvalidOID("unacceptable arcs values")
4146 if value.__class__ == tuple:
4148 raise InvalidOID("less than 2 arcs")
4149 first_arc = value[0]
4150 if first_arc in (0, 1):
4151 if not (0 <= value[1] <= 39):
4152 raise InvalidOID("second arc is too wide")
4153 elif first_arc == 2:
4156 raise InvalidOID("unacceptable first arc value")
4157 if not all(arc >= 0 for arc in value):
4158 raise InvalidOID("negative arc value")
4160 raise InvalidValueType((self.__class__, str, tuple))
4164 return self._value is not None
4166 def __getstate__(self):
4167 return ObjectIdentifierState(
def __setstate__(self, state):
    """Restore this object from a previously captured state

    The parent class restores its own part first, after that the
    fields specific to this type are copied from ``state``.

    :param state: object exposing ``value`` and ``defines``
                  attributes
    """
    super(ObjectIdentifier, self).__setstate__(state)
    # (attribute on self, attribute on state), in assignment order
    field_map = (
        ("_value", "value"),
        ("defines", "defines"),
    )
    for own_name, state_name in field_map:
        setattr(self, own_name, getattr(state, state_name))
4190 self._assert_ready()
4191 return iter(self._value)
4194 return ".".join(str(arc) for arc in self._value or ())
4197 self._assert_ready()
4200 bytes(self._expl or b"") +
4201 str(self._value).encode("ascii"),
4204 def __eq__(self, their):
4205 if their.__class__ == tuple:
4206 return self._value == their
4207 if not issubclass(their.__class__, ObjectIdentifier):
4210 self.tag == their.tag and
4211 self._expl == their._expl and
4212 self._value == their._value
def __lt__(self, their):
    """Compare stored values of two objects

    :param their: object exposing ``_value`` attribute
    :returns: result of ``self._value < their._value``
    """
    own_arcs = self._value
    their_arcs = their._value
    return own_arcs < their_arcs
4227 return self.__class__(
4229 defines=self.defines if defines is None else defines,
4230 impl=self.tag if impl is None else impl,
4231 expl=self._expl if expl is None else expl,
4232 default=self.default if default is None else default,
4233 optional=self.optional if optional is None else optional,
4237 self._assert_ready()
4239 first_value = value[1]
4240 first_arc = value[0]
4243 elif first_arc == 1:
4245 elif first_arc == 2:
4247 else: # pragma: no cover
4248 raise RuntimeError("invalid arc is stored")
4249 octets = [zero_ended_encode(first_value)]
4250 for arc in value[2:]:
4251 octets.append(zero_ended_encode(arc))
4252 v = b"".join(octets)
4253 return b"".join((self.tag, len_encode(len(v)), v))
4255 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4257 t, _, lv = tag_strip(tlv)
4258 except DecodeError as err:
4259 raise err.__class__(
4261 klass=self.__class__,
4262 decode_path=decode_path,
4267 klass=self.__class__,
4268 decode_path=decode_path,
4271 if tag_only: # pragma: no cover
4275 l, llen, v = len_decode(lv)
4276 except DecodeError as err:
4277 raise err.__class__(
4279 klass=self.__class__,
4280 decode_path=decode_path,
4284 raise NotEnoughData(
4285 "encoded length is longer than data",
4286 klass=self.__class__,
4287 decode_path=decode_path,
4291 raise NotEnoughData(
4293 klass=self.__class__,
4294 decode_path=decode_path,
4297 v, tail = v[:l], v[l:]
4304 octet = indexbytes(v, i)
4305 if i == 0 and octet == 0x80:
4306 if ctx.get("bered", False):
4309 raise DecodeError("non normalized arc encoding")
4310 arc = (arc << 7) | (octet & 0x7F)
4311 if octet & 0x80 == 0:
4319 klass=self.__class__,
4320 decode_path=decode_path,
4324 second_arc = arcs[0]
4325 if 0 <= second_arc <= 39:
4327 elif 40 <= second_arc <= 79:
4333 obj = self.__class__(
4334 value=tuple([first_arc, second_arc] + arcs[1:]),
4337 default=self.default,
4338 optional=self.optional,
4339 _decoded=(offset, llen, l),
4342 obj.ber_encoded = True
4343 yield decode_path, obj, tail
4346 return pp_console_row(next(self.pps()))
4348 def pps(self, decode_path=()):
4351 asn1_type_name=self.asn1_type_name,
4352 obj_name=self.__class__.__name__,
4353 decode_path=decode_path,
4354 value=str(self) if self.ready else None,
4355 optional=self.optional,
4356 default=self == self.default,
4357 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4358 expl=None if self._expl is None else tag_decode(self._expl),
4363 expl_offset=self.expl_offset if self.expled else None,
4364 expl_tlen=self.expl_tlen if self.expled else None,
4365 expl_llen=self.expl_llen if self.expled else None,
4366 expl_vlen=self.expl_vlen if self.expled else None,
4367 expl_lenindef=self.expl_lenindef,
4368 ber_encoded=self.ber_encoded,
4371 for pp in self.pps_lenindef(decode_path):
4375 class Enumerated(Integer):
4376 """``ENUMERATED`` integer type
4378 This type is identical to :py:class:`pyderasn.Integer`, but requires
4379 schema to be specified and does not accept values missing from it.
4382 tag_default = tag_encode(10)
4383 asn1_type_name = "ENUMERATED"
4394 bounds=None, # dummy argument, workability for Integer.decode
4396 super(Enumerated, self).__init__(
4397 value, bounds, impl, expl, default, optional, _specs, _decoded,
4399 if len(self.specs) == 0:
4400 raise ValueError("schema must be specified")
4402 def _value_sanitize(self, value):
4403 if isinstance(value, self.__class__):
4404 value = value._value
4405 elif isinstance(value, integer_types):
4406 for _value in itervalues(self.specs):
4411 "unknown integer value: %s" % value,
4412 klass=self.__class__,
4414 elif isinstance(value, string_types):
4415 value = self.specs.get(value)
4417 raise ObjUnknown("integer value: %s" % value)
4419 raise InvalidValueType((self.__class__, int, str))
4431 return self.__class__(
4433 impl=self.tag if impl is None else impl,
4434 expl=self._expl if expl is None else expl,
4435 default=self.default if default is None else default,
4436 optional=self.optional if optional is None else optional,
4441 def escape_control_unicode(c):
4442 if unicat(c)[0] == "C":
4443 c = repr(c).lstrip("u").strip("'")
4447 class CommonString(OctetString):
4448 """Common class for all strings
4450 Everything resembles :py:class:`pyderasn.OctetString`, except
4451 ability to deal with unicode text strings.
4453 >>> hexenc("привет мир".encode("utf-8"))
4454 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4455 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4457 >>> s = UTF8String("привет мир")
4458 UTF8String UTF8String привет мир
4460 'привет мир'
4461 >>> hexenc(bytes(s))
4462 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4464 >>> PrintableString("привет мир")
4465 Traceback (most recent call last):
4466 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4468 >>> BMPString("ада", bounds=(2, 2))
4469 Traceback (most recent call last):
4470 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4471 >>> s = BMPString("ад", bounds=(2, 2))
4474 >>> hexenc(bytes(s))
4482 * - :py:class:`pyderasn.UTF8String`
4484 * - :py:class:`pyderasn.NumericString`
4486 * - :py:class:`pyderasn.PrintableString`
4488 * - :py:class:`pyderasn.TeletexString`
4490 * - :py:class:`pyderasn.T61String`
4492 * - :py:class:`pyderasn.VideotexString`
4494 * - :py:class:`pyderasn.IA5String`
4496 * - :py:class:`pyderasn.GraphicString`
4498 * - :py:class:`pyderasn.VisibleString`
4500 * - :py:class:`pyderasn.ISO646String`
4502 * - :py:class:`pyderasn.GeneralString`
4504 * - :py:class:`pyderasn.UniversalString`
4506 * - :py:class:`pyderasn.BMPString`
4511 def _value_sanitize(self, value):
4513 value_decoded = None
4514 if isinstance(value, self.__class__):
4515 value_raw = value._value
4516 elif value.__class__ == text_type:
4517 value_decoded = value
4518 elif value.__class__ == binary_type:
4521 raise InvalidValueType((self.__class__, text_type, binary_type))
4524 value_decoded.encode(self.encoding)
4525 if value_raw is None else value_raw
4528 value_raw.decode(self.encoding)
4529 if value_decoded is None else value_decoded
4531 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4532 raise DecodeError(str(err))
4533 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4541 def __eq__(self, their):
4542 if their.__class__ == binary_type:
4543 return self._value == their
4544 if their.__class__ == text_type:
4545 return self._value == their.encode(self.encoding)
4546 if not isinstance(their, self.__class__):
4549 self._value == their._value and
4550 self.tag == their.tag and
4551 self._expl == their._expl
4554 def __unicode__(self):
4556 return self._value.decode(self.encoding)
4557 return text_type(self._value)
4560 return pp_console_row(next(self.pps(no_unicode=PY2)))
4562 def pps(self, decode_path=(), no_unicode=False):
4566 hexenc(bytes(self)) if no_unicode else
4567 "".join(escape_control_unicode(c) for c in self.__unicode__())
4571 asn1_type_name=self.asn1_type_name,
4572 obj_name=self.__class__.__name__,
4573 decode_path=decode_path,
4575 optional=self.optional,
4576 default=self == self.default,
4577 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4578 expl=None if self._expl is None else tag_decode(self._expl),
4583 expl_offset=self.expl_offset if self.expled else None,
4584 expl_tlen=self.expl_tlen if self.expled else None,
4585 expl_llen=self.expl_llen if self.expled else None,
4586 expl_vlen=self.expl_vlen if self.expled else None,
4587 expl_lenindef=self.expl_lenindef,
4588 ber_encoded=self.ber_encoded,
4591 for pp in self.pps_lenindef(decode_path):
4595 class UTF8String(CommonString):
4597 tag_default = tag_encode(12)
4599 asn1_type_name = "UTF8String"
4602 class AllowableCharsMixin(object):
4604 def allowable_chars(self):
4606 return self._allowable_chars
4607 return frozenset(six_unichr(c) for c in self._allowable_chars)
4610 class NumericString(AllowableCharsMixin, CommonString):
4613 Its value is properly sanitized: only ASCII digits with spaces can
4616 >>> NumericString().allowable_chars
4617 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4620 tag_default = tag_encode(18)
4622 asn1_type_name = "NumericString"
4623 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4625 def _value_sanitize(self, value):
4626 value = super(NumericString, self)._value_sanitize(value)
4627 if not frozenset(value) <= self._allowable_chars:
4628 raise DecodeError("non-numeric value")
4632 PrintableStringState = namedtuple(
4633 "PrintableStringState",
4634 OctetStringState._fields + ("allowable_chars",),
4639 class PrintableString(AllowableCharsMixin, CommonString):
4642 Its value is properly sanitized: see X.680 41.4 table 10.
4644 >>> PrintableString().allowable_chars
4645 frozenset([' ', "'", ..., 'z'])
4646 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4647 PrintableString PrintableString foo*bar
4648 >>> obj.allow_asterisk, obj.allow_ampersand
4652 tag_default = tag_encode(19)
4654 asn1_type_name = "PrintableString"
4655 _allowable_chars = frozenset(
4656 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4658 _asterisk = frozenset("*".encode("ascii"))
4659 _ampersand = frozenset("&".encode("ascii"))
4671 allow_asterisk=False,
4672 allow_ampersand=False,
4675 :param allow_asterisk: allow asterisk character
4676 :param allow_ampersand: allow ampersand character
4679 self._allowable_chars |= self._asterisk
4681 self._allowable_chars |= self._ampersand
4682 super(PrintableString, self).__init__(
4683 value, bounds, impl, expl, default, optional, _decoded, ctx,
4687 def allow_asterisk(self):
4688 """Is asterisk character allowed?
4690 return self._asterisk <= self._allowable_chars
4693 def allow_ampersand(self):
4694 """Is ampersand character allowed?
4696 return self._ampersand <= self._allowable_chars
4698 def _value_sanitize(self, value):
4699 value = super(PrintableString, self)._value_sanitize(value)
4700 if not frozenset(value) <= self._allowable_chars:
4701 raise DecodeError("non-printable value")
4704 def __getstate__(self):
4705 return PrintableStringState(
4706 *super(PrintableString, self).__getstate__(),
4707 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore object's state, including its set of allowed characters

    :param state: state object; everything common is restored by the
                  parent class, ``allowable_chars`` is taken here
    """
    parent = super(PrintableString, self)
    parent.__setstate__(state)
    self._allowable_chars = state.allowable_chars
4723 return self.__class__(
4726 (self._bound_min, self._bound_max)
4727 if bounds is None else bounds
4729 impl=self.tag if impl is None else impl,
4730 expl=self._expl if expl is None else expl,
4731 default=self.default if default is None else default,
4732 optional=self.optional if optional is None else optional,
4733 allow_asterisk=self.allow_asterisk,
4734 allow_ampersand=self.allow_ampersand,
4738 class TeletexString(CommonString):
4740 tag_default = tag_encode(20)
4742 asn1_type_name = "TeletexString"
4745 class T61String(TeletexString):
4747 asn1_type_name = "T61String"
4750 class VideotexString(CommonString):
4752 tag_default = tag_encode(21)
4753 encoding = "iso-8859-1"
4754 asn1_type_name = "VideotexString"
4757 class IA5String(CommonString):
4759 tag_default = tag_encode(22)
4761 asn1_type_name = "IA5"
# Lengths of the time layouts spelled out in each constant's own name;
# the _strptime() methods compare len(value) against them
LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4769 class VisibleString(CommonString):
4771 tag_default = tag_encode(26)
4773 asn1_type_name = "VisibleString"
4776 UTCTimeState = namedtuple(
4778 OctetStringState._fields + ("ber_raw",),
4783 def str_to_time_fractions(value):
4785 year, v = (v // 10**10), (v % 10**10)
4786 month, v = (v // 10**8), (v % 10**8)
4787 day, v = (v // 10**6), (v % 10**6)
4788 hour, v = (v // 10**4), (v % 10**4)
4789 minute, second = (v // 100), (v % 100)
4790 return year, month, day, hour, minute, second
4793 class UTCTime(VisibleString):
4794 """``UTCTime`` datetime type
4796 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
4797 UTCTime UTCTime 2017-09-30T22:07:50
4803 datetime.datetime(2017, 9, 30, 22, 7, 50)
4804 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
4805 datetime.datetime(1957, 9, 30, 22, 7, 50)
4807 If BER encoded value was met, then ``ber_raw`` attribute will hold
4808 its raw representation.
4812 Pay attention that UTCTime can not hold full year, so all years
4813 having < 50 years are treated as 20xx, 19xx otherwise, according
4814 to X.509 recommendation.
4818 No strict validation of UTC offsets is made, only very crude checks:
4820 * minutes are not exceeding 60
4821 * offset value is not exceeding 14 hours
4823 __slots__ = ("ber_raw",)
4824 tag_default = tag_encode(23)
4826 asn1_type_name = "UTCTime"
4827 evgen_mode_skip_value = False
4837 bounds=None, # dummy argument, workability for OctetString.decode
4841 :param value: set the value. Either datetime type, or
4842 :py:class:`pyderasn.UTCTime` object
4843 :param bytes impl: override default tag with ``IMPLICIT`` one
4844 :param bytes expl: override default tag with ``EXPLICIT`` one
4845 :param default: set default value. Type same as in ``value``
4846 :param bool optional: is object ``OPTIONAL`` in sequence
4848 super(UTCTime, self).__init__(
4849 None, None, impl, expl, None, optional, _decoded, ctx,
4853 if value is not None:
4854 self._value, self.ber_raw = self._value_sanitize(value, ctx)
4855 self.ber_encoded = self.ber_raw is not None
4856 if default is not None:
4857 default, _ = self._value_sanitize(default)
4858 self.default = self.__class__(
4863 if self._value is None:
4864 self._value = default
4866 self.optional = optional
4868 def _strptime_bered(self, value):
4869 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
4872 raise ValueError("no timezone")
4873 year += 2000 if year < 50 else 1900
4874 decoded = datetime(year, month, day, hour, minute)
4876 if value[-1] == "Z":
4880 raise ValueError("invalid UTC offset")
4881 if value[-5] == "-":
4883 elif value[-5] == "+":
4886 raise ValueError("invalid UTC offset")
4887 v = pureint(value[-4:])
4888 offset, v = (60 * (v % 100)), v // 100
4890 raise ValueError("invalid UTC offset minutes")
4892 if offset > 14 * 3600:
4893 raise ValueError("too big UTC offset")
4897 return offset, decoded
4899 raise ValueError("invalid UTC offset seconds")
4900 seconds = pureint(value)
4902 raise ValueError("invalid seconds value")
4903 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse strict ``YYMMDDHHMMSSZ`` text into a datetime

    Two-digit years below 50 are treated as 20xx, others as 19xx.

    :param value: text representation of the time
    :returns: :py:class:`datetime.datetime`
    :raises ValueError: if length is wrong or trailing "Z" is missing
    """
    # Same layout as datetime.strptime's "%y%m%d%H%M%SZ" format
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    time_part, tz_mark = value[:-1], value[-1]
    if tz_mark != "Z":
        raise ValueError("non UTC timezone")
    fractions = str_to_time_fractions(time_part)
    short_year = fractions[0]
    century = 2000 if short_year < 50 else 1900
    return datetime(short_year + century, *fractions[1:])
4915 def _dt_sanitize(self, value):
4916 if value.year < 1950 or value.year > 2049:
4917 raise ValueError("UTCTime can hold only 1950-2049 years")
4918 return value.replace(microsecond=0)
4920 def _value_sanitize(self, value, ctx=None):
4921 if value.__class__ == binary_type:
4923 value_decoded = value.decode("ascii")
4924 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4925 raise DecodeError("invalid UTCTime encoding: %r" % err)
4928 return self._strptime(value_decoded), None
4929 except (TypeError, ValueError) as _err:
4931 if (ctx is not None) and ctx.get("bered", False):
4933 offset, _value = self._strptime_bered(value_decoded)
4934 _value = _value - timedelta(seconds=offset)
4935 return self._dt_sanitize(_value), value
4936 except (TypeError, ValueError, OverflowError) as _err:
4939 "invalid %s format: %r" % (self.asn1_type_name, err),
4940 klass=self.__class__,
4942 if isinstance(value, self.__class__):
4943 return value._value, None
4944 if value.__class__ == datetime:
4945 return self._dt_sanitize(value), None
4946 raise InvalidValueType((self.__class__, datetime))
4948 def _pp_value(self):
4950 value = self._value.isoformat()
4951 if self.ber_encoded:
4952 value += " (%s)" % self.ber_raw
4955 def __unicode__(self):
4957 value = self._value.isoformat()
4958 if self.ber_encoded:
4959 value += " (%s)" % self.ber_raw
4961 return text_type(self._pp_value())
4963 def __getstate__(self):
4964 return UTCTimeState(
4965 *super(UTCTime, self).__getstate__(),
4966 **{"ber_raw": self.ber_raw}
def __setstate__(self, state):
    """Restore object's state, including the raw BER representation

    :param state: state object; everything common is restored by the
                  parent class, ``ber_raw`` is taken here
    """
    parent = super(UTCTime, self)
    parent.__setstate__(state)
    self.ber_raw = state.ber_raw
4973 def __bytes__(self):
4974 self._assert_ready()
4975 return self._encode_time()
4977 def __eq__(self, their):
4978 if their.__class__ == binary_type:
4979 return self._encode_time() == their
4980 if their.__class__ == datetime:
4981 return self.todatetime() == their
4982 if not isinstance(their, self.__class__):
4985 self._value == their._value and
4986 self.tag == their.tag and
4987 self._expl == their._expl
4990 def _encode_time(self):
4991 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
4994 self._assert_ready()
4995 value = self._encode_time()
4996 return b"".join((self.tag, len_encode(len(value)), value))
def _encode_cer(self, writer):
    """Write the whole encoded object to ``writer`` at once

    :param writer: destination, passed as is to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
5001 def todatetime(self):
5005 return pp_console_row(next(self.pps()))
5007 def pps(self, decode_path=()):
5010 asn1_type_name=self.asn1_type_name,
5011 obj_name=self.__class__.__name__,
5012 decode_path=decode_path,
5013 value=self._pp_value(),
5014 optional=self.optional,
5015 default=self == self.default,
5016 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5017 expl=None if self._expl is None else tag_decode(self._expl),
5022 expl_offset=self.expl_offset if self.expled else None,
5023 expl_tlen=self.expl_tlen if self.expled else None,
5024 expl_llen=self.expl_llen if self.expled else None,
5025 expl_vlen=self.expl_vlen if self.expled else None,
5026 expl_lenindef=self.expl_lenindef,
5027 ber_encoded=self.ber_encoded,
5030 for pp in self.pps_lenindef(decode_path):
5034 class GeneralizedTime(UTCTime):
5035 """``GeneralizedTime`` datetime type
5037 This type is similar to :py:class:`pyderasn.UTCTime`.
5039 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5040 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5042 '20170930220750.000123Z'
5043 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5044 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5048 Only microsecond fractions are supported in DER encoding.
5049 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5050 higher precision values.
5054 BER encoded data can lose information (accuracy) during decoding
5055 because of float transformations.
5059 Local times (without explicit timezone specification) are treated
5060 as UTC one, no transformations are made.
5064 Zero year is unsupported.
5067 tag_default = tag_encode(24)
5068 asn1_type_name = "GeneralizedTime"
5070 def _dt_sanitize(self, value):
5073 def _strptime_bered(self, value):
5074 if len(value) < 4 + 3 * 2:
5075 raise ValueError("invalid GeneralizedTime")
5076 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5077 decoded = datetime(year, month, day, hour)
5078 offset, value = 0, value[10:]
5080 return offset, decoded
5081 if value[-1] == "Z":
5084 for char, sign in (("-", -1), ("+", 1)):
5085 idx = value.rfind(char)
5088 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5089 v = pureint(offset_raw)
5090 if len(offset_raw) == 4:
5091 offset, v = (60 * (v % 100)), v // 100
5093 raise ValueError("invalid UTC offset minutes")
5094 elif len(offset_raw) == 2:
5097 raise ValueError("invalid UTC offset")
5099 if offset > 14 * 3600:
5100 raise ValueError("too big UTC offset")
5104 return offset, decoded
5105 if value[0] in DECIMAL_SIGNS:
5107 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5110 raise ValueError("stripped minutes")
5111 decoded += timedelta(seconds=60 * pureint(value[:2]))
5114 return offset, decoded
5115 if value[0] in DECIMAL_SIGNS:
5117 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5120 raise ValueError("stripped seconds")
5121 decoded += timedelta(seconds=pureint(value[:2]))
5124 return offset, decoded
5125 if value[0] not in DECIMAL_SIGNS:
5126 raise ValueError("invalid format after seconds")
5128 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5131 def _strptime(self, value):
5133 if l == LEN_YYYYMMDDHHMMSSZ:
5134 # datetime.strptime's format: %Y%m%d%H%M%SZ
5135 if value[-1] != "Z":
5136 raise ValueError("non UTC timezone")
5137 return datetime(*str_to_time_fractions(value[:-1]))
5138 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5139 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5140 if value[-1] != "Z":
5141 raise ValueError("non UTC timezone")
5142 if value[14] != ".":
5143 raise ValueError("no fractions separator")
5146 raise ValueError("trailing zero")
5149 raise ValueError("only microsecond fractions are supported")
5150 us = pureint(us + ("0" * (6 - us_len)))
5151 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5152 return datetime(year, month, day, hour, minute, second, us)
5153 raise ValueError("invalid GeneralizedTime length")
5155 def _encode_time(self):
5157 encoded = value.strftime("%Y%m%d%H%M%S")
5158 if value.microsecond > 0:
5159 encoded += (".%06d" % value.microsecond).rstrip("0")
5160 return (encoded + "Z").encode("ascii")
5163 class GraphicString(CommonString):
5165 tag_default = tag_encode(25)
5166 encoding = "iso-8859-1"
5167 asn1_type_name = "GraphicString"
5170 class ISO646String(VisibleString):
5172 asn1_type_name = "ISO646String"
5175 class GeneralString(CommonString):
5177 tag_default = tag_encode(27)
5178 encoding = "iso-8859-1"
5179 asn1_type_name = "GeneralString"
5182 class UniversalString(CommonString):
5184 tag_default = tag_encode(28)
5185 encoding = "utf-32-be"
5186 asn1_type_name = "UniversalString"
5189 class BMPString(CommonString):
5191 tag_default = tag_encode(30)
5192 encoding = "utf-16-be"
5193 asn1_type_name = "BMPString"
5196 ChoiceState = namedtuple(
5198 BasicState._fields + ("specs", "value",),
5204 """``CHOICE`` special type
5208 class GeneralName(Choice):
5210 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5211 ("dNSName", IA5String(impl=tag_ctxp(2))),
5214 >>> gn = GeneralName()
5216 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5217 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5218 >>> gn["dNSName"] = IA5String("bar.baz")
5219 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5220 >>> gn["rfc822Name"]
5223 [2] IA5String IA5 bar.baz
5226 >>> gn.value == gn["dNSName"]
5229 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5231 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5232 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5234 __slots__ = ("specs",)
5236 asn1_type_name = "CHOICE"
5249 :param value: set the value. Either ``(choice, value)`` tuple, or
5250 :py:class:`pyderasn.Choice` object
5251 :param bytes impl: can not be set, do **not** use it
5252 :param bytes expl: override default tag with ``EXPLICIT`` one
5253 :param default: set default value. Type same as in ``value``
5254 :param bool optional: is object ``OPTIONAL`` in sequence
5256 if impl is not None:
5257 raise ValueError("no implicit tag allowed for CHOICE")
5258 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5260 schema = getattr(self, "schema", ())
5261 if len(schema) == 0:
5262 raise ValueError("schema must be specified")
5264 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5267 if value is not None:
5268 self._value = self._value_sanitize(value)
5269 if default is not None:
5270 default_value = self._value_sanitize(default)
5271 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5272 default_obj.specs = self.specs
5273 default_obj._value = default_value
5274 self.default = default_obj
5276 self._value = copy(default_obj._value)
5277 if self._expl is not None:
5278 tag_class, _, tag_num = tag_decode(self._expl)
5279 self._tag_order = (tag_class, tag_num)
5281 def _value_sanitize(self, value):
5282 if (value.__class__ == tuple) and len(value) == 2:
5284 spec = self.specs.get(choice)
5286 raise ObjUnknown(choice)
5287 if not isinstance(obj, spec.__class__):
5288 raise InvalidValueType((spec,))
5289 return (choice, spec(obj))
5290 if isinstance(value, self.__class__):
5292 raise InvalidValueType((self.__class__, tuple))
5296 return self._value is not None and self._value[1].ready
5300 return self.expl_lenindef or (
5301 (self._value is not None) and
5302 self._value[1].bered
5305 def __getstate__(self):
def __setstate__(self, state):
    """Restore object's state, including the schema and chosen value

    :param state: state object; everything common is restored by the
                  parent class, ``specs`` and ``value`` are taken here
    """
    parent = super(Choice, self)
    parent.__setstate__(state)
    self.specs, self._value = state.specs, state.value
5328 def __eq__(self, their):
5329 if (their.__class__ == tuple) and len(their) == 2:
5330 return self._value == their
5331 if not isinstance(their, self.__class__):
5334 self.specs == their.specs and
5335 self._value == their._value
5345 return self.__class__(
5348 expl=self._expl if expl is None else expl,
5349 default=self.default if default is None else default,
5350 optional=self.optional if optional is None else optional,
5355 """Name of the choice
5357 self._assert_ready()
5358 return self._value[0]
5362 """Value of underlying choice
5364 self._assert_ready()
5365 return self._value[1]
def tag_order(self):
    """Tag order of the object

    Our own ``_tag_order`` is used if it is set, otherwise the one
    of the chosen value. Object's readiness is asserted first.
    """
    self._assert_ready()
    own_order = self._tag_order
    if own_order is not None:
        return own_order
    return self._value[1].tag_order
def tag_order_cer(self):
    """Smallest ``tag_order_cer`` among all specs of the schema"""
    orders = [spec.tag_order_cer for spec in itervalues(self.specs)]
    return min(orders)
5376 def __getitem__(self, key):
5377 if key not in self.specs:
5378 raise ObjUnknown(key)
5379 if self._value is None:
5381 choice, value = self._value
5386 def __setitem__(self, key, value):
5387 spec = self.specs.get(key)
5389 raise ObjUnknown(key)
5390 if not isinstance(value, spec.__class__):
5391 raise InvalidValueType((spec.__class__,))
5392 self._value = (key, spec(value))
5400 return self._value[1].decoded if self.ready else False
5403 self._assert_ready()
5404 return self._value[1].encode()
5406 def _encode_cer(self, writer):
5407 self._assert_ready()
5408 self._value[1].encode_cer(writer)
5410 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5411 for choice, spec in iteritems(self.specs):
5412 sub_decode_path = decode_path + (choice,)
5418 decode_path=sub_decode_path,
5421 _ctx_immutable=False,
5428 klass=self.__class__,
5429 decode_path=decode_path,
5432 if tag_only: # pragma: no cover
5436 for _decode_path, value, tail in spec.decode_evgen(
5440 decode_path=sub_decode_path,
5442 _ctx_immutable=False,
5444 yield _decode_path, value, tail
5446 _, value, tail = next(spec.decode_evgen(
5450 decode_path=sub_decode_path,
5452 _ctx_immutable=False,
5455 obj = self.__class__(
5458 default=self.default,
5459 optional=self.optional,
5460 _decoded=(offset, 0, value.fulllen),
5462 obj._value = (choice, value)
5463 yield decode_path, obj, tail
5466 value = pp_console_row(next(self.pps()))
5468 value = "%s[%r]" % (value, self.value)
5471 def pps(self, decode_path=()):
5474 asn1_type_name=self.asn1_type_name,
5475 obj_name=self.__class__.__name__,
5476 decode_path=decode_path,
5477 value=self.choice if self.ready else None,
5478 optional=self.optional,
5479 default=self == self.default,
5480 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5481 expl=None if self._expl is None else tag_decode(self._expl),
5486 expl_lenindef=self.expl_lenindef,
5490 yield self.value.pps(decode_path=decode_path + (self.choice,))
5491 for pp in self.pps_lenindef(decode_path):
5495 class PrimitiveTypes(Choice):
5496 """Predefined ``CHOICE`` for all generic primitive types
5498 It could be useful for general decoding of some unspecified values:
5500 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5501 OCTET STRING 3 bytes 666f6f
5502 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5506 schema = tuple((klass.__name__, klass()) for klass in (
5530 AnyState = namedtuple(
5532 BasicState._fields + ("value", "defined"),
5538 """``ANY`` special type
5540 >>> Any(Integer(-123))
5541 ANY INTEGER -123 (0X:7B)
5542 >>> a = Any(OctetString(b"hello world").encode())
5543 ANY 040b68656c6c6f20776f726c64
5544 >>> hexenc(bytes(a))
5545 '040b68656c6c6f20776f726c64'
5547 __slots__ = ("defined",)
5548 tag_default = tag_encode(0)
5549 asn1_type_name = "ANY"
5559 :param value: set the value. Either any kind of pyderasn's
5560 **ready** object, or bytes. Pay attention that
5561 **no** validation is performed if raw binary value
5562 is valid TLV, except just tag decoding
5563 :param bytes expl: override default tag with ``EXPLICIT`` one
5564 :param bool optional: is object ``OPTIONAL`` in sequence
5566 super(Any, self).__init__(None, expl, None, optional, _decoded)
5570 value = self._value_sanitize(value)
5572 if self._expl is None:
5573 if value.__class__ == binary_type:
5574 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5576 tag_class, tag_num = value.tag_order
5578 tag_class, _, tag_num = tag_decode(self._expl)
5579 self._tag_order = (tag_class, tag_num)
5582 def _value_sanitize(self, value):
5583 if value.__class__ == binary_type:
5585 raise ValueError("Any value can not be empty")
5587 if isinstance(value, self.__class__):
5589 if not isinstance(value, Obj):
5590 raise InvalidValueType((self.__class__, Obj, binary_type))
5595 return self._value is not None
def tag_order(self):
    """Tag order stored in the object

    Object's readiness is asserted first.
    """
    self._assert_ready()
    order = self._tag_order
    return order
5604 if self.expl_lenindef or self.lenindef:
5606 if self.defined is None:
5608 return self.defined[1].bered
5610 def __getstate__(self):
def __setstate__(self, state):
    """Restore object's state, including the value and defined object

    :param state: state object; everything common is restored by the
                  parent class, ``value`` and ``defined`` are taken here
    """
    parent = super(Any, self)
    parent.__setstate__(state)
    self._value, self.defined = state.value, state.defined
5633 def __eq__(self, their):
5634 if their.__class__ == binary_type:
5635 if self._value.__class__ == binary_type:
5636 return self._value == their
5637 return self._value.encode() == their
5638 if issubclass(their.__class__, Any):
5639 if self.ready and their.ready:
5640 return bytes(self) == bytes(their)
5641 return self.ready == their.ready
5650 return self.__class__(
5652 expl=self._expl if expl is None else expl,
5653 optional=self.optional if optional is None else optional,
5656 def __bytes__(self):
5657 self._assert_ready()
5659 if value.__class__ == binary_type:
5661 return self._value.encode()
5668 self._assert_ready()
5670 if value.__class__ == binary_type:
5672 return value.encode()
5674 def _encode_cer(self, writer):
5675 self._assert_ready()
5677 if value.__class__ == binary_type:
5678 write_full(writer, value)
5680 value.encode_cer(writer)
5682 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5684 t, tlen, lv = tag_strip(tlv)
5685 except DecodeError as err:
5686 raise err.__class__(
5688 klass=self.__class__,
5689 decode_path=decode_path,
5693 l, llen, v = len_decode(lv)
5694 except LenIndefForm as err:
5695 if not ctx.get("bered", False):
5696 raise err.__class__(
5698 klass=self.__class__,
5699 decode_path=decode_path,
5702 llen, vlen, v = 1, 0, lv[1:]
5703 sub_offset = offset + tlen + llen
5705 while v[:EOC_LEN].tobytes() != EOC:
5706 chunk, v = Any().decode(
5709 decode_path=decode_path + (str(chunk_i),),
5712 _ctx_immutable=False,
5714 vlen += chunk.tlvlen
5715 sub_offset += chunk.tlvlen
5717 tlvlen = tlen + llen + vlen + EOC_LEN
5718 obj = self.__class__(
5719 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5721 optional=self.optional,
5722 _decoded=(offset, 0, tlvlen),
5725 obj.tag = t.tobytes()
5726 yield decode_path, obj, v[EOC_LEN:]
5728 except DecodeError as err:
5729 raise err.__class__(
5731 klass=self.__class__,
5732 decode_path=decode_path,
5736 raise NotEnoughData(
5737 "encoded length is longer than data",
5738 klass=self.__class__,
5739 decode_path=decode_path,
5742 tlvlen = tlen + llen + l
5743 v, tail = tlv[:tlvlen], v[l:]
5744 obj = self.__class__(
5745 value=None if evgen_mode else v.tobytes(),
5747 optional=self.optional,
5748 _decoded=(offset, 0, tlvlen),
5750 obj.tag = t.tobytes()
5751 yield decode_path, obj, tail
5754 return pp_console_row(next(self.pps()))
5756 def pps(self, decode_path=()):
5760 elif value.__class__ == binary_type:
5766 asn1_type_name=self.asn1_type_name,
5767 obj_name=self.__class__.__name__,
5768 decode_path=decode_path,
5770 blob=self._value if self._value.__class__ == binary_type else None,
5771 optional=self.optional,
5772 default=self == self.default,
5773 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5774 expl=None if self._expl is None else tag_decode(self._expl),
5779 expl_offset=self.expl_offset if self.expled else None,
5780 expl_tlen=self.expl_tlen if self.expled else None,
5781 expl_llen=self.expl_llen if self.expled else None,
5782 expl_vlen=self.expl_vlen if self.expled else None,
5783 expl_lenindef=self.expl_lenindef,
5784 lenindef=self.lenindef,
5787 defined_by, defined = self.defined or (None, None)
5788 if defined_by is not None:
5790 decode_path=decode_path + (DecodePathDefBy(defined_by),)
5792 for pp in self.pps_lenindef(decode_path):
5796 ########################################################################
5797 # ASN.1 constructed types
5798 ########################################################################
def get_def_by_path(defines_by_path, sub_decode_path):
    """Get define by decode path

    :param defines_by_path: iterable of ``(path, define)`` pairs. Each
                            ``path`` is a sequence of decode path
                            elements; the builtin ``any`` function used
                            as an element is a wildcard, matching
                            whatever stands at that position
    :param sub_decode_path: decode path to look for
    :returns: ``define`` of the first pair whose ``path`` has the same
              length as ``sub_decode_path`` and matches it element by
              element. ``None`` if nothing matched
    """
    for path, define in defines_by_path:
        if len(path) != len(sub_decode_path):
            continue
        for p1, p2 in zip(path, sub_decode_path):
            # ``any`` is compared by identity: it is a marker, not a value
            if (p1 is not any) and (p1 != p2):
                break
        else:
            # Loop has not been broken: every element matched
            return define
    return None
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     Every leading ".." element strips the trailing
                     element from ``decode_path``. If the first
                     remaining element is "/", then treat the rest as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Empty ``rel_path`` leaves
                     ``decode_path`` as is
    :returns: tuple with the resulting decode path

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ('foo', 'bar', 'baz', 'whatever')
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ('foo', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ('baz', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ('foo',)
    """
    # Consume leading ".." elements one by one, going a level up each time.
    # The emptiness check matters: a path made solely of ".." elements
    # is exhausted here and must not be indexed afterwards
    while len(rel_path) > 0 and rel_path[0] == "..":
        decode_path, rel_path = decode_path[:-1], rel_path[1:]
    if len(rel_path) > 0 and rel_path[0] == "/":
        return rel_path[1:]
    return decode_path + rel_path
5838 SequenceState = namedtuple(
5840 BasicState._fields + ("specs", "value",),
5845 class Sequence(Obj):
5846 """``SEQUENCE`` structure type
5848 You have to make specification of sequence::
5850 class Extension(Sequence):
5852 ("extnID", ObjectIdentifier()),
5853 ("critical", Boolean(default=False)),
5854 ("extnValue", OctetString()),
5857 Then, you can work with it as with dictionary.
5859 >>> ext = Extension()
5860 >>> Extension().specs
5862 ('extnID', OBJECT IDENTIFIER),
5863 ('critical', BOOLEAN False OPTIONAL DEFAULT),
5864 ('extnValue', OCTET STRING),
5866 >>> ext["extnID"] = "1.2.3"
5867 Traceback (most recent call last):
5868 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
5869 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
5871 You can determine if sequence is ready to be encoded:
5876 Traceback (most recent call last):
5877 pyderasn.ObjNotReady: object is not ready: extnValue
5878 >>> ext["extnValue"] = OctetString(b"foobar")
5882 Value you want to assign, must have the same **type** as in
5883 corresponding specification, but it can have different tags,
5884 optional/default attributes -- they will be taken from specification
5887 class TBSCertificate(Sequence):
5889 ("version", Version(expl=tag_ctxc(0), default="v1")),
5892 >>> tbs = TBSCertificate()
5893 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
5895 Assign ``None`` to remove value from sequence.
5897 You can set values in Sequence during its initialization:
5899 >>> AlgorithmIdentifier((
5900 ("algorithm", ObjectIdentifier("1.2.3")),
5901 ("parameters", Any(Null()))
5903 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
5905 You can determine if value exists/set in the sequence and take its value:
5907 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
5910 OBJECT IDENTIFIER 1.2.3
5912 But pay attention that if value has default, then it won't be (not
5913 in) in the sequence (because ``DEFAULT`` must not be encoded in
5914 DER), but you can read its value:
5916 >>> "critical" in ext, ext["critical"]
5917 (False, BOOLEAN False)
5918 >>> ext["critical"] = Boolean(True)
5919 >>> "critical" in ext, ext["critical"]
5920 (True, BOOLEAN True)
5922 All defaulted values are always optional.
5924 .. _allow_default_values_ctx:
5926 DER prohibits default value encoding and will raise an error if
5927 default value is unexpectedly met during decode.
5928 If :ref:`bered <bered_ctx>` context option is set, then no error
5929 will be raised, but ``bered`` attribute set. You can disable strict
5930 defaulted values existence validation by setting
5931 ``"allow_default_values": True`` :ref:`context <ctx>` option.
5935 Check for default value existence is not performed in
5936 ``evgen_mode``, because previously decoded values are not stored
5937 in memory, to be able to compare them.
5939 Two sequences are equal if they have equal specification (schema),
5940 implicit/explicit tagging and the same values.
5942 __slots__ = ("specs",)
5943 tag_default = tag_encode(form=TagFormConstructed, num=16)
5944 asn1_type_name = "SEQUENCE"
5956 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
5958 schema = getattr(self, "schema", ())
5960 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5963 if value is not None:
5964 if issubclass(value.__class__, Sequence):
5965 self._value = value._value
5966 elif hasattr(value, "__iter__"):
5967 for seq_key, seq_value in value:
5968 self[seq_key] = seq_value
5970 raise InvalidValueType((Sequence,))
5971 if default is not None:
5972 if not issubclass(default.__class__, Sequence):
5973 raise InvalidValueType((Sequence,))
5974 default_value = default._value
5975 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5976 default_obj.specs = self.specs
5977 default_obj._value = default_value
5978 self.default = default_obj
5980 self._value = copy(default_obj._value)
5984 for name, spec in iteritems(self.specs):
5985 value = self._value.get(name)
5996 if self.expl_lenindef or self.lenindef or self.ber_encoded:
5998 return any(value.bered for value in itervalues(self._value))
6000 def __getstate__(self):
6001 return SequenceState(
6015 {k: copy(v) for k, v in iteritems(self._value)},
6018 def __setstate__(self, state):
6019 super(Sequence, self).__setstate__(state)
6020 self.specs = state.specs
6021 self._value = state.value
6023 def __eq__(self, their):
6024 if not isinstance(their, self.__class__):
6027 self.specs == their.specs and
6028 self.tag == their.tag and
6029 self._expl == their._expl and
6030 self._value == their._value
6041 return self.__class__(
6044 impl=self.tag if impl is None else impl,
6045 expl=self._expl if expl is None else expl,
6046 default=self.default if default is None else default,
6047 optional=self.optional if optional is None else optional,
6050 def __contains__(self, key):
6051 return key in self._value
6053 def __setitem__(self, key, value):
6054 spec = self.specs.get(key)
6056 raise ObjUnknown(key)
6058 self._value.pop(key, None)
6060 if not isinstance(value, spec.__class__):
6061 raise InvalidValueType((spec.__class__,))
6062 value = spec(value=value)
6063 if spec.default is not None and value == spec.default:
6064 self._value.pop(key, None)
6066 self._value[key] = value
6068 def __getitem__(self, key):
6069 value = self._value.get(key)
6070 if value is not None:
6072 spec = self.specs.get(key)
6074 raise ObjUnknown(key)
6075 if spec.default is not None:
6079 def _values_for_encoding(self):
6080 for name, spec in iteritems(self.specs):
6081 value = self._value.get(name)
6085 raise ObjNotReady(name)
6089 v = b"".join(v.encode() for v in self._values_for_encoding())
6090 return b"".join((self.tag, len_encode(len(v)), v))
6092 def _encode_cer(self, writer):
6093 write_full(writer, self.tag + LENINDEF)
6094 for v in self._values_for_encoding():
6095 v.encode_cer(writer)
6096 write_full(writer, EOC)
6098 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6100 t, tlen, lv = tag_strip(tlv)
6101 except DecodeError as err:
6102 raise err.__class__(
6104 klass=self.__class__,
6105 decode_path=decode_path,
6110 klass=self.__class__,
6111 decode_path=decode_path,
6114 if tag_only: # pragma: no cover
6118 ctx_bered = ctx.get("bered", False)
6120 l, llen, v = len_decode(lv)
6121 except LenIndefForm as err:
6123 raise err.__class__(
6125 klass=self.__class__,
6126 decode_path=decode_path,
6129 l, llen, v = 0, 1, lv[1:]
6131 except DecodeError as err:
6132 raise err.__class__(
6134 klass=self.__class__,
6135 decode_path=decode_path,
6139 raise NotEnoughData(
6140 "encoded length is longer than data",
6141 klass=self.__class__,
6142 decode_path=decode_path,
6146 v, tail = v[:l], v[l:]
6148 sub_offset = offset + tlen + llen
6151 ctx_allow_default_values = ctx.get("allow_default_values", False)
6152 for name, spec in iteritems(self.specs):
6153 if spec.optional and (
6154 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6158 sub_decode_path = decode_path + (name,)
6161 for _decode_path, value, v_tail in spec.decode_evgen(
6165 decode_path=sub_decode_path,
6167 _ctx_immutable=False,
6169 yield _decode_path, value, v_tail
6171 _, value, v_tail = next(spec.decode_evgen(
6175 decode_path=sub_decode_path,
6177 _ctx_immutable=False,
6180 except TagMismatch as err:
6181 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6185 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6186 if not evgen_mode and defined is not None:
6187 defined_by, defined_spec = defined
6188 if issubclass(value.__class__, SequenceOf):
6189 for i, _value in enumerate(value):
6190 sub_sub_decode_path = sub_decode_path + (
6192 DecodePathDefBy(defined_by),
6194 defined_value, defined_tail = defined_spec.decode(
6195 memoryview(bytes(_value)),
6197 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6198 if value.expled else (value.tlen + value.llen)
6201 decode_path=sub_sub_decode_path,
6203 _ctx_immutable=False,
6205 if len(defined_tail) > 0:
6208 klass=self.__class__,
6209 decode_path=sub_sub_decode_path,
6212 _value.defined = (defined_by, defined_value)
6214 defined_value, defined_tail = defined_spec.decode(
6215 memoryview(bytes(value)),
6217 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6218 if value.expled else (value.tlen + value.llen)
6221 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6223 _ctx_immutable=False,
6225 if len(defined_tail) > 0:
6228 klass=self.__class__,
6229 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6232 value.defined = (defined_by, defined_value)
6234 value_len = value.fulllen
6236 sub_offset += value_len
6239 if spec.default is not None and value == spec.default:
6240 # This will not work in evgen_mode
6241 if ctx_bered or ctx_allow_default_values:
6245 "DEFAULT value met",
6246 klass=self.__class__,
6247 decode_path=sub_decode_path,
6250 values[name] = value
6251 spec_defines = getattr(spec, "defines", ())
6252 if len(spec_defines) == 0:
6253 defines_by_path = ctx.get("defines_by_path", ())
6254 if len(defines_by_path) > 0:
6255 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6256 if spec_defines is not None and len(spec_defines) > 0:
6257 for rel_path, schema in spec_defines:
6258 defined = schema.get(value, None)
6259 if defined is not None:
6260 ctx.setdefault("_defines", []).append((
6261 abs_decode_path(sub_decode_path[:-1], rel_path),
6265 if v[:EOC_LEN].tobytes() != EOC:
6268 klass=self.__class__,
6269 decode_path=decode_path,
6277 klass=self.__class__,
6278 decode_path=decode_path,
6281 obj = self.__class__(
6285 default=self.default,
6286 optional=self.optional,
6287 _decoded=(offset, llen, vlen),
6290 obj.lenindef = lenindef
6291 obj.ber_encoded = ber_encoded
6292 yield decode_path, obj, tail
6295 value = pp_console_row(next(self.pps()))
6297 for name in self.specs:
6298 _value = self._value.get(name)
6301 cols.append("%s: %s" % (name, repr(_value)))
6302 return "%s[%s]" % (value, "; ".join(cols))
6304 def pps(self, decode_path=()):
6307 asn1_type_name=self.asn1_type_name,
6308 obj_name=self.__class__.__name__,
6309 decode_path=decode_path,
6310 optional=self.optional,
6311 default=self == self.default,
6312 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6313 expl=None if self._expl is None else tag_decode(self._expl),
6318 expl_offset=self.expl_offset if self.expled else None,
6319 expl_tlen=self.expl_tlen if self.expled else None,
6320 expl_llen=self.expl_llen if self.expled else None,
6321 expl_vlen=self.expl_vlen if self.expled else None,
6322 expl_lenindef=self.expl_lenindef,
6323 lenindef=self.lenindef,
6324 ber_encoded=self.ber_encoded,
6327 for name in self.specs:
6328 value = self._value.get(name)
6331 yield value.pps(decode_path=decode_path + (name,))
6332 for pp in self.pps_lenindef(decode_path):
6336 class Set(Sequence):
6337 """``SET`` structure type
6339 Its usage is identical to :py:class:`pyderasn.Sequence`.
6341 .. _allow_unordered_set_ctx:
6343 DER prohibits unordered values encoding and will raise an error
6344 during decode. If :ref:`bered <bered_ctx>` context option is set,
6345 then no error will occur. Also you can disable strict values
6346 ordering check by setting ``"allow_unordered_set": True``
6347 :ref:`context <ctx>` option.
6350 tag_default = tag_encode(form=TagFormConstructed, num=17)
6351 asn1_type_name = "SET"
6354 v = b"".join(value.encode() for value in sorted(
6355 self._values_for_encoding(),
6356 key=attrgetter("tag_order"),
6358 return b"".join((self.tag, len_encode(len(v)), v))
6360 def _encode_cer(self, writer):
6361 write_full(writer, self.tag + LENINDEF)
6363 self._values_for_encoding(),
6364 key=attrgetter("tag_order_cer"),
6366 v.encode_cer(writer)
6367 write_full(writer, EOC)
6369 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6371 t, tlen, lv = tag_strip(tlv)
6372 except DecodeError as err:
6373 raise err.__class__(
6375 klass=self.__class__,
6376 decode_path=decode_path,
6381 klass=self.__class__,
6382 decode_path=decode_path,
6389 ctx_bered = ctx.get("bered", False)
6391 l, llen, v = len_decode(lv)
6392 except LenIndefForm as err:
6394 raise err.__class__(
6396 klass=self.__class__,
6397 decode_path=decode_path,
6400 l, llen, v = 0, 1, lv[1:]
6402 except DecodeError as err:
6403 raise err.__class__(
6405 klass=self.__class__,
6406 decode_path=decode_path,
6410 raise NotEnoughData(
6411 "encoded length is longer than data",
6412 klass=self.__class__,
6416 v, tail = v[:l], v[l:]
6418 sub_offset = offset + tlen + llen
6421 ctx_allow_default_values = ctx.get("allow_default_values", False)
6422 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6423 tag_order_prev = (0, 0)
6424 _specs_items = copy(self.specs)
6427 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6429 for name, spec in iteritems(_specs_items):
6430 sub_decode_path = decode_path + (name,)
6436 decode_path=sub_decode_path,
6439 _ctx_immutable=False,
6446 klass=self.__class__,
6447 decode_path=decode_path,
6451 for _decode_path, value, v_tail in spec.decode_evgen(
6455 decode_path=sub_decode_path,
6457 _ctx_immutable=False,
6459 yield _decode_path, value, v_tail
6461 _, value, v_tail = next(spec.decode_evgen(
6465 decode_path=sub_decode_path,
6467 _ctx_immutable=False,
6470 value_tag_order = value.tag_order
6471 value_len = value.fulllen
6472 if tag_order_prev >= value_tag_order:
6473 if ctx_bered or ctx_allow_unordered_set:
6477 "unordered " + self.asn1_type_name,
6478 klass=self.__class__,
6479 decode_path=sub_decode_path,
6482 if spec.default is None or value != spec.default:
6484 elif ctx_bered or ctx_allow_default_values:
6488 "DEFAULT value met",
6489 klass=self.__class__,
6490 decode_path=sub_decode_path,
6493 values[name] = value
6494 del _specs_items[name]
6495 tag_order_prev = value_tag_order
6496 sub_offset += value_len
6500 obj = self.__class__(
6504 default=self.default,
6505 optional=self.optional,
6506 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6509 if v[:EOC_LEN].tobytes() != EOC:
6512 klass=self.__class__,
6513 decode_path=decode_path,
6518 for name, spec in iteritems(self.specs):
6519 if name not in values and not spec.optional:
6521 "%s value is not ready" % name,
6522 klass=self.__class__,
6523 decode_path=decode_path,
6528 obj.ber_encoded = ber_encoded
6529 yield decode_path, obj, tail
6532 SequenceOfState = namedtuple(
6534 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6539 class SequenceOf(Obj):
6540 """``SEQUENCE OF`` sequence type
6542 For that kind of type you must specify the object it will carry on
6543 (bounds are for example here, not required)::
6545 class Ints(SequenceOf):
6550 >>> ints.append(Integer(123))
6551 >>> ints.append(Integer(234))
6553 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6554 >>> [int(i) for i in ints]
6556 >>> ints.append(Integer(345))
6557 Traceback (most recent call last):
6558 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6561 >>> ints[1] = Integer(345)
6563 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6565 You can initialize sequence with preinitialized values:
6567 >>> ints = Ints([Integer(123), Integer(234)])
6569 Also you can use iterator as a value:
6571 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6573 And it won't be iterated until encoding process. Pay attention that
6574 bounds and required schema checks are done only during the encoding
6575 process in that case! After encode was called, then value is zeroed
6576 back to empty list and you have to set it again. That mode is useful
6577 mainly with CER encoding mode, where all objects from the iterable
6578 will be streamed to the buffer, without copying all of them to
6581 __slots__ = ("spec", "_bound_min", "_bound_max")
6582 tag_default = tag_encode(form=TagFormConstructed, num=16)
6583 asn1_type_name = "SEQUENCE OF"
6596 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6598 schema = getattr(self, "schema", None)
6600 raise ValueError("schema must be specified")
6602 self._bound_min, self._bound_max = getattr(
6606 ) if bounds is None else bounds
6608 if value is not None:
6609 self._value = self._value_sanitize(value)
6610 if default is not None:
6611 default_value = self._value_sanitize(default)
6612 default_obj = self.__class__(
6617 default_obj._value = default_value
6618 self.default = default_obj
6620 self._value = copy(default_obj._value)
6622 def _value_sanitize(self, value):
6624 if issubclass(value.__class__, SequenceOf):
6625 value = value._value
6626 elif hasattr(value, NEXT_ATTR_NAME):
6629 elif hasattr(value, "__iter__"):
6632 raise InvalidValueType((self.__class__, iter, "iterator"))
6634 if not self._bound_min <= len(value) <= self._bound_max:
6635 raise BoundsError(self._bound_min, len(value), self._bound_max)
6636 class_expected = self.spec.__class__
6638 if not isinstance(v, class_expected):
6639 raise InvalidValueType((class_expected,))
6644 if hasattr(self._value, NEXT_ATTR_NAME):
6646 if self._bound_min > 0 and len(self._value) == 0:
6648 return all(v.ready for v in self._value)
6652 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6654 return any(v.bered for v in self._value)
6656 def __getstate__(self):
6657 if hasattr(self._value, NEXT_ATTR_NAME):
6658 raise ValueError("can not pickle SequenceOf with iterator")
6659 return SequenceOfState(
6673 [copy(v) for v in self._value],
6678 def __setstate__(self, state):
6679 super(SequenceOf, self).__setstate__(state)
6680 self.spec = state.spec
6681 self._value = state.value
6682 self._bound_min = state.bound_min
6683 self._bound_max = state.bound_max
6685 def __eq__(self, their):
6686 if isinstance(their, self.__class__):
6688 self.spec == their.spec and
6689 self.tag == their.tag and
6690 self._expl == their._expl and
6691 self._value == their._value
6693 if hasattr(their, "__iter__"):
6694 return self._value == list(their)
6706 return self.__class__(
6710 (self._bound_min, self._bound_max)
6711 if bounds is None else bounds
6713 impl=self.tag if impl is None else impl,
6714 expl=self._expl if expl is None else expl,
6715 default=self.default if default is None else default,
6716 optional=self.optional if optional is None else optional,
6719 def __contains__(self, key):
6720 return key in self._value
6722 def append(self, value):
6723 if not isinstance(value, self.spec.__class__):
6724 raise InvalidValueType((self.spec.__class__,))
6725 if len(self._value) + 1 > self._bound_max:
6728 len(self._value) + 1,
6731 self._value.append(value)
6734 return iter(self._value)
6737 return len(self._value)
6739 def __setitem__(self, key, value):
6740 if not isinstance(value, self.spec.__class__):
6741 raise InvalidValueType((self.spec.__class__,))
6742 self._value[key] = self.spec(value=value)
6744 def __getitem__(self, key):
6745 return self._value[key]
6747 def _values_for_encoding(self):
6748 return iter(self._value)
6751 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6754 values_append = values.append
6755 class_expected = self.spec.__class__
6756 values_for_encoding = self._values_for_encoding()
6758 for v in values_for_encoding:
6759 if not isinstance(v, class_expected):
6760 raise InvalidValueType((class_expected,))
6761 values_append(v.encode())
6762 if not self._bound_min <= len(values) <= self._bound_max:
6763 raise BoundsError(self._bound_min, len(values), self._bound_max)
6764 value = b"".join(values)
6766 value = b"".join(v.encode() for v in self._values_for_encoding())
6767 return b"".join((self.tag, len_encode(len(value)), value))
6769 def _encode_cer(self, writer):
6770 write_full(writer, self.tag + LENINDEF)
6771 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6773 class_expected = self.spec.__class__
6775 values_for_encoding = self._values_for_encoding()
6777 for v in values_for_encoding:
6778 if not isinstance(v, class_expected):
6779 raise InvalidValueType((class_expected,))
6780 v.encode_cer(writer)
6782 if not self._bound_min <= values_count <= self._bound_max:
6783 raise BoundsError(self._bound_min, values_count, self._bound_max)
6785 for v in self._values_for_encoding():
6786 v.encode_cer(writer)
6787 write_full(writer, EOC)
6797 ordering_check=False,
6800 t, tlen, lv = tag_strip(tlv)
6801 except DecodeError as err:
6802 raise err.__class__(
6804 klass=self.__class__,
6805 decode_path=decode_path,
6810 klass=self.__class__,
6811 decode_path=decode_path,
6818 ctx_bered = ctx.get("bered", False)
6820 l, llen, v = len_decode(lv)
6821 except LenIndefForm as err:
6823 raise err.__class__(
6825 klass=self.__class__,
6826 decode_path=decode_path,
6829 l, llen, v = 0, 1, lv[1:]
6831 except DecodeError as err:
6832 raise err.__class__(
6834 klass=self.__class__,
6835 decode_path=decode_path,
6839 raise NotEnoughData(
6840 "encoded length is longer than data",
6841 klass=self.__class__,
6842 decode_path=decode_path,
6846 v, tail = v[:l], v[l:]
6848 sub_offset = offset + tlen + llen
6851 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6852 value_prev = memoryview(v[:0])
6856 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6858 sub_decode_path = decode_path + (str(_value_count),)
6860 for _decode_path, value, v_tail in spec.decode_evgen(
6864 decode_path=sub_decode_path,
6866 _ctx_immutable=False,
6868 yield _decode_path, value, v_tail
6870 _, value, v_tail = next(spec.decode_evgen(
6874 decode_path=sub_decode_path,
6876 _ctx_immutable=False,
6879 value_len = value.fulllen
6881 if value_prev.tobytes() > v[:value_len].tobytes():
6882 if ctx_bered or ctx_allow_unordered_set:
6886 "unordered " + self.asn1_type_name,
6887 klass=self.__class__,
6888 decode_path=sub_decode_path,
6891 value_prev = v[:value_len]
6894 _value.append(value)
6895 sub_offset += value_len
6898 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
6900 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
6901 klass=self.__class__,
6902 decode_path=decode_path,
6906 obj = self.__class__(
6907 value=None if evgen_mode else _value,
6909 bounds=(self._bound_min, self._bound_max),
6912 default=self.default,
6913 optional=self.optional,
6914 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6916 except BoundsError as err:
6919 klass=self.__class__,
6920 decode_path=decode_path,
6924 if v[:EOC_LEN].tobytes() != EOC:
6927 klass=self.__class__,
6928 decode_path=decode_path,
6933 obj.ber_encoded = ber_encoded
6934 yield decode_path, obj, tail
6938 pp_console_row(next(self.pps())),
6939 ", ".join(repr(v) for v in self._value),
6942 def pps(self, decode_path=()):
6945 asn1_type_name=self.asn1_type_name,
6946 obj_name=self.__class__.__name__,
6947 decode_path=decode_path,
6948 optional=self.optional,
6949 default=self == self.default,
6950 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6951 expl=None if self._expl is None else tag_decode(self._expl),
6956 expl_offset=self.expl_offset if self.expled else None,
6957 expl_tlen=self.expl_tlen if self.expled else None,
6958 expl_llen=self.expl_llen if self.expled else None,
6959 expl_vlen=self.expl_vlen if self.expled else None,
6960 expl_lenindef=self.expl_lenindef,
6961 lenindef=self.lenindef,
6962 ber_encoded=self.ber_encoded,
6965 for i, value in enumerate(self._value):
6966 yield value.pps(decode_path=decode_path + (str(i),))
6967 for pp in self.pps_lenindef(decode_path):
class SetOf(SequenceOf):
    """``SET OF`` sequence type

    Its usage is identical to :py:class:`pyderasn.SequenceOf`.

    What differs from the parent class: iterator values are rejected,
    encoded elements are sorted before being written and decoding is
    delegated to the parent with ``ordering_check`` turned on.
    """
    __slots__ = ()
    tag_default = tag_encode(form=TagFormConstructed, num=17)
    asn1_type_name = "SET OF"

    def _value_sanitize(self, value):
        """Sanitize value as the parent does, but refuse iterators

        :raises ValueError: if sanitized value is an iterator
        """
        sanitized = super(SetOf, self)._value_sanitize(value)
        if not hasattr(sanitized, NEXT_ATTR_NAME):
            return sanitized
        raise ValueError(
            "SetOf does not support iterator values, as no sense in them"
        )

    def _encode(self):
        """Return tag, length and sorted encoded elements as single bytes
        """
        encoded = [item.encode() for item in self._values_for_encoding()]
        encoded.sort()
        payload = b"".join(encoded)
        return b"".join((self.tag, len_encode(len(payload)), payload))

    def _encode_cer(self, writer):
        """Write indefinite length form with sorted elements to writer
        """
        write_full(writer, self.tag + LENINDEF)
        # Every element has to be encoded in advance to be sortable
        chunks = sorted(
            encode_cer(item) for item in self._values_for_encoding()
        )
        for chunk in chunks:
            write_full(writer, chunk)
        write_full(writer, EOC)

    def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
        """Decode with parent's decoder, requesting the ordering check
        """
        parent_decode = super(SetOf, self)._decode
        return parent_decode(
            tlv,
            offset,
            decode_path,
            ctx,
            tag_only,
            evgen_mode,
            ordering_check=True,
        )
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    Modules must be separated from classes/functions with ``:``.

    :param str pypath: ``module.path:object.attribute`` string
    :returns: the object found by importing the module part and then
              walking through dot separated attributes of the second part
    :raises ValueError: if there is no ``:`` separator in ``pypath``

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>
    """
    mod, sep, objs = pypath.rpartition(":")
    if sep == "":
        # Same exception class as failed tuple unpacking would give,
        # but with the message telling what is actually wrong
        raise ValueError(
            "no \":\" separator between module and object in %r" % (pypath,)
        )
    from importlib import import_module
    obj = import_module(mod)
    for obj_name in objs.split("."):
        obj = getattr(obj, obj_name)
    return obj
def generic_decoder():  # pragma: no cover
    """Create schema-less decoder and pretty printer suitable for it

    :returns: pair of ``SEQUENCE OF`` object over the self-referencing
              choice built below and ``pprint_any`` function, that
              renders object decoded with it
    """
    # All of this below is a big hack with self references
    choice = PrimitiveTypes()
    choice.specs["SequenceOf"] = SequenceOf(schema=choice)
    choice.specs["SetOf"] = SetOf(schema=choice)
    for i in six_xrange(31):
        choice.specs["SequenceOf%d" % i] = SequenceOf(
            schema=choice,
            expl=tag_ctxc(i),
        )
    choice.specs["Any"] = Any()

    # Class name equals to type name, to omit it from output
    class SEQUENCEOF(SequenceOf):
        __slots__ = ()
        schema = choice

    def pprint_any(
            obj,
            oid_maps=(),
            with_colours=False,
            with_decode_path=False,
            decode_path_only=(),
            decode_path=(),
    ):
        """Pretty print object decoded with the generic schema

        :param decode_path: decode path of ``obj`` itself. It is passed
                            by event generation mode, where objects
                            are printed one by one
        :returns: rows joined with newline
        """
        def _pprint_pps(pps):
            for pp in pps:
                if hasattr(pp, "_fields"):
                    if (
                            decode_path_only != () and
                            pp.decode_path[:len(decode_path_only)] != decode_path_only
                    ):
                        continue
                    # Choices are the artefacts of that generic schema,
                    # they are not shown
                    if pp.asn1_type_name == Choice.asn1_type_name:
                        continue
                    pp_kwargs = pp._asdict()
                    pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
                    pp = _pp(**pp_kwargs)
                    yield pp_console_row(
                        pp,
                        oid_maps=oid_maps,
                        with_offsets=True,
                        with_blob=False,
                        with_colours=with_colours,
                        with_decode_path=with_decode_path,
                        decode_path_len_decrease=len(decode_path_only),
                    )
                    for row in pp_console_blob(
                            pp,
                            decode_path_len_decrease=len(decode_path_only),
                    ):
                        yield row
                else:
                    # Not a namedtuple, but nested iterable of them
                    for row in _pprint_pps(pp):
                        yield row
        return "\n".join(_pprint_pps(obj.pps(decode_path=decode_path)))
    return SEQUENCEOF(), pprint_any
def main():  # pragma: no cover
    """Command line entry point: decode the file and pretty print it

    File is decoded either with the schema given by ``--schema`` option,
    or with the generic schema-less decoder. Trailing data left after
    the decoded object, if any, is printed in hexadecimal form.
    """
    import argparse
    # Both names are used regardless of the options given, so they are
    # imported unconditionally: function-level import inside a branch
    # leaves the name unbound when another branch is taken
    from functools import partial
    from os import environ
    parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/DER decoder")
    parser.add_argument(
        "--skip",
        type=int,
        default=0,
        help="Skip that number of bytes from the beginning",
    )
    parser.add_argument(
        "--oids",
        help="Python paths to dictionary with OIDs, comma separated",
    )
    parser.add_argument(
        "--schema",
        help="Python path to schema definition to use",
    )
    parser.add_argument(
        "--defines-by-path",
        help="Python path to decoder's defines_by_path",
    )
    parser.add_argument(
        "--nobered",
        action="store_true",
        help="Disallow BER encoding",
    )
    parser.add_argument(
        "--print-decode-path",
        action="store_true",
        help="Print decode paths",
    )
    parser.add_argument(
        "--decode-path-only",
        help="Print only specified decode path",
    )
    parser.add_argument(
        "--allow-expl-oob",
        action="store_true",
        help="Allow explicit tag out-of-bound",
    )
    parser.add_argument(
        "--evgen",
        action="store_true",
        help="Turn on event generation mode",
    )
    parser.add_argument(
        "RAWFile",
        type=argparse.FileType("rb"),
        help="Path to BER/CER/DER file you want to decode",
    )
    args = parser.parse_args()
    if PY2:
        args.RAWFile.seek(args.skip)
        raw = memoryview(args.RAWFile.read())
        args.RAWFile.close()
    else:
        raw = file_mmaped(args.RAWFile)[args.skip:]
    oid_maps = (
        [obj_by_path(_path) for _path in (args.oids or "").split(",")]
        if args.oids else ()
    )
    if args.schema:
        schema = obj_by_path(args.schema)
        pprinter = partial(pprint, big_blobs=True)
    else:
        schema, pprinter = generic_decoder()
    ctx = {
        "bered": not args.nobered,
        "allow_expl_oob": args.allow_expl_oob,
    }
    if args.defines_by_path is not None:
        ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
    # Bind the presentation options once, whichever printer was chosen
    pprinter = partial(
        pprinter,
        oid_maps=oid_maps,
        with_colours=environ.get("NO_COLOR") is None,
        with_decode_path=args.print_decode_path,
        decode_path_only=(
            () if args.decode_path_only is None else
            tuple(args.decode_path_only.split(":"))
        ),
    )
    if args.evgen:
        for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
            print(pprinter(obj, decode_path=decode_path))
    else:
        obj, tail = schema().decode(raw, ctx=ctx)
        print(pprinter(obj))
    if tail != b"":
        print("\nTrailing data: %s" % hexenc(tail))
7180 if __name__ == "__main__":