3 # cython: language_level=3
4 # pylint: disable=line-too-long,superfluous-parens,protected-access,too-many-lines
5 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
6 # PyDERASN -- Python ASN.1 DER/CER/BER codec with abstract structures
7 # Copyright (C) 2017-2024 Sergey Matveev <stargrave@stargrave.org>
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Lesser General Public License as
11 # published by the Free Software Foundation, version 3 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public
19 # License along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Python ASN.1 DER/CER/BER codec with abstract structures
22 This library allows you to marshal various structures in ASN.1 DER/CER
23 format, unmarshal BER/CER/DER ones.
27 >>> Integer().decod(raw) == i
30 There are primitive types, holding single values
31 (:py:class:`pyderasn.BitString`,
32 :py:class:`pyderasn.Boolean`,
33 :py:class:`pyderasn.Enumerated`,
34 :py:class:`pyderasn.GeneralizedTime`,
35 :py:class:`pyderasn.Integer`,
36 :py:class:`pyderasn.Null`,
37 :py:class:`pyderasn.ObjectIdentifier`,
38 :py:class:`pyderasn.OctetString`,
39 :py:class:`pyderasn.UTCTime`,
40 :py:class:`various strings <pyderasn.CommonString>`
41 (:py:class:`pyderasn.BMPString`,
42 :py:class:`pyderasn.GeneralString`,
43 :py:class:`pyderasn.GraphicString`,
44 :py:class:`pyderasn.IA5String`,
45 :py:class:`pyderasn.ISO646String`,
46 :py:class:`pyderasn.NumericString`,
47 :py:class:`pyderasn.PrintableString`,
48 :py:class:`pyderasn.T61String`,
49 :py:class:`pyderasn.TeletexString`,
50 :py:class:`pyderasn.UniversalString`,
51 :py:class:`pyderasn.UTF8String`,
52 :py:class:`pyderasn.VideotexString`,
53 :py:class:`pyderasn.VisibleString`)),
54 constructed types, holding multiple primitive types
55 (:py:class:`pyderasn.Sequence`,
56 :py:class:`pyderasn.SequenceOf`,
57 :py:class:`pyderasn.Set`,
58 :py:class:`pyderasn.SetOf`),
59 and special types like
60 :py:class:`pyderasn.Any` and
61 :py:class:`pyderasn.Choice`.
69 Most types in ASN.1 have a specific tag for them. ``Obj.tag_default`` is
70 the default tag used during coding process. You can override it with
71 either ``IMPLICIT`` (using either ``impl`` keyword argument or ``impl``
72 class attribute), or ``EXPLICIT`` one (using either ``expl`` keyword
73 argument or ``expl`` class attribute). Both arguments take raw binary
74 string, containing that tag. You can **not** set implicit and explicit
77 There are :py:func:`pyderasn.tag_ctxp` and :py:func:`pyderasn.tag_ctxc`
78 functions, allowing you to easily create ``CONTEXT``
79 ``PRIMITIVE``/``CONSTRUCTED`` tags, by specifying only the required tag
84 EXPLICIT tags always have **constructed** tag. PyDERASN does not
85 explicitly check correctness of schema input here.
89 Implicit tags have **primitive** (``tag_ctxp``) encoding for
94 >>> Integer(impl=tag_ctxp(1))
96 >>> Integer(expl=tag_ctxc(2))
99 Implicit tag is not explicitly shown.
101 Two objects of the same type, but with different implicit/explicit tags
104 You can get object's effective tag (either default or implicited) through
105 ``tag`` property. You can decode it using :py:func:`pyderasn.tag_decode`
108 >>> tag_decode(tag_ctxc(123))
110 >>> klass, form, num = tag_decode(tag_ctxc(123))
111 >>> klass == TagClassContext
113 >>> form == TagFormConstructed
116 To determine if object has explicit tag, use ``expled`` boolean property
117 and ``expl_tag`` property, returning explicit tag's value.
122 Many objects in sequences could be ``OPTIONAL`` and could have
123 ``DEFAULT`` value. You can specify that object's property using
124 corresponding keyword arguments.
126 >>> Integer(optional=True, default=123)
127 INTEGER 123 OPTIONAL DEFAULT
129 Those specifications do not play any role in primitive value encoding,
130 but are taken into account when dealing with sequences holding them. For
131 example ``TBSCertificate`` sequence holds defaulted, explicitly tagged
134 class Version(Integer):
140 class TBSCertificate(Sequence):
142 ("version", Version(expl=tag_ctxc(0), default="v1")),
145 When default argument is used and value is not specified, then it equals
153 Some objects give ability to set value size constraints. This is either
154 possible integer value, or allowed length of various strings and
155 sequences. Constraints are set in the following way::
160 And values satisfaction is checked as: ``MIN <= X <= MAX``.
162 For simplicity you can also set bounds the following way::
164 bounded_x = X(bounds=(MIN, MAX))
166 If bounds are not satisfied, then :py:exc:`pyderasn.BoundsError` is
172 All objects have ``ready`` boolean property, that tells if object is
173 ready to be encoded. If that kind of action is performed on unready
174 object, then :py:exc:`pyderasn.ObjNotReady` exception will be raised.
176 All objects are friendly to ``copy.copy()`` and copied objects can be
179 Also all objects can be safely ``pickle``-d, but pay attention that
180 pickling among different PyDERASN versions is prohibited.
187 Decoding is performed using :py:meth:`pyderasn.Obj.decode` method.
188 ``offset`` optional argument could be used to set initial object's
189 offset in the binary data, for convenience. It returns decoded object
190 and remaining unmarshalled data (tail). Internally all work is done on
191 ``memoryview(data)``, and you can leave returning tail as a memoryview,
192 by specifying ``leavemm=True`` argument.
194 Also note convenient :py:meth:`pyderasn.Obj.decod` method, that
195 immediately checks and raises if there is non-empty tail.
197 When object is decoded, ``decoded`` property is true and you can safely
198 use following properties:
200 * ``offset`` -- position including initial offset where object's tag starts
201 * ``tlen`` -- length of object's tag
202 * ``llen`` -- length of object's length value
203 * ``vlen`` -- length of object's value
204 * ``tlvlen`` -- length of the whole object
206 Pay attention that those values do **not** include anything related to
207 explicit tag. If you want to know information about it, then use:
209 * ``expled`` -- to know if explicit tag is set
210 * ``expl_offset`` (it is less than ``offset``)
213 * ``expl_vlen`` (that actually equals to ordinary ``tlvlen``)
214 * ``fulloffset`` -- it equals to ``expl_offset`` if explicit tag is set,
216 * ``fulllen`` -- it equals to ``expl_len`` if explicit tag is set,
219 When error occurs, :py:exc:`pyderasn.DecodeError` is raised.
226 You can specify so called context keyword argument during
227 :py:meth:`pyderasn.Obj.decode` invocation. It is dictionary containing
228 various options governing decoding process.
230 Currently available context options:
232 * :ref:`allow_default_values <allow_default_values_ctx>`
233 * :ref:`allow_expl_oob <allow_expl_oob_ctx>`
234 * :ref:`allow_unordered_set <allow_unordered_set_ctx>`
235 * :ref:`bered <bered_ctx>`
236 * :ref:`defines_by_path <defines_by_path_ctx>`
237 * :ref:`evgen_mode_upto <evgen_mode_upto_ctx>`
238 * :ref:`keep_memoryview <keep_memoryview_ctx>`
245 All objects have ``pps()`` method, that is a generator of
246 :py:class:`pyderasn.PP` namedtuple, holding various raw information
247 about the object. If ``pps`` is called on sequences, then all underlying
248 ``PP`` will be yielded.
250 You can use the :py:func:`pyderasn.pp_console_row` function to convert
251 those ``PP`` to a human readable string. In fact, exactly that function is
252 used for every object's ``repr``. But it is easy to write custom formatters.
254 >>> from pyderasn import pprint
255 >>> encoded = Integer(-12345).encode()
256 >>> obj, tail = Integer().decode(encoded)
257 >>> print(pprint(obj))
258 0 [1,1, 2] INTEGER -12345
262 Example certificate::
264 >>> print(pprint(crt))
265 0 [1,3,1604] Certificate SEQUENCE
266 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE
267 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
268 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595
269 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
270 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
271 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
273 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
274 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF
275 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF
276 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE
277 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6
278 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY
279 . . . . . . . 13:02:45:53
281 1461 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
282 1463 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
283 1474 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
285 1476 [1,2, 129] . signatureValue: BIT STRING 1024 bits
286 . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
287 . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
292 Let's parse that output, human::
294 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL
295 ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
296 0 1 2 3 4 5 6 7 8 9 10 11
300 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
306 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence
312 52-2∞ B [1,1,1054]∞ . . . . eContent: [0] EXPLICIT BER OCTET STRING 1046 bytes
317 Offset of the object, where its DER/BER encoding begins.
318 Pay attention that it does **not** include explicit tag.
320 If explicit tag exists, then this is its length (tag + encoded length).
322 Length of object's tag. For example CHOICE does not have its own tag,
325 Length of encoded length.
327 Length of encoded value.
329 Visual indentation to show the depth of object in the hierarchy.
331 Object's name inside SEQUENCE/CHOICE.
333 If either IMPLICIT or EXPLICIT tag is set, then it will be shown
334 here. "IMPLICIT" is omitted.
336 Object's class name, if set. Omitted if it is just an ordinary simple
337 value (like with ``algorithm`` in example above).
341 Object's value, if set. Can consist of multiple words (like OCTET/BIT
342 STRINGs above). We see ``v3`` value in Version, because it is named.
343 ``rdnSequence`` is the choice of CHOICE type.
345 Possible other flags like OPTIONAL and DEFAULT, if value equals to the
346 default one, specified in the schema.
348 Shows whether the object contains any kind of BER encoded data (possibly
349 a Sequence holding a BER-encoded underlying value).
351 Only applicable to BER encoded data. Indefinite length encoding mark.
353 Only applicable to BER encoded data. If object has BER-specific
354 encoding, then ``BER`` will be shown. It does not depend on indefinite
355 length encoding. ``EOC``, ``BOOLEAN``, ``BIT STRING``, ``OCTET STRING``
356 (and its derivatives), ``SET``, ``SET OF``, ``UTCTime``, ``GeneralizedTime``
359 Also it could be helpful to add quick ASN.1 pprinting command in your
360 pdb's configuration file::
362 alias pp1 import pyderasn ;; print(pyderasn.pprint(%1, oid_maps=(locals().get("OID_STR_TO_NAME", {}),)))
369 ASN.1 structures often have ANY and OCTET STRING fields, that are
370 DEFINED BY some previously met ObjectIdentifier. This library provides
371 ability to specify mapping between some OID and field that must be
372 decoded with specific specification.
379 :py:class:`pyderasn.ObjectIdentifier` field inside
380 :py:class:`pyderasn.Sequence` can hold mapping between OIDs and
381 necessary for decoding structures. For example, CMS (:rfc:`5652`)
384 class ContentInfo(Sequence):
386 ("contentType", ContentType(defines=((("content",), {
387 id_digestedData: DigestedData(),
388 id_signedData: SignedData(),
390 ("content", Any(expl=tag_ctxc(0))),
393 ``contentType`` field tells that it defines that ``content`` must be
394 decoded with ``SignedData`` specification, if ``contentType`` equals to
395 ``id-signedData``. The same applies to ``DigestedData``. If
396 ``contentType`` contains unknown OID, then no automatic decoding is
399 You can specify multiple fields, that will be autodecoded -- that is why
400 ``defines`` kwarg is a sequence. You can specify defined field
401 relatively or absolutely to current decode path. For example ``defines``
402 for AlgorithmIdentifier of X.509's
403 ``tbsCertificate:subjectPublicKeyInfo:algorithm:algorithm``::
407 id_ecPublicKey: ECParameters(),
408 id_GostR3410_2001: GostR34102001PublicKeyParameters(),
410 (("..", "subjectPublicKey"), {
411 id_rsaEncryption: RSAPublicKey(),
412 id_GostR3410_2001: OctetString(),
416 tells that if certificate's SPKI algorithm is GOST R 34.10-2001, then
417 autodecode its parameters inside SPKI's algorithm and its public key
420 Following types can be automatically decoded (DEFINED BY):
422 * :py:class:`pyderasn.Any`
423 * :py:class:`pyderasn.BitString` (that is multiple of 8 bits)
424 * :py:class:`pyderasn.OctetString`
425 * :py:class:`pyderasn.SequenceOf`/:py:class:`pyderasn.SetOf`
426 ``Any``/``BitString``/``OctetString``-s
428 When any of those fields is automatically decoded, then ``.defined``
429 attribute contains ``(OID, value)`` tuple. ``OID`` tells by which OID it
430 was defined, ``value`` contains corresponding decoded value. For example
431 above, ``content_info["content"].defined == (id_signedData, signed_data)``.
433 .. _defines_by_path_ctx:
435 defines_by_path context option
436 ______________________________
438 Sometimes you either can not or do not want to explicitly set *defines*
439 in the schema. You can dynamically apply those definitions when calling
440 :py:meth:`pyderasn.Obj.decode` method.
442 Specify ``defines_by_path`` key in the :ref:`decode context <ctx>`. Its
443 value must be sequence of following tuples::
445 (decode_path, defines)
447 where ``decode_path`` is a tuple holding so-called decode path to the
448 exact :py:class:`pyderasn.ObjectIdentifier` field you want to apply
449 ``defines``, holding exactly the same value as accepted in its
450 :ref:`keyword argument <defines>`.
452 For example, again for CMS, you want to automatically decode
453 ``SignedData`` and CMC's (:rfc:`5272`) ``PKIData`` and ``PKIResponse``
454 structures it may hold. Also, automatically decode ``controlSequence``
457 content_info = ContentInfo().decod(data, ctx={"defines_by_path": (
460 ((("content",), {id_signedData: SignedData()}),),
465 DecodePathDefBy(id_signedData),
470 id_cct_PKIData: PKIData(),
471 id_cct_PKIResponse: PKIResponse(),
477 DecodePathDefBy(id_signedData),
480 DecodePathDefBy(id_cct_PKIResponse),
486 id_cmc_recipientNonce: RecipientNonce(),
487 id_cmc_senderNonce: SenderNonce(),
488 id_cmc_statusInfoV2: CMCStatusInfoV2(),
489 id_cmc_transactionId: TransactionId(),
494 Pay attention to :py:class:`pyderasn.DecodePathDefBy` and ``any``.
495 The former is useful for path construction when some automatic
496 decoding is already done. ``any`` means literally any value it meets --
497 useful for SEQUENCE/SET OF-s.
504 By default PyDERASN accepts only DER encoded data. By default it encodes
505 to DER. But you can optionally enable BER decoding by setting
506 ``bered`` :ref:`context <ctx>` argument to True. Indefinite lengths and
507 constructed primitive types should be parsed successfully.
509 * If object is encoded in BER form (not the DER one), then ``ber_encoded``
510 attribute is set to True. Only ``BOOLEAN``, ``BIT STRING``, ``OCTET
511 STRING``, ``OBJECT IDENTIFIER``, ``SEQUENCE``, ``SET``, ``SET OF``,
512 ``UTCTime``, ``GeneralizedTime`` can contain it.
513 * If object has an indefinite length encoding, then its ``lenindef``
514 attribute is set to True. Only ``BIT STRING``, ``OCTET STRING``,
515 ``SEQUENCE``, ``SET``, ``SEQUENCE OF``, ``SET OF``, ``ANY`` can
517 * If object has an indefinite length encoded explicit tag, then
518 ``expl_lenindef`` is set to True.
519 * If object has any kind of BER-related encoding (explicit tag
520 indefinite length, object's indefinite length, BER-encoding) or any
521 underlying component has that kind of encoding, then ``bered``
522 attribute is set to True. For example SignedData CMS can have
523 ``ContentInfo:content:signerInfos:*`` ``bered`` value set to True, but
524 ``ContentInfo:content:signerInfos:*:signedAttrs`` won't.
526 EOC (end-of-contents) token's length is taken in advance in object's
529 .. _allow_expl_oob_ctx:
531 Allow explicit tag out-of-bound
532 -------------------------------
534 Invalid BER encoding could contain ``EXPLICIT`` tag containing more than
535 one value, more than one object. If you set ``allow_expl_oob`` context
536 option to True, then no error will be raised and that invalid encoding
537 will be silently further processed. But pay attention that offsets and
538 lengths will be invalid in that case.
542 This option should be used only for skipping some decode errors, just
543 to see the decoded structure somehow.
547 Streaming and dealing with huge structures
548 ------------------------------------------
555 ASN.1 structures can be huge, they can hold millions of objects inside
556 (for example Certificate Revocation Lists (CRL), holding revocation
557 state for every previously issued X.509 certificate). CACert.org's 8 MiB
558 CRL file takes more than half a gigabyte of memory to hold the decoded
561 If you just simply want to check the signature over the ``tbsCertList``,
562 you can create specialized schema with that field represented as
563 OctetString for example::
565 class TBSCertListFast(Sequence):
568 ("revokedCertificates", OctetString(
569 impl=SequenceOf.tag_default,
575 This allows you to quickly decode a few fields and check the signature
576 over the ``tbsCertList`` bytes.
578 But how can you get all certificates' serial numbers from it, once you
579 trust that CRL after signature validation? You can use so called
580 ``evgen`` (event generation) mode, to catch the events/facts of some
581 successful object decoding. Let's use command line capabilities::
583 $ python -m pyderasn --schema tests.test_crl:CertificateList --evgen revoke.crl
584 10 [1,1, 1] . . version: Version INTEGER v2 (01) OPTIONAL
585 15 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
586 26 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL
587 13 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE
588 34 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.10
589 39 [0,0, 9] . . . . . . value: [UNIV 19] AttributeValue ANY
590 32 [1,1, 14] . . . . . 0: AttributeTypeAndValue SEQUENCE
591 30 [1,1, 16] . . . . 0: RelativeDistinguishedName SET OF
593 188 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 17 (11)
594 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
595 191 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
596 191 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2003-04-01T14:25:08
597 186 [1,1, 18] . . . 0: RevokedCertificate SEQUENCE
598 208 [1,1, 1] . . . . userCertificate: CertificateSerialNumber INTEGER 20 (14)
599 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
600 211 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
601 211 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2002-10-01T02:18:01
602 206 [1,1, 18] . . . 1: RevokedCertificate SEQUENCE
604 9144992 [0,0, 15] . . . . revocationDate: Time CHOICE utcTime
605 9144992 [1,1, 13] . . . . . utcTime: UTCTime UTCTime 2020-02-08T07:25:06
606 9144985 [1,1, 20] . . . 415755: RevokedCertificate SEQUENCE
607 181 [1,4,9144821] . . revokedCertificates: RevokedCertificates SEQUENCE OF OPTIONAL
608 5 [1,4,9144997] . tbsCertList: TBSCertList SEQUENCE
609 9145009 [1,1, 9] . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.13
610 9145020 [0,0, 2] . . parameters: [UNIV 5] ANY OPTIONAL
611 9145007 [1,1, 13] . signatureAlgorithm: AlgorithmIdentifier SEQUENCE
612 9145022 [1,3, 513] . signatureValue: BIT STRING 4096 bits
613 0 [1,4,9145534] CertificateList SEQUENCE
615 Here we see how decoder works: it decodes SEQUENCE's tag, length, then
616 decodes underlying values. It can not tell if SEQUENCE is decoded, so
617 the event of the upper level SEQUENCE is the last one we see.
618 ``version`` field is just a single INTEGER -- it is decoded and event is
619 fired immediately. Then we see that ``algorithm`` and ``parameters``
620 fields are decoded and only after them the ``signature`` SEQUENCE is
621 fired as successfully decoded. There are 4 events for each revoked
622 certificate entry in that CRL: ``userCertificate`` serial number,
623 ``utcTime`` of ``revocationDate`` CHOICE, ``RevokedCertificate`` itself
624 as one of the entities in ``revokedCertificates`` SEQUENCE OF.
626 We can do that in our ordinary Python code and understand where we are
627 by looking at deterministically generated decode paths (do not forget
628 about useful ``--print-decode-path`` CLI option). We must use
629 :py:meth:`pyderasn.Obj.decode_evgen` method, instead of ordinary
630 :py:meth:`pyderasn.Obj.decode`. It is generator yielding ``(decode_path,
631 obj, tail)`` tuples::
633 for decode_path, obj, _ in CertificateList().decode_evgen(crl_raw):
635 len(decode_path) == 4 and
636 decode_path[:2] == ("tbsCertList", "revokedCertificates") and
637 decode_path[3] == "userCertificate"
639 print("serial number:", int(obj))
641 Virtually it does not take any memory beyond what is needed for storing
642 a single object. You can easily use that mode to determine required
643 object ``.offset`` and ``.*len`` to be able to decode it separately, or
644 maybe verify signature upon it just by taking bytes by ``.offset`` and
647 .. _evgen_mode_upto_ctx:
652 There is full ability to get any kind of data from the CRL in the
653 example above. However it is not too convenient to get the whole
654 ``RevokedCertificate`` structure, which is pretty lightweight and one may
655 not want to disassemble it. You can use ``evgen_mode_upto``
656 :ref:`ctx <ctx>` option that semantically equals to
657 :ref:`defines_by_path <defines_by_path_ctx>` -- list of decode paths
658 mapped to any non-None value. If specified decode path is met, then any
659 subsequent objects won't be decoded in evgen mode. That allows us to
660 parse the CRL above with fully assembled ``RevokedCertificate``::
662 for decode_path, obj, _ in CertificateList().decode_evgen(
664 ctx={"evgen_mode_upto": (
665 (("tbsCertList", "revokedCertificates", any), True),
669 len(decode_path) == 3 and
670 decode_path[:2] == ("tbsCertList", "revokedCertificates")
672 print("serial number:", int(obj["userCertificate"]))
676 SEQUENCE/SET values with DEFAULT specified are automatically decoded
684 POSIX compliant systems have ``mmap`` syscall, giving ability to work
685 with the memory mapped file. You can deal with the file like it was an
686 ordinary binary string, allowing you not to load it to the memory first.
687 Also you can use them as an input for OCTET STRING, taking no Python
688 memory for their storage.
690 There is convenient :py:func:`pyderasn.file_mmaped` function that
691 creates read-only memoryview on the file contents::
693 with open("huge", "rb") as fd:
694 raw = file_mmaped(fd)
695 obj = Something.decode(raw)
699 mmap maps the **whole** file. So it plays no role if you seek-ed it
700 before. Take the slice of the resulting memoryview with required
705 If you use ZFS as underlying storage, then pay attention that
706 currently most platforms do not deal well with ZFS ARC and ordinary
707 page cache used for mmaps. It can take twice the necessary size in
708 the memory: both in page cache and ZFS ARC.
710 .. _keep_memoryview_ctx:
712 That read-only memoryview could be safe to be used as a value inside
713 decoded :py:class:`pyderasn.OctetString` and :py:class:`pyderasn.Any`
714 objects. You can enable that by setting ``"keep_memoryview": True`` in
715 :ref:`decode context <ctx>`. No OCTET STRING and ANY values will be
716 copied to memory. Of course that works only in DER encoding, where the
717 value is continuously encoded.
722 We can parse any kind of data now, but how can we produce files
723 streamingly, without storing their encoded representation in memory?
724 SEQUENCE by default encodes in memory all its values, joins them in huge
725 binary string, just to know the exact size of SEQUENCE's value for
726 encoding it in TLV. DER requires you to know all exact sizes of the
729 You can use CER encoding mode, that slightly differs from the DER, but
730 does not require exact sizes knowledge, allowing streaming encoding
731 directly to some writer/buffer. Just use
732 :py:meth:`pyderasn.Obj.encode_cer` method, providing the writer where
733 encoded data will flow::
735 with open("result", "wb") as fd:
736 obj.encode_cer(fd.write)
741 obj.encode_cer(buf.write)
743 If you do not want to create in-memory buffer every time, then you can
744 use :py:func:`pyderasn.encode_cer` function::
746 data = encode_cer(obj)
748 Remember that CER is **not valid** DER in most cases, so you **have to**
749 use :ref:`bered <bered_ctx>` :ref:`ctx <ctx>` option during its
750 decoding. Also currently there is **no** validation that the provided CER is
751 a valid one -- you can be sure only that it has valid BER encoding.
755 SET OF values can not be streamingly encoded, because they are
756 required to be sorted byte-by-byte. Big SET OF values still will take
757 much memory. Use neither SET nor SET OF values, as modern ASN.1
760 Do not forget about using :ref:`mmap-ed <mmap>` memoryviews for your
761 OCTET STRINGs! They will be streamingly copied from underlying file to
762 the buffer using 1 KB chunks.
764 Some structures require that some of the elements have to be forcefully
765 DER encoded. For example ``SignedData`` CMS requires you to encode
766 ``SignedAttributes`` and X.509 certificates in DER form, allowing you to
767 encode everything else in BER. You can tell any of the structures to be
768 forcefully encoded in DER during CER encoding, by specifying
769 ``der_forced=True`` attribute::
771 class Certificate(Sequence):
775 class SignedAttributes(SetOf):
777 bounds = (1, float("+inf"))
780 .. _agg_octet_string:
785 In most cases, huge quantity of binary data is stored as OCTET STRING.
786 CER encoding splits it into 1 KB chunks. BER allows splitting on various
787 levels of chunks inclusion::
789 SOME STRING[CONSTRUCTED]
790 OCTET STRING[CONSTRUCTED]
791 OCTET STRING[PRIMITIVE]
793 OCTET STRING[PRIMITIVE]
795 OCTET STRING[PRIMITIVE]
797 OCTET STRING[PRIMITIVE]
799 OCTET STRING[CONSTRUCTED]
800 OCTET STRING[PRIMITIVE]
802 OCTET STRING[PRIMITIVE]
804 OCTET STRING[CONSTRUCTED]
805 OCTET STRING[CONSTRUCTED]
806 OCTET STRING[PRIMITIVE]
809 You can not just take the offset and some ``.vlen`` of the STRING and
810 treat it as the payload. If you decode it without
811 :ref:`evgen mode <evgen_mode>`, then it will be automatically aggregated
812 and ``bytes()`` will give the whole payload contents.
814 You are forced to use :ref:`evgen mode <evgen_mode>` for decoding for
815 small memory footprint. There is convenient
816 :py:func:`pyderasn.agg_octet_string` helper for reconstructing the
817 payload. Let's assume you have got BER/CER encoded ``ContentInfo`` with
818 huge ``SignedData`` and ``EncapsulatedContentInfo``. Let's calculate the
819 SHA512 digest of its ``eContent``::
821 fd = open("data.p7m", "rb")
822 raw = file_mmaped(fd)
823 ctx = {"bered": True}
824 for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
825 if decode_path == ("content",):
829 raise ValueError("no content found")
830 hasher_state = sha512()
832 hasher_state.update(data)
834 evgens = SignedData().decode_evgen(
835 raw[content.offset:],
836 offset=content.offset,
839 agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, hasher)
841 digest = hasher_state.digest()
843 Simply replace ``hasher`` with some writeable file's ``fd.write`` to
844 copy the payload (without BER/CER encoding interleaved overhead) in it.
845 Virtually it won't take memory more than for keeping small structures
846 and 1 KB binary chunks.
850 SEQUENCE OF iterators
851 _____________________
853 You can use iterators as a value in :py:class:`pyderasn.SequenceOf`
854 classes. The only difference with providing the full list of objects, is
855 that type and bounds checking is done during encoding process. Also
856 sequence's value will be emptied after encoding, forcing you to set its
859 This is very useful when you have to create some huge objects, like
860 CRLs, with thousands and millions of entities inside. You can write the
861 generator taking necessary data from the database and giving the
862 ``RevokedCertificate`` objects. Only binary representation of those
863 objects will take memory during DER encoding.
868 There is ability to do 2-pass encoding to DER, writing results directly
869 to specified writer (buffer, file, whatever). It could be 1.5+ times
870 slower than ordinary encoding, but it takes little memory for 1st pass
871 state storing. For example, 1st pass state for CACert.org's CRL with
872 ~416K of certificate entries takes nearly 3.5 MB of memory.
873 ``SignedData`` with several gigabyte ``EncapsulatedContentInfo`` takes
874 nearly 0.5 KB of memory.
876 If you use :ref:`mmap-ed <mmap>` memoryviews, :ref:`SEQUENCE OF
877 iterators <seqof-iterators>` and write directly to opened file, then
878 there is very small memory footprint.
880 1st pass traverses through all the objects of the structure and returns
881 the size of DER encoded structure, together with 1st pass state object.
882 That state contains precalculated lengths for various objects inside the
887 fulllen, state = obj.encode1st()
889 2nd pass takes the writer and 1st pass state. It traverses through all
890 the objects again, but writes their encoded representation to the writer.
894 with open("result", "wb") as fd:
895 obj.encode2nd(fd.write, iter(state))
899 You **MUST NOT** use 1st pass state if anything is changed in the
900 objects. It is intended to be used immediately after 1st pass is
903 If you use :ref:`SEQUENCE OF iterators <seqof-iterators>`, then you
904 have to reinitialize the values after the 1st pass. And you **have to**
905 be sure that the iterator gives exactly the same values as previously.
906 Yes, you have to run your iterator twice -- because this is two pass
909 If you want to encode to the memory, then you can use convenient
910 :py:func:`pyderasn.encode2pass` helper.
916 .. autofunction:: pyderasn.browse
920 .. autoclass:: pyderasn.Obj
928 .. autoclass:: pyderasn.Boolean
933 .. autoclass:: pyderasn.Integer
934 :members: __init__, named, tohex
938 .. autoclass:: pyderasn.BitString
939 :members: __init__, bit_len, named
943 .. autoclass:: pyderasn.OctetString
948 .. autoclass:: pyderasn.Null
953 .. autoclass:: pyderasn.ObjectIdentifier
958 .. autoclass:: pyderasn.Enumerated
962 .. autoclass:: pyderasn.CommonString
966 .. autoclass:: pyderasn.NumericString
970 .. autoclass:: pyderasn.PrintableString
971 :members: __init__, allow_asterisk, allow_ampersand
975 .. autoclass:: pyderasn.IA5String
979 .. autoclass:: pyderasn.VisibleString
983 .. autoclass:: pyderasn.UTCTime
984 :members: __init__, todatetime, totzdatetime
988 .. autoclass:: pyderasn.GeneralizedTime
989 :members: __init__, todatetime, totzdatetime
996 .. autoclass:: pyderasn.Choice
997 :members: __init__, choice, value
1001 .. autoclass:: PrimitiveTypes
1005 .. autoclass:: pyderasn.Any
1013 .. autoclass:: pyderasn.Sequence
1018 .. autoclass:: pyderasn.Set
1023 .. autoclass:: pyderasn.SequenceOf
1028 .. autoclass:: pyderasn.SetOf
1034 .. autofunction:: pyderasn.abs_decode_path
1035 .. autofunction:: pyderasn.agg_octet_string
1036 .. autofunction:: pyderasn.ascii_visualize
1037 .. autofunction:: pyderasn.colonize_hex
1038 .. autofunction:: pyderasn.encode2pass
1039 .. autofunction:: pyderasn.encode_cer
1040 .. autofunction:: pyderasn.file_mmaped
1041 .. autofunction:: pyderasn.hexenc
1042 .. autofunction:: pyderasn.hexdec
1043 .. autofunction:: pyderasn.hexdump
1044 .. autofunction:: pyderasn.tag_encode
1045 .. autofunction:: pyderasn.tag_decode
1046 .. autofunction:: pyderasn.tag_ctxp
1047 .. autofunction:: pyderasn.tag_ctxc
1048 .. autoclass:: pyderasn.DecodeError
1050 .. autoclass:: pyderasn.NotEnoughData
1051 .. autoclass:: pyderasn.ExceedingData
1052 .. autoclass:: pyderasn.LenIndefForm
1053 .. autoclass:: pyderasn.TagMismatch
1054 .. autoclass:: pyderasn.InvalidLength
1055 .. autoclass:: pyderasn.InvalidOID
1056 .. autoclass:: pyderasn.ObjUnknown
1057 .. autoclass:: pyderasn.ObjNotReady
1058 .. autoclass:: pyderasn.InvalidValueType
1059 .. autoclass:: pyderasn.BoundsError
1066 You can decode DER/BER files using command line abilities::
1068 $ python -m pyderasn --schema tests.test_crts:Certificate path/to/file
1070 If there is no schema for your file, then you can try parsing it without one,
1071 but of course IMPLICIT tags will often make it impossible. But the result is
1072 good enough for the certificate above::
1074 $ python -m pyderasn path/to/file
1075 0 [1,3,1604] . >: SEQUENCE OF
1076 4 [1,3,1453] . . >: SEQUENCE OF
1077 8 [0,0, 5] . . . . >: [0] ANY
1078 . . . . . A0:03:02:01:02
1079 13 [1,1, 3] . . . . >: INTEGER 61595
1080 18 [1,1, 13] . . . . >: SEQUENCE OF
1081 20 [1,1, 9] . . . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1082 31 [1,1, 0] . . . . . . >: NULL
1083 33 [1,3, 274] . . . . >: SEQUENCE OF
1084 37 [1,1, 11] . . . . . . >: SET OF
1085 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1086 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER 2.5.4.6
1087 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1089 1409 [1,1, 50] . . . . . . >: SEQUENCE OF
1090 1411 [1,1, 8] . . . . . . . . >: OBJECT IDENTIFIER 1.3.6.1.5.5.7.1.1
1091 1421 [1,1, 38] . . . . . . . . >: OCTET STRING 38 bytes
1092 . . . . . . . . . 30:24:30:22:06:08:2B:06:01:05:05:07:30:01:86:16
1093 . . . . . . . . . 68:74:74:70:3A:2F:2F:6F:63:73:70:2E:69:70:73:63
1094 . . . . . . . . . 61:2E:63:6F:6D:2F
1095 1461 [1,1, 13] . . >: SEQUENCE OF
1096 1463 [1,1, 9] . . . . >: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1097 1474 [1,1, 0] . . . . >: NULL
1098 1476 [1,2, 129] . . >: BIT STRING 1024 bits
1099 . . . 68:EE:79:97:97:DD:3B:EF:16:6A:06:F2:14:9A:6E:CD
1100 . . . 9E:12:F7:AA:83:10:BD:D1:7C:98:FA:C7:AE:D4:0E:2C
1106 If you have got dictionaries with ObjectIdentifiers, like the example one
1107 from ``tests/test_crts.py``::
1110 "1.2.840.113549.1.1.1": "id-rsaEncryption",
1111 "1.2.840.113549.1.1.5": "id-sha1WithRSAEncryption",
1113 "2.5.4.10": "id-at-organizationName",
1114 "2.5.4.11": "id-at-organizationalUnitName",
1117 then you can pass it to pretty printer to see human readable OIDs::
1119 $ python -m pyderasn --oids tests.test_crts:stroid2name path/to/file
1121 37 [1,1, 11] . . . . . . >: SET OF
1122 39 [1,1, 9] . . . . . . . . >: SEQUENCE OF
1123 41 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-countryName (2.5.4.6)
1124 46 [1,1, 2] . . . . . . . . . . >: PrintableString PrintableString ES
1125 50 [1,1, 18] . . . . . . >: SET OF
1126 52 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1127 54 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-stateOrProvinceName (2.5.4.8)
1128 59 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1129 70 [1,1, 18] . . . . . . >: SET OF
1130 72 [1,1, 16] . . . . . . . . >: SEQUENCE OF
1131 74 [1,1, 3] . . . . . . . . . . >: OBJECT IDENTIFIER id-at-localityName (2.5.4.7)
1132 79 [1,1, 9] . . . . . . . . . . >: PrintableString PrintableString Barcelona
1138 Each decoded element has a so-called decode path: the sequence of structure
1139 names it passes through during the decode process. Each element has its own
1140 unique path inside the whole ASN.1 tree. You can print it out with
1141 ``--print-decode-path`` option::
1143 $ python -m pyderasn --schema path.to:Certificate --print-decode-path path/to/file
1144 0 [1,3,1604] Certificate SEQUENCE []
1145 4 [1,3,1453] . tbsCertificate: TBSCertificate SEQUENCE [tbsCertificate]
1146 10-2 [1,1, 1] . . version: [0] EXPLICIT Version INTEGER v3 OPTIONAL [tbsCertificate:version]
1147 13 [1,1, 3] . . serialNumber: CertificateSerialNumber INTEGER 61595 [tbsCertificate:serialNumber]
1148 18 [1,1, 13] . . signature: AlgorithmIdentifier SEQUENCE [tbsCertificate:signature]
1149 20 [1,1, 9] . . . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5 [tbsCertificate:signature:algorithm]
1150 31 [0,0, 2] . . . parameters: [UNIV 5] ANY OPTIONAL [tbsCertificate:signature:parameters]
1152 33 [0,0, 278] . . issuer: Name CHOICE rdnSequence [tbsCertificate:issuer]
1153 33 [1,3, 274] . . . rdnSequence: RDNSequence SEQUENCE OF [tbsCertificate:issuer:rdnSequence]
1154 37 [1,1, 11] . . . . 0: RelativeDistinguishedName SET OF [tbsCertificate:issuer:rdnSequence:0]
1155 39 [1,1, 9] . . . . . 0: AttributeTypeAndValue SEQUENCE [tbsCertificate:issuer:rdnSequence:0:0]
1156 41 [1,1, 3] . . . . . . type: AttributeType OBJECT IDENTIFIER 2.5.4.6 [tbsCertificate:issuer:rdnSequence:0:0:type]
1157 46 [0,0, 4] . . . . . . value: [UNIV 19] AttributeValue ANY [tbsCertificate:issuer:rdnSequence:0:0:value]
1158 . . . . . . . 13:02:45:53
1159 46 [1,1, 2] . . . . . . . DEFINED BY 2.5.4.6: CountryName PrintableString ES [tbsCertificate:issuer:rdnSequence:0:0:value:DEFINED BY 2.5.4.6]
1162 Now you can print only the specified tree, for example signature algorithm::
1164 $ python -m pyderasn --schema path.to:Certificate --decode-path-only tbsCertificate:signature path/to/file
1165 18 [1,1, 13] AlgorithmIdentifier SEQUENCE
1166 20 [1,1, 9] . algorithm: OBJECT IDENTIFIER 1.2.840.113549.1.1.5
1167 31 [0,0, 2] . parameters: [UNIV 5] ANY OPTIONAL
1171 from array import array
1172 from collections import namedtuple
1173 from collections import OrderedDict
1174 from copy import copy
1175 from datetime import datetime
1176 from datetime import timedelta
1177 from io import BytesIO
1178 from math import ceil
1179 from operator import attrgetter
1180 from string import ascii_letters
1181 from string import digits
1182 from struct import Struct as struct_Struct
1183 from sys import maxsize as sys_maxsize
1184 from sys import version_info
1185 from unicodedata import category as unicat
1188 from termcolor import colored
1189 except ImportError: # pragma: no cover
1190 def colored(what, *args, **kwargs):
1194 from dateutil.tz import UTC as tzUTC
1195 except ImportError: # pragma: no cover
1247 "TagClassApplication",
1250 "TagClassUniversal",
1251 "TagFormConstructed",
1262 TagClassUniversal = 0
1263 TagClassApplication = 1 << 6
1264 TagClassContext = 1 << 7
1265 TagClassPrivate = 1 << 6 | 1 << 7
1266 TagFormPrimitive = 0
1267 TagFormConstructed = 1 << 5
1269 TagClassContext: "",
1270 TagClassApplication: "APPLICATION ",
1271 TagClassPrivate: "PRIVATE ",
1272 TagClassUniversal: "UNIV ",
1276 LENINDEF = b"\x80" # length indefinite mark
1277 LENINDEF_PP_CHAR = "∞"
1278 NAMEDTUPLE_KWARGS = {} if version_info < (3, 6) else {"module": __name__}
1279 SET01 = frozenset("01")
1280 DECIMALS = frozenset(digits)
1281 DECIMAL_SIGNS = ".,"
1282 NEXT_ATTR_NAME = "__next__"
def file_mmaped(fd):
    """Make mmap-ed memoryview for reading from file

    :param fd: file object (anything providing ``fileno()``)
    :returns: memoryview over read-only mmap-ing of the whole file

    The mapping is requested with ``access=mmap.ACCESS_READ``, which is
    accepted by both the Unix and the Windows flavours of
    :py:class:`mmap.mmap`, unlike the Unix-only ``prot`` argument.
    """
    import mmap
    # length=0 maps the whole file
    return memoryview(mmap.mmap(fd.fileno(), length=0, access=mmap.ACCESS_READ))
1300 if not set(value) <= DECIMALS:
1301 raise ValueError("non-pure integer")
def fractions2float(fractions_raw):
    """Convert textual fractional digits to a float below one

    :param fractions_raw: digits following the decimal point. They are
                          passed through ``pureint`` before conversion
    :returns: ``float`` built from ``"0." + fractions_raw``
    """
    pureint(fractions_raw)
    as_decimal = "0." + fractions_raw
    return float(as_decimal)
def get_def_by_path(defines_by_path, sub_decode_path):
    """Get define by decode path

    :param defines_by_path: iterable of ``(path, define)`` pairs
    :param sub_decode_path: decode path to look for
    :returns: ``define`` of the first pair whose ``path`` has the same
              length as ``sub_decode_path`` and agrees with it element
              by element, ``None`` if there is no such pair. The builtin
              ``any`` inside ``path`` matches every element
    """
    for candidate, define in defines_by_path:
        if len(candidate) != len(sub_decode_path):
            continue
        # Stop at the first position that is neither a wildcard nor equal
        mismatched = next(
            (
                True
                for wanted, got in zip(candidate, sub_decode_path)
                if (wanted is not any) and (wanted != got)
            ),
            False,
        )
        if not mismatched:
            return define
    return None
1323 ########################################################################
1325 ########################################################################
1327 class ASN1Error(ValueError):
1331 class DecodeError(ASN1Error):
1332 def __init__(self, msg="", klass=None, decode_path=(), offset=0):
1334 :param str msg: reason of decode failing
1335 :param klass: optional exact DecodeError inherited class (like
1336 :py:exc:`NotEnoughData`, :py:exc:`TagMismatch`,
1337 :py:exc:`InvalidLength`)
1338 :param decode_path: tuple of strings. It contains human
1339 readable names of the fields through which
1340 decoding process has passed
1341 :param int offset: binary offset where failure happened
1346 self.decode_path = decode_path
1347 self.offset = offset
1352 "" if self.klass is None else self.klass.__name__,
1354 ("(%s)" % ":".join(str(dp) for dp in self.decode_path))
1355 if len(self.decode_path) > 0 else ""
1357 ("(at %d)" % self.offset) if self.offset > 0 else "",
1363 return "%s(%s)" % (self.__class__.__name__, self)
1366 class NotEnoughData(DecodeError):
class ExceedingData(ASN1Error):
    """Some bytes are left after the decoded data

    ``nbytes`` holds the quantity of those trailing bytes.
    """
    def __init__(self, nbytes):
        super().__init__()
        self.nbytes = nbytes

    def __str__(self):
        trailing = self.nbytes
        return "%d trailing bytes" % trailing

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "%s(%s)" % (klass_name, self)
1382 class LenIndefForm(DecodeError):
1386 class TagMismatch(DecodeError):
1390 class InvalidLength(DecodeError):
1394 class InvalidOID(DecodeError):
class ObjUnknown(ASN1Error):
    """Error holding the name of the object that is unknown
    """
    def __init__(self, name):
        super().__init__()
        self.name = name

    def __str__(self):
        template = "object is unknown: %s"
        return template % self.name

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "%s(%s)" % (klass_name, self)
class ObjNotReady(ASN1Error):
    """Error holding the name of the object that is not ready
    """
    def __init__(self, name):
        super().__init__()
        self.name = name

    def __str__(self):
        template = "object is not ready: %s"
        return template % self.name

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "%s(%s)" % (klass_name, self)
class InvalidValueType(ASN1Error):
    """Value of unexpected type has been met

    ``expected_types`` is the iterable of the types that are acceptable.
    """
    def __init__(self, expected_types):
        super().__init__()
        self.expected_types = expected_types

    def __str__(self):
        expected = ", ".join(map(repr, self.expected_types))
        return "invalid value type, expected: %s" % expected

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "%s(%s)" % (klass_name, self)
class BoundsError(ASN1Error):
    """Value does not fit into the bounds

    Keeps the offending ``value`` together with ``bound_min`` and
    ``bound_max`` it was checked against.
    """
    def __init__(self, bound_min, value, bound_max):
        super().__init__()
        self.bound_min = bound_min
        self.value = value
        self.bound_max = bound_max

    def __str__(self):
        parts = (self.bound_min, self.value, self.bound_max)
        return "unsatisfied bounds: %s <= %s <= %s" % parts

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "%s(%s)" % (klass_name, self)
1454 ########################################################################
1456 ########################################################################
1459 """Hexadecimal string to binary data convert
1461 return bytes.fromhex(data)
1465 """Binary data to hexadecimal string convert
def int_bytes_len(num, byte_len=8):
    """How many ``byte_len``-bit groups are needed to hold the integer

    :param int num: integer to measure
    :param int byte_len: quantity of bits in one group
    :returns: number of groups, at least 1 (zero still takes one)
    """
    bits = num.bit_length()
    if bits == 0:
        # only zero has an empty bit representation
        return 1
    # ceiling division in pure integers
    return (bits + byte_len - 1) // byte_len
def zero_ended_encode(num):
    """Encode integer as 7-bit groups, most significant group first

    Every octet except the last one has its high bit set, the last one
    has it cleared.

    :param int num: integer to encode
    :returns: encoded bytes
    """
    size = int_bytes_len(num, 7)
    octets = bytearray(size)
    pos = size - 1
    # the final octet carries no continuation bit
    octets[pos] = num & 0x7F
    num >>= 7
    while num > 0:
        pos -= 1
        octets[pos] = 0x80 | (num & 0x7F)
        num >>= 7
    return bytes(octets)
1489 int2byte = struct_Struct(">B").pack
def tag_encode(num, klass=TagClassUniversal, form=TagFormPrimitive):
    """Encode tag to binary form

    :param int num: tag's number
    :param int klass: tag's class (:py:data:`pyderasn.TagClassUniversal`,
                      :py:data:`pyderasn.TagClassContext`,
                      :py:data:`pyderasn.TagClassApplication`,
                      :py:data:`pyderasn.TagClassPrivate`)
    :param int form: tag's form (:py:data:`pyderasn.TagFormPrimitive`,
                     :py:data:`pyderasn.TagFormConstructed`)
    :returns: encoded tag bytes
    """
    leading_bits = klass | form
    if num >= 31:
        # [XX|X|11111][1.......][1.......] ... [0.......]
        return b"".join((int2byte(leading_bits | 31), zero_ended_encode(num)))
    # [XX|X|.....]
    return int2byte(leading_bits | num)
def tag_decode(tag):
    """Decode tag from binary form

    .. warning::

       No validation is performed, assuming that it has already passed.

    :returns: ``(klass, form, num)`` tuple of three integers, the same
              ones :py:func:`pyderasn.tag_encode` accepts
    """
    head = tag[0]
    klass, form, short_num = head & 0xC0, head & 0x20, head & 0x1F
    if short_num < 0x1F:
        return (klass, form, short_num)
    # long form: the number is spread over 7-bit groups of the next octets
    num = 0
    for octet in tag[1:]:
        num = (num << 7) | (octet & 0x7F)
    return (klass, form, num)
1533 """Create CONTEXT PRIMITIVE tag
1535 return tag_encode(num=num, klass=TagClassContext, form=TagFormPrimitive)
1539 """Create CONTEXT CONSTRUCTED tag
1541 return tag_encode(num=num, klass=TagClassContext, form=TagFormConstructed)
def tag_strip(data):
    """Take off tag from the data

    :returns: (encoded tag, tag length, remaining data)
    :raises NotEnoughData: if ``data`` is empty
    :raises DecodeError: if long form tag is unfinished, should have
                         been short form one, or starts with zero group
    """
    if len(data) == 0:
        raise NotEnoughData("no data at all")
    if data[0] & 0x1F < 31:
        return data[:1], 1, data[1:]
    # long form: look for the first octet without continuation bit
    for last in range(1, len(data)):
        if data[last] & 0x80 == 0:
            break
    else:
        raise DecodeError("unfinished tag")
    if last == 1 and data[1] < 0x1F:
        raise DecodeError("unexpected long form")
    if last > 1 and data[1] & 0x7F == 0:
        raise DecodeError("leading zero byte in tag value")
    tlen = last + 1
    return data[:tlen], tlen, data[tlen:]
def len_encode(l):
    """Encode length to binary form

    :param int l: length to encode
    :returns: single octet for lengths below 128, otherwise octet with
              high bit set and the quantity of following octets,
              followed by big-endian representation of the length
    """
    if l < 0x80:
        return int2byte(l)
    body = l.to_bytes(int_bytes_len(l), "big")
    octets = bytearray(1)
    octets[0] = 0x80 | len(body)
    octets += body
    return bytes(octets)
def len_decode(data):
    """Decode length

    :returns: (decoded length, length's length, remaining data)
    :raises LenIndefForm: if indefinite form encoding is met
    :raises NotEnoughData: if ``data`` is empty or shorter than the
                           encoded length requires
    :raises DecodeError: if long form has leading zero octet or is used
                         for the value fitting in the short form
    """
    if len(data) == 0:
        raise NotEnoughData("no data at all")
    head = data[0]
    if head & 0x80 == 0:
        # short form: the octet itself is the length
        return head, 1, data[1:]
    octets_num = head & 0x7F
    llen = 1 + octets_num
    if llen > len(data):
        raise NotEnoughData("encoded length is longer than data")
    if octets_num == 0:
        raise LenIndefForm()
    if data[1] == 0:
        raise DecodeError("leading zeros")
    l = int.from_bytes(data[1:llen], "big")
    if l <= 127:
        raise DecodeError("long form instead of short one")
    return l, llen, data[llen:]
1605 LEN0 = len_encode(0)
1606 LEN1 = len_encode(1)
1607 LEN1K = len_encode(1000)
def len_size(l):
    """How many bytes length field will take

    :param int l: length value
    :returns: 1 for lengths below 128, otherwise one leading octet plus
              the quantity of octets needed for the value
    :raises OverflowError: if the length does not fit in 7 octets
    """
    if l < 128:
        return 1
    for value_octets in range(1, 8):
        if l < (1 << (8 * value_octets)):
            return value_octets + 1
    raise OverflowError("too big length")
def write_full(writer, data):
    """Fully write provided data

    :param writer: must comply with ``io.RawIOBase.write`` behaviour
    :param data: bytes-like object to write

    A single ``writer`` call is not guaranteed to consume the whole
    data. This function keeps calling it with the remaining part until
    everything is written, raising an error if ``writer`` returns None.

    :raises ValueError: if ``writer`` returns None
    """
    view = memoryview(data)
    total = len(view)
    done = 0
    while done != total:
        count = writer(view[done:])
        if count is None:
            raise ValueError("can not write to buf")
        done += count
1650 # If it is 64-bit system, then use compact 64-bit array of unsigned
1651 # longs. Use an ordinary list with universal integers otherwise, that
1653 if sys_maxsize > 2 ** 32:
1654 def state_2pass_new():
1657 def state_2pass_new():
1661 ########################################################################
1663 ########################################################################
class AutoAddSlots(type):
    """Metaclass giving ``__slots__`` to every class it creates

    If the class body does not declare ``__slots__``, then an empty
    tuple is used.
    """
    def __new__(cls, name, bases, _dict):
        _dict.setdefault("__slots__", ())
        return super().__new__(cls, name, bases, _dict)
1671 BasicState = namedtuple("BasicState", (
1684 ), **NAMEDTUPLE_KWARGS)
1687 class Obj(metaclass=AutoAddSlots):
1688 """Common ASN.1 object class
1690 All ASN.1 types are inherited from it. It has metaclass that
1691 automatically adds ``__slots__`` to all inherited classes.
1716 self.tag = getattr(self, "impl", self.tag_default) if impl is None else impl
1717 self._expl = getattr(self, "expl", None) if expl is None else expl
1718 if self.tag != self.tag_default and self._expl is not None:
1719 raise ValueError("implicit and explicit tags can not be set simultaneously")
1720 if self.tag is None:
1721 self._tag_order = None
1723 tag_class, _, tag_num = tag_decode(
1724 self.tag if self._expl is None else self._expl
1726 self._tag_order = (tag_class, tag_num)
1727 if default is not None:
1729 self.optional = optional
1730 self.offset, self.llen, self.vlen = _decoded
1732 self.expl_lenindef = False
1733 self.lenindef = False
1734 self.ber_encoded = False
1737 def ready(self): # pragma: no cover
1738 """Is object ready to be encoded?
1740 raise NotImplementedError()
1742 def _assert_ready(self):
1744 raise ObjNotReady(self.__class__.__name__)
1748 """Is either object or any elements inside is BER encoded?
1750 return self.expl_lenindef or self.lenindef or self.ber_encoded
1754 """Is object decoded?
1756 return (self.llen + self.vlen) > 0
1758 def __getstate__(self): # pragma: no cover
1759 """Used for making safe to be mutable pickleable copies
1761 raise NotImplementedError()
1763 def __setstate__(self, state):
1764 if state.version != __version__:
1765 raise ValueError("data is pickled by different PyDERASN version")
1766 self.tag = state.tag
1767 self._tag_order = state.tag_order
1768 self._expl = state.expl
1769 self.default = state.default
1770 self.optional = state.optional
1771 self.offset = state.offset
1772 self.llen = state.llen
1773 self.vlen = state.vlen
1774 self.expl_lenindef = state.expl_lenindef
1775 self.lenindef = state.lenindef
1776 self.ber_encoded = state.ber_encoded
1779 def tag_order(self):
1780 """Tag's (class, number) used for DER/CER sorting
1782 return self._tag_order
1785 def tag_order_cer(self):
1786 return self.tag_order
1790 """.. seealso:: :ref:`decoding`
1792 return len(self.tag)
1796 """.. seealso:: :ref:`decoding`
1798 return self.tlen + self.llen + self.vlen
1800 def __str__(self): # pragma: no cover
1801 return self.__unicode__()
1803 def __ne__(self, their):
1804 return not(self == their)
1806 def __gt__(self, their): # pragma: no cover
1807 return not(self < their)
1809 def __le__(self, their): # pragma: no cover
1810 return (self == their) or (self < their)
1812 def __ge__(self, their): # pragma: no cover
1813 return (self == their) or (self > their)
1815 def _encode(self): # pragma: no cover
1816 raise NotImplementedError()
1818 def _encode_cer(self, writer):
1819 write_full(writer, self._encode())
1821 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
1822 yield NotImplemented
1824 def _encode1st(self, state):
1825 raise NotImplementedError()
1827 def _encode2nd(self, writer, state_iter):
1828 raise NotImplementedError()
1831 """DER encode the structure
1833 :returns: DER representation
1835 raw = self._encode()
1836 if self._expl is None:
1838 return b"".join((self._expl, len_encode(len(raw)), raw))
1840 def encode1st(self, state=None):
1841 """Do the 1st pass of 2-pass encoding
1843 :rtype: (int, array("L"))
1844 :returns: full length of encoded data and precalculated various
1848 state = state_2pass_new()
1849 if self._expl is None:
1850 return self._encode1st(state)
1852 idx = len(state) - 1
1853 vlen, _ = self._encode1st(state)
1855 fulllen = len(self._expl) + len_size(vlen) + vlen
1856 return fulllen, state
1858 def encode2nd(self, writer, state_iter):
1859 """Do the 2nd pass of 2-pass encoding
1861 :param writer: must comply with ``io.RawIOBase.write`` behaviour
1862 :param state_iter: iterator over the 1st pass state (``iter(state)``)
1864 if self._expl is None:
1865 self._encode2nd(writer, state_iter)
1867 write_full(writer, self._expl + len_encode(next(state_iter)))
1868 self._encode2nd(writer, state_iter)
1870 def encode_cer(self, writer):
1871 """CER encode the structure to specified writer
1873 :param writer: must comply with ``io.RawIOBase.write``
1874 behaviour. It takes slice to be written and
1875 returns number of bytes processed. If it returns
1876 None, then exception will be raised
1878 if self._expl is not None:
1879 write_full(writer, self._expl + LENINDEF)
1880 if getattr(self, "der_forced", False):
1881 write_full(writer, self._encode())
1883 self._encode_cer(writer)
1884 if self._expl is not None:
1885 write_full(writer, EOC)
1887 def hexencode(self):
1888 """Do hexadecimal encoded :py:meth:`pyderasn.Obj.encode`
1890 return hexenc(self.encode())
1900 _ctx_immutable=True,
1904 :param data: either binary or memoryview
1905 :param int offset: initial data's offset
1906 :param bool leavemm: do we need to leave memoryview of remaining
1907 data as is, or convert it to bytes otherwise
1908 :param decode_path: current decode path (tuples of strings,
1909 possibly with DecodePathDefBy) which will be
1910 the root for all underlying objects
1911 :param ctx: optional :ref:`context <ctx>` governing decoding process
1912 :param bool tag_only: decode only the tag, without length and
1913 contents (used only in Choice and Set
1914 structures, trying to determine if tag satisfies
1916 :param bool _ctx_immutable: do we need to ``copy.copy()`` ``ctx``
1918 :returns: (Obj, remaining data)
1920 .. seealso:: :ref:`decoding`
1922 result = next(self.decode_evgen(
1934 _, obj, tail = result
1945 _ctx_immutable=True,
1948 """Decode with evgen mode on
1950 That method is identical to :py:meth:`pyderasn.Obj.decode`, but
1951 it returns the generator producing ``(decode_path, obj, tail)``
1953 .. seealso:: :ref:`evgen mode <evgen_mode>`.
1957 elif _ctx_immutable:
1959 tlv = memoryview(data)
1962 get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path) is not None
1965 if self._expl is None:
1966 for result in self._decode(
1969 decode_path=decode_path,
1972 evgen_mode=_evgen_mode,
1977 _decode_path, obj, tail = result
1978 if _decode_path is not decode_path:
1982 t, tlen, lv = tag_strip(tlv)
1983 except DecodeError as err:
1984 raise err.__class__(
1986 klass=self.__class__,
1987 decode_path=decode_path,
1992 klass=self.__class__,
1993 decode_path=decode_path,
1997 l, llen, v = len_decode(lv)
1998 except LenIndefForm as err:
1999 if not ctx.get("bered", False):
2000 raise err.__class__(
2002 klass=self.__class__,
2003 decode_path=decode_path,
2007 offset += tlen + llen
2008 for result in self._decode(
2011 decode_path=decode_path,
2014 evgen_mode=_evgen_mode,
2016 if tag_only: # pragma: no cover
2019 _decode_path, obj, tail = result
2020 if _decode_path is not decode_path:
2022 eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
2023 if eoc_expected.tobytes() != EOC:
2026 klass=self.__class__,
2027 decode_path=decode_path,
2031 obj.expl_lenindef = True
2032 except DecodeError as err:
2033 raise err.__class__(
2035 klass=self.__class__,
2036 decode_path=decode_path,
2041 raise NotEnoughData(
2042 "encoded length is longer than data",
2043 klass=self.__class__,
2044 decode_path=decode_path,
2047 for result in self._decode(
2049 offset=offset + tlen + llen,
2050 decode_path=decode_path,
2053 evgen_mode=_evgen_mode,
2055 if tag_only: # pragma: no cover
2058 _decode_path, obj, tail = result
2059 if _decode_path is not decode_path:
2061 if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
2063 "explicit tag out-of-bound, longer than data",
2064 klass=self.__class__,
2065 decode_path=decode_path,
2068 yield decode_path, obj, (tail if leavemm else tail.tobytes())
2070 def decod(self, data, offset=0, decode_path=(), ctx=None):
2071 """Decode the data, check that tail is empty
2073 :raises ExceedingData: if tail is not empty
2075 This is just a wrapper over :py:meth:`pyderasn.Obj.decode`
2076 (decode without tail) that also checks that there is no
2079 obj, tail = self.decode(
2082 decode_path=decode_path,
2087 raise ExceedingData(len(tail))
2090 def hexdecode(self, data, *args, **kwargs):
2091 """Do :py:meth:`pyderasn.Obj.decode` with hexadecimal decoded data
2093 return self.decode(hexdec(data), *args, **kwargs)
2095 def hexdecod(self, data, *args, **kwargs):
2096 """Do :py:meth:`pyderasn.Obj.decod` with hexadecimal decoded data
2098 return self.decod(hexdec(data), *args, **kwargs)
2102 """.. seealso:: :ref:`decoding`
2104 return self._expl is not None
2108 """.. seealso:: :ref:`decoding`
2113 def expl_tlen(self):
2114 """.. seealso:: :ref:`decoding`
2116 return len(self._expl)
2119 def expl_llen(self):
2120 """.. seealso:: :ref:`decoding`
2122 if self.expl_lenindef:
2124 return len(len_encode(self.tlvlen))
2127 def expl_offset(self):
2128 """.. seealso:: :ref:`decoding`
2130 return self.offset - self.expl_tlen - self.expl_llen
2133 def expl_vlen(self):
2134 """.. seealso:: :ref:`decoding`
2139 def expl_tlvlen(self):
2140 """.. seealso:: :ref:`decoding`
2142 return self.expl_tlen + self.expl_llen + self.expl_vlen
2145 def fulloffset(self):
2146 """.. seealso:: :ref:`decoding`
2148 return self.expl_offset if self.expled else self.offset
2152 """.. seealso:: :ref:`decoding`
2154 return self.expl_tlvlen if self.expled else self.tlvlen
2156 def pps_lenindef(self, decode_path):
2157 if self.lenindef and not (
2158 getattr(self, "defined", None) is not None and
2159 self.defined[1].lenindef
2162 asn1_type_name="EOC",
2164 decode_path=decode_path,
2166 self.offset + self.tlvlen -
2167 (EOC_LEN * 2 if self.expl_lenindef else EOC_LEN)
2175 if self.expl_lenindef:
2177 asn1_type_name="EOC",
2178 obj_name="EXPLICIT",
2179 decode_path=decode_path,
2180 offset=self.expl_offset + self.expl_tlvlen - EOC_LEN,
def encode_cer(obj):
    """Encode to CER in memory buffer

    :param obj: object providing ``encode_cer(writer)`` method
    :returns bytes: memory buffer contents
    """
    with BytesIO() as sink:
        obj.encode_cer(sink.write)
        return sink.getvalue()
def encode2pass(obj):
    """Encode (2-pass mode) to DER in memory buffer

    :param obj: object providing ``encode1st()`` and
                ``encode2nd(writer, state_iter)`` methods
    :returns bytes: memory buffer contents
    """
    with BytesIO() as sink:
        # only the state of the 1st pass is needed, not the full length
        state = obj.encode1st()[1]
        obj.encode2nd(sink.write, iter(state))
        return sink.getvalue()
class DecodePathDefBy:
    """DEFINED BY representation inside decode path

    Wraps ``defined_by`` value. Two instances are equal when their
    ``defined_by`` values are equal, comparison with objects of other
    classes is always unequal.
    """
    __slots__ = ("defined_by",)

    def __init__(self, defined_by):
        self.defined_by = defined_by

    def __eq__(self, their):
        if isinstance(their, self.__class__):
            return self.defined_by == their.defined_by
        return False

    def __ne__(self, their):
        return not (self == their)

    def __str__(self):
        return "".join(("DEFINED BY ", str(self.defined_by)))

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "<%s: %s>" % (klass_name, self.defined_by)
2233 ########################################################################
2235 ########################################################################
2237 PP = namedtuple("PP", (
2260 ), **NAMEDTUPLE_KWARGS)
2265 asn1_type_name="unknown",
2282 expl_lenindef=False,
2313 def _colourize(what, colour, with_colours, attrs=("bold",)):
2314 return colored(what, colour, attrs=attrs) if with_colours else what
def colonize_hex(hexed):
    """Separate hexadecimal string with colons

    :param str hexed: hexadecimal string
    :returns: two-character groups of ``hexed`` joined with ``:``
    """
    pairs = []
    for start in range(0, len(hexed), 2):
        pairs.append(hexed[start:start + 2])
    return ":".join(pairs)
def find_oid_name(asn1_type_name, oid_maps, value):
    """Look for the human readable name of the value

    :param asn1_type_name: ASN.1 type name the value belongs to
    :param oid_maps: sequence of dictionaries queried with ``value``
    :param value: key to look for in each of ``oid_maps``
    :returns: the first non-``None`` name found. ``None`` if there are
              no maps, if ``asn1_type_name`` is not the
              ``ObjectIdentifier.asn1_type_name`` one, or if no map
              knows the value
    """
    if len(oid_maps) == 0:
        return None
    if asn1_type_name != ObjectIdentifier.asn1_type_name:
        return None
    for candidates in oid_maps:
        found = candidates.get(value)
        if found is not None:
            return found
    return None
2338 with_decode_path=False,
2339 decode_path_len_decrease=0,
2346 " " if pp.expl_offset is None else
2347 ("-%d" % (pp.offset - pp.expl_offset))
2349 LENINDEF_PP_CHAR if pp.expl_lenindef else " ",
2351 col = _colourize(col, "red", with_colours, ())
2352 col += _colourize("B", "red", with_colours) if pp.bered else " "
2354 col = "[%d,%d,%4d]%s" % (
2355 pp.tlen, pp.llen, pp.vlen,
2356 LENINDEF_PP_CHAR if pp.lenindef else " "
2358 col = _colourize(col, "green", with_colours, ())
2360 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2361 if decode_path_len > 0:
2362 cols.append(" ." * decode_path_len)
2363 ent = pp.decode_path[-1]
2364 if isinstance(ent, DecodePathDefBy):
2365 cols.append(_colourize("DEFINED BY", "red", with_colours, ("reverse",)))
2366 value = str(ent.defined_by)
2367 oid_name = find_oid_name(ent.defined_by.asn1_type_name, oid_maps, value)
2368 if oid_name is None:
2369 cols.append(_colourize("%s:" % value, "white", with_colours, ("reverse",)))
2371 cols.append(_colourize("%s:" % oid_name, "green", with_colours))
2373 cols.append(_colourize("%s:" % ent, "yellow", with_colours, ("reverse",)))
2374 if pp.expl is not None:
2375 klass, _, num = pp.expl
2376 col = "[%s%d] EXPLICIT" % (TagClassReprs[klass], num)
2377 cols.append(_colourize(col, "blue", with_colours))
2378 if pp.impl is not None:
2379 klass, _, num = pp.impl
2380 col = "[%s%d]" % (TagClassReprs[klass], num)
2381 cols.append(_colourize(col, "blue", with_colours))
2382 if pp.asn1_type_name.replace(" ", "") != pp.obj_name.upper():
2383 cols.append(_colourize(pp.obj_name, "magenta", with_colours))
2385 cols.append(_colourize("BER", "red", with_colours))
2386 cols.append(_colourize(pp.asn1_type_name, "cyan", with_colours))
2387 if pp.value is not None:
2389 cols.append(_colourize(value, "white", with_colours, ("reverse",)))
2390 oid_name = find_oid_name(pp.asn1_type_name, oid_maps, pp.value)
2391 if oid_name is not None:
2392 cols.append(_colourize("(%s)" % oid_name, "green", with_colours))
2393 if pp.asn1_type_name == Integer.asn1_type_name:
2394 cols.append(_colourize(
2395 "(%s)" % colonize_hex(pp.obj.tohex()), "green", with_colours,
2398 if pp.blob.__class__ == bytes:
2399 cols.append(hexenc(pp.blob))
2400 elif pp.blob.__class__ == tuple:
2401 cols.append(", ".join(pp.blob))
2403 cols.append(_colourize("OPTIONAL", "red", with_colours))
2405 cols.append(_colourize("DEFAULT", "red", with_colours))
2406 if with_decode_path:
2407 cols.append(_colourize(
2408 "[%s]" % ":".join(str(p) for p in pp.decode_path),
2412 return " ".join(cols)
2415 def pp_console_blob(pp, decode_path_len_decrease=0):
2416 cols = [" " * len("XXXXXYYZZ [X,X,XXXX]Z")]
2417 decode_path_len = len(pp.decode_path) - decode_path_len_decrease
2418 if decode_path_len > 0:
2419 cols.append(" ." * (decode_path_len + 1))
2420 if pp.blob.__class__ == bytes:
2421 blob = hexenc(pp.blob).upper()
2422 for i in range(0, len(blob), 32):
2423 chunk = blob[i:i + 32]
2424 yield " ".join(cols + [colonize_hex(chunk)])
2425 elif pp.blob.__class__ == tuple:
2426 yield " ".join(cols + [", ".join(pp.blob)])
2434 with_decode_path=False,
2435 decode_path_only=(),
2438 """Pretty print object
2440 :param Obj obj: object you want to pretty print
2441 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
2442 Its human readable form is printed when OID is met
2443 :param big_blobs: if large binary objects are met (like OctetString
2444 values), do we need to print them too, on separate
2446 :param with_colours: colourize output, if ``termcolor`` library
2448 :param with_decode_path: print decode path
2449 :param decode_path_only: print only that specified decode path
2451 def _pprint_pps(pps):
2453 if hasattr(pp, "_fields"):
2455 decode_path_only != () and
2457 str(p) for p in pp.decode_path[:len(decode_path_only)]
2458 ) != decode_path_only
2462 yield pp_console_row(
2467 with_colours=with_colours,
2468 with_decode_path=with_decode_path,
2469 decode_path_len_decrease=len(decode_path_only),
2471 for row in pp_console_blob(
2473 decode_path_len_decrease=len(decode_path_only),
2477 yield pp_console_row(
2482 with_colours=with_colours,
2483 with_decode_path=with_decode_path,
2484 decode_path_len_decrease=len(decode_path_only),
2487 for row in _pprint_pps(pp):
2489 return "\n".join(_pprint_pps(obj.pps(decode_path)))
2492 ########################################################################
2493 # ASN.1 primitive types
2494 ########################################################################
2496 BooleanState = namedtuple(
2498 BasicState._fields + ("value",),
2504 """``BOOLEAN`` boolean type
2506 >>> b = Boolean(True)
2508 >>> b == Boolean(True)
2514 tag_default = tag_encode(1)
2515 asn1_type_name = "BOOLEAN"
2527 :param value: set the value. Either boolean type, or
2528 :py:class:`pyderasn.Boolean` object
2529 :param bytes impl: override default tag with ``IMPLICIT`` one
2530 :param bytes expl: override default tag with ``EXPLICIT`` one
2531 :param default: set default value. Type same as in ``value``
2532 :param bool optional: is object ``OPTIONAL`` in sequence
2534 super().__init__(impl, expl, default, optional, _decoded)
2535 self._value = None if value is None else self._value_sanitize(value)
2536 if default is not None:
2537 default = self._value_sanitize(default)
2538 self.default = self.__class__(
2544 self._value = default
2546 def _value_sanitize(self, value):
2547 if value.__class__ == bool:
2549 if issubclass(value.__class__, Boolean):
2551 raise InvalidValueType((self.__class__, bool))
2555 return self._value is not None
2557 def __getstate__(self):
2558 return BooleanState(
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The state is first handed over to the parent's ``__setstate__``,
    then the value is taken from it.

    :param state: state object having ``value`` attribute
    """
    super().__setstate__(state)
    restored_value = state.value
    self._value = restored_value
2578 def __nonzero__(self):
2579 self._assert_ready()
2583 self._assert_ready()
2586 def __eq__(self, their):
2587 if their.__class__ == bool:
2588 return self._value == their
2589 if not issubclass(their.__class__, Boolean):
2592 self._value == their._value and
2593 self.tag == their.tag and
2594 self._expl == their._expl
2605 return self.__class__(
2607 impl=self.tag if impl is None else impl,
2608 expl=self._expl if expl is None else expl,
2609 default=self.default if default is None else default,
2610 optional=self.optional if optional is None else optional,
2614 self._assert_ready()
2615 return b"".join((self.tag, LEN1, (b"\xFF" if self._value else b"\x00")))
2617 def _encode1st(self, state):
2618 return len(self.tag) + 2, state
def _encode2nd(self, writer, state_iter):
    """Write the encoded object to the writer

    :param writer: passed to ``write_full`` together with the result
                   of ``_encode()``
    :param state_iter: not used by this method
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
2624 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2626 t, _, lv = tag_strip(tlv)
2627 except DecodeError as err:
2628 raise err.__class__(
2630 klass=self.__class__,
2631 decode_path=decode_path,
2636 klass=self.__class__,
2637 decode_path=decode_path,
2644 l, _, v = len_decode(lv)
2645 except DecodeError as err:
2646 raise err.__class__(
2648 klass=self.__class__,
2649 decode_path=decode_path,
2653 raise InvalidLength(
2654 "Boolean's length must be equal to 1",
2655 klass=self.__class__,
2656 decode_path=decode_path,
2660 raise NotEnoughData(
2661 "encoded length is longer than data",
2662 klass=self.__class__,
2663 decode_path=decode_path,
2668 if first_octet == 0:
2670 elif first_octet == 0xFF:
2672 elif ctx.get("bered", False):
2677 "unacceptable Boolean value",
2678 klass=self.__class__,
2679 decode_path=decode_path,
2682 obj = self.__class__(
2686 default=self.default,
2687 optional=self.optional,
2688 _decoded=(offset, 1, 1),
2690 obj.ber_encoded = ber_encoded
2691 yield decode_path, obj, v[1:]
2694 return pp_console_row(next(self.pps()))
2696 def pps(self, decode_path=()):
2699 asn1_type_name=self.asn1_type_name,
2700 obj_name=self.__class__.__name__,
2701 decode_path=decode_path,
2702 value=str(self._value) if self.ready else None,
2703 optional=self.optional,
2704 default=self == self.default,
2705 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
2706 expl=None if self._expl is None else tag_decode(self._expl),
2711 expl_offset=self.expl_offset if self.expled else None,
2712 expl_tlen=self.expl_tlen if self.expled else None,
2713 expl_llen=self.expl_llen if self.expled else None,
2714 expl_vlen=self.expl_vlen if self.expled else None,
2715 expl_lenindef=self.expl_lenindef,
2716 ber_encoded=self.ber_encoded,
2719 for pp in self.pps_lenindef(decode_path):
2723 IntegerState = namedtuple(
2725 BasicState._fields + ("specs", "value", "bound_min", "bound_max"),
2731 """``INTEGER`` integer type
2733 >>> b = Integer(-123)
2735 >>> b == Integer(-123)
2740 >>> Integer(2, bounds=(1, 3))
2742 >>> Integer(5, bounds=(1, 3))
2743 Traceback (most recent call last):
2744 pyderasn.BoundsError: unsatisfied bounds: 1 <= 5 <= 3
2748 class Version(Integer):
2755 >>> v = Version("v1")
2762 {'v3': 2, 'v1': 0, 'v2': 1}
2764 __slots__ = ("specs", "_bound_min", "_bound_max")
2765 tag_default = tag_encode(2)
2766 asn1_type_name = "INTEGER"
2780 :param value: set the value. Either integer type, named value
2781 (if ``schema`` is specified in the class), or
2782 :py:class:`pyderasn.Integer` object
2783 :param bounds: set ``(MIN, MAX)`` value constraint.
2784 (-inf, +inf) by default
2785 :param bytes impl: override default tag with ``IMPLICIT`` one
2786 :param bytes expl: override default tag with ``EXPLICIT`` one
2787 :param default: set default value. Type same as in ``value``
2788 :param bool optional: is object ``OPTIONAL`` in sequence
2790 super().__init__(impl, expl, default, optional, _decoded)
2792 specs = getattr(self, "schema", {}) if _specs is None else _specs
2793 self.specs = specs if specs.__class__ == dict else dict(specs)
2794 self._bound_min, self._bound_max = getattr(
2797 (float("-inf"), float("+inf")),
2798 ) if bounds is None else bounds
2799 if value is not None:
2800 self._value = self._value_sanitize(value)
2801 if default is not None:
2802 default = self._value_sanitize(default)
2803 self.default = self.__class__(
2809 if self._value is None:
2810 self._value = default
2812 def _value_sanitize(self, value):
2813 if isinstance(value, int):
2815 elif issubclass(value.__class__, Integer):
2816 value = value._value
2817 elif value.__class__ == str:
2818 value = self.specs.get(value)
2820 raise ObjUnknown("integer value: %s" % value)
2822 raise InvalidValueType((self.__class__, int, str))
2823 if not self._bound_min <= value <= self._bound_max:
2824 raise BoundsError(self._bound_min, value, self._bound_max)
2829 return self._value is not None
2831 def __getstate__(self):
2832 return IntegerState(
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The state is first handed over to the parent's ``__setstate__``,
    then named values, value itself and both bounds are taken from it.

    :param state: state object having ``specs``, ``value``,
                  ``bound_min`` and ``bound_max`` attributes
    """
    super().__setstate__(state)
    # (attribute of the object, field of the state), applied in order
    restored = (
        ("specs", "specs"),
        ("_value", "value"),
        ("_bound_min", "bound_min"),
        ("_bound_max", "bound_max"),
    )
    for attr_name, field_name in restored:
        setattr(self, attr_name, getattr(state, field_name))
2859 self._assert_ready()
2860 return int(self._value)
2863 """Hexadecimal representation
2865 Use :py:func:`pyderasn.colonize_hex` for colonizing it.
2867 hex_repr = hex(int(self))[2:].upper()
2868 if len(hex_repr) % 2 != 0:
2869 hex_repr = "0" + hex_repr
2873 self._assert_ready()
2874 return hash(b"".join((
2876 bytes(self._expl or b""),
2877 str(self._value).encode("ascii"),
2880 def __eq__(self, their):
2881 if isinstance(their, int):
2882 return self._value == their
2883 if not issubclass(their.__class__, Integer):
2886 self._value == their._value and
2887 self.tag == their.tag and
2888 self._expl == their._expl
2891 def __lt__(self, their):
2892 return self._value < their._value
2896 """Return named representation (if exists) of the value
2898 for name, value in self.specs.items():
2899 if value == self._value:
2912 return self.__class__(
2915 (self._bound_min, self._bound_max)
2916 if bounds is None else bounds
2918 impl=self.tag if impl is None else impl,
2919 expl=self._expl if expl is None else expl,
2920 default=self.default if default is None else default,
2921 optional=self.optional if optional is None else optional,
2925 def _encode_payload(self):
2926 self._assert_ready()
2928 bytes_len = ceil(value.bit_length() / 8) or 1
2931 octets = value.to_bytes(bytes_len, byteorder="big", signed=True)
2932 except OverflowError:
2939 octets = self._encode_payload()
2940 return b"".join((self.tag, len_encode(len(octets)), octets))
def _encode1st(self, state):
    """Report the size of the encoded object

    :param state: passed through unchanged
    :returns: ``(length, state)``, where length is the sum of the
              tag's length, ``len_size()`` of the payload's length
              and the payload's length itself
    """
    payload_len = len(self._encode_payload())
    header_len = len(self.tag) + len_size(payload_len)
    return header_len + payload_len, state
def _encode2nd(self, writer, state_iter):
    """Write the encoded object to the writer

    :param writer: passed to ``write_full`` together with the result
                   of ``_encode()``
    :param state_iter: not used by this method
    """
    encoded = self._encode()
    write_full(writer, encoded)
2949 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
2951 t, _, lv = tag_strip(tlv)
2952 except DecodeError as err:
2953 raise err.__class__(
2955 klass=self.__class__,
2956 decode_path=decode_path,
2961 klass=self.__class__,
2962 decode_path=decode_path,
2969 l, llen, v = len_decode(lv)
2970 except DecodeError as err:
2971 raise err.__class__(
2973 klass=self.__class__,
2974 decode_path=decode_path,
2978 raise NotEnoughData(
2979 "encoded length is longer than data",
2980 klass=self.__class__,
2981 decode_path=decode_path,
2985 raise NotEnoughData(
2987 klass=self.__class__,
2988 decode_path=decode_path,
2991 v, tail = v[:l], v[l:]
2996 ((first_octet == 0x00) and (second_octet & 0x80 == 0)) or
2997 ((first_octet == 0xFF) and (second_octet & 0x80 != 0))
3000 "non normalized integer",
3001 klass=self.__class__,
3002 decode_path=decode_path,
3005 value = int.from_bytes(v, byteorder="big", signed=True)
3007 obj = self.__class__(
3009 bounds=(self._bound_min, self._bound_max),
3012 default=self.default,
3013 optional=self.optional,
3015 _decoded=(offset, llen, l),
3017 except BoundsError as err:
3020 klass=self.__class__,
3021 decode_path=decode_path,
3024 yield decode_path, obj, tail
3027 return pp_console_row(next(self.pps()))
3029 def pps(self, decode_path=()):
3032 asn1_type_name=self.asn1_type_name,
3033 obj_name=self.__class__.__name__,
3034 decode_path=decode_path,
3035 value=(self.named or str(self._value)) if self.ready else None,
3036 optional=self.optional,
3037 default=self == self.default,
3038 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3039 expl=None if self._expl is None else tag_decode(self._expl),
3044 expl_offset=self.expl_offset if self.expled else None,
3045 expl_tlen=self.expl_tlen if self.expled else None,
3046 expl_llen=self.expl_llen if self.expled else None,
3047 expl_vlen=self.expl_vlen if self.expled else None,
3048 expl_lenindef=self.expl_lenindef,
3051 for pp in self.pps_lenindef(decode_path):
3055 BitStringState = namedtuple(
3057 BasicState._fields + ("specs", "value", "tag_constructed", "defined"),
3062 class BitString(Obj):
3063 """``BIT STRING`` bit string type
3065 >>> BitString(b"hello world")
3066 BIT STRING 88 bits 68656c6c6f20776f726c64
3069 >>> b == b"hello world"
3074 >>> BitString("'0A3B5F291CD'H")
3075 BIT STRING 44 bits 0a3b5f291cd0
3076 >>> b = BitString("'010110000000'B")
3077 BIT STRING 12 bits 5800
3080 >>> b[0], b[1], b[2], b[3]
3081 (False, True, False, True)
3085 [False, True, False, True, True, False, False, False, False, False, False, False]
3089 class KeyUsage(BitString):
3091 ("digitalSignature", 0),
3092 ("nonRepudiation", 1),
3093 ("keyEncipherment", 2),
3096 >>> b = KeyUsage(("keyEncipherment", "nonRepudiation"))
3097 KeyUsage BIT STRING 3 bits nonRepudiation, keyEncipherment
3099 ['nonRepudiation', 'keyEncipherment']
3101 {'nonRepudiation': 1, 'digitalSignature': 0, 'keyEncipherment': 2}
3105 Pay attention that BIT STRING can be encoded both in primitive
3106 and constructed forms. Decoder always checks constructed form tag
3107 additionally to specified primitive one. If BER decoding is
3108 :ref:`not enabled <bered_ctx>`, then decoder will fail, because
3109 of DER restrictions.
3111 __slots__ = ("tag_constructed", "specs", "defined")
3112 tag_default = tag_encode(3)
3113 asn1_type_name = "BIT STRING"
3126 :param value: set the value. Either binary type, tuple of named
3127 values (if ``schema`` is specified in the class),
3128 string in ``'XXX...'B`` form, or
3129 :py:class:`pyderasn.BitString` object
3130 :param bytes impl: override default tag with ``IMPLICIT`` one
3131 :param bytes expl: override default tag with ``EXPLICIT`` one
3132 :param default: set default value. Type same as in ``value``
3133 :param bool optional: is object ``OPTIONAL`` in sequence
3135 super().__init__(impl, expl, default, optional, _decoded)
3136 specs = getattr(self, "schema", {}) if _specs is None else _specs
3137 self.specs = specs if specs.__class__ == dict else dict(specs)
3138 self._value = None if value is None else self._value_sanitize(value)
3139 if default is not None:
3140 default = self._value_sanitize(default)
3141 self.default = self.__class__(
3147 self._value = default
3149 tag_klass, _, tag_num = tag_decode(self.tag)
3150 self.tag_constructed = tag_encode(
3152 form=TagFormConstructed,
3156 def _bits2octets(self, bits):
3157 if len(self.specs) > 0:
3158 bits = bits.rstrip("0")
3160 bits += "0" * ((8 - (bit_len % 8)) % 8)
3161 octets = bytearray(len(bits) // 8)
3162 for i in range(len(octets)):
3163 octets[i] = int(bits[i * 8:(i * 8) + 8], 2)
3164 return bit_len, bytes(octets)
3166 def _value_sanitize(self, value):
3167 if isinstance(value, (str, bytes)):
3169 isinstance(value, str) and
3170 value.startswith("'")
3172 if value.endswith("'B"):
3174 if not frozenset(value) <= SET01:
3175 raise ValueError("B's coding contains unacceptable chars")
3176 return self._bits2octets(value)
3177 if value.endswith("'H"):
3181 hexdec(value + ("" if len(value) % 2 == 0 else "0")),
3183 if value.__class__ == bytes:
3184 return (len(value) * 8, value)
3185 raise InvalidValueType((self.__class__, str, bytes))
3186 if value.__class__ == tuple:
3189 isinstance(value[0], int) and
3190 value[1].__class__ == bytes
3195 bit = self.specs.get(name)
3197 raise ObjUnknown("BitString value: %s" % name)
3200 return self._bits2octets("")
3201 bits = frozenset(bits)
3202 return self._bits2octets("".join(
3203 ("1" if bit in bits else "0")
3204 for bit in range(max(bits) + 1)
3206 if issubclass(value.__class__, BitString):
3208 raise InvalidValueType((self.__class__, bytes, str))
3212 return self._value is not None
3214 def __getstate__(self):
3215 return BitStringState(
3230 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The state is first handed over to the parent's ``__setstate__``,
    then named bits, value, constructed tag and ``defined`` are taken
    from it.

    :param state: state object having ``specs``, ``value``,
                  ``tag_constructed`` and ``defined`` attributes
    """
    super().__setstate__(state)
    for field_name in ("specs", "value", "tag_constructed", "defined"):
        # Only the value is kept under a different (underscored) name
        attr_name = "_value" if field_name == "value" else field_name
        setattr(self, attr_name, getattr(state, field_name))
3242 self._assert_ready()
3243 for i in range(self._value[0]):
3248 """Returns number of bits in the string
3250 self._assert_ready()
3251 return self._value[0]
3253 def __bytes__(self):
3254 self._assert_ready()
3255 return self._value[1]
3257 def __eq__(self, their):
3258 if their.__class__ == bytes:
3259 return self._value[1] == their
3260 if not issubclass(their.__class__, BitString):
3263 self._value == their._value and
3264 self.tag == their.tag and
3265 self._expl == their._expl
3270 """Named representation (if exists) of the bits
3272 :returns: [str(name), ...]
3274 return [name for name, bit in self.specs.items() if self[bit]]
3284 return self.__class__(
3286 impl=self.tag if impl is None else impl,
3287 expl=self._expl if expl is None else expl,
3288 default=self.default if default is None else default,
3289 optional=self.optional if optional is None else optional,
3293 def __getitem__(self, key):
3294 if key.__class__ == int:
3295 bit_len, octets = self._value
3298 return memoryview(octets)[key // 8] >> (7 - (key % 8)) & 1 == 1
3299 if isinstance(key, str):
3300 value = self.specs.get(key)
3302 raise ObjUnknown("BitString value: %s" % key)
3304 raise InvalidValueType((int, str))
3307 self._assert_ready()
3308 bit_len, octets = self._value
3311 len_encode(len(octets) + 1),
3312 int2byte((8 - bit_len % 8) % 8),
3316 def _encode1st(self, state):
3317 self._assert_ready()
3318 _, octets = self._value
3320 return len(self.tag) + len_size(l) + l, state
3322 def _encode2nd(self, writer, state_iter):
3323 bit_len, octets = self._value
3324 write_full(writer, b"".join((
3326 len_encode(len(octets) + 1),
3327 int2byte((8 - bit_len % 8) % 8),
3329 write_full(writer, octets)
3331 def _encode_cer(self, writer):
3332 bit_len, octets = self._value
3333 if len(octets) + 1 <= 1000:
3334 write_full(writer, self._encode())
3336 write_full(writer, self.tag_constructed)
3337 write_full(writer, LENINDEF)
3338 for offset in range(0, (len(octets) // 999) * 999, 999):
3339 write_full(writer, b"".join((
3340 BitString.tag_default,
3343 octets[offset:offset + 999],
3345 tail = octets[offset + 999:]
3347 tail = int2byte((8 - bit_len % 8) % 8) + tail
3348 write_full(writer, b"".join((
3349 BitString.tag_default,
3350 len_encode(len(tail)),
3353 write_full(writer, EOC)
3355 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3357 t, tlen, lv = tag_strip(tlv)
3358 except DecodeError as err:
3359 raise err.__class__(
3361 klass=self.__class__,
3362 decode_path=decode_path,
3366 if tag_only: # pragma: no cover
3370 l, llen, v = len_decode(lv)
3371 except DecodeError as err:
3372 raise err.__class__(
3374 klass=self.__class__,
3375 decode_path=decode_path,
3379 raise NotEnoughData(
3380 "encoded length is longer than data",
3381 klass=self.__class__,
3382 decode_path=decode_path,
3386 raise NotEnoughData(
3388 klass=self.__class__,
3389 decode_path=decode_path,
3393 if l == 1 and pad_size != 0:
3395 "invalid empty value",
3396 klass=self.__class__,
3397 decode_path=decode_path,
3403 klass=self.__class__,
3404 decode_path=decode_path,
3407 if v[l - 1] & ((1 << pad_size) - 1) != 0:
3410 klass=self.__class__,
3411 decode_path=decode_path,
3414 v, tail = v[:l], v[l:]
3415 bit_len = (len(v) - 1) * 8 - pad_size
3416 obj = self.__class__(
3417 value=None if evgen_mode else (bit_len, v[1:].tobytes()),
3420 default=self.default,
3421 optional=self.optional,
3423 _decoded=(offset, llen, l),
3426 obj._value = (bit_len, None)
3427 yield decode_path, obj, tail
3429 if t != self.tag_constructed:
3431 klass=self.__class__,
3432 decode_path=decode_path,
3435 if not ctx.get("bered", False):
3437 "unallowed BER constructed encoding",
3438 klass=self.__class__,
3439 decode_path=decode_path,
3442 if tag_only: # pragma: no cover
3447 l, llen, v = len_decode(lv)
3448 except LenIndefForm:
3449 llen, l, v = 1, 0, lv[1:]
3451 except DecodeError as err:
3452 raise err.__class__(
3454 klass=self.__class__,
3455 decode_path=decode_path,
3459 raise NotEnoughData(
3460 "encoded length is longer than data",
3461 klass=self.__class__,
3462 decode_path=decode_path,
3465 if not lenindef and l == 0:
3466 raise NotEnoughData(
3468 klass=self.__class__,
3469 decode_path=decode_path,
3473 sub_offset = offset + tlen + llen
3477 if v[:EOC_LEN].tobytes() == EOC:
3484 "chunk out of bounds",
3485 klass=self.__class__,
3486 decode_path=decode_path + (str(len(chunks) - 1),),
3487 offset=chunks[-1].offset,
3489 sub_decode_path = decode_path + (str(len(chunks)),)
3492 for _decode_path, chunk, v_tail in BitString().decode_evgen(
3495 decode_path=sub_decode_path,
3498 _ctx_immutable=False,
3500 yield _decode_path, chunk, v_tail
3502 _, chunk, v_tail = next(BitString().decode_evgen(
3505 decode_path=sub_decode_path,
3508 _ctx_immutable=False,
3513 "expected BitString encoded chunk",
3514 klass=self.__class__,
3515 decode_path=sub_decode_path,
3518 chunks.append(chunk)
3519 sub_offset += chunk.tlvlen
3520 vlen += chunk.tlvlen
3522 if len(chunks) == 0:
3525 klass=self.__class__,
3526 decode_path=decode_path,
3531 for chunk_i, chunk in enumerate(chunks[:-1]):
3532 if chunk.bit_len % 8 != 0:
3534 "BitString chunk is not multiple of 8 bits",
3535 klass=self.__class__,
3536 decode_path=decode_path + (str(chunk_i),),
3537 offset=chunk.offset,
3540 values.append(bytes(chunk))
3541 bit_len += chunk.bit_len
3542 chunk_last = chunks[-1]
3544 values.append(bytes(chunk_last))
3545 bit_len += chunk_last.bit_len
3546 obj = self.__class__(
3547 value=None if evgen_mode else (bit_len, b"".join(values)),
3550 default=self.default,
3551 optional=self.optional,
3553 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
3556 obj._value = (bit_len, None)
3557 obj.lenindef = lenindef
3558 obj.ber_encoded = True
3559 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
3562 return pp_console_row(next(self.pps()))
3564 def pps(self, decode_path=()):
3568 bit_len, blob = self._value
3569 value = "%d bits" % bit_len
3570 if len(self.specs) > 0 and blob is not None:
3571 blob = tuple(self.named)
3574 asn1_type_name=self.asn1_type_name,
3575 obj_name=self.__class__.__name__,
3576 decode_path=decode_path,
3579 optional=self.optional,
3580 default=self == self.default,
3581 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
3582 expl=None if self._expl is None else tag_decode(self._expl),
3587 expl_offset=self.expl_offset if self.expled else None,
3588 expl_tlen=self.expl_tlen if self.expled else None,
3589 expl_llen=self.expl_llen if self.expled else None,
3590 expl_vlen=self.expl_vlen if self.expled else None,
3591 expl_lenindef=self.expl_lenindef,
3592 lenindef=self.lenindef,
3593 ber_encoded=self.ber_encoded,
3596 defined_by, defined = self.defined or (None, None)
3597 if defined_by is not None:
3599 decode_path=decode_path + (DecodePathDefBy(defined_by),)
3601 for pp in self.pps_lenindef(decode_path):
3605 OctetStringState = namedtuple(
3607 BasicState._fields + (
3618 class OctetString(Obj):
3619 """``OCTET STRING`` binary string type
3621 >>> s = OctetString(b"hello world")
3622 OCTET STRING 11 bytes 68656c6c6f20776f726c64
3623 >>> s == OctetString(b"hello world")
3628 >>> OctetString(b"hello", bounds=(4, 4))
3629 Traceback (most recent call last):
3630 pyderasn.BoundsError: unsatisfied bounds: 4 <= 5 <= 4
3631 >>> OctetString(b"hell", bounds=(4, 4))
3632 OCTET STRING 4 bytes 68656c6c
3634 Memoryviews can be used as values. If memoryview is made on
3635 mmap-ed file, then it does not take storage inside OctetString
3636 itself. In CER encoding mode it will be streamed to the specified
3637 writer, copying 1 KB chunks.
3639 __slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
3640 tag_default = tag_encode(4)
3641 asn1_type_name = "OCTET STRING"
3642 evgen_mode_skip_value = True
3643 memoryview_safe = True
3657 :param value: set the value. Either binary type, or
3658 :py:class:`pyderasn.OctetString` object
3659 :param bounds: set ``(MIN, MAX)`` value size constraint.
3660 (-inf, +inf) by default
3661 :param bytes impl: override default tag with ``IMPLICIT`` one
3662 :param bytes expl: override default tag with ``EXPLICIT`` one
3663 :param default: set default value. Type same as in ``value``
3664 :param bool optional: is object ``OPTIONAL`` in sequence
3666 super().__init__(impl, expl, default, optional, _decoded)
3668 self._bound_min, self._bound_max = getattr(
3672 ) if bounds is None else bounds
3673 if value is not None:
3674 self._value = self._value_sanitize(value)
3675 if default is not None:
3676 default = self._value_sanitize(default)
3677 self.default = self.__class__(
3682 if self._value is None:
3683 self._value = default
3685 tag_klass, _, tag_num = tag_decode(self.tag)
3686 self.tag_constructed = tag_encode(
3688 form=TagFormConstructed,
3692 def _value_sanitize(self, value):
3693 if value.__class__ == bytes or value.__class__ == memoryview:
3695 elif issubclass(value.__class__, OctetString):
3696 value = value._value
3698 raise InvalidValueType((self.__class__, bytes, memoryview))
3699 if not self._bound_min <= len(value) <= self._bound_max:
3700 raise BoundsError(self._bound_min, len(value), self._bound_max)
3705 return self._value is not None
3707 def __getstate__(self):
3708 return OctetStringState(
3724 self.tag_constructed,
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The state is first handed over to the parent's ``__setstate__``,
    then value, both bounds, constructed tag and ``defined`` are
    taken from it.

    :param state: state object having ``value``, ``bound_min``,
                  ``bound_max``, ``tag_constructed`` and ``defined``
                  attributes
    """
    super().__setstate__(state)
    # (attribute of the object, field of the state), applied in order
    restored = (
        ("_value", "value"),
        ("_bound_min", "bound_min"),
        ("_bound_max", "bound_max"),
        ("tag_constructed", "tag_constructed"),
        ("defined", "defined"),
    )
    for attr_name, field_name in restored:
        setattr(self, attr_name, getattr(state, field_name))
3736 def __bytes__(self):
3737 self._assert_ready()
3738 return bytes(self._value)
def memoryview(self):
    """Return ``memoryview`` made on the value

    ``_assert_ready()`` is called beforehand.
    """
    self._assert_ready()
    raw = self._value
    return memoryview(raw)
3744 def __eq__(self, their):
3745 if their.__class__ == bytes:
3746 return self._value == their
3747 if not issubclass(their.__class__, OctetString):
3750 self._value == their._value and
3751 self.tag == their.tag and
3752 self._expl == their._expl
3755 def __lt__(self, their):
3756 return self._value < their._value
3767 return self.__class__(
3770 (self._bound_min, self._bound_max)
3771 if bounds is None else bounds
3773 impl=self.tag if impl is None else impl,
3774 expl=self._expl if expl is None else expl,
3775 default=self.default if default is None else default,
3776 optional=self.optional if optional is None else optional,
3780 self._assert_ready()
3783 len_encode(len(self._value)),
def _encode1st(self, state):
    """Report the size of the encoded object

    ``_assert_ready()`` is called beforehand.

    :param state: passed through unchanged
    :returns: ``(length, state)``, where length is the sum of the
              tag's length, ``len_size()`` of the value's length
              and the value's length itself
    """
    self._assert_ready()
    value_len = len(self._value)
    header_len = len(self.tag) + len_size(value_len)
    return header_len + value_len, state
3792 def _encode2nd(self, writer, state_iter):
3794 write_full(writer, self.tag + len_encode(len(value)))
3795 write_full(writer, value)
3797 def _encode_cer(self, writer):
3798 octets = self._value
3799 if len(octets) <= 1000:
3800 write_full(writer, self._encode())
3802 write_full(writer, self.tag_constructed)
3803 write_full(writer, LENINDEF)
3804 for offset in range(0, (len(octets) // 1000) * 1000, 1000):
3805 write_full(writer, b"".join((
3806 OctetString.tag_default,
3808 octets[offset:offset + 1000],
3810 tail = octets[offset + 1000:]
3812 write_full(writer, b"".join((
3813 OctetString.tag_default,
3814 len_encode(len(tail)),
3817 write_full(writer, EOC)
3819 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
3821 t, tlen, lv = tag_strip(tlv)
3822 except DecodeError as err:
3823 raise err.__class__(
3825 klass=self.__class__,
3826 decode_path=decode_path,
3834 l, llen, v = len_decode(lv)
3835 except DecodeError as err:
3836 raise err.__class__(
3838 klass=self.__class__,
3839 decode_path=decode_path,
3843 raise NotEnoughData(
3844 "encoded length is longer than data",
3845 klass=self.__class__,
3846 decode_path=decode_path,
3849 v, tail = v[:l], v[l:]
3850 if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
3852 msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
3853 klass=self.__class__,
3854 decode_path=decode_path,
3857 if evgen_mode and self.evgen_mode_skip_value:
3859 elif self.memoryview_safe and ctx.get("keep_memoryview", False):
3864 obj = self.__class__(
3866 bounds=(self._bound_min, self._bound_max),
3869 default=self.default,
3870 optional=self.optional,
3871 _decoded=(offset, llen, l),
3874 except DecodeError as err:
3877 klass=self.__class__,
3878 decode_path=decode_path,
3881 except BoundsError as err:
3884 klass=self.__class__,
3885 decode_path=decode_path,
3888 yield decode_path, obj, tail
3890 if t != self.tag_constructed:
3892 klass=self.__class__,
3893 decode_path=decode_path,
3896 if not ctx.get("bered", False):
3898 "unallowed BER constructed encoding",
3899 klass=self.__class__,
3900 decode_path=decode_path,
3908 l, llen, v = len_decode(lv)
3909 except LenIndefForm:
3910 llen, l, v = 1, 0, lv[1:]
3912 except DecodeError as err:
3913 raise err.__class__(
3915 klass=self.__class__,
3916 decode_path=decode_path,
3920 raise NotEnoughData(
3921 "encoded length is longer than data",
3922 klass=self.__class__,
3923 decode_path=decode_path,
3928 sub_offset = offset + tlen + llen
3933 if v[:EOC_LEN].tobytes() == EOC:
3940 "chunk out of bounds",
3941 klass=self.__class__,
3942 decode_path=decode_path + (str(len(chunks) - 1),),
3943 offset=chunks[-1].offset,
3947 sub_decode_path = decode_path + (str(chunks_count),)
3948 for _decode_path, chunk, v_tail in OctetString().decode_evgen(
3951 decode_path=sub_decode_path,
3954 _ctx_immutable=False,
3956 yield _decode_path, chunk, v_tail
3957 if not chunk.ber_encoded:
3958 payload_len += chunk.vlen
3961 sub_decode_path = decode_path + (str(len(chunks)),)
3962 _, chunk, v_tail = next(OctetString().decode_evgen(
3965 decode_path=sub_decode_path,
3968 _ctx_immutable=False,
3971 chunks.append(chunk)
3974 "expected OctetString encoded chunk",
3975 klass=self.__class__,
3976 decode_path=sub_decode_path,
3979 sub_offset += chunk.tlvlen
3980 vlen += chunk.tlvlen
3982 if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
3984 msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
3985 klass=self.__class__,
3986 decode_path=decode_path,
3990 obj = self.__class__(
3992 None if evgen_mode else
3993 b"".join(bytes(chunk) for chunk in chunks)
3995 bounds=(self._bound_min, self._bound_max),
3998 default=self.default,
3999 optional=self.optional,
4000 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
4003 except DecodeError as err:
4006 klass=self.__class__,
4007 decode_path=decode_path,
4010 except BoundsError as err:
4013 klass=self.__class__,
4014 decode_path=decode_path,
4017 obj.lenindef = lenindef
4018 obj.ber_encoded = True
4019 yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
4022 return pp_console_row(next(self.pps()))
4024 def pps(self, decode_path=()):
4027 asn1_type_name=self.asn1_type_name,
4028 obj_name=self.__class__.__name__,
4029 decode_path=decode_path,
4030 value=("%d bytes" % len(self._value)) if self.ready else None,
4031 blob=self._value if self.ready else None,
4032 optional=self.optional,
4033 default=self == self.default,
4034 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4035 expl=None if self._expl is None else tag_decode(self._expl),
4040 expl_offset=self.expl_offset if self.expled else None,
4041 expl_tlen=self.expl_tlen if self.expled else None,
4042 expl_llen=self.expl_llen if self.expled else None,
4043 expl_vlen=self.expl_vlen if self.expled else None,
4044 expl_lenindef=self.expl_lenindef,
4045 lenindef=self.lenindef,
4046 ber_encoded=self.ber_encoded,
4049 defined_by, defined = self.defined or (None, None)
4050 if defined_by is not None:
4052 decode_path=decode_path + (DecodePathDefBy(defined_by),)
4054 for pp in self.pps_lenindef(decode_path):
4058 def agg_octet_string(evgens, decode_path, raw, writer):
4059 """Aggregate constructed string (OctetString and its derivatives)
4061 :param evgens: iterator of generated events
4062 :param decode_path: points to the string we want to decode
4063 :param raw: sliceable (memoryview, bytearray, etc) with
4064 the data evgens are generated on
4065 :param writer: buffer.write where string is going to be saved
4066 :param writer: where string is going to be saved. Must comply
4067 with ``io.RawIOBase.write`` behaviour
4069 .. seealso:: :ref:`agg_octet_string`
4071 decode_path_len = len(decode_path)
4072 for dp, obj, _ in evgens:
4073 if dp[:decode_path_len] != decode_path:
4075 if not obj.ber_encoded:
4076 write_full(writer, raw[
4077 obj.offset + obj.tlen + obj.llen:
4078 obj.offset + obj.tlen + obj.llen + obj.vlen -
4079 (EOC_LEN if obj.expl_lenindef else 0)
4081 if len(dp) == decode_path_len:
4085 NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
4089 """``NULL`` null object
4097 tag_default = tag_encode(5)
4098 asn1_type_name = "NULL"
4102 value=None, # unused, but Sequence passes it
4109 :param bytes impl: override default tag with ``IMPLICIT`` one
4110 :param bytes expl: override default tag with ``EXPLICIT`` one
4111 :param bool optional: is object ``OPTIONAL`` in sequence
4113 super().__init__(impl, expl, None, optional, _decoded)
4120 def __getstate__(self):
4136 def __eq__(self, their):
4137 if not issubclass(their.__class__, Null):
4140 self.tag == their.tag and
4141 self._expl == their._expl
4151 return self.__class__(
4152 impl=self.tag if impl is None else impl,
4153 expl=self._expl if expl is None else expl,
4154 optional=self.optional if optional is None else optional,
4158 return self.tag + LEN0
4160 def _encode1st(self, state):
4161 return len(self.tag) + 1, state
def _encode2nd(self, writer, state_iter):
    """Write the tag followed by ``LEN0`` to the writer

    :param writer: passed to ``write_full`` together with the data
    :param state_iter: not used by this method
    """
    encoded = self.tag + LEN0
    write_full(writer, encoded)
4166 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4168 t, _, lv = tag_strip(tlv)
4169 except DecodeError as err:
4170 raise err.__class__(
4172 klass=self.__class__,
4173 decode_path=decode_path,
4178 klass=self.__class__,
4179 decode_path=decode_path,
4182 if tag_only: # pragma: no cover
4186 l, _, v = len_decode(lv)
4187 except DecodeError as err:
4188 raise err.__class__(
4190 klass=self.__class__,
4191 decode_path=decode_path,
4195 raise InvalidLength(
4196 "Null must have zero length",
4197 klass=self.__class__,
4198 decode_path=decode_path,
4201 obj = self.__class__(
4204 optional=self.optional,
4205 _decoded=(offset, 1, 0),
4207 yield decode_path, obj, v
4210 return pp_console_row(next(self.pps()))
4212 def pps(self, decode_path=()):
4215 asn1_type_name=self.asn1_type_name,
4216 obj_name=self.__class__.__name__,
4217 decode_path=decode_path,
4218 optional=self.optional,
4219 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4220 expl=None if self._expl is None else tag_decode(self._expl),
4225 expl_offset=self.expl_offset if self.expled else None,
4226 expl_tlen=self.expl_tlen if self.expled else None,
4227 expl_llen=self.expl_llen if self.expled else None,
4228 expl_vlen=self.expl_vlen if self.expled else None,
4229 expl_lenindef=self.expl_lenindef,
4232 for pp in self.pps_lenindef(decode_path):
4236 ObjectIdentifierState = namedtuple(
4237 "ObjectIdentifierState",
4238 BasicState._fields + ("value", "defines"),
4243 class ObjectIdentifier(Obj):
4244 """``OBJECT IDENTIFIER`` OID type
4246 >>> oid = ObjectIdentifier((1, 2, 3))
4247 OBJECT IDENTIFIER 1.2.3
4248 >>> oid == ObjectIdentifier("1.2.3")
4254 >>> oid + (4, 5) + ObjectIdentifier("1.7")
4255 OBJECT IDENTIFIER 1.2.3.4.5.1.7
4257 >>> str(ObjectIdentifier((3, 1)))
4258 Traceback (most recent call last):
4259 pyderasn.InvalidOID: unacceptable first arc value
4261 __slots__ = ("defines",)
4262 tag_default = tag_encode(6)
4263 asn1_type_name = "OBJECT IDENTIFIER"
4276 :param value: set the value. Either tuples of integers,
4277 string of "."-concatenated integers, or
4278 :py:class:`pyderasn.ObjectIdentifier` object
4279 :param defines: sequence of tuples. Each tuple has two elements.
4280 First one is relative to current one decode
4281 path, aiming to the field defined by that OID.
4282 Read about relative path in
4283 :py:func:`pyderasn.abs_decode_path`. Second
4284 tuple element is ``{OID: pyderasn.Obj()}``
4285 dictionary, mapping between current OID value
4286 and structure applied to defined field.
4288 .. seealso:: :ref:`definedby`
4290 :param bytes impl: override default tag with ``IMPLICIT`` one
4291 :param bytes expl: override default tag with ``EXPLICIT`` one
4292 :param default: set default value. Type same as in ``value``
4293 :param bool optional: is object ``OPTIONAL`` in sequence
4295 super().__init__(impl, expl, default, optional, _decoded)
4297 if value is not None:
4298 self._value = self._value_sanitize(value)
4299 if default is not None:
4300 default = self._value_sanitize(default)
4301 self.default = self.__class__(
4306 if self._value is None:
4307 self._value = default
4308 self.defines = defines
def __add__(self, their):
    """Return a new object whose arcs are ours followed by *their* arcs.

    :param their: either a plain ``tuple`` of integers, or an instance
                  of this object's own class
    :raises InvalidValueType: for any other operand type
    """
    klass = self.__class__
    if their.__class__ == tuple:
        suffix = array("L", their)
    elif isinstance(their, klass):
        suffix = their._value
    else:
        raise InvalidValueType((klass, tuple))
    return klass(self._value + suffix)
4317 def _value_sanitize(self, value):
4318 if issubclass(value.__class__, ObjectIdentifier):
4320 if isinstance(value, str):
4322 value = array("L", (pureint(arc) for arc in value.split(".")))
4324 raise InvalidOID("unacceptable arcs values")
4325 if value.__class__ == tuple:
4327 value = array("L", value)
4328 except OverflowError as err:
4329 raise InvalidOID(repr(err))
4330 if value.__class__ is array:
4332 raise InvalidOID("less than 2 arcs")
4333 first_arc = value[0]
4334 if first_arc in (0, 1):
4335 if not (0 <= value[1] <= 39):
4336 raise InvalidOID("second arc is too wide")
4337 elif first_arc == 2:
4340 raise InvalidOID("unacceptable first arc value")
4341 if not all(arc >= 0 for arc in value):
4342 raise InvalidOID("negative arc value")
4344 raise InvalidValueType((self.__class__, str, tuple))
4348 return self._value is not None
4350 def __getstate__(self):
4351 return ObjectIdentifierState(
def __setstate__(self, state):
    """Restore the object from *state*.

    The parent class restores its own part of the state first; the
    ``value`` and ``defines`` fields are then restored here.
    """
    super().__setstate__(state)
    self._value, self.defines = state.value, state.defines
4374 self._assert_ready()
4375 return iter(self._value)
4378 return ".".join(str(arc) for arc in self._value or ())
4381 self._assert_ready()
4382 return hash(b"".join((
4384 bytes(self._expl or b""),
4385 str(self._value).encode("ascii"),
def __eq__(self, their):
    """Compare with another OID object or with a plain tuple of arcs.

    :param their: a ``tuple`` of integers (only the arcs are compared),
                  or an :py:class:`pyderasn.ObjectIdentifier` (tag,
                  explicit tag and arcs are compared)
    :returns: ``False`` for every other operand type, and for tuples
              that can not be arcs at all
    """
    if their.__class__ == tuple:
        try:
            return self._value == array("L", their)
        except (OverflowError, TypeError):
            # Negative, oversized or non-integer items can not be held
            # in an unsigned array, so they never equal a valid OID.
            # Equality must answer False instead of raising.
            return False
    if not issubclass(their.__class__, ObjectIdentifier):
        return False
    return (
        self.tag == their.tag and
        self._expl == their._expl and
        self._value == their._value
    )
def __lt__(self, their):
    """Order OIDs by comparing their arc arrays.

    :param their: another :py:class:`pyderasn.ObjectIdentifier`, or a
                  plain ``tuple`` of integers (the same operand kinds
                  that ``__eq__`` and ``__add__`` accept)
    :returns: comparison result of the underlying arc arrays, or
              ``NotImplemented`` for unsupported operand types, so that
              Python raises its usual ``TypeError``
    """
    if their.__class__ == tuple:
        return self._value < array("L", their)
    if not issubclass(their.__class__, ObjectIdentifier):
        # Without this guard the comparison died with AttributeError
        # on ``their._value``.
        return NotImplemented
    return self._value < their._value
4411 return self.__class__(
4413 defines=self.defines if defines is None else defines,
4414 impl=self.tag if impl is None else impl,
4415 expl=self._expl if expl is None else expl,
4416 default=self.default if default is None else default,
4417 optional=self.optional if optional is None else optional,
4420 def _encode_octets(self):
4421 self._assert_ready()
4423 first_value = value[1]
4424 first_arc = value[0]
4427 elif first_arc == 1:
4429 elif first_arc == 2:
4431 else: # pragma: no cover
4432 raise RuntimeError("invalid arc is stored")
4433 octets = [zero_ended_encode(first_value)]
4434 for arc in value[2:]:
4435 octets.append(zero_ended_encode(arc))
4436 return b"".join(octets)
4439 v = self._encode_octets()
4440 return b"".join((self.tag, len_encode(len(v)), v))
def _encode1st(self, state):
    """First encoder pass: report the full encoded size.

    :returns: ``(tag length + length-octets length + content length,
              state)``; ``state`` is passed through untouched
    """
    content_len = len(self._encode_octets())
    total_len = len(self.tag) + len_size(content_len) + content_len
    return (total_len, state)
def _encode2nd(self, writer, state_iter):
    """Second encoder pass: write the complete encoding into *writer*.

    ``state_iter`` is not consumed here.
    """
    encoded = self._encode()
    write_full(writer, encoded)
4449 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
4451 t, _, lv = tag_strip(tlv)
4452 except DecodeError as err:
4453 raise err.__class__(
4455 klass=self.__class__,
4456 decode_path=decode_path,
4461 klass=self.__class__,
4462 decode_path=decode_path,
4465 if tag_only: # pragma: no cover
4469 l, llen, v = len_decode(lv)
4470 except DecodeError as err:
4471 raise err.__class__(
4473 klass=self.__class__,
4474 decode_path=decode_path,
4478 raise NotEnoughData(
4479 "encoded length is longer than data",
4480 klass=self.__class__,
4481 decode_path=decode_path,
4485 raise NotEnoughData(
4487 klass=self.__class__,
4488 decode_path=decode_path,
4491 v, tail = v[:l], v[l:]
4499 if i == 0 and octet == 0x80:
4500 if ctx.get("bered", False):
4504 "non normalized arc encoding",
4505 klass=self.__class__,
4506 decode_path=decode_path,
4509 arc = (arc << 7) | (octet & 0x7F)
4510 if octet & 0x80 == 0:
4513 except OverflowError:
4515 "too huge value for local unsigned long",
4516 klass=self.__class__,
4517 decode_path=decode_path,
4526 klass=self.__class__,
4527 decode_path=decode_path,
4531 second_arc = arcs[0]
4532 if 0 <= second_arc <= 39:
4534 elif 40 <= second_arc <= 79:
4540 obj = self.__class__(
4541 value=array("L", (first_arc, second_arc)) + arcs[1:],
4544 default=self.default,
4545 optional=self.optional,
4546 _decoded=(offset, llen, l),
4549 obj.ber_encoded = True
4550 yield decode_path, obj, tail
4553 return pp_console_row(next(self.pps()))
4555 def pps(self, decode_path=()):
4558 asn1_type_name=self.asn1_type_name,
4559 obj_name=self.__class__.__name__,
4560 decode_path=decode_path,
4561 value=str(self) if self.ready else None,
4562 optional=self.optional,
4563 default=self == self.default,
4564 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4565 expl=None if self._expl is None else tag_decode(self._expl),
4570 expl_offset=self.expl_offset if self.expled else None,
4571 expl_tlen=self.expl_tlen if self.expled else None,
4572 expl_llen=self.expl_llen if self.expled else None,
4573 expl_vlen=self.expl_vlen if self.expled else None,
4574 expl_lenindef=self.expl_lenindef,
4575 ber_encoded=self.ber_encoded,
4578 for pp in self.pps_lenindef(decode_path):
4582 class Enumerated(Integer):
4583 """``ENUMERATED`` integer type
4585 This type is identical to :py:class:`pyderasn.Integer`, but requires
4586 schema to be specified and does not accept values missing from it.
4589 tag_default = tag_encode(10)
4590 asn1_type_name = "ENUMERATED"
4601 bounds=None, # dummy argument, workability for Integer.decode
4604 value, bounds, impl, expl, default, optional, _specs, _decoded,
4606 if len(self.specs) == 0:
4607 raise ValueError("schema must be specified")
def _value_sanitize(self, value):
    """Convert *value* to the integer stored by this ENUMERATED.

    :param value: an object of this class (its stored value is taken),
                  an ``int`` (must be one of the schema's values), or a
                  ``str`` (looked up as a name in the schema)
    :returns: the integer value
    :raises DecodeError: integer is not present in the schema
    :raises ObjUnknown: name is not present in the schema
    :raises InvalidValueType: any other argument type
    """
    if isinstance(value, self.__class__):
        return value._value
    if isinstance(value, int):
        if value not in self.specs.values():
            raise DecodeError(
                "unknown integer value: %s" % value,
                klass=self.__class__,
            )
        return value
    if isinstance(value, str):
        # Keep the requested name in its own variable: overwriting it
        # with the lookup result made the error below always report
        # "integer value: None".
        named = self.specs.get(value)
        if named is None:
            raise ObjUnknown("integer value: %s" % value)
        return named
    raise InvalidValueType((self.__class__, int, str))
4638 return self.__class__(
4640 impl=self.tag if impl is None else impl,
4641 expl=self._expl if expl is None else expl,
4642 default=self.default if default is None else default,
4643 optional=self.optional if optional is None else optional,
4648 def escape_control_unicode(c):
4649 if unicat(c)[0] == "C":
4650 c = repr(c).lstrip("u").strip("'")
4654 class CommonString(OctetString):
4655 """Common class for all strings
4657 Everything resembles :py:class:`pyderasn.OctetString`, except
4658 ability to deal with unicode text strings.
4660 >>> hexenc("привет мир".encode("utf-8"))
4661 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4662 >>> UTF8String("привет мир") == UTF8String(hexdec("d0...80"))
4664 >>> s = UTF8String("привет мир")
4665 UTF8String UTF8String привет мир
4667 'привет мир'
4668 >>> hexenc(bytes(s))
4669 'd0bfd180d0b8d0b2d0b5d18220d0bcd0b8d180'
4671 >>> PrintableString("привет мир")
4672 Traceback (most recent call last):
4673 pyderasn.DecodeError: 'ascii' codec can't encode characters in position 0-5: ordinal not in range(128)
4675 >>> BMPString("ада", bounds=(2, 2))
4676 Traceback (most recent call last):
4677 pyderasn.BoundsError: unsatisfied bounds: 2 <= 3 <= 2
4678 >>> s = BMPString("ад", bounds=(2, 2))
4681 >>> hexenc(bytes(s))
4688 - Text Encoding, validation
4689 * - :py:class:`pyderasn.UTF8String`
4691 * - :py:class:`pyderasn.NumericString`
4692 - proper alphabet validation
4693 * - :py:class:`pyderasn.PrintableString`
4694 - proper alphabet validation
4695 * - :py:class:`pyderasn.TeletexString`
4697 * - :py:class:`pyderasn.T61String`
4699 * - :py:class:`pyderasn.VideotexString`
4701 * - :py:class:`pyderasn.IA5String`
4702 - proper alphabet validation
4703 * - :py:class:`pyderasn.GraphicString`
4705 * - :py:class:`pyderasn.VisibleString`, :py:class:`pyderasn.ISO646String`
4706 - proper alphabet validation
4707 * - :py:class:`pyderasn.GeneralString`
4709 * - :py:class:`pyderasn.UniversalString`
4711 * - :py:class:`pyderasn.BMPString`
4715 memoryview_safe = False
4717 def _value_sanitize(self, value):
4719 value_decoded = None
4720 if isinstance(value, self.__class__):
4721 value_raw = value._value
4722 elif value.__class__ == str:
4723 value_decoded = value
4724 elif value.__class__ == bytes:
4727 raise InvalidValueType((self.__class__, str, bytes))
4730 value_decoded.encode(self.encoding)
4731 if value_raw is None else value_raw
4734 value_raw.decode(self.encoding)
4735 if value_decoded is None else value_decoded
4737 except (UnicodeEncodeError, UnicodeDecodeError) as err:
4738 raise DecodeError(str(err))
4739 if not self._bound_min <= len(value_decoded) <= self._bound_max:
def __eq__(self, their):
    """Compare with ``bytes``, ``str`` or another string object.

    :param their: ``bytes`` are compared with the raw value; ``str`` is
                  encoded with this type's ``encoding`` first; an object
                  of this class is compared by value, tag and explicit
                  tag
    :returns: ``False`` for every other operand type, and for text that
              this type's encoding can not represent
    """
    if their.__class__ == bytes:
        return self._value == their
    if their.__class__ == str:
        try:
            return self._value == their.encode(self.encoding)
        except UnicodeEncodeError:
            # Text outside of the encoding's repertoire can never be
            # equal to a stored value. Equality must answer False
            # instead of raising.
            return False
    if not isinstance(their, self.__class__):
        return False
    return (
        self._value == their._value and
        self.tag == their.tag and
        self._expl == their._expl
    )
4760 def __unicode__(self):
4762 return self._value.decode(self.encoding)
4763 return str(self._value)
def memoryview(self):
    """Always refuse: this operation is not supported here.

    :raises ValueError: unconditionally
    """
    reason = "CommonString does not support .memoryview()"
    raise ValueError(reason)
4769 return pp_console_row(next(self.pps()))
4771 def pps(self, decode_path=()):
4774 value = "".join(escape_control_unicode(c) for c in self.__unicode__())
4777 asn1_type_name=self.asn1_type_name,
4778 obj_name=self.__class__.__name__,
4779 decode_path=decode_path,
4781 optional=self.optional,
4782 default=self == self.default,
4783 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
4784 expl=None if self._expl is None else tag_decode(self._expl),
4789 expl_offset=self.expl_offset if self.expled else None,
4790 expl_tlen=self.expl_tlen if self.expled else None,
4791 expl_llen=self.expl_llen if self.expled else None,
4792 expl_vlen=self.expl_vlen if self.expled else None,
4793 expl_lenindef=self.expl_lenindef,
4794 ber_encoded=self.ber_encoded,
4797 for pp in self.pps_lenindef(decode_path):
4801 class UTF8String(CommonString):
4803 tag_default = tag_encode(12)
4805 asn1_type_name = "UTF8String"
4808 class AllowableCharsMixin:
def allowable_chars(self):
    """Allowed alphabet as a frozenset of one-character strings.

    ``self._allowable_chars`` holds integer code points; each of them
    is converted with ``chr``.
    """
    return frozenset(map(chr, self._allowable_chars))
4815 def _value_sanitize(self, value):
4816 value = super()._value_sanitize(value)
4817 if not frozenset(value) <= self._allowable_chars:
4818 raise DecodeError("non satisfying alphabet value")
4822 NUMERIC_ALLOWABLE_CHARS = frozenset(digits.encode("ascii") + b" ")
4825 class NumericString(AllowableCharsMixin, CommonString):
4828 Its value is properly sanitized: only ASCII digits with spaces can
4831 >>> NumericString().allowable_chars
4832 frozenset(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '])
4834 __slots__ = ("_allowable_chars",)
4835 tag_default = tag_encode(18)
4837 asn1_type_name = "NumericString"
def __init__(self, *args, **kwargs):
    """Create the object, restricting it to the numeric alphabet.

    All arguments are forwarded to the parent constructor unchanged.
    """
    # The alphabet is assigned before the parent constructor runs.
    # NOTE(review): presumably because the parent sanitizes the initial
    # value, which reads self._allowable_chars -- confirm.
    self._allowable_chars = NUMERIC_ALLOWABLE_CHARS
    super().__init__(*args, **kwargs)
4844 PrintableStringState = namedtuple(
4845 "PrintableStringState",
4846 OctetStringState._fields + ("allowable_chars",),
4851 PRINTABLE_ALLOWABLE_CHARS = frozenset(
4852 (ascii_letters + digits + " '()+,-./:=?").encode("ascii")
4856 class PrintableString(AllowableCharsMixin, CommonString):
4859 Its value is properly sanitized: see X.680 41.4 table 10.
4861 >>> PrintableString().allowable_chars
4862 frozenset([' ', "'", ..., 'z'])
4863 >>> obj = PrintableString("foo*bar", allow_asterisk=True)
4864 PrintableString PrintableString foo*bar
4865 >>> obj.allow_asterisk, obj.allow_ampersand
4868 __slots__ = ("_allowable_chars",)
4869 tag_default = tag_encode(19)
4871 asn1_type_name = "PrintableString"
4872 _asterisk = frozenset("*".encode("ascii"))
4873 _ampersand = frozenset("&".encode("ascii"))
4885 allow_asterisk=False,
4886 allow_ampersand=False,
4889 :param allow_asterisk: allow asterisk character
4890 :param allow_ampersand: allow ampersand character
4892 allowable_chars = PRINTABLE_ALLOWABLE_CHARS
4894 allowable_chars |= self._asterisk
4896 allowable_chars |= self._ampersand
4897 self._allowable_chars = allowable_chars
4899 value, bounds, impl, expl, default, optional, _decoded, ctx,
4903 def allow_asterisk(self):
4904 """Is asterisk character allowed?
4906 return self._asterisk <= self._allowable_chars
4909 def allow_ampersand(self):
4910 """Is ampersand character allowed?
4912 return self._ampersand <= self._allowable_chars
4914 def __getstate__(self):
4915 return PrintableStringState(
4916 *super().__getstate__(),
4917 **{"allowable_chars": self._allowable_chars}
def __setstate__(self, state):
    """Restore the object from *state*.

    The parent class restores its own part first; the allowed alphabet
    is then taken from the state's ``allowable_chars`` field.
    """
    super().__setstate__(state)
    restored_alphabet = state.allowable_chars
    self._allowable_chars = restored_alphabet
4933 return self.__class__(
4936 (self._bound_min, self._bound_max)
4937 if bounds is None else bounds
4939 impl=self.tag if impl is None else impl,
4940 expl=self._expl if expl is None else expl,
4941 default=self.default if default is None else default,
4942 optional=self.optional if optional is None else optional,
4943 allow_asterisk=self.allow_asterisk,
4944 allow_ampersand=self.allow_ampersand,
4948 class TeletexString(CommonString):
4950 tag_default = tag_encode(20)
4951 encoding = "iso-8859-1"
4952 asn1_type_name = "TeletexString"
4955 class T61String(TeletexString):
4957 asn1_type_name = "T61String"
4960 class VideotexString(CommonString):
4962 tag_default = tag_encode(21)
4963 encoding = "iso-8859-1"
4964 asn1_type_name = "VideotexString"
4967 IA5_ALLOWABLE_CHARS = frozenset(b"".join(
4968 chr(c).encode("ascii") for c in range(128)
4972 class IA5String(AllowableCharsMixin, CommonString):
4975 Its value is properly sanitized: it is a mix of
4977 * http://www.itscj.ipsj.or.jp/iso-ir/006.pdf (G)
4978 * http://www.itscj.ipsj.or.jp/iso-ir/001.pdf (C0)
4979 * DEL character (0x7F)
4981 It is just 7-bit ASCII.
4983 >>> IA5String().allowable_chars
4984 frozenset(["NUL", ... "DEL"])
4986 __slots__ = ("_allowable_chars",)
4987 tag_default = tag_encode(22)
4989 asn1_type_name = "IA5"
def __init__(self, *args, **kwargs):
    """Create the object, restricting it to the IA5 alphabet.

    All arguments are forwarded to the parent constructor unchanged.
    """
    # The alphabet is assigned before the parent constructor runs.
    # NOTE(review): presumably because the parent sanitizes the initial
    # value, which reads self._allowable_chars -- confirm.
    self._allowable_chars = IA5_ALLOWABLE_CHARS
    super().__init__(*args, **kwargs)
# UTCTime: length of the text value, its encoded length octets, and
# both of them together
LEN_YYMMDDHHMMSSZ = len("YYMMDDHHMMSSZ")
LEN_LEN_YYMMDDHHMMSSZ = len_encode(LEN_YYMMDDHHMMSSZ)
LEN_YYMMDDHHMMSSZ_WITH_LEN = LEN_YYMMDDHHMMSSZ + len(LEN_LEN_YYMMDDHHMMSSZ)
# GeneralizedTime without fractions: length of the text value and its
# encoded length octets
LEN_YYYYMMDDHHMMSSZ = len("YYYYMMDDHHMMSSZ")
LEN_LEN_YYYYMMDDHHMMSSZ = len_encode(LEN_YYYYMMDDHHMMSSZ)
# GeneralizedTime with the shortest possible fractional part
# NOTE(review): "DM" presumably stands for the decimal mark plus one
# fraction digit -- confirm
LEN_YYYYMMDDHHMMSSDMZ = len("YYYYMMDDHHMMSSDMZ")
5004 VISIBLE_ALLOWABLE_CHARS = frozenset(b"".join(
5005 chr(c).encode("ascii") for c in range(ord(" "), ord("~") + 1)
5009 class VisibleString(AllowableCharsMixin, CommonString):
5012 Its value is properly sanitized. ASCII subset from space to tilde is
5013 allowed: http://www.itscj.ipsj.or.jp/iso-ir/006.pdf
5015 >>> VisibleString().allowable_chars
5016 frozenset([" ", ... "~"])
5018 __slots__ = ("_allowable_chars",)
5019 tag_default = tag_encode(26)
5021 asn1_type_name = "VisibleString"
def __init__(self, *args, **kwargs):
    """Create the object, restricting it to the visible alphabet.

    All arguments are forwarded to the parent constructor unchanged.
    """
    # The alphabet is assigned before the parent constructor runs.
    # NOTE(review): presumably because the parent sanitizes the initial
    # value, which reads self._allowable_chars -- confirm.
    self._allowable_chars = VISIBLE_ALLOWABLE_CHARS
    super().__init__(*args, **kwargs)
5028 class ISO646String(VisibleString):
5030 asn1_type_name = "ISO646String"
5033 UTCTimeState = namedtuple(
5035 OctetStringState._fields + ("ber_raw",),
5040 def str_to_time_fractions(value):
5042 year, v = (v // 10**10), (v % 10**10)
5043 month, v = (v // 10**8), (v % 10**8)
5044 day, v = (v // 10**6), (v % 10**6)
5045 hour, v = (v // 10**4), (v % 10**4)
5046 minute, second = (v // 100), (v % 100)
5047 return year, month, day, hour, minute, second
5050 class UTCTime(VisibleString):
5051 """``UTCTime`` datetime type
5053 >>> t = UTCTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5054 UTCTime UTCTime 2017-09-30T22:07:50
5060 datetime.datetime(2017, 9, 30, 22, 7, 50)
5061 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).todatetime()
5062 datetime.datetime(1957, 9, 30, 22, 7, 50)
5063 >>> UTCTime(datetime(2057, 9, 30, 22, 7, 50)).totzdatetime()
5064 datetime.datetime(1957, 9, 30, 22, 7, 50, tzinfo=tzutc())
5066 If BER encoded value was met, then ``ber_raw`` attribute will hold
5067 its raw representation.
5071 Only **naive** ``datetime`` objects are supported.
5072 Library assumes that all work is done in UTC.
5076 Pay attention that ``UTCTime`` can not hold full year, so all years
5077 having < 50 years are treated as 20xx, 19xx otherwise, according to
5078 X.509 recommendation. Use ``GeneralizedTime`` instead for
5083 No strict validation of UTC offsets is made (only applicable to
5084 **BER**), but very crude:
5086 * minutes are not exceeding 60
5087 * offset value is not exceeding 14 hours
5089 __slots__ = ("ber_raw",)
5090 tag_default = tag_encode(23)
5092 asn1_type_name = "UTCTime"
5093 evgen_mode_skip_value = False
5103 bounds=None, # dummy argument, workability for OctetString.decode
5107 :param value: set the value. Either datetime type, or
5108 :py:class:`pyderasn.UTCTime` object
5109 :param bytes impl: override default tag with ``IMPLICIT`` one
5110 :param bytes expl: override default tag with ``EXPLICIT`` one
5111 :param default: set default value. Type same as in ``value``
5112 :param bool optional: is object ``OPTIONAL`` in sequence
5114 super().__init__(None, None, impl, expl, None, optional, _decoded, ctx)
5117 if value is not None:
5118 self._value, self.ber_raw = self._value_sanitize(value, ctx)
5119 self.ber_encoded = self.ber_raw is not None
5120 if default is not None:
5121 default, _ = self._value_sanitize(default)
5122 self.default = self.__class__(
5127 if self._value is None:
5128 self._value = default
5130 self.optional = optional
5132 def _strptime_bered(self, value):
5133 year, month, day, hour, minute, _ = str_to_time_fractions(value[:10] + "00")
5136 raise ValueError("no timezone")
5137 year += 2000 if year < 50 else 1900
5138 decoded = datetime(year, month, day, hour, minute)
5140 if value[-1] == "Z":
5144 raise ValueError("invalid UTC offset")
5145 if value[-5] == "-":
5147 elif value[-5] == "+":
5150 raise ValueError("invalid UTC offset")
5151 v = pureint(value[-4:])
5152 offset, v = (60 * (v % 100)), v // 100
5154 raise ValueError("invalid UTC offset minutes")
5156 if offset > 14 * 3600:
5157 raise ValueError("too big UTC offset")
5161 return offset, decoded
5163 raise ValueError("invalid UTC offset seconds")
5164 seconds = pureint(value)
5166 raise ValueError("invalid seconds value")
5167 return offset, decoded + timedelta(seconds=seconds)
def _strptime(self, value):
    """Parse strict ``YYMMDDHHMMSSZ`` text into a naive ``datetime``.

    Two-digit years below 50 are taken as 20xx, the others as 19xx.

    :param str value: text of the time value
    :raises ValueError: wrong length, missing trailing ``Z``, or
                        whatever the fields splitter and ``datetime``
                        reject
    """
    # Hand-written counterpart of datetime.strptime's %y%m%d%H%M%SZ
    if len(value) != LEN_YYMMDDHHMMSSZ:
        raise ValueError("invalid UTCTime length")
    body, zone = value[:-1], value[-1]
    if zone != "Z":
        raise ValueError("non UTC timezone")
    fields = str_to_time_fractions(body)
    short_year = fields[0]
    century = 2000 if short_year < 50 else 1900
    return datetime(short_year + century, *fields[1:])
def _dt_sanitize(self, value):
    """Check the year range and drop microseconds.

    :param value: ``datetime`` to be stored
    :returns: copy of *value* with ``microsecond`` set to zero
    :raises ValueError: year is outside of 1950..2049
    """
    if not 1950 <= value.year <= 2049:
        raise ValueError("UTCTime can hold only 1950-2049 years")
    return value.replace(microsecond=0)
5184 def _value_sanitize(self, value, ctx=None):
5185 if value.__class__ == bytes:
5187 value_decoded = value.decode("ascii")
5188 except (UnicodeEncodeError, UnicodeDecodeError) as err:
5189 raise DecodeError("invalid UTCTime encoding: %r" % err)
5192 return self._strptime(value_decoded), None
5193 except (TypeError, ValueError) as _err:
5195 if (ctx is not None) and ctx.get("bered", False):
5197 offset, _value = self._strptime_bered(value_decoded)
5198 _value = _value - timedelta(seconds=offset)
5199 return self._dt_sanitize(_value), value
5200 except (TypeError, ValueError, OverflowError) as _err:
5203 "invalid %s format: %r" % (self.asn1_type_name, err),
5204 klass=self.__class__,
5206 if isinstance(value, self.__class__):
5207 return value._value, None
5208 if value.__class__ == datetime:
5209 if value.tzinfo is not None:
5210 raise ValueError("only naive datetime supported")
5211 return self._dt_sanitize(value), None
5212 raise InvalidValueType((self.__class__, datetime))
5214 def _pp_value(self):
5216 value = self._value.isoformat()
5217 if self.ber_encoded:
5218 value += " (%s)" % self.ber_raw
5222 def __unicode__(self):
5224 value = self._value.isoformat()
5225 if self.ber_encoded:
5226 value += " (%s)" % self.ber_raw
5228 return str(self._pp_value())
def __getstate__(self):
    """Return the object's state: the parent's fields plus ``ber_raw``."""
    parent_state = super().__getstate__()
    return UTCTimeState(*parent_state, **{"ber_raw": self.ber_raw})
def __setstate__(self, state):
    """Restore the object from *state*.

    The parent class restores its own part first; ``ber_raw`` is then
    taken from the state.
    """
    super().__setstate__(state)
    restored_raw = state.ber_raw
    self.ber_raw = restored_raw
def __bytes__(self):
    """Return the encoded time text, without tag and length octets.

    The readiness check runs first, so it fails before any encoding is
    attempted on an object without a value.
    """
    self._assert_ready()
    encoded = self._encode_time()
    return encoded
5241 def __eq__(self, their):
5242 if their.__class__ == bytes:
5243 return self._encode_time() == their
5244 if their.__class__ == datetime:
5245 return self.todatetime() == their
5246 if not isinstance(their, self.__class__):
5249 self._value == their._value and
5250 self.tag == their.tag and
5251 self._expl == their._expl
def _encode_time(self):
    """Format the stored datetime as ASCII ``YYMMDDHHMMSSZ`` octets."""
    text = self._value.strftime("%y%m%d%H%M%SZ")
    return text.encode("ascii")
5258 self._assert_ready()
5259 return b"".join((self.tag, LEN_LEN_YYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """First encoder pass: report the full encoded size.

    The value has a fixed length, so the size is the tag length plus a
    precomputed constant covering the length octets and the value.

    :returns: ``(encoded length, state)``; ``state`` is passed through
              untouched
    """
    total_len = LEN_YYMMDDHHMMSSZ_WITH_LEN + len(self.tag)
    return (total_len, state)
def _encode2nd(self, writer, state_iter):
    """Second encoder pass: write the complete encoding into *writer*.

    ``state_iter`` is not consumed here.
    """
    self._assert_ready()
    encoded = self._encode()
    write_full(writer, encoded)
def _encode_cer(self, writer):
    """Write the CER form into *writer*.

    It is the very same octets that ``_encode`` produces.
    """
    encoded = self._encode()
    write_full(writer, encoded)
5271 def todatetime(self):
5274 def totzdatetime(self):
5276 return self._value.replace(tzinfo=tzUTC)
5277 except TypeError as err:
5278 raise NotImplementedError("Missing dateutil.tz") from err
5281 return pp_console_row(next(self.pps()))
5283 def pps(self, decode_path=()):
5286 asn1_type_name=self.asn1_type_name,
5287 obj_name=self.__class__.__name__,
5288 decode_path=decode_path,
5289 value=self._pp_value(),
5290 optional=self.optional,
5291 default=self == self.default,
5292 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5293 expl=None if self._expl is None else tag_decode(self._expl),
5298 expl_offset=self.expl_offset if self.expled else None,
5299 expl_tlen=self.expl_tlen if self.expled else None,
5300 expl_llen=self.expl_llen if self.expled else None,
5301 expl_vlen=self.expl_vlen if self.expled else None,
5302 expl_lenindef=self.expl_lenindef,
5303 ber_encoded=self.ber_encoded,
5306 for pp in self.pps_lenindef(decode_path):
5310 class GeneralizedTime(UTCTime):
5311 """``GeneralizedTime`` datetime type
5313 This type is similar to :py:class:`pyderasn.UTCTime`.
5315 >>> t = GeneralizedTime(datetime(2017, 9, 30, 22, 7, 50, 123))
5316 GeneralizedTime GeneralizedTime 2017-09-30T22:07:50.000123
5318 '20170930220750.000123Z'
5319 >>> t = GeneralizedTime(datetime(2057, 9, 30, 22, 7, 50))
5320 GeneralizedTime GeneralizedTime 2057-09-30T22:07:50
5324 Only **naive** datetime objects are supported.
5325 Library assumes that all work is done in UTC.
5329 Only **microsecond** fractions are supported in DER encoding.
5330 :py:exc:`pyderasn.DecodeError` will be raised during decoding of
5331 higher precision values.
5335 **BER** encoded data can lose information (accuracy) during
5336 decoding because of float transformations.
5340 **Zero** year is unsupported.
5343 tag_default = tag_encode(24)
5344 asn1_type_name = "GeneralizedTime"
5346 def _dt_sanitize(self, value):
5349 def _strptime_bered(self, value):
5350 if len(value) < 4 + 3 * 2:
5351 raise ValueError("invalid GeneralizedTime")
5352 year, month, day, hour, _, _ = str_to_time_fractions(value[:10] + "0000")
5353 decoded = datetime(year, month, day, hour)
5354 offset, value = 0, value[10:]
5356 return offset, decoded
5357 if value[-1] == "Z":
5360 for char, sign in (("-", -1), ("+", 1)):
5361 idx = value.rfind(char)
5364 offset_raw, value = value[idx + 1:].replace(":", ""), value[:idx]
5365 v = pureint(offset_raw)
5366 if len(offset_raw) == 4:
5367 offset, v = (60 * (v % 100)), v // 100
5369 raise ValueError("invalid UTC offset minutes")
5370 elif len(offset_raw) == 2:
5373 raise ValueError("invalid UTC offset")
5375 if offset > 14 * 3600:
5376 raise ValueError("too big UTC offset")
5380 return offset, decoded
5381 if value[0] in DECIMAL_SIGNS:
5383 decoded + timedelta(seconds=3600 * fractions2float(value[1:]))
5386 raise ValueError("stripped minutes")
5387 decoded += timedelta(seconds=60 * pureint(value[:2]))
5390 return offset, decoded
5391 if value[0] in DECIMAL_SIGNS:
5393 decoded + timedelta(seconds=60 * fractions2float(value[1:]))
5396 raise ValueError("stripped seconds")
5397 decoded += timedelta(seconds=pureint(value[:2]))
5400 return offset, decoded
5401 if value[0] not in DECIMAL_SIGNS:
5402 raise ValueError("invalid format after seconds")
5404 decoded + timedelta(microseconds=10**6 * fractions2float(value[1:]))
5407 def _strptime(self, value):
5409 if l == LEN_YYYYMMDDHHMMSSZ:
5410 # datetime.strptime's format: %Y%m%d%H%M%SZ
5411 if value[-1] != "Z":
5412 raise ValueError("non UTC timezone")
5413 return datetime(*str_to_time_fractions(value[:-1]))
5414 if l >= LEN_YYYYMMDDHHMMSSDMZ:
5415 # datetime.strptime's format: %Y%m%d%H%M%S.%fZ
5416 if value[-1] != "Z":
5417 raise ValueError("non UTC timezone")
5418 if value[14] != ".":
5419 raise ValueError("no fractions separator")
5422 raise ValueError("trailing zero")
5425 raise ValueError("only microsecond fractions are supported")
5426 us = pureint(us + ("0" * (6 - us_len)))
5427 year, month, day, hour, minute, second = str_to_time_fractions(value[:14])
5428 return datetime(year, month, day, hour, minute, second, us)
5429 raise ValueError("invalid GeneralizedTime length")
5431 def _encode_time(self):
5433 encoded = value.strftime("%Y%m%d%H%M%S")
5434 if value.microsecond > 0:
5435 encoded += (".%06d" % value.microsecond).rstrip("0")
5436 return (encoded + "Z").encode("ascii")
5439 self._assert_ready()
5441 if value.microsecond > 0:
5442 encoded = self._encode_time()
5443 return b"".join((self.tag, len_encode(len(encoded)), encoded))
5444 return b"".join((self.tag, LEN_LEN_YYYYMMDDHHMMSSZ, self._encode_time()))
def _encode1st(self, state):
    """First encoder pass: report the full encoded size.

    The value length is measured on the actually encoded time text
    rather than taken from a constant.

    :returns: ``(tag length + length-octets length + value length,
              state)``; ``state`` is passed through untouched
    """
    self._assert_ready()
    value_len = len(self._encode_time())
    total_len = len(self.tag) + len_size(value_len) + value_len
    return (total_len, state)
def _encode2nd(self, writer, state_iter):
    """Second encoder pass: write the complete encoding into *writer*.

    ``state_iter`` is not consumed here.
    """
    encoded = self._encode()
    write_full(writer, encoded)
5455 class GraphicString(CommonString):
5457 tag_default = tag_encode(25)
5458 encoding = "iso-8859-1"
5459 asn1_type_name = "GraphicString"
5462 class GeneralString(CommonString):
5464 tag_default = tag_encode(27)
5465 encoding = "iso-8859-1"
5466 asn1_type_name = "GeneralString"
5469 class UniversalString(CommonString):
5471 tag_default = tag_encode(28)
5472 encoding = "utf-32-be"
5473 asn1_type_name = "UniversalString"
5476 class BMPString(CommonString):
5478 tag_default = tag_encode(30)
5479 encoding = "utf-16-be"
5480 asn1_type_name = "BMPString"
5483 ChoiceState = namedtuple(
5485 BasicState._fields + ("specs", "value",),
5491 """``CHOICE`` special type
5495 class GeneralName(Choice):
5497 ("rfc822Name", IA5String(impl=tag_ctxp(1))),
5498 ("dNSName", IA5String(impl=tag_ctxp(2))),
5501 >>> gn = GeneralName()
5503 >>> gn["rfc822Name"] = IA5String("foo@bar.baz")
5504 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5505 >>> gn["dNSName"] = IA5String("bar.baz")
5506 GeneralName CHOICE dNSName[[2] IA5String IA5 bar.baz]
5507 >>> gn["rfc822Name"]
5510 [2] IA5String IA5 bar.baz
5513 >>> gn.value == gn["dNSName"]
5516 OrderedDict([('rfc822Name', [1] IA5String IA5), ('dNSName', [2] IA5String IA5)])
5518 >>> GeneralName(("rfc822Name", IA5String("foo@bar.baz")))
5519 GeneralName CHOICE rfc822Name[[1] IA5String IA5 foo@bar.baz]
5521 __slots__ = ("specs",)
5523 asn1_type_name = "CHOICE"
5536 :param value: set the value. Either ``(choice, value)`` tuple, or
5537 :py:class:`pyderasn.Choice` object
5538 :param bytes impl: can not be set, do **not** use it
5539 :param bytes expl: override default tag with ``EXPLICIT`` one
5540 :param default: set default value. Type same as in ``value``
5541 :param bool optional: is object ``OPTIONAL`` in sequence
5543 if impl is not None:
5544 raise ValueError("no implicit tag allowed for CHOICE")
5545 super().__init__(None, expl, default, optional, _decoded)
5547 schema = getattr(self, "schema", ())
5548 if len(schema) == 0:
5549 raise ValueError("schema must be specified")
5551 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
5554 if value is not None:
5555 self._value = self._value_sanitize(value)
5556 if default is not None:
5557 default_value = self._value_sanitize(default)
5558 default_obj = self.__class__(impl=self.tag, expl=self._expl)
5559 default_obj.specs = self.specs
5560 default_obj._value = default_value
5561 self.default = default_obj
5563 self._value = copy(default_obj._value)
5564 if self._expl is not None:
5565 tag_class, _, tag_num = tag_decode(self._expl)
5566 self._tag_order = (tag_class, tag_num)
def _value_sanitize(self, value):
    """Convert *value* to the stored ``(choice name, object)`` pair.

    :param value: either a ``(choice, obj)`` two-element tuple, or an
                  object of this class (its stored pair is taken)
    :returns: ``(choice, spec(obj))`` for a tuple, where ``spec`` is
              the schema entry for that choice
    :raises ObjUnknown: choice name is not present in the schema
    :raises InvalidValueType: ``obj`` is not an instance of the schema
                              entry's class, or *value* has an
                              unsupported type
    """
    if (value.__class__ == tuple) and len(value) == 2:
        choice, obj = value
        spec = self.specs.get(choice)
        if spec is None:
            raise ObjUnknown(choice)
        if not isinstance(obj, spec.__class__):
            # Report the expected class, as __setitem__ does, instead
            # of the spec instance itself.
            raise InvalidValueType((spec.__class__,))
        return (choice, spec(obj))
    if isinstance(value, self.__class__):
        return value._value
    raise InvalidValueType((self.__class__, tuple))
5583 return self._value is not None and self._value[1].ready
5587 return self.expl_lenindef or (
5588 (self._value is not None) and
5589 self._value[1].bered
5592 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from *state*.

    The parent class restores its own part first; the ``specs`` and
    ``value`` fields are then restored here.
    """
    super().__setstate__(state)
    self.specs, self._value = state.specs, state.value
5615 def __eq__(self, their):
5616 if (their.__class__ == tuple) and len(their) == 2:
5617 return self._value == their
5618 if not isinstance(their, self.__class__):
5621 self.specs == their.specs and
5622 self._value == their._value
5632 return self.__class__(
5635 expl=self._expl if expl is None else expl,
5636 default=self.default if default is None else default,
5637 optional=self.optional if optional is None else optional,
5642 """Name of the choice
5644 self._assert_ready()
5645 return self._value[0]
5649 """Value of underlying choice
5651 self._assert_ready()
5652 return self._value[1]
def tag_order(self):
    """Ordering key of the object

    A stored ``_tag_order`` takes precedence; when it is ``None`` the
    ``tag_order`` of the currently held value is returned instead.
    ``_assert_ready()`` is invoked first.
    """
    self._assert_ready()
    own_order = self._tag_order
    if own_order is not None:
        return own_order
    return self._value[1].tag_order
def tag_order_cer(self):
    """Smallest ``tag_order_cer`` among all values of ``specs``"""
    candidates = [spec.tag_order_cer for spec in self.specs.values()]
    return min(candidates)
5663 def __getitem__(self, key):
5664 if key not in self.specs:
5665 raise ObjUnknown(key)
5666 if self._value is None:
5668 choice, value = self._value
5673 def __setitem__(self, key, value):
5674 spec = self.specs.get(key)
5676 raise ObjUnknown(key)
5677 if not isinstance(value, spec.__class__):
5678 raise InvalidValueType((spec.__class__,))
5679 self._value = (key, spec(value))
5687 return self._value[1].decoded if self.ready else False
5690 self._assert_ready()
5691 return self._value[1].encode()
def _encode1st(self, state):
    """Delegate the first encoding pass to the held value

    :param state: passed through unchanged to the value's ``encode1st``
    :returns: whatever the value's ``encode1st`` returns
    """
    self._assert_ready()
    selected = self._value[1]
    return selected.encode1st(state)
def _encode2nd(self, writer, state_iter):
    """Delegate the second encoding pass to the held value

    Both arguments are forwarded unchanged to the value's ``encode2nd``.
    """
    selected = self._value[1]
    selected.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """Delegate CER encoding to the held value

    ``_assert_ready()`` is invoked first, then ``writer`` is forwarded
    to the value's ``encode_cer``.
    """
    self._assert_ready()
    selected = self._value[1]
    selected.encode_cer(writer)
5704 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
5705 for choice, spec in self.specs.items():
5706 sub_decode_path = decode_path + (choice,)
5712 decode_path=sub_decode_path,
5715 _ctx_immutable=False,
5722 klass=self.__class__,
5723 decode_path=decode_path,
5726 if tag_only: # pragma: no cover
5730 for _decode_path, value, tail in spec.decode_evgen(
5734 decode_path=sub_decode_path,
5736 _ctx_immutable=False,
5738 yield _decode_path, value, tail
5740 _, value, tail = next(spec.decode_evgen(
5744 decode_path=sub_decode_path,
5746 _ctx_immutable=False,
5749 obj = self.__class__(
5752 default=self.default,
5753 optional=self.optional,
5754 _decoded=(offset, 0, value.fulllen),
5756 obj._value = (choice, value)
5757 yield decode_path, obj, tail
5760 value = pp_console_row(next(self.pps()))
5762 value = "%s[%r]" % (value, self.value)
5765 def pps(self, decode_path=()):
5768 asn1_type_name=self.asn1_type_name,
5769 obj_name=self.__class__.__name__,
5770 decode_path=decode_path,
5771 value=self.choice if self.ready else None,
5772 optional=self.optional,
5773 default=self == self.default,
5774 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
5775 expl=None if self._expl is None else tag_decode(self._expl),
5780 expl_lenindef=self.expl_lenindef,
5784 yield self.value.pps(decode_path=decode_path + (self.choice,))
5785 for pp in self.pps_lenindef(decode_path):
5789 class PrimitiveTypes(Choice):
5790 """Predefined ``CHOICE`` for all generic primitive types
5792 It could be useful for general decoding of some unspecified values:
5794 >>> PrimitiveTypes().decod(hexdec("0403666f6f")).value
5795 OCTET STRING 3 bytes 666f6f
5796 >>> PrimitiveTypes().decod(hexdec("0203123456")).value
5800 schema = tuple((klass.__name__, klass()) for klass in (
5824 AnyState = namedtuple(
5826 BasicState._fields + ("value", "defined"),
5832 """``ANY`` special type
5834 >>> Any(Integer(-123))
5835 ANY INTEGER -123 (0X:7B)
5836 >>> a = Any(OctetString(b"hello world").encode())
5837 ANY 040b68656c6c6f20776f726c64
5838 >>> hexenc(bytes(a))
5839 b'0x040x0bhello world'
5841 __slots__ = ("defined",)
5842 tag_default = tag_encode(0)
5843 asn1_type_name = "ANY"
5853 :param value: set the value. Either any kind of pyderasn's
5854 **ready** object, or bytes. Pay attention that
5855 **no** validation is performed if raw binary value
5856 is valid TLV, except just tag decoding
5857 :param bytes expl: override default tag with ``EXPLICIT`` one
5858 :param bool optional: is object ``OPTIONAL`` in sequence
5860 super().__init__(None, expl, None, optional, _decoded)
5864 value = self._value_sanitize(value)
5866 if self._expl is None:
5867 if value.__class__ == bytes or value.__class__ == memoryview:
5868 tag_class, _, tag_num = tag_decode(tag_strip(value)[0])
5870 tag_class, tag_num = value.tag_order
5872 tag_class, _, tag_num = tag_decode(self._expl)
5873 self._tag_order = (tag_class, tag_num)
5876 def _value_sanitize(self, value):
5877 if value.__class__ == bytes or value.__class__ == memoryview:
5879 raise ValueError("%s value can not be empty" % self.__class__.__name__)
5881 if isinstance(value, self.__class__):
5883 if not isinstance(value, Obj):
5884 raise InvalidValueType((self.__class__, Obj, bytes))
5889 return self._value is not None
def tag_order(self):
    """Stored ordering key of the object

    ``_assert_ready()`` is invoked first, then the stored ``_tag_order``
    is returned as is.
    """
    self._assert_ready()
    stored_order = self._tag_order
    return stored_order
5898 if self.expl_lenindef or self.lenindef:
5900 if self.defined is None:
5902 return self.defined[1].bered
5904 def __getstate__(self):
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The parent class restores its own fields first, then ``value`` and
    ``defined`` are taken from ``state``.

    :param state: state object exposing ``value`` and ``defined`` attributes
    """
    super().__setstate__(state)
    self._value, self.defined = state.value, state.defined
def __eq__(self, their):
    """Compare with raw bytes/memoryview or with another ``Any``

    :param their: ``bytes``/``memoryview`` holding an encoded value, or
                  another ``Any`` instance
    :returns: for raw ``their`` -- whether the stored value (taken as is
              when it is raw itself, otherwise its ``encode()`` result)
              equals it; an unset value never equals raw data.
              For ``Any`` -- whether both ``memoryview()`` results are
              equal when both objects are ready, otherwise whether
              their readiness matches. ``False`` for anything else
    """
    if their.__class__ == bytes or their.__class__ == memoryview:
        value = self._value
        if value is None:
            # Nothing is stored yet, so there is nothing to encode
            return False
        # The decision to encode depends on the type of the *stored*
        # value, not on the type of the object we are compared with
        if value.__class__ == bytes or value.__class__ == memoryview:
            return value == their
        return value.encode() == their
    if issubclass(their.__class__, Any):
        if self.ready and their.ready:
            return self.memoryview() == their.memoryview()
        return self.ready == their.ready
    return False
5944 return self.__class__(
5946 expl=self._expl if expl is None else expl,
5947 optional=self.optional if optional is None else optional,
5950 def __bytes__(self):
5951 self._assert_ready()
5953 if value.__class__ == bytes:
5955 if value.__class__ == memoryview:
5957 return self._value.encode()
5959 def memoryview(self):
5960 self._assert_ready()
5962 if value.__class__ == memoryview:
5963 return memoryview(value)
5964 return memoryview(bytes(self))
5971 self._assert_ready()
5973 if value.__class__ == bytes or value.__class__ == memoryview:
5975 return value.encode()
5977 def _encode1st(self, state):
5978 self._assert_ready()
5980 if value.__class__ == bytes or value.__class__ == memoryview:
5981 return len(value), state
5982 return value.encode1st(state)
5984 def _encode2nd(self, writer, state_iter):
5986 if value.__class__ == bytes or value.__class__ == memoryview:
5987 write_full(writer, value)
5989 value.encode2nd(writer, state_iter)
5991 def _encode_cer(self, writer):
5992 self._assert_ready()
5994 if value.__class__ == bytes or value.__class__ == memoryview:
5995 write_full(writer, value)
5997 value.encode_cer(writer)
5999 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6001 t, tlen, lv = tag_strip(tlv)
6002 except DecodeError as err:
6003 raise err.__class__(
6005 klass=self.__class__,
6006 decode_path=decode_path,
6010 l, llen, v = len_decode(lv)
6011 except LenIndefForm as err:
6012 if not ctx.get("bered", False):
6013 raise err.__class__(
6015 klass=self.__class__,
6016 decode_path=decode_path,
6019 llen, vlen, v = 1, 0, lv[1:]
6020 sub_offset = offset + tlen + llen
6022 while v[:EOC_LEN].tobytes() != EOC:
6023 chunk, v = Any().decode(
6026 decode_path=decode_path + (str(chunk_i),),
6029 _ctx_immutable=False,
6031 vlen += chunk.tlvlen
6032 sub_offset += chunk.tlvlen
6034 tlvlen = tlen + llen + vlen + EOC_LEN
6035 obj = self.__class__(
6036 value=None if evgen_mode else tlv[:tlvlen].tobytes(),
6038 optional=self.optional,
6039 _decoded=(offset, 0, tlvlen),
6042 obj.tag = t.tobytes()
6043 yield decode_path, obj, v[EOC_LEN:]
6045 except DecodeError as err:
6046 raise err.__class__(
6048 klass=self.__class__,
6049 decode_path=decode_path,
6053 raise NotEnoughData(
6054 "encoded length is longer than data",
6055 klass=self.__class__,
6056 decode_path=decode_path,
6059 tlvlen = tlen + llen + l
6060 v, tail = tlv[:tlvlen], v[l:]
6063 elif ctx.get("keep_memoryview", False):
6067 obj = self.__class__(
6070 optional=self.optional,
6071 _decoded=(offset, 0, tlvlen),
6073 obj.tag = t.tobytes()
6074 yield decode_path, obj, tail
6077 return pp_console_row(next(self.pps()))
6079 def pps(self, decode_path=()):
6083 elif value.__class__ == bytes or value.__class__ == memoryview:
6089 asn1_type_name=self.asn1_type_name,
6090 obj_name=self.__class__.__name__,
6091 decode_path=decode_path,
6093 blob=self._value if (
6094 self._value.__class__ == bytes or
6095 value.__class__ == memoryview
6097 optional=self.optional,
6098 default=self == self.default,
6099 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6100 expl=None if self._expl is None else tag_decode(self._expl),
6105 expl_offset=self.expl_offset if self.expled else None,
6106 expl_tlen=self.expl_tlen if self.expled else None,
6107 expl_llen=self.expl_llen if self.expled else None,
6108 expl_vlen=self.expl_vlen if self.expled else None,
6109 expl_lenindef=self.expl_lenindef,
6110 lenindef=self.lenindef,
6113 defined_by, defined = self.defined or (None, None)
6114 if defined_by is not None:
6116 decode_path=decode_path + (DecodePathDefBy(defined_by),)
6118 for pp in self.pps_lenindef(decode_path):
6122 ########################################################################
6123 # ASN.1 constructed types
6124 ########################################################################
def abs_decode_path(decode_path, rel_path):
    """Create an absolute decode path from current and relative ones

    :param decode_path: current decode path, starting point. Tuple of strings
    :param rel_path: relative path to ``decode_path``. Tuple of strings.
                     If first tuple's element is "/", then treat it as
                     an absolute path, ignoring ``decode_path`` as
                     starting point. Also this tuple can start with ".."
                     elements, each of them stripping the last element
                     from ``decode_path``. Empty tuple leaves
                     ``decode_path`` as is
    :returns: resulting path as a tuple

    >>> abs_decode_path(("foo", "bar"), ("baz", "whatever"))
    ("foo", "bar", "baz", "whatever")
    >>> abs_decode_path(("foo", "bar", "baz"), ("..", "..", "whatever"))
    ("foo", "whatever")
    >>> abs_decode_path(("foo", "bar"), ("/", "baz", "whatever"))
    ("baz", "whatever")
    >>> abs_decode_path(("foo", "bar"), ("..",))
    ("foo",)
    """
    # Consume leading ".." elements one by one. The emptiness check in
    # the loop condition protects against indexing an exhausted path,
    # which happens when rel_path is empty or consists solely of ".."
    while rel_path:
        head = rel_path[0]
        if head == "/":
            return rel_path[1:]
        if head != "..":
            break
        decode_path, rel_path = decode_path[:-1], rel_path[1:]
    return decode_path + rel_path
6151 SequenceState = namedtuple(
6153 BasicState._fields + ("specs", "value",),
6158 class SequenceEncode1stMixin:
6161 def _encode1st(self, state):
6163 idx = len(state) - 1
6165 for v in self._values_for_encoding():
6166 l, _ = v.encode1st(state)
6169 return len(self.tag) + len_size(vlen) + vlen, state
6172 class Sequence(SequenceEncode1stMixin, Obj):
6173 """``SEQUENCE`` structure type
6175 You have to make specification of sequence::
6177 class Extension(Sequence):
6179 ("extnID", ObjectIdentifier()),
6180 ("critical", Boolean(default=False)),
6181 ("extnValue", OctetString()),
6184 Then, you can work with it as with dictionary.
6186 >>> ext = Extension()
6187 >>> Extension().specs
6189 ('extnID', OBJECT IDENTIFIER),
6190 ('critical', BOOLEAN False OPTIONAL DEFAULT),
6191 ('extnValue', OCTET STRING),
6193 >>> ext["extnID"] = "1.2.3"
6194 Traceback (most recent call last):
6195 pyderasn.InvalidValueType: invalid value type, expected: <class 'pyderasn.ObjectIdentifier'>
6196 >>> ext["extnID"] = ObjectIdentifier("1.2.3")
6198 You can determine if sequence is ready to be encoded:
6203 Traceback (most recent call last):
6204 pyderasn.ObjNotReady: object is not ready: extnValue
6205 >>> ext["extnValue"] = OctetString(b"foobar")
6209 Value you want to assign, must have the same **type** as in
6210 corresponding specification, but it can have different tags,
6211 optional/default attributes -- they will be taken from specification
6214 class TBSCertificate(Sequence):
6216 ("version", Version(expl=tag_ctxc(0), default="v1")),
6219 >>> tbs = TBSCertificate()
6220 >>> tbs["version"] = Version("v2") # no need to explicitly add ``expl``
6222 Assign ``None`` to remove value from sequence.
6224 You can set values in Sequence during its initialization:
6226 >>> AlgorithmIdentifier((
6227 ("algorithm", ObjectIdentifier("1.2.3")),
6228 ("parameters", Any(Null()))
6230 AlgorithmIdentifier SEQUENCE[algorithm: OBJECT IDENTIFIER 1.2.3; parameters: ANY 0500 OPTIONAL]
6232 You can determine if value exists/set in the sequence and take its value:
6234 >>> "extnID" in ext, "extnValue" in ext, "critical" in ext
6237 OBJECT IDENTIFIER 1.2.3
6239 But pay attention that if value has default, then it won't be (not
6240 in) in the sequence (because ``DEFAULT`` must not be encoded in
6241 DER), but you can read its value:
6243 >>> "critical" in ext, ext["critical"]
6244 (False, BOOLEAN False)
6245 >>> ext["critical"] = Boolean(True)
6246 >>> "critical" in ext, ext["critical"]
6247 (True, BOOLEAN True)
6249 All defaulted values are always optional.
6251 .. _allow_default_values_ctx:
6253 DER prohibits default value encoding and will raise an error if
6254 default value is unexpectedly met during decode.
6255 If :ref:`bered <bered_ctx>` context option is set, then no error
6256 will be raised, but ``bered`` attribute set. You can disable strict
6257 defaulted values existence validation by setting
6258 ``"allow_default_values": True`` :ref:`context <ctx>` option.
6260 All values with DEFAULT specified are decoded atomically in
6261 :ref:`evgen mode <evgen_mode>`. If DEFAULT value is some kind of
6262 SEQUENCE, then it will be yielded as a single element, not
6263 disassembled. That is required for DEFAULT existence check.
6265 Two sequences are equal if they have equal specification (schema),
6266 implicit/explicit tagging and the same values.
6268 __slots__ = ("specs",)
6269 tag_default = tag_encode(form=TagFormConstructed, num=16)
6270 asn1_type_name = "SEQUENCE"
6282 super().__init__(impl, expl, default, optional, _decoded)
6284 schema = getattr(self, "schema", ())
6286 schema if schema.__class__ == OrderedDict else OrderedDict(schema)
6289 if value is not None:
6290 if issubclass(value.__class__, Sequence):
6291 self._value = value._value
6292 elif hasattr(value, "__iter__"):
6293 for seq_key, seq_value in value:
6294 self[seq_key] = seq_value
6296 raise InvalidValueType((Sequence,))
6297 if default is not None:
6298 if not issubclass(default.__class__, Sequence):
6299 raise InvalidValueType((Sequence,))
6300 default_value = default._value
6301 default_obj = self.__class__(impl=self.tag, expl=self._expl)
6302 default_obj.specs = self.specs
6303 default_obj._value = default_value
6304 self.default = default_obj
6306 self._value = copy(default_obj._value)
6310 for name, spec in self.specs.items():
6311 value = self._value.get(name)
6322 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6324 return any(value.bered for value in self._value.values())
6326 def __getstate__(self):
6327 return SequenceState(
6341 {k: copy(v) for k, v in self._value.items()},
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The parent class restores its own fields first, then ``specs`` and
    ``value`` are taken from ``state``.

    :param state: state object exposing ``specs`` and ``value`` attributes
    """
    super().__setstate__(state)
    self.specs, self._value = state.specs, state.value
6349 def __eq__(self, their):
6350 if not isinstance(their, self.__class__):
6353 self.specs == their.specs and
6354 self.tag == their.tag and
6355 self._expl == their._expl and
6356 self._value == their._value
6367 return self.__class__(
6370 impl=self.tag if impl is None else impl,
6371 expl=self._expl if expl is None else expl,
6372 default=self.default if default is None else default,
6373 optional=self.optional if optional is None else optional,
def __contains__(self, key):
    """Tell if ``key`` is present among the stored values"""
    stored = self._value
    return key in stored
6379 def __setitem__(self, key, value):
6380 spec = self.specs.get(key)
6382 raise ObjUnknown(key)
6384 self._value.pop(key, None)
6386 if not isinstance(value, spec.__class__):
6387 raise InvalidValueType((spec.__class__,))
6388 value = spec(value=value)
6389 if spec.default is not None and value == spec.default:
6390 self._value.pop(key, None)
6392 self._value[key] = value
6394 def __getitem__(self, key):
6395 value = self._value.get(key)
6396 if value is not None:
6398 spec = self.specs.get(key)
6400 raise ObjUnknown(key)
6401 if spec.default is not None:
6405 def _values_for_encoding(self):
6406 for name, spec in self.specs.items():
6407 value = self._value.get(name)
6411 raise ObjNotReady(name)
6415 v = b"".join(v.encode() for v in self._values_for_encoding())
6416 return b"".join((self.tag, len_encode(len(v)), v))
def _encode2nd(self, writer, state_iter):
    """Second encoding pass: write the header, then every value

    The length placed in the header is the next item taken from
    ``state_iter``. After that each object produced by
    ``_values_for_encoding()`` continues the pass with the same
    ``writer`` and ``state_iter``.
    """
    tag = self.tag
    header = tag + len_encode(next(state_iter))
    write_full(writer, header)
    for member in self._values_for_encoding():
        member.encode2nd(writer, state_iter)
def _encode_cer(self, writer):
    """CER encoding: tag with ``LENINDEF``, every value, then ``EOC``

    Each object produced by ``_values_for_encoding()`` is encoded with
    its own ``encode_cer`` between the opening and the closing writes.
    """
    opening = self.tag + LENINDEF
    write_full(writer, opening)
    for member in self._values_for_encoding():
        member.encode_cer(writer)
    write_full(writer, EOC)
6429 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6431 t, tlen, lv = tag_strip(tlv)
6432 except DecodeError as err:
6433 raise err.__class__(
6435 klass=self.__class__,
6436 decode_path=decode_path,
6441 klass=self.__class__,
6442 decode_path=decode_path,
6445 if tag_only: # pragma: no cover
6449 ctx_bered = ctx.get("bered", False)
6451 l, llen, v = len_decode(lv)
6452 except LenIndefForm as err:
6454 raise err.__class__(
6456 klass=self.__class__,
6457 decode_path=decode_path,
6460 l, llen, v = 0, 1, lv[1:]
6462 except DecodeError as err:
6463 raise err.__class__(
6465 klass=self.__class__,
6466 decode_path=decode_path,
6470 raise NotEnoughData(
6471 "encoded length is longer than data",
6472 klass=self.__class__,
6473 decode_path=decode_path,
6477 v, tail = v[:l], v[l:]
6479 sub_offset = offset + tlen + llen
6482 ctx_allow_default_values = ctx.get("allow_default_values", False)
6483 for name, spec in self.specs.items():
6484 if spec.optional and (
6485 (lenindef and v[:EOC_LEN].tobytes() == EOC) or
6489 spec_defaulted = spec.default is not None
6490 sub_decode_path = decode_path + (name,)
6492 if evgen_mode and not spec_defaulted:
6493 for _decode_path, value, v_tail in spec.decode_evgen(
6497 decode_path=sub_decode_path,
6499 _ctx_immutable=False,
6501 yield _decode_path, value, v_tail
6503 _, value, v_tail = next(spec.decode_evgen(
6507 decode_path=sub_decode_path,
6509 _ctx_immutable=False,
6512 except TagMismatch as err:
6513 if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
6517 defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
6518 if not evgen_mode and defined is not None:
6519 defined_by, defined_spec = defined
6520 if issubclass(value.__class__, SequenceOf):
6521 for i, _value in enumerate(value):
6522 sub_sub_decode_path = sub_decode_path + (
6524 DecodePathDefBy(defined_by),
6526 defined_value, defined_tail = defined_spec.decode(
6527 memoryview(bytes(_value)),
6529 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6530 if value.expled else (value.tlen + value.llen)
6533 decode_path=sub_sub_decode_path,
6535 _ctx_immutable=False,
6537 if len(defined_tail) > 0:
6540 klass=self.__class__,
6541 decode_path=sub_sub_decode_path,
6544 _value.defined = (defined_by, defined_value)
6546 defined_value, defined_tail = defined_spec.decode(
6547 memoryview(bytes(value)),
6549 (value.tlen + value.llen + value.expl_tlen + value.expl_llen)
6550 if value.expled else (value.tlen + value.llen)
6553 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6555 _ctx_immutable=False,
6557 if len(defined_tail) > 0:
6560 klass=self.__class__,
6561 decode_path=sub_decode_path + (DecodePathDefBy(defined_by),),
6564 value.defined = (defined_by, defined_value)
6566 value_len = value.fulllen
6568 sub_offset += value_len
6572 yield sub_decode_path, value, v_tail
6573 if value == spec.default:
6574 if ctx_bered or ctx_allow_default_values:
6578 "DEFAULT value met",
6579 klass=self.__class__,
6580 decode_path=sub_decode_path,
6584 values[name] = value
6585 spec_defines = getattr(spec, "defines", ())
6586 if len(spec_defines) == 0:
6587 defines_by_path = ctx.get("defines_by_path", ())
6588 if len(defines_by_path) > 0:
6589 spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
6590 if spec_defines is not None and len(spec_defines) > 0:
6591 for rel_path, schema in spec_defines:
6592 defined = schema.get(value, None)
6593 if defined is not None:
6594 ctx.setdefault("_defines", []).append((
6595 abs_decode_path(sub_decode_path[:-1], rel_path),
6599 if v[:EOC_LEN].tobytes() != EOC:
6602 klass=self.__class__,
6603 decode_path=decode_path,
6611 klass=self.__class__,
6612 decode_path=decode_path,
6615 obj = self.__class__(
6619 default=self.default,
6620 optional=self.optional,
6621 _decoded=(offset, llen, vlen),
6624 obj.lenindef = lenindef
6625 obj.ber_encoded = ber_encoded
6626 yield decode_path, obj, tail
6629 value = pp_console_row(next(self.pps()))
6631 for name in self.specs:
6632 _value = self._value.get(name)
6635 cols.append("%s: %s" % (name, repr(_value)))
6636 return "%s[%s]" % (value, "; ".join(cols))
6638 def pps(self, decode_path=()):
6641 asn1_type_name=self.asn1_type_name,
6642 obj_name=self.__class__.__name__,
6643 decode_path=decode_path,
6644 optional=self.optional,
6645 default=self == self.default,
6646 impl=None if self.tag == self.tag_default else tag_decode(self.tag),
6647 expl=None if self._expl is None else tag_decode(self._expl),
6652 expl_offset=self.expl_offset if self.expled else None,
6653 expl_tlen=self.expl_tlen if self.expled else None,
6654 expl_llen=self.expl_llen if self.expled else None,
6655 expl_vlen=self.expl_vlen if self.expled else None,
6656 expl_lenindef=self.expl_lenindef,
6657 lenindef=self.lenindef,
6658 ber_encoded=self.ber_encoded,
6661 for name in self.specs:
6662 value = self._value.get(name)
6665 yield value.pps(decode_path=decode_path + (name,))
6666 for pp in self.pps_lenindef(decode_path):
6670 class Set(Sequence, SequenceEncode1stMixin):
6671 """``SET`` structure type
6673 Its usage is identical to :py:class:`pyderasn.Sequence`.
6675 .. _allow_unordered_set_ctx:
6677 DER prohibits unordered values encoding and will raise an error
6678 during decode. If :ref:`bered <bered_ctx>` context option is set,
6679 then no error will occur. Also you can disable strict values
6680 ordering check by setting ``"allow_unordered_set": True``
6681 :ref:`context <ctx>` option.
6684 tag_default = tag_encode(form=TagFormConstructed, num=17)
6685 asn1_type_name = "SET"
def _values_for_encoding(self):
    """Parent's values for encoding, sorted by their ``tag_order``

    :returns: new list, ordered ascending by ``tag_order`` attribute
    """
    members = list(super()._values_for_encoding())
    members.sort(key=attrgetter("tag_order"))
    return members
6690 def _encode_cer(self, writer):
6691 write_full(writer, self.tag + LENINDEF)
6693 super()._values_for_encoding(),
6694 key=attrgetter("tag_order_cer"),
6696 v.encode_cer(writer)
6697 write_full(writer, EOC)
6699 def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
6701 t, tlen, lv = tag_strip(tlv)
6702 except DecodeError as err:
6703 raise err.__class__(
6705 klass=self.__class__,
6706 decode_path=decode_path,
6711 klass=self.__class__,
6712 decode_path=decode_path,
6719 ctx_bered = ctx.get("bered", False)
6721 l, llen, v = len_decode(lv)
6722 except LenIndefForm as err:
6724 raise err.__class__(
6726 klass=self.__class__,
6727 decode_path=decode_path,
6730 l, llen, v = 0, 1, lv[1:]
6732 except DecodeError as err:
6733 raise err.__class__(
6735 klass=self.__class__,
6736 decode_path=decode_path,
6740 raise NotEnoughData(
6741 "encoded length is longer than data",
6742 klass=self.__class__,
6746 v, tail = v[:l], v[l:]
6748 sub_offset = offset + tlen + llen
6751 ctx_allow_default_values = ctx.get("allow_default_values", False)
6752 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
6753 tag_order_prev = (0, 0)
6754 _specs_items = copy(self.specs)
6757 if lenindef and v[:EOC_LEN].tobytes() == EOC:
6759 for name, spec in _specs_items.items():
6760 sub_decode_path = decode_path + (name,)
6766 decode_path=sub_decode_path,
6769 _ctx_immutable=False,
6776 klass=self.__class__,
6777 decode_path=decode_path,
6780 spec_defaulted = spec.default is not None
6781 if evgen_mode and not spec_defaulted:
6782 for _decode_path, value, v_tail in spec.decode_evgen(
6786 decode_path=sub_decode_path,
6788 _ctx_immutable=False,
6790 yield _decode_path, value, v_tail
6792 _, value, v_tail = next(spec.decode_evgen(
6796 decode_path=sub_decode_path,
6798 _ctx_immutable=False,
6801 value_tag_order = value.tag_order
6802 value_len = value.fulllen
6803 if tag_order_prev >= value_tag_order:
6804 if ctx_bered or ctx_allow_unordered_set:
6808 "unordered " + self.asn1_type_name,
6809 klass=self.__class__,
6810 decode_path=sub_decode_path,
6815 yield sub_decode_path, value, v_tail
6816 if value != spec.default:
6818 elif ctx_bered or ctx_allow_default_values:
6822 "DEFAULT value met",
6823 klass=self.__class__,
6824 decode_path=sub_decode_path,
6827 values[name] = value
6828 del _specs_items[name]
6829 tag_order_prev = value_tag_order
6830 sub_offset += value_len
6834 obj = self.__class__(
6838 default=self.default,
6839 optional=self.optional,
6840 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
6843 if v[:EOC_LEN].tobytes() != EOC:
6846 klass=self.__class__,
6847 decode_path=decode_path,
6852 for name, spec in self.specs.items():
6853 if name not in values and not spec.optional:
6855 "%s value is not ready" % name,
6856 klass=self.__class__,
6857 decode_path=decode_path,
6862 obj.ber_encoded = ber_encoded
6863 yield decode_path, obj, tail
6866 SequenceOfState = namedtuple(
6868 BasicState._fields + ("spec", "value", "bound_min", "bound_max"),
6873 class SequenceOf(SequenceEncode1stMixin, Obj):
6874 """``SEQUENCE OF`` sequence type
6876 For that kind of type you must specify the object it will carry on
6877 (bounds are for example here, not required)::
6879 class Ints(SequenceOf):
6884 >>> ints.append(Integer(123))
6885 >>> ints.append(Integer(234))
6887 Ints SEQUENCE OF[INTEGER 123, INTEGER 234]
6888 >>> [int(i) for i in ints]
6890 >>> ints.append(Integer(345))
6891 Traceback (most recent call last):
6892 pyderasn.BoundsError: unsatisfied bounds: 0 <= 3 <= 2
6895 >>> ints[1] = Integer(345)
6897 Ints SEQUENCE OF[INTEGER 123, INTEGER 345]
6899 You can initialize sequence with preinitialized values:
6901 >>> ints = Ints([Integer(123), Integer(234)])
6903 Also you can use iterator as a value:
6905 >>> ints = Ints(iter(Integer(i) for i in range(1000000)))
6907 And it won't be iterated until encoding process. Pay attention that
6908 bounds and required schema checks are done only during the encoding
6909 process in that case! After encode was called, then value is zeroed
6910 back to empty list and you have to set it again. That mode is useful
6911 mainly with CER encoding mode, where all objects from the iterable
6912 will be streamed to the buffer, without copying all of them to
6915 __slots__ = ("spec", "_bound_min", "_bound_max")
6916 tag_default = tag_encode(form=TagFormConstructed, num=16)
6917 asn1_type_name = "SEQUENCE OF"
6930 super().__init__(impl, expl, default, optional, _decoded)
6932 schema = getattr(self, "schema", None)
6934 raise ValueError("schema must be specified")
6936 self._bound_min, self._bound_max = getattr(
6940 ) if bounds is None else bounds
6942 if value is not None:
6943 self._value = self._value_sanitize(value)
6944 if default is not None:
6945 default_value = self._value_sanitize(default)
6946 default_obj = self.__class__(
6951 default_obj._value = default_value
6952 self.default = default_obj
6954 self._value = copy(default_obj._value)
6956 def _value_sanitize(self, value):
6958 if issubclass(value.__class__, SequenceOf):
6959 value = value._value
6960 elif hasattr(value, NEXT_ATTR_NAME):
6962 elif hasattr(value, "__iter__"):
6965 raise InvalidValueType((self.__class__, iter, "iterator"))
6967 if not self._bound_min <= len(value) <= self._bound_max:
6968 raise BoundsError(self._bound_min, len(value), self._bound_max)
6969 class_expected = self.spec.__class__
6971 if not isinstance(v, class_expected):
6972 raise InvalidValueType((class_expected,))
6977 if hasattr(self._value, NEXT_ATTR_NAME):
6979 if self._bound_min > 0 and len(self._value) == 0:
6981 return all(v.ready for v in self._value)
6985 if self.expl_lenindef or self.lenindef or self.ber_encoded:
6987 return any(v.bered for v in self._value)
6989 def __getstate__(self):
6990 if hasattr(self._value, NEXT_ATTR_NAME):
6991 raise ValueError("can not pickle SequenceOf with iterator")
6992 return SequenceOfState(
7006 [copy(v) for v in self._value],
def __setstate__(self, state):
    """Restore the object from a previously captured state

    The parent class restores its own fields first, then ``spec``,
    ``value`` and both bounds are taken from ``state``.

    :param state: state object exposing ``spec``, ``value``,
                  ``bound_min`` and ``bound_max`` attributes
    """
    super().__setstate__(state)
    self.spec, self._value = state.spec, state.value
    self._bound_min, self._bound_max = state.bound_min, state.bound_max
7018 def __eq__(self, their):
7019 if isinstance(their, self.__class__):
7021 self.spec == their.spec and
7022 self.tag == their.tag and
7023 self._expl == their._expl and
7024 self._value == their._value
7026 if hasattr(their, "__iter__"):
7027 return self._value == list(their)
7039 return self.__class__(
7043 (self._bound_min, self._bound_max)
7044 if bounds is None else bounds
7046 impl=self.tag if impl is None else impl,
7047 expl=self._expl if expl is None else expl,
7048 default=self.default if default is None else default,
7049 optional=self.optional if optional is None else optional,
def __contains__(self, key):
    """Tell if ``key`` is present among the stored values"""
    stored = self._value
    return key in stored
7055 def append(self, value):
7056 if not isinstance(value, self.spec.__class__):
7057 raise InvalidValueType((self.spec.__class__,))
7058 if len(self._value) + 1 > self._bound_max:
7061 len(self._value) + 1,
7064 self._value.append(value)
7067 return iter(self._value)
7070 return len(self._value)
def __setitem__(self, key, value):
    """Replace the stored element under ``key``

    :param key: index passed as is to the underlying stored value
    :param value: object that must be an instance of the ``spec``'s
                  class; it is stored as ``spec(value=value)``
    :raises InvalidValueType: if ``value`` has an unexpected type
    """
    spec = self.spec
    class_expected = spec.__class__
    if isinstance(value, class_expected):
        self._value[key] = spec(value=value)
        return
    raise InvalidValueType((class_expected,))
def __getitem__(self, key):
    """Return the stored element found under ``key``"""
    stored = self._value
    return stored[key]
def _values_for_encoding(self):
    """Return an iterator over the stored values"""
    stored = self._value
    return iter(stored)
7084 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7087 values_append = values.append
7088 class_expected = self.spec.__class__
7089 values_for_encoding = self._values_for_encoding()
7091 for v in values_for_encoding:
7092 if not isinstance(v, class_expected):
7093 raise InvalidValueType((class_expected,))
7094 values_append(v.encode())
7095 if not self._bound_min <= len(values) <= self._bound_max:
7096 raise BoundsError(self._bound_min, len(values), self._bound_max)
7097 value = b"".join(values)
7099 value = b"".join(v.encode() for v in self._values_for_encoding())
7100 return b"".join((self.tag, len_encode(len(value)), value))
7102 def _encode1st(self, state):
7103 state = super()._encode1st(state)
7104 if hasattr(self._value, NEXT_ATTR_NAME):
7108 def _encode2nd(self, writer, state_iter):
7109 write_full(writer, self.tag + len_encode(next(state_iter)))
7110 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7113 class_expected = self.spec.__class__
7114 values_for_encoding = self._values_for_encoding()
7116 for v in values_for_encoding:
7117 if not isinstance(v, class_expected):
7118 raise InvalidValueType((class_expected,))
7119 v.encode2nd(writer, state_iter)
7121 if not self._bound_min <= values_count <= self._bound_max:
7122 raise BoundsError(self._bound_min, values_count, self._bound_max)
7124 for v in self._values_for_encoding():
7125 v.encode2nd(writer, state_iter)
7127 def _encode_cer(self, writer):
7128 write_full(writer, self.tag + LENINDEF)
7129 iterator = hasattr(self._value, NEXT_ATTR_NAME)
7131 class_expected = self.spec.__class__
7133 values_for_encoding = self._values_for_encoding()
7135 for v in values_for_encoding:
7136 if not isinstance(v, class_expected):
7137 raise InvalidValueType((class_expected,))
7138 v.encode_cer(writer)
7140 if not self._bound_min <= values_count <= self._bound_max:
7141 raise BoundsError(self._bound_min, values_count, self._bound_max)
7143 for v in self._values_for_encoding():
7144 v.encode_cer(writer)
7145 write_full(writer, EOC)
7155 ordering_check=False,
7158 t, tlen, lv = tag_strip(tlv)
7159 except DecodeError as err:
7160 raise err.__class__(
7162 klass=self.__class__,
7163 decode_path=decode_path,
7168 klass=self.__class__,
7169 decode_path=decode_path,
7176 ctx_bered = ctx.get("bered", False)
7178 l, llen, v = len_decode(lv)
7179 except LenIndefForm as err:
7181 raise err.__class__(
7183 klass=self.__class__,
7184 decode_path=decode_path,
7187 l, llen, v = 0, 1, lv[1:]
7189 except DecodeError as err:
7190 raise err.__class__(
7192 klass=self.__class__,
7193 decode_path=decode_path,
7197 raise NotEnoughData(
7198 "encoded length is longer than data",
7199 klass=self.__class__,
7200 decode_path=decode_path,
7204 v, tail = v[:l], v[l:]
7206 sub_offset = offset + tlen + llen
7209 ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
7210 value_prev = memoryview(v[:0])
7214 if lenindef and v[:EOC_LEN].tobytes() == EOC:
7216 sub_decode_path = decode_path + (str(_value_count),)
7218 for _decode_path, value, v_tail in spec.decode_evgen(
7222 decode_path=sub_decode_path,
7224 _ctx_immutable=False,
7226 yield _decode_path, value, v_tail
7228 _, value, v_tail = next(spec.decode_evgen(
7232 decode_path=sub_decode_path,
7234 _ctx_immutable=False,
7237 value_len = value.fulllen
7239 if value_prev.tobytes() > v[:value_len].tobytes():
7240 if ctx_bered or ctx_allow_unordered_set:
7244 "unordered " + self.asn1_type_name,
7245 klass=self.__class__,
7246 decode_path=sub_decode_path,
7249 value_prev = v[:value_len]
7252 _value.append(value)
7253 sub_offset += value_len
7256 if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
7258 msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
7259 klass=self.__class__,
7260 decode_path=decode_path,
7264 obj = self.__class__(
7265 value=None if evgen_mode else _value,
7267 bounds=(self._bound_min, self._bound_max),
7270 default=self.default,
7271 optional=self.optional,
7272 _decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
7274 except BoundsError as err:
7277 klass=self.__class__,
7278 decode_path=decode_path,
7282 if v[:EOC_LEN].tobytes() != EOC:
7285 klass=self.__class__,
7286 decode_path=decode_path,
7291 obj.ber_encoded = ber_encoded
7292 yield decode_path, obj, tail
7296 pp_console_row(next(self.pps())),
7297 ", ".join(repr(v) for v in self._value),
def pps(self, decode_path=()):
    """Generate pretty printing records of that object

    First item is the ``_pp`` record of the container itself. Then, for
    every held element, the generator returned by element's own ``pps``
    is yielded as is (nested, not flattened), with element's index
    appended to the decode path. Everything ``pps_lenindef`` produces
    comes last.

    :param tuple decode_path: decode path of that object
    """
    # Explicit tag related fields are filled only for expled objects
    if self.expled:
        expl_offset = self.expl_offset
        expl_tlen = self.expl_tlen
        expl_llen = self.expl_llen
        expl_vlen = self.expl_vlen
    else:
        expl_offset = expl_tlen = expl_llen = expl_vlen = None
    own = _pp(
        obj=self,
        asn1_type_name=self.asn1_type_name,
        obj_name=self.__class__.__name__,
        decode_path=decode_path,
        optional=self.optional,
        default=self == self.default,
        impl=None if self.tag == self.tag_default else tag_decode(self.tag),
        expl=None if self._expl is None else tag_decode(self._expl),
        offset=self.offset,
        tlen=self.tlen,
        llen=self.llen,
        vlen=self.vlen,
        expl_offset=expl_offset,
        expl_tlen=expl_tlen,
        expl_llen=expl_llen,
        expl_vlen=expl_vlen,
        expl_lenindef=self.expl_lenindef,
        lenindef=self.lenindef,
        ber_encoded=self.ber_encoded,
        bered=self.bered,
    )
    yield own
    for idx, element in enumerate(self._value):
        yield element.pps(decode_path=decode_path + (str(idx),))
    for trailer in self.pps_lenindef(decode_path):
        yield trailer
class SetOf(SequenceOf):
    """``SET OF`` sequence type

    Its usage is identical to :py:class:`pyderasn.SequenceOf`.

    Differences visible in that class: iterator values are rejected,
    elements are written sorted by their encoded representation, and
    decoding is delegated to the parent with ``ordering_check=True``.
    """
    __slots__ = ()
    tag_default = tag_encode(form=TagFormConstructed, num=17)
    asn1_type_name = "SET OF"

    def _value_sanitize(self, value):
        """Sanitize value with parent's method, refusing iterators

        :raises ValueError: sanitized value has ``NEXT_ATTR_NAME`` attribute
        """
        sanitized = super()._value_sanitize(value)
        if not hasattr(sanitized, NEXT_ATTR_NAME):
            return sanitized
        raise ValueError(
            "SetOf does not support iterator values, as no sense in them"
        )

    def _encode(self):
        """Return tag, length and the sorted encoded elements as bytes"""
        encoded = [item.encode() for item in self._values_for_encoding()]
        encoded.sort()
        payload = b"".join(encoded)
        return b"".join((self.tag, len_encode(len(payload)), payload))

    def _encode2nd(self, writer, state_iter):
        """Write tag, length taken from ``state_iter`` and sorted elements

        Every element is rendered to its own in-memory buffer first,
        because sorting requires all the encoded elements to be known
        before anything of them is written.
        """
        write_full(writer, self.tag + len_encode(next(state_iter)))
        rendered = []
        for item in self._values_for_encoding():
            sink = BytesIO()
            item.encode2nd(sink.write, state_iter)
            rendered.append(sink.getvalue())
        for chunk in sorted(rendered):
            write_full(writer, chunk)

    def _encode_cer(self, writer):
        """Write tag, ``LENINDEF``, the sorted elements and ``EOC``"""
        write_full(writer, self.tag + LENINDEF)
        rendered = [encode_cer(item) for item in self._values_for_encoding()]
        rendered.sort()
        for chunk in rendered:
            write_full(writer, chunk)
        write_full(writer, EOC)

    def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
        """Decode with parent's method, requesting the ordering check"""
        decoded = super()._decode(
            tlv,
            offset,
            decode_path,
            ctx,
            tag_only,
            evgen_mode,
            ordering_check=True,
        )
        return decoded
def obj_by_path(pypath):  # pragma: no cover
    """Import object specified as string Python path

    Modules must be separated from classes/functions with ``:``.
    Part before the last ``:`` is imported as a module, part after it
    is a dot separated chain of attributes resolved one after another.

    >>> obj_by_path("foo.bar:Baz")
    <class 'foo.bar.Baz'>
    >>> obj_by_path("foo.bar:Baz.boo")
    <classmethod 'foo.bar.Baz.boo'>

    :param str pypath: ``module.path:attribute.chain`` string
    :returns: object found at the end of the attribute chain
    """
    from importlib import import_module
    module_path, attr_chain = pypath.rsplit(":", 1)
    found = import_module(module_path)
    for attr_name in attr_chain.split("."):
        found = getattr(found, attr_name)
    return found
7397 def generic_decoder(): # pragma: no cover
7398 # All of this below is a big hack with self references
7399 choice = PrimitiveTypes()
7400 choice.specs["SequenceOf"] = SequenceOf(schema=choice)
7401 choice.specs["SetOf"] = SetOf(schema=choice)
7403 choice.specs["SequenceOf%d" % i] = SequenceOf(
7407 choice.specs["Any"] = Any()
7409 # Class name equals to type name, to omit it from output
7410 class SEQUENCEOF(SequenceOf):
7418 with_decode_path=False,
7419 decode_path_only=(),
7422 def _pprint_pps(pps):
7424 if hasattr(pp, "_fields"):
7426 decode_path_only != () and
7427 pp.decode_path[:len(decode_path_only)] != decode_path_only
7430 if pp.asn1_type_name == Choice.asn1_type_name:
7432 pp_kwargs = pp._asdict()
7433 pp_kwargs["decode_path"] = pp.decode_path[:-1] + (">",)
7434 pp = _pp(**pp_kwargs)
7435 yield pp_console_row(
7440 with_colours=with_colours,
7441 with_decode_path=with_decode_path,
7442 decode_path_len_decrease=len(decode_path_only),
7444 for row in pp_console_blob(
7446 decode_path_len_decrease=len(decode_path_only),
7450 for row in _pprint_pps(pp):
7452 return "\n".join(_pprint_pps(obj.pps(decode_path)))
7453 return SEQUENCEOF(), pprint_any
def ascii_visualize(ba):
    """Output only ASCII printable characters, like in hexdump -C

    Every item in 0x20..0x7E range is shown as corresponding character,
    anything else is replaced with a dot.

    Example output for given binary string (right part)::

        92 2b 39 20 65 91 e6 8e  95 93 1a 58 df 02 78 ea  |.+9 e......X..x.|

    :param ba: iterable of integers (bytes, for example)
    :rtype: str
    """
    shown = []
    for code in ba:
        if code < 0x20 or code > 0x7E:
            shown.append(".")
        else:
            shown.append(chr(code))
    return "".join(shown)
7468 """Generate ``hexdump -C`` like output
7472 00000000 30 80 30 80 a0 80 02 01 02 00 00 02 14 54 a5 18 |0.0..........T..|
7473 00000010 69 ef 8b 3f 15 fd ea ad bd 47 e0 94 81 6b 06 6a |i..?.....G...k.j|
7475 Result of that function is a generator of lines, where each line is
7480 ["00000010 ", " 69", " ef", " 8b", " 3f", " 15", " fd", " ea", " ad ",
7481 " bd", " 47", " e0", " 94", " 81", " 6b", " 06", " 6a ",
7482 " |i..?.....G...k.j|"]
7486 hexed = hexenc(raw).upper()
7487 addr, cols = 0, ["%08x " % 0]
7488 for i in range(0, len(hexed), 2):
7489 if i != 0 and i // 2 % 8 == 0:
7491 if i != 0 and i // 2 % 16 == 0:
7492 cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:addr + 16])))
7495 cols = ["%08x " % addr]
7496 cols.append(" " + hexed[i:i + 2])
7498 cols.append(" |%s|" % ascii_visualize(bytes(raw[addr:])))
7502 def browse(raw, obj, oid_maps=()):
7503 """Interactive browser
7505 :param bytes raw: binary data you decoded
7506 :param obj: decoded :py:class:`pyderasn.Obj`
7507 :param oid_maps: list of ``str(OID) <-> human readable string`` dictionaries.
7508 Its human readable form is printed when OID is met
7510 .. note:: `urwid <http://urwid.org/>`__ dependency required
7512 This browser is an interactive terminal application for browsing
7513 structures of your decoded ASN.1 objects. You can quit it with **q**
7514 key. It consists of three windows:
7517 View of ASN.1 elements hierarchy. You can navigate it using **Up**,
7518 **Down**, **PageUp**, **PageDown**, **Home**, **End** keys.
7519 **Left** key goes to constructed element above. **Plus**/**Minus**
7520 keys collapse/uncollapse constructed elements. **Space** toggles it
7522 window with various information about element. You can scroll it
7523 with **h**/**l** (down, up) (**H**/**L** for triple speed) keys
7525 window with raw data hexdump and highlighted current element's
7526 contents. It automatically focuses on element's data. You can
7527 scroll it with **j**/**k** (down, up) (**J**/**K** for triple
7528 speed) keys. If element has explicit tag, then it also will be
7529 highlighted with different colour
7531 Window's header contains current decode path and progress bars with
7532 position in *info* and *hexdump* windows.
7534 If you press **d**, then current element will be saved in the
7535 current directory under its decode path name (adding ".0", ".1", etc
7536 suffix if such file already exists). **D** will save it with explicit tag.
7538 You can also invoke it with ``--browse`` command line argument.
7540 from copy import deepcopy
7541 from os.path import exists as path_exists
7544 class TW(urwid.TreeWidget):
7545 def __init__(self, state, *args, **kwargs):
7547 self.scrolled = {"info": False, "hexdump": False}
7548 super().__init__(*args, **kwargs)
7551 pp = self.get_node().get_value()
7552 constructed = len(pp) > 1
7553 return (pp if hasattr(pp, "_fields") else pp[0]), constructed
7555 def _state_update(self):
7556 pp, _ = self._get_pp()
7557 self.state["decode_path"].set_text(
7558 ":".join(str(p) for p in pp.decode_path)
7560 lines = deepcopy(self.state["hexed"])
7562 def attr_set(i, attr):
7563 line = lines[i // 16]
7564 idx = 1 + (i - 16 * (i // 16))
7565 line[idx] = (attr, line[idx])
7567 if pp.expl_offset is not None:
7570 pp.expl_offset + pp.expl_tlen + pp.expl_llen,
7572 attr_set(i, "select-expl")
7573 for i in range(pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen):
7574 attr_set(i, "select-value")
7575 self.state["hexdump"]._set_body([urwid.Text(line) for line in lines])
7576 self.state["hexdump"].set_focus(pp.offset // 16)
7577 self.state["hexdump"].set_focus_valign("middle")
7578 self.state["hexdump_bar"].set_completion(
7579 (100 * pp.offset // 16) //
7580 len(self.state["hexdump"]._body.positions())
7584 [("header", "Name: "), pp.obj_name],
7585 [("header", "Type: "), pp.asn1_type_name],
7586 [("header", "Offset: "), "%d (0x%x)" % (pp.offset, pp.offset)],
7587 [("header", "[TLV]len: "), "%d/%d/%d" % (
7588 pp.tlen, pp.llen, pp.vlen,
7590 [("header", "TLVlen: "), "%d" % sum((
7591 pp.tlen, pp.llen, pp.vlen,
7593 [("header", "Slice: "), "[%d:%d]" % (
7594 pp.offset, pp.offset + pp.tlen + pp.llen + pp.vlen,
7598 lines.append([("warning", "LENINDEF")])
7600 lines.append([("warning", "BER encoded")])
7602 lines.append([("warning", "BERed")])
7603 if pp.expl is not None:
7604 lines.append([("header", "EXPLICIT")])
7605 klass, _, num = pp.expl
7606 lines.append([" Tag: %s%d" % (TagClassReprs[klass], num)])
7607 if pp.expl_offset is not None:
7608 lines.append([" Offset: %d" % pp.expl_offset])
7609 lines.append([" [TLV]len: %d/%d/%d" % (
7610 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7612 lines.append([" TLVlen: %d" % sum((
7613 pp.expl_tlen, pp.expl_llen, pp.expl_vlen,
7615 lines.append([" Slice: [%d:%d]" % (
7617 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen,
7619 if pp.impl is not None:
7620 klass, _, num = pp.impl
7622 ("header", "IMPLICIT: "), "%s%d" % (TagClassReprs[klass], num),
7625 lines.append(["OPTIONAL"])
7627 lines.append(["DEFAULT"])
7628 if len(pp.decode_path) > 0:
7629 ent = pp.decode_path[-1]
7630 if isinstance(ent, DecodePathDefBy):
7632 value = str(ent.defined_by)
7633 oid_name = find_oid_name(
7634 ent.defined_by.asn1_type_name, oid_maps, value,
7636 lines.append([("header", "DEFINED BY: "), "%s" % (
7637 value if oid_name is None
7638 else "%s (%s)" % (oid_name, value)
7641 if pp.value is not None:
7642 lines.append([("header", "Value: "), pp.value])
7644 len(oid_maps) > 0 and
7645 pp.asn1_type_name == ObjectIdentifier.asn1_type_name
7647 for oid_map in oid_maps:
7648 oid_name = oid_map.get(pp.value)
7649 if oid_name is not None:
7650 lines.append([("header", "Human: "), oid_name])
7652 if pp.asn1_type_name == Integer.asn1_type_name:
7654 ("header", "Decimal: "), "%d" % int(pp.obj),
7657 ("header", "Hexadecimal: "), colonize_hex(pp.obj.tohex()),
7659 if pp.blob.__class__ == bytes:
7660 blob = hexenc(pp.blob).upper()
7661 for i in range(0, len(blob), 32):
7662 lines.append([colonize_hex(blob[i:i + 32])])
7663 elif pp.blob.__class__ == tuple:
7664 lines.append([", ".join(pp.blob)])
7665 self.state["info"]._set_body([urwid.Text(line) for line in lines])
7666 self.state["info_bar"].set_completion(0)
7668 def selectable(self):
7669 if self.state["widget_current"] != self:
7670 self.state["widget_current"] = self
7671 self.scrolled["info"] = False
7672 self.scrolled["hexdump"] = False
7673 self._state_update()
7674 return super().selectable()
7676 def _get_display_text_without_offset(self):
7677 pp, constructed = self._get_pp()
7678 style = "constructed" if constructed else ""
7679 if len(pp.decode_path) == 0:
7680 return (style, pp.obj_name)
7681 if pp.asn1_type_name == "EOC":
7682 return ("eoc", "EOC")
7683 ent = pp.decode_path[-1]
7684 if isinstance(ent, DecodePathDefBy):
7685 value = str(ent.defined_by)
7686 oid_name = find_oid_name(
7687 ent.defined_by.asn1_type_name, oid_maps, value,
7689 return ("defby", "DEFBY:" + (
7690 value if oid_name is None else oid_name
7694 def get_display_text(self):
7695 pp, _ = self._get_pp()
7696 style, ent = self._get_display_text_without_offset()
7697 return [(style, ent), " [%d]" % pp.offset]
7699 def _scroll(self, what, step):
7700 self.state[what]._invalidate()
7701 pos = self.state[what].focus_position
7702 if not self.scrolled[what]:
7703 self.scrolled[what] = True
7705 pos = max(0, pos + step)
7706 pos = min(pos, len(self.state[what]._body.positions()) - 1)
7707 self.state[what].set_focus(pos)
7708 self.state[what].set_focus_valign("top")
7709 self.state[what + "_bar"].set_completion(
7710 (100 * pos) // len(self.state[what]._body.positions())
7713 def keypress(self, size, key):
7715 raise urwid.ExitMainLoop()
7718 self.expanded = not self.expanded
7719 self.update_expanded_icon()
7722 hexdump_steps = {"j": 1, "k": -1, "J": 5, "K": -5}
7723 if key in hexdump_steps:
7724 self._scroll("hexdump", hexdump_steps[key])
7727 info_steps = {"h": 1, "l": -1, "H": 5, "L": -5}
7728 if key in info_steps:
7729 self._scroll("info", info_steps[key])
7732 if key in ("d", "D"):
7733 pp, _ = self._get_pp()
7734 dp = ":".join(str(p) for p in pp.decode_path)
7735 dp = dp.replace(" ", "_")
7738 if key == "d" or pp.expl_offset is None:
7739 data = self.state["raw"][pp.offset:(
7740 pp.offset + pp.tlen + pp.llen + pp.vlen
7743 data = self.state["raw"][pp.expl_offset:(
7744 pp.expl_offset + pp.expl_tlen + pp.expl_llen + pp.expl_vlen
7748 def duplicate_path(dp, ctr):
7751 return "%s.%d" % (dp, ctr)
7754 if not path_exists(duplicate_path(dp, ctr)):
7757 dp = duplicate_path(dp, ctr)
7758 with open(dp, "wb") as fd:
7760 self.state["decode_path"].set_text(
7761 ("warning", "Saved to: " + dp)
7764 return super().keypress(size, key)
7766 class PN(urwid.ParentNode):
7767 def __init__(self, state, value, *args, **kwargs):
7769 if not hasattr(value, "_fields"):
7771 super().__init__(value, *args, **kwargs)
7773 def load_widget(self):
7774 return TW(self.state, self)
7776 def load_child_keys(self):
7777 value = self.get_value()
7778 if hasattr(value, "_fields"):
7780 return range(len(value[1:]))
7782 def load_child_node(self, key):
7785 self.get_value()[key + 1],
7788 depth=self.get_depth() + 1,
7791 class LabeledPG(urwid.ProgressBar):
7792 def __init__(self, label, *args, **kwargs):
7794 super().__init__(*args, **kwargs)
7797 return "%s: %s" % (self.label, super().get_text())
7799 WinHexdump = urwid.ListBox([urwid.Text("")])
7800 WinInfo = urwid.ListBox([urwid.Text("")])
7801 WinDecodePath = urwid.Text("", "center")
7802 WinInfoBar = LabeledPG("info", "pg-normal", "pg-complete")
7803 WinHexdumpBar = LabeledPG("hexdump", "pg-normal", "pg-complete")
7804 WinTree = urwid.TreeListBox(urwid.TreeWalker(PN(
7807 "hexed": list(hexdump(raw)),
7808 "widget_current": None,
7810 "info_bar": WinInfoBar,
7811 "hexdump": WinHexdump,
7812 "hexdump_bar": WinHexdumpBar,
7813 "decode_path": WinDecodePath,
7817 help_text = " ".join((
7819 "space:(un)collapse",
7820 "(pg)up/down/home/end:nav",
7821 "jkJK:hexdump hlHL:info",
7827 ("weight", 1, WinTree),
7828 ("weight", 2, urwid.Pile([
7829 urwid.LineBox(WinInfo),
7830 urwid.LineBox(WinHexdump),
7833 header=urwid.Columns([
7834 ("weight", 2, urwid.AttrWrap(WinDecodePath, "header")),
7835 ("weight", 1, WinInfoBar),
7836 ("weight", 1, WinHexdumpBar),
7838 footer=urwid.AttrWrap(urwid.Text(help_text), "help")
7841 ("header", "bold", ""),
7842 ("constructed", "bold", ""),
7843 ("help", "light magenta", ""),
7844 ("warning", "light red", ""),
7845 ("defby", "light red", ""),
7846 ("eoc", "dark red", ""),
7847 ("select-value", "light green", ""),
7848 ("select-expl", "light red", ""),
7849 ("pg-normal", "", "light blue"),
7850 ("pg-complete", "black", "yellow"),
7855 def main(): # pragma: no cover
7857 parser = argparse.ArgumentParser(description="PyDERASN ASN.1 BER/CER/DER decoder")
7858 parser.add_argument(
7862 help="Skip that number of bytes from the beginning",
7864 parser.add_argument(
7866 help="Python paths to dictionary with OIDs, comma separated",
7868 parser.add_argument(
7870 help="Python path to schema definition to use",
7872 parser.add_argument(
7873 "--defines-by-path",
7874 help="Python path to decoder's defines_by_path",
7876 parser.add_argument(
7878 action="store_true",
7879 help="Disallow BER encoding",
7881 parser.add_argument(
7882 "--print-decode-path",
7883 action="store_true",
7884 help="Print decode paths",
7886 parser.add_argument(
7887 "--decode-path-only",
7888 help="Print only specified decode path",
7890 parser.add_argument(
7892 action="store_true",
7893 help="Allow explicit tag out-of-bound",
7895 parser.add_argument(
7897 action="store_true",
7898 help="Turn on event generation mode",
7900 parser.add_argument(
7902 action="store_true",
7903 help="Start ASN.1 browser",
7905 parser.add_argument(
7907 type=argparse.FileType("rb"),
7908 help="Path to BER/CER/DER file you want to decode",
7910 args = parser.parse_args()
7912 raw = file_mmaped(args.RAWFile)[args.skip:]
7914 args.RAWFile.seek(args.skip)
7915 raw = memoryview(args.RAWFile.read())
7916 args.RAWFile.close()
7918 [obj_by_path(_path) for _path in (args.oids or "").split(",")]
7919 if args.oids else ()
7921 from functools import partial
7923 schema = obj_by_path(args.schema)
7924 pprinter = partial(pprint, big_blobs=True)
7926 schema, pprinter = generic_decoder()
7928 "bered": not args.nobered,
7929 "allow_expl_oob": args.allow_expl_oob,
7931 if args.defines_by_path is not None:
7932 ctx["defines_by_path"] = obj_by_path(args.defines_by_path)
7934 obj, _ = schema().decode(raw, ctx=ctx)
7935 browse(raw, obj, oid_maps)
7936 from sys import exit as sys_exit
7938 from os import environ
7942 with_colours=environ.get("NO_COLOR") is None,
7943 with_decode_path=args.print_decode_path,
7945 () if args.decode_path_only is None else
7946 tuple(args.decode_path_only.split(":"))
7950 for decode_path, obj, tail in schema().decode_evgen(raw, ctx=ctx):
7951 print(pprinter(obj, decode_path=decode_path))
7953 obj, tail = schema().decode(raw, ctx=ctx)
7954 print(pprinter(obj))
7956 print("\nTrailing data: %s" % hexenc(tail))
7959 if __name__ == "__main__":
7960 from pyderasn import *