3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
673 POSIX compliant systems have ``mmap`` syscall, giving ability to work
674 the memory mapped file. You can deal with the file like it was an
675 ordinary binary string, allowing you not to load it to the memory first.
676 Also you can use them as an input for OCTET STRING, taking no Python
677 memory for their storage.
679 There is convenient :py:func:`pyderasn.file_mmaped` function that
680 creates read-only memoryview on the file contents::
682 with open("huge", "rb") as fd:
683 raw = file_mmaped(fd)
684 obj = Something.decode(raw)
688 mmap-ed files in Python2.7 does not implement buffer protocol, so
689 memoryview won't work on them.
693 mmap maps the **whole** file. So it plays no role if you seek-ed it
694 before. Take the slice of the resulting memoryview with required
699 If you use ZFS as underlying storage, then pay attention that
700 currently most platforms does not deal good with ZFS ARC and ordinary
701 page cache used for mmaps. It can take twice the necessary size in
702 the memory: both in page cache and ZFS ARC.
707 We can parse any kind of data now, but how can we produce files
708 streamingly, without storing their encoded representation in memory?
709 SEQUENCE by default encodes in memory all its values, joins them in huge
710 binary string, just to know the exact size of SEQUENCE's value for
711 encoding it in TLV. DER requires you to know all exact sizes of the
714 You can use CER encoding mode, that slightly differs from the DER, but
715 does not require exact sizes knowledge, allowing streaming encoding
716 directly to some writer/buffer. Just use
717 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
718 encoded data will flow::
720 opener = io.open if PY2 else open
721 with opener("result", "wb") as fd:
722 obj.encode_cer(fd.write)
727 obj.encode_cer(buf.write)
729 If you do not want to create in-memory buffer every time, then you can
730 use :py:func:`pyderasn.encode_cer` function::
732 data = encode_cer(obj)
734 Remember that CER is **not valid** DER in most cases, so you **have to**
735 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
736 decoding. Also currently there is **no** validation that provided CER is
737 valid one -- you are sure that it has only valid BER encoding.
741 SET OF values can not be streamingly encoded, because they are
742 required to be sorted byte-by-byte. Big SET OF values still will take
743 much memory. Use neither SET nor SET OF values, as modern ASN.1
746 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
747 OCTET STRINGs! They will be streamingly copied from underlying file to
748 the buffer using 1 KB chunks.
750 Some structures require that some of the elements have to be forcefully
751 DER encoded. For example ``SignedData`` CMS requires you to encode
752 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
753 encode everything else in BER. You can tell any of the structures to be
754 forcefully encoded in DER during CER encoding, by specifying
755 ``der_forced=True`` attribute::
757 class Certificate(Sequence):
761 class SignedAttributes(SetOf):
769 In most cases, huge quantity of binary data is stored as OCTET STRING.
770 CER encoding splits it on 1 KB chunks. BER allows splitting on various
771 levels of chunks inclusion::
773 SOME STRING[CONSTRUCTED]
774 OCTET STRING[CONSTRUCTED]
775 OCTET STRING[PRIMITIVE]
777 OCTET STRING[PRIMITIVE]
779 OCTET STRING[PRIMITIVE]
781 OCTET STRING[PRIMITIVE]
783 OCTET STRING[CONSTRUCTED]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[CONSTRUCTED]
789 OCTET STRING[CONSTRUCTED]
790 OCTET STRING[PRIMITIVE]
793 You can not just take the offset and some ``.vlen`` of the STRING and
794 treat it as the payload. If you decode it without
795 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
796 and ``bytes()`` will give the whole payload contents.
798 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
799 small memory footprint. There is convenient
800 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
801 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
802 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
803 SHA512 digest of its ``eContent``::
805 fd = open("data.p7m", "rb")
806 raw = file_mmaped(fd)
807 ctx = {"bered": True}
808 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
809 if decode_path == ("content",):
813 raise ValueError("no content found")
814 hasher_state = sha512()
816 hasher_state.update(data)
818 evgens = SignedData().decode_evgen(
819 raw[content.offset:],
820 offset=content.offset,
823 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
825 digest = hasher_state.digest()
827 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
828 copy the payload (without BER/CER encoding interleaved overhead) in it.
829 Virtually it won't take memory more than for keeping small structures
830 and 1 KB binary chunks.
832 SEQUENCE OF iterators
833 _____________________
835 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
836 classes. The only difference with providing the full list of objects, is
837 that type and bounds checking is done during encoding process. Also
838 sequence's value will be emptied after encoding, forcing you to set its
841 This is very useful when you have to create some huge objects, like
842 CRLs, with thousands and millions of entities inside. You can write the
843 generator taking necessary data from the database and giving the
844 ``RevokedCertificate`` objects. Only binary representation of that
845 objects will take memory during DER encoding.
849 .. autoclass:: pyderasn.Obj
857 .. autoclass:: pyderasn.Boolean
862 .. autoclass:: pyderasn.Integer
863 :members: __init__, named
867 .. autoclass:: pyderasn.BitString
868 :members: __init__, bit_len, named
872 .. autoclass:: pyderasn.OctetString
877 .. autoclass:: pyderasn.Null
882 .. autoclass:: pyderasn.ObjectIdentifier
887 .. autoclass:: pyderasn.Enumerated
891 .. autoclass:: pyderasn.CommonString
895 .. autoclass:: pyderasn.NumericString
899 .. autoclass:: pyderasn.PrintableString
900 :members: __init__, allow_asterisk, allow_ampersand
904 .. autoclass:: pyderasn.UTCTime
905 :members: __init__, todatetime
909 .. autoclass:: pyderasn.GeneralizedTime
910 :members: __init__, todatetime
917 .. autoclass:: pyderasn.Choice
918 :members: __init__, choice, value
922 .. autoclass:: PrimitiveTypes
926 .. autoclass:: pyderasn.Any
934 .. autoclass:: pyderasn.Sequence
939 .. autoclass:: pyderasn.Set
944 .. autoclass:: pyderasn.SequenceOf
949 .. autoclass:: pyderasn.SetOf
955 .. autofunction:: pyderasn.abs_decode_path
956 .. autofunction:: pyderasn.agg_octet_string
957 .. autofunction:: pyderasn.colonize_hex
958 .. autofunction:: pyderasn.encode_cer
959 .. autofunction:: pyderasn.file_mmaped
960 .. autofunction:: pyderasn.hexenc
961 .. autofunction:: pyderasn.hexdec
962 .. autofunction:: pyderasn.tag_encode
963 .. autofunction:: pyderasn.tag_decode
964 .. autofunction:: pyderasn.tag_ctxp
965 .. autofunction:: pyderasn.tag_ctxc
966 .. autoclass:: pyderasn.DecodeError
968 .. autoclass:: pyderasn.NotEnoughData
969 .. autoclass:: pyderasn.ExceedingData
970 .. autoclass:: pyderasn.LenIndefForm
971 .. autoclass:: pyderasn.TagMismatch
972 .. autoclass:: pyderasn.InvalidLength
973 .. autoclass:: pyderasn.InvalidOID
974 .. autoclass:: pyderasn.ObjUnknown
975 .. autoclass:: pyderasn.ObjNotReady
976 .. autoclass:: pyderasn.InvalidValueType
977 .. autoclass:: pyderasn.BoundsError
984 You can decode DER/BER files using command line abilities::
986 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
988 If there is no schema for your file, then you can try parsing it without,
989 but of course IMPLICIT tags will often make it impossible. But result is
990 good enough for the certificate above::
992 $ python -m pyderasn path/to/file
993 0 [1,3,1604] . >: SEQUENCE OF
994 4 [1,3,1453] . . >: SEQUENCE OF
995 8 [0,0, 5] . . . . >: [0] ANY
996 . . . . . A0:03:02:01:02
997 13 [1,1, 3] . . . . >: INTEGER 61595
998 18 [1,1, 13] . . . . >: SEQUENCE OF
999 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1000 31 [1,1, 0] . . . . . . >: NULL
1001 33 [1,3, 274] . . . . >: SEQUENCE OF
1002 37 [1,1, 11] . . . . . . >: SET OF
1003 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1004 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1005 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1007 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1008 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1009 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1010 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1011 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1012 . . . . . . . . . 61:2E:63:6F:6D:2F
1013 1461 [1,1, 13] . . >: SEQUENCE OF
1014 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1015 1474 [1,1, 0] . . . . >: NULL
1016 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1017 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1018 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1024 If you have got dictionaries with ObjectIdentifiers, like example one
1025 from ``tests/test_crts.py``::
1028 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1029 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1031 "2.5.4.10": "id-at-organizationName",
1032 "2.5.4.11": "id-at-organizationalUnitName",
1035 then you can pass it to pretty printer to see human readable OIDs::
1037 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1039 37 [1,1, 11] . . . . . . >: SET OF
1040 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1041 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1042 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1043 50 [1,1, 18] . . . . . . >: SET OF
1044 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1045 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1046 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1047 70 [1,1, 18] . . . . . . >: SET OF
1048 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1049 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1050 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1056 Each decoded element has so-called decode path: sequence of structure
1057 names it is passing during the decode process. Each element has its own
1058 unique path inside the whole ASN.1 tree. You can print it out with
1059 ``--print-decode-path`` option::
1061 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1062 0 [1,3,1604] Certificate SEQUENCE []
1063 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1064 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1065 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1066 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1067 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1068 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1070 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1071 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1072 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1073 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1074 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1075 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1076 . . . . . . . 13:02:45:53
1077 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1080 Now you can print only the specified tree, for example signature algorithm::
1082 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1083 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1084 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1085 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1089 from array import array
1090 from codecs import getdecoder
1091 from codecs import getencoder
1092 from collections import namedtuple
1093 from collections import OrderedDict
1094 from copy import copy
1095 from datetime import datetime
1096 from datetime import timedelta
1097 from io import BytesIO
1098 from math import ceil
1099 from mmap import mmap
1100 from mmap import PROT_READ
1101 from operator import attrgetter
1102 from string import ascii_letters
1103 from string import digits
1104 from sys import version_info
1105 from unicodedata import category as unicat
1107 from six import add_metaclass
1108 from six import binary_type
1109 from six import byte2int
1110 from six import indexbytes
1111 from six import int2byte
1112 from six import integer_types
1113 from six import iterbytes
1114 from six import iteritems
1115 from six import itervalues
1117 from six import string_types
1118 from six import text_type
1119 from six import unichr as six_unichr
1120 from six.moves import xrange as six_xrange
1124 from termcolor import colored
1125 except ImportError: # pragma: no cover
1126 def colored(what, *args, **kwargs):
1175 "TagClassApplication",
1178 "TagClassUniversal",
1179 "TagFormConstructed",
1190 TagClassUniversal = 0
1191 TagClassApplication = 1 << 6
1192 TagClassContext = 1 << 7
1193 TagClassPrivate = 1 << 6 | 1 << 7
1194 TagFormPrimitive = 0
1195 TagFormConstructed = 1 << 5
1197 TagClassContext: "",
1198 TagClassApplication: "APPLICATION ",
1199 TagClassPrivate: "PRIVATE ",
1200 TagClassUniversal: "UNIV ",
1204 LENINDEF = b"\x80" # length indefinite mark
1205 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1206 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1207 SET01 = frozenset("01")
1208 DECIMALS = frozenset(digits)
1209 DECIMAL_SIGNS = ".,"
1210 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1213 def file_mmaped(fd):
1214 """Make mmap-ed memoryview for reading from file
1216 :param fd: file object
1217 :returns: memoryview over read-only mmap-ing of the whole file
1219 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1222 if not set(value) <= DECIMALS:
1223 raise ValueError("non-pure integer")
1226 def fractions2float(fractions_raw):
1227 pureint(fractions_raw)
1228 return float("0." + fractions_raw)
1231 def get_def_by_path(defines_by_path, sub_decode_path):
1232 """Get define by decode path
1234 for path, define in defines_by_path:
1235 if len(path) != len(sub_decode_path):
1237 for p1, p2 in zip(path, sub_decode_path):
1238 if (not p1 is any) and (p1 != p2):
1244 ########################################################################
1246 ########################################################################
1248 class ASN1Error(ValueError):
1252 class DecodeError(ASN1Error):
1253 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1255 :param str msg: reason of decode failing
1256 :param klass: optional exact DecodeError inherited class (like
1257 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1258 :py:exc:`InvalidLength`)
1259 :param decode_path: tuple of strings. It contains human
1260 readable names of the fields through which
1261 decoding process has passed
1262 :param int offset: binary offset where failure happened
1264 super(DecodeError, self).__init__()
1267 self.decode_path = decode_path
1268 self.offset = offset
1273 "" if self.klass is None else self.klass.__name__,
1275 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1276 if len(self.decode_path) > 0 else ""
1278 ("(at %d)" % self.offset) if self.offset > 0 else "",
1284 return "%s(%s)" % (self.__class__.__name__, self)
1287 class NotEnoughData(DecodeError):
1291 class ExceedingData(ASN1Error):
1292 def __init__(self, nbytes):
1293 super(ExceedingData, self).__init__()
1294 self.nbytes = nbytes
1297 return "%d trailing bytes" % self.nbytes
1300 return "%s(%s)" % (self.__class__.__name__, self)
1303 class LenIndefForm(DecodeError):
1307 class TagMismatch(DecodeError):
1311 class InvalidLength(DecodeError):
1315 class InvalidOID(DecodeError):
1319 class ObjUnknown(ASN1Error):
1320 def __init__(self, name):
1321 super(ObjUnknown, self).__init__()
1325 return "object is unknown: %s" % self.name
1328 return "%s(%s)" % (self.__class__.__name__, self)
1331 class ObjNotReady(ASN1Error):
1332 def __init__(self, name):
1333 super(ObjNotReady, self).__init__()
1337 return "object is not ready: %s" % self.name
1340 return "%s(%s)" % (self.__class__.__name__, self)
1343 class InvalidValueType(ASN1Error):
1344 def __init__(self, expected_types):
1345 super(InvalidValueType, self).__init__()
1346 self.expected_types = expected_types
1349 return "invalid value type, expected: %s" % ", ".join(
1350 [repr(t) for t in self.expected_types]
1354 return "%s(%s)" % (self.__class__.__name__, self)
1357 class BoundsError(ASN1Error):
1358 def __init__(self, bound_min, value, bound_max):
1359 super(BoundsError, self).__init__()
1360 self.bound_min = bound_min
1362 self.bound_max = bound_max
1365 return "unsatisfied bounds: %s <= %s <= %s" % (
1372 return "%s(%s)" % (self.__class__.__name__, self)
1375 ########################################################################
1377 ########################################################################
1379 _hexdecoder = getdecoder("hex")
1380 _hexencoder = getencoder("hex")
1384 """Binary data to hexadecimal string convert
1386 return _hexdecoder(data)[0]
1390 """Hexadecimal string to binary data convert
1392 return _hexencoder(data)[0].decode("ascii")
1395 def int_bytes_len(num, byte_len=8):
1398 return int(ceil(float(num.bit_length()) / byte_len))
1401 def zero_ended_encode(num):
1402 octets = bytearray(int_bytes_len(num, 7))
1404 octets[i] = num & 0x7F
1408 octets[i] = 0x80 | (num & 0x7F)
1411 return bytes(octets)
1414 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1415 """Encode tag to binary form
1417 :param int num: tag's number
1418 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1419 :py:data:`pyderasn.TagClassContext`,
1420 :py:data:`pyderasn.TagClassApplication`,
1421 :py:data:`pyderasn.TagClassPrivate`)
1422 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1423 :py:data:`pyderasn.TagFormConstructed`)
1427 return int2byte(klass | form | num)
1428 # [XX|X|11111][1.......][1.......] ... [0.......]
1429 return int2byte(klass | form | 31) + zero_ended_encode(num)
1432 def tag_decode(tag):
1433 """Decode tag from binary form
1437 No validation is performed, assuming that it has already passed.
1439 It returns tuple with three integers, as
1440 :py:func:`pyderasn.tag_encode` accepts.
1442 first_octet = byte2int(tag)
1443 klass = first_octet & 0xC0
1444 form = first_octet & 0x20
1445 if first_octet & 0x1F < 0x1F:
1446 return (klass, form, first_octet & 0x1F)
1448 for octet in iterbytes(tag[1:]):
1451 return (klass, form, num)
1455 """Create CONTEXT PRIMITIVE tag
1457 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1461 """Create CONTEXT CONSTRUCTED tag
1463 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1466 def tag_strip(data):
1467 """Take off tag from the data
1469 :returns: (encoded tag, tag length, remaining data)
1472 raise NotEnoughData("no data at all")
1473 if byte2int(data) & 0x1F < 31:
1474 return data[:1], 1, data[1:]
1479 raise DecodeError("unfinished tag")
1480 if indexbytes(data, i) & 0x80 == 0:
1483 return data[:i], i, data[i:]
1489 octets = bytearray(int_bytes_len(l) + 1)
1490 octets[0] = 0x80 | (len(octets) - 1)
1491 for i in six_xrange(len(octets) - 1, 0, -1):
1492 octets[i] = l & 0xFF
1494 return bytes(octets)
1497 def len_decode(data):
1500 :returns: (decoded length, length's length, remaining data)
1501 :raises LenIndefForm: if indefinite form encoding is met
1504 raise NotEnoughData("no data at all")
1505 first_octet = byte2int(data)
1506 if first_octet & 0x80 == 0:
1507 return first_octet, 1, data[1:]
1508 octets_num = first_octet & 0x7F
1509 if octets_num + 1 > len(data):
1510 raise NotEnoughData("encoded length is longer than data")
1512 raise LenIndefForm()
1513 if byte2int(data[1:]) == 0:
1514 raise DecodeError("leading zeros")
1516 for v in iterbytes(data[1:1 + octets_num]):
1519 raise DecodeError("long form instead of short one")
1520 return l, 1 + octets_num, data[1 + octets_num:]
1523 LEN0 = len_encode(0)
1524 LEN1 = len_encode(1)
1525 LEN1K = len_encode(1000)
1528 def write_full(writer, data):
1529 """Fully write provided data
1531 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1533 BytesIO does not guarantee that the whole data will be written at
1534 once. That function write everything provided, raising an error if
1535 ``writer`` returns None.
1537 data = memoryview(data)
1539 while written != len(data):
1540 n = writer(data[written:])
1542 raise ValueError("can not write to buf")
1546 ########################################################################
1548 ########################################################################
1550 class AutoAddSlots(type):
1551 def __new__(cls, name, bases, _dict):
1552 _dict["__slots__"] = _dict.get("__slots__", ())
1553 return type.__new__(cls, name, bases, _dict)
1556 BasicState = namedtuple("BasicState", (
1569 ), **NAMEDTUPLE_KWARGS)
1572 @add_metaclass(AutoAddSlots)
1574 """Common ASN.1 object class
1576 All ASN.1 types are inherited from it. It has metaclass that
1577 automatically adds ``__slots__`` to all inherited classes.
1602 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1603 self._expl = getattr(self, "expl", None) if expl is None else expl
1604 if self.tag != self.tag_default and self._expl is not None:
1605 raise ValueError("implicit and explicit tags can not be set simultaneously")
1606 if self.tag is None:
1607 self._tag_order = None
1609 tag_class, _, tag_num = tag_decode(
1610 self.tag if self._expl is None else self._expl
1612 self._tag_order = (tag_class, tag_num)
1613 if default is not None:
1615 self.optional = optional
1616 self.offset, self.llen, self.vlen = _decoded
1618 self.expl_lenindef = False
1619 self.lenindef = False
1620 self.ber_encoded = False
1623 def ready(self): # pragma: no cover
1624 """Is object ready to be encoded?
1626 raise NotImplementedError()
1628 def _assert_ready(self):
1630 raise ObjNotReady(self.__class__.__name__)
1634 """Is either object or any elements inside is BER encoded?
1636 return self.expl_lenindef or self.lenindef or self.ber_encoded
1640 """Is object decoded?
1642 return (self.llen + self.vlen) > 0
1644 def __getstate__(self): # pragma: no cover
1645 """Used for making safe to be mutable pickleable copies
1647 raise NotImplementedError()
1649 def __setstate__(self, state):
1650 if state.version != __version__:
1651 raise ValueError("data is pickled by different PyDERASN version")
1652 self.tag = state.tag
1653 self._tag_order = state.tag_order
1654 self._expl = state.expl
1655 self.default = state.default
1656 self.optional = state.optional
1657 self.offset = state.offset
1658 self.llen = state.llen
1659 self.vlen = state.vlen
1660 self.expl_lenindef = state.expl_lenindef
1661 self.lenindef = state.lenindef
1662 self.ber_encoded = state.ber_encoded
1665 def tag_order(self):
1666 """Tag's (class, number) used for DER/CER sorting
1668 return self._tag_order
1671 def tag_order_cer(self):
1672 return self.tag_order
1676 """See :ref:`decoding`
1678 return len(self.tag)
1682 """See :ref:`decoding`
1684 return self.tlen + self.llen + self.vlen
1686 def __str__(self): # pragma: no cover
1687 return self.__bytes__() if PY2 else self.__unicode__()
1689 def __ne__(self, their):
1690 return not(self == their)
1692 def __gt__(self, their): # pragma: no cover
1693 return not(self < their)
1695 def __le__(self, their): # pragma: no cover
1696 return (self == their) or (self < their)
1698 def __ge__(self, their): # pragma: no cover
1699 return (self == their) or (self > their)
1701 def _encode(self): # pragma: no cover
1702 raise NotImplementedError()
1704 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1705 yield NotImplemented
1708 """DER encode the structure
1710 :returns: DER representation
1712 raw = self._encode()
1713 if self._expl is None:
1715 return b"".join((self._expl, len_encode(len(raw)), raw))
1717 def encode_cer(self, writer):
1718 """CER encode the structure to specified writer
1720 :param writer: must comply with ``io.RawIOBase.write``
1721 behaviour. It takes slice to be written and
1722 returns number of bytes processed. If it returns
1723 None, then exception will be raised
1725 if self._expl is not None:
1726 write_full(writer, self._expl + LENINDEF)
1727 if getattr(self, "der_forced", False):
1728 write_full(writer, self._encode())
1730 self._encode_cer(writer)
1731 if self._expl is not None:
1732 write_full(writer, EOC)
1734 def _encode_cer(self, writer):
1735 write_full(writer, self._encode())
1737 def hexencode(self):
1738 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1740 return hexenc(self.encode())
1750 _ctx_immutable=True,
1754 :param data: either binary or memoryview
1755 :param int offset: initial data's offset
1756 :param bool leavemm: do we need to leave memoryview of remaining
1757 data as is, or convert it to bytes otherwise
1758 :param decode_path: current decode path (tuples of strings,
1759 possibly with DecodePathDefBy) with will be
1760 the root for all underlying objects
1761 :param ctx: optional :ref:`context <ctx>` governing decoding process
1762 :param bool tag_only: decode only the tag, without length and
1763 contents (used only in Choice and Set
1764 structures, trying to determine if tag satisfies
1766 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1768 :returns: (Obj, remaining data)
1770 .. seealso:: :ref:`decoding`
1772 result = next(self.decode_evgen(
1784 _, obj, tail = result
1795 _ctx_immutable=True,
1798 """Decode with evgen mode on
1800 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1801 it returns the generator producing ``(decode_path, obj, tail)``
1802 values. See :ref:`evgen mode <evgen_mode>`.
1806 elif _ctx_immutable:
1808 tlv = memoryview(data)
1811 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1814 if self._expl is None:
1815 for result in self._decode(
1818 decode_path=decode_path,
1821 evgen_mode=_evgen_mode,
1826 _decode_path, obj, tail = result
1827 if not _decode_path is decode_path:
1831 t, tlen, lv = tag_strip(tlv)
1832 except DecodeError as err:
1833 raise err.__class__(
1835 klass=self.__class__,
1836 decode_path=decode_path,
1841 klass=self.__class__,
1842 decode_path=decode_path,
1846 l, llen, v = len_decode(lv)
1847 except LenIndefForm as err:
1848 if not ctx.get("bered", False):
1849 raise err.__class__(
1851 klass=self.__class__,
1852 decode_path=decode_path,
1856 offset += tlen + llen
1857 for result in self._decode(
1860 decode_path=decode_path,
1863 evgen_mode=_evgen_mode,
1865 if tag_only: # pragma: no cover
1868 _decode_path, obj, tail = result
1869 if not _decode_path is decode_path:
1871 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1872 if eoc_expected.tobytes() != EOC:
1875 klass=self.__class__,
1876 decode_path=decode_path,
1880 obj.expl_lenindef = True
1881 except DecodeError as err:
1882 raise err.__class__(
1884 klass=self.__class__,
1885 decode_path=decode_path,
1890 raise NotEnoughData(
1891 "encoded length is longer than data",
1892 klass=self.__class__,
1893 decode_path=decode_path,
1896 for result in self._decode(
1898 offset=offset + tlen + llen,
1899 decode_path=decode_path,
1902 evgen_mode=_evgen_mode,
1904 if tag_only: # pragma: no cover
1907 _decode_path, obj, tail = result
1908 if not _decode_path is decode_path:
1910 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
1912 "explicit tag out-of-bound, longer than data",
1913 klass=self.__class__,
1914 decode_path=decode_path,
1917 yield decode_path, obj, (tail if leavemm else tail.tobytes())
1919 def decod(self, data, offset=0, decode_path=(), ctx=None):
1920 """Decode the data, check that tail is empty
1922 :raises ExceedingData: if tail is not empty
1924 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
1925 (decode without tail) that also checks that there is no
1928 obj, tail = self.decode(
1931 decode_path=decode_path,
1936 raise ExceedingData(len(tail))
1939 def hexdecode(self, data, *args, **kwargs):
1940 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
1942 return self.decode(hexdec(data), *args, **kwargs)
1944 def hexdecod(self, data, *args, **kwargs):
1945 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
1947 return self.decod(hexdec(data), *args, **kwargs)
1951 """See :ref:`decoding`
1953 return self._expl is not None
1957 """See :ref:`decoding`
1962 def expl_tlen(self):
1963 """See :ref:`decoding`
1965 return len(self._expl)
1968 def expl_llen(self):
1969 """See :ref:`decoding`
1971 if self.expl_lenindef:
1973 return len(len_encode(self.tlvlen))
1976 def expl_offset(self):
1977 """See :ref:`decoding`
1979 return self.offset - self.expl_tlen - self.expl_llen
1982 def expl_vlen(self):
1983 """See :ref:`decoding`
1988 def expl_tlvlen(self):
1989 """See :ref:`decoding`
1991 return self.expl_tlen + self.expl_llen + self.expl_vlen
1994 def fulloffset(self):
1995 """See :ref:`decoding`
1997 return self.expl_offset if self.expled else self.offset
2001 """See :ref:`decoding`
2003 return self.expl_tlvlen if self.expled else self.tlvlen
2005 def pps_lenindef(self, decode_path):
2006 if self.lenindef and not (
2007 getattr(self, "defined", None) is not None and
2008 self.defined[1].lenindef
2011 asn1_type_name="EOC",
2013 decode_path=decode_path,
2015 self.offset + self.tlvlen -
2016 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2024 if self.expl_lenindef:
2026 asn1_type_name="EOC",
2027 obj_name="EXPLICIT",
2028 decode_path=decode_path,
2029 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2038 def encode_cer(obj):
2039 """Encode to CER in memory buffer
2041 :returns bytes: memory buffer contents
2044 obj.encode_cer(buf.write)
2045 return buf.getvalue()
2048 class DecodePathDefBy(object):
2049 """DEFINED BY representation inside decode path
2051 __slots__ = ("defined_by",)
2053 def __init__(self, defined_by):
2054 self.defined_by = defined_by
2056 def __ne__(self, their):
2057 return not(self == their)
2059 def __eq__(self, their):
2060 if not isinstance(their, self.__class__):
2062 return self.defined_by == their.defined_by
2065 return "DEFINED BY " + str(self.defined_by)
2068 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2071 ########################################################################
2073 ########################################################################
2075 PP = namedtuple("PP", (
2098 ), **NAMEDTUPLE_KWARGS)
2103 asn1_type_name="unknown",
2120 expl_lenindef=False,
2151 def _colourize(what, colour, with_colours, attrs=("bold",)):
2152 return colored(what, colour, attrs=attrs) if with_colours else what
2155 def colonize_hex(hexed):
2156 """Separate hexadecimal string with colons
2158 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2167 with_decode_path=False,
2168 decode_path_len_decrease=0,
2175 " " if pp.expl_offset is None else
2176 ("-%d" % (pp.offset - pp.expl_offset))
2178 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2180 col = _colourize(col, "red", with_colours, ())
2181 col += _colourize("B", "red", with_colours) if pp.bered else " "
2183 col = "[%d,%d,%4d]%s" % (
2187 LENINDEF_PP_CHAR if pp.lenindef else " "
2189 col = _colourize(col, "green", with_colours, ())
2191 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2192 if decode_path_len > 0:
2193 cols.append(" ." * decode_path_len)
2194 ent = pp.decode_path[-1]
2195 if isinstance(ent, DecodePathDefBy):
2196 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2197 value = str(ent.defined_by)
2200 len(oid_maps) > 0 and
2201 ent.defined_by.asn1_type_name ==
2202 ObjectIdentifier.asn1_type_name
2204 for oid_map in oid_maps:
2205 oid_name = oid_map.get(value)
2206 if oid_name is not None:
2207 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2209 if oid_name is None:
2210 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2212 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2213 if pp.expl is not None:
2214 klass, _, num = pp.expl
2215 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2216 cols.append(_colourize(col, "blue", with_colours))
2217 if pp.impl is not None:
2218 klass, _, num = pp.impl
2219 col = "[%s%d]" % (TagClassReprs[klass], num)
2220 cols.append(_colourize(col, "blue", with_colours))
2221 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2222 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2224 cols.append(_colourize("BER", "red", with_colours))
2225 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2226 if pp.value is not None:
2228 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2230 len(oid_maps) > 0 and
2231 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2233 for oid_map in oid_maps:
2234 oid_name = oid_map.get(value)
2235 if oid_name is not None:
2236 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2238 if pp.asn1_type_name == Integer.asn1_type_name:
2239 hex_repr = hex(int(pp.obj._value))[2:].upper()
2240 if len(hex_repr) % 2 != 0:
2241 hex_repr = "0" + hex_repr
2242 cols.append(_colourize(
2243 "(%s)" % colonize_hex(hex_repr),
2248 if pp.blob.__class__ == binary_type:
2249 cols.append(hexenc(pp.blob))
2250 elif pp.blob.__class__ == tuple:
2251 cols.append(", ".join(pp.blob))
2253 cols.append(_colourize("OPTIONAL", "red", with_colours))
2255 cols.append(_colourize("DEFAULT", "red", with_colours))
2256 if with_decode_path:
2257 cols.append(_colourize(
2258 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2262 return " ".join(cols)
2265 def pp_console_blob(pp, decode_path_len_decrease=0):
2266 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2267 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2268 if decode_path_len > 0:
2269 cols.append(" ." * (decode_path_len + 1))
2270 if pp.blob.__class__ == binary_type:
2271 blob = hexenc(pp.blob).upper()
2272 for i in six_xrange(0, len(blob), 32):
2273 chunk = blob[i:i + 32]
2274 yield " ".join(cols + [colonize_hex(chunk)])
2275 elif pp.blob.__class__ == tuple:
2276 yield " ".join(cols + [", ".join(pp.blob)])
2284 with_decode_path=False,
2285 decode_path_only=(),
2288 """Pretty print object
2290 :param Obj obj: object you want to pretty print
2291 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2292 Its human readable form is printed when OID is met
2293 :param big_blobs: if large binary objects are met (like OctetString
2294 values), do we need to print them too, on separate
2296 :param with_colours: colourize output, if ``termcolor`` library
2298 :param with_decode_path: print decode path
2299 :param decode_path_only: print only that specified decode path
2301 def _pprint_pps(pps):
2303 if hasattr(pp, "_fields"):
2305 decode_path_only != () and
2307 str(p) for p in pp.decode_path[:len(decode_path_only)]
2308 ) != decode_path_only
2312 yield pp_console_row(
2317 with_colours=with_colours,
2318 with_decode_path=with_decode_path,
2319 decode_path_len_decrease=len(decode_path_only),
2321 for row in pp_console_blob(
2323 decode_path_len_decrease=len(decode_path_only),
2327 yield pp_console_row(
2332 with_colours=with_colours,
2333 with_decode_path=with_decode_path,
2334 decode_path_len_decrease=len(decode_path_only),
2337 for row in _pprint_pps(pp):
2339 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2342 ########################################################################
2343 # ASN.1 primitive types
2344 ########################################################################
2346 BooleanState = namedtuple(
2348 BasicState._fields + ("value",),
2354 """``BOOLEAN`` boolean type
2356 >>> b = Boolean(True)
2358 >>> b == Boolean(True)
2364 tag_default = tag_encode(1)
2365 asn1_type_name = "BOOLEAN"
2377 :param value: set the value. Either boolean type, or
2378 :py:class:`pyderasn.Boolean` object
2379 :param bytes impl: override default tag with ``IMPLICIT`` one
2380 :param bytes expl: override default tag with ``EXPLICIT`` one
2381 :param default: set default value. Type same as in ``value``
2382 :param bool optional: is object ``OPTIONAL`` in sequence
2384 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2385 self._value = None if value is None else self._value_sanitize(value)
2386 if default is not None:
2387 default = self._value_sanitize(default)
2388 self.default = self.__class__(
2394 self._value = default
2396 def _value_sanitize(self, value):
2397 if value.__class__ == bool:
2399 if issubclass(value.__class__, Boolean):
2401 raise InvalidValueType((self.__class__, bool))
2405 return self._value is not None
2407 def __getstate__(self):
2408 return BooleanState(
2424 def __setstate__(self, state):
2425 super(Boolean, self).__setstate__(state)
2426 self._value = state.value
2428 def __nonzero__(self):
2429 self._assert_ready()
2433 self._assert_ready()
2436 def __eq__(self, their):
2437 if their.__class__ == bool:
2438 return self._value == their
2439 if not issubclass(their.__class__, Boolean):
2442 self._value == their._value and
2443 self.tag == their.tag and
2444 self._expl == their._expl
2455 return self.__class__(
2457 impl=self.tag if impl is None else impl,
2458 expl=self._expl if expl is None else expl,
2459 default=self.default if default is None else default,
2460 optional=self.optional if optional is None else optional,
2464 self._assert_ready()
2465 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2467 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2469 t, _, lv = tag_strip(tlv)
2470 except DecodeError as err:
2471 raise err.__class__(
2473 klass=self.__class__,
2474 decode_path=decode_path,
2479 klass=self.__class__,
2480 decode_path=decode_path,
2487 l, _, v = len_decode(lv)
2488 except DecodeError as err:
2489 raise err.__class__(
2491 klass=self.__class__,
2492 decode_path=decode_path,
2496 raise InvalidLength(
2497 "Boolean's length must be equal to 1",
2498 klass=self.__class__,
2499 decode_path=decode_path,
2503 raise NotEnoughData(
2504 "encoded length is longer than data",
2505 klass=self.__class__,
2506 decode_path=decode_path,
2509 first_octet = byte2int(v)
2511 if first_octet == 0:
2513 elif first_octet == 0xFF:
2515 elif ctx.get("bered", False):
2520 "unacceptable Boolean value",
2521 klass=self.__class__,
2522 decode_path=decode_path,
2525 obj = self.__class__(
2529 default=self.default,
2530 optional=self.optional,
2531 _decoded=(offset, 1, 1),
2533 obj.ber_encoded = ber_encoded
2534 yield decode_path, obj, v[1:]
2537 return pp_console_row(next(self.pps()))
2539 def pps(self, decode_path=()):
2542 asn1_type_name=self.asn1_type_name,
2543 obj_name=self.__class__.__name__,
2544 decode_path=decode_path,
2545 value=str(self._value) if self.ready else None,
2546 optional=self.optional,
2547 default=self == self.default,
2548 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2549 expl=None if self._expl is None else tag_decode(self._expl),
2554 expl_offset=self.expl_offset if self.expled else None,
2555 expl_tlen=self.expl_tlen if self.expled else None,
2556 expl_llen=self.expl_llen if self.expled else None,
2557 expl_vlen=self.expl_vlen if self.expled else None,
2558 expl_lenindef=self.expl_lenindef,
2559 ber_encoded=self.ber_encoded,
2562 for pp in self.pps_lenindef(decode_path):
2566 IntegerState = namedtuple(
2568 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2574 """``INTEGER`` integer type
2576 >>> b = Integer(-123)
2578 >>> b == Integer(-123)
2583 >>> Integer(2, bounds=(1, 3))
2585 >>> Integer(5, bounds=(1, 3))
2586 Traceback (most recent call last):
2587 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2591 class Version(Integer):
2598 >>> v = Version("v1")
2605 {'v3': 2, 'v1': 0, 'v2': 1}
2607 __slots__ = ("specs", "_bound_min", "_bound_max")
2608 tag_default = tag_encode(2)
2609 asn1_type_name = "INTEGER"
2623 :param value: set the value. Either integer type, named value
2624 (if ``schema`` is specified in the class), or
2625 :py:class:`pyderasn.Integer` object
2626 :param bounds: set ``(MIN, MAX)`` value constraint.
2627 (-inf, +inf) by default
2628 :param bytes impl: override default tag with ``IMPLICIT`` one
2629 :param bytes expl: override default tag with ``EXPLICIT`` one
2630 :param default: set default value. Type same as in ``value``
2631 :param bool optional: is object ``OPTIONAL`` in sequence
2633 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2635 specs = getattr(self, "schema", {}) if _specs is None else _specs
2636 self.specs = specs if specs.__class__ == dict else dict(specs)
2637 self._bound_min, self._bound_max = getattr(
2640 (float("-inf"), float("+inf")),
2641 ) if bounds is None else bounds
2642 if value is not None:
2643 self._value = self._value_sanitize(value)
2644 if default is not None:
2645 default = self._value_sanitize(default)
2646 self.default = self.__class__(
2652 if self._value is None:
2653 self._value = default
2655 def _value_sanitize(self, value):
2656 if isinstance(value, integer_types):
2658 elif issubclass(value.__class__, Integer):
2659 value = value._value
2660 elif value.__class__ == str:
2661 value = self.specs.get(value)
2663 raise ObjUnknown("integer value: %s" % value)
2665 raise InvalidValueType((self.__class__, int, str))
2666 if not self._bound_min <= value <= self._bound_max:
2667 raise BoundsError(self._bound_min, value, self._bound_max)
2672 return self._value is not None
2674 def __getstate__(self):
2675 return IntegerState(
2694 def __setstate__(self, state):
2695 super(Integer, self).__setstate__(state)
2696 self.specs = state.specs
2697 self._value = state.value
2698 self._bound_min = state.bound_min
2699 self._bound_max = state.bound_max
2702 self._assert_ready()
2703 return int(self._value)
2706 self._assert_ready()
2707 return hash(b"".join((
2709 bytes(self._expl or b""),
2710 str(self._value).encode("ascii"),
2713 def __eq__(self, their):
2714 if isinstance(their, integer_types):
2715 return self._value == their
2716 if not issubclass(their.__class__, Integer):
2719 self._value == their._value and
2720 self.tag == their.tag and
2721 self._expl == their._expl
2724 def __lt__(self, their):
2725 return self._value < their._value
2729 """Return named representation (if exists) of the value
2731 for name, value in iteritems(self.specs):
2732 if value == self._value:
2745 return self.__class__(
2748 (self._bound_min, self._bound_max)
2749 if bounds is None else bounds
2751 impl=self.tag if impl is None else impl,
2752 expl=self._expl if expl is None else expl,
2753 default=self.default if default is None else default,
2754 optional=self.optional if optional is None else optional,
2759 self._assert_ready()
2763 octets = bytearray([0])
2767 octets = bytearray()
2769 octets.append((value & 0xFF) ^ 0xFF)
2771 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2774 octets = bytearray()
2776 octets.append(value & 0xFF)
2778 if octets[-1] & 0x80 > 0:
2781 octets = bytes(octets)
2783 bytes_len = ceil(value.bit_length() / 8) or 1
2786 octets = value.to_bytes(
2791 except OverflowError:
2795 return b"".join((self.tag, len_encode(len(octets)), octets))
2797 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2799 t, _, lv = tag_strip(tlv)
2800 except DecodeError as err:
2801 raise err.__class__(
2803 klass=self.__class__,
2804 decode_path=decode_path,
2809 klass=self.__class__,
2810 decode_path=decode_path,
2817 l, llen, v = len_decode(lv)
2818 except DecodeError as err:
2819 raise err.__class__(
2821 klass=self.__class__,
2822 decode_path=decode_path,
2826 raise NotEnoughData(
2827 "encoded length is longer than data",
2828 klass=self.__class__,
2829 decode_path=decode_path,
2833 raise NotEnoughData(
2835 klass=self.__class__,
2836 decode_path=decode_path,
2839 v, tail = v[:l], v[l:]
2840 first_octet = byte2int(v)
2842 second_octet = byte2int(v[1:])
2844 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2845 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
2848 "non normalized integer",
2849 klass=self.__class__,
2850 decode_path=decode_path,
2855 if first_octet & 0x80 > 0:
2856 octets = bytearray()
2857 for octet in bytearray(v):
2858 octets.append(octet ^ 0xFF)
2859 for octet in octets:
2860 value = (value << 8) | octet
2864 for octet in bytearray(v):
2865 value = (value << 8) | octet
2867 value = int.from_bytes(v, byteorder="big", signed=True)
2869 obj = self.__class__(
2871 bounds=(self._bound_min, self._bound_max),
2874 default=self.default,
2875 optional=self.optional,
2877 _decoded=(offset, llen, l),
2879 except BoundsError as err:
2882 klass=self.__class__,
2883 decode_path=decode_path,
2886 yield decode_path, obj, tail
2889 return pp_console_row(next(self.pps()))
2891 def pps(self, decode_path=()):
2894 asn1_type_name=self.asn1_type_name,
2895 obj_name=self.__class__.__name__,
2896 decode_path=decode_path,
2897 value=(self.named or str(self._value)) if self.ready else None,
2898 optional=self.optional,
2899 default=self == self.default,
2900 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2901 expl=None if self._expl is None else tag_decode(self._expl),
2906 expl_offset=self.expl_offset if self.expled else None,
2907 expl_tlen=self.expl_tlen if self.expled else None,
2908 expl_llen=self.expl_llen if self.expled else None,
2909 expl_vlen=self.expl_vlen if self.expled else None,
2910 expl_lenindef=self.expl_lenindef,
2913 for pp in self.pps_lenindef(decode_path):
2917 BitStringState = namedtuple(
2919 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
2924 class BitString(Obj):
2925 """``BIT STRING`` bit string type
2927 >>> BitString(b"hello world")
2928 BIT STRING 88 bits 68656c6c6f20776f726c64
2931 >>> b == b"hello world"
2936 >>> BitString("'0A3B5F291CD'H")
2937 BIT STRING 44 bits 0a3b5f291cd0
2938 >>> b = BitString("'010110000000'B")
2939 BIT STRING 12 bits 5800
2942 >>> b[0], b[1], b[2], b[3]
2943 (False, True, False, True)
2947 [False, True, False, True, True, False, False, False, False, False, False, False]
2951 class KeyUsage(BitString):
2953 ("digitalSignature", 0),
2954 ("nonRepudiation", 1),
2955 ("keyEncipherment", 2),
2958 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
2959 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
2961 ['nonRepudiation', 'keyEncipherment']
2963 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
2967 Pay attention that BIT STRING can be encoded both in primitive
2968 and constructed forms. Decoder always checks constructed form tag
2969 additionally to specified primitive one. If BER decoding is
2970 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
2971 of DER restrictions.
2973 __slots__ = ("tag_constructed", "specs", "defined")
2974 tag_default = tag_encode(3)
2975 asn1_type_name = "BIT STRING"
2988 :param value: set the value. Either binary type, tuple of named
2989 values (if ``schema`` is specified in the class),
2990 string in ``'XXX...'B`` form, or
2991 :py:class:`pyderasn.BitString` object
2992 :param bytes impl: override default tag with ``IMPLICIT`` one
2993 :param bytes expl: override default tag with ``EXPLICIT`` one
2994 :param default: set default value. Type same as in ``value``
2995 :param bool optional: is object ``OPTIONAL`` in sequence
2997 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
2998 specs = getattr(self, "schema", {}) if _specs is None else _specs
2999 self.specs = specs if specs.__class__ == dict else dict(specs)
3000 self._value = None if value is None else self._value_sanitize(value)
3001 if default is not None:
3002 default = self._value_sanitize(default)
3003 self.default = self.__class__(
3009 self._value = default
3011 tag_klass, _, tag_num = tag_decode(self.tag)
3012 self.tag_constructed = tag_encode(
3014 form=TagFormConstructed,
3018 def _bits2octets(self, bits):
3019 if len(self.specs) > 0:
3020 bits = bits.rstrip("0")
3022 bits += "0" * ((8 - (bit_len % 8)) % 8)
3023 octets = bytearray(len(bits) // 8)
3024 for i in six_xrange(len(octets)):
3025 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3026 return bit_len, bytes(octets)
3028 def _value_sanitize(self, value):
3029 if isinstance(value, (string_types, binary_type)):
3031 isinstance(value, string_types) and
3032 value.startswith("'")
3034 if value.endswith("'B"):
3036 if not frozenset(value) <= SET01:
3037 raise ValueError("B's coding contains unacceptable chars")
3038 return self._bits2octets(value)
3039 if value.endswith("'H"):
3043 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3045 if value.__class__ == binary_type:
3046 return (len(value) * 8, value)
3047 raise InvalidValueType((self.__class__, string_types, binary_type))
3048 if value.__class__ == tuple:
3051 isinstance(value[0], integer_types) and
3052 value[1].__class__ == binary_type
3057 bit = self.specs.get(name)
3059 raise ObjUnknown("BitString value: %s" % name)
3062 return self._bits2octets("")
3063 bits = frozenset(bits)
3064 return self._bits2octets("".join(
3065 ("1" if bit in bits else "0")
3066 for bit in six_xrange(max(bits) + 1)
3068 if issubclass(value.__class__, BitString):
3070 raise InvalidValueType((self.__class__, binary_type, string_types))
3074 return self._value is not None
3076 def __getstate__(self):
3077 return BitStringState(
3092 self.tag_constructed,
3096 def __setstate__(self, state):
3097 super(BitString, self).__setstate__(state)
3098 self.specs = state.specs
3099 self._value = state.value
3100 self.tag_constructed = state.tag_constructed
3101 self.defined = state.defined
3104 self._assert_ready()
3105 for i in six_xrange(self._value[0]):
3110 """Returns number of bits in the string
3112 self._assert_ready()
3113 return self._value[0]
3115 def __bytes__(self):
3116 self._assert_ready()
3117 return self._value[1]
3119 def __eq__(self, their):
3120 if their.__class__ == bytes:
3121 return self._value[1] == their
3122 if not issubclass(their.__class__, BitString):
3125 self._value == their._value and
3126 self.tag == their.tag and
3127 self._expl == their._expl
3132 """Named representation (if exists) of the bits
3134 :returns: [str(name), ...]
3136 return [name for name, bit in iteritems(self.specs) if self[bit]]
3146 return self.__class__(
3148 impl=self.tag if impl is None else impl,
3149 expl=self._expl if expl is None else expl,
3150 default=self.default if default is None else default,
3151 optional=self.optional if optional is None else optional,
3155 def __getitem__(self, key):
3156 if key.__class__ == int:
3157 bit_len, octets = self._value
3161 byte2int(memoryview(octets)[key // 8:]) >>
3164 if isinstance(key, string_types):
3165 value = self.specs.get(key)
3167 raise ObjUnknown("BitString value: %s" % key)
3169 raise InvalidValueType((int, str))
3172 self._assert_ready()
3173 bit_len, octets = self._value
3176 len_encode(len(octets) + 1),
3177 int2byte((8 - bit_len % 8) % 8),
3181 def _encode_cer(self, writer):
3182 bit_len, octets = self._value
3183 if len(octets) + 1 <= 1000:
3184 write_full(writer, self._encode())
3186 write_full(writer, self.tag_constructed)
3187 write_full(writer, LENINDEF)
3188 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3189 write_full(writer, b"".join((
3190 BitString.tag_default,
3193 octets[offset:offset + 999],
3195 tail = octets[offset+999:]
3197 tail = int2byte((8 - bit_len % 8) % 8) + tail
3198 write_full(writer, b"".join((
3199 BitString.tag_default,
3200 len_encode(len(tail)),
3203 write_full(writer, EOC)
3205 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3207 t, tlen, lv = tag_strip(tlv)
3208 except DecodeError as err:
3209 raise err.__class__(
3211 klass=self.__class__,
3212 decode_path=decode_path,
3216 if tag_only: # pragma: no cover
3220 l, llen, v = len_decode(lv)
3221 except DecodeError as err:
3222 raise err.__class__(
3224 klass=self.__class__,
3225 decode_path=decode_path,
3229 raise NotEnoughData(
3230 "encoded length is longer than data",
3231 klass=self.__class__,
3232 decode_path=decode_path,
3236 raise NotEnoughData(
3238 klass=self.__class__,
3239 decode_path=decode_path,
3242 pad_size = byte2int(v)
3243 if l == 1 and pad_size != 0:
3245 "invalid empty value",
3246 klass=self.__class__,
3247 decode_path=decode_path,
3253 klass=self.__class__,
3254 decode_path=decode_path,
3257 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3260 klass=self.__class__,
3261 decode_path=decode_path,
3264 v, tail = v[:l], v[l:]
3265 bit_len = (len(v) - 1) * 8 - pad_size
3266 obj = self.__class__(
3267 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3270 default=self.default,
3271 optional=self.optional,
3273 _decoded=(offset, llen, l),
3276 obj._value = (bit_len, None)
3277 yield decode_path, obj, tail
3279 if t != self.tag_constructed:
3281 klass=self.__class__,
3282 decode_path=decode_path,
3285 if not ctx.get("bered", False):
3287 "unallowed BER constructed encoding",
3288 klass=self.__class__,
3289 decode_path=decode_path,
3292 if tag_only: # pragma: no cover
3297 l, llen, v = len_decode(lv)
3298 except LenIndefForm:
3299 llen, l, v = 1, 0, lv[1:]
3301 except DecodeError as err:
3302 raise err.__class__(
3304 klass=self.__class__,
3305 decode_path=decode_path,
3309 raise NotEnoughData(
3310 "encoded length is longer than data",
3311 klass=self.__class__,
3312 decode_path=decode_path,
3315 if not lenindef and l == 0:
3316 raise NotEnoughData(
3318 klass=self.__class__,
3319 decode_path=decode_path,
3323 sub_offset = offset + tlen + llen
3327 if v[:EOC_LEN].tobytes() == EOC:
3334 "chunk out of bounds",
3335 klass=self.__class__,
3336 decode_path=decode_path + (str(len(chunks) - 1),),
3337 offset=chunks[-1].offset,
3339 sub_decode_path = decode_path + (str(len(chunks)),)
3342 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3345 decode_path=sub_decode_path,
3348 _ctx_immutable=False,
3350 yield _decode_path, chunk, v_tail
3352 _, chunk, v_tail = next(BitString().decode_evgen(
3355 decode_path=sub_decode_path,
3358 _ctx_immutable=False,
3363 "expected BitString encoded chunk",
3364 klass=self.__class__,
3365 decode_path=sub_decode_path,
3368 chunks.append(chunk)
3369 sub_offset += chunk.tlvlen
3370 vlen += chunk.tlvlen
3372 if len(chunks) == 0:
3375 klass=self.__class__,
3376 decode_path=decode_path,
3381 for chunk_i, chunk in enumerate(chunks[:-1]):
3382 if chunk.bit_len % 8 != 0:
3384 "BitString chunk is not multiple of 8 bits",
3385 klass=self.__class__,
3386 decode_path=decode_path + (str(chunk_i),),
3387 offset=chunk.offset,
3390 values.append(bytes(chunk))
3391 bit_len += chunk.bit_len
3392 chunk_last = chunks[-1]
3394 values.append(bytes(chunk_last))
3395 bit_len += chunk_last.bit_len
3396 obj = self.__class__(
3397 value=None if evgen_mode else (bit_len, b"".join(values)),
3400 default=self.default,
3401 optional=self.optional,
3403 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3406 obj._value = (bit_len, None)
3407 obj.lenindef = lenindef
3408 obj.ber_encoded = True
3409 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3412 return pp_console_row(next(self.pps()))
3414 def pps(self, decode_path=()):
3418 bit_len, blob = self._value
3419 value = "%d bits" % bit_len
3420 if len(self.specs) > 0 and blob is not None:
3421 blob = tuple(self.named)
3424 asn1_type_name=self.asn1_type_name,
3425 obj_name=self.__class__.__name__,
3426 decode_path=decode_path,
3429 optional=self.optional,
3430 default=self == self.default,
3431 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3432 expl=None if self._expl is None else tag_decode(self._expl),
3437 expl_offset=self.expl_offset if self.expled else None,
3438 expl_tlen=self.expl_tlen if self.expled else None,
3439 expl_llen=self.expl_llen if self.expled else None,
3440 expl_vlen=self.expl_vlen if self.expled else None,
3441 expl_lenindef=self.expl_lenindef,
3442 lenindef=self.lenindef,
3443 ber_encoded=self.ber_encoded,
3446 defined_by, defined = self.defined or (None, None)
3447 if defined_by is not None:
3449 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3451 for pp in self.pps_lenindef(decode_path):
3455 OctetStringState = namedtuple(
3457 BasicState._fields + (
3468 class OctetString(Obj):
3469 """``OCTET STRING`` binary string type
3471 >>> s = OctetString(b"hello world")
3472 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3473 >>> s == OctetString(b"hello world")
3478 >>> OctetString(b"hello", bounds=(4, 4))
3479 Traceback (most recent call last):
3480 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3481 >>> OctetString(b"hell", bounds=(4, 4))
3482 OCTET STRING 4 bytes 68656c6c
3484 Memoryviews can be used as a values. If memoryview is made on
3485 mmap-ed file, then it does not take storage inside OctetString
3486 itself. In CER encoding mode it will be streamed to the specified
3487 writer, copying 1 KB chunks.
3489 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3490 tag_default = tag_encode(4)
3491 asn1_type_name = "OCTET STRING"
3492 evgen_mode_skip_value = True
3506 :param value: set the value. Either binary type, or
3507 :py:class:`pyderasn.OctetString` object
3508 :param bounds: set ``(MIN, MAX)`` value size constraint.
3509 (-inf, +inf) by default
3510 :param bytes impl: override default tag with ``IMPLICIT`` one
3511 :param bytes expl: override default tag with ``EXPLICIT`` one
3512 :param default: set default value. Type same as in ``value``
3513 :param bool optional: is object ``OPTIONAL`` in sequence
3515 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3517 self._bound_min, self._bound_max = getattr(
3521 ) if bounds is None else bounds
3522 if value is not None:
3523 self._value = self._value_sanitize(value)
3524 if default is not None:
3525 default = self._value_sanitize(default)
3526 self.default = self.__class__(
3531 if self._value is None:
3532 self._value = default
3534 tag_klass, _, tag_num = tag_decode(self.tag)
3535 self.tag_constructed = tag_encode(
3537 form=TagFormConstructed,
3541 def _value_sanitize(self, value):
3542 if value.__class__ == binary_type or value.__class__ == memoryview:
3544 elif issubclass(value.__class__, OctetString):
3545 value = value._value
3547 raise InvalidValueType((self.__class__, bytes, memoryview))
3548 if not self._bound_min <= len(value) <= self._bound_max:
3549 raise BoundsError(self._bound_min, len(value), self._bound_max)
3554 return self._value is not None
3556 def __getstate__(self):
3557 return OctetStringState(
3573 self.tag_constructed,
3577 def __setstate__(self, state):
3578 super(OctetString, self).__setstate__(state)
3579 self._value = state.value
3580 self._bound_min = state.bound_min
3581 self._bound_max = state.bound_max
3582 self.tag_constructed = state.tag_constructed
3583 self.defined = state.defined
3585 def __bytes__(self):
3586 self._assert_ready()
3587 return bytes(self._value)
3589 def __eq__(self, their):
3590 if their.__class__ == binary_type:
3591 return self._value == their
3592 if not issubclass(their.__class__, OctetString):
3595 self._value == their._value and
3596 self.tag == their.tag and
3597 self._expl == their._expl
3600 def __lt__(self, their):
3601 return self._value < their._value
3612 return self.__class__(
3615 (self._bound_min, self._bound_max)
3616 if bounds is None else bounds
3618 impl=self.tag if impl is None else impl,
3619 expl=self._expl if expl is None else expl,
3620 default=self.default if default is None else default,
3621 optional=self.optional if optional is None else optional,
3625 self._assert_ready()
3628 len_encode(len(self._value)),
3632 def _encode_cer(self, writer):
3633 octets = self._value
3634 if len(octets) <= 1000:
3635 write_full(writer, self._encode())
3637 write_full(writer, self.tag_constructed)
3638 write_full(writer, LENINDEF)
3639 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3640 write_full(writer, b"".join((
3641 OctetString.tag_default,
3643 octets[offset:offset + 1000],
3645 tail = octets[offset+1000:]
3647 write_full(writer, b"".join((
3648 OctetString.tag_default,
3649 len_encode(len(tail)),
3652 write_full(writer, EOC)
3654 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3656 t, tlen, lv = tag_strip(tlv)
3657 except DecodeError as err:
3658 raise err.__class__(
3660 klass=self.__class__,
3661 decode_path=decode_path,
3669 l, llen, v = len_decode(lv)
3670 except DecodeError as err:
3671 raise err.__class__(
3673 klass=self.__class__,
3674 decode_path=decode_path,
3678 raise NotEnoughData(
3679 "encoded length is longer than data",
3680 klass=self.__class__,
3681 decode_path=decode_path,
3684 v, tail = v[:l], v[l:]
3685 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3687 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3688 klass=self.__class__,
3689 decode_path=decode_path,
3693 obj = self.__class__(
3695 None if (evgen_mode and self.evgen_mode_skip_value)
3698 bounds=(self._bound_min, self._bound_max),
3701 default=self.default,
3702 optional=self.optional,
3703 _decoded=(offset, llen, l),
3706 except DecodeError as err:
3709 klass=self.__class__,
3710 decode_path=decode_path,
3713 except BoundsError as err:
3716 klass=self.__class__,
3717 decode_path=decode_path,
3720 yield decode_path, obj, tail
3722 if t != self.tag_constructed:
3724 klass=self.__class__,
3725 decode_path=decode_path,
3728 if not ctx.get("bered", False):
3730 "unallowed BER constructed encoding",
3731 klass=self.__class__,
3732 decode_path=decode_path,
3740 l, llen, v = len_decode(lv)
3741 except LenIndefForm:
3742 llen, l, v = 1, 0, lv[1:]
3744 except DecodeError as err:
3745 raise err.__class__(
3747 klass=self.__class__,
3748 decode_path=decode_path,
3752 raise NotEnoughData(
3753 "encoded length is longer than data",
3754 klass=self.__class__,
3755 decode_path=decode_path,
3760 sub_offset = offset + tlen + llen
3765 if v[:EOC_LEN].tobytes() == EOC:
3772 "chunk out of bounds",
3773 klass=self.__class__,
3774 decode_path=decode_path + (str(len(chunks) - 1),),
3775 offset=chunks[-1].offset,
3779 sub_decode_path = decode_path + (str(chunks_count),)
3780 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3783 decode_path=sub_decode_path,
3786 _ctx_immutable=False,
3788 yield _decode_path, chunk, v_tail
3789 if not chunk.ber_encoded:
3790 payload_len += chunk.vlen
3793 sub_decode_path = decode_path + (str(len(chunks)),)
3794 _, chunk, v_tail = next(OctetString().decode_evgen(
3797 decode_path=sub_decode_path,
3800 _ctx_immutable=False,
3803 chunks.append(chunk)
3806 "expected OctetString encoded chunk",
3807 klass=self.__class__,
3808 decode_path=sub_decode_path,
3811 sub_offset += chunk.tlvlen
3812 vlen += chunk.tlvlen
3814 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3816 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3817 klass=self.__class__,
3818 decode_path=decode_path,
3822 obj = self.__class__(
3824 None if evgen_mode else
3825 b"".join(bytes(chunk) for chunk in chunks)
3827 bounds=(self._bound_min, self._bound_max),
3830 default=self.default,
3831 optional=self.optional,
3832 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3835 except DecodeError as err:
3838 klass=self.__class__,
3839 decode_path=decode_path,
3842 except BoundsError as err:
3845 klass=self.__class__,
3846 decode_path=decode_path,
3849 obj.lenindef = lenindef
3850 obj.ber_encoded = True
3851 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3854 return pp_console_row(next(self.pps()))
3856 def pps(self, decode_path=()):
3859 asn1_type_name=self.asn1_type_name,
3860 obj_name=self.__class__.__name__,
3861 decode_path=decode_path,
3862 value=("%d bytes" % len(self._value)) if self.ready else None,
3863 blob=self._value if self.ready else None,
3864 optional=self.optional,
3865 default=self == self.default,
3866 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3867 expl=None if self._expl is None else tag_decode(self._expl),
3872 expl_offset=self.expl_offset if self.expled else None,
3873 expl_tlen=self.expl_tlen if self.expled else None,
3874 expl_llen=self.expl_llen if self.expled else None,
3875 expl_vlen=self.expl_vlen if self.expled else None,
3876 expl_lenindef=self.expl_lenindef,
3877 lenindef=self.lenindef,
3878 ber_encoded=self.ber_encoded,
3881 defined_by, defined = self.defined or (None, None)
3882 if defined_by is not None:
3884 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3886 for pp in self.pps_lenindef(decode_path):
3890 def agg_octet_string(evgens, decode_path, raw, writer):
3891 """Aggregate constructed string (OctetString and its derivatives)
3893 :param evgens: iterator of generated events
3894 :param decode_path: points to the string we want to decode
3895 :param raw: slicebable (memoryview, bytearray, etc) with
3896 the data evgens are generated on
3897 :param writer: buffer.write where string is going to be saved
3898 :param writer: where string is going to be saved. Must comply
3899 with ``io.RawIOBase.write`` behaviour
3901 decode_path_len = len(decode_path)
3902 for dp, obj, _ in evgens:
3903 if dp[:decode_path_len] != decode_path:
3905 if not obj.ber_encoded:
3906 write_full(writer, raw[
3907 obj.offset + obj.tlen + obj.llen:
3908 obj.offset + obj.tlen + obj.llen + obj.vlen -
3909 (EOC_LEN if obj.expl_lenindef else 0)
3911 if len(dp) == decode_path_len:
3915 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
3919 """``NULL`` null object
3927 tag_default = tag_encode(5)
3928 asn1_type_name = "NULL"
3932 value=None, # unused, but Sequence passes it
3939 :param bytes impl: override default tag with ``IMPLICIT`` one
3940 :param bytes expl: override default tag with ``EXPLICIT`` one
3941 :param bool optional: is object ``OPTIONAL`` in sequence
3943 super(Null, self).__init__(impl, expl, None, optional, _decoded)
3950 def __getstate__(self):
3966 def __eq__(self, their):
3967 if not issubclass(their.__class__, Null):
3970 self.tag == their.tag and
3971 self._expl == their._expl
3981 return self.__class__(
3982 impl=self.tag if impl is None else impl,
3983 expl=self._expl if expl is None else expl,
3984 optional=self.optional if optional is None else optional,
3988 return self.tag + LEN0
3990 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3992 t, _, lv = tag_strip(tlv)
3993 except DecodeError as err:
3994 raise err.__class__(
3996 klass=self.__class__,
3997 decode_path=decode_path,
4002 klass=self.__class__,
4003 decode_path=decode_path,
4006 if tag_only: # pragma: no cover
4010 l, _, v = len_decode(lv)
4011 except DecodeError as err:
4012 raise err.__class__(
4014 klass=self.__class__,
4015 decode_path=decode_path,
4019 raise InvalidLength(
4020 "Null must have zero length",
4021 klass=self.__class__,
4022 decode_path=decode_path,
4025 obj = self.__class__(
4028 optional=self.optional,
4029 _decoded=(offset, 1, 0),
4031 yield decode_path, obj, v
4034 return pp_console_row(next(self.pps()))
4036 def pps(self, decode_path=()):
4039 asn1_type_name=self.asn1_type_name,
4040 obj_name=self.__class__.__name__,
4041 decode_path=decode_path,
4042 optional=self.optional,
4043 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4044 expl=None if self._expl is None else tag_decode(self._expl),
4049 expl_offset=self.expl_offset if self.expled else None,
4050 expl_tlen=self.expl_tlen if self.expled else None,
4051 expl_llen=self.expl_llen if self.expled else None,
4052 expl_vlen=self.expl_vlen if self.expled else None,
4053 expl_lenindef=self.expl_lenindef,
4056 for pp in self.pps_lenindef(decode_path):
4060 ObjectIdentifierState = namedtuple(
4061 "ObjectIdentifierState",
4062 BasicState._fields + ("value", "defines"),
4067 class ObjectIdentifier(Obj):
4068 """``OBJECT IDENTIFIER`` OID type
4070 >>> oid = ObjectIdentifier((1, 2, 3))
4071 OBJECT IDENTIFIER 1.2.3
4072 >>> oid == ObjectIdentifier("1.2.3")
4078 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4079 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4081 >>> str(ObjectIdentifier((3, 1)))
4082 Traceback (most recent call last):
4083 pyderasn.InvalidOID: unacceptable first arc value
4085 __slots__ = ("defines",)
4086 tag_default = tag_encode(6)
4087 asn1_type_name = "OBJECT IDENTIFIER"
4100 :param value: set the value. Either tuples of integers,
4101 string of "."-concatenated integers, or
4102 :py:class:`pyderasn.ObjectIdentifier` object
4103 :param defines: sequence of tuples. Each tuple has two elements.
4104 First one is relative to current one decode
4105 path, aiming to the field defined by that OID.
4106 Read about relative path in
4107 :py:func:`pyderasn.abs_decode_path`. Second
4108 tuple element is ``{OID: pyderasn.Obj()}``
4109 dictionary, mapping between current OID value
4110 and structure applied to defined field.
4111 :ref:`Read about DEFINED BY <definedby>`
4112 :param bytes impl: override default tag with ``IMPLICIT`` one
4113 :param bytes expl: override default tag with ``EXPLICIT`` one
4114 :param default: set default value. Type same as in ``value``
4115 :param bool optional: is object ``OPTIONAL`` in sequence
4117 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4119 if value is not None:
4120 self._value = self._value_sanitize(value)
4121 if default is not None:
4122 default = self._value_sanitize(default)
4123 self.default = self.__class__(
4128 if self._value is None:
4129 self._value = default
4130 self.defines = defines
4132 def __add__(self, their):
4133 if their.__class__ == tuple:
4134 return self.__class__(self._value + array("L", their))
4135 if isinstance(their, self.__class__):
4136 return self.__class__(self._value + their._value)
4137 raise InvalidValueType((self.__class__, tuple))
4139 def _value_sanitize(self, value):
4140 if issubclass(value.__class__, ObjectIdentifier):
4142 if isinstance(value, string_types):
4144 value = array("L", (pureint(arc) for arc in value.split(".")))
4146 raise InvalidOID("unacceptable arcs values")
4147 if value.__class__ == tuple:
4149 value = array("L", value)
4150 except OverflowError as err:
4151 raise InvalidOID(repr(err))
4152 if value.__class__ is array:
4154 raise InvalidOID("less than 2 arcs")
4155 first_arc = value[0]
4156 if first_arc in (0, 1):
4157 if not (0 <= value[1] <= 39):
4158 raise InvalidOID("second arc is too wide")
4159 elif first_arc == 2:
4162 raise InvalidOID("unacceptable first arc value")
4163 if not all(arc >= 0 for arc in value):
4164 raise InvalidOID("negative arc value")
4166 raise InvalidValueType((self.__class__, str, tuple))
4170 return self._value is not None
4172 def __getstate__(self):
4173 return ObjectIdentifierState(
4190 def __setstate__(self, state):
4191 super(ObjectIdentifier, self).__setstate__(state)
4192 self._value = state.value
4193 self.defines = state.defines
4196 self._assert_ready()
4197 return iter(self._value)
4200 return ".".join(str(arc) for arc in self._value or ())
4203 self._assert_ready()
4204 return hash(b"".join((
4206 bytes(self._expl or b""),
4207 str(self._value).encode("ascii"),
4210 def __eq__(self, their):
4211 if their.__class__ == tuple:
4212 return self._value == array("L", their)
4213 if not issubclass(their.__class__, ObjectIdentifier):
4216 self.tag == their.tag and
4217 self._expl == their._expl and
4218 self._value == their._value
4221 def __lt__(self, their):
4222 return self._value < their._value
4233 return self.__class__(
4235 defines=self.defines if defines is None else defines,
4236 impl=self.tag if impl is None else impl,
4237 expl=self._expl if expl is None else expl,
4238 default=self.default if default is None else default,
4239 optional=self.optional if optional is None else optional,
4243 self._assert_ready()
4245 first_value = value[1]
4246 first_arc = value[0]
4249 elif first_arc == 1:
4251 elif first_arc == 2:
4253 else: # pragma: no cover
4254 raise RuntimeError("invalid arc is stored")
4255 octets = [zero_ended_encode(first_value)]
4256 for arc in value[2:]:
4257 octets.append(zero_ended_encode(arc))
4258 v = b"".join(octets)
4259 return b"".join((self.tag, len_encode(len(v)), v))
4261 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4263 t, _, lv = tag_strip(tlv)
4264 except DecodeError as err:
4265 raise err.__class__(
4267 klass=self.__class__,
4268 decode_path=decode_path,
4273 klass=self.__class__,
4274 decode_path=decode_path,
4277 if tag_only: # pragma: no cover
4281 l, llen, v = len_decode(lv)
4282 except DecodeError as err:
4283 raise err.__class__(
4285 klass=self.__class__,
4286 decode_path=decode_path,
4290 raise NotEnoughData(
4291 "encoded length is longer than data",
4292 klass=self.__class__,
4293 decode_path=decode_path,
4297 raise NotEnoughData(
4299 klass=self.__class__,
4300 decode_path=decode_path,
4303 v, tail = v[:l], v[l:]
4310 octet = indexbytes(v, i)
4311 if i == 0 and octet == 0x80:
4312 if ctx.get("bered", False):
4316 "non normalized arc encoding",
4317 klass=self.__class__,
4318 decode_path=decode_path,
4321 arc = (arc << 7) | (octet & 0x7F)
4322 if octet & 0x80 == 0:
4325 except OverflowError:
4327 "too huge value for local unsigned long",
4328 klass=self.__class__,
4329 decode_path=decode_path,
4338 klass=self.__class__,
4339 decode_path=decode_path,
4343 second_arc = arcs[0]
4344 if 0 <= second_arc <= 39:
4346 elif 40 <= second_arc <= 79:
4352 obj = self.__class__(
4353 value=array("L", (first_arc, second_arc)) + arcs[1:],
4356 default=self.default,
4357 optional=self.optional,
4358 _decoded=(offset, llen, l),
4361 obj.ber_encoded = True
4362 yield decode_path, obj, tail
4365 return pp_console_row(next(self.pps()))
4367 def pps(self, decode_path=()):
4370 asn1_type_name=self.asn1_type_name,
4371 obj_name=self.__class__.__name__,
4372 decode_path=decode_path,
4373 value=str(self) if self.ready else None,
4374 optional=self.optional,
4375 default=self == self.default,
4376 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4377 expl=None if self._expl is None else tag_decode(self._expl),
4382 expl_offset=self.expl_offset if self.expled else None,
4383 expl_tlen=self.expl_tlen if self.expled else None,
4384 expl_llen=self.expl_llen if self.expled else None,
4385 expl_vlen=self.expl_vlen if self.expled else None,
4386 expl_lenindef=self.expl_lenindef,
4387 ber_encoded=self.ber_encoded,
4390 for pp in self.pps_lenindef(decode_path):
4394 class Enumerated(Integer):
4395 """``ENUMERATED`` integer type
4397 This type is identical to :py:class:`pyderasn.Integer`, but requires
4398 schema to be specified and does not accept values missing from it.
4401 tag_default = tag_encode(10)
4402 asn1_type_name = "ENUMERATED"
4413 bounds=None, # dummy argument, workability for Integer.decode
4415 super(Enumerated, self).__init__(
4416 value, bounds, impl, expl, default, optional, _specs, _decoded,
4418 if len(self.specs) == 0:
4419 raise ValueError("schema must be specified")
4421 def _value_sanitize(self, value):
4422 if isinstance(value, self.__class__):
4423 value = value._value
4424 elif isinstance(value, integer_types):
4425 for _value in itervalues(self.specs):
4430 "unknown integer value: %s" % value,
4431 klass=self.__class__,
4433 elif isinstance(value, string_types):
4434 value = self.specs.get(value)
4436 raise ObjUnknown("integer value: %s" % value)
4438 raise InvalidValueType((self.__class__, int, str))
4450 return self.__class__(
4452 impl=self.tag if impl is None else impl,
4453 expl=self._expl if expl is None else expl,
4454 default=self.default if default is None else default,
4455 optional=self.optional if optional is None else optional,
4460 def escape_control_unicode(c):
4461 if unicat(c)[0] == "C":
4462 c = repr(c).lstrip("u").strip("'")
4466 class CommonString(OctetString):
4467 """Common class for all strings
4469 Everything resembles :py:class:`pyderasn.OctetString`, except
4470 ability to deal with unicode text strings.
4472 >>> hexenc("привет мир".encode("utf-8"))
4473 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4474 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4476 >>> s = UTF8String("привет мир")
4477 UTF8String UTF8String привет мир
4479 'привет мир'
4480 >>> hexenc(bytes(s))
4481 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4483 >>> PrintableString("привет мир")
4484 Traceback (most recent call last):
4485 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4487 >>> BMPString("ада", bounds=(2, 2))
4488 Traceback (most recent call last):
4489 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4490 >>> s = BMPString("ад", bounds=(2, 2))
4493 >>> hexenc(bytes(s))
4501 * - :py:class:`pyderasn.UTF8String`
4503 * - :py:class:`pyderasn.NumericString`
4505 * - :py:class:`pyderasn.PrintableString`
4507 * - :py:class:`pyderasn.TeletexString`
4509 * - :py:class:`pyderasn.T61String`
4511 * - :py:class:`pyderasn.VideotexString`
4513 * - :py:class:`pyderasn.IA5String`
4515 * - :py:class:`pyderasn.GraphicString`
4517 * - :py:class:`pyderasn.VisibleString`
4519 * - :py:class:`pyderasn.ISO646String`
4521 * - :py:class:`pyderasn.GeneralString`
4523 * - :py:class:`pyderasn.UniversalString`
4525 * - :py:class:`pyderasn.BMPString`
4530 def _value_sanitize(self, value):
4532 value_decoded = None
4533 if isinstance(value, self.__class__):
4534 value_raw = value._value
4535 elif value.__class__ == text_type:
4536 value_decoded = value
4537 elif value.__class__ == binary_type:
4540 raise InvalidValueType((self.__class__, text_type, binary_type))
4543 value_decoded.encode(self.encoding)
4544 if value_raw is None else value_raw
4547 value_raw.decode(self.encoding)
4548 if value_decoded is None else value_decoded
4550 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4551 raise DecodeError(str(err))
4552 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4560 def __eq__(self, their):
4561 if their.__class__ == binary_type:
4562 return self._value == their
4563 if their.__class__ == text_type:
4564 return self._value == their.encode(self.encoding)
4565 if not isinstance(their, self.__class__):
4568 self._value == their._value and
4569 self.tag == their.tag and
4570 self._expl == their._expl
4573 def __unicode__(self):
4575 return self._value.decode(self.encoding)
4576 return text_type(self._value)
4579 return pp_console_row(next(self.pps(no_unicode=PY2)))
4581 def pps(self, decode_path=(), no_unicode=False):
4585 hexenc(bytes(self)) if no_unicode else
4586 "".join(escape_control_unicode(c) for c in self.__unicode__())
4590 asn1_type_name=self.asn1_type_name,
4591 obj_name=self.__class__.__name__,
4592 decode_path=decode_path,
4594 optional=self.optional,
4595 default=self == self.default,
4596 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4597 expl=None if self._expl is None else tag_decode(self._expl),
4602 expl_offset=self.expl_offset if self.expled else None,
4603 expl_tlen=self.expl_tlen if self.expled else None,
4604 expl_llen=self.expl_llen if self.expled else None,
4605 expl_vlen=self.expl_vlen if self.expled else None,
4606 expl_lenindef=self.expl_lenindef,
4607 ber_encoded=self.ber_encoded,
4610 for pp in self.pps_lenindef(decode_path):
4614 class UTF8String(CommonString):
4616 tag_default = tag_encode(12)
4618 asn1_type_name = "UTF8String"
4621 class AllowableCharsMixin(object):
4623 def allowable_chars(self):
4625 return self._allowable_chars
4626 return frozenset(six_unichr(c) for c in self._allowable_chars)
4629 class NumericString(AllowableCharsMixin, CommonString):
4632 Its value is properly sanitized: only ASCII digits with spaces can
4635 >>> NumericString().allowable_chars
4636 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4639 tag_default = tag_encode(18)
4641 asn1_type_name = "NumericString"
4642 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4644 def _value_sanitize(self, value):
4645 value = super(NumericString, self)._value_sanitize(value)
4646 if not frozenset(value) <= self._allowable_chars:
4647 raise DecodeError("non-numeric value")
4651 PrintableStringState = namedtuple(
4652 "PrintableStringState",
4653 OctetStringState._fields + ("allowable_chars",),
4658 class PrintableString(AllowableCharsMixin, CommonString):
4661 Its value is properly sanitized: see X.680 41.4 table 10.
4663 >>> PrintableString().allowable_chars
4664 frozenset([' ', "'", ..., 'z'])
4665 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4666 PrintableString PrintableString foo*bar
4667 >>> obj.allow_asterisk, obj.allow_ampersand
4671 tag_default = tag_encode(19)
4673 asn1_type_name = "PrintableString"
4674 _allowable_chars = frozenset(
4675 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4677 _asterisk = frozenset("*".encode("ascii"))
4678 _ampersand = frozenset("&".encode("ascii"))
4690 allow_asterisk=False,
4691 allow_ampersand=False,
4694 :param allow_asterisk: allow asterisk character
4695 :param allow_ampersand: allow ampersand character
4698 self._allowable_chars |= self._asterisk
4700 self._allowable_chars |= self._ampersand
4701 super(PrintableString, self).__init__(
4702 value, bounds, impl, expl, default, optional, _decoded, ctx,
4706 def allow_asterisk(self):
4707 """Is asterisk character allowed?
4709 return self._asterisk <= self._allowable_chars
4712 def allow_ampersand(self):
4713 """Is ampersand character allowed?
4715 return self._ampersand <= self._allowable_chars
4717 def _value_sanitize(self, value):
4718 value = super(PrintableString, self)._value_sanitize(value)
4719 if not frozenset(value) <= self._allowable_chars:
4720 raise DecodeError("non-printable value")
4723 def __getstate__(self):
4724 return PrintableStringState(
4725 *super(PrintableString, self).__getstate__(),
4726 **{"allowable_chars": self._allowable_chars}
4729 def __setstate__(self, state):
4730 super(PrintableString, self).__setstate__(state)
4731 self._allowable_chars = state.allowable_chars
4742 return self.__class__(
4745 (self._bound_min, self._bound_max)
4746 if bounds is None else bounds
4748 impl=self.tag if impl is None else impl,
4749 expl=self._expl if expl is None else expl,
4750 default=self.default if default is None else default,
4751 optional=self.optional if optional is None else optional,
4752 allow_asterisk=self.allow_asterisk,
4753 allow_ampersand=self.allow_ampersand,
4757 class TeletexString(CommonString):
4759 tag_default = tag_encode(20)
4761 asn1_type_name = "TeletexString"
4764 class T61String(TeletexString):
4766 asn1_type_name = "T61String"
4769 class VideotexString(CommonString):
4771 tag_default = tag_encode(21)
4772 encoding = "iso-8859-1"
4773 asn1_type_name = "VideotexString"
4776 class IA5String(CommonString):
4778 tag_default = tag_encode(22)
4780 asn1_type_name = "IA5"
4783 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4784 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4785 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4786 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4787 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4788 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4791 class VisibleString(CommonString):
4793 tag_default = tag_encode(26)
4795 asn1_type_name = "VisibleString"
4798 UTCTimeState = namedtuple(
4800 OctetStringState._fields + ("ber_raw",),
4805 def str_to_time_fractions(value):
4807 year, v = (v // 10**10), (v % 10**10)
4808 month, v = (v // 10**8), (v % 10**8)
4809 day, v = (v // 10**6), (v % 10**6)
4810 hour, v = (v // 10**4), (v % 10**4)
4811 minute, second = (v // 100), (v % 100)
4812 return year, month, day, hour, minute, second
4815 class UTCTime(VisibleString):
4816 """``UTCTime`` datetime type
4818 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
4819 UTCTime UTCTime 2017-09-30T22:07:50
4825 datetime.datetime(2017, 9, 30, 22, 7, 50)
4826 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
4827 datetime.datetime(1957, 9, 30, 22, 7, 50)
4829 If BER encoded value was met, then ``ber_raw`` attribute will hold
4830 its raw representation.
4834 Pay attention that UTCTime can not hold full year, so all years
4835 having < 50 years are treated as 20xx, 19xx otherwise, according
4836 to X.509 recommendation.
4840 No strict validation of UTC offsets are made, but very crude:
4842 * minutes are not exceeding 60
4843 * offset value is not exceeding 14 hours
4845 __slots__ = ("ber_raw",)
4846 tag_default = tag_encode(23)
4848 asn1_type_name = "UTCTime"
4849 evgen_mode_skip_value = False
4859 bounds=None, # dummy argument, workability for OctetString.decode
4863 :param value: set the value. Either datetime type, or
4864 :py:class:`pyderasn.UTCTime` object
4865 :param bytes impl: override default tag with ``IMPLICIT`` one
4866 :param bytes expl: override default tag with ``EXPLICIT`` one
4867 :param default: set default value. Type same as in ``value``
4868 :param bool optional: is object ``OPTIONAL`` in sequence
4870 super(UTCTime, self).__init__(
4871 None, None, impl, expl, None, optional, _decoded, ctx,
4875 if value is not None:
4876 self._value, self.ber_raw = self._value_sanitize(value, ctx)
4877 self.ber_encoded = self.ber_raw is not None
4878 if default is not None:
4879 default, _ = self._value_sanitize(default)
4880 self.default = self.__class__(
4885 if self._value is None:
4886 self._value = default
4888 self.optional = optional
4890 def _strptime_bered(self, value):
4891 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
4894 raise ValueError("no timezone")
4895 year += 2000 if year < 50 else 1900
4896 decoded = datetime(year, month, day, hour, minute)
4898 if value[-1] == "Z":
4902 raise ValueError("invalid UTC offset")
4903 if value[-5] == "-":
4905 elif value[-5] == "+":
4908 raise ValueError("invalid UTC offset")
4909 v = pureint(value[-4:])
4910 offset, v = (60 * (v % 100)), v // 100
4912 raise ValueError("invalid UTC offset minutes")
4914 if offset > 14 * 3600:
4915 raise ValueError("too big UTC offset")
4919 return offset, decoded
4921 raise ValueError("invalid UTC offset seconds")
4922 seconds = pureint(value)
4924 raise ValueError("invalid seconds value")
4925 return offset, decoded + timedelta(seconds=seconds)
4927 def _strptime(self, value):
4928 # datetime.strptime's format: %y%m%d%H%M%SZ
4929 if len(value) != LEN_YYMMDDHHMMSSZ:
4930 raise ValueError("invalid UTCTime length")
4931 if value[-1] != "Z":
4932 raise ValueError("non UTC timezone")
4933 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
4934 year += 2000 if year < 50 else 1900
4935 return datetime(year, month, day, hour, minute, second)
4937 def _dt_sanitize(self, value):
4938 if value.year < 1950 or value.year > 2049:
4939 raise ValueError("UTCTime can hold only 1950-2049 years")
4940 return value.replace(microsecond=0)
4942 def _value_sanitize(self, value, ctx=None):
4943 if value.__class__ == binary_type:
4945 value_decoded = value.decode("ascii")
4946 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4947 raise DecodeError("invalid UTCTime encoding: %r" % err)
4950 return self._strptime(value_decoded), None
4951 except (TypeError, ValueError) as _err:
4953 if (ctx is not None) and ctx.get("bered", False):
4955 offset, _value = self._strptime_bered(value_decoded)
4956 _value = _value - timedelta(seconds=offset)
4957 return self._dt_sanitize(_value), value
4958 except (TypeError, ValueError, OverflowError) as _err:
4961 "invalid %s format: %r" % (self.asn1_type_name, err),
4962 klass=self.__class__,
4964 if isinstance(value, self.__class__):
4965 return value._value, None
4966 if value.__class__ == datetime:
4967 return self._dt_sanitize(value), None
4968 raise InvalidValueType((self.__class__, datetime))
4970 def _pp_value(self):
4972 value = self._value.isoformat()
4973 if self.ber_encoded:
4974 value += " (%s)" % self.ber_raw
4978 def __unicode__(self):
4980 value = self._value.isoformat()
4981 if self.ber_encoded:
4982 value += " (%s)" % self.ber_raw
4984 return text_type(self._pp_value())
4986 def __getstate__(self):
4987 return UTCTimeState(
4988 *super(UTCTime, self).__getstate__(),
4989 **{"ber_raw": self.ber_raw}
4992 def __setstate__(self, state):
4993 super(UTCTime, self).__setstate__(state)
4994 self.ber_raw = state.ber_raw
4996 def __bytes__(self):
4997 self._assert_ready()
4998 return self._encode_time()
5000 def __eq__(self, their):
5001 if their.__class__ == binary_type:
5002 return self._encode_time() == their
5003 if their.__class__ == datetime:
5004 return self.todatetime() == their
5005 if not isinstance(their, self.__class__):
5008 self._value == their._value and
5009 self.tag == their.tag and
5010 self._expl == their._expl
5013 def _encode_time(self):
5014 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5017 self._assert_ready()
5018 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5020 def _encode_cer(self, writer):
5021 write_full(writer, self._encode())
5023 def todatetime(self):
5027 return pp_console_row(next(self.pps()))
5029 def pps(self, decode_path=()):
5032 asn1_type_name=self.asn1_type_name,
5033 obj_name=self.__class__.__name__,
5034 decode_path=decode_path,
5035 value=self._pp_value(),
5036 optional=self.optional,
5037 default=self == self.default,
5038 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5039 expl=None if self._expl is None else tag_decode(self._expl),
5044 expl_offset=self.expl_offset if self.expled else None,
5045 expl_tlen=self.expl_tlen if self.expled else None,
5046 expl_llen=self.expl_llen if self.expled else None,
5047 expl_vlen=self.expl_vlen if self.expled else None,
5048 expl_lenindef=self.expl_lenindef,
5049 ber_encoded=self.ber_encoded,
5052 for pp in self.pps_lenindef(decode_path):
5056 class GeneralizedTime(UTCTime):
5057 """``GeneralizedTime`` datetime type
5059 This type is similar to :py:class:`pyderasn.UTCTime`.
5061 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5062 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5064 '20170930220750.000123Z'
5065 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5066 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5070 Only microsecond fractions are supported in DER encoding.
5071 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5072 higher precision values.
5076 BER encoded data can loss information (accuracy) during decoding
5077 because of float transformations.
5081 Local times (without explicit timezone specification) are treated
5082 as UTC one, no transformations are made.
5086 Zero year is unsupported.
5089 tag_default = tag_encode(24)
5090 asn1_type_name = "GeneralizedTime"
5092 def _dt_sanitize(self, value):
5095 def _strptime_bered(self, value):
5096 if len(value) < 4 + 3 * 2:
5097 raise ValueError("invalid GeneralizedTime")
5098 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5099 decoded = datetime(year, month, day, hour)
5100 offset, value = 0, value[10:]
5102 return offset, decoded
5103 if value[-1] == "Z":
5106 for char, sign in (("-", -1), ("+", 1)):
5107 idx = value.rfind(char)
5110 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5111 v = pureint(offset_raw)
5112 if len(offset_raw) == 4:
5113 offset, v = (60 * (v % 100)), v // 100
5115 raise ValueError("invalid UTC offset minutes")
5116 elif len(offset_raw) == 2:
5119 raise ValueError("invalid UTC offset")
5121 if offset > 14 * 3600:
5122 raise ValueError("too big UTC offset")
5126 return offset, decoded
5127 if value[0] in DECIMAL_SIGNS:
5129 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5132 raise ValueError("stripped minutes")
5133 decoded += timedelta(seconds=60 * pureint(value[:2]))
5136 return offset, decoded
5137 if value[0] in DECIMAL_SIGNS:
5139 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5142 raise ValueError("stripped seconds")
5143 decoded += timedelta(seconds=pureint(value[:2]))
5146 return offset, decoded
5147 if value[0] not in DECIMAL_SIGNS:
5148 raise ValueError("invalid format after seconds")
5150 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5153 def _strptime(self, value):
5155 if l == LEN_YYYYMMDDHHMMSSZ:
5156 # datetime.strptime's format: %Y%m%d%H%M%SZ
5157 if value[-1] != "Z":
5158 raise ValueError("non UTC timezone")
5159 return datetime(*str_to_time_fractions(value[:-1]))
5160 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5161 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5162 if value[-1] != "Z":
5163 raise ValueError("non UTC timezone")
5164 if value[14] != ".":
5165 raise ValueError("no fractions separator")
5168 raise ValueError("trailing zero")
5171 raise ValueError("only microsecond fractions are supported")
5172 us = pureint(us + ("0" * (6 - us_len)))
5173 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5174 return datetime(year, month, day, hour, minute, second, us)
5175 raise ValueError("invalid GeneralizedTime length")
5177 def _encode_time(self):
5179 encoded = value.strftime("%Y%m%d%H%M%S")
5180 if value.microsecond > 0:
5181 encoded += (".%06d" % value.microsecond).rstrip("0")
5182 return (encoded + "Z").encode("ascii")
5185 self._assert_ready()
5187 if value.microsecond > 0:
5188 encoded = self._encode_time()
5189 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5190 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5193 class GraphicString(CommonString):
5195 tag_default = tag_encode(25)
5196 encoding = "iso-8859-1"
5197 asn1_type_name = "GraphicString"
5200 class ISO646String(VisibleString):
5202 asn1_type_name = "ISO646String"
5205 class GeneralString(CommonString):
5207 tag_default = tag_encode(27)
5208 encoding = "iso-8859-1"
5209 asn1_type_name = "GeneralString"
5212 class UniversalString(CommonString):
5214 tag_default = tag_encode(28)
5215 encoding = "utf-32-be"
5216 asn1_type_name = "UniversalString"
5219 class BMPString(CommonString):
5221 tag_default = tag_encode(30)
5222 encoding = "utf-16-be"
5223 asn1_type_name = "BMPString"
5226 ChoiceState = namedtuple(
5228 BasicState._fields + ("specs", "value",),
5234 """``CHOICE`` special type
5238 class GeneralName(Choice):
5240 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5241 ("dNSName", IA5String(impl=tag_ctxp(2))),
5244 >>> gn = GeneralName()
5246 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5247 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5248 >>> gn["dNSName"] = IA5String("bar.baz")
5249 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5250 >>> gn["rfc822Name"]
5253 [2] IA5String IA5 bar.baz
5256 >>> gn.value == gn["dNSName"]
5259 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5261 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5262 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5264 __slots__ = ("specs",)
5266 asn1_type_name = "CHOICE"
5279 :param value: set the value. Either ``(choice, value)`` tuple, or
5280 :py:class:`pyderasn.Choice` object
5281 :param bytes impl: can not be set, do **not** use it
5282 :param bytes expl: override default tag with ``EXPLICIT`` one
5283 :param default: set default value. Type same as in ``value``
5284 :param bool optional: is object ``OPTIONAL`` in sequence
5286 if impl is not None:
5287 raise ValueError("no implicit tag allowed for CHOICE")
5288 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5290 schema = getattr(self, "schema", ())
5291 if len(schema) == 0:
5292 raise ValueError("schema must be specified")
5294 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5297 if value is not None:
5298 self._value = self._value_sanitize(value)
5299 if default is not None:
5300 default_value = self._value_sanitize(default)
5301 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5302 default_obj.specs = self.specs
5303 default_obj._value = default_value
5304 self.default = default_obj
5306 self._value = copy(default_obj._value)
5307 if self._expl is not None:
5308 tag_class, _, tag_num = tag_decode(self._expl)
5309 self._tag_order = (tag_class, tag_num)
5311 def _value_sanitize(self, value):
5312 if (value.__class__ == tuple) and len(value) == 2:
5314 spec = self.specs.get(choice)
5316 raise ObjUnknown(choice)
5317 if not isinstance(obj, spec.__class__):
5318 raise InvalidValueType((spec,))
5319 return (choice, spec(obj))
5320 if isinstance(value, self.__class__):
5322 raise InvalidValueType((self.__class__, tuple))
5326 return self._value is not None and self._value[1].ready
5330 return self.expl_lenindef or (
5331 (self._value is not None) and
5332 self._value[1].bered
5335 def __getstate__(self):
5353 def __setstate__(self, state):
5354 super(Choice, self).__setstate__(state)
5355 self.specs = state.specs
5356 self._value = state.value
5358 def __eq__(self, their):
5359 if (their.__class__ == tuple) and len(their) == 2:
5360 return self._value == their
5361 if not isinstance(their, self.__class__):
5364 self.specs == their.specs and
5365 self._value == their._value
5375 return self.__class__(
5378 expl=self._expl if expl is None else expl,
5379 default=self.default if default is None else default,
5380 optional=self.optional if optional is None else optional,
5385 """Name of the choice
5387 self._assert_ready()
5388 return self._value[0]
5392 """Value of underlying choice
5394 self._assert_ready()
5395 return self._value[1]
5398 def tag_order(self):
5399 self._assert_ready()
5400 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5403 def tag_order_cer(self):
5404 return min(v.tag_order_cer for v in itervalues(self.specs))
5406 def __getitem__(self, key):
5407 if key not in self.specs:
5408 raise ObjUnknown(key)
5409 if self._value is None:
5411 choice, value = self._value
5416 def __setitem__(self, key, value):
5417 spec = self.specs.get(key)
5419 raise ObjUnknown(key)
5420 if not isinstance(value, spec.__class__):
5421 raise InvalidValueType((spec.__class__,))
5422 self._value = (key, spec(value))
5430 return self._value[1].decoded if self.ready else False
5433 self._assert_ready()
5434 return self._value[1].encode()
5436 def _encode_cer(self, writer):
5437 self._assert_ready()
5438 self._value[1].encode_cer(writer)
5440 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5441 for choice, spec in iteritems(self.specs):
5442 sub_decode_path = decode_path + (choice,)
5448 decode_path=sub_decode_path,
5451 _ctx_immutable=False,
5458 klass=self.__class__,
5459 decode_path=decode_path,
5462 if tag_only: # pragma: no cover
5466 for _decode_path, value, tail in spec.decode_evgen(
5470 decode_path=sub_decode_path,
5472 _ctx_immutable=False,
5474 yield _decode_path, value, tail
5476 _, value, tail = next(spec.decode_evgen(
5480 decode_path=sub_decode_path,
5482 _ctx_immutable=False,
5485 obj = self.__class__(
5488 default=self.default,
5489 optional=self.optional,
5490 _decoded=(offset, 0, value.fulllen),
5492 obj._value = (choice, value)
5493 yield decode_path, obj, tail
5496 value = pp_console_row(next(self.pps()))
5498 value = "%s[%r]" % (value, self.value)
5501 def pps(self, decode_path=()):
5504 asn1_type_name=self.asn1_type_name,
5505 obj_name=self.__class__.__name__,
5506 decode_path=decode_path,
5507 value=self.choice if self.ready else None,
5508 optional=self.optional,
5509 default=self == self.default,
5510 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5511 expl=None if self._expl is None else tag_decode(self._expl),
5516 expl_lenindef=self.expl_lenindef,
5520 yield self.value.pps(decode_path=decode_path + (self.choice,))
5521 for pp in self.pps_lenindef(decode_path):
5525 class PrimitiveTypes(Choice):
5526 """Predefined ``CHOICE`` for all generic primitive types
5528 It could be useful for general decoding of some unspecified values:
5530 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5531 OCTET STRING 3 bytes 666f6f
5532 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5536 schema = tuple((klass.__name__, klass()) for klass in (
5560 AnyState = namedtuple(
5562 BasicState._fields + ("value", "defined"),
5568 """``ANY`` special type
5570 >>> Any(Integer(-123))
5571 ANY INTEGER -123 (0X:7B)
5572 >>> a = Any(OctetString(b"hello world").encode())
5573 ANY 040b68656c6c6f20776f726c64
5574 >>> hexenc(bytes(a))
5575 b'0x040x0bhello world'
5577 __slots__ = ("defined",)
5578 tag_default = tag_encode(0)
5579 asn1_type_name = "ANY"
5589 :param value: set the value. Either any kind of pyderasn's
5590 **ready** object, or bytes. Pay attention that
5591 **no** validation is performed if raw binary value
5592 is valid TLV, except just tag decoding
5593 :param bytes expl: override default tag with ``EXPLICIT`` one
5594 :param bool optional: is object ``OPTIONAL`` in sequence
5596 super(Any, self).__init__(None, expl, None, optional, _decoded)
5600 value = self._value_sanitize(value)
5602 if self._expl is None:
5603 if value.__class__ == binary_type:
5604 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5606 tag_class, tag_num = value.tag_order
5608 tag_class, _, tag_num = tag_decode(self._expl)
5609 self._tag_order = (tag_class, tag_num)
5612 def _value_sanitize(self, value):
5613 if value.__class__ == binary_type:
5615 raise ValueError("Any value can not be empty")
5617 if isinstance(value, self.__class__):
5619 if not isinstance(value, Obj):
5620 raise InvalidValueType((self.__class__, Obj, binary_type))
5625 return self._value is not None
5628 def tag_order(self):
5629 self._assert_ready()
5630 return self._tag_order
5634 if self.expl_lenindef or self.lenindef:
5636 if self.defined is None:
5638 return self.defined[1].bered
5640 def __getstate__(self):
5658 def __setstate__(self, state):
5659 super(Any, self).__setstate__(state)
5660 self._value = state.value
5661 self.defined = state.defined
5663 def __eq__(self, their):
5664 if their.__class__ == binary_type:
5665 if self._value.__class__ == binary_type:
5666 return self._value == their
5667 return self._value.encode() == their
5668 if issubclass(their.__class__, Any):
5669 if self.ready and their.ready:
5670 return bytes(self) == bytes(their)
5671 return self.ready == their.ready
5680 return self.__class__(
5682 expl=self._expl if expl is None else expl,
5683 optional=self.optional if optional is None else optional,
5686 def __bytes__(self):
5687 self._assert_ready()
5689 if value.__class__ == binary_type:
5691 return self._value.encode()
5698 self._assert_ready()
5700 if value.__class__ == binary_type:
5702 return value.encode()
5704 def _encode_cer(self, writer):
5705 self._assert_ready()
5707 if value.__class__ == binary_type:
5708 write_full(writer, value)
5710 value.encode_cer(writer)
5712 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5714 t, tlen, lv = tag_strip(tlv)
5715 except DecodeError as err:
5716 raise err.__class__(
5718 klass=self.__class__,
5719 decode_path=decode_path,
5723 l, llen, v = len_decode(lv)
5724 except LenIndefForm as err:
5725 if not ctx.get("bered", False):
5726 raise err.__class__(
5728 klass=self.__class__,
5729 decode_path=decode_path,
5732 llen, vlen, v = 1, 0, lv[1:]
5733 sub_offset = offset + tlen + llen
5735 while v[:EOC_LEN].tobytes() != EOC:
5736 chunk, v = Any().decode(
5739 decode_path=decode_path + (str(chunk_i),),
5742 _ctx_immutable=False,
5744 vlen += chunk.tlvlen
5745 sub_offset += chunk.tlvlen
5747 tlvlen = tlen + llen + vlen + EOC_LEN
5748 obj = self.__class__(
5749 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5751 optional=self.optional,
5752 _decoded=(offset, 0, tlvlen),
5755 obj.tag = t.tobytes()
5756 yield decode_path, obj, v[EOC_LEN:]
5758 except DecodeError as err:
5759 raise err.__class__(
5761 klass=self.__class__,
5762 decode_path=decode_path,
5766 raise NotEnoughData(
5767 "encoded length is longer than data",
5768 klass=self.__class__,
5769 decode_path=decode_path,
5772 tlvlen = tlen + llen + l
5773 v, tail = tlv[:tlvlen], v[l:]
5774 obj = self.__class__(
5775 value=None if evgen_mode else v.tobytes(),
5777 optional=self.optional,
5778 _decoded=(offset, 0, tlvlen),
5780 obj.tag = t.tobytes()
5781 yield decode_path, obj, tail
5784 return pp_console_row(next(self.pps()))
5786 def pps(self, decode_path=()):
5790 elif value.__class__ == binary_type:
5796 asn1_type_name=self.asn1_type_name,
5797 obj_name=self.__class__.__name__,
5798 decode_path=decode_path,
5800 blob=self._value if self._value.__class__ == binary_type else None,
5801 optional=self.optional,
5802 default=self == self.default,
5803 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5804 expl=None if self._expl is None else tag_decode(self._expl),
5809 expl_offset=self.expl_offset if self.expled else None,
5810 expl_tlen=self.expl_tlen if self.expled else None,
5811 expl_llen=self.expl_llen if self.expled else None,
5812 expl_vlen=self.expl_vlen if self.expled else None,
5813 expl_lenindef=self.expl_lenindef,
5814 lenindef=self.lenindef,
5817 defined_by, defined = self.defined or (None, None)
5818 if defined_by is not None:
5820 decode_path=decode_path + (DecodePathDefBy(defined_by),)
5822 for pp in self.pps_lenindef(decode_path):
5826 ########################################################################
5827 # ASN.1 constructed types
5828 ########################################################################
5830 def abs_decode_path(decode_path, rel_path):
5831 """Create an absolute decode path from current and relative ones
5833 :param decode_path: current decode path, starting point. Tuple of strings
5834 :param rel_path: relative path to ``decode_path``. Tuple of strings.
5835 If first tuple's element is "/", then treat it as
5836 an absolute path, ignoring ``decode_path`` as
5837 starting point. Also this tuple can contain ".."
5838 elements, stripping the leading element from
5841 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
5842 ("foo", "bar", "baz", "whatever")
5843 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
5845 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
5848 if rel_path[0] == "/":
5850 if rel_path[0] == "..":
5851 return abs_decode_path(decode_path[:-1], rel_path[1:])
5852 return decode_path + rel_path
5855 SequenceState = namedtuple(
5857 BasicState._fields + ("specs", "value",),
5862 class Sequence(Obj):
5863 """``SEQUENCE`` structure type
5865 You have to make specification of sequence::
5867 class Extension(Sequence):
5869 ("extnID", ObjectIdentifier()),
5870 ("critical", Boolean(default=False)),
5871 ("extnValue", OctetString()),
5874 Then, you can work with it as with dictionary.
5876 >>> ext = Extension()
5877 >>> Extension().specs
5879 ('extnID', OBJECT IDENTIFIER),
5880 ('critical', BOOLEAN False OPTIONAL DEFAULT),
5881 ('extnValue', OCTET STRING),
5883 >>> ext["extnID"] = "1.2.3"
5884 Traceback (most recent call last):
5885 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
5886 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
5888 You can determine if sequence is ready to be encoded:
5893 Traceback (most recent call last):
5894 pyderasn.ObjNotReady: object is not ready: extnValue
5895 >>> ext["extnValue"] = OctetString(b"foobar")
5899 Value you want to assign, must have the same **type** as in
5900 corresponding specification, but it can have different tags,
5901 optional/default attributes -- they will be taken from specification
5904 class TBSCertificate(Sequence):
5906 ("version", Version(expl=tag_ctxc(0), default="v1")),
5909 >>> tbs = TBSCertificate()
5910 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
5912 Assign ``None`` to remove value from sequence.
5914 You can set values in Sequence during its initialization:
5916 >>> AlgorithmIdentifier((
5917 ("algorithm", ObjectIdentifier("1.2.3")),
5918 ("parameters", Any(Null()))
5920 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
5922 You can determine if value exists/set in the sequence and take its value:
5924 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
5927 OBJECT IDENTIFIER 1.2.3
5929 But pay attention that if value has default, then it won't be (not
5930 in) in the sequence (because ``DEFAULT`` must not be encoded in
5931 DER), but you can read its value:
5933 >>> "critical" in ext, ext["critical"]
5934 (False, BOOLEAN False)
5935 >>> ext["critical"] = Boolean(True)
5936 >>> "critical" in ext, ext["critical"]
5937 (True, BOOLEAN True)
5939 All defaulted values are always optional.
5941 .. _allow_default_values_ctx:
5943 DER prohibits default value encoding and will raise an error if
5944 default value is unexpectedly met during decode.
5945 If :ref:`bered <bered_ctx>` context option is set, then no error
5946 will be raised, but ``bered`` attribute set. You can disable strict
5947 defaulted values existence validation by setting
5948 ``"allow_default_values": True`` :ref:`context <ctx>` option.
5952 Check for default value existence is not performed in
5953 ``evgen_mode``, because previously decoded values are not stored
5954 in memory, to be able to compare them.
5956 Two sequences are equal if they have equal specification (schema),
5957 implicit/explicit tagging and the same values.
5959 __slots__ = ("specs",)
5960 tag_default = tag_encode(form=TagFormConstructed, num=16)
5961 asn1_type_name = "SEQUENCE"
5973 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
5975 schema = getattr(self, "schema", ())
5977 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5980 if value is not None:
5981 if issubclass(value.__class__, Sequence):
5982 self._value = value._value
5983 elif hasattr(value, "__iter__"):
5984 for seq_key, seq_value in value:
5985 self[seq_key] = seq_value
5987 raise InvalidValueType((Sequence,))
5988 if default is not None:
5989 if not issubclass(default.__class__, Sequence):
5990 raise InvalidValueType((Sequence,))
5991 default_value = default._value
5992 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5993 default_obj.specs = self.specs
5994 default_obj._value = default_value
5995 self.default = default_obj
5997 self._value = copy(default_obj._value)
6001 for name, spec in iteritems(self.specs):
6002 value = self._value.get(name)
6013 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6015 return any(value.bered for value in itervalues(self._value))
6017 def __getstate__(self):
6018 return SequenceState(
6032 {k: copy(v) for k, v in iteritems(self._value)},
6035 def __setstate__(self, state):
6036 super(Sequence, self).__setstate__(state)
6037 self.specs = state.specs
6038 self._value = state.value
6040 def __eq__(self, their):
6041 if not isinstance(their, self.__class__):
6044 self.specs == their.specs and
6045 self.tag == their.tag and
6046 self._expl == their._expl and
6047 self._value == their._value
6058 return self.__class__(
6061 impl=self.tag if impl is None else impl,
6062 expl=self._expl if expl is None else expl,
6063 default=self.default if default is None else default,
6064 optional=self.optional if optional is None else optional,
6067 def __contains__(self, key):
6068 return key in self._value
6070 def __setitem__(self, key, value):
6071 spec = self.specs.get(key)
6073 raise ObjUnknown(key)
6075 self._value.pop(key, None)
6077 if not isinstance(value, spec.__class__):
6078 raise InvalidValueType((spec.__class__,))
6079 value = spec(value=value)
6080 if spec.default is not None and value == spec.default:
6081 self._value.pop(key, None)
6083 self._value[key] = value
6085 def __getitem__(self, key):
6086 value = self._value.get(key)
6087 if value is not None:
6089 spec = self.specs.get(key)
6091 raise ObjUnknown(key)
6092 if spec.default is not None:
6096 def _values_for_encoding(self):
6097 for name, spec in iteritems(self.specs):
6098 value = self._value.get(name)
6102 raise ObjNotReady(name)
6106 v = b"".join(v.encode() for v in self._values_for_encoding())
6107 return b"".join((self.tag, len_encode(len(v)), v))
6109 def _encode_cer(self, writer):
6110 write_full(writer, self.tag + LENINDEF)
6111 for v in self._values_for_encoding():
6112 v.encode_cer(writer)
6113 write_full(writer, EOC)
6115 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6117 t, tlen, lv = tag_strip(tlv)
6118 except DecodeError as err:
6119 raise err.__class__(
6121 klass=self.__class__,
6122 decode_path=decode_path,
6127 klass=self.__class__,
6128 decode_path=decode_path,
6131 if tag_only: # pragma: no cover
6135 ctx_bered = ctx.get("bered", False)
6137 l, llen, v = len_decode(lv)
6138 except LenIndefForm as err:
6140 raise err.__class__(
6142 klass=self.__class__,
6143 decode_path=decode_path,
6146 l, llen, v = 0, 1, lv[1:]
6148 except DecodeError as err:
6149 raise err.__class__(
6151 klass=self.__class__,
6152 decode_path=decode_path,
6156 raise NotEnoughData(
6157 "encoded length is longer than data",
6158 klass=self.__class__,
6159 decode_path=decode_path,
6163 v, tail = v[:l], v[l:]
6165 sub_offset = offset + tlen + llen
6168 ctx_allow_default_values = ctx.get("allow_default_values", False)
6169 for name, spec in iteritems(self.specs):
6170 if spec.optional and (
6171 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6175 sub_decode_path = decode_path + (name,)
6178 for _decode_path, value, v_tail in spec.decode_evgen(
6182 decode_path=sub_decode_path,
6184 _ctx_immutable=False,
6186 yield _decode_path, value, v_tail
6188 _, value, v_tail = next(spec.decode_evgen(
6192 decode_path=sub_decode_path,
6194 _ctx_immutable=False,
6197 except TagMismatch as err:
6198 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6202 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6203 if not evgen_mode and defined is not None:
6204 defined_by, defined_spec = defined
6205 if issubclass(value.__class__, SequenceOf):
6206 for i, _value in enumerate(value):
6207 sub_sub_decode_path = sub_decode_path + (
6209 DecodePathDefBy(defined_by),
6211 defined_value, defined_tail = defined_spec.decode(
6212 memoryview(bytes(_value)),
6214 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6215 if value.expled else (value.tlen + value.llen)
6218 decode_path=sub_sub_decode_path,
6220 _ctx_immutable=False,
6222 if len(defined_tail) > 0:
6225 klass=self.__class__,
6226 decode_path=sub_sub_decode_path,
6229 _value.defined = (defined_by, defined_value)
6231 defined_value, defined_tail = defined_spec.decode(
6232 memoryview(bytes(value)),
6234 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6235 if value.expled else (value.tlen + value.llen)
6238 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6240 _ctx_immutable=False,
6242 if len(defined_tail) > 0:
6245 klass=self.__class__,
6246 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6249 value.defined = (defined_by, defined_value)
6251 value_len = value.fulllen
6253 sub_offset += value_len
6256 if spec.default is not None and value == spec.default:
6257 # This will not work in evgen_mode
6258 if ctx_bered or ctx_allow_default_values:
6262 "DEFAULT value met",
6263 klass=self.__class__,
6264 decode_path=sub_decode_path,
6267 values[name] = value
6268 spec_defines = getattr(spec, "defines", ())
6269 if len(spec_defines) == 0:
6270 defines_by_path = ctx.get("defines_by_path", ())
6271 if len(defines_by_path) > 0:
6272 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6273 if spec_defines is not None and len(spec_defines) > 0:
6274 for rel_path, schema in spec_defines:
6275 defined = schema.get(value, None)
6276 if defined is not None:
6277 ctx.setdefault("_defines", []).append((
6278 abs_decode_path(sub_decode_path[:-1], rel_path),
6282 if v[:EOC_LEN].tobytes() != EOC:
6285 klass=self.__class__,
6286 decode_path=decode_path,
6294 klass=self.__class__,
6295 decode_path=decode_path,
6298 obj = self.__class__(
6302 default=self.default,
6303 optional=self.optional,
6304 _decoded=(offset, llen, vlen),
6307 obj.lenindef = lenindef
6308 obj.ber_encoded = ber_encoded
6309 yield decode_path, obj, tail
6312 value = pp_console_row(next(self.pps()))
6314 for name in self.specs:
6315 _value = self._value.get(name)
6318 cols.append("%s: %s" % (name, repr(_value)))
6319 return "%s[%s]" % (value, "; ".join(cols))
6321 def pps(self, decode_path=()):
6324 asn1_type_name=self.asn1_type_name,
6325 obj_name=self.__class__.__name__,
6326 decode_path=decode_path,
6327 optional=self.optional,
6328 default=self == self.default,
6329 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6330 expl=None if self._expl is None else tag_decode(self._expl),
6335 expl_offset=self.expl_offset if self.expled else None,
6336 expl_tlen=self.expl_tlen if self.expled else None,
6337 expl_llen=self.expl_llen if self.expled else None,
6338 expl_vlen=self.expl_vlen if self.expled else None,
6339 expl_lenindef=self.expl_lenindef,
6340 lenindef=self.lenindef,
6341 ber_encoded=self.ber_encoded,
6344 for name in self.specs:
6345 value = self._value.get(name)
6348 yield value.pps(decode_path=decode_path + (name,))
6349 for pp in self.pps_lenindef(decode_path):
6353 class Set(Sequence):
6354 """``SET`` structure type
6356 Its usage is identical to :py:class:`pyderasn.Sequence`.
6358 .. _allow_unordered_set_ctx:
6360 DER prohibits unordered values encoding and will raise an error
6361 during decode. If :ref:`bered <bered_ctx>` context option is set,
6362 then no error will occur. Also you can disable strict values
6363 ordering check by setting ``"allow_unordered_set": True``
6364 :ref:`context <ctx>` option.
6367 tag_default = tag_encode(form=TagFormConstructed, num=17)
6368 asn1_type_name = "SET"
6370 def _values_for_encoding(self):
6372 super(Set, self)._values_for_encoding(),
6373 key=attrgetter("tag_order"),
6376 def _encode_cer(self, writer):
6377 write_full(writer, self.tag + LENINDEF)
6379 super(Set, self)._values_for_encoding(),
6380 key=attrgetter("tag_order_cer"),
6382 v.encode_cer(writer)
6383 write_full(writer, EOC)
6385 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6387 t, tlen, lv = tag_strip(tlv)
6388 except DecodeError as err:
6389 raise err.__class__(
6391 klass=self.__class__,
6392 decode_path=decode_path,
6397 klass=self.__class__,
6398 decode_path=decode_path,
6405 ctx_bered = ctx.get("bered", False)
6407 l, llen, v = len_decode(lv)
6408 except LenIndefForm as err:
6410 raise err.__class__(
6412 klass=self.__class__,
6413 decode_path=decode_path,
6416 l, llen, v = 0, 1, lv[1:]
6418 except DecodeError as err:
6419 raise err.__class__(
6421 klass=self.__class__,
6422 decode_path=decode_path,
6426 raise NotEnoughData(
6427 "encoded length is longer than data",
6428 klass=self.__class__,
6432 v, tail = v[:l], v[l:]
6434 sub_offset = offset + tlen + llen
6437 ctx_allow_default_values = ctx.get("allow_default_values", False)
6438 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6439 tag_order_prev = (0, 0)
6440 _specs_items = copy(self.specs)
6443 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6445 for name, spec in iteritems(_specs_items):
6446 sub_decode_path = decode_path + (name,)
6452 decode_path=sub_decode_path,
6455 _ctx_immutable=False,
6462 klass=self.__class__,
6463 decode_path=decode_path,
6467 for _decode_path, value, v_tail in spec.decode_evgen(
6471 decode_path=sub_decode_path,
6473 _ctx_immutable=False,
6475 yield _decode_path, value, v_tail
6477 _, value, v_tail = next(spec.decode_evgen(
6481 decode_path=sub_decode_path,
6483 _ctx_immutable=False,
6486 value_tag_order = value.tag_order
6487 value_len = value.fulllen
6488 if tag_order_prev >= value_tag_order:
6489 if ctx_bered or ctx_allow_unordered_set:
6493 "unordered " + self.asn1_type_name,
6494 klass=self.__class__,
6495 decode_path=sub_decode_path,
6498 if spec.default is None or value != spec.default:
6500 elif ctx_bered or ctx_allow_default_values:
6504 "DEFAULT value met",
6505 klass=self.__class__,
6506 decode_path=sub_decode_path,
6509 values[name] = value
6510 del _specs_items[name]
6511 tag_order_prev = value_tag_order
6512 sub_offset += value_len
6516 obj = self.__class__(
6520 default=self.default,
6521 optional=self.optional,
6522 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6525 if v[:EOC_LEN].tobytes() != EOC:
6528 klass=self.__class__,
6529 decode_path=decode_path,
6534 for name, spec in iteritems(self.specs):
6535 if name not in values and not spec.optional:
6537 "%s value is not ready" % name,
6538 klass=self.__class__,
6539 decode_path=decode_path,
6544 obj.ber_encoded = ber_encoded
6545 yield decode_path, obj, tail
6548 SequenceOfState = namedtuple(
6550 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6555 class SequenceOf(Obj):
6556 """``SEQUENCE OF`` sequence type
6558 For that kind of type you must specify the object it will carry on
6559 (bounds are for example here, not required)::
6561 class Ints(SequenceOf):
6566 >>> ints.append(Integer(123))
6567 >>> ints.append(Integer(234))
6569 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6570 >>> [int(i) for i in ints]
6572 >>> ints.append(Integer(345))
6573 Traceback (most recent call last):
6574 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6577 >>> ints[1] = Integer(345)
6579 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6581 You can initialize sequence with preinitialized values:
6583 >>> ints = Ints([Integer(123), Integer(234)])
6585 Also you can use iterator as a value:
6587 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6589 And it won't be iterated until encoding process. Pay attention that
6590 bounds and required schema checks are done only during the encoding
6591 process in that case! After encode was called, then value is zeroed
6592 back to empty list and you have to set it again. That mode is useful
6593 mainly with CER encoding mode, where all objects from the iterable
6594 will be streamed to the buffer, without copying all of them to
6597 __slots__ = ("spec", "_bound_min", "_bound_max")
6598 tag_default = tag_encode(form=TagFormConstructed, num=16)
6599 asn1_type_name = "SEQUENCE OF"
6612 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6614 schema = getattr(self, "schema", None)
6616 raise ValueError("schema must be specified")
6618 self._bound_min, self._bound_max = getattr(
6622 ) if bounds is None else bounds
6624 if value is not None:
6625 self._value = self._value_sanitize(value)
6626 if default is not None:
6627 default_value = self._value_sanitize(default)
6628 default_obj = self.__class__(
6633 default_obj._value = default_value
6634 self.default = default_obj
6636 self._value = copy(default_obj._value)
6638 def _value_sanitize(self, value):
6640 if issubclass(value.__class__, SequenceOf):
6641 value = value._value
6642 elif hasattr(value, NEXT_ATTR_NAME):
6644 elif hasattr(value, "__iter__"):
6647 raise InvalidValueType((self.__class__, iter, "iterator"))
6649 if not self._bound_min <= len(value) <= self._bound_max:
6650 raise BoundsError(self._bound_min, len(value), self._bound_max)
6651 class_expected = self.spec.__class__
6653 if not isinstance(v, class_expected):
6654 raise InvalidValueType((class_expected,))
6659 if hasattr(self._value, NEXT_ATTR_NAME):
6661 if self._bound_min > 0 and len(self._value) == 0:
6663 return all(v.ready for v in self._value)
6667 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6669 return any(v.bered for v in self._value)
6671 def __getstate__(self):
6672 if hasattr(self._value, NEXT_ATTR_NAME):
6673 raise ValueError("can not pickle SequenceOf with iterator")
6674 return SequenceOfState(
6688 [copy(v) for v in self._value],
6693 def __setstate__(self, state):
6694 super(SequenceOf, self).__setstate__(state)
6695 self.spec = state.spec
6696 self._value = state.value
6697 self._bound_min = state.bound_min
6698 self._bound_max = state.bound_max
6700 def __eq__(self, their):
6701 if isinstance(their, self.__class__):
6703 self.spec == their.spec and
6704 self.tag == their.tag and
6705 self._expl == their._expl and
6706 self._value == their._value
6708 if hasattr(their, "__iter__"):
6709 return self._value == list(their)
6721 return self.__class__(
6725 (self._bound_min, self._bound_max)
6726 if bounds is None else bounds
6728 impl=self.tag if impl is None else impl,
6729 expl=self._expl if expl is None else expl,
6730 default=self.default if default is None else default,
6731 optional=self.optional if optional is None else optional,
6734 def __contains__(self, key):
6735 return key in self._value
6737 def append(self, value):
6738 if not isinstance(value, self.spec.__class__):
6739 raise InvalidValueType((self.spec.__class__,))
6740 if len(self._value) + 1 > self._bound_max:
6743 len(self._value) + 1,
6746 self._value.append(value)
6749 return iter(self._value)
6752 return len(self._value)
6754 def __setitem__(self, key, value):
6755 if not isinstance(value, self.spec.__class__):
6756 raise InvalidValueType((self.spec.__class__,))
6757 self._value[key] = self.spec(value=value)
6759 def __getitem__(self, key):
6760 return self._value[key]
6762 def _values_for_encoding(self):
6763 return iter(self._value)
6766 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6769 values_append = values.append
6770 class_expected = self.spec.__class__
6771 values_for_encoding = self._values_for_encoding()
6773 for v in values_for_encoding:
6774 if not isinstance(v, class_expected):
6775 raise InvalidValueType((class_expected,))
6776 values_append(v.encode())
6777 if not self._bound_min <= len(values) <= self._bound_max:
6778 raise BoundsError(self._bound_min, len(values), self._bound_max)
6779 value = b"".join(values)
6781 value = b"".join(v.encode() for v in self._values_for_encoding())
6782 return b"".join((self.tag, len_encode(len(value)), value))
6784 def _encode_cer(self, writer):
6785 write_full(writer, self.tag + LENINDEF)
6786 iterator = hasattr(self._value, NEXT_ATTR_NAME)
6788 class_expected = self.spec.__class__
6790 values_for_encoding = self._values_for_encoding()
6792 for v in values_for_encoding:
6793 if not isinstance(v, class_expected):
6794 raise InvalidValueType((class_expected,))
6795 v.encode_cer(writer)
6797 if not self._bound_min <= values_count <= self._bound_max:
6798 raise BoundsError(self._bound_min, values_count, self._bound_max)
6800 for v in self._values_for_encoding():
6801 v.encode_cer(writer)
6802 write_full(writer, EOC)
6812 ordering_check=False,
6815 t, tlen, lv = tag_strip(tlv)
6816 except DecodeError as err:
6817 raise err.__class__(
6819 klass=self.__class__,
6820 decode_path=decode_path,
6825 klass=self.__class__,
6826 decode_path=decode_path,
6833 ctx_bered = ctx.get("bered", False)
6835 l, llen, v = len_decode(lv)
6836 except LenIndefForm as err:
6838 raise err.__class__(
6840 klass=self.__class__,
6841 decode_path=decode_path,
6844 l, llen, v = 0, 1, lv[1:]
6846 except DecodeError as err:
6847 raise err.__class__(
6849 klass=self.__class__,
6850 decode_path=decode_path,
6854 raise NotEnoughData(
6855 "encoded length is longer than data",
6856 klass=self.__class__,
6857 decode_path=decode_path,
6861 v, tail = v[:l], v[l:]
6863 sub_offset = offset + tlen + llen
6866 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6867 value_prev = memoryview(v[:0])
6871 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6873 sub_decode_path = decode_path + (str(_value_count),)
6875 for _decode_path, value, v_tail in spec.decode_evgen(
6879 decode_path=sub_decode_path,
6881 _ctx_immutable=False,
6883 yield _decode_path, value, v_tail
6885 _, value, v_tail = next(spec.decode_evgen(
6889 decode_path=sub_decode_path,
6891 _ctx_immutable=False,
6894 value_len = value.fulllen
6896 if value_prev.tobytes() > v[:value_len].tobytes():
6897 if ctx_bered or ctx_allow_unordered_set:
6901 "unordered " + self.asn1_type_name,
6902 klass=self.__class__,
6903 decode_path=sub_decode_path,
6906 value_prev = v[:value_len]
6909 _value.append(value)
6910 sub_offset += value_len
6913 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
6915 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
6916 klass=self.__class__,
6917 decode_path=decode_path,
6921 obj = self.__class__(
6922 value=None if evgen_mode else _value,
6924 bounds=(self._bound_min, self._bound_max),
6927 default=self.default,
6928 optional=self.optional,
6929 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6931 except BoundsError as err:
6934 klass=self.__class__,
6935 decode_path=decode_path,
6939 if v[:EOC_LEN].tobytes() != EOC:
6942 klass=self.__class__,
6943 decode_path=decode_path,
6948 obj.ber_encoded = ber_encoded
6949 yield decode_path, obj, tail
6953 pp_console_row(next(self.pps())),
6954 ", ".join(repr(v) for v in self._value),
6957 def pps(self, decode_path=()):
6960 asn1_type_name=self.asn1_type_name,
6961 obj_name=self.__class__.__name__,
6962 decode_path=decode_path,
6963 optional=self.optional,
6964 default=self == self.default,
6965 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6966 expl=None if self._expl is None else tag_decode(self._expl),
6971 expl_offset=self.expl_offset if self.expled else None,
6972 expl_tlen=self.expl_tlen if self.expled else None,
6973 expl_llen=self.expl_llen if self.expled else None,
6974 expl_vlen=self.expl_vlen if self.expled else None,
6975 expl_lenindef=self.expl_lenindef,
6976 lenindef=self.lenindef,
6977 ber_encoded=self.ber_encoded,
6980 for i, value in enumerate(self._value):
6981 yield value.pps(decode_path=decode_path + (str(i),))
6982 for pp in self.pps_lenindef(decode_path):
6986 class SetOf(SequenceOf):
6987 """``SET OF`` sequence type
6989 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
6992 tag_default = tag_encode(form=TagFormConstructed, num=17)
6993 asn1_type_name = "SET OF"
6995 def _value_sanitize(self, value):
6996 value = super(SetOf, self)._value_sanitize(value)
6997 if hasattr(value, NEXT_ATTR_NAME):
6999 "SetOf does not support iterator values, as no sense in them"
7004 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7005 return b"".join((self.tag, len_encode(len(v)), v))
7007 def _encode_cer(self, writer):
7008 write_full(writer, self.tag + LENINDEF)
7009 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7010 write_full(writer, v)
7011 write_full(writer, EOC)
7013 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7014 return super(SetOf, self)._decode(
7021 ordering_check=True,
7025 def obj_by_path(pypath): # pragma: no cover
7026 """Import object specified as string Python path
7028 Modules must be separated from classes/functions with ``:``.
7030 >>> obj_by_path("foo.bar:Baz")
7031 <class 'foo.bar.Baz'>
7032 >>> obj_by_path("foo.bar:Baz.boo")
7033 <classmethod 'foo.bar.Baz.boo'>
7035 mod, objs = pypath.rsplit(":", 1)
7036 from importlib import import_module
7037 obj = import_module(mod)
7038 for obj_name in objs.split("."):
7039 obj = getattr(obj, obj_name)
7043 def generic_decoder(): # pragma: no cover
7044 # All of this below is a big hack with self references
7045 choice = PrimitiveTypes()
7046 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7047 choice.specs["SetOf"] = SetOf(schema=choice)
7048 for i in six_xrange(31):
7049 choice.specs["SequenceOf%d" % i] = SequenceOf(
7053 choice.specs["Any"] = Any()
7055 # Class name equals to type name, to omit it from output
7056 class SEQUENCEOF(SequenceOf):
7064 with_decode_path=False,
7065 decode_path_only=(),
7068 def _pprint_pps(pps):
7070 if hasattr(pp, "_fields"):
7072 decode_path_only != () and
7073 pp.decode_path[:len(decode_path_only)] != decode_path_only
7076 if pp.asn1_type_name == Choice.asn1_type_name:
7078 pp_kwargs = pp._asdict()
7079 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7080 pp = _pp(**pp_kwargs)
7081 yield pp_console_row(
7086 with_colours=with_colours,
7087 with_decode_path=with_decode_path,
7088 decode_path_len_decrease=len(decode_path_only),
7090 for row in pp_console_blob(
7092 decode_path_len_decrease=len(decode_path_only),
7096 for row in _pprint_pps(pp):
7098 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7099 return SEQUENCEOF(), pprint_any
7102 def main(): # pragma: no cover
7104 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7105 parser.add_argument(
7109 help="Skip that number of bytes from the beginning",
7111 parser.add_argument(
7113 help="Python paths to dictionary with OIDs, comma separated",
7115 parser.add_argument(
7117 help="Python path to schema definition to use",
7119 parser.add_argument(
7120 "--defines-by-path",
7121 help="Python path to decoder's defines_by_path",
7123 parser.add_argument(
7125 action="store_true",
7126 help="Disallow BER encoding",
7128 parser.add_argument(
7129 "--print-decode-path",
7130 action="store_true",
7131 help="Print decode paths",
7133 parser.add_argument(
7134 "--decode-path-only",
7135 help="Print only specified decode path",
7137 parser.add_argument(
7139 action="store_true",
7140 help="Allow explicit tag out-of-bound",
7142 parser.add_argument(
7144 action="store_true",
7145 help="Turn on event generation mode",
7147 parser.add_argument(
7149 type=argparse.FileType("rb"),
7150 help="Path to BER/CER/DER file you want to decode",
7152 args = parser.parse_args()
7154 args.RAWFile.seek(args.skip)
7155 raw = memoryview(args.RAWFile.read())
7156 args.RAWFile.close()
7158 raw = file_mmaped(args.RAWFile)[args.skip:]
7160 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7161 if args.oids else ()
7163 from functools import partial
7165 schema = obj_by_path(args.schema)
7166 pprinter = partial(pprint, big_blobs=True)
7168 schema, pprinter = generic_decoder()
7170 "bered": not args.nobered,
7171 "allow_expl_oob": args.allow_expl_oob,
7173 if args.defines_by_path is not None:
7174 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7175 from os import environ
7179 with_colours=environ.get("NO_COLOR") is None,
7180 with_decode_path=args.print_decode_path,
7182 () if args.decode_path_only is None else
7183 tuple(args.decode_path_only.split(":"))
7187 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7188 print(pprinter(obj, decode_path=decode_path))
7190 obj, tail = schema().decode(raw, ctx=ctx)
7191 print(pprinter(obj))
7193 print("\nTrailing data: %s" % hexenc(tail))
7196 if __name__ == "__main__":