3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. In fact, exactly this function is
251 used for every object's ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows whether the object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 The former is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meets --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all the certificates' serial numbers from it, once you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, that CHOICE itself, and
618 ``RevokedCertificate`` as one entity of ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory beyond what is needed for a single
636 object's storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that is semantically equal to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates")
666 print("serial number:", int(obj["userCertificate"]))
673 POSIX compliant systems have ``mmap`` syscall, giving ability to work
674 with the memory mapped file. You can deal with the file like it was an
675 ordinary binary string, allowing you not to load it to the memory first.
676 Also you can use them as an input for OCTET STRING, taking no Python
677 memory for their storage.
679 There is convenient :py:func:`pyderasn.file_mmaped` function that
680 creates read-only memoryview on the file contents::
682 with open("huge", "rb") as fd:
683 raw = file_mmaped(fd)
684 obj = Something.decode(raw)
688 mmap-ed files in Python2.7 do not implement buffer protocol, so
689 memoryview won't work on them.
693 mmap maps the **whole** file. So it plays no role if you seek-ed it
694 before. Take the slice of the resulting memoryview with required
699 If you use ZFS as underlying storage, then pay attention that
700 currently most platforms do not deal well with ZFS ARC and ordinary
701 page cache used for mmaps. It can take twice the necessary size in
702 the memory: both in page cache and ZFS ARC.
707 We can parse any kind of data now, but how can we produce files
708 streamingly, without storing their encoded representation in memory?
709 SEQUENCE by default encodes in memory all its values, joins them in huge
710 binary string, just to know the exact size of SEQUENCE's value for
711 encoding it in TLV. DER requires you to know all exact sizes of the
714 You can use CER encoding mode, that slightly differs from the DER, but
715 does not require exact sizes knowledge, allowing streaming encoding
716 directly to some writer/buffer. Just use
717 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
718 encoded data will flow::
720 opener = io.open if PY2 else open
721 with opener("result", "wb") as fd:
722 obj.encode_cer(fd.write)
727 obj.encode_cer(buf.write)
729 If you do not want to create in-memory buffer every time, then you can
730 use :py:func:`pyderasn.encode_cer` function::
732 data = encode_cer(obj)
734 Remember that CER is **not valid** DER in most cases, so you **have to**
735 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
736 decoding. Also currently there is **no** validation that provided CER is
737 a valid one -- you can only be sure that it has valid BER encoding.
741 SET OF values can not be streamingly encoded, because they are
742 required to be sorted byte-by-byte. Big SET OF values still will take
743 much memory. Use neither SET nor SET OF values, as modern ASN.1
746 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
747 OCTET STRINGs! They will be streamingly copied from underlying file to
748 the buffer using 1 KB chunks.
750 Some structures require that some of the elements have to be forcefully
751 DER encoded. For example ``SignedData`` CMS requires you to encode
752 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
753 encode everything else in BER. You can tell any of the structures to be
754 forcefully encoded in DER during CER encoding, by specifying
755 ``der_forced=True`` attribute::
757 class Certificate(Sequence):
761 class SignedAttributes(SetOf):
769 In most cases, huge quantity of binary data is stored as OCTET STRING.
770 CER encoding splits it into 1 KB chunks. BER allows splitting with various
771 levels of chunk nesting::
773 SOME STRING[CONSTRUCTED]
774 OCTET STRING[CONSTRUCTED]
775 OCTET STRING[PRIMITIVE]
777 OCTET STRING[PRIMITIVE]
779 OCTET STRING[PRIMITIVE]
781 OCTET STRING[PRIMITIVE]
783 OCTET STRING[CONSTRUCTED]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[CONSTRUCTED]
789 OCTET STRING[CONSTRUCTED]
790 OCTET STRING[PRIMITIVE]
793 You can not just take the offset and some ``.vlen`` of the STRING and
794 treat it as the payload. If you decode it without
795 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
796 and ``bytes()`` will give the whole payload contents.
798 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
799 small memory footprint. There is convenient
800 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
801 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
802 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
803 SHA512 digest of its ``eContent``::
805 fd = open("data.p7m", "rb")
806 raw = file_mmaped(fd)
807 ctx = {"bered": True}
808 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
809 if decode_path == ("content",):
813 raise ValueError("no content found")
814 hasher_state = sha512()
816 hasher_state.update(data)
818 evgens = SignedData().decode_evgen(
819 raw[content.offset:],
820 offset=content.offset,
823 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
825 digest = hasher_state.digest()
827 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
828 copy the payload (without BER/CER encoding interleaved overhead) in it.
829 Virtually it won't take memory more than for keeping small structures
830 and 1 KB binary chunks.
832 SEQUENCE OF iterators
833 _____________________
835 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
836 classes. The only difference with providing the full list of objects, is
837 that type and bounds checking is done during encoding process. Also
838 sequence's value will be emptied after encoding, forcing you to set its
841 This is very useful when you have to create some huge objects, like
842 CRLs, with thousands and millions of entities inside. You can write the
843 generator taking necessary data from the database and giving the
844 ``RevokedCertificate`` objects. Only binary representation of those
845 objects will take memory during DER encoding.
849 .. autoclass:: pyderasn.Obj
857 .. autoclass:: pyderasn.Boolean
862 .. autoclass:: pyderasn.Integer
863 :members: __init__, named
867 .. autoclass:: pyderasn.BitString
868 :members: __init__, bit_len, named
872 .. autoclass:: pyderasn.OctetString
877 .. autoclass:: pyderasn.Null
882 .. autoclass:: pyderasn.ObjectIdentifier
887 .. autoclass:: pyderasn.Enumerated
891 .. autoclass:: pyderasn.CommonString
895 .. autoclass:: pyderasn.NumericString
899 .. autoclass:: pyderasn.PrintableString
900 :members: __init__, allow_asterisk, allow_ampersand
904 .. autoclass:: pyderasn.UTCTime
905 :members: __init__, todatetime
909 .. autoclass:: pyderasn.GeneralizedTime
910 :members: __init__, todatetime
917 .. autoclass:: pyderasn.Choice
918 :members: __init__, choice, value
922 .. autoclass:: PrimitiveTypes
926 .. autoclass:: pyderasn.Any
934 .. autoclass:: pyderasn.Sequence
939 .. autoclass:: pyderasn.Set
944 .. autoclass:: pyderasn.SequenceOf
949 .. autoclass:: pyderasn.SetOf
955 .. autofunction:: pyderasn.abs_decode_path
956 .. autofunction:: pyderasn.agg_octet_string
957 .. autofunction:: pyderasn.colonize_hex
958 .. autofunction:: pyderasn.encode_cer
959 .. autofunction:: pyderasn.file_mmaped
960 .. autofunction:: pyderasn.hexenc
961 .. autofunction:: pyderasn.hexdec
962 .. autofunction:: pyderasn.tag_encode
963 .. autofunction:: pyderasn.tag_decode
964 .. autofunction:: pyderasn.tag_ctxp
965 .. autofunction:: pyderasn.tag_ctxc
966 .. autoclass:: pyderasn.DecodeError
968 .. autoclass:: pyderasn.NotEnoughData
969 .. autoclass:: pyderasn.ExceedingData
970 .. autoclass:: pyderasn.LenIndefForm
971 .. autoclass:: pyderasn.TagMismatch
972 .. autoclass:: pyderasn.InvalidLength
973 .. autoclass:: pyderasn.InvalidOID
974 .. autoclass:: pyderasn.ObjUnknown
975 .. autoclass:: pyderasn.ObjNotReady
976 .. autoclass:: pyderasn.InvalidValueType
977 .. autoclass:: pyderasn.BoundsError
984 You can decode DER/BER files using command line abilities::
986 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
988 If there is no schema for your file, then you can try parsing it without,
989 but of course IMPLICIT tags will often make it impossible. But result is
990 good enough for the certificate above::
992 $ python -m pyderasn path/to/file
993 0 [1,3,1604] . >: SEQUENCE OF
994 4 [1,3,1453] . . >: SEQUENCE OF
995 8 [0,0, 5] . . . . >: [0] ANY
996 . . . . . A0:03:02:01:02
997 13 [1,1, 3] . . . . >: INTEGER 61595
998 18 [1,1, 13] . . . . >: SEQUENCE OF
999 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1000 31 [1,1, 0] . . . . . . >: NULL
1001 33 [1,3, 274] . . . . >: SEQUENCE OF
1002 37 [1,1, 11] . . . . . . >: SET OF
1003 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1004 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1005 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1007 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1008 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1009 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1010 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1011 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1012 . . . . . . . . . 61:2E:63:6F:6D:2F
1013 1461 [1,1, 13] . . >: SEQUENCE OF
1014 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1015 1474 [1,1, 0] . . . . >: NULL
1016 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1017 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1018 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1024 If you have got dictionaries with ObjectIdentifiers, like example one
1025 from ``tests/test_crts.py``::
1028 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1029 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1031 "2.5.4.10": "id-at-organizationName",
1032 "2.5.4.11": "id-at-organizationalUnitName",
1035 then you can pass it to the pretty printer to see human-readable OIDs::
1037 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1039 37 [1,1, 11] . . . . . . >: SET OF
1040 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1041 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1042 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1043 50 [1,1, 18] . . . . . . >: SET OF
1044 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1045 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1046 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1047 70 [1,1, 18] . . . . . . >: SET OF
1048 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1049 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1050 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1056 Each decoded element has a so-called decode path: the sequence of structure
1057 names it passes through during the decoding process. Each element has its own
1058 unique path inside the whole ASN.1 tree. You can print it out with the
1059 ``--print-decode-path`` option::
1061 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1062 0 [1,3,1604] Certificate SEQUENCE []
1063 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1064 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1065 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1066 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1067 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1068 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1070 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1071 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1072 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1073 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1074 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1075 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1076 . . . . . . . 13:02:45:53
1077 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1080 Now you can print only the specified tree, for example signature algorithm::
1082 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1083 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1084 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1085 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1089 from array import array
1090 from codecs import getdecoder
1091 from codecs import getencoder
1092 from collections import namedtuple
1093 from collections import OrderedDict
1094 from copy import copy
1095 from datetime import datetime
1096 from datetime import timedelta
1097 from io import BytesIO
1098 from math import ceil
1099 from mmap import mmap
1100 from mmap import PROT_READ
1101 from operator import attrgetter
1102 from string import ascii_letters
1103 from string import digits
1104 from sys import version_info
1105 from unicodedata import category as unicat
1107 from six import add_metaclass
1108 from six import binary_type
1109 from six import byte2int
1110 from six import indexbytes
1111 from six import int2byte
1112 from six import integer_types
1113 from six import iterbytes
1114 from six import iteritems
1115 from six import itervalues
1117 from six import string_types
1118 from six import text_type
1119 from six import unichr as six_unichr
1120 from six.moves import xrange as six_xrange
1124 from termcolor import colored
1125 except ImportError: # pragma: no cover
1126 def colored(what, *args, **kwargs):
1175 "TagClassApplication",
1178 "TagClassUniversal",
1179 "TagFormConstructed",
# Tag class values: the two most significant bits (mask 0xC0) of the
# first tag octet
TagClassUniversal = 0
TagClassApplication = 1 << 6
TagClassContext = 1 << 7
TagClassPrivate = 1 << 6 | 1 << 7
# Tag form values: bit 0x20 of the first tag octet
TagFormPrimitive = 0
TagFormConstructed = 1 << 5
1197 TagClassContext: "",
1198 TagClassApplication: "APPLICATION ",
1199 TagClassPrivate: "PRIVATE ",
1200 TagClassUniversal: "UNIV ",
LENINDEF = b"\x80" # length indefinite mark
# Character the pretty printer shows for indefinite-length items
LENINDEF_PP_CHAR = "I" if PY2 else "∞"
# Extra keyword arguments for namedtuple(): ``module`` is passed only on
# Python 3.6 and newer
NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
# The two characters "0" and "1"
SET01 = frozenset("01")
# Characters of string.digits
DECIMALS = frozenset(digits)
DECIMAL_SIGNS = ".,"
# Name of the iterator "next" method for the running Python major version
NEXT_ATTR_NAME = "next" if PY2 else "__next__"
def file_mmaped(fd):
    """Map a whole file read-only and expose it as a memoryview

    :param fd: file object; only its ``fileno()`` is used
    :returns: memoryview over a read-only mmap of the entire file
    """
    # Length 0 asks mmap to cover the file up to its current size
    mapping = mmap(fd.fileno(), 0, prot=PROT_READ)
    return memoryview(mapping)
1222 if not set(value) <= DECIMALS:
1223 raise ValueError("non-pure integer")
def fractions2float(fractions_raw):
    """Convert the digits of a fractional part to a float

    :param str fractions_raw: the digits following the decimal sign,
                              without the sign itself
    :returns: ``float`` parsed from ``"0."`` followed by those digits
    """
    # Called only as a check of the input; its result is not needed
    pureint(fractions_raw)
    literal = "0." + fractions_raw
    return float(literal)
1231 def get_def_by_path(defines_by_path, sub_decode_path):
1232 """Get define by decode path
1234 for path, define in defines_by_path:
1235 if len(path) != len(sub_decode_path):
1237 for p1, p2 in zip(path, sub_decode_path):
1238 if (not p1 is any) and (p1 != p2):
1244 ########################################################################
1246 ########################################################################
1248 class ASN1Error(ValueError):
1252 class DecodeError(ASN1Error):
1253 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1255 :param str msg: reason of decode failing
1256 :param klass: optional exact DecodeError inherited class (like
1257 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1258 :py:exc:`InvalidLength`)
1259 :param decode_path: tuple of strings. It contains human
1260 readable names of the fields through which
1261 decoding process has passed
1262 :param int offset: binary offset where failure happened
1264 super(DecodeError, self).__init__()
1267 self.decode_path = decode_path
1268 self.offset = offset
1273 "" if self.klass is None else self.klass.__name__,
1275 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1276 if len(self.decode_path) > 0 else ""
1278 ("(at %d)" % self.offset) if self.offset > 0 else "",
1284 return "%s(%s)" % (self.__class__.__name__, self)
1287 class NotEnoughData(DecodeError):
1291 class ExceedingData(ASN1Error):
1292 def __init__(self, nbytes):
1293 super(ExceedingData, self).__init__()
1294 self.nbytes = nbytes
1297 return "%d trailing bytes" % self.nbytes
1300 return "%s(%s)" % (self.__class__.__name__, self)
1303 class LenIndefForm(DecodeError):
1307 class TagMismatch(DecodeError):
1311 class InvalidLength(DecodeError):
1315 class InvalidOID(DecodeError):
1319 class ObjUnknown(ASN1Error):
1320 def __init__(self, name):
1321 super(ObjUnknown, self).__init__()
1325 return "object is unknown: %s" % self.name
1328 return "%s(%s)" % (self.__class__.__name__, self)
1331 class ObjNotReady(ASN1Error):
1332 def __init__(self, name):
1333 super(ObjNotReady, self).__init__()
1337 return "object is not ready: %s" % self.name
1340 return "%s(%s)" % (self.__class__.__name__, self)
1343 class InvalidValueType(ASN1Error):
1344 def __init__(self, expected_types):
1345 super(InvalidValueType, self).__init__()
1346 self.expected_types = expected_types
1349 return "invalid value type, expected: %s" % ", ".join(
1350 [repr(t) for t in self.expected_types]
1354 return "%s(%s)" % (self.__class__.__name__, self)
1357 class BoundsError(ASN1Error):
1358 def __init__(self, bound_min, value, bound_max):
1359 super(BoundsError, self).__init__()
1360 self.bound_min = bound_min
1362 self.bound_max = bound_max
1365 return "unsatisfied bounds: %s <= %s <= %s" % (
1372 return "%s(%s)" % (self.__class__.__name__, self)
1375 ########################################################################
1377 ########################################################################
# Codec functions of the "hex" codec; the converted data is the first
# element of the tuple each of them returns
_hexdecoder = getdecoder("hex")
_hexencoder = getencoder("hex")
1384 """Binary data to hexadecimal string convert
1386 return _hexdecoder(data)[0]
1390 """Hexadecimal string to binary data convert
1392 return _hexencoder(data)[0].decode("ascii")
1395 def int_bytes_len(num, byte_len=8):
1398 return int(ceil(float(num.bit_length()) / byte_len))
1401 def zero_ended_encode(num):
1402 octets = bytearray(int_bytes_len(num, 7))
1404 octets[i] = num & 0x7F
1408 octets[i] = 0x80 | (num & 0x7F)
1411 return bytes(octets)
1414 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1415 """Encode tag to binary form
1417 :param int num: tag's number
1418 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1419 :py:data:`pyderasn.TagClassContext`,
1420 :py:data:`pyderasn.TagClassApplication`,
1421 :py:data:`pyderasn.TagClassPrivate`)
1422 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1423 :py:data:`pyderasn.TagFormConstructed`)
1427 return int2byte(klass | form | num)
1428 # [XX|X|11111][1.......][1.......] ... [0.......]
1429 return int2byte(klass | form | 31) + zero_ended_encode(num)
1432 def tag_decode(tag):
1433 """Decode tag from binary form
1437 No validation is performed, assuming that it has already passed.
1439 It returns tuple with three integers, as
1440 :py:func:`pyderasn.tag_encode` accepts.
1442 first_octet = byte2int(tag)
1443 klass = first_octet & 0xC0
1444 form = first_octet & 0x20
1445 if first_octet & 0x1F < 0x1F:
1446 return (klass, form, first_octet & 0x1F)
1448 for octet in iterbytes(tag[1:]):
1451 return (klass, form, num)
1455 """Create CONTEXT PRIMITIVE tag
1457 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1461 """Create CONTEXT CONSTRUCTED tag
1463 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1466 def tag_strip(data):
1467 """Take off tag from the data
1469 :returns: (encoded tag, tag length, remaining data)
1472 raise NotEnoughData("no data at all")
1473 if byte2int(data) & 0x1F < 31:
1474 return data[:1], 1, data[1:]
1479 raise DecodeError("unfinished tag")
1480 if indexbytes(data, i) & 0x80 == 0:
1483 return data[:i], i, data[i:]
1489 octets = bytearray(int_bytes_len(l) + 1)
1490 octets[0] = 0x80 | (len(octets) - 1)
1491 for i in six_xrange(len(octets) - 1, 0, -1):
1492 octets[i] = l & 0xFF
1494 return bytes(octets)
1497 def len_decode(data):
1500 :returns: (decoded length, length's length, remaining data)
1501 :raises LenIndefForm: if indefinite form encoding is met
1504 raise NotEnoughData("no data at all")
1505 first_octet = byte2int(data)
1506 if first_octet & 0x80 == 0:
1507 return first_octet, 1, data[1:]
1508 octets_num = first_octet & 0x7F
1509 if octets_num + 1 > len(data):
1510 raise NotEnoughData("encoded length is longer than data")
1512 raise LenIndefForm()
1513 if byte2int(data[1:]) == 0:
1514 raise DecodeError("leading zeros")
1516 for v in iterbytes(data[1:1 + octets_num]):
1519 raise DecodeError("long form instead of short one")
1520 return l, 1 + octets_num, data[1 + octets_num:]
1523 LEN1K = len_encode(1000)
1526 def write_full(writer, data):
1527 """Fully write provided data
1529 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1531 BytesIO does not guarantee that the whole data will be written at
1532 once. That function write everything provided, raising an error if
1533 ``writer`` returns None.
1535 data = memoryview(data)
1537 while written != len(data):
1538 n = writer(data[written:])
1540 raise ValueError("can not write to buf")
1544 ########################################################################
1546 ########################################################################
class AutoAddSlots(type):
    """Metaclass making sure every created class defines ``__slots__``

    A class body without its own ``__slots__`` gets an empty tuple; an
    explicit declaration is kept as is.
    """
    def __new__(cls, name, bases, _dict):
        _dict.setdefault("__slots__", ())
        return type.__new__(cls, name, bases, _dict)
1554 BasicState = namedtuple("BasicState", (
1567 ), **NAMEDTUPLE_KWARGS)
1570 @add_metaclass(AutoAddSlots)
1572 """Common ASN.1 object class
1574 All ASN.1 types are inherited from it. It has metaclass that
1575 automatically adds ``__slots__`` to all inherited classes.
1600 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1601 self._expl = getattr(self, "expl", None) if expl is None else expl
1602 if self.tag != self.tag_default and self._expl is not None:
1603 raise ValueError("implicit and explicit tags can not be set simultaneously")
1604 if self.tag is None:
1605 self._tag_order = None
1607 tag_class, _, tag_num = tag_decode(
1608 self.tag if self._expl is None else self._expl
1610 self._tag_order = (tag_class, tag_num)
1611 if default is not None:
1613 self.optional = optional
1614 self.offset, self.llen, self.vlen = _decoded
1616 self.expl_lenindef = False
1617 self.lenindef = False
1618 self.ber_encoded = False
1621 def ready(self): # pragma: no cover
1622 """Is object ready to be encoded?
1624 raise NotImplementedError()
1626 def _assert_ready(self):
1628 raise ObjNotReady(self.__class__.__name__)
1632 """Is either object or any elements inside is BER encoded?
1634 return self.expl_lenindef or self.lenindef or self.ber_encoded
1638 """Is object decoded?
1640 return (self.llen + self.vlen) > 0
1642 def __getstate__(self): # pragma: no cover
1643 """Used for making safe to be mutable pickleable copies
1645 raise NotImplementedError()
def __setstate__(self, state):
    """Restore the fields common to all ASN.1 objects from ``state``

    :param state: state object providing ``version`` and every field
                  copied below
    :raises ValueError: if ``state.version`` differs from the module's
                        ``__version__``
    """
    if state.version != __version__:
        raise ValueError("data is pickled by different PyDERASN version")
    # (attribute of self, field of state) pairs, restored in this order
    restored = (
        ("tag", "tag"),
        ("_tag_order", "tag_order"),
        ("_expl", "expl"),
        ("default", "default"),
        ("optional", "optional"),
        ("offset", "offset"),
        ("llen", "llen"),
        ("vlen", "vlen"),
        ("expl_lenindef", "expl_lenindef"),
        ("lenindef", "lenindef"),
        ("ber_encoded", "ber_encoded"),
    )
    for attr_name, field_name in restored:
        setattr(self, attr_name, getattr(state, field_name))
1663 def tag_order(self):
1664 """Tag's (class, number) used for DER/CER sorting
1666 return self._tag_order
1669 def tag_order_cer(self):
1670 return self.tag_order
1674 """See :ref:`decoding`
1676 return len(self.tag)
1680 """See :ref:`decoding`
1682 return self.tlen + self.llen + self.vlen
1684 def __str__(self): # pragma: no cover
1685 return self.__bytes__() if PY2 else self.__unicode__()
1687 def __ne__(self, their):
1688 return not(self == their)
def __gt__(self, their): # pragma: no cover
    """Strict "greater than", derived from ``<`` and ``==``

    ``not (self < their)`` alone is also true for equal operands, so
    equality is excluded explicitly to keep ``>`` strict.

    :param their: object to compare with
    :returns: True only if ``self`` is neither less than nor equal to
              ``their``
    """
    return not (self < their or self == their)
1693 def __le__(self, their): # pragma: no cover
1694 return (self == their) or (self < their)
1696 def __ge__(self, their): # pragma: no cover
1697 return (self == their) or (self > their)
1699 def _encode(self): # pragma: no cover
1700 raise NotImplementedError()
1702 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1703 yield NotImplemented
1706 """DER encode the structure
1708 :returns: DER representation
1710 raw = self._encode()
1711 if self._expl is None:
1713 return b"".join((self._expl, len_encode(len(raw)), raw))
1715 def encode_cer(self, writer):
1716 """CER encode the structure to specified writer
1718 :param writer: must comply with ``io.RawIOBase.write``
1719 behaviour. It takes slice to be written and
1720 returns number of bytes processed. If it returns
1721 None, then exception will be raised
1723 if self._expl is not None:
1724 write_full(writer, self._expl + LENINDEF)
1725 if getattr(self, "der_forced", False):
1726 write_full(writer, self._encode())
1728 self._encode_cer(writer)
1729 if self._expl is not None:
1730 write_full(writer, EOC)
1732 def _encode_cer(self, writer):
1733 write_full(writer, self._encode())
1735 def hexencode(self):
1736 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1738 return hexenc(self.encode())
1748 _ctx_immutable=True,
1752 :param data: either binary or memoryview
1753 :param int offset: initial data's offset
1754 :param bool leavemm: do we need to leave memoryview of remaining
1755 data as is, or convert it to bytes otherwise
1756 :param decode_path: current decode path (tuples of strings,
1757 possibly with DecodePathDefBy) with will be
1758 the root for all underlying objects
1759 :param ctx: optional :ref:`context <ctx>` governing decoding process
1760 :param bool tag_only: decode only the tag, without length and
1761 contents (used only in Choice and Set
1762 structures, trying to determine if tag satisfies
1764 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1766 :returns: (Obj, remaining data)
1768 .. seealso:: :ref:`decoding`
1770 result = next(self.decode_evgen(
1782 _, obj, tail = result
1793 _ctx_immutable=True,
1796 """Decode with evgen mode on
1798 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1799 it returns the generator producing ``(decode_path, obj, tail)``
1800 values. See :ref:`evgen mode <evgen_mode>`.
1804 elif _ctx_immutable:
1806 tlv = memoryview(data)
1809 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1812 if self._expl is None:
1813 for result in self._decode(
1816 decode_path=decode_path,
1819 evgen_mode=_evgen_mode,
1824 _decode_path, obj, tail = result
1825 if not _decode_path is decode_path:
1829 t, tlen, lv = tag_strip(tlv)
1830 except DecodeError as err:
1831 raise err.__class__(
1833 klass=self.__class__,
1834 decode_path=decode_path,
1839 klass=self.__class__,
1840 decode_path=decode_path,
1844 l, llen, v = len_decode(lv)
1845 except LenIndefForm as err:
1846 if not ctx.get("bered", False):
1847 raise err.__class__(
1849 klass=self.__class__,
1850 decode_path=decode_path,
1854 offset += tlen + llen
1855 for result in self._decode(
1858 decode_path=decode_path,
1861 evgen_mode=_evgen_mode,
1863 if tag_only: # pragma: no cover
1866 _decode_path, obj, tail = result
1867 if not _decode_path is decode_path:
1869 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1870 if eoc_expected.tobytes() != EOC:
1873 klass=self.__class__,
1874 decode_path=decode_path,
1878 obj.expl_lenindef = True
1879 except DecodeError as err:
1880 raise err.__class__(
1882 klass=self.__class__,
1883 decode_path=decode_path,
1888 raise NotEnoughData(
1889 "encoded length is longer than data",
1890 klass=self.__class__,
1891 decode_path=decode_path,
1894 for result in self._decode(
1896 offset=offset + tlen + llen,
1897 decode_path=decode_path,
1900 evgen_mode=_evgen_mode,
1902 if tag_only: # pragma: no cover
1905 _decode_path, obj, tail = result
1906 if not _decode_path is decode_path:
1908 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
1910 "explicit tag out-of-bound, longer than data",
1911 klass=self.__class__,
1912 decode_path=decode_path,
1915 yield decode_path, obj, (tail if leavemm else tail.tobytes())
1917 def decod(self, data, offset=0, decode_path=(), ctx=None):
1918 """Decode the data, check that tail is empty
1920 :raises ExceedingData: if tail is not empty
1922 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
1923 (decode without tail) that also checks that there is no
1926 obj, tail = self.decode(
1929 decode_path=decode_path,
1934 raise ExceedingData(len(tail))
1937 def hexdecode(self, data, *args, **kwargs):
1938 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
1940 return self.decode(hexdec(data), *args, **kwargs)
1942 def hexdecod(self, data, *args, **kwargs):
1943 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
1945 return self.decod(hexdec(data), *args, **kwargs)
1949 """See :ref:`decoding`
1951 return self._expl is not None
1955 """See :ref:`decoding`
1960 def expl_tlen(self):
1961 """See :ref:`decoding`
1963 return len(self._expl)
1966 def expl_llen(self):
1967 """See :ref:`decoding`
1969 if self.expl_lenindef:
1971 return len(len_encode(self.tlvlen))
1974 def expl_offset(self):
1975 """See :ref:`decoding`
1977 return self.offset - self.expl_tlen - self.expl_llen
1980 def expl_vlen(self):
1981 """See :ref:`decoding`
1986 def expl_tlvlen(self):
1987 """See :ref:`decoding`
1989 return self.expl_tlen + self.expl_llen + self.expl_vlen
1992 def fulloffset(self):
1993 """See :ref:`decoding`
1995 return self.expl_offset if self.expled else self.offset
1999 """See :ref:`decoding`
2001 return self.expl_tlvlen if self.expled else self.tlvlen
2003 def pps_lenindef(self, decode_path):
2004 if self.lenindef and not (
2005 getattr(self, "defined", None) is not None and
2006 self.defined[1].lenindef
2009 asn1_type_name="EOC",
2011 decode_path=decode_path,
2013 self.offset + self.tlvlen -
2014 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2022 if self.expl_lenindef:
2024 asn1_type_name="EOC",
2025 obj_name="EXPLICIT",
2026 decode_path=decode_path,
2027 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2036 def encode_cer(obj):
2037 """Encode to CER in memory buffer
2039 :returns bytes: memory buffer contents
2042 obj.encode_cer(buf.write)
2043 return buf.getvalue()
2046 class DecodePathDefBy(object):
2047 """DEFINED BY representation inside decode path
2049 __slots__ = ("defined_by",)
2051 def __init__(self, defined_by):
2052 self.defined_by = defined_by
2054 def __ne__(self, their):
2055 return not(self == their)
2057 def __eq__(self, their):
2058 if not isinstance(their, self.__class__):
2060 return self.defined_by == their.defined_by
2063 return "DEFINED BY " + str(self.defined_by)
2066 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2069 ########################################################################
2071 ########################################################################
2073 PP = namedtuple("PP", (
2096 ), **NAMEDTUPLE_KWARGS)
2101 asn1_type_name="unknown",
2118 expl_lenindef=False,
def _colourize(what, colour, with_colours, attrs=("bold",)):
    """Optionally colour a piece of pretty-printed output

    :param what: text to colour
    :param colour: colour name handed to ``colored``
    :param with_colours: when false, ``what`` is returned untouched
    :param attrs: attributes handed to ``colored``
    :returns: ``colored(what, colour, attrs=attrs)`` or ``what`` itself
    """
    if not with_colours:
        return what
    return colored(what, colour, attrs=attrs)
def colonize_hex(hexed):
    """Separate a hexadecimal string into colon-delimited pairs

    :param hexed: hexadecimal string
    :returns: its consecutive two-character groups joined with ``:``;
              a trailing odd character forms a group of its own
    """
    pairs = []
    for start in six_xrange(0, len(hexed), 2):
        pairs.append(hexed[start:start + 2])
    return ":".join(pairs)
2165 with_decode_path=False,
2166 decode_path_len_decrease=0,
2173 " " if pp.expl_offset is None else
2174 ("-%d" % (pp.offset - pp.expl_offset))
2176 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2178 col = _colourize(col, "red", with_colours, ())
2179 col += _colourize("B", "red", with_colours) if pp.bered else " "
2181 col = "[%d,%d,%4d]%s" % (
2185 LENINDEF_PP_CHAR if pp.lenindef else " "
2187 col = _colourize(col, "green", with_colours, ())
2189 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2190 if decode_path_len > 0:
2191 cols.append(" ." * decode_path_len)
2192 ent = pp.decode_path[-1]
2193 if isinstance(ent, DecodePathDefBy):
2194 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2195 value = str(ent.defined_by)
2198 len(oid_maps) > 0 and
2199 ent.defined_by.asn1_type_name ==
2200 ObjectIdentifier.asn1_type_name
2202 for oid_map in oid_maps:
2203 oid_name = oid_map.get(value)
2204 if oid_name is not None:
2205 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2207 if oid_name is None:
2208 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2210 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2211 if pp.expl is not None:
2212 klass, _, num = pp.expl
2213 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2214 cols.append(_colourize(col, "blue", with_colours))
2215 if pp.impl is not None:
2216 klass, _, num = pp.impl
2217 col = "[%s%d]" % (TagClassReprs[klass], num)
2218 cols.append(_colourize(col, "blue", with_colours))
2219 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2220 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2222 cols.append(_colourize("BER", "red", with_colours))
2223 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2224 if pp.value is not None:
2226 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2228 len(oid_maps) > 0 and
2229 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2231 for oid_map in oid_maps:
2232 oid_name = oid_map.get(value)
2233 if oid_name is not None:
2234 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2236 if pp.asn1_type_name == Integer.asn1_type_name:
2237 hex_repr = hex(int(pp.obj._value))[2:].upper()
2238 if len(hex_repr) % 2 != 0:
2239 hex_repr = "0" + hex_repr
2240 cols.append(_colourize(
2241 "(%s)" % colonize_hex(hex_repr),
2246 if pp.blob.__class__ == binary_type:
2247 cols.append(hexenc(pp.blob))
2248 elif pp.blob.__class__ == tuple:
2249 cols.append(", ".join(pp.blob))
2251 cols.append(_colourize("OPTIONAL", "red", with_colours))
2253 cols.append(_colourize("DEFAULT", "red", with_colours))
2254 if with_decode_path:
2255 cols.append(_colourize(
2256 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2260 return " ".join(cols)
def pp_console_blob(pp, decode_path_len_decrease=0):
    """Yield the console rows showing the blob of a pretty-print item

    :param pp: pretty-print item; its ``decode_path`` and ``blob`` are used
    :param int decode_path_len_decrease: number of leading decode path
                                         elements not counted for the
                                         indentation
    :returns: generator of strings. A binary blob gives one row per 32
              upper-case hexadecimal characters, colon separated; a
              tuple blob gives a single row with its items joined by
              ``", "``; any other blob gives no rows
    """
    # Blank column as wide as the offset/length columns of an ordinary row
    prefix = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
    depth = len(pp.decode_path) - decode_path_len_decrease
    if depth > 0:
        prefix.append(" ." * (depth + 1))
    kind = pp.blob.__class__
    if kind == binary_type:
        hexed = hexenc(pp.blob).upper()
        for start in six_xrange(0, len(hexed), 32):
            row = prefix + [colonize_hex(hexed[start:start + 32])]
            yield " ".join(row)
    elif kind == tuple:
        joined = ", ".join(pp.blob)
        yield " ".join(prefix + [joined])
2282 with_decode_path=False,
2283 decode_path_only=(),
2286 """Pretty print object
2288 :param Obj obj: object you want to pretty print
2289 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2290 Its human readable form is printed when OID is met
2291 :param big_blobs: if large binary objects are met (like OctetString
2292 values), do we need to print them too, on separate
2294 :param with_colours: colourize output, if ``termcolor`` library
2296 :param with_decode_path: print decode path
2297 :param decode_path_only: print only that specified decode path
2299 def _pprint_pps(pps):
2301 if hasattr(pp, "_fields"):
2303 decode_path_only != () and
2305 str(p) for p in pp.decode_path[:len(decode_path_only)]
2306 ) != decode_path_only
2310 yield pp_console_row(
2315 with_colours=with_colours,
2316 with_decode_path=with_decode_path,
2317 decode_path_len_decrease=len(decode_path_only),
2319 for row in pp_console_blob(
2321 decode_path_len_decrease=len(decode_path_only),
2325 yield pp_console_row(
2330 with_colours=with_colours,
2331 with_decode_path=with_decode_path,
2332 decode_path_len_decrease=len(decode_path_only),
2335 for row in _pprint_pps(pp):
2337 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2340 ########################################################################
2341 # ASN.1 primitive types
2342 ########################################################################
2344 BooleanState = namedtuple(
2346 BasicState._fields + ("value",),
2352 """``BOOLEAN`` boolean type
2354 >>> b = Boolean(True)
2356 >>> b == Boolean(True)
2362 tag_default = tag_encode(1)
2363 asn1_type_name = "BOOLEAN"
2375 :param value: set the value. Either boolean type, or
2376 :py:class:`pyderasn.Boolean` object
2377 :param bytes impl: override default tag with ``IMPLICIT`` one
2378 :param bytes expl: override default tag with ``EXPLICIT`` one
2379 :param default: set default value. Type same as in ``value``
2380 :param bool optional: is object ``OPTIONAL`` in sequence
2382 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2383 self._value = None if value is None else self._value_sanitize(value)
2384 if default is not None:
2385 default = self._value_sanitize(default)
2386 self.default = self.__class__(
2392 self._value = default
2394 def _value_sanitize(self, value):
2395 if value.__class__ == bool:
2397 if issubclass(value.__class__, Boolean):
2399 raise InvalidValueType((self.__class__, bool))
2403 return self._value is not None
2405 def __getstate__(self):
2406 return BooleanState(
2422 def __setstate__(self, state):
2423 super(Boolean, self).__setstate__(state)
2424 self._value = state.value
2426 def __nonzero__(self):
2427 self._assert_ready()
2431 self._assert_ready()
2434 def __eq__(self, their):
2435 if their.__class__ == bool:
2436 return self._value == their
2437 if not issubclass(their.__class__, Boolean):
2440 self._value == their._value and
2441 self.tag == their.tag and
2442 self._expl == their._expl
2453 return self.__class__(
2455 impl=self.tag if impl is None else impl,
2456 expl=self._expl if expl is None else expl,
2457 default=self.default if default is None else default,
2458 optional=self.optional if optional is None else optional,
2462 self._assert_ready()
2466 (b"\xFF" if self._value else b"\x00"),
2469 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2471 t, _, lv = tag_strip(tlv)
2472 except DecodeError as err:
2473 raise err.__class__(
2475 klass=self.__class__,
2476 decode_path=decode_path,
2481 klass=self.__class__,
2482 decode_path=decode_path,
2489 l, _, v = len_decode(lv)
2490 except DecodeError as err:
2491 raise err.__class__(
2493 klass=self.__class__,
2494 decode_path=decode_path,
2498 raise InvalidLength(
2499 "Boolean's length must be equal to 1",
2500 klass=self.__class__,
2501 decode_path=decode_path,
2505 raise NotEnoughData(
2506 "encoded length is longer than data",
2507 klass=self.__class__,
2508 decode_path=decode_path,
2511 first_octet = byte2int(v)
2513 if first_octet == 0:
2515 elif first_octet == 0xFF:
2517 elif ctx.get("bered", False):
2522 "unacceptable Boolean value",
2523 klass=self.__class__,
2524 decode_path=decode_path,
2527 obj = self.__class__(
2531 default=self.default,
2532 optional=self.optional,
2533 _decoded=(offset, 1, 1),
2535 obj.ber_encoded = ber_encoded
2536 yield decode_path, obj, v[1:]
2539 return pp_console_row(next(self.pps()))
2541 def pps(self, decode_path=()):
2544 asn1_type_name=self.asn1_type_name,
2545 obj_name=self.__class__.__name__,
2546 decode_path=decode_path,
2547 value=str(self._value) if self.ready else None,
2548 optional=self.optional,
2549 default=self == self.default,
2550 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2551 expl=None if self._expl is None else tag_decode(self._expl),
2556 expl_offset=self.expl_offset if self.expled else None,
2557 expl_tlen=self.expl_tlen if self.expled else None,
2558 expl_llen=self.expl_llen if self.expled else None,
2559 expl_vlen=self.expl_vlen if self.expled else None,
2560 expl_lenindef=self.expl_lenindef,
2561 ber_encoded=self.ber_encoded,
2564 for pp in self.pps_lenindef(decode_path):
2568 IntegerState = namedtuple(
2570 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2576 """``INTEGER`` integer type
2578 >>> b = Integer(-123)
2580 >>> b == Integer(-123)
2585 >>> Integer(2, bounds=(1, 3))
2587 >>> Integer(5, bounds=(1, 3))
2588 Traceback (most recent call last):
2589 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2593 class Version(Integer):
2600 >>> v = Version("v1")
2607 {'v3': 2, 'v1': 0, 'v2': 1}
2609 __slots__ = ("specs", "_bound_min", "_bound_max")
2610 tag_default = tag_encode(2)
2611 asn1_type_name = "INTEGER"
2625 :param value: set the value. Either integer type, named value
2626 (if ``schema`` is specified in the class), or
2627 :py:class:`pyderasn.Integer` object
2628 :param bounds: set ``(MIN, MAX)`` value constraint.
2629 (-inf, +inf) by default
2630 :param bytes impl: override default tag with ``IMPLICIT`` one
2631 :param bytes expl: override default tag with ``EXPLICIT`` one
2632 :param default: set default value. Type same as in ``value``
2633 :param bool optional: is object ``OPTIONAL`` in sequence
2635 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2637 specs = getattr(self, "schema", {}) if _specs is None else _specs
2638 self.specs = specs if specs.__class__ == dict else dict(specs)
2639 self._bound_min, self._bound_max = getattr(
2642 (float("-inf"), float("+inf")),
2643 ) if bounds is None else bounds
2644 if value is not None:
2645 self._value = self._value_sanitize(value)
2646 if default is not None:
2647 default = self._value_sanitize(default)
2648 self.default = self.__class__(
2654 if self._value is None:
2655 self._value = default
def _value_sanitize(self, value):
    """Normalize a user-supplied value to a plain integer

    :param value: integer, :py:class:`pyderasn.Integer` instance
                  (its stored value is taken), or a name that is
                  looked up in ``self.specs``
    :returns: integer lying inside the configured bounds
    :raises ObjUnknown: the name is not present in ``self.specs``
    :raises InvalidValueType: value is of an unsupported type
    :raises BoundsError: value is outside of
                         ``self._bound_min .. self._bound_max``
    """
    if isinstance(value, integer_types):
        pass
    elif issubclass(value.__class__, Integer):
        value = value._value
    elif value.__class__ == str:
        # Keep the requested name in its own variable: the lookup result
        # used to overwrite it, so the error message could never show
        # which name was actually unknown
        name = value
        value = self.specs.get(name)
        if value is None:
            raise ObjUnknown("integer value: %s" % name)
    else:
        raise InvalidValueType((self.__class__, int, str))
    if not self._bound_min <= value <= self._bound_max:
        raise BoundsError(self._bound_min, value, self._bound_max)
    return value
2674 return self._value is not None
2676 def __getstate__(self):
2677 return IntegerState(
# Restore the instance from a state tuple (the counterpart of __getstate__,
# which builds an IntegerState).
2696 def __setstate__(self, state):
# The parent class restores the fields it owns first.
2697 super(Integer, self).__setstate__(state)
# Integer-specific fields are then read from the state by attribute name.
2698 self.specs = state.specs
2699 self._value = state.value
2700 self._bound_min = state.bound_min
2701 self._bound_max = state.bound_max
2704 self._assert_ready()
2705 return int(self._value)
2708 self._assert_ready()
2709 return hash(b"".join((
2711 bytes(self._expl or b""),
2712 str(self._value).encode("ascii"),
# Equality check. A plain integer is compared with the stored value only;
# another Integer must additionally carry the same tag and EXPLICIT tag.
2715 def __eq__(self, their):
2716 if isinstance(their, integer_types):
2717 return self._value == their
# Objects that are neither integers nor Integer (sub)classes are handled here;
# NOTE(review): the result of this branch is on a line absent from this
# listing -- presumably False, confirm against the full file.
2718 if not issubclass(their.__class__, Integer):
# Integer against Integer: value, tag and explicit tag must all match.
2721 self._value == their._value and
2722 self.tag == their.tag and
2723 self._expl == their._expl
# Ordering by the stored value only. No type check is made: ``their`` just
# has to expose a ``_value`` attribute comparable with ours.
2726 def __lt__(self, their):
2727 return self._value < their._value
2731 """Return named representation (if exists) of the value
2733 for name, value in iteritems(self.specs):
2734 if value == self._value:
2747 return self.__class__(
2750 (self._bound_min, self._bound_max)
2751 if bounds is None else bounds
2753 impl=self.tag if impl is None else impl,
2754 expl=self._expl if expl is None else expl,
2755 default=self.default if default is None else default,
2756 optional=self.optional if optional is None else optional,
2761 self._assert_ready()
2765 octets = bytearray([0])
2769 octets = bytearray()
2771 octets.append((value & 0xFF) ^ 0xFF)
2773 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2776 octets = bytearray()
2778 octets.append(value & 0xFF)
2780 if octets[-1] & 0x80 > 0:
2783 octets = bytes(octets)
2785 bytes_len = ceil(value.bit_length() / 8) or 1
2788 octets = value.to_bytes(
2793 except OverflowError:
2797 return b"".join((self.tag, len_encode(len(octets)), octets))
2799 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2801 t, _, lv = tag_strip(tlv)
2802 except DecodeError as err:
2803 raise err.__class__(
2805 klass=self.__class__,
2806 decode_path=decode_path,
2811 klass=self.__class__,
2812 decode_path=decode_path,
2819 l, llen, v = len_decode(lv)
2820 except DecodeError as err:
2821 raise err.__class__(
2823 klass=self.__class__,
2824 decode_path=decode_path,
2828 raise NotEnoughData(
2829 "encoded length is longer than data",
2830 klass=self.__class__,
2831 decode_path=decode_path,
2835 raise NotEnoughData(
2837 klass=self.__class__,
2838 decode_path=decode_path,
2841 v, tail = v[:l], v[l:]
2842 first_octet = byte2int(v)
2844 second_octet = byte2int(v[1:])
2846 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2847 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2850 "non normalized integer",
2851 klass=self.__class__,
2852 decode_path=decode_path,
2857 if first_octet & 0x80 > 0:
2858 octets = bytearray()
2859 for octet in bytearray(v):
2860 octets.append(octet ^ 0xFF)
2861 for octet in octets:
2862 value = (value << 8) | octet
2866 for octet in bytearray(v):
2867 value = (value << 8) | octet
2869 value = int.from_bytes(v, byteorder="big", signed=True)
2871 obj = self.__class__(
2873 bounds=(self._bound_min, self._bound_max),
2876 default=self.default,
2877 optional=self.optional,
2879 _decoded=(offset, llen, l),
2881 except BoundsError as err:
2884 klass=self.__class__,
2885 decode_path=decode_path,
2888 yield decode_path, obj, tail
2891 return pp_console_row(next(self.pps()))
2893 def pps(self, decode_path=()):
2896 asn1_type_name=self.asn1_type_name,
2897 obj_name=self.__class__.__name__,
2898 decode_path=decode_path,
2899 value=(self.named or str(self._value)) if self.ready else None,
2900 optional=self.optional,
2901 default=self == self.default,
2902 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2903 expl=None if self._expl is None else tag_decode(self._expl),
2908 expl_offset=self.expl_offset if self.expled else None,
2909 expl_tlen=self.expl_tlen if self.expled else None,
2910 expl_llen=self.expl_llen if self.expled else None,
2911 expl_vlen=self.expl_vlen if self.expled else None,
2912 expl_lenindef=self.expl_lenindef,
2915 for pp in self.pps_lenindef(decode_path):
2919 BitStringState = namedtuple(
2921 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
2926 class BitString(Obj):
2927 """``BIT STRING`` bit string type
2929 >>> BitString(b"hello world")
2930 BIT STRING 88 bits 68656c6c6f20776f726c64
2933 >>> b == b"hello world"
2938 >>> BitString("'0A3B5F291CD'H")
2939 BIT STRING 44 bits 0a3b5f291cd0
2940 >>> b = BitString("'010110000000'B")
2941 BIT STRING 12 bits 5800
2944 >>> b[0], b[1], b[2], b[3]
2945 (False, True, False, True)
2949 [False, True, False, True, True, False, False, False, False, False, False, False]
2953 class KeyUsage(BitString):
2955 ("digitalSignature", 0),
2956 ("nonRepudiation", 1),
2957 ("keyEncipherment", 2),
2960 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
2961 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
2963 ['nonRepudiation', 'keyEncipherment']
2965 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
2969 Pay attention that BIT STRING can be encoded both in primitive
2970 and constructed forms. Decoder always checks constructed form tag
2971 additionally to specified primitive one. If BER decoding is
2972 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
2973 of DER restrictions.
2975 __slots__ = ("tag_constructed", "specs", "defined")
2976 tag_default = tag_encode(3)
2977 asn1_type_name = "BIT STRING"
2990 :param value: set the value. Either binary type, tuple of named
2991 values (if ``schema`` is specified in the class),
2992 string in ``'XXX...'B`` form, or
2993 :py:class:`pyderasn.BitString` object
2994 :param bytes impl: override default tag with ``IMPLICIT`` one
2995 :param bytes expl: override default tag with ``EXPLICIT`` one
2996 :param default: set default value. Type same as in ``value``
2997 :param bool optional: is object ``OPTIONAL`` in sequence
2999 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3000 specs = getattr(self, "schema", {}) if _specs is None else _specs
3001 self.specs = specs if specs.__class__ == dict else dict(specs)
3002 self._value = None if value is None else self._value_sanitize(value)
3003 if default is not None:
3004 default = self._value_sanitize(default)
3005 self.default = self.__class__(
3011 self._value = default
3013 tag_klass, _, tag_num = tag_decode(self.tag)
3014 self.tag_constructed = tag_encode(
3016 form=TagFormConstructed,
# Convert a string made of "0"/"1" characters to a ``(bit_len, octets)`` pair.
3020 def _bits2octets(self, bits):
# When named bits are defined (non-empty specs), trailing zero bits are dropped.
3021 if len(self.specs) > 0:
3022 bits = bits.rstrip("0")
# NOTE(review): ``bit_len`` is assigned on a line absent from this listing;
# presumably it is len(bits) taken before the padding below -- confirm.
# Zero bits are appended until the length is a multiple of eight.
3024 bits += "0" * ((8 - (bit_len % 8)) % 8)
3025 octets = bytearray(len(bits) // 8)
# Every group of eight characters is parsed as one base-2 number (one octet).
3026 for i in six_xrange(len(octets)):
3027 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
# The returned length is the unpadded one, the octets include the padding.
3028 return bit_len, bytes(octets)
# Normalize a user-supplied value. The visible return statements produce
# either ``(len(value) * 8, value)`` or the result of ``_bits2octets()``.
# NOTE(review): several guards and returns of this method are on lines absent
# from this listing; the comments below describe only what is visible.
3030 def _value_sanitize(self, value):
# --- text or binary input ---
3031 if isinstance(value, (string_types, binary_type)):
# Text starting with a single quote is treated as a 'XXX'B / 'XXX'H literal.
3033 isinstance(value, string_types) and
3034 value.startswith("'")
# Binary literal: only characters from SET01 are accepted.
3036 if value.endswith("'B"):
3038 if not frozenset(value) <= SET01:
3039 raise ValueError("B's coding contains unacceptable chars")
3040 return self._bits2octets(value)
# Hexadecimal literal: an odd number of digits is padded with a trailing "0"
# before decoding.
3041 if value.endswith("'H"):
3045 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
# Raw binary data: every octet contributes eight bits.
3047 if value.__class__ == binary_type:
3048 return (len(value) * 8, value)
3049 raise InvalidValueType((self.__class__, string_types, binary_type))
# --- tuple input ---
3050 if value.__class__ == tuple:
# A pair of (integer, binary data) is recognised specially
# (presumably an already prepared ``(bit_len, octets)`` value -- confirm).
3053 isinstance(value[0], integer_types) and
3054 value[1].__class__ == binary_type
# Otherwise the items are names resolved to bit numbers through ``specs``.
3059 bit = self.specs.get(name)
3061 raise ObjUnknown("BitString value: %s" % name)
3064 return self._bits2octets("")
# Build the bit string up to the highest requested bit, setting the named ones.
3065 bits = frozenset(bits)
3066 return self._bits2octets("".join(
3067 ("1" if bit in bits else "0")
3068 for bit in six_xrange(max(bits) + 1)
# --- another BitString (handling is on a line absent from this listing) ---
3070 if issubclass(value.__class__, BitString):
3072 raise InvalidValueType((self.__class__, binary_type, string_types))
3076 return self._value is not None
3078 def __getstate__(self):
3079 return BitStringState(
3094 self.tag_constructed,
3098 def __setstate__(self, state):
3099 super(BitString, self).__setstate__(state)
3100 self.specs = state.specs
3101 self._value = state.value
3102 self.tag_constructed = state.tag_constructed
3103 self.defined = state.defined
3106 self._assert_ready()
3107 for i in six_xrange(self._value[0]):
3112 """Returns number of bits in the string
3114 self._assert_ready()
3115 return self._value[0]
# Return the octets part of the stored ``(bit_len, octets)`` value.
3117 def __bytes__(self):
# Presumably raises when the object holds no value yet -- see _assert_ready.
3118 self._assert_ready()
3119 return self._value[1]
# Equality check. Raw ``bytes`` are compared with the stored octets only (the
# bit length is not considered); another BitString must match in value, tag
# and EXPLICIT tag.
3121 def __eq__(self, their):
3122 if their.__class__ == bytes:
3123 return self._value[1] == their
# NOTE(review): the result for non-BitString objects is on a line absent from
# this listing -- presumably False, confirm against the full file.
3124 if not issubclass(their.__class__, BitString):
3127 self._value == their._value and
3128 self.tag == their.tag and
3129 self._expl == their._expl
3134 """Named representation (if exists) of the bits
3136 :returns: [str(name), ...]
3138 return [name for name, bit in iteritems(self.specs) if self[bit]]
3148 return self.__class__(
3150 impl=self.tag if impl is None else impl,
3151 expl=self._expl if expl is None else expl,
3152 default=self.default if default is None else default,
3153 optional=self.optional if optional is None else optional,
# Bit access either by position (int) or by name (string looked up in specs).
3157 def __getitem__(self, key):
# Exactly ``int`` is required here (``__class__`` comparison, no subclasses).
3158 if key.__class__ == int:
3159 bit_len, octets = self._value
# The octet holding the requested bit starts at index ``key // 8``; a
# memoryview is sliced so no copy of the tail is made.
3163 byte2int(memoryview(octets)[key // 8:]) >>
# Named access: the name is resolved through ``specs``.
3166 if isinstance(key, string_types):
3167 value = self.specs.get(key)
3169 raise ObjUnknown("BitString value: %s" % key)
# Any other key type is rejected.
3171 raise InvalidValueType((int, str))
3174 self._assert_ready()
3175 bit_len, octets = self._value
3178 len_encode(len(octets) + 1),
3179 int2byte((8 - bit_len % 8) % 8),
# CER encoding streamed to ``writer``. Short values are written in the
# ordinary form, long ones as constructed indefinite-length encoding made of
# primitive BIT STRING chunks.
3183 def _encode_cer(self, writer):
3184 bit_len, octets = self._value
# "+ 1" accounts for the leading unused-bits octet of the BIT STRING value.
3185 if len(octets) + 1 <= 1000:
3186 write_full(writer, self._encode())
# Long value: constructed tag, indefinite length marker, then the chunks.
3188 write_full(writer, self.tag_constructed)
3189 write_full(writer, LENINDEF)
# Full chunks carry 999 payload octets each (one more octet per chunk is
# presumably the unused-bits prefix, written on a line absent here).
3190 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3191 write_full(writer, b"".join((
# Chunks always use the universal BIT STRING tag, not the object's own tag.
3192 BitString.tag_default,
3195 octets[offset:offset + 999],
# Whatever is left after the last full chunk.
3197 tail = octets[offset+999:]
# Only the final chunk carries the real number of unused bits.
3199 tail = int2byte((8 - bit_len % 8) % 8) + tail
3200 write_full(writer, b"".join((
3201 BitString.tag_default,
3202 len_encode(len(tail)),
# End-of-contents octets terminate the indefinite-length encoding.
3205 write_full(writer, EOC)
3207 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3209 t, tlen, lv = tag_strip(tlv)
3210 except DecodeError as err:
3211 raise err.__class__(
3213 klass=self.__class__,
3214 decode_path=decode_path,
3218 if tag_only: # pragma: no cover
3222 l, llen, v = len_decode(lv)
3223 except DecodeError as err:
3224 raise err.__class__(
3226 klass=self.__class__,
3227 decode_path=decode_path,
3231 raise NotEnoughData(
3232 "encoded length is longer than data",
3233 klass=self.__class__,
3234 decode_path=decode_path,
3238 raise NotEnoughData(
3240 klass=self.__class__,
3241 decode_path=decode_path,
3244 pad_size = byte2int(v)
3245 if l == 1 and pad_size != 0:
3247 "invalid empty value",
3248 klass=self.__class__,
3249 decode_path=decode_path,
3255 klass=self.__class__,
3256 decode_path=decode_path,
3259 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3262 klass=self.__class__,
3263 decode_path=decode_path,
3266 v, tail = v[:l], v[l:]
3267 bit_len = (len(v) - 1) * 8 - pad_size
3268 obj = self.__class__(
3269 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3272 default=self.default,
3273 optional=self.optional,
3275 _decoded=(offset, llen, l),
3278 obj._value = (bit_len, None)
3279 yield decode_path, obj, tail
3281 if t != self.tag_constructed:
3283 klass=self.__class__,
3284 decode_path=decode_path,
3287 if not ctx.get("bered", False):
3289 "unallowed BER constructed encoding",
3290 klass=self.__class__,
3291 decode_path=decode_path,
3294 if tag_only: # pragma: no cover
3299 l, llen, v = len_decode(lv)
3300 except LenIndefForm:
3301 llen, l, v = 1, 0, lv[1:]
3303 except DecodeError as err:
3304 raise err.__class__(
3306 klass=self.__class__,
3307 decode_path=decode_path,
3311 raise NotEnoughData(
3312 "encoded length is longer than data",
3313 klass=self.__class__,
3314 decode_path=decode_path,
3317 if not lenindef and l == 0:
3318 raise NotEnoughData(
3320 klass=self.__class__,
3321 decode_path=decode_path,
3325 sub_offset = offset + tlen + llen
3329 if v[:EOC_LEN].tobytes() == EOC:
3336 "chunk out of bounds",
3337 klass=self.__class__,
3338 decode_path=decode_path + (str(len(chunks) - 1),),
3339 offset=chunks[-1].offset,
3341 sub_decode_path = decode_path + (str(len(chunks)),)
3344 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3347 decode_path=sub_decode_path,
3350 _ctx_immutable=False,
3352 yield _decode_path, chunk, v_tail
3354 _, chunk, v_tail = next(BitString().decode_evgen(
3357 decode_path=sub_decode_path,
3360 _ctx_immutable=False,
3365 "expected BitString encoded chunk",
3366 klass=self.__class__,
3367 decode_path=sub_decode_path,
3370 chunks.append(chunk)
3371 sub_offset += chunk.tlvlen
3372 vlen += chunk.tlvlen
3374 if len(chunks) == 0:
3377 klass=self.__class__,
3378 decode_path=decode_path,
3383 for chunk_i, chunk in enumerate(chunks[:-1]):
3384 if chunk.bit_len % 8 != 0:
3386 "BitString chunk is not multiple of 8 bits",
3387 klass=self.__class__,
3388 decode_path=decode_path + (str(chunk_i),),
3389 offset=chunk.offset,
3392 values.append(bytes(chunk))
3393 bit_len += chunk.bit_len
3394 chunk_last = chunks[-1]
3396 values.append(bytes(chunk_last))
3397 bit_len += chunk_last.bit_len
3398 obj = self.__class__(
3399 value=None if evgen_mode else (bit_len, b"".join(values)),
3402 default=self.default,
3403 optional=self.optional,
3405 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3408 obj._value = (bit_len, None)
3409 obj.lenindef = lenindef
3410 obj.ber_encoded = True
3411 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3414 return pp_console_row(next(self.pps()))
3416 def pps(self, decode_path=()):
3420 bit_len, blob = self._value
3421 value = "%d bits" % bit_len
3422 if len(self.specs) > 0 and blob is not None:
3423 blob = tuple(self.named)
3426 asn1_type_name=self.asn1_type_name,
3427 obj_name=self.__class__.__name__,
3428 decode_path=decode_path,
3431 optional=self.optional,
3432 default=self == self.default,
3433 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3434 expl=None if self._expl is None else tag_decode(self._expl),
3439 expl_offset=self.expl_offset if self.expled else None,
3440 expl_tlen=self.expl_tlen if self.expled else None,
3441 expl_llen=self.expl_llen if self.expled else None,
3442 expl_vlen=self.expl_vlen if self.expled else None,
3443 expl_lenindef=self.expl_lenindef,
3444 lenindef=self.lenindef,
3445 ber_encoded=self.ber_encoded,
3448 defined_by, defined = self.defined or (None, None)
3449 if defined_by is not None:
3451 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3453 for pp in self.pps_lenindef(decode_path):
3457 OctetStringState = namedtuple(
3459 BasicState._fields + (
3470 class OctetString(Obj):
3471 """``OCTET STRING`` binary string type
3473 >>> s = OctetString(b"hello world")
3474 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3475 >>> s == OctetString(b"hello world")
3480 >>> OctetString(b"hello", bounds=(4, 4))
3481 Traceback (most recent call last):
3482 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3483 >>> OctetString(b"hell", bounds=(4, 4))
3484 OCTET STRING 4 bytes 68656c6c
3486 Memoryviews can be used as a values. If memoryview is made on
3487 mmap-ed file, then it does not take storage inside OctetString
3488 itself. In CER encoding mode it will be streamed to the specified
3489 writer, copying 1 KB chunks.
3491 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3492 tag_default = tag_encode(4)
3493 asn1_type_name = "OCTET STRING"
3494 evgen_mode_skip_value = True
3508 :param value: set the value. Either binary type, or
3509 :py:class:`pyderasn.OctetString` object
3510 :param bounds: set ``(MIN, MAX)`` value size constraint.
3511 (-inf, +inf) by default
3512 :param bytes impl: override default tag with ``IMPLICIT`` one
3513 :param bytes expl: override default tag with ``EXPLICIT`` one
3514 :param default: set default value. Type same as in ``value``
3515 :param bool optional: is object ``OPTIONAL`` in sequence
3517 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3519 self._bound_min, self._bound_max = getattr(
3523 ) if bounds is None else bounds
3524 if value is not None:
3525 self._value = self._value_sanitize(value)
3526 if default is not None:
3527 default = self._value_sanitize(default)
3528 self.default = self.__class__(
3533 if self._value is None:
3534 self._value = default
3536 tag_klass, _, tag_num = tag_decode(self.tag)
3537 self.tag_constructed = tag_encode(
3539 form=TagFormConstructed,
# Normalize a user-supplied value and enforce the size bounds.
3543 def _value_sanitize(self, value):
# Binary data and memoryviews are accepted (exact class match, no subclasses).
3544 if value.__class__ == binary_type or value.__class__ == memoryview:
# Another OctetString (or subclass) contributes its stored value.
3546 elif issubclass(value.__class__, OctetString):
3547 value = value._value
# Everything else is rejected.
3549 raise InvalidValueType((self.__class__, bytes, memoryview))
# The bounds constrain the length of the value, not its content.
3550 if not self._bound_min <= len(value) <= self._bound_max:
3551 raise BoundsError(self._bound_min, len(value), self._bound_max)
3556 return self._value is not None
3558 def __getstate__(self):
3559 return OctetStringState(
3575 self.tag_constructed,
# Restore the instance from a state tuple (the counterpart of __getstate__,
# which builds an OctetStringState).
3579 def __setstate__(self, state):
# The parent class restores the fields it owns first.
3580 super(OctetString, self).__setstate__(state)
# OctetString-specific fields are then read from the state by attribute name.
3581 self._value = state.value
3582 self._bound_min = state.bound_min
3583 self._bound_max = state.bound_max
3584 self.tag_constructed = state.tag_constructed
3585 self.defined = state.defined
# Return the stored value as ``bytes``; the conversion also covers values
# kept as a memoryview (which _value_sanitize accepts).
3587 def __bytes__(self):
# Presumably raises when the object holds no value yet -- see _assert_ready.
3588 self._assert_ready()
3589 return bytes(self._value)
# Equality check. Raw binary data is compared with the stored value only;
# another OctetString must match in value, tag and EXPLICIT tag.
3591 def __eq__(self, their):
3592 if their.__class__ == binary_type:
3593 return self._value == their
# NOTE(review): the result for non-OctetString objects is on a line absent
# from this listing -- presumably False, confirm against the full file.
3594 if not issubclass(their.__class__, OctetString):
3597 self._value == their._value and
3598 self.tag == their.tag and
3599 self._expl == their._expl
# Ordering by the stored value only. No type check is made: ``their`` just
# has to expose a ``_value`` attribute comparable with ours.
3602 def __lt__(self, their):
3603 return self._value < their._value
3614 return self.__class__(
3617 (self._bound_min, self._bound_max)
3618 if bounds is None else bounds
3620 impl=self.tag if impl is None else impl,
3621 expl=self._expl if expl is None else expl,
3622 default=self.default if default is None else default,
3623 optional=self.optional if optional is None else optional,
3627 self._assert_ready()
3630 len_encode(len(self._value)),
# CER encoding streamed to ``writer``. Values of up to 1000 octets are written
# in the ordinary form, longer ones as constructed indefinite-length encoding
# made of primitive OCTET STRING chunks.
3634 def _encode_cer(self, writer):
3635 octets = self._value
3636 if len(octets) <= 1000:
3637 write_full(writer, self._encode())
# Long value: constructed tag, indefinite length marker, then the chunks.
3639 write_full(writer, self.tag_constructed)
3640 write_full(writer, LENINDEF)
# Full chunks carry exactly 1000 octets each.
3641 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3642 write_full(writer, b"".join((
# Chunks always use the universal OCTET STRING tag, not the object's own tag.
3643 OctetString.tag_default,
3645 octets[offset:offset + 1000],
# Whatever is left after the last full chunk.
3647 tail = octets[offset+1000:]
3649 write_full(writer, b"".join((
3650 OctetString.tag_default,
3651 len_encode(len(tail)),
# End-of-contents octets terminate the indefinite-length encoding.
3654 write_full(writer, EOC)
3656 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3658 t, tlen, lv = tag_strip(tlv)
3659 except DecodeError as err:
3660 raise err.__class__(
3662 klass=self.__class__,
3663 decode_path=decode_path,
3671 l, llen, v = len_decode(lv)
3672 except DecodeError as err:
3673 raise err.__class__(
3675 klass=self.__class__,
3676 decode_path=decode_path,
3680 raise NotEnoughData(
3681 "encoded length is longer than data",
3682 klass=self.__class__,
3683 decode_path=decode_path,
3686 v, tail = v[:l], v[l:]
3687 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3689 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3690 klass=self.__class__,
3691 decode_path=decode_path,
3695 obj = self.__class__(
3697 None if (evgen_mode and self.evgen_mode_skip_value)
3700 bounds=(self._bound_min, self._bound_max),
3703 default=self.default,
3704 optional=self.optional,
3705 _decoded=(offset, llen, l),
3708 except DecodeError as err:
3711 klass=self.__class__,
3712 decode_path=decode_path,
3715 except BoundsError as err:
3718 klass=self.__class__,
3719 decode_path=decode_path,
3722 yield decode_path, obj, tail
3724 if t != self.tag_constructed:
3726 klass=self.__class__,
3727 decode_path=decode_path,
3730 if not ctx.get("bered", False):
3732 "unallowed BER constructed encoding",
3733 klass=self.__class__,
3734 decode_path=decode_path,
3742 l, llen, v = len_decode(lv)
3743 except LenIndefForm:
3744 llen, l, v = 1, 0, lv[1:]
3746 except DecodeError as err:
3747 raise err.__class__(
3749 klass=self.__class__,
3750 decode_path=decode_path,
3754 raise NotEnoughData(
3755 "encoded length is longer than data",
3756 klass=self.__class__,
3757 decode_path=decode_path,
3762 sub_offset = offset + tlen + llen
3767 if v[:EOC_LEN].tobytes() == EOC:
3774 "chunk out of bounds",
3775 klass=self.__class__,
3776 decode_path=decode_path + (str(len(chunks) - 1),),
3777 offset=chunks[-1].offset,
3781 sub_decode_path = decode_path + (str(chunks_count),)
3782 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3785 decode_path=sub_decode_path,
3788 _ctx_immutable=False,
3790 yield _decode_path, chunk, v_tail
3791 if not chunk.ber_encoded:
3792 payload_len += chunk.vlen
3795 sub_decode_path = decode_path + (str(len(chunks)),)
3796 _, chunk, v_tail = next(OctetString().decode_evgen(
3799 decode_path=sub_decode_path,
3802 _ctx_immutable=False,
3805 chunks.append(chunk)
3808 "expected OctetString encoded chunk",
3809 klass=self.__class__,
3810 decode_path=sub_decode_path,
3813 sub_offset += chunk.tlvlen
3814 vlen += chunk.tlvlen
3816 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3818 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3819 klass=self.__class__,
3820 decode_path=decode_path,
3824 obj = self.__class__(
3826 None if evgen_mode else
3827 b"".join(bytes(chunk) for chunk in chunks)
3829 bounds=(self._bound_min, self._bound_max),
3832 default=self.default,
3833 optional=self.optional,
3834 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3837 except DecodeError as err:
3840 klass=self.__class__,
3841 decode_path=decode_path,
3844 except BoundsError as err:
3847 klass=self.__class__,
3848 decode_path=decode_path,
3851 obj.lenindef = lenindef
3852 obj.ber_encoded = True
3853 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3856 return pp_console_row(next(self.pps()))
3858 def pps(self, decode_path=()):
3861 asn1_type_name=self.asn1_type_name,
3862 obj_name=self.__class__.__name__,
3863 decode_path=decode_path,
3864 value=("%d bytes" % len(self._value)) if self.ready else None,
3865 blob=self._value if self.ready else None,
3866 optional=self.optional,
3867 default=self == self.default,
3868 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3869 expl=None if self._expl is None else tag_decode(self._expl),
3874 expl_offset=self.expl_offset if self.expled else None,
3875 expl_tlen=self.expl_tlen if self.expled else None,
3876 expl_llen=self.expl_llen if self.expled else None,
3877 expl_vlen=self.expl_vlen if self.expled else None,
3878 expl_lenindef=self.expl_lenindef,
3879 lenindef=self.lenindef,
3880 ber_encoded=self.ber_encoded,
3883 defined_by, defined = self.defined or (None, None)
3884 if defined_by is not None:
3886 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3888 for pp in self.pps_lenindef(decode_path):
def agg_octet_string(evgens, decode_path, raw, writer):
    """Aggregate constructed string (OctetString and its derivatives)

    :param evgens: iterator of generated events, each being a
                   ``(decode_path, obj, tail)`` tuple
    :param decode_path: points to the string we want to decode
    :param raw: sliceable (memoryview, bytearray, etc) with
                the data evgens are generated on
    :param writer: where string is going to be saved. Must comply
                   with ``io.RawIOBase.write`` behaviour
    """
    decode_path_len = len(decode_path)
    for dp, obj, _ in evgens:
        if dp[:decode_path_len] != decode_path:
            # Event lies outside of the requested string
            continue
        if not obj.ber_encoded:
            # Objects not flagged as BER-encoded have their value
            # octets copied verbatim from the raw data
            payload_start = obj.offset + obj.tlen + obj.llen
            payload_end = payload_start + obj.vlen
            if obj.expl_lenindef:
                payload_end -= EOC_LEN
            write_full(writer, raw[payload_start:payload_end])
        if len(dp) == decode_path_len:
            # Event of the requested string itself: nothing more to
            # aggregate, leave the remaining events unconsumed
            break
3917 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
3921 """``NULL`` null object
3929 tag_default = tag_encode(5)
3930 asn1_type_name = "NULL"
3934 value=None, # unused, but Sequence passes it
3941 :param bytes impl: override default tag with ``IMPLICIT`` one
3942 :param bytes expl: override default tag with ``EXPLICIT`` one
3943 :param bool optional: is object ``OPTIONAL`` in sequence
3945 super(Null, self).__init__(impl, expl, None, optional, _decoded)
3952 def __getstate__(self):
# Equality check. There is no value to compare here: two Null objects are
# told apart only by their tag and EXPLICIT tag.
3968 def __eq__(self, their):
# NOTE(review): the result for non-Null objects is on a line absent from this
# listing -- presumably False, confirm against the full file.
3969 if not issubclass(their.__class__, Null):
3972 self.tag == their.tag and
3973 self._expl == their._expl
3983 return self.__class__(
3984 impl=self.tag if impl is None else impl,
3985 expl=self._expl if expl is None else expl,
3986 optional=self.optional if optional is None else optional,
3990 return self.tag + len_encode(0)
3992 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3994 t, _, lv = tag_strip(tlv)
3995 except DecodeError as err:
3996 raise err.__class__(
3998 klass=self.__class__,
3999 decode_path=decode_path,
4004 klass=self.__class__,
4005 decode_path=decode_path,
4008 if tag_only: # pragma: no cover
4012 l, _, v = len_decode(lv)
4013 except DecodeError as err:
4014 raise err.__class__(
4016 klass=self.__class__,
4017 decode_path=decode_path,
4021 raise InvalidLength(
4022 "Null must have zero length",
4023 klass=self.__class__,
4024 decode_path=decode_path,
4027 obj = self.__class__(
4030 optional=self.optional,
4031 _decoded=(offset, 1, 0),
4033 yield decode_path, obj, v
4036 return pp_console_row(next(self.pps()))
4038 def pps(self, decode_path=()):
4041 asn1_type_name=self.asn1_type_name,
4042 obj_name=self.__class__.__name__,
4043 decode_path=decode_path,
4044 optional=self.optional,
4045 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4046 expl=None if self._expl is None else tag_decode(self._expl),
4051 expl_offset=self.expl_offset if self.expled else None,
4052 expl_tlen=self.expl_tlen if self.expled else None,
4053 expl_llen=self.expl_llen if self.expled else None,
4054 expl_vlen=self.expl_vlen if self.expled else None,
4055 expl_lenindef=self.expl_lenindef,
4058 for pp in self.pps_lenindef(decode_path):
4062 ObjectIdentifierState = namedtuple(
4063 "ObjectIdentifierState",
4064 BasicState._fields + ("value", "defines"),
4069 class ObjectIdentifier(Obj):
4070 """``OBJECT IDENTIFIER`` OID type
4072 >>> oid = ObjectIdentifier((1, 2, 3))
4073 OBJECT IDENTIFIER 1.2.3
4074 >>> oid == ObjectIdentifier("1.2.3")
4080 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4081 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4083 >>> str(ObjectIdentifier((3, 1)))
4084 Traceback (most recent call last):
4085 pyderasn.InvalidOID: unacceptable first arc value
4087 __slots__ = ("defines",)
4088 tag_default = tag_encode(6)
4089 asn1_type_name = "OBJECT IDENTIFIER"
4102 :param value: set the value. Either tuples of integers,
4103 string of "."-concatenated integers, or
4104 :py:class:`pyderasn.ObjectIdentifier` object
4105 :param defines: sequence of tuples. Each tuple has two elements.
4106 First one is relative to current one decode
4107 path, aiming to the field defined by that OID.
4108 Read about relative path in
4109 :py:func:`pyderasn.abs_decode_path`. Second
4110 tuple element is ``{OID: pyderasn.Obj()}``
4111 dictionary, mapping between current OID value
4112 and structure applied to defined field.
4113 :ref:`Read about DEFINED BY <definedby>`
4114 :param bytes impl: override default tag with ``IMPLICIT`` one
4115 :param bytes expl: override default tag with ``EXPLICIT`` one
4116 :param default: set default value. Type same as in ``value``
4117 :param bool optional: is object ``OPTIONAL`` in sequence
4119 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4121 if value is not None:
4122 self._value = self._value_sanitize(value)
4123 if default is not None:
4124 default = self._value_sanitize(default)
4125 self.default = self.__class__(
4130 if self._value is None:
4131 self._value = default
4132 self.defines = defines
# Concatenate arcs, producing a new object of the same class as ``self``.
4134 def __add__(self, their):
# A tuple (exact class match) is converted to an array of type code "L"
# before being appended to the stored arcs.
4135 if their.__class__ == tuple:
4136 return self.__class__(self._value + array("L", their))
# An object of the same class (or a subclass) contributes its stored arcs.
4137 if isinstance(their, self.__class__):
4138 return self.__class__(self._value + their._value)
# Any other operand type is rejected.
4139 raise InvalidValueType((self.__class__, tuple))
# Normalize a user-supplied OID value to an array of type code "L" and
# validate its arcs.
# NOTE(review): several guards and returns of this method are on lines absent
# from this listing; the comments below describe only what is visible.
4141 def _value_sanitize(self, value):
# Another ObjectIdentifier (handling is on a line absent from this listing).
4142 if issubclass(value.__class__, ObjectIdentifier):
# Dotted string: every "."-separated part is parsed with pureint().
4144 if isinstance(value, string_types):
4146 value = array("L", (pureint(arc) for arc in value.split(".")))
4148 raise InvalidOID("unacceptable arcs values")
# Tuple of integers: values that do not fit the array item type are reported
# as InvalidOID instead of OverflowError.
4149 if value.__class__ == tuple:
4151 value = array("L", value)
4152 except OverflowError as err:
4153 raise InvalidOID(repr(err))
# At this point strings and tuples have been converted, so the arcs can be
# validated on the array itself.
4154 if value.__class__ is array:
4156 raise InvalidOID("less than 2 arcs")
4157 first_arc = value[0]
# With the first arc equal to 0 or 1 the second one is limited to 0..39.
4158 if first_arc in (0, 1):
4159 if not (0 <= value[1] <= 39):
4160 raise InvalidOID("second arc is too wide")
4161 elif first_arc == 2:
4164 raise InvalidOID("unacceptable first arc value")
4165 if not all(arc >= 0 for arc in value):
4166 raise InvalidOID("negative arc value")
# Any other input type is rejected.
4168 raise InvalidValueType((self.__class__, str, tuple))
4172 return self._value is not None
4174 def __getstate__(self):
4175 return ObjectIdentifierState(
# Restore the instance from a state tuple (the counterpart of __getstate__,
# which builds an ObjectIdentifierState).
4192 def __setstate__(self, state):
# The parent class restores the fields it owns first.
4193 super(ObjectIdentifier, self).__setstate__(state)
# OID-specific fields are then read from the state by attribute name.
4194 self._value = state.value
4195 self.defines = state.defines
4198 self._assert_ready()
4199 return iter(self._value)
4202 return ".".join(str(arc) for arc in self._value or ())
4205 self._assert_ready()
4206 return hash(b"".join((
4208 bytes(self._expl or b""),
4209 str(self._value).encode("ascii"),
# Equality check. A tuple of arcs is compared with the stored arcs only;
# another ObjectIdentifier must match in tag, EXPLICIT tag and value.
4212 def __eq__(self, their):
# The tuple is converted to the same array type the value is stored in.
4213 if their.__class__ == tuple:
4214 return self._value == array("L", their)
# NOTE(review): the result for non-ObjectIdentifier objects is on a line
# absent from this listing -- presumably False, confirm against the full file.
4215 if not issubclass(their.__class__, ObjectIdentifier):
4218 self.tag == their.tag and
4219 self._expl == their._expl and
4220 self._value == their._value
# Ordering by the stored arcs only. No type check is made: ``their`` just
# has to expose a ``_value`` attribute comparable with ours.
4223 def __lt__(self, their):
4224 return self._value < their._value
4235 return self.__class__(
4237 defines=self.defines if defines is None else defines,
4238 impl=self.tag if impl is None else impl,
4239 expl=self._expl if expl is None else expl,
4240 default=self.default if default is None else default,
4241 optional=self.optional if optional is None else optional,
4245 self._assert_ready()
4247 first_value = value[1]
4248 first_arc = value[0]
4251 elif first_arc == 1:
4253 elif first_arc == 2:
4255 else: # pragma: no cover
4256 raise RuntimeError("invalid arc is stored")
4257 octets = [zero_ended_encode(first_value)]
4258 for arc in value[2:]:
4259 octets.append(zero_ended_encode(arc))
4260 v = b"".join(octets)
4261 return b"".join((self.tag, len_encode(len(v)), v))
4263 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4265 t, _, lv = tag_strip(tlv)
4266 except DecodeError as err:
4267 raise err.__class__(
4269 klass=self.__class__,
4270 decode_path=decode_path,
4275 klass=self.__class__,
4276 decode_path=decode_path,
4279 if tag_only: # pragma: no cover
4283 l, llen, v = len_decode(lv)
4284 except DecodeError as err:
4285 raise err.__class__(
4287 klass=self.__class__,
4288 decode_path=decode_path,
4292 raise NotEnoughData(
4293 "encoded length is longer than data",
4294 klass=self.__class__,
4295 decode_path=decode_path,
4299 raise NotEnoughData(
4301 klass=self.__class__,
4302 decode_path=decode_path,
4305 v, tail = v[:l], v[l:]
4312 octet = indexbytes(v, i)
4313 if i == 0 and octet == 0x80:
4314 if ctx.get("bered", False):
4318 "non normalized arc encoding",
4319 klass=self.__class__,
4320 decode_path=decode_path,
4323 arc = (arc << 7) | (octet & 0x7F)
4324 if octet & 0x80 == 0:
4327 except OverflowError:
4329 "too huge value for local unsigned long",
4330 klass=self.__class__,
4331 decode_path=decode_path,
4340 klass=self.__class__,
4341 decode_path=decode_path,
4345 second_arc = arcs[0]
4346 if 0 <= second_arc <= 39:
4348 elif 40 <= second_arc <= 79:
4354 obj = self.__class__(
4355 value=array("L", (first_arc, second_arc)) + arcs[1:],
4358 default=self.default,
4359 optional=self.optional,
4360 _decoded=(offset, llen, l),
4363 obj.ber_encoded = True
4364 yield decode_path, obj, tail
4367 return pp_console_row(next(self.pps()))
4369 def pps(self, decode_path=()):
4372 asn1_type_name=self.asn1_type_name,
4373 obj_name=self.__class__.__name__,
4374 decode_path=decode_path,
4375 value=str(self) if self.ready else None,
4376 optional=self.optional,
4377 default=self == self.default,
4378 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4379 expl=None if self._expl is None else tag_decode(self._expl),
4384 expl_offset=self.expl_offset if self.expled else None,
4385 expl_tlen=self.expl_tlen if self.expled else None,
4386 expl_llen=self.expl_llen if self.expled else None,
4387 expl_vlen=self.expl_vlen if self.expled else None,
4388 expl_lenindef=self.expl_lenindef,
4389 ber_encoded=self.ber_encoded,
4392 for pp in self.pps_lenindef(decode_path):
4396 class Enumerated(Integer):
4397 """``ENUMERATED`` integer type
4399 This type is identical to :py:class:`pyderasn.Integer`, but requires
4400 schema to be specified and does not accept values missing from it.
4403 tag_default = tag_encode(10)
4404 asn1_type_name = "ENUMERATED"
4415 bounds=None, # dummy argument, workability for Integer.decode
4417 super(Enumerated, self).__init__(
4418 value, bounds, impl, expl, default, optional, _specs, _decoded,
4420 if len(self.specs) == 0:
4421 raise ValueError("schema must be specified")
4423 def _value_sanitize(self, value):
4424 if isinstance(value, self.__class__):
4425 value = value._value
4426 elif isinstance(value, integer_types):
4427 for _value in itervalues(self.specs):
4432 "unknown integer value: %s" % value,
4433 klass=self.__class__,
4435 elif isinstance(value, string_types):
4436 value = self.specs.get(value)
4438 raise ObjUnknown("integer value: %s" % value)
4440 raise InvalidValueType((self.__class__, int, str))
4452 return self.__class__(
4454 impl=self.tag if impl is None else impl,
4455 expl=self._expl if expl is None else expl,
4456 default=self.default if default is None else default,
4457 optional=self.optional if optional is None else optional,
4462 def escape_control_unicode(c):
4463 if unicat(c)[0] == "C":
4464 c = repr(c).lstrip("u").strip("'")
4468 class CommonString(OctetString):
4469 """Common class for all strings
4471 Everything resembles :py:class:`pyderasn.OctetString`, except
4472 ability to deal with unicode text strings.
4474 >>> hexenc("привет мир".encode("utf-8"))
4475 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4476 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4478 >>> s = UTF8String("привет мир")
4479 UTF8String UTF8String привет мир
4481 'привет мир'
4482 >>> hexenc(bytes(s))
4483 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4485 >>> PrintableString("привет мир")
4486 Traceback (most recent call last):
4487 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4489 >>> BMPString("ада", bounds=(2, 2))
4490 Traceback (most recent call last):
4491 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4492 >>> s = BMPString("ад", bounds=(2, 2))
4495 >>> hexenc(bytes(s))
4503 * - :py:class:`pyderasn.UTF8String`
4505 * - :py:class:`pyderasn.NumericString`
4507 * - :py:class:`pyderasn.PrintableString`
4509 * - :py:class:`pyderasn.TeletexString`
4511 * - :py:class:`pyderasn.T61String`
4513 * - :py:class:`pyderasn.VideotexString`
4515 * - :py:class:`pyderasn.IA5String`
4517 * - :py:class:`pyderasn.GraphicString`
4519 * - :py:class:`pyderasn.VisibleString`
4521 * - :py:class:`pyderasn.ISO646String`
4523 * - :py:class:`pyderasn.GeneralString`
4525 * - :py:class:`pyderasn.UniversalString`
4527 * - :py:class:`pyderasn.BMPString`
4532 def _value_sanitize(self, value):
4534 value_decoded = None
4535 if isinstance(value, self.__class__):
4536 value_raw = value._value
4537 elif value.__class__ == text_type:
4538 value_decoded = value
4539 elif value.__class__ == binary_type:
4542 raise InvalidValueType((self.__class__, text_type, binary_type))
4545 value_decoded.encode(self.encoding)
4546 if value_raw is None else value_raw
4549 value_raw.decode(self.encoding)
4550 if value_decoded is None else value_decoded
4552 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4553 raise DecodeError(str(err))
4554 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4562 def __eq__(self, their):
4563 if their.__class__ == binary_type:
4564 return self._value == their
4565 if their.__class__ == text_type:
4566 return self._value == their.encode(self.encoding)
4567 if not isinstance(their, self.__class__):
4570 self._value == their._value and
4571 self.tag == their.tag and
4572 self._expl == their._expl
4575 def __unicode__(self):
4577 return self._value.decode(self.encoding)
4578 return text_type(self._value)
4581 return pp_console_row(next(self.pps(no_unicode=PY2)))
4583 def pps(self, decode_path=(), no_unicode=False):
4587 hexenc(bytes(self)) if no_unicode else
4588 "".join(escape_control_unicode(c) for c in self.__unicode__())
4592 asn1_type_name=self.asn1_type_name,
4593 obj_name=self.__class__.__name__,
4594 decode_path=decode_path,
4596 optional=self.optional,
4597 default=self == self.default,
4598 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4599 expl=None if self._expl is None else tag_decode(self._expl),
4604 expl_offset=self.expl_offset if self.expled else None,
4605 expl_tlen=self.expl_tlen if self.expled else None,
4606 expl_llen=self.expl_llen if self.expled else None,
4607 expl_vlen=self.expl_vlen if self.expled else None,
4608 expl_lenindef=self.expl_lenindef,
4609 ber_encoded=self.ber_encoded,
4612 for pp in self.pps_lenindef(decode_path):
4616 class UTF8String(CommonString):
4618 tag_default = tag_encode(12)
4620 asn1_type_name = "UTF8String"
4623 class AllowableCharsMixin(object):
4625 def allowable_chars(self):
4627 return self._allowable_chars
4628 return frozenset(six_unichr(c) for c in self._allowable_chars)
4631 class NumericString(AllowableCharsMixin, CommonString):
4634 Its value is properly sanitized: only ASCII digits with spaces can
4637 >>> NumericString().allowable_chars
4638 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4641 tag_default = tag_encode(18)
4643 asn1_type_name = "NumericString"
4644 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4646 def _value_sanitize(self, value):
4647 value = super(NumericString, self)._value_sanitize(value)
4648 if not frozenset(value) <= self._allowable_chars:
4649 raise DecodeError("non-numeric value")
4653 PrintableStringState = namedtuple(
4654 "PrintableStringState",
4655 OctetStringState._fields + ("allowable_chars",),
4660 class PrintableString(AllowableCharsMixin, CommonString):
4663 Its value is properly sanitized: see X.680 41.4 table 10.
4665 >>> PrintableString().allowable_chars
4666 frozenset([' ', "'", ..., 'z'])
4667 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4668 PrintableString PrintableString foo*bar
4669 >>> obj.allow_asterisk, obj.allow_ampersand
4673 tag_default = tag_encode(19)
4675 asn1_type_name = "PrintableString"
4676 _allowable_chars = frozenset(
4677 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4679 _asterisk = frozenset("*".encode("ascii"))
4680 _ampersand = frozenset("&".encode("ascii"))
4692 allow_asterisk=False,
4693 allow_ampersand=False,
4696 :param allow_asterisk: allow asterisk character
4697 :param allow_ampersand: allow ampersand character
4700 self._allowable_chars |= self._asterisk
4702 self._allowable_chars |= self._ampersand
4703 super(PrintableString, self).__init__(
4704 value, bounds, impl, expl, default, optional, _decoded, ctx,
4708 def allow_asterisk(self):
4709 """Is asterisk character allowed?
4711 return self._asterisk <= self._allowable_chars
4714 def allow_ampersand(self):
4715 """Is ampersand character allowed?
4717 return self._ampersand <= self._allowable_chars
4719 def _value_sanitize(self, value):
4720 value = super(PrintableString, self)._value_sanitize(value)
4721 if not frozenset(value) <= self._allowable_chars:
4722 raise DecodeError("non-printable value")
4725 def __getstate__(self):
4726 return PrintableStringState(
4727 *super(PrintableString, self).__getstate__(),
4728 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore the object from a previously captured state tuple

    :param state: state object carrying, besides the fields consumed
                  by the parent class, an ``allowable_chars`` attribute
    """
    parent = super(PrintableString, self)
    parent.__setstate__(state)
    # The per-instance character set is not known to the parent class
    self._allowable_chars = state.allowable_chars
4744 return self.__class__(
4747 (self._bound_min, self._bound_max)
4748 if bounds is None else bounds
4750 impl=self.tag if impl is None else impl,
4751 expl=self._expl if expl is None else expl,
4752 default=self.default if default is None else default,
4753 optional=self.optional if optional is None else optional,
4754 allow_asterisk=self.allow_asterisk,
4755 allow_ampersand=self.allow_ampersand,
4759 class TeletexString(CommonString):
4761 tag_default = tag_encode(20)
4763 asn1_type_name = "TeletexString"
4766 class T61String(TeletexString):
4768 asn1_type_name = "T61String"
4771 class VideotexString(CommonString):
4773 tag_default = tag_encode(21)
4774 encoding = "iso-8859-1"
4775 asn1_type_name = "VideotexString"
4778 class IA5String(CommonString):
4780 tag_default = tag_encode(22)
4782 asn1_type_name = "IA5"
# Lengths of the time string layouts the strict parsers compare against:
# the first is the only length accepted by UTCTime._strptime, the second
# is the minimal length of a GeneralizedTime with fractions (the parser
# expects "." at index 14), the third is a GeneralizedTime without them.
LEN_YYMMDDHHMMSSZ, LEN_YYYYMMDDHHMMSSDMZ, LEN_YYYYMMDDHHMMSSZ = (
    len(layout) for layout in (
        "YYMMDDHHMMSSZ",
        "YYYYMMDDHHMMSSDMZ",
        "YYYYMMDDHHMMSSZ",
    )
)
4790 class VisibleString(CommonString):
4792 tag_default = tag_encode(26)
4794 asn1_type_name = "VisibleString"
4797 UTCTimeState = namedtuple(
4799 OctetStringState._fields + ("ber_raw",),
4804 def str_to_time_fractions(value):
4806 year, v = (v // 10**10), (v % 10**10)
4807 month, v = (v // 10**8), (v % 10**8)
4808 day, v = (v // 10**6), (v % 10**6)
4809 hour, v = (v // 10**4), (v % 10**4)
4810 minute, second = (v // 100), (v % 100)
4811 return year, month, day, hour, minute, second
4814 class UTCTime(VisibleString):
4815 """``UTCTime`` datetime type
4817 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
4818 UTCTime UTCTime 2017-09-30T22:07:50
4824 datetime.datetime(2017, 9, 30, 22, 7, 50)
4825 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
4826 datetime.datetime(1957, 9, 30, 22, 7, 50)
4828 If BER encoded value was met, then ``ber_raw`` attribute will hold
4829 its raw representation.
4833 Pay attention that UTCTime can not hold full year, so all years
4834 having < 50 years are treated as 20xx, 19xx otherwise, according
4835 to X.509 recommendation.
4839 No strict validation of UTC offsets are made, but very crude:
4841 * minutes are not exceeding 60
4842 * offset value is not exceeding 14 hours
4844 __slots__ = ("ber_raw",)
4845 tag_default = tag_encode(23)
4847 asn1_type_name = "UTCTime"
4848 evgen_mode_skip_value = False
4858 bounds=None, # dummy argument, workability for OctetString.decode
4862 :param value: set the value. Either datetime type, or
4863 :py:class:`pyderasn.UTCTime` object
4864 :param bytes impl: override default tag with ``IMPLICIT`` one
4865 :param bytes expl: override default tag with ``EXPLICIT`` one
4866 :param default: set default value. Type same as in ``value``
4867 :param bool optional: is object ``OPTIONAL`` in sequence
4869 super(UTCTime, self).__init__(
4870 None, None, impl, expl, None, optional, _decoded, ctx,
4874 if value is not None:
4875 self._value, self.ber_raw = self._value_sanitize(value, ctx)
4876 self.ber_encoded = self.ber_raw is not None
4877 if default is not None:
4878 default, _ = self._value_sanitize(default)
4879 self.default = self.__class__(
4884 if self._value is None:
4885 self._value = default
4887 self.optional = optional
4889 def _strptime_bered(self, value):
4890 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
4893 raise ValueError("no timezone")
4894 year += 2000 if year < 50 else 1900
4895 decoded = datetime(year, month, day, hour, minute)
4897 if value[-1] == "Z":
4901 raise ValueError("invalid UTC offset")
4902 if value[-5] == "-":
4904 elif value[-5] == "+":
4907 raise ValueError("invalid UTC offset")
4908 v = pureint(value[-4:])
4909 offset, v = (60 * (v % 100)), v // 100
4911 raise ValueError("invalid UTC offset minutes")
4913 if offset > 14 * 3600:
4914 raise ValueError("too big UTC offset")
4918 return offset, decoded
4920 raise ValueError("invalid UTC offset seconds")
4921 seconds = pureint(value)
4923 raise ValueError("invalid seconds value")
4924 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse strict ``YYMMDDHHMMSSZ`` text into a datetime

    Hand written replacement of datetime.strptime with the
    ``%y%m%d%H%M%SZ`` format. Two digit years below 50 are placed in
    the 20xx century, all others in 19xx.

    :param value: text holding the time
    :returns: datetime object without timezone information
    :raises ValueError: when the length differs from the expected one,
                        or the last character is not ``Z``
    """
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    numbers, zone = value[:-1], value[-1]
    if zone != "Z":
        raise ValueError("non UTC timezone")
    fractions = str_to_time_fractions(numbers)
    short_year = fractions[0]
    century = 2000 if short_year < 50 else 1900
    return datetime(short_year + century, *fractions[1:])
4936 def _dt_sanitize(self, value):
4937 if value.year < 1950 or value.year > 2049:
4938 raise ValueError("UTCTime can hold only 1950-2049 years")
4939 return value.replace(microsecond=0)
4941 def _value_sanitize(self, value, ctx=None):
4942 if value.__class__ == binary_type:
4944 value_decoded = value.decode("ascii")
4945 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4946 raise DecodeError("invalid UTCTime encoding: %r" % err)
4949 return self._strptime(value_decoded), None
4950 except (TypeError, ValueError) as _err:
4952 if (ctx is not None) and ctx.get("bered", False):
4954 offset, _value = self._strptime_bered(value_decoded)
4955 _value = _value - timedelta(seconds=offset)
4956 return self._dt_sanitize(_value), value
4957 except (TypeError, ValueError, OverflowError) as _err:
4960 "invalid %s format: %r" % (self.asn1_type_name, err),
4961 klass=self.__class__,
4963 if isinstance(value, self.__class__):
4964 return value._value, None
4965 if value.__class__ == datetime:
4966 return self._dt_sanitize(value), None
4967 raise InvalidValueType((self.__class__, datetime))
4969 def _pp_value(self):
4971 value = self._value.isoformat()
4972 if self.ber_encoded:
4973 value += " (%s)" % self.ber_raw
4977 def __unicode__(self):
4979 value = self._value.isoformat()
4980 if self.ber_encoded:
4981 value += " (%s)" % self.ber_raw
4983 return text_type(self._pp_value())
4985 def __getstate__(self):
4986 return UTCTimeState(
4987 *super(UTCTime, self).__getstate__(),
4988 **{"ber_raw": self.ber_raw}
def __setstate__(self, state):
    """Restore the object from a previously captured state tuple

    :param state: state object carrying, besides the fields consumed
                  by the parent class, a ``ber_raw`` attribute
    """
    parent = super(UTCTime, self)
    parent.__setstate__(state)
    # Raw BER representation is tracked only by the time types
    self.ber_raw = state.ber_raw
4995 def __bytes__(self):
4996 self._assert_ready()
4997 return self._encode_time()
4999 def __eq__(self, their):
5000 if their.__class__ == binary_type:
5001 return self._encode_time() == their
5002 if their.__class__ == datetime:
5003 return self.todatetime() == their
5004 if not isinstance(their, self.__class__):
5007 self._value == their._value and
5008 self.tag == their.tag and
5009 self._expl == their._expl
5012 def _encode_time(self):
5013 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5016 self._assert_ready()
5017 value = self._encode_time()
5018 return b"".join((self.tag, len_encode(len(value)), value))
def _encode_cer(self, writer):
    """Emit the object through ``writer``

    The output of ``_encode`` is passed unchanged to ``write_full``.

    :param writer: destination handed over to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
5023 def todatetime(self):
5027 return pp_console_row(next(self.pps()))
5029 def pps(self, decode_path=()):
5032 asn1_type_name=self.asn1_type_name,
5033 obj_name=self.__class__.__name__,
5034 decode_path=decode_path,
5035 value=self._pp_value(),
5036 optional=self.optional,
5037 default=self == self.default,
5038 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5039 expl=None if self._expl is None else tag_decode(self._expl),
5044 expl_offset=self.expl_offset if self.expled else None,
5045 expl_tlen=self.expl_tlen if self.expled else None,
5046 expl_llen=self.expl_llen if self.expled else None,
5047 expl_vlen=self.expl_vlen if self.expled else None,
5048 expl_lenindef=self.expl_lenindef,
5049 ber_encoded=self.ber_encoded,
5052 for pp in self.pps_lenindef(decode_path):
5056 class GeneralizedTime(UTCTime):
5057 """``GeneralizedTime`` datetime type
5059 This type is similar to :py:class:`pyderasn.UTCTime`.
5061 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5062 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5064 '20170930220750.000123Z'
5065 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5066 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5070 Only microsecond fractions are supported in DER encoding.
5071 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5072 higher precision values.
5076 BER encoded data can loss information (accuracy) during decoding
5077 because of float transformations.
5081 Local times (without explicit timezone specification) are treated
5082 as UTC one, no transformations are made.
5086 Zero year is unsupported.
5089 tag_default = tag_encode(24)
5090 asn1_type_name = "GeneralizedTime"
5092 def _dt_sanitize(self, value):
5095 def _strptime_bered(self, value):
5096 if len(value) < 4 + 3 * 2:
5097 raise ValueError("invalid GeneralizedTime")
5098 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5099 decoded = datetime(year, month, day, hour)
5100 offset, value = 0, value[10:]
5102 return offset, decoded
5103 if value[-1] == "Z":
5106 for char, sign in (("-", -1), ("+", 1)):
5107 idx = value.rfind(char)
5110 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5111 v = pureint(offset_raw)
5112 if len(offset_raw) == 4:
5113 offset, v = (60 * (v % 100)), v // 100
5115 raise ValueError("invalid UTC offset minutes")
5116 elif len(offset_raw) == 2:
5119 raise ValueError("invalid UTC offset")
5121 if offset > 14 * 3600:
5122 raise ValueError("too big UTC offset")
5126 return offset, decoded
5127 if value[0] in DECIMAL_SIGNS:
5129 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5132 raise ValueError("stripped minutes")
5133 decoded += timedelta(seconds=60 * pureint(value[:2]))
5136 return offset, decoded
5137 if value[0] in DECIMAL_SIGNS:
5139 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5142 raise ValueError("stripped seconds")
5143 decoded += timedelta(seconds=pureint(value[:2]))
5146 return offset, decoded
5147 if value[0] not in DECIMAL_SIGNS:
5148 raise ValueError("invalid format after seconds")
5150 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5153 def _strptime(self, value):
5155 if l == LEN_YYYYMMDDHHMMSSZ:
5156 # datetime.strptime's format: %Y%m%d%H%M%SZ
5157 if value[-1] != "Z":
5158 raise ValueError("non UTC timezone")
5159 return datetime(*str_to_time_fractions(value[:-1]))
5160 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5161 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5162 if value[-1] != "Z":
5163 raise ValueError("non UTC timezone")
5164 if value[14] != ".":
5165 raise ValueError("no fractions separator")
5168 raise ValueError("trailing zero")
5171 raise ValueError("only microsecond fractions are supported")
5172 us = pureint(us + ("0" * (6 - us_len)))
5173 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5174 return datetime(year, month, day, hour, minute, second, us)
5175 raise ValueError("invalid GeneralizedTime length")
5177 def _encode_time(self):
5179 encoded = value.strftime("%Y%m%d%H%M%S")
5180 if value.microsecond > 0:
5181 encoded += (".%06d" % value.microsecond).rstrip("0")
5182 return (encoded + "Z").encode("ascii")
5185 class GraphicString(CommonString):
5187 tag_default = tag_encode(25)
5188 encoding = "iso-8859-1"
5189 asn1_type_name = "GraphicString"
5192 class ISO646String(VisibleString):
5194 asn1_type_name = "ISO646String"
5197 class GeneralString(CommonString):
5199 tag_default = tag_encode(27)
5200 encoding = "iso-8859-1"
5201 asn1_type_name = "GeneralString"
5204 class UniversalString(CommonString):
5206 tag_default = tag_encode(28)
5207 encoding = "utf-32-be"
5208 asn1_type_name = "UniversalString"
5211 class BMPString(CommonString):
5213 tag_default = tag_encode(30)
5214 encoding = "utf-16-be"
5215 asn1_type_name = "BMPString"
5218 ChoiceState = namedtuple(
5220 BasicState._fields + ("specs", "value",),
5226 """``CHOICE`` special type
5230 class GeneralName(Choice):
5232 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5233 ("dNSName", IA5String(impl=tag_ctxp(2))),
5236 >>> gn = GeneralName()
5238 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5239 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5240 >>> gn["dNSName"] = IA5String("bar.baz")
5241 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5242 >>> gn["rfc822Name"]
5245 [2] IA5String IA5 bar.baz
5248 >>> gn.value == gn["dNSName"]
5251 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5253 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5254 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5256 __slots__ = ("specs",)
5258 asn1_type_name = "CHOICE"
5271 :param value: set the value. Either ``(choice, value)`` tuple, or
5272 :py:class:`pyderasn.Choice` object
5273 :param bytes impl: can not be set, do **not** use it
5274 :param bytes expl: override default tag with ``EXPLICIT`` one
5275 :param default: set default value. Type same as in ``value``
5276 :param bool optional: is object ``OPTIONAL`` in sequence
5278 if impl is not None:
5279 raise ValueError("no implicit tag allowed for CHOICE")
5280 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5282 schema = getattr(self, "schema", ())
5283 if len(schema) == 0:
5284 raise ValueError("schema must be specified")
5286 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5289 if value is not None:
5290 self._value = self._value_sanitize(value)
5291 if default is not None:
5292 default_value = self._value_sanitize(default)
5293 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5294 default_obj.specs = self.specs
5295 default_obj._value = default_value
5296 self.default = default_obj
5298 self._value = copy(default_obj._value)
5299 if self._expl is not None:
5300 tag_class, _, tag_num = tag_decode(self._expl)
5301 self._tag_order = (tag_class, tag_num)
5303 def _value_sanitize(self, value):
5304 if (value.__class__ == tuple) and len(value) == 2:
5306 spec = self.specs.get(choice)
5308 raise ObjUnknown(choice)
5309 if not isinstance(obj, spec.__class__):
5310 raise InvalidValueType((spec,))
5311 return (choice, spec(obj))
5312 if isinstance(value, self.__class__):
5314 raise InvalidValueType((self.__class__, tuple))
5318 return self._value is not None and self._value[1].ready
5322 return self.expl_lenindef or (
5323 (self._value is not None) and
5324 self._value[1].bered
5327 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from a previously captured state tuple

    :param state: state object carrying, besides the fields consumed
                  by the parent class, ``specs`` and ``value``
                  attributes
    """
    super(Choice, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
5350 def __eq__(self, their):
5351 if (their.__class__ == tuple) and len(their) == 2:
5352 return self._value == their
5353 if not isinstance(their, self.__class__):
5356 self.specs == their.specs and
5357 self._value == their._value
5367 return self.__class__(
5370 expl=self._expl if expl is None else expl,
5371 default=self.default if default is None else default,
5372 optional=self.optional if optional is None else optional,
5377 """Name of the choice
5379 self._assert_ready()
5380 return self._value[0]
5384 """Value of underlying choice
5386 self._assert_ready()
5387 return self._value[1]
def tag_order(self):
    """Tag order of a ready CHOICE

    An explicitly stored ``_tag_order`` wins; otherwise the order is
    taken from the currently chosen value (second element of
    ``_value``).

    :returns: stored ``_tag_order``, or ``tag_order`` of the chosen
              value when nothing is stored
    """
    self._assert_ready()
    stored = self._tag_order
    if stored is not None:
        return stored
    return self._value[1].tag_order
def tag_order_cer(self):
    """Smallest ``tag_order_cer`` among all schema alternatives

    Unlike ``tag_order`` this looks at every entry of ``specs``, not
    at the currently chosen value.

    :returns: minimum of ``tag_order_cer`` over the values of ``specs``
    """
    candidates = (spec.tag_order_cer for spec in itervalues(self.specs))
    return min(candidates)
5398 def __getitem__(self, key):
5399 if key not in self.specs:
5400 raise ObjUnknown(key)
5401 if self._value is None:
5403 choice, value = self._value
5408 def __setitem__(self, key, value):
5409 spec = self.specs.get(key)
5411 raise ObjUnknown(key)
5412 if not isinstance(value, spec.__class__):
5413 raise InvalidValueType((spec.__class__,))
5414 self._value = (key, spec(value))
5422 return self._value[1].decoded if self.ready else False
5425 self._assert_ready()
5426 return self._value[1].encode()
5428 def _encode_cer(self, writer):
5429 self._assert_ready()
5430 self._value[1].encode_cer(writer)
5432 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5433 for choice, spec in iteritems(self.specs):
5434 sub_decode_path = decode_path + (choice,)
5440 decode_path=sub_decode_path,
5443 _ctx_immutable=False,
5450 klass=self.__class__,
5451 decode_path=decode_path,
5454 if tag_only: # pragma: no cover
5458 for _decode_path, value, tail in spec.decode_evgen(
5462 decode_path=sub_decode_path,
5464 _ctx_immutable=False,
5466 yield _decode_path, value, tail
5468 _, value, tail = next(spec.decode_evgen(
5472 decode_path=sub_decode_path,
5474 _ctx_immutable=False,
5477 obj = self.__class__(
5480 default=self.default,
5481 optional=self.optional,
5482 _decoded=(offset, 0, value.fulllen),
5484 obj._value = (choice, value)
5485 yield decode_path, obj, tail
5488 value = pp_console_row(next(self.pps()))
5490 value = "%s[%r]" % (value, self.value)
5493 def pps(self, decode_path=()):
5496 asn1_type_name=self.asn1_type_name,
5497 obj_name=self.__class__.__name__,
5498 decode_path=decode_path,
5499 value=self.choice if self.ready else None,
5500 optional=self.optional,
5501 default=self == self.default,
5502 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5503 expl=None if self._expl is None else tag_decode(self._expl),
5508 expl_lenindef=self.expl_lenindef,
5512 yield self.value.pps(decode_path=decode_path + (self.choice,))
5513 for pp in self.pps_lenindef(decode_path):
5517 class PrimitiveTypes(Choice):
5518 """Predefined ``CHOICE`` for all generic primitive types
5520 It could be useful for general decoding of some unspecified values:
5522 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5523 OCTET STRING 3 bytes 666f6f
5524 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5528 schema = tuple((klass.__name__, klass()) for klass in (
5552 AnyState = namedtuple(
5554 BasicState._fields + ("value", "defined"),
5560 """``ANY`` special type
5562 >>> Any(Integer(-123))
5563 ANY INTEGER -123 (0X:7B)
5564 >>> a = Any(OctetString(b"hello world").encode())
5565 ANY 040b68656c6c6f20776f726c64
5566 >>> hexenc(bytes(a))
5567 b'0x040x0bhello world'
5569 __slots__ = ("defined",)
5570 tag_default = tag_encode(0)
5571 asn1_type_name = "ANY"
5581 :param value: set the value. Either any kind of pyderasn's
5582 **ready** object, or bytes. Pay attention that
5583 **no** validation is performed if raw binary value
5584 is valid TLV, except just tag decoding
5585 :param bytes expl: override default tag with ``EXPLICIT`` one
5586 :param bool optional: is object ``OPTIONAL`` in sequence
5588 super(Any, self).__init__(None, expl, None, optional, _decoded)
5592 value = self._value_sanitize(value)
5594 if self._expl is None:
5595 if value.__class__ == binary_type:
5596 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5598 tag_class, tag_num = value.tag_order
5600 tag_class, _, tag_num = tag_decode(self._expl)
5601 self._tag_order = (tag_class, tag_num)
5604 def _value_sanitize(self, value):
5605 if value.__class__ == binary_type:
5607 raise ValueError("Any value can not be empty")
5609 if isinstance(value, self.__class__):
5611 if not isinstance(value, Obj):
5612 raise InvalidValueType((self.__class__, Obj, binary_type))
5617 return self._value is not None
def tag_order(self):
    """Tag order of a ready ANY

    :returns: the stored ``_tag_order`` attribute
    """
    self._assert_ready()
    stored = self._tag_order
    return stored
5626 if self.expl_lenindef or self.lenindef:
5628 if self.defined is None:
5630 return self.defined[1].bered
5632 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from a previously captured state tuple

    :param state: state object carrying, besides the fields consumed
                  by the parent class, ``value`` and ``defined``
                  attributes
    """
    super(Any, self).__setstate__(state)
    self._value, self.defined = state.value, state.defined
5655 def __eq__(self, their):
5656 if their.__class__ == binary_type:
5657 if self._value.__class__ == binary_type:
5658 return self._value == their
5659 return self._value.encode() == their
5660 if issubclass(their.__class__, Any):
5661 if self.ready and their.ready:
5662 return bytes(self) == bytes(their)
5663 return self.ready == their.ready
5672 return self.__class__(
5674 expl=self._expl if expl is None else expl,
5675 optional=self.optional if optional is None else optional,
5678 def __bytes__(self):
5679 self._assert_ready()
5681 if value.__class__ == binary_type:
5683 return self._value.encode()
5690 self._assert_ready()
5692 if value.__class__ == binary_type:
5694 return value.encode()
5696 def _encode_cer(self, writer):
5697 self._assert_ready()
5699 if value.__class__ == binary_type:
5700 write_full(writer, value)
5702 value.encode_cer(writer)
5704 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5706 t, tlen, lv = tag_strip(tlv)
5707 except DecodeError as err:
5708 raise err.__class__(
5710 klass=self.__class__,
5711 decode_path=decode_path,
5715 l, llen, v = len_decode(lv)
5716 except LenIndefForm as err:
5717 if not ctx.get("bered", False):
5718 raise err.__class__(
5720 klass=self.__class__,
5721 decode_path=decode_path,
5724 llen, vlen, v = 1, 0, lv[1:]
5725 sub_offset = offset + tlen + llen
5727 while v[:EOC_LEN].tobytes() != EOC:
5728 chunk, v = Any().decode(
5731 decode_path=decode_path + (str(chunk_i),),
5734 _ctx_immutable=False,
5736 vlen += chunk.tlvlen
5737 sub_offset += chunk.tlvlen
5739 tlvlen = tlen + llen + vlen + EOC_LEN
5740 obj = self.__class__(
5741 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5743 optional=self.optional,
5744 _decoded=(offset, 0, tlvlen),
5747 obj.tag = t.tobytes()
5748 yield decode_path, obj, v[EOC_LEN:]
5750 except DecodeError as err:
5751 raise err.__class__(
5753 klass=self.__class__,
5754 decode_path=decode_path,
5758 raise NotEnoughData(
5759 "encoded length is longer than data",
5760 klass=self.__class__,
5761 decode_path=decode_path,
5764 tlvlen = tlen + llen + l
5765 v, tail = tlv[:tlvlen], v[l:]
5766 obj = self.__class__(
5767 value=None if evgen_mode else v.tobytes(),
5769 optional=self.optional,
5770 _decoded=(offset, 0, tlvlen),
5772 obj.tag = t.tobytes()
5773 yield decode_path, obj, tail
5776 return pp_console_row(next(self.pps()))
5778 def pps(self, decode_path=()):
5782 elif value.__class__ == binary_type:
5788 asn1_type_name=self.asn1_type_name,
5789 obj_name=self.__class__.__name__,
5790 decode_path=decode_path,
5792 blob=self._value if self._value.__class__ == binary_type else None,
5793 optional=self.optional,
5794 default=self == self.default,
5795 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5796 expl=None if self._expl is None else tag_decode(self._expl),
5801 expl_offset=self.expl_offset if self.expled else None,
5802 expl_tlen=self.expl_tlen if self.expled else None,
5803 expl_llen=self.expl_llen if self.expled else None,
5804 expl_vlen=self.expl_vlen if self.expled else None,
5805 expl_lenindef=self.expl_lenindef,
5806 lenindef=self.lenindef,
5809 defined_by, defined = self.defined or (None, None)
5810 if defined_by is not None:
5812 decode_path=decode_path + (DecodePathDefBy(defined_by),)
5814 for pp in self.pps_lenindef(decode_path):
5818 ########################################################################
5819 # ASN.1 constructed types
5820 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can start with ".."
                     elements, each stripping the trailing element from
                     ``decode_path``. Only leading "/" and ".." elements
                     are special, everything after them is appended as is.
                     Empty tuple means ``decode_path`` itself
    :returns: absolute decode path, tuple of strings

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ('foo', 'bar', 'baz', 'whatever')
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ('foo', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ('baz', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ('foo',)
    """
    # Consume leading special elements one by one. The emptiness check
    # guards rel_path[0]: a relative path made only of ".." elements (or
    # an empty one) must resolve to a parent path instead of failing
    # with IndexError.
    while len(rel_path) > 0:
        head = rel_path[0]
        if head == "/":
            # Absolute path: everything collected so far is irrelevant
            return rel_path[1:]
        if head != "..":
            break
        # Going above the root silently stays at the root: ()[:-1] == ()
        decode_path, rel_path = decode_path[:-1], rel_path[1:]
    return decode_path + rel_path
5847 SequenceState = namedtuple(
5849 BasicState._fields + ("specs", "value",),
5854 class Sequence(Obj):
5855 """``SEQUENCE`` structure type
5857 You have to make specification of sequence::
5859 class Extension(Sequence):
5861 ("extnID", ObjectIdentifier()),
5862 ("critical", Boolean(default=False)),
5863 ("extnValue", OctetString()),
5866 Then, you can work with it as with dictionary.
5868 >>> ext = Extension()
5869 >>> Extension().specs
5871 ('extnID', OBJECT IDENTIFIER),
5872 ('critical', BOOLEAN False OPTIONAL DEFAULT),
5873 ('extnValue', OCTET STRING),
5875 >>> ext["extnID"] = "1.2.3"
5876 Traceback (most recent call last):
5877 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
5878 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
5880 You can determine if sequence is ready to be encoded:
5885 Traceback (most recent call last):
5886 pyderasn.ObjNotReady: object is not ready: extnValue
5887 >>> ext["extnValue"] = OctetString(b"foobar")
5891 The value you assign must have the same **type** as the
5892 corresponding specification entry, but it may carry different tags and
5893 optional/default attributes -- those are taken from the specification
5896 class TBSCertificate(Sequence):
5898 ("version", Version(expl=tag_ctxc(0), default="v1")),
5901 >>> tbs = TBSCertificate()
5902 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
5904 Assign ``None`` to remove value from sequence.
5906 You can set values in Sequence during its initialization:
5908 >>> AlgorithmIdentifier((
5909 ("algorithm", ObjectIdentifier("1.2.3")),
5910 ("parameters", Any(Null()))
5912 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
5914 You can determine if value exists/set in the sequence and take its value:
5916 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
5919 OBJECT IDENTIFIER 1.2.3
5921 Note that a field holding its default value is not stored in the
5922 sequence (``in`` returns ``False``, because ``DEFAULT`` must not be
5923 encoded in DER), but you can still read its value:
5925 >>> "critical" in ext, ext["critical"]
5926 (False, BOOLEAN False)
5927 >>> ext["critical"] = Boolean(True)
5928 >>> "critical" in ext, ext["critical"]
5929 (True, BOOLEAN True)
5931 All defaulted values are always optional.
5933 .. _allow_default_values_ctx:
5935 DER prohibits encoding of default values, so an error is raised if
5936 a default value is unexpectedly met during decoding.
5937 If the :ref:`bered <bered_ctx>` context option is set, then no error
5938 is raised, but the ``bered`` attribute is set instead. You can disable
5939 the strict check for encoded default values by setting the
5940 ``"allow_default_values": True`` :ref:`context <ctx>` option.
5944 Check for default value existence is not performed in
5945 ``evgen_mode``, because previously decoded values are not stored
5946 in memory, to be able to compare them.
5948 Two sequences are equal if they have equal specification (schema),
5949 implicit/explicit tagging and the same values.
5951 __slots__ = ("specs",)
5952 tag_default = tag_encode(form=TagFormConstructed, num=16)
5953 asn1_type_name = "SEQUENCE"
5965 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
5967 schema = getattr(self, "schema", ())
5969 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5972 if value is not None:
5973 if issubclass(value.__class__, Sequence):
5974 self._value = value._value
5975 elif hasattr(value, "__iter__"):
5976 for seq_key, seq_value in value:
5977 self[seq_key] = seq_value
5979 raise InvalidValueType((Sequence,))
5980 if default is not None:
5981 if not issubclass(default.__class__, Sequence):
5982 raise InvalidValueType((Sequence,))
5983 default_value = default._value
5984 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5985 default_obj.specs = self.specs
5986 default_obj._value = default_value
5987 self.default = default_obj
5989 self._value = copy(default_obj._value)
5993 for name, spec in iteritems(self.specs):
5994 value = self._value.get(name)
6005 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6007 return any(value.bered for value in itervalues(self._value))
6009 def __getstate__(self):
6010 return SequenceState(
6024 {k: copy(v) for k, v in iteritems(self._value)},
6027 def __setstate__(self, state):
6028 super(Sequence, self).__setstate__(state)
6029 self.specs = state.specs
6030 self._value = state.value
6032 def __eq__(self, their):
6033 if not isinstance(their, self.__class__):
6036 self.specs == their.specs and
6037 self.tag == their.tag and
6038 self._expl == their._expl and
6039 self._value == their._value
6050 return self.__class__(
6053 impl=self.tag if impl is None else impl,
6054 expl=self._expl if expl is None else expl,
6055 default=self.default if default is None else default,
6056 optional=self.optional if optional is None else optional,
6059 def __contains__(self, key):
6060 return key in self._value
6062 def __setitem__(self, key, value):
6063 spec = self.specs.get(key)
6065 raise ObjUnknown(key)
6067 self._value.pop(key, None)
6069 if not isinstance(value, spec.__class__):
6070 raise InvalidValueType((spec.__class__,))
6071 value = spec(value=value)
6072 if spec.default is not None and value == spec.default:
6073 self._value.pop(key, None)
6075 self._value[key] = value
6077 def __getitem__(self, key):
6078 value = self._value.get(key)
6079 if value is not None:
6081 spec = self.specs.get(key)
6083 raise ObjUnknown(key)
6084 if spec.default is not None:
6088 def _values_for_encoding(self):
6089 for name, spec in iteritems(self.specs):
6090 value = self._value.get(name)
6094 raise ObjNotReady(name)
6098 v = b"".join(v.encode() for v in self._values_for_encoding())
6099 return b"".join((self.tag, len_encode(len(v)), v))
6101 def _encode_cer(self, writer):
6102 write_full(writer, self.tag + LENINDEF)
6103 for v in self._values_for_encoding():
6104 v.encode_cer(writer)
6105 write_full(writer, EOC)
6107 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6109 t, tlen, lv = tag_strip(tlv)
6110 except DecodeError as err:
6111 raise err.__class__(
6113 klass=self.__class__,
6114 decode_path=decode_path,
6119 klass=self.__class__,
6120 decode_path=decode_path,
6123 if tag_only: # pragma: no cover
6127 ctx_bered = ctx.get("bered", False)
6129 l, llen, v = len_decode(lv)
6130 except LenIndefForm as err:
6132 raise err.__class__(
6134 klass=self.__class__,
6135 decode_path=decode_path,
6138 l, llen, v = 0, 1, lv[1:]
6140 except DecodeError as err:
6141 raise err.__class__(
6143 klass=self.__class__,
6144 decode_path=decode_path,
6148 raise NotEnoughData(
6149 "encoded length is longer than data",
6150 klass=self.__class__,
6151 decode_path=decode_path,
6155 v, tail = v[:l], v[l:]
6157 sub_offset = offset + tlen + llen
6160 ctx_allow_default_values = ctx.get("allow_default_values", False)
6161 for name, spec in iteritems(self.specs):
6162 if spec.optional and (
6163 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6167 sub_decode_path = decode_path + (name,)
6170 for _decode_path, value, v_tail in spec.decode_evgen(
6174 decode_path=sub_decode_path,
6176 _ctx_immutable=False,
6178 yield _decode_path, value, v_tail
6180 _, value, v_tail = next(spec.decode_evgen(
6184 decode_path=sub_decode_path,
6186 _ctx_immutable=False,
6189 except TagMismatch as err:
6190 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6194 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6195 if not evgen_mode and defined is not None:
6196 defined_by, defined_spec = defined
6197 if issubclass(value.__class__, SequenceOf):
6198 for i, _value in enumerate(value):
6199 sub_sub_decode_path = sub_decode_path + (
6201 DecodePathDefBy(defined_by),
6203 defined_value, defined_tail = defined_spec.decode(
6204 memoryview(bytes(_value)),
6206 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6207 if value.expled else (value.tlen + value.llen)
6210 decode_path=sub_sub_decode_path,
6212 _ctx_immutable=False,
6214 if len(defined_tail) > 0:
6217 klass=self.__class__,
6218 decode_path=sub_sub_decode_path,
6221 _value.defined = (defined_by, defined_value)
6223 defined_value, defined_tail = defined_spec.decode(
6224 memoryview(bytes(value)),
6226 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6227 if value.expled else (value.tlen + value.llen)
6230 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6232 _ctx_immutable=False,
6234 if len(defined_tail) > 0:
6237 klass=self.__class__,
6238 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6241 value.defined = (defined_by, defined_value)
6243 value_len = value.fulllen
6245 sub_offset += value_len
6248 if spec.default is not None and value == spec.default:
6249 # This will not work in evgen_mode
6250 if ctx_bered or ctx_allow_default_values:
6254 "DEFAULT value met",
6255 klass=self.__class__,
6256 decode_path=sub_decode_path,
6259 values[name] = value
6260 spec_defines = getattr(spec, "defines", ())
6261 if len(spec_defines) == 0:
6262 defines_by_path = ctx.get("defines_by_path", ())
6263 if len(defines_by_path) > 0:
6264 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6265 if spec_defines is not None and len(spec_defines) > 0:
6266 for rel_path, schema in spec_defines:
6267 defined = schema.get(value, None)
6268 if defined is not None:
6269 ctx.setdefault("_defines", []).append((
6270 abs_decode_path(sub_decode_path[:-1], rel_path),
6274 if v[:EOC_LEN].tobytes() != EOC:
6277 klass=self.__class__,
6278 decode_path=decode_path,
6286 klass=self.__class__,
6287 decode_path=decode_path,
6290 obj = self.__class__(
6294 default=self.default,
6295 optional=self.optional,
6296 _decoded=(offset, llen, vlen),
6299 obj.lenindef = lenindef
6300 obj.ber_encoded = ber_encoded
6301 yield decode_path, obj, tail
6304 value = pp_console_row(next(self.pps()))
6306 for name in self.specs:
6307 _value = self._value.get(name)
6310 cols.append("%s: %s" % (name, repr(_value)))
6311 return "%s[%s]" % (value, "; ".join(cols))
6313 def pps(self, decode_path=()):
6316 asn1_type_name=self.asn1_type_name,
6317 obj_name=self.__class__.__name__,
6318 decode_path=decode_path,
6319 optional=self.optional,
6320 default=self == self.default,
6321 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6322 expl=None if self._expl is None else tag_decode(self._expl),
6327 expl_offset=self.expl_offset if self.expled else None,
6328 expl_tlen=self.expl_tlen if self.expled else None,
6329 expl_llen=self.expl_llen if self.expled else None,
6330 expl_vlen=self.expl_vlen if self.expled else None,
6331 expl_lenindef=self.expl_lenindef,
6332 lenindef=self.lenindef,
6333 ber_encoded=self.ber_encoded,
6336 for name in self.specs:
6337 value = self._value.get(name)
6340 yield value.pps(decode_path=decode_path + (name,))
6341 for pp in self.pps_lenindef(decode_path):
6345 class Set(Sequence):
6346 """``SET`` structure type
6348 Its usage is identical to :py:class:`pyderasn.Sequence`.
6350 .. _allow_unordered_set_ctx:
6352 DER prohibits unordered values encoding and will raise an error
6353 during decode. If :ref:`bered <bered_ctx>` context option is set,
6354 then no error will occur. Also you can disable strict values
6355 ordering check by setting ``"allow_unordered_set": True``
6356 :ref:`context <ctx>` option.
6359 tag_default = tag_encode(form=TagFormConstructed, num=17)
6360 asn1_type_name = "SET"
6363 v = b"".join(value.encode() for value in sorted(
6364 self._values_for_encoding(),
6365 key=attrgetter("tag_order"),
6367 return b"".join((self.tag, len_encode(len(v)), v))
6369 def _encode_cer(self, writer):
6370 write_full(writer, self.tag + LENINDEF)
6372 self._values_for_encoding(),
6373 key=attrgetter("tag_order_cer"),
6375 v.encode_cer(writer)
6376 write_full(writer, EOC)
6378 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6380 t, tlen, lv = tag_strip(tlv)
6381 except DecodeError as err:
6382 raise err.__class__(
6384 klass=self.__class__,
6385 decode_path=decode_path,
6390 klass=self.__class__,
6391 decode_path=decode_path,
6398 ctx_bered = ctx.get("bered", False)
6400 l, llen, v = len_decode(lv)
6401 except LenIndefForm as err:
6403 raise err.__class__(
6405 klass=self.__class__,
6406 decode_path=decode_path,
6409 l, llen, v = 0, 1, lv[1:]
6411 except DecodeError as err:
6412 raise err.__class__(
6414 klass=self.__class__,
6415 decode_path=decode_path,
6419 raise NotEnoughData(
6420 "encoded length is longer than data",
6421 klass=self.__class__,
6425 v, tail = v[:l], v[l:]
6427 sub_offset = offset + tlen + llen
6430 ctx_allow_default_values = ctx.get("allow_default_values", False)
6431 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6432 tag_order_prev = (0, 0)
6433 _specs_items = copy(self.specs)
6436 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6438 for name, spec in iteritems(_specs_items):
6439 sub_decode_path = decode_path + (name,)
6445 decode_path=sub_decode_path,
6448 _ctx_immutable=False,
6455 klass=self.__class__,
6456 decode_path=decode_path,
6460 for _decode_path, value, v_tail in spec.decode_evgen(
6464 decode_path=sub_decode_path,
6466 _ctx_immutable=False,
6468 yield _decode_path, value, v_tail
6470 _, value, v_tail = next(spec.decode_evgen(
6474 decode_path=sub_decode_path,
6476 _ctx_immutable=False,
6479 value_tag_order = value.tag_order
6480 value_len = value.fulllen
6481 if tag_order_prev >= value_tag_order:
6482 if ctx_bered or ctx_allow_unordered_set:
6486 "unordered " + self.asn1_type_name,
6487 klass=self.__class__,
6488 decode_path=sub_decode_path,
6491 if spec.default is None or value != spec.default:
6493 elif ctx_bered or ctx_allow_default_values:
6497 "DEFAULT value met",
6498 klass=self.__class__,
6499 decode_path=sub_decode_path,
6502 values[name] = value
6503 del _specs_items[name]
6504 tag_order_prev = value_tag_order
6505 sub_offset += value_len
6509 obj = self.__class__(
6513 default=self.default,
6514 optional=self.optional,
6515 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6518 if v[:EOC_LEN].tobytes() != EOC:
6521 klass=self.__class__,
6522 decode_path=decode_path,
6527 for name, spec in iteritems(self.specs):
6528 if name not in values and not spec.optional:
6530 "%s value is not ready" % name,
6531 klass=self.__class__,
6532 decode_path=decode_path,
6537 obj.ber_encoded = ber_encoded
6538 yield decode_path, obj, tail
6541 SequenceOfState = namedtuple(
6543 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6548 class SequenceOf(Obj):
6549 """``SEQUENCE OF`` sequence type
6551 For this kind of type you must specify the type of object it will carry
6552 (the bounds here are only an example, they are not required)::
6554 class Ints(SequenceOf):
6559 >>> ints.append(Integer(123))
6560 >>> ints.append(Integer(234))
6562 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6563 >>> [int(i) for i in ints]
6565 >>> ints.append(Integer(345))
6566 Traceback (most recent call last):
6567 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6570 >>> ints[1] = Integer(345)
6572 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6574 You can initialize sequence with preinitialized values:
6576 >>> ints = Ints([Integer(123), Integer(234)])
6578 Also you can use iterator as a value:
6580 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6582 It will not be iterated until the encoding process. Note that in that
6583 case bounds and required schema checks are done only during the
6584 encoding process! After encoding, the value is reset back to an empty
6585 list and you have to set it again. This mode is mainly useful with
6586 the CER encoding mode, where all objects from the iterable are
6587 streamed to the buffer, without copying all of them to
6590 __slots__ = ("spec", "_bound_min", "_bound_max")
6591 tag_default = tag_encode(form=TagFormConstructed, num=16)
6592 asn1_type_name = "SEQUENCE OF"
6605 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6607 schema = getattr(self, "schema", None)
6609 raise ValueError("schema must be specified")
6611 self._bound_min, self._bound_max = getattr(
6615 ) if bounds is None else bounds
6617 if value is not None:
6618 self._value = self._value_sanitize(value)
6619 if default is not None:
6620 default_value = self._value_sanitize(default)
6621 default_obj = self.__class__(
6626 default_obj._value = default_value
6627 self.default = default_obj
6629 self._value = copy(default_obj._value)
6631 def _value_sanitize(self, value):
6633 if issubclass(value.__class__, SequenceOf):
6634 value = value._value
6635 elif hasattr(value, NEXT_ATTR_NAME):
6637 elif hasattr(value, "__iter__"):
6640 raise InvalidValueType((self.__class__, iter, "iterator"))
6642 if not self._bound_min <= len(value) <= self._bound_max:
6643 raise BoundsError(self._bound_min, len(value), self._bound_max)
6644 class_expected = self.spec.__class__
6646 if not isinstance(v, class_expected):
6647 raise InvalidValueType((class_expected,))
6652 if hasattr(self._value, NEXT_ATTR_NAME):
6654 if self._bound_min > 0 and len(self._value) == 0:
6656 return all(v.ready for v in self._value)
6660 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6662 return any(v.bered for v in self._value)
6664 def __getstate__(self):
6665 if hasattr(self._value, NEXT_ATTR_NAME):
6666 raise ValueError("can not pickle SequenceOf with iterator")
6667 return SequenceOfState(
6681 [copy(v) for v in self._value],
6686 def __setstate__(self, state):
6687 super(SequenceOf, self).__setstate__(state)
6688 self.spec = state.spec
6689 self._value = state.value
6690 self._bound_min = state.bound_min
6691 self._bound_max = state.bound_max
6693 def __eq__(self, their):
6694 if isinstance(their, self.__class__):
6696 self.spec == their.spec and
6697 self.tag == their.tag and
6698 self._expl == their._expl and
6699 self._value == their._value
6701 if hasattr(their, "__iter__"):
6702 return self._value == list(their)
6714 return self.__class__(
6718 (self._bound_min, self._bound_max)
6719 if bounds is None else bounds
6721 impl=self.tag if impl is None else impl,
6722 expl=self._expl if expl is None else expl,
6723 default=self.default if default is None else default,
6724 optional=self.optional if optional is None else optional,
6727 def __contains__(self, key):
6728 return key in self._value
6730 def append(self, value):
6731 if not isinstance(value, self.spec.__class__):
6732 raise InvalidValueType((self.spec.__class__,))
6733 if len(self._value) + 1 > self._bound_max:
6736 len(self._value) + 1,
6739 self._value.append(value)
6742 return iter(self._value)
6745 return len(self._value)
6747 def __setitem__(self, key, value):
6748 if not isinstance(value, self.spec.__class__):
6749 raise InvalidValueType((self.spec.__class__,))
6750 self._value[key] = self.spec(value=value)
6752 def __getitem__(self, key):
6753 return self._value[key]
6755 def _values_for_encoding(self):
6756 return iter(self._value)
6759 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6762 values_append = values.append
6763 class_expected = self.spec.__class__
6764 values_for_encoding = self._values_for_encoding()
6766 for v in values_for_encoding:
6767 if not isinstance(v, class_expected):
6768 raise InvalidValueType((class_expected,))
6769 values_append(v.encode())
6770 if not self._bound_min <= len(values) <= self._bound_max:
6771 raise BoundsError(self._bound_min, len(values), self._bound_max)
6772 value = b"".join(values)
6774 value = b"".join(v.encode() for v in self._values_for_encoding())
6775 return b"".join((self.tag, len_encode(len(value)), value))
6777 def _encode_cer(self, writer):
6778 write_full(writer, self.tag + LENINDEF)
6779 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6781 class_expected = self.spec.__class__
6783 values_for_encoding = self._values_for_encoding()
6785 for v in values_for_encoding:
6786 if not isinstance(v, class_expected):
6787 raise InvalidValueType((class_expected,))
6788 v.encode_cer(writer)
6790 if not self._bound_min <= values_count <= self._bound_max:
6791 raise BoundsError(self._bound_min, values_count, self._bound_max)
6793 for v in self._values_for_encoding():
6794 v.encode_cer(writer)
6795 write_full(writer, EOC)
6805 ordering_check=False,
6808 t, tlen, lv = tag_strip(tlv)
6809 except DecodeError as err:
6810 raise err.__class__(
6812 klass=self.__class__,
6813 decode_path=decode_path,
6818 klass=self.__class__,
6819 decode_path=decode_path,
6826 ctx_bered = ctx.get("bered", False)
6828 l, llen, v = len_decode(lv)
6829 except LenIndefForm as err:
6831 raise err.__class__(
6833 klass=self.__class__,
6834 decode_path=decode_path,
6837 l, llen, v = 0, 1, lv[1:]
6839 except DecodeError as err:
6840 raise err.__class__(
6842 klass=self.__class__,
6843 decode_path=decode_path,
6847 raise NotEnoughData(
6848 "encoded length is longer than data",
6849 klass=self.__class__,
6850 decode_path=decode_path,
6854 v, tail = v[:l], v[l:]
6856 sub_offset = offset + tlen + llen
6859 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6860 value_prev = memoryview(v[:0])
6864 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6866 sub_decode_path = decode_path + (str(_value_count),)
6868 for _decode_path, value, v_tail in spec.decode_evgen(
6872 decode_path=sub_decode_path,
6874 _ctx_immutable=False,
6876 yield _decode_path, value, v_tail
6878 _, value, v_tail = next(spec.decode_evgen(
6882 decode_path=sub_decode_path,
6884 _ctx_immutable=False,
6887 value_len = value.fulllen
6889 if value_prev.tobytes() > v[:value_len].tobytes():
6890 if ctx_bered or ctx_allow_unordered_set:
6894 "unordered " + self.asn1_type_name,
6895 klass=self.__class__,
6896 decode_path=sub_decode_path,
6899 value_prev = v[:value_len]
6902 _value.append(value)
6903 sub_offset += value_len
6906 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
6908 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
6909 klass=self.__class__,
6910 decode_path=decode_path,
6914 obj = self.__class__(
6915 value=None if evgen_mode else _value,
6917 bounds=(self._bound_min, self._bound_max),
6920 default=self.default,
6921 optional=self.optional,
6922 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6924 except BoundsError as err:
6927 klass=self.__class__,
6928 decode_path=decode_path,
6932 if v[:EOC_LEN].tobytes() != EOC:
6935 klass=self.__class__,
6936 decode_path=decode_path,
6941 obj.ber_encoded = ber_encoded
6942 yield decode_path, obj, tail
6946 pp_console_row(next(self.pps())),
6947 ", ".join(repr(v) for v in self._value),
6950 def pps(self, decode_path=()):
6953 asn1_type_name=self.asn1_type_name,
6954 obj_name=self.__class__.__name__,
6955 decode_path=decode_path,
6956 optional=self.optional,
6957 default=self == self.default,
6958 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6959 expl=None if self._expl is None else tag_decode(self._expl),
6964 expl_offset=self.expl_offset if self.expled else None,
6965 expl_tlen=self.expl_tlen if self.expled else None,
6966 expl_llen=self.expl_llen if self.expled else None,
6967 expl_vlen=self.expl_vlen if self.expled else None,
6968 expl_lenindef=self.expl_lenindef,
6969 lenindef=self.lenindef,
6970 ber_encoded=self.ber_encoded,
6973 for i, value in enumerate(self._value):
6974 yield value.pps(decode_path=decode_path + (str(i),))
6975 for pp in self.pps_lenindef(decode_path):
class SetOf(SequenceOf):
    """``SET OF`` sequence type

    Its usage is identical to :py:class:`pyderasn.SequenceOf`.
    """
    # NOTE(review): empty __slots__ restored to match the slotted base
    # classes of this module -- confirm against upstream
    __slots__ = ()
    tag_default = tag_encode(form=TagFormConstructed, num=17)
    asn1_type_name = "SET OF"

    def _value_sanitize(self, value):
        """Sanitize value as SequenceOf does, but refuse iterator values

        :raises ValueError: if sanitized value is an iterator
        """
        value = super(SetOf, self)._value_sanitize(value)
        if hasattr(value, NEXT_ATTR_NAME):
            # NOTE(review): exception class restored from context (same
            # class as used for iterator values in
            # SequenceOf.__getstate__) -- confirm against upstream
            raise ValueError(
                "SetOf does not support iterator values, as no sense in them"
            )
        return value

    def _encode(self):
        """Encode with elements sorted by their encoded representation

        :returns: tag, length and concatenated sorted element encodings
        """
        v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
        return b"".join((self.tag, len_encode(len(v)), v))

    def _encode_cer(self, writer):
        """Write indefinite-length encoding with sorted elements to writer

        Every element is encoded with ``encode_cer`` first, because the
        encodings have to be sorted before any of them is written.
        """
        write_full(writer, self.tag + LENINDEF)
        for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
            write_full(writer, v)
        write_full(writer, EOC)

    def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
        """Decode as SequenceOf does, but with elements ordering check on"""
        return super(SetOf, self)._decode(
            tlv,
            offset,
            decode_path,
            ctx,
            tag_only,
            evgen_mode,
            ordering_check=True,
        )
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    Modules must be separated from classes/functions with ``:``.

    :param str pypath: ``module.path:Object[.attribute...]``
    :returns: object found by importing the module and walking through
              the attributes chain
    :raises ValueError: if ``pypath`` has no ``:`` separator

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>
    """
    from importlib import import_module
    # Split on the last ":", the same place as rsplit(":", 1) would do,
    # but with an explicit error instead of tuple unpacking failure
    mod, sep, objs = pypath.rpartition(":")
    if sep == "":
        raise ValueError(
            "no \":\" separator between module and object: %r" % (pypath,)
        )
    obj = import_module(mod)
    for obj_name in objs.split("."):
        obj = getattr(obj, obj_name)
    return obj
7036 def generic_decoder(): # pragma: no cover
7037 # All of this below is a big hack with self references
7038 choice = PrimitiveTypes()
7039 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7040 choice.specs["SetOf"] = SetOf(schema=choice)
7041 for i in six_xrange(31):
7042 choice.specs["SequenceOf%d" % i] = SequenceOf(
7046 choice.specs["Any"] = Any()
7048 # Class name equals to type name, to omit it from output
7049 class SEQUENCEOF(SequenceOf):
7057 with_decode_path=False,
7058 decode_path_only=(),
7061 def _pprint_pps(pps):
7063 if hasattr(pp, "_fields"):
7065 decode_path_only != () and
7066 pp.decode_path[:len(decode_path_only)] != decode_path_only
7069 if pp.asn1_type_name == Choice.asn1_type_name:
7071 pp_kwargs = pp._asdict()
7072 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7073 pp = _pp(**pp_kwargs)
7074 yield pp_console_row(
7079 with_colours=with_colours,
7080 with_decode_path=with_decode_path,
7081 decode_path_len_decrease=len(decode_path_only),
7083 for row in pp_console_blob(
7085 decode_path_len_decrease=len(decode_path_only),
7089 for row in _pprint_pps(pp):
7091 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7092 return SEQUENCEOF(), pprint_any
7095 def main(): # pragma: no cover
7097 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7098 parser.add_argument(
7102 help="Skip that number of bytes from the beginning",
7104 parser.add_argument(
7106 help="Python paths to dictionary with OIDs, comma separated",
7108 parser.add_argument(
7110 help="Python path to schema definition to use",
7112 parser.add_argument(
7113 "--defines-by-path",
7114 help="Python path to decoder's defines_by_path",
7116 parser.add_argument(
7118 action="store_true",
7119 help="Disallow BER encoding",
7121 parser.add_argument(
7122 "--print-decode-path",
7123 action="store_true",
7124 help="Print decode paths",
7126 parser.add_argument(
7127 "--decode-path-only",
7128 help="Print only specified decode path",
7130 parser.add_argument(
7132 action="store_true",
7133 help="Allow explicit tag out-of-bound",
7135 parser.add_argument(
7137 action="store_true",
7138 help="Turn on event generation mode",
7140 parser.add_argument(
7142 type=argparse.FileType("rb"),
7143 help="Path to BER/CER/DER file you want to decode",
7145 args = parser.parse_args()
7147 args.RAWFile.seek(args.skip)
7148 raw = memoryview(args.RAWFile.read())
7149 args.RAWFile.close()
7151 raw = file_mmaped(args.RAWFile)[args.skip:]
7153 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7154 if args.oids else ()
7156 from functools import partial
7158 schema = obj_by_path(args.schema)
7159 pprinter = partial(pprint, big_blobs=True)
7161 schema, pprinter = generic_decoder()
7163 "bered": not args.nobered,
7164 "allow_expl_oob": args.allow_expl_oob,
7166 if args.defines_by_path is not None:
7167 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7168 from os import environ
7172 with_colours=environ.get("NO_COLOR") is None,
7173 with_decode_path=args.print_decode_path,
7175 () if args.decode_path_only is None else
7176 tuple(args.decode_path_only.split(":"))
7180 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7181 print(pprinter(obj, decode_path=decode_path))
7183 obj, tail = schema().decode(raw, ctx=ctx)
7184 print(pprinter(obj))
7186 print("\nTrailing data: %s" % hexenc(tail))
7189 if __name__ == "__main__":