3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures into ASN.1 DER
23 format and to unmarshal them from BER/CER/DER encodings.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects allow setting value size constraints. That is either the
154 allowed range of an integer value, or the allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use the :py:func:`pyderasn.pp_console_row` function to convert
250 those ``PP`` to a human-readable string. In fact, exactly that function is
251 used for every object's ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows whether the object contains any kind of BER encoded data (possibly
348 a Sequence holding a BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 The former is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meets --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all the certificates' serial numbers from it, once you
573 trust that CRL after signature validation? You can use the so-called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``revocationDate`` itself and
618 ``RevokedCertificate`` as one entity of the ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 It takes virtually no memory beyond what is needed to store a single
636 object. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However, it is not very convenient when you want the whole
648 ``RevokedCertificate`` structure, which is pretty lightweight and which you
649 may not want to disassemble. You can use the ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates")
666 print("serial number:", int(obj["userCertificate"]))
670 SEQUENCE/SET values with DEFAULT specified are automatically decoded
678 POSIX compliant systems have the ``mmap`` syscall, giving the ability to work
679 with memory-mapped files. You can deal with the file as if it were an
680 ordinary binary string, without having to load it into memory first.
681 Also you can use them as an input for OCTET STRING, taking no Python
682 memory for their storage.
684 There is convenient :py:func:`pyderasn.file_mmaped` function that
685 creates read-only memoryview on the file contents::
687 with open("huge", "rb") as fd:
688 raw = file_mmaped(fd)
689 obj = Something.decode(raw)
693 mmap-ed files in Python2.7 do not implement the buffer protocol, so
694 memoryview won't work on them.
698 mmap maps the **whole** file. So it plays no role if you seek-ed it
699 before. Take the slice of the resulting memoryview with required
704 If you use ZFS as underlying storage, then pay attention that
705 currently most platforms do not deal well with ZFS ARC and the ordinary
706 page cache used for mmaps. It can take twice the necessary size in
707 the memory: both in page cache and ZFS ARC.
712 We can parse any kind of data now, but how can we produce files
713 streamingly, without storing their encoded representation in memory?
714 SEQUENCE by default encodes in memory all its values, joins them in huge
715 binary string, just to know the exact size of SEQUENCE's value for
716 encoding it in TLV. DER requires you to know all exact sizes of the
719 You can use CER encoding mode, that slightly differs from the DER, but
720 does not require exact sizes knowledge, allowing streaming encoding
721 directly to some writer/buffer. Just use
722 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
723 encoded data will flow::
725 opener = io.open if PY2 else open
726 with opener("result", "wb") as fd:
727 obj.encode_cer(fd.write)
732 obj.encode_cer(buf.write)
734 If you do not want to create in-memory buffer every time, then you can
735 use :py:func:`pyderasn.encode_cer` function::
737 data = encode_cer(obj)
739 Remember that CER is **not valid** DER in most cases, so you **have to**
740 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
741 decoding. Also, currently there is **no** validation that the provided CER is
742 a valid one -- you can only be sure that it is a valid BER encoding.
746 SET OF values can not be streamingly encoded, because they are
747 required to be sorted byte-by-byte. Big SET OF values still will take
748 much memory. Use neither SET nor SET OF values, as modern ASN.1
751 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
752 OCTET STRINGs! They will be streamingly copied from underlying file to
753 the buffer using 1 KB chunks.
755 Some structures require that some of the elements have to be forcefully
756 DER encoded. For example ``SignedData`` CMS requires you to encode
757 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
758 encode everything else in BER. You can tell any of the structures to be
759 forcefully encoded in DER during CER encoding, by specifying
760 ``der_forced=True`` attribute::
762 class Certificate(Sequence):
766 class SignedAttributes(SetOf):
771 .. _agg_octet_string:
776 In most cases, huge quantity of binary data is stored as OCTET STRING.
777 CER encoding splits it into 1 KB chunks. BER allows splitting with various
778 levels of chunk nesting::
780 SOME STRING[CONSTRUCTED]
781 OCTET STRING[CONSTRUCTED]
782 OCTET STRING[PRIMITIVE]
784 OCTET STRING[PRIMITIVE]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[CONSTRUCTED]
796 OCTET STRING[CONSTRUCTED]
797 OCTET STRING[PRIMITIVE]
800 You can not just take the offset and some ``.vlen`` of the STRING and
801 treat it as the payload. If you decode it without
802 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
803 and ``bytes()`` will give the whole payload contents.
805 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
806 small memory footprint. There is convenient
807 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
808 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
809 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
810 SHA512 digest of its ``eContent``::
812 fd = open("data.p7m", "rb")
813 raw = file_mmaped(fd)
814 ctx = {"bered": True}
815 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
816 if decode_path == ("content",):
820 raise ValueError("no content found")
821 hasher_state = sha512()
823 hasher_state.update(data)
825 evgens = SignedData().decode_evgen(
826 raw[content.offset:],
827 offset=content.offset,
830 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
832 digest = hasher_state.digest()
834 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
835 copy the payload (without BER/CER encoding interleaved overhead) in it.
836 Virtually it won't take memory more than for keeping small structures
837 and 1 KB binary chunks.
841 SEQUENCE OF iterators
842 _____________________
844 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
845 classes. The only difference with providing the full list of objects, is
846 that type and bounds checking is done during encoding process. Also
847 sequence's value will be emptied after encoding, forcing you to set its
850 This is very useful when you have to create some huge objects, like
851 CRLs, with thousands and millions of entities inside. You can write the
852 generator taking necessary data from the database and giving the
853 ``RevokedCertificate`` objects. Only the binary representation of those
854 objects will take memory during DER encoding.
859 There is ability to do 2-pass encoding to DER, writing results directly
860 to specified writer (buffer, file, whatever). It could be 1.5+ times
861 slower than ordinary encoding, but it takes little memory for 1st pass
862 state storing. For example, 1st pass state for CACert.org's CRL with
863 ~416K of certificate entries takes nearly 3.5 MB of memory.
864 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
865 nearly 0.5 KB of memory.
867 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
868 iterators <seqof-iterators>` and write directly to opened file, then
869 there is very small memory footprint.
871 1st pass traverses through all the objects of the structure and returns
872 the size of DER encoded structure, together with 1st pass state object.
873 That state contains precalculated lengths for various objects inside the
878 fulllen, state = obj.encode1st()
880 2nd pass takes the writer and 1st pass state. It traverses through all
881 the objects again, but writes their encoded representation to the writer.
885 opener = io.open if PY2 else open
886 with opener("result", "wb") as fd:
887 obj.encode2nd(fd.write, iter(state))
891 You **MUST NOT** use 1st pass state if anything is changed in the
892 objects. It is intended to be used immediately after 1st pass is
895 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
896 have to reinitialize the values after the 1st pass. And you **have to**
897 be sure that the iterator gives exactly the same values as previously.
898 Yes, you have to run your iterator twice -- because this is two pass
901 If you want to encode to the memory, then you can use convenient
902 :py:func:`pyderasn.encode2pass` helper.
906 .. autoclass:: pyderasn.Obj
914 .. autoclass:: pyderasn.Boolean
919 .. autoclass:: pyderasn.Integer
920 :members: __init__, named, tohex
924 .. autoclass:: pyderasn.BitString
925 :members: __init__, bit_len, named
929 .. autoclass:: pyderasn.OctetString
934 .. autoclass:: pyderasn.Null
939 .. autoclass:: pyderasn.ObjectIdentifier
944 .. autoclass:: pyderasn.Enumerated
948 .. autoclass:: pyderasn.CommonString
952 .. autoclass:: pyderasn.NumericString
956 .. autoclass:: pyderasn.PrintableString
957 :members: __init__, allow_asterisk, allow_ampersand
961 .. autoclass:: pyderasn.UTCTime
962 :members: __init__, todatetime
966 .. autoclass:: pyderasn.GeneralizedTime
967 :members: __init__, todatetime
974 .. autoclass:: pyderasn.Choice
975 :members: __init__, choice, value
979 .. autoclass:: PrimitiveTypes
983 .. autoclass:: pyderasn.Any
991 .. autoclass:: pyderasn.Sequence
996 .. autoclass:: pyderasn.Set
1001 .. autoclass:: pyderasn.SequenceOf
1006 .. autoclass:: pyderasn.SetOf
1012 .. autofunction:: pyderasn.abs_decode_path
1013 .. autofunction:: pyderasn.agg_octet_string
1014 .. autofunction:: pyderasn.colonize_hex
1015 .. autofunction:: pyderasn.encode2pass
1016 .. autofunction:: pyderasn.encode_cer
1017 .. autofunction:: pyderasn.file_mmaped
1018 .. autofunction:: pyderasn.hexenc
1019 .. autofunction:: pyderasn.hexdec
1020 .. autofunction:: pyderasn.tag_encode
1021 .. autofunction:: pyderasn.tag_decode
1022 .. autofunction:: pyderasn.tag_ctxp
1023 .. autofunction:: pyderasn.tag_ctxc
1024 .. autoclass:: pyderasn.DecodeError
1026 .. autoclass:: pyderasn.NotEnoughData
1027 .. autoclass:: pyderasn.ExceedingData
1028 .. autoclass:: pyderasn.LenIndefForm
1029 .. autoclass:: pyderasn.TagMismatch
1030 .. autoclass:: pyderasn.InvalidLength
1031 .. autoclass:: pyderasn.InvalidOID
1032 .. autoclass:: pyderasn.ObjUnknown
1033 .. autoclass:: pyderasn.ObjNotReady
1034 .. autoclass:: pyderasn.InvalidValueType
1035 .. autoclass:: pyderasn.BoundsError
1042 You can decode DER/BER files using command line abilities::
1044 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1046 If there is no schema for your file, you can try parsing it without one,
1047 although IMPLICIT tags will often make that impossible. The result is
1048 still good enough for the certificate above::
1050 $ python -m pyderasn path/to/file
1051 0 [1,3,1604] . >: SEQUENCE OF
1052 4 [1,3,1453] . . >: SEQUENCE OF
1053 8 [0,0, 5] . . . . >: [0] ANY
1054 . . . . . A0:03:02:01:02
1055 13 [1,1, 3] . . . . >: INTEGER 61595
1056 18 [1,1, 13] . . . . >: SEQUENCE OF
1057 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1058 31 [1,1, 0] . . . . . . >: NULL
1059 33 [1,3, 274] . . . . >: SEQUENCE OF
1060 37 [1,1, 11] . . . . . . >: SET OF
1061 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1062 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1063 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1065 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1066 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1067 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1068 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1069 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1070 . . . . . . . . . 61:2E:63:6F:6D:2F
1071 1461 [1,1, 13] . . >: SEQUENCE OF
1072 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1073 1474 [1,1, 0] . . . . >: NULL
1074 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1075 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1076 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1082 If you have dictionaries with ObjectIdentifiers, like the example one
1083 from ``tests/test_crts.py``::
1086 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1087 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1089 "2.5.4.10": "id-at-organizationName",
1090 "2.5.4.11": "id-at-organizationalUnitName",
1093 then you can pass it to pretty printer to see human readable OIDs::
1095 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1097 37 [1,1, 11] . . . . . . >: SET OF
1098 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1099 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1100 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1101 50 [1,1, 18] . . . . . . >: SET OF
1102 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1103 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1104 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1105 70 [1,1, 18] . . . . . . >: SET OF
1106 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1107 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1108 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1114 Each decoded element has a so-called decode path: the sequence of structure
1115 names it passes through during the decode process. Each element has its own
1116 unique path inside the whole ASN.1 tree. You can print it out with the
1117 ``--print-decode-path`` option::
1119 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1120 0 [1,3,1604] Certificate SEQUENCE []
1121 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1122 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1123 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1124 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1125 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1126 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1128 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1129 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1130 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1131 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1132 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1133 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1134 . . . . . . . 13:02:45:53
1135 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1138 Now you can print only the specified tree, for example signature algorithm::
1140 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1141 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1142 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1143 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1147 from array import array
1148 from codecs import getdecoder
1149 from codecs import getencoder
1150 from collections import namedtuple
1151 from collections import OrderedDict
1152 from copy import copy
1153 from datetime import datetime
1154 from datetime import timedelta
1155 from io import BytesIO
1156 from math import ceil
1157 from mmap import mmap
1158 from mmap import PROT_READ
1159 from operator import attrgetter
1160 from string import ascii_letters
1161 from string import digits
1162 from sys import maxsize as sys_maxsize
1163 from sys import version_info
1164 from unicodedata import category as unicat
1166 from six import add_metaclass
1167 from six import binary_type
1168 from six import byte2int
1169 from six import indexbytes
1170 from six import int2byte
1171 from six import integer_types
1172 from six import iterbytes
1173 from six import iteritems
1174 from six import itervalues
1176 from six import string_types
1177 from six import text_type
1178 from six import unichr as six_unichr
1179 from six.moves import xrange as six_xrange
1183 from termcolor import colored
1184 except ImportError: # pragma: no cover
1185 def colored(what, *args, **kwargs):
1236 "TagClassApplication",
1239 "TagClassUniversal",
1240 "TagFormConstructed",
1251 TagClassUniversal = 0
1252 TagClassApplication = 1 << 6
1253 TagClassContext = 1 << 7
1254 TagClassPrivate = 1 << 6 | 1 << 7
1255 TagFormPrimitive = 0
1256 TagFormConstructed = 1 << 5
1258 TagClassContext: "",
1259 TagClassApplication: "APPLICATION ",
1260 TagClassPrivate: "PRIVATE ",
1261 TagClassUniversal: "UNIV ",
1265 LENINDEF = b"\x80" # length indefinite mark
1266 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1267 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1268 SET01 = frozenset("01")
1269 DECIMALS = frozenset(digits)
1270 DECIMAL_SIGNS = ".,"
1271 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
def file_mmaped(fd):
    """Make mmap-ed memoryview for reading from file

    :param fd: file object (only its ``fileno()`` is used)
    :returns: memoryview over read-only mmap-ing of the whole file
    """
    # Length 0 asks mmap to cover the whole file; PROT_READ makes the
    # resulting buffer (and therefore the memoryview) read-only.
    mapping = mmap(fd.fileno(), 0, prot=PROT_READ)
    return memoryview(mapping)
1283 if not set(value) <= DECIMALS:
1284 raise ValueError("non-pure integer")
def fractions2float(fractions_raw):
    """Convert the digits after a decimal separator to a float

    :param str fractions_raw: fractional digits, without the separator
    :returns: float value of ``0.<fractions_raw>``

    The input is passed through ``pureint`` first, so anything that
    helper rejects never reaches ``float()``.
    """
    pureint(fractions_raw)
    return float("0." + fractions_raw)
1292 def get_def_by_path(defines_by_path, sub_decode_path):
1293 """Get define by decode path
1295 for path, define in defines_by_path:
1296 if len(path) != len(sub_decode_path):
1298 for p1, p2 in zip(path, sub_decode_path):
1299 if (not p1 is any) and (p1 != p2):
1305 ########################################################################
1307 ########################################################################
class ASN1Error(ValueError):
    """Base class of all errors raised by this module
    """
    pass


class DecodeError(ASN1Error):
    """Failure while decoding, carrying the place where it happened
    """
    def __init__(self, msg="", klass=None, decode_path=(), offset=0):
        """
        :param str msg: reason of decode failing
        :param klass: optional exact DecodeError inherited class (like
                      :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
                      :py:exc:`InvalidLength`)
        :param decode_path: tuple of strings. It contains human
                            readable names of the fields through which
                            decoding process has passed
        :param int offset: binary offset where failure happened
        """
        # The base constructor gets no arguments: the message lives in
        # ``self.msg`` and is rendered by __str__ below.
        super(DecodeError, self).__init__()
        self.msg = msg
        self.klass = klass
        self.decode_path = decode_path
        self.offset = offset

    def __str__(self):
        # Join only the non-empty parts: class name, decode path,
        # offset and the message itself.
        return " ".join(
            c for c in (
                "" if self.klass is None else self.klass.__name__,
                (
                    ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
                    if len(self.decode_path) > 0 else ""
                ),
                ("(at %d)" % self.offset) if self.offset > 0 else "",
                self.msg,
            ) if c != ""
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)


class NotEnoughData(DecodeError):
    pass


class ExceedingData(ASN1Error):
    """Data remained after the decoded object
    """
    def __init__(self, nbytes):
        super(ExceedingData, self).__init__()
        self.nbytes = nbytes

    def __str__(self):
        return "%d trailing bytes" % self.nbytes

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)


class LenIndefForm(DecodeError):
    pass


class TagMismatch(DecodeError):
    pass


class InvalidLength(DecodeError):
    pass


class InvalidOID(DecodeError):
    pass


class ObjUnknown(ASN1Error):
    def __init__(self, name):
        super(ObjUnknown, self).__init__()
        self.name = name

    def __str__(self):
        return "object is unknown: %s" % self.name

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)


class ObjNotReady(ASN1Error):
    def __init__(self, name):
        super(ObjNotReady, self).__init__()
        self.name = name

    def __str__(self):
        return "object is not ready: %s" % self.name

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)


class InvalidValueType(ASN1Error):
    def __init__(self, expected_types):
        super(InvalidValueType, self).__init__()
        self.expected_types = expected_types

    def __str__(self):
        return "invalid value type, expected: %s" % ", ".join(
            [repr(t) for t in self.expected_types]
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)


class BoundsError(ASN1Error):
    def __init__(self, bound_min, value, bound_max):
        super(BoundsError, self).__init__()
        self.bound_min = bound_min
        self.value = value
        self.bound_max = bound_max

    def __str__(self):
        return "unsatisfied bounds: %s <= %s <= %s" % (
            self.bound_min,
            self.value,
            self.bound_max,
        )

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self)
1436 ########################################################################
1438 ########################################################################
_hexdecoder = getdecoder("hex")
_hexencoder = getencoder("hex")


def hexdec(data):
    """Hexadecimal string to binary data convert
    """
    return _hexdecoder(data)[0]


def hexenc(data):
    """Binary data to hexadecimal string convert
    """
    # The codec yields bytes; callers work with text, hence the decode.
    return _hexencoder(data)[0].decode("ascii")
def int_bytes_len(num, byte_len=8):
    """Count how many ``byte_len``-bit groups are needed to hold ``num``

    :param int num: integer to be measured
    :param int byte_len: number of payload bits per group (8 for plain
                         octets, 7 for base-128 encodings)
    :returns: number of groups, at least one
    """
    if num == 0:
        # Zero has bit_length() == 0, but still occupies one group.
        return 1
    # Ceiling division kept in integers, avoiding a float round-trip.
    return (num.bit_length() + byte_len - 1) // byte_len
1462 def zero_ended_encode(num):
1463 octets = bytearray(int_bytes_len(num, 7))
1465 octets[i] = num & 0x7F
1469 octets[i] = 0x80 | (num & 0x7F)
1472 return bytes(octets)
def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
    """Encode tag to binary form

    :param int num: tag's number
    :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
                      :py:data:`pyderasn.TagClassContext`,
                      :py:data:`pyderasn.TagClassApplication`,
                      :py:data:`pyderasn.TagClassPrivate`)
    :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
                     :py:data:`pyderasn.TagFormConstructed`)
    :returns: binary encoded tag
    """
    if num < 31:
        # Low tag number form: everything fits into a single octet
        # [XX|X|.....]
        return int2byte(klass | form | num)
    # High tag number form: all five low bits set, number follows in
    # base-128 groups
    # [XX|X|11111][1.......][1.......] ... [0.......]
    return int2byte(klass | form | 31) + zero_ended_encode(num)
1493 def tag_decode(tag):
1494 """Decode tag from binary form
1498 No validation is performed, assuming that it has already passed.
1500 It returns tuple with three integers, as
1501 :py:func:`pyderasn.tag_encode` accepts.
1503 first_octet = byte2int(tag)
1504 klass = first_octet & 0xC0
1505 form = first_octet & 0x20
1506 if first_octet & 0x1F < 0x1F:
1507 return (klass, form, first_octet & 0x1F)
1509 for octet in iterbytes(tag[1:]):
1512 return (klass, form, num)
def tag_ctxp(num):
    """Create CONTEXT PRIMITIVE tag

    :param int num: tag's number
    :returns: binary encoded tag, as made by :py:func:`pyderasn.tag_encode`
    """
    return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
def tag_ctxc(num):
    """Create CONTEXT CONSTRUCTED tag

    :param int num: tag's number
    :returns: binary encoded tag, as made by :py:func:`pyderasn.tag_encode`
    """
    return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1527 def tag_strip(data):
1528 """Take off tag from the data
1530 :returns: (encoded tag, tag length, remaining data)
1533 raise NotEnoughData("no data at all")
1534 if byte2int(data) & 0x1F < 31:
1535 return data[:1], 1, data[1:]
1540 raise DecodeError("unfinished tag")
1541 if indexbytes(data, i) & 0x80 == 0:
1544 return data[:i], i, data[i:]
1550 octets = bytearray(int_bytes_len(l) + 1)
1551 octets[0] = 0x80 | (len(octets) - 1)
1552 for i in six_xrange(len(octets) - 1, 0, -1):
1553 octets[i] = l & 0xFF
1555 return bytes(octets)
1558 def len_decode(data):
1561 :returns: (decoded length, length's length, remaining data)
1562 :raises LenIndefForm: if indefinite form encoding is met
1565 raise NotEnoughData("no data at all")
1566 first_octet = byte2int(data)
1567 if first_octet & 0x80 == 0:
1568 return first_octet, 1, data[1:]
1569 octets_num = first_octet & 0x7F
1570 if octets_num + 1 > len(data):
1571 raise NotEnoughData("encoded length is longer than data")
1573 raise LenIndefForm()
1574 if byte2int(data[1:]) == 0:
1575 raise DecodeError("leading zeros")
1577 for v in iterbytes(data[1:1 + octets_num]):
1580 raise DecodeError("long form instead of short one")
1581 return l, 1 + octets_num, data[1 + octets_num:]
1584 LEN0 = len_encode(0)
1585 LEN1 = len_encode(1)
1586 LEN1K = len_encode(1000)
def len_size(l):
    """How many bytes length field will take

    :param int l: length value
    :returns: number of octets the encoded length occupies
    :raises OverflowError: if the value does not fit into seven octets
                           (``l >= 1 << 56``)
    """
    if l < 128:
        # Short form: the length is the single octet itself.
        return 1
    # Long form: one leading octet plus the minimal number of value octets.
    value_octets = (l.bit_length() + 7) // 8
    if value_octets > 7:
        raise OverflowError("too big length")
    return 1 + value_octets
def write_full(writer, data):
    """Fully write provided data

    :param writer: must comply with ``io.RawIOBase.write`` behaviour:
                   takes a buffer, returns how many bytes it consumed
    :param data: bytes-like object to be written
    :raises ValueError: if ``writer`` returns None

    A writer is not guaranteed to take the whole data at once. That
    function keeps calling it with the unwritten remainder until
    everything provided is written.
    """
    view = memoryview(data)  # slicing below does not copy
    total = len(view)
    written = 0
    while written != total:
        n = writer(view[written:])
        if n is None:
            raise ValueError("can not write to buf")
        written += n
1629 # If it is 64-bit system, then use compact 64-bit array of unsigned
1630 # longs. Use an ordinary list with universal integers otherwise, that
1632 if sys_maxsize > 2 ** 32:
1633 def state_2pass_new():
1636 def state_2pass_new():
1640 ########################################################################
1642 ########################################################################
class AutoAddSlots(type):
    """Metaclass that gives a class empty ``__slots__`` unless it has its own
    """
    def __new__(cls, name, bases, _dict):
        # Leave explicitly declared slots alone, add empty ones otherwise.
        _dict.setdefault("__slots__", ())
        return type.__new__(cls, name, bases, _dict)
1650 BasicState = namedtuple("BasicState", (
1663 ), **NAMEDTUPLE_KWARGS)
1666 @add_metaclass(AutoAddSlots)
1668 """Common ASN.1 object class
1670 All ASN.1 types are inherited from it. It has metaclass that
1671 automatically adds ``__slots__`` to all inherited classes.
1696 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1697 self._expl = getattr(self, "expl", None) if expl is None else expl
1698 if self.tag != self.tag_default and self._expl is not None:
1699 raise ValueError("implicit and explicit tags can not be set simultaneously")
1700 if self.tag is None:
1701 self._tag_order = None
1703 tag_class, _, tag_num = tag_decode(
1704 self.tag if self._expl is None else self._expl
1706 self._tag_order = (tag_class, tag_num)
1707 if default is not None:
1709 self.optional = optional
1710 self.offset, self.llen, self.vlen = _decoded
1712 self.expl_lenindef = False
1713 self.lenindef = False
1714 self.ber_encoded = False
1717 def ready(self): # pragma: no cover
1718 """Is object ready to be encoded?
1720 raise NotImplementedError()
1722 def _assert_ready(self):
1724 raise ObjNotReady(self.__class__.__name__)
1728 """Is either object or any elements inside is BER encoded?
1730 return self.expl_lenindef or self.lenindef or self.ber_encoded
1734 """Is object decoded?
1736 return (self.llen + self.vlen) > 0
1738 def __getstate__(self): # pragma: no cover
1739 """Used for making safe to be mutable pickleable copies
1741 raise NotImplementedError()
def __setstate__(self, state):
    """Restore the common attributes from a pickled state

    :param state: state object exposing ``version`` and the attributes
                  copied below
    :raises ValueError: if the state was produced by a different
                        library version
    """
    if state.version != __version__:
        raise ValueError("data is pickled by different PyDERASN version")
    self.tag = state.tag
    # These two are stored under private names on the object.
    self._tag_order = state.tag_order
    self._expl = state.expl
    # The rest share their names between the state and the object.
    for attr in (
            "default",
            "optional",
            "offset",
            "llen",
            "vlen",
            "expl_lenindef",
            "lenindef",
            "ber_encoded",
    ):
        setattr(self, attr, getattr(state, attr))
1759 def tag_order(self):
1760 """Tag's (class, number) used for DER/CER sorting
1762 return self._tag_order
1765 def tag_order_cer(self):
1766 return self.tag_order
1770 """.. seealso:: :ref:`decoding`
1772 return len(self.tag)
1776 """.. seealso:: :ref:`decoding`
1778 return self.tlen + self.llen + self.vlen
1780 def __str__(self): # pragma: no cover
1781 return self.__bytes__() if PY2 else self.__unicode__()
def __ne__(self, their):
    """Inequality, derived from ``__eq__``
    """
    return not (self == their)

def __gt__(self, their):  # pragma: no cover
    """Strictly greater, derived from ``__lt__`` and ``__eq__``

    Only negating ``<`` is not enough: that would also hold for equal
    operands, so equality is excluded explicitly.
    """
    return not ((self < their) or (self == their))

def __le__(self, their):  # pragma: no cover
    """Less or equal, derived from ``__eq__`` and ``__lt__``
    """
    return (self == their) or (self < their)

def __ge__(self, their):  # pragma: no cover
    """Greater or equal, derived from ``__eq__`` and ``__gt__``
    """
    return (self == their) or (self > their)
1795 def _encode(self): # pragma: no cover
1796 raise NotImplementedError()
1798 def _encode_cer(self, writer):
1799 write_full(writer, self._encode())
1801 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1802 yield NotImplemented
1804 def _encode1st(self, state):
1805 raise NotImplementedError()
1807 def _encode2nd(self, writer, state_iter):
1808 raise NotImplementedError()
1811 """DER encode the structure
1813 :returns: DER representation
1815 raw = self._encode()
1816 if self._expl is None:
1818 return b"".join((self._expl, len_encode(len(raw)), raw))
1820 def encode1st(self, state=None):
1821 """Do the 1st pass of 2-pass encoding
1823 :rtype: (int, array("L"))
1824 :returns: full length of encoded data and precalculated various
1828 state = state_2pass_new()
1829 if self._expl is None:
1830 return self._encode1st(state)
1832 idx = len(state) - 1
1833 vlen, _ = self._encode1st(state)
1835 fulllen = len(self._expl) + len_size(vlen) + vlen
1836 return fulllen, state
def encode2nd(self, writer, state_iter):
    """Do the 2nd pass of 2-pass encoding

    :param writer: must comply with ``io.RawIOBase.write`` behaviour
    :param state_iter: iterator over the 1st pass state (``iter(state)``)
    """
    if self._expl is not None:
        # EXPLICIT wrapper goes first: its tag and the length taken
        # from the 1st pass state.
        write_full(writer, self._expl + len_encode(next(state_iter)))
    self._encode2nd(writer, state_iter)
def encode_cer(self, writer):
    """CER encode the structure to specified writer

    :param writer: must comply with ``io.RawIOBase.write``
                   behaviour. It takes slice to be written and
                   returns number of bytes processed. If it returns
                   None, then exception will be raised
    """
    expled = self._expl is not None
    if expled:
        # EXPLICIT wrapper is written with indefinite length...
        write_full(writer, self._expl + LENINDEF)
    if getattr(self, "der_forced", False):
        write_full(writer, self._encode())
    else:
        self._encode_cer(writer)
    if expled:
        # ...and therefore closed with an end-of-contents marker.
        write_full(writer, EOC)
def hexencode(self):
    """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`

    :returns: result of ``encode()`` passed through ``hexenc``
    """
    return hexenc(self.encode())
1880 _ctx_immutable=True,
1884 :param data: either binary or memoryview
1885 :param int offset: initial data's offset
1886 :param bool leavemm: do we need to leave memoryview of remaining
1887 data as is, or convert it to bytes otherwise
1888 :param decode_path: current decode path (tuples of strings,
1889 possibly with DecodePathDefBy) with will be
1890 the root for all underlying objects
1891 :param ctx: optional :ref:`context <ctx>` governing decoding process
1892 :param bool tag_only: decode only the tag, without length and
1893 contents (used only in Choice and Set
1894 structures, trying to determine if tag satisfies
1896 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1898 :returns: (Obj, remaining data)
1900 .. seealso:: :ref:`decoding`
1902 result = next(self.decode_evgen(
1914 _, obj, tail = result
1925 _ctx_immutable=True,
1928 """Decode with evgen mode on
1930 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1931 it returns the generator producing ``(decode_path, obj, tail)``
1933 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1937 elif _ctx_immutable:
1939 tlv = memoryview(data)
1942 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1945 if self._expl is None:
1946 for result in self._decode(
1949 decode_path=decode_path,
1952 evgen_mode=_evgen_mode,
1957 _decode_path, obj, tail = result
1958 if not _decode_path is decode_path:
1962 t, tlen, lv = tag_strip(tlv)
1963 except DecodeError as err:
1964 raise err.__class__(
1966 klass=self.__class__,
1967 decode_path=decode_path,
1972 klass=self.__class__,
1973 decode_path=decode_path,
1977 l, llen, v = len_decode(lv)
1978 except LenIndefForm as err:
1979 if not ctx.get("bered", False):
1980 raise err.__class__(
1982 klass=self.__class__,
1983 decode_path=decode_path,
1987 offset += tlen + llen
1988 for result in self._decode(
1991 decode_path=decode_path,
1994 evgen_mode=_evgen_mode,
1996 if tag_only: # pragma: no cover
1999 _decode_path, obj, tail = result
2000 if not _decode_path is decode_path:
2002 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2003 if eoc_expected.tobytes() != EOC:
2006 klass=self.__class__,
2007 decode_path=decode_path,
2011 obj.expl_lenindef = True
2012 except DecodeError as err:
2013 raise err.__class__(
2015 klass=self.__class__,
2016 decode_path=decode_path,
2021 raise NotEnoughData(
2022 "encoded length is longer than data",
2023 klass=self.__class__,
2024 decode_path=decode_path,
2027 for result in self._decode(
2029 offset=offset + tlen + llen,
2030 decode_path=decode_path,
2033 evgen_mode=_evgen_mode,
2035 if tag_only: # pragma: no cover
2038 _decode_path, obj, tail = result
2039 if not _decode_path is decode_path:
2041 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2043 "explicit tag out-of-bound, longer than data",
2044 klass=self.__class__,
2045 decode_path=decode_path,
2048 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2050 def decod(self, data, offset=0, decode_path=(), ctx=None):
2051 """Decode the data, check that tail is empty
2053 :raises ExceedingData: if tail is not empty
2055 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2056 (decode without tail) that also checks that there is no
2059 obj, tail = self.decode(
2062 decode_path=decode_path,
2067 raise ExceedingData(len(tail))
def hexdecode(self, data, *args, **kwargs):
    """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data

    :param data: hexadecimal string, converted with ``hexdec`` before
                 being decoded; remaining arguments are passed as is
    """
    return self.decode(hexdec(data), *args, **kwargs)
def hexdecod(self, data, *args, **kwargs):
    """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data

    :param data: hexadecimal string, converted with ``hexdec`` before
                 being decoded; remaining arguments are passed as is
    """
    return self.decod(hexdec(data), *args, **kwargs)
2082 """.. seealso:: :ref:`decoding`
2084 return self._expl is not None
2088 """.. seealso:: :ref:`decoding`
2093 def expl_tlen(self):
2094 """.. seealso:: :ref:`decoding`
2096 return len(self._expl)
2099 def expl_llen(self):
2100 """.. seealso:: :ref:`decoding`
2102 if self.expl_lenindef:
2104 return len(len_encode(self.tlvlen))
2107 def expl_offset(self):
2108 """.. seealso:: :ref:`decoding`
2110 return self.offset - self.expl_tlen - self.expl_llen
2113 def expl_vlen(self):
2114 """.. seealso:: :ref:`decoding`
2119 def expl_tlvlen(self):
2120 """.. seealso:: :ref:`decoding`
2122 return self.expl_tlen + self.expl_llen + self.expl_vlen
2125 def fulloffset(self):
2126 """.. seealso:: :ref:`decoding`
2128 return self.expl_offset if self.expled else self.offset
2132 """.. seealso:: :ref:`decoding`
2134 return self.expl_tlvlen if self.expled else self.tlvlen
2136 def pps_lenindef(self, decode_path):
2137 if self.lenindef and not (
2138 getattr(self, "defined", None) is not None and
2139 self.defined[1].lenindef
2142 asn1_type_name="EOC",
2144 decode_path=decode_path,
2146 self.offset + self.tlvlen -
2147 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2155 if self.expl_lenindef:
2157 asn1_type_name="EOC",
2158 obj_name="EXPLICIT",
2159 decode_path=decode_path,
2160 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
def encode_cer(obj):
    """Encode to CER in memory buffer

    :param obj: object providing ``encode_cer(writer)``
    :returns bytes: memory buffer contents
    """
    buf = BytesIO()
    obj.encode_cer(buf.write)
    return buf.getvalue()
def encode2pass(obj):
    """Encode (2-pass mode) to DER in memory buffer

    :param obj: object providing ``encode1st()`` and
                ``encode2nd(writer, state_iter)``
    :returns bytes: memory buffer contents
    """
    buf = BytesIO()
    # The 1st pass only gathers the state; its length result is unused.
    _, state = obj.encode1st()
    obj.encode2nd(buf.write, iter(state))
    return buf.getvalue()
class DecodePathDefBy(object):
    """DEFINED BY representation inside decode path

    Wraps the value an element is defined by, so that it can be told
    apart from ordinary decode path entries.
    """
    __slots__ = ("defined_by",)

    def __init__(self, defined_by):
        self.defined_by = defined_by

    def __ne__(self, their):
        return not (self == their)

    def __eq__(self, their):
        # Entries of any other class never compare equal.
        return (
            isinstance(their, self.__class__) and
            self.defined_by == their.defined_by
        )

    def __str__(self):
        return "DEFINED BY " + str(self.defined_by)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2213 ########################################################################
2215 ########################################################################
2217 PP = namedtuple("PP", (
2240 ), **NAMEDTUPLE_KWARGS)
2245 asn1_type_name="unknown",
2262 expl_lenindef=False,
def _colourize(what, colour, with_colours, attrs=("bold",)):
    """Colourize the text, if colours are enabled

    :param what: text to be colourized
    :param colour: colour name passed to ``colored``
    :param bool with_colours: return ``what`` untouched if false
    :param attrs: text attributes passed to ``colored``
    """
    if not with_colours:
        return what
    return colored(what, colour, attrs=attrs)
def colonize_hex(hexed):
    """Separate hexadecimal string with colons

    :param str hexed: hexadecimal string
    :returns: same string with a colon after every two characters; an
              odd trailing character forms the last group on its own
    """
    # Builtin range keeps this helper free of any compatibility shim.
    return ":".join(hexed[i:i + 2] for i in range(0, len(hexed), 2))
2309 with_decode_path=False,
2310 decode_path_len_decrease=0,
2317 " " if pp.expl_offset is None else
2318 ("-%d" % (pp.offset - pp.expl_offset))
2320 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2322 col = _colourize(col, "red", with_colours, ())
2323 col += _colourize("B", "red", with_colours) if pp.bered else " "
2325 col = "[%d,%d,%4d]%s" % (
2329 LENINDEF_PP_CHAR if pp.lenindef else " "
2331 col = _colourize(col, "green", with_colours, ())
2333 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2334 if decode_path_len > 0:
2335 cols.append(" ." * decode_path_len)
2336 ent = pp.decode_path[-1]
2337 if isinstance(ent, DecodePathDefBy):
2338 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2339 value = str(ent.defined_by)
2342 len(oid_maps) > 0 and
2343 ent.defined_by.asn1_type_name ==
2344 ObjectIdentifier.asn1_type_name
2346 for oid_map in oid_maps:
2347 oid_name = oid_map.get(value)
2348 if oid_name is not None:
2349 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2351 if oid_name is None:
2352 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2354 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2355 if pp.expl is not None:
2356 klass, _, num = pp.expl
2357 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2358 cols.append(_colourize(col, "blue", with_colours))
2359 if pp.impl is not None:
2360 klass, _, num = pp.impl
2361 col = "[%s%d]" % (TagClassReprs[klass], num)
2362 cols.append(_colourize(col, "blue", with_colours))
2363 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2364 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2366 cols.append(_colourize("BER", "red", with_colours))
2367 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2368 if pp.value is not None:
2370 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2372 len(oid_maps) > 0 and
2373 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2375 for oid_map in oid_maps:
2376 oid_name = oid_map.get(value)
2377 if oid_name is not None:
2378 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2380 if pp.asn1_type_name == Integer.asn1_type_name:
2381 cols.append(_colourize(
2382 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2385 if pp.blob.__class__ == binary_type:
2386 cols.append(hexenc(pp.blob))
2387 elif pp.blob.__class__ == tuple:
2388 cols.append(", ".join(pp.blob))
2390 cols.append(_colourize("OPTIONAL", "red", with_colours))
2392 cols.append(_colourize("DEFAULT", "red", with_colours))
2393 if with_decode_path:
2394 cols.append(_colourize(
2395 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2399 return " ".join(cols)
def pp_console_blob(pp, decode_path_len_decrease=0):
    """Generate console rows with the blob of a pretty printed entry

    :param pp: entry with ``decode_path`` and ``blob`` attributes
    :param int decode_path_len_decrease: how many leading decode path
                                         elements are not indented
    :returns: generator of strings. Binary blob gives rows of colon
              separated uppercase hexadecimal, 32 digits per row; tuple
              blob gives a single comma separated row; any other blob
              gives nothing
    """
    # Blank prefix as wide as the offset/length columns of a normal row.
    prefix = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
    depth = len(pp.decode_path) - decode_path_len_decrease
    if depth > 0:
        prefix.append(" ." * (depth + 1))
    blob_class = pp.blob.__class__
    if blob_class == binary_type:
        hexed = hexenc(pp.blob).upper()
        for start in six_xrange(0, len(hexed), 32):
            yield " ".join(prefix + [colonize_hex(hexed[start:start + 32])])
    elif blob_class == tuple:
        yield " ".join(prefix + [", ".join(pp.blob)])
2421 with_decode_path=False,
2422 decode_path_only=(),
2425 """Pretty print object
2427 :param Obj obj: object you want to pretty print
2428 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2429 Its human readable form is printed when OID is met
2430 :param big_blobs: if large binary objects are met (like OctetString
2431 values), do we need to print them too, on separate
2433 :param with_colours: colourize output, if ``termcolor`` library
2435 :param with_decode_path: print decode path
2436 :param decode_path_only: print only that specified decode path
2438 def _pprint_pps(pps):
2440 if hasattr(pp, "_fields"):
2442 decode_path_only != () and
2444 str(p) for p in pp.decode_path[:len(decode_path_only)]
2445 ) != decode_path_only
2449 yield pp_console_row(
2454 with_colours=with_colours,
2455 with_decode_path=with_decode_path,
2456 decode_path_len_decrease=len(decode_path_only),
2458 for row in pp_console_blob(
2460 decode_path_len_decrease=len(decode_path_only),
2464 yield pp_console_row(
2469 with_colours=with_colours,
2470 with_decode_path=with_decode_path,
2471 decode_path_len_decrease=len(decode_path_only),
2474 for row in _pprint_pps(pp):
2476 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2479 ########################################################################
2480 # ASN.1 primitive types
2481 ########################################################################
2483 BooleanState = namedtuple(
2485 BasicState._fields + ("value",),
2491 """``BOOLEAN`` boolean type
2493 >>> b = Boolean(True)
2495 >>> b == Boolean(True)
2501 tag_default = tag_encode(1)
2502 asn1_type_name = "BOOLEAN"
2514 :param value: set the value. Either boolean type, or
2515 :py:class:`pyderasn.Boolean` object
2516 :param bytes impl: override default tag with ``IMPLICIT`` one
2517 :param bytes expl: override default tag with ``EXPLICIT`` one
2518 :param default: set default value. Type same as in ``value``
2519 :param bool optional: is object ``OPTIONAL`` in sequence
2521 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2522 self._value = None if value is None else self._value_sanitize(value)
2523 if default is not None:
2524 default = self._value_sanitize(default)
2525 self.default = self.__class__(
2531 self._value = default
2533 def _value_sanitize(self, value):
2534 if value.__class__ == bool:
2536 if issubclass(value.__class__, Boolean):
2538 raise InvalidValueType((self.__class__, bool))
2542 return self._value is not None
2544 def __getstate__(self):
2545 return BooleanState(
def __setstate__(self, state):
    """Restore the object from a state snapshot

    The parent class is asked to process the snapshot first, then
    the boolean payload is taken from its ``value`` field.

    :param state: snapshot with at least a ``value`` attribute
    """
    parent = super(Boolean, self)
    parent.__setstate__(state)
    self._value = state.value
2565 def __nonzero__(self):
2566 self._assert_ready()
2570 self._assert_ready()
2573 def __eq__(self, their):
2574 if their.__class__ == bool:
2575 return self._value == their
2576 if not issubclass(their.__class__, Boolean):
2579 self._value == their._value and
2580 self.tag == their.tag and
2581 self._expl == their._expl
2592 return self.__class__(
2594 impl=self.tag if impl is None else impl,
2595 expl=self._expl if expl is None else expl,
2596 default=self.default if default is None else default,
2597 optional=self.optional if optional is None else optional,
2601 self._assert_ready()
2602 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2604 def _encode1st(self, state):
2605 return len(self.tag) + 2, state
def _encode2nd(self, writer, state_iter):
    """Write the encoded object to ``writer``

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used here
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
2611 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2613 t, _, lv = tag_strip(tlv)
2614 except DecodeError as err:
2615 raise err.__class__(
2617 klass=self.__class__,
2618 decode_path=decode_path,
2623 klass=self.__class__,
2624 decode_path=decode_path,
2631 l, _, v = len_decode(lv)
2632 except DecodeError as err:
2633 raise err.__class__(
2635 klass=self.__class__,
2636 decode_path=decode_path,
2640 raise InvalidLength(
2641 "Boolean's length must be equal to 1",
2642 klass=self.__class__,
2643 decode_path=decode_path,
2647 raise NotEnoughData(
2648 "encoded length is longer than data",
2649 klass=self.__class__,
2650 decode_path=decode_path,
2653 first_octet = byte2int(v)
2655 if first_octet == 0:
2657 elif first_octet == 0xFF:
2659 elif ctx.get("bered", False):
2664 "unacceptable Boolean value",
2665 klass=self.__class__,
2666 decode_path=decode_path,
2669 obj = self.__class__(
2673 default=self.default,
2674 optional=self.optional,
2675 _decoded=(offset, 1, 1),
2677 obj.ber_encoded = ber_encoded
2678 yield decode_path, obj, v[1:]
2681 return pp_console_row(next(self.pps()))
2683 def pps(self, decode_path=()):
2686 asn1_type_name=self.asn1_type_name,
2687 obj_name=self.__class__.__name__,
2688 decode_path=decode_path,
2689 value=str(self._value) if self.ready else None,
2690 optional=self.optional,
2691 default=self == self.default,
2692 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2693 expl=None if self._expl is None else tag_decode(self._expl),
2698 expl_offset=self.expl_offset if self.expled else None,
2699 expl_tlen=self.expl_tlen if self.expled else None,
2700 expl_llen=self.expl_llen if self.expled else None,
2701 expl_vlen=self.expl_vlen if self.expled else None,
2702 expl_lenindef=self.expl_lenindef,
2703 ber_encoded=self.ber_encoded,
2706 for pp in self.pps_lenindef(decode_path):
2710 IntegerState = namedtuple(
2712 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2718 """``INTEGER`` integer type
2720 >>> b = Integer(-123)
2722 >>> b == Integer(-123)
2727 >>> Integer(2, bounds=(1, 3))
2729 >>> Integer(5, bounds=(1, 3))
2730 Traceback (most recent call last):
2731 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2735 class Version(Integer):
2742 >>> v = Version("v1")
2749 {'v3': 2, 'v1': 0, 'v2': 1}
2751 __slots__ = ("specs", "_bound_min", "_bound_max")
2752 tag_default = tag_encode(2)
2753 asn1_type_name = "INTEGER"
2767 :param value: set the value. Either integer type, named value
2768 (if ``schema`` is specified in the class), or
2769 :py:class:`pyderasn.Integer` object
2770 :param bounds: set ``(MIN, MAX)`` value constraint.
2771 (-inf, +inf) by default
2772 :param bytes impl: override default tag with ``IMPLICIT`` one
2773 :param bytes expl: override default tag with ``EXPLICIT`` one
2774 :param default: set default value. Type same as in ``value``
2775 :param bool optional: is object ``OPTIONAL`` in sequence
2777 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2779 specs = getattr(self, "schema", {}) if _specs is None else _specs
2780 self.specs = specs if specs.__class__ == dict else dict(specs)
2781 self._bound_min, self._bound_max = getattr(
2784 (float("-inf"), float("+inf")),
2785 ) if bounds is None else bounds
2786 if value is not None:
2787 self._value = self._value_sanitize(value)
2788 if default is not None:
2789 default = self._value_sanitize(default)
2790 self.default = self.__class__(
2796 if self._value is None:
2797 self._value = default
2799 def _value_sanitize(self, value):
2800 if isinstance(value, integer_types):
2802 elif issubclass(value.__class__, Integer):
2803 value = value._value
2804 elif value.__class__ == str:
2805 value = self.specs.get(value)
2807 raise ObjUnknown("integer value: %s" % value)
2809 raise InvalidValueType((self.__class__, int, str))
2810 if not self._bound_min <= value <= self._bound_max:
2811 raise BoundsError(self._bound_min, value, self._bound_max)
2816 return self._value is not None
2818 def __getstate__(self):
2819 return IntegerState(
def __setstate__(self, state):
    """Restore the object from a state snapshot

    The parent class is asked to process the snapshot first, then
    the named values specification, the value itself and both
    bounds are taken from the corresponding snapshot fields.

    :param state: snapshot with ``specs``, ``value``, ``bound_min``
                  and ``bound_max`` attributes
    """
    parent = super(Integer, self)
    parent.__setstate__(state)
    self.specs = state.specs
    self._value = state.value
    self._bound_min = state.bound_min
    self._bound_max = state.bound_max
2846 self._assert_ready()
2847 return int(self._value)
2850 """Hexadecimal representation
2852 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2854 hex_repr = hex(int(self))[2:].upper()
2855 if len(hex_repr) % 2 != 0:
2856 hex_repr = "0" + hex_repr
2860 self._assert_ready()
2861 return hash(b"".join((
2863 bytes(self._expl or b""),
2864 str(self._value).encode("ascii"),
2867 def __eq__(self, their):
2868 if isinstance(their, integer_types):
2869 return self._value == their
2870 if not issubclass(their.__class__, Integer):
2873 self._value == their._value and
2874 self.tag == their.tag and
2875 self._expl == their._expl
2878 def __lt__(self, their):
2879 return self._value < their._value
2883 """Return named representation (if exists) of the value
2885 for name, value in iteritems(self.specs):
2886 if value == self._value:
2899 return self.__class__(
2902 (self._bound_min, self._bound_max)
2903 if bounds is None else bounds
2905 impl=self.tag if impl is None else impl,
2906 expl=self._expl if expl is None else expl,
2907 default=self.default if default is None else default,
2908 optional=self.optional if optional is None else optional,
2912 def _encode_payload(self):
2913 self._assert_ready()
2917 octets = bytearray([0])
2921 octets = bytearray()
2923 octets.append((value & 0xFF) ^ 0xFF)
2925 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2928 octets = bytearray()
2930 octets.append(value & 0xFF)
2932 if octets[-1] & 0x80 > 0:
2935 octets = bytes(octets)
2937 bytes_len = ceil(value.bit_length() / 8) or 1
2940 octets = value.to_bytes(
2945 except OverflowError:
2952 octets = self._encode_payload()
2953 return b"".join((self.tag, len_encode(len(octets)), octets))
def _encode1st(self, state):
    """Report the length of the encoding, passing ``state`` through

    :param state: opaque value, returned as is
    :returns: ``(length, state)`` tuple, where length covers the
              tag, the encoded length field and the payload
    """
    payload_len = len(self._encode_payload())
    header_len = len(self.tag) + len_size(payload_len)
    return header_len + payload_len, state
def _encode2nd(self, writer, state_iter):
    """Write the encoded object to ``writer``

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used here
    """
    encoded = self._encode()
    write_full(writer, encoded)
2962 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2964 t, _, lv = tag_strip(tlv)
2965 except DecodeError as err:
2966 raise err.__class__(
2968 klass=self.__class__,
2969 decode_path=decode_path,
2974 klass=self.__class__,
2975 decode_path=decode_path,
2982 l, llen, v = len_decode(lv)
2983 except DecodeError as err:
2984 raise err.__class__(
2986 klass=self.__class__,
2987 decode_path=decode_path,
2991 raise NotEnoughData(
2992 "encoded length is longer than data",
2993 klass=self.__class__,
2994 decode_path=decode_path,
2998 raise NotEnoughData(
3000 klass=self.__class__,
3001 decode_path=decode_path,
3004 v, tail = v[:l], v[l:]
3005 first_octet = byte2int(v)
3007 second_octet = byte2int(v[1:])
3009 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3010 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3013 "non normalized integer",
3014 klass=self.__class__,
3015 decode_path=decode_path,
3020 if first_octet & 0x80 > 0:
3021 octets = bytearray()
3022 for octet in bytearray(v):
3023 octets.append(octet ^ 0xFF)
3024 for octet in octets:
3025 value = (value << 8) | octet
3029 for octet in bytearray(v):
3030 value = (value << 8) | octet
3032 value = int.from_bytes(v, byteorder="big", signed=True)
3034 obj = self.__class__(
3036 bounds=(self._bound_min, self._bound_max),
3039 default=self.default,
3040 optional=self.optional,
3042 _decoded=(offset, llen, l),
3044 except BoundsError as err:
3047 klass=self.__class__,
3048 decode_path=decode_path,
3051 yield decode_path, obj, tail
3054 return pp_console_row(next(self.pps()))
3056 def pps(self, decode_path=()):
3059 asn1_type_name=self.asn1_type_name,
3060 obj_name=self.__class__.__name__,
3061 decode_path=decode_path,
3062 value=(self.named or str(self._value)) if self.ready else None,
3063 optional=self.optional,
3064 default=self == self.default,
3065 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3066 expl=None if self._expl is None else tag_decode(self._expl),
3071 expl_offset=self.expl_offset if self.expled else None,
3072 expl_tlen=self.expl_tlen if self.expled else None,
3073 expl_llen=self.expl_llen if self.expled else None,
3074 expl_vlen=self.expl_vlen if self.expled else None,
3075 expl_lenindef=self.expl_lenindef,
3078 for pp in self.pps_lenindef(decode_path):
3082 BitStringState = namedtuple(
3084 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3089 class BitString(Obj):
3090 """``BIT STRING`` bit string type
3092 >>> BitString(b"hello world")
3093 BIT STRING 88 bits 68656c6c6f20776f726c64
3096 >>> b == b"hello world"
3101 >>> BitString("'0A3B5F291CD'H")
3102 BIT STRING 44 bits 0a3b5f291cd0
3103 >>> b = BitString("'010110000000'B")
3104 BIT STRING 12 bits 5800
3107 >>> b[0], b[1], b[2], b[3]
3108 (False, True, False, True)
3112 [False, True, False, True, True, False, False, False, False, False, False, False]
3116 class KeyUsage(BitString):
3118 ("digitalSignature", 0),
3119 ("nonRepudiation", 1),
3120 ("keyEncipherment", 2),
3123 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3124 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3126 ['nonRepudiation', 'keyEncipherment']
3128 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3132 Note that BIT STRING can be encoded in both primitive
3133 and constructed forms. The decoder always checks the constructed form
3134 tag in addition to the specified primitive one. If BER decoding is
3135 :ref:`not enabled <bered_ctx>`, then the decoder will fail, because
3136 of DER restrictions.
3138 __slots__ = ("tag_constructed", "specs", "defined")
3139 tag_default = tag_encode(3)
3140 asn1_type_name = "BIT STRING"
3153 :param value: set the value. Either binary type, tuple of named
3154 values (if ``schema`` is specified in the class),
3155 string in ``'XXX...'B`` form, or
3156 :py:class:`pyderasn.BitString` object
3157 :param bytes impl: override default tag with ``IMPLICIT`` one
3158 :param bytes expl: override default tag with ``EXPLICIT`` one
3159 :param default: set default value. Type same as in ``value``
3160 :param bool optional: is object ``OPTIONAL`` in sequence
3162 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3163 specs = getattr(self, "schema", {}) if _specs is None else _specs
3164 self.specs = specs if specs.__class__ == dict else dict(specs)
3165 self._value = None if value is None else self._value_sanitize(value)
3166 if default is not None:
3167 default = self._value_sanitize(default)
3168 self.default = self.__class__(
3174 self._value = default
3176 tag_klass, _, tag_num = tag_decode(self.tag)
3177 self.tag_constructed = tag_encode(
3179 form=TagFormConstructed,
3183 def _bits2octets(self, bits):
3184 if len(self.specs) > 0:
3185 bits = bits.rstrip("0")
3187 bits += "0" * ((8 - (bit_len % 8)) % 8)
3188 octets = bytearray(len(bits) // 8)
3189 for i in six_xrange(len(octets)):
3190 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3191 return bit_len, bytes(octets)
3193 def _value_sanitize(self, value):
3194 if isinstance(value, (string_types, binary_type)):
3196 isinstance(value, string_types) and
3197 value.startswith("'")
3199 if value.endswith("'B"):
3201 if not frozenset(value) <= SET01:
3202 raise ValueError("B's coding contains unacceptable chars")
3203 return self._bits2octets(value)
3204 if value.endswith("'H"):
3208 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3210 if value.__class__ == binary_type:
3211 return (len(value) * 8, value)
3212 raise InvalidValueType((self.__class__, string_types, binary_type))
3213 if value.__class__ == tuple:
3216 isinstance(value[0], integer_types) and
3217 value[1].__class__ == binary_type
3222 bit = self.specs.get(name)
3224 raise ObjUnknown("BitString value: %s" % name)
3227 return self._bits2octets("")
3228 bits = frozenset(bits)
3229 return self._bits2octets("".join(
3230 ("1" if bit in bits else "0")
3231 for bit in six_xrange(max(bits) + 1)
3233 if issubclass(value.__class__, BitString):
3235 raise InvalidValueType((self.__class__, binary_type, string_types))
3239 return self._value is not None
3241 def __getstate__(self):
3242 return BitStringState(
3257 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a state snapshot

    The parent class is asked to process the snapshot first, then
    the named bits specification, the value, the constructed form
    tag and the ``defined`` attribute are taken from the
    corresponding snapshot fields.

    :param state: snapshot with ``specs``, ``value``,
                  ``tag_constructed`` and ``defined`` attributes
    """
    parent = super(BitString, self)
    parent.__setstate__(state)
    self.specs = state.specs
    self._value = state.value
    self.tag_constructed = state.tag_constructed
    self.defined = state.defined
3269 self._assert_ready()
3270 for i in six_xrange(self._value[0]):
3275 """Returns number of bits in the string
3277 self._assert_ready()
3278 return self._value[0]
3280 def __bytes__(self):
3281 self._assert_ready()
3282 return self._value[1]
3284 def __eq__(self, their):
3285 if their.__class__ == bytes:
3286 return self._value[1] == their
3287 if not issubclass(their.__class__, BitString):
3290 self._value == their._value and
3291 self.tag == their.tag and
3292 self._expl == their._expl
3297 """Named representation (if exists) of the bits
3299 :returns: [str(name), ...]
3301 return [name for name, bit in iteritems(self.specs) if self[bit]]
3311 return self.__class__(
3313 impl=self.tag if impl is None else impl,
3314 expl=self._expl if expl is None else expl,
3315 default=self.default if default is None else default,
3316 optional=self.optional if optional is None else optional,
3320 def __getitem__(self, key):
3321 if key.__class__ == int:
3322 bit_len, octets = self._value
3326 byte2int(memoryview(octets)[key // 8:]) >>
3329 if isinstance(key, string_types):
3330 value = self.specs.get(key)
3332 raise ObjUnknown("BitString value: %s" % key)
3334 raise InvalidValueType((int, str))
3337 self._assert_ready()
3338 bit_len, octets = self._value
3341 len_encode(len(octets) + 1),
3342 int2byte((8 - bit_len % 8) % 8),
3346 def _encode1st(self, state):
3347 self._assert_ready()
3348 _, octets = self._value
3350 return len(self.tag) + len_size(l) + l, state
3352 def _encode2nd(self, writer, state_iter):
3353 bit_len, octets = self._value
3354 write_full(writer, b"".join((
3356 len_encode(len(octets) + 1),
3357 int2byte((8 - bit_len % 8) % 8),
3359 write_full(writer, octets)
3361 def _encode_cer(self, writer):
3362 bit_len, octets = self._value
3363 if len(octets) + 1 <= 1000:
3364 write_full(writer, self._encode())
3366 write_full(writer, self.tag_constructed)
3367 write_full(writer, LENINDEF)
3368 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3369 write_full(writer, b"".join((
3370 BitString.tag_default,
3373 octets[offset:offset + 999],
3375 tail = octets[offset+999:]
3377 tail = int2byte((8 - bit_len % 8) % 8) + tail
3378 write_full(writer, b"".join((
3379 BitString.tag_default,
3380 len_encode(len(tail)),
3383 write_full(writer, EOC)
3385 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3387 t, tlen, lv = tag_strip(tlv)
3388 except DecodeError as err:
3389 raise err.__class__(
3391 klass=self.__class__,
3392 decode_path=decode_path,
3396 if tag_only: # pragma: no cover
3400 l, llen, v = len_decode(lv)
3401 except DecodeError as err:
3402 raise err.__class__(
3404 klass=self.__class__,
3405 decode_path=decode_path,
3409 raise NotEnoughData(
3410 "encoded length is longer than data",
3411 klass=self.__class__,
3412 decode_path=decode_path,
3416 raise NotEnoughData(
3418 klass=self.__class__,
3419 decode_path=decode_path,
3422 pad_size = byte2int(v)
3423 if l == 1 and pad_size != 0:
3425 "invalid empty value",
3426 klass=self.__class__,
3427 decode_path=decode_path,
3433 klass=self.__class__,
3434 decode_path=decode_path,
3437 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3440 klass=self.__class__,
3441 decode_path=decode_path,
3444 v, tail = v[:l], v[l:]
3445 bit_len = (len(v) - 1) * 8 - pad_size
3446 obj = self.__class__(
3447 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3450 default=self.default,
3451 optional=self.optional,
3453 _decoded=(offset, llen, l),
3456 obj._value = (bit_len, None)
3457 yield decode_path, obj, tail
3459 if t != self.tag_constructed:
3461 klass=self.__class__,
3462 decode_path=decode_path,
3465 if not ctx.get("bered", False):
3467 "unallowed BER constructed encoding",
3468 klass=self.__class__,
3469 decode_path=decode_path,
3472 if tag_only: # pragma: no cover
3477 l, llen, v = len_decode(lv)
3478 except LenIndefForm:
3479 llen, l, v = 1, 0, lv[1:]
3481 except DecodeError as err:
3482 raise err.__class__(
3484 klass=self.__class__,
3485 decode_path=decode_path,
3489 raise NotEnoughData(
3490 "encoded length is longer than data",
3491 klass=self.__class__,
3492 decode_path=decode_path,
3495 if not lenindef and l == 0:
3496 raise NotEnoughData(
3498 klass=self.__class__,
3499 decode_path=decode_path,
3503 sub_offset = offset + tlen + llen
3507 if v[:EOC_LEN].tobytes() == EOC:
3514 "chunk out of bounds",
3515 klass=self.__class__,
3516 decode_path=decode_path + (str(len(chunks) - 1),),
3517 offset=chunks[-1].offset,
3519 sub_decode_path = decode_path + (str(len(chunks)),)
3522 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3525 decode_path=sub_decode_path,
3528 _ctx_immutable=False,
3530 yield _decode_path, chunk, v_tail
3532 _, chunk, v_tail = next(BitString().decode_evgen(
3535 decode_path=sub_decode_path,
3538 _ctx_immutable=False,
3543 "expected BitString encoded chunk",
3544 klass=self.__class__,
3545 decode_path=sub_decode_path,
3548 chunks.append(chunk)
3549 sub_offset += chunk.tlvlen
3550 vlen += chunk.tlvlen
3552 if len(chunks) == 0:
3555 klass=self.__class__,
3556 decode_path=decode_path,
3561 for chunk_i, chunk in enumerate(chunks[:-1]):
3562 if chunk.bit_len % 8 != 0:
3564 "BitString chunk is not multiple of 8 bits",
3565 klass=self.__class__,
3566 decode_path=decode_path + (str(chunk_i),),
3567 offset=chunk.offset,
3570 values.append(bytes(chunk))
3571 bit_len += chunk.bit_len
3572 chunk_last = chunks[-1]
3574 values.append(bytes(chunk_last))
3575 bit_len += chunk_last.bit_len
3576 obj = self.__class__(
3577 value=None if evgen_mode else (bit_len, b"".join(values)),
3580 default=self.default,
3581 optional=self.optional,
3583 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3586 obj._value = (bit_len, None)
3587 obj.lenindef = lenindef
3588 obj.ber_encoded = True
3589 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3592 return pp_console_row(next(self.pps()))
3594 def pps(self, decode_path=()):
3598 bit_len, blob = self._value
3599 value = "%d bits" % bit_len
3600 if len(self.specs) > 0 and blob is not None:
3601 blob = tuple(self.named)
3604 asn1_type_name=self.asn1_type_name,
3605 obj_name=self.__class__.__name__,
3606 decode_path=decode_path,
3609 optional=self.optional,
3610 default=self == self.default,
3611 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3612 expl=None if self._expl is None else tag_decode(self._expl),
3617 expl_offset=self.expl_offset if self.expled else None,
3618 expl_tlen=self.expl_tlen if self.expled else None,
3619 expl_llen=self.expl_llen if self.expled else None,
3620 expl_vlen=self.expl_vlen if self.expled else None,
3621 expl_lenindef=self.expl_lenindef,
3622 lenindef=self.lenindef,
3623 ber_encoded=self.ber_encoded,
3626 defined_by, defined = self.defined or (None, None)
3627 if defined_by is not None:
3629 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3631 for pp in self.pps_lenindef(decode_path):
3635 OctetStringState = namedtuple(
3637 BasicState._fields + (
3648 class OctetString(Obj):
3649 """``OCTET STRING`` binary string type
3651 >>> s = OctetString(b"hello world")
3652 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3653 >>> s == OctetString(b"hello world")
3658 >>> OctetString(b"hello", bounds=(4, 4))
3659 Traceback (most recent call last):
3660 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3661 >>> OctetString(b"hell", bounds=(4, 4))
3662 OCTET STRING 4 bytes 68656c6c
3664 Memoryviews can be used as values. If a memoryview is made on an
3665 mmap-ed file, then it does not take storage inside OctetString
3666 itself. In CER encoding mode it will be streamed to the specified
3667 writer, copying 1 KB chunks.
3669 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3670 tag_default = tag_encode(4)
3671 asn1_type_name = "OCTET STRING"
3672 evgen_mode_skip_value = True
3686 :param value: set the value. Either binary type, or
3687 :py:class:`pyderasn.OctetString` object
3688 :param bounds: set ``(MIN, MAX)`` value size constraint.
3689 (-inf, +inf) by default
3690 :param bytes impl: override default tag with ``IMPLICIT`` one
3691 :param bytes expl: override default tag with ``EXPLICIT`` one
3692 :param default: set default value. Type same as in ``value``
3693 :param bool optional: is object ``OPTIONAL`` in sequence
3695 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3697 self._bound_min, self._bound_max = getattr(
3701 ) if bounds is None else bounds
3702 if value is not None:
3703 self._value = self._value_sanitize(value)
3704 if default is not None:
3705 default = self._value_sanitize(default)
3706 self.default = self.__class__(
3711 if self._value is None:
3712 self._value = default
3714 tag_klass, _, tag_num = tag_decode(self.tag)
3715 self.tag_constructed = tag_encode(
3717 form=TagFormConstructed,
3721 def _value_sanitize(self, value):
3722 if value.__class__ == binary_type or value.__class__ == memoryview:
3724 elif issubclass(value.__class__, OctetString):
3725 value = value._value
3727 raise InvalidValueType((self.__class__, bytes, memoryview))
3728 if not self._bound_min <= len(value) <= self._bound_max:
3729 raise BoundsError(self._bound_min, len(value), self._bound_max)
3734 return self._value is not None
3736 def __getstate__(self):
3737 return OctetStringState(
3753 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a state snapshot

    The parent class is asked to process the snapshot first, then
    the value, both size bounds, the constructed form tag and the
    ``defined`` attribute are taken from the corresponding snapshot
    fields.

    :param state: snapshot with ``value``, ``bound_min``,
                  ``bound_max``, ``tag_constructed`` and ``defined``
                  attributes
    """
    parent = super(OctetString, self)
    parent.__setstate__(state)
    self._value = state.value
    self._bound_min = state.bound_min
    self._bound_max = state.bound_max
    self.tag_constructed = state.tag_constructed
    self.defined = state.defined
3765 def __bytes__(self):
3766 self._assert_ready()
3767 return bytes(self._value)
3769 def __eq__(self, their):
3770 if their.__class__ == binary_type:
3771 return self._value == their
3772 if not issubclass(their.__class__, OctetString):
3775 self._value == their._value and
3776 self.tag == their.tag and
3777 self._expl == their._expl
3780 def __lt__(self, their):
3781 return self._value < their._value
3792 return self.__class__(
3795 (self._bound_min, self._bound_max)
3796 if bounds is None else bounds
3798 impl=self.tag if impl is None else impl,
3799 expl=self._expl if expl is None else expl,
3800 default=self.default if default is None else default,
3801 optional=self.optional if optional is None else optional,
3805 self._assert_ready()
3808 len_encode(len(self._value)),
def _encode1st(self, state):
    """Report the length of the encoding, passing ``state`` through

    :param state: opaque value, returned as is
    :returns: ``(length, state)`` tuple, where length covers the
              tag, the encoded length field and the value itself
    """
    self._assert_ready()
    value_len = len(self._value)
    header_len = len(self.tag) + len_size(value_len)
    return header_len + value_len, state
3817 def _encode2nd(self, writer, state_iter):
3819 write_full(writer, self.tag + len_encode(len(value)))
3820 write_full(writer, value)
3822 def _encode_cer(self, writer):
3823 octets = self._value
3824 if len(octets) <= 1000:
3825 write_full(writer, self._encode())
3827 write_full(writer, self.tag_constructed)
3828 write_full(writer, LENINDEF)
3829 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3830 write_full(writer, b"".join((
3831 OctetString.tag_default,
3833 octets[offset:offset + 1000],
3835 tail = octets[offset+1000:]
3837 write_full(writer, b"".join((
3838 OctetString.tag_default,
3839 len_encode(len(tail)),
3842 write_full(writer, EOC)
3844 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3846 t, tlen, lv = tag_strip(tlv)
3847 except DecodeError as err:
3848 raise err.__class__(
3850 klass=self.__class__,
3851 decode_path=decode_path,
3859 l, llen, v = len_decode(lv)
3860 except DecodeError as err:
3861 raise err.__class__(
3863 klass=self.__class__,
3864 decode_path=decode_path,
3868 raise NotEnoughData(
3869 "encoded length is longer than data",
3870 klass=self.__class__,
3871 decode_path=decode_path,
3874 v, tail = v[:l], v[l:]
3875 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3877 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3878 klass=self.__class__,
3879 decode_path=decode_path,
3883 obj = self.__class__(
3885 None if (evgen_mode and self.evgen_mode_skip_value)
3888 bounds=(self._bound_min, self._bound_max),
3891 default=self.default,
3892 optional=self.optional,
3893 _decoded=(offset, llen, l),
3896 except DecodeError as err:
3899 klass=self.__class__,
3900 decode_path=decode_path,
3903 except BoundsError as err:
3906 klass=self.__class__,
3907 decode_path=decode_path,
3910 yield decode_path, obj, tail
3912 if t != self.tag_constructed:
3914 klass=self.__class__,
3915 decode_path=decode_path,
3918 if not ctx.get("bered", False):
3920 "unallowed BER constructed encoding",
3921 klass=self.__class__,
3922 decode_path=decode_path,
3930 l, llen, v = len_decode(lv)
3931 except LenIndefForm:
3932 llen, l, v = 1, 0, lv[1:]
3934 except DecodeError as err:
3935 raise err.__class__(
3937 klass=self.__class__,
3938 decode_path=decode_path,
3942 raise NotEnoughData(
3943 "encoded length is longer than data",
3944 klass=self.__class__,
3945 decode_path=decode_path,
3950 sub_offset = offset + tlen + llen
3955 if v[:EOC_LEN].tobytes() == EOC:
3962 "chunk out of bounds",
3963 klass=self.__class__,
3964 decode_path=decode_path + (str(len(chunks) - 1),),
3965 offset=chunks[-1].offset,
3969 sub_decode_path = decode_path + (str(chunks_count),)
3970 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3973 decode_path=sub_decode_path,
3976 _ctx_immutable=False,
3978 yield _decode_path, chunk, v_tail
3979 if not chunk.ber_encoded:
3980 payload_len += chunk.vlen
3983 sub_decode_path = decode_path + (str(len(chunks)),)
3984 _, chunk, v_tail = next(OctetString().decode_evgen(
3987 decode_path=sub_decode_path,
3990 _ctx_immutable=False,
3993 chunks.append(chunk)
3996 "expected OctetString encoded chunk",
3997 klass=self.__class__,
3998 decode_path=sub_decode_path,
4001 sub_offset += chunk.tlvlen
4002 vlen += chunk.tlvlen
4004 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
4006 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
4007 klass=self.__class__,
4008 decode_path=decode_path,
4012 obj = self.__class__(
4014 None if evgen_mode else
4015 b"".join(bytes(chunk) for chunk in chunks)
4017 bounds=(self._bound_min, self._bound_max),
4020 default=self.default,
4021 optional=self.optional,
4022 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4025 except DecodeError as err:
4028 klass=self.__class__,
4029 decode_path=decode_path,
4032 except BoundsError as err:
4035 klass=self.__class__,
4036 decode_path=decode_path,
4039 obj.lenindef = lenindef
4040 obj.ber_encoded = True
4041 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4044 return pp_console_row(next(self.pps()))
4046 def pps(self, decode_path=()):
4049 asn1_type_name=self.asn1_type_name,
4050 obj_name=self.__class__.__name__,
4051 decode_path=decode_path,
4052 value=("%d bytes" % len(self._value)) if self.ready else None,
4053 blob=self._value if self.ready else None,
4054 optional=self.optional,
4055 default=self == self.default,
4056 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4057 expl=None if self._expl is None else tag_decode(self._expl),
4062 expl_offset=self.expl_offset if self.expled else None,
4063 expl_tlen=self.expl_tlen if self.expled else None,
4064 expl_llen=self.expl_llen if self.expled else None,
4065 expl_vlen=self.expl_vlen if self.expled else None,
4066 expl_lenindef=self.expl_lenindef,
4067 lenindef=self.lenindef,
4068 ber_encoded=self.ber_encoded,
4071 defined_by, defined = self.defined or (None, None)
4072 if defined_by is not None:
4074 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4076 for pp in self.pps_lenindef(decode_path):
4080 def agg_octet_string(evgens, decode_path, raw, writer):
4081 """Aggregate constructed string (OctetString and its derivatives)
4083 :param evgens: iterator of generated events
4084 :param decode_path: points to the string we want to decode
4085 :param raw: sliceable (memoryview, bytearray, etc) with
4086 the data evgens are generated on
4087 :param writer: ``buffer.write`` (or alike) where the string is
4088 going to be saved. Must comply
4089 with ``io.RawIOBase.write`` behaviour
4091 .. seealso:: :ref:`agg_octet_string`
4093 decode_path_len = len(decode_path)
4094 for dp, obj, _ in evgens:
4095 if dp[:decode_path_len] != decode_path:
4097 if not obj.ber_encoded:
4098 write_full(writer, raw[
4099 obj.offset + obj.tlen + obj.llen:
4100 obj.offset + obj.tlen + obj.llen + obj.vlen -
4101 (EOC_LEN if obj.expl_lenindef else 0)
4103 if len(dp) == decode_path_len:
4107 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4111 """``NULL`` null object
4119 tag_default = tag_encode(5)
4120 asn1_type_name = "NULL"
4124 value=None, # unused, but Sequence passes it
4131 :param bytes impl: override default tag with ``IMPLICIT`` one
4132 :param bytes expl: override default tag with ``EXPLICIT`` one
4133 :param bool optional: is object ``OPTIONAL`` in sequence
4135 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4142 def __getstate__(self):
4158 def __eq__(self, their):
4159 if not issubclass(their.__class__, Null):
4162 self.tag == their.tag and
4163 self._expl == their._expl
4173 return self.__class__(
4174 impl=self.tag if impl is None else impl,
4175 expl=self._expl if expl is None else expl,
4176 optional=self.optional if optional is None else optional,
4180 return self.tag + LEN0
4182 def _encode1st(self, state):
4183 return len(self.tag) + 1, state
def _encode2nd(self, writer, state_iter):
    """Write the tag followed by ``LEN0`` to ``writer``

    :param writer: destination handed over to ``write_full``
    :param state_iter: not used here
    """
    encoded = self.tag + LEN0
    write_full(writer, encoded)
4188 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4190 t, _, lv = tag_strip(tlv)
4191 except DecodeError as err:
4192 raise err.__class__(
4194 klass=self.__class__,
4195 decode_path=decode_path,
4200 klass=self.__class__,
4201 decode_path=decode_path,
4204 if tag_only: # pragma: no cover
4208 l, _, v = len_decode(lv)
4209 except DecodeError as err:
4210 raise err.__class__(
4212 klass=self.__class__,
4213 decode_path=decode_path,
4217 raise InvalidLength(
4218 "Null must have zero length",
4219 klass=self.__class__,
4220 decode_path=decode_path,
4223 obj = self.__class__(
4226 optional=self.optional,
4227 _decoded=(offset, 1, 0),
4229 yield decode_path, obj, v
4232 return pp_console_row(next(self.pps()))
4234 def pps(self, decode_path=()):
4237 asn1_type_name=self.asn1_type_name,
4238 obj_name=self.__class__.__name__,
4239 decode_path=decode_path,
4240 optional=self.optional,
4241 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4242 expl=None if self._expl is None else tag_decode(self._expl),
4247 expl_offset=self.expl_offset if self.expled else None,
4248 expl_tlen=self.expl_tlen if self.expled else None,
4249 expl_llen=self.expl_llen if self.expled else None,
4250 expl_vlen=self.expl_vlen if self.expled else None,
4251 expl_lenindef=self.expl_lenindef,
4254 for pp in self.pps_lenindef(decode_path):
4258 ObjectIdentifierState = namedtuple(
4259 "ObjectIdentifierState",
4260 BasicState._fields + ("value", "defines"),
4265 class ObjectIdentifier(Obj):
4266 """``OBJECT IDENTIFIER`` OID type
4268 >>> oid = ObjectIdentifier((1, 2, 3))
4269 OBJECT IDENTIFIER 1.2.3
4270 >>> oid == ObjectIdentifier("1.2.3")
4276 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4277 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4279 >>> str(ObjectIdentifier((3, 1)))
4280 Traceback (most recent call last):
4281 pyderasn.InvalidOID: unacceptable first arc value
4283 __slots__ = ("defines",)
4284 tag_default = tag_encode(6)
4285 asn1_type_name = "OBJECT IDENTIFIER"
4298 :param value: set the value. Either tuples of integers,
4299 string of "."-concatenated integers, or
4300 :py:class:`pyderasn.ObjectIdentifier` object
4301 :param defines: sequence of tuples. Each tuple has two elements.
4302 First one is a decode path, relative to the current
4303 one, pointing to the field defined by that OID.
4304 Read about relative path in
4305 :py:func:`pyderasn.abs_decode_path`. Second
4306 tuple element is ``{OID: pyderasn.Obj()}``
4307 dictionary, mapping between current OID value
4308 and structure applied to defined field.
4310 .. seealso:: :ref:`definedby`
4312 :param bytes impl: override default tag with ``IMPLICIT`` one
4313 :param bytes expl: override default tag with ``EXPLICIT`` one
4314 :param default: set default value. Type same as in ``value``
4315 :param bool optional: is object ``OPTIONAL`` in sequence
4317 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4319 if value is not None:
4320 self._value = self._value_sanitize(value)
4321 if default is not None:
4322 default = self._value_sanitize(default)
4323 self.default = self.__class__(
4328 if self._value is None:
4329 self._value = default
4330 self.defines = defines
4332 def __add__(self, their):
4333 if their.__class__ == tuple:
4334 return self.__class__(self._value + array("L", their))
4335 if isinstance(their, self.__class__):
4336 return self.__class__(self._value + their._value)
4337 raise InvalidValueType((self.__class__, tuple))
4339 def _value_sanitize(self, value):
4340 if issubclass(value.__class__, ObjectIdentifier):
4342 if isinstance(value, string_types):
4344 value = array("L", (pureint(arc) for arc in value.split(".")))
4346 raise InvalidOID("unacceptable arcs values")
4347 if value.__class__ == tuple:
4349 value = array("L", value)
4350 except OverflowError as err:
4351 raise InvalidOID(repr(err))
4352 if value.__class__ is array:
4354 raise InvalidOID("less than 2 arcs")
4355 first_arc = value[0]
4356 if first_arc in (0, 1):
4357 if not (0 <= value[1] <= 39):
4358 raise InvalidOID("second arc is too wide")
4359 elif first_arc == 2:
4362 raise InvalidOID("unacceptable first arc value")
4363 if not all(arc >= 0 for arc in value):
4364 raise InvalidOID("negative arc value")
4366 raise InvalidValueType((self.__class__, str, tuple))
4370 return self._value is not None
4372 def __getstate__(self):
4373 return ObjectIdentifierState(
def __setstate__(self, state):
    """Restore the object from the state made by ``__getstate__``

    The parent class restores its own fields first; then the ``value``
    and ``defines`` fields of the state are applied.
    """
    super(ObjectIdentifier, self).__setstate__(state)
    for attr, restored in (("_value", state.value), ("defines", state.defines)):
        setattr(self, attr, restored)
4396 self._assert_ready()
4397 return iter(self._value)
4400 return ".".join(str(arc) for arc in self._value or ())
4403 self._assert_ready()
4404 return hash(b"".join((
4406 bytes(self._expl or b""),
4407 str(self._value).encode("ascii"),
4410 def __eq__(self, their):
4411 if their.__class__ == tuple:
4412 return self._value == array("L", their)
4413 if not issubclass(their.__class__, ObjectIdentifier):
4416 self.tag == their.tag and
4417 self._expl == their._expl and
4418 self._value == their._value
4421 def __lt__(self, their):
4422 return self._value < their._value
4433 return self.__class__(
4435 defines=self.defines if defines is None else defines,
4436 impl=self.tag if impl is None else impl,
4437 expl=self._expl if expl is None else expl,
4438 default=self.default if default is None else default,
4439 optional=self.optional if optional is None else optional,
4442 def _encode_octets(self):
4443 self._assert_ready()
4445 first_value = value[1]
4446 first_arc = value[0]
4449 elif first_arc == 1:
4451 elif first_arc == 2:
4453 else: # pragma: no cover
4454 raise RuntimeError("invalid arc is stored")
4455 octets = [zero_ended_encode(first_value)]
4456 for arc in value[2:]:
4457 octets.append(zero_ended_encode(arc))
4458 return b"".join(octets)
4461 v = self._encode_octets()
4462 return b"".join((self.tag, len_encode(len(v)), v))
def _encode1st(self, state):
    """First pass of the two-pass encoder: report the full encoded size

    :param state: opaque state, returned untouched
    :returns: ``(length, state)`` pair; length is the sum of the tag
              length, ``len_size`` of the value length, and the length
              of ``_encode_octets()`` output
    """
    vlen = len(self._encode_octets())
    total = len(self.tag) + len_size(vlen) + vlen
    return total, state
def _encode2nd(self, writer, state_iter):
    """Second pass of the two-pass encoder: write ``_encode()`` output

    :param writer: destination, passed to ``write_full``
    :param state_iter: unused here
    """
    encoded = self._encode()
    write_full(writer, encoded)
4471 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4473 t, _, lv = tag_strip(tlv)
4474 except DecodeError as err:
4475 raise err.__class__(
4477 klass=self.__class__,
4478 decode_path=decode_path,
4483 klass=self.__class__,
4484 decode_path=decode_path,
4487 if tag_only: # pragma: no cover
4491 l, llen, v = len_decode(lv)
4492 except DecodeError as err:
4493 raise err.__class__(
4495 klass=self.__class__,
4496 decode_path=decode_path,
4500 raise NotEnoughData(
4501 "encoded length is longer than data",
4502 klass=self.__class__,
4503 decode_path=decode_path,
4507 raise NotEnoughData(
4509 klass=self.__class__,
4510 decode_path=decode_path,
4513 v, tail = v[:l], v[l:]
4520 octet = indexbytes(v, i)
4521 if i == 0 and octet == 0x80:
4522 if ctx.get("bered", False):
4526 "non normalized arc encoding",
4527 klass=self.__class__,
4528 decode_path=decode_path,
4531 arc = (arc << 7) | (octet & 0x7F)
4532 if octet & 0x80 == 0:
4535 except OverflowError:
4537 "too huge value for local unsigned long",
4538 klass=self.__class__,
4539 decode_path=decode_path,
4548 klass=self.__class__,
4549 decode_path=decode_path,
4553 second_arc = arcs[0]
4554 if 0 <= second_arc <= 39:
4556 elif 40 <= second_arc <= 79:
4562 obj = self.__class__(
4563 value=array("L", (first_arc, second_arc)) + arcs[1:],
4566 default=self.default,
4567 optional=self.optional,
4568 _decoded=(offset, llen, l),
4571 obj.ber_encoded = True
4572 yield decode_path, obj, tail
4575 return pp_console_row(next(self.pps()))
4577 def pps(self, decode_path=()):
4580 asn1_type_name=self.asn1_type_name,
4581 obj_name=self.__class__.__name__,
4582 decode_path=decode_path,
4583 value=str(self) if self.ready else None,
4584 optional=self.optional,
4585 default=self == self.default,
4586 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4587 expl=None if self._expl is None else tag_decode(self._expl),
4592 expl_offset=self.expl_offset if self.expled else None,
4593 expl_tlen=self.expl_tlen if self.expled else None,
4594 expl_llen=self.expl_llen if self.expled else None,
4595 expl_vlen=self.expl_vlen if self.expled else None,
4596 expl_lenindef=self.expl_lenindef,
4597 ber_encoded=self.ber_encoded,
4600 for pp in self.pps_lenindef(decode_path):
4604 class Enumerated(Integer):
4605 """``ENUMERATED`` integer type
4607 This type is identical to :py:class:`pyderasn.Integer`, but requires
4608 schema to be specified and does not accept values missing from it.
4611 tag_default = tag_encode(10)
4612 asn1_type_name = "ENUMERATED"
4623 bounds=None, # dummy argument, workability for Integer.decode
4625 super(Enumerated, self).__init__(
4626 value, bounds, impl, expl, default, optional, _specs, _decoded,
4628 if len(self.specs) == 0:
4629 raise ValueError("schema must be specified")
4631 def _value_sanitize(self, value):
4632 if isinstance(value, self.__class__):
4633 value = value._value
4634 elif isinstance(value, integer_types):
4635 for _value in itervalues(self.specs):
4640 "unknown integer value: %s" % value,
4641 klass=self.__class__,
4643 elif isinstance(value, string_types):
4644 value = self.specs.get(value)
4646 raise ObjUnknown("integer value: %s" % value)
4648 raise InvalidValueType((self.__class__, int, str))
4660 return self.__class__(
4662 impl=self.tag if impl is None else impl,
4663 expl=self._expl if expl is None else expl,
4664 default=self.default if default is None else default,
4665 optional=self.optional if optional is None else optional,
4670 def escape_control_unicode(c):
4671 if unicat(c)[0] == "C":
4672 c = repr(c).lstrip("u").strip("'")
4676 class CommonString(OctetString):
4677 """Common class for all strings
4679 Everything resembles :py:class:`pyderasn.OctetString`, except
4680 ability to deal with unicode text strings.
4682 >>> hexenc("привет мир".encode("utf-8"))
4683 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4684 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4686 >>> s = UTF8String("привет мир")
4687 UTF8String UTF8String привет мир
4689 'привет мир'
4690 >>> hexenc(bytes(s))
4691 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4693 >>> PrintableString("привет мир")
4694 Traceback (most recent call last):
4695 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4697 >>> BMPString("ада", bounds=(2, 2))
4698 Traceback (most recent call last):
4699 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4700 >>> s = BMPString("ад", bounds=(2, 2))
4703 >>> hexenc(bytes(s))
4711 * - :py:class:`pyderasn.UTF8String`
4713 * - :py:class:`pyderasn.NumericString`
4715 * - :py:class:`pyderasn.PrintableString`
4717 * - :py:class:`pyderasn.TeletexString`
4719 * - :py:class:`pyderasn.T61String`
4721 * - :py:class:`pyderasn.VideotexString`
4723 * - :py:class:`pyderasn.IA5String`
4725 * - :py:class:`pyderasn.GraphicString`
4727 * - :py:class:`pyderasn.VisibleString`
4729 * - :py:class:`pyderasn.ISO646String`
4731 * - :py:class:`pyderasn.GeneralString`
4733 * - :py:class:`pyderasn.UniversalString`
4735 * - :py:class:`pyderasn.BMPString`
4740 def _value_sanitize(self, value):
4742 value_decoded = None
4743 if isinstance(value, self.__class__):
4744 value_raw = value._value
4745 elif value.__class__ == text_type:
4746 value_decoded = value
4747 elif value.__class__ == binary_type:
4750 raise InvalidValueType((self.__class__, text_type, binary_type))
4753 value_decoded.encode(self.encoding)
4754 if value_raw is None else value_raw
4757 value_raw.decode(self.encoding)
4758 if value_decoded is None else value_decoded
4760 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4761 raise DecodeError(str(err))
4762 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4770 def __eq__(self, their):
4771 if their.__class__ == binary_type:
4772 return self._value == their
4773 if their.__class__ == text_type:
4774 return self._value == their.encode(self.encoding)
4775 if not isinstance(their, self.__class__):
4778 self._value == their._value and
4779 self.tag == their.tag and
4780 self._expl == their._expl
4783 def __unicode__(self):
4785 return self._value.decode(self.encoding)
4786 return text_type(self._value)
4789 return pp_console_row(next(self.pps(no_unicode=PY2)))
4791 def pps(self, decode_path=(), no_unicode=False):
4795 hexenc(bytes(self)) if no_unicode else
4796 "".join(escape_control_unicode(c) for c in self.__unicode__())
4800 asn1_type_name=self.asn1_type_name,
4801 obj_name=self.__class__.__name__,
4802 decode_path=decode_path,
4804 optional=self.optional,
4805 default=self == self.default,
4806 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4807 expl=None if self._expl is None else tag_decode(self._expl),
4812 expl_offset=self.expl_offset if self.expled else None,
4813 expl_tlen=self.expl_tlen if self.expled else None,
4814 expl_llen=self.expl_llen if self.expled else None,
4815 expl_vlen=self.expl_vlen if self.expled else None,
4816 expl_lenindef=self.expl_lenindef,
4817 ber_encoded=self.ber_encoded,
4820 for pp in self.pps_lenindef(decode_path):
4824 class UTF8String(CommonString):
4826 tag_default = tag_encode(12)
4828 asn1_type_name = "UTF8String"
4831 class AllowableCharsMixin(object):
4833 def allowable_chars(self):
4835 return self._allowable_chars
4836 return frozenset(six_unichr(c) for c in self._allowable_chars)
4839 class NumericString(AllowableCharsMixin, CommonString):
4842 Its value is properly sanitized: only ASCII digits with spaces can
4845 >>> NumericString().allowable_chars
4846 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4849 tag_default = tag_encode(18)
4851 asn1_type_name = "NumericString"
4852 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4854 def _value_sanitize(self, value):
4855 value = super(NumericString, self)._value_sanitize(value)
4856 if not frozenset(value) <= self._allowable_chars:
4857 raise DecodeError("non-numeric value")
4861 PrintableStringState = namedtuple(
4862 "PrintableStringState",
4863 OctetStringState._fields + ("allowable_chars",),
4868 class PrintableString(AllowableCharsMixin, CommonString):
4871 Its value is properly sanitized: see X.680 41.4 table 10.
4873 >>> PrintableString().allowable_chars
4874 frozenset([' ', "'", ..., 'z'])
4875 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4876 PrintableString PrintableString foo*bar
4877 >>> obj.allow_asterisk, obj.allow_ampersand
4881 tag_default = tag_encode(19)
4883 asn1_type_name = "PrintableString"
4884 _allowable_chars = frozenset(
4885 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4887 _asterisk = frozenset("*".encode("ascii"))
4888 _ampersand = frozenset("&".encode("ascii"))
4900 allow_asterisk=False,
4901 allow_ampersand=False,
4904 :param allow_asterisk: allow asterisk character
4905 :param allow_ampersand: allow ampersand character
4908 self._allowable_chars |= self._asterisk
4910 self._allowable_chars |= self._ampersand
4911 super(PrintableString, self).__init__(
4912 value, bounds, impl, expl, default, optional, _decoded, ctx,
4916 def allow_asterisk(self):
4917 """Is asterisk character allowed?
4919 return self._asterisk <= self._allowable_chars
4922 def allow_ampersand(self):
4923 """Is ampersand character allowed?
4925 return self._ampersand <= self._allowable_chars
4927 def _value_sanitize(self, value):
4928 value = super(PrintableString, self)._value_sanitize(value)
4929 if not frozenset(value) <= self._allowable_chars:
4930 raise DecodeError("non-printable value")
4933 def __getstate__(self):
4934 return PrintableStringState(
4935 *super(PrintableString, self).__getstate__(),
4936 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore the object from the state made by ``__getstate__``

    The parent class restores its own fields first; then the
    ``allowable_chars`` field of the state is applied.
    """
    super(PrintableString, self).__setstate__(state)
    restored_chars = state.allowable_chars
    self._allowable_chars = restored_chars
4952 return self.__class__(
4955 (self._bound_min, self._bound_max)
4956 if bounds is None else bounds
4958 impl=self.tag if impl is None else impl,
4959 expl=self._expl if expl is None else expl,
4960 default=self.default if default is None else default,
4961 optional=self.optional if optional is None else optional,
4962 allow_asterisk=self.allow_asterisk,
4963 allow_ampersand=self.allow_ampersand,
4967 class TeletexString(CommonString):
4969 tag_default = tag_encode(20)
4971 asn1_type_name = "TeletexString"
4974 class T61String(TeletexString):
4976 asn1_type_name = "T61String"
4979 class VideotexString(CommonString):
4981 tag_default = tag_encode(21)
4982 encoding = "iso-8859-1"
4983 asn1_type_name = "VideotexString"
4986 class IA5String(CommonString):
4988 tag_default = tag_encode(22)
4990 asn1_type_name = "IA5"
# Lengths of the fixed-size textual time templates
LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
# Result of len_encode() for the fixed-size values above
LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
# Size of the encoded length plus the value itself
LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
5001 class VisibleString(CommonString):
5003 tag_default = tag_encode(26)
5005 asn1_type_name = "VisibleString"
5008 UTCTimeState = namedtuple(
5010 OctetStringState._fields + ("ber_raw",),
5015 def str_to_time_fractions(value):
5017 year, v = (v // 10**10), (v % 10**10)
5018 month, v = (v // 10**8), (v % 10**8)
5019 day, v = (v // 10**6), (v % 10**6)
5020 hour, v = (v // 10**4), (v % 10**4)
5021 minute, second = (v // 100), (v % 100)
5022 return year, month, day, hour, minute, second
5025 class UTCTime(VisibleString):
5026 """``UTCTime`` datetime type
5028 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5029 UTCTime UTCTime 2017-09-30T22:07:50
5035 datetime.datetime(2017, 9, 30, 22, 7, 50)
5036 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5037 datetime.datetime(1957, 9, 30, 22, 7, 50)
5039 If BER encoded value was met, then ``ber_raw`` attribute will hold
5040 its raw representation.
5044 Pay attention that UTCTime can not hold the full year, so two-digit
5045 years below 50 are treated as 20xx, and as 19xx otherwise, according
5046 to the X.509 recommendation.
5050 No strict validation of UTC offsets is made, only a very crude one:
5052 * minutes are not exceeding 60
5053 * offset value is not exceeding 14 hours
5055 __slots__ = ("ber_raw",)
5056 tag_default = tag_encode(23)
5058 asn1_type_name = "UTCTime"
5059 evgen_mode_skip_value = False
5069 bounds=None, # dummy argument, workability for OctetString.decode
5073 :param value: set the value. Either datetime type, or
5074 :py:class:`pyderasn.UTCTime` object
5075 :param bytes impl: override default tag with ``IMPLICIT`` one
5076 :param bytes expl: override default tag with ``EXPLICIT`` one
5077 :param default: set default value. Type same as in ``value``
5078 :param bool optional: is object ``OPTIONAL`` in sequence
5080 super(UTCTime, self).__init__(
5081 None, None, impl, expl, None, optional, _decoded, ctx,
5085 if value is not None:
5086 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5087 self.ber_encoded = self.ber_raw is not None
5088 if default is not None:
5089 default, _ = self._value_sanitize(default)
5090 self.default = self.__class__(
5095 if self._value is None:
5096 self._value = default
5098 self.optional = optional
5100 def _strptime_bered(self, value):
5101 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5104 raise ValueError("no timezone")
5105 year += 2000 if year < 50 else 1900
5106 decoded = datetime(year, month, day, hour, minute)
5108 if value[-1] == "Z":
5112 raise ValueError("invalid UTC offset")
5113 if value[-5] == "-":
5115 elif value[-5] == "+":
5118 raise ValueError("invalid UTC offset")
5119 v = pureint(value[-4:])
5120 offset, v = (60 * (v % 100)), v // 100
5122 raise ValueError("invalid UTC offset minutes")
5124 if offset > 14 * 3600:
5125 raise ValueError("too big UTC offset")
5129 return offset, decoded
5131 raise ValueError("invalid UTC offset seconds")
5132 seconds = pureint(value)
5134 raise ValueError("invalid seconds value")
5135 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse strict ``YYMMDDHHMMSSZ`` text into a naive ``datetime``

    Hand-written replacement for datetime.strptime's format:
    %y%m%d%H%M%SZ

    :param value: text to parse
    :raises ValueError: if the length differs from
                        ``LEN_YYMMDDHHMMSSZ``, or the last character
                        is not "Z"
    """
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    if value[-1] != "Z":
        raise ValueError("non UTC timezone")
    fractions = str_to_time_fractions(value[:-1])
    # Two-digit years below 50 belong to 20xx, the others to 19xx
    century = 2000 if fractions[0] < 50 else 1900
    return datetime(fractions[0] + century, *fractions[1:])
5147 def _dt_sanitize(self, value):
5148 if value.year < 1950 or value.year > 2049:
5149 raise ValueError("UTCTime can hold only 1950-2049 years")
5150 return value.replace(microsecond=0)
5152 def _value_sanitize(self, value, ctx=None):
5153 if value.__class__ == binary_type:
5155 value_decoded = value.decode("ascii")
5156 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5157 raise DecodeError("invalid UTCTime encoding: %r" % err)
5160 return self._strptime(value_decoded), None
5161 except (TypeError, ValueError) as _err:
5163 if (ctx is not None) and ctx.get("bered", False):
5165 offset, _value = self._strptime_bered(value_decoded)
5166 _value = _value - timedelta(seconds=offset)
5167 return self._dt_sanitize(_value), value
5168 except (TypeError, ValueError, OverflowError) as _err:
5171 "invalid %s format: %r" % (self.asn1_type_name, err),
5172 klass=self.__class__,
5174 if isinstance(value, self.__class__):
5175 return value._value, None
5176 if value.__class__ == datetime:
5177 return self._dt_sanitize(value), None
5178 raise InvalidValueType((self.__class__, datetime))
5180 def _pp_value(self):
5182 value = self._value.isoformat()
5183 if self.ber_encoded:
5184 value += " (%s)" % self.ber_raw
5188 def __unicode__(self):
5190 value = self._value.isoformat()
5191 if self.ber_encoded:
5192 value += " (%s)" % self.ber_raw
5194 return text_type(self._pp_value())
5196 def __getstate__(self):
5197 return UTCTimeState(
5198 *super(UTCTime, self).__getstate__(),
5199 **{"ber_raw": self.ber_raw}
def __setstate__(self, state):
    """Restore the object from the state made by ``__getstate__``

    The parent class restores its own fields first; then the
    ``ber_raw`` field of the state is applied.
    """
    super(UTCTime, self).__setstate__(state)
    restored_raw = state.ber_raw
    self.ber_raw = restored_raw
5206 def __bytes__(self):
5207 self._assert_ready()
5208 return self._encode_time()
5210 def __eq__(self, their):
5211 if their.__class__ == binary_type:
5212 return self._encode_time() == their
5213 if their.__class__ == datetime:
5214 return self.todatetime() == their
5215 if not isinstance(their, self.__class__):
5218 self._value == their._value and
5219 self.tag == their.tag and
5220 self._expl == their._expl
5223 def _encode_time(self):
5224 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5227 self._assert_ready()
5228 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """First pass of the two-pass encoder: report the encoded size

    :param state: opaque state, returned untouched
    :returns: ``(length, state)`` pair; length is the tag length plus
              the constant ``LEN_YYMMDDHHMMSSZ_WITH_LEN``
    """
    size = len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN
    return (size, state)
def _encode2nd(self, writer, state_iter):
    """Second pass of the two-pass encoder: write ``_encode()`` output

    :param writer: destination, passed to ``write_full``
    :param state_iter: unused here
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
def _encode_cer(self, writer):
    """Write the CER form, which here is exactly ``_encode()`` output

    :param writer: destination, passed to ``write_full``
    """
    encoded = self._encode()
    write_full(writer, encoded)
5240 def todatetime(self):
5244 return pp_console_row(next(self.pps()))
5246 def pps(self, decode_path=()):
5249 asn1_type_name=self.asn1_type_name,
5250 obj_name=self.__class__.__name__,
5251 decode_path=decode_path,
5252 value=self._pp_value(),
5253 optional=self.optional,
5254 default=self == self.default,
5255 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5256 expl=None if self._expl is None else tag_decode(self._expl),
5261 expl_offset=self.expl_offset if self.expled else None,
5262 expl_tlen=self.expl_tlen if self.expled else None,
5263 expl_llen=self.expl_llen if self.expled else None,
5264 expl_vlen=self.expl_vlen if self.expled else None,
5265 expl_lenindef=self.expl_lenindef,
5266 ber_encoded=self.ber_encoded,
5269 for pp in self.pps_lenindef(decode_path):
5273 class GeneralizedTime(UTCTime):
5274 """``GeneralizedTime`` datetime type
5276 This type is similar to :py:class:`pyderasn.UTCTime`.
5278 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5279 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5281 '20170930220750.000123Z'
5282 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5283 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5287 Only microsecond fractions are supported in DER encoding.
5288 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5289 higher precision values.
5293 BER encoded data can lose information (accuracy) during decoding
5294 because of float transformations.
5298 Local times (without explicit timezone specification) are treated
5299 as UTC ones; no transformations are made.
5303 Zero year is unsupported.
5306 tag_default = tag_encode(24)
5307 asn1_type_name = "GeneralizedTime"
5309 def _dt_sanitize(self, value):
5312 def _strptime_bered(self, value):
5313 if len(value) < 4 + 3 * 2:
5314 raise ValueError("invalid GeneralizedTime")
5315 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5316 decoded = datetime(year, month, day, hour)
5317 offset, value = 0, value[10:]
5319 return offset, decoded
5320 if value[-1] == "Z":
5323 for char, sign in (("-", -1), ("+", 1)):
5324 idx = value.rfind(char)
5327 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5328 v = pureint(offset_raw)
5329 if len(offset_raw) == 4:
5330 offset, v = (60 * (v % 100)), v // 100
5332 raise ValueError("invalid UTC offset minutes")
5333 elif len(offset_raw) == 2:
5336 raise ValueError("invalid UTC offset")
5338 if offset > 14 * 3600:
5339 raise ValueError("too big UTC offset")
5343 return offset, decoded
5344 if value[0] in DECIMAL_SIGNS:
5346 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5349 raise ValueError("stripped minutes")
5350 decoded += timedelta(seconds=60 * pureint(value[:2]))
5353 return offset, decoded
5354 if value[0] in DECIMAL_SIGNS:
5356 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5359 raise ValueError("stripped seconds")
5360 decoded += timedelta(seconds=pureint(value[:2]))
5363 return offset, decoded
5364 if value[0] not in DECIMAL_SIGNS:
5365 raise ValueError("invalid format after seconds")
5367 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5370 def _strptime(self, value):
5372 if l == LEN_YYYYMMDDHHMMSSZ:
5373 # datetime.strptime's format: %Y%m%d%H%M%SZ
5374 if value[-1] != "Z":
5375 raise ValueError("non UTC timezone")
5376 return datetime(*str_to_time_fractions(value[:-1]))
5377 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5378 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5379 if value[-1] != "Z":
5380 raise ValueError("non UTC timezone")
5381 if value[14] != ".":
5382 raise ValueError("no fractions separator")
5385 raise ValueError("trailing zero")
5388 raise ValueError("only microsecond fractions are supported")
5389 us = pureint(us + ("0" * (6 - us_len)))
5390 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5391 return datetime(year, month, day, hour, minute, second, us)
5392 raise ValueError("invalid GeneralizedTime length")
5394 def _encode_time(self):
5396 encoded = value.strftime("%Y%m%d%H%M%S")
5397 if value.microsecond > 0:
5398 encoded += (".%06d" % value.microsecond).rstrip("0")
5399 return (encoded + "Z").encode("ascii")
5402 self._assert_ready()
5404 if value.microsecond > 0:
5405 encoded = self._encode_time()
5406 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5407 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """First pass of the two-pass encoder: report the encoded size

    The value length is measured from actual ``_encode_time()``
    output rather than taken from a constant.

    :param state: opaque state, returned untouched
    :returns: ``(length, state)`` pair
    """
    self._assert_ready()
    value_len = len(self._encode_time())
    total = len(self.tag) + len_size(value_len) + value_len
    return (total, state)
def _encode2nd(self, writer, state_iter):
    """Second pass of the two-pass encoder: write ``_encode()`` output

    :param writer: destination, passed to ``write_full``
    :param state_iter: unused here
    """
    encoded = self._encode()
    write_full(writer, encoded)
5418 class GraphicString(CommonString):
5420 tag_default = tag_encode(25)
5421 encoding = "iso-8859-1"
5422 asn1_type_name = "GraphicString"
5425 class ISO646String(VisibleString):
5427 asn1_type_name = "ISO646String"
5430 class GeneralString(CommonString):
5432 tag_default = tag_encode(27)
5433 encoding = "iso-8859-1"
5434 asn1_type_name = "GeneralString"
5437 class UniversalString(CommonString):
5439 tag_default = tag_encode(28)
5440 encoding = "utf-32-be"
5441 asn1_type_name = "UniversalString"
5444 class BMPString(CommonString):
5446 tag_default = tag_encode(30)
5447 encoding = "utf-16-be"
5448 asn1_type_name = "BMPString"
5451 ChoiceState = namedtuple(
5453 BasicState._fields + ("specs", "value",),
5459 """``CHOICE`` special type
5463 class GeneralName(Choice):
5465 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5466 ("dNSName", IA5String(impl=tag_ctxp(2))),
5469 >>> gn = GeneralName()
5471 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5472 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5473 >>> gn["dNSName"] = IA5String("bar.baz")
5474 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5475 >>> gn["rfc822Name"]
5478 [2] IA5String IA5 bar.baz
5481 >>> gn.value == gn["dNSName"]
5484 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5486 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5487 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5489 __slots__ = ("specs",)
5491 asn1_type_name = "CHOICE"
5504 :param value: set the value. Either ``(choice, value)`` tuple, or
5505 :py:class:`pyderasn.Choice` object
5506 :param bytes impl: can not be set, do **not** use it
5507 :param bytes expl: override default tag with ``EXPLICIT`` one
5508 :param default: set default value. Type same as in ``value``
5509 :param bool optional: is object ``OPTIONAL`` in sequence
5511 if impl is not None:
5512 raise ValueError("no implicit tag allowed for CHOICE")
5513 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5515 schema = getattr(self, "schema", ())
5516 if len(schema) == 0:
5517 raise ValueError("schema must be specified")
5519 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5522 if value is not None:
5523 self._value = self._value_sanitize(value)
5524 if default is not None:
5525 default_value = self._value_sanitize(default)
5526 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5527 default_obj.specs = self.specs
5528 default_obj._value = default_value
5529 self.default = default_obj
5531 self._value = copy(default_obj._value)
5532 if self._expl is not None:
5533 tag_class, _, tag_num = tag_decode(self._expl)
5534 self._tag_order = (tag_class, tag_num)
5536 def _value_sanitize(self, value):
5537 if (value.__class__ == tuple) and len(value) == 2:
5539 spec = self.specs.get(choice)
5541 raise ObjUnknown(choice)
5542 if not isinstance(obj, spec.__class__):
5543 raise InvalidValueType((spec,))
5544 return (choice, spec(obj))
5545 if isinstance(value, self.__class__):
5547 raise InvalidValueType((self.__class__, tuple))
5551 return self._value is not None and self._value[1].ready
5555 return self.expl_lenindef or (
5556 (self._value is not None) and
5557 self._value[1].bered
5560 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from the state made by ``__getstate__``

    The parent class restores its own fields first; then the ``specs``
    and ``value`` fields of the state are applied.
    """
    super(Choice, self).__setstate__(state)
    for attr, restored in (("specs", state.specs), ("_value", state.value)):
        setattr(self, attr, restored)
5583 def __eq__(self, their):
5584 if (their.__class__ == tuple) and len(their) == 2:
5585 return self._value == their
5586 if not isinstance(their, self.__class__):
5589 self.specs == their.specs and
5590 self._value == their._value
5600 return self.__class__(
5603 expl=self._expl if expl is None else expl,
5604 default=self.default if default is None else default,
5605 optional=self.optional if optional is None else optional,
5610 """Name of the choice
5612 self._assert_ready()
5613 return self._value[0]
5617 """Value of underlying choice
5619 self._assert_ready()
5620 return self._value[1]
def tag_order(self):
    """Tag order of the object

    The own ``_tag_order`` is preferred when it is set; otherwise the
    ``tag_order`` of the chosen value is taken.
    """
    self._assert_ready()
    if self._tag_order is not None:
        return self._tag_order
    return self._value[1].tag_order
def tag_order_cer(self):
    """Smallest ``tag_order_cer`` among all objects held in ``specs``"""
    orders = (spec.tag_order_cer for spec in itervalues(self.specs))
    return min(orders)
5631 def __getitem__(self, key):
5632 if key not in self.specs:
5633 raise ObjUnknown(key)
5634 if self._value is None:
5636 choice, value = self._value
5641 def __setitem__(self, key, value):
5642 spec = self.specs.get(key)
5644 raise ObjUnknown(key)
5645 if not isinstance(value, spec.__class__):
5646 raise InvalidValueType((spec.__class__,))
5647 self._value = (key, spec(value))
5655 return self._value[1].decoded if self.ready else False
5658 self._assert_ready()
5659 return self._value[1].encode()
5661 def _encode1st(self, state):
5662 self._assert_ready()
5663 return self._value[1].encode1st(state)
5665 def _encode2nd(self, writer, state_iter):
5666 self._value[1].encode2nd(writer, state_iter)
5668 def _encode_cer(self, writer):
5669 self._assert_ready()
5670 self._value[1].encode_cer(writer)
5672 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5673 for choice, spec in iteritems(self.specs):
5674 sub_decode_path = decode_path + (choice,)
5680 decode_path=sub_decode_path,
5683 _ctx_immutable=False,
5690 klass=self.__class__,
5691 decode_path=decode_path,
5694 if tag_only: # pragma: no cover
5698 for _decode_path, value, tail in spec.decode_evgen(
5702 decode_path=sub_decode_path,
5704 _ctx_immutable=False,
5706 yield _decode_path, value, tail
5708 _, value, tail = next(spec.decode_evgen(
5712 decode_path=sub_decode_path,
5714 _ctx_immutable=False,
5717 obj = self.__class__(
5720 default=self.default,
5721 optional=self.optional,
5722 _decoded=(offset, 0, value.fulllen),
5724 obj._value = (choice, value)
5725 yield decode_path, obj, tail
5728 value = pp_console_row(next(self.pps()))
5730 value = "%s[%r]" % (value, self.value)
5733 def pps(self, decode_path=()):
5736 asn1_type_name=self.asn1_type_name,
5737 obj_name=self.__class__.__name__,
5738 decode_path=decode_path,
5739 value=self.choice if self.ready else None,
5740 optional=self.optional,
5741 default=self == self.default,
5742 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5743 expl=None if self._expl is None else tag_decode(self._expl),
5748 expl_lenindef=self.expl_lenindef,
5752 yield self.value.pps(decode_path=decode_path + (self.choice,))
5753 for pp in self.pps_lenindef(decode_path):
5757 class PrimitiveTypes(Choice):
5758 """Predefined ``CHOICE`` for all generic primitive types
5760 It could be useful for general decoding of some unspecified values:
5762 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5763 OCTET STRING 3 bytes 666f6f
5764 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5768 schema = tuple((klass.__name__, klass()) for klass in (
5792 AnyState = namedtuple(
5794 BasicState._fields + ("value", "defined"),
5800 """``ANY`` special type
5802 >>> Any(Integer(-123))
5803 ANY INTEGER -123 (0X:7B)
5804 >>> a = Any(OctetString(b"hello world").encode())
5805 ANY 040b68656c6c6f20776f726c64
5806 >>> hexenc(bytes(a))
5807 b'0x040x0bhello world'
5809 __slots__ = ("defined",)
5810 tag_default = tag_encode(0)
5811 asn1_type_name = "ANY"
5821 :param value: set the value. Either any kind of pyderasn's
5822 **ready** object, or bytes. Pay attention that
5823 **no** validation is performed if raw binary value
5824 is valid TLV, except just tag decoding
5825 :param bytes expl: override default tag with ``EXPLICIT`` one
5826 :param bool optional: is object ``OPTIONAL`` in sequence
5828 super(Any, self).__init__(None, expl, None, optional, _decoded)
5832 value = self._value_sanitize(value)
5834 if self._expl is None:
5835 if value.__class__ == binary_type:
5836 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5838 tag_class, tag_num = value.tag_order
5840 tag_class, _, tag_num = tag_decode(self._expl)
5841 self._tag_order = (tag_class, tag_num)
5844 def _value_sanitize(self, value):
5845 if value.__class__ == binary_type:
5847 raise ValueError("Any value can not be empty")
5849 if isinstance(value, self.__class__):
5851 if not isinstance(value, Obj):
5852 raise InvalidValueType((self.__class__, Obj, binary_type))
5857 return self._value is not None
5860 def tag_order(self):
5861 self._assert_ready()
5862 return self._tag_order
5866 if self.expl_lenindef or self.lenindef:
5868 if self.defined is None:
5870 return self.defined[1].bered
5872 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from a previously saved state

    The parent class restores its own attributes first; afterwards
    the ``value`` and ``defined`` fields of ``state`` are copied to
    ``self._value`` and ``self.defined``.

    :param state: state object carrying ``value`` and ``defined`` fields
    """
    super(Any, self).__setstate__(state)
    self._value, self.defined = state.value, state.defined
def __eq__(self, their):
    """Compare with raw bytes or with another ``Any``

    * bytes: compared with the held raw value, or with the encoding
      of the held object. An object that holds no value yet is never
      equal to bytes
    * ``Any`` (or its subclass): if both are ready, then their
      ``bytes()`` representations are compared, otherwise they are
      equal only if both are not ready
    * anything else is not equal
    """
    if their.__class__ == binary_type:
        value = self._value
        if value is None:
            # Not ready: there is nothing to encode, and calling
            # ``.encode()`` on None would raise AttributeError
            return False
        if value.__class__ == binary_type:
            return value == their
        return value.encode() == their
    if issubclass(their.__class__, Any):
        if self.ready and their.ready:
            return bytes(self) == bytes(their)
        return self.ready == their.ready
    return False
5912 return self.__class__(
5914 expl=self._expl if expl is None else expl,
5915 optional=self.optional if optional is None else optional,
def __bytes__(self):
    """Binary representation of the held value

    The object must be ready. A raw binary value is returned as is,
    any other held object is asked to ``encode()`` itself.
    """
    self._assert_ready()
    held = self._value
    return held if held.__class__ == binary_type else held.encode()
5930 self._assert_ready()
5932 if value.__class__ == binary_type:
5934 return value.encode()
def _encode1st(self, state):
    """First pass of the two-pass encoding

    The object must be ready. For a raw binary value its length and
    the untouched ``state`` are returned; otherwise the call is
    delegated to the held object's ``encode1st``.
    """
    self._assert_ready()
    held = self._value
    if held.__class__ != binary_type:
        return held.encode1st(state)
    return len(held), state
def _encode2nd(self, writer, state_iter):
    """Second pass of the two-pass encoding

    A raw binary value is written to ``writer`` as is; any other held
    object is asked to ``encode2nd`` itself.
    """
    held = self._value
    if held.__class__ == binary_type:
        write_full(writer, held)
        return
    held.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """Encode to ``writer`` in CER mode

    The object must be ready. A raw binary value is written as is;
    any other held object is asked to ``encode_cer`` itself.
    """
    self._assert_ready()
    held = self._value
    if held.__class__ == binary_type:
        write_full(writer, held)
        return
    held.encode_cer(writer)
5958 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5960 t, tlen, lv = tag_strip(tlv)
5961 except DecodeError as err:
5962 raise err.__class__(
5964 klass=self.__class__,
5965 decode_path=decode_path,
5969 l, llen, v = len_decode(lv)
5970 except LenIndefForm as err:
5971 if not ctx.get("bered", False):
5972 raise err.__class__(
5974 klass=self.__class__,
5975 decode_path=decode_path,
5978 llen, vlen, v = 1, 0, lv[1:]
5979 sub_offset = offset + tlen + llen
5981 while v[:EOC_LEN].tobytes() != EOC:
5982 chunk, v = Any().decode(
5985 decode_path=decode_path + (str(chunk_i),),
5988 _ctx_immutable=False,
5990 vlen += chunk.tlvlen
5991 sub_offset += chunk.tlvlen
5993 tlvlen = tlen + llen + vlen + EOC_LEN
5994 obj = self.__class__(
5995 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5997 optional=self.optional,
5998 _decoded=(offset, 0, tlvlen),
6001 obj.tag = t.tobytes()
6002 yield decode_path, obj, v[EOC_LEN:]
6004 except DecodeError as err:
6005 raise err.__class__(
6007 klass=self.__class__,
6008 decode_path=decode_path,
6012 raise NotEnoughData(
6013 "encoded length is longer than data",
6014 klass=self.__class__,
6015 decode_path=decode_path,
6018 tlvlen = tlen + llen + l
6019 v, tail = tlv[:tlvlen], v[l:]
6020 obj = self.__class__(
6021 value=None if evgen_mode else v.tobytes(),
6023 optional=self.optional,
6024 _decoded=(offset, 0, tlvlen),
6026 obj.tag = t.tobytes()
6027 yield decode_path, obj, tail
6030 return pp_console_row(next(self.pps()))
6032 def pps(self, decode_path=()):
6036 elif value.__class__ == binary_type:
6042 asn1_type_name=self.asn1_type_name,
6043 obj_name=self.__class__.__name__,
6044 decode_path=decode_path,
6046 blob=self._value if self._value.__class__ == binary_type else None,
6047 optional=self.optional,
6048 default=self == self.default,
6049 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6050 expl=None if self._expl is None else tag_decode(self._expl),
6055 expl_offset=self.expl_offset if self.expled else None,
6056 expl_tlen=self.expl_tlen if self.expled else None,
6057 expl_llen=self.expl_llen if self.expled else None,
6058 expl_vlen=self.expl_vlen if self.expled else None,
6059 expl_lenindef=self.expl_lenindef,
6060 lenindef=self.lenindef,
6063 defined_by, defined = self.defined or (None, None)
6064 if defined_by is not None:
6066 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6068 for pp in self.pps_lenindef(decode_path):
6072 ########################################################################
6073 # ASN.1 constructed types
6074 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can start with
                     ".." elements, each of them stripping the last
                     element from ``decode_path``. Empty tuple refers
                     to ``decode_path`` itself
    :returns: absolute decode path, tuple of strings

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ("foo", "bar", "baz", "whatever")
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ("foo", "whatever")
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ("baz", "whatever")
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ("foo",)
    """
    # Consume the leading special elements one by one. Checking for
    # emptiness on every step keeps paths consisting only of ".."
    # (or an empty relative path) from failing with IndexError.
    while rel_path:
        head = rel_path[0]
        if head == "/":
            return rel_path[1:]
        if head != "..":
            break
        decode_path, rel_path = decode_path[:-1], rel_path[1:]
    return decode_path + rel_path
6101 SequenceState = namedtuple(
6103 BasicState._fields + ("specs", "value",),
class SequenceEncode1stMixing(object):
    """Mixin with the first pass of two-pass encoding for containers

    Expects the host class to provide ``tag`` and
    ``_values_for_encoding()``.
    """

    def _encode1st(self, state):
        """Calculate full length and record the length of the content

        A slot is reserved in ``state`` before the nested values are
        processed, and it is filled with the summed length of the
        nested values afterwards.

        :param list state: accumulator of lengths, modified in place
        :returns: pair of the full length (tag, length octets and
                  content) and ``state``
        """
        slot = len(state)
        state.append(0)
        content_len = 0
        for item in self._values_for_encoding():
            item_len, _ = item.encode1st(state)
            content_len += item_len
        state[slot] = content_len
        return len(self.tag) + len_size(content_len) + content_len, state
6120 class Sequence(SequenceEncode1stMixing, Obj):
6121 """``SEQUENCE`` structure type
6123 You have to make specification of sequence::
6125 class Extension(Sequence):
6127 ("extnID", ObjectIdentifier()),
6128 ("critical", Boolean(default=False)),
6129 ("extnValue", OctetString()),
6132 Then, you can work with it as with dictionary.
6134 >>> ext = Extension()
6135 >>> Extension().specs
6137 ('extnID', OBJECT IDENTIFIER),
6138 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6139 ('extnValue', OCTET STRING),
6141 >>> ext["extnID"] = "1.2.3"
6142 Traceback (most recent call last):
6143 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6144 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6146 You can determine if sequence is ready to be encoded:
6151 Traceback (most recent call last):
6152 pyderasn.ObjNotReady: object is not ready: extnValue
6153 >>> ext["extnValue"] = OctetString(b"foobar")
6157 Value you want to assign, must have the same **type** as in
6158 corresponding specification, but it can have different tags,
6159 optional/default attributes -- they will be taken from specification
6162 class TBSCertificate(Sequence):
6164 ("version", Version(expl=tag_ctxc(0), default="v1")),
6167 >>> tbs = TBSCertificate()
6168 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6170 Assign ``None`` to remove value from sequence.
6172 You can set values in Sequence during its initialization:
6174 >>> AlgorithmIdentifier((
6175 ("algorithm", ObjectIdentifier("1.2.3")),
6176 ("parameters", Any(Null()))
6178 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6180 You can determine if value exists/set in the sequence and take its value:
6182 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6185 OBJECT IDENTIFIER 1.2.3
6187 But pay attention that if value has default, then it won't be (not
6188 in) in the sequence (because ``DEFAULT`` must not be encoded in
6189 DER), but you can read its value:
6191 >>> "critical" in ext, ext["critical"]
6192 (False, BOOLEAN False)
6193 >>> ext["critical"] = Boolean(True)
6194 >>> "critical" in ext, ext["critical"]
6195 (True, BOOLEAN True)
6197 All defaulted values are always optional.
6199 .. _allow_default_values_ctx:
6201 DER prohibits default value encoding and will raise an error if
6202 default value is unexpectedly met during decode.
6203 If :ref:`bered <bered_ctx>` context option is set, then no error
6204 will be raised, but ``bered`` attribute set. You can disable strict
6205 defaulted values existence validation by setting
6206 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6208 All values with DEFAULT specified are decoded atomically in
6209 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6210 SEQUENCE, then it will be yielded as a single element, not
6211 disassembled. That is required for DEFAULT existence check.
6213 Two sequences are equal if they have equal specification (schema),
6214 implicit/explicit tagging and the same values.
6216 __slots__ = ("specs",)
6217 tag_default = tag_encode(form=TagFormConstructed, num=16)
6218 asn1_type_name = "SEQUENCE"
6230 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6232 schema = getattr(self, "schema", ())
6234 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6237 if value is not None:
6238 if issubclass(value.__class__, Sequence):
6239 self._value = value._value
6240 elif hasattr(value, "__iter__"):
6241 for seq_key, seq_value in value:
6242 self[seq_key] = seq_value
6244 raise InvalidValueType((Sequence,))
6245 if default is not None:
6246 if not issubclass(default.__class__, Sequence):
6247 raise InvalidValueType((Sequence,))
6248 default_value = default._value
6249 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6250 default_obj.specs = self.specs
6251 default_obj._value = default_value
6252 self.default = default_obj
6254 self._value = copy(default_obj._value)
6258 for name, spec in iteritems(self.specs):
6259 value = self._value.get(name)
6270 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6272 return any(value.bered for value in itervalues(self._value))
6274 def __getstate__(self):
6275 return SequenceState(
6289 {k: copy(v) for k, v in iteritems(self._value)},
def __setstate__(self, state):
    """Restore the object from a previously saved state

    The parent class restores its own attributes first; afterwards
    the ``specs`` and ``value`` fields of ``state`` are copied to
    ``self.specs`` and ``self._value``.

    :param state: state object carrying ``specs`` and ``value`` fields
    """
    super(Sequence, self).__setstate__(state)
    self.specs, self._value = state.specs, state.value
def __eq__(self, their):
    """Compare with another sequence

    Only instances of the same class can be equal. They are equal
    when specifications, tag, explicit tag and values are all equal.
    """
    if isinstance(their, self.__class__):
        return (
            self.specs == their.specs
            and self.tag == their.tag
            and self._expl == their._expl
            and self._value == their._value
        )
    return False
6315 return self.__class__(
6318 impl=self.tag if impl is None else impl,
6319 expl=self._expl if expl is None else expl,
6320 default=self.default if default is None else default,
6321 optional=self.optional if optional is None else optional,
6324 def __contains__(self, key):
6325 return key in self._value
def __setitem__(self, key, value):
    """Assign ``value`` to the field ``key``

    ``None`` removes the field. Otherwise ``value`` is passed through
    the field's specification. A result equal to the specification's
    default is not stored: the field is removed instead.

    :raises ObjUnknown: if there is no specification named ``key``
    :raises InvalidValueType: if ``value`` is not an instance of the
                              specification's class
    """
    field_spec = self.specs.get(key)
    if field_spec is None:
        raise ObjUnknown(key)
    if value is None:
        self._value.pop(key, None)
        return
    expected_class = field_spec.__class__
    if not isinstance(value, expected_class):
        raise InvalidValueType((expected_class,))
    bound = field_spec(value=value)
    is_default = field_spec.default is not None and bound == field_spec.default
    if is_default:
        self._value.pop(key, None)
    else:
        self._value[key] = bound
def __getitem__(self, key):
    """Get the value of the field ``key``

    :returns: the stored value if there is one, otherwise the default
              of the field's specification (that is ``None`` when the
              specification has no default)
    :raises ObjUnknown: if nothing is stored and there is no
                        specification named ``key``
    """
    stored = self._value.get(key)
    if stored is not None:
        return stored
    field_spec = self.specs.get(key)
    if field_spec is None:
        raise ObjUnknown(key)
    # Either the specification's default, or None if it has none
    return field_spec.default
def _values_for_encoding(self):
    """Iterate over the stored values in the order of ``specs``

    Fields without a stored value are skipped when their
    specification is optional.

    :raises ObjNotReady: if a non-optional field has no stored value
    """
    for name, field_spec in iteritems(self.specs):
        stored = self._value.get(name)
        if stored is not None:
            yield stored
        elif not field_spec.optional:
            raise ObjNotReady(name)
6363 v = b"".join(v.encode() for v in self._values_for_encoding())
6364 return b"".join((self.tag, len_encode(len(v)), v))
def _encode2nd(self, writer, state_iter):
    """Second pass of the two-pass encoding

    Writes the tag followed by the encoded length taken from
    ``state_iter``, then lets every value from
    ``_values_for_encoding()`` write itself.
    """
    content_len = next(state_iter)
    header = self.tag + len_encode(content_len)
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """Encode to ``writer`` in CER mode

    Writes the tag with ``LENINDEF``, then every value from
    ``_values_for_encoding()``, and ``EOC`` at the end.
    """
    header = self.tag + LENINDEF
    write_full(writer, header)
    for item in self._values_for_encoding():
        item.encode_cer(writer)
    write_full(writer, EOC)
6377 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6379 t, tlen, lv = tag_strip(tlv)
6380 except DecodeError as err:
6381 raise err.__class__(
6383 klass=self.__class__,
6384 decode_path=decode_path,
6389 klass=self.__class__,
6390 decode_path=decode_path,
6393 if tag_only: # pragma: no cover
6397 ctx_bered = ctx.get("bered", False)
6399 l, llen, v = len_decode(lv)
6400 except LenIndefForm as err:
6402 raise err.__class__(
6404 klass=self.__class__,
6405 decode_path=decode_path,
6408 l, llen, v = 0, 1, lv[1:]
6410 except DecodeError as err:
6411 raise err.__class__(
6413 klass=self.__class__,
6414 decode_path=decode_path,
6418 raise NotEnoughData(
6419 "encoded length is longer than data",
6420 klass=self.__class__,
6421 decode_path=decode_path,
6425 v, tail = v[:l], v[l:]
6427 sub_offset = offset + tlen + llen
6430 ctx_allow_default_values = ctx.get("allow_default_values", False)
6431 for name, spec in iteritems(self.specs):
6432 if spec.optional and (
6433 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6437 spec_defaulted = spec.default is not None
6438 sub_decode_path = decode_path + (name,)
6440 if evgen_mode and not spec_defaulted:
6441 for _decode_path, value, v_tail in spec.decode_evgen(
6445 decode_path=sub_decode_path,
6447 _ctx_immutable=False,
6449 yield _decode_path, value, v_tail
6451 _, value, v_tail = next(spec.decode_evgen(
6455 decode_path=sub_decode_path,
6457 _ctx_immutable=False,
6460 except TagMismatch as err:
6461 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6465 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6466 if not evgen_mode and defined is not None:
6467 defined_by, defined_spec = defined
6468 if issubclass(value.__class__, SequenceOf):
6469 for i, _value in enumerate(value):
6470 sub_sub_decode_path = sub_decode_path + (
6472 DecodePathDefBy(defined_by),
6474 defined_value, defined_tail = defined_spec.decode(
6475 memoryview(bytes(_value)),
6477 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6478 if value.expled else (value.tlen + value.llen)
6481 decode_path=sub_sub_decode_path,
6483 _ctx_immutable=False,
6485 if len(defined_tail) > 0:
6488 klass=self.__class__,
6489 decode_path=sub_sub_decode_path,
6492 _value.defined = (defined_by, defined_value)
6494 defined_value, defined_tail = defined_spec.decode(
6495 memoryview(bytes(value)),
6497 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6498 if value.expled else (value.tlen + value.llen)
6501 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6503 _ctx_immutable=False,
6505 if len(defined_tail) > 0:
6508 klass=self.__class__,
6509 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6512 value.defined = (defined_by, defined_value)
6514 value_len = value.fulllen
6516 sub_offset += value_len
6520 yield sub_decode_path, value, v_tail
6521 if value == spec.default:
6522 if ctx_bered or ctx_allow_default_values:
6526 "DEFAULT value met",
6527 klass=self.__class__,
6528 decode_path=sub_decode_path,
6532 values[name] = value
6533 spec_defines = getattr(spec, "defines", ())
6534 if len(spec_defines) == 0:
6535 defines_by_path = ctx.get("defines_by_path", ())
6536 if len(defines_by_path) > 0:
6537 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6538 if spec_defines is not None and len(spec_defines) > 0:
6539 for rel_path, schema in spec_defines:
6540 defined = schema.get(value, None)
6541 if defined is not None:
6542 ctx.setdefault("_defines", []).append((
6543 abs_decode_path(sub_decode_path[:-1], rel_path),
6547 if v[:EOC_LEN].tobytes() != EOC:
6550 klass=self.__class__,
6551 decode_path=decode_path,
6559 klass=self.__class__,
6560 decode_path=decode_path,
6563 obj = self.__class__(
6567 default=self.default,
6568 optional=self.optional,
6569 _decoded=(offset, llen, vlen),
6572 obj.lenindef = lenindef
6573 obj.ber_encoded = ber_encoded
6574 yield decode_path, obj, tail
6577 value = pp_console_row(next(self.pps()))
6579 for name in self.specs:
6580 _value = self._value.get(name)
6583 cols.append("%s: %s" % (name, repr(_value)))
6584 return "%s[%s]" % (value, "; ".join(cols))
6586 def pps(self, decode_path=()):
6589 asn1_type_name=self.asn1_type_name,
6590 obj_name=self.__class__.__name__,
6591 decode_path=decode_path,
6592 optional=self.optional,
6593 default=self == self.default,
6594 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6595 expl=None if self._expl is None else tag_decode(self._expl),
6600 expl_offset=self.expl_offset if self.expled else None,
6601 expl_tlen=self.expl_tlen if self.expled else None,
6602 expl_llen=self.expl_llen if self.expled else None,
6603 expl_vlen=self.expl_vlen if self.expled else None,
6604 expl_lenindef=self.expl_lenindef,
6605 lenindef=self.lenindef,
6606 ber_encoded=self.ber_encoded,
6609 for name in self.specs:
6610 value = self._value.get(name)
6613 yield value.pps(decode_path=decode_path + (name,))
6614 for pp in self.pps_lenindef(decode_path):
6618 class Set(Sequence, SequenceEncode1stMixing):
6619 """``SET`` structure type
6621 Its usage is identical to :py:class:`pyderasn.Sequence`.
6623 .. _allow_unordered_set_ctx:
6625 DER prohibits unordered values encoding and will raise an error
6626 during decode. If :ref:`bered <bered_ctx>` context option is set,
6627 then no error will occur. Also you can disable strict values
6628 ordering check by setting ``"allow_unordered_set": True``
6629 :ref:`context <ctx>` option.
6632 tag_default = tag_encode(form=TagFormConstructed, num=17)
6633 asn1_type_name = "SET"
def _values_for_encoding(self):
    """Values of the parent's ``_values_for_encoding()`` sorted by ``tag_order``

    :returns: list of values
    """
    ordered = list(super(Set, self)._values_for_encoding())
    ordered.sort(key=attrgetter("tag_order"))
    return ordered
def _encode_cer(self, writer):
    """Encode to ``writer`` in CER mode

    Writes the tag with ``LENINDEF``, then the values of the parent's
    ``_values_for_encoding()`` sorted by their ``tag_order_cer``, and
    ``EOC`` at the end.
    """
    write_full(writer, self.tag + LENINDEF)
    members = sorted(
        super(Set, self)._values_for_encoding(),
        key=attrgetter("tag_order_cer"),
    )
    for member in members:
        member.encode_cer(writer)
    write_full(writer, EOC)
6650 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6652 t, tlen, lv = tag_strip(tlv)
6653 except DecodeError as err:
6654 raise err.__class__(
6656 klass=self.__class__,
6657 decode_path=decode_path,
6662 klass=self.__class__,
6663 decode_path=decode_path,
6670 ctx_bered = ctx.get("bered", False)
6672 l, llen, v = len_decode(lv)
6673 except LenIndefForm as err:
6675 raise err.__class__(
6677 klass=self.__class__,
6678 decode_path=decode_path,
6681 l, llen, v = 0, 1, lv[1:]
6683 except DecodeError as err:
6684 raise err.__class__(
6686 klass=self.__class__,
6687 decode_path=decode_path,
6691 raise NotEnoughData(
6692 "encoded length is longer than data",
6693 klass=self.__class__,
6697 v, tail = v[:l], v[l:]
6699 sub_offset = offset + tlen + llen
6702 ctx_allow_default_values = ctx.get("allow_default_values", False)
6703 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6704 tag_order_prev = (0, 0)
6705 _specs_items = copy(self.specs)
6708 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6710 for name, spec in iteritems(_specs_items):
6711 sub_decode_path = decode_path + (name,)
6717 decode_path=sub_decode_path,
6720 _ctx_immutable=False,
6727 klass=self.__class__,
6728 decode_path=decode_path,
6731 spec_defaulted = spec.default is not None
6732 if evgen_mode and not spec_defaulted:
6733 for _decode_path, value, v_tail in spec.decode_evgen(
6737 decode_path=sub_decode_path,
6739 _ctx_immutable=False,
6741 yield _decode_path, value, v_tail
6743 _, value, v_tail = next(spec.decode_evgen(
6747 decode_path=sub_decode_path,
6749 _ctx_immutable=False,
6752 value_tag_order = value.tag_order
6753 value_len = value.fulllen
6754 if tag_order_prev >= value_tag_order:
6755 if ctx_bered or ctx_allow_unordered_set:
6759 "unordered " + self.asn1_type_name,
6760 klass=self.__class__,
6761 decode_path=sub_decode_path,
6766 yield sub_decode_path, value, v_tail
6767 if value != spec.default:
6769 elif ctx_bered or ctx_allow_default_values:
6773 "DEFAULT value met",
6774 klass=self.__class__,
6775 decode_path=sub_decode_path,
6778 values[name] = value
6779 del _specs_items[name]
6780 tag_order_prev = value_tag_order
6781 sub_offset += value_len
6785 obj = self.__class__(
6789 default=self.default,
6790 optional=self.optional,
6791 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6794 if v[:EOC_LEN].tobytes() != EOC:
6797 klass=self.__class__,
6798 decode_path=decode_path,
6803 for name, spec in iteritems(self.specs):
6804 if name not in values and not spec.optional:
6806 "%s value is not ready" % name,
6807 klass=self.__class__,
6808 decode_path=decode_path,
6813 obj.ber_encoded = ber_encoded
6814 yield decode_path, obj, tail
6817 SequenceOfState = namedtuple(
6819 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6824 class SequenceOf(SequenceEncode1stMixing, Obj):
6825 """``SEQUENCE OF`` sequence type
6827 For that kind of type you must specify the object it will carry on
6828 (bounds are for example here, not required)::
6830 class Ints(SequenceOf):
6835 >>> ints.append(Integer(123))
6836 >>> ints.append(Integer(234))
6838 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6839 >>> [int(i) for i in ints]
6841 >>> ints.append(Integer(345))
6842 Traceback (most recent call last):
6843 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6846 >>> ints[1] = Integer(345)
6848 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6850 You can initialize sequence with preinitialized values:
6852 >>> ints = Ints([Integer(123), Integer(234)])
6854 Also you can use iterator as a value:
6856 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6858 And it won't be iterated until encoding process. Pay attention that
6859 bounds and required schema checks are done only during the encoding
6860 process in that case! After encode was called, then value is zeroed
6861 back to empty list and you have to set it again. That mode is useful
6862 mainly with CER encoding mode, where all objects from the iterable
6863 will be streamed to the buffer, without copying all of them to
6866 __slots__ = ("spec", "_bound_min", "_bound_max")
6867 tag_default = tag_encode(form=TagFormConstructed, num=16)
6868 asn1_type_name = "SEQUENCE OF"
6881 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6883 schema = getattr(self, "schema", None)
6885 raise ValueError("schema must be specified")
6887 self._bound_min, self._bound_max = getattr(
6891 ) if bounds is None else bounds
6893 if value is not None:
6894 self._value = self._value_sanitize(value)
6895 if default is not None:
6896 default_value = self._value_sanitize(default)
6897 default_obj = self.__class__(
6902 default_obj._value = default_value
6903 self.default = default_obj
6905 self._value = copy(default_obj._value)
6907 def _value_sanitize(self, value):
6909 if issubclass(value.__class__, SequenceOf):
6910 value = value._value
6911 elif hasattr(value, NEXT_ATTR_NAME):
6913 elif hasattr(value, "__iter__"):
6916 raise InvalidValueType((self.__class__, iter, "iterator"))
6918 if not self._bound_min <= len(value) <= self._bound_max:
6919 raise BoundsError(self._bound_min, len(value), self._bound_max)
6920 class_expected = self.spec.__class__
6922 if not isinstance(v, class_expected):
6923 raise InvalidValueType((class_expected,))
6928 if hasattr(self._value, NEXT_ATTR_NAME):
6930 if self._bound_min > 0 and len(self._value) == 0:
6932 return all(v.ready for v in self._value)
6936 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6938 return any(v.bered for v in self._value)
6940 def __getstate__(self):
6941 if hasattr(self._value, NEXT_ATTR_NAME):
6942 raise ValueError("can not pickle SequenceOf with iterator")
6943 return SequenceOfState(
6957 [copy(v) for v in self._value],
def __setstate__(self, state):
    """Restore the object from a previously saved state

    The parent class restores its own attributes first; afterwards
    the ``spec``, ``value``, ``bound_min`` and ``bound_max`` fields
    of ``state`` are copied to the corresponding attributes.

    :param state: state object carrying the fields listed above
    """
    super(SequenceOf, self).__setstate__(state)
    self.spec, self._value = state.spec, state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
def __eq__(self, their):
    """Compare with another object of that class or with an iterable

    Instances of the same class are equal when specification, tag,
    explicit tag and values are all equal. Any other iterable is
    converted to a list and compared with the held value. Anything
    else is not equal.
    """
    if not isinstance(their, self.__class__):
        if hasattr(their, "__iter__"):
            return self._value == list(their)
        return False
    return (
        self.spec == their.spec
        and self.tag == their.tag
        and self._expl == their._expl
        and self._value == their._value
    )
6990 return self.__class__(
6994 (self._bound_min, self._bound_max)
6995 if bounds is None else bounds
6997 impl=self.tag if impl is None else impl,
6998 expl=self._expl if expl is None else expl,
6999 default=self.default if default is None else default,
7000 optional=self.optional if optional is None else optional,
7003 def __contains__(self, key):
7004 return key in self._value
def append(self, value):
    """Append ``value`` to the held values

    :raises InvalidValueType: if ``value`` is not an instance of the
                              specification's class
    :raises BoundsError: if the new length would exceed the upper bound
    """
    expected_class = self.spec.__class__
    if not isinstance(value, expected_class):
        raise InvalidValueType((expected_class,))
    new_len = len(self._value) + 1
    if new_len > self._bound_max:
        raise BoundsError(self._bound_min, new_len, self._bound_max)
    self._value.append(value)
7018 return iter(self._value)
7021 return len(self._value)
def __setitem__(self, key, value):
    """Replace the element at ``key``

    ``value`` is passed through the specification before being stored.

    :raises InvalidValueType: if ``value`` is not an instance of the
                              specification's class
    """
    item_spec = self.spec
    expected_class = item_spec.__class__
    if not isinstance(value, expected_class):
        raise InvalidValueType((expected_class,))
    self._value[key] = item_spec(value=value)
7028 def __getitem__(self, key):
7029 return self._value[key]
7031 def _values_for_encoding(self):
7032 return iter(self._value)
7035 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7038 values_append = values.append
7039 class_expected = self.spec.__class__
7040 values_for_encoding = self._values_for_encoding()
7042 for v in values_for_encoding:
7043 if not isinstance(v, class_expected):
7044 raise InvalidValueType((class_expected,))
7045 values_append(v.encode())
7046 if not self._bound_min <= len(values) <= self._bound_max:
7047 raise BoundsError(self._bound_min, len(values), self._bound_max)
7048 value = b"".join(values)
7050 value = b"".join(v.encode() for v in self._values_for_encoding())
7051 return b"".join((self.tag, len_encode(len(value)), value))
def _encode1st(self, state):
    """First pass of the two-pass encoding

    Delegates to the parent implementation and returns its result
    unchanged. If the held value is an iterator (it has the
    ``NEXT_ATTR_NAME`` attribute), it is replaced with an empty list
    afterwards.
    """
    result = super(SequenceOf, self)._encode1st(state)
    if hasattr(self._value, NEXT_ATTR_NAME):
        self._value = []
    return result
7059 def _encode2nd(self, writer, state_iter):
7060 write_full(writer, self.tag + len_encode(next(state_iter)))
7061 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7064 class_expected = self.spec.__class__
7065 values_for_encoding = self._values_for_encoding()
7067 for v in values_for_encoding:
7068 if not isinstance(v, class_expected):
7069 raise InvalidValueType((class_expected,))
7070 v.encode2nd(writer, state_iter)
7072 if not self._bound_min <= values_count <= self._bound_max:
7073 raise BoundsError(self._bound_min, values_count, self._bound_max)
7075 for v in self._values_for_encoding():
7076 v.encode2nd(writer, state_iter)
7078 def _encode_cer(self, writer):
7079 write_full(writer, self.tag + LENINDEF)
7080 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7082 class_expected = self.spec.__class__
7084 values_for_encoding = self._values_for_encoding()
7086 for v in values_for_encoding:
7087 if not isinstance(v, class_expected):
7088 raise InvalidValueType((class_expected,))
7089 v.encode_cer(writer)
7091 if not self._bound_min <= values_count <= self._bound_max:
7092 raise BoundsError(self._bound_min, values_count, self._bound_max)
7094 for v in self._values_for_encoding():
7095 v.encode_cer(writer)
7096 write_full(writer, EOC)
def _decode(
        self,
        tlv,
        offset,
        decode_path,
        ctx,
        tag_only,
        evgen_mode,
        ordering_check=False,
):
    """Decode a TLV holding a run of same-schema elements (generator)

    :param tlv: buffer positioned at the tag; its slices must support
                ``.tobytes()``
    :param int offset: position of ``tlv``, reported in raised errors
                       and stored in the decoded object
    :param tuple decode_path: path of the object being decoded
    :param dict ctx: decoder context; the ``bered`` and
                     ``allow_unordered_set`` keys are read here
    :param bool tag_only: only check the tag, yield ``None`` and stop
    :param bool evgen_mode: yield every event produced by the elements'
                            decoder and do not accumulate the elements
    :param bool ordering_check: require each element's encoding to be
                                not less than the previous one
    :returns: generator whose last item is ``(decode_path, obj, tail)``
    :raises DecodeError: (or the subclass raised by the helpers) on
                         malformed tag/length, missing EOC, violated
                         bounds or, when it is checked and not allowed,
                         violated ordering
    """
    # NOTE(review): the reviewed extract had no "def" header and lacked
    # many body lines; they are restored here so that the method is
    # well-formed again -- confirm against the upstream PyDERASN source.
    try:
        t, tlen, lv = tag_strip(tlv)
    except DecodeError as err:
        # Re-raise the same error class, enriched with our position
        raise err.__class__(
            msg=err.msg,
            klass=self.__class__,
            decode_path=decode_path,
            offset=offset,
        )
    if t != self.tag:
        raise TagMismatch(
            klass=self.__class__,
            decode_path=decode_path,
            offset=offset,
        )
    if tag_only:
        yield None
        return
    lenindef = False
    ctx_bered = ctx.get("bered", False)
    try:
        l, llen, v = len_decode(lv)
    except LenIndefForm as err:
        # Indefinite length form is tolerated only in BER mode
        if not ctx_bered:
            raise err.__class__(
                msg=err.msg,
                klass=self.__class__,
                decode_path=decode_path,
                offset=offset,
            )
        l, llen, v = 0, 1, lv[1:]
        lenindef = True
    except DecodeError as err:
        raise err.__class__(
            msg=err.msg,
            klass=self.__class__,
            decode_path=decode_path,
            offset=offset,
        )
    if l > len(v):
        raise NotEnoughData(
            "encoded length is longer than data",
            klass=self.__class__,
            decode_path=decode_path,
            offset=offset,
        )
    if not lenindef:
        v, tail = v[:l], v[l:]
    vlen = 0
    sub_offset = offset + tlen + llen
    _value = []
    _value_count = 0
    ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
    # Empty view: compares as not greater than any first element
    value_prev = memoryview(v[:0])
    ber_encoded = False
    spec = self.spec
    while len(v) > 0:
        if lenindef and v[:EOC_LEN].tobytes() == EOC:
            break
        sub_decode_path = decode_path + (str(_value_count),)
        if evgen_mode:
            # Forward every event of the element; the last one leaves
            # the element itself in "value"
            for _decode_path, value, v_tail in spec.decode_evgen(
                    v,
                    sub_offset,
                    leavemm=True,
                    decode_path=sub_decode_path,
                    ctx=ctx,
                    _ctx_immutable=False,
            ):
                yield _decode_path, value, v_tail
        else:
            _, value, v_tail = next(spec.decode_evgen(
                v,
                sub_offset,
                leavemm=True,
                decode_path=sub_decode_path,
                ctx=ctx,
                _ctx_immutable=False,
                _evgen_mode=False,
            ))
        value_len = value.fulllen
        if ordering_check:
            if value_prev.tobytes() > v[:value_len].tobytes():
                if ctx_bered or ctx_allow_unordered_set:
                    # Tolerated, but remembered as non-DER encoding
                    ber_encoded = True
                else:
                    raise DecodeError(
                        "unordered " + self.asn1_type_name,
                        klass=self.__class__,
                        decode_path=sub_decode_path,
                        offset=sub_offset,
                    )
            value_prev = v[:value_len]
        _value_count += 1
        if not evgen_mode:
            _value.append(value)
        sub_offset += value_len
        vlen += value_len
        v = v_tail
    # In evgen mode no values are passed to the constructor below, so
    # bounds are checked against the counter here
    if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
        raise DecodeError(
            msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
            klass=self.__class__,
            decode_path=decode_path,
            offset=offset,
        )
    try:
        obj = self.__class__(
            value=None if evgen_mode else _value,
            schema=spec,
            bounds=(self._bound_min, self._bound_max),
            impl=self.tag,
            expl=self._expl,
            default=self.default,
            optional=self.optional,
            _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
        )
    except BoundsError as err:
        raise DecodeError(
            msg=str(err),
            klass=self.__class__,
            decode_path=decode_path,
            offset=offset,
        )
    if lenindef:
        if v[:EOC_LEN].tobytes() != EOC:
            raise DecodeError(
                "no EOC",
                klass=self.__class__,
                decode_path=decode_path,
                offset=offset,
            )
        obj.lenindef = True
        tail = v[EOC_LEN:]
    obj.ber_encoded = ber_encoded
    yield decode_path, obj, tail
def __repr__(self):
    """Return the object's console row followed by its elements' reprs"""
    # NOTE(review): only the two format arguments were present in the
    # reviewed extract; the header and the "%s[%s]" format string are
    # restored -- confirm against the upstream PyDERASN source.
    return "%s[%s]" % (
        pp_console_row(next(self.pps())),
        ", ".join(repr(v) for v in self._value),
    )
def pps(self, decode_path=()):
    """Yield pretty-printing records for the object and its elements

    The first item is the object's own ``_pp`` record. Then, for every
    element, the generator returned by its ``pps()`` is yielded as is
    (it is not flattened here), the element's index being appended to
    the decode path. At the end everything ``pps_lenindef()`` produces
    is yielded.

    :param tuple decode_path: decode path of the object itself
    """
    # NOTE(review): the reviewed extract lacked "yield _pp(", several
    # keyword arguments, the closing bracket and the final "yield pp";
    # they are restored -- confirm against the upstream PyDERASN source.
    yield _pp(
        obj=self,
        asn1_type_name=self.asn1_type_name,
        obj_name=self.__class__.__name__,
        decode_path=decode_path,
        optional=self.optional,
        default=self == self.default,
        # Tags are shown only when they differ from the defaults
        impl=None if self.tag == self.tag_default else tag_decode(self.tag),
        expl=None if self._expl is None else tag_decode(self._expl),
        offset=self.offset,
        tlen=self.tlen,
        llen=self.llen,
        vlen=self.vlen,
        expl_offset=self.expl_offset if self.expled else None,
        expl_tlen=self.expl_tlen if self.expled else None,
        expl_llen=self.expl_llen if self.expled else None,
        expl_vlen=self.expl_vlen if self.expled else None,
        expl_lenindef=self.expl_lenindef,
        lenindef=self.lenindef,
        ber_encoded=self.ber_encoded,
        bered=self.bered,
    )
    for i, value in enumerate(self._value):
        yield value.pps(decode_path=decode_path + (str(i),))
    for pp in self.pps_lenindef(decode_path):
        yield pp
class SetOf(SequenceOf):
    """``SET OF`` sequence type

    Its usage is identical to :py:class:`pyderasn.SequenceOf`.

    The differences visible here: iterator values are rejected, all
    encoders emit the elements' encodings in sorted order, and decoding
    is performed with the ordering check turned on.
    """
    # NOTE(review): the reviewed extract lacked several lines of this
    # class (docstring terminator, __slots__, "raise"/"return"
    # statements, the _encode header, buffer handling in _encode2nd,
    # positional arguments of the super() call); they are restored --
    # confirm against the upstream PyDERASN source.
    __slots__ = ()
    tag_default = tag_encode(form=TagFormConstructed, num=17)
    asn1_type_name = "SET OF"

    def _value_sanitize(self, value):
        """Sanitize as the parent does, but refuse iterator values

        :raises ValueError: if the sanitized value is an iterator
        """
        value = super(SetOf, self)._value_sanitize(value)
        if hasattr(value, NEXT_ATTR_NAME):
            raise ValueError(
                "SetOf does not support iterator values, as no sense in them"
            )
        return value

    def _encode(self):
        """Return tag, length and the sorted elements' encodings"""
        v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
        return b"".join((self.tag, len_encode(len(v)), v))

    def _encode2nd(self, writer, state_iter):
        """Write tag and length, then the sorted elements' encodings

        The length is taken from ``state_iter``. Every element is
        encoded to a temporary buffer first, because the encodings have
        to be sorted before anything can be written.
        """
        write_full(writer, self.tag + len_encode(next(state_iter)))
        values = []
        for v in self._values_for_encoding():
            buf = BytesIO()
            v.encode2nd(buf.write, state_iter)
            values.append(buf.getvalue())
        values.sort()
        for v in values:
            write_full(writer, v)

    def _encode_cer(self, writer):
        """Write indefinite-length form with sorted CER-encoded elements"""
        write_full(writer, self.tag + LENINDEF)
        for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
            write_full(writer, v)
        write_full(writer, EOC)

    def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
        """Decode as the parent does, with ``ordering_check`` enabled"""
        return super(SetOf, self)._decode(
            tlv,
            offset,
            decode_path,
            ctx,
            tag_only,
            evgen_mode,
            ordering_check=True,
        )
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    Modules must be separated from classes/functions with ``:``.

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>

    :param str pypath: ``module.path:name[.name...]``; the part after
                       the last ``:`` is resolved attribute by attribute
    :returns: the object the path points to
    :raises ValueError: if there is no ``:`` separator in ``pypath``
    """
    # Imported locally: it is needed only by the command line utility
    from importlib import import_module
    mod, sep, objs = pypath.rpartition(":")
    if sep == "":
        # Same exception class as failed unpacking would give, but
        # with a message telling what is actually wrong
        raise ValueError("no \":\" separator in Python path: %r" % (pypath,))
    obj = import_module(mod)
    for obj_name in objs.split("."):
        obj = getattr(obj, obj_name)
    return obj
def generic_decoder():  # pragma: no cover
    """Build schema-less decoder and a pretty printer suited for it

    :returns: tuple of a ``SEQUENCEOF`` instance, whose schema is a
              self-referencing choice, and the ``pprint_any`` function
    """
    # NOTE(review): the reviewed extract lacked a number of lines of
    # this function (keyword arguments, the SEQUENCEOF body, the
    # pprint_any header, loop/continue/yield statements); they are
    # restored -- confirm against the upstream PyDERASN source.

    # All of this below is a big hack with self references
    choice = PrimitiveTypes()
    choice.specs["SequenceOf"] = SequenceOf(schema=choice)
    choice.specs["SetOf"] = SetOf(schema=choice)
    for i in six_xrange(31):
        choice.specs["SequenceOf%d" % i] = SequenceOf(
            schema=choice,
            expl=tag_ctxc(i),
        )
    choice.specs["Any"] = Any()

    # Class name equals to type name, to omit it from output
    class SEQUENCEOF(SequenceOf):
        __slots__ = ()
        schema = choice

    def pprint_any(
            obj,
            oid_maps=(),
            with_colours=False,
            with_decode_path=False,
            decode_path_only=(),
            decode_path=(),
    ):
        """Return multi-line string with pretty printed ``obj``"""
        def _pprint_pps(pps):
            for pp in pps:
                if hasattr(pp, "_fields"):
                    # That is a record: filter it and render its rows
                    if (
                            decode_path_only != () and
                            pp.decode_path[:len(decode_path_only)] != decode_path_only
                    ):
                        continue
                    if pp.asn1_type_name == Choice.asn1_type_name:
                        continue
                    pp_kwargs = pp._asdict()
                    pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
                    pp = _pp(**pp_kwargs)
                    yield pp_console_row(
                        pp,
                        oid_maps=oid_maps,
                        with_offsets=True,
                        with_blob=False,
                        with_colours=with_colours,
                        with_decode_path=with_decode_path,
                        decode_path_len_decrease=len(decode_path_only),
                    )
                    for row in pp_console_blob(
                            pp,
                            decode_path_len_decrease=len(decode_path_only),
                    ):
                        yield row
                else:
                    # Otherwise it is a nested iterable of records
                    for row in _pprint_pps(pp):
                        yield row
        return "\n".join(_pprint_pps(obj.pps(decode_path)))
    return SEQUENCEOF(), pprint_any
def main():  # pragma: no cover
    """Command line entry point: decode a file and pretty print it

    Options are parsed with :py:mod:`argparse`. The object is decoded
    either with the schema given by ``--schema``, or with the one
    :py:func:`generic_decoder` returns. Trailing data, if any, is
    printed hexadecimally encoded after the object.
    """
    # NOTE(review): the reviewed extract lacked many lines of this
    # function (the argparse import, option names, if/else branches,
    # opening/closing brackets); they are restored -- confirm against
    # the upstream PyDERASN source.
    import argparse
    parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
    parser.add_argument(
        "--skip",
        type=int,
        default=0,
        help="Skip that number of bytes from the beginning",
    )
    parser.add_argument(
        "--oids",
        help="Python paths to dictionary with OIDs, comma separated",
    )
    parser.add_argument(
        "--schema",
        help="Python path to schema definition to use",
    )
    parser.add_argument(
        "--defines-by-path",
        help="Python path to decoder's defines_by_path",
    )
    parser.add_argument(
        "--nobered",
        action="store_true",
        help="Disallow BER encoding",
    )
    parser.add_argument(
        "--print-decode-path",
        action="store_true",
        help="Print decode paths",
    )
    parser.add_argument(
        "--decode-path-only",
        help="Print only specified decode path",
    )
    parser.add_argument(
        "--allow-expl-oob",
        action="store_true",
        help="Allow explicit tag out-of-bound",
    )
    parser.add_argument(
        "--evgen",
        action="store_true",
        help="Turn on event generation mode",
    )
    parser.add_argument(
        "RAWFile",
        type=argparse.FileType("rb"),
        help="Path to BER/CER/DER file you want to decode",
    )
    args = parser.parse_args()
    if PY2:
        # Whole file is read into the memory
        args.RAWFile.seek(args.skip)
        raw = memoryview(args.RAWFile.read())
        args.RAWFile.close()
    else:
        raw = file_mmaped(args.RAWFile)[args.skip:]
    oid_maps = (
        [obj_by_path(_path) for _path in (args.oids or "").split(",")]
        if args.oids else ()
    )
    from functools import partial
    if args.schema:
        schema = obj_by_path(args.schema)
        pprinter = partial(pprint, big_blobs=True)
    else:
        schema, pprinter = generic_decoder()
    ctx = {
        "bered": not args.nobered,
        "allow_expl_oob": args.allow_expl_oob,
    }
    if args.defines_by_path is not None:
        ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
    from os import environ
    pprinter = partial(
        pprinter,
        oid_maps=oid_maps,
        # Colours are turned off when NO_COLOR is set in the environment
        with_colours=environ.get("NO_COLOR") is None,
        with_decode_path=args.print_decode_path,
        decode_path_only=(
            () if args.decode_path_only is None else
            tuple(args.decode_path_only.split(":"))
        ),
    )
    if args.evgen:
        for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
            print(pprinter(obj, decode_path=decode_path))
    else:
        obj, tail = schema().decode(raw, ctx=ctx)
        print(pprinter(obj))
    if tail != b"":
        print("\nTrailing data: %s" % hexenc(tail))
7501 if __name__ == "__main__":
7502 from pyderasn import *