3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have ``mmap`` syscall, giving ability to work
679 the memory mapped file. You can deal with the file like it was an
680 ordinary binary string, allowing you not to load it to the memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 does not implement buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms does not deal good with ZFS ARC and ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also currently there is **no** validation that provided CER is
742 valid one -- you are sure that it has only valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it on 1 KB chunks. BER allows splitting on various
778 levels of chunks inclusion::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only binary representation of that
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
906 .. autoclass:: pyderasn.Obj
914 .. autoclass:: pyderasn.Boolean
919 .. autoclass:: pyderasn.Integer
920 :members: __init__, named
924 .. autoclass:: pyderasn.BitString
925 :members: __init__, bit_len, named
929 .. autoclass:: pyderasn.OctetString
934 .. autoclass:: pyderasn.Null
939 .. autoclass:: pyderasn.ObjectIdentifier
944 .. autoclass:: pyderasn.Enumerated
948 .. autoclass:: pyderasn.CommonString
952 .. autoclass:: pyderasn.NumericString
956 .. autoclass:: pyderasn.PrintableString
957 :members: __init__, allow_asterisk, allow_ampersand
961 .. autoclass:: pyderasn.UTCTime
962 :members: __init__, todatetime
966 .. autoclass:: pyderasn.GeneralizedTime
967 :members: __init__, todatetime
974 .. autoclass:: pyderasn.Choice
975 :members: __init__, choice, value
979 .. autoclass:: PrimitiveTypes
983 .. autoclass:: pyderasn.Any
991 .. autoclass:: pyderasn.Sequence
996 .. autoclass:: pyderasn.Set
1001 .. autoclass:: pyderasn.SequenceOf
1006 .. autoclass:: pyderasn.SetOf
1012 .. autofunction:: pyderasn.abs_decode_path
1013 .. autofunction:: pyderasn.agg_octet_string
1014 .. autofunction:: pyderasn.colonize_hex
1015 .. autofunction:: pyderasn.encode2pass
1016 .. autofunction:: pyderasn.encode_cer
1017 .. autofunction:: pyderasn.file_mmaped
1018 .. autofunction:: pyderasn.hexenc
1019 .. autofunction:: pyderasn.hexdec
1020 .. autofunction:: pyderasn.tag_encode
1021 .. autofunction:: pyderasn.tag_decode
1022 .. autofunction:: pyderasn.tag_ctxp
1023 .. autofunction:: pyderasn.tag_ctxc
1024 .. autoclass:: pyderasn.DecodeError
1026 .. autoclass:: pyderasn.NotEnoughData
1027 .. autoclass:: pyderasn.ExceedingData
1028 .. autoclass:: pyderasn.LenIndefForm
1029 .. autoclass:: pyderasn.TagMismatch
1030 .. autoclass:: pyderasn.InvalidLength
1031 .. autoclass:: pyderasn.InvalidOID
1032 .. autoclass:: pyderasn.ObjUnknown
1033 .. autoclass:: pyderasn.ObjNotReady
1034 .. autoclass:: pyderasn.InvalidValueType
1035 .. autoclass:: pyderasn.BoundsError
1042 You can decode DER/BER files using command line abilities::
1044 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1046 If there is no schema for your file, then you can try parsing it without,
1047 but of course IMPLICIT tags will often make it impossible. But result is
1048 good enough for the certificate above::
1050 $ python -m pyderasn path/to/file
1051 0 [1,3,1604] . >: SEQUENCE OF
1052 4 [1,3,1453] . . >: SEQUENCE OF
1053 8 [0,0, 5] . . . . >: [0] ANY
1054 . . . . . A0:03:02:01:02
1055 13 [1,1, 3] . . . . >: INTEGER 61595
1056 18 [1,1, 13] . . . . >: SEQUENCE OF
1057 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1058 31 [1,1, 0] . . . . . . >: NULL
1059 33 [1,3, 274] . . . . >: SEQUENCE OF
1060 37 [1,1, 11] . . . . . . >: SET OF
1061 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1062 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1063 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1065 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1066 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1067 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1068 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1069 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1070 . . . . . . . . . 61:2E:63:6F:6D:2F
1071 1461 [1,1, 13] . . >: SEQUENCE OF
1072 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1073 1474 [1,1, 0] . . . . >: NULL
1074 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1075 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1076 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1082 If you have got dictionaries with ObjectIdentifiers, like example one
1083 from ``tests/test_crts.py``::
1086 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1087 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1089 "2.5.4.10": "id-at-organizationName",
1090 "2.5.4.11": "id-at-organizationalUnitName",
1093 then you can pass it to pretty printer to see human readable OIDs::
1095 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1097 37 [1,1, 11] . . . . . . >: SET OF
1098 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1099 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1100 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1101 50 [1,1, 18] . . . . . . >: SET OF
1102 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1103 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1104 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1105 70 [1,1, 18] . . . . . . >: SET OF
1106 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1107 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1108 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1114 Each decoded element has so-called decode path: sequence of structure
1115 names it is passing during the decode process. Each element has its own
1116 unique path inside the whole ASN.1 tree. You can print it out with
1117 ``--print-decode-path`` option::
1119 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1120 0 [1,3,1604] Certificate SEQUENCE []
1121 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1122 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1123 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1124 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1125 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1126 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1128 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1129 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1130 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1131 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1132 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1133 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1134 . . . . . . . 13:02:45:53
1135 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1138 Now you can print only the specified tree, for example signature algorithm::
1140 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1141 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1142 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1143 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1147 from array import array
1148 from codecs import getdecoder
1149 from codecs import getencoder
1150 from collections import namedtuple
1151 from collections import OrderedDict
1152 from copy import copy
1153 from datetime import datetime
1154 from datetime import timedelta
1155 from io import BytesIO
1156 from math import ceil
1157 from mmap import mmap
1158 from mmap import PROT_READ
1159 from operator import attrgetter
1160 from string import ascii_letters
1161 from string import digits
1162 from sys import maxsize as sys_maxsize
1163 from sys import version_info
1164 from unicodedata import category as unicat
1166 from six import add_metaclass
1167 from six import binary_type
1168 from six import byte2int
1169 from six import indexbytes
1170 from six import int2byte
1171 from six import integer_types
1172 from six import iterbytes
1173 from six import iteritems
1174 from six import itervalues
1176 from six import string_types
1177 from six import text_type
1178 from six import unichr as six_unichr
1179 from six.moves import xrange as six_xrange
1183 from termcolor import colored
1184 except ImportError: # pragma: no cover
1185 def colored(what, *args, **kwargs):
1235 "TagClassApplication",
1238 "TagClassUniversal",
1239 "TagFormConstructed",
1250 TagClassUniversal = 0
1251 TagClassApplication = 1 << 6
1252 TagClassContext = 1 << 7
1253 TagClassPrivate = 1 << 6 | 1 << 7
1254 TagFormPrimitive = 0
1255 TagFormConstructed = 1 << 5
1257 TagClassContext: "",
1258 TagClassApplication: "APPLICATION ",
1259 TagClassPrivate: "PRIVATE ",
1260 TagClassUniversal: "UNIV ",
1264 LENINDEF = b"\x80" # length indefinite mark
1265 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1266 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1267 SET01 = frozenset("01")
1268 DECIMALS = frozenset(digits)
1269 DECIMAL_SIGNS = ".,"
1270 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1273 def file_mmaped(fd):
1274 """Make mmap-ed memoryview for reading from file
1276 :param fd: file object
1277 :returns: memoryview over read-only mmap-ing of the whole file
1279 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1282 if not set(value) <= DECIMALS:
1283 raise ValueError("non-pure integer")
1286 def fractions2float(fractions_raw):
1287 pureint(fractions_raw)
1288 return float("0." + fractions_raw)
1291 def get_def_by_path(defines_by_path, sub_decode_path):
1292 """Get define by decode path
1294 for path, define in defines_by_path:
1295 if len(path) != len(sub_decode_path):
1297 for p1, p2 in zip(path, sub_decode_path):
1298 if (not p1 is any) and (p1 != p2):
1304 ########################################################################
1306 ########################################################################
1308 class ASN1Error(ValueError):
1312 class DecodeError(ASN1Error):
1313 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1315 :param str msg: reason of decode failing
1316 :param klass: optional exact DecodeError inherited class (like
1317 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1318 :py:exc:`InvalidLength`)
1319 :param decode_path: tuple of strings. It contains human
1320 readable names of the fields through which
1321 decoding process has passed
1322 :param int offset: binary offset where failure happened
1324 super(DecodeError, self).__init__()
1327 self.decode_path = decode_path
1328 self.offset = offset
1333 "" if self.klass is None else self.klass.__name__,
1335 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1336 if len(self.decode_path) > 0 else ""
1338 ("(at %d)" % self.offset) if self.offset > 0 else "",
1344 return "%s(%s)" % (self.__class__.__name__, self)
1347 class NotEnoughData(DecodeError):
1351 class ExceedingData(ASN1Error):
1352 def __init__(self, nbytes):
1353 super(ExceedingData, self).__init__()
1354 self.nbytes = nbytes
1357 return "%d trailing bytes" % self.nbytes
1360 return "%s(%s)" % (self.__class__.__name__, self)
1363 class LenIndefForm(DecodeError):
1367 class TagMismatch(DecodeError):
1371 class InvalidLength(DecodeError):
1375 class InvalidOID(DecodeError):
1379 class ObjUnknown(ASN1Error):
1380 def __init__(self, name):
1381 super(ObjUnknown, self).__init__()
1385 return "object is unknown: %s" % self.name
1388 return "%s(%s)" % (self.__class__.__name__, self)
1391 class ObjNotReady(ASN1Error):
1392 def __init__(self, name):
1393 super(ObjNotReady, self).__init__()
1397 return "object is not ready: %s" % self.name
1400 return "%s(%s)" % (self.__class__.__name__, self)
1403 class InvalidValueType(ASN1Error):
1404 def __init__(self, expected_types):
1405 super(InvalidValueType, self).__init__()
1406 self.expected_types = expected_types
1409 return "invalid value type, expected: %s" % ", ".join(
1410 [repr(t) for t in self.expected_types]
1414 return "%s(%s)" % (self.__class__.__name__, self)
1417 class BoundsError(ASN1Error):
1418 def __init__(self, bound_min, value, bound_max):
1419 super(BoundsError, self).__init__()
1420 self.bound_min = bound_min
1422 self.bound_max = bound_max
1425 return "unsatisfied bounds: %s <= %s <= %s" % (
1432 return "%s(%s)" % (self.__class__.__name__, self)
1435 ########################################################################
1437 ########################################################################
1439 _hexdecoder = getdecoder("hex")
1440 _hexencoder = getencoder("hex")
1444 """Binary data to hexadecimal string convert
1446 return _hexdecoder(data)[0]
1450 """Hexadecimal string to binary data convert
1452 return _hexencoder(data)[0].decode("ascii")
1455 def int_bytes_len(num, byte_len=8):
1458 return int(ceil(float(num.bit_length()) / byte_len))
1461 def zero_ended_encode(num):
1462 octets = bytearray(int_bytes_len(num, 7))
1464 octets[i] = num & 0x7F
1468 octets[i] = 0x80 | (num & 0x7F)
1471 return bytes(octets)
1474 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1475 """Encode tag to binary form
1477 :param int num: tag's number
1478 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1479 :py:data:`pyderasn.TagClassContext`,
1480 :py:data:`pyderasn.TagClassApplication`,
1481 :py:data:`pyderasn.TagClassPrivate`)
1482 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1483 :py:data:`pyderasn.TagFormConstructed`)
1487 return int2byte(klass | form | num)
1488 # [XX|X|11111][1.......][1.......] ... [0.......]
1489 return int2byte(klass | form | 31) + zero_ended_encode(num)
1492 def tag_decode(tag):
1493 """Decode tag from binary form
1497 No validation is performed, assuming that it has already passed.
1499 It returns tuple with three integers, as
1500 :py:func:`pyderasn.tag_encode` accepts.
1502 first_octet = byte2int(tag)
1503 klass = first_octet & 0xC0
1504 form = first_octet & 0x20
1505 if first_octet & 0x1F < 0x1F:
1506 return (klass, form, first_octet & 0x1F)
1508 for octet in iterbytes(tag[1:]):
1511 return (klass, form, num)
1515 """Create CONTEXT PRIMITIVE tag
1517 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1521 """Create CONTEXT CONSTRUCTED tag
1523 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1526 def tag_strip(data):
1527 """Take off tag from the data
1529 :returns: (encoded tag, tag length, remaining data)
1532 raise NotEnoughData("no data at all")
1533 if byte2int(data) & 0x1F < 31:
1534 return data[:1], 1, data[1:]
1539 raise DecodeError("unfinished tag")
1540 if indexbytes(data, i) & 0x80 == 0:
1543 return data[:i], i, data[i:]
1549 octets = bytearray(int_bytes_len(l) + 1)
1550 octets[0] = 0x80 | (len(octets) - 1)
1551 for i in six_xrange(len(octets) - 1, 0, -1):
1552 octets[i] = l & 0xFF
1554 return bytes(octets)
1557 def len_decode(data):
1560 :returns: (decoded length, length's length, remaining data)
1561 :raises LenIndefForm: if indefinite form encoding is met
1564 raise NotEnoughData("no data at all")
1565 first_octet = byte2int(data)
1566 if first_octet & 0x80 == 0:
1567 return first_octet, 1, data[1:]
1568 octets_num = first_octet & 0x7F
1569 if octets_num + 1 > len(data):
1570 raise NotEnoughData("encoded length is longer than data")
1572 raise LenIndefForm()
1573 if byte2int(data[1:]) == 0:
1574 raise DecodeError("leading zeros")
1576 for v in iterbytes(data[1:1 + octets_num]):
1579 raise DecodeError("long form instead of short one")
1580 return l, 1 + octets_num, data[1 + octets_num:]
1583 LEN0 = len_encode(0)
1584 LEN1 = len_encode(1)
1585 LEN1K = len_encode(1000)
1589 """How many bytes length field will take
1593 if l < 256: # 1 << 8
1595 if l < 65536: # 1 << 16
1597 if l < 16777216: # 1 << 24
1599 if l < 4294967296: # 1 << 32
1601 if l < 1099511627776: # 1 << 40
1603 if l < 281474976710656: # 1 << 48
1605 if l < 72057594037927936: # 1 << 56
1607 raise OverflowError("too big length")
1610 def write_full(writer, data):
1611 """Fully write provided data
1613 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1615 BytesIO does not guarantee that the whole data will be written at
1616 once. That function write everything provided, raising an error if
1617 ``writer`` returns None.
1619 data = memoryview(data)
1621 while written != len(data):
1622 n = writer(data[written:])
1624 raise ValueError("can not write to buf")
1628 # If it is 64-bit system, then use compact 64-bit array of unsigned
1629 # longs. Use an ordinary list with universal integers otherwise, that
1631 if sys_maxsize > 2 ** 32:
1632 def state_2pass_new():
1635 def state_2pass_new():
1639 ########################################################################
1641 ########################################################################
1643 class AutoAddSlots(type):
1644 def __new__(cls, name, bases, _dict):
1645 _dict["__slots__"] = _dict.get("__slots__", ())
1646 return type.__new__(cls, name, bases, _dict)
1649 BasicState = namedtuple("BasicState", (
1662 ), **NAMEDTUPLE_KWARGS)
1665 @add_metaclass(AutoAddSlots)
1667 """Common ASN.1 object class
1669 All ASN.1 types are inherited from it. It has metaclass that
1670 automatically adds ``__slots__`` to all inherited classes.
1695 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1696 self._expl = getattr(self, "expl", None) if expl is None else expl
1697 if self.tag != self.tag_default and self._expl is not None:
1698 raise ValueError("implicit and explicit tags can not be set simultaneously")
1699 if self.tag is None:
1700 self._tag_order = None
1702 tag_class, _, tag_num = tag_decode(
1703 self.tag if self._expl is None else self._expl
1705 self._tag_order = (tag_class, tag_num)
1706 if default is not None:
1708 self.optional = optional
1709 self.offset, self.llen, self.vlen = _decoded
1711 self.expl_lenindef = False
1712 self.lenindef = False
1713 self.ber_encoded = False
1716 def ready(self): # pragma: no cover
1717 """Is object ready to be encoded?
1719 raise NotImplementedError()
1721 def _assert_ready(self):
1723 raise ObjNotReady(self.__class__.__name__)
1727 """Is either object or any elements inside is BER encoded?
1729 return self.expl_lenindef or self.lenindef or self.ber_encoded
1733 """Is object decoded?
1735 return (self.llen + self.vlen) > 0
1737 def __getstate__(self): # pragma: no cover
1738 """Used for making safe to be mutable pickleable copies
1740 raise NotImplementedError()
1742 def __setstate__(self, state):
1743 if state.version != __version__:
1744 raise ValueError("data is pickled by different PyDERASN version")
1745 self.tag = state.tag
1746 self._tag_order = state.tag_order
1747 self._expl = state.expl
1748 self.default = state.default
1749 self.optional = state.optional
1750 self.offset = state.offset
1751 self.llen = state.llen
1752 self.vlen = state.vlen
1753 self.expl_lenindef = state.expl_lenindef
1754 self.lenindef = state.lenindef
1755 self.ber_encoded = state.ber_encoded
1758 def tag_order(self):
1759 """Tag's (class, number) used for DER/CER sorting
1761 return self._tag_order
1764 def tag_order_cer(self):
1765 return self.tag_order
1769 """.. seealso:: :ref:`decoding`
1771 return len(self.tag)
1775 """.. seealso:: :ref:`decoding`
1777 return self.tlen + self.llen + self.vlen
1779 def __str__(self): # pragma: no cover
1780 return self.__bytes__() if PY2 else self.__unicode__()
1782 def __ne__(self, their):
1783 return not(self == their)
1785 def __gt__(self, their): # pragma: no cover
1786 return not(self < their)
1788 def __le__(self, their): # pragma: no cover
1789 return (self == their) or (self < their)
1791 def __ge__(self, their): # pragma: no cover
1792 return (self == their) or (self > their)
1794 def _encode(self): # pragma: no cover
1795 raise NotImplementedError()
1797 def _encode_cer(self, writer):
1798 write_full(writer, self._encode())
1800 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1801 yield NotImplemented
1803 def _encode1st(self, state):
1804 raise NotImplementedError()
1806 def _encode2nd(self, writer, state_iter):
1807 raise NotImplementedError()
1810 """DER encode the structure
1812 :returns: DER representation
1814 raw = self._encode()
1815 if self._expl is None:
1817 return b"".join((self._expl, len_encode(len(raw)), raw))
1819 def encode1st(self, state=None):
1820 """Do the 1st pass of 2-pass encoding
1822 :rtype: (int, array("L"))
1823 :returns: full length of encoded data and precalculated various
1827 state = state_2pass_new()
1828 if self._expl is None:
1829 return self._encode1st(state)
1831 idx = len(state) - 1
1832 vlen, _ = self._encode1st(state)
1834 fulllen = len(self._expl) + len_size(vlen) + vlen
1835 return fulllen, state
1837 def encode2nd(self, writer, state_iter):
1838 """Do the 2nd pass of 2-pass encoding
1840 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1841 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1843 if self._expl is None:
1844 self._encode2nd(writer, state_iter)
1846 write_full(writer, self._expl + len_encode(next(state_iter)))
1847 self._encode2nd(writer, state_iter)
1849 def encode_cer(self, writer):
1850 """CER encode the structure to specified writer
1852 :param writer: must comply with ``io.RawIOBase.write``
1853 behaviour. It takes slice to be written and
1854 returns number of bytes processed. If it returns
1855 None, then exception will be raised
1857 if self._expl is not None:
1858 write_full(writer, self._expl + LENINDEF)
1859 if getattr(self, "der_forced", False):
1860 write_full(writer, self._encode())
1862 self._encode_cer(writer)
1863 if self._expl is not None:
1864 write_full(writer, EOC)
1866 def hexencode(self):
1867 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1869 return hexenc(self.encode())
1879 _ctx_immutable=True,
1883 :param data: either binary or memoryview
1884 :param int offset: initial data's offset
1885 :param bool leavemm: do we need to leave memoryview of remaining
1886 data as is, or convert it to bytes otherwise
1887 :param decode_path: current decode path (tuples of strings,
1888 possibly with DecodePathDefBy) with will be
1889 the root for all underlying objects
1890 :param ctx: optional :ref:`context <ctx>` governing decoding process
1891 :param bool tag_only: decode only the tag, without length and
1892 contents (used only in Choice and Set
1893 structures, trying to determine if tag satisfies
1895 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1897 :returns: (Obj, remaining data)
1899 .. seealso:: :ref:`decoding`
1901 result = next(self.decode_evgen(
1913 _, obj, tail = result
1924 _ctx_immutable=True,
1927 """Decode with evgen mode on
1929 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1930 it returns the generator producing ``(decode_path, obj, tail)``
1932 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1936 elif _ctx_immutable:
1938 tlv = memoryview(data)
1941 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1944 if self._expl is None:
1945 for result in self._decode(
1948 decode_path=decode_path,
1951 evgen_mode=_evgen_mode,
1956 _decode_path, obj, tail = result
1957 if not _decode_path is decode_path:
1961 t, tlen, lv = tag_strip(tlv)
1962 except DecodeError as err:
1963 raise err.__class__(
1965 klass=self.__class__,
1966 decode_path=decode_path,
1971 klass=self.__class__,
1972 decode_path=decode_path,
1976 l, llen, v = len_decode(lv)
1977 except LenIndefForm as err:
1978 if not ctx.get("bered", False):
1979 raise err.__class__(
1981 klass=self.__class__,
1982 decode_path=decode_path,
1986 offset += tlen + llen
1987 for result in self._decode(
1990 decode_path=decode_path,
1993 evgen_mode=_evgen_mode,
1995 if tag_only: # pragma: no cover
1998 _decode_path, obj, tail = result
1999 if not _decode_path is decode_path:
2001 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2002 if eoc_expected.tobytes() != EOC:
2005 klass=self.__class__,
2006 decode_path=decode_path,
2010 obj.expl_lenindef = True
2011 except DecodeError as err:
2012 raise err.__class__(
2014 klass=self.__class__,
2015 decode_path=decode_path,
2020 raise NotEnoughData(
2021 "encoded length is longer than data",
2022 klass=self.__class__,
2023 decode_path=decode_path,
2026 for result in self._decode(
2028 offset=offset + tlen + llen,
2029 decode_path=decode_path,
2032 evgen_mode=_evgen_mode,
2034 if tag_only: # pragma: no cover
2037 _decode_path, obj, tail = result
2038 if not _decode_path is decode_path:
2040 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2042 "explicit tag out-of-bound, longer than data",
2043 klass=self.__class__,
2044 decode_path=decode_path,
2047 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2049 def decod(self, data, offset=0, decode_path=(), ctx=None):
2050 """Decode the data, check that tail is empty
2052 :raises ExceedingData: if tail is not empty
2054 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2055 (decode without tail) that also checks that there is no
2058 obj, tail = self.decode(
2061 decode_path=decode_path,
2066 raise ExceedingData(len(tail))
2069 def hexdecode(self, data, *args, **kwargs):
2070 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2072 return self.decode(hexdec(data), *args, **kwargs)
2074 def hexdecod(self, data, *args, **kwargs):
2075 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2077 return self.decod(hexdec(data), *args, **kwargs)
2081 """.. seealso:: :ref:`decoding`
2083 return self._expl is not None
2087 """.. seealso:: :ref:`decoding`
2092 def expl_tlen(self):
2093 """.. seealso:: :ref:`decoding`
2095 return len(self._expl)
2098 def expl_llen(self):
2099 """.. seealso:: :ref:`decoding`
2101 if self.expl_lenindef:
2103 return len(len_encode(self.tlvlen))
2106 def expl_offset(self):
2107 """.. seealso:: :ref:`decoding`
2109 return self.offset - self.expl_tlen - self.expl_llen
2112 def expl_vlen(self):
2113 """.. seealso:: :ref:`decoding`
2118 def expl_tlvlen(self):
2119 """.. seealso:: :ref:`decoding`
2121 return self.expl_tlen + self.expl_llen + self.expl_vlen
2124 def fulloffset(self):
2125 """.. seealso:: :ref:`decoding`
2127 return self.expl_offset if self.expled else self.offset
2131 """.. seealso:: :ref:`decoding`
2133 return self.expl_tlvlen if self.expled else self.tlvlen
2135 def pps_lenindef(self, decode_path):
2136 if self.lenindef and not (
2137 getattr(self, "defined", None) is not None and
2138 self.defined[1].lenindef
2141 asn1_type_name="EOC",
2143 decode_path=decode_path,
2145 self.offset + self.tlvlen -
2146 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2154 if self.expl_lenindef:
2156 asn1_type_name="EOC",
2157 obj_name="EXPLICIT",
2158 decode_path=decode_path,
2159 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2168 def encode_cer(obj):
2169 """Encode to CER in memory buffer
2171 :returns bytes: memory buffer contents
2174 obj.encode_cer(buf.write)
2175 return buf.getvalue()
2178 def encode2pass(obj):
2179 """Encode (2-pass mode) to DER in memory buffer
2181 :returns bytes: memory buffer contents
2184 _, state = obj.encode1st()
2185 obj.encode2nd(buf.write, iter(state))
2186 return buf.getvalue()
2189 class DecodePathDefBy(object):
2190 """DEFINED BY representation inside decode path
2192 __slots__ = ("defined_by",)
2194 def __init__(self, defined_by):
2195 self.defined_by = defined_by
2197 def __ne__(self, their):
2198 return not(self == their)
2200 def __eq__(self, their):
2201 if not isinstance(their, self.__class__):
2203 return self.defined_by == their.defined_by
2206 return "DEFINED BY " + str(self.defined_by)
2209 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2212 ########################################################################
2214 ########################################################################
2216 PP = namedtuple("PP", (
2239 ), **NAMEDTUPLE_KWARGS)
2244 asn1_type_name="unknown",
2261 expl_lenindef=False,
2292 def _colourize(what, colour, with_colours, attrs=("bold",)):
2293 return colored(what, colour, attrs=attrs) if with_colours else what
2296 def colonize_hex(hexed):
2297 """Separate hexadecimal string with colons
2299 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2308 with_decode_path=False,
2309 decode_path_len_decrease=0,
2316 " " if pp.expl_offset is None else
2317 ("-%d" % (pp.offset - pp.expl_offset))
2319 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2321 col = _colourize(col, "red", with_colours, ())
2322 col += _colourize("B", "red", with_colours) if pp.bered else " "
2324 col = "[%d,%d,%4d]%s" % (
2328 LENINDEF_PP_CHAR if pp.lenindef else " "
2330 col = _colourize(col, "green", with_colours, ())
2332 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2333 if decode_path_len > 0:
2334 cols.append(" ." * decode_path_len)
2335 ent = pp.decode_path[-1]
2336 if isinstance(ent, DecodePathDefBy):
2337 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2338 value = str(ent.defined_by)
2341 len(oid_maps) > 0 and
2342 ent.defined_by.asn1_type_name ==
2343 ObjectIdentifier.asn1_type_name
2345 for oid_map in oid_maps:
2346 oid_name = oid_map.get(value)
2347 if oid_name is not None:
2348 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2350 if oid_name is None:
2351 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2353 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2354 if pp.expl is not None:
2355 klass, _, num = pp.expl
2356 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2357 cols.append(_colourize(col, "blue", with_colours))
2358 if pp.impl is not None:
2359 klass, _, num = pp.impl
2360 col = "[%s%d]" % (TagClassReprs[klass], num)
2361 cols.append(_colourize(col, "blue", with_colours))
2362 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2363 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2365 cols.append(_colourize("BER", "red", with_colours))
2366 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2367 if pp.value is not None:
2369 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2371 len(oid_maps) > 0 and
2372 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2374 for oid_map in oid_maps:
2375 oid_name = oid_map.get(value)
2376 if oid_name is not None:
2377 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2379 if pp.asn1_type_name == Integer.asn1_type_name:
2380 hex_repr = hex(int(pp.obj._value))[2:].upper()
2381 if len(hex_repr) % 2 != 0:
2382 hex_repr = "0" + hex_repr
2383 cols.append(_colourize(
2384 "(%s)" % colonize_hex(hex_repr),
2389 if pp.blob.__class__ == binary_type:
2390 cols.append(hexenc(pp.blob))
2391 elif pp.blob.__class__ == tuple:
2392 cols.append(", ".join(pp.blob))
2394 cols.append(_colourize("OPTIONAL", "red", with_colours))
2396 cols.append(_colourize("DEFAULT", "red", with_colours))
2397 if with_decode_path:
2398 cols.append(_colourize(
2399 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2403 return " ".join(cols)
2406 def pp_console_blob(pp, decode_path_len_decrease=0):
2407 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2408 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2409 if decode_path_len > 0:
2410 cols.append(" ." * (decode_path_len + 1))
2411 if pp.blob.__class__ == binary_type:
2412 blob = hexenc(pp.blob).upper()
2413 for i in six_xrange(0, len(blob), 32):
2414 chunk = blob[i:i + 32]
2415 yield " ".join(cols + [colonize_hex(chunk)])
2416 elif pp.blob.__class__ == tuple:
2417 yield " ".join(cols + [", ".join(pp.blob)])
2425 with_decode_path=False,
2426 decode_path_only=(),
2429 """Pretty print object
2431 :param Obj obj: object you want to pretty print
2432 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2433 Its human readable form is printed when OID is met
2434 :param big_blobs: if large binary objects are met (like OctetString
2435 values), do we need to print them too, on separate
2437 :param with_colours: colourize output, if ``termcolor`` library
2439 :param with_decode_path: print decode path
2440 :param decode_path_only: print only that specified decode path
2442 def _pprint_pps(pps):
2444 if hasattr(pp, "_fields"):
2446 decode_path_only != () and
2448 str(p) for p in pp.decode_path[:len(decode_path_only)]
2449 ) != decode_path_only
2453 yield pp_console_row(
2458 with_colours=with_colours,
2459 with_decode_path=with_decode_path,
2460 decode_path_len_decrease=len(decode_path_only),
2462 for row in pp_console_blob(
2464 decode_path_len_decrease=len(decode_path_only),
2468 yield pp_console_row(
2473 with_colours=with_colours,
2474 with_decode_path=with_decode_path,
2475 decode_path_len_decrease=len(decode_path_only),
2478 for row in _pprint_pps(pp):
2480 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2483 ########################################################################
2484 # ASN.1 primitive types
2485 ########################################################################
2487 BooleanState = namedtuple(
2489 BasicState._fields + ("value",),
2495 """``BOOLEAN`` boolean type
2497 >>> b = Boolean(True)
2499 >>> b == Boolean(True)
2505 tag_default = tag_encode(1)
2506 asn1_type_name = "BOOLEAN"
2518 :param value: set the value. Either boolean type, or
2519 :py:class:`pyderasn.Boolean` object
2520 :param bytes impl: override default tag with ``IMPLICIT`` one
2521 :param bytes expl: override default tag with ``EXPLICIT`` one
2522 :param default: set default value. Type same as in ``value``
2523 :param bool optional: is object ``OPTIONAL`` in sequence
2525 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2526 self._value = None if value is None else self._value_sanitize(value)
2527 if default is not None:
2528 default = self._value_sanitize(default)
2529 self.default = self.__class__(
2535 self._value = default
2537 def _value_sanitize(self, value):
2538 if value.__class__ == bool:
2540 if issubclass(value.__class__, Boolean):
2542 raise InvalidValueType((self.__class__, bool))
2546 return self._value is not None
2548 def __getstate__(self):
2549 return BooleanState(
2565 def __setstate__(self, state):
2566 super(Boolean, self).__setstate__(state)
2567 self._value = state.value
2569 def __nonzero__(self):
2570 self._assert_ready()
2574 self._assert_ready()
2577 def __eq__(self, their):
2578 if their.__class__ == bool:
2579 return self._value == their
2580 if not issubclass(their.__class__, Boolean):
2583 self._value == their._value and
2584 self.tag == their.tag and
2585 self._expl == their._expl
2596 return self.__class__(
2598 impl=self.tag if impl is None else impl,
2599 expl=self._expl if expl is None else expl,
2600 default=self.default if default is None else default,
2601 optional=self.optional if optional is None else optional,
2605 self._assert_ready()
2606 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2608 def _encode1st(self, state):
2609 return len(self.tag) + 2, state
2611 def _encode2nd(self, writer, state_iter):
2612 self._assert_ready()
2613 write_full(writer, self._encode())
2615 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2617 t, _, lv = tag_strip(tlv)
2618 except DecodeError as err:
2619 raise err.__class__(
2621 klass=self.__class__,
2622 decode_path=decode_path,
2627 klass=self.__class__,
2628 decode_path=decode_path,
2635 l, _, v = len_decode(lv)
2636 except DecodeError as err:
2637 raise err.__class__(
2639 klass=self.__class__,
2640 decode_path=decode_path,
2644 raise InvalidLength(
2645 "Boolean's length must be equal to 1",
2646 klass=self.__class__,
2647 decode_path=decode_path,
2651 raise NotEnoughData(
2652 "encoded length is longer than data",
2653 klass=self.__class__,
2654 decode_path=decode_path,
2657 first_octet = byte2int(v)
2659 if first_octet == 0:
2661 elif first_octet == 0xFF:
2663 elif ctx.get("bered", False):
2668 "unacceptable Boolean value",
2669 klass=self.__class__,
2670 decode_path=decode_path,
2673 obj = self.__class__(
2677 default=self.default,
2678 optional=self.optional,
2679 _decoded=(offset, 1, 1),
2681 obj.ber_encoded = ber_encoded
2682 yield decode_path, obj, v[1:]
2685 return pp_console_row(next(self.pps()))
2687 def pps(self, decode_path=()):
2690 asn1_type_name=self.asn1_type_name,
2691 obj_name=self.__class__.__name__,
2692 decode_path=decode_path,
2693 value=str(self._value) if self.ready else None,
2694 optional=self.optional,
2695 default=self == self.default,
2696 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2697 expl=None if self._expl is None else tag_decode(self._expl),
2702 expl_offset=self.expl_offset if self.expled else None,
2703 expl_tlen=self.expl_tlen if self.expled else None,
2704 expl_llen=self.expl_llen if self.expled else None,
2705 expl_vlen=self.expl_vlen if self.expled else None,
2706 expl_lenindef=self.expl_lenindef,
2707 ber_encoded=self.ber_encoded,
2710 for pp in self.pps_lenindef(decode_path):
2714 IntegerState = namedtuple(
2716 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2722 """``INTEGER`` integer type
2724 >>> b = Integer(-123)
2726 >>> b == Integer(-123)
2731 >>> Integer(2, bounds=(1, 3))
2733 >>> Integer(5, bounds=(1, 3))
2734 Traceback (most recent call last):
2735 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2739 class Version(Integer):
2746 >>> v = Version("v1")
2753 {'v3': 2, 'v1': 0, 'v2': 1}
2755 __slots__ = ("specs", "_bound_min", "_bound_max")
2756 tag_default = tag_encode(2)
2757 asn1_type_name = "INTEGER"
2771 :param value: set the value. Either integer type, named value
2772 (if ``schema`` is specified in the class), or
2773 :py:class:`pyderasn.Integer` object
2774 :param bounds: set ``(MIN, MAX)`` value constraint.
2775 (-inf, +inf) by default
2776 :param bytes impl: override default tag with ``IMPLICIT`` one
2777 :param bytes expl: override default tag with ``EXPLICIT`` one
2778 :param default: set default value. Type same as in ``value``
2779 :param bool optional: is object ``OPTIONAL`` in sequence
2781 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2783 specs = getattr(self, "schema", {}) if _specs is None else _specs
2784 self.specs = specs if specs.__class__ == dict else dict(specs)
2785 self._bound_min, self._bound_max = getattr(
2788 (float("-inf"), float("+inf")),
2789 ) if bounds is None else bounds
2790 if value is not None:
2791 self._value = self._value_sanitize(value)
2792 if default is not None:
2793 default = self._value_sanitize(default)
2794 self.default = self.__class__(
2800 if self._value is None:
2801 self._value = default
2803 def _value_sanitize(self, value):
2804 if isinstance(value, integer_types):
2806 elif issubclass(value.__class__, Integer):
2807 value = value._value
2808 elif value.__class__ == str:
2809 value = self.specs.get(value)
2811 raise ObjUnknown("integer value: %s" % value)
2813 raise InvalidValueType((self.__class__, int, str))
2814 if not self._bound_min <= value <= self._bound_max:
2815 raise BoundsError(self._bound_min, value, self._bound_max)
2820 return self._value is not None
2822 def __getstate__(self):
2823 return IntegerState(
2842 def __setstate__(self, state):
2843 super(Integer, self).__setstate__(state)
2844 self.specs = state.specs
2845 self._value = state.value
2846 self._bound_min = state.bound_min
2847 self._bound_max = state.bound_max
2850 self._assert_ready()
2851 return int(self._value)
2854 self._assert_ready()
2855 return hash(b"".join((
2857 bytes(self._expl or b""),
2858 str(self._value).encode("ascii"),
2861 def __eq__(self, their):
2862 if isinstance(their, integer_types):
2863 return self._value == their
2864 if not issubclass(their.__class__, Integer):
2867 self._value == their._value and
2868 self.tag == their.tag and
2869 self._expl == their._expl
2872 def __lt__(self, their):
2873 return self._value < their._value
2877 """Return named representation (if exists) of the value
2879 for name, value in iteritems(self.specs):
2880 if value == self._value:
2893 return self.__class__(
2896 (self._bound_min, self._bound_max)
2897 if bounds is None else bounds
2899 impl=self.tag if impl is None else impl,
2900 expl=self._expl if expl is None else expl,
2901 default=self.default if default is None else default,
2902 optional=self.optional if optional is None else optional,
2906 def _encode_payload(self):
2907 self._assert_ready()
2911 octets = bytearray([0])
2915 octets = bytearray()
2917 octets.append((value & 0xFF) ^ 0xFF)
2919 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2922 octets = bytearray()
2924 octets.append(value & 0xFF)
2926 if octets[-1] & 0x80 > 0:
2929 octets = bytes(octets)
2931 bytes_len = ceil(value.bit_length() / 8) or 1
2934 octets = value.to_bytes(
2939 except OverflowError:
2944 return b"".join((self.tag, len_encode(len(octets)), octets))
2947 octets = self._encode_payload()
2948 return b"".join((self.tag, len_encode(len(octets)), octets))
2950 def _encode1st(self, state):
2951 l = len(self._encode_payload())
2952 return len(self.tag) + len_size(l) + l, state
2954 def _encode2nd(self, writer, state_iter):
2955 write_full(writer, self._encode())
2957 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2959 t, _, lv = tag_strip(tlv)
2960 except DecodeError as err:
2961 raise err.__class__(
2963 klass=self.__class__,
2964 decode_path=decode_path,
2969 klass=self.__class__,
2970 decode_path=decode_path,
2977 l, llen, v = len_decode(lv)
2978 except DecodeError as err:
2979 raise err.__class__(
2981 klass=self.__class__,
2982 decode_path=decode_path,
2986 raise NotEnoughData(
2987 "encoded length is longer than data",
2988 klass=self.__class__,
2989 decode_path=decode_path,
2993 raise NotEnoughData(
2995 klass=self.__class__,
2996 decode_path=decode_path,
2999 v, tail = v[:l], v[l:]
3000 first_octet = byte2int(v)
3002 second_octet = byte2int(v[1:])
3004 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3005 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3008 "non normalized integer",
3009 klass=self.__class__,
3010 decode_path=decode_path,
3015 if first_octet & 0x80 > 0:
3016 octets = bytearray()
3017 for octet in bytearray(v):
3018 octets.append(octet ^ 0xFF)
3019 for octet in octets:
3020 value = (value << 8) | octet
3024 for octet in bytearray(v):
3025 value = (value << 8) | octet
3027 value = int.from_bytes(v, byteorder="big", signed=True)
3029 obj = self.__class__(
3031 bounds=(self._bound_min, self._bound_max),
3034 default=self.default,
3035 optional=self.optional,
3037 _decoded=(offset, llen, l),
3039 except BoundsError as err:
3042 klass=self.__class__,
3043 decode_path=decode_path,
3046 yield decode_path, obj, tail
3049 return pp_console_row(next(self.pps()))
3051 def pps(self, decode_path=()):
3054 asn1_type_name=self.asn1_type_name,
3055 obj_name=self.__class__.__name__,
3056 decode_path=decode_path,
3057 value=(self.named or str(self._value)) if self.ready else None,
3058 optional=self.optional,
3059 default=self == self.default,
3060 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3061 expl=None if self._expl is None else tag_decode(self._expl),
3066 expl_offset=self.expl_offset if self.expled else None,
3067 expl_tlen=self.expl_tlen if self.expled else None,
3068 expl_llen=self.expl_llen if self.expled else None,
3069 expl_vlen=self.expl_vlen if self.expled else None,
3070 expl_lenindef=self.expl_lenindef,
3073 for pp in self.pps_lenindef(decode_path):
3077 BitStringState = namedtuple(
3079 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3084 class BitString(Obj):
3085 """``BIT STRING`` bit string type
3087 >>> BitString(b"hello world")
3088 BIT STRING 88 bits 68656c6c6f20776f726c64
3091 >>> b == b"hello world"
3096 >>> BitString("'0A3B5F291CD'H")
3097 BIT STRING 44 bits 0a3b5f291cd0
3098 >>> b = BitString("'010110000000'B")
3099 BIT STRING 12 bits 5800
3102 >>> b[0], b[1], b[2], b[3]
3103 (False, True, False, True)
3107 [False, True, False, True, True, False, False, False, False, False, False, False]
3111 class KeyUsage(BitString):
3113 ("digitalSignature", 0),
3114 ("nonRepudiation", 1),
3115 ("keyEncipherment", 2),
3118 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3119 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3121 ['nonRepudiation', 'keyEncipherment']
3123 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3127 Pay attention that BIT STRING can be encoded both in primitive
3128 and constructed forms. Decoder always checks constructed form tag
3129 additionally to specified primitive one. If BER decoding is
3130 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3131 of DER restrictions.
3133 __slots__ = ("tag_constructed", "specs", "defined")
3134 tag_default = tag_encode(3)
3135 asn1_type_name = "BIT STRING"
3148 :param value: set the value. Either binary type, tuple of named
3149 values (if ``schema`` is specified in the class),
3150 string in ``'XXX...'B`` form, or
3151 :py:class:`pyderasn.BitString` object
3152 :param bytes impl: override default tag with ``IMPLICIT`` one
3153 :param bytes expl: override default tag with ``EXPLICIT`` one
3154 :param default: set default value. Type same as in ``value``
3155 :param bool optional: is object ``OPTIONAL`` in sequence
3157 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3158 specs = getattr(self, "schema", {}) if _specs is None else _specs
3159 self.specs = specs if specs.__class__ == dict else dict(specs)
3160 self._value = None if value is None else self._value_sanitize(value)
3161 if default is not None:
3162 default = self._value_sanitize(default)
3163 self.default = self.__class__(
3169 self._value = default
3171 tag_klass, _, tag_num = tag_decode(self.tag)
3172 self.tag_constructed = tag_encode(
3174 form=TagFormConstructed,
3178 def _bits2octets(self, bits):
3179 if len(self.specs) > 0:
3180 bits = bits.rstrip("0")
3182 bits += "0" * ((8 - (bit_len % 8)) % 8)
3183 octets = bytearray(len(bits) // 8)
3184 for i in six_xrange(len(octets)):
3185 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3186 return bit_len, bytes(octets)
3188 def _value_sanitize(self, value):
3189 if isinstance(value, (string_types, binary_type)):
3191 isinstance(value, string_types) and
3192 value.startswith("'")
3194 if value.endswith("'B"):
3196 if not frozenset(value) <= SET01:
3197 raise ValueError("B's coding contains unacceptable chars")
3198 return self._bits2octets(value)
3199 if value.endswith("'H"):
3203 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3205 if value.__class__ == binary_type:
3206 return (len(value) * 8, value)
3207 raise InvalidValueType((self.__class__, string_types, binary_type))
3208 if value.__class__ == tuple:
3211 isinstance(value[0], integer_types) and
3212 value[1].__class__ == binary_type
3217 bit = self.specs.get(name)
3219 raise ObjUnknown("BitString value: %s" % name)
3222 return self._bits2octets("")
3223 bits = frozenset(bits)
3224 return self._bits2octets("".join(
3225 ("1" if bit in bits else "0")
3226 for bit in six_xrange(max(bits) + 1)
3228 if issubclass(value.__class__, BitString):
3230 raise InvalidValueType((self.__class__, binary_type, string_types))
3234 return self._value is not None
3236 def __getstate__(self):
3237 return BitStringState(
3252 self.tag_constructed,
3256 def __setstate__(self, state):
3257 super(BitString, self).__setstate__(state)
3258 self.specs = state.specs
3259 self._value = state.value
3260 self.tag_constructed = state.tag_constructed
3261 self.defined = state.defined
3264 self._assert_ready()
3265 for i in six_xrange(self._value[0]):
3270 """Returns number of bits in the string
3272 self._assert_ready()
3273 return self._value[0]
3275 def __bytes__(self):
3276 self._assert_ready()
3277 return self._value[1]
3279 def __eq__(self, their):
3280 if their.__class__ == bytes:
3281 return self._value[1] == their
3282 if not issubclass(their.__class__, BitString):
3285 self._value == their._value and
3286 self.tag == their.tag and
3287 self._expl == their._expl
3292 """Named representation (if exists) of the bits
3294 :returns: [str(name), ...]
3296 return [name for name, bit in iteritems(self.specs) if self[bit]]
3306 return self.__class__(
3308 impl=self.tag if impl is None else impl,
3309 expl=self._expl if expl is None else expl,
3310 default=self.default if default is None else default,
3311 optional=self.optional if optional is None else optional,
3315 def __getitem__(self, key):
3316 if key.__class__ == int:
3317 bit_len, octets = self._value
3321 byte2int(memoryview(octets)[key // 8:]) >>
3324 if isinstance(key, string_types):
3325 value = self.specs.get(key)
3327 raise ObjUnknown("BitString value: %s" % key)
3329 raise InvalidValueType((int, str))
3332 self._assert_ready()
3333 bit_len, octets = self._value
3336 len_encode(len(octets) + 1),
3337 int2byte((8 - bit_len % 8) % 8),
3341 def _encode1st(self, state):
3342 self._assert_ready()
3343 _, octets = self._value
3345 return len(self.tag) + len_size(l) + l, state
3347 def _encode2nd(self, writer, state_iter):
3348 bit_len, octets = self._value
3349 write_full(writer, b"".join((
3351 len_encode(len(octets) + 1),
3352 int2byte((8 - bit_len % 8) % 8),
3354 write_full(writer, octets)
3356 def _encode_cer(self, writer):
3357 bit_len, octets = self._value
3358 if len(octets) + 1 <= 1000:
3359 write_full(writer, self._encode())
3361 write_full(writer, self.tag_constructed)
3362 write_full(writer, LENINDEF)
3363 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3364 write_full(writer, b"".join((
3365 BitString.tag_default,
3368 octets[offset:offset + 999],
3370 tail = octets[offset+999:]
3372 tail = int2byte((8 - bit_len % 8) % 8) + tail
3373 write_full(writer, b"".join((
3374 BitString.tag_default,
3375 len_encode(len(tail)),
3378 write_full(writer, EOC)
3380 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3382 t, tlen, lv = tag_strip(tlv)
3383 except DecodeError as err:
3384 raise err.__class__(
3386 klass=self.__class__,
3387 decode_path=decode_path,
3391 if tag_only: # pragma: no cover
3395 l, llen, v = len_decode(lv)
3396 except DecodeError as err:
3397 raise err.__class__(
3399 klass=self.__class__,
3400 decode_path=decode_path,
3404 raise NotEnoughData(
3405 "encoded length is longer than data",
3406 klass=self.__class__,
3407 decode_path=decode_path,
3411 raise NotEnoughData(
3413 klass=self.__class__,
3414 decode_path=decode_path,
3417 pad_size = byte2int(v)
3418 if l == 1 and pad_size != 0:
3420 "invalid empty value",
3421 klass=self.__class__,
3422 decode_path=decode_path,
3428 klass=self.__class__,
3429 decode_path=decode_path,
3432 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3435 klass=self.__class__,
3436 decode_path=decode_path,
3439 v, tail = v[:l], v[l:]
3440 bit_len = (len(v) - 1) * 8 - pad_size
3441 obj = self.__class__(
3442 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3445 default=self.default,
3446 optional=self.optional,
3448 _decoded=(offset, llen, l),
3451 obj._value = (bit_len, None)
3452 yield decode_path, obj, tail
3454 if t != self.tag_constructed:
3456 klass=self.__class__,
3457 decode_path=decode_path,
3460 if not ctx.get("bered", False):
3462 "unallowed BER constructed encoding",
3463 klass=self.__class__,
3464 decode_path=decode_path,
3467 if tag_only: # pragma: no cover
3472 l, llen, v = len_decode(lv)
3473 except LenIndefForm:
3474 llen, l, v = 1, 0, lv[1:]
3476 except DecodeError as err:
3477 raise err.__class__(
3479 klass=self.__class__,
3480 decode_path=decode_path,
3484 raise NotEnoughData(
3485 "encoded length is longer than data",
3486 klass=self.__class__,
3487 decode_path=decode_path,
3490 if not lenindef and l == 0:
3491 raise NotEnoughData(
3493 klass=self.__class__,
3494 decode_path=decode_path,
3498 sub_offset = offset + tlen + llen
3502 if v[:EOC_LEN].tobytes() == EOC:
3509 "chunk out of bounds",
3510 klass=self.__class__,
3511 decode_path=decode_path + (str(len(chunks) - 1),),
3512 offset=chunks[-1].offset,
3514 sub_decode_path = decode_path + (str(len(chunks)),)
3517 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3520 decode_path=sub_decode_path,
3523 _ctx_immutable=False,
3525 yield _decode_path, chunk, v_tail
3527 _, chunk, v_tail = next(BitString().decode_evgen(
3530 decode_path=sub_decode_path,
3533 _ctx_immutable=False,
3538 "expected BitString encoded chunk",
3539 klass=self.__class__,
3540 decode_path=sub_decode_path,
3543 chunks.append(chunk)
3544 sub_offset += chunk.tlvlen
3545 vlen += chunk.tlvlen
3547 if len(chunks) == 0:
3550 klass=self.__class__,
3551 decode_path=decode_path,
3556 for chunk_i, chunk in enumerate(chunks[:-1]):
3557 if chunk.bit_len % 8 != 0:
3559 "BitString chunk is not multiple of 8 bits",
3560 klass=self.__class__,
3561 decode_path=decode_path + (str(chunk_i),),
3562 offset=chunk.offset,
3565 values.append(bytes(chunk))
3566 bit_len += chunk.bit_len
3567 chunk_last = chunks[-1]
3569 values.append(bytes(chunk_last))
3570 bit_len += chunk_last.bit_len
3571 obj = self.__class__(
3572 value=None if evgen_mode else (bit_len, b"".join(values)),
3575 default=self.default,
3576 optional=self.optional,
3578 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3581 obj._value = (bit_len, None)
3582 obj.lenindef = lenindef
3583 obj.ber_encoded = True
3584 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3587 return pp_console_row(next(self.pps()))
3589 def pps(self, decode_path=()):
3593 bit_len, blob = self._value
3594 value = "%d bits" % bit_len
3595 if len(self.specs) > 0 and blob is not None:
3596 blob = tuple(self.named)
3599 asn1_type_name=self.asn1_type_name,
3600 obj_name=self.__class__.__name__,
3601 decode_path=decode_path,
3604 optional=self.optional,
3605 default=self == self.default,
3606 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3607 expl=None if self._expl is None else tag_decode(self._expl),
3612 expl_offset=self.expl_offset if self.expled else None,
3613 expl_tlen=self.expl_tlen if self.expled else None,
3614 expl_llen=self.expl_llen if self.expled else None,
3615 expl_vlen=self.expl_vlen if self.expled else None,
3616 expl_lenindef=self.expl_lenindef,
3617 lenindef=self.lenindef,
3618 ber_encoded=self.ber_encoded,
3621 defined_by, defined = self.defined or (None, None)
3622 if defined_by is not None:
3624 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3626 for pp in self.pps_lenindef(decode_path):
3630 OctetStringState = namedtuple(
3632 BasicState._fields + (
3643 class OctetString(Obj):
3644 """``OCTET STRING`` binary string type
3646 >>> s = OctetString(b"hello world")
3647 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3648 >>> s == OctetString(b"hello world")
3653 >>> OctetString(b"hello", bounds=(4, 4))
3654 Traceback (most recent call last):
3655 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3656 >>> OctetString(b"hell", bounds=(4, 4))
3657 OCTET STRING 4 bytes 68656c6c
3659 Memoryviews can be used as a values. If memoryview is made on
3660 mmap-ed file, then it does not take storage inside OctetString
3661 itself. In CER encoding mode it will be streamed to the specified
3662 writer, copying 1 KB chunks.
3664 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3665 tag_default = tag_encode(4)
3666 asn1_type_name = "OCTET STRING"
3667 evgen_mode_skip_value = True
3681 :param value: set the value. Either binary type, or
3682 :py:class:`pyderasn.OctetString` object
3683 :param bounds: set ``(MIN, MAX)`` value size constraint.
3684 (-inf, +inf) by default
3685 :param bytes impl: override default tag with ``IMPLICIT`` one
3686 :param bytes expl: override default tag with ``EXPLICIT`` one
3687 :param default: set default value. Type same as in ``value``
3688 :param bool optional: is object ``OPTIONAL`` in sequence
3690 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3692 self._bound_min, self._bound_max = getattr(
3696 ) if bounds is None else bounds
3697 if value is not None:
3698 self._value = self._value_sanitize(value)
3699 if default is not None:
3700 default = self._value_sanitize(default)
3701 self.default = self.__class__(
3706 if self._value is None:
3707 self._value = default
3709 tag_klass, _, tag_num = tag_decode(self.tag)
3710 self.tag_constructed = tag_encode(
3712 form=TagFormConstructed,
3716 def _value_sanitize(self, value):
3717 if value.__class__ == binary_type or value.__class__ == memoryview:
3719 elif issubclass(value.__class__, OctetString):
3720 value = value._value
3722 raise InvalidValueType((self.__class__, bytes, memoryview))
3723 if not self._bound_min <= len(value) <= self._bound_max:
3724 raise BoundsError(self._bound_min, len(value), self._bound_max)
3729 return self._value is not None
3731 def __getstate__(self):
3732 return OctetStringState(
3748 self.tag_constructed,
3752 def __setstate__(self, state):
3753 super(OctetString, self).__setstate__(state)
3754 self._value = state.value
3755 self._bound_min = state.bound_min
3756 self._bound_max = state.bound_max
3757 self.tag_constructed = state.tag_constructed
3758 self.defined = state.defined
3760 def __bytes__(self):
3761 self._assert_ready()
3762 return bytes(self._value)
3764 def __eq__(self, their):
3765 if their.__class__ == binary_type:
3766 return self._value == their
3767 if not issubclass(their.__class__, OctetString):
3770 self._value == their._value and
3771 self.tag == their.tag and
3772 self._expl == their._expl
3775 def __lt__(self, their):
3776 return self._value < their._value
3787 return self.__class__(
3790 (self._bound_min, self._bound_max)
3791 if bounds is None else bounds
3793 impl=self.tag if impl is None else impl,
3794 expl=self._expl if expl is None else expl,
3795 default=self.default if default is None else default,
3796 optional=self.optional if optional is None else optional,
3800 self._assert_ready()
3803 len_encode(len(self._value)),
3807 def _encode1st(self, state):
3808 self._assert_ready()
3809 l = len(self._value)
3810 return len(self.tag) + len_size(l) + l, state
3812 def _encode2nd(self, writer, state_iter):
3814 write_full(writer, self.tag + len_encode(len(value)))
3815 write_full(writer, value)
3817 def _encode_cer(self, writer):
3818 octets = self._value
3819 if len(octets) <= 1000:
3820 write_full(writer, self._encode())
3822 write_full(writer, self.tag_constructed)
3823 write_full(writer, LENINDEF)
3824 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3825 write_full(writer, b"".join((
3826 OctetString.tag_default,
3828 octets[offset:offset + 1000],
3830 tail = octets[offset+1000:]
3832 write_full(writer, b"".join((
3833 OctetString.tag_default,
3834 len_encode(len(tail)),
3837 write_full(writer, EOC)
3839 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3841 t, tlen, lv = tag_strip(tlv)
3842 except DecodeError as err:
3843 raise err.__class__(
3845 klass=self.__class__,
3846 decode_path=decode_path,
3854 l, llen, v = len_decode(lv)
3855 except DecodeError as err:
3856 raise err.__class__(
3858 klass=self.__class__,
3859 decode_path=decode_path,
3863 raise NotEnoughData(
3864 "encoded length is longer than data",
3865 klass=self.__class__,
3866 decode_path=decode_path,
3869 v, tail = v[:l], v[l:]
3870 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3872 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3873 klass=self.__class__,
3874 decode_path=decode_path,
3878 obj = self.__class__(
3880 None if (evgen_mode and self.evgen_mode_skip_value)
3883 bounds=(self._bound_min, self._bound_max),
3886 default=self.default,
3887 optional=self.optional,
3888 _decoded=(offset, llen, l),
3891 except DecodeError as err:
3894 klass=self.__class__,
3895 decode_path=decode_path,
3898 except BoundsError as err:
3901 klass=self.__class__,
3902 decode_path=decode_path,
3905 yield decode_path, obj, tail
3907 if t != self.tag_constructed:
3909 klass=self.__class__,
3910 decode_path=decode_path,
3913 if not ctx.get("bered", False):
3915 "unallowed BER constructed encoding",
3916 klass=self.__class__,
3917 decode_path=decode_path,
3925 l, llen, v = len_decode(lv)
3926 except LenIndefForm:
3927 llen, l, v = 1, 0, lv[1:]
3929 except DecodeError as err:
3930 raise err.__class__(
3932 klass=self.__class__,
3933 decode_path=decode_path,
3937 raise NotEnoughData(
3938 "encoded length is longer than data",
3939 klass=self.__class__,
3940 decode_path=decode_path,
3945 sub_offset = offset + tlen + llen
3950 if v[:EOC_LEN].tobytes() == EOC:
3957 "chunk out of bounds",
3958 klass=self.__class__,
3959 decode_path=decode_path + (str(len(chunks) - 1),),
3960 offset=chunks[-1].offset,
3964 sub_decode_path = decode_path + (str(chunks_count),)
3965 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3968 decode_path=sub_decode_path,
3971 _ctx_immutable=False,
3973 yield _decode_path, chunk, v_tail
3974 if not chunk.ber_encoded:
3975 payload_len += chunk.vlen
3978 sub_decode_path = decode_path + (str(len(chunks)),)
3979 _, chunk, v_tail = next(OctetString().decode_evgen(
3982 decode_path=sub_decode_path,
3985 _ctx_immutable=False,
3988 chunks.append(chunk)
3991 "expected OctetString encoded chunk",
3992 klass=self.__class__,
3993 decode_path=sub_decode_path,
3996 sub_offset += chunk.tlvlen
3997 vlen += chunk.tlvlen
3999 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4001 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4002 klass=self.__class__,
4003 decode_path=decode_path,
4007 obj = self.__class__(
4009 None if evgen_mode else
4010 b"".join(bytes(chunk) for chunk in chunks)
4012 bounds=(self._bound_min, self._bound_max),
4015 default=self.default,
4016 optional=self.optional,
4017 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4020 except DecodeError as err:
4023 klass=self.__class__,
4024 decode_path=decode_path,
4027 except BoundsError as err:
4030 klass=self.__class__,
4031 decode_path=decode_path,
4034 obj.lenindef = lenindef
4035 obj.ber_encoded = True
4036 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4039 return pp_console_row(next(self.pps()))
4041 def pps(self, decode_path=()):
4044 asn1_type_name=self.asn1_type_name,
4045 obj_name=self.__class__.__name__,
4046 decode_path=decode_path,
4047 value=("%d bytes" % len(self._value)) if self.ready else None,
4048 blob=self._value if self.ready else None,
4049 optional=self.optional,
4050 default=self == self.default,
4051 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4052 expl=None if self._expl is None else tag_decode(self._expl),
4057 expl_offset=self.expl_offset if self.expled else None,
4058 expl_tlen=self.expl_tlen if self.expled else None,
4059 expl_llen=self.expl_llen if self.expled else None,
4060 expl_vlen=self.expl_vlen if self.expled else None,
4061 expl_lenindef=self.expl_lenindef,
4062 lenindef=self.lenindef,
4063 ber_encoded=self.ber_encoded,
4066 defined_by, defined = self.defined or (None, None)
4067 if defined_by is not None:
4069 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4071 for pp in self.pps_lenindef(decode_path):
4075 def agg_octet_string(evgens, decode_path, raw, writer):
4076 """Aggregate constructed string (OctetString and its derivatives)
4078 :param evgens: iterator of generated events
4079 :param decode_path: points to the string we want to decode
4080 :param raw: slicebable (memoryview, bytearray, etc) with
4081 the data evgens are generated on
4082 :param writer: buffer.write where string is going to be saved
4083 :param writer: where string is going to be saved. Must comply
4084 with ``io.RawIOBase.write`` behaviour
4086 .. seealso:: :ref:`agg_octet_string`
4088 decode_path_len = len(decode_path)
4089 for dp, obj, _ in evgens:
4090 if dp[:decode_path_len] != decode_path:
4092 if not obj.ber_encoded:
4093 write_full(writer, raw[
4094 obj.offset + obj.tlen + obj.llen:
4095 obj.offset + obj.tlen + obj.llen + obj.vlen -
4096 (EOC_LEN if obj.expl_lenindef else 0)
4098 if len(dp) == decode_path_len:
4102 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4106 """``NULL`` null object
4114 tag_default = tag_encode(5)
4115 asn1_type_name = "NULL"
4119 value=None, # unused, but Sequence passes it
4126 :param bytes impl: override default tag with ``IMPLICIT`` one
4127 :param bytes expl: override default tag with ``EXPLICIT`` one
4128 :param bool optional: is object ``OPTIONAL`` in sequence
4130 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4137 def __getstate__(self):
4153 def __eq__(self, their):
4154 if not issubclass(their.__class__, Null):
4157 self.tag == their.tag and
4158 self._expl == their._expl
4168 return self.__class__(
4169 impl=self.tag if impl is None else impl,
4170 expl=self._expl if expl is None else expl,
4171 optional=self.optional if optional is None else optional,
4175 return self.tag + LEN0
4177 def _encode1st(self, state):
4178 return len(self.tag) + 1, state
4180 def _encode2nd(self, writer, state_iter):
4181 write_full(writer, self.tag + LEN0)
4183 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4185 t, _, lv = tag_strip(tlv)
4186 except DecodeError as err:
4187 raise err.__class__(
4189 klass=self.__class__,
4190 decode_path=decode_path,
4195 klass=self.__class__,
4196 decode_path=decode_path,
4199 if tag_only: # pragma: no cover
4203 l, _, v = len_decode(lv)
4204 except DecodeError as err:
4205 raise err.__class__(
4207 klass=self.__class__,
4208 decode_path=decode_path,
4212 raise InvalidLength(
4213 "Null must have zero length",
4214 klass=self.__class__,
4215 decode_path=decode_path,
4218 obj = self.__class__(
4221 optional=self.optional,
4222 _decoded=(offset, 1, 0),
4224 yield decode_path, obj, v
4227 return pp_console_row(next(self.pps()))
4229 def pps(self, decode_path=()):
4232 asn1_type_name=self.asn1_type_name,
4233 obj_name=self.__class__.__name__,
4234 decode_path=decode_path,
4235 optional=self.optional,
4236 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4237 expl=None if self._expl is None else tag_decode(self._expl),
4242 expl_offset=self.expl_offset if self.expled else None,
4243 expl_tlen=self.expl_tlen if self.expled else None,
4244 expl_llen=self.expl_llen if self.expled else None,
4245 expl_vlen=self.expl_vlen if self.expled else None,
4246 expl_lenindef=self.expl_lenindef,
4249 for pp in self.pps_lenindef(decode_path):
4253 ObjectIdentifierState = namedtuple(
4254 "ObjectIdentifierState",
4255 BasicState._fields + ("value", "defines"),
4260 class ObjectIdentifier(Obj):
4261 """``OBJECT IDENTIFIER`` OID type
4263 >>> oid = ObjectIdentifier((1, 2, 3))
4264 OBJECT IDENTIFIER 1.2.3
4265 >>> oid == ObjectIdentifier("1.2.3")
4271 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4272 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4274 >>> str(ObjectIdentifier((3, 1)))
4275 Traceback (most recent call last):
4276 pyderasn.InvalidOID: unacceptable first arc value
4278 __slots__ = ("defines",)
4279 tag_default = tag_encode(6)
4280 asn1_type_name = "OBJECT IDENTIFIER"
4293 :param value: set the value. Either tuples of integers,
4294 string of "."-concatenated integers, or
4295 :py:class:`pyderasn.ObjectIdentifier` object
4296 :param defines: sequence of tuples. Each tuple has two elements.
4297 First one is relative to current one decode
4298 path, aiming to the field defined by that OID.
4299 Read about relative path in
4300 :py:func:`pyderasn.abs_decode_path`. Second
4301 tuple element is ``{OID: pyderasn.Obj()}``
4302 dictionary, mapping between current OID value
4303 and structure applied to defined field.
4305 .. seealso:: :ref:`definedby`
4307 :param bytes impl: override default tag with ``IMPLICIT`` one
4308 :param bytes expl: override default tag with ``EXPLICIT`` one
4309 :param default: set default value. Type same as in ``value``
4310 :param bool optional: is object ``OPTIONAL`` in sequence
4312 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4314 if value is not None:
4315 self._value = self._value_sanitize(value)
4316 if default is not None:
4317 default = self._value_sanitize(default)
4318 self.default = self.__class__(
4323 if self._value is None:
4324 self._value = default
4325 self.defines = defines
4327 def __add__(self, their):
4328 if their.__class__ == tuple:
4329 return self.__class__(self._value + array("L", their))
4330 if isinstance(their, self.__class__):
4331 return self.__class__(self._value + their._value)
4332 raise InvalidValueType((self.__class__, tuple))
4334 def _value_sanitize(self, value):
4335 if issubclass(value.__class__, ObjectIdentifier):
4337 if isinstance(value, string_types):
4339 value = array("L", (pureint(arc) for arc in value.split(".")))
4341 raise InvalidOID("unacceptable arcs values")
4342 if value.__class__ == tuple:
4344 value = array("L", value)
4345 except OverflowError as err:
4346 raise InvalidOID(repr(err))
4347 if value.__class__ is array:
4349 raise InvalidOID("less than 2 arcs")
4350 first_arc = value[0]
4351 if first_arc in (0, 1):
4352 if not (0 <= value[1] <= 39):
4353 raise InvalidOID("second arc is too wide")
4354 elif first_arc == 2:
4357 raise InvalidOID("unacceptable first arc value")
4358 if not all(arc >= 0 for arc in value):
4359 raise InvalidOID("negative arc value")
4361 raise InvalidValueType((self.__class__, str, tuple))
4365 return self._value is not None
4367 def __getstate__(self):
4368 return ObjectIdentifierState(
4385 def __setstate__(self, state):
4386 super(ObjectIdentifier, self).__setstate__(state)
4387 self._value = state.value
4388 self.defines = state.defines
4391 self._assert_ready()
4392 return iter(self._value)
4395 return ".".join(str(arc) for arc in self._value or ())
4398 self._assert_ready()
4399 return hash(b"".join((
4401 bytes(self._expl or b""),
4402 str(self._value).encode("ascii"),
4405 def __eq__(self, their):
4406 if their.__class__ == tuple:
4407 return self._value == array("L", their)
4408 if not issubclass(their.__class__, ObjectIdentifier):
4411 self.tag == their.tag and
4412 self._expl == their._expl and
4413 self._value == their._value
4416 def __lt__(self, their):
4417 return self._value < their._value
4428 return self.__class__(
4430 defines=self.defines if defines is None else defines,
4431 impl=self.tag if impl is None else impl,
4432 expl=self._expl if expl is None else expl,
4433 default=self.default if default is None else default,
4434 optional=self.optional if optional is None else optional,
4437 def _encode_octets(self):
4438 self._assert_ready()
4440 first_value = value[1]
4441 first_arc = value[0]
4444 elif first_arc == 1:
4446 elif first_arc == 2:
4448 else: # pragma: no cover
4449 raise RuntimeError("invalid arc is stored")
4450 octets = [zero_ended_encode(first_value)]
4451 for arc in value[2:]:
4452 octets.append(zero_ended_encode(arc))
4453 return b"".join(octets)
4456 v = self._encode_octets()
4457 return b"".join((self.tag, len_encode(len(v)), v))
4459 def _encode1st(self, state):
4460 l = len(self._encode_octets())
4461 return len(self.tag) + len_size(l) + l, state
4463 def _encode2nd(self, writer, state_iter):
4464 write_full(writer, self._encode())
4466 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4468 t, _, lv = tag_strip(tlv)
4469 except DecodeError as err:
4470 raise err.__class__(
4472 klass=self.__class__,
4473 decode_path=decode_path,
4478 klass=self.__class__,
4479 decode_path=decode_path,
4482 if tag_only: # pragma: no cover
4486 l, llen, v = len_decode(lv)
4487 except DecodeError as err:
4488 raise err.__class__(
4490 klass=self.__class__,
4491 decode_path=decode_path,
4495 raise NotEnoughData(
4496 "encoded length is longer than data",
4497 klass=self.__class__,
4498 decode_path=decode_path,
4502 raise NotEnoughData(
4504 klass=self.__class__,
4505 decode_path=decode_path,
4508 v, tail = v[:l], v[l:]
4515 octet = indexbytes(v, i)
4516 if i == 0 and octet == 0x80:
4517 if ctx.get("bered", False):
4521 "non normalized arc encoding",
4522 klass=self.__class__,
4523 decode_path=decode_path,
4526 arc = (arc << 7) | (octet & 0x7F)
4527 if octet & 0x80 == 0:
4530 except OverflowError:
4532 "too huge value for local unsigned long",
4533 klass=self.__class__,
4534 decode_path=decode_path,
4543 klass=self.__class__,
4544 decode_path=decode_path,
4548 second_arc = arcs[0]
4549 if 0 <= second_arc <= 39:
4551 elif 40 <= second_arc <= 79:
4557 obj = self.__class__(
4558 value=array("L", (first_arc, second_arc)) + arcs[1:],
4561 default=self.default,
4562 optional=self.optional,
4563 _decoded=(offset, llen, l),
4566 obj.ber_encoded = True
4567 yield decode_path, obj, tail
4570 return pp_console_row(next(self.pps()))
4572 def pps(self, decode_path=()):
4575 asn1_type_name=self.asn1_type_name,
4576 obj_name=self.__class__.__name__,
4577 decode_path=decode_path,
4578 value=str(self) if self.ready else None,
4579 optional=self.optional,
4580 default=self == self.default,
4581 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4582 expl=None if self._expl is None else tag_decode(self._expl),
4587 expl_offset=self.expl_offset if self.expled else None,
4588 expl_tlen=self.expl_tlen if self.expled else None,
4589 expl_llen=self.expl_llen if self.expled else None,
4590 expl_vlen=self.expl_vlen if self.expled else None,
4591 expl_lenindef=self.expl_lenindef,
4592 ber_encoded=self.ber_encoded,
4595 for pp in self.pps_lenindef(decode_path):
4599 class Enumerated(Integer):
4600 """``ENUMERATED`` integer type
4602 This type is identical to :py:class:`pyderasn.Integer`, but requires
4603 schema to be specified and does not accept values missing from it.
4606 tag_default = tag_encode(10)
4607 asn1_type_name = "ENUMERATED"
4618 bounds=None, # dummy argument, workability for Integer.decode
4620 super(Enumerated, self).__init__(
4621 value, bounds, impl, expl, default, optional, _specs, _decoded,
4623 if len(self.specs) == 0:
4624 raise ValueError("schema must be specified")
4626 def _value_sanitize(self, value):
4627 if isinstance(value, self.__class__):
4628 value = value._value
4629 elif isinstance(value, integer_types):
4630 for _value in itervalues(self.specs):
4635 "unknown integer value: %s" % value,
4636 klass=self.__class__,
4638 elif isinstance(value, string_types):
4639 value = self.specs.get(value)
4641 raise ObjUnknown("integer value: %s" % value)
4643 raise InvalidValueType((self.__class__, int, str))
4655 return self.__class__(
4657 impl=self.tag if impl is None else impl,
4658 expl=self._expl if expl is None else expl,
4659 default=self.default if default is None else default,
4660 optional=self.optional if optional is None else optional,
4665 def escape_control_unicode(c):
4666 if unicat(c)[0] == "C":
4667 c = repr(c).lstrip("u").strip("'")
4671 class CommonString(OctetString):
4672 """Common class for all strings
4674 Everything resembles :py:class:`pyderasn.OctetString`, except
4675 ability to deal with unicode text strings.
4677 >>> hexenc("привет мир".encode("utf-8"))
4678 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4679 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4681 >>> s = UTF8String("привет мир")
4682 UTF8String UTF8String привет мир
4684 'привет мир'
4685 >>> hexenc(bytes(s))
4686 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4688 >>> PrintableString("привет мир")
4689 Traceback (most recent call last):
4690 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4692 >>> BMPString("ада", bounds=(2, 2))
4693 Traceback (most recent call last):
4694 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4695 >>> s = BMPString("ад", bounds=(2, 2))
4698 >>> hexenc(bytes(s))
4706 * - :py:class:`pyderasn.UTF8String`
4708 * - :py:class:`pyderasn.NumericString`
4710 * - :py:class:`pyderasn.PrintableString`
4712 * - :py:class:`pyderasn.TeletexString`
4714 * - :py:class:`pyderasn.T61String`
4716 * - :py:class:`pyderasn.VideotexString`
4718 * - :py:class:`pyderasn.IA5String`
4720 * - :py:class:`pyderasn.GraphicString`
4722 * - :py:class:`pyderasn.VisibleString`
4724 * - :py:class:`pyderasn.ISO646String`
4726 * - :py:class:`pyderasn.GeneralString`
4728 * - :py:class:`pyderasn.UniversalString`
4730 * - :py:class:`pyderasn.BMPString`
4735 def _value_sanitize(self, value):
4737 value_decoded = None
4738 if isinstance(value, self.__class__):
4739 value_raw = value._value
4740 elif value.__class__ == text_type:
4741 value_decoded = value
4742 elif value.__class__ == binary_type:
4745 raise InvalidValueType((self.__class__, text_type, binary_type))
4748 value_decoded.encode(self.encoding)
4749 if value_raw is None else value_raw
4752 value_raw.decode(self.encoding)
4753 if value_decoded is None else value_decoded
4755 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4756 raise DecodeError(str(err))
4757 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4765 def __eq__(self, their):
4766 if their.__class__ == binary_type:
4767 return self._value == their
4768 if their.__class__ == text_type:
4769 return self._value == their.encode(self.encoding)
4770 if not isinstance(their, self.__class__):
4773 self._value == their._value and
4774 self.tag == their.tag and
4775 self._expl == their._expl
4778 def __unicode__(self):
4780 return self._value.decode(self.encoding)
4781 return text_type(self._value)
4784 return pp_console_row(next(self.pps(no_unicode=PY2)))
4786 def pps(self, decode_path=(), no_unicode=False):
4790 hexenc(bytes(self)) if no_unicode else
4791 "".join(escape_control_unicode(c) for c in self.__unicode__())
4795 asn1_type_name=self.asn1_type_name,
4796 obj_name=self.__class__.__name__,
4797 decode_path=decode_path,
4799 optional=self.optional,
4800 default=self == self.default,
4801 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4802 expl=None if self._expl is None else tag_decode(self._expl),
4807 expl_offset=self.expl_offset if self.expled else None,
4808 expl_tlen=self.expl_tlen if self.expled else None,
4809 expl_llen=self.expl_llen if self.expled else None,
4810 expl_vlen=self.expl_vlen if self.expled else None,
4811 expl_lenindef=self.expl_lenindef,
4812 ber_encoded=self.ber_encoded,
4815 for pp in self.pps_lenindef(decode_path):
4819 class UTF8String(CommonString):
4821 tag_default = tag_encode(12)
4823 asn1_type_name = "UTF8String"
4826 class AllowableCharsMixin(object):
4828 def allowable_chars(self):
4830 return self._allowable_chars
4831 return frozenset(six_unichr(c) for c in self._allowable_chars)
4834 class NumericString(AllowableCharsMixin, CommonString):
4837 Its value is properly sanitized: only ASCII digits with spaces can
4840 >>> NumericString().allowable_chars
4841 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4844 tag_default = tag_encode(18)
4846 asn1_type_name = "NumericString"
4847 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4849 def _value_sanitize(self, value):
4850 value = super(NumericString, self)._value_sanitize(value)
4851 if not frozenset(value) <= self._allowable_chars:
4852 raise DecodeError("non-numeric value")
4856 PrintableStringState = namedtuple(
4857 "PrintableStringState",
4858 OctetStringState._fields + ("allowable_chars",),
4863 class PrintableString(AllowableCharsMixin, CommonString):
4866 Its value is properly sanitized: see X.680 41.4 table 10.
4868 >>> PrintableString().allowable_chars
4869 frozenset([' ', "'", ..., 'z'])
4870 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4871 PrintableString PrintableString foo*bar
4872 >>> obj.allow_asterisk, obj.allow_ampersand
4876 tag_default = tag_encode(19)
4878 asn1_type_name = "PrintableString"
4879 _allowable_chars = frozenset(
4880 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4882 _asterisk = frozenset("*".encode("ascii"))
4883 _ampersand = frozenset("&".encode("ascii"))
4895 allow_asterisk=False,
4896 allow_ampersand=False,
4899 :param allow_asterisk: allow asterisk character
4900 :param allow_ampersand: allow ampersand character
4903 self._allowable_chars |= self._asterisk
4905 self._allowable_chars |= self._ampersand
4906 super(PrintableString, self).__init__(
4907 value, bounds, impl, expl, default, optional, _decoded, ctx,
4911 def allow_asterisk(self):
4912 """Is asterisk character allowed?
4914 return self._asterisk <= self._allowable_chars
4917 def allow_ampersand(self):
4918 """Is ampersand character allowed?
4920 return self._ampersand <= self._allowable_chars
4922 def _value_sanitize(self, value):
4923 value = super(PrintableString, self)._value_sanitize(value)
4924 if not frozenset(value) <= self._allowable_chars:
4925 raise DecodeError("non-printable value")
4928 def __getstate__(self):
4929 return PrintableStringState(
4930 *super(PrintableString, self).__getstate__(),
4931 **{"allowable_chars": self._allowable_chars}
4934 def __setstate__(self, state):
4935 super(PrintableString, self).__setstate__(state)
4936 self._allowable_chars = state.allowable_chars
4947 return self.__class__(
4950 (self._bound_min, self._bound_max)
4951 if bounds is None else bounds
4953 impl=self.tag if impl is None else impl,
4954 expl=self._expl if expl is None else expl,
4955 default=self.default if default is None else default,
4956 optional=self.optional if optional is None else optional,
4957 allow_asterisk=self.allow_asterisk,
4958 allow_ampersand=self.allow_ampersand,
4962 class TeletexString(CommonString):
4964 tag_default = tag_encode(20)
4966 asn1_type_name = "TeletexString"
4969 class T61String(TeletexString):
4971 asn1_type_name = "T61String"
4974 class VideotexString(CommonString):
4976 tag_default = tag_encode(21)
4977 encoding = "iso-8859-1"
4978 asn1_type_name = "VideotexString"
4981 class IA5String(CommonString):
4983 tag_default = tag_encode(22)
4985 asn1_type_name = "IA5"
4988 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4989 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4990 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4991 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4992 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4993 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4996 class VisibleString(CommonString):
4998 tag_default = tag_encode(26)
5000 asn1_type_name = "VisibleString"
5003 UTCTimeState = namedtuple(
5005 OctetStringState._fields + ("ber_raw",),
5010 def str_to_time_fractions(value):
5012 year, v = (v // 10**10), (v % 10**10)
5013 month, v = (v // 10**8), (v % 10**8)
5014 day, v = (v // 10**6), (v % 10**6)
5015 hour, v = (v // 10**4), (v % 10**4)
5016 minute, second = (v // 100), (v % 100)
5017 return year, month, day, hour, minute, second
5020 class UTCTime(VisibleString):
5021 """``UTCTime`` datetime type
5023 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5024 UTCTime UTCTime 2017-09-30T22:07:50
5030 datetime.datetime(2017, 9, 30, 22, 7, 50)
5031 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5032 datetime.datetime(1957, 9, 30, 22, 7, 50)
5034 If BER encoded value was met, then ``ber_raw`` attribute will hold
5035 its raw representation.
5039 Pay attention that UTCTime can not hold full year, so all years
5040 having < 50 years are treated as 20xx, 19xx otherwise, according
5041 to X.509 recommendation.
5045 No strict validation of UTC offsets are made, but very crude:
5047 * minutes are not exceeding 60
5048 * offset value is not exceeding 14 hours
5050 __slots__ = ("ber_raw",)
5051 tag_default = tag_encode(23)
5053 asn1_type_name = "UTCTime"
5054 evgen_mode_skip_value = False
5064 bounds=None, # dummy argument, workability for OctetString.decode
5068 :param value: set the value. Either datetime type, or
5069 :py:class:`pyderasn.UTCTime` object
5070 :param bytes impl: override default tag with ``IMPLICIT`` one
5071 :param bytes expl: override default tag with ``EXPLICIT`` one
5072 :param default: set default value. Type same as in ``value``
5073 :param bool optional: is object ``OPTIONAL`` in sequence
5075 super(UTCTime, self).__init__(
5076 None, None, impl, expl, None, optional, _decoded, ctx,
5080 if value is not None:
5081 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5082 self.ber_encoded = self.ber_raw is not None
5083 if default is not None:
5084 default, _ = self._value_sanitize(default)
5085 self.default = self.__class__(
5090 if self._value is None:
5091 self._value = default
5093 self.optional = optional
5095 def _strptime_bered(self, value):
5096 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5099 raise ValueError("no timezone")
5100 year += 2000 if year < 50 else 1900
5101 decoded = datetime(year, month, day, hour, minute)
5103 if value[-1] == "Z":
5107 raise ValueError("invalid UTC offset")
5108 if value[-5] == "-":
5110 elif value[-5] == "+":
5113 raise ValueError("invalid UTC offset")
5114 v = pureint(value[-4:])
5115 offset, v = (60 * (v % 100)), v // 100
5117 raise ValueError("invalid UTC offset minutes")
5119 if offset > 14 * 3600:
5120 raise ValueError("too big UTC offset")
5124 return offset, decoded
5126 raise ValueError("invalid UTC offset seconds")
5127 seconds = pureint(value)
5129 raise ValueError("invalid seconds value")
5130 return offset, decoded + timedelta(seconds=seconds)
5132 def _strptime(self, value):
5133 # datetime.strptime's format: %y%m%d%H%M%SZ
5134 if len(value) != LEN_YYMMDDHHMMSSZ:
5135 raise ValueError("invalid UTCTime length")
5136 if value[-1] != "Z":
5137 raise ValueError("non UTC timezone")
5138 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5139 year += 2000 if year < 50 else 1900
5140 return datetime(year, month, day, hour, minute, second)
5142 def _dt_sanitize(self, value):
5143 if value.year < 1950 or value.year > 2049:
5144 raise ValueError("UTCTime can hold only 1950-2049 years")
5145 return value.replace(microsecond=0)
5147 def _value_sanitize(self, value, ctx=None):
5148 if value.__class__ == binary_type:
5150 value_decoded = value.decode("ascii")
5151 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5152 raise DecodeError("invalid UTCTime encoding: %r" % err)
5155 return self._strptime(value_decoded), None
5156 except (TypeError, ValueError) as _err:
5158 if (ctx is not None) and ctx.get("bered", False):
5160 offset, _value = self._strptime_bered(value_decoded)
5161 _value = _value - timedelta(seconds=offset)
5162 return self._dt_sanitize(_value), value
5163 except (TypeError, ValueError, OverflowError) as _err:
5166 "invalid %s format: %r" % (self.asn1_type_name, err),
5167 klass=self.__class__,
5169 if isinstance(value, self.__class__):
5170 return value._value, None
5171 if value.__class__ == datetime:
5172 return self._dt_sanitize(value), None
5173 raise InvalidValueType((self.__class__, datetime))
5175 def _pp_value(self):
5177 value = self._value.isoformat()
5178 if self.ber_encoded:
5179 value += " (%s)" % self.ber_raw
5183 def __unicode__(self):
5185 value = self._value.isoformat()
5186 if self.ber_encoded:
5187 value += " (%s)" % self.ber_raw
5189 return text_type(self._pp_value())
5191 def __getstate__(self):
5192 return UTCTimeState(
5193 *super(UTCTime, self).__getstate__(),
5194 **{"ber_raw": self.ber_raw}
5197 def __setstate__(self, state):
5198 super(UTCTime, self).__setstate__(state)
5199 self.ber_raw = state.ber_raw
5201 def __bytes__(self):
5202 self._assert_ready()
5203 return self._encode_time()
5205 def __eq__(self, their):
5206 if their.__class__ == binary_type:
5207 return self._encode_time() == their
5208 if their.__class__ == datetime:
5209 return self.todatetime() == their
5210 if not isinstance(their, self.__class__):
5213 self._value == their._value and
5214 self.tag == their.tag and
5215 self._expl == their._expl
5218 def _encode_time(self):
5219 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5222 self._assert_ready()
5223 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5225 def _encode1st(self, state):
5226 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5228 def _encode2nd(self, writer, state_iter):
5229 self._assert_ready()
5230 write_full(writer, self._encode())
5232 def _encode_cer(self, writer):
5233 write_full(writer, self._encode())
5235 def todatetime(self):
5239 return pp_console_row(next(self.pps()))
5241 def pps(self, decode_path=()):
5244 asn1_type_name=self.asn1_type_name,
5245 obj_name=self.__class__.__name__,
5246 decode_path=decode_path,
5247 value=self._pp_value(),
5248 optional=self.optional,
5249 default=self == self.default,
5250 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5251 expl=None if self._expl is None else tag_decode(self._expl),
5256 expl_offset=self.expl_offset if self.expled else None,
5257 expl_tlen=self.expl_tlen if self.expled else None,
5258 expl_llen=self.expl_llen if self.expled else None,
5259 expl_vlen=self.expl_vlen if self.expled else None,
5260 expl_lenindef=self.expl_lenindef,
5261 ber_encoded=self.ber_encoded,
5264 for pp in self.pps_lenindef(decode_path):
5268 class GeneralizedTime(UTCTime):
5269 """``GeneralizedTime`` datetime type
5271 This type is similar to :py:class:`pyderasn.UTCTime`.
5273 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5274 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5276 '20170930220750.000123Z'
5277 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5278 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5282 Only microsecond fractions are supported in DER encoding.
5283 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5284 higher precision values.
5288 BER encoded data can loss information (accuracy) during decoding
5289 because of float transformations.
5293 Local times (without explicit timezone specification) are treated
5294 as UTC one, no transformations are made.
5298 Zero year is unsupported.
5301 tag_default = tag_encode(24)
5302 asn1_type_name = "GeneralizedTime"
5304 def _dt_sanitize(self, value):
5307 def _strptime_bered(self, value):
5308 if len(value) < 4 + 3 * 2:
5309 raise ValueError("invalid GeneralizedTime")
5310 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5311 decoded = datetime(year, month, day, hour)
5312 offset, value = 0, value[10:]
5314 return offset, decoded
5315 if value[-1] == "Z":
5318 for char, sign in (("-", -1), ("+", 1)):
5319 idx = value.rfind(char)
5322 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5323 v = pureint(offset_raw)
5324 if len(offset_raw) == 4:
5325 offset, v = (60 * (v % 100)), v // 100
5327 raise ValueError("invalid UTC offset minutes")
5328 elif len(offset_raw) == 2:
5331 raise ValueError("invalid UTC offset")
5333 if offset > 14 * 3600:
5334 raise ValueError("too big UTC offset")
5338 return offset, decoded
5339 if value[0] in DECIMAL_SIGNS:
5341 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5344 raise ValueError("stripped minutes")
5345 decoded += timedelta(seconds=60 * pureint(value[:2]))
5348 return offset, decoded
5349 if value[0] in DECIMAL_SIGNS:
5351 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5354 raise ValueError("stripped seconds")
5355 decoded += timedelta(seconds=pureint(value[:2]))
5358 return offset, decoded
5359 if value[0] not in DECIMAL_SIGNS:
5360 raise ValueError("invalid format after seconds")
5362 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5365 def _strptime(self, value):
5367 if l == LEN_YYYYMMDDHHMMSSZ:
5368 # datetime.strptime's format: %Y%m%d%H%M%SZ
5369 if value[-1] != "Z":
5370 raise ValueError("non UTC timezone")
5371 return datetime(*str_to_time_fractions(value[:-1]))
5372 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5373 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5374 if value[-1] != "Z":
5375 raise ValueError("non UTC timezone")
5376 if value[14] != ".":
5377 raise ValueError("no fractions separator")
5380 raise ValueError("trailing zero")
5383 raise ValueError("only microsecond fractions are supported")
5384 us = pureint(us + ("0" * (6 - us_len)))
5385 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5386 return datetime(year, month, day, hour, minute, second, us)
5387 raise ValueError("invalid GeneralizedTime length")
5389 def _encode_time(self):
5391 encoded = value.strftime("%Y%m%d%H%M%S")
5392 if value.microsecond > 0:
5393 encoded += (".%06d" % value.microsecond).rstrip("0")
5394 return (encoded + "Z").encode("ascii")
5397 self._assert_ready()
5399 if value.microsecond > 0:
5400 encoded = self._encode_time()
5401 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5402 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5404 def _encode1st(self, state):
5405 self._assert_ready()
5406 vlen = len(self._encode_time())
5407 return len(self.tag) + len_size(vlen) + vlen, state
5409 def _encode2nd(self, writer, state_iter):
5410 write_full(writer, self._encode())
5413 class GraphicString(CommonString):
5415 tag_default = tag_encode(25)
5416 encoding = "iso-8859-1"
5417 asn1_type_name = "GraphicString"
5420 class ISO646String(VisibleString):
5422 asn1_type_name = "ISO646String"
5425 class GeneralString(CommonString):
5427 tag_default = tag_encode(27)
5428 encoding = "iso-8859-1"
5429 asn1_type_name = "GeneralString"
5432 class UniversalString(CommonString):
5434 tag_default = tag_encode(28)
5435 encoding = "utf-32-be"
5436 asn1_type_name = "UniversalString"
5439 class BMPString(CommonString):
5441 tag_default = tag_encode(30)
5442 encoding = "utf-16-be"
5443 asn1_type_name = "BMPString"
5446 ChoiceState = namedtuple(
5448 BasicState._fields + ("specs", "value",),
5454 """``CHOICE`` special type
5458 class GeneralName(Choice):
5460 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5461 ("dNSName", IA5String(impl=tag_ctxp(2))),
5464 >>> gn = GeneralName()
5466 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5467 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5468 >>> gn["dNSName"] = IA5String("bar.baz")
5469 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5470 >>> gn["rfc822Name"]
5473 [2] IA5String IA5 bar.baz
5476 >>> gn.value == gn["dNSName"]
5479 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5481 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5482 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5484 __slots__ = ("specs",)
5486 asn1_type_name = "CHOICE"
5499 :param value: set the value. Either ``(choice, value)`` tuple, or
5500 :py:class:`pyderasn.Choice` object
5501 :param bytes impl: can not be set, do **not** use it
5502 :param bytes expl: override default tag with ``EXPLICIT`` one
5503 :param default: set default value. Type same as in ``value``
5504 :param bool optional: is object ``OPTIONAL`` in sequence
5506 if impl is not None:
5507 raise ValueError("no implicit tag allowed for CHOICE")
5508 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5510 schema = getattr(self, "schema", ())
5511 if len(schema) == 0:
5512 raise ValueError("schema must be specified")
5514 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5517 if value is not None:
5518 self._value = self._value_sanitize(value)
5519 if default is not None:
5520 default_value = self._value_sanitize(default)
5521 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5522 default_obj.specs = self.specs
5523 default_obj._value = default_value
5524 self.default = default_obj
5526 self._value = copy(default_obj._value)
5527 if self._expl is not None:
5528 tag_class, _, tag_num = tag_decode(self._expl)
5529 self._tag_order = (tag_class, tag_num)
5531 def _value_sanitize(self, value):
5532 if (value.__class__ == tuple) and len(value) == 2:
5534 spec = self.specs.get(choice)
5536 raise ObjUnknown(choice)
5537 if not isinstance(obj, spec.__class__):
5538 raise InvalidValueType((spec,))
5539 return (choice, spec(obj))
5540 if isinstance(value, self.__class__):
5542 raise InvalidValueType((self.__class__, tuple))
5546 return self._value is not None and self._value[1].ready
5550 return self.expl_lenindef or (
5551 (self._value is not None) and
5552 self._value[1].bered
5555 def __getstate__(self):
5573 def __setstate__(self, state):
5574 super(Choice, self).__setstate__(state)
5575 self.specs = state.specs
5576 self._value = state.value
5578 def __eq__(self, their):
5579 if (their.__class__ == tuple) and len(their) == 2:
5580 return self._value == their
5581 if not isinstance(their, self.__class__):
5584 self.specs == their.specs and
5585 self._value == their._value
5595 return self.__class__(
5598 expl=self._expl if expl is None else expl,
5599 default=self.default if default is None else default,
5600 optional=self.optional if optional is None else optional,
5605 """Name of the choice
5607 self._assert_ready()
5608 return self._value[0]
5612 """Value of underlying choice
5614 self._assert_ready()
5615 return self._value[1]
5618 def tag_order(self):
5619 self._assert_ready()
5620 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5623 def tag_order_cer(self):
5624 return min(v.tag_order_cer for v in itervalues(self.specs))
5626 def __getitem__(self, key):
5627 if key not in self.specs:
5628 raise ObjUnknown(key)
5629 if self._value is None:
5631 choice, value = self._value
5636 def __setitem__(self, key, value):
5637 spec = self.specs.get(key)
5639 raise ObjUnknown(key)
5640 if not isinstance(value, spec.__class__):
5641 raise InvalidValueType((spec.__class__,))
5642 self._value = (key, spec(value))
5650 return self._value[1].decoded if self.ready else False
5653 self._assert_ready()
5654 return self._value[1].encode()
5656 def _encode1st(self, state):
5657 self._assert_ready()
5658 return self._value[1].encode1st(state)
5660 def _encode2nd(self, writer, state_iter):
5661 self._value[1].encode2nd(writer, state_iter)
5663 def _encode_cer(self, writer):
5664 self._assert_ready()
5665 self._value[1].encode_cer(writer)
5667 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5668 for choice, spec in iteritems(self.specs):
5669 sub_decode_path = decode_path + (choice,)
5675 decode_path=sub_decode_path,
5678 _ctx_immutable=False,
5685 klass=self.__class__,
5686 decode_path=decode_path,
5689 if tag_only: # pragma: no cover
5693 for _decode_path, value, tail in spec.decode_evgen(
5697 decode_path=sub_decode_path,
5699 _ctx_immutable=False,
5701 yield _decode_path, value, tail
5703 _, value, tail = next(spec.decode_evgen(
5707 decode_path=sub_decode_path,
5709 _ctx_immutable=False,
5712 obj = self.__class__(
5715 default=self.default,
5716 optional=self.optional,
5717 _decoded=(offset, 0, value.fulllen),
5719 obj._value = (choice, value)
5720 yield decode_path, obj, tail
5723 value = pp_console_row(next(self.pps()))
5725 value = "%s[%r]" % (value, self.value)
5728 def pps(self, decode_path=()):
5731 asn1_type_name=self.asn1_type_name,
5732 obj_name=self.__class__.__name__,
5733 decode_path=decode_path,
5734 value=self.choice if self.ready else None,
5735 optional=self.optional,
5736 default=self == self.default,
5737 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5738 expl=None if self._expl is None else tag_decode(self._expl),
5743 expl_lenindef=self.expl_lenindef,
5747 yield self.value.pps(decode_path=decode_path + (self.choice,))
5748 for pp in self.pps_lenindef(decode_path):
5752 class PrimitiveTypes(Choice):
5753 """Predefined ``CHOICE`` for all generic primitive types
5755 It could be useful for general decoding of some unspecified values:
5757 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5758 OCTET STRING 3 bytes 666f6f
5759 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5763 schema = tuple((klass.__name__, klass()) for klass in (
5787 AnyState = namedtuple(
5789 BasicState._fields + ("value", "defined"),
5795 """``ANY`` special type
5797 >>> Any(Integer(-123))
5798 ANY INTEGER -123 (0X:7B)
5799 >>> a = Any(OctetString(b"hello world").encode())
5800 ANY 040b68656c6c6f20776f726c64
5801 >>> hexenc(bytes(a))
5802 b'0x040x0bhello world'
5804 __slots__ = ("defined",)
5805 tag_default = tag_encode(0)
5806 asn1_type_name = "ANY"
5816 :param value: set the value. Either any kind of pyderasn's
5817 **ready** object, or bytes. Pay attention that
5818 **no** validation is performed if raw binary value
5819 is valid TLV, except just tag decoding
5820 :param bytes expl: override default tag with ``EXPLICIT`` one
5821 :param bool optional: is object ``OPTIONAL`` in sequence
5823 super(Any, self).__init__(None, expl, None, optional, _decoded)
5827 value = self._value_sanitize(value)
5829 if self._expl is None:
5830 if value.__class__ == binary_type:
5831 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5833 tag_class, tag_num = value.tag_order
5835 tag_class, _, tag_num = tag_decode(self._expl)
5836 self._tag_order = (tag_class, tag_num)
5839 def _value_sanitize(self, value):
5840 if value.__class__ == binary_type:
5842 raise ValueError("Any value can not be empty")
5844 if isinstance(value, self.__class__):
5846 if not isinstance(value, Obj):
5847 raise InvalidValueType((self.__class__, Obj, binary_type))
5852 return self._value is not None
5855 def tag_order(self):
5856 self._assert_ready()
5857 return self._tag_order
5861 if self.expl_lenindef or self.lenindef:
5863 if self.defined is None:
5865 return self.defined[1].bered
5867 def __getstate__(self):
5885 def __setstate__(self, state):
5886 super(Any, self).__setstate__(state)
5887 self._value = state.value
5888 self.defined = state.defined
5890 def __eq__(self, their):
5891 if their.__class__ == binary_type:
5892 if self._value.__class__ == binary_type:
5893 return self._value == their
5894 return self._value.encode() == their
5895 if issubclass(their.__class__, Any):
5896 if self.ready and their.ready:
5897 return bytes(self) == bytes(their)
5898 return self.ready == their.ready
5907 return self.__class__(
5909 expl=self._expl if expl is None else expl,
5910 optional=self.optional if optional is None else optional,
5913 def __bytes__(self):
5914 self._assert_ready()
5916 if value.__class__ == binary_type:
5918 return self._value.encode()
5925 self._assert_ready()
5927 if value.__class__ == binary_type:
5929 return value.encode()
5931 def _encode1st(self, state):
5932 self._assert_ready()
5934 if value.__class__ == binary_type:
5935 return len(value), state
5936 return value.encode1st(state)
5938 def _encode2nd(self, writer, state_iter):
5940 if value.__class__ == binary_type:
5941 write_full(writer, value)
5943 value.encode2nd(writer, state_iter)
5945 def _encode_cer(self, writer):
5946 self._assert_ready()
5948 if value.__class__ == binary_type:
5949 write_full(writer, value)
5951 value.encode_cer(writer)
5953 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5955 t, tlen, lv = tag_strip(tlv)
5956 except DecodeError as err:
5957 raise err.__class__(
5959 klass=self.__class__,
5960 decode_path=decode_path,
5964 l, llen, v = len_decode(lv)
5965 except LenIndefForm as err:
5966 if not ctx.get("bered", False):
5967 raise err.__class__(
5969 klass=self.__class__,
5970 decode_path=decode_path,
5973 llen, vlen, v = 1, 0, lv[1:]
5974 sub_offset = offset + tlen + llen
5976 while v[:EOC_LEN].tobytes() != EOC:
5977 chunk, v = Any().decode(
5980 decode_path=decode_path + (str(chunk_i),),
5983 _ctx_immutable=False,
5985 vlen += chunk.tlvlen
5986 sub_offset += chunk.tlvlen
5988 tlvlen = tlen + llen + vlen + EOC_LEN
5989 obj = self.__class__(
5990 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5992 optional=self.optional,
5993 _decoded=(offset, 0, tlvlen),
5996 obj.tag = t.tobytes()
5997 yield decode_path, obj, v[EOC_LEN:]
5999 except DecodeError as err:
6000 raise err.__class__(
6002 klass=self.__class__,
6003 decode_path=decode_path,
6007 raise NotEnoughData(
6008 "encoded length is longer than data",
6009 klass=self.__class__,
6010 decode_path=decode_path,
6013 tlvlen = tlen + llen + l
6014 v, tail = tlv[:tlvlen], v[l:]
6015 obj = self.__class__(
6016 value=None if evgen_mode else v.tobytes(),
6018 optional=self.optional,
6019 _decoded=(offset, 0, tlvlen),
6021 obj.tag = t.tobytes()
6022 yield decode_path, obj, tail
6025 return pp_console_row(next(self.pps()))
6027 def pps(self, decode_path=()):
6031 elif value.__class__ == binary_type:
6037 asn1_type_name=self.asn1_type_name,
6038 obj_name=self.__class__.__name__,
6039 decode_path=decode_path,
6041 blob=self._value if self._value.__class__ == binary_type else None,
6042 optional=self.optional,
6043 default=self == self.default,
6044 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6045 expl=None if self._expl is None else tag_decode(self._expl),
6050 expl_offset=self.expl_offset if self.expled else None,
6051 expl_tlen=self.expl_tlen if self.expled else None,
6052 expl_llen=self.expl_llen if self.expled else None,
6053 expl_vlen=self.expl_vlen if self.expled else None,
6054 expl_lenindef=self.expl_lenindef,
6055 lenindef=self.lenindef,
6058 defined_by, defined = self.defined or (None, None)
6059 if defined_by is not None:
6061 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6063 for pp in self.pps_lenindef(decode_path):
6067 ########################################################################
6068 # ASN.1 constructed types
6069 ########################################################################
6071 def abs_decode_path(decode_path, rel_path):
6072 """Create an absolute decode path from current and relative ones
6074 :param decode_path: current decode path, starting point. Tuple of strings
6075 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6076 If first tuple's element is "/", then treat it as
6077 an absolute path, ignoring ``decode_path`` as
6078 starting point. Also this tuple can contain ".."
6079 elements, stripping the leading element from
6082 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6083 ("foo", "bar", "baz", "whatever")
6084 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6086 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6089 if rel_path[0] == "/":
6091 if rel_path[0] == "..":
6092 return abs_decode_path(decode_path[:-1], rel_path[1:])
6093 return decode_path + rel_path
6096 SequenceState = namedtuple(
6098 BasicState._fields + ("specs", "value",),
6103 class SequenceEncode1stMixing(object):
6104 def _encode1st(self, state):
6106 idx = len(state) - 1
6108 for v in self._values_for_encoding():
6109 l, _ = v.encode1st(state)
6112 return len(self.tag) + len_size(vlen) + vlen, state
6115 class Sequence(SequenceEncode1stMixing, Obj):
6116 """``SEQUENCE`` structure type
6118 You have to make specification of sequence::
6120 class Extension(Sequence):
6122 ("extnID", ObjectIdentifier()),
6123 ("critical", Boolean(default=False)),
6124 ("extnValue", OctetString()),
6127 Then, you can work with it as with dictionary.
6129 >>> ext = Extension()
6130 >>> Extension().specs
6132 ('extnID', OBJECT IDENTIFIER),
6133 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6134 ('extnValue', OCTET STRING),
6136 >>> ext["extnID"] = "1.2.3"
6137 Traceback (most recent call last):
6138 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6139 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6141 You can determine if sequence is ready to be encoded:
6146 Traceback (most recent call last):
6147 pyderasn.ObjNotReady: object is not ready: extnValue
6148 >>> ext["extnValue"] = OctetString(b"foobar")
6152 Value you want to assign, must have the same **type** as in
6153 corresponding specification, but it can have different tags,
6154 optional/default attributes -- they will be taken from specification
6157 class TBSCertificate(Sequence):
6159 ("version", Version(expl=tag_ctxc(0), default="v1")),
6162 >>> tbs = TBSCertificate()
6163 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6165 Assign ``None`` to remove value from sequence.
6167 You can set values in Sequence during its initialization:
6169 >>> AlgorithmIdentifier((
6170 ("algorithm", ObjectIdentifier("1.2.3")),
6171 ("parameters", Any(Null()))
6173 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6175 You can determine if value exists/set in the sequence and take its value:
6177 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6180 OBJECT IDENTIFIER 1.2.3
6182 But pay attention that if value has default, then it won't be (not
6183 in) in the sequence (because ``DEFAULT`` must not be encoded in
6184 DER), but you can read its value:
6186 >>> "critical" in ext, ext["critical"]
6187 (False, BOOLEAN False)
6188 >>> ext["critical"] = Boolean(True)
6189 >>> "critical" in ext, ext["critical"]
6190 (True, BOOLEAN True)
6192 All defaulted values are always optional.
6194 .. _allow_default_values_ctx:
6196 DER prohibits default value encoding and will raise an error if
6197 default value is unexpectedly met during decode.
6198 If :ref:`bered <bered_ctx>` context option is set, then no error
6199 will be raised, but ``bered`` attribute set. You can disable strict
6200 defaulted values existence validation by setting
6201 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6203 All values with DEFAULT specified are decoded atomically in
6204 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6205 SEQUENCE, then it will be yielded as a single element, not
6206 disassembled. That is required for DEFAULT existence check.
6208 Two sequences are equal if they have equal specification (schema),
6209 implicit/explicit tagging and the same values.
6211 __slots__ = ("specs",)
6212 tag_default = tag_encode(form=TagFormConstructed, num=16)
6213 asn1_type_name = "SEQUENCE"
6225 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6227 schema = getattr(self, "schema", ())
6229 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6232 if value is not None:
6233 if issubclass(value.__class__, Sequence):
6234 self._value = value._value
6235 elif hasattr(value, "__iter__"):
6236 for seq_key, seq_value in value:
6237 self[seq_key] = seq_value
6239 raise InvalidValueType((Sequence,))
6240 if default is not None:
6241 if not issubclass(default.__class__, Sequence):
6242 raise InvalidValueType((Sequence,))
6243 default_value = default._value
6244 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6245 default_obj.specs = self.specs
6246 default_obj._value = default_value
6247 self.default = default_obj
6249 self._value = copy(default_obj._value)
6253 for name, spec in iteritems(self.specs):
6254 value = self._value.get(name)
6265 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6267 return any(value.bered for value in itervalues(self._value))
6269 def __getstate__(self):
6270 return SequenceState(
6284 {k: copy(v) for k, v in iteritems(self._value)},
6287 def __setstate__(self, state):
6288 super(Sequence, self).__setstate__(state)
6289 self.specs = state.specs
6290 self._value = state.value
6292 def __eq__(self, their):
6293 if not isinstance(their, self.__class__):
6296 self.specs == their.specs and
6297 self.tag == their.tag and
6298 self._expl == their._expl and
6299 self._value == their._value
6310 return self.__class__(
6313 impl=self.tag if impl is None else impl,
6314 expl=self._expl if expl is None else expl,
6315 default=self.default if default is None else default,
6316 optional=self.optional if optional is None else optional,
6319 def __contains__(self, key):
6320 return key in self._value
6322 def __setitem__(self, key, value):
6323 spec = self.specs.get(key)
6325 raise ObjUnknown(key)
6327 self._value.pop(key, None)
6329 if not isinstance(value, spec.__class__):
6330 raise InvalidValueType((spec.__class__,))
6331 value = spec(value=value)
6332 if spec.default is not None and value == spec.default:
6333 self._value.pop(key, None)
6335 self._value[key] = value
6337 def __getitem__(self, key):
6338 value = self._value.get(key)
6339 if value is not None:
6341 spec = self.specs.get(key)
6343 raise ObjUnknown(key)
6344 if spec.default is not None:
6348 def _values_for_encoding(self):
6349 for name, spec in iteritems(self.specs):
6350 value = self._value.get(name)
6354 raise ObjNotReady(name)
6358 v = b"".join(v.encode() for v in self._values_for_encoding())
6359 return b"".join((self.tag, len_encode(len(v)), v))
6361 def _encode2nd(self, writer, state_iter):
6362 write_full(writer, self.tag + len_encode(next(state_iter)))
6363 for v in self._values_for_encoding():
6364 v.encode2nd(writer, state_iter)
6366 def _encode_cer(self, writer):
6367 write_full(writer, self.tag + LENINDEF)
6368 for v in self._values_for_encoding():
6369 v.encode_cer(writer)
6370 write_full(writer, EOC)
6372 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6374 t, tlen, lv = tag_strip(tlv)
6375 except DecodeError as err:
6376 raise err.__class__(
6378 klass=self.__class__,
6379 decode_path=decode_path,
6384 klass=self.__class__,
6385 decode_path=decode_path,
6388 if tag_only: # pragma: no cover
6392 ctx_bered = ctx.get("bered", False)
6394 l, llen, v = len_decode(lv)
6395 except LenIndefForm as err:
6397 raise err.__class__(
6399 klass=self.__class__,
6400 decode_path=decode_path,
6403 l, llen, v = 0, 1, lv[1:]
6405 except DecodeError as err:
6406 raise err.__class__(
6408 klass=self.__class__,
6409 decode_path=decode_path,
6413 raise NotEnoughData(
6414 "encoded length is longer than data",
6415 klass=self.__class__,
6416 decode_path=decode_path,
6420 v, tail = v[:l], v[l:]
6422 sub_offset = offset + tlen + llen
6425 ctx_allow_default_values = ctx.get("allow_default_values", False)
6426 for name, spec in iteritems(self.specs):
6427 if spec.optional and (
6428 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6432 spec_defaulted = spec.default is not None
6433 sub_decode_path = decode_path + (name,)
6435 if evgen_mode and not spec_defaulted:
6436 for _decode_path, value, v_tail in spec.decode_evgen(
6440 decode_path=sub_decode_path,
6442 _ctx_immutable=False,
6444 yield _decode_path, value, v_tail
6446 _, value, v_tail = next(spec.decode_evgen(
6450 decode_path=sub_decode_path,
6452 _ctx_immutable=False,
6455 except TagMismatch as err:
6456 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6460 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6461 if not evgen_mode and defined is not None:
6462 defined_by, defined_spec = defined
6463 if issubclass(value.__class__, SequenceOf):
6464 for i, _value in enumerate(value):
6465 sub_sub_decode_path = sub_decode_path + (
6467 DecodePathDefBy(defined_by),
6469 defined_value, defined_tail = defined_spec.decode(
6470 memoryview(bytes(_value)),
6472 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6473 if value.expled else (value.tlen + value.llen)
6476 decode_path=sub_sub_decode_path,
6478 _ctx_immutable=False,
6480 if len(defined_tail) > 0:
6483 klass=self.__class__,
6484 decode_path=sub_sub_decode_path,
6487 _value.defined = (defined_by, defined_value)
6489 defined_value, defined_tail = defined_spec.decode(
6490 memoryview(bytes(value)),
6492 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6493 if value.expled else (value.tlen + value.llen)
6496 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6498 _ctx_immutable=False,
6500 if len(defined_tail) > 0:
6503 klass=self.__class__,
6504 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6507 value.defined = (defined_by, defined_value)
6509 value_len = value.fulllen
6511 sub_offset += value_len
6515 yield sub_decode_path, value, v_tail
6516 if value == spec.default:
6517 if ctx_bered or ctx_allow_default_values:
6521 "DEFAULT value met",
6522 klass=self.__class__,
6523 decode_path=sub_decode_path,
6527 values[name] = value
6528 spec_defines = getattr(spec, "defines", ())
6529 if len(spec_defines) == 0:
6530 defines_by_path = ctx.get("defines_by_path", ())
6531 if len(defines_by_path) > 0:
6532 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6533 if spec_defines is not None and len(spec_defines) > 0:
6534 for rel_path, schema in spec_defines:
6535 defined = schema.get(value, None)
6536 if defined is not None:
6537 ctx.setdefault("_defines", []).append((
6538 abs_decode_path(sub_decode_path[:-1], rel_path),
6542 if v[:EOC_LEN].tobytes() != EOC:
6545 klass=self.__class__,
6546 decode_path=decode_path,
6554 klass=self.__class__,
6555 decode_path=decode_path,
6558 obj = self.__class__(
6562 default=self.default,
6563 optional=self.optional,
6564 _decoded=(offset, llen, vlen),
6567 obj.lenindef = lenindef
6568 obj.ber_encoded = ber_encoded
6569 yield decode_path, obj, tail
6572 value = pp_console_row(next(self.pps()))
6574 for name in self.specs:
6575 _value = self._value.get(name)
6578 cols.append("%s: %s" % (name, repr(_value)))
6579 return "%s[%s]" % (value, "; ".join(cols))
6581 def pps(self, decode_path=()):
6584 asn1_type_name=self.asn1_type_name,
6585 obj_name=self.__class__.__name__,
6586 decode_path=decode_path,
6587 optional=self.optional,
6588 default=self == self.default,
6589 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6590 expl=None if self._expl is None else tag_decode(self._expl),
6595 expl_offset=self.expl_offset if self.expled else None,
6596 expl_tlen=self.expl_tlen if self.expled else None,
6597 expl_llen=self.expl_llen if self.expled else None,
6598 expl_vlen=self.expl_vlen if self.expled else None,
6599 expl_lenindef=self.expl_lenindef,
6600 lenindef=self.lenindef,
6601 ber_encoded=self.ber_encoded,
6604 for name in self.specs:
6605 value = self._value.get(name)
6608 yield value.pps(decode_path=decode_path + (name,))
6609 for pp in self.pps_lenindef(decode_path):
6613 class Set(Sequence, SequenceEncode1stMixing):
6614 """``SET`` structure type
6616 Its usage is identical to :py:class:`pyderasn.Sequence`.
6618 .. _allow_unordered_set_ctx:
6620 DER prohibits unordered values encoding and will raise an error
6621 during decode. If :ref:`bered <bered_ctx>` context option is set,
6622 then no error will occur. Also you can disable strict values
6623 ordering check by setting ``"allow_unordered_set": True``
6624 :ref:`context <ctx>` option.
6627 tag_default = tag_encode(form=TagFormConstructed, num=17)
6628 asn1_type_name = "SET"
6630 def _values_for_encoding(self):
6632 super(Set, self)._values_for_encoding(),
6633 key=attrgetter("tag_order"),
6636 def _encode_cer(self, writer):
6637 write_full(writer, self.tag + LENINDEF)
6639 super(Set, self)._values_for_encoding(),
6640 key=attrgetter("tag_order_cer"),
6642 v.encode_cer(writer)
6643 write_full(writer, EOC)
6645 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6647 t, tlen, lv = tag_strip(tlv)
6648 except DecodeError as err:
6649 raise err.__class__(
6651 klass=self.__class__,
6652 decode_path=decode_path,
6657 klass=self.__class__,
6658 decode_path=decode_path,
6665 ctx_bered = ctx.get("bered", False)
6667 l, llen, v = len_decode(lv)
6668 except LenIndefForm as err:
6670 raise err.__class__(
6672 klass=self.__class__,
6673 decode_path=decode_path,
6676 l, llen, v = 0, 1, lv[1:]
6678 except DecodeError as err:
6679 raise err.__class__(
6681 klass=self.__class__,
6682 decode_path=decode_path,
6686 raise NotEnoughData(
6687 "encoded length is longer than data",
6688 klass=self.__class__,
6692 v, tail = v[:l], v[l:]
6694 sub_offset = offset + tlen + llen
6697 ctx_allow_default_values = ctx.get("allow_default_values", False)
6698 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6699 tag_order_prev = (0, 0)
6700 _specs_items = copy(self.specs)
6703 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6705 for name, spec in iteritems(_specs_items):
6706 sub_decode_path = decode_path + (name,)
6712 decode_path=sub_decode_path,
6715 _ctx_immutable=False,
6722 klass=self.__class__,
6723 decode_path=decode_path,
6726 spec_defaulted = spec.default is not None
6727 if evgen_mode and not spec_defaulted:
6728 for _decode_path, value, v_tail in spec.decode_evgen(
6732 decode_path=sub_decode_path,
6734 _ctx_immutable=False,
6736 yield _decode_path, value, v_tail
6738 _, value, v_tail = next(spec.decode_evgen(
6742 decode_path=sub_decode_path,
6744 _ctx_immutable=False,
6747 value_tag_order = value.tag_order
6748 value_len = value.fulllen
6749 if tag_order_prev >= value_tag_order:
6750 if ctx_bered or ctx_allow_unordered_set:
6754 "unordered " + self.asn1_type_name,
6755 klass=self.__class__,
6756 decode_path=sub_decode_path,
6761 yield sub_decode_path, value, v_tail
6762 if value != spec.default:
6764 elif ctx_bered or ctx_allow_default_values:
6768 "DEFAULT value met",
6769 klass=self.__class__,
6770 decode_path=sub_decode_path,
6773 values[name] = value
6774 del _specs_items[name]
6775 tag_order_prev = value_tag_order
6776 sub_offset += value_len
6780 obj = self.__class__(
6784 default=self.default,
6785 optional=self.optional,
6786 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6789 if v[:EOC_LEN].tobytes() != EOC:
6792 klass=self.__class__,
6793 decode_path=decode_path,
6798 for name, spec in iteritems(self.specs):
6799 if name not in values and not spec.optional:
6801 "%s value is not ready" % name,
6802 klass=self.__class__,
6803 decode_path=decode_path,
6808 obj.ber_encoded = ber_encoded
6809 yield decode_path, obj, tail
6812 SequenceOfState = namedtuple(
6814 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6819 class SequenceOf(SequenceEncode1stMixing, Obj):
6820 """``SEQUENCE OF`` sequence type
6822 For that kind of type you must specify the object it will carry on
6823 (bounds are for example here, not required)::
6825 class Ints(SequenceOf):
6830 >>> ints.append(Integer(123))
6831 >>> ints.append(Integer(234))
6833 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6834 >>> [int(i) for i in ints]
6836 >>> ints.append(Integer(345))
6837 Traceback (most recent call last):
6838 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6841 >>> ints[1] = Integer(345)
6843 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6845 You can initialize sequence with preinitialized values:
6847 >>> ints = Ints([Integer(123), Integer(234)])
6849 Also you can use iterator as a value:
6851 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6853 And it won't be iterated until encoding process. Pay attention that
6854 bounds and required schema checks are done only during the encoding
6855 process in that case! After encode was called, then value is zeroed
6856 back to empty list and you have to set it again. That mode is useful
6857 mainly with CER encoding mode, where all objects from the iterable
6858 will be streamed to the buffer, without copying all of them to
6861 __slots__ = ("spec", "_bound_min", "_bound_max")
6862 tag_default = tag_encode(form=TagFormConstructed, num=16)
6863 asn1_type_name = "SEQUENCE OF"
6876 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6878 schema = getattr(self, "schema", None)
6880 raise ValueError("schema must be specified")
6882 self._bound_min, self._bound_max = getattr(
6886 ) if bounds is None else bounds
6888 if value is not None:
6889 self._value = self._value_sanitize(value)
6890 if default is not None:
6891 default_value = self._value_sanitize(default)
6892 default_obj = self.__class__(
6897 default_obj._value = default_value
6898 self.default = default_obj
6900 self._value = copy(default_obj._value)
6902 def _value_sanitize(self, value):
6904 if issubclass(value.__class__, SequenceOf):
6905 value = value._value
6906 elif hasattr(value, NEXT_ATTR_NAME):
6908 elif hasattr(value, "__iter__"):
6911 raise InvalidValueType((self.__class__, iter, "iterator"))
6913 if not self._bound_min <= len(value) <= self._bound_max:
6914 raise BoundsError(self._bound_min, len(value), self._bound_max)
6915 class_expected = self.spec.__class__
6917 if not isinstance(v, class_expected):
6918 raise InvalidValueType((class_expected,))
6923 if hasattr(self._value, NEXT_ATTR_NAME):
6925 if self._bound_min > 0 and len(self._value) == 0:
6927 return all(v.ready for v in self._value)
6931 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6933 return any(v.bered for v in self._value)
6935 def __getstate__(self):
6936 if hasattr(self._value, NEXT_ATTR_NAME):
6937 raise ValueError("can not pickle SequenceOf with iterator")
6938 return SequenceOfState(
6952 [copy(v) for v in self._value],
6957 def __setstate__(self, state):
6958 super(SequenceOf, self).__setstate__(state)
6959 self.spec = state.spec
6960 self._value = state.value
6961 self._bound_min = state.bound_min
6962 self._bound_max = state.bound_max
6964 def __eq__(self, their):
6965 if isinstance(their, self.__class__):
6967 self.spec == their.spec and
6968 self.tag == their.tag and
6969 self._expl == their._expl and
6970 self._value == their._value
6972 if hasattr(their, "__iter__"):
6973 return self._value == list(their)
6985 return self.__class__(
6989 (self._bound_min, self._bound_max)
6990 if bounds is None else bounds
6992 impl=self.tag if impl is None else impl,
6993 expl=self._expl if expl is None else expl,
6994 default=self.default if default is None else default,
6995 optional=self.optional if optional is None else optional,
6998 def __contains__(self, key):
6999 return key in self._value
7001 def append(self, value):
7002 if not isinstance(value, self.spec.__class__):
7003 raise InvalidValueType((self.spec.__class__,))
7004 if len(self._value) + 1 > self._bound_max:
7007 len(self._value) + 1,
7010 self._value.append(value)
7013 return iter(self._value)
7016 return len(self._value)
7018 def __setitem__(self, key, value):
7019 if not isinstance(value, self.spec.__class__):
7020 raise InvalidValueType((self.spec.__class__,))
7021 self._value[key] = self.spec(value=value)
7023 def __getitem__(self, key):
7024 return self._value[key]
7026 def _values_for_encoding(self):
7027 return iter(self._value)
7030 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7033 values_append = values.append
7034 class_expected = self.spec.__class__
7035 values_for_encoding = self._values_for_encoding()
7037 for v in values_for_encoding:
7038 if not isinstance(v, class_expected):
7039 raise InvalidValueType((class_expected,))
7040 values_append(v.encode())
7041 if not self._bound_min <= len(values) <= self._bound_max:
7042 raise BoundsError(self._bound_min, len(values), self._bound_max)
7043 value = b"".join(values)
7045 value = b"".join(v.encode() for v in self._values_for_encoding())
7046 return b"".join((self.tag, len_encode(len(value)), value))
7048 def _encode1st(self, state):
7049 state = super(SequenceOf, self)._encode1st(state)
7050 if hasattr(self._value, NEXT_ATTR_NAME):
7054 def _encode2nd(self, writer, state_iter):
7055 write_full(writer, self.tag + len_encode(next(state_iter)))
7056 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7059 class_expected = self.spec.__class__
7060 values_for_encoding = self._values_for_encoding()
7062 for v in values_for_encoding:
7063 if not isinstance(v, class_expected):
7064 raise InvalidValueType((class_expected,))
7065 v.encode2nd(writer, state_iter)
7067 if not self._bound_min <= values_count <= self._bound_max:
7068 raise BoundsError(self._bound_min, values_count, self._bound_max)
7070 for v in self._values_for_encoding():
7071 v.encode2nd(writer, state_iter)
7073 def _encode_cer(self, writer):
7074 write_full(writer, self.tag + LENINDEF)
7075 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7077 class_expected = self.spec.__class__
7079 values_for_encoding = self._values_for_encoding()
7081 for v in values_for_encoding:
7082 if not isinstance(v, class_expected):
7083 raise InvalidValueType((class_expected,))
7084 v.encode_cer(writer)
7086 if not self._bound_min <= values_count <= self._bound_max:
7087 raise BoundsError(self._bound_min, values_count, self._bound_max)
7089 for v in self._values_for_encoding():
7090 v.encode_cer(writer)
7091 write_full(writer, EOC)
7101 ordering_check=False,
7104 t, tlen, lv = tag_strip(tlv)
7105 except DecodeError as err:
7106 raise err.__class__(
7108 klass=self.__class__,
7109 decode_path=decode_path,
7114 klass=self.__class__,
7115 decode_path=decode_path,
7122 ctx_bered = ctx.get("bered", False)
7124 l, llen, v = len_decode(lv)
7125 except LenIndefForm as err:
7127 raise err.__class__(
7129 klass=self.__class__,
7130 decode_path=decode_path,
7133 l, llen, v = 0, 1, lv[1:]
7135 except DecodeError as err:
7136 raise err.__class__(
7138 klass=self.__class__,
7139 decode_path=decode_path,
7143 raise NotEnoughData(
7144 "encoded length is longer than data",
7145 klass=self.__class__,
7146 decode_path=decode_path,
7150 v, tail = v[:l], v[l:]
7152 sub_offset = offset + tlen + llen
7155 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7156 value_prev = memoryview(v[:0])
7160 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7162 sub_decode_path = decode_path + (str(_value_count),)
7164 for _decode_path, value, v_tail in spec.decode_evgen(
7168 decode_path=sub_decode_path,
7170 _ctx_immutable=False,
7172 yield _decode_path, value, v_tail
7174 _, value, v_tail = next(spec.decode_evgen(
7178 decode_path=sub_decode_path,
7180 _ctx_immutable=False,
7183 value_len = value.fulllen
7185 if value_prev.tobytes() > v[:value_len].tobytes():
7186 if ctx_bered or ctx_allow_unordered_set:
7190 "unordered " + self.asn1_type_name,
7191 klass=self.__class__,
7192 decode_path=sub_decode_path,
7195 value_prev = v[:value_len]
7198 _value.append(value)
7199 sub_offset += value_len
7202 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7204 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7205 klass=self.__class__,
7206 decode_path=decode_path,
7210 obj = self.__class__(
7211 value=None if evgen_mode else _value,
7213 bounds=(self._bound_min, self._bound_max),
7216 default=self.default,
7217 optional=self.optional,
7218 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7220 except BoundsError as err:
7223 klass=self.__class__,
7224 decode_path=decode_path,
7228 if v[:EOC_LEN].tobytes() != EOC:
7231 klass=self.__class__,
7232 decode_path=decode_path,
7237 obj.ber_encoded = ber_encoded
7238 yield decode_path, obj, tail
7242 pp_console_row(next(self.pps())),
7243 ", ".join(repr(v) for v in self._value),
7246 def pps(self, decode_path=()):
7249 asn1_type_name=self.asn1_type_name,
7250 obj_name=self.__class__.__name__,
7251 decode_path=decode_path,
7252 optional=self.optional,
7253 default=self == self.default,
7254 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7255 expl=None if self._expl is None else tag_decode(self._expl),
7260 expl_offset=self.expl_offset if self.expled else None,
7261 expl_tlen=self.expl_tlen if self.expled else None,
7262 expl_llen=self.expl_llen if self.expled else None,
7263 expl_vlen=self.expl_vlen if self.expled else None,
7264 expl_lenindef=self.expl_lenindef,
7265 lenindef=self.lenindef,
7266 ber_encoded=self.ber_encoded,
7269 for i, value in enumerate(self._value):
7270 yield value.pps(decode_path=decode_path + (str(i),))
7271 for pp in self.pps_lenindef(decode_path):
7275 class SetOf(SequenceOf):
7276 """``SET OF`` sequence type
7278 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7281 tag_default = tag_encode(form=TagFormConstructed, num=17)
7282 asn1_type_name = "SET OF"
7284 def _value_sanitize(self, value):
7285 value = super(SetOf, self)._value_sanitize(value)
7286 if hasattr(value, NEXT_ATTR_NAME):
7288 "SetOf does not support iterator values, as no sense in them"
7293 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7294 return b"".join((self.tag, len_encode(len(v)), v))
7296 def _encode2nd(self, writer, state_iter):
7297 write_full(writer, self.tag + len_encode(next(state_iter)))
7299 for v in self._values_for_encoding():
7301 v.encode2nd(buf.write, state_iter)
7302 values.append(buf.getvalue())
7305 write_full(writer, v)
7307 def _encode_cer(self, writer):
7308 write_full(writer, self.tag + LENINDEF)
7309 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7310 write_full(writer, v)
7311 write_full(writer, EOC)
7313 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7314 return super(SetOf, self)._decode(
7321 ordering_check=True,
7325 def obj_by_path(pypath): # pragma: no cover
7326 """Import object specified as string Python path
7328 Modules must be separated from classes/functions with ``:``.
7330 >>> obj_by_path("foo.bar:Baz")
7331 <class 'foo.bar.Baz'>
7332 >>> obj_by_path("foo.bar:Baz.boo")
7333 <classmethod 'foo.bar.Baz.boo'>
7335 mod, objs = pypath.rsplit(":", 1)
7336 from importlib import import_module
7337 obj = import_module(mod)
7338 for obj_name in objs.split("."):
7339 obj = getattr(obj, obj_name)
7343 def generic_decoder(): # pragma: no cover
7344 # All of this below is a big hack with self references
7345 choice = PrimitiveTypes()
7346 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7347 choice.specs["SetOf"] = SetOf(schema=choice)
7348 for i in six_xrange(31):
7349 choice.specs["SequenceOf%d" % i] = SequenceOf(
7353 choice.specs["Any"] = Any()
7355 # Class name equals to type name, to omit it from output
7356 class SEQUENCEOF(SequenceOf):
7364 with_decode_path=False,
7365 decode_path_only=(),
7368 def _pprint_pps(pps):
7370 if hasattr(pp, "_fields"):
7372 decode_path_only != () and
7373 pp.decode_path[:len(decode_path_only)] != decode_path_only
7376 if pp.asn1_type_name == Choice.asn1_type_name:
7378 pp_kwargs = pp._asdict()
7379 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7380 pp = _pp(**pp_kwargs)
7381 yield pp_console_row(
7386 with_colours=with_colours,
7387 with_decode_path=with_decode_path,
7388 decode_path_len_decrease=len(decode_path_only),
7390 for row in pp_console_blob(
7392 decode_path_len_decrease=len(decode_path_only),
7396 for row in _pprint_pps(pp):
7398 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7399 return SEQUENCEOF(), pprint_any
7402 def main(): # pragma: no cover
7404 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7405 parser.add_argument(
7409 help="Skip that number of bytes from the beginning",
7411 parser.add_argument(
7413 help="Python paths to dictionary with OIDs, comma separated",
7415 parser.add_argument(
7417 help="Python path to schema definition to use",
7419 parser.add_argument(
7420 "--defines-by-path",
7421 help="Python path to decoder's defines_by_path",
7423 parser.add_argument(
7425 action="store_true",
7426 help="Disallow BER encoding",
7428 parser.add_argument(
7429 "--print-decode-path",
7430 action="store_true",
7431 help="Print decode paths",
7433 parser.add_argument(
7434 "--decode-path-only",
7435 help="Print only specified decode path",
7437 parser.add_argument(
7439 action="store_true",
7440 help="Allow explicit tag out-of-bound",
7442 parser.add_argument(
7444 action="store_true",
7445 help="Turn on event generation mode",
7447 parser.add_argument(
7449 type=argparse.FileType("rb"),
7450 help="Path to BER/CER/DER file you want to decode",
7452 args = parser.parse_args()
7454 args.RAWFile.seek(args.skip)
7455 raw = memoryview(args.RAWFile.read())
7456 args.RAWFile.close()
7458 raw = file_mmaped(args.RAWFile)[args.skip:]
7460 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7461 if args.oids else ()
7463 from functools import partial
7465 schema = obj_by_path(args.schema)
7466 pprinter = partial(pprint, big_blobs=True)
7468 schema, pprinter = generic_decoder()
7470 "bered": not args.nobered,
7471 "allow_expl_oob": args.allow_expl_oob,
7473 if args.defines_by_path is not None:
7474 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7475 from os import environ
7479 with_colours=environ.get("NO_COLOR") is None,
7480 with_decode_path=args.print_decode_path,
7482 () if args.decode_path_only is None else
7483 tuple(args.decode_path_only.split(":"))
7487 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7488 print(pprinter(obj, decode_path=decode_path))
7490 obj, tail = schema().decode(raw, ctx=ctx)
7491 print(pprinter(obj))
7493 print("\nTrailing data: %s" % hexenc(tail))
7496 if __name__ == "__main__":