3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. In fact, it is exactly what is used
251 for all objects' ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals the
345 default one, specified in the schema.
347 Shows whether the object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 the structures necessary for decoding. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 The first one is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meets --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificates' serial numbers from it, once you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as one of the entities in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except the minimum needed for a
636 single object's storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates")
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 with the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 do not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms do not deal well with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you can be sure only that it has valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it into 1 KB chunks. BER allows splitting on various
778 levels of chunk nesting::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of those
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
906 .. autoclass:: pyderasn.Obj
914 .. autoclass:: pyderasn.Boolean
919 .. autoclass:: pyderasn.Integer
920 :members: __init__, named
924 .. autoclass:: pyderasn.BitString
925 :members: __init__, bit_len, named
929 .. autoclass:: pyderasn.OctetString
934 .. autoclass:: pyderasn.Null
939 .. autoclass:: pyderasn.ObjectIdentifier
944 .. autoclass:: pyderasn.Enumerated
948 .. autoclass:: pyderasn.CommonString
952 .. autoclass:: pyderasn.NumericString
956 .. autoclass:: pyderasn.PrintableString
957 :members: __init__, allow_asterisk, allow_ampersand
961 .. autoclass:: pyderasn.UTCTime
962 :members: __init__, todatetime
966 .. autoclass:: pyderasn.GeneralizedTime
967 :members: __init__, todatetime
974 .. autoclass:: pyderasn.Choice
975 :members: __init__, choice, value
979 .. autoclass:: pyderasn.PrimitiveTypes
983 .. autoclass:: pyderasn.Any
991 .. autoclass:: pyderasn.Sequence
996 .. autoclass:: pyderasn.Set
1001 .. autoclass:: pyderasn.SequenceOf
1006 .. autoclass:: pyderasn.SetOf
1012 .. autofunction:: pyderasn.abs_decode_path
1013 .. autofunction:: pyderasn.agg_octet_string
1014 .. autofunction:: pyderasn.colonize_hex
1015 .. autofunction:: pyderasn.encode2pass
1016 .. autofunction:: pyderasn.encode_cer
1017 .. autofunction:: pyderasn.file_mmaped
1018 .. autofunction:: pyderasn.hexenc
1019 .. autofunction:: pyderasn.hexdec
1020 .. autofunction:: pyderasn.tag_encode
1021 .. autofunction:: pyderasn.tag_decode
1022 .. autofunction:: pyderasn.tag_ctxp
1023 .. autofunction:: pyderasn.tag_ctxc
1024 .. autoclass:: pyderasn.DecodeError
1026 .. autoclass:: pyderasn.NotEnoughData
1027 .. autoclass:: pyderasn.ExceedingData
1028 .. autoclass:: pyderasn.LenIndefForm
1029 .. autoclass:: pyderasn.TagMismatch
1030 .. autoclass:: pyderasn.InvalidLength
1031 .. autoclass:: pyderasn.InvalidOID
1032 .. autoclass:: pyderasn.ObjUnknown
1033 .. autoclass:: pyderasn.ObjNotReady
1034 .. autoclass:: pyderasn.InvalidValueType
1035 .. autoclass:: pyderasn.BoundsError
1042 You can decode DER/BER files using the command line interface::
1044 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1046 If there is no schema for your file, then you can try parsing it without one,
1047 but of course IMPLICIT tags will often make that impossible. Still, the result is
1048 good enough for the certificate above::
1050 $ python -m pyderasn path/to/file
1051 0 [1,3,1604] . >: SEQUENCE OF
1052 4 [1,3,1453] . . >: SEQUENCE OF
1053 8 [0,0, 5] . . . . >: [0] ANY
1054 . . . . . A0:03:02:01:02
1055 13 [1,1, 3] . . . . >: INTEGER 61595
1056 18 [1,1, 13] . . . . >: SEQUENCE OF
1057 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1058 31 [1,1, 0] . . . . . . >: NULL
1059 33 [1,3, 274] . . . . >: SEQUENCE OF
1060 37 [1,1, 11] . . . . . . >: SET OF
1061 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1062 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1063 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1065 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1066 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1067 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1068 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1069 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1070 . . . . . . . . . 61:2E:63:6F:6D:2F
1071 1461 [1,1, 13] . . >: SEQUENCE OF
1072 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1073 1474 [1,1, 0] . . . . >: NULL
1074 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1075 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1076 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1082 If you have dictionaries with ObjectIdentifiers, like the example one
1083 from ``tests/test_crts.py``::
1086 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1087 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1089 "2.5.4.10": "id-at-organizationName",
1090 "2.5.4.11": "id-at-organizationalUnitName",
1093 then you can pass them to the pretty printer to see human-readable OIDs::
1095 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1097 37 [1,1, 11] . . . . . . >: SET OF
1098 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1099 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1100 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1101 50 [1,1, 18] . . . . . . >: SET OF
1102 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1103 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1104 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1105 70 [1,1, 18] . . . . . . >: SET OF
1106 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1107 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1108 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1114 Each decoded element has a so-called decode path: the sequence of structure
1115 names it passes through during the decode process. Each element has its own
1116 unique path inside the whole ASN.1 tree. You can print it out with the
1117 ``--print-decode-path`` option::
1119 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1120 0 [1,3,1604] Certificate SEQUENCE []
1121 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1122 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1123 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1124 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1125 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1126 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1128 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1129 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1130 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1131 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1132 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1133 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1134 . . . . . . . 13:02:45:53
1135 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1138 Now you can print only the specified tree, for example the signature algorithm::
1140 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1141 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1142 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1143 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1147 from array import array
1148 from codecs import getdecoder
1149 from codecs import getencoder
1150 from collections import namedtuple
1151 from collections import OrderedDict
1152 from copy import copy
1153 from datetime import datetime
1154 from datetime import timedelta
1155 from io import BytesIO
1156 from math import ceil
1157 from mmap import mmap
1158 from mmap import PROT_READ
1159 from operator import attrgetter
1160 from string import ascii_letters
1161 from string import digits
1162 from sys import maxsize as sys_maxsize
1163 from sys import version_info
1164 from unicodedata import category as unicat
1166 from six import add_metaclass
1167 from six import binary_type
1168 from six import byte2int
1169 from six import indexbytes
1170 from six import int2byte
1171 from six import integer_types
1172 from six import iterbytes
1173 from six import iteritems
1174 from six import itervalues
1176 from six import string_types
1177 from six import text_type
1178 from six import unichr as six_unichr
1179 from six.moves import xrange as six_xrange
1183 from termcolor import colored
1184 except ImportError: # pragma: no cover
1185 def colored(what, *args, **kwargs):
1236 "TagClassApplication",
1239 "TagClassUniversal",
1240 "TagFormConstructed",
1251 TagClassUniversal = 0
1252 TagClassApplication = 1 << 6
1253 TagClassContext = 1 << 7
1254 TagClassPrivate = 1 << 6 | 1 << 7
1255 TagFormPrimitive = 0
1256 TagFormConstructed = 1 << 5
1258 TagClassContext: "",
1259 TagClassApplication: "APPLICATION ",
1260 TagClassPrivate: "PRIVATE ",
1261 TagClassUniversal: "UNIV ",
1265 LENINDEF = b"\x80" # length indefinite mark
1266 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1267 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1268 SET01 = frozenset("01")
1269 DECIMALS = frozenset(digits)
1270 DECIMAL_SIGNS = ".,"
1271 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
def file_mmaped(fd):
    """Build a read-only, mmap-backed memoryview of a file

    :param fd: file object (it must provide ``fileno()``)
    :returns: memoryview over a read-only mapping of the whole file
    """
    whole_file = 0  # zero length asks mmap to map the complete file
    mapping = mmap(fd.fileno(), whole_file, prot=PROT_READ)
    return memoryview(mapping)
1283 if not set(value) <= DECIMALS:
1284 raise ValueError("non-pure integer")
def fractions2float(fractions_raw):
    """Turn the digits following a decimal mark into a float

    :param fractions_raw: string holding only the fractional digits; it
                          is passed through ``pureint`` before conversion
    :returns: ``float`` built from ``"0."`` followed by those digits
    """
    pureint(fractions_raw)
    as_decimal = "0." + fractions_raw
    return float(as_decimal)
1292 def get_def_by_path(defines_by_path, sub_decode_path):
1293 """Get define by decode path
1295 for path, define in defines_by_path:
1296 if len(path) != len(sub_decode_path):
1298 for p1, p2 in zip(path, sub_decode_path):
1299 if (not p1 is any) and (p1 != p2):
1305 ########################################################################
1307 ########################################################################
1309 class ASN1Error(ValueError):
1313 class DecodeError(ASN1Error):
1314 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1316 :param str msg: reason of decode failing
1317 :param klass: optional exact DecodeError inherited class (like
1318 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1319 :py:exc:`InvalidLength`)
1320 :param decode_path: tuple of strings. It contains human
1321 readable names of the fields through which
1322 decoding process has passed
1323 :param int offset: binary offset where failure happened
1325 super(DecodeError, self).__init__()
1328 self.decode_path = decode_path
1329 self.offset = offset
1334 "" if self.klass is None else self.klass.__name__,
1336 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1337 if len(self.decode_path) > 0 else ""
1339 ("(at %d)" % self.offset) if self.offset > 0 else "",
1345 return "%s(%s)" % (self.__class__.__name__, self)
1348 class NotEnoughData(DecodeError):
1352 class ExceedingData(ASN1Error):
1353 def __init__(self, nbytes):
1354 super(ExceedingData, self).__init__()
1355 self.nbytes = nbytes
1358 return "%d trailing bytes" % self.nbytes
1361 return "%s(%s)" % (self.__class__.__name__, self)
1364 class LenIndefForm(DecodeError):
1368 class TagMismatch(DecodeError):
1372 class InvalidLength(DecodeError):
1376 class InvalidOID(DecodeError):
1380 class ObjUnknown(ASN1Error):
1381 def __init__(self, name):
1382 super(ObjUnknown, self).__init__()
1386 return "object is unknown: %s" % self.name
1389 return "%s(%s)" % (self.__class__.__name__, self)
1392 class ObjNotReady(ASN1Error):
1393 def __init__(self, name):
1394 super(ObjNotReady, self).__init__()
1398 return "object is not ready: %s" % self.name
1401 return "%s(%s)" % (self.__class__.__name__, self)
1404 class InvalidValueType(ASN1Error):
1405 def __init__(self, expected_types):
1406 super(InvalidValueType, self).__init__()
1407 self.expected_types = expected_types
1410 return "invalid value type, expected: %s" % ", ".join(
1411 [repr(t) for t in self.expected_types]
1415 return "%s(%s)" % (self.__class__.__name__, self)
1418 class BoundsError(ASN1Error):
1419 def __init__(self, bound_min, value, bound_max):
1420 super(BoundsError, self).__init__()
1421 self.bound_min = bound_min
1423 self.bound_max = bound_max
1426 return "unsatisfied bounds: %s <= %s <= %s" % (
1433 return "%s(%s)" % (self.__class__.__name__, self)
1436 ########################################################################
1438 ########################################################################
1440 _hexdecoder = getdecoder("hex")
1441 _hexencoder = getencoder("hex")
1445 """Binary data to hexadecimal string convert
1447 return _hexdecoder(data)[0]
1451 """Hexadecimal string to binary data convert
1453 return _hexencoder(data)[0].decode("ascii")
1456 def int_bytes_len(num, byte_len=8):
1459 return int(ceil(float(num.bit_length()) / byte_len))
1462 def zero_ended_encode(num):
1463 octets = bytearray(int_bytes_len(num, 7))
1465 octets[i] = num & 0x7F
1469 octets[i] = 0x80 | (num & 0x7F)
1472 return bytes(octets)
1475 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1476 """Encode tag to binary form
1478 :param int num: tag's number
1479 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1480 :py:data:`pyderasn.TagClassContext`,
1481 :py:data:`pyderasn.TagClassApplication`,
1482 :py:data:`pyderasn.TagClassPrivate`)
1483 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1484 :py:data:`pyderasn.TagFormConstructed`)
1488 return int2byte(klass | form | num)
1489 # [XX|X|11111][1.......][1.......] ... [0.......]
1490 return int2byte(klass | form | 31) + zero_ended_encode(num)
1493 def tag_decode(tag):
1494 """Decode tag from binary form
1498 No validation is performed, assuming that it has already passed.
1500 It returns tuple with three integers, as
1501 :py:func:`pyderasn.tag_encode` accepts.
1503 first_octet = byte2int(tag)
1504 klass = first_octet & 0xC0
1505 form = first_octet & 0x20
1506 if first_octet & 0x1F < 0x1F:
1507 return (klass, form, first_octet & 0x1F)
1509 for octet in iterbytes(tag[1:]):
1512 return (klass, form, num)
1516 """Create CONTEXT PRIMITIVE tag
1518 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1522 """Create CONTEXT CONSTRUCTED tag
1524 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1527 def tag_strip(data):
1528 """Take off tag from the data
1530 :returns: (encoded tag, tag length, remaining data)
1533 raise NotEnoughData("no data at all")
1534 if byte2int(data) & 0x1F < 31:
1535 return data[:1], 1, data[1:]
1540 raise DecodeError("unfinished tag")
1541 if indexbytes(data, i) & 0x80 == 0:
1544 return data[:i], i, data[i:]
1550 octets = bytearray(int_bytes_len(l) + 1)
1551 octets[0] = 0x80 | (len(octets) - 1)
1552 for i in six_xrange(len(octets) - 1, 0, -1):
1553 octets[i] = l & 0xFF
1555 return bytes(octets)
1558 def len_decode(data):
1561 :returns: (decoded length, length's length, remaining data)
1562 :raises LenIndefForm: if indefinite form encoding is met
1565 raise NotEnoughData("no data at all")
1566 first_octet = byte2int(data)
1567 if first_octet & 0x80 == 0:
1568 return first_octet, 1, data[1:]
1569 octets_num = first_octet & 0x7F
1570 if octets_num + 1 > len(data):
1571 raise NotEnoughData("encoded length is longer than data")
1573 raise LenIndefForm()
1574 if byte2int(data[1:]) == 0:
1575 raise DecodeError("leading zeros")
1577 for v in iterbytes(data[1:1 + octets_num]):
1580 raise DecodeError("long form instead of short one")
1581 return l, 1 + octets_num, data[1 + octets_num:]
1584 LEN0 = len_encode(0)
1585 LEN1 = len_encode(1)
1586 LEN1K = len_encode(1000)
1590 """How many bytes length field will take
1594 if l < 256: # 1 << 8
1596 if l < 65536: # 1 << 16
1598 if l < 16777216: # 1 << 24
1600 if l < 4294967296: # 1 << 32
1602 if l < 1099511627776: # 1 << 40
1604 if l < 281474976710656: # 1 << 48
1606 if l < 72057594037927936: # 1 << 56
1608 raise OverflowError("too big length")
1611 def write_full(writer, data):
1612 """Fully write provided data
1614 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1616 BytesIO does not guarantee that the whole data will be written at
1617 once. This function writes everything provided, raising an error if
1618 ``writer`` returns None.
1620 data = memoryview(data)
1622 while written != len(data):
1623 n = writer(data[written:])
1625 raise ValueError("can not write to buf")
1629 # If it is 64-bit system, then use compact 64-bit array of unsigned
1630 # longs. Use an ordinary list with universal integers otherwise, that
1632 if sys_maxsize > 2 ** 32:
1633 def state_2pass_new():
1636 def state_2pass_new():
1640 ########################################################################
1642 ########################################################################
class AutoAddSlots(type):
    """Metaclass ensuring that every created class defines ``__slots__``

    A class body that declares its own ``__slots__`` keeps it as is; any
    other class gets an empty tuple.
    """
    def __new__(cls, name, bases, _dict):
        # setdefault() inserts the empty tuple only when the class body
        # did not declare __slots__ itself
        _dict.setdefault("__slots__", ())
        return type.__new__(cls, name, bases, _dict)
1650 BasicState = namedtuple("BasicState", (
1663 ), **NAMEDTUPLE_KWARGS)
1666 @add_metaclass(AutoAddSlots)
1668 """Common ASN.1 object class
1670 All ASN.1 types are inherited from it. It has metaclass that
1671 automatically adds ``__slots__`` to all inherited classes.
1696 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1697 self._expl = getattr(self, "expl", None) if expl is None else expl
1698 if self.tag != self.tag_default and self._expl is not None:
1699 raise ValueError("implicit and explicit tags can not be set simultaneously")
1700 if self.tag is None:
1701 self._tag_order = None
1703 tag_class, _, tag_num = tag_decode(
1704 self.tag if self._expl is None else self._expl
1706 self._tag_order = (tag_class, tag_num)
1707 if default is not None:
1709 self.optional = optional
1710 self.offset, self.llen, self.vlen = _decoded
1712 self.expl_lenindef = False
1713 self.lenindef = False
1714 self.ber_encoded = False
1717 def ready(self): # pragma: no cover
1718 """Is object ready to be encoded?
1720 raise NotImplementedError()
1722 def _assert_ready(self):
1724 raise ObjNotReady(self.__class__.__name__)
1728 """Is either object or any elements inside is BER encoded?
1730 return self.expl_lenindef or self.lenindef or self.ber_encoded
1734 """Is object decoded?
1736 return (self.llen + self.vlen) > 0
1738 def __getstate__(self): # pragma: no cover
1739 """Used for making safe to be mutable pickleable copies
1741 raise NotImplementedError()
def __setstate__(self, state):
    """Restore the common attributes from a pickled state

    :param state: state object exposing ``version`` and the fields
                  copied below
    :raises ValueError: if the state was produced by a different
                        PyDERASN version
    """
    if state.version != __version__:
        raise ValueError("data is pickled by different PyDERASN version")
    # (attribute on self, field of state), applied in this exact order
    restored = (
        ("tag", "tag"),
        ("_tag_order", "tag_order"),
        ("_expl", "expl"),
        ("default", "default"),
        ("optional", "optional"),
        ("offset", "offset"),
        ("llen", "llen"),
        ("vlen", "vlen"),
        ("expl_lenindef", "expl_lenindef"),
        ("lenindef", "lenindef"),
        ("ber_encoded", "ber_encoded"),
    )
    for attr_name, field_name in restored:
        setattr(self, attr_name, getattr(state, field_name))
1759 def tag_order(self):
1760 """Tag's (class, number) used for DER/CER sorting
1762 return self._tag_order
1765 def tag_order_cer(self):
1766 return self.tag_order
1770 """.. seealso:: :ref:`decoding`
1772 return len(self.tag)
1776 """.. seealso:: :ref:`decoding`
1778 return self.tlen + self.llen + self.vlen
def __str__(self):  # pragma: no cover
    """Return ``__bytes__()`` under Python 2, ``__unicode__()`` otherwise
    """
    if PY2:
        return self.__bytes__()
    return self.__unicode__()
1783 def __ne__(self, their):
1784 return not(self == their)
1786 def __gt__(self, their): # pragma: no cover
1787 return not(self < their)
1789 def __le__(self, their): # pragma: no cover
1790 return (self == their) or (self < their)
1792 def __ge__(self, their): # pragma: no cover
1793 return (self == their) or (self > their)
1795 def _encode(self): # pragma: no cover
1796 raise NotImplementedError()
def _encode_cer(self, writer):
    """Default CER encoding: write out whatever ``_encode()`` returns

    :param writer: write callable handed over to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
1801 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1802 yield NotImplemented
1804 def _encode1st(self, state):
1805 raise NotImplementedError()
1807 def _encode2nd(self, writer, state_iter):
1808 raise NotImplementedError()
1811 """DER encode the structure
1813 :returns: DER representation
1815 raw = self._encode()
1816 if self._expl is None:
1818 return b"".join((self._expl, len_encode(len(raw)), raw))
1820 def encode1st(self, state=None):
1821 """Do the 1st pass of 2-pass encoding
1823 :rtype: (int, array("L"))
1824 :returns: full length of encoded data and precalculated various
1828 state = state_2pass_new()
1829 if self._expl is None:
1830 return self._encode1st(state)
1832 idx = len(state) - 1
1833 vlen, _ = self._encode1st(state)
1835 fulllen = len(self._expl) + len_size(vlen) + vlen
1836 return fulllen, state
1838 def encode2nd(self, writer, state_iter):
1839 """Do the 2nd pass of 2-pass encoding
1841 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1842 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1844 if self._expl is None:
1845 self._encode2nd(writer, state_iter)
1847 write_full(writer, self._expl + len_encode(next(state_iter)))
1848 self._encode2nd(writer, state_iter)
1850 def encode_cer(self, writer):
1851 """CER encode the structure to specified writer
1853 :param writer: must comply with ``io.RawIOBase.write``
1854 behaviour. It takes slice to be written and
1855 returns number of bytes processed. If it returns
1856 None, then exception will be raised
1858 if self._expl is not None:
1859 write_full(writer, self._expl + LENINDEF)
1860 if getattr(self, "der_forced", False):
1861 write_full(writer, self._encode())
1863 self._encode_cer(writer)
1864 if self._expl is not None:
1865 write_full(writer, EOC)
1867 def hexencode(self):
1868 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1870 return hexenc(self.encode())
1880 _ctx_immutable=True,
1884 :param data: either binary or memoryview
1885 :param int offset: initial data's offset
1886 :param bool leavemm: do we need to leave memoryview of remaining
1887 data as is, or convert it to bytes otherwise
1888 :param decode_path: current decode path (tuples of strings,
1889 possibly with DecodePathDefBy) which will be
1890 the root for all underlying objects
1891 :param ctx: optional :ref:`context <ctx>` governing decoding process
1892 :param bool tag_only: decode only the tag, without length and
1893 contents (used only in Choice and Set
1894 structures, trying to determine if tag satisfies
1896 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1898 :returns: (Obj, remaining data)
1900 .. seealso:: :ref:`decoding`
1902 result = next(self.decode_evgen(
1914 _, obj, tail = result
1925 _ctx_immutable=True,
1928 """Decode with evgen mode on
1930 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1931 it returns the generator producing ``(decode_path, obj, tail)``
1933 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1937 elif _ctx_immutable:
1939 tlv = memoryview(data)
1942 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1945 if self._expl is None:
1946 for result in self._decode(
1949 decode_path=decode_path,
1952 evgen_mode=_evgen_mode,
1957 _decode_path, obj, tail = result
1958 if not _decode_path is decode_path:
1962 t, tlen, lv = tag_strip(tlv)
1963 except DecodeError as err:
1964 raise err.__class__(
1966 klass=self.__class__,
1967 decode_path=decode_path,
1972 klass=self.__class__,
1973 decode_path=decode_path,
1977 l, llen, v = len_decode(lv)
1978 except LenIndefForm as err:
1979 if not ctx.get("bered", False):
1980 raise err.__class__(
1982 klass=self.__class__,
1983 decode_path=decode_path,
1987 offset += tlen + llen
1988 for result in self._decode(
1991 decode_path=decode_path,
1994 evgen_mode=_evgen_mode,
1996 if tag_only: # pragma: no cover
1999 _decode_path, obj, tail = result
2000 if not _decode_path is decode_path:
2002 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2003 if eoc_expected.tobytes() != EOC:
2006 klass=self.__class__,
2007 decode_path=decode_path,
2011 obj.expl_lenindef = True
2012 except DecodeError as err:
2013 raise err.__class__(
2015 klass=self.__class__,
2016 decode_path=decode_path,
2021 raise NotEnoughData(
2022 "encoded length is longer than data",
2023 klass=self.__class__,
2024 decode_path=decode_path,
2027 for result in self._decode(
2029 offset=offset + tlen + llen,
2030 decode_path=decode_path,
2033 evgen_mode=_evgen_mode,
2035 if tag_only: # pragma: no cover
2038 _decode_path, obj, tail = result
2039 if not _decode_path is decode_path:
2041 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2043 "explicit tag out-of-bound, longer than data",
2044 klass=self.__class__,
2045 decode_path=decode_path,
2048 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2050 def decod(self, data, offset=0, decode_path=(), ctx=None):
2051 """Decode the data, check that tail is empty
2053 :raises ExceedingData: if tail is not empty
2055 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2056 (decode without tail) that also checks that there is no
2059 obj, tail = self.decode(
2062 decode_path=decode_path,
2067 raise ExceedingData(len(tail))
2070 def hexdecode(self, data, *args, **kwargs):
2071 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2073 return self.decode(hexdec(data), *args, **kwargs)
2075 def hexdecod(self, data, *args, **kwargs):
2076 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2078 return self.decod(hexdec(data), *args, **kwargs)
2082 """.. seealso:: :ref:`decoding`
2084 return self._expl is not None
2088 """.. seealso:: :ref:`decoding`
2093 def expl_tlen(self):
2094 """.. seealso:: :ref:`decoding`
2096 return len(self._expl)
2099 def expl_llen(self):
2100 """.. seealso:: :ref:`decoding`
2102 if self.expl_lenindef:
2104 return len(len_encode(self.tlvlen))
2107 def expl_offset(self):
2108 """.. seealso:: :ref:`decoding`
2110 return self.offset - self.expl_tlen - self.expl_llen
2113 def expl_vlen(self):
2114 """.. seealso:: :ref:`decoding`
2119 def expl_tlvlen(self):
2120 """.. seealso:: :ref:`decoding`
2122 return self.expl_tlen + self.expl_llen + self.expl_vlen
2125 def fulloffset(self):
2126 """.. seealso:: :ref:`decoding`
2128 return self.expl_offset if self.expled else self.offset
2132 """.. seealso:: :ref:`decoding`
2134 return self.expl_tlvlen if self.expled else self.tlvlen
2136 def pps_lenindef(self, decode_path):
2137 if self.lenindef and not (
2138 getattr(self, "defined", None) is not None and
2139 self.defined[1].lenindef
2142 asn1_type_name="EOC",
2144 decode_path=decode_path,
2146 self.offset + self.tlvlen -
2147 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2155 if self.expl_lenindef:
2157 asn1_type_name="EOC",
2158 obj_name="EXPLICIT",
2159 decode_path=decode_path,
2160 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2169 def encode_cer(obj):
2170 """Encode to CER in memory buffer
2172 :returns bytes: memory buffer contents
2175 obj.encode_cer(buf.write)
2176 return buf.getvalue()
2179 def encode2pass(obj):
2180 """Encode (2-pass mode) to DER in memory buffer
2182 :returns bytes: memory buffer contents
2185 _, state = obj.encode1st()
2186 obj.encode2nd(buf.write, iter(state))
2187 return buf.getvalue()
2190 class DecodePathDefBy(object):
2191 """DEFINED BY representation inside decode path
2193 __slots__ = ("defined_by",)
2195 def __init__(self, defined_by):
2196 self.defined_by = defined_by
2198 def __ne__(self, their):
2199 return not(self == their)
2201 def __eq__(self, their):
2202 if not isinstance(their, self.__class__):
2204 return self.defined_by == their.defined_by
2207 return "DEFINED BY " + str(self.defined_by)
2210 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2213 ########################################################################
2215 ########################################################################
2217 PP = namedtuple("PP", (
2240 ), **NAMEDTUPLE_KWARGS)
2245 asn1_type_name="unknown",
2262 expl_lenindef=False,
2293 def _colourize(what, colour, with_colours, attrs=("bold",)):
2294 return colored(what, colour, attrs=attrs) if with_colours else what
def colonize_hex(hexed):
    """Insert a colon after every two characters of a hexadecimal string
    """
    pairs = []
    for start in six_xrange(0, len(hexed), 2):
        pairs.append(hexed[start:start + 2])
    return ":".join(pairs)
2309 with_decode_path=False,
2310 decode_path_len_decrease=0,
2317 " " if pp.expl_offset is None else
2318 ("-%d" % (pp.offset - pp.expl_offset))
2320 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2322 col = _colourize(col, "red", with_colours, ())
2323 col += _colourize("B", "red", with_colours) if pp.bered else " "
2325 col = "[%d,%d,%4d]%s" % (
2329 LENINDEF_PP_CHAR if pp.lenindef else " "
2331 col = _colourize(col, "green", with_colours, ())
2333 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2334 if decode_path_len > 0:
2335 cols.append(" ." * decode_path_len)
2336 ent = pp.decode_path[-1]
2337 if isinstance(ent, DecodePathDefBy):
2338 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2339 value = str(ent.defined_by)
2342 len(oid_maps) > 0 and
2343 ent.defined_by.asn1_type_name ==
2344 ObjectIdentifier.asn1_type_name
2346 for oid_map in oid_maps:
2347 oid_name = oid_map.get(value)
2348 if oid_name is not None:
2349 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2351 if oid_name is None:
2352 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2354 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2355 if pp.expl is not None:
2356 klass, _, num = pp.expl
2357 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2358 cols.append(_colourize(col, "blue", with_colours))
2359 if pp.impl is not None:
2360 klass, _, num = pp.impl
2361 col = "[%s%d]" % (TagClassReprs[klass], num)
2362 cols.append(_colourize(col, "blue", with_colours))
2363 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2364 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2366 cols.append(_colourize("BER", "red", with_colours))
2367 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2368 if pp.value is not None:
2370 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2372 len(oid_maps) > 0 and
2373 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2375 for oid_map in oid_maps:
2376 oid_name = oid_map.get(value)
2377 if oid_name is not None:
2378 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2380 if pp.asn1_type_name == Integer.asn1_type_name:
2381 hex_repr = hex(int(pp.obj._value))[2:].upper()
2382 if len(hex_repr) % 2 != 0:
2383 hex_repr = "0" + hex_repr
2384 cols.append(_colourize(
2385 "(%s)" % colonize_hex(hex_repr),
2390 if pp.blob.__class__ == binary_type:
2391 cols.append(hexenc(pp.blob))
2392 elif pp.blob.__class__ == tuple:
2393 cols.append(", ".join(pp.blob))
2395 cols.append(_colourize("OPTIONAL", "red", with_colours))
2397 cols.append(_colourize("DEFAULT", "red", with_colours))
2398 if with_decode_path:
2399 cols.append(_colourize(
2400 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2404 return " ".join(cols)
def pp_console_blob(pp, decode_path_len_decrease=0):
    """Yield console rows showing the blob attached to a pretty-print entry

    :param pp: entry providing ``decode_path`` and ``blob`` attributes
    :param int decode_path_len_decrease: number of leading decode path
                                         elements not counted for the
                                         indentation
    :returns: generator of strings; nothing is produced when the blob
              is neither binary nor a tuple
    """
    # Blank gutter as wide as the template of the leading columns
    gutter = " " * len("XXXXXYYZZ [X,X,XXXX]Z")
    depth = len(pp.decode_path) - decode_path_len_decrease
    prefix = [gutter]
    if depth > 0:
        prefix.append(" ." * (depth + 1))
    blob_kind = pp.blob.__class__
    if blob_kind == binary_type:
        hexed = hexenc(pp.blob).upper()
        # 32 hexadecimal characters per emitted row
        for start in six_xrange(0, len(hexed), 32):
            yield " ".join(prefix + [colonize_hex(hexed[start:start + 32])])
    elif blob_kind == tuple:
        yield " ".join(prefix + [", ".join(pp.blob)])
2426 with_decode_path=False,
2427 decode_path_only=(),
2430 """Pretty print object
2432 :param Obj obj: object you want to pretty print
2433 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2434 Its human readable form is printed when OID is met
2435 :param big_blobs: if large binary objects are met (like OctetString
2436 values), do we need to print them too, on separate
2438 :param with_colours: colourize output, if ``termcolor`` library
2440 :param with_decode_path: print decode path
2441 :param decode_path_only: print only that specified decode path
2443 def _pprint_pps(pps):
2445 if hasattr(pp, "_fields"):
2447 decode_path_only != () and
2449 str(p) for p in pp.decode_path[:len(decode_path_only)]
2450 ) != decode_path_only
2454 yield pp_console_row(
2459 with_colours=with_colours,
2460 with_decode_path=with_decode_path,
2461 decode_path_len_decrease=len(decode_path_only),
2463 for row in pp_console_blob(
2465 decode_path_len_decrease=len(decode_path_only),
2469 yield pp_console_row(
2474 with_colours=with_colours,
2475 with_decode_path=with_decode_path,
2476 decode_path_len_decrease=len(decode_path_only),
2479 for row in _pprint_pps(pp):
2481 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2484 ########################################################################
2485 # ASN.1 primitive types
2486 ########################################################################
2488 BooleanState = namedtuple(
2490 BasicState._fields + ("value",),
2496 """``BOOLEAN`` boolean type
2498 >>> b = Boolean(True)
2500 >>> b == Boolean(True)
2506 tag_default = tag_encode(1)
2507 asn1_type_name = "BOOLEAN"
2519 :param value: set the value. Either boolean type, or
2520 :py:class:`pyderasn.Boolean` object
2521 :param bytes impl: override default tag with ``IMPLICIT`` one
2522 :param bytes expl: override default tag with ``EXPLICIT`` one
2523 :param default: set default value. Type same as in ``value``
2524 :param bool optional: is object ``OPTIONAL`` in sequence
2526 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2527 self._value = None if value is None else self._value_sanitize(value)
2528 if default is not None:
2529 default = self._value_sanitize(default)
2530 self.default = self.__class__(
2536 self._value = default
2538 def _value_sanitize(self, value):
2539 if value.__class__ == bool:
2541 if issubclass(value.__class__, Boolean):
2543 raise InvalidValueType((self.__class__, bool))
2547 return self._value is not None
2549 def __getstate__(self):
2550 return BooleanState(
2566 def __setstate__(self, state):
2567 super(Boolean, self).__setstate__(state)
2568 self._value = state.value
2570 def __nonzero__(self):
2571 self._assert_ready()
2575 self._assert_ready()
2578 def __eq__(self, their):
2579 if their.__class__ == bool:
2580 return self._value == their
2581 if not issubclass(their.__class__, Boolean):
2584 self._value == their._value and
2585 self.tag == their.tag and
2586 self._expl == their._expl
2597 return self.__class__(
2599 impl=self.tag if impl is None else impl,
2600 expl=self._expl if expl is None else expl,
2601 default=self.default if default is None else default,
2602 optional=self.optional if optional is None else optional,
2606 self._assert_ready()
2607 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2609 def _encode1st(self, state):
2610 return len(self.tag) + 2, state
2612 def _encode2nd(self, writer, state_iter):
2613 self._assert_ready()
2614 write_full(writer, self._encode())
2616 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2618 t, _, lv = tag_strip(tlv)
2619 except DecodeError as err:
2620 raise err.__class__(
2622 klass=self.__class__,
2623 decode_path=decode_path,
2628 klass=self.__class__,
2629 decode_path=decode_path,
2636 l, _, v = len_decode(lv)
2637 except DecodeError as err:
2638 raise err.__class__(
2640 klass=self.__class__,
2641 decode_path=decode_path,
2645 raise InvalidLength(
2646 "Boolean's length must be equal to 1",
2647 klass=self.__class__,
2648 decode_path=decode_path,
2652 raise NotEnoughData(
2653 "encoded length is longer than data",
2654 klass=self.__class__,
2655 decode_path=decode_path,
2658 first_octet = byte2int(v)
2660 if first_octet == 0:
2662 elif first_octet == 0xFF:
2664 elif ctx.get("bered", False):
2669 "unacceptable Boolean value",
2670 klass=self.__class__,
2671 decode_path=decode_path,
2674 obj = self.__class__(
2678 default=self.default,
2679 optional=self.optional,
2680 _decoded=(offset, 1, 1),
2682 obj.ber_encoded = ber_encoded
2683 yield decode_path, obj, v[1:]
2686 return pp_console_row(next(self.pps()))
2688 def pps(self, decode_path=()):
2691 asn1_type_name=self.asn1_type_name,
2692 obj_name=self.__class__.__name__,
2693 decode_path=decode_path,
2694 value=str(self._value) if self.ready else None,
2695 optional=self.optional,
2696 default=self == self.default,
2697 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2698 expl=None if self._expl is None else tag_decode(self._expl),
2703 expl_offset=self.expl_offset if self.expled else None,
2704 expl_tlen=self.expl_tlen if self.expled else None,
2705 expl_llen=self.expl_llen if self.expled else None,
2706 expl_vlen=self.expl_vlen if self.expled else None,
2707 expl_lenindef=self.expl_lenindef,
2708 ber_encoded=self.ber_encoded,
2711 for pp in self.pps_lenindef(decode_path):
2715 IntegerState = namedtuple(
2717 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2723 """``INTEGER`` integer type
2725 >>> b = Integer(-123)
2727 >>> b == Integer(-123)
2732 >>> Integer(2, bounds=(1, 3))
2734 >>> Integer(5, bounds=(1, 3))
2735 Traceback (most recent call last):
2736 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2740 class Version(Integer):
2747 >>> v = Version("v1")
2754 {'v3': 2, 'v1': 0, 'v2': 1}
2756 __slots__ = ("specs", "_bound_min", "_bound_max")
2757 tag_default = tag_encode(2)
2758 asn1_type_name = "INTEGER"
2772 :param value: set the value. Either integer type, named value
2773 (if ``schema`` is specified in the class), or
2774 :py:class:`pyderasn.Integer` object
2775 :param bounds: set ``(MIN, MAX)`` value constraint.
2776 (-inf, +inf) by default
2777 :param bytes impl: override default tag with ``IMPLICIT`` one
2778 :param bytes expl: override default tag with ``EXPLICIT`` one
2779 :param default: set default value. Type same as in ``value``
2780 :param bool optional: is object ``OPTIONAL`` in sequence
2782 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2784 specs = getattr(self, "schema", {}) if _specs is None else _specs
2785 self.specs = specs if specs.__class__ == dict else dict(specs)
2786 self._bound_min, self._bound_max = getattr(
2789 (float("-inf"), float("+inf")),
2790 ) if bounds is None else bounds
2791 if value is not None:
2792 self._value = self._value_sanitize(value)
2793 if default is not None:
2794 default = self._value_sanitize(default)
2795 self.default = self.__class__(
2801 if self._value is None:
2802 self._value = default
2804 def _value_sanitize(self, value):
2805 if isinstance(value, integer_types):
2807 elif issubclass(value.__class__, Integer):
2808 value = value._value
2809 elif value.__class__ == str:
2810 value = self.specs.get(value)
2812 raise ObjUnknown("integer value: %s" % value)
2814 raise InvalidValueType((self.__class__, int, str))
2815 if not self._bound_min <= value <= self._bound_max:
2816 raise BoundsError(self._bound_min, value, self._bound_max)
2821 return self._value is not None
2823 def __getstate__(self):
2824 return IntegerState(
2843 def __setstate__(self, state):
2844 super(Integer, self).__setstate__(state)
2845 self.specs = state.specs
2846 self._value = state.value
2847 self._bound_min = state.bound_min
2848 self._bound_max = state.bound_max
2851 self._assert_ready()
2852 return int(self._value)
2855 self._assert_ready()
2856 return hash(b"".join((
2858 bytes(self._expl or b""),
2859 str(self._value).encode("ascii"),
2862 def __eq__(self, their):
2863 if isinstance(their, integer_types):
2864 return self._value == their
2865 if not issubclass(their.__class__, Integer):
2868 self._value == their._value and
2869 self.tag == their.tag and
2870 self._expl == their._expl
2873 def __lt__(self, their):
2874 return self._value < their._value
2878 """Return named representation (if exists) of the value
2880 for name, value in iteritems(self.specs):
2881 if value == self._value:
2894 return self.__class__(
2897 (self._bound_min, self._bound_max)
2898 if bounds is None else bounds
2900 impl=self.tag if impl is None else impl,
2901 expl=self._expl if expl is None else expl,
2902 default=self.default if default is None else default,
2903 optional=self.optional if optional is None else optional,
2907 def _encode_payload(self):
2908 self._assert_ready()
2912 octets = bytearray([0])
2916 octets = bytearray()
2918 octets.append((value & 0xFF) ^ 0xFF)
2920 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2923 octets = bytearray()
2925 octets.append(value & 0xFF)
2927 if octets[-1] & 0x80 > 0:
2930 octets = bytes(octets)
2932 bytes_len = ceil(value.bit_length() / 8) or 1
2935 octets = value.to_bytes(
2940 except OverflowError:
2945 return b"".join((self.tag, len_encode(len(octets)), octets))
2948 octets = self._encode_payload()
2949 return b"".join((self.tag, len_encode(len(octets)), octets))
2951 def _encode1st(self, state):
2952 l = len(self._encode_payload())
2953 return len(self.tag) + len_size(l) + l, state
2955 def _encode2nd(self, writer, state_iter):
2956 write_full(writer, self._encode())
2958 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2960 t, _, lv = tag_strip(tlv)
2961 except DecodeError as err:
2962 raise err.__class__(
2964 klass=self.__class__,
2965 decode_path=decode_path,
2970 klass=self.__class__,
2971 decode_path=decode_path,
2978 l, llen, v = len_decode(lv)
2979 except DecodeError as err:
2980 raise err.__class__(
2982 klass=self.__class__,
2983 decode_path=decode_path,
2987 raise NotEnoughData(
2988 "encoded length is longer than data",
2989 klass=self.__class__,
2990 decode_path=decode_path,
2994 raise NotEnoughData(
2996 klass=self.__class__,
2997 decode_path=decode_path,
3000 v, tail = v[:l], v[l:]
3001 first_octet = byte2int(v)
3003 second_octet = byte2int(v[1:])
3005 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3006 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3009 "non normalized integer",
3010 klass=self.__class__,
3011 decode_path=decode_path,
3016 if first_octet & 0x80 > 0:
3017 octets = bytearray()
3018 for octet in bytearray(v):
3019 octets.append(octet ^ 0xFF)
3020 for octet in octets:
3021 value = (value << 8) | octet
3025 for octet in bytearray(v):
3026 value = (value << 8) | octet
3028 value = int.from_bytes(v, byteorder="big", signed=True)
3030 obj = self.__class__(
3032 bounds=(self._bound_min, self._bound_max),
3035 default=self.default,
3036 optional=self.optional,
3038 _decoded=(offset, llen, l),
3040 except BoundsError as err:
3043 klass=self.__class__,
3044 decode_path=decode_path,
3047 yield decode_path, obj, tail
3050 return pp_console_row(next(self.pps()))
3052 def pps(self, decode_path=()):
3055 asn1_type_name=self.asn1_type_name,
3056 obj_name=self.__class__.__name__,
3057 decode_path=decode_path,
3058 value=(self.named or str(self._value)) if self.ready else None,
3059 optional=self.optional,
3060 default=self == self.default,
3061 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3062 expl=None if self._expl is None else tag_decode(self._expl),
3067 expl_offset=self.expl_offset if self.expled else None,
3068 expl_tlen=self.expl_tlen if self.expled else None,
3069 expl_llen=self.expl_llen if self.expled else None,
3070 expl_vlen=self.expl_vlen if self.expled else None,
3071 expl_lenindef=self.expl_lenindef,
3074 for pp in self.pps_lenindef(decode_path):
3078 BitStringState = namedtuple(
3080 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3085 class BitString(Obj):
3086 """``BIT STRING`` bit string type
3088 >>> BitString(b"hello world")
3089 BIT STRING 88 bits 68656c6c6f20776f726c64
3092 >>> b == b"hello world"
3097 >>> BitString("'0A3B5F291CD'H")
3098 BIT STRING 44 bits 0a3b5f291cd0
3099 >>> b = BitString("'010110000000'B")
3100 BIT STRING 12 bits 5800
3103 >>> b[0], b[1], b[2], b[3]
3104 (False, True, False, True)
3108 [False, True, False, True, True, False, False, False, False, False, False, False]
3112 class KeyUsage(BitString):
3114 ("digitalSignature", 0),
3115 ("nonRepudiation", 1),
3116 ("keyEncipherment", 2),
3119 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3120 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3122 ['nonRepudiation', 'keyEncipherment']
3124 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3128 Pay attention that BIT STRING can be encoded both in primitive
3129 and constructed forms. Decoder always checks the constructed form tag
3130 in addition to the specified primitive one. If BER decoding is
3131 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3132 of DER restrictions.
3134 __slots__ = ("tag_constructed", "specs", "defined")
3135 tag_default = tag_encode(3)
3136 asn1_type_name = "BIT STRING"
3149 :param value: set the value. Either binary type, tuple of named
3150 values (if ``schema`` is specified in the class),
3151 string in ``'XXX...'B`` or ``'XXX...'H`` form, or
3152 :py:class:`pyderasn.BitString` object
3153 :param bytes impl: override default tag with ``IMPLICIT`` one
3154 :param bytes expl: override default tag with ``EXPLICIT`` one
3155 :param default: set default value. Type same as in ``value``
3156 :param bool optional: is object ``OPTIONAL`` in sequence
3158 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3159 specs = getattr(self, "schema", {}) if _specs is None else _specs
3160 self.specs = specs if specs.__class__ == dict else dict(specs)
3161 self._value = None if value is None else self._value_sanitize(value)
3162 if default is not None:
3163 default = self._value_sanitize(default)
3164 self.default = self.__class__(
3170 self._value = default
3172 tag_klass, _, tag_num = tag_decode(self.tag)
3173 self.tag_constructed = tag_encode(
3175 form=TagFormConstructed,
3179 def _bits2octets(self, bits):
3180 if len(self.specs) > 0:
3181 bits = bits.rstrip("0")
3183 bits += "0" * ((8 - (bit_len % 8)) % 8)
3184 octets = bytearray(len(bits) // 8)
3185 for i in six_xrange(len(octets)):
3186 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3187 return bit_len, bytes(octets)
3189 def _value_sanitize(self, value):
3190 if isinstance(value, (string_types, binary_type)):
3192 isinstance(value, string_types) and
3193 value.startswith("'")
3195 if value.endswith("'B"):
3197 if not frozenset(value) <= SET01:
3198 raise ValueError("B's coding contains unacceptable chars")
3199 return self._bits2octets(value)
3200 if value.endswith("'H"):
3204 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3206 if value.__class__ == binary_type:
3207 return (len(value) * 8, value)
3208 raise InvalidValueType((self.__class__, string_types, binary_type))
3209 if value.__class__ == tuple:
3212 isinstance(value[0], integer_types) and
3213 value[1].__class__ == binary_type
3218 bit = self.specs.get(name)
3220 raise ObjUnknown("BitString value: %s" % name)
3223 return self._bits2octets("")
3224 bits = frozenset(bits)
3225 return self._bits2octets("".join(
3226 ("1" if bit in bits else "0")
3227 for bit in six_xrange(max(bits) + 1)
3229 if issubclass(value.__class__, BitString):
3231 raise InvalidValueType((self.__class__, binary_type, string_types))
3235 return self._value is not None
3237 def __getstate__(self):
3238 return BitStringState(
3253 self.tag_constructed,
3257 def __setstate__(self, state):
3258 super(BitString, self).__setstate__(state)
3259 self.specs = state.specs
3260 self._value = state.value
3261 self.tag_constructed = state.tag_constructed
3262 self.defined = state.defined
3265 self._assert_ready()
3266 for i in six_xrange(self._value[0]):
3271 """Returns number of bits in the string
3273 self._assert_ready()
3274 return self._value[0]
3276 def __bytes__(self):
3277 self._assert_ready()
3278 return self._value[1]
3280 def __eq__(self, their):
3281 if their.__class__ == bytes:
3282 return self._value[1] == their
3283 if not issubclass(their.__class__, BitString):
3286 self._value == their._value and
3287 self.tag == their.tag and
3288 self._expl == their._expl
3293 """Named representation (if exists) of the bits
3295 :returns: [str(name), ...]
3297 return [name for name, bit in iteritems(self.specs) if self[bit]]
3307 return self.__class__(
3309 impl=self.tag if impl is None else impl,
3310 expl=self._expl if expl is None else expl,
3311 default=self.default if default is None else default,
3312 optional=self.optional if optional is None else optional,
3316 def __getitem__(self, key):
3317 if key.__class__ == int:
3318 bit_len, octets = self._value
3322 byte2int(memoryview(octets)[key // 8:]) >>
3325 if isinstance(key, string_types):
3326 value = self.specs.get(key)
3328 raise ObjUnknown("BitString value: %s" % key)
3330 raise InvalidValueType((int, str))
3333 self._assert_ready()
3334 bit_len, octets = self._value
3337 len_encode(len(octets) + 1),
3338 int2byte((8 - bit_len % 8) % 8),
3342 def _encode1st(self, state):
3343 self._assert_ready()
3344 _, octets = self._value
3346 return len(self.tag) + len_size(l) + l, state
3348 def _encode2nd(self, writer, state_iter):
3349 bit_len, octets = self._value
3350 write_full(writer, b"".join((
3352 len_encode(len(octets) + 1),
3353 int2byte((8 - bit_len % 8) % 8),
3355 write_full(writer, octets)
3357 def _encode_cer(self, writer):
3358 bit_len, octets = self._value
3359 if len(octets) + 1 <= 1000:
3360 write_full(writer, self._encode())
3362 write_full(writer, self.tag_constructed)
3363 write_full(writer, LENINDEF)
3364 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3365 write_full(writer, b"".join((
3366 BitString.tag_default,
3369 octets[offset:offset + 999],
3371 tail = octets[offset+999:]
3373 tail = int2byte((8 - bit_len % 8) % 8) + tail
3374 write_full(writer, b"".join((
3375 BitString.tag_default,
3376 len_encode(len(tail)),
3379 write_full(writer, EOC)
3381 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3383 t, tlen, lv = tag_strip(tlv)
3384 except DecodeError as err:
3385 raise err.__class__(
3387 klass=self.__class__,
3388 decode_path=decode_path,
3392 if tag_only: # pragma: no cover
3396 l, llen, v = len_decode(lv)
3397 except DecodeError as err:
3398 raise err.__class__(
3400 klass=self.__class__,
3401 decode_path=decode_path,
3405 raise NotEnoughData(
3406 "encoded length is longer than data",
3407 klass=self.__class__,
3408 decode_path=decode_path,
3412 raise NotEnoughData(
3414 klass=self.__class__,
3415 decode_path=decode_path,
3418 pad_size = byte2int(v)
3419 if l == 1 and pad_size != 0:
3421 "invalid empty value",
3422 klass=self.__class__,
3423 decode_path=decode_path,
3429 klass=self.__class__,
3430 decode_path=decode_path,
3433 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3436 klass=self.__class__,
3437 decode_path=decode_path,
3440 v, tail = v[:l], v[l:]
3441 bit_len = (len(v) - 1) * 8 - pad_size
3442 obj = self.__class__(
3443 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3446 default=self.default,
3447 optional=self.optional,
3449 _decoded=(offset, llen, l),
3452 obj._value = (bit_len, None)
3453 yield decode_path, obj, tail
3455 if t != self.tag_constructed:
3457 klass=self.__class__,
3458 decode_path=decode_path,
3461 if not ctx.get("bered", False):
3463 "unallowed BER constructed encoding",
3464 klass=self.__class__,
3465 decode_path=decode_path,
3468 if tag_only: # pragma: no cover
3473 l, llen, v = len_decode(lv)
3474 except LenIndefForm:
3475 llen, l, v = 1, 0, lv[1:]
3477 except DecodeError as err:
3478 raise err.__class__(
3480 klass=self.__class__,
3481 decode_path=decode_path,
3485 raise NotEnoughData(
3486 "encoded length is longer than data",
3487 klass=self.__class__,
3488 decode_path=decode_path,
3491 if not lenindef and l == 0:
3492 raise NotEnoughData(
3494 klass=self.__class__,
3495 decode_path=decode_path,
3499 sub_offset = offset + tlen + llen
3503 if v[:EOC_LEN].tobytes() == EOC:
3510 "chunk out of bounds",
3511 klass=self.__class__,
3512 decode_path=decode_path + (str(len(chunks) - 1),),
3513 offset=chunks[-1].offset,
3515 sub_decode_path = decode_path + (str(len(chunks)),)
3518 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3521 decode_path=sub_decode_path,
3524 _ctx_immutable=False,
3526 yield _decode_path, chunk, v_tail
3528 _, chunk, v_tail = next(BitString().decode_evgen(
3531 decode_path=sub_decode_path,
3534 _ctx_immutable=False,
3539 "expected BitString encoded chunk",
3540 klass=self.__class__,
3541 decode_path=sub_decode_path,
3544 chunks.append(chunk)
3545 sub_offset += chunk.tlvlen
3546 vlen += chunk.tlvlen
3548 if len(chunks) == 0:
3551 klass=self.__class__,
3552 decode_path=decode_path,
3557 for chunk_i, chunk in enumerate(chunks[:-1]):
3558 if chunk.bit_len % 8 != 0:
3560 "BitString chunk is not multiple of 8 bits",
3561 klass=self.__class__,
3562 decode_path=decode_path + (str(chunk_i),),
3563 offset=chunk.offset,
3566 values.append(bytes(chunk))
3567 bit_len += chunk.bit_len
3568 chunk_last = chunks[-1]
3570 values.append(bytes(chunk_last))
3571 bit_len += chunk_last.bit_len
3572 obj = self.__class__(
3573 value=None if evgen_mode else (bit_len, b"".join(values)),
3576 default=self.default,
3577 optional=self.optional,
3579 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3582 obj._value = (bit_len, None)
3583 obj.lenindef = lenindef
3584 obj.ber_encoded = True
3585 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3588 return pp_console_row(next(self.pps()))
3590 def pps(self, decode_path=()):
3594 bit_len, blob = self._value
3595 value = "%d bits" % bit_len
3596 if len(self.specs) > 0 and blob is not None:
3597 blob = tuple(self.named)
3600 asn1_type_name=self.asn1_type_name,
3601 obj_name=self.__class__.__name__,
3602 decode_path=decode_path,
3605 optional=self.optional,
3606 default=self == self.default,
3607 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3608 expl=None if self._expl is None else tag_decode(self._expl),
3613 expl_offset=self.expl_offset if self.expled else None,
3614 expl_tlen=self.expl_tlen if self.expled else None,
3615 expl_llen=self.expl_llen if self.expled else None,
3616 expl_vlen=self.expl_vlen if self.expled else None,
3617 expl_lenindef=self.expl_lenindef,
3618 lenindef=self.lenindef,
3619 ber_encoded=self.ber_encoded,
3622 defined_by, defined = self.defined or (None, None)
3623 if defined_by is not None:
3625 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3627 for pp in self.pps_lenindef(decode_path):
3631 OctetStringState = namedtuple(
3633 BasicState._fields + (
3644 class OctetString(Obj):
3645 """``OCTET STRING`` binary string type
3647 >>> s = OctetString(b"hello world")
3648 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3649 >>> s == OctetString(b"hello world")
3654 >>> OctetString(b"hello", bounds=(4, 4))
3655 Traceback (most recent call last):
3656 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3657 >>> OctetString(b"hell", bounds=(4, 4))
3658 OCTET STRING 4 bytes 68656c6c
3660 Memoryviews can be used as values. If memoryview is made on
3661 mmap-ed file, then it does not take storage inside OctetString
3662 itself. In CER encoding mode it will be streamed to the specified
3663 writer, copying 1 KB chunks.
3665 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3666 tag_default = tag_encode(4)
3667 asn1_type_name = "OCTET STRING"
3668 evgen_mode_skip_value = True
3682 :param value: set the value. Either binary type, or
3683 :py:class:`pyderasn.OctetString` object
3684 :param bounds: set ``(MIN, MAX)`` value size constraint.
3685 (-inf, +inf) by default
3686 :param bytes impl: override default tag with ``IMPLICIT`` one
3687 :param bytes expl: override default tag with ``EXPLICIT`` one
3688 :param default: set default value. Type same as in ``value``
3689 :param bool optional: is object ``OPTIONAL`` in sequence
3691 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3693 self._bound_min, self._bound_max = getattr(
3697 ) if bounds is None else bounds
3698 if value is not None:
3699 self._value = self._value_sanitize(value)
3700 if default is not None:
3701 default = self._value_sanitize(default)
3702 self.default = self.__class__(
3707 if self._value is None:
3708 self._value = default
3710 tag_klass, _, tag_num = tag_decode(self.tag)
3711 self.tag_constructed = tag_encode(
3713 form=TagFormConstructed,
3717 def _value_sanitize(self, value):
3718 if value.__class__ == binary_type or value.__class__ == memoryview:
3720 elif issubclass(value.__class__, OctetString):
3721 value = value._value
3723 raise InvalidValueType((self.__class__, bytes, memoryview))
3724 if not self._bound_min <= len(value) <= self._bound_max:
3725 raise BoundsError(self._bound_min, len(value), self._bound_max)
3730 return self._value is not None
3732 def __getstate__(self):
3733 return OctetStringState(
3749 self.tag_constructed,
3753 def __setstate__(self, state):
3754 super(OctetString, self).__setstate__(state)
3755 self._value = state.value
3756 self._bound_min = state.bound_min
3757 self._bound_max = state.bound_max
3758 self.tag_constructed = state.tag_constructed
3759 self.defined = state.defined
3761 def __bytes__(self):
3762 self._assert_ready()
3763 return bytes(self._value)
3765 def __eq__(self, their):
3766 if their.__class__ == binary_type:
3767 return self._value == their
3768 if not issubclass(their.__class__, OctetString):
3771 self._value == their._value and
3772 self.tag == their.tag and
3773 self._expl == their._expl
3776 def __lt__(self, their):
3777 return self._value < their._value
3788 return self.__class__(
3791 (self._bound_min, self._bound_max)
3792 if bounds is None else bounds
3794 impl=self.tag if impl is None else impl,
3795 expl=self._expl if expl is None else expl,
3796 default=self.default if default is None else default,
3797 optional=self.optional if optional is None else optional,
3801 self._assert_ready()
3804 len_encode(len(self._value)),
3808 def _encode1st(self, state):
3809 self._assert_ready()
3810 l = len(self._value)
3811 return len(self.tag) + len_size(l) + l, state
3813 def _encode2nd(self, writer, state_iter):
3815 write_full(writer, self.tag + len_encode(len(value)))
3816 write_full(writer, value)
3818 def _encode_cer(self, writer):
3819 octets = self._value
3820 if len(octets) <= 1000:
3821 write_full(writer, self._encode())
3823 write_full(writer, self.tag_constructed)
3824 write_full(writer, LENINDEF)
3825 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3826 write_full(writer, b"".join((
3827 OctetString.tag_default,
3829 octets[offset:offset + 1000],
3831 tail = octets[offset+1000:]
3833 write_full(writer, b"".join((
3834 OctetString.tag_default,
3835 len_encode(len(tail)),
3838 write_full(writer, EOC)
3840 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3842 t, tlen, lv = tag_strip(tlv)
3843 except DecodeError as err:
3844 raise err.__class__(
3846 klass=self.__class__,
3847 decode_path=decode_path,
3855 l, llen, v = len_decode(lv)
3856 except DecodeError as err:
3857 raise err.__class__(
3859 klass=self.__class__,
3860 decode_path=decode_path,
3864 raise NotEnoughData(
3865 "encoded length is longer than data",
3866 klass=self.__class__,
3867 decode_path=decode_path,
3870 v, tail = v[:l], v[l:]
3871 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3873 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3874 klass=self.__class__,
3875 decode_path=decode_path,
3879 obj = self.__class__(
3881 None if (evgen_mode and self.evgen_mode_skip_value)
3884 bounds=(self._bound_min, self._bound_max),
3887 default=self.default,
3888 optional=self.optional,
3889 _decoded=(offset, llen, l),
3892 except DecodeError as err:
3895 klass=self.__class__,
3896 decode_path=decode_path,
3899 except BoundsError as err:
3902 klass=self.__class__,
3903 decode_path=decode_path,
3906 yield decode_path, obj, tail
3908 if t != self.tag_constructed:
3910 klass=self.__class__,
3911 decode_path=decode_path,
3914 if not ctx.get("bered", False):
3916 "unallowed BER constructed encoding",
3917 klass=self.__class__,
3918 decode_path=decode_path,
3926 l, llen, v = len_decode(lv)
3927 except LenIndefForm:
3928 llen, l, v = 1, 0, lv[1:]
3930 except DecodeError as err:
3931 raise err.__class__(
3933 klass=self.__class__,
3934 decode_path=decode_path,
3938 raise NotEnoughData(
3939 "encoded length is longer than data",
3940 klass=self.__class__,
3941 decode_path=decode_path,
3946 sub_offset = offset + tlen + llen
3951 if v[:EOC_LEN].tobytes() == EOC:
3958 "chunk out of bounds",
3959 klass=self.__class__,
3960 decode_path=decode_path + (str(len(chunks) - 1),),
3961 offset=chunks[-1].offset,
3965 sub_decode_path = decode_path + (str(chunks_count),)
3966 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3969 decode_path=sub_decode_path,
3972 _ctx_immutable=False,
3974 yield _decode_path, chunk, v_tail
3975 if not chunk.ber_encoded:
3976 payload_len += chunk.vlen
3979 sub_decode_path = decode_path + (str(len(chunks)),)
3980 _, chunk, v_tail = next(OctetString().decode_evgen(
3983 decode_path=sub_decode_path,
3986 _ctx_immutable=False,
3989 chunks.append(chunk)
3992 "expected OctetString encoded chunk",
3993 klass=self.__class__,
3994 decode_path=sub_decode_path,
3997 sub_offset += chunk.tlvlen
3998 vlen += chunk.tlvlen
4000 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4002 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4003 klass=self.__class__,
4004 decode_path=decode_path,
4008 obj = self.__class__(
4010 None if evgen_mode else
4011 b"".join(bytes(chunk) for chunk in chunks)
4013 bounds=(self._bound_min, self._bound_max),
4016 default=self.default,
4017 optional=self.optional,
4018 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4021 except DecodeError as err:
4024 klass=self.__class__,
4025 decode_path=decode_path,
4028 except BoundsError as err:
4031 klass=self.__class__,
4032 decode_path=decode_path,
4035 obj.lenindef = lenindef
4036 obj.ber_encoded = True
4037 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4040 return pp_console_row(next(self.pps()))
4042 def pps(self, decode_path=()):
4045 asn1_type_name=self.asn1_type_name,
4046 obj_name=self.__class__.__name__,
4047 decode_path=decode_path,
4048 value=("%d bytes" % len(self._value)) if self.ready else None,
4049 blob=self._value if self.ready else None,
4050 optional=self.optional,
4051 default=self == self.default,
4052 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4053 expl=None if self._expl is None else tag_decode(self._expl),
4058 expl_offset=self.expl_offset if self.expled else None,
4059 expl_tlen=self.expl_tlen if self.expled else None,
4060 expl_llen=self.expl_llen if self.expled else None,
4061 expl_vlen=self.expl_vlen if self.expled else None,
4062 expl_lenindef=self.expl_lenindef,
4063 lenindef=self.lenindef,
4064 ber_encoded=self.ber_encoded,
4067 defined_by, defined = self.defined or (None, None)
4068 if defined_by is not None:
4070 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4072 for pp in self.pps_lenindef(decode_path):
4076 def agg_octet_string(evgens, decode_path, raw, writer):
4077 """Aggregate constructed string (OctetString and its derivatives)
4079 :param evgens: iterator of generated events
4080 :param decode_path: points to the string we want to decode
4081 :param raw: sliceable (memoryview, bytearray, etc) with
4082 the data evgens are generated on
4083 :param writer: buffer.write-like callable where the aggregated
4084 string is going to be saved. Must comply
4085 with ``io.RawIOBase.write`` behaviour
4087 .. seealso:: :ref:`agg_octet_string`
4089 decode_path_len = len(decode_path)
4090 for dp, obj, _ in evgens:
4091 if dp[:decode_path_len] != decode_path:
4093 if not obj.ber_encoded:
4094 write_full(writer, raw[
4095 obj.offset + obj.tlen + obj.llen:
4096 obj.offset + obj.tlen + obj.llen + obj.vlen -
4097 (EOC_LEN if obj.expl_lenindef else 0)
4099 if len(dp) == decode_path_len:
4103 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4107 """``NULL`` null object
4115 tag_default = tag_encode(5)
4116 asn1_type_name = "NULL"
4120 value=None, # unused, but Sequence passes it
4127 :param bytes impl: override default tag with ``IMPLICIT`` one
4128 :param bytes expl: override default tag with ``EXPLICIT`` one
4129 :param bool optional: is object ``OPTIONAL`` in sequence
4131 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4138 def __getstate__(self):
4154 def __eq__(self, their):
4155 if not issubclass(their.__class__, Null):
4158 self.tag == their.tag and
4159 self._expl == their._expl
4169 return self.__class__(
4170 impl=self.tag if impl is None else impl,
4171 expl=self._expl if expl is None else expl,
4172 optional=self.optional if optional is None else optional,
4176 return self.tag + LEN0
4178 def _encode1st(self, state):
4179 return len(self.tag) + 1, state
4181 def _encode2nd(self, writer, state_iter):
4182 write_full(writer, self.tag + LEN0)
4184 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4186 t, _, lv = tag_strip(tlv)
4187 except DecodeError as err:
4188 raise err.__class__(
4190 klass=self.__class__,
4191 decode_path=decode_path,
4196 klass=self.__class__,
4197 decode_path=decode_path,
4200 if tag_only: # pragma: no cover
4204 l, _, v = len_decode(lv)
4205 except DecodeError as err:
4206 raise err.__class__(
4208 klass=self.__class__,
4209 decode_path=decode_path,
4213 raise InvalidLength(
4214 "Null must have zero length",
4215 klass=self.__class__,
4216 decode_path=decode_path,
4219 obj = self.__class__(
4222 optional=self.optional,
4223 _decoded=(offset, 1, 0),
4225 yield decode_path, obj, v
4228 return pp_console_row(next(self.pps()))
4230 def pps(self, decode_path=()):
4233 asn1_type_name=self.asn1_type_name,
4234 obj_name=self.__class__.__name__,
4235 decode_path=decode_path,
4236 optional=self.optional,
4237 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4238 expl=None if self._expl is None else tag_decode(self._expl),
4243 expl_offset=self.expl_offset if self.expled else None,
4244 expl_tlen=self.expl_tlen if self.expled else None,
4245 expl_llen=self.expl_llen if self.expled else None,
4246 expl_vlen=self.expl_vlen if self.expled else None,
4247 expl_lenindef=self.expl_lenindef,
4250 for pp in self.pps_lenindef(decode_path):
4254 ObjectIdentifierState = namedtuple(
4255 "ObjectIdentifierState",
4256 BasicState._fields + ("value", "defines"),
4261 class ObjectIdentifier(Obj):
4262 """``OBJECT IDENTIFIER`` OID type
4264 >>> oid = ObjectIdentifier((1, 2, 3))
4265 OBJECT IDENTIFIER 1.2.3
4266 >>> oid == ObjectIdentifier("1.2.3")
4272 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4273 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4275 >>> str(ObjectIdentifier((3, 1)))
4276 Traceback (most recent call last):
4277 pyderasn.InvalidOID: unacceptable first arc value
4279 __slots__ = ("defines",)
4280 tag_default = tag_encode(6)
4281 asn1_type_name = "OBJECT IDENTIFIER"
4294 :param value: set the value. Either tuples of integers,
4295 string of "."-concatenated integers, or
4296 :py:class:`pyderasn.ObjectIdentifier` object
4297 :param defines: sequence of tuples. Each tuple has two elements.
4298 First one is relative to current one decode
4299 path, aiming to the field defined by that OID.
4300 Read about relative path in
4301 :py:func:`pyderasn.abs_decode_path`. Second
4302 tuple element is ``{OID: pyderasn.Obj()}``
4303 dictionary, mapping between current OID value
4304 and structure applied to defined field.
4306 .. seealso:: :ref:`definedby`
4308 :param bytes impl: override default tag with ``IMPLICIT`` one
4309 :param bytes expl: override default tag with ``EXPLICIT`` one
4310 :param default: set default value. Type same as in ``value``
4311 :param bool optional: is object ``OPTIONAL`` in sequence
4313 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4315 if value is not None:
4316 self._value = self._value_sanitize(value)
4317 if default is not None:
4318 default = self._value_sanitize(default)
4319 self.default = self.__class__(
4324 if self._value is None:
4325 self._value = default
4326 self.defines = defines
4328 def __add__(self, their):
4329 if their.__class__ == tuple:
4330 return self.__class__(self._value + array("L", their))
4331 if isinstance(their, self.__class__):
4332 return self.__class__(self._value + their._value)
4333 raise InvalidValueType((self.__class__, tuple))
4335 def _value_sanitize(self, value):
4336 if issubclass(value.__class__, ObjectIdentifier):
4338 if isinstance(value, string_types):
4340 value = array("L", (pureint(arc) for arc in value.split(".")))
4342 raise InvalidOID("unacceptable arcs values")
4343 if value.__class__ == tuple:
4345 value = array("L", value)
4346 except OverflowError as err:
4347 raise InvalidOID(repr(err))
4348 if value.__class__ is array:
4350 raise InvalidOID("less than 2 arcs")
4351 first_arc = value[0]
4352 if first_arc in (0, 1):
4353 if not (0 <= value[1] <= 39):
4354 raise InvalidOID("second arc is too wide")
4355 elif first_arc == 2:
4358 raise InvalidOID("unacceptable first arc value")
4359 if not all(arc >= 0 for arc in value):
4360 raise InvalidOID("negative arc value")
4362 raise InvalidValueType((self.__class__, str, tuple))
4366 return self._value is not None
4368 def __getstate__(self):
4369 return ObjectIdentifierState(
def __setstate__(self, state):
    """Restore the object from ``state``.

    The parent class restores its own part first, then the ``value``
    and ``defines`` fields of ``state`` are taken over.
    """
    super(ObjectIdentifier, self).__setstate__(state)
    self._value, self.defines = state.value, state.defines
4392 self._assert_ready()
4393 return iter(self._value)
4396 return ".".join(str(arc) for arc in self._value or ())
4399 self._assert_ready()
4400 return hash(b"".join((
4402 bytes(self._expl or b""),
4403 str(self._value).encode("ascii"),
4406 def __eq__(self, their):
4407 if their.__class__ == tuple:
4408 return self._value == array("L", their)
4409 if not issubclass(their.__class__, ObjectIdentifier):
4412 self.tag == their.tag and
4413 self._expl == their._expl and
4414 self._value == their._value
4417 def __lt__(self, their):
4418 return self._value < their._value
4429 return self.__class__(
4431 defines=self.defines if defines is None else defines,
4432 impl=self.tag if impl is None else impl,
4433 expl=self._expl if expl is None else expl,
4434 default=self.default if default is None else default,
4435 optional=self.optional if optional is None else optional,
4438 def _encode_octets(self):
4439 self._assert_ready()
4441 first_value = value[1]
4442 first_arc = value[0]
4445 elif first_arc == 1:
4447 elif first_arc == 2:
4449 else: # pragma: no cover
4450 raise RuntimeError("invalid arc is stored")
4451 octets = [zero_ended_encode(first_value)]
4452 for arc in value[2:]:
4453 octets.append(zero_ended_encode(arc))
4454 return b"".join(octets)
4457 v = self._encode_octets()
4458 return b"".join((self.tag, len_encode(len(v)), v))
def _encode1st(self, state):
    """Report the encoded size of this object.

    :param state: passed through unchanged
    :returns: ``(length, state)`` pair; length is the sum of the tag
              length, ``len_size()`` of the payload length and the
              payload length itself
    """
    payload_len = len(self._encode_octets())
    header_len = len(self.tag) + len_size(payload_len)
    return header_len + payload_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``self._encode()`` to ``writer``.

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
4467 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4469 t, _, lv = tag_strip(tlv)
4470 except DecodeError as err:
4471 raise err.__class__(
4473 klass=self.__class__,
4474 decode_path=decode_path,
4479 klass=self.__class__,
4480 decode_path=decode_path,
4483 if tag_only: # pragma: no cover
4487 l, llen, v = len_decode(lv)
4488 except DecodeError as err:
4489 raise err.__class__(
4491 klass=self.__class__,
4492 decode_path=decode_path,
4496 raise NotEnoughData(
4497 "encoded length is longer than data",
4498 klass=self.__class__,
4499 decode_path=decode_path,
4503 raise NotEnoughData(
4505 klass=self.__class__,
4506 decode_path=decode_path,
4509 v, tail = v[:l], v[l:]
4516 octet = indexbytes(v, i)
4517 if i == 0 and octet == 0x80:
4518 if ctx.get("bered", False):
4522 "non normalized arc encoding",
4523 klass=self.__class__,
4524 decode_path=decode_path,
4527 arc = (arc << 7) | (octet & 0x7F)
4528 if octet & 0x80 == 0:
4531 except OverflowError:
4533 "too huge value for local unsigned long",
4534 klass=self.__class__,
4535 decode_path=decode_path,
4544 klass=self.__class__,
4545 decode_path=decode_path,
4549 second_arc = arcs[0]
4550 if 0 <= second_arc <= 39:
4552 elif 40 <= second_arc <= 79:
4558 obj = self.__class__(
4559 value=array("L", (first_arc, second_arc)) + arcs[1:],
4562 default=self.default,
4563 optional=self.optional,
4564 _decoded=(offset, llen, l),
4567 obj.ber_encoded = True
4568 yield decode_path, obj, tail
4571 return pp_console_row(next(self.pps()))
4573 def pps(self, decode_path=()):
4576 asn1_type_name=self.asn1_type_name,
4577 obj_name=self.__class__.__name__,
4578 decode_path=decode_path,
4579 value=str(self) if self.ready else None,
4580 optional=self.optional,
4581 default=self == self.default,
4582 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4583 expl=None if self._expl is None else tag_decode(self._expl),
4588 expl_offset=self.expl_offset if self.expled else None,
4589 expl_tlen=self.expl_tlen if self.expled else None,
4590 expl_llen=self.expl_llen if self.expled else None,
4591 expl_vlen=self.expl_vlen if self.expled else None,
4592 expl_lenindef=self.expl_lenindef,
4593 ber_encoded=self.ber_encoded,
4596 for pp in self.pps_lenindef(decode_path):
4600 class Enumerated(Integer):
4601 """``ENUMERATED`` integer type
4603 This type is identical to :py:class:`pyderasn.Integer`, but requires
4604 schema to be specified and does not accept values missing from it.
4607 tag_default = tag_encode(10)
4608 asn1_type_name = "ENUMERATED"
4619 bounds=None, # dummy argument, workability for Integer.decode
4621 super(Enumerated, self).__init__(
4622 value, bounds, impl, expl, default, optional, _specs, _decoded,
4624 if len(self.specs) == 0:
4625 raise ValueError("schema must be specified")
4627 def _value_sanitize(self, value):
4628 if isinstance(value, self.__class__):
4629 value = value._value
4630 elif isinstance(value, integer_types):
4631 for _value in itervalues(self.specs):
4636 "unknown integer value: %s" % value,
4637 klass=self.__class__,
4639 elif isinstance(value, string_types):
4640 value = self.specs.get(value)
4642 raise ObjUnknown("integer value: %s" % value)
4644 raise InvalidValueType((self.__class__, int, str))
4656 return self.__class__(
4658 impl=self.tag if impl is None else impl,
4659 expl=self._expl if expl is None else expl,
4660 default=self.default if default is None else default,
4661 optional=self.optional if optional is None else optional,
4666 def escape_control_unicode(c):
4667 if unicat(c)[0] == "C":
4668 c = repr(c).lstrip("u").strip("'")
4672 class CommonString(OctetString):
4673 """Common class for all strings
4675 Everything resembles :py:class:`pyderasn.OctetString`, except
4676 ability to deal with unicode text strings.
4678 >>> hexenc("привет мир".encode("utf-8"))
4679 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4680 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4682 >>> s = UTF8String("привет мир")
4683 UTF8String UTF8String привет мир
4685 'привет мир'
4686 >>> hexenc(bytes(s))
4687 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4689 >>> PrintableString("привет мир")
4690 Traceback (most recent call last):
4691 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4693 >>> BMPString("ада", bounds=(2, 2))
4694 Traceback (most recent call last):
4695 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4696 >>> s = BMPString("ад", bounds=(2, 2))
4699 >>> hexenc(bytes(s))
4707 * - :py:class:`pyderasn.UTF8String`
4709 * - :py:class:`pyderasn.NumericString`
4711 * - :py:class:`pyderasn.PrintableString`
4713 * - :py:class:`pyderasn.TeletexString`
4715 * - :py:class:`pyderasn.T61String`
4717 * - :py:class:`pyderasn.VideotexString`
4719 * - :py:class:`pyderasn.IA5String`
4721 * - :py:class:`pyderasn.GraphicString`
4723 * - :py:class:`pyderasn.VisibleString`
4725 * - :py:class:`pyderasn.ISO646String`
4727 * - :py:class:`pyderasn.GeneralString`
4729 * - :py:class:`pyderasn.UniversalString`
4731 * - :py:class:`pyderasn.BMPString`
4736 def _value_sanitize(self, value):
4738 value_decoded = None
4739 if isinstance(value, self.__class__):
4740 value_raw = value._value
4741 elif value.__class__ == text_type:
4742 value_decoded = value
4743 elif value.__class__ == binary_type:
4746 raise InvalidValueType((self.__class__, text_type, binary_type))
4749 value_decoded.encode(self.encoding)
4750 if value_raw is None else value_raw
4753 value_raw.decode(self.encoding)
4754 if value_decoded is None else value_decoded
4756 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4757 raise DecodeError(str(err))
4758 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4766 def __eq__(self, their):
4767 if their.__class__ == binary_type:
4768 return self._value == their
4769 if their.__class__ == text_type:
4770 return self._value == their.encode(self.encoding)
4771 if not isinstance(their, self.__class__):
4774 self._value == their._value and
4775 self.tag == their.tag and
4776 self._expl == their._expl
4779 def __unicode__(self):
4781 return self._value.decode(self.encoding)
4782 return text_type(self._value)
4785 return pp_console_row(next(self.pps(no_unicode=PY2)))
4787 def pps(self, decode_path=(), no_unicode=False):
4791 hexenc(bytes(self)) if no_unicode else
4792 "".join(escape_control_unicode(c) for c in self.__unicode__())
4796 asn1_type_name=self.asn1_type_name,
4797 obj_name=self.__class__.__name__,
4798 decode_path=decode_path,
4800 optional=self.optional,
4801 default=self == self.default,
4802 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4803 expl=None if self._expl is None else tag_decode(self._expl),
4808 expl_offset=self.expl_offset if self.expled else None,
4809 expl_tlen=self.expl_tlen if self.expled else None,
4810 expl_llen=self.expl_llen if self.expled else None,
4811 expl_vlen=self.expl_vlen if self.expled else None,
4812 expl_lenindef=self.expl_lenindef,
4813 ber_encoded=self.ber_encoded,
4816 for pp in self.pps_lenindef(decode_path):
4820 class UTF8String(CommonString):
4822 tag_default = tag_encode(12)
4824 asn1_type_name = "UTF8String"
4827 class AllowableCharsMixin(object):
4829 def allowable_chars(self):
4831 return self._allowable_chars
4832 return frozenset(six_unichr(c) for c in self._allowable_chars)
4835 class NumericString(AllowableCharsMixin, CommonString):
4838 Its value is properly sanitized: only ASCII digits with spaces can
4841 >>> NumericString().allowable_chars
4842 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4845 tag_default = tag_encode(18)
4847 asn1_type_name = "NumericString"
4848 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4850 def _value_sanitize(self, value):
4851 value = super(NumericString, self)._value_sanitize(value)
4852 if not frozenset(value) <= self._allowable_chars:
4853 raise DecodeError("non-numeric value")
4857 PrintableStringState = namedtuple(
4858 "PrintableStringState",
4859 OctetStringState._fields + ("allowable_chars",),
4864 class PrintableString(AllowableCharsMixin, CommonString):
4867 Its value is properly sanitized: see X.680 41.4 table 10.
4869 >>> PrintableString().allowable_chars
4870 frozenset([' ', "'", ..., 'z'])
4871 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4872 PrintableString PrintableString foo*bar
4873 >>> obj.allow_asterisk, obj.allow_ampersand
4877 tag_default = tag_encode(19)
4879 asn1_type_name = "PrintableString"
4880 _allowable_chars = frozenset(
4881 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4883 _asterisk = frozenset("*".encode("ascii"))
4884 _ampersand = frozenset("&".encode("ascii"))
4896 allow_asterisk=False,
4897 allow_ampersand=False,
4900 :param allow_asterisk: allow asterisk character
4901 :param allow_ampersand: allow ampersand character
4904 self._allowable_chars |= self._asterisk
4906 self._allowable_chars |= self._ampersand
4907 super(PrintableString, self).__init__(
4908 value, bounds, impl, expl, default, optional, _decoded, ctx,
4912 def allow_asterisk(self):
4913 """Is asterisk character allowed?
4915 return self._asterisk <= self._allowable_chars
4918 def allow_ampersand(self):
4919 """Is ampersand character allowed?
4921 return self._ampersand <= self._allowable_chars
4923 def _value_sanitize(self, value):
4924 value = super(PrintableString, self)._value_sanitize(value)
4925 if not frozenset(value) <= self._allowable_chars:
4926 raise DecodeError("non-printable value")
4929 def __getstate__(self):
4930 return PrintableStringState(
4931 *super(PrintableString, self).__getstate__(),
4932 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore the object from ``state``.

    The parent class restores its own part first, then the set of
    allowable characters is taken from ``state.allowable_chars``.
    """
    super(PrintableString, self).__setstate__(state)
    allowed = state.allowable_chars
    self._allowable_chars = allowed
4948 return self.__class__(
4951 (self._bound_min, self._bound_max)
4952 if bounds is None else bounds
4954 impl=self.tag if impl is None else impl,
4955 expl=self._expl if expl is None else expl,
4956 default=self.default if default is None else default,
4957 optional=self.optional if optional is None else optional,
4958 allow_asterisk=self.allow_asterisk,
4959 allow_ampersand=self.allow_ampersand,
4963 class TeletexString(CommonString):
4965 tag_default = tag_encode(20)
4967 asn1_type_name = "TeletexString"
4970 class T61String(TeletexString):
4972 asn1_type_name = "T61String"
4975 class VideotexString(CommonString):
4977 tag_default = tag_encode(21)
4978 encoding = "iso-8859-1"
4979 asn1_type_name = "VideotexString"
4982 class IA5String(CommonString):
4984 tag_default = tag_encode(22)
4986 asn1_type_name = "IA5"
# Number of characters in the strict UTCTime text form
LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
# Result of len_encode() for that length
# NOTE(review): presumably the encoded length octets -- confirm against len_encode
LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
# Size of the encoded length plus the UTCTime value itself
LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
# Number of characters in the GeneralizedTime text form with two extra
# characters before "Z" (presumably fraction separator and one digit -- confirm)
LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
# Number of characters in the GeneralizedTime text form without fractions
LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
# Result of len_encode() for the fractionless GeneralizedTime length
LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4997 class VisibleString(CommonString):
4999 tag_default = tag_encode(26)
5001 asn1_type_name = "VisibleString"
5004 UTCTimeState = namedtuple(
5006 OctetStringState._fields + ("ber_raw",),
5011 def str_to_time_fractions(value):
5013 year, v = (v // 10**10), (v % 10**10)
5014 month, v = (v // 10**8), (v % 10**8)
5015 day, v = (v // 10**6), (v % 10**6)
5016 hour, v = (v // 10**4), (v % 10**4)
5017 minute, second = (v // 100), (v % 100)
5018 return year, month, day, hour, minute, second
5021 class UTCTime(VisibleString):
5022 """``UTCTime`` datetime type
5024 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5025 UTCTime UTCTime 2017-09-30T22:07:50
5031 datetime.datetime(2017, 9, 30, 22, 7, 50)
5032 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5033 datetime.datetime(1957, 9, 30, 22, 7, 50)
5035 If BER encoded value was met, then ``ber_raw`` attribute will hold
5036 its raw representation.
Pay attention that UTCTime can not hold a full year, so two-digit
years less than 50 are treated as 20xx, and as 19xx otherwise,
according to the X.509 recommendation.
No strict validation of UTC offsets is made, only a very crude one:
5048 * minutes are not exceeding 60
5049 * offset value is not exceeding 14 hours
5051 __slots__ = ("ber_raw",)
5052 tag_default = tag_encode(23)
5054 asn1_type_name = "UTCTime"
5055 evgen_mode_skip_value = False
5065 bounds=None, # dummy argument, workability for OctetString.decode
5069 :param value: set the value. Either datetime type, or
5070 :py:class:`pyderasn.UTCTime` object
5071 :param bytes impl: override default tag with ``IMPLICIT`` one
5072 :param bytes expl: override default tag with ``EXPLICIT`` one
5073 :param default: set default value. Type same as in ``value``
5074 :param bool optional: is object ``OPTIONAL`` in sequence
5076 super(UTCTime, self).__init__(
5077 None, None, impl, expl, None, optional, _decoded, ctx,
5081 if value is not None:
5082 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5083 self.ber_encoded = self.ber_raw is not None
5084 if default is not None:
5085 default, _ = self._value_sanitize(default)
5086 self.default = self.__class__(
5091 if self._value is None:
5092 self._value = default
5094 self.optional = optional
5096 def _strptime_bered(self, value):
5097 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5100 raise ValueError("no timezone")
5101 year += 2000 if year < 50 else 1900
5102 decoded = datetime(year, month, day, hour, minute)
5104 if value[-1] == "Z":
5108 raise ValueError("invalid UTC offset")
5109 if value[-5] == "-":
5111 elif value[-5] == "+":
5114 raise ValueError("invalid UTC offset")
5115 v = pureint(value[-4:])
5116 offset, v = (60 * (v % 100)), v // 100
5118 raise ValueError("invalid UTC offset minutes")
5120 if offset > 14 * 3600:
5121 raise ValueError("too big UTC offset")
5125 return offset, decoded
5127 raise ValueError("invalid UTC offset seconds")
5128 seconds = pureint(value)
5130 raise ValueError("invalid seconds value")
5131 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse strict ``YYMMDDHHMMSSZ`` text into a datetime.

    :param value: text of exactly ``LEN_YYMMDDHHMMSSZ`` characters,
                  ending with ``Z``
    :raises ValueError: on wrong length or missing ``Z`` suffix
    :returns: datetime; two-digit years below 50 become 20xx, all
              others become 19xx
    """
    # datetime.strptime's format: %y%m%d%H%M%SZ
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    digits_part, suffix = value[:-1], value[-1]
    if suffix != "Z":
        raise ValueError("non UTC timezone")
    fractions = str_to_time_fractions(digits_part)
    short_year = fractions[0]
    century = 2000 if short_year < 50 else 1900
    return datetime(short_year + century, *fractions[1:])
5143 def _dt_sanitize(self, value):
5144 if value.year < 1950 or value.year > 2049:
5145 raise ValueError("UTCTime can hold only 1950-2049 years")
5146 return value.replace(microsecond=0)
5148 def _value_sanitize(self, value, ctx=None):
5149 if value.__class__ == binary_type:
5151 value_decoded = value.decode("ascii")
5152 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5153 raise DecodeError("invalid UTCTime encoding: %r" % err)
5156 return self._strptime(value_decoded), None
5157 except (TypeError, ValueError) as _err:
5159 if (ctx is not None) and ctx.get("bered", False):
5161 offset, _value = self._strptime_bered(value_decoded)
5162 _value = _value - timedelta(seconds=offset)
5163 return self._dt_sanitize(_value), value
5164 except (TypeError, ValueError, OverflowError) as _err:
5167 "invalid %s format: %r" % (self.asn1_type_name, err),
5168 klass=self.__class__,
5170 if isinstance(value, self.__class__):
5171 return value._value, None
5172 if value.__class__ == datetime:
5173 return self._dt_sanitize(value), None
5174 raise InvalidValueType((self.__class__, datetime))
5176 def _pp_value(self):
5178 value = self._value.isoformat()
5179 if self.ber_encoded:
5180 value += " (%s)" % self.ber_raw
5184 def __unicode__(self):
5186 value = self._value.isoformat()
5187 if self.ber_encoded:
5188 value += " (%s)" % self.ber_raw
5190 return text_type(self._pp_value())
5192 def __getstate__(self):
5193 return UTCTimeState(
5194 *super(UTCTime, self).__getstate__(),
5195 **{"ber_raw": self.ber_raw}
def __setstate__(self, state):
    """Restore the object from ``state``.

    The parent class restores its own part first, then ``ber_raw`` is
    taken from ``state.ber_raw``.
    """
    super(UTCTime, self).__setstate__(state)
    raw = state.ber_raw
    self.ber_raw = raw
5202 def __bytes__(self):
5203 self._assert_ready()
5204 return self._encode_time()
5206 def __eq__(self, their):
5207 if their.__class__ == binary_type:
5208 return self._encode_time() == their
5209 if their.__class__ == datetime:
5210 return self.todatetime() == their
5211 if not isinstance(their, self.__class__):
5214 self._value == their._value and
5215 self.tag == their.tag and
5216 self._expl == their._expl
5219 def _encode_time(self):
5220 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5223 self._assert_ready()
5224 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """Report the encoded size of this object.

    :param state: passed through unchanged
    :returns: ``(length, state)`` pair; length is the tag length plus
              the constant ``LEN_YYMMDDHHMMSSZ_WITH_LEN``
    """
    tag_len = len(self.tag)
    return tag_len + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``self._encode()`` to ``writer``.

    ``_assert_ready()`` is called first.

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used by this type
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
def _encode_cer(self, writer):
    """Write the result of ``self._encode()`` to ``writer``.

    :param writer: destination handed over to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
5236 def todatetime(self):
5240 return pp_console_row(next(self.pps()))
5242 def pps(self, decode_path=()):
5245 asn1_type_name=self.asn1_type_name,
5246 obj_name=self.__class__.__name__,
5247 decode_path=decode_path,
5248 value=self._pp_value(),
5249 optional=self.optional,
5250 default=self == self.default,
5251 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5252 expl=None if self._expl is None else tag_decode(self._expl),
5257 expl_offset=self.expl_offset if self.expled else None,
5258 expl_tlen=self.expl_tlen if self.expled else None,
5259 expl_llen=self.expl_llen if self.expled else None,
5260 expl_vlen=self.expl_vlen if self.expled else None,
5261 expl_lenindef=self.expl_lenindef,
5262 ber_encoded=self.ber_encoded,
5265 for pp in self.pps_lenindef(decode_path):
5269 class GeneralizedTime(UTCTime):
5270 """``GeneralizedTime`` datetime type
5272 This type is similar to :py:class:`pyderasn.UTCTime`.
5274 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5275 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5277 '20170930220750.000123Z'
5278 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5279 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5283 Only microsecond fractions are supported in DER encoding.
5284 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5285 higher precision values.
BER encoded data can lose information (accuracy) during decoding
because of float transformations.
5294 Local times (without explicit timezone specification) are treated
5295 as UTC one, no transformations are made.
5299 Zero year is unsupported.
5302 tag_default = tag_encode(24)
5303 asn1_type_name = "GeneralizedTime"
5305 def _dt_sanitize(self, value):
5308 def _strptime_bered(self, value):
5309 if len(value) < 4 + 3 * 2:
5310 raise ValueError("invalid GeneralizedTime")
5311 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5312 decoded = datetime(year, month, day, hour)
5313 offset, value = 0, value[10:]
5315 return offset, decoded
5316 if value[-1] == "Z":
5319 for char, sign in (("-", -1), ("+", 1)):
5320 idx = value.rfind(char)
5323 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5324 v = pureint(offset_raw)
5325 if len(offset_raw) == 4:
5326 offset, v = (60 * (v % 100)), v // 100
5328 raise ValueError("invalid UTC offset minutes")
5329 elif len(offset_raw) == 2:
5332 raise ValueError("invalid UTC offset")
5334 if offset > 14 * 3600:
5335 raise ValueError("too big UTC offset")
5339 return offset, decoded
5340 if value[0] in DECIMAL_SIGNS:
5342 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5345 raise ValueError("stripped minutes")
5346 decoded += timedelta(seconds=60 * pureint(value[:2]))
5349 return offset, decoded
5350 if value[0] in DECIMAL_SIGNS:
5352 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5355 raise ValueError("stripped seconds")
5356 decoded += timedelta(seconds=pureint(value[:2]))
5359 return offset, decoded
5360 if value[0] not in DECIMAL_SIGNS:
5361 raise ValueError("invalid format after seconds")
5363 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5366 def _strptime(self, value):
5368 if l == LEN_YYYYMMDDHHMMSSZ:
5369 # datetime.strptime's format: %Y%m%d%H%M%SZ
5370 if value[-1] != "Z":
5371 raise ValueError("non UTC timezone")
5372 return datetime(*str_to_time_fractions(value[:-1]))
5373 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5374 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5375 if value[-1] != "Z":
5376 raise ValueError("non UTC timezone")
5377 if value[14] != ".":
5378 raise ValueError("no fractions separator")
5381 raise ValueError("trailing zero")
5384 raise ValueError("only microsecond fractions are supported")
5385 us = pureint(us + ("0" * (6 - us_len)))
5386 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5387 return datetime(year, month, day, hour, minute, second, us)
5388 raise ValueError("invalid GeneralizedTime length")
5390 def _encode_time(self):
5392 encoded = value.strftime("%Y%m%d%H%M%S")
5393 if value.microsecond > 0:
5394 encoded += (".%06d" % value.microsecond).rstrip("0")
5395 return (encoded + "Z").encode("ascii")
5398 self._assert_ready()
5400 if value.microsecond > 0:
5401 encoded = self._encode_time()
5402 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5403 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """Report the encoded size of this object.

    ``_assert_ready()`` is called first.

    :param state: passed through unchanged
    :returns: ``(length, state)`` pair; length is the sum of the tag
              length, ``len_size()`` of the value length and the
              length of the ``_encode_time()`` result
    """
    self._assert_ready()
    value_len = len(self._encode_time())
    header_len = len(self.tag) + len_size(value_len)
    return header_len + value_len, state
def _encode2nd(self, writer, state_iter):
    """Write the result of ``self._encode()`` to ``writer``.

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used by this type
    """
    encoded = self._encode()
    write_full(writer, encoded)
5414 class GraphicString(CommonString):
5416 tag_default = tag_encode(25)
5417 encoding = "iso-8859-1"
5418 asn1_type_name = "GraphicString"
5421 class ISO646String(VisibleString):
5423 asn1_type_name = "ISO646String"
5426 class GeneralString(CommonString):
5428 tag_default = tag_encode(27)
5429 encoding = "iso-8859-1"
5430 asn1_type_name = "GeneralString"
5433 class UniversalString(CommonString):
5435 tag_default = tag_encode(28)
5436 encoding = "utf-32-be"
5437 asn1_type_name = "UniversalString"
5440 class BMPString(CommonString):
5442 tag_default = tag_encode(30)
5443 encoding = "utf-16-be"
5444 asn1_type_name = "BMPString"
5447 ChoiceState = namedtuple(
5449 BasicState._fields + ("specs", "value",),
5455 """``CHOICE`` special type
5459 class GeneralName(Choice):
5461 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5462 ("dNSName", IA5String(impl=tag_ctxp(2))),
5465 >>> gn = GeneralName()
5467 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5468 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5469 >>> gn["dNSName"] = IA5String("bar.baz")
5470 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5471 >>> gn["rfc822Name"]
5474 [2] IA5String IA5 bar.baz
5477 >>> gn.value == gn["dNSName"]
5480 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5482 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5483 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5485 __slots__ = ("specs",)
5487 asn1_type_name = "CHOICE"
5500 :param value: set the value. Either ``(choice, value)`` tuple, or
5501 :py:class:`pyderasn.Choice` object
5502 :param bytes impl: can not be set, do **not** use it
5503 :param bytes expl: override default tag with ``EXPLICIT`` one
5504 :param default: set default value. Type same as in ``value``
5505 :param bool optional: is object ``OPTIONAL`` in sequence
5507 if impl is not None:
5508 raise ValueError("no implicit tag allowed for CHOICE")
5509 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5511 schema = getattr(self, "schema", ())
5512 if len(schema) == 0:
5513 raise ValueError("schema must be specified")
5515 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5518 if value is not None:
5519 self._value = self._value_sanitize(value)
5520 if default is not None:
5521 default_value = self._value_sanitize(default)
5522 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5523 default_obj.specs = self.specs
5524 default_obj._value = default_value
5525 self.default = default_obj
5527 self._value = copy(default_obj._value)
5528 if self._expl is not None:
5529 tag_class, _, tag_num = tag_decode(self._expl)
5530 self._tag_order = (tag_class, tag_num)
5532 def _value_sanitize(self, value):
5533 if (value.__class__ == tuple) and len(value) == 2:
5535 spec = self.specs.get(choice)
5537 raise ObjUnknown(choice)
5538 if not isinstance(obj, spec.__class__):
5539 raise InvalidValueType((spec,))
5540 return (choice, spec(obj))
5541 if isinstance(value, self.__class__):
5543 raise InvalidValueType((self.__class__, tuple))
5547 return self._value is not None and self._value[1].ready
5551 return self.expl_lenindef or (
5552 (self._value is not None) and
5553 self._value[1].bered
5556 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from ``state``.

    The parent class restores its own part first, then the ``specs``
    and ``value`` fields of ``state`` are taken over.
    """
    super(Choice, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
5579 def __eq__(self, their):
5580 if (their.__class__ == tuple) and len(their) == 2:
5581 return self._value == their
5582 if not isinstance(their, self.__class__):
5585 self.specs == their.specs and
5586 self._value == their._value
5596 return self.__class__(
5599 expl=self._expl if expl is None else expl,
5600 default=self.default if default is None else default,
5601 optional=self.optional if optional is None else optional,
5606 """Name of the choice
5608 self._assert_ready()
5609 return self._value[0]
5613 """Value of underlying choice
5615 self._assert_ready()
5616 return self._value[1]
def tag_order(self):
    """Return own ``_tag_order`` if set, else the chosen value's one.

    ``_assert_ready()`` is called first.
    """
    self._assert_ready()
    if self._tag_order is not None:
        return self._tag_order
    chosen = self._value[1]
    return chosen.tag_order
def tag_order_cer(self):
    """Return the smallest ``tag_order_cer`` among all specs' values."""
    orders = [spec.tag_order_cer for spec in itervalues(self.specs)]
    return min(orders)
5627 def __getitem__(self, key):
5628 if key not in self.specs:
5629 raise ObjUnknown(key)
5630 if self._value is None:
5632 choice, value = self._value
5637 def __setitem__(self, key, value):
5638 spec = self.specs.get(key)
5640 raise ObjUnknown(key)
5641 if not isinstance(value, spec.__class__):
5642 raise InvalidValueType((spec.__class__,))
5643 self._value = (key, spec(value))
5651 return self._value[1].decoded if self.ready else False
5654 self._assert_ready()
5655 return self._value[1].encode()
5657 def _encode1st(self, state):
5658 self._assert_ready()
5659 return self._value[1].encode1st(state)
5661 def _encode2nd(self, writer, state_iter):
5662 self._value[1].encode2nd(writer, state_iter)
5664 def _encode_cer(self, writer):
5665 self._assert_ready()
5666 self._value[1].encode_cer(writer)
5668 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5669 for choice, spec in iteritems(self.specs):
5670 sub_decode_path = decode_path + (choice,)
5676 decode_path=sub_decode_path,
5679 _ctx_immutable=False,
5686 klass=self.__class__,
5687 decode_path=decode_path,
5690 if tag_only: # pragma: no cover
5694 for _decode_path, value, tail in spec.decode_evgen(
5698 decode_path=sub_decode_path,
5700 _ctx_immutable=False,
5702 yield _decode_path, value, tail
5704 _, value, tail = next(spec.decode_evgen(
5708 decode_path=sub_decode_path,
5710 _ctx_immutable=False,
5713 obj = self.__class__(
5716 default=self.default,
5717 optional=self.optional,
5718 _decoded=(offset, 0, value.fulllen),
5720 obj._value = (choice, value)
5721 yield decode_path, obj, tail
5724 value = pp_console_row(next(self.pps()))
5726 value = "%s[%r]" % (value, self.value)
5729 def pps(self, decode_path=()):
5732 asn1_type_name=self.asn1_type_name,
5733 obj_name=self.__class__.__name__,
5734 decode_path=decode_path,
5735 value=self.choice if self.ready else None,
5736 optional=self.optional,
5737 default=self == self.default,
5738 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5739 expl=None if self._expl is None else tag_decode(self._expl),
5744 expl_lenindef=self.expl_lenindef,
5748 yield self.value.pps(decode_path=decode_path + (self.choice,))
5749 for pp in self.pps_lenindef(decode_path):
5753 class PrimitiveTypes(Choice):
5754 """Predefined ``CHOICE`` for all generic primitive types
5756 It could be useful for general decoding of some unspecified values:
5758 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5759 OCTET STRING 3 bytes 666f6f
5760 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5764 schema = tuple((klass.__name__, klass()) for klass in (
5788 AnyState = namedtuple(
5790 BasicState._fields + ("value", "defined"),
5796 """``ANY`` special type
5798 >>> Any(Integer(-123))
5799 ANY INTEGER -123 (0X:7B)
5800 >>> a = Any(OctetString(b"hello world").encode())
5801 ANY 040b68656c6c6f20776f726c64
5802 >>> hexenc(bytes(a))
'040b68656c6c6f20776f726c64'
5805 __slots__ = ("defined",)
5806 tag_default = tag_encode(0)
5807 asn1_type_name = "ANY"
5817 :param value: set the value. Either any kind of pyderasn's
5818 **ready** object, or bytes. Pay attention that
5819 **no** validation is performed if raw binary value
5820 is valid TLV, except just tag decoding
5821 :param bytes expl: override default tag with ``EXPLICIT`` one
5822 :param bool optional: is object ``OPTIONAL`` in sequence
5824 super(Any, self).__init__(None, expl, None, optional, _decoded)
5828 value = self._value_sanitize(value)
5830 if self._expl is None:
5831 if value.__class__ == binary_type:
5832 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5834 tag_class, tag_num = value.tag_order
5836 tag_class, _, tag_num = tag_decode(self._expl)
5837 self._tag_order = (tag_class, tag_num)
def _value_sanitize(self, value):
    """Validate and normalise a value given to ``Any``

    :param value: raw bytes, another object of this class, or any
                  other ``Obj`` instance
    :returns: the bytes themselves, the inner ``_value`` of an object
              of this class, or the ``Obj`` instance unchanged
    :raises ValueError: if ``value`` is an empty bytes string
    :raises InvalidValueType: if ``value`` is none of the accepted kinds
    """
    if value.__class__ == binary_type:
        if len(value) > 0:
            return value
        raise ValueError("Any value can not be empty")
    if isinstance(value, self.__class__):
        # Unwrap: reuse what the other Any already holds
        return value._value
    if isinstance(value, Obj):
        return value
    raise InvalidValueType((self.__class__, Obj, binary_type))
5853 return self._value is not None
5856 def tag_order(self):
5857 self._assert_ready()
5858 return self._tag_order
5862 if self.expl_lenindef or self.lenindef:
5864 if self.defined is None:
5866 return self.defined[1].bered
5868 def __getstate__(self):
5886 def __setstate__(self, state):
5887 super(Any, self).__setstate__(state)
5888 self._value = state.value
5889 self.defined = state.defined
def __eq__(self, their):
    """Compare with raw bytes or with another ``Any``

    * bytes: compared with the held bytes, or with the encoding of the
      held object. An object with no value set never equals bytes.
    * ``Any`` (or subclass): if both are ready, their ``bytes()`` are
      compared; otherwise they are equal only if both are not ready.
    * anything else: not equal.

    :returns: bool
    """
    if their.__class__ == binary_type:
        own = self._value
        if own is None:
            # Not ready: there is nothing to encode, so it cannot match.
            # Previously this fell through to ``None.encode()`` and
            # raised AttributeError.
            return False
        if own.__class__ == binary_type:
            return own == their
        return own.encode() == their
    if issubclass(their.__class__, Any):
        if self.ready and their.ready:
            return bytes(self) == bytes(their)
        return self.ready == their.ready
    return False
5908 return self.__class__(
5910 expl=self._expl if expl is None else expl,
5911 optional=self.optional if optional is None else optional,
def __bytes__(self):
    """Return the held bytes, or the encoding of the held object

    Calls ``_assert_ready()`` first.
    """
    self._assert_ready()
    held = self._value
    is_raw = held.__class__ == binary_type
    return held if is_raw else held.encode()
5926 self._assert_ready()
5928 if value.__class__ == binary_type:
5930 return value.encode()
def _encode1st(self, state):
    """First pass of two-pass encoding

    :param state: state object, returned as is for raw bytes or handed
                  to the held object's ``encode1st``
    :returns: ``(len(raw bytes), state)`` for a raw value, otherwise
              whatever the held object's ``encode1st(state)`` returns
    """
    self._assert_ready()
    held = self._value
    if held.__class__ != binary_type:
        return held.encode1st(state)
    return len(held), state
def _encode2nd(self, writer, state_iter):
    """Second pass of two-pass encoding

    Raw bytes are written with ``write_full``; any other held object
    is asked to ``encode2nd`` itself with the same arguments.
    """
    held = self._value
    if held.__class__ == binary_type:
        write_full(writer, held)
        return
    held.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding to ``writer``

    Raw bytes are written as is with ``write_full``; any other held
    object is asked to ``encode_cer`` itself.
    """
    self._assert_ready()
    held = self._value
    if held.__class__ == binary_type:
        write_full(writer, held)
        return
    held.encode_cer(writer)
5954 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5956 t, tlen, lv = tag_strip(tlv)
5957 except DecodeError as err:
5958 raise err.__class__(
5960 klass=self.__class__,
5961 decode_path=decode_path,
5965 l, llen, v = len_decode(lv)
5966 except LenIndefForm as err:
5967 if not ctx.get("bered", False):
5968 raise err.__class__(
5970 klass=self.__class__,
5971 decode_path=decode_path,
5974 llen, vlen, v = 1, 0, lv[1:]
5975 sub_offset = offset + tlen + llen
5977 while v[:EOC_LEN].tobytes() != EOC:
5978 chunk, v = Any().decode(
5981 decode_path=decode_path + (str(chunk_i),),
5984 _ctx_immutable=False,
5986 vlen += chunk.tlvlen
5987 sub_offset += chunk.tlvlen
5989 tlvlen = tlen + llen + vlen + EOC_LEN
5990 obj = self.__class__(
5991 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5993 optional=self.optional,
5994 _decoded=(offset, 0, tlvlen),
5997 obj.tag = t.tobytes()
5998 yield decode_path, obj, v[EOC_LEN:]
6000 except DecodeError as err:
6001 raise err.__class__(
6003 klass=self.__class__,
6004 decode_path=decode_path,
6008 raise NotEnoughData(
6009 "encoded length is longer than data",
6010 klass=self.__class__,
6011 decode_path=decode_path,
6014 tlvlen = tlen + llen + l
6015 v, tail = tlv[:tlvlen], v[l:]
6016 obj = self.__class__(
6017 value=None if evgen_mode else v.tobytes(),
6019 optional=self.optional,
6020 _decoded=(offset, 0, tlvlen),
6022 obj.tag = t.tobytes()
6023 yield decode_path, obj, tail
6026 return pp_console_row(next(self.pps()))
6028 def pps(self, decode_path=()):
6032 elif value.__class__ == binary_type:
6038 asn1_type_name=self.asn1_type_name,
6039 obj_name=self.__class__.__name__,
6040 decode_path=decode_path,
6042 blob=self._value if self._value.__class__ == binary_type else None,
6043 optional=self.optional,
6044 default=self == self.default,
6045 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6046 expl=None if self._expl is None else tag_decode(self._expl),
6051 expl_offset=self.expl_offset if self.expled else None,
6052 expl_tlen=self.expl_tlen if self.expled else None,
6053 expl_llen=self.expl_llen if self.expled else None,
6054 expl_vlen=self.expl_vlen if self.expled else None,
6055 expl_lenindef=self.expl_lenindef,
6056 lenindef=self.lenindef,
6059 defined_by, defined = self.defined or (None, None)
6060 if defined_by is not None:
6062 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6064 for pp in self.pps_lenindef(decode_path):
6068 ########################################################################
6069 # ASN.1 constructed types
6070 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can contain leading
                     ".." elements, each stripping the last element
                     from ``decode_path``. An empty tuple leaves
                     ``decode_path`` unchanged
    :returns: resulting absolute path, tuple of strings

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ('foo', 'bar', 'baz', 'whatever')
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ('foo', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ('baz', 'whatever')
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ('foo',)
    """
    # Consume leading ".." elements one by one. The emptiness check
    # keeps an empty (or fully consumed) relative path from raising
    # IndexError on rel_path[0].
    while len(rel_path) > 0:
        head = rel_path[0]
        if head == "/":
            return rel_path[1:]
        if head != "..":
            break
        decode_path, rel_path = decode_path[:-1], rel_path[1:]
    return decode_path + rel_path
6097 SequenceState = namedtuple(
6099 BasicState._fields + ("specs", "value",),
class SequenceEncode1stMixing(object):
    """Mixin with the first encoding pass shared by sequence-like types

    The host class has to provide ``tag`` and ``_values_for_encoding()``.
    """

    def _encode1st(self, state):
        """Compute own lengths and record the content length in ``state``

        :param state: list of lengths; one slot is appended for this
                      object before its values append their own
        :returns: ``(full length, state)`` where full length is the tag
                  length, the length-of-length and the content length
        """
        # Reserve the slot first, so it precedes the entries of the values
        slot = len(state)
        state.append(0)
        content_len = 0
        for item in self._values_for_encoding():
            item_len, _ = item.encode1st(state)
            content_len += item_len
        state[slot] = content_len
        return len(self.tag) + len_size(content_len) + content_len, state
6116 class Sequence(SequenceEncode1stMixing, Obj):
6117 """``SEQUENCE`` structure type
6119 You have to make specification of sequence::
6121 class Extension(Sequence):
6123 ("extnID", ObjectIdentifier()),
6124 ("critical", Boolean(default=False)),
6125 ("extnValue", OctetString()),
6128 Then, you can work with it as with dictionary.
6130 >>> ext = Extension()
6131 >>> Extension().specs
6133 ('extnID', OBJECT IDENTIFIER),
6134 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6135 ('extnValue', OCTET STRING),
6137 >>> ext["extnID"] = "1.2.3"
6138 Traceback (most recent call last):
6139 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6140 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6142 You can determine if sequence is ready to be encoded:
6147 Traceback (most recent call last):
6148 pyderasn.ObjNotReady: object is not ready: extnValue
6149 >>> ext["extnValue"] = OctetString(b"foobar")
6153 Value you want to assign, must have the same **type** as in
6154 corresponding specification, but it can have different tags,
6155 optional/default attributes -- they will be taken from specification
6158 class TBSCertificate(Sequence):
6160 ("version", Version(expl=tag_ctxc(0), default="v1")),
6163 >>> tbs = TBSCertificate()
6164 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6166 Assign ``None`` to remove value from sequence.
6168 You can set values in Sequence during its initialization:
6170 >>> AlgorithmIdentifier((
6171 ("algorithm", ObjectIdentifier("1.2.3")),
6172 ("parameters", Any(Null()))
6174 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6176 You can determine if value exists/set in the sequence and take its value:
6178 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6181 OBJECT IDENTIFIER 1.2.3
6183 But pay attention that if value has default, then it won't be (not
6184 in) in the sequence (because ``DEFAULT`` must not be encoded in
6185 DER), but you can read its value:
6187 >>> "critical" in ext, ext["critical"]
6188 (False, BOOLEAN False)
6189 >>> ext["critical"] = Boolean(True)
6190 >>> "critical" in ext, ext["critical"]
6191 (True, BOOLEAN True)
6193 All defaulted values are always optional.
6195 .. _allow_default_values_ctx:
6197 DER prohibits default value encoding and will raise an error if
6198 default value is unexpectedly met during decode.
6199 If :ref:`bered <bered_ctx>` context option is set, then no error
6200 will be raised, but ``bered`` attribute set. You can disable strict
6201 defaulted values existence validation by setting
6202 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6204 All values with DEFAULT specified are decoded atomically in
6205 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6206 SEQUENCE, then it will be yielded as a single element, not
6207 disassembled. That is required for DEFAULT existence check.
6209 Two sequences are equal if they have equal specification (schema),
6210 implicit/explicit tagging and the same values.
6212 __slots__ = ("specs",)
6213 tag_default = tag_encode(form=TagFormConstructed, num=16)
6214 asn1_type_name = "SEQUENCE"
6226 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6228 schema = getattr(self, "schema", ())
6230 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6233 if value is not None:
6234 if issubclass(value.__class__, Sequence):
6235 self._value = value._value
6236 elif hasattr(value, "__iter__"):
6237 for seq_key, seq_value in value:
6238 self[seq_key] = seq_value
6240 raise InvalidValueType((Sequence,))
6241 if default is not None:
6242 if not issubclass(default.__class__, Sequence):
6243 raise InvalidValueType((Sequence,))
6244 default_value = default._value
6245 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6246 default_obj.specs = self.specs
6247 default_obj._value = default_value
6248 self.default = default_obj
6250 self._value = copy(default_obj._value)
6254 for name, spec in iteritems(self.specs):
6255 value = self._value.get(name)
6266 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6268 return any(value.bered for value in itervalues(self._value))
6270 def __getstate__(self):
6271 return SequenceState(
6285 {k: copy(v) for k, v in iteritems(self._value)},
6288 def __setstate__(self, state):
6289 super(Sequence, self).__setstate__(state)
6290 self.specs = state.specs
6291 self._value = state.value
6293 def __eq__(self, their):
6294 if not isinstance(their, self.__class__):
6297 self.specs == their.specs and
6298 self.tag == their.tag and
6299 self._expl == their._expl and
6300 self._value == their._value
6311 return self.__class__(
6314 impl=self.tag if impl is None else impl,
6315 expl=self._expl if expl is None else expl,
6316 default=self.default if default is None else default,
6317 optional=self.optional if optional is None else optional,
6320 def __contains__(self, key):
6321 return key in self._value
def __setitem__(self, key, value):
    """Set, replace or remove the value of field ``key``

    ``None`` removes the field. A value equal to the field's default
    is removed as well instead of being stored. Any other value is
    passed through the field's specification (``spec(value=value)``)
    before it is stored.

    :raises ObjUnknown: if ``key`` is not in the specification
    :raises InvalidValueType: if ``value`` is not an instance of the
                              spec's class
    """
    spec = self.specs.get(key)
    if spec is None:
        raise ObjUnknown(key)
    if value is not None:
        expected = spec.__class__
        if not isinstance(value, expected):
            raise InvalidValueType((expected,))
        value = spec(value=value)
        is_default = spec.default is not None and value == spec.default
        if not is_default:
            self._value[key] = value
            return
    # Either explicit removal or a value equal to DEFAULT
    self._value.pop(key, None)
def __getitem__(self, key):
    """Return the value of field ``key``

    :returns: the stored value if there is one; otherwise the default
              of the field's specification, which is ``None`` when the
              field has no default
    :raises ObjUnknown: if nothing is stored and ``key`` is not in the
                        specification
    """
    stored = self._value.get(key)
    if stored is None:
        spec = self.specs.get(key)
        if spec is None:
            raise ObjUnknown(key)
        return spec.default
    return stored
def _values_for_encoding(self):
    """Yield stored values in specification order

    Fields without a stored value are skipped if their specification
    is optional.

    :raises ObjNotReady: when a non-optional field has no value
    """
    for name, spec in iteritems(self.specs):
        value = self._value.get(name)
        if value is not None:
            yield value
        elif not spec.optional:
            raise ObjNotReady(name)
6359 v = b"".join(v.encode() for v in self._values_for_encoding())
6360 return b"".join((self.tag, len_encode(len(v)), v))
6362 def _encode2nd(self, writer, state_iter):
6363 write_full(writer, self.tag + len_encode(next(state_iter)))
6364 for v in self._values_for_encoding():
6365 v.encode2nd(writer, state_iter)
6367 def _encode_cer(self, writer):
6368 write_full(writer, self.tag + LENINDEF)
6369 for v in self._values_for_encoding():
6370 v.encode_cer(writer)
6371 write_full(writer, EOC)
6373 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6375 t, tlen, lv = tag_strip(tlv)
6376 except DecodeError as err:
6377 raise err.__class__(
6379 klass=self.__class__,
6380 decode_path=decode_path,
6385 klass=self.__class__,
6386 decode_path=decode_path,
6389 if tag_only: # pragma: no cover
6393 ctx_bered = ctx.get("bered", False)
6395 l, llen, v = len_decode(lv)
6396 except LenIndefForm as err:
6398 raise err.__class__(
6400 klass=self.__class__,
6401 decode_path=decode_path,
6404 l, llen, v = 0, 1, lv[1:]
6406 except DecodeError as err:
6407 raise err.__class__(
6409 klass=self.__class__,
6410 decode_path=decode_path,
6414 raise NotEnoughData(
6415 "encoded length is longer than data",
6416 klass=self.__class__,
6417 decode_path=decode_path,
6421 v, tail = v[:l], v[l:]
6423 sub_offset = offset + tlen + llen
6426 ctx_allow_default_values = ctx.get("allow_default_values", False)
6427 for name, spec in iteritems(self.specs):
6428 if spec.optional and (
6429 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6433 spec_defaulted = spec.default is not None
6434 sub_decode_path = decode_path + (name,)
6436 if evgen_mode and not spec_defaulted:
6437 for _decode_path, value, v_tail in spec.decode_evgen(
6441 decode_path=sub_decode_path,
6443 _ctx_immutable=False,
6445 yield _decode_path, value, v_tail
6447 _, value, v_tail = next(spec.decode_evgen(
6451 decode_path=sub_decode_path,
6453 _ctx_immutable=False,
6456 except TagMismatch as err:
6457 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6461 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6462 if not evgen_mode and defined is not None:
6463 defined_by, defined_spec = defined
6464 if issubclass(value.__class__, SequenceOf):
6465 for i, _value in enumerate(value):
6466 sub_sub_decode_path = sub_decode_path + (
6468 DecodePathDefBy(defined_by),
6470 defined_value, defined_tail = defined_spec.decode(
6471 memoryview(bytes(_value)),
6473 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6474 if value.expled else (value.tlen + value.llen)
6477 decode_path=sub_sub_decode_path,
6479 _ctx_immutable=False,
6481 if len(defined_tail) > 0:
6484 klass=self.__class__,
6485 decode_path=sub_sub_decode_path,
6488 _value.defined = (defined_by, defined_value)
6490 defined_value, defined_tail = defined_spec.decode(
6491 memoryview(bytes(value)),
6493 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6494 if value.expled else (value.tlen + value.llen)
6497 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6499 _ctx_immutable=False,
6501 if len(defined_tail) > 0:
6504 klass=self.__class__,
6505 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6508 value.defined = (defined_by, defined_value)
6510 value_len = value.fulllen
6512 sub_offset += value_len
6516 yield sub_decode_path, value, v_tail
6517 if value == spec.default:
6518 if ctx_bered or ctx_allow_default_values:
6522 "DEFAULT value met",
6523 klass=self.__class__,
6524 decode_path=sub_decode_path,
6528 values[name] = value
6529 spec_defines = getattr(spec, "defines", ())
6530 if len(spec_defines) == 0:
6531 defines_by_path = ctx.get("defines_by_path", ())
6532 if len(defines_by_path) > 0:
6533 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6534 if spec_defines is not None and len(spec_defines) > 0:
6535 for rel_path, schema in spec_defines:
6536 defined = schema.get(value, None)
6537 if defined is not None:
6538 ctx.setdefault("_defines", []).append((
6539 abs_decode_path(sub_decode_path[:-1], rel_path),
6543 if v[:EOC_LEN].tobytes() != EOC:
6546 klass=self.__class__,
6547 decode_path=decode_path,
6555 klass=self.__class__,
6556 decode_path=decode_path,
6559 obj = self.__class__(
6563 default=self.default,
6564 optional=self.optional,
6565 _decoded=(offset, llen, vlen),
6568 obj.lenindef = lenindef
6569 obj.ber_encoded = ber_encoded
6570 yield decode_path, obj, tail
6573 value = pp_console_row(next(self.pps()))
6575 for name in self.specs:
6576 _value = self._value.get(name)
6579 cols.append("%s: %s" % (name, repr(_value)))
6580 return "%s[%s]" % (value, "; ".join(cols))
6582 def pps(self, decode_path=()):
6585 asn1_type_name=self.asn1_type_name,
6586 obj_name=self.__class__.__name__,
6587 decode_path=decode_path,
6588 optional=self.optional,
6589 default=self == self.default,
6590 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6591 expl=None if self._expl is None else tag_decode(self._expl),
6596 expl_offset=self.expl_offset if self.expled else None,
6597 expl_tlen=self.expl_tlen if self.expled else None,
6598 expl_llen=self.expl_llen if self.expled else None,
6599 expl_vlen=self.expl_vlen if self.expled else None,
6600 expl_lenindef=self.expl_lenindef,
6601 lenindef=self.lenindef,
6602 ber_encoded=self.ber_encoded,
6605 for name in self.specs:
6606 value = self._value.get(name)
6609 yield value.pps(decode_path=decode_path + (name,))
6610 for pp in self.pps_lenindef(decode_path):
6614 class Set(Sequence, SequenceEncode1stMixing):
6615 """``SET`` structure type
6617 Its usage is identical to :py:class:`pyderasn.Sequence`.
6619 .. _allow_unordered_set_ctx:
6621 DER prohibits unordered values encoding and will raise an error
6622 during decode. If :ref:`bered <bered_ctx>` context option is set,
6623 then no error will occur. Also you can disable strict values
6624 ordering check by setting ``"allow_unordered_set": True``
6625 :ref:`context <ctx>` option.
6628 tag_default = tag_encode(form=TagFormConstructed, num=17)
6629 asn1_type_name = "SET"
6631 def _values_for_encoding(self):
6633 super(Set, self)._values_for_encoding(),
6634 key=attrgetter("tag_order"),
6637 def _encode_cer(self, writer):
6638 write_full(writer, self.tag + LENINDEF)
6640 super(Set, self)._values_for_encoding(),
6641 key=attrgetter("tag_order_cer"),
6643 v.encode_cer(writer)
6644 write_full(writer, EOC)
6646 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6648 t, tlen, lv = tag_strip(tlv)
6649 except DecodeError as err:
6650 raise err.__class__(
6652 klass=self.__class__,
6653 decode_path=decode_path,
6658 klass=self.__class__,
6659 decode_path=decode_path,
6666 ctx_bered = ctx.get("bered", False)
6668 l, llen, v = len_decode(lv)
6669 except LenIndefForm as err:
6671 raise err.__class__(
6673 klass=self.__class__,
6674 decode_path=decode_path,
6677 l, llen, v = 0, 1, lv[1:]
6679 except DecodeError as err:
6680 raise err.__class__(
6682 klass=self.__class__,
6683 decode_path=decode_path,
6687 raise NotEnoughData(
6688 "encoded length is longer than data",
6689 klass=self.__class__,
6693 v, tail = v[:l], v[l:]
6695 sub_offset = offset + tlen + llen
6698 ctx_allow_default_values = ctx.get("allow_default_values", False)
6699 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6700 tag_order_prev = (0, 0)
6701 _specs_items = copy(self.specs)
6704 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6706 for name, spec in iteritems(_specs_items):
6707 sub_decode_path = decode_path + (name,)
6713 decode_path=sub_decode_path,
6716 _ctx_immutable=False,
6723 klass=self.__class__,
6724 decode_path=decode_path,
6727 spec_defaulted = spec.default is not None
6728 if evgen_mode and not spec_defaulted:
6729 for _decode_path, value, v_tail in spec.decode_evgen(
6733 decode_path=sub_decode_path,
6735 _ctx_immutable=False,
6737 yield _decode_path, value, v_tail
6739 _, value, v_tail = next(spec.decode_evgen(
6743 decode_path=sub_decode_path,
6745 _ctx_immutable=False,
6748 value_tag_order = value.tag_order
6749 value_len = value.fulllen
6750 if tag_order_prev >= value_tag_order:
6751 if ctx_bered or ctx_allow_unordered_set:
6755 "unordered " + self.asn1_type_name,
6756 klass=self.__class__,
6757 decode_path=sub_decode_path,
6762 yield sub_decode_path, value, v_tail
6763 if value != spec.default:
6765 elif ctx_bered or ctx_allow_default_values:
6769 "DEFAULT value met",
6770 klass=self.__class__,
6771 decode_path=sub_decode_path,
6774 values[name] = value
6775 del _specs_items[name]
6776 tag_order_prev = value_tag_order
6777 sub_offset += value_len
6781 obj = self.__class__(
6785 default=self.default,
6786 optional=self.optional,
6787 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6790 if v[:EOC_LEN].tobytes() != EOC:
6793 klass=self.__class__,
6794 decode_path=decode_path,
6799 for name, spec in iteritems(self.specs):
6800 if name not in values and not spec.optional:
6802 "%s value is not ready" % name,
6803 klass=self.__class__,
6804 decode_path=decode_path,
6809 obj.ber_encoded = ber_encoded
6810 yield decode_path, obj, tail
6813 SequenceOfState = namedtuple(
6815 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6820 class SequenceOf(SequenceEncode1stMixing, Obj):
6821 """``SEQUENCE OF`` sequence type
6823 For that kind of type you must specify the object it will carry on
6824 (bounds are for example here, not required)::
6826 class Ints(SequenceOf):
6831 >>> ints.append(Integer(123))
6832 >>> ints.append(Integer(234))
6834 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6835 >>> [int(i) for i in ints]
6837 >>> ints.append(Integer(345))
6838 Traceback (most recent call last):
6839 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6842 >>> ints[1] = Integer(345)
6844 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6846 You can initialize sequence with preinitialized values:
6848 >>> ints = Ints([Integer(123), Integer(234)])
6850 Also you can use iterator as a value:
6852 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6854 And it won't be iterated until encoding process. Pay attention that
6855 bounds and required schema checks are done only during the encoding
6856 process in that case! After encode was called, then value is zeroed
6857 back to empty list and you have to set it again. That mode is useful
6858 mainly with CER encoding mode, where all objects from the iterable
6859 will be streamed to the buffer, without copying all of them to
6862 __slots__ = ("spec", "_bound_min", "_bound_max")
6863 tag_default = tag_encode(form=TagFormConstructed, num=16)
6864 asn1_type_name = "SEQUENCE OF"
6877 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6879 schema = getattr(self, "schema", None)
6881 raise ValueError("schema must be specified")
6883 self._bound_min, self._bound_max = getattr(
6887 ) if bounds is None else bounds
6889 if value is not None:
6890 self._value = self._value_sanitize(value)
6891 if default is not None:
6892 default_value = self._value_sanitize(default)
6893 default_obj = self.__class__(
6898 default_obj._value = default_value
6899 self.default = default_obj
6901 self._value = copy(default_obj._value)
6903 def _value_sanitize(self, value):
6905 if issubclass(value.__class__, SequenceOf):
6906 value = value._value
6907 elif hasattr(value, NEXT_ATTR_NAME):
6909 elif hasattr(value, "__iter__"):
6912 raise InvalidValueType((self.__class__, iter, "iterator"))
6914 if not self._bound_min <= len(value) <= self._bound_max:
6915 raise BoundsError(self._bound_min, len(value), self._bound_max)
6916 class_expected = self.spec.__class__
6918 if not isinstance(v, class_expected):
6919 raise InvalidValueType((class_expected,))
6924 if hasattr(self._value, NEXT_ATTR_NAME):
6926 if self._bound_min > 0 and len(self._value) == 0:
6928 return all(v.ready for v in self._value)
6932 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6934 return any(v.bered for v in self._value)
6936 def __getstate__(self):
6937 if hasattr(self._value, NEXT_ATTR_NAME):
6938 raise ValueError("can not pickle SequenceOf with iterator")
6939 return SequenceOfState(
6953 [copy(v) for v in self._value],
6958 def __setstate__(self, state):
6959 super(SequenceOf, self).__setstate__(state)
6960 self.spec = state.spec
6961 self._value = state.value
6962 self._bound_min = state.bound_min
6963 self._bound_max = state.bound_max
6965 def __eq__(self, their):
6966 if isinstance(their, self.__class__):
6968 self.spec == their.spec and
6969 self.tag == their.tag and
6970 self._expl == their._expl and
6971 self._value == their._value
6973 if hasattr(their, "__iter__"):
6974 return self._value == list(their)
6986 return self.__class__(
6990 (self._bound_min, self._bound_max)
6991 if bounds is None else bounds
6993 impl=self.tag if impl is None else impl,
6994 expl=self._expl if expl is None else expl,
6995 default=self.default if default is None else default,
6996 optional=self.optional if optional is None else optional,
6999 def __contains__(self, key):
7000 return key in self._value
def append(self, value):
    """Append ``value`` to the stored values

    :param value: object of the same class as ``spec``
    :raises InvalidValueType: if ``value`` is not an instance of the
                              spec's class
    :raises BoundsError: if the new length would exceed the upper bound
    """
    expected = self.spec.__class__
    if not isinstance(value, expected):
        raise InvalidValueType((expected,))
    new_len = len(self._value) + 1
    if new_len > self._bound_max:
        raise BoundsError(self._bound_min, new_len, self._bound_max)
    self._value.append(value)
7014 return iter(self._value)
7017 return len(self._value)
7019 def __setitem__(self, key, value):
7020 if not isinstance(value, self.spec.__class__):
7021 raise InvalidValueType((self.spec.__class__,))
7022 self._value[key] = self.spec(value=value)
7024 def __getitem__(self, key):
7025 return self._value[key]
7027 def _values_for_encoding(self):
7028 return iter(self._value)
7031 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7034 values_append = values.append
7035 class_expected = self.spec.__class__
7036 values_for_encoding = self._values_for_encoding()
7038 for v in values_for_encoding:
7039 if not isinstance(v, class_expected):
7040 raise InvalidValueType((class_expected,))
7041 values_append(v.encode())
7042 if not self._bound_min <= len(values) <= self._bound_max:
7043 raise BoundsError(self._bound_min, len(values), self._bound_max)
7044 value = b"".join(values)
7046 value = b"".join(v.encode() for v in self._values_for_encoding())
7047 return b"".join((self.tag, len_encode(len(value)), value))
7049 def _encode1st(self, state):
7050 state = super(SequenceOf, self)._encode1st(state)
7051 if hasattr(self._value, NEXT_ATTR_NAME):
7055 def _encode2nd(self, writer, state_iter):
7056 write_full(writer, self.tag + len_encode(next(state_iter)))
7057 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7060 class_expected = self.spec.__class__
7061 values_for_encoding = self._values_for_encoding()
7063 for v in values_for_encoding:
7064 if not isinstance(v, class_expected):
7065 raise InvalidValueType((class_expected,))
7066 v.encode2nd(writer, state_iter)
7068 if not self._bound_min <= values_count <= self._bound_max:
7069 raise BoundsError(self._bound_min, values_count, self._bound_max)
7071 for v in self._values_for_encoding():
7072 v.encode2nd(writer, state_iter)
7074 def _encode_cer(self, writer):
7075 write_full(writer, self.tag + LENINDEF)
7076 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7078 class_expected = self.spec.__class__
7080 values_for_encoding = self._values_for_encoding()
7082 for v in values_for_encoding:
7083 if not isinstance(v, class_expected):
7084 raise InvalidValueType((class_expected,))
7085 v.encode_cer(writer)
7087 if not self._bound_min <= values_count <= self._bound_max:
7088 raise BoundsError(self._bound_min, values_count, self._bound_max)
7090 for v in self._values_for_encoding():
7091 v.encode_cer(writer)
7092 write_full(writer, EOC)
7102 ordering_check=False,
7105 t, tlen, lv = tag_strip(tlv)
7106 except DecodeError as err:
7107 raise err.__class__(
7109 klass=self.__class__,
7110 decode_path=decode_path,
7115 klass=self.__class__,
7116 decode_path=decode_path,
7123 ctx_bered = ctx.get("bered", False)
7125 l, llen, v = len_decode(lv)
7126 except LenIndefForm as err:
7128 raise err.__class__(
7130 klass=self.__class__,
7131 decode_path=decode_path,
7134 l, llen, v = 0, 1, lv[1:]
7136 except DecodeError as err:
7137 raise err.__class__(
7139 klass=self.__class__,
7140 decode_path=decode_path,
7144 raise NotEnoughData(
7145 "encoded length is longer than data",
7146 klass=self.__class__,
7147 decode_path=decode_path,
7151 v, tail = v[:l], v[l:]
7153 sub_offset = offset + tlen + llen
7156 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7157 value_prev = memoryview(v[:0])
7161 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7163 sub_decode_path = decode_path + (str(_value_count),)
7165 for _decode_path, value, v_tail in spec.decode_evgen(
7169 decode_path=sub_decode_path,
7171 _ctx_immutable=False,
7173 yield _decode_path, value, v_tail
7175 _, value, v_tail = next(spec.decode_evgen(
7179 decode_path=sub_decode_path,
7181 _ctx_immutable=False,
7184 value_len = value.fulllen
7186 if value_prev.tobytes() > v[:value_len].tobytes():
7187 if ctx_bered or ctx_allow_unordered_set:
7191 "unordered " + self.asn1_type_name,
7192 klass=self.__class__,
7193 decode_path=sub_decode_path,
7196 value_prev = v[:value_len]
7199 _value.append(value)
7200 sub_offset += value_len
7203 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7205 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7206 klass=self.__class__,
7207 decode_path=decode_path,
7211 obj = self.__class__(
7212 value=None if evgen_mode else _value,
7214 bounds=(self._bound_min, self._bound_max),
7217 default=self.default,
7218 optional=self.optional,
7219 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7221 except BoundsError as err:
7224 klass=self.__class__,
7225 decode_path=decode_path,
7229 if v[:EOC_LEN].tobytes() != EOC:
7232 klass=self.__class__,
7233 decode_path=decode_path,
7238 obj.ber_encoded = ber_encoded
7239 yield decode_path, obj, tail
7243 pp_console_row(next(self.pps())),
7244 ", ".join(repr(v) for v in self._value),
def pps(self, decode_path=()):
    """Generate pretty printing data for the container and its elements

    The first item yielded is the ``_pp`` record describing this
    object itself. Then, for every contained value, the result of its
    own ``pps()`` call is yielded as is (so consumers get nested
    generators), with the element's index appended to *decode_path*.
    Finally everything produced by ``pps_lenindef()`` is yielded.

    :param tuple decode_path: decode path leading to this object
    """
    implicit_tag = None
    if self.tag != self.tag_default:
        implicit_tag = tag_decode(self.tag)
    explicit_tag = None
    if self._expl is not None:
        explicit_tag = tag_decode(self._expl)
    # Explicit tag geometry is reported only for explicitly tagged objects
    expled = self.expled
    yield _pp(
        obj=self,
        asn1_type_name=self.asn1_type_name,
        obj_name=self.__class__.__name__,
        decode_path=decode_path,
        optional=self.optional,
        default=self == self.default,
        impl=implicit_tag,
        expl=explicit_tag,
        offset=self.offset,
        tlen=self.tlen,
        llen=self.llen,
        vlen=self.vlen,
        expl_offset=self.expl_offset if expled else None,
        expl_tlen=self.expl_tlen if expled else None,
        expl_llen=self.expl_llen if expled else None,
        expl_vlen=self.expl_vlen if expled else None,
        expl_lenindef=self.expl_lenindef,
        lenindef=self.lenindef,
        ber_encoded=self.ber_encoded,
        bered=self.bered,
    )
    for idx, element in enumerate(self._value):
        yield element.pps(decode_path=decode_path + (str(idx),))
    for lenindef_pp in self.pps_lenindef(decode_path):
        yield lenindef_pp
class SetOf(SequenceOf):
    """``SET OF`` sequence type

    It is used exactly like :py:class:`pyderasn.SequenceOf`. The
    differences are the default tag, the fact that encoded elements
    are written out sorted, and that decoding is asked to check the
    ordering of the elements.
    """
    __slots__ = ()
    tag_default = tag_encode(form=TagFormConstructed, num=17)
    asn1_type_name = "SET OF"

    def _value_sanitize(self, value):
        """Sanitize the value as the parent does, but refuse iterators

        :raises ValueError: if the sanitized value is an iterator
        """
        sanitized = super(SetOf, self)._value_sanitize(value)
        if hasattr(sanitized, NEXT_ATTR_NAME):
            raise ValueError(
                "SetOf does not support iterator values, as no sense in them"
            )
        return sanitized

    def _encode(self):
        """Return tag, length and the sorted encodings of all elements"""
        encoded = [item.encode() for item in self._values_for_encoding()]
        encoded.sort()
        payload = b"".join(encoded)
        return b"".join((self.tag, len_encode(len(payload)), payload))

    def _encode2nd(self, writer, state_iter):
        """Second pass of two-pass encoding

        The length is taken from *state_iter*. Every element is encoded
        into its own temporary buffer first, because the elements can
        be written out only after all of them are known and sorted.
        """
        write_full(writer, self.tag + len_encode(next(state_iter)))
        chunks = []
        for item in self._values_for_encoding():
            scratch = BytesIO()
            item.encode2nd(scratch.write, state_iter)
            chunks.append(scratch.getvalue())
        for chunk in sorted(chunks):
            write_full(writer, chunk)

    def _encode_cer(self, writer):
        """Write indefinite length form: tag, sorted elements, EOC"""
        write_full(writer, self.tag + LENINDEF)
        encoded = [encode_cer(item) for item in self._values_for_encoding()]
        encoded.sort()
        for chunk in encoded:
            write_full(writer, chunk)
        write_full(writer, EOC)

    def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
        """Decode as the parent does, with element ordering check enabled"""
        return super(SetOf, self)._decode(
            tlv,
            offset,
            decode_path,
            ctx,
            tag_only,
            evgen_mode,
            ordering_check=True,
        )
def obj_by_path(pypath):  # pragma: no cover
    """Resolve a ``module:attribute`` string to the object it names

    Everything before the last ``:`` is imported as a module. The part
    after it is a dot separated chain of attribute names, looked up
    one after another starting from that module.

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>

    :param str pypath: path in ``package.module:Object.attribute`` form
    :returns: object found at the end of the attribute chain
    :raises ValueError: if there is no ``:`` in the path
    """
    from functools import reduce
    from importlib import import_module
    module_name, attr_chain = pypath.rsplit(":", 1)
    return reduce(getattr, attr_chain.split("."), import_module(module_name))
def generic_decoder():  # pragma: no cover
    """Build a schema object and pretty printer usable without a schema

    :returns: pair of a ``SEQUENCE OF`` instance, whose schema is a
              self referencing choice, and a function pretty printing
              objects decoded with it
    """
    # Big hack: the choice refers to itself through the alternatives
    # added below
    any_of = PrimitiveTypes()
    any_of.specs["SequenceOf"] = SequenceOf(schema=any_of)
    any_of.specs["SetOf"] = SetOf(schema=any_of)
    for tag_num in six_xrange(31):
        any_of.specs["SequenceOf%d" % tag_num] = SequenceOf(
            schema=any_of,
            expl=tag_ctxc(tag_num),
        )
    any_of.specs["Any"] = Any()

    # Class name equals to type name, to omit it from output
    class SEQUENCEOF(SequenceOf):
        __slots__ = ()
        schema = any_of

    def pprint_any(
            obj,
            oid_maps=(),
            with_colours=False,
            with_decode_path=False,
            decode_path_only=(),
            decode_path=(),
    ):
        """Render *obj* as text, one console row per line"""
        prefix_len = len(decode_path_only)

        def _rows(pps):
            for pp in pps:
                if not hasattr(pp, "_fields"):
                    # No record fields: iterate it as a nested source
                    # of further items
                    for row in _rows(pp):
                        yield row
                    continue
                if (
                        decode_path_only != () and
                        pp.decode_path[:prefix_len] != decode_path_only
                ):
                    continue
                # Rows of the choice wrapper itself are not printed
                if pp.asn1_type_name == Choice.asn1_type_name:
                    continue
                fields = pp._asdict()
                fields["decode_path"] = pp.decode_path[:-1] + (">",)
                pp = _pp(**fields)
                yield pp_console_row(
                    pp,
                    oid_maps=oid_maps,
                    with_offsets=True,
                    with_blob=False,
                    with_colours=with_colours,
                    with_decode_path=with_decode_path,
                    decode_path_len_decrease=prefix_len,
                )
                for row in pp_console_blob(
                        pp,
                        decode_path_len_decrease=prefix_len,
                ):
                    yield row
        return "\n".join(_rows(obj.pps(decode_path)))
    return SEQUENCEOF(), pprint_any
def main():  # pragma: no cover
    """Command line entry point: decode a file and pretty print it

    Parses the command line, reads the input file (optionally skipping
    its beginning), decodes it either with the schema given by
    ``--schema`` or with the generic decoder, and prints the result.
    Data left after the decoded object is reported in hexadecimal.
    """
    import argparse
    from functools import partial
    from os import environ

    parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
    # Kept in a table, in the very order they are registered
    arguments = (
        ("--skip", {
            "type": int,
            "default": 0,
            "help": "Skip that number of bytes from the beginning",
        }),
        ("--oids", {
            "help": "Python paths to dictionary with OIDs, comma separated",
        }),
        ("--schema", {
            "help": "Python path to schema definition to use",
        }),
        ("--defines-by-path", {
            "help": "Python path to decoder's defines_by_path",
        }),
        ("--nobered", {
            "action": "store_true",
            "help": "Disallow BER encoding",
        }),
        ("--print-decode-path", {
            "action": "store_true",
            "help": "Print decode paths",
        }),
        ("--decode-path-only", {
            "help": "Print only specified decode path",
        }),
        ("--allow-expl-oob", {
            "action": "store_true",
            "help": "Allow explicit tag out-of-bound",
        }),
        ("--evgen", {
            "action": "store_true",
            "help": "Turn on event generation mode",
        }),
        ("RAWFile", {
            "type": argparse.FileType("rb"),
            "help": "Path to BER/CER/DER file you want to decode",
        }),
    )
    for arg_name, arg_options in arguments:
        parser.add_argument(arg_name, **arg_options)
    args = parser.parse_args()

    if PY2:
        args.RAWFile.seek(args.skip)
        raw = memoryview(args.RAWFile.read())
        args.RAWFile.close()
    else:
        raw = file_mmaped(args.RAWFile)[args.skip:]

    if args.oids:
        oid_maps = [obj_by_path(oid_path) for oid_path in args.oids.split(",")]
    else:
        oid_maps = ()

    if args.schema:
        schema = obj_by_path(args.schema)
        pprinter = partial(pprint, big_blobs=True)
    else:
        schema, pprinter = generic_decoder()

    ctx = {
        "bered": not args.nobered,
        "allow_expl_oob": args.allow_expl_oob,
    }
    if args.defines_by_path is not None:
        ctx["defines_by_path"] = obj_by_path(args.defines_by_path)

    if args.decode_path_only is None:
        decode_path_only = ()
    else:
        decode_path_only = tuple(args.decode_path_only.split(":"))
    pprinter = partial(
        pprinter,
        oid_maps=oid_maps,
        # Colours are on unless NO_COLOR is present in the environment
        with_colours=environ.get("NO_COLOR") is None,
        with_decode_path=args.print_decode_path,
        decode_path_only=decode_path_only,
    )

    if args.evgen:
        for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
            print(pprinter(obj, decode_path=decode_path))
    else:
        obj, tail = schema().decode(raw, ctx=ctx)
        print(pprinter(obj))
    if tail != b"":
        print("\nTrailing data: %s" % hexenc(tail))
7497 if __name__ == "__main__":