3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2020 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER
23 format, unmarshal them in BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 has specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is lesser than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
244 All objects have ``pps()`` method, that is a generator of
245 :py:class:`pyderasn.PP` namedtuple, holding various raw information
246 about the object. If ``pps`` is called on sequences, then all underlying
247 ``PP`` will be yielded.
249 You can use :py:func:`pyderasn.pp_console_row` function, converting
250 those ``PP`` to human readable string. Actually exactly it is used for
251 all object ``repr``. But it is easy to write custom formatters.
253 >>> from pyderasn import pprint
254 >>> encoded = Integer(-12345).encode()
255 >>> obj, tail = Integer().decode(encoded)
256 >>> print(pprint(obj))
257 0 [1,1, 2] INTEGER -12345
261 Example certificate::
263 >>> print(pprint(crt))
264 0 [1,3,1604] Certificate SEQUENCE
265 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
266 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
267 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
268 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
269 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
270 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
272 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
273 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
274 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
275 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
276 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
277 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
278 . . . . . . . 13:02:45:53
280 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
281 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
282 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
284 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
285 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
286 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
291 Let's parse that output, human::
293 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
294 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
295 0 1 2 3 4 5 6 7 8 9 10 11
299 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
305 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
311 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
316 Offset of the object, where its DER/BER encoding begins.
317 Pay attention that it does **not** include explicit tag.
319 If explicit tag exists, then this is its length (tag + encoded length).
321 Length of object's tag. For example CHOICE does not have its own tag,
324 Length of encoded length.
326 Length of encoded value.
328 Visual indentation to show the depth of object in the hierarchy.
330 Object's name inside SEQUENCE/CHOICE.
332 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
333 here. "IMPLICIT" is omitted.
335 Object's class name, if set. Omitted if it is just an ordinary simple
336 value (like with ``algorithm`` in example above).
340 Object's value, if set. Can consist of multiple words (like OCTET/BIT
341 STRINGs above). We see ``v3`` value in Version, because it is named.
342 ``rdnSequence`` is the choice of CHOICE type.
344 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
345 default one, specified in the schema.
347 Shows does object contains any kind of BER encoded data (possibly
348 Sequence holding BER-encoded underlying value).
350 Only applicable to BER encoded data. Indefinite length encoding mark.
352 Only applicable to BER encoded data. If object has BER-specific
353 encoding, then ``BER`` will be shown. It does not depend on indefinite
354 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
355 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
363 ASN.1 structures often have ANY and OCTET STRING fields, that are
364 DEFINED BY some previously met ObjectIdentifier. This library provides
365 ability to specify mapping between some OID and field that must be
366 decoded with specific specification.
373 :py:class:`pyderasn.ObjectIdentifier` field inside
374 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
375 necessary for decoding structures. For example, CMS (:rfc:`5652`)
378 class ContentInfo(Sequence):
380 ("contentType", ContentType(defines=((("content",), {
381 id_digestedData: DigestedData(),
382 id_signedData: SignedData(),
384 ("content", Any(expl=tag_ctxc(0))),
387 ``contentType`` field tells that it defines that ``content`` must be
388 decoded with ``SignedData`` specification, if ``contentType`` equals to
389 ``id-signedData``. The same applies to ``DigestedData``. If
390 ``contentType`` contains unknown OID, then no automatic decoding is
393 You can specify multiple fields, that will be autodecoded -- that is why
394 ``defines`` kwarg is a sequence. You can specify defined field
395 relatively or absolutely to current decode path. For example ``defines``
396 for AlgorithmIdentifier of X.509's
397 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
401 id_ecPublicKey: ECParameters(),
402 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
404 (("..", "subjectPublicKey"), {
405 id_rsaEncryption: RSAPublicKey(),
406 id_GostR3410_2001: OctetString(),
410 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
411 autodecode its parameters inside SPKI's algorithm and its public key
414 Following types can be automatically decoded (DEFINED BY):
416 * :py:class:`pyderasn.Any`
417 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
418 * :py:class:`pyderasn.OctetString`
419 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
420 ``Any``/``BitString``/``OctetString``-s
422 When any of those fields is automatically decoded, then ``.defined``
423 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
424 was defined, ``value`` contains corresponding decoded value. For example
425 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
427 .. _defines_by_path_ctx:
429 defines_by_path context option
430 ______________________________
432 Sometimes you either can not or do not want to explicitly set *defines*
433 in the schema. You can dynamically apply those definitions when calling
434 :py:meth:`pyderasn.Obj.decode` method.
436 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
437 value must be sequence of following tuples::
439 (decode_path, defines)
441 where ``decode_path`` is a tuple holding so-called decode path to the
442 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
443 ``defines``, holding exactly the same value as accepted in its
444 :ref:`keyword argument <defines>`.
446 For example, again for CMS, you want to automatically decode
447 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
448 structures it may hold. Also, automatically decode ``controlSequence``
451 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
454 ((("content",), {id_signedData: SignedData()}),),
459 DecodePathDefBy(id_signedData),
464 id_cct_PKIData: PKIData(),
465 id_cct_PKIResponse: PKIResponse(),
471 DecodePathDefBy(id_signedData),
474 DecodePathDefBy(id_cct_PKIResponse),
480 id_cmc_recipientNonce: RecipientNonce(),
481 id_cmc_senderNonce: SenderNonce(),
482 id_cmc_statusInfoV2: CMCStatusInfoV2(),
483 id_cmc_transactionId: TransactionId(),
488 Pay attention for :py:class:`pyderasn.DecodePathDefBy` and ``any``.
489 First function is useful for path construction when some automatic
490 decoding is already done. ``any`` means literally any value it meet --
491 useful for SEQUENCE/SET OF-s.
498 By default PyDERASN accepts only DER encoded data. By default it encodes
499 to DER. But you can optionally enable BER decoding with setting
500 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
501 constructed primitive types should be parsed successfully.
503 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
504 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
505 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
506 ``UTCTime``, ``GeneralizedTime`` can contain it.
507 * If object has an indefinite length encoding, then its ``lenindef``
508 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
509 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
511 * If object has an indefinite length encoded explicit tag, then
512 ``expl_lenindef`` is set to True.
513 * If object has either any of BER-related encoding (explicit tag
514 indefinite length, object's indefinite length, BER-encoding) or any
515 underlying component has that kind of encoding, then ``bered``
516 attribute is set to True. For example SignedData CMS can have
517 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
518 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
520 EOC (end-of-contents) token's length is taken in advance in object's
523 .. _allow_expl_oob_ctx:
525 Allow explicit tag out-of-bound
526 -------------------------------
528 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
529 one value, more than one object. If you set ``allow_expl_oob`` context
530 option to True, then no error will be raised and that invalid encoding
531 will be silently further processed. But pay attention that offsets and
532 lengths will be invalid in that case.
536 This option should be used only for skipping some decode errors, just
537 to see the decoded structure somehow.
541 Streaming and dealing with huge structures
542 ------------------------------------------
549 ASN.1 structures can be huge, they can hold millions of objects inside
550 (for example Certificate Revocation Lists (CRL), holding revocation
551 state for every previously issued X.509 certificate). CACert.org's 8 MiB
552 CRL file takes more than half a gigabyte of memory to hold the decoded
555 If you just simply want to check the signature over the ``tbsCertList``,
556 you can create specialized schema with that field represented as
557 OctetString for example::
559 class TBSCertListFast(Sequence):
562 ("revokedCertificates", OctetString(
563 impl=SequenceOf.tag_default,
569 This allows you to quickly decode a few fields and check the signature
570 over the ``tbsCertList`` bytes.
572 But how can you get all certificate's serial number from it, after you
573 trust that CRL after signature validation? You can use so called
574 ``evgen`` (event generation) mode, to catch the events/facts of some
575 successful object decoding. Let's use command line capabilities::
577 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
578 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
579 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
580 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
581 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
582 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
583 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
584 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
585 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
587 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
588 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
589 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
590 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
591 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
592 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
593 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
594 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
595 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
596 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
598 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
599 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
600 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
601 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
602 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
603 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
604 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
605 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
606 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
607 0 [1,4,9145534] CertificateList SEQUENCE
609 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
610 decodes underlying values. It can not tell if SEQUENCE is decoded, so
611 the event of the upper level SEQUENCE is the last one we see.
612 ``version`` field is just a single INTEGER -- it is decoded and event is
613 fired immediately. Then we see that ``algorithm`` and ``parameters``
614 fields are decoded and only after them the ``signature`` SEQUENCE is
615 fired as a successfully decoded. There are 4 events for each revoked
616 certificate entry in that CRL: ``userCertificate`` serial number,
617 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
618 as a one of entity in ``revokedCertificates`` SEQUENCE OF.
620 We can do that in our ordinary Python code and understand where we are
621 by looking at deterministically generated decode paths (do not forget
622 about useful ``--print-decode-path`` CLI option). We must use
623 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
624 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
625 obj, tail)`` tuples::
627 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
629 len(decode_path) == 4 and
630 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
631 decode_path[3] == "userCertificate"
633 print("serial number:", int(obj))
635 Virtually it does not take any memory except at least needed for single
636 object storage. You can easily use that mode to determine required
637 object ``.offset`` and ``.*len`` to be able to decode it separately, or
638 maybe verify signature upon it just by taking bytes by ``.offset`` and
641 .. _evgen_mode_upto_ctx:
646 There is full ability to get any kind of data from the CRL in the
647 example above. However it is not too convenient to get the whole
648 ``RevokedCertificate`` structure, that is pretty lightweight and one may
649 do not want to disassemble it. You can use ``evgen_mode_upto``
650 :ref:`ctx <ctx>` option that semantically equals to
651 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
652 mapped to any non-None value. If specified decode path is met, then any
653 subsequent objects won't be decoded in evgen mode. That allows us to
654 parse the CRL above with fully assembled ``RevokedCertificate``::
656 for decode_path, obj, _ in CertificateList().decode_evgen(
658 ctx={"evgen_mode_upto": (
659 (("tbsCertList", "revokedCertificates", any), True),
663 len(decode_path) == 3 and
664 decode_path[:2] == ("tbsCertList", "revokedCertificates"),
666 print("serial number:", int(obj["userCertificate"]))
673 POSIX compliant systems have ``mmap`` syscall, giving ability to work
674 the memory mapped file. You can deal with the file like it was an
675 ordinary binary string, allowing you not to load it to the memory first.
676 Also you can use them as an input for OCTET STRING, taking no Python
677 memory for their storage.
679 There is convenient :py:func:`pyderasn.file_mmaped` function that
680 creates read-only memoryview on the file contents::
682 with open("huge", "rb") as fd:
683 raw = file_mmaped(fd)
684 obj = Something.decode(raw)
688 mmap-ed files in Python2.7 does not implement buffer protocol, so
689 memoryview won't work on them.
693 mmap maps the **whole** file. So it plays no role if you seek-ed it
694 before. Take the slice of the resulting memoryview with required
699 If you use ZFS as underlying storage, then pay attention that
700 currently most platforms does not deal good with ZFS ARC and ordinary
701 page cache used for mmaps. It can take twice the necessary size in
702 the memory: both in page cache and ZFS ARC.
707 We can parse any kind of data now, but how can we produce files
708 streamingly, without storing their encoded representation in memory?
709 SEQUENCE by default encodes in memory all its values, joins them in huge
710 binary string, just to know the exact size of SEQUENCE's value for
711 encoding it in TLV. DER requires you to know all exact sizes of the
714 You can use CER encoding mode, that slightly differs from the DER, but
715 does not require exact sizes knowledge, allowing streaming encoding
716 directly to some writer/buffer. Just use
717 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
718 encoded data will flow::
720 opener = io.open if PY2 else open
721 with opener("result", "wb") as fd:
722 obj.encode_cer(fd.write)
727 obj.encode_cer(buf.write)
729 If you do not want to create in-memory buffer every time, then you can
730 use :py:func:`pyderasn.encode_cer` function::
732 data = encode_cer(obj)
734 Remember that CER is **not valid** DER in most cases, so you **have to**
735 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
736 decoding. Also currently there is **no** validation that provided CER is
737 valid one -- you are sure that it has only valid BER encoding.
741 SET OF values can not be streamingly encoded, because they are
742 required to be sorted byte-by-byte. Big SET OF values still will take
743 much memory. Use neither SET nor SET OF values, as modern ASN.1
746 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
747 OCTET STRINGs! They will be streamingly copied from underlying file to
748 the buffer using 1 KB chunks.
750 Some structures require that some of the elements have to be forcefully
751 DER encoded. For example ``SignedData`` CMS requires you to encode
752 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
753 encode everything else in BER. You can tell any of the structures to be
754 forcefully encoded in DER during CER encoding, by specifying
755 ``der_forced=True`` attribute::
757 class Certificate(Sequence):
761 class SignedAttributes(SetOf):
766 .. _agg_octet_string:
771 In most cases, huge quantity of binary data is stored as OCTET STRING.
772 CER encoding splits it on 1 KB chunks. BER allows splitting on various
773 levels of chunks inclusion::
775 SOME STRING[CONSTRUCTED]
776 OCTET STRING[CONSTRUCTED]
777 OCTET STRING[PRIMITIVE]
779 OCTET STRING[PRIMITIVE]
781 OCTET STRING[PRIMITIVE]
783 OCTET STRING[PRIMITIVE]
785 OCTET STRING[CONSTRUCTED]
786 OCTET STRING[PRIMITIVE]
788 OCTET STRING[PRIMITIVE]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[CONSTRUCTED]
792 OCTET STRING[PRIMITIVE]
795 You can not just take the offset and some ``.vlen`` of the STRING and
796 treat it as the payload. If you decode it without
797 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
798 and ``bytes()`` will give the whole payload contents.
800 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
801 small memory footprint. There is convenient
802 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
803 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
804 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
805 SHA512 digest of its ``eContent``::
807 fd = open("data.p7m", "rb")
808 raw = file_mmaped(fd)
809 ctx = {"bered": True}
810 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
811 if decode_path == ("content",):
815 raise ValueError("no content found")
816 hasher_state = sha512()
818 hasher_state.update(data)
820 evgens = SignedData().decode_evgen(
821 raw[content.offset:],
822 offset=content.offset,
825 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
827 digest = hasher_state.digest()
829 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
830 copy the payload (without BER/CER encoding interleaved overhead) in it.
831 Virtually it won't take memory more than for keeping small structures
832 and 1 KB binary chunks.
836 SEQUENCE OF iterators
837 _____________________
839 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
840 classes. The only difference with providing the full list of objects, is
841 that type and bounds checking is done during encoding process. Also
842 sequence's value will be emptied after encoding, forcing you to set its
845 This is very useful when you have to create some huge objects, like
846 CRLs, with thousands and millions of entities inside. You can write the
847 generator taking necessary data from the database and giving the
848 ``RevokedCertificate`` objects. Only binary representation of that
849 objects will take memory during DER encoding.
854 There is ability to do 2-pass encoding to DER, writing results directly
855 to specified writer (buffer, file, whatever). It could be 1.5+ times
856 slower than ordinary encoding, but it takes little memory for 1st pass
857 state storing. For example, 1st pass state for CACert.org's CRL with
858 ~416K of certificate entries takes nearly 3.5 MB of memory.
859 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
860 nearly 0.5 KB of memory.
862 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
863 iterators <seqof-iterators>` and write directly to opened file, then
864 there is very small memory footprint.
866 1st pass traverses through all the objects of the structure and returns
867 the size of DER encoded structure, together with 1st pass state object.
868 That state contains precalculated lengths for various objects inside the
873 fulllen, state = obj.encode1st()
875 2nd pass takes the writer and 1st pass state. It traverses through all
876 the objects again, but writes their encoded representation to the writer.
880 opener = io.open if PY2 else open
881 with opener("result", "wb") as fd:
882 obj.encode2nd(fd.write, iter(state))
886 You **MUST NOT** use 1st pass state if anything is changed in the
887 objects. It is intended to be used immediately after 1st pass is
890 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
891 have to reinitialize the values after the 1st pass. And you **have to**
892 be sure that the iterator gives exactly the same values as previously.
893 Yes, you have to run your iterator twice -- because this is two pass
896 If you want to encode to the memory, then you can use convenient
897 :py:func:`pyderasn.encode2pass` helper.
901 .. autoclass:: pyderasn.Obj
909 .. autoclass:: pyderasn.Boolean
914 .. autoclass:: pyderasn.Integer
915 :members: __init__, named
919 .. autoclass:: pyderasn.BitString
920 :members: __init__, bit_len, named
924 .. autoclass:: pyderasn.OctetString
929 .. autoclass:: pyderasn.Null
934 .. autoclass:: pyderasn.ObjectIdentifier
939 .. autoclass:: pyderasn.Enumerated
943 .. autoclass:: pyderasn.CommonString
947 .. autoclass:: pyderasn.NumericString
951 .. autoclass:: pyderasn.PrintableString
952 :members: __init__, allow_asterisk, allow_ampersand
956 .. autoclass:: pyderasn.UTCTime
957 :members: __init__, todatetime
961 .. autoclass:: pyderasn.GeneralizedTime
962 :members: __init__, todatetime
969 .. autoclass:: pyderasn.Choice
970 :members: __init__, choice, value
974 .. autoclass:: PrimitiveTypes
978 .. autoclass:: pyderasn.Any
986 .. autoclass:: pyderasn.Sequence
991 .. autoclass:: pyderasn.Set
996 .. autoclass:: pyderasn.SequenceOf
1001 .. autoclass:: pyderasn.SetOf
1007 .. autofunction:: pyderasn.abs_decode_path
1008 .. autofunction:: pyderasn.agg_octet_string
1009 .. autofunction:: pyderasn.colonize_hex
1010 .. autofunction:: pyderasn.encode2pass
1011 .. autofunction:: pyderasn.encode_cer
1012 .. autofunction:: pyderasn.file_mmaped
1013 .. autofunction:: pyderasn.hexenc
1014 .. autofunction:: pyderasn.hexdec
1015 .. autofunction:: pyderasn.tag_encode
1016 .. autofunction:: pyderasn.tag_decode
1017 .. autofunction:: pyderasn.tag_ctxp
1018 .. autofunction:: pyderasn.tag_ctxc
1019 .. autoclass:: pyderasn.DecodeError
1021 .. autoclass:: pyderasn.NotEnoughData
1022 .. autoclass:: pyderasn.ExceedingData
1023 .. autoclass:: pyderasn.LenIndefForm
1024 .. autoclass:: pyderasn.TagMismatch
1025 .. autoclass:: pyderasn.InvalidLength
1026 .. autoclass:: pyderasn.InvalidOID
1027 .. autoclass:: pyderasn.ObjUnknown
1028 .. autoclass:: pyderasn.ObjNotReady
1029 .. autoclass:: pyderasn.InvalidValueType
1030 .. autoclass:: pyderasn.BoundsError
1037 You can decode DER/BER files using command line abilities::
1039 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1041 If there is no schema for your file, then you can try parsing it without,
1042 but of course IMPLICIT tags will often make it impossible. But result is
1043 good enough for the certificate above::
1045 $ python -m pyderasn path/to/file
1046 0 [1,3,1604] . >: SEQUENCE OF
1047 4 [1,3,1453] . . >: SEQUENCE OF
1048 8 [0,0, 5] . . . . >: [0] ANY
1049 . . . . . A0:03:02:01:02
1050 13 [1,1, 3] . . . . >: INTEGER 61595
1051 18 [1,1, 13] . . . . >: SEQUENCE OF
1052 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1053 31 [1,1, 0] . . . . . . >: NULL
1054 33 [1,3, 274] . . . . >: SEQUENCE OF
1055 37 [1,1, 11] . . . . . . >: SET OF
1056 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1057 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1058 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1060 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1061 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1062 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1063 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1064 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1065 . . . . . . . . . 61:2E:63:6F:6D:2F
1066 1461 [1,1, 13] . . >: SEQUENCE OF
1067 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1068 1474 [1,1, 0] . . . . >: NULL
1069 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1070 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1071 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1077 If you have got dictionaries with ObjectIdentifiers, like example one
1078 from ``tests/test_crts.py``::
1081 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1082 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1084 "2.5.4.10": "id-at-organizationName",
1085 "2.5.4.11": "id-at-organizationalUnitName",
1088 then you can pass it to pretty printer to see human readable OIDs::
1090 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1092 37 [1,1, 11] . . . . . . >: SET OF
1093 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1094 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1095 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1096 50 [1,1, 18] . . . . . . >: SET OF
1097 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1098 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1099 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1100 70 [1,1, 18] . . . . . . >: SET OF
1101 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1102 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1103 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1109 Each decoded element has so-called decode path: sequence of structure
1110 names it is passing during the decode process. Each element has its own
1111 unique path inside the whole ASN.1 tree. You can print it out with
1112 ``--print-decode-path`` option::
1114 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1115 0 [1,3,1604] Certificate SEQUENCE []
1116 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1117 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1118 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1119 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1120 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1121 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1123 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1124 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1125 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1126 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1127 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1128 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1129 . . . . . . . 13:02:45:53
1130 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1133 Now you can print only the specified tree, for example signature algorithm::
1135 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1136 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1137 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1138 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1142 from array import array
1143 from codecs import getdecoder
1144 from codecs import getencoder
1145 from collections import namedtuple
1146 from collections import OrderedDict
1147 from copy import copy
1148 from datetime import datetime
1149 from datetime import timedelta
1150 from io import BytesIO
1151 from math import ceil
1152 from mmap import mmap
1153 from mmap import PROT_READ
1154 from operator import attrgetter
1155 from string import ascii_letters
1156 from string import digits
1157 from sys import maxsize as sys_maxsize
1158 from sys import version_info
1159 from unicodedata import category as unicat
1161 from six import add_metaclass
1162 from six import binary_type
1163 from six import byte2int
1164 from six import indexbytes
1165 from six import int2byte
1166 from six import integer_types
1167 from six import iterbytes
1168 from six import iteritems
1169 from six import itervalues
1171 from six import string_types
1172 from six import text_type
1173 from six import unichr as six_unichr
1174 from six.moves import xrange as six_xrange
1178 from termcolor import colored
1179 except ImportError: # pragma: no cover
1180 def colored(what, *args, **kwargs):
1230 "TagClassApplication",
1233 "TagClassUniversal",
1234 "TagFormConstructed",
1245 TagClassUniversal = 0
1246 TagClassApplication = 1 << 6
1247 TagClassContext = 1 << 7
1248 TagClassPrivate = 1 << 6 | 1 << 7
1249 TagFormPrimitive = 0
1250 TagFormConstructed = 1 << 5
1252 TagClassContext: "",
1253 TagClassApplication: "APPLICATION ",
1254 TagClassPrivate: "PRIVATE ",
1255 TagClassUniversal: "UNIV ",
1259 LENINDEF = b"\x80" # length indefinite mark
1260 LENINDEF_PP_CHAR = "I" if PY2 else "∞"
1261 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1262 SET01 = frozenset("01")
1263 DECIMALS = frozenset(digits)
1264 DECIMAL_SIGNS = ".,"
1265 NEXT_ATTR_NAME = "next" if PY2 else "__next__"
1268 def file_mmaped(fd):
1269 """Make mmap-ed memoryview for reading from file
1271 :param fd: file object
1272 :returns: memoryview over read-only mmap-ing of the whole file
1274 return memoryview(mmap(fd.fileno(), 0, prot=PROT_READ))
1277 if not set(value) <= DECIMALS:
1278 raise ValueError("non-pure integer")
1281 def fractions2float(fractions_raw):
1282 pureint(fractions_raw)
1283 return float("0." + fractions_raw)
1286 def get_def_by_path(defines_by_path, sub_decode_path):
1287 """Get define by decode path
1289 for path, define in defines_by_path:
1290 if len(path) != len(sub_decode_path):
1292 for p1, p2 in zip(path, sub_decode_path):
1293 if (not p1 is any) and (p1 != p2):
1299 ########################################################################
1301 ########################################################################
1303 class ASN1Error(ValueError):
1307 class DecodeError(ASN1Error):
1308 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1310 :param str msg: reason of decode failing
1311 :param klass: optional exact DecodeError inherited class (like
1312 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1313 :py:exc:`InvalidLength`)
1314 :param decode_path: tuple of strings. It contains human
1315 readable names of the fields through which
1316 decoding process has passed
1317 :param int offset: binary offset where failure happened
1319 super(DecodeError, self).__init__()
1322 self.decode_path = decode_path
1323 self.offset = offset
1328 "" if self.klass is None else self.klass.__name__,
1330 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1331 if len(self.decode_path) > 0 else ""
1333 ("(at %d)" % self.offset) if self.offset > 0 else "",
1339 return "%s(%s)" % (self.__class__.__name__, self)
1342 class NotEnoughData(DecodeError):
1346 class ExceedingData(ASN1Error):
1347 def __init__(self, nbytes):
1348 super(ExceedingData, self).__init__()
1349 self.nbytes = nbytes
1352 return "%d trailing bytes" % self.nbytes
1355 return "%s(%s)" % (self.__class__.__name__, self)
1358 class LenIndefForm(DecodeError):
1362 class TagMismatch(DecodeError):
1366 class InvalidLength(DecodeError):
1370 class InvalidOID(DecodeError):
1374 class ObjUnknown(ASN1Error):
1375 def __init__(self, name):
1376 super(ObjUnknown, self).__init__()
1380 return "object is unknown: %s" % self.name
1383 return "%s(%s)" % (self.__class__.__name__, self)
1386 class ObjNotReady(ASN1Error):
1387 def __init__(self, name):
1388 super(ObjNotReady, self).__init__()
1392 return "object is not ready: %s" % self.name
1395 return "%s(%s)" % (self.__class__.__name__, self)
1398 class InvalidValueType(ASN1Error):
1399 def __init__(self, expected_types):
1400 super(InvalidValueType, self).__init__()
1401 self.expected_types = expected_types
1404 return "invalid value type, expected: %s" % ", ".join(
1405 [repr(t) for t in self.expected_types]
1409 return "%s(%s)" % (self.__class__.__name__, self)
1412 class BoundsError(ASN1Error):
1413 def __init__(self, bound_min, value, bound_max):
1414 super(BoundsError, self).__init__()
1415 self.bound_min = bound_min
1417 self.bound_max = bound_max
1420 return "unsatisfied bounds: %s <= %s <= %s" % (
1427 return "%s(%s)" % (self.__class__.__name__, self)
1430 ########################################################################
1432 ########################################################################
1434 _hexdecoder = getdecoder("hex")
1435 _hexencoder = getencoder("hex")
1439 """Binary data to hexadecimal string convert
1441 return _hexdecoder(data)[0]
1445 """Hexadecimal string to binary data convert
1447 return _hexencoder(data)[0].decode("ascii")
1450 def int_bytes_len(num, byte_len=8):
1453 return int(ceil(float(num.bit_length()) / byte_len))
1456 def zero_ended_encode(num):
1457 octets = bytearray(int_bytes_len(num, 7))
1459 octets[i] = num & 0x7F
1463 octets[i] = 0x80 | (num & 0x7F)
1466 return bytes(octets)
1469 def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
1470 """Encode tag to binary form
1472 :param int num: tag's number
1473 :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
1474 :py:data:`pyderasn.TagClassContext`,
1475 :py:data:`pyderasn.TagClassApplication`,
1476 :py:data:`pyderasn.TagClassPrivate`)
1477 :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
1478 :py:data:`pyderasn.TagFormConstructed`)
1482 return int2byte(klass | form | num)
1483 # [XX|X|11111][1.......][1.......] ... [0.......]
1484 return int2byte(klass | form | 31) + zero_ended_encode(num)
1487 def tag_decode(tag):
1488 """Decode tag from binary form
1492 No validation is performed, assuming that it has already passed.
1494 It returns tuple with three integers, as
1495 :py:func:`pyderasn.tag_encode` accepts.
1497 first_octet = byte2int(tag)
1498 klass = first_octet & 0xC0
1499 form = first_octet & 0x20
1500 if first_octet & 0x1F < 0x1F:
1501 return (klass, form, first_octet & 0x1F)
1503 for octet in iterbytes(tag[1:]):
1506 return (klass, form, num)
1510 """Create CONTEXT PRIMITIVE tag
1512 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1516 """Create CONTEXT CONSTRUCTED tag
1518 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
1521 def tag_strip(data):
1522 """Take off tag from the data
1524 :returns: (encoded tag, tag length, remaining data)
1527 raise NotEnoughData("no data at all")
1528 if byte2int(data) & 0x1F < 31:
1529 return data[:1], 1, data[1:]
1534 raise DecodeError("unfinished tag")
1535 if indexbytes(data, i) & 0x80 == 0:
1538 return data[:i], i, data[i:]
1544 octets = bytearray(int_bytes_len(l) + 1)
1545 octets[0] = 0x80 | (len(octets) - 1)
1546 for i in six_xrange(len(octets) - 1, 0, -1):
1547 octets[i] = l & 0xFF
1549 return bytes(octets)
1552 def len_decode(data):
1555 :returns: (decoded length, length's length, remaining data)
1556 :raises LenIndefForm: if indefinite form encoding is met
1559 raise NotEnoughData("no data at all")
1560 first_octet = byte2int(data)
1561 if first_octet & 0x80 == 0:
1562 return first_octet, 1, data[1:]
1563 octets_num = first_octet & 0x7F
1564 if octets_num + 1 > len(data):
1565 raise NotEnoughData("encoded length is longer than data")
1567 raise LenIndefForm()
1568 if byte2int(data[1:]) == 0:
1569 raise DecodeError("leading zeros")
1571 for v in iterbytes(data[1:1 + octets_num]):
1574 raise DecodeError("long form instead of short one")
1575 return l, 1 + octets_num, data[1 + octets_num:]
1578 LEN0 = len_encode(0)
1579 LEN1 = len_encode(1)
1580 LEN1K = len_encode(1000)
1584 """How many bytes length field will take
1588 if l < 256: # 1 << 8
1590 if l < 65536: # 1 << 16
1592 if l < 16777216: # 1 << 24
1594 if l < 4294967296: # 1 << 32
1596 if l < 1099511627776: # 1 << 40
1598 if l < 281474976710656: # 1 << 48
1600 if l < 72057594037927936: # 1 << 56
1602 raise OverflowError("too big length")
1605 def write_full(writer, data):
1606 """Fully write provided data
1608 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1610 BytesIO does not guarantee that the whole data will be written at
1611 once. That function write everything provided, raising an error if
1612 ``writer`` returns None.
1614 data = memoryview(data)
1616 while written != len(data):
1617 n = writer(data[written:])
1619 raise ValueError("can not write to buf")
1623 # If it is 64-bit system, then use compact 64-bit array of unsigned
1624 # longs. Use an ordinary list with universal integers otherwise, that
1626 if sys_maxsize > 2 ** 32:
1627 def state_2pass_new():
1630 def state_2pass_new():
1634 ########################################################################
1636 ########################################################################
1638 class AutoAddSlots(type):
1639 def __new__(cls, name, bases, _dict):
1640 _dict["__slots__"] = _dict.get("__slots__", ())
1641 return type.__new__(cls, name, bases, _dict)
1644 BasicState = namedtuple("BasicState", (
1657 ), **NAMEDTUPLE_KWARGS)
1660 @add_metaclass(AutoAddSlots)
1662 """Common ASN.1 object class
1664 All ASN.1 types are inherited from it. It has metaclass that
1665 automatically adds ``__slots__`` to all inherited classes.
1690 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1691 self._expl = getattr(self, "expl", None) if expl is None else expl
1692 if self.tag != self.tag_default and self._expl is not None:
1693 raise ValueError("implicit and explicit tags can not be set simultaneously")
1694 if self.tag is None:
1695 self._tag_order = None
1697 tag_class, _, tag_num = tag_decode(
1698 self.tag if self._expl is None else self._expl
1700 self._tag_order = (tag_class, tag_num)
1701 if default is not None:
1703 self.optional = optional
1704 self.offset, self.llen, self.vlen = _decoded
1706 self.expl_lenindef = False
1707 self.lenindef = False
1708 self.ber_encoded = False
1711 def ready(self): # pragma: no cover
1712 """Is object ready to be encoded?
1714 raise NotImplementedError()
1716 def _assert_ready(self):
1718 raise ObjNotReady(self.__class__.__name__)
1722 """Is either object or any elements inside is BER encoded?
1724 return self.expl_lenindef or self.lenindef or self.ber_encoded
1728 """Is object decoded?
1730 return (self.llen + self.vlen) > 0
1732 def __getstate__(self): # pragma: no cover
1733 """Used for making safe to be mutable pickleable copies
1735 raise NotImplementedError()
1737 def __setstate__(self, state):
1738 if state.version != __version__:
1739 raise ValueError("data is pickled by different PyDERASN version")
1740 self.tag = state.tag
1741 self._tag_order = state.tag_order
1742 self._expl = state.expl
1743 self.default = state.default
1744 self.optional = state.optional
1745 self.offset = state.offset
1746 self.llen = state.llen
1747 self.vlen = state.vlen
1748 self.expl_lenindef = state.expl_lenindef
1749 self.lenindef = state.lenindef
1750 self.ber_encoded = state.ber_encoded
1753 def tag_order(self):
1754 """Tag's (class, number) used for DER/CER sorting
1756 return self._tag_order
1759 def tag_order_cer(self):
1760 return self.tag_order
1764 """.. seealso:: :ref:`decoding`
1766 return len(self.tag)
1770 """.. seealso:: :ref:`decoding`
1772 return self.tlen + self.llen + self.vlen
1774 def __str__(self): # pragma: no cover
1775 return self.__bytes__() if PY2 else self.__unicode__()
1777 def __ne__(self, their):
1778 return not(self == their)
1780 def __gt__(self, their): # pragma: no cover
1781 return not(self < their)
1783 def __le__(self, their): # pragma: no cover
1784 return (self == their) or (self < their)
1786 def __ge__(self, their): # pragma: no cover
1787 return (self == their) or (self > their)
1789 def _encode(self): # pragma: no cover
1790 raise NotImplementedError()
1792 def _encode_cer(self, writer):
1793 write_full(writer, self._encode())
1795 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1796 yield NotImplemented
1798 def _encode1st(self, state):
1799 raise NotImplementedError()
1801 def _encode2nd(self, writer, state_iter):
1802 raise NotImplementedError()
1805 """DER encode the structure
1807 :returns: DER representation
1809 raw = self._encode()
1810 if self._expl is None:
1812 return b"".join((self._expl, len_encode(len(raw)), raw))
1814 def encode1st(self, state=None):
1815 """Do the 1st pass of 2-pass encoding
1817 :rtype: (int, array("L"))
1818 :returns: full length of encoded data and precalculated various
1822 state = state_2pass_new()
1823 if self._expl is None:
1824 return self._encode1st(state)
1826 idx = len(state) - 1
1827 vlen, _ = self._encode1st(state)
1829 fulllen = len(self._expl) + len_size(vlen) + vlen
1830 return fulllen, state
1832 def encode2nd(self, writer, state_iter):
1833 """Do the 2nd pass of 2-pass encoding
1835 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1836 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1838 if self._expl is None:
1839 self._encode2nd(writer, state_iter)
1841 write_full(writer, self._expl + len_encode(next(state_iter)))
1842 self._encode2nd(writer, state_iter)
1844 def encode_cer(self, writer):
1845 """CER encode the structure to specified writer
1847 :param writer: must comply with ``io.RawIOBase.write``
1848 behaviour. It takes slice to be written and
1849 returns number of bytes processed. If it returns
1850 None, then exception will be raised
1852 if self._expl is not None:
1853 write_full(writer, self._expl + LENINDEF)
1854 if getattr(self, "der_forced", False):
1855 write_full(writer, self._encode())
1857 self._encode_cer(writer)
1858 if self._expl is not None:
1859 write_full(writer, EOC)
1861 def hexencode(self):
1862 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1864 return hexenc(self.encode())
1874 _ctx_immutable=True,
1878 :param data: either binary or memoryview
1879 :param int offset: initial data's offset
1880 :param bool leavemm: do we need to leave memoryview of remaining
1881 data as is, or convert it to bytes otherwise
1882 :param decode_path: current decode path (tuples of strings,
1883 possibly with DecodePathDefBy) with will be
1884 the root for all underlying objects
1885 :param ctx: optional :ref:`context <ctx>` governing decoding process
1886 :param bool tag_only: decode only the tag, without length and
1887 contents (used only in Choice and Set
1888 structures, trying to determine if tag satisfies
1890 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1892 :returns: (Obj, remaining data)
1894 .. seealso:: :ref:`decoding`
1896 result = next(self.decode_evgen(
1908 _, obj, tail = result
1919 _ctx_immutable=True,
1922 """Decode with evgen mode on
1924 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1925 it returns the generator producing ``(decode_path, obj, tail)``
1927 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1931 elif _ctx_immutable:
1933 tlv = memoryview(data)
1936 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1939 if self._expl is None:
1940 for result in self._decode(
1943 decode_path=decode_path,
1946 evgen_mode=_evgen_mode,
1951 _decode_path, obj, tail = result
1952 if not _decode_path is decode_path:
1956 t, tlen, lv = tag_strip(tlv)
1957 except DecodeError as err:
1958 raise err.__class__(
1960 klass=self.__class__,
1961 decode_path=decode_path,
1966 klass=self.__class__,
1967 decode_path=decode_path,
1971 l, llen, v = len_decode(lv)
1972 except LenIndefForm as err:
1973 if not ctx.get("bered", False):
1974 raise err.__class__(
1976 klass=self.__class__,
1977 decode_path=decode_path,
1981 offset += tlen + llen
1982 for result in self._decode(
1985 decode_path=decode_path,
1988 evgen_mode=_evgen_mode,
1990 if tag_only: # pragma: no cover
1993 _decode_path, obj, tail = result
1994 if not _decode_path is decode_path:
1996 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
1997 if eoc_expected.tobytes() != EOC:
2000 klass=self.__class__,
2001 decode_path=decode_path,
2005 obj.expl_lenindef = True
2006 except DecodeError as err:
2007 raise err.__class__(
2009 klass=self.__class__,
2010 decode_path=decode_path,
2015 raise NotEnoughData(
2016 "encoded length is longer than data",
2017 klass=self.__class__,
2018 decode_path=decode_path,
2021 for result in self._decode(
2023 offset=offset + tlen + llen,
2024 decode_path=decode_path,
2027 evgen_mode=_evgen_mode,
2029 if tag_only: # pragma: no cover
2032 _decode_path, obj, tail = result
2033 if not _decode_path is decode_path:
2035 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2037 "explicit tag out-of-bound, longer than data",
2038 klass=self.__class__,
2039 decode_path=decode_path,
2042 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2044 def decod(self, data, offset=0, decode_path=(), ctx=None):
2045 """Decode the data, check that tail is empty
2047 :raises ExceedingData: if tail is not empty
2049 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2050 (decode without tail) that also checks that there is no
2053 obj, tail = self.decode(
2056 decode_path=decode_path,
2061 raise ExceedingData(len(tail))
2064 def hexdecode(self, data, *args, **kwargs):
2065 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2067 return self.decode(hexdec(data), *args, **kwargs)
2069 def hexdecod(self, data, *args, **kwargs):
2070 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2072 return self.decod(hexdec(data), *args, **kwargs)
2076 """.. seealso:: :ref:`decoding`
2078 return self._expl is not None
2082 """.. seealso:: :ref:`decoding`
2087 def expl_tlen(self):
2088 """.. seealso:: :ref:`decoding`
2090 return len(self._expl)
2093 def expl_llen(self):
2094 """.. seealso:: :ref:`decoding`
2096 if self.expl_lenindef:
2098 return len(len_encode(self.tlvlen))
2101 def expl_offset(self):
2102 """.. seealso:: :ref:`decoding`
2104 return self.offset - self.expl_tlen - self.expl_llen
2107 def expl_vlen(self):
2108 """.. seealso:: :ref:`decoding`
2113 def expl_tlvlen(self):
2114 """.. seealso:: :ref:`decoding`
2116 return self.expl_tlen + self.expl_llen + self.expl_vlen
2119 def fulloffset(self):
2120 """.. seealso:: :ref:`decoding`
2122 return self.expl_offset if self.expled else self.offset
2126 """.. seealso:: :ref:`decoding`
2128 return self.expl_tlvlen if self.expled else self.tlvlen
2130 def pps_lenindef(self, decode_path):
2131 if self.lenindef and not (
2132 getattr(self, "defined", None) is not None and
2133 self.defined[1].lenindef
2136 asn1_type_name="EOC",
2138 decode_path=decode_path,
2140 self.offset + self.tlvlen -
2141 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2149 if self.expl_lenindef:
2151 asn1_type_name="EOC",
2152 obj_name="EXPLICIT",
2153 decode_path=decode_path,
2154 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
2163 def encode_cer(obj):
2164 """Encode to CER in memory buffer
2166 :returns bytes: memory buffer contents
2169 obj.encode_cer(buf.write)
2170 return buf.getvalue()
2173 def encode2pass(obj):
2174 """Encode (2-pass mode) to DER in memory buffer
2176 :returns bytes: memory buffer contents
2179 _, state = obj.encode1st()
2180 obj.encode2nd(buf.write, iter(state))
2181 return buf.getvalue()
2184 class DecodePathDefBy(object):
2185 """DEFINED BY representation inside decode path
2187 __slots__ = ("defined_by",)
2189 def __init__(self, defined_by):
2190 self.defined_by = defined_by
2192 def __ne__(self, their):
2193 return not(self == their)
2195 def __eq__(self, their):
2196 if not isinstance(their, self.__class__):
2198 return self.defined_by == their.defined_by
2201 return "DEFINED BY " + str(self.defined_by)
2204 return "<%s: %s>" % (self.__class__.__name__, self.defined_by)
2207 ########################################################################
2209 ########################################################################
2211 PP = namedtuple("PP", (
2234 ), **NAMEDTUPLE_KWARGS)
2239 asn1_type_name="unknown",
2256 expl_lenindef=False,
2287 def _colourize(what, colour, with_colours, attrs=("bold",)):
2288 return colored(what, colour, attrs=attrs) if with_colours else what
2291 def colonize_hex(hexed):
2292 """Separate hexadecimal string with colons
2294 return ":".join(hexed[i:i + 2] for i in six_xrange(0, len(hexed), 2))
2303 with_decode_path=False,
2304 decode_path_len_decrease=0,
2311 " " if pp.expl_offset is None else
2312 ("-%d" % (pp.offset - pp.expl_offset))
2314 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2316 col = _colourize(col, "red", with_colours, ())
2317 col += _colourize("B", "red", with_colours) if pp.bered else " "
2319 col = "[%d,%d,%4d]%s" % (
2323 LENINDEF_PP_CHAR if pp.lenindef else " "
2325 col = _colourize(col, "green", with_colours, ())
2327 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2328 if decode_path_len > 0:
2329 cols.append(" ." * decode_path_len)
2330 ent = pp.decode_path[-1]
2331 if isinstance(ent, DecodePathDefBy):
2332 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2333 value = str(ent.defined_by)
2336 len(oid_maps) > 0 and
2337 ent.defined_by.asn1_type_name ==
2338 ObjectIdentifier.asn1_type_name
2340 for oid_map in oid_maps:
2341 oid_name = oid_map.get(value)
2342 if oid_name is not None:
2343 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2345 if oid_name is None:
2346 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2348 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2349 if pp.expl is not None:
2350 klass, _, num = pp.expl
2351 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2352 cols.append(_colourize(col, "blue", with_colours))
2353 if pp.impl is not None:
2354 klass, _, num = pp.impl
2355 col = "[%s%d]" % (TagClassReprs[klass], num)
2356 cols.append(_colourize(col, "blue", with_colours))
2357 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2358 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2360 cols.append(_colourize("BER", "red", with_colours))
2361 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2362 if pp.value is not None:
2364 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2366 len(oid_maps) > 0 and
2367 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
2369 for oid_map in oid_maps:
2370 oid_name = oid_map.get(value)
2371 if oid_name is not None:
2372 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2374 if pp.asn1_type_name == Integer.asn1_type_name:
2375 hex_repr = hex(int(pp.obj._value))[2:].upper()
2376 if len(hex_repr) % 2 != 0:
2377 hex_repr = "0" + hex_repr
2378 cols.append(_colourize(
2379 "(%s)" % colonize_hex(hex_repr),
2384 if pp.blob.__class__ == binary_type:
2385 cols.append(hexenc(pp.blob))
2386 elif pp.blob.__class__ == tuple:
2387 cols.append(", ".join(pp.blob))
2389 cols.append(_colourize("OPTIONAL", "red", with_colours))
2391 cols.append(_colourize("DEFAULT", "red", with_colours))
2392 if with_decode_path:
2393 cols.append(_colourize(
2394 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2398 return " ".join(cols)
2401 def pp_console_blob(pp, decode_path_len_decrease=0):
2402 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2403 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2404 if decode_path_len > 0:
2405 cols.append(" ." * (decode_path_len + 1))
2406 if pp.blob.__class__ == binary_type:
2407 blob = hexenc(pp.blob).upper()
2408 for i in six_xrange(0, len(blob), 32):
2409 chunk = blob[i:i + 32]
2410 yield " ".join(cols + [colonize_hex(chunk)])
2411 elif pp.blob.__class__ == tuple:
2412 yield " ".join(cols + [", ".join(pp.blob)])
2420 with_decode_path=False,
2421 decode_path_only=(),
2424 """Pretty print object
2426 :param Obj obj: object you want to pretty print
2427 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionary.
2428 Its human readable form is printed when OID is met
2429 :param big_blobs: if large binary objects are met (like OctetString
2430 values), do we need to print them too, on separate
2432 :param with_colours: colourize output, if ``termcolor`` library
2434 :param with_decode_path: print decode path
2435 :param decode_path_only: print only that specified decode path
2437 def _pprint_pps(pps):
2439 if hasattr(pp, "_fields"):
2441 decode_path_only != () and
2443 str(p) for p in pp.decode_path[:len(decode_path_only)]
2444 ) != decode_path_only
2448 yield pp_console_row(
2453 with_colours=with_colours,
2454 with_decode_path=with_decode_path,
2455 decode_path_len_decrease=len(decode_path_only),
2457 for row in pp_console_blob(
2459 decode_path_len_decrease=len(decode_path_only),
2463 yield pp_console_row(
2468 with_colours=with_colours,
2469 with_decode_path=with_decode_path,
2470 decode_path_len_decrease=len(decode_path_only),
2473 for row in _pprint_pps(pp):
2475 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2478 ########################################################################
2479 # ASN.1 primitive types
2480 ########################################################################
2482 BooleanState = namedtuple(
2484 BasicState._fields + ("value",),
2490 """``BOOLEAN`` boolean type
2492 >>> b = Boolean(True)
2494 >>> b == Boolean(True)
2500 tag_default = tag_encode(1)
2501 asn1_type_name = "BOOLEAN"
2513 :param value: set the value. Either boolean type, or
2514 :py:class:`pyderasn.Boolean` object
2515 :param bytes impl: override default tag with ``IMPLICIT`` one
2516 :param bytes expl: override default tag with ``EXPLICIT`` one
2517 :param default: set default value. Type same as in ``value``
2518 :param bool optional: is object ``OPTIONAL`` in sequence
2520 super(Boolean, self).__init__(impl, expl, default, optional, _decoded)
2521 self._value = None if value is None else self._value_sanitize(value)
2522 if default is not None:
2523 default = self._value_sanitize(default)
2524 self.default = self.__class__(
2530 self._value = default
2532 def _value_sanitize(self, value):
2533 if value.__class__ == bool:
2535 if issubclass(value.__class__, Boolean):
2537 raise InvalidValueType((self.__class__, bool))
2541 return self._value is not None
2543 def __getstate__(self):
2544 return BooleanState(
2560 def __setstate__(self, state):
2561 super(Boolean, self).__setstate__(state)
2562 self._value = state.value
2564 def __nonzero__(self):
2565 self._assert_ready()
2569 self._assert_ready()
2572 def __eq__(self, their):
2573 if their.__class__ == bool:
2574 return self._value == their
2575 if not issubclass(their.__class__, Boolean):
2578 self._value == their._value and
2579 self.tag == their.tag and
2580 self._expl == their._expl
2591 return self.__class__(
2593 impl=self.tag if impl is None else impl,
2594 expl=self._expl if expl is None else expl,
2595 default=self.default if default is None else default,
2596 optional=self.optional if optional is None else optional,
2600 self._assert_ready()
2601 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2603 def _encode1st(self, state):
2604 return len(self.tag) + 2, state
2606 def _encode2nd(self, writer, state_iter):
2607 self._assert_ready()
2608 write_full(writer, self._encode())
2610 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2612 t, _, lv = tag_strip(tlv)
2613 except DecodeError as err:
2614 raise err.__class__(
2616 klass=self.__class__,
2617 decode_path=decode_path,
2622 klass=self.__class__,
2623 decode_path=decode_path,
2630 l, _, v = len_decode(lv)
2631 except DecodeError as err:
2632 raise err.__class__(
2634 klass=self.__class__,
2635 decode_path=decode_path,
2639 raise InvalidLength(
2640 "Boolean's length must be equal to 1",
2641 klass=self.__class__,
2642 decode_path=decode_path,
2646 raise NotEnoughData(
2647 "encoded length is longer than data",
2648 klass=self.__class__,
2649 decode_path=decode_path,
2652 first_octet = byte2int(v)
2654 if first_octet == 0:
2656 elif first_octet == 0xFF:
2658 elif ctx.get("bered", False):
2663 "unacceptable Boolean value",
2664 klass=self.__class__,
2665 decode_path=decode_path,
2668 obj = self.__class__(
2672 default=self.default,
2673 optional=self.optional,
2674 _decoded=(offset, 1, 1),
2676 obj.ber_encoded = ber_encoded
2677 yield decode_path, obj, v[1:]
2680 return pp_console_row(next(self.pps()))
2682 def pps(self, decode_path=()):
2685 asn1_type_name=self.asn1_type_name,
2686 obj_name=self.__class__.__name__,
2687 decode_path=decode_path,
2688 value=str(self._value) if self.ready else None,
2689 optional=self.optional,
2690 default=self == self.default,
2691 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2692 expl=None if self._expl is None else tag_decode(self._expl),
2697 expl_offset=self.expl_offset if self.expled else None,
2698 expl_tlen=self.expl_tlen if self.expled else None,
2699 expl_llen=self.expl_llen if self.expled else None,
2700 expl_vlen=self.expl_vlen if self.expled else None,
2701 expl_lenindef=self.expl_lenindef,
2702 ber_encoded=self.ber_encoded,
2705 for pp in self.pps_lenindef(decode_path):
2709 IntegerState = namedtuple(
2711 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2717 """``INTEGER`` integer type
2719 >>> b = Integer(-123)
2721 >>> b == Integer(-123)
2726 >>> Integer(2, bounds=(1, 3))
2728 >>> Integer(5, bounds=(1, 3))
2729 Traceback (most recent call last):
2730 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2734 class Version(Integer):
2741 >>> v = Version("v1")
2748 {'v3': 2, 'v1': 0, 'v2': 1}
2750 __slots__ = ("specs", "_bound_min", "_bound_max")
2751 tag_default = tag_encode(2)
2752 asn1_type_name = "INTEGER"
2766 :param value: set the value. Either integer type, named value
2767 (if ``schema`` is specified in the class), or
2768 :py:class:`pyderasn.Integer` object
2769 :param bounds: set ``(MIN, MAX)`` value constraint.
2770 (-inf, +inf) by default
2771 :param bytes impl: override default tag with ``IMPLICIT`` one
2772 :param bytes expl: override default tag with ``EXPLICIT`` one
2773 :param default: set default value. Type same as in ``value``
2774 :param bool optional: is object ``OPTIONAL`` in sequence
2776 super(Integer, self).__init__(impl, expl, default, optional, _decoded)
2778 specs = getattr(self, "schema", {}) if _specs is None else _specs
2779 self.specs = specs if specs.__class__ == dict else dict(specs)
2780 self._bound_min, self._bound_max = getattr(
2783 (float("-inf"), float("+inf")),
2784 ) if bounds is None else bounds
2785 if value is not None:
2786 self._value = self._value_sanitize(value)
2787 if default is not None:
2788 default = self._value_sanitize(default)
2789 self.default = self.__class__(
2795 if self._value is None:
2796 self._value = default
2798 def _value_sanitize(self, value):
2799 if isinstance(value, integer_types):
2801 elif issubclass(value.__class__, Integer):
2802 value = value._value
2803 elif value.__class__ == str:
2804 value = self.specs.get(value)
2806 raise ObjUnknown("integer value: %s" % value)
2808 raise InvalidValueType((self.__class__, int, str))
2809 if not self._bound_min <= value <= self._bound_max:
2810 raise BoundsError(self._bound_min, value, self._bound_max)
2815 return self._value is not None
2817 def __getstate__(self):
2818 return IntegerState(
2837 def __setstate__(self, state):
2838 super(Integer, self).__setstate__(state)
2839 self.specs = state.specs
2840 self._value = state.value
2841 self._bound_min = state.bound_min
2842 self._bound_max = state.bound_max
2845 self._assert_ready()
2846 return int(self._value)
2849 self._assert_ready()
2850 return hash(b"".join((
2852 bytes(self._expl or b""),
2853 str(self._value).encode("ascii"),
2856 def __eq__(self, their):
2857 if isinstance(their, integer_types):
2858 return self._value == their
2859 if not issubclass(their.__class__, Integer):
2862 self._value == their._value and
2863 self.tag == their.tag and
2864 self._expl == their._expl
2867 def __lt__(self, their):
2868 return self._value < their._value
2872 """Return named representation (if exists) of the value
2874 for name, value in iteritems(self.specs):
2875 if value == self._value:
2888 return self.__class__(
2891 (self._bound_min, self._bound_max)
2892 if bounds is None else bounds
2894 impl=self.tag if impl is None else impl,
2895 expl=self._expl if expl is None else expl,
2896 default=self.default if default is None else default,
2897 optional=self.optional if optional is None else optional,
2901 def _encode_payload(self):
2902 self._assert_ready()
2906 octets = bytearray([0])
2910 octets = bytearray()
2912 octets.append((value & 0xFF) ^ 0xFF)
2914 if len(octets) == 0 or octets[-1] & 0x80 == 0:
2917 octets = bytearray()
2919 octets.append(value & 0xFF)
2921 if octets[-1] & 0x80 > 0:
2924 octets = bytes(octets)
2926 bytes_len = ceil(value.bit_length() / 8) or 1
2929 octets = value.to_bytes(
2934 except OverflowError:
2939 return b"".join((self.tag, len_encode(len(octets)), octets))
2942 octets = self._encode_payload()
2943 return b"".join((self.tag, len_encode(len(octets)), octets))
2945 def _encode1st(self, state):
2946 l = len(self._encode_payload())
2947 return len(self.tag) + len_size(l) + l, state
2949 def _encode2nd(self, writer, state_iter):
2950 write_full(writer, self._encode())
2952 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2954 t, _, lv = tag_strip(tlv)
2955 except DecodeError as err:
2956 raise err.__class__(
2958 klass=self.__class__,
2959 decode_path=decode_path,
2964 klass=self.__class__,
2965 decode_path=decode_path,
2972 l, llen, v = len_decode(lv)
2973 except DecodeError as err:
2974 raise err.__class__(
2976 klass=self.__class__,
2977 decode_path=decode_path,
2981 raise NotEnoughData(
2982 "encoded length is longer than data",
2983 klass=self.__class__,
2984 decode_path=decode_path,
2988 raise NotEnoughData(
2990 klass=self.__class__,
2991 decode_path=decode_path,
2994 v, tail = v[:l], v[l:]
2995 first_octet = byte2int(v)
2997 second_octet = byte2int(v[1:])
2999 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
3000 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3003 "non normalized integer",
3004 klass=self.__class__,
3005 decode_path=decode_path,
3010 if first_octet & 0x80 > 0:
3011 octets = bytearray()
3012 for octet in bytearray(v):
3013 octets.append(octet ^ 0xFF)
3014 for octet in octets:
3015 value = (value << 8) | octet
3019 for octet in bytearray(v):
3020 value = (value << 8) | octet
3022 value = int.from_bytes(v, byteorder="big", signed=True)
3024 obj = self.__class__(
3026 bounds=(self._bound_min, self._bound_max),
3029 default=self.default,
3030 optional=self.optional,
3032 _decoded=(offset, llen, l),
3034 except BoundsError as err:
3037 klass=self.__class__,
3038 decode_path=decode_path,
3041 yield decode_path, obj, tail
3044 return pp_console_row(next(self.pps()))
3046 def pps(self, decode_path=()):
3049 asn1_type_name=self.asn1_type_name,
3050 obj_name=self.__class__.__name__,
3051 decode_path=decode_path,
3052 value=(self.named or str(self._value)) if self.ready else None,
3053 optional=self.optional,
3054 default=self == self.default,
3055 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3056 expl=None if self._expl is None else tag_decode(self._expl),
3061 expl_offset=self.expl_offset if self.expled else None,
3062 expl_tlen=self.expl_tlen if self.expled else None,
3063 expl_llen=self.expl_llen if self.expled else None,
3064 expl_vlen=self.expl_vlen if self.expled else None,
3065 expl_lenindef=self.expl_lenindef,
3068 for pp in self.pps_lenindef(decode_path):
3072 BitStringState = namedtuple(
3074 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3079 class BitString(Obj):
3080 """``BIT STRING`` bit string type
3082 >>> BitString(b"hello world")
3083 BIT STRING 88 bits 68656c6c6f20776f726c64
3086 >>> b == b"hello world"
3091 >>> BitString("'0A3B5F291CD'H")
3092 BIT STRING 44 bits 0a3b5f291cd0
3093 >>> b = BitString("'010110000000'B")
3094 BIT STRING 12 bits 5800
3097 >>> b[0], b[1], b[2], b[3]
3098 (False, True, False, True)
3102 [False, True, False, True, True, False, False, False, False, False, False, False]
3106 class KeyUsage(BitString):
3108 ("digitalSignature", 0),
3109 ("nonRepudiation", 1),
3110 ("keyEncipherment", 2),
3113 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3114 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3116 ['nonRepudiation', 'keyEncipherment']
3118 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3122 Pay attention that BIT STRING can be encoded both in primitive
3123 and constructed forms. Decoder always checks constructed form tag
3124 additionally to specified primitive one. If BER decoding is
3125 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3126 of DER restrictions.
3128 __slots__ = ("tag_constructed", "specs", "defined")
3129 tag_default = tag_encode(3)
3130 asn1_type_name = "BIT STRING"
3143 :param value: set the value. Either binary type, tuple of named
3144 values (if ``schema`` is specified in the class),
3145 string in ``'XXX...'B`` form, or
3146 :py:class:`pyderasn.BitString` object
3147 :param bytes impl: override default tag with ``IMPLICIT`` one
3148 :param bytes expl: override default tag with ``EXPLICIT`` one
3149 :param default: set default value. Type same as in ``value``
3150 :param bool optional: is object ``OPTIONAL`` in sequence
3152 super(BitString, self).__init__(impl, expl, default, optional, _decoded)
3153 specs = getattr(self, "schema", {}) if _specs is None else _specs
3154 self.specs = specs if specs.__class__ == dict else dict(specs)
3155 self._value = None if value is None else self._value_sanitize(value)
3156 if default is not None:
3157 default = self._value_sanitize(default)
3158 self.default = self.__class__(
3164 self._value = default
3166 tag_klass, _, tag_num = tag_decode(self.tag)
3167 self.tag_constructed = tag_encode(
3169 form=TagFormConstructed,
3173 def _bits2octets(self, bits):
3174 if len(self.specs) > 0:
3175 bits = bits.rstrip("0")
3177 bits += "0" * ((8 - (bit_len % 8)) % 8)
3178 octets = bytearray(len(bits) // 8)
3179 for i in six_xrange(len(octets)):
3180 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3181 return bit_len, bytes(octets)
3183 def _value_sanitize(self, value):
3184 if isinstance(value, (string_types, binary_type)):
3186 isinstance(value, string_types) and
3187 value.startswith("'")
3189 if value.endswith("'B"):
3191 if not frozenset(value) <= SET01:
3192 raise ValueError("B's coding contains unacceptable chars")
3193 return self._bits2octets(value)
3194 if value.endswith("'H"):
3198 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3200 if value.__class__ == binary_type:
3201 return (len(value) * 8, value)
3202 raise InvalidValueType((self.__class__, string_types, binary_type))
3203 if value.__class__ == tuple:
3206 isinstance(value[0], integer_types) and
3207 value[1].__class__ == binary_type
3212 bit = self.specs.get(name)
3214 raise ObjUnknown("BitString value: %s" % name)
3217 return self._bits2octets("")
3218 bits = frozenset(bits)
3219 return self._bits2octets("".join(
3220 ("1" if bit in bits else "0")
3221 for bit in six_xrange(max(bits) + 1)
3223 if issubclass(value.__class__, BitString):
3225 raise InvalidValueType((self.__class__, binary_type, string_types))
3229 return self._value is not None
3231 def __getstate__(self):
3232 return BitStringState(
3247 self.tag_constructed,
3251 def __setstate__(self, state):
3252 super(BitString, self).__setstate__(state)
3253 self.specs = state.specs
3254 self._value = state.value
3255 self.tag_constructed = state.tag_constructed
3256 self.defined = state.defined
3259 self._assert_ready()
3260 for i in six_xrange(self._value[0]):
3265 """Returns number of bits in the string
3267 self._assert_ready()
3268 return self._value[0]
3270 def __bytes__(self):
3271 self._assert_ready()
3272 return self._value[1]
3274 def __eq__(self, their):
3275 if their.__class__ == bytes:
3276 return self._value[1] == their
3277 if not issubclass(their.__class__, BitString):
3280 self._value == their._value and
3281 self.tag == their.tag and
3282 self._expl == their._expl
3287 """Named representation (if exists) of the bits
3289 :returns: [str(name), ...]
3291 return [name for name, bit in iteritems(self.specs) if self[bit]]
3301 return self.__class__(
3303 impl=self.tag if impl is None else impl,
3304 expl=self._expl if expl is None else expl,
3305 default=self.default if default is None else default,
3306 optional=self.optional if optional is None else optional,
3310 def __getitem__(self, key):
3311 if key.__class__ == int:
3312 bit_len, octets = self._value
3316 byte2int(memoryview(octets)[key // 8:]) >>
3319 if isinstance(key, string_types):
3320 value = self.specs.get(key)
3322 raise ObjUnknown("BitString value: %s" % key)
3324 raise InvalidValueType((int, str))
3327 self._assert_ready()
3328 bit_len, octets = self._value
3331 len_encode(len(octets) + 1),
3332 int2byte((8 - bit_len % 8) % 8),
3336 def _encode1st(self, state):
3337 self._assert_ready()
3338 _, octets = self._value
3340 return len(self.tag) + len_size(l) + l, state
3342 def _encode2nd(self, writer, state_iter):
3343 bit_len, octets = self._value
3344 write_full(writer, b"".join((
3346 len_encode(len(octets) + 1),
3347 int2byte((8 - bit_len % 8) % 8),
3349 write_full(writer, octets)
3351 def _encode_cer(self, writer):
3352 bit_len, octets = self._value
3353 if len(octets) + 1 <= 1000:
3354 write_full(writer, self._encode())
3356 write_full(writer, self.tag_constructed)
3357 write_full(writer, LENINDEF)
3358 for offset in six_xrange(0, (len(octets) // 999) * 999, 999):
3359 write_full(writer, b"".join((
3360 BitString.tag_default,
3363 octets[offset:offset + 999],
3365 tail = octets[offset+999:]
3367 tail = int2byte((8 - bit_len % 8) % 8) + tail
3368 write_full(writer, b"".join((
3369 BitString.tag_default,
3370 len_encode(len(tail)),
3373 write_full(writer, EOC)
3375 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3377 t, tlen, lv = tag_strip(tlv)
3378 except DecodeError as err:
3379 raise err.__class__(
3381 klass=self.__class__,
3382 decode_path=decode_path,
3386 if tag_only: # pragma: no cover
3390 l, llen, v = len_decode(lv)
3391 except DecodeError as err:
3392 raise err.__class__(
3394 klass=self.__class__,
3395 decode_path=decode_path,
3399 raise NotEnoughData(
3400 "encoded length is longer than data",
3401 klass=self.__class__,
3402 decode_path=decode_path,
3406 raise NotEnoughData(
3408 klass=self.__class__,
3409 decode_path=decode_path,
3412 pad_size = byte2int(v)
3413 if l == 1 and pad_size != 0:
3415 "invalid empty value",
3416 klass=self.__class__,
3417 decode_path=decode_path,
3423 klass=self.__class__,
3424 decode_path=decode_path,
3427 if byte2int(v[l - 1:l]) & ((1 << pad_size) - 1) != 0:
3430 klass=self.__class__,
3431 decode_path=decode_path,
3434 v, tail = v[:l], v[l:]
3435 bit_len = (len(v) - 1) * 8 - pad_size
3436 obj = self.__class__(
3437 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3440 default=self.default,
3441 optional=self.optional,
3443 _decoded=(offset, llen, l),
3446 obj._value = (bit_len, None)
3447 yield decode_path, obj, tail
3449 if t != self.tag_constructed:
3451 klass=self.__class__,
3452 decode_path=decode_path,
3455 if not ctx.get("bered", False):
3457 "unallowed BER constructed encoding",
3458 klass=self.__class__,
3459 decode_path=decode_path,
3462 if tag_only: # pragma: no cover
3467 l, llen, v = len_decode(lv)
3468 except LenIndefForm:
3469 llen, l, v = 1, 0, lv[1:]
3471 except DecodeError as err:
3472 raise err.__class__(
3474 klass=self.__class__,
3475 decode_path=decode_path,
3479 raise NotEnoughData(
3480 "encoded length is longer than data",
3481 klass=self.__class__,
3482 decode_path=decode_path,
3485 if not lenindef and l == 0:
3486 raise NotEnoughData(
3488 klass=self.__class__,
3489 decode_path=decode_path,
3493 sub_offset = offset + tlen + llen
3497 if v[:EOC_LEN].tobytes() == EOC:
3504 "chunk out of bounds",
3505 klass=self.__class__,
3506 decode_path=decode_path + (str(len(chunks) - 1),),
3507 offset=chunks[-1].offset,
3509 sub_decode_path = decode_path + (str(len(chunks)),)
3512 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3515 decode_path=sub_decode_path,
3518 _ctx_immutable=False,
3520 yield _decode_path, chunk, v_tail
3522 _, chunk, v_tail = next(BitString().decode_evgen(
3525 decode_path=sub_decode_path,
3528 _ctx_immutable=False,
3533 "expected BitString encoded chunk",
3534 klass=self.__class__,
3535 decode_path=sub_decode_path,
3538 chunks.append(chunk)
3539 sub_offset += chunk.tlvlen
3540 vlen += chunk.tlvlen
3542 if len(chunks) == 0:
3545 klass=self.__class__,
3546 decode_path=decode_path,
3551 for chunk_i, chunk in enumerate(chunks[:-1]):
3552 if chunk.bit_len % 8 != 0:
3554 "BitString chunk is not multiple of 8 bits",
3555 klass=self.__class__,
3556 decode_path=decode_path + (str(chunk_i),),
3557 offset=chunk.offset,
3560 values.append(bytes(chunk))
3561 bit_len += chunk.bit_len
3562 chunk_last = chunks[-1]
3564 values.append(bytes(chunk_last))
3565 bit_len += chunk_last.bit_len
3566 obj = self.__class__(
3567 value=None if evgen_mode else (bit_len, b"".join(values)),
3570 default=self.default,
3571 optional=self.optional,
3573 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3576 obj._value = (bit_len, None)
3577 obj.lenindef = lenindef
3578 obj.ber_encoded = True
3579 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3582 return pp_console_row(next(self.pps()))
3584 def pps(self, decode_path=()):
3588 bit_len, blob = self._value
3589 value = "%d bits" % bit_len
3590 if len(self.specs) > 0 and blob is not None:
3591 blob = tuple(self.named)
3594 asn1_type_name=self.asn1_type_name,
3595 obj_name=self.__class__.__name__,
3596 decode_path=decode_path,
3599 optional=self.optional,
3600 default=self == self.default,
3601 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3602 expl=None if self._expl is None else tag_decode(self._expl),
3607 expl_offset=self.expl_offset if self.expled else None,
3608 expl_tlen=self.expl_tlen if self.expled else None,
3609 expl_llen=self.expl_llen if self.expled else None,
3610 expl_vlen=self.expl_vlen if self.expled else None,
3611 expl_lenindef=self.expl_lenindef,
3612 lenindef=self.lenindef,
3613 ber_encoded=self.ber_encoded,
3616 defined_by, defined = self.defined or (None, None)
3617 if defined_by is not None:
3619 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3621 for pp in self.pps_lenindef(decode_path):
3625 OctetStringState = namedtuple(
3627 BasicState._fields + (
3638 class OctetString(Obj):
3639 """``OCTET STRING`` binary string type
3641 >>> s = OctetString(b"hello world")
3642 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3643 >>> s == OctetString(b"hello world")
3648 >>> OctetString(b"hello", bounds=(4, 4))
3649 Traceback (most recent call last):
3650 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3651 >>> OctetString(b"hell", bounds=(4, 4))
3652 OCTET STRING 4 bytes 68656c6c
3654 Memoryviews can be used as a values. If memoryview is made on
3655 mmap-ed file, then it does not take storage inside OctetString
3656 itself. In CER encoding mode it will be streamed to the specified
3657 writer, copying 1 KB chunks.
3659 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3660 tag_default = tag_encode(4)
3661 asn1_type_name = "OCTET STRING"
3662 evgen_mode_skip_value = True
3676 :param value: set the value. Either binary type, or
3677 :py:class:`pyderasn.OctetString` object
3678 :param bounds: set ``(MIN, MAX)`` value size constraint.
3679 (-inf, +inf) by default
3680 :param bytes impl: override default tag with ``IMPLICIT`` one
3681 :param bytes expl: override default tag with ``EXPLICIT`` one
3682 :param default: set default value. Type same as in ``value``
3683 :param bool optional: is object ``OPTIONAL`` in sequence
3685 super(OctetString, self).__init__(impl, expl, default, optional, _decoded)
3687 self._bound_min, self._bound_max = getattr(
3691 ) if bounds is None else bounds
3692 if value is not None:
3693 self._value = self._value_sanitize(value)
3694 if default is not None:
3695 default = self._value_sanitize(default)
3696 self.default = self.__class__(
3701 if self._value is None:
3702 self._value = default
3704 tag_klass, _, tag_num = tag_decode(self.tag)
3705 self.tag_constructed = tag_encode(
3707 form=TagFormConstructed,
3711 def _value_sanitize(self, value):
3712 if value.__class__ == binary_type or value.__class__ == memoryview:
3714 elif issubclass(value.__class__, OctetString):
3715 value = value._value
3717 raise InvalidValueType((self.__class__, bytes, memoryview))
3718 if not self._bound_min <= len(value) <= self._bound_max:
3719 raise BoundsError(self._bound_min, len(value), self._bound_max)
3724 return self._value is not None
3726 def __getstate__(self):
3727 return OctetStringState(
3743 self.tag_constructed,
3747 def __setstate__(self, state):
3748 super(OctetString, self).__setstate__(state)
3749 self._value = state.value
3750 self._bound_min = state.bound_min
3751 self._bound_max = state.bound_max
3752 self.tag_constructed = state.tag_constructed
3753 self.defined = state.defined
3755 def __bytes__(self):
3756 self._assert_ready()
3757 return bytes(self._value)
3759 def __eq__(self, their):
3760 if their.__class__ == binary_type:
3761 return self._value == their
3762 if not issubclass(their.__class__, OctetString):
3765 self._value == their._value and
3766 self.tag == their.tag and
3767 self._expl == their._expl
3770 def __lt__(self, their):
3771 return self._value < their._value
3782 return self.__class__(
3785 (self._bound_min, self._bound_max)
3786 if bounds is None else bounds
3788 impl=self.tag if impl is None else impl,
3789 expl=self._expl if expl is None else expl,
3790 default=self.default if default is None else default,
3791 optional=self.optional if optional is None else optional,
3795 self._assert_ready()
3798 len_encode(len(self._value)),
3802 def _encode1st(self, state):
3803 self._assert_ready()
3804 l = len(self._value)
3805 return len(self.tag) + len_size(l) + l, state
3807 def _encode2nd(self, writer, state_iter):
3809 write_full(writer, self.tag + len_encode(len(value)))
3810 write_full(writer, value)
3812 def _encode_cer(self, writer):
3813 octets = self._value
3814 if len(octets) <= 1000:
3815 write_full(writer, self._encode())
3817 write_full(writer, self.tag_constructed)
3818 write_full(writer, LENINDEF)
3819 for offset in six_xrange(0, (len(octets) // 1000) * 1000, 1000):
3820 write_full(writer, b"".join((
3821 OctetString.tag_default,
3823 octets[offset:offset + 1000],
3825 tail = octets[offset+1000:]
3827 write_full(writer, b"".join((
3828 OctetString.tag_default,
3829 len_encode(len(tail)),
3832 write_full(writer, EOC)
3834 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3836 t, tlen, lv = tag_strip(tlv)
3837 except DecodeError as err:
3838 raise err.__class__(
3840 klass=self.__class__,
3841 decode_path=decode_path,
3849 l, llen, v = len_decode(lv)
3850 except DecodeError as err:
3851 raise err.__class__(
3853 klass=self.__class__,
3854 decode_path=decode_path,
3858 raise NotEnoughData(
3859 "encoded length is longer than data",
3860 klass=self.__class__,
3861 decode_path=decode_path,
3864 v, tail = v[:l], v[l:]
3865 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3867 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3868 klass=self.__class__,
3869 decode_path=decode_path,
3873 obj = self.__class__(
3875 None if (evgen_mode and self.evgen_mode_skip_value)
3878 bounds=(self._bound_min, self._bound_max),
3881 default=self.default,
3882 optional=self.optional,
3883 _decoded=(offset, llen, l),
3886 except DecodeError as err:
3889 klass=self.__class__,
3890 decode_path=decode_path,
3893 except BoundsError as err:
3896 klass=self.__class__,
3897 decode_path=decode_path,
3900 yield decode_path, obj, tail
3902 if t != self.tag_constructed:
3904 klass=self.__class__,
3905 decode_path=decode_path,
3908 if not ctx.get("bered", False):
3910 "unallowed BER constructed encoding",
3911 klass=self.__class__,
3912 decode_path=decode_path,
3920 l, llen, v = len_decode(lv)
3921 except LenIndefForm:
3922 llen, l, v = 1, 0, lv[1:]
3924 except DecodeError as err:
3925 raise err.__class__(
3927 klass=self.__class__,
3928 decode_path=decode_path,
3932 raise NotEnoughData(
3933 "encoded length is longer than data",
3934 klass=self.__class__,
3935 decode_path=decode_path,
3940 sub_offset = offset + tlen + llen
3945 if v[:EOC_LEN].tobytes() == EOC:
3952 "chunk out of bounds",
3953 klass=self.__class__,
3954 decode_path=decode_path + (str(len(chunks) - 1),),
3955 offset=chunks[-1].offset,
3959 sub_decode_path = decode_path + (str(chunks_count),)
3960 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3963 decode_path=sub_decode_path,
3966 _ctx_immutable=False,
3968 yield _decode_path, chunk, v_tail
3969 if not chunk.ber_encoded:
3970 payload_len += chunk.vlen
3973 sub_decode_path = decode_path + (str(len(chunks)),)
3974 _, chunk, v_tail = next(OctetString().decode_evgen(
3977 decode_path=sub_decode_path,
3980 _ctx_immutable=False,
3983 chunks.append(chunk)
3986 "expected OctetString encoded chunk",
3987 klass=self.__class__,
3988 decode_path=sub_decode_path,
3991 sub_offset += chunk.tlvlen
3992 vlen += chunk.tlvlen
3994 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3996 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3997 klass=self.__class__,
3998 decode_path=decode_path,
4002 obj = self.__class__(
4004 None if evgen_mode else
4005 b"".join(bytes(chunk) for chunk in chunks)
4007 bounds=(self._bound_min, self._bound_max),
4010 default=self.default,
4011 optional=self.optional,
4012 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4015 except DecodeError as err:
4018 klass=self.__class__,
4019 decode_path=decode_path,
4022 except BoundsError as err:
4025 klass=self.__class__,
4026 decode_path=decode_path,
4029 obj.lenindef = lenindef
4030 obj.ber_encoded = True
4031 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4034 return pp_console_row(next(self.pps()))
4036 def pps(self, decode_path=()):
4039 asn1_type_name=self.asn1_type_name,
4040 obj_name=self.__class__.__name__,
4041 decode_path=decode_path,
4042 value=("%d bytes" % len(self._value)) if self.ready else None,
4043 blob=self._value if self.ready else None,
4044 optional=self.optional,
4045 default=self == self.default,
4046 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4047 expl=None if self._expl is None else tag_decode(self._expl),
4052 expl_offset=self.expl_offset if self.expled else None,
4053 expl_tlen=self.expl_tlen if self.expled else None,
4054 expl_llen=self.expl_llen if self.expled else None,
4055 expl_vlen=self.expl_vlen if self.expled else None,
4056 expl_lenindef=self.expl_lenindef,
4057 lenindef=self.lenindef,
4058 ber_encoded=self.ber_encoded,
4061 defined_by, defined = self.defined or (None, None)
4062 if defined_by is not None:
4064 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4066 for pp in self.pps_lenindef(decode_path):
4070 def agg_octet_string(evgens, decode_path, raw, writer):
4071 """Aggregate constructed string (OctetString and its derivatives)
4073 :param evgens: iterator of generated events
4074 :param decode_path: points to the string we want to decode
4075 :param raw: slicebable (memoryview, bytearray, etc) with
4076 the data evgens are generated on
4077 :param writer: buffer.write where string is going to be saved
4078 :param writer: where string is going to be saved. Must comply
4079 with ``io.RawIOBase.write`` behaviour
4081 .. seealso:: :ref:`agg_octet_string`
4083 decode_path_len = len(decode_path)
4084 for dp, obj, _ in evgens:
4085 if dp[:decode_path_len] != decode_path:
4087 if not obj.ber_encoded:
4088 write_full(writer, raw[
4089 obj.offset + obj.tlen + obj.llen:
4090 obj.offset + obj.tlen + obj.llen + obj.vlen -
4091 (EOC_LEN if obj.expl_lenindef else 0)
4093 if len(dp) == decode_path_len:
4097 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4101 """``NULL`` null object
4109 tag_default = tag_encode(5)
4110 asn1_type_name = "NULL"
4114 value=None, # unused, but Sequence passes it
4121 :param bytes impl: override default tag with ``IMPLICIT`` one
4122 :param bytes expl: override default tag with ``EXPLICIT`` one
4123 :param bool optional: is object ``OPTIONAL`` in sequence
4125 super(Null, self).__init__(impl, expl, None, optional, _decoded)
4132 def __getstate__(self):
4148 def __eq__(self, their):
4149 if not issubclass(their.__class__, Null):
4152 self.tag == their.tag and
4153 self._expl == their._expl
4163 return self.__class__(
4164 impl=self.tag if impl is None else impl,
4165 expl=self._expl if expl is None else expl,
4166 optional=self.optional if optional is None else optional,
4170 return self.tag + LEN0
4172 def _encode1st(self, state):
4173 return len(self.tag) + 1, state
4175 def _encode2nd(self, writer, state_iter):
4176 write_full(writer, self.tag + LEN0)
4178 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4180 t, _, lv = tag_strip(tlv)
4181 except DecodeError as err:
4182 raise err.__class__(
4184 klass=self.__class__,
4185 decode_path=decode_path,
4190 klass=self.__class__,
4191 decode_path=decode_path,
4194 if tag_only: # pragma: no cover
4198 l, _, v = len_decode(lv)
4199 except DecodeError as err:
4200 raise err.__class__(
4202 klass=self.__class__,
4203 decode_path=decode_path,
4207 raise InvalidLength(
4208 "Null must have zero length",
4209 klass=self.__class__,
4210 decode_path=decode_path,
4213 obj = self.__class__(
4216 optional=self.optional,
4217 _decoded=(offset, 1, 0),
4219 yield decode_path, obj, v
4222 return pp_console_row(next(self.pps()))
4224 def pps(self, decode_path=()):
4227 asn1_type_name=self.asn1_type_name,
4228 obj_name=self.__class__.__name__,
4229 decode_path=decode_path,
4230 optional=self.optional,
4231 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4232 expl=None if self._expl is None else tag_decode(self._expl),
4237 expl_offset=self.expl_offset if self.expled else None,
4238 expl_tlen=self.expl_tlen if self.expled else None,
4239 expl_llen=self.expl_llen if self.expled else None,
4240 expl_vlen=self.expl_vlen if self.expled else None,
4241 expl_lenindef=self.expl_lenindef,
4244 for pp in self.pps_lenindef(decode_path):
4248 ObjectIdentifierState = namedtuple(
4249 "ObjectIdentifierState",
4250 BasicState._fields + ("value", "defines"),
4255 class ObjectIdentifier(Obj):
4256 """``OBJECT IDENTIFIER`` OID type
4258 >>> oid = ObjectIdentifier((1, 2, 3))
4259 OBJECT IDENTIFIER 1.2.3
4260 >>> oid == ObjectIdentifier("1.2.3")
4266 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4267 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4269 >>> str(ObjectIdentifier((3, 1)))
4270 Traceback (most recent call last):
4271 pyderasn.InvalidOID: unacceptable first arc value
4273 __slots__ = ("defines",)
4274 tag_default = tag_encode(6)
4275 asn1_type_name = "OBJECT IDENTIFIER"
4288 :param value: set the value. Either tuples of integers,
4289 string of "."-concatenated integers, or
4290 :py:class:`pyderasn.ObjectIdentifier` object
4291 :param defines: sequence of tuples. Each tuple has two elements.
4292 First one is relative to current one decode
4293 path, aiming to the field defined by that OID.
4294 Read about relative path in
4295 :py:func:`pyderasn.abs_decode_path`. Second
4296 tuple element is ``{OID: pyderasn.Obj()}``
4297 dictionary, mapping between current OID value
4298 and structure applied to defined field.
4300 .. seealso:: :ref:`definedby`
4302 :param bytes impl: override default tag with ``IMPLICIT`` one
4303 :param bytes expl: override default tag with ``EXPLICIT`` one
4304 :param default: set default value. Type same as in ``value``
4305 :param bool optional: is object ``OPTIONAL`` in sequence
4307 super(ObjectIdentifier, self).__init__(impl, expl, default, optional, _decoded)
4309 if value is not None:
4310 self._value = self._value_sanitize(value)
4311 if default is not None:
4312 default = self._value_sanitize(default)
4313 self.default = self.__class__(
4318 if self._value is None:
4319 self._value = default
4320 self.defines = defines
4322 def __add__(self, their):
4323 if their.__class__ == tuple:
4324 return self.__class__(self._value + array("L", their))
4325 if isinstance(their, self.__class__):
4326 return self.__class__(self._value + their._value)
4327 raise InvalidValueType((self.__class__, tuple))
4329 def _value_sanitize(self, value):
4330 if issubclass(value.__class__, ObjectIdentifier):
4332 if isinstance(value, string_types):
4334 value = array("L", (pureint(arc) for arc in value.split(".")))
4336 raise InvalidOID("unacceptable arcs values")
4337 if value.__class__ == tuple:
4339 value = array("L", value)
4340 except OverflowError as err:
4341 raise InvalidOID(repr(err))
4342 if value.__class__ is array:
4344 raise InvalidOID("less than 2 arcs")
4345 first_arc = value[0]
4346 if first_arc in (0, 1):
4347 if not (0 <= value[1] <= 39):
4348 raise InvalidOID("second arc is too wide")
4349 elif first_arc == 2:
4352 raise InvalidOID("unacceptable first arc value")
4353 if not all(arc >= 0 for arc in value):
4354 raise InvalidOID("negative arc value")
4356 raise InvalidValueType((self.__class__, str, tuple))
4360 return self._value is not None
4362 def __getstate__(self):
4363 return ObjectIdentifierState(
4380 def __setstate__(self, state):
4381 super(ObjectIdentifier, self).__setstate__(state)
4382 self._value = state.value
4383 self.defines = state.defines
4386 self._assert_ready()
4387 return iter(self._value)
4390 return ".".join(str(arc) for arc in self._value or ())
4393 self._assert_ready()
4394 return hash(b"".join((
4396 bytes(self._expl or b""),
4397 str(self._value).encode("ascii"),
4400 def __eq__(self, their):
4401 if their.__class__ == tuple:
4402 return self._value == array("L", their)
4403 if not issubclass(their.__class__, ObjectIdentifier):
4406 self.tag == their.tag and
4407 self._expl == their._expl and
4408 self._value == their._value
4411 def __lt__(self, their):
4412 return self._value < their._value
4423 return self.__class__(
4425 defines=self.defines if defines is None else defines,
4426 impl=self.tag if impl is None else impl,
4427 expl=self._expl if expl is None else expl,
4428 default=self.default if default is None else default,
4429 optional=self.optional if optional is None else optional,
4432 def _encode_octets(self):
4433 self._assert_ready()
4435 first_value = value[1]
4436 first_arc = value[0]
4439 elif first_arc == 1:
4441 elif first_arc == 2:
4443 else: # pragma: no cover
4444 raise RuntimeError("invalid arc is stored")
4445 octets = [zero_ended_encode(first_value)]
4446 for arc in value[2:]:
4447 octets.append(zero_ended_encode(arc))
4448 return b"".join(octets)
4451 v = self._encode_octets()
4452 return b"".join((self.tag, len_encode(len(v)), v))
4454 def _encode1st(self, state):
4455 l = len(self._encode_octets())
4456 return len(self.tag) + len_size(l) + l, state
4458 def _encode2nd(self, writer, state_iter):
4459 write_full(writer, self._encode())
4461 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4463 t, _, lv = tag_strip(tlv)
4464 except DecodeError as err:
4465 raise err.__class__(
4467 klass=self.__class__,
4468 decode_path=decode_path,
4473 klass=self.__class__,
4474 decode_path=decode_path,
4477 if tag_only: # pragma: no cover
4481 l, llen, v = len_decode(lv)
4482 except DecodeError as err:
4483 raise err.__class__(
4485 klass=self.__class__,
4486 decode_path=decode_path,
4490 raise NotEnoughData(
4491 "encoded length is longer than data",
4492 klass=self.__class__,
4493 decode_path=decode_path,
4497 raise NotEnoughData(
4499 klass=self.__class__,
4500 decode_path=decode_path,
4503 v, tail = v[:l], v[l:]
4510 octet = indexbytes(v, i)
4511 if i == 0 and octet == 0x80:
4512 if ctx.get("bered", False):
4516 "non normalized arc encoding",
4517 klass=self.__class__,
4518 decode_path=decode_path,
4521 arc = (arc << 7) | (octet & 0x7F)
4522 if octet & 0x80 == 0:
4525 except OverflowError:
4527 "too huge value for local unsigned long",
4528 klass=self.__class__,
4529 decode_path=decode_path,
4538 klass=self.__class__,
4539 decode_path=decode_path,
4543 second_arc = arcs[0]
4544 if 0 <= second_arc <= 39:
4546 elif 40 <= second_arc <= 79:
4552 obj = self.__class__(
4553 value=array("L", (first_arc, second_arc)) + arcs[1:],
4556 default=self.default,
4557 optional=self.optional,
4558 _decoded=(offset, llen, l),
4561 obj.ber_encoded = True
4562 yield decode_path, obj, tail
4565 return pp_console_row(next(self.pps()))
4567 def pps(self, decode_path=()):
4570 asn1_type_name=self.asn1_type_name,
4571 obj_name=self.__class__.__name__,
4572 decode_path=decode_path,
4573 value=str(self) if self.ready else None,
4574 optional=self.optional,
4575 default=self == self.default,
4576 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4577 expl=None if self._expl is None else tag_decode(self._expl),
4582 expl_offset=self.expl_offset if self.expled else None,
4583 expl_tlen=self.expl_tlen if self.expled else None,
4584 expl_llen=self.expl_llen if self.expled else None,
4585 expl_vlen=self.expl_vlen if self.expled else None,
4586 expl_lenindef=self.expl_lenindef,
4587 ber_encoded=self.ber_encoded,
4590 for pp in self.pps_lenindef(decode_path):
4594 class Enumerated(Integer):
4595 """``ENUMERATED`` integer type
4597 This type is identical to :py:class:`pyderasn.Integer`, but requires
4598 schema to be specified and does not accept values missing from it.
4601 tag_default = tag_encode(10)
4602 asn1_type_name = "ENUMERATED"
4613 bounds=None, # dummy argument, workability for Integer.decode
4615 super(Enumerated, self).__init__(
4616 value, bounds, impl, expl, default, optional, _specs, _decoded,
4618 if len(self.specs) == 0:
4619 raise ValueError("schema must be specified")
4621 def _value_sanitize(self, value):
4622 if isinstance(value, self.__class__):
4623 value = value._value
4624 elif isinstance(value, integer_types):
4625 for _value in itervalues(self.specs):
4630 "unknown integer value: %s" % value,
4631 klass=self.__class__,
4633 elif isinstance(value, string_types):
4634 value = self.specs.get(value)
4636 raise ObjUnknown("integer value: %s" % value)
4638 raise InvalidValueType((self.__class__, int, str))
4650 return self.__class__(
4652 impl=self.tag if impl is None else impl,
4653 expl=self._expl if expl is None else expl,
4654 default=self.default if default is None else default,
4655 optional=self.optional if optional is None else optional,
4660 def escape_control_unicode(c):
4661 if unicat(c)[0] == "C":
4662 c = repr(c).lstrip("u").strip("'")
4666 class CommonString(OctetString):
4667 """Common class for all strings
4669 Everything resembles :py:class:`pyderasn.OctetString`, except
4670 ability to deal with unicode text strings.
4672 >>> hexenc("привет мир".encode("utf-8"))
4673 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4674 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4676 >>> s = UTF8String("привет мир")
4677 UTF8String UTF8String привет мир
4679 'привет мир'
4680 >>> hexenc(bytes(s))
4681 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4683 >>> PrintableString("привет мир")
4684 Traceback (most recent call last):
4685 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4687 >>> BMPString("ада", bounds=(2, 2))
4688 Traceback (most recent call last):
4689 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4690 >>> s = BMPString("ад", bounds=(2, 2))
4693 >>> hexenc(bytes(s))
4701 * - :py:class:`pyderasn.UTF8String`
4703 * - :py:class:`pyderasn.NumericString`
4705 * - :py:class:`pyderasn.PrintableString`
4707 * - :py:class:`pyderasn.TeletexString`
4709 * - :py:class:`pyderasn.T61String`
4711 * - :py:class:`pyderasn.VideotexString`
4713 * - :py:class:`pyderasn.IA5String`
4715 * - :py:class:`pyderasn.GraphicString`
4717 * - :py:class:`pyderasn.VisibleString`
4719 * - :py:class:`pyderasn.ISO646String`
4721 * - :py:class:`pyderasn.GeneralString`
4723 * - :py:class:`pyderasn.UniversalString`
4725 * - :py:class:`pyderasn.BMPString`
4730 def _value_sanitize(self, value):
4732 value_decoded = None
4733 if isinstance(value, self.__class__):
4734 value_raw = value._value
4735 elif value.__class__ == text_type:
4736 value_decoded = value
4737 elif value.__class__ == binary_type:
4740 raise InvalidValueType((self.__class__, text_type, binary_type))
4743 value_decoded.encode(self.encoding)
4744 if value_raw is None else value_raw
4747 value_raw.decode(self.encoding)
4748 if value_decoded is None else value_decoded
4750 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4751 raise DecodeError(str(err))
4752 if not self._bound_min <= len(value_decoded) <= self._bound_max:
4760 def __eq__(self, their):
4761 if their.__class__ == binary_type:
4762 return self._value == their
4763 if their.__class__ == text_type:
4764 return self._value == their.encode(self.encoding)
4765 if not isinstance(their, self.__class__):
4768 self._value == their._value and
4769 self.tag == their.tag and
4770 self._expl == their._expl
4773 def __unicode__(self):
4775 return self._value.decode(self.encoding)
4776 return text_type(self._value)
4779 return pp_console_row(next(self.pps(no_unicode=PY2)))
4781 def pps(self, decode_path=(), no_unicode=False):
4785 hexenc(bytes(self)) if no_unicode else
4786 "".join(escape_control_unicode(c) for c in self.__unicode__())
4790 asn1_type_name=self.asn1_type_name,
4791 obj_name=self.__class__.__name__,
4792 decode_path=decode_path,
4794 optional=self.optional,
4795 default=self == self.default,
4796 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4797 expl=None if self._expl is None else tag_decode(self._expl),
4802 expl_offset=self.expl_offset if self.expled else None,
4803 expl_tlen=self.expl_tlen if self.expled else None,
4804 expl_llen=self.expl_llen if self.expled else None,
4805 expl_vlen=self.expl_vlen if self.expled else None,
4806 expl_lenindef=self.expl_lenindef,
4807 ber_encoded=self.ber_encoded,
4810 for pp in self.pps_lenindef(decode_path):
4814 class UTF8String(CommonString):
4816 tag_default = tag_encode(12)
4818 asn1_type_name = "UTF8String"
4821 class AllowableCharsMixin(object):
4823 def allowable_chars(self):
4825 return self._allowable_chars
4826 return frozenset(six_unichr(c) for c in self._allowable_chars)
4829 class NumericString(AllowableCharsMixin, CommonString):
4832 Its value is properly sanitized: only ASCII digits with spaces can
4835 >>> NumericString().allowable_chars
4836 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4839 tag_default = tag_encode(18)
4841 asn1_type_name = "NumericString"
4842 _allowable_chars = frozenset(digits.encode("ascii") + b" ")
4844 def _value_sanitize(self, value):
4845 value = super(NumericString, self)._value_sanitize(value)
4846 if not frozenset(value) <= self._allowable_chars:
4847 raise DecodeError("non-numeric value")
4851 PrintableStringState = namedtuple(
4852 "PrintableStringState",
4853 OctetStringState._fields + ("allowable_chars",),
4858 class PrintableString(AllowableCharsMixin, CommonString):
4861 Its value is properly sanitized: see X.680 41.4 table 10.
4863 >>> PrintableString().allowable_chars
4864 frozenset([' ', "'", ..., 'z'])
4865 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4866 PrintableString PrintableString foo*bar
4867 >>> obj.allow_asterisk, obj.allow_ampersand
4871 tag_default = tag_encode(19)
4873 asn1_type_name = "PrintableString"
4874 _allowable_chars = frozenset(
4875 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4877 _asterisk = frozenset("*".encode("ascii"))
4878 _ampersand = frozenset("&".encode("ascii"))
4890 allow_asterisk=False,
4891 allow_ampersand=False,
4894 :param allow_asterisk: allow asterisk character
4895 :param allow_ampersand: allow ampersand character
4898 self._allowable_chars |= self._asterisk
4900 self._allowable_chars |= self._ampersand
4901 super(PrintableString, self).__init__(
4902 value, bounds, impl, expl, default, optional, _decoded, ctx,
4906 def allow_asterisk(self):
4907 """Is asterisk character allowed?
4909 return self._asterisk <= self._allowable_chars
4912 def allow_ampersand(self):
4913 """Is ampersand character allowed?
4915 return self._ampersand <= self._allowable_chars
4917 def _value_sanitize(self, value):
4918 value = super(PrintableString, self)._value_sanitize(value)
4919 if not frozenset(value) <= self._allowable_chars:
4920 raise DecodeError("non-printable value")
4923 def __getstate__(self):
4924 return PrintableStringState(
4925 *super(PrintableString, self).__getstate__(),
4926 **{"allowable_chars": self._allowable_chars}
4929 def __setstate__(self, state):
4930 super(PrintableString, self).__setstate__(state)
4931 self._allowable_chars = state.allowable_chars
4942 return self.__class__(
4945 (self._bound_min, self._bound_max)
4946 if bounds is None else bounds
4948 impl=self.tag if impl is None else impl,
4949 expl=self._expl if expl is None else expl,
4950 default=self.default if default is None else default,
4951 optional=self.optional if optional is None else optional,
4952 allow_asterisk=self.allow_asterisk,
4953 allow_ampersand=self.allow_ampersand,
4957 class TeletexString(CommonString):
4959 tag_default = tag_encode(20)
4961 asn1_type_name = "TeletexString"
4964 class T61String(TeletexString):
4966 asn1_type_name = "T61String"
4969 class VideotexString(CommonString):
4971 tag_default = tag_encode(21)
4972 encoding = "iso-8859-1"
4973 asn1_type_name = "VideotexString"
4976 class IA5String(CommonString):
4978 tag_default = tag_encode(22)
4980 asn1_type_name = "IA5"
4983 LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
4984 LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
4985 LEN_YYMMDDHHMMSSZ_WITH_LEN = len(LEN_LEN_YYMMDDHHMMSSZ) + LEN_YYMMDDHHMMSSZ
4986 LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
4987 LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
4988 LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
4991 class VisibleString(CommonString):
4993 tag_default = tag_encode(26)
4995 asn1_type_name = "VisibleString"
4998 UTCTimeState = namedtuple(
5000 OctetStringState._fields + ("ber_raw",),
5005 def str_to_time_fractions(value):
5007 year, v = (v // 10**10), (v % 10**10)
5008 month, v = (v // 10**8), (v % 10**8)
5009 day, v = (v // 10**6), (v % 10**6)
5010 hour, v = (v // 10**4), (v % 10**4)
5011 minute, second = (v // 100), (v % 100)
5012 return year, month, day, hour, minute, second
5015 class UTCTime(VisibleString):
5016 """``UTCTime`` datetime type
5018 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5019 UTCTime UTCTime 2017-09-30T22:07:50
5025 datetime.datetime(2017, 9, 30, 22, 7, 50)
5026 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5027 datetime.datetime(1957, 9, 30, 22, 7, 50)
5029 If BER encoded value was met, then ``ber_raw`` attribute will hold
5030 its raw representation.
5034 Pay attention that UTCTime can not hold full year, so all years
5035 having < 50 years are treated as 20xx, 19xx otherwise, according
5036 to X.509 recommendation.
5040 No strict validation of UTC offsets are made, but very crude:
5042 * minutes are not exceeding 60
5043 * offset value is not exceeding 14 hours
5045 __slots__ = ("ber_raw",)
5046 tag_default = tag_encode(23)
5048 asn1_type_name = "UTCTime"
5049 evgen_mode_skip_value = False
5059 bounds=None, # dummy argument, workability for OctetString.decode
5063 :param value: set the value. Either datetime type, or
5064 :py:class:`pyderasn.UTCTime` object
5065 :param bytes impl: override default tag with ``IMPLICIT`` one
5066 :param bytes expl: override default tag with ``EXPLICIT`` one
5067 :param default: set default value. Type same as in ``value``
5068 :param bool optional: is object ``OPTIONAL`` in sequence
5070 super(UTCTime, self).__init__(
5071 None, None, impl, expl, None, optional, _decoded, ctx,
5075 if value is not None:
5076 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5077 self.ber_encoded = self.ber_raw is not None
5078 if default is not None:
5079 default, _ = self._value_sanitize(default)
5080 self.default = self.__class__(
5085 if self._value is None:
5086 self._value = default
5088 self.optional = optional
5090 def _strptime_bered(self, value):
5091 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5094 raise ValueError("no timezone")
5095 year += 2000 if year < 50 else 1900
5096 decoded = datetime(year, month, day, hour, minute)
5098 if value[-1] == "Z":
5102 raise ValueError("invalid UTC offset")
5103 if value[-5] == "-":
5105 elif value[-5] == "+":
5108 raise ValueError("invalid UTC offset")
5109 v = pureint(value[-4:])
5110 offset, v = (60 * (v % 100)), v // 100
5112 raise ValueError("invalid UTC offset minutes")
5114 if offset > 14 * 3600:
5115 raise ValueError("too big UTC offset")
5119 return offset, decoded
5121 raise ValueError("invalid UTC offset seconds")
5122 seconds = pureint(value)
5124 raise ValueError("invalid seconds value")
5125 return offset, decoded + timedelta(seconds=seconds)
5127 def _strptime(self, value):
5128 # datetime.strptime's format: %y%m%d%H%M%SZ
5129 if len(value) != LEN_YYMMDDHHMMSSZ:
5130 raise ValueError("invalid UTCTime length")
5131 if value[-1] != "Z":
5132 raise ValueError("non UTC timezone")
5133 year, month, day, hour, minute, second = str_to_time_fractions(value[:-1])
5134 year += 2000 if year < 50 else 1900
5135 return datetime(year, month, day, hour, minute, second)
5137 def _dt_sanitize(self, value):
5138 if value.year < 1950 or value.year > 2049:
5139 raise ValueError("UTCTime can hold only 1950-2049 years")
5140 return value.replace(microsecond=0)
5142 def _value_sanitize(self, value, ctx=None):
5143 if value.__class__ == binary_type:
5145 value_decoded = value.decode("ascii")
5146 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5147 raise DecodeError("invalid UTCTime encoding: %r" % err)
5150 return self._strptime(value_decoded), None
5151 except (TypeError, ValueError) as _err:
5153 if (ctx is not None) and ctx.get("bered", False):
5155 offset, _value = self._strptime_bered(value_decoded)
5156 _value = _value - timedelta(seconds=offset)
5157 return self._dt_sanitize(_value), value
5158 except (TypeError, ValueError, OverflowError) as _err:
5161 "invalid %s format: %r" % (self.asn1_type_name, err),
5162 klass=self.__class__,
5164 if isinstance(value, self.__class__):
5165 return value._value, None
5166 if value.__class__ == datetime:
5167 return self._dt_sanitize(value), None
5168 raise InvalidValueType((self.__class__, datetime))
5170 def _pp_value(self):
5172 value = self._value.isoformat()
5173 if self.ber_encoded:
5174 value += " (%s)" % self.ber_raw
5178 def __unicode__(self):
5180 value = self._value.isoformat()
5181 if self.ber_encoded:
5182 value += " (%s)" % self.ber_raw
5184 return text_type(self._pp_value())
5186 def __getstate__(self):
5187 return UTCTimeState(
5188 *super(UTCTime, self).__getstate__(),
5189 **{"ber_raw": self.ber_raw}
5192 def __setstate__(self, state):
5193 super(UTCTime, self).__setstate__(state)
5194 self.ber_raw = state.ber_raw
5196 def __bytes__(self):
5197 self._assert_ready()
5198 return self._encode_time()
5200 def __eq__(self, their):
5201 if their.__class__ == binary_type:
5202 return self._encode_time() == their
5203 if their.__class__ == datetime:
5204 return self.todatetime() == their
5205 if not isinstance(their, self.__class__):
5208 self._value == their._value and
5209 self.tag == their.tag and
5210 self._expl == their._expl
5213 def _encode_time(self):
5214 return self._value.strftime("%y%m%d%H%M%SZ").encode("ascii")
5217 self._assert_ready()
5218 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
5220 def _encode1st(self, state):
5221 return len(self.tag) + LEN_YYMMDDHHMMSSZ_WITH_LEN, state
5223 def _encode2nd(self, writer, state_iter):
5224 self._assert_ready()
5225 write_full(writer, self._encode())
5227 def _encode_cer(self, writer):
5228 write_full(writer, self._encode())
5230 def todatetime(self):
5234 return pp_console_row(next(self.pps()))
5236 def pps(self, decode_path=()):
5239 asn1_type_name=self.asn1_type_name,
5240 obj_name=self.__class__.__name__,
5241 decode_path=decode_path,
5242 value=self._pp_value(),
5243 optional=self.optional,
5244 default=self == self.default,
5245 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5246 expl=None if self._expl is None else tag_decode(self._expl),
5251 expl_offset=self.expl_offset if self.expled else None,
5252 expl_tlen=self.expl_tlen if self.expled else None,
5253 expl_llen=self.expl_llen if self.expled else None,
5254 expl_vlen=self.expl_vlen if self.expled else None,
5255 expl_lenindef=self.expl_lenindef,
5256 ber_encoded=self.ber_encoded,
5259 for pp in self.pps_lenindef(decode_path):
5263 class GeneralizedTime(UTCTime):
5264 """``GeneralizedTime`` datetime type
5266 This type is similar to :py:class:`pyderasn.UTCTime`.
5268 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5269 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5271 '20170930220750.000123Z'
5272 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5273 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5277 Only microsecond fractions are supported in DER encoding.
5278 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5279 higher precision values.
5283 BER encoded data can loss information (accuracy) during decoding
5284 because of float transformations.
5288 Local times (without explicit timezone specification) are treated
5289 as UTC one, no transformations are made.
5293 Zero year is unsupported.
5296 tag_default = tag_encode(24)
5297 asn1_type_name = "GeneralizedTime"
5299 def _dt_sanitize(self, value):
5302 def _strptime_bered(self, value):
5303 if len(value) < 4 + 3 * 2:
5304 raise ValueError("invalid GeneralizedTime")
5305 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5306 decoded = datetime(year, month, day, hour)
5307 offset, value = 0, value[10:]
5309 return offset, decoded
5310 if value[-1] == "Z":
5313 for char, sign in (("-", -1), ("+", 1)):
5314 idx = value.rfind(char)
5317 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5318 v = pureint(offset_raw)
5319 if len(offset_raw) == 4:
5320 offset, v = (60 * (v % 100)), v // 100
5322 raise ValueError("invalid UTC offset minutes")
5323 elif len(offset_raw) == 2:
5326 raise ValueError("invalid UTC offset")
5328 if offset > 14 * 3600:
5329 raise ValueError("too big UTC offset")
5333 return offset, decoded
5334 if value[0] in DECIMAL_SIGNS:
5336 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5339 raise ValueError("stripped minutes")
5340 decoded += timedelta(seconds=60 * pureint(value[:2]))
5343 return offset, decoded
5344 if value[0] in DECIMAL_SIGNS:
5346 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5349 raise ValueError("stripped seconds")
5350 decoded += timedelta(seconds=pureint(value[:2]))
5353 return offset, decoded
5354 if value[0] not in DECIMAL_SIGNS:
5355 raise ValueError("invalid format after seconds")
5357 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5360 def _strptime(self, value):
5362 if l == LEN_YYYYMMDDHHMMSSZ:
5363 # datetime.strptime's format: %Y%m%d%H%M%SZ
5364 if value[-1] != "Z":
5365 raise ValueError("non UTC timezone")
5366 return datetime(*str_to_time_fractions(value[:-1]))
5367 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5368 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5369 if value[-1] != "Z":
5370 raise ValueError("non UTC timezone")
5371 if value[14] != ".":
5372 raise ValueError("no fractions separator")
5375 raise ValueError("trailing zero")
5378 raise ValueError("only microsecond fractions are supported")
5379 us = pureint(us + ("0" * (6 - us_len)))
5380 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5381 return datetime(year, month, day, hour, minute, second, us)
5382 raise ValueError("invalid GeneralizedTime length")
5384 def _encode_time(self):
5386 encoded = value.strftime("%Y%m%d%H%M%S")
5387 if value.microsecond > 0:
5388 encoded += (".%06d" % value.microsecond).rstrip("0")
5389 return (encoded + "Z").encode("ascii")
5392 self._assert_ready()
5394 if value.microsecond > 0:
5395 encoded = self._encode_time()
5396 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5397 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
5399 def _encode1st(self, state):
5400 self._assert_ready()
5401 vlen = len(self._encode_time())
5402 return len(self.tag) + len_size(vlen) + vlen, state
5404 def _encode2nd(self, writer, state_iter):
5405 write_full(writer, self._encode())
5408 class GraphicString(CommonString):
5410 tag_default = tag_encode(25)
5411 encoding = "iso-8859-1"
5412 asn1_type_name = "GraphicString"
5415 class ISO646String(VisibleString):
5417 asn1_type_name = "ISO646String"
5420 class GeneralString(CommonString):
5422 tag_default = tag_encode(27)
5423 encoding = "iso-8859-1"
5424 asn1_type_name = "GeneralString"
5427 class UniversalString(CommonString):
5429 tag_default = tag_encode(28)
5430 encoding = "utf-32-be"
5431 asn1_type_name = "UniversalString"
5434 class BMPString(CommonString):
5436 tag_default = tag_encode(30)
5437 encoding = "utf-16-be"
5438 asn1_type_name = "BMPString"
5441 ChoiceState = namedtuple(
5443 BasicState._fields + ("specs", "value",),
5449 """``CHOICE`` special type
5453 class GeneralName(Choice):
5455 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5456 ("dNSName", IA5String(impl=tag_ctxp(2))),
5459 >>> gn = GeneralName()
5461 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5462 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5463 >>> gn["dNSName"] = IA5String("bar.baz")
5464 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5465 >>> gn["rfc822Name"]
5468 [2] IA5String IA5 bar.baz
5471 >>> gn.value == gn["dNSName"]
5474 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5476 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5477 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5479 __slots__ = ("specs",)
5481 asn1_type_name = "CHOICE"
5494 :param value: set the value. Either ``(choice, value)`` tuple, or
5495 :py:class:`pyderasn.Choice` object
5496 :param bytes impl: can not be set, do **not** use it
5497 :param bytes expl: override default tag with ``EXPLICIT`` one
5498 :param default: set default value. Type same as in ``value``
5499 :param bool optional: is object ``OPTIONAL`` in sequence
5501 if impl is not None:
5502 raise ValueError("no implicit tag allowed for CHOICE")
5503 super(Choice, self).__init__(None, expl, default, optional, _decoded)
5505 schema = getattr(self, "schema", ())
5506 if len(schema) == 0:
5507 raise ValueError("schema must be specified")
5509 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5512 if value is not None:
5513 self._value = self._value_sanitize(value)
5514 if default is not None:
5515 default_value = self._value_sanitize(default)
5516 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5517 default_obj.specs = self.specs
5518 default_obj._value = default_value
5519 self.default = default_obj
5521 self._value = copy(default_obj._value)
5522 if self._expl is not None:
5523 tag_class, _, tag_num = tag_decode(self._expl)
5524 self._tag_order = (tag_class, tag_num)
5526 def _value_sanitize(self, value):
5527 if (value.__class__ == tuple) and len(value) == 2:
5529 spec = self.specs.get(choice)
5531 raise ObjUnknown(choice)
5532 if not isinstance(obj, spec.__class__):
5533 raise InvalidValueType((spec,))
5534 return (choice, spec(obj))
5535 if isinstance(value, self.__class__):
5537 raise InvalidValueType((self.__class__, tuple))
5541 return self._value is not None and self._value[1].ready
5545 return self.expl_lenindef or (
5546 (self._value is not None) and
5547 self._value[1].bered
5550 def __getstate__(self):
5568 def __setstate__(self, state):
5569 super(Choice, self).__setstate__(state)
5570 self.specs = state.specs
5571 self._value = state.value
5573 def __eq__(self, their):
5574 if (their.__class__ == tuple) and len(their) == 2:
5575 return self._value == their
5576 if not isinstance(their, self.__class__):
5579 self.specs == their.specs and
5580 self._value == their._value
5590 return self.__class__(
5593 expl=self._expl if expl is None else expl,
5594 default=self.default if default is None else default,
5595 optional=self.optional if optional is None else optional,
5600 """Name of the choice
5602 self._assert_ready()
5603 return self._value[0]
5607 """Value of underlying choice
5609 self._assert_ready()
5610 return self._value[1]
5613 def tag_order(self):
5614 self._assert_ready()
5615 return self._value[1].tag_order if self._tag_order is None else self._tag_order
5618 def tag_order_cer(self):
5619 return min(v.tag_order_cer for v in itervalues(self.specs))
5621 def __getitem__(self, key):
5622 if key not in self.specs:
5623 raise ObjUnknown(key)
5624 if self._value is None:
5626 choice, value = self._value
5631 def __setitem__(self, key, value):
5632 spec = self.specs.get(key)
5634 raise ObjUnknown(key)
5635 if not isinstance(value, spec.__class__):
5636 raise InvalidValueType((spec.__class__,))
5637 self._value = (key, spec(value))
5645 return self._value[1].decoded if self.ready else False
5648 self._assert_ready()
5649 return self._value[1].encode()
5651 def _encode1st(self, state):
5652 self._assert_ready()
5653 return self._value[1].encode1st(state)
5655 def _encode2nd(self, writer, state_iter):
5656 self._value[1].encode2nd(writer, state_iter)
5658 def _encode_cer(self, writer):
5659 self._assert_ready()
5660 self._value[1].encode_cer(writer)
5662 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5663 for choice, spec in iteritems(self.specs):
5664 sub_decode_path = decode_path + (choice,)
5670 decode_path=sub_decode_path,
5673 _ctx_immutable=False,
5680 klass=self.__class__,
5681 decode_path=decode_path,
5684 if tag_only: # pragma: no cover
5688 for _decode_path, value, tail in spec.decode_evgen(
5692 decode_path=sub_decode_path,
5694 _ctx_immutable=False,
5696 yield _decode_path, value, tail
5698 _, value, tail = next(spec.decode_evgen(
5702 decode_path=sub_decode_path,
5704 _ctx_immutable=False,
5707 obj = self.__class__(
5710 default=self.default,
5711 optional=self.optional,
5712 _decoded=(offset, 0, value.fulllen),
5714 obj._value = (choice, value)
5715 yield decode_path, obj, tail
5718 value = pp_console_row(next(self.pps()))
5720 value = "%s[%r]" % (value, self.value)
5723 def pps(self, decode_path=()):
5726 asn1_type_name=self.asn1_type_name,
5727 obj_name=self.__class__.__name__,
5728 decode_path=decode_path,
5729 value=self.choice if self.ready else None,
5730 optional=self.optional,
5731 default=self == self.default,
5732 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5733 expl=None if self._expl is None else tag_decode(self._expl),
5738 expl_lenindef=self.expl_lenindef,
5742 yield self.value.pps(decode_path=decode_path + (self.choice,))
5743 for pp in self.pps_lenindef(decode_path):
5747 class PrimitiveTypes(Choice):
5748 """Predefined ``CHOICE`` for all generic primitive types
5750 It could be useful for general decoding of some unspecified values:
5752 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5753 OCTET STRING 3 bytes 666f6f
5754 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5758 schema = tuple((klass.__name__, klass()) for klass in (
5782 AnyState = namedtuple(
5784 BasicState._fields + ("value", "defined"),
5790 """``ANY`` special type
5792 >>> Any(Integer(-123))
5793 ANY INTEGER -123 (0X:7B)
5794 >>> a = Any(OctetString(b"hello world").encode())
5795 ANY 040b68656c6c6f20776f726c64
5796 >>> hexenc(bytes(a))
5797 b'0x040x0bhello world'
5799 __slots__ = ("defined",)
5800 tag_default = tag_encode(0)
5801 asn1_type_name = "ANY"
5811 :param value: set the value. Either any kind of pyderasn's
5812 **ready** object, or bytes. Pay attention that
5813 **no** validation is performed if raw binary value
5814 is valid TLV, except just tag decoding
5815 :param bytes expl: override default tag with ``EXPLICIT`` one
5816 :param bool optional: is object ``OPTIONAL`` in sequence
5818 super(Any, self).__init__(None, expl, None, optional, _decoded)
5822 value = self._value_sanitize(value)
5824 if self._expl is None:
5825 if value.__class__ == binary_type:
5826 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5828 tag_class, tag_num = value.tag_order
5830 tag_class, _, tag_num = tag_decode(self._expl)
5831 self._tag_order = (tag_class, tag_num)
5834 def _value_sanitize(self, value):
5835 if value.__class__ == binary_type:
5837 raise ValueError("Any value can not be empty")
5839 if isinstance(value, self.__class__):
5841 if not isinstance(value, Obj):
5842 raise InvalidValueType((self.__class__, Obj, binary_type))
5847 return self._value is not None
5850 def tag_order(self):
5851 self._assert_ready()
5852 return self._tag_order
5856 if self.expl_lenindef or self.lenindef:
5858 if self.defined is None:
5860 return self.defined[1].bered
5862 def __getstate__(self):
5880 def __setstate__(self, state):
5881 super(Any, self).__setstate__(state)
5882 self._value = state.value
5883 self.defined = state.defined
5885 def __eq__(self, their):
5886 if their.__class__ == binary_type:
5887 if self._value.__class__ == binary_type:
5888 return self._value == their
5889 return self._value.encode() == their
5890 if issubclass(their.__class__, Any):
5891 if self.ready and their.ready:
5892 return bytes(self) == bytes(their)
5893 return self.ready == their.ready
5902 return self.__class__(
5904 expl=self._expl if expl is None else expl,
5905 optional=self.optional if optional is None else optional,
5908 def __bytes__(self):
5909 self._assert_ready()
5911 if value.__class__ == binary_type:
5913 return self._value.encode()
5920 self._assert_ready()
5922 if value.__class__ == binary_type:
5924 return value.encode()
5926 def _encode1st(self, state):
5927 self._assert_ready()
5929 if value.__class__ == binary_type:
5930 return len(value), state
5931 return value.encode1st(state)
5933 def _encode2nd(self, writer, state_iter):
5935 if value.__class__ == binary_type:
5936 write_full(writer, value)
5938 value.encode2nd(writer, state_iter)
5940 def _encode_cer(self, writer):
5941 self._assert_ready()
5943 if value.__class__ == binary_type:
5944 write_full(writer, value)
5946 value.encode_cer(writer)
5948 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5950 t, tlen, lv = tag_strip(tlv)
5951 except DecodeError as err:
5952 raise err.__class__(
5954 klass=self.__class__,
5955 decode_path=decode_path,
5959 l, llen, v = len_decode(lv)
5960 except LenIndefForm as err:
5961 if not ctx.get("bered", False):
5962 raise err.__class__(
5964 klass=self.__class__,
5965 decode_path=decode_path,
5968 llen, vlen, v = 1, 0, lv[1:]
5969 sub_offset = offset + tlen + llen
5971 while v[:EOC_LEN].tobytes() != EOC:
5972 chunk, v = Any().decode(
5975 decode_path=decode_path + (str(chunk_i),),
5978 _ctx_immutable=False,
5980 vlen += chunk.tlvlen
5981 sub_offset += chunk.tlvlen
5983 tlvlen = tlen + llen + vlen + EOC_LEN
5984 obj = self.__class__(
5985 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
5987 optional=self.optional,
5988 _decoded=(offset, 0, tlvlen),
5991 obj.tag = t.tobytes()
5992 yield decode_path, obj, v[EOC_LEN:]
5994 except DecodeError as err:
5995 raise err.__class__(
5997 klass=self.__class__,
5998 decode_path=decode_path,
6002 raise NotEnoughData(
6003 "encoded length is longer than data",
6004 klass=self.__class__,
6005 decode_path=decode_path,
6008 tlvlen = tlen + llen + l
6009 v, tail = tlv[:tlvlen], v[l:]
6010 obj = self.__class__(
6011 value=None if evgen_mode else v.tobytes(),
6013 optional=self.optional,
6014 _decoded=(offset, 0, tlvlen),
6016 obj.tag = t.tobytes()
6017 yield decode_path, obj, tail
6020 return pp_console_row(next(self.pps()))
6022 def pps(self, decode_path=()):
6026 elif value.__class__ == binary_type:
6032 asn1_type_name=self.asn1_type_name,
6033 obj_name=self.__class__.__name__,
6034 decode_path=decode_path,
6036 blob=self._value if self._value.__class__ == binary_type else None,
6037 optional=self.optional,
6038 default=self == self.default,
6039 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6040 expl=None if self._expl is None else tag_decode(self._expl),
6045 expl_offset=self.expl_offset if self.expled else None,
6046 expl_tlen=self.expl_tlen if self.expled else None,
6047 expl_llen=self.expl_llen if self.expled else None,
6048 expl_vlen=self.expl_vlen if self.expled else None,
6049 expl_lenindef=self.expl_lenindef,
6050 lenindef=self.lenindef,
6053 defined_by, defined = self.defined or (None, None)
6054 if defined_by is not None:
6056 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6058 for pp in self.pps_lenindef(decode_path):
6062 ########################################################################
6063 # ASN.1 constructed types
6064 ########################################################################
6066 def abs_decode_path(decode_path, rel_path):
6067 """Create an absolute decode path from current and relative ones
6069 :param decode_path: current decode path, starting point. Tuple of strings
6070 :param rel_path: relative path to ``decode_path``. Tuple of strings.
6071 If first tuple's element is "/", then treat it as
6072 an absolute path, ignoring ``decode_path`` as
6073 starting point. Also this tuple can contain ".."
6074 elements, stripping the leading element from
6077 >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
6078 ("foo", "bar", "baz", "whatever")
6079 >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
6081 >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
6084 if rel_path[0] == "/":
6086 if rel_path[0] == "..":
6087 return abs_decode_path(decode_path[:-1], rel_path[1:])
6088 return decode_path + rel_path
6091 SequenceState = namedtuple(
6093 BasicState._fields + ("specs", "value",),
6098 class SequenceEncode1stMixing(object):
6099 def _encode1st(self, state):
6101 idx = len(state) - 1
6103 for v in self._values_for_encoding():
6104 l, _ = v.encode1st(state)
6107 return len(self.tag) + len_size(vlen) + vlen, state
6110 class Sequence(SequenceEncode1stMixing, Obj):
6111 """``SEQUENCE`` structure type
6113 You have to make specification of sequence::
6115 class Extension(Sequence):
6117 ("extnID", ObjectIdentifier()),
6118 ("critical", Boolean(default=False)),
6119 ("extnValue", OctetString()),
6122 Then, you can work with it as with dictionary.
6124 >>> ext = Extension()
6125 >>> Extension().specs
6127 ('extnID', OBJECT IDENTIFIER),
6128 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6129 ('extnValue', OCTET STRING),
6131 >>> ext["extnID"] = "1.2.3"
6132 Traceback (most recent call last):
6133 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6134 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6136 You can determine if sequence is ready to be encoded:
6141 Traceback (most recent call last):
6142 pyderasn.ObjNotReady: object is not ready: extnValue
6143 >>> ext["extnValue"] = OctetString(b"foobar")
6147 Value you want to assign, must have the same **type** as in
6148 corresponding specification, but it can have different tags,
6149 optional/default attributes -- they will be taken from specification
6152 class TBSCertificate(Sequence):
6154 ("version", Version(expl=tag_ctxc(0), default="v1")),
6157 >>> tbs = TBSCertificate()
6158 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6160 Assign ``None`` to remove value from sequence.
6162 You can set values in Sequence during its initialization:
6164 >>> AlgorithmIdentifier((
6165 ("algorithm", ObjectIdentifier("1.2.3")),
6166 ("parameters", Any(Null()))
6168 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6170 You can determine if value exists/set in the sequence and take its value:
6172 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6175 OBJECT IDENTIFIER 1.2.3
6177 But pay attention that if value has default, then it won't be (not
6178 in) in the sequence (because ``DEFAULT`` must not be encoded in
6179 DER), but you can read its value:
6181 >>> "critical" in ext, ext["critical"]
6182 (False, BOOLEAN False)
6183 >>> ext["critical"] = Boolean(True)
6184 >>> "critical" in ext, ext["critical"]
6185 (True, BOOLEAN True)
6187 All defaulted values are always optional.
6189 .. _allow_default_values_ctx:
6191 DER prohibits default value encoding and will raise an error if
6192 default value is unexpectedly met during decode.
6193 If :ref:`bered <bered_ctx>` context option is set, then no error
6194 will be raised, but ``bered`` attribute set. You can disable strict
6195 defaulted values existence validation by setting
6196 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6200 Check for default value existence is not performed in
6201 ``evgen_mode``, because previously decoded values are not stored
6202 in memory, to be able to compare them.
6204 Two sequences are equal if they have equal specification (schema),
6205 implicit/explicit tagging and the same values.
6207 __slots__ = ("specs",)
6208 tag_default = tag_encode(form=TagFormConstructed, num=16)
6209 asn1_type_name = "SEQUENCE"
6221 super(Sequence, self).__init__(impl, expl, default, optional, _decoded)
6223 schema = getattr(self, "schema", ())
6225 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6228 if value is not None:
6229 if issubclass(value.__class__, Sequence):
6230 self._value = value._value
6231 elif hasattr(value, "__iter__"):
6232 for seq_key, seq_value in value:
6233 self[seq_key] = seq_value
6235 raise InvalidValueType((Sequence,))
6236 if default is not None:
6237 if not issubclass(default.__class__, Sequence):
6238 raise InvalidValueType((Sequence,))
6239 default_value = default._value
6240 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6241 default_obj.specs = self.specs
6242 default_obj._value = default_value
6243 self.default = default_obj
6245 self._value = copy(default_obj._value)
6249 for name, spec in iteritems(self.specs):
6250 value = self._value.get(name)
6261 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6263 return any(value.bered for value in itervalues(self._value))
6265 def __getstate__(self):
6266 return SequenceState(
6280 {k: copy(v) for k, v in iteritems(self._value)},
6283 def __setstate__(self, state):
6284 super(Sequence, self).__setstate__(state)
6285 self.specs = state.specs
6286 self._value = state.value
6288 def __eq__(self, their):
6289 if not isinstance(their, self.__class__):
6292 self.specs == their.specs and
6293 self.tag == their.tag and
6294 self._expl == their._expl and
6295 self._value == their._value
6306 return self.__class__(
6309 impl=self.tag if impl is None else impl,
6310 expl=self._expl if expl is None else expl,
6311 default=self.default if default is None else default,
6312 optional=self.optional if optional is None else optional,
6315 def __contains__(self, key):
6316 return key in self._value
6318 def __setitem__(self, key, value):
6319 spec = self.specs.get(key)
6321 raise ObjUnknown(key)
6323 self._value.pop(key, None)
6325 if not isinstance(value, spec.__class__):
6326 raise InvalidValueType((spec.__class__,))
6327 value = spec(value=value)
6328 if spec.default is not None and value == spec.default:
6329 self._value.pop(key, None)
6331 self._value[key] = value
6333 def __getitem__(self, key):
6334 value = self._value.get(key)
6335 if value is not None:
6337 spec = self.specs.get(key)
6339 raise ObjUnknown(key)
6340 if spec.default is not None:
6344 def _values_for_encoding(self):
6345 for name, spec in iteritems(self.specs):
6346 value = self._value.get(name)
6350 raise ObjNotReady(name)
6354 v = b"".join(v.encode() for v in self._values_for_encoding())
6355 return b"".join((self.tag, len_encode(len(v)), v))
6357 def _encode2nd(self, writer, state_iter):
6358 write_full(writer, self.tag + len_encode(next(state_iter)))
6359 for v in self._values_for_encoding():
6360 v.encode2nd(writer, state_iter)
6362 def _encode_cer(self, writer):
6363 write_full(writer, self.tag + LENINDEF)
6364 for v in self._values_for_encoding():
6365 v.encode_cer(writer)
6366 write_full(writer, EOC)
6368 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6370 t, tlen, lv = tag_strip(tlv)
6371 except DecodeError as err:
6372 raise err.__class__(
6374 klass=self.__class__,
6375 decode_path=decode_path,
6380 klass=self.__class__,
6381 decode_path=decode_path,
6384 if tag_only: # pragma: no cover
6388 ctx_bered = ctx.get("bered", False)
6390 l, llen, v = len_decode(lv)
6391 except LenIndefForm as err:
6393 raise err.__class__(
6395 klass=self.__class__,
6396 decode_path=decode_path,
6399 l, llen, v = 0, 1, lv[1:]
6401 except DecodeError as err:
6402 raise err.__class__(
6404 klass=self.__class__,
6405 decode_path=decode_path,
6409 raise NotEnoughData(
6410 "encoded length is longer than data",
6411 klass=self.__class__,
6412 decode_path=decode_path,
6416 v, tail = v[:l], v[l:]
6418 sub_offset = offset + tlen + llen
6421 ctx_allow_default_values = ctx.get("allow_default_values", False)
6422 for name, spec in iteritems(self.specs):
6423 if spec.optional and (
6424 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6428 sub_decode_path = decode_path + (name,)
6431 for _decode_path, value, v_tail in spec.decode_evgen(
6435 decode_path=sub_decode_path,
6437 _ctx_immutable=False,
6439 yield _decode_path, value, v_tail
6441 _, value, v_tail = next(spec.decode_evgen(
6445 decode_path=sub_decode_path,
6447 _ctx_immutable=False,
6450 except TagMismatch as err:
6451 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6455 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6456 if not evgen_mode and defined is not None:
6457 defined_by, defined_spec = defined
6458 if issubclass(value.__class__, SequenceOf):
6459 for i, _value in enumerate(value):
6460 sub_sub_decode_path = sub_decode_path + (
6462 DecodePathDefBy(defined_by),
6464 defined_value, defined_tail = defined_spec.decode(
6465 memoryview(bytes(_value)),
6467 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6468 if value.expled else (value.tlen + value.llen)
6471 decode_path=sub_sub_decode_path,
6473 _ctx_immutable=False,
6475 if len(defined_tail) > 0:
6478 klass=self.__class__,
6479 decode_path=sub_sub_decode_path,
6482 _value.defined = (defined_by, defined_value)
6484 defined_value, defined_tail = defined_spec.decode(
6485 memoryview(bytes(value)),
6487 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6488 if value.expled else (value.tlen + value.llen)
6491 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6493 _ctx_immutable=False,
6495 if len(defined_tail) > 0:
6498 klass=self.__class__,
6499 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6502 value.defined = (defined_by, defined_value)
6504 value_len = value.fulllen
6506 sub_offset += value_len
6509 if spec.default is not None and value == spec.default:
6510 # This will not work in evgen_mode
6511 if ctx_bered or ctx_allow_default_values:
6515 "DEFAULT value met",
6516 klass=self.__class__,
6517 decode_path=sub_decode_path,
6520 values[name] = value
6521 spec_defines = getattr(spec, "defines", ())
6522 if len(spec_defines) == 0:
6523 defines_by_path = ctx.get("defines_by_path", ())
6524 if len(defines_by_path) > 0:
6525 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6526 if spec_defines is not None and len(spec_defines) > 0:
6527 for rel_path, schema in spec_defines:
6528 defined = schema.get(value, None)
6529 if defined is not None:
6530 ctx.setdefault("_defines", []).append((
6531 abs_decode_path(sub_decode_path[:-1], rel_path),
6535 if v[:EOC_LEN].tobytes() != EOC:
6538 klass=self.__class__,
6539 decode_path=decode_path,
6547 klass=self.__class__,
6548 decode_path=decode_path,
6551 obj = self.__class__(
6555 default=self.default,
6556 optional=self.optional,
6557 _decoded=(offset, llen, vlen),
6560 obj.lenindef = lenindef
6561 obj.ber_encoded = ber_encoded
6562 yield decode_path, obj, tail
6565 value = pp_console_row(next(self.pps()))
6567 for name in self.specs:
6568 _value = self._value.get(name)
6571 cols.append("%s: %s" % (name, repr(_value)))
6572 return "%s[%s]" % (value, "; ".join(cols))
6574 def pps(self, decode_path=()):
6577 asn1_type_name=self.asn1_type_name,
6578 obj_name=self.__class__.__name__,
6579 decode_path=decode_path,
6580 optional=self.optional,
6581 default=self == self.default,
6582 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6583 expl=None if self._expl is None else tag_decode(self._expl),
6588 expl_offset=self.expl_offset if self.expled else None,
6589 expl_tlen=self.expl_tlen if self.expled else None,
6590 expl_llen=self.expl_llen if self.expled else None,
6591 expl_vlen=self.expl_vlen if self.expled else None,
6592 expl_lenindef=self.expl_lenindef,
6593 lenindef=self.lenindef,
6594 ber_encoded=self.ber_encoded,
6597 for name in self.specs:
6598 value = self._value.get(name)
6601 yield value.pps(decode_path=decode_path + (name,))
6602 for pp in self.pps_lenindef(decode_path):
6606 class Set(Sequence, SequenceEncode1stMixing):
6607 """``SET`` structure type
6609 Its usage is identical to :py:class:`pyderasn.Sequence`.
6611 .. _allow_unordered_set_ctx:
6613 DER prohibits unordered values encoding and will raise an error
6614 during decode. If :ref:`bered <bered_ctx>` context option is set,
6615 then no error will occur. Also you can disable strict values
6616 ordering check by setting ``"allow_unordered_set": True``
6617 :ref:`context <ctx>` option.
6620 tag_default = tag_encode(form=TagFormConstructed, num=17)
6621 asn1_type_name = "SET"
6623 def _values_for_encoding(self):
6625 super(Set, self)._values_for_encoding(),
6626 key=attrgetter("tag_order"),
6629 def _encode_cer(self, writer):
6630 write_full(writer, self.tag + LENINDEF)
6632 super(Set, self)._values_for_encoding(),
6633 key=attrgetter("tag_order_cer"),
6635 v.encode_cer(writer)
6636 write_full(writer, EOC)
6638 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6640 t, tlen, lv = tag_strip(tlv)
6641 except DecodeError as err:
6642 raise err.__class__(
6644 klass=self.__class__,
6645 decode_path=decode_path,
6650 klass=self.__class__,
6651 decode_path=decode_path,
6658 ctx_bered = ctx.get("bered", False)
6660 l, llen, v = len_decode(lv)
6661 except LenIndefForm as err:
6663 raise err.__class__(
6665 klass=self.__class__,
6666 decode_path=decode_path,
6669 l, llen, v = 0, 1, lv[1:]
6671 except DecodeError as err:
6672 raise err.__class__(
6674 klass=self.__class__,
6675 decode_path=decode_path,
6679 raise NotEnoughData(
6680 "encoded length is longer than data",
6681 klass=self.__class__,
6685 v, tail = v[:l], v[l:]
6687 sub_offset = offset + tlen + llen
6690 ctx_allow_default_values = ctx.get("allow_default_values", False)
6691 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6692 tag_order_prev = (0, 0)
6693 _specs_items = copy(self.specs)
6696 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6698 for name, spec in iteritems(_specs_items):
6699 sub_decode_path = decode_path + (name,)
6705 decode_path=sub_decode_path,
6708 _ctx_immutable=False,
6715 klass=self.__class__,
6716 decode_path=decode_path,
6720 for _decode_path, value, v_tail in spec.decode_evgen(
6724 decode_path=sub_decode_path,
6726 _ctx_immutable=False,
6728 yield _decode_path, value, v_tail
6730 _, value, v_tail = next(spec.decode_evgen(
6734 decode_path=sub_decode_path,
6736 _ctx_immutable=False,
6739 value_tag_order = value.tag_order
6740 value_len = value.fulllen
6741 if tag_order_prev >= value_tag_order:
6742 if ctx_bered or ctx_allow_unordered_set:
6746 "unordered " + self.asn1_type_name,
6747 klass=self.__class__,
6748 decode_path=sub_decode_path,
6751 if spec.default is None or value != spec.default:
6753 elif ctx_bered or ctx_allow_default_values:
6757 "DEFAULT value met",
6758 klass=self.__class__,
6759 decode_path=sub_decode_path,
6762 values[name] = value
6763 del _specs_items[name]
6764 tag_order_prev = value_tag_order
6765 sub_offset += value_len
6769 obj = self.__class__(
6773 default=self.default,
6774 optional=self.optional,
6775 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6778 if v[:EOC_LEN].tobytes() != EOC:
6781 klass=self.__class__,
6782 decode_path=decode_path,
6787 for name, spec in iteritems(self.specs):
6788 if name not in values and not spec.optional:
6790 "%s value is not ready" % name,
6791 klass=self.__class__,
6792 decode_path=decode_path,
6797 obj.ber_encoded = ber_encoded
6798 yield decode_path, obj, tail
6801 SequenceOfState = namedtuple(
6803 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6808 class SequenceOf(SequenceEncode1stMixing, Obj):
6809 """``SEQUENCE OF`` sequence type
6811 For that kind of type you must specify the object it will carry on
6812 (bounds are for example here, not required)::
6814 class Ints(SequenceOf):
6819 >>> ints.append(Integer(123))
6820 >>> ints.append(Integer(234))
6822 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6823 >>> [int(i) for i in ints]
6825 >>> ints.append(Integer(345))
6826 Traceback (most recent call last):
6827 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6830 >>> ints[1] = Integer(345)
6832 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6834 You can initialize sequence with preinitialized values:
6836 >>> ints = Ints([Integer(123), Integer(234)])
6838 Also you can use iterator as a value:
6840 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6842 And it won't be iterated until encoding process. Pay attention that
6843 bounds and required schema checks are done only during the encoding
6844 process in that case! After encode was called, then value is zeroed
6845 back to empty list and you have to set it again. That mode is useful
6846 mainly with CER encoding mode, where all objects from the iterable
6847 will be streamed to the buffer, without copying all of them to
6850 __slots__ = ("spec", "_bound_min", "_bound_max")
6851 tag_default = tag_encode(form=TagFormConstructed, num=16)
6852 asn1_type_name = "SEQUENCE OF"
6865 super(SequenceOf, self).__init__(impl, expl, default, optional, _decoded)
6867 schema = getattr(self, "schema", None)
6869 raise ValueError("schema must be specified")
6871 self._bound_min, self._bound_max = getattr(
6875 ) if bounds is None else bounds
6877 if value is not None:
6878 self._value = self._value_sanitize(value)
6879 if default is not None:
6880 default_value = self._value_sanitize(default)
6881 default_obj = self.__class__(
6886 default_obj._value = default_value
6887 self.default = default_obj
6889 self._value = copy(default_obj._value)
6891 def _value_sanitize(self, value):
6893 if issubclass(value.__class__, SequenceOf):
6894 value = value._value
6895 elif hasattr(value, NEXT_ATTR_NAME):
6897 elif hasattr(value, "__iter__"):
6900 raise InvalidValueType((self.__class__, iter, "iterator"))
6902 if not self._bound_min <= len(value) <= self._bound_max:
6903 raise BoundsError(self._bound_min, len(value), self._bound_max)
6904 class_expected = self.spec.__class__
6906 if not isinstance(v, class_expected):
6907 raise InvalidValueType((class_expected,))
6912 if hasattr(self._value, NEXT_ATTR_NAME):
6914 if self._bound_min > 0 and len(self._value) == 0:
6916 return all(v.ready for v in self._value)
6920 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6922 return any(v.bered for v in self._value)
6924 def __getstate__(self):
6925 if hasattr(self._value, NEXT_ATTR_NAME):
6926 raise ValueError("can not pickle SequenceOf with iterator")
6927 return SequenceOfState(
6941 [copy(v) for v in self._value],
6946 def __setstate__(self, state):
6947 super(SequenceOf, self).__setstate__(state)
6948 self.spec = state.spec
6949 self._value = state.value
6950 self._bound_min = state.bound_min
6951 self._bound_max = state.bound_max
6953 def __eq__(self, their):
6954 if isinstance(their, self.__class__):
6956 self.spec == their.spec and
6957 self.tag == their.tag and
6958 self._expl == their._expl and
6959 self._value == their._value
6961 if hasattr(their, "__iter__"):
6962 return self._value == list(their)
6974 return self.__class__(
6978 (self._bound_min, self._bound_max)
6979 if bounds is None else bounds
6981 impl=self.tag if impl is None else impl,
6982 expl=self._expl if expl is None else expl,
6983 default=self.default if default is None else default,
6984 optional=self.optional if optional is None else optional,
6987 def __contains__(self, key):
6988 return key in self._value
6990 def append(self, value):
6991 if not isinstance(value, self.spec.__class__):
6992 raise InvalidValueType((self.spec.__class__,))
6993 if len(self._value) + 1 > self._bound_max:
6996 len(self._value) + 1,
6999 self._value.append(value)
7002 return iter(self._value)
7005 return len(self._value)
7007 def __setitem__(self, key, value):
7008 if not isinstance(value, self.spec.__class__):
7009 raise InvalidValueType((self.spec.__class__,))
7010 self._value[key] = self.spec(value=value)
7012 def __getitem__(self, key):
7013 return self._value[key]
7015 def _values_for_encoding(self):
7016 return iter(self._value)
7019 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7022 values_append = values.append
7023 class_expected = self.spec.__class__
7024 values_for_encoding = self._values_for_encoding()
7026 for v in values_for_encoding:
7027 if not isinstance(v, class_expected):
7028 raise InvalidValueType((class_expected,))
7029 values_append(v.encode())
7030 if not self._bound_min <= len(values) <= self._bound_max:
7031 raise BoundsError(self._bound_min, len(values), self._bound_max)
7032 value = b"".join(values)
7034 value = b"".join(v.encode() for v in self._values_for_encoding())
7035 return b"".join((self.tag, len_encode(len(value)), value))
7037 def _encode1st(self, state):
7038 state = super(SequenceOf, self)._encode1st(state)
7039 if hasattr(self._value, NEXT_ATTR_NAME):
7043 def _encode2nd(self, writer, state_iter):
7044 write_full(writer, self.tag + len_encode(next(state_iter)))
7045 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7048 class_expected = self.spec.__class__
7049 values_for_encoding = self._values_for_encoding()
7051 for v in values_for_encoding:
7052 if not isinstance(v, class_expected):
7053 raise InvalidValueType((class_expected,))
7054 v.encode2nd(writer, state_iter)
7056 if not self._bound_min <= values_count <= self._bound_max:
7057 raise BoundsError(self._bound_min, values_count, self._bound_max)
7059 for v in self._values_for_encoding():
7060 v.encode2nd(writer, state_iter)
7062 def _encode_cer(self, writer):
7063 write_full(writer, self.tag + LENINDEF)
7064 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7066 class_expected = self.spec.__class__
7068 values_for_encoding = self._values_for_encoding()
7070 for v in values_for_encoding:
7071 if not isinstance(v, class_expected):
7072 raise InvalidValueType((class_expected,))
7073 v.encode_cer(writer)
7075 if not self._bound_min <= values_count <= self._bound_max:
7076 raise BoundsError(self._bound_min, values_count, self._bound_max)
7078 for v in self._values_for_encoding():
7079 v.encode_cer(writer)
7080 write_full(writer, EOC)
7090 ordering_check=False,
7093 t, tlen, lv = tag_strip(tlv)
7094 except DecodeError as err:
7095 raise err.__class__(
7097 klass=self.__class__,
7098 decode_path=decode_path,
7103 klass=self.__class__,
7104 decode_path=decode_path,
7111 ctx_bered = ctx.get("bered", False)
7113 l, llen, v = len_decode(lv)
7114 except LenIndefForm as err:
7116 raise err.__class__(
7118 klass=self.__class__,
7119 decode_path=decode_path,
7122 l, llen, v = 0, 1, lv[1:]
7124 except DecodeError as err:
7125 raise err.__class__(
7127 klass=self.__class__,
7128 decode_path=decode_path,
7132 raise NotEnoughData(
7133 "encoded length is longer than data",
7134 klass=self.__class__,
7135 decode_path=decode_path,
7139 v, tail = v[:l], v[l:]
7141 sub_offset = offset + tlen + llen
7144 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7145 value_prev = memoryview(v[:0])
7149 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7151 sub_decode_path = decode_path + (str(_value_count),)
7153 for _decode_path, value, v_tail in spec.decode_evgen(
7157 decode_path=sub_decode_path,
7159 _ctx_immutable=False,
7161 yield _decode_path, value, v_tail
7163 _, value, v_tail = next(spec.decode_evgen(
7167 decode_path=sub_decode_path,
7169 _ctx_immutable=False,
7172 value_len = value.fulllen
7174 if value_prev.tobytes() > v[:value_len].tobytes():
7175 if ctx_bered or ctx_allow_unordered_set:
7179 "unordered " + self.asn1_type_name,
7180 klass=self.__class__,
7181 decode_path=sub_decode_path,
7184 value_prev = v[:value_len]
7187 _value.append(value)
7188 sub_offset += value_len
7191 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7193 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7194 klass=self.__class__,
7195 decode_path=decode_path,
7199 obj = self.__class__(
7200 value=None if evgen_mode else _value,
7202 bounds=(self._bound_min, self._bound_max),
7205 default=self.default,
7206 optional=self.optional,
7207 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7209 except BoundsError as err:
7212 klass=self.__class__,
7213 decode_path=decode_path,
7217 if v[:EOC_LEN].tobytes() != EOC:
7220 klass=self.__class__,
7221 decode_path=decode_path,
7226 obj.ber_encoded = ber_encoded
7227 yield decode_path, obj, tail
7231 pp_console_row(next(self.pps())),
7232 ", ".join(repr(v) for v in self._value),
7235 def pps(self, decode_path=()):
7238 asn1_type_name=self.asn1_type_name,
7239 obj_name=self.__class__.__name__,
7240 decode_path=decode_path,
7241 optional=self.optional,
7242 default=self == self.default,
7243 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
7244 expl=None if self._expl is None else tag_decode(self._expl),
7249 expl_offset=self.expl_offset if self.expled else None,
7250 expl_tlen=self.expl_tlen if self.expled else None,
7251 expl_llen=self.expl_llen if self.expled else None,
7252 expl_vlen=self.expl_vlen if self.expled else None,
7253 expl_lenindef=self.expl_lenindef,
7254 lenindef=self.lenindef,
7255 ber_encoded=self.ber_encoded,
7258 for i, value in enumerate(self._value):
7259 yield value.pps(decode_path=decode_path + (str(i),))
7260 for pp in self.pps_lenindef(decode_path):
7264 class SetOf(SequenceOf):
7265 """``SET OF`` sequence type
7267 Its usage is identical to :py:class:`pyderasn.SequenceOf`.
7270 tag_default = tag_encode(form=TagFormConstructed, num=17)
7271 asn1_type_name = "SET OF"
7273 def _value_sanitize(self, value):
7274 value = super(SetOf, self)._value_sanitize(value)
7275 if hasattr(value, NEXT_ATTR_NAME):
7277 "SetOf does not support iterator values, as no sense in them"
7282 v = b"".join(sorted(v.encode() for v in self._values_for_encoding()))
7283 return b"".join((self.tag, len_encode(len(v)), v))
7285 def _encode2nd(self, writer, state_iter):
7286 write_full(writer, self.tag + len_encode(next(state_iter)))
7288 for v in self._values_for_encoding():
7290 v.encode2nd(buf.write, state_iter)
7291 values.append(buf.getvalue())
7294 write_full(writer, v)
7296 def _encode_cer(self, writer):
7297 write_full(writer, self.tag + LENINDEF)
7298 for v in sorted(encode_cer(v) for v in self._values_for_encoding()):
7299 write_full(writer, v)
7300 write_full(writer, EOC)
7302 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
7303 return super(SetOf, self)._decode(
7310 ordering_check=True,
7314 def obj_by_path(pypath): # pragma: no cover
7315 """Import object specified as string Python path
7317 Modules must be separated from classes/functions with ``:``.
7319 >>> obj_by_path("foo.bar:Baz")
7320 <class 'foo.bar.Baz'>
7321 >>> obj_by_path("foo.bar:Baz.boo")
7322 <classmethod 'foo.bar.Baz.boo'>
7324 mod, objs = pypath.rsplit(":", 1)
7325 from importlib import import_module
7326 obj = import_module(mod)
7327 for obj_name in objs.split("."):
7328 obj = getattr(obj, obj_name)
7332 def generic_decoder(): # pragma: no cover
7333 # All of this below is a big hack with self references
7334 choice = PrimitiveTypes()
7335 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7336 choice.specs["SetOf"] = SetOf(schema=choice)
7337 for i in six_xrange(31):
7338 choice.specs["SequenceOf%d" % i] = SequenceOf(
7342 choice.specs["Any"] = Any()
7344 # Class name equals to type name, to omit it from output
7345 class SEQUENCEOF(SequenceOf):
7353 with_decode_path=False,
7354 decode_path_only=(),
7357 def _pprint_pps(pps):
7359 if hasattr(pp, "_fields"):
7361 decode_path_only != () and
7362 pp.decode_path[:len(decode_path_only)] != decode_path_only
7365 if pp.asn1_type_name == Choice.asn1_type_name:
7367 pp_kwargs = pp._asdict()
7368 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7369 pp = _pp(**pp_kwargs)
7370 yield pp_console_row(
7375 with_colours=with_colours,
7376 with_decode_path=with_decode_path,
7377 decode_path_len_decrease=len(decode_path_only),
7379 for row in pp_console_blob(
7381 decode_path_len_decrease=len(decode_path_only),
7385 for row in _pprint_pps(pp):
7387 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7388 return SEQUENCEOF(), pprint_any
7391 def main(): # pragma: no cover
7393 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7394 parser.add_argument(
7398 help="Skip that number of bytes from the beginning",
7400 parser.add_argument(
7402 help="Python paths to dictionary with OIDs, comma separated",
7404 parser.add_argument(
7406 help="Python path to schema definition to use",
7408 parser.add_argument(
7409 "--defines-by-path",
7410 help="Python path to decoder's defines_by_path",
7412 parser.add_argument(
7414 action="store_true",
7415 help="Disallow BER encoding",
7417 parser.add_argument(
7418 "--print-decode-path",
7419 action="store_true",
7420 help="Print decode paths",
7422 parser.add_argument(
7423 "--decode-path-only",
7424 help="Print only specified decode path",
7426 parser.add_argument(
7428 action="store_true",
7429 help="Allow explicit tag out-of-bound",
7431 parser.add_argument(
7433 action="store_true",
7434 help="Turn on event generation mode",
7436 parser.add_argument(
7438 type=argparse.FileType("rb"),
7439 help="Path to BER/CER/DER file you want to decode",
7441 args = parser.parse_args()
7443 args.RAWFile.seek(args.skip)
7444 raw = memoryview(args.RAWFile.read())
7445 args.RAWFile.close()
7447 raw = file_mmaped(args.RAWFile)[args.skip:]
7449 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7450 if args.oids else ()
7452 from functools import partial
7454 schema = obj_by_path(args.schema)
7455 pprinter = partial(pprint, big_blobs=True)
7457 schema, pprinter = generic_decoder()
7459 "bered": not args.nobered,
7460 "allow_expl_oob": args.allow_expl_oob,
7462 if args.defines_by_path is not None:
7463 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7464 from os import environ
7468 with_colours=environ.get("NO_COLOR") is None,
7469 with_decode_path=args.print_decode_path,
7471 () if args.decode_path_only is None else
7472 tuple(args.decode_path_only.split(":"))
7476 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7477 print(pprinter(obj, decode_path=decode_path))
7479 obj, tail = schema().decode(raw, ctx=ctx)
7480 print(pprinter(obj))
7482 print("\nTrailing data: %s" % hexenc(tail))
7485 if __name__ == "__main__":